feat(relay): add go_dormant() transport mode for scale-to-zero (0.E0)

Net-new WebSocketRelayTransport.go_dormant() + RelayAdapter.go_dormant() —
the third transport mode the scale-to-zero behaviour layer needs, distinct
from both disconnect() and an unexpected close (decisions.md D12/F14):

- disconnect() sets _closing=True and CANCELS the reconnect supervisor
  (terminal "shutting down for good") -> a suspended machine never re-dials
  on wake, stranding its buffered backlog.
- an unexpected close re-dials IMMEDIATELY -> the socket never stays down,
  so the platform proxy never suspends the machine.

go_dormant(): going_idle->ack (reuse go_idle), then close the socket WITHOUT
setting _closing, so the reader's fall-through still arms the reconnect
supervisor (wake path stays live) but on the longer _dormant_redial_s
cadence so it doesn't fight the platform suspend window. A successful re-dial
clears _dormant. Honors the §3.4 wake->reconnect->drain contract.

Tests: 6 new in test_relay_going_idle.py incl. the F14 regression guard
(routing dormancy through disconnect() fails exactly the 4 wake-path tests).
Full relay suite 140 passed.
This commit is contained in:
Ben 2026-06-25 09:36:22 +10:00 committed by Teknium
parent 4aeaba6922
commit 96af4bec30
3 changed files with 294 additions and 2 deletions

View file

