diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 93dc42fbe6d..a9161f900cd 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -4269,18 +4269,30 @@ async def pty_ws(ws: WebSocket) -> None: async def gateway_ws(ws: WebSocket) -> None: peer = _ws_client_label(ws) if not _DASHBOARD_EMBEDDED_CHAT_ENABLED: - _log.debug("gateway-ws reject peer=%s reason=embedded_chat_disabled", peer) + _log.warning( + "gateway-ws reject peer=%s reason=embedded_chat_disabled close_code=4403", + peer, + ) await ws.close(code=4403) return token = ws.query_params.get("token", "") if not hmac.compare_digest(token.encode(), _SESSION_TOKEN.encode()): - _log.warning("gateway-ws reject peer=%s reason=bad_token", peer) + _log.warning( + "gateway-ws reject peer=%s reason=bad_token token_len=%d close_code=4401", + peer, + len(token), + ) await ws.close(code=4401) return if not _ws_client_is_allowed(ws): - _log.warning("gateway-ws reject peer=%s reason=non_loopback_client", peer) + _log.warning( + "gateway-ws reject peer=%s reason=non_loopback_client " + "bound_host=%s close_code=4403", + peer, + getattr(app.state, "bound_host", ""), + ) await ws.close(code=4403) return @@ -4288,8 +4300,13 @@ async def gateway_ws(ws: WebSocket) -> None: _log.info("gateway-ws connect peer=%s", peer) try: await handle_ws(ws) - except WebSocketDisconnect: - _log.info("gateway-ws disconnect peer=%s", peer) + except WebSocketDisconnect as exc: + _log.info( + "gateway-ws disconnect peer=%s code=%s reason=%s", + peer, + getattr(exc, "code", None), + getattr(exc, "reason", None), + ) except Exception: _log.exception("gateway-ws error peer=%s", peer) raise diff --git a/tui_gateway/ws.py b/tui_gateway/ws.py index 1661811dbd6..25b732a7c97 100644 --- a/tui_gateway/ws.py +++ b/tui_gateway/ws.py @@ -36,6 +36,7 @@ _log = logging.getLogger(__name__) # to flush a WS frame before we mark the transport dead. Protects handler # threads from a wedged socket. _WS_WRITE_TIMEOUT_S = 10.0 +_WS_LOG_PAYLOAD_PREVIEW = 240 # Keep starlette optional at import time; handle_ws uses the real class when # it's available and falls back to a generic Exception sentinel otherwise. @@ -61,9 +62,16 @@ class WSTransport: should use :meth:`write_async` from the loop thread. """ - def __init__(self, ws: Any, loop: asyncio.AbstractEventLoop) -> None: + def __init__( + self, + ws: Any, + loop: asyncio.AbstractEventLoop, + *, + peer: str = "unknown", + ) -> None: self._ws = ws self._loop = loop + self._peer = peer self._closed = False def write(self, obj: dict) -> bool: @@ -88,7 +96,7 @@ class WSTransport: return not self._closed except Exception as exc: self._closed = True - _log.debug("ws write failed: %s", exc) + _log.warning("ws write failed peer=%s error=%s", self._peer, exc) return False async def write_async(self, obj: dict) -> bool: @@ -103,43 +111,86 @@ class WSTransport: await self._ws.send_text(line) except Exception as exc: self._closed = True - _log.debug("ws send failed: %s", exc) + _log.warning("ws send failed peer=%s error=%s", self._peer, exc) def close(self) -> None: self._closed = True +def _ws_peer_label(ws: Any) -> str: + """Return ``host:port`` when available, else a stable placeholder.""" + client = getattr(ws, "client", None) + if client is None: + return "unknown" + host = getattr(client, "host", None) or "unknown" + port = getattr(client, "port", None) + return f"{host}:{port}" if port is not None else host + + async def handle_ws(ws: Any) -> None: """Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``.""" - await ws.accept() - - transport = WSTransport(ws, asyncio.get_running_loop()) - - await transport.write_async( - { - "jsonrpc": "2.0", - "method": "event", - "params": { - "type": "gateway.ready", - "payload": {"skin": server.resolve_skin()}, - }, - } - ) + peer = _ws_peer_label(ws) + transport: WSTransport | None = None + messages = 0 + parse_errors = 0 + dispatch_crashes = 0 + send_failures = 0 + disconnect_reason = "not_connected" try: + await ws.accept() + disconnect_reason = "connected" + _log.info("ws accepted peer=%s", peer) + + transport = WSTransport(ws, asyncio.get_running_loop(), peer=peer) + + ready_ok = await transport.write_async( + { + "jsonrpc": "2.0", + "method": "event", + "params": { + "type": "gateway.ready", + "payload": {"skin": server.resolve_skin()}, + }, + } + ) + if not ready_ok: + disconnect_reason = "ready_send_failed" + send_failures += 1 + _log.error("ws ready frame send failed peer=%s", peer) + return + while True: try: raw = await ws.receive_text() - except _WebSocketDisconnect: + except _WebSocketDisconnect as exc: + disconnect_reason = ( + "client_disconnect(" + f"code={getattr(exc, 'code', None)}," + f"reason={getattr(exc, 'reason', None)})" + ) + break + except Exception: + disconnect_reason = "receive_failed" + _log.exception("ws receive failed peer=%s", peer) break line = raw.strip() if not line: continue + messages += 1 try: req = json.loads(line) - except json.JSONDecodeError: + except json.JSONDecodeError as exc: + parse_errors += 1 + _log.warning( + "ws parse error peer=%s index=%d error=%s payload=%r", + peer, + messages, + exc, + line[:_WS_LOG_PAYLOAD_PREVIEW], + ) ok = await transport.write_async( { "jsonrpc": "2.0", @@ -148,6 +199,9 @@ async def handle_ws(ws: Any) -> None: } ) if not ok: + disconnect_reason = "send_failed_after_parse_error" + send_failures += 1 + _log.warning("ws parse-error reply send failed peer=%s", peer) break continue @@ -156,19 +210,69 @@ async def handle_ws(ws: Any) -> None: # the transport we pass in (a separate thread, so transport.write # is the safe path there). For inline handlers it returns the # response dict, which we write here from the loop. - resp = await asyncio.to_thread(server.dispatch, req, transport) + req_id = req.get("id") if isinstance(req, dict) else None + req_method = req.get("method") if isinstance(req, dict) else None + try: + resp = await asyncio.to_thread(server.dispatch, req, transport) + except Exception: + dispatch_crashes += 1 + _log.exception( + "ws dispatch crash peer=%s id=%s method=%s", + peer, + req_id, + req_method, + ) + ok = await transport.write_async( + { + "jsonrpc": "2.0", + "error": {"code": -32603, "message": "internal error"}, + "id": req_id if req_id is not None else None, + } + ) + if not ok: + disconnect_reason = "send_failed_after_dispatch_crash" + send_failures += 1 + _log.warning( + "ws dispatch-crash reply send failed peer=%s id=%s method=%s", + peer, + req_id, + req_method, + ) + break + continue if resp is not None and not await transport.write_async(resp): + disconnect_reason = "send_failed_after_response" + send_failures += 1 + _log.warning( + "ws response send failed peer=%s id=%s method=%s", + peer, + req_id, + req_method, + ) break finally: - transport.close() - - # Detach the transport from any sessions it owned so later emits - # fall back to stdio instead of crashing into a closed socket. - for _, sess in list(server._sessions.items()): - if sess.get("transport") is transport: - sess["transport"] = server._stdio_transport + detached_sessions = 0 + if transport is not None: + transport.close() + # Detach the transport from any sessions it owned so later emits + # fall back to stdio instead of crashing into a closed socket. + for _, sess in list(server._sessions.items()): + if sess.get("transport") is transport: + sess["transport"] = server._stdio_transport + detached_sessions += 1 try: await ws.close() - except Exception: - pass + except Exception as exc: + _log.debug("ws close failed peer=%s error=%s", peer, exc) + _log.info( + "ws closed peer=%s reason=%s messages=%d parse_errors=%d " + "dispatch_crashes=%d send_failures=%d detached_sessions=%d", + peer, + disconnect_reason, + messages, + parse_errors, + dispatch_crashes, + send_failures, + detached_sessions, + )