feat(relay): Phase 5 §5.3 going-idle / buffered-flip primitive (gateway side) (#51572)

The gateway half of the going-idle/buffered-flip primitive (scale-to-zero
PRIMITIVE, not the behaviour). Integrates with the EXISTING drain transition:

- ws_transport: `go_idle()` sends `going_idle` + awaits the connector's
  `going_idle_ack` (connector-authoritative flip-then-ack, Q-5.3c — stays
  serving until the ack so nothing is lost in the flip window); acks a buffered
  inbound (bufferId present) via `inbound_ack` after the handler runs
  (drain-without-dup on the delivery leg); NET-NEW reconnect loop re-dials +
  re-handshakes after an unexpected close (off by default, on in production).
- adapter: emits `going_idle` from its existing `disconnect()` drain seam before
  tearing down the socket; best-effort + guarded (never blocks shutdown).
- transport Protocol + contract doc §3.2 document the 3 new frames.

+6 relay tests (124 pass). NOT in scope: the autonomous idle timer / machine
suspend / NAS health model (deferred behaviour). Ben's relay-adapter solo lane.
This commit is contained in:
Ben Barclay 2026-06-24 09:50:30 +10:00 committed by GitHub
parent 433db17c0a
commit 40fddc9e4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 439 additions and 2 deletions

View file

@ -186,6 +186,45 @@ tenant**. Tenant is resolved from the event's own discriminator (Discord
token/socket/process delivered it. This keeps one shared bot able to front many
tenants (Phase 6) without overloading an existing field.
### 3.2 Going-idle / buffered-flip primitive (§5.3)
A scale-to-zero PRIMITIVE (not the behaviour — nothing here decides to sleep or
suspends a machine; a later workstream consumes these frames). It lets a gateway
enter a drain/idle transition without losing inbound that arrives while it is
gone, by making the connector buffer for that instance and replay on reconnect.
Three frames (all keyed by the connection's **authenticated** per-instance id —
read off the stored secret record at the WS upgrade, never asserted in a frame):
- `{"type":"going_idle"}` (gateway → connector) — emitted as part of the
gateway's EXISTING drain transition (the adapter sends it before tearing down
the socket). Asks the connector to flip this instance to **buffered-only**.
- `{"type":"going_idle_ack"}` (connector → gateway) — the connector has flipped:
live delivery has stopped and subsequent inbound for this instance buffers
durably. The gateway **stays serving until this ack** (so an event landing in
the flip window is delivered live, not lost — the same SUBSCRIBE-before-serve
ordering discipline as the bus). Only after the ack is it safe to close.
- `{"type":"inbound_ack", "bufferId"}` (gateway → connector) — durable receipt of
a buffered `inbound` delivery (which carries its `bufferId`) replayed on
reconnect. The connector acks the buffer entry only after this, giving
drain-without-dup on the **delivery leg**: an instance that dies mid-drain
redelivers exactly the unacked tail; an acked entry never redelivers.
**Buffer + drain.** While flipped, the connector appends inbound to a durable
per-instance delivery-leg buffer (`delivery:<instanceId>`) instead of pushing it
live. On the gateway's **reconnect** (a NET-NEW reconnect loop re-dials +
re-handshakes after an unexpected close), the new handshake triggers the
connector to drain that backlog over the new socket **in order, ack-gated**,
then clear the flip so live delivery resumes. This reuses the same
`drainWithoutDup` machinery as the Discord→connector ingest leg, applied to the
connector→gateway delivery leg. Connector-authoritative throughout: a gateway can
only flip/drain ITS OWN instance.
> NOT in scope (deferred behaviour): the autonomous idle timer that DECIDES to
> drain, the actual machine suspend, and the NAS suspended-health model. The
> primitive is "when the gateway drains, relay flips to buffered + replays on
> reconnect, with no loss/dup"; WHAT triggers the drain is out of scope.
---
## 4. Outbound: action set

View file

@ -584,6 +584,11 @@ def register_relay_adapter(force: bool = False, url: Optional[str] = None) -> bo
bot_id,
gateway_id=gateway_id,
upgrade_secret=upgrade_secret,
# Phase 5 §5.3: re-dial + re-handshake after an unexpected socket
# close so a gateway that went idle/suspended re-establishes its
# relay socket — which triggers the connector's buffered-flip drain
# (the delivery-leg onResume) on the new handshake.
reconnect=True,
)
return RelayAdapter(config, placeholder, transport=transport)

