diff --git a/docs/relay-connector-contract.md b/docs/relay-connector-contract.md index 54fff9406cc..4e20726197f 100644 --- a/docs/relay-connector-contract.md +++ b/docs/relay-connector-contract.md @@ -93,6 +93,16 @@ Frames (connector → gateway, over the WS): - `{"type":"inbound", "event": , "bufferId"?}` - `{"type":"interrupt_inbound", "session_key", "chat_id"}` (§5) +- `{"type":"passthrough_forward", "forward": , "bufferId"?}` (§5.1) + +`PassthroughForward` is the wire form of a forwarded passthrough-plane request +(Class-2/3 webhooks — Discord interactions, Twilio): `{platform, botId, method, +path, headers: [[k,v],…], bodyB64}`. The body is base64-encoded so arbitrary +bytes survive the newline-delimited-JSON transport; the gateway base64-decodes +back to the exact bytes the connector forwarded (the connector already verified +the provider signature and stripped any shared-identity credential at the edge — +§6 — so the gateway re-processes a sanitized, token-free body and acts on it via +the token-less `follow_up` path). See §3.1. **Trust.** The WS upgrade is authenticated with the gateway's per-gateway secret (§6.1), so the channel is trusted end to end — inbound frames are not separately @@ -106,9 +116,24 @@ old HTTP path needed). The relay-bus hop is inside the connector trust domain > every gateway to expose a reachable inbound URL — impossible for hosted > gateways, which have no public IP. The WS back-channel above replaces it; the > per-tenant delivery key is retained at provision for forward-compat but is no -> longer used for inbound. `gatewayEndpoint` remains only for the **passthrough -> plane** (Class-2/3 webhooks like Discord interactions / Twilio), which is a -> separate synchronous-forward path and out of scope for this section. +> longer used for inbound. The **passthrough plane** (Class-2/3 webhooks like +> Discord interactions / Twilio) historically still used `gatewayEndpoint` for +> its post-ACK forward; Phase 5 §5.1 moves that forward onto the WS too (the +> `passthrough_forward` frame above), so a hosted gateway needs zero public +> inbound surface and `gatewayEndpoint` is retired once the cutover lands. + +### 3.1 Passthrough-plane forward (§5.1) + +The passthrough plane answers the provider's latency-critical ACK at the +connector EDGE (e.g. Discord's deferred interaction response within ~3s), then +does a **fire-and-forget** forward of the real request to the gateway. That +forward needs no response back (the provider was already satisfied), so it rides +the same outbound WS as `inbound` via a `passthrough_forward` frame rather than +an HTTP POST. The gateway processes the decoded request through its normal agent +path (a Discord interaction is decoded to a `MessageEvent` and handled like a +message; the reply egresses over the outbound / `follow_up` path). `bufferId` is +present when the forward was buffered (Phase 5 §5.3 buffered-only flip) and the +gateway acks it after durable handoff. diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index a1a7826f8f8..9e44a34b421 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -22,9 +22,10 @@ import logging from typing import Any, Callable, Dict, Optional from gateway.config import Platform, PlatformConfig -from gateway.platforms.base import BasePlatformAdapter, SendResult +from gateway.platforms.base import BasePlatformAdapter, MessageEvent, SendResult from gateway.relay.descriptor import CapabilityDescriptor from gateway.relay.transport import RelayTransport +from gateway.session import SessionSource logger = logging.getLogger(__name__) @@ -89,6 +90,13 @@ class RelayAdapter(BasePlatformAdapter): set_interrupt = getattr(self._transport, "set_interrupt_inbound_handler", None) if callable(set_interrupt): set_interrupt(self.on_interrupt) + # Passthrough-plane forwards (Discord interactions, Twilio, …) also ride + # the SAME outbound WS (Phase 5 §5.1) — the connector edge-ACKed and + # forwards the real request here, so a hosted gateway needs no public + # inbound port. Bridge them to the adapter's passthrough handler. + set_passthrough = getattr(self._transport, "set_passthrough_handler", None) + if callable(set_passthrough): + set_passthrough(self._on_passthrough) ok = await self._transport.connect() if not ok: return False @@ -155,6 +163,95 @@ class RelayAdapter(BasePlatformAdapter): """ await self.interrupt_session_activity(session_key, chat_id) + async def _on_passthrough(self, forward, buffer_id: Optional[str] = None) -> None: + """Handle a connector-forwarded passthrough request (Phase 5 §5.1). + + The passthrough plane (Discord interactions, Twilio webhooks, …) answers + the provider's latency-critical ACK at the connector EDGE, then forwards + the real, ALREADY-SANITIZED request to this gateway over the outbound WS. + The connector is the trust boundary: it verified the provider signature + at the edge and stripped any shared-identity credential (e.g. a Discord + interaction follow-up token) into its vault — so this body carries no + token, and the agent later acts on it via the token-less ``follow_up`` + path (``send_follow_up``), never holding the credential. + + For a Discord interaction we decode the (JSON) body and convert it to a + normalized ``MessageEvent`` so it flows through the SAME agent path as a + chat message (``handle_message``); the agent's reply egresses over the + normal outbound/follow_up path. Non-JSON or non-interaction forwards are + logged and dropped for now (Twilio/SMS over the relay is a later unit). + + NEVER raises: a malformed forward must not kill the read loop. + + NOTE (open semantic sub-design, flagged for review): the interaction -> + MessageEvent mapping below is the v1 default. The exact agent UX for a + slash-command / button interaction (vs. a plain message) — command name + surfacing, option rendering, deferred-vs-immediate response — is the open + piece tracked in the spec; the TRANSPORT + receive mechanism (this whole + path) is settled. + """ + try: + platform = getattr(forward, "platform", "") or "" + if platform == "discord": + event = self._discord_interaction_to_event(forward) + if event is not None: + self._capture_scope(event) + await self.handle_message(event) + return + logger.info( + "relay passthrough_forward dropped (no handler): platform=%s method=%s path=%s", + platform, + getattr(forward, "method", "?"), + getattr(forward, "path", "?"), + ) + except Exception: # noqa: BLE001 - a bad forward must never break the reader + logger.warning("relay passthrough_forward handling failed", exc_info=True) + + def _discord_interaction_to_event(self, forward): + """Convert a forwarded Discord interaction body to a MessageEvent, or None. + + Builds the session source the same way the connector does for an + interaction (``interactionSessionSource`` on the connector side), so the + agent's session key matches the one the connector bound the follow-up + capability under. Returns None when the body isn't a usable interaction + (e.g. a PING, which the connector already answers at the edge and never + forwards). + """ + import json + + from gateway.platforms.base import MessageType + + try: + payload = json.loads(bytes(getattr(forward, "body", b"")).decode("utf-8")) + except Exception: # noqa: BLE001 + return None + if not isinstance(payload, dict): + return None + # type 1 = PING (answered at the edge, never forwarded); 2 = APPLICATION_COMMAND; + # 3 = MESSAGE_COMPONENT; 5 = MODAL_SUBMIT. Surface a best-effort text. + itype = payload.get("type") + data = payload.get("data") or {} + if itype == 2: + text = str(data.get("name") or "") + elif itype == 3: + text = str(data.get("custom_id") or "") + else: + text = "" + member = payload.get("member") or {} + user = (member.get("user") if isinstance(member, dict) else None) or payload.get("user") or {} + channel_id = str(payload.get("channel_id") or "") + guild_id = payload.get("guild_id") + source = SessionSource( + platform=Platform.RELAY, + chat_id=channel_id, + chat_type="channel" if guild_id else "dm", + user_id=str(user.get("id")) if isinstance(user, dict) and user.get("id") else None, + user_name=str(user.get("username")) if isinstance(user, dict) and user.get("username") else None, + guild_id=str(guild_id) if guild_id else None, + message_id=str(payload.get("id")) if payload.get("id") else None, + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, source=source) + async def disconnect(self) -> None: if self._transport is not None: await self._transport.disconnect() diff --git a/gateway/relay/transport.py b/gateway/relay/transport.py index afe6f769f26..b557416c7ad 100644 --- a/gateway/relay/transport.py +++ b/gateway/relay/transport.py @@ -30,6 +30,13 @@ from gateway.relay.descriptor import CapabilityDescriptor # Callback the transport invokes for each inbound normalized event. InboundHandler = Callable[[MessageEvent], Awaitable[None]] +# Callback the transport invokes for each forwarded passthrough request (§5.1). +# The first arg is a PassthroughForward (gateway/relay/ws_transport.py) — typed +# as Any here to keep this protocol module free of a concrete-transport import +# (ws_transport imports FROM this module). The second is an optional bufferId +# (Phase 5 §5.3 buffered flip) the handler acks after durable handoff. +PassthroughHandler = Callable[[Any, Optional[str]], Awaitable[None]] + @runtime_checkable class RelayTransport(Protocol): @@ -51,6 +58,18 @@ class RelayTransport(Protocol): """Register the callback invoked with each inbound MessageEvent.""" ... + def set_passthrough_handler(self, handler: "PassthroughHandler") -> None: + """Register the callback invoked with each forwarded passthrough request. + + Phase 5 §5.1: the passthrough plane (Discord interactions, Twilio, …) + answers the provider's edge ACK at the connector, then forwards the real + request to the gateway over this same outbound socket (a hosted gateway + has no public inbound port). The transport invokes ``handler(forward, + buffer_id)`` for each ``passthrough_forward`` frame. Optional on a + transport (an in-memory stub may not implement it). + """ + ... + async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: """Carry an outbound action (send/edit/typing) to the connector. diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index b091d44faa8..eb17848e0b3 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -33,6 +33,7 @@ import asyncio import json import logging import uuid +from dataclasses import dataclass from typing import Any, Dict, Optional from gateway.platforms.base import MessageEvent, MessageType @@ -128,6 +129,54 @@ def _event_from_wire(raw: Dict[str, Any]) -> MessageEvent: ) +@dataclass +class PassthroughForward: + """A connector-forwarded passthrough-plane request (Phase 5 §5.1). + + The connector answered the provider's latency-critical ACK at its edge, then + forwarded the real (already-sanitized) request to this gateway over the WS. + ``body`` is the exact decoded bytes the connector forwarded (the wire carries + it base64-encoded for byte parity). ``headers`` preserve arrival order. + """ + + platform: str + bot_id: str + method: str + path: str + headers: list[tuple[str, str]] + body: bytes + + +def _passthrough_from_wire(raw: Dict[str, Any]) -> PassthroughForward: + """Rebuild a PassthroughForward from the connector's wire frame. + + Mirrors the connector's ``PassthroughForward`` (relay/protocol.ts): the body + is base64-decoded back to the exact bytes the connector forwarded, so the + gateway re-processes byte-identical content (the connector is the trust + boundary; it already verified at the edge). + """ + import base64 + + body_b64 = raw.get("bodyB64", "") or "" + try: + body = base64.b64decode(body_b64) + except Exception: # noqa: BLE001 - a malformed body must not crash the reader + body = b"" + headers_raw = raw.get("headers", []) or [] + headers: list[tuple[str, str]] = [] + for pair in headers_raw: + if isinstance(pair, (list, tuple)) and len(pair) == 2: + headers.append((str(pair[0]), str(pair[1]))) + return PassthroughForward( + platform=str(raw.get("platform", "")), + bot_id=str(raw.get("botId", "")), + method=str(raw.get("method", "")), + path=str(raw.get("path", "")), + headers=headers, + body=body, + ) + + class WebSocketRelayTransport: """RelayTransport over a WebSocket connection the gateway dials to the connector.""" @@ -318,6 +367,16 @@ class WebSocketRelayTransport: handler = getattr(self, "_interrupt_inbound_handler", None) if handler is not None: await handler(frame.get("session_key", ""), frame.get("chat_id", "")) + elif ftype == "passthrough_forward": + # Phase 5 §5.1: a forwarded passthrough-plane request (Discord + # interaction, Twilio, …) the connector already edge-ACKed. It rides + # the SAME outbound WS as inbound messages so a hosted gateway needs + # no public inbound port. Dispatch to the adapter's handler; the + # bufferId (when present, §5.3 buffered flip) is passed for ack. + handler = getattr(self, "_passthrough_handler", None) + if handler is not None: + fwd = _passthrough_from_wire(frame.get("forward", {})) + await handler(fwd, frame.get("bufferId")) else: # hello/outbound/interrupt are gateway->connector; ignore if echoed. pass @@ -325,3 +384,12 @@ class WebSocketRelayTransport: def set_interrupt_inbound_handler(self, handler: Any) -> None: """Register the callback for connector->gateway interrupt_inbound frames.""" self._interrupt_inbound_handler = handler + + def set_passthrough_handler(self, handler: Any) -> None: + """Register the callback for connector->gateway passthrough_forward frames. + + Mirrors set_interrupt_inbound_handler: the runner/adapter wires this so a + forwarded passthrough request (Phase 5 §5.1) reaches the adapter over the + same outbound WS the gateway already holds. ``handler(forward, buffer_id)``. + """ + self._passthrough_handler = handler diff --git a/tests/gateway/relay/stub_connector.py b/tests/gateway/relay/stub_connector.py index 11a97cae53a..e309750d5e8 100644 --- a/tests/gateway/relay/stub_connector.py +++ b/tests/gateway/relay/stub_connector.py @@ -27,6 +27,7 @@ class StubConnector: self._descriptor = descriptor self._inbound: Optional[InboundHandler] = None self._interrupt_inbound: Optional[Any] = None + self._passthrough: Optional[Any] = None self.connected = False self.sent: List[Dict[str, Any]] = [] self.interrupts: List[Dict[str, Any]] = [] @@ -57,6 +58,12 @@ class StubConnector: bridge here so connector→gateway interrupt_inbound frames route to it.""" self._interrupt_inbound = handler + def set_passthrough_handler(self, handler: Any) -> None: + """Mirror the real WS transport: the adapter registers its passthrough + bridge here so connector→gateway passthrough_forward frames route to it + (Phase 5 §5.1).""" + self._passthrough = handler + async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: self.sent.append(action) if action.get("op") == "send": @@ -85,3 +92,9 @@ class StubConnector: if self._interrupt_inbound is None: raise RuntimeError("no interrupt_inbound handler registered (call adapter.connect first)") await self._interrupt_inbound(session_key, chat_id) + + async def push_passthrough(self, forward: Any, buffer_id: Optional[str] = None) -> None: + """Simulate the connector forwarding a passthrough request over the WS (§5.1).""" + if self._passthrough is None: + raise RuntimeError("no passthrough handler registered (call adapter.connect first)") + await self._passthrough(forward, buffer_id) diff --git a/tests/gateway/relay/test_relay_passthrough.py b/tests/gateway/relay/test_relay_passthrough.py new file mode 100644 index 00000000000..51c5b8ee203 --- /dev/null +++ b/tests/gateway/relay/test_relay_passthrough.py @@ -0,0 +1,199 @@ +"""Relay passthrough-over-WS forwarding (Phase 5 §5.1). + +Proves the gateway side of §5.1: a connector-forwarded passthrough request +(Discord interaction, Twilio, …) arrives over the SAME outbound /relay WS as +inbound messages (a hosted gateway has no public inbound port), and the relay +adapter handles it — decoding the byte-preserved body and routing a Discord +interaction through the normal agent path (handle_message). + +Mirrors test_relay_interrupt.py's wiring discipline (connect() registers the +connector->gateway handlers on the transport). +""" + +from __future__ import annotations + +import base64 +import json + +import pytest + +from gateway.config import PlatformConfig +from gateway.relay.adapter import RelayAdapter +from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor +from gateway.relay.ws_transport import PassthroughForward, _passthrough_from_wire + +from tests.gateway.relay.stub_connector import StubConnector + + +def _desc() -> CapabilityDescriptor: + return CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="discord", + label="Discord", + max_message_length=2000, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=True, + markdown_dialect="discord", + len_unit="chars", + ) + + +@pytest.fixture +def adapter(): + return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc())) + + +def _interaction_forward(payload: dict) -> PassthroughForward: + body = json.dumps(payload).encode("utf-8") + return PassthroughForward( + platform="discord", + bot_id="appShared", + method="POST", + path="/interactions/discord/appShared", + headers=[("content-type", "application/json")], + body=body, + ) + + +def test_passthrough_from_wire_byte_preserves_body(): + """The wire frame's base64 body decodes back to the exact bytes (parity with + the connector's toPassthroughForward).""" + original = json.dumps({"type": 2, "data": {"name": "ping"}, "guild_id": "g1"}).encode("utf-8") + wire = { + "platform": "discord", + "botId": "appShared", + "method": "POST", + "path": "/interactions/discord/appShared", + "headers": [["content-type", "application/json"]], + "bodyB64": base64.b64encode(original).decode("ascii"), + } + fwd = _passthrough_from_wire(wire) + assert fwd.platform == "discord" + assert fwd.bot_id == "appShared" + assert fwd.body == original + assert fwd.headers == [("content-type", "application/json")] + + +def test_passthrough_from_wire_tolerates_malformed_body(): + """A non-base64 body must not raise (the reader must never crash).""" + fwd = _passthrough_from_wire({"platform": "x", "bodyB64": "!!!not base64!!!"}) + assert fwd.body == b"" + + +@pytest.mark.asyncio +async def test_connect_wires_passthrough_handler_over_ws(adapter): + """connect() registers the passthrough handler on the transport so a + connector-delivered passthrough_forward frame reaches the adapter.""" + await adapter.connect() + stub = adapter._transport + assert stub._passthrough is not None + + +@pytest.mark.asyncio +async def test_discord_interaction_routes_through_handle_message(adapter, monkeypatch): + """A forwarded Discord application-command interaction is decoded and routed + through the normal agent path (handle_message) with a correct session source.""" + await adapter.connect() + stub = adapter._transport + + seen = [] + + async def fake_handle(event): + seen.append(event) + + monkeypatch.setattr(adapter, "handle_message", fake_handle) + + fwd = _interaction_forward( + { + "id": "interaction-1", + "type": 2, # APPLICATION_COMMAND + "channel_id": "chan-9", + "guild_id": "guild-7", + "data": {"name": "summarize"}, + "member": {"user": {"id": "user-3", "username": "ben"}}, + } + ) + await stub.push_passthrough(fwd, buffer_id=None) + + assert len(seen) == 1 + ev = seen[0] + assert ev.text == "summarize" + assert ev.source.chat_id == "chan-9" + assert ev.source.guild_id == "guild-7" + assert ev.source.user_id == "user-3" + assert ev.source.chat_type == "channel" + # Scope captured so the agent's reply re-asserts guild_id for egress. + assert adapter._scope_by_chat.get("chan-9") == "guild-7" + + +@pytest.mark.asyncio +async def test_message_component_interaction_uses_custom_id(adapter, monkeypatch): + """A MESSAGE_COMPONENT (button) interaction surfaces its custom_id as text.""" + await adapter.connect() + stub = adapter._transport + seen = [] + + async def fake_handle(event): + seen.append(event) + + monkeypatch.setattr(adapter, "handle_message", fake_handle) + fwd = _interaction_forward( + { + "id": "i2", + "type": 3, # MESSAGE_COMPONENT + "channel_id": "c2", + "guild_id": "g2", + "data": {"custom_id": "approve_btn"}, + "member": {"user": {"id": "u2", "username": "x"}}, + } + ) + await stub.push_passthrough(fwd) + assert len(seen) == 1 + assert seen[0].text == "approve_btn" + + +@pytest.mark.asyncio +async def test_malformed_interaction_body_does_not_raise(adapter, monkeypatch): + """A non-JSON forward is logged and dropped — never crashes the read loop.""" + await adapter.connect() + stub = adapter._transport + called = [] + + async def fake_handle(event): + called.append(event) + + monkeypatch.setattr(adapter, "handle_message", fake_handle) + bad = PassthroughForward( + platform="discord", + bot_id="appShared", + method="POST", + path="/x", + headers=[], + body=b"not json", + ) + await stub.push_passthrough(bad) # must not raise + assert called == [] + + +@pytest.mark.asyncio +async def test_non_discord_forward_dropped_cleanly(adapter, monkeypatch): + """A platform with no gateway-side handler yet (e.g. twilio) is dropped, not raised.""" + await adapter.connect() + stub = adapter._transport + called = [] + + async def fake_handle(event): + called.append(event) + + monkeypatch.setattr(adapter, "handle_message", fake_handle) + fwd = PassthroughForward( + platform="twilio", + bot_id="bot1", + method="POST", + path="/webhooks/twilio/seg", + headers=[], + body=b"From=+1&Body=hi", + ) + await stub.push_passthrough(fwd) # must not raise + assert called == []