hermes-agent/agent/agent_init.py
Siddharth Balyan fcb1944b4f
Some checks are pending
Deploy Site / deploy-vercel (push) Waiting to run
Deploy Site / deploy-docs (push) Waiting to run
Docker Build and Publish / build-amd64 (push) Waiting to run
Docker Build and Publish / build-arm64 (push) Waiting to run
Docker Build and Publish / merge (push) Blocked by required conditions
Lint (ruff + ty) / ruff + ty diff (push) Waiting to run
Lint (ruff + ty) / ruff enforcement (blocking) (push) Waiting to run
Lint (ruff + ty) / Windows footguns (blocking) (push) Waiting to run
Nix Lockfile Fix / auto-fix-main (push) Waiting to run
Nix Lockfile Fix / fix (push) Waiting to run
Nix / nix (macos-latest) (push) Waiting to run
Nix / nix (ubuntu-latest) (push) Waiting to run
OSV-Scanner / Scan lockfiles (push) Waiting to run
Tests / test (1) (push) Waiting to run
Tests / test (2) (push) Waiting to run
Tests / test (3) (push) Waiting to run
Tests / test (4) (push) Waiting to run
Tests / test (5) (push) Waiting to run
Tests / test (6) (push) Waiting to run
Tests / save-durations (push) Blocked by required conditions
Tests / e2e (push) Waiting to run
uv.lock check / uv lock --check (push) Waiting to run
feat(credits): usage-aware credits — in-session notices, /usage view, dev readout (#40011)
* feat(tui): HERMES_DEV_CREDITS live-spend dev readout (L0 tracer for usage-aware credits)

L0 of the usage-aware-credits feature: a dev-only, env-gated tracer that
exercises the real header -> CreditsState -> TUI pipe end-to-end behind
HERMES_DEV_CREDITS, de-risking the L1/L5 build before the notice policy exists.

- agent/credits_tracker.py: CreditsState + parse_credits_headers (headers are
  strings -> paid_access via == "true", never bool(); retain-last-known; only
  subscription_micros may be negative; *_usd kept verbatim).
- run_agent.py: _capture_credits / get_credits_state / get_credits_spent_micros,
  session-start baseline latch, + dev-gated "credits" capture log.
- agent/chat_completion_helpers.py: capture on the streaming response.
- agent/agent_init.py: init _credits_state + _credits_session_start_micros.
- tui_gateway/server.py: _get_usage emits dev_credits_spent_micros only when flagged.
- ui-tui appChrome.tsx / types.ts: cents delta status segment + "(dev credits)" banner.

Off by default; silent for normal users. Validated live against staging
(capture log delta matches the TUI segment). Throwaway consumer (readout/log/
banner); credits_tracker + the capture plumbing are the real feature foundation.

* test(credits): lock parser under 9-state matrix + harden validation (L2)

Add tests/agent/test_credits_tracker.py with 92 tests covering the 9-state
matrix (healthy, sub_90pct, grant_exhausted, purchased_only, tool_pool_free,
depleted, debt, missing, no_org) plus validation edge cases: version strict==1
with warn-once latch for v>1, bool-string trap (paid_access/tool_pool_gated_off
== "true"/"false", never bool()), half-pair subscription limit treated as
both-absent while parse succeeds, USD regex ^-?\d+\.\d{2}$, non-int micros
→ None, negative non-subscription micros → None, as_of_ms junk → None, zero
limit ZeroDivision guard.

Harden agent/credits_tracker.py to match the spec:
- Add tool_pool_micros/tool_pool_gated_off/from_header fields to CreditsState
- Add depleted property (== not paid_access, never remaining==0)
- Change used_fraction guard to key off subscription_limit_micros (the actual
  denominator) not denominator_kind (metadata)
- Replace fail-soft _safe_int with a sentinel-returning variant; full validation
  now returns None on any malformed field rather than silently defaulting
- Add module-level warn-once latch for version > 1
- Add USD regex validation; add denominator_kind allow-list check
- Parse x-nous-tool-pool-* prefix headers (not x-nous-credits-tool-pool-*)

* feat(credits): notice spine — AgentNotice + notice_callback/notice_clear_callback + TUI binding (L1)

L1 of usage-aware credits: the driver-agnostic notice delivery spine that L4's
policy will fire through and L5's TUI render will consume.

- agent/credits_tracker.py: AgentNotice dataclass (text/level/kind/ttl_ms/key/id;
  kind defaults "sticky", kept TTL-expressive for a future config seam).
- run_agent.py: AIAgent gains notice_callback + notice_clear_callback slots and
  _emit_notice / _emit_notice_clear emitters (swallow all callback errors — a
  notice must never break the agent loop; no-op when unbound).
- agent/agent_init.py: thread both callbacks through init_agent.
- tui_gateway/server.py: bind both in _agent_cbs → notification.show / notification.clear
  WS events (snake_case payload, matching the existing gateway-event convention).
- ui-tui/src/gatewayTypes.ts: notification.show / notification.clear arms on GatewayEvent.
- tests/run_agent/test_notice_spine.py: 15 tests (emitter fire + fail-open + no-op,
  signature threading, TUI binding payload shape).

Messaging push is out of v1 (binds neither callback). CLI binding + the TUI render/
decode land with L4 (firing) and L5 (render) so turn-end flush is wired correctly.

* feat(credits): threshold reconciliation policy + tests (L4.1)

* feat(credits): wire threshold policy into capture + latch (L4.2)

After a fresh header parse, _capture_credits runs evaluate_credits_notices against
the agent's _credits_latch and emits the result — clears first, then shows (so a
recovered depletion clears before the "restored" success lands, and depleted wins
the latest-wins slot). Gated on a bound notice_callback: messaging (no callbacks)
still caches state for /usage but runs no policy. Parse stays fail-open (miss →
keep last-known); the eval/emit path warns on failure rather than swallowing, so a
depletion-notice bug can't vanish silently.

- run_agent.py: _capture_credits split into parse (swallow→miss) + policy (warn);
  latch lazy-guarded (object.__new__ safety).
- agent/agent_init.py: init agent._credits_latch = {"active": set(), "seen_below_90": False}.

* feat(tui): render credits notices in the status bar (L5, Strategy B)

The TUI now renders the notification.show / notification.clear gateway events the
agent emits — a level-colored notice overrides the status/verb slot when not busy.

- Notice state machine on turnController (pendingNotice + dedicated noticeTimer +
  show/clear/applyNotice/flushPendingNotice/clearNoticeState). createGatewayEventHandler
  decodes the events and delegates.
- Render priority busy > notice > status (appChrome StatusRule); notice text rendered
  verbatim (its glyph comes from the policy), shrinkable so it never clips model│ctx;
  dev-credits banner + Δ segment preserved. UiState.notice is snake_case (matches wire).
- Busy-wins: a notice arriving mid-turn is held and flushed at the THREE turn-end sites
  (recordMessageComplete / interruptTurn / recordError) — never idle(), which reset()
  also calls (would leak across sessions); reset() clears instead.
- Dedicated noticeTimer (never statusTimer); TTL starts on visibility with an id-guard;
  latest-wins cancels the prior timer; clear is key-matched (no-op on mismatch); a sticky
  survives a turn (flush no-ops with no pending); session reset clears (no cross-session leak).
- 20 tests (handler/turnController logic incl. R3-C2 timer isolation + render priority).

* feat(credits): cold-start seed for new Nous sessions (L3)

A genuinely-new Nous session has no inference header yet, so seed credits state from
the authoritative GET /api/oauth/account snapshot at session start (in the new-session
branch of _restore_or_build_system_prompt — inline, since the on_session_start plugin
hook gets no agent reference). The seed runs the shared notice policy, so a session that
opens already depleted warns IMMEDIATELY rather than only after the first turn.

- Maps the nested account fields (paid_service_access → paid_access; total_usable /
  subscription / purchased on paid_service_access_info; rollover on subscription), each
  None-guarded; float dollars → micros via round(d*1e6), *_usd left "" (render formats
  from micros — never synthesize a verbatim usd from a float).
- Magnitudes-only: no monthlyCredits on the endpoint → subscription_limit_* unset →
  used_fraction None → no warn90 from the seed (% only once a header lands, per D-E).
- Provider-guarded to Nous; fail-open (any error leaves _credits_state None, never
  blocks startup); paid_access unknown ⇒ True (never falsely depleted).
- run_agent.py: extracted the warm-path policy/emit block into a shared
  _emit_credits_notices() so capture and the seed fire notices identically.

* feat(credits): /usage Nous credits magnitudes view + recovery trigger (L6)

Add Nous credit dollar magnitudes to /usage (subscription / top-up / total
+ rollover + renewal + portal CTA), magnitudes-only per v1 (no % until the
account endpoint exposes a denominator). Reuses the existing account-usage
render machinery via a new pure build_nous_credits_snapshot() that maps a
NousPortalAccountInfo to an AccountUsageSnapshot; no nous branch is added to
fetch_account_usage (keeps the per-provider boundary intact).

CLI /usage also doubles as a depletion-recovery trigger: a force_fresh
account fetch, kept in a SEPARATE local so it never clobbers the
header-sourced agent._credits_state (which alone carries used_fraction). If
paid access recovered while credits.depleted is latched and a notice
consumer is bound, it reuses agent._emit_credits_notices() to clear it.
Gateway /usage displays magnitudes only — messaging binds no notice
consumer, so it performs no recovery emit.

Fail-open throughout: any portal hiccup leaves /usage unaffected.

* refactor(credits): dedupe HERMES_DEV_CREDITS flag parse via shared helpers

The dev-flag truthy check was inlined in three places. Replace with the shared
utils.is_truthy_value (run_agent.py, tui_gateway/server.py — also drops a
redundant inline `import os`) and a hoisted DEV_CREDITS_MODE export in
ui-tui/src/config/env.ts (consumed by appChrome, which also stops recomputing the
env check on every render). Behaviour-preserving; identical truthy set.

* fix(credits): cut dead /usage recovery trigger + bound portal fetches (L6 review)

Adversarial review found the /usage depletion-recovery trigger dead AND broken:
the CLI binds no notice_clear_callback, the TUI runs /usage in a separate
slash-worker subprocess (its own agent/latch), and the no-clobber rule made it
evaluate stale paid_access anyway. Recovery already happens on the next inference
(warm path), so the trigger was redundant — remove it and stop the depleted
notice over-promising.

- cli.py: remove the dead recovery block; bound the /usage portal fetch with a
  10s wall-clock timeout (ThreadPoolExecutor) like the per-provider fetch —
  urllib's per-socket timeout is not a wall-clock guarantee.
- agent/credits_tracker.py: reword the depleted CTA to "run /usage for balance"
  (no false recovery promise; /usage shows fresh magnitudes, sticky clears next turn).
- agent/conversation_loop.py: same wall-clock timeout on the cold-start seed fetch
  so a stalled portal can't hang session startup; tidy its time import.

* chore(credits): dev notice-state fixtures (HERMES_DEV_CREDITS_FIXTURE)

Throwaway dev scaffolding to exercise the notice pipeline without real spend or
Redis seeding. Set HERMES_DEV_CREDITS_FIXTURE to a state name (healthy / sub_90pct
/ grant_exhausted / depleted / clear) or a file path whose contents name a state
(re-read each turn → flip states live for recovery testing). _capture_credits
injects the chosen CreditsState instead of parsing real headers and runs the
shared notice policy. Deletable with the rest of the HERMES_DEV_CREDITS scaffolding.

* feat(credits): /usage monthly-grant % gauge

