feat(relay): handle passthrough_forward over the WS (Phase 5 §5.1, gateway half) (#50702)

The connector half (gateway-gateway) moves the passthrough plane's post-ACK
forward off the HTTP gatewayEndpoint onto the gateway's outbound /relay WS via
a new passthrough_forward frame. This is the gateway side: the relay adapter
now RECEIVES and handles that frame, so a hosted gateway (no public IP) can
process forwarded Class-2/3 traffic (Discord interactions, Twilio) over the
socket it already holds — closing the "passthrough inbound doesn't work for
hosted gateways" gap.

- ws_transport.py: decode the passthrough_forward frame; PassthroughForward
  dataclass + _passthrough_from_wire (base64 body -> exact bytes, byte parity
  with the connector's toPassthroughForward); set_passthrough_handler mirrors
  set_interrupt_inbound_handler.
- transport.py: PassthroughHandler type + set_passthrough_handler on the
  RelayTransport protocol.
- adapter.py: connect() wires the passthrough handler; _on_passthrough decodes
  the (already-sanitized, token-free) forward and, for a Discord interaction,
  converts it to a MessageEvent routed through the normal agent path
  (handle_message) — the reply egresses over the outbound / token-less
  follow_up path, so the gateway never holds the interaction credential. Never
  raises (a bad forward can't kill the read loop). Non-discord forwards (Twilio)
  are logged + dropped for now.
- docs/relay-connector-contract.md: document the passthrough_forward frame +
  PassthroughForward shape + §3.1.

The interaction -> MessageEvent CONVERSION semantics (slash-command vs button
UX, option rendering) are the open sub-design flagged in the spec; the TRANSPORT
+ receive mechanism (this) is settled per Ben's Gate-2 decision: "the relay
adapter handles receiving these events over the WS."

Tests (tests/gateway/relay/test_relay_passthrough.py): byte-preservation
round-trip (+ malformed-body tolerance), connect() wiring, application-command
and message-component interactions route through handle_message with correct
session source + scope capture, malformed/non-discord forwards dropped cleanly.
100 relay tests green. Pairs with the connector PR (gateway-gateway).
This commit is contained in:
Ben Barclay 2026-06-22 20:10:57 +10:00 committed by GitHub
parent ab22317d09
commit 64a507da44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 425 additions and 4 deletions

View file

@ -93,6 +93,16 @@ Frames (connector → gateway, over the WS):
- `{"type":"inbound", "event": <MessageEvent>, "bufferId"?}`
- `{"type":"interrupt_inbound", "session_key", "chat_id"}` (§5)
- `{"type":"passthrough_forward", "forward": <PassthroughForward>, "bufferId"?}` (§5.1)
`PassthroughForward` is the wire form of a forwarded passthrough-plane request
(Class-2/3 webhooks — Discord interactions, Twilio): `{platform, botId, method,
path, headers: [[k,v],…], bodyB64}`. The body is base64-encoded so arbitrary
bytes survive the newline-delimited-JSON transport; the gateway base64-decodes
back to the exact bytes the connector forwarded (the connector already verified
the provider signature and stripped any shared-identity credential at the edge —
§6 — so the gateway re-processes a sanitized, token-free body and acts on it via
the token-less `follow_up` path). See §3.1.
**Trust.** The WS upgrade is authenticated with the gateway's per-gateway secret
(§6.1), so the channel is trusted end to end — inbound frames are not separately
@ -106,9 +116,24 @@ old HTTP path needed). The relay-bus hop is inside the connector trust domain
> every gateway to expose a reachable inbound URL — impossible for hosted
> gateways, which have no public IP. The WS back-channel above replaces it; the
> per-tenant delivery key is retained at provision for forward-compat but is no
> longer used for inbound. `gatewayEndpoint` remains only for the **passthrough
> plane** (Class-2/3 webhooks like Discord interactions / Twilio), which is a
> separate synchronous-forward path and out of scope for this section.
> longer used for inbound. The **passthrough plane** (Class-2/3 webhooks like
> Discord interactions / Twilio) historically still used `gatewayEndpoint` for
> its post-ACK forward; Phase 5 §5.1 moves that forward onto the WS too (the
> `passthrough_forward` frame above), so a hosted gateway needs zero public
> inbound surface and `gatewayEndpoint` is retired once the cutover lands.
### 3.1 Passthrough-plane forward (§5.1)
The passthrough plane answers the provider's latency-critical ACK at the
connector EDGE (e.g. Discord's deferred interaction response within ~3s), then
does a **fire-and-forget** forward of the real request to the gateway. That
forward needs no response back (the provider was already satisfied), so it rides
the same outbound WS as `inbound` via a `passthrough_forward` frame rather than
an HTTP POST. The gateway processes the decoded request through its normal agent
path (a Discord interaction is decoded to a `MessageEvent` and handled like a
message; the reply egresses over the outbound / `follow_up` path). `bufferId` is
present when the forward was buffered (Phase 5 §5.3 buffered-only flip) and the
gateway acks it after durable handoff.

View file

@ -22,9 +22,10 @@ import logging
from typing import Any, Callable, Dict, Optional
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import RelayTransport
from gateway.session import SessionSource
logger = logging.getLogger(__name__)
@ -89,6 +90,13 @@ class RelayAdapter(BasePlatformAdapter):
set_interrupt = getattr(self._transport, "set_interrupt_inbound_handler", None)
if callable(set_interrupt):
set_interrupt(self.on_interrupt)
# Passthrough-plane forwards (Discord interactions, Twilio, …) also ride
# the SAME outbound WS (Phase 5 §5.1) — the connector edge-ACKed and
# forwards the real request here, so a hosted gateway needs no public
# inbound port. Bridge them to the adapter's passthrough handler.
set_passthrough = getattr(self._transport, "set_passthrough_handler", None)
if callable(set_passthrough):
set_passthrough(self._on_passthrough)
ok = await self._transport.connect()
if not ok:
return False
@ -155,6 +163,95 @@ class RelayAdapter(BasePlatformAdapter):
"""
await self.interrupt_session_activity(session_key, chat_id)
async def _on_passthrough(self, forward, buffer_id: Optional[str] = None) -> None:
"""Handle a connector-forwarded passthrough request (Phase 5 §5.1).
The passthrough plane (Discord interactions, Twilio webhooks, ) answers
the provider's latency-critical ACK at the connector EDGE, then forwards
the real, ALREADY-SANITIZED request to this gateway over the outbound WS.
The connector is the trust boundary: it verified the provider signature
at the edge and stripped any shared-identity credential (e.g. a Discord
interaction follow-up token) into its vault so this body carries no
token, and the agent later acts on it via the token-less ``follow_up``
path (``send_follow_up``), never holding the credential.
For a Discord interaction we decode the (JSON) body and convert it to a
normalized ``MessageEvent`` so it flows through the SAME agent path as a
chat message (``handle_message``); the agent's reply egresses over the
normal outbound/follow_up path. Non-JSON or non-interaction forwards are
logged and dropped for now (Twilio/SMS over the relay is a later unit).
NEVER raises: a malformed forward must not kill the read loop.
NOTE (open semantic sub-design, flagged for review): the interaction ->
MessageEvent mapping below is the v1 default. The exact agent UX for a
slash-command / button interaction (vs. a plain message) command name
surfacing, option rendering, deferred-vs-immediate response is the open
piece tracked in the spec; the TRANSPORT + receive mechanism (this whole
path) is settled.
"""
try:
platform = getattr(forward, "platform", "") or ""
if platform == "discord":
event = self._discord_interaction_to_event(forward)
if event is not None:
self._capture_scope(event)
await self.handle_message(event)
return
logger.info(
"relay passthrough_forward dropped (no handler): platform=%s method=%s path=%s",
platform,
getattr(forward, "method", "?"),
getattr(forward, "path", "?"),
)
except Exception: # noqa: BLE001 - a bad forward must never break the reader
logger.warning("relay passthrough_forward handling failed", exc_info=True)
def _discord_interaction_to_event(self, forward):
"""Convert a forwarded Discord interaction body to a MessageEvent, or None.
Builds the session source the same way the connector does for an
interaction (``interactionSessionSource`` on the connector side), so the
agent's session key matches the one the connector bound the follow-up
capability under. Returns None when the body isn't a usable interaction
(e.g. a PING, which the connector already answers at the edge and never
forwards).
"""
import json
from gateway.platforms.base import MessageType
try:
payload = json.loads(bytes(getattr(forward, "body", b"")).decode("utf-8"))
except Exception: # noqa: BLE001
return None
if not isinstance(payload, dict):
return None
# type 1 = PING (answered at the edge, never forwarded); 2 = APPLICATION_COMMAND;
# 3 = MESSAGE_COMPONENT; 5 = MODAL_SUBMIT. Surface a best-effort text.
itype = payload.get("type")
data = payload.get("data") or {}
if itype == 2:
text = str(data.get("name") or "")
elif itype == 3:
text = str(data.get("custom_id") or "")
else:
text = ""
member = payload.get("member") or {}
user = (member.get("user") if isinstance(member, dict) else None) or payload.get("user") or {}
channel_id = str(payload.get("channel_id") or "")
guild_id = payload.get("guild_id")
source = SessionSource(
platform=Platform.RELAY,
chat_id=channel_id,
chat_type="channel" if guild_id else "dm",
user_id=str(user.get("id")) if isinstance(user, dict) and user.get("id") else None,
user_name=str(user.get("username")) if isinstance(user, dict) and user.get("username") else None,
guild_id=str(guild_id) if guild_id else None,
message_id=str(payload.get("id")) if payload.get("id") else None,
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
async def disconnect(self) -> None:
if self._transport is not None:
await self._transport.disconnect()

View file

@ -30,6 +30,13 @@ from gateway.relay.descriptor import CapabilityDescriptor
# Callback the transport invokes for each inbound normalized event.
InboundHandler = Callable[[MessageEvent], Awaitable[None]]
# Callback the transport invokes for each forwarded passthrough request (§5.1).
# The first arg is a PassthroughForward (gateway/relay/ws_transport.py) — typed
# as Any here to keep this protocol module free of a concrete-transport import
# (ws_transport imports FROM this module). The second is an optional bufferId
# (Phase 5 §5.3 buffered flip) the handler acks after durable handoff.
PassthroughHandler = Callable[[Any, Optional[str]], Awaitable[None]]
@runtime_checkable
class RelayTransport(Protocol):
@ -51,6 +58,18 @@ class RelayTransport(Protocol):
"""Register the callback invoked with each inbound MessageEvent."""
...
def set_passthrough_handler(self, handler: "PassthroughHandler") -> None:
"""Register the callback invoked with each forwarded passthrough request.
Phase 5 §5.1: the passthrough plane (Discord interactions, Twilio, )
answers the provider's edge ACK at the connector, then forwards the real
request to the gateway over this same outbound socket (a hosted gateway
has no public inbound port). The transport invokes ``handler(forward,
buffer_id)`` for each ``passthrough_forward`` frame. Optional on a
transport (an in-memory stub may not implement it).
"""
...
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
"""Carry an outbound action (send/edit/typing) to the connector.

View file

@ -33,6 +33,7 @@ import asyncio
import json
import logging
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional
from gateway.platforms.base import MessageEvent, MessageType
@ -128,6 +129,54 @@ def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent:
)
@dataclass
class PassthroughForward:
"""A connector-forwarded passthrough-plane request (Phase 5 §5.1).
The connector answered the provider's latency-critical ACK at its edge, then
forwarded the real (already-sanitized) request to this gateway over the WS.
``body`` is the exact decoded bytes the connector forwarded (the wire carries
it base64-encoded for byte parity). ``headers`` preserve arrival order.
"""
platform: str
bot_id: str
method: str
path: str
headers: list[tuple[str, str]]
body: bytes
def _passthrough_from_wire(raw: Dict[str, Any]) -> PassthroughForward:
"""Rebuild a PassthroughForward from the connector's wire frame.
Mirrors the connector's ``PassthroughForward`` (relay/protocol.ts): the body
is base64-decoded back to the exact bytes the connector forwarded, so the
gateway re-processes byte-identical content (the connector is the trust
boundary; it already verified at the edge).
"""
import base64
body_b64 = raw.get("bodyB64", "") or ""
try:
body = base64.b64decode(body_b64)
except Exception: # noqa: BLE001 - a malformed body must not crash the reader
body = b""
headers_raw = raw.get("headers", []) or []
headers: list[tuple[str, str]] = []
for pair in headers_raw:
if isinstance(pair, (list, tuple)) and len(pair) == 2:
headers.append((str(pair[0]), str(pair[1])))
return PassthroughForward(
platform=str(raw.get("platform", "")),
bot_id=str(raw.get("botId", "")),
method=str(raw.get("method", "")),
path=str(raw.get("path", "")),
headers=headers,
body=body,
)
class WebSocketRelayTransport:
"""RelayTransport over a WebSocket connection the gateway dials to the connector."""
@ -318,6 +367,16 @@ class WebSocketRelayTransport:
handler = getattr(self, "_interrupt_inbound_handler", None)
if handler is not None:
await handler(frame.get("session_key", ""), frame.get("chat_id", ""))
elif ftype == "passthrough_forward":
# Phase 5 §5.1: a forwarded passthrough-plane request (Discord
# interaction, Twilio, …) the connector already edge-ACKed. It rides
# the SAME outbound WS as inbound messages so a hosted gateway needs
# no public inbound port. Dispatch to the adapter's handler; the
# bufferId (when present, §5.3 buffered flip) is passed for ack.
handler = getattr(self, "_passthrough_handler", None)
if handler is not None:
fwd = _passthrough_from_wire(frame.get("forward", {}))
await handler(fwd, frame.get("bufferId"))
else:
# hello/outbound/interrupt are gateway->connector; ignore if echoed.
pass
@ -325,3 +384,12 @@ class WebSocketRelayTransport:
def set_interrupt_inbound_handler(self, handler: Any) -> None:
"""Register the callback for connector->gateway interrupt_inbound frames."""
self._interrupt_inbound_handler = handler
def set_passthrough_handler(self, handler: Any) -> None:
"""Register the callback for connector->gateway passthrough_forward frames.
Mirrors set_interrupt_inbound_handler: the runner/adapter wires this so a
forwarded passthrough request (Phase 5 §5.1) reaches the adapter over the
same outbound WS the gateway already holds. ``handler(forward, buffer_id)``.
"""
self._passthrough_handler = handler

View file

@ -27,6 +27,7 @@ class StubConnector:
self._descriptor = descriptor
self._inbound: Optional[InboundHandler] = None
self._interrupt_inbound: Optional[Any] = None
self._passthrough: Optional[Any] = None
self.connected = False
self.sent: List[Dict[str, Any]] = []
self.interrupts: List[Dict[str, Any]] = []
@ -57,6 +58,12 @@ class StubConnector:
bridge here so connectorgateway interrupt_inbound frames route to it."""
self._interrupt_inbound = handler
def set_passthrough_handler(self, handler: Any) -> None:
"""Mirror the real WS transport: the adapter registers its passthrough
bridge here so connectorgateway passthrough_forward frames route to it
(Phase 5 §5.1)."""
self._passthrough = handler
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.sent.append(action)
if action.get("op") == "send":
@ -85,3 +92,9 @@ class StubConnector:
if self._interrupt_inbound is None:
raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)")
await self._interrupt_inbound(session_key, chat_id)
async def push_passthrough(self, forward: Any, buffer_id: Optional[str] = None) -> None:
"""Simulate the connector forwarding a passthrough request over the WS (§5.1)."""
if self._passthrough is None:
raise RuntimeError("no passthrough handler registered (call adapter.connect first)")
await self._passthrough(forward, buffer_id)

View file

@ -0,0 +1,199 @@
"""Relay passthrough-over-WS forwarding (Phase 5 §5.1).
Proves the gateway side of §5.1: a connector-forwarded passthrough request
(Discord interaction, Twilio, ) arrives over the SAME outbound /relay WS as
inbound messages (a hosted gateway has no public inbound port), and the relay
adapter handles it decoding the byte-preserved body and routing a Discord
interaction through the normal agent path (handle_message).
Mirrors test_relay_interrupt.py's wiring discipline (connect() registers the
connector->gateway handlers on the transport).
"""
from __future__ import annotations
import base64
import json
import pytest
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from gateway.relay.ws_transport import PassthroughForward, _passthrough_from_wire
from tests.gateway.relay.stub_connector import StubConnector
def _desc() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
)
@pytest.fixture
def adapter():
return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc()))
def _interaction_forward(payload: dict) -> PassthroughForward:
body = json.dumps(payload).encode("utf-8")
return PassthroughForward(
platform="discord",
bot_id="appShared",
method="POST",
path="/interactions/discord/appShared",
headers=[("content-type", "application/json")],
body=body,
)
def test_passthrough_from_wire_byte_preserves_body():
"""The wire frame's base64 body decodes back to the exact bytes (parity with
the connector's toPassthroughForward)."""
original = json.dumps({"type": 2, "data": {"name": "ping"}, "guild_id": "g1"}).encode("utf-8")
wire = {
"platform": "discord",
"botId": "appShared",
"method": "POST",
"path": "/interactions/discord/appShared",
"headers": [["content-type", "application/json"]],
"bodyB64": base64.b64encode(original).decode("ascii"),
}
fwd = _passthrough_from_wire(wire)
assert fwd.platform == "discord"
assert fwd.bot_id == "appShared"
assert fwd.body == original
assert fwd.headers == [("content-type", "application/json")]
def test_passthrough_from_wire_tolerates_malformed_body():
"""A non-base64 body must not raise (the reader must never crash)."""
fwd = _passthrough_from_wire({"platform": "x", "bodyB64": "!!!not base64!!!"})
assert fwd.body == b""
@pytest.mark.asyncio
async def test_connect_wires_passthrough_handler_over_ws(adapter):
"""connect() registers the passthrough handler on the transport so a
connector-delivered passthrough_forward frame reaches the adapter."""
await adapter.connect()
stub = adapter._transport
assert stub._passthrough is not None
@pytest.mark.asyncio
async def test_discord_interaction_routes_through_handle_message(adapter, monkeypatch):
"""A forwarded Discord application-command interaction is decoded and routed
through the normal agent path (handle_message) with a correct session source."""
await adapter.connect()
stub = adapter._transport
seen = []
async def fake_handle(event):
seen.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
fwd = _interaction_forward(
{
"id": "interaction-1",
"type": 2, # APPLICATION_COMMAND
"channel_id": "chan-9",
"guild_id": "guild-7",
"data": {"name": "summarize"},
"member": {"user": {"id": "user-3", "username": "ben"}},
}
)
await stub.push_passthrough(fwd, buffer_id=None)
assert len(seen) == 1
ev = seen[0]
assert ev.text == "summarize"
assert ev.source.chat_id == "chan-9"
assert ev.source.guild_id == "guild-7"
assert ev.source.user_id == "user-3"
assert ev.source.chat_type == "channel"
# Scope captured so the agent's reply re-asserts guild_id for egress.
assert adapter._scope_by_chat.get("chan-9") == "guild-7"
@pytest.mark.asyncio
async def test_message_component_interaction_uses_custom_id(adapter, monkeypatch):
"""A MESSAGE_COMPONENT (button) interaction surfaces its custom_id as text."""
await adapter.connect()
stub = adapter._transport
seen = []
async def fake_handle(event):
seen.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
fwd = _interaction_forward(
{
"id": "i2",
"type": 3, # MESSAGE_COMPONENT
"channel_id": "c2",
"guild_id": "g2",
"data": {"custom_id": "approve_btn"},
"member": {"user": {"id": "u2", "username": "x"}},
}
)
await stub.push_passthrough(fwd)
assert len(seen) == 1
assert seen[0].text == "approve_btn"
@pytest.mark.asyncio
async def test_malformed_interaction_body_does_not_raise(adapter, monkeypatch):
"""A non-JSON forward is logged and dropped — never crashes the read loop."""
await adapter.connect()
stub = adapter._transport
called = []
async def fake_handle(event):
called.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
bad = PassthroughForward(
platform="discord",
bot_id="appShared",
method="POST",
path="/x",
headers=[],
body=b"not json",
)
await stub.push_passthrough(bad) # must not raise
assert called == []
@pytest.mark.asyncio
async def test_non_discord_forward_dropped_cleanly(adapter, monkeypatch):
"""A platform with no gateway-side handler yet (e.g. twilio) is dropped, not raised."""
await adapter.connect()
stub = adapter._transport
called = []
async def fake_handle(event):
called.append(event)
monkeypatch.setattr(adapter, "handle_message", fake_handle)
fwd = PassthroughForward(
platform="twilio",
bot_id="bot1",
method="POST",
path="/webhooks/twilio/seg",
headers=[],
body=b"From=+1&Body=hi",
)
await stub.push_passthrough(fwd) # must not raise
assert called == []