From 1f6eb1738c206e95c3e0641c3d8000a4d0be841b Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 18:05:01 -0700 Subject: [PATCH] refactor(run_agent): extract background memory/skill review to agent/background_review.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the background-review subsystem (the self-improvement loop — see the README) out of run_agent.py into a dedicated module. * summarize_background_review_actions — was the @staticmethod that builds the user-facing action summary * spawn_background_review_thread — builds the thread target + prompt; the actual review loop body (forked AIAgent, runtime inheritance, tool whitelist, suppression, teardown) lives in _run_review_in_thread * build_memory_write_metadata — provenance for external memory mirrors AIAgent keeps thin wrappers for backward compatibility AND because tests patch run_agent.threading.Thread to assert lifecycle behavior — the threading.Thread construction stays in AIAgent._spawn_background_review, the inner work moves out. tests/run_agent/ + tests/agent/: 4313 passed, 1 pre-existing failure (test_auxiliary_client.py::test_custom_endpoint... — confirmed failing on main before this change). 3 skipped. run_agent.py: 15272 -> 14972 lines (-300). --- agent/background_review.py | 360 +++++++++++++++++++++++++++++++++++++ run_agent.py | 311 +++----------------------------- 2 files changed, 386 insertions(+), 285 deletions(-) create mode 100644 agent/background_review.py diff --git a/agent/background_review.py b/agent/background_review.py new file mode 100644 index 00000000000..351ab1d43dc --- /dev/null +++ b/agent/background_review.py @@ -0,0 +1,360 @@ +"""Background memory/skill review — fork the agent to evaluate the turn. + +After every turn, ``AIAgent.run_conversation`` may call +:func:`spawn_background_review` to fire off a daemon thread that replays +the conversation snapshot in a forked :class:`AIAgent` and asks itself +"should any skill/memory be saved or updated?". Writes go straight to +the memory + skill stores. Main conversation and prompt cache are never +touched. + +The fork inherits the parent's live runtime (provider, model, base_url, +credentials, cached system prompt) so it hits the same prefix cache and +uses the same auth. It runs with a tool whitelist limited to memory and +skill management tools; everything else is denied at runtime. + +See the ``hermes-agent-dev`` skill (``references/self-improvement-loop.md``) +for invariants and PR review criteria. +""" + +from __future__ import annotations + +import contextlib +import json +import logging +import os +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +def summarize_background_review_actions( + review_messages: List[Dict], + prior_snapshot: List[Dict], +) -> List[str]: + """Build the human-facing action summary for a background review pass. + + Walks the review agent's session messages and collects "successful tool + action" descriptions to surface to the user (e.g. "Memory updated"). + Tool messages already present in ``prior_snapshot`` are skipped so we + don't re-surface stale results from the prior conversation that the + review agent inherited via ``conversation_history`` (issue #14944). + + Matching is by ``tool_call_id`` when available, with a content-equality + fallback for tool messages that lack one. + """ + existing_tool_call_ids = set() + existing_tool_contents = set() + for prior in prior_snapshot or []: + if not isinstance(prior, dict) or prior.get("role") != "tool": + continue + tcid = prior.get("tool_call_id") + if tcid: + existing_tool_call_ids.add(tcid) + else: + content = prior.get("content") + if isinstance(content, str): + existing_tool_contents.add(content) + + actions: List[str] = [] + for msg in review_messages or []: + if not isinstance(msg, dict) or msg.get("role") != "tool": + continue + tcid = msg.get("tool_call_id") + if tcid and tcid in existing_tool_call_ids: + continue + if not tcid: + content_str = msg.get("content") + if isinstance(content_str, str) and content_str in existing_tool_contents: + continue + try: + data = json.loads(msg.get("content", "{}")) + except (json.JSONDecodeError, TypeError): + continue + if not isinstance(data, dict) or not data.get("success"): + continue + message = data.get("message", "") + target = data.get("target", "") + if "created" in message.lower(): + actions.append(message) + elif "updated" in message.lower(): + actions.append(message) + elif "added" in message.lower() or (target and "add" in message.lower()): + label = "Memory" if target == "memory" else "User profile" if target == "user" else target + actions.append(f"{label} updated") + elif "Entry added" in message: + label = "Memory" if target == "memory" else "User profile" if target == "user" else target + actions.append(f"{label} updated") + elif "removed" in message.lower() or "replaced" in message.lower(): + label = "Memory" if target == "memory" else "User profile" if target == "user" else target + actions.append(f"{label} updated") + return actions + + +def build_memory_write_metadata( + agent: Any, + *, + write_origin: Optional[str] = None, + execution_context: Optional[str] = None, + task_id: Optional[str] = None, + tool_call_id: Optional[str] = None, +) -> Dict[str, Any]: + """Build provenance metadata for external memory-provider mirrors.""" + metadata: Dict[str, Any] = { + "write_origin": write_origin or getattr(agent, "_memory_write_origin", "assistant_tool"), + "execution_context": ( + execution_context + or getattr(agent, "_memory_write_context", "foreground") + ), + "session_id": agent.session_id or "", + "parent_session_id": agent._parent_session_id or "", + "platform": agent.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"), + "tool_name": "memory", + } + if task_id: + metadata["task_id"] = task_id + if tool_call_id: + metadata["tool_call_id"] = tool_call_id + return {k: v for k, v in metadata.items() if v not in {None, ""}} + + +def _run_review_in_thread( + agent: Any, + messages_snapshot: List[Dict], + prompt: str, +) -> None: + """Worker function executed in the background-review daemon thread. + + Spawns a forked ``AIAgent`` inheriting the parent's runtime, runs the + review prompt, and surfaces a compact action summary back to the user + via ``agent._safe_print`` and ``agent.background_review_callback``. + """ + # Local import to avoid a hard circular dep at module load. + from run_agent import AIAgent + from tools.terminal_tool import set_approval_callback as _set_approval_callback + + # Install a non-interactive approval callback on this worker + # thread so any dangerous-command guard the review agent trips + # resolves to "deny" instead of falling back to input() -- which + # deadlocks against the parent's prompt_toolkit TUI (#15216). + # Same pattern as _subagent_auto_deny in tools/delegate_tool.py. + def _bg_review_auto_deny(command, description, **kwargs): + logger.warning( + "Background review auto-denied dangerous command: %s (%s)", + command, description, + ) + return "deny" + try: + _set_approval_callback(_bg_review_auto_deny) + except Exception: + pass + + review_agent = None + review_messages: List[Dict] = [] + try: + with open(os.devnull, "w", encoding="utf-8") as _devnull, \ + contextlib.redirect_stdout(_devnull), \ + contextlib.redirect_stderr(_devnull): + # Inherit the parent agent's live runtime (provider, model, + # base_url, api_key, api_mode) so the fork uses the exact + # same credentials the main turn is using. Without this, + # AIAgent.__init__ re-runs auto-resolution from env vars, + # which fails for OAuth-only providers, session-scoped + # creds, or credential-pool setups where the resolver can't + # reconstruct auth from scratch -- producing the spurious + # "No LLM provider configured" warning at end of turn. + _parent_runtime = agent._current_main_runtime() + _parent_api_mode = _parent_runtime.get("api_mode") or None + # The review fork needs to call agent-loop tools (memory, + # skill_manage). Those tools require Hermes' own dispatch, + # which the codex_app_server runtime bypasses entirely + # (it runs the turn inside codex's subprocess). So when + # the parent is on codex_app_server, downgrade the review + # fork to codex_responses — same auth/credentials, but + # talks to the OpenAI Responses API directly so Hermes + # owns the loop and the agent-loop tools dispatch. + if _parent_api_mode == "codex_app_server": + _parent_api_mode = "codex_responses" + review_agent = AIAgent( + model=agent.model, + max_iterations=16, + quiet_mode=True, + platform=agent.platform, + provider=agent.provider, + api_mode=_parent_api_mode, + base_url=_parent_runtime.get("base_url") or None, + api_key=_parent_runtime.get("api_key") or None, + credential_pool=getattr(agent, "_credential_pool", None), + parent_session_id=agent.session_id, + ) + review_agent._memory_write_origin = "background_review" + review_agent._memory_write_context = "background_review" + review_agent._memory_store = agent._memory_store + review_agent._memory_enabled = agent._memory_enabled + review_agent._user_profile_enabled = agent._user_profile_enabled + review_agent._memory_nudge_interval = 0 + review_agent._skill_nudge_interval = 0 + # Suppress all status/warning emits from the fork so the + # user only sees the final successful-action summary. + # Without this, mid-review "Iteration budget exhausted", + # rate-limit retries, compression warnings, and other + # lifecycle messages bubble up through _emit_status -> + # _vprint and leak past the stdout redirect (they go via + # _print_fn/status_callback, which bypass sys.stdout). + review_agent.suppress_status_output = True + # Inherit the parent's cached system prompt verbatim so + # the review fork's outbound HTTP request hits the same + # Anthropic/OpenRouter prefix cache the parent warmed. + # Without this, the fork rebuilds the system prompt from + # scratch (fresh _hermes_now() timestamp, fresh + # session_id, narrower toolset → different skills_prompt) + # and the byte-exact prefix-cache key misses. See + # issue #25322 and PR #17276 for the full analysis + + # measured impact (~26% end-to-end cost reduction on + # Sonnet 4.5). + review_agent._cached_system_prompt = agent._cached_system_prompt + # Defensive: pin session_start + session_id to the + # parent's so any code path that re-renders parts of + # the system prompt (compression, plugin hooks) still + # produces byte-identical output. The cached-prompt + # assignment above already short-circuits the normal + # rebuild path, but these pins guarantee parity even + # if a future code path bypasses the cache. + review_agent.session_start = agent.session_start + review_agent.session_id = agent.session_id + + from model_tools import get_tool_definitions + from hermes_cli.plugins import ( + set_thread_tool_whitelist, + clear_thread_tool_whitelist, + ) + + review_whitelist = { + t["function"]["name"] + for t in get_tool_definitions( + enabled_toolsets=["memory", "skills"], + quiet_mode=True, + ) + } + set_thread_tool_whitelist( + review_whitelist, + deny_msg_fmt=( + "Background review denied non-whitelisted tool: " + "{tool_name}. Only memory/skill tools are allowed." + ), + ) + try: + review_agent.run_conversation( + user_message=( + prompt + + "\n\nYou can only call memory and skill " + "management tools. Other tools will be denied " + "at runtime — do not attempt them." + ), + conversation_history=messages_snapshot, + ) + finally: + clear_thread_tool_whitelist() + + # Tear down memory providers while stdout is still + # redirected so background thread teardown (Honcho flush, + # Hindsight sync, etc.) stays silent. The finally block + # below is a safety net for the exception path. + try: + review_agent.shutdown_memory_provider() + except Exception: + pass + try: + review_agent.close() + except Exception: + pass + review_messages = list(getattr(review_agent, "_session_messages", [])) + review_agent = None + + # Scan the review agent's messages for successful tool actions + # and surface a compact summary to the user. Tool messages + # already present in messages_snapshot must be skipped, since + # the review agent inherits that history and would otherwise + # re-surface stale "created"/"updated" messages from the prior + # conversation as if they just happened (issue #14944). + actions = summarize_background_review_actions( + review_messages, + messages_snapshot, + ) + + if actions: + summary = " · ".join(dict.fromkeys(actions)) + agent._safe_print( + f" 💾 Self-improvement review: {summary}" + ) + _bg_cb = agent.background_review_callback + if _bg_cb: + try: + _bg_cb( + f"💾 Self-improvement review: {summary}" + ) + except Exception: + pass + + except Exception as e: + logger.warning("Background memory/skill review failed: %s", e) + agent._emit_auxiliary_failure("background review", e) + finally: + # Safety-net cleanup for the exception path. Normal + # completion already shut down inside redirect_stdout above. + # Re-open devnull here so any teardown output (Honcho flush, + # Hindsight sync, background thread joins) stays silent even + # on the exception path where redirect_stdout already exited. + if review_agent is not None: + try: + with open(os.devnull, "w", encoding="utf-8") as _fn, \ + contextlib.redirect_stdout(_fn), \ + contextlib.redirect_stderr(_fn): + try: + review_agent.shutdown_memory_provider() + except Exception: + pass + try: + review_agent.close() + except Exception: + pass + except Exception: + pass + # Clear the approval callback on this bg-review thread so a + # recycled thread-id doesn't inherit a stale reference. + try: + _set_approval_callback(None) + except Exception: + pass + + +def spawn_background_review_thread( + agent: Any, + messages_snapshot: List[Dict], + review_memory: bool = False, + review_skills: bool = False, +): + """Build the review thread target and prompt for a background review. + + Returns a ``(target, prompt)`` tuple. The caller (``AIAgent._spawn_background_review``) + owns the actual ``threading.Thread`` construction so test-level patches + of ``run_agent.threading.Thread`` keep working. + """ + # Pick the right prompt based on which triggers fired + if review_memory and review_skills: + prompt = agent._COMBINED_REVIEW_PROMPT + elif review_memory: + prompt = agent._MEMORY_REVIEW_PROMPT + else: + prompt = agent._SKILL_REVIEW_PROMPT + + def _target() -> None: + _run_review_in_thread(agent, messages_snapshot, prompt) + + return _target, prompt + + +__all__ = [ + "spawn_background_review_thread", + "summarize_background_review_actions", + "build_memory_write_metadata", +] diff --git a/run_agent.py b/run_agent.py index 22848b2f20e..28171724dd6 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3358,63 +3358,9 @@ class AIAgent: review_messages: List[Dict], prior_snapshot: List[Dict], ) -> List[str]: - """Build the human-facing action summary for a background review pass. - - Walks the review agent's session messages and collects "successful tool - action" descriptions to surface to the user (e.g. "Memory updated"). - Tool messages already present in ``prior_snapshot`` are skipped so we - don't re-surface stale results from the prior conversation that the - review agent inherited via ``conversation_history`` (issue #14944). - - Matching is by ``tool_call_id`` when available, with a content-equality - fallback for tool messages that lack one. - """ - existing_tool_call_ids = set() - existing_tool_contents = set() - for prior in prior_snapshot or []: - if not isinstance(prior, dict) or prior.get("role") != "tool": - continue - tcid = prior.get("tool_call_id") - if tcid: - existing_tool_call_ids.add(tcid) - else: - content = prior.get("content") - if isinstance(content, str): - existing_tool_contents.add(content) - - actions: List[str] = [] - for msg in review_messages or []: - if not isinstance(msg, dict) or msg.get("role") != "tool": - continue - tcid = msg.get("tool_call_id") - if tcid and tcid in existing_tool_call_ids: - continue - if not tcid: - content_str = msg.get("content") - if isinstance(content_str, str) and content_str in existing_tool_contents: - continue - try: - data = json.loads(msg.get("content", "{}")) - except (json.JSONDecodeError, TypeError): - continue - if not isinstance(data, dict) or not data.get("success"): - continue - message = data.get("message", "") - target = data.get("target", "") - if "created" in message.lower(): - actions.append(message) - elif "updated" in message.lower(): - actions.append(message) - elif "added" in message.lower() or (target and "add" in message.lower()): - label = "Memory" if target == "memory" else "User profile" if target == "user" else target - actions.append(f"{label} updated") - elif "Entry added" in message: - label = "Memory" if target == "memory" else "User profile" if target == "user" else target - actions.append(f"{label} updated") - elif "removed" in message.lower() or "replaced" in message.lower(): - label = "Memory" if target == "memory" else "User profile" if target == "user" else target - actions.append(f"{label} updated") - return actions + """Forwarder — see ``agent.background_review.summarize_background_review_actions``.""" + from agent.background_review import summarize_background_review_actions + return summarize_background_review_actions(review_messages, prior_snapshot) def _spawn_background_review( self, @@ -3422,219 +3368,22 @@ class AIAgent: review_memory: bool = False, review_skills: bool = False, ) -> None: - """Spawn a background thread to review the conversation for memory/skill saves. + """Spawn the background memory/skill review thread. - Creates a full AIAgent fork with the same model, tools, and context as the - main session. The review prompt is appended as the next user turn in the - forked conversation. Writes directly to the shared memory/skill stores. - Never modifies the main conversation history or produces user-visible output. + Thin wrapper — the heavy lifting lives in + ``agent.background_review.spawn_background_review_thread`` which + returns the thread target. ``threading.Thread`` is constructed + here so existing tests that patch ``run_agent.threading.Thread`` + keep working. """ - import threading - - # Pick the right prompt based on which triggers fired - if review_memory and review_skills: - prompt = self._COMBINED_REVIEW_PROMPT - elif review_memory: - prompt = self._MEMORY_REVIEW_PROMPT - else: - prompt = self._SKILL_REVIEW_PROMPT - - def _run_review(): - import contextlib - # Install a non-interactive approval callback on this worker - # thread so any dangerous-command guard the review agent trips - # resolves to "deny" instead of falling back to input() -- which - # deadlocks against the parent's prompt_toolkit TUI (#15216). - # Same pattern as _subagent_auto_deny in tools/delegate_tool.py. - def _bg_review_auto_deny(command, description, **kwargs): - logger.warning( - "Background review auto-denied dangerous command: %s (%s)", - command, description, - ) - return "deny" - try: - _set_approval_callback(_bg_review_auto_deny) - except Exception: - pass - review_agent = None - review_messages = [] - try: - with open(os.devnull, "w", encoding="utf-8") as _devnull, \ - contextlib.redirect_stdout(_devnull), \ - contextlib.redirect_stderr(_devnull): - # Inherit the parent agent's live runtime (provider, model, - # base_url, api_key, api_mode) so the fork uses the exact - # same credentials the main turn is using. Without this, - # AIAgent.__init__ re-runs auto-resolution from env vars, - # which fails for OAuth-only providers, session-scoped - # creds, or credential-pool setups where the resolver can't - # reconstruct auth from scratch -- producing the spurious - # "No LLM provider configured" warning at end of turn. - _parent_runtime = self._current_main_runtime() - _parent_api_mode = _parent_runtime.get("api_mode") or None - # The review fork needs to call agent-loop tools (memory, - # skill_manage). Those tools require Hermes' own dispatch, - # which the codex_app_server runtime bypasses entirely - # (it runs the turn inside codex's subprocess). So when - # the parent is on codex_app_server, downgrade the review - # fork to codex_responses — same auth/credentials, but - # talks to the OpenAI Responses API directly so Hermes - # owns the loop and the agent-loop tools dispatch. - if _parent_api_mode == "codex_app_server": - _parent_api_mode = "codex_responses" - review_agent = AIAgent( - model=self.model, - max_iterations=16, - quiet_mode=True, - platform=self.platform, - provider=self.provider, - api_mode=_parent_api_mode, - base_url=_parent_runtime.get("base_url") or None, - api_key=_parent_runtime.get("api_key") or None, - credential_pool=getattr(self, "_credential_pool", None), - parent_session_id=self.session_id, - ) - review_agent._memory_write_origin = "background_review" - review_agent._memory_write_context = "background_review" - review_agent._memory_store = self._memory_store - review_agent._memory_enabled = self._memory_enabled - review_agent._user_profile_enabled = self._user_profile_enabled - review_agent._memory_nudge_interval = 0 - review_agent._skill_nudge_interval = 0 - # Suppress all status/warning emits from the fork so the - # user only sees the final successful-action summary. - # Without this, mid-review "Iteration budget exhausted", - # rate-limit retries, compression warnings, and other - # lifecycle messages bubble up through _emit_status -> - # _vprint and leak past the stdout redirect (they go via - # _print_fn/status_callback, which bypass sys.stdout). - review_agent.suppress_status_output = True - # Inherit the parent's cached system prompt verbatim so - # the review fork's outbound HTTP request hits the same - # Anthropic/OpenRouter prefix cache the parent warmed. - # Without this, the fork rebuilds the system prompt from - # scratch (fresh _hermes_now() timestamp, fresh - # session_id, narrower toolset → different skills_prompt) - # and the byte-exact prefix-cache key misses. See - # issue #25322 and PR #17276 for the full analysis + - # measured impact (~26% end-to-end cost reduction on - # Sonnet 4.5). - review_agent._cached_system_prompt = self._cached_system_prompt - # Defensive: pin session_start + session_id to the - # parent's so any code path that re-renders parts of - # the system prompt (compression, plugin hooks) still - # produces byte-identical output. The cached-prompt - # assignment above already short-circuits the normal - # rebuild path, but these pins guarantee parity even - # if a future code path bypasses the cache. - review_agent.session_start = self.session_start - review_agent.session_id = self.session_id - - from model_tools import get_tool_definitions - from hermes_cli.plugins import ( - set_thread_tool_whitelist, - clear_thread_tool_whitelist, - ) - - review_whitelist = { - t["function"]["name"] - for t in get_tool_definitions( - enabled_toolsets=["memory", "skills"], - quiet_mode=True, - ) - } - set_thread_tool_whitelist( - review_whitelist, - deny_msg_fmt=( - "Background review denied non-whitelisted tool: " - "{tool_name}. Only memory/skill tools are allowed." - ), - ) - try: - review_agent.run_conversation( - user_message=( - prompt - + "\n\nYou can only call memory and skill " - "management tools. Other tools will be denied " - "at runtime — do not attempt them." - ), - conversation_history=messages_snapshot, - ) - finally: - clear_thread_tool_whitelist() - - # Tear down memory providers while stdout is still - # redirected so background thread teardown (Honcho flush, - # Hindsight sync, etc.) stays silent. The finally block - # below is a safety net for the exception path. - try: - review_agent.shutdown_memory_provider() - except Exception: - pass - try: - review_agent.close() - except Exception: - pass - review_messages = list(getattr(review_agent, "_session_messages", [])) - review_agent = None - - # Scan the review agent's messages for successful tool actions - # and surface a compact summary to the user. Tool messages - # already present in messages_snapshot must be skipped, since - # the review agent inherits that history and would otherwise - # re-surface stale "created"/"updated" messages from the prior - # conversation as if they just happened (issue #14944). - actions = self._summarize_background_review_actions( - review_messages, - messages_snapshot, - ) - - if actions: - summary = " · ".join(dict.fromkeys(actions)) - self._safe_print( - f" 💾 Self-improvement review: {summary}" - ) - _bg_cb = self.background_review_callback - if _bg_cb: - try: - _bg_cb( - f"💾 Self-improvement review: {summary}" - ) - except Exception: - pass - - except Exception as e: - logger.warning("Background memory/skill review failed: %s", e) - self._emit_auxiliary_failure("background review", e) - finally: - # Safety-net cleanup for the exception path. Normal - # completion already shut down inside redirect_stdout above. - # Re-open devnull here so any teardown output (Honcho flush, - # Hindsight sync, background thread joins) stays silent even - # on the exception path where redirect_stdout already exited. - if review_agent is not None: - try: - with open(os.devnull, "w", encoding="utf-8") as _fn, \ - contextlib.redirect_stdout(_fn), \ - contextlib.redirect_stderr(_fn): - try: - review_agent.shutdown_memory_provider() - except Exception: - pass - try: - review_agent.close() - except Exception: - pass - except Exception: - pass - # Clear the approval callback on this bg-review thread so a - # recycled thread-id doesn't inherit a stale reference. - try: - _set_approval_callback(None) - except Exception: - pass - - t = threading.Thread(target=_run_review, daemon=True, name="bg-review") + from agent.background_review import spawn_background_review_thread + target, _prompt = spawn_background_review_thread( + self, + messages_snapshot, + review_memory=review_memory, + review_skills=review_skills, + ) + t = threading.Thread(target=target, daemon=True, name="bg-review") t.start() def _build_memory_write_metadata( @@ -3645,23 +3394,15 @@ class AIAgent: task_id: Optional[str] = None, tool_call_id: Optional[str] = None, ) -> Dict[str, Any]: - """Build provenance metadata for external memory-provider mirrors.""" - metadata: Dict[str, Any] = { - "write_origin": write_origin or getattr(self, "_memory_write_origin", "assistant_tool"), - "execution_context": ( - execution_context - or getattr(self, "_memory_write_context", "foreground") - ), - "session_id": self.session_id or "", - "parent_session_id": self._parent_session_id or "", - "platform": self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"), - "tool_name": "memory", - } - if task_id: - metadata["task_id"] = task_id - if tool_call_id: - metadata["tool_call_id"] = tool_call_id - return {k: v for k, v in metadata.items() if v not in {None, ""}} + """Forwarder — see ``agent.background_review.build_memory_write_metadata``.""" + from agent.background_review import build_memory_write_metadata + return build_memory_write_metadata( + self, + write_origin=write_origin, + execution_context=execution_context, + task_id=task_id, + tool_call_id=tool_call_id, + ) def _apply_persist_user_message_override(self, messages: List[Dict]) -> None: """Rewrite the current-turn user message before persistence/return.