diff --git a/gateway/pairing.py b/gateway/pairing.py index d5f7ec6b96..af9ff2fdbf 100644 --- a/gateway/pairing.py +++ b/gateway/pairing.py @@ -195,12 +195,23 @@ class PairingStore: """ Approve a pairing code. Adds the user to the approved list. - Returns {user_id, user_name} on success, None if code is invalid/expired. + Returns {user_id, user_name} on success, None if code is + invalid/expired OR the platform is currently locked out after + ``MAX_FAILED_ATTEMPTS`` failed approvals (#10195). Callers can + disambiguate with ``_is_locked_out(platform)``. """ with self._lock: self._cleanup_expired(platform) code = code.upper().strip() + # Lockout check — must run before the pending lookup so a + # valid code (e.g. one already sitting in pending) cannot be + # accepted once the lockout fires. Without this, the lockout + # only blocks `generate_code`, not `approve_code` — nullifying + # the brute-force protection for any code already issued. + if self._is_locked_out(platform): + return None + pending = self._load_json(self._pending_path(platform)) if code not in pending: self._record_failed_attempt(platform) diff --git a/hermes_cli/pairing.py b/hermes_cli/pairing.py index 887b7e49ff..101a1d10bc 100644 --- a/hermes_cli/pairing.py +++ b/hermes_cli/pairing.py @@ -73,6 +73,24 @@ def _cmd_approve(store, platform: str, code: str): display = f"{name} ({uid})" if name else uid print(f"\n Approved! User {display} on {platform} can now use the bot~") print(" They'll be recognized automatically on their next message.\n") + elif store._is_locked_out(platform): + # Disambiguate: approve_code returns None for both invalid codes + # and lockout. Tell the operator it's lockout so they don't chase + # a "wrong code" rabbit hole (#10195). + import time as _time + limits = store._load_json(store._rate_limit_path()) + lockout_until = limits.get(f"_lockout:{platform}", 0) + remaining = max(0, int(lockout_until - _time.time())) + mins = remaining // 60 + print( + f"\n Platform '{platform}' is locked out after too many failed " + f"approval attempts." + ) + print(f" Lockout clears in ~{mins} minute(s).") + print( + " To reset sooner, delete the '_lockout:{0}' entry from " + "~/.hermes/platforms/pairing/_rate_limits.json\n".format(platform) + ) else: print(f"\n Code '{code}' not found or expired for platform '{platform}'.") print(" Run 'hermes pairing list' to see pending codes.\n") diff --git a/tests/gateway/test_pairing.py b/tests/gateway/test_pairing.py index da14e25269..36e6bda15d 100644 --- a/tests/gateway/test_pairing.py +++ b/tests/gateway/test_pairing.py @@ -238,6 +238,42 @@ class TestLockout: code = store.generate_code("telegram", "newuser") assert code is None + def test_lockout_blocks_code_approval(self, tmp_path): + """Regression guard for #10195: lockout must also gate approve_code. + + Prior to the fix, 5 failed approvals set the lockout flag but + approve_code() never consulted it — so any valid code already + in `pending` (or a later lucky guess) still got accepted, + nullifying the brute-force protection. + """ + with patch("gateway.pairing.PAIRING_DIR", tmp_path): + store = PairingStore() + # Generate a valid code before triggering the lockout. + valid_code = store.generate_code("telegram", "attacker", "Attacker") + assert valid_code is not None + + # Trigger the lockout with wrong codes. + for _ in range(MAX_FAILED_ATTEMPTS): + assert store.approve_code("telegram", "WRONGCODE") is None + assert store._is_locked_out("telegram") is True + + # The valid code must be rejected while the lockout is active, + # and the user must NOT land in the approved list. + result = store.approve_code("telegram", valid_code) + assert result is None + assert store.is_approved("telegram", "attacker") is False + + # Simulate lockout expiry — the valid code is still in pending + # (we didn't pop it) and must now approve normally. + limits = store._load_json(store._rate_limit_path()) + limits["_lockout:telegram"] = time.time() - 1 + store._save_json(store._rate_limit_path(), limits) + + result = store.approve_code("telegram", valid_code) + assert result is not None + assert result["user_id"] == "attacker" + assert store.is_approved("telegram", "attacker") is True + def test_lockout_expires(self, tmp_path): with patch("gateway.pairing.PAIRING_DIR", tmp_path): store = PairingStore()