feat(gemini): block free-tier keys at setup + surface guidance on 429 (#15100)

Google AI Studio's free tier (<= 250 req/day for gemini-2.5-flash) is
exhausted in a handful of agent turns, so the setup wizard now refuses
to wire up Gemini when the supplied key is on the free tier, and the
runtime 429 handler appends actionable billing guidance.

Setup-time probe (hermes_cli/main.py):
- `_model_flow_api_key_provider` fires one minimal generateContent call
  when provider_id == 'gemini' and classifies the response as
  free/paid/unknown via x-ratelimit-limit-requests-per-day header or
  429 body containing 'free_tier'.
- Free  -> print block message, refuse to save the provider, return.
- Paid  -> 'Tier check: paid' and proceed.
- Unknown (network/auth error) -> 'could not verify', proceed anyway.

Runtime 429 handler (agent/gemini_native_adapter.py):
- `gemini_http_error` appends billing guidance when the 429 error body
  mentions 'free_tier', catching users who bypass setup by putting
  GOOGLE_API_KEY directly in .env.

Tests: 22 unit tests for the probe + error path, 4 tests for the
setup-flow block. All 67 existing gemini tests still pass.
This commit is contained in:
Teknium 2026-04-24 04:46:17 -07:00 committed by GitHub
parent 346601ca8d
commit 3aa1a41e88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 463 additions and 0 deletions

View file

@ -44,6 +44,97 @@ def is_native_gemini_base_url(base_url: str) -> bool:
return not normalized.endswith("/openai")
def probe_gemini_tier(
    api_key: str,
    base_url: str = DEFAULT_GEMINI_BASE_URL,
    *,
    model: str = "gemini-2.5-flash",
    timeout: float = 10.0,
) -> str:
    """Probe a Google AI Studio API key and return its tier.

    Sends one minimal ``generateContent`` request (one output token) and
    classifies the response, in this order:

    1. A parseable ``x-ratelimit-limit-requests-per-day`` header decides the
       tier: ``<= 1000`` is free, anything larger is paid.
    2. Otherwise an HTTP 429 is free when its body mentions ``free_tier``
       and paid when it does not.
    3. Otherwise any 2xx response is paid.
    4. Anything else is unknown.

    The key is sent in the ``x-goog-api-key`` request header instead of the
    ``key`` query parameter so that it never becomes part of the request
    URL, which is commonly written to HTTP client, proxy and server logs.

    Args:
        api_key: The key to probe. Blank or ``None`` yields ``"unknown"``
            without any network call.
        base_url: API base URL. A trailing ``/`` and a trailing ``/openai``
            segment are removed before the native endpoint path is appended.
        model: Model name used in the probe request path.
        timeout: Timeout passed to ``httpx.Client``.

    Returns one of:
    - ``"free"`` -- key is on the free tier (unusable with Hermes)
    - ``"paid"`` -- key is on a paid tier
    - ``"unknown"`` -- probe failed; callers should proceed without blocking.
    """
    key = (api_key or "").strip()
    if not key:
        return "unknown"

    normalized_base = str(base_url or DEFAULT_GEMINI_BASE_URL).strip().rstrip("/")
    if not normalized_base:
        normalized_base = DEFAULT_GEMINI_BASE_URL
    # The probe uses the native generateContent route, so drop the
    # OpenAI-compatibility suffix when the configured base URL carries it.
    if normalized_base.lower().endswith("/openai"):
        normalized_base = normalized_base[: -len("/openai")]

    url = f"{normalized_base}/models/{model}:generateContent"
    payload = {
        "contents": [{"role": "user", "parts": [{"text": "hi"}]}],
        "generationConfig": {"maxOutputTokens": 1},
    }

    try:
        with httpx.Client(timeout=timeout) as client:
            resp = client.post(
                url,
                json=payload,
                headers={
                    "Content-Type": "application/json",
                    # Header auth keeps the secret out of the URL.
                    "x-goog-api-key": key,
                },
            )
    except Exception as exc:
        # Best-effort probe: any transport failure means "cannot tell".
        logger.debug("probe_gemini_tier: network error: %s", exc)
        return "unknown"

    # Lower-case the names so the lookup also works on plain-dict headers.
    headers_lower = {k.lower(): v for k, v in resp.headers.items()}
    rpd_header = headers_lower.get("x-ratelimit-limit-requests-per-day")
    if rpd_header:
        try:
            rpd_val = int(rpd_header)
        except (TypeError, ValueError):
            # Unparseable header: fall through to the status-based checks.
            rpd_val = None
        # Published free-tier daily caps (Dec 2025):
        #   gemini-2.5-pro: 100, gemini-2.5-flash: 250, flash-lite: 1000
        # Tier 1 starts at ~1500+ for Flash. We treat <= 1000 as free.
        if rpd_val is not None:
            return "free" if rpd_val <= 1000 else "paid"

    if resp.status_code == 429:
        try:
            body_text = resp.text or ""
        except Exception:
            # Body could not be read/decoded; treat it as carrying no marker.
            body_text = ""
        return "free" if "free_tier" in body_text.lower() else "paid"

    if 200 <= resp.status_code < 300:
        return "paid"
    return "unknown"
def is_free_tier_quota_error(error_message: str) -> bool:
    """Tell whether a Gemini 429 message points at free-tier quota exhaustion.

    The check is a case-insensitive search for the ``free_tier`` marker.
    Empty or ``None`` input is reported as ``False``.
    """
    text = error_message or ""
    return "free_tier" in text.lower()
# Billing guidance text; it starts with two newlines so it reads as its own
# paragraph when appended to another message.
_FREE_TIER_GUIDANCE = "".join(
    (
        "\n\n",
        "Your Google API key is on the free tier ",
        "(<= 250 requests/day for gemini-2.5-flash). ",
        "Hermes typically makes 3-10 API calls per user turn, ",
        "so the free tier is exhausted in a handful of messages ",
        "and cannot sustain an agent session. ",
        "Enable billing on your Google Cloud project ",
        "and regenerate the key in a billing-enabled project: ",
        "https://aistudio.google.com/apikey",
    )
)
class GeminiAPIError(Exception):
"""Error shape compatible with Hermes retry/error classification."""
@ -650,6 +741,12 @@ def gemini_http_error(response: httpx.Response) -> GeminiAPIError:
else:
message = f"Gemini returned HTTP {status}: {body_text[:500]}"
# Free-tier quota exhaustion -> append actionable guidance so users who
# bypassed the setup wizard (direct GOOGLE_API_KEY in .env) still learn
# that the free tier cannot sustain an agent session.
if status == 429 and is_free_tier_quota_error(err_message or body_text):
message = message + _FREE_TIER_GUIDANCE
return GeminiAPIError(
message,
code=code,

View file

@ -3930,12 +3930,71 @@ def _model_flow_api_key_provider(config, provider_id, current_model=""):
print("Cancelled.")
return
save_env_value(key_env, new_key)
existing_key = new_key
print("API key saved.")
print()
else:
print(f" {pconfig.name} API key: {existing_key[:8]}... ✓")
print()
# Gemini free-tier gate: free-tier daily quotas (<= 250 RPD for Flash)
# are exhausted in a handful of agent turns, so refuse to wire up the
# provider with a free-tier key. Probe is best-effort; network or auth
# errors fall through without blocking.
if provider_id == "gemini" and existing_key:
try:
from agent.gemini_native_adapter import probe_gemini_tier
except Exception:
probe_gemini_tier = None
if probe_gemini_tier is not None:
print(" Checking Gemini API tier...")
probe_base = (
(get_env_value(base_url_env) if base_url_env else "")
or os.getenv(base_url_env or "", "")
or pconfig.inference_base_url
)
tier = probe_gemini_tier(existing_key, probe_base)
if tier == "free":
print()
print(
"❌ This Google API key is on the free tier "
"(<= 250 requests/day for gemini-2.5-flash)."
)
print(
" Hermes typically makes 3-10 API calls per user turn "
"(tool iterations + auxiliary tasks),"
)
print(
" so the free tier is exhausted after a handful of "
"messages and cannot sustain"
)
print(" an agent session.")
print()
print(
" To use Gemini with Hermes, enable billing on your "
"Google Cloud project and regenerate"
)
print(
" the key in a billing-enabled project: "
"https://aistudio.google.com/apikey"
)
print()
print(
" Alternatives with workable free usage: DeepSeek, "
"OpenRouter (free models), Groq, Nous."
)
print()
print("Not saving Gemini as the default provider.")
return
if tier == "paid":
print(" Tier check: paid ✓")
else:
# "unknown" -- network issue, auth problem, unexpected response.
# Don't block; the runtime 429 handler will surface free-tier
# guidance if the key turns out to be free tier.
print(" Tier check: could not verify (proceeding anyway).")
print()
# Optional base URL override
current_base = ""
if base_url_env:

View file

@ -0,0 +1,166 @@
"""Tests for Gemini free-tier detection and blocking."""
from __future__ import annotations
from unittest.mock import MagicMock, patch
import pytest
from agent.gemini_native_adapter import (
gemini_http_error,
is_free_tier_quota_error,
probe_gemini_tier,
)
def _mock_response(status: int, headers: dict | None = None, text: str = "") -> MagicMock:
resp = MagicMock()
resp.status_code = status
resp.headers = headers or {}
resp.text = text
return resp
def _run_probe(resp: MagicMock) -> str:
    """Call ``probe_gemini_tier("fake-key")`` with ``httpx.Client`` patched.

    The patched client's ``post`` returns *resp*, so no network traffic occurs.
    """
    fake_client = MagicMock()
    fake_client.post.return_value = resp
    with patch("agent.gemini_native_adapter.httpx.Client") as client_cls:
        # The probe uses the client as a context manager.
        client_cls.return_value.__enter__.return_value = fake_client
        return probe_gemini_tier("fake-key")
class TestProbeGeminiTier:
    """Verify the tier probe classifies keys correctly."""

    # Name of the daily-request-limit header used in these tests.
    _RPD_HEADER = "x-ratelimit-limit-requests-per-day"

    def _probe_with_rpd(self, limit: str) -> str:
        """Probe against a 200 response advertising *limit* requests per day."""
        return _run_probe(_mock_response(200, {self._RPD_HEADER: limit}, "{}"))

    def _probe_with_status(self, status: int, body: str) -> str:
        """Probe against a header-less response with the given status and body."""
        return _run_probe(_mock_response(status, {}, body))

    def test_free_tier_via_rpd_header_flash(self):
        # 250 RPD: the gemini-2.5-flash free-tier cap.
        assert self._probe_with_rpd("250") == "free"

    def test_free_tier_via_rpd_header_pro(self):
        # 100 RPD: the gemini-2.5-pro free-tier cap.
        assert self._probe_with_rpd("100") == "free"

    def test_free_tier_via_rpd_header_flash_lite(self):
        # 1000 RPD (flash-lite) is the upper bound still treated as free.
        assert self._probe_with_rpd("1000") == "free"

    def test_paid_tier_via_rpd_header(self):
        # 1500 RPD is above the free cutoff.
        assert self._probe_with_rpd("1500") == "paid"

    def test_free_tier_via_429_body(self):
        quota_body = (
            '{"error":{"code":429,"message":"Quota exceeded for metric: '
            'generativelanguage.googleapis.com/generate_content_free_tier_requests, '
            'limit: 20"}}'
        )
        assert self._probe_with_status(429, quota_body) == "free"

    def test_paid_429_has_no_free_tier_marker(self):
        plain_body = '{"error":{"code":429,"message":"rate limited"}}'
        assert self._probe_with_status(429, plain_body) == "paid"

    def test_successful_200_without_rpd_header_is_paid(self):
        assert self._probe_with_status(200, '{"candidates":[]}') == "paid"

    def test_401_returns_unknown(self):
        assert self._probe_with_status(401, '{"error":{"code":401}}') == "unknown"

    def test_404_returns_unknown(self):
        assert self._probe_with_status(404, '{"error":{"code":404}}') == "unknown"

    def test_network_error_returns_unknown(self):
        failure = Exception("dns failure")
        with patch("agent.gemini_native_adapter.httpx.Client", side_effect=failure):
            assert probe_gemini_tier("fake-key") == "unknown"

    def test_empty_key_returns_unknown(self):
        # Empty, whitespace-only and None keys are all reported as unknown.
        for blank in ("", " ", None):
            assert probe_gemini_tier(blank) == "unknown"  # type: ignore[arg-type]

    def test_malformed_rpd_header_falls_through(self):
        # A non-integer header value must not crash; the 200 status then decides.
        assert self._probe_with_rpd("abc") == "paid"

    def test_openai_compat_suffix_stripped(self):
        """Base URLs ending in /openai get normalized to the native endpoint."""
        fake_client = MagicMock()
        fake_client.post.return_value = _mock_response(
            200, {self._RPD_HEADER: "1500"}, "{}"
        )
        with patch("agent.gemini_native_adapter.httpx.Client") as client_cls:
            client_cls.return_value.__enter__.return_value = fake_client
            probe_gemini_tier(
                "fake",
                "https://generativelanguage.googleapis.com/v1beta/openai",
            )
        # The URL is the first positional argument of the recorded post call.
        posted_url = fake_client.post.call_args[0][0]
        assert "/openai/" not in posted_url
        assert posted_url.endswith(":generateContent")
class TestIsFreeTierQuotaError:
    """Marker detection in ``is_free_tier_quota_error``."""

    def test_detects_free_tier_marker(self):
        message = "Quota exceeded for metric: generate_content_free_tier_requests"
        assert is_free_tier_quota_error(message)

    def test_case_insensitive(self):
        message = "QUOTA: FREE_TIER_REQUESTS"
        assert is_free_tier_quota_error(message)

    def test_no_free_tier_marker(self):
        message = "rate limited"
        assert not is_free_tier_quota_error(message)

    def test_empty_string(self):
        verdict = is_free_tier_quota_error("")
        assert not verdict

    def test_none(self):
        verdict = is_free_tier_quota_error(None)  # type: ignore[arg-type]
        assert not verdict
class TestGeminiHttpErrorFreeTierGuidance:
    """gemini_http_error should append free-tier guidance for free-tier 429s."""

    # Substring that appears only in the appended billing guidance.
    _BILLING_URL = "aistudio.google.com/apikey"

    class _FakeResp:
        """Bare response object carrying a status code, empty headers and a body."""

        def __init__(self, status: int, text: str):
            self.headers: dict = {}
            self.text = text
            self.status_code = status

    def _error_text(self, status: int, body: str) -> str:
        """Return ``str()`` of the error built from a fake response."""
        return str(gemini_http_error(self._FakeResp(status, body)))

    def test_free_tier_429_appends_guidance(self):
        quota_body = (
            '{"error":{"code":429,"message":"Quota exceeded for metric: '
            "generativelanguage.googleapis.com/generate_content_free_tier_requests, "
            'limit: 20","status":"RESOURCE_EXHAUSTED"}}'
        )
        rendered = self._error_text(429, quota_body)
        assert "free tier" in rendered.lower()
        assert self._BILLING_URL in rendered

    def test_paid_429_has_no_billing_url(self):
        rendered = self._error_text(
            429,
            '{"error":{"code":429,"message":"Rate limited","status":"RESOURCE_EXHAUSTED"}}',
        )
        assert self._BILLING_URL not in rendered

    def test_non_429_has_no_billing_url(self):
        rendered = self._error_text(
            400,
            '{"error":{"code":400,"message":"bad request","status":"INVALID_ARGUMENT"}}',
        )
        assert self._BILLING_URL not in rendered

    def test_401_has_no_billing_url(self):
        rendered = self._error_text(
            401,
            '{"error":{"code":401,"message":"API key invalid","status":"UNAUTHENTICATED"}}',
        )
        assert self._BILLING_URL not in rendered

View file

@ -0,0 +1,141 @@
"""Tests for the Gemini free-tier block in the setup wizard."""
from __future__ import annotations
from unittest.mock import patch
import pytest
@pytest.fixture
def config_home(tmp_path, monkeypatch):
    """Provide an isolated HERMES_HOME seeded with a one-line config and an empty .env.

    Returns the directory path; HERMES_HOME points at it for the test's duration.
    """
    hermes_home = tmp_path / "hermes"
    hermes_home.mkdir()
    hermes_home.joinpath("config.yaml").write_text("model: some-old-model\n")
    hermes_home.joinpath(".env").write_text("")
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))

    # Drop ambient variables that could alter provider resolution.
    ambient_vars = (
        "HERMES_MODEL",
        "LLM_MODEL",
        "HERMES_INFERENCE_PROVIDER",
        "OPENAI_BASE_URL",
        "OPENAI_API_KEY",
        "GEMINI_BASE_URL",
    )
    for name in ambient_vars:
        monkeypatch.delenv(name, raising=False)

    return hermes_home
