feat(relay): WS-only inbound on the gateway adapter (Phase 3) (#48294)

The connector now delivers inbound (messages + interrupts) over the gateway's
OUTBOUND /relay WebSocket, not a signed HTTP POST to an inbound endpoint. The
gateway needs no inbound HTTP port — which is what makes hosted gateways (no
public IP) able to receive inbound at all.

- gateway/relay/adapter.py: connect() wires set_interrupt_inbound_handler(
  self.on_interrupt) so connector->gateway interrupt_inbound frames bridge into
  the existing per-session interrupt path (the inbound message handler was
  already wired). Removed _maybe_start_inbound_receiver() + the _inbound_runner
  lifecycle — there is no HTTP receiver anymore.
- gateway/relay/inbound_receiver.py: deleted (the signed-HTTP InboundDelivery
  receiver).
- gateway/relay/__init__.py: removed relay_inbound_config() (dead with the
  receiver gone). The delivery key is still set in-process by self-provision for
  forward-compat but is no longer consumed for inbound.
- docs/relay-connector-contract.md: §3 rewritten — inbound is the WS back-channel
  routed cross-instance via the connector's relay bus; §5 interrupt + §6 auth
  table updated; the old signed-HTTP-POST + per-tenant-delivery-key-signing path
  is documented as superseded. gatewayEndpoint noted as passthrough-plane only.

Tests: stub_connector grows set_interrupt_inbound_handler + push_interrupt;
new test_relay_interrupt case proves connect() wires BOTH inbound handlers and an
interrupt_inbound frame over the WS cancels the right session. Removed the
HTTP-receiver test; updated the crypto-shedding scan + self-provision delivery-key
assertion. 88 relay tests pass.

EXPERIMENTAL. Pairs with gateway-gateway (relay bus + WsGatewayDelivery) and the
NAS GATEWAY_RELAY_URL stamp. The cross-repo E2E (connector repo) proves the full
multi-instance path against this production adapter code.
This commit is contained in:
Ben Barclay 2026-06-19 09:33:15 +10:00 committed by GitHub
parent 03d9a95a74
commit d2c53ff558
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 117 additions and 476 deletions

View file

@ -26,6 +26,7 @@ class StubConnector:
def __init__(self, descriptor: CapabilityDescriptor) -> None:
self._descriptor = descriptor
self._inbound: Optional[InboundHandler] = None
self._interrupt_inbound: Optional[Any] = None
self.connected = False
self.sent: List[Dict[str, Any]] = []
self.interrupts: List[Dict[str, Any]] = []
@ -51,6 +52,11 @@ class StubConnector:
def set_inbound_handler(self, handler: InboundHandler) -> None:
self._inbound = handler
def set_interrupt_inbound_handler(self, handler: Any) -> None:
"""Mirror the real WS transport: the adapter registers its interrupt
bridge here so connectorgateway interrupt_inbound frames route to it."""
self._interrupt_inbound = handler
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.sent.append(action)
if action.get("op") == "send":
@ -73,3 +79,9 @@ class StubConnector:
if self._inbound is None:
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
await self._inbound(event)
async def push_interrupt(self, session_key: str, chat_id: str) -> None:
"""Simulate the connector delivering an interrupt_inbound over the WS."""
if self._interrupt_inbound is None:
raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)")
await self._interrupt_inbound(session_key, chat_id)

View file

