From 2e509422ef30a6feaeffb93d77068327fdbf9ae5 Mon Sep 17 00:00:00 2001 From: Tom Qiao Date: Sun, 12 Apr 2026 09:19:41 +0800 Subject: [PATCH] fix(security): hash gateway pairing codes instead of storing plaintext Pairing codes were stored as plaintext keys in JSON files. Now uses sha256 + random salt hashing with constant-time comparison. Fixes #8036 Co-Authored-By: Claude Opus 4.6 --- gateway/pairing.py | 68 ++++++++++++++++++----- tests/gateway/test_pairing.py | 100 ++++++++++++++++++++++++++++++++-- 2 files changed, 149 insertions(+), 19 deletions(-) diff --git a/gateway/pairing.py b/gateway/pairing.py index af9ff2fdbfd..471b22b3cba 100644 --- a/gateway/pairing.py +++ b/gateway/pairing.py @@ -18,6 +18,7 @@ Security features (based on OWASP + NIST SP 800-63-4 guidance): Storage: ~/.hermes/pairing/ """ +import hashlib import json import os import secrets @@ -148,6 +149,11 @@ class PairingStore: # ----- Pending codes ----- + @staticmethod + def _hash_code(code: str, salt: bytes) -> str: + """Hash a pairing code with the given salt using SHA-256.""" + return hashlib.sha256(salt + code.encode("utf-8")).hexdigest() + def generate_code( self, platform: str, user_id: str, user_name: str = "" ) -> Optional[str]: @@ -158,6 +164,9 @@ class PairingStore: - User is rate-limited (too recent request) - Max pending codes reached for this platform - User/platform is in lockout due to failed attempts + + The code is NOT stored in plaintext. Only a salted SHA-256 hash is + persisted so that reading the pending file does not reveal codes. """ with self._lock: self._cleanup_expired(platform) @@ -178,8 +187,17 @@ class PairingStore: # Generate cryptographically random code code = "".join(secrets.choice(ALPHABET) for _ in range(CODE_LENGTH)) - # Store pending request - pending[code] = { + # Hash the code with a random salt before storing + salt = os.urandom(16) + code_hash = self._hash_code(code, salt) + + # Use a unique entry id as the key (not the code itself) + entry_id = secrets.token_hex(8) + + # Store pending request with hashed code + pending[entry_id] = { + "hash": code_hash, + "salt": salt.hex(), "user_id": user_id, "user_name": user_name, "created_at": time.time(), @@ -195,10 +213,14 @@ class PairingStore: """ Approve a pairing code. Adds the user to the approved list. - Returns {user_id, user_name} on success, None if code is + Returns ``{user_id, user_name}`` on success, ``None`` if the code is invalid/expired OR the platform is currently locked out after ``MAX_FAILED_ATTEMPTS`` failed approvals (#10195). Callers can disambiguate with ``_is_locked_out(platform)``. + + Verification: the user-provided code is hashed with each stored + entry's salt and compared to the stored hash using constant-time + comparison. """ with self._lock: self._cleanup_expired(platform) @@ -213,33 +235,51 @@ class PairingStore: return None pending = self._load_json(self._pending_path(platform)) - if code not in pending: + + # Find the entry whose hash matches the provided code + matched_key = None + matched_entry = None + for entry_id, entry in pending.items(): + salt = bytes.fromhex(entry["salt"]) + candidate_hash = self._hash_code(code, salt) + if secrets.compare_digest(candidate_hash, entry["hash"]): + matched_key = entry_id + matched_entry = entry + break + + if matched_key is None: self._record_failed_attempt(platform) return None - entry = pending.pop(code) + del pending[matched_key] self._save_json(self._pending_path(platform), pending) # Add to approved list - self._approve_user(platform, entry["user_id"], entry.get("user_name", "")) + self._approve_user(platform, matched_entry["user_id"], + matched_entry.get("user_name", "")) return { - "user_id": entry["user_id"], - "user_name": entry.get("user_name", ""), + "user_id": matched_entry["user_id"], + "user_name": matched_entry.get("user_name", ""), } def list_pending(self, platform: str = None) -> list: - """List pending pairing requests, optionally filtered by platform.""" + """List pending pairing requests, optionally filtered by platform. + + Codes are stored hashed — the ``code`` field is replaced with the + first 8 hex characters of the hash so admins can distinguish entries + without revealing the original code. + """ results = [] platforms = [platform] if platform else self._all_platforms("pending") for p in platforms: self._cleanup_expired(p) pending = self._load_json(self._pending_path(p)) - for code, info in pending.items(): + for entry_id, info in pending.items(): age_min = int((time.time() - info["created_at"]) / 60) results.append({ "platform": p, - "code": code, + "code": info["hash"][:8], "user_id": info["user_id"], "user_name": info.get("user_name", ""), "age_minutes": age_min, @@ -302,12 +342,12 @@ class PairingStore: pending = self._load_json(path) now = time.time() expired = [ - code for code, info in pending.items() + entry_id for entry_id, info in pending.items() if (now - info["created_at"]) > CODE_TTL_SECONDS ] if expired: - for code in expired: - del pending[code] + for entry_id in expired: + del pending[entry_id] self._save_json(path, pending) def _all_platforms(self, suffix: str) -> list: diff --git a/tests/gateway/test_pairing.py b/tests/gateway/test_pairing.py index 36e6bda15dd..144da77f6bb 100644 --- a/tests/gateway/test_pairing.py +++ b/tests/gateway/test_pairing.py @@ -75,9 +75,97 @@ class TestCodeGeneration: code = store.generate_code("telegram", "user1", "Alice") pending = store.list_pending("telegram") assert len(pending) == 1 - assert pending[0]["code"] == code + # list_pending no longer returns the original code — it returns a + # truncated hash prefix. Verify the metadata is correct instead. assert pending[0]["user_id"] == "user1" assert pending[0]["user_name"] == "Alice" + # The code field is now a hash prefix, not the original plaintext code + assert pending[0]["code"] != code + + +# --------------------------------------------------------------------------- +# Hashed storage +# --------------------------------------------------------------------------- + + +class TestHashedStorage: + def test_pending_file_contains_hash_and_salt(self, tmp_path): + """Stored entries must have 'hash' and 'salt', never the plaintext code.""" + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + code = store.generate_code("telegram", "user1", "Alice") + raw = json.loads( + (tmp_path / "telegram-pending.json").read_text(encoding="utf-8") + ) + + assert len(raw) == 1 + entry = next(iter(raw.values())) + # Must have hash and salt fields + assert "hash" in entry + assert "salt" in entry + # Hash must be a valid hex SHA-256 digest (64 hex chars) + assert len(entry["hash"]) == 64 + assert all(c in "0123456789abcdef" for c in entry["hash"]) + # Salt must be a valid hex string (32 hex chars for 16 bytes) + assert len(entry["salt"]) == 32 + assert all(c in "0123456789abcdef" for c in entry["salt"]) + # The plaintext code must NOT appear as a key or value anywhere + assert code not in raw # not a key + for key, val in raw.items(): + assert code != key + for field_val in val.values(): + if isinstance(field_val, str): + assert field_val != code + + def test_plaintext_code_not_stored(self, tmp_path): + """The raw JSON file must not contain the plaintext code anywhere.""" + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + code = store.generate_code("telegram", "user1") + raw_text = (tmp_path / "telegram-pending.json").read_text(encoding="utf-8") + assert code not in raw_text + + def test_valid_code_verifies_against_hash(self, tmp_path): + """approve_code with the correct code should succeed.""" + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + code = store.generate_code("telegram", "user1", "Bob") + result = store.approve_code("telegram", code) + assert result is not None + assert result["user_id"] == "user1" + assert result["user_name"] == "Bob" + + def test_invalid_code_rejected(self, tmp_path): + """approve_code with a wrong code should fail.""" + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + store.generate_code("telegram", "user1") + result = store.approve_code("telegram", "ZZZZZZZZ") + assert result is None + + def test_different_salts_per_entry(self, tmp_path): + """Each pending entry should have a unique salt.""" + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + store.generate_code("telegram", "user0") + store.generate_code("telegram", "user1") + store.generate_code("telegram", "user2") + raw = json.loads( + (tmp_path / "telegram-pending.json").read_text(encoding="utf-8") + ) + salts = [entry["salt"] for entry in raw.values()] + assert len(set(salts)) == 3 # all unique + + def test_hash_code_static_method(self, tmp_path): + """_hash_code should be deterministic for the same code+salt.""" + salt = os.urandom(16) + h1 = PairingStore._hash_code("ABCD1234", salt) + h2 = PairingStore._hash_code("ABCD1234", salt) + assert h1 == h2 + # Different salt should produce a different hash + salt2 = os.urandom(16) + h3 = PairingStore._hash_code("ABCD1234", salt2) + assert h3 != h1 # --------------------------------------------------------------------------- @@ -300,9 +388,10 @@ class TestCodeExpiry: store = PairingStore() code = store.generate_code("telegram", "user1") - # Manually expire the code + # Manually expire all pending entries pending = store._load_json(store._pending_path("telegram")) - pending[code]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 + for entry_id in pending: + pending[entry_id]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 store._save_json(store._pending_path("telegram"), pending) # Cleanup happens on next operation @@ -314,9 +403,10 @@ class TestCodeExpiry: store = PairingStore() code = store.generate_code("telegram", "user1") - # Expire it + # Expire all entries pending = store._load_json(store._pending_path("telegram")) - pending[code]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 + for entry_id in pending: + pending[entry_id]["created_at"] = time.time() - CODE_TTL_SECONDS - 1 store._save_json(store._pending_path("telegram"), pending) result = store.approve_code("telegram", code)