fix(tui_gateway): prevent WS disconnect under GIL pressure

Three targeted fixes for Desktop GUI WebSocket stability when agent
turns starve the uvicorn event loop of CPU (GIL contention):

1. Loosen ws_ping_timeout for loopback binds (QW-1)
   - Loopback (Desktop): ping 30s interval / 60s timeout
   - Non-loopback (Cloudflare Tunnel): unchanged 20/20
   - A GIL-heavy agent turn can stall the event loop past 20s;
     uvicorn's keepalive ping runs on that same starved loop, so a
     20s timeout kills an otherwise-healthy local connection over a
     recoverable stall. 60s rides out the stall without affecting
     half-open detection on public binds.

2. Coalesce streaming token frames in WSTransport (CF-2)
   - Buffer high-frequency delta frames (message.delta, reasoning.delta,
     thinking.delta) and flush as a batch every ~33ms (~30fps)
   - Non-streaming frames (RPC responses, control/tool/completion events)
     flush pending tokens first — wire ordering preserved
   - Thread-safe via threading.Lock; worker threads return immediately
     instead of blocking on per-token loop wakeups
   - Reduces event-loop wakeup churn by orders of magnitude during model
     streaming, directly cutting GIL pressure

3. Loop heartbeat watchdog (CF-1)
   - Self-rearming call_later tick (2s) measures drift between expected
     and actual fire time using loop.time() (monotonic)
   - Logs 'event loop stalled Ns (GIL pressure suspected)' when drift >5s
   - Turns mysterious WS drops into diagnosable log entries
   - Uses call_later chain (not a task) — dies with the loop, nothing
     to cancel on shutdown

Root cause: uvicorn's ws keepalive ping (20/20s) runs on the same
starved event loop as agent turns. Under GIL pressure from heavy agent
turns or delegation, the loop can't service the ping within 20s, so
the websockets protocol declares the connection dead. Reconnects fail
with ready_send_failed because the old process's loop is still wedged.

None of these fixes touch the model-facing message array, prompt
caching, message role alternation, or the wire protocol — they are
strictly display-transport improvements plus a config tweak and a
diagnostic log.

Tests: 762 passed, 17 skipped (0 failures) across test_tui_gateway_ws,
test_tui_gateway_server, test_web_server, and tui_gateway/ suites.
This commit is contained in:
Peetwan 2026-06-24 17:42:21 +07:00 committed by Teknium
parent 35a0803a3b
commit ebb81f10cb
2 changed files with 160 additions and 12 deletions

View file

