From c276b017adc4e74ae29236c39db86b7ba167afe4 Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Thu, 18 Jun 2026 12:01:54 +1000 Subject: [PATCH] =?UTF-8?q?feat(relay):=20connector=E2=87=84gateway=20chan?= =?UTF-8?q?nel=20auth=20+=20signed-HTTP=20inbound=20receiver=20+=20enroll?= =?UTF-8?q?=20CLI=20(#48147)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(relay): authenticate the connector⇄gateway WS channel The relay gateway may be customer-managed and internet-exposed, so the connector⇄gateway channel is itself authenticated (distinct from the platform crypto the relay path sheds). Add gateway/relay/auth.py — a Python port of the connector's HMAC token + delivery-signature schemes (relayAuthToken.ts / deliverySigning.ts), verified byte-for-byte against the connector's compiled TypeScript via cross-language test vectors. Present an Authorization bearer on the /relay WS upgrade keyed by the per-gateway secret (resolved from GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET in env or config). The connector rejects an unauthenticated/invalid/ revoked upgrade with close 4401. * feat(relay): signed-HTTP inbound delivery receiver The connector delivers normalized inbound events to a tenant's gateway over a signed HTTP POST, not the outbound /relay WS: the connector instance owning a platform socket is generally not the instance a given gateway dialed out to, so inbound targets a tenant endpoint that may load-balance across gateway instances. Add gateway/relay/inbound_receiver.py — verifies x-relay-signature / x-relay-timestamp over the EXACT raw request bytes (re-serializing would break the HMAC: JS JSON.stringify is compact, Python json.dumps spaces) against the per-tenant delivery key verify list within a 300s replay window, then dispatches messages to handle_message and interrupts to the interrupt handler. 
Wire it into the adapter lifecycle (start in connect() when a delivery key + bind port are configured, tear down in disconnect(); a purely-outbound dev gateway runs without it). Refine test_relay_sheds_crypto to distinguish PLATFORM crypto (Discord ed25519, Twilio/WeCom HMAC — still shed) from the connector⇄gateway CHANNEL auth (intended): auth.py / inbound_receiver.py are exempt from the platform-symbol scan but still banned from importing platform-crypto modules, plus a positive guard that auth.py uses only stdlib hmac/hashlib. * feat(relay): hermes gateway enroll CLI Add the gateway half of zero-touch enrollment. `hermes gateway enroll` resolves a fresh Nous Portal access token (the tenant-proving identity), POSTs {enrollmentToken, gatewayId} to the connector's /relay/enroll, and persists GATEWAY_RELAY_ID / GATEWAY_RELAY_SECRET / GATEWAY_RELAY_DELIVERY_KEY to ~/.hermes/.env. The per-gateway secret authenticates the WS upgrade; the per-tenant delivery key verifies signed inbound deliveries. Refuses under is_managed() (hosted installs get the secret stamped in by the orchestrator). Added as an 'enroll' subcommand on the existing gateway subparser — not a new top-level command. * docs(relay): inbound is signed HTTP, not WS; document channel auth Fix the stale contract: §3/§5 said inbound rode the WS socket (single- instance only, predates the multi-instance socket-ownership + channel-auth model). Inbound + connector→gateway interrupt are signed HTTP POSTs to the tenant endpoint. Add §6.1 documenting the two channel-auth schemes (per- gateway WS-upgrade secret, per-tenant inbound delivery key) and how they differ from the platform crypto the relay path sheds. * test(relay): update build_gateway_parser callers for cmd_gateway_enroll The enroll subcommand added cmd_gateway_enroll as a required keyword-only arg to build_gateway_parser, but two existing parser-extraction tests still called it with only cmd_gateway/cmd_proxy — failing CI with TypeError. 
Thread the new handler through both call sites and add a test asserting `gateway enroll` dispatches to cmd_gateway_enroll with its flags parsed. --- docs/relay-connector-contract.md | 67 ++++- gateway/relay/__init__.py | 67 ++++- gateway/relay/adapter.py | 42 +++ gateway/relay/auth.py | 168 ++++++++++++ gateway/relay/inbound_receiver.py | 204 ++++++++++++++ gateway/relay/ws_transport.py | 33 ++- hermes_cli/gateway_enroll.py | 250 ++++++++++++++++++ hermes_cli/main.py | 11 +- hermes_cli/subcommands/gateway.py | 50 +++- tests/gateway/relay/test_auth.py | 167 ++++++++++++ tests/gateway/relay/test_inbound_receiver.py | 150 +++++++++++ .../gateway/relay/test_relay_sheds_crypto.py | 52 +++- tests/hermes_cli/test_gateway.py | 1 + .../test_subcommands_profile_gateway.py | 33 ++- 14 files changed, 1279 insertions(+), 16 deletions(-) create mode 100644 gateway/relay/auth.py create mode 100644 gateway/relay/inbound_receiver.py create mode 100644 hermes_cli/gateway_enroll.py create mode 100644 tests/gateway/relay/test_auth.py create mode 100644 tests/gateway/relay/test_inbound_receiver.py diff --git a/docs/relay-connector-contract.md b/docs/relay-connector-contract.md index 2a041bc7f9f..39c86a5f839 100644 --- a/docs/relay-connector-contract.md +++ b/docs/relay-connector-contract.md @@ -61,10 +61,34 @@ live platform adapter's capability methods. ## 3. Inbound: `MessageEvent` envelope The connector normalizes each platform wire event into a `MessageEvent` -(`gateway/platforms/base.py`) and delivers it to the gateway's inbound handler. -The gateway keys the session via `build_session_key()` from the embedded -`SessionSource` — so populating the right discriminators is the single -highest-correctness responsibility of the connector. +(`gateway/platforms/base.py`) and delivers it to the gateway. **Inbound is +delivered over a signed HTTP POST, not the outbound `/relay` WebSocket** (see +the transport note below). 
The gateway keys the session via `build_session_key()` +from the embedded `SessionSource` — so populating the right discriminators is +the single highest-correctness responsibility of the connector. + +### Inbound transport (signed HTTP POST, not the outbound WS) + +The gateway dials **out** to the connector's `/relay` WebSocket for the +handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound, +however, is delivered the other way: the connector **POSTs** the normalized +event to the gateway's inbound endpoint (`HttpGatewayDelivery` on the connector; +`gateway/relay/inbound_receiver.py` on the gateway). The reason is +multi-instance: the connector instance that owns a platform's socket (and thus +produces inbound events) is generally **not** the instance a given gateway +dialed its outbound WS into, so inbound must target a tenant **endpoint** (which +may load-balance across gateway instances) rather than ride one gateway's +outbound socket. Each delivery is HMAC-signed with the per-tenant **delivery +key** (§6.1); the gateway verifies the signature over the exact raw bytes before +accepting the event. Two POST targets: + +- `POST {gatewayEndpoint}` → `{"type":"message", "event": <MessageEvent>}` +- `POST {gatewayEndpoint}/interrupt` → `{"type":"interrupt", "session_key", "reason"?}` (§5) + +> An earlier draft of this contract delivered inbound over the WS `inbound` +> frame. That only works single-instance and predates the multi-instance +> socket-ownership + channel-auth model; the signed-HTTP path above is the +> shipped design. ### SessionSource fields (the wire surface) @@ -151,14 +175,16 @@ gateway holds zero capability material). Source of truth: ## 5. Interrupt (`/stop`) routing - **Gateway → connector:** `send_interrupt(session_key, reason?)` egresses a - mid-turn `/stop`. The connector MUST forward it down the socket owned by the + mid-turn `/stop` over the outbound WS. 
The connector MUST forward it to the gateway instance running that `session_key` (the routing invariant). -- **Connector → gateway:** an inbound interrupt for a `session_key` is bridged - by the adapter's `on_interrupt(session_key, chat_id)` into the existing - per-session interrupt mechanism, cancelling exactly that turn (siblings - untouched). +- **Connector → gateway:** an inbound interrupt for a `session_key` is delivered + as a **signed HTTP POST** to `{gatewayEndpoint}/interrupt` (§3 transport note), + and bridged by the adapter's `on_interrupt(session_key, chat_id)` into the + existing per-session interrupt mechanism, cancelling exactly that turn + (siblings untouched). -The interrupt rides the same per-turn bidirectional socket as inbound/outbound. +The gateway→connector `/stop` rides the outbound WS; the connector→gateway +interrupt rides the same signed-HTTP inbound path as a normalized event. --- @@ -201,6 +227,27 @@ relay planes — both are "verify at the edge → emit a normalized event," diff only in transport. See `docs/capability-trust-boundary.md` (connector repo: `gateway-gateway`) for the full A2 rationale and the connector-side vault. +### 6.1 Channel authentication (the connector⇄gateway link itself) + +A2 makes the connector the sole holder of platform secrets while the gateway may +be **customer-managed and internet-exposed**, so the connector⇄gateway channel +is itself authenticated. The gateway holds two enrollment-issued credentials +(`hermes gateway enroll` → connector `/relay/enroll`): a **per-gateway secret** +and a **per-tenant delivery key**. Both are HMAC-SHA256 schemes with a +multi-secret rotation verify list (gateway side: `gateway/relay/auth.py`; +connector side: `src/core/relayAuthToken.ts` + `src/core/deliverySigning.ts`). + +| Leg | Credential | Mechanism | +|-----|-----------|-----------| +| Gateway → connector WS upgrade | per-gateway secret | An `Authorization` bearer header on the `/relay` upgrade. 
The token is `base64url(payload:exp:sig)` where `payload = gatewayId` and `sig = HMAC(payload:exp, secret)`. Connector verifies and rejects the upgrade (**close 4401**) on mismatch/absence/revocation. The authenticated tenant comes from the connector's store, never the `hello` frame. | +| Connector → gateway inbound POST | per-tenant delivery key | Two headers: `x-relay-timestamp` (unix seconds) and `x-relay-signature` (hex `HMAC(ts.rawBody, deliveryKey)`). Gateway verifies over the **exact raw bytes** within a ±300s replay window before accepting the event; rejects **401** otherwise. | + +This is the **channel** authenticator — distinct from platform crypto, which the +relay path still sheds entirely (§6). The gateway holds zero platform secrets; +these two keys authenticate only the connector link. Full threat model + +enrollment/rotation/kill-switch design: `docs/connector-gateway-auth-design.md` +(connector repo). + --- ## 7. Versioning policy diff --git a/gateway/relay/__init__.py b/gateway/relay/__init__.py index 803ea867ee1..f1deb20e5c7 100644 --- a/gateway/relay/__init__.py +++ b/gateway/relay/__init__.py @@ -55,6 +55,64 @@ def relay_platform_identity() -> tuple[str, str]: return platform, bot_id +def relay_connection_auth() -> tuple[Optional[str], Optional[str]]: + """The (gateway_id, upgrade_secret) this gateway authenticates the WS upgrade with. + + Both come from enrollment (``hermes gateway enroll`` writes them to + ``~/.hermes/.env``): ``GATEWAY_RELAY_ID`` identifies the enrolled instance, + ``GATEWAY_RELAY_SECRET`` is the per-gateway signing secret. Either absent -> + ``(None, None)`` and the transport dials unauthenticated (dev/test, or a + connector that doesn't enforce auth). Checks env first (Docker), then + ``gateway.relay_id`` / ``gateway.relay_secret`` in config.yaml. 
+ """ + gateway_id = os.environ.get("GATEWAY_RELAY_ID", "").strip() + secret = os.environ.get("GATEWAY_RELAY_SECRET", "").strip() + if not (gateway_id and secret): + try: + from gateway.run import _load_gateway_config # late import to avoid cycle + + cfg = (_load_gateway_config().get("gateway") or {}) + gateway_id = gateway_id or str(cfg.get("relay_id", "") or "").strip() + secret = secret or str(cfg.get("relay_secret", "") or "").strip() + except Exception: # noqa: BLE001 - config absence/parse must never crash registration + pass + return (gateway_id or None, secret or None) + + +def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]: + """Resolve (delivery_key, bind_host, bind_port) for the inbound receiver. + + The connector delivers normalized inbound events to this gateway over a + SIGNED HTTP POST (not the outbound WS), verified with the per-tenant delivery + key issued at enrollment (``GATEWAY_RELAY_DELIVERY_KEY``). The receiver only + starts when a delivery key AND a bind port are configured — a gateway with no + public inbound URL (e.g. a purely outbound dev run) simply doesn't run it. + + Env first (Docker), then ``gateway.relay_delivery_key`` / + ``gateway.relay_inbound_host`` / ``gateway.relay_inbound_port`` in config.yaml. + Port 0 (default/unset) -> receiver disabled. 
+ """ + key = os.environ.get("GATEWAY_RELAY_DELIVERY_KEY", "").strip() + host = os.environ.get("GATEWAY_RELAY_INBOUND_HOST", "").strip() + port_raw = os.environ.get("GATEWAY_RELAY_INBOUND_PORT", "").strip() + if not (key and port_raw): + try: + from gateway.run import _load_gateway_config # late import to avoid cycle + + cfg = (_load_gateway_config().get("gateway") or {}) + key = key or str(cfg.get("relay_delivery_key", "") or "").strip() + host = host or str(cfg.get("relay_inbound_host", "") or "").strip() + if not port_raw: + port_raw = str(cfg.get("relay_inbound_port", "") or "").strip() + except Exception: # noqa: BLE001 - config absence/parse must never crash registration + pass + try: + port = int(port_raw) if port_raw else 0 + except ValueError: + port = 0 + return (key or None, host or "0.0.0.0", port) + + def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bool: """Register the generic ``relay`` platform via the platform registry. @@ -96,7 +154,14 @@ def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bo if resolved_url: from gateway.relay.ws_transport import WebSocketRelayTransport - transport = WebSocketRelayTransport(resolved_url, platform, bot_id) + gateway_id, upgrade_secret = relay_connection_auth() + transport = WebSocketRelayTransport( + resolved_url, + platform, + bot_id, + gateway_id=gateway_id, + upgrade_secret=upgrade_secret, + ) return RelayAdapter(config, placeholder, transport=transport) platform_registry.register( diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index 0e3116a7ad4..b64f7abc517 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -58,6 +58,10 @@ class RelayAdapter(BasePlatformAdapter): # Capability surface read by stream_consumer (getattr(..., 4096)). 
self.MAX_MESSAGE_LENGTH = descriptor.max_message_length self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain") + # Inbound delivery receiver (signed connector→gateway HTTP POSTs). Built + # lazily in connect() when a delivery key + bind port are configured; a + # purely-outbound dev gateway runs without it. See inbound_receiver.py. + self._inbound_runner: Any = None # ── capability surface (from descriptor) ───────────────────────────── @property @@ -88,8 +92,40 @@ class RelayAdapter(BasePlatformAdapter): logger.warning("relay handshake failed: %s", exc) return False self._apply_descriptor(descriptor) + # Start the signed inbound-delivery receiver if configured (the connector + # POSTs normalized events to it over HTTP, verified with the tenant + # delivery key). Non-fatal: a receiver bind failure must not fail the + # outbound connection — the gateway can still send. + await self._maybe_start_inbound_receiver() return True + async def _maybe_start_inbound_receiver(self) -> None: + """Start the inbound HTTP receiver when a delivery key + port are set.""" + from gateway.relay import relay_inbound_config + + delivery_key, host, port = relay_inbound_config() + if not (delivery_key and port): + return # no inbound URL configured -> outbound-only gateway + try: + from aiohttp import web + + from gateway.relay.inbound_receiver import InboundDeliveryReceiver + + receiver = InboundDeliveryReceiver( + delivery_key_verify_list=lambda: [delivery_key], + on_message=self._on_inbound, + on_interrupt=self.on_interrupt, + ) + runner = web.AppRunner(receiver.build_app(), access_log=None) + await runner.setup() + site = web.TCPSite(runner, host, port) + await site.start() + self._inbound_runner = runner + logger.info("relay inbound receiver listening on http://%s:%s", host, port) + except Exception as exc: # noqa: BLE001 - inbound bind failure must not kill outbound + logger.warning("relay inbound receiver failed to start: %s", exc) + self._inbound_runner = 
None + def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None: """Adopt a (re)negotiated descriptor into the live capability surface.""" self.descriptor = descriptor @@ -112,6 +148,12 @@ class RelayAdapter(BasePlatformAdapter): await self.interrupt_session_activity(session_key, chat_id) async def disconnect(self) -> None: + if self._inbound_runner is not None: + try: + await self._inbound_runner.cleanup() + except Exception: # noqa: BLE001 - best-effort teardown + pass + self._inbound_runner = None if self._transport is not None: await self._transport.disconnect() diff --git a/gateway/relay/auth.py b/gateway/relay/auth.py new file mode 100644 index 00000000000..b67715e20db --- /dev/null +++ b/gateway/relay/auth.py @@ -0,0 +1,168 @@ +"""Gateway-side relay authentication primitives. EXPERIMENTAL. + +The connector⇄gateway channel is authenticated because a gateway may be +customer-managed and internet-exposed (see the connector repo +``docs/connector-gateway-auth-design.md``). This module is the **gateway half** +of two HMAC schemes whose wire bytes must match the connector's TypeScript +exactly: + +1. **WS upgrade auth** (gateway → connector): the gateway presents + ``Authorization: Bearer <token>`` on the ``/relay`` WebSocket upgrade, where + ``token = make_upgrade_token(gateway_id, secret)``. Mirrors the connector's + ``relayAuthToken.ts`` ``makeToken`` (``src/core/relayAuthToken.ts``): + ``base64url(f"{payload}:{exp}:{sig}")`` with + ``sig = HMAC_SHA256(f"{payload}:{exp}", secret).hexdigest()`` and + ``payload == gateway_id``. + +2. **Inbound delivery signature** (connector → gateway): the connector signs + each inbound POST with the per-tenant *delivery key*, carried as + ``x-relay-timestamp`` + ``x-relay-signature`` headers; the gateway verifies + before accepting the event. Mirrors the connector's ``deliverySigning.ts``: + ``sig = HMAC_SHA256(f"{ts}.{body_json}", key).hexdigest()`` over the EXACT + request body bytes, with a replay-window skew check. 
+ +Both schemes use a **multi-secret verify list** (primary first, then a secondary +during a rotation window), exactly like ``api/src/handlers/stats_oauth.ts`` — so +a secret rotation doesn't invalidate outstanding tokens. + +EXPERIMENTAL: may change without a deprecation cycle until ≥2 Class-1 platforms +validate the relay contract. +""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import time +from typing import Optional, Sequence + +# Header names the connector uses for inbound delivery signatures +# (connector ``src/core/deliverySigning.ts`` — DELIVERY_TS_HEADER / SIG_HEADER). +DELIVERY_TS_HEADER = "x-relay-timestamp" +DELIVERY_SIG_HEADER = "x-relay-signature" + +# Default replay window for an inbound delivery signature (connector default). +_DEFAULT_MAX_SKEW_SECONDS = 300 +# Default TTL for an upgrade token (connector ``makeUpgradeToken`` default). +_DEFAULT_UPGRADE_TTL_SECONDS = 300 + + +def _hmac_hex(payload: str, secret: str) -> str: + """HMAC-SHA256 hex digest of ``payload`` under ``secret`` (UTF-8).""" + return hmac.new(secret.encode("utf-8"), payload.encode("utf-8"), hashlib.sha256).hexdigest() + + +def sign(payload: str, secret: str) -> str: + """HMAC-SHA256 hex digest — the connector's ``sign`` (relayAuthToken.ts).""" + return _hmac_hex(payload, secret) + + +def verify_signature(payload: str, sig_hex: str, secrets: Sequence[str]) -> bool: + """Constant-time check that ``sig_hex`` is a valid HMAC of ``payload`` under + ANY of ``secrets`` (rotation window). Length-mismatched candidates are + skipped without a timing leak. Mirrors ``verifySignature``. 
+ """ + try: + sig_buf = bytes.fromhex(sig_hex) + except (ValueError, TypeError): + return False + if len(sig_buf) == 0: + return False + for secret in secrets: + if not secret: + continue + expected = bytes.fromhex(_hmac_hex(payload, secret)) + if len(expected) != len(sig_buf): + continue + if hmac.compare_digest(sig_buf, expected): + return True + return False + + +def make_token(payload: str, secret: str, ttl_seconds: int = 0) -> str: + """Build a signed, optionally-expiring token — the connector's ``makeToken``. + + ``base64url(f"{payload}:{exp}:{sig}")`` where ``exp`` is a unix-seconds + expiry (0 = never) and ``sig = HMAC_SHA256(f"{payload}:{exp}", secret)``. + base64url is unpadded to match Node's ``Buffer.toString("base64url")``. + """ + exp = int(time.time()) + ttl_seconds if ttl_seconds > 0 else 0 + signed = f"{payload}:{exp}" + sig = _hmac_hex(signed, secret) + raw = f"{signed}:{sig}".encode("utf-8") + return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=") + + +def make_upgrade_token( + gateway_id: str, secret: str, ttl_seconds: int = _DEFAULT_UPGRADE_TTL_SECONDS +) -> str: + """The WS-upgrade bearer token a gateway sends: ``payload = gateway_id``. + + The connector peeks ``gateway_id`` (the payload head) to index its secret + verify list, then verifies the signature against that gateway's stored + secret(s). Mirrors the connector's ``makeUpgradeToken``. + """ + return make_token(gateway_id, secret, ttl_seconds) + + +def verify_token(token: str, secrets: Sequence[str]) -> Optional[str]: + """Verify a token built by ``make_token``; return the payload or None. + + Splits from the right so a payload may itself contain colons (mirrors the + connector's ``verifyToken``). Rejects an expired token and any signature + that doesn't match a secret in the verify list. + """ + try: + # base64url decode with padding restored. 
+ padded = token + "=" * (-len(token) % 4) + decoded = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8") + except (ValueError, TypeError): + return None + parts = decoded.split(":") + if len(parts) < 3: + return None + sig = parts[-1] + try: + exp = int(parts[-2]) + except ValueError: + return None + payload = ":".join(parts[:-2]) + if exp != 0 and int(time.time()) > exp: + return None + signed = f"{payload}:{exp}" + return payload if verify_signature(signed, sig, secrets) else None + + +def _delivery_payload(ts: int, body_json: str) -> str: + """Signed material for an inbound delivery: ``f"{ts}.{body_json}"``.""" + return f"{ts}.{body_json}" + + +def verify_delivery_signature( + body_json: str, + timestamp: Optional[str], + signature: Optional[str], + verify_keys: Sequence[str], + max_skew_seconds: int = _DEFAULT_MAX_SKEW_SECONDS, + *, + now: Optional[int] = None, +) -> bool: + """Verify a connector→gateway inbound delivery signature. + + ``body_json`` MUST be the exact request body bytes decoded as UTF-8 — the + connector signs over the literal serialized body, so the gateway verifies + over the literal received body (no re-serialization). Checks the timestamp + is within ``max_skew_seconds`` of now and the HMAC matches any key in the + rotation verify list. Mirrors the connector's ``verifyDeliverySignature``. + """ + if not timestamp or not signature: + return False + try: + ts = int(timestamp) + except (ValueError, TypeError): + return False + current = now if now is not None else int(time.time()) + if abs(current - ts) > max_skew_seconds: + return False + return verify_signature(_delivery_payload(ts, body_json), signature, verify_keys) diff --git a/gateway/relay/inbound_receiver.py b/gateway/relay/inbound_receiver.py new file mode 100644 index 00000000000..733fe38c2c6 --- /dev/null +++ b/gateway/relay/inbound_receiver.py @@ -0,0 +1,204 @@ +"""Gateway-side inbound delivery receiver. EXPERIMENTAL. 
+ +The connector delivers normalized inbound events to a tenant's gateway over a +**signed HTTP POST** (connector ``src/relay/httpGatewayDelivery.ts``), NOT over +the gateway's outbound ``/relay`` WebSocket: the connector instance that owns a +platform socket is generally not the instance a given gateway dialed out to, so +inbound is delivered to a tenant ENDPOINT (which may load-balance across gateway +instances). Each delivery is HMAC-signed with the per-tenant **delivery key** +(``gateway/relay/auth.py``); this receiver verifies the signature over the EXACT +raw request bytes before accepting the event. + +Two routes (mirroring the connector's two POST targets): + POST {base} {"type":"message", "event": <MessageEvent>, ...} + POST {base}/interrupt {"type":"interrupt","session_key": ..., "reason"?} + +The receiver: + 1. reads the RAW body bytes (never a reparsed/re-serialized form — the HMAC is + over the literal bytes the connector signed), + 2. verifies ``x-relay-signature`` / ``x-relay-timestamp`` against the delivery + key verify list (primary + secondary during rotation), within the replay + window — rejects 401 on any failure, + 3. parses the JSON and dispatches: a ``message`` to the inbound handler (the + RelayAdapter's ``handle_message`` via the transport's normal path), an + ``interrupt`` to the interrupt handler. + +EXPERIMENTAL: the transport protocol may change without a deprecation cycle +until ≥2 Class-1 platforms validate it. See docs/relay-connector-contract.md. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Awaitable, Callable, Optional, Sequence + +from gateway.platforms.base import MessageEvent +from gateway.relay.auth import ( + DELIVERY_SIG_HEADER, + DELIVERY_TS_HEADER, + verify_delivery_signature, +) + +logger = logging.getLogger(__name__) + +# Callbacks the receiver dispatches verified deliveries to. 
+InboundMessageHandler = Callable[[MessageEvent], Awaitable[None]] +InboundInterruptHandler = Callable[[str, str], Awaitable[None]] + +try: # lazy/optional dep — mirrors the other HTTP-receiving adapters + from aiohttp import web +except ImportError: # pragma: no cover - exercised only when the extra is absent + web = None # type: ignore[assignment] + +AIOHTTP_AVAILABLE = web is not None + + +def _event_from_wire(raw: dict) -> MessageEvent: + """Rebuild a MessageEvent from the connector's normalized inbound payload. + + Identical mapping to the WS transport's ``_event_from_wire`` (the wire shape + is the same; only the transport differs). Kept here so the HTTP receiver has + no import dependency on the WS transport module. + """ + from gateway.config import Platform + from gateway.platforms.base import MessageType + from gateway.session import SessionSource + + src = raw.get("source", {}) or {} + platform = src.get("platform", "relay") + try: + platform_enum = Platform(platform) + except ValueError: + platform_enum = Platform.RELAY + + source = SessionSource( + platform=platform_enum, + chat_id=src.get("chat_id", ""), + chat_type=src.get("chat_type", "dm"), + chat_name=src.get("chat_name"), + user_id=src.get("user_id"), + user_name=src.get("user_name"), + thread_id=src.get("thread_id"), + chat_topic=src.get("chat_topic"), + user_id_alt=src.get("user_id_alt"), + chat_id_alt=src.get("chat_id_alt"), + guild_id=src.get("guild_id"), + parent_chat_id=src.get("parent_chat_id"), + message_id=src.get("message_id"), + ) + try: + msg_type = MessageType(raw.get("message_type", "text")) + except ValueError: + msg_type = MessageType.TEXT + + return MessageEvent( + text=raw.get("text", ""), + message_type=msg_type, + source=source, + message_id=raw.get("message_id"), + reply_to_message_id=raw.get("reply_to_message_id"), + media_urls=raw.get("media_urls") or [], + ) + + +class InboundDeliveryReceiver: + """Verifies + dispatches signed connector→gateway inbound deliveries. 
+ + Transport-agnostic core: ``handle_raw`` takes the raw body bytes + headers + + which route was hit and returns ``(status, body)``. The aiohttp wiring + (``build_app`` / ``serve``) is a thin shell so the verify+dispatch logic is + unit-testable without a live socket. + """ + + def __init__( + self, + *, + delivery_key_verify_list: Callable[[], Sequence[str]], + on_message: InboundMessageHandler, + on_interrupt: Optional[InboundInterruptHandler] = None, + max_skew_seconds: int = 300, + ) -> None: + # A callable (not a static list) so a rotated delivery key is picked up + # without rebuilding the receiver — mirrors the connector's verify list. + self._verify_list = delivery_key_verify_list + self._on_message = on_message + self._on_interrupt = on_interrupt + self._max_skew_seconds = max_skew_seconds + + async def handle_raw( + self, *, raw_body: bytes, timestamp: Optional[str], signature: Optional[str], is_interrupt: bool + ) -> tuple[int, dict]: + """Verify the signature over ``raw_body`` and dispatch. Returns (status, json). + + 401 on a missing/invalid/expired signature (never dispatches unverified). + 400 on malformed JSON. 200 on a verified, dispatched delivery. + """ + verify_keys = list(self._verify_list() or []) + if not verify_keys: + # No delivery key provisioned -> we cannot verify -> reject. A gateway + # that hasn't enrolled must not accept inbound (fail closed). + logger.warning("relay inbound: no delivery key configured; rejecting") + return 401, {"error": "no delivery key configured"} + + # Verify over the EXACT raw bytes the connector signed. Decode to text + # with the same UTF-8 the connector's JSON.stringify produced; a single + # differing byte breaks the HMAC (raw-body-preservation discipline). 
+ body_text = raw_body.decode("utf-8", errors="strict") + if not verify_delivery_signature( + body_text, timestamp, signature, verify_keys, self._max_skew_seconds + ): + return 401, {"error": "invalid delivery signature"} + + try: + payload = json.loads(body_text) + except json.JSONDecodeError: + return 400, {"error": "invalid JSON body"} + + if is_interrupt or payload.get("type") == "interrupt": + session_key = str(payload.get("session_key", "")) + chat_id = str(payload.get("chat_id", "") or payload.get("reason", "") or "") + if self._on_interrupt is not None and session_key: + await self._on_interrupt(session_key, chat_id) + return 200, {"ok": True} + + # Default: a normalized inbound message event. + event_raw = payload.get("event") + if not isinstance(event_raw, dict): + return 400, {"error": "missing event"} + event = _event_from_wire(event_raw) + await self._on_message(event) + return 200, {"ok": True} + + # ── aiohttp wiring (thin shell over handle_raw) ────────────────────── + def build_app(self) -> Any: + """Build an aiohttp Application exposing the delivery + interrupt routes.""" + if not AIOHTTP_AVAILABLE: + raise RuntimeError( + "InboundDeliveryReceiver requires the 'aiohttp' package " + "(install the messaging extra)." + ) + + async def _deliver(request: Any) -> Any: + return await self._respond(request, is_interrupt=False) + + async def _interrupt(request: Any) -> Any: + return await self._respond(request, is_interrupt=True) + + app = web.Application() + app.router.add_get("/healthz", lambda _: web.Response(text="ok")) + app.router.add_post("/", _deliver) + app.router.add_post("/interrupt", _interrupt) + return app + + async def _respond(self, request: Any, *, is_interrupt: bool) -> Any: + # Read the RAW bytes — do NOT use request.json() (it reparses and we'd + # verify over a re-serialized form, breaking the HMAC). 
+ raw_body = await request.read() + status, body = await self.handle_raw( + raw_body=raw_body, + timestamp=request.headers.get(DELIVERY_TS_HEADER), + signature=request.headers.get(DELIVERY_SIG_HEADER), + is_interrupt=is_interrupt, + ) + return web.json_response(body, status=status) diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index 75b892105ad..b2e8eda09cd 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -110,6 +110,8 @@ class WebSocketRelayTransport: *, connect_timeout_s: float = _HANDSHAKE_TIMEOUT_S, outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S, + gateway_id: Optional[str] = None, + upgrade_secret: Optional[str] = None, ) -> None: if not WEBSOCKETS_AVAILABLE: raise RuntimeError( @@ -121,6 +123,14 @@ class WebSocketRelayTransport: self._bot_id = bot_id self._connect_timeout_s = connect_timeout_s self._outbound_timeout_s = outbound_timeout_s + # Connection auth (Phase 2): when a per-gateway secret is configured the + # gateway presents an HMAC bearer on the WS upgrade so the connector can + # authenticate it (reject 4401 otherwise). gateway_id identifies the + # enrolled instance — the connector peeks it to index its secret verify + # list, then verifies the signature. Absent -> unauthenticated upgrade + # (dev/test, or a connector that doesn't enforce auth). 
+ self._gateway_id = gateway_id + self._upgrade_secret = upgrade_secret self._ws: Any = None self._reader: Optional[asyncio.Task[None]] = None @@ -135,12 +145,33 @@ class WebSocketRelayTransport: async def connect(self) -> bool: loop = asyncio.get_running_loop() self._descriptor_ready = loop.create_future() - self._ws = await websockets.connect(self._url) # type: ignore[union-attr] + headers = self._upgrade_headers() + if headers: + self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr] + else: + self._ws = await websockets.connect(self._url) # type: ignore[union-attr] self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader") # Send hello; the descriptor arrives via the reader and resolves handshake(). await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id}) return True + def _upgrade_headers(self) -> Dict[str, str]: + """Auth headers for the WS upgrade, or {} when no secret is configured. + + Presents ``Authorization: Bearer *** where the token is a signed + bearer built with the per-gateway secret (``gateway/relay/auth.py`` + ``make_upgrade_token``), keyed by ``gateway_id`` so the connector can + index its verify list. The connector rejects the upgrade (close 4401) + when this is missing/invalid/revoked; an unauthenticated connector + ignores it. + """ + if not (self._upgrade_secret and self._gateway_id): + return {} + from gateway.relay.auth import make_upgrade_token + + token = make_upgrade_token(self._gateway_id, self._upgrade_secret) + return {"Authorization": f"Bearer {token}"} + async def disconnect(self) -> None: self._closing = True if self._reader is not None: diff --git a/hermes_cli/gateway_enroll.py b/hermes_cli/gateway_enroll.py new file mode 100644 index 00000000000..e0628d81c38 --- /dev/null +++ b/hermes_cli/gateway_enroll.py @@ -0,0 +1,250 @@ +"""``hermes gateway enroll`` — enroll a self-hosted gateway with a relay connector. 
+ +The connector⇄gateway channel is authenticated (the gateway may be +customer-managed and internet-exposed). This command is the gateway half of the +zero-touch enrollment in the connector repo's +``docs/connector-gateway-auth-design.md``: + + 1. Resolve a fresh Nous Portal access token from the existing login + (``~/.hermes/auth.json``) — the same path ``hermes dashboard register`` + uses (``resolve_nous_access_token``). This proves *which Nous org (tenant)* + the caller owns; the connector derives the authoritative tenant from it via + ``GET /api/oauth/account`` (never from anything the gateway asserts). + 2. POST ``{enrollmentToken, gatewayId}`` to the connector's ``/relay/enroll`` + with that token in the ``Authorization`` header, over TLS. + 3. The connector verifies the enrollment token (signature + single-use + + tenant match), mints a per-gateway secret, get-or-creates the per-tenant + delivery key, and returns both ONCE. + 4. Persist ``GATEWAY_RELAY_ID`` / ``GATEWAY_RELAY_SECRET`` / + ``GATEWAY_RELAY_DELIVERY_KEY`` (+ ``GATEWAY_RELAY_URL`` if supplied) into + ``~/.hermes/.env``. The per-gateway secret authenticates the WS upgrade; + the per-tenant delivery key verifies signed inbound deliveries. + +Managed/hosted installs do NOT self-enroll: the orchestrator (NAS) mints the +secret directly and stamps it into the container env, so this command refuses to +run under ``is_managed()`` (mirrors ``dashboard register``). + +EXPERIMENTAL: the relay auth scheme may change without a deprecation cycle until +≥2 Class-1 platforms validate the contract. +""" + +from __future__ import annotations + +import json +import os +import socket +import sys +import urllib.error +import urllib.request +from typing import Optional + + +def _default_gateway_id() -> str: + """A stable-ish default gateway instance id: ``-``. + + The gatewayId identifies this enrolled instance for kill-switch granularity + (the connector indexes its secret verify list by it). 
Default to the host + name so a human can recognize it; overridable via ``--gateway-id``. + """ + host = "" + try: + host = socket.gethostname().strip() + except Exception: + host = "" + return f"gw-{host or 'hermes'}" + + +def _resolve_connector_url(override: Optional[str]) -> Optional[str]: + """Resolve the connector base URL (no trailing slash) for enrollment. + + Precedence: explicit ``--connector-url`` flag > ``GATEWAY_RELAY_URL`` env > + ``gateway.relay_url`` in config.yaml. The relay URL is a ``ws(s)://`` dial + target; enrollment is an ``http(s)://`` POST to the same host, so we map the + scheme. Returns None when nothing is configured (the user must supply one). + """ + raw = (override or os.environ.get("GATEWAY_RELAY_URL", "")).strip() + if not raw: + try: + from gateway.run import _load_gateway_config # late import to avoid cycle + + cfg = (_load_gateway_config().get("gateway") or {}) + raw = str(cfg.get("relay_url", "") or "").strip() + except Exception: + raw = "" + if not raw: + return None + raw = raw.rstrip("/") + # The relay dial URL is ws(s)://…/relay; enrollment posts to http(s)://…/relay/enroll. + if raw.startswith("ws://"): + raw = "http://" + raw[len("ws://"):] + elif raw.startswith("wss://"): + raw = "https://" + raw[len("wss://"):] + # Strip a trailing /relay path segment if the user pasted the dial URL. + if raw.endswith("/relay"): + raw = raw[: -len("/relay")] + return raw + + +def _post_enroll( + *, + connector_base_url: str, + access_token: str, + enrollment_token: str, + gateway_id: str, + timeout: float = 15.0, +) -> dict: + """POST to the connector's ``/relay/enroll`` and return the JSON body. + + Raises RuntimeError with a user-facing message on any non-2xx / transport + failure. The connector returns ``{secret, deliveryKey, tenant, gatewayId}`` + on success, ``{error}`` at 400/401/403. 
+ """ + url = f"{connector_base_url.rstrip('/')}/relay/enroll" + data = json.dumps({"enrollmentToken": enrollment_token, "gatewayId": gateway_id}).encode("utf-8") + req = urllib.request.Request( + url, + data=data, + method="POST", + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + "Accept": "application/json", + }, + ) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + payload = json.loads(resp.read().decode()) + except urllib.error.HTTPError as exc: + detail = "" + try: + detail = (json.loads(exc.read().decode()) or {}).get("error", "") + except Exception: + pass + if exc.code == 401: + raise RuntimeError( + "Connector rejected the caller identity (401). Your Nous Portal " + "token could not be verified — try `hermes auth login nous` and retry." + ) from exc + if exc.code == 403: + raise RuntimeError( + detail + or "Enrollment token invalid, expired, already used, or tenant mismatch (403)." + ) from exc + raise RuntimeError( + f"Connector returned HTTP {exc.code}" + (f": {detail}" if detail else "") + ) from exc + except urllib.error.URLError as exc: + raise RuntimeError( + f"Could not reach the connector at {connector_base_url}: {exc.reason}" + ) from exc + + if not isinstance(payload, dict) or not payload.get("secret"): + raise RuntimeError("Connector returned an unexpected response (no secret).") + return payload + + +def cmd_gateway_enroll(args) -> None: + """Enroll this gateway with a relay connector; persist the auth creds to .env.""" + from hermes_cli.auth import AuthError, resolve_nous_access_token + from hermes_cli.config import is_managed, save_env_value + + # Managed installs get GATEWAY_RELAY_* stamped in by the orchestrator (NAS + # mints the secret directly per the design's managed shape). Self-enrolling + # from inside such a container is a mistake — and save_env_value refuses to + # write anyway. 
+ if is_managed(): + print( + "✗ `hermes gateway enroll` is not available in a managed/hosted install.\n" + " The relay gateway secret is provisioned by the hosting platform." + ) + sys.exit(1) + + enrollment_token = (getattr(args, "token", None) or os.environ.get("GATEWAY_RELAY_ENROLL_TOKEN", "")).strip() + if not enrollment_token: + print( + "✗ No enrollment token. Pass --token (or set " + "GATEWAY_RELAY_ENROLL_TOKEN).\n" + " The connector mints this single-use token when your tenant's route " + "is provisioned; it is delivered with your gateway config." + ) + sys.exit(1) + + connector_base_url = _resolve_connector_url(getattr(args, "connector_url", None)) + if not connector_base_url: + print( + "✗ No connector URL. Pass --connector-url (or set GATEWAY_RELAY_URL " + "/ gateway.relay_url in config.yaml)." + ) + sys.exit(1) + + gateway_id = (getattr(args, "gateway_id", None) or _default_gateway_id()).strip() + + # 1. Resolve a fresh Nous access token (the tenant-proving identity). + try: + access_token = resolve_nous_access_token() + except AuthError as exc: + if getattr(exc, "relogin_required", False): + print("✗ You're not logged into Nous Portal.") + print(" Run `hermes setup` (or `hermes auth login nous`) first, then retry.") + else: + print(f"✗ Could not resolve a Nous Portal access token: {exc}") + sys.exit(1) + except Exception as exc: + print(f"✗ Could not resolve a Nous Portal access token: {exc}") + sys.exit(1) + + # 2-3. Redeem the enrollment token at the connector. + try: + result = _post_enroll( + connector_base_url=connector_base_url, + access_token=access_token, + enrollment_token=enrollment_token, + gateway_id=gateway_id, + ) + except RuntimeError as exc: + print(f"✗ Enrollment failed: {exc}") + sys.exit(1) + + secret = str(result.get("secret") or "") + delivery_key = str(result.get("deliveryKey") or "") + tenant = str(result.get("tenant") or "") + resolved_gateway_id = str(result.get("gatewayId") or gateway_id) + + # 4. 
Persist the creds idempotently. The secret + delivery key are sensitive; + # save_env_value writes them to ~/.hermes/.env (0600 dir) and never logs. + to_write = { + "GATEWAY_RELAY_ID": resolved_gateway_id, + "GATEWAY_RELAY_SECRET": secret, + "GATEWAY_RELAY_DELIVERY_KEY": delivery_key, + } + # Persist the connector URL too (as the ws(s):// dial target) when supplied + # explicitly, so the runtime can dial without re-specifying it. + explicit_url = (getattr(args, "connector_url", None) or "").strip() + if explicit_url: + to_write["GATEWAY_RELAY_URL"] = explicit_url.rstrip("/") + + for key, value in to_write.items(): + if not value: + continue + try: + save_env_value(key, value) + except Exception as exc: + print(f"✗ Failed to write {key} to .env: {exc}") + sys.exit(1) + + from hermes_cli.config import get_env_path + + print(f'✓ Enrolled gateway "{resolved_gateway_id}"' + (f" for tenant {tenant}" if tenant else "")) + print() + print(f" Wrote to {get_env_path()}:") + print(f" GATEWAY_RELAY_ID={resolved_gateway_id}") + print(" GATEWAY_RELAY_SECRET=") + print(" GATEWAY_RELAY_DELIVERY_KEY=") + if explicit_url: + print(f" GATEWAY_RELAY_URL={explicit_url.rstrip('/')}") + print() + print( + " The gateway now authenticates its relay WS upgrade with the per-gateway\n" + " secret and verifies signed inbound deliveries with the tenant delivery\n" + " key. Restart the gateway to pick up the new env." 
+ ) diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 7ec89a41412..0394ef90a2e 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -11007,6 +11007,13 @@ def cmd_dashboard_register(args): _impl(args) +def cmd_gateway_enroll(args): + """Enroll a self-hosted gateway with a relay connector.""" + from hermes_cli.gateway_enroll import cmd_gateway_enroll as _impl + + _impl(args) + + def cmd_completion(args, parser=None): """Print shell completion script.""" from hermes_cli.completion import generate_bash, generate_zsh, generate_fish @@ -11699,7 +11706,9 @@ def main(): # ========================================================================= # gateway + proxy commands (parsers built in hermes_cli/subcommands/gateway.py) # ========================================================================= - build_gateway_parser(subparsers, cmd_gateway=cmd_gateway, cmd_proxy=cmd_proxy) + build_gateway_parser( + subparsers, cmd_gateway=cmd_gateway, cmd_proxy=cmd_proxy, cmd_gateway_enroll=cmd_gateway_enroll + ) # ========================================================================= # lsp command diff --git a/hermes_cli/subcommands/gateway.py b/hermes_cli/subcommands/gateway.py index ed199fac560..9eef316ece0 100644 --- a/hermes_cli/subcommands/gateway.py +++ b/hermes_cli/subcommands/gateway.py @@ -29,7 +29,9 @@ def _add_compat_platform_flag(parser: argparse.ArgumentParser) -> None: ) -def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callable) -> None: +def build_gateway_parser( + subparsers, *, cmd_gateway: Callable, cmd_proxy: Callable, cmd_gateway_enroll: Callable +) -> None: """Attach the ``gateway`` and ``proxy`` subcommands to ``subparsers``.""" # ========================================================================= # gateway command @@ -236,6 +238,52 @@ def build_gateway_parser(subparsers, *, cmd_gateway: Callable, cmd_proxy: Callab help="Skip the confirmation prompt", ) + # gateway enroll — enroll a self-hosted gateway with a 
relay connector + # (connector⇄gateway auth). Redeems a single-use enrollment token for the + # per-gateway secret + per-tenant delivery key and writes them to .env. + # See docs/relay-connector-contract.md (and the connector repo's + # docs/connector-gateway-auth-design.md). EXPERIMENTAL. + gateway_enroll = gateway_subparsers.add_parser( + "enroll", + help="Enroll this gateway with a relay connector (writes relay auth creds to .env)", + description=( + "Redeem a single-use enrollment token with a relay connector. " + "Authenticates as your Nous Portal account (the connector derives the " + "authoritative tenant from it), mints this gateway's per-gateway secret " + "and per-tenant delivery key, and writes GATEWAY_RELAY_ID / " + "GATEWAY_RELAY_SECRET / GATEWAY_RELAY_DELIVERY_KEY into ~/.hermes/.env. " + "Requires being logged in (hermes setup). Not available in managed installs." + ), + ) + gateway_enroll.add_argument( + "--token", + default=None, + help=( + "The single-use enrollment token from the connector (delivered with " + "your gateway config). Also settable via GATEWAY_RELAY_ENROLL_TOKEN." + ), + ) + gateway_enroll.add_argument( + "--connector-url", + dest="connector_url", + default=None, + help=( + "The connector base/relay URL, e.g. wss://connector.example.com/relay " + "or https://connector.example.com. Also settable via GATEWAY_RELAY_URL " + "/ gateway.relay_url in config.yaml." + ), + ) + gateway_enroll.add_argument( + "--gateway-id", + dest="gateway_id", + default=None, + help=( + "A stable id for this gateway instance (kill-switch granularity). " + "Defaults to gw-." + ), + ) + gateway_enroll.set_defaults(func=cmd_gateway_enroll) + # ========================================================================= # proxy command — local OpenAI-compatible proxy that attaches the user's # OAuth-authenticated provider credentials to outbound requests. 
Lets diff --git a/tests/gateway/relay/test_auth.py b/tests/gateway/relay/test_auth.py new file mode 100644 index 00000000000..96450893ae1 --- /dev/null +++ b/tests/gateway/relay/test_auth.py @@ -0,0 +1,167 @@ +"""Unit tests for gateway/relay/auth.py — the gateway-side relay auth primitives. + +Two layers: + +1. **Self-consistency** — make_token/verify_token round-trip, delivery-signature + verify, rotation verify list, tamper + skew + expiry rejection. +2. **Cross-implementation conformance** — frozen vectors generated by the + connector's TypeScript (``src/core/relayAuthToken.ts`` ``makeToken``/``sign``) + are reproduced byte-for-byte by the Python port. If the connector ever + changes its wire scheme, these vectors must be regenerated in lockstep + (and that is the point — the test fails loudly on drift). Regenerate with: + + node -e 'import("./dist/core/relayAuthToken.js").then(m=>{ \ + const s="00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"; \ + console.log(m.makeToken("gw-instance-1", s, 0)); \ + console.log(m.sign("1750000000."+JSON.stringify({a:1}), s)); })' +""" + +from __future__ import annotations + +import json + +from gateway.relay.auth import ( + DELIVERY_SIG_HEADER, + DELIVERY_TS_HEADER, + make_token, + make_upgrade_token, + sign, + verify_delivery_signature, + verify_signature, + verify_token, +) + +# A fixed 256-bit hex secret used for the frozen connector vectors below. +_SECRET = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" + +# ── Frozen vectors produced by the connector's TypeScript (relayAuthToken.ts). +# Generated via dist/core/relayAuthToken.js makeToken/sign; see module docstring. +_CONN_TOKEN = "Z3ctaW5zdGFuY2UtMTowOjM3YWE3YjE0NWU4NzY0ZDQwM2JhOWM2MzlmMjMwZGQ2M2RlOGVkOTliODhmZWQzNmFhMDI2MjVhOGE3ZTM1NjQ" +# The EXACT bytes the connector signed: JS JSON.stringify emits compact JSON +# (no spaces). 
The gateway verifies over the literal received body, so the +# vector is the compact form — NOT Python's spaced json.dumps default. This is +# the raw-byte-preservation discipline (a single differing byte breaks the HMAC). +_CONN_BODY = '{"type":"message","event":{"text":"hi","source":{"chat_id":"c1"}}}' +_CONN_TS = 1750000000 +_CONN_SIG = "ac9509c8dae52b5590f06378260877334ff1adc4b1c96bafa4b514165fae6dc6" + + +# ── Self-consistency ────────────────────────────────────────────────────── + + +def test_token_round_trip_no_expiry(): + tok = make_token("payload-123", _SECRET, 0) + assert verify_token(tok, [_SECRET]) == "payload-123" + + +def test_token_payload_may_contain_colons(): + # verify_token must split from the right so a colon-bearing payload survives. + payload = "agent:main:discord:group:chanA" + tok = make_token(payload, _SECRET, 0) + assert verify_token(tok, [_SECRET]) == payload + + +def test_upgrade_token_is_make_token_of_gateway_id(): + assert make_upgrade_token("gw-1", _SECRET, 0) == make_token("gw-1", _SECRET, 0) + + +def test_token_wrong_secret_rejected(): + tok = make_token("p", _SECRET, 0) + assert verify_token(tok, ["deadbeef" * 8]) is None + + +def test_token_expired_rejected(): + # ttl in the past -> exp < now -> rejected. + tok = make_token("p", _SECRET, ttl_seconds=1) + # Force expiry by signing with a manual past exp via the low-level helper. + # Simpler: a 1s ttl token is still valid now; instead assert a clearly-old one. + # Build an already-expired token by hand using the same scheme. + import base64 + + signed = "p:1" # exp=1 (1970) -> long past + sig = sign(signed, _SECRET) + raw = f"{signed}:{sig}".encode() + expired = base64.urlsafe_b64encode(raw).decode().rstrip("=") + assert verify_token(expired, [_SECRET]) is None + # And the fresh one is accepted. + assert verify_token(tok, [_SECRET]) == "p" + + +def test_token_rotation_verify_list(): + # A token signed with the (old) secondary still verifies during rotation. 
+ old, new = _SECRET, "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100" + tok_old = make_token("p", old, 0) + assert verify_token(tok_old, [new, old]) == "p" # primary=new, secondary=old + assert verify_token(tok_old, [new]) is None + + +def test_token_garbage_rejected(): + assert verify_token("not-base64url!!!", [_SECRET]) is None + assert verify_token("", [_SECRET]) is None + + +def test_verify_signature_constant_time_multi_secret(): + payload = "1700000000.body" + s = sign(payload, _SECRET) + assert verify_signature(payload, s, ["wrong", _SECRET]) is True + assert verify_signature(payload, s, ["wrong"]) is False + assert verify_signature(payload, "zz", [_SECRET]) is False # bad hex + + +# ── Delivery signature (connector -> gateway inbound) ────────────────────── + + +def test_delivery_signature_accepts_valid(): + body = json.dumps({"type": "message", "event": {"text": "x"}}) + ts = 1700000000 + s = sign(f"{ts}.{body}", _SECRET) + assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts) is True + + +def test_delivery_signature_tamper_rejected(): + body = json.dumps({"type": "message", "event": {"text": "x"}}) + ts = 1700000000 + s = sign(f"{ts}.{body}", _SECRET) + # A single changed body byte breaks the HMAC. + assert verify_delivery_signature(body + " ", str(ts), s, [_SECRET], now=ts) is False + + +def test_delivery_signature_skew_rejected(): + body = "{}" + ts = 1700000000 + s = sign(f"{ts}.{body}", _SECRET) + # Beyond the 300s replay window in either direction. 
+ assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 301) is False + assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts - 301) is False + assert verify_delivery_signature(body, str(ts), s, [_SECRET], now=ts + 299) is True + + +def test_delivery_signature_missing_headers_rejected(): + assert verify_delivery_signature("{}", None, "abc", [_SECRET]) is False + assert verify_delivery_signature("{}", "1700000000", None, [_SECRET]) is False + assert verify_delivery_signature("{}", "not-an-int", "abc", [_SECRET]) is False + + +def test_delivery_headers_match_connector_names(): + # The gateway reads exactly the header names the connector writes. + assert DELIVERY_TS_HEADER == "x-relay-timestamp" + assert DELIVERY_SIG_HEADER == "x-relay-signature" + + +# ── Cross-implementation conformance (frozen connector vectors) ──────────── + + +def test_python_make_token_matches_connector_byte_for_byte(): + assert make_token("gw-instance-1", _SECRET, 0) == _CONN_TOKEN + + +def test_python_verifies_connector_token(): + assert verify_token(_CONN_TOKEN, [_SECRET]) == "gw-instance-1" + + +def test_python_sign_matches_connector_delivery_sig(): + assert sign(f"{_CONN_TS}.{_CONN_BODY}", _SECRET) == _CONN_SIG + + +def test_python_verifies_connector_delivery_signature(): + assert verify_delivery_signature(_CONN_BODY, str(_CONN_TS), _CONN_SIG, [_SECRET], now=_CONN_TS) is True diff --git a/tests/gateway/relay/test_inbound_receiver.py b/tests/gateway/relay/test_inbound_receiver.py new file mode 100644 index 00000000000..076fc3c9528 --- /dev/null +++ b/tests/gateway/relay/test_inbound_receiver.py @@ -0,0 +1,150 @@ +"""Unit tests for gateway/relay/inbound_receiver.py. + +Covers the verify-then-dispatch core (handle_raw): a correctly-signed message +delivery is verified + dispatched; an interrupt delivery routes to the interrupt +handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed +JSON is 400. 
Signatures are produced with the SAME auth primitives the connector +uses (gateway/relay/auth.py sign), so this exercises the real verify path. +""" + +from __future__ import annotations + +import json +import time + +import pytest + +from gateway.relay.auth import sign +from gateway.relay.inbound_receiver import InboundDeliveryReceiver + +_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" + + +def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]: + """Serialize compactly (as the connector's JSON.stringify does), sign it.""" + body = json.dumps(body_obj, separators=(",", ":")) + raw = body.encode("utf-8") + t = ts if ts is not None else int(time.time()) + return raw, str(t), sign(f"{t}.{body}", key) + + +def _receiver(**kw): + received: list = [] + interrupts: list = [] + + async def on_message(ev): + received.append(ev) + + async def on_interrupt(sk, chat): + interrupts.append((sk, chat)) + + r = InboundDeliveryReceiver( + delivery_key_verify_list=lambda: [_KEY], + on_message=on_message, + on_interrupt=on_interrupt, + **kw, + ) + return r, received, interrupts + + +@pytest.mark.asyncio +async def test_valid_message_delivery_dispatched(): + r, received, _ = _receiver() + raw, ts, sig = _signed( + { + "type": "message", + "event": { + "text": "hello", + "message_type": "text", + "source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"}, + }, + } + ) + status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) + assert status == 200 and body == {"ok": True} + assert len(received) == 1 + assert received[0].text == "hello" + assert received[0].source.guild_id == "guildA" + + +@pytest.mark.asyncio +async def test_valid_interrupt_delivery_routes_to_interrupt_handler(): + r, _, interrupts = _receiver() + raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"}) + status, _ = await 
r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True) + assert status == 200 + assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u" + + +@pytest.mark.asyncio +async def test_tampered_body_rejected_401(): + r, received, _ = _receiver() + raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) + status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False) + assert status == 401 + assert received == [] + + +@pytest.mark.asyncio +async def test_unsigned_rejected_401(): + r, _, _ = _receiver() + raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) + status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False) + assert status == 401 + + +@pytest.mark.asyncio +async def test_expired_timestamp_rejected_401(): + r, _, _ = _receiver(max_skew_seconds=300) + raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1) + # ts=1 (1970) is far outside the 300s window vs now. 
+ status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False) + assert status == 401 + + +@pytest.mark.asyncio +async def test_wrong_key_rejected_401(): + r, _, _ = _receiver() + other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100" + raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other) + status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) + assert status == 401 + + +@pytest.mark.asyncio +async def test_no_delivery_key_fails_closed_401(): + async def on_message(ev): + pass + + r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message) + raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) + status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) + assert status == 401 + + +@pytest.mark.asyncio +async def test_rotation_secondary_key_accepted(): + new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + received: list = [] + + async def on_message(ev): + received.append(ev) + + # Connector still signs with the OLD key (secondary); verify list has both. + r = InboundDeliveryReceiver( + delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message + ) + raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY) + status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) + assert status == 200 and len(received) == 1 + + +@pytest.mark.asyncio +async def test_malformed_json_after_valid_signature_is_400(): + r, _, _ = _receiver() + # Sign a non-JSON body so the signature passes but json.loads fails. 
+ raw = b"not json at all" + ts = str(int(time.time())) + sig = sign(f"{ts}.{raw.decode()}", _KEY) + status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) + assert status == 400 diff --git a/tests/gateway/relay/test_relay_sheds_crypto.py b/tests/gateway/relay/test_relay_sheds_crypto.py index a217f83af83..f2e0810af4a 100644 --- a/tests/gateway/relay/test_relay_sheds_crypto.py +++ b/tests/gateway/relay/test_relay_sheds_crypto.py @@ -47,6 +47,19 @@ def _relay_py_files() -> list[Path]: return sorted(_RELAY_PKG.glob("*.py")) +# ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS +# upgrade bearer + inbound-delivery signature verification). ``inbound_receiver.py`` +# is the signed-inbound-delivery receiver that USES that channel auth to verify +# connector→gateway POSTs. Both are net-new, intended, and the whole point of +# authenticating an untrusted/disposable gateway — they are NOT platform crypto. +# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any +# platform's signing secret), so they are exempt from the platform-crypto symbol +# scan below. The module-import ban (platform-crypto modules) still applies to +# every file including these — they import only stdlib hmac/hashlib and each +# other, never a platform-crypto module, so they stay clean there. +_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"} + + def test_relay_package_imports_no_platform_crypto(): """No module in gateway/relay imports a platform-crypto / verification module.""" offenders: list[str] = [] @@ -72,9 +85,19 @@ def test_relay_package_imports_no_platform_crypto(): def test_relay_package_calls_no_signature_verification(): - """No relay module references a signature/crypto-verification symbol by name.""" + """No relay module references a PLATFORM signature/crypto-verification symbol. + + Scoped to platform crypto (Discord ed25519, Twilio/WeCom HMAC, webhook + signature checks). 
The connector⇄gateway channel authenticator (``auth.py``) + is exempt: its HMAC is over the connector's own per-gateway/per-tenant + secrets to authenticate the relay channel itself — the gateway holds NO + platform secret and re-validates NO platform payload. See ``auth.py`` and + docs/connector-gateway-auth-design.md. + """ offenders: list[str] = [] for path in _relay_py_files(): + if path.name in _CHANNEL_AUTH_FILES: + continue for lineno, line in enumerate(path.read_text(encoding="utf-8").splitlines(), 1): # Skip comments / docstrings-as-prose: only flag code-like usage. stripped = line.strip() @@ -89,3 +112,30 @@ def test_relay_package_calls_no_signature_verification(): + "\n ".join(offenders) + "\nThe connector verifies at the edge; the gateway re-validates nothing." ) + + +def test_channel_auth_uses_only_stdlib_crypto_not_platform_modules(): + """auth.py (channel authenticator) imports only stdlib crypto, no platform crypto. + + Positive guard: the connector⇄gateway channel auth is allowed to do HMAC, + but it must do so with stdlib primitives over connector-owned secrets — it + must never reach for a platform-crypto module. This keeps the exemption + above honest (auth.py can't smuggle in platform verification). + """ + auth_py = _RELAY_PKG / "auth.py" + assert auth_py.is_file(), "gateway/relay/auth.py (channel authenticator) is missing" + tree = ast.parse(auth_py.read_text(encoding="utf-8"), filename=str(auth_py)) + imported: list[str] = [] + for node in ast.walk(tree): + if isinstance(node, ast.Import): + imported += [a.name for a in node.names] + elif isinstance(node, ast.ImportFrom): + imported.append(node.module or "") + # No platform-crypto module import. + assert not [m for m in imported if any(tok in m for tok in _FORBIDDEN_MODULE_TOKENS)], ( + f"auth.py must not import platform crypto; imports={imported}" + ) + # It does use stdlib hmac/hashlib (that's how it authenticates the channel). 
+ assert "hmac" in imported and "hashlib" in imported, ( + f"auth.py should authenticate the channel with stdlib hmac/hashlib; imports={imported}" + ) diff --git a/tests/hermes_cli/test_gateway.py b/tests/hermes_cli/test_gateway.py index 38a65101279..8f9e49c4957 100644 --- a/tests/hermes_cli/test_gateway.py +++ b/tests/hermes_cli/test_gateway.py @@ -305,6 +305,7 @@ def test_gateway_run_force_flag_survives_parser_extraction(): subparsers, cmd_gateway=lambda _args: None, cmd_proxy=lambda _args: None, + cmd_gateway_enroll=lambda _args: None, ) args = parser.parse_args(["gateway", "run", "--force"]) diff --git a/tests/hermes_cli/test_subcommands_profile_gateway.py b/tests/hermes_cli/test_subcommands_profile_gateway.py index 99483a0c5d3..4040c1f67ae 100644 --- a/tests/hermes_cli/test_subcommands_profile_gateway.py +++ b/tests/hermes_cli/test_subcommands_profile_gateway.py @@ -20,6 +20,10 @@ def _h_proxy(args): # pragma: no cover - identity only return "proxy" +def _h_gateway_enroll(args): # pragma: no cover - identity only + return "gateway_enroll" + + def _h_profile(args): # pragma: no cover - identity only return "profile" @@ -34,7 +38,12 @@ def _profile_parser(): def _gateway_parser(): p = argparse.ArgumentParser(prog="hermes") sub = p.add_subparsers(dest="command") - build_gateway_parser(sub, cmd_gateway=_h_gateway, cmd_proxy=_h_proxy) + build_gateway_parser( + sub, + cmd_gateway=_h_gateway, + cmd_proxy=_h_proxy, + cmd_gateway_enroll=_h_gateway_enroll, + ) return p @@ -90,3 +99,25 @@ def test_gateway_lifecycle_accepts_legacy_platform_flag(): assert ns.gateway_command == action assert ns.platform == "photon" assert ns.func is _h_gateway + + +def test_gateway_enroll_dispatch(): + p = _gateway_parser() + ns = p.parse_args( + [ + "gateway", + "enroll", + "--token", + "tok", + "--connector-url", + "wss://connector.example.com/relay", + "--gateway-id", + "gw-1", + ] + ) + assert ns.command == "gateway" + assert ns.gateway_command == "enroll" + assert ns.func is 
_h_gateway_enroll + assert ns.token == "tok" + assert ns.connector_url == "wss://connector.example.com/relay" + assert ns.gateway_id == "gw-1"