fix(auth): preserve Codex pool-only rate-limit state

Classify exhausted pool-only openai-codex credentials as quota/rate-limited instead of missing auth. This prevents auth status and runtime credential resolution from reporting missing credentials when a valid manual:device_code pool credential exists but is temporarily in a 429 usage-limit cooldown.

Adds regression coverage for pool-only Codex auth status and runtime resolution.
This commit is contained in:
MrDiamondBallz 2026-06-15 11:10:55 -07:00 committed by Teknium
parent 6373aba80f
commit 9a59ad73dd
2 changed files with 179 additions and 0 deletions

View file

@ -3806,6 +3806,26 @@ def resolve_codex_runtime_credentials(
"last_refresh": None,
"auth_mode": "chatgpt",
}
pool_rate_limit = _codex_pool_rate_limit_status()
if pool_rate_limit:
reset_at = pool_rate_limit.get("reset_at")
if isinstance(reset_at, (int, float)) and reset_at > time.time():
remaining = int(reset_at - time.time())
message = (
f"Codex provider quota exhausted (429); retry after {remaining}s. "
"Credentials are still valid."
)
else:
message = (
"Codex provider quota exhausted (429). Credentials are still valid; "
"retry after the usage limit resets."
)
raise AuthError(
message,
provider="openai-codex",
code=CODEX_RATE_LIMITED_CODE,
relogin_required=False,
)
if read_error is not None:
raise read_error
raise AuthError(
@ -3852,6 +3872,79 @@ def resolve_codex_runtime_credentials(
}
def _codex_pool_rate_limit_status() -> Optional[Dict[str, Any]]:
"""Return metadata for a pool-only Codex credential in quota cooldown."""
def _parse_reset_at(value: Any) -> Optional[float]:
if value is None or value == "":
return None
if isinstance(value, (int, float)):
numeric = float(value)
if numeric <= 0:
return None
return numeric / 1000.0 if numeric > 1_000_000_000_000 else numeric
if isinstance(value, str):
raw = value.strip()
if not raw:
return None
try:
numeric = float(raw)
except ValueError:
numeric = None
if numeric is not None:
return numeric / 1000.0 if numeric > 1_000_000_000_000 else numeric
try:
return datetime.fromisoformat(raw.replace("Z", "+00:00")).timestamp()
except ValueError:
return None
return None
try:
with _auth_store_lock():
auth_store = _load_auth_store()
pool = auth_store.get("credential_pool")
if not isinstance(pool, dict):
return None
entries = pool.get("openai-codex")
if not isinstance(entries, list):
return None
now = time.time()
for entry in entries:
if not isinstance(entry, dict):
continue
token = entry.get("access_token")
if not isinstance(token, str) or not token.strip():
continue
if entry.get("last_status") != "exhausted":
continue
code = entry.get("last_error_code")
reason = str(entry.get("last_error_reason") or "").lower()
message = str(entry.get("last_error_message") or "").lower()
is_rate_limited = (
code == 429
or "rate_limit" in reason
or "usage_limit" in reason
or "quota" in reason
or "rate limit" in message
or "usage limit" in message
or "quota" in message
)
if not is_rate_limited:
continue
reset_at = _parse_reset_at(entry.get("last_error_reset_at"))
if reset_at is not None and reset_at <= now:
continue
return {
"label": entry.get("label"),
"last_refresh": entry.get("last_refresh"),
"reset_at": reset_at,
"reason": entry.get("last_error_reason"),
"message": entry.get("last_error_message"),
}
except Exception:
logger.debug("Codex pool rate-limit lookup failed", exc_info=True)
return None
def _pool_codex_access_token() -> str:
"""Return the most-recent usable access_token from the openai-codex pool.
@ -5907,6 +6000,22 @@ def get_codex_auth_status() -> Dict[str, Any]:
"source": f"pool:{getattr(entry, 'label', 'unknown')}",
"api_key": api_key,
}
rate_limit = _codex_pool_rate_limit_status()
if rate_limit:
return {
"logged_in": True,
"auth_store": str(_auth_file_path()),
"last_refresh": rate_limit.get("last_refresh"),
"auth_mode": "chatgpt",
"source": f"pool:{rate_limit.get('label') or 'unknown'}",
"rate_limited": True,
"error_code": CODEX_RATE_LIMITED_CODE,
"error": (
rate_limit.get("message")
or "Codex provider quota exhausted; retry after the usage limit resets."
),
"reset_at": rate_limit.get("reset_at"),
}
except Exception:
pass

View file

@ -4,6 +4,7 @@ from __future__ import annotations
import base64
import json
import time
from datetime import datetime, timezone
from unittest.mock import patch
@ -25,6 +26,37 @@ def _jwt_with_email(email: str) -> str:
return f"{header}.{payload}.signature"
def _codex_pool_only_store(*, exhausted: bool = False) -> dict:
entry = {
"id": "codex-1",
"label": "codex@example.com",
"auth_type": "oauth",
"priority": 0,
"source": "manual:device_code",
"access_token": _jwt_with_email("codex@example.com"),
"refresh_token": "refresh-token",
"base_url": "https://chatgpt.com/backend-api/codex",
"last_refresh": "2026-06-15T10:00:00Z",
}
if exhausted:
entry.update(
{
"last_status": "exhausted",
"last_status_at": time.time(),
"last_error_code": 429,
"last_error_reason": "usage_limit_reached",
"last_error_message": "The usage limit has been reached",
"last_error_reset_at": time.time() + 3600,
}
)
return {
"version": 1,
"active_provider": "openai-codex",
"providers": {},
"credential_pool": {"openai-codex": [entry]},
}
@pytest.fixture(autouse=True)
def _clear_provider_env(monkeypatch):
for key in (
@ -483,6 +515,44 @@ def test_auth_add_codex_oauth_keeps_distinct_pool_accounts(tmp_path, monkeypatch
assert payload["active_provider"] == "openai-codex"
def test_codex_auth_status_reports_pool_only_credential(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, _codex_pool_only_store())
from hermes_cli.auth import get_codex_auth_status
status = get_codex_auth_status()
assert status["logged_in"] is True
assert status["source"] == "pool:codex@example.com"
def test_codex_auth_status_reports_pool_only_rate_limit(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, _codex_pool_only_store(exhausted=True))
from hermes_cli.auth import get_codex_auth_status
status = get_codex_auth_status()
assert status["logged_in"] is True
assert status["rate_limited"] is True
assert status["error_code"] == "codex_rate_limited"
def test_codex_runtime_pool_only_rate_limit_is_not_missing_auth(tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path / "hermes"))
_write_auth_store(tmp_path, _codex_pool_only_store(exhausted=True))
from hermes_cli.auth import AuthError, CODEX_RATE_LIMITED_CODE, resolve_codex_runtime_credentials
with pytest.raises(AuthError) as exc_info:
resolve_codex_runtime_credentials()
assert exc_info.value.code == CODEX_RATE_LIMITED_CODE
assert exc_info.value.relogin_required is False
def test_auth_add_xai_oauth_sets_active_provider(tmp_path, monkeypatch):
"""hermes auth add xai-oauth must write providers singleton and set active_provider.