@ -1,150 +0,0 @@
"""Unit tests for gateway/relay/inbound_receiver.py.
Covers the verify-then-dispatch core (handle_raw): a correctly-signed message
delivery is verified + dispatched; an interrupt delivery routes to the interrupt
handler; unsigned/tampered/expired/no-key deliveries are rejected 401; malformed
JSON is 400. Signatures are produced with the SAME auth primitives the connector
uses (gateway/relay/auth.py sign), so this exercises the real verify path.
"""
from __future__ import annotations
import json
import time
import pytest
from gateway.relay.auth import sign
from gateway.relay.inbound_receiver import InboundDeliveryReceiver
_KEY = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
def _signed(body_obj: dict, key: str = _KEY, ts: int | None = None) -> tuple[bytes, str, str]:
"""Serialize compactly (as the connector's JSON.stringify does), sign it."""
body = json.dumps(body_obj, separators=(",", ":"))
raw = body.encode("utf-8")
t = ts if ts is not None else int(time.time())
return raw, str(t), sign(f"{t}.{body}", key)
def _receiver(**kw):
received: list = []
interrupts: list = []
async def on_message(ev):
received.append(ev)
async def on_interrupt(sk, chat):
interrupts.append((sk, chat))
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [_KEY],
on_message=on_message,
on_interrupt=on_interrupt,
**kw,
)
return r, received, interrupts
@pytest.mark.asyncio
async def test_valid_message_delivery_dispatched():
r, received, _ = _receiver()
raw, ts, sig = _signed(
{
"type": "message",
"event": {
"text": "hello",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "chan1", "chat_type": "group", "guild_id": "guildA"},
},
}
)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and body == {"ok": True}
assert len(received) == 1
assert received[0].text == "hello"
assert received[0].source.guild_id == "guildA"
@pytest.mark.asyncio
async def test_valid_interrupt_delivery_routes_to_interrupt_handler():
r, _, interrupts = _receiver()
raw, ts, sig = _signed({"type": "interrupt", "session_key": "agent:main:discord:group:c:u", "reason": "stop"})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=True)
assert status == 200
assert interrupts and interrupts[0][0] == "agent:main:discord:group:c:u"
@pytest.mark.asyncio
async def test_tampered_body_rejected_401():
r, received, _ = _receiver()
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw + b" ", timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
assert received == []
@pytest.mark.asyncio
async def test_unsigned_rejected_401():
r, _, _ = _receiver()
raw, _, _ = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=None, signature=None, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_expired_timestamp_rejected_401():
r, _, _ = _receiver(max_skew_seconds=300)
raw, _, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, ts=1)
# ts=1 (1970) is far outside the 300s window vs now.
status, _ = await r.handle_raw(raw_body=raw, timestamp="1", signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_wrong_key_rejected_401():
r, _, _ = _receiver()
other = "ffeeddccbbaa99887766554433221100ffeeddccbbaa99887766554433221100"
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=other)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_no_delivery_key_fails_closed_401():
async def on_message(ev):
pass
r = InboundDeliveryReceiver(delivery_key_verify_list=lambda: [], on_message=on_message)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}})
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 401
@pytest.mark.asyncio
async def test_rotation_secondary_key_accepted():
new = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
received: list = []
async def on_message(ev):
received.append(ev)
# Connector still signs with the OLD key (secondary); verify list has both.
r = InboundDeliveryReceiver(
delivery_key_verify_list=lambda: [new, _KEY], on_message=on_message
)
raw, ts, sig = _signed({"type": "message", "event": {"text": "x", "source": {"chat_id": "c"}}}, key=_KEY)
status, _ = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 200 and len(received) == 1
@pytest.mark.asyncio
async def test_malformed_json_after_valid_signature_is_400():
r, _, _ = _receiver()
# Sign a non-JSON body so the signature passes but json.loads fails.
raw = b"not json at all"
ts = str(int(time.time()))
sig = sign(f"{ts}.{raw.decode()}", _KEY)
status, body = await r.handle_raw(raw_body=raw, timestamp=ts, signature=sig, is_interrupt=False)
assert status == 400

View file

@ -67,3 +67,23 @@ async def test_outbound_interrupt_reaches_connector(adapter):
assert stub.interrupts == [
{"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
]
@pytest.mark.asyncio
async def test_connect_wires_inbound_interrupt_over_ws(adapter):
"""WS-only inbound: connect() registers BOTH the inbound message handler AND
the interrupt_inbound handler on the transport, so a connector-delivered
interrupt_inbound frame (no HTTP receiver) reaches the right session."""
await adapter.connect()
stub = adapter._transport
# Both connector->gateway handlers are wired post-connect.
assert stub._inbound is not None
assert stub._interrupt_inbound is not None
key = "agent:main:discord:group:chanA:userX"
ev = asyncio.Event()
adapter._active_sessions[key] = ev
# Simulate the connector pushing an interrupt_inbound frame down the WS.
await stub.push_interrupt(key, chat_id="chanA")
assert ev.is_set() is True, "interrupt delivered over the WS must cancel the target turn"

View file

@ -48,16 +48,14 @@ def _relay_py_files() -> list[Path]:
# ``auth.py`` is the connector⇄gateway CHANNEL authenticator (the gateway's WS
# upgrade bearer + inbound-delivery signature verification). ``inbound_receiver.py``
# is the signed-inbound-delivery receiver that USES that channel auth to verify
# connector→gateway POSTs. Both are net-new, intended, and the whole point of
# authenticating an untrusted/disposable gateway — they are NOT platform crypto.
# They use HMAC over the connector's per-gateway / per-tenant secrets (NOT any
# platform's signing secret), so they are exempt from the platform-crypto symbol
# scan below. The module-import ban (platform-crypto modules) still applies to
# every file including these — they import only stdlib hmac/hashlib and each
# other, never a platform-crypto module, so they stay clean there.
_CHANNEL_AUTH_FILES = {"auth.py", "inbound_receiver.py"}
# upgrade bearer). It is net-new, intended, and the whole point of
# authenticating an untrusted/disposable gateway — it is NOT platform crypto.
# It uses HMAC over the connector's per-gateway secret (NOT any platform's
# signing secret), so it is exempt from the platform-crypto symbol scan below.
# The module-import ban (platform-crypto modules) still applies to every file
# including this one — it imports only stdlib hmac/hashlib, never a
# platform-crypto module, so it stays clean there.
_CHANNEL_AUTH_FILES = {"auth.py"}
def test_relay_package_imports_no_platform_crypto():

View file

@ -8,6 +8,8 @@ TRIGGER logic, in-process env wiring, and fail-soft boot behaviour.
from __future__ import annotations
import os
import pytest
import gateway.relay as relay
@ -126,8 +128,9 @@ def test_provisions_and_sets_env_in_process(monkeypatch):
# Creds landed in os.environ (in-process), so register_relay_adapter() reads them.
gid, secret = relay.relay_connection_auth()
assert gid and secret == "a" * 64
key, _host, _port = relay.relay_inbound_config()
assert key == "b" * 64
# The delivery key is persisted in-process too (issued by the connector,
# kept for forward-compat; inbound rides the WS so it isn't consumed).
assert os.environ["GATEWAY_RELAY_DELIVERY_KEY"] == "b" * 64
def test_outbound_only_when_no_endpoint(monkeypatch):