From 9f408989c40c2f1ca5830bd571eb5e1701cad0ce Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 19:43:38 -0700 Subject: [PATCH] refactor(run_agent): extract __init__ (1,381 LOC) to agent/agent_init.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The largest method left on AIAgent (60+ parameters, the entire startup sequence — credential resolution, provider auto-detection, context engine bootstrap, memory store hydration, plugin lifecycle hooks) moves into agent/agent_init.py. AIAgent.__init__ is now a thin wrapper that calls agent.agent_init.init_agent(self, ...) with the original full parameter list preserved. Module-level run_agent names referenced in the body (_openrouter_prewarm_done, _qwen_portal_headers, _routermint_headers, _hermes_home, OpenAI, get_tool_definitions, check_toolset_requirements) are resolved through _ra() so test patches on those names keep working. agent_init's logger warnings are routed via _ra().logger so tests patching run_agent.logger capture them (TestStringKSuffixContextLengthWarns, TestCustomProvidersInvalidContextLengthWarns). Live E2E reconfirmed on three model paths (openai/gpt-5.4, anthropic/claude-sonnet-4.6, moonshotai/kimi-k2-thinking). tests/run_agent/ + tests/agent/: 4313 passed (same pre-existing test_auxiliary_client failure). run_agent.py: 5944 -> 4564 lines (-1380). Total reduction since baseline: 16083 -> 4564 (-11519, 72%). --- agent/agent_init.py | 1457 +++++++++++++++++++++++++++++++++++++++++++ run_agent.py | 1316 +------------------------------------- 2 files changed, 1460 insertions(+), 1313 deletions(-) create mode 100644 agent/agent_init.py diff --git a/agent/agent_init.py b/agent/agent_init.py new file mode 100644 index 00000000000..acae61487c4 --- /dev/null +++ b/agent/agent_init.py @@ -0,0 +1,1457 @@ +"""Implementation of :meth:`AIAgent.__init__` — extracted as a module function. + +``AIAgent.__init__`` is one of the longest methods in the codebase (60+ +parameters, ~1,400 lines of attribute initialization, provider +auto-detection, credential resolution, context-engine bootstrap, etc.). +Keeping it in ``run_agent.py`` bloats that file with code that's mostly +"setup state, then forget". + +After this extraction the body lives here as ``init_agent(agent, ...)`` +and :meth:`AIAgent.__init__` is a thin wrapper that calls +``init_agent(self, ...)``. All imports the body needs at module-load +time are listed below; the body also performs many lazy imports inside +its own scope that come along unchanged. + +Symbols that tests patch on ``run_agent.*`` (``OpenAI``, ``cleanup_vm``, +etc.) are resolved through :func:`_ra` so the patch contract is +preserved. +""" + +from __future__ import annotations + +import logging +import os +import re +import sys +import threading +import time +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional +from urllib.parse import urlparse, parse_qs, urlunparse + +from agent.context_compressor import ContextCompressor +from agent.iteration_budget import IterationBudget +from agent.memory_manager import StreamingContextScrubber +from agent.model_metadata import ( + MINIMUM_CONTEXT_LENGTH, + fetch_model_metadata, + get_model_context_length, + is_local_endpoint, + query_ollama_num_ctx, +) +from agent.process_bootstrap import _install_safe_stdio +from agent.subdirectory_hints import SubdirectoryHintTracker +from agent.think_scrubber import StreamingThinkScrubber +from agent.tool_guardrails import ( + ToolCallGuardrailConfig, + ToolCallGuardrailController, + ToolGuardrailDecision, +) +from hermes_cli.config import cfg_get +from hermes_cli.timeouts import get_provider_request_timeout +from hermes_constants import get_hermes_home +from model_tools import check_toolset_requirements, get_tool_definitions +from utils import base_url_host_matches + +# Use the same logger name as run_agent so tests patching ``run_agent.logger`` +# capture our warnings. (run_agent.py also does +# ``logger = logging.getLogger(__name__)``, which resolves to "run_agent" +# from inside that module.) +logger = logging.getLogger("run_agent") + + +def _ra(): + """Lazy reference to ``run_agent`` so callers can patch + ``run_agent.OpenAI`` / ``run_agent.cleanup_vm`` / ... and have those + patches reach this code path. + """ + import run_agent + return run_agent + + +def init_agent( + agent, + base_url: str = None, + api_key: str = None, + provider: str = None, + api_mode: str = None, + acp_command: str = None, + acp_args: list[str] | None = None, + command: str = None, + args: list[str] | None = None, + model: str = "", + max_iterations: int = 90, # Default tool-calling iterations (shared with subagents) + tool_delay: float = 1.0, + enabled_toolsets: List[str] = None, + disabled_toolsets: List[str] = None, + save_trajectories: bool = False, + verbose_logging: bool = False, + quiet_mode: bool = False, + ephemeral_system_prompt: str = None, + log_prefix_chars: int = 100, + log_prefix: str = "", + providers_allowed: List[str] = None, + providers_ignored: List[str] = None, + providers_order: List[str] = None, + provider_sort: str = None, + provider_require_parameters: bool = False, + provider_data_collection: str = None, + openrouter_min_coding_score: Optional[float] = None, + session_id: str = None, + tool_progress_callback: callable = None, + tool_start_callback: callable = None, + tool_complete_callback: callable = None, + thinking_callback: callable = None, + reasoning_callback: callable = None, + clarify_callback: callable = None, + step_callback: callable = None, + stream_delta_callback: callable = None, + interim_assistant_callback: callable = None, + tool_gen_callback: callable = None, + status_callback: callable = None, + max_tokens: int = None, + reasoning_config: Dict[str, Any] = None, + service_tier: str = None, + request_overrides: Dict[str, Any] = None, + prefill_messages: List[Dict[str, Any]] = None, + platform: str = None, + user_id: str = None, + user_name: str = None, + chat_id: str = None, + chat_name: str = None, + chat_type: str = None, + thread_id: str = None, + gateway_session_key: str = None, + skip_context_files: bool = False, + load_soul_identity: bool = False, + skip_memory: bool = False, + session_db=None, + parent_session_id: str = None, + iteration_budget: "IterationBudget" = None, + fallback_model: Dict[str, Any] = None, + credential_pool=None, + checkpoints_enabled: bool = False, + checkpoint_max_snapshots: int = 20, + checkpoint_max_total_size_mb: int = 500, + checkpoint_max_file_size_mb: int = 10, + pass_session_id: bool = False, +): + """ + Initialize the AI Agent. + + Args: + base_url (str): Base URL for the model API (optional) + api_key (str): API key for authentication (optional, uses env var if not provided) + provider (str): Provider identifier (optional; used for telemetry/routing hints) + api_mode (str): API mode override: "chat_completions" or "codex_responses" + model (str): Model name to use (default: "anthropic/claude-opus-4.6") + max_iterations (int): Maximum number of tool calling iterations (default: 90) + tool_delay (float): Delay between tool calls in seconds (default: 1.0) + enabled_toolsets (List[str]): Only enable tools from these toolsets (optional) + disabled_toolsets (List[str]): Disable tools from these toolsets (optional) + save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) + verbose_logging (bool): Enable verbose logging for debugging (default: False) + quiet_mode (bool): Suppress progress output for clean CLI experience (default: False) + ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) + log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100) + log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "") + providers_allowed (List[str]): OpenRouter providers to allow (optional) + providers_ignored (List[str]): OpenRouter providers to ignore (optional) + providers_order (List[str]): OpenRouter providers to try in order (optional) + provider_sort (str): Sort providers by price/throughput/latency (optional) + openrouter_min_coding_score (float): Coding-score floor (0.0-1.0) for the + openrouter/pareto-code router. Only applied when model == "openrouter/pareto-code". + None or empty = let OpenRouter pick the strongest available coder. + session_id (str): Pre-generated session ID for logging (optional, auto-generated if not provided) + tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications + clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions. + Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error. + max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set) + reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking). + If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning. + prefill_messages (List[Dict]): Messages to prepend to conversation history as prefilled context. + Useful for injecting a few-shot example or priming the model's response style. + Example: [{"role": "user", "content": "Hi!"}, {"role": "assistant", "content": "Hello!"}] + NOTE: Anthropic Sonnet 4.6+ and Opus 4.6+ reject a conversation that ends on an + assistant-role message (400 error). For those models use structured outputs or + output_config.format instead of a trailing-assistant prefill. + platform (str): The interface platform the user is on (e.g. "cli", "telegram", "discord", "whatsapp"). + Used to inject platform-specific formatting hints into the system prompt. + skip_context_files (bool): If True, skip auto-injection of SOUL.md, AGENTS.md, and .cursorrules + into the system prompt. Use this for batch processing and data generation to avoid + polluting trajectories with user-specific persona or project instructions. + load_soul_identity (bool): If True, still use ~/.hermes/SOUL.md as the primary + identity even when skip_context_files=True. Project context files from the cwd + remain skipped. + """ + _install_safe_stdio() + + agent.model = model + agent.max_iterations = max_iterations + # Shared iteration budget — parent creates, children inherit. + # Consumed by every LLM turn across parent + all subagents. + agent.iteration_budget = iteration_budget or IterationBudget(max_iterations) + agent.tool_delay = tool_delay + agent.save_trajectories = save_trajectories + agent.verbose_logging = verbose_logging + agent.quiet_mode = quiet_mode + agent.ephemeral_system_prompt = ephemeral_system_prompt + agent.platform = platform # "cli", "telegram", "discord", "whatsapp", etc. + agent._user_id = user_id # Platform user identifier (gateway sessions) + agent._user_name = user_name + agent._chat_id = chat_id + agent._chat_name = chat_name + agent._chat_type = chat_type + agent._thread_id = thread_id + agent._gateway_session_key = gateway_session_key # Stable per-chat key (e.g. agent:main:telegram:dm:123) + # Pluggable print function — CLI replaces this with _cprint so that + # raw ANSI status lines are routed through prompt_toolkit's renderer + # instead of going directly to stdout where patch_stdout's StdoutProxy + # would mangle the escape sequences. None = use builtins.print. + agent._print_fn = None + agent.background_review_callback = None # Optional sync callback for gateway delivery + agent.skip_context_files = skip_context_files + agent.load_soul_identity = load_soul_identity + agent.pass_session_id = pass_session_id + agent._credential_pool = credential_pool + agent.log_prefix_chars = log_prefix_chars + agent.log_prefix = f"{log_prefix} " if log_prefix else "" + # Store effective base URL for feature detection (prompt caching, reasoning, etc.) + agent.base_url = base_url or "" + provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None + agent.provider = provider_name or "" + agent.acp_command = acp_command or command + agent.acp_args = list(acp_args or args or []) + if api_mode in {"chat_completions", "codex_responses", "anthropic_messages", "bedrock_converse", "codex_app_server"}: + agent.api_mode = api_mode + elif agent.provider == "openai-codex": + agent.api_mode = "codex_responses" + elif agent.provider == "xai": + agent.api_mode = "codex_responses" + elif (provider_name is None) and ( + agent._base_url_hostname == "chatgpt.com" + and "/backend-api/codex" in agent._base_url_lower + ): + agent.api_mode = "codex_responses" + agent.provider = "openai-codex" + elif (provider_name is None) and agent._base_url_hostname == "api.x.ai": + agent.api_mode = "codex_responses" + agent.provider = "xai" + elif agent.provider == "anthropic" or (provider_name is None and agent._base_url_hostname == "api.anthropic.com"): + agent.api_mode = "anthropic_messages" + agent.provider = "anthropic" + elif agent._base_url_lower.rstrip("/").endswith("/anthropic"): + # Third-party Anthropic-compatible endpoints (e.g. MiniMax, DashScope) + # use a URL convention ending in /anthropic. Auto-detect these so the + # Anthropic Messages API adapter is used instead of chat completions. + agent.api_mode = "anthropic_messages" + elif agent.provider == "bedrock" or ( + agent._base_url_hostname.startswith("bedrock-runtime.") + and base_url_host_matches(agent._base_url_lower, "amazonaws.com") + ): + # AWS Bedrock — auto-detect from provider name or base URL + # (bedrock-runtime..amazonaws.com). + agent.api_mode = "bedrock_converse" + else: + agent.api_mode = "chat_completions" + + # Eagerly warm the transport cache so import errors surface at init, + # not mid-conversation. Also validates the api_mode is registered. + try: + agent._get_transport() + except Exception: + pass # Non-fatal — transport may not exist for all modes yet + + try: + from hermes_cli.model_normalize import ( + _AGGREGATOR_PROVIDERS, + normalize_model_for_provider, + ) + + if agent.provider not in _AGGREGATOR_PROVIDERS: + agent.model = normalize_model_for_provider(agent.model, agent.provider) + except Exception: + pass + + # GPT-5.x models usually require the Responses API path, but some + # providers have exceptions (for example Copilot's gpt-5-mini still + # uses chat completions). Also auto-upgrade for direct OpenAI URLs + # (api.openai.com) since all newer tool-calling models prefer + # Responses there. ACP runtimes are excluded: CopilotACPClient + # handles its own routing and does not implement the Responses API + # surface. + # When api_mode was explicitly provided, respect it — the user + # knows what their endpoint supports (#10473). + # Exception: Azure OpenAI serves gpt-5.x on /chat/completions and + # does NOT support the Responses API — skip the upgrade for Azure + # (openai.azure.com), even though it looks OpenAI-compatible. + if ( + api_mode is None + and agent.api_mode == "chat_completions" + and agent.provider != "copilot-acp" + and not str(agent.base_url or "").lower().startswith("acp://copilot") + and not str(agent.base_url or "").lower().startswith("acp+tcp://") + and not agent._is_azure_openai_url() + and ( + agent._is_direct_openai_url() + or agent._provider_model_requires_responses_api( + agent.model, + provider=agent.provider, + ) + ) + ): + agent.api_mode = "codex_responses" + # Invalidate the eager-warmed transport cache — api_mode changed + # from chat_completions to codex_responses after the warm at __init__. + if hasattr(agent, "_transport_cache"): + agent._transport_cache.clear() + + # Pre-warm OpenRouter model metadata cache in a background thread. + # fetch_model_metadata() is cached for 1 hour; this avoids a blocking + # HTTP request on the first API response when pricing is estimated. + # Use a process-level Event so this thread is only spawned once — a new + # AIAgent is created for every gateway request, so without the guard + # each message leaks one OS thread and the process eventually exhausts + # the system thread limit (RuntimeError: can't start new thread). + if (agent.provider == "openrouter" or agent._is_openrouter_url()) and \ + not _ra()._openrouter_prewarm_done.is_set(): + _ra()._openrouter_prewarm_done.set() + threading.Thread( + target=fetch_model_metadata, + daemon=True, + name="openrouter-prewarm", + ).start() + + agent.tool_progress_callback = tool_progress_callback + agent.tool_start_callback = tool_start_callback + agent.tool_complete_callback = tool_complete_callback + agent.suppress_status_output = False + agent.thinking_callback = thinking_callback + agent.reasoning_callback = reasoning_callback + agent.clarify_callback = clarify_callback + agent.step_callback = step_callback + agent.stream_delta_callback = stream_delta_callback + agent.interim_assistant_callback = interim_assistant_callback + agent.status_callback = status_callback + agent.tool_gen_callback = tool_gen_callback + + + # Tool execution state — allows _vprint during tool execution + # even when stream consumers are registered (no tokens streaming then) + agent._executing_tools = False + agent._tool_guardrails = ToolCallGuardrailController() + agent._tool_guardrail_halt_decision: ToolGuardrailDecision | None = None + + # Interrupt mechanism for breaking out of tool loops + agent._interrupt_requested = False + agent._interrupt_message = None # Optional message that triggered interrupt + agent._execution_thread_id: int | None = None # Set at run_conversation() start + agent._interrupt_thread_signal_pending = False + agent._client_lock = threading.RLock() + + # /steer mechanism — inject a user note into the next tool result + # without interrupting the agent. Unlike interrupt(), steer() does + # NOT set _interrupt_requested; it waits for the current tool batch + # to finish naturally, then the drain hook appends the text to the + # last tool result's content so the model sees it on its next + # iteration. Message-role alternation is preserved (we modify an + # existing tool message rather than inserting a new user turn). + agent._pending_steer: Optional[str] = None + agent._pending_steer_lock = threading.Lock() + + # Concurrent-tool worker thread tracking. `_execute_tool_calls_concurrent` + # runs each tool on its own ThreadPoolExecutor worker — those worker + # threads have tids distinct from `_execution_thread_id`, so + # `_set_interrupt(True, _execution_thread_id)` alone does NOT cause + # `is_interrupted()` inside the worker to return True. Track the + # workers here so `interrupt()` / `clear_interrupt()` can fan out to + # their tids explicitly. + agent._tool_worker_threads: set[int] = set() + agent._tool_worker_threads_lock = threading.Lock() + + # Subagent delegation state + agent._delegate_depth = 0 # 0 = top-level agent, incremented for children + agent._active_children = [] # Running child AIAgents (for interrupt propagation) + agent._active_children_lock = threading.Lock() + + # Store OpenRouter provider preferences + agent.providers_allowed = providers_allowed + agent.providers_ignored = providers_ignored + agent.providers_order = providers_order + agent.provider_sort = provider_sort + agent.provider_require_parameters = provider_require_parameters + agent.provider_data_collection = provider_data_collection + agent.openrouter_min_coding_score = openrouter_min_coding_score + + # Store toolset filtering options + agent.enabled_toolsets = enabled_toolsets + agent.disabled_toolsets = disabled_toolsets + + # Model response configuration + agent.max_tokens = max_tokens # None = use model default + agent.reasoning_config = reasoning_config # None = use default (medium for OpenRouter) + agent.service_tier = service_tier + agent.request_overrides = dict(request_overrides or {}) + agent.prefill_messages = prefill_messages or [] # Prefilled conversation turns + agent._force_ascii_payload = False + + # Anthropic prompt caching: auto-enabled for Claude models on native + # Anthropic, OpenRouter, and third-party gateways that speak the + # Anthropic protocol (``api_mode == 'anthropic_messages'``). Reduces + # input costs by ~75% on multi-turn conversations. Uses system_and_3 + # strategy (4 breakpoints). See ``_anthropic_prompt_cache_policy`` + # for the layout-vs-transport decision. + agent._use_prompt_caching, agent._use_native_cache_layout = ( + agent._anthropic_prompt_cache_policy() + ) + # Anthropic supports "5m" (default) and "1h" cache TTL tiers. Read from + # config.yaml under prompt_caching.cache_ttl; unknown values keep "5m". + # 1h tier costs 2x on write vs 1.25x for 5m, but amortizes across long + # sessions with >5-minute pauses between turns (#14971). + agent._cache_ttl = "5m" + try: + from hermes_cli.config import load_config as _load_pc_cfg + + _pc_cfg = _load_pc_cfg().get("prompt_caching", {}) or {} + _ttl = _pc_cfg.get("cache_ttl", "5m") + if _ttl in {"5m", "1h"}: + agent._cache_ttl = _ttl + except Exception: + pass + + # Iteration budget: the LLM is only notified when it actually exhausts + # the iteration budget (api_call_count >= max_iterations). At that + # point we inject ONE message, allow one final API call, and if the + # model doesn't produce a text response, force a user-message asking + # it to summarise. No intermediate pressure warnings — they caused + # models to "give up" prematurely on complex tasks (#7915). + agent._budget_exhausted_injected = False + agent._budget_grace_call = False + + # Activity tracking — updated on each API call, tool execution, and + # stream chunk. Used by the gateway timeout handler to report what the + # agent was doing when it was killed, and by the "still working" + # notifications to show progress. + agent._last_activity_ts: float = time.time() + agent._last_activity_desc: str = "initializing" + agent._current_tool: str | None = None + agent._api_call_count: int = 0 + + # Rate limit tracking — updated from x-ratelimit-* response headers + # after each API call. Accessed by /usage slash command. + agent._rate_limit_state: Optional["RateLimitState"] = None + + # OpenRouter response cache hit counter — incremented when + # X-OpenRouter-Cache-Status: HIT is seen in streaming response headers. + agent._or_cache_hits: int = 0 + + # Centralized logging — agent.log (INFO+) and errors.log (WARNING+) + # both live under ~/.hermes/logs/. Idempotent, so gateway mode + # (which creates a new AIAgent per message) won't duplicate handlers. + from hermes_logging import setup_logging, setup_verbose_logging + setup_logging(hermes_home=_ra()._hermes_home) + + if agent.verbose_logging: + setup_verbose_logging() + _ra().logger.info("Verbose logging enabled (third-party library logs suppressed)") + elif agent.quiet_mode: + # In quiet mode (CLI default), keep console output clean — + # but DO NOT raise per-logger levels. Doing so prevents the + # root logger's file handlers (agent.log, errors.log) from + # ever seeing the records, because Python checks + # logger.isEnabledFor() before handler propagation. We rely + # on the fact that hermes_logging.setup_logging() does not + # install a console StreamHandler in quiet mode — so INFO + # records flow to the file handlers but never reach a + # console. Any future noise reduction belongs at the + # handler level inside hermes_logging.py, not here. + pass + + # Internal stream callback (set during streaming TTS). + # Initialized here so _vprint can reference it before run_conversation. + agent._stream_callback = None + # Deferred paragraph break flag — set after tool iterations so a + # single "\n\n" is prepended to the next real text delta. + agent._stream_needs_break = False + # Stateful scrubber for spans split across stream + # deltas (#5719). sanitize_context() alone can't survive chunk + # boundaries because the block regex needs both tags in one string. + agent._stream_context_scrubber = StreamingContextScrubber() + # Stateful scrubber for reasoning/thinking tags in streamed deltas + # (#17924). Replaces the per-delta _strip_think_blocks regex that + # destroyed downstream state (e.g. MiniMax-M2.7 streaming + # '' as delta1 and 'Let me check' as delta2 — the regex + # erased delta1, so downstream state machines never learned a + # block was open and leaked delta2 as content). + agent._stream_think_scrubber = StreamingThinkScrubber() + # Visible assistant text already delivered through live token callbacks + # during the current model response. Used to avoid re-sending the same + # commentary when the provider later returns it as a completed interim + # assistant message. + agent._current_streamed_assistant_text = "" + + # Optional current-turn user-message override used when the API-facing + # user message intentionally differs from the persisted transcript + # (e.g. CLI voice mode adds a temporary prefix for the live call only). + agent._persist_user_message_idx = None + agent._persist_user_message_override = None + + # Cache anthropic image-to-text fallbacks per image payload/URL so a + # single tool loop does not repeatedly re-run auxiliary vision on the + # same image history. + agent._anthropic_image_fallback_cache: Dict[str, str] = {} + + # Initialize LLM client via centralized provider router. + # The router handles auth resolution, base URL, headers, and + # Codex/Anthropic wrapping for all known providers. + # raw_codex=True because the main agent needs direct responses.stream() + # access for Codex Responses API streaming. + agent._anthropic_client = None + agent._is_anthropic_oauth = False + + # Resolve per-provider / per-model request timeout once up front so + # every client construction path below (Anthropic native, OpenAI-wire, + # router-based implicit auth) can apply it consistently. Bedrock + # Claude uses its own timeout path and is not covered here. + _provider_timeout = get_provider_request_timeout(agent.provider, agent.model) + + if agent.api_mode == "anthropic_messages": + from agent.anthropic_adapter import build_anthropic_client, resolve_anthropic_token + # Bedrock + Claude → use AnthropicBedrock SDK for full feature parity + # (prompt caching, thinking budgets, adaptive thinking). + _is_bedrock_anthropic = agent.provider == "bedrock" + if _is_bedrock_anthropic: + from agent.anthropic_adapter import build_anthropic_bedrock_client + _region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "") + _br_region = _region_match.group(1) if _region_match else "us-east-1" + agent._bedrock_region = _br_region + agent._anthropic_client = build_anthropic_bedrock_client(_br_region) + agent._anthropic_api_key = "aws-sdk" + agent._anthropic_base_url = base_url + agent._is_anthropic_oauth = False + agent.api_key = "aws-sdk" + agent.client = None + agent._client_kwargs = {} + if not agent.quiet_mode: + print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock + AnthropicBedrock SDK, {_br_region})") + else: + # Only fall back to ANTHROPIC_TOKEN when the provider is actually Anthropic. + # Other anthropic_messages providers (MiniMax, Alibaba, etc.) must use their own API key. + # Falling back would send Anthropic credentials to third-party endpoints (Fixes #1739, #minimax-401). + _is_native_anthropic = agent.provider == "anthropic" + effective_key = (api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or "") + agent.api_key = effective_key + agent._anthropic_api_key = effective_key + agent._anthropic_base_url = base_url + # Only mark the session as OAuth-authenticated when the token + # genuinely belongs to native Anthropic. Third-party providers + # (MiniMax, Kimi, GLM, LiteLLM proxies) that accept the + # Anthropic protocol must never trip OAuth code paths — doing + # so injects Claude-Code identity headers and system prompts + # that cause 401/403 on their endpoints. Guards #1739 and + # the third-party identity-injection bug. + from agent.anthropic_adapter import _is_oauth_token as _is_oat + agent._is_anthropic_oauth = _is_oat(effective_key) if _is_native_anthropic else False + agent._anthropic_client = build_anthropic_client(effective_key, base_url, timeout=_provider_timeout) + # No OpenAI client needed for Anthropic mode + agent.client = None + agent._client_kwargs = {} + if not agent.quiet_mode: + print(f"🤖 AI Agent initialized with model: {agent.model} (Anthropic native)") + if effective_key and len(effective_key) > 12: + print(f"🔑 Using token: {effective_key[:8]}...{effective_key[-4:]}") + elif agent.api_mode == "bedrock_converse": + # AWS Bedrock — uses boto3 directly, no OpenAI client needed. + # Region is extracted from the base_url or defaults to us-east-1. + _region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "") + agent._bedrock_region = _region_match.group(1) if _region_match else "us-east-1" + # Guardrail config — read from config.yaml at init time. + agent._bedrock_guardrail_config = None + try: + from hermes_cli.config import load_config as _load_br_cfg + _gr = _load_br_cfg().get("bedrock", {}).get("guardrail", {}) + if _gr.get("guardrail_identifier") and _gr.get("guardrail_version"): + agent._bedrock_guardrail_config = { + "guardrailIdentifier": _gr["guardrail_identifier"], + "guardrailVersion": _gr["guardrail_version"], + } + if _gr.get("stream_processing_mode"): + agent._bedrock_guardrail_config["streamProcessingMode"] = _gr["stream_processing_mode"] + if _gr.get("trace"): + agent._bedrock_guardrail_config["trace"] = _gr["trace"] + except Exception: + pass + agent.client = None + agent._client_kwargs = {} + if not agent.quiet_mode: + _gr_label = " + Guardrails" if agent._bedrock_guardrail_config else "" + print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock, {agent._bedrock_region}{_gr_label})") + else: + if api_key and base_url: + # Explicit credentials from CLI/gateway — construct directly. + # The runtime provider resolver already handled auth for us. + # Extract query params (e.g. Azure api-version) from base_url + # and pass via default_query to prevent loss during SDK URL + # joining (httpx drops query string when joining paths). + _parsed_url = urlparse(base_url) + if _parsed_url.query: + _clean_url = urlunparse(_parsed_url._replace(query="")) + _query_params = { + k: v[0] for k, v in parse_qs(_parsed_url.query).items() + } + client_kwargs = { + "api_key": api_key, + "base_url": _clean_url, + "default_query": _query_params, + } + else: + client_kwargs = {"api_key": api_key, "base_url": base_url} + if _provider_timeout is not None: + client_kwargs["timeout"] = _provider_timeout + if agent.provider == "copilot-acp": + client_kwargs["command"] = agent.acp_command + client_kwargs["args"] = agent.acp_args + effective_base = base_url + if base_url_host_matches(effective_base, "openrouter.ai"): + from agent.auxiliary_client import build_or_headers + client_kwargs["default_headers"] = build_or_headers() + elif base_url_host_matches(effective_base, "api.routermint.com"): + client_kwargs["default_headers"] = _ra()._routermint_headers() + elif base_url_host_matches(effective_base, "api.githubcopilot.com"): + from hermes_cli.models import copilot_default_headers + + client_kwargs["default_headers"] = copilot_default_headers() + elif base_url_host_matches(effective_base, "api.kimi.com"): + client_kwargs["default_headers"] = { + "User-Agent": "claude-code/0.1.0", + } + elif base_url_host_matches(effective_base, "portal.qwen.ai"): + client_kwargs["default_headers"] = _ra()._qwen_portal_headers() + elif base_url_host_matches(effective_base, "chatgpt.com"): + from agent.auxiliary_client import _codex_cloudflare_headers + client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key) + elif "default_headers" not in client_kwargs: + # Fall back to profile.default_headers for providers that + # declare custom headers (e.g. Vercel AI Gateway attribution, + # Kimi User-Agent on non-kimi.com endpoints). + try: + from providers import get_provider_profile as _gpf + _ph = _gpf(agent.provider) + if _ph and _ph.default_headers: + client_kwargs["default_headers"] = dict(_ph.default_headers) + except Exception: + pass + else: + # No explicit creds — use the centralized provider router + from agent.auxiliary_client import resolve_provider_client + _routed_client, _ = resolve_provider_client( + agent.provider or "auto", model=agent.model, raw_codex=True) + if _routed_client is not None: + client_kwargs = { + "api_key": _routed_client.api_key, + "base_url": str(_routed_client.base_url), + } + if _provider_timeout is not None: + client_kwargs["timeout"] = _provider_timeout + # Preserve any default_headers the router set + if hasattr(_routed_client, '_default_headers') and _routed_client._default_headers: + client_kwargs["default_headers"] = dict(_routed_client._default_headers) + else: + # When the user explicitly chose a non-OpenRouter provider + # but no credentials were found, fail fast with a clear + # message instead of silently routing through OpenRouter. + _explicit = (agent.provider or "").strip().lower() + if _explicit and _explicit not in {"auto", "openrouter", "custom"}: + # Look up the actual env var name from the provider + # config — some providers use non-standard names + # (e.g. alibaba → DASHSCOPE_API_KEY, not ALIBABA_API_KEY). + _env_hint = f"{_explicit.upper()}_API_KEY" + try: + from hermes_cli.auth import PROVIDER_REGISTRY + _pcfg = PROVIDER_REGISTRY.get(_explicit) + if _pcfg and _pcfg.api_key_env_vars: + _env_hint = _pcfg.api_key_env_vars[0] + except Exception: + pass + # --- Init-time fallback (#17929) --- + _fb_entries = [] + if isinstance(fallback_model, list): + _fb_entries = [ + f for f in fallback_model + if isinstance(f, dict) and f.get("provider") and f.get("model") + ] + elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"): + _fb_entries = [fallback_model] + _fb_resolved = False + for _fb in _fb_entries: + _fb_explicit_key = (_fb.get("api_key") or "").strip() or None + if not _fb_explicit_key: + _fb_key_env = (_fb.get("key_env") or _fb.get("api_key_env") or "").strip() + if _fb_key_env: + _fb_explicit_key = os.getenv(_fb_key_env, "").strip() or None + _fb_client, _fb_model = resolve_provider_client( + _fb["provider"], model=_fb["model"], raw_codex=True, + explicit_base_url=_fb.get("base_url"), + explicit_api_key=_fb_explicit_key, + ) + if _fb_client is not None: + agent.provider = _fb["provider"] + agent.model = _fb_model or _fb["model"] + agent._fallback_activated = True + client_kwargs = { + "api_key": _fb_client.api_key, + "base_url": str(_fb_client.base_url), + } + if _provider_timeout is not None: + client_kwargs["timeout"] = _provider_timeout + if hasattr(_fb_client, "_default_headers") and _fb_client._default_headers: + client_kwargs["default_headers"] = dict(_fb_client._default_headers) + _fb_resolved = True + break + if not _fb_resolved: + raise RuntimeError( + f"Provider '{_explicit}' is set in config.yaml but no API key " + f"was found. Set the {_env_hint} environment " + f"variable, or switch to a different provider with `hermes model`." + ) + if not getattr(agent, "_fallback_activated", False): + # No provider configured — reject with a clear message. + raise RuntimeError( + "No LLM provider configured. Run `hermes model` to " + "select a provider, or run `hermes setup` for first-time " + "configuration." + ) + + agent._client_kwargs = client_kwargs # stored for rebuilding after interrupt + + # Enable fine-grained tool streaming for Claude on OpenRouter. + # Without this, Anthropic buffers the entire tool call and goes + # silent for minutes while thinking — OpenRouter's upstream proxy + # times out during the silence. The beta header makes Anthropic + # stream tool call arguments token-by-token, keeping the + # connection alive. + _effective_base = str(client_kwargs.get("base_url", "")).lower() + if base_url_host_matches(_effective_base, "openrouter.ai") and "claude" in (agent.model or "").lower(): + headers = client_kwargs.get("default_headers") or {} + existing_beta = headers.get("x-anthropic-beta", "") + _FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14" + if _FINE_GRAINED not in existing_beta: + if existing_beta: + headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}" + else: + headers["x-anthropic-beta"] = _FINE_GRAINED + client_kwargs["default_headers"] = headers + + agent.api_key = client_kwargs.get("api_key", "") + agent.base_url = client_kwargs.get("base_url", agent.base_url) + try: + agent.client = agent._create_openai_client(client_kwargs, reason="agent_init", shared=True) + if not agent.quiet_mode: + print(f"🤖 AI Agent initialized with model: {agent.model}") + if base_url: + print(f"🔗 Using custom base URL: {base_url}") + # Always show API key info (masked) for debugging auth issues + key_used = client_kwargs.get("api_key", "none") + if key_used and key_used != "dummy-key" and len(key_used) > 12: + print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}") + else: + print(f"⚠️ Warning: API key appears invalid or missing (got: '{key_used[:20] if key_used else 'none'}...')") + except Exception as e: + raise RuntimeError(f"Failed to initialize OpenAI client: {e}") + + # Provider fallback chain — ordered list of backup providers tried + # when the primary is exhausted (rate-limit, overload, connection + # failure). Supports both legacy single-dict ``fallback_model`` and + # new list ``fallback_providers`` format. + if isinstance(fallback_model, list): + agent._fallback_chain = [ + f for f in fallback_model + if isinstance(f, dict) and f.get("provider") and f.get("model") + ] + elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"): + agent._fallback_chain = [fallback_model] + else: + agent._fallback_chain = [] + agent._fallback_index = 0 + agent._fallback_activated = getattr(agent, "_fallback_activated", False) + # Legacy attribute kept for backward compat (tests, external callers) + agent._fallback_model = agent._fallback_chain[0] if agent._fallback_chain else None + if agent._fallback_chain and not agent.quiet_mode: + if len(agent._fallback_chain) == 1: + fb = agent._fallback_chain[0] + print(f"🔄 Fallback model: {fb['model']} ({fb['provider']})") + else: + print(f"🔄 Fallback chain ({len(agent._fallback_chain)} providers): " + + " → ".join(f"{f['model']} ({f['provider']})" for f in agent._fallback_chain)) + + # Get available tools with filtering + agent.tools = _ra().get_tool_definitions( + enabled_toolsets=enabled_toolsets, + disabled_toolsets=disabled_toolsets, + quiet_mode=agent.quiet_mode, + ) + + # Show tool configuration and store valid tool names for validation + agent.valid_tool_names = set() + if agent.tools: + agent.valid_tool_names = {tool["function"]["name"] for tool in agent.tools} + tool_names = sorted(agent.valid_tool_names) + if not agent.quiet_mode: + print(f"🛠️ Loaded {len(agent.tools)} tools: {', '.join(tool_names)}") + + # Show filtering info if applied + if enabled_toolsets: + print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}") + if disabled_toolsets: + print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}") + elif not agent.quiet_mode: + print("🛠️ No tools loaded (all tools filtered out or unavailable)") + + # Check tool requirements + if agent.tools and not agent.quiet_mode: + requirements = _ra().check_toolset_requirements() + missing_reqs = [name for name, available in requirements.items() if not available] + if missing_reqs: + print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}") + + # Show trajectory saving status + if agent.save_trajectories and not agent.quiet_mode: + print("📝 Trajectory saving enabled") + + # Show ephemeral system prompt status + if agent.ephemeral_system_prompt and not agent.quiet_mode: + prompt_preview = agent.ephemeral_system_prompt[:60] + "..." if len(agent.ephemeral_system_prompt) > 60 else agent.ephemeral_system_prompt + print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)") + + # Show prompt caching status + if agent._use_prompt_caching and not agent.quiet_mode: + if agent._use_native_cache_layout and agent.provider == "anthropic": + source = "native Anthropic" + elif agent._use_native_cache_layout: + source = "Anthropic-compatible endpoint" + else: + source = "Claude via OpenRouter" + print(f"💾 Prompt caching: ENABLED ({source}, {agent._cache_ttl} TTL)") + + # Session logging setup - auto-save conversation trajectories for debugging + agent.session_start = datetime.now() + if session_id: + # Use provided session ID (e.g., from CLI) + agent.session_id = session_id + else: + # Generate a new session ID + timestamp_str = agent.session_start.strftime("%Y%m%d_%H%M%S") + short_uuid = uuid.uuid4().hex[:6] + agent.session_id = f"{timestamp_str}_{short_uuid}" + + # Expose session ID to tools (terminal, execute_code) so agents can + # reference their own session for --resume commands, cross-session + # coordination, and logging. Uses the ContextVar system from + # session_context.py for concurrency safety (gateway runs multiple + # sessions in one process). Also writes os.environ as fallback for + # CLI mode where ContextVars aren't used. + os.environ["HERMES_SESSION_ID"] = agent.session_id + try: + from gateway.session_context import _SESSION_ID + _SESSION_ID.set(agent.session_id) + except Exception: + pass # CLI/test mode — ContextVar not needed + + # Session logs go into ~/.hermes/sessions/ alongside gateway sessions + hermes_home = get_hermes_home() + agent.logs_dir = hermes_home / "sessions" + agent.logs_dir.mkdir(parents=True, exist_ok=True) + agent.session_log_file = agent.logs_dir / f"session_{agent.session_id}.json" + + # Track conversation messages for session logging + agent._session_messages: List[Dict[str, Any]] = [] + agent._memory_write_origin = "assistant_tool" + agent._memory_write_context = "foreground" + + # Cached system prompt -- built once per session, only rebuilt on compression + agent._cached_system_prompt: Optional[str] = None + + # Filesystem checkpoint manager (transparent — not a tool) + from tools.checkpoint_manager import CheckpointManager + agent._checkpoint_mgr = CheckpointManager( + enabled=checkpoints_enabled, + max_snapshots=checkpoint_max_snapshots, + max_total_size_mb=checkpoint_max_total_size_mb, + max_file_size_mb=checkpoint_max_file_size_mb, + ) + + # SQLite session store (optional -- provided by CLI or gateway) + agent._session_db = session_db + agent._parent_session_id = parent_session_id + agent._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes + agent._session_db_created = False # DB row deferred to run_conversation() + agent._session_init_model_config = { + "max_iterations": agent.max_iterations, + "reasoning_config": reasoning_config, + "max_tokens": max_tokens, + } + + # In-memory todo list for task planning (one per agent/session) + from tools.todo_tool import TodoStore + agent._todo_store = TodoStore() + + # Load config once for memory, skills, and compression sections + try: + from hermes_cli.config import load_config as _load_agent_config + _agent_cfg = _load_agent_config() + except Exception: + _agent_cfg = {} + try: + agent._tool_guardrails = ToolCallGuardrailController( + ToolCallGuardrailConfig.from_mapping( + _agent_cfg.get("tool_loop_guardrails", {}) + ) + ) + except Exception as _tlg_err: + _ra().logger.warning("Tool loop guardrail config ignored: %s", _tlg_err) + # Cache only the derived auxiliary compression context override that is + # needed later by the startup feasibility check. Avoid exposing a + # broad pseudo-public config object on the agent instance. + agent._aux_compression_context_length_config = None + + # Persistent memory (MEMORY.md + USER.md) -- loaded from disk + agent._memory_store = None + agent._memory_enabled = False + agent._user_profile_enabled = False + agent._memory_nudge_interval = 10 + agent._turns_since_memory = 0 + agent._iters_since_skill = 0 + if not skip_memory: + try: + mem_config = _agent_cfg.get("memory", {}) + agent._memory_enabled = mem_config.get("memory_enabled", False) + agent._user_profile_enabled = mem_config.get("user_profile_enabled", False) + agent._memory_nudge_interval = int(mem_config.get("nudge_interval", 10)) + if agent._memory_enabled or agent._user_profile_enabled: + from tools.memory_tool import MemoryStore + agent._memory_store = MemoryStore( + memory_char_limit=mem_config.get("memory_char_limit", 2200), + user_char_limit=mem_config.get("user_char_limit", 1375), + ) + agent._memory_store.load_from_disk() + except Exception: + pass # Memory is optional -- don't break agent init + + + + # Memory provider plugin (external — one at a time, alongside built-in) + # Reads memory.provider from config to select which plugin to activate. + agent._memory_manager = None + if not skip_memory: + try: + _mem_provider_name = mem_config.get("provider", "") if mem_config else "" + + if _mem_provider_name: + from agent.memory_manager import MemoryManager as _MemoryManager + from plugins.memory import load_memory_provider as _load_mem + agent._memory_manager = _MemoryManager() + _mp = _load_mem(_mem_provider_name) + if _mp and _mp.is_available(): + agent._memory_manager.add_provider(_mp) + if agent._memory_manager.providers: + _init_kwargs = { + "session_id": agent.session_id, + "platform": platform or "cli", + "hermes_home": str(get_hermes_home()), + "agent_context": "primary", + } + # Thread session title for memory provider scoping + # (e.g. honcho uses this to derive chat-scoped session keys) + if agent._session_db: + try: + _st = agent._session_db.get_session_title(agent.session_id) + if _st: + _init_kwargs["session_title"] = _st + except Exception: + pass + # Thread gateway user identity for per-user memory scoping + if agent._user_id: + _init_kwargs["user_id"] = agent._user_id + if agent._user_name: + _init_kwargs["user_name"] = agent._user_name + if agent._chat_id: + _init_kwargs["chat_id"] = agent._chat_id + if agent._chat_name: + _init_kwargs["chat_name"] = agent._chat_name + if agent._chat_type: + _init_kwargs["chat_type"] = agent._chat_type + if agent._thread_id: + _init_kwargs["thread_id"] = agent._thread_id + # Thread gateway session key for stable per-chat Honcho session isolation + if agent._gateway_session_key: + _init_kwargs["gateway_session_key"] = agent._gateway_session_key + # Profile identity for per-profile provider scoping + try: + from hermes_cli.profiles import get_active_profile_name + _profile = get_active_profile_name() + _init_kwargs["agent_identity"] = _profile + _init_kwargs["agent_workspace"] = "hermes" + except Exception: + pass + agent._memory_manager.initialize_all(**_init_kwargs) + _ra().logger.info("Memory provider '%s' activated", _mem_provider_name) + else: + _ra().logger.debug("Memory provider '%s' not found or not available", _mem_provider_name) + agent._memory_manager = None + except Exception as _mpe: + _ra().logger.warning("Memory provider plugin init failed: %s", _mpe) + agent._memory_manager = None + + # Inject memory provider tool schemas into the tool surface. + # Skip tools whose names already exist (plugins may register the + # same tools via ctx.register_tool(), which lands in agent.tools + # through _ra().get_tool_definitions()). Duplicate function names cause + # 400 errors on providers that enforce unique names (e.g. Xiaomi + # MiMo via Nous Portal). + if agent._memory_manager and agent.tools is not None: + _existing_tool_names = { + t.get("function", {}).get("name") + for t in agent.tools + if isinstance(t, dict) + } + for _schema in agent._memory_manager.get_all_tool_schemas(): + _tname = _schema.get("name", "") + if _tname and _tname in _existing_tool_names: + continue # already registered via plugin path + _wrapped = {"type": "function", "function": _schema} + agent.tools.append(_wrapped) + if _tname: + agent.valid_tool_names.add(_tname) + _existing_tool_names.add(_tname) + + # Skills config: nudge interval for skill creation reminders + agent._skill_nudge_interval = 10 + try: + skills_config = _agent_cfg.get("skills", {}) + agent._skill_nudge_interval = int(skills_config.get("creation_nudge_interval", 10)) + except Exception: + pass + + # Tool-use enforcement config: "auto" (default — matches hardcoded + # model list), true (always), false (never), or list of substrings. + _agent_section = _agent_cfg.get("agent", {}) + if not isinstance(_agent_section, dict): + _agent_section = {} + agent._tool_use_enforcement = _agent_section.get("tool_use_enforcement", "auto") + + # App-level API retry count (wraps each model API call). Default 3, + # overridable via agent.api_max_retries in config.yaml. See #11616. + try: + _raw_api_retries = _agent_section.get("api_max_retries", 3) + _api_retries = int(_raw_api_retries) + _api_retries = max(_api_retries, 1) # 1 = no retry (single attempt) + except (TypeError, ValueError): + _api_retries = 3 + agent._api_max_retries = _api_retries + + # Initialize context compressor for automatic context management + # Compresses conversation when approaching model's context limit + # Configuration via config.yaml (compression section) + _compression_cfg = _agent_cfg.get("compression", {}) + if not isinstance(_compression_cfg, dict): + _compression_cfg = {} + compression_threshold = float(_compression_cfg.get("threshold", 0.50)) + try: + from agent.auxiliary_client import _compression_threshold_for_model as _cthresh_fn + _model_cthresh = _cthresh_fn(agent.model) + if _model_cthresh is not None: + compression_threshold = _model_cthresh + except Exception: + pass + compression_enabled = str(_compression_cfg.get("enabled", True)).lower() in {"true", "1", "yes"} + compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20)) + compression_protect_last = int(_compression_cfg.get("protect_last_n", 20)) + # protect_first_n is the number of non-system messages to protect at + # the head, in addition to the system prompt (which is always + # implicitly protected by the compressor). Floor at 0 — a value of + # 0 means "preserve only the system prompt + summary + tail", which + # is a legitimate (and common) configuration for long-running + # rolling-compaction sessions. + compression_protect_first = max( + 0, int(_compression_cfg.get("protect_first_n", 3)) + ) + + # Read optional explicit context_length override for the auxiliary + # compression model. Custom endpoints often cannot report this via + # /models, so the startup feasibility check needs the config hint. + try: + _aux_cfg = cfg_get(_agent_cfg, "auxiliary", "compression", default={}) + except Exception: + _aux_cfg = {} + if isinstance(_aux_cfg, dict): + _aux_context_config = _aux_cfg.get("context_length") + else: + _aux_context_config = None + if _aux_context_config is not None: + try: + _aux_context_config = int(_aux_context_config) + except (TypeError, ValueError): + _aux_context_config = None + agent._aux_compression_context_length_config = _aux_context_config + + # Read explicit model output-token override from config when the + # caller did not pass one directly. + _model_cfg = _agent_cfg.get("model", {}) + if agent.max_tokens is None and isinstance(_model_cfg, dict): + _config_max_tokens = _model_cfg.get("max_tokens") + if _config_max_tokens is not None: + try: + if isinstance(_config_max_tokens, bool): + raise ValueError + _parsed_max_tokens = int(_config_max_tokens) + if _parsed_max_tokens <= 0: + raise ValueError + agent.max_tokens = _parsed_max_tokens + except (TypeError, ValueError): + _ra().logger.warning( + "Invalid model.max_tokens in config.yaml: %r — " + "must be a positive integer (e.g. 4096). " + "Falling back to provider default.", + _config_max_tokens, + ) + print( + f"\n⚠ Invalid model.max_tokens in config.yaml: {_config_max_tokens!r}\n" + f" Must be a positive integer (e.g. 4096).\n" + f" Falling back to provider default.\n", + file=sys.stderr, + ) + agent._session_init_model_config["max_tokens"] = agent.max_tokens + + # Read explicit context_length override from model config + if isinstance(_model_cfg, dict): + _config_context_length = _model_cfg.get("context_length") + else: + _config_context_length = None + if _config_context_length is not None: + try: + _config_context_length = int(_config_context_length) + except (TypeError, ValueError): + _ra().logger.warning( + "Invalid model.context_length in config.yaml: %r — " + "must be a plain integer (e.g. 256000, not '256K'). " + "Falling back to auto-detection.", + _config_context_length, + ) + print( + f"\n⚠ Invalid model.context_length in config.yaml: {_config_context_length!r}\n" + f" Must be a plain integer (e.g. 256000, not '256K').\n" + f" Falling back to auto-detected context window.\n", + file=sys.stderr, + ) + _config_context_length = None + + # Resolve custom_providers list once for reuse below (startup + # context-length override and plugin context-engine init). + try: + from hermes_cli.config import get_compatible_custom_providers + _custom_providers = get_compatible_custom_providers(_agent_cfg) + except Exception: + _custom_providers = _agent_cfg.get("custom_providers") + if not isinstance(_custom_providers, list): + _custom_providers = [] + + # Store for reuse by _check_compression_model_feasibility (auxiliary + # compression model context-length detection needs the same list). + agent._custom_providers = _custom_providers + + # Check custom_providers per-model context_length + if _config_context_length is None and _custom_providers: + try: + from hermes_cli.config import get_custom_provider_context_length + _cp_ctx_resolved = get_custom_provider_context_length( + model=agent.model, + base_url=agent.base_url, + custom_providers=_custom_providers, + ) + if _cp_ctx_resolved: + _config_context_length = int(_cp_ctx_resolved) + except Exception: + _cp_ctx_resolved = None + + # Surface a clear warning if the user set a context_length but it + # wasn't a valid positive int — the helper silently skips those. + if _config_context_length is None: + _target = agent.base_url.rstrip("/") if agent.base_url else "" + for _cp_entry in _custom_providers: + if not isinstance(_cp_entry, dict): + continue + _cp_url = (_cp_entry.get("base_url") or "").rstrip("/") + if _target and _cp_url == _target: + _cp_models = _cp_entry.get("models", {}) + if isinstance(_cp_models, dict): + _cp_model_cfg = _cp_models.get(agent.model, {}) + if isinstance(_cp_model_cfg, dict): + _cp_ctx = _cp_model_cfg.get("context_length") + if _cp_ctx is not None: + try: + _parsed = int(_cp_ctx) + if _parsed <= 0: + raise ValueError + except (TypeError, ValueError): + _ra().logger.warning( + "Invalid context_length for model %r in " + "custom_providers: %r — must be a positive " + "integer (e.g. 256000, not '256K'). " + "Falling back to auto-detection.", + agent.model, _cp_ctx, + ) + print( + f"\n⚠ Invalid context_length for model {agent.model!r} in custom_providers: {_cp_ctx!r}\n" + f" Must be a positive integer (e.g. 256000, not '256K').\n" + f" Falling back to auto-detected context window.\n", + file=sys.stderr, + ) + break + + # Persist for reuse on switch_model / fallback activation. Must come + # AFTER the custom_providers branch so per-model overrides aren't lost. + agent._config_context_length = _config_context_length + + agent._ensure_lmstudio_runtime_loaded(_config_context_length) + + + + # Select context engine: config-driven (like memory providers). + # 1. Check config.yaml context.engine setting + # 2. Check plugins/context_engine// directory (repo-shipped) + # 3. Check general plugin system (user-installed plugins) + # 4. Fall back to built-in ContextCompressor + _selected_engine = None + _engine_name = "compressor" # default + try: + _ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {} + _engine_name = _ctx_cfg.get("engine", "compressor") or "compressor" + except Exception: + pass + + if _engine_name != "compressor": + # Try loading from plugins/context_engine// + try: + from plugins.context_engine import load_context_engine + _selected_engine = load_context_engine(_engine_name) + except Exception as _ce_load_err: + _ra().logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err) + + # Try general plugin system as fallback + if _selected_engine is None: + try: + from hermes_cli.plugins import get_plugin_context_engine + _candidate = get_plugin_context_engine() + if _candidate and _candidate.name == _engine_name: + _selected_engine = _candidate + except Exception: + pass + + if _selected_engine is None: + _ra().logger.warning( + "Context engine '%s' not found — falling back to built-in compressor", + _engine_name, + ) + # else: config says "compressor" — use built-in, don't auto-activate plugins + + if _selected_engine is not None: + agent.context_compressor = _selected_engine + # Resolve context_length for plugin engines — mirrors switch_model() path + from agent.model_metadata import get_model_context_length + _plugin_ctx_len = get_model_context_length( + agent.model, + base_url=agent.base_url, + api_key=getattr(agent, "api_key", ""), + config_context_length=_config_context_length, + provider=agent.provider, + custom_providers=_custom_providers, + ) + agent.context_compressor.update_model( + model=agent.model, + context_length=_plugin_ctx_len, + base_url=agent.base_url, + api_key=getattr(agent, "api_key", ""), + provider=agent.provider, + ) + if not agent.quiet_mode: + _ra().logger.info("Using context engine: %s", _selected_engine.name) + else: + agent.context_compressor = ContextCompressor( + model=agent.model, + threshold_percent=compression_threshold, + protect_first_n=compression_protect_first, + protect_last_n=compression_protect_last, + summary_target_ratio=compression_target_ratio, + summary_model_override=None, + quiet_mode=agent.quiet_mode, + base_url=agent.base_url, + api_key=getattr(agent, "api_key", ""), + config_context_length=_config_context_length, + provider=agent.provider, + api_mode=agent.api_mode, + ) + agent.compression_enabled = compression_enabled + + # Reject models whose context window is below the minimum required + # for reliable tool-calling workflows (64K tokens). + from agent.model_metadata import MINIMUM_CONTEXT_LENGTH + _ctx = getattr(agent.context_compressor, "context_length", 0) + if _ctx and _ctx < MINIMUM_CONTEXT_LENGTH: + raise ValueError( + f"Model {agent.model} has a context window of {_ctx:,} tokens, " + f"which is below the minimum {MINIMUM_CONTEXT_LENGTH:,} required " + f"by Hermes Agent. Choose a model with at least " + f"{MINIMUM_CONTEXT_LENGTH // 1000}K context, or set " + f"model.context_length in config.yaml to override." + ) + + # Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand). + # Skip names that are already present — the _ra().get_tool_definitions() + # quiet_mode cache returned a shared list pre-#17335, so a stray + # mutation here would poison subsequent agent inits in the same + # Gateway process and trip provider-side 'duplicate tool name' + # errors. Even with the cache fix, dedup is the right defense + # against plugin paths that may register the same schemas via + # ctx.register_tool(). Mirrors the memory tools dedup above. + agent._context_engine_tool_names: set = set() + if hasattr(agent, "context_compressor") and agent.context_compressor and agent.tools is not None: + _existing_tool_names = { + t.get("function", {}).get("name") + for t in agent.tools + if isinstance(t, dict) + } + for _schema in agent.context_compressor.get_tool_schemas(): + _tname = _schema.get("name", "") + if _tname and _tname in _existing_tool_names: + continue # already registered via plugin/cache path + _wrapped = {"type": "function", "function": _schema} + agent.tools.append(_wrapped) + if _tname: + agent.valid_tool_names.add(_tname) + agent._context_engine_tool_names.add(_tname) + _existing_tool_names.add(_tname) + + # Notify context engine of session start + if hasattr(agent, "context_compressor") and agent.context_compressor: + try: + agent.context_compressor.on_session_start( + agent.session_id, + hermes_home=str(get_hermes_home()), + platform=agent.platform or "cli", + model=agent.model, + context_length=getattr(agent.context_compressor, "context_length", 0), + ) + except Exception as _ce_err: + _ra().logger.debug("Context engine on_session_start: %s", _ce_err) + + agent._subdirectory_hints = SubdirectoryHintTracker( + working_dir=os.getenv("TERMINAL_CWD") or None, + ) + agent._user_turn_count = 0 + + # Cumulative token usage for the session + agent.session_prompt_tokens = 0 + agent.session_completion_tokens = 0 + agent.session_total_tokens = 0 + agent.session_api_calls = 0 + agent.session_input_tokens = 0 + agent.session_output_tokens = 0 + agent.session_cache_read_tokens = 0 + agent.session_cache_write_tokens = 0 + agent.session_reasoning_tokens = 0 + agent.session_estimated_cost_usd = 0.0 + agent.session_cost_status = "unknown" + agent.session_cost_source = "none" + + # ── Ollama num_ctx injection ── + # Ollama defaults to 2048 context regardless of the model's capabilities. + # When running against an Ollama server, detect the model's max context + # and pass num_ctx on every chat request so the full window is used. + # User override: set model.ollama_num_ctx in config.yaml to cap VRAM use. + # If model.context_length is set, it caps num_ctx so the user's VRAM + # budget is respected even when GGUF metadata advertises a larger window. + agent._ollama_num_ctx: int | None = None + _ollama_num_ctx_override = None + if isinstance(_model_cfg, dict): + _ollama_num_ctx_override = _model_cfg.get("ollama_num_ctx") + if _ollama_num_ctx_override is not None: + try: + agent._ollama_num_ctx = int(_ollama_num_ctx_override) + except (TypeError, ValueError): + _ra().logger.debug("Invalid ollama_num_ctx config value: %r", _ollama_num_ctx_override) + if agent._ollama_num_ctx is None and agent.base_url and is_local_endpoint(agent.base_url): + try: + _detected = query_ollama_num_ctx(agent.model, agent.base_url, api_key=agent.api_key or "") + if _detected and _detected > 0: + agent._ollama_num_ctx = _detected + except Exception as exc: + _ra().logger.debug("Ollama num_ctx detection failed: %s", exc) + # Cap auto-detected ollama_num_ctx to the user's explicit context_length. + # Without this, GGUF metadata can advertise 256K+ which Ollama honours + # by allocating that much VRAM — blowing up small GPUs even though the + # user explicitly set a smaller context_length in config.yaml. + if ( + agent._ollama_num_ctx + and _config_context_length + and _ollama_num_ctx_override is None # don't override explicit ollama_num_ctx + and agent._ollama_num_ctx > _config_context_length + ): + _ra().logger.info( + "Ollama num_ctx capped: %d -> %d (model.context_length override)", + agent._ollama_num_ctx, _config_context_length, + ) + agent._ollama_num_ctx = _config_context_length + if agent._ollama_num_ctx and not agent.quiet_mode: + _ra().logger.info( + "Ollama num_ctx: will request %d tokens (model max from /api/show)", + agent._ollama_num_ctx, + ) + + if not agent.quiet_mode: + if compression_enabled: + print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (compress at {int(compression_threshold*100)}% = {agent.context_compressor.threshold_tokens:,})") + else: + print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (auto-compression disabled)") + + # Check immediately so CLI users see the warning at startup. + # Gateway status_callback is not yet wired, so any warning is stored + # in _compression_warning and replayed in the first run_conversation(). + agent._compression_warning = None + agent._check_compression_model_feasibility() + + # Snapshot primary runtime for per-turn restoration. When fallback + # activates during a turn, the next turn restores these values so the + # preferred model gets a fresh attempt each time. Uses a single dict + # so new state fields are easy to add without N individual attributes. + _cc = agent.context_compressor + agent._primary_runtime = { + "model": agent.model, + "provider": agent.provider, + "base_url": agent.base_url, + "api_mode": agent.api_mode, + "api_key": getattr(agent, "api_key", ""), + "client_kwargs": dict(agent._client_kwargs), + "use_prompt_caching": agent._use_prompt_caching, + "use_native_cache_layout": agent._use_native_cache_layout, + # Context engine state that _try_activate_fallback() overwrites. + # Use getattr for model/base_url/api_key/provider since plugin + # engines may not have these (they're ContextCompressor-specific). + "compressor_model": getattr(_cc, "model", agent.model), + "compressor_base_url": getattr(_cc, "base_url", agent.base_url), + "compressor_api_key": getattr(_cc, "api_key", ""), + "compressor_provider": getattr(_cc, "provider", agent.provider), + "compressor_context_length": _cc.context_length, + "compressor_threshold_tokens": _cc.threshold_tokens, + } + if agent.api_mode == "anthropic_messages": + agent._primary_runtime.update({ + "anthropic_api_key": agent._anthropic_api_key, + "anthropic_base_url": agent._anthropic_base_url, + "is_anthropic_oauth": agent._is_anthropic_oauth, + }) + + + +__all__ = ["init_agent"] diff --git a/run_agent.py b/run_agent.py index b13eb851175..05d648f94e2 100644 --- a/run_agent.py +++ b/run_agent.py @@ -374,1319 +374,9 @@ class AIAgent: checkpoint_max_file_size_mb: int = 10, pass_session_id: bool = False, ): - """ - Initialize the AI Agent. - - Args: - base_url (str): Base URL for the model API (optional) - api_key (str): API key for authentication (optional, uses env var if not provided) - provider (str): Provider identifier (optional; used for telemetry/routing hints) - api_mode (str): API mode override: "chat_completions" or "codex_responses" - model (str): Model name to use (default: "anthropic/claude-opus-4.6") - max_iterations (int): Maximum number of tool calling iterations (default: 90) - tool_delay (float): Delay between tool calls in seconds (default: 1.0) - enabled_toolsets (List[str]): Only enable tools from these toolsets (optional) - disabled_toolsets (List[str]): Disable tools from these toolsets (optional) - save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False) - verbose_logging (bool): Enable verbose logging for debugging (default: False) - quiet_mode (bool): Suppress progress output for clean CLI experience (default: False) - ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional) - log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100) - log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "") - providers_allowed (List[str]): OpenRouter providers to allow (optional) - providers_ignored (List[str]): OpenRouter providers to ignore (optional) - providers_order (List[str]): OpenRouter providers to try in order (optional) - provider_sort (str): Sort providers by price/throughput/latency (optional) - openrouter_min_coding_score (float): Coding-score floor (0.0-1.0) for the - openrouter/pareto-code router. Only applied when model == "openrouter/pareto-code". - None or empty = let OpenRouter pick the strongest available coder. - session_id (str): Pre-generated session ID for logging (optional, auto-generated if not provided) - tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications - clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions. - Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error. - max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set) - reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking). - If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning. - prefill_messages (List[Dict]): Messages to prepend to conversation history as prefilled context. - Useful for injecting a few-shot example or priming the model's response style. - Example: [{"role": "user", "content": "Hi!"}, {"role": "assistant", "content": "Hello!"}] - NOTE: Anthropic Sonnet 4.6+ and Opus 4.6+ reject a conversation that ends on an - assistant-role message (400 error). For those models use structured outputs or - output_config.format instead of a trailing-assistant prefill. - platform (str): The interface platform the user is on (e.g. "cli", "telegram", "discord", "whatsapp"). - Used to inject platform-specific formatting hints into the system prompt. - skip_context_files (bool): If True, skip auto-injection of SOUL.md, AGENTS.md, and .cursorrules - into the system prompt. Use this for batch processing and data generation to avoid - polluting trajectories with user-specific persona or project instructions. - load_soul_identity (bool): If True, still use ~/.hermes/SOUL.md as the primary - identity even when skip_context_files=True. Project context files from the cwd - remain skipped. - """ - _install_safe_stdio() - - self.model = model - self.max_iterations = max_iterations - # Shared iteration budget — parent creates, children inherit. - # Consumed by every LLM turn across parent + all subagents. - self.iteration_budget = iteration_budget or IterationBudget(max_iterations) - self.tool_delay = tool_delay - self.save_trajectories = save_trajectories - self.verbose_logging = verbose_logging - self.quiet_mode = quiet_mode - self.ephemeral_system_prompt = ephemeral_system_prompt - self.platform = platform # "cli", "telegram", "discord", "whatsapp", etc. - self._user_id = user_id # Platform user identifier (gateway sessions) - self._user_name = user_name - self._chat_id = chat_id - self._chat_name = chat_name - self._chat_type = chat_type - self._thread_id = thread_id - self._gateway_session_key = gateway_session_key # Stable per-chat key (e.g. agent:main:telegram:dm:123) - # Pluggable print function — CLI replaces this with _cprint so that - # raw ANSI status lines are routed through prompt_toolkit's renderer - # instead of going directly to stdout where patch_stdout's StdoutProxy - # would mangle the escape sequences. None = use builtins.print. - self._print_fn = None - self.background_review_callback = None # Optional sync callback for gateway delivery - self.skip_context_files = skip_context_files - self.load_soul_identity = load_soul_identity - self.pass_session_id = pass_session_id - self._credential_pool = credential_pool - self.log_prefix_chars = log_prefix_chars - self.log_prefix = f"{log_prefix} " if log_prefix else "" - # Store effective base URL for feature detection (prompt caching, reasoning, etc.) - self.base_url = base_url or "" - provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None - self.provider = provider_name or "" - self.acp_command = acp_command or command - self.acp_args = list(acp_args or args or []) - if api_mode in {"chat_completions", "codex_responses", "anthropic_messages", "bedrock_converse", "codex_app_server"}: - self.api_mode = api_mode - elif self.provider == "openai-codex": - self.api_mode = "codex_responses" - elif self.provider == "xai": - self.api_mode = "codex_responses" - elif (provider_name is None) and ( - self._base_url_hostname == "chatgpt.com" - and "/backend-api/codex" in self._base_url_lower - ): - self.api_mode = "codex_responses" - self.provider = "openai-codex" - elif (provider_name is None) and self._base_url_hostname == "api.x.ai": - self.api_mode = "codex_responses" - self.provider = "xai" - elif self.provider == "anthropic" or (provider_name is None and self._base_url_hostname == "api.anthropic.com"): - self.api_mode = "anthropic_messages" - self.provider = "anthropic" - elif self._base_url_lower.rstrip("/").endswith("/anthropic"): - # Third-party Anthropic-compatible endpoints (e.g. MiniMax, DashScope) - # use a URL convention ending in /anthropic. Auto-detect these so the - # Anthropic Messages API adapter is used instead of chat completions. - self.api_mode = "anthropic_messages" - elif self.provider == "bedrock" or ( - self._base_url_hostname.startswith("bedrock-runtime.") - and base_url_host_matches(self._base_url_lower, "amazonaws.com") - ): - # AWS Bedrock — auto-detect from provider name or base URL - # (bedrock-runtime..amazonaws.com). - self.api_mode = "bedrock_converse" - else: - self.api_mode = "chat_completions" - - # Eagerly warm the transport cache so import errors surface at init, - # not mid-conversation. Also validates the api_mode is registered. - try: - self._get_transport() - except Exception: - pass # Non-fatal — transport may not exist for all modes yet - - try: - from hermes_cli.model_normalize import ( - _AGGREGATOR_PROVIDERS, - normalize_model_for_provider, - ) - - if self.provider not in _AGGREGATOR_PROVIDERS: - self.model = normalize_model_for_provider(self.model, self.provider) - except Exception: - pass - - # GPT-5.x models usually require the Responses API path, but some - # providers have exceptions (for example Copilot's gpt-5-mini still - # uses chat completions). Also auto-upgrade for direct OpenAI URLs - # (api.openai.com) since all newer tool-calling models prefer - # Responses there. ACP runtimes are excluded: CopilotACPClient - # handles its own routing and does not implement the Responses API - # surface. - # When api_mode was explicitly provided, respect it — the user - # knows what their endpoint supports (#10473). - # Exception: Azure OpenAI serves gpt-5.x on /chat/completions and - # does NOT support the Responses API — skip the upgrade for Azure - # (openai.azure.com), even though it looks OpenAI-compatible. - if ( - api_mode is None - and self.api_mode == "chat_completions" - and self.provider != "copilot-acp" - and not str(self.base_url or "").lower().startswith("acp://copilot") - and not str(self.base_url or "").lower().startswith("acp+tcp://") - and not self._is_azure_openai_url() - and ( - self._is_direct_openai_url() - or self._provider_model_requires_responses_api( - self.model, - provider=self.provider, - ) - ) - ): - self.api_mode = "codex_responses" - # Invalidate the eager-warmed transport cache — api_mode changed - # from chat_completions to codex_responses after the warm at __init__. - if hasattr(self, "_transport_cache"): - self._transport_cache.clear() - - # Pre-warm OpenRouter model metadata cache in a background thread. - # fetch_model_metadata() is cached for 1 hour; this avoids a blocking - # HTTP request on the first API response when pricing is estimated. - # Use a process-level Event so this thread is only spawned once — a new - # AIAgent is created for every gateway request, so without the guard - # each message leaks one OS thread and the process eventually exhausts - # the system thread limit (RuntimeError: can't start new thread). - if (self.provider == "openrouter" or self._is_openrouter_url()) and \ - not _openrouter_prewarm_done.is_set(): - _openrouter_prewarm_done.set() - threading.Thread( - target=fetch_model_metadata, - daemon=True, - name="openrouter-prewarm", - ).start() - - self.tool_progress_callback = tool_progress_callback - self.tool_start_callback = tool_start_callback - self.tool_complete_callback = tool_complete_callback - self.suppress_status_output = False - self.thinking_callback = thinking_callback - self.reasoning_callback = reasoning_callback - self.clarify_callback = clarify_callback - self.step_callback = step_callback - self.stream_delta_callback = stream_delta_callback - self.interim_assistant_callback = interim_assistant_callback - self.status_callback = status_callback - self.tool_gen_callback = tool_gen_callback - - - # Tool execution state — allows _vprint during tool execution - # even when stream consumers are registered (no tokens streaming then) - self._executing_tools = False - self._tool_guardrails = ToolCallGuardrailController() - self._tool_guardrail_halt_decision: ToolGuardrailDecision | None = None - - # Interrupt mechanism for breaking out of tool loops - self._interrupt_requested = False - self._interrupt_message = None # Optional message that triggered interrupt - self._execution_thread_id: int | None = None # Set at run_conversation() start - self._interrupt_thread_signal_pending = False - self._client_lock = threading.RLock() - - # /steer mechanism — inject a user note into the next tool result - # without interrupting the agent. Unlike interrupt(), steer() does - # NOT set _interrupt_requested; it waits for the current tool batch - # to finish naturally, then the drain hook appends the text to the - # last tool result's content so the model sees it on its next - # iteration. Message-role alternation is preserved (we modify an - # existing tool message rather than inserting a new user turn). - self._pending_steer: Optional[str] = None - self._pending_steer_lock = threading.Lock() - - # Concurrent-tool worker thread tracking. `_execute_tool_calls_concurrent` - # runs each tool on its own ThreadPoolExecutor worker — those worker - # threads have tids distinct from `_execution_thread_id`, so - # `_set_interrupt(True, _execution_thread_id)` alone does NOT cause - # `is_interrupted()` inside the worker to return True. Track the - # workers here so `interrupt()` / `clear_interrupt()` can fan out to - # their tids explicitly. - self._tool_worker_threads: set[int] = set() - self._tool_worker_threads_lock = threading.Lock() - - # Subagent delegation state - self._delegate_depth = 0 # 0 = top-level agent, incremented for children - self._active_children = [] # Running child AIAgents (for interrupt propagation) - self._active_children_lock = threading.Lock() - - # Store OpenRouter provider preferences - self.providers_allowed = providers_allowed - self.providers_ignored = providers_ignored - self.providers_order = providers_order - self.provider_sort = provider_sort - self.provider_require_parameters = provider_require_parameters - self.provider_data_collection = provider_data_collection - self.openrouter_min_coding_score = openrouter_min_coding_score - - # Store toolset filtering options - self.enabled_toolsets = enabled_toolsets - self.disabled_toolsets = disabled_toolsets - - # Model response configuration - self.max_tokens = max_tokens # None = use model default - self.reasoning_config = reasoning_config # None = use default (medium for OpenRouter) - self.service_tier = service_tier - self.request_overrides = dict(request_overrides or {}) - self.prefill_messages = prefill_messages or [] # Prefilled conversation turns - self._force_ascii_payload = False - - # Anthropic prompt caching: auto-enabled for Claude models on native - # Anthropic, OpenRouter, and third-party gateways that speak the - # Anthropic protocol (``api_mode == 'anthropic_messages'``). Reduces - # input costs by ~75% on multi-turn conversations. Uses system_and_3 - # strategy (4 breakpoints). See ``_anthropic_prompt_cache_policy`` - # for the layout-vs-transport decision. - self._use_prompt_caching, self._use_native_cache_layout = ( - self._anthropic_prompt_cache_policy() - ) - # Anthropic supports "5m" (default) and "1h" cache TTL tiers. Read from - # config.yaml under prompt_caching.cache_ttl; unknown values keep "5m". - # 1h tier costs 2x on write vs 1.25x for 5m, but amortizes across long - # sessions with >5-minute pauses between turns (#14971). - self._cache_ttl = "5m" - try: - from hermes_cli.config import load_config as _load_pc_cfg - - _pc_cfg = _load_pc_cfg().get("prompt_caching", {}) or {} - _ttl = _pc_cfg.get("cache_ttl", "5m") - if _ttl in {"5m", "1h"}: - self._cache_ttl = _ttl - except Exception: - pass - - # Iteration budget: the LLM is only notified when it actually exhausts - # the iteration budget (api_call_count >= max_iterations). At that - # point we inject ONE message, allow one final API call, and if the - # model doesn't produce a text response, force a user-message asking - # it to summarise. No intermediate pressure warnings — they caused - # models to "give up" prematurely on complex tasks (#7915). - self._budget_exhausted_injected = False - self._budget_grace_call = False - - # Activity tracking — updated on each API call, tool execution, and - # stream chunk. Used by the gateway timeout handler to report what the - # agent was doing when it was killed, and by the "still working" - # notifications to show progress. - self._last_activity_ts: float = time.time() - self._last_activity_desc: str = "initializing" - self._current_tool: str | None = None - self._api_call_count: int = 0 - - # Rate limit tracking — updated from x-ratelimit-* response headers - # after each API call. Accessed by /usage slash command. - self._rate_limit_state: Optional["RateLimitState"] = None - - # OpenRouter response cache hit counter — incremented when - # X-OpenRouter-Cache-Status: HIT is seen in streaming response headers. - self._or_cache_hits: int = 0 - - # Centralized logging — agent.log (INFO+) and errors.log (WARNING+) - # both live under ~/.hermes/logs/. Idempotent, so gateway mode - # (which creates a new AIAgent per message) won't duplicate handlers. - from hermes_logging import setup_logging, setup_verbose_logging - setup_logging(hermes_home=_hermes_home) - - if self.verbose_logging: - setup_verbose_logging() - logger.info("Verbose logging enabled (third-party library logs suppressed)") - elif self.quiet_mode: - # In quiet mode (CLI default), keep console output clean — - # but DO NOT raise per-logger levels. Doing so prevents the - # root logger's file handlers (agent.log, errors.log) from - # ever seeing the records, because Python checks - # logger.isEnabledFor() before handler propagation. We rely - # on the fact that hermes_logging.setup_logging() does not - # install a console StreamHandler in quiet mode — so INFO - # records flow to the file handlers but never reach a - # console. Any future noise reduction belongs at the - # handler level inside hermes_logging.py, not here. - pass - - # Internal stream callback (set during streaming TTS). - # Initialized here so _vprint can reference it before run_conversation. - self._stream_callback = None - # Deferred paragraph break flag — set after tool iterations so a - # single "\n\n" is prepended to the next real text delta. - self._stream_needs_break = False - # Stateful scrubber for spans split across stream - # deltas (#5719). sanitize_context() alone can't survive chunk - # boundaries because the block regex needs both tags in one string. - self._stream_context_scrubber = StreamingContextScrubber() - # Stateful scrubber for reasoning/thinking tags in streamed deltas - # (#17924). Replaces the per-delta _strip_think_blocks regex that - # destroyed downstream state (e.g. MiniMax-M2.7 streaming - # '' as delta1 and 'Let me check' as delta2 — the regex - # erased delta1, so downstream state machines never learned a - # block was open and leaked delta2 as content). - self._stream_think_scrubber = StreamingThinkScrubber() - # Visible assistant text already delivered through live token callbacks - # during the current model response. Used to avoid re-sending the same - # commentary when the provider later returns it as a completed interim - # assistant message. - self._current_streamed_assistant_text = "" - - # Optional current-turn user-message override used when the API-facing - # user message intentionally differs from the persisted transcript - # (e.g. CLI voice mode adds a temporary prefix for the live call only). - self._persist_user_message_idx = None - self._persist_user_message_override = None - - # Cache anthropic image-to-text fallbacks per image payload/URL so a - # single tool loop does not repeatedly re-run auxiliary vision on the - # same image history. - self._anthropic_image_fallback_cache: Dict[str, str] = {} - - # Initialize LLM client via centralized provider router. - # The router handles auth resolution, base URL, headers, and - # Codex/Anthropic wrapping for all known providers. - # raw_codex=True because the main agent needs direct responses.stream() - # access for Codex Responses API streaming. - self._anthropic_client = None - self._is_anthropic_oauth = False - - # Resolve per-provider / per-model request timeout once up front so - # every client construction path below (Anthropic native, OpenAI-wire, - # router-based implicit auth) can apply it consistently. Bedrock - # Claude uses its own timeout path and is not covered here. - _provider_timeout = get_provider_request_timeout(self.provider, self.model) - - if self.api_mode == "anthropic_messages": - from agent.anthropic_adapter import build_anthropic_client, resolve_anthropic_token - # Bedrock + Claude → use AnthropicBedrock SDK for full feature parity - # (prompt caching, thinking budgets, adaptive thinking). - _is_bedrock_anthropic = self.provider == "bedrock" - if _is_bedrock_anthropic: - from agent.anthropic_adapter import build_anthropic_bedrock_client - _region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "") - _br_region = _region_match.group(1) if _region_match else "us-east-1" - self._bedrock_region = _br_region - self._anthropic_client = build_anthropic_bedrock_client(_br_region) - self._anthropic_api_key = "aws-sdk" - self._anthropic_base_url = base_url - self._is_anthropic_oauth = False - self.api_key = "aws-sdk" - self.client = None - self._client_kwargs = {} - if not self.quiet_mode: - print(f"🤖 AI Agent initialized with model: {self.model} (AWS Bedrock + AnthropicBedrock SDK, {_br_region})") - else: - # Only fall back to ANTHROPIC_TOKEN when the provider is actually Anthropic. - # Other anthropic_messages providers (MiniMax, Alibaba, etc.) must use their own API key. - # Falling back would send Anthropic credentials to third-party endpoints (Fixes #1739, #minimax-401). - _is_native_anthropic = self.provider == "anthropic" - effective_key = (api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or "") - self.api_key = effective_key - self._anthropic_api_key = effective_key - self._anthropic_base_url = base_url - # Only mark the session as OAuth-authenticated when the token - # genuinely belongs to native Anthropic. Third-party providers - # (MiniMax, Kimi, GLM, LiteLLM proxies) that accept the - # Anthropic protocol must never trip OAuth code paths — doing - # so injects Claude-Code identity headers and system prompts - # that cause 401/403 on their endpoints. Guards #1739 and - # the third-party identity-injection bug. - from agent.anthropic_adapter import _is_oauth_token as _is_oat - self._is_anthropic_oauth = _is_oat(effective_key) if _is_native_anthropic else False - self._anthropic_client = build_anthropic_client(effective_key, base_url, timeout=_provider_timeout) - # No OpenAI client needed for Anthropic mode - self.client = None - self._client_kwargs = {} - if not self.quiet_mode: - print(f"🤖 AI Agent initialized with model: {self.model} (Anthropic native)") - if effective_key and len(effective_key) > 12: - print(f"🔑 Using token: {effective_key[:8]}...{effective_key[-4:]}") - elif self.api_mode == "bedrock_converse": - # AWS Bedrock — uses boto3 directly, no OpenAI client needed. - # Region is extracted from the base_url or defaults to us-east-1. - _region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "") - self._bedrock_region = _region_match.group(1) if _region_match else "us-east-1" - # Guardrail config — read from config.yaml at init time. - self._bedrock_guardrail_config = None - try: - from hermes_cli.config import load_config as _load_br_cfg - _gr = _load_br_cfg().get("bedrock", {}).get("guardrail", {}) - if _gr.get("guardrail_identifier") and _gr.get("guardrail_version"): - self._bedrock_guardrail_config = { - "guardrailIdentifier": _gr["guardrail_identifier"], - "guardrailVersion": _gr["guardrail_version"], - } - if _gr.get("stream_processing_mode"): - self._bedrock_guardrail_config["streamProcessingMode"] = _gr["stream_processing_mode"] - if _gr.get("trace"): - self._bedrock_guardrail_config["trace"] = _gr["trace"] - except Exception: - pass - self.client = None - self._client_kwargs = {} - if not self.quiet_mode: - _gr_label = " + Guardrails" if self._bedrock_guardrail_config else "" - print(f"🤖 AI Agent initialized with model: {self.model} (AWS Bedrock, {self._bedrock_region}{_gr_label})") - else: - if api_key and base_url: - # Explicit credentials from CLI/gateway — construct directly. - # The runtime provider resolver already handled auth for us. - # Extract query params (e.g. Azure api-version) from base_url - # and pass via default_query to prevent loss during SDK URL - # joining (httpx drops query string when joining paths). - _parsed_url = urlparse(base_url) - if _parsed_url.query: - _clean_url = urlunparse(_parsed_url._replace(query="")) - _query_params = { - k: v[0] for k, v in parse_qs(_parsed_url.query).items() - } - client_kwargs = { - "api_key": api_key, - "base_url": _clean_url, - "default_query": _query_params, - } - else: - client_kwargs = {"api_key": api_key, "base_url": base_url} - if _provider_timeout is not None: - client_kwargs["timeout"] = _provider_timeout - if self.provider == "copilot-acp": - client_kwargs["command"] = self.acp_command - client_kwargs["args"] = self.acp_args - effective_base = base_url - if base_url_host_matches(effective_base, "openrouter.ai"): - from agent.auxiliary_client import build_or_headers - client_kwargs["default_headers"] = build_or_headers() - elif base_url_host_matches(effective_base, "api.routermint.com"): - client_kwargs["default_headers"] = _routermint_headers() - elif base_url_host_matches(effective_base, "api.githubcopilot.com"): - from hermes_cli.models import copilot_default_headers - - client_kwargs["default_headers"] = copilot_default_headers() - elif base_url_host_matches(effective_base, "api.kimi.com"): - client_kwargs["default_headers"] = { - "User-Agent": "claude-code/0.1.0", - } - elif base_url_host_matches(effective_base, "portal.qwen.ai"): - client_kwargs["default_headers"] = _qwen_portal_headers() - elif base_url_host_matches(effective_base, "chatgpt.com"): - from agent.auxiliary_client import _codex_cloudflare_headers - client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key) - elif "default_headers" not in client_kwargs: - # Fall back to profile.default_headers for providers that - # declare custom headers (e.g. Vercel AI Gateway attribution, - # Kimi User-Agent on non-kimi.com endpoints). - try: - from providers import get_provider_profile as _gpf - _ph = _gpf(self.provider) - if _ph and _ph.default_headers: - client_kwargs["default_headers"] = dict(_ph.default_headers) - except Exception: - pass - else: - # No explicit creds — use the centralized provider router - from agent.auxiliary_client import resolve_provider_client - _routed_client, _ = resolve_provider_client( - self.provider or "auto", model=self.model, raw_codex=True) - if _routed_client is not None: - client_kwargs = { - "api_key": _routed_client.api_key, - "base_url": str(_routed_client.base_url), - } - if _provider_timeout is not None: - client_kwargs["timeout"] = _provider_timeout - # Preserve any default_headers the router set - if hasattr(_routed_client, '_default_headers') and _routed_client._default_headers: - client_kwargs["default_headers"] = dict(_routed_client._default_headers) - else: - # When the user explicitly chose a non-OpenRouter provider - # but no credentials were found, fail fast with a clear - # message instead of silently routing through OpenRouter. - _explicit = (self.provider or "").strip().lower() - if _explicit and _explicit not in {"auto", "openrouter", "custom"}: - # Look up the actual env var name from the provider - # config — some providers use non-standard names - # (e.g. alibaba → DASHSCOPE_API_KEY, not ALIBABA_API_KEY). - _env_hint = f"{_explicit.upper()}_API_KEY" - try: - from hermes_cli.auth import PROVIDER_REGISTRY - _pcfg = PROVIDER_REGISTRY.get(_explicit) - if _pcfg and _pcfg.api_key_env_vars: - _env_hint = _pcfg.api_key_env_vars[0] - except Exception: - pass - # --- Init-time fallback (#17929) --- - _fb_entries = [] - if isinstance(fallback_model, list): - _fb_entries = [ - f for f in fallback_model - if isinstance(f, dict) and f.get("provider") and f.get("model") - ] - elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"): - _fb_entries = [fallback_model] - _fb_resolved = False - for _fb in _fb_entries: - _fb_explicit_key = (_fb.get("api_key") or "").strip() or None - if not _fb_explicit_key: - _fb_key_env = (_fb.get("key_env") or _fb.get("api_key_env") or "").strip() - if _fb_key_env: - _fb_explicit_key = os.getenv(_fb_key_env, "").strip() or None - _fb_client, _fb_model = resolve_provider_client( - _fb["provider"], model=_fb["model"], raw_codex=True, - explicit_base_url=_fb.get("base_url"), - explicit_api_key=_fb_explicit_key, - ) - if _fb_client is not None: - self.provider = _fb["provider"] - self.model = _fb_model or _fb["model"] - self._fallback_activated = True - client_kwargs = { - "api_key": _fb_client.api_key, - "base_url": str(_fb_client.base_url), - } - if _provider_timeout is not None: - client_kwargs["timeout"] = _provider_timeout - if hasattr(_fb_client, "_default_headers") and _fb_client._default_headers: - client_kwargs["default_headers"] = dict(_fb_client._default_headers) - _fb_resolved = True - break - if not _fb_resolved: - raise RuntimeError( - f"Provider '{_explicit}' is set in config.yaml but no API key " - f"was found. Set the {_env_hint} environment " - f"variable, or switch to a different provider with `hermes model`." - ) - if not getattr(self, "_fallback_activated", False): - # No provider configured — reject with a clear message. - raise RuntimeError( - "No LLM provider configured. Run `hermes model` to " - "select a provider, or run `hermes setup` for first-time " - "configuration." - ) - - self._client_kwargs = client_kwargs # stored for rebuilding after interrupt - - # Enable fine-grained tool streaming for Claude on OpenRouter. - # Without this, Anthropic buffers the entire tool call and goes - # silent for minutes while thinking — OpenRouter's upstream proxy - # times out during the silence. The beta header makes Anthropic - # stream tool call arguments token-by-token, keeping the - # connection alive. - _effective_base = str(client_kwargs.get("base_url", "")).lower() - if base_url_host_matches(_effective_base, "openrouter.ai") and "claude" in (self.model or "").lower(): - headers = client_kwargs.get("default_headers") or {} - existing_beta = headers.get("x-anthropic-beta", "") - _FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14" - if _FINE_GRAINED not in existing_beta: - if existing_beta: - headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}" - else: - headers["x-anthropic-beta"] = _FINE_GRAINED - client_kwargs["default_headers"] = headers - - self.api_key = client_kwargs.get("api_key", "") - self.base_url = client_kwargs.get("base_url", self.base_url) - try: - self.client = self._create_openai_client(client_kwargs, reason="agent_init", shared=True) - if not self.quiet_mode: - print(f"🤖 AI Agent initialized with model: {self.model}") - if base_url: - print(f"🔗 Using custom base URL: {base_url}") - # Always show API key info (masked) for debugging auth issues - key_used = client_kwargs.get("api_key", "none") - if key_used and key_used != "dummy-key" and len(key_used) > 12: - print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}") - else: - print(f"⚠️ Warning: API key appears invalid or missing (got: '{key_used[:20] if key_used else 'none'}...')") - except Exception as e: - raise RuntimeError(f"Failed to initialize OpenAI client: {e}") - - # Provider fallback chain — ordered list of backup providers tried - # when the primary is exhausted (rate-limit, overload, connection - # failure). Supports both legacy single-dict ``fallback_model`` and - # new list ``fallback_providers`` format. - if isinstance(fallback_model, list): - self._fallback_chain = [ - f for f in fallback_model - if isinstance(f, dict) and f.get("provider") and f.get("model") - ] - elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"): - self._fallback_chain = [fallback_model] - else: - self._fallback_chain = [] - self._fallback_index = 0 - self._fallback_activated = getattr(self, "_fallback_activated", False) - # Legacy attribute kept for backward compat (tests, external callers) - self._fallback_model = self._fallback_chain[0] if self._fallback_chain else None - if self._fallback_chain and not self.quiet_mode: - if len(self._fallback_chain) == 1: - fb = self._fallback_chain[0] - print(f"🔄 Fallback model: {fb['model']} ({fb['provider']})") - else: - print(f"🔄 Fallback chain ({len(self._fallback_chain)} providers): " + - " → ".join(f"{f['model']} ({f['provider']})" for f in self._fallback_chain)) - - # Get available tools with filtering - self.tools = get_tool_definitions( - enabled_toolsets=enabled_toolsets, - disabled_toolsets=disabled_toolsets, - quiet_mode=self.quiet_mode, - ) - - # Show tool configuration and store valid tool names for validation - self.valid_tool_names = set() - if self.tools: - self.valid_tool_names = {tool["function"]["name"] for tool in self.tools} - tool_names = sorted(self.valid_tool_names) - if not self.quiet_mode: - print(f"🛠️ Loaded {len(self.tools)} tools: {', '.join(tool_names)}") - - # Show filtering info if applied - if enabled_toolsets: - print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}") - if disabled_toolsets: - print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}") - elif not self.quiet_mode: - print("🛠️ No tools loaded (all tools filtered out or unavailable)") - - # Check tool requirements - if self.tools and not self.quiet_mode: - requirements = check_toolset_requirements() - missing_reqs = [name for name, available in requirements.items() if not available] - if missing_reqs: - print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}") - - # Show trajectory saving status - if self.save_trajectories and not self.quiet_mode: - print("📝 Trajectory saving enabled") - - # Show ephemeral system prompt status - if self.ephemeral_system_prompt and not self.quiet_mode: - prompt_preview = self.ephemeral_system_prompt[:60] + "..." if len(self.ephemeral_system_prompt) > 60 else self.ephemeral_system_prompt - print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)") - - # Show prompt caching status - if self._use_prompt_caching and not self.quiet_mode: - if self._use_native_cache_layout and self.provider == "anthropic": - source = "native Anthropic" - elif self._use_native_cache_layout: - source = "Anthropic-compatible endpoint" - else: - source = "Claude via OpenRouter" - print(f"💾 Prompt caching: ENABLED ({source}, {self._cache_ttl} TTL)") - - # Session logging setup - auto-save conversation trajectories for debugging - self.session_start = datetime.now() - if session_id: - # Use provided session ID (e.g., from CLI) - self.session_id = session_id - else: - # Generate a new session ID - timestamp_str = self.session_start.strftime("%Y%m%d_%H%M%S") - short_uuid = uuid.uuid4().hex[:6] - self.session_id = f"{timestamp_str}_{short_uuid}" - - # Expose session ID to tools (terminal, execute_code) so agents can - # reference their own session for --resume commands, cross-session - # coordination, and logging. Uses the ContextVar system from - # session_context.py for concurrency safety (gateway runs multiple - # sessions in one process). Also writes os.environ as fallback for - # CLI mode where ContextVars aren't used. - os.environ["HERMES_SESSION_ID"] = self.session_id - try: - from gateway.session_context import _SESSION_ID - _SESSION_ID.set(self.session_id) - except Exception: - pass # CLI/test mode — ContextVar not needed - - # Session logs go into ~/.hermes/sessions/ alongside gateway sessions - hermes_home = get_hermes_home() - self.logs_dir = hermes_home / "sessions" - self.logs_dir.mkdir(parents=True, exist_ok=True) - self.session_log_file = self.logs_dir / f"session_{self.session_id}.json" - - # Track conversation messages for session logging - self._session_messages: List[Dict[str, Any]] = [] - self._memory_write_origin = "assistant_tool" - self._memory_write_context = "foreground" - - # Cached system prompt -- built once per session, only rebuilt on compression - self._cached_system_prompt: Optional[str] = None - - # Filesystem checkpoint manager (transparent — not a tool) - from tools.checkpoint_manager import CheckpointManager - self._checkpoint_mgr = CheckpointManager( - enabled=checkpoints_enabled, - max_snapshots=checkpoint_max_snapshots, - max_total_size_mb=checkpoint_max_total_size_mb, - max_file_size_mb=checkpoint_max_file_size_mb, - ) - - # SQLite session store (optional -- provided by CLI or gateway) - self._session_db = session_db - self._parent_session_id = parent_session_id - self._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes - self._session_db_created = False # DB row deferred to run_conversation() - self._session_init_model_config = { - "max_iterations": self.max_iterations, - "reasoning_config": reasoning_config, - "max_tokens": max_tokens, - } - - # In-memory todo list for task planning (one per agent/session) - from tools.todo_tool import TodoStore - self._todo_store = TodoStore() - - # Load config once for memory, skills, and compression sections - try: - from hermes_cli.config import load_config as _load_agent_config - _agent_cfg = _load_agent_config() - except Exception: - _agent_cfg = {} - try: - self._tool_guardrails = ToolCallGuardrailController( - ToolCallGuardrailConfig.from_mapping( - _agent_cfg.get("tool_loop_guardrails", {}) - ) - ) - except Exception as _tlg_err: - logger.warning("Tool loop guardrail config ignored: %s", _tlg_err) - # Cache only the derived auxiliary compression context override that is - # needed later by the startup feasibility check. Avoid exposing a - # broad pseudo-public config object on the agent instance. - self._aux_compression_context_length_config = None - - # Persistent memory (MEMORY.md + USER.md) -- loaded from disk - self._memory_store = None - self._memory_enabled = False - self._user_profile_enabled = False - self._memory_nudge_interval = 10 - self._turns_since_memory = 0 - self._iters_since_skill = 0 - if not skip_memory: - try: - mem_config = _agent_cfg.get("memory", {}) - self._memory_enabled = mem_config.get("memory_enabled", False) - self._user_profile_enabled = mem_config.get("user_profile_enabled", False) - self._memory_nudge_interval = int(mem_config.get("nudge_interval", 10)) - if self._memory_enabled or self._user_profile_enabled: - from tools.memory_tool import MemoryStore - self._memory_store = MemoryStore( - memory_char_limit=mem_config.get("memory_char_limit", 2200), - user_char_limit=mem_config.get("user_char_limit", 1375), - ) - self._memory_store.load_from_disk() - except Exception: - pass # Memory is optional -- don't break agent init - - - - # Memory provider plugin (external — one at a time, alongside built-in) - # Reads memory.provider from config to select which plugin to activate. - self._memory_manager = None - if not skip_memory: - try: - _mem_provider_name = mem_config.get("provider", "") if mem_config else "" - - if _mem_provider_name: - from agent.memory_manager import MemoryManager as _MemoryManager - from plugins.memory import load_memory_provider as _load_mem - self._memory_manager = _MemoryManager() - _mp = _load_mem(_mem_provider_name) - if _mp and _mp.is_available(): - self._memory_manager.add_provider(_mp) - if self._memory_manager.providers: - _init_kwargs = { - "session_id": self.session_id, - "platform": platform or "cli", - "hermes_home": str(get_hermes_home()), - "agent_context": "primary", - } - # Thread session title for memory provider scoping - # (e.g. honcho uses this to derive chat-scoped session keys) - if self._session_db: - try: - _st = self._session_db.get_session_title(self.session_id) - if _st: - _init_kwargs["session_title"] = _st - except Exception: - pass - # Thread gateway user identity for per-user memory scoping - if self._user_id: - _init_kwargs["user_id"] = self._user_id - if self._user_name: - _init_kwargs["user_name"] = self._user_name - if self._chat_id: - _init_kwargs["chat_id"] = self._chat_id - if self._chat_name: - _init_kwargs["chat_name"] = self._chat_name - if self._chat_type: - _init_kwargs["chat_type"] = self._chat_type - if self._thread_id: - _init_kwargs["thread_id"] = self._thread_id - # Thread gateway session key for stable per-chat Honcho session isolation - if self._gateway_session_key: - _init_kwargs["gateway_session_key"] = self._gateway_session_key - # Profile identity for per-profile provider scoping - try: - from hermes_cli.profiles import get_active_profile_name - _profile = get_active_profile_name() - _init_kwargs["agent_identity"] = _profile - _init_kwargs["agent_workspace"] = "hermes" - except Exception: - pass - self._memory_manager.initialize_all(**_init_kwargs) - logger.info("Memory provider '%s' activated", _mem_provider_name) - else: - logger.debug("Memory provider '%s' not found or not available", _mem_provider_name) - self._memory_manager = None - except Exception as _mpe: - logger.warning("Memory provider plugin init failed: %s", _mpe) - self._memory_manager = None - - # Inject memory provider tool schemas into the tool surface. - # Skip tools whose names already exist (plugins may register the - # same tools via ctx.register_tool(), which lands in self.tools - # through get_tool_definitions()). Duplicate function names cause - # 400 errors on providers that enforce unique names (e.g. Xiaomi - # MiMo via Nous Portal). - if self._memory_manager and self.tools is not None: - _existing_tool_names = { - t.get("function", {}).get("name") - for t in self.tools - if isinstance(t, dict) - } - for _schema in self._memory_manager.get_all_tool_schemas(): - _tname = _schema.get("name", "") - if _tname and _tname in _existing_tool_names: - continue # already registered via plugin path - _wrapped = {"type": "function", "function": _schema} - self.tools.append(_wrapped) - if _tname: - self.valid_tool_names.add(_tname) - _existing_tool_names.add(_tname) - - # Skills config: nudge interval for skill creation reminders - self._skill_nudge_interval = 10 - try: - skills_config = _agent_cfg.get("skills", {}) - self._skill_nudge_interval = int(skills_config.get("creation_nudge_interval", 10)) - except Exception: - pass - - # Tool-use enforcement config: "auto" (default — matches hardcoded - # model list), true (always), false (never), or list of substrings. - _agent_section = _agent_cfg.get("agent", {}) - if not isinstance(_agent_section, dict): - _agent_section = {} - self._tool_use_enforcement = _agent_section.get("tool_use_enforcement", "auto") - - # App-level API retry count (wraps each model API call). Default 3, - # overridable via agent.api_max_retries in config.yaml. See #11616. - try: - _raw_api_retries = _agent_section.get("api_max_retries", 3) - _api_retries = int(_raw_api_retries) - _api_retries = max(_api_retries, 1) # 1 = no retry (single attempt) - except (TypeError, ValueError): - _api_retries = 3 - self._api_max_retries = _api_retries - - # Initialize context compressor for automatic context management - # Compresses conversation when approaching model's context limit - # Configuration via config.yaml (compression section) - _compression_cfg = _agent_cfg.get("compression", {}) - if not isinstance(_compression_cfg, dict): - _compression_cfg = {} - compression_threshold = float(_compression_cfg.get("threshold", 0.50)) - try: - from agent.auxiliary_client import _compression_threshold_for_model as _cthresh_fn - _model_cthresh = _cthresh_fn(self.model) - if _model_cthresh is not None: - compression_threshold = _model_cthresh - except Exception: - pass - compression_enabled = str(_compression_cfg.get("enabled", True)).lower() in {"true", "1", "yes"} - compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20)) - compression_protect_last = int(_compression_cfg.get("protect_last_n", 20)) - # protect_first_n is the number of non-system messages to protect at - # the head, in addition to the system prompt (which is always - # implicitly protected by the compressor). Floor at 0 — a value of - # 0 means "preserve only the system prompt + summary + tail", which - # is a legitimate (and common) configuration for long-running - # rolling-compaction sessions. - compression_protect_first = max( - 0, int(_compression_cfg.get("protect_first_n", 3)) - ) - - # Read optional explicit context_length override for the auxiliary - # compression model. Custom endpoints often cannot report this via - # /models, so the startup feasibility check needs the config hint. - try: - _aux_cfg = cfg_get(_agent_cfg, "auxiliary", "compression", default={}) - except Exception: - _aux_cfg = {} - if isinstance(_aux_cfg, dict): - _aux_context_config = _aux_cfg.get("context_length") - else: - _aux_context_config = None - if _aux_context_config is not None: - try: - _aux_context_config = int(_aux_context_config) - except (TypeError, ValueError): - _aux_context_config = None - self._aux_compression_context_length_config = _aux_context_config - - # Read explicit model output-token override from config when the - # caller did not pass one directly. - _model_cfg = _agent_cfg.get("model", {}) - if self.max_tokens is None and isinstance(_model_cfg, dict): - _config_max_tokens = _model_cfg.get("max_tokens") - if _config_max_tokens is not None: - try: - if isinstance(_config_max_tokens, bool): - raise ValueError - _parsed_max_tokens = int(_config_max_tokens) - if _parsed_max_tokens <= 0: - raise ValueError - self.max_tokens = _parsed_max_tokens - except (TypeError, ValueError): - logger.warning( - "Invalid model.max_tokens in config.yaml: %r — " - "must be a positive integer (e.g. 4096). " - "Falling back to provider default.", - _config_max_tokens, - ) - print( - f"\n⚠ Invalid model.max_tokens in config.yaml: {_config_max_tokens!r}\n" - f" Must be a positive integer (e.g. 4096).\n" - f" Falling back to provider default.\n", - file=sys.stderr, - ) - self._session_init_model_config["max_tokens"] = self.max_tokens - - # Read explicit context_length override from model config - if isinstance(_model_cfg, dict): - _config_context_length = _model_cfg.get("context_length") - else: - _config_context_length = None - if _config_context_length is not None: - try: - _config_context_length = int(_config_context_length) - except (TypeError, ValueError): - logger.warning( - "Invalid model.context_length in config.yaml: %r — " - "must be a plain integer (e.g. 256000, not '256K'). " - "Falling back to auto-detection.", - _config_context_length, - ) - print( - f"\n⚠ Invalid model.context_length in config.yaml: {_config_context_length!r}\n" - f" Must be a plain integer (e.g. 256000, not '256K').\n" - f" Falling back to auto-detected context window.\n", - file=sys.stderr, - ) - _config_context_length = None - - # Resolve custom_providers list once for reuse below (startup - # context-length override and plugin context-engine init). - try: - from hermes_cli.config import get_compatible_custom_providers - _custom_providers = get_compatible_custom_providers(_agent_cfg) - except Exception: - _custom_providers = _agent_cfg.get("custom_providers") - if not isinstance(_custom_providers, list): - _custom_providers = [] - - # Store for reuse by _check_compression_model_feasibility (auxiliary - # compression model context-length detection needs the same list). - self._custom_providers = _custom_providers - - # Check custom_providers per-model context_length - if _config_context_length is None and _custom_providers: - try: - from hermes_cli.config import get_custom_provider_context_length - _cp_ctx_resolved = get_custom_provider_context_length( - model=self.model, - base_url=self.base_url, - custom_providers=_custom_providers, - ) - if _cp_ctx_resolved: - _config_context_length = int(_cp_ctx_resolved) - except Exception: - _cp_ctx_resolved = None - - # Surface a clear warning if the user set a context_length but it - # wasn't a valid positive int — the helper silently skips those. - if _config_context_length is None: - _target = self.base_url.rstrip("/") if self.base_url else "" - for _cp_entry in _custom_providers: - if not isinstance(_cp_entry, dict): - continue - _cp_url = (_cp_entry.get("base_url") or "").rstrip("/") - if _target and _cp_url == _target: - _cp_models = _cp_entry.get("models", {}) - if isinstance(_cp_models, dict): - _cp_model_cfg = _cp_models.get(self.model, {}) - if isinstance(_cp_model_cfg, dict): - _cp_ctx = _cp_model_cfg.get("context_length") - if _cp_ctx is not None: - try: - _parsed = int(_cp_ctx) - if _parsed <= 0: - raise ValueError - except (TypeError, ValueError): - logger.warning( - "Invalid context_length for model %r in " - "custom_providers: %r — must be a positive " - "integer (e.g. 256000, not '256K'). " - "Falling back to auto-detection.", - self.model, _cp_ctx, - ) - print( - f"\n⚠ Invalid context_length for model {self.model!r} in custom_providers: {_cp_ctx!r}\n" - f" Must be a positive integer (e.g. 256000, not '256K').\n" - f" Falling back to auto-detected context window.\n", - file=sys.stderr, - ) - break - - # Persist for reuse on switch_model / fallback activation. Must come - # AFTER the custom_providers branch so per-model overrides aren't lost. - self._config_context_length = _config_context_length - - self._ensure_lmstudio_runtime_loaded(_config_context_length) - - - - # Select context engine: config-driven (like memory providers). - # 1. Check config.yaml context.engine setting - # 2. Check plugins/context_engine// directory (repo-shipped) - # 3. Check general plugin system (user-installed plugins) - # 4. Fall back to built-in ContextCompressor - _selected_engine = None - _engine_name = "compressor" # default - try: - _ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {} - _engine_name = _ctx_cfg.get("engine", "compressor") or "compressor" - except Exception: - pass - - if _engine_name != "compressor": - # Try loading from plugins/context_engine// - try: - from plugins.context_engine import load_context_engine - _selected_engine = load_context_engine(_engine_name) - except Exception as _ce_load_err: - logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err) - - # Try general plugin system as fallback - if _selected_engine is None: - try: - from hermes_cli.plugins import get_plugin_context_engine - _candidate = get_plugin_context_engine() - if _candidate and _candidate.name == _engine_name: - _selected_engine = _candidate - except Exception: - pass - - if _selected_engine is None: - logger.warning( - "Context engine '%s' not found — falling back to built-in compressor", - _engine_name, - ) - # else: config says "compressor" — use built-in, don't auto-activate plugins - - if _selected_engine is not None: - self.context_compressor = _selected_engine - # Resolve context_length for plugin engines — mirrors switch_model() path - from agent.model_metadata import get_model_context_length - _plugin_ctx_len = get_model_context_length( - self.model, - base_url=self.base_url, - api_key=getattr(self, "api_key", ""), - config_context_length=_config_context_length, - provider=self.provider, - custom_providers=_custom_providers, - ) - self.context_compressor.update_model( - model=self.model, - context_length=_plugin_ctx_len, - base_url=self.base_url, - api_key=getattr(self, "api_key", ""), - provider=self.provider, - ) - if not self.quiet_mode: - logger.info("Using context engine: %s", _selected_engine.name) - else: - self.context_compressor = ContextCompressor( - model=self.model, - threshold_percent=compression_threshold, - protect_first_n=compression_protect_first, - protect_last_n=compression_protect_last, - summary_target_ratio=compression_target_ratio, - summary_model_override=None, - quiet_mode=self.quiet_mode, - base_url=self.base_url, - api_key=getattr(self, "api_key", ""), - config_context_length=_config_context_length, - provider=self.provider, - api_mode=self.api_mode, - ) - self.compression_enabled = compression_enabled - - # Reject models whose context window is below the minimum required - # for reliable tool-calling workflows (64K tokens). - from agent.model_metadata import MINIMUM_CONTEXT_LENGTH - _ctx = getattr(self.context_compressor, "context_length", 0) - if _ctx and _ctx < MINIMUM_CONTEXT_LENGTH: - raise ValueError( - f"Model {self.model} has a context window of {_ctx:,} tokens, " - f"which is below the minimum {MINIMUM_CONTEXT_LENGTH:,} required " - f"by Hermes Agent. Choose a model with at least " - f"{MINIMUM_CONTEXT_LENGTH // 1000}K context, or set " - f"model.context_length in config.yaml to override." - ) - - # Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand). - # Skip names that are already present — the get_tool_definitions() - # quiet_mode cache returned a shared list pre-#17335, so a stray - # mutation here would poison subsequent agent inits in the same - # Gateway process and trip provider-side 'duplicate tool name' - # errors. Even with the cache fix, dedup is the right defense - # against plugin paths that may register the same schemas via - # ctx.register_tool(). Mirrors the memory tools dedup above. - self._context_engine_tool_names: set = set() - if hasattr(self, "context_compressor") and self.context_compressor and self.tools is not None: - _existing_tool_names = { - t.get("function", {}).get("name") - for t in self.tools - if isinstance(t, dict) - } - for _schema in self.context_compressor.get_tool_schemas(): - _tname = _schema.get("name", "") - if _tname and _tname in _existing_tool_names: - continue # already registered via plugin/cache path - _wrapped = {"type": "function", "function": _schema} - self.tools.append(_wrapped) - if _tname: - self.valid_tool_names.add(_tname) - self._context_engine_tool_names.add(_tname) - _existing_tool_names.add(_tname) - - # Notify context engine of session start - if hasattr(self, "context_compressor") and self.context_compressor: - try: - self.context_compressor.on_session_start( - self.session_id, - hermes_home=str(get_hermes_home()), - platform=self.platform or "cli", - model=self.model, - context_length=getattr(self.context_compressor, "context_length", 0), - ) - except Exception as _ce_err: - logger.debug("Context engine on_session_start: %s", _ce_err) - - self._subdirectory_hints = SubdirectoryHintTracker( - working_dir=os.getenv("TERMINAL_CWD") or None, - ) - self._user_turn_count = 0 - - # Cumulative token usage for the session - self.session_prompt_tokens = 0 - self.session_completion_tokens = 0 - self.session_total_tokens = 0 - self.session_api_calls = 0 - self.session_input_tokens = 0 - self.session_output_tokens = 0 - self.session_cache_read_tokens = 0 - self.session_cache_write_tokens = 0 - self.session_reasoning_tokens = 0 - self.session_estimated_cost_usd = 0.0 - self.session_cost_status = "unknown" - self.session_cost_source = "none" - - # ── Ollama num_ctx injection ── - # Ollama defaults to 2048 context regardless of the model's capabilities. - # When running against an Ollama server, detect the model's max context - # and pass num_ctx on every chat request so the full window is used. - # User override: set model.ollama_num_ctx in config.yaml to cap VRAM use. - # If model.context_length is set, it caps num_ctx so the user's VRAM - # budget is respected even when GGUF metadata advertises a larger window. - self._ollama_num_ctx: int | None = None - _ollama_num_ctx_override = None - if isinstance(_model_cfg, dict): - _ollama_num_ctx_override = _model_cfg.get("ollama_num_ctx") - if _ollama_num_ctx_override is not None: - try: - self._ollama_num_ctx = int(_ollama_num_ctx_override) - except (TypeError, ValueError): - logger.debug("Invalid ollama_num_ctx config value: %r", _ollama_num_ctx_override) - if self._ollama_num_ctx is None and self.base_url and is_local_endpoint(self.base_url): - try: - _detected = query_ollama_num_ctx(self.model, self.base_url, api_key=self.api_key or "") - if _detected and _detected > 0: - self._ollama_num_ctx = _detected - except Exception as exc: - logger.debug("Ollama num_ctx detection failed: %s", exc) - # Cap auto-detected ollama_num_ctx to the user's explicit context_length. - # Without this, GGUF metadata can advertise 256K+ which Ollama honours - # by allocating that much VRAM — blowing up small GPUs even though the - # user explicitly set a smaller context_length in config.yaml. - if ( - self._ollama_num_ctx - and _config_context_length - and _ollama_num_ctx_override is None # don't override explicit ollama_num_ctx - and self._ollama_num_ctx > _config_context_length - ): - logger.info( - "Ollama num_ctx capped: %d -> %d (model.context_length override)", - self._ollama_num_ctx, _config_context_length, - ) - self._ollama_num_ctx = _config_context_length - if self._ollama_num_ctx and not self.quiet_mode: - logger.info( - "Ollama num_ctx: will request %d tokens (model max from /api/show)", - self._ollama_num_ctx, - ) - - if not self.quiet_mode: - if compression_enabled: - print(f"📊 Context limit: {self.context_compressor.context_length:,} tokens (compress at {int(compression_threshold*100)}% = {self.context_compressor.threshold_tokens:,})") - else: - print(f"📊 Context limit: {self.context_compressor.context_length:,} tokens (auto-compression disabled)") - - # Check immediately so CLI users see the warning at startup. - # Gateway status_callback is not yet wired, so any warning is stored - # in _compression_warning and replayed in the first run_conversation(). - self._compression_warning = None - self._check_compression_model_feasibility() - - # Snapshot primary runtime for per-turn restoration. When fallback - # activates during a turn, the next turn restores these values so the - # preferred model gets a fresh attempt each time. Uses a single dict - # so new state fields are easy to add without N individual attributes. - _cc = self.context_compressor - self._primary_runtime = { - "model": self.model, - "provider": self.provider, - "base_url": self.base_url, - "api_mode": self.api_mode, - "api_key": getattr(self, "api_key", ""), - "client_kwargs": dict(self._client_kwargs), - "use_prompt_caching": self._use_prompt_caching, - "use_native_cache_layout": self._use_native_cache_layout, - # Context engine state that _try_activate_fallback() overwrites. - # Use getattr for model/base_url/api_key/provider since plugin - # engines may not have these (they're ContextCompressor-specific). - "compressor_model": getattr(_cc, "model", self.model), - "compressor_base_url": getattr(_cc, "base_url", self.base_url), - "compressor_api_key": getattr(_cc, "api_key", ""), - "compressor_provider": getattr(_cc, "provider", self.provider), - "compressor_context_length": _cc.context_length, - "compressor_threshold_tokens": _cc.threshold_tokens, - } - if self.api_mode == "anthropic_messages": - self._primary_runtime.update({ - "anthropic_api_key": self._anthropic_api_key, - "anthropic_base_url": self._anthropic_base_url, - "is_anthropic_oauth": self._is_anthropic_oauth, - }) + """Forwarder — see ``agent.agent_init.init_agent``.""" + from agent.agent_init import init_agent + init_agent(self, base_url, api_key, provider, api_mode, acp_command, acp_args, command, args, model, max_iterations, tool_delay, enabled_toolsets, disabled_toolsets, save_trajectories, verbose_logging, quiet_mode, ephemeral_system_prompt, log_prefix_chars, log_prefix, providers_allowed, providers_ignored, providers_order, provider_sort, provider_require_parameters, provider_data_collection, openrouter_min_coding_score, session_id, tool_progress_callback, tool_start_callback, tool_complete_callback, thinking_callback, reasoning_callback, clarify_callback, step_callback, stream_delta_callback, interim_assistant_callback, tool_gen_callback, status_callback, max_tokens, reasoning_config, service_tier, request_overrides, prefill_messages, platform, user_id, user_name, chat_id, chat_name, chat_type, thread_id, gateway_session_key, skip_context_files, load_soul_identity, skip_memory, session_db, parent_session_id, iteration_budget, fallback_model, credential_pool, checkpoints_enabled, checkpoint_max_snapshots, checkpoint_max_total_size_mb, checkpoint_max_file_size_mb, pass_session_id) def _get_session_db_for_recall(self): """Return a SessionDB for recall, lazily creating it if an entrypoint forgot.