mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Extracts the JSON-RPC transport from stdio into an abstraction so the same
dispatcher drives Ink over stdio AND browser/iOS clients over WebSocket
without duplicating handler logic. Adds a Chat page to the existing web
dashboard that exercises the full surface — streaming, tool calls, slash
commands, model picker, session resume.
Backend
-------
* tui_gateway/transport.py — Transport protocol + contextvar binding + the
module-level StdioTransport. Stream is resolved through a callback so
tests that monkeypatch `_real_stdout` keep working.
* tui_gateway/server.py — write_json and dispatch are now transport-aware.
Backward compatible: no transport bound = legacy stdio path, so entry.py
(Ink's stdio entrypoint) is unchanged externally.
* tui_gateway/ws.py — WSTransport + handle_ws coroutine. Safe to call from
any thread: detects loop-thread deadlock and fire-and-forget schedules
when needed, blocking run_coroutine_threadsafe + future.result otherwise.
* hermes_cli/web_server.py — mounts /api/ws on the existing FastAPI app,
gated by the same ephemeral session token used for REST. Adds
  HERMES_DASHBOARD_DEV_TOKEN env override so the Vite HMR dev server can
  share the token with the backend.
Frontend
--------
* web/src/lib/gatewayClient.ts — browser WebSocket JSON-RPC client that
mirrors ui-tui/src/gatewayClient.ts.
* web/src/lib/slashExec.ts — slash command pipeline (slash.exec with
command.dispatch fallback + exec/plugin/alias/skill/send directive
handling), mirrors ui-tui/src/app/createSlashHandler.ts.
* web/src/pages/ChatPage.tsx — transcript + composer driven entirely by
the WS.
* web/src/components/SlashPopover.tsx — autocomplete popover above the
composer, debounced complete.slash.
* web/src/components/ModelPickerDialog.tsx — two-stage provider/model
picker; confirms by emitting /model through the slash pipeline.
* web/src/components/ToolCall.tsx — expandable tool call row (Ink-style
chevron + context + summary/error/diff).
* web/src/App.tsx — logo links to /, Chat entry added to nav.
* web/src/pages/SessionsPage.tsx — every session row gets an Open-in-chat
button that navigates to /chat?resume=<id> (uses session.resume).
* web/vite.config.ts — /api proxy configured with ws: true so WebSocket
upgrades forward in dev mode; injectDevToken plugin reads
HERMES_DASHBOARD_DEV_TOKEN and injects it into the served index.html so
Vite HMR can authenticate against FastAPI without a separate flow.
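The slash pipeline's fallback behaviour (slash.exec with a command.dispatch
fallback) can be sketched as follows. The real implementation is TypeScript
in web/src/lib/slashExec.ts; this Python sketch only mirrors the control
flow, and the `request` callable and `MethodNotFound` error are hypothetical
stand-ins for a JSON-RPC call and its unknown-method error:

```python
import asyncio


class MethodNotFound(Exception):
    """Stand-in for a JSON-RPC -32601 'method not found' error."""


async def run_slash(request, line: str) -> dict:
    """Prefer slash.exec; fall back to command.dispatch when the gateway
    does not implement it. Method names follow the description above."""
    try:
        return await request("slash.exec", {"line": line})
    except MethodNotFound:
        return await request("command.dispatch", {"line": line})


async def demo() -> dict:
    async def request(method: str, params: dict) -> dict:
        if method == "slash.exec":
            raise MethodNotFound(method)  # simulate an older gateway
        return {"handled": method, "line": params["line"]}

    return await run_slash(request, "/model")


result = asyncio.run(demo())
```

Here `result` shows the fallback path being taken: the dispatch lands on
command.dispatch with the original line intact.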
Tests
-----
tests/hermes_cli/test_web_server.py picks up three new classes:
* TestTuiGatewayWebSocket — handshake, auth rejection, parse errors,
unknown methods, inline + pool handler round-trips, session event
routing, disconnect cleanup.
* TestTuiGatewayTransportParity — byte-identical envelopes for the same
RPC over stdio vs WS (unknown method, inline handler, error envelope,
explicit stdio transport).
* TestTuiGatewayE2EAnyPort — scripted multi-RPC conversation driven
identically via handle_request and via WebSocket; order + shape must
match. This is the "hermes --tui in any port" check.
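The parity idea — byte-identical envelopes, not merely structurally equal
ones — can be sketched like this. The helpers below are illustrative, not
the actual test code in tests/hermes_cli/test_web_server.py:

```python
import json


def envelope_bytes(obj: dict) -> bytes:
    """Serialize one JSON-RPC envelope the way both transports frame it:
    newline-delimited UTF-8 JSON."""
    return (json.dumps(obj, ensure_ascii=False) + "\n").encode("utf-8")


def assert_parity(stdio_resp: dict, ws_resp: dict) -> None:
    # Byte-identical, not just equal dicts: framing and key order count too.
    assert envelope_bytes(stdio_resp) == envelope_bytes(ws_resp)


# e.g. an unknown-method error envelope must match over both transports
err = {
    "jsonrpc": "2.0",
    "error": {"code": -32601, "message": "method not found"},
    "id": 7,
}
assert_parity(err, dict(err))
```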
Existing tests under tests/tui_gateway/ and tests/test_tui_gateway_server.py
all still pass unchanged — backward compat preserved.
Try it
------
hermes dashboard # builds web, serves on :9119, click Chat
Dev with HMR:
    export HERMES_DASHBOARD_DEV_TOKEN="dev-$(openssl rand -hex 16)"
hermes dashboard --no-open
cd web && npm run dev # :5173, /api + /api/ws proxied to :9119
fix(chat): insert tool rows before the streaming assistant message
Transcript used to read "user → empty assistant bubble → tool → bubble
filling in", which is disorienting: the streaming cursor sits at the top
while the "work" rows appear below it chronologically.
Now tool.start inserts the row just before the current streaming
assistant message, so the order reads "user → tools → final message".
If no streaming assistant exists yet (rare), tools still append at the
end; tool.progress / tool.complete match by id regardless of position.
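The insertion rule reduces to a small list operation. The actual code is
TypeScript in ChatPage.tsx; this is a language-neutral sketch with
illustrative field names (`role`, `streaming`):

```python
def insert_tool_row(transcript: list[dict], tool_row: dict) -> list[dict]:
    """Place a tool.start row just before the streaming assistant message
    so the order reads user -> tools -> final message. Falls back to
    appending when no streaming assistant exists yet."""
    for i, msg in enumerate(transcript):
        if msg.get("role") == "assistant" and msg.get("streaming"):
            return transcript[:i] + [tool_row] + transcript[i:]
    return transcript + [tool_row]


transcript = [
    {"role": "user", "text": "run the tests"},
    {"role": "assistant", "streaming": True, "text": ""},
]
transcript = insert_tool_row(transcript, {"role": "tool", "id": "t1"})
# the tool row now sits between the user turn and the streaming bubble
```

Because tool.progress and tool.complete match rows by id, the row's position
in the list never needs to change after insertion.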
fix(web-chat): font, composer, streaming caret + port GoodVibesHeart
- ChatPage root opts out of App's `font-mondwest uppercase` (dashboard
chrome style) — adds `font-courier normal-case` so transcript prose is
readable mono mixed-case instead of pixel-display caps.
- Composer: textarea + send button wrapped as one bordered unit with
`focus-within` ring; `font-sans` dropped (it mapped to `Collapse`
display). Heights stretch together via `items-stretch`; button is a
flush cap with `border-l` divider.
- Streaming caret no longer wraps to a new line when the assistant
renders a block element. Markdown now takes a `streaming` prop and
injects the caret inside the last block (paragraph, list item, code)
so it hugs the trailing character. Caret sized in em units.
- EmptyState gets a blinking caret + <kbd> shortcut chips.
- Port ui-tui's GoodVibesHeart easter egg to the web: typing "thanks" /
"ty" / "ily" / "good bot" flashes a Lucide heart next to the
connection badge (same regex, same 650ms beat, same palette as
ui-tui/src/app/useMainApp.ts).
174 lines
5.8 KiB
Python
"""WebSocket transport for the tui_gateway JSON-RPC server.
|
|
|
|
Reuses :func:`tui_gateway.server.dispatch` verbatim so every RPC method, every
|
|
slash command, every approval/clarify/sudo flow, and every agent event flows
|
|
through the same handlers whether the client is Ink over stdio or an iOS /
|
|
web client over WebSocket.
|
|
|
|
Wire protocol
|
|
-------------
|
|
Identical to stdio: newline-delimited JSON-RPC in both directions. The server
|
|
emits a ``gateway.ready`` event immediately after connection accept, then
|
|
echoes responses/events for inbound requests. No framing differences.
|
|
|
|
Mounting
|
|
--------
|
|
from fastapi import WebSocket
|
|
from tui_gateway.ws import handle_ws
|
|
|
|
@app.websocket("/api/ws")
|
|
async def ws(ws: WebSocket):
|
|
await handle_ws(ws)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from typing import Any
|
|
|
|
from tui_gateway import server
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
# Max seconds a pool-dispatched handler will block waiting for the event loop
|
|
# to flush a WS frame before we mark the transport dead. Protects handler
|
|
# threads from a wedged socket.
|
|
_WS_WRITE_TIMEOUT_S = 10.0
|
|
|
|
# Keep starlette optional at import time; handle_ws uses the real class when
|
|
# it's available and falls back to a generic Exception sentinel otherwise.
|
|
try:
|
|
from starlette.websockets import WebSocketDisconnect as _WebSocketDisconnect
|
|
except ImportError: # pragma: no cover - starlette is a required install path
|
|
_WebSocketDisconnect = Exception # type: ignore[assignment]
|
|
|
|
|
|
class WSTransport:
|
|
"""Per-connection WS transport.
|
|
|
|
``write`` is safe to call from any thread *other than* the event loop
|
|
thread that owns the socket. Pool workers (the only real caller) run in
|
|
their own threads, so marshalling onto the loop via
|
|
:func:`asyncio.run_coroutine_threadsafe` + ``future.result()`` is correct
|
|
and deadlock-free there.
|
|
|
|
When called from the loop thread itself (e.g. by ``handle_ws`` for an
|
|
inline response) the same call would deadlock: we'd schedule work onto
|
|
the loop we're currently blocking. We detect that case and fire-and-
|
|
forget instead. Callers that need to know when the bytes are on the wire
|
|
should use :meth:`write_async` from the loop thread.
|
|
"""
|
|
|
|
def __init__(self, ws: Any, loop: asyncio.AbstractEventLoop) -> None:
|
|
self._ws = ws
|
|
self._loop = loop
|
|
self._closed = False
|
|
|
|
def write(self, obj: dict) -> bool:
|
|
if self._closed:
|
|
return False
|
|
|
|
line = json.dumps(obj, ensure_ascii=False)
|
|
|
|
try:
|
|
on_loop = asyncio.get_running_loop() is self._loop
|
|
except RuntimeError:
|
|
on_loop = False
|
|
|
|
if on_loop:
|
|
# Fire-and-forget — don't block the loop waiting on itself.
|
|
self._loop.create_task(self._safe_send(line))
|
|
return True
|
|
|
|
try:
|
|
fut = asyncio.run_coroutine_threadsafe(self._safe_send(line), self._loop)
|
|
fut.result(timeout=_WS_WRITE_TIMEOUT_S)
|
|
return not self._closed
|
|
except Exception as exc:
|
|
self._closed = True
|
|
_log.debug("ws write failed: %s", exc)
|
|
return False
|
|
|
|
async def write_async(self, obj: dict) -> bool:
|
|
"""Send from the owning event loop. Awaits until the frame is on the wire."""
|
|
if self._closed:
|
|
return False
|
|
await self._safe_send(json.dumps(obj, ensure_ascii=False))
|
|
return not self._closed
|
|
|
|
async def _safe_send(self, line: str) -> None:
|
|
try:
|
|
await self._ws.send_text(line)
|
|
except Exception as exc:
|
|
self._closed = True
|
|
_log.debug("ws send failed: %s", exc)
|
|
|
|
def close(self) -> None:
|
|
self._closed = True
|
|
|
|
|
|
async def handle_ws(ws: Any) -> None:
|
|
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
|
|
await ws.accept()
|
|
|
|
transport = WSTransport(ws, asyncio.get_running_loop())
|
|
|
|
await transport.write_async(
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"method": "event",
|
|
"params": {
|
|
"type": "gateway.ready",
|
|
"payload": {"skin": server.resolve_skin()},
|
|
},
|
|
}
|
|
)
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
raw = await ws.receive_text()
|
|
except _WebSocketDisconnect:
|
|
break
|
|
|
|
line = raw.strip()
|
|
if not line:
|
|
continue
|
|
|
|
try:
|
|
req = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
ok = await transport.write_async(
|
|
{
|
|
"jsonrpc": "2.0",
|
|
"error": {"code": -32700, "message": "parse error"},
|
|
"id": None,
|
|
}
|
|
)
|
|
if not ok:
|
|
break
|
|
continue
|
|
|
|
# dispatch() may schedule long handlers on the pool; it returns
|
|
# None in that case and the worker writes the response itself via
|
|
# the transport we pass in (a separate thread, so transport.write
|
|
# is the safe path there). For inline handlers it returns the
|
|
# response dict, which we write here from the loop.
|
|
resp = await asyncio.to_thread(server.dispatch, req, transport)
|
|
if resp is not None and not await transport.write_async(resp):
|
|
break
|
|
finally:
|
|
transport.close()
|
|
|
|
# Detach the transport from any sessions it owned so later emits
|
|
# fall back to stdio instead of crashing into a closed socket.
|
|
for _, sess in list(server._sessions.items()):
|
|
if sess.get("transport") is transport:
|
|
sess["transport"] = server._stdio_transport
|
|
|
|
try:
|
|
await ws.close()
|
|
except Exception:
|
|
pass
|
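A minimal client-side sketch of consuming the ``gateway.ready`` handshake on
this wire protocol. The `recv` callable is a hypothetical stand-in for any
async frame reader (e.g. a websockets-style recv coroutine), and the stub
frames below are fabricated for illustration:

```python
import asyncio
import json


async def read_ready(recv) -> dict:
    """Consume frames until the gateway.ready event arrives, then return
    its payload. Each frame is one newline-delimited JSON-RPC envelope."""
    while True:
        msg = json.loads(await recv())
        if msg.get("method") == "event" and msg["params"]["type"] == "gateway.ready":
            return msg["params"]["payload"]


async def demo() -> dict:
    # Stub transport: a single canned gateway.ready frame.
    frames = iter([
        json.dumps({
            "jsonrpc": "2.0",
            "method": "event",
            "params": {"type": "gateway.ready", "payload": {"skin": "default"}},
        }),
    ])

    async def recv() -> str:
        return next(frames)

    return await read_ready(recv)


payload = asyncio.run(demo())
```

A real client would keep reading after the handshake, matching responses to
requests by id and routing ``event`` notifications to the transcript.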