From 40fddc9e4c4592f7d2e064480e0615dbb67ac8bf Mon Sep 17 00:00:00 2001 From: Ben Barclay Date: Wed, 24 Jun 2026 09:50:30 +1000 Subject: [PATCH] =?UTF-8?q?feat(relay):=20Phase=205=20=C2=A75.3=20going-id?= =?UTF-8?q?le=20/=20buffered-flip=20primitive=20(gateway=20side)=20(#51572?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The gateway half of the going-idle/buffered-flip primitive (scale-to-zero PRIMITIVE, not the behaviour). Integrates with the EXISTING drain transition: - ws_transport: `go_idle()` sends `going_idle` + awaits the connector's `going_idle_ack` (connector-authoritative flip-then-ack, Q-5.3c — stays serving until the ack so nothing is lost in the flip window); acks a buffered inbound (bufferId present) via `inbound_ack` after the handler runs (drain-without-dup on the delivery leg); NET-NEW reconnect loop re-dials + re-handshakes after an unexpected close (off by default, on in production). - adapter: emits `going_idle` from its existing `disconnect()` drain seam before tearing down the socket; best-effort + guarded (never blocks shutdown). - transport Protocol + contract doc §3.2 document the 3 new frames. +6 relay tests (124 pass). NOT in scope: the autonomous idle timer / machine suspend / NAS health model (deferred behaviour). Ben's relay-adapter solo lane. --- docs/relay-connector-contract.md | 39 +++ gateway/relay/__init__.py | 5 + gateway/relay/adapter.py | 19 ++ gateway/relay/transport.py | 13 + gateway/relay/ws_transport.py | 122 +++++++++- tests/gateway/relay/test_relay_going_idle.py | 243 +++++++++++++++++++ 6 files changed, 439 insertions(+), 2 deletions(-) create mode 100644 tests/gateway/relay/test_relay_going_idle.py diff --git a/docs/relay-connector-contract.md b/docs/relay-connector-contract.md index b9576fbf00e..e3b21703442 100644 --- a/docs/relay-connector-contract.md +++ b/docs/relay-connector-contract.md @@ -186,6 +186,45 @@ tenant**. Tenant is resolved from the event's own discriminator (Discord token/socket/process delivered it. This keeps one shared bot able to front many tenants (Phase 6) without overloading an existing field. +### 3.2 Going-idle / buffered-flip primitive (§5.3) + +A scale-to-zero PRIMITIVE (not the behaviour — nothing here decides to sleep or +suspends a machine; a later workstream consumes these frames). It lets a gateway +enter a drain/idle transition without losing inbound that arrives while it is +gone, by making the connector buffer for that instance and replay on reconnect. + +Three frames (all keyed by the connection's **authenticated** per-instance id — +read off the stored secret record at the WS upgrade, never asserted in a frame): + +- `{"type":"going_idle"}` (gateway → connector) — emitted as part of the + gateway's EXISTING drain transition (the adapter sends it before tearing down + the socket). Asks the connector to flip this instance to **buffered-only**. +- `{"type":"going_idle_ack"}` (connector → gateway) — the connector has flipped: + live delivery has stopped and subsequent inbound for this instance buffers + durably. The gateway **stays serving until this ack** (so an event landing in + the flip window is delivered live, not lost — the same SUBSCRIBE-before-serve + ordering discipline as the bus). Only after the ack is it safe to close. +- `{"type":"inbound_ack", "bufferId"}` (gateway → connector) — durable receipt of + a buffered `inbound` delivery (which carries its `bufferId`) replayed on + reconnect. The connector acks the buffer entry only after this, giving + drain-without-dup on the **delivery leg**: an instance that dies mid-drain + redelivers exactly the unacked tail; an acked entry never redelivers. + +**Buffer + drain.** While flipped, the connector appends inbound to a durable +per-instance delivery-leg buffer (`delivery:`) instead of pushing it +live. On the gateway's **reconnect** (a NET-NEW reconnect loop re-dials + +re-handshakes after an unexpected close), the new handshake triggers the +connector to drain that backlog over the new socket **in order, ack-gated**, +then clear the flip so live delivery resumes. This reuses the same +`drainWithoutDup` machinery as the Discord→connector ingest leg, applied to the +connector→gateway delivery leg. Connector-authoritative throughout: a gateway can +only flip/drain ITS OWN instance. + +> NOT in scope (deferred behaviour): the autonomous idle timer that DECIDES to +> drain, the actual machine suspend, and the NAS suspended-health model. The +> primitive is "when the gateway drains, relay flips to buffered + replays on +> reconnect, with no loss/dup"; WHAT triggers the drain is out of scope. + --- ## 4. Outbound: action set diff --git a/gateway/relay/__init__.py b/gateway/relay/__init__.py index 92e0e46f4f5..e9a8ee7d8a1 100644 --- a/gateway/relay/__init__.py +++ b/gateway/relay/__init__.py @@ -584,6 +584,11 @@ def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bo bot_id, gateway_id=gateway_id, upgrade_secret=upgrade_secret, + # Phase 5 §5.3: re-dial + re-handshake after an unexpected socket + # close so a gateway that went idle/suspended re-establishes its + # relay socket — which triggers the connector's buffered-flip drain + # (the delivery-leg onResume) on the new handshake. + reconnect=True, ) return RelayAdapter(config, placeholder, transport=transport) diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index 9e44a34b421..968d2b88c12 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -18,6 +18,7 @@ deprecation cycle until >=2 Class-1 platforms validate them. from __future__ import annotations +import asyncio import logging from typing import Any, Callable, Dict, Optional @@ -254,6 +255,24 @@ class RelayAdapter(BasePlatformAdapter): async def disconnect(self) -> None: if self._transport is not None: + # Phase 5 §5.3: emit going_idle as part of the gateway's EXISTING + # drain/shutdown transition (the runner calls adapter.disconnect() + # when the gateway enters `draining`). Asking the connector to flip + # this instance to buffered-only BEFORE we tear down the socket means + # inbound that arrives while we're asleep buffers durably and replays + # on reconnect, instead of being pushed at a closing socket. The + # connector is authoritative (it acks the flip); we stay serving until + # the ack (Q-5.3c). Best-effort + guarded: a transport without go_idle + # (the stub) or a failed/timed-out ack must not block shutdown — we + # proceed to disconnect exactly as before, no regression. + go_idle = getattr(self._transport, "go_idle", None) + if callable(go_idle): + try: + result: Any = go_idle() + if asyncio.iscoroutine(result): + await result + except Exception: # noqa: BLE001 - going-idle is an optimization, never blocks drain + logger.debug("relay going_idle failed during drain", exc_info=True) await self._transport.disconnect() async def send( diff --git a/gateway/relay/transport.py b/gateway/relay/transport.py index b557416c7ad..7c0058dd98c 100644 --- a/gateway/relay/transport.py +++ b/gateway/relay/transport.py @@ -93,6 +93,19 @@ class RelayTransport(Protocol): """ ... + async def go_idle(self, timeout_s: float = 10.0) -> bool: + """Ask the connector to flip this instance to buffered-only (Phase 5 §5.3). + + Sends ``going_idle`` and awaits the connector's ``going_idle_ack`` — the + connector-authoritative confirmation that live delivery stopped and inbound + now buffers durably for replay on reconnect (Q-5.3c). Returns True on ack, + False on timeout / not-connected (the caller proceeds to close regardless; + without §5.3 wiring there is simply no buffering). Optional on a transport + (an in-memory stub may not implement it). Emitted as part of the gateway's + EXISTING drain transition — not a new idle path. + """ + ... + async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]: """Act on a shared-identity capability bound to a session (A2 outbound). diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index eb17848e0b3..6f545cb7eea 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -190,6 +190,9 @@ class WebSocketRelayTransport: outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S, gateway_id: Optional[str] = None, upgrade_secret: Optional[str] = None, + reconnect: bool = False, + reconnect_backoff_s: float = 1.0, + reconnect_max_backoff_s: float = 30.0, ) -> None: if not WEBSOCKETS_AVAILABLE: raise RuntimeError( @@ -210,6 +213,19 @@ class WebSocketRelayTransport: self._gateway_id = gateway_id self._upgrade_secret = upgrade_secret + # Phase 5 §5.3: a NET-NEW reconnect supervisor. The base transport's + # _read_loop just ends on socket close ("reconnection is caller policy"); + # with reconnect=True the transport re-dials + re-handshakes after an + # UNEXPECTED close (not a deliberate disconnect()), so a gateway that went + # idle/suspended re-establishes its socket — which makes the connector + # drain that instance's buffered-only delivery-leg backlog (onResume) on + # the new handshake. Off by default so existing tests + the stub are + # unaffected; register_relay_adapter turns it on in production. + self._reconnect = reconnect + self._reconnect_backoff_s = reconnect_backoff_s + self._reconnect_max_backoff_s = reconnect_max_backoff_s + self._supervisor: Optional[asyncio.Task[None]] = None + self._ws: Any = None self._reader: Optional[asyncio.Task[None]] = None self._inbound: Optional[InboundHandler] = None @@ -217,12 +233,23 @@ class WebSocketRelayTransport: self._descriptor_ready: asyncio.Future[CapabilityDescriptor] | None = None # requestId -> future awaiting the matching outbound_result. self._pending: Dict[str, asyncio.Future[Dict[str, Any]]] = {} + # Phase 5 §5.3: future awaiting the connector's going_idle_ack. + self._going_idle_ack: asyncio.Future[None] | None = None self._closing = False # ── lifecycle ──────────────────────────────────────────────────────── async def connect(self) -> bool: + await self._dial_and_start() + return True + + async def _dial_and_start(self) -> None: + """Open the socket, start the reader, send hello. Used by connect() and + by the reconnect supervisor on a re-dial.""" loop = asyncio.get_running_loop() self._descriptor_ready = loop.create_future() + # A fresh handshake is coming; clear any stale descriptor so handshake() + # awaits the new one (matters on a re-dial). + self._descriptor = None headers = self._upgrade_headers() if headers: self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr] @@ -231,7 +258,6 @@ class WebSocketRelayTransport: self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader") # Send hello; the descriptor arrives via the reader and resolves handshake(). await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id}) - return True def _upgrade_headers(self) -> Dict[str, str]: """Auth headers for the WS upgrade, or {} when no secret is configured. @@ -252,6 +278,13 @@ class WebSocketRelayTransport: async def disconnect(self) -> None: self._closing = True + if self._supervisor is not None: + self._supervisor.cancel() + try: + await self._supervisor + except (asyncio.CancelledError, Exception): # noqa: BLE001 - best-effort teardown + pass + self._supervisor = None if self._reader is not None: self._reader.cancel() try: @@ -270,6 +303,8 @@ class WebSocketRelayTransport: if not fut.done(): fut.set_exception(RuntimeError("relay transport closed")) self._pending.clear() + if self._going_idle_ack is not None and not self._going_idle_ack.done(): + self._going_idle_ack.set_exception(RuntimeError("relay transport closed")) async def handshake(self) -> CapabilityDescriptor: if self._descriptor is not None: @@ -302,6 +337,44 @@ class WebSocketRelayTransport: async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None: await self._send({"type": "interrupt", "session_key": session_key, "reason": reason}) + # ── going-idle / buffered-flip (Phase 5 §5.3) ──────────────────────── + async def go_idle(self, timeout_s: float = 10.0) -> bool: + """Ask the connector to flip this instance's destination to buffered-only. + + Sends ``going_idle`` and awaits the connector's ``going_idle_ack`` — the + connector-AUTHORITATIVE confirmation that live delivery has stopped and + subsequent inbound buffers durably (Q-5.3c). Returns True on ack, False on + timeout / not-connected (the caller proceeds to close anyway — at worst a + live event races a closing socket exactly as before §5.3, no regression). + + The gateway stays serving (the read loop keeps handling inbound) until the + ack, so an event landing in the flip window is delivered live, not lost. + """ + if self._ws is None: + return False + loop = asyncio.get_running_loop() + self._going_idle_ack = loop.create_future() + try: + await self._send({"type": "going_idle"}) + await asyncio.wait_for(self._going_idle_ack, timeout=timeout_s) + return True + except (asyncio.TimeoutError, Exception): # noqa: BLE001 - ack is best-effort + return False + finally: + self._going_idle_ack = None + + async def _send_inbound_ack(self, buffer_id: str) -> None: + """Acknowledge durable receipt of a buffered inbound delivery (§5.3). + + Sent after the adapter has durably taken a buffered inbound event the + connector replayed on reconnect; the connector acks the buffer entry only + after this, giving drain-without-dup on the delivery leg. + """ + try: + await self._send({"type": "inbound_ack", "bufferId": buffer_id}) + except Exception: # noqa: BLE001 - a failed ack just redelivers the entry next time + logger.debug("relay: inbound_ack send failed for %s", buffer_id) + async def _request_response( self, action: Dict[str, Any], frame_type: str = "outbound" ) -> Dict[str, Any]: @@ -338,9 +411,42 @@ class WebSocketRelayTransport: await self._handle_frame(line) except asyncio.CancelledError: raise - except Exception as exc: # noqa: BLE001 - log + let the task end; reconnection is caller policy + except Exception as exc: # noqa: BLE001 - log + let the task end; reconnection handled below if not self._closing: logger.warning("relay ws read loop ended: %s", exc) + # Phase 5 §5.3: the socket closed. If reconnect is enabled and this was + # NOT a deliberate disconnect(), kick the reconnect supervisor so the + # gateway re-dials + re-handshakes (which triggers the connector's + # buffered-flip drain on the new handshake). Self-scheduling: the reader + # ends here, the supervisor re-dials and starts a fresh reader. + if self._reconnect and not self._closing and (self._supervisor is None or self._supervisor.done()): + self._supervisor = asyncio.create_task( + self._reconnect_loop(), name="relay-ws-reconnect" + ) + + async def _reconnect_loop(self) -> None: + """Re-dial the connector with capped exponential backoff until reconnected + or disconnect() is called. NET-NEW for §5.3: a re-established socket makes + the connector replay this instance's buffered-only backlog on the new + handshake (the delivery-leg onResume). Never raises out (a re-dial failure + just retries); ends when a dial succeeds (its reader takes over) or closing.""" + backoff = self._reconnect_backoff_s + while not self._closing: + try: + await asyncio.sleep(backoff) + except asyncio.CancelledError: + raise + if self._closing: + return + try: + await self._dial_and_start() + logger.info("relay ws reconnected") + return # the fresh reader is running; supervisor's job is done + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 - keep retrying on dial failure + logger.warning("relay ws reconnect failed: %s", exc) + backoff = min(backoff * 2, self._reconnect_max_backoff_s) async def _handle_frame(self, line: str) -> None: try: @@ -358,6 +464,18 @@ class WebSocketRelayTransport: if self._inbound is not None: event = _event_from_wire(frame.get("event", {})) await self._inbound(event) + # Phase 5 §5.3: a buffered delivery (replayed on reconnect) carries + # a bufferId; ack it after the handler has durably taken it so the + # connector advances its delivery-leg buffer cursor (no dup). A live + # delivery has no bufferId — nothing to ack. + buffer_id = frame.get("bufferId") + if buffer_id: + await self._send_inbound_ack(str(buffer_id)) + elif ftype == "going_idle_ack": + # Phase 5 §5.3: the connector confirmed our destination is now + # buffered-only; resolve the waiter go_idle() is blocked on. + if self._going_idle_ack is not None and not self._going_idle_ack.done(): + self._going_idle_ack.set_result(None) elif ftype == "outbound_result": fut = self._pending.get(frame.get("requestId", "")) if fut is not None and not fut.done(): diff --git a/tests/gateway/relay/test_relay_going_idle.py b/tests/gateway/relay/test_relay_going_idle.py new file mode 100644 index 00000000000..ad4e0bf3618 --- /dev/null +++ b/tests/gateway/relay/test_relay_going_idle.py @@ -0,0 +1,243 @@ +"""Phase 5 §5.3 — going-idle / buffered-flip primitive (gateway side). + +Exercises the WebSocketRelayTransport's going_idle/ack handshake, the +buffered-inbound ack (a bufferId-carrying inbound is acked after the handler +runs), the NET-NEW reconnect loop (re-dial + re-handshake after an unexpected +close), and the RelayAdapter emitting going_idle from its existing drain +(disconnect) transition. All against a real in-process websockets server. +""" + +from __future__ import annotations + +import asyncio +import json + +import pytest +import pytest_asyncio + +from gateway.relay.ws_transport import WebSocketRelayTransport, WEBSOCKETS_AVAILABLE + +pytestmark = pytest.mark.skipif(not WEBSOCKETS_AVAILABLE, reason="websockets not installed") + +if WEBSOCKETS_AVAILABLE: + import websockets + + +DESCRIPTOR = { + "contract_version": 1, + "platform": "discord", + "label": "Discord", + "max_message_length": 2000, + "supports_draft_streaming": False, + "supports_edit": True, + "supports_threads": True, + "markdown_dialect": "discord", + "len_unit": "chars", +} + + +class _IdleAwareServer: + """Connector stub: descriptor on hello, acks going_idle, records inbound_acks, + and can push buffered inbound frames (with bufferId) after handshake.""" + + def __init__(self): + self.received: list[dict] = [] + self.inbound_acks: list[str] = [] + self.going_idle_count = 0 + self._server = None + self.url = "" + # Frames to push right after each handshake (e.g. buffered backlog replay). + self._to_push: list[dict] = [] + self.connections = 0 + + async def start(self): + self._server = await websockets.serve(self._handle, "127.0.0.1", 0) + sock = next(iter(self._server.sockets)) + self.url = f"ws://127.0.0.1:{sock.getsockname()[1]}" + + async def stop(self): + if self._server is not None: + self._server.close() + await self._server.wait_closed() + + async def _handle(self, ws): + self.connections += 1 + try: + async for raw in ws: + for line in str(raw).split("\n"): + if not line.strip(): + continue + frame = json.loads(line) + self.received.append(frame) + await self._on_frame(ws, frame) + except Exception: + pass + + async def _on_frame(self, ws, frame): + ftype = frame.get("type") + if ftype == "hello": + await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n") + for f in self._to_push: + await ws.send(json.dumps(f) + "\n") + elif ftype == "going_idle": + self.going_idle_count += 1 + await ws.send(json.dumps({"type": "going_idle_ack"}) + "\n") + elif ftype == "inbound_ack": + self.inbound_acks.append(frame.get("bufferId")) + + +@pytest_asyncio.fixture +async def server(): + srv = _IdleAwareServer() + await srv.start() + yield srv + await srv.stop() + + +@pytest.mark.asyncio +async def test_go_idle_awaits_ack(server): + t = WebSocketRelayTransport(server.url, "discord", "appShared") + await t.connect() + try: + await t.handshake() + acked = await t.go_idle(timeout_s=2) + assert acked is True + assert server.going_idle_count == 1 + assert any(f["type"] == "going_idle" for f in server.received) + finally: + await t.disconnect() + + +@pytest.mark.asyncio +async def test_go_idle_returns_false_on_timeout(server): + # A server that never acks going_idle -> go_idle returns False (caller closes anyway). + async def no_ack(ws, frame): + if frame.get("type") == "hello": + await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n") + # deliberately ignore going_idle + + server._on_frame = no_ack # type: ignore[assignment] + t = WebSocketRelayTransport(server.url, "discord", "appShared") + await t.connect() + try: + await t.handshake() + acked = await t.go_idle(timeout_s=0.3) + assert acked is False + finally: + await t.disconnect() + + +@pytest.mark.asyncio +async def test_buffered_inbound_is_acked_after_handler(server): + # A buffered delivery (bufferId present) is acked AFTER the handler runs; a + # live delivery (no bufferId) is not acked. + server._to_push = [ + { + "type": "inbound", + "event": { + "text": "buffered", + "message_type": "text", + "source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"}, + }, + "bufferId": "buf-42", + }, + { + "type": "inbound", + "event": { + "text": "live", + "message_type": "text", + "source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"}, + }, + }, + ] + seen = [] + + async def handler(ev): + seen.append(ev.text) + + t = WebSocketRelayTransport(server.url, "discord", "appShared") + t.set_inbound_handler(handler) + await t.connect() + try: + await t.handshake() + await asyncio.sleep(0.1) + assert "buffered" in seen and "live" in seen + # Only the buffered (bufferId) delivery was acked. + assert server.inbound_acks == ["buf-42"] + finally: + await t.disconnect() + + +@pytest.mark.asyncio +async def test_reconnect_redials_after_unexpected_close(): + # A server that drops the FIRST connection right after handshake; the + # transport with reconnect=True re-dials and handshakes again. + drops = {"n": 0} + srv = _IdleAwareServer() + + async def handle(ws): + srv.connections += 1 + async for raw in ws: + for line in str(raw).split("\n"): + if not line.strip(): + continue + frame = json.loads(line) + if frame.get("type") == "hello": + await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n") + if drops["n"] == 0: + drops["n"] += 1 + await ws.close() # force an unexpected close on the first connection + return + + srv._server = await websockets.serve(handle, "127.0.0.1", 0) + sock = next(iter(srv._server.sockets)) + srv.url = f"ws://127.0.0.1:{sock.getsockname()[1]}" + t = WebSocketRelayTransport(srv.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05) + try: + await t.connect() + await t.handshake() + # First connection is dropped server-side; the reconnect loop re-dials. + await asyncio.sleep(0.5) + assert srv.connections >= 2 + finally: + await t.disconnect() + srv._server.close() + await srv._server.wait_closed() + + +@pytest.mark.asyncio +async def test_no_reconnect_after_deliberate_disconnect(server): + t = WebSocketRelayTransport(server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05) + await t.connect() + await t.handshake() + before = server.connections + await t.disconnect() + await asyncio.sleep(0.3) + # A deliberate disconnect must NOT trigger the reconnect loop. + assert server.connections == before + + +@pytest.mark.asyncio +async def test_adapter_emits_going_idle_on_disconnect(server): + # The RelayAdapter emits going_idle as part of its existing disconnect (drain) + # transition, then tears down the transport. + from gateway.config import PlatformConfig + from gateway.relay.adapter import RelayAdapter + from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + + placeholder = CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="discord", + label="Relay", + max_message_length=4096, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=False, + markdown_dialect="plain", + len_unit="chars", + ) + transport = WebSocketRelayTransport(server.url, "discord", "appShared") + adapter = RelayAdapter(PlatformConfig(), placeholder, transport=transport) + await adapter.connect() + await adapter.disconnect() + assert server.going_idle_count == 1