fix: sweep remaining provider-URL substring checks across codebase

Completes the hostname-hardening sweep — every substring check against a
provider host in live-routing code is now hostname-based. This closes the
same false-positive class for OpenRouter, GitHub Copilot, Kimi, Qwen,
ChatGPT/Codex, Bedrock, GitHub Models, Vercel AI Gateway, Nous, Z.AI,
Moonshot, Arcee, and MiniMax that the original PR closed for OpenAI, xAI,
and Anthropic.

New helper:
- utils.base_url_host_matches(base_url, domain) — safe counterpart to
  'domain in base_url'. Accepts hostname equality and subdomain matches;
  rejects path segments, host suffixes, and prefix collisions.

Call sites converted (real-code only; tests, optional-skills, red-teaming
scripts untouched):

run_agent.py (10 sites):
- AIAgent.__init__ Bedrock branch, ChatGPT/Codex branch (also path check)
- header cascade for openrouter / copilot / kimi / qwen / chatgpt
- interleaved-thinking trigger (openrouter + claude)
- _is_openrouter_url(), _is_qwen_portal()
- is_native_anthropic check
- github-models-vs-copilot detection (3 sites)
- reasoning-capable route gate (nousresearch, vercel, github)
- codex-backend detection in API kwargs build
- fallback api_mode Bedrock detection

agent/auxiliary_client.py (7 sites):
- extra-headers cascades in 4 distinct client-construction paths
  (resolve custom, resolve auto, OpenRouter-fallback-to-custom,
  _async_client_from_sync, resolve_provider_client explicit-custom,
  resolve_auto_with_codex)
- _is_openrouter_client() base_url sniff

agent/usage_pricing.py:
- resolve_billing_route openrouter branch

agent/model_metadata.py:
- _is_openrouter_base_url(), Bedrock context-length lookup

hermes_cli/providers.py:
- determine_api_mode Bedrock heuristic