@ -355,6 +355,33 @@ class RelayAdapter(BasePlatformAdapter):
logger.debug("relay going_idle failed during drain", exc_info=True)
await self._transport.disconnect()
async def go_dormant(self) -> bool:
"""Quiesce the relay for a scale-to-zero suspend (D12 / Phase 0).
Unlike ``disconnect()`` (terminal teardown for shutdown/restart), this
keeps the adapter's reconnect path armed so the gateway re-dials and
drains its buffered backlog when the machine wakes. Delegates to the
transport's ``go_dormant()`` when available; a transport without it (the
stub) is a no-op that returns False, so callers degrade safely.
NOTE: deliberately does NOT stop the revocation monitor going dormant
is not a teardown; the monitor stays live so a real opt-out/revocation
during dormancy is still surfaced on wake.
"""
if self._transport is None:
return False
go_dormant = getattr(self._transport, "go_dormant", None)
if not callable(go_dormant):
return False
try:
result: Any = go_dormant()
if asyncio.iscoroutine(result):
return bool(await result)
return bool(result)
except Exception: # noqa: BLE001 - dormancy is best-effort, never blocks the idle path
logger.debug("relay go_dormant failed", exc_info=True)
return False
async def send(
self,
chat_id: str,

View file

@ -232,6 +232,23 @@ class WebSocketRelayTransport:
self._reconnect_backoff_s = reconnect_backoff_s
self._reconnect_max_backoff_s = reconnect_max_backoff_s
self._supervisor: Optional[asyncio.Task[None]] = None
# scale-to-zero §Phase 0 (D12/F14): a DORMANT close is distinct from both
# disconnect() (terminal: cancels the supervisor) and an unexpected close
# (re-dials immediately). go_dormant() sets this True, then closes the
# socket WITHOUT setting _closing — so _read_loop's fall-through still
# kicks the reconnect supervisor (the wake path stays armed), but the
# supervisor waits on the longer dormant cadence instead of the fast
# reconnect backoff, so it does not fight the platform's suspend window.
# On resume (process unfrozen) the pending wait completes, the re-dial
# succeeds, and the connector drains this instance's buffered backlog on
# the new handshake. Cleared on a successful re-dial (_dial_and_start).
self._dormant = False
# The re-dial poll cadence while dormant. A suspended machine's event
# loop is frozen, so this timer only advances once the machine is awake;
# it just needs to be short enough that a freshly-woken machine re-dials
# promptly (the connector's wake poke is what triggers the platform
# autostart in the first place — §3.4(5)).
self._dormant_redial_s = 1.0
self._ws: Any = None
self._reader: Optional[asyncio.Task[None]] = None
@ -268,6 +285,10 @@ class WebSocketRelayTransport:
# A fresh handshake is coming; clear any stale descriptor so handshake()
# awaits the new one (matters on a re-dial).
self._descriptor = None
# scale-to-zero (D12): a successful (re-)dial ends any dormant state — we
# are live again, so a subsequent UNEXPECTED close should reconnect on the
# normal fast backoff, not the dormant cadence.
self._dormant = False
headers = self._upgrade_headers()
if headers:
self._ws = await websockets.connect(self._url, additional_headers=headers) # type: ignore[union-attr]
@ -389,6 +410,50 @@ class WebSocketRelayTransport:
finally:
self._going_idle_ack = None
async def go_dormant(self, timeout_s: float = 10.0) -> bool:
"""Quiesce this transport for a scale-to-zero suspend (D12 / Phase 0).
Distinct from BOTH ``disconnect()`` and an unexpected close (F14):
- ``disconnect()`` sets ``_closing=True`` and CANCELS the reconnect
supervisor terminal, "shutting down for good." A machine suspended
after that never re-dials on wake, so its buffered backlog strands.
- An unexpected close re-dials IMMEDIATELY (fast backoff) the socket
never stays down, so the platform proxy never sees the connection go
away and never suspends the machine.
``go_dormant()`` is the third mode the suspend behaviour needs:
1. ``go_idle()`` the connector flips this instance to buffered-only
and acks (so inbound that arrives while we sleep buffers durably and
replays on the next handshake).
2. Close the socket so the platform proxy sees load drop to zero (the
precondition for Fly ``autostop:"suspend"``) but WITHOUT setting
``_closing``. The reader's normal end-of-socket fall-through still
arms the reconnect supervisor, so the wake path stays live; the
``_dormant`` flag just makes that supervisor poll on the dormant
cadence rather than fight the suspend window.
On resume (process unfrozen) the supervisor's pending wait completes, the
re-dial succeeds, and the connector drains the buffered backlog on the new
handshake. Returns the ``go_idle`` ack result (True on ack); the dormancy
close happens regardless (a missed ack at worst races one live event onto
a closing socket, exactly as §5.3 already tolerates).
No-op-safe: a transport that never connected (``_ws is None``) just
returns False without closing.
"""
if self._ws is None:
return False
acked = await self.go_idle(timeout_s=timeout_s)
# Mark dormant BEFORE closing so the supervisor (armed by the reader's
# fall-through) takes the dormant cadence, and a racing live event can't
# flip us back to a fast reconnect.
self._dormant = True
try:
await self._ws.close()
except Exception: # noqa: BLE001 - best-effort; the reader still ends + arms reconnect
logger.debug("relay go_dormant: ws.close() raised", exc_info=True)
return acked
async def _send_inbound_ack(self, buffer_id: str) -> None:
"""Acknowledge durable receipt of a buffered inbound delivery (§5.3).
@ -489,8 +554,16 @@ class WebSocketRelayTransport:
or disconnect() is called. NET-NEW for §5.3: a re-established socket makes
the connector replay this instance's buffered-only backlog on the new
handshake (the delivery-leg onResume). Never raises out (a re-dial failure
just retries); ends when a dial succeeds (its reader takes over) or closing."""
backoff = self._reconnect_backoff_s
just retries); ends when a dial succeeds (its reader takes over) or closing.
scale-to-zero (D12): when the close was a deliberate go_dormant() rather
than an unexpected drop, start from the dormant poll cadence. On a
suspended machine the event loop is frozen, so this sleep only advances
once the machine is awake it just needs to be short enough that a
freshly-woken machine re-dials promptly. A successful _dial_and_start()
clears _dormant, so any LATER unexpected drop reconnects on the normal
fast backoff."""
backoff = self._dormant_redial_s if self._dormant else self._reconnect_backoff_s
while not self._closing:
try:
await asyncio.sleep(backoff)

View file

@ -241,3 +241,195 @@ async def test_adapter_emits_going_idle_on_disconnect(server):
await adapter.connect()
await adapter.disconnect()
assert server.going_idle_count == 1
# ── scale-to-zero go_dormant() (D12 / F14) ───────────────────────────────────
@pytest.mark.asyncio
async def test_go_dormant_emits_going_idle_and_closes_without_terminal_teardown(server):
"""go_dormant() flips the connector to buffered-only (going_idle->ack) AND
closes the socket, but does NOT set the terminal _closing flag or cancel the
reconnect supervisor the F14 distinction from disconnect()."""
t = WebSocketRelayTransport(
server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05
)
await t.connect()
await t.handshake()
try:
acked = await t.go_dormant(timeout_s=2)
assert acked is True
assert server.going_idle_count == 1
# The socket was closed (dormant), but NOT via the terminal path:
assert t._closing is False # disconnect() would set this True
assert t._dormant is True
# Not a revocation — the auth-revoked latch stays clear.
assert t.auth_revoked is False
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_go_dormant_redials_on_wake_and_drains(server):
"""After go_dormant() the reconnect supervisor stays armed, so the gateway
re-dials (simulating a wake) and the connector replays its buffered backlog
on the new handshake. This is the wake->reconnect->drain contract (§3.4)."""
# Queue a buffered inbound to be replayed on the NEXT (wake) handshake.
server._to_push = [
{
"type": "inbound",
"event": {
"text": "while-asleep",
"message_type": "text",
"source": {"platform": "discord", "chat_id": "c1", "chat_type": "dm"},
},
"bufferId": "buf-wake-1",
}
]
seen: list[str] = []
async def handler(ev):
seen.append(ev.text)
t = WebSocketRelayTransport(
server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=5.0
)
# Dormant re-dial cadence is short so the test wakes promptly even though the
# ordinary reconnect backoff is long (proves the dormant path uses its own).
t._dormant_redial_s = 0.05
t.set_inbound_handler(handler)
await t.connect()
await t.handshake()
before = server.connections
try:
await t.go_dormant(timeout_s=2)
# The supervisor was armed by the dormant close; it re-dials on the
# dormant cadence (~0.05s), NOT the 5s reconnect backoff.
for _ in range(50):
if server.connections > before and "while-asleep" in seen:
break
await asyncio.sleep(0.05)
assert server.connections > before # re-dialed (woke)
assert "while-asleep" in seen # drained the buffered backlog on reconnect
# The successful re-dial cleared the dormant flag.
assert t._dormant is False
# The buffered entry was acked (this stub re-pushes on every handshake, so
# a long-lived dormant poll may ack it more than once; the invariant is
# that it was drained at least once — a real connector stops replaying an
# acked entry).
assert "buf-wake-1" in server.inbound_acks
finally:
await t.disconnect()
@pytest.mark.asyncio
async def test_disconnect_cancels_supervisor_but_go_dormant_does_not(server):
"""Direct contrast (F14): disconnect() is terminal (cancels supervisor, no
re-dial); go_dormant() keeps it armed. Guards against a future refactor that
routes dormancy through disconnect()."""
# disconnect(): terminal — no reconnect.
t1 = WebSocketRelayTransport(
server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05
)
await t1.connect()
await t1.handshake()
after_first = server.connections
await t1.disconnect()
await asyncio.sleep(0.3)
assert server.connections == after_first # disconnect did NOT re-dial
assert t1._closing is True
# go_dormant(): armed — re-dials.
t2 = WebSocketRelayTransport(
server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05
)
t2._dormant_redial_s = 0.05
await t2.connect()
await t2.handshake()
before = server.connections
try:
await t2.go_dormant(timeout_s=2)
for _ in range(50):
if server.connections > before:
break
await asyncio.sleep(0.05)
assert server.connections > before # go_dormant stayed armed and re-dialed
assert t2._closing is False
finally:
await t2.disconnect()
@pytest.mark.asyncio
async def test_go_dormant_noop_when_never_connected():
"""go_dormant() on a transport that never connected is a safe no-op (False),
not a crash."""
t = WebSocketRelayTransport("ws://127.0.0.1:1", "discord", "appShared")
assert await t.go_dormant(timeout_s=0.1) is False
@pytest.mark.asyncio
async def test_adapter_go_dormant_delegates_to_transport(server):
"""RelayAdapter.go_dormant() drives the transport's go_dormant (going_idle +
dormant close) without the terminal teardown disconnect() does."""
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
placeholder = CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Relay",
max_message_length=4096,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=False,
markdown_dialect="plain",
len_unit="chars",
)
transport = WebSocketRelayTransport(
server.url, "discord", "appShared", reconnect=True, reconnect_backoff_s=0.05
)
adapter = RelayAdapter(PlatformConfig(), placeholder, transport=transport)
await adapter.connect()
try:
ok = await adapter.go_dormant()
assert ok is True
assert server.going_idle_count == 1
assert transport._closing is False # NOT the terminal teardown
assert transport._dormant is True
finally:
await adapter.disconnect()
@pytest.mark.asyncio
async def test_adapter_go_dormant_noop_on_stub_transport():
"""An adapter whose transport lacks go_dormant (the stub) degrades to a safe
no-op returning False, never raising."""
from gateway.config import PlatformConfig
from gateway.relay.adapter import RelayAdapter
from gateway.relay.descriptor import CONTRACT_VERSION, CapabilityDescriptor
placeholder = CapabilityDescriptor(
contract_version=CONTRACT_VERSION,
platform="discord",
label="Relay",
max_message_length=4096,
supports_draft_streaming=False,
supports_edit=True,
supports_threads=False,
markdown_dialect="plain",
len_unit="chars",
)
class _StubTransport:
async def connect(self):
return True
def set_inbound_handler(self, h):
pass
async def handshake(self):
return placeholder
adapter = RelayAdapter(PlatformConfig(), placeholder, transport=_StubTransport())
assert await adapter.go_dormant() is False