# Mirror of https://github.com/NousResearch/hermes-agent.git
# Synced 2026-05-30 06:41:51 +00:00 - 174 lines, 4.9 KiB, Python
"""Credential-pool disk-boundary sanitization helpers.
|
|
|
|
These helpers define which credential-pool entries are references to borrowed
|
|
runtime secrets and strip raw values before those entries are written to
|
|
``auth.json``. They intentionally have no dependency on ``hermes_cli.auth`` so
|
|
both the pool model and the final auth-store write boundary can share the same
|
|
policy without import cycles.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import re
|
|
from typing import Any, Dict, Mapping
|
|
|
|
|
|
# Allow-list of (provider, source) pairs whose secrets Hermes itself owns and
# may deliberately persist in auth.json. Any other non-empty source is treated
# as borrowed/reference-only by default, so a future external secret provider
# fails closed at the disk boundary.
_PERSISTABLE_PROVIDER_SOURCES = frozenset(
    (
        ("anthropic", "hermes_pkce"),
        ("minimax-oauth", "oauth"),
        ("nous", "device_code"),
        ("openai-codex", "device_code"),
        ("xai-oauth", "loopback_pkce"),
    )
)
|
|
|
|
# Normalized metadata key names that are always kept on disk: the secret-key
# classifier returns False for these before it consults the secret key names
# or suffixes.
_SAFE_SECRETISH_METADATA_KEYS = frozenset(
    (
        # Descriptive, non-reversible information about the secret.
        "secret_fingerprint",
        "secret_source",
        "token_type",
        "scope",
        "client_id",
        # Agent-key bookkeeping.
        "agent_key_id",
        "agent_key_expires_at",
        "agent_key_expires_in",
        "agent_key_reused",
        "agent_key_obtained_at",
        # Expiry / refresh timestamps.
        "expires_at",
        "expires_at_ms",
        "expires_in",
        "last_refresh",
        # Last observed status and error details.
        "last_status",
        "last_status_at",
        "last_error_code",
        "last_error_reason",
        "last_error_message",
        "last_error_reset_at",
    )
)
|
|
|
|
# Exact (already normalized) key names whose values are raw secrets and must
# never reach disk for a borrowed credential.
_SECRET_VALUE_KEYS = frozenset(
    (
        # OAuth / bearer style tokens.
        "access_token",
        "refresh_token",
        "agent_key",
        # API keys and generic auth headers.
        "api_key",
        "apikey",
        "api_token",
        "auth_token",
        "authorization",
        "bearer_token",
        "client_secret",
        # Whole credential bundles.
        "credential",
        "credentials",
        "id_token",
        "oauth_token",
        "private_key",
        "secret_key",
        "session_token",
        # Bare generic names.
        "password",
        "secret",
        "token",
        "tokens",
    )
)
|
|
|
|
# Suffixes that mark a normalized key as carrying a raw secret value. Kept as
# a tuple because it is handed directly to ``str.endswith``. The short generic
# entries at the end ("_secret", "_token", "_key") already cover the longer,
# more specific ones listed before them.
_SECRET_VALUE_SUFFIXES = (
    "_api_key", "_api_token",
    "_access_token", "_auth_token", "_refresh_token", "_bearer_token",
    "_client_secret",
    "_id_token", "_oauth_token",
    "_private_key",
    "_session_token",
    "_secret_key",
    "_password",
    "_secret", "_token", "_key",
)
|
|
|
|
# Zero-width word boundaries inside camelCase / PascalCase identifiers:
#   * lower-case letter or digit followed by an upper-case letter
#     ("accessToken" -> "access|Token"), and
#   * the end of an acronym run followed by a capitalised word
#     ("IDToken" -> "ID|Token", "APIToken" -> "API|Token").
# Without the second alternative acronym-prefixed names collapse into a single
# word ("idtoken", "apitoken") that matches neither the exact secret key names
# nor any "_suffix", so the value would not be recognised as a secret.
_CAMEL_CASE_BOUNDARY = re.compile(
    r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])"
)


def _normalize_key(key: Any) -> str:
    """Return ``key`` as a lower-case, underscore-separated identifier.

    The key is stringified (falsy keys such as ``None`` become ``""``),
    stripped of surrounding whitespace, split at camelCase boundaries, lower
    cased, and has ``-`` and ``.`` replaced by ``_``. For example
    ``"accessToken"``, ``"access-token"`` and ``"access.token"`` all become
    ``"access_token"``.
    """
    raw = str(key or "").strip()
    # Insert "_" at every camelCase boundary before lower-casing, otherwise
    # the case information needed to find the boundaries is lost.
    raw = _CAMEL_CASE_BOUNDARY.sub("_", raw)
    return raw.lower().replace("-", "_").replace(".", "_")
|
|
|
|
|
|
def is_borrowed_credential_source(source: Any, provider_id: Any = None) -> bool:
    """Tell whether ``source`` refers to a borrowed/reference-only secret.

    Returns ``False`` when the source is empty, is ``"manual"`` or starts with
    ``"manual:"``, or when the ``(provider_id, source)`` pair is listed in
    ``_PERSISTABLE_PROVIDER_SOURCES``. Every other source returns ``True``.
    Both values are compared after stringifying, stripping and lower-casing.
    """
    source_name = str(source or "").strip().lower()
    is_unset_or_manual = (
        not source_name
        or source_name == "manual"
        or source_name.startswith("manual:")
    )
    if is_unset_or_manual:
        return False

    provider_name = str(provider_id or "").strip().lower()
    owned_pair = (provider_name, source_name)
    # Anything not explicitly allow-listed is considered borrowed.
    return owned_pair not in _PERSISTABLE_PROVIDER_SOURCES
|
|
|
|
|
|
def _is_secret_payload_key(key: Any) -> bool:
    """Return True when ``key`` names a field that holds a raw secret value.

    The key is normalized with ``_normalize_key`` first. Empty keys and keys
    in ``_SAFE_SECRETISH_METADATA_KEYS`` are never secret; otherwise a key is
    secret when it is in ``_SECRET_VALUE_KEYS`` or ends with one of
    ``_SECRET_VALUE_SUFFIXES``.
    """
    name = _normalize_key(key)
    # The safe-metadata check comes first so it always wins over the
    # name/suffix matching below.
    if name == "" or name in _SAFE_SECRETISH_METADATA_KEYS:
        return False
    return name in _SECRET_VALUE_KEYS or name.endswith(_SECRET_VALUE_SUFFIXES)
|
|
|
|
|
|
def _fingerprint_value(value: Any) -> str | None:
    """Return a short SHA-256 fingerprint of ``value``, or ``None``.

    ``None`` and values whose ``str()`` form is empty yield ``None``.
    Otherwise the result is ``"sha256:"`` followed by the first 16 hex
    characters of the SHA-256 digest of the UTF-8 encoded string form.
    """
    text = "" if value is None else str(value)
    if not text:
        return None
    # "surrogatepass" lets lone surrogates be encoded instead of raising.
    encoded = text.encode("utf-8", errors="surrogatepass")
    return "sha256:" + hashlib.sha256(encoded).hexdigest()[:16]
|
|
|
|
|
|
def _credential_secret_fingerprint(payload: Mapping[str, Any]) -> str | None:
    """Pick one fingerprint that identifies the secret held in ``payload``.

    Candidates are tried in order and the first non-empty fingerprint wins:

    1. the well-known fields ``agent_key``, ``access_token``,
       ``refresh_token``, ``api_key``, ``token``, ``secret``;
    2. any other field whose key ``_is_secret_payload_key`` classifies as
       secret, in the payload's iteration order;
    3. an existing ``secret_fingerprint`` string that starts with
       ``"sha256:"``.

    Returns ``None`` when none of these produce a fingerprint.
    """
    preferred_fields = (
        "agent_key",
        "access_token",
        "refresh_token",
        "api_key",
        "token",
        "secret",
    )
    # Generators keep the lookup lazy: later candidates are only examined
    # when every earlier one failed to produce a fingerprint.
    preferred_values = (payload.get(name) for name in preferred_fields)
    scanned_values = (
        value for key, value in payload.items() if _is_secret_payload_key(key)
    )
    for candidates in (preferred_values, scanned_values):
        for candidate in candidates:
            digest = _fingerprint_value(candidate)
            if digest:
                return digest

    # No raw secret available: fall back to a fingerprint stored previously.
    stored = payload.get("secret_fingerprint")
    if isinstance(stored, str) and stored.startswith("sha256:"):
        return stored
    return None
|
|
|
|
|
|
def sanitize_borrowed_credential_payload(
    payload: Mapping[str, Any],
    provider_id: Any = None,
) -> Dict[str, Any]:
    """Build the version of a credential-pool entry that may be written to disk.

    A new dict is always returned; ``payload`` itself is not modified.

    When ``is_borrowed_credential_source`` reports the entry's ``source`` as
    owned (for ``provider_id``), the copy keeps every field. For borrowed
    sources, every top-level field whose key ``_is_secret_payload_key``
    classifies as a raw secret is dropped, all other fields are kept, and
    ``secret_fingerprint`` is set when a fingerprint could be derived.
    """
    entry = dict(payload)
    if not is_borrowed_credential_source(entry.get("source"), provider_id):
        return entry

    # Compute the fingerprint first, while the raw secret values are still
    # present in the copy.
    digest = _credential_secret_fingerprint(entry)

    # Snapshot the keys to remove so the dict is not mutated while iterating.
    secret_keys = [key for key in entry if _is_secret_payload_key(key)]
    for key in secret_keys:
        del entry[key]

    if digest:
        entry["secret_fingerprint"] = digest
    return entry
|