feat(relay): transport protocol + test-only stub connector

Defines RelayTransport (lifecycle/handshake/inbound/outbound/interrupt) as the
gateway<->connector wire contract; RelayAdapter.connect now registers an inbound
handler that bridges connector-delivered MessageEvents into handle_message.
Adds an in-memory StubConnector under tests/ and an E2E round-trip proving:
connect registers the handler, inbound events reach the adapter, guild_id drives
build_session_key isolation (two guilds -> two keys; same guild/channel/user ->
one), outbound send round-trips, get_chat_info is proxied.

Phase 1, Task 1.2 of the gateway-relay plan.
This commit is contained in:
Ben 2026-06-08 15:16:41 +10:00 committed by Teknium
parent b0999c82f3
commit 259e78e175
4 changed files with 270 additions and 19 deletions

View file

@ -19,11 +19,12 @@ deprecation cycle until >=2 Class-1 platforms validate them.
from __future__ import annotations
import logging
from typing import Any, Callable, Dict, Optional, Protocol, runtime_checkable
from typing import Any, Callable, Dict, Optional
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import BasePlatformAdapter, SendResult
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import RelayTransport
logger = logging.getLogger(__name__)
@ -40,24 +41,6 @@ _LEN_FNS: Dict[str, Callable[[str], int]] = {
}
@runtime_checkable
class RelayTransport(Protocol):
"""Minimal transport contract the RelayAdapter delegates wire I/O to.
The full protocol (inbound MessageEvent stream, interrupt channel) is
fleshed out in gateway/relay/transport.py (Task 1.2); the adapter only
needs these outbound + lifecycle calls to satisfy the abstract methods.
"""
async def connect(self) -> bool: ...
async def disconnect(self) -> None: ...
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]: ...
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: ...
class RelayAdapter(BasePlatformAdapter):
"""Generic relay adapter advertising a connector-negotiated capability profile."""
@ -92,8 +75,13 @@ class RelayAdapter(BasePlatformAdapter):
async def connect(self) -> bool:
if self._transport is None:
raise RuntimeError("RelayAdapter has no transport configured")
self._transport.set_inbound_handler(self._on_inbound)
return await self._transport.connect()
async def _on_inbound(self, event) -> None:
"""Bridge a connector-delivered MessageEvent into the normal adapter path."""
await self.handle_message(event)
async def disconnect(self) -> None:
if self._transport is not None:
await self._transport.disconnect()

View file

@ -0,0 +1,75 @@
"""Relay transport protocol — the gateway<->connector wire contract. EXPERIMENTAL.
The ``RelayAdapter`` (gateway side) delegates all wire I/O to a ``RelayTransport``.
The gateway dials OUT to the connector, so a production transport is a WebSocket
client; in tests it is an in-memory stub (``tests/gateway/relay/stub_connector.py``).
This module defines the protocol surface only no concrete transport. The
contract has four concerns:
1. Lifecycle: ``connect`` / ``disconnect``.
2. Handshake: ``handshake`` returns the ``CapabilityDescriptor`` the connector
advertises for the platform this adapter fronts.
3. Inbound: ``set_inbound_handler`` registers a callback the transport invokes
with each normalized ``MessageEvent`` the connector delivers.
4. Outbound: ``send_outbound`` carries send/edit/typing actions back to the
connector; ``get_chat_info`` proxies a chat-info lookup; ``send_interrupt``
routes a mid-turn /stop down the socket that owns the session_key.
EXPERIMENTAL: may change without a deprecation cycle until >=2 Class-1 platforms
validate it. See docs/relay-connector-contract.md.
"""
from __future__ import annotations
from typing import Any, Awaitable, Callable, Dict, Optional, Protocol, runtime_checkable
from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor
# Callback the transport invokes for each inbound normalized event.
InboundHandler = Callable[[MessageEvent], Awaitable[None]]
@runtime_checkable
class RelayTransport(Protocol):
"""Full gateway<->connector transport contract."""
async def connect(self) -> bool:
"""Open the connection to the connector; return True on success."""
...
async def disconnect(self) -> None:
"""Close the connection."""
...
async def handshake(self) -> CapabilityDescriptor:
"""Return the capability descriptor the connector advertises."""
...
def set_inbound_handler(self, handler: InboundHandler) -> None:
"""Register the callback invoked with each inbound MessageEvent."""
...
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
"""Carry an outbound action (send/edit/typing) to the connector.
Returns a result dict; for ``op == "send"`` it carries
``success`` and optionally ``message_id`` / ``error``.
"""
...
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
"""Proxy a chat-info lookup to the connector."""
...
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
"""Route a mid-turn /stop to the connector for ``session_key``.
The connector forwards it down the socket owned by the gateway
instance running that session (the /stop routing invariant). On the
gateway side this is the OUTBOUND direction; the actual task
cancellation happens when the connector echoes an interrupt inbound
(handled in Task 1.4).
"""
...

View file

