fix: Nous Portal rate limit guard — prevent retry amplification (#10568)

When Nous returns a 429, the retry amplification chain burns up to 9
API requests per conversation turn (3 SDK retries × 3 Hermes retries),
each counting against RPH and deepening the rate limit. With multiple
concurrent sessions (cron + gateway + auxiliary), this creates a spiral
where retries keep the limit tapped indefinitely.

New module: agent/nous_rate_guard.py
- Shared file-based rate limit state (~/.hermes/rate_limits/nous.json)
- Parses reset time from x-ratelimit-reset-requests-1h, x-ratelimit-
  reset-requests, retry-after headers, or error context
- Falls back to 5-minute default cooldown if no header data
- Atomic writes (tempfile + rename) for cross-process safety
- Auto-cleanup of expired state files

run_agent.py changes:
- Top-of-retry-loop guard: when another session already recorded Nous
  as rate-limited, skip the API call entirely. Try fallback provider
  first, then return a clear message with the reset time.
- On 429 from Nous: record rate limit state and skip further retries
  (sets retry_count = max_retries to trigger fallback path)
- On success from Nous: clear the rate limit state so other sessions
  know they can resume

auxiliary_client.py changes:
- _try_nous() checks rate guard before attempting Nous in the auxiliary
  fallback chain. When rate-limited, returns (None, None) so the chain
  skips to the next provider instead of piling more requests onto Nous.

This eliminates three sources of amplification:
1. Hermes-level retries (saves 6 of 9 calls per turn)
2. Cross-session retries (cron + gateway all skip Nous)
3. Auxiliary fallback to Nous (compression/session_search skip too)

Includes 24 tests covering the rate guard module, header parsing,
state lifecycle, and auxiliary client integration.
This commit is contained in:
Teknium 2026-04-15 16:31:48 -07:00 committed by GitHub
parent 0d05bd34f8
commit 9d9b424390
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 538 additions and 0 deletions

View file

@ -775,6 +775,21 @@ def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]:
def _try_nous(vision: bool = False) -> Tuple[Optional[OpenAI], Optional[str]]: def _try_nous(vision: bool = False) -> Tuple[Optional[OpenAI], Optional[str]]:
# Check cross-session rate limit guard before attempting Nous —
# if another session already recorded a 429, skip Nous entirely
# to avoid piling more requests onto the tapped RPH bucket.
try:
from agent.nous_rate_guard import nous_rate_limit_remaining
_remaining = nous_rate_limit_remaining()
if _remaining is not None and _remaining > 0:
logger.debug(
"Auxiliary: skipping Nous Portal (rate-limited, resets in %.0fs)",
_remaining,
)
return None, None
except Exception:
pass
nous = _read_nous_auth() nous = _read_nous_auth()
if not nous: if not nous:
return None, None return None, None

182
agent/nous_rate_guard.py Normal file
View file

