feat: capture provider rate limit headers and show in /usage (#6541)

Parse x-ratelimit-* headers from inference API responses (Nous Portal,
OpenRouter, OpenAI-compatible) and display them in the /usage command.

- New agent/rate_limit_tracker.py: parse 12 rate limit headers (RPM/RPH/
  TPM/TPH limits, remaining, reset timers), format as progress bars (CLI)
  or compact one-liner (gateway)
- Hook into streaming path in run_agent.py: stream.response.headers is
  available on the OpenAI SDK Stream object before chunks are consumed
- CLI /usage: appends rate limit section with progress bars + warnings
  when any bucket exceeds 80%
- Gateway /usage: appends compact rate limit summary
- 24 unit tests covering parsing, formatting, edge cases

Headers captured per response:
  x-ratelimit-{limit,remaining,reset}-{requests,tokens}{,-1h}

Example CLI display:
  Nous Rate Limits (captured just now):
    Requests/min [░░░░░░░░░░░░░░░░░░░░]  0.1%  1/800 used  (799 left, resets in 59s)
    Tokens/hr    [░░░░░░░░░░░░░░░░░░░░]  0.0%  49/336.0M   (336.0M left, resets in 52m)
This commit is contained in:
Teknium 2026-04-09 03:43:14 -07:00 committed by GitHub
parent 3c8ec7037c
commit 8dfc96dbbb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 519 additions and 15 deletions

242
agent/rate_limit_tracker.py Normal file
View file

@ -0,0 +1,242 @@
"""Rate limit tracking for inference API responses.
Captures x-ratelimit-* headers from provider responses and provides
formatted display for the /usage slash command. Currently supports
the Nous Portal header format (also used by OpenRouter and OpenAI-compatible
APIs that follow the same convention).
Header schema (12 headers total):
x-ratelimit-limit-requests RPM cap
x-ratelimit-limit-requests-1h RPH cap
x-ratelimit-limit-tokens TPM cap
x-ratelimit-limit-tokens-1h TPH cap
x-ratelimit-remaining-requests requests left in minute window
x-ratelimit-remaining-requests-1h requests left in hour window
x-ratelimit-remaining-tokens tokens left in minute window
x-ratelimit-remaining-tokens-1h tokens left in hour window
x-ratelimit-reset-requests seconds until minute request window resets
x-ratelimit-reset-requests-1h seconds until hour request window resets
x-ratelimit-reset-tokens seconds until minute token window resets
x-ratelimit-reset-tokens-1h seconds until hour token window resets
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Mapping, Optional
@dataclass
class RateLimitBucket:
    """One rate-limit window (e.g. requests per minute)."""

    limit: int = 0            # window cap; 0 means "no data from provider"
    remaining: int = 0        # requests/tokens left in the window
    reset_seconds: float = 0.0  # seconds-until-reset as reported at capture time
    captured_at: float = 0.0  # time.time() when this was captured

    @property
    def used(self) -> int:
        """Amount consumed in this window, clamped to be non-negative."""
        consumed = self.limit - self.remaining
        return consumed if consumed > 0 else 0

    @property
    def usage_pct(self) -> float:
        """Percent of the window consumed; 0.0 when no limit is known."""
        return 100.0 * (self.used / self.limit) if self.limit > 0 else 0.0

    @property
    def remaining_seconds_now(self) -> float:
        """Estimated seconds remaining until reset, adjusted for elapsed time."""
        left = self.reset_seconds - (time.time() - self.captured_at)
        return left if left > 0.0 else 0.0
@dataclass
class RateLimitState:
    """Full rate-limit state parsed from response headers."""

    # Four independent windows: requests and tokens, per-minute and per-hour.
    requests_min: RateLimitBucket = field(default_factory=RateLimitBucket)
    requests_hour: RateLimitBucket = field(default_factory=RateLimitBucket)
    tokens_min: RateLimitBucket = field(default_factory=RateLimitBucket)
    tokens_hour: RateLimitBucket = field(default_factory=RateLimitBucket)
    captured_at: float = 0.0  # when the headers were captured
    provider: str = ""        # provider tag, e.g. "nous"

    @property
    def has_data(self) -> bool:
        """True once at least one set of headers has been captured."""
        return self.captured_at > 0

    @property
    def age_seconds(self) -> float:
        """Seconds since capture; infinity when nothing was captured yet."""
        return time.time() - self.captured_at if self.has_data else float("inf")
def _safe_int(value: Any, default: int = 0) -> int:
try:
return int(float(value))
except (TypeError, ValueError):
return default
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def parse_rate_limit_headers(
    headers: Mapping[str, str],
    provider: str = "",
) -> Optional[RateLimitState]:
    """Parse x-ratelimit-* headers into a RateLimitState.

    Args:
        headers: response headers. Keys are matched case-insensitively, so
            both httpx.Headers (already lowercase) and plain dicts with
            mixed-case keys (e.g. "X-RateLimit-Limit-Requests") work.
        provider: provider tag stored on the returned state (e.g. "nous").

    Returns:
        A RateLimitState snapshot, or None if no rate limit headers are present.
    """
    # Normalize keys once so the lookups below agree with the presence
    # check. Previously the check lowercased keys but the .get() calls did
    # not, so a mixed-case plain dict would pass the check yet parse zeros.
    normalized = {str(k).lower(): v for k, v in headers.items()}
    if not any(k.startswith("x-ratelimit-") for k in normalized):
        return None
    now = time.time()

    def _bucket(resource: str, suffix: str = "") -> RateLimitBucket:
        # e.g. resource="requests", suffix=""    -> per-minute window
        #      resource="tokens",   suffix="-1h" -> per-hour window
        tag = f"{resource}{suffix}"
        return RateLimitBucket(
            limit=_safe_int(normalized.get(f"x-ratelimit-limit-{tag}")),
            remaining=_safe_int(normalized.get(f"x-ratelimit-remaining-{tag}")),
            reset_seconds=_safe_float(normalized.get(f"x-ratelimit-reset-{tag}")),
            captured_at=now,
        )

    return RateLimitState(
        requests_min=_bucket("requests"),
        requests_hour=_bucket("requests", "-1h"),
        tokens_min=_bucket("tokens"),
        tokens_hour=_bucket("tokens", "-1h"),
        captured_at=now,
        provider=provider,
    )
# ── Formatting ──────────────────────────────────────────────────────────
def _fmt_count(n: int) -> str:
"""Human-friendly number: 7999856 -> '8.0M', 33599 -> '33.6K', 799 -> '799'."""
if n >= 1_000_000:
return f"{n / 1_000_000:.1f}M"
if n >= 10_000:
return f"{n / 1_000:.1f}K"
if n >= 1_000:
return f"{n / 1_000:.1f}K"
return str(n)
def _fmt_seconds(seconds: float) -> str:
"""Seconds -> human-friendly duration: '58s', '2m 14s', '58m 57s', '1h 2m'."""
s = max(0, int(seconds))
if s < 60:
return f"{s}s"
if s < 3600:
m, sec = divmod(s, 60)
return f"{m}m {sec}s" if sec else f"{m}m"
h, remainder = divmod(s, 3600)
m = remainder // 60
return f"{h}h {m}m" if m else f"{h}h"
def _bar(pct: float, width: int = 20) -> str:
"""ASCII progress bar: [████████░░░░░░░░░░░░] 40%."""
filled = int(pct / 100.0 * width)
filled = max(0, min(width, filled))
empty = width - filled
return f"[{'' * filled}{'' * empty}]"
def _bucket_line(label: str, bucket: RateLimitBucket, label_width: int = 14) -> str:
    """Format one bucket as a single line.

    Layout: padded label, progress bar, usage percent, used/limit counts,
    then remaining count and the time until the window resets.
    """
    # A non-positive limit means the provider sent no headers for this window.
    if bucket.limit <= 0:
        return f" {label:<{label_width}} (no data)"
    pct = bucket.usage_pct
    used = _fmt_count(bucket.used)
    limit = _fmt_count(bucket.limit)
    remaining = _fmt_count(bucket.remaining)
    # Uses the age-adjusted countdown, not the raw header value, so the
    # display stays roughly accurate between API calls.
    reset = _fmt_seconds(bucket.remaining_seconds_now)
    bar = _bar(pct)
    return f" {label:<{label_width}} {bar} {pct:5.1f}% {used}/{limit} used ({remaining} left, resets in {reset})"
def format_rate_limit_display(state: RateLimitState) -> str:
    """Format rate limit state for terminal/chat display.

    Produces a multi-line block: a header with capture freshness, one
    progress-bar line per bucket (requests/tokens x minute/hour), and a
    trailing warning section for any bucket at >= 80% usage.
    """
    if not state.has_data:
        return "No rate limit data yet — make an API request first."
    # Freshness label: "just now" under 5s, "Ns ago" under a minute,
    # otherwise a humanized duration like "2m 14s ago".
    age = state.age_seconds
    if age < 5:
        freshness = "just now"
    elif age < 60:
        freshness = f"{int(age)}s ago"
    else:
        freshness = f"{_fmt_seconds(age)} ago"
    provider_label = state.provider.title() if state.provider else "Provider"
    lines = [
        f"{provider_label} Rate Limits (captured {freshness}):",
        "",
        _bucket_line("Requests/min", state.requests_min),
        _bucket_line("Requests/hr", state.requests_hour),
        "",
        _bucket_line("Tokens/min", state.tokens_min),
        _bucket_line("Tokens/hr", state.tokens_hour),
    ]
    # Add warnings if any bucket is getting hot (>= 80% of its window used).
    warnings = []
    for label, bucket in [
        ("requests/min", state.requests_min),
        ("requests/hr", state.requests_hour),
        ("tokens/min", state.tokens_min),
        ("tokens/hr", state.tokens_hour),
    ]:
        if bucket.limit > 0 and bucket.usage_pct >= 80:
            reset = _fmt_seconds(bucket.remaining_seconds_now)
            warnings.append(f"{label} at {bucket.usage_pct:.0f}% — resets in {reset}")
    if warnings:
        lines.append("")
        lines.extend(warnings)
    return "\n".join(lines)
def format_rate_limit_compact(state: RateLimitState) -> str:
    """One-line compact summary for status bars / gateway messages."""
    if not state.has_data:
        return "No rate limit data."

    def _raw(bucket: RateLimitBucket) -> str:
        # Minute request counts are small; show them unabbreviated.
        return f"{bucket.remaining}/{bucket.limit}"

    def _human(bucket: RateLimitBucket) -> str:
        # Larger counts get K/M abbreviation.
        return f"{_fmt_count(bucket.remaining)}/{_fmt_count(bucket.limit)}"

    # (tag, bucket, formatter, include reset countdown?) — hour windows get
    # the reset timer appended, minute windows do not.
    specs = (
        ("RPM", state.requests_min, _raw, False),
        ("RPH", state.requests_hour, _human, True),
        ("TPM", state.tokens_min, _human, False),
        ("TPH", state.tokens_hour, _human, True),
    )
    parts = []
    for tag, bucket, fmt, with_reset in specs:
        if bucket.limit <= 0:
            continue  # skip windows the provider sent no data for
        piece = f"{tag}: {fmt(bucket)}"
        if with_reset:
            piece += f" (resets {_fmt_seconds(bucket.remaining_seconds_now)})"
        parts.append(piece)
    return " | ".join(parts)

23
cli.py
View file

@ -5409,12 +5409,27 @@ class HermesCLI:
print(f" ❌ Compression failed: {e}")
def _show_usage(self):
"""Show cumulative token usage for the current session."""
"""Show rate limits (if available) and session token usage."""
if not self.agent:
print("(._.) No active agent -- send a message first.")
return
agent = self.agent
calls = agent.session_api_calls
if calls == 0:
print("(._.) No API calls made yet in this session.")
return
# ── Rate limits (shown first when available) ────────────────
rl_state = agent.get_rate_limit_state()
if rl_state and rl_state.has_data:
from agent.rate_limit_tracker import format_rate_limit_display
print()
print(format_rate_limit_display(rl_state))
print()
# ── Session token usage ─────────────────────────────────────
input_tokens = getattr(agent, "session_input_tokens", 0) or 0
output_tokens = getattr(agent, "session_output_tokens", 0) or 0
cache_read_tokens = getattr(agent, "session_cache_read_tokens", 0) or 0
@ -5422,13 +5437,7 @@ class HermesCLI:
prompt = agent.session_prompt_tokens
completion = agent.session_completion_tokens
total = agent.session_total_tokens
calls = agent.session_api_calls
if calls == 0:
print("(._.) No API calls made yet in this session.")
return
# Current context window state
compressor = agent.context_compressor
last_prompt = compressor.last_prompt_tokens
ctx_len = compressor.context_length

View file

@ -5280,19 +5280,28 @@ class GatewayRunner:
agent = self._running_agents.get(session_key)
if agent and hasattr(agent, "session_total_tokens") and agent.session_api_calls > 0:
lines = [
"📊 **Session Token Usage**",
f"Prompt (input): {agent.session_prompt_tokens:,}",
f"Completion (output): {agent.session_completion_tokens:,}",
f"Total: {agent.session_total_tokens:,}",
f"API calls: {agent.session_api_calls}",
]
lines = []
# Rate limits first (when available from provider headers)
rl_state = agent.get_rate_limit_state()
if rl_state and rl_state.has_data:
from agent.rate_limit_tracker import format_rate_limit_compact
lines.append(f"⏱️ **Rate Limits:** {format_rate_limit_compact(rl_state)}")
lines.append("")
# Session token usage
lines.append("📊 **Session Token Usage**")
lines.append(f"Prompt (input): {agent.session_prompt_tokens:,}")
lines.append(f"Completion (output): {agent.session_completion_tokens:,}")
lines.append(f"Total: {agent.session_total_tokens:,}")
lines.append(f"API calls: {agent.session_api_calls}")
ctx = agent.context_compressor
if ctx.last_prompt_tokens:
pct = min(100, ctx.last_prompt_tokens / ctx.context_length * 100) if ctx.context_length else 0
lines.append(f"Context: {ctx.last_prompt_tokens:,} / {ctx.context_length:,} ({pct:.0f}%)")
if ctx.compression_count:
lines.append(f"Compressions: {ctx.compression_count}")
return "\n".join(lines)
# No running agent -- check session history for a rough count

View file

@ -129,7 +129,7 @@ COMMAND_REGISTRY: list[CommandDef] = [
CommandDef("commands", "Browse all commands and skills (paginated)", "Info",
gateway_only=True, args_hint="[page]"),
CommandDef("help", "Show available commands", "Info"),
CommandDef("usage", "Show token usage for the current session", "Info"),
CommandDef("usage", "Show token usage and rate limits for the current session", "Info"),
CommandDef("insights", "Show usage insights and analytics", "Info",
args_hint="[days]"),
CommandDef("platforms", "Show gateway/messaging platform status", "Info",

View file

@ -692,6 +692,10 @@ class AIAgent:
self._current_tool: str | None = None
self._api_call_count: int = 0
# Rate limit tracking — updated from x-ratelimit-* response headers
# after each API call. Accessed by /usage slash command.
self._rate_limit_state: Optional["RateLimitState"] = None
# Centralized logging — agent.log (INFO+) and errors.log (WARNING+)
# both live under ~/.hermes/logs/. Idempotent, so gateway mode
# (which creates a new AIAgent per message) won't duplicate handlers.
@ -2545,6 +2549,29 @@ class AIAgent:
self._last_activity_ts = time.time()
self._last_activity_desc = desc
def _capture_rate_limits(self, http_response: Any) -> None:
    """Parse x-ratelimit-* headers from an HTTP response and cache the state.

    Called after each streaming API call. The httpx Response object is
    available on the OpenAI SDK Stream via ``stream.response``.

    Args:
        http_response: httpx-style response object (or None) whose
            ``.headers`` mapping may carry x-ratelimit-* entries.
    """
    # Best-effort: silently skip when there is no response or no headers.
    if http_response is None:
        return
    headers = getattr(http_response, "headers", None)
    if not headers:
        return
    try:
        from agent.rate_limit_tracker import parse_rate_limit_headers

        # Only overwrite the cached state when headers actually parsed;
        # a response without rate-limit headers keeps the previous snapshot.
        state = parse_rate_limit_headers(headers, provider=self.provider)
        if state is not None:
            self._rate_limit_state = state
    except Exception:
        pass  # Never let header parsing break the agent loop
def get_rate_limit_state(self) -> Optional["RateLimitState"]:
    """Return the last captured RateLimitState, or None if nothing captured yet."""
    return self._rate_limit_state
def get_activity_summary(self) -> dict:
"""Return a snapshot of the agent's current activity for diagnostics.
@ -4399,6 +4426,11 @@ class AIAgent:
self._touch_activity("waiting for provider response (streaming)")
stream = request_client_holder["client"].chat.completions.create(**stream_kwargs)
# Capture rate limit headers from the initial HTTP response.
# The OpenAI SDK Stream object exposes the underlying httpx
# response via .response before any chunks are consumed.
self._capture_rate_limits(getattr(stream, "response", None))
content_parts: list = []
tool_calls_acc: dict = {}
tool_gen_notified: set = set()

View file

@ -0,0 +1,212 @@
"""Tests for agent.rate_limit_tracker — header parsing and formatting."""
import time
import pytest
from agent.rate_limit_tracker import (
RateLimitBucket,
RateLimitState,
parse_rate_limit_headers,
format_rate_limit_display,
format_rate_limit_compact,
_fmt_count,
_fmt_seconds,
_bar,
)
# ── Sample headers from Nous inference API ──────────────────────────────
# Realistic snapshot covering all 12 x-ratelimit-* headers. Values are
# strings, exactly as they arrive in HTTP response headers.
NOUS_HEADERS = {
    "x-ratelimit-limit-requests": "800",
    "x-ratelimit-limit-requests-1h": "33600",
    "x-ratelimit-limit-tokens": "8000000",
    "x-ratelimit-limit-tokens-1h": "336000000",
    "x-ratelimit-remaining-requests": "795",
    "x-ratelimit-remaining-requests-1h": "33590",
    "x-ratelimit-remaining-tokens": "7999500",
    "x-ratelimit-remaining-tokens-1h": "335999000",
    # Reset timers are fractional seconds-until-reset, not epoch timestamps.
    "x-ratelimit-reset-requests": "45.5",
    "x-ratelimit-reset-requests-1h": "3500.0",
    "x-ratelimit-reset-tokens": "42.3",
    "x-ratelimit-reset-tokens-1h": "3490.0",
}
class TestParseHeaders:
    """parse_rate_limit_headers: full, partial, absent, and malformed input."""

    def test_basic_parsing(self):
        """All 12 headers map onto the four buckets of the state."""
        state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous")
        assert state is not None
        assert state.provider == "nous"
        assert state.has_data
        assert state.requests_min.limit == 800
        assert state.requests_min.remaining == 795
        assert state.requests_min.reset_seconds == 45.5
        assert state.requests_hour.limit == 33600
        assert state.requests_hour.remaining == 33590
        assert state.tokens_min.limit == 8000000
        assert state.tokens_min.remaining == 7999500
        assert state.tokens_hour.limit == 336000000
        assert state.tokens_hour.remaining == 335999000
        assert state.tokens_hour.reset_seconds == 3490.0

    def test_no_headers(self):
        """An empty mapping yields None, not an empty state."""
        state = parse_rate_limit_headers({})
        assert state is None

    def test_partial_headers(self):
        """A subset of headers still produces a state; the rest default to 0."""
        headers = {
            "x-ratelimit-limit-requests": "100",
            "x-ratelimit-remaining-requests": "50",
        }
        state = parse_rate_limit_headers(headers)
        assert state is not None
        assert state.requests_min.limit == 100
        assert state.requests_min.remaining == 50
        # Missing fields default to 0
        assert state.tokens_min.limit == 0

    def test_non_rate_limit_headers_ignored(self):
        """Headers without the x-ratelimit- prefix don't trigger parsing."""
        headers = {
            "content-type": "application/json",
            "server": "nginx",
        }
        state = parse_rate_limit_headers(headers)
        assert state is None

    def test_malformed_values(self):
        """Non-numeric values fall back to 0 rather than raising."""
        headers = {
            "x-ratelimit-limit-requests": "not-a-number",
            "x-ratelimit-remaining-requests": "",
            "x-ratelimit-reset-requests": "abc",
        }
        state = parse_rate_limit_headers(headers)
        assert state is not None
        assert state.requests_min.limit == 0
        assert state.requests_min.remaining == 0
        assert state.requests_min.reset_seconds == 0.0
class TestBucket:
    """RateLimitBucket derived properties: used, usage_pct, time-adjusted reset."""

    def test_used(self):
        """used = limit - remaining."""
        b = RateLimitBucket(limit=800, remaining=795, reset_seconds=45.0, captured_at=time.time())
        assert b.used == 5

    def test_usage_pct(self):
        """80 of 100 used -> 80%."""
        b = RateLimitBucket(limit=100, remaining=20, reset_seconds=30.0, captured_at=time.time())
        assert b.usage_pct == pytest.approx(80.0)

    def test_usage_pct_zero_limit(self):
        """No known limit -> 0% rather than a ZeroDivisionError."""
        b = RateLimitBucket(limit=0, remaining=0)
        assert b.usage_pct == 0.0

    def test_remaining_seconds_now(self):
        """Countdown shrinks by the time elapsed since capture."""
        now = time.time()
        b = RateLimitBucket(limit=800, remaining=795, reset_seconds=60.0, captured_at=now - 10)
        # ~50 seconds should remain
        assert 49 <= b.remaining_seconds_now <= 51

    def test_remaining_seconds_expired(self):
        """A window that already reset reports 0, never a negative value."""
        b = RateLimitBucket(limit=800, remaining=795, reset_seconds=30.0, captured_at=time.time() - 60)
        assert b.remaining_seconds_now == 0.0
class TestFormatting:
    """Formatting helpers (_fmt_count/_fmt_seconds/_bar) and display builders."""

    def test_fmt_count_millions(self):
        assert _fmt_count(8000000) == "8.0M"
        assert _fmt_count(336000000) == "336.0M"

    def test_fmt_count_thousands(self):
        assert _fmt_count(33600) == "33.6K"
        assert _fmt_count(1500) == "1.5K"

    def test_fmt_count_small(self):
        assert _fmt_count(800) == "800"
        assert _fmt_count(0) == "0"

    def test_fmt_seconds_short(self):
        assert _fmt_seconds(45) == "45s"
        assert _fmt_seconds(0) == "0s"

    def test_fmt_seconds_minutes(self):
        assert _fmt_seconds(125) == "2m 5s"
        assert _fmt_seconds(120) == "2m"

    def test_fmt_seconds_hours(self):
        assert _fmt_seconds(3660) == "1h 1m"
        assert _fmt_seconds(3600) == "1h"

    def test_bar(self):
        bar = _bar(50.0, width=10)
        assert bar == "[█████░░░░░]"
        assert _bar(0.0, width=10) == "[░░░░░░░░░░]"
        assert _bar(100.0, width=10) == "[██████████]"

    def test_format_display_no_data(self):
        """An empty state renders the 'no data' hint."""
        state = RateLimitState()
        result = format_rate_limit_display(state)
        assert "No rate limit data" in result

    def test_format_display_with_data(self):
        """Provider name and all four bucket labels appear in the output."""
        state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous")
        result = format_rate_limit_display(state)
        assert "Nous" in result
        assert "Requests/min" in result
        assert "Requests/hr" in result
        assert "Tokens/min" in result
        assert "Tokens/hr" in result
        assert "resets in" in result

    def test_format_display_warning_on_high_usage(self):
        """A bucket past 80% usage adds a warning line naming it."""
        headers = {
            **NOUS_HEADERS,
            "x-ratelimit-remaining-requests": "50",  # 750/800 used = 93.75%
        }
        state = parse_rate_limit_headers(headers)
        result = format_rate_limit_display(state)
        # Fixed: the previous `assert "" in result` was vacuously true.
        # 93.75% formats as "94%" via :.0f, so the warning must name the bucket.
        assert "requests/min at 94%" in result

    def test_format_compact(self):
        """Compact summary includes all four windows plus hourly reset timers."""
        state = parse_rate_limit_headers(NOUS_HEADERS, provider="nous")
        result = format_rate_limit_compact(state)
        assert "RPM:" in result
        assert "RPH:" in result
        assert "TPM:" in result
        assert "TPH:" in result
        assert "resets" in result

    def test_format_compact_no_data(self):
        state = RateLimitState()
        result = format_rate_limit_compact(state)
        assert "No rate limit data" in result
class TestAgentIntegration:
    """Exercise the header-capture path without a real API call.

    Cleanup: removed unused imports (sys, os, MagicMock, patch) and a
    redundant re-import of parse_rate_limit_headers — none were used.
    """

    def test_capture_rate_limits_from_headers(self):
        """Headers on an httpx-like response object parse into a full state."""

        class MockResponse:
            headers = NOUS_HEADERS

        state = parse_rate_limit_headers(MockResponse.headers, provider="nous")
        assert state is not None
        assert state.requests_min.limit == 800
        assert state.tokens_hour.limit == 336000000

    def test_capture_rate_limits_none_response(self):
        """An empty header mapping yields no state (graceful no-op)."""
        assert parse_rate_limit_headers({}) is None