@ -0,0 +1,66 @@
"""Test-only in-memory stub connector implementing RelayTransport.
MUST stay under tests/ never under plugins/ or gateway/ (a CI guard in
test_no_stub_leak.py asserts this). It lets Phase 1 prove the gateway side of
the relay end-to-end with zero dependency on the real (Node) connector.
The stub:
- hands back a fixed CapabilityDescriptor at handshake,
- lets a test push synthetic inbound MessageEvents (push_inbound),
- records every outbound action (sent/interrupts) for assertions,
- answers get_chat_info from a small fixture map.
"""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from gateway.platforms.base import MessageEvent
from gateway.relay.descriptor import CapabilityDescriptor
from gateway.relay.transport import InboundHandler
class StubConnector:
"""In-memory RelayTransport for tests."""
def __init__(self, descriptor: CapabilityDescriptor) -> None:
self._descriptor = descriptor
self._inbound: Optional[InboundHandler] = None
self.connected = False
self.sent: List[Dict[str, Any]] = []
self.interrupts: List[Dict[str, Any]] = []
self.chat_info: Dict[str, Dict[str, Any]] = {}
# Canned result for the next send_outbound (override per-test).
self.next_send_result: Dict[str, Any] = {"success": True, "message_id": "m1"}
async def connect(self) -> bool:
self.connected = True
return True
async def disconnect(self) -> None:
self.connected = False
async def handshake(self) -> CapabilityDescriptor:
return self._descriptor
def set_inbound_handler(self, handler: InboundHandler) -> None:
self._inbound = handler
async def send_outbound(self, action: Dict[str, Any]) -> Dict[str, Any]:
self.sent.append(action)
if action.get("op") == "send":
return dict(self.next_send_result)
return {"success": True}
async def get_chat_info(self, chat_id: str) -> Dict[str, Any]:
return self.chat_info.get(chat_id, {"name": chat_id, "type": "dm"})
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
self.interrupts.append({"session_key": session_key, "reason": reason})
# ── test driver ──────────────────────────────────────────────────────
async def push_inbound(self, event: MessageEvent) -> None:
"""Simulate the connector delivering a normalized inbound event."""
if self._inbound is None:
raise RuntimeError("no inbound handler registered (call adapter.connect first)")
await self._inbound(event)

View file

@ -0,0 +1,122 @@
"""End-to-end relay round-trip against the in-memory stub connector.
Proves the gateway side of the relay works with no real connector:
- connect() registers the inbound handler,
- a connector-delivered MessageEvent reaches the adapter's message path,
- SessionSource discriminators (guild_id) drive build_session_key isolation,
- an outbound send round-trips through the transport.
These target the transport contract + session-key derivation (Task 1.2's gate),
not the full agent turn handle_message is patched to capture the event.
"""
from __future__ import annotations
import pytest
from gateway.config import Platform, PlatformConfig
from gateway.platforms.base import MessageEvent, MessageType
from gateway.session import SessionSource, build_session_key
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _discord_descriptor() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
emoji="\U0001f47e",
platform_hint="You are on Discord.",
pii_safe=False,
)
def _discord_event(guild_id: str, channel_id: str, user_id: str, text: str) -> MessageEvent:
"""Synthetic inbound the connector would build from a discord.js message."""
source = SessionSource(
platform=Platform.DISCORD,
chat_id=channel_id,
chat_type="group",
user_id=user_id,
guild_id=guild_id,
)
return MessageEvent(text=text, message_type=MessageType.TEXT, source=source)
@pytest.fixture
def wired():
stub = StubConnector(_discord_descriptor())
adapter = RelayAdapter(PlatformConfig(), _discord_descriptor(), transport=stub)
return adapter, stub
@pytest.mark.asyncio
async def test_connect_registers_inbound_handler(wired):
adapter, stub = wired
assert stub._inbound is None
ok = await adapter.connect()
assert ok is True
assert stub.connected is True
assert stub._inbound is not None
@pytest.mark.asyncio
async def test_inbound_event_reaches_adapter(wired, monkeypatch):
adapter, stub = wired
captured = []
monkeypatch.setattr(adapter, "handle_message", lambda ev: _async_capture(captured, ev))
await adapter.connect()
ev = _discord_event("guildA", "chan1", "userX", "hello")
await stub.push_inbound(ev)
assert len(captured) == 1
assert captured[0].text == "hello"
assert captured[0].source.guild_id == "guildA"
@pytest.mark.asyncio
async def test_two_guilds_isolate_into_distinct_session_keys(wired):
adapter, _ = wired
ev_a = _discord_event("guildA", "chan1", "userX", "hi from A")
ev_b = _discord_event("guildB", "chan2", "userX", "hi from B")
key_a = build_session_key(ev_a.source)
key_b = build_session_key(ev_b.source)
assert key_a != key_b
# Same guild + channel + user collapses to one session.
ev_a2 = _discord_event("guildA", "chan1", "userX", "again")
assert build_session_key(ev_a2.source) == key_a
@pytest.mark.asyncio
async def test_outbound_send_round_trips(wired):
adapter, stub = wired
await adapter.connect()
stub.next_send_result = {"success": True, "message_id": "msg-42"}
result = await adapter.send("chan1", "a reply", metadata={"k": "v"})
assert result.success is True
assert result.message_id == "msg-42"
assert len(stub.sent) == 1
assert stub.sent[0]["op"] == "send"
assert stub.sent[0]["chat_id"] == "chan1"
assert stub.sent[0]["content"] == "a reply"
@pytest.mark.asyncio
async def test_get_chat_info_proxied_to_connector(wired):
adapter, stub = wired
stub.chat_info["chan1"] = {"name": "general", "type": "group"}
info = await adapter.get_chat_info("chan1")
assert info == {"name": "general", "type": "group"}
async def _async_capture(sink, event):
sink.append(event)
return None