@ -0,0 +1,182 @@
"""Cross-session rate limit guard for Nous Portal.
Writes rate limit state to a shared file so all sessions (CLI, gateway,
cron, auxiliary) can check whether Nous Portal is currently rate-limited
before making requests. Prevents retry amplification when RPH is tapped.
Each 429 from Nous triggers up to 9 API calls per conversation turn
(3 SDK retries x 3 Hermes retries), and every one of those calls counts
against RPH. By recording the rate limit state on first 429 and checking
it before subsequent attempts, we eliminate the amplification effect.
"""
from __future__ import annotations
import json
import logging
import os
import tempfile
import time
from typing import Any, Mapping, Optional
logger = logging.getLogger(__name__)
_STATE_SUBDIR = "rate_limits"
_STATE_FILENAME = "nous.json"
def _state_path() -> str:
"""Return the path to the Nous rate limit state file."""
try:
from hermes_constants import get_hermes_home
base = get_hermes_home()
except ImportError:
base = os.path.join(os.path.expanduser("~"), ".hermes")
return os.path.join(base, _STATE_SUBDIR, _STATE_FILENAME)
def _parse_reset_seconds(headers: Optional[Mapping[str, str]]) -> Optional[float]:
"""Extract the best available reset-time estimate from response headers.
Priority:
1. x-ratelimit-reset-requests-1h (hourly RPH window most useful)
2. x-ratelimit-reset-requests (per-minute RPM window)
3. retry-after (generic HTTP header)
Returns seconds-from-now, or None if no usable header found.
"""
if not headers:
return None
lowered = {k.lower(): v for k, v in headers.items()}
for key in (
"x-ratelimit-reset-requests-1h",
"x-ratelimit-reset-requests",
"retry-after",
):
raw = lowered.get(key)
if raw is not None:
try:
val = float(raw)
if val > 0:
return val
except (TypeError, ValueError):
pass
return None
def record_nous_rate_limit(
*,
headers: Optional[Mapping[str, str]] = None,
error_context: Optional[dict[str, Any]] = None,
default_cooldown: float = 300.0,
) -> None:
"""Record that Nous Portal is rate-limited.
Parses the reset time from response headers or error context.
Falls back to ``default_cooldown`` (5 minutes) if no reset info
is available. Writes to a shared file that all sessions can read.
Args:
headers: HTTP response headers from the 429 error.
error_context: Structured error context from _extract_api_error_context().
default_cooldown: Fallback cooldown in seconds when no header data.
"""
now = time.time()
reset_at = None
# Try headers first (most accurate)
header_seconds = _parse_reset_seconds(headers)
if header_seconds is not None:
reset_at = now + header_seconds
# Try error_context reset_at (from body parsing)
if reset_at is None and isinstance(error_context, dict):
ctx_reset = error_context.get("reset_at")
if isinstance(ctx_reset, (int, float)) and ctx_reset > now:
reset_at = float(ctx_reset)
# Default cooldown
if reset_at is None:
reset_at = now + default_cooldown
path = _state_path()
try:
state_dir = os.path.dirname(path)
os.makedirs(state_dir, exist_ok=True)
state = {
"reset_at": reset_at,
"recorded_at": now,
"reset_seconds": reset_at - now,
}
# Atomic write: write to temp file + rename
fd, tmp_path = tempfile.mkstemp(dir=state_dir, suffix=".tmp")
try:
with os.fdopen(fd, "w") as f:
json.dump(state, f)
os.replace(tmp_path, path)
except Exception:
# Clean up temp file on failure
try:
os.unlink(tmp_path)
except OSError:
pass
raise
logger.info(
"Nous rate limit recorded: resets in %.0fs (at %.0f)",
reset_at - now, reset_at,
)
except Exception as exc:
logger.debug("Failed to write Nous rate limit state: %s", exc)
def nous_rate_limit_remaining() -> Optional[float]:
"""Check if Nous Portal is currently rate-limited.
Returns:
Seconds remaining until reset, or None if not rate-limited.
"""
path = _state_path()
try:
with open(path) as f:
state = json.load(f)
reset_at = state.get("reset_at", 0)
remaining = reset_at - time.time()
if remaining > 0:
return remaining
# Expired — clean up
try:
os.unlink(path)
except OSError:
pass
return None
except (FileNotFoundError, json.JSONDecodeError, KeyError, TypeError):
return None
def clear_nous_rate_limit() -> None:
"""Clear the rate limit state (e.g., after a successful Nous request)."""
try:
os.unlink(_state_path())
except FileNotFoundError:
pass
except OSError as exc:
logger.debug("Failed to clear Nous rate limit state: %s", exc)
def format_remaining(seconds: float) -> str:
"""Format seconds remaining into human-readable duration."""
s = max(0, int(seconds))
if s < 60:
return f"{s}s"
if s < 3600:
m, sec = divmod(s, 60)
return f"{m}m {sec}s" if sec else f"{m}m"
h, remainder = divmod(s, 3600)
m = remainder // 60
return f"{h}h {m}m" if m else f"{h}h"

View file

@ -8660,6 +8660,53 @@ class AIAgent:
api_kwargs = None # Guard against UnboundLocalError in except handler api_kwargs = None # Guard against UnboundLocalError in except handler
while retry_count < max_retries: while retry_count < max_retries:
# ── Nous Portal rate limit guard ──────────────────────
# If another session already recorded that Nous is rate-
# limited, skip the API call entirely. Each attempt
# (including SDK-level retries) counts against RPH and
# deepens the rate limit hole.
if self.provider == "nous":
try:
from agent.nous_rate_guard import (
nous_rate_limit_remaining,
format_remaining as _fmt_nous_remaining,
)
_nous_remaining = nous_rate_limit_remaining()
if _nous_remaining is not None and _nous_remaining > 0:
_nous_msg = (
f"Nous Portal rate limit active — "
f"resets in {_fmt_nous_remaining(_nous_remaining)}."
)
self._vprint(
f"{self.log_prefix}{_nous_msg} Trying fallback...",
force=True,
)
self._emit_status(f"{_nous_msg}")
if self._try_activate_fallback():
retry_count = 0
compression_attempts = 0
primary_recovery_attempted = False
continue
# No fallback available — return with clear message
self._persist_session(messages, conversation_history)
return {
"final_response": (
f"{_nous_msg}\n\n"
"No fallback provider available. "
"Try again after the reset, or add a "
"fallback provider in config.yaml."
),
"messages": messages,
"api_calls": api_call_count,
"completed": False,
"failed": True,
"error": _nous_msg,
}
except ImportError:
pass
except Exception:
pass # Never let rate guard break the agent loop
try: try:
self._reset_stream_delivery_tracking() self._reset_stream_delivery_tracking()
api_kwargs = self._build_api_kwargs(api_messages) api_kwargs = self._build_api_kwargs(api_messages)
@ -9248,6 +9295,15 @@ class AIAgent:
self._vprint(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)") self._vprint(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)")
has_retried_429 = False # Reset on success has_retried_429 = False # Reset on success
# Clear Nous rate limit state on successful request —
# proves the limit has reset and other sessions can
# resume hitting Nous.
if self.provider == "nous":
try:
from agent.nous_rate_guard import clear_nous_rate_limit
clear_nous_rate_limit()
except Exception:
pass
self._touch_activity(f"API call #{api_call_count} completed") self._touch_activity(f"API call #{api_call_count} completed")
break # Success, exit retry loop break # Success, exit retry loop
@ -9659,6 +9715,38 @@ class AIAgent:
primary_recovery_attempted = False primary_recovery_attempted = False
continue continue
# ── Nous Portal: record rate limit & skip retries ─────
# When Nous returns a 429, record the reset time to a
# shared file so ALL sessions (cron, gateway, auxiliary)
# know not to pile on. Then skip further retries —
# each one burns another RPH request and deepens the
# rate limit hole. The retry loop's top-of-iteration
# guard will catch this on the next pass and try
# fallback or bail with a clear message.
if (
is_rate_limited
and self.provider == "nous"
and classified.reason == FailoverReason.rate_limit
and not recovered_with_pool
):
try:
from agent.nous_rate_guard import record_nous_rate_limit
_err_resp = getattr(api_error, "response", None)
_err_hdrs = (
getattr(_err_resp, "headers", None)
if _err_resp else None
)
record_nous_rate_limit(
headers=_err_hdrs,
error_context=error_context,
)
except Exception:
pass
# Skip straight to max_retries — the top-of-loop
# guard will handle fallback or bail cleanly.
retry_count = max_retries
continue
is_payload_too_large = ( is_payload_too_large = (
classified.reason == FailoverReason.payload_too_large classified.reason == FailoverReason.payload_too_large
) )

View file

