mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-27 11:22:03 +00:00
Reconnect dashboard PTY chat after socket drops
This commit is contained in:
parent
6a319f570f
commit
41f8126148
3 changed files with 220 additions and 11 deletions
|
|
@ -167,6 +167,7 @@ def _resolve_restart_drain_timeout() -> float:
|
|||
async def _lifespan(app: "FastAPI"):
|
||||
app.state.event_channels = {} # dict[str, set]
|
||||
app.state.event_lock = asyncio.Lock()
|
||||
app.state.pty_active_session_files = {} # dict[str, Path]
|
||||
# Serializes chat-argv resolution so concurrent /api/pty connections
|
||||
# don't trigger overlapping ``npm install`` / ``npm run build`` work.
|
||||
# On app.state (not a module global) so the Lock binds to the running
|
||||
|
|
@ -234,6 +235,15 @@ def _get_chat_argv_lock(app: "FastAPI") -> asyncio.Lock:
|
|||
return app.state.chat_argv_lock
|
||||
|
||||
|
||||
def _get_pty_active_session_files(app: "FastAPI") -> dict[str, Path]:
|
||||
"""Return channel -> active-session-file state for dashboard PTYs."""
|
||||
try:
|
||||
return app.state.pty_active_session_files
|
||||
except AttributeError:
|
||||
app.state.pty_active_session_files = {}
|
||||
return app.state.pty_active_session_files
|
||||
|
||||
|
||||
app = FastAPI(title="Hermes Agent", version=__version__, lifespan=_lifespan)
|
||||
|
||||
# Memory-provider OAuth connect routes live in the memory layer, not here.
|
||||
|
|
@ -11544,6 +11554,7 @@ def _resolve_chat_argv(
|
|||
resume: Optional[str] = None,
|
||||
sidecar_url: Optional[str] = None,
|
||||
profile: Optional[str] = None,
|
||||
active_session_file: Optional[str] = None,
|
||||
) -> tuple[list[str], Optional[str], Optional[dict]]:
|
||||
"""Resolve the argv + cwd + env for the chat PTY.
|
||||
|
||||
|
|
@ -11564,6 +11575,12 @@ def _resolve_chat_argv(
|
|||
the spawned ``tui_gateway.entry`` can mirror dispatcher emits to the
|
||||
dashboard's ``/api/pub`` endpoint (see :func:`pub_ws`).
|
||||
|
||||
`active_session_file` (when set) is forwarded as
|
||||
``HERMES_TUI_ACTIVE_SESSION_FILE``. The TUI writes the current session id
|
||||
there whenever it creates/resumes/switches sessions, giving the dashboard a
|
||||
small cross-process breadcrumb for reconnecting after an unexpected browser
|
||||
WebSocket close.
|
||||
|
||||
`profile` (when set) scopes the ENTIRE chat to that profile by pointing
|
||||
``HERMES_HOME`` at the profile dir in the child env. Every spawned
|
||||
process (the TUI and the ``tui_gateway.entry`` it launches) resolves
|
||||
|
|
@ -11611,6 +11628,9 @@ def _resolve_chat_argv(
|
|||
if sidecar_url:
|
||||
env["HERMES_TUI_SIDECAR_URL"] = sidecar_url
|
||||
|
||||
if active_session_file:
|
||||
env["HERMES_TUI_ACTIVE_SESSION_FILE"] = active_session_file
|
||||
|
||||
# Profile-scoped chats must NOT attach to the dashboard's in-memory
|
||||
# gateway — it runs under the dashboard's own profile. Without the
|
||||
# attach URL, gatewayClient spawns its own `tui_gateway.entry`, which
|
||||
|
|
@ -11659,6 +11679,7 @@ async def _resolve_chat_argv_async(
|
|||
resume: Optional[str] = None,
|
||||
sidecar_url: Optional[str] = None,
|
||||
profile: Optional[str] = None,
|
||||
active_session_file: Optional[str] = None,
|
||||
) -> tuple[list[str], Optional[str], Optional[dict]]:
|
||||
"""Resolve chat argv without blocking the dashboard event loop.
|
||||
|
||||
|
|
@ -11670,12 +11691,18 @@ async def _resolve_chat_argv_async(
|
|||
multiple browser tabs connect at once without occupying worker threads
|
||||
while queued connections wait.
|
||||
"""
|
||||
kwargs = {
|
||||
"resume": resume,
|
||||
"sidecar_url": sidecar_url,
|
||||
"profile": profile,
|
||||
}
|
||||
if active_session_file is not None:
|
||||
kwargs["active_session_file"] = active_session_file
|
||||
|
||||
async with _get_chat_argv_lock(app):
|
||||
return await asyncio.to_thread(
|
||||
_resolve_chat_argv,
|
||||
resume=resume,
|
||||
sidecar_url=sidecar_url,
|
||||
profile=profile,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -11737,6 +11764,37 @@ def _channel_or_close_code(ws: WebSocket) -> Optional[str]:
|
|||
return channel if _VALID_CHANNEL_RE.match(channel) else None
|
||||
|
||||
|
||||
def _active_session_file_for_channel(app: "FastAPI", channel: str) -> Path:
|
||||
"""Return the per-channel file where a dashboard TUI writes its active sid."""
|
||||
files = _get_pty_active_session_files(app)
|
||||
existing = files.get(channel)
|
||||
if existing is not None:
|
||||
return existing
|
||||
|
||||
fd, raw_path = tempfile.mkstemp(prefix="hermes-pty-active-", suffix=".json")
|
||||
os.close(fd)
|
||||
path = Path(raw_path)
|
||||
files[channel] = path
|
||||
return path
|
||||
|
||||
|
||||
def _read_active_session_file(path: Path) -> Optional[str]:
|
||||
try:
|
||||
data = json.loads(path.read_text(encoding="utf-8"))
|
||||
except (OSError, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
session_id = str(data.get("session_id") or "").strip()
|
||||
return session_id or None
|
||||
|
||||
|
||||
def _forget_active_session_file(path: Path) -> None:
|
||||
try:
|
||||
path.unlink(missing_ok=True)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
def _ws_close_reason(text: str) -> str:
|
||||
"""Clamp a WS close reason to the protocol's 123-byte UTF-8 limit.
|
||||
|
||||
|
|
@ -11807,11 +11865,32 @@ async def pty_ws(ws: WebSocket) -> None:
|
|||
profile = ws.query_params.get("profile") or None
|
||||
channel = _channel_or_close_code(ws)
|
||||
sidecar_url = _build_sidecar_url(channel) if channel else None
|
||||
force_fresh = (ws.query_params.get("fresh") or "").strip().lower() in {
|
||||
"1",
|
||||
"true",
|
||||
"yes",
|
||||
"on",
|
||||
}
|
||||
active_session_file: Optional[Path] = None
|
||||
|
||||
if channel:
|
||||
active_session_file = _active_session_file_for_channel(ws.app, channel)
|
||||
if force_fresh:
|
||||
resume = None
|
||||
_forget_active_session_file(active_session_file)
|
||||
elif not resume:
|
||||
resume = _read_active_session_file(active_session_file)
|
||||
|
||||
resolve_kwargs = {
|
||||
"resume": resume,
|
||||
"sidecar_url": sidecar_url,
|
||||
"profile": profile,
|
||||
}
|
||||
if active_session_file is not None:
|
||||
resolve_kwargs["active_session_file"] = str(active_session_file)
|
||||
|
||||
try:
|
||||
argv, cwd, env = await _resolve_chat_argv_async(
|
||||
resume=resume, sidecar_url=sidecar_url, profile=profile
|
||||
)
|
||||
argv, cwd, env = await _resolve_chat_argv_async(**resolve_kwargs)
|
||||
except HTTPException as exc:
|
||||
# Unknown/invalid profile from _resolve_profile_dir.
|
||||
await ws.send_text(f"\r\n\x1b[31mChat unavailable: {exc.detail}\x1b[0m\r\n")
|
||||
|
|
|
|||
|
|
@ -5426,6 +5426,7 @@ class TestPtyWebSocket:
|
|||
# its own fake argv via ``ws._resolve_chat_argv``.
|
||||
self.ws_module = ws
|
||||
monkeypatch.setattr(ws, "_DASHBOARD_EMBEDDED_CHAT_ENABLED", True)
|
||||
ws.app.state.pty_active_session_files = {}
|
||||
self.token = ws._SESSION_TOKEN
|
||||
self.client = TestClient(ws.app)
|
||||
|
||||
|
|
@ -5454,6 +5455,22 @@ class TestPtyWebSocket:
|
|||
assert env["HERMES_TUI_INLINE"] == "1"
|
||||
assert env["HERMES_TUI_DISABLE_MOUSE"] == "1"
|
||||
|
||||
def test_resolve_chat_argv_sets_active_session_file_env(self, monkeypatch):
|
||||
"""Dashboard chat gives the TUI a breadcrumb file for reconnect resume."""
|
||||
import hermes_cli.main as main_mod
|
||||
|
||||
monkeypatch.setattr(
|
||||
main_mod,
|
||||
"_make_tui_argv",
|
||||
lambda project_root, tui_dev=False: (["node", "dist/entry.js"], "/tmp/ui-tui"),
|
||||
)
|
||||
|
||||
_argv, _cwd, env = self.ws_module._resolve_chat_argv(
|
||||
active_session_file="/tmp/hermes-active-session.json"
|
||||
)
|
||||
|
||||
assert env["HERMES_TUI_ACTIVE_SESSION_FILE"] == "/tmp/hermes-active-session.json"
|
||||
|
||||
def test_resolve_chat_argv_applies_terminal_backend_config(
|
||||
self, monkeypatch, _isolate_hermes_home
|
||||
):
|
||||
|
|
@ -5755,14 +5772,84 @@ class TestPtyWebSocket:
|
|||
pass
|
||||
assert captured.get("resume") == "sess-42"
|
||||
|
||||
def test_channel_reconnect_resumes_active_session_file(self, monkeypatch):
|
||||
"""A new /api/pty socket on the same channel resumes the last TUI sid."""
|
||||
script = (
|
||||
"import json, os, sys; "
|
||||
"resume = os.environ.get('HERMES_TUI_RESUME', ''); "
|
||||
"active = os.environ.get('HERMES_TUI_ACTIVE_SESSION_FILE', ''); "
|
||||
"sys.stdout.write(f'resume={resume}\\n'); sys.stdout.flush(); "
|
||||
"active and not resume and open(active, 'w').write(json.dumps({'session_id': 'sess-live'}))"
|
||||
)
|
||||
|
||||
def fake_resolve(resume=None, sidecar_url=None, profile=None, active_session_file=None):
|
||||
env = {}
|
||||
if active_session_file:
|
||||
env["HERMES_TUI_ACTIVE_SESSION_FILE"] = active_session_file
|
||||
if resume:
|
||||
env["HERMES_TUI_RESUME"] = resume
|
||||
return ([sys.executable, "-c", script], None, env)
|
||||
|
||||
monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", fake_resolve)
|
||||
|
||||
def drain_until(conn, needle: bytes) -> bytes:
|
||||
buf = b""
|
||||
import time
|
||||
|
||||
deadline = time.monotonic() + 5.0
|
||||
while time.monotonic() < deadline:
|
||||
try:
|
||||
frame = conn.receive_bytes()
|
||||
except Exception:
|
||||
break
|
||||
if frame:
|
||||
buf += frame
|
||||
if needle in buf:
|
||||
break
|
||||
return buf
|
||||
|
||||
with self.client.websocket_connect(self._url(channel="reconnect-chan")) as conn:
|
||||
assert b"resume=" in drain_until(conn, b"resume=")
|
||||
|
||||
with self.client.websocket_connect(self._url(channel="reconnect-chan")) as conn:
|
||||
assert b"resume=sess-live" in drain_until(conn, b"resume=sess-live")
|
||||
|
||||
def test_fresh_param_ignores_channel_active_session_file(self, monkeypatch):
|
||||
"""Explicit fresh starts must not resurrect the prior channel session."""
|
||||
channel = "fresh-chan"
|
||||
active_file = self.ws_module._active_session_file_for_channel(
|
||||
self.ws_module.app,
|
||||
channel,
|
||||
)
|
||||
active_file.write_text(json.dumps({"session_id": "sess-old"}), encoding="utf-8")
|
||||
captured: dict = {}
|
||||
|
||||
def fake_resolve(resume=None, sidecar_url=None, profile=None, active_session_file=None):
|
||||
captured["resume"] = resume
|
||||
captured["active_session_file"] = active_session_file
|
||||
return (["/bin/sh", "-c", "printf fresh-ok"], None, None)
|
||||
|
||||
monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", fake_resolve)
|
||||
|
||||
with self.client.websocket_connect(self._url(channel=channel, fresh="1")) as conn:
|
||||
try:
|
||||
conn.receive_bytes()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
assert captured["resume"] is None
|
||||
assert captured["active_session_file"] == str(active_file)
|
||||
assert not active_file.exists()
|
||||
|
||||
def test_channel_param_propagates_sidecar_url(self, monkeypatch):
|
||||
"""When /api/pty is opened with ?channel=, the PTY child gets a
|
||||
HERMES_TUI_SIDECAR_URL env var pointing back at /api/pub on the
|
||||
same channel — which is how tool events reach the dashboard sidebar."""
|
||||
captured: dict = {}
|
||||
|
||||
def fake_resolve(resume=None, sidecar_url=None, profile=None):
|
||||
def fake_resolve(resume=None, sidecar_url=None, profile=None, active_session_file=None):
|
||||
captured["sidecar_url"] = sidecar_url
|
||||
captured["active_session_file"] = active_session_file
|
||||
return (["/bin/sh", "-c", "printf sidecar-ok"], None, None)
|
||||
|
||||
monkeypatch.setattr(self.ws_module, "_resolve_chat_argv", fake_resolve)
|
||||
|
|
@ -5786,6 +5873,7 @@ class TestPtyWebSocket:
|
|||
assert url.startswith("ws://127.0.0.1:9119/api/pub?")
|
||||
assert "channel=abc-123" in url
|
||||
assert "token=" in url
|
||||
assert captured["active_session_file"]
|
||||
|
||||
def test_pub_broadcasts_to_events_subscribers(self):
|
||||
"""A frame handed to _broadcast_event is sent verbatim to every
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ function buildWsUrl(
|
|||
resume: string | null,
|
||||
channel: string,
|
||||
profile: string,
|
||||
fresh: boolean,
|
||||
): string {
|
||||
const proto = window.location.protocol === "https:" ? "wss:" : "ws:";
|
||||
// ``authParam`` is ``["token", <session>]`` in loopback mode and
|
||||
|
|
@ -53,6 +54,7 @@ function buildWsUrl(
|
|||
// ``_ws_auth_ok`` picks whichever shape matches the current gate state.
|
||||
const qs = new URLSearchParams({ [authParam[0]]: authParam[1], channel });
|
||||
if (resume) qs.set("resume", resume);
|
||||
if (fresh) qs.set("fresh", "1");
|
||||
// Profile-scoped chat: the PTY child gets HERMES_HOME pointed at the
|
||||
// selected profile, so the conversation runs with that profile's model,
|
||||
// skills, memory, and sessions (see web_server._resolve_chat_argv).
|
||||
|
|
@ -144,6 +146,9 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
);
|
||||
const [copyState, setCopyState] = useState<"idle" | "copied">("idle");
|
||||
const copyResetRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
|
||||
const reconnectAttemptRef = useRef(0);
|
||||
const forceFreshPtyRef = useRef(false);
|
||||
// NS-504: when the agent process exits cleanly (the user typed `/exit`, or
|
||||
// started a new session that ended the current PTY child), the PTY socket
|
||||
// closes with a normal code. Before this fix the terminal just printed
|
||||
|
|
@ -153,20 +158,32 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
// is a dependency of the connect effect, so a fresh PTY spawns in place.
|
||||
const [sessionEnded, setSessionEnded] = useState(false);
|
||||
const [reconnectNonce, setReconnectNonce] = useState(0);
|
||||
const clearReconnectTimer = useCallback(() => {
|
||||
if (reconnectTimerRef.current) {
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = null;
|
||||
}
|
||||
}, []);
|
||||
const reconnect = useCallback(() => {
|
||||
forceFreshPtyRef.current = true;
|
||||
reconnectAttemptRef.current = 0;
|
||||
clearReconnectTimer();
|
||||
setSessionEnded(false);
|
||||
setBanner(null);
|
||||
setReconnectNonce((n) => n + 1);
|
||||
}, []);
|
||||
}, [clearReconnectTimer]);
|
||||
const startFreshDashboardChat = useCallback(() => {
|
||||
const next = new URLSearchParams(searchParams);
|
||||
|
||||
next.delete("resume");
|
||||
forceFreshPtyRef.current = true;
|
||||
reconnectAttemptRef.current = 0;
|
||||
clearReconnectTimer();
|
||||
setSearchParams(next, { replace: true });
|
||||
setSessionEnded(false);
|
||||
setBanner(null);
|
||||
setReconnectNonce((n) => n + 1);
|
||||
}, [searchParams, setSearchParams]);
|
||||
}, [clearReconnectTimer, searchParams, setSearchParams]);
|
||||
// Raw state for the mobile side-sheet + a derived value that force-
|
||||
// closes whenever the chat tab isn't active. The *derived* value is
|
||||
// what side-effects (body-scroll lock, keydown listener, portal render)
|
||||
|
|
@ -655,15 +672,35 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
let unmounting = false;
|
||||
let onDataDisposable: { dispose(): void } | null = null;
|
||||
let onResizeDisposable: { dispose(): void } | null = null;
|
||||
const forceFresh = forceFreshPtyRef.current;
|
||||
forceFreshPtyRef.current = false;
|
||||
const scheduleReconnect = (code: number) => {
|
||||
if (reconnectTimerRef.current) {
|
||||
return;
|
||||
}
|
||||
const attempt = Math.min(reconnectAttemptRef.current + 1, 5);
|
||||
reconnectAttemptRef.current = attempt;
|
||||
const delayMs = Math.min(250 * 2 ** (attempt - 1), 3000);
|
||||
setSessionEnded(false);
|
||||
setBanner(
|
||||
`Chat connection interrupted (code ${code}). Reconnecting…`,
|
||||
);
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
reconnectTimerRef.current = null;
|
||||
setReconnectNonce((n) => n + 1);
|
||||
}, delayMs);
|
||||
};
|
||||
void (async () => {
|
||||
const authParam = await buildWsAuthParam();
|
||||
if (unmounting) return;
|
||||
const url = buildWsUrl(authParam, resumeParam, channel, scopedProfile);
|
||||
const url = buildWsUrl(authParam, resumeParam, channel, scopedProfile, forceFresh);
|
||||
const ws = new WebSocket(url);
|
||||
ws.binaryType = "arraybuffer";
|
||||
wsRef.current = ws;
|
||||
|
||||
ws.onopen = () => {
|
||||
clearReconnectTimer();
|
||||
reconnectAttemptRef.current = 0;
|
||||
setBanner(null);
|
||||
setSessionEnded(false);
|
||||
// Send the initial RESIZE immediately so Ink has *a* size to lay
|
||||
|
|
@ -746,6 +783,10 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
// Server already wrote an ANSI error frame.
|
||||
return;
|
||||
}
|
||||
if (!ev.wasClean || ev.code === 1001 || ev.code === 1006) {
|
||||
scheduleReconnect(ev.code);
|
||||
return;
|
||||
}
|
||||
// Normal/clean exit: the agent process ended (e.g. the user typed
|
||||
// `/exit`, or started a new session). NS-504: surface an explicit
|
||||
// restart affordance instead of leaving a dead terminal that only a
|
||||
|
|
@ -806,6 +847,7 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
if (hostSyncRaf) cancelAnimationFrame(hostSyncRaf);
|
||||
if (settleRaf1) cancelAnimationFrame(settleRaf1);
|
||||
if (settleRaf2) cancelAnimationFrame(settleRaf2);
|
||||
clearReconnectTimer();
|
||||
// Phase 5.3: ``ws`` is local to the IIFE that opens it (the gated-mode
|
||||
// ticket fetch makes the open async). The cleanup runs at the outer
|
||||
// effect's top level so it can't reach into that scope — close via
|
||||
|
|
@ -821,7 +863,7 @@ export default function ChatPage({ isActive = true }: { isActive?: boolean }) {
|
|||
copyResetRef.current = null;
|
||||
}
|
||||
};
|
||||
}, [channel, resumeParam, scopedProfile, reconnectNonce]);
|
||||
}, [channel, clearReconnectTimer, resumeParam, scopedProfile, reconnectNonce]);
|
||||
|
||||
// When the user returns to the chat tab (isActive: false → true), the
|
||||
// terminal host just transitioned from display:none to display:flex.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue