"""Regex-based secret redaction for logs and tool output.

Applies pattern matching to mask API keys, tokens, and credentials
before they reach log files, verbose output, or gateway logs.

Short tokens (< 18 chars) are fully masked. Longer tokens preserve
the first 6 and last 4 characters for debuggability.
"""

import logging
import os
import re

logger = logging.getLogger(__name__)

# Sensitive query-string parameter names (case-insensitive exact match).
# Ported from nearai/ironclaw#2529 — catches tokens whose values don't match
# any known vendor prefix regex (e.g. opaque tokens, short OAuth codes).
# Entries are stored lowercase; callers lowercase the key before the lookup.
_SENSITIVE_QUERY_PARAMS = frozenset({
    "access_token",
    "refresh_token",
    "id_token",
    "token",
    "api_key",
    "apikey",
    "client_secret",
    "password",
    "auth",
    "jwt",
    "session",
    "secret",
    "key",
    "code",  # OAuth authorization codes
    "signature",  # pre-signed URL signatures
    "x-amz-signature",
})

# Sensitive form-urlencoded / JSON body key names (case-insensitive exact match).
# Exact match, NOT substring — "token_count" and "session_id" must NOT match.
# Ported from nearai/ironclaw#2529.
# NOTE(review): unlike the query-param set this one omits "session", "code" and
# the signature keys, and adds "private_key" / "authorization". It is not
# referenced by the code visible in this module — confirm where it is consumed.
_SENSITIVE_BODY_KEYS = frozenset({
    "access_token",
    "refresh_token",
    "id_token",
    "token",
    "api_key",
    "apikey",
    "client_secret",
    "password",
    "auth",
    "jwt",
    "secret",
    "private_key",
    "authorization",
    "key",
})

# Snapshot at import time so runtime env mutations (e.g. LLM-generated
# `export HERMES_REDACT_SECRETS=false`) cannot disable redaction mid-session.
_REDACT_ENABLED = os.getenv("HERMES_REDACT_SECRETS", "").lower() not in (
    "0",
    "false",
    "no",
    "off",
)

# Known API key prefixes -- match the prefix + contiguous token chars.
# Each entry is a regex fragment; the list is joined into a single alternation
# further down, so entry order is the order the alternatives are tried in.
_PREFIX_PATTERNS = [
    r"sk-[A-Za-z0-9_-]{10,}",  # OpenAI / OpenRouter / Anthropic (sk-ant-*)
    r"ghp_[A-Za-z0-9]{10,}",  # GitHub PAT (classic)
    r"github_pat_[A-Za-z0-9_]{10,}",  # GitHub PAT (fine-grained)
    r"gho_[A-Za-z0-9]{10,}",  # GitHub OAuth access token
    r"ghu_[A-Za-z0-9]{10,}",  # GitHub user-to-server token
    r"ghs_[A-Za-z0-9]{10,}",  # GitHub server-to-server token
    r"ghr_[A-Za-z0-9]{10,}",  # GitHub refresh token
    r"xox[baprs]-[A-Za-z0-9-]{10,}",  # Slack tokens
    r"AIza[A-Za-z0-9_-]{30,}",  # Google API keys
    r"pplx-[A-Za-z0-9]{10,}",  # Perplexity
    r"fal_[A-Za-z0-9_-]{10,}",  # Fal.ai
    r"fc-[A-Za-z0-9]{10,}",  # Firecrawl
    r"bb_live_[A-Za-z0-9_-]{10,}",  # BrowserBase
    r"gAAAA[A-Za-z0-9_=-]{20,}",  # Codex encrypted tokens
    r"AKIA[A-Z0-9]{16}",  # AWS Access Key ID
    r"sk_live_[A-Za-z0-9]{10,}",  # Stripe secret key (live)
    r"sk_test_[A-Za-z0-9]{10,}",  # Stripe secret key (test)
    r"rk_live_[A-Za-z0-9]{10,}",  # Stripe restricted key
    r"SG\.[A-Za-z0-9_-]{10,}",  # SendGrid API key
    r"hf_[A-Za-z0-9]{10,}",  # HuggingFace token
    r"r8_[A-Za-z0-9]{10,}",  # Replicate API token
    r"npm_[A-Za-z0-9]{10,}",  # npm access token
    r"pypi-[A-Za-z0-9_-]{10,}",  # PyPI API token
    r"dop_v1_[A-Za-z0-9]{10,}",  # DigitalOcean PAT
    r"doo_v1_[A-Za-z0-9]{10,}",  # DigitalOcean OAuth
    r"am_[A-Za-z0-9_-]{10,}",  # AgentMail API key
    r"sk_[A-Za-z0-9_]{10,}",  # ElevenLabs TTS key (sk_ underscore, not sk- dash)
    r"tvly-[A-Za-z0-9]{10,}",  # Tavily search API key
    r"exa_[A-Za-z0-9]{10,}",  # Exa search API key
    r"gsk_[A-Za-z0-9]{10,}",  # Groq Cloud API key
    r"syt_[A-Za-z0-9]{10,}",  # Matrix access token
    r"retaindb_[A-Za-z0-9]{10,}",  # RetainDB API key
    r"hsk-[A-Za-z0-9]{10,}",  # Hindsight API key
    r"mem0_[A-Za-z0-9]{10,}",  # Mem0 Platform API key
    r"brv_[A-Za-z0-9]{10,}",  # ByteRover API key
]

# ENV assignment patterns: KEY=value where KEY contains a secret-like name
_SECRET_ENV_NAMES = r"(?:API_?KEY|TOKEN|SECRET|PASSWORD|PASSWD|CREDENTIAL|AUTH)"
# Groups: (1) variable name, (2) optional opening quote, (3) value.
# The trailing \2 requires the same quote character (or none) to close the value.
_ENV_ASSIGN_RE = re.compile(
    rf"([A-Z0-9_]{{0,50}}{_SECRET_ENV_NAMES}[A-Z0-9_]{{0,50}})\s*=\s*(['\"]?)(\S+)\2",
)

# JSON field patterns: "apiKey": "value", "token": "value", etc.
# Groups: (1) the quoted key, (2) the string value.
_JSON_KEY_NAMES = r"(?:api_?[Kk]ey|token|secret|password|access_token|refresh_token|auth_token|bearer|secret_value|raw_secret|secret_input|key_material)"
_JSON_FIELD_RE = re.compile(
    rf'("{_JSON_KEY_NAMES}")\s*:\s*"([^"]+)"',
    re.IGNORECASE,
)

# Authorization headers
# Groups: (1) "Authorization: Bearer " prefix, (2) the credential.
_AUTH_HEADER_RE = re.compile(
    r"(Authorization:\s*Bearer\s+)(\S+)",
    re.IGNORECASE,
)

# Telegram bot tokens: bot<digits>:<token> or <digits>:<token>,
# where token part is restricted to [-A-Za-z0-9_] and length >= 30
_TELEGRAM_RE = re.compile(
    r"(bot)?(\d{8,}):([-A-Za-z0-9_]{30,})",
)

# Private key blocks: -----BEGIN RSA PRIVATE KEY----- ... -----END RSA PRIVATE KEY-----
# The body is matched non-greedily so adjacent key blocks are matched separately.
_PRIVATE_KEY_RE = re.compile(
    r"-----BEGIN[A-Z ]*PRIVATE KEY-----[\s\S]*?-----END[A-Z ]*PRIVATE KEY-----"
)

# Database connection strings: protocol://user:PASSWORD@host
# Catches postgres, mysql, mongodb, redis, amqp URLs and redacts the password
# Groups: (1) scheme + user + ":", (2) password, (3) "@".
_DB_CONNSTR_RE = re.compile(
    r"((?:postgres(?:ql)?|mysql|mongodb(?:\+srv)?|redis|amqp)://[^:]+:)([^@]+)(@)",
    re.IGNORECASE,
)

# JWT tokens: header.payload[.signature] — always start with "eyJ" (base64 for "{")
# Matches 1-part (header only), 2-part (header.payload), and full 3-part JWTs.
_JWT_RE = re.compile(
    r"eyJ[A-Za-z0-9_-]{10,}"  # Header (always starts with eyJ)
    r"(?:\.[A-Za-z0-9_=-]{4,}){0,2}"  # Optional payload and/or signature
)

# Discord user/role mentions: <@123456789012345678> or <@!123456789012345678>
# Snowflake IDs are 17-20 digit integers that resolve to specific Discord accounts.
_DISCORD_MENTION_RE = re.compile(r"<@!?(\d{17,20})>")

# E.164 phone numbers: +<country><number>, 7-15 digits
# Negative lookahead prevents matching hex strings or identifiers
_SIGNAL_PHONE_RE = re.compile(r"(\+[1-9]\d{6,14})(?![A-Za-z0-9])")

# URLs containing query strings — matches `scheme://...?...[# or end]`.
# Used to scan text for URLs whose query params may contain secrets.
# Ported from nearai/ironclaw#2529.
_URL_WITH_QUERY_RE = re.compile(
    r"(https?|wss?|ftp)://"  # scheme
    r"([^\s/?#]+)"  # authority (may include userinfo)
    r"([^\s?#]*)"  # path
    r"\?([^\s#]+)"  # query (required)
    r"(#\S*)?",  # optional fragment
)

# URLs containing userinfo — `scheme://user:password@host` for ANY scheme
# (not just DB protocols already covered by _DB_CONNSTR_RE above).
# Catches things like `https://user:token@api.example.com/v1/foo`.
_URL_USERINFO_RE = re.compile(
    r"(https?|wss?|ftp)://([^/\s:@]+):([^/\s@]+)@",
)

# Form-urlencoded body detection: conservative — only applies when the entire
# text looks like a query string (k=v&k=v pattern with no newlines).
_FORM_BODY_RE = re.compile(
    r"^[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*(?:&[A-Za-z_][A-Za-z0-9_.-]*=[^&\s]*)+$"
)

# Compile known prefix patterns into one alternation.
# Group 1 is the matched token; the lookarounds stop a prefix from matching in
# the middle of a longer identifier.
# NOTE(review): this statement was truncated in the source under review. The
# capture group is required by the `m.group(1)` call in redact_sensitive_text,
# but the lookbehind/lookahead character classes are a reconstruction —
# confirm against version control.
_PREFIX_RE = re.compile(
    r"(?<![A-Za-z0-9_-])(" + "|".join(_PREFIX_PATTERNS) + r")(?![A-Za-z0-9_-])"
)

# Keys treated as sensitive in a pure form-urlencoded body: everything that is
# sensitive in a URL query string plus the body-only names ("private_key",
# "authorization"), so the body key list is actually enforced.
_SENSITIVE_FORM_KEYS = _SENSITIVE_QUERY_PARAMS | _SENSITIVE_BODY_KEYS


def _mask_token(token: str) -> str:
    """Mask a token, preserving prefix for long tokens.

    Tokens shorter than 18 characters become ``***``. Longer tokens keep the
    first 6 and last 4 characters, joined by ``...``.
    """
    if len(token) < 18:
        return "***"
    return f"{token[:6]}...{token[-4:]}"


def _redact_query_string(query: str, sensitive_keys: frozenset = _SENSITIVE_QUERY_PARAMS) -> str:
    """Redact sensitive parameter values in a URL query string.

    Handles `k=v&k=v` format. Keys are lowercased and looked up in
    *sensitive_keys* (exact match); matching values are replaced with `***`.
    Non-sensitive keys pass through unchanged. Pairs without `=` and an
    empty *query* are preserved as-is.

    Args:
        query: The raw query string, without the leading `?`.
        sensitive_keys: Lowercase key names whose values must be masked.
            Defaults to `_SENSITIVE_QUERY_PARAMS`.

    Returns:
        The query string with sensitive values masked.
    """
    if not query:
        return query
    redacted = []
    for pair in query.split("&"):
        key, sep, _value = pair.partition("=")
        if sep and key.lower() in sensitive_keys:
            redacted.append(f"{key}=***")
        else:
            # No "=" at all, or a key that is not sensitive: keep verbatim.
            redacted.append(pair)
    return "&".join(redacted)


def _redact_url_query_params(text: str) -> str:
    """Scan text for URLs with query strings and redact sensitive params.

    Catches opaque tokens that don't match vendor prefix regexes, e.g.
    `https://example.com/cb?code=ABC123&state=xyz` → `...?code=***&state=xyz`.
    Scheme, authority, path and fragment are emitted unchanged.
    """

    def _sub(m: re.Match) -> str:
        scheme = m.group(1)
        authority = m.group(2)
        path = m.group(3)
        query = _redact_query_string(m.group(4))
        fragment = m.group(5) or ""
        return f"{scheme}://{authority}{path}?{query}{fragment}"

    return _URL_WITH_QUERY_RE.sub(_sub, text)


def _redact_url_userinfo(text: str) -> str:
    """Mask the password in `user:password@` for HTTP/WS/FTP URLs.

    The user name is kept; only the password becomes `***`. DB protocols
    (postgres, mysql, mongodb, redis, amqp) are handled separately by
    `_DB_CONNSTR_RE`.
    """
    return _URL_USERINFO_RE.sub(
        lambda m: f"{m.group(1)}://{m.group(2)}:***@",
        text,
    )


def _redact_form_body(text: str) -> str:
    """Redact sensitive values in a form-urlencoded body.

    Only applies when the entire input looks like a pure form body
    (k=v&k=v with no newlines, no other text). Any other text passes
    through unchanged. This is a conservative pass — embedded query strings
    are handled by `_redact_url_query_params`.

    Keys are checked against both the query-parameter and the body key
    lists. When the input is recognised as a form body, surrounding
    whitespace is stripped from the returned value.
    """
    if not text or "\n" in text or "&" not in text:
        return text
    candidate = text.strip()
    # The form check is strict: only trigger on clean k=v&k=v.
    if not _FORM_BODY_RE.match(candidate):
        return text
    return _redact_query_string(candidate, _SENSITIVE_FORM_KEYS)


def _redact_env_match(m):
    """Rebuild a `NAME=value` assignment with the value masked."""
    name, quote, value = m.group(1), m.group(2), m.group(3)
    return f"{name}={quote}{_mask_token(value)}{quote}"


def _redact_json_match(m):
    """Rebuild a `"key": "value"` JSON field with the value masked."""
    key, value = m.group(1), m.group(2)
    return f'{key}: "{_mask_token(value)}"'


def _redact_telegram_match(m):
    """Keep the optional `bot` prefix and numeric id; mask the token part."""
    prefix = m.group(1) or ""
    digits = m.group(2)
    return f"{prefix}{digits}:***"


def _redact_discord_match(m):
    """Mask the snowflake id, keeping the `!` marker if it was present."""
    bang = "!" if "!" in m.group(0) else ""
    return f"<@{bang}***>"


def _redact_phone_match(m):
    """Mask the middle of a phone number, keeping a few leading/trailing chars."""
    phone = m.group(1)
    if len(phone) <= 8:
        return phone[:2] + "****" + phone[-2:]
    return phone[:4] + "****" + phone[-4:]


def redact_sensitive_text(text: str) -> str:
    """Apply all redaction patterns to a block of text.

    Safe to call on any string -- non-matching text passes through unchanged.
    `None` is returned as `None`; any other non-string input is converted
    with `str()` first. Redaction is skipped when the module-level
    `_REDACT_ENABLED` flag is false.

    NOTE(review): the previous docstring said redaction is disabled via
    `security.redact_secrets` in config.yaml; this module only reads the
    `_REDACT_ENABLED` flag — presumably the config value is bridged to it
    elsewhere, confirm.

    The passes run in a fixed order; later passes see the output of earlier
    ones.
    """
    if text is None:
        return None
    if not isinstance(text, str):
        text = str(text)
    if not text:
        return text
    if not _REDACT_ENABLED:
        return text

    # Known prefixes (sk-, ghp_, etc.)
    text = _PREFIX_RE.sub(lambda m: _mask_token(m.group(1)), text)

    # ENV assignments: OPENAI_API_KEY=sk-abc...
    text = _ENV_ASSIGN_RE.sub(_redact_env_match, text)

    # JSON fields: "apiKey": "value"
    text = _JSON_FIELD_RE.sub(_redact_json_match, text)

    # Authorization headers
    text = _AUTH_HEADER_RE.sub(
        lambda m: m.group(1) + _mask_token(m.group(2)),
        text,
    )

    # Telegram bot tokens
    text = _TELEGRAM_RE.sub(_redact_telegram_match, text)

    # Private key blocks
    text = _PRIVATE_KEY_RE.sub("[REDACTED PRIVATE KEY]", text)

    # Database connection string passwords
    text = _DB_CONNSTR_RE.sub(lambda m: f"{m.group(1)}***{m.group(3)}", text)

    # JWT tokens (eyJ... — base64-encoded JSON headers)
    text = _JWT_RE.sub(lambda m: _mask_token(m.group(0)), text)

    # URL userinfo (http(s)://user:pass@host) — redact for non-DB schemes.
    # DB schemes are handled above by _DB_CONNSTR_RE.
    text = _redact_url_userinfo(text)

    # URL query params containing opaque tokens (?access_token=…&code=…)
    text = _redact_url_query_params(text)

    # Form-urlencoded bodies (only triggers on clean k=v&k=v inputs).
    text = _redact_form_body(text)

    # Discord user/role mentions (<@snowflake_id>)
    text = _DISCORD_MENTION_RE.sub(_redact_discord_match, text)

    # E.164 phone numbers (Signal, WhatsApp)
    text = _SIGNAL_PHONE_RE.sub(_redact_phone_match, text)

    return text


class RedactingFormatter(logging.Formatter):
    """Log formatter that redacts secrets from all log messages."""

    def __init__(self, fmt=None, datefmt=None, style='%', **kwargs):
        """Forward all arguments unchanged to `logging.Formatter`."""
        super().__init__(fmt, datefmt, style, **kwargs)

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* with the base formatter, then redact the result.

        Redaction runs on the fully formatted string, so it also covers
        anything the base formatter appended to the message.
        """
        original = super().format(record)
        return redact_sensitive_text(original)