From c42fa94afc39c7caca15cdb7b951cc338a4587f0 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 19:03:30 -0700 Subject: [PATCH] refactor(run_agent): extract Codex runtime + assorted helpers to dedicated modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new modules: * agent/codex_runtime.py — three Codex API-mode methods - run_codex_app_server_turn (148 LOC) — Codex CLI subprocess driver - run_codex_stream (125 LOC) — Codex Responses API stream - run_codex_create_stream_fallback (78 LOC) — fallback after Responses stream=true initial create failure * agent/agent_runtime_helpers.py — twelve assorted AIAgent helpers totalling ~1,166 LOC: convert_to_trajectory_format, sanitize_tool_call_arguments (static), repair_message_sequence, strip_think_blocks, recover_with_credential_pool, try_recover_primary_transport, drop_thinking_only_and_merge_users (static), restore_primary_runtime, extract_reasoning, dump_api_request_debug, anthropic_prompt_cache_policy, create_openai_client AIAgent keeps thin forwarder methods for all 15 (preserving @staticmethod where needed). Symbols tests patch on run_agent (OpenAI, AIAgent class attrs) are routed through _ra() to honor the patch contract. The _TRANSIENT_TRANSPORT_ERRORS frozenset moves with try_recover_primary_transport and is referenced as a module-level constant in the extracted code. tests/run_agent/ + tests/agent/: 4313 passed (same pre-existing test_auxiliary_client failure). run_agent.py: 11391 -> 9887 lines (-1504). --- agent/agent_runtime_helpers.py | 1260 +++++++++++++++++++++++++++ agent/codex_runtime.py | 400 +++++++++ run_agent.py | 1495 +------------------------------- 3 files changed, 1705 insertions(+), 1450 deletions(-) create mode 100644 agent/agent_runtime_helpers.py create mode 100644 agent/codex_runtime.py diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py new file mode 100644 index 00000000000..4efe5203421 --- /dev/null +++ b/agent/agent_runtime_helpers.py @@ -0,0 +1,1260 @@ +"""Assorted AIAgent runtime helpers — moved out of run_agent.py for clarity. + +Each function takes the parent ``AIAgent`` as its first argument +(``agent``) except for the static helpers (``sanitize_tool_call_arguments``, +``drop_thinking_only_and_merge_users``) which are stateless. AIAgent +keeps thin forwarders for backward compatibility. + +Methods covered: +* ``convert_to_trajectory_format`` — internal -> trajectory-file format +* ``sanitize_tool_call_arguments`` — repair corrupted JSON in tool_calls +* ``repair_message_sequence`` — enforce alternation invariants +* ``strip_think_blocks`` — remove inline reasoning from stored content +* ``recover_with_credential_pool`` — rotate pool entries on 429 +* ``try_recover_primary_transport`` — re-create OpenAI client after rate-limit +* ``drop_thinking_only_and_merge_users`` — Anthropic-style cleanup +* ``restore_primary_runtime`` — un-do fallback activation +* ``extract_reasoning`` — pull reasoning fields out of API responses +* ``dump_api_request_debug`` — write request body for post-mortem +* ``anthropic_prompt_cache_policy`` — compute cache_control breakpoints +* ``create_openai_client`` — build the per-agent OpenAI SDK client +""" + +from __future__ import annotations + +import copy +import json +import logging +import os +import re +import threading +import time +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from hermes_cli.timeouts import get_provider_request_timeout +from agent.message_sanitization import ( + _repair_tool_call_arguments, + _sanitize_surrogates, +) +from agent.tool_dispatch_helpers import _trajectory_normalize_msg +from agent.trajectory import convert_scratchpad_to_think +from agent.error_classifier import classify_api_error, FailoverReason +from utils import base_url_host_matches, base_url_hostname, env_var_enabled, atomic_json_write + +logger = logging.getLogger(__name__) + + +def _ra(): + """Lazy ``run_agent`` reference for test-patch routing.""" + import run_agent + return run_agent + + + +def convert_to_trajectory_format(agent, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]: + """ + Convert internal message format to trajectory format for saving. + + Args: + messages (List[Dict]): Internal message history + user_query (str): Original user query + completed (bool): Whether the conversation completed successfully + + Returns: + List[Dict]: Messages in trajectory format + """ + # Normalize multimodal tool results — trajectories are text-only, so + # replace image-bearing tool messages with their text_summary to avoid + # embedding ~1MB base64 blobs into every saved trajectory. + messages = [_trajectory_normalize_msg(m) for m in messages] + trajectory = [] + + # Add system message with tool definitions + system_msg = ( + "You are a function calling AI model. You are provided with function signatures within XML tags. " + "You may call one or more functions to assist with the user query. If available tools are not relevant in assisting " + "with user query, just respond in natural conversational language. Don't make assumptions about what values to plug " + "into functions. After calling & executing the functions, you will be provided with function results within " + " XML tags. Here are the available tools:\n" + f"\n{agent._format_tools_for_system_message()}\n\n" + "For each function call return a JSON object, with the following pydantic model json schema for each:\n" + "{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, " + "'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n" + "Each function call should be enclosed within XML tags.\n" + "Example:\n\n{'name': ,'arguments': }\n" + ) + + trajectory.append({ + "from": "system", + "value": system_msg + }) + + # Add the actual user prompt (from the dataset) as the first human message + trajectory.append({ + "from": "human", + "value": user_query + }) + + # Skip the first message (the user query) since we already added it above. + # Prefill messages are injected at API-call time only (not in the messages + # list), so no offset adjustment is needed here. + i = 1 + + while i < len(messages): + msg = messages[i] + + if msg["role"] == "assistant": + # Check if this message has tool calls + if "tool_calls" in msg and msg["tool_calls"]: + # Format assistant message with tool calls + # Add tags around reasoning for trajectory storage + content = "" + + # Prepend reasoning in tags if available (native thinking tokens) + if msg.get("reasoning") and msg["reasoning"].strip(): + content = f"\n{msg['reasoning']}\n\n" + + if msg.get("content") and msg["content"].strip(): + # Convert any tags to tags + # (used when native thinking is disabled and model reasons via XML) + content += convert_scratchpad_to_think(msg["content"]) + "\n" + + # Add tool calls wrapped in XML tags + for tool_call in msg["tool_calls"]: + if not tool_call or not isinstance(tool_call, dict): continue + # Parse arguments - should always succeed since we validate during conversation + # but keep try-except as safety net + try: + arguments = json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"] + except json.JSONDecodeError: + # This shouldn't happen since we validate and retry during conversation, + # but if it does, log warning and use empty dict + logging.warning(f"Unexpected invalid JSON in trajectory conversion: {tool_call['function']['arguments'][:100]}") + arguments = {} + + tool_call_json = { + "name": tool_call["function"]["name"], + "arguments": arguments + } + content += f"\n{json.dumps(tool_call_json, ensure_ascii=False)}\n\n" + + # Ensure every gpt turn has a block (empty if no reasoning) + # so the format is consistent for training data + if "" not in content: + content = "\n\n" + content + + trajectory.append({ + "from": "gpt", + "value": content.rstrip() + }) + + # Collect all subsequent tool responses + tool_responses = [] + j = i + 1 + while j < len(messages) and messages[j]["role"] == "tool": + tool_msg = messages[j] + # Format tool response with XML tags + tool_response = "\n" + + # Try to parse tool content as JSON if it looks like JSON + tool_content = tool_msg["content"] + try: + if tool_content.strip().startswith(("{", "[")): + tool_content = json.loads(tool_content) + except (json.JSONDecodeError, AttributeError): + pass # Keep as string if not valid JSON + + tool_index = len(tool_responses) + tool_name = ( + msg["tool_calls"][tool_index]["function"]["name"] + if tool_index < len(msg["tool_calls"]) + else "unknown" + ) + tool_response += json.dumps({ + "tool_call_id": tool_msg.get("tool_call_id", ""), + "name": tool_name, + "content": tool_content + }, ensure_ascii=False) + tool_response += "\n" + tool_responses.append(tool_response) + j += 1 + + # Add all tool responses as a single message + if tool_responses: + trajectory.append({ + "from": "tool", + "value": "\n".join(tool_responses) + }) + i = j - 1 # Skip the tool messages we just processed + + else: + # Regular assistant message without tool calls + # Add tags around reasoning for trajectory storage + content = "" + + # Prepend reasoning in tags if available (native thinking tokens) + if msg.get("reasoning") and msg["reasoning"].strip(): + content = f"\n{msg['reasoning']}\n\n" + + # Convert any tags to tags + # (used when native thinking is disabled and model reasons via XML) + raw_content = msg["content"] or "" + content += convert_scratchpad_to_think(raw_content) + + # Ensure every gpt turn has a block (empty if no reasoning) + if "" not in content: + content = "\n\n" + content + + trajectory.append({ + "from": "gpt", + "value": content.strip() + }) + + elif msg["role"] == "user": + trajectory.append({ + "from": "human", + "value": msg["content"] + }) + + i += 1 + + return trajectory + + + +def sanitize_tool_call_arguments( + messages: list, + *, + logger=None, + session_id: str = None, +) -> int: + """Repair corrupted assistant tool-call argument JSON in-place.""" + log = logger or logging.getLogger(__name__) + if not isinstance(messages, list): + return 0 + + repaired = 0 + marker = _ra().AIAgent._TOOL_CALL_ARGUMENTS_CORRUPTION_MARKER + + def _prepend_marker(tool_msg: dict) -> None: + existing = tool_msg.get("content") + if isinstance(existing, str): + if not existing: + tool_msg["content"] = marker + elif not existing.startswith(marker): + tool_msg["content"] = f"{marker}\n{existing}" + return + if existing is None: + tool_msg["content"] = marker + return + try: + existing_text = json.dumps(existing) + except TypeError: + existing_text = str(existing) + tool_msg["content"] = f"{marker}\n{existing_text}" + + message_index = 0 + while message_index < len(messages): + msg = messages[message_index] + if not isinstance(msg, dict) or msg.get("role") != "assistant": + message_index += 1 + continue + + tool_calls = msg.get("tool_calls") + if not isinstance(tool_calls, list) or not tool_calls: + message_index += 1 + continue + + insert_at = message_index + 1 + for tool_call in tool_calls: + if not isinstance(tool_call, dict): + continue + function = tool_call.get("function") + if not isinstance(function, dict): + continue + + arguments = function.get("arguments") + if arguments is None or arguments == "": + function["arguments"] = "{}" + continue + if isinstance(arguments, str) and not arguments.strip(): + function["arguments"] = "{}" + continue + if not isinstance(arguments, str): + continue + + try: + json.loads(arguments) + except json.JSONDecodeError: + tool_call_id = tool_call.get("id") + function_name = function.get("name", "?") + preview = arguments[:80] + log.warning( + "Corrupted tool_call arguments repaired before request " + "(session=%s, message_index=%s, tool_call_id=%s, function=%s, preview=%r)", + session_id or "-", + message_index, + tool_call_id or "-", + function_name, + preview, + ) + function["arguments"] = "{}" + + existing_tool_msg = None + scan_index = message_index + 1 + while scan_index < len(messages): + candidate = messages[scan_index] + if not isinstance(candidate, dict) or candidate.get("role") != "tool": + break + if candidate.get("tool_call_id") == tool_call_id: + existing_tool_msg = candidate + break + scan_index += 1 + + if existing_tool_msg is None: + messages.insert( + insert_at, + { + "role": "tool", + "name": function_name if function_name != "?" else "", + "tool_call_id": tool_call_id, + "content": marker, + }, + ) + insert_at += 1 + else: + _prepend_marker(existing_tool_msg) + + repaired += 1 + + message_index += 1 + + return repaired + + + +def repair_message_sequence(agent, messages: List[Dict]) -> int: + """Collapse malformed role-alternation left in the live history. + + Providers (OpenAI, OpenRouter, Anthropic) expect strict alternation: + after the system message, user/tool alternates with assistant, with + no two consecutive user messages and no tool-result that doesn't + follow an assistant-with-tool_calls. Violations cause silent empty + responses on most providers, which triggers the empty-retry loop. + + This runs right before the API call as a defensive belt — by the + time it fires, the scaffolding strip should already have prevented + most shapes, but external callers (gateway multi-queue replay, + session resume, cron, explicit conversation_history passed in by + host code) can feed in already-broken histories. + + Repairs applied: + 1. Stray ``tool`` messages whose ``tool_call_id`` doesn't match + any preceding assistant tool_call — dropped. + 2. Consecutive ``user`` messages — merged with newline separator + so no user input is lost. + + Deliberately does NOT rewind orphan ``assistant(tool_calls)+tool`` + pairs that precede a user message — that pattern IS valid when the + previous turn completed normally and the user jumped in to redirect + before the model got a continuation turn (the ongoing dialog + pattern). The empty-response scaffolding stripper handles the + genuinely-broken variant via its flag-gated rewind. + + Returns the number of repairs made (for logging/telemetry). + """ + if not messages: + return 0 + + repairs = 0 + + # Pass 1: drop stray tool messages that don't follow a known + # assistant tool_call_id. Uses a rolling set of known ids refreshed + # on each assistant message. + known_tool_ids: set = set() + filtered: List[Dict] = [] + for msg in messages: + if not isinstance(msg, dict): + filtered.append(msg) + continue + role = msg.get("role") + if role == "assistant": + known_tool_ids = set() + for tc in (msg.get("tool_calls") or []): + tc_id = tc.get("id") if isinstance(tc, dict) else None + if tc_id: + known_tool_ids.add(tc_id) + filtered.append(msg) + elif role == "tool": + tc_id = msg.get("tool_call_id") + if tc_id and tc_id in known_tool_ids: + filtered.append(msg) + else: + repairs += 1 + else: + if role == "user": + # A user turn closes the tool-result run; subsequent + # tool messages without a fresh assistant tool_call + # are orphans. + known_tool_ids = set() + filtered.append(msg) + + # Pass 2: merge consecutive user messages. Preserves all user input + # so nothing the user typed is lost. + merged: List[Dict] = [] + for msg in filtered: + if ( + merged + and isinstance(msg, dict) + and msg.get("role") == "user" + and isinstance(merged[-1], dict) + and merged[-1].get("role") == "user" + ): + prev = merged[-1] + prev_content = prev.get("content", "") + new_content = msg.get("content", "") + # Only merge plain-text content; leave multimodal (list) + # content alone — collapsing image/audio blocks risks + # mangling the attachment structure. + if isinstance(prev_content, str) and isinstance(new_content, str): + prev["content"] = ( + (prev_content + "\n\n" + new_content) + if prev_content and new_content + else (prev_content or new_content) + ) + repairs += 1 + continue + merged.append(msg) + + if repairs > 0: + # Rewrite in place so downstream paths (persistence, return + # value, session DB flush) see the repaired sequence. + messages[:] = merged + + return repairs + + + +def strip_think_blocks(agent, content: str) -> str: + """Remove reasoning/thinking blocks from content, returning only visible text. + + Handles four cases: + 1. Closed tag pairs (````) — the common path when + the provider emits complete reasoning blocks. + 2. Unterminated open tag at a block boundary (start of text or + after a newline) — e.g. MiniMax M2.7 / NIM endpoints where the + closing tag is dropped. Everything from the open tag to end + of string is stripped. The block-boundary check mirrors + ``gateway/stream_consumer.py``'s filter so models that mention + ```` in prose aren't over-stripped. + 3. Stray orphan open/close tags that slip through. + 4. Tag variants: ````, ````, ````, + ````, ```` (Gemma 4), all + case-insensitive. + + Additionally strips standalone tool-call XML blocks that some open + models (notably Gemma variants on OpenRouter) emit inside assistant + content instead of via the structured ``tool_calls`` field: + * ```` + * ```` + * ```` + * ```` + * ```` + * ```` (Gemma style) + Ported from openclaw/openclaw#67318. The ```` variant is + boundary-gated (only strips when the tag sits at start-of-line or + after punctuation and carries a ``name="..."`` attribute) so prose + mentions like "Use in JavaScript" are preserved. + """ + if not content: + return "" + # 1. Closed tag pairs — case-insensitive for all variants so + # mixed-case tags (, ) don't slip through to + # the unterminated-tag pass and take trailing content with them. + content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) + content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) + content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) + content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) + content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) + # 1b. Tool-call XML blocks (openclaw/openclaw#67318). Handle the + # generic tag names first — they have no attribute gating since + # a literal in prose is already vanishingly rare. + for _tc_name in ("tool_call", "tool_calls", "tool_result", + "function_call", "function_calls"): + content = re.sub( + rf'<{_tc_name}\b[^>]*>.*?', + '', + content, + flags=re.DOTALL | re.IGNORECASE, + ) + # 1c. ... — Gemma-style standalone + # tool call. Only strip when the tag sits at a block boundary + # (start of text, after a newline, or after sentence-ending + # punctuation) AND carries a name="..." attribute. This keeps + # prose mentions like "Use to declare" safe. + content = re.sub( + r'(?:(?<=^)|(?<=[\n\r.!?:]))[ \t]*' + r']*\bname\s*=[^>]*>' + r'(?:(?:(?!).)*)', + '', + content, + flags=re.DOTALL | re.IGNORECASE, + ) + # 2. Unterminated reasoning block — open tag at a block boundary + # (start of text, or after a newline) with no matching close. + # Strip from the tag to end of string. Fixes #8878 / #9568 + # (MiniMax M2.7 leaking raw reasoning into assistant content). + content = re.sub( + r'(?:^|\n)[ \t]*<(?:think|thinking|reasoning|thought|REASONING_SCRATCHPAD)\b[^>]*>.*$', + '', + content, + flags=re.DOTALL | re.IGNORECASE, + ) + # 3. Stray orphan open/close tags that slipped through. + content = re.sub( + r'\s*', + '', + content, + flags=re.IGNORECASE, + ) + # 3b. Stray tool-call closers. (We do NOT strip bare or + # unterminated because a truncated tail + # during streaming may still be valuable to the user; matches + # OpenClaw's intentional asymmetry.) + content = re.sub( + r'\s*', + '', + content, + flags=re.IGNORECASE, + ) + return content + + + +def recover_with_credential_pool( + agent, + *, + status_code: Optional[int], + has_retried_429: bool, + classified_reason: Optional[FailoverReason] = None, + error_context: Optional[Dict[str, Any]] = None, +) -> tuple[bool, bool]: + """Attempt credential recovery via pool rotation. + + Returns (recovered, has_retried_429). + On rate limits: first occurrence retries same credential (sets flag True). + second consecutive failure rotates to next credential. + On billing exhaustion: immediately rotates. + On auth failures: attempts token refresh before rotating. + + `classified_reason` lets the recovery path honor the structured error + classifier instead of relying only on raw HTTP codes. This matters for + providers that surface billing/rate-limit/auth conditions under a + different status code, such as Anthropic returning HTTP 400 for + "out of extra usage". + """ + pool = agent._credential_pool + if pool is None: + return False, has_retried_429 + + effective_reason = classified_reason + if effective_reason is None: + if status_code == 402: + effective_reason = FailoverReason.billing + elif status_code == 429: + effective_reason = FailoverReason.rate_limit + elif status_code in {401, 403}: + effective_reason = FailoverReason.auth + + if effective_reason == FailoverReason.billing: + rotate_status = status_code if status_code is not None else 402 + next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) + if next_entry is not None: + logger.info( + "Credential %s (billing) — rotated to pool entry %s", + rotate_status, + getattr(next_entry, "id", "?"), + ) + agent._swap_credential(next_entry) + return True, False + return False, has_retried_429 + + if effective_reason == FailoverReason.rate_limit: + if not has_retried_429: + return False, True + rotate_status = status_code if status_code is not None else 429 + next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) + if next_entry is not None: + logger.info( + "Credential %s (rate limit) — rotated to pool entry %s", + rotate_status, + getattr(next_entry, "id", "?"), + ) + agent._swap_credential(next_entry) + return True, False + return False, True + + if effective_reason == FailoverReason.auth: + refreshed = pool.try_refresh_current() + if refreshed is not None: + logger.info(f"Credential auth failure — refreshed pool entry {getattr(refreshed, 'id', '?')}") + agent._swap_credential(refreshed) + return True, has_retried_429 + # Refresh failed — rotate to next credential instead of giving up. + # The failed entry is already marked exhausted by try_refresh_current(). + rotate_status = status_code if status_code is not None else 401 + next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) + if next_entry is not None: + logger.info( + "Credential %s (auth refresh failed) — rotated to pool entry %s", + rotate_status, + getattr(next_entry, "id", "?"), + ) + agent._swap_credential(next_entry) + return True, False + + return False, has_retried_429 + + + +def try_recover_primary_transport( + agent, api_error: Exception, *, retry_count: int, max_retries: int, +) -> bool: + """Attempt one extra primary-provider recovery cycle for transient transport failures. + + After ``max_retries`` exhaust, rebuild the primary client (clearing + stale connection pools) and give it one more attempt before falling + back. This is most useful for direct endpoints (custom, Z.AI, + Anthropic, OpenAI, local models) where a TCP-level hiccup does not + mean the provider is down. + + Skipped for proxy/aggregator providers (OpenRouter, Nous) which + already manage connection pools and retries server-side — if our + retries through them are exhausted, one more rebuilt client won't help. + """ + if agent._fallback_activated: + return False + + # Only for transient transport errors + error_type = type(api_error).__name__ + if error_type not in _TRANSIENT_TRANSPORT_ERRORS: + return False + + # Skip for aggregator providers — they manage their own retry infra + if agent._is_openrouter_url(): + return False + provider_lower = (agent.provider or "").strip().lower() + if provider_lower in {"nous", "nous-research"}: + return False + + try: + # Close existing client to release stale connections + if getattr(agent, "client", None) is not None: + try: + agent._close_openai_client( + agent.client, reason="primary_recovery", shared=True, + ) + except Exception: + pass + + # Rebuild from primary snapshot + rt = agent._primary_runtime + agent._client_kwargs = dict(rt["client_kwargs"]) + agent.model = rt["model"] + agent.provider = rt["provider"] + agent.base_url = rt["base_url"] + agent.api_mode = rt["api_mode"] + if hasattr(agent, "_transport_cache"): + agent._transport_cache.clear() + agent.api_key = rt["api_key"] + + if agent.api_mode == "anthropic_messages": + from agent.anthropic_adapter import build_anthropic_client + agent._anthropic_api_key = rt["anthropic_api_key"] + agent._anthropic_base_url = rt["anthropic_base_url"] + agent._anthropic_client = build_anthropic_client( + rt["anthropic_api_key"], rt["anthropic_base_url"], + timeout=get_provider_request_timeout(agent.provider, agent.model), + ) + agent._is_anthropic_oauth = rt["is_anthropic_oauth"] + agent.client = None + else: + agent.client = agent._create_openai_client( + dict(rt["client_kwargs"]), + reason="primary_recovery", + shared=True, + ) + + wait_time = min(3 + retry_count, 8) + agent._vprint( + f"{agent.log_prefix}🔁 Transient {error_type} on {agent.provider} — " + f"rebuilt client, waiting {wait_time}s before one last primary attempt.", + force=True, + ) + time.sleep(wait_time) + return True + except Exception as e: + logging.warning("Primary transport recovery failed: %s", e) + return False + +# ── End provider fallback ────────────────────────────────────────────── + + + +def drop_thinking_only_and_merge_users( + messages: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """Drop thinking-only assistant turns; merge any adjacent user messages left behind. + + Runs on the per-call ``api_messages`` copy only. The stored + conversation history (``agent.messages``) is never mutated, so the + user still sees the thinking block in the CLI/gateway transcript and + session persistence keeps the full trace. Only the wire copy sent to + the provider is cleaned. + + Why drop-and-merge rather than inject stub text: + - Fabricating ``"."`` / ``"(continued)"`` text lies in the history + and makes future turns see model output the model didn't emit. + - Dropping the turn preserves honesty; merging adjacent user messages + preserves the provider's role-alternation invariant. + - This is the pattern used by Claude Code's ``normalizeMessagesForAPI`` + (filterOrphanedThinkingOnlyMessages + mergeAdjacentUserMessages). + """ + if not messages: + return messages + + # Pass 1: drop thinking-only assistant turns. + kept = [m for m in messages if not _ra().AIAgent._is_thinking_only_assistant(m)] + dropped = len(messages) - len(kept) + if dropped == 0: + return messages + + # Pass 2: merge any newly-adjacent user messages. + merged: List[Dict[str, Any]] = [] + merges = 0 + for m in kept: + prev = merged[-1] if merged else None + if ( + prev is not None + and prev.get("role") == "user" + and m.get("role") == "user" + ): + prev_content = prev.get("content", "") + cur_content = m.get("content", "") + # Work on a copy of ``prev`` so the caller's input dicts are + # never mutated. ``_sanitize_api_messages`` upstream already + # hands us per-call copies, but staying pure here means we + # can be called safely from anywhere (tests, other loops). + prev_copy = dict(prev) + # Only string-content merge is meaningful for role-alternation + # purposes. If either side is a list (multimodal), append as a + # separate block rather than collapsing. + if isinstance(prev_content, str) and isinstance(cur_content, str): + sep = "\n\n" if prev_content and cur_content else "" + prev_copy["content"] = prev_content + sep + cur_content + elif isinstance(prev_content, list) and isinstance(cur_content, list): + prev_copy["content"] = list(prev_content) + list(cur_content) + elif isinstance(prev_content, list) and isinstance(cur_content, str): + if cur_content: + prev_copy["content"] = list(prev_content) + [ + {"type": "text", "text": cur_content} + ] + else: + prev_copy["content"] = list(prev_content) + elif isinstance(prev_content, str) and isinstance(cur_content, list): + new_blocks: List[Dict[str, Any]] = [] + if prev_content: + new_blocks.append({"type": "text", "text": prev_content}) + new_blocks.extend(cur_content) + prev_copy["content"] = new_blocks + else: + # Unknown content shape — fall back to appending separately + # (violates alternation, but safer than raising in a hot path). + merged.append(m) + continue + merged[-1] = prev_copy + merges += 1 + else: + merged.append(m) + + logger.debug( + "Pre-call sanitizer: dropped %d thinking-only assistant turn(s), " + "merged %d adjacent user message(s)", + dropped, + merges, + ) + return merged + + + +def restore_primary_runtime(agent) -> bool: + """Restore the primary runtime at the start of a new turn. + + In long-lived CLI sessions a single AIAgent instance spans multiple + turns. Without restoration, one transient failure pins the session + to the fallback provider for every subsequent turn. Calling this at + the top of ``run_conversation()`` makes fallback turn-scoped. + + The gateway caches agents across messages (``_agent_cache`` in + ``gateway/run.py``), so this restoration IS needed there too. + """ + if not agent._fallback_activated: + return False + + if getattr(agent, "_rate_limited_until", 0) > time.monotonic(): + return False # primary still in rate-limit cooldown, stay on fallback + + rt = agent._primary_runtime + try: + # ── Core runtime state ── + agent.model = rt["model"] + agent.provider = rt["provider"] + agent.base_url = rt["base_url"] # setter updates _base_url_lower + agent.api_mode = rt["api_mode"] + if hasattr(agent, "_transport_cache"): + agent._transport_cache.clear() + agent.api_key = rt["api_key"] + agent._client_kwargs = dict(rt["client_kwargs"]) + agent._use_prompt_caching = rt["use_prompt_caching"] + # Default to native layout when the restored snapshot predates the + # native-vs-proxy split (older sessions saved before this PR). + agent._use_native_cache_layout = rt.get( + "use_native_cache_layout", + agent.api_mode == "anthropic_messages" and agent.provider == "anthropic", + ) + + # ── Rebuild client for the primary provider ── + if agent.api_mode == "anthropic_messages": + from agent.anthropic_adapter import build_anthropic_client + agent._anthropic_api_key = rt["anthropic_api_key"] + agent._anthropic_base_url = rt["anthropic_base_url"] + agent._anthropic_client = build_anthropic_client( + rt["anthropic_api_key"], rt["anthropic_base_url"], + timeout=get_provider_request_timeout(agent.provider, agent.model), + ) + agent._is_anthropic_oauth = rt["is_anthropic_oauth"] + agent.client = None + else: + agent.client = agent._create_openai_client( + dict(rt["client_kwargs"]), + reason="restore_primary", + shared=True, + ) + + # ── Restore context engine state ── + cc = agent.context_compressor + cc.update_model( + model=rt["compressor_model"], + context_length=rt["compressor_context_length"], + base_url=rt["compressor_base_url"], + api_key=rt["compressor_api_key"], + provider=rt["compressor_provider"], + ) + + # ── Reset fallback chain for the new turn ── + agent._fallback_activated = False + agent._fallback_index = 0 + + logging.info( + "Primary runtime restored for new turn: %s (%s)", + agent.model, agent.provider, + ) + return True + except Exception as e: + logging.warning("Failed to restore primary runtime: %s", e) + return False + +# Which error types indicate a transient transport failure worth +# one more attempt with a rebuilt client / connection pool. +_TRANSIENT_TRANSPORT_ERRORS = frozenset({ + "ReadTimeout", "ConnectTimeout", "PoolTimeout", + "ConnectError", "RemoteProtocolError", + "APIConnectionError", "APITimeoutError", +}) + + + +def extract_reasoning(agent, assistant_message) -> Optional[str]: + """ + Extract reasoning/thinking content from an assistant message. + + OpenRouter and various providers can return reasoning in multiple formats: + 1. message.reasoning - Direct reasoning field (DeepSeek, Qwen, etc.) + 2. message.reasoning_content - Alternative field (Moonshot AI, Novita, etc.) + 3. message.reasoning_details - Array of {type, summary, ...} objects (OpenRouter unified) + + Args: + assistant_message: The assistant message object from the API response + + Returns: + Combined reasoning text, or None if no reasoning found + """ + reasoning_parts = [] + + # Check direct reasoning field + if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning: + reasoning_parts.append(assistant_message.reasoning) + + # Check reasoning_content field (alternative name used by some providers) + if hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content: + # Don't duplicate if same as reasoning + if assistant_message.reasoning_content not in reasoning_parts: + reasoning_parts.append(assistant_message.reasoning_content) + + # Check reasoning_details array (OpenRouter unified format) + # Format: [{"type": "reasoning.summary", "summary": "...", ...}, ...] + if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details: + for detail in assistant_message.reasoning_details: + if isinstance(detail, dict): + # Extract summary from reasoning detail object + summary = ( + detail.get('summary') + or detail.get('thinking') + or detail.get('content') + or detail.get('text') + ) + if summary and summary not in reasoning_parts: + reasoning_parts.append(summary) + + # Some providers embed reasoning directly inside assistant content + # instead of returning structured reasoning fields. Only fall back + # to inline extraction when no structured reasoning was found. + content = getattr(assistant_message, "content", None) + if not reasoning_parts and isinstance(content, list): + # DeepSeek V4 Pro (and compatible providers) return content as a + # list of typed blocks, e.g.: + # [{"type": "thinking", "thinking": "..."}, {"type": "output", ...}] + # Without this branch the thinking text is silently dropped and the + # next turn fails with HTTP 400 ("thinking must be passed back"). + # Refs #21944. + for block in content: + if isinstance(block, dict) and block.get("type") == "thinking": + thinking_text = block.get("thinking") or block.get("text") or "" + thinking_text = thinking_text.strip() + if thinking_text and thinking_text not in reasoning_parts: + reasoning_parts.append(thinking_text) + if not reasoning_parts and isinstance(content, str) and content: + inline_patterns = ( + r"(.*?)", + r"(.*?)", + r"(.*?)", + r"(.*?)", + r"(.*?)", + ) + for pattern in inline_patterns: + flags = re.DOTALL | re.IGNORECASE + for block in re.findall(pattern, content, flags=flags): + cleaned = block.strip() + if cleaned and cleaned not in reasoning_parts: + reasoning_parts.append(cleaned) + + # Combine all reasoning parts + if reasoning_parts: + return "\n\n".join(reasoning_parts) + + return None + + + +def dump_api_request_debug( + agent, + api_kwargs: Dict[str, Any], + *, + reason: str, + error: Optional[Exception] = None, +) -> Optional[Path]: + """ + Dump a debug-friendly HTTP request record for the active inference API. + + Captures the request body from api_kwargs (excluding transport-only keys + like timeout). Intended for debugging provider-side 4xx failures where + retries are not useful. + """ + try: + body = copy.deepcopy(api_kwargs) + body.pop("timeout", None) + body = {k: v for k, v in body.items() if v is not None} + + api_key = None + try: + api_key = getattr(agent.client, "api_key", None) + except Exception as e: + logger.debug("Could not extract API key for debug dump: %s", e) + + dump_payload: Dict[str, Any] = { + "timestamp": datetime.now().isoformat(), + "session_id": agent.session_id, + "reason": reason, + "request": { + "method": "POST", + "url": f"{agent.base_url.rstrip('/')}{'/responses' if agent.api_mode == 'codex_responses' else '/chat/completions'}", + "headers": { + "Authorization": f"Bearer {agent._mask_api_key_for_logs(api_key)}", + "Content-Type": "application/json", + }, + "body": body, + }, + } + + if error is not None: + error_info: Dict[str, Any] = { + "type": type(error).__name__, + "message": str(error), + } + for attr_name in ("status_code", "request_id", "code", "param", "type"): + attr_value = getattr(error, attr_name, None) + if attr_value is not None: + error_info[attr_name] = attr_value + + body_attr = getattr(error, "body", None) + if body_attr is not None: + error_info["body"] = body_attr + + response_obj = getattr(error, "response", None) + if response_obj is not None: + try: + error_info["response_status"] = getattr(response_obj, "status_code", None) + error_info["response_text"] = response_obj.text + except Exception as e: + logger.debug("Could not extract error response details: %s", e) + + dump_payload["error"] = error_info + + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") + dump_file = agent.logs_dir / f"request_dump_{agent.session_id}_{timestamp}.json" + dump_file.write_text( + json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str), + encoding="utf-8", + ) + + agent._vprint(f"{agent.log_prefix}🧾 Request debug dump written to: {dump_file}") + + if env_var_enabled("HERMES_DUMP_REQUEST_STDOUT"): + print(json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str)) + + return dump_file + except Exception as dump_error: + if agent.verbose_logging: + logging.warning(f"Failed to dump API request debug payload: {dump_error}") + return None + + + +def anthropic_prompt_cache_policy( + agent, + *, + provider: Optional[str] = None, + base_url: Optional[str] = None, + api_mode: Optional[str] = None, + model: Optional[str] = None, +) -> tuple[bool, bool]: + """Decide whether to apply Anthropic prompt caching and which layout to use. + + Returns ``(should_cache, use_native_layout)``: + * ``should_cache`` — inject ``cache_control`` breakpoints for this + request (applies to OpenRouter Claude, native Anthropic, and + third-party gateways that speak the native Anthropic protocol). + * ``use_native_layout`` — place markers on the *inner* content + blocks (native Anthropic accepts and requires this layout); + when False markers go on the message envelope (OpenRouter and + OpenAI-wire proxies expect the looser layout). + + Third-party providers using the native Anthropic transport + (``api_mode == 'anthropic_messages'`` + Claude-named model) get + caching with the native layout so they benefit from the same + cost reduction as direct Anthropic callers, provided their + gateway implements the Anthropic cache_control contract + (MiniMax, Zhipu GLM, LiteLLM's Anthropic proxy mode all do). + + Qwen / Alibaba-family models on OpenCode, OpenCode Go, and direct + Alibaba (DashScope) also honour Anthropic-style ``cache_control`` + markers on OpenAI-wire chat completions. Upstream pi-mono #3392 / + pi #3393 documented this for opencode-go Qwen. Without markers + these providers serve zero cache hits, re-billing the full prompt + on every turn. + """ + eff_provider = (provider if provider is not None else agent.provider) or "" + eff_base_url = base_url if base_url is not None else (agent.base_url or "") + eff_api_mode = api_mode if api_mode is not None else (agent.api_mode or "") + eff_model = (model if model is not None else agent.model) or "" + + model_lower = eff_model.lower() + provider_lower = eff_provider.lower() + is_claude = "claude" in model_lower + is_openrouter = base_url_host_matches(eff_base_url, "openrouter.ai") + # Nous Portal proxies to OpenRouter behind the scenes — identical + # OpenAI-wire envelope cache_control semantics. Treat it as an + # OpenRouter-equivalent endpoint for caching layout purposes. + is_nous_portal = "nousresearch" in eff_base_url.lower() + is_anthropic_wire = eff_api_mode == "anthropic_messages" + is_native_anthropic = ( + is_anthropic_wire + and (eff_provider == "anthropic" or base_url_hostname(eff_base_url) == "api.anthropic.com") + ) + + if is_native_anthropic: + return True, True + if (is_openrouter or is_nous_portal) and is_claude: + return True, False + # Nous Portal Qwen (e.g. qwen3.6-plus) takes the same envelope-layout + # cache_control path as Portal Claude. Portal proxies to OpenRouter + # and the upstream Qwen route accepts cache_control markers; without + # this branch the alibaba-family check below only matches + # provider=opencode/alibaba and Portal traffic falls through to + # (False, False), serving 0% cache hits and re-billing the full + # prompt on every turn. + if is_nous_portal and "qwen" in model_lower: + return True, False + if is_anthropic_wire and is_claude: + # Third-party Anthropic-compatible gateway. + return True, True + + # MiniMax on its Anthropic-compatible endpoint serves its own + # model family (MiniMax-M2.7, M2.5, M2.1, M2) with documented + # cache_control support (0.1× read pricing, 5-minute TTL). The + # blanket is_claude gate above excludes these — opt them in + # explicitly via provider id or host match so users on + # provider=minimax / minimax-cn (or custom endpoints pointing at + # api.minimax.io/anthropic / api.minimaxi.com/anthropic) get the + # same cost reduction as Claude traffic. + # Docs: https://platform.minimax.io/docs/api-reference/anthropic-api-compatible-cache + if is_anthropic_wire: + is_minimax_provider = provider_lower in {"minimax", "minimax-cn"} + is_minimax_host = ( + base_url_host_matches(eff_base_url, "api.minimax.io") + or base_url_host_matches(eff_base_url, "api.minimaxi.com") + ) + if is_minimax_provider or is_minimax_host: + return True, True + + # Qwen/Alibaba on OpenCode (Zen/Go) and native DashScope: OpenAI-wire + # transport that accepts Anthropic-style cache_control markers and + # rewards them with real cache hits. Without this branch + # qwen3.6-plus on opencode-go reports 0% cached tokens and burns + # through the subscription on every turn. + model_is_qwen = "qwen" in model_lower + provider_is_alibaba_family = provider_lower in { + "opencode", "opencode-zen", "opencode-go", "alibaba", + } + if provider_is_alibaba_family and model_is_qwen: + # Envelope layout (native_anthropic=False): markers on inner + # content parts, not top-level tool messages. Matches + # pi-mono's "alibaba" cacheControlFormat. + return True, False + + return False, False + + + +def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: bool) -> Any: + from agent.auxiliary_client import _validate_base_url, _validate_proxy_env_urls + # Treat client_kwargs as read-only. Callers pass agent._client_kwargs (or shallow + # copies of it) in; any in-place mutation leaks back into the stored dict and is + # reused on subsequent requests. #10933 hit this by injecting an httpx.Client + # transport that was torn down after the first request, so the next request + # wrapped a closed transport and raised "Cannot send a request, as the client + # has been closed" on every retry. The revert resolved that specific path; this + # copy locks the contract so future transport/keepalive work can't reintroduce + # the same class of bug. + client_kwargs = dict(client_kwargs) + _validate_proxy_env_urls() + _validate_base_url(client_kwargs.get("base_url")) + if agent.provider == "copilot-acp" or str(client_kwargs.get("base_url", "")).startswith("acp://copilot"): + from agent.copilot_acp_client import CopilotACPClient + + client = CopilotACPClient(**client_kwargs) + logger.info( + "Copilot ACP client created (%s, shared=%s) %s", + reason, + shared, + agent._client_log_context(), + ) + return client + if agent.provider == "google-gemini-cli" or str(client_kwargs.get("base_url", "")).startswith("cloudcode-pa://"): + from agent.gemini_cloudcode_adapter import GeminiCloudCodeClient + + # Strip OpenAI-specific kwargs the Gemini client doesn't accept + safe_kwargs = { + k: v for k, v in client_kwargs.items() + if k in {"api_key", "base_url", "default_headers", "project_id", "timeout"} + } + client = GeminiCloudCodeClient(**safe_kwargs) + logger.info( + "Gemini Cloud Code Assist client created (%s, shared=%s) %s", + reason, + shared, + agent._client_log_context(), + ) + return client + if agent.provider == "gemini": + from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url + + base_url = str(client_kwargs.get("base_url", "") or "") + if is_native_gemini_base_url(base_url): + safe_kwargs = { + k: v for k, v in client_kwargs.items() + if k in {"api_key", "base_url", "default_headers", "timeout", "http_client"} + } + if "http_client" not in safe_kwargs: + keepalive_http = agent._build_keepalive_http_client(base_url) + if keepalive_http is not None: + safe_kwargs["http_client"] = keepalive_http + client = GeminiNativeClient(**safe_kwargs) + logger.info( + "Gemini native client created (%s, shared=%s) %s", + reason, + shared, + agent._client_log_context(), + ) + return client + # Inject TCP keepalives so the kernel detects dead provider connections + # instead of letting them sit silently in CLOSE-WAIT (#10324). Without + # this, a peer that drops mid-stream leaves the socket in a state where + # epoll_wait never fires, ``httpx`` read timeout may not trigger, and + # the agent hangs until manually killed. Probes after 30s idle, retry + # every 10s, give up after 3 → dead peer detected within ~60s. + # + # Safety against #10933: the ``client_kwargs = dict(client_kwargs)`` + # above means this injection only lands in the local per-call copy, + # never back into ``agent._client_kwargs``. Each ``_create_openai_client`` + # invocation therefore gets its OWN fresh ``httpx.Client`` whose + # lifetime is tied to the OpenAI client it is passed to. When the + # OpenAI client is closed (rebuild, teardown, credential rotation), + # the paired ``httpx.Client`` closes with it, and the next call + # constructs a fresh one — no stale closed transport can be reused. + # Tests in ``tests/run_agent/test_create_openai_client_reuse.py`` and + # ``tests/run_agent/test_sequential_chats_live.py`` pin this invariant. + if "http_client" not in client_kwargs: + keepalive_http = agent._build_keepalive_http_client(client_kwargs.get("base_url", "")) + if keepalive_http is not None: + client_kwargs["http_client"] = keepalive_http + # Uses the module-level `OpenAI` name, resolved lazily on first + # access via __getattr__ below. Tests patch via `run_agent.OpenAI`. + client = _ra().OpenAI(**client_kwargs) + logger.info( + "OpenAI client created (%s, shared=%s) %s", + reason, + shared, + agent._client_log_context(), + ) + return client + + + +__all__ = [ + "convert_to_trajectory_format", + "sanitize_tool_call_arguments", + "repair_message_sequence", + "strip_think_blocks", + "recover_with_credential_pool", + "try_recover_primary_transport", + "drop_thinking_only_and_merge_users", + "restore_primary_runtime", + "extract_reasoning", + "dump_api_request_debug", + "anthropic_prompt_cache_policy", + "create_openai_client", +] diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py new file mode 100644 index 00000000000..73a455f6bb0 --- /dev/null +++ b/agent/codex_runtime.py @@ -0,0 +1,400 @@ +"""Codex API runtime — App Server and Responses-API streaming paths. + +Extracted from :class:`AIAgent` to keep the agent loop file focused. +Each function takes the parent ``AIAgent`` as its first argument +(``agent``). AIAgent keeps thin forwarder methods for backward +compatibility. + +* ``run_codex_app_server_turn`` — drives one turn through the + ``codex_app_server`` subprocess client (used when a Codex CLI install + is the active provider). +* ``run_codex_stream`` — streams a Codex Responses API call (the + ``codex_responses`` api_mode). +* ``run_codex_create_stream_fallback`` — recovery path when the + Responses ``stream=True`` initial create fails. +""" + +from __future__ import annotations + +import contextvars +import json +import logging +import os +import threading +import time +import uuid +from types import SimpleNamespace +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +def _ra(): + """Lazy ``run_agent`` reference for test-patch routing.""" + import run_agent + return run_agent + + + +def run_codex_app_server_turn( + agent, + *, + user_message: str, + original_user_message: Any, + messages: List[Dict[str, Any]], + effective_task_id: str, + should_review_memory: bool = False, +) -> Dict[str, Any]: + """Codex app-server runtime path. Hands the entire turn to a `codex + app-server` subprocess and projects its events back into Hermes' + messages list so memory/skill review keep working. + + Called from run_conversation() when agent.api_mode == "codex_app_server". + Returns the same dict shape as the chat_completions path. + """ + from agent.transports.codex_app_server_session import CodexAppServerSession + + # Lazy session: one CodexAppServerSession per AIAgent instance. + # Spawned on first turn, reused across turns, closed at AIAgent + # shutdown (see _cleanup hook). + if not hasattr(agent, "_codex_session") or agent._codex_session is None: + cwd = getattr(agent, "session_cwd", None) or os.getcwd() + # Approval callback: defer to Hermes' standard prompt flow if a + # CLI thread has installed one. Gateway / cron contexts get the + # codex-side fail-closed default. + try: + from tools.terminal_tool import _get_approval_callback + approval_callback = _get_approval_callback() + except Exception: + approval_callback = None + agent._codex_session = CodexAppServerSession( + cwd=cwd, + approval_callback=approval_callback, + ) + + # NOTE: the user message is ALREADY appended to messages by the + # standard run_conversation() flow (line ~11823) before the early + # return reaches us. Do NOT append again — that would duplicate. + + try: + turn = agent._codex_session.run_turn(user_input=user_message) + except Exception as exc: + logger.exception("codex app-server turn failed") + # Crash → unconditionally drop the session so the next turn + # respawns from scratch instead of reusing a dead client. + try: + agent._codex_session.close() + except Exception: + pass + agent._codex_session = None + return { + "final_response": ( + f"Codex app-server turn failed: {exc}. " + f"Fall back to default runtime with `/codex-runtime auto`." + ), + "messages": messages, + "api_calls": 0, + "completed": False, + "partial": True, + "error": str(exc), + } + + # If the turn signalled the underlying client is wedged (deadline + # blown, post-tool watchdog tripped, OAuth refresh died, subprocess + # exited), retire the session so the next turn respawns codex + # rather than riding the broken process. Mirrors openclaw beta.8's + # "retire timed-out app-server clients" fix. + if getattr(turn, "should_retire", False): + logger.warning( + "codex app-server session retired (turn error: %s)", + turn.error, + ) + try: + agent._codex_session.close() + except Exception: + pass + agent._codex_session = None + + # Splice projected messages into the conversation. The projector emits + # standard {role, content, tool_calls, tool_call_id} entries, which + # is exactly what curator.py / sessions DB expect. + if turn.projected_messages: + messages.extend(turn.projected_messages) + + # Counter ticks for the agent-improvement loop. + # _turns_since_memory and _user_turn_count are ALREADY incremented + # in the run_conversation() pre-loop block (lines ~11793-11817) so we + # do NOT touch them here — that would double-count. + # Only _iters_since_skill needs explicit increment, since the + # chat_completions loop bumps it per tool iteration (line ~12110) + # and that loop is bypassed on this path. + agent._iters_since_skill = ( + getattr(agent, "_iters_since_skill", 0) + turn.tool_iterations + ) + + # Now check the skill nudge AFTER iters were incremented — same + # pattern the chat_completions path uses (line ~15432). + should_review_skills = False + if ( + agent._skill_nudge_interval > 0 + and agent._iters_since_skill >= agent._skill_nudge_interval + and "skill_manage" in agent.valid_tool_names + ): + should_review_skills = True + agent._iters_since_skill = 0 + + # External memory provider sync (mirrors line ~15439). Skipped on + # interrupt/error to avoid feeding partial transcripts to memory. + if not turn.interrupted and turn.error is None: + try: + agent._sync_external_memory_for_turn( + original_user_message=original_user_message, + final_response=turn.final_text, + interrupted=False, + ) + except Exception: + logger.debug("external memory sync raised", exc_info=True) + + # Background review fork — same cadence + signature as the default + # path (line ~15449). Only fires when a trigger actually tripped AND + # we have a real final response. + if ( + turn.final_text + and not turn.interrupted + and (should_review_memory or should_review_skills) + ): + try: + agent._spawn_background_review( + messages_snapshot=list(messages), + review_memory=should_review_memory, + review_skills=should_review_skills, + ) + except Exception: + logger.debug("background review spawn raised", exc_info=True) + + return { + "final_response": turn.final_text, + "messages": messages, + "api_calls": 1, # one app-server "turn" maps to one logical API call + "completed": not turn.interrupted and turn.error is None, + "partial": turn.interrupted or turn.error is not None, + "error": turn.error, + "codex_thread_id": turn.thread_id, + "codex_turn_id": turn.turn_id, + } + + + + +def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta: callable = None): + """Execute one streaming Responses API request and return the final response.""" + import httpx as _httpx + + active_client = client or agent._ensure_primary_openai_client(reason="codex_stream_direct") + max_stream_retries = 1 + has_tool_calls = False + first_delta_fired = False + # Accumulate streamed text so we can recover if get_final_response() + # returns empty output (e.g. chatgpt.com backend-api sends + # response.incomplete instead of response.completed). + agent._codex_streamed_text_parts: list = [] + for attempt in range(max_stream_retries + 1): + if agent._interrupt_requested: + raise InterruptedError("Agent interrupted before Codex stream retry") + collected_output_items: list = [] + try: + with active_client.responses.stream(**api_kwargs) as stream: + for event in stream: + agent._touch_activity("receiving stream response") + if agent._interrupt_requested: + break + event_type = getattr(event, "type", "") + # Fire callbacks on text content deltas (suppress during tool calls) + if "output_text.delta" in event_type or event_type == "response.output_text.delta": + delta_text = getattr(event, "delta", "") + if delta_text: + agent._codex_streamed_text_parts.append(delta_text) + if delta_text and not has_tool_calls: + if not first_delta_fired: + first_delta_fired = True + if on_first_delta: + try: + on_first_delta() + except Exception: + pass + agent._fire_stream_delta(delta_text) + # Track tool calls to suppress text streaming + elif "function_call" in event_type: + has_tool_calls = True + # Fire reasoning callbacks + elif "reasoning" in event_type and "delta" in event_type: + reasoning_text = getattr(event, "delta", "") + if reasoning_text: + agent._fire_reasoning_delta(reasoning_text) + # Collect completed output items — some backends + # (chatgpt.com/backend-api/codex) stream valid items + # via response.output_item.done but the SDK's + # get_final_response() returns an empty output list. + elif event_type == "response.output_item.done": + done_item = getattr(event, "item", None) + if done_item is not None: + collected_output_items.append(done_item) + # Log non-completed terminal events for diagnostics + elif event_type in {"response.incomplete", "response.failed"}: + resp_obj = getattr(event, "response", None) + status = getattr(resp_obj, "status", None) if resp_obj else None + incomplete_details = getattr(resp_obj, "incomplete_details", None) if resp_obj else None + logger.warning( + "Codex Responses stream received terminal event %s " + "(status=%s, incomplete_details=%s, streamed_chars=%d). %s", + event_type, status, incomplete_details, + sum(len(p) for p in agent._codex_streamed_text_parts), + agent._client_log_context(), + ) + final_response = stream.get_final_response() + # PATCH: ChatGPT Codex backend streams valid output items + # but get_final_response() can return an empty output list. + # Backfill from collected items or synthesize from deltas. + _out = getattr(final_response, "output", None) + if isinstance(_out, list) and not _out: + if collected_output_items: + final_response.output = list(collected_output_items) + logger.debug( + "Codex stream: backfilled %d output items from stream events", + len(collected_output_items), + ) + elif agent._codex_streamed_text_parts and not has_tool_calls: + assembled = "".join(agent._codex_streamed_text_parts) + final_response.output = [SimpleNamespace( + type="message", + role="assistant", + status="completed", + content=[SimpleNamespace(type="output_text", text=assembled)], + )] + logger.debug( + "Codex stream: synthesized output from %d text deltas (%d chars)", + len(agent._codex_streamed_text_parts), len(assembled), + ) + return final_response + except (_httpx.RemoteProtocolError, _httpx.ReadTimeout, _httpx.ConnectError, ConnectionError) as exc: + if attempt < max_stream_retries: + logger.debug( + "Codex Responses stream transport failed (attempt %s/%s); retrying. %s error=%s", + attempt + 1, + max_stream_retries + 1, + agent._client_log_context(), + exc, + ) + continue + logger.debug( + "Codex Responses stream transport failed; falling back to create(stream=True). %s error=%s", + agent._client_log_context(), + exc, + ) + return agent._run_codex_create_stream_fallback(api_kwargs, client=active_client) + except RuntimeError as exc: + err_text = str(exc) + missing_completed = "response.completed" in err_text + if missing_completed and attempt < max_stream_retries: + logger.debug( + "Responses stream closed before completion (attempt %s/%s); retrying. %s", + attempt + 1, + max_stream_retries + 1, + agent._client_log_context(), + ) + continue + if missing_completed: + logger.debug( + "Responses stream did not emit response.completed; falling back to create(stream=True). %s", + agent._client_log_context(), + ) + return agent._run_codex_create_stream_fallback(api_kwargs, client=active_client) + raise + + + +def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None): + """Fallback path for stream completion edge cases on Codex-style Responses backends.""" + active_client = client or agent._ensure_primary_openai_client(reason="codex_create_stream_fallback") + fallback_kwargs = dict(api_kwargs) + fallback_kwargs["stream"] = True + fallback_kwargs = agent._get_transport().preflight_kwargs(fallback_kwargs, allow_stream=True) + stream_or_response = active_client.responses.create(**fallback_kwargs) + + # Compatibility shim for mocks or providers that still return a concrete response. + if hasattr(stream_or_response, "output"): + return stream_or_response + if not hasattr(stream_or_response, "__iter__"): + return stream_or_response + + terminal_response = None + collected_output_items: list = [] + collected_text_deltas: list = [] + try: + for event in stream_or_response: + agent._touch_activity("receiving stream response") + event_type = getattr(event, "type", None) + if not event_type and isinstance(event, dict): + event_type = event.get("type") + + # Collect output items and text deltas for backfill + if event_type == "response.output_item.done": + done_item = getattr(event, "item", None) + if done_item is None and isinstance(event, dict): + done_item = event.get("item") + if done_item is not None: + collected_output_items.append(done_item) + elif event_type in {"response.output_text.delta",}: + delta = getattr(event, "delta", "") + if not delta and isinstance(event, dict): + delta = event.get("delta", "") + if delta: + collected_text_deltas.append(delta) + + if event_type not in {"response.completed", "response.incomplete", "response.failed"}: + continue + + terminal_response = getattr(event, "response", None) + if terminal_response is None and isinstance(event, dict): + terminal_response = event.get("response") + if terminal_response is not None: + # Backfill empty output from collected stream events + _out = getattr(terminal_response, "output", None) + if isinstance(_out, list) and not _out: + if collected_output_items: + terminal_response.output = list(collected_output_items) + logger.debug( + "Codex fallback stream: backfilled %d output items", + len(collected_output_items), + ) + elif collected_text_deltas: + assembled = "".join(collected_text_deltas) + terminal_response.output = [SimpleNamespace( + type="message", role="assistant", + status="completed", + content=[SimpleNamespace(type="output_text", text=assembled)], + )] + logger.debug( + "Codex fallback stream: synthesized from %d deltas (%d chars)", + len(collected_text_deltas), len(assembled), + ) + return terminal_response + finally: + close_fn = getattr(stream_or_response, "close", None) + if callable(close_fn): + try: + close_fn() + except Exception: + pass + + if terminal_response is not None: + return terminal_response + raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.") + + + +__all__ = [ + "run_codex_app_server_turn", + "run_codex_stream", + "run_codex_create_stream_fallback", +] diff --git a/run_agent.py b/run_agent.py index 4b5e405018e..8d6f7c3f35c 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2303,101 +2303,9 @@ class AIAgent: api_mode: Optional[str] = None, model: Optional[str] = None, ) -> tuple[bool, bool]: - """Decide whether to apply Anthropic prompt caching and which layout to use. - - Returns ``(should_cache, use_native_layout)``: - * ``should_cache`` — inject ``cache_control`` breakpoints for this - request (applies to OpenRouter Claude, native Anthropic, and - third-party gateways that speak the native Anthropic protocol). - * ``use_native_layout`` — place markers on the *inner* content - blocks (native Anthropic accepts and requires this layout); - when False markers go on the message envelope (OpenRouter and - OpenAI-wire proxies expect the looser layout). - - Third-party providers using the native Anthropic transport - (``api_mode == 'anthropic_messages'`` + Claude-named model) get - caching with the native layout so they benefit from the same - cost reduction as direct Anthropic callers, provided their - gateway implements the Anthropic cache_control contract - (MiniMax, Zhipu GLM, LiteLLM's Anthropic proxy mode all do). - - Qwen / Alibaba-family models on OpenCode, OpenCode Go, and direct - Alibaba (DashScope) also honour Anthropic-style ``cache_control`` - markers on OpenAI-wire chat completions. Upstream pi-mono #3392 / - pi #3393 documented this for opencode-go Qwen. Without markers - these providers serve zero cache hits, re-billing the full prompt - on every turn. - """ - eff_provider = (provider if provider is not None else self.provider) or "" - eff_base_url = base_url if base_url is not None else (self.base_url or "") - eff_api_mode = api_mode if api_mode is not None else (self.api_mode or "") - eff_model = (model if model is not None else self.model) or "" - - model_lower = eff_model.lower() - provider_lower = eff_provider.lower() - is_claude = "claude" in model_lower - is_openrouter = base_url_host_matches(eff_base_url, "openrouter.ai") - # Nous Portal proxies to OpenRouter behind the scenes — identical - # OpenAI-wire envelope cache_control semantics. Treat it as an - # OpenRouter-equivalent endpoint for caching layout purposes. - is_nous_portal = "nousresearch" in eff_base_url.lower() - is_anthropic_wire = eff_api_mode == "anthropic_messages" - is_native_anthropic = ( - is_anthropic_wire - and (eff_provider == "anthropic" or base_url_hostname(eff_base_url) == "api.anthropic.com") - ) - - if is_native_anthropic: - return True, True - if (is_openrouter or is_nous_portal) and is_claude: - return True, False - # Nous Portal Qwen (e.g. qwen3.6-plus) takes the same envelope-layout - # cache_control path as Portal Claude. Portal proxies to OpenRouter - # and the upstream Qwen route accepts cache_control markers; without - # this branch the alibaba-family check below only matches - # provider=opencode/alibaba and Portal traffic falls through to - # (False, False), serving 0% cache hits and re-billing the full - # prompt on every turn. - if is_nous_portal and "qwen" in model_lower: - return True, False - if is_anthropic_wire and is_claude: - # Third-party Anthropic-compatible gateway. - return True, True - - # MiniMax on its Anthropic-compatible endpoint serves its own - # model family (MiniMax-M2.7, M2.5, M2.1, M2) with documented - # cache_control support (0.1× read pricing, 5-minute TTL). The - # blanket is_claude gate above excludes these — opt them in - # explicitly via provider id or host match so users on - # provider=minimax / minimax-cn (or custom endpoints pointing at - # api.minimax.io/anthropic / api.minimaxi.com/anthropic) get the - # same cost reduction as Claude traffic. - # Docs: https://platform.minimax.io/docs/api-reference/anthropic-api-compatible-cache - if is_anthropic_wire: - is_minimax_provider = provider_lower in {"minimax", "minimax-cn"} - is_minimax_host = ( - base_url_host_matches(eff_base_url, "api.minimax.io") - or base_url_host_matches(eff_base_url, "api.minimaxi.com") - ) - if is_minimax_provider or is_minimax_host: - return True, True - - # Qwen/Alibaba on OpenCode (Zen/Go) and native DashScope: OpenAI-wire - # transport that accepts Anthropic-style cache_control markers and - # rewards them with real cache hits. Without this branch - # qwen3.6-plus on opencode-go reports 0% cached tokens and burns - # through the subscription on every turn. - model_is_qwen = "qwen" in model_lower - provider_is_alibaba_family = provider_lower in { - "opencode", "opencode-zen", "opencode-go", "alibaba", - } - if provider_is_alibaba_family and model_is_qwen: - # Envelope layout (native_anthropic=False): markers on inner - # content parts, not top-level tool messages. Matches - # pi-mono's "alibaba" cacheControlFormat. - return True, False - - return False, False + """Forwarder — see ``agent.agent_runtime_helpers.anthropic_prompt_cache_policy``.""" + from agent.agent_runtime_helpers import anthropic_prompt_cache_policy + return anthropic_prompt_cache_policy(self, provider=provider, base_url=base_url, api_mode=api_mode, model=model) @staticmethod def _model_requires_responses_api(model: str) -> bool: @@ -2473,98 +2381,9 @@ class AIAgent: return bool(cleaned.strip()) def _strip_think_blocks(self, content: str) -> str: - """Remove reasoning/thinking blocks from content, returning only visible text. - - Handles four cases: - 1. Closed tag pairs (````) — the common path when - the provider emits complete reasoning blocks. - 2. Unterminated open tag at a block boundary (start of text or - after a newline) — e.g. MiniMax M2.7 / NIM endpoints where the - closing tag is dropped. Everything from the open tag to end - of string is stripped. The block-boundary check mirrors - ``gateway/stream_consumer.py``'s filter so models that mention - ```` in prose aren't over-stripped. - 3. Stray orphan open/close tags that slip through. - 4. Tag variants: ````, ````, ````, - ````, ```` (Gemma 4), all - case-insensitive. - - Additionally strips standalone tool-call XML blocks that some open - models (notably Gemma variants on OpenRouter) emit inside assistant - content instead of via the structured ``tool_calls`` field: - * ```` - * ```` - * ```` - * ```` - * ```` - * ```` (Gemma style) - Ported from openclaw/openclaw#67318. The ```` variant is - boundary-gated (only strips when the tag sits at start-of-line or - after punctuation and carries a ``name="..."`` attribute) so prose - mentions like "Use in JavaScript" are preserved. - """ - if not content: - return "" - # 1. Closed tag pairs — case-insensitive for all variants so - # mixed-case tags (, ) don't slip through to - # the unterminated-tag pass and take trailing content with them. - content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) - content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) - content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) - content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) - content = re.sub(r'.*?', '', content, flags=re.DOTALL | re.IGNORECASE) - # 1b. Tool-call XML blocks (openclaw/openclaw#67318). Handle the - # generic tag names first — they have no attribute gating since - # a literal in prose is already vanishingly rare. - for _tc_name in ("tool_call", "tool_calls", "tool_result", - "function_call", "function_calls"): - content = re.sub( - rf'<{_tc_name}\b[^>]*>.*?', - '', - content, - flags=re.DOTALL | re.IGNORECASE, - ) - # 1c. ... — Gemma-style standalone - # tool call. Only strip when the tag sits at a block boundary - # (start of text, after a newline, or after sentence-ending - # punctuation) AND carries a name="..." attribute. This keeps - # prose mentions like "Use to declare" safe. - content = re.sub( - r'(?:(?<=^)|(?<=[\n\r.!?:]))[ \t]*' - r']*\bname\s*=[^>]*>' - r'(?:(?:(?!).)*)', - '', - content, - flags=re.DOTALL | re.IGNORECASE, - ) - # 2. Unterminated reasoning block — open tag at a block boundary - # (start of text, or after a newline) with no matching close. - # Strip from the tag to end of string. Fixes #8878 / #9568 - # (MiniMax M2.7 leaking raw reasoning into assistant content). - content = re.sub( - r'(?:^|\n)[ \t]*<(?:think|thinking|reasoning|thought|REASONING_SCRATCHPAD)\b[^>]*>.*$', - '', - content, - flags=re.DOTALL | re.IGNORECASE, - ) - # 3. Stray orphan open/close tags that slipped through. - content = re.sub( - r'\s*', - '', - content, - flags=re.IGNORECASE, - ) - # 3b. Stray tool-call closers. (We do NOT strip bare or - # unterminated because a truncated tail - # during streaming may still be valuable to the user; matches - # OpenClaw's intentional asymmetry.) - content = re.sub( - r'\s*', - '', - content, - flags=re.IGNORECASE, - ) - return content + """Forwarder — see ``agent.agent_runtime_helpers.strip_think_blocks``.""" + from agent.agent_runtime_helpers import strip_think_blocks + return strip_think_blocks(self, content) @staticmethod def _has_natural_response_ending(content: str) -> bool: @@ -2692,84 +2511,9 @@ class AIAgent: def _extract_reasoning(self, assistant_message) -> Optional[str]: - """ - Extract reasoning/thinking content from an assistant message. - - OpenRouter and various providers can return reasoning in multiple formats: - 1. message.reasoning - Direct reasoning field (DeepSeek, Qwen, etc.) - 2. message.reasoning_content - Alternative field (Moonshot AI, Novita, etc.) - 3. message.reasoning_details - Array of {type, summary, ...} objects (OpenRouter unified) - - Args: - assistant_message: The assistant message object from the API response - - Returns: - Combined reasoning text, or None if no reasoning found - """ - reasoning_parts = [] - - # Check direct reasoning field - if hasattr(assistant_message, 'reasoning') and assistant_message.reasoning: - reasoning_parts.append(assistant_message.reasoning) - - # Check reasoning_content field (alternative name used by some providers) - if hasattr(assistant_message, 'reasoning_content') and assistant_message.reasoning_content: - # Don't duplicate if same as reasoning - if assistant_message.reasoning_content not in reasoning_parts: - reasoning_parts.append(assistant_message.reasoning_content) - - # Check reasoning_details array (OpenRouter unified format) - # Format: [{"type": "reasoning.summary", "summary": "...", ...}, ...] - if hasattr(assistant_message, 'reasoning_details') and assistant_message.reasoning_details: - for detail in assistant_message.reasoning_details: - if isinstance(detail, dict): - # Extract summary from reasoning detail object - summary = ( - detail.get('summary') - or detail.get('thinking') - or detail.get('content') - or detail.get('text') - ) - if summary and summary not in reasoning_parts: - reasoning_parts.append(summary) - - # Some providers embed reasoning directly inside assistant content - # instead of returning structured reasoning fields. Only fall back - # to inline extraction when no structured reasoning was found. - content = getattr(assistant_message, "content", None) - if not reasoning_parts and isinstance(content, list): - # DeepSeek V4 Pro (and compatible providers) return content as a - # list of typed blocks, e.g.: - # [{"type": "thinking", "thinking": "..."}, {"type": "output", ...}] - # Without this branch the thinking text is silently dropped and the - # next turn fails with HTTP 400 ("thinking must be passed back"). - # Refs #21944. - for block in content: - if isinstance(block, dict) and block.get("type") == "thinking": - thinking_text = block.get("thinking") or block.get("text") or "" - thinking_text = thinking_text.strip() - if thinking_text and thinking_text not in reasoning_parts: - reasoning_parts.append(thinking_text) - if not reasoning_parts and isinstance(content, str) and content: - inline_patterns = ( - r"(.*?)", - r"(.*?)", - r"(.*?)", - r"(.*?)", - r"(.*?)", - ) - for pattern in inline_patterns: - flags = re.DOTALL | re.IGNORECASE - for block in re.findall(pattern, content, flags=flags): - cleaned = block.strip() - if cleaned and cleaned not in reasoning_parts: - reasoning_parts.append(cleaned) - - # Combine all reasoning parts - if reasoning_parts: - return "\n\n".join(reasoning_parts) - - return None + """Forwarder — see ``agent.agent_runtime_helpers.extract_reasoning``.""" + from agent.agent_runtime_helpers import extract_reasoning + return extract_reasoning(self, assistant_message) def _cleanup_task_resources(self, task_id: str) -> None: """Forwarder — see ``agent.chat_completion_helpers.cleanup_task_resources``.""" @@ -3097,104 +2841,9 @@ class AIAgent: messages.pop() def _repair_message_sequence(self, messages: List[Dict]) -> int: - """Collapse malformed role-alternation left in the live history. - - Providers (OpenAI, OpenRouter, Anthropic) expect strict alternation: - after the system message, user/tool alternates with assistant, with - no two consecutive user messages and no tool-result that doesn't - follow an assistant-with-tool_calls. Violations cause silent empty - responses on most providers, which triggers the empty-retry loop. - - This runs right before the API call as a defensive belt — by the - time it fires, the scaffolding strip should already have prevented - most shapes, but external callers (gateway multi-queue replay, - session resume, cron, explicit conversation_history passed in by - host code) can feed in already-broken histories. - - Repairs applied: - 1. Stray ``tool`` messages whose ``tool_call_id`` doesn't match - any preceding assistant tool_call — dropped. - 2. Consecutive ``user`` messages — merged with newline separator - so no user input is lost. - - Deliberately does NOT rewind orphan ``assistant(tool_calls)+tool`` - pairs that precede a user message — that pattern IS valid when the - previous turn completed normally and the user jumped in to redirect - before the model got a continuation turn (the ongoing dialog - pattern). The empty-response scaffolding stripper handles the - genuinely-broken variant via its flag-gated rewind. - - Returns the number of repairs made (for logging/telemetry). - """ - if not messages: - return 0 - - repairs = 0 - - # Pass 1: drop stray tool messages that don't follow a known - # assistant tool_call_id. Uses a rolling set of known ids refreshed - # on each assistant message. - known_tool_ids: set = set() - filtered: List[Dict] = [] - for msg in messages: - if not isinstance(msg, dict): - filtered.append(msg) - continue - role = msg.get("role") - if role == "assistant": - known_tool_ids = set() - for tc in (msg.get("tool_calls") or []): - tc_id = tc.get("id") if isinstance(tc, dict) else None - if tc_id: - known_tool_ids.add(tc_id) - filtered.append(msg) - elif role == "tool": - tc_id = msg.get("tool_call_id") - if tc_id and tc_id in known_tool_ids: - filtered.append(msg) - else: - repairs += 1 - else: - if role == "user": - # A user turn closes the tool-result run; subsequent - # tool messages without a fresh assistant tool_call - # are orphans. - known_tool_ids = set() - filtered.append(msg) - - # Pass 2: merge consecutive user messages. Preserves all user input - # so nothing the user typed is lost. - merged: List[Dict] = [] - for msg in filtered: - if ( - merged - and isinstance(msg, dict) - and msg.get("role") == "user" - and isinstance(merged[-1], dict) - and merged[-1].get("role") == "user" - ): - prev = merged[-1] - prev_content = prev.get("content", "") - new_content = msg.get("content", "") - # Only merge plain-text content; leave multimodal (list) - # content alone — collapsing image/audio blocks risks - # mangling the attachment structure. - if isinstance(prev_content, str) and isinstance(new_content, str): - prev["content"] = ( - (prev_content + "\n\n" + new_content) - if prev_content and new_content - else (prev_content or new_content) - ) - repairs += 1 - continue - merged.append(msg) - - if repairs > 0: - # Rewrite in place so downstream paths (persistence, return - # value, session DB flush) see the repaired sequence. - messages[:] = merged - - return repairs + """Forwarder — see ``agent.agent_runtime_helpers.repair_message_sequence``.""" + from agent.agent_runtime_helpers import repair_message_sequence + return repair_message_sequence(self, messages) def _flush_messages_to_session_db(self, messages: List[Dict], conversation_history: List[Dict] = None): """Persist any un-flushed messages to the SQLite session store. @@ -3292,173 +2941,9 @@ class AIAgent: return format_tools_for_system_message(self) def _convert_to_trajectory_format(self, messages: List[Dict[str, Any]], user_query: str, completed: bool) -> List[Dict[str, Any]]: - """ - Convert internal message format to trajectory format for saving. - - Args: - messages (List[Dict]): Internal message history - user_query (str): Original user query - completed (bool): Whether the conversation completed successfully - - Returns: - List[Dict]: Messages in trajectory format - """ - # Normalize multimodal tool results — trajectories are text-only, so - # replace image-bearing tool messages with their text_summary to avoid - # embedding ~1MB base64 blobs into every saved trajectory. - messages = [_trajectory_normalize_msg(m) for m in messages] - trajectory = [] - - # Add system message with tool definitions - system_msg = ( - "You are a function calling AI model. You are provided with function signatures within XML tags. " - "You may call one or more functions to assist with the user query. If available tools are not relevant in assisting " - "with user query, just respond in natural conversational language. Don't make assumptions about what values to plug " - "into functions. After calling & executing the functions, you will be provided with function results within " - " XML tags. Here are the available tools:\n" - f"\n{self._format_tools_for_system_message()}\n\n" - "For each function call return a JSON object, with the following pydantic model json schema for each:\n" - "{'title': 'FunctionCall', 'type': 'object', 'properties': {'name': {'title': 'Name', 'type': 'string'}, " - "'arguments': {'title': 'Arguments', 'type': 'object'}}, 'required': ['name', 'arguments']}\n" - "Each function call should be enclosed within XML tags.\n" - "Example:\n\n{'name': ,'arguments': }\n" - ) - - trajectory.append({ - "from": "system", - "value": system_msg - }) - - # Add the actual user prompt (from the dataset) as the first human message - trajectory.append({ - "from": "human", - "value": user_query - }) - - # Skip the first message (the user query) since we already added it above. - # Prefill messages are injected at API-call time only (not in the messages - # list), so no offset adjustment is needed here. - i = 1 - - while i < len(messages): - msg = messages[i] - - if msg["role"] == "assistant": - # Check if this message has tool calls - if "tool_calls" in msg and msg["tool_calls"]: - # Format assistant message with tool calls - # Add tags around reasoning for trajectory storage - content = "" - - # Prepend reasoning in tags if available (native thinking tokens) - if msg.get("reasoning") and msg["reasoning"].strip(): - content = f"\n{msg['reasoning']}\n\n" - - if msg.get("content") and msg["content"].strip(): - # Convert any tags to tags - # (used when native thinking is disabled and model reasons via XML) - content += convert_scratchpad_to_think(msg["content"]) + "\n" - - # Add tool calls wrapped in XML tags - for tool_call in msg["tool_calls"]: - if not tool_call or not isinstance(tool_call, dict): continue - # Parse arguments - should always succeed since we validate during conversation - # but keep try-except as safety net - try: - arguments = json.loads(tool_call["function"]["arguments"]) if isinstance(tool_call["function"]["arguments"], str) else tool_call["function"]["arguments"] - except json.JSONDecodeError: - # This shouldn't happen since we validate and retry during conversation, - # but if it does, log warning and use empty dict - logging.warning(f"Unexpected invalid JSON in trajectory conversion: {tool_call['function']['arguments'][:100]}") - arguments = {} - - tool_call_json = { - "name": tool_call["function"]["name"], - "arguments": arguments - } - content += f"\n{json.dumps(tool_call_json, ensure_ascii=False)}\n\n" - - # Ensure every gpt turn has a block (empty if no reasoning) - # so the format is consistent for training data - if "" not in content: - content = "\n\n" + content - - trajectory.append({ - "from": "gpt", - "value": content.rstrip() - }) - - # Collect all subsequent tool responses - tool_responses = [] - j = i + 1 - while j < len(messages) and messages[j]["role"] == "tool": - tool_msg = messages[j] - # Format tool response with XML tags - tool_response = "\n" - - # Try to parse tool content as JSON if it looks like JSON - tool_content = tool_msg["content"] - try: - if tool_content.strip().startswith(("{", "[")): - tool_content = json.loads(tool_content) - except (json.JSONDecodeError, AttributeError): - pass # Keep as string if not valid JSON - - tool_index = len(tool_responses) - tool_name = ( - msg["tool_calls"][tool_index]["function"]["name"] - if tool_index < len(msg["tool_calls"]) - else "unknown" - ) - tool_response += json.dumps({ - "tool_call_id": tool_msg.get("tool_call_id", ""), - "name": tool_name, - "content": tool_content - }, ensure_ascii=False) - tool_response += "\n" - tool_responses.append(tool_response) - j += 1 - - # Add all tool responses as a single message - if tool_responses: - trajectory.append({ - "from": "tool", - "value": "\n".join(tool_responses) - }) - i = j - 1 # Skip the tool messages we just processed - - else: - # Regular assistant message without tool calls - # Add tags around reasoning for trajectory storage - content = "" - - # Prepend reasoning in tags if available (native thinking tokens) - if msg.get("reasoning") and msg["reasoning"].strip(): - content = f"\n{msg['reasoning']}\n\n" - - # Convert any tags to tags - # (used when native thinking is disabled and model reasons via XML) - raw_content = msg["content"] or "" - content += convert_scratchpad_to_think(raw_content) - - # Ensure every gpt turn has a block (empty if no reasoning) - if "" not in content: - content = "\n\n" + content - - trajectory.append({ - "from": "gpt", - "value": content.strip() - }) - - elif msg["role"] == "user": - trajectory.append({ - "from": "human", - "value": msg["content"] - }) - - i += 1 - - return trajectory + """Forwarder — see ``agent.agent_runtime_helpers.convert_to_trajectory_format``.""" + from agent.agent_runtime_helpers import convert_to_trajectory_format + return convert_to_trajectory_format(self, messages, user_query, completed) def _save_trajectory(self, messages: List[Dict[str, Any]], user_query: str, completed: bool): """ @@ -3636,80 +3121,9 @@ class AIAgent: reason: str, error: Optional[Exception] = None, ) -> Optional[Path]: - """ - Dump a debug-friendly HTTP request record for the active inference API. - - Captures the request body from api_kwargs (excluding transport-only keys - like timeout). Intended for debugging provider-side 4xx failures where - retries are not useful. - """ - try: - body = copy.deepcopy(api_kwargs) - body.pop("timeout", None) - body = {k: v for k, v in body.items() if v is not None} - - api_key = None - try: - api_key = getattr(self.client, "api_key", None) - except Exception as e: - logger.debug("Could not extract API key for debug dump: %s", e) - - dump_payload: Dict[str, Any] = { - "timestamp": datetime.now().isoformat(), - "session_id": self.session_id, - "reason": reason, - "request": { - "method": "POST", - "url": f"{self.base_url.rstrip('/')}{'/responses' if self.api_mode == 'codex_responses' else '/chat/completions'}", - "headers": { - "Authorization": f"Bearer {self._mask_api_key_for_logs(api_key)}", - "Content-Type": "application/json", - }, - "body": body, - }, - } - - if error is not None: - error_info: Dict[str, Any] = { - "type": type(error).__name__, - "message": str(error), - } - for attr_name in ("status_code", "request_id", "code", "param", "type"): - attr_value = getattr(error, attr_name, None) - if attr_value is not None: - error_info[attr_name] = attr_value - - body_attr = getattr(error, "body", None) - if body_attr is not None: - error_info["body"] = body_attr - - response_obj = getattr(error, "response", None) - if response_obj is not None: - try: - error_info["response_status"] = getattr(response_obj, "status_code", None) - error_info["response_text"] = response_obj.text - except Exception as e: - logger.debug("Could not extract error response details: %s", e) - - dump_payload["error"] = error_info - - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") - dump_file = self.logs_dir / f"request_dump_{self.session_id}_{timestamp}.json" - dump_file.write_text( - json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str), - encoding="utf-8", - ) - - self._vprint(f"{self.log_prefix}🧾 Request debug dump written to: {dump_file}") - - if env_var_enabled("HERMES_DUMP_REQUEST_STDOUT"): - print(json.dumps(dump_payload, ensure_ascii=False, indent=2, default=str)) - - return dump_file - except Exception as dump_error: - if self.verbose_logging: - logging.warning(f"Failed to dump API request debug payload: {dump_error}") - return None + """Forwarder — see ``agent.agent_runtime_helpers.dump_api_request_debug``.""" + from agent.agent_runtime_helpers import dump_api_request_debug + return dump_api_request_debug(self, api_kwargs, reason=reason, error=error) @staticmethod def _clean_session_content(content: str) -> str: @@ -4584,86 +3998,9 @@ class AIAgent: def _drop_thinking_only_and_merge_users( messages: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: - """Drop thinking-only assistant turns; merge any adjacent user messages left behind. - - Runs on the per-call ``api_messages`` copy only. The stored - conversation history (``self.messages``) is never mutated, so the - user still sees the thinking block in the CLI/gateway transcript and - session persistence keeps the full trace. Only the wire copy sent to - the provider is cleaned. - - Why drop-and-merge rather than inject stub text: - - Fabricating ``"."`` / ``"(continued)"`` text lies in the history - and makes future turns see model output the model didn't emit. - - Dropping the turn preserves honesty; merging adjacent user messages - preserves the provider's role-alternation invariant. - - This is the pattern used by Claude Code's ``normalizeMessagesForAPI`` - (filterOrphanedThinkingOnlyMessages + mergeAdjacentUserMessages). - """ - if not messages: - return messages - - # Pass 1: drop thinking-only assistant turns. - kept = [m for m in messages if not AIAgent._is_thinking_only_assistant(m)] - dropped = len(messages) - len(kept) - if dropped == 0: - return messages - - # Pass 2: merge any newly-adjacent user messages. - merged: List[Dict[str, Any]] = [] - merges = 0 - for m in kept: - prev = merged[-1] if merged else None - if ( - prev is not None - and prev.get("role") == "user" - and m.get("role") == "user" - ): - prev_content = prev.get("content", "") - cur_content = m.get("content", "") - # Work on a copy of ``prev`` so the caller's input dicts are - # never mutated. ``_sanitize_api_messages`` upstream already - # hands us per-call copies, but staying pure here means we - # can be called safely from anywhere (tests, other loops). - prev_copy = dict(prev) - # Only string-content merge is meaningful for role-alternation - # purposes. If either side is a list (multimodal), append as a - # separate block rather than collapsing. - if isinstance(prev_content, str) and isinstance(cur_content, str): - sep = "\n\n" if prev_content and cur_content else "" - prev_copy["content"] = prev_content + sep + cur_content - elif isinstance(prev_content, list) and isinstance(cur_content, list): - prev_copy["content"] = list(prev_content) + list(cur_content) - elif isinstance(prev_content, list) and isinstance(cur_content, str): - if cur_content: - prev_copy["content"] = list(prev_content) + [ - {"type": "text", "text": cur_content} - ] - else: - prev_copy["content"] = list(prev_content) - elif isinstance(prev_content, str) and isinstance(cur_content, list): - new_blocks: List[Dict[str, Any]] = [] - if prev_content: - new_blocks.append({"type": "text", "text": prev_content}) - new_blocks.extend(cur_content) - prev_copy["content"] = new_blocks - else: - # Unknown content shape — fall back to appending separately - # (violates alternation, but safer than raising in a hot path). - merged.append(m) - continue - merged[-1] = prev_copy - merges += 1 - else: - merged.append(m) - - logger.debug( - "Pre-call sanitizer: dropped %d thinking-only assistant turn(s), " - "merged %d adjacent user message(s)", - dropped, - merges, - ) - return merged + """Forwarder — see ``agent.agent_runtime_helpers.drop_thinking_only_and_merge_users``.""" + from agent.agent_runtime_helpers import drop_thinking_only_and_merge_users + return drop_thinking_only_and_merge_users(messages) @staticmethod def _cap_delegate_task_calls(tool_calls: list) -> list: @@ -4890,97 +4227,9 @@ class AIAgent: return None def _create_openai_client(self, client_kwargs: dict, *, reason: str, shared: bool) -> Any: - from agent.auxiliary_client import _validate_base_url, _validate_proxy_env_urls - # Treat client_kwargs as read-only. Callers pass self._client_kwargs (or shallow - # copies of it) in; any in-place mutation leaks back into the stored dict and is - # reused on subsequent requests. #10933 hit this by injecting an httpx.Client - # transport that was torn down after the first request, so the next request - # wrapped a closed transport and raised "Cannot send a request, as the client - # has been closed" on every retry. The revert resolved that specific path; this - # copy locks the contract so future transport/keepalive work can't reintroduce - # the same class of bug. - client_kwargs = dict(client_kwargs) - _validate_proxy_env_urls() - _validate_base_url(client_kwargs.get("base_url")) - if self.provider == "copilot-acp" or str(client_kwargs.get("base_url", "")).startswith("acp://copilot"): - from agent.copilot_acp_client import CopilotACPClient - - client = CopilotACPClient(**client_kwargs) - logger.info( - "Copilot ACP client created (%s, shared=%s) %s", - reason, - shared, - self._client_log_context(), - ) - return client - if self.provider == "google-gemini-cli" or str(client_kwargs.get("base_url", "")).startswith("cloudcode-pa://"): - from agent.gemini_cloudcode_adapter import GeminiCloudCodeClient - - # Strip OpenAI-specific kwargs the Gemini client doesn't accept - safe_kwargs = { - k: v for k, v in client_kwargs.items() - if k in {"api_key", "base_url", "default_headers", "project_id", "timeout"} - } - client = GeminiCloudCodeClient(**safe_kwargs) - logger.info( - "Gemini Cloud Code Assist client created (%s, shared=%s) %s", - reason, - shared, - self._client_log_context(), - ) - return client - if self.provider == "gemini": - from agent.gemini_native_adapter import GeminiNativeClient, is_native_gemini_base_url - - base_url = str(client_kwargs.get("base_url", "") or "") - if is_native_gemini_base_url(base_url): - safe_kwargs = { - k: v for k, v in client_kwargs.items() - if k in {"api_key", "base_url", "default_headers", "timeout", "http_client"} - } - if "http_client" not in safe_kwargs: - keepalive_http = self._build_keepalive_http_client(base_url) - if keepalive_http is not None: - safe_kwargs["http_client"] = keepalive_http - client = GeminiNativeClient(**safe_kwargs) - logger.info( - "Gemini native client created (%s, shared=%s) %s", - reason, - shared, - self._client_log_context(), - ) - return client - # Inject TCP keepalives so the kernel detects dead provider connections - # instead of letting them sit silently in CLOSE-WAIT (#10324). Without - # this, a peer that drops mid-stream leaves the socket in a state where - # epoll_wait never fires, ``httpx`` read timeout may not trigger, and - # the agent hangs until manually killed. Probes after 30s idle, retry - # every 10s, give up after 3 → dead peer detected within ~60s. - # - # Safety against #10933: the ``client_kwargs = dict(client_kwargs)`` - # above means this injection only lands in the local per-call copy, - # never back into ``self._client_kwargs``. Each ``_create_openai_client`` - # invocation therefore gets its OWN fresh ``httpx.Client`` whose - # lifetime is tied to the OpenAI client it is passed to. When the - # OpenAI client is closed (rebuild, teardown, credential rotation), - # the paired ``httpx.Client`` closes with it, and the next call - # constructs a fresh one — no stale closed transport can be reused. - # Tests in ``tests/run_agent/test_create_openai_client_reuse.py`` and - # ``tests/run_agent/test_sequential_chats_live.py`` pin this invariant. - if "http_client" not in client_kwargs: - keepalive_http = self._build_keepalive_http_client(client_kwargs.get("base_url", "")) - if keepalive_http is not None: - client_kwargs["http_client"] = keepalive_http - # Uses the module-level `OpenAI` name, resolved lazily on first - # access via __getattr__ below. Tests patch via `run_agent.OpenAI`. - client = OpenAI(**client_kwargs) - logger.info( - "OpenAI client created (%s, shared=%s) %s", - reason, - shared, - self._client_log_context(), - ) - return client + """Forwarder — see ``agent.agent_runtime_helpers.create_openai_client``.""" + from agent.agent_runtime_helpers import create_openai_client + return create_openai_client(self, client_kwargs, reason=reason, shared=shared) @staticmethod def _force_close_tcp_sockets(client: Any) -> int: @@ -5230,207 +4479,14 @@ class AIAgent: self._close_openai_client(client, reason=reason, shared=False) def _run_codex_stream(self, api_kwargs: dict, client: Any = None, on_first_delta: callable = None): - """Execute one streaming Responses API request and return the final response.""" - import httpx as _httpx - - active_client = client or self._ensure_primary_openai_client(reason="codex_stream_direct") - max_stream_retries = 1 - has_tool_calls = False - first_delta_fired = False - # Accumulate streamed text so we can recover if get_final_response() - # returns empty output (e.g. chatgpt.com backend-api sends - # response.incomplete instead of response.completed). - self._codex_streamed_text_parts: list = [] - for attempt in range(max_stream_retries + 1): - if self._interrupt_requested: - raise InterruptedError("Agent interrupted before Codex stream retry") - collected_output_items: list = [] - try: - with active_client.responses.stream(**api_kwargs) as stream: - for event in stream: - self._touch_activity("receiving stream response") - if self._interrupt_requested: - break - event_type = getattr(event, "type", "") - # Fire callbacks on text content deltas (suppress during tool calls) - if "output_text.delta" in event_type or event_type == "response.output_text.delta": - delta_text = getattr(event, "delta", "") - if delta_text: - self._codex_streamed_text_parts.append(delta_text) - if delta_text and not has_tool_calls: - if not first_delta_fired: - first_delta_fired = True - if on_first_delta: - try: - on_first_delta() - except Exception: - pass - self._fire_stream_delta(delta_text) - # Track tool calls to suppress text streaming - elif "function_call" in event_type: - has_tool_calls = True - # Fire reasoning callbacks - elif "reasoning" in event_type and "delta" in event_type: - reasoning_text = getattr(event, "delta", "") - if reasoning_text: - self._fire_reasoning_delta(reasoning_text) - # Collect completed output items — some backends - # (chatgpt.com/backend-api/codex) stream valid items - # via response.output_item.done but the SDK's - # get_final_response() returns an empty output list. - elif event_type == "response.output_item.done": - done_item = getattr(event, "item", None) - if done_item is not None: - collected_output_items.append(done_item) - # Log non-completed terminal events for diagnostics - elif event_type in {"response.incomplete", "response.failed"}: - resp_obj = getattr(event, "response", None) - status = getattr(resp_obj, "status", None) if resp_obj else None - incomplete_details = getattr(resp_obj, "incomplete_details", None) if resp_obj else None - logger.warning( - "Codex Responses stream received terminal event %s " - "(status=%s, incomplete_details=%s, streamed_chars=%d). %s", - event_type, status, incomplete_details, - sum(len(p) for p in self._codex_streamed_text_parts), - self._client_log_context(), - ) - final_response = stream.get_final_response() - # PATCH: ChatGPT Codex backend streams valid output items - # but get_final_response() can return an empty output list. - # Backfill from collected items or synthesize from deltas. - _out = getattr(final_response, "output", None) - if isinstance(_out, list) and not _out: - if collected_output_items: - final_response.output = list(collected_output_items) - logger.debug( - "Codex stream: backfilled %d output items from stream events", - len(collected_output_items), - ) - elif self._codex_streamed_text_parts and not has_tool_calls: - assembled = "".join(self._codex_streamed_text_parts) - final_response.output = [SimpleNamespace( - type="message", - role="assistant", - status="completed", - content=[SimpleNamespace(type="output_text", text=assembled)], - )] - logger.debug( - "Codex stream: synthesized output from %d text deltas (%d chars)", - len(self._codex_streamed_text_parts), len(assembled), - ) - return final_response - except (_httpx.RemoteProtocolError, _httpx.ReadTimeout, _httpx.ConnectError, ConnectionError) as exc: - if attempt < max_stream_retries: - logger.debug( - "Codex Responses stream transport failed (attempt %s/%s); retrying. %s error=%s", - attempt + 1, - max_stream_retries + 1, - self._client_log_context(), - exc, - ) - continue - logger.debug( - "Codex Responses stream transport failed; falling back to create(stream=True). %s error=%s", - self._client_log_context(), - exc, - ) - return self._run_codex_create_stream_fallback(api_kwargs, client=active_client) - except RuntimeError as exc: - err_text = str(exc) - missing_completed = "response.completed" in err_text - if missing_completed and attempt < max_stream_retries: - logger.debug( - "Responses stream closed before completion (attempt %s/%s); retrying. %s", - attempt + 1, - max_stream_retries + 1, - self._client_log_context(), - ) - continue - if missing_completed: - logger.debug( - "Responses stream did not emit response.completed; falling back to create(stream=True). %s", - self._client_log_context(), - ) - return self._run_codex_create_stream_fallback(api_kwargs, client=active_client) - raise + """Forwarder — see ``agent.codex_runtime.run_codex_stream``.""" + from agent.codex_runtime import run_codex_stream + return run_codex_stream(self, api_kwargs, client, on_first_delta) def _run_codex_create_stream_fallback(self, api_kwargs: dict, client: Any = None): - """Fallback path for stream completion edge cases on Codex-style Responses backends.""" - active_client = client or self._ensure_primary_openai_client(reason="codex_create_stream_fallback") - fallback_kwargs = dict(api_kwargs) - fallback_kwargs["stream"] = True - fallback_kwargs = self._get_transport().preflight_kwargs(fallback_kwargs, allow_stream=True) - stream_or_response = active_client.responses.create(**fallback_kwargs) - - # Compatibility shim for mocks or providers that still return a concrete response. - if hasattr(stream_or_response, "output"): - return stream_or_response - if not hasattr(stream_or_response, "__iter__"): - return stream_or_response - - terminal_response = None - collected_output_items: list = [] - collected_text_deltas: list = [] - try: - for event in stream_or_response: - self._touch_activity("receiving stream response") - event_type = getattr(event, "type", None) - if not event_type and isinstance(event, dict): - event_type = event.get("type") - - # Collect output items and text deltas for backfill - if event_type == "response.output_item.done": - done_item = getattr(event, "item", None) - if done_item is None and isinstance(event, dict): - done_item = event.get("item") - if done_item is not None: - collected_output_items.append(done_item) - elif event_type in {"response.output_text.delta",}: - delta = getattr(event, "delta", "") - if not delta and isinstance(event, dict): - delta = event.get("delta", "") - if delta: - collected_text_deltas.append(delta) - - if event_type not in {"response.completed", "response.incomplete", "response.failed"}: - continue - - terminal_response = getattr(event, "response", None) - if terminal_response is None and isinstance(event, dict): - terminal_response = event.get("response") - if terminal_response is not None: - # Backfill empty output from collected stream events - _out = getattr(terminal_response, "output", None) - if isinstance(_out, list) and not _out: - if collected_output_items: - terminal_response.output = list(collected_output_items) - logger.debug( - "Codex fallback stream: backfilled %d output items", - len(collected_output_items), - ) - elif collected_text_deltas: - assembled = "".join(collected_text_deltas) - terminal_response.output = [SimpleNamespace( - type="message", role="assistant", - status="completed", - content=[SimpleNamespace(type="output_text", text=assembled)], - )] - logger.debug( - "Codex fallback stream: synthesized from %d deltas (%d chars)", - len(collected_text_deltas), len(assembled), - ) - return terminal_response - finally: - close_fn = getattr(stream_or_response, "close", None) - if callable(close_fn): - try: - close_fn() - except Exception: - pass - - if terminal_response is not None: - return terminal_response - raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.") + """Forwarder — see ``agent.codex_runtime.run_codex_create_stream_fallback``.""" + from agent.codex_runtime import run_codex_create_stream_fallback + return run_codex_create_stream_fallback(self, api_kwargs, client) def _try_refresh_codex_client_credentials(self, *, force: bool = True) -> bool: if self.api_mode != "codex_responses" or self.provider != "openai-codex": @@ -5657,81 +4713,9 @@ class AIAgent: classified_reason: Optional[FailoverReason] = None, error_context: Optional[Dict[str, Any]] = None, ) -> tuple[bool, bool]: - """Attempt credential recovery via pool rotation. - - Returns (recovered, has_retried_429). - On rate limits: first occurrence retries same credential (sets flag True). - second consecutive failure rotates to next credential. - On billing exhaustion: immediately rotates. - On auth failures: attempts token refresh before rotating. - - `classified_reason` lets the recovery path honor the structured error - classifier instead of relying only on raw HTTP codes. This matters for - providers that surface billing/rate-limit/auth conditions under a - different status code, such as Anthropic returning HTTP 400 for - "out of extra usage". - """ - pool = self._credential_pool - if pool is None: - return False, has_retried_429 - - effective_reason = classified_reason - if effective_reason is None: - if status_code == 402: - effective_reason = FailoverReason.billing - elif status_code == 429: - effective_reason = FailoverReason.rate_limit - elif status_code in {401, 403}: - effective_reason = FailoverReason.auth - - if effective_reason == FailoverReason.billing: - rotate_status = status_code if status_code is not None else 402 - next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) - if next_entry is not None: - logger.info( - "Credential %s (billing) — rotated to pool entry %s", - rotate_status, - getattr(next_entry, "id", "?"), - ) - self._swap_credential(next_entry) - return True, False - return False, has_retried_429 - - if effective_reason == FailoverReason.rate_limit: - if not has_retried_429: - return False, True - rotate_status = status_code if status_code is not None else 429 - next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) - if next_entry is not None: - logger.info( - "Credential %s (rate limit) — rotated to pool entry %s", - rotate_status, - getattr(next_entry, "id", "?"), - ) - self._swap_credential(next_entry) - return True, False - return False, True - - if effective_reason == FailoverReason.auth: - refreshed = pool.try_refresh_current() - if refreshed is not None: - logger.info(f"Credential auth failure — refreshed pool entry {getattr(refreshed, 'id', '?')}") - self._swap_credential(refreshed) - return True, has_retried_429 - # Refresh failed — rotate to next credential instead of giving up. - # The failed entry is already marked exhausted by try_refresh_current(). - rotate_status = status_code if status_code is not None else 401 - next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) - if next_entry is not None: - logger.info( - "Credential %s (auth refresh failed) — rotated to pool entry %s", - rotate_status, - getattr(next_entry, "id", "?"), - ) - self._swap_credential(next_entry) - return True, False - - return False, has_retried_429 + """Forwarder — see ``agent.agent_runtime_helpers.recover_with_credential_pool``.""" + from agent.agent_runtime_helpers import recover_with_credential_pool + return recover_with_credential_pool(self, status_code=status_code, has_retried_429=has_retried_429, classified_reason=classified_reason, error_context=error_context) def _credential_pool_may_recover_rate_limit(self) -> bool: """Whether a rate-limit retry should wait for same-provider credentials.""" @@ -5965,171 +4949,16 @@ class AIAgent: # ── Per-turn primary restoration ───────────────────────────────────── def _restore_primary_runtime(self) -> bool: - """Restore the primary runtime at the start of a new turn. - - In long-lived CLI sessions a single AIAgent instance spans multiple - turns. Without restoration, one transient failure pins the session - to the fallback provider for every subsequent turn. Calling this at - the top of ``run_conversation()`` makes fallback turn-scoped. - - The gateway caches agents across messages (``_agent_cache`` in - ``gateway/run.py``), so this restoration IS needed there too. - """ - if not self._fallback_activated: - return False - - if getattr(self, "_rate_limited_until", 0) > time.monotonic(): - return False # primary still in rate-limit cooldown, stay on fallback - - rt = self._primary_runtime - try: - # ── Core runtime state ── - self.model = rt["model"] - self.provider = rt["provider"] - self.base_url = rt["base_url"] # setter updates _base_url_lower - self.api_mode = rt["api_mode"] - if hasattr(self, "_transport_cache"): - self._transport_cache.clear() - self.api_key = rt["api_key"] - self._client_kwargs = dict(rt["client_kwargs"]) - self._use_prompt_caching = rt["use_prompt_caching"] - # Default to native layout when the restored snapshot predates the - # native-vs-proxy split (older sessions saved before this PR). - self._use_native_cache_layout = rt.get( - "use_native_cache_layout", - self.api_mode == "anthropic_messages" and self.provider == "anthropic", - ) - - # ── Rebuild client for the primary provider ── - if self.api_mode == "anthropic_messages": - from agent.anthropic_adapter import build_anthropic_client - self._anthropic_api_key = rt["anthropic_api_key"] - self._anthropic_base_url = rt["anthropic_base_url"] - self._anthropic_client = build_anthropic_client( - rt["anthropic_api_key"], rt["anthropic_base_url"], - timeout=get_provider_request_timeout(self.provider, self.model), - ) - self._is_anthropic_oauth = rt["is_anthropic_oauth"] - self.client = None - else: - self.client = self._create_openai_client( - dict(rt["client_kwargs"]), - reason="restore_primary", - shared=True, - ) - - # ── Restore context engine state ── - cc = self.context_compressor - cc.update_model( - model=rt["compressor_model"], - context_length=rt["compressor_context_length"], - base_url=rt["compressor_base_url"], - api_key=rt["compressor_api_key"], - provider=rt["compressor_provider"], - ) - - # ── Reset fallback chain for the new turn ── - self._fallback_activated = False - self._fallback_index = 0 - - logging.info( - "Primary runtime restored for new turn: %s (%s)", - self.model, self.provider, - ) - return True - except Exception as e: - logging.warning("Failed to restore primary runtime: %s", e) - return False - - # Which error types indicate a transient transport failure worth - # one more attempt with a rebuilt client / connection pool. - _TRANSIENT_TRANSPORT_ERRORS = frozenset({ - "ReadTimeout", "ConnectTimeout", "PoolTimeout", - "ConnectError", "RemoteProtocolError", - "APIConnectionError", "APITimeoutError", - }) + """Forwarder — see ``agent.agent_runtime_helpers.restore_primary_runtime``.""" + from agent.agent_runtime_helpers import restore_primary_runtime + return restore_primary_runtime(self) def _try_recover_primary_transport( self, api_error: Exception, *, retry_count: int, max_retries: int, ) -> bool: - """Attempt one extra primary-provider recovery cycle for transient transport failures. - - After ``max_retries`` exhaust, rebuild the primary client (clearing - stale connection pools) and give it one more attempt before falling - back. This is most useful for direct endpoints (custom, Z.AI, - Anthropic, OpenAI, local models) where a TCP-level hiccup does not - mean the provider is down. - - Skipped for proxy/aggregator providers (OpenRouter, Nous) which - already manage connection pools and retries server-side — if our - retries through them are exhausted, one more rebuilt client won't help. - """ - if self._fallback_activated: - return False - - # Only for transient transport errors - error_type = type(api_error).__name__ - if error_type not in self._TRANSIENT_TRANSPORT_ERRORS: - return False - - # Skip for aggregator providers — they manage their own retry infra - if self._is_openrouter_url(): - return False - provider_lower = (self.provider or "").strip().lower() - if provider_lower in {"nous", "nous-research"}: - return False - - try: - # Close existing client to release stale connections - if getattr(self, "client", None) is not None: - try: - self._close_openai_client( - self.client, reason="primary_recovery", shared=True, - ) - except Exception: - pass - - # Rebuild from primary snapshot - rt = self._primary_runtime - self._client_kwargs = dict(rt["client_kwargs"]) - self.model = rt["model"] - self.provider = rt["provider"] - self.base_url = rt["base_url"] - self.api_mode = rt["api_mode"] - if hasattr(self, "_transport_cache"): - self._transport_cache.clear() - self.api_key = rt["api_key"] - - if self.api_mode == "anthropic_messages": - from agent.anthropic_adapter import build_anthropic_client - self._anthropic_api_key = rt["anthropic_api_key"] - self._anthropic_base_url = rt["anthropic_base_url"] - self._anthropic_client = build_anthropic_client( - rt["anthropic_api_key"], rt["anthropic_base_url"], - timeout=get_provider_request_timeout(self.provider, self.model), - ) - self._is_anthropic_oauth = rt["is_anthropic_oauth"] - self.client = None - else: - self.client = self._create_openai_client( - dict(rt["client_kwargs"]), - reason="primary_recovery", - shared=True, - ) - - wait_time = min(3 + retry_count, 8) - self._vprint( - f"{self.log_prefix}🔁 Transient {error_type} on {self.provider} — " - f"rebuilt client, waiting {wait_time}s before one last primary attempt.", - force=True, - ) - time.sleep(wait_time) - return True - except Exception as e: - logging.warning("Primary transport recovery failed: %s", e) - return False - - # ── End provider fallback ────────────────────────────────────────────── + """Forwarder — see ``agent.agent_runtime_helpers.try_recover_primary_transport``.""" + from agent.agent_runtime_helpers import try_recover_primary_transport + return try_recover_primary_transport(self, api_error, retry_count=retry_count, max_retries=max_retries) @staticmethod def _content_has_image_parts(content: Any) -> bool: @@ -6802,108 +5631,9 @@ class AIAgent: logger=None, session_id: str = None, ) -> int: - """Repair corrupted assistant tool-call argument JSON in-place.""" - log = logger or logging.getLogger(__name__) - if not isinstance(messages, list): - return 0 - - repaired = 0 - marker = AIAgent._TOOL_CALL_ARGUMENTS_CORRUPTION_MARKER - - def _prepend_marker(tool_msg: dict) -> None: - existing = tool_msg.get("content") - if isinstance(existing, str): - if not existing: - tool_msg["content"] = marker - elif not existing.startswith(marker): - tool_msg["content"] = f"{marker}\n{existing}" - return - if existing is None: - tool_msg["content"] = marker - return - try: - existing_text = json.dumps(existing) - except TypeError: - existing_text = str(existing) - tool_msg["content"] = f"{marker}\n{existing_text}" - - message_index = 0 - while message_index < len(messages): - msg = messages[message_index] - if not isinstance(msg, dict) or msg.get("role") != "assistant": - message_index += 1 - continue - - tool_calls = msg.get("tool_calls") - if not isinstance(tool_calls, list) or not tool_calls: - message_index += 1 - continue - - insert_at = message_index + 1 - for tool_call in tool_calls: - if not isinstance(tool_call, dict): - continue - function = tool_call.get("function") - if not isinstance(function, dict): - continue - - arguments = function.get("arguments") - if arguments is None or arguments == "": - function["arguments"] = "{}" - continue - if isinstance(arguments, str) and not arguments.strip(): - function["arguments"] = "{}" - continue - if not isinstance(arguments, str): - continue - - try: - json.loads(arguments) - except json.JSONDecodeError: - tool_call_id = tool_call.get("id") - function_name = function.get("name", "?") - preview = arguments[:80] - log.warning( - "Corrupted tool_call arguments repaired before request " - "(session=%s, message_index=%s, tool_call_id=%s, function=%s, preview=%r)", - session_id or "-", - message_index, - tool_call_id or "-", - function_name, - preview, - ) - function["arguments"] = "{}" - - existing_tool_msg = None - scan_index = message_index + 1 - while scan_index < len(messages): - candidate = messages[scan_index] - if not isinstance(candidate, dict) or candidate.get("role") != "tool": - break - if candidate.get("tool_call_id") == tool_call_id: - existing_tool_msg = candidate - break - scan_index += 1 - - if existing_tool_msg is None: - messages.insert( - insert_at, - { - "role": "tool", - "name": function_name if function_name != "?" else "", - "tool_call_id": tool_call_id, - "content": marker, - }, - ) - insert_at += 1 - else: - _prepend_marker(existing_tool_msg) - - repaired += 1 - - message_index += 1 - - return repaired + """Forwarder — see ``agent.agent_runtime_helpers.sanitize_tool_call_arguments``.""" + from agent.agent_runtime_helpers import sanitize_tool_call_arguments + return sanitize_tool_call_arguments(messages, logger=logger, session_id=session_id) def _should_sanitize_tool_calls(self) -> bool: """Determine if tool_calls need sanitization for strict APIs. @@ -11033,144 +9763,9 @@ class AIAgent: effective_task_id: str, should_review_memory: bool = False, ) -> Dict[str, Any]: - """Codex app-server runtime path. Hands the entire turn to a `codex - app-server` subprocess and projects its events back into Hermes' - messages list so memory/skill review keep working. - - Called from run_conversation() when self.api_mode == "codex_app_server". - Returns the same dict shape as the chat_completions path. - """ - from agent.transports.codex_app_server_session import CodexAppServerSession - - # Lazy session: one CodexAppServerSession per AIAgent instance. - # Spawned on first turn, reused across turns, closed at AIAgent - # shutdown (see _cleanup hook). - if not hasattr(self, "_codex_session") or self._codex_session is None: - cwd = getattr(self, "session_cwd", None) or os.getcwd() - # Approval callback: defer to Hermes' standard prompt flow if a - # CLI thread has installed one. Gateway / cron contexts get the - # codex-side fail-closed default. - try: - from tools.terminal_tool import _get_approval_callback - approval_callback = _get_approval_callback() - except Exception: - approval_callback = None - self._codex_session = CodexAppServerSession( - cwd=cwd, - approval_callback=approval_callback, - ) - - # NOTE: the user message is ALREADY appended to messages by the - # standard run_conversation() flow (line ~11823) before the early - # return reaches us. Do NOT append again — that would duplicate. - - try: - turn = self._codex_session.run_turn(user_input=user_message) - except Exception as exc: - logger.exception("codex app-server turn failed") - # Crash → unconditionally drop the session so the next turn - # respawns from scratch instead of reusing a dead client. - try: - self._codex_session.close() - except Exception: - pass - self._codex_session = None - return { - "final_response": ( - f"Codex app-server turn failed: {exc}. " - f"Fall back to default runtime with `/codex-runtime auto`." - ), - "messages": messages, - "api_calls": 0, - "completed": False, - "partial": True, - "error": str(exc), - } - - # If the turn signalled the underlying client is wedged (deadline - # blown, post-tool watchdog tripped, OAuth refresh died, subprocess - # exited), retire the session so the next turn respawns codex - # rather than riding the broken process. Mirrors openclaw beta.8's - # "retire timed-out app-server clients" fix. - if getattr(turn, "should_retire", False): - logger.warning( - "codex app-server session retired (turn error: %s)", - turn.error, - ) - try: - self._codex_session.close() - except Exception: - pass - self._codex_session = None - - # Splice projected messages into the conversation. The projector emits - # standard {role, content, tool_calls, tool_call_id} entries, which - # is exactly what curator.py / sessions DB expect. - if turn.projected_messages: - messages.extend(turn.projected_messages) - - # Counter ticks for the self-improvement loop. - # _turns_since_memory and _user_turn_count are ALREADY incremented - # in the run_conversation() pre-loop block (lines ~11793-11817) so we - # do NOT touch them here — that would double-count. - # Only _iters_since_skill needs explicit increment, since the - # chat_completions loop bumps it per tool iteration (line ~12110) - # and that loop is bypassed on this path. - self._iters_since_skill = ( - getattr(self, "_iters_since_skill", 0) + turn.tool_iterations - ) - - # Now check the skill nudge AFTER iters were incremented — same - # pattern the chat_completions path uses (line ~15432). - should_review_skills = False - if ( - self._skill_nudge_interval > 0 - and self._iters_since_skill >= self._skill_nudge_interval - and "skill_manage" in self.valid_tool_names - ): - should_review_skills = True - self._iters_since_skill = 0 - - # External memory provider sync (mirrors line ~15439). Skipped on - # interrupt/error to avoid feeding partial transcripts to memory. - if not turn.interrupted and turn.error is None: - try: - self._sync_external_memory_for_turn( - original_user_message=original_user_message, - final_response=turn.final_text, - interrupted=False, - ) - except Exception: - logger.debug("external memory sync raised", exc_info=True) - - # Background review fork — same cadence + signature as the default - # path (line ~15449). Only fires when a trigger actually tripped AND - # we have a real final response. - if ( - turn.final_text - and not turn.interrupted - and (should_review_memory or should_review_skills) - ): - try: - self._spawn_background_review( - messages_snapshot=list(messages), - review_memory=should_review_memory, - review_skills=should_review_skills, - ) - except Exception: - logger.debug("background review spawn raised", exc_info=True) - - return { - "final_response": turn.final_text, - "messages": messages, - "api_calls": 1, # one app-server "turn" maps to one logical API call - "completed": not turn.interrupted and turn.error is None, - "partial": turn.interrupted or turn.error is not None, - "error": turn.error, - "codex_thread_id": turn.thread_id, - "codex_turn_id": turn.turn_id, - } - + """Forwarder — see ``agent.codex_runtime.run_codex_app_server_turn``.""" + from agent.codex_runtime import run_codex_app_server_turn + return run_codex_app_server_turn(self, user_message=user_message, original_user_message=original_user_message, messages=messages, effective_task_id=effective_task_id, should_review_memory=should_review_memory) def main( query: str = None,