The portal /api/oauth/account subscription block now carries monthly_credits
(the per-period grant allowance, the % denominator). The consumer parsed
monthly_charge but dropped monthly_credits, so /usage stayed magnitudes-only.

Capture monthly_credits into NousPortalSubscriptionInfo + _subscription_from_payload.
build_nous_credits_snapshot emits a Subscription usage window (real % used, routed
through the existing render machinery) when monthly_credits is a finite positive
denominator and credits_remaining is finite and <= cap; otherwise it degrades to
magnitudes-only (older portals, rollover-over-cap, or non-finite payloads).

Guards (adversarial-review-driven): reject non-finite operands (json.loads parses
bare NaN/Infinity by default → would render $nan + a false 100% used), reject
bools, guard div-by-zero (cap>0), and suppress the gauge when remaining > cap
(rollover spanning the period makes the cap a nonsensical denominator → the
$X-of-$Y detail would read as a contradiction). Debt (remaining<0) clamps to 100%.

Money rule preserved: the ratio + magnitudes are computed from numeric float
account fields via display formatting, never by parsing a server *_usd string
(there are none on these dataclasses).

13 gauge tests added (tests/agent/test_nous_credits_gauge.py).

* fix(credits): show /usage Nous block whenever a Nous account is present

/usage runs in a slash-worker subprocess whose resolved inference provider is
often not "nous" even when the user has a Nous account, so gating the Nous
credits block on (provider == "nous") hid it entirely — the account data was
fully available but never rendered.

Gate instead on "a Nous account is logged in": a cheap local auth-state lookup
(get_provider_auth_state('nous') has an access_token) decides whether to attempt
the portal fetch, regardless of which provider inference runs on. In the gateway
the block is also lifted out of the 'if provider:' scope so a Nous-credentialled
user with another (or no) resident inference provider still sees their balance.
Fail-open and the per-fetch wall-clock timeout are preserved.

* fix(credits): show /usage Nous block when there's no live agent (TUI slash-worker)