@ -14060,6 +14060,15 @@ def start_server(
# For explicit non-zero ports, if the port is taken uvicorn catches
# OSError inside create_server() and exits with a clear error — no
# separate preflight probe needed.
# Loopback binds are the Desktop case: a single local client, no reverse
# proxy in front. A GIL-heavy agent turn can stall the event loop past 20s,
# and uvicorn's ws keepalive ping runs on that same starved loop — so a
# 20s ping timeout kills an otherwise-healthy local connection over a
# recoverable stall (QW-1). Give loopback a longer 60s timeout / 30s
# interval to ride out those stalls. Non-loopback binds sit behind a
# Cloudflare Tunnel (idle timeout ~100s), so keep them at 20/20 to detect
# half-open connections promptly and stay under the tunnel's idle window.
_is_loopback = host in ("127.0.0.1", "localhost", "::1")
config = uvicorn.Config(
app, host=host, port=port, log_level="warning",
# proxy_headers defaults to False so _ws_client_is_allowed sees
@ -14073,9 +14082,9 @@ def start_server(
# Detect half-open WS connections (reverse-proxy 524, dropped
# tunnels) within ~20-40s so WebSocketDisconnect fires the
# disconnect→reap path. 20s stays under Cloudflare Tunnel's idle
# timeout, keeping it warm.
ws_ping_interval=20.0,
ws_ping_timeout=20.0,
# timeout, keeping it warm. Loopback gets a longer window (see above).
ws_ping_interval=30.0 if _is_loopback else 20.0,
ws_ping_timeout=60.0 if _is_loopback else 20.0,
)
server = uvicorn.Server(config)
@ -14111,6 +14120,35 @@ def start_server(
except Exception as exc: # pragma: no cover - best-effort
_log.debug("loop noise filter install skipped: %s", exc)
# ── Loop heartbeat watchdog (CF-1) ───────────────────────────
# Confirm the GIL-pressure hypothesis in production. Re-arm a 2s
# tick and measure the drift between when it *should* fire and
# when it actually does: a healthy loop drifts ~0, but a turn that
# holds the GIL blocks the loop and the next tick fires late by the
# stall duration. We log that so a stalled-loop WS drop is
# diagnosable from the gateway log. Uses loop.time() (monotonic)
# for drift, and call_later (not a task) so it dies with the loop —
# nothing to cancel on shutdown.
_hb_interval = 2.0
_hb_stall_threshold = 5.0
_hb_loop = asyncio.get_running_loop()
def _loop_heartbeat(expected: float) -> None:
now = _hb_loop.time()
drift = now - expected
if drift > _hb_stall_threshold:
_log.warning(
"event loop stalled %.1fs (GIL pressure suspected)",
drift,
)
_hb_loop.call_later(
_hb_interval, _loop_heartbeat, now + _hb_interval
)
_hb_loop.call_later(
_hb_interval, _loop_heartbeat, _hb_loop.time() + _hb_interval
)
await server.main_loop()
if server.started:
await server.shutdown()

View file

@ -28,6 +28,7 @@ import concurrent.futures
import json
import logging
import socket
import threading
from typing import Any
from tui_gateway import server
@ -40,6 +41,24 @@ _log = logging.getLogger(__name__)
_WS_WRITE_TIMEOUT_S = 10.0
_WS_LOG_PAYLOAD_PREVIEW = 240
# Per-token streaming frames are coalesced: buffered and flushed as a batch on
# a short timer instead of waking the event loop once per token. A model reply
# emits hundreds of these in a burst, and each one is a loop wakeup competing
# with the agent turn for the GIL — coalescing cuts that churn (CF-2). The task
# that introduced this called them "agent.token"/"agent.thinking"; in this
# codebase the per-token frames are the ``*.delta`` stream events below. Keep
# this set to genuinely high-frequency, display-only events — anything a client
# must see promptly (tool/approval/status/completion frames) is non-streaming
# and flushes the buffer ahead of itself, so ordering is preserved.
_STREAMING_EVENT_TYPES = frozenset({
"message.delta",
"reasoning.delta",
"thinking.delta",
})
# Max time a streamed token waits in the buffer before flush (~30 fps). Short
# enough to stay imperceptible to the live token cadence.
_TOKEN_COALESCE_S = 0.033
# Keep starlette optional at import time; handle_ws uses the real class when
# it's available and falls back to a generic Exception sentinel otherwise.
try:
@ -75,6 +94,22 @@ class WSTransport:
self._loop = loop
self._peer = peer
self._closed = False
# Token-coalescing buffer (CF-2). Streamed token frames land here and a
# short timer flushes the batch. The lock guards the buffer + the
# "armed" flag against the worker threads that call write(); the timer
# handle is only ever touched on the loop thread.
self._token_lock = threading.Lock()
self._pending_tokens: list[str] = []
self._token_flush_handle: asyncio.TimerHandle | None = None
self._token_flush_armed = False
@staticmethod
def _is_streaming_frame(obj: dict) -> bool:
"""True for high-frequency per-token frames eligible for coalescing."""
params = obj.get("params") if isinstance(obj, dict) else None
if not isinstance(params, dict):
return False
return params.get("type") in _STREAMING_EVENT_TYPES
def write(self, obj: dict) -> bool:
if self._closed:
@ -87,17 +122,43 @@ class WSTransport:
except RuntimeError:
on_loop = False
if on_loop:
# Fire-and-forget — don't block the loop waiting on itself.
self._loop.create_task(self._safe_send(line))
return True
# Coalesce streamed token frames: buffer this frame and arm a short
# flush timer instead of waking the loop right now. Cheap and
# non-blocking — the worker returns immediately. Ordering is preserved
# because every non-streaming frame (below) drains the buffer ahead of
# itself.
if self._is_streaming_frame(obj):
with self._token_lock:
self._pending_tokens.append(line)
if not self._token_flush_armed:
self._token_flush_armed = True
# call_soon_threadsafe arms the call_later timer on the loop
# thread and is safe to call from a worker or the loop.
self._loop.call_soon_threadsafe(self._arm_token_flush)
return not self._closed
try:
from agent.async_utils import safe_schedule_threadsafe
fut = safe_schedule_threadsafe(self._safe_send(line), self._loop)
# Non-streaming frame (RPC response, control frame, non-token event):
# append it behind any buffered tokens and flush the whole batch NOW so
# it can never overtake the tokens that preceded it. The send is
# scheduled INSIDE the lock so the on-the-wire order matches the buffer
# order even if the coalesce timer fires on the loop at the same moment.
from agent.async_utils import safe_schedule_threadsafe
with self._token_lock:
self._pending_tokens.append(line)
batch = self._pending_tokens
self._pending_tokens = []
if on_loop:
# Fire-and-forget — don't block the loop waiting on itself.
self._loop.create_task(self._safe_send_many(batch))
return True
fut = safe_schedule_threadsafe(
self._safe_send_many(batch), self._loop
)
if fut is None:
self._closed = True
return False
try:
fut.result(timeout=_WS_WRITE_TIMEOUT_S)
return not self._closed
except concurrent.futures.TimeoutError: # builtin TimeoutError on 3.11+
@ -106,8 +167,8 @@ class WSTransport:
# already scheduled and will flush once the loop breathes — latching
# _closed here permanently silenced live windows after one slow
# write (the "subagent window shows zero streaming" bug). Unblock
# the worker thread and keep the transport alive; _safe_send latches
# on a real socket error when the frame actually fails.
# the worker thread and keep the transport alive; _safe_send_many
# latches on a real socket error when the frame actually fails.
_log.warning(
"ws write slow (loop stalled >%ss) peer=%s — frame left in flight",
_WS_WRITE_TIMEOUT_S, self._peer,
@ -121,10 +182,41 @@ class WSTransport:
)
return False
def _arm_token_flush(self) -> None:
"""Arm the coalesce timer. Runs on the loop thread (call_soon_threadsafe)."""
if self._closed:
return
self._token_flush_handle = self._loop.call_later(
_TOKEN_COALESCE_S, self._flush_tokens
)
def _flush_tokens(self) -> None:
"""Send buffered tokens as one batch. Runs on the loop thread (timer).
The send is scheduled under the lock so its wire order is fixed relative
to a concurrent non-streaming flush in :meth:`write`.
"""
with self._token_lock:
self._token_flush_handle = None
self._token_flush_armed = False
if not self._pending_tokens or self._closed:
self._pending_tokens = []
return
batch = self._pending_tokens
self._pending_tokens = []
self._loop.create_task(self._safe_send_many(batch))
async def write_async(self, obj: dict) -> bool:
"""Send from the owning event loop. Awaits until the frame is on the wire."""
if self._closed:
return False
# Flush any buffered streamed tokens ahead of this frame (RPC response /
# control frame) so it can't overtake the tokens that preceded it.
with self._token_lock:
pending = self._pending_tokens
self._pending_tokens = []
if pending:
await self._safe_send_many(pending)
await self._safe_send(json.dumps(obj, ensure_ascii=False))
return not self._closed
@ -138,8 +230,26 @@ class WSTransport:
self._peer, type(exc).__name__, exc,
)
async def _safe_send_many(self, lines: list[str]) -> None:
"""Send a batch of pre-serialized frames in order on the loop thread."""
try:
for line in lines:
await self._ws.send_text(line)
except Exception as exc:
self._closed = True
_log.warning(
"ws send failed peer=%s error_type=%s error=%s",
self._peer, type(exc).__name__, exc,
)
def close(self) -> None:
self._closed = True
# Cancel any pending coalesce flush. close() runs on the loop thread
# (the handle_ws finally), so touching the TimerHandle here is safe.
handle = self._token_flush_handle
if handle is not None:
handle.cancel()
self._token_flush_handle = None
def _ws_peer_label(ws: Any) -> str: