diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index fd2f2d812..9702da941 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -775,6 +775,21 @@ def _try_openrouter() -> Tuple[Optional[OpenAI], Optional[str]]: def _try_nous(vision: bool = False) -> Tuple[Optional[OpenAI], Optional[str]]: + # Check cross-session rate limit guard before attempting Nous — + # if another session already recorded a 429, skip Nous entirely + # to avoid piling more requests onto the tapped RPH bucket. + try: + from agent.nous_rate_guard import nous_rate_limit_remaining + _remaining = nous_rate_limit_remaining() + if _remaining is not None and _remaining > 0: + logger.debug( + "Auxiliary: skipping Nous Portal (rate-limited, resets in %.0fs)", + _remaining, + ) + return None, None + except Exception: + pass + nous = _read_nous_auth() if not nous: return None, None diff --git a/agent/nous_rate_guard.py b/agent/nous_rate_guard.py new file mode 100644 index 000000000..712d8a0f1 --- /dev/null +++ b/agent/nous_rate_guard.py @@ -0,0 +1,182 @@ +"""Cross-session rate limit guard for Nous Portal. + +Writes rate limit state to a shared file so all sessions (CLI, gateway, +cron, auxiliary) can check whether Nous Portal is currently rate-limited +before making requests. Prevents retry amplification when RPH is tapped. + +Each 429 from Nous triggers up to 9 API calls per conversation turn +(3 SDK retries x 3 Hermes retries), and every one of those calls counts +against RPH. By recording the rate limit state on first 429 and checking +it before subsequent attempts, we eliminate the amplification effect. +""" + +from __future__ import annotations + +import json +import logging +import os +import tempfile +import time +from typing import Any, Mapping, Optional + +logger = logging.getLogger(__name__) + +_STATE_SUBDIR = "rate_limits" +_STATE_FILENAME = "nous.json" + + +def _state_path() -> str: + """Return the path to the Nous rate limit state file.""" + try: + from hermes_constants import get_hermes_home + base = get_hermes_home() + except ImportError: + base = os.path.join(os.path.expanduser("~"), ".hermes") + return os.path.join(base, _STATE_SUBDIR, _STATE_FILENAME) + + +def _parse_reset_seconds(headers: Optional[Mapping[str, str]]) -> Optional[float]: + """Extract the best available reset-time estimate from response headers. + + Priority: + 1. x-ratelimit-reset-requests-1h (hourly RPH window — most useful) + 2. x-ratelimit-reset-requests (per-minute RPM window) + 3. retry-after (generic HTTP header) + + Returns seconds-from-now, or None if no usable header found. + """ + if not headers: + return None + + lowered = {k.lower(): v for k, v in headers.items()} + + for key in ( + "x-ratelimit-reset-requests-1h", + "x-ratelimit-reset-requests", + "retry-after", + ): + raw = lowered.get(key) + if raw is not None: + try: + val = float(raw) + if val > 0: + return val + except (TypeError, ValueError): + pass + + return None + + +def record_nous_rate_limit( + *, + headers: Optional[Mapping[str, str]] = None, + error_context: Optional[dict[str, Any]] = None, + default_cooldown: float = 300.0, +) -> None: + """Record that Nous Portal is rate-limited. + + Parses the reset time from response headers or error context. + Falls back to ``default_cooldown`` (5 minutes) if no reset info + is available. Writes to a shared file that all sessions can read. + + Args: + headers: HTTP response headers from the 429 error. + error_context: Structured error context from _extract_api_error_context(). + default_cooldown: Fallback cooldown in seconds when no header data. + """ + now = time.time() + reset_at = None + + # Try headers first (most accurate) + header_seconds = _parse_reset_seconds(headers) + if header_seconds is not None: + reset_at = now + header_seconds + + # Try error_context reset_at (from body parsing) + if reset_at is None and isinstance(error_context, dict): + ctx_reset = error_context.get("reset_at") + if isinstance(ctx_reset, (int, float)) and ctx_reset > now: + reset_at = float(ctx_reset) + + # Default cooldown + if reset_at is None: + reset_at = now + default_cooldown + + path = _state_path() + try: + state_dir = os.path.dirname(path) + os.makedirs(state_dir, exist_ok=True) + + state = { + "reset_at": reset_at, + "recorded_at": now, + "reset_seconds": reset_at - now, + } + + # Atomic write: write to temp file + rename + fd, tmp_path = tempfile.mkstemp(dir=state_dir, suffix=".tmp") + try: + with os.fdopen(fd, "w") as f: + json.dump(state, f) + os.replace(tmp_path, path) + except Exception: + # Clean up temp file on failure + try: + os.unlink(tmp_path) + except OSError: + pass + raise + + logger.info( + "Nous rate limit recorded: resets in %.0fs (at %.0f)", + reset_at - now, reset_at, + ) + except Exception as exc: + logger.debug("Failed to write Nous rate limit state: %s", exc) + + +def nous_rate_limit_remaining() -> Optional[float]: + """Check if Nous Portal is currently rate-limited. + + Returns: + Seconds remaining until reset, or None if not rate-limited. + """ + path = _state_path() + try: + with open(path) as f: + state = json.load(f) + reset_at = state.get("reset_at", 0) + remaining = reset_at - time.time() + if remaining > 0: + return remaining + # Expired — clean up + try: + os.unlink(path) + except OSError: + pass + return None + except (FileNotFoundError, json.JSONDecodeError, KeyError, TypeError): + return None + + +def clear_nous_rate_limit() -> None: + """Clear the rate limit state (e.g., after a successful Nous request).""" + try: + os.unlink(_state_path()) + except FileNotFoundError: + pass + except OSError as exc: + logger.debug("Failed to clear Nous rate limit state: %s", exc) + + +def format_remaining(seconds: float) -> str: + """Format seconds remaining into human-readable duration.""" + s = max(0, int(seconds)) + if s < 60: + return f"{s}s" + if s < 3600: + m, sec = divmod(s, 60) + return f"{m}m {sec}s" if sec else f"{m}m" + h, remainder = divmod(s, 3600) + m = remainder // 60 + return f"{h}h {m}m" if m else f"{h}h" diff --git a/run_agent.py b/run_agent.py index 6cbc6f6ee..f199d806d 100644 --- a/run_agent.py +++ b/run_agent.py @@ -8660,6 +8660,53 @@ class AIAgent: api_kwargs = None # Guard against UnboundLocalError in except handler while retry_count < max_retries: + # ── Nous Portal rate limit guard ────────────────────── + # If another session already recorded that Nous is rate- + # limited, skip the API call entirely. Each attempt + # (including SDK-level retries) counts against RPH and + # deepens the rate limit hole. + if self.provider == "nous": + try: + from agent.nous_rate_guard import ( + nous_rate_limit_remaining, + format_remaining as _fmt_nous_remaining, + ) + _nous_remaining = nous_rate_limit_remaining() + if _nous_remaining is not None and _nous_remaining > 0: + _nous_msg = ( + f"Nous Portal rate limit active — " + f"resets in {_fmt_nous_remaining(_nous_remaining)}." + ) + self._vprint( + f"{self.log_prefix}⏳ {_nous_msg} Trying fallback...", + force=True, + ) + self._emit_status(f"⏳ {_nous_msg}") + if self._try_activate_fallback(): + retry_count = 0 + compression_attempts = 0 + primary_recovery_attempted = False + continue + # No fallback available — return with clear message + self._persist_session(messages, conversation_history) + return { + "final_response": ( + f"⏳ {_nous_msg}\n\n" + "No fallback provider available. " + "Try again after the reset, or add a " + "fallback provider in config.yaml." + ), + "messages": messages, + "api_calls": api_call_count, + "completed": False, + "failed": True, + "error": _nous_msg, + } + except ImportError: + pass + except Exception: + pass # Never let rate guard break the agent loop + try: self._reset_stream_delivery_tracking() api_kwargs = self._build_api_kwargs(api_messages) @@ -9248,6 +9295,15 @@ class AIAgent: self._vprint(f"{self.log_prefix} 💾 Cache: {cached:,}/{prompt:,} tokens ({hit_pct:.0f}% hit, {written:,} written)") has_retried_429 = False # Reset on success + # Clear Nous rate limit state on successful request — + # proves the limit has reset and other sessions can + # resume hitting Nous. + if self.provider == "nous": + try: + from agent.nous_rate_guard import clear_nous_rate_limit + clear_nous_rate_limit() + except Exception: + pass self._touch_activity(f"API call #{api_call_count} completed") break # Success, exit retry loop @@ -9659,6 +9715,38 @@ class AIAgent: primary_recovery_attempted = False continue + # ── Nous Portal: record rate limit & skip retries ───── + # When Nous returns a 429, record the reset time to a + # shared file so ALL sessions (cron, gateway, auxiliary) + # know not to pile on. Then skip further retries — + # each one burns another RPH request and deepens the + # rate limit hole. The retry loop's top-of-iteration + # guard will catch this on the next pass and try + # fallback or bail with a clear message. + if ( + is_rate_limited + and self.provider == "nous" + and classified.reason == FailoverReason.rate_limit + and not recovered_with_pool + ): + try: + from agent.nous_rate_guard import record_nous_rate_limit + _err_resp = getattr(api_error, "response", None) + _err_hdrs = ( + getattr(_err_resp, "headers", None) + if _err_resp else None + ) + record_nous_rate_limit( + headers=_err_hdrs, + error_context=error_context, + ) + except Exception: + pass + # Skip straight to max_retries — the top-of-loop + # guard will handle fallback or bail cleanly. + retry_count = max_retries + continue + is_payload_too_large = ( classified.reason == FailoverReason.payload_too_large ) diff --git a/tests/agent/test_nous_rate_guard.py b/tests/agent/test_nous_rate_guard.py new file mode 100644 index 000000000..45d30f724 --- /dev/null +++ b/tests/agent/test_nous_rate_guard.py @@ -0,0 +1,253 @@ +"""Tests for agent/nous_rate_guard.py — cross-session Nous Portal rate limit guard.""" + +import json +import os +import time + +import pytest + + +@pytest.fixture +def rate_guard_env(tmp_path, monkeypatch): + """Isolate rate guard state to a temp directory.""" + hermes_home = str(tmp_path / ".hermes") + os.makedirs(hermes_home, exist_ok=True) + monkeypatch.setenv("HERMES_HOME", hermes_home) + # Clear any cached module-level imports + return hermes_home + + +class TestRecordNousRateLimit: + """Test recording rate limit state.""" + + def test_records_with_header_reset(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + headers = {"x-ratelimit-reset-requests-1h": "1800"} + record_nous_rate_limit(headers=headers) + + path = _state_path() + assert os.path.exists(path) + with open(path) as f: + state = json.load(f) + assert state["reset_seconds"] == pytest.approx(1800, abs=2) + assert state["reset_at"] > time.time() + + def test_records_with_per_minute_header(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + headers = {"x-ratelimit-reset-requests": "45"} + record_nous_rate_limit(headers=headers) + + with open(_state_path()) as f: + state = json.load(f) + assert state["reset_seconds"] == pytest.approx(45, abs=2) + + def test_records_with_retry_after_header(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + headers = {"retry-after": "60"} + record_nous_rate_limit(headers=headers) + + with open(_state_path()) as f: + state = json.load(f) + assert state["reset_seconds"] == pytest.approx(60, abs=2) + + def test_prefers_hourly_over_per_minute(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + headers = { + "x-ratelimit-reset-requests-1h": "1800", + "x-ratelimit-reset-requests": "45", + } + record_nous_rate_limit(headers=headers) + + with open(_state_path()) as f: + state = json.load(f) + # Should use the hourly value, not the per-minute one + assert state["reset_seconds"] == pytest.approx(1800, abs=2) + + def test_falls_back_to_error_context_reset_at(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + future_reset = time.time() + 900 + record_nous_rate_limit( + headers=None, + error_context={"reset_at": future_reset}, + ) + + with open(_state_path()) as f: + state = json.load(f) + assert state["reset_at"] == pytest.approx(future_reset, abs=1) + + def test_falls_back_to_default_cooldown(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + record_nous_rate_limit(headers=None) + + with open(_state_path()) as f: + state = json.load(f) + # Default is 300 seconds (5 minutes) + assert state["reset_seconds"] == pytest.approx(300, abs=2) + + def test_custom_default_cooldown(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + record_nous_rate_limit(headers=None, default_cooldown=120.0) + + with open(_state_path()) as f: + state = json.load(f) + assert state["reset_seconds"] == pytest.approx(120, abs=2) + + def test_creates_directory_if_missing(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, _state_path + + record_nous_rate_limit(headers={"retry-after": "10"}) + assert os.path.exists(_state_path()) + + +class TestNousRateLimitRemaining: + """Test checking remaining rate limit time.""" + + def test_returns_none_when_no_file(self, rate_guard_env): + from agent.nous_rate_guard import nous_rate_limit_remaining + + assert nous_rate_limit_remaining() is None + + def test_returns_remaining_seconds_when_active(self, rate_guard_env): + from agent.nous_rate_guard import record_nous_rate_limit, nous_rate_limit_remaining + + record_nous_rate_limit(headers={"x-ratelimit-reset-requests-1h": "600"}) + remaining = nous_rate_limit_remaining() + assert remaining is not None + assert 595 < remaining <= 605 # ~600 seconds, allowing for test execution time + + def test_returns_none_when_expired(self, rate_guard_env): + from agent.nous_rate_guard import nous_rate_limit_remaining, _state_path + + # Write an already-expired state + state_dir = os.path.dirname(_state_path()) + os.makedirs(state_dir, exist_ok=True) + with open(_state_path(), "w") as f: + json.dump({"reset_at": time.time() - 10, "recorded_at": time.time() - 100}, f) + + assert nous_rate_limit_remaining() is None + # File should be cleaned up + assert not os.path.exists(_state_path()) + + def test_handles_corrupt_file(self, rate_guard_env): + from agent.nous_rate_guard import nous_rate_limit_remaining, _state_path + + state_dir = os.path.dirname(_state_path()) + os.makedirs(state_dir, exist_ok=True) + with open(_state_path(), "w") as f: + f.write("not valid json{{{") + + assert nous_rate_limit_remaining() is None + + +class TestClearNousRateLimit: + """Test clearing rate limit state.""" + + def test_clears_existing_file(self, rate_guard_env): + from agent.nous_rate_guard import ( + record_nous_rate_limit, + clear_nous_rate_limit, + nous_rate_limit_remaining, + _state_path, + ) + + record_nous_rate_limit(headers={"retry-after": "600"}) + assert nous_rate_limit_remaining() is not None + + clear_nous_rate_limit() + assert nous_rate_limit_remaining() is None + assert not os.path.exists(_state_path()) + + def test_clear_when_no_file(self, rate_guard_env): + from agent.nous_rate_guard import clear_nous_rate_limit + + # Should not raise + clear_nous_rate_limit() + + +class TestFormatRemaining: + """Test human-readable duration formatting.""" + + def test_seconds(self): + from agent.nous_rate_guard import format_remaining + + assert format_remaining(30) == "30s" + + def test_minutes(self): + from agent.nous_rate_guard import format_remaining + + assert format_remaining(125) == "2m 5s" + + def test_exact_minutes(self): + from agent.nous_rate_guard import format_remaining + + assert format_remaining(120) == "2m" + + def test_hours(self): + from agent.nous_rate_guard import format_remaining + + assert format_remaining(3720) == "1h 2m" + + +class TestParseResetSeconds: + """Test header parsing for reset times.""" + + def test_case_insensitive_headers(self, rate_guard_env): + from agent.nous_rate_guard import _parse_reset_seconds + + headers = {"X-Ratelimit-Reset-Requests-1h": "1200"} + assert _parse_reset_seconds(headers) == 1200.0 + + def test_returns_none_for_empty_headers(self): + from agent.nous_rate_guard import _parse_reset_seconds + + assert _parse_reset_seconds(None) is None + assert _parse_reset_seconds({}) is None + + def test_ignores_zero_values(self): + from agent.nous_rate_guard import _parse_reset_seconds + + headers = {"x-ratelimit-reset-requests-1h": "0"} + assert _parse_reset_seconds(headers) is None + + def test_ignores_invalid_values(self): + from agent.nous_rate_guard import _parse_reset_seconds + + headers = {"x-ratelimit-reset-requests-1h": "not-a-number"} + assert _parse_reset_seconds(headers) is None + + +class TestAuxiliaryClientIntegration: + """Test that the auxiliary client respects the rate guard.""" + + def test_try_nous_skips_when_rate_limited(self, rate_guard_env, monkeypatch): + from agent.nous_rate_guard import record_nous_rate_limit + + # Record a rate limit + record_nous_rate_limit(headers={"retry-after": "600"}) + + # Mock _read_nous_auth to return valid creds (would normally succeed) + import agent.auxiliary_client as aux + monkeypatch.setattr(aux, "_read_nous_auth", lambda: { + "access_token": "test-token", + "inference_base_url": "https://api.nous.test/v1", + }) + + result = aux._try_nous() + assert result == (None, None) + + def test_try_nous_works_when_not_rate_limited(self, rate_guard_env, monkeypatch): + import agent.auxiliary_client as aux + + # No rate limit recorded — _try_nous should proceed normally + # (will return None because no real creds, but won't be blocked + # by the rate guard) + monkeypatch.setattr(aux, "_read_nous_auth", lambda: None) + result = aux._try_nous() + assert result == (None, None)