"""Honcho memory plugin — MemoryProvider for Honcho AI-native memory. Provides cross-session user modeling with dialectic Q&A, semantic search, peer cards, and persistent conclusions via the Honcho SDK. Honcho provides AI-native cross-session user modeling with dialectic Q&A, semantic search, peer cards, and conclusions. The 4 tools (profile, search, context, conclude) are exposed through the MemoryProvider interface. Config: Uses the existing Honcho config chain: 1. $HERMES_HOME/honcho.json (profile-scoped) 2. ~/.honcho/config.json (legacy global) 3. Environment variables """ from __future__ import annotations import json import logging import re import threading from typing import Any, Dict, List, Optional from agent.memory_provider import MemoryProvider from tools.registry import tool_error logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool schemas (moved from tools/honcho_tools.py) # --------------------------------------------------------------------------- PROFILE_SCHEMA = { "name": "honcho_profile", "description": ( "Retrieve or update a peer card from Honcho — a curated list of key facts " "about that peer (name, role, preferences, communication style, patterns). " "Pass `card` to update; omit `card` to read." ), "parameters": { "type": "object", "properties": { "peer": { "type": "string", "description": "Peer to query. Built-in aliases: 'user' (default), 'ai'. Or pass any peer ID from this workspace.", }, "card": { "type": "array", "items": {"type": "string"}, "description": "New peer card as a list of fact strings. Omit to read the current card.", }, }, "required": [], }, } SEARCH_SCHEMA = { "name": "honcho_search", "description": ( "Semantic search over Honcho's stored context about a peer. " "Returns raw excerpts ranked by relevance — no LLM synthesis. " "Cheaper and faster than honcho_reasoning. " "Good when you want to find specific past facts and reason over them yourself." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "What to search for in Honcho's memory.", }, "max_tokens": { "type": "integer", "description": "Token budget for returned context (default 800, max 2000).", }, "peer": { "type": "string", "description": "Peer to query. Built-in aliases: 'user' (default), 'ai'. Or pass any peer ID from this workspace.", }, }, "required": ["query"], }, } REASONING_SCHEMA = { "name": "honcho_reasoning", "description": ( "Ask Honcho a natural language question and get a synthesized answer. " "Uses Honcho's LLM (dialectic reasoning) — higher cost than honcho_profile or honcho_search. " "Can query about any peer via alias or explicit peer ID. " "Pass reasoning_level to control depth: minimal (fast/cheap), low (default), " "medium, high, max (deep/expensive). Omit for configured default." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "A natural language question.", }, "reasoning_level": { "type": "string", "description": ( "Override the default reasoning depth. " "Omit to use the configured default (typically low). " "Guide:\n" "- minimal: quick factual lookups (name, role, simple preference)\n" "- low: straightforward questions with clear answers\n" "- medium: multi-aspect questions requiring synthesis across observations\n" "- high: complex behavioral patterns, contradictions, deep analysis\n" "- max: thorough audit-level analysis, leave no stone unturned" ), "enum": ["minimal", "low", "medium", "high", "max"], }, "peer": { "type": "string", "description": "Peer to query. Built-in aliases: 'user' (default), 'ai'. Or pass any peer ID from this workspace.", }, }, "required": ["query"], }, } CONTEXT_SCHEMA = { "name": "honcho_context", "description": ( "Retrieve full session context from Honcho — summary, peer representation, " "peer card, and recent messages. No LLM synthesis. " "Cheaper than honcho_reasoning. Use this to see what Honcho knows about " "the current conversation and the specified peer." ), "parameters": { "type": "object", "properties": { "query": { "type": "string", "description": "Optional focus query to filter context. Omit for full session context snapshot.", }, "peer": { "type": "string", "description": "Peer to query. Built-in aliases: 'user' (default), 'ai'. Or pass any peer ID from this workspace.", }, }, "required": [], }, } CONCLUDE_SCHEMA = { "name": "honcho_conclude", "description": ( "Write or delete a conclusion about a peer in Honcho's memory. " "Conclusions are persistent facts that build a peer's profile. " "You MUST pass exactly one of: `conclusion` (to create) or `delete_id` (to delete). " "Passing neither is an error. " "Deletion is only for PII removal — Honcho self-heals incorrect conclusions over time." ), "parameters": { "type": "object", "properties": { "conclusion": { "type": "string", "description": "A factual statement to persist. Required when not using delete_id.", }, "delete_id": { "type": "string", "description": "Conclusion ID to delete (for PII removal). Required when not using conclusion.", }, "peer": { "type": "string", "description": "Peer to query. Built-in aliases: 'user' (default), 'ai'. Or pass any peer ID from this workspace.", }, }, "required": [], }, } ALL_TOOL_SCHEMAS = [PROFILE_SCHEMA, SEARCH_SCHEMA, REASONING_SCHEMA, CONTEXT_SCHEMA, CONCLUDE_SCHEMA] # --------------------------------------------------------------------------- # MemoryProvider implementation # --------------------------------------------------------------------------- class HonchoMemoryProvider(MemoryProvider): """Honcho AI-native memory with dialectic Q&A and persistent user modeling.""" def __init__(self): self._manager = None # HonchoSessionManager self._config = None # HonchoClientConfig self._session_key = "" self._prefetch_result = "" self._prefetch_lock = threading.Lock() self._prefetch_thread: Optional[threading.Thread] = None self._sync_thread: Optional[threading.Thread] = None # B1: recall_mode — set during initialize from config self._recall_mode = "hybrid" # "context", "tools", or "hybrid" # Base context cache — refreshed on context_cadence, not frozen self._base_context_cache: Optional[str] = None self._base_context_lock = threading.Lock() # B5: Cost-awareness turn counting and cadence self._turn_count = 0 self._injection_frequency = "every-turn" # or "first-turn" self._context_cadence = 1 # minimum turns between context API calls self._dialectic_cadence = 3 # minimum turns between dialectic API calls self._dialectic_depth = 1 # how many .chat() calls per dialectic cycle (1-3) self._dialectic_depth_levels: list[str] | None = None # per-pass reasoning levels self._reasoning_level_cap: Optional[str] = None # "minimal", "low", "medium", "high" self._last_context_turn = -999 self._last_dialectic_turn = -999 # Port #1957: lazy session init for tools-only mode self._session_initialized = False self._lazy_init_kwargs: Optional[dict] = None self._lazy_init_session_id: Optional[str] = None # Port #4053: cron guard — when True, plugin is fully inactive self._cron_skipped = False @property def name(self) -> str: return "honcho" def is_available(self) -> bool: """Check if Honcho is configured. No network calls.""" try: from plugins.memory.honcho.client import HonchoClientConfig cfg = HonchoClientConfig.from_global_config() # Port #2645: baseUrl-only verification — api_key OR base_url suffices return cfg.enabled and bool(cfg.api_key or cfg.base_url) except Exception: return False def save_config(self, values, hermes_home): """Write config to $HERMES_HOME/honcho.json (Honcho SDK native format).""" import json from pathlib import Path config_path = Path(hermes_home) / "honcho.json" existing = {} if config_path.exists(): try: existing = json.loads(config_path.read_text()) except Exception: pass existing.update(values) config_path.write_text(json.dumps(existing, indent=2)) def get_config_schema(self): return [ {"key": "api_key", "description": "Honcho API key", "secret": True, "env_var": "HONCHO_API_KEY", "url": "https://app.honcho.dev"}, {"key": "baseUrl", "description": "Honcho base URL (for self-hosted)"}, ] def post_setup(self, hermes_home: str, config: dict) -> None: """Run the full Honcho setup wizard after provider selection.""" import types from plugins.memory.honcho.cli import cmd_setup cmd_setup(types.SimpleNamespace()) def initialize(self, session_id: str, **kwargs) -> None: """Initialize Honcho session manager. Handles: cron guard, recall_mode, session name resolution, peer memory mode, SOUL.md ai_peer sync, memory file migration, and pre-warming context at init. """ try: # ----- Port #4053: cron guard ----- agent_context = kwargs.get("agent_context", "") platform = kwargs.get("platform", "cli") if agent_context in ("cron", "flush") or platform == "cron": logger.debug("Honcho skipped: cron/flush context (agent_context=%s, platform=%s)", agent_context, platform) self._cron_skipped = True return from plugins.memory.honcho.client import HonchoClientConfig, get_honcho_client from plugins.memory.honcho.session import HonchoSessionManager cfg = HonchoClientConfig.from_global_config() if not cfg.enabled or not (cfg.api_key or cfg.base_url): logger.debug("Honcho not configured — plugin inactive") return # Override peer_name with gateway user_id for per-user memory scoping. # Only when no explicit peerName was configured — an explicit peerName # means the user chose their identity; a raw user_id (e.g. Telegram # chat ID) should not silently replace it. _gw_user_id = kwargs.get("user_id") if _gw_user_id and not cfg.peer_name: cfg.peer_name = _gw_user_id self._config = cfg # ----- B1: recall_mode from config ----- self._recall_mode = cfg.recall_mode # "context", "tools", or "hybrid" logger.debug("Honcho recall_mode: %s", self._recall_mode) # ----- B5: cost-awareness config ----- try: raw = cfg.raw or {} self._injection_frequency = raw.get("injectionFrequency", "every-turn") self._context_cadence = int(raw.get("contextCadence", 1)) self._dialectic_cadence = int(raw.get("dialecticCadence", 3)) self._dialectic_depth = max(1, min(cfg.dialectic_depth, 3)) self._dialectic_depth_levels = cfg.dialectic_depth_levels cap = raw.get("reasoningLevelCap") if cap and cap in ("minimal", "low", "medium", "high"): self._reasoning_level_cap = cap except Exception as e: logger.debug("Honcho cost-awareness config parse error: %s", e) # ----- Port #1969: aiPeer sync from SOUL.md — REMOVED ----- # SOUL.md is persona content, not identity config. aiPeer should # only come from honcho.json (host block or root) or the default. # See scratch/memory-plugin-ux-specs.md #10 for rationale. # ----- Port #1957: lazy session init for tools-only mode ----- if self._recall_mode == "tools": if cfg.init_on_session_start: # Eager init even in tools mode (opt-in) self._do_session_init(cfg, session_id, **kwargs) return # Defer actual session creation until first tool call self._lazy_init_kwargs = kwargs self._lazy_init_session_id = session_id # Still need a client reference for _ensure_session self._config = cfg logger.debug("Honcho tools-only mode — deferring session init until first tool call") return # ----- Eager init (context or hybrid mode) ----- self._do_session_init(cfg, session_id, **kwargs) except ImportError: logger.debug("honcho-ai package not installed — plugin inactive") except Exception as e: logger.warning("Honcho init failed: %s", e) self._manager = None def _do_session_init(self, cfg, session_id: str, **kwargs) -> None: """Shared session initialization logic for both eager and lazy paths.""" from plugins.memory.honcho.client import get_honcho_client from plugins.memory.honcho.session import HonchoSessionManager client = get_honcho_client(cfg) self._manager = HonchoSessionManager( honcho=client, config=cfg, context_tokens=cfg.context_tokens, ) # ----- B3: resolve_session_name ----- session_title = kwargs.get("session_title") gateway_session_key = kwargs.get("gateway_session_key") self._session_key = ( cfg.resolve_session_name( session_title=session_title, session_id=session_id, gateway_session_key=gateway_session_key, ) or session_id or "hermes-default" ) logger.debug("Honcho session key resolved: %s", self._session_key) # Create session eagerly session = self._manager.get_or_create(self._session_key) self._session_initialized = True # ----- B6: Memory file migration (one-time, for new sessions) ----- # Skip under per-session strategy: every Hermes run creates a fresh # Honcho session by design, so uploading MEMORY.md/USER.md/SOUL.md to # each one would flood the backend with short-lived duplicates instead # of performing a one-time migration. try: if not session.messages and cfg.session_strategy != "per-session": from hermes_constants import get_hermes_home mem_dir = str(get_hermes_home() / "memories") self._manager.migrate_memory_files(self._session_key, mem_dir) logger.debug("Honcho memory file migration attempted for new session: %s", self._session_key) elif cfg.session_strategy == "per-session": logger.debug( "Honcho memory file migration skipped: per-session strategy creates a fresh session per run (%s)", self._session_key, ) except Exception as e: logger.debug("Honcho memory file migration skipped: %s", e) # ----- B7: Pre-warming context at init ----- if self._recall_mode in ("context", "hybrid"): try: self._manager.prefetch_context(self._session_key) self._manager.prefetch_dialectic(self._session_key, "What should I know about this user?") logger.debug("Honcho pre-warm threads started for session: %s", self._session_key) except Exception as e: logger.debug("Honcho pre-warm failed: %s", e) def _ensure_session(self) -> bool: """Lazily initialize the Honcho session (for tools-only mode). Returns True if the manager is ready, False otherwise. """ if self._manager and self._session_initialized: return True if self._cron_skipped: return False if not self._config or not self._lazy_init_kwargs: return False try: self._do_session_init( self._config, self._lazy_init_session_id or "hermes-default", **self._lazy_init_kwargs, ) # Clear lazy refs self._lazy_init_kwargs = None self._lazy_init_session_id = None return self._manager is not None except Exception as e: logger.warning("Honcho lazy session init failed: %s", e) return False def _format_first_turn_context(self, ctx: dict) -> str: """Format the prefetch context dict into a readable system prompt block.""" parts = [] # Session summary — session-scoped context, placed first for relevance summary = ctx.get("summary", "") if summary: parts.append(f"## Session Summary\n{summary}") rep = ctx.get("representation", "") if rep: parts.append(f"## User Representation\n{rep}") card = ctx.get("card", "") if card: parts.append(f"## User Peer Card\n{card}") ai_rep = ctx.get("ai_representation", "") if ai_rep: parts.append(f"## AI Self-Representation\n{ai_rep}") ai_card = ctx.get("ai_card", "") if ai_card: parts.append(f"## AI Identity Card\n{ai_card}") if not parts: return "" return "\n\n".join(parts) def system_prompt_block(self) -> str: """Return system prompt text, adapted by recall_mode. Returns only the mode header and tool instructions — static text that doesn't change between turns (prompt-cache friendly). Live context (representation, card) is injected via prefetch(). """ if self._cron_skipped: return "" if not self._manager or not self._session_key: # tools-only mode without session yet still returns a minimal block if self._recall_mode == "tools" and self._config: return ( "# Honcho Memory\n" "Active (tools-only mode). Use honcho_profile, honcho_search, " "honcho_reasoning, honcho_context, and honcho_conclude tools to access user memory." ) return "" # ----- B1: adapt text based on recall_mode ----- if self._recall_mode == "context": header = ( "# Honcho Memory\n" "Active (context-injection mode). Relevant user context is automatically " "injected before each turn. No memory tools are available — context is " "managed automatically." ) elif self._recall_mode == "tools": header = ( "# Honcho Memory\n" "Active (tools-only mode). Use honcho_profile for a quick factual snapshot, " "honcho_search for raw excerpts, honcho_context for raw peer context, " "honcho_reasoning for synthesized answers, " "honcho_conclude to save facts about the user. " "No automatic context injection — you must use tools to access memory." ) else: # hybrid header = ( "# Honcho Memory\n" "Active (hybrid mode). Relevant context is auto-injected AND memory tools are available. " "Use honcho_profile for a quick factual snapshot, " "honcho_search for raw excerpts, honcho_context for raw peer context, " "honcho_reasoning for synthesized answers, " "honcho_conclude to save facts about the user." ) return header def prefetch(self, query: str, *, session_id: str = "") -> str: """Return base context (representation + card) plus dialectic supplement. Assembles two layers: 1. Base context from peer.context() — cached, refreshed on context_cadence 2. Dialectic supplement — cached, refreshed on dialectic_cadence B1: Returns empty when recall_mode is "tools" (no injection). B5: Respects injection_frequency — "first-turn" returns cached/empty after turn 0. Port #3265: Truncates to context_tokens budget. """ if self._cron_skipped: return "" # B1: tools-only mode — no auto-injection if self._recall_mode == "tools": return "" # B5: injection_frequency — if "first-turn" and past first turn, return empty. # _turn_count is 1-indexed (first user message = 1), so > 1 means "past first". if self._injection_frequency == "first-turn" and self._turn_count > 1: return "" parts = [] # ----- Layer 1: Base context (representation + card) ----- # On first call, fetch synchronously so turn 1 isn't empty. # After that, serve from cache and refresh in background on cadence. with self._base_context_lock: if self._base_context_cache is None: # First call — synchronous fetch try: ctx = self._manager.get_prefetch_context(self._session_key) self._base_context_cache = self._format_first_turn_context(ctx) if ctx else "" self._last_context_turn = self._turn_count except Exception as e: logger.debug("Honcho base context fetch failed: %s", e) self._base_context_cache = "" base_context = self._base_context_cache # Check if background context prefetch has a fresher result if self._manager: fresh_ctx = self._manager.pop_context_result(self._session_key) if fresh_ctx: formatted = self._format_first_turn_context(fresh_ctx) if formatted: with self._base_context_lock: self._base_context_cache = formatted base_context = formatted if base_context: parts.append(base_context) # ----- Layer 2: Dialectic supplement ----- # On the very first turn, no queue_prefetch() has run yet so the # dialectic result is empty. Run with a bounded timeout so a slow # Honcho connection doesn't block the first response indefinitely. # On timeout the result is skipped and queue_prefetch() will pick it # up at the next cadence-allowed turn. if self._last_dialectic_turn == -999 and query: _first_turn_timeout = ( self._config.timeout if self._config and self._config.timeout else 8.0 ) _result_holder: list[str] = [] def _run_first_turn() -> None: try: _result_holder.append(self._run_dialectic_depth(query)) except Exception as exc: logger.debug("Honcho first-turn dialectic failed: %s", exc) _t = threading.Thread(target=_run_first_turn, daemon=True) _t.start() _t.join(timeout=_first_turn_timeout) if not _t.is_alive(): first_turn_dialectic = _result_holder[0] if _result_holder else "" if first_turn_dialectic and first_turn_dialectic.strip(): with self._prefetch_lock: self._prefetch_result = first_turn_dialectic self._last_dialectic_turn = self._turn_count else: logger.debug( "Honcho first-turn dialectic timed out (%.1fs) — " "will inject at next cadence-allowed turn", _first_turn_timeout, ) # Don't update _last_dialectic_turn: queue_prefetch() will # retry at the next cadence-allowed turn via the async path. if self._prefetch_thread and self._prefetch_thread.is_alive(): self._prefetch_thread.join(timeout=3.0) with self._prefetch_lock: dialectic_result = self._prefetch_result self._prefetch_result = "" if dialectic_result and dialectic_result.strip(): parts.append(dialectic_result) if not parts: return "" result = "\n\n".join(parts) # ----- Port #3265: token budget enforcement ----- result = self._truncate_to_budget(result) return result def _truncate_to_budget(self, text: str) -> str: """Truncate text to fit within context_tokens budget if set.""" if not self._config or not self._config.context_tokens: return text budget_chars = self._config.context_tokens * 4 # conservative char estimate if len(text) <= budget_chars: return text # Truncate at word boundary truncated = text[:budget_chars] last_space = truncated.rfind(" ") if last_space > budget_chars * 0.8: truncated = truncated[:last_space] return truncated + " …" def queue_prefetch(self, query: str, *, session_id: str = "") -> None: """Fire background prefetch threads for the upcoming turn. B5: Checks cadence independently for dialectic and context refresh. Context refresh updates the base layer (representation + card). Dialectic fires the LLM reasoning supplement. """ if self._cron_skipped: return if not self._manager or not self._session_key or not query: return # B1: tools-only mode — no prefetch if self._recall_mode == "tools": return # ----- Context refresh (base layer) — independent cadence ----- if self._context_cadence <= 1 or (self._turn_count - self._last_context_turn) >= self._context_cadence: self._last_context_turn = self._turn_count try: self._manager.prefetch_context(self._session_key, query) except Exception as e: logger.debug("Honcho context prefetch failed: %s", e) # ----- Dialectic prefetch (supplement layer) ----- # B5: cadence check — skip if too soon since last dialectic call if self._dialectic_cadence > 1: if (self._turn_count - self._last_dialectic_turn) < self._dialectic_cadence: logger.debug("Honcho dialectic prefetch skipped: cadence %d, turns since last: %d", self._dialectic_cadence, self._turn_count - self._last_dialectic_turn) return self._last_dialectic_turn = self._turn_count def _run(): try: result = self._run_dialectic_depth(query) if result and result.strip(): with self._prefetch_lock: self._prefetch_result = result except Exception as e: logger.debug("Honcho prefetch failed: %s", e) self._prefetch_thread = threading.Thread( target=_run, daemon=True, name="honcho-prefetch" ) self._prefetch_thread.start() # ----- Dialectic depth: multi-pass .chat() with cold/warm prompts ----- # Proportional reasoning levels per depth/pass when dialecticDepthLevels # is not configured. The base level is dialecticReasoningLevel. # Index: (depth, pass) → level relative to base. _PROPORTIONAL_LEVELS: dict[tuple[int, int], str] = { # depth 1: single pass at base level (1, 0): "base", # depth 2: pass 0 lighter, pass 1 at base (2, 0): "minimal", (2, 1): "base", # depth 3: pass 0 lighter, pass 1 at base, pass 2 one above minimal (3, 0): "minimal", (3, 1): "base", (3, 2): "low", } _LEVEL_ORDER = ("minimal", "low", "medium", "high", "max") def _resolve_pass_level(self, pass_idx: int) -> str: """Resolve reasoning level for a given pass index. Uses dialecticDepthLevels if configured, otherwise proportional defaults relative to dialecticReasoningLevel. """ if self._dialectic_depth_levels and pass_idx < len(self._dialectic_depth_levels): return self._dialectic_depth_levels[pass_idx] base = (self._config.dialectic_reasoning_level if self._config else "low") mapping = self._PROPORTIONAL_LEVELS.get((self._dialectic_depth, pass_idx)) if mapping is None or mapping == "base": return base return mapping def _build_dialectic_prompt(self, pass_idx: int, prior_results: list[str], is_cold: bool) -> str: """Build the prompt for a given dialectic pass. Pass 0: cold start (general user query) or warm (session-scoped). Pass 1: self-audit / targeted synthesis against gaps from pass 0. Pass 2: reconciliation / contradiction check across prior passes. """ if pass_idx == 0: if is_cold: return ( "Who is this person? What are their preferences, goals, " "and working style? Focus on facts that would help an AI " "assistant be immediately useful." ) return ( "Given what's been discussed in this session so far, what " "context about this user is most relevant to the current " "conversation? Prioritize active context over biographical facts." ) elif pass_idx == 1: prior = prior_results[-1] if prior_results else "" return ( f"Given this initial assessment:\n\n{prior}\n\n" "What gaps remain in your understanding that would help " "going forward? Synthesize what you actually know about " "the user's current state and immediate needs, grounded " "in evidence from recent sessions." ) else: # pass 2: reconciliation return ( f"Prior passes produced:\n\n" f"Pass 1:\n{prior_results[0] if len(prior_results) > 0 else '(empty)'}\n\n" f"Pass 2:\n{prior_results[1] if len(prior_results) > 1 else '(empty)'}\n\n" "Do these assessments cohere? Reconcile any contradictions " "and produce a final, concise synthesis of what matters most " "for the current conversation." ) @staticmethod def _signal_sufficient(result: str) -> bool: """Check if a dialectic pass returned enough signal to skip further passes. Heuristic: a response longer than 100 chars with some structure (section headers, bullets, or an ordered list) is considered sufficient. """ if not result or len(result.strip()) < 100: return False # Structured output with sections/bullets is strong signal if "\n" in result and ( "##" in result or "•" in result or re.search(r"^[*-] ", result, re.MULTILINE) or re.search(r"^\s*\d+\. ", result, re.MULTILINE) ): return True # Long enough even without structure return len(result.strip()) > 300 def _run_dialectic_depth(self, query: str) -> str: """Execute up to dialecticDepth .chat() calls with conditional bail-out. Cold start (no base context): general user-oriented query. Warm session (base context exists): session-scoped query. Each pass is conditional — bails early if prior pass returned strong signal. Returns the best (usually last) result. """ if not self._manager or not self._session_key: return "" is_cold = not self._base_context_cache results: list[str] = [] for i in range(self._dialectic_depth): if i == 0: prompt = self._build_dialectic_prompt(0, results, is_cold) else: # Skip further passes if prior pass delivered strong signal if results and self._signal_sufficient(results[-1]): logger.debug("Honcho dialectic depth %d: pass %d skipped, prior signal sufficient", self._dialectic_depth, i) break prompt = self._build_dialectic_prompt(i, results, is_cold) level = self._resolve_pass_level(i) logger.debug("Honcho dialectic depth %d: pass %d, level=%s, cold=%s", self._dialectic_depth, i, level, is_cold) result = self._manager.dialectic_query( self._session_key, prompt, reasoning_level=level, peer="user", ) results.append(result or "") # Return the last non-empty result (deepest pass that ran) for r in reversed(results): if r and r.strip(): return r return "" def on_turn_start(self, turn_number: int, message: str, **kwargs) -> None: """Track turn count for cadence and injection_frequency logic.""" self._turn_count = turn_number @staticmethod def _chunk_message(content: str, limit: int) -> list[str]: """Split content into chunks that fit within the Honcho message limit. Splits at paragraph boundaries when possible, falling back to sentence boundaries, then word boundaries. Each continuation chunk is prefixed with "[continued] " so Honcho's representation engine can reconstruct the full message. """ if len(content) <= limit: return [content] prefix = "[continued] " prefix_len = len(prefix) chunks = [] remaining = content first = True while remaining: effective = limit if first else limit - prefix_len if len(remaining) <= effective: chunks.append(remaining if first else prefix + remaining) break segment = remaining[:effective] # Try paragraph break, then sentence, then word cut = segment.rfind("\n\n") if cut < effective * 0.3: cut = segment.rfind(". ") if cut >= 0: cut += 2 # include the period and space if cut < effective * 0.3: cut = segment.rfind(" ") if cut < effective * 0.3: cut = effective # hard cut chunk = remaining[:cut].rstrip() remaining = remaining[cut:].lstrip() if not first: chunk = prefix + chunk chunks.append(chunk) first = False return chunks def sync_turn(self, user_content: str, assistant_content: str, *, session_id: str = "") -> None: """Record the conversation turn in Honcho (non-blocking). Messages exceeding the Honcho API limit (default 25k chars) are split into multiple messages with continuation markers. """ if self._cron_skipped: return if not self._manager or not self._session_key: return msg_limit = self._config.message_max_chars if self._config else 25000 def _sync(): try: session = self._manager.get_or_create(self._session_key) for chunk in self._chunk_message(user_content, msg_limit): session.add_message("user", chunk) for chunk in self._chunk_message(assistant_content, msg_limit): session.add_message("assistant", chunk) self._manager._flush_session(session) except Exception as e: logger.debug("Honcho sync_turn failed: %s", e) if self._sync_thread and self._sync_thread.is_alive(): self._sync_thread.join(timeout=5.0) self._sync_thread = threading.Thread( target=_sync, daemon=True, name="honcho-sync" ) self._sync_thread.start() def on_memory_write(self, action: str, target: str, content: str) -> None: """Mirror built-in user profile writes as Honcho conclusions.""" if action != "add" or target != "user" or not content: return if self._cron_skipped: return if not self._manager or not self._session_key: return def _write(): try: self._manager.create_conclusion(self._session_key, content) except Exception as e: logger.debug("Honcho memory mirror failed: %s", e) t = threading.Thread(target=_write, daemon=True, name="honcho-memwrite") t.start() def on_session_end(self, messages: List[Dict[str, Any]]) -> None: """Flush all pending messages to Honcho on session end.""" if self._cron_skipped: return if not self._manager: return # Wait for pending sync if self._sync_thread and self._sync_thread.is_alive(): self._sync_thread.join(timeout=10.0) try: self._manager.flush_all() except Exception as e: logger.debug("Honcho session-end flush failed: %s", e) def get_tool_schemas(self) -> List[Dict[str, Any]]: """Return tool schemas, respecting recall_mode. B1: context-only mode hides all tools. """ if self._cron_skipped: return [] if self._recall_mode == "context": return [] return list(ALL_TOOL_SCHEMAS) def handle_tool_call(self, tool_name: str, args: dict, **kwargs) -> str: """Handle a Honcho tool call, with lazy session init for tools-only mode.""" if self._cron_skipped: return tool_error("Honcho is not active (cron context).") # Port #1957: ensure session is initialized for tools-only mode if not self._session_initialized: if not self._ensure_session(): return tool_error("Honcho session could not be initialized.") if not self._manager or not self._session_key: return tool_error("Honcho is not active for this session.") try: if tool_name == "honcho_profile": peer = args.get("peer", "user") card_update = args.get("card") if card_update: result = self._manager.set_peer_card(self._session_key, card_update, peer=peer) if result is None: return tool_error("Failed to update peer card.") return json.dumps({"result": f"Peer card updated ({len(result)} facts).", "card": result}) card = self._manager.get_peer_card(self._session_key, peer=peer) if not card: return json.dumps({"result": "No profile facts available yet."}) return json.dumps({"result": card}) elif tool_name == "honcho_search": query = args.get("query", "") if not query: return tool_error("Missing required parameter: query") max_tokens = min(int(args.get("max_tokens", 800)), 2000) peer = args.get("peer", "user") result = self._manager.search_context( self._session_key, query, max_tokens=max_tokens, peer=peer ) if not result: return json.dumps({"result": "No relevant context found."}) return json.dumps({"result": result}) elif tool_name == "honcho_reasoning": query = args.get("query", "") if not query: return tool_error("Missing required parameter: query") peer = args.get("peer", "user") reasoning_level = args.get("reasoning_level") result = self._manager.dialectic_query( self._session_key, query, reasoning_level=reasoning_level, peer=peer, ) # Update cadence tracker so auto-injection respects the gap after an explicit call self._last_dialectic_turn = self._turn_count return json.dumps({"result": result or "No result from Honcho."}) elif tool_name == "honcho_context": peer = args.get("peer", "user") ctx = self._manager.get_session_context(self._session_key, peer=peer) if not ctx: return json.dumps({"result": "No context available yet."}) parts = [] if ctx.get("summary"): parts.append(f"## Summary\n{ctx['summary']}") if ctx.get("representation"): parts.append(f"## Representation\n{ctx['representation']}") if ctx.get("card"): parts.append(f"## Card\n{ctx['card']}") if ctx.get("recent_messages"): msgs = ctx["recent_messages"] msg_str = "\n".join( f" [{m['role']}] {m['content'][:200]}" for m in msgs[-5:] # last 5 for brevity ) parts.append(f"## Recent messages\n{msg_str}") return json.dumps({"result": "\n\n".join(parts) or "No context available."}) elif tool_name == "honcho_conclude": delete_id = args.get("delete_id") peer = args.get("peer", "user") if delete_id: ok = self._manager.delete_conclusion(self._session_key, delete_id, peer=peer) if ok: return json.dumps({"result": f"Conclusion {delete_id} deleted."}) return tool_error(f"Failed to delete conclusion {delete_id}.") conclusion = args.get("conclusion", "") if not conclusion: return tool_error("Missing required parameter: conclusion or delete_id") ok = self._manager.create_conclusion(self._session_key, conclusion, peer=peer) if ok: return json.dumps({"result": f"Conclusion saved for {peer}: {conclusion}"}) return tool_error("Failed to save conclusion.") return tool_error(f"Unknown tool: {tool_name}") except Exception as e: logger.error("Honcho tool %s failed: %s", tool_name, e) return tool_error(f"Honcho {tool_name} failed: {e}") def shutdown(self) -> None: for t in (self._prefetch_thread, self._sync_thread): if t and t.is_alive(): t.join(timeout=5.0) # Flush any remaining messages if self._manager: try: self._manager.flush_all() except Exception: pass # --------------------------------------------------------------------------- # Plugin entry point # --------------------------------------------------------------------------- def register(ctx) -> None: """Register Honcho as a memory provider plugin.""" ctx.register_memory_provider(HonchoMemoryProvider())