From 54870847cb0f530105907b1a793531b8d0f03d78 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sun, 7 Jun 2026 20:55:46 -0700 Subject: [PATCH] refactor(agent): extract run_conversation prologue into agent/turn_context.py Phase 1 of the god-file decomposition plan. run_conversation's ~470-line once-per-turn setup block (stdio guarding, retry-counter resets, user-message sanitization, todo/nudge hydration, system-prompt restore-or-build, crash-resilience persistence, preflight compression, the pre_llm_call hook, and external-memory prefetch) is moved verbatim into build_turn_context(), which returns a TurnContext dataclass the loop unpacks. Behavior-neutral move-and-name refactor: the builder mutates `agent` exactly as the inline code did; only the locals the loop reads back are returned. - run_conversation: 4602 -> 4217 LOC (-385) - agent/conversation_loop.py: 4965 -> ~4580 LOC - new agent/turn_context.py: focused, dependency-injected, unit-tested in isolation Tests: tests/run_agent/ 1570 passed / 0 failed under per-file process isolation. Relocation follow-ups: 413_compression mocks now patch both module references; nudge/on_turn_start source-inspection guards point at the extracted module. --- agent/conversation_loop.py | 451 ++---------------- agent/turn_context.py | 388 +++++++++++++++ tests/agent/test_turn_context.py | 187 ++++++++ tests/run_agent/test_413_compression.py | 4 + .../test_memory_nudge_counter_hydration.py | 32 +- tests/run_agent/test_run_agent.py | 32 +- 6 files changed, 648 insertions(+), 446 deletions(-) create mode 100644 agent/turn_context.py create mode 100644 tests/agent/test_turn_context.py diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 330d37df270..36f35a45a0f 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -31,6 +31,7 @@ from agent.codex_responses_adapter import _summarize_user_message_for_log from agent.display import KawaiiSpinner from agent.error_classifier import FailoverReason, classify_api_error from agent.iteration_budget import IterationBudget +from agent.turn_context import build_turn_context from agent.memory_manager import build_memory_context_block from agent.message_sanitization import ( _repair_tool_call_arguments, @@ -389,376 +390,43 @@ def run_conversation( Returns: Dict: Complete conversation result with final response and message history """ - # Guard stdio against OSError from broken pipes (systemd/headless/daemon). - # Installed once, transparent when streams are healthy, prevents crash on write. - _install_safe_stdio() - - agent._ensure_db_session() - - # Tell auxiliary_client what the live main provider/model are for - # this turn. Used by tools whose behaviour depends on the active - # main model (e.g. vision_analyze's native fast path) so they see - # the CLI/gateway override instead of the stale config.yaml - # default. Idempotent — fine to call every turn. - try: - from agent.auxiliary_client import set_runtime_main - set_runtime_main( - getattr(agent, "provider", "") or "", - getattr(agent, "model", "") or "", - base_url=getattr(agent, "base_url", "") or "", - api_key=getattr(agent, "api_key", "") or "", - api_mode=getattr(agent, "api_mode", "") or "", - ) - except Exception: - pass - - # Tag all log records on this thread with the session ID so - # ``hermes logs --session `` can filter a single conversation. - set_session_context(agent.session_id) - - # Bind the skill write-origin ContextVar for this thread so tool - # handlers (e.g. skill_manage create) can tell whether they are - # running inside the background agent-improvement review fork vs. - # a foreground user-directed turn. Set at the top of each call; - # the review fork runs on its own thread with a fresh context, - # so the foreground value here does not leak into it. - set_current_write_origin(getattr(agent, "_memory_write_origin", "assistant_tool")) - - # If the previous turn activated fallback, restore the primary - # runtime so this turn gets a fresh attempt with the preferred model. - # No-op when _fallback_activated is False (gateway, first turn, etc.). - agent._restore_primary_runtime() - - # Sanitize surrogate characters from user input. Clipboard paste from - # rich-text editors (Google Docs, Word, etc.) can inject lone surrogates - # that are invalid UTF-8 and crash JSON serialization in the OpenAI SDK. - if isinstance(user_message, str): - user_message = _sanitize_surrogates(user_message) - if isinstance(persist_user_message, str): - persist_user_message = _sanitize_surrogates(persist_user_message) - - # Store stream callback for _interruptible_api_call to pick up - agent._stream_callback = stream_callback - agent._persist_user_message_idx = None - agent._persist_user_message_override = persist_user_message - # Generate unique task_id if not provided to isolate VMs between concurrent tasks - effective_task_id = task_id or str(uuid.uuid4()) - # Expose the active task_id so tools running mid-turn (e.g. delegate_task - # in delegate_tool.py) can identify this agent for the cross-agent file - # state registry. Set BEFORE any tool dispatch so snapshots taken at - # child-launch time see the parent's real id, not None. - agent._current_task_id = effective_task_id - turn_id = f"{agent.session_id or 'session'}:{effective_task_id}:{uuid.uuid4().hex[:8]}" - agent._current_turn_id = turn_id - agent._current_api_request_id = "" - - # Reset retry counters and iteration budget at the start of each turn - # so subagent usage from a previous turn doesn't eat into the next one. - agent._invalid_tool_retries = 0 - agent._invalid_json_retries = 0 - agent._empty_content_retries = 0 - agent._incomplete_scratchpad_retries = 0 - agent._codex_incomplete_retries = 0 - agent._thinking_prefill_retries = 0 - agent._post_tool_empty_retried = False - agent._last_content_with_tools = None - agent._last_content_tools_all_housekeeping = False - agent._mute_post_response = False - agent._unicode_sanitization_passes = 0 - agent._tool_guardrails.reset_for_turn() - agent._tool_guardrail_halt_decision = None - # True until the server rejects an image_url content part with an error - # like "Only 'text' content type is supported." Set to False on first - # rejection and kept False for the rest of the session so we never re-send - # images to a text-only endpoint. Scoped per `_run()` call, not per instance. - agent._vision_supported = True - - # Pre-turn connection health check: detect and clean up dead TCP - # connections left over from provider outages or dropped streams. - # This prevents the next API call from hanging on a zombie socket. - if agent.api_mode != "anthropic_messages": - try: - if agent._cleanup_dead_connections(): - agent._emit_status( - "🔌 Detected stale connections from a previous provider " - "issue — cleaned up automatically. Proceeding with fresh " - "connection." - ) - except Exception: - pass - # Replay compression warning through status_callback for gateway - # platforms (the callback was not wired during __init__). - if agent._compression_warning: - agent._replay_compression_warning() - agent._compression_warning = None # send once - - # NOTE: _turns_since_memory and _iters_since_skill are NOT reset here. - # They are initialized in __init__ and must persist across run_conversation - # calls so that nudge logic accumulates correctly in CLI mode. - agent.iteration_budget = IterationBudget(agent.max_iterations) - - # Log conversation turn start for debugging/observability - _preview_text = _summarize_user_message_for_log(user_message) - _msg_preview = (_preview_text[:80] + "...") if len(_preview_text) > 80 else _preview_text - _msg_preview = _msg_preview.replace("\n", " ") - logger.info( - "conversation turn: session=%s model=%s provider=%s platform=%s history=%d msg=%r", - agent.session_id or "none", agent.model, agent.provider or "unknown", - agent.platform or "unknown", len(conversation_history or []), - _msg_preview, + # ── Per-turn setup (the prologue) ── + # All once-per-turn setup — stdio guarding, retry-counter resets, user + # message sanitization, todo/nudge hydration, system-prompt restore-or- + # build, crash-resilience persistence, preflight compression, the + # ``pre_llm_call`` plugin hook, and external-memory prefetch — lives in + # ``build_turn_context``. It mutates ``agent`` exactly as the inline code + # did and returns the locals the loop below reads back. See + # ``agent/turn_context.py``. + _ctx = build_turn_context( + agent, + user_message, + system_message, + conversation_history, + task_id, + stream_callback, + persist_user_message, + restore_or_build_system_prompt=_restore_or_build_system_prompt, + install_safe_stdio=_install_safe_stdio, + sanitize_surrogates=_sanitize_surrogates, + summarize_user_message_for_log=_summarize_user_message_for_log, + set_session_context=set_session_context, + set_current_write_origin=set_current_write_origin, + ra=_ra, ) + user_message = _ctx.user_message + original_user_message = _ctx.original_user_message + messages = _ctx.messages + conversation_history = _ctx.conversation_history + active_system_prompt = _ctx.active_system_prompt + effective_task_id = _ctx.effective_task_id + turn_id = _ctx.turn_id + current_turn_user_idx = _ctx.current_turn_user_idx + _should_review_memory = _ctx.should_review_memory + _plugin_user_context = _ctx.plugin_user_context + _ext_prefetch_cache = _ctx.ext_prefetch_cache - # Initialize conversation (copy to avoid mutating the caller's list) - messages = list(conversation_history) if conversation_history else [] - - # Hydrate todo store from conversation history (gateway creates a fresh - # AIAgent per message, so the in-memory store is empty -- we need to - # recover the todo state from the most recent todo tool response in history) - if conversation_history and not agent._todo_store.has_items(): - agent._hydrate_todo_store(conversation_history) - - # Hydrate per-session nudge counters from persisted history. - # Gateway creates a fresh AIAgent per inbound message (cache miss / - # 1h idle eviction / config-signature mismatch / process restart), so - # _turns_since_memory and _user_turn_count start at 0 every turn and - # the memory.nudge_interval trigger may never be reached. Reconstruct - # an effective count from prior user turns in conversation_history. - # Idempotent: a cached agent that already accumulated counters keeps - # them; only a freshly-built agent with empty in-memory state hydrates. - # See issue #22357. - if conversation_history and agent._user_turn_count == 0: - prior_user_turns = sum( - 1 for m in conversation_history if m.get("role") == "user" - ) - if prior_user_turns > 0: - agent._user_turn_count = prior_user_turns - if agent._memory_nudge_interval > 0 and agent._turns_since_memory == 0: - # % preserves original 1-in-N cadence rather than firing a - # review immediately on resume (which would surprise users - # whose session happened to land just past a multiple of N). - agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval - - - # Prefill messages (few-shot priming) are injected at API-call time only, - # never stored in the messages list. This keeps them ephemeral: they won't - # be saved to session DB, session logs, or batch trajectories, but they're - # automatically re-applied on every API call (including session continuations). - - # Track user turns for memory flush and periodic nudge logic - agent._user_turn_count += 1 - - # Reset the streaming context scrubber at the top of each turn so a - # hung span from a prior interrupted stream can't taint this turn's - # output. - scrubber = getattr(agent, "_stream_context_scrubber", None) - if scrubber is not None: - scrubber.reset() - # Reset the think scrubber for the same reason — an interrupted - # prior stream may have left us inside an unterminated block. - think_scrubber = getattr(agent, "_stream_think_scrubber", None) - if think_scrubber is not None: - think_scrubber.reset() - - # Preserve the original user message (no nudge injection). - original_user_message = persist_user_message if persist_user_message is not None else user_message - - # Track memory nudge trigger (turn-based, checked here). - # Skill trigger is checked AFTER the agent loop completes, based on - # how many tool iterations THIS turn used. - _should_review_memory = False - if (agent._memory_nudge_interval > 0 - and "memory" in agent.valid_tool_names - and agent._memory_store): - agent._turns_since_memory += 1 - if agent._turns_since_memory >= agent._memory_nudge_interval: - _should_review_memory = True - agent._turns_since_memory = 0 - - # Add user message - user_msg = {"role": "user", "content": user_message} - messages.append(user_msg) - current_turn_user_idx = len(messages) - 1 - agent._persist_user_message_idx = current_turn_user_idx - - if not agent.quiet_mode: - _print_preview = _summarize_user_message_for_log(user_message) - agent._safe_print(f"💬 Starting conversation: '{_print_preview[:60]}{'...' if len(_print_preview) > 60 else ''}'") - - # ── System prompt (cached per session for prefix caching) ── - # Built once on first call, reused for all subsequent calls. - # Only rebuilt after context compression events (which invalidate - # the cache and reload memory from disk). - # - # For continuing sessions (gateway creates a fresh AIAgent per - # message), we load the stored system prompt from the session DB - # instead of rebuilding. Rebuilding would pick up memory changes - # from disk that the model already knows about (it wrote them!), - # producing a different system prompt and breaking the Anthropic - # prefix cache. - if agent._cached_system_prompt is None: - _restore_or_build_system_prompt(agent, system_message, conversation_history) - - active_system_prompt = agent._cached_system_prompt - - # Crash-resilience: persist the inbound user turn as soon as the session row - # has a valid system prompt, before any provider call or tool execution can - # hang/kill the process. The normal end-of-turn persist still runs later; - # _last_flushed_db_idx makes this idempotent and prevents duplicate rows. - try: - agent._persist_session(messages, conversation_history) - except Exception: - logger.warning( - "Early turn-start session persistence failed for session=%s", - agent.session_id or "none", - exc_info=True, - ) - - # ── Preflight context compression ── - # Before entering the main loop, check if the loaded conversation - # history already exceeds the model's context threshold. This handles - # cases where a user switches to a model with a smaller context window - # while having a large existing session — compress proactively rather - # than waiting for an API error (which might be caught as a non-retryable - # 4xx and abort the request entirely). - if ( - agent.compression_enabled - and len(messages) > agent.context_compressor.protect_first_n - + agent.context_compressor.protect_last_n + 1 - ): - # Include tool schema tokens — with many tools these can add - # 20-30K+ tokens that the old sys+msg estimate missed entirely. - _preflight_tokens = estimate_request_tokens_rough( - messages, - system_prompt=active_system_prompt or "", - tools=agent.tools or None, - ) - _compressor = agent.context_compressor - _defer_preflight = getattr( - _compressor, - "should_defer_preflight_to_real_usage", - lambda _tokens: False, - ) - _preflight_deferred = _defer_preflight(_preflight_tokens) - - if not _preflight_deferred: - # Keep the CLI/ACP context display in sync with what preflight - # actually measured. The status bar reads - # ``compressor.last_prompt_tokens``, which otherwise only updates - # from a *successful* API response. When the conversation has grown - # since the last successful call — or when compression then fails - # (e.g. the auxiliary summary model times out) and no fresh usage - # arrives — the bar stays stuck at the old, smaller value while - # preflight reports a much larger number, looking out of sync. - # Seed it with the fresh estimate (only ever revising upward; a real - # ``update_from_response`` will correct it after the next API call). - # Skipped when deferring — a deferred estimate is known to over-count - # vs the last real provider prompt, so trusting it for the display - # would re-introduce the very desync we're avoiding. - _last = _compressor.last_prompt_tokens - # Do NOT overwrite the -1 sentinel. compress_context() sets - # last_prompt_tokens=-1 right after compression to mark "no real API - # usage yet". `(x or 0)` evaluates to -1 (truthy) for the sentinel, - # so the old comparison was always True and clobbered the sentinel - # with a schema-inflated rough estimate — re-triggering compression - # on the next turn (#36718). Treat any negative value as "no data". - if _last >= 0 and _preflight_tokens > _last: - _compressor.last_prompt_tokens = _preflight_tokens - - if _preflight_deferred: - logger.info( - "Skipping preflight compression: rough estimate ~%s >= %s, " - "but last real provider prompt was %s after compression", - f"{_preflight_tokens:,}", - f"{_compressor.threshold_tokens:,}", - f"{_compressor.last_real_prompt_tokens:,}", - ) - elif _compressor.should_compress(_preflight_tokens): - logger.info( - "Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)", - f"{_preflight_tokens:,}", - f"{_compressor.threshold_tokens:,}", - agent.model, - f"{_compressor.context_length:,}", - ) - agent._emit_status( - f"📦 Preflight compression: ~{_preflight_tokens:,} tokens " - f">= {_compressor.threshold_tokens:,} threshold. " - "This may take a moment." - ) - # May need multiple passes for very large sessions with small - # context windows (each pass summarises the middle N turns). - for _pass in range(3): - _orig_len = len(messages) - messages, active_system_prompt = agent._compress_context( - messages, system_message, approx_tokens=_preflight_tokens, - task_id=effective_task_id, - ) - if len(messages) >= _orig_len: - break # Cannot compress further - # Compression created a new session — clear the history - # reference so _flush_messages_to_session_db writes ALL - # compressed messages to the new session's SQLite, not - # skipping them because conversation_history is still the - # pre-compression length. - conversation_history = None - # Fix: reset retry counters after compression so the model - # gets a fresh budget on the compressed context. Without - # this, pre-compression retries carry over and the model - # hits "(empty)" immediately after compression-induced - # context loss. - agent._empty_content_retries = 0 - agent._thinking_prefill_retries = 0 - agent._last_content_with_tools = None - agent._last_content_tools_all_housekeeping = False - agent._mute_post_response = False - # Re-estimate after compression - _preflight_tokens = estimate_request_tokens_rough( - messages, - system_prompt=active_system_prompt or "", - tools=agent.tools or None, - ) - if not _compressor.should_compress(_preflight_tokens): - break # Under threshold or anti-thrash guard stopped it - - # Plugin hook: pre_llm_call - # Fired once per turn before the tool-calling loop. Plugins can - # return a dict with a ``context`` key (or a plain string) whose - # value is appended to the current turn's user message. - # - # Context is ALWAYS injected into the user message, never the - # system prompt. This preserves the prompt cache prefix — the - # system prompt stays identical across turns so cached tokens - # are reused. The system prompt is Hermes's territory; plugins - # contribute context alongside the user's input. - # - # All injected context is ephemeral (not persisted to session DB). - _plugin_user_context = "" - try: - from hermes_cli.plugins import invoke_hook as _invoke_hook - _pre_results = _invoke_hook( - "pre_llm_call", - session_id=agent.session_id, - task_id=effective_task_id, - turn_id=turn_id, - user_message=original_user_message, - conversation_history=list(messages), - is_first_turn=(not bool(conversation_history)), - model=agent.model, - platform=getattr(agent, "platform", None) or "", - sender_id=getattr(agent, "_user_id", None) or "", - ) - _ctx_parts: list[str] = [] - for r in _pre_results: - if isinstance(r, dict) and r.get("context"): - _ctx_parts.append(str(r["context"])) - elif isinstance(r, str) and r.strip(): - _ctx_parts.append(r) - if _ctx_parts: - _plugin_user_context = "\n\n".join(_ctx_parts) - except Exception as exc: - logger.warning("pre_llm_call hook failed: %s", exc) - - # Main conversation loop + # Main conversation loop counters (pure locals consumed by the loop below). api_call_count = 0 final_response = None interrupted = False @@ -770,53 +438,6 @@ def run_conversation( compression_attempts = 0 _turn_exit_reason = "unknown" # Diagnostic: why the loop ended - # Per-turn file-mutation verifier state. Keyed by resolved path; - # each failed ``write_file`` / ``patch`` call records the error - # preview. Later successful writes to the same path remove the - # entry (the model recovered). At end-of-turn, any entries still - # present are surfaced in an advisory footer so the model cannot - # over-claim success while the file is actually unchanged on disk. - agent._turn_failed_file_mutations: Dict[str, Dict[str, Any]] = {} - - # Record the execution thread so interrupt()/clear_interrupt() can - # scope the tool-level interrupt signal to THIS agent's thread only. - # Must be set before any thread-scoped interrupt syncing. - agent._execution_thread_id = threading.current_thread().ident - - # Always clear stale per-thread state from a previous turn. If an - # interrupt arrived before startup finished, preserve it and bind it - # to this execution thread now instead of dropping it on the floor. - _ra()._set_interrupt(False, agent._execution_thread_id) - if agent._interrupt_requested: - _ra()._set_interrupt(True, agent._execution_thread_id) - agent._interrupt_thread_signal_pending = False - else: - agent._interrupt_message = None - agent._interrupt_thread_signal_pending = False - - # Notify memory providers of the new turn so cadence tracking works. - # Must happen BEFORE prefetch_all() so providers know which turn it is - # and can gate context/dialectic refresh via contextCadence/dialecticCadence. - if agent._memory_manager: - try: - _turn_msg = original_user_message if isinstance(original_user_message, str) else "" - agent._memory_manager.on_turn_start(agent._user_turn_count, _turn_msg) - except Exception: - pass - - # External memory provider: prefetch once before the tool loop. - # Reuse the cached result on every iteration to avoid re-calling - # prefetch_all() on each tool call (10 tool calls = 10x latency + cost). - # Use original_user_message (clean input) — user_message may contain - # injected skill content that bloats / breaks provider queries. - _ext_prefetch_cache = "" - if agent._memory_manager: - try: - _query = original_user_message if isinstance(original_user_message, str) else "" - _ext_prefetch_cache = agent._memory_manager.prefetch_all(_query) or "" - except Exception: - pass - # Optional opt-in runtime: if api_mode == codex_app_server, hand the # turn to the codex app-server subprocess (terminal/file ops/patching # all run inside Codex). Default Hermes path is bypassed entirely. diff --git a/agent/turn_context.py b/agent/turn_context.py new file mode 100644 index 00000000000..e94d43279ab --- /dev/null +++ b/agent/turn_context.py @@ -0,0 +1,388 @@ +"""Per-turn setup for ``run_conversation`` (the turn prologue). + +``run_conversation`` opened with ~470 lines of straight-line setup before the +tool-calling loop ever started: stdio guarding, runtime-main wiring, retry-counter +resets, user-message sanitization, todo/nudge-counter hydration, system-prompt +restore-or-build, crash-resilience persistence, preflight context compression, the +``pre_llm_call`` plugin hook, and external-memory prefetch. + +All of that is *prologue* — it runs once per turn, has no back-references into the +loop, and produces a fixed set of values the loop then consumes. ``TurnContext`` +captures those produced values; ``build_turn_context`` performs the setup work and +returns one. ``run_conversation`` is left to unpack the context and run the loop, +shrinking the orchestrator by the full prologue. + +The builder still mutates ``agent`` heavily (counters, thread id, cached prompt, +session DB) exactly as the inline code did — those side effects are the point. The +``TurnContext`` it returns carries only the *locals* the loop reads back. + +Behavior is identical to the original inline prologue; this is a pure +move-and-name refactor with no semantic change. +""" + +from __future__ import annotations + +import logging +import threading +import uuid +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from agent.iteration_budget import IterationBudget +from agent.model_metadata import estimate_request_tokens_rough + +logger = logging.getLogger(__name__) + + +@dataclass +class TurnContext: + """Values produced by the turn prologue and consumed by the turn loop.""" + + # Sanitized inbound message (surrogates stripped). + user_message: str + # Clean message preserved for transcripts / memory queries (no nudge injection). + original_user_message: Any + # Working message list for this turn (loop appends to it). + messages: List[Dict[str, Any]] + # May be reset to None by preflight compression (new session created). + conversation_history: Optional[List[Dict[str, Any]]] + # Cached system prompt active for this turn (may be rebuilt by compression). + active_system_prompt: Optional[str] + # Task / turn identifiers. + effective_task_id: str + turn_id: str + # Index of the current user turn within ``messages``. + current_turn_user_idx: int + # Whether the post-turn memory review should fire. + should_review_memory: bool = False + # Context contributed by ``pre_llm_call`` plugins (appended to user message). + plugin_user_context: str = "" + # External-memory prefetch result, reused across loop iterations. + ext_prefetch_cache: str = "" + + +def build_turn_context( + agent, + user_message: str, + system_message: Optional[str], + conversation_history: Optional[List[Dict[str, Any]]], + task_id: Optional[str], + stream_callback, + persist_user_message: Optional[str], + *, + restore_or_build_system_prompt, + install_safe_stdio, + sanitize_surrogates, + summarize_user_message_for_log, + set_session_context, + set_current_write_origin, + ra, +) -> TurnContext: + """Run the once-per-turn setup and return the loop's input context. + + The callables/helpers the original prologue referenced from the + ``conversation_loop`` module are passed in explicitly to keep this module + free of an import cycle with ``agent.conversation_loop``. + """ + # Guard stdio against OSError from broken pipes (systemd/headless/daemon). + install_safe_stdio() + + agent._ensure_db_session() + + # Tell auxiliary_client what the live main provider/model are for this turn. + try: + from agent.auxiliary_client import set_runtime_main + set_runtime_main( + getattr(agent, "provider", "") or "", + getattr(agent, "model", "") or "", + base_url=getattr(agent, "base_url", "") or "", + api_key=getattr(agent, "api_key", "") or "", + api_mode=getattr(agent, "api_mode", "") or "", + ) + except Exception: + pass + + # Tag log records on this thread with the session ID for ``hermes logs``. + set_session_context(agent.session_id) + + # Bind the skill write-origin ContextVar for this thread. + set_current_write_origin(getattr(agent, "_memory_write_origin", "assistant_tool")) + + # Restore the primary runtime if the previous turn activated fallback. + agent._restore_primary_runtime() + + # Sanitize surrogate characters from user input. + if isinstance(user_message, str): + user_message = sanitize_surrogates(user_message) + if isinstance(persist_user_message, str): + persist_user_message = sanitize_surrogates(persist_user_message) + + # Store stream callback for _interruptible_api_call to pick up. + agent._stream_callback = stream_callback + agent._persist_user_message_idx = None + agent._persist_user_message_override = persist_user_message + # Generate unique task_id if not provided to isolate VMs between tasks. + effective_task_id = task_id or str(uuid.uuid4()) + agent._current_task_id = effective_task_id + turn_id = f"{agent.session_id or 'session'}:{effective_task_id}:{uuid.uuid4().hex[:8]}" + agent._current_turn_id = turn_id + agent._current_api_request_id = "" + + # Reset retry counters and iteration budget at the start of each turn. + agent._invalid_tool_retries = 0 + agent._invalid_json_retries = 0 + agent._empty_content_retries = 0 + agent._incomplete_scratchpad_retries = 0 + agent._codex_incomplete_retries = 0 + agent._thinking_prefill_retries = 0 + agent._post_tool_empty_retried = False + agent._last_content_with_tools = None + agent._last_content_tools_all_housekeeping = False + agent._mute_post_response = False + agent._unicode_sanitization_passes = 0 + agent._tool_guardrails.reset_for_turn() + agent._tool_guardrail_halt_decision = None + agent._vision_supported = True + + # Pre-turn connection health check: clean up dead TCP connections. + if agent.api_mode != "anthropic_messages": + try: + if agent._cleanup_dead_connections(): + agent._emit_status( + "🔌 Detected stale connections from a previous provider " + "issue — cleaned up automatically. Proceeding with fresh " + "connection." + ) + except Exception: + pass + # Replay compression warning through status_callback for gateway platforms. + if agent._compression_warning: + agent._replay_compression_warning() + agent._compression_warning = None # send once + + # NOTE: _turns_since_memory and _iters_since_skill are NOT reset here. + agent.iteration_budget = IterationBudget(agent.max_iterations) + + # Log conversation turn start for debugging/observability. + _preview_text = summarize_user_message_for_log(user_message) + _msg_preview = (_preview_text[:80] + "...") if len(_preview_text) > 80 else _preview_text + _msg_preview = _msg_preview.replace("\n", " ") + logger.info( + "conversation turn: session=%s model=%s provider=%s platform=%s history=%d msg=%r", + agent.session_id or "none", agent.model, agent.provider or "unknown", + agent.platform or "unknown", len(conversation_history or []), + _msg_preview, + ) + + # Initialize conversation (copy to avoid mutating the caller's list). + messages = list(conversation_history) if conversation_history else [] + + # Hydrate todo store from conversation history. + if conversation_history and not agent._todo_store.has_items(): + agent._hydrate_todo_store(conversation_history) + + # Hydrate per-session nudge counters from persisted history (issue #22357). + if conversation_history and agent._user_turn_count == 0: + prior_user_turns = sum( + 1 for m in conversation_history if m.get("role") == "user" + ) + if prior_user_turns > 0: + agent._user_turn_count = prior_user_turns + if agent._memory_nudge_interval > 0 and agent._turns_since_memory == 0: + agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval + + # Track user turns for memory flush and periodic nudge logic. + agent._user_turn_count += 1 + + # Reset the streaming context scrubber at the top of each turn. + scrubber = getattr(agent, "_stream_context_scrubber", None) + if scrubber is not None: + scrubber.reset() + # Reset the think scrubber for the same reason. + think_scrubber = getattr(agent, "_stream_think_scrubber", None) + if think_scrubber is not None: + think_scrubber.reset() + + # Preserve the original user message (no nudge injection). + original_user_message = persist_user_message if persist_user_message is not None else user_message + + # Track memory nudge trigger (turn-based, checked here). + should_review_memory = False + if (agent._memory_nudge_interval > 0 + and "memory" in agent.valid_tool_names + and agent._memory_store): + agent._turns_since_memory += 1 + if agent._turns_since_memory >= agent._memory_nudge_interval: + should_review_memory = True + agent._turns_since_memory = 0 + + # Add user message. + user_msg = {"role": "user", "content": user_message} + messages.append(user_msg) + current_turn_user_idx = len(messages) - 1 + agent._persist_user_message_idx = current_turn_user_idx + + if not agent.quiet_mode: + _print_preview = summarize_user_message_for_log(user_message) + agent._safe_print( + f"💬 Starting conversation: '{_print_preview[:60]}" + f"{'...' if len(_print_preview) > 60 else ''}'" + ) + + # ── System prompt (cached per session for prefix caching) ── + if agent._cached_system_prompt is None: + restore_or_build_system_prompt(agent, system_message, conversation_history) + + active_system_prompt = agent._cached_system_prompt + + # Crash-resilience: persist the inbound user turn as soon as the session row exists. + try: + agent._persist_session(messages, conversation_history) + except Exception: + logger.warning( + "Early turn-start session persistence failed for session=%s", + agent.session_id or "none", + exc_info=True, + ) + + # ── Preflight context compression ── + if ( + agent.compression_enabled + and len(messages) > agent.context_compressor.protect_first_n + + agent.context_compressor.protect_last_n + 1 + ): + _preflight_tokens = estimate_request_tokens_rough( + messages, + system_prompt=active_system_prompt or "", + tools=agent.tools or None, + ) + _compressor = agent.context_compressor + _defer_preflight = getattr( + _compressor, + "should_defer_preflight_to_real_usage", + lambda _tokens: False, + ) + _preflight_deferred = _defer_preflight(_preflight_tokens) + + if not _preflight_deferred: + _last = _compressor.last_prompt_tokens + # Do NOT overwrite the -1 sentinel (#36718). + if _last >= 0 and _preflight_tokens > _last: + _compressor.last_prompt_tokens = _preflight_tokens + + if _preflight_deferred: + logger.info( + "Skipping preflight compression: rough estimate ~%s >= %s, " + "but last real provider prompt was %s after compression", + f"{_preflight_tokens:,}", + f"{_compressor.threshold_tokens:,}", + f"{_compressor.last_real_prompt_tokens:,}", + ) + elif _compressor.should_compress(_preflight_tokens): + logger.info( + "Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)", + f"{_preflight_tokens:,}", + f"{_compressor.threshold_tokens:,}", + agent.model, + f"{_compressor.context_length:,}", + ) + agent._emit_status( + f"📦 Preflight compression: ~{_preflight_tokens:,} tokens " + f">= {_compressor.threshold_tokens:,} threshold. " + "This may take a moment." + ) + for _pass in range(3): + _orig_len = len(messages) + messages, active_system_prompt = agent._compress_context( + messages, system_message, approx_tokens=_preflight_tokens, + task_id=effective_task_id, + ) + if len(messages) >= _orig_len: + break # Cannot compress further + conversation_history = None + agent._empty_content_retries = 0 + agent._thinking_prefill_retries = 0 + agent._last_content_with_tools = None + agent._last_content_tools_all_housekeeping = False + agent._mute_post_response = False + _preflight_tokens = estimate_request_tokens_rough( + messages, + system_prompt=active_system_prompt or "", + tools=agent.tools or None, + ) + if not _compressor.should_compress(_preflight_tokens): + break + + # Plugin hook: pre_llm_call (context injected into user message, not system prompt). + plugin_user_context = "" + try: + from hermes_cli.plugins import invoke_hook as _invoke_hook + _pre_results = _invoke_hook( + "pre_llm_call", + session_id=agent.session_id, + task_id=effective_task_id, + turn_id=turn_id, + user_message=original_user_message, + conversation_history=list(messages), + is_first_turn=(not bool(conversation_history)), + model=agent.model, + platform=getattr(agent, "platform", None) or "", + sender_id=getattr(agent, "_user_id", None) or "", + ) + _ctx_parts: list[str] = [] + for r in _pre_results: + if isinstance(r, dict) and r.get("context"): + _ctx_parts.append(str(r["context"])) + elif isinstance(r, str) and r.strip(): + _ctx_parts.append(r) + if _ctx_parts: + plugin_user_context = "\n\n".join(_ctx_parts) + except Exception as exc: + logger.warning("pre_llm_call hook failed: %s", exc) + + # Per-turn file-mutation verifier state. + agent._turn_failed_file_mutations = {} + + # Record the execution thread so interrupt()/clear_interrupt() can scope + # the tool-level interrupt signal to THIS agent's thread only. + agent._execution_thread_id = threading.current_thread().ident + + # Clear stale per-thread interrupt state, preserving a pending interrupt. + ra()._set_interrupt(False, agent._execution_thread_id) + if agent._interrupt_requested: + ra()._set_interrupt(True, agent._execution_thread_id) + agent._interrupt_thread_signal_pending = False + else: + agent._interrupt_message = None + agent._interrupt_thread_signal_pending = False + + # Notify memory providers of the new turn (BEFORE prefetch_all). + if agent._memory_manager: + try: + _turn_msg = original_user_message if isinstance(original_user_message, str) else "" + agent._memory_manager.on_turn_start(agent._user_turn_count, _turn_msg) + except Exception: + pass + + # External memory provider: prefetch once before the tool loop. + ext_prefetch_cache = "" + if agent._memory_manager: + try: + _query = original_user_message if isinstance(original_user_message, str) else "" + ext_prefetch_cache = agent._memory_manager.prefetch_all(_query) or "" + except Exception: + pass + + return TurnContext( + user_message=user_message, + original_user_message=original_user_message, + messages=messages, + conversation_history=conversation_history, + active_system_prompt=active_system_prompt, + effective_task_id=effective_task_id, + turn_id=turn_id, + current_turn_user_idx=current_turn_user_idx, + should_review_memory=should_review_memory, + plugin_user_context=plugin_user_context, + ext_prefetch_cache=ext_prefetch_cache, + ) diff --git a/tests/agent/test_turn_context.py b/tests/agent/test_turn_context.py new file mode 100644 index 00000000000..52aef95ed96 --- /dev/null +++ b/tests/agent/test_turn_context.py @@ -0,0 +1,187 @@ +"""Unit tests for the extracted turn prologue (``agent/turn_context.py``). + +These exercise ``build_turn_context`` against a lightweight fake agent to +confirm the prologue produces the right ``TurnContext`` and applies the +``agent`` side effects the loop relies on — without spinning up a real +``AIAgent`` or hitting any provider. +""" + +from __future__ import annotations + +import types +from unittest.mock import patch + +import pytest + +from agent.turn_context import TurnContext, build_turn_context + + +class _FakeTodoStore: + def has_items(self): + return True + + def _hydrate(self, *_a, **_k): + pass + + +class _FakeGuardrails: + def __init__(self): + self.reset_called = False + + def reset_for_turn(self): + self.reset_called = True + + +class _FakeAgent: + """Minimal stand-in covering only what the prologue touches.""" + + def __init__(self): + self.session_id = "sess-1" + self.model = "test/model" + self.provider = "openrouter" + self.base_url = "https://openrouter.ai/api/v1" + self.api_key = "sk-x" + self.api_mode = "chat_completions" + self.platform = "cli" + self.quiet_mode = True + self.max_iterations = 90 + self.tools = [] + self.valid_tool_names = set() + self.compression_enabled = False + self.context_compressor = types.SimpleNamespace( + protect_first_n=2, protect_last_n=2 + ) + self._cached_system_prompt = "SYSTEM" + self._memory_store = None + self._memory_manager = None + self._memory_nudge_interval = 0 + self._turns_since_memory = 0 + self._user_turn_count = 0 + self._todo_store = _FakeTodoStore() + self._tool_guardrails = _FakeGuardrails() + self._compression_warning = None + self._interrupt_requested = False + self._memory_write_origin = "assistant_tool" + self._stream_context_scrubber = None + self._stream_think_scrubber = None + # Attributes the prologue assigns; recorded for assertions. + self._invalid_tool_retries = -1 + self._vision_supported = None + self._persist_calls = 0 + + # --- methods the prologue calls --- + def _ensure_db_session(self): + pass + + def _restore_primary_runtime(self): + pass + + def _cleanup_dead_connections(self): + return False + + def _emit_status(self, _msg): + pass + + def _replay_compression_warning(self): + pass + + def _hydrate_todo_store(self, *_a, **_k): + pass + + def _safe_print(self, *_a, **_k): + pass + + def _persist_session(self, *_a, **_k): + self._persist_calls += 1 + + +@pytest.fixture(autouse=True) +def _stub_runtime_main(): + """``build_turn_context`` calls ``auxiliary_client.set_runtime_main`` as a + production side effect (telling aux tools the live main provider/model). + That writes a module-level global these unit tests don't care about and + which would otherwise leak into sibling tests (e.g. provider-parity + resolution) when the per-test process isolation plugin is disabled. Stub + it out so the prologue tests stay hermetic. + """ + with patch("agent.auxiliary_client.set_runtime_main", lambda *a, **k: None): + yield + + +def _build(agent, **overrides): + kwargs = dict( + agent=agent, + user_message="hello", + system_message=None, + conversation_history=None, + task_id=None, + stream_callback=None, + persist_user_message=None, + restore_or_build_system_prompt=lambda *a, **k: None, + install_safe_stdio=lambda: None, + sanitize_surrogates=lambda s: s, + summarize_user_message_for_log=lambda s: s, + set_session_context=lambda _sid: None, + set_current_write_origin=lambda _o: None, + ra=lambda: types.SimpleNamespace(_set_interrupt=lambda *a, **k: None), + ) + kwargs.update(overrides) + return build_turn_context(**kwargs) + + +def test_returns_turn_context_with_user_message_appended(): + agent = _FakeAgent() + ctx = _build(agent) + assert isinstance(ctx, TurnContext) + assert ctx.user_message == "hello" + # The user turn was appended and indexed. + assert ctx.messages[-1] == {"role": "user", "content": "hello"} + assert ctx.current_turn_user_idx == len(ctx.messages) - 1 + assert ctx.active_system_prompt == "SYSTEM" + + +def test_applies_agent_side_effects(): + agent = _FakeAgent() + _build(agent) + # Retry counters reset, guardrails reset, vision re-armed, turn counted. + assert agent._invalid_tool_retries == 0 + assert agent._tool_guardrails.reset_called is True + assert agent._vision_supported is True + assert agent._user_turn_count == 1 + # Crash-resilience persistence fired once. + assert agent._persist_calls == 1 + # task/turn ids assigned on the agent. + assert agent._current_task_id + assert agent._current_turn_id + + +def test_task_id_passthrough(): + agent = _FakeAgent() + ctx = _build(agent, task_id="fixed-task") + assert ctx.effective_task_id == "fixed-task" + assert agent._current_task_id == "fixed-task" + + +def test_persist_user_message_becomes_original(): + agent = _FakeAgent() + ctx = _build(agent, user_message="api-prefixed", persist_user_message="clean") + # original_user_message tracks the clean persist override. + assert ctx.original_user_message == "clean" + # but the appended user turn carries the full (sanitized) message. + assert ctx.messages[-1]["content"] == "api-prefixed" + + +def test_memory_nudge_fires_at_interval(): + agent = _FakeAgent() + agent._memory_nudge_interval = 1 + agent.valid_tool_names = {"memory"} + agent._memory_store = object() + ctx = _build(agent) + assert ctx.should_review_memory is True + assert agent._turns_since_memory == 0 # reset after firing + + +def test_no_review_when_memory_disabled(): + agent = _FakeAgent() + ctx = _build(agent) + assert ctx.should_review_memory is False diff --git a/tests/run_agent/test_413_compression.py b/tests/run_agent/test_413_compression.py index 939c3682b88..4801e48eda3 100644 --- a/tests/run_agent/test_413_compression.py +++ b/tests/run_agent/test_413_compression.py @@ -553,6 +553,7 @@ class TestPreflightCompression: agent.status_callback = lambda ev, msg: status_messages.append((ev, msg)) with ( + patch("agent.turn_context.estimate_request_tokens_rough", return_value=114_000), patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=114_000), patch.object(agent, "_compress_context") as mock_compress, patch.object(agent, "_persist_session"), @@ -604,6 +605,7 @@ class TestPreflightCompression: return 125_000 if _rough_calls["n"] == 1 else 40_000 with ( + patch("agent.turn_context.estimate_request_tokens_rough", side_effect=_rough_estimate), patch("agent.conversation_loop.estimate_request_tokens_rough", side_effect=_rough_estimate), patch.object(agent, "_compress_context") as mock_compress, patch.object(agent, "_persist_session"), @@ -728,6 +730,7 @@ class TestPreflightCompression: agent.client.chat.completions.create.side_effect = [ok_resp] with ( + patch("agent.turn_context.estimate_request_tokens_rough", return_value=144_669), patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669), # Compression no-ops (returns input unchanged) — mirrors an aux # summary-model timeout where the messages can't be reduced. @@ -760,6 +763,7 @@ class TestPreflightCompression: agent.client.chat.completions.create.side_effect = [ok_resp] with ( + patch("agent.turn_context.estimate_request_tokens_rough", return_value=144_669), patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669), patch.object(agent, "_compress_context", side_effect=lambda msgs, *a, **k: (msgs, agent._cached_system_prompt)), patch.object(agent, "_persist_session"), diff --git a/tests/run_agent/test_memory_nudge_counter_hydration.py b/tests/run_agent/test_memory_nudge_counter_hydration.py index 1b9bf56005d..6ce1a3afa59 100644 --- a/tests/run_agent/test_memory_nudge_counter_hydration.py +++ b/tests/run_agent/test_memory_nudge_counter_hydration.py @@ -117,25 +117,29 @@ def test_assistant_only_history_does_not_advance_user_turn_count(): def test_production_code_contains_hydration_block(): - """Smoke test: confirm the hydration code is actually wired into - run_conversation(). If someone deletes it, tests above still pass - against the inline replica — this fails them awake. + """Smoke test: confirm the hydration code is actually wired into the + turn path. If someone deletes it, tests above still pass against the + inline replica — this fails them awake. - After the run_agent.py refactor the agent-loop body lives in - ``agent/conversation_loop.py`` and uses ``agent.X`` rather than - ``self.X``. Assert the block is present in the extracted module - specifically — if it ever drifts back into run_agent.py or - disappears entirely, this guard fails loudly. + The agent-loop prologue now lives in ``agent/turn_context.py`` + (``build_turn_context``), with the loop body in + ``agent/conversation_loop.py``. Assert the block is present in the + turn subsystem — if it disappears entirely, this guard fails loudly. + Either module counts so the guard tolerates legitimate relocation + within the turn subsystem. """ from pathlib import Path repo = Path(__file__).resolve().parents[2] - cl_path = repo / "agent" / "conversation_loop.py" - src_cl = cl_path.read_text(encoding="utf-8") + turn_src = "".join( + (repo / "agent" / name).read_text(encoding="utf-8") + for name in ("conversation_loop.py", "turn_context.py") + ) # Anchor on the unique comment + the modulo line. - assert "Hydrate per-session nudge counters from persisted history" in src_cl, ( - f"Hydration comment missing from {cl_path}" + assert "Hydrate per-session nudge counters from persisted history" in turn_src, ( + "Hydration comment missing from the turn subsystem " + "(conversation_loop.py / turn_context.py)" ) assert ( "agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval" - in src_cl - ), f"Hydration modulo assignment missing from {cl_path}" + in turn_src + ), "Hydration modulo assignment missing from the turn subsystem" diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 8580f7c37d7..884f9995ac1 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -6393,18 +6393,16 @@ class TestMemoryNudgeCounterPersistence: assert a._iters_since_skill == 0 def test_counters_not_reset_in_preamble(self): - """The run_conversation preamble must not zero the nudge counters.""" + """The turn preamble must not zero the nudge counters.""" import inspect - from agent.conversation_loop import run_conversation as _rc - src = inspect.getsource(_rc) - # The preamble resets many fields (retry counts, budget, etc.) - # before the main loop. Find that reset block and verify our - # counters aren't in it. The reset block ends at iteration_budget. - # The extracted body uses ``agent.X`` (not ``self.X``). Anchor - # exactly on ``agent.iteration_budget = IterationBudget`` so an - # unrelated identifier ending in ``iteration_budget`` (e.g. - # ``_iteration_budget`` or ``shared_iteration_budget``) can't - # match the boundary. + from agent.turn_context import build_turn_context as _btc + src = inspect.getsource(_btc) + # The preamble (now in build_turn_context) resets many fields (retry + # counts, budget, etc.) before returning. Find that reset block and + # verify our counters aren't in it. The reset block ends at + # iteration_budget. Anchor exactly on + # ``agent.iteration_budget = IterationBudget`` so an unrelated + # identifier ending in ``iteration_budget`` can't match the boundary. preamble_end = src.index("agent.iteration_budget = IterationBudget") preamble = src[:preamble_end] assert "agent._turns_since_memory = 0" not in preamble @@ -6490,23 +6488,23 @@ class TestMemoryProviderTurnStart: """ def test_on_turn_start_called_before_prefetch(self): - """Source-level check: on_turn_start appears before prefetch_all in run_conversation.""" + """Source-level check: on_turn_start appears before prefetch_all in the prologue.""" import inspect - from agent.conversation_loop import run_conversation as _rc - src = inspect.getsource(_rc) + from agent.turn_context import build_turn_context as _btc + src = inspect.getsource(_btc) # Find the actual method calls, not comments idx_turn_start = src.index(".on_turn_start(") idx_prefetch = src.index(".prefetch_all(") assert idx_turn_start < idx_prefetch, ( - "on_turn_start() must be called before prefetch_all() in run_conversation " + "on_turn_start() must be called before prefetch_all() in the turn prologue " "so that memory providers have the correct turn count for cadence checks" ) def test_on_turn_start_uses_user_turn_count(self): """Source-level check: on_turn_start receives the user_turn_count.""" import inspect - from agent.conversation_loop import run_conversation as _rc - src = inspect.getsource(_rc) + from agent.turn_context import build_turn_context as _btc + src = inspect.getsource(_btc) # The extracted body uses ``agent.X`` rather than ``self.X``; # assert the extracted-form spelling directly. assert "on_turn_start(agent._user_turn_count" in src