Add pooled same-provider credential fallback

This commit is contained in:
kshitijk4poor 2026-03-23 22:37:13 +05:30
parent 934fbe3c06
commit b17e5c101d
18 changed files with 2872 additions and 195 deletions

View file

@ -201,60 +201,75 @@ def is_claude_code_token_valid(creds: Dict[str, Any]) -> bool:
return now_ms < (expires_at - 60_000) return now_ms < (expires_at - 60_000)
def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]: def refresh_anthropic_oauth_pure(refresh_token: str, *, use_json: bool = False) -> Dict[str, Any]:
"""Attempt to refresh an expired Claude Code OAuth token. """Refresh an Anthropic OAuth token without mutating local credential files."""
import time
Uses the same token endpoint and client_id as Claude Code / OpenCode.
Only works for credentials that have a refresh token (from claude /login
or claude setup-token with OAuth flow).
Returns the new access token, or None if refresh fails.
"""
import urllib.parse import urllib.parse
import urllib.request import urllib.request
refresh_token = creds.get("refreshToken", "")
if not refresh_token: if not refresh_token:
logger.debug("No refresh token available — cannot refresh") raise ValueError("refresh_token is required")
return None
# Client ID used by Claude Code's OAuth flow client_id = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e" if use_json:
data = json.dumps({
data = urllib.parse.urlencode({ "grant_type": "refresh_token",
"grant_type": "refresh_token", "refresh_token": refresh_token,
"refresh_token": refresh_token, "client_id": client_id,
"client_id": CLIENT_ID, }).encode()
}).encode() content_type = "application/json"
else:
data = urllib.parse.urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": client_id,
}).encode()
content_type = "application/x-www-form-urlencoded"
req = urllib.request.Request( req = urllib.request.Request(
"https://console.anthropic.com/v1/oauth/token", "https://console.anthropic.com/v1/oauth/token",
data=data, data=data,
headers={ headers={
"Content-Type": "application/x-www-form-urlencoded", "Content-Type": content_type,
"User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)", "User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)",
}, },
method="POST", method="POST",
) )
try: with urllib.request.urlopen(req, timeout=10) as resp:
with urllib.request.urlopen(req, timeout=10) as resp: result = json.loads(resp.read().decode())
result = json.loads(resp.read().decode())
new_access = result.get("access_token", "")
new_refresh = result.get("refresh_token", refresh_token)
expires_in = result.get("expires_in", 3600) # seconds
if new_access: access_token = result.get("access_token", "")
import time if not access_token:
new_expires_ms = int(time.time() * 1000) + (expires_in * 1000) raise ValueError("Anthropic refresh response was missing access_token")
# Write refreshed credentials back to ~/.claude/.credentials.json next_refresh = result.get("refresh_token", refresh_token)
_write_claude_code_credentials(new_access, new_refresh, new_expires_ms) expires_in = result.get("expires_in", 3600)
logger.debug("Successfully refreshed Claude Code OAuth token") return {
return new_access "access_token": access_token,
"refresh_token": next_refresh,
"expires_at_ms": int(time.time() * 1000) + (expires_in * 1000),
}
def _refresh_oauth_token(creds: Dict[str, Any]) -> Optional[str]:
"""Attempt to refresh an expired Claude Code OAuth token."""
refresh_token = creds.get("refreshToken", "")
if not refresh_token:
logger.debug("No refresh token available — cannot refresh")
return None
try:
refreshed = refresh_anthropic_oauth_pure(refresh_token, use_json=False)
_write_claude_code_credentials(
refreshed["access_token"],
refreshed["refresh_token"],
refreshed["expires_at_ms"],
)
logger.debug("Successfully refreshed Claude Code OAuth token")
return refreshed["access_token"]
except Exception as e: except Exception as e:
logger.debug("Failed to refresh Claude Code token: %s", e) logger.debug("Failed to refresh Claude Code token: %s", e)
return None
return None
def _write_claude_code_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None: def _write_claude_code_credentials(access_token: str, refresh_token: str, expires_at_ms: int) -> None:
@ -466,14 +481,8 @@ def _generate_pkce() -> tuple:
return verifier, challenge return verifier, challenge
def run_hermes_oauth_login() -> Optional[str]: def run_hermes_oauth_login_pure() -> Optional[Dict[str, Any]]:
"""Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription. """Run Hermes-native OAuth PKCE flow and return credential state."""
Opens a browser to claude.ai for authorization, prompts for the code,
exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json.
Returns the access token on success, None on failure.
"""
import time import time
import webbrowser import webbrowser
@ -564,10 +573,32 @@ def run_hermes_oauth_login() -> Optional[str]:
print("No access token in response.") print("No access token in response.")
return None return None
# Store credentials
expires_at_ms = int(time.time() * 1000) + (expires_in * 1000) expires_at_ms = int(time.time() * 1000) + (expires_in * 1000)
_save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms) return {
"access_token": access_token,
"refresh_token": refresh_token,
"expires_at_ms": expires_at_ms,
}
def run_hermes_oauth_login() -> Optional[str]:
"""Run Hermes-native OAuth PKCE flow for Claude Pro/Max subscription.
Opens a browser to claude.ai for authorization, prompts for the code,
exchanges it for tokens, and stores them in ~/.hermes/.anthropic_oauth.json.
Returns the access token on success, None on failure.
"""
result = run_hermes_oauth_login_pure()
if not result:
return None
access_token = result["access_token"]
refresh_token = result["refresh_token"]
expires_at_ms = result["expires_at_ms"]
# Store credentials
_save_hermes_oauth_credentials(access_token, refresh_token, expires_at_ms)
# Also write to Claude Code's credential file for backward compat # Also write to Claude Code's credential file for backward compat
_write_claude_code_credentials(access_token, refresh_token, expires_at_ms) _write_claude_code_credentials(access_token, refresh_token, expires_at_ms)
@ -607,44 +638,27 @@ def refresh_hermes_oauth_token() -> Optional[str]:
Returns the new access token, or None if refresh fails. Returns the new access token, or None if refresh fails.
""" """
import time
import urllib.request
creds = read_hermes_oauth_credentials() creds = read_hermes_oauth_credentials()
if not creds or not creds.get("refreshToken"): if not creds or not creds.get("refreshToken"):
return None return None
try: try:
data = json.dumps({ refreshed = refresh_anthropic_oauth_pure(
"grant_type": "refresh_token", creds["refreshToken"],
"refresh_token": creds["refreshToken"], use_json=True,
"client_id": _OAUTH_CLIENT_ID,
}).encode()
req = urllib.request.Request(
_OAUTH_TOKEN_URL,
data=data,
headers={
"Content-Type": "application/json",
"User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)",
},
method="POST",
) )
_save_hermes_oauth_credentials(
with urllib.request.urlopen(req, timeout=10) as resp: refreshed["access_token"],
result = json.loads(resp.read().decode()) refreshed["refresh_token"],
refreshed["expires_at_ms"],
new_access = result.get("access_token", "") )
new_refresh = result.get("refresh_token", creds["refreshToken"]) _write_claude_code_credentials(
expires_in = result.get("expires_in", 3600) refreshed["access_token"],
refreshed["refresh_token"],
if new_access: refreshed["expires_at_ms"],
new_expires_ms = int(time.time() * 1000) + (expires_in * 1000) )
_save_hermes_oauth_credentials(new_access, new_refresh, new_expires_ms) logger.debug("Successfully refreshed Hermes OAuth token")
# Also update Claude Code's credential file return refreshed["access_token"]
_write_claude_code_credentials(new_access, new_refresh, new_expires_ms)
logger.debug("Successfully refreshed Hermes OAuth token")
return new_access
except Exception as e: except Exception as e:
logger.debug("Failed to refresh Hermes OAuth token: %s", e) logger.debug("Failed to refresh Hermes OAuth token: %s", e)

View file

@ -47,6 +47,7 @@ from typing import Any, Dict, List, Optional, Tuple
from openai import OpenAI from openai import OpenAI
from agent.credential_pool import load_pool
from hermes_cli.config import get_hermes_home from hermes_cli.config import get_hermes_home
from hermes_constants import OPENROUTER_BASE_URL from hermes_constants import OPENROUTER_BASE_URL
@ -96,6 +97,45 @@ _CODEX_AUX_MODEL = "gpt-5.2-codex"
_CODEX_AUX_BASE_URL = "https://chatgpt.com/backend-api/codex" _CODEX_AUX_BASE_URL = "https://chatgpt.com/backend-api/codex"
def _select_pool_entry(provider: str) -> Tuple[bool, Optional[Any]]:
"""Return (pool_exists_for_provider, selected_entry)."""
try:
pool = load_pool(provider)
except Exception as exc:
logger.debug("Auxiliary client: could not load pool for %s: %s", provider, exc)
return False, None
if not pool or not pool.has_credentials():
return False, None
try:
return True, pool.select()
except Exception as exc:
logger.debug("Auxiliary client: could not select pool entry for %s: %s", provider, exc)
return True, None
def _pool_runtime_api_key(entry: Any) -> str:
if entry is None:
return ""
return str(
getattr(entry, "runtime_api_key", None)
or getattr(entry, "agent_key", None)
or getattr(entry, "access_token", "")
or ""
).strip()
def _pool_runtime_base_url(entry: Any, fallback: str = "") -> str:
if entry is None:
return str(fallback or "").strip().rstrip("/")
return str(
getattr(entry, "runtime_base_url", None)
or getattr(entry, "inference_base_url", None)
or getattr(entry, "base_url", None)
or fallback
or ""
).strip().rstrip("/")
# ── Codex Responses → chat.completions adapter ───────────────────────────── # ── Codex Responses → chat.completions adapter ─────────────────────────────
# All auxiliary consumers call client.chat.completions.create(**kwargs) and # All auxiliary consumers call client.chat.completions.create(**kwargs) and
# read response.choices[0].message.content. This adapter translates those # read response.choices[0].message.content. This adapter translates those
@ -439,6 +479,21 @@ def _read_nous_auth() -> Optional[dict]:
Returns the provider state dict if Nous is active with tokens, Returns the provider state dict if Nous is active with tokens,
otherwise None. otherwise None.
""" """
pool_present, entry = _select_pool_entry("nous")
if pool_present:
if entry is None:
return None
return {
"access_token": getattr(entry, "access_token", ""),
"refresh_token": getattr(entry, "refresh_token", None),
"agent_key": getattr(entry, "agent_key", None),
"inference_base_url": _pool_runtime_base_url(entry, _NOUS_DEFAULT_BASE_URL),
"portal_base_url": getattr(entry, "portal_base_url", None),
"client_id": getattr(entry, "client_id", None),
"scope": getattr(entry, "scope", None),
"token_type": getattr(entry, "token_type", "Bearer"),
}
try: try:
if not _AUTH_JSON_PATH.is_file(): if not _AUTH_JSON_PATH.is_file():
return None return None
@ -467,6 +522,11 @@ def _nous_base_url() -> str:
def _read_codex_access_token() -> Optional[str]: def _read_codex_access_token() -> Optional[str]:
"""Read a valid, non-expired Codex OAuth access token from Hermes auth store.""" """Read a valid, non-expired Codex OAuth access token from Hermes auth store."""
pool_present, entry = _select_pool_entry("openai-codex")
if pool_present:
token = _pool_runtime_api_key(entry)
return token or None
try: try:
from hermes_cli.auth import _read_codex_tokens from hermes_cli.auth import _read_codex_tokens
data = _read_codex_tokens() data = _read_codex_tokens()
@ -513,6 +573,24 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]:
if provider_id == "anthropic": if provider_id == "anthropic":
return _try_anthropic() return _try_anthropic()
pool_present, entry = _select_pool_entry(provider_id)
if pool_present:
api_key = _pool_runtime_api_key(entry)
if not api_key:
continue
base_url = _pool_runtime_base_url(entry, pconfig.inference_base_url) or pconfig.inference_base_url
model = _API_KEY_PROVIDER_AUX_MODELS.get(provider_id, "default")
logger.debug("Auxiliary text client: %s (%s) via pool", pconfig.name, model)
extra = {}
if "api.kimi.com" in base_url.lower():
extra["default_headers"] = {"User-Agent": "KimiCLI/1.0"}
elif "api.githubcopilot.com" in base_url.lower():
from hermes_cli.models import copilot_default_headers
extra["default_headers"] = copilot_default_headers()
return OpenAI(api_key=api_key, base_url=base_url, **extra), model
creds = resolve_api_key_provider_credentials(provider_id) creds = resolve_api_key_provider_credentials(provider_id)
api_key = str(creds.get("api_key", "")).strip() api_key = str(creds.get("api_key", "")).strip()
if not api_key: if not api_key:
@ -562,6 +640,16 @@ def _get_auxiliary_env_override(task: str, suffix: str) -> Optional[str]:
def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]: def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]:
pool_present, entry = _select_pool_entry("openrouter")
if pool_present:
or_key = _pool_runtime_api_key(entry)
if not or_key:
return None, None
base_url = _pool_runtime_base_url(entry, OPENROUTER_BASE_URL) or OPENROUTER_BASE_URL
logger.debug("Auxiliary client: OpenRouter via pool")
return OpenAI(api_key=or_key, base_url=base_url,
default_headers=_OR_HEADERS), _OPENROUTER_MODEL
or_key = os.getenv("OPENROUTER_API_KEY") or_key = os.getenv("OPENROUTER_API_KEY")
if not or_key: if not or_key:
return None, None return None, None
@ -578,7 +666,10 @@ def _try_nous() -> Tuple[Optional[OpenAI], Optional[str]]:
auxiliary_is_nous = True auxiliary_is_nous = True
logger.debug("Auxiliary client: Nous Portal") logger.debug("Auxiliary client: Nous Portal")
return ( return (
OpenAI(api_key=_nous_api_key(nous), base_url=_nous_base_url()), OpenAI(
api_key=_nous_api_key(nous),
base_url=str(nous.get("inference_base_url") or _nous_base_url()).rstrip("/"),
),
_NOUS_MODEL, _NOUS_MODEL,
) )
@ -654,11 +745,19 @@ def _try_custom_endpoint() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_codex() -> Tuple[Optional[Any], Optional[str]]: def _try_codex() -> Tuple[Optional[Any], Optional[str]]:
codex_token = _read_codex_access_token() pool_present, entry = _select_pool_entry("openai-codex")
if not codex_token: if pool_present:
return None, None codex_token = _pool_runtime_api_key(entry)
if not codex_token:
return None, None
base_url = _pool_runtime_base_url(entry, _CODEX_AUX_BASE_URL) or _CODEX_AUX_BASE_URL
else:
codex_token = _read_codex_access_token()
if not codex_token:
return None, None
base_url = _CODEX_AUX_BASE_URL
logger.debug("Auxiliary client: Codex OAuth (%s via Responses API)", _CODEX_AUX_MODEL) logger.debug("Auxiliary client: Codex OAuth (%s via Responses API)", _CODEX_AUX_MODEL)
real_client = OpenAI(api_key=codex_token, base_url=_CODEX_AUX_BASE_URL) real_client = OpenAI(api_key=codex_token, base_url=base_url)
return CodexAuxiliaryClient(real_client, _CODEX_AUX_MODEL), _CODEX_AUX_MODEL return CodexAuxiliaryClient(real_client, _CODEX_AUX_MODEL), _CODEX_AUX_MODEL
@ -668,14 +767,21 @@ def _try_anthropic() -> Tuple[Optional[Any], Optional[str]]:
except ImportError: except ImportError:
return None, None return None, None
token = resolve_anthropic_token() pool_present, entry = _select_pool_entry("anthropic")
if pool_present:
if entry is None:
return None, None
token = _pool_runtime_api_key(entry)
else:
entry = None
token = resolve_anthropic_token()
if not token: if not token:
return None, None return None, None
# Allow base URL override from config.yaml model.base_url, but only # Allow base URL override from config.yaml model.base_url, but only
# when the configured provider is anthropic — otherwise a non-Anthropic # when the configured provider is anthropic — otherwise a non-Anthropic
# base_url (e.g. Codex endpoint) would leak into Anthropic requests. # base_url (e.g. Codex endpoint) would leak into Anthropic requests.
base_url = _ANTHROPIC_DEFAULT_BASE_URL base_url = _pool_runtime_base_url(entry, _ANTHROPIC_DEFAULT_BASE_URL) if pool_present else _ANTHROPIC_DEFAULT_BASE_URL
try: try:
from hermes_cli.config import load_config from hermes_cli.config import load_config
cfg = load_config() cfg = load_config()

