feat(relay): route mid-turn /stop over relay interrupt channel

RelayAdapter.on_interrupt(session_key, chat_id) bridges a connector-delivered
mid-turn /stop into the existing interrupt_session_activity path, setting the
per-session _active_sessions Event and clearing typing — cancelling exactly the
targeted session's turn without touching siblings (mirrors test_stop_thread_
sibling isolation). Transport.send_interrupt carries the gateway-side egress to
the connector for socket-owner routing.

Phase 1, Task 1.4 of the gateway-relay plan.
This commit is contained in:
Ben 2026-06-08 15:18:53 +10:00 committed by Teknium
parent d0133fd8e4
commit a3cdd8c39d
2 changed files with 80 additions and 0 deletions

View file

@ -82,6 +82,17 @@ class RelayAdapter(BasePlatformAdapter):
"""Bridge a connector-delivered MessageEvent into the normal adapter path."""
await self.handle_message(event)
async def on_interrupt(self, session_key: str, chat_id: str) -> None:
"""Bridge a connector-delivered /stop into the adapter's interrupt path.
The connector forwards a mid-turn interrupt down the socket owned by
the gateway instance running ``session_key``; this routes it to the
existing per-session interrupt mechanism (sets the
``_active_sessions[session_key]`` Event and clears typing), cancelling
the right turn without touching sibling sessions.
"""
await self.interrupt_session_activity(session_key, chat_id)
async def disconnect(self) -> None:
if self._transport is not None:
await self._transport.disconnect()

View file

@ -0,0 +1,69 @@
"""Relay /stop interrupt routing (relay Phase 1, Task 1.4).
Proves a connector-delivered mid-turn interrupt reaches the existing per-session
interrupt mechanism and cancels exactly the targeted session_key's turn — never
a sibling's. Mirrors the isolation discipline of test_stop_thread_sibling.py.
"""
from __future__ import annotations
import asyncio
import pytest
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
from tests.gateway.relay.stub_connector import StubConnector
def _desc() -> CapabilityDescriptor:
return CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Discord",
max_message_length=2000,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=True,
markdown_dialect="discord",
len_unit="chars",
)
@pytest.fixture
def adapter():
return RelayAdapter(PlatformConfig(), _desc(), transport=StubConnector(_desc()))
@pytest.mark.asyncio
async def test_interrupt_sets_only_target_session_event(adapter):
key_a = "agent:main:discord:group:chanA:userX"
key_b = "agent:main:discord:group:chanB:userY"
ev_a = asyncio.Event()
ev_b = asyncio.Event()
adapter._active_sessions[key_a] = ev_a
adapter._active_sessions[key_b] = ev_b
await adapter.on_interrupt(key_a, chat_id="chanA")
assert ev_a.is_set() is True, "target session's interrupt Event must be set"
assert ev_b.is_set() is False, "sibling session must be untouched"
@pytest.mark.asyncio
async def test_interrupt_unknown_session_is_noop(adapter):
# No active session for this key — must not raise.
await adapter.on_interrupt("agent:main:discord:group:nope:userZ", chat_id="nope")
@pytest.mark.asyncio
async def test_outbound_interrupt_reaches_connector(adapter):
"""The gateway-side /stop egress: send_interrupt is carried to the connector
so it can forward down the socket owning the session_key."""
stub = adapter._transport
await stub.send_interrupt("agent:main:discord:group:chanA:userX", reason="stop")
assert stub.interrupts == [
{"session_key": "agent:main:discord:group:chanA:userX", "reason": "stop"}
]