From 96af4bec30a547d64fc035434a249db1e5c00b65 Mon Sep 17 00:00:00 2001 From: Ben Date: Thu, 25 Jun 2026 09:36:22 +1000 Subject: [PATCH] feat(relay): add go_dormant() transport mode for scale-to-zero (0.E0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Net-new WebSocketRelayTransport.go_dormant() + RelayAdapter.go_dormant() — the third transport mode the scale-to-zero behaviour layer needs, distinct from both disconnect() and an unexpected close (decisions.md D12/F14): - disconnect() sets _closing=True and CANCELS the reconnect supervisor (terminal "shutting down for good") -> a suspended machine never re-dials on wake, stranding its buffered backlog. - an unexpected close re-dials IMMEDIATELY -> the socket never stays down, so the platform proxy never suspends the machine. go_dormant(): going_idle->ack (reuse go_idle), then close the socket WITHOUT setting _closing, so the reader's fall-through still arms the reconnect supervisor (wake path stays live) but on the longer _dormant_redial_s cadence so it doesn't fight the platform suspend window. _dial_and_start() resets _dormant at the start of every dial attempt (before the connect), so a completed re-dial always leaves the transport back in the normal, non-dormant state. Honors the §3.4 wake->reconnect->drain contract. Tests: 6 new in test_relay_going_idle.py incl. the F14 regression guard (routing dormancy through disconnect() fails exactly the 4 wake-path tests). Full relay suite 140 passed. 
--- gateway/relay/adapter.py | 27 +++ gateway/relay/ws_transport.py | 77 +++++++- tests/gateway/relay/test_relay_going_idle.py | 192 +++++++++++++++++++ 3 files changed, 294 insertions(+), 2 deletions(-) diff --git a/gateway/relay/adapter.py b/gateway/relay/adapter.py index e4f9d84c15c..041fbe2f548 100644 --- a/gateway/relay/adapter.py +++ b/gateway/relay/adapter.py @@ -355,6 +355,33 @@ class RelayAdapter(BasePlatformAdapter): logger.debug("relay going_idle failed during drain", exc_info=True) await self._transport.disconnect() + async def go_dormant(self) -> bool: + """Quiesce the relay for a scale-to-zero suspend (D12 / Phase 0). + + Unlike ``disconnect()`` (terminal teardown for shutdown/restart), this + keeps the adapter's reconnect path armed so the gateway re-dials and + drains its buffered backlog when the machine wakes. Delegates to the + transport's ``go_dormant()`` when available; a transport without it (the + stub) is a no-op that returns False, so callers degrade safely. + + NOTE: deliberately does NOT stop the revocation monitor — going dormant + is not a teardown; the monitor stays live so a real opt-out/revocation + during dormancy is still surfaced on wake. 
+ """ + if self._transport is None: + return False + go_dormant = getattr(self._transport, "go_dormant", None) + if not callable(go_dormant): + return False + try: + result: Any = go_dormant() + if asyncio.iscoroutine(result): + return bool(await result) + return bool(result) + except Exception: # noqa: BLE001 - dormancy is best-effort, never blocks the idle path + logger.debug("relay go_dormant failed", exc_info=True) + return False + async def send( self, chat_id: str, diff --git a/gateway/relay/ws_transport.py b/gateway/relay/ws_transport.py index d21d1c3b29e..3532aa81e91 100644 --- a/gateway/relay/ws_transport.py +++ b/gateway/relay/ws_transport.py @@ -232,6 +232,23 @@ class WebSocketRelayTransport: self._reconnect_backoff_s = reconnect_backoff_s self._reconnect_max_backoff_s = reconnect_max_backoff_s self._supervisor: Optional[asyncio.Task[None]] = None + # scale-to-zero §Phase 0 (D12/F14): a DORMANT close is distinct from both + # disconnect() (terminal: cancels the supervisor) and an unexpected close + # (re-dials immediately). go_dormant() sets this True, then closes the + # socket WITHOUT setting _closing — so _read_loop's fall-through still + # kicks the reconnect supervisor (the wake path stays armed), but the + # supervisor waits on the longer dormant cadence instead of the fast + # reconnect backoff, so it does not fight the platform's suspend window. + # On resume (process unfrozen) the pending wait completes, the re-dial + # succeeds, and the connector drains this instance's buffered backlog on + # the new handshake. Cleared on a successful re-dial (_dial_and_start). + self._dormant = False + # The re-dial poll cadence while dormant. A suspended machine's event + # loop is frozen, so this timer only advances once the machine is awake; + # it just needs to be short enough that a freshly-woken machine re-dials + # promptly (the connector's wake poke is what triggers the platform + # autostart in the first place — §3.4(5)). 
+ self._dormant_redial_s = 1.0 self._ws: Any = None self._reader: Optional[asyncio.Task[None]] = None @@ -268,6 +285,10 @@ class WebSocketRelayTransport: # A fresh handshake is coming; clear any stale descriptor so handshake() # awaits the new one (matters on a re-dial). self._descriptor = None + # scale-to-zero (D12): a successful (re-)dial ends any dormant state — we + # are live again, so a subsequent UNEXPECTED close should reconnect on the + # normal fast backoff, not the dormant cadence. + self._dormant = False headers = self._upgrade_headers() if headers: self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr] @@ -389,6 +410,50 @@ class WebSocketRelayTransport: finally: self._going_idle_ack = None + async def go_dormant(self, timeout_s: float = 10.0) -> bool: + """Quiesce this transport for a scale-to-zero suspend (D12 / Phase 0). + + Distinct from BOTH ``disconnect()`` and an unexpected close (F14): + - ``disconnect()`` sets ``_closing=True`` and CANCELS the reconnect + supervisor — terminal, "shutting down for good." A machine suspended + after that never re-dials on wake, so its buffered backlog strands. + - An unexpected close re-dials IMMEDIATELY (fast backoff) — the socket + never stays down, so the platform proxy never sees the connection go + away and never suspends the machine. + + ``go_dormant()`` is the third mode the suspend behaviour needs: + 1. ``go_idle()`` → the connector flips this instance to buffered-only + and acks (so inbound that arrives while we sleep buffers durably and + replays on the next handshake). + 2. Close the socket so the platform proxy sees load drop to zero (the + precondition for Fly ``autostop:"suspend"``) — but WITHOUT setting + ``_closing``. The reader's normal end-of-socket fall-through still + arms the reconnect supervisor, so the wake path stays live; the + ``_dormant`` flag just makes that supervisor poll on the dormant + cadence rather than fight the suspend window. 
+ + On resume (process unfrozen) the supervisor's pending wait completes, the + re-dial succeeds, and the connector drains the buffered backlog on the new + handshake. Returns the ``go_idle`` ack result (True on ack); the dormancy + close happens regardless (a missed ack at worst races one live event onto + a closing socket, exactly as §5.3 already tolerates). + + No-op-safe: a transport that never connected (``_ws is None``) just + returns False without closing. + """ + if self._ws is None: + return False + acked = await self.go_idle(timeout_s=timeout_s) + # Mark dormant BEFORE closing so the supervisor (armed by the reader's + # fall-through) takes the dormant cadence, and a racing live event can't + # flip us back to a fast reconnect. + self._dormant = True + try: + await self._ws.close() + except Exception: # noqa: BLE001 - best-effort; the reader still ends + arms reconnect + logger.debug("relay go_dormant: ws.close() raised", exc_info=True) + return acked + async def _send_inbound_ack(self, buffer_id: str) -> None: """Acknowledge durable receipt of a buffered inbound delivery (§5.3). @@ -489,8 +554,16 @@ class WebSocketRelayTransport: or disconnect() is called. NET-NEW for §5.3: a re-established socket makes the connector replay this instance's buffered-only backlog on the new handshake (the delivery-leg onResume). Never raises out (a re-dial failure - just retries); ends when a dial succeeds (its reader takes over) or closing.""" - backoff = self._reconnect_backoff_s + just retries); ends when a dial succeeds (its reader takes over) or closing. + + scale-to-zero (D12): when the close was a deliberate go_dormant() rather + than an unexpected drop, start from the dormant poll cadence. On a + suspended machine the event loop is frozen, so this sleep only advances + once the machine is awake — it just needs to be short enough that a + freshly-woken machine re-dials promptly. 
A successful _dial_and_start() + clears _dormant, so any LATER unexpected drop reconnects on the normal + fast backoff.""" + backoff = self._dormant_redial_s if self._dormant else self._reconnect_backoff_s while not self._closing: try: await asyncio.sleep(backoff) diff --git a/tests/gateway/relay/test_relay_going_idle.py b/tests/gateway/relay/test_relay_going_idle.py index ad4e0bf3618..05ed8190734 100644 --- a/tests/gateway/relay/test_relay_going_idle.py +++ b/tests/gateway/relay/test_relay_going_idle.py @@ -241,3 +241,195 @@ async def test_adapter_emits_going_idle_on_disconnect(server): await adapter.connect() await adapter.disconnect() assert server.going_idle_count == 1 + + +# ── scale-to-zero go_dormant() (D12 / F14) ─────────────────────────────────── + + +@pytest.mark.asyncio +async def test_go_dormant_emits_going_idle_and_closes_without_terminal_teardown(server): + """go_dormant() flips the connector to buffered-only (going_idle->ack) AND + closes the socket, but does NOT set the terminal _closing flag or cancel the + reconnect supervisor — the F14 distinction from disconnect().""" + t = WebSocketRelayTransport( + server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05 + ) + await t.connect() + await t.handshake() + try: + acked = await t.go_dormant(timeout_s=2) + assert acked is True + assert server.going_idle_count == 1 + # The socket was closed (dormant), but NOT via the terminal path: + assert t._closing is False # disconnect() would set this True + assert t._dormant is True + # Not a revocation — the auth-revoked latch stays clear. + assert t.auth_revoked is False + finally: + await t.disconnect() + + +@pytest.mark.asyncio +async def test_go_dormant_redials_on_wake_and_drains(server): + """After go_dormant() the reconnect supervisor stays armed, so the gateway + re-dials (simulating a wake) and the connector replays its buffered backlog + on the new handshake. 
This is the wake->reconnect->drain contract (§3.4).""" + # Queue a buffered inbound to be replayed on the NEXT (wake) handshake. + server._to_push = [ + { + "type": "inbound", + "event": { + "text": "while-asleep", + "message_type": "text", + "source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"}, + }, + "bufferId": "buf-wake-1", + } + ] + seen: list[str] = [] + + async def handler(ev): + seen.append(ev.text) + + t = WebSocketRelayTransport( + server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=5.0 + ) + # Dormant re-dial cadence is short so the test wakes promptly even though the + # ordinary reconnect backoff is long (proves the dormant path uses its own). + t._dormant_redial_s = 0.05 + t.set_inbound_handler(handler) + await t.connect() + await t.handshake() + before = server.connections + try: + await t.go_dormant(timeout_s=2) + # The supervisor was armed by the dormant close; it re-dials on the + # dormant cadence (~0.05s), NOT the 5s reconnect backoff. + for _ in range(50): + if server.connections > before and "while-asleep" in seen: + break + await asyncio.sleep(0.05) + assert server.connections > before # re-dialed (woke) + assert "while-asleep" in seen # drained the buffered backlog on reconnect + # The successful re-dial cleared the dormant flag. + assert t._dormant is False + # The buffered entry was acked (this stub re-pushes on every handshake, so + # a long-lived dormant poll may ack it more than once; the invariant is + # that it was drained at least once — a real connector stops replaying an + # acked entry). + assert "buf-wake-1" in server.inbound_acks + finally: + await t.disconnect() + + +@pytest.mark.asyncio +async def test_disconnect_cancels_supervisor_but_go_dormant_does_not(server): + """Direct contrast (F14): disconnect() is terminal (cancels supervisor, no + re-dial); go_dormant() keeps it armed. 
Guards against a future refactor that + routes dormancy through disconnect().""" + # disconnect(): terminal — no reconnect. + t1 = WebSocketRelayTransport( + server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05 + ) + await t1.connect() + await t1.handshake() + after_first = server.connections + await t1.disconnect() + await asyncio.sleep(0.3) + assert server.connections == after_first # disconnect did NOT re-dial + assert t1._closing is True + + # go_dormant(): armed — re-dials. + t2 = WebSocketRelayTransport( + server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05 + ) + t2._dormant_redial_s = 0.05 + await t2.connect() + await t2.handshake() + before = server.connections + try: + await t2.go_dormant(timeout_s=2) + for _ in range(50): + if server.connections > before: + break + await asyncio.sleep(0.05) + assert server.connections > before # go_dormant stayed armed and re-dialed + assert t2._closing is False + finally: + await t2.disconnect() + + +@pytest.mark.asyncio +async def test_go_dormant_noop_when_never_connected(): + """go_dormant() on a transport that never connected is a safe no-op (False), + not a crash.""" + t = WebSocketRelayTransport("ws://127.0.0.1:1", "discord", "appShared") + assert await t.go_dormant(timeout_s=0.1) is False + + +@pytest.mark.asyncio +async def test_adapter_go_dormant_delegates_to_transport(server): + """RelayAdapter.go_dormant() drives the transport's go_dormant (going_idle + + dormant close) without the terminal teardown disconnect() does.""" + from gateway.config import PlatformConfig + from gateway.relay.adapter import RelayAdapter + from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + + placeholder = CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="discord", + label="Relay", + max_message_length=4096, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=False, + markdown_dialect="plain", + len_unit="chars", + ) + 
transport = WebSocketRelayTransport( + server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05 + ) + adapter = RelayAdapter(PlatformConfig(), placeholder, transport=transport) + await adapter.connect() + try: + ok = await adapter.go_dormant() + assert ok is True + assert server.going_idle_count == 1 + assert transport._closing is False # NOT the terminal teardown + assert transport._dormant is True + finally: + await adapter.disconnect() + + +@pytest.mark.asyncio +async def test_adapter_go_dormant_noop_on_stub_transport(): + """An adapter whose transport lacks go_dormant (the stub) degrades to a safe + no-op returning False, never raising.""" + from gateway.config import PlatformConfig + from gateway.relay.adapter import RelayAdapter + from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor + + placeholder = CapabilityDescriptor( + contract_version=CONTRACT_VERSION, + platform="discord", + label="Relay", + max_message_length=4096, + supports_draft_streaming=False, + supports_edit=True, + supports_threads=False, + markdown_dialect="plain", + len_unit="chars", + ) + + class _StubTransport: + async def connect(self): + return True + + def set_inbound_handler(self, h): + pass + + async def handshake(self): + return placeholder + + adapter = RelayAdapter(PlatformConfig(), placeholder, transport=_StubTransport()) + assert await adapter.go_dormant() is False