refactor(agent): extract run_conversation prologue into agent/turn_context.py

Phase 1 of the god-file decomposition plan. run_conversation's ~470-line
once-per-turn setup block (stdio guarding, retry-counter resets, user-message
sanitization, todo/nudge hydration, system-prompt restore-or-build,
crash-resilience persistence, preflight compression, the pre_llm_call hook, and
external-memory prefetch) is moved verbatim into build_turn_context(), which
returns a TurnContext dataclass the loop unpacks.

Behavior-neutral move-and-name refactor: the builder mutates `agent` exactly as
the inline code did; only the locals the loop reads back are returned.

- run_conversation: 4602 -> 4217 LOC (-385)
- agent/conversation_loop.py: 4965 -> ~4580 LOC
- new agent/turn_context.py: focused, dependency-injected, unit-tested in isolation

Tests: tests/run_agent/ 1570 passed / 0 failed under per-file process isolation.
Relocation follow-ups: 413_compression mocks now patch both module references;
nudge/on_turn_start source-inspection guards point at the extracted module.
This commit is contained in:
teknium1 2026-06-07 20:55:46 -07:00 committed by Teknium
parent 86c537d209
commit 54870847cb
6 changed files with 648 additions and 446 deletions

View file

@ -31,6 +31,7 @@ from agent.codex_responses_adapter import _summarize_user_message_for_log
from agent.display import KawaiiSpinner
from agent.error_classifier import FailoverReason, classify_api_error
from agent.iteration_budget import IterationBudget
from agent.turn_context import build_turn_context
from agent.memory_manager import build_memory_context_block
from agent.message_sanitization import (
_repair_tool_call_arguments,
@ -389,376 +390,43 @@ def run_conversation(
Returns:
Dict: Complete conversation result with final response and message history
"""
# Guard stdio against OSError from broken pipes (systemd/headless/daemon).
# Installed once, transparent when streams are healthy, prevents crash on write.
_install_safe_stdio()
agent._ensure_db_session()
# Tell auxiliary_client what the live main provider/model are for
# this turn. Used by tools whose behaviour depends on the active
# main model (e.g. vision_analyze's native fast path) so they see
# the CLI/gateway override instead of the stale config.yaml
# default. Idempotent — fine to call every turn.
try:
from agent.auxiliary_client import set_runtime_main
set_runtime_main(
getattr(agent, "provider", "") or "",
getattr(agent, "model", "") or "",
base_url=getattr(agent, "base_url", "") or "",
api_key=getattr(agent, "api_key", "") or "",
api_mode=getattr(agent, "api_mode", "") or "",
)
except Exception:
pass
# Tag all log records on this thread with the session ID so
# ``hermes logs --session <id>`` can filter a single conversation.
set_session_context(agent.session_id)
# Bind the skill write-origin ContextVar for this thread so tool
# handlers (e.g. skill_manage create) can tell whether they are
# running inside the background agent-improvement review fork vs.
# a foreground user-directed turn. Set at the top of each call;
# the review fork runs on its own thread with a fresh context,
# so the foreground value here does not leak into it.
set_current_write_origin(getattr(agent, "_memory_write_origin", "assistant_tool"))
# If the previous turn activated fallback, restore the primary
# runtime so this turn gets a fresh attempt with the preferred model.
# No-op when _fallback_activated is False (gateway, first turn, etc.).
agent._restore_primary_runtime()
# Sanitize surrogate characters from user input. Clipboard paste from
# rich-text editors (Google Docs, Word, etc.) can inject lone surrogates
# that are invalid UTF-8 and crash JSON serialization in the OpenAI SDK.
if isinstance(user_message, str):
user_message = _sanitize_surrogates(user_message)
if isinstance(persist_user_message, str):
persist_user_message = _sanitize_surrogates(persist_user_message)
# Store stream callback for _interruptible_api_call to pick up
agent._stream_callback = stream_callback
agent._persist_user_message_idx = None
agent._persist_user_message_override = persist_user_message
# Generate unique task_id if not provided to isolate VMs between concurrent tasks
effective_task_id = task_id or str(uuid.uuid4())
# Expose the active task_id so tools running mid-turn (e.g. delegate_task
# in delegate_tool.py) can identify this agent for the cross-agent file
# state registry. Set BEFORE any tool dispatch so snapshots taken at
# child-launch time see the parent's real id, not None.
agent._current_task_id = effective_task_id
turn_id = f"{agent.session_id or 'session'}:{effective_task_id}:{uuid.uuid4().hex[:8]}"
agent._current_turn_id = turn_id
agent._current_api_request_id = ""
# Reset retry counters and iteration budget at the start of each turn
# so subagent usage from a previous turn doesn't eat into the next one.
agent._invalid_tool_retries = 0
agent._invalid_json_retries = 0
agent._empty_content_retries = 0
agent._incomplete_scratchpad_retries = 0
agent._codex_incomplete_retries = 0
agent._thinking_prefill_retries = 0
agent._post_tool_empty_retried = False
agent._last_content_with_tools = None
agent._last_content_tools_all_housekeeping = False
agent._mute_post_response = False
agent._unicode_sanitization_passes = 0
agent._tool_guardrails.reset_for_turn()
agent._tool_guardrail_halt_decision = None
# True until the server rejects an image_url content part with an error
# like "Only 'text' content type is supported." Set to False on first
# rejection and kept False for the rest of the session so we never re-send
# images to a text-only endpoint. Scoped per `_run()` call, not per instance.
agent._vision_supported = True
# Pre-turn connection health check: detect and clean up dead TCP
# connections left over from provider outages or dropped streams.
# This prevents the next API call from hanging on a zombie socket.
if agent.api_mode != "anthropic_messages":
try:
if agent._cleanup_dead_connections():
agent._emit_status(
"🔌 Detected stale connections from a previous provider "
"issue — cleaned up automatically. Proceeding with fresh "
"connection."
)
except Exception:
pass
# Replay compression warning through status_callback for gateway
# platforms (the callback was not wired during __init__).
if agent._compression_warning:
agent._replay_compression_warning()
agent._compression_warning = None # send once
# NOTE: _turns_since_memory and _iters_since_skill are NOT reset here.
# They are initialized in __init__ and must persist across run_conversation
# calls so that nudge logic accumulates correctly in CLI mode.
agent.iteration_budget = IterationBudget(agent.max_iterations)
# Log conversation turn start for debugging/observability
_preview_text = _summarize_user_message_for_log(user_message)
_msg_preview = (_preview_text[:80] + "...") if len(_preview_text) > 80 else _preview_text
_msg_preview = _msg_preview.replace("\n", " ")
logger.info(
"conversation turn: session=%s model=%s provider=%s platform=%s history=%d msg=%r",
agent.session_id or "none", agent.model, agent.provider or "unknown",
agent.platform or "unknown", len(conversation_history or []),
_msg_preview,
# ── Per-turn setup (the prologue) ──
# All once-per-turn setup — stdio guarding, retry-counter resets, user
# message sanitization, todo/nudge hydration, system-prompt restore-or-
# build, crash-resilience persistence, preflight compression, the
# ``pre_llm_call`` plugin hook, and external-memory prefetch — lives in
# ``build_turn_context``. It mutates ``agent`` exactly as the inline code
# did and returns the locals the loop below reads back. See
# ``agent/turn_context.py``.
_ctx = build_turn_context(
agent,
user_message,
system_message,
conversation_history,
task_id,
stream_callback,
persist_user_message,
restore_or_build_system_prompt=_restore_or_build_system_prompt,
install_safe_stdio=_install_safe_stdio,
sanitize_surrogates=_sanitize_surrogates,
summarize_user_message_for_log=_summarize_user_message_for_log,
set_session_context=set_session_context,
set_current_write_origin=set_current_write_origin,
ra=_ra,
)
user_message = _ctx.user_message
original_user_message = _ctx.original_user_message
messages = _ctx.messages
conversation_history = _ctx.conversation_history
active_system_prompt = _ctx.active_system_prompt
effective_task_id = _ctx.effective_task_id
turn_id = _ctx.turn_id
current_turn_user_idx = _ctx.current_turn_user_idx
_should_review_memory = _ctx.should_review_memory
_plugin_user_context = _ctx.plugin_user_context
_ext_prefetch_cache = _ctx.ext_prefetch_cache
# Initialize conversation (copy to avoid mutating the caller's list)
messages = list(conversation_history) if conversation_history else []
# Hydrate todo store from conversation history (gateway creates a fresh
# AIAgent per message, so the in-memory store is empty -- we need to
# recover the todo state from the most recent todo tool response in history)
if conversation_history and not agent._todo_store.has_items():
agent._hydrate_todo_store(conversation_history)
# Hydrate per-session nudge counters from persisted history.
# Gateway creates a fresh AIAgent per inbound message (cache miss /
# 1h idle eviction / config-signature mismatch / process restart), so
# _turns_since_memory and _user_turn_count start at 0 every turn and
# the memory.nudge_interval trigger may never be reached. Reconstruct
# an effective count from prior user turns in conversation_history.
# Idempotent: a cached agent that already accumulated counters keeps
# them; only a freshly-built agent with empty in-memory state hydrates.
# See issue #22357.
if conversation_history and agent._user_turn_count == 0:
prior_user_turns = sum(
1 for m in conversation_history if m.get("role") == "user"
)
if prior_user_turns > 0:
agent._user_turn_count = prior_user_turns
if agent._memory_nudge_interval > 0 and agent._turns_since_memory == 0:
# % preserves original 1-in-N cadence rather than firing a
# review immediately on resume (which would surprise users
# whose session happened to land just past a multiple of N).
agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval
# Prefill messages (few-shot priming) are injected at API-call time only,
# never stored in the messages list. This keeps them ephemeral: they won't
# be saved to session DB, session logs, or batch trajectories, but they're
# automatically re-applied on every API call (including session continuations).
# Track user turns for memory flush and periodic nudge logic
agent._user_turn_count += 1
# Reset the streaming context scrubber at the top of each turn so a
# hung span from a prior interrupted stream can't taint this turn's
# output.
scrubber = getattr(agent, "_stream_context_scrubber", None)
if scrubber is not None:
scrubber.reset()
# Reset the think scrubber for the same reason — an interrupted
# prior stream may have left us inside an unterminated block.
think_scrubber = getattr(agent, "_stream_think_scrubber", None)
if think_scrubber is not None:
think_scrubber.reset()
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message
# Track memory nudge trigger (turn-based, checked here).
# Skill trigger is checked AFTER the agent loop completes, based on
# how many tool iterations THIS turn used.
_should_review_memory = False
if (agent._memory_nudge_interval > 0
and "memory" in agent.valid_tool_names
and agent._memory_store):
agent._turns_since_memory += 1
if agent._turns_since_memory >= agent._memory_nudge_interval:
_should_review_memory = True
agent._turns_since_memory = 0
# Add user message
user_msg = {"role": "user", "content": user_message}
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1
agent._persist_user_message_idx = current_turn_user_idx
if not agent.quiet_mode:
_print_preview = _summarize_user_message_for_log(user_message)
agent._safe_print(f"💬 Starting conversation: '{_print_preview[:60]}{'...' if len(_print_preview) > 60 else ''}'")
# ── System prompt (cached per session for prefix caching) ──
# Built once on first call, reused for all subsequent calls.
# Only rebuilt after context compression events (which invalidate
# the cache and reload memory from disk).
#
# For continuing sessions (gateway creates a fresh AIAgent per
# message), we load the stored system prompt from the session DB
# instead of rebuilding. Rebuilding would pick up memory changes
# from disk that the model already knows about (it wrote them!),
# producing a different system prompt and breaking the Anthropic
# prefix cache.
if agent._cached_system_prompt is None:
_restore_or_build_system_prompt(agent, system_message, conversation_history)
active_system_prompt = agent._cached_system_prompt
# Crash-resilience: persist the inbound user turn as soon as the session row
# has a valid system prompt, before any provider call or tool execution can
# hang/kill the process. The normal end-of-turn persist still runs later;
# _last_flushed_db_idx makes this idempotent and prevents duplicate rows.
try:
agent._persist_session(messages, conversation_history)
except Exception:
logger.warning(
"Early turn-start session persistence failed for session=%s",
agent.session_id or "none",
exc_info=True,
)
# ── Preflight context compression ──
# Before entering the main loop, check if the loaded conversation
# history already exceeds the model's context threshold. This handles
# cases where a user switches to a model with a smaller context window
# while having a large existing session — compress proactively rather
# than waiting for an API error (which might be caught as a non-retryable
# 4xx and abort the request entirely).
if (
agent.compression_enabled
and len(messages) > agent.context_compressor.protect_first_n
+ agent.context_compressor.protect_last_n + 1
):
# Include tool schema tokens — with many tools these can add
# 20-30K+ tokens that the old sys+msg estimate missed entirely.
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=agent.tools or None,
)
_compressor = agent.context_compressor
_defer_preflight = getattr(
_compressor,
"should_defer_preflight_to_real_usage",
lambda _tokens: False,
)
_preflight_deferred = _defer_preflight(_preflight_tokens)
if not _preflight_deferred:
# Keep the CLI/ACP context display in sync with what preflight
# actually measured. The status bar reads
# ``compressor.last_prompt_tokens``, which otherwise only updates
# from a *successful* API response. When the conversation has grown
# since the last successful call — or when compression then fails
# (e.g. the auxiliary summary model times out) and no fresh usage
# arrives — the bar stays stuck at the old, smaller value while
# preflight reports a much larger number, looking out of sync.
# Seed it with the fresh estimate (only ever revising upward; a real
# ``update_from_response`` will correct it after the next API call).
# Skipped when deferring — a deferred estimate is known to over-count
# vs the last real provider prompt, so trusting it for the display
# would re-introduce the very desync we're avoiding.
_last = _compressor.last_prompt_tokens
# Do NOT overwrite the -1 sentinel. compress_context() sets
# last_prompt_tokens=-1 right after compression to mark "no real API
# usage yet". `(x or 0)` evaluates to -1 (truthy) for the sentinel,
# so the old comparison was always True and clobbered the sentinel
# with a schema-inflated rough estimate — re-triggering compression
# on the next turn (#36718). Treat any negative value as "no data".
if _last >= 0 and _preflight_tokens > _last:
_compressor.last_prompt_tokens = _preflight_tokens
if _preflight_deferred:
logger.info(
"Skipping preflight compression: rough estimate ~%s >= %s, "
"but last real provider prompt was %s after compression",
f"{_preflight_tokens:,}",
f"{_compressor.threshold_tokens:,}",
f"{_compressor.last_real_prompt_tokens:,}",
)
elif _compressor.should_compress(_preflight_tokens):
logger.info(
"Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)",
f"{_preflight_tokens:,}",
f"{_compressor.threshold_tokens:,}",
agent.model,
f"{_compressor.context_length:,}",
)
agent._emit_status(
f"📦 Preflight compression: ~{_preflight_tokens:,} tokens "
f">= {_compressor.threshold_tokens:,} threshold. "
"This may take a moment."
)
# May need multiple passes for very large sessions with small
# context windows (each pass summarises the middle N turns).
for _pass in range(3):
_orig_len = len(messages)
messages, active_system_prompt = agent._compress_context(
messages, system_message, approx_tokens=_preflight_tokens,
task_id=effective_task_id,
)
if len(messages) >= _orig_len:
break # Cannot compress further
# Compression created a new session — clear the history
# reference so _flush_messages_to_session_db writes ALL
# compressed messages to the new session's SQLite, not
# skipping them because conversation_history is still the
# pre-compression length.
conversation_history = None
# Fix: reset retry counters after compression so the model
# gets a fresh budget on the compressed context. Without
# this, pre-compression retries carry over and the model
# hits "(empty)" immediately after compression-induced
# context loss.
agent._empty_content_retries = 0
agent._thinking_prefill_retries = 0
agent._last_content_with_tools = None
agent._last_content_tools_all_housekeeping = False
agent._mute_post_response = False
# Re-estimate after compression
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=agent.tools or None,
)
if not _compressor.should_compress(_preflight_tokens):
break # Under threshold or anti-thrash guard stopped it
# Plugin hook: pre_llm_call
# Fired once per turn before the tool-calling loop. Plugins can
# return a dict with a ``context`` key (or a plain string) whose
# value is appended to the current turn's user message.
#
# Context is ALWAYS injected into the user message, never the
# system prompt. This preserves the prompt cache prefix — the
# system prompt stays identical across turns so cached tokens
# are reused. The system prompt is Hermes's territory; plugins
# contribute context alongside the user's input.
#
# All injected context is ephemeral (not persisted to session DB).
_plugin_user_context = ""
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
_pre_results = _invoke_hook(
"pre_llm_call",
session_id=agent.session_id,
task_id=effective_task_id,
turn_id=turn_id,
user_message=original_user_message,
conversation_history=list(messages),
is_first_turn=(not bool(conversation_history)),
model=agent.model,
platform=getattr(agent, "platform", None) or "",
sender_id=getattr(agent, "_user_id", None) or "",
)
_ctx_parts: list[str] = []
for r in _pre_results:
if isinstance(r, dict) and r.get("context"):
_ctx_parts.append(str(r["context"]))
elif isinstance(r, str) and r.strip():
_ctx_parts.append(r)
if _ctx_parts:
_plugin_user_context = "\n\n".join(_ctx_parts)
except Exception as exc:
logger.warning("pre_llm_call hook failed: %s", exc)
# Main conversation loop
# Main conversation loop counters (pure locals consumed by the loop below).
api_call_count = 0
final_response = None
interrupted = False
@ -770,53 +438,6 @@ def run_conversation(
compression_attempts = 0
_turn_exit_reason = "unknown" # Diagnostic: why the loop ended
# Per-turn file-mutation verifier state. Keyed by resolved path;
# each failed ``write_file`` / ``patch`` call records the error
# preview. Later successful writes to the same path remove the
# entry (the model recovered). At end-of-turn, any entries still
# present are surfaced in an advisory footer so the model cannot
# over-claim success while the file is actually unchanged on disk.
agent._turn_failed_file_mutations: Dict[str, Dict[str, Any]] = {}
# Record the execution thread so interrupt()/clear_interrupt() can
# scope the tool-level interrupt signal to THIS agent's thread only.
# Must be set before any thread-scoped interrupt syncing.
agent._execution_thread_id = threading.current_thread().ident
# Always clear stale per-thread state from a previous turn. If an
# interrupt arrived before startup finished, preserve it and bind it
# to this execution thread now instead of dropping it on the floor.
_ra()._set_interrupt(False, agent._execution_thread_id)
if agent._interrupt_requested:
_ra()._set_interrupt(True, agent._execution_thread_id)
agent._interrupt_thread_signal_pending = False
else:
agent._interrupt_message = None
agent._interrupt_thread_signal_pending = False
# Notify memory providers of the new turn so cadence tracking works.
# Must happen BEFORE prefetch_all() so providers know which turn it is
# and can gate context/dialectic refresh via contextCadence/dialecticCadence.
if agent._memory_manager:
try:
_turn_msg = original_user_message if isinstance(original_user_message, str) else ""
agent._memory_manager.on_turn_start(agent._user_turn_count, _turn_msg)
except Exception:
pass
# External memory provider: prefetch once before the tool loop.
# Reuse the cached result on every iteration to avoid re-calling
# prefetch_all() on each tool call (10 tool calls = 10x latency + cost).
# Use original_user_message (clean input) — user_message may contain
# injected skill content that bloats / breaks provider queries.
_ext_prefetch_cache = ""
if agent._memory_manager:
try:
_query = original_user_message if isinstance(original_user_message, str) else ""
_ext_prefetch_cache = agent._memory_manager.prefetch_all(_query) or ""
except Exception:
pass
# Optional opt-in runtime: if api_mode == codex_app_server, hand the
# turn to the codex app-server subprocess (terminal/file ops/patching
# all run inside Codex). Default Hermes path is bypassed entirely.

388
agent/turn_context.py Normal file
View file

@ -0,0 +1,388 @@
"""Per-turn setup for ``run_conversation`` (the turn prologue).
``run_conversation`` opened with ~470 lines of straight-line setup before the
tool-calling loop ever started: stdio guarding, runtime-main wiring, retry-counter
resets, user-message sanitization, todo/nudge-counter hydration, system-prompt
restore-or-build, crash-resilience persistence, preflight context compression, the
``pre_llm_call`` plugin hook, and external-memory prefetch.
All of that is *prologue* it runs once per turn, has no back-references into the
loop, and produces a fixed set of values the loop then consumes. ``TurnContext``
captures those produced values; ``build_turn_context`` performs the setup work and
returns one. ``run_conversation`` is left to unpack the context and run the loop,
shrinking the orchestrator by the full prologue.
The builder still mutates ``agent`` heavily (counters, thread id, cached prompt,
session DB) exactly as the inline code did those side effects are the point. The
``TurnContext`` it returns carries only the *locals* the loop reads back.
Behavior is identical to the original inline prologue; this is a pure
move-and-name refactor with no semantic change.
"""
from __future__ import annotations
import logging
import threading
import uuid
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from agent.iteration_budget import IterationBudget
from agent.model_metadata import estimate_request_tokens_rough
logger = logging.getLogger(__name__)
@dataclass
class TurnContext:
"""Values produced by the turn prologue and consumed by the turn loop."""
# Sanitized inbound message (surrogates stripped).
user_message: str
# Clean message preserved for transcripts / memory queries (no nudge injection).
original_user_message: Any
# Working message list for this turn (loop appends to it).
messages: List[Dict[str, Any]]
# May be reset to None by preflight compression (new session created).
conversation_history: Optional[List[Dict[str, Any]]]
# Cached system prompt active for this turn (may be rebuilt by compression).
active_system_prompt: Optional[str]
# Task / turn identifiers.
effective_task_id: str
turn_id: str
# Index of the current user turn within ``messages``.
current_turn_user_idx: int
# Whether the post-turn memory review should fire.
should_review_memory: bool = False
# Context contributed by ``pre_llm_call`` plugins (appended to user message).
plugin_user_context: str = ""
# External-memory prefetch result, reused across loop iterations.
ext_prefetch_cache: str = ""
def build_turn_context(
agent,
user_message: str,
system_message: Optional[str],
conversation_history: Optional[List[Dict[str, Any]]],
task_id: Optional[str],
stream_callback,
persist_user_message: Optional[str],
*,
restore_or_build_system_prompt,
install_safe_stdio,
sanitize_surrogates,
summarize_user_message_for_log,
set_session_context,
set_current_write_origin,
ra,
) -> TurnContext:
"""Run the once-per-turn setup and return the loop's input context.
The callables/helpers the original prologue referenced from the
``conversation_loop`` module are passed in explicitly to keep this module
free of an import cycle with ``agent.conversation_loop``.
"""
# Guard stdio against OSError from broken pipes (systemd/headless/daemon).
install_safe_stdio()
agent._ensure_db_session()
# Tell auxiliary_client what the live main provider/model are for this turn.
try:
from agent.auxiliary_client import set_runtime_main
set_runtime_main(
getattr(agent, "provider", "") or "",
getattr(agent, "model", "") or "",
base_url=getattr(agent, "base_url", "") or "",
api_key=getattr(agent, "api_key", "") or "",
api_mode=getattr(agent, "api_mode", "") or "",
)
except Exception:
pass
# Tag log records on this thread with the session ID for ``hermes logs``.
set_session_context(agent.session_id)
# Bind the skill write-origin ContextVar for this thread.
set_current_write_origin(getattr(agent, "_memory_write_origin", "assistant_tool"))
# Restore the primary runtime if the previous turn activated fallback.
agent._restore_primary_runtime()
# Sanitize surrogate characters from user input.
if isinstance(user_message, str):
user_message = sanitize_surrogates(user_message)
if isinstance(persist_user_message, str):
persist_user_message = sanitize_surrogates(persist_user_message)
# Store stream callback for _interruptible_api_call to pick up.
agent._stream_callback = stream_callback
agent._persist_user_message_idx = None
agent._persist_user_message_override = persist_user_message
# Generate unique task_id if not provided to isolate VMs between tasks.
effective_task_id = task_id or str(uuid.uuid4())
agent._current_task_id = effective_task_id
turn_id = f"{agent.session_id or 'session'}:{effective_task_id}:{uuid.uuid4().hex[:8]}"
agent._current_turn_id = turn_id
agent._current_api_request_id = ""
# Reset retry counters and iteration budget at the start of each turn.
agent._invalid_tool_retries = 0
agent._invalid_json_retries = 0
agent._empty_content_retries = 0
agent._incomplete_scratchpad_retries = 0
agent._codex_incomplete_retries = 0
agent._thinking_prefill_retries = 0
agent._post_tool_empty_retried = False
agent._last_content_with_tools = None
agent._last_content_tools_all_housekeeping = False
agent._mute_post_response = False
agent._unicode_sanitization_passes = 0
agent._tool_guardrails.reset_for_turn()
agent._tool_guardrail_halt_decision = None
agent._vision_supported = True
# Pre-turn connection health check: clean up dead TCP connections.
if agent.api_mode != "anthropic_messages":
try:
if agent._cleanup_dead_connections():
agent._emit_status(
"🔌 Detected stale connections from a previous provider "
"issue — cleaned up automatically. Proceeding with fresh "
"connection."
)
except Exception:
pass
# Replay compression warning through status_callback for gateway platforms.
if agent._compression_warning:
agent._replay_compression_warning()
agent._compression_warning = None # send once
# NOTE: _turns_since_memory and _iters_since_skill are NOT reset here.
agent.iteration_budget = IterationBudget(agent.max_iterations)
# Log conversation turn start for debugging/observability.
_preview_text = summarize_user_message_for_log(user_message)
_msg_preview = (_preview_text[:80] + "...") if len(_preview_text) > 80 else _preview_text
_msg_preview = _msg_preview.replace("\n", " ")
logger.info(
"conversation turn: session=%s model=%s provider=%s platform=%s history=%d msg=%r",
agent.session_id or "none", agent.model, agent.provider or "unknown",
agent.platform or "unknown", len(conversation_history or []),
_msg_preview,
)
# Initialize conversation (copy to avoid mutating the caller's list).
messages = list(conversation_history) if conversation_history else []
# Hydrate todo store from conversation history.
if conversation_history and not agent._todo_store.has_items():
agent._hydrate_todo_store(conversation_history)
# Hydrate per-session nudge counters from persisted history (issue #22357).
if conversation_history and agent._user_turn_count == 0:
prior_user_turns = sum(
1 for m in conversation_history if m.get("role") == "user"
)
if prior_user_turns > 0:
agent._user_turn_count = prior_user_turns
if agent._memory_nudge_interval > 0 and agent._turns_since_memory == 0:
agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval
# Track user turns for memory flush and periodic nudge logic.
agent._user_turn_count += 1
# Reset the streaming context scrubber at the top of each turn.
scrubber = getattr(agent, "_stream_context_scrubber", None)
if scrubber is not None:
scrubber.reset()
# Reset the think scrubber for the same reason.
think_scrubber = getattr(agent, "_stream_think_scrubber", None)
if think_scrubber is not None:
think_scrubber.reset()
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message
# Track memory nudge trigger (turn-based, checked here).
should_review_memory = False
if (agent._memory_nudge_interval > 0
and "memory" in agent.valid_tool_names
and agent._memory_store):
agent._turns_since_memory += 1
if agent._turns_since_memory >= agent._memory_nudge_interval:
should_review_memory = True
agent._turns_since_memory = 0
# Add user message.
user_msg = {"role": "user", "content": user_message}
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1
agent._persist_user_message_idx = current_turn_user_idx
if not agent.quiet_mode:
_print_preview = summarize_user_message_for_log(user_message)
agent._safe_print(
f"💬 Starting conversation: '{_print_preview[:60]}"
f"{'...' if len(_print_preview) > 60 else ''}'"
)
# ── System prompt (cached per session for prefix caching) ──
if agent._cached_system_prompt is None:
restore_or_build_system_prompt(agent, system_message, conversation_history)
active_system_prompt = agent._cached_system_prompt
# Crash-resilience: persist the inbound user turn as soon as the session row exists.
try:
agent._persist_session(messages, conversation_history)
except Exception:
logger.warning(
"Early turn-start session persistence failed for session=%s",
agent.session_id or "none",
exc_info=True,
)
# ── Preflight context compression ──
if (
agent.compression_enabled
and len(messages) > agent.context_compressor.protect_first_n
+ agent.context_compressor.protect_last_n + 1
):
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=agent.tools or None,
)
_compressor = agent.context_compressor
_defer_preflight = getattr(
_compressor,
"should_defer_preflight_to_real_usage",
lambda _tokens: False,
)
_preflight_deferred = _defer_preflight(_preflight_tokens)
if not _preflight_deferred:
_last = _compressor.last_prompt_tokens
# Do NOT overwrite the -1 sentinel (#36718).
if _last >= 0 and _preflight_tokens > _last:
_compressor.last_prompt_tokens = _preflight_tokens
if _preflight_deferred:
logger.info(
"Skipping preflight compression: rough estimate ~%s >= %s, "
"but last real provider prompt was %s after compression",
f"{_preflight_tokens:,}",
f"{_compressor.threshold_tokens:,}",
f"{_compressor.last_real_prompt_tokens:,}",
)
elif _compressor.should_compress(_preflight_tokens):
logger.info(
"Preflight compression: ~%s tokens >= %s threshold (model %s, ctx %s)",
f"{_preflight_tokens:,}",
f"{_compressor.threshold_tokens:,}",
agent.model,
f"{_compressor.context_length:,}",
)
agent._emit_status(
f"📦 Preflight compression: ~{_preflight_tokens:,} tokens "
f">= {_compressor.threshold_tokens:,} threshold. "
"This may take a moment."
)
for _pass in range(3):
_orig_len = len(messages)
messages, active_system_prompt = agent._compress_context(
messages, system_message, approx_tokens=_preflight_tokens,
task_id=effective_task_id,
)
if len(messages) >= _orig_len:
break # Cannot compress further
conversation_history = None
agent._empty_content_retries = 0
agent._thinking_prefill_retries = 0
agent._last_content_with_tools = None
agent._last_content_tools_all_housekeeping = False
agent._mute_post_response = False
_preflight_tokens = estimate_request_tokens_rough(
messages,
system_prompt=active_system_prompt or "",
tools=agent.tools or None,
)
if not _compressor.should_compress(_preflight_tokens):
break
# Plugin hook: pre_llm_call (context injected into user message, not system prompt).
plugin_user_context = ""
try:
from hermes_cli.plugins import invoke_hook as _invoke_hook
_pre_results = _invoke_hook(
"pre_llm_call",
session_id=agent.session_id,
task_id=effective_task_id,
turn_id=turn_id,
user_message=original_user_message,
conversation_history=list(messages),
is_first_turn=(not bool(conversation_history)),
model=agent.model,
platform=getattr(agent, "platform", None) or "",
sender_id=getattr(agent, "_user_id", None) or "",
)
_ctx_parts: list[str] = []
for r in _pre_results:
if isinstance(r, dict) and r.get("context"):
_ctx_parts.append(str(r["context"]))
elif isinstance(r, str) and r.strip():
_ctx_parts.append(r)
if _ctx_parts:
plugin_user_context = "\n\n".join(_ctx_parts)
except Exception as exc:
logger.warning("pre_llm_call hook failed: %s", exc)
# Per-turn file-mutation verifier state.
agent._turn_failed_file_mutations = {}
# Record the execution thread so interrupt()/clear_interrupt() can scope
# the tool-level interrupt signal to THIS agent's thread only.
agent._execution_thread_id = threading.current_thread().ident
# Clear stale per-thread interrupt state, preserving a pending interrupt.
ra()._set_interrupt(False, agent._execution_thread_id)
if agent._interrupt_requested:
ra()._set_interrupt(True, agent._execution_thread_id)
agent._interrupt_thread_signal_pending = False
else:
agent._interrupt_message = None
agent._interrupt_thread_signal_pending = False
# Notify memory providers of the new turn (BEFORE prefetch_all).
if agent._memory_manager:
try:
_turn_msg = original_user_message if isinstance(original_user_message, str) else ""
agent._memory_manager.on_turn_start(agent._user_turn_count, _turn_msg)
except Exception:
pass
# External memory provider: prefetch once before the tool loop.
ext_prefetch_cache = ""
if agent._memory_manager:
try:
_query = original_user_message if isinstance(original_user_message, str) else ""
ext_prefetch_cache = agent._memory_manager.prefetch_all(_query) or ""
except Exception:
pass
return TurnContext(
user_message=user_message,
original_user_message=original_user_message,
messages=messages,
conversation_history=conversation_history,
active_system_prompt=active_system_prompt,
effective_task_id=effective_task_id,
turn_id=turn_id,
current_turn_user_idx=current_turn_user_idx,
should_review_memory=should_review_memory,
plugin_user_context=plugin_user_context,
ext_prefetch_cache=ext_prefetch_cache,
)

View file

@ -0,0 +1,187 @@
"""Unit tests for the extracted turn prologue (``agent/turn_context.py``).
These exercise ``build_turn_context`` against a lightweight fake agent to
confirm the prologue produces the right ``TurnContext`` and applies the
``agent`` side effects the loop relies on without spinning up a real
``AIAgent`` or hitting any provider.
"""
from __future__ import annotations
import types
from unittest.mock import patch
import pytest
from agent.turn_context import TurnContext, build_turn_context
class _FakeTodoStore:
def has_items(self):
return True
def _hydrate(self, *_a, **_k):
pass
class _FakeGuardrails:
def __init__(self):
self.reset_called = False
def reset_for_turn(self):
self.reset_called = True
class _FakeAgent:
"""Minimal stand-in covering only what the prologue touches."""
def __init__(self):
self.session_id = "sess-1"
self.model = "test/model"
self.provider = "openrouter"
self.base_url = "https://openrouter.ai/api/v1"
self.api_key = "sk-x"
self.api_mode = "chat_completions"
self.platform = "cli"
self.quiet_mode = True
self.max_iterations = 90
self.tools = []
self.valid_tool_names = set()
self.compression_enabled = False
self.context_compressor = types.SimpleNamespace(
protect_first_n=2, protect_last_n=2
)
self._cached_system_prompt = "SYSTEM"
self._memory_store = None
self._memory_manager = None
self._memory_nudge_interval = 0
self._turns_since_memory = 0
self._user_turn_count = 0
self._todo_store = _FakeTodoStore()
self._tool_guardrails = _FakeGuardrails()
self._compression_warning = None
self._interrupt_requested = False
self._memory_write_origin = "assistant_tool"
self._stream_context_scrubber = None
self._stream_think_scrubber = None
# Attributes the prologue assigns; recorded for assertions.
self._invalid_tool_retries = -1
self._vision_supported = None
self._persist_calls = 0
# --- methods the prologue calls ---
def _ensure_db_session(self):
pass
def _restore_primary_runtime(self):
pass
def _cleanup_dead_connections(self):
return False
def _emit_status(self, _msg):
pass
def _replay_compression_warning(self):
pass
def _hydrate_todo_store(self, *_a, **_k):
pass
def _safe_print(self, *_a, **_k):
pass
def _persist_session(self, *_a, **_k):
self._persist_calls += 1
@pytest.fixture(autouse=True)
def _stub_runtime_main():
"""``build_turn_context`` calls ``auxiliary_client.set_runtime_main`` as a
production side effect (telling aux tools the live main provider/model).
That writes a module-level global these unit tests don't care about and
which would otherwise leak into sibling tests (e.g. provider-parity
resolution) when the per-test process isolation plugin is disabled. Stub
it out so the prologue tests stay hermetic.
"""
with patch("agent.auxiliary_client.set_runtime_main", lambda *a, **k: None):
yield
def _build(agent, **overrides):
kwargs = dict(
agent=agent,
user_message="hello",
system_message=None,
conversation_history=None,
task_id=None,
stream_callback=None,
persist_user_message=None,
restore_or_build_system_prompt=lambda *a, **k: None,
install_safe_stdio=lambda: None,
sanitize_surrogates=lambda s: s,
summarize_user_message_for_log=lambda s: s,
set_session_context=lambda _sid: None,
set_current_write_origin=lambda _o: None,
ra=lambda: types.SimpleNamespace(_set_interrupt=lambda *a, **k: None),
)
kwargs.update(overrides)
return build_turn_context(**kwargs)
def test_returns_turn_context_with_user_message_appended():
agent = _FakeAgent()
ctx = _build(agent)
assert isinstance(ctx, TurnContext)
assert ctx.user_message == "hello"
# The user turn was appended and indexed.
assert ctx.messages[-1] == {"role": "user", "content": "hello"}
assert ctx.current_turn_user_idx == len(ctx.messages) - 1
assert ctx.active_system_prompt == "SYSTEM"
def test_applies_agent_side_effects():
agent = _FakeAgent()
_build(agent)
# Retry counters reset, guardrails reset, vision re-armed, turn counted.
assert agent._invalid_tool_retries == 0
assert agent._tool_guardrails.reset_called is True
assert agent._vision_supported is True
assert agent._user_turn_count == 1
# Crash-resilience persistence fired once.
assert agent._persist_calls == 1
# task/turn ids assigned on the agent.
assert agent._current_task_id
assert agent._current_turn_id
def test_task_id_passthrough():
agent = _FakeAgent()
ctx = _build(agent, task_id="fixed-task")
assert ctx.effective_task_id == "fixed-task"
assert agent._current_task_id == "fixed-task"
def test_persist_user_message_becomes_original():
agent = _FakeAgent()
ctx = _build(agent, user_message="api-prefixed", persist_user_message="clean")
# original_user_message tracks the clean persist override.
assert ctx.original_user_message == "clean"
# but the appended user turn carries the full (sanitized) message.
assert ctx.messages[-1]["content"] == "api-prefixed"
def test_memory_nudge_fires_at_interval():
agent = _FakeAgent()
agent._memory_nudge_interval = 1
agent.valid_tool_names = {"memory"}
agent._memory_store = object()
ctx = _build(agent)
assert ctx.should_review_memory is True
assert agent._turns_since_memory == 0 # reset after firing
def test_no_review_when_memory_disabled():
agent = _FakeAgent()
ctx = _build(agent)
assert ctx.should_review_memory is False

View file

@ -553,6 +553,7 @@ class TestPreflightCompression:
agent.status_callback = lambda ev, msg: status_messages.append((ev, msg))
with (
patch("agent.turn_context.estimate_request_tokens_rough", return_value=114_000),
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=114_000),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
@ -604,6 +605,7 @@ class TestPreflightCompression:
return 125_000 if _rough_calls["n"] == 1 else 40_000
with (
patch("agent.turn_context.estimate_request_tokens_rough", side_effect=_rough_estimate),
patch("agent.conversation_loop.estimate_request_tokens_rough", side_effect=_rough_estimate),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
@ -728,6 +730,7 @@ class TestPreflightCompression:
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch("agent.turn_context.estimate_request_tokens_rough", return_value=144_669),
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669),
# Compression no-ops (returns input unchanged) — mirrors an aux
# summary-model timeout where the messages can't be reduced.
@ -760,6 +763,7 @@ class TestPreflightCompression:
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch("agent.turn_context.estimate_request_tokens_rough", return_value=144_669),
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669),
patch.object(agent, "_compress_context", side_effect=lambda msgs, *a, **k: (msgs, agent._cached_system_prompt)),
patch.object(agent, "_persist_session"),

View file

@ -117,25 +117,29 @@ def test_assistant_only_history_does_not_advance_user_turn_count():
def test_production_code_contains_hydration_block():
"""Smoke test: confirm the hydration code is actually wired into
run_conversation(). If someone deletes it, tests above still pass
against the inline replica this fails them awake.
"""Smoke test: confirm the hydration code is actually wired into the
turn path. If someone deletes it, tests above still pass against the
inline replica this fails them awake.
After the run_agent.py refactor the agent-loop body lives in
``agent/conversation_loop.py`` and uses ``agent.X`` rather than
``self.X``. Assert the block is present in the extracted module
specifically if it ever drifts back into run_agent.py or
disappears entirely, this guard fails loudly.
The agent-loop prologue now lives in ``agent/turn_context.py``
(``build_turn_context``), with the loop body in
``agent/conversation_loop.py``. Assert the block is present in the
turn subsystem if it disappears entirely, this guard fails loudly.
Either module counts so the guard tolerates legitimate relocation
within the turn subsystem.
"""
from pathlib import Path
repo = Path(__file__).resolve().parents[2]
cl_path = repo / "agent" / "conversation_loop.py"
src_cl = cl_path.read_text(encoding="utf-8")
turn_src = "".join(
(repo / "agent" / name).read_text(encoding="utf-8")
for name in ("conversation_loop.py", "turn_context.py")
)
# Anchor on the unique comment + the modulo line.
assert "Hydrate per-session nudge counters from persisted history" in src_cl, (
f"Hydration comment missing from {cl_path}"
assert "Hydrate per-session nudge counters from persisted history" in turn_src, (
"Hydration comment missing from the turn subsystem "
"(conversation_loop.py / turn_context.py)"
)
assert (
"agent._turns_since_memory = prior_user_turns % agent._memory_nudge_interval"
in src_cl
), f"Hydration modulo assignment missing from {cl_path}"
in turn_src
), "Hydration modulo assignment missing from the turn subsystem"

View file

@ -6393,18 +6393,16 @@ class TestMemoryNudgeCounterPersistence:
assert a._iters_since_skill == 0
def test_counters_not_reset_in_preamble(self):
"""The run_conversation preamble must not zero the nudge counters."""
"""The turn preamble must not zero the nudge counters."""
import inspect
from agent.conversation_loop import run_conversation as _rc
src = inspect.getsource(_rc)
# The preamble resets many fields (retry counts, budget, etc.)
# before the main loop. Find that reset block and verify our
# counters aren't in it. The reset block ends at iteration_budget.
# The extracted body uses ``agent.X`` (not ``self.X``). Anchor
# exactly on ``agent.iteration_budget = IterationBudget`` so an
# unrelated identifier ending in ``iteration_budget`` (e.g.
# ``_iteration_budget`` or ``shared_iteration_budget``) can't
# match the boundary.
from agent.turn_context import build_turn_context as _btc
src = inspect.getsource(_btc)
# The preamble (now in build_turn_context) resets many fields (retry
# counts, budget, etc.) before returning. Find that reset block and
# verify our counters aren't in it. The reset block ends at
# iteration_budget. Anchor exactly on
# ``agent.iteration_budget = IterationBudget`` so an unrelated
# identifier ending in ``iteration_budget`` can't match the boundary.
preamble_end = src.index("agent.iteration_budget = IterationBudget")
preamble = src[:preamble_end]
assert "agent._turns_since_memory = 0" not in preamble
@ -6490,23 +6488,23 @@ class TestMemoryProviderTurnStart:
"""
def test_on_turn_start_called_before_prefetch(self):
"""Source-level check: on_turn_start appears before prefetch_all in run_conversation."""
"""Source-level check: on_turn_start appears before prefetch_all in the prologue."""
import inspect
from agent.conversation_loop import run_conversation as _rc
src = inspect.getsource(_rc)
from agent.turn_context import build_turn_context as _btc
src = inspect.getsource(_btc)
# Find the actual method calls, not comments
idx_turn_start = src.index(".on_turn_start(")
idx_prefetch = src.index(".prefetch_all(")
assert idx_turn_start < idx_prefetch, (
"on_turn_start() must be called before prefetch_all() in run_conversation "
"on_turn_start() must be called before prefetch_all() in the turn prologue "
"so that memory providers have the correct turn count for cadence checks"
)
def test_on_turn_start_uses_user_turn_count(self):
"""Source-level check: on_turn_start receives the user_turn_count."""
import inspect
from agent.conversation_loop import run_conversation as _rc
src = inspect.getsource(_rc)
from agent.turn_context import build_turn_context as _btc
src = inspect.getsource(_btc)
# The extracted body uses ``agent.X`` rather than ``self.X``;
# assert the extracted-form spelling directly.
assert "on_turn_start(agent._user_turn_count" in src