hermes-agent/tui_gateway/ws.py
firefly ae94ed1728
fix(tui-gateway): reap leaked slash_worker sessions on disconnect + active_list liveness (re-scoped onto current main)
Salvaged from #35626 (banditburai) and re-scoped after maintainers landed the
parent-death watchdog (slash_worker.py) and PTY process-group teardown
(pty_bridge.py) directly on main. Those pieces are intentionally NOT included
here — this carries only what is still missing:

- C1 disconnect reap: ws.py's `finally` only re-pointed the dead transport at
  stdio. `_close_sessions_for_transport` now reaps `close_on_disconnect`
  sessions and schedules the grace-reap for the rest, offloaded via
  `asyncio.to_thread` so the blocking worker.close() + DB write never stalls
  the uvicorn loop.
- C2 create/close orphan race: `_attach_worker` stores the worker iff
  `_sessions.get(sid) is session` under the lock (else closes it), applied at
  every spawn site incl. the post-turn `_restart_slash_worker`.
- Single idempotent teardown funnel: session.close, WS disconnect, the
  generous-TTL idle reaper, shutdown, and the WS grace-reap all reach
  `_close_session_by_id` → `_teardown_session`; `_finalized`/`_closed` flags
  make concurrent/double teardown a no-op. `_sessions_lock` upgraded to RLock.
- uvicorn `ws_ping_interval/timeout=20s` so a half-open socket (reverse-proxy
  524) becomes a `WebSocketDisconnect` and the C1 path runs.

Plus two review-driven hardening fixes (mine):

