refactor(run_agent): extract background memory/skill review to agent/background_review.py

Move the background-review subsystem (the self-improvement loop — see the
README) out of run_agent.py into a dedicated module.

* summarize_background_review_actions — was the @staticmethod that builds
  the user-facing action summary
* spawn_background_review_thread — builds the thread target + prompt;
  the actual review loop body (forked AIAgent, runtime inheritance,
  tool whitelist, suppression, teardown) lives in _run_review_in_thread
* build_memory_write_metadata — provenance for external memory mirrors

AIAgent keeps thin wrappers for backward compatibility AND because tests
patch run_agent.threading.Thread to assert lifecycle behavior — the
threading.Thread construction stays in AIAgent._spawn_background_review,
the inner work moves out.

tests/run_agent/ + tests/agent/: 4313 passed, 1 pre-existing failure
(test_auxiliary_client.py::test_custom_endpoint... — confirmed failing
on main before this change). 3 skipped.

run_agent.py: 15272 -> 14972 lines (-300).
This commit is contained in:
teknium1 2026-05-16 18:05:01 -07:00
parent 5f309ae685
commit 1f6eb1738c
No known key found for this signature in database
2 changed files with 386 additions and 285 deletions

View file

@ -3358,63 +3358,9 @@ class AIAgent:
review_messages: List[Dict],
prior_snapshot: List[Dict],
) -> List[str]:
"""Build the human-facing action summary for a background review pass.
Walks the review agent's session messages and collects "successful tool
action" descriptions to surface to the user (e.g. "Memory updated").
Tool messages already present in ``prior_snapshot`` are skipped so we
don't re-surface stale results from the prior conversation that the
review agent inherited via ``conversation_history`` (issue #14944).
Matching is by ``tool_call_id`` when available, with a content-equality
fallback for tool messages that lack one.
"""
existing_tool_call_ids = set()
existing_tool_contents = set()
for prior in prior_snapshot or []:
if not isinstance(prior, dict) or prior.get("role") != "tool":
continue
tcid = prior.get("tool_call_id")
if tcid:
existing_tool_call_ids.add(tcid)
else:
content = prior.get("content")
if isinstance(content, str):
existing_tool_contents.add(content)
actions: List[str] = []
for msg in review_messages or []:
if not isinstance(msg, dict) or msg.get("role") != "tool":
continue
tcid = msg.get("tool_call_id")
if tcid and tcid in existing_tool_call_ids:
continue
if not tcid:
content_str = msg.get("content")
if isinstance(content_str, str) and content_str in existing_tool_contents:
continue
try:
data = json.loads(msg.get("content", "{}"))
except (json.JSONDecodeError, TypeError):
continue
if not isinstance(data, dict) or not data.get("success"):
continue
message = data.get("message", "")
target = data.get("target", "")
if "created" in message.lower():
actions.append(message)
elif "updated" in message.lower():
actions.append(message)
elif "added" in message.lower() or (target and "add" in message.lower()):
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
elif "Entry added" in message:
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
elif "removed" in message.lower() or "replaced" in message.lower():
label = "Memory" if target == "memory" else "User profile" if target == "user" else target
actions.append(f"{label} updated")
return actions
"""Forwarder — see ``agent.background_review.summarize_background_review_actions``."""
from agent.background_review import summarize_background_review_actions
return summarize_background_review_actions(review_messages, prior_snapshot)
def _spawn_background_review(
self,
@ -3422,219 +3368,22 @@ class AIAgent:
review_memory: bool = False,
review_skills: bool = False,
) -> None:
"""Spawn a background thread to review the conversation for memory/skill saves.
"""Spawn the background memory/skill review thread.
Creates a full AIAgent fork with the same model, tools, and context as the
main session. The review prompt is appended as the next user turn in the
forked conversation. Writes directly to the shared memory/skill stores.
Never modifies the main conversation history or produces user-visible output.
Thin wrapper the heavy lifting lives in
``agent.background_review.spawn_background_review_thread`` which
returns the thread target. ``threading.Thread`` is constructed
here so existing tests that patch ``run_agent.threading.Thread``
keep working.
"""
import threading
# Pick the right prompt based on which triggers fired
if review_memory and review_skills:
prompt = self._COMBINED_REVIEW_PROMPT
elif review_memory:
prompt = self._MEMORY_REVIEW_PROMPT
else:
prompt = self._SKILL_REVIEW_PROMPT
def _run_review():
import contextlib
# Install a non-interactive approval callback on this worker
# thread so any dangerous-command guard the review agent trips
# resolves to "deny" instead of falling back to input() -- which
# deadlocks against the parent's prompt_toolkit TUI (#15216).
# Same pattern as _subagent_auto_deny in tools/delegate_tool.py.
def _bg_review_auto_deny(command, description, **kwargs):
logger.warning(
"Background review auto-denied dangerous command: %s (%s)",
command, description,
)
return "deny"
try:
_set_approval_callback(_bg_review_auto_deny)
except Exception:
pass
review_agent = None
review_messages = []
try:
with open(os.devnull, "w", encoding="utf-8") as _devnull, \
contextlib.redirect_stdout(_devnull), \
contextlib.redirect_stderr(_devnull):
# Inherit the parent agent's live runtime (provider, model,
# base_url, api_key, api_mode) so the fork uses the exact
# same credentials the main turn is using. Without this,
# AIAgent.__init__ re-runs auto-resolution from env vars,
# which fails for OAuth-only providers, session-scoped
# creds, or credential-pool setups where the resolver can't
# reconstruct auth from scratch -- producing the spurious
# "No LLM provider configured" warning at end of turn.
_parent_runtime = self._current_main_runtime()
_parent_api_mode = _parent_runtime.get("api_mode") or None
# The review fork needs to call agent-loop tools (memory,
# skill_manage). Those tools require Hermes' own dispatch,
# which the codex_app_server runtime bypasses entirely
# (it runs the turn inside codex's subprocess). So when
# the parent is on codex_app_server, downgrade the review
# fork to codex_responses — same auth/credentials, but
# talks to the OpenAI Responses API directly so Hermes
# owns the loop and the agent-loop tools dispatch.
if _parent_api_mode == "codex_app_server":
_parent_api_mode = "codex_responses"
review_agent = AIAgent(
model=self.model,
max_iterations=16,
quiet_mode=True,
platform=self.platform,
provider=self.provider,
api_mode=_parent_api_mode,
base_url=_parent_runtime.get("base_url") or None,
api_key=_parent_runtime.get("api_key") or None,
credential_pool=getattr(self, "_credential_pool", None),
parent_session_id=self.session_id,
)
review_agent._memory_write_origin = "background_review"
review_agent._memory_write_context = "background_review"
review_agent._memory_store = self._memory_store
review_agent._memory_enabled = self._memory_enabled
review_agent._user_profile_enabled = self._user_profile_enabled
review_agent._memory_nudge_interval = 0
review_agent._skill_nudge_interval = 0
# Suppress all status/warning emits from the fork so the
# user only sees the final successful-action summary.
# Without this, mid-review "Iteration budget exhausted",
# rate-limit retries, compression warnings, and other
# lifecycle messages bubble up through _emit_status ->
# _vprint and leak past the stdout redirect (they go via
# _print_fn/status_callback, which bypass sys.stdout).
review_agent.suppress_status_output = True
# Inherit the parent's cached system prompt verbatim so
# the review fork's outbound HTTP request hits the same
# Anthropic/OpenRouter prefix cache the parent warmed.
# Without this, the fork rebuilds the system prompt from
# scratch (fresh _hermes_now() timestamp, fresh
# session_id, narrower toolset → different skills_prompt)
# and the byte-exact prefix-cache key misses. See
# issue #25322 and PR #17276 for the full analysis +
# measured impact (~26% end-to-end cost reduction on
# Sonnet 4.5).
review_agent._cached_system_prompt = self._cached_system_prompt
# Defensive: pin session_start + session_id to the
# parent's so any code path that re-renders parts of
# the system prompt (compression, plugin hooks) still
# produces byte-identical output. The cached-prompt
# assignment above already short-circuits the normal
# rebuild path, but these pins guarantee parity even
# if a future code path bypasses the cache.
review_agent.session_start = self.session_start
review_agent.session_id = self.session_id
from model_tools import get_tool_definitions
from hermes_cli.plugins import (
set_thread_tool_whitelist,
clear_thread_tool_whitelist,
)
review_whitelist = {
t["function"]["name"]
for t in get_tool_definitions(
enabled_toolsets=["memory", "skills"],
quiet_mode=True,
)
}
set_thread_tool_whitelist(
review_whitelist,
deny_msg_fmt=(
"Background review denied non-whitelisted tool: "
"{tool_name}. Only memory/skill tools are allowed."
),
)
try:
review_agent.run_conversation(
user_message=(
prompt
+ "\n\nYou can only call memory and skill "
"management tools. Other tools will be denied "
"at runtime — do not attempt them."
),
conversation_history=messages_snapshot,
)
finally:
clear_thread_tool_whitelist()
# Tear down memory providers while stdout is still
# redirected so background thread teardown (Honcho flush,
# Hindsight sync, etc.) stays silent. The finally block
# below is a safety net for the exception path.
try:
review_agent.shutdown_memory_provider()
except Exception:
pass
try:
review_agent.close()
except Exception:
pass
review_messages = list(getattr(review_agent, "_session_messages", []))
review_agent = None
# Scan the review agent's messages for successful tool actions
# and surface a compact summary to the user. Tool messages
# already present in messages_snapshot must be skipped, since
# the review agent inherits that history and would otherwise
# re-surface stale "created"/"updated" messages from the prior
# conversation as if they just happened (issue #14944).
actions = self._summarize_background_review_actions(
review_messages,
messages_snapshot,
)
if actions:
summary = " · ".join(dict.fromkeys(actions))
self._safe_print(
f" 💾 Self-improvement review: {summary}"
)
_bg_cb = self.background_review_callback
if _bg_cb:
try:
_bg_cb(
f"💾 Self-improvement review: {summary}"
)
except Exception:
pass
except Exception as e:
logger.warning("Background memory/skill review failed: %s", e)
self._emit_auxiliary_failure("background review", e)
finally:
# Safety-net cleanup for the exception path. Normal
# completion already shut down inside redirect_stdout above.
# Re-open devnull here so any teardown output (Honcho flush,
# Hindsight sync, background thread joins) stays silent even
# on the exception path where redirect_stdout already exited.
if review_agent is not None:
try:
with open(os.devnull, "w", encoding="utf-8") as _fn, \
contextlib.redirect_stdout(_fn), \
contextlib.redirect_stderr(_fn):
try:
review_agent.shutdown_memory_provider()
except Exception:
pass
try:
review_agent.close()
except Exception:
pass
except Exception:
pass
# Clear the approval callback on this bg-review thread so a
# recycled thread-id doesn't inherit a stale reference.
try:
_set_approval_callback(None)
except Exception:
pass
t = threading.Thread(target=_run_review, daemon=True, name="bg-review")
from agent.background_review import spawn_background_review_thread
target, _prompt = spawn_background_review_thread(
self,
messages_snapshot,
review_memory=review_memory,
review_skills=review_skills,
)
t = threading.Thread(target=target, daemon=True, name="bg-review")
t.start()
def _build_memory_write_metadata(
@ -3645,23 +3394,15 @@ class AIAgent:
task_id: Optional[str] = None,
tool_call_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Build provenance metadata for external memory-provider mirrors."""
metadata: Dict[str, Any] = {
"write_origin": write_origin or getattr(self, "_memory_write_origin", "assistant_tool"),
"execution_context": (
execution_context
or getattr(self, "_memory_write_context", "foreground")
),
"session_id": self.session_id or "",
"parent_session_id": self._parent_session_id or "",
"platform": self.platform or os.environ.get("HERMES_SESSION_SOURCE", "cli"),
"tool_name": "memory",
}
if task_id:
metadata["task_id"] = task_id
if tool_call_id:
metadata["tool_call_id"] = tool_call_id
return {k: v for k, v in metadata.items() if v not in {None, ""}}
"""Forwarder — see ``agent.background_review.build_memory_write_metadata``."""
from agent.background_review import build_memory_write_metadata
return build_memory_write_metadata(
self,
write_origin=write_origin,
execution_context=execution_context,
task_id=task_id,
tool_call_id=tool_call_id,
)
def _apply_persist_user_message_override(self, messages: List[Dict]) -> None:
"""Rewrite the current-turn user message before persistence/return.