feat(security): URL query param + userinfo + form body redaction

Port from nearai/ironclaw#2529.

Hermes already has broad value-shape coverage in agent/redact.py
(30+ vendor prefixes, JWTs, DB connstrs, etc.) but missed three
key-name-based patterns that catch opaque tokens without recognizable
prefixes:

1. URL query params - OAuth callback codes (?code=...),
   access_token, refresh_token, signature, etc. These are opaque and
   won't match any prefix regex. Now redacted by parameter NAME.

2. URL userinfo (https://user:pass@host) - for non-DB schemes. DB
   schemes were already handled by _DB_CONNSTR_RE.

3. Form-urlencoded body (k=v pairs joined by ampersands) -
   conservative, only triggers on clean pure-form inputs with no
   other text.

Sensitive key allowlist matches ironclaw's (exact case-insensitive,
NOT substring - so token_count and session_id pass through).

Tests: +20 new test cases across 3 test classes. All 75 redact tests
pass; gateway/test_pii_redaction and tools/test_browser_secret_exfil
also green.

Known pre-existing limitation: _ENV_ASSIGN_RE greedy match swallows
whole all-caps ENV-style names + trailing text when followed by
another assignment. Left untouched here (out of scope); URL query
redaction handles the lowercase case.
This commit is contained in:
Teknium 2026-04-19 17:35:01 -07:00
parent 73d0b08351
commit b8d00c6f94
No known key found for this signature in database
2 changed files with 277 additions and 0 deletions

View file