456
agent/credential_pool.py Normal file
View file

@ -0,0 +1,456 @@
"""Persistent multi-credential pool for same-provider failover."""
from __future__ import annotations
import time
import uuid
import os
from dataclasses import dataclass, fields
from typing import Any, Dict, List, Optional
from hermes_constants import OPENROUTER_BASE_URL
import hermes_cli.auth as auth_mod
from hermes_cli.auth import (
ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
PROVIDER_REGISTRY,
_agent_key_is_usable,
_codex_access_token_is_expiring,
_decode_jwt_claims,
_is_expiring,
_load_auth_store,
_load_provider_state,
read_credential_pool,
write_credential_pool,
)
EXHAUSTED_TTL_SECONDS = 24 * 60 * 60
@dataclass
class PooledCredential:
provider: str
id: str
label: str
auth_type: str
priority: int
source: str
access_token: str
refresh_token: Optional[str] = None
last_status: Optional[str] = None
last_status_at: Optional[float] = None
last_error_code: Optional[int] = None
base_url: Optional[str] = None
expires_at: Optional[str] = None
expires_at_ms: Optional[int] = None
last_refresh: Optional[str] = None
token_type: Optional[str] = None
scope: Optional[str] = None
client_id: Optional[str] = None
portal_base_url: Optional[str] = None
inference_base_url: Optional[str] = None
obtained_at: Optional[str] = None
expires_in: Optional[int] = None
agent_key: Optional[str] = None
agent_key_id: Optional[str] = None
agent_key_expires_at: Optional[str] = None
agent_key_expires_in: Optional[int] = None
agent_key_reused: Optional[bool] = None
agent_key_obtained_at: Optional[str] = None
tls: Optional[Dict[str, Any]] = None
@classmethod
def from_dict(cls, provider: str, payload: Dict[str, Any]) -> "PooledCredential":
allowed = {f.name for f in fields(cls) if f.name != "provider"}
data = {k: payload.get(k) for k in allowed if k in payload}
data.setdefault("id", uuid.uuid4().hex[:6])
data.setdefault("label", payload.get("source", provider))
data.setdefault("auth_type", "api_key")
data.setdefault("priority", 0)
data.setdefault("source", "manual")
data.setdefault("access_token", "")
return cls(provider=provider, **data)
def to_dict(self) -> Dict[str, Any]:
result: Dict[str, Any] = {}
for field_def in fields(self):
if field_def.name == "provider":
continue
value = getattr(self, field_def.name)
if value is not None:
result[field_def.name] = value
for key in ("last_status", "last_status_at", "last_error_code"):
result.setdefault(key, getattr(self, key))
return result
@property
def runtime_api_key(self) -> str:
if self.provider == "nous":
return str(self.agent_key or self.access_token or "")
return str(self.access_token or "")
@property
def runtime_base_url(self) -> Optional[str]:
if self.provider == "nous":
return self.inference_base_url or self.base_url
return self.base_url
def _label_from_token(token: str, fallback: str) -> str:
claims = _decode_jwt_claims(token)
for key in ("email", "preferred_username", "upn"):
value = claims.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return fallback
def _next_priority(entries: List[PooledCredential]) -> int:
return max((entry.priority for entry in entries), default=-1) + 1
class CredentialPool:
def __init__(self, provider: str, entries: List[PooledCredential]):
self.provider = provider
self._entries = sorted(entries, key=lambda entry: entry.priority)
self._current_id: Optional[str] = None
def has_credentials(self) -> bool:
return bool(self._entries)
def entries(self) -> List[PooledCredential]:
return list(sorted(self._entries, key=lambda entry: entry.priority))
def current(self) -> Optional[PooledCredential]:
if not self._current_id:
return None
return next((entry for entry in self._entries if entry.id == self._current_id), None)
def _persist(self) -> None:
write_credential_pool(
self.provider,
[entry.to_dict() for entry in sorted(self._entries, key=lambda item: item.priority)],
)
def _mark_exhausted(self, entry: PooledCredential, status_code: Optional[int]) -> None:
entry.last_status = "exhausted"
entry.last_status_at = time.time()
entry.last_error_code = status_code
self._persist()
def _refresh_entry(self, entry: PooledCredential, *, force: bool) -> Optional[PooledCredential]:
if entry.auth_type != "oauth" or not entry.refresh_token:
if force:
self._mark_exhausted(entry, None)
return None
try:
if self.provider == "anthropic":
from agent.anthropic_adapter import refresh_anthropic_oauth_pure
refreshed = refresh_anthropic_oauth_pure(
entry.refresh_token,
use_json=entry.source.endswith("hermes_pkce"),
)
entry.access_token = refreshed["access_token"]
entry.refresh_token = refreshed["refresh_token"]
entry.expires_at_ms = refreshed["expires_at_ms"]
elif self.provider == "openai-codex":
refreshed = auth_mod.refresh_codex_oauth_pure(
entry.access_token,
entry.refresh_token,
)
entry.access_token = refreshed["access_token"]
entry.refresh_token = refreshed["refresh_token"]
entry.last_refresh = refreshed.get("last_refresh")
elif self.provider == "nous":
refreshed = auth_mod.refresh_nous_oauth_pure(
entry.access_token,
entry.refresh_token,
entry.client_id or "hermes-cli",
entry.portal_base_url or "https://portal.nousresearch.com",
entry.inference_base_url or "https://inference-api.nousresearch.com/v1",
token_type=entry.token_type or "Bearer",
scope=entry.scope or "",
obtained_at=entry.obtained_at,
expires_at=entry.expires_at,
agent_key=entry.agent_key,
agent_key_expires_at=entry.agent_key_expires_at,
min_key_ttl_seconds=DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
force_refresh=force,
force_mint=force,
)
for key, value in refreshed.items():
if hasattr(entry, key):
setattr(entry, key, value)
else:
return entry
except Exception:
self._mark_exhausted(entry, None)
return None
entry.last_status = "ok"
entry.last_status_at = None
entry.last_error_code = None
self._persist()
return entry
def _entry_needs_refresh(self, entry: PooledCredential) -> bool:
if entry.auth_type != "oauth":
return False
if self.provider == "anthropic":
if entry.expires_at_ms is None:
return False
return int(entry.expires_at_ms) <= int(time.time() * 1000) + 120_000
if self.provider == "openai-codex":
return _codex_access_token_is_expiring(
entry.access_token,
CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
)
if self.provider == "nous":
if _is_expiring(entry.expires_at, ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
return True
return not _agent_key_is_usable(
{
"agent_key": entry.agent_key,
"agent_key_expires_at": entry.agent_key_expires_at,
},
DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
)
return False
def select(self) -> Optional[PooledCredential]:
now = time.time()
for entry in sorted(self._entries, key=lambda item: item.priority):
if entry.last_status == "exhausted":
if entry.last_status_at and now - entry.last_status_at < EXHAUSTED_TTL_SECONDS:
continue
entry.last_status = "ok"
entry.last_status_at = None
entry.last_error_code = None
self._persist()
if self._entry_needs_refresh(entry):
refreshed = self._refresh_entry(entry, force=False)
if refreshed is None:
continue
entry = refreshed
self._current_id = entry.id
return entry
self._current_id = None
return None
def mark_exhausted_and_rotate(self, *, status_code: Optional[int]) -> Optional[PooledCredential]:
entry = self.current() or self.select()
if entry is None:
return None
self._mark_exhausted(entry, status_code)
self._current_id = None
return self.select()
def try_refresh_current(self) -> Optional[PooledCredential]:
entry = self.current()
if entry is None:
return None
refreshed = self._refresh_entry(entry, force=True)
if refreshed is not None:
self._current_id = refreshed.id
return refreshed
def reset_statuses(self) -> int:
count = 0
for entry in self._entries:
if entry.last_status or entry.last_status_at or entry.last_error_code:
entry.last_status = None
entry.last_status_at = None
entry.last_error_code = None
count += 1
if count:
self._persist()
return count
def remove_index(self, index: int) -> Optional[PooledCredential]:
ordered = sorted(self._entries, key=lambda item: item.priority)
if index < 1 or index > len(ordered):
return None
removed = ordered.pop(index - 1)
for new_priority, entry in enumerate(ordered):
entry.priority = new_priority
self._entries = ordered
self._persist()
if self._current_id == removed.id:
self._current_id = None
return removed
def add_entry(self, entry: PooledCredential) -> PooledCredential:
entry.priority = _next_priority(self._entries)
self._entries.append(entry)
self._persist()
return entry
def _upsert_entry(entries: List[PooledCredential], provider: str, source: str, payload: Dict[str, Any]) -> bool:
existing = next((entry for entry in entries if entry.source == source), None)
if existing is None:
payload.setdefault("id", uuid.uuid4().hex[:6])
payload.setdefault("priority", _next_priority(entries))
payload.setdefault("label", payload.get("label") or source)
entries.append(PooledCredential.from_dict(provider, payload))
return True
changed = False
for key, value in payload.items():
if key in {"id", "priority"} or value is None:
continue
if key == "label" and existing.label:
continue
if hasattr(existing, key) and getattr(existing, key) != value:
setattr(existing, key, value)
changed = True
return changed
def _seed_from_env(provider: str, entries: List[PooledCredential]) -> bool:
changed = False
if provider == "openrouter":
token = os.getenv("OPENROUTER_API_KEY", "").strip()
if token:
changed |= _upsert_entry(
entries,
provider,
"env:OPENROUTER_API_KEY",
{
"source": "env:OPENROUTER_API_KEY",
"auth_type": "api_key",
"access_token": token,
"base_url": OPENROUTER_BASE_URL,
"label": "OPENROUTER_API_KEY",
},
)
return changed
pconfig = PROVIDER_REGISTRY.get(provider)
if not pconfig or pconfig.auth_type != "api_key":
return changed
env_url = ""
if pconfig.base_url_env_var:
env_url = os.getenv(pconfig.base_url_env_var, "").strip().rstrip("/")
for env_var in pconfig.api_key_env_vars:
token = os.getenv(env_var, "").strip()
if not token:
continue
auth_type = "oauth" if provider == "anthropic" and not token.startswith("sk-ant-api") else "api_key"
base_url = env_url or pconfig.inference_base_url
changed |= _upsert_entry(
entries,
provider,
f"env:{env_var}",
{
"source": f"env:{env_var}",
"auth_type": auth_type,
"access_token": token,
"base_url": base_url,
"label": env_var,
},
)
return changed
def _seed_from_singletons(provider: str, entries: List[PooledCredential]) -> bool:
changed = False
auth_store = _load_auth_store()
if provider == "anthropic":
from agent.anthropic_adapter import read_claude_code_credentials, read_hermes_oauth_credentials
hermes_creds = read_hermes_oauth_credentials()
if hermes_creds and hermes_creds.get("accessToken"):
changed |= _upsert_entry(
entries,
provider,
"hermes_pkce",
{
"source": "hermes_pkce",
"auth_type": "oauth",
"access_token": hermes_creds.get("accessToken", ""),
"refresh_token": hermes_creds.get("refreshToken"),
"expires_at_ms": hermes_creds.get("expiresAt"),
"label": _label_from_token(hermes_creds.get("accessToken", ""), "hermes_pkce"),
},
)
claude_creds = read_claude_code_credentials()
if claude_creds and claude_creds.get("accessToken"):
changed |= _upsert_entry(
entries,
provider,
"claude_code",
{
"source": "claude_code",
"auth_type": "oauth",
"access_token": claude_creds.get("accessToken", ""),
"refresh_token": claude_creds.get("refreshToken"),
"expires_at_ms": claude_creds.get("expiresAt"),
"label": _label_from_token(claude_creds.get("accessToken", ""), "claude_code"),
},
)
elif provider == "nous":
state = _load_provider_state(auth_store, "nous")
if state:
changed |= _upsert_entry(
entries,
provider,
"device_code",
{
"source": "device_code",
"auth_type": "oauth",
"access_token": state.get("access_token", ""),
"refresh_token": state.get("refresh_token"),
"expires_at": state.get("expires_at"),
"token_type": state.get("token_type"),
"scope": state.get("scope"),
"client_id": state.get("client_id"),
"portal_base_url": state.get("portal_base_url"),
"inference_base_url": state.get("inference_base_url"),
"agent_key": state.get("agent_key"),
"agent_key_expires_at": state.get("agent_key_expires_at"),
"label": _label_from_token(state.get("access_token", ""), "device_code"),
},
)
elif provider == "openai-codex":
state = _load_provider_state(auth_store, "openai-codex")
tokens = state.get("tokens") if isinstance(state, dict) else None
if isinstance(tokens, dict) and tokens.get("access_token"):
changed |= _upsert_entry(
entries,
provider,
"device_code",
{
"source": "device_code",
"auth_type": "oauth",
"access_token": tokens.get("access_token", ""),
"refresh_token": tokens.get("refresh_token"),
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": state.get("last_refresh"),
"label": _label_from_token(tokens.get("access_token", ""), "device_code"),
},
)
return changed
def load_pool(provider: str) -> CredentialPool:
provider = (provider or "").strip().lower()
raw_entries = read_credential_pool(provider)
entries = [PooledCredential.from_dict(provider, payload) for payload in raw_entries]
changed = _seed_from_singletons(provider, entries)
changed |= _seed_from_env(provider, entries)
if changed:
write_credential_pool(
provider,
[entry.to_dict() for entry in sorted(entries, key=lambda item: item.priority)],
)
return CredentialPool(provider, entries)

4
cli.py
View file

@ -1761,6 +1761,7 @@ class HermesCLI:
resolved_api_mode = runtime.get("api_mode", self.api_mode) resolved_api_mode = runtime.get("api_mode", self.api_mode)
resolved_acp_command = runtime.get("command") resolved_acp_command = runtime.get("command")
resolved_acp_args = list(runtime.get("args") or []) resolved_acp_args = list(runtime.get("args") or [])
resolved_credential_pool = runtime.get("credential_pool")
if not isinstance(api_key, str) or not api_key: if not isinstance(api_key, str) or not api_key:
# Custom / local endpoints (llama.cpp, ollama, vLLM, etc.) often # Custom / local endpoints (llama.cpp, ollama, vLLM, etc.) often
# don't require authentication. When a base_url IS configured but # don't require authentication. When a base_url IS configured but
@ -1793,6 +1794,7 @@ class HermesCLI:
self.api_mode = resolved_api_mode self.api_mode = resolved_api_mode
self.acp_command = resolved_acp_command self.acp_command = resolved_acp_command
self.acp_args = resolved_acp_args self.acp_args = resolved_acp_args
self._credential_pool = resolved_credential_pool
self._provider_source = runtime.get("source") self._provider_source = runtime.get("source")
self.api_key = api_key self.api_key = api_key
self.base_url = base_url self.base_url = base_url
@ -1894,6 +1896,7 @@ class HermesCLI:
"api_mode": self.api_mode, "api_mode": self.api_mode,
"command": self.acp_command, "command": self.acp_command,
"args": list(self.acp_args or []), "args": list(self.acp_args or []),
"credential_pool": getattr(self, "_credential_pool", None),
} }
effective_model = model_override or self.model effective_model = model_override or self.model
self.agent = AIAgent( self.agent = AIAgent(
@ -1904,6 +1907,7 @@ class HermesCLI:
api_mode=runtime.get("api_mode"), api_mode=runtime.get("api_mode"),
acp_command=runtime.get("command"), acp_command=runtime.get("command"),
acp_args=runtime.get("args"), acp_args=runtime.get("args"),
credential_pool=runtime.get("credential_pool"),
max_iterations=self.max_turns, max_iterations=self.max_turns,
enabled_toolsets=self.enabled_toolsets, enabled_toolsets=self.enabled_toolsets,
verbose_logging=self.verbose, verbose_logging=self.verbose,

View file

@ -0,0 +1,718 @@
# Multi-Credential OAuth Fallback
**Date:** 2026-03-23
**Status:** Design v3 — implementation-ready
## Problem
Hermes supports one credential per provider. When it runs out of credits (402) or hits hard rate limits (429), the user is stuck. Users with multiple OAuth accounts (e.g., personal Claude Pro + work Claude Max + API key) can't leverage them.
## Design Decisions
| Decision | Choice | Rationale |
|----------|--------|-----------|
| Registration UX | `hermes auth add <provider>` for both OAuth and API keys | Pool is the single authority — all credential types managed through one CLI |
| Rotation trigger | Rotate on 402 immediately; retry-then-rotate on 429 | Distinguishes transient throttle from hard credit cap |
| All exhausted | Fall through to existing cross-provider `_try_activate_fallback()` | Credential rotation = inner loop; cross-provider = outer loop |
| State persistence | Persist `last_status` + `last_status_at` to `auth.json`, 24h TTL | Avoids re-probing dead creds; TTL prevents stale-state bugs |
| Selection strategy | Fill-first (exhaust primary before advancing) | Matches "use primary until exhausted" goal |
| Pool entries | Provider-specific types, not generic + opaque bag | Each provider's refresh needs different state; typed entries make schema self-documenting |
| API key authority | Pool owns all keys — env vars seed pool on first run | One source of truth, no ambiguity between env/config.yaml/pool |
| Startup credential | `runtime_provider.py` consults pool | Pool is authoritative for initial credential, not env-var chain |
| Auxiliary clients | Independent — read pool `last_status` to skip dead creds only | Low-volume tasks; full pool wiring is disproportionate for v1 |
---
## Data Model
### Provider-Specific Pool Entries
Stored in `~/.hermes/auth.json` under `credential_pool`. Each provider defines its own entry schema carrying exactly the fields its refresh logic needs.
#### Anthropic
```json
{
"credential_pool": {
"anthropic": [
{
"id": "a1b2c3",
"label": "user@gmail.com",
"auth_type": "oauth",
"priority": 0,
"source": "claude_code",
"access_token": "sk-ant-oat-...",
"refresh_token": "rt-...",
"expires_at_ms": 1711234567000,
"last_status": "ok",
"last_status_at": null,
"last_error_code": null
},
{
"id": "d4e5f6",
"label": "work@company.com",
"auth_type": "oauth",
"priority": 1,
"source": "hermes_pkce",
"access_token": "sk-ant-oat-...",
"refresh_token": "rt-...",
"expires_at_ms": 1711234999000,
"last_status": "exhausted",
"last_status_at": 1711230000.0,
"last_error_code": 402
},
{
"id": "g7h8i9",
"label": "work-budget",
"auth_type": "api_key",
"priority": 2,
"source": "manual",
"access_token": "sk-ant-api-...",
"refresh_token": null,
"expires_at_ms": null,
"last_status": "ok",
"last_status_at": null,
"last_error_code": null
}
]
}
}
```
Refresh needs: `refresh_token` only. The Anthropic OAuth token exchange returns a new `access_token` + `refresh_token` + `expires_in`. No extra state.
#### Nous
```json
{
"credential_pool": {
"nous": [
{
"id": "n1o2u3",
"label": "user@nous.com",
"auth_type": "oauth",
"priority": 0,
"source": "device_code",
"access_token": "eyJ...",
"refresh_token": "rt-...",
"expires_at": "2026-03-24T12:00:00+00:00",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"client_id": "hermes-cli",
"portal_base_url": "https://portal.nousresearch.com",
"inference_base_url": "https://inference-api.nousresearch.com/v1",
"agent_key": "ak-...",
"agent_key_expires_at": "2026-03-23T13:30:00+00:00",
"last_status": "ok",
"last_status_at": null,
"last_error_code": null
}
]
}
}
```
Refresh needs: `access_token`, `refresh_token`, `client_id`, `portal_base_url` for token refresh; then `access_token`, `portal_base_url`, `inference_base_url` for agent key minting. This is the full state currently in `auth.json → providers.nous`.
#### Codex
```json
{
"credential_pool": {
"openai-codex": [
{
"id": "c1d2x3",
"label": "user@openai.com",
"auth_type": "oauth",
"priority": 0,
"source": "device_code",
"access_token": "eyJ...",
"refresh_token": "rt-...",
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": "2026-03-23T10:00:00Z",
"last_status": "ok",
"last_status_at": null,
"last_error_code": null
}
]
}
}
```
Refresh needs: `access_token`, `refresh_token`. Returns new tokens dict. `base_url` is carried per-entry because it can vary.
#### API-Key Providers (generic)
For providers that only use API keys (OpenRouter, Z.AI, Kimi, MiniMax, DeepSeek, etc.), entries are simpler:
```json
{
"credential_pool": {
"openrouter": [
{
"id": "or1234",
"label": "personal",
"auth_type": "api_key",
"priority": 0,
"source": "env:OPENROUTER_API_KEY",
"access_token": "sk-or-...",
"refresh_token": null,
"base_url": "https://openrouter.ai/api/v1",
"last_status": "ok",
"last_status_at": null,
"last_error_code": null
}
]
}
}
```
No refresh logic — API keys are static. Rotation still works on 402/429.
### Common Fields (all entry types)
| Field | Type | Description |
|-------|------|-------------|
| `id` | str | Unique ID (hex, assigned at registration) |
| `label` | str | Display name (auto-extracted JWT email or user-provided) |
| `auth_type` | str | `"oauth"` or `"api_key"` |
| `priority` | int | Lower = tried first (fill-first). Set at registration time. |
| `source` | str | Provenance: `claude_code`, `hermes_pkce`, `device_code`, `env:VAR_NAME`, `manual` |
| `access_token` | str | The token used for API calls |
| `refresh_token` | str? | OAuth refresh token (null for API keys) |
| `last_status` | str? | `"ok"`, `"exhausted"`, or null |
| `last_status_at` | float? | Unix timestamp of last status change |
| `last_error_code` | int? | HTTP status code that caused exhaustion |
---
## Single-Authority API Key Storage
### The Problem Today
API keys currently come from three sources with no single owner:
1. **Env vars**`ANTHROPIC_API_KEY`, `OPENROUTER_API_KEY`, etc. (resolved by `runtime_provider.py`)
2. **Config.yaml**`model.api_key` for custom endpoints
3. **Future: `hermes auth add --type api-key`** — manual pool registration
### The Solution: Pool Owns Everything
The pool is the single source of truth for all credentials. Env vars and config.yaml become **seed sources** that populate the pool, not runtime resolution paths.
**Seeding rules (on pool load):**
1. For each provider in `PROVIDER_REGISTRY` with `api_key_env_vars`:
- Check each env var in priority order
- If set and no pool entry exists with `source: "env:VAR_NAME"` → create one at lowest priority
- If set and a pool entry with that source already exists → update `access_token` if changed (env var wins on conflict — user may have rotated the key)
2. For Anthropic specifically, also check:
- `~/.claude/.credentials.json` → seed as `source: "claude_code"` OAuth entry
- `~/.hermes/.anthropic_oauth.json` → seed as `source: "hermes_pkce"` OAuth entry
3. For Nous/Codex, also check:
- `auth.json → providers.nous` → seed as `source: "device_code"` OAuth entry
- `auth.json → providers.openai-codex` → seed as `source: "device_code"` OAuth entry
**Key property:** seeding is additive and idempotent. Existing pool entries are never deleted by seeding. Manual entries (`source: "manual"`) are never touched.
**After seeding, runtime_provider.py calls `pool.select()` instead of its own env-var chain.** The pool returns the first non-exhausted credential by priority.
### What Happens to Env Vars
Env vars still work — they seed the pool transparently. A user who sets `ANTHROPIC_API_KEY` and never runs `hermes auth add` gets exactly the same behavior as today: one credential, no rotation. The pool is invisible until they add a second credential.
If a user later runs `hermes auth add anthropic --type api-key`, the new key gets priority after the env-var-seeded entry. They now have rotation.
---
## Refresh Architecture
### Pure Refresh Functions (New)
Each OAuth provider gets a pure function that takes credential state in and returns updated state out, with **no file writes**:
```python
# agent/anthropic_adapter.py
def refresh_anthropic_oauth_pure(refresh_token: str) -> Dict[str, Any]:
"""Token exchange only. No file writes.
Returns: {"access_token": str, "refresh_token": str, "expires_at_ms": int}
"""
CLIENT_ID = "9d1c250a-e61b-44d9-88ed-5944d1962f5e"
data = urllib.parse.urlencode({
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
}).encode()
req = urllib.request.Request(
"https://console.anthropic.com/v1/oauth/token",
data=data,
headers={
"Content-Type": "application/x-www-form-urlencoded",
"User-Agent": f"claude-cli/{_CLAUDE_CODE_VERSION} (external, cli)",
},
method="POST",
)
with urllib.request.urlopen(req, timeout=10) as resp:
result = json.loads(resp.read().decode())
return {
"access_token": result["access_token"],
"refresh_token": result.get("refresh_token", refresh_token),
"expires_at_ms": int(time.time() * 1000) + (result.get("expires_in", 3600) * 1000),
}
```
```python
# hermes_cli/auth.py
def refresh_nous_oauth_pure(
access_token: str,
refresh_token: str,
client_id: str,
portal_base_url: str,
inference_base_url: str,
*,
min_key_ttl_seconds: int = 1800,
timeout_seconds: float = 15.0,
) -> Dict[str, Any]:
"""Refresh Nous access token + mint agent key. No auth.json writes.
Returns updated state dict with all Nous-specific fields.
"""
# Step 1: refresh access_token if expiring (same HTTP call as _refresh_access_token)
# Step 2: mint agent key (same HTTP call as _mint_agent_key)
# Returns: {"access_token", "refresh_token", "expires_at", "agent_key",
# "agent_key_expires_at", "inference_base_url", ...}
...
def refresh_codex_oauth_pure(
access_token: str,
refresh_token: str,
*,
timeout_seconds: float = 20.0,
) -> Dict[str, Any]:
"""Refresh Codex OAuth tokens. No auth.json writes.
Returns: {"access_token": str, "refresh_token": str}
"""
# Same HTTP call as _refresh_codex_auth_tokens
...
```
### Existing Functions Refactored (Backward Compat)
The existing singleton functions call the new pure functions + write to their singleton files. No behavior change for code that doesn't use the pool.
```python
# BEFORE:
def _refresh_oauth_token(creds):
# ... HTTP call ...
_write_claude_code_credentials(new_access, new_refresh, new_expires_ms)
return new_access
# AFTER:
def _refresh_oauth_token(creds):
result = refresh_anthropic_oauth_pure(creds["refreshToken"])
_write_claude_code_credentials(
result["access_token"], result["refresh_token"], result["expires_at_ms"]
)
return result["access_token"]
```
### Pool Refresh Flow
```
pool.try_refresh(entry) → updated_entry | None:
1. Dispatch to provider-specific pure refresh:
- anthropic → refresh_anthropic_oauth_pure(entry.refresh_token)
- nous → refresh_nous_oauth_pure(entry.access_token, entry.refresh_token, ...)
- codex → refresh_codex_oauth_pure(entry.access_token, entry.refresh_token)
2. On success:
- Update entry fields in-memory (access_token, refresh_token, expires_at, etc.)
- Persist updated pool entry to auth.json (pool's section, not singleton files)
- Return updated entry
3. On failure:
- Mark entry exhausted (last_status="exhausted", last_status_at=now)
- Persist status to auth.json
- Return None
```
**Key guarantee:** refreshing entry B never touches entry A. Each entry carries its own state, and the pure refresh functions have no side effects.
---
## Startup Wiring
### runtime_provider.py Changes
`resolve_runtime_provider()` currently resolves credentials via provider-specific chains (env vars, auth.json singletons, file reads). After this change:
```python
def resolve_runtime_provider(*, requested=None, explicit_api_key=None, explicit_base_url=None):
# ... existing provider resolution (which provider to use) stays the same ...
provider = resolve_provider(requested, ...)
# NEW: consult pool for initial credential
from agent.credential_pool import load_pool
pool = load_pool(provider)
if pool and pool.has_credentials():
entry = pool.select()
if entry:
return {
"provider": provider,
"api_mode": _api_mode_for_provider(provider, entry),
"base_url": _base_url_for_entry(provider, entry),
"api_key": entry.access_token,
"source": entry.source,
"credential_pool": pool, # pass pool to AIAgent for rotation
# ... provider-specific fields from entry ...
}
# FALLBACK: no pool or pool empty — use existing resolution
# (this path handles first-time users who haven't run setup yet)
if provider == "nous":
creds = resolve_nous_runtime_credentials(...)
...
elif provider == "anthropic":
...
```
The pool is passed to `AIAgent` via the runtime dict so the agent can rotate credentials mid-conversation without re-resolving.
### AIAgent.__init__ Changes
```python
class AIAgent:
def __init__(self, ..., credential_pool=None):
self._credential_pool = credential_pool
# ... existing init ...
```
### Gateway Startup
Gateway creates `AIAgent` instances per session. Since `resolve_runtime_provider()` now returns the pool, gateway gets rotation for free:
```python
# gateway/run.py — existing code already calls resolve_runtime_provider()
runtime = resolve_runtime_provider(requested=config.get("provider"))
agent = AIAgent(..., credential_pool=runtime.get("credential_pool"))
```
No additional gateway changes needed.
---
## Runtime Flow
### Credential Selection (fill-first)
```
pool.select():
1. For each entry by priority (ascending):
a. If last_status == "exhausted" and now - last_status_at < 86400 skip
b. If last_status == "exhausted" and now - last_status_at >= 86400 → reset to "ok"
c. If auth_type == "oauth" and token expires within 120s:
- try_refresh(entry)
- If refresh fails → mark exhausted, continue to next
d. Return this entry
2. All skipped/exhausted → return None
```
### Error Handling in run_agent.py
Replaces the three provider-specific `if/elif` blocks (~lines 6104-6147):
```python
# In the except block, after status_code is extracted:
# Credential pool rotation (replaces 3 provider-specific refresh blocks)
if self._credential_pool:
if status_code == 402:
prev = self._credential_pool.current()
next_entry = self._credential_pool.mark_exhausted_and_rotate(
status_code=402)
if next_entry:
self._swap_credential(next_entry)
print(f"{self.log_prefix}🔐 {prev.label} exhausted (402), "
f"switching to {next_entry.label}")
retry_count = 0
continue
# All exhausted — fall through to cross-provider fallback below
elif status_code == 429 and retry_429_with_same_cred:
# Second 429 on same credential for this request
prev = self._credential_pool.current()
next_entry = self._credential_pool.mark_exhausted_and_rotate(
status_code=429)
if next_entry:
self._swap_credential(next_entry)
print(f"{self.log_prefix}🔐 {prev.label} rate-limited (429), "
f"switching to {next_entry.label}")
retry_count = 0
continue
elif status_code == 429 and not retry_429_with_same_cred:
retry_429_with_same_cred = True
# Fall through to existing backoff logic (retry same credential)
elif status_code == 401:
refreshed = self._credential_pool.try_refresh_current()
if refreshed:
self._swap_credential(refreshed)
print(f"{self.log_prefix}🔐 Credentials refreshed, retrying...")
continue
# Refresh failed — show existing diagnostic output
```
### _swap_credential (replaces 3 methods)
```python
def _swap_credential(self, entry):
"""Hot-swap the active credential. Dispatches by api_mode."""
if self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, _is_oauth_token
try:
self._anthropic_client.close()
except Exception:
pass
self._anthropic_api_key = entry.access_token
self._anthropic_client = build_anthropic_client(
entry.access_token, self._anthropic_base_url)
self._is_anthropic_oauth = _is_oauth_token(entry.access_token)
elif self.api_mode == "codex_responses":
self.api_key = entry.access_token
self.base_url = getattr(entry, "base_url", self.base_url)
self._client_kwargs["api_key"] = self.api_key
self._client_kwargs["base_url"] = self.base_url
self._replace_primary_openai_client(reason="credential_rotation")
elif self.api_mode == "chat_completions":
self.api_key = entry.access_token
base = getattr(entry, "inference_base_url", None) or self.base_url
self.base_url = base
self._client_kwargs["api_key"] = self.api_key
self._client_kwargs["base_url"] = self.base_url
self._client_kwargs.pop("default_headers", None)
self._replace_primary_openai_client(reason="credential_rotation")
```
### Deleted Methods
- `_try_refresh_codex_client_credentials()` (~30 lines)
- `_try_refresh_nous_client_credentials()` (~35 lines)
- `_try_refresh_anthropic_client_credentials()` (~40 lines)
Total: ~105 lines removed, replaced by `_swap_credential()` (~30 lines) + pool delegation.
---
## CLI Commands
### `hermes auth add <provider>`
```
$ hermes auth add anthropic
How would you like to authenticate?
1. Claude Pro/Max subscription (OAuth login)
2. API key
> 1
Running OAuth flow...
[browser opens, user logs in with different account]
✓ Authenticated as work@company.com
✓ Added as anthropic credential #2 (priority 1)
```
```
$ hermes auth add anthropic --type api-key
Paste your API key: sk-ant-api-***
Label (optional, default: api-key-1): work-budget
✓ Added as anthropic credential #3: "work-budget" (priority 2)
```
Implementation: reuses existing OAuth flows from `setup.py` (`run_oauth_setup_token()`, device code flow, etc.) — the pool just stores the result instead of the singleton file.
### `hermes auth list`
```
$ hermes auth list
anthropic (3 credentials):
#1 user@gmail.com oauth claude_code ← active
#2 work@company.com oauth hermes_pkce exhausted (402, 2h ago)
#3 work-budget api_key manual
nous (1 credential):
#1 user@nous.com oauth device_code ← active
openrouter (1 credential):
#1 personal api_key env:OPENROUTER_API_KEY ← active
```
### `hermes auth remove <provider> <index>`
```
$ hermes auth remove anthropic 2
✓ Removed anthropic credential #2 (work@company.com)
Remaining credentials re-prioritized.
```
### `hermes auth reset <provider>`
Clears `last_status` on all credentials for a provider (manual recovery):
```
$ hermes auth reset anthropic
✓ Reset status on 3 anthropic credentials
```
---
## Backward Compatibility
### Auto-Migration (First Pool Load)
When `credential_pool` is absent in `auth.json`, `load_pool()` runs migration:
1. **Anthropic:** Walk the existing `resolve_anthropic_token()` priority chain. For each source that has a credential, create a pool entry:
- `ANTHROPIC_TOKEN` env → entry with `source: "env:ANTHROPIC_TOKEN"`
- `~/.hermes/.anthropic_oauth.json` → entry with `source: "hermes_pkce"`
- `~/.claude/.credentials.json` → entry with `source: "claude_code"`
- `ANTHROPIC_API_KEY` env → entry with `source: "env:ANTHROPIC_API_KEY"`
- Priority follows the existing resolution order (first found = priority 0)
2. **Nous:** Copy `auth.json → providers.nous` state into a pool entry with `source: "device_code"`.
3. **Codex:** Copy `auth.json → providers.openai-codex` state into a pool entry with `source: "device_code"`.
4. **API-key providers:** For each provider in `PROVIDER_REGISTRY` with `api_key_env_vars`, check env vars and create entries.
**Migration is additive.** Original singleton state is preserved (existing code paths still work). The pool is written alongside, and `runtime_provider.py` prefers it when present.
### Env Var Re-Seeding (Every Pool Load)
On every `load_pool()`, env vars are re-checked:
- If env var value changed since last seed → update the pool entry's `access_token`
- If env var is newly set → create entry at lowest priority
- If env var is now empty but pool entry with that source exists → keep pool entry (user may have moved the key to pool-only)
This ensures `export ANTHROPIC_API_KEY=new-key` takes effect without `hermes auth add`.
---
## Auxiliary Clients
`auxiliary_client.py` does **not** use the pool for rotation. It continues resolving credentials via its existing paths (`_read_nous_auth()`, `_read_codex_access_token()`, `_try_anthropic()`, etc.).
**One addition:** before resolving, check if the pool has a `last_status: "exhausted"` (within 24h) for the entry that would be resolved. If so, skip to the next available credential in the pool:
```python
# In _try_anthropic() or resolve_provider_client():
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
if pool:
entry = pool.select() # skips exhausted entries
if entry:
return build_anthropic_client(entry.access_token, ...), model
# Fall through to existing resolution
```
This is ~15 lines per provider in `auxiliary_client.py`. It prevents auxiliary tasks from wasting a round-trip on a known-dead credential without requiring full pool integration.
---
## File Changes
### New Files
| File | Est. Lines | Purpose |
|------|-----------|---------|
| `agent/credential_pool.py` | ~350 | `CredentialPool` class, `load_pool()`, provider-specific entry parsing, fill-first selection, mark/rotate, persist, migration, env-var seeding, JWT label extraction |
| `hermes_cli/auth_commands.py` | ~150 | `auth add`, `auth list`, `auth remove`, `auth reset` CLI commands |
### Modified Files
| File | Change | Est. Delta |
|------|--------|-----------|
| `agent/anthropic_adapter.py` | Extract `refresh_anthropic_oauth_pure()`. Refactor `_refresh_oauth_token()` + `refresh_hermes_oauth_token()` to call it. | +40, ~15 refactored |
| `hermes_cli/auth.py` | Extract `refresh_nous_oauth_pure()`, `refresh_codex_oauth_pure()`. Add `read_credential_pool()` / `write_credential_pool()` with file-lock integration. | +100, ~25 refactored |
| `hermes_cli/runtime_provider.py` | `resolve_runtime_provider()` consults pool before falling back to existing chains. Passes pool in return dict. | +30 |
| `run_agent.py` | Accept `credential_pool` in init. Replace 3 `_try_refresh_*` methods + 3 error blocks with pool rotation + `_swap_credential()`. | -105, +60 |
| `hermes_cli/main.py` | Register `auth add/list/remove/reset` subcommands. | +10 |
| `agent/auxiliary_client.py` | Check pool `last_status` before resolving credentials in `_try_anthropic()`, `_read_nous_auth()`, `_read_codex_access_token()`. | +20 |
### Not Touched
- `gateway/run.py` — gets pool for free via `resolve_runtime_provider()``AIAgent`
- `config.yaml` — no new config keys
- `hermes_cli/setup.py` — existing OAuth flows reused by `hermes auth add`
### Total
~500 new lines, ~145 removed/refactored, 8 files touched.
---
## Test Plan
### Unit Tests — credential_pool.py
| # | Test | Verifies |
|---|------|----------|
| 1 | Fill-first selection returns lowest-priority non-exhausted entry | Selection strategy |
| 2 | All entries exhausted → returns None | Exhaustion boundary |
| 3 | 24h TTL: exhausted entry with old timestamp resets to "ok" | TTL expiry |
| 4 | 24h TTL: exhausted entry within window stays exhausted | TTL enforcement |
| 5 | `mark_exhausted_and_rotate()` sets status + persists + returns next | Rotation + persistence |
| 6 | `try_refresh()` success: updates token fields in-memory + disk | Refresh happy path |
| 7 | `try_refresh()` failure: marks exhausted, returns None | Refresh failure |
| 8 | JWT label extraction: valid JWT → email | Auto-labeling |
| 9 | JWT label extraction: non-JWT → None | Graceful fallback |
| 10 | Env-var seeding: creates entry, deduplicates on reload | Seeding idempotency |
| 11 | Env-var seeding: updated env var updates pool entry token | Env var rotation |
| 12 | Migration: Anthropic sources → pool entries with correct priority | Backward compat |
| 13 | Migration: Nous state → pool entry with full provider fields | Provider-specific migration |
| 14 | Migration: Codex state → pool entry | Provider-specific migration |
| 15 | Migration: idempotent (running twice doesn't duplicate) | Safety |
### Integration Tests — refresh isolation
| # | Test | Verifies |
|---|------|----------|
| 16 | Refresh Anthropic cred B does NOT overwrite cred A's token | No singleton clobbering |
| 17 | Refresh Nous cred B does NOT overwrite cred A's agent key | No singleton clobbering |
| 18 | Pool persists refresh result to `credential_pool` section only | Isolation from singleton files |
| 19 | Existing singleton refresh functions still work (backward compat) | Refactor didn't break old path |
### Integration Tests — runtime wiring
| # | Test | Verifies |
|---|------|----------|
| 20 | `resolve_runtime_provider()` returns pool credential when pool exists | Startup wire-up |
| 21 | `resolve_runtime_provider()` falls back to old chain when pool empty | Backward compat |
| 22 | Pool passed through to AIAgent via runtime dict | Rotation availability |
| 23 | Gateway creates agent with pool from `resolve_runtime_provider()` | Gateway gets rotation |
### Integration Tests — rotation flow
| # | Test | Verifies |
|---|------|----------|
| 24 | 402 on cred 1 → auto-rotate to cred 2 → success | Happy path rotation |
| 25 | 429 (first) → retry same cred → success | Transient throttle |
| 26 | 429 (first) → retry same cred → 429 again → rotate | Hard rate limit |
| 27 | All creds exhausted → `_try_activate_fallback()` | Cross-provider fallback |
| 28 | 401 → `try_refresh_current()` → swap → success | Auth refresh |
| 29 | Cross-session: exhaust in session 1, session 2 skips it | Persisted status |
| 30 | Cross-session: 24h later, session re-probes | TTL expiry |
| 31 | Auxiliary client skips known-dead credential | Auxiliary awareness |
### CLI Tests
| # | Test | Verifies |
|---|------|----------|
| 32 | `hermes auth add anthropic` (OAuth mock) → pool entry with JWT label | Registration |
| 33 | `hermes auth add anthropic --type api-key` → pool entry with manual label | API key registration |
| 34 | `hermes auth list` output format matches spec | Display |
| 35 | `hermes auth remove` removes correct entry, re-indexes priorities | Removal |
| 36 | `hermes auth reset` clears all `last_status` for provider | Manual recovery |

View file

@ -250,6 +250,7 @@ def _resolve_runtime_agent_kwargs() -> dict:
"api_mode": runtime.get("api_mode"), "api_mode": runtime.get("api_mode"),
"command": runtime.get("command"), "command": runtime.get("command"),
"args": list(runtime.get("args") or []), "args": list(runtime.get("args") or []),
"credential_pool": runtime.get("credential_pool"),
} }

View file

@ -537,7 +537,11 @@ def _load_auth_store(auth_file: Optional[Path] = None) -> Dict[str, Any]:
except Exception: except Exception:
return {"version": AUTH_STORE_VERSION, "providers": {}} return {"version": AUTH_STORE_VERSION, "providers": {}}
if isinstance(raw, dict) and isinstance(raw.get("providers"), dict): if isinstance(raw, dict) and (
isinstance(raw.get("providers"), dict)
or isinstance(raw.get("credential_pool"), dict)
):
raw.setdefault("providers", {})
return raw return raw
# Migrate from PR's "systems" format if present # Migrate from PR's "systems" format if present
@ -605,6 +609,30 @@ def _save_provider_state(auth_store: Dict[str, Any], provider_id: str, state: Di
auth_store["active_provider"] = provider_id auth_store["active_provider"] = provider_id
def read_credential_pool(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Return the persisted credential pool, or one provider slice."""
auth_store = _load_auth_store()
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
pool = {}
if provider_id is None:
return dict(pool)
provider_entries = pool.get(provider_id)
return list(provider_entries) if isinstance(provider_entries, list) else []
def write_credential_pool(provider_id: str, entries: List[Dict[str, Any]]) -> Path:
"""Persist one provider's credential pool under auth.json."""
with _auth_store_lock():
auth_store = _load_auth_store()
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
pool = {}
auth_store["credential_pool"] = pool
pool[provider_id] = list(entries)
return _save_auth_store(auth_store)
def get_provider_auth_state(provider_id: str) -> Optional[Dict[str, Any]]: def get_provider_auth_state(provider_id: str) -> Optional[Dict[str, Any]]:
"""Return persisted auth state for a provider, or None.""" """Return persisted auth state for a provider, or None."""
auth_store = _load_auth_store() auth_store = _load_auth_store()
@ -878,15 +906,14 @@ def _save_codex_tokens(tokens: Dict[str, str], last_refresh: str = None) -> None
_save_auth_store(auth_store) _save_auth_store(auth_store)
def _refresh_codex_auth_tokens( def refresh_codex_oauth_pure(
tokens: Dict[str, str], access_token: str,
timeout_seconds: float, refresh_token: str,
) -> Dict[str, str]: *,
"""Refresh Codex access token using the refresh token. timeout_seconds: float = 20.0,
) -> Dict[str, Any]:
Saves the new tokens to Hermes auth store automatically. """Refresh Codex OAuth tokens without mutating Hermes auth state."""
""" del access_token # Access token is only used by callers to decide whether to refresh.
refresh_token = tokens.get("refresh_token")
if not isinstance(refresh_token, str) or not refresh_token.strip(): if not isinstance(refresh_token, str) or not refresh_token.strip():
raise AuthError( raise AuthError(
"Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.", "Codex auth is missing refresh_token. Run `hermes login` to re-authenticate.",
@ -941,8 +968,8 @@ def _refresh_codex_auth_tokens(
relogin_required=True, relogin_required=True,
) from exc ) from exc
access_token = refresh_payload.get("access_token") refreshed_access = refresh_payload.get("access_token")
if not isinstance(access_token, str) or not access_token.strip(): if not isinstance(refreshed_access, str) or not refreshed_access.strip():
raise AuthError( raise AuthError(
"Codex token refresh response was missing access_token.", "Codex token refresh response was missing access_token.",
provider="openai-codex", provider="openai-codex",
@ -950,11 +977,33 @@ def _refresh_codex_auth_tokens(
relogin_required=True, relogin_required=True,
) )
updated_tokens = dict(tokens) updated = {
updated_tokens["access_token"] = access_token.strip() "access_token": refreshed_access.strip(),
"refresh_token": refresh_token.strip(),
"last_refresh": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
}
next_refresh = refresh_payload.get("refresh_token") next_refresh = refresh_payload.get("refresh_token")
if isinstance(next_refresh, str) and next_refresh.strip(): if isinstance(next_refresh, str) and next_refresh.strip():
updated_tokens["refresh_token"] = next_refresh.strip() updated["refresh_token"] = next_refresh.strip()
return updated
def _refresh_codex_auth_tokens(
tokens: Dict[str, str],
timeout_seconds: float,
) -> Dict[str, str]:
"""Refresh Codex access token using the refresh token.
Saves the new tokens to Hermes auth store automatically.
"""
refreshed = refresh_codex_oauth_pure(
str(tokens.get("access_token", "") or ""),
str(tokens.get("refresh_token", "") or ""),
timeout_seconds=timeout_seconds,
)
updated_tokens = dict(tokens)
updated_tokens["access_token"] = refreshed["access_token"]
updated_tokens["refresh_token"] = refreshed["refresh_token"]
_save_codex_tokens(updated_tokens) _save_codex_tokens(updated_tokens)
return updated_tokens return updated_tokens
@ -1293,6 +1342,91 @@ def _agent_key_is_usable(state: Dict[str, Any], min_ttl_seconds: int) -> bool:
return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds) return not _is_expiring(state.get("agent_key_expires_at"), min_ttl_seconds)
def refresh_nous_oauth_pure(
access_token: str,
refresh_token: str,
client_id: str,
portal_base_url: str,
inference_base_url: str,
*,
token_type: str = "Bearer",
scope: str = DEFAULT_NOUS_SCOPE,
obtained_at: Optional[str] = None,
expires_at: Optional[str] = None,
agent_key: Optional[str] = None,
agent_key_expires_at: Optional[str] = None,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
timeout_seconds: float = 15.0,
insecure: Optional[bool] = None,
ca_bundle: Optional[str] = None,
force_refresh: bool = False,
force_mint: bool = False,
) -> Dict[str, Any]:
"""Refresh Nous OAuth state without mutating auth.json."""
state: Dict[str, Any] = {
"access_token": access_token,
"refresh_token": refresh_token,
"client_id": client_id or DEFAULT_NOUS_CLIENT_ID,
"portal_base_url": (portal_base_url or DEFAULT_NOUS_PORTAL_URL).rstrip("/"),
"inference_base_url": (inference_base_url or DEFAULT_NOUS_INFERENCE_URL).rstrip("/"),
"token_type": token_type or "Bearer",
"scope": scope or DEFAULT_NOUS_SCOPE,
"obtained_at": obtained_at,
"expires_at": expires_at,
"agent_key": agent_key,
"agent_key_expires_at": agent_key_expires_at,
"tls": {
"insecure": bool(insecure),
"ca_bundle": ca_bundle,
},
}
verify = _resolve_verify(insecure=insecure, ca_bundle=ca_bundle, auth_state=state)
timeout = httpx.Timeout(timeout_seconds if timeout_seconds else 15.0)
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
if force_refresh or _is_expiring(state.get("expires_at"), ACCESS_TOKEN_REFRESH_SKEW_SECONDS):
refreshed = _refresh_access_token(
client=client,
portal_base_url=state["portal_base_url"],
client_id=state["client_id"],
refresh_token=state["refresh_token"],
)
now = datetime.now(timezone.utc)
access_ttl = _coerce_ttl_seconds(refreshed.get("expires_in"))
state["access_token"] = refreshed["access_token"]
state["refresh_token"] = refreshed.get("refresh_token") or state["refresh_token"]
state["token_type"] = refreshed.get("token_type") or state.get("token_type") or "Bearer"
state["scope"] = refreshed.get("scope") or state.get("scope")
refreshed_url = _optional_base_url(refreshed.get("inference_base_url"))
if refreshed_url:
state["inference_base_url"] = refreshed_url
state["obtained_at"] = now.isoformat()
state["expires_in"] = access_ttl
state["expires_at"] = datetime.fromtimestamp(
now.timestamp() + access_ttl, tz=timezone.utc
).isoformat()
if force_mint or not _agent_key_is_usable(state, max(60, int(min_key_ttl_seconds))):
mint_payload = _mint_agent_key(
client=client,
portal_base_url=state["portal_base_url"],
access_token=state["access_token"],
min_ttl_seconds=min_key_ttl_seconds,
)
now = datetime.now(timezone.utc)
state["agent_key"] = mint_payload.get("api_key")
state["agent_key_id"] = mint_payload.get("key_id")
state["agent_key_expires_at"] = mint_payload.get("expires_at")
state["agent_key_expires_in"] = mint_payload.get("expires_in")
state["agent_key_reused"] = bool(mint_payload.get("reused", False))
state["agent_key_obtained_at"] = now.isoformat()
minted_url = _optional_base_url(mint_payload.get("inference_base_url"))
if minted_url:
state["inference_base_url"] = minted_url
return state
def resolve_nous_runtime_credentials( def resolve_nous_runtime_credentials(
*, *,
min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS, min_key_ttl_seconds: int = DEFAULT_AGENT_KEY_MIN_TTL_SECONDS,
@ -2159,34 +2293,36 @@ def _codex_device_code_login() -> Dict[str, Any]:
} }
def _login_nous(args, pconfig: ProviderConfig) -> None: def _nous_device_code_login(
"""Nous Portal device authorization flow.""" *,
portal_base_url: Optional[str] = None,
inference_base_url: Optional[str] = None,
client_id: Optional[str] = None,
scope: Optional[str] = None,
open_browser: bool = True,
timeout_seconds: float = 15.0,
insecure: bool = False,
ca_bundle: Optional[str] = None,
min_key_ttl_seconds: int = 5 * 60,
) -> Dict[str, Any]:
"""Run the Nous device-code flow and return full OAuth state without persisting."""
pconfig = PROVIDER_REGISTRY["nous"]
portal_base_url = ( portal_base_url = (
getattr(args, "portal_url", None) portal_base_url
or os.getenv("HERMES_PORTAL_BASE_URL") or os.getenv("HERMES_PORTAL_BASE_URL")
or os.getenv("NOUS_PORTAL_BASE_URL") or os.getenv("NOUS_PORTAL_BASE_URL")
or pconfig.portal_base_url or pconfig.portal_base_url
).rstrip("/") ).rstrip("/")
requested_inference_url = ( requested_inference_url = (
getattr(args, "inference_url", None) inference_base_url
or os.getenv("NOUS_INFERENCE_BASE_URL") or os.getenv("NOUS_INFERENCE_BASE_URL")
or pconfig.inference_base_url or pconfig.inference_base_url
).rstrip("/") ).rstrip("/")
client_id = getattr(args, "client_id", None) or pconfig.client_id client_id = client_id or pconfig.client_id
scope = getattr(args, "scope", None) or pconfig.scope scope = scope or pconfig.scope
open_browser = not getattr(args, "no_browser", False)
timeout_seconds = getattr(args, "timeout", None) or 15.0
timeout = httpx.Timeout(timeout_seconds) timeout = httpx.Timeout(timeout_seconds)
insecure = bool(getattr(args, "insecure", False))
ca_bundle = (
getattr(args, "ca_bundle", None)
or os.getenv("HERMES_CA_BUNDLE")
or os.getenv("SSL_CERT_FILE")
)
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True) verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
# Skip browser open in SSH sessions
if _is_remote_session(): if _is_remote_session():
open_browser = False open_browser = False
@ -2197,74 +2333,121 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
elif ca_bundle: elif ca_bundle:
print(f"TLS verification: custom CA bundle ({ca_bundle})") print(f"TLS verification: custom CA bundle ({ca_bundle})")
try: with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client:
with httpx.Client(timeout=timeout, headers={"Accept": "application/json"}, verify=verify) as client: device_data = _request_device_code(
device_data = _request_device_code( client=client,
client=client, portal_base_url=portal_base_url, portal_base_url=portal_base_url,
client_id=client_id, scope=scope, client_id=client_id,
) scope=scope,
verification_url = str(device_data["verification_uri_complete"])
user_code = str(device_data["user_code"])
expires_in = int(device_data["expires_in"])
interval = int(device_data["interval"])
print()
print("To continue:")
print(f" 1. Open: {verification_url}")
print(f" 2. If prompted, enter code: {user_code}")
if open_browser:
opened = webbrowser.open(verification_url)
if opened:
print(" (Opened browser for verification)")
else:
print(" Could not open browser automatically — use the URL above.")
effective_interval = max(1, min(interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS))
print(f"Waiting for approval (polling every {effective_interval}s)...")
token_data = _poll_for_token(
client=client, portal_base_url=portal_base_url,
client_id=client_id, device_code=str(device_data["device_code"]),
expires_in=expires_in, poll_interval=interval,
)
# Process token response
now = datetime.now(timezone.utc)
token_expires_in = _coerce_ttl_seconds(token_data.get("expires_in", 0))
expires_at = now.timestamp() + token_expires_in
inference_base_url = (
_optional_base_url(token_data.get("inference_base_url"))
or requested_inference_url
) )
if inference_base_url != requested_inference_url:
print(f"Using portal-provided inference URL: {inference_base_url}")
auth_state = { verification_url = str(device_data["verification_uri_complete"])
"portal_base_url": portal_base_url, user_code = str(device_data["user_code"])
"inference_base_url": inference_base_url, expires_in = int(device_data["expires_in"])
"client_id": client_id, interval = int(device_data["interval"])
"scope": token_data.get("scope") or scope,
"token_type": token_data.get("token_type", "Bearer"), print()
"access_token": token_data["access_token"], print("To continue:")
"refresh_token": token_data.get("refresh_token"), print(f" 1. Open: {verification_url}")
"obtained_at": now.isoformat(), print(f" 2. If prompted, enter code: {user_code}")
"expires_at": datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(),
"expires_in": token_expires_in, if open_browser:
"tls": { opened = webbrowser.open(verification_url)
"insecure": verify is False, if opened:
"ca_bundle": verify if isinstance(verify, str) else None, print(" (Opened browser for verification)")
}, else:
"agent_key": None, print(" Could not open browser automatically — use the URL above.")
"agent_key_id": None,
"agent_key_expires_at": None, effective_interval = max(1, min(interval, DEVICE_AUTH_POLL_INTERVAL_CAP_SECONDS))
"agent_key_expires_in": None, print(f"Waiting for approval (polling every {effective_interval}s)...")
"agent_key_reused": None,
"agent_key_obtained_at": None, token_data = _poll_for_token(
} client=client,
portal_base_url=portal_base_url,
client_id=client_id,
device_code=str(device_data["device_code"]),
expires_in=expires_in,
poll_interval=interval,
)
now = datetime.now(timezone.utc)
token_expires_in = _coerce_ttl_seconds(token_data.get("expires_in", 0))
expires_at = now.timestamp() + token_expires_in
resolved_inference_url = (
_optional_base_url(token_data.get("inference_base_url"))
or requested_inference_url
)
if resolved_inference_url != requested_inference_url:
print(f"Using portal-provided inference URL: {resolved_inference_url}")
auth_state = {
"portal_base_url": portal_base_url,
"inference_base_url": resolved_inference_url,
"client_id": client_id,
"scope": token_data.get("scope") or scope,
"token_type": token_data.get("token_type", "Bearer"),
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"obtained_at": now.isoformat(),
"expires_at": datetime.fromtimestamp(expires_at, tz=timezone.utc).isoformat(),
"expires_in": token_expires_in,
"tls": {
"insecure": verify is False,
"ca_bundle": verify if isinstance(verify, str) else None,
},
"agent_key": None,
"agent_key_id": None,
"agent_key_expires_at": None,
"agent_key_expires_in": None,
"agent_key_reused": None,
"agent_key_obtained_at": None,
}
return refresh_nous_oauth_pure(
auth_state["access_token"],
auth_state["refresh_token"],
auth_state["client_id"],
auth_state["portal_base_url"],
auth_state["inference_base_url"],
token_type=auth_state["token_type"],
scope=auth_state["scope"],
obtained_at=auth_state["obtained_at"],
expires_at=auth_state["expires_at"],
agent_key=auth_state["agent_key"],
agent_key_expires_at=auth_state["agent_key_expires_at"],
min_key_ttl_seconds=min_key_ttl_seconds,
timeout_seconds=timeout_seconds,
insecure=insecure,
ca_bundle=ca_bundle,
force_refresh=False,
force_mint=True,
)
def _login_nous(args, pconfig: ProviderConfig) -> None:
"""Nous Portal device authorization flow."""
timeout_seconds = getattr(args, "timeout", None) or 15.0
insecure = bool(getattr(args, "insecure", False))
ca_bundle = (
getattr(args, "ca_bundle", None)
or os.getenv("HERMES_CA_BUNDLE")
or os.getenv("SSL_CERT_FILE")
)
try:
auth_state = _nous_device_code_login(
portal_base_url=getattr(args, "portal_url", None) or pconfig.portal_base_url,
inference_base_url=getattr(args, "inference_url", None) or pconfig.inference_base_url,
client_id=getattr(args, "client_id", None) or pconfig.client_id,
scope=getattr(args, "scope", None) or pconfig.scope,
open_browser=not getattr(args, "no_browser", False),
timeout_seconds=timeout_seconds,
insecure=insecure,
ca_bundle=ca_bundle,
min_key_ttl_seconds=5 * 60,
)
inference_base_url = auth_state["inference_base_url"]
verify: bool | str = False if insecure else (ca_bundle if ca_bundle else True)
# Save auth state
with _auth_store_lock(): with _auth_store_lock():
auth_store = _load_auth_store() auth_store = _load_auth_store()
_save_provider_state(auth_store, "nous", auth_state) _save_provider_state(auth_store, "nous", auth_state)
@ -2276,21 +2459,17 @@ def _login_nous(args, pconfig: ProviderConfig) -> None:
print(f" Auth state: {saved_to}") print(f" Auth state: {saved_to}")
print(f" Config updated: {config_path} (model.provider=nous)") print(f" Config updated: {config_path} (model.provider=nous)")
# Mint an initial agent key and list available models
try: try:
runtime_creds = resolve_nous_runtime_credentials( runtime_key = auth_state.get("agent_key") or auth_state.get("access_token")
min_key_ttl_seconds=5 * 60,
timeout_seconds=timeout_seconds,
insecure=insecure, ca_bundle=ca_bundle,
)
runtime_key = runtime_creds.get("api_key")
runtime_base_url = runtime_creds.get("base_url") or inference_base_url
if not isinstance(runtime_key, str) or not runtime_key: if not isinstance(runtime_key, str) or not runtime_key:
raise AuthError("No runtime API key available to fetch models", raise AuthError(
provider="nous", code="invalid_token") "No runtime API key available to fetch models",
provider="nous",
code="invalid_token",
)
model_ids = fetch_nous_models( model_ids = fetch_nous_models(
inference_base_url=runtime_base_url, inference_base_url=inference_base_url,
api_key=runtime_key, api_key=runtime_key,
timeout_seconds=timeout_seconds, timeout_seconds=timeout_seconds,
verify=verify, verify=verify,

238
hermes_cli/auth_commands.py Normal file
View file

@ -0,0 +1,238 @@
"""Credential-pool auth subcommands."""
from __future__ import annotations
from getpass import getpass
import uuid
from agent.credential_pool import PooledCredential, load_pool
import hermes_cli.auth as auth_mod
from hermes_cli.auth import PROVIDER_REGISTRY
def _normalize_provider(provider: str) -> str:
normalized = (provider or "").strip().lower()
if normalized in {"or", "open-router"}:
return "openrouter"
return normalized
def _provider_base_url(provider: str) -> str:
if provider == "openrouter":
return "https://openrouter.ai/api/v1"
pconfig = PROVIDER_REGISTRY.get(provider)
return pconfig.inference_base_url if pconfig else ""
def _derive_label(token: str, fallback: str) -> str:
claims = auth_mod._decode_jwt_claims(token)
for key in ("email", "preferred_username", "upn"):
value = claims.get(key)
if isinstance(value, str) and value.strip():
return value.strip()
return fallback
def _oauth_default_label(provider: str, count: int) -> str:
return f"{provider}-oauth-{count}"
def _api_key_default_label(count: int) -> str:
return f"api-key-{count}"
def _display_source(source: str) -> str:
return source.split(":", 1)[1] if source.startswith("manual:") else source
def auth_add_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
if provider not in PROVIDER_REGISTRY and provider != "openrouter":
raise SystemExit(f"Unknown provider: {provider}")
requested_type = str(getattr(args, "auth_type", "") or "").strip().lower()
if requested_type in {"api_key", "api-key"}:
requested_type = "api_key"
if not requested_type:
requested_type = "oauth" if provider in {"anthropic", "nous", "openai-codex"} else "api_key"
pool = load_pool(provider)
if requested_type == "api_key":
token = (getattr(args, "api_key", None) or "").strip()
if not token:
token = getpass("Paste your API key: ").strip()
if not token:
raise SystemExit("No API key provided.")
default_label = _api_key_default_label(len(pool.entries()) + 1)
label = (getattr(args, "label", None) or "").strip()
if not label:
label = input(f"Label (optional, default: {default_label}): ").strip() or default_label
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type="api_key",
priority=0,
source="manual",
access_token=token,
base_url=_provider_base_url(provider),
)
pool.add_entry(entry)
print(f'Added {provider} credential #{len(pool.entries())}: "{label}"')
return
if provider == "anthropic":
from agent import anthropic_adapter as anthropic_mod
creds = anthropic_mod.run_hermes_oauth_login_pure()
if not creds:
raise SystemExit("Anthropic OAuth login did not return credentials.")
label = (getattr(args, "label", None) or "").strip() or _derive_label(
creds["access_token"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type="oauth",
priority=0,
source="manual:hermes_pkce",
access_token=creds["access_token"],
refresh_token=creds.get("refresh_token"),
expires_at_ms=creds.get("expires_at_ms"),
base_url=_provider_base_url(provider),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "nous":
creds = auth_mod._nous_device_code_login(
portal_base_url=getattr(args, "portal_url", None),
inference_base_url=getattr(args, "inference_url", None),
client_id=getattr(args, "client_id", None),
scope=getattr(args, "scope", None),
open_browser=not getattr(args, "no_browser", False),
timeout_seconds=getattr(args, "timeout", None) or 15.0,
insecure=bool(getattr(args, "insecure", False)),
ca_bundle=getattr(args, "ca_bundle", None),
min_key_ttl_seconds=max(60, int(getattr(args, "min_key_ttl_seconds", 5 * 60))),
)
label = (getattr(args, "label", None) or "").strip() or _derive_label(
creds.get("access_token", ""),
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type="oauth",
priority=0,
source="manual:device_code",
access_token=creds["access_token"],
refresh_token=creds.get("refresh_token"),
expires_at=creds.get("expires_at"),
token_type=creds.get("token_type"),
scope=creds.get("scope"),
client_id=creds.get("client_id"),
portal_base_url=creds.get("portal_base_url"),
inference_base_url=creds.get("inference_base_url"),
obtained_at=creds.get("obtained_at"),
expires_in=creds.get("expires_in"),
agent_key=creds.get("agent_key"),
agent_key_id=creds.get("agent_key_id"),
agent_key_expires_at=creds.get("agent_key_expires_at"),
agent_key_expires_in=creds.get("agent_key_expires_in"),
agent_key_reused=creds.get("agent_key_reused"),
agent_key_obtained_at=creds.get("agent_key_obtained_at"),
tls=creds.get("tls"),
base_url=creds.get("inference_base_url"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
if provider == "openai-codex":
creds = auth_mod._codex_device_code_login()
label = (getattr(args, "label", None) or "").strip() or _derive_label(
creds["tokens"]["access_token"],
_oauth_default_label(provider, len(pool.entries()) + 1),
)
entry = PooledCredential(
provider=provider,
id=uuid.uuid4().hex[:6],
label=label,
auth_type="oauth",
priority=0,
source="manual:device_code",
access_token=creds["tokens"]["access_token"],
refresh_token=creds["tokens"].get("refresh_token"),
base_url=creds.get("base_url"),
last_refresh=creds.get("last_refresh"),
)
pool.add_entry(entry)
print(f'Added {provider} OAuth credential #{len(pool.entries())}: "{entry.label}"')
return
raise SystemExit(f"`hermes auth add {provider}` is not implemented for auth type {requested_type} yet.")
def auth_list_command(args) -> None:
provider_filter = _normalize_provider(getattr(args, "provider", "") or "")
providers = [provider_filter] if provider_filter else sorted({
*PROVIDER_REGISTRY.keys(),
"openrouter",
})
for provider in providers:
pool = load_pool(provider)
entries = pool.entries()
if not entries:
continue
current = pool.select()
print(f"{provider} ({len(entries)} credentials):")
for idx, entry in enumerate(entries, start=1):
marker = " "
if current is not None and entry.id == current.id:
marker = ""
status = ""
if entry.last_status == "exhausted":
status = f" exhausted ({entry.last_error_code})"
source = _display_source(entry.source)
print(f" #{idx} {entry.label:<20} {entry.auth_type:<7} {source}{status} {marker}".rstrip())
print()
def auth_remove_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
index = int(getattr(args, "index"))
pool = load_pool(provider)
removed = pool.remove_index(index)
if removed is None:
raise SystemExit(f"No credential #{index} for provider {provider}.")
print(f"Removed {provider} credential #{index} ({removed.label})")
def auth_reset_command(args) -> None:
provider = _normalize_provider(getattr(args, "provider", ""))
pool = load_pool(provider)
count = pool.reset_statuses()
print(f"Reset status on {count} {provider} credentials")
def auth_command(args) -> None:
action = getattr(args, "auth_action", "")
if action == "add":
auth_add_command(args)
return
if action == "list":
auth_list_command(args)
return
if action == "remove":
auth_remove_command(args)
return
if action == "reset":
auth_reset_command(args)
return
raise SystemExit("Usage: hermes auth [add|list|remove|reset] ...")

View file

@ -2310,6 +2310,12 @@ def cmd_logout(args):
logout_command(args) logout_command(args)
def cmd_auth(args):
"""Manage pooled credentials."""
from hermes_cli.auth_commands import auth_command
auth_command(args)
def cmd_status(args): def cmd_status(args):
"""Show status of all components.""" """Show status of all components."""
from hermes_cli.status import show_status from hermes_cli.status import show_status
@ -2980,7 +2986,7 @@ def _coalesce_session_name_args(argv: list) -> list:
or a known top-level subcommand. or a known top-level subcommand.
""" """
_SUBCOMMANDS = { _SUBCOMMANDS = {
"chat", "model", "gateway", "setup", "whatsapp", "login", "logout", "chat", "model", "gateway", "setup", "whatsapp", "login", "logout", "auth",
"status", "cron", "doctor", "config", "pairing", "skills", "tools", "status", "cron", "doctor", "config", "pairing", "skills", "tools",
"mcp", "sessions", "insights", "version", "update", "uninstall", "mcp", "sessions", "insights", "version", "update", "uninstall",
} }
@ -3021,6 +3027,10 @@ Examples:
hermes --resume <session_id> Resume a specific session by ID hermes --resume <session_id> Resume a specific session by ID
hermes setup Run setup wizard hermes setup Run setup wizard
hermes logout Clear stored authentication hermes logout Clear stored authentication
hermes auth add <provider> Add a pooled credential
hermes auth list List pooled credentials
hermes auth remove <p> <n> Remove pooled credential by index
hermes auth reset <provider> Clear exhaustion status for a provider
hermes model Select default model hermes model Select default model
hermes config View configuration hermes config View configuration
hermes config edit Edit config in $EDITOR hermes config edit Edit config in $EDITOR
@ -3334,6 +3344,33 @@ For more help on a command:
) )
logout_parser.set_defaults(func=cmd_logout) logout_parser.set_defaults(func=cmd_logout)
auth_parser = subparsers.add_parser(
"auth",
help="Manage pooled provider credentials",
)
auth_subparsers = auth_parser.add_subparsers(dest="auth_action")
auth_add = auth_subparsers.add_parser("add", help="Add a pooled credential")
auth_add.add_argument("provider", help="Provider id (for example: anthropic, openai-codex, openrouter)")
auth_add.add_argument("--type", dest="auth_type", choices=["oauth", "api-key", "api_key"], help="Credential type to add")
auth_add.add_argument("--label", help="Optional display label")
auth_add.add_argument("--api-key", help="API key value (otherwise prompted securely)")
auth_add.add_argument("--portal-url", help="Nous portal base URL")
auth_add.add_argument("--inference-url", help="Nous inference base URL")
auth_add.add_argument("--client-id", help="OAuth client id")
auth_add.add_argument("--scope", help="OAuth scope override")
auth_add.add_argument("--no-browser", action="store_true", help="Do not auto-open a browser for OAuth login")
auth_add.add_argument("--timeout", type=float, help="OAuth/network timeout in seconds")
auth_add.add_argument("--insecure", action="store_true", help="Disable TLS verification for OAuth login")
auth_add.add_argument("--ca-bundle", help="Custom CA bundle for OAuth login")
auth_list = auth_subparsers.add_parser("list", help="List pooled credentials")
auth_list.add_argument("provider", nargs="?", help="Optional provider filter")
auth_remove = auth_subparsers.add_parser("remove", help="Remove a pooled credential by index")
auth_remove.add_argument("provider", help="Provider id")
auth_remove.add_argument("index", type=int, help="1-based credential index")
auth_reset = auth_subparsers.add_parser("reset", help="Clear exhaustion status for all credentials for a provider")
auth_reset.add_argument("provider", help="Provider id")
auth_parser.set_defaults(func=cmd_auth)
# ========================================================================= # =========================================================================
# status command # status command
# ========================================================================= # =========================================================================

View file

@ -6,8 +6,10 @@ import os
from typing import Any, Dict, Optional from typing import Any, Dict, Optional
from hermes_cli import auth as auth_mod from hermes_cli import auth as auth_mod
from agent.credential_pool import load_pool
from hermes_cli.auth import ( from hermes_cli.auth import (
AuthError, AuthError,
DEFAULT_CODEX_BASE_URL,
PROVIDER_REGISTRY, PROVIDER_REGISTRY,
format_auth_error, format_auth_error,
resolve_provider, resolve_provider,
@ -106,6 +108,48 @@ def _parse_api_mode(raw: Any) -> Optional[str]:
return None return None
def _resolve_runtime_from_pool_entry(
*,
provider: str,
entry: Any,
requested_provider: str,
model_cfg: Optional[Dict[str, Any]] = None,
pool: Any = None,
) -> Dict[str, Any]:
model_cfg = model_cfg or _get_model_config()
base_url = (getattr(entry, "runtime_base_url", None) or getattr(entry, "base_url", None) or "").rstrip("/")
api_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
api_mode = "chat_completions"
if provider == "openai-codex":
api_mode = "codex_responses"
base_url = base_url or DEFAULT_CODEX_BASE_URL
elif provider == "anthropic":
api_mode = "anthropic_messages"
base_url = base_url or "https://api.anthropic.com"
elif provider == "nous":
api_mode = "chat_completions"
elif provider == "copilot":
api_mode = _copilot_runtime_api_mode(model_cfg, getattr(entry, "runtime_api_key", ""))
else:
configured_mode = _parse_api_mode(model_cfg.get("api_mode"))
if configured_mode:
api_mode = configured_mode
elif base_url.rstrip("/").endswith("/anthropic") or provider in ("minimax", "minimax-cn"):
api_mode = "anthropic_messages"
if base_url.rstrip("/").endswith("/v1"):
base_url = base_url.rstrip("/")[:-3] + "/anthropic"
return {
"provider": provider,
"api_mode": api_mode,
"base_url": base_url,
"api_key": api_key,
"source": getattr(entry, "source", "pool"),
"credential_pool": pool,
"requested_provider": requested_provider,
}
def resolve_requested_provider(requested: Optional[str] = None) -> str: def resolve_requested_provider(requested: Optional[str] = None) -> str:
"""Resolve provider request from explicit arg, config, then env.""" """Resolve provider request from explicit arg, config, then env."""
if requested and requested.strip(): if requested and requested.strip():
@ -313,6 +357,38 @@ def resolve_runtime_provider(
explicit_api_key=explicit_api_key, explicit_api_key=explicit_api_key,
explicit_base_url=explicit_base_url, explicit_base_url=explicit_base_url,
) )
model_cfg = _get_model_config()
should_use_pool = provider != "openrouter"
if provider == "openrouter":
cfg_provider = str(model_cfg.get("provider") or "").strip().lower()
cfg_base_url = str(model_cfg.get("base_url") or "").strip()
env_openai_base_url = os.getenv("OPENAI_BASE_URL", "").strip()
has_custom_endpoint = bool(explicit_base_url or env_openai_base_url)
if cfg_base_url and cfg_provider in {"auto", "custom"}:
has_custom_endpoint = True
should_use_pool = requested_provider == "openrouter" and not has_custom_endpoint
try:
pool = load_pool(provider) if should_use_pool else None
except Exception:
pool = None
if pool and pool.has_credentials():
entry = pool.select()
pool_api_key = ""
if entry is not None:
pool_api_key = (
getattr(entry, "runtime_api_key", None)
or getattr(entry, "access_token", "")
)
if entry is not None and pool_api_key:
return _resolve_runtime_from_pool_entry(
provider=provider,
entry=entry,
requested_provider=requested_provider,
model_cfg=model_cfg,
pool=pool,
)
if provider == "nous": if provider == "nous":
creds = resolve_nous_runtime_credentials( creds = resolve_nous_runtime_credentials(
@ -385,7 +461,6 @@ def resolve_runtime_provider(
pconfig = PROVIDER_REGISTRY.get(provider) pconfig = PROVIDER_REGISTRY.get(provider)
if pconfig and pconfig.auth_type == "api_key": if pconfig and pconfig.auth_type == "api_key":
creds = resolve_api_key_provider_credentials(provider) creds = resolve_api_key_provider_credentials(provider)
model_cfg = _get_model_config()
base_url = creds.get("base_url", "").rstrip("/") base_url = creds.get("base_url", "").rstrip("/")
api_mode = "chat_completions" api_mode = "chat_completions"
if provider == "copilot": if provider == "copilot":

View file

@ -418,6 +418,7 @@ class AIAgent:
honcho_config=None, honcho_config=None,
iteration_budget: "IterationBudget" = None, iteration_budget: "IterationBudget" = None,
fallback_model: Dict[str, Any] = None, fallback_model: Dict[str, Any] = None,
credential_pool=None,
checkpoints_enabled: bool = False, checkpoints_enabled: bool = False,
checkpoint_max_snapshots: int = 50, checkpoint_max_snapshots: int = 50,
pass_session_id: bool = False, pass_session_id: bool = False,
@ -485,6 +486,7 @@ class AIAgent:
self._print_fn = None self._print_fn = None
self.skip_context_files = skip_context_files self.skip_context_files = skip_context_files
self.pass_session_id = pass_session_id self.pass_session_id = pass_session_id
self._credential_pool = credential_pool
self.log_prefix_chars = log_prefix_chars self.log_prefix_chars = log_prefix_chars
self.log_prefix = f"{log_prefix} " if log_prefix else "" self.log_prefix = f"{log_prefix} " if log_prefix else ""
# Store effective base URL for feature detection (prompt caching, reasoning, etc.) # Store effective base URL for feature detection (prompt caching, reasoning, etc.)
@ -3420,6 +3422,84 @@ class AIAgent:
self._is_anthropic_oauth = _is_oauth_token(new_token) self._is_anthropic_oauth = _is_oauth_token(new_token)
return True return True
def _apply_client_headers_for_base_url(self, base_url: str) -> None:
normalized = (base_url or "").lower()
if "openrouter" in normalized:
self._client_kwargs["default_headers"] = {
"HTTP-Referer": "https://hermes-agent.nousresearch.com",
"X-OpenRouter-Title": "Hermes Agent",
"X-OpenRouter-Categories": "productivity,cli-agent",
}
elif "api.githubcopilot.com" in normalized:
from hermes_cli.models import copilot_default_headers
self._client_kwargs["default_headers"] = copilot_default_headers()
elif "api.kimi.com" in normalized:
self._client_kwargs["default_headers"] = {"User-Agent": "KimiCLI/1.3"}
else:
self._client_kwargs.pop("default_headers", None)
def _swap_credential(self, entry) -> None:
runtime_key = getattr(entry, "runtime_api_key", None) or getattr(entry, "access_token", "")
runtime_base = getattr(entry, "runtime_base_url", None) or getattr(entry, "base_url", None) or self.base_url
if self.api_mode == "anthropic_messages":
from agent.anthropic_adapter import build_anthropic_client, _is_oauth_token
try:
self._anthropic_client.close()
except Exception:
pass
self._anthropic_api_key = runtime_key
self._anthropic_base_url = runtime_base
self._anthropic_client = build_anthropic_client(runtime_key, runtime_base)
self._is_anthropic_oauth = _is_oauth_token(runtime_key) if self.provider == "anthropic" else False
self.api_key = runtime_key
self.base_url = runtime_base
return
self.api_key = runtime_key
self.base_url = runtime_base.rstrip("/") if isinstance(runtime_base, str) else runtime_base
self._client_kwargs["api_key"] = self.api_key
self._client_kwargs["base_url"] = self.base_url
self._apply_client_headers_for_base_url(self.base_url)
self._replace_primary_openai_client(reason="credential_rotation")
def _recover_with_credential_pool(
self,
*,
status_code: Optional[int],
retry_429_with_same_cred: bool,
) -> tuple[bool, bool]:
pool = getattr(self, "_credential_pool", None)
if pool is None or status_code is None:
return False, retry_429_with_same_cred
if status_code == 402:
next_entry = pool.mark_exhausted_and_rotate(status_code=402)
if next_entry is not None:
self._swap_credential(next_entry)
return True, False
return False, retry_429_with_same_cred
if status_code == 429:
if not retry_429_with_same_cred:
return False, True
next_entry = pool.mark_exhausted_and_rotate(status_code=429)
if next_entry is not None:
self._swap_credential(next_entry)
return True, False
return False, True
if status_code == 401:
refreshed = pool.try_refresh_current()
if refreshed is not None:
self._swap_credential(refreshed)
return True, retry_429_with_same_cred
return False, retry_429_with_same_cred
def _anthropic_messages_create(self, api_kwargs: dict): def _anthropic_messages_create(self, api_kwargs: dict):
if self.api_mode == "anthropic_messages": if self.api_mode == "anthropic_messages":
self._try_refresh_anthropic_client_credentials() self._try_refresh_anthropic_client_credentials()
@ -5724,6 +5804,7 @@ class AIAgent:
codex_auth_retry_attempted = False codex_auth_retry_attempted = False
anthropic_auth_retry_attempted = False anthropic_auth_retry_attempted = False
nous_auth_retry_attempted = False nous_auth_retry_attempted = False
retry_429_with_same_cred = False
restart_with_compressed_messages = False restart_with_compressed_messages = False
restart_with_length_continuation = False restart_with_length_continuation = False
@ -6101,6 +6182,12 @@ class AIAgent:
self.thinking_callback("") self.thinking_callback("")
status_code = getattr(api_error, "status_code", None) status_code = getattr(api_error, "status_code", None)
recovered_with_pool, retry_429_with_same_cred = self._recover_with_credential_pool(
status_code=status_code,
retry_429_with_same_cred=retry_429_with_same_cred,
)
if recovered_with_pool:
continue
if ( if (
self.api_mode == "codex_responses" self.api_mode == "codex_responses"
and self.provider == "openai-codex" and self.provider == "openai-codex"

View file

@ -206,6 +206,31 @@ class TestAnthropicOAuthFlag:
adapter = client.chat.completions adapter = client.chat.completions
assert adapter._is_oauth is False assert adapter._is_oauth is False
def test_pool_entry_takes_priority_over_legacy_resolution(self):
class _Entry:
access_token = "sk-ant-oat01-pooled"
base_url = "https://api.anthropic.com"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.anthropic_adapter.resolve_anthropic_token", side_effect=AssertionError("legacy path should not run")),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()) as mock_build,
):
from agent.auxiliary_client import _try_anthropic
client, model = _try_anthropic()
assert client is not None
assert model == "claude-haiku-4-5-20251001"
assert mock_build.call_args.args[0] == "sk-ant-oat01-pooled"
class TestExpiredCodexFallback: class TestExpiredCodexFallback:
"""Test that expired Codex tokens don't block the auto chain.""" """Test that expired Codex tokens don't block the auto chain."""
@ -533,6 +558,32 @@ class TestGetTextAuxiliaryClient:
from agent.auxiliary_client import CodexAuxiliaryClient from agent.auxiliary_client import CodexAuxiliaryClient
assert isinstance(client, CodexAuxiliaryClient) assert isinstance(client, CodexAuxiliaryClient)
def test_codex_pool_entry_takes_priority_over_auth_store(self):
class _Entry:
access_token = "pooled-codex-token"
base_url = "https://chatgpt.com/backend-api/codex"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.auxiliary_client.OpenAI"),
patch("hermes_cli.auth._read_codex_tokens", side_effect=AssertionError("legacy codex store should not run")),
):
from agent.auxiliary_client import _try_codex
client, model = _try_codex()
from agent.auxiliary_client import CodexAuxiliaryClient
assert isinstance(client, CodexAuxiliaryClient)
assert model == "gpt-5.2-codex"
def test_returns_none_when_nothing_available(self, monkeypatch): def test_returns_none_when_nothing_available(self, monkeypatch):
monkeypatch.delenv("OPENAI_BASE_URL", raising=False) monkeypatch.delenv("OPENAI_BASE_URL", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False) monkeypatch.delenv("OPENAI_API_KEY", raising=False)
@ -581,6 +632,35 @@ class TestVisionClientFallback:
assert client.__class__.__name__ == "AnthropicAuxiliaryClient" assert client.__class__.__name__ == "AnthropicAuxiliaryClient"
assert model == "claude-haiku-4-5-20251001" assert model == "claude-haiku-4-5-20251001"
class TestAuxiliaryPoolAwareness:
def test_try_nous_uses_pool_entry(self):
class _Entry:
access_token = "pooled-access-token"
agent_key = "pooled-agent-key"
inference_base_url = "https://inference.pool.example/v1"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
with (
patch("agent.auxiliary_client.load_pool", return_value=_Pool()),
patch("agent.auxiliary_client.OpenAI") as mock_openai,
):
from agent.auxiliary_client import _try_nous
client, model = _try_nous()
assert client is not None
assert model == "gemini-3-flash"
call_kwargs = mock_openai.call_args.kwargs
assert call_kwargs["api_key"] == "pooled-agent-key"
assert call_kwargs["base_url"] == "https://inference.pool.example/v1"
def test_resolve_provider_client_copilot_uses_runtime_credentials(self, monkeypatch): def test_resolve_provider_client_copilot_uses_runtime_credentials(self, monkeypatch):
monkeypatch.delenv("GITHUB_TOKEN", raising=False) monkeypatch.delenv("GITHUB_TOKEN", raising=False)
monkeypatch.delenv("GH_TOKEN", raising=False) monkeypatch.delenv("GH_TOKEN", raising=False)

268
tests/test_auth_commands.py Normal file
View file

@ -0,0 +1,268 @@
"""Tests for auth subcommands backed by the credential pool."""
from __future__ import annotations
import base64
import json
import pytest
def _write_auth_store(tmp_path, payload: dict) -> None:
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
def _jwt_with_email(email: str) -> str:
header = base64.urlsafe_b64encode(b'{"alg":"RS256","typ":"JWT"}').rstrip(b"=").decode()
payload = base64.urlsafe_b64encode(
json.dumps({"email": email}).encode()
).rstrip(b"=").decode()
return f"{header}.{payload}.signature"
@pytest.fixture(autouse=True)
def _clear_provider_env(monkeypatch):
for key in (
"OPENROUTER_API_KEY",
"OPENAI_API_KEY",
"ANTHROPIC_API_KEY",
"ANTHROPIC_TOKEN",
"CLAUDE_CODE_OAUTH_TOKEN",
):
monkeypatch.delenv(key, raising=False)
def test_auth_add_api_key_persists_manual_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("OPENROUTER_API_KEY", raising=False)
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "openrouter"
auth_type = "api-key"
api_key = "sk-or-manual"
label = "personal"
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["openrouter"]
entry = next(item for item in entries if item["source"] == "manual")
assert entry["label"] == "personal"
assert entry["auth_type"] == "api_key"
assert entry["source"] == "manual"
assert entry["access_token"] == "sk-or-manual"
def test_auth_add_anthropic_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("claude@example.com")
monkeypatch.setattr(
"agent.anthropic_adapter.run_hermes_oauth_login_pure",
lambda: {
"access_token": token,
"refresh_token": "refresh-token",
"expires_at_ms": 1711234567000,
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "anthropic"
auth_type = "oauth"
api_key = None
label = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["anthropic"]
entry = next(item for item in entries if item["source"] == "manual:hermes_pkce")
assert entry["label"] == "claude@example.com"
assert entry["source"] == "manual:hermes_pkce"
assert entry["refresh_token"] == "refresh-token"
assert entry["expires_at_ms"] == 1711234567000
def test_auth_add_nous_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("nous@example.com")
monkeypatch.setattr(
"hermes_cli.auth._nous_device_code_login",
lambda **kwargs: {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"scope": "inference:mint_agent_key",
"token_type": "Bearer",
"access_token": token,
"refresh_token": "refresh-token",
"obtained_at": "2026-03-23T10:00:00+00:00",
"expires_at": "2026-03-23T11:00:00+00:00",
"expires_in": 3600,
"agent_key": "ak-test",
"agent_key_id": "ak-id",
"agent_key_expires_at": "2026-03-23T10:30:00+00:00",
"agent_key_expires_in": 1800,
"agent_key_reused": False,
"agent_key_obtained_at": "2026-03-23T10:00:10+00:00",
"tls": {"insecure": False, "ca_bundle": None},
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "nous"
auth_type = "oauth"
api_key = None
label = None
portal_url = None
inference_url = None
client_id = None
scope = None
no_browser = False
timeout = None
insecure = False
ca_bundle = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["nous"]
entry = next(item for item in entries if item["source"] == "manual:device_code")
assert entry["label"] == "nous@example.com"
assert entry["source"] == "manual:device_code"
assert entry["agent_key"] == "ak-test"
assert entry["portal_base_url"] == "https://portal.example.com"
def test_auth_add_codex_oauth_persists_pool_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
token = _jwt_with_email("codex@example.com")
monkeypatch.setattr(
"hermes_cli.auth._codex_device_code_login",
lambda: {
"tokens": {
"access_token": token,
"refresh_token": "refresh-token",
},
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": "2026-03-23T10:00:00Z",
},
)
from hermes_cli.auth_commands import auth_add_command
class _Args:
provider = "openai-codex"
auth_type = "oauth"
api_key = None
label = None
auth_add_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["openai-codex"]
entry = next(item for item in entries if item["source"] == "manual:device_code")
assert entry["label"] == "codex@example.com"
assert entry["source"] == "manual:device_code"
assert entry["refresh_token"] == "refresh-token"
assert entry["base_url"] == "https://chatgpt.com/backend-api/codex"
def test_auth_remove_reindexes_priorities(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-ant-api-secondary",
},
]
},
},
)
from hermes_cli.auth_commands import auth_remove_command
class _Args:
provider = "anthropic"
index = 1
auth_remove_command(_Args())
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entries = payload["credential_pool"]["anthropic"]
assert len(entries) == 1
assert entries[0]["label"] == "secondary"
assert entries[0]["priority"] == 0
def test_auth_reset_clears_provider_statuses(tmp_path, monkeypatch, capsys):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
"last_status": "exhausted",
"last_status_at": 1711230000.0,
"last_error_code": 402,
}
]
},
},
)
from hermes_cli.auth_commands import auth_reset_command
class _Args:
provider = "anthropic"
auth_reset_command(_Args())
out = capsys.readouterr().out
assert "Reset status" in out
payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
entry = payload["credential_pool"]["anthropic"][0]
assert entry["last_status"] is None
assert entry["last_status_at"] is None
assert entry["last_error_code"] is None

View file

@ -0,0 +1,297 @@
"""Tests for multi-credential runtime pooling and rotation."""
from __future__ import annotations
import json
import time
import pytest
def _write_auth_store(tmp_path, payload: dict) -> None:
hermes_home = tmp_path / "hermes"
hermes_home.mkdir(parents=True, exist_ok=True)
(hermes_home / "auth.json").write_text(json.dumps(payload, indent=2))
def test_fill_first_selection_skips_recently_exhausted_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
"last_status": "exhausted",
"last_status_at": time.time(),
"last_error_code": 402,
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-ant-api-secondary",
"last_status": "ok",
"last_status_at": None,
"last_error_code": None,
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entry = pool.select()
assert entry is not None
assert entry.id == "cred-2"
assert pool.current().id == "cred-2"
def test_exhausted_entry_resets_after_ttl(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openrouter": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-or-primary",
"base_url": "https://openrouter.ai/api/v1",
"last_status": "exhausted",
"last_status_at": time.time() - 90000,
"last_error_code": 429,
}
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.id == "cred-1"
assert entry.last_status == "ok"
def test_mark_exhausted_and_rotate_persists_status(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "api_key",
"priority": 0,
"source": "manual",
"access_token": "sk-ant-api-primary",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "api_key",
"priority": 1,
"source": "manual",
"access_token": "sk-ant-api-secondary",
},
]
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
assert pool.select().id == "cred-1"
next_entry = pool.mark_exhausted_and_rotate(status_code=402)
assert next_entry is not None
assert next_entry.id == "cred-2"
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
persisted = auth_payload["credential_pool"]["anthropic"][0]
assert persisted["last_status"] == "exhausted"
assert persisted["last_error_code"] == 402
def test_try_refresh_current_updates_only_current_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"openai-codex": [
{
"id": "cred-1",
"label": "primary",
"auth_type": "oauth",
"priority": 0,
"source": "device_code",
"access_token": "access-old",
"refresh_token": "refresh-old",
"base_url": "https://chatgpt.com/backend-api/codex",
},
{
"id": "cred-2",
"label": "secondary",
"auth_type": "oauth",
"priority": 1,
"source": "device_code",
"access_token": "access-other",
"refresh_token": "refresh-other",
"base_url": "https://chatgpt.com/backend-api/codex",
},
]
},
},
)
from agent.credential_pool import load_pool
monkeypatch.setattr(
"hermes_cli.auth.refresh_codex_oauth_pure",
lambda access_token, refresh_token, timeout_seconds=20.0: {
"access_token": "access-new",
"refresh_token": "refresh-new",
},
)
pool = load_pool("openai-codex")
current = pool.select()
assert current.id == "cred-1"
refreshed = pool.try_refresh_current()
assert refreshed is not None
assert refreshed.access_token == "access-new"
auth_payload = json.loads((tmp_path / "hermes" / "auth.json").read_text())
primary, secondary = auth_payload["credential_pool"]["openai-codex"]
assert primary["access_token"] == "access-new"
assert primary["refresh_token"] == "refresh-new"
assert secondary["access_token"] == "access-other"
assert secondary["refresh_token"] == "refresh-other"
def test_load_pool_seeds_env_api_key(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.setenv("OPENROUTER_API_KEY", "sk-or-seeded")
_write_auth_store(tmp_path, {"version": 1, "providers": {}})
from agent.credential_pool import load_pool
pool = load_pool("openrouter")
entry = pool.select()
assert entry is not None
assert entry.source == "env:OPENROUTER_API_KEY"
assert entry.access_token == "sk-or-seeded"
def test_load_pool_migrates_nous_provider_state(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(
tmp_path,
{
"version": 1,
"active_provider": "nous",
"providers": {
"nous": {
"portal_base_url": "https://portal.example.com",
"inference_base_url": "https://inference.example.com/v1",
"client_id": "hermes-cli",
"token_type": "Bearer",
"scope": "inference:mint_agent_key",
"access_token": "access-token",
"refresh_token": "refresh-token",
"expires_at": "2026-03-24T12:00:00+00:00",
"agent_key": "agent-key",
"agent_key_expires_at": "2026-03-24T13:30:00+00:00",
}
},
},
)
from agent.credential_pool import load_pool
pool = load_pool("nous")
entry = pool.select()
assert entry is not None
assert entry.source == "device_code"
assert entry.portal_base_url == "https://portal.example.com"
assert entry.agent_key == "agent-key"
def test_singleton_seed_does_not_clobber_manual_oauth_entry(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
monkeypatch.delenv("ANTHROPIC_API_KEY", raising=False)
monkeypatch.delenv("ANTHROPIC_TOKEN", raising=False)
monkeypatch.delenv("CLAUDE_CODE_OAUTH_TOKEN", raising=False)
_write_auth_store(
tmp_path,
{
"version": 1,
"credential_pool": {
"anthropic": [
{
"id": "manual-1",
"label": "manual-pkce",
"auth_type": "oauth",
"priority": 0,
"source": "manual:hermes_pkce",
"access_token": "manual-token",
"refresh_token": "manual-refresh",
"expires_at_ms": 1711234567000,
}
]
},
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_hermes_oauth_credentials",
lambda: {
"accessToken": "seeded-token",
"refreshToken": "seeded-refresh",
"expiresAt": 1711234999000,
},
)
monkeypatch.setattr(
"agent.anthropic_adapter.read_claude_code_credentials",
lambda: None,
)
from agent.credential_pool import load_pool
pool = load_pool("anthropic")
entries = pool.entries()
assert len(entries) == 2
assert {entry.source for entry in entries} == {"manual:hermes_pkce", "hermes_pkce"}

View file

@ -1528,6 +1528,62 @@ class TestNousCredentialRefresh:
assert isinstance(agent.client, _RebuiltClient) assert isinstance(agent.client, _RebuiltClient)
class TestCredentialPoolRecovery:
def test_recover_with_pool_rotates_on_402(self, agent):
current = SimpleNamespace(label="primary")
next_entry = SimpleNamespace(label="secondary")
class _Pool:
def current(self):
return current
def mark_exhausted_and_rotate(self, *, status_code):
assert status_code == 402
return next_entry
agent._credential_pool = _Pool()
agent._swap_credential = MagicMock()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=402,
retry_429_with_same_cred=False,
)
assert recovered is True
assert retry_same is False
agent._swap_credential.assert_called_once_with(next_entry)
def test_recover_with_pool_retries_first_429_then_rotates(self, agent):
next_entry = SimpleNamespace(label="secondary")
class _Pool:
def current(self):
return SimpleNamespace(label="primary")
def mark_exhausted_and_rotate(self, *, status_code):
assert status_code == 429
return next_entry
agent._credential_pool = _Pool()
agent._swap_credential = MagicMock()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=429,
retry_429_with_same_cred=False,
)
assert recovered is False
assert retry_same is True
agent._swap_credential.assert_not_called()
recovered, retry_same = agent._recover_with_credential_pool(
status_code=429,
retry_429_with_same_cred=True,
)
assert recovered is True
assert retry_same is False
agent._swap_credential.assert_called_once_with(next_entry)
class TestMaxTokensParam: class TestMaxTokensParam:
"""Verify _max_tokens_param returns the correct key for each provider.""" """Verify _max_tokens_param returns the correct key for each provider."""

View file

@ -1,6 +1,55 @@
from hermes_cli import runtime_provider as rp from hermes_cli import runtime_provider as rp
def test_resolve_runtime_provider_uses_credential_pool(monkeypatch):
class _Entry:
access_token = "pool-token"
source = "manual"
base_url = "https://chatgpt.com/backend-api/codex"
class _Pool:
def has_credentials(self):
return True
def select(self):
return _Entry()
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["provider"] == "openai-codex"
assert resolved["api_key"] == "pool-token"
assert resolved["credential_pool"] is not None
assert resolved["source"] == "manual"
def test_resolve_runtime_provider_falls_back_when_pool_empty(monkeypatch):
class _Pool:
def has_credentials(self):
return False
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr(rp, "load_pool", lambda provider: _Pool())
monkeypatch.setattr(
rp,
"resolve_codex_runtime_credentials",
lambda: {
"provider": "openai-codex",
"base_url": "https://chatgpt.com/backend-api/codex",
"api_key": "codex-token",
"source": "hermes-auth-store",
"last_refresh": "2026-02-26T00:00:00Z",
},
)
resolved = rp.resolve_runtime_provider(requested="openai-codex")
assert resolved["api_key"] == "codex-token"
assert resolved.get("credential_pool") is None
def test_resolve_runtime_provider_codex(monkeypatch): def test_resolve_runtime_provider_codex(monkeypatch):
monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex") monkeypatch.setattr(rp, "resolve_provider", lambda *a, **k: "openai-codex")
monkeypatch.setattr( monkeypatch.setattr(

View file

@ -593,7 +593,14 @@ class TestDelegationCredentialResolution(unittest.TestCase):
"model": "qwen2.5-coder", "model": "qwen2.5-coder",
"base_url": "http://localhost:1234/v1", "base_url": "http://localhost:1234/v1",
} }
with patch.dict(os.environ, {"OPENROUTER_API_KEY": "env-openrouter-key"}, clear=False): with patch.dict(
os.environ,
{
"OPENROUTER_API_KEY": "env-openrouter-key",
"OPENAI_API_KEY": "",
},
clear=False,
):
with self.assertRaises(ValueError) as ctx: with self.assertRaises(ValueError) as ctx:
_resolve_delegation_credentials(cfg, parent) _resolve_delegation_credentials(cfg, parent)
self.assertIn("OPENAI_API_KEY", str(ctx.exception)) self.assertIn("OPENAI_API_KEY", str(ctx.exception))

View file

@ -18,6 +18,11 @@ import pytest
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _clear_openai_env(monkeypatch):
monkeypatch.delenv("OPENAI_API_KEY", raising=False)
class TestGetProvider: class TestGetProvider:
"""_get_provider() picks the right backend based on config + availability.""" """_get_provider() picks the right backend based on config + availability."""