diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 2dc5fc9b60d..a76cf42fc2e 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -14060,6 +14060,15 @@ def start_server( # For explicit non-zero ports, if the port is taken uvicorn catches # OSError inside create_server() and exits with a clear error — no # separate preflight probe needed. + # Loopback binds are the Desktop case: a single local client, no reverse + # proxy in front. A GIL-heavy agent turn can stall the event loop past 20s, + # and uvicorn's ws keepalive ping runs on that same starved loop — so a + # 20s ping timeout kills an otherwise-healthy local connection over a + # recoverable stall (QW-1). Give loopback a longer 60s timeout / 30s + # interval to ride out those stalls. Non-loopback binds sit behind a + # Cloudflare Tunnel (idle timeout ~100s), so keep them at 20/20 to detect + # half-open connections promptly and stay under the tunnel's idle window. + _is_loopback = host in ("127.0.0.1", "localhost", "::1") config = uvicorn.Config( app, host=host, port=port, log_level="warning", # proxy_headers defaults to False so _ws_client_is_allowed sees @@ -14073,9 +14082,9 @@ def start_server( # Detect half-open WS connections (reverse-proxy 524, dropped # tunnels) within ~20-40s so WebSocketDisconnect fires the # disconnect→reap path. 20s stays under Cloudflare Tunnel's idle - # timeout, keeping it warm. - ws_ping_interval=20.0, - ws_ping_timeout=20.0, + # timeout, keeping it warm. Loopback gets a longer window (see above). + ws_ping_interval=30.0 if _is_loopback else 20.0, + ws_ping_timeout=60.0 if _is_loopback else 20.0, ) server = uvicorn.Server(config) @@ -14111,6 +14120,35 @@ def start_server( except Exception as exc: # pragma: no cover - best-effort _log.debug("loop noise filter install skipped: %s", exc) + # ── Loop heartbeat watchdog (CF-1) ─────────────────────────── + # Confirm the GIL-pressure hypothesis in production. Re-arm a 2s + # tick and measure the drift between when it *should* fire and + # when it actually does: a healthy loop drifts ~0, but a turn that + # holds the GIL blocks the loop and the next tick fires late by the + # stall duration. We log that so a stalled-loop WS drop is + # diagnosable from the gateway log. Uses loop.time() (monotonic) + # for drift, and call_later (not a task) so it dies with the loop — + # nothing to cancel on shutdown. + _hb_interval = 2.0 + _hb_stall_threshold = 5.0 + _hb_loop = asyncio.get_running_loop() + + def _loop_heartbeat(expected: float) -> None: + now = _hb_loop.time() + drift = now - expected + if drift > _hb_stall_threshold: + _log.warning( + "event loop stalled %.1fs (GIL pressure suspected)", + drift, + ) + _hb_loop.call_later( + _hb_interval, _loop_heartbeat, now + _hb_interval + ) + + _hb_loop.call_later( + _hb_interval, _loop_heartbeat, _hb_loop.time() + _hb_interval + ) + await server.main_loop() if server.started: await server.shutdown() diff --git a/tui_gateway/ws.py b/tui_gateway/ws.py index 62218e60b9d..2ab4798df12 100644 --- a/tui_gateway/ws.py +++ b/tui_gateway/ws.py @@ -28,6 +28,7 @@ import concurrent.futures import json import logging import socket +import threading from typing import Any from tui_gateway import server @@ -40,6 +41,24 @@ _log = logging.getLogger(__name__) _WS_WRITE_TIMEOUT_S = 10.0 _WS_LOG_PAYLOAD_PREVIEW = 240 +# Per-token streaming frames are coalesced: buffered and flushed as a batch on +# a short timer instead of waking the event loop once per token. A model reply +# emits hundreds of these in a burst, and each one is a loop wakeup competing +# with the agent turn for the GIL — coalescing cuts that churn (CF-2). The task +# that introduced this called them "agent.token"/"agent.thinking"; in this +# codebase the per-token frames are the ``*.delta`` stream events below. Keep +# this set to genuinely high-frequency, display-only events — anything a client +# must see promptly (tool/approval/status/completion frames) is non-streaming +# and flushes the buffer ahead of itself, so ordering is preserved. +_STREAMING_EVENT_TYPES = frozenset({ + "message.delta", + "reasoning.delta", + "thinking.delta", +}) +# Max time a streamed token waits in the buffer before flush (~30 fps). Short +# enough to stay imperceptible to the live token cadence. +_TOKEN_COALESCE_S = 0.033 + # Keep starlette optional at import time; handle_ws uses the real class when # it's available and falls back to a generic Exception sentinel otherwise. try: @@ -75,6 +94,22 @@ class WSTransport: self._loop = loop self._peer = peer self._closed = False + # Token-coalescing buffer (CF-2). Streamed token frames land here and a + # short timer flushes the batch. The lock guards the buffer + the + # "armed" flag against the worker threads that call write(); the timer + # handle is only ever touched on the loop thread. + self._token_lock = threading.Lock() + self._pending_tokens: list[str] = [] + self._token_flush_handle: asyncio.TimerHandle | None = None + self._token_flush_armed = False + + @staticmethod + def _is_streaming_frame(obj: dict) -> bool: + """True for high-frequency per-token frames eligible for coalescing.""" + params = obj.get("params") if isinstance(obj, dict) else None + if not isinstance(params, dict): + return False + return params.get("type") in _STREAMING_EVENT_TYPES def write(self, obj: dict) -> bool: if self._closed: @@ -87,17 +122,43 @@ class WSTransport: except RuntimeError: on_loop = False - if on_loop: - # Fire-and-forget — don't block the loop waiting on itself. - self._loop.create_task(self._safe_send(line)) - return True + # Coalesce streamed token frames: buffer this frame and arm a short + # flush timer instead of waking the loop right now. Cheap and + # non-blocking — the worker returns immediately. Ordering is preserved + # because every non-streaming frame (below) drains the buffer ahead of + # itself. + if self._is_streaming_frame(obj): + with self._token_lock: + self._pending_tokens.append(line) + if not self._token_flush_armed: + self._token_flush_armed = True + # call_soon_threadsafe arms the call_later timer on the loop + # thread and is safe to call from a worker or the loop. + self._loop.call_soon_threadsafe(self._arm_token_flush) + return not self._closed - try: - from agent.async_utils import safe_schedule_threadsafe - fut = safe_schedule_threadsafe(self._safe_send(line), self._loop) + # Non-streaming frame (RPC response, control frame, non-token event): + # append it behind any buffered tokens and flush the whole batch NOW so + # it can never overtake the tokens that preceded it. The send is + # scheduled INSIDE the lock so the on-the-wire order matches the buffer + # order even if the coalesce timer fires on the loop at the same moment. + from agent.async_utils import safe_schedule_threadsafe + with self._token_lock: + self._pending_tokens.append(line) + batch = self._pending_tokens + self._pending_tokens = [] + if on_loop: + # Fire-and-forget — don't block the loop waiting on itself. + self._loop.create_task(self._safe_send_many(batch)) + return True + fut = safe_schedule_threadsafe( + self._safe_send_many(batch), self._loop + ) if fut is None: self._closed = True return False + + try: fut.result(timeout=_WS_WRITE_TIMEOUT_S) return not self._closed except concurrent.futures.TimeoutError: # builtin TimeoutError on 3.11+ @@ -106,8 +167,8 @@ class WSTransport: # already scheduled and will flush once the loop breathes — latching # _closed here permanently silenced live windows after one slow # write (the "subagent window shows zero streaming" bug). Unblock - # the worker thread and keep the transport alive; _safe_send latches - # on a real socket error when the frame actually fails. + # the worker thread and keep the transport alive; _safe_send_many + # latches on a real socket error when the frame actually fails. _log.warning( "ws write slow (loop stalled >%ss) peer=%s — frame left in flight", _WS_WRITE_TIMEOUT_S, self._peer, @@ -121,10 +182,41 @@ class WSTransport: ) return False + def _arm_token_flush(self) -> None: + """Arm the coalesce timer. Runs on the loop thread (call_soon_threadsafe).""" + if self._closed: + return + self._token_flush_handle = self._loop.call_later( + _TOKEN_COALESCE_S, self._flush_tokens + ) + + def _flush_tokens(self) -> None: + """Send buffered tokens as one batch. Runs on the loop thread (timer). + + The send is scheduled under the lock so its wire order is fixed relative + to a concurrent non-streaming flush in :meth:`write`. + """ + with self._token_lock: + self._token_flush_handle = None + self._token_flush_armed = False + if not self._pending_tokens or self._closed: + self._pending_tokens = [] + return + batch = self._pending_tokens + self._pending_tokens = [] + self._loop.create_task(self._safe_send_many(batch)) + async def write_async(self, obj: dict) -> bool: """Send from the owning event loop. Awaits until the frame is on the wire.""" if self._closed: return False + # Flush any buffered streamed tokens ahead of this frame (RPC response / + # control frame) so it can't overtake the tokens that preceded it. + with self._token_lock: + pending = self._pending_tokens + self._pending_tokens = [] + if pending: + await self._safe_send_many(pending) await self._safe_send(json.dumps(obj, ensure_ascii=False)) return not self._closed @@ -138,8 +230,26 @@ class WSTransport: self._peer, type(exc).__name__, exc, ) + async def _safe_send_many(self, lines: list[str]) -> None: + """Send a batch of pre-serialized frames in order on the loop thread.""" + try: + for line in lines: + await self._ws.send_text(line) + except Exception as exc: + self._closed = True + _log.warning( + "ws send failed peer=%s error_type=%s error=%s", + self._peer, type(exc).__name__, exc, + ) + def close(self) -> None: self._closed = True + # Cancel any pending coalesce flush. close() runs on the loop thread + # (the handle_ws finally), so touching the TimerHandle here is safe. + handle = self._token_flush_handle + if handle is not None: + handle.cancel() + self._token_flush_handle = None def _ws_peer_label(ws: Any) -> str: