diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index 78444bcb54b..b4ce2da99d1 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -1289,13 +1289,21 @@ def convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]: continue if name: seen_names.add(name) - result.append({ + anthropic_tool: Dict[str, Any] = { "name": name, "description": fn.get("description", ""), "input_schema": _normalize_tool_input_schema( fn.get("parameters", {"type": "object", "properties": {}}) ), - }) + } + # Forward cache_control marker when present on the OpenAI-format + # tool dict (set by ``mark_tools_for_long_lived_cache``). Anthropic's + # tools array supports cache_control on the last tool to cache the + # entire schema cross-session. + cache_control = t.get("cache_control") + if isinstance(cache_control, dict): + anthropic_tool["cache_control"] = dict(cache_control) + result.append(anthropic_tool) return result diff --git a/agent/prompt_caching.py b/agent/prompt_caching.py index d80f58ea40a..4829c96b332 100644 --- a/agent/prompt_caching.py +++ b/agent/prompt_caching.py @@ -1,15 +1,25 @@ -"""Anthropic prompt caching (system_and_3 strategy). +"""Anthropic prompt caching strategies. -Reduces input token costs by ~75% on multi-turn conversations by caching -the conversation prefix. Uses 4 cache_control breakpoints (Anthropic max): - 1. System prompt (stable across all turns) - 2-4. Last 3 non-system messages (rolling window) +Two layouts: + +* ``system_and_3`` (default, used everywhere except the long-lived path): + 4 cache_control breakpoints — system prompt + last 3 non-system messages. + All at the same TTL (5m or 1h). Reduces input token costs by ~75% on + multi-turn conversations within a single session. + +* ``prefix_and_2`` (Claude on Anthropic / OpenRouter / Nous Portal): + 4 breakpoints split across two TTL tiers — tools[-1] (1h) + + stable system prefix (1h) + last 2 non-system messages (5m). 
def _build_marker(ttl: str) -> Dict[str, str]:
    """Build a cache_control marker dict for the given TTL ('5m' or '1h').

    Only ``"1h"`` produces an explicit ``ttl`` key. Every other value yields
    the bare ``{"type": "ephemeral"}`` marker, which is how the 5-minute tier
    is expressed throughout this module.
    """
    marker: Dict[str, str] = {"type": "ephemeral"}
    if ttl == "1h":
        marker["ttl"] = "1h"
    return marker


def _mark_system_stable_block(
    messages: List[Dict[str, Any]],
    long_lived_marker: Dict[str, str],
) -> bool:
    """Mark the *first* content block of the system message, in place.

    The caller is expected to have split the system message into ordered
    content blocks, with block[0] holding the cross-session-stable prefix.
    When the system content is still a plain non-empty string it is wrapped
    into a single text block and that block is marked instead, so something
    is still cached for the turn.

    Args:
        messages: Message list to mutate; only ``messages[0]`` is inspected.
        long_lived_marker: cache_control marker to attach. A copy is stored,
            so later edits to the argument do not leak into the message.

    Returns:
        True when a marker was placed. False when there is no leading system
        message, its content is empty, or its first block is not a dict.
    """
    if not messages or messages[0].get("role") != "system":
        return False

    sys_msg = messages[0]
    content = sys_msg.get("content")

    # Already a list of blocks: mark the first block only.
    if isinstance(content, list) and content:
        first = content[0]
        if isinstance(first, dict):
            first["cache_control"] = dict(long_lived_marker)
            return True
        return False

    # Unsplit string content: wrap it so the marker has a block to live on.
    if isinstance(content, str) and content:
        sys_msg["content"] = [
            {"type": "text", "text": content, "cache_control": dict(long_lived_marker)}
        ]
        return True

    return False


def apply_anthropic_cache_control_long_lived(
    api_messages: List[Dict[str, Any]],
    long_lived_ttl: str = "1h",
    rolling_ttl: str = "5m",
    native_anthropic: bool = False,
) -> List[Dict[str, Any]]:
    """Apply prefix_and_2 caching: long-lived stable prefix + rolling window.

    Breakpoints placed by this function (at most 3):
      * Stable system prefix (block[0])  -> ``long_lived_ttl``
      * Last 2 non-system messages       -> ``rolling_ttl`` each
        (last 3 when no system prefix could be marked)

    The tools array is NOT marked here; ``mark_tools_for_long_lived_cache``
    handles it because tools live outside the messages list. Together the
    two functions stay within Anthropic's limit of 4 breakpoints.

    TTL ordering: Anthropic requires breakpoints with a longer TTL to appear
    before breakpoints with a shorter TTL. The prefix tier (tools, system)
    always precedes the rolling tier, so a 1h rolling TTL behind a 5m prefix
    would be rejected. In that configuration the rolling tier is clamped to
    5m so the request stays valid.

    Args:
        api_messages: Messages in API order; never mutated.
        long_lived_ttl: TTL for the system prefix breakpoint ('5m' or '1h').
        rolling_ttl: TTL for the rolling message breakpoints ('5m' or '1h').
        native_anthropic: Forwarded to ``_apply_cache_marker``.

    Returns:
        Deep copy of messages with cache_control breakpoints injected.
    """
    messages = copy.deepcopy(api_messages)
    if not messages:
        return messages

    # A rolling breakpoint may never outlive the prefix breakpoints that
    # precede it in the request (see "TTL ordering" in the docstring).
    effective_rolling_ttl = rolling_ttl
    if rolling_ttl == "1h" and long_lived_ttl != "1h":
        effective_rolling_ttl = "5m"

    long_marker = _build_marker(long_lived_ttl)
    rolling_marker = _build_marker(effective_rolling_ttl)

    placed_prefix = _mark_system_stable_block(messages, long_marker)

    # Budget of 4: tools[-1] (marked elsewhere) takes one and the system
    # prefix takes one when placed, leaving 2 for the rolling tail. Without
    # a system prefix the tail gets 3.
    rolling_budget = 2 if placed_prefix else 3
    non_sys = [i for i, msg in enumerate(messages) if msg.get("role") != "system"]
    for idx in non_sys[-rolling_budget:]:
        _apply_cache_marker(messages[idx], rolling_marker, native_anthropic=native_anthropic)

    return messages