View file

@ -18,6 +18,7 @@ deprecation cycle until >=2 Class-1 platforms validate them.
from __future__ import annotations
import asyncio
import logging
from typing import Any, Callable, Dict, Optional
@ -254,6 +255,24 @@ class RelayAdapter(BasePlatformAdapter):
async def disconnect(self) -> None:
if self._transport is not None:
# Phase 5 §5.3: emit going_idle as part of the gateway's EXISTING
# drain/shutdown transition (the runner calls adapter.disconnect()
# when the gateway enters `draining`). Asking the connector to flip
# this instance to buffered-only BEFORE we tear down the socket means
# inbound that arrives while we're asleep buffers durably and replays
# on reconnect, instead of being pushed at a closing socket. The
# connector is authoritative (it acks the flip); we stay serving until
# the ack (Q-5.3c). Best-effort + guarded: a transport without go_idle
# (the stub) or a failed/timed-out ack must not block shutdown — we
# proceed to disconnect exactly as before, no regression.
go_idle = getattr(self._transport, "go_idle", None)
if callable(go_idle):
try:
result: Any = go_idle()
if asyncio.iscoroutine(result):
await result
except Exception: # noqa: BLE001 - going-idle is an optimization, never blocks drain
logger.debug("relay going_idle failed during drain", exc_info=True)
await self._transport.disconnect()
async def send(

View file

@ -93,6 +93,19 @@ class RelayTransport(Protocol):
"""
...
async def go_idle(self, timeout_s: float = 10.0) -> bool:
"""Ask the connector to flip this instance to buffered-only (Phase 5 §5.3).
Sends ``going_idle`` and awaits the connector's ``going_idle_ack`` — the
connector-authoritative confirmation that live delivery stopped and inbound
now buffers durably for replay on reconnect (Q-5.3c). Returns True on ack,
False on timeout / not-connected (the caller proceeds to close regardless;
without §5.3 wiring there is simply no buffering). Optional on a transport
(an in-memory stub may not implement it). Emitted as part of the gateway's
EXISTING drain transition not a new idle path.
"""
...
async def send_follow_up(self, action: Dict[str, Any]) -> Dict[str, Any]:
"""Act on a shared-identity capability bound to a session (A2 outbound).

View file

@ -190,6 +190,9 @@ class WebSocketRelayTransport:
outbound_timeout_s: float = _OUTBOUND_TIMEOUT_S,
gateway_id: Optional[str] = None,
upgrade_secret: Optional[str] = None,
reconnect: bool = False,
reconnect_backoff_s: float = 1.0,
reconnect_max_backoff_s: float = 30.0,
) -> None:
if not WEBSOCKETS_AVAILABLE:
raise RuntimeError(
@ -210,6 +213,19 @@ class WebSocketRelayTransport:
self._gateway_id = gateway_id
self._upgrade_secret = upgrade_secret
# Phase 5 §5.3: a NET-NEW reconnect supervisor. The base transport's
# _read_loop just ends on socket close ("reconnection is caller policy");
# with reconnect=True the transport re-dials + re-handshakes after an
# UNEXPECTED close (not a deliberate disconnect()), so a gateway that went
# idle/suspended re-establishes its socket — which makes the connector
# drain that instance's buffered-only delivery-leg backlog (onResume) on
# the new handshake. Off by default so existing tests + the stub are
# unaffected; register_relay_adapter turns it on in production.
self._reconnect = reconnect
self._reconnect_backoff_s = reconnect_backoff_s
self._reconnect_max_backoff_s = reconnect_max_backoff_s
self._supervisor: Optional[asyncio.Task[None]] = None
self._ws: Any = None
self._reader: Optional[asyncio.Task[None]] = None
self._inbound: Optional[InboundHandler] = None
@ -217,12 +233,23 @@ class WebSocketRelayTransport:
self._descriptor_ready: asyncio.Future[CapabilityDescriptor] | None = None
# requestId -> future awaiting the matching outbound_result.
self._pending: Dict[str, asyncio.Future[Dict[str, Any]]] = {}
# Phase 5 §5.3: future awaiting the connector's going_idle_ack.
self._going_idle_ack: asyncio.Future[None] | None = None
self._closing = False
# ── lifecycle ────────────────────────────────────────────────────────
async def connect(self) -> bool:
await self._dial_and_start()
return True
async def _dial_and_start(self) -> None:
"""Open the socket, start the reader, send hello. Used by connect() and
by the reconnect supervisor on a re-dial."""
loop = asyncio.get_running_loop()
self._descriptor_ready = loop.create_future()
# A fresh handshake is coming; clear any stale descriptor so handshake()
# awaits the new one (matters on a re-dial).
self._descriptor = None
headers = self._upgrade_headers()
if headers:
self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr]
@ -231,7 +258,6 @@ class WebSocketRelayTransport:
self._reader = asyncio.create_task(self._read_loop(), name="relay-ws-reader")
# Send hello; the descriptor arrives via the reader and resolves handshake().
await self._send({"type": "hello", "platform": self._platform, "botId": self._bot_id})
return True
def _upgrade_headers(self) -> Dict[str, str]:
"""Auth headers for the WS upgrade, or {} when no secret is configured.
@ -252,6 +278,13 @@ class WebSocketRelayTransport:
async def disconnect(self) -> None:
self._closing = True
if self._supervisor is not None:
self._supervisor.cancel()
try:
await self._supervisor
except (asyncio.CancelledError, Exception): # noqa: BLE001 - best-effort teardown
pass
self._supervisor = None
if self._reader is not None:
self._reader.cancel()
try:
@ -270,6 +303,8 @@ class WebSocketRelayTransport:
if not fut.done():
fut.set_exception(RuntimeError("relay transport closed"))
self._pending.clear()
if self._going_idle_ack is not None and not self._going_idle_ack.done():
self._going_idle_ack.set_exception(RuntimeError("relay transport closed"))
async def handshake(self) -> CapabilityDescriptor:
if self._descriptor is not None:
@ -302,6 +337,44 @@ class WebSocketRelayTransport:
async def send_interrupt(self, session_key: str, reason: Optional[str] = None) -> None:
await self._send({"type": "interrupt", "session_key": session_key, "reason": reason})
# ── going-idle / buffered-flip (Phase 5 §5.3) ────────────────────────
async def go_idle(self, timeout_s: float = 10.0) -> bool:
"""Ask the connector to flip this instance's destination to buffered-only.
Sends ``going_idle`` and awaits the connector's ``going_idle_ack`` — the
connector-AUTHORITATIVE confirmation that live delivery has stopped and
subsequent inbound buffers durably (Q-5.3c). Returns True on ack, False on
timeout / not-connected (the caller proceeds to close anyway at worst a
live event races a closing socket exactly as before §5.3, no regression).
The gateway stays serving (the read loop keeps handling inbound) until the
ack, so an event landing in the flip window is delivered live, not lost.
"""
if self._ws is None:
return False
loop = asyncio.get_running_loop()
self._going_idle_ack = loop.create_future()
try:
await self._send({"type": "going_idle"})
await asyncio.wait_for(self._going_idle_ack, timeout=timeout_s)
return True
except (asyncio.TimeoutError, Exception): # noqa: BLE001 - ack is best-effort
return False
finally:
self._going_idle_ack = None
async def _send_inbound_ack(self, buffer_id: str) -> None:
"""Acknowledge durable receipt of a buffered inbound delivery (§5.3).
Sent after the adapter has durably taken a buffered inbound event the
connector replayed on reconnect; the connector acks the buffer entry only
after this, giving drain-without-dup on the delivery leg.
"""
try:
await self._send({"type": "inbound_ack", "bufferId": buffer_id})
except Exception: # noqa: BLE001 - a failed ack just redelivers the entry next time
logger.debug("relay: inbound_ack send failed for %s", buffer_id)
async def _request_response(
self, action: Dict[str, Any], frame_type: str = "outbound"
) -> Dict[str, Any]:
@ -338,9 +411,42 @@ class WebSocketRelayTransport:
await self._handle_frame(line)
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001 - log + let the task end; reconnection is caller policy
except Exception as exc: # noqa: BLE001 - log + let the task end; reconnection handled below
if not self._closing:
logger.warning("relay ws read loop ended: %s", exc)
# Phase 5 §5.3: the socket closed. If reconnect is enabled and this was
# NOT a deliberate disconnect(), kick the reconnect supervisor so the
# gateway re-dials + re-handshakes (which triggers the connector's
# buffered-flip drain on the new handshake). Self-scheduling: the reader
# ends here, the supervisor re-dials and starts a fresh reader.
if self._reconnect and not self._closing and (self._supervisor is None or self._supervisor.done()):
self._supervisor = asyncio.create_task(
self._reconnect_loop(), name="relay-ws-reconnect"
)
async def _reconnect_loop(self) -> None:
"""Re-dial the connector with capped exponential backoff until reconnected
or disconnect() is called. NET-NEW for §5.3: a re-established socket makes
the connector replay this instance's buffered-only backlog on the new
handshake (the delivery-leg onResume). Never raises out (a re-dial failure
just retries); ends when a dial succeeds (its reader takes over) or closing."""
backoff = self._reconnect_backoff_s
while not self._closing:
try:
await asyncio.sleep(backoff)
except asyncio.CancelledError:
raise
if self._closing:
return
try:
await self._dial_and_start()
logger.info("relay ws reconnected")
return # the fresh reader is running; supervisor's job is done
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001 - keep retrying on dial failure
logger.warning("relay ws reconnect failed: %s", exc)
backoff = min(backoff * 2, self._reconnect_max_backoff_s)
async def _handle_frame(self, line: str) -> None:
try:
@ -358,6 +464,18 @@ class WebSocketRelayTransport:
if self._inbound is not None:
event = _event_from_wire(frame.get("event", {}))
await self._inbound(event)
# Phase 5 §5.3: a buffered delivery (replayed on reconnect) carries
# a bufferId; ack it after the handler has durably taken it so the
# connector advances its delivery-leg buffer cursor (no dup). A live
# delivery has no bufferId — nothing to ack.
buffer_id = frame.get("bufferId")
if buffer_id:
await self._send_inbound_ack(str(buffer_id))
elif ftype == "going_idle_ack":
# Phase 5 §5.3: the connector confirmed our destination is now
# buffered-only; resolve the waiter go_idle() is blocked on.
if self._going_idle_ack is not None and not self._going_idle_ack.done():
self._going_idle_ack.set_result(None)
elif ftype == "outbound_result":
fut = self._pending.get(frame.get("requestId", ""))
if fut is not None and not fut.done():

View file

@ -0,0 +1,243 @@
"""Phase 5 §5.3 — going-idle / buffered-flip primitive (gateway side).
Exercises the WebSocketRelayTransport's going_idle/ack handshake, the
buffered-inbound ack (a bufferId-carrying inbound is acked after the handler
runs), the NET-NEW reconnect loop (re-dial + re-handshake after an unexpected
close), and the RelayAdapter emitting going_idle from its existing drain
(disconnect) transition. All against a real in-process websockets server.
"""
from __future__ import annotations
import asyncio
import json
import pytest
import pytest_asyncio
from gateway.relay.ws_transport import WebSocketRelayTransport, WEBSOCKETS_AVAILABLE
pytestmark = pytest.mark.skipif(not WEBSOCKETS_AVAILABLE, reason="websockets not installed")
if WEBSOCKETS_AVAILABLE:
import websockets
DESCRIPTOR = {
"contract_version": 1,
"platform": "discord",
"label": "Discord",
"max_message_length": 2000,
"supports_draft_streaming": False,
"supports_edit": True,
"supports_threads": True,
"markdown_dialect": "discord",
"len_unit": "chars",
}
class _IdleAwareServer:
"""Connector stub: descriptor on hello, acks going_idle, records inbound_acks,
and can push buffered inbound frames (with bufferId) after handshake."""
def __init__(self):
self.received: list[dict] = []
self.inbound_acks: list[str] = []
self.going_idle_count = 0
self._server = None
self.url = ""
# Frames to push right after each handshake (e.g. buffered backlog replay).
self._to_push: list[dict] = []
self.connections = 0
async def start(self):
self._server = await websockets.serve(self._handle, "127.0.0.1", 0)
sock = next(iter(self._server.sockets))
self.url = f"ws://127.0.0.1:{sock.getsockname()[1]}"
async def stop(self):
if self._server is not None:
self._server.close()
await self._server.wait_closed()
async def _handle(self, ws):
self.connections += 1
try:
async for raw in ws:
for line in str(raw).split("\n"):
if not line.strip():
continue
frame = json.loads(line)
self.received.append(frame)
await self._on_frame(ws, frame)
except Exception:
pass
async def _on_frame(self, ws, frame):
ftype = frame.get("type")
if ftype == "hello":
await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
for f in self._to_push:
await ws.send(json.dumps(f) + "\n")
elif ftype == "going_idle":
self.going_idle_count += 1
await ws.send(json.dumps({"type": "going_idle_ack"}) + "\n")
elif ftype == "inbound_ack":
self.inbound_acks.append(frame.get("bufferId"))
@pytest_asyncio.fixture
async def server():
srv = _IdleAwareServer()
await srv.start()
yield srv
await srv.stop()
@pytest.mark.asyncio
async def test_go_idle_awaits_ack(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared")
await t.connect()
try:
await t.handshake()
acked = await t.go_idle(timeout_s=2)
assert acked is True
assert server.going_idle_count == 1
assert any(f["type"] == "going_idle" for f in server.received)
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_go_idle_returns_false_on_timeout(server):
# A server that never acks going_idle -> go_idle returns False (caller closes anyway).
async def no_ack(ws, frame):
if frame.get("type") == "hello":
await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
# deliberately ignore going_idle
server._on_frame = no_ack # type: ignore[assignment]
t = WebSocketRelayTransport(server.url, "discord", "appShared")
await t.connect()
try:
await t.handshake()
acked = await t.go_idle(timeout_s=0.3)
assert acked is False
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_buffered_inbound_is_acked_after_handler(server):
# A buffered delivery (bufferId present) is acked AFTER the handler runs; a
# live delivery (no bufferId) is not acked.
server._to_push = [
{
"type": "inbound",
"event": {
"text": "buffered",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"},
},
"bufferId": "buf-42",
},
{
"type": "inbound",
"event": {
"text": "live",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"},
},
},
]
seen = []
async def handler(ev):
seen.append(ev.text)
t = WebSocketRelayTransport(server.url, "discord", "appShared")
t.set_inbound_handler(handler)
await t.connect()
try:
await t.handshake()
await asyncio.sleep(0.1)
assert "buffered" in seen and "live" in seen
# Only the buffered (bufferId) delivery was acked.
assert server.inbound_acks == ["buf-42"]
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_reconnect_redials_after_unexpected_close():
# A server that drops the FIRST connection right after handshake; the
# transport with reconnect=True re-dials and handshakes again.
drops = {"n": 0}
srv = _IdleAwareServer()
async def handle(ws):
srv.connections += 1
async for raw in ws:
for line in str(raw).split("\n"):
if not line.strip():
continue
frame = json.loads(line)
if frame.get("type") == "hello":
await ws.send(json.dumps({"type": "descriptor", "descriptor": DESCRIPTOR}) + "\n")
if drops["n"] == 0:
drops["n"] += 1
await ws.close() # force an unexpected close on the first connection
return
srv._server = await websockets.serve(handle, "127.0.0.1", 0)
sock = next(iter(srv._server.sockets))
srv.url = f"ws://127.0.0.1:{sock.getsockname()[1]}"
t = WebSocketRelayTransport(srv.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05)
try:
await t.connect()
await t.handshake()
# First connection is dropped server-side; the reconnect loop re-dials.
await asyncio.sleep(0.5)
assert srv.connections >= 2
finally:
await t.disconnect()
srv._server.close()
await srv._server.wait_closed()
@pytest.mark.asyncio
async def test_no_reconnect_after_deliberate_disconnect(server):
t = WebSocketRelayTransport(server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05)
await t.connect()
await t.handshake()
before = server.connections
await t.disconnect()
await asyncio.sleep(0.3)
# A deliberate disconnect must NOT trigger the reconnect loop.
assert server.connections == before
@pytest.mark.asyncio
async def test_adapter_emits_going_idle_on_disconnect(server):
# The RelayAdapter emits going_idle as part of its existing disconnect (drain)
# transition, then tears down the transport.
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
placeholder = CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Relay",
max_message_length=4096,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=False,
markdown_dialect="plain",
len_unit="chars",
)
transport = WebSocketRelayTransport(server.url, "discord", "appShared")
adapter = RelayAdapter(PlatformConfig(), placeholder, transport=transport)
await adapter.connect()
await adapter.disconnect()
assert server.going_idle_count == 1