mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
feat(web): add /api/pty WebSocket bridge to embed TUI in dashboard
Exposes hermes --tui over a PTY-backed WebSocket so the dashboard can
embed the real TUI rather than reimplement its surface. The browser
attaches xterm.js to the socket; keystrokes flow in, PTY output bytes
flow out.
Architecture:
browser <Terminal> (xterm.js)
│ onData ───► ws.send(keystrokes)
│ onResize ► ws.send('\x1b[RESIZE:cols;rows]')
│ write ◄── ws.onmessage (PTY bytes)
▼
FastAPI /api/pty (token-gated, loopback-only)
▼
PtyBridge (ptyprocess) ── spawns node ui-tui/dist/entry.js ──► tui_gateway + AIAgent
Components
----------
hermes_cli/pty_bridge.py
Thin wrapper around ptyprocess.PtyProcess: byte-safe read/write on the
master fd via os.read/os.write (not PtyProcessUnicode — ANSI is
inherently byte-oriented and UTF-8 boundaries may land mid-read),
non-blocking select-based reads, TIOCSWINSZ resize, idempotent
SIGHUP→SIGTERM→SIGKILL teardown, platform guard (POSIX-only; Windows
is WSL-supported only).
hermes_cli/web_server.py
@app.websocket("/api/pty") endpoint gated by the existing
_SESSION_TOKEN (via ?token= query param since browsers can't set
Authorization on WS upgrades). Loopback-only enforcement. Reader task
uses run_in_executor to pump PTY bytes without blocking the event
loop. Writer loop intercepts a custom \x1b[RESIZE:cols;rows] escape
before forwarding to the PTY. The endpoint resolves the TUI argv
through a _resolve_chat_argv hook so tests can inject fake commands
without building the real TUI.
Tests
-----
tests/hermes_cli/test_pty_bridge.py — 12 unit tests: spawn, stdout,
stdin round-trip, EOF, resize (via TIOCSWINSZ + tput readback), close
idempotency, cwd, env forwarding, unavailable-platform error.
tests/hermes_cli/test_web_server.py — TestPtyWebSocket adds 7 tests:
missing/bad token rejection (close code 4401), stdout streaming,
stdin round-trip, resize escape forwarding, unavailable-platform ANSI
error frame + 1011 close, resume parameter forwarding to argv.
96 tests pass under scripts/run_tests.sh.
(cherry picked from commit 29b337bca7)
feat(web): add Chat tab with xterm.js terminal + Sessions resume button
(cherry picked from commit 3d21aee8 by emozilla, conflicts resolved
against current main: BUILTIN_ROUTES table + plugin slot layout)
fix(tui): replace OSC 52 jargon in /copy confirmation
When the user ran /copy successfully, Ink confirmed with:
sent OSC52 copy sequence (terminal support required)
That reads like a protocol spec to everyone who isn't a terminal
implementer. The caveat was a historical artifact — OSC 52 wasn't
universally supported when this message was written, so the TUI
honestly couldn't guarantee the copy had landed anywhere.
Today every modern terminal (including the dashboard's embedded
xterm.js) handles OSC 52 reliably. Say what the user actually wants
to know — that it copied, and how much — matching the message the
TUI already uses for selection copy:
copied 1482 chars
(cherry picked from commit a0701b1d5a)
docs: document the dashboard Chat tab
AGENTS.md — new subsection under TUI Architecture explaining that the
dashboard embeds the real hermes --tui rather than rewriting it,
with pointers to the pty_bridge + WebSocket endpoint and the rule
'never add a parallel chat surface in React.'
website/docs/user-guide/features/web-dashboard.md — user-facing Chat
section inside the existing Web Dashboard page, covering how it works
(WebSocket + PTY + xterm.js), the Sessions-page resume flow, and
prerequisites (Node.js, ptyprocess, POSIX kernel / WSL on Windows).
(cherry picked from commit 2c2e32cc45)
feat(tui-gateway): transport-aware dispatch + WebSocket sidecar
Decouples the JSON-RPC dispatcher from its I/O sink so the same handler
surface can drive multiple transports concurrently. The PTY chat tab
already speaks to the TUI binary as bytes — this adds a structured
event channel alongside it for dashboard-side React widgets that need
typed events (tool.start/complete, model picker state, slash catalog)
that PTY can't surface.
- `tui_gateway/transport.py` — `Transport` protocol + `contextvars` binding
+ module-level `StdioTransport` fallback. The stdio stream resolves
through a lambda so existing tests that monkey-patch `_real_stdout`
keep passing without modification.
- `tui_gateway/ws.py` — WebSocket transport implementation; FastAPI
endpoint mounting lives in hermes_cli/web_server.py.
- `tui_gateway/server.py`:
- `write_json` routes via session transport (for async events) →
contextvar transport (for in-request writes) → stdio fallback.
- `dispatch(req, transport=None)` binds the transport for the request
lifetime and propagates it to pool workers via `contextvars.copy_context`
so async handlers don't lose their sink.
- `_init_session` and the manual-session create path stash the
request's transport so out-of-band events (subagent.complete, etc.)
fan out to the right peer.
`tui_gateway.entry` (Ink's stdio handshake) is unchanged externally —
it falls through every precedence step into the stdio fallback, byte-
identical to the previous behaviour.
feat(web): ChatSidebar — JSON-RPC sidecar next to xterm.js terminal
Composes the two transports into a single Chat tab:
┌─────────────────────────────────────────┬──────────────┐
│ xterm.js / PTY (emozilla #13379) │ ChatSidebar │
│ the literal hermes --tui process │ /api/ws │
└─────────────────────────────────────────┴──────────────┘
terminal bytes structured events
The terminal pane stays the canonical chat surface — full TUI fidelity,
slash commands, model picker, mouse, skin engine, wide chars all paint
inside the terminal. The sidebar opens a parallel JSON-RPC WebSocket
to the same gateway and renders metadata that PTY can't surface to
React chrome:
• model + provider badge with connection state (click → switch)
• running tool-call list (driven by tool.start / tool.progress /
tool.complete events)
• model picker dialog (gateway-driven, reuses ModelPickerDialog)
The sidecar is best-effort. If the WS can't connect (older gateway,
network hiccup, missing token) the terminal pane keeps working
unimpaired — sidebar just shows the connection-state badge in the
appropriate tone.
- `web/src/components/ChatSidebar.tsx` — new component (~270 lines).
Owns its GatewayClient, drives the model picker through
`slash.exec`, fans tool events into a capped tool list.
- `web/src/pages/ChatPage.tsx` — split layout: terminal pane
(`flex-1`) + sidebar (`w-80`, `lg+` only).
- `hermes_cli/web_server.py` — mount `/api/ws` (token + loopback
guards mirror /api/pty), delegate to `tui_gateway.ws.handle_ws`.
Co-authored-by: emozilla <emozilla@nousresearch.com>
refactor(web): /clean pass on ChatSidebar + ChatPage lint debt
- ChatSidebar: lift gw out of useRef into a useMemo derived from a
reconnect counter. React 19's react-hooks/refs and react-hooks/
set-state-in-effect rules both fire when you touch a ref during
render or call setState from inside a useEffect body. The
counter-derived gw is the canonical pattern for "external resource
that needs to be replaceable on user action" — re-creating the
client comes from bumping `version`, the effect just wires + tears
down. Drops the imperative `gwRef.current = …` reassign in
reconnect, drops the truthy ref guard in JSX. modelLabel +
banner inlined as derived locals (one-off useMemo was overkill).
- ChatPage: lazy-init the banner state from the missing-token check
so the effect body doesn't have to setState on first run. Drops
the unused react-hooks/exhaustive-deps eslint-disable. Adds a
scoped no-control-regex disable on the SGR mouse parser regex
(the \\x1b is intentional for xterm escape sequences).
All my-touched files now lint clean. Remaining warnings on web/
belong to pre-existing files this PR doesn't touch.
Verified: vitest 249/249, ui-tui eslint clean, web tsc clean,
python imports clean.
chore: uptick
fix(web): drop ChatSidebar tool list — events can't cross PTY/WS boundary
The /api/pty endpoint spawns `hermes --tui` as a child process with its
own tui_gateway and _sessions dict; /api/ws runs handle_ws in-process in
the dashboard server with a separate _sessions dict. Tool events fire on
the child's gateway and never reach the WS sidecar, so the sidebar's
tool.start/progress/complete listeners always observed an empty list.
Drop the misleading list (and the now-orphaned ToolCall primitive),
keep model badge + connection state + model picker + error banner —
those work because they're sidecar-local concerns. Surfacing tool calls
in the sidebar requires cross-process forwarding (PTY child opens a
back-WS to the dashboard, gateway tees emits onto stdio + sidecar
transport) — proper feature for a follow-up.
feat(web): wire ChatSidebar tool list to PTY child via /api/pub broadcast
The dashboard's /api/pty spawns hermes --tui as a child process; tool
events fire in the python tui_gateway grandchild and never crossed the
process boundary into the in-process WS sidecar — so the sidebar tool
list was always empty.
Cross-process forwarding:
- tui_gateway: TeeTransport (transport.py) + WsPublisherTransport
(event_publisher.py, sync websockets client). entry.py installs the
tee on _stdio_transport when HERMES_TUI_SIDECAR_URL is set, mirroring
every dispatcher emit to a back-WS without disturbing Ink's stdio
handshake.
- hermes_cli/web_server.py: new /api/pub (publisher) + /api/events
(subscriber) endpoints with a per-channel registry. /api/pty now
accepts ?channel= and propagates the sidecar URL via env. start_server
also stashes app.state.bound_port so the URL is constructable.
- web/src/pages/ChatPage.tsx: generates a channel UUID per mount,
passes it to /api/pty and as a prop to ChatSidebar.
- web/src/components/ChatSidebar.tsx: opens /api/events?channel=, fans
tool.start/progress/complete back into the ToolCall list. Restores
the ToolCall primitive.
Tests: 4 new TestPtyWebSocket cases cover channel propagation,
broadcast fan-out, and missing-channel rejection (10 PTY tests pass,
120 web_server tests overall).
fix(web): address Copilot review on #14890
Five threads, all real:
- gatewayClient.ts: register `message`/`close` listeners BEFORE awaiting
the open handshake. Server emits `gateway.ready` immediately after
accept, so a listener attached after the open promise could race past
the initial skin payload and lose it.
- ChatSidebar.tsx: wire `error`/`close` on the /api/events subscriber
WS into the existing error banner. 4401/4403 (auth/loopback reject)
surface as a "reload the page" message; mid-stream drops surface as
"events feed disconnected" with the existing reconnect button. Clean
unmount closes (1000/1001) stay silent.
- web-dashboard.md: install hint was `pip install hermes-agent[web]` but
ptyprocess lives in the `pty` extra, not `web`. Switch to
`hermes-agent[web,pty]` in both prerequisite blocks.
- AGENTS.md: previous "never add a parallel React chat surface" guidance
was overbroad and contradicted this PR's sidebar. Tightened to forbid
re-implementing the transcript/composer/PTY terminal while explicitly
allowing structured supporting widgets (sidebar / model picker /
inspectors), matching the actual architecture.
- web/package-lock.json: regenerated cleanly so the wterm sibling
workspace paths (extraneous machine-local entries) stop polluting CI.
Tests: 249/249 vitest, 10/10 PTY/events, web tsc clean.
refactor(web): /clean pass on ChatSidebar events handler
Spotted in the round-2 review:
- Banner flashed on clean unmount: `ws.close()` from the effect cleanup
fires `close` with code 1005, opened=true, neither 1000 nor 1001 —
hit the "unexpected drop" branch. Track `unmounting` in the effect
scope and gate the banner through a `surface()` helper so cleanup
closes stay silent.
- DRY the duplicated "events feed disconnected" string into a local
const used by both the error and close handlers.
- Drop the `opened` flag (no longer needed once the unmount guard is
the source of truth for "is this an expected close?").
This commit is contained in:
parent
1143f234e3
commit
f49afd3122
33 changed files with 4259 additions and 612 deletions
|
|
@ -5,7 +5,28 @@ import sys
|
|||
import time
|
||||
import traceback
|
||||
|
||||
from tui_gateway import server
|
||||
from tui_gateway.server import _CRASH_LOG, dispatch, resolve_skin, write_json
|
||||
from tui_gateway.transport import TeeTransport
|
||||
|
||||
|
||||
def _install_sidecar_publisher() -> None:
|
||||
"""Mirror every dispatcher emit to the dashboard sidebar via WS.
|
||||
|
||||
Activated by `HERMES_TUI_SIDECAR_URL`, set by the dashboard's
|
||||
``/api/pty`` endpoint when a chat tab passes a ``channel`` query param.
|
||||
Best-effort: connect failure or runtime drop falls back to stdio-only.
|
||||
"""
|
||||
url = os.environ.get("HERMES_TUI_SIDECAR_URL")
|
||||
|
||||
if not url:
|
||||
return
|
||||
|
||||
from tui_gateway.event_publisher import WsPublisherTransport
|
||||
|
||||
server._stdio_transport = TeeTransport(
|
||||
server._stdio_transport, WsPublisherTransport(url)
|
||||
)
|
||||
|
||||
|
||||
def _log_signal(signum: int, frame) -> None:
|
||||
|
|
@ -82,6 +103,8 @@ def _log_exit(reason: str) -> None:
|
|||
|
||||
|
||||
def main():
|
||||
_install_sidecar_publisher()
|
||||
|
||||
if not write_json({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
|
|
|
|||
81
tui_gateway/event_publisher.py
Normal file
81
tui_gateway/event_publisher.py
Normal file
|
|
@ -0,0 +1,81 @@
|
|||
"""Best-effort WebSocket publisher transport for the PTY-side gateway.
|
||||
|
||||
The dashboard's `/api/pty` spawns `hermes --tui` as a child process, which
|
||||
spawns its own ``tui_gateway.entry``. Tool/reasoning/status events fire on
|
||||
*that* gateway's transport — three processes removed from the dashboard
|
||||
server itself. To surface them in the dashboard sidebar (`/api/events`),
|
||||
the PTY-side gateway opens a back-WS to the dashboard at startup and
|
||||
mirrors every emit through this transport.
|
||||
|
||||
Wire protocol: newline-framed JSON dicts (the same shape the dispatcher
|
||||
already passes to ``write``). No JSON-RPC envelope here — the dashboard's
|
||||
``/api/pub`` endpoint just rebroadcasts the bytes verbatim to subscribers.
|
||||
|
||||
Failure mode: silent. The agent loop must never block waiting for the
|
||||
sidecar to drain. A dead WS short-circuits all subsequent writes.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from typing import Optional
|
||||
|
||||
try:
|
||||
from websockets.sync.client import connect as ws_connect
|
||||
except ImportError: # pragma: no cover - websockets is a required install path
|
||||
ws_connect = None # type: ignore[assignment]
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class WsPublisherTransport:
|
||||
__slots__ = ("_url", "_lock", "_ws", "_dead")
|
||||
|
||||
def __init__(self, url: str, *, connect_timeout: float = 2.0) -> None:
|
||||
self._url = url
|
||||
self._lock = threading.Lock()
|
||||
self._ws: Optional[object] = None
|
||||
self._dead = False
|
||||
|
||||
if ws_connect is None:
|
||||
self._dead = True
|
||||
|
||||
return
|
||||
|
||||
try:
|
||||
self._ws = ws_connect(url, open_timeout=connect_timeout, max_size=None)
|
||||
except Exception as exc:
|
||||
_log.debug("event publisher connect failed: %s", exc)
|
||||
self._dead = True
|
||||
self._ws = None
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
if self._dead or self._ws is None:
|
||||
return False
|
||||
|
||||
try:
|
||||
with self._lock:
|
||||
self._ws.send(json.dumps(obj, ensure_ascii=False)) # type: ignore[union-attr]
|
||||
|
||||
return True
|
||||
except Exception as exc:
|
||||
_log.debug("event publisher write failed: %s", exc)
|
||||
self._dead = True
|
||||
self._ws = None
|
||||
|
||||
return False
|
||||
|
||||
def close(self) -> None:
|
||||
self._dead = True
|
||||
|
||||
if self._ws is None:
|
||||
return
|
||||
|
||||
try:
|
||||
self._ws.close() # type: ignore[union-attr]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
self._ws = None
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
import atexit
|
||||
import concurrent.futures
|
||||
import contextvars
|
||||
import copy
|
||||
import json
|
||||
import logging
|
||||
|
|
@ -12,9 +13,17 @@ import time
|
|||
import uuid
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from hermes_constants import get_hermes_home
|
||||
from hermes_cli.env_loader import load_hermes_dotenv
|
||||
from tui_gateway.transport import (
|
||||
StdioTransport,
|
||||
Transport,
|
||||
bind_transport,
|
||||
current_transport,
|
||||
reset_transport,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -147,6 +156,11 @@ atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True))
|
|||
_real_stdout = sys.stdout
|
||||
sys.stdout = sys.stderr
|
||||
|
||||
# Module-level stdio transport — fallback sink when no transport is bound via
|
||||
# contextvar or session. Stream resolved through a lambda so runtime monkey-
|
||||
# patches of `_real_stdout` (used extensively in tests) still land correctly.
|
||||
_stdio_transport = StdioTransport(lambda: _real_stdout, _stdout_lock)
|
||||
|
||||
|
||||
class _SlashWorker:
|
||||
"""Persistent HermesCLI subprocess for slash commands."""
|
||||
|
|
@ -266,14 +280,24 @@ def _db_unavailable_error(rid, *, code: int):
|
|||
|
||||
|
||||
def write_json(obj: dict) -> bool:
|
||||
line = json.dumps(obj, ensure_ascii=False) + "\n"
|
||||
try:
|
||||
with _stdout_lock:
|
||||
_real_stdout.write(line)
|
||||
_real_stdout.flush()
|
||||
return True
|
||||
except BrokenPipeError:
|
||||
return False
|
||||
"""Emit one JSON frame. Routes via the most-specific transport available.
|
||||
|
||||
Precedence:
|
||||
|
||||
1. Event frames with a session id → the transport stored on that session,
|
||||
so async events land with the client that owns the session even if
|
||||
the emitting thread has no contextvar binding.
|
||||
2. Otherwise the transport bound on the current context (set by
|
||||
:func:`dispatch` for the lifetime of a request).
|
||||
3. Otherwise the module-level stdio transport, matching the historical
|
||||
behaviour and keeping tests that monkey-patch ``_real_stdout`` green.
|
||||
"""
|
||||
if obj.get("method") == "event":
|
||||
sid = ((obj.get("params") or {}).get("session_id")) or ""
|
||||
if sid and (t := (_sessions.get(sid) or {}).get("transport")) is not None:
|
||||
return t.write(obj)
|
||||
|
||||
return (current_transport() or _stdio_transport).write(obj)
|
||||
|
||||
|
||||
def _emit(event: str, sid: str, payload: dict | None = None):
|
||||
|
|
@ -343,27 +367,40 @@ def handle_request(req: dict) -> dict | None:
|
|||
return fn(req.get("id"), req.get("params", {}))
|
||||
|
||||
|
||||
def dispatch(req: dict) -> dict | None:
|
||||
def dispatch(req: dict, transport: Optional[Transport] = None) -> dict | None:
|
||||
"""Route inbound RPCs — long handlers to the pool, everything else inline.
|
||||
|
||||
Returns a response dict when handled inline. Returns None when the
|
||||
handler was scheduled on the pool; the worker writes its own
|
||||
response via write_json when done.
|
||||
handler was scheduled on the pool; the worker writes its own response
|
||||
via the bound transport when done.
|
||||
|
||||
*transport* (optional): pins every write produced by this request —
|
||||
including any events emitted by the handler — to the given transport.
|
||||
Omitting it falls back to the module-level stdio transport, preserving
|
||||
the original behaviour for ``tui_gateway.entry``.
|
||||
"""
|
||||
if req.get("method") not in _LONG_HANDLERS:
|
||||
return handle_request(req)
|
||||
t = transport or _stdio_transport
|
||||
token = bind_transport(t)
|
||||
try:
|
||||
if req.get("method") not in _LONG_HANDLERS:
|
||||
return handle_request(req)
|
||||
|
||||
def run():
|
||||
try:
|
||||
resp = handle_request(req)
|
||||
except Exception as exc:
|
||||
resp = _err(req.get("id"), -32000, f"handler error: {exc}")
|
||||
if resp is not None:
|
||||
write_json(resp)
|
||||
# Snapshot the context so the pool worker sees the bound transport.
|
||||
ctx = contextvars.copy_context()
|
||||
|
||||
_pool.submit(run)
|
||||
def run():
|
||||
try:
|
||||
resp = handle_request(req)
|
||||
except Exception as exc:
|
||||
resp = _err(req.get("id"), -32000, f"handler error: {exc}")
|
||||
if resp is not None:
|
||||
t.write(resp)
|
||||
|
||||
return None
|
||||
_pool.submit(lambda: ctx.run(run))
|
||||
|
||||
return None
|
||||
finally:
|
||||
reset_transport(token)
|
||||
|
||||
|
||||
def _wait_agent(session: dict, rid: str, timeout: float = 30.0) -> dict | None:
|
||||
|
|
@ -1262,6 +1299,9 @@ def _init_session(sid: str, key: str, agent, history: list, cols: int = 80):
|
|||
"tool_progress_mode": _load_tool_progress_mode(),
|
||||
"edit_snapshots": {},
|
||||
"tool_started_at": {},
|
||||
# Pin async event emissions to whichever transport created the
|
||||
# session (stdio for Ink, JSON-RPC WS for the dashboard sidebar).
|
||||
"transport": current_transport() or _stdio_transport,
|
||||
}
|
||||
try:
|
||||
_sessions[sid]["slash_worker"] = _SlashWorker(
|
||||
|
|
@ -1404,6 +1444,7 @@ def _(rid, params: dict) -> dict:
|
|||
"slash_worker": None,
|
||||
"tool_progress_mode": _load_tool_progress_mode(),
|
||||
"tool_started_at": {},
|
||||
"transport": current_transport() or _stdio_transport,
|
||||
}
|
||||
|
||||
def _build() -> None:
|
||||
|
|
|
|||
125
tui_gateway/transport.py
Normal file
125
tui_gateway/transport.py
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
"""Transport abstraction for the tui_gateway JSON-RPC server.
|
||||
|
||||
Historically the gateway wrote every JSON frame directly to real stdout. This
|
||||
module decouples the I/O sink from the handler logic so the same dispatcher
|
||||
can be driven over stdio (``tui_gateway.entry``) or WebSocket
|
||||
(``tui_gateway.ws``) without duplicating code.
|
||||
|
||||
A :class:`Transport` is anything that can accept a JSON-serialisable dict and
|
||||
forward it to its peer. The active transport for the current request is
|
||||
tracked in a :class:`contextvars.ContextVar` so handlers — including those
|
||||
dispatched onto the worker pool — route their writes to the right peer.
|
||||
|
||||
Backward compatibility
|
||||
----------------------
|
||||
``tui_gateway.server.write_json`` still works without any transport bound.
|
||||
When nothing is on the contextvar and no session-level transport is found,
|
||||
it falls back to the module-level :class:`StdioTransport`, which wraps the
|
||||
original ``_real_stdout`` + ``_stdout_lock`` pair. Tests that monkey-patch
|
||||
``server._real_stdout`` continue to work because the stdio transport resolves
|
||||
the stream lazily through a callback.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import contextvars
|
||||
import json
|
||||
import threading
|
||||
from typing import Any, Callable, Optional, Protocol, runtime_checkable
|
||||
|
||||
|
||||
@runtime_checkable
|
||||
class Transport(Protocol):
|
||||
"""Minimal interface every transport implements."""
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
"""Emit one JSON frame. Return ``False`` when the peer is gone."""
|
||||
|
||||
def close(self) -> None:
|
||||
"""Release any resources owned by this transport."""
|
||||
|
||||
|
||||
_current_transport: contextvars.ContextVar[Optional[Transport]] = (
|
||||
contextvars.ContextVar(
|
||||
"hermes_gateway_transport",
|
||||
default=None,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def current_transport() -> Optional[Transport]:
|
||||
"""Return the transport bound for the current request, if any."""
|
||||
return _current_transport.get()
|
||||
|
||||
|
||||
def bind_transport(transport: Optional[Transport]):
|
||||
"""Bind *transport* for the current context. Returns a token for :func:`reset_transport`."""
|
||||
return _current_transport.set(transport)
|
||||
|
||||
|
||||
def reset_transport(token) -> None:
|
||||
"""Restore the transport binding captured by :func:`bind_transport`."""
|
||||
_current_transport.reset(token)
|
||||
|
||||
|
||||
class StdioTransport:
|
||||
"""Writes JSON frames to a stream (usually ``sys.stdout``).
|
||||
|
||||
The stream is resolved via a callable so runtime monkey-patches of the
|
||||
underlying stream continue to work — this preserves the behaviour the
|
||||
existing test suite relies on (``monkeypatch.setattr(server, "_real_stdout", ...)``).
|
||||
"""
|
||||
|
||||
__slots__ = ("_stream_getter", "_lock")
|
||||
|
||||
def __init__(self, stream_getter: Callable[[], Any], lock: threading.Lock) -> None:
|
||||
self._stream_getter = stream_getter
|
||||
self._lock = lock
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
line = json.dumps(obj, ensure_ascii=False) + "\n"
|
||||
try:
|
||||
with self._lock:
|
||||
stream = self._stream_getter()
|
||||
stream.write(line)
|
||||
stream.flush()
|
||||
return True
|
||||
except BrokenPipeError:
|
||||
return False
|
||||
|
||||
def close(self) -> None:
|
||||
return None
|
||||
|
||||
|
||||
class TeeTransport:
|
||||
"""Mirrors writes to one primary plus N best-effort secondaries.
|
||||
|
||||
The primary's return value (and exceptions) determine the result —
|
||||
secondaries swallow failures so a wedged sidecar never stalls the
|
||||
main IO path. Used by the PTY child so every dispatcher emit lands
|
||||
on stdio (Ink) AND on a back-WS feeding the dashboard sidebar.
|
||||
"""
|
||||
|
||||
__slots__ = ("_primary", "_secondaries")
|
||||
|
||||
def __init__(self, primary: "Transport", *secondaries: "Transport") -> None:
|
||||
self._primary = primary
|
||||
self._secondaries = secondaries
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
for sec in self._secondaries:
|
||||
try:
|
||||
sec.write(obj)
|
||||
except Exception:
|
||||
pass
|
||||
return self._primary.write(obj)
|
||||
|
||||
def close(self) -> None:
|
||||
try:
|
||||
self._primary.close()
|
||||
finally:
|
||||
for sec in self._secondaries:
|
||||
try:
|
||||
sec.close()
|
||||
except Exception:
|
||||
pass
|
||||
174
tui_gateway/ws.py
Normal file
174
tui_gateway/ws.py
Normal file
|
|
@ -0,0 +1,174 @@
|
|||
"""WebSocket transport for the tui_gateway JSON-RPC server.
|
||||
|
||||
Reuses :func:`tui_gateway.server.dispatch` verbatim so every RPC method, every
|
||||
slash command, every approval/clarify/sudo flow, and every agent event flows
|
||||
through the same handlers whether the client is Ink over stdio or an iOS /
|
||||
web client over WebSocket.
|
||||
|
||||
Wire protocol
|
||||
-------------
|
||||
Identical to stdio: newline-delimited JSON-RPC in both directions. The server
|
||||
emits a ``gateway.ready`` event immediately after connection accept, then
|
||||
echoes responses/events for inbound requests. No framing differences.
|
||||
|
||||
Mounting
|
||||
--------
|
||||
from fastapi import WebSocket
|
||||
from tui_gateway.ws import handle_ws
|
||||
|
||||
@app.websocket("/api/ws")
|
||||
async def ws(ws: WebSocket):
|
||||
await handle_ws(ws)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from tui_gateway import server
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
# Max seconds a pool-dispatched handler will block waiting for the event loop
|
||||
# to flush a WS frame before we mark the transport dead. Protects handler
|
||||
# threads from a wedged socket.
|
||||
_WS_WRITE_TIMEOUT_S = 10.0
|
||||
|
||||
# Keep starlette optional at import time; handle_ws uses the real class when
|
||||
# it's available and falls back to a generic Exception sentinel otherwise.
|
||||
try:
|
||||
from starlette.websockets import WebSocketDisconnect as _WebSocketDisconnect
|
||||
except ImportError: # pragma: no cover - starlette is a required install path
|
||||
_WebSocketDisconnect = Exception # type: ignore[assignment]
|
||||
|
||||
|
||||
class WSTransport:
|
||||
"""Per-connection WS transport.
|
||||
|
||||
``write`` is safe to call from any thread *other than* the event loop
|
||||
thread that owns the socket. Pool workers (the only real caller) run in
|
||||
their own threads, so marshalling onto the loop via
|
||||
:func:`asyncio.run_coroutine_threadsafe` + ``future.result()`` is correct
|
||||
and deadlock-free there.
|
||||
|
||||
When called from the loop thread itself (e.g. by ``handle_ws`` for an
|
||||
inline response) the same call would deadlock: we'd schedule work onto
|
||||
the loop we're currently blocking. We detect that case and fire-and-
|
||||
forget instead. Callers that need to know when the bytes are on the wire
|
||||
should use :meth:`write_async` from the loop thread.
|
||||
"""
|
||||
|
||||
def __init__(self, ws: Any, loop: asyncio.AbstractEventLoop) -> None:
|
||||
self._ws = ws
|
||||
self._loop = loop
|
||||
self._closed = False
|
||||
|
||||
def write(self, obj: dict) -> bool:
|
||||
if self._closed:
|
||||
return False
|
||||
|
||||
line = json.dumps(obj, ensure_ascii=False)
|
||||
|
||||
try:
|
||||
on_loop = asyncio.get_running_loop() is self._loop
|
||||
except RuntimeError:
|
||||
on_loop = False
|
||||
|
||||
if on_loop:
|
||||
# Fire-and-forget — don't block the loop waiting on itself.
|
||||
self._loop.create_task(self._safe_send(line))
|
||||
return True
|
||||
|
||||
try:
|
||||
fut = asyncio.run_coroutine_threadsafe(self._safe_send(line), self._loop)
|
||||
fut.result(timeout=_WS_WRITE_TIMEOUT_S)
|
||||
return not self._closed
|
||||
except Exception as exc:
|
||||
self._closed = True
|
||||
_log.debug("ws write failed: %s", exc)
|
||||
return False
|
||||
|
||||
async def write_async(self, obj: dict) -> bool:
|
||||
"""Send from the owning event loop. Awaits until the frame is on the wire."""
|
||||
if self._closed:
|
||||
return False
|
||||
await self._safe_send(json.dumps(obj, ensure_ascii=False))
|
||||
return not self._closed
|
||||
|
||||
async def _safe_send(self, line: str) -> None:
|
||||
try:
|
||||
await self._ws.send_text(line)
|
||||
except Exception as exc:
|
||||
self._closed = True
|
||||
_log.debug("ws send failed: %s", exc)
|
||||
|
||||
def close(self) -> None:
|
||||
self._closed = True
|
||||
|
||||
|
||||
async def handle_ws(ws: Any) -> None:
|
||||
"""Run one WebSocket session. Wire-compatible with ``tui_gateway.entry``."""
|
||||
await ws.accept()
|
||||
|
||||
transport = WSTransport(ws, asyncio.get_running_loop())
|
||||
|
||||
await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"method": "event",
|
||||
"params": {
|
||||
"type": "gateway.ready",
|
||||
"payload": {"skin": server.resolve_skin()},
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
raw = await ws.receive_text()
|
||||
except _WebSocketDisconnect:
|
||||
break
|
||||
|
||||
line = raw.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
try:
|
||||
req = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
ok = await transport.write_async(
|
||||
{
|
||||
"jsonrpc": "2.0",
|
||||
"error": {"code": -32700, "message": "parse error"},
|
||||
"id": None,
|
||||
}
|
||||
)
|
||||
if not ok:
|
||||
break
|
||||
continue
|
||||
|
||||
# dispatch() may schedule long handlers on the pool; it returns
|
||||
# None in that case and the worker writes the response itself via
|
||||
# the transport we pass in (a separate thread, so transport.write
|
||||
# is the safe path there). For inline handlers it returns the
|
||||
# response dict, which we write here from the loop.
|
||||
resp = await asyncio.to_thread(server.dispatch, req, transport)
|
||||
if resp is not None and not await transport.write_async(resp):
|
||||
break
|
||||
finally:
|
||||
transport.close()
|
||||
|
||||
# Detach the transport from any sessions it owned so later emits
|
||||
# fall back to stdio instead of crashing into a closed socket.
|
||||
for _, sess in list(server._sessions.items()):
|
||||
if sess.get("transport") is transport:
|
||||
sess["transport"] = server._stdio_transport
|
||||
|
||||
try:
|
||||
await ws.close()
|
||||
except Exception:
|
||||
pass
|
||||
Loading…
Add table
Add a link
Reference in a new issue