"""Background memory/skill review — fork the agent to evaluate the turn. After every turn, ``AIAgent.run_conversation`` may call :func:`spawn_background_review` to fire off a daemon thread that replays the conversation snapshot in a forked :class:`AIAgent` and asks itself "should any skill/memory be saved or updated?". Writes go straight to the memory + skill stores. Main conversation and prompt cache are never touched. The fork inherits the parent's live runtime (provider, model, base_url, credentials, cached system prompt) so it hits the same prefix cache and uses the same auth. It runs with a tool whitelist limited to memory and skill management tools; everything else is denied at runtime. See the ``hermes-agent-dev`` skill (``references/self-improvement-loop.md``) for invariants and PR review criteria. """ from __future__ import annotations import contextlib import json import logging import os from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) def summarize_background_review_actions( review_messages: List[Dict], prior_snapshot: List[Dict], ) -> List[str]: """Build the human-facing action summary for a background review pass. Walks the review agent's session messages and collects "successful tool action" descriptions to surface to the user (e.g. "Memory updated"). Tool messages already present in ``prior_snapshot`` are skipped so we don't re-surface stale results from the prior conversation that the review agent inherited via ``conversation_history`` (issue #14944). Matching is by ``tool_call_id`` when available, with a content-equality fallback for tool messages that lack one. """ existing_tool_call_ids = set() existing_tool_contents = set() for prior in prior_snapshot or []: if not isinstance(prior, dict) or prior.get("role") != "tool": continue tcid = prior.get("tool_call_id") if tcid: existing_tool_call_ids.add(tcid) else: content = prior.get("content") if isinstance(content, str): existing_tool_contents.add(content) actions: List[str] = [] for msg in review_messages or []: if not isinstance(msg, dict) or msg.get("role") != "tool": continue tcid = msg.get("tool_call_id") if tcid and tcid in existing_tool_call_ids: continue if not tcid: content_str = msg.get("content") if isinstance(content_str, str) and content_str in existing_tool_contents: continue try: data = json.loads(msg.get("content", "{}")) except (json.JSONDecodeError, TypeError): continue if not isinstance(data, dict) or not data.get("success"): continue message = data.get("message", "") target = data.get("target", "") if "created" in message.lower(): actions.append(message) elif "updated" in message.lower(): actions.append(message) elif "added" in message.lower() or (target and "add" in message.lower()): label = "Memory" if target == "memory" else "User profile" if target == "user" else target actions.append(f"{label} updated") elif "Entry added" in message: label = "Memory" if target == "memory" else "User profile" if target == "user" else target actions.append(f"{label} updated") elif "removed" in message.lower() or "replaced" in message.lower(): label = "Memory" if target == "memory" else "User profile" if target == "user" else target actions.append(f"{label} updated") return actions def build_memory_write_metadata( agent: Any, *, write_origin: Optional[str] = None, execution_context: Optional[str] = None, task_id: Optional[str] = None, tool_call_id: Optional[str] = None, ) -> Dict[str, Any]: """Build provenance metadata for external memory-provider mirrors.""" metadata: Dict[str, Any] = { "write_origin": write_origin or getattr(agent, "_memory_write_origin", "assistant_tool"), "execution_context": ( execution_context or getattr(agent, "_memory_write_context", "foreground") ), "session_id": agent.session_id or "", "parent_session_id": agent._parent_session_id or "", "platform": agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"), "tool_name": "memory", } if task_id: metadata["task_id"] = task_id if tool_call_id: metadata["tool_call_id"] = tool_call_id return {k: v for k, v in metadata.items() if v not in {None, ""}} def _run_review_in_thread( agent: Any, messages_snapshot: List[Dict], prompt: str, ) -> None: """Worker function executed in the background-review daemon thread. Spawns a forked ``AIAgent`` inheriting the parent's runtime, runs the review prompt, and surfaces a compact action summary back to the user via ``agent._safe_print`` and ``agent.background_review_callback``. """ # Local import to avoid a hard circular dep at module load. from run_agent import AIAgent from tools.terminal_tool import set_approval_callback as _set_approval_callback # Install a non-interactive approval callback on this worker # thread so any dangerous-command guard the review agent trips # resolves to "deny" instead of falling back to input() -- which # deadlocks against the parent's prompt_toolkit TUI (#15216). # Same pattern as _subagent_auto_deny in tools/delegate_tool.py. def _bg_review_auto_deny(command, description, **kwargs): logger.warning( "Background review auto-denied dangerous command: %s (%s)", command, description, ) return "deny" try: _set_approval_callback(_bg_review_auto_deny) except Exception: pass review_agent = None review_messages: List[Dict] = [] try: with open(os.devnull, "w", encoding="utf-8") as _devnull, \ contextlib.redirect_stdout(_devnull), \ contextlib.redirect_stderr(_devnull): # Inherit the parent agent's live runtime (provider, model, # base_url, api_key, api_mode) so the fork uses the exact # same credentials the main turn is using. Without this, # AIAgent.__init__ re-runs auto-resolution from env vars, # which fails for OAuth-only providers, session-scoped # creds, or credential-pool setups where the resolver can't # reconstruct auth from scratch -- producing the spurious # "No LLM provider configured" warning at end of turn. _parent_runtime = agent._current_main_runtime() _parent_api_mode = _parent_runtime.get("api_mode") or None # The review fork needs to call agent-loop tools (memory, # skill_manage). Those tools require Hermes' own dispatch, # which the codex_app_server runtime bypasses entirely # (it runs the turn inside codex's subprocess). So when # the parent is on codex_app_server, downgrade the review # fork to codex_responses — same auth/credentials, but # talks to the OpenAI Responses API directly so Hermes # owns the loop and the agent-loop tools dispatch. if _parent_api_mode == "codex_app_server": _parent_api_mode = "codex_responses" review_agent = AIAgent( model=agent.model, max_iterations=16, quiet_mode=True, platform=agent.platform, provider=agent.provider, api_mode=_parent_api_mode, base_url=_parent_runtime.get("base_url") or None, api_key=_parent_runtime.get("api_key") or None, credential_pool=getattr(agent, "_credential_pool", None), parent_session_id=agent.session_id, ) review_agent._memory_write_origin = "background_review" review_agent._memory_write_context = "background_review" review_agent._memory_store = agent._memory_store review_agent._memory_enabled = agent._memory_enabled review_agent._user_profile_enabled = agent._user_profile_enabled review_agent._memory_nudge_interval = 0 review_agent._skill_nudge_interval = 0 # Suppress all status/warning emits from the fork so the # user only sees the final successful-action summary. # Without this, mid-review "Iteration budget exhausted", # rate-limit retries, compression warnings, and other # lifecycle messages bubble up through _emit_status -> # _vprint and leak past the stdout redirect (they go via # _print_fn/status_callback, which bypass sys.stdout). review_agent.suppress_status_output = True # Inherit the parent's cached system prompt verbatim so # the review fork's outbound HTTP request hits the same # Anthropic/OpenRouter prefix cache the parent warmed. # Without this, the fork rebuilds the system prompt from # scratch (fresh _hermes_now() timestamp, fresh # session_id, narrower toolset → different skills_prompt) # and the byte-exact prefix-cache key misses. See # issue #25322 and PR #17276 for the full analysis + # measured impact (~26% end-to-end cost reduction on # Sonnet 4.5). review_agent._cached_system_prompt = agent._cached_system_prompt # Defensive: pin session_start + session_id to the # parent's so any code path that re-renders parts of # the system prompt (compression, plugin hooks) still # produces byte-identical output. The cached-prompt # assignment above already short-circuits the normal # rebuild path, but these pins guarantee parity even # if a future code path bypasses the cache. review_agent.session_start = agent.session_start review_agent.session_id = agent.session_id from model_tools import get_tool_definitions from hermes_cli.plugins import ( set_thread_tool_whitelist, clear_thread_tool_whitelist, ) review_whitelist = { t["function"]["name"] for t in get_tool_definitions( enabled_toolsets=["memory", "skills"], quiet_mode=True, ) } set_thread_tool_whitelist( review_whitelist, deny_msg_fmt=( "Background review denied non-whitelisted tool: " "{tool_name}. Only memory/skill tools are allowed." ), ) try: review_agent.run_conversation( user_message=( prompt + "\n\nYou can only call memory and skill " "management tools. Other tools will be denied " "at runtime — do not attempt them." ), conversation_history=messages_snapshot, ) finally: clear_thread_tool_whitelist() # Tear down memory providers while stdout is still # redirected so background thread teardown (Honcho flush, # Hindsight sync, etc.) stays silent. The finally block # below is a safety net for the exception path. try: review_agent.shutdown_memory_provider() except Exception: pass try: review_agent.close() except Exception: pass review_messages = list(getattr(review_agent, "_session_messages", [])) review_agent = None # Scan the review agent's messages for successful tool actions # and surface a compact summary to the user. Tool messages # already present in messages_snapshot must be skipped, since # the review agent inherits that history and would otherwise # re-surface stale "created"/"updated" messages from the prior # conversation as if they just happened (issue #14944). actions = summarize_background_review_actions( review_messages, messages_snapshot, ) if actions: summary = " · ".join(dict.fromkeys(actions)) agent._safe_print( f" 💾 Self-improvement review: {summary}" ) _bg_cb = agent.background_review_callback if _bg_cb: try: _bg_cb( f"💾 Self-improvement review: {summary}" ) except Exception: pass except Exception as e: logger.warning("Background memory/skill review failed: %s", e) agent._emit_auxiliary_failure("background review", e) finally: # Safety-net cleanup for the exception path. Normal # completion already shut down inside redirect_stdout above. # Re-open devnull here so any teardown output (Honcho flush, # Hindsight sync, background thread joins) stays silent even # on the exception path where redirect_stdout already exited. if review_agent is not None: try: with open(os.devnull, "w", encoding="utf-8") as _fn, \ contextlib.redirect_stdout(_fn), \ contextlib.redirect_stderr(_fn): try: review_agent.shutdown_memory_provider() except Exception: pass try: review_agent.close() except Exception: pass except Exception: pass # Clear the approval callback on this bg-review thread so a # recycled thread-id doesn't inherit a stale reference. try: _set_approval_callback(None) except Exception: pass def spawn_background_review_thread( agent: Any, messages_snapshot: List[Dict], review_memory: bool = False, review_skills: bool = False, ): """Build the review thread target and prompt for a background review. Returns a ``(target, prompt)`` tuple. The caller (``AIAgent._spawn_background_review``) owns the actual ``threading.Thread`` construction so test-level patches of ``run_agent.threading.Thread`` keep working. """ # Pick the right prompt based on which triggers fired if review_memory and review_skills: prompt = agent._COMBINED_REVIEW_PROMPT elif review_memory: prompt = agent._MEMORY_REVIEW_PROMPT else: prompt = agent._SKILL_REVIEW_PROMPT def _target() -> None: _run_review_in_thread(agent, messages_snapshot, prompt) return _target, prompt __all__ = [ "spawn_background_review_thread", "summarize_background_review_actions", "build_memory_write_metadata", ]