diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py index f2d9112ccd3..f0fbd0aa8c1 100644 --- a/agent/agent_runtime_helpers.py +++ b/agent/agent_runtime_helpers.py @@ -612,6 +612,8 @@ def recover_with_credential_pool( context_message = str(error_context.get("message") or "").lower() usage_limit_reached = ( "usage_limit_reached" in context_reason + or "gousagelimit" in context_reason + or "usage limit reached" in context_message or "usage limit has been reached" in context_message ) if not has_retried_429 and not usage_limit_reached: @@ -2090,19 +2092,33 @@ def extract_api_error_context(error: Exception) -> Dict[str, Any]: if "reset_at" not in context: message = context.get("message") or "" if isinstance(message, str): - delay_match = re.search(r"quotaResetDelay[:\s\"]+(\\d+(?:\\.\\d+)?)(ms|s)", message, re.IGNORECASE) + delay_match = re.search(r"quotaResetDelay[:\s\"]+(\d+(?:\.\d+)?)(ms|s)", message, re.IGNORECASE) if delay_match: value = float(delay_match.group(1)) seconds = value / 1000.0 if delay_match.group(2).lower() == "ms" else value context["reset_at"] = time.time() + seconds else: - sec_match = re.search( - r"retry\s+(?:after\s+)?(\d+(?:\.\d+)?)\s*(?:sec|secs|seconds|s\b)", + resets_in_match = re.search( + r"resets?\s+in\s+" + r"(?:(\d+(?:\.\d+)?)\s*(?:h|hr|hrs|hour|hours)\b\s*)?" + r"(?:(\d+(?:\.\d+)?)\s*(?:m|min|mins|minute|minutes)\b\s*)?" + r"(?:(\d+(?:\.\d+)?)\s*(?:s|sec|secs|second|seconds)\b)?", message, re.IGNORECASE, ) - if sec_match: - context["reset_at"] = time.time() + float(sec_match.group(1)) + if resets_in_match and any(resets_in_match.groups()): + hours = float(resets_in_match.group(1) or 0) + minutes = float(resets_in_match.group(2) or 0) + seconds = float(resets_in_match.group(3) or 0) + context["reset_at"] = time.time() + (hours * 3600) + (minutes * 60) + seconds + else: + sec_match = re.search( + r"retry\s+(?:after\s+)?(\d+(?:\.\d+)?)\s*(?:sec|secs|seconds|s\b)", + message, + re.IGNORECASE, + ) + if sec_match: + context["reset_at"] = time.time() + float(sec_match.group(1)) return context diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index bc98e9d8593..e6d42dd2165 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -1406,6 +1406,9 @@ def _resolve_api_key_provider() -> Tuple[Optional[OpenAI], Optional[str]]: for provider_id, pconfig in PROVIDER_REGISTRY.items(): if pconfig.auth_type != "api_key": continue + if _is_provider_unhealthy(provider_id): + logger.debug("Auxiliary api-key chain: %s is unhealthy, skipping", provider_id) + continue if provider_id == "anthropic": # Only try anthropic when the user has explicitly configured it. # Without this gate, Claude Code credentials get silently used @@ -2260,11 +2263,12 @@ def _is_payment_error(exc: Exception) -> bool: "credits", "insufficient funds", "can only afford", "billing", "payment required", - # Daily / monthly quota exhaustion keywords + # Daily / monthly / weekly quota exhaustion keywords "quota exceeded", "quota_exceeded", "too many tokens per day", "daily limit", "tokens per day", "daily quota", "resource exhausted", # Vertex AI / gRPC quota errors + "weekly usage limit", "weekly limit", # OpenCode Go weekly subscription cap )): return True return False @@ -2478,7 +2482,11 @@ def _pool_error_context(exc: Exception) -> Dict[str, Any]: return payload -def _recoverable_pool_provider(resolved_provider: str, client: Any) -> Optional[str]: +def _recoverable_pool_provider( + resolved_provider: str, + client: Any, + main_runtime: Optional[Dict[str, Any]] = None, +) -> Optional[str]: """Infer which provider pool can recover the current auxiliary client.""" normalized = _normalize_aux_provider(resolved_provider) if normalized not in {"", "auto", "custom"}: @@ -2496,11 +2504,33 @@ def _recoverable_pool_provider(resolved_provider: str, client: Any) -> Optional[ return "copilot" if base_url_host_matches(base, "api.kimi.com"): return "kimi-coding" + # For api_key providers not in the hardcoded list (e.g. opencode-go), match + # the client base URL against all registered api_key providers so that + # credential-pool rotation works for any provider the user configured. + if main_runtime: + rt = _normalize_main_runtime(main_runtime) + rt_provider = rt.get("provider", "") + if rt_provider and rt_provider not in {"", "auto", "custom"}: + try: + from hermes_cli.auth import PROVIDER_REGISTRY + pconfig = PROVIDER_REGISTRY.get(rt_provider) + if pconfig and getattr(pconfig, "auth_type", None) == "api_key": + rt_base = str(getattr(pconfig, "inference_base_url", "") or "").rstrip("/") + if rt_base and base_url_host_matches(base, base_url_hostname(rt_base)): + return rt_provider + except Exception: + pass return None -def _recover_provider_pool(provider: str, exc: Exception) -> bool: - """Try same-provider credential-pool recovery for auxiliary calls.""" +def _recover_provider_pool(provider: str, exc: Exception, *, failed_api_key: str = "") -> bool: + """Try same-provider credential-pool recovery for auxiliary calls. + + ``failed_api_key`` is the API key that was actually used for the failing + request. Passing it lets mark_exhausted_and_rotate identify the correct + pool entry even when another process has already rotated the pool (which + would leave current() as None, causing the wrong entry to be marked). + """ normalized = _normalize_aux_provider(provider) try: pool = load_pool(normalized) @@ -2512,6 +2542,7 @@ def _recover_provider_pool(provider: str, exc: Exception) -> bool: status_code = getattr(exc, "status_code", None) error_context = _pool_error_context(exc) + hint = failed_api_key or None if _is_auth_error(exc): refreshed = pool.try_refresh_current() @@ -2521,6 +2552,7 @@ def _recover_provider_pool(provider: str, exc: Exception) -> bool: next_entry = pool.mark_exhausted_and_rotate( status_code=status_code if status_code is not None else 401, error_context=error_context, + api_key_hint=hint, ) if next_entry is not None: _evict_cached_clients(normalized) @@ -2532,6 +2564,7 @@ def _recover_provider_pool(provider: str, exc: Exception) -> bool: next_entry = pool.mark_exhausted_and_rotate( status_code=status_code if status_code is not None else fallback_status, error_context=error_context, + api_key_hint=hint, ) if next_entry is not None: _evict_cached_clients(normalized) @@ -2936,6 +2969,11 @@ def _resolve_auto(main_runtime: Optional[Dict[str, Any]] = None) -> Tuple[Option resolved_provider = "custom" explicit_base_url = runtime_base_url explicit_api_key = runtime_api_key or None + elif runtime_api_key: + # Pin auxiliary to the same api_key as the active main chat session + # so that a working key is reused instead of re-selecting from the pool + # (which might pick a different, potentially exhausted key). + explicit_api_key = runtime_api_key # Skip Step-1 if the main provider was recently 402'd. The unhealthy # cache TTL bounds how long we bypass it, so a topped-up account # recovers automatically. If we tried Step-1 anyway, every aux call @@ -4328,13 +4366,25 @@ def _get_cached_client( else: effective = _compat_model(cached_client, model, cached_default) return cached_client, effective - # Build outside the lock + # Build outside the lock. + # For pool-backed api_key providers, derive the active API key from the + # pool entry rather than from env vars. resolve_api_key_provider_credentials + # always prefers env vars (first-entry bias), which bypasses pool rotation: + # after key #1 is marked exhausted the retry would still get key #1 from + # the env var and fail again, causing the retry2_err handler to mark key #2. + effective_api_key = api_key + if not effective_api_key: + _pe = _peek_pool_entry(_normalize_aux_provider(provider)) + if _pe is not None: + _pk = _pool_runtime_api_key(_pe) + if _pk: + effective_api_key = _pk client, default_model = resolve_provider_client( provider, model, async_mode, explicit_base_url=base_url, - explicit_api_key=api_key, + explicit_api_key=effective_api_key, api_mode=api_mode, main_runtime=runtime, is_vision=is_vision, @@ -4948,10 +4998,17 @@ def call_llm( ) # ── Same-provider credential-pool recovery ───────────────────── - pool_provider = _recoverable_pool_provider(resolved_provider, client) + pool_provider = _recoverable_pool_provider(resolved_provider, client, main_runtime=main_runtime) + # Capture the exact API key used so mark_exhausted_and_rotate can find + # the correct pool entry even when another process rotated the pool + # between this call and recovery (which leaves current()=None and makes + # _select_unlocked() return the NEXT key by mistake). + _client_api_key = str(getattr(client, "api_key", "") or "") if pool_provider and (_is_auth_error(first_err) or _is_payment_error(first_err) or _is_rate_limit_error(first_err)): recovery_err = first_err - if _is_rate_limit_error(first_err): + # Skip the extra retry for clear payment/quota errors — the endpoint + # won't accept another request with the same exhausted key. + if _is_rate_limit_error(first_err) and not _is_payment_error(first_err): try: return _validate_llm_response( client.chat.completions.create(**kwargs), task) @@ -4959,27 +5016,40 @@ def call_llm( if not (_is_auth_error(retry_err) or _is_payment_error(retry_err) or _is_rate_limit_error(retry_err)): raise recovery_err = retry_err - if _recover_provider_pool(pool_provider, recovery_err): + if _recover_provider_pool(pool_provider, recovery_err, failed_api_key=_client_api_key): logger.info( "Auxiliary %s: recovered %s via credential-pool rotation after %s", task or "call", pool_provider, type(recovery_err).__name__, ) - return _retry_same_provider_sync( - task=task, - resolved_provider=resolved_provider, - resolved_model=resolved_model, - resolved_base_url=resolved_base_url, - resolved_api_key=resolved_api_key, - resolved_api_mode=resolved_api_mode, - main_runtime=main_runtime, - final_model=final_model, - messages=messages, - temperature=temperature, - max_tokens=max_tokens, - tools=tools, - effective_timeout=effective_timeout, - effective_extra_body=effective_extra_body, - ) + try: + return _retry_same_provider_sync( + task=task, + resolved_provider=resolved_provider, + resolved_model=resolved_model, + resolved_base_url=resolved_base_url, + resolved_api_key=resolved_api_key, + resolved_api_mode=resolved_api_mode, + main_runtime=main_runtime, + final_model=final_model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + tools=tools, + effective_timeout=effective_timeout, + effective_extra_body=effective_extra_body, + ) + except Exception as retry2_err: + # The rotated key also hit a quota/auth wall. Mark it + # immediately so concurrent processes don't make a + # redundant API call to discover it's exhausted too. + # Then fall through to the payment fallback below so + # alternative providers can still serve the request. + if (_is_payment_error(retry2_err) or _is_auth_error(retry2_err) + or _is_rate_limit_error(retry2_err)): + _recover_provider_pool(pool_provider, retry2_err) + first_err = retry2_err + else: + raise # ── Payment / credit exhaustion fallback ────────────────────── # When the resolved provider returns 402 or a credit-related error, @@ -5021,7 +5091,7 @@ def call_llm( # 402). Mark THAT label unhealthy so subsequent aux calls # skip it instead of paying another doomed RTT. _mark_provider_unhealthy( - _recoverable_pool_provider(resolved_provider, client) or resolved_provider + _recoverable_pool_provider(resolved_provider, client, main_runtime=main_runtime) or resolved_provider ) elif _is_rate_limit_error(first_err): reason = "rate limit" @@ -5141,6 +5211,7 @@ async def async_call_llm( model: str = None, base_url: str = None, api_key: str = None, + main_runtime: Optional[Dict[str, Any]] = None, messages: list, temperature: float = None, max_tokens: int = None, @@ -5327,10 +5398,13 @@ async def async_call_llm( ) # ── Same-provider credential-pool recovery (mirrors sync) ───── - pool_provider = _recoverable_pool_provider(resolved_provider, client) + pool_provider = _recoverable_pool_provider(resolved_provider, client, main_runtime=main_runtime) + _client_api_key = str(getattr(client, "api_key", "") or "") if pool_provider and (_is_auth_error(first_err) or _is_payment_error(first_err) or _is_rate_limit_error(first_err)): recovery_err = first_err - if _is_rate_limit_error(first_err): + # Skip the extra retry for clear payment/quota errors — the endpoint + # won't accept another request with the same exhausted key. + if _is_rate_limit_error(first_err) and not _is_payment_error(first_err): try: return _validate_llm_response( await client.chat.completions.create(**kwargs), task) @@ -5338,26 +5412,34 @@ async def async_call_llm( if not (_is_auth_error(retry_err) or _is_payment_error(retry_err) or _is_rate_limit_error(retry_err)): raise recovery_err = retry_err - if _recover_provider_pool(pool_provider, recovery_err): + if _recover_provider_pool(pool_provider, recovery_err, failed_api_key=_client_api_key): logger.info( "Auxiliary %s (async): recovered %s via credential-pool rotation after %s", task or "call", pool_provider, type(recovery_err).__name__, ) - return await _retry_same_provider_async( - task=task, - resolved_provider=resolved_provider, - resolved_model=resolved_model, - resolved_base_url=resolved_base_url, - resolved_api_key=resolved_api_key, - resolved_api_mode=resolved_api_mode, - final_model=final_model, - messages=messages, - temperature=temperature, - max_tokens=max_tokens, - tools=tools, - effective_timeout=effective_timeout, - effective_extra_body=effective_extra_body, - ) + try: + return await _retry_same_provider_async( + task=task, + resolved_provider=resolved_provider, + resolved_model=resolved_model, + resolved_base_url=resolved_base_url, + resolved_api_key=resolved_api_key, + resolved_api_mode=resolved_api_mode, + final_model=final_model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + tools=tools, + effective_timeout=effective_timeout, + effective_extra_body=effective_extra_body, + ) + except Exception as retry2_err: + if (_is_payment_error(retry2_err) or _is_auth_error(retry2_err) + or _is_rate_limit_error(retry2_err)): + _recover_provider_pool(pool_provider, retry2_err) + first_err = retry2_err + else: + raise # ── Payment / connection / rate-limit fallback (mirrors sync call_llm) ── should_fallback = ( diff --git a/agent/credential_pool.py b/agent/credential_pool.py index 8c2bfe44229..60cd375d358 100644 --- a/agent/credential_pool.py +++ b/agent/credential_pool.py @@ -249,6 +249,16 @@ def _extract_retry_delay_seconds(message: str) -> Optional[float]: sec_match = re.search(r"retry\s+(?:after\s+)?(\d+(?:\.\d+)?)\s*(?:sec|secs|seconds|s\b)", message, re.IGNORECASE) if sec_match: return float(sec_match.group(1)) + # "Resets in 4hr 5min" format used by OpenCode Go weekly usage limits + hr_min_match = re.search(r"resets?\s+in\s+(\d+)\s*hr\s+(\d+)\s*min", message, re.IGNORECASE) + if hr_min_match: + return int(hr_min_match.group(1)) * 3600 + int(hr_min_match.group(2)) * 60 + hr_only_match = re.search(r"resets?\s+in\s+(\d+)\s*hr\b", message, re.IGNORECASE) + if hr_only_match: + return int(hr_only_match.group(1)) * 3600 + min_only_match = re.search(r"resets?\s+in\s+(\d+)\s*min\b", message, re.IGNORECASE) + if min_only_match: + return int(min_only_match.group(1)) * 60 return None @@ -1265,9 +1275,21 @@ class CredentialPool: *, status_code: Optional[int], error_context: Optional[Dict[str, Any]] = None, + api_key_hint: Optional[str] = None, ) -> Optional[PooledCredential]: with self._lock: - entry = self.current() or self._select_unlocked() + entry = None + if api_key_hint: + # Prefer the specific entry whose API key matches the one that + # actually failed. When this pool was freshly loaded from disk + # (another process already rotated), current() is None and + # _select_unlocked() would return the NEXT key — the wrong one. + entry = next( + (e for e in self._entries if e.runtime_api_key == api_key_hint), + None, + ) + if entry is None: + entry = self.current() or self._select_unlocked() if entry is None: return None _label = entry.label or entry.id[:8] diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 7c49a002cff..7e26cfb9dfc 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -4089,6 +4089,25 @@ class TestCredentialPoolRecovery: assert context["reason"] == "usage_limit_reached" assert context["message"] == "The usage limit has been reached" + def test_extract_api_error_context_parses_resets_in_hours_and_minutes(self, agent, monkeypatch): + from agent import agent_runtime_helpers + + monkeypatch.setattr(agent_runtime_helpers.time, "time", lambda: 1_000.0) + error = SimpleNamespace( + body={ + "error": { + "type": "GoUsageLimitError", + "message": "Weekly usage limit reached. Resets in 6hr 29min.", + } + }, + response=SimpleNamespace(headers={}), + ) + + context = agent._extract_api_error_context(error) + + assert context["reason"] == "GoUsageLimitError" + assert context["reset_at"] == 1_000.0 + (6 * 60 * 60) + (29 * 60) + def test_recover_with_pool_passes_error_context_on_rotated_429(self, agent): next_entry = SimpleNamespace(label="secondary") captured = {}