@ -0,0 +1,253 @@
"""Tests for agent/nous_rate_guard.py — cross-session Nous Portal rate limit guard."""
import json
import os
import time
import pytest
@pytest.fixture
def rate_guard_env(tmp_path, monkeypatch):
"""Isolate rate guard state to a temp directory."""
hermes_home = str(tmp_path / ".hermes")
os.makedirs(hermes_home, exist_ok=True)
monkeypatch.setenv("HERMES_HOME", hermes_home)
# Clear any cached module-level imports
return hermes_home
class TestRecordNousRateLimit:
"""Test recording rate limit state."""
def test_records_with_header_reset(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
headers = {"x-ratelimit-reset-requests-1h": "1800"}
record_nous_rate_limit(headers=headers)
path = _state_path()
assert os.path.exists(path)
with open(path) as f:
state = json.load(f)
assert state["reset_seconds"] == pytest.approx(1800, abs=2)
assert state["reset_at"] > time.time()
def test_records_with_per_minute_header(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
headers = {"x-ratelimit-reset-requests": "45"}
record_nous_rate_limit(headers=headers)
with open(_state_path()) as f:
state = json.load(f)
assert state["reset_seconds"] == pytest.approx(45, abs=2)
def test_records_with_retry_after_header(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
headers = {"retry-after": "60"}
record_nous_rate_limit(headers=headers)
with open(_state_path()) as f:
state = json.load(f)
assert state["reset_seconds"] == pytest.approx(60, abs=2)
def test_prefers_hourly_over_per_minute(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
headers = {
"x-ratelimit-reset-requests-1h": "1800",
"x-ratelimit-reset-requests": "45",
}
record_nous_rate_limit(headers=headers)
with open(_state_path()) as f:
state = json.load(f)
# Should use the hourly value, not the per-minute one
assert state["reset_seconds"] == pytest.approx(1800, abs=2)
def test_falls_back_to_error_context_reset_at(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
future_reset = time.time() + 900
record_nous_rate_limit(
headers=None,
error_context={"reset_at": future_reset},
)
with open(_state_path()) as f:
state = json.load(f)
assert state["reset_at"] == pytest.approx(future_reset, abs=1)
def test_falls_back_to_default_cooldown(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
record_nous_rate_limit(headers=None)
with open(_state_path()) as f:
state = json.load(f)
# Default is 300 seconds (5 minutes)
assert state["reset_seconds"] == pytest.approx(300, abs=2)
def test_custom_default_cooldown(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
record_nous_rate_limit(headers=None, default_cooldown=120.0)
with open(_state_path()) as f:
state = json.load(f)
assert state["reset_seconds"] == pytest.approx(120, abs=2)
def test_creates_directory_if_missing(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, _state_path
record_nous_rate_limit(headers={"retry-after": "10"})
assert os.path.exists(_state_path())
class TestNousRateLimitRemaining:
"""Test checking remaining rate limit time."""
def test_returns_none_when_no_file(self, rate_guard_env):
from agent.nous_rate_guard import nous_rate_limit_remaining
assert nous_rate_limit_remaining() is None
def test_returns_remaining_seconds_when_active(self, rate_guard_env):
from agent.nous_rate_guard import record_nous_rate_limit, nous_rate_limit_remaining
record_nous_rate_limit(headers={"x-ratelimit-reset-requests-1h": "600"})
remaining = nous_rate_limit_remaining()
assert remaining is not None
assert 595 < remaining <= 605 # ~600 seconds, allowing for test execution time
def test_returns_none_when_expired(self, rate_guard_env):
from agent.nous_rate_guard import nous_rate_limit_remaining, _state_path
# Write an already-expired state
state_dir = os.path.dirname(_state_path())
os.makedirs(state_dir, exist_ok=True)
with open(_state_path(), "w") as f:
json.dump({"reset_at": time.time() - 10, "recorded_at": time.time() - 100}, f)
assert nous_rate_limit_remaining() is None
# File should be cleaned up
assert not os.path.exists(_state_path())
def test_handles_corrupt_file(self, rate_guard_env):
from agent.nous_rate_guard import nous_rate_limit_remaining, _state_path
state_dir = os.path.dirname(_state_path())
os.makedirs(state_dir, exist_ok=True)
with open(_state_path(), "w") as f:
f.write("not valid json{{{")
assert nous_rate_limit_remaining() is None
class TestClearNousRateLimit:
"""Test clearing rate limit state."""
def test_clears_existing_file(self, rate_guard_env):
from agent.nous_rate_guard import (
record_nous_rate_limit,
clear_nous_rate_limit,
nous_rate_limit_remaining,
_state_path,
)
record_nous_rate_limit(headers={"retry-after": "600"})
assert nous_rate_limit_remaining() is not None
clear_nous_rate_limit()
assert nous_rate_limit_remaining() is None
assert not os.path.exists(_state_path())
def test_clear_when_no_file(self, rate_guard_env):
from agent.nous_rate_guard import clear_nous_rate_limit
# Should not raise
clear_nous_rate_limit()
class TestFormatRemaining:
"""Test human-readable duration formatting."""
def test_seconds(self):
from agent.nous_rate_guard import format_remaining
assert format_remaining(30) == "30s"
def test_minutes(self):
from agent.nous_rate_guard import format_remaining
assert format_remaining(125) == "2m 5s"
def test_exact_minutes(self):
from agent.nous_rate_guard import format_remaining
assert format_remaining(120) == "2m"
def test_hours(self):
from agent.nous_rate_guard import format_remaining
assert format_remaining(3720) == "1h 2m"
class TestParseResetSeconds:
"""Test header parsing for reset times."""
def test_case_insensitive_headers(self, rate_guard_env):
from agent.nous_rate_guard import _parse_reset_seconds
headers = {"X-Ratelimit-Reset-Requests-1h": "1200"}
assert _parse_reset_seconds(headers) == 1200.0
def test_returns_none_for_empty_headers(self):
from agent.nous_rate_guard import _parse_reset_seconds
assert _parse_reset_seconds(None) is None
assert _parse_reset_seconds({}) is None
def test_ignores_zero_values(self):
from agent.nous_rate_guard import _parse_reset_seconds
headers = {"x-ratelimit-reset-requests-1h": "0"}
assert _parse_reset_seconds(headers) is None
def test_ignores_invalid_values(self):
from agent.nous_rate_guard import _parse_reset_seconds
headers = {"x-ratelimit-reset-requests-1h": "not-a-number"}
assert _parse_reset_seconds(headers) is None
class TestAuxiliaryClientIntegration:
"""Test that the auxiliary client respects the rate guard."""
def test_try_nous_skips_when_rate_limited(self, rate_guard_env, monkeypatch):
from agent.nous_rate_guard import record_nous_rate_limit
# Record a rate limit
record_nous_rate_limit(headers={"retry-after": "600"})
# Mock _read_nous_auth to return valid creds (would normally succeed)
import agent.auxiliary_client as aux
monkeypatch.setattr(aux, "_read_nous_auth", lambda: {
"access_token": "test-token",
"inference_base_url": "https://api.nous.test/v1",
})
result = aux._try_nous()
assert result == (None, None)
def test_try_nous_works_when_not_rate_limited(self, rate_guard_env, monkeypatch):
import agent.auxiliary_client as aux
# No rate limit recorded — _try_nous should proceed normally
# (will return None because no real creds, but won't be blocked
# by the rate guard)
monkeypatch.setattr(aux, "_read_nous_auth", lambda: None)
result = aux._try_nous()
assert result == (None, None)