mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
feat(web): add /api/pty WebSocket bridge to embed TUI in dashboard
Exposes hermes --tui over a PTY-backed WebSocket so the dashboard can
embed the real TUI rather than reimplement its surface. The browser
attaches xterm.js to the socket; keystrokes flow in, PTY output bytes
flow out.
Architecture:
browser <Terminal> (xterm.js)
│ onData ───► ws.send(keystrokes)
│ onResize ► ws.send('\x1b[RESIZE:cols;rows]')
│ write ◄── ws.onmessage (PTY bytes)
▼
FastAPI /api/pty (token-gated, loopback-only)
▼
PtyBridge (ptyprocess) ── spawns node ui-tui/dist/entry.js ──► tui_gateway + AIAgent
Components
----------
hermes_cli/pty_bridge.py
Thin wrapper around ptyprocess.PtyProcess: byte-safe read/write on the
master fd via os.read/os.write (not PtyProcessUnicode — ANSI is
inherently byte-oriented and UTF-8 boundaries may land mid-read),
non-blocking select-based reads, TIOCSWINSZ resize, idempotent
SIGHUP→SIGTERM→SIGKILL teardown, platform guard (POSIX-only; Windows
is WSL-supported only).
hermes_cli/web_server.py
@app.websocket("/api/pty") endpoint gated by the existing
_SESSION_TOKEN (via ?token= query param since browsers can't set
Authorization on WS upgrades). Loopback-only enforcement. Reader task
uses run_in_executor to pump PTY bytes without blocking the event
loop. Writer loop intercepts a custom \x1b[RESIZE:cols;rows] escape
before forwarding to the PTY. The endpoint resolves the TUI argv
through a _resolve_chat_argv hook so tests can inject fake commands
without building the real TUI.
Tests
-----
tests/hermes_cli/test_pty_bridge.py — 12 unit tests: spawn, stdout,
stdin round-trip, EOF, resize (via TIOCSWINSZ + tput readback), close
idempotency, cwd, env forwarding, unavailable-platform error.
tests/hermes_cli/test_web_server.py — TestPtyWebSocket adds 7 tests:
missing/bad token rejection (close code 4401), stdout streaming,
stdin round-trip, resize escape forwarding, unavailable-platform ANSI
error frame + 1011 close, resume parameter forwarding to argv.
96 tests pass under scripts/run_tests.sh.
This commit is contained in:
parent
62cbeb6367
commit
29b337bca7
4 changed files with 719 additions and 1 deletions
221
hermes_cli/pty_bridge.py
Normal file
221
hermes_cli/pty_bridge.py
Normal file
|
|
@ -0,0 +1,221 @@
|
||||||
|
"""PTY bridge for `hermes dashboard` chat tab.
|
||||||
|
|
||||||
|
Wraps a child process behind a pseudo-terminal so its ANSI output can be
|
||||||
|
streamed to a browser-side terminal emulator (xterm.js) and typed
|
||||||
|
keystrokes can be fed back in. The only caller today is the
|
||||||
|
``/api/pty`` WebSocket endpoint in ``hermes_cli.web_server``.
|
||||||
|
|
||||||
|
Design constraints:
|
||||||
|
|
||||||
|
* **POSIX-only.** Hermes Agent supports Windows exclusively via WSL, which
|
||||||
|
exposes a native POSIX PTY via ``openpty(3)``. Native Windows Python
|
||||||
|
has no PTY; :class:`PtyUnavailableError` is raised with a user-readable
|
||||||
|
install/platform message so the dashboard can render a banner instead of
|
||||||
|
crashing.
|
||||||
|
* **Zero Node dependency on the server side.** We use :mod:`ptyprocess`,
|
||||||
|
which is a pure-Python wrapper around the OS calls. The browser talks
|
||||||
|
to the same ``hermes --tui`` binary it would launch from the CLI, so
|
||||||
|
every TUI feature (slash popover, model picker, tool rows, markdown,
|
||||||
|
skin engine, clarify/sudo/approval prompts) ships automatically.
|
||||||
|
* **Byte-safe I/O.** Reads and writes go through the PTY master fd
|
||||||
|
directly — we avoid :class:`ptyprocess.PtyProcessUnicode` because
|
||||||
|
streaming ANSI is inherently byte-oriented and UTF-8 boundaries may land
|
||||||
|
mid-read.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import fcntl
|
||||||
|
import os
|
||||||
|
import select
|
||||||
|
import signal
|
||||||
|
import struct
|
||||||
|
import sys
|
||||||
|
import termios
|
||||||
|
import time
|
||||||
|
from typing import Optional, Sequence
|
||||||
|
|
||||||
|
try:
|
||||||
|
import ptyprocess # type: ignore
|
||||||
|
_PTY_AVAILABLE = not sys.platform.startswith("win")
|
||||||
|
except ImportError: # pragma: no cover - dev env without ptyprocess
|
||||||
|
ptyprocess = None # type: ignore
|
||||||
|
_PTY_AVAILABLE = False
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = ["PtyBridge", "PtyUnavailableError"]
|
||||||
|
|
||||||
|
|
||||||
|
class PtyUnavailableError(RuntimeError):
|
||||||
|
"""Raised when a PTY cannot be created on this platform.
|
||||||
|
|
||||||
|
Today this means native Windows (no ConPTY bindings) or a dev
|
||||||
|
environment missing the ``ptyprocess`` dependency. The dashboard
|
||||||
|
surfaces the message to the user as a chat-tab banner.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class PtyBridge:
|
||||||
|
"""Thin wrapper around ``ptyprocess.PtyProcess`` for byte streaming.
|
||||||
|
|
||||||
|
Not thread-safe. A single bridge is owned by the WebSocket handler
|
||||||
|
that spawned it; the reader runs in an executor thread while writes
|
||||||
|
happen on the event-loop thread. Both sides are OK because the
|
||||||
|
kernel PTY is the actual synchronization point — we never call
|
||||||
|
:mod:`ptyprocess` methods concurrently, we only call ``os.read`` and
|
||||||
|
``os.write`` on the master fd, which is safe.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, proc: "ptyprocess.PtyProcess"): # type: ignore[name-defined]
|
||||||
|
self._proc = proc
|
||||||
|
self._fd: int = proc.fd
|
||||||
|
self._closed = False
|
||||||
|
|
||||||
|
# -- lifecycle --------------------------------------------------------
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def is_available(cls) -> bool:
|
||||||
|
"""True if a PTY can be spawned on this platform."""
|
||||||
|
return bool(_PTY_AVAILABLE)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def spawn(
|
||||||
|
cls,
|
||||||
|
argv: Sequence[str],
|
||||||
|
*,
|
||||||
|
cwd: Optional[str] = None,
|
||||||
|
env: Optional[dict] = None,
|
||||||
|
cols: int = 80,
|
||||||
|
rows: int = 24,
|
||||||
|
) -> "PtyBridge":
|
||||||
|
"""Spawn ``argv`` behind a new PTY and return a bridge.
|
||||||
|
|
||||||
|
Raises :class:`PtyUnavailableError` if the platform can't host a
|
||||||
|
PTY. Raises :class:`FileNotFoundError` or :class:`OSError` for
|
||||||
|
ordinary exec failures (missing binary, bad cwd, etc.).
|
||||||
|
"""
|
||||||
|
if not _PTY_AVAILABLE:
|
||||||
|
raise PtyUnavailableError(
|
||||||
|
"Pseudo-terminals are unavailable on this platform. "
|
||||||
|
"Hermes Agent supports Windows only via WSL."
|
||||||
|
)
|
||||||
|
# Let caller-supplied env fully override inheritance; if they pass
|
||||||
|
# None we inherit the server's env (same semantics as subprocess).
|
||||||
|
spawn_env = os.environ.copy() if env is None else env
|
||||||
|
proc = ptyprocess.PtyProcess.spawn( # type: ignore[union-attr]
|
||||||
|
list(argv),
|
||||||
|
cwd=cwd,
|
||||||
|
env=spawn_env,
|
||||||
|
dimensions=(rows, cols),
|
||||||
|
)
|
||||||
|
return cls(proc)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pid(self) -> int:
|
||||||
|
return int(self._proc.pid)
|
||||||
|
|
||||||
|
def is_alive(self) -> bool:
|
||||||
|
if self._closed:
|
||||||
|
return False
|
||||||
|
try:
|
||||||
|
return bool(self._proc.isalive())
|
||||||
|
except Exception:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# -- I/O --------------------------------------------------------------
|
||||||
|
|
||||||
|
def read(self, timeout: float = 0.2) -> Optional[bytes]:
|
||||||
|
"""Read up to 64 KiB of raw bytes from the PTY master.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
* bytes — zero or more bytes of child output
|
||||||
|
* empty bytes (``b""``) — no data available within ``timeout``
|
||||||
|
* None — child has exited and the master fd is at EOF
|
||||||
|
|
||||||
|
Never blocks longer than ``timeout`` seconds. Safe to call after
|
||||||
|
:meth:`close`; returns ``None`` in that case.
|
||||||
|
"""
|
||||||
|
if self._closed:
|
||||||
|
return None
|
||||||
|
try:
|
||||||
|
readable, _, _ = select.select([self._fd], [], [], timeout)
|
||||||
|
except (OSError, ValueError):
|
||||||
|
return None
|
||||||
|
if not readable:
|
||||||
|
return b""
|
||||||
|
try:
|
||||||
|
data = os.read(self._fd, 65536)
|
||||||
|
except OSError as exc:
|
||||||
|
# EIO on Linux = slave side closed. EBADF = already closed.
|
||||||
|
if exc.errno in (errno.EIO, errno.EBADF):
|
||||||
|
return None
|
||||||
|
raise
|
||||||
|
if not data:
|
||||||
|
return None
|
||||||
|
return data
|
||||||
|
|
||||||
|
def write(self, data: bytes) -> None:
|
||||||
|
"""Write raw bytes to the PTY master (i.e. the child's stdin)."""
|
||||||
|
if self._closed or not data:
|
||||||
|
return
|
||||||
|
# os.write can return a short write under load; loop until drained.
|
||||||
|
view = memoryview(data)
|
||||||
|
while view:
|
||||||
|
try:
|
||||||
|
n = os.write(self._fd, view)
|
||||||
|
except OSError as exc:
|
||||||
|
if exc.errno in (errno.EIO, errno.EBADF, errno.EPIPE):
|
||||||
|
return
|
||||||
|
raise
|
||||||
|
if n <= 0:
|
||||||
|
return
|
||||||
|
view = view[n:]
|
||||||
|
|
||||||
|
def resize(self, cols: int, rows: int) -> None:
|
||||||
|
"""Forward a terminal resize to the child via ``TIOCSWINSZ``."""
|
||||||
|
if self._closed:
|
||||||
|
return
|
||||||
|
# struct winsize: rows, cols, xpixel, ypixel (all unsigned short)
|
||||||
|
winsize = struct.pack("HHHH", max(1, rows), max(1, cols), 0, 0)
|
||||||
|
try:
|
||||||
|
fcntl.ioctl(self._fd, termios.TIOCSWINSZ, winsize)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# -- teardown ---------------------------------------------------------
|
||||||
|
|
||||||
|
def close(self) -> None:
|
||||||
|
"""Terminate the child (SIGTERM → 0.5s grace → SIGKILL) and close fds.
|
||||||
|
|
||||||
|
Idempotent. Reaping the child is important so we don't leak
|
||||||
|
zombies across the lifetime of the dashboard process.
|
||||||
|
"""
|
||||||
|
if self._closed:
|
||||||
|
return
|
||||||
|
self._closed = True
|
||||||
|
|
||||||
|
# SIGHUP is the conventional "your terminal went away" signal.
|
||||||
|
# We escalate if the child ignores it.
|
||||||
|
for sig in (signal.SIGHUP, signal.SIGTERM, signal.SIGKILL):
|
||||||
|
if not self._proc.isalive():
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
self._proc.kill(sig)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
deadline = time.monotonic() + 0.5
|
||||||
|
while self._proc.isalive() and time.monotonic() < deadline:
|
||||||
|
time.sleep(0.02)
|
||||||
|
|
||||||
|
try:
|
||||||
|
self._proc.close(force=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Context-manager sugar — handy in tests and ad-hoc scripts.
|
||||||
|
def __enter__(self) -> "PtyBridge":
|
||||||
|
return self
|
||||||
|
|
||||||
|
def __exit__(self, *_exc) -> None:
|
||||||
|
self.close()
|
||||||
|
|
@ -48,7 +48,7 @@ from hermes_cli.config import (
|
||||||
from gateway.status import get_running_pid, read_runtime_status
|
from gateway.status import get_running_pid, read_runtime_status
|
||||||
|
|
||||||
try:
|
try:
|
||||||
from fastapi import FastAPI, HTTPException, Request
|
from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect
|
||||||
from fastapi.middleware.cors import CORSMiddleware
|
from fastapi.middleware.cors import CORSMiddleware
|
||||||
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
|
||||||
from fastapi.staticfiles import StaticFiles
|
from fastapi.staticfiles import StaticFiles
|
||||||
|
|
@ -2021,6 +2021,148 @@ async def get_usage_analytics(days: int = 30):
|
||||||
db.close()
|
db.close()
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# /api/pty — PTY-over-WebSocket bridge for the dashboard "Chat" tab.
|
||||||
|
#
|
||||||
|
# The endpoint spawns the same ``hermes --tui`` binary the CLI uses, behind
|
||||||
|
# a POSIX pseudo-terminal, and forwards bytes + resize escapes across a
|
||||||
|
# WebSocket. The browser renders the ANSI through xterm.js (see
|
||||||
|
# web/src/pages/ChatPage.tsx).
|
||||||
|
#
|
||||||
|
# Auth: ``?token=<session_token>`` query param (browsers can't set
|
||||||
|
# Authorization on the WS upgrade). Same ephemeral ``_SESSION_TOKEN`` as
|
||||||
|
# REST. Localhost-only — we defensively reject non-loopback clients even
|
||||||
|
# though uvicorn binds to 127.0.0.1.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
import re
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from hermes_cli.pty_bridge import PtyBridge, PtyUnavailableError
|
||||||
|
|
||||||
|
_RESIZE_RE = re.compile(rb"\x1b\[RESIZE:(\d+);(\d+)\]")
|
||||||
|
_PTY_READ_CHUNK_TIMEOUT = 0.2
|
||||||
|
# Starlette's TestClient reports the peer as "testclient"; treat it as
|
||||||
|
# loopback so tests don't need to rewrite request scope.
|
||||||
|
_LOOPBACK_HOSTS = frozenset({"127.0.0.1", "::1", "localhost", "testclient"})
|
||||||
|
|
||||||
|
|
||||||
|
def _resolve_chat_argv(
|
||||||
|
resume: Optional[str] = None,
|
||||||
|
) -> tuple[list[str], Optional[str], Optional[dict]]:
|
||||||
|
"""Resolve the argv + cwd + env for the chat PTY.
|
||||||
|
|
||||||
|
Default: whatever ``hermes --tui`` would run. Tests monkeypatch this
|
||||||
|
function to inject a tiny fake command (``cat``, ``sh -c 'printf …'``)
|
||||||
|
so nothing has to build Node or the TUI bundle.
|
||||||
|
|
||||||
|
Session resume is propagated via the ``HERMES_TUI_RESUME`` env var —
|
||||||
|
matching what ``hermes_cli.main._launch_tui`` does for the CLI path.
|
||||||
|
Appending ``--resume <id>`` to argv doesn't work because ``ui-tui`` does
|
||||||
|
not parse its argv.
|
||||||
|
"""
|
||||||
|
from hermes_cli.main import PROJECT_ROOT, _make_tui_argv
|
||||||
|
|
||||||
|
argv, cwd = _make_tui_argv(PROJECT_ROOT / "ui-tui", tui_dev=False)
|
||||||
|
env: Optional[dict] = None
|
||||||
|
if resume:
|
||||||
|
env = os.environ.copy()
|
||||||
|
env["HERMES_TUI_RESUME"] = resume
|
||||||
|
return list(argv), str(cwd) if cwd else None, env
|
||||||
|
|
||||||
|
|
||||||
|
@app.websocket("/api/pty")
|
||||||
|
async def pty_ws(ws: WebSocket) -> None:
|
||||||
|
# --- auth + loopback check (before accept so we can close cleanly) ---
|
||||||
|
token = ws.query_params.get("token", "")
|
||||||
|
expected = _SESSION_TOKEN
|
||||||
|
if not hmac.compare_digest(token.encode(), expected.encode()):
|
||||||
|
await ws.close(code=4401)
|
||||||
|
return
|
||||||
|
|
||||||
|
client_host = ws.client.host if ws.client else ""
|
||||||
|
if client_host and client_host not in _LOOPBACK_HOSTS:
|
||||||
|
await ws.close(code=4403)
|
||||||
|
return
|
||||||
|
|
||||||
|
await ws.accept()
|
||||||
|
|
||||||
|
# --- spawn PTY ------------------------------------------------------
|
||||||
|
resume = ws.query_params.get("resume") or None
|
||||||
|
try:
|
||||||
|
argv, cwd, env = _resolve_chat_argv(resume=resume)
|
||||||
|
except SystemExit as exc:
|
||||||
|
# _make_tui_argv calls sys.exit(1) when node/npm is missing.
|
||||||
|
await ws.send_text(f"\r\n\x1b[31mChat unavailable: {exc}\x1b[0m\r\n")
|
||||||
|
await ws.close(code=1011)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
bridge = PtyBridge.spawn(argv, cwd=cwd, env=env)
|
||||||
|
except PtyUnavailableError as exc:
|
||||||
|
await ws.send_text(f"\r\n\x1b[31mChat unavailable: {exc}\x1b[0m\r\n")
|
||||||
|
await ws.close(code=1011)
|
||||||
|
return
|
||||||
|
except (FileNotFoundError, OSError) as exc:
|
||||||
|
await ws.send_text(f"\r\n\x1b[31mChat failed to start: {exc}\x1b[0m\r\n")
|
||||||
|
await ws.close(code=1011)
|
||||||
|
return
|
||||||
|
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
|
||||||
|
# --- reader task: PTY master → WebSocket ----------------------------
|
||||||
|
async def pump_pty_to_ws() -> None:
|
||||||
|
while True:
|
||||||
|
chunk = await loop.run_in_executor(
|
||||||
|
None, bridge.read, _PTY_READ_CHUNK_TIMEOUT
|
||||||
|
)
|
||||||
|
if chunk is None: # EOF
|
||||||
|
return
|
||||||
|
if not chunk: # no data this tick; yield control and retry
|
||||||
|
await asyncio.sleep(0)
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
await ws.send_bytes(chunk)
|
||||||
|
except Exception:
|
||||||
|
return
|
||||||
|
|
||||||
|
reader_task = asyncio.create_task(pump_pty_to_ws())
|
||||||
|
|
||||||
|
# --- writer loop: WebSocket → PTY master ----------------------------
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
msg = await ws.receive()
|
||||||
|
msg_type = msg.get("type")
|
||||||
|
if msg_type == "websocket.disconnect":
|
||||||
|
break
|
||||||
|
raw = msg.get("bytes")
|
||||||
|
if raw is None:
|
||||||
|
text = msg.get("text")
|
||||||
|
raw = text.encode("utf-8") if isinstance(text, str) else b""
|
||||||
|
if not raw:
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Resize escape is consumed locally, never written to the PTY.
|
||||||
|
match = _RESIZE_RE.match(raw)
|
||||||
|
if match and match.end() == len(raw):
|
||||||
|
cols = int(match.group(1))
|
||||||
|
rows = int(match.group(2))
|
||||||
|
bridge.resize(cols=cols, rows=rows)
|
||||||
|
continue
|
||||||
|
|
||||||
|
bridge.write(raw)
|
||||||
|
except WebSocketDisconnect:
|
||||||
|
pass
|
||||||
|
finally:
|
||||||
|
reader_task.cancel()
|
||||||
|
try:
|
||||||
|
await reader_task
|
||||||
|
except (asyncio.CancelledError, Exception):
|
||||||
|
pass
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
|
||||||
def mount_spa(application: FastAPI):
|
def mount_spa(application: FastAPI):
|
||||||
"""Mount the built SPA. Falls back to index.html for client-side routing.
|
"""Mount the built SPA. Falls back to index.html for client-side routing.
|
||||||
|
|
||||||
|
|
|
||||||
172
tests/hermes_cli/test_pty_bridge.py
Normal file
172
tests/hermes_cli/test_pty_bridge.py
Normal file
|
|
@ -0,0 +1,172 @@
|
||||||
|
"""Unit tests for hermes_cli.pty_bridge — PTY spawning + byte forwarding.
|
||||||
|
|
||||||
|
These tests drive the bridge with minimal POSIX processes (echo, env, sleep,
|
||||||
|
printf) to verify it behaves like a PTY you can read/write/resize/close.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
pytest.importorskip("ptyprocess", reason="ptyprocess not installed")
|
||||||
|
|
||||||
|
from hermes_cli.pty_bridge import PtyBridge, PtyUnavailableError
|
||||||
|
|
||||||
|
|
||||||
|
skip_on_windows = pytest.mark.skipif(
|
||||||
|
sys.platform.startswith("win"), reason="PTY bridge is POSIX-only"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _read_until(bridge: PtyBridge, needle: bytes, timeout: float = 5.0) -> bytes:
|
||||||
|
"""Accumulate PTY output until we see `needle` or time out."""
|
||||||
|
deadline = time.monotonic() + timeout
|
||||||
|
buf = bytearray()
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
chunk = bridge.read(timeout=0.2)
|
||||||
|
if chunk is None:
|
||||||
|
break
|
||||||
|
buf.extend(chunk)
|
||||||
|
if needle in buf:
|
||||||
|
return bytes(buf)
|
||||||
|
return bytes(buf)
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyBridgeSpawn:
|
||||||
|
def test_is_available_on_posix(self):
|
||||||
|
assert PtyBridge.is_available() is True
|
||||||
|
|
||||||
|
def test_spawn_returns_bridge_with_pid(self):
|
||||||
|
bridge = PtyBridge.spawn(["true"])
|
||||||
|
try:
|
||||||
|
assert bridge.pid > 0
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
def test_spawn_raises_on_missing_argv0(self, tmp_path):
|
||||||
|
with pytest.raises((FileNotFoundError, OSError)):
|
||||||
|
PtyBridge.spawn([str(tmp_path / "definitely-not-a-real-binary")])
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyBridgeIO:
|
||||||
|
def test_reads_child_stdout(self):
|
||||||
|
bridge = PtyBridge.spawn(["/bin/sh", "-c", "printf hermes-ok"])
|
||||||
|
try:
|
||||||
|
output = _read_until(bridge, b"hermes-ok")
|
||||||
|
assert b"hermes-ok" in output
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
def test_write_sends_to_child_stdin(self):
|
||||||
|
# `cat` with no args echoes stdin back to stdout. We write a line,
|
||||||
|
# read it back, then signal EOF to let cat exit cleanly.
|
||||||
|
bridge = PtyBridge.spawn(["/bin/cat"])
|
||||||
|
try:
|
||||||
|
bridge.write(b"hello-pty\n")
|
||||||
|
output = _read_until(bridge, b"hello-pty")
|
||||||
|
assert b"hello-pty" in output
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
def test_read_returns_none_after_child_exits(self):
|
||||||
|
bridge = PtyBridge.spawn(["/bin/sh", "-c", "printf done"])
|
||||||
|
try:
|
||||||
|
_read_until(bridge, b"done")
|
||||||
|
# Give the child a beat to exit cleanly, then drain until EOF.
|
||||||
|
deadline = time.monotonic() + 3.0
|
||||||
|
while bridge.is_alive() and time.monotonic() < deadline:
|
||||||
|
bridge.read(timeout=0.1)
|
||||||
|
# Next reads after exit should return None (EOF), not raise.
|
||||||
|
got_none = False
|
||||||
|
for _ in range(10):
|
||||||
|
if bridge.read(timeout=0.1) is None:
|
||||||
|
got_none = True
|
||||||
|
break
|
||||||
|
assert got_none, "PtyBridge.read did not return None after child EOF"
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyBridgeResize:
|
||||||
|
def test_resize_updates_child_winsize(self):
|
||||||
|
# tput reads COLUMNS/LINES from the TTY ioctl (TIOCGWINSZ).
|
||||||
|
# Spawn a shell, resize, then ask tput for the dimensions.
|
||||||
|
bridge = PtyBridge.spawn(
|
||||||
|
["/bin/sh", "-c", "sleep 0.1; tput cols; tput lines"],
|
||||||
|
cols=80,
|
||||||
|
rows=24,
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
bridge.resize(cols=123, rows=45)
|
||||||
|
output = _read_until(bridge, b"45", timeout=5.0)
|
||||||
|
# tput prints just the numbers, one per line
|
||||||
|
assert b"123" in output
|
||||||
|
assert b"45" in output
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyBridgeClose:
|
||||||
|
def test_close_is_idempotent(self):
|
||||||
|
bridge = PtyBridge.spawn(["/bin/sh", "-c", "sleep 30"])
|
||||||
|
bridge.close()
|
||||||
|
bridge.close() # must not raise
|
||||||
|
assert not bridge.is_alive()
|
||||||
|
|
||||||
|
def test_close_terminates_long_running_child(self):
|
||||||
|
bridge = PtyBridge.spawn(["/bin/sh", "-c", "sleep 30"])
|
||||||
|
pid = bridge.pid
|
||||||
|
bridge.close()
|
||||||
|
# Give the kernel a moment to reap
|
||||||
|
deadline = time.monotonic() + 3.0
|
||||||
|
reaped = False
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
try:
|
||||||
|
os.kill(pid, 0)
|
||||||
|
time.sleep(0.05)
|
||||||
|
except ProcessLookupError:
|
||||||
|
reaped = True
|
||||||
|
break
|
||||||
|
assert reaped, f"pid {pid} still running after close()"
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyBridgeEnv:
|
||||||
|
def test_cwd_is_respected(self, tmp_path):
|
||||||
|
bridge = PtyBridge.spawn(
|
||||||
|
["/bin/sh", "-c", "pwd"],
|
||||||
|
cwd=str(tmp_path),
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
output = _read_until(bridge, str(tmp_path).encode())
|
||||||
|
assert str(tmp_path).encode() in output
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
def test_env_is_forwarded(self):
|
||||||
|
bridge = PtyBridge.spawn(
|
||||||
|
["/bin/sh", "-c", "printf %s \"$HERMES_PTY_TEST\""],
|
||||||
|
env={**os.environ, "HERMES_PTY_TEST": "pty-env-works"},
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
output = _read_until(bridge, b"pty-env-works")
|
||||||
|
assert b"pty-env-works" in output
|
||||||
|
finally:
|
||||||
|
bridge.close()
|
||||||
|
|
||||||
|
|
||||||
|
class TestPtyBridgeUnavailable:
|
||||||
|
"""Platform fallback semantics — PtyUnavailableError is importable and
|
||||||
|
carries a user-readable message."""
|
||||||
|
|
||||||
|
def test_error_carries_user_message(self):
|
||||||
|
err = PtyUnavailableError("platform not supported")
|
||||||
|
assert "platform" in str(err)
|
||||||
|
|
@ -1255,3 +1255,186 @@ class TestStatusRemoteGateway:
|
||||||
assert data["gateway_running"] is True
|
assert data["gateway_running"] is True
|
||||||
assert data["gateway_pid"] is None
|
assert data["gateway_pid"] is None
|
||||||
assert data["gateway_state"] == "running"
|
assert data["gateway_state"] == "running"
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# /api/pty WebSocket — terminal bridge for the dashboard "Chat" tab.
|
||||||
|
#
|
||||||
|
# These tests drive the endpoint with a tiny fake command (typically ``cat``
|
||||||
|
# or ``sh -c 'printf …'``) instead of the real ``hermes --tui`` binary. The
|
||||||
|
# endpoint resolves its argv through ``_resolve_chat_argv``, so tests
|
||||||
|
# monkeypatch that hook.
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
skip_on_windows = pytest.mark.skipif(
|
||||||
|
sys.platform.startswith("win"), reason="PTY bridge is POSIX-only"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@skip_on_windows
|
||||||
|
class TestPtyWebSocket:
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
def _setup(self, monkeypatch, _isolate_hermes_home):
|
||||||
|
from starlette.testclient import TestClient
|
||||||
|
|
||||||
|
import hermes_cli.web_server as ws
|
||||||
|
|
||||||
|
# Avoid exec'ing the actual TUI in tests: every test below installs
|
||||||
|
# its own fake argv via ``ws._resolve_chat_argv``.
|
||||||
|
self.ws_module = ws
|
||||||
|
self.token = ws._SESSION_TOKEN
|
||||||
|
self.client = TestClient(ws.app)
|
||||||
|
|
||||||
|
def _url(self, token: str | None = None, **params: str) -> str:
|
||||||
|
tok = token if token is not None else self.token
|
||||||
|
# TestClient.websocket_connect takes the path; it reconstructs the
|
||||||
|
# query string, so we pass it inline.
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
q = {"token": tok, **params}
|
||||||
|
return f"/api/pty?{urlencode(q)}"
|
||||||
|
|
||||||
|
def test_rejects_missing_token(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
lambda resume=None: (["/bin/cat"], None, None),
|
||||||
|
)
|
||||||
|
from starlette.websockets import WebSocketDisconnect
|
||||||
|
|
||||||
|
with pytest.raises(WebSocketDisconnect) as exc:
|
||||||
|
with self.client.websocket_connect("/api/pty"):
|
||||||
|
pass
|
||||||
|
assert exc.value.code == 4401
|
||||||
|
|
||||||
|
def test_rejects_bad_token(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
lambda resume=None: (["/bin/cat"], None, None),
|
||||||
|
)
|
||||||
|
from starlette.websockets import WebSocketDisconnect
|
||||||
|
|
||||||
|
with pytest.raises(WebSocketDisconnect) as exc:
|
||||||
|
with self.client.websocket_connect(self._url(token="wrong")):
|
||||||
|
pass
|
||||||
|
assert exc.value.code == 4401
|
||||||
|
|
||||||
|
def test_streams_child_stdout_to_client(self, monkeypatch):
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
lambda resume=None: (
|
||||||
|
["/bin/sh", "-c", "printf hermes-ws-ok"],
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
with self.client.websocket_connect(self._url()) as conn:
|
||||||
|
# Drain frames until we see the needle or time out. TestClient's
|
||||||
|
# recv_bytes blocks; loop until we have the signal byte string.
|
||||||
|
buf = b""
|
||||||
|
import time
|
||||||
|
|
||||||
|
deadline = time.monotonic() + 5.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
try:
|
||||||
|
frame = conn.receive_bytes()
|
||||||
|
except Exception:
|
||||||
|
break
|
||||||
|
if frame:
|
||||||
|
buf += frame
|
||||||
|
if b"hermes-ws-ok" in buf:
|
||||||
|
break
|
||||||
|
assert b"hermes-ws-ok" in buf
|
||||||
|
|
||||||
|
def test_client_input_reaches_child_stdin(self, monkeypatch):
|
||||||
|
# ``cat`` echoes stdin back, so a write → read round-trip proves
|
||||||
|
# the full duplex path.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
lambda resume=None: (["/bin/cat"], None, None),
|
||||||
|
)
|
||||||
|
with self.client.websocket_connect(self._url()) as conn:
|
||||||
|
conn.send_bytes(b"round-trip-payload\n")
|
||||||
|
buf = b""
|
||||||
|
import time
|
||||||
|
|
||||||
|
deadline = time.monotonic() + 5.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
frame = conn.receive_bytes()
|
||||||
|
if frame:
|
||||||
|
buf += frame
|
||||||
|
if b"round-trip-payload" in buf:
|
||||||
|
break
|
||||||
|
assert b"round-trip-payload" in buf
|
||||||
|
|
||||||
|
def test_resize_escape_is_forwarded(self, monkeypatch):
|
||||||
|
# Resize escape gets intercepted and applied via TIOCSWINSZ,
|
||||||
|
# then ``tput cols/lines`` reports the new dimensions back.
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
# sleep gives the test time to push the resize before tput runs
|
||||||
|
lambda resume=None: (
|
||||||
|
["/bin/sh", "-c", "sleep 0.15; tput cols; tput lines"],
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
with self.client.websocket_connect(self._url()) as conn:
|
||||||
|
conn.send_text("\x1b[RESIZE:99;41]")
|
||||||
|
buf = b""
|
||||||
|
import time
|
||||||
|
|
||||||
|
deadline = time.monotonic() + 5.0
|
||||||
|
while time.monotonic() < deadline:
|
||||||
|
frame = conn.receive_bytes()
|
||||||
|
if frame:
|
||||||
|
buf += frame
|
||||||
|
if b"99" in buf and b"41" in buf:
|
||||||
|
break
|
||||||
|
assert b"99" in buf and b"41" in buf
|
||||||
|
|
||||||
|
def test_unavailable_platform_closes_with_message(self, monkeypatch):
|
||||||
|
from hermes_cli.pty_bridge import PtyUnavailableError
|
||||||
|
|
||||||
|
def _raise(argv, **kwargs):
|
||||||
|
raise PtyUnavailableError("pty missing for tests")
|
||||||
|
|
||||||
|
monkeypatch.setattr(
|
||||||
|
self.ws_module,
|
||||||
|
"_resolve_chat_argv",
|
||||||
|
lambda resume=None: (["/bin/cat"], None, None),
|
||||||
|
)
|
||||||
|
# Patch PtyBridge.spawn at the web_server module's binding.
|
||||||
|
import hermes_cli.web_server as ws_mod
|
||||||
|
|
||||||
|
monkeypatch.setattr(ws_mod.PtyBridge, "spawn", classmethod(lambda cls, *a, **k: _raise(*a, **k)))
|
||||||
|
|
||||||
|
with self.client.websocket_connect(self._url()) as conn:
|
||||||
|
# Expect a final text frame with the error message, then close.
|
||||||
|
msg = conn.receive_text()
|
||||||
|
assert "pty missing" in msg or "unavailable" in msg.lower() or "pty" in msg.lower()
|
||||||
|
|
||||||
|
def test_resume_parameter_is_forwarded_to_argv(self, monkeypatch):
|
||||||
|
captured: dict = {}
|
||||||
|
|
||||||
|
def fake_resolve(resume=None):
|
||||||
|
captured["resume"] = resume
|
||||||
|
return (["/bin/sh", "-c", "printf resume-arg-ok"], None, None)
|
||||||
|
|
||||||
|
monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", fake_resolve)
|
||||||
|
|
||||||
|
with self.client.websocket_connect(self._url(resume="sess-42")) as conn:
|
||||||
|
# Drain briefly so the handler actually invokes the resolver.
|
||||||
|
try:
|
||||||
|
conn.receive_bytes()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
assert captured.get("resume") == "sess-42"
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue