hermes-agent/tests/agent/test_credential_pool_routing.py
Brian D. Evans 24461ec0bc fix(error-classifier): route 429 'overloaded' messages to FailoverReason.overloaded (#15297)
Some providers — notably Z.AI / Zhipu — return HTTP 429 with messages
like "The service may be temporarily overloaded, please try again later"
to signal **server-wide overload**, not per-credential rate limiting.
The two conditions need different recovery strategies:

| Reason | Correct strategy |
|---|---|
| ``rate_limit`` | Retry same credential once (the limit may have reset), then rotate |
| ``overloaded`` | Skip retry, rotate immediately (the whole endpoint is saturated) |

Before this fix:

- ``error_classifier.py`` mapped every 429 to ``FailoverReason.rate_limit``
  regardless of message body.
- ``FailoverReason.overloaded`` already existed as an enum value (and was
  produced for 503/529) but no production path emitted it for 429.
- ``_recover_with_credential_pool`` had no handler for ``overloaded`` —
  an ``overloaded`` classification fell through to the default no-op
  ``return False, has_retried_429`` line.

Net effect: every overload-language 429 burned a ``has_retried_429``
slot on the same saturated credential before the rotation happened, and
cron jobs (one turn each) often used their entire execution on that
wasted retry.

### Fix

Two narrow, additive changes:

1. ``agent/error_classifier.py`` — new ``_OVERLOADED_PATTERNS`` list
   containing provider-language overload phrases (``"temporarily
   overloaded"``, ``"server is overloaded"``, ``"at capacity"``, …).
   The 429 branch now checks the error body against this list and
   emits ``FailoverReason.overloaded`` when matched, preserving
   ``rate_limit`` for everything else.  Kept phrases narrow and
   provider-language-flavoured so a normal rate-limit message
   ("you have been rate-limited") doesn't hit this bucket.

2. ``run_agent.py::_recover_with_credential_pool`` — new branch for
   ``effective_reason == FailoverReason.overloaded``.  Same shape as
   the existing ``billing`` handler: rotate immediately via
   ``mark_exhausted_and_rotate(...)``, no retry-on-same-credential
   first.  The 503/529 ``overloaded`` classifications produced by the
   existing code now also flow through this branch.

### Tests (15 new, all passing on py3.11 venv)

``tests/agent/test_error_classifier.py`` — 7 cases:

- The exact #15297 Z.AI message → ``overloaded``
- 9-phrase parametrised matrix (server/service/upstream overloaded,
  at/over capacity, "currently overloaded", …) → all ``overloaded``
- Plain "Rate limit reached for requests per minute" 429 → still
  ``rate_limit`` (regression guard against the disambiguation
  silently broadening)
- Plain "Too Many Requests" 429 → still ``rate_limit``
- 503 path unchanged → still ``overloaded`` (negative control)

``tests/agent/test_credential_pool_routing.py`` —
``TestPoolOverloadedRotation`` (4 cases):

- Overloaded on first failure → rotates immediately (the fix)
- Overloaded with ``has_retried_429=True`` → still rotates (no retry-
  flag dependence — the rate-limit gate is specific to ``rate_limit``)
- Single-entry pool overload → ``recovered=False`` so outer fallback
  takes over rather than spinning
- ``rate_limit`` on first failure → still uses retry-first path
  (regression guard against the new branch broadening)

**Verified regression guards**: temporarily reverted the classifier's
overload branch; 10 of the 11 new classifier tests correctly failed
with clear assertion messages.  Restored fix → all 145 tests pass
(``test_error_classifier.py`` + ``test_credential_pool_routing.py``,
existing + new).

Closes #15297

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-24 16:39:48 -07:00

365 lines
15 KiB
Python

"""Tests for credential pool preservation through turn config and 429 recovery.
Covers:
1. CLI _resolve_turn_agent_config passes credential_pool to runtime dict
2. Gateway _resolve_turn_agent_config passes credential_pool to runtime dict
3. Eager fallback deferred when credential pool has credentials
4. Eager fallback fires when no credential pool exists
5. Full 429 rotation cycle: retry-same → rotate → exhaust → fallback
"""
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
# ---------------------------------------------------------------------------
# 1. CLI _resolve_turn_agent_config includes credential_pool
# ---------------------------------------------------------------------------
class TestCliTurnRoutePool:
def test_resolve_turn_includes_pool(self):
"""CLI's _resolve_turn_agent_config must pass credential_pool in runtime."""
fake_pool = MagicMock(name="FakePool")
shell = SimpleNamespace(
model="gpt-5.4",
api_key="sk-test",
base_url=None,
provider="openai-codex",
api_mode="codex_responses",
acp_command=None,
acp_args=[],
_credential_pool=fake_pool,
service_tier=None,
)
from cli import HermesCLI
bound = HermesCLI._resolve_turn_agent_config.__get__(shell)
route = bound("test message")
assert route["runtime"]["credential_pool"] is fake_pool
# ---------------------------------------------------------------------------
# 2. Gateway _resolve_turn_agent_config includes credential_pool
# ---------------------------------------------------------------------------
class TestGatewayTurnRoutePool:
def test_resolve_turn_includes_pool(self):
"""Gateway's _resolve_turn_agent_config must pass credential_pool."""
from gateway.run import GatewayRunner
fake_pool = MagicMock(name="FakePool")
runner = SimpleNamespace(_service_tier=None)
runtime_kwargs = {
"api_key": "***",
"base_url": None,
"provider": "openai-codex",
"api_mode": "codex_responses",
"command": None,
"args": [],
"credential_pool": fake_pool,
}
bound = GatewayRunner._resolve_turn_agent_config.__get__(runner)
route = bound("test message", "gpt-5.4", runtime_kwargs)
assert route["runtime"]["credential_pool"] is fake_pool
# ---------------------------------------------------------------------------
# 3 & 4. Eager fallback deferred/fires based on credential pool
# ---------------------------------------------------------------------------
class TestEagerFallbackWithPool:
"""Test the eager fallback guard in run_agent.py's error handling loop."""
def _make_agent(self, has_pool=True, pool_has_creds=True, has_fallback=True):
"""Create a minimal AIAgent mock with the fields needed."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
if has_pool:
pool = MagicMock()
pool.has_available.return_value = pool_has_creds
agent._credential_pool = pool
agent._fallback_chain = [{"model": "fallback/model"}] if has_fallback else []
agent._fallback_index = 0
agent._try_activate_fallback = MagicMock(return_value=True)
agent._emit_status = MagicMock()
return agent
def test_eager_fallback_deferred_when_pool_has_credentials(self):
"""429 with active pool should NOT trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=True, has_fallback=True)
# Simulate the check from run_agent.py lines 7180-7191
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_not_called()
def test_eager_fallback_fires_when_no_pool(self):
"""429 without pool should trigger eager fallback."""
agent = self._make_agent(has_pool=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
def test_eager_fallback_fires_when_pool_exhausted(self):
"""429 with exhausted pool should trigger eager fallback."""
agent = self._make_agent(has_pool=True, pool_has_creds=False, has_fallback=True)
is_rate_limited = True
if is_rate_limited and agent._fallback_index < len(agent._fallback_chain):
pool = agent._credential_pool
pool_may_recover = pool is not None and pool.has_available()
if not pool_may_recover:
agent._try_activate_fallback()
agent._try_activate_fallback.assert_called_once()
# ---------------------------------------------------------------------------
# 5. Full 429 rotation cycle via _recover_with_credential_pool
# ---------------------------------------------------------------------------
class TestPoolRotationCycle:
"""Verify the retry-same → rotate → exhaust flow in _recover_with_credential_pool."""
def _make_agent_with_pool(self, pool_entries=3):
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
entries = []
for i in range(pool_entries):
e = MagicMock(name=f"entry_{i}")
e.id = f"cred-{i}"
entries.append(e)
pool = MagicMock()
pool.has_credentials.return_value = True
# mark_exhausted_and_rotate returns next entry until exhausted
self._rotation_index = 0
def rotate(status_code=None, error_context=None):
self._rotation_index += 1
if self._rotation_index < pool_entries:
return entries[self._rotation_index]
pool.has_credentials.return_value = False
return None
pool.mark_exhausted_and_rotate = MagicMock(side_effect=rotate)
agent._credential_pool = pool
agent._swap_credential = MagicMock()
agent.log_prefix = ""
return agent, pool, entries
def test_first_429_sets_retry_flag_no_rotation(self):
"""First 429 should just set has_retried_429=True, no rotation."""
agent, pool, _ = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is True
pool.mark_exhausted_and_rotate.assert_not_called()
def test_second_429_rotates_to_next(self):
"""Second consecutive 429 should rotate to next credential."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is True
assert has_retried is False # reset after rotation
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=429, error_context=None)
agent._swap_credential.assert_called_once_with(entries[1])
def test_pool_exhaustion_returns_false(self):
"""When all credentials exhausted, recovery should return False."""
agent, pool, _ = self._make_agent_with_pool(1)
# First 429 sets flag
_, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert has_retried is True
# Second 429 tries to rotate but pool is exhausted (only 1 entry)
recovered, _ = agent._recover_with_credential_pool(
status_code=429, has_retried_429=True
)
assert recovered is False
def test_402_immediate_rotation(self):
"""402 (billing) should immediately rotate, no retry-first."""
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=402, has_retried_429=False
)
assert recovered is True
assert has_retried is False
pool.mark_exhausted_and_rotate.assert_called_once_with(status_code=402, error_context=None)
def test_no_pool_returns_false(self):
"""No pool should return (False, unchanged)."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
agent._credential_pool = None
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429, has_retried_429=False
)
assert recovered is False
assert has_retried is False
# ---------------------------------------------------------------------------
# 6. FailoverReason.overloaded handler (#15297)
#
# When the classifier produces ``overloaded`` (e.g. Z.AI 429 "service is
# temporarily overloaded"), the recovery path must rotate immediately —
# the whole endpoint is down, retrying the same credential burns a slot
# and delays recovery. Same shape as ``billing``, distinct from
# ``rate_limit`` which retries-first.
# ---------------------------------------------------------------------------
class TestPoolOverloadedRotation:
"""Verify ``FailoverReason.overloaded`` rotates immediately, no retry-first."""
def _make_agent_with_pool(self, pool_entries=3):
"""Same factory shape as TestPoolRotationCycle — keep them parallel
so a future refactor of the agent's pool seam updates both
classes uniformly."""
from run_agent import AIAgent
with patch.object(AIAgent, "__init__", lambda self, **kw: None):
agent = AIAgent()
entries = []
for i in range(pool_entries):
e = MagicMock(name=f"entry_{i}")
e.id = f"cred-{i}"
entries.append(e)
pool = MagicMock()
pool.has_credentials.return_value = True
self._rotation_index = 0
def rotate(status_code=None, error_context=None):
self._rotation_index += 1
if self._rotation_index < pool_entries:
return entries[self._rotation_index]
pool.has_credentials.return_value = False
return None
pool.mark_exhausted_and_rotate = MagicMock(side_effect=rotate)
agent._credential_pool = pool
agent._swap_credential = MagicMock()
agent.log_prefix = ""
return agent, pool, entries
def test_overloaded_rotates_immediately_on_first_failure(self):
"""The #15297 fix: ``overloaded`` must NOT use the rate-limit
retry-first path. First failure rotates straight away because
retrying the same credential against a saturated provider just
burns the next ``has_retried_429`` slot too."""
from agent.error_classifier import FailoverReason
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=False, # first failure
classified_reason=FailoverReason.overloaded,
)
assert recovered is True, (
"overloaded must rotate on first failure, like billing — "
"not wait for has_retried_429 like rate_limit does"
)
assert has_retried is False
pool.mark_exhausted_and_rotate.assert_called_once_with(
status_code=429, error_context=None,
)
agent._swap_credential.assert_called_once_with(entries[1])
def test_overloaded_does_not_block_on_has_retried_flag(self):
"""The retry-first gate is specific to rate_limit. An
``overloaded`` reason must rotate regardless of whether the
retry flag is currently set or unset — same behaviour both
ways, no state leak from a previous rate-limit cycle."""
from agent.error_classifier import FailoverReason
agent, pool, entries = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=True, # already retried — should still rotate
classified_reason=FailoverReason.overloaded,
)
assert recovered is True
assert has_retried is False # reset after rotation
pool.mark_exhausted_and_rotate.assert_called_once()
def test_overloaded_pool_exhaustion_returns_false(self):
"""When the pool only has one credential and it overloads, the
rotation step returns no next entry — the recovery call must
report ``recovered=False`` so the caller falls through to its
fallback chain rather than spinning."""
from agent.error_classifier import FailoverReason
agent, _pool, _ = self._make_agent_with_pool(1)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=False,
classified_reason=FailoverReason.overloaded,
)
assert recovered is False, (
"single-entry pool that overloads must return recovered=False "
"so the outer fallback chain takes over"
)
# has_retried_429 is propagated unchanged in the no-rotation
# branch — pinned so a future tweak to the contract surfaces
# via this test.
assert has_retried is False
def test_rate_limit_still_uses_retry_first(self):
"""Negative control: ``rate_limit`` must KEEP its retry-first
semantics. A regression that broadened the overloaded handler
to also catch rate_limit would silently double-rotate every
per-key throttle and burn the pool fast."""
from agent.error_classifier import FailoverReason
agent, pool, _ = self._make_agent_with_pool(3)
recovered, has_retried = agent._recover_with_credential_pool(
status_code=429,
has_retried_429=False,
classified_reason=FailoverReason.rate_limit,
)
assert recovered is False, (
"rate_limit on first failure must NOT rotate — the retry-"
"first path is the contract"
)
assert has_retried is True
pool.mark_exhausted_and_rotate.assert_not_called()