mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-10 08:32:09 +00:00
Log detailed GUI websocket failure metadata.
Capture richer reject/disconnect/send/parse context for dashboard gateway websocket flows so GUI connection failures are diagnosable from logs.
This commit is contained in:
parent
c30550c552
commit
a7d4ada79c
2 changed files with 155 additions and 34 deletions
|
|
@ -4269,18 +4269,30 @@ async def pty_ws(ws: WebSocket) -> None:
|
|||
async def gateway_ws(ws: WebSocket) -> None:
|
||||
peer = _ws_client_label(ws)
|
||||
if not _DASHBOARD_EMBEDDED_CHAT_ENABLED:
|
||||
_log.debug("gateway-ws reject peer=%s reason=embedded_chat_disabled", peer)
|
||||
_log.warning(
|
||||
"gateway-ws reject peer=%s reason=embedded_chat_disabled close_code=4403",
|
||||
peer,
|
||||
)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
token = ws.query_params.get("token", "")
|
||||
if not hmac.compare_digest(token.encode(), _SESSION_TOKEN.encode()):
|
||||
_log.warning("gateway-ws reject peer=%s reason=bad_token", peer)
|
||||
_log.warning(
|
||||
"gateway-ws reject peer=%s reason=bad_token token_len=%d close_code=4401",
|
||||
peer,
|
||||
len(token),
|
||||
)
|
||||
await ws.close(code=4401)
|
||||
return
|
||||
|
||||
if not _ws_client_is_allowed(ws):
|
||||
_log.warning("gateway-ws reject peer=%s reason=non_loopback_client", peer)
|
||||
_log.warning(
|
||||
"gateway-ws reject peer=%s reason=non_loopback_client "
|
||||
"bound_host=%s close_code=4403",
|
||||
peer,
|
||||
getattr(app.state, "bound_host", ""),
|
||||
)
|
||||
await ws.close(code=4403)
|
||||
return
|
||||
|
||||
|
|
@ -4288,8 +4300,13 @@ async def gateway_ws(ws: WebSocket) -> None:
|
|||
_log.info("gateway-ws connect peer=%s", peer)
|
||||
try:
|
||||
await handle_ws(ws)
|
||||
except WebSocketDisconnect:
|
||||
_log.info("gateway-ws disconnect peer=%s", peer)
|
||||
except WebSocketDisconnect as exc:
|
||||
_log.info(
|
||||
"gateway-ws disconnect peer=%s code=%s reason=%s",
|
||||
peer,
|
||||
getattr(exc, "code", None),
|
||||
getattr(exc, "reason", None),
|
||||
)
|
||||
except Exception:
|
||||
_log.exception("gateway-ws error peer=%s", peer)
|
||||
raise
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ _log = logging.getLogger(__name__)
|
|||
# to flush a WS frame before we mark the transport dead. Protects handler
|
||||
# threads from a wedged socket.
|
||||
_WS_WRITE_TIMEOUT_S = 10.0
|
||||
_WS_LOG_PAYLOAD_PREVIEW = 240
|
||||
|
||||
# Keep starlette optional at import time; handle_ws uses the real class when
|
||||
# it's available and falls back to a generic Exception sentinel otherwise.
|
||||
|
|
@ -61,9 +62,16 @@ class WSTransport:
|
|||
should use :meth:`write_async` from the loop thread.
|
||||
"""
|
||||
|
||||
def __init__(self, ws: Any, loop: asyncio.AbstractEventLoop) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
ws: Any,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
*,
|
||||
peer: str = "unknown",
|
||||
) -> None:
|
||||
self._ws = ws
|
||||
self._loop = loop
|
||||
self._peer = peer
|
||||
self._closed = False
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
|
|
@ -88,7 +96,7 @@ class WSTransport:
|
|||
return not self._closed
|
||||
except Exception as exc:
|
||||
self._closed = True
|
||||
_log.debug("ws write failed: %s", exc)
|
||||
_log.warning("ws write failed peer=%s error=%s", self._peer, exc)
|
||||
return False
|
||||
|
||||
async def write_async(self, obj: dict) -> bool:
|
||||
|
|
@ -103,43 +111,86 @@ class WSTransport:
|
|||
await self._ws.send_text(line)
|
||||
except Exception as exc:
|
||||
self._closed = True
|
||||
_log.debug("ws send failed: %s", exc)
|
||||
_log.warning("ws send failed peer=%s error=%s", self._peer, exc)
|
||||
|
||||
def close(self) -> None:
|
||||
self._closed = True
|
||||
|
||||
|
||||
def _ws_peer_label(ws: Any) -> str:
|
||||
"""Return ``host:port`` when available, else a stable placeholder."""
|
||||
client = getattr(ws, "client", None)
|
||||
if client is None:
|
||||
return "unknown"
|
||||
host = getattr(client, "host", None) or "unknown"
|
||||
port = getattr(client, "port", None)
|
||||
return f"{host}:{port}" if port is not None else host
|
||||
|
||||
|
||||
async def handle_ws(ws: Any) -> None:
|
||||
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
|
||||
await ws.accept()
|
||||
|
||||
transport = WSTransport(ws, asyncio.get_running_loop())
|
||||
|
||||
await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {
|
||||
"type": "gateway.ready",
|
||||
"payload": {"skin": server.resolve_skin()},
|
||||
},
|
||||
}
|
||||
)
|
||||
peer = _ws_peer_label(ws)
|
||||
transport: WSTransport | None = None
|
||||
messages = 0
|
||||
parse_errors = 0
|
||||
dispatch_crashes = 0
|
||||
send_failures = 0
|
||||
disconnect_reason = "not_connected"
|
||||
|
||||
try:
|
||||
await ws.accept()
|
||||
disconnect_reason = "connected"
|
||||
_log.info("ws accepted peer=%s", peer)
|
||||
|
||||
transport = WSTransport(ws, asyncio.get_running_loop(), peer=peer)
|
||||
|
||||
ready_ok = await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {
|
||||
"type": "gateway.ready",
|
||||
"payload": {"skin": server.resolve_skin()},
|
||||
},
|
||||
}
|
||||
)
|
||||
if not ready_ok:
|
||||
disconnect_reason = "ready_send_failed"
|
||||
send_failures += 1
|
||||
_log.error("ws ready frame send failed peer=%s", peer)
|
||||
return
|
||||
|
||||
while True:
|
||||
try:
|
||||
raw = await ws.receive_text()
|
||||
except _WebSocketDisconnect:
|
||||
except _WebSocketDisconnect as exc:
|
||||
disconnect_reason = (
|
||||
"client_disconnect("
|
||||
f"code={getattr(exc, 'code', None)},"
|
||||
f"reason={getattr(exc, 'reason', None)})"
|
||||
)
|
||||
break
|
||||
except Exception:
|
||||
disconnect_reason = "receive_failed"
|
||||
_log.exception("ws receive failed peer=%s", peer)
|
||||
break
|
||||
|
||||
line = raw.strip()
|
||||
if not line:
|
||||
continue
|
||||
messages += 1
|
||||
|
||||
try:
|
||||
req = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
except json.JSONDecodeError as exc:
|
||||
parse_errors += 1
|
||||
_log.warning(
|
||||
"ws parse error peer=%s index=%d error=%s payload=%r",
|
||||
peer,
|
||||
messages,
|
||||
exc,
|
||||
line[:_WS_LOG_PAYLOAD_PREVIEW],
|
||||
)
|
||||
ok = await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
|
|
@ -148,6 +199,9 @@ async def handle_ws(ws: Any) -> None:
|
|||
}
|
||||
)
|
||||
if not ok:
|
||||
disconnect_reason = "send_failed_after_parse_error"
|
||||
send_failures += 1
|
||||
_log.warning("ws parse-error reply send failed peer=%s", peer)
|
||||
break
|
||||
continue
|
||||
|
||||
|
|
@ -156,19 +210,69 @@ async def handle_ws(ws: Any) -> None:
|
|||
# the transport we pass in (a separate thread, so transport.write
|
||||
# is the safe path there). For inline handlers it returns the
|
||||
# response dict, which we write here from the loop.
|
||||
resp = await asyncio.to_thread(server.dispatch, req, transport)
|
||||
req_id = req.get("id") if isinstance(req, dict) else None
|
||||
req_method = req.get("method") if isinstance(req, dict) else None
|
||||
try:
|
||||
resp = await asyncio.to_thread(server.dispatch, req, transport)
|
||||
except Exception:
|
||||
dispatch_crashes += 1
|
||||
_log.exception(
|
||||
"ws dispatch crash peer=%s id=%s method=%s",
|
||||
peer,
|
||||
req_id,
|
||||
req_method,
|
||||
)
|
||||
ok = await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"error": {"code": -32603, "message": "internal error"},
|
||||
"id": req_id if req_id is not None else None,
|
||||
}
|
||||
)
|
||||
if not ok:
|
||||
disconnect_reason = "send_failed_after_dispatch_crash"
|
||||
send_failures += 1
|
||||
_log.warning(
|
||||
"ws dispatch-crash reply send failed peer=%s id=%s method=%s",
|
||||
peer,
|
||||
req_id,
|
||||
req_method,
|
||||
)
|
||||
break
|
||||
continue
|
||||
if resp is not None and not await transport.write_async(resp):
|
||||
disconnect_reason = "send_failed_after_response"
|
||||
send_failures += 1
|
||||
_log.warning(
|
||||
"ws response send failed peer=%s id=%s method=%s",
|
||||
peer,
|
||||
req_id,
|
||||
req_method,
|
||||
)
|
||||
break
|
||||
finally:
|
||||
transport.close()
|
||||
|
||||
# Detach the transport from any sessions it owned so later emits
|
||||
# fall back to stdio instead of crashing into a closed socket.
|
||||
for _, sess in list(server._sessions.items()):
|
||||
if sess.get("transport") is transport:
|
||||
sess["transport"] = server._stdio_transport
|
||||
detached_sessions = 0
|
||||
if transport is not None:
|
||||
transport.close()
|
||||
|
||||
# Detach the transport from any sessions it owned so later emits
|
||||
# fall back to stdio instead of crashing into a closed socket.
|
||||
for _, sess in list(server._sessions.items()):
|
||||
if sess.get("transport") is transport:
|
||||
sess["transport"] = server._stdio_transport
|
||||
detached_sessions += 1
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception as exc:
|
||||
_log.debug("ws close failed peer=%s error=%s", peer, exc)
|
||||
_log.info(
|
||||
"ws closed peer=%s reason=%s messages=%d parse_errors=%d "
|
||||
"dispatch_crashes=%d send_failures=%d detached_sessions=%d",
|
||||
peer,
|
||||
disconnect_reason,
|
||||
messages,
|
||||
parse_errors,
|
||||
dispatch_crashes,
|
||||
send_failures,
|
||||
detached_sessions,
|
||||
)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue