"""Unit tests for gateway/relay/auth.py — the gateway-side relay auth primitives. Two layers: 1. **Self-consistency** — make_token/verify_token round-trip, delivery-signature verify, rotation verify list, tamper + skew + expiry rejection. 2. **Cross-implementation conformance** — frozen vectors generated by the connector's TypeScript (``src/core/relayAuthToken.ts`` ``makeToken``/``sign``) are reproduced byte-for-byte by the Python port. If the connector ever changes its wire scheme, these vectors must be regenerated in lockstep (and that is the point — the test fails loudly on drift). Regenerate with: node -e 'import("./dist/core/relayAuthToken.js").then(m=>{ \ const s="00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"; \ console.log(m.makeToken("gw-instance-1", s, 0)); \ console.log(m.sign("1750000000."+JSON.stringify({a:1}), s)); })' """ from __future__ import annotations import json from gateway.relay.auth import ( DELIVERY_SIG_HEADER, DELIVERY_TS_HEADER, make_token, make_upgrade_token, sign, verify_delivery_signature, verify_signature, verify_token, ) # A fixed 256-bit hex secret used for the frozen connector vectors below. _SECRET = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" # ── Frozen vectors produced by the connector's TypeScript (relayAuthToken.ts). # Generated via dist/core/relayAuthToken.js makeToken/sign; see module docstring. _CONN_TOKEN = "Z3ctaW5zdGFuY2UtMTowOjM3YWE3YjE0NWU4NzY0ZDQwM2JhOWM2MzlmMjMwZGQ2M2RlOGVkOTliODhmZWQzNmFhMDI2MjVhOGE3ZTM1NjQ" # The EXACT bytes the connector signed: JS JSON.stringify emits compact JSON # (no spaces). The gateway verifies over the literal received body, so the # vector is the compact form — NOT Python's spaced json.dumps default. This is # the raw-byte-preservation discipline (a single differing byte breaks the HMAC). _CONN_BODY = '{"type":"message","event":{"text":"hi","source":{"chat_id":"c1"}}}' _CONN_TS = 1750000000 _CONN_SIG = "ac9509c8dae52b5590f06378260877334ff1adc4b1c96bafa4b514165fae6dc6" # ── Self-consistency ────────────────────────────────────────────────────── def test_token_round_trip_no_expiry(): tok = make_token("payload-123", _SECRET, 0) assert verify_token(tok, [_SECRET]) == "payload-123" def test_token_payload_may_contain_colons(): # verify_token must split from the right so a colon-bearing payload survives. payload = "agent:main:discord:group:chanA" tok = make_token(payload, _SECRET, 0) assert verify_token(tok, [_SECRET]) == payload def test_upgrade_token_is_make_token_of_gateway_id(): assert make_upgrade_token("gw-1", _SECRET, 0) == make_token("gw-1", _SECRET, 0) def test_token_wrong_secret_rejected(): tok = make_token("p", _SECRET, 0) assert verify_token(tok, ["deadbeef" * 8]) is None def test_token_expired_rejected(): # ttl in the past -> exp < now -> rejected. tok = make_token("p", _SECRET, ttl_seconds=1) # Force expiry by signing with a manual past exp via the low-level helper. # Simpler: a 1s ttl token is still valid now; instead assert a clearly-old one. # Build an already-expired token by hand using the same scheme. import base64 signed = "p:1" # exp=1 (1970) -> long past sig = sign(signed, _SECRET) raw = f"{signed}:{sig}".encode() expired = base64.urlsafe_b64encode(raw).decode().rstrip("=") assert verify_token(expired, [_SECRET]) is None # And the fresh one is accepted. assert verify_token(tok, [_SECRET]) == "p" def test_token_rotation_verify_list(): # A token signed with the (old) secondary still verifies during rotation. old, new = _SECRET, "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100" tok_old = make_token("p", old, 0) assert verify_token(tok_old, [new, old]) == "p" # primary=new, secondary=old assert verify_token(tok_old, [new]) is None def test_token_garbage_rejected(): assert verify_token("not-base64url!!!", [_SECRET]) is None assert verify_token("", [_SECRET]) is None def test_verify_signature_constant_time_multi_secret(): payload = "1700000000.body" s = sign(payload, _SECRET) assert verify_signature(payload, s, ["wrong", _SECRET]) is True assert verify_signature(payload, s, ["wrong"]) is False assert verify_signature(payload, "zz", [_SECRET]) is False # bad hex # ── Delivery signature (connector -> gateway inbound) ────────────────────── def test_delivery_signature_accepts_valid(): body = json.dumps({"type": "message", "event": {"text": "x"}}) ts = 1700000000 s = sign(f"{ts}.{body}", _SECRET) assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts) is True def test_delivery_signature_tamper_rejected(): body = json.dumps({"type": "message", "event": {"text": "x"}}) ts = 1700000000 s = sign(f"{ts}.{body}", _SECRET) # A single changed body byte breaks the HMAC. assert verify_delivery_signature(body + " ", str(ts), s, [_SECRET], now=ts) is False def test_delivery_signature_skew_rejected(): body = "{}" ts = 1700000000 s = sign(f"{ts}.{body}", _SECRET) # Beyond the 300s replay window in either direction. assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 301) is False assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts - 301) is False assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 299) is True def test_delivery_signature_missing_headers_rejected(): assert verify_delivery_signature("{}", None, "abc", [_SECRET]) is False assert verify_delivery_signature("{}", "1700000000", None, [_SECRET]) is False assert verify_delivery_signature("{}", "not-an-int", "abc", [_SECRET]) is False def test_delivery_headers_match_connector_names(): # The gateway reads exactly the header names the connector writes. assert DELIVERY_TS_HEADER == "x-relay-timestamp" assert DELIVERY_SIG_HEADER == "x-relay-signature" # ── Cross-implementation conformance (frozen connector vectors) ──────────── def test_python_make_token_matches_connector_byte_for_byte(): assert make_token("gw-instance-1", _SECRET, 0) == _CONN_TOKEN def test_python_verifies_connector_token(): assert verify_token(_CONN_TOKEN, [_SECRET]) == "gw-instance-1" def test_python_sign_matches_connector_delivery_sig(): assert sign(f"{_CONN_TS}.{_CONN_BODY}", _SECRET) == _CONN_SIG def test_python_verifies_connector_delivery_signature(): assert verify_delivery_signature(_CONN_BODY, str(_CONN_TS), _CONN_SIG, [_SECRET], now=_CONN_TS) is True