From d2c53ff5583eca0e5f4009a3fcc28c5da8b17fce Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Fri, 19 Jun 2026 09:33:15 +1000 Subject: [PATCH] feat(relay): WS-only inbound on the gateway adapter (Phase 3) (#48294) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The connector now delivers inbound (messages + interrupts) over the gateway's OUTBOUND /relay WebSocket, not a signed HTTP POST to an inbound endpoint. The gateway needs no inbound HTTP port — which is what makes hosted gateways (no public IP) able to receive inbound at all. - gateway/relay/adapter.py: connect() wires set_interrupt_inbound_handler( self.on_interrupt) so connector->gateway interrupt_inbound frames bridge into the existing per-session interrupt path (the inbound message handler was already wired). Removed _maybe_start_inbound_receiver() + the _inbound_runner lifecycle — there is no HTTP receiver anymore. - gateway/relay/inbound_receiver.py: deleted (the signed-HTTP InboundDelivery receiver). - gateway/relay/__init__.py: removed relay_inbound_config() (dead with the receiver gone). The delivery key is still set in-process by self-provision for forward-compat but is no longer consumed for inbound. - docs/relay-connector-contract.md: §3 rewritten — inbound is the WS back-channel routed cross-instance via the connector's relay bus; §5 interrupt + §6 auth table updated; the old signed-HTTP-POST + per-tenant-delivery-key-signing path is documented as superseded. gatewayEndpoint noted as passthrough-plane only. Tests: stub_connector grows set_interrupt_inbound_handler + push_interrupt; new test_relay_interrupt case proves connect() wires BOTH inbound handlers and an interrupt_inbound frame over the WS cancels the right session. Removed the HTTP-receiver test; updated the crypto-shedding scan + self-provision delivery-key assertion. 88 relay tests pass. EXPERIMENTAL. 
Pairs with gateway-gateway (relay bus + WsGatewayDelivery) and the NAS GATEWAY_RELAY_URL stamp. The cross-repo E2E (connector repo) proves the full multi-instance path against this production adapter code. --- docs/relay-connector-contract.md | 89 +++++--- gateway/relay/__init__.py | 41 +--- gateway/relay/adapter.py | 52 +---- gateway/relay/inbound_receiver.py | 204 ------------------ tests/gateway/relay/stub_connector.py | 12 ++ tests/gateway/relay/test_inbound_receiver.py | 150 ------------- tests/gateway/relay/test_relay_interrupt.py | 20 ++ .../gateway/relay/test_relay_sheds_crypto.py | 18 +- tests/gateway/relay/test_self_provision.py | 7 +- 9 files changed, 117 insertions(+), 476 deletions(-) delete mode 100644 gateway/relay/inbound_receiver.py delete mode 100644 tests/gateway/relay/test_inbound_receiver.py diff --git a/docs/relay-connector-contract.md b/docs/relay-connector-contract.md index 39c86a5f839..54fff9406cc 100644 --- a/docs/relay-connector-contract.md +++ b/docs/relay-connector-contract.md @@ -62,33 +62,55 @@ live platform adapter's capability methods. The connector normalizes each platform wire event into a `MessageEvent` (`gateway/platforms/base.py`) and delivers it to the gateway. **Inbound is -delivered over a signed HTTP POST, not the outbound `/relay` WebSocket** (see -the transport note below). The gateway keys the session via `build_session_key()` +delivered over the gateway's OUTBOUND `/relay` WebSocket** (see the transport +note below) — the connector pushes an `inbound` frame down the socket the +gateway already dialed. The gateway keys the session via `build_session_key()` from the embedded `SessionSource` — so populating the right discriminators is the single highest-correctness responsibility of the connector. 
-### Inbound transport (signed HTTP POST, not the outbound WS) +### Inbound transport (WS back-channel, not HTTP) The gateway dials **out** to the connector's `/relay` WebSocket for the -handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound, -however, is delivered the other way: the connector **POSTs** the normalized -event to the gateway's inbound endpoint (`HttpGatewayDelivery` on the connector; -`gateway/relay/inbound_receiver.py` on the gateway). The reason is -multi-instance: the connector instance that owns a platform's socket (and thus -produces inbound events) is generally **not** the instance a given gateway -dialed its outbound WS into, so inbound must target a tenant **endpoint** (which -may load-balance across gateway instances) rather than ride one gateway's -outbound socket. Each delivery is HMAC-signed with the per-tenant **delivery -key** (§6.1); the gateway verifies the signature over the exact raw bytes before -accepting the event. Two POST targets: +handshake + outbound actions (§4) + its own `/stop` egress (§5). Inbound rides +the **same socket** in the other direction: the connector pushes an `inbound` +frame (and `interrupt_inbound` for §5) down the gateway's outbound WS. There is +**no gateway-side inbound HTTP endpoint** — a gateway need not (and, when hosted, +cannot) expose any inbound port; everything flows over the connection it +initiated. + +**Multi-instance routing.** The connector instance that owns a platform's socket +(and thus produces inbound events) is generally **not** the instance the gateway +dialed its outbound WS into. The producing instance therefore publishes the +event on the connector's internal **relay bus** (Redis pub/sub; `RelayBus` in +`src/core/relayBus.ts`) keyed by tenant. 
Every connector instance subscribes and +routes each message to its **local** sessions for that tenant +(`RelayServer.routeBusMessage`); the single instance that actually holds the +gateway's socket delivers it, and instances with no local session for the tenant +no-op. Cross-instance delivery is thus an in-cluster Redis hop, not a public +HTTP call. + +Frames (connector → gateway, over the WS): + +- `{"type":"inbound", "event": <MessageEvent>, "bufferId"?}` +- `{"type":"interrupt_inbound", "session_key", "chat_id"}` (§5) + +**Trust.** The WS upgrade is authenticated with the gateway's per-gateway secret +(§6.1), so the channel is trusted end to end — inbound frames are not separately +HMAC-signed (the authenticated socket subsumes the per-delivery origin proof the +old HTTP path needed). The relay-bus hop is inside the connector trust domain +(same as the lease/buffer/capability stores). + +> Earlier drafts of this contract delivered inbound over a signed **HTTP POST** +> to a `gatewayEndpoint` (`HttpGatewayDelivery` + a gateway-side +> `inbound_receiver`), HMAC-signed with a per-tenant delivery key. That required +> every gateway to expose a reachable inbound URL — impossible for hosted +> gateways, which have no public IP. The WS back-channel above replaces it; the +> per-tenant delivery key is retained at provision for forward-compat but is no +> longer used for inbound. `gatewayEndpoint` remains only for the **passthrough +> plane** (Class-2/3 webhooks like Discord interactions / Twilio), which is a +> separate synchronous-forward path and out of scope for this section. -- `POST {gatewayEndpoint}` → `{"type":"message", "event": <MessageEvent>}` -- `POST {gatewayEndpoint}/interrupt` → `{"type":"interrupt", "session_key", "reason"?}` (§5) -> An earlier draft of this contract delivered inbound over the WS `inbound` -> frame. That only works single-instance and predates the multi-instance -> socket-ownership + channel-auth model; the signed-HTTP path above is the -> shipped design. 
### SessionSource fields (the wire surface) @@ -178,13 +200,15 @@ gateway holds zero capability material). Source of truth: mid-turn `/stop` over the outbound WS. The connector MUST forward it to the gateway instance running that `session_key` (the routing invariant). - **Connector → gateway:** an inbound interrupt for a `session_key` is delivered - as a **signed HTTP POST** to `{gatewayEndpoint}/interrupt` (§3 transport note), - and bridged by the adapter's `on_interrupt(session_key, chat_id)` into the - existing per-session interrupt mechanism, cancelling exactly that turn + as an `interrupt_inbound` frame down the gateway's outbound WS (§3 transport + note) — routed cross-instance via the relay bus to whichever instance holds + the socket — and bridged by the adapter's `on_interrupt(session_key, chat_id)` + into the existing per-session interrupt mechanism, cancelling exactly that turn (siblings untouched). -The gateway→connector `/stop` rides the outbound WS; the connector→gateway -interrupt rides the same signed-HTTP inbound path as a normalized event. +Both directions ride the gateway's outbound WS: the gateway→connector `/stop` +egresses over it, and the connector→gateway interrupt rides the same `inbound` +back-channel as a normalized event. --- @@ -231,20 +255,21 @@ only in transport. See `docs/capability-trust-boundary.md` (connector repo: A2 makes the connector the sole holder of platform secrets while the gateway may be **customer-managed and internet-exposed**, so the connector⇄gateway channel -is itself authenticated. The gateway holds two enrollment-issued credentials -(`hermes gateway enroll` → connector `/relay/enroll`): a **per-gateway secret** -and a **per-tenant delivery key**. Both are HMAC-SHA256 schemes with a -multi-secret rotation verify list (gateway side: `gateway/relay/auth.py`; -connector side: `src/core/relayAuthToken.ts` + `src/core/deliverySigning.ts`). +is itself authenticated. 
The gateway holds an enrollment- or provision-issued +**per-gateway secret** (`hermes gateway enroll` → connector `/relay/enroll`, or +managed self-provision → `/relay/provision`) that authenticates its outbound WS +upgrade. It is an HMAC-SHA256 scheme with a multi-secret rotation verify list +(gateway side: `gateway/relay/auth.py`; connector side: +`src/core/relayAuthToken.ts`). | Leg | Credential | Mechanism | |-----|-----------|-----------| | Gateway → connector WS upgrade | per-gateway secret | An `Authorization` bearer header on the `/relay` upgrade. The token is `base64url(payload:exp:sig)` where `payload = gatewayId` and `sig = HMAC(payload:exp, secret)`. Connector verifies and rejects the upgrade (**close 4401**) on mismatch/absence/revocation. The authenticated tenant comes from the connector's store, never the `hello` frame. | -| Connector → gateway inbound POST | per-tenant delivery key | Two headers: `x-relay-timestamp` (unix seconds) and `x-relay-signature` (hex `HMAC(ts.rawBody, deliveryKey)`). Gateway verifies over the **exact raw bytes** within a ±300s replay window before accepting the event; rejects **401** otherwise. | +| Connector → gateway inbound (`inbound` / `interrupt_inbound` frames) | — (rides the authenticated WS) | Inbound is pushed down the gateway's already-authenticated outbound socket (§3), so no per-message signature is needed. A **per-tenant delivery key** is still issued at enroll/provision and retained for forward-compat, but is no longer used to sign inbound. | This is the **channel** authenticator — distinct from platform crypto, which the relay path still sheds entirely (§6). The gateway holds zero platform secrets; -these two keys authenticate only the connector link. Full threat model + +the per-gateway secret authenticates only the connector link. Full threat model + enrollment/rotation/kill-switch design: `docs/connector-gateway-auth-design.md` (connector repo). 
diff --git a/gateway/relay/__init__.py b/gateway/relay/__init__.py index 421fe0ac240..a0bd4f526ef 100644 --- a/gateway/relay/__init__.py +++ b/gateway/relay/__init__.py @@ -79,40 +79,6 @@ def relay_connection_auth() -> tuple[Optional[str], Optional[str]]: return (gateway_id or None, secret or None) -def relay_inbound_config() -> tuple[Optional[str], Optional[str], int]: - """Resolve (delivery_key, bind_host, bind_port) for the inbound receiver. - - The connector delivers normalized inbound events to this gateway over a - SIGNED HTTP POST (not the outbound WS), verified with the per-tenant delivery - key issued at enrollment (``GATEWAY_RELAY_DELIVERY_KEY``). The receiver only - starts when a delivery key AND a bind port are configured — a gateway with no - public inbound URL (e.g. a purely outbound dev run) simply doesn't run it. - - Env first (Docker), then ``gateway.relay_delivery_key`` / - ``gateway.relay_inbound_host`` / ``gateway.relay_inbound_port`` in config.yaml. - Port 0 (default/unset) -> receiver disabled. - """ - key = os.environ.get("GATEWAY_RELAY_DELIVERY_KEY", "").strip() - host = os.environ.get("GATEWAY_RELAY_INBOUND_HOST", "").strip() - port_raw = os.environ.get("GATEWAY_RELAY_INBOUND_PORT", "").strip() - if not (key and port_raw): - try: - from gateway.run import _load_gateway_config # late import to avoid cycle - - cfg = (_load_gateway_config().get("gateway") or {}) - key = key or str(cfg.get("relay_delivery_key", "") or "").strip() - host = host or str(cfg.get("relay_inbound_host", "") or "").strip() - if not port_raw: - port_raw = str(cfg.get("relay_inbound_port", "") or "").strip() - except Exception: # noqa: BLE001 - config absence/parse must never crash registration - pass - try: - port = int(port_raw) if port_raw else 0 - except ValueError: - port = 0 - return (key or None, host or "0.0.0.0", port) - - def relay_endpoint() -> Optional[str]: """The gateway's own PUBLIC inbound URL, asserted to the connector at provision. 
@@ -318,8 +284,11 @@ def self_provision_if_managed() -> bool: logger.warning("relay self-provision failed (%s); gateway will boot without relay auth", exc) return False - # Set creds in-process so register_relay_adapter() + relay_inbound_config() - # read them from os.environ. Never logged. + # Set creds in-process so register_relay_adapter() reads them from os.environ + # (the per-gateway secret authenticates the outbound WS upgrade). The delivery + # key is still issued by the connector and persisted for forward-compat, but + # inbound now rides the WS (no HTTP receiver), so it is not consumed here. + # Never logged. os.environ["GATEWAY_RELAY_ID"] = str(result.get("gatewayId") or gateway_id) os.environ["GATEWAY_RELAY_SECRET"] = str(result.get("secret") or "") os.environ["GATEWAY_RELAY_DELIVERY_KEY"] = str(result.get("deliveryKey") or "") diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index b64f7abc517..fc4e5f40ee7 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -58,10 +58,6 @@ class RelayAdapter(BasePlatformAdapter): # Capability surface read by stream_consumer (getattr(..., 4096)). self.MAX_MESSAGE_LENGTH = descriptor.max_message_length self.supports_code_blocks = descriptor.markdown_dialect not in ("", "plain") - # Inbound delivery receiver (signed connector→gateway HTTP POSTs). Built - # lazily in connect() when a delivery key + bind port are configured; a - # purely-outbound dev gateway runs without it. See inbound_receiver.py. - self._inbound_runner: Any = None # ── capability surface (from descriptor) ───────────────────────────── @property @@ -80,6 +76,12 @@ class RelayAdapter(BasePlatformAdapter): if self._transport is None: raise RuntimeError("RelayAdapter has no transport configured") self._transport.set_inbound_handler(self._on_inbound) + # Inbound interrupts (connector -> owning gateway) arrive as + # interrupt_inbound frames over the SAME outbound WS; bridge them to the + # adapter's interrupt path. 
WS-only: there is no inbound HTTP receiver. + set_interrupt = getattr(self._transport, "set_interrupt_inbound_handler", None) + if callable(set_interrupt): + set_interrupt(self.on_interrupt) ok = await self._transport.connect() if not ok: return False @@ -92,40 +94,12 @@ class RelayAdapter(BasePlatformAdapter): logger.warning("relay handshake failed: %s", exc) return False self._apply_descriptor(descriptor) - # Start the signed inbound-delivery receiver if configured (the connector - # POSTs normalized events to it over HTTP, verified with the tenant - # delivery key). Non-fatal: a receiver bind failure must not fail the - # outbound connection — the gateway can still send. - await self._maybe_start_inbound_receiver() + # Inbound (messages + interrupts) is delivered over the outbound WS via + # the connector's relay bus — there is NO inbound HTTP endpoint (hosted + # gateways have no public IP). The transport's reader already dispatches + # `inbound` / `interrupt_inbound` frames to the handlers wired above. 
return True - async def _maybe_start_inbound_receiver(self) -> None: - """Start the inbound HTTP receiver when a delivery key + port are set.""" - from gateway.relay import relay_inbound_config - - delivery_key, host, port = relay_inbound_config() - if not (delivery_key and port): - return # no inbound URL configured -> outbound-only gateway - try: - from aiohttp import web - - from gateway.relay.inbound_receiver import InboundDeliveryReceiver - - receiver = InboundDeliveryReceiver( - delivery_key_verify_list=lambda: [delivery_key], - on_message=self._on_inbound, - on_interrupt=self.on_interrupt, - ) - runner = web.AppRunner(receiver.build_app(), access_log=None) - await runner.setup() - site = web.TCPSite(runner, host, port) - await site.start() - self._inbound_runner = runner - logger.info("relay inbound receiver listening on http://%s:%s", host, port) - except Exception as exc: # noqa: BLE001 - inbound bind failure must not kill outbound - logger.warning("relay inbound receiver failed to start: %s", exc) - self._inbound_runner = None - def _apply_descriptor(self, descriptor: CapabilityDescriptor) -> None: """Adopt a (re)negotiated descriptor into the live capability surface.""" self.descriptor = descriptor @@ -148,12 +122,6 @@ class RelayAdapter(BasePlatformAdapter): await self.interrupt_session_activity(session_key, chat_id) async def disconnect(self) -> None: - if self._inbound_runner is not None: - try: - await self._inbound_runner.cleanup() - except Exception: # noqa: BLE001 - best-effort teardown - pass - self._inbound_runner = None if self._transport is not None: await self._transport.disconnect() diff --git a/gateway/relay/inbound_receiver.py b/gateway/relay/inbound_receiver.py deleted file mode 100644 index 733fe38c2c6..00000000000 --- a/gateway/relay/inbound_receiver.py +++ /dev/null @@ -1,204 +0,0 @@ -"""Gateway-side inbound delivery receiver. EXPERIMENTAL. 
- -The connector delivers normalized inbound events to a tenant's gateway over a -**signed HTTP POST** (connector ``src/relay/httpGatewayDelivery.ts``), NOT over -the gateway's outbound ``/relay`` WebSocket: the connector instance that owns a -platform socket is generally not the instance a given gateway dialed out to, so -inbound is delivered to a tenant ENDPOINT (which may load-balance across gateway -instances). Each delivery is HMAC-signed with the per-tenant **delivery key** -(``gateway/relay/auth.py``); this receiver verifies the signature over the EXACT -raw request bytes before accepting the event. - -Two routes (mirroring the connector's two POST targets): - POST {base} {"type":"message", "event": , ...} - POST {base}/interrupt {"type":"interrupt","session_key": ..., "reason"?} - -The receiver: - 1. reads the RAW body bytes (never a reparsed/re-serialized form — the HMAC is - over the literal bytes the connector signed), - 2. verifies ``x-relay-signature`` / ``x-relay-timestamp`` against the delivery - key verify list (primary + secondary during rotation), within the replay - window — rejects 401 on any failure, - 3. parses the JSON and dispatches: a ``message`` to the inbound handler (the - RelayAdapter's ``handle_message`` via the transport's normal path), an - ``interrupt`` to the interrupt handler. - -EXPERIMENTAL: the transport protocol may change without a deprecation cycle -until ≥2 Class-1 platforms validate it. See docs/relay-connector-contract.md. -""" - -from __future__ import annotations - -import json -import logging -from typing import Any, Awaitable, Callable, Optional, Sequence - -from gateway.platforms.base import MessageEvent -from gateway.relay.auth import ( - DELIVERY_SIG_HEADER, - DELIVERY_TS_HEADER, - verify_delivery_signature, -) - -logger = logging.getLogger(__name__) - -# Callbacks the receiver dispatches verified deliveries to. 
-InboundMessageHandler = Callable[[MessageEvent], Awaitable[None]] -InboundInterruptHandler = Callable[[str, str], Awaitable[None]] - -try: # lazy/optional dep — mirrors the other HTTP-receiving adapters - from aiohttp import web -except ImportError: # pragma: no cover - exercised only when the extra is absent - web = None # type: ignore[assignment] - -AIOHTTP_AVAILABLE = web is not None - - -def _event_from_wire(raw: dict) -> MessageEvent: - """Rebuild a MessageEvent from the connector's normalized inbound payload. - - Identical mapping to the WS transport's ``_event_from_wire`` (the wire shape - is the same; only the transport differs). Kept here so the HTTP receiver has - no import dependency on the WS transport module. - """ - from gateway.config import Platform - from gateway.platforms.base import MessageType - from gateway.session import SessionSource - - src = raw.get("source", {}) or {} - platform = src.get("platform", "relay") - try: - platform_enum = Platform(platform) - except ValueError: - platform_enum = Platform.RELAY - - source = SessionSource( - platform=platform_enum, - chat_id=src.get("chat_id", ""), - chat_type=src.get("chat_type", "dm"), - chat_name=src.get("chat_name"), - user_id=src.get("user_id"), - user_name=src.get("user_name"), - thread_id=src.get("thread_id"), - chat_topic=src.get("chat_topic"), - user_id_alt=src.get("user_id_alt"), - chat_id_alt=src.get("chat_id_alt"), - guild_id=src.get("guild_id"), - parent_chat_id=src.get("parent_chat_id"), - message_id=src.get("message_id"), - ) - try: - msg_type = MessageType(raw.get("message_type", "text")) - except ValueError: - msg_type = MessageType.TEXT - - return MessageEvent( - text=raw.get("text", ""), - message_type=msg_type, - source=source, - message_id=raw.get("message_id"), - reply_to_message_id=raw.get("reply_to_message_id"), - media_urls=raw.get("media_urls") or [], - ) - - -class InboundDeliveryReceiver: - """Verifies + dispatches signed connector→gateway inbound deliveries. 
- - Transport-agnostic core: ``handle_raw`` takes the raw body bytes + headers + - which route was hit and returns ``(status, body)``. The aiohttp wiring - (``build_app`` / ``serve``) is a thin shell so the verify+dispatch logic is - unit-testable without a live socket. - """ - - def __init__( - self, - *, - delivery_key_verify_list: Callable[[], Sequence[str]], - on_message: InboundMessageHandler, - on_interrupt: Optional[InboundInterruptHandler] = None, - max_skew_seconds: int = 300, - ) -> None: - # A callable (not a static list) so a rotated delivery key is picked up - # without rebuilding the receiver — mirrors the connector's verify list. - self._verify_list = delivery_key_verify_list - self._on_message = on_message - self._on_interrupt = on_interrupt - self._max_skew_seconds = max_skew_seconds - - async def handle_raw( - self, *, raw_body: bytes, timestamp: Optional[str], signature: Optional[str], is_interrupt: bool - ) -> tuple[int, dict]: - """Verify the signature over ``raw_body`` and dispatch. Returns (status, json). - - 401 on a missing/invalid/expired signature (never dispatches unverified). - 400 on malformed JSON. 200 on a verified, dispatched delivery. - """ - verify_keys = list(self._verify_list() or []) - if not verify_keys: - # No delivery key provisioned -> we cannot verify -> reject. A gateway - # that hasn't enrolled must not accept inbound (fail closed). - logger.warning("relay inbound: no delivery key configured; rejecting") - return 401, {"error": "no delivery key configured"} - - # Verify over the EXACT raw bytes the connector signed. Decode to text - # with the same UTF-8 the connector's JSON.stringify produced; a single - # differing byte breaks the HMAC (raw-body-preservation discipline). 
- body_text = raw_body.decode("utf-8", errors="strict") - if not verify_delivery_signature( - body_text, timestamp, signature, verify_keys, self._max_skew_seconds - ): - return 401, {"error": "invalid delivery signature"} - - try: - payload = json.loads(body_text) - except json.JSONDecodeError: - return 400, {"error": "invalid JSON body"} - - if is_interrupt or payload.get("type") == "interrupt": - session_key = str(payload.get("session_key", "")) - chat_id = str(payload.get("chat_id", "") or payload.get("reason", "") or "") - if self._on_interrupt is not None and session_key: - await self._on_interrupt(session_key, chat_id) - return 200, {"ok": True} - - # Default: a normalized inbound message event. - event_raw = payload.get("event") - if not isinstance(event_raw, dict): - return 400, {"error": "missing event"} - event = _event_from_wire(event_raw) - await self._on_message(event) - return 200, {"ok": True} - - # ── aiohttp wiring (thin shell over handle_raw) ────────────────────── - def build_app(self) -> Any: - """Build an aiohttp Application exposing the delivery + interrupt routes.""" - if not AIOHTTP_AVAILABLE: - raise RuntimeError( - "InboundDeliveryReceiver requires the 'aiohttp' package " - "(install the messaging extra)." - ) - - async def _deliver(request: Any) -> Any: - return await self._respond(request, is_interrupt=False) - - async def _interrupt(request: Any) -> Any: - return await self._respond(request, is_interrupt=True) - - app = web.Application() - app.router.add_get("/healthz", lambda _: web.Response(text="ok")) - app.router.add_post("/", _deliver) - app.router.add_post("/interrupt", _interrupt) - return app - - async def _respond(self, request: Any, *, is_interrupt: bool) -> Any: - # Read the RAW bytes — do NOT use request.json() (it reparses and we'd - # verify over a re-serialized form, breaking the HMAC). 
- raw_body = await request.read() - status, body = await self.handle_raw( - raw_body=raw_body, - timestamp=request.headers.get(DELIVERY_TS_HEADER), - signature=request.headers.get(DELIVERY_SIG_HEADER), - is_interrupt=is_interrupt, - ) - return web.json_response(body, status=status) diff --git a/tests/gateway/relay/stub_connector.py b/tests/gateway/relay/stub_connector.py index 60e79a81a1b..11a97cae53a 100644 --- a/tests/gateway/relay/stub_connector.py +++ b/tests/gateway/relay/stub_connector.py @@ -26,6 +26,7 @@ class StubConnector: def __init__(self, descriptor: CapabilityDescriptor) -> None: self._descriptor = descriptor self._inbound: Optional[InboundHandler] = None + self._interrupt_inbound: Optional[Any] = None self.connected = False self.sent: List[Dict[str, Any]] = [] self.interrupts: List[Dict[str, Any]] = [] @@ -51,6 +52,11 @@ class StubConnector: def set_inbound_handler(self, handler: InboundHandler) -> None: self._inbound = handler + def set_interrupt_inbound_handler(self, handler: Any) -> None: + """Mirror the real WS transport: the adapter registers its interrupt + bridge here so connector→gateway interrupt_inbound frames route to it.""" + self._interrupt_inbound = handler + async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: self.sent.append(action) if action.get("op") == "send": @@ -73,3 +79,9 @@ class StubConnector: if self._inbound is None: raise RuntimeError("no inbound handler registered (call adapter.connect first)") await self._inbound(event) + + async def push_interrupt(self, session_key: str, chat_id: str) -> None: + """Simulate the connector delivering an interrupt_inbound over the WS.""" + if self._interrupt_inbound is None: + raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)") + await self._interrupt_inbound(session_key, chat_id) diff --git a/tests/gateway/relay/test_inbound_receiver.py b/tests/gateway/relay/test_inbound_receiver.py deleted file mode 100644 index 
076fc3c9528..00000000000 --- a/tests/gateway/relay/test_inbound_receiver.py +++ /dev/null @@ -1,150 +0,0 @@ -"""Unit tests for gateway/relay/inbound_receiver.py. - -Covers the verify-then-dispatch core (handle_raw): a correctly-signed message -delivery is verified + dispatched; an interrupt delivery routes to the interrupt -handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed -JSON is 400. Signatures are produced with the SAME auth primitives the connector -uses (gateway/relay/auth.py sign), so this exercises the real verify path. -""" - -from __future__ import annotations - -import json -import time - -import pytest - -from gateway.relay.auth import sign -from gateway.relay.inbound_receiver import InboundDeliveryReceiver - -_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff" - - -def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]: - """Serialize compactly (as the connector's JSON.stringify does), sign it.""" - body = json.dumps(body_obj, separators=(",", ":")) - raw = body.encode("utf-8") - t = ts if ts is not None else int(time.time()) - return raw, str(t), sign(f"{t}.{body}", key) - - -def _receiver(**kw): - received: list = [] - interrupts: list = [] - - async def on_message(ev): - received.append(ev) - - async def on_interrupt(sk, chat): - interrupts.append((sk, chat)) - - r = InboundDeliveryReceiver( - delivery_key_verify_list=lambda: [_KEY], - on_message=on_message, - on_interrupt=on_interrupt, - **kw, - ) - return r, received, interrupts - - -@pytest.mark.asyncio -async def test_valid_message_delivery_dispatched(): - r, received, _ = _receiver() - raw, ts, sig = _signed( - { - "type": "message", - "event": { - "text": "hello", - "message_type": "text", - "source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"}, - }, - } - ) - status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) - 
assert status == 200 and body == {"ok": True} - assert len(received) == 1 - assert received[0].text == "hello" - assert received[0].source.guild_id == "guildA" - - -@pytest.mark.asyncio -async def test_valid_interrupt_delivery_routes_to_interrupt_handler(): - r, _, interrupts = _receiver() - raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"}) - status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True) - assert status == 200 - assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u" - - -@pytest.mark.asyncio -async def test_tampered_body_rejected_401(): - r, received, _ = _receiver() - raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) - status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False) - assert status == 401 - assert received == [] - - -@pytest.mark.asyncio -async def test_unsigned_rejected_401(): - r, _, _ = _receiver() - raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) - status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False) - assert status == 401 - - -@pytest.mark.asyncio -async def test_expired_timestamp_rejected_401(): - r, _, _ = _receiver(max_skew_seconds=300) - raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1) - # ts=1 (1970) is far outside the 300s window vs now. 
- status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False) - assert status == 401 - - -@pytest.mark.asyncio -async def test_wrong_key_rejected_401(): - r, _, _ = _receiver() - other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100" - raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other) - status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) - assert status == 401 - - -@pytest.mark.asyncio -async def test_no_delivery_key_fails_closed_401(): - async def on_message(ev): - pass - - r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message) - raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}) - status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) - assert status == 401 - - -@pytest.mark.asyncio -async def test_rotation_secondary_key_accepted(): - new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" - received: list = [] - - async def on_message(ev): - received.append(ev) - - # Connector still signs with the OLD key (secondary); verify list has both. - r = InboundDeliveryReceiver( - delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message - ) - raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY) - status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) - assert status == 200 and len(received) == 1 - - -@pytest.mark.asyncio -async def test_malformed_json_after_valid_signature_is_400(): - r, _, _ = _receiver() - # Sign a non-JSON body so the signature passes but json.loads fails. 
- raw = b"not json at all" - ts = str(int(time.time())) - sig = sign(f"{ts}.{raw.decode()}", _KEY) - status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False) - assert status == 400 diff --git a/tests/gateway/relay/test_relay_interrupt.py b/tests/gateway/relay/test_relay_interrupt.py index 49b6d8607ed..10f34308cf8 100644 --- a/tests/gateway/relay/test_relay_interrupt.py +++ b/tests/gateway/relay/test_relay_interrupt.py @@ -67,3 +67,23 @@ async def test_outbound_interrupt_reaches_connector(adapter): assert stub.interrupts == [ {"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"} ] + + +@pytest.mark.asyncio +async def test_connect_wires_inbound_interrupt_over_ws(adapter): + """WS-only inbound: connect() registers BOTH the inbound message handler AND + the interrupt_inbound handler on the transport, so a connector-delivered + interrupt_inbound frame (no HTTP receiver) reaches the right session.""" + await adapter.connect() + stub = adapter._transport + # Both connector->gateway handlers are wired post-connect. + assert stub._inbound is not None + assert stub._interrupt_inbound is not None + + key = "agent:main:discord:group:chanA:userX" + ev = asyncio.Event() + adapter._active_sessions[key] = ev + + # Simulate the connector pushing an interrupt_inbound frame down the WS. + await stub.push_interrupt(key, chat_id="chanA") + assert ev.is_set() is True, "interrupt delivered over the WS must cancel the target turn" diff --git a/tests/gateway/relay/test_relay_sheds_crypto.py b/tests/gateway/relay/test_relay_sheds_crypto.py index f2e0810af4a..4af7d7368ba 100644 --- a/tests/gateway/relay/test_relay_sheds_crypto.py +++ b/tests/gateway/relay/test_relay_sheds_crypto.py @@ -48,16 +48,14 @@ def _relay_py_files() -> list[Path]: # ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS -# upgrade bearer + inbound-delivery signature verification). 
``inbound_receiver.py`` -# is the signed-inbound-delivery receiver that USES that channel auth to verify -# connector→gateway POSTs. Both are net-new, intended, and the whole point of -# authenticating an untrusted/disposable gateway — they are NOT platform crypto. -# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any -# platform's signing secret), so they are exempt from the platform-crypto symbol -# scan below. The module-import ban (platform-crypto modules) still applies to -# every file including these — they import only stdlib hmac/hashlib and each -# other, never a platform-crypto module, so they stay clean there. -_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"} +# upgrade bearer). It is net-new, intended, and the whole point of +# authenticating an untrusted/disposable gateway — it is NOT platform crypto. +# It uses HMAC over the connector's per-gateway secret (NOT any platform's +# signing secret), so it is exempt from the platform-crypto symbol scan below. +# The module-import ban (platform-crypto modules) still applies to every file +# including this one — it imports only stdlib hmac/hashlib, never a +# platform-crypto module, so it stays clean there. +_CHANNEL_AUTH_FILES = {"auth.py"} def test_relay_package_imports_no_platform_crypto(): diff --git a/tests/gateway/relay/test_self_provision.py b/tests/gateway/relay/test_self_provision.py index 4b4a6070e7e..7a379eb5c3b 100644 --- a/tests/gateway/relay/test_self_provision.py +++ b/tests/gateway/relay/test_self_provision.py @@ -8,6 +8,8 @@ TRIGGER logic, in-process env wiring, and fail-soft boot behaviour. from __future__ import annotations +import os + import pytest import gateway.relay as relay @@ -126,8 +128,9 @@ def test_provisions_and_sets_env_in_process(monkeypatch): # Creds landed in os.environ (in-process), so register_relay_adapter() reads them. 
gid, secret = relay.relay_connection_auth() assert gid and secret == "a" * 64 - key, _host, _port = relay.relay_inbound_config() - assert key == "b" * 64 + # The delivery key is persisted in-process too (issued by the connector, + # kept for forward-compat; inbound rides the WS so it isn't consumed). + assert os.environ["GATEWAY_RELAY_DELIVERY_KEY"] == "b" * 64 def test_outbound_only_when_no_endpoint(monkeypatch):