hermes_cli/runtime_provider.py:
- _is_openrouter_url flag for API-key preference (issues #420, #560)

hermes_cli/doctor.py:
- Kimi User-Agent header for /models probes

tools/delegate_tool.py:
- subagent Codex endpoint detection

trajectory_compressor.py:
- _detect_provider() cascade (8 providers: openrouter, nous, codex, zai,
  kimi-coding, arcee, minimax-cn, minimax)

cli.py, gateway/run.py:
- /model-switch cache-enabled hint (openrouter + claude)

Bedrock detection tightened from 'bedrock-runtime in url' to
'hostname starts with bedrock-runtime. AND host is under amazonaws.com'.
ChatGPT/Codex detection tightened from 'chatgpt.com/backend-api/codex in
url' to 'hostname is chatgpt.com AND path contains /backend-api/codex'.

Tests:
- tests/test_base_url_hostname.py extended with a base_url_host_matches
  suite (exact match, subdomain, path-segment rejection, host-suffix
  rejection, host-prefix rejection, empty-input, case-insensitivity,
  trailing dot).

Validation: 651 targeted tests pass (runtime_provider, minimax, bedrock,
gemini, auxiliary, codex_cloudflare, usage_pricing, compressor_fallback,
fallback_model, openai_client_lifecycle, provider_parity, cli_provider_resolution,
delegate, credential_pool, context_compressor, plus the 4 hostname test
modules). 26-assertion E2E call-site verification across 6 modules passes.
This commit is contained in:
Teknium 2026-04-20 21:17:28 -07:00 committed by Teknium
parent cecf84daf7
commit dbb7e00e7e
13 changed files with 184 additions and 76 deletions

View file

@ -48,7 +48,7 @@ from openai import OpenAI
from agent.credential_pool import load_pool from agent.credential_pool import load_pool
from hermes_cli.config import get_hermes_home from hermes_cli.config import get_hermes_home
from hermes_constants import OPENROUTER_BASE_URL from hermes_constants import OPENROUTER_BASE_URL
from utils import base_url_hostname from utils import base_url_host_matches, base_url_hostname
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -817,9 +817,9 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if is_native_gemini_base_url(base_url): if is_native_gemini_base_url(base_url):
return GeminiNativeClient(api_key=api_key, base_url=base_url), model return GeminiNativeClient(api_key=api_key, base_url=base_url), model
extra = {} extra = {}
if "api.kimi.com" in base_url.lower(): if base_url_host_matches(base_url, "api.kimi.com"):
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"} extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
elif "api.githubcopilot.com" in base_url.lower(): elif base_url_host_matches(base_url, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
extra["default_headers"] = copilot_default_headers() extra["default_headers"] = copilot_default_headers()
@ -843,9 +843,9 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if is_native_gemini_base_url(base_url): if is_native_gemini_base_url(base_url):
return GeminiNativeClient(api_key=api_key, base_url=base_url), model return GeminiNativeClient(api_key=api_key, base_url=base_url), model
extra = {} extra = {}
if "api.kimi.com" in base_url.lower(): if base_url_host_matches(base_url, "api.kimi.com"):
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"} extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
elif "api.githubcopilot.com" in base_url.lower(): elif base_url_host_matches(base_url, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
extra["default_headers"] = copilot_default_headers() extra["default_headers"] = copilot_default_headers()
@ -994,7 +994,7 @@ def _resolve_custom_runtime() -> Tuple[Optional[str], Optional[str], Optional[st
return None, None, None return None, None, None
custom_base = custom_base.strip().rstrip("/") custom_base = custom_base.strip().rstrip("/")
if "openrouter.ai" in custom_base.lower(): if base_url_host_matches(custom_base, "openrouter.ai"):
# requested='custom' falls back to OpenRouter when no custom endpoint is # requested='custom' falls back to OpenRouter when no custom endpoint is
# configured. Treat that as "no custom endpoint" for auxiliary routing. # configured. Treat that as "no custom endpoint" for auxiliary routing.
return None, None, None return None, None, None
@ -1433,14 +1433,14 @@ def _to_async_client(sync_client, model: str):
"api_key": sync_client.api_key, "api_key": sync_client.api_key,
"base_url": str(sync_client.base_url), "base_url": str(sync_client.base_url),
} }
base_lower = str(sync_client.base_url).lower() sync_base_url = str(sync_client.base_url)
if "openrouter" in base_lower: if base_url_host_matches(sync_base_url, "openrouter.ai"):
async_kwargs["default_headers"] = dict(_OR_HEADERS) async_kwargs["default_headers"] = dict(_OR_HEADERS)
elif "api.githubcopilot.com" in base_lower: elif base_url_host_matches(sync_base_url, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
async_kwargs["default_headers"] = copilot_default_headers() async_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in base_lower: elif base_url_host_matches(sync_base_url, "api.kimi.com"):
async_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"} async_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
return AsyncOpenAI(**async_kwargs), model return AsyncOpenAI(**async_kwargs), model
@ -1621,9 +1621,9 @@ def resolve_provider_client(
provider, provider,
) )
extra = {} extra = {}
if "api.kimi.com" in custom_base.lower(): if base_url_host_matches(custom_base, "api.kimi.com"):
extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"} extra["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
elif "api.githubcopilot.com" in custom_base.lower(): elif base_url_host_matches(custom_base, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
extra["default_headers"] = copilot_default_headers() extra["default_headers"] = copilot_default_headers()
client = OpenAI(api_key=custom_key, base_url=custom_base, **extra) client = OpenAI(api_key=custom_key, base_url=custom_base, **extra)
@ -1728,9 +1728,9 @@ def resolve_provider_client(
# Provider-specific headers # Provider-specific headers
headers = {} headers = {}
if "api.kimi.com" in base_url.lower(): if base_url_host_matches(base_url, "api.kimi.com"):
headers["User-Agent"] = "KimiCLI/1.30.0" headers["User-Agent"] = "KimiCLI/1.30.0"
elif "api.githubcopilot.com" in base_url.lower(): elif base_url_host_matches(base_url, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
headers.update(copilot_default_headers()) headers.update(copilot_default_headers())
@ -2154,7 +2154,7 @@ def cleanup_stale_async_clients() -> None:
def _is_openrouter_client(client: Any) -> bool: def _is_openrouter_client(client: Any) -> bool:
for obj in (client, getattr(client, "_client", None), getattr(client, "client", None)): for obj in (client, getattr(client, "_client", None), getattr(client, "client", None)):
if obj and "openrouter" in str(getattr(obj, "base_url", "") or "").lower(): if obj and base_url_host_matches(str(getattr(obj, "base_url", "") or ""), "openrouter.ai"):
return True return True
return False return False

View file

@ -14,7 +14,7 @@ from urllib.parse import urlparse
import requests import requests
import yaml import yaml
from utils import base_url_hostname from utils import base_url_host_matches, base_url_hostname
from hermes_constants import OPENROUTER_MODELS_URL from hermes_constants import OPENROUTER_MODELS_URL
@ -220,7 +220,7 @@ def _auth_headers(api_key: str = "") -> Dict[str, str]:
def _is_openrouter_base_url(base_url: str) -> bool: def _is_openrouter_base_url(base_url: str) -> bool:
return "openrouter.ai" in _normalize_base_url(base_url).lower() return base_url_host_matches(base_url, "openrouter.ai")
def _is_custom_endpoint(base_url: str) -> bool: def _is_custom_endpoint(base_url: str) -> bool:
@ -1089,7 +1089,11 @@ def get_model_context_length(
# 4b. AWS Bedrock — use static context length table. # 4b. AWS Bedrock — use static context length table.
# Bedrock's ListFoundationModels doesn't expose context window sizes, # Bedrock's ListFoundationModels doesn't expose context window sizes,
# so we maintain a curated table in bedrock_adapter.py. # so we maintain a curated table in bedrock_adapter.py.
if provider == "bedrock" or (base_url and "bedrock-runtime" in base_url): if provider == "bedrock" or (
base_url
and base_url_hostname(base_url).startswith("bedrock-runtime.")
and base_url_host_matches(base_url, "amazonaws.com")
):
try: try:
from agent.bedrock_adapter import get_bedrock_context_length from agent.bedrock_adapter import get_bedrock_context_length
return get_bedrock_context_length(model) return get_bedrock_context_length(model)

View file

@ -6,6 +6,7 @@ from decimal import Decimal
from typing import Any, Dict, Literal, Optional from typing import Any, Dict, Literal, Optional
from agent.model_metadata import fetch_endpoint_model_metadata, fetch_model_metadata from agent.model_metadata import fetch_endpoint_model_metadata, fetch_model_metadata
from utils import base_url_host_matches
DEFAULT_PRICING = {"input": 0.0, "output": 0.0} DEFAULT_PRICING = {"input": 0.0, "output": 0.0}
@ -393,7 +394,7 @@ def resolve_billing_route(
if provider_name == "openai-codex": if provider_name == "openai-codex":
return BillingRoute(provider="openai-codex", model=model, base_url=base_url or "", billing_mode="subscription_included") return BillingRoute(provider="openai-codex", model=model, base_url=base_url or "", billing_mode="subscription_included")
if provider_name == "openrouter" or "openrouter.ai" in base: if provider_name == "openrouter" or base_url_host_matches(base_url or "", "openrouter.ai"):
return BillingRoute(provider="openrouter", model=model, base_url=base_url or "", billing_mode="official_models_api") return BillingRoute(provider="openrouter", model=model, base_url=base_url or "", billing_mode="official_models_api")
if provider_name == "anthropic": if provider_name == "anthropic":
return BillingRoute(provider="anthropic", model=model.split("/")[-1], base_url=base_url or "", billing_mode="official_docs_snapshot") return BillingRoute(provider="anthropic", model=model.split("/")[-1], base_url=base_url or "", billing_mode="official_docs_snapshot")

7
cli.py
View file

@ -74,6 +74,7 @@ _COMMAND_SPINNER_FRAMES = ("⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧
# User-managed env files should override stale shell exports on restart. # User-managed env files should override stale shell exports on restart.
from hermes_constants import get_hermes_home, display_hermes_home from hermes_constants import get_hermes_home, display_hermes_home
from hermes_cli.env_loader import load_hermes_dotenv from hermes_cli.env_loader import load_hermes_dotenv
from utils import base_url_host_matches
_hermes_home = get_hermes_home() _hermes_home = get_hermes_home()
_project_env = Path(__file__).parent / '.env' _project_env = Path(__file__).parent / '.env'
@ -1836,7 +1837,7 @@ class HermesCLI:
# Match key to resolved base_url: OpenRouter URL → prefer OPENROUTER_API_KEY, # Match key to resolved base_url: OpenRouter URL → prefer OPENROUTER_API_KEY,
# custom endpoint → prefer OPENAI_API_KEY (issue #560). # custom endpoint → prefer OPENAI_API_KEY (issue #560).
# Note: _ensure_runtime_credentials() re-resolves this before first use. # Note: _ensure_runtime_credentials() re-resolves this before first use.
if self.base_url and "openrouter.ai" in self.base_url: if self.base_url and base_url_host_matches(self.base_url, "openrouter.ai"):
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY") self.api_key = api_key or os.getenv("OPENROUTER_API_KEY") or os.getenv("OPENAI_API_KEY")
else: else:
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY") self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
@ -4996,7 +4997,7 @@ class HermesCLI:
pass pass
cache_enabled = ( cache_enabled = (
("openrouter" in (result.base_url or "").lower() and "claude" in result.new_model.lower()) (base_url_host_matches(result.base_url or "", "openrouter.ai") and "claude" in result.new_model.lower())
or result.api_mode == "anthropic_messages" or result.api_mode == "anthropic_messages"
) )
if cache_enabled: if cache_enabled:
@ -5224,7 +5225,7 @@ class HermesCLI:
# Cache notice # Cache notice
cache_enabled = ( cache_enabled = (
("openrouter" in (result.base_url or "").lower() and "claude" in result.new_model.lower()) (base_url_host_matches(result.base_url or "", "openrouter.ai") and "claude" in result.new_model.lower())
or result.api_mode == "anthropic_messages" or result.api_mode == "anthropic_messages"
) )
if cache_enabled: if cache_enabled:

View file

@ -86,7 +86,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
# Resolve Hermes home directory (respects HERMES_HOME override) # Resolve Hermes home directory (respects HERMES_HOME override)
from hermes_constants import get_hermes_home from hermes_constants import get_hermes_home
from utils import atomic_yaml_write, is_truthy_value from utils import atomic_yaml_write, base_url_host_matches, is_truthy_value
_hermes_home = get_hermes_home() _hermes_home = get_hermes_home()
# Load environment variables from ~/.hermes/.env first. # Load environment variables from ~/.hermes/.env first.
@ -5661,7 +5661,7 @@ class GatewayRunner:
# Cache notice # Cache notice
cache_enabled = ( cache_enabled = (
("openrouter" in (result.base_url or "").lower() and "claude" in result.new_model.lower()) (base_url_host_matches(result.base_url or "", "openrouter.ai") and "claude" in result.new_model.lower())
or result.api_mode == "anthropic_messages" or result.api_mode == "anthropic_messages"
) )
if cache_enabled: if cache_enabled:

View file

@ -30,6 +30,7 @@ load_dotenv(PROJECT_ROOT / ".env", override=False, encoding="utf-8")
from hermes_cli.colors import Colors, color from hermes_cli.colors import Colors, color
from hermes_constants import OPENROUTER_MODELS_URL from hermes_constants import OPENROUTER_MODELS_URL
from utils import base_url_host_matches
_PROVIDER_ENV_HINTS = ( _PROVIDER_ENV_HINTS = (
@ -952,7 +953,7 @@ def run_doctor(args):
_base = _to_openai_base_url(_base) _base = _to_openai_base_url(_base)
_url = (_base.rstrip("/") + "/models") if _base else _default_url _url = (_base.rstrip("/") + "/models") if _base else _default_url
_headers = {"Authorization": f"Bearer {_key}"} _headers = {"Authorization": f"Bearer {_key}"}
if "api.kimi.com" in _url.lower(): if base_url_host_matches(_base, "api.kimi.com"):
_headers["User-Agent"] = "KimiCLI/1.30.0" _headers["User-Agent"] = "KimiCLI/1.30.0"
_resp = httpx.get( _resp = httpx.get(
_url, _url,

View file

@ -23,7 +23,7 @@ import logging
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from utils import base_url_hostname from utils import base_url_host_matches, base_url_hostname
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -441,7 +441,7 @@ def determine_api_mode(provider: str, base_url: str = "") -> str:
return "anthropic_messages" return "anthropic_messages"
if hostname == "api.openai.com": if hostname == "api.openai.com":
return "codex_responses" return "codex_responses"
if "bedrock-runtime" in url_lower and "amazonaws.com" in url_lower: if hostname.startswith("bedrock-runtime.") and base_url_host_matches(base_url, "amazonaws.com"):
return "bedrock_converse" return "bedrock_converse"
return "chat_completions" return "chat_completions"

View file

@ -29,7 +29,7 @@ from hermes_cli.auth import (
) )
from hermes_cli.config import get_compatible_custom_providers, load_config from hermes_cli.config import get_compatible_custom_providers, load_config
from hermes_constants import OPENROUTER_BASE_URL from hermes_constants import OPENROUTER_BASE_URL
from utils import base_url_hostname from utils import base_url_host_matches, base_url_hostname
def _normalize_custom_provider_name(value: str) -> str: def _normalize_custom_provider_name(value: str) -> str:
@ -482,7 +482,7 @@ def _resolve_openrouter_runtime(
# When hitting a custom endpoint (e.g. Z.ai, local LLM), prefer # When hitting a custom endpoint (e.g. Z.ai, local LLM), prefer
# OPENAI_API_KEY so the OpenRouter key doesn't leak to an unrelated # OPENAI_API_KEY so the OpenRouter key doesn't leak to an unrelated
# provider (issues #420, #560). # provider (issues #420, #560).
_is_openrouter_url = "openrouter.ai" in base_url _is_openrouter_url = base_url_host_matches(base_url, "openrouter.ai")
if _is_openrouter_url: if _is_openrouter_url:
api_key_candidates = [ api_key_candidates = [
explicit_api_key, explicit_api_key,

View file

@ -124,7 +124,7 @@ from agent.trajectory import (
convert_scratchpad_to_think, has_incomplete_scratchpad, convert_scratchpad_to_think, has_incomplete_scratchpad,
save_trajectory as _save_trajectory_to_file, save_trajectory as _save_trajectory_to_file,
) )
from utils import atomic_json_write, base_url_hostname, env_var_enabled from utils import atomic_json_write, base_url_host_matches, base_url_hostname, env_var_enabled
@ -845,7 +845,10 @@ class AIAgent:
self.api_mode = "codex_responses" self.api_mode = "codex_responses"
elif self.provider == "xai": elif self.provider == "xai":
self.api_mode = "codex_responses" self.api_mode = "codex_responses"
elif (provider_name is None) and "chatgpt.com/backend-api/codex" in self._base_url_lower: elif (provider_name is None) and (
self._base_url_hostname == "chatgpt.com"
and "/backend-api/codex" in self._base_url_lower
):
self.api_mode = "codex_responses" self.api_mode = "codex_responses"
self.provider = "openai-codex" self.provider = "openai-codex"
elif (provider_name is None) and self._base_url_hostname == "api.x.ai": elif (provider_name is None) and self._base_url_hostname == "api.x.ai":
@ -859,8 +862,12 @@ class AIAgent:
# use a URL convention ending in /anthropic. Auto-detect these so the # use a URL convention ending in /anthropic. Auto-detect these so the
# Anthropic Messages API adapter is used instead of chat completions. # Anthropic Messages API adapter is used instead of chat completions.
self.api_mode = "anthropic_messages" self.api_mode = "anthropic_messages"
elif self.provider == "bedrock" or "bedrock-runtime" in self._base_url_lower: elif self.provider == "bedrock" or (
# AWS Bedrock — auto-detect from provider name or base URL. self._base_url_hostname.startswith("bedrock-runtime.")
and base_url_host_matches(self._base_url_lower, "amazonaws.com")
):
# AWS Bedrock — auto-detect from provider name or base URL
# (bedrock-runtime.<region>.amazonaws.com).
self.api_mode = "bedrock_converse" self.api_mode = "bedrock_converse"
else: else:
self.api_mode = "chat_completions" self.api_mode = "chat_completions"
@ -1158,23 +1165,23 @@ class AIAgent:
client_kwargs["command"] = self.acp_command client_kwargs["command"] = self.acp_command
client_kwargs["args"] = self.acp_args client_kwargs["args"] = self.acp_args
effective_base = base_url effective_base = base_url
if "openrouter" in effective_base.lower(): if base_url_host_matches(effective_base, "openrouter.ai"):
client_kwargs["default_headers"] = { client_kwargs["default_headers"] = {
"HTTP-Referer": "https://hermes-agent.nousresearch.com", "HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-OpenRouter-Title": "Hermes Agent", "X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent", "X-OpenRouter-Categories": "productivity,cli-agent",
} }
elif "api.githubcopilot.com" in effective_base.lower(): elif base_url_host_matches(effective_base, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
client_kwargs["default_headers"] = copilot_default_headers() client_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in effective_base.lower(): elif base_url_host_matches(effective_base, "api.kimi.com"):
client_kwargs["default_headers"] = { client_kwargs["default_headers"] = {
"User-Agent": "KimiCLI/1.30.0", "User-Agent": "KimiCLI/1.30.0",
} }
elif "portal.qwen.ai" in effective_base.lower(): elif base_url_host_matches(effective_base, "portal.qwen.ai"):
client_kwargs["default_headers"] = _qwen_portal_headers() client_kwargs["default_headers"] = _qwen_portal_headers()
elif "chatgpt.com" in effective_base.lower(): elif base_url_host_matches(effective_base, "chatgpt.com"):
from agent.auxiliary_client import _codex_cloudflare_headers from agent.auxiliary_client import _codex_cloudflare_headers
client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key) client_kwargs["default_headers"] = _codex_cloudflare_headers(api_key)
else: else:
@ -1230,7 +1237,7 @@ class AIAgent:
# stream tool call arguments token-by-token, keeping the # stream tool call arguments token-by-token, keeping the
# connection alive. # connection alive.
_effective_base = str(client_kwargs.get("base_url", "")).lower() _effective_base = str(client_kwargs.get("base_url", "")).lower()
if "openrouter" in _effective_base and "claude" in (self.model or "").lower(): if base_url_host_matches(_effective_base, "openrouter.ai") and "claude" in (self.model or "").lower():
headers = client_kwargs.get("default_headers") or {} headers = client_kwargs.get("default_headers") or {}
existing_beta = headers.get("x-anthropic-beta", "") existing_beta = headers.get("x-anthropic-beta", "")
_FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14" _FINE_GRAINED = "fine-grained-tool-streaming-2025-05-14"
@ -2328,7 +2335,7 @@ class AIAgent:
def _is_openrouter_url(self) -> bool: def _is_openrouter_url(self) -> bool:
"""Return True when the base URL targets OpenRouter.""" """Return True when the base URL targets OpenRouter."""
return "openrouter" in self._base_url_lower return base_url_host_matches(self._base_url_lower, "openrouter.ai")
def _anthropic_prompt_cache_policy( def _anthropic_prompt_cache_policy(
self, self,
@ -2363,7 +2370,7 @@ class AIAgent:
base_lower = eff_base_url.lower() base_lower = eff_base_url.lower()
is_claude = "claude" in eff_model.lower() is_claude = "claude" in eff_model.lower()
is_openrouter = "openrouter" in base_lower is_openrouter = base_url_host_matches(eff_base_url, "openrouter.ai")
is_anthropic_wire = eff_api_mode == "anthropic_messages" is_anthropic_wire = eff_api_mode == "anthropic_messages"
is_native_anthropic = ( is_native_anthropic = (
is_anthropic_wire is_anthropic_wire
@ -5002,20 +5009,19 @@ class AIAgent:
def _apply_client_headers_for_base_url(self, base_url: str) -> None: def _apply_client_headers_for_base_url(self, base_url: str) -> None:
from agent.auxiliary_client import _AI_GATEWAY_HEADERS, _OR_HEADERS from agent.auxiliary_client import _AI_GATEWAY_HEADERS, _OR_HEADERS
normalized = (base_url or "").lower() if base_url_host_matches(base_url, "openrouter.ai"):
if "openrouter" in normalized:
self._client_kwargs["default_headers"] = dict(_OR_HEADERS) self._client_kwargs["default_headers"] = dict(_OR_HEADERS)
elif "ai-gateway.vercel.sh" in normalized: elif base_url_host_matches(base_url, "ai-gateway.vercel.sh"):
self._client_kwargs["default_headers"] = dict(_AI_GATEWAY_HEADERS) self._client_kwargs["default_headers"] = dict(_AI_GATEWAY_HEADERS)
elif "api.githubcopilot.com" in normalized: elif base_url_host_matches(base_url, "api.githubcopilot.com"):
from hermes_cli.models import copilot_default_headers from hermes_cli.models import copilot_default_headers
self._client_kwargs["default_headers"] = copilot_default_headers() self._client_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in normalized: elif base_url_host_matches(base_url, "api.kimi.com"):
self._client_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"} self._client_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.30.0"}
elif "portal.qwen.ai" in normalized: elif base_url_host_matches(base_url, "portal.qwen.ai"):
self._client_kwargs["default_headers"] = _qwen_portal_headers() self._client_kwargs["default_headers"] = _qwen_portal_headers()
elif "chatgpt.com" in normalized: elif base_url_host_matches(base_url, "chatgpt.com"):
from agent.auxiliary_client import _codex_cloudflare_headers from agent.auxiliary_client import _codex_cloudflare_headers
self._client_kwargs["default_headers"] = _codex_cloudflare_headers( self._client_kwargs["default_headers"] = _codex_cloudflare_headers(
self._client_kwargs.get("api_key", "") self._client_kwargs.get("api_key", "")
@ -6163,7 +6169,10 @@ class AIAgent:
# provider-specific exceptions like Copilot gpt-5-mini on # provider-specific exceptions like Copilot gpt-5-mini on
# chat completions. # chat completions.
fb_api_mode = "codex_responses" fb_api_mode = "codex_responses"
elif fb_provider == "bedrock" or "bedrock-runtime" in fb_base_url.lower(): elif fb_provider == "bedrock" or (
base_url_hostname(fb_base_url).startswith("bedrock-runtime.")
and base_url_host_matches(fb_base_url, "amazonaws.com")
):
fb_api_mode = "bedrock_converse" fb_api_mode = "bedrock_converse"
old_model = self.model old_model = self.model
@ -6596,7 +6605,7 @@ class AIAgent:
def _is_qwen_portal(self) -> bool: def _is_qwen_portal(self) -> bool:
"""Return True when the base URL targets Qwen Portal.""" """Return True when the base URL targets Qwen Portal."""
return "portal.qwen.ai" in self._base_url_lower return base_url_host_matches(self._base_url_lower, "portal.qwen.ai")
def _qwen_prepare_chat_messages(self, api_messages: list) -> list: def _qwen_prepare_chat_messages(self, api_messages: list) -> list:
prepared = copy.deepcopy(api_messages) prepared = copy.deepcopy(api_messages)
@ -6717,12 +6726,15 @@ class AIAgent:
instructions = DEFAULT_AGENT_IDENTITY instructions = DEFAULT_AGENT_IDENTITY
is_github_responses = ( is_github_responses = (
"models.github.ai" in self.base_url.lower() base_url_host_matches(self.base_url, "models.github.ai")
or "api.githubcopilot.com" in self.base_url.lower() or base_url_host_matches(self.base_url, "api.githubcopilot.com")
) )
is_codex_backend = ( is_codex_backend = (
self.provider == "openai-codex" self.provider == "openai-codex"
or "chatgpt.com/backend-api/codex" in self.base_url.lower() or (
self._base_url_hostname == "chatgpt.com"
and "/backend-api/codex" in self._base_url_lower
)
) )
# Resolve reasoning effort: config > default (medium) # Resolve reasoning effort: config > default (medium)
@ -6923,8 +6935,8 @@ class AIAgent:
_is_openrouter = self._is_openrouter_url() _is_openrouter = self._is_openrouter_url()
_is_github_models = ( _is_github_models = (
"models.github.ai" in self._base_url_lower base_url_host_matches(self._base_url_lower, "models.github.ai")
or "api.githubcopilot.com" in self._base_url_lower or base_url_host_matches(self._base_url_lower, "api.githubcopilot.com")
) )
# Provider preferences (only, ignore, order, sort) are OpenRouter- # Provider preferences (only, ignore, order, sort) are OpenRouter-
@ -7000,11 +7012,14 @@ class AIAgent:
Some providers/routes reject `reasoning` with 400s, so gate it to Some providers/routes reject `reasoning` with 400s, so gate it to
known reasoning-capable model families and direct Nous Portal. known reasoning-capable model families and direct Nous Portal.
""" """
if "nousresearch" in self._base_url_lower: if base_url_host_matches(self._base_url_lower, "nousresearch.com"):
return True return True
if "ai-gateway.vercel.sh" in self._base_url_lower: if base_url_host_matches(self._base_url_lower, "ai-gateway.vercel.sh"):
return True return True
if "models.github.ai" in self._base_url_lower or "api.githubcopilot.com" in self._base_url_lower: if (
base_url_host_matches(self._base_url_lower, "models.github.ai")
or base_url_host_matches(self._base_url_lower, "api.githubcopilot.com")
):
try: try:
from hermes_cli.models import github_model_reasoning_efforts from hermes_cli.models import github_model_reasoning_efforts
@ -10566,7 +10581,7 @@ class AIAgent:
self._vprint(f"{self.log_prefix} 💡 Your API key was rejected by the provider. Check:", force=True) self._vprint(f"{self.log_prefix} 💡 Your API key was rejected by the provider. Check:", force=True)
self._vprint(f"{self.log_prefix} • Is the key valid? Run: hermes setup", force=True) self._vprint(f"{self.log_prefix} • Is the key valid? Run: hermes setup", force=True)
self._vprint(f"{self.log_prefix} • Does your account have access to {_model}?", force=True) self._vprint(f"{self.log_prefix} • Does your account have access to {_model}?", force=True)
if "openrouter" in str(_base).lower(): if base_url_host_matches(str(_base), "openrouter.ai"):
self._vprint(f"{self.log_prefix} • Check credits: https://openrouter.ai/settings/credits", force=True) self._vprint(f"{self.log_prefix} • Check credits: https://openrouter.ai/settings/credits", force=True)
else: else:
self._vprint(f"{self.log_prefix} 💡 This type of error won't be fixed by retrying.", force=True) self._vprint(f"{self.log_prefix} 💡 This type of error won't be fixed by retrying.", force=True)

View file

@ -1,13 +1,17 @@
"""Targeted tests for ``utils.base_url_hostname``. """Targeted tests for ``utils.base_url_hostname`` and ``base_url_host_matches``.
The helper is used across provider routing, auxiliary client, and setup These helpers are used across provider routing, auxiliary client, setup
wizards to avoid the substring-match false-positive class documented in wizards, billing routes, and the trajectory compressor to avoid the
substring-match false-positive class documented in
tests/agent/test_direct_provider_url_detection.py. tests/agent/test_direct_provider_url_detection.py.
""" """
from __future__ import annotations from __future__ import annotations
from utils import base_url_hostname from utils import base_url_hostname, base_url_host_matches
# ─── base_url_hostname ────────────────────────────────────────────────────
def test_empty_returns_empty_string(): def test_empty_returns_empty_string():
@ -31,18 +35,15 @@ def test_hostname_case_insensitive():
def test_trailing_dot_stripped(): def test_trailing_dot_stripped():
# Fully-qualified hostnames may include a trailing dot.
assert base_url_hostname("https://api.openai.com./v1") == "api.openai.com" assert base_url_hostname("https://api.openai.com./v1") == "api.openai.com"
def test_path_containing_provider_host_is_not_the_hostname(): def test_path_containing_provider_host_is_not_the_hostname():
# The key regression — proxy paths must never be misread as the host.
assert base_url_hostname("https://proxy.example.test/api.openai.com/v1") == "proxy.example.test" assert base_url_hostname("https://proxy.example.test/api.openai.com/v1") == "proxy.example.test"
assert base_url_hostname("https://proxy.example.test/api.anthropic.com/v1") == "proxy.example.test" assert base_url_hostname("https://proxy.example.test/api.anthropic.com/v1") == "proxy.example.test"
def test_host_suffix_is_not_the_provider(): def test_host_suffix_is_not_the_provider():
# A hostname that merely ends with the provider domain is not the provider.
assert base_url_hostname("https://api.openai.com.example/v1") == "api.openai.com.example" assert base_url_hostname("https://api.openai.com.example/v1") == "api.openai.com.example"
assert base_url_hostname("https://api.x.ai.example/v1") == "api.x.ai.example" assert base_url_hostname("https://api.x.ai.example/v1") == "api.x.ai.example"
@ -53,3 +54,55 @@ def test_port_is_ignored():
def test_whitespace_stripped(): def test_whitespace_stripped():
assert base_url_hostname(" https://api.openai.com/v1 ") == "api.openai.com" assert base_url_hostname(" https://api.openai.com/v1 ") == "api.openai.com"
# ─── base_url_host_matches ────────────────────────────────────────────────
class TestBaseUrlHostMatchesExact:
def test_exact_domain_matches(self):
assert base_url_host_matches("https://openrouter.ai/api/v1", "openrouter.ai") is True
assert base_url_host_matches("https://moonshot.ai", "moonshot.ai") is True
def test_subdomain_matches(self):
# A subdomain of the registered domain should match — needed for
# api.moonshot.ai / api.kimi.com / portal.qwen.ai lookups that
# accept both the bare registrable domain and any subdomain under it.
assert base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") is True
assert base_url_host_matches("https://api.kimi.com/v1", "api.kimi.com") is True
assert base_url_host_matches("https://portal.qwen.ai/v1", "portal.qwen.ai") is True
class TestBaseUrlHostMatchesNegatives:
"""The reason this helper exists — defend against substring collisions."""
def test_path_segment_containing_domain_does_not_match(self):
assert base_url_host_matches("https://evil.test/moonshot.ai/v1", "moonshot.ai") is False
assert base_url_host_matches("https://proxy.example.test/openrouter.ai/v1", "openrouter.ai") is False
assert base_url_host_matches("https://proxy/api.kimi.com/v1", "api.kimi.com") is False
def test_host_suffix_does_not_match(self):
# Attacker-controlled hosts that end with the domain string are not
# the domain.
assert base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai") is False
assert base_url_host_matches("https://openrouter.ai.example/v1", "openrouter.ai") is False
def test_host_prefix_does_not_match(self):
# "fake-openrouter.ai" is not a subdomain of openrouter.ai.
assert base_url_host_matches("https://fake-openrouter.ai/v1", "openrouter.ai") is False
class TestBaseUrlHostMatchesEdgeCases:
def test_empty_base_url_returns_false(self):
assert base_url_host_matches("", "openrouter.ai") is False
assert base_url_host_matches(None, "openrouter.ai") is False # type: ignore[arg-type]
def test_empty_domain_returns_false(self):
assert base_url_host_matches("https://openrouter.ai/v1", "") is False
def test_case_insensitive(self):
assert base_url_host_matches("https://OpenRouter.AI/v1", "openrouter.ai") is True
assert base_url_host_matches("https://openrouter.ai/v1", "OPENROUTER.AI") is True
def test_trailing_dot_on_domain_stripped(self):
assert base_url_host_matches("https://openrouter.ai/v1", "openrouter.ai.") is True

View file

@ -1025,7 +1025,10 @@ def _resolve_delegation_credentials(cfg: dict, parent_agent) -> dict:
base_lower = configured_base_url.lower() base_lower = configured_base_url.lower()
provider = "custom" provider = "custom"
api_mode = "chat_completions" api_mode = "chat_completions"
if "chatgpt.com/backend-api/codex" in base_lower: if (
base_url_hostname(configured_base_url) == "chatgpt.com"
and "/backend-api/codex" in base_lower
):
provider = "openai-codex" provider = "openai-codex"
api_mode = "codex_responses" api_mode = "codex_responses"
elif base_url_hostname(configured_base_url) == "api.anthropic.com": elif base_url_hostname(configured_base_url) == "api.anthropic.com":

View file

@ -40,6 +40,8 @@ from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple, Callable from typing import List, Dict, Any, Optional, Tuple, Callable
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime from datetime import datetime
from utils import base_url_host_matches, base_url_hostname
import fire import fire
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn, TimeRemainingColumn from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn, TimeRemainingColumn
from rich.console import Console from rich.console import Console
@ -432,22 +434,29 @@ class TrajectoryCompressor:
def _detect_provider(self) -> str: def _detect_provider(self) -> str:
"""Detect the provider name from the configured base_url.""" """Detect the provider name from the configured base_url."""
url = (self.config.base_url or "").lower() url = self.config.base_url or ""
if "openrouter" in url: if base_url_host_matches(url, "openrouter.ai"):
return "openrouter" return "openrouter"
if "nousresearch.com" in url: if base_url_host_matches(url, "nousresearch.com"):
return "nous" return "nous"
if "chatgpt.com/backend-api/codex" in url: if (
base_url_hostname(url) == "chatgpt.com"
and "/backend-api/codex" in url.lower()
):
return "codex" return "codex"
if "api.z.ai" in url: if base_url_host_matches(url, "z.ai"):
return "zai" return "zai"
if "moonshot.ai" in url or "moonshot.cn" in url or "api.kimi.com" in url: if (
base_url_host_matches(url, "moonshot.ai")
or base_url_host_matches(url, "moonshot.cn")
or base_url_host_matches(url, "api.kimi.com")
):
return "kimi-coding" return "kimi-coding"
if "arcee.ai" in url: if base_url_host_matches(url, "arcee.ai"):
return "arcee" return "arcee"
if "minimaxi.com" in url: if base_url_host_matches(url, "minimaxi.com"):
return "minimax-cn" return "minimax-cn"
if "minimax.io" in url: if base_url_host_matches(url, "minimax.io"):
return "minimax" return "minimax"
# Unknown base_url — not a known provider # Unknown base_url — not a known provider
return "" return ""

View file

@ -216,3 +216,24 @@ def base_url_hostname(base_url: str) -> str:
parsed = urlparse(raw if "://" in raw else f"//{raw}") parsed = urlparse(raw if "://" in raw else f"//{raw}")
return (parsed.hostname or "").lower().rstrip(".") return (parsed.hostname or "").lower().rstrip(".")
def base_url_host_matches(base_url: str, domain: str) -> bool:
"""Return True when the base URL's hostname is ``domain`` or a subdomain.
Safer counterpart to ``domain in base_url``, which is the substring
false-positive class documented on ``base_url_hostname``. Accepts bare
hosts, full URLs, and URLs with paths.
base_url_host_matches("https://api.moonshot.ai/v1", "moonshot.ai") == True
base_url_host_matches("https://moonshot.ai", "moonshot.ai") == True
base_url_host_matches("https://evil.com/moonshot.ai/v1", "moonshot.ai") == False
base_url_host_matches("https://moonshot.ai.evil/v1", "moonshot.ai") == False
"""
hostname = base_url_hostname(base_url)
if not hostname:
return False
domain = (domain or "").strip().lower().rstrip(".")
if not domain:
return False
return hostname == domain or hostname.endswith("." + domain)