- `session.active_list` now skips `_finalized` sessions so the footer
  "N sessions" count reflects attachable sessions instead of only ever
  growing until restart (#38950). Keys on `_finalized` only, NOT the stdio
  sentinel, so a standalone `hermes --tui` session stays visible.
- `_schedule_ws_orphan_reap._reap` pops via `_close_session_by_id`
  (under `_sessions_lock`) instead of `_sessions.pop` under the unrelated
  `_session_resume_lock` (#39591); the resume_lock now only guards the orphan
  re-check against `session.resume`.
- Float env knobs (`HERMES_SLASH_WATCHDOG_*`, `HERMES_TUI_SESSION_TTL_S`)
  parse with a fallback helper so a malformed value can't crash the worker at
  import.

Fixes #32377
Fixes #38950
Addresses #22855

Co-authored-by: banditburai <123342691+banditburai@users.noreply.github.com>
Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
2026-06-08 10:02:05 -07:00

326 lines
12 KiB
Python

"""WebSocket transport for the tui_gateway JSON-RPC server.
Reuses :func:`tui_gateway.server.dispatch` verbatim so every RPC method, every
slash command, every approval/clarify/sudo flow, and every agent event flows
through the same handlers whether the client is Ink over stdio or an iOS /
web client over WebSocket.
Wire protocol
-------------
Identical to stdio: newline-delimited JSON-RPC in both directions. The server
emits a ``gateway.ready`` event immediately after connection accept, then
echoes responses/events for inbound requests. No framing differences.
Mounting
--------
from fastapi import WebSocket
from tui_gateway.ws import handle_ws
@app.websocket("/api/ws")
async def ws(ws: WebSocket):
await handle_ws(ws)
"""
from __future__ import annotations
import asyncio
import json
import logging
import socket
from typing import Any
from tui_gateway import server
_log = logging.getLogger(__name__)
# Max seconds a pool-dispatched handler will block waiting for the event loop
# to flush a WS frame before we mark the transport dead. Protects handler
# threads from a wedged socket.
_WS_WRITE_TIMEOUT_S = 10.0
_WS_LOG_PAYLOAD_PREVIEW = 240
# Keep starlette optional at import time; handle_ws uses the real class when
# it's available and falls back to a generic Exception sentinel otherwise.
try:
from starlette.websockets import WebSocketDisconnect as _WebSocketDisconnect
except ImportError: # pragma: no cover - starlette is a required install path
_WebSocketDisconnect = Exception # type: ignore[assignment]
class WSTransport:
"""Per-connection WS transport.
``write`` is safe to call from any thread *other than* the event loop
thread that owns the socket. Pool workers (the only real caller) run in
their own threads, so marshalling onto the loop via
:func:`asyncio.run_coroutine_threadsafe` + ``future.result()`` is correct
and deadlock-free there.
When called from the loop thread itself (e.g. by ``handle_ws`` for an
inline response) the same call would deadlock: we'd schedule work onto
the loop we're currently blocking. We detect that case and fire-and-
forget instead. Callers that need to know when the bytes are on the wire
should use :meth:`write_async` from the loop thread.
"""
def __init__(
self,
ws: Any,
loop: asyncio.AbstractEventLoop,
*,
peer: str = "unknown",
) -> None:
self._ws = ws
self._loop = loop
self._peer = peer
self._closed = False
def write(self, obj: dict) -> bool:
if self._closed:
return False
line = json.dumps(obj, ensure_ascii=False)
try:
on_loop = asyncio.get_running_loop() is self._loop
except RuntimeError:
on_loop = False
if on_loop:
# Fire-and-forget — don't block the loop waiting on itself.
self._loop.create_task(self._safe_send(line))
return True
try:
from agent.async_utils import safe_schedule_threadsafe
fut = safe_schedule_threadsafe(self._safe_send(line), self._loop)
if fut is None:
self._closed = True
return False
fut.result(timeout=_WS_WRITE_TIMEOUT_S)
return not self._closed
except Exception as exc:
self._closed = True
_log.warning(
"ws write failed peer=%s error_type=%s error=%s",
self._peer, type(exc).__name__, exc,
)
return False
async def write_async(self, obj: dict) -> bool:
"""Send from the owning event loop. Awaits until the frame is on the wire."""
if self._closed:
return False
await self._safe_send(json.dumps(obj, ensure_ascii=False))
return not self._closed
async def _safe_send(self, line: str) -> None:
try:
await self._ws.send_text(line)
except Exception as exc:
self._closed = True
_log.warning(
"ws send failed peer=%s error_type=%s error=%s",
self._peer, type(exc).__name__, exc,
)
def close(self) -> None:
self._closed = True
def _ws_peer_label(ws: Any) -> str:
"""Return ``host:port`` when available, else a stable placeholder."""
client = getattr(ws, "client", None)
if client is None:
return "unknown"
host = getattr(client, "host", None) or "unknown"
port = getattr(client, "port", None)
return f"{host}:{port}" if port is not None else host
def _disable_nagle(ws: Any) -> None:
"""Disable Nagle so streamed JSON-RPC frames go out individually.
Without it the kernel coalesces the small per-token frames, so a burst after
the model's think-pause lands on the client in one tick and no client-side
smoothing can recover the cadence. GUI/WS only; chat platforms don't hit
this path. Best-effort — skip silently if the socket isn't reachable.
"""
try:
scope = getattr(ws, "scope", None) or {}
transport = (scope.get("extensions") or {}).get("transport") or getattr(ws, "transport", None)
sock = transport.get_extra_info("socket") if transport is not None else None
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except Exception as exc: # pragma: no cover - best-effort tuning
_log.debug("ws TCP_NODELAY skip: %s", exc)
async def handle_ws(ws: Any) -> None:
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
peer = _ws_peer_label(ws)
transport: WSTransport | None = None
messages = 0
parse_errors = 0
dispatch_crashes = 0
send_failures = 0
disconnect_reason = "not_connected"
try:
await ws.accept()
disconnect_reason = "connected"
# Push small streamed frames out immediately instead of letting Nagle
# batch them — keeps the live token cadence intact for GUI clients.
_disable_nagle(ws)
_log.info("ws accepted peer=%s", peer)
transport = WSTransport(ws, asyncio.get_running_loop(), peer=peer)
ready_ok = await transport.write_async(
{
"jsonrpc": "2.0",
"method": "event",
"params": {
"type": "gateway.ready",
"payload": {"skin": server.resolve_skin()},
},
}
)
if not ready_ok:
disconnect_reason = "ready_send_failed"
send_failures += 1
_log.error("ws ready frame send failed peer=%s", peer)
return
while True:
try:
raw = await ws.receive_text()
except _WebSocketDisconnect as exc:
disconnect_reason = (
"client_disconnect("
f"code={getattr(exc, 'code', None)},"
f"reason={getattr(exc, 'reason', None)})"
)
break
except Exception:
disconnect_reason = "receive_failed"
_log.exception("ws receive failed peer=%s", peer)
break
line = raw.strip()
if not line:
continue
messages += 1
try:
req = json.loads(line)
except json.JSONDecodeError as exc:
parse_errors += 1
_log.warning(
"ws parse error peer=%s index=%d error=%s payload=%r",
peer,
messages,
exc,
line[:_WS_LOG_PAYLOAD_PREVIEW],
)
ok = await transport.write_async(
{
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "parse error"},
"id": None,
}
)
if not ok:
disconnect_reason = "send_failed_after_parse_error"
send_failures += 1
_log.warning("ws parse-error reply send failed peer=%s", peer)
break
continue
# dispatch() may schedule long handlers on the pool; it returns
# None in that case and the worker writes the response itself via
# the transport we pass in (a separate thread, so transport.write
# is the safe path there). For inline handlers it returns the
# response dict, which we write here from the loop.
req_id = req.get("id") if isinstance(req, dict) else None
req_method = req.get("method") if isinstance(req, dict) else None
try:
resp = await asyncio.to_thread(server.dispatch, req, transport)
except Exception:
dispatch_crashes += 1
_log.exception(
"ws dispatch crash peer=%s id=%s method=%s",
peer,
req_id,
req_method,
)
ok = await transport.write_async(
{
"jsonrpc": "2.0",
"error": {"code": -32603, "message": "internal error"},
"id": req_id if req_id is not None else None,
}
)
if not ok:
disconnect_reason = "send_failed_after_dispatch_crash"
send_failures += 1
_log.warning(
"ws dispatch-crash reply send failed peer=%s id=%s method=%s",
peer,
req_id,
req_method,
)
break
continue
if resp is not None and not await transport.write_async(resp):
disconnect_reason = "send_failed_after_response"
send_failures += 1
_log.warning(
"ws response send failed peer=%s id=%s method=%s",
peer,
req_id,
req_method,
)
break
finally:
reaped_sessions = 0
detached_sessions = 0
if transport is not None:
transport.close()
# Reap sessions this transport owned (close_on_disconnect sidecar
# sessions) or detach the rest to the drop sentinel so later emits
# don't crash into a closed socket or fall through to desktop stdout
# logs. Detached sessions are handed to the grace-windowed WS-orphan
# reaper inside _close_sessions_for_transport (a quick reconnect /
# session.resume cancels it). This is the single WS-disconnect
# teardown path.
#
# Offloaded: _close_session_by_id does a blocking worker.close()
# (terminate + waits) plus a synchronous DB write — inline that
# would freeze the uvicorn event loop for every other live
# connection.
try:
reaped_sessions, detached_sessions = await asyncio.to_thread(
server._close_sessions_for_transport,
transport,
end_reason="ws_disconnect",
)
except Exception:
_log.exception("ws transport teardown failed peer=%s", peer)
try:
await ws.close()
except Exception as exc:
_log.debug("ws close failed peer=%s error=%s", peer, exc)
_log.info(
"ws closed peer=%s reason=%s messages=%d parse_errors=%d "
"dispatch_crashes=%d send_failures=%d reaped_sessions=%d detached_sessions=%d",
peer,
disconnect_reason,
messages,
parse_errors,
dispatch_crashes,
send_failures,
reaped_sessions,
detached_sessions,
)