hermes-agent/tests/gateway/relay/test_auth.py
Ben Barclay c276b017ad
feat(relay): connector⇄gateway channel auth + signed-HTTP inbound receiver + enroll CLI (#48147)
* feat(relay): authenticate the connector⇄gateway WS channel

The relay gateway may be customer-managed and internet-exposed, so the
connector⇄gateway channel is itself authenticated (distinct from the
platform crypto the relay path sheds). Add gateway/relay/auth.py — a
Python port of the connector's HMAC token + delivery-signature schemes
(relayAuthToken.ts / deliverySigning.ts), verified byte-for-byte against
the connector's compiled TypeScript via cross-language test vectors.

Present an Authorization bearer on the /relay WS upgrade keyed by the
per-gateway secret (resolved from GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET
in env or config). The connector rejects an unauthenticated/invalid/
revoked upgrade with close 4401.
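
For illustration, the bearer's layout can be read off the frozen connector vector in tests/gateway/relay/test_auth.py: it appears to be unpadded base64url over `payload:exp:sig`, with an `exp` of 0 used for the no-expiry case. The sketch below only decodes that layout. `decode_token_parts` is a hypothetical helper, not a function from gateway/relay/auth.py, and it does not verify the HMAC.

```python
import base64

# Frozen connector vector copied from tests/gateway/relay/test_auth.py.
CONN_TOKEN = "Z3ctaW5zdGFuY2UtMTowOjM3YWE3YjE0NWU4NzY0ZDQwM2JhOWM2MzlmMjMwZGQ2M2RlOGVkOTliODhmZWQzNmFhMDI2MjVhOGE3ZTM1NjQ"


def decode_token_parts(token: str) -> tuple[str, int, str]:
    """Hypothetical helper: split a token into (payload, exp, sig) without verifying it."""
    padded = token + "=" * (-len(token) % 4)  # restore the stripped base64 padding
    raw = base64.urlsafe_b64decode(padded).decode()
    # Split from the right so a payload that itself contains colons stays intact.
    payload, exp, sig = raw.rsplit(":", 2)
    return payload, int(exp), sig


payload, exp, sig = decode_token_parts(CONN_TOKEN)
print(payload, exp, len(sig))  # gw-instance-1 0 64
```

Splitting from the right mirrors the colon-bearing-payload case the unit tests cover; a real verifier would also recompute the signature and check expiry before trusting any field.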

* feat(relay): signed-HTTP inbound delivery receiver

The connector delivers normalized inbound events to a tenant's gateway
over a signed HTTP POST, not the outbound /relay WS: the connector
instance owning a platform socket is generally not the instance a given
gateway dialed out to, so inbound targets a tenant endpoint that may
load-balance across gateway instances.

Add gateway/relay/inbound_receiver.py — verifies x-relay-signature /
x-relay-timestamp over the EXACT raw request bytes (re-serializing would
break the HMAC: JS JSON.stringify is compact, Python json.dumps spaces)
against the per-tenant delivery key verify list within a 300s replay
window, then dispatches messages to handle_message and interrupts to the
interrupt handler. Wire it into the adapter lifecycle (start in connect()
when a delivery key + bind port are configured, tear down in disconnect();
a purely-outbound dev gateway runs without it).
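
A minimal sketch of the raw-byte check described above, under stated assumptions: the signed string is `<timestamp>.<body>` (as the unit tests suggest), the MAC is HMAC-SHA256 keyed by the UTF-8 bytes of the key string with hex output, and a timestamp exactly 300s off is still accepted. `sign_hex` and `verify_raw_delivery` are hypothetical names, not the functions in gateway/relay/inbound_receiver.py.

```python
import hashlib
import hmac
import json
import time

REPLAY_WINDOW_SECONDS = 300  # window stated in the commit message


def sign_hex(message: bytes, key: str) -> str:
    # ASSUMPTION: HMAC-SHA256 keyed by the UTF-8 bytes of the key string, hex digest.
    return hmac.new(key.encode(), message, hashlib.sha256).hexdigest()


def verify_raw_delivery(raw_body, timestamp, signature, keys, now=None):
    """Hypothetical check of a signed POST over the exact received bytes."""
    if timestamp is None or signature is None:
        return False
    try:
        ts = int(timestamp)
    except ValueError:
        return False
    now = time.time() if now is None else now
    if abs(now - ts) > REPLAY_WINDOW_SECONDS:
        return False  # outside the replay window, in either direction
    signed = timestamp.encode() + b"." + raw_body
    # Try every key in the verify list so a rotated-out key still verifies.
    return any(
        hmac.compare_digest(sign_hex(signed, key).encode(), signature.encode())
        for key in keys
    )


key = "demo-delivery-key"  # placeholder value
raw = b'{"type":"message","event":{"text":"hi"}}'  # compact, as JSON.stringify emits
ts = "1750000000"
sig = sign_hex(ts.encode() + b"." + raw, key)

ok = verify_raw_delivery(raw, ts, sig, ["rotated-in-key", key], now=1750000000)
# Re-serializing changes the bytes: Python's default json.dumps adds spaces.
reserialized = json.dumps(json.loads(raw)).encode()
tampered = verify_raw_delivery(reserialized, ts, sig, [key], now=1750000000)
stale = verify_raw_delivery(raw, ts, sig, [key], now=1750000000 + 301)
print(ok, tampered, stale)  # True False False
```

The point of the design is that the verifier never parses and re-emits the body before checking the MAC; parsing happens only after the signature over the received bytes has been accepted.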

Refine test_relay_sheds_crypto to distinguish PLATFORM crypto (Discord
ed25519, Twilio/WeCom HMAC — still shed) from the connector⇄gateway
CHANNEL auth (intended): auth.py / inbound_receiver.py are exempt from
the platform-symbol scan but still banned from importing platform-crypto
modules, plus a positive guard that auth.py uses only stdlib hmac/hashlib.
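
One way such an import guard could be written is to walk the module's AST and collect its imports. This is a sketch only, not the actual test_relay_sheds_crypto implementation; the deny-list contents and the helper name `imported_top_level_modules` are assumptions, since the real banned-module names are not shown here.

```python
import ast

# Hypothetical deny-list; the real test's module names are not shown in the source.
BANNED_MODULES = {"nacl", "cryptography"}


def imported_top_level_modules(source: str) -> set[str]:
    """Return the top-level names of all absolute imports found in the source."""
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            found.update(alias.name.split(".")[0] for alias in node.names)
        elif isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            found.add(node.module.split(".")[0])
    return found


# Stand-in for the text of auth.py; a real guard would read the file from disk.
sample = "import hmac\nimport hashlib\nfrom base64 import urlsafe_b64encode\n"
modules = imported_top_level_modules(sample)

banned_hits = modules & BANNED_MODULES       # negative guard: nothing banned
uses_stdlib_mac = {"hmac", "hashlib"} <= modules  # positive guard: stdlib only
print(sorted(modules), banned_hits, uses_stdlib_mac)
```

Scanning the AST rather than grepping the text avoids false positives from comments and docstrings that merely mention a platform crypto library.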

* feat(relay): hermes gateway enroll CLI

Add the gateway half of zero-touch enrollment. `hermes gateway enroll`
resolves a fresh Nous Portal access token (the tenant-proving identity),
POSTs {enrollmentToken, gatewayId} to the connector's /relay/enroll, and
persists GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET / GATEWAY_RELAY_DELIVERY_KEY
to ~/.hermes/.env. The per-gateway secret authenticates the WS upgrade;
the per-tenant delivery key verifies signed inbound deliveries.
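
The persistence step could look roughly like the sketch below, which upserts `KEY=VALUE` lines in a dotenv-style file while leaving unrelated lines alone. `upsert_env_vars` is a hypothetical helper and the values are placeholders; how the real CLI writes ~/.hermes/.env is not shown in this commit message.

```python
import tempfile
from pathlib import Path


def upsert_env_vars(env_path: Path, updates: dict[str, str]) -> None:
    """Hypothetical helper: set KEY=VALUE lines in a dotenv file, keeping other lines."""
    lines = env_path.read_text().splitlines() if env_path.exists() else []
    remaining = dict(updates)
    out = []
    for line in lines:
        key = line.split("=", 1)[0].strip()
        if key in remaining:
            out.append(f"{key}={remaining.pop(key)}")  # overwrite the stale value
        else:
            out.append(line)
    out.extend(f"{k}={v}" for k, v in remaining.items())  # append keys not yet present
    env_path.parent.mkdir(parents=True, exist_ok=True)
    env_path.write_text("\n".join(out) + "\n")


# Demo against a temporary file rather than ~/.hermes/.env.
with tempfile.TemporaryDirectory() as tmp:
    env_file = Path(tmp) / ".env"
    env_file.write_text("OTHER=keep\nGATEWAY_RELAY_ID=old\n")
    upsert_env_vars(
        env_file,
        {
            "GATEWAY_RELAY_ID": "gw-1",
            "GATEWAY_RELAY_SECRET": "placeholder-secret",
            "GATEWAY_RELAY_DELIVERY_KEY": "placeholder-delivery-key",
        },
    )
    result = env_file.read_text()

print(result)
```

Re-running enrollment with this approach replaces the three values in place instead of appending duplicates.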

Refuses under is_managed() (hosted installs get the secret stamped in by
the orchestrator). Added as an 'enroll' subcommand on the existing
gateway subparser — not a new top-level command.

* docs(relay): inbound is signed HTTP, not WS; document channel auth

Fix the stale contract: §3/§5 said inbound rode the WS socket (single-
instance only, predates the multi-instance socket-ownership + channel-auth
model). Inbound + connector→gateway interrupt are signed HTTP POSTs to the
tenant endpoint. Add §6.1 documenting the two channel-auth schemes (per-
gateway WS-upgrade secret, per-tenant inbound delivery key) and how they
differ from the platform crypto the relay path sheds.

* test(relay): update build_gateway_parser callers for cmd_gateway_enroll

The enroll subcommand added cmd_gateway_enroll as a required keyword-only
arg to build_gateway_parser, but two existing parser-extraction tests still
called it with only cmd_gateway/cmd_proxy — failing CI with TypeError.
Thread the new handler through both call sites and add a test asserting
`gateway enroll` dispatches to cmd_gateway_enroll with its flags parsed.
2026-06-18 12:01:54 +10:00


"""Unit tests for gateway/relay/auth.py — the gateway-side relay auth primitives.

Two layers:

1. **Self-consistency**: make_token/verify_token round-trip, delivery-signature
   verify, rotation verify list, tamper + skew + expiry rejection.
2. **Cross-implementation conformance**: frozen vectors generated by the
   connector's TypeScript (``src/core/relayAuthToken.ts`` ``makeToken``/``sign``)
   are reproduced byte-for-byte by the Python port. If the connector ever
   changes its wire scheme, these vectors must be regenerated in lockstep
   (and that is the point: the test fails loudly on drift). Regenerate with
   the same secret, timestamp, and body as the frozen vectors below:

       node -e 'import("./dist/core/relayAuthToken.js").then(m=>{ \
         const s="00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"; \
         const body={type:"message",event:{text:"hi",source:{chat_id:"c1"}}}; \
         console.log(m.makeToken("gw-instance-1", s, 0)); \
         console.log(m.sign("1750000000."+JSON.stringify(body), s)); })'
"""

from __future__ import annotations

import json

from gateway.relay.auth import (
    DELIVERY_SIG_HEADER,
    DELIVERY_TS_HEADER,
    make_token,
    make_upgrade_token,
    sign,
    verify_delivery_signature,
    verify_signature,
    verify_token,
)

# A fixed 256-bit hex secret used for the frozen connector vectors below.
_SECRET = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
# ── Frozen vectors produced by the connector's TypeScript (relayAuthToken.ts).
# Generated via dist/core/relayAuthToken.js makeToken/sign; see module docstring.
_CONN_TOKEN = "Z3ctaW5zdGFuY2UtMTowOjM3YWE3YjE0NWU4NzY0ZDQwM2JhOWM2MzlmMjMwZGQ2M2RlOGVkOTliODhmZWQzNmFhMDI2MjVhOGE3ZTM1NjQ"
# The EXACT bytes the connector signed: JS JSON.stringify emits compact JSON
# (no spaces). The gateway verifies over the literal received body, so the
# vector is the compact form — NOT Python's spaced json.dumps default. This is
# the raw-byte-preservation discipline (a single differing byte breaks the HMAC).
_CONN_BODY = '{"type":"message","event":{"text":"hi","source":{"chat_id":"c1"}}}'
_CONN_TS = 1750000000
_CONN_SIG = "ac9509c8dae52b5590f06378260877334ff1adc4b1c96bafa4b514165fae6dc6"

# ── Self-consistency ──────────────────────────────────────────────────────


def test_token_round_trip_no_expiry():
    tok = make_token("payload-123", _SECRET, 0)
    assert verify_token(tok, [_SECRET]) == "payload-123"


def test_token_payload_may_contain_colons():
    # verify_token must split from the right so a colon-bearing payload survives.
    payload = "agent:main:discord:group:chanA"
    tok = make_token(payload, _SECRET, 0)
    assert verify_token(tok, [_SECRET]) == payload


def test_upgrade_token_is_make_token_of_gateway_id():
    assert make_upgrade_token("gw-1", _SECRET, 0) == make_token("gw-1", _SECRET, 0)


def test_token_wrong_secret_rejected():
    tok = make_token("p", _SECRET, 0)
    assert verify_token(tok, ["deadbeef" * 8]) is None


def test_token_expired_rejected():
    import base64

    # make_token takes a TTL rather than an absolute expiry, so build an
    # already-expired token by hand using the same wire scheme:
    # base64url("<payload>:<exp>:<sig>") with the padding stripped.
    signed = "p:1"  # exp=1 (1970) -> long past
    sig = sign(signed, _SECRET)
    raw = f"{signed}:{sig}".encode()
    expired = base64.urlsafe_b64encode(raw).decode().rstrip("=")
    assert verify_token(expired, [_SECRET]) is None

    # A freshly minted token is accepted. It is created right before the
    # check so the 1s TTL cannot lapse while the test runs.
    tok = make_token("p", _SECRET, ttl_seconds=1)
    assert verify_token(tok, [_SECRET]) == "p"


def test_token_rotation_verify_list():
    # A token signed with the (old) secondary still verifies during rotation.
    old, new = _SECRET, "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
    tok_old = make_token("p", old, 0)
    assert verify_token(tok_old, [new, old]) == "p"  # primary=new, secondary=old
    assert verify_token(tok_old, [new]) is None


def test_token_garbage_rejected():
    assert verify_token("not-base64url!!!", [_SECRET]) is None
    assert verify_token("", [_SECRET]) is None


def test_verify_signature_constant_time_multi_secret():
    payload = "1700000000.body"
    s = sign(payload, _SECRET)
    assert verify_signature(payload, s, ["wrong", _SECRET]) is True
    assert verify_signature(payload, s, ["wrong"]) is False
    assert verify_signature(payload, "zz", [_SECRET]) is False  # bad hex


# ── Delivery signature (connector -> gateway inbound) ──────────────────────


def test_delivery_signature_accepts_valid():
    body = json.dumps({"type": "message", "event": {"text": "x"}})
    ts = 1700000000
    s = sign(f"{ts}.{body}", _SECRET)
    assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts) is True


def test_delivery_signature_tamper_rejected():
    body = json.dumps({"type": "message", "event": {"text": "x"}})
    ts = 1700000000
    s = sign(f"{ts}.{body}", _SECRET)
    # A single changed body byte breaks the HMAC.
    assert verify_delivery_signature(body + " ", str(ts), s, [_SECRET], now=ts) is False


def test_delivery_signature_skew_rejected():
    body = "{}"
    ts = 1700000000
    s = sign(f"{ts}.{body}", _SECRET)
    # Beyond the 300s replay window in either direction.
    assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 301) is False
    assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts - 301) is False
    assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 299) is True


def test_delivery_signature_missing_headers_rejected():
    assert verify_delivery_signature("{}", None, "abc", [_SECRET]) is False
    assert verify_delivery_signature("{}", "1700000000", None, [_SECRET]) is False
    assert verify_delivery_signature("{}", "not-an-int", "abc", [_SECRET]) is False


def test_delivery_headers_match_connector_names():
    # The gateway reads exactly the header names the connector writes.
    assert DELIVERY_TS_HEADER == "x-relay-timestamp"
    assert DELIVERY_SIG_HEADER == "x-relay-signature"


# ── Cross-implementation conformance (frozen connector vectors) ────────────


def test_python_make_token_matches_connector_byte_for_byte():
    assert make_token("gw-instance-1", _SECRET, 0) == _CONN_TOKEN


def test_python_verifies_connector_token():
    assert verify_token(_CONN_TOKEN, [_SECRET]) == "gw-instance-1"


def test_python_sign_matches_connector_delivery_sig():
    assert sign(f"{_CONN_TS}.{_CONN_BODY}", _SECRET) == _CONN_SIG


def test_python_verifies_connector_delivery_signature():
    assert verify_delivery_signature(_CONN_BODY, str(_CONN_TS), _CONN_SIG, [_SECRET], now=_CONN_TS) is True