class TestGeminiSetupFreeTierBlock:
    """_model_flow_api_key_provider should refuse to wire up a free-tier Gemini key."""

    @staticmethod
    def _run_flow(provider_id, picked_model, **probe_patch_kwargs):
        """Drive the setup flow with the probe, model picker and input patched.

        Extra keyword arguments go to the ``probe_gemini_tier`` patch (for
        example ``return_value="free"``). Returns the probe mock so callers
        can inspect how it was used.
        """
        # Imported here, after the test has set its environment variables.
        from hermes_cli.main import _model_flow_api_key_provider
        from hermes_cli.config import load_config

        with patch(
            "agent.gemini_native_adapter.probe_gemini_tier",
            **probe_patch_kwargs,
        ) as probe_mock, patch(
            "hermes_cli.auth._prompt_model_selection",
            return_value=picked_model,
        ), patch(
            "hermes_cli.auth.deactivate_provider",
        ), patch("builtins.input", return_value=""):
            _model_flow_api_key_provider(load_config(), provider_id, "old-model")
        return probe_mock

    @staticmethod
    def _saved_model(home):
        """Return the ``model`` entry of the config.yaml stored under *home*."""
        import yaml

        parsed = yaml.safe_load((home / "config.yaml").read_text()) or {}
        return parsed.get("model")

    def test_free_tier_key_is_blocked(self, config_home, monkeypatch, capsys):
        """Free-tier probe result -> provider is NOT saved, message is printed."""
        monkeypatch.setenv("GOOGLE_API_KEY", "fake-free-tier-key")

        self._run_flow("gemini", "gemini-2.5-flash", return_value="free")

        printed = capsys.readouterr().out
        assert "free tier" in printed.lower()
        assert "aistudio.google.com/apikey" in printed
        assert "Not saving Gemini as the default provider" in printed

        # The config must not name gemini as the provider. A plain-string
        # ``model`` entry is also acceptable: nothing was saved.
        saved = self._saved_model(config_home)
        if isinstance(saved, dict):
            assert saved.get("provider") != "gemini", (
                "Free-tier key should not have saved gemini as provider"
            )

    def test_paid_tier_key_proceeds(self, config_home, monkeypatch, capsys):
        """Paid-tier probe result -> provider IS saved normally."""
        monkeypatch.setenv("GOOGLE_API_KEY", "fake-paid-tier-key")

        self._run_flow("gemini", "gemini-2.5-flash", return_value="paid")

        printed = capsys.readouterr().out
        assert "paid" in printed.lower()
        assert "Not saving Gemini" not in printed

        model = self._saved_model(config_home)
        assert isinstance(model, dict), f"model should be dict, got {type(model)}"
        assert model.get("provider") == "gemini"
        assert model.get("default") == "gemini-2.5-flash"

    def test_unknown_tier_proceeds_with_warning(self, config_home, monkeypatch, capsys):
        """Probe returning 'unknown' (network/auth error) -> proceed without blocking."""
        monkeypatch.setenv("GOOGLE_API_KEY", "fake-key")

        self._run_flow("gemini", "gemini-2.5-flash", return_value="unknown")

        printed = capsys.readouterr().out
        assert "could not verify" in printed.lower()
        assert "Not saving Gemini" not in printed

        saved = self._saved_model(config_home)
        assert isinstance(saved, dict)
        assert saved.get("provider") == "gemini"

    def test_non_gemini_provider_skips_probe(self, config_home, monkeypatch):
        """Probe must only run for provider_id == 'gemini', not for other providers."""
        monkeypatch.setenv("DEEPSEEK_API_KEY", "fake-key")

        probe_mock = self._run_flow("deepseek", "deepseek-chat")

        probe_mock.assert_not_called()