mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
fix(xai-oauth): pin inference base_url to x.ai origin (#28952)
XAI_BASE_URL / HERMES_XAI_BASE_URL let users repoint the OAuth-authenticated inference endpoint, but the env override was an unguarded credential-leak vector: a tampered .env or hostile shell init setting XAI_BASE_URL=https://attacker.example/v1 would silently ship the SuperGrok OAuth bearer to a third party on every request. Add _xai_validate_inference_base_url() that pins the host to x.ai or a *.x.ai subdomain and rejects non-HTTPS. On rejection, fall back to the default with a warning rather than raise — a bad env var should not deadlock auth, but should never leak the bearer either. Apply at all three sites that read the env override for xai-oauth: - hermes_cli/auth.py resolve_xai_oauth_runtime_credentials (main path) - hermes_cli/auth.py _xai_oauth_loopback_login (initial login) - agent/auxiliary_client.py _resolve_xai_oauth_for_aux (aux client) E2E validated against four scenarios: attacker.example, lookalike api.x.ai.evil.com, http:// downgrade on api.x.ai, and legit custom.x.ai subdomain (which still resolves correctly). Discovered while comparing against the opencode-grok-auth plugin (github.com/ysnock404/opencode-grok-auth), which highlighted the same guard on the OpenCode side.
This commit is contained in:
parent
c9d5ef28bf
commit
64a9a199bb
3 changed files with 200 additions and 12 deletions
|
|
@ -1307,7 +1307,10 @@ def _resolve_xai_oauth_for_aux() -> Optional[Tuple[str, str]]:
|
|||
with xAI Grok OAuth.
|
||||
"""
|
||||
try:
|
||||
from hermes_cli.auth import DEFAULT_XAI_OAUTH_BASE_URL
|
||||
from hermes_cli.auth import (
|
||||
DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
_xai_validate_inference_base_url,
|
||||
)
|
||||
|
||||
pool = load_pool("xai-oauth")
|
||||
if pool and pool.has_credentials():
|
||||
|
|
@ -1318,13 +1321,13 @@ def _resolve_xai_oauth_for_aux() -> Optional[Tuple[str, str]]:
|
|||
or getattr(entry, "access_token", "")
|
||||
or ""
|
||||
).strip()
|
||||
base_url = str(
|
||||
base_url = _xai_validate_inference_base_url(
|
||||
os.getenv("HERMES_XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or getattr(entry, "runtime_base_url", None)
|
||||
or getattr(entry, "base_url", None)
|
||||
or DEFAULT_XAI_OAUTH_BASE_URL
|
||||
).strip().rstrip("/")
|
||||
or str(getattr(entry, "runtime_base_url", None) or "").strip().rstrip("/")
|
||||
or str(getattr(entry, "base_url", None) or "").strip().rstrip("/"),
|
||||
fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
if api_key and base_url:
|
||||
return api_key, base_url
|
||||
except Exception as exc:
|
||||
|
|
|
|||
|
|
@ -3465,6 +3465,62 @@ def _xai_validate_oauth_endpoint(url: str, *, field: str) -> str:
|
|||
return url
|
||||
|
||||
|
||||
def _xai_validate_inference_base_url(value: str, *, fallback: str) -> str:
|
||||
"""Refuse a non-xAI base_url for the OAuth-authenticated inference path.
|
||||
|
||||
The xAI Grok OAuth bearer is a high-value, long-lived credential tied to
|
||||
the user's SuperGrok subscription. ``XAI_BASE_URL`` / ``HERMES_XAI_BASE_URL``
|
||||
let users repoint the inference endpoint (handy for staging or a local
|
||||
proxy), but the env override is also a credential-leak vector: a tampered
|
||||
``.env`` or hostile shell init that sets
|
||||
``XAI_BASE_URL=https://attacker.example/v1`` would ship the OAuth access
|
||||
token to a third party on every request, silently.
|
||||
|
||||
Pin the inference origin to ``api.x.ai`` (or any ``*.x.ai`` subdomain xAI
|
||||
may add). On rejection, fall back to the default and log a warning rather
|
||||
than raise — a bad env var should not deadlock authentication, but it
|
||||
should also never leak the bearer.
|
||||
|
||||
``value`` is the already-stripped, trailing-slash-trimmed candidate from
|
||||
env. Empty input returns ``fallback`` unchanged.
|
||||
"""
|
||||
candidate = (value or "").strip().rstrip("/")
|
||||
if not candidate:
|
||||
return fallback
|
||||
try:
|
||||
parsed = urlparse(candidate)
|
||||
except Exception:
|
||||
logger.warning(
|
||||
"Ignoring malformed xAI base_url override %r; using %s instead.",
|
||||
candidate, fallback,
|
||||
)
|
||||
return fallback
|
||||
if parsed.scheme != "https":
|
||||
logger.warning(
|
||||
"Refusing non-HTTPS xAI base_url override %r (xai-oauth bearer would "
|
||||
"be sent in cleartext); falling back to %s.",
|
||||
candidate, fallback,
|
||||
)
|
||||
return fallback
|
||||
host = (parsed.hostname or "").lower()
|
||||
if not host:
|
||||
logger.warning(
|
||||
"Ignoring xAI base_url override %r with no hostname; using %s instead.",
|
||||
candidate, fallback,
|
||||
)
|
||||
return fallback
|
||||
if host != "x.ai" and not host.endswith(".x.ai"):
|
||||
logger.warning(
|
||||
"Refusing xAI base_url override %r — host %r is not on the xAI origin "
|
||||
"(expected x.ai or a *.x.ai subdomain). The xai-oauth bearer is only "
|
||||
"valid against xAI's inference API; sending it elsewhere would leak "
|
||||
"the credential. Falling back to %s.",
|
||||
candidate, host, fallback,
|
||||
)
|
||||
return fallback
|
||||
return candidate
|
||||
|
||||
|
||||
def _xai_oauth_discovery(timeout_seconds: float = 15.0) -> Dict[str, str]:
|
||||
try:
|
||||
response = httpx.get(
|
||||
|
|
@ -3710,10 +3766,10 @@ def resolve_xai_oauth_runtime_credentials(
|
|||
)
|
||||
raise
|
||||
|
||||
base_url = (
|
||||
base_url = _xai_validate_inference_base_url(
|
||||
os.getenv("HERMES_XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or DEFAULT_XAI_OAUTH_BASE_URL
|
||||
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/"),
|
||||
fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
return {
|
||||
"provider": "xai-oauth",
|
||||
|
|
@ -6542,10 +6598,10 @@ def _xai_oauth_loopback_login(
|
|||
code="xai_token_exchange_invalid",
|
||||
)
|
||||
|
||||
base_url = (
|
||||
base_url = _xai_validate_inference_base_url(
|
||||
os.getenv("HERMES_XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/")
|
||||
or DEFAULT_XAI_OAUTH_BASE_URL
|
||||
or os.getenv("XAI_BASE_URL", "").strip().rstrip("/"),
|
||||
fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
return {
|
||||
"tokens": {
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ from hermes_cli.auth import (
|
|||
_xai_callback_cors_origin,
|
||||
_xai_oauth_build_authorize_url,
|
||||
_xai_start_callback_server,
|
||||
_xai_validate_inference_base_url,
|
||||
_xai_validate_loopback_redirect_uri,
|
||||
format_auth_error,
|
||||
get_xai_oauth_auth_status,
|
||||
|
|
@ -554,6 +555,134 @@ def test_resolve_xai_runtime_credentials_honours_env_base_url(tmp_path, monkeypa
|
|||
assert creds["base_url"] == "https://custom.x.ai/v1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Inference base-URL host guard (xai-oauth bearer leak protection)
|
||||
#
|
||||
# The xAI OAuth bearer is a high-value, long-lived SuperGrok credential.
|
||||
# ``XAI_BASE_URL`` / ``HERMES_XAI_BASE_URL`` are a credential-leak vector
|
||||
# unless the host is pinned to the xAI origin. These tests cover the
|
||||
# accept/reject matrix for `_xai_validate_inference_base_url` and confirm
|
||||
# the runtime resolver falls back to the default on rejection rather than
|
||||
# leaking the bearer to an attacker-controlled endpoint.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_xai_inference_base_url_accepts_default():
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
"https://api.x.ai/v1", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== "https://api.x.ai/v1"
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_accepts_bare_apex():
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
"https://x.ai/v1", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== "https://x.ai/v1"
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_accepts_subdomain():
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
"https://custom.x.ai/v1", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== "https://custom.x.ai/v1"
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_strips_trailing_slash():
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
"https://api.x.ai/v1/", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== "https://api.x.ai/v1"
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_empty_returns_fallback():
|
||||
assert (
|
||||
_xai_validate_inference_base_url("", fallback=DEFAULT_XAI_OAUTH_BASE_URL)
|
||||
== DEFAULT_XAI_OAUTH_BASE_URL
|
||||
)
|
||||
assert (
|
||||
_xai_validate_inference_base_url(" ", fallback=DEFAULT_XAI_OAUTH_BASE_URL)
|
||||
== DEFAULT_XAI_OAUTH_BASE_URL
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_rejects_off_origin_host():
|
||||
# The headline attack: env var pointing at an attacker-controlled host.
|
||||
result = _xai_validate_inference_base_url(
|
||||
"https://attacker.example/v1", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
assert result == DEFAULT_XAI_OAUTH_BASE_URL
|
||||
|
||||
|
||||
def test_xai_inference_base_url_rejects_suffix_lookalike():
|
||||
# ``api.x.ai.example`` ends in ``.example``, not ``.x.ai``. urlparse picks
|
||||
# the full host as the hostname, and the suffix check uses ``.x.ai`` (with
|
||||
# leading dot) so a lookalike like ``apix.ai`` or ``api.x.ai.evil.com``
|
||||
# is rejected.
|
||||
for hostile in (
|
||||
"https://api.x.ai.evil.com/v1",
|
||||
"https://apix.ai/v1",
|
||||
"https://x.ai.evil.com/v1",
|
||||
):
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
hostile, fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== DEFAULT_XAI_OAUTH_BASE_URL
|
||||
), hostile
|
||||
|
||||
|
||||
def test_xai_inference_base_url_rejects_http():
|
||||
# http:// would put the bearer on the wire in cleartext.
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
"http://api.x.ai/v1", fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== DEFAULT_XAI_OAUTH_BASE_URL
|
||||
)
|
||||
|
||||
|
||||
def test_xai_inference_base_url_rejects_other_schemes():
|
||||
for hostile in (
|
||||
"ftp://api.x.ai/v1",
|
||||
"file:///etc/passwd",
|
||||
"javascript:alert(1)",
|
||||
):
|
||||
assert (
|
||||
_xai_validate_inference_base_url(
|
||||
hostile, fallback=DEFAULT_XAI_OAUTH_BASE_URL,
|
||||
)
|
||||
== DEFAULT_XAI_OAUTH_BASE_URL
|
||||
), hostile
|
||||
|
||||
|
||||
def test_resolve_xai_runtime_credentials_rejects_off_origin_env_base_url(tmp_path, monkeypatch, caplog):
|
||||
# The end-to-end guarantee: if the env var points at an attacker host,
|
||||
# the resolver MUST silently fall back to the default rather than ship
|
||||
# the OAuth bearer to the attacker.
|
||||
hermes_home = tmp_path / "hermes"
|
||||
fresh = _jwt_with_exp(int(time.time()) + 3600)
|
||||
_setup_hermes_auth(hermes_home, access_token=fresh)
|
||||
monkeypatch.setenv("HERMES_HOME", str(hermes_home))
|
||||
monkeypatch.setenv("XAI_BASE_URL", "https://attacker.example/v1")
|
||||
monkeypatch.delenv("HERMES_XAI_BASE_URL", raising=False)
|
||||
|
||||
with caplog.at_level("WARNING"):
|
||||
creds = resolve_xai_oauth_runtime_credentials()
|
||||
assert creds["base_url"] == DEFAULT_XAI_OAUTH_BASE_URL
|
||||
assert any(
|
||||
"attacker.example" in record.getMessage() for record in caplog.records
|
||||
), "Expected a warning identifying the rejected override host."
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Quarantine: terminal refresh failure clears dead tokens (#28155 sibling)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue