diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index cbcea28001c..8d6a2f7cea5 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -19,11 +19,12 @@ deprecation cycle until >=2 Class-1 platforms validate them. from __future__ import annotations import logging -from typing import Any, Callable, Dict, Optional, Protocol, runtime_checkable +from typing import Any, Callable, Dict, Optional from gateway.config import Platform, PlatformConfig from gateway.platforms.base import BasePlatformAdapter, SendResult from gateway.relay.descriptor import CapabilityDescriptor +from gateway.relay.transport import RelayTransport logger = logging.getLogger(__name__) @@ -40,24 +41,6 @@ _LEN_FNS: Dict[str, Callable[[str], int]] = { } -@runtime_checkable -class RelayTransport(Protocol): - """Minimal transport contract the RelayAdapter delegates wire I/O to. - - The full protocol (inbound MessageEvent stream, interrupt channel) is - fleshed out in gateway/relay/transport.py (Task 1.2); the adapter only - needs these outbound + lifecycle calls to satisfy the abstract methods. - """ - - async def connect(self) -> bool: ... - - async def disconnect(self) -> None: ... - - async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: ... - - async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: ... - - class RelayAdapter(BasePlatformAdapter): """Generic relay adapter advertising a connector-negotiated capability profile.""" @@ -92,8 +75,13 @@ class RelayAdapter(BasePlatformAdapter): async def connect(self) -> bool: if self._transport is None: raise RuntimeError("RelayAdapter has no transport configured") + self._transport.set_inbound_handler(self._on_inbound) return await self._transport.connect() + async def _on_inbound(self, event) -> None: + """Bridge a connector-delivered MessageEvent into the normal adapter path.""" + await self.handle_message(event) + async def disconnect(self) -> None: if self._transport is not None: await self._transport.disconnect() diff --git a/gateway/relay/transport.py b/gateway/relay/transport.py new file mode 100644 index 00000000000..ff41474ba4b --- /dev/null +++ b/gateway/relay/transport.py @@ -0,0 +1,75 @@ +"""Relay transport protocol — the gateway<->connector wire contract. EXPERIMENTAL. + +The ``RelayAdapter`` (gateway side) delegates all wire I/O to a ``RelayTransport``. +The gateway dials OUT to the connector, so a production transport is a WebSocket +client; in tests it is an in-memory stub (``tests/gateway/relay/stub_connector.py``). + +This module defines the protocol surface only — no concrete transport. The +contract has four concerns: + + 1. Lifecycle: ``connect`` / ``disconnect``. + 2. Handshake: ``handshake`` returns the ``CapabilityDescriptor`` the connector + advertises for the platform this adapter fronts. + 3. Inbound: ``set_inbound_handler`` registers a callback the transport invokes + with each normalized ``MessageEvent`` the connector delivers. + 4. Outbound: ``send_outbound`` carries send/edit/typing actions back to the + connector; ``get_chat_info`` proxies a chat-info lookup; ``send_interrupt`` + routes a mid-turn /stop down the socket that owns the session_key. + +EXPERIMENTAL: may change without a deprecation cycle until >=2 Class-1 platforms +validate it. See docs/relay-connector-contract.md. +""" + +from __future__ import annotations + +from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable + +from gateway.platforms.base import MessageEvent +from gateway.relay.descriptor import CapabilityDescriptor + +# Callback the transport invokes for each inbound normalized event. +InboundHandler = Callable[[MessageEvent], Awaitable[None]] + + +@runtime_checkable +class RelayTransport(Protocol): + """Full gateway<->connector transport contract.""" + + async def connect(self) -> bool: + """Open the connection to the connector; return True on success.""" + ... + + async def disconnect(self) -> None: + """Close the connection.""" + ... + + async def handshake(self) -> CapabilityDescriptor: + """Return the capability descriptor the connector advertises.""" + ... + + def set_inbound_handler(self, handler: InboundHandler) -> None: + """Register the callback invoked with each inbound MessageEvent.""" + ... + + async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: + """Carry an outbound action (send/edit/typing) to the connector. + + Returns a result dict; for ``op == "send"`` it carries + ``success`` and optionally ``message_id`` / ``error``. + """ + ... + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + """Proxy a chat-info lookup to the connector.""" + ... + + async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None: + """Route a mid-turn /stop to the connector for ``session_key``. + + The connector forwards it down the socket owned by the gateway + instance running that session (the /stop routing invariant). On the + gateway side this is the OUTBOUND direction; the actual task + cancellation happens when the connector echoes an interrupt inbound + (handled in Task 1.4). + """ + ... diff --git a/tests/gateway/relay/stub_connector.py b/tests/gateway/relay/stub_connector.py new file mode 100644 index 00000000000..b2e163786ba --- /dev/null +++ b/tests/gateway/relay/stub_connector.py @@ -0,0 +1,66 @@ +"""Test-only in-memory stub connector implementing RelayTransport. + +MUST stay under tests/ — never under plugins/ or gateway/ (a CI guard in +test_no_stub_leak.py asserts this). It lets Phase 1 prove the gateway side of +the relay end-to-end with zero dependency on the real (Node) connector. + +The stub: + - hands back a fixed CapabilityDescriptor at handshake, + - lets a test push synthetic inbound MessageEvents (push_inbound), + - records every outbound action (sent/interrupts) for assertions, + - answers get_chat_info from a small fixture map. +""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from gateway.platforms.base import MessageEvent +from gateway.relay.descriptor import CapabilityDescriptor +from gateway.relay.transport import InboundHandler + + +class StubConnector: + """In-memory RelayTransport for tests.""" + + def __init__(self, descriptor: CapabilityDescriptor) -> None: + self._descriptor = descriptor + self._inbound: Optional[InboundHandler] = None + self.connected = False + self.sent: List[Dict[str, Any]] = [] + self.interrupts: List[Dict[str, Any]] = [] + self.chat_info: Dict[str, Dict[str, Any]] = {} + # Canned result for the next send_outbound (override per-test). + self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"} + + async def connect(self) -> bool: + self.connected = True + return True + + async def disconnect(self) -> None: + self.connected = False + + async def handshake(self) -> CapabilityDescriptor: + return self._descriptor + + def set_inbound_handler(self, handler: InboundHandler) -> None: + self._inbound = handler + + async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: + self.sent.append(action) + if action.get("op") == "send": + return dict(self.next_send_result) + return {"success": True} + + async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: + return self.chat_info.get(chat_id, {"name": chat_id, "type": "dm"}) + + async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None: + self.interrupts.append({"session_key": session_key, "reason": reason}) + + # ── test driver ────────────────────────────────────────────────────── + async def push_inbound(self, event: MessageEvent) -> None: + """Simulate the connector delivering a normalized inbound event.""" + if self._inbound is None: + raise RuntimeError("no inbound handler registered (call adapter.connect first)") + await self._inbound(event) diff --git a/tests/gateway/relay/test_relay_roundtrip.py b/tests/gateway/relay/test_relay_roundtrip.py new file mode 100644 index 00000000000..2336d53ee9b --- /dev/null +++ b/tests/gateway/relay/test_relay_roundtrip.py @@ -0,0 +1,122 @@ +"""End-to-end relay round-trip against the in-memory stub connector. + +Proves the gateway side of the relay works with no real connector: + - connect() registers the inbound handler, + - a connector-delivered MessageEvent reaches the adapter's message path, + - SessionSource discriminators (guild_id) drive build_session_key isolation, + - an outbound send round-trips through the transport. + +These target the transport contract + session-key derivation (Task 1.2's gate), +not the full agent turn — handle_message is patched to capture the event. +""" + +from __future__ import annotations + +import pytest + +from gateway.config import Platform, PlatformConfig +from gateway.platforms.base import MessageEvent, MessageType +from gateway.session import SessionSource, build_session_key +from gateway.relay.adapter import RelayAdapter +from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + +from tests.gateway.relay.stub_connector import StubConnector + + +def _discord_descriptor() -> CapabilityDescriptor: + return CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="discord", + label="Discord", + max_message_length=2000, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=True, + markdown_dialect="discord", + len_unit="chars", + emoji="\U0001f47e", + platform_hint="You are on Discord.", + pii_safe=False, + ) + + +def _discord_event(guild_id: str, channel_id: str, user_id: str, text: str) -> MessageEvent: + """Synthetic inbound the connector would build from a discord.js message.""" + source = SessionSource( + platform=Platform.DISCORD, + chat_id=channel_id, + chat_type="group", + user_id=user_id, + guild_id=guild_id, + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, source=source) + + +@pytest.fixture +def wired(): + stub = StubConnector(_discord_descriptor()) + adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub) + return adapter, stub + + +@pytest.mark.asyncio +async def test_connect_registers_inbound_handler(wired): + adapter, stub = wired + assert stub._inbound is None + ok = await adapter.connect() + assert ok is True + assert stub.connected is True + assert stub._inbound is not None + + +@pytest.mark.asyncio +async def test_inbound_event_reaches_adapter(wired, monkeypatch): + adapter, stub = wired + captured = [] + monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev)) + await adapter.connect() + ev = _discord_event("guildA", "chan1", "userX", "hello") + await stub.push_inbound(ev) + assert len(captured) == 1 + assert captured[0].text == "hello" + assert captured[0].source.guild_id == "guildA" + + +@pytest.mark.asyncio +async def test_two_guilds_isolate_into_distinct_session_keys(wired): + adapter, _ = wired + ev_a = _discord_event("guildA", "chan1", "userX", "hi from A") + ev_b = _discord_event("guildB", "chan2", "userX", "hi from B") + key_a = build_session_key(ev_a.source) + key_b = build_session_key(ev_b.source) + assert key_a != key_b + # Same guild + channel + user collapses to one session. + ev_a2 = _discord_event("guildA", "chan1", "userX", "again") + assert build_session_key(ev_a2.source) == key_a + + +@pytest.mark.asyncio +async def test_outbound_send_round_trips(wired): + adapter, stub = wired + await adapter.connect() + stub.next_send_result = {"success": True, "message_id": "msg-42"} + result = await adapter.send("chan1", "a reply", metadata={"k": "v"}) + assert result.success is True + assert result.message_id == "msg-42" + assert len(stub.sent) == 1 + assert stub.sent[0]["op"] == "send" + assert stub.sent[0]["chat_id"] == "chan1" + assert stub.sent[0]["content"] == "a reply" + + +@pytest.mark.asyncio +async def test_get_chat_info_proxied_to_connector(wired): + adapter, stub = wired + stub.chat_info["chan1"] = {"name": "general", "type": "group"} + info = await adapter.get_chat_info("chan1") + assert info == {"name": "general", "type": "group"} + + +async def _async_capture(sink, event): + sink.append(event) + return None