In the TUI, /usage runs in a slash-worker subprocess that resumes the session
WITHOUT building an agent (self.agent is None), so _show_usage early-returned
"(._.) No active agent" before ever reaching the Nous credits block — which is
agent-independent (a portal fetch gated on Nous auth-state). Extract the block
into _print_nous_credits_block() and run it at the no-agent / no-calls
early-returns too (returns True if it printed, so the fallback message only
shows when there's genuinely nothing).

Verified live against staging: the block + monthly-grant gauge now render in the
slash-worker /usage path (previously hidden). The plain CLI REPL + messaging
paths are unchanged (they have a live agent).

* feat(credits): escalating 50/75/90 usage bands (single status line)

Replace the lone 90%-used warning with three escalating bands (50 info, 75 warn,
90 warn) shown as ONE status-bar line: it displays the highest band the
subscription grant has crossed, replaces the line as usage climbs, steps back
down on recovery, and clears below 50%. No stacking, no per-turn churn.

Bands live in a tunable CREDITS_USAGE_BANDS list; the policy derives everything
from it. Single notice key (credits.usage) with a usage_band latch field so the
notice only re-emits when the band actually changes. The crossing gate
(seen_below_90) is preserved so a fresh live session that opens mid-range stays
quiet until it has been observed below the lowest band (cold-start primes it when
it wants an open-high warning). Denominator math unchanged: % = subscription
grant burn (cap - grant_remaining)/cap, clamped [0,1]; top-up never moves the %.

Migrated test_credits_policy.py to the new key + added TestUsageBands (climb,
step-down, recovery-clear, idempotent, inclusive boundaries).

* feat(credits): hydrate notices at session OPEN via shared seed (TUI + first-turn)

Notices previously only fired inside a conversation turn (first message), so a
session that opened already depleted / past a usage band showed nothing at
'ready'. Extract the cold-start seed into a shared seed_credits_at_session_start()
and call it (a) in the TUI/desktop agent build right after the notice callback is
wired (fires at 'ready', before any message) and (b) as the first-turn fallback in
conversation_loop. Idempotent (skips once _credits_state exists) and fail-open.

The seed now maps monthly_credits -> subscription_limit_micros +
denominator_kind='subscription_cap', so used_fraction is computable at seed time
and usage-band warnings (not just depletion) hydrate on open. Primes the crossing
latch so a session opening already in a band warns immediately. Degrades to
depletion-only when monthly_credits is absent (older portals).

Adds test_credits_cold_start.py covering open-at-band, depletion, debt, no-cap
degradation, and the shared seed (fires/idempotent/skips-non-nous).

* feat(credits): /usage monthly-grant % gauge + fixture support + TUI surfacing

agent/account_usage.py: build_nous_credits_snapshot emits a subscription %% gauge
when the portal supplies a positive, finite monthly_credits denominator with
remaining <= cap (guards reject NaN/Infinity and rollover-over-cap, which would
render $nan or a contradictory $X-of-$Y); degrades to magnitudes-only otherwise.
Adds shared nous_credits_lines() (auth-gated, wall-clock-bounded portal fetch) so
the CLI and TUI /usage render the same block, and _snapshot_from_credits_state()
so HERMES_DEV_CREDITS_FIXTURE drives /usage offline too.

TUI: session.usage RPC carries credits_lines (agent-independent) and the /usage
panel renders them regardless of API-call count or resume state — previously the
TUI's separate /usage implementation only showed token counts.

Money rule preserved: %% and magnitudes come from numeric float account fields via
display formatting, never by parsing a server *_usd string.

* feat(credits): CLI REPL inline notices (parity with TUI)

The plain CLI agent bound no notice callbacks, so credit notices were TUI-only.
Bind notice_callback/notice_clear_callback on the CLI AIAgent; _on_notice renders
a single level-colored line above the prompt (error red / warn yellow / success
green / info dim) via _cprint, and seed credits at session open so a depletion or
usage-band warning shows before the first message — the same hydration the TUI
got. _on_notice_clear is a no-op (the REPL prints lines, no persistent slot).

* test(credits): add sub_50pct + sub_75pct dev fixtures for the new usage bands

The fixture set jumped 10%% -> 90%%; add sub_50pct (uf 0.5 -> band 50 info) and
sub_75pct (uf 0.75 -> band 75 warn) so the new escalating bands are exercisable
via HERMES_DEV_CREDITS_FIXTURE across all three surfaces (notice, session-open
seed, /usage gauge).

* fix(credits): usage-band notice clears on next prompt (not sticky-forever)

A 50/75/90 usage heads-up was sticky and camped the status bar indefinitely. Clear
the visible credits.usage notice when a new turn starts (startMessage), so it shows
until your next prompt then yields. The server latch is unchanged, so it won't
re-nag at the same band — it only re-shows when the band actually changes (climb)
or clears when usage drops below the lowest band. Depletion stays sticky.

* refactor(credits): consolidate the /usage credits block behind nous_credits_lines()

The CLI (_print_nous_credits_block) and the messaging gateway (_handle_usage_command)
each re-implemented the auth-gate + portal fetch + render, and both bypassed the
dev-fixture short-circuit that only the TUI honored — so /usage ignored
HERMES_DEV_CREDITS_FIXTURE on the CLI and in chat. Route both through the shared
agent.account_usage.nous_credits_lines() helper: one fetch/render path, one auth
gate, and the fixture works on every surface (~60 fewer duplicated lines).

The gateway usage test recorded only the last asyncio.to_thread call; /usage now
dispatches both the account fetch and the credits fetch, so it records every call
and matches the account fetch by its provider arg.

* fix(credits): keep the /usage gauge type-safe and log its fail-open path

_is_finite_num is now a TypeGuard[float], so the type checker narrows the gauge
operands (monthly_credits / credits_remaining) and the magnitudes passed to
_fmt_usd through it — no more None-operand warnings on the arithmetic. Add a debug
breadcrumb on the nous_credits_lines portal-fetch fail-open so a dead /usage block
is diagnosable in agent.log without a dev flag.

* fix(credits): harden the header tracker — prod-leak gate, hot-path probe, fire-and-forget seed

- Prod-leak guard: dev fixtures (HERMES_DEV_CREDITS_FIXTURE) now also require
  HERMES_DEV_CREDITS, so a stray fixture var can't surface fabricated balances on a
  real account. Matches the documented run workflow (both vars set together).
- Hot-path probe: parse_credits_headers checks for the version sentinel header
  before allocating a lowercased copy of the response headers — skips that work on
  every non-Nous API call. Behaviour-identical and still case-insensitive.
- Fire-and-forget seed: the real portal fetch in seed_credits_at_session_start now
  runs in a daemon thread, so a slow/unreachable portal never delays session "ready"
  (previously blocked up to 10s). The dev-fixture path stays synchronous; the thread
  re-checks idempotency before hydrating (a live header may land first).
- Diagnostics: debug breadcrumbs on the parse and seed fail-open paths so a crashed
  parser / dead seed is distinguishable from a legitimate no-headers miss.

Cold-start tests set HERMES_DEV_CREDITS alongside the fixture to match the gate.

* test(tui): fix env-timing in the StatusRule dev-credits assertion

DEV_CREDITS_MODE is read once at module load (config/env), so mutating
process.env.HERMES_DEV_CREDITS inside the test couldn't flip it — the dev-banner
assertion only passed if the env was exported before vitest started, and failed in a
normal run. Move that assertion to a sibling file that mocks config/env with
DEV_CREDITS_MODE: true (scoped, no module-reset / React-identity hazard).

* test(credits): cover the dev-fixture /usage render and usage-band clear-on-prompt

- _snapshot_from_credits_state (the offline /usage renderer) had no direct test:
  lock the gauge math, the verbatim *_usd magnitudes, the depletion line and the
  fixture marker, plus the no-cap (no gauge) and None-state cases.
- turnController.startMessage had no test for clearing the credits.usage notice on
  the next prompt while leaving credits.depleted sticky.

* feat(credits): deliver credit notices over messaging gateways

Bind notice_callback/notice_clear_callback on the per-turn gateway agent
so usage-band / depletion / restored notices reach Telegram/Discord/Slack/
etc. Previously the messaging gateway bound neither callback, so the agent's
_emit_credits_notices early-returned and a chat user crossing a band got
nothing unless they ran /usage manually.

- render_notice_line(): AgentNotice -> single plaintext line (level glyph +
  text), plaintext-only so it renders uniformly without per-platform escaping.
  Fail-soft on malformed/empty notices.
- Standalone push for every notice (messaging has no persistent status bar):
  route through the shared _deliver_platform_notice rail (honors private/
  public delivery + thread metadata), scheduled onto the gateway loop via
  safe_schedule_threadsafe from the agent's sync worker thread — same pattern
  as _status_callback_sync.
- The fired-once latch lives on the cached (reused-in-place) agent and
  persists across turns, so a band crosses once -> one push, no per-turn
  re-nag. Re-fires only after idle-eviction rebuilds the agent (a reminder).
- Recovery ('Credit access restored') rides the show path (emitted as a
  success notice, not a clear). notice_clear_callback is a no-op: a sent
  platform message can't be cleanly retracted.

Tests: render glyph/levels/fail-soft + public/private delivery seam through
_deliver_platform_notice + no-adapter no-op.

* fix(credits): don't double the glyph on messaging notices

render_notice_line prepended a per-level glyph, but the notice policy already
bakes the glyph into the text (and the TUI + CLI render it verbatim) — so every
credit notice over messaging came out doubled ("⚠ ⚠ Credits 90% used",
" ✕ Credit access paused"). Emit the text verbatim instead; drop the now-dead
level→glyph map.

The render tests fed glyph-less text (and the success case only checked
startswith), so the doubling slipped through. Rework them around the verbatim
contract and add an end-to-end regression that runs real evaluate_credits_notices
output through render_notice_line and asserts the line is returned unchanged.
2026-06-06 13:18:18 +05:30

1670 lines
82 KiB
Python

"""Implementation of :meth:`AIAgent.__init__` — extracted as a module function.
``AIAgent.__init__`` is one of the longest methods in the codebase (60+
parameters, ~1,400 lines of attribute initialization, provider
auto-detection, credential resolution, context-engine bootstrap, etc.).
Keeping it in ``run_agent.py`` bloats that file with code that's mostly
"setup state, then forget".
After this extraction the body lives here as ``init_agent(agent, ...)``
and :meth:`AIAgent.__init__` is a thin wrapper that calls
``init_agent(self, ...)``. All imports the body needs at module-load
time are listed below; the body also performs many lazy imports inside
its own scope that come along unchanged.
Symbols that tests patch on ``run_agent.*`` (``OpenAI``, ``cleanup_vm``,
etc.) are resolved through :func:`_ra` so the patch contract is
preserved.
"""
from __future__ import annotations
import logging
import os
import re
import sys
import threading
import time
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse, parse_qs, urlunparse
from agent.context_compressor import ContextCompressor
from agent.iteration_budget import IterationBudget
from agent.memory_manager import StreamingContextScrubber
from agent.model_metadata import (
MINIMUM_CONTEXT_LENGTH,
fetch_model_metadata,
is_local_endpoint,
query_ollama_num_ctx,
)
from agent.process_bootstrap import _install_safe_stdio
from agent.subdirectory_hints import SubdirectoryHintTracker
from agent.think_scrubber import StreamingThinkScrubber
from agent.tool_guardrails import (
ToolCallGuardrailConfig,
ToolCallGuardrailController,
ToolGuardrailDecision,
)
from hermes_cli.config import cfg_get
from hermes_cli.timeouts import get_provider_request_timeout
from hermes_constants import get_hermes_home
from utils import base_url_host_matches
# Use the same logger name as run_agent so tests patching ``run_agent.logger``
# capture our warnings. (run_agent.py also does
# ``logger = logging.getLogger(__name__)``, which resolves to "run_agent"
# from inside that module.)
logger = logging.getLogger("run_agent")
def _ra():
"""Lazy reference to ``run_agent`` so callers can patch
``run_agent.OpenAI`` / ``run_agent.cleanup_vm`` / ... and have those
patches reach this code path.
"""
import run_agent
return run_agent
def _normalized_custom_base_url(value: Any) -> str:
if not isinstance(value, str):
return ""
return value.strip().rstrip("/")
def _custom_provider_model_matches(agent_model: str, entry: Dict[str, Any]) -> bool:
provider_model = str(entry.get("model", "") or "").strip().lower()
if not provider_model:
return True
return provider_model == str(agent_model or "").strip().lower()
def _custom_provider_extra_body_for_agent(
*,
provider: str,
model: str,
base_url: str,
custom_providers: List[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
if (provider or "").strip().lower() != "custom":
return None
target_url = _normalized_custom_base_url(base_url)
if not target_url:
return None
fallback: Optional[Dict[str, Any]] = None
for entry in custom_providers or []:
if not isinstance(entry, dict):
continue
if _normalized_custom_base_url(entry.get("base_url")) != target_url:
continue
extra_body = entry.get("extra_body")
if not isinstance(extra_body, dict) or not extra_body:
continue
provider_model = str(entry.get("model", "") or "").strip()
if provider_model:
if _custom_provider_model_matches(model, entry):
return dict(extra_body)
elif fallback is None:
fallback = dict(extra_body)
return fallback
def _merge_custom_provider_extra_body(agent, custom_providers: List[Dict[str, Any]]) -> None:
extra_body = _custom_provider_extra_body_for_agent(
provider=agent.provider,
model=agent.model,
base_url=agent.base_url,
custom_providers=custom_providers,
)
if not extra_body:
return
overrides = dict(getattr(agent, "request_overrides", {}) or {})
merged_extra_body = dict(extra_body)
existing_extra_body = overrides.get("extra_body")
if isinstance(existing_extra_body, dict):
merged_extra_body.update(existing_extra_body)
overrides["extra_body"] = merged_extra_body
agent.request_overrides = overrides
def init_agent(
agent,
base_url: str = None,
api_key: str = None,
provider: str = None,
api_mode: str = None,
acp_command: str = None,
acp_args: list[str] | None = None,
command: str = None,
args: list[str] | None = None,
model: str = "",
max_iterations: int = 90, # Default tool-calling iterations (shared with subagents)
tool_delay: float = 1.0,
enabled_toolsets: List[str] = None,
disabled_toolsets: List[str] = None,
save_trajectories: bool = False,
verbose_logging: bool = False,
quiet_mode: bool = False,
ephemeral_system_prompt: str = None,
log_prefix_chars: int = 100,
log_prefix: str = "",
providers_allowed: List[str] = None,
providers_ignored: List[str] = None,
providers_order: List[str] = None,
provider_sort: str = None,
provider_require_parameters: bool = False,
provider_data_collection: str = None,
openrouter_min_coding_score: Optional[float] = None,
session_id: str = None,
tool_progress_callback: callable = None,
tool_start_callback: callable = None,
tool_complete_callback: callable = None,
thinking_callback: callable = None,
reasoning_callback: callable = None,
clarify_callback: callable = None,
step_callback: callable = None,
stream_delta_callback: callable = None,
interim_assistant_callback: callable = None,
tool_gen_callback: callable = None,
status_callback: callable = None,
notice_callback: callable = None,
notice_clear_callback: callable = None,
max_tokens: int = None,
reasoning_config: Dict[str, Any] = None,
service_tier: str = None,
request_overrides: Dict[str, Any] = None,
prefill_messages: List[Dict[str, Any]] = None,
platform: str = None,
user_id: str = None,
user_id_alt: str = None,
user_name: str = None,
chat_id: str = None,
chat_name: str = None,
chat_type: str = None,
thread_id: str = None,
gateway_session_key: str = None,
skip_context_files: bool = False,
load_soul_identity: bool = False,
skip_memory: bool = False,
session_db=None,
parent_session_id: str = None,
iteration_budget: "IterationBudget" = None,
fallback_model: Dict[str, Any] = None,
credential_pool=None,
checkpoints_enabled: bool = False,
checkpoint_max_snapshots: int = 20,
checkpoint_max_total_size_mb: int = 500,
checkpoint_max_file_size_mb: int = 10,
pass_session_id: bool = False,
):
"""
Initialize the AI Agent.
Args:
base_url (str): Base URL for the model API (optional)
api_key (str): API key for authentication (optional, uses env var if not provided)
provider (str): Provider identifier (optional; used for telemetry/routing hints)
api_mode (str): API mode override: "chat_completions" or "codex_responses"
model (str): Model name to use (default: "anthropic/claude-opus-4.6")
max_iterations (int): Maximum number of tool calling iterations (default: 90)
tool_delay (float): Delay between tool calls in seconds (default: 1.0)
enabled_toolsets (List[str]): Only enable tools from these toolsets (optional)
disabled_toolsets (List[str]): Disable tools from these toolsets (optional)
save_trajectories (bool): Whether to save conversation trajectories to JSONL files (default: False)
verbose_logging (bool): Enable verbose logging for debugging (default: False)
quiet_mode (bool): Suppress progress output for clean CLI experience (default: False)
ephemeral_system_prompt (str): System prompt used during agent execution but NOT saved to trajectories (optional)
log_prefix_chars (int): Number of characters to show in log previews for tool calls/responses (default: 100)
log_prefix (str): Prefix to add to all log messages for identification in parallel processing (default: "")
providers_allowed (List[str]): OpenRouter providers to allow (optional)
providers_ignored (List[str]): OpenRouter providers to ignore (optional)
providers_order (List[str]): OpenRouter providers to try in order (optional)
provider_sort (str): Sort providers by price/throughput/latency (optional)
openrouter_min_coding_score (float): Coding-score floor (0.0-1.0) for the
openrouter/pareto-code router. Only applied when model == "openrouter/pareto-code".
None or empty = let OpenRouter pick the strongest available coder.
session_id (str): Pre-generated session ID for logging (optional, auto-generated if not provided)
tool_progress_callback (callable): Callback function(tool_name, args_preview) for progress notifications
clarify_callback (callable): Callback function(question, choices) -> str for interactive user questions.
Provided by the platform layer (CLI or gateway). If None, the clarify tool returns an error.
max_tokens (int): Maximum tokens for model responses (optional, uses model default if not set)
reasoning_config (Dict): OpenRouter reasoning configuration override (e.g. {"effort": "none"} to disable thinking).
If None, defaults to {"enabled": True, "effort": "medium"} for OpenRouter. Set to disable/customize reasoning.
prefill_messages (List[Dict]): Messages to prepend to conversation history as prefilled context.
Useful for injecting a few-shot example or priming the model's response style.
Example: [{"role": "user", "content": "Hi!"}, {"role": "assistant", "content": "Hello!"}]
NOTE: Anthropic Sonnet 4.6+ and Opus 4.6+ reject a conversation that ends on an
assistant-role message (400 error). For those models use structured outputs or
output_config.format instead of a trailing-assistant prefill.
platform (str): The interface platform the user is on (e.g. "cli", "telegram", "discord", "whatsapp").
Used to inject platform-specific formatting hints into the system prompt.
skip_context_files (bool): If True, skip auto-injection of SOUL.md, AGENTS.md, and .cursorrules
into the system prompt. Use this for batch processing and data generation to avoid
polluting trajectories with user-specific persona or project instructions.
load_soul_identity (bool): If True, still use ~/.hermes/SOUL.md as the primary
identity even when skip_context_files=True. Project context files from the cwd
remain skipped.
"""
_install_safe_stdio()
agent.model = model
agent.max_iterations = max_iterations
# Shared iteration budget — parent creates, children inherit.
# Consumed by every LLM turn across parent + all subagents.
agent.iteration_budget = iteration_budget or IterationBudget(max_iterations)
agent.tool_delay = tool_delay
agent.save_trajectories = save_trajectories
agent.verbose_logging = verbose_logging
agent.quiet_mode = quiet_mode
agent.ephemeral_system_prompt = ephemeral_system_prompt
agent.platform = platform # "cli", "telegram", "discord", "whatsapp", etc.
agent._user_id = user_id # Platform user identifier (gateway sessions)
agent._user_id_alt = user_id_alt # Optional stable alternate platform identifier
agent._user_name = user_name
agent._chat_id = chat_id
agent._chat_name = chat_name
agent._chat_type = chat_type
agent._thread_id = thread_id
agent._gateway_session_key = gateway_session_key # Stable per-chat key (e.g. agent:main:telegram:dm:123)
# Pluggable print function — CLI replaces this with _cprint so that
# raw ANSI status lines are routed through prompt_toolkit's renderer
# instead of going directly to stdout where patch_stdout's StdoutProxy
# would mangle the escape sequences. None = use builtins.print.
agent._print_fn = None
agent.background_review_callback = None # Optional sync callback for gateway delivery
agent.skip_context_files = skip_context_files
agent.load_soul_identity = load_soul_identity
agent.pass_session_id = pass_session_id
agent._credential_pool = credential_pool
agent.log_prefix_chars = log_prefix_chars
agent.log_prefix = f"{log_prefix} " if log_prefix else ""
# Store effective base URL for feature detection (prompt caching, reasoning, etc.)
agent.base_url = base_url or ""
provider_name = provider.strip().lower() if isinstance(provider, str) and provider.strip() else None
agent.provider = provider_name or ""
agent.acp_command = acp_command or command
agent.acp_args = list(acp_args or args or [])
if api_mode in {"chat_completions", "codex_responses", "anthropic_messages", "bedrock_converse", "codex_app_server"}:
agent.api_mode = api_mode
elif agent.provider == "openai-codex":
agent.api_mode = "codex_responses"
elif agent.provider in {"xai", "xai-oauth"}:
agent.api_mode = "codex_responses"
elif (provider_name is None) and (
agent._base_url_hostname == "chatgpt.com"
and "/backend-api/codex" in agent._base_url_lower
):
agent.api_mode = "codex_responses"
agent.provider = "openai-codex"
elif (provider_name is None) and agent._base_url_hostname == "api.x.ai":
agent.api_mode = "codex_responses"
agent.provider = "xai"
elif agent.provider == "anthropic" or (provider_name is None and agent._base_url_hostname == "api.anthropic.com"):
agent.api_mode = "anthropic_messages"
agent.provider = "anthropic"
elif agent._base_url_lower.rstrip("/").endswith("/anthropic"):
# Third-party Anthropic-compatible endpoints (e.g. MiniMax, DashScope)
# use a URL convention ending in /anthropic. Auto-detect these so the
# Anthropic Messages API adapter is used instead of chat completions.
agent.api_mode = "anthropic_messages"
elif agent.provider == "bedrock" or (
agent._base_url_hostname.startswith("bedrock-runtime.")
and base_url_host_matches(agent._base_url_lower, "amazonaws.com")
):
# AWS Bedrock — auto-detect from provider name or base URL
# (bedrock-runtime.<region>.amazonaws.com).
agent.api_mode = "bedrock_converse"
else:
agent.api_mode = "chat_completions"
# Eagerly warm the transport cache so import errors surface at init,
# not mid-conversation. Also validates the api_mode is registered.
try:
agent._get_transport()
except Exception:
pass # Non-fatal — transport may not exist for all modes yet
try:
from hermes_cli.model_normalize import (
_AGGREGATOR_PROVIDERS,
normalize_model_for_provider,
)
if agent.provider not in _AGGREGATOR_PROVIDERS:
agent.model = normalize_model_for_provider(agent.model, agent.provider)
except Exception:
pass
# GPT-5.x models usually require the Responses API path, but some
# providers have exceptions (for example Copilot's gpt-5-mini still
# uses chat completions). Also auto-upgrade for direct OpenAI URLs
# (api.openai.com) since all newer tool-calling models prefer
# Responses there. ACP runtimes are excluded: CopilotACPClient
# handles its own routing and does not implement the Responses API
# surface.
# When api_mode was explicitly provided, respect it — the user
# knows what their endpoint supports (#10473).
# Exception: Azure OpenAI serves gpt-5.x on /chat/completions and
# does NOT support the Responses API — skip the upgrade for Azure
# (openai.azure.com), even though it looks OpenAI-compatible.
if (
api_mode is None
and agent.api_mode == "chat_completions"
and agent.provider != "copilot-acp"
and not str(agent.base_url or "").lower().startswith("acp://copilot")
and not str(agent.base_url or "").lower().startswith("acp+tcp://")
and not agent._is_azure_openai_url()
and (
agent._is_direct_openai_url()
or agent._provider_model_requires_responses_api(
agent.model,
provider=agent.provider,
)
)
):
agent.api_mode = "codex_responses"
# Invalidate the eager-warmed transport cache — api_mode changed
# from chat_completions to codex_responses after the warm at __init__.
if hasattr(agent, "_transport_cache"):
agent._transport_cache.clear()
# Pre-warm OpenRouter model metadata cache in a background thread.
# fetch_model_metadata() is cached for 1 hour; this avoids a blocking
# HTTP request on the first API response when pricing is estimated.
# Use a process-level Event so this thread is only spawned once — a new
# AIAgent is created for every gateway request, so without the guard
# each message leaks one OS thread and the process eventually exhausts
# the system thread limit (RuntimeError: can't start new thread).
if (agent.provider == "openrouter" or agent._is_openrouter_url()) and \
not _ra()._openrouter_prewarm_done.is_set():
_ra()._openrouter_prewarm_done.set()
threading.Thread(
target=fetch_model_metadata,
daemon=True,
name="openrouter-prewarm",
).start()
agent.tool_progress_callback = tool_progress_callback
agent.tool_start_callback = tool_start_callback
agent.tool_complete_callback = tool_complete_callback
agent.suppress_status_output = False
agent.thinking_callback = thinking_callback
agent.reasoning_callback = reasoning_callback
agent.clarify_callback = clarify_callback
agent.step_callback = step_callback
agent.stream_delta_callback = stream_delta_callback
agent.interim_assistant_callback = interim_assistant_callback
agent.status_callback = status_callback
agent.notice_callback = notice_callback
agent.notice_clear_callback = notice_clear_callback
agent.tool_gen_callback = tool_gen_callback
# Tool execution state — allows _vprint during tool execution
# even when stream consumers are registered (no tokens streaming then)
agent._executing_tools = False
agent._tool_guardrails = ToolCallGuardrailController()
agent._tool_guardrail_halt_decision: ToolGuardrailDecision | None = None
# Interrupt mechanism for breaking out of tool loops
agent._interrupt_requested = False
agent._interrupt_message = None # Optional message that triggered interrupt
agent._execution_thread_id: int | None = None # Set at run_conversation() start
agent._interrupt_thread_signal_pending = False
agent._client_lock = threading.RLock()
# /steer mechanism — inject a user note into the next tool result
# without interrupting the agent. Unlike interrupt(), steer() does
# NOT set _interrupt_requested; it waits for the current tool batch
# to finish naturally, then the drain hook appends the text to the
# last tool result's content so the model sees it on its next
# iteration. Message-role alternation is preserved (we modify an
# existing tool message rather than inserting a new user turn).
agent._pending_steer: Optional[str] = None
agent._pending_steer_lock = threading.Lock()
# Concurrent-tool worker thread tracking. `_execute_tool_calls_concurrent`
# runs each tool on its own ThreadPoolExecutor worker — those worker
# threads have tids distinct from `_execution_thread_id`, so
# `_set_interrupt(True, _execution_thread_id)` alone does NOT cause
# `is_interrupted()` inside the worker to return True. Track the
# workers here so `interrupt()` / `clear_interrupt()` can fan out to
# their tids explicitly.
agent._tool_worker_threads: set[int] = set()
agent._tool_worker_threads_lock = threading.Lock()
# Subagent delegation state
agent._delegate_depth = 0 # 0 = top-level agent, incremented for children
agent._active_children = [] # Running child AIAgents (for interrupt propagation)
agent._active_children_lock = threading.Lock()
# Store OpenRouter provider preferences
agent.providers_allowed = providers_allowed
agent.providers_ignored = providers_ignored
agent.providers_order = providers_order
agent.provider_sort = provider_sort
agent.provider_require_parameters = provider_require_parameters
agent.provider_data_collection = provider_data_collection
agent.openrouter_min_coding_score = openrouter_min_coding_score
# Store toolset filtering options
agent.enabled_toolsets = enabled_toolsets
agent.disabled_toolsets = disabled_toolsets
# Model response configuration
agent.max_tokens = max_tokens # None = use model default
agent.reasoning_config = reasoning_config # None = use default (medium for OpenRouter)
agent.service_tier = service_tier
agent.request_overrides = dict(request_overrides or {})
agent.prefill_messages = prefill_messages or [] # Prefilled conversation turns
agent._force_ascii_payload = False
# Anthropic prompt caching: auto-enabled for Claude models on native
# Anthropic, OpenRouter, and third-party gateways that speak the
# Anthropic protocol (``api_mode == 'anthropic_messages'``). Reduces
# input costs by ~75% on multi-turn conversations. Uses system_and_3
# strategy (4 breakpoints). See ``_anthropic_prompt_cache_policy``
# for the layout-vs-transport decision.
agent._use_prompt_caching, agent._use_native_cache_layout = (
agent._anthropic_prompt_cache_policy()
)
# Anthropic supports "5m" (default) and "1h" cache TTL tiers. Read from
# config.yaml under prompt_caching.cache_ttl; unknown values keep "5m".
# 1h tier costs 2x on write vs 1.25x for 5m, but amortizes across long
# sessions with >5-minute pauses between turns (#14971).
agent._cache_ttl = "5m"
try:
from hermes_cli.config import load_config as _load_pc_cfg
_pc_cfg = _load_pc_cfg().get("prompt_caching", {}) or {}
_ttl = _pc_cfg.get("cache_ttl", "5m")
if _ttl in {"5m", "1h"}:
agent._cache_ttl = _ttl
except Exception:
pass
# Iteration budget: the LLM is only notified when it actually exhausts
# the iteration budget (api_call_count >= max_iterations). At that
# point we inject ONE message, allow one final API call, and if the
# model doesn't produce a text response, force a user-message asking
# it to summarise. No intermediate pressure warnings — they caused
# models to "give up" prematurely on complex tasks (#7915).
agent._budget_exhausted_injected = False
agent._budget_grace_call = False
# Activity tracking — updated on each API call, tool execution, and
# stream chunk. Used by the gateway timeout handler to report what the
# agent was doing when it was killed, and by the "still working"
# notifications to show progress.
agent._last_activity_ts: float = time.time()
agent._last_activity_desc: str = "initializing"
agent._current_tool: str | None = None
agent._api_call_count: int = 0
# Rate limit tracking — updated from x-ratelimit-* response headers
# after each API call. Accessed by /usage slash command.
agent._rate_limit_state: Optional["RateLimitState"] = None
# Credits tracking (dev-only, L0 usage-aware-credits) — updated from
# x-nous-credits-* response headers after each API call. Session-start
# remaining is latched the first time a header is ever seen so we can
# report cumulative micros spent. Surfaced behind HERMES_DEV_CREDITS.
agent._credits_state = None
agent._credits_session_start_micros = None
# Threshold-notice latch (L4): active sticky-notice keys + the warn90 crossing gate.
agent._credits_latch = {"active": set(), "seen_below_90": False, "usage_band": None}
# OpenRouter response cache hit counter — incremented when
# X-OpenRouter-Cache-Status: HIT is seen in streaming response headers.
agent._or_cache_hits: int = 0
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+)
# both live under ~/.hermes/logs/. Idempotent, so gateway mode
# (which creates a new AIAgent per message) won't duplicate handlers.
from hermes_logging import setup_logging, setup_verbose_logging
setup_logging(hermes_home=_ra()._hermes_home)
if agent.verbose_logging:
setup_verbose_logging()
_ra().logger.info("Verbose logging enabled (third-party library logs suppressed)")
elif agent.quiet_mode:
# In quiet mode (CLI default), keep console output clean —
# but DO NOT raise per-logger levels. Doing so prevents the
# root logger's file handlers (agent.log, errors.log) from
# ever seeing the records, because Python checks
# logger.isEnabledFor() before handler propagation. We rely
# on the fact that hermes_logging.setup_logging() does not
# install a console StreamHandler in quiet mode — so INFO
# records flow to the file handlers but never reach a
# console. Any future noise reduction belongs at the
# handler level inside hermes_logging.py, not here.
pass
# Internal stream callback (set during streaming TTS).
# Initialized here so _vprint can reference it before run_conversation.
agent._stream_callback = None
# Deferred paragraph break flag — set after tool iterations so a
# single "\n\n" is prepended to the next real text delta.
agent._stream_needs_break = False
# Stateful scrubber for <memory-context> spans split across stream
# deltas (#5719). sanitize_context() alone can't survive chunk
# boundaries because the block regex needs both tags in one string.
agent._stream_context_scrubber = StreamingContextScrubber()
# Stateful scrubber for reasoning/thinking tags in streamed deltas
# (#17924). Replaces the per-delta _strip_think_blocks regex that
# destroyed downstream state (e.g. MiniMax-M2.7 streaming
# '<think>' as delta1 and 'Let me check' as delta2 — the regex
# erased delta1, so downstream state machines never learned a
# block was open and leaked delta2 as content).
agent._stream_think_scrubber = StreamingThinkScrubber()
# Visible assistant text already delivered through live token callbacks
# during the current model response. Used to avoid re-sending the same
# commentary when the provider later returns it as a completed interim
# assistant message.
agent._current_streamed_assistant_text = ""
# Optional current-turn user-message override used when the API-facing
# user message intentionally differs from the persisted transcript
# (e.g. CLI voice mode adds a temporary prefix for the live call only).
agent._persist_user_message_idx = None
agent._persist_user_message_override = None
# Cache anthropic image-to-text fallbacks per image payload/URL so a
# single tool loop does not repeatedly re-run auxiliary vision on the
# same image history.
agent._anthropic_image_fallback_cache: Dict[str, str] = {}
# Initialize LLM client via centralized provider router.
# The router handles auth resolution, base URL, headers, and
# Codex/Anthropic wrapping for all known providers.
# raw_codex=True because the main agent needs direct responses.stream()
# access for Codex Responses API streaming.
agent._anthropic_client = None
agent._is_anthropic_oauth = False
# Resolve per-provider / per-model request timeout once up front so
# every client construction path below (Anthropic native, OpenAI-wire,
# router-based implicit auth) can apply it consistently. Bedrock
# Claude uses its own timeout path and is not covered here.
_provider_timeout = get_provider_request_timeout(agent.provider, agent.model)
if agent.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, resolve_anthropic_token
# Bedrock + Claude → use AnthropicBedrock SDK for full feature parity
# (prompt caching, thinking budgets, adaptive thinking).
_is_bedrock_anthropic = agent.provider == "bedrock"
if _is_bedrock_anthropic:
from agent.anthropic_adapter import build_anthropic_bedrock_client
_region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "")
_br_region = _region_match.group(1) if _region_match else "us-east-1"
agent._bedrock_region = _br_region
agent._anthropic_client = build_anthropic_bedrock_client(_br_region)
agent._anthropic_api_key = "aws-sdk"
agent._anthropic_base_url = base_url
agent._is_anthropic_oauth = False
agent.api_key = "aws-sdk"
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock + AnthropicBedrock SDK, {_br_region})")
else:
# Only fall back to ANTHROPIC_TOKEN when the provider is actually Anthropic.
# Other anthropic_messages providers (MiniMax, Alibaba, etc.) must use their own API key.
# Falling back would send Anthropic credentials to third-party endpoints (Fixes #1739, #minimax-401).
_is_native_anthropic = agent.provider == "anthropic"
effective_key = (api_key or resolve_anthropic_token() or "") if _is_native_anthropic else (api_key or "")
# MiniMax OAuth issues short-lived (~15-min) access tokens. The
# Anthropic SDK caches ``api_key`` as a static string at client
# construction time, so a session that resolves the bearer once
# at startup will keep sending the same token until MiniMax
# returns 401 mid-session. Swap the static string for a callable
# token provider — ``build_anthropic_client`` recognizes the
# callable and installs an httpx event hook that mints a fresh
# bearer per outbound request (re-reading auth.json so a refresh
# persisted by another process is visible immediately).
# The cached refresh path is a no-op when the token still has
# ``MINIMAX_OAUTH_REFRESH_SKEW_SECONDS`` of life left, so steady-
# state cost is one file read + one timestamp compare per request.
if agent.provider == "minimax-oauth" and isinstance(effective_key, str) and effective_key:
try:
from hermes_cli.auth import build_minimax_oauth_token_provider
effective_key = build_minimax_oauth_token_provider()
except Exception as _mm_exc: # noqa: BLE001 — never block startup on this
import logging as _logging
_logging.getLogger(__name__).warning(
"MiniMax OAuth: failed to install per-request token provider "
"(%s); falling back to static bearer that will expire ~15min in.",
_mm_exc,
)
agent.api_key = effective_key
agent._anthropic_api_key = effective_key
agent._anthropic_base_url = base_url
# Only mark the session as OAuth-authenticated when the token
# genuinely belongs to native Anthropic. Third-party providers
# (MiniMax, Kimi, GLM, LiteLLM proxies) that accept the
# Anthropic protocol must never trip OAuth code paths — doing
# so injects Claude-Code identity headers and system prompts
# that cause 401/403 on their endpoints. Guards #1739 and
# the third-party identity-injection bug.
from agent.anthropic_adapter import _is_oauth_token as _is_oat
agent._is_anthropic_oauth = _is_oat(effective_key) if (_is_native_anthropic and isinstance(effective_key, str)) else False
agent._anthropic_client = build_anthropic_client(effective_key, base_url, timeout=_provider_timeout)
# No OpenAI client needed for Anthropic mode
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model} (Anthropic native)")
# ``effective_key`` may be a callable Entra ID bearer
# provider for Azure Foundry anthropic_messages mode.
# The Anthropic adapter installs an httpx event hook
# that mints a fresh JWT per request — we never
# invoke or inspect the callable in the banner.
from agent.azure_identity_adapter import is_token_provider
if is_token_provider(effective_key):
print("🔑 Using credentials: Microsoft Entra ID")
elif isinstance(effective_key, str) and len(effective_key) > 12:
print(f"🔑 Using token: {effective_key[:8]}...{effective_key[-4:]}")
elif agent.api_mode == "bedrock_converse":
# AWS Bedrock — uses boto3 directly, no OpenAI client needed.
# Region is extracted from the base_url or defaults to us-east-1.
_region_match = re.search(r"bedrock-runtime\.([a-z0-9-]+)\.", base_url or "")
agent._bedrock_region = _region_match.group(1) if _region_match else "us-east-1"
# Guardrail config — read from config.yaml at init time.
agent._bedrock_guardrail_config = None
try:
from hermes_cli.config import load_config as _load_br_cfg
_gr = _load_br_cfg().get("bedrock", {}).get("guardrail", {})
if _gr.get("guardrail_identifier") and _gr.get("guardrail_version"):
agent._bedrock_guardrail_config = {
"guardrailIdentifier": _gr["guardrail_identifier"],
"guardrailVersion": _gr["guardrail_version"],
}
if _gr.get("stream_processing_mode"):
agent._bedrock_guardrail_config["streamProcessingMode"] = _gr["stream_processing_mode"]
if _gr.get("trace"):
agent._bedrock_guardrail_config["trace"] = _gr["trace"]
except Exception:
pass
agent.client = None
agent._client_kwargs = {}
if not agent.quiet_mode:
_gr_label = " + Guardrails" if agent._bedrock_guardrail_config else ""
print(f"🤖 AI Agent initialized with model: {agent.model} (AWS Bedrock, {agent._bedrock_region}{_gr_label})")
else:
if api_key and base_url:
# Explicit credentials from CLI/gateway — construct directly.
# The runtime provider resolver already handled auth for us.
# Extract query params (e.g. Azure api-version) from base_url
# and pass via default_query to prevent loss during SDK URL
# joining (httpx drops query string when joining paths).
_parsed_url = urlparse(base_url)
if _parsed_url.query:
_clean_url = urlunparse(_parsed_url._replace(query=""))
_query_params = {
k: v[0] for k, v in parse_qs(_parsed_url.query).items()
}
client_kwargs = {
"api_key": api_key,
"base_url": _clean_url,
"default_query": _query_params,
}
else:
client_kwargs = {"api_key": api_key, "base_url": base_url}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
if agent.provider == "copilot-acp":
client_kwargs["command"] = agent.acp_command
client_kwargs["args"] = agent.acp_args
effective_base = base_url
if base_url_host_matches(effective_base, "openrouter.ai"):
from agent.auxiliary_client import build_or_headers
client_kwargs["default_headers"] = build_or_headers()
elif base_url_host_matches(effective_base, "integrate.api.nvidia.com"):
from agent.auxiliary_client import build_nvidia_nim_headers
client_kwargs["default_headers"] = build_nvidia_nim_headers(effective_base)
elif base_url_host_matches(effective_base, "api.routermint.com"):
client_kwargs["default_headers"] = _ra()._routermint_headers()
elif base_url_host_matches(effective_base, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers
client_kwargs["default_headers"] = copilot_default_headers()
elif base_url_host_matches(effective_base, "api.kimi.com"):
client_kwargs["default_headers"] = {
"User-Agent": "claude-code/0.1.0",
}
elif base_url_host_matches(effective_base, "portal.qwen.ai"):
client_kwargs["default_headers"] = _ra()._qwen_portal_headers()
elif base_url_host_matches(effective_base, "chatgpt.com"):
from agent.auxiliary_client import _codex_cloudflare_headers
client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key)
elif "default_headers" not in client_kwargs:
# Fall back to profile.default_headers for providers that
# declare custom headers (e.g. Kimi User-Agent on non-kimi.com
# endpoints).
try:
from providers import get_provider_profile as _gpf
_ph = _gpf(agent.provider)
if _ph and _ph.default_headers:
client_kwargs["default_headers"] = dict(_ph.default_headers)
except Exception:
pass
else:
# No explicit creds — use the centralized provider router
from agent.auxiliary_client import resolve_provider_client
_routed_client, _ = resolve_provider_client(
agent.provider or "auto", model=agent.model, raw_codex=True)
if _routed_client is not None:
client_kwargs = {
"api_key": _routed_client.api_key,
"base_url": str(_routed_client.base_url),
}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
# Preserve provider-specific headers the router set. The
# OpenAI SDK stores caller-provided default_headers in
# _custom_headers; older/mocked clients may expose
# _default_headers instead.
_routed_headers = getattr(_routed_client, "_custom_headers", None)
if not _routed_headers:
_routed_headers = getattr(_routed_client, "_default_headers", None)
if _routed_headers:
client_kwargs["default_headers"] = dict(_routed_headers)
else:
# When the user explicitly chose a non-OpenRouter provider
# but no credentials were found, fail fast with a clear
# message instead of silently routing through OpenRouter.
_explicit = (agent.provider or "").strip().lower()
if _explicit and _explicit not in {"auto", "openrouter", "custom"}:
# Look up the actual env var name from the provider
# config — some providers use non-standard names
# (e.g. alibaba → DASHSCOPE_API_KEY, not ALIBABA_API_KEY).
_env_hint = f"{_explicit.upper()}_API_KEY"
try:
from hermes_cli.auth import PROVIDER_REGISTRY
_pcfg = PROVIDER_REGISTRY.get(_explicit)
if _pcfg and _pcfg.api_key_env_vars:
_env_hint = _pcfg.api_key_env_vars[0]
except Exception:
pass
# --- Init-time fallback (#17929) ---
_fb_entries = []
if isinstance(fallback_model, list):
_fb_entries = [
f for f in fallback_model
if isinstance(f, dict) and f.get("provider") and f.get("model")
]
elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"):
_fb_entries = [fallback_model]
_fb_resolved = False
for _fb in _fb_entries:
_fb_explicit_key = (_fb.get("api_key") or "").strip() or None
if not _fb_explicit_key:
_fb_key_env = (_fb.get("key_env") or _fb.get("api_key_env") or "").strip()
if _fb_key_env:
_fb_explicit_key = os.getenv(_fb_key_env, "").strip() or None
_fb_client, _fb_model = resolve_provider_client(
_fb["provider"], model=_fb["model"], raw_codex=True,
explicit_base_url=_fb.get("base_url"),
explicit_api_key=_fb_explicit_key,
)
if _fb_client is not None:
agent.provider = _fb["provider"]
agent.model = _fb_model or _fb["model"]
agent._fallback_activated = True
client_kwargs = {
"api_key": _fb_client.api_key,
"base_url": str(_fb_client.base_url),
}
if _provider_timeout is not None:
client_kwargs["timeout"] = _provider_timeout
_fb_headers = getattr(_fb_client, "_custom_headers", None)
if not _fb_headers:
_fb_headers = getattr(_fb_client, "_default_headers", None)
if _fb_headers:
client_kwargs["default_headers"] = dict(_fb_headers)
_fb_resolved = True
break
if not _fb_resolved:
raise RuntimeError(
f"Provider '{_explicit}' is set in config.yaml but no API key "
f"was found. Set the {_env_hint} environment "
f"variable, or switch to a different provider with `hermes model`."
)
if not getattr(agent, "_fallback_activated", False):
# No provider configured — reject with a clear message.
raise RuntimeError(
"No LLM provider configured. Run `hermes model` to "
"select a provider, or run `hermes setup` for first-time "
"configuration."
)
agent._client_kwargs = client_kwargs # stored for rebuilding after interrupt
# Enable fine-grained tool streaming for Claude on OpenRouter.
# Without this, Anthropic buffers the entire tool call and goes
# silent for minutes while thinking — OpenRouter's upstream proxy
# times out during the silence. The beta header makes Anthropic
# stream tool call arguments token-by-token, keeping the
# connection alive.
_effective_base = str(client_kwargs.get("base_url", "")).lower()
if base_url_host_matches(_effective_base, "openrouter.ai") and "claude" in (agent.model or "").lower():
headers = client_kwargs.get("default_headers") or {}
existing_beta = headers.get("x-anthropic-beta", "")
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
if _FINE_GRAINED not in existing_beta:
if existing_beta:
headers["x-anthropic-beta"] = f"{existing_beta},{_FINE_GRAINED}"
else:
headers["x-anthropic-beta"] = _FINE_GRAINED
client_kwargs["default_headers"] = headers
agent.api_key = client_kwargs.get("api_key", "")
agent.base_url = client_kwargs.get("base_url", agent.base_url)
try:
agent.client = agent._create_openai_client(client_kwargs, reason="agent_init", shared=True)
if not agent.quiet_mode:
print(f"🤖 AI Agent initialized with model: {agent.model}")
if base_url:
print(f"🔗 Using custom base URL: {base_url}")
# ``api_key`` may be a callable Entra ID bearer
# provider (Azure Foundry). The OpenAI SDK mints a
# fresh JWT per request internally — the banner
# never invokes or inspects the callable.
from agent.azure_identity_adapter import is_token_provider
key_used = client_kwargs.get("api_key", "none")
if is_token_provider(key_used):
print("🔑 Using credentials: Microsoft Entra ID")
elif isinstance(key_used, str) and key_used and key_used != "dummy-key" and len(key_used) > 12:
print(f"🔑 Using API key: {key_used[:8]}...{key_used[-4:]}")
else:
print("⚠️ Warning: API key appears invalid or missing")
except Exception as e:
raise RuntimeError(f"Failed to initialize OpenAI client: {e}")
# Provider fallback chain — ordered list of backup providers tried
# when the primary is exhausted (rate-limit, overload, connection
# failure). Supports both legacy single-dict ``fallback_model`` and
# new list ``fallback_providers`` format.
if isinstance(fallback_model, list):
agent._fallback_chain = [
f for f in fallback_model
if isinstance(f, dict) and f.get("provider") and f.get("model")
]
elif isinstance(fallback_model, dict) and fallback_model.get("provider") and fallback_model.get("model"):
agent._fallback_chain = [fallback_model]
else:
agent._fallback_chain = []
agent._fallback_index = 0
agent._fallback_activated = getattr(agent, "_fallback_activated", False)
# Legacy attribute kept for backward compat (tests, external callers)
agent._fallback_model = agent._fallback_chain[0] if agent._fallback_chain else None
if agent._fallback_chain and not agent.quiet_mode:
if len(agent._fallback_chain) == 1:
fb = agent._fallback_chain[0]
print(f"🔄 Fallback model: {fb['model']} ({fb['provider']})")
else:
print(f"🔄 Fallback chain ({len(agent._fallback_chain)} providers): " +
"".join(f"{f['model']} ({f['provider']})" for f in agent._fallback_chain))
# Get available tools with filtering
agent.tools = _ra().get_tool_definitions(
enabled_toolsets=enabled_toolsets,
disabled_toolsets=disabled_toolsets,
quiet_mode=agent.quiet_mode,
)
# Show tool configuration and store valid tool names for validation
agent.valid_tool_names = set()
if agent.tools:
agent.valid_tool_names = {tool["function"]["name"] for tool in agent.tools}
tool_names = sorted(agent.valid_tool_names)
if not agent.quiet_mode:
print(f"🛠️ Loaded {len(agent.tools)} tools: {', '.join(tool_names)}")
# Show filtering info if applied
if enabled_toolsets:
print(f" ✅ Enabled toolsets: {', '.join(enabled_toolsets)}")
if disabled_toolsets:
print(f" ❌ Disabled toolsets: {', '.join(disabled_toolsets)}")
elif not agent.quiet_mode:
print("🛠️ No tools loaded (all tools filtered out or unavailable)")
# Kanban worker/orchestrator lifecycle guidance is session-static:
# the dispatcher decides at spawn time whether this process is a kanban
# worker (kanban_show tool is present iff HERMES_KANBAN_TASK is set).
# Resolving the ~835-token block once here avoids re-running the
# membership test + reference on every system-prompt rebuild
# (init + each context compression).
from agent.prompt_builder import KANBAN_GUIDANCE
agent._kanban_worker_guidance = (
KANBAN_GUIDANCE if "kanban_show" in agent.valid_tool_names else ""
)
# Check tool requirements
if agent.tools and not agent.quiet_mode:
requirements = _ra().check_toolset_requirements()
missing_reqs = [name for name, available in requirements.items() if not available]
if missing_reqs:
print(f"⚠️ Some tools may not work due to missing requirements: {missing_reqs}")
# Show trajectory saving status
if agent.save_trajectories and not agent.quiet_mode:
print("📝 Trajectory saving enabled")
# Show ephemeral system prompt status
if agent.ephemeral_system_prompt and not agent.quiet_mode:
prompt_preview = agent.ephemeral_system_prompt[:60] + "..." if len(agent.ephemeral_system_prompt) > 60 else agent.ephemeral_system_prompt
print(f"🔒 Ephemeral system prompt: '{prompt_preview}' (not saved to trajectories)")
# Show prompt caching status
if agent._use_prompt_caching and not agent.quiet_mode:
if agent._use_native_cache_layout and agent.provider == "anthropic":
source = "native Anthropic"
elif agent._use_native_cache_layout:
source = "Anthropic-compatible endpoint"
else:
source = "Claude via OpenRouter"
print(f"💾 Prompt caching: ENABLED ({source}, {agent._cache_ttl} TTL)")
# Session logging setup - auto-save conversation trajectories for debugging
agent.session_start = datetime.now()
if session_id:
# Use provided session ID (e.g., from CLI)
agent.session_id = session_id
else:
# Generate a new session ID
timestamp_str = agent.session_start.strftime("%Y%m%d_%H%M%S")
short_uuid = uuid.uuid4().hex[:6]
agent.session_id = f"{timestamp_str}_{short_uuid}"
# Expose session ID to tools (terminal, execute_code) so agents can
# reference their own session for --resume commands, cross-session
# coordination, and logging. Keep the ContextVar and os.environ
# fallback synchronized because different tool paths still read both.
try:
from gateway.session_context import set_current_session_id
set_current_session_id(agent.session_id)
except Exception:
os.environ["HERMES_SESSION_ID"] = agent.session_id
# Session logs go into ~/.hermes/sessions/ alongside gateway sessions
hermes_home = get_hermes_home()
agent.logs_dir = hermes_home / "sessions"
agent.logs_dir.mkdir(parents=True, exist_ok=True)
# Per-session JSON snapshot writer (~/.hermes/sessions/session_{sid}.json)
# is opt-in via sessions.write_json_snapshots (default False). state.db
# is canonical — the snapshot is only useful for external tooling that
# reads the JSON files directly. See run_agent._save_session_log.
agent._session_json_enabled = False
try:
from hermes_cli.config import load_config as _load_sess_cfg
_sess_cfg = (_load_sess_cfg().get("sessions") or {})
agent._session_json_enabled = bool(_sess_cfg.get("write_json_snapshots", False))
except Exception:
pass
# logs_dir is retained unconditionally for request_dump_*.json (debug
# breadcrumb path written by agent_runtime_helpers.dump_api_request_debug).
# Track conversation messages for session logging
agent._session_messages: List[Dict[str, Any]] = []
# Responses encrypted reasoning replay state. Some OpenAI-compatible
# routes accept GPT-5 Responses requests but later reject replayed
# encrypted reasoning blobs (HTTP 400 ``invalid_encrypted_content``).
# When that happens we disable replay for the rest of the session and
# fall back to stateless continuity. See
# agent/conversation_loop.py's invalid_encrypted_content retry branch.
agent._codex_reasoning_replay_enabled = True
agent._memory_write_origin = "assistant_tool"
agent._memory_write_context = "foreground"
# Cached system prompt -- built once per session, only rebuilt on compression
agent._cached_system_prompt: Optional[str] = None
# Filesystem checkpoint manager (transparent — not a tool)
from tools.checkpoint_manager import CheckpointManager
agent._checkpoint_mgr = CheckpointManager(
enabled=checkpoints_enabled,
max_snapshots=checkpoint_max_snapshots,
max_total_size_mb=checkpoint_max_total_size_mb,
max_file_size_mb=checkpoint_max_file_size_mb,
)
# SQLite session store (optional -- provided by CLI or gateway)
agent._session_db = session_db
agent._parent_session_id = parent_session_id
agent._last_flushed_db_idx = 0 # tracks DB-write cursor to prevent duplicate writes
agent._session_db_created = False # DB row deferred to run_conversation()
agent._session_init_model_config = {
"max_iterations": agent.max_iterations,
"reasoning_config": reasoning_config,
"max_tokens": max_tokens,
}
# In-memory todo list for task planning (one per agent/session)
from tools.todo_tool import TodoStore
agent._todo_store = TodoStore()
# Load config once for memory, skills, and compression sections
try:
from hermes_cli.config import load_config as _load_agent_config
_agent_cfg = _load_agent_config()
except Exception:
_agent_cfg = {}
try:
agent._tool_guardrails = ToolCallGuardrailController(
ToolCallGuardrailConfig.from_mapping(
_agent_cfg.get("tool_loop_guardrails", {})
)
)
except Exception as _tlg_err:
_ra().logger.warning("Tool loop guardrail config ignored: %s", _tlg_err)
# Cache only the derived auxiliary compression context override that is
# needed later by the startup feasibility check. Avoid exposing a
# broad pseudo-public config object on the agent instance.
agent._aux_compression_context_length_config = None
# Persistent memory (MEMORY.md + USER.md) -- loaded from disk
agent._memory_store = None
agent._memory_enabled = False
agent._user_profile_enabled = False
agent._memory_nudge_interval = 10
agent._turns_since_memory = 0
agent._iters_since_skill = 0
if not skip_memory:
try:
mem_config = _agent_cfg.get("memory", {})
agent._memory_enabled = mem_config.get("memory_enabled", False)
agent._user_profile_enabled = mem_config.get("user_profile_enabled", False)
agent._memory_nudge_interval = int(mem_config.get("nudge_interval", 10))
if agent._memory_enabled or agent._user_profile_enabled:
from tools.memory_tool import MemoryStore
agent._memory_store = MemoryStore(
memory_char_limit=mem_config.get("memory_char_limit", 2200),
user_char_limit=mem_config.get("user_char_limit", 1375),
)
agent._memory_store.load_from_disk()
except Exception:
pass # Memory is optional -- don't break agent init
# Memory provider plugin (external — one at a time, alongside built-in)
# Reads memory.provider from config to select which plugin to activate.
agent._memory_manager = None
if not skip_memory:
try:
_mem_provider_name = mem_config.get("provider", "") if mem_config else ""
if _mem_provider_name and _mem_provider_name.strip():
from agent.memory_manager import MemoryManager as _MemoryManager
from plugins.memory import load_memory_provider as _load_mem
agent._memory_manager = _MemoryManager()
_mp = _load_mem(_mem_provider_name)
if _mp and _mp.is_available():
agent._memory_manager.add_provider(_mp)
if agent._memory_manager.providers:
_init_kwargs = {
"session_id": agent.session_id,
"platform": platform or "cli",
"hermes_home": str(get_hermes_home()),
"agent_context": "primary",
}
# Thread session title for memory provider scoping
# (e.g. honcho uses this to derive chat-scoped session keys)
if agent._session_db:
try:
_st = agent._session_db.get_session_title(agent.session_id)
if _st:
_init_kwargs["session_title"] = _st
except Exception:
pass
# Thread gateway user identity for per-user memory scoping
if agent._user_id:
_init_kwargs["user_id"] = agent._user_id
if agent._user_id_alt:
_init_kwargs["user_id_alt"] = agent._user_id_alt
if agent._user_name:
_init_kwargs["user_name"] = agent._user_name
if agent._chat_id:
_init_kwargs["chat_id"] = agent._chat_id
if agent._chat_name:
_init_kwargs["chat_name"] = agent._chat_name
if agent._chat_type:
_init_kwargs["chat_type"] = agent._chat_type
if agent._thread_id:
_init_kwargs["thread_id"] = agent._thread_id
# Thread gateway session key for stable per-chat Honcho session isolation
if agent._gateway_session_key:
_init_kwargs["gateway_session_key"] = agent._gateway_session_key
# Profile identity for per-profile provider scoping
try:
from hermes_cli.profiles import get_active_profile_name
_profile = get_active_profile_name()
_init_kwargs["agent_identity"] = _profile
_init_kwargs["agent_workspace"] = "hermes"
except Exception:
pass
agent._memory_manager.initialize_all(**_init_kwargs)
_ra().logger.info("Memory provider '%s' activated", _mem_provider_name)
else:
_ra().logger.debug("Memory provider '%s' not found or not available", _mem_provider_name)
agent._memory_manager = None
except Exception as _mpe:
_ra().logger.warning("Memory provider plugin init failed: %s", _mpe)
agent._memory_manager = None
# Inject memory provider tool schemas into the tool surface.
# Skip tools whose names already exist (plugins may register the
# same tools via ctx.register_tool(), which lands in agent.tools
# through _ra().get_tool_definitions()). Duplicate function names cause
# 400 errors on providers that enforce unique names (e.g. Xiaomi
# MiMo via Nous Portal).
#
# Respect the platform's enabled_toolsets configuration (#5544):
# enabled_toolsets is None → no filter, inject (backward compat)
# "memory" in enabled_toolsets → user opted in, inject
# otherwise (incl. []) → user excluded memory, skip injection
#
# Without this gate, `platform_toolsets: telegram: []` still leaks memory
# provider tools (fact_store, etc.) into the tool surface — a 10x latency
# penalty on local models and a frequent trigger of tool-call loops.
if agent._memory_manager and agent.tools is not None and (
agent.enabled_toolsets is None or "memory" in agent.enabled_toolsets
):
_existing_tool_names = {
t.get("function", {}).get("name")
for t in agent.tools
if isinstance(t, dict)
}
for _schema in agent._memory_manager.get_all_tool_schemas():
_tname = _schema.get("name", "")
if _tname and _tname in _existing_tool_names:
continue # already registered via plugin path
_wrapped = {"type": "function", "function": _schema}
agent.tools.append(_wrapped)
if _tname:
agent.valid_tool_names.add(_tname)
_existing_tool_names.add(_tname)
# Skills config: nudge interval for skill creation reminders
agent._skill_nudge_interval = 10
try:
skills_config = _agent_cfg.get("skills", {})
agent._skill_nudge_interval = int(skills_config.get("creation_nudge_interval", 10))
except Exception:
pass
# Tool-use enforcement config: "auto" (default — matches hardcoded
# model list), true (always), false (never), or list of substrings.
_agent_section = _agent_cfg.get("agent", {})
if not isinstance(_agent_section, dict):
_agent_section = {}
agent._tool_use_enforcement = _agent_section.get("tool_use_enforcement", "auto")
# Universal task-completion guidance toggle. Default True. Surfaced
# as a separate flag from tool_use_enforcement because the guidance
# applies to ALL models, not just the model families enforcement
# targets.
agent._task_completion_guidance = bool(_agent_section.get("task_completion_guidance", True))
# Local Python toolchain probe toggle. Default True. When False,
# the probe is skipped entirely (no subprocess calls, no system-prompt
# line). Useful for users on exotic setups where the probe heuristics
# are noisy.
agent._environment_probe = bool(_agent_section.get("environment_probe", True))
# App-level API retry count (wraps each model API call). Default 3,
# overridable via agent.api_max_retries in config.yaml. See #11616.
try:
_raw_api_retries = _agent_section.get("api_max_retries", 3)
_api_retries = int(_raw_api_retries)
_api_retries = max(_api_retries, 1) # 1 = no retry (single attempt)
except (TypeError, ValueError):
_api_retries = 3
agent._api_max_retries = _api_retries
# Initialize context compressor for automatic context management
# Compresses conversation when approaching model's context limit
# Configuration via config.yaml (compression section)
_compression_cfg = _agent_cfg.get("compression", {})
if not isinstance(_compression_cfg, dict):
_compression_cfg = {}
compression_threshold = float(_compression_cfg.get("threshold", 0.50))
try:
from agent.auxiliary_client import _compression_threshold_for_model as _cthresh_fn
_model_cthresh = _cthresh_fn(agent.model)
if _model_cthresh is not None:
compression_threshold = _model_cthresh
except Exception:
pass
compression_enabled = str(_compression_cfg.get("enabled", True)).lower() in {"true", "1", "yes"}
compression_target_ratio = float(_compression_cfg.get("target_ratio", 0.20))
compression_protect_last = int(_compression_cfg.get("protect_last_n", 20))
# protect_first_n is the number of non-system messages to protect at
# the head, in addition to the system prompt (which is always
# implicitly protected by the compressor). Floor at 0 — a value of
# 0 means "preserve only the system prompt + summary + tail", which
# is a legitimate (and common) configuration for long-running
# rolling-compaction sessions.
compression_protect_first = max(
0, int(_compression_cfg.get("protect_first_n", 3))
)
compression_abort_on_summary_failure = str(
_compression_cfg.get("abort_on_summary_failure", False)
).lower() in {"true", "1", "yes"}
# Read optional explicit context_length override for the auxiliary
# compression model. Custom endpoints often cannot report this via
# /models, so the startup feasibility check needs the config hint.
try:
_aux_cfg = cfg_get(_agent_cfg, "auxiliary", "compression", default={})
except Exception:
_aux_cfg = {}
if isinstance(_aux_cfg, dict):
_aux_context_config = _aux_cfg.get("context_length")
else:
_aux_context_config = None
if _aux_context_config is not None:
try:
_aux_context_config = int(_aux_context_config)
except (TypeError, ValueError):
_aux_context_config = None
agent._aux_compression_context_length_config = _aux_context_config
# Read explicit model output-token override from config when the
# caller did not pass one directly.
_model_cfg = _agent_cfg.get("model", {})
if agent.max_tokens is None and isinstance(_model_cfg, dict):
_config_max_tokens = _model_cfg.get("max_tokens")
if _config_max_tokens is not None:
try:
if isinstance(_config_max_tokens, bool):
raise ValueError
_parsed_max_tokens = int(_config_max_tokens)
if _parsed_max_tokens <= 0:
raise ValueError
agent.max_tokens = _parsed_max_tokens
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid model.max_tokens in config.yaml: %r"
"must be a positive integer (e.g. 4096). "
"Falling back to provider default.",
_config_max_tokens,
)
print(
f"\n⚠ Invalid model.max_tokens in config.yaml: {_config_max_tokens!r}\n"
f" Must be a positive integer (e.g. 4096).\n"
f" Falling back to provider default.\n",
file=sys.stderr,
)
agent._session_init_model_config["max_tokens"] = agent.max_tokens
# Read explicit context_length override from model config
if isinstance(_model_cfg, dict):
_config_context_length = _model_cfg.get("context_length")
else:
_config_context_length = None
if _config_context_length is not None:
try:
_config_context_length = int(_config_context_length)
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid model.context_length in config.yaml: %r"
"must be a plain integer (e.g. 256000, not '256K'). "
"Falling back to auto-detection.",
_config_context_length,
)
print(
f"\n⚠ Invalid model.context_length in config.yaml: {_config_context_length!r}\n"
f" Must be a plain integer (e.g. 256000, not '256K').\n"
f" Falling back to auto-detected context window.\n",
file=sys.stderr,
)
_config_context_length = None
# Resolve custom_providers list once for reuse below (startup
# context-length override and plugin context-engine init).
try:
from hermes_cli.config import get_compatible_custom_providers
_custom_providers = get_compatible_custom_providers(_agent_cfg)
except Exception:
_custom_providers = _agent_cfg.get("custom_providers")
if not isinstance(_custom_providers, list):
_custom_providers = []
# Store for reuse by _check_compression_model_feasibility (auxiliary
# compression model context-length detection needs the same list).
agent._custom_providers = _custom_providers
_merge_custom_provider_extra_body(agent, _custom_providers)
# Check custom_providers per-model context_length
if _config_context_length is None and _custom_providers:
try:
from hermes_cli.config import get_custom_provider_context_length
_cp_ctx_resolved = get_custom_provider_context_length(
model=agent.model,
base_url=agent.base_url,
custom_providers=_custom_providers,
)
if _cp_ctx_resolved:
_config_context_length = int(_cp_ctx_resolved)
except Exception:
_cp_ctx_resolved = None
# Surface a clear warning if the user set a context_length but it
# wasn't a valid positive int — the helper silently skips those.
if _config_context_length is None:
_target = agent.base_url.rstrip("/") if agent.base_url else ""
for _cp_entry in _custom_providers:
if not isinstance(_cp_entry, dict):
continue
_cp_url = (_cp_entry.get("base_url") or "").rstrip("/")
if _target and _cp_url == _target:
_cp_models = _cp_entry.get("models", {})
if isinstance(_cp_models, dict):
_cp_model_cfg = _cp_models.get(agent.model, {})
if isinstance(_cp_model_cfg, dict):
_cp_ctx = _cp_model_cfg.get("context_length")
if _cp_ctx is not None:
try:
_parsed = int(_cp_ctx)
if _parsed <= 0:
raise ValueError
except (TypeError, ValueError):
_ra().logger.warning(
"Invalid context_length for model %r in "
"custom_providers: %r — must be a positive "
"integer (e.g. 256000, not '256K'). "
"Falling back to auto-detection.",
agent.model, _cp_ctx,
)
print(
f"\n⚠ Invalid context_length for model {agent.model!r} in custom_providers: {_cp_ctx!r}\n"
f" Must be a positive integer (e.g. 256000, not '256K').\n"
f" Falling back to auto-detected context window.\n",
file=sys.stderr,
)
break
# Persist for reuse on switch_model / fallback activation. Must come
# AFTER the custom_providers branch so per-model overrides aren't lost.
agent._config_context_length = _config_context_length
agent._ensure_lmstudio_runtime_loaded(_config_context_length)
# Select context engine: config-driven (like memory providers).
# 1. Check config.yaml context.engine setting
# 2. Check plugins/context_engine/<name>/ directory (repo-shipped)
# 3. Check general plugin system (user-installed plugins)
# 4. Fall back to built-in ContextCompressor
_selected_engine = None
_engine_name = "compressor" # default
try:
_ctx_cfg = _agent_cfg.get("context", {}) if isinstance(_agent_cfg, dict) else {}
_engine_name = _ctx_cfg.get("engine", "compressor") or "compressor"
except Exception:
pass
if _engine_name != "compressor":
# Try loading from plugins/context_engine/<name>/
try:
from plugins.context_engine import load_context_engine
_selected_engine = load_context_engine(_engine_name)
except Exception as _ce_load_err:
_ra().logger.debug("Context engine load from plugins/context_engine/: %s", _ce_load_err)
# Try general plugin system as fallback
if _selected_engine is None:
try:
from hermes_cli.plugins import get_plugin_context_engine
_candidate = get_plugin_context_engine()
if _candidate and _candidate.name == _engine_name:
_selected_engine = _candidate
except Exception:
pass
if _selected_engine is None:
_ra().logger.warning(
"Context engine '%s' not found — falling back to built-in compressor",
_engine_name,
)
# else: config says "compressor" — use built-in, don't auto-activate plugins
if _selected_engine is not None:
agent.context_compressor = _selected_engine
# Resolve context_length for plugin engines — mirrors switch_model() path
from agent.model_metadata import get_model_context_length
_plugin_ctx_len = get_model_context_length(
agent.model,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
config_context_length=_config_context_length,
provider=agent.provider,
custom_providers=_custom_providers,
)
agent.context_compressor.update_model(
model=agent.model,
context_length=_plugin_ctx_len,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
provider=agent.provider,
api_mode=agent.api_mode,
)
if not agent.quiet_mode:
_ra().logger.info("Using context engine: %s", _selected_engine.name)
else:
agent.context_compressor = ContextCompressor(
model=agent.model,
threshold_percent=compression_threshold,
protect_first_n=compression_protect_first,
protect_last_n=compression_protect_last,
summary_target_ratio=compression_target_ratio,
summary_model_override=None,
quiet_mode=agent.quiet_mode,
base_url=agent.base_url,
api_key=getattr(agent, "api_key", ""),
config_context_length=_config_context_length,
provider=agent.provider,
api_mode=agent.api_mode,
abort_on_summary_failure=compression_abort_on_summary_failure,
)
agent.compression_enabled = compression_enabled
# Reject models whose context window is below the minimum required
# for reliable tool-calling workflows (64K tokens).
_ctx = getattr(agent.context_compressor, "context_length", 0)
if _ctx and _ctx < MINIMUM_CONTEXT_LENGTH:
raise ValueError(
f"Model {agent.model} has a context window of {_ctx:,} tokens, "
f"which is below the minimum {MINIMUM_CONTEXT_LENGTH:,} required "
f"by Hermes Agent. Choose a model with at least "
f"{MINIMUM_CONTEXT_LENGTH // 1000}K context, or set "
f"model.context_length in config.yaml to override."
)
# Inject context engine tool schemas (e.g. lcm_grep, lcm_describe, lcm_expand).
# Skip names that are already present — the _ra().get_tool_definitions()
# quiet_mode cache returned a shared list pre-#17335, so a stray
# mutation here would poison subsequent agent inits in the same
# Gateway process and trip provider-side 'duplicate tool name'
# errors. Even with the cache fix, dedup is the right defense
# against plugin paths that may register the same schemas via
# ctx.register_tool(). Mirrors the memory tools dedup above.
#
# Respect the platform's enabled_toolsets configuration (#5544):
# context engine tools follow the same gating pattern as memory
# provider tools — without the gate, `platform_toolsets: telegram: []`
# would still leak lcm_* tools into the tool surface and incur the
# same local-model latency penalty.
agent._context_engine_tool_names: set = set()
if (
hasattr(agent, "context_compressor")
and agent.context_compressor
and agent.tools is not None
and (
agent.enabled_toolsets is None
or "context_engine" in agent.enabled_toolsets
)
):
_existing_tool_names = {
t.get("function", {}).get("name")
for t in agent.tools
if isinstance(t, dict)
}
for _schema in agent.context_compressor.get_tool_schemas():
_tname = _schema.get("name", "")
if _tname and _tname in _existing_tool_names:
continue # already registered via plugin/cache path
_wrapped = {"type": "function", "function": _schema}
agent.tools.append(_wrapped)
if _tname:
agent.valid_tool_names.add(_tname)
agent._context_engine_tool_names.add(_tname)
_existing_tool_names.add(_tname)
# Notify context engine of session start
if hasattr(agent, "context_compressor") and agent.context_compressor:
try:
agent.context_compressor.on_session_start(
agent.session_id,
hermes_home=str(get_hermes_home()),
platform=agent.platform or "cli",
model=agent.model,
context_length=getattr(agent.context_compressor, "context_length", 0),
conversation_id=getattr(agent, "_gateway_session_key", None),
)
except Exception as _ce_err:
_ra().logger.debug("Context engine on_session_start: %s", _ce_err)
agent._subdirectory_hints = SubdirectoryHintTracker(
working_dir=os.getenv("TERMINAL_CWD") or None,
)
agent._user_turn_count = 0
# Cumulative token usage for the session
agent.session_prompt_tokens = 0
agent.session_completion_tokens = 0
agent.session_total_tokens = 0
agent.session_api_calls = 0
agent.session_input_tokens = 0
agent.session_output_tokens = 0
agent.session_cache_read_tokens = 0
agent.session_cache_write_tokens = 0
agent.session_reasoning_tokens = 0
agent.session_estimated_cost_usd = 0.0
agent.session_cost_status = "unknown"
agent.session_cost_source = "none"
# ── Ollama num_ctx injection ──
# Ollama defaults to 2048 context regardless of the model's capabilities.
# When running against an Ollama server, detect the model's max context
# and pass num_ctx on every chat request so the full window is used.
# User override: set model.ollama_num_ctx in config.yaml to cap VRAM use.
# If model.context_length is set, it caps num_ctx so the user's VRAM
# budget is respected even when GGUF metadata advertises a larger window.
agent._ollama_num_ctx: int | None = None
_ollama_num_ctx_override = None
if isinstance(_model_cfg, dict):
_ollama_num_ctx_override = _model_cfg.get("ollama_num_ctx")
if _ollama_num_ctx_override is not None:
try:
agent._ollama_num_ctx = int(_ollama_num_ctx_override)
except (TypeError, ValueError):
_ra().logger.debug("Invalid ollama_num_ctx config value: %r", _ollama_num_ctx_override)
if agent._ollama_num_ctx is None and agent.base_url and is_local_endpoint(agent.base_url):
try:
# ``agent.api_key`` may be a callable (Entra token provider).
# Ollama detection makes a manual HTTP request and expects a
# string — Azure Foundry isn't a local endpoint so this branch
# never fires for Entra, but guard defensively.
_key_for_ollama = agent.api_key if isinstance(agent.api_key, str) else ""
_detected = query_ollama_num_ctx(agent.model, agent.base_url, api_key=_key_for_ollama or "")
if _detected and _detected > 0:
agent._ollama_num_ctx = _detected
except Exception as exc:
_ra().logger.debug("Ollama num_ctx detection failed: %s", exc)
# Cap auto-detected ollama_num_ctx to the user's explicit context_length.
# Without this, GGUF metadata can advertise 256K+ which Ollama honours
# by allocating that much VRAM — blowing up small GPUs even though the
# user explicitly set a smaller context_length in config.yaml.
if (
agent._ollama_num_ctx
and _config_context_length
and _ollama_num_ctx_override is None # don't override explicit ollama_num_ctx
and agent._ollama_num_ctx > _config_context_length
):
_ra().logger.info(
"Ollama num_ctx capped: %d -> %d (model.context_length override)",
agent._ollama_num_ctx, _config_context_length,
)
agent._ollama_num_ctx = _config_context_length
if agent._ollama_num_ctx and not agent.quiet_mode:
_ra().logger.info(
"Ollama num_ctx: will request %d tokens (model max from /api/show)",
agent._ollama_num_ctx,
)
if not agent.quiet_mode:
if compression_enabled:
print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (compress at {int(compression_threshold*100)}% = {agent.context_compressor.threshold_tokens:,})")
else:
print(f"📊 Context limit: {agent.context_compressor.context_length:,} tokens (auto-compression disabled)")
# Check immediately so CLI users see the warning at startup.
# Gateway status_callback is not yet wired, so any warning is stored
# in _compression_warning and replayed in the first run_conversation().
agent._compression_warning = None
# Lazy feasibility check: deferred to the first turn that approaches the
# compression threshold. Running it eagerly here costs ~400ms cold (network
# probe of the auxiliary provider chain + /models lookup) on every agent
# init, including short ``chat -q`` runs that never reach the threshold.
# ``ensure_compression_feasibility_checked`` (called from
# ``run_conversation``'s preflight) runs it at most once per agent.
agent._compression_feasibility_checked = False
# Snapshot primary runtime for per-turn restoration. When fallback
# activates during a turn, the next turn restores these values so the
# preferred model gets a fresh attempt each time. Uses a single dict
# so new state fields are easy to add without N individual attributes.
_cc = agent.context_compressor
agent._primary_runtime = {
"model": agent.model,
"provider": agent.provider,
"base_url": agent.base_url,
"api_mode": agent.api_mode,
"api_key": getattr(agent, "api_key", ""),
"client_kwargs": dict(agent._client_kwargs),
"use_prompt_caching": agent._use_prompt_caching,
"use_native_cache_layout": agent._use_native_cache_layout,
# Context engine state that _try_activate_fallback() overwrites.
# Use getattr for model/base_url/api_key/provider since plugin
# engines may not have these (they're ContextCompressor-specific).
"compressor_model": getattr(_cc, "model", agent.model),
"compressor_base_url": getattr(_cc, "base_url", agent.base_url),
"compressor_api_key": getattr(_cc, "api_key", ""),
"compressor_provider": getattr(_cc, "provider", agent.provider),
"compressor_context_length": _cc.context_length,
"compressor_threshold_tokens": _cc.threshold_tokens,
}
if agent.api_mode == "anthropic_messages":
agent._primary_runtime.update({
"anthropic_api_key": agent._anthropic_api_key,
"anthropic_base_url": agent._anthropic_base_url,
"is_anthropic_oauth": agent._is_anthropic_oauth,
})
__all__ = ["init_agent"]