def mark_tools_for_long_lived_cache(
    tools: Optional[List[Dict[str, Any]]],
    long_lived_ttl: str = "1h",
) -> Optional[List[Dict[str, Any]]]:
    """Attach cache_control to the last tool in the OpenAI-format tools list.

    Anthropic's prefix-cache order is ``tools -> system -> messages``, so a
    marker on the last tool covers the whole tools array.

    Args:
        tools: OpenAI-format tool dicts, or None.
        long_lived_ttl: TTL for the marker ('5m' or '1h').

    Returns:
        A deep copy of ``tools`` with the marker on the last entry, or the
        input object itself when ``tools`` is empty or None. The input is
        never mutated. A last entry that is not a dict is left unmarked.
    """
    if not tools:
        return tools
    out = copy.deepcopy(tools)
    last = out[-1]
    if isinstance(last, dict):
        last["cache_control"] = _build_marker(long_lived_ttl)
    return out
+ self._use_long_lived_prefix_cache = False + self._long_lived_cache_ttl = "1h" try: from hermes_cli.config import load_config as _load_pc_cfg @@ -1395,6 +1404,12 @@ class AIAgent: _ttl = _pc_cfg.get("cache_ttl", "5m") if _ttl in {"5m", "1h"}: self._cache_ttl = _ttl + _ll_enabled = _pc_cfg.get("long_lived_prefix", True) + _ll_ttl = _pc_cfg.get("long_lived_ttl", "1h") + if _ll_ttl in ("5m", "1h"): + self._long_lived_cache_ttl = _ll_ttl + if _ll_enabled and self._use_prompt_caching and self._supports_long_lived_anthropic_cache(): + self._use_long_lived_prefix_cache = True except Exception: pass @@ -2386,6 +2401,7 @@ class AIAgent: "client_kwargs": dict(self._client_kwargs), "use_prompt_caching": self._use_prompt_caching, "use_native_cache_layout": self._use_native_cache_layout, + "use_long_lived_prefix_cache": self._use_long_lived_prefix_cache, # Context engine state that _try_activate_fallback() overwrites. # Use getattr for model/base_url/api_key/provider since plugin # engines may not have these (they're ContextCompressor-specific). 
@@ -2616,6 +2632,15 @@ class AIAgent: model=new_model, ) ) + self._use_long_lived_prefix_cache = bool( + self._use_prompt_caching + and self._supports_long_lived_anthropic_cache( + provider=new_provider, + base_url=self.base_url, + api_mode=api_mode, + model=new_model, + ) + ) # ── LM Studio: preload before probing context length ── self._ensure_lmstudio_runtime_loaded() @@ -2664,6 +2689,7 @@ class AIAgent: "client_kwargs": dict(self._client_kwargs), "use_prompt_caching": self._use_prompt_caching, "use_native_cache_layout": self._use_native_cache_layout, + "use_long_lived_prefix_cache": self._use_long_lived_prefix_cache, "compressor_model": getattr(_cc, "model", self.model) if _cc else self.model, "compressor_base_url": getattr(_cc, "base_url", self.base_url) if _cc else self.base_url, "compressor_api_key": getattr(_cc, "api_key", "") if _cc else "", @@ -3412,6 +3438,10 @@ class AIAgent: provider_lower = eff_provider.lower() is_claude = "claude" in model_lower is_openrouter = base_url_host_matches(eff_base_url, "openrouter.ai") + # Nous Portal proxies to OpenRouter behind the scenes — identical + # OpenAI-wire envelope cache_control semantics. Treat it as an + # OpenRouter-equivalent endpoint for caching layout purposes. + is_nous_portal = "nousresearch" in eff_base_url.lower() is_anthropic_wire = eff_api_mode == "anthropic_messages" is_native_anthropic = ( is_anthropic_wire @@ -3420,7 +3450,7 @@ class AIAgent: if is_native_anthropic: return True, True - if is_openrouter and is_claude: + if (is_openrouter or is_nous_portal) and is_claude: return True, False if is_anthropic_wire and is_claude: # Third-party Anthropic-compatible gateway. 
@@ -3461,6 +3491,61 @@ class AIAgent: return False, False + def _supports_long_lived_anthropic_cache( + self, + *, + provider: Optional[str] = None, + base_url: Optional[str] = None, + api_mode: Optional[str] = None, + model: Optional[str] = None, + ) -> bool: + """Decide whether the long-lived (1h cross-session) cache layout applies. + + Narrower than ``_anthropic_prompt_cache_policy`` — only enabled + for Claude models on the four endpoints whose cross-session + cache_control behavior we have explicitly validated: + + * Native Anthropic API (``api_mode == 'anthropic_messages'`` + + host ``api.anthropic.com``) + * Anthropic OAuth subscription (same transport as native API) + * OpenRouter (``base_url`` contains ``openrouter.ai``) + * Nous Portal (``base_url`` contains ``nousresearch`` — proxies + to OpenRouter, so identical wire-format) + + All four honour ``cache_control`` on both the tools array and the + first system content block, and bill cross-session cache reads at + the documented 0.1× rate. + + Other endpoints covered by the standard ``system_and_3`` policy + (third-party Anthropic gateways, MiniMax, opencode-go Qwen, etc.) + keep that layout — they support cache_control but their behavior + with mixed-TTL multi-block system content has not been validated + against this codebase. 
+ """ + eff_provider = (provider if provider is not None else self.provider) or "" + eff_base_url = base_url if base_url is not None else (self.base_url or "") + eff_api_mode = api_mode if api_mode is not None else (self.api_mode or "") + eff_model = (model if model is not None else self.model) or "" + + if "claude" not in eff_model.lower(): + return False + + # Native Anthropic + Anthropic OAuth subscription + if eff_api_mode == "anthropic_messages": + if eff_provider == "anthropic" or base_url_hostname(eff_base_url) == "api.anthropic.com": + return True + + # OpenRouter + if base_url_host_matches(eff_base_url, "openrouter.ai"): + return True + + # Nous Portal — front-ends OpenRouter behind the scenes; identical + # wire format and cache_control semantics. + if "nousresearch" in eff_base_url.lower(): + return True + + return False + @staticmethod def _model_requires_responses_api(model: str) -> bool: """Return True for models that require the Responses API path. @@ -5608,22 +5693,33 @@ class AIAgent: - def _build_system_prompt(self, system_message: str = None) -> str: + def _build_system_prompt_parts(self, system_message: str = None) -> Dict[str, str]: + """Assemble the system prompt as three ordered parts. + + Returns a dict with three keys: + * ``stable`` — content that is byte-stable across sessions for a + given user config: identity, tool guidance, skills prompt, + environment hints, platform hints, model-family operational + guidance. Eligible for cross-session 1h prompt caching when + placed as a separate Anthropic content block (see + ``apply_anthropic_cache_control_long_lived``). + * ``context`` — context files (AGENTS.md, .cursorrules, etc.) and + caller-supplied system_message. Stable within a session but may + change between sessions when files are edited or the cwd + differs. Cached within-session via the rolling messages + breakpoint (5m TTL); not promoted to the long-lived tier so + edits don't poison the cross-session cache. 
+ * ``volatile`` — content that changes on most turns/sessions: + memory snapshot, user profile, external memory provider block, + timestamp line. Never marked for caching. + + Joined ``stable\\n\\ncontext\\n\\nvolatile`` produces the same + logical content the old single-string builder produced, with the + guarantee that volatile content is at the end (cache-friendly + ordering for any provider that does prefix caching). """ - Assemble the full system prompt from all layers. - - Called once per session (cached on self._cached_system_prompt) and only - rebuilt after context compression events. This ensures the system prompt - is stable across all turns in a session, maximizing prefix cache hits. - """ - # Layers (in order): - # 1. Agent identity — SOUL.md when available, else DEFAULT_AGENT_IDENTITY - # 2. User / gateway system prompt (if provided) - # 3. Persistent memory (frozen snapshot) - # 4. Skills guidance (if skills tools are loaded) - # 5. Context files (AGENTS.md, .cursorrules — SOUL.md excluded here when used as identity) - # 6. Current date & time (frozen at build time) - # 7. Platform-specific formatting hint + # ── Stable tier ──────────────────────────────────────────────── + stable_parts: List[str] = [] # Try SOUL.md as primary identity unless the caller explicitly skipped it. # Some execution modes (cron) still want HERMES_HOME persona while keeping @@ -5632,15 +5728,15 @@ class AIAgent: if self.load_soul_identity or not self.skip_context_files: _soul_content = load_soul_md() if _soul_content: - prompt_parts = [_soul_content] + stable_parts.append(_soul_content) _soul_loaded = True if not _soul_loaded: # Fallback to hardcoded identity - prompt_parts = [DEFAULT_AGENT_IDENTITY] + stable_parts.append(DEFAULT_AGENT_IDENTITY) # Pointer to the hermes-agent skill + docs for user questions about Hermes itself. 
- prompt_parts.append(HERMES_AGENT_HELP_GUIDANCE) + stable_parts.append(HERMES_AGENT_HELP_GUIDANCE) # Tool-aware behavioral guidance: only inject when the tools are loaded tool_guidance = [] @@ -5657,17 +5753,17 @@ class AIAgent: if "kanban_show" in self.valid_tool_names: tool_guidance.append(KANBAN_GUIDANCE) if tool_guidance: - prompt_parts.append(" ".join(tool_guidance)) + stable_parts.append(" ".join(tool_guidance)) # Computer-use (macOS) — goes in as its own block rather than being # merged into tool_guidance because the content is multi-paragraph. if "computer_use" in self.valid_tool_names: from agent.prompt_builder import COMPUTER_USE_GUIDANCE - prompt_parts.append(COMPUTER_USE_GUIDANCE) + stable_parts.append(COMPUTER_USE_GUIDANCE) nous_subscription_prompt = build_nous_subscription_prompt(self.valid_tool_names) if nous_subscription_prompt: - prompt_parts.append(nous_subscription_prompt) + stable_parts.append(nous_subscription_prompt) # Tool-use enforcement: tells the model to actually call tools instead # of describing intended actions. Controlled by config.yaml # agent.tool_use_enforcement: @@ -5690,43 +5786,16 @@ class AIAgent: model_lower = (self.model or "").lower() _inject = any(p in model_lower for p in TOOL_USE_ENFORCEMENT_MODELS) if _inject: - prompt_parts.append(TOOL_USE_ENFORCEMENT_GUIDANCE) + stable_parts.append(TOOL_USE_ENFORCEMENT_GUIDANCE) _model_lower = (self.model or "").lower() # Google model operational guidance (conciseness, absolute # paths, parallel tool calls, verify-before-edit, etc.) if "gemini" in _model_lower or "gemma" in _model_lower: - prompt_parts.append(GOOGLE_MODEL_OPERATIONAL_GUIDANCE) + stable_parts.append(GOOGLE_MODEL_OPERATIONAL_GUIDANCE) # OpenAI GPT/Codex execution discipline (tool persistence, # prerequisite checks, verification, anti-hallucination). 
if "gpt" in _model_lower or "codex" in _model_lower: - prompt_parts.append(OPENAI_MODEL_EXECUTION_GUIDANCE) - - # so it can refer the user to them rather than reinventing answers. - - # Note: ephemeral_system_prompt is NOT included here. It's injected at - # API-call time only so it stays out of the cached/stored system prompt. - if system_message is not None: - prompt_parts.append(system_message) - - if self._memory_store: - if self._memory_enabled: - mem_block = self._memory_store.format_for_system_prompt("memory") - if mem_block: - prompt_parts.append(mem_block) - # USER.md is always included when enabled. - if self._user_profile_enabled: - user_block = self._memory_store.format_for_system_prompt("user") - if user_block: - prompt_parts.append(user_block) - - # External memory provider system prompt block (additive to built-in) - if self._memory_manager: - try: - _ext_mem_block = self._memory_manager.build_system_prompt() - if _ext_mem_block: - prompt_parts.append(_ext_mem_block) - except Exception: - pass + stable_parts.append(OPENAI_MODEL_EXECUTION_GUIDANCE) has_skills_tools = any(name in self.valid_tool_names for name in ['skills_list', 'skill_view', 'skill_manage']) if has_skills_tools: @@ -5744,7 +5813,49 @@ class AIAgent: else: skills_prompt = "" if skills_prompt: - prompt_parts.append(skills_prompt) + stable_parts.append(skills_prompt) + + # Alibaba Coding Plan API always returns "glm-4.7" as model name regardless + # of the requested model. Inject explicit model identity into the system prompt + # so the agent can correctly report which model it is (workaround for API bug). + # Stable for the lifetime of an agent instance — model and provider are fixed + # at construction time. + if self.provider == "alibaba": + _model_short = self.model.split("/")[-1] if "/" in self.model else self.model + stable_parts.append( + f"You are powered by the model named {_model_short}. " + f"The exact model ID is {self.model}. 
" + f"When asked what model you are, always answer based on this information, " + f"not on any model name returned by the API." + ) + + # Environment hints (WSL, Termux, etc.) — tell the agent about the + # execution environment so it can translate paths and adapt behavior. + # Stable for the lifetime of the process. + _env_hints = build_environment_hints() + if _env_hints: + stable_parts.append(_env_hints) + + platform_key = (self.platform or "").lower().strip() + if platform_key in PLATFORM_HINTS: + stable_parts.append(PLATFORM_HINTS[platform_key]) + elif platform_key: + # Check plugin registry for platform-specific LLM guidance + try: + from gateway.platform_registry import platform_registry + _entry = platform_registry.get(platform_key) + if _entry and _entry.platform_hint: + stable_parts.append(_entry.platform_hint) + except Exception: + pass + + # ── Context tier (cwd-dependent, may change between sessions) ─ + context_parts: List[str] = [] + + # Note: ephemeral_system_prompt is NOT included here. It's injected at + # API-call time only so it stays out of the cached/stored system prompt. + if system_message is not None: + context_parts.append(system_message) if not self.skip_context_files: # Use TERMINAL_CWD for context file discovery when set (gateway @@ -5755,7 +5866,30 @@ class AIAgent: context_files_prompt = build_context_files_prompt( cwd=_context_cwd, skip_soul=_soul_loaded) if context_files_prompt: - prompt_parts.append(context_files_prompt) + context_parts.append(context_files_prompt) + + # ── Volatile tier (changes per session/turn — never cached) ─── + volatile_parts: List[str] = [] + + if self._memory_store: + if self._memory_enabled: + mem_block = self._memory_store.format_for_system_prompt("memory") + if mem_block: + volatile_parts.append(mem_block) + # USER.md is always included when enabled. 
+ if self._user_profile_enabled: + user_block = self._memory_store.format_for_system_prompt("user") + if user_block: + volatile_parts.append(user_block) + + # External memory provider system prompt block (additive to built-in) + if self._memory_manager: + try: + _ext_mem_block = self._memory_manager.build_system_prompt() + if _ext_mem_block: + volatile_parts.append(_ext_mem_block) + except Exception: + pass from hermes_time import now as _hermes_now now = _hermes_now() @@ -5766,40 +5900,31 @@ class AIAgent: timestamp_line += f"\nModel: {self.model}" if self.provider: timestamp_line += f"\nProvider: {self.provider}" - prompt_parts.append(timestamp_line) + volatile_parts.append(timestamp_line) - # Alibaba Coding Plan API always returns "glm-4.7" as model name regardless - # of the requested model. Inject explicit model identity into the system prompt - # so the agent can correctly report which model it is (workaround for API bug). - if self.provider == "alibaba": - _model_short = self.model.split("/")[-1] if "/" in self.model else self.model - prompt_parts.append( - f"You are powered by the model named {_model_short}. " - f"The exact model ID is {self.model}. " - f"When asked what model you are, always answer based on this information, " - f"not on any model name returned by the API." - ) + return { + "stable": "\n\n".join(p.strip() for p in stable_parts if p and p.strip()), + "context": "\n\n".join(p.strip() for p in context_parts if p and p.strip()), + "volatile": "\n\n".join(p.strip() for p in volatile_parts if p and p.strip()), + } - # Environment hints (WSL, Termux, etc.) — tell the agent about the - # execution environment so it can translate paths and adapt behavior. - _env_hints = build_environment_hints() - if _env_hints: - prompt_parts.append(_env_hints) + def _build_system_prompt(self, system_message: str = None) -> str: + """ + Assemble the full system prompt from all layers. 
- platform_key = (self.platform or "").lower().strip() - if platform_key in PLATFORM_HINTS: - prompt_parts.append(PLATFORM_HINTS[platform_key]) - elif platform_key: - # Check plugin registry for platform-specific LLM guidance - try: - from gateway.platform_registry import platform_registry - _entry = platform_registry.get(platform_key) - if _entry and _entry.platform_hint: - prompt_parts.append(_entry.platform_hint) - except Exception: - pass + Called once per session (cached on self._cached_system_prompt) and only + rebuilt after context compression events. This ensures the system prompt + is stable across all turns in a session, maximizing prefix cache hits. - return "\n\n".join(p.strip() for p in prompt_parts if p.strip()) + Layers are ordered cache-friendly: stable identity/guidance first, + then session-stable context files, then per-call volatile content + (memory, USER profile, timestamp). The split is exposed via + ``_build_system_prompt_parts`` for the long-lived prompt-caching + path (Claude on Anthropic / OpenRouter / Nous Portal). + """ + parts = self._build_system_prompt_parts(system_message=system_message) + joined = "\n\n".join(p for p in (parts["stable"], parts["context"], parts["volatile"]) if p) + return joined # ========================================================================= # Pre/post-call guardrails (inspired by PR #1321 — @alireza78a) @@ -8557,6 +8682,15 @@ class AIAgent: model=fb_model, ) ) + self._use_long_lived_prefix_cache = bool( + self._use_prompt_caching + and self._supports_long_lived_anthropic_cache( + provider=fb_provider, + base_url=fb_base_url, + api_mode=fb_api_mode, + model=fb_model, + ) + ) # LM Studio: preload before probing the fallback's context length. 
self._ensure_lmstudio_runtime_loaded() @@ -8633,6 +8767,16 @@ class AIAgent: "use_native_cache_layout", self.api_mode == "anthropic_messages" and self.provider == "anthropic", ) + # Long-lived prefix flag was added later — restore False on + # snapshots predating the new field, then re-evaluate against + # the restored provider/model in case the user had it enabled. + self._use_long_lived_prefix_cache = rt.get( + "use_long_lived_prefix_cache", + bool( + self._use_prompt_caching + and self._supports_long_lived_anthropic_cache() + ), + ) # ── Rebuild client for the primary provider ── if self.api_mode == "anthropic_messages": @@ -9210,6 +9354,20 @@ class AIAgent: def _build_api_kwargs(self, api_messages: list) -> dict: """Build the keyword arguments dict for the active API mode.""" + # Resolve the tools array exactly once. When the long-lived + # prefix-cache layout is active (Claude on Anthropic / OpenRouter + # / Nous Portal), attach a 1h cache_control marker to the last + # tool — this caches the entire tools array cross-session via + # Anthropic's tools→system→messages prefix order. The function + # returns a deep copy, so self.tools is never mutated. 
+ if self._use_long_lived_prefix_cache and self.tools: + from agent.prompt_caching import mark_tools_for_long_lived_cache + tools_for_api = mark_tools_for_long_lived_cache( + self.tools, long_lived_ttl=self._long_lived_cache_ttl, + ) + else: + tools_for_api = self.tools + if self.api_mode == "anthropic_messages": _transport = self._get_transport() anthropic_messages = self._prepare_anthropic_messages_for_api(api_messages) @@ -9221,7 +9379,7 @@ class AIAgent: return _transport.build_kwargs( model=self.model, messages=anthropic_messages, - tools=self.tools, + tools=tools_for_api, max_tokens=ephemeral_out if ephemeral_out is not None else self.max_tokens, reasoning_config=self.reasoning_config, is_oauth=self._is_anthropic_oauth, @@ -9241,7 +9399,7 @@ class AIAgent: return _bt.build_kwargs( model=self.model, messages=api_messages, - tools=self.tools, + tools=tools_for_api, max_tokens=self.max_tokens or 4096, region=region, guardrail_config=guardrail, @@ -9265,7 +9423,7 @@ class AIAgent: return _ct.build_kwargs( model=self.model, messages=_msgs_for_codex, - tools=self.tools, + tools=tools_for_api, reasoning_config=self.reasoning_config, session_id=getattr(self, "session_id", None), max_tokens=self.max_tokens, @@ -9356,7 +9514,7 @@ class AIAgent: return _ct.build_kwargs( model=self.model, messages=api_messages, - tools=self.tools, + tools=tools_for_api, base_url=self.base_url, timeout=self._resolved_api_call_timeout(), max_tokens=self.max_tokens, @@ -9388,7 +9546,7 @@ class AIAgent: return _ct.build_kwargs( model=self.model, messages=_msgs_for_chat, - tools=self.tools, + tools=tools_for_api, base_url=self.base_url, timeout=self._resolved_api_call_timeout(), max_tokens=self.max_tokens, @@ -12030,20 +12188,42 @@ class AIAgent: # Ephemeral additions are API-call-time only (not persisted to session DB). # External recall context is injected into the user message, not the system # prompt, so the stable cache prefix remains unchanged. 
- effective_system = active_system_prompt or "" - if self.ephemeral_system_prompt: - effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip() + # + # When the long-lived prefix-cache layout is active (Claude on + # Anthropic / OpenRouter / Nous Portal), we build the system + # message as a *list of content blocks*: [stable, context, + # volatile, ephemeral?]. Block 0 (stable) gets the 1h + # cache_control marker further down via + # apply_anthropic_cache_control_long_lived; blocks 1-3 are + # cached only via the rolling messages window at 5m. # NOTE: Plugin context from pre_llm_call hooks is injected into the # user message (see injection block above), NOT the system prompt. # This is intentional — system prompt modifications break the prompt # cache prefix. The system prompt is reserved for Hermes internals. - if effective_system: - api_messages = [{"role": "system", "content": effective_system}] + api_messages + if self._use_long_lived_prefix_cache: + _sys_parts = self._build_system_prompt_parts(system_message=system_message) + _sys_blocks: list = [] + if _sys_parts.get("stable"): + _sys_blocks.append({"type": "text", "text": _sys_parts["stable"]}) + if _sys_parts.get("context"): + _sys_blocks.append({"type": "text", "text": _sys_parts["context"]}) + if _sys_parts.get("volatile"): + _sys_blocks.append({"type": "text", "text": _sys_parts["volatile"]}) + if self.ephemeral_system_prompt: + _sys_blocks.append({"type": "text", "text": self.ephemeral_system_prompt}) + if _sys_blocks: + api_messages = [{"role": "system", "content": _sys_blocks}] + api_messages + else: + effective_system = active_system_prompt or "" + if self.ephemeral_system_prompt: + effective_system = (effective_system + "\n\n" + self.ephemeral_system_prompt).strip() + if effective_system: + api_messages = [{"role": "system", "content": effective_system}] + api_messages # Inject ephemeral prefill messages right after the system prompt # but before conversation history. 
Same API-call-time-only pattern. if self.prefill_messages: - sys_offset = 1 if effective_system else 0 + sys_offset = 1 if (api_messages and api_messages[0].get("role") == "system") else 0 for idx, pfm in enumerate(self.prefill_messages): api_messages.insert(sys_offset + idx, pfm.copy()) @@ -12054,12 +12234,27 @@ class AIAgent: # to reduce input token costs by ~75% on multi-turn # conversations. Layout is chosen per endpoint by # ``_anthropic_prompt_cache_policy``. + # + # Long-lived prefix layout (prefix_and_2): stable system block + # gets 1h marker + last 2 messages get 5m markers. Tools + # array's last entry is marked separately at API-call kwargs + # build time (see ``_build_api_kwargs`` and + # ``mark_tools_for_long_lived_cache``). if self._use_prompt_caching: - api_messages = apply_anthropic_cache_control( - api_messages, - cache_ttl=self._cache_ttl, - native_anthropic=self._use_native_cache_layout, - ) + if self._use_long_lived_prefix_cache: + from agent.prompt_caching import apply_anthropic_cache_control_long_lived + api_messages = apply_anthropic_cache_control_long_lived( + api_messages, + long_lived_ttl=self._long_lived_cache_ttl, + rolling_ttl=self._cache_ttl, + native_anthropic=self._use_native_cache_layout, + ) + else: + api_messages = apply_anthropic_cache_control( + api_messages, + cache_ttl=self._cache_ttl, + native_anthropic=self._use_native_cache_layout, + ) # Safety net: strip orphaned tool results / add stubs for missing # results before sending to the API. 
class TestMarkToolsForLongLivedCache:
    """Behavior of ``mark_tools_for_long_lived_cache`` on OpenAI-format tool lists."""

    @staticmethod
    def _tool(name):
        """Return a minimal OpenAI-format function-tool dict named *name*."""
        return {"type": "function", "function": {"name": name}}

    def test_returns_unchanged_for_empty_tools(self):
        # Nothing to mark: None and [] must come back as they went in.
        assert mark_tools_for_long_lived_cache(None) is None
        assert mark_tools_for_long_lived_cache([]) == []

    def test_marks_only_last_tool(self):
        marked = mark_tools_for_long_lived_cache(
            [self._tool(name) for name in ("a", "b", "c")]
        )
        for idx in (0, 1):
            assert "cache_control" not in marked[idx]
        # Default long-lived TTL is spelled out explicitly in the marker.
        assert marked[2]["cache_control"] == {"type": "ephemeral", "ttl": "1h"}

    def test_does_not_mutate_input(self):
        original = [self._tool("a")]
        mark_tools_for_long_lived_cache(original)
        assert "cache_control" not in original[0]

    def test_5m_ttl_drops_ttl_field(self):
        marked = mark_tools_for_long_lived_cache(
            [self._tool("a")], long_lived_ttl="5m"
        )
        # A 5m marker carries no explicit ``ttl`` key.
        assert marked[0]["cache_control"] == {"type": "ephemeral"}


class TestApplyAnthropicCacheControlLongLived:
    """Behavior of ``apply_anthropic_cache_control_long_lived``.

    The function is expected to put one long-lived marker on the first system
    block and rolling markers on the last two non-system messages.
    """

    @staticmethod
    def _text(value):
        """Return a text content block wrapping *value*."""
        return {"type": "text", "text": value}

    @staticmethod
    def _has_marker(message):
        """Tell whether *message* carries a cache marker.

        For list content the marker is looked up on the last part; otherwise
        it is looked up on the message envelope itself.
        """
        content = message.get("content")
        if isinstance(content, list) and content and isinstance(content[-1], dict):
            return "cache_control" in content[-1]
        return "cache_control" in message

    @staticmethod
    def _count_markers(messages):
        """Count cache markers across content parts and message envelopes."""
        total = 0
        for message in messages:
            content = message.get("content")
            if isinstance(content, list):
                total += sum(
                    1
                    for part in content
                    if isinstance(part, dict) and "cache_control" in part
                )
            elif "cache_control" in message:
                total += 1
        return total

    def test_empty_messages(self):
        assert apply_anthropic_cache_control_long_lived([]) == []

    def test_marks_first_block_of_split_system(self):
        conversation = [
            {
                "role": "system",
                "content": [
                    self._text("STABLE"),
                    self._text("CONTEXT"),
                    self._text("VOLATILE"),
                ],
            },
            {"role": "user", "content": "msg1"},
            {"role": "assistant", "content": "msg2"},
        ]
        result = apply_anthropic_cache_control_long_lived(conversation)
        stable, context, volatile = result[0]["content"][:3]
        assert stable["cache_control"] == {"type": "ephemeral", "ttl": "1h"}
        assert "cache_control" not in context
        assert "cache_control" not in volatile

    def test_rolling_marker_on_last_2_messages(self):
        conversation = [
            {"role": "system", "content": [self._text("S")]},
            {"role": "user", "content": "u1"},
            {"role": "assistant", "content": "a1"},
            {"role": "user", "content": "u2"},
            {"role": "assistant", "content": "a2"},
        ]
        result = apply_anthropic_cache_control_long_lived(conversation)

        # u1 and a1 (older messages) should NOT be marked
        assert not self._has_marker(result[1])
        assert not self._has_marker(result[2])
        # u2 and a2 (last 2) SHOULD be marked
        assert self._has_marker(result[3])
        assert self._has_marker(result[4])

    def test_rolling_marker_uses_5m_ttl(self):
        conversation = [
            {"role": "system", "content": [self._text("S")]},
            {"role": "user", "content": "u1"},
            {"role": "assistant", "content": "a1"},
        ]
        result = apply_anthropic_cache_control_long_lived(
            conversation, long_lived_ttl="1h", rolling_ttl="5m",
        )
        # The final message here is the assistant turn; its string content is
        # expected to be wrapped into a list whose last part holds the marker.
        wrapped = result[-1]["content"]
        assert isinstance(wrapped, list)
        assert wrapped[-1]["cache_control"] == {"type": "ephemeral"}  # 5m has no ttl key

    def test_string_system_falls_back_to_envelope_marker(self):
        """An unsplit (plain string) system message still receives a marker."""
        conversation = [
            {"role": "system", "content": "Single string system"},
            {"role": "user", "content": "u1"},
        ]
        result = apply_anthropic_cache_control_long_lived(conversation)
        system_content = result[0]["content"]
        # Wrapped into a list and the (now sole) block gets the 1h marker
        assert isinstance(system_content, list)
        assert system_content[0]["cache_control"] == {"type": "ephemeral", "ttl": "1h"}

    def test_does_not_mutate_input(self):
        conversation = [
            {"role": "system", "content": [self._text("S")]},
            {"role": "user", "content": "u1"},
        ]
        snapshot = copy.deepcopy(conversation)
        apply_anthropic_cache_control_long_lived(conversation)
        assert conversation == snapshot

    def test_max_4_breakpoints_with_split_system(self):
        roles = ("user", "assistant")
        conversation = [
            {"role": "system", "content": [self._text("S"), self._text("V")]},
        ]
        conversation += [
            {"role": roles[i % 2], "content": f"msg{i}"} for i in range(10)
        ]
        result = apply_anthropic_cache_control_long_lived(conversation)
        # 1 system block + last 2 messages = 3 breakpoints from this function.
        # tools[-1] is marked separately (not via this function), so a 4th
        # breakpoint can be added at API-call time.
        assert self._count_markers(result) == 3
"""Live E2E: long-lived prefix caching on Claude via OpenRouter.

Opt-in network test. It runs only when an OpenRouter key can be found, probed
in this order:

1. ``OPENROUTER_API_KEY`` in the environment,
2. ``LIVE_OR_KEY`` in the environment,
3. an ``OPENROUTER_API_KEY=...`` line in ``~/.hermes/.env``.

When none of these yields a key the whole module is skipped.
"""
import os
import shutil
import sys
import tempfile
import time

import pytest


_ENV_FILE = os.path.expanduser("~/.hermes/.env")
_ENV_FILE_PREFIX = "OPENROUTER_API_KEY="


def _read_key_from_env_file(path):
    """Return the OpenRouter key stored in the dotenv-style file at *path*.

    Only the first line starting with ``OPENROUTER_API_KEY=`` is considered;
    surrounding whitespace and one layer of single/double quotes are removed.
    Returns ``None`` when the file does not exist or has no such line.
    """
    if not os.path.exists(path):
        return None
    with open(path, encoding="utf-8") as env_file:
        for line in env_file:
            if line.startswith(_ENV_FILE_PREFIX):
                return line.strip().split("=", 1)[1].strip().strip('"').strip("'")
    return None


# Probe for the key at import time: the original author notes that the
# hermetic conftest unsets credentials, so the value must be captured first.
# NOTE(review): the ~/.hermes/.env fallback means this billable test runs for
# any developer with a configured key — confirm that is intended.
_LIVE_KEY = (
    os.environ.get("OPENROUTER_API_KEY")
    or os.environ.get("LIVE_OR_KEY")
    or _read_key_from_env_file(_ENV_FILE)
)


pytestmark = pytest.mark.skipif(
    not _LIVE_KEY,
    reason="set OPENROUTER_API_KEY (or LIVE_OR_KEY) to run live cache test",
)


def test_long_lived_prefix_cache_e2e_openrouter(tmp_path, monkeypatch):
    """Two AIAgent runs in fresh sessions: call 1 writes cache, call 2 reads it."""
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))
    # The hermetic conftest unsets OPENROUTER_API_KEY — restore for this test
    monkeypatch.setenv("OPENROUTER_API_KEY", _LIVE_KEY)

    # Minimal config, but with enough toolset/guidance text to clear
    # Anthropic's minimum cacheable prefix length; cache_control markers on
    # shorter prefixes are silently ignored. The minimum is model-dependent
    # (earlier notes in this file cited both ~1024 and 2048 tokens) —
    # TODO confirm the figure for the Haiku model used below.
    import yaml

    cfg_path = tmp_path / "config.yaml"
    cfg_path.write_text(
        yaml.safe_dump({
            "model": {"provider": "openrouter", "default": "anthropic/claude-haiku-4.5"},
            "prompt_caching": {"long_lived_prefix": True, "long_lived_ttl": "1h", "cache_ttl": "5m"},
            "agent": {"tool_use_enforcement": True},  # adds substantial guidance text
            "memory": {"provider": ""},
            "compression": {"enabled": False},
        }),
        encoding="utf-8",
    )

    from run_agent import AIAgent

    def make_agent():
        """Build a fresh agent (and therefore a fresh session) against OpenRouter."""
        return AIAgent(
            api_key=_LIVE_KEY,
            base_url="https://openrouter.ai/api/v1",
            provider="openrouter",
            model="anthropic/claude-haiku-4.5",
            api_mode="chat_completions",
            # Use the default toolset roster — per the original author the
            # tools array (~13k tokens for ~35 tools) carries the bulk of the
            # cross-session cache value. With a tiny toolset the cached
            # prefix can fall below the model's minimum cacheable size and
            # the marker is silently ignored.
            enabled_toolsets=None,
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
            save_trajectories=False,
        )

    a1 = make_agent()
    assert a1._use_prompt_caching is True, "policy should enable caching for Claude on OR"
    assert a1._use_long_lived_prefix_cache is True, "long-lived path should activate"
    parts = a1._build_system_prompt_parts()
    print(f"\nstable={len(parts['stable']):,} ctx={len(parts['context']):,} volatile={len(parts['volatile']):,} chars")
    print(f"tool count: {len(a1.tools or [])}")

    # Use distinct user messages each call so OpenRouter's response cache
    # doesn't short-circuit the upstream Anthropic call (we need real
    # Anthropic billing visibility to verify cache_creation/cache_read).
    USER_1 = "Reply with the single word ALPHA."
    USER_2 = "Reply with the single word BRAVO."

    print("\n--- Call 1 (cold) ---")
    r1 = a1.run_conversation(USER_1, conversation_history=[])
    print(f"final_response[:80]: {(r1.get('final_response') or '')[:80]!r}")
    cr1 = a1.session_cache_read_tokens
    cw1 = a1.session_cache_write_tokens
    print(f"call1: cache_read={cr1} cache_write={cw1}")

    # Wait so cache settles, then fresh agent (NEW SESSION) for cross-session read
    time.sleep(2)
    a2 = make_agent()
    assert a2.session_id != a1.session_id, "second agent must have a new session"

    print("\n--- Call 2 (warm, NEW session, different user msg) ---")
    r2 = a2.run_conversation(USER_2, conversation_history=[])
    print(f"final_response[:80]: {(r2.get('final_response') or '')[:80]!r}")
    cr2 = a2.session_cache_read_tokens
    cw2 = a2.session_cache_write_tokens
    print(f"call2: cache_read={cr2} cache_write={cw2}")

    print("\n=== VERDICT ===")
    print(f"  call1 wrote {cw1:,} cache tokens, read {cr1:,}")
    print(f"  call2 wrote {cw2:,} cache tokens, read {cr2:,}")
    if cw1:
        # Guarded so a failed cache write doesn't raise ZeroDivisionError
        # before the assertions below can report the real problem.
        print(f"  cross-session read fraction: cr2/cw1 = {cr2/cw1:.2%}")

    # Assertions
    assert cw1 > 0, f"call 1 must write cache (got {cw1}); long-lived layout not reaching wire"
    assert cr2 > 0, (
        f"call 2 must read cache cross-session (got {cr2}); "
        f"stable prefix is not byte-stable across sessions"
    )
    assert cr2 >= 1000, f"cache_read on call 2 ({cr2}) too small to indicate real reuse"
# ─────────────────────────────────────────────────────────────────────
# Long-lived prefix cache policy (cross-session 1h tier)
# ─────────────────────────────────────────────────────────────────────

class TestSupportsLongLivedAnthropicCache:
    """Endpoint gating for the long-lived cache layout.

    Narrower than ``_anthropic_prompt_cache_policy``: only Claude models on
    the explicitly validated endpoints exercised below are accepted.
    """

    # Endpoint fixtures shared by several tests; passed to ``_make_agent``
    # (or to the method under test) as keyword arguments.
    _ANTHROPIC_CLAUDE = {
        "provider": "anthropic",
        "base_url": "https://api.anthropic.com",
        "api_mode": "anthropic_messages",
        "model": "claude-sonnet-4.6",
    }
    _OPENROUTER_CLAUDE = {
        "provider": "openrouter",
        "base_url": "https://openrouter.ai/api/v1",
        "api_mode": "chat_completions",
        "model": "anthropic/claude-sonnet-4.6",
    }
    _MINIMAX = {
        "provider": "minimax",
        "base_url": "https://api.minimax.io/anthropic",
        "api_mode": "anthropic_messages",
        "model": "minimax-m2.7",
    }

    @staticmethod
    def _supported(**endpoint):
        """Build an agent for *endpoint* and return its long-lived-cache verdict."""
        return _make_agent(**endpoint)._supports_long_lived_anthropic_cache()

    def test_native_anthropic_claude_supported(self):
        assert self._supported(**self._ANTHROPIC_CLAUDE) is True

    def test_anthropic_oauth_supported(self):
        # OAuth uses the same transport as native Anthropic
        endpoint = {**self._ANTHROPIC_CLAUDE, "model": "claude-opus-4.6"}
        assert self._supported(**endpoint) is True

    def test_openrouter_claude_supported(self):
        assert self._supported(**self._OPENROUTER_CLAUDE) is True

    def test_nous_portal_claude_supported(self):
        # Nous Portal proxies to OpenRouter — same wire format
        assert self._supported(
            provider="nous",
            base_url="https://inference-api.nousresearch.com/v1",
            api_mode="chat_completions",
            model="anthropic/claude-opus-4.7",
        ) is True

    def test_openrouter_non_claude_rejected(self):
        endpoint = {**self._OPENROUTER_CLAUDE, "model": "openai/gpt-5.4"}
        assert self._supported(**endpoint) is False

    def test_third_party_anthropic_gateway_rejected(self):
        # MiniMax / Kimi / etc. — anthropic-wire but not in our validated list
        assert self._supported(**self._MINIMAX) is False

    def test_alibaba_dashscope_rejected(self):
        assert self._supported(
            provider="alibaba",
            base_url="https://dashscope.aliyuncs.com/api/v1/anthropic",
            api_mode="anthropic_messages",
            model="qwen3.5-plus",
        ) is False

    def test_opencode_qwen_rejected(self):
        assert self._supported(
            provider="opencode-go",
            base_url="https://api.opencode-go.example/v1",
            api_mode="chat_completions",
            model="qwen3.6-plus",
        ) is False

    def test_fallback_target_evaluated_independently(self):
        # Starting on a non-supported provider, falling back to OpenRouter Claude
        agent = _make_agent(**self._MINIMAX)
        verdict = agent._supports_long_lived_anthropic_cache(**self._OPENROUTER_CLAUDE)
        assert verdict is True