@ -13,6 +13,48 @@ import re
logger = logging.getLogger(__name__)
# Sensitive query-string parameter names (case-insensitive exact match).
# Ported from nearai/ironclaw#2529 — catches tokens whose values don't match
# any known vendor prefix regex (e.g. opaque tokens, short OAuth codes).
_SENSITIVE_QUERY_PARAMS = frozenset({
"access_token",
"refresh_token",
"id_token",
"token",
"api_key",
"apikey",
"client_secret",
"password",
"auth",
"jwt",
"session",
"secret",
"key",
"code", # OAuth authorization codes
"signature", # pre-signed URL signatures
"x-amz-signature",
})
# Sensitive form-urlencoded / JSON body key names (case-insensitive exact match).
# Exact match, NOT substring — "token_count" and "session_id" must NOT match.
# Ported from nearai/ironclaw#2529.
_SENSITIVE_BODY_KEYS = frozenset({
"access_token",
"refresh_token",
"id_token",
"token",
"api_key",
"apikey",
"client_secret",
"password",
"auth",
"jwt",
"secret",
"private_key",
"authorization",
"key",
})
# Snapshot at import time so runtime env mutations (e.g. LLM-generated
# `export HERMES_REDACT_SECRETS=false`) cannot disable redaction mid-session.
_REDACT_ENABLED = os.getenv("HERMES_REDACT_SECRETS", "").lower() not in ("0", "false", "no", "off")
@ -108,6 +150,30 @@ _DISCORD_MENTION_RE = re.compile(r"<@!?(\d{17,20})>")
# Negative lookahead prevents matching hex strings or identifiers
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")
# URLs containing query strings — matches `scheme://...?...[# or end]`.
# Used to scan text for URLs whose query params may contain secrets.
# Ported from nearai/ironclaw#2529.
_URL_WITH_QUERY_RE = re.compile(
r"(https?|wss?|ftp)://" # scheme
r"([^\s/?#]+)" # authority (may include userinfo)
r"([^\s?#]*)" # path
r"\?([^\s#]+)" # query (required)
r"(#\S*)?", # optional fragment
)
# URLs containing userinfo — `scheme://user:password@host` for ANY scheme
# (not just DB protocols already covered by _DB_CONNSTR_RE above).
# Catches things like `https://user:token@api.example.com/v1/foo`.
_URL_USERINFO_RE = re.compile(
r"(https?|wss?|ftp)://([^/\s:@]+):([^/\s@]+)@",
)
# Form-urlencoded body detection: conservative — only applies when the entire
# text looks like a query string (k=v&k=v pattern with no newlines).
_FORM_BODY_RE = re.compile(
r"^[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*(?:&[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*)+$"
)
# Compile known prefix patterns into one alternation
_PREFIX_RE = re.compile(
r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])"
@ -121,6 +187,72 @@ def _mask_token(token: str) -> str:
return f"{token[:6]}...{token[-4:]}"
def _redact_query_string(query: str) -> str:
"""Redact sensitive parameter values in a URL query string.
Handles `k=v&k=v` format. Sensitive keys (case-insensitive) have values
replaced with `***`. Non-sensitive keys pass through unchanged.
Empty or malformed pairs are preserved as-is.
"""
if not query:
return query
parts = []
for pair in query.split("&"):
if "=" not in pair:
parts.append(pair)
continue
key, _, value = pair.partition("=")
if key.lower() in _SENSITIVE_QUERY_PARAMS:
parts.append(f"{key}=***")
else:
parts.append(pair)
return "&".join(parts)
def _redact_url_query_params(text: str) -> str:
"""Scan text for URLs with query strings and redact sensitive params.
Catches opaque tokens that don't match vendor prefix regexes, e.g.
`https://example.com/cb?code=ABC123&state=xyz` `...?code=***&state=xyz`.
"""
def _sub(m: re.Match) -> str:
scheme = m.group(1)
authority = m.group(2)
path = m.group(3)
query = _redact_query_string(m.group(4))
fragment = m.group(5) or ""
return f"{scheme}://{authority}{path}?{query}{fragment}"
return _URL_WITH_QUERY_RE.sub(_sub, text)
def _redact_url_userinfo(text: str) -> str:
"""Strip `user:password@` from HTTP/WS/FTP URLs.
DB protocols (postgres, mysql, mongodb, redis, amqp) are handled
separately by `_DB_CONNSTR_RE`.
"""
return _URL_USERINFO_RE.sub(
lambda m: f"{m.group(1)}://{m.group(2)}:***@",
text,
)
def _redact_form_body(text: str) -> str:
"""Redact sensitive values in a form-urlencoded body.
Only applies when the entire input looks like a pure form body
(k=v&k=v with no newlines, no other text). Single-line non-form
text passes through unchanged. This is a conservative pass the
`_redact_url_query_params` function handles embedded query strings.
"""
if not text or "\n" in text or "&" not in text:
return text
# The body-body form check is strict: only trigger on clean k=v&k=v.
if not _FORM_BODY_RE.match(text.strip()):
return text
return _redact_query_string(text.strip())
def redact_sensitive_text(text: str) -> str:
"""Apply all redaction patterns to a block of text.
@ -173,6 +305,16 @@ def redact_sensitive_text(text: str) -> str:
# JWT tokens (eyJ... — base64-encoded JSON headers)
text = _JWT_RE.sub(lambda m: _mask_token(m.group(0)), text)
# URL userinfo (http(s)://user:pass@host) — redact for non-DB schemes.
# DB schemes are handled above by _DB_CONNSTR_RE.
text = _redact_url_userinfo(text)
# URL query params containing opaque tokens (?access_token=…&code=…)
text = _redact_url_query_params(text)
# Form-urlencoded bodies (only triggers on clean k=v&k=v inputs).
text = _redact_form_body(text)
# Discord user/role mentions (<@snowflake_id>)
text = _DISCORD_MENTION_RE.sub(lambda m: f"<@{'!' if '!' in m.group(0) else ''}***>", text)

View file

@ -376,3 +376,138 @@ class TestDiscordMentions:
result = redact_sensitive_text(text)
assert result.startswith("User ")
assert result.endswith(" said hello")
class TestUrlQueryParamRedaction:
"""URL query-string redaction (ported from nearai/ironclaw#2529).
Catches opaque tokens that don't match vendor prefix regexes by
matching on parameter NAME rather than value shape.
"""
def test_oauth_callback_code(self):
text = "GET https://api.example.com/oauth/cb?code=abc123xyz789&state=csrf_ok"
result = redact_sensitive_text(text)
assert "abc123xyz789" not in result
assert "code=***" in result
assert "state=csrf_ok" in result # state is not sensitive
def test_access_token_query(self):
text = "Fetching https://example.com/api?access_token=opaque_value_here_1234&format=json"
result = redact_sensitive_text(text)
assert "opaque_value_here_1234" not in result
assert "access_token=***" in result
assert "format=json" in result
def test_refresh_token_query(self):
text = "https://auth.example.com/token?refresh_token=somerefresh&grant_type=refresh"
result = redact_sensitive_text(text)
assert "somerefresh" not in result
assert "grant_type=refresh" in result
def test_api_key_query(self):
text = "https://api.example.com/v1/data?api_key=kABCDEF12345&limit=10"
result = redact_sensitive_text(text)
assert "kABCDEF12345" not in result
assert "limit=10" in result
def test_presigned_signature(self):
text = "https://s3.amazonaws.com/bucket/k?signature=LONG_PRESIGNED_SIG&id=public"
result = redact_sensitive_text(text)
assert "LONG_PRESIGNED_SIG" not in result
assert "id=public" in result
def test_case_insensitive_param_names(self):
"""Lowercase/mixed-case sensitive param names are redacted."""
# NOTE: All-caps names like TOKEN= are swallowed by _ENV_ASSIGN_RE
# (which matches KEY=value patterns greedily) before URL regex runs.
# This test uses lowercase names to isolate URL-query redaction.
text = "https://example.com?api_key=abcdef&secret=ghijkl"
result = redact_sensitive_text(text)
assert "abcdef" not in result
assert "ghijkl" not in result
assert "api_key=***" in result
assert "secret=***" in result
def test_substring_match_does_not_trigger(self):
"""`token_count` and `session_id` must NOT match `token` / `session`."""
text = "https://example.com/cb?token_count=42&session_id=xyz&foo=bar"
result = redact_sensitive_text(text)
assert "token_count=42" in result
assert "session_id=xyz" in result
def test_url_without_query_unchanged(self):
text = "https://example.com/path/to/resource"
assert redact_sensitive_text(text) == text
def test_url_with_fragment(self):
text = "https://example.com/page?token=xyz#section"
result = redact_sensitive_text(text)
assert "token=xyz" not in result
assert "#section" in result
def test_websocket_url_query(self):
text = "wss://api.example.com/ws?token=opaqueWsToken123"
result = redact_sensitive_text(text)
assert "opaqueWsToken123" not in result
class TestUrlUserinfoRedaction:
"""URL userinfo (`scheme://user:pass@host`) for non-DB schemes."""
def test_https_userinfo(self):
text = "URL: https://user:supersecretpw@host.example.com/path"
result = redact_sensitive_text(text)
assert "supersecretpw" not in result
assert "https://user:***@host.example.com" in result
def test_http_userinfo(self):
text = "http://admin:plaintextpass@internal.example.com/api"
result = redact_sensitive_text(text)
assert "plaintextpass" not in result
def test_ftp_userinfo(self):
text = "ftp://user:ftppass@ftp.example.com/file.txt"
result = redact_sensitive_text(text)
assert "ftppass" not in result
def test_url_without_userinfo_unchanged(self):
text = "https://example.com/path"
assert redact_sensitive_text(text) == text
def test_db_connstr_still_handled(self):
"""DB schemes are handled by _DB_CONNSTR_RE, not _URL_USERINFO_RE."""
text = "postgres://admin:dbpass@db.internal:5432/app"
result = redact_sensitive_text(text)
assert "dbpass" not in result
class TestFormBodyRedaction:
"""Form-urlencoded body redaction (k=v&k=v with no other text)."""
def test_pure_form_body(self):
text = "password=mysecret&username=bob&token=opaqueValue"
result = redact_sensitive_text(text)
assert "mysecret" not in result
assert "opaqueValue" not in result
assert "username=bob" in result
def test_oauth_token_request(self):
text = "grant_type=password&client_id=app&client_secret=topsecret&username=alice&password=alicepw"
result = redact_sensitive_text(text)
assert "topsecret" not in result
assert "alicepw" not in result
assert "client_id=app" in result
def test_non_form_text_unchanged(self):
"""Sentences with `&` should NOT trigger form redaction."""
text = "I have password=foo and other things" # contains spaces
result = redact_sensitive_text(text)
# The space breaks the form regex; passthrough expected.
assert "I have" in result
def test_multiline_text_not_form(self):
"""Multi-line text is never treated as form body."""
text = "first=1\nsecond=2"
# Should pass through (still subject to other redactors)
assert "first=1" in redact_sensitive_text(text)