mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
fix(dashboard): resolve chat TUI argv off event loop (#48561)
* fix(dashboard): resolve chat TUI argv off event loop Dashboard chat now resolves its TUI launch command off the FastAPI/WebSocket event loop. The resolver can run `npm install` / `npm run build` through `_make_tui_argv()`, and doing that synchronously in `/api/pty` can block proxy keepalives and other dashboard WebSocket work long enough for reverse-proxy deployments to drop the chat connection. This keeps the current TUI build policy intact: normal production launches still run the correctness-first `npm run build` path, while `HERMES_TUI_DIR` remains the prebuilt/no-build path for distros and containers. The change only moves the potentially slow resolver work to a worker thread for the dashboard chat path, serialized by an `asyncio.Lock` so concurrent chat tabs preserve one-build-at-a-time behavior. `SystemExit` (node/npm missing) and the profile `HTTPException` path still propagate cleanly through `asyncio.to_thread()`. Salvaged from #26124 — rebased onto current main. The async wrapper now threads the `profile` parameter that `_resolve_chat_argv` gained on main since the PR was opened, so cross-profile chat is preserved. Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> * chore: add 0xdany to AUTHOR_MAP * fix(dashboard): bind chat-argv lock to app.state; cover error propagation Self-review hardening on top of the salvaged fix: - Move `_chat_argv_lock` from a module-level `asyncio.Lock()` onto `app.state` (initialised in `_lifespan`, lazy fallback via `_get_chat_argv_lock`), mirroring `event_lock`. A module-level `asyncio.Lock()` binds to whatever event loop is active at import time, which is the exact pattern `_get_event_state`'s docstring warns against (breaks across TestClient instances / uvicorn reloads). This keeps the lock on the running loop. 
- Add two tests exercising the real `_resolve_chat_argv_async` → `asyncio.to_thread` → lock → re-raise chain: `SystemExit` (node/npm missing) and `HTTPException` (invalid profile) both propagate out of the worker thread and are caught by `pty_ws`'s existing handlers. The prior tests mocked `asyncio.to_thread` away and never covered this path. * test(dashboard): dedupe pty error-propagation tests; assert close code simplify-code cleanup pass on the salvage stack: - Extract the shared scaffolding of the two pty_ws error-propagation tests into `_assert_pty_propagates`, keeping the two tests as distinct contracts for the `except SystemExit` and `except HTTPException` arms. - Assert the stable WebSocket close code (1011) instead of relying solely on the user-facing "Chat unavailable" notice wording — a behavior contract per the AGENTS.md "behavior contracts over snapshots" rule, robust to notice rewording. The detail substring ("unknown profile") is still checked for the HTTPException case since proving the detail survives the thread hop is the point of that test. No production-code change; the helper exercises the same real _resolve_chat_argv_async -> asyncio.to_thread -> lock -> re-raise chain. --------- Co-authored-by: draihan <draihan@student.ubc.ca>
This commit is contained in:
parent
8568988b01
commit
d06104a9ee
3 changed files with 149 additions and 2 deletions
|
|
@ -147,6 +147,11 @@ def _start_desktop_cron_ticker(stop_event: "threading.Event", interval: int = 60
|
|||
async def _lifespan(app: "FastAPI"):
|
||||
app.state.event_channels = {} # dict[str, set]
|
||||
app.state.event_lock = asyncio.Lock()
|
||||
# Serializes chat-argv resolution so concurrent /api/pty connections
|
||||
# don't trigger overlapping ``npm install`` / ``npm run build`` work.
|
||||
# On app.state (not a module global) so the Lock binds to the running
|
||||
# event loop during lifespan startup — see _get_event_state's docstring.
|
||||
app.state.chat_argv_lock = asyncio.Lock()
|
||||
|
||||
# Desktop-spawned backends (HERMES_DESKTOP=1) fire cron jobs themselves,
|
||||
# since the app has no gateway running the scheduler. Server `hermes
|
||||
|
|
@ -187,6 +192,20 @@ def _get_event_state(app: "FastAPI"):
|
|||
return app.state.event_channels, app.state.event_lock
|
||||
|
||||
|
||||
def _get_chat_argv_lock(app: "FastAPI") -> asyncio.Lock:
    """Fetch the lock that serializes chat-argv resolution, creating it if absent.

    ``_lifespan`` normally stores the lock on ``app.state`` during startup.
    When the attribute is missing (e.g. a TestClient used without ``with``,
    so lifespan never ran), a fresh ``asyncio.Lock`` is created on first
    access and cached on ``app.state`` so later callers share one instance.
    """
    state = app.state
    if not hasattr(state, "chat_argv_lock"):
        # Lifespan startup did not populate the attribute; build it lazily.
        state.chat_argv_lock = asyncio.Lock()
    return state.chat_argv_lock
|
||||
|
||||
|
||||
app = FastAPI(title="Hermes Agent", version=__version__, lifespan=_lifespan)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -10745,7 +10764,8 @@ def _ws_auth_ok(ws: "WebSocket") -> bool:
|
|||
# and /api/events (dashboard → browser sidebar). Keyed by an opaque channel id
|
||||
# the chat tab generates on mount; entries auto-evict when the last subscriber
|
||||
# drops AND the publisher has disconnected.
|
||||
# (State is initialised in _lifespan on app startup — see above.)
|
||||
# (Channel state and the chat-argv lock are initialised in _lifespan on app
|
||||
# startup — see _get_event_state / _get_chat_argv_lock above.)
|
||||
|
||||
|
||||
def _resolve_chat_argv(
|
||||
|
|
@ -10862,6 +10882,30 @@ def _build_gateway_ws_url() -> Optional[str]:
|
|||
return f"ws://{netloc}/api/ws?{qs}"
|
||||
|
||||
|
||||
async def _resolve_chat_argv_async(
    resume: Optional[str] = None,
    sidecar_url: Optional[str] = None,
    profile: Optional[str] = None,
) -> tuple[list[str], Optional[str], Optional[dict]]:
    """Run ``_resolve_chat_argv`` in a worker thread, one call at a time.

    The synchronous resolver may run ``npm install`` / ``npm run build``
    through ``_make_tui_argv``; awaiting it via ``asyncio.to_thread`` keeps
    that work off the WebSocket event loop so keepalives and other dashboard
    connections keep flowing while the TUI launch command is prepared.

    Callers queue on the app-level asyncio lock *before* a worker thread is
    taken, so simultaneous browser tabs still get one build at a time without
    parking idle threads while they wait.

    All three arguments are forwarded unchanged, as keyword arguments, to
    ``_resolve_chat_argv``; its return value is returned as-is and any
    exception it raises is re-raised to the awaiting caller.
    """
    # NOTE(review): if the awaiting task is cancelled, the worker thread is
    # not interrupted, yet the lock is released — confirm that is acceptable.
    build_lock = _get_chat_argv_lock(app)
    async with build_lock:
        resolved = await asyncio.to_thread(
            _resolve_chat_argv,
            resume=resume,
            sidecar_url=sidecar_url,
            profile=profile,
        )
    return resolved
|
||||
|
||||
|
||||
def _build_sidecar_url(channel: str) -> Optional[str]:
|
||||
"""ws:// URL the PTY child should publish events to, or None when unbound.
|
||||
|
||||
|
|
@ -10992,7 +11036,7 @@ async def pty_ws(ws: WebSocket) -> None:
|
|||
sidecar_url = _build_sidecar_url(channel) if channel else None
|
||||
|
||||
try:
|
||||
argv, cwd, env = _resolve_chat_argv(
|
||||
argv, cwd, env = await _resolve_chat_argv_async(
|
||||
resume=resume, sidecar_url=sidecar_url, profile=profile
|
||||
)
|
||||
except HTTPException as exc:
|
||||
|
|
|
|||
|
|
@ -208,6 +208,7 @@ AUTHOR_MAP = {
|
|||
"me@promplate.dev": "CNSeniorious000",
|
||||
"yichengqiao21@gmail.com": "YarrowQiao",
|
||||
"erhanyasarx@gmail.com": "erhnysr",
|
||||
"draihan@student.ubc.ca": "0xdany", # PR #26124 salvage (chat argv off event loop)
|
||||
"30366221+WorldWriter@users.noreply.github.com": "WorldWriter",
|
||||
"dafeng@DafengdeMacBook-Pro.local": "WorldWriter",
|
||||
"schepers.zander1@gmail.com": "Strontvod",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
"""Tests for hermes_cli.web_server and related config utilities."""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import json
|
||||
import shutil
|
||||
|
|
@ -5132,6 +5133,107 @@ class TestPtyWebSocket:
|
|||
pass
|
||||
assert exc.value.code == 4401
|
||||
|
||||
def test_resolve_chat_argv_async_uses_worker_thread(self, monkeypatch):
    """The async wrapper hands the resolver to ``asyncio.to_thread`` with
    keyword arguments only and returns the resolver's tuple untouched."""
    sidecar = "ws://127.0.0.1:9119/api/pub?channel=abc"
    seen: dict = {}

    def fake_resolve(resume=None, sidecar_url=None, profile=None):
        # Record exactly what the resolver was called with.
        seen.update(resume=resume, sidecar_url=sidecar_url, profile=profile)
        return (["node", "dist/entry.js"], "/tmp/ui-tui", {"NODE_ENV": "production"})

    async def fake_to_thread(fn, *args, **kwargs):
        # Stand-in for asyncio.to_thread: record the hand-off, run inline.
        seen.update(thread_fn=fn, thread_args=args, thread_kwargs=kwargs)
        return fn(*args, **kwargs)

    monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", fake_resolve)
    monkeypatch.setattr(self.ws_module.asyncio, "to_thread", fake_to_thread)

    wrapper_call = self.ws_module._resolve_chat_argv_async(
        resume="sess-42",
        sidecar_url=sidecar,
        profile="worker",
    )
    argv, cwd, env = asyncio.run(wrapper_call)

    # Hand-off to the worker thread: callable plus keyword-only arguments.
    assert callable(seen["thread_fn"])
    assert seen["thread_args"] == ()
    assert seen["thread_kwargs"] == {
        "resume": "sess-42",
        "sidecar_url": sidecar,
        "profile": "worker",
    }

    # The resolver's result comes back unchanged.
    assert argv == ["node", "dist/entry.js"]
    assert cwd == "/tmp/ui-tui"
    assert env == {"NODE_ENV": "production"}

    # The resolver itself saw the original arguments.
    assert seen["resume"] == "sess-42"
    assert seen["sidecar_url"] == sidecar
    assert seen["profile"] == "worker"
|
||||
|
||||
def test_pty_ws_resolves_argv_through_async_wrapper(self, monkeypatch):
    """``/api/pty`` obtains its launch command from
    ``_resolve_chat_argv_async`` and forwards the ``resume`` value to it."""
    seen: dict = {}

    async def fake_resolve_async(resume=None, sidecar_url=None, profile=None):
        seen.update(resume=resume, sidecar_url=sidecar_url, profile=profile)
        return (["/bin/sh", "-c", "printf async-resolve-ok"], None, None)

    monkeypatch.setattr(self.ws_module, "_resolve_chat_argv_async", fake_resolve_async)

    pty_url = self._url(resume="sess-99")
    with self.client.websocket_connect(pty_url) as conn:
        # Best-effort read: only the resolver call matters here, not
        # whatever the child process manages to emit.
        try:
            conn.receive_bytes()
        except Exception:
            pass

    assert seen["resume"] == "sess-99"
|
||||
|
||||
def _assert_pty_propagates(self, monkeypatch, raising_resolver, *, profile=None, expect_detail=None):
    """Connect to /api/pty with a resolver that raises and check the outcome.

    ``raising_resolver`` replaces the synchronous ``_resolve_chat_argv`` so
    the real ``_resolve_chat_argv_async`` -> ``asyncio.to_thread`` -> lock ->
    re-raise chain still runs.  The socket must first deliver a
    "Chat unavailable" notice and then close with code 1011; the close code
    is the stable contract, the exact notice wording is not.  When
    ``expect_detail`` is given it must also appear in the notice.
    """
    from starlette.websockets import WebSocketDisconnect

    # Swap only the sync resolver; the wrapper around it stays real.
    monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", raising_resolver)

    if profile:
        url = self._url(profile=profile)
    else:
        url = self._url()

    with self.client.websocket_connect(url) as conn:
        notice = conn.receive_text()
        with pytest.raises(WebSocketDisconnect) as exc:
            conn.receive_text()

    assert "Chat unavailable" in notice
    assert exc.value.code == 1011
    if expect_detail is not None:
        assert expect_detail in notice
|
||||
|
||||
def test_pty_ws_propagates_systemexit_through_async_wrapper(self, monkeypatch):
    """A ``SystemExit`` raised by the threaded resolver (the node/npm-missing
    case) crosses the async wrapper and reaches pty_ws's ``except SystemExit``."""

    def raise_node_missing(resume=None, sidecar_url=None, profile=None):
        raise SystemExit("node not found")

    self._assert_pty_propagates(monkeypatch, raise_node_missing)
|
||||
|
||||
def test_pty_ws_propagates_httpexception_through_async_wrapper(self, monkeypatch):
    """An ``HTTPException`` raised by the threaded resolver (invalid profile)
    crosses the async wrapper with its detail intact and reaches pty_ws's
    ``except HTTPException``."""
    from fastapi import HTTPException

    def raise_unknown_profile(resume=None, sidecar_url=None, profile=None):
        raise HTTPException(status_code=404, detail="unknown profile")

    self._assert_pty_propagates(
        monkeypatch,
        raise_unknown_profile,
        profile="ghost",
        expect_detail="unknown profile",
    )
|
||||
|
||||
def test_streams_child_stdout_to_client(self, monkeypatch):
|
||||
monkeypatch.setattr(
|
||||
self.ws_module,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue