diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py index 4efe5203421..797047f95d3 100644 --- a/agent/agent_runtime_helpers.py +++ b/agent/agent_runtime_helpers.py @@ -573,7 +573,7 @@ def recover_with_credential_pool( rotate_status = status_code if status_code is not None else 402 next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) if next_entry is not None: - logger.info( + _ra().logger.info( "Credential %s (billing) — rotated to pool entry %s", rotate_status, getattr(next_entry, "id", "?"), @@ -588,7 +588,7 @@ def recover_with_credential_pool( rotate_status = status_code if status_code is not None else 429 next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) if next_entry is not None: - logger.info( + _ra().logger.info( "Credential %s (rate limit) — rotated to pool entry %s", rotate_status, getattr(next_entry, "id", "?"), @@ -600,7 +600,7 @@ def recover_with_credential_pool( if effective_reason == FailoverReason.auth: refreshed = pool.try_refresh_current() if refreshed is not None: - logger.info(f"Credential auth failure — refreshed pool entry {getattr(refreshed, 'id', '?')}") + _ra().logger.info(f"Credential auth failure — refreshed pool entry {getattr(refreshed, 'id', '?')}") agent._swap_credential(refreshed) return True, has_retried_429 # Refresh failed — rotate to next credential instead of giving up. @@ -608,7 +608,7 @@ def recover_with_credential_pool( rotate_status = status_code if status_code is not None else 401 next_entry = pool.mark_exhausted_and_rotate(status_code=rotate_status, error_context=error_context) if next_entry is not None: - logger.info( + _ra().logger.info( "Credential %s (auth refresh failed) — rotated to pool entry %s", rotate_status, getattr(next_entry, "id", "?"), @@ -780,7 +780,7 @@ def drop_thinking_only_and_merge_users( else: merged.append(m) - logger.debug( + _ra().logger.debug( "Pre-call sanitizer: dropped %d thinking-only assistant turn(s), " "merged %d adjacent user message(s)", dropped, @@ -982,7 +982,7 @@ def dump_api_request_debug( try: api_key = getattr(agent.client, "api_key", None) except Exception as e: - logger.debug("Could not extract API key for debug dump: %s", e) + _ra().logger.debug("Could not extract API key for debug dump: %s", e) dump_payload: Dict[str, Any] = { "timestamp": datetime.now().isoformat(), @@ -1019,7 +1019,7 @@ def dump_api_request_debug( error_info["response_status"] = getattr(response_obj, "status_code", None) error_info["response_text"] = response_obj.text except Exception as e: - logger.debug("Could not extract error response details: %s", e) + _ra().logger.debug("Could not extract error response details: %s", e) dump_payload["error"] = error_info @@ -1166,7 +1166,7 @@ def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: boo from agent.copilot_acp_client import CopilotACPClient client = CopilotACPClient(**client_kwargs) - logger.info( + _ra().logger.info( "Copilot ACP client created (%s, shared=%s) %s", reason, shared, @@ -1182,7 +1182,7 @@ def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: boo if k in {"api_key", "base_url", "default_headers", "project_id", "timeout"} } client = GeminiCloudCodeClient(**safe_kwargs) - logger.info( + _ra().logger.info( "Gemini Cloud Code Assist client created (%s, shared=%s) %s", reason, shared, @@ -1203,7 +1203,7 @@ def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: boo if keepalive_http is not None: safe_kwargs["http_client"] = keepalive_http client = GeminiNativeClient(**safe_kwargs) - logger.info( + _ra().logger.info( "Gemini native client created (%s, shared=%s) %s", reason, shared, @@ -1234,7 +1234,7 @@ def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: boo # Uses the module-level `OpenAI` name, resolved lazily on first # access via __getattr__ below. Tests patch via `run_agent.OpenAI`. client = _ra().OpenAI(**client_kwargs) - logger.info( + _ra().logger.info( "OpenAI client created (%s, shared=%s) %s", reason, shared, @@ -1243,6 +1243,845 @@ def create_openai_client(agent, client_kwargs: dict, *, reason: str, shared: boo return client +def switch_model(agent, new_model, new_provider, api_key='', base_url='', api_mode=''): + """Switch the model/provider in-place for a live agent. + + Called by the /model command handlers (CLI and gateway) after + ``model_switch.switch_model()`` has resolved credentials and + validated the model. This method performs the actual runtime + swap: rebuilding clients, updating caching flags, and refreshing + the context compressor. + + The implementation mirrors ``_try_activate_fallback()`` for the + client-swap logic but also updates ``_primary_runtime`` so the + change persists across turns (unlike fallback which is + turn-scoped). + """ + from hermes_cli.providers import determine_api_mode + + # ── Determine api_mode if not provided ── + if not api_mode: + api_mode = determine_api_mode(new_provider, base_url) + + # Defense-in-depth: ensure OpenCode base_url doesn't carry a trailing + # /v1 into the anthropic_messages client, which would cause the SDK to + # hit /v1/v1/messages. `model_switch.switch_model()` already strips + # this, but we guard here so any direct callers (future code paths, + # tests) can't reintroduce the double-/v1 404 bug. + if ( + api_mode == "anthropic_messages" + and new_provider in {"opencode-zen", "opencode-go"} + and isinstance(base_url, str) + and base_url + ): + base_url = re.sub(r"/v1/?$", "", base_url) + + old_model = agent.model + old_provider = agent.provider + + # Clear the per-config context_length override so the new model's + # actual context window is resolved via get_model_context_length() + # instead of inheriting the stale value from the previous model. + agent._config_context_length = None + + # ── Swap core runtime fields ── + agent.model = new_model + agent.provider = new_provider + # Use new base_url when provided; only fall back to current when the + # new provider genuinely has no endpoint (e.g. native SDK providers). + # Without this guard the old provider's URL (e.g. Ollama's localhost + # address) would persist silently after switching to a cloud provider + # that returns an empty base_url string. + if base_url: + agent.base_url = base_url + agent.api_mode = api_mode + # Invalidate transport cache — new api_mode may need a different transport + if hasattr(agent, "_transport_cache"): + agent._transport_cache.clear() + if api_key: + agent.api_key = api_key + + # ── Build new client ── + if api_mode == "anthropic_messages": + from agent.anthropic_adapter import ( + build_anthropic_client, + resolve_anthropic_token, + _is_oauth_token, + ) + # Only fall back to ANTHROPIC_TOKEN when the provider is actually Anthropic. + # Other anthropic_messages providers (MiniMax, Alibaba, etc.) must use their own + # API key — falling back would send Anthropic credentials to third-party endpoints. + _is_native_anthropic = new_provider == "anthropic" + effective_key = (api_key or agent.api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or agent.api_key or "") + agent.api_key = effective_key + agent._anthropic_api_key = effective_key + agent._anthropic_base_url = base_url or getattr(agent, "_anthropic_base_url", None) + agent._anthropic_client = build_anthropic_client( + effective_key, agent._anthropic_base_url, + timeout=get_provider_request_timeout(agent.provider, agent.model), + ) + agent._is_anthropic_oauth = _is_oauth_token(effective_key) if _is_native_anthropic else False + agent.client = None + agent._client_kwargs = {} + else: + effective_key = api_key or agent.api_key + effective_base = base_url or agent.base_url + agent._client_kwargs = { + "api_key": effective_key, + "base_url": effective_base, + } + _sm_timeout = get_provider_request_timeout(agent.provider, agent.model) + if _sm_timeout is not None: + agent._client_kwargs["timeout"] = _sm_timeout + agent.client = agent._create_openai_client( + dict(agent._client_kwargs), + reason="switch_model", + shared=True, + ) + + # ── Re-evaluate prompt caching ── + agent._use_prompt_caching, agent._use_native_cache_layout = ( + agent._anthropic_prompt_cache_policy( + provider=new_provider, + base_url=agent.base_url, + api_mode=api_mode, + model=new_model, + ) + ) + + # ── LM Studio: preload before probing context length ── + agent._ensure_lmstudio_runtime_loaded() + + # ── Update context compressor ── + if hasattr(agent, "context_compressor") and agent.context_compressor: + from agent.model_metadata import get_model_context_length + # Re-read custom_providers from live config so per-model + # context_length overrides are honored when switching to a + # custom provider mid-session (closes #15779). + _sm_custom_providers = None + try: + from hermes_cli.config import load_config, get_compatible_custom_providers + _sm_cfg = load_config() + _sm_custom_providers = get_compatible_custom_providers(_sm_cfg) + except Exception: + _sm_custom_providers = None + new_context_length = get_model_context_length( + agent.model, + base_url=agent.base_url, + api_key=agent.api_key, + provider=agent.provider, + config_context_length=getattr(agent, "_config_context_length", None), + custom_providers=_sm_custom_providers, + ) + agent.context_compressor.update_model( + model=agent.model, + context_length=new_context_length, + base_url=agent.base_url, + api_key=getattr(agent, "api_key", ""), + provider=agent.provider, + api_mode=agent.api_mode, + ) + + # ── Invalidate cached system prompt so it rebuilds next turn ── + agent._cached_system_prompt = None + + # ── Update _primary_runtime so the change persists across turns ── + _cc = agent.context_compressor if hasattr(agent, "context_compressor") and agent.context_compressor else None + agent._primary_runtime = { + "model": agent.model, + "provider": agent.provider, + "base_url": agent.base_url, + "api_mode": agent.api_mode, + "api_key": getattr(agent, "api_key", ""), + "client_kwargs": dict(agent._client_kwargs), + "use_prompt_caching": agent._use_prompt_caching, + "use_native_cache_layout": agent._use_native_cache_layout, + "compressor_model": getattr(_cc, "model", agent.model) if _cc else agent.model, + "compressor_base_url": getattr(_cc, "base_url", agent.base_url) if _cc else agent.base_url, + "compressor_api_key": getattr(_cc, "api_key", "") if _cc else "", + "compressor_provider": getattr(_cc, "provider", agent.provider) if _cc else agent.provider, + "compressor_context_length": _cc.context_length if _cc else 0, + "compressor_threshold_tokens": _cc.threshold_tokens if _cc else 0, + } + if api_mode == "anthropic_messages": + agent._primary_runtime.update({ + "anthropic_api_key": agent._anthropic_api_key, + "anthropic_base_url": agent._anthropic_base_url, + "is_anthropic_oauth": agent._is_anthropic_oauth, + }) + + # ── Reset fallback state ── + agent._fallback_activated = False + agent._fallback_index = 0 + + # When the user deliberately swaps primary providers (e.g. openrouter + # → anthropic), drop any fallback entries that target the OLD primary + # or the NEW one. The chain was seeded from config at agent init for + # the original provider — without pruning, a failed turn on the new + # primary silently re-activates the provider the user just rejected, + # which is exactly what was reported during TUI v2 blitz testing + # ("switched to anthropic, tui keeps trying openrouter"). + old_norm = (old_provider or "").strip().lower() + new_norm = (new_provider or "").strip().lower() + fallback_chain = list(getattr(agent, "_fallback_chain", []) or []) + if old_norm and new_norm and old_norm != new_norm: + fallback_chain = [ + entry for entry in fallback_chain + if (entry.get("provider") or "").strip().lower() not in {old_norm, new_norm} + ] + agent._fallback_chain = fallback_chain + agent._fallback_model = fallback_chain[0] if fallback_chain else None + + logging.info( + "Model switched in-place: %s (%s) -> %s (%s)", + old_model, old_provider, new_model, new_provider, + ) + + + +def invoke_tool(agent, function_name: str, function_args: dict, effective_task_id: str, + tool_call_id: Optional[str] = None, messages: list = None, + pre_tool_block_checked: bool = False) -> str: + """Invoke a single tool and return the result string. No display logic. + + Handles both agent-level tools (todo, memory, etc.) and registry-dispatched + tools. Used by the concurrent execution path; the sequential path retains + its own inline invocation for backward-compatible display handling. + """ + # Check plugin hooks for a block directive before executing anything. + block_message: Optional[str] = None + if not pre_tool_block_checked: + try: + from hermes_cli.plugins import get_pre_tool_call_block_message + block_message = get_pre_tool_call_block_message( + function_name, function_args, task_id=effective_task_id or "", + ) + except Exception: + pass + if block_message is not None: + return json.dumps({"error": block_message}, ensure_ascii=False) + + if function_name == "todo": + from tools.todo_tool import todo_tool as _todo_tool + return _todo_tool( + todos=function_args.get("todos"), + merge=function_args.get("merge", False), + store=agent._todo_store, + ) + elif function_name == "session_search": + session_db = agent._get_session_db_for_recall() + if not session_db: + from hermes_state import format_session_db_unavailable + return json.dumps({"success": False, "error": format_session_db_unavailable()}) + from tools.session_search_tool import session_search as _session_search + return _session_search( + query=function_args.get("query", ""), + role_filter=function_args.get("role_filter"), + limit=function_args.get("limit", 3), + db=session_db, + current_session_id=agent.session_id, + ) + elif function_name == "memory": + target = function_args.get("target", "memory") + from tools.memory_tool import memory_tool as _memory_tool + result = _memory_tool( + action=function_args.get("action"), + target=target, + content=function_args.get("content"), + old_text=function_args.get("old_text"), + store=agent._memory_store, + ) + # Bridge: notify external memory provider of built-in memory writes + if agent._memory_manager and function_args.get("action") in {"add", "replace"}: + try: + agent._memory_manager.on_memory_write( + function_args.get("action", ""), + target, + function_args.get("content", ""), + metadata=agent._build_memory_write_metadata( + task_id=effective_task_id, + tool_call_id=tool_call_id, + ), + ) + except Exception: + pass + return result + elif agent._memory_manager and agent._memory_manager.has_tool(function_name): + return agent._memory_manager.handle_tool_call(function_name, function_args) + elif function_name == "clarify": + from tools.clarify_tool import clarify_tool as _clarify_tool + return _clarify_tool( + question=function_args.get("question", ""), + choices=function_args.get("choices"), + callback=agent.clarify_callback, + ) + elif function_name == "delegate_task": + return agent._dispatch_delegate_task(function_args) + else: + return _ra().handle_function_call( + function_name, function_args, effective_task_id, + tool_call_id=tool_call_id, + session_id=agent.session_id or "", + enabled_tools=list(agent.valid_tool_names) if agent.valid_tool_names else None, + skip_pre_tool_call_hook=True, + ) + + + +def repair_tool_call(agent, tool_name: str) -> str | None: + """Attempt to repair a mismatched tool name before aborting. + + Models sometimes emit variants of a tool name that differ only + in casing, separators, or class-like suffixes. Normalize + aggressively before falling back to fuzzy match: + + 1. Lowercase direct match. + 2. Lowercase + hyphens/spaces -> underscores. + 3. CamelCase -> snake_case (TodoTool -> todo_tool). + 4. Strip trailing ``_tool`` / ``-tool`` / ``tool`` suffix that + Claude-style models sometimes tack on (TodoTool_tool -> + TodoTool -> Todo -> todo). Applied twice so double-tacked + suffixes like ``TodoTool_tool`` reduce all the way. + 5. Fuzzy match (difflib, cutoff=0.7). + + See #14784 for the original reports (TodoTool_tool, Patch_tool, + BrowserClick_tool were all returning "Unknown tool" before). + + Returns the repaired name if found in valid_tool_names, else None. + """ + import re + from difflib import get_close_matches + + if not tool_name: + return None + + def _norm(s: str) -> str: + return s.lower().replace("-", "_").replace(" ", "_") + + def _camel_snake(s: str) -> str: + return re.sub(r"(? str | None: + lc = s.lower() + for suffix in ("_tool", "-tool", "tool"): + if lc.endswith(suffix): + return s[: -len(suffix)].rstrip("_-") + return None + + # Cheap fast-paths first — these cover the common case. + lowered = tool_name.lower() + if lowered in agent.valid_tool_names: + return lowered + normalized = _norm(tool_name) + if normalized in agent.valid_tool_names: + return normalized + + # Build the full candidate set for class-like emissions. + cands: set[str] = {tool_name, lowered, normalized, _camel_snake(tool_name)} + # Strip trailing tool-suffix up to twice — TodoTool_tool needs it. + for _ in range(2): + extra: set[str] = set() + for c in cands: + stripped = _strip_tool_suffix(c) + if stripped: + extra.add(stripped) + extra.add(_norm(stripped)) + extra.add(_camel_snake(stripped)) + cands |= extra + + for c in cands: + if c and c in agent.valid_tool_names: + return c + + # Fuzzy match as last resort. + matches = get_close_matches(lowered, agent.valid_tool_names, n=1, cutoff=0.7) + if matches: + return matches[0] + + return None + + + +def sanitize_api_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Fix orphaned tool_call / tool_result pairs before every LLM call. + + Runs unconditionally — not gated on whether the context compressor + is present — so orphans from session loading or manual message + manipulation are always caught. + """ + # --- Role allowlist: drop messages with roles the API won't accept --- + filtered = [] + for msg in messages: + role = msg.get("role") + if role not in _ra().AIAgent._VALID_API_ROLES: + _ra().logger.debug( + "Pre-call sanitizer: dropping message with invalid role %r", + role, + ) + continue + filtered.append(msg) + messages = filtered + + surviving_call_ids: set = set() + for msg in messages: + if msg.get("role") == "assistant": + for tc in msg.get("tool_calls") or []: + cid = _ra().AIAgent._get_tool_call_id_static(tc) + if cid: + surviving_call_ids.add(cid) + + result_call_ids: set = set() + for msg in messages: + if msg.get("role") == "tool": + cid = msg.get("tool_call_id") + if cid: + result_call_ids.add(cid) + + # 1. Drop tool results with no matching assistant call + orphaned_results = result_call_ids - surviving_call_ids + if orphaned_results: + messages = [ + m for m in messages + if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results) + ] + _ra().logger.debug( + "Pre-call sanitizer: removed %d orphaned tool result(s)", + len(orphaned_results), + ) + + # 2. Inject stub results for calls whose result was dropped + missing_results = surviving_call_ids - result_call_ids + if missing_results: + patched: List[Dict[str, Any]] = [] + for msg in messages: + patched.append(msg) + if msg.get("role") == "assistant": + for tc in msg.get("tool_calls") or []: + cid = _ra().AIAgent._get_tool_call_id_static(tc) + if cid in missing_results: + patched.append({ + "role": "tool", + "name": _ra().AIAgent._get_tool_call_name_static(tc), + "content": "[Result unavailable — see context summary above]", + "tool_call_id": cid, + }) + messages = patched + _ra().logger.debug( + "Pre-call sanitizer: added %d stub tool result(s)", + len(missing_results), + ) + return messages + + + +def looks_like_codex_intermediate_ack( + agent, + user_message: str, + assistant_content: str, + messages: List[Dict[str, Any]], +) -> bool: + """Detect a planning/ack message that should continue instead of ending the turn.""" + if any(isinstance(msg, dict) and msg.get("role") == "tool" for msg in messages): + return False + + assistant_text = agent._strip_think_blocks(assistant_content or "").strip().lower() + if not assistant_text: + return False + if len(assistant_text) > 1200: + return False + + has_future_ack = bool( + re.search(r"\b(i['’]ll|i will|let me|i can do that|i can help with that)\b", assistant_text) + ) + if not has_future_ack: + return False + + action_markers = ( + "look into", + "look at", + "inspect", + "scan", + "check", + "analyz", + "review", + "explore", + "read", + "open", + "run", + "test", + "fix", + "debug", + "search", + "find", + "walkthrough", + "report back", + "summarize", + ) + workspace_markers = ( + "directory", + "current directory", + "current dir", + "cwd", + "repo", + "repository", + "codebase", + "project", + "folder", + "filesystem", + "file tree", + "files", + "path", + ) + + user_text = (user_message or "").strip().lower() + user_targets_workspace = ( + any(marker in user_text for marker in workspace_markers) + or "~/" in user_text + or "/" in user_text + ) + assistant_mentions_action = any(marker in assistant_text for marker in action_markers) + assistant_targets_workspace = any( + marker in assistant_text for marker in workspace_markers + ) + return (user_targets_workspace or assistant_targets_workspace) and assistant_mentions_action + + + + +def copy_reasoning_content_for_api(agent, source_msg: dict, api_msg: dict) -> None: + """Copy provider-facing reasoning fields onto an API replay message.""" + if source_msg.get("role") != "assistant": + return + + # 1. Explicit reasoning_content already set — preserve it verbatim + # (includes DeepSeek/Kimi's own space-placeholder written at creation + # time, and any valid reasoning content from the same provider). + # + # Exception: sessions persisted BEFORE #17341 have empty-string + # placeholders pinned at creation time. DeepSeek V4 Pro rejects + # those with HTTP 400. When the active provider enforces the + # thinking-mode echo, upgrade "" → " " on replay so stale history + # doesn't 400 the user on the next turn. + existing = source_msg.get("reasoning_content") + if isinstance(existing, str): + if existing == "" and agent._needs_thinking_reasoning_pad(): + api_msg["reasoning_content"] = " " + else: + api_msg["reasoning_content"] = existing + return + + needs_thinking_pad = agent._needs_thinking_reasoning_pad() + + # 2. Cross-provider poisoned history (#15748): on DeepSeek/Kimi, + # if the source turn has tool_calls AND a 'reasoning' field but no + # 'reasoning_content' key, the 'reasoning' text was written by a + # prior provider (e.g. MiniMax) — DeepSeek's own _build_assistant_message + # pins reasoning_content at creation time for tool-call turns, so the + # shape (reasoning set, reasoning_content absent, tool_calls present) + # is unreachable from same-provider DeepSeek history after this fix. + # Inject a single space to satisfy the API without leaking another + # provider's chain of thought to DeepSeek/Kimi. Space (not "") + # because DeepSeek V4 Pro rejects empty-string reasoning_content + # in thinking mode (refs #17341). + normalized_reasoning = source_msg.get("reasoning") + if ( + needs_thinking_pad + and source_msg.get("tool_calls") + and isinstance(normalized_reasoning, str) + and normalized_reasoning + ): + api_msg["reasoning_content"] = " " + return + + # 3. Healthy session: promote 'reasoning' field to 'reasoning_content' + # for providers that use the internal 'reasoning' key. + # This must happen before the unconditional empty-string fallback so + # genuine reasoning content is not overwritten (#15812 regression in + # PR #15478). + if isinstance(normalized_reasoning, str) and normalized_reasoning: + api_msg["reasoning_content"] = normalized_reasoning + return + + # 4. DeepSeek / Kimi thinking mode: all assistant messages need + # reasoning_content. Inject a single space to satisfy the provider's + # requirement when no explicit reasoning content is present. Covers + # both tool-call turns (already-poisoned history with no reasoning + # at all) and plain text turns. Space (not "") because DeepSeek V4 + # Pro tightened validation and rejects empty string with HTTP 400 + # ("The reasoning content in the thinking mode must be passed back + # to the API"). Refs #17341. + if needs_thinking_pad: + api_msg["reasoning_content"] = " " + return + + # 5. reasoning_content was present but not a string (e.g. None after + # context compaction). Don't pass null to the API. + api_msg.pop("reasoning_content", None) + + + +def cleanup_dead_connections(agent) -> bool: + """Detect and clean up dead TCP connections on the primary client. + + Inspects the httpx connection pool for sockets in unhealthy states + (CLOSE-WAIT, errors). If any are found, force-closes all sockets + and rebuilds the primary client from scratch. + + Returns True if dead connections were found and cleaned up. + """ + client = getattr(agent, "client", None) + if client is None: + return False + try: + http_client = getattr(client, "_client", None) + if http_client is None: + return False + transport = getattr(http_client, "_transport", None) + if transport is None: + return False + pool = getattr(transport, "_pool", None) + if pool is None: + return False + connections = ( + getattr(pool, "_connections", None) + or getattr(pool, "_pool", None) + or [] + ) + dead_count = 0 + for conn in list(connections): + # Check for connections that are idle but have closed sockets + stream = ( + getattr(conn, "_network_stream", None) + or getattr(conn, "_stream", None) + ) + if stream is None: + continue + sock = getattr(stream, "_sock", None) + if sock is None: + sock = getattr(stream, "stream", None) + if sock is not None: + sock = getattr(sock, "_sock", None) + if sock is None: + continue + # Probe socket health with a non-blocking recv peek + import socket as _socket + try: + sock.setblocking(False) + data = sock.recv(1, _socket.MSG_PEEK | _socket.MSG_DONTWAIT) + if data == b"": + dead_count += 1 + except BlockingIOError: + pass # No data available — socket is healthy + except OSError: + dead_count += 1 + finally: + try: + sock.setblocking(True) + except OSError: + pass + if dead_count > 0: + _ra().logger.warning( + "Found %d dead connection(s) in client pool — rebuilding client", + dead_count, + ) + agent._replace_primary_openai_client(reason="dead_connection_cleanup") + return True + except Exception as exc: + _ra().logger.debug("Dead connection check error: %s", exc) + return False + + + +def extract_api_error_context(error: Exception) -> Dict[str, Any]: + """Extract structured rate-limit details from provider errors.""" + context: Dict[str, Any] = {} + + body = getattr(error, "body", None) + payload = None + if isinstance(body, dict): + payload = body.get("error") if isinstance(body.get("error"), dict) else body + if isinstance(payload, dict): + reason = payload.get("code") or payload.get("error") + if isinstance(reason, str) and reason.strip(): + context["reason"] = reason.strip() + message = payload.get("message") or payload.get("error_description") + if isinstance(message, str) and message.strip(): + context["message"] = message.strip() + for key in ("resets_at", "reset_at"): + value = payload.get(key) + if value not in {None, ""}: + context["reset_at"] = value + break + retry_after = payload.get("retry_after") + if retry_after not in {None, ""} and "reset_at" not in context: + try: + context["reset_at"] = time.time() + float(retry_after) + except (TypeError, ValueError): + pass + + response = getattr(error, "response", None) + headers = getattr(response, "headers", None) + if headers: + retry_after = headers.get("retry-after") or headers.get("Retry-After") + if retry_after and "reset_at" not in context: + try: + context["reset_at"] = time.time() + float(retry_after) + except (TypeError, ValueError): + pass + ratelimit_reset = headers.get("x-ratelimit-reset") + if ratelimit_reset and "reset_at" not in context: + context["reset_at"] = ratelimit_reset + + if "message" not in context: + raw_message = str(error).strip() + if raw_message: + context["message"] = raw_message[:500] + + if "reset_at" not in context: + message = context.get("message") or "" + if isinstance(message, str): + delay_match = re.search(r"quotaResetDelay[:\s\"]+(\\d+(?:\\.\\d+)?)(ms|s)", message, re.IGNORECASE) + if delay_match: + value = float(delay_match.group(1)) + seconds = value / 1000.0 if delay_match.group(2).lower() == "ms" else value + context["reset_at"] = time.time() + seconds + else: + sec_match = re.search( + r"retry\s+(?:after\s+)?(\d+(?:\.\d+)?)\s*(?:sec|secs|seconds|s\b)", + message, + re.IGNORECASE, + ) + if sec_match: + context["reset_at"] = time.time() + float(sec_match.group(1)) + + return context + + + +def apply_pending_steer_to_tool_results(agent, messages: list, num_tool_msgs: int) -> None: + """Append any pending /steer text to the last tool result in this turn. + + Called at the end of a tool-call batch, before the next API call. + The steer is appended to the last ``role:"tool"`` message's content + with a clear marker so the model understands it came from the user + and NOT from the tool itself. Role alternation is preserved — + nothing new is inserted, we only modify existing content. + + Args: + messages: The running messages list. + num_tool_msgs: Number of tool results appended in this batch; + used to locate the tail slice safely. + """ + if num_tool_msgs <= 0 or not messages: + return + steer_text = agent._drain_pending_steer() + if not steer_text: + return + # Find the last tool-role message in the recent tail. Skipping + # non-tool messages defends against future code appending + # something else at the boundary. + target_idx = None + for j in range(len(messages) - 1, max(len(messages) - num_tool_msgs - 1, -1), -1): + msg = messages[j] + if isinstance(msg, dict) and msg.get("role") == "tool": + target_idx = j + break + if target_idx is None: + # No tool result in this batch (e.g. all skipped by interrupt); + # put the steer back so the caller's fallback path can deliver + # it as a normal next-turn user message. + _lock = getattr(agent, "_pending_steer_lock", None) + if _lock is not None: + with _lock: + if agent._pending_steer: + agent._pending_steer = agent._pending_steer + "\n" + steer_text + else: + agent._pending_steer = steer_text + else: + existing = getattr(agent, "_pending_steer", None) + agent._pending_steer = (existing + "\n" + steer_text) if existing else steer_text + return + marker = f"\n\nUser guidance: {steer_text}" + existing_content = messages[target_idx].get("content", "") + if not isinstance(existing_content, str): + # Anthropic multimodal content blocks — preserve them and append + # a text block at the end. + try: + blocks = list(existing_content) if existing_content else [] + blocks.append({"type": "text", "text": marker.lstrip()}) + messages[target_idx]["content"] = blocks + except Exception: + # Fall back to string replacement if content shape is unexpected. + messages[target_idx]["content"] = f"{existing_content}{marker}" + else: + messages[target_idx]["content"] = existing_content + marker + _ra().logger.info( + "Delivered /steer to agent after tool batch (%d chars): %s", + len(steer_text), + steer_text[:120] + ("..." if len(steer_text) > 120 else ""), + ) + + + +def force_close_tcp_sockets(client: Any) -> int: + """Force-close underlying TCP sockets to prevent CLOSE-WAIT accumulation. + + When a provider drops a connection mid-stream, httpx's ``client.close()`` + performs a graceful shutdown which leaves sockets in CLOSE-WAIT until the + OS times them out (often minutes). This method walks the httpx transport + pool and issues ``socket.shutdown(SHUT_RDWR)`` + ``socket.close()`` to + force an immediate TCP RST, freeing the file descriptors. + + Returns the number of sockets force-closed. + """ + import socket as _socket + + closed = 0 + try: + http_client = getattr(client, "_client", None) + if http_client is None: + return 0 + transport = getattr(http_client, "_transport", None) + if transport is None: + return 0 + pool = getattr(transport, "_pool", None) + if pool is None: + return 0 + # httpx uses httpcore connection pools; connections live in + # _connections (list) or _pool (list) depending on version. + connections = ( + getattr(pool, "_connections", None) + or getattr(pool, "_pool", None) + or [] + ) + for conn in list(connections): + stream = ( + getattr(conn, "_network_stream", None) + or getattr(conn, "_stream", None) + ) + if stream is None: + continue + sock = getattr(stream, "_sock", None) + if sock is None: + sock = getattr(stream, "stream", None) + if sock is not None: + sock = getattr(sock, "_sock", None) + if sock is None: + continue + try: + sock.shutdown(_socket.SHUT_RDWR) + except OSError: + pass + try: + sock.close() + except OSError: + pass + closed += 1 + except Exception as exc: + _ra().logger.debug("Force-close TCP sockets sweep error: %s", exc) + return closed + + __all__ = [ "convert_to_trajectory_format", @@ -1257,4 +2096,14 @@ __all__ = [ "dump_api_request_debug", "anthropic_prompt_cache_policy", "create_openai_client", + "switch_model", + "invoke_tool", + "repair_tool_call", + "sanitize_api_messages", + "looks_like_codex_intermediate_ack", + "copy_reasoning_content_for_api", + "cleanup_dead_connections", + "extract_api_error_context", + "apply_pending_steer_to_tool_results", + "force_close_tcp_sockets", ] diff --git a/run_agent.py b/run_agent.py index 05d648f94e2..af1483e0e32 100644 --- a/run_agent.py +++ b/run_agent.py @@ -492,198 +492,9 @@ class AIAgent: logger.debug("LM Studio preload skipped: %s", err) def switch_model(self, new_model, new_provider, api_key='', base_url='', api_mode=''): - """Switch the model/provider in-place for a live agent. - - Called by the /model command handlers (CLI and gateway) after - ``model_switch.switch_model()`` has resolved credentials and - validated the model. This method performs the actual runtime - swap: rebuilding clients, updating caching flags, and refreshing - the context compressor. - - The implementation mirrors ``_try_activate_fallback()`` for the - client-swap logic but also updates ``_primary_runtime`` so the - change persists across turns (unlike fallback which is - turn-scoped). - """ - from hermes_cli.providers import determine_api_mode - - # ── Determine api_mode if not provided ── - if not api_mode: - api_mode = determine_api_mode(new_provider, base_url) - - # Defense-in-depth: ensure OpenCode base_url doesn't carry a trailing - # /v1 into the anthropic_messages client, which would cause the SDK to - # hit /v1/v1/messages. `model_switch.switch_model()` already strips - # this, but we guard here so any direct callers (future code paths, - # tests) can't reintroduce the double-/v1 404 bug. - if ( - api_mode == "anthropic_messages" - and new_provider in {"opencode-zen", "opencode-go"} - and isinstance(base_url, str) - and base_url - ): - base_url = re.sub(r"/v1/?$", "", base_url) - - old_model = self.model - old_provider = self.provider - - # Clear the per-config context_length override so the new model's - # actual context window is resolved via get_model_context_length() - # instead of inheriting the stale value from the previous model. - self._config_context_length = None - - # ── Swap core runtime fields ── - self.model = new_model - self.provider = new_provider - # Use new base_url when provided; only fall back to current when the - # new provider genuinely has no endpoint (e.g. native SDK providers). - # Without this guard the old provider's URL (e.g. Ollama's localhost - # address) would persist silently after switching to a cloud provider - # that returns an empty base_url string. - if base_url: - self.base_url = base_url - self.api_mode = api_mode - # Invalidate transport cache — new api_mode may need a different transport - if hasattr(self, "_transport_cache"): - self._transport_cache.clear() - if api_key: - self.api_key = api_key - - # ── Build new client ── - if api_mode == "anthropic_messages": - from agent.anthropic_adapter import ( - build_anthropic_client, - resolve_anthropic_token, - _is_oauth_token, - ) - # Only fall back to ANTHROPIC_TOKEN when the provider is actually Anthropic. - # Other anthropic_messages providers (MiniMax, Alibaba, etc.) must use their own - # API key — falling back would send Anthropic credentials to third-party endpoints. - _is_native_anthropic = new_provider == "anthropic" - effective_key = (api_key or self.api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or self.api_key or "") - self.api_key = effective_key - self._anthropic_api_key = effective_key - self._anthropic_base_url = base_url or getattr(self, "_anthropic_base_url", None) - self._anthropic_client = build_anthropic_client( - effective_key, self._anthropic_base_url, - timeout=get_provider_request_timeout(self.provider, self.model), - ) - self._is_anthropic_oauth = _is_oauth_token(effective_key) if _is_native_anthropic else False - self.client = None - self._client_kwargs = {} - else: - effective_key = api_key or self.api_key - effective_base = base_url or self.base_url - self._client_kwargs = { - "api_key": effective_key, - "base_url": effective_base, - } - _sm_timeout = get_provider_request_timeout(self.provider, self.model) - if _sm_timeout is not None: - self._client_kwargs["timeout"] = _sm_timeout - self.client = self._create_openai_client( - dict(self._client_kwargs), - reason="switch_model", - shared=True, - ) - - # ── Re-evaluate prompt caching ── - self._use_prompt_caching, self._use_native_cache_layout = ( - self._anthropic_prompt_cache_policy( - provider=new_provider, - base_url=self.base_url, - api_mode=api_mode, - model=new_model, - ) - ) - - # ── LM Studio: preload before probing context length ── - self._ensure_lmstudio_runtime_loaded() - - # ── Update context compressor ── - if hasattr(self, "context_compressor") and self.context_compressor: - from agent.model_metadata import get_model_context_length - # Re-read custom_providers from live config so per-model - # context_length overrides are honored when switching to a - # custom provider mid-session (closes #15779). - _sm_custom_providers = None - try: - from hermes_cli.config import load_config, get_compatible_custom_providers - _sm_cfg = load_config() - _sm_custom_providers = get_compatible_custom_providers(_sm_cfg) - except Exception: - _sm_custom_providers = None - new_context_length = get_model_context_length( - self.model, - base_url=self.base_url, - api_key=self.api_key, - provider=self.provider, - config_context_length=getattr(self, "_config_context_length", None), - custom_providers=_sm_custom_providers, - ) - self.context_compressor.update_model( - model=self.model, - context_length=new_context_length, - base_url=self.base_url, - api_key=getattr(self, "api_key", ""), - provider=self.provider, - api_mode=self.api_mode, - ) - - # ── Invalidate cached system prompt so it rebuilds next turn ── - self._cached_system_prompt = None - - # ── Update _primary_runtime so the change persists across turns ── - _cc = self.context_compressor if hasattr(self, "context_compressor") and self.context_compressor else None - self._primary_runtime = { - "model": self.model, - "provider": self.provider, - "base_url": self.base_url, - "api_mode": self.api_mode, - "api_key": getattr(self, "api_key", ""), - "client_kwargs": dict(self._client_kwargs), - "use_prompt_caching": self._use_prompt_caching, - "use_native_cache_layout": self._use_native_cache_layout, - "compressor_model": getattr(_cc, "model", self.model) if _cc else self.model, - "compressor_base_url": getattr(_cc, "base_url", self.base_url) if _cc else self.base_url, - "compressor_api_key": getattr(_cc, "api_key", "") if _cc else "", - "compressor_provider": getattr(_cc, "provider", self.provider) if _cc else self.provider, - "compressor_context_length": _cc.context_length if _cc else 0, - "compressor_threshold_tokens": _cc.threshold_tokens if _cc else 0, - } - if api_mode == "anthropic_messages": - self._primary_runtime.update({ - "anthropic_api_key": self._anthropic_api_key, - "anthropic_base_url": self._anthropic_base_url, - "is_anthropic_oauth": self._is_anthropic_oauth, - }) - - # ── Reset fallback state ── - self._fallback_activated = False - self._fallback_index = 0 - - # When the user deliberately swaps primary providers (e.g. openrouter - # → anthropic), drop any fallback entries that target the OLD primary - # or the NEW one. The chain was seeded from config at agent init for - # the original provider — without pruning, a failed turn on the new - # primary silently re-activates the provider the user just rejected, - # which is exactly what was reported during TUI v2 blitz testing - # ("switched to anthropic, tui keeps trying openrouter"). - old_norm = (old_provider or "").strip().lower() - new_norm = (new_provider or "").strip().lower() - fallback_chain = list(getattr(self, "_fallback_chain", []) or []) - if old_norm and new_norm and old_norm != new_norm: - fallback_chain = [ - entry for entry in fallback_chain - if (entry.get("provider") or "").strip().lower() not in {old_norm, new_norm} - ] - self._fallback_chain = fallback_chain - self._fallback_model = fallback_chain[0] if fallback_chain else None - - logging.info( - "Model switched in-place: %s (%s) -> %s (%s)", - old_model, old_provider, new_model, new_provider, - ) + """Forwarder — see ``agent.agent_runtime_helpers.switch_model``.""" + from agent.agent_runtime_helpers import switch_model + return switch_model(self, new_model, new_provider, api_key, base_url, api_mode) def _safe_print(self, *args, **kwargs): """Print that silently handles broken pipes / closed stdout. @@ -1134,71 +945,9 @@ class AIAgent: assistant_content: str, messages: List[Dict[str, Any]], ) -> bool: - """Detect a planning/ack message that should continue instead of ending the turn.""" - if any(isinstance(msg, dict) and msg.get("role") == "tool" for msg in messages): - return False - - assistant_text = self._strip_think_blocks(assistant_content or "").strip().lower() - if not assistant_text: - return False - if len(assistant_text) > 1200: - return False - - has_future_ack = bool( - re.search(r"\b(i['’]ll|i will|let me|i can do that|i can help with that)\b", assistant_text) - ) - if not has_future_ack: - return False - - action_markers = ( - "look into", - "look at", - "inspect", - "scan", - "check", - "analyz", - "review", - "explore", - "read", - "open", - "run", - "test", - "fix", - "debug", - "search", - "find", - "walkthrough", - "report back", - "summarize", - ) - workspace_markers = ( - "directory", - "current directory", - "current dir", - "cwd", - "repo", - "repository", - "codebase", - "project", - "folder", - "filesystem", - "file tree", - "files", - "path", - ) - - user_text = (user_message or "").strip().lower() - user_targets_workspace = ( - any(marker in user_text for marker in workspace_markers) - or "~/" in user_text - or "/" in user_text - ) - assistant_mentions_action = any(marker in assistant_text for marker in action_markers) - assistant_targets_workspace = any( - marker in assistant_text for marker in workspace_markers - ) - return (user_targets_workspace or assistant_targets_workspace) and assistant_mentions_action - + """Forwarder — see ``agent.agent_runtime_helpers.looks_like_codex_intermediate_ack``.""" + from agent.agent_runtime_helpers import looks_like_codex_intermediate_ack + return looks_like_codex_intermediate_ack(self, user_message, assistant_content, messages) def _extract_reasoning(self, assistant_message) -> Optional[str]: """Forwarder — see ``agent.agent_runtime_helpers.extract_reasoning``.""" @@ -1547,68 +1296,9 @@ class AIAgent: @staticmethod def _extract_api_error_context(error: Exception) -> Dict[str, Any]: - """Extract structured rate-limit details from provider errors.""" - context: Dict[str, Any] = {} - - body = getattr(error, "body", None) - payload = None - if isinstance(body, dict): - payload = body.get("error") if isinstance(body.get("error"), dict) else body - if isinstance(payload, dict): - reason = payload.get("code") or payload.get("error") - if isinstance(reason, str) and reason.strip(): - context["reason"] = reason.strip() - message = payload.get("message") or payload.get("error_description") - if isinstance(message, str) and message.strip(): - context["message"] = message.strip() - for key in ("resets_at", "reset_at"): - value = payload.get(key) - if value not in {None, ""}: - context["reset_at"] = value - break - retry_after = payload.get("retry_after") - if retry_after not in {None, ""} and "reset_at" not in context: - try: - context["reset_at"] = time.time() + float(retry_after) - except (TypeError, ValueError): - pass - - response = getattr(error, "response", None) - headers = getattr(response, "headers", None) - if headers: - retry_after = headers.get("retry-after") or headers.get("Retry-After") - if retry_after and "reset_at" not in context: - try: - context["reset_at"] = time.time() + float(retry_after) - except (TypeError, ValueError): - pass - ratelimit_reset = headers.get("x-ratelimit-reset") - if ratelimit_reset and "reset_at" not in context: - context["reset_at"] = ratelimit_reset - - if "message" not in context: - raw_message = str(error).strip() - if raw_message: - context["message"] = raw_message[:500] - - if "reset_at" not in context: - message = context.get("message") or "" - if isinstance(message, str): - delay_match = re.search(r"quotaResetDelay[:\s\"]+(\\d+(?:\\.\\d+)?)(ms|s)", message, re.IGNORECASE) - if delay_match: - value = float(delay_match.group(1)) - seconds = value / 1000.0 if delay_match.group(2).lower() == "ms" else value - context["reset_at"] = time.time() + seconds - else: - sec_match = re.search( - r"retry\s+(?:after\s+)?(\d+(?:\.\d+)?)\s*(?:sec|secs|seconds|s\b)", - message, - re.IGNORECASE, - ) - if sec_match: - context["reset_at"] = time.time() + float(sec_match.group(1)) - - return context + """Forwarder — see ``agent.agent_runtime_helpers.extract_api_error_context``.""" + from agent.agent_runtime_helpers import extract_api_error_context + return extract_api_error_context(error) def _usage_summary_for_api_request_hook(self, response: Any) -> Optional[Dict[str, Any]]: """Token buckets for ``post_api_request`` plugins (no raw ``response`` object).""" @@ -1965,67 +1655,9 @@ class AIAgent: return "\n".join(lines) def _apply_pending_steer_to_tool_results(self, messages: list, num_tool_msgs: int) -> None: - """Append any pending /steer text to the last tool result in this turn. - - Called at the end of a tool-call batch, before the next API call. - The steer is appended to the last ``role:"tool"`` message's content - with a clear marker so the model understands it came from the user - and NOT from the tool itself. Role alternation is preserved — - nothing new is inserted, we only modify existing content. - - Args: - messages: The running messages list. - num_tool_msgs: Number of tool results appended in this batch; - used to locate the tail slice safely. - """ - if num_tool_msgs <= 0 or not messages: - return - steer_text = self._drain_pending_steer() - if not steer_text: - return - # Find the last tool-role message in the recent tail. Skipping - # non-tool messages defends against future code appending - # something else at the boundary. - target_idx = None - for j in range(len(messages) - 1, max(len(messages) - num_tool_msgs - 1, -1), -1): - msg = messages[j] - if isinstance(msg, dict) and msg.get("role") == "tool": - target_idx = j - break - if target_idx is None: - # No tool result in this batch (e.g. all skipped by interrupt); - # put the steer back so the caller's fallback path can deliver - # it as a normal next-turn user message. - _lock = getattr(self, "_pending_steer_lock", None) - if _lock is not None: - with _lock: - if self._pending_steer: - self._pending_steer = self._pending_steer + "\n" + steer_text - else: - self._pending_steer = steer_text - else: - existing = getattr(self, "_pending_steer", None) - self._pending_steer = (existing + "\n" + steer_text) if existing else steer_text - return - marker = f"\n\nUser guidance: {steer_text}" - existing_content = messages[target_idx].get("content", "") - if not isinstance(existing_content, str): - # Anthropic multimodal content blocks — preserve them and append - # a text block at the end. - try: - blocks = list(existing_content) if existing_content else [] - blocks.append({"type": "text", "text": marker.lstrip()}) - messages[target_idx]["content"] = blocks - except Exception: - # Fall back to string replacement if content shape is unexpected. - messages[target_idx]["content"] = f"{existing_content}{marker}" - else: - messages[target_idx]["content"] = existing_content + marker - logger.info( - "Delivered /steer to agent after tool batch (%d chars): %s", - len(steer_text), - steer_text[:120] + ("..." if len(steer_text) > 120 else ""), - ) + """Forwarder — see ``agent.agent_runtime_helpers.apply_pending_steer_to_tool_results``.""" + from agent.agent_runtime_helpers import apply_pending_steer_to_tool_results + return apply_pending_steer_to_tool_results(self, messages, num_tool_msgs) def _touch_activity(self, desc: str) -> None: """Update the last-activity timestamp and description (thread-safe).""" @@ -2383,74 +2015,9 @@ class AIAgent: @staticmethod def _sanitize_api_messages(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Fix orphaned tool_call / tool_result pairs before every LLM call. - - Runs unconditionally — not gated on whether the context compressor - is present — so orphans from session loading or manual message - manipulation are always caught. - """ - # --- Role allowlist: drop messages with roles the API won't accept --- - filtered = [] - for msg in messages: - role = msg.get("role") - if role not in AIAgent._VALID_API_ROLES: - logger.debug( - "Pre-call sanitizer: dropping message with invalid role %r", - role, - ) - continue - filtered.append(msg) - messages = filtered - - surviving_call_ids: set = set() - for msg in messages: - if msg.get("role") == "assistant": - for tc in msg.get("tool_calls") or []: - cid = AIAgent._get_tool_call_id_static(tc) - if cid: - surviving_call_ids.add(cid) - - result_call_ids: set = set() - for msg in messages: - if msg.get("role") == "tool": - cid = msg.get("tool_call_id") - if cid: - result_call_ids.add(cid) - - # 1. Drop tool results with no matching assistant call - orphaned_results = result_call_ids - surviving_call_ids - if orphaned_results: - messages = [ - m for m in messages - if not (m.get("role") == "tool" and m.get("tool_call_id") in orphaned_results) - ] - logger.debug( - "Pre-call sanitizer: removed %d orphaned tool result(s)", - len(orphaned_results), - ) - - # 2. Inject stub results for calls whose result was dropped - missing_results = surviving_call_ids - result_call_ids - if missing_results: - patched: List[Dict[str, Any]] = [] - for msg in messages: - patched.append(msg) - if msg.get("role") == "assistant": - for tc in msg.get("tool_calls") or []: - cid = AIAgent._get_tool_call_id_static(tc) - if cid in missing_results: - patched.append({ - "role": "tool", - "name": AIAgent._get_tool_call_name_static(tc), - "content": "[Result unavailable — see context summary above]", - "tool_call_id": cid, - }) - messages = patched - logger.debug( - "Pre-call sanitizer: added %d stub tool result(s)", - len(missing_results), - ) - return messages + """Forwarder — see ``agent.agent_runtime_helpers.sanitize_api_messages``.""" + from agent.agent_runtime_helpers import sanitize_api_messages + return sanitize_api_messages(messages) @staticmethod def _is_thinking_only_assistant(msg: Dict[str, Any]) -> bool: @@ -2564,76 +2131,9 @@ class AIAgent: return unique if len(unique) < len(tool_calls) else tool_calls def _repair_tool_call(self, tool_name: str) -> str | None: - """Attempt to repair a mismatched tool name before aborting. - - Models sometimes emit variants of a tool name that differ only - in casing, separators, or class-like suffixes. Normalize - aggressively before falling back to fuzzy match: - - 1. Lowercase direct match. - 2. Lowercase + hyphens/spaces -> underscores. - 3. CamelCase -> snake_case (TodoTool -> todo_tool). - 4. Strip trailing ``_tool`` / ``-tool`` / ``tool`` suffix that - Claude-style models sometimes tack on (TodoTool_tool -> - TodoTool -> Todo -> todo). Applied twice so double-tacked - suffixes like ``TodoTool_tool`` reduce all the way. - 5. Fuzzy match (difflib, cutoff=0.7). - - See #14784 for the original reports (TodoTool_tool, Patch_tool, - BrowserClick_tool were all returning "Unknown tool" before). - - Returns the repaired name if found in valid_tool_names, else None. - """ - import re - from difflib import get_close_matches - - if not tool_name: - return None - - def _norm(s: str) -> str: - return s.lower().replace("-", "_").replace(" ", "_") - - def _camel_snake(s: str) -> str: - return re.sub(r"(? str | None: - lc = s.lower() - for suffix in ("_tool", "-tool", "tool"): - if lc.endswith(suffix): - return s[: -len(suffix)].rstrip("_-") - return None - - # Cheap fast-paths first — these cover the common case. - lowered = tool_name.lower() - if lowered in self.valid_tool_names: - return lowered - normalized = _norm(tool_name) - if normalized in self.valid_tool_names: - return normalized - - # Build the full candidate set for class-like emissions. - cands: set[str] = {tool_name, lowered, normalized, _camel_snake(tool_name)} - # Strip trailing tool-suffix up to twice — TodoTool_tool needs it. - for _ in range(2): - extra: set[str] = set() - for c in cands: - stripped = _strip_tool_suffix(c) - if stripped: - extra.add(stripped) - extra.add(_norm(stripped)) - extra.add(_camel_snake(stripped)) - cands |= extra - - for c in cands: - if c and c in self.valid_tool_names: - return c - - # Fuzzy match as last resort. - matches = get_close_matches(lowered, self.valid_tool_names, n=1, cutoff=0.7) - if matches: - return matches[0] - - return None + """Forwarder — see ``agent.agent_runtime_helpers.repair_tool_call``.""" + from agent.agent_runtime_helpers import repair_tool_call + return repair_tool_call(self, tool_name) def _invalidate_system_prompt(self): """Forwarder — see ``agent.system_prompt.invalidate_system_prompt``.""" @@ -2745,62 +2245,9 @@ class AIAgent: @staticmethod def _force_close_tcp_sockets(client: Any) -> int: - """Force-close underlying TCP sockets to prevent CLOSE-WAIT accumulation. - - When a provider drops a connection mid-stream, httpx's ``client.close()`` - performs a graceful shutdown which leaves sockets in CLOSE-WAIT until the - OS times them out (often minutes). This method walks the httpx transport - pool and issues ``socket.shutdown(SHUT_RDWR)`` + ``socket.close()`` to - force an immediate TCP RST, freeing the file descriptors. - - Returns the number of sockets force-closed. - """ - import socket as _socket - - closed = 0 - try: - http_client = getattr(client, "_client", None) - if http_client is None: - return 0 - transport = getattr(http_client, "_transport", None) - if transport is None: - return 0 - pool = getattr(transport, "_pool", None) - if pool is None: - return 0 - # httpx uses httpcore connection pools; connections live in - # _connections (list) or _pool (list) depending on version. - connections = ( - getattr(pool, "_connections", None) - or getattr(pool, "_pool", None) - or [] - ) - for conn in list(connections): - stream = ( - getattr(conn, "_network_stream", None) - or getattr(conn, "_stream", None) - ) - if stream is None: - continue - sock = getattr(stream, "_sock", None) - if sock is None: - sock = getattr(stream, "stream", None) - if sock is not None: - sock = getattr(sock, "_sock", None) - if sock is None: - continue - try: - sock.shutdown(_socket.SHUT_RDWR) - except OSError: - pass - try: - sock.close() - except OSError: - pass - closed += 1 - except Exception as exc: - logger.debug("Force-close TCP sockets sweep error: %s", exc) - return closed + """Forwarder — see ``agent.agent_runtime_helpers.force_close_tcp_sockets``.""" + from agent.agent_runtime_helpers import force_close_tcp_sockets + return force_close_tcp_sockets(client) def _close_openai_client(self, client: Any, *, reason: str, shared: bool) -> None: if client is None: @@ -2860,74 +2307,9 @@ class AIAgent: return self.client def _cleanup_dead_connections(self) -> bool: - """Detect and clean up dead TCP connections on the primary client. - - Inspects the httpx connection pool for sockets in unhealthy states - (CLOSE-WAIT, errors). If any are found, force-closes all sockets - and rebuilds the primary client from scratch. - - Returns True if dead connections were found and cleaned up. - """ - client = getattr(self, "client", None) - if client is None: - return False - try: - http_client = getattr(client, "_client", None) - if http_client is None: - return False - transport = getattr(http_client, "_transport", None) - if transport is None: - return False - pool = getattr(transport, "_pool", None) - if pool is None: - return False - connections = ( - getattr(pool, "_connections", None) - or getattr(pool, "_pool", None) - or [] - ) - dead_count = 0 - for conn in list(connections): - # Check for connections that are idle but have closed sockets - stream = ( - getattr(conn, "_network_stream", None) - or getattr(conn, "_stream", None) - ) - if stream is None: - continue - sock = getattr(stream, "_sock", None) - if sock is None: - sock = getattr(stream, "stream", None) - if sock is not None: - sock = getattr(sock, "_sock", None) - if sock is None: - continue - # Probe socket health with a non-blocking recv peek - import socket as _socket - try: - sock.setblocking(False) - data = sock.recv(1, _socket.MSG_PEEK | _socket.MSG_DONTWAIT) - if data == b"": - dead_count += 1 - except BlockingIOError: - pass # No data available — socket is healthy - except OSError: - dead_count += 1 - finally: - try: - sock.setblocking(True) - except OSError: - pass - if dead_count > 0: - logger.warning( - "Found %d dead connection(s) in client pool — rebuilding client", - dead_count, - ) - self._replace_primary_openai_client(reason="dead_connection_cleanup") - return True - except Exception as exc: - logger.debug("Dead connection check error: %s", exc) - return False + """Forwarder — see ``agent.agent_runtime_helpers.cleanup_dead_connections``.""" + from agent.agent_runtime_helpers import cleanup_dead_connections + return cleanup_dead_connections(self) @staticmethod def _api_kwargs_have_image_parts(api_kwargs: dict) -> bool: @@ -4039,74 +3421,9 @@ class AIAgent: ) def _copy_reasoning_content_for_api(self, source_msg: dict, api_msg: dict) -> None: - """Copy provider-facing reasoning fields onto an API replay message.""" - if source_msg.get("role") != "assistant": - return - - # 1. Explicit reasoning_content already set — preserve it verbatim - # (includes DeepSeek/Kimi's own space-placeholder written at creation - # time, and any valid reasoning content from the same provider). - # - # Exception: sessions persisted BEFORE #17341 have empty-string - # placeholders pinned at creation time. DeepSeek V4 Pro rejects - # those with HTTP 400. When the active provider enforces the - # thinking-mode echo, upgrade "" → " " on replay so stale history - # doesn't 400 the user on the next turn. - existing = source_msg.get("reasoning_content") - if isinstance(existing, str): - if existing == "" and self._needs_thinking_reasoning_pad(): - api_msg["reasoning_content"] = " " - else: - api_msg["reasoning_content"] = existing - return - - needs_thinking_pad = self._needs_thinking_reasoning_pad() - - # 2. Cross-provider poisoned history (#15748): on DeepSeek/Kimi, - # if the source turn has tool_calls AND a 'reasoning' field but no - # 'reasoning_content' key, the 'reasoning' text was written by a - # prior provider (e.g. MiniMax) — DeepSeek's own _build_assistant_message - # pins reasoning_content at creation time for tool-call turns, so the - # shape (reasoning set, reasoning_content absent, tool_calls present) - # is unreachable from same-provider DeepSeek history after this fix. - # Inject a single space to satisfy the API without leaking another - # provider's chain of thought to DeepSeek/Kimi. Space (not "") - # because DeepSeek V4 Pro rejects empty-string reasoning_content - # in thinking mode (refs #17341). - normalized_reasoning = source_msg.get("reasoning") - if ( - needs_thinking_pad - and source_msg.get("tool_calls") - and isinstance(normalized_reasoning, str) - and normalized_reasoning - ): - api_msg["reasoning_content"] = " " - return - - # 3. Healthy session: promote 'reasoning' field to 'reasoning_content' - # for providers that use the internal 'reasoning' key. - # This must happen before the unconditional empty-string fallback so - # genuine reasoning content is not overwritten (#15812 regression in - # PR #15478). - if isinstance(normalized_reasoning, str) and normalized_reasoning: - api_msg["reasoning_content"] = normalized_reasoning - return - - # 4. DeepSeek / Kimi thinking mode: all assistant messages need - # reasoning_content. Inject a single space to satisfy the provider's - # requirement when no explicit reasoning content is present. Covers - # both tool-call turns (already-poisoned history with no reasoning - # at all) and plain text turns. Space (not "") because DeepSeek V4 - # Pro tightened validation and rejects empty string with HTTP 400 - # ("The reasoning content in the thinking mode must be passed back - # to the API"). Refs #17341. - if needs_thinking_pad: - api_msg["reasoning_content"] = " " - return - - # 5. reasoning_content was present but not a string (e.g. None after - # context compaction). Don't pass null to the API. - api_msg.pop("reasoning_content", None) + """Forwarder — see ``agent.agent_runtime_helpers.copy_reasoning_content_for_api``.""" + from agent.agent_runtime_helpers import copy_reasoning_content_for_api + return copy_reasoning_content_for_api(self, source_msg, api_msg) @staticmethod def _sanitize_tool_calls_for_strict_api(api_msg: dict) -> dict: @@ -4251,89 +3568,9 @@ class AIAgent: def _invoke_tool(self, function_name: str, function_args: dict, effective_task_id: str, tool_call_id: Optional[str] = None, messages: list = None, pre_tool_block_checked: bool = False) -> str: - """Invoke a single tool and return the result string. No display logic. - - Handles both agent-level tools (todo, memory, etc.) and registry-dispatched - tools. Used by the concurrent execution path; the sequential path retains - its own inline invocation for backward-compatible display handling. - """ - # Check plugin hooks for a block directive before executing anything. - block_message: Optional[str] = None - if not pre_tool_block_checked: - try: - from hermes_cli.plugins import get_pre_tool_call_block_message - block_message = get_pre_tool_call_block_message( - function_name, function_args, task_id=effective_task_id or "", - ) - except Exception: - pass - if block_message is not None: - return json.dumps({"error": block_message}, ensure_ascii=False) - - if function_name == "todo": - from tools.todo_tool import todo_tool as _todo_tool - return _todo_tool( - todos=function_args.get("todos"), - merge=function_args.get("merge", False), - store=self._todo_store, - ) - elif function_name == "session_search": - session_db = self._get_session_db_for_recall() - if not session_db: - from hermes_state import format_session_db_unavailable - return json.dumps({"success": False, "error": format_session_db_unavailable()}) - from tools.session_search_tool import session_search as _session_search - return _session_search( - query=function_args.get("query", ""), - role_filter=function_args.get("role_filter"), - limit=function_args.get("limit", 3), - db=session_db, - current_session_id=self.session_id, - ) - elif function_name == "memory": - target = function_args.get("target", "memory") - from tools.memory_tool import memory_tool as _memory_tool - result = _memory_tool( - action=function_args.get("action"), - target=target, - content=function_args.get("content"), - old_text=function_args.get("old_text"), - store=self._memory_store, - ) - # Bridge: notify external memory provider of built-in memory writes - if self._memory_manager and function_args.get("action") in {"add", "replace"}: - try: - self._memory_manager.on_memory_write( - function_args.get("action", ""), - target, - function_args.get("content", ""), - metadata=self._build_memory_write_metadata( - task_id=effective_task_id, - tool_call_id=tool_call_id, - ), - ) - except Exception: - pass - return result - elif self._memory_manager and self._memory_manager.has_tool(function_name): - return self._memory_manager.handle_tool_call(function_name, function_args) - elif function_name == "clarify": - from tools.clarify_tool import clarify_tool as _clarify_tool - return _clarify_tool( - question=function_args.get("question", ""), - choices=function_args.get("choices"), - callback=self.clarify_callback, - ) - elif function_name == "delegate_task": - return self._dispatch_delegate_task(function_args) - else: - return handle_function_call( - function_name, function_args, effective_task_id, - tool_call_id=tool_call_id, - session_id=self.session_id or "", - enabled_tools=list(self.valid_tool_names) if self.valid_tool_names else None, - skip_pre_tool_call_hook=True, - ) + """Forwarder — see ``agent.agent_runtime_helpers.invoke_tool``.""" + from agent.agent_runtime_helpers import invoke_tool + return invoke_tool(self, function_name, function_args, effective_task_id, tool_call_id, messages, pre_tool_block_checked) @staticmethod def _wrap_verbose(label: str, text: str, indent: str = " ") -> str: