diff --git a/hermes_cli/web_server.py b/hermes_cli/web_server.py index 60103630396..c3e1d7ec3e4 100644 --- a/hermes_cli/web_server.py +++ b/hermes_cli/web_server.py @@ -10142,4 +10142,9 @@ def start_server( uvicorn.run( app, host=host, port=port, log_level="warning", proxy_headers=bool(app.state.auth_required), + # Detect half-open WS connections (reverse-proxy 524, dropped tunnels) + # within ~20-40s so WebSocketDisconnect fires the disconnect→reap path. + # 20s stays under Cloudflare Tunnel's idle timeout, keeping it warm. + ws_ping_interval=20.0, + ws_ping_timeout=20.0, ) diff --git a/tests/test_dashboard_sidecar_close_on_disconnect.py b/tests/test_dashboard_sidecar_close_on_disconnect.py new file mode 100644 index 00000000000..bb11e688cf1 --- /dev/null +++ b/tests/test_dashboard_sidecar_close_on_disconnect.py @@ -0,0 +1,13 @@ +import re +from pathlib import Path + +CHAT_SIDEBAR = Path(__file__).resolve().parent.parent / "web/src/components/ChatSidebar.tsx" + + +def test_sidecar_session_create_requests_close_on_disconnect(): + """The sidecar must opt its session into close_on_disconnect so the gateway + reaps the slash_worker on WS disconnect (the #21370/#21467 leak).""" + source = CHAT_SIDEBAR.read_text(encoding="utf-8") + call = re.search(r'"session\.create",\s*\{(.*?)\}', source, re.DOTALL) + assert call, "sidecar session.create call not found" + assert re.search(r"close_on_disconnect:\s*true", call.group(1)) diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py index b5534046b7e..90b73049b63 100644 --- a/tests/test_tui_gateway_server.py +++ b/tests/test_tui_gateway_server.py @@ -1,5 +1,6 @@ import json import os +import subprocess import sys import threading import time @@ -2256,7 +2257,7 @@ def test_config_set_model_global_persists(monkeypatch): server._sessions["sid"] = _session(agent=_Agent()) monkeypatch.setattr("hermes_cli.model_switch.switch_model", _switch_model) - monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None) + monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None) monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None) monkeypatch.setattr("hermes_cli.config.save_config", lambda cfg: saved.update(cfg)) @@ -2315,7 +2316,7 @@ def test_config_set_model_does_not_leak_inference_provider_env(monkeypatch): monkeypatch.setattr( "hermes_cli.model_switch.switch_model", lambda **_kwargs: result ) - monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None) + monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None) monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None) try: @@ -2376,7 +2377,7 @@ def test_config_set_model_records_per_session_override_not_env(monkeypatch): monkeypatch.setattr( "hermes_cli.model_switch.switch_model", lambda **_kwargs: result ) - monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None) + monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None) monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None) try: @@ -2431,7 +2432,7 @@ def test_config_set_model_switches_agent_without_touching_env(monkeypatch): monkeypatch.setenv("HERMES_TUI_PROVIDER", "openai-codex") monkeypatch.delenv("HERMES_MODEL", raising=False) monkeypatch.delenv("HERMES_INFERENCE_MODEL", raising=False) - monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None) + monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None) monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None) def fake_switch_model(**kwargs): @@ -2583,7 +2584,7 @@ def test_session_compress_syncs_session_key_after_rotation(monkeypatch): monkeypatch.setattr(server, "_session_info", lambda _agent, *a: {"model": "x"}) restart_calls = [] monkeypatch.setattr( - server, "_restart_slash_worker", lambda s: restart_calls.append(s) + server, "_restart_slash_worker", lambda sid, s: restart_calls.append(s) ) try: @@ -4518,6 +4519,56 @@ def test_session_active_list_reports_live_sessions(monkeypatch): assert rows["sid-b"]["preview"] == "writing code" +def test_session_active_list_excludes_finalized_sessions(monkeypatch): + """#38950: a finalized-but-not-yet-popped session must not inflate the count. + + The WS grace-reap and idle reaper set ``_finalized`` inside + ``_teardown_session`` before popping the entry from ``_sessions``. During + that window ``session.active_list`` would otherwise still report the dead + session, which is exactly the footer "N sessions" count that only ever grew + until a gateway restart. A live session on the real stdio transport (the + standalone ``hermes --tui`` case) must still be reported. + """ + class _DB: + def get_session_title(self, key): + return {"key-live": "Live", "key-dead": "Dead"}.get(key, "") + + previous_sessions = dict(server._sessions) + server._sessions.clear() + monkeypatch.setattr(server, "_get_db", lambda: _DB()) + server._sessions["sid-live"] = _session( + agent=types.SimpleNamespace(model="model-live"), + history=[{"role": "user", "content": "still here"}], + session_key="key-live", + created_at=10.0, + last_active=20.0, + ) + dead = _session( + agent=types.SimpleNamespace(model="model-dead"), + history=[{"role": "user", "content": "gone"}], + session_key="key-dead", + created_at=11.0, + last_active=21.0, + ) + dead["_finalized"] = True + server._sessions["sid-dead"] = dead + try: + resp = server.handle_request( + { + "id": "1", + "method": "session.active_list", + "params": {}, + } + ) + finally: + server._sessions.clear() + server._sessions.update(previous_sessions) + + session_rows = resp["result"]["sessions"] + assert [row["id"] for row in session_rows] == ["sid-live"] + + + def test_session_activate_returns_inflight_stream_before_completion(monkeypatch): """Switching into a still-running live session must hydrate partial output. @@ -6096,3 +6147,270 @@ def test_sniff_image_ext_magic_and_filename(): assert server._sniff_image_ext(b"unknown") == ".png" # fallback # filename hint wins over magic bytes assert server._sniff_image_ext(b"\x89PNG", "photo.jpeg") == ".jpeg" + + +def test_slash_worker_close_reaps_zombie_and_closes_fds(): + """A hung worker is SIGKILLed, the zombie reaped, all pipes closed — once.""" + calls = {k: 0 for k in ("terminate", "kill", "wait", "stdin", "stdout", "stderr")} + + class FakeStream: + def __init__(self, name): + self.name = name + + def close(self): + calls[self.name] += 1 + + class FakeProc: + stdin, stdout, stderr = (FakeStream(n) for n in ("stdin", "stdout", "stderr")) + + def poll(self): + return None # always alive -> forces terminate then kill + + def terminate(self): + calls["terminate"] += 1 + + def kill(self): + calls["kill"] += 1 + + def wait(self, timeout=None): + calls["wait"] += 1 + raise subprocess.TimeoutExpired(cmd="x", timeout=timeout) + + worker = object.__new__(server._SlashWorker) + worker.proc = FakeProc() + + worker.close() + worker.close() # idempotent + + assert calls["terminate"] == 1 + assert calls["kill"] == 1 + assert calls["wait"] >= 2 # reaped after both terminate and kill + assert calls["stdin"] == calls["stdout"] == calls["stderr"] == 1 + + +def test_close_session_by_id_is_idempotent_and_full(monkeypatch): + """One call tears the session down fully; a second is a no-op.""" + calls = {"worker": 0, "agent": 0, "unreg": 0, "finalize": 0} + + class W: + def close(self): + calls["worker"] += 1 + + class A: + def close(self): + calls["agent"] += 1 + + monkeypatch.setattr( + server, "_finalize_session", + lambda s, end_reason="tui_close": calls.__setitem__("finalize", calls["finalize"] + 1), + ) + monkeypatch.setattr( + "tools.approval.unregister_gateway_notify", + lambda key: calls.__setitem__("unreg", calls["unreg"] + 1), raising=False, + ) + server._sessions["sid-1"] = {"session_key": "k1", "agent": A(), "slash_worker": W()} + + assert server._close_session_by_id("sid-1", end_reason="ws_disconnect") is True + assert server._close_session_by_id("sid-1", end_reason="ws_disconnect") is False + assert calls == {"worker": 1, "agent": 1, "unreg": 1, "finalize": 1} + assert "sid-1" not in server._sessions + + +def test_attach_worker_closes_orphan_when_session_already_torn_down(): + """A worker built after its session was reaped must be closed, not orphaned.""" + closed = [] + + class W: + def close(self): + closed.append(True) + + server._sessions.pop("gone", None) + detached = {"session_key": "k"} # not in _sessions -> already torn down + server._attach_worker("gone", detached, W()) + + assert closed == [True] + assert "slash_worker" not in detached + assert "gone" not in server._sessions + + +def test_attach_worker_stores_worker_on_live_session(): + class W: + def close(self): + raise AssertionError("must not close a worker for a live session") + + live = {"session_key": "k"} + server._sessions["live"] = live + worker = W() + try: + server._attach_worker("live", live, worker) + assert live["slash_worker"] is worker + finally: + server._sessions.pop("live", None) + + +def test_restart_slash_worker_closes_orphan_when_session_reaped(monkeypatch): + """Post-turn restart of a session reaped mid-flight (e.g. close_on_disconnect + fired while `running` flipped false) must close the fresh worker, not orphan it.""" + closed = [] + + class _FakeWorker: + def __init__(self, *a, **k): + pass + + def close(self): + closed.append(True) + + monkeypatch.setattr(server, "_SlashWorker", _FakeWorker) + server._sessions.pop("reaped", None) + reaped = {"session_key": "k"} # not in _sessions -> torn down concurrently + server._restart_slash_worker("reaped", reaped) + + assert closed == [True] + assert reaped.get("slash_worker") is None + assert "reaped" not in server._sessions + + +def test_restart_slash_worker_stores_on_live_session(monkeypatch): + class _FakeWorker: + def __init__(self, *a, **k): + pass + + def close(self): + pass + + monkeypatch.setattr(server, "_SlashWorker", _FakeWorker) + live = {"session_key": "k", "slash_worker": None} + server._sessions["live-restart"] = live + try: + server._restart_slash_worker("live-restart", live) + assert isinstance(live["slash_worker"], _FakeWorker) + finally: + server._sessions.pop("live-restart", None) + + +def test_session_close_rpc_delegates_to_close_session_by_id(monkeypatch): + seen = [] + monkeypatch.setattr( + server, "_close_session_by_id", + lambda sid, *, end_reason: bool(seen.append((sid, end_reason))) or True, + ) + resp = server.handle_request( + {"id": "1", "method": "session.close", "params": {"session_id": "s9"}} + ) + assert resp["result"] == {"closed": True} + assert seen == [("s9", "tui_close")] + + +def test_close_sessions_for_transport_closes_flagged_repoints_rest(monkeypatch): + seen = [] + monkeypatch.setattr( + server, "_close_session_by_id", + lambda sid, *, end_reason: bool(seen.append((sid, end_reason))) or True, + ) + transport = object() # the disconnecting transport + server._sessions.clear() + server._sessions["a"] = {"transport": transport, "close_on_disconnect": True} + server._sessions["b"] = {"transport": transport, "close_on_disconnect": False} + try: + server._close_sessions_for_transport(transport, end_reason="ws_disconnect") + assert seen == [("a", "ws_disconnect")] # only the flagged one closed + assert server._sessions["b"]["transport"] is server._stdio_transport # re-pointed + finally: + server._sessions.clear() + + +def test_session_create_records_close_on_disconnect_flag(monkeypatch): + monkeypatch.setattr(server, "_start_agent_build", lambda sid, session: None) + server._sessions.clear() + try: + on = server.handle_request( + {"id": "1", "method": "session.create", "params": {"close_on_disconnect": True}} + )["result"]["session_id"] + off = server.handle_request( + {"id": "2", "method": "session.create", "params": {}} + )["result"]["session_id"] + assert server._sessions[on]["close_on_disconnect"] + assert not server._sessions[off]["close_on_disconnect"] + finally: + server._sessions.clear() + + +def test_shutdown_sessions_closes_every_session_via_helper(monkeypatch): + seen = [] + monkeypatch.setattr( + server, "_close_session_by_id", + lambda sid, *, end_reason: seen.append((sid, end_reason)), + ) + server._sessions.clear() + server._sessions["a"] = {} + server._sessions["b"] = {} + try: + server._shutdown_sessions() + assert sorted(sid for sid, _ in seen) == ["a", "b"] + assert {reason for _, reason in seen} == {"tui_shutdown"} + finally: + server._sessions.clear() + + +def _idle_evictable_session(now): + """A session that satisfies every eviction precondition.""" + ready = threading.Event() + ready.set() + old = now - 10 * 3600 # well past the 6h TTL + return { + "running": False, + "agent_ready": ready, + "transport": server._stdio_transport, # dead/detached + "last_active": old, + "created_at": old, + } + + +def test_session_is_evictable_when_idle_dead_and_quiescent(monkeypatch): + monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "") + now = time.time() + assert server._session_is_evictable("s", _idle_evictable_session(now), now) is True + + +def test_session_not_evictable_violating_each_exemption(monkeypatch): + monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "") + now = time.time() + live_transport = type("T", (), {"_closed": False})() + + running = _idle_evictable_session(now) | {"running": True} + assert server._session_is_evictable("s", running, now) is False + + starting = _idle_evictable_session(now) + starting["agent_ready"] = threading.Event() # not set -> still starting + assert server._session_is_evictable("s", starting, now) is False + + on_socket = _idle_evictable_session(now) | {"transport": live_transport} + assert server._session_is_evictable("s", on_socket, now) is False + + recent = _idle_evictable_session(now) | {"last_active": now} + assert server._session_is_evictable("s", recent, now) is False + + young = _idle_evictable_session(now) | {"created_at": now} + assert server._session_is_evictable("s", young, now) is False + + # Pending input request, even when everything else looks idle. + monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "input") + assert server._session_is_evictable("s", _idle_evictable_session(now), now) is False + + +def test_reap_idle_sessions_closes_only_evictable(monkeypatch): + closed = [] + monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "") + monkeypatch.setattr( + server, "_close_session_by_id", + lambda sid, *, end_reason: closed.append((sid, end_reason)), + ) + now = time.time() + server._sessions.clear() + server._sessions["stale"] = _idle_evictable_session(now) + server._sessions["fresh"] = _idle_evictable_session(now) | {"last_active": now} + try: + server._reap_idle_sessions() + assert closed == [("stale", "idle_timeout")] + finally: + server._sessions.clear() diff --git a/tests/test_tui_gateway_ws.py b/tests/test_tui_gateway_ws.py new file mode 100644 index 00000000000..125cb4ec310 --- /dev/null +++ b/tests/test_tui_gateway_ws.py @@ -0,0 +1,73 @@ +import asyncio + +from tui_gateway import server +from tui_gateway import ws as ws_mod + + +def _run_disconnect(monkeypatch, seed): + """Drive handle_ws to its disconnect `finally`, seeding sessions against the + live WSTransport the moment it exists. Returns nothing; inspect _sessions.""" + monkeypatch.setattr(server, "_finalize_session", lambda s, end_reason="tui_close": None) + + created = [] + real_transport = ws_mod.WSTransport + monkeypatch.setattr( + ws_mod, "WSTransport", + lambda ws, loop, **kw: created.append(real_transport(ws, loop, **kw)) or created[-1], + ) + + class FakeWS: + async def accept(self): + pass + + async def send_text(self, line): + pass + + async def receive_text(self): + seed(created[0]) # transport now exists; attach it to sessions + raise ws_mod._WebSocketDisconnect() + + async def close(self): + pass + + asyncio.run(ws_mod.handle_ws(FakeWS())) + + +def test_ws_disconnect_reaps_flagged_session_and_closes_worker(monkeypatch): + closed = [] + + class FakeWorker: + def close(self): + closed.append(True) + + server._sessions.clear() + try: + _run_disconnect( + monkeypatch, + lambda t: server._sessions.update( + flagged={ + "transport": t, + "close_on_disconnect": True, + "slash_worker": FakeWorker(), + "session_key": "k", + } + ), + ) + assert "flagged" not in server._sessions + assert closed == [True] + finally: + server._sessions.clear() + + +def test_ws_disconnect_preserves_and_repoints_reconnectable_session(monkeypatch): + server._sessions.clear() + try: + _run_disconnect( + monkeypatch, + lambda t: server._sessions.update( + plain={"transport": t, "close_on_disconnect": False, "session_key": "k"} + ), + ) + assert server._sessions["plain"]["transport"] is server._stdio_transport + finally: + server._sessions.clear() diff --git a/tests/test_web_server.py b/tests/test_web_server.py new file mode 100644 index 00000000000..2f32925963f --- /dev/null +++ b/tests/test_web_server.py @@ -0,0 +1,16 @@ +import uvicorn + +from hermes_cli import web_server + + +def test_start_server_enables_ws_ping_for_half_open_detection(monkeypatch): + """WS ping must be configured so half-open connections (reverse-proxy 524, + dropped tunnels) raise WebSocketDisconnect into the reaping path (#32377).""" + captured = {} + monkeypatch.setattr(uvicorn, "run", lambda *args, **kwargs: captured.update(kwargs)) + + # Loopback bind => no auth gate, so this reaches uvicorn.run without setup. + web_server.start_server(host="127.0.0.1", port=0, open_browser=False) + + assert captured["ws_ping_interval"] == 20.0 + assert captured["ws_ping_timeout"] == 20.0 diff --git a/tui_gateway/server.py b/tui_gateway/server.py index 962813c47c5..c7bfd8f84e9 100644 --- a/tui_gateway/server.py +++ b/tui_gateway/server.py @@ -130,7 +130,7 @@ _db = None _db_error: str | None = None _stdout_lock = threading.Lock() _cfg_lock = threading.Lock() -_sessions_lock = threading.Lock() +_sessions_lock = threading.RLock() # reentrant: _close_session_by_id may run under callers that already hold it _prompt_lock = threading.Lock() _cfg_cache: dict | None = None _cfg_mtime: float | None = None @@ -243,6 +243,7 @@ class _SlashWorker: if model: argv += ["--model", model] + self._closed = False self.proc = subprocess.Popen( argv, stdin=subprocess.PIPE, @@ -297,15 +298,33 @@ class _SlashWorker: ) def close(self): + if getattr(self, "_closed", False): + return + self._closed = True + proc = self.proc try: - if self.proc.poll() is None: - self.proc.terminate() - self.proc.wait(timeout=1) + if proc.poll() is None: + proc.terminate() + try: + proc.wait(timeout=1) + except Exception: + proc.kill() + try: + proc.wait(timeout=1) # reap the zombie SIGKILL leaves behind + except Exception: + pass except Exception: try: - self.proc.kill() + proc.kill() + proc.wait(timeout=1) except Exception: pass + finally: + for stream in (proc.stdin, proc.stdout, proc.stderr): + try: + stream.close() + except Exception: + pass def _load_busy_input_mode() -> str: @@ -380,7 +399,7 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No pass -def _teardown_session(session: dict | None) -> None: +def _teardown_session(session: dict | None, *, end_reason: str = "tui_close") -> None: """Fully tear down a session: finalize, unregister, close agent + worker. Shared by ``session.close`` and the orphaned-WS-session reaper. The @@ -392,19 +411,50 @@ def _teardown_session(session: dict | None) -> None: """ if not session: return - _finalize_session(session) + _finalize_session(session, end_reason=end_reason) try: from tools.approval import unregister_gateway_notify - unregister_gateway_notify(session["session_key"]) + if key := session.get("session_key"): + unregister_gateway_notify(key) except Exception: pass try: agent = session.get("agent") - if agent and hasattr(agent, "close"): + if agent is not None and hasattr(agent, "close"): agent.close() except Exception: pass + # NOTE: the slash-worker subprocess is already closed inside + # _finalize_session (the single _finalized-guarded chokepoint on main). + # No second worker.close() here — it would be redundant (poll()-guarded, + # harmless, but dead). + + +def _attach_worker(sid: str, session: dict, worker) -> None: + """Store worker on session iff sid still maps to it, else close it — a + concurrent teardown already popped the session and would orphan the + worker. Closes the create/close race at every slash-worker spawn site.""" + with _sessions_lock: + if _sessions.get(sid) is session: + session["slash_worker"] = worker + return + worker.close() + + +def _close_session_by_id(sid: str, *, end_reason: str = "tui_close") -> bool: + """Single idempotent teardown for one session: pop it under the sessions + lock, then finalize, unregister notify, close agent + slash worker via the + shared ``_teardown_session`` path. Returns True iff it closed a live + session. The ``_finalized`` / worker ``_closed`` guards make concurrent or + repeat calls (e.g. session.close racing the WS-orphan reaper) harmless.""" + with _sessions_lock: + session = _sessions.pop(sid, None) + if session is None: + return False + _teardown_session(session, end_reason=end_reason) + return True + def _ws_session_is_orphaned(session: dict | None) -> bool: @@ -432,30 +482,125 @@ def _schedule_ws_orphan_reap(sid: str) -> None: return def _reap() -> None: + # Serialize the orphan re-check against session.resume (which re-binds a + # live transport under _session_resume_lock and would make this session + # non-orphaned). The actual pop + teardown then goes through the shared + # _close_session_by_id funnel so the dict mutation happens under + # _sessions_lock — consistent with every other _sessions mutator + # (#39591: _reap previously popped under _session_resume_lock, giving no + # mutual exclusion against _init_session / _close_session_by_id, which + # guard with _sessions_lock). _sessions_lock is an RLock and the global + # ordering is always resume_lock -> sessions_lock, so nesting is safe. with _session_resume_lock: - session = _sessions.get(sid) - if not _ws_session_is_orphaned(session): + if not _ws_session_is_orphaned(_sessions.get(sid)): return - _sessions.pop(sid, None) - try: - _teardown_session(session) - except Exception: - pass + _close_session_by_id(sid, end_reason="ws_orphan_reap") timer = threading.Timer(_WS_ORPHAN_REAP_GRACE_S, _reap) timer.daemon = True timer.start() +def _close_sessions_for_transport( + transport, *, end_reason: str = "ws_disconnect" +) -> tuple[int, int]: + """On transport disconnect, reap the sessions that opted into + close_on_disconnect (sidecar/dashboard) immediately via the unified + ``_close_session_by_id`` path, and re-point the rest back to stdio so later + emits don't hit a dead socket. + + Non-flagged detached sessions are handed to the grace-windowed WS-orphan + reaper (``_schedule_ws_orphan_reap``): a quick reconnect / session.resume + that re-binds a live transport cancels the reap, otherwise the orphan is + torn down through the same idempotent ``_teardown_session`` path. This is + the single WS-disconnect teardown entry point — there is no second + independent reap loop in ``handle_ws``. + + Returns ``(reaped, detached)`` counts for disconnect-path observability.""" + with _sessions_lock: + owned = [(sid, s) for sid, s in _sessions.items() if s.get("transport") is transport] + reaped = 0 + detached = 0 + for sid, session in owned: + if session.get("close_on_disconnect"): + _close_session_by_id(sid, end_reason=end_reason) + reaped += 1 + else: + # Point detached sessions at the drop sentinel (NOT real stdio) so + # _ws_session_is_orphaned recognizes them and the grace-reap can + # actually fire; a standalone `hermes --tui` keeps real _stdio. + session["transport"] = _detached_ws_transport + detached += 1 + try: + _schedule_ws_orphan_reap(sid) + except Exception: + pass + return reaped, detached + + def _shutdown_sessions() -> None: with _sessions_lock: - snapshot = list(_sessions.values()) - for session in snapshot: - # _finalize_session closes the slash-worker subprocess too. - _finalize_session(session, end_reason="tui_shutdown") + sids = list(_sessions) + for sid in sids: + _close_session_by_id(sid, end_reason="tui_shutdown") + + +# Last-resort net for any disconnect path that slips past the WS finally. TTL is +# hours-scale because last_active freezes during a long turn and on passive +# viewing — running/pending/starting/live-transport are hard exemptions instead. +try: + _SESSION_TTL_S = float(os.environ.get("HERMES_TUI_SESSION_TTL_S") or 6 * 3600) +except (TypeError, ValueError): + _SESSION_TTL_S = float(6 * 3600) +_SESSION_TTL_S = max(0.0, _SESSION_TTL_S) +_REAPER_SCAN_S = 300.0 + + +def _transport_is_dead(transport) -> bool: + # _detached_ws_transport is the post-WS-disconnect drop sentinel; a session + # parked on it has no live client. _stdio_transport is the REAL transport + # for a standalone `hermes --tui`, so it must NOT count as dead here (doing + # so let the idle reaper evict healthy standalone TUI sessions). + if transport is _detached_ws_transport: + return True + return getattr(transport, "_closed", None) is True + + +def _session_is_evictable(sid: str, session: dict, now: float) -> bool: + if session.get("running") or _session_pending_kind(sid): + return False + ready = session.get("agent_ready") + if ready is not None and not ready.is_set(): # still starting + return False + if not _transport_is_dead(session.get("transport")): + return False + last_active = float(session.get("last_active") or 0.0) + created_at = float(session.get("created_at") or 0.0) + return (now - last_active) > _SESSION_TTL_S and (now - created_at) > _SESSION_TTL_S + + +def _reap_idle_sessions() -> None: + now = time.time() + with _sessions_lock: + victims = [sid for sid, s in _sessions.items() if _session_is_evictable(sid, s, now)] + for sid in victims: + _close_session_by_id(sid, end_reason="idle_timeout") + + +def _start_idle_reaper() -> None: + def _loop(): + while True: + time.sleep(_REAPER_SCAN_S) + try: + _reap_idle_sessions() + except Exception: + pass + + threading.Thread(target=_loop, daemon=True).start() atexit.register(_shutdown_sessions) +_start_idle_reaper() # ── Plumbing ────────────────────────────────────────────────────────── @@ -726,7 +871,7 @@ def _start_agent_build(sid: str, session: dict) -> None: try: worker = _SlashWorker(key, getattr(agent, "model", _resolve_model())) - current["slash_worker"] = worker + _attach_worker(sid, current, worker) except Exception: pass @@ -771,21 +916,18 @@ def _start_agent_build(sid: str, session: dict) -> None: finally: if home_token is not None: reset_hermes_home_override(home_token) + # _attach_worker already closed the worker if this session was + # reaped mid-build; only the late notify registration can still + # leak (session.close unregistered before _build registered it). with _sessions_lock: replaced = _sessions.get(sid) is not current - if replaced: - if worker is not None: - try: - worker.close() - except Exception: - pass - if notify_registered: - try: - from tools.approval import unregister_gateway_notify + if replaced and notify_registered: + try: + from tools.approval import unregister_gateway_notify - unregister_gateway_notify(key) - except Exception: - pass + unregister_gateway_notify(key) + except Exception: + pass ready.set() threading.Thread(target=_build, daemon=True).start() @@ -1440,7 +1582,7 @@ def _tool_progress_enabled(sid: str) -> bool: return _session_tool_progress_mode(sid) != "off" -def _restart_slash_worker(session: dict): +def _restart_slash_worker(sid: str, session: dict): worker = session.get("slash_worker") if worker: try: @@ -1448,12 +1590,18 @@ def _restart_slash_worker(session: dict): except Exception: pass try: - session["slash_worker"] = _SlashWorker( + new_worker = _SlashWorker( session["session_key"], getattr(session.get("agent"), "model", _resolve_model()), ) except Exception: session["slash_worker"] = None + return + # Route through the same store-iff-still-mapped guard as the spawn sites: + # the post-turn restart runs as `running` flips false, exactly when a + # close_on_disconnect reap can pop this session — a bare store would orphan + # the fresh worker (it self-heals only on gateway exit via the watchdog). + _attach_worker(sid, session, new_worker) def _persist_model_switch(result) -> None: @@ -1539,7 +1687,7 @@ def _apply_model_switch(sid: str, session: dict, raw_input: str) -> dict: base_url=result.base_url, api_mode=result.api_mode, ) - _restart_slash_worker(session) + _restart_slash_worker(sid, session) _emit("session.info", sid, _session_info(agent, session)) # Record the switch as a PER-SESSION override so a later rebuild of THIS @@ -1692,7 +1840,7 @@ def _sync_session_key_after_compress( session["pending_title"] = None if restart_slash_worker: try: - _restart_slash_worker(session) + _restart_slash_worker(sid, session) except Exception: pass @@ -2597,7 +2745,7 @@ def _reset_session_agent(sid: str, session: dict) -> dict: session["history_version"] = int(session.get("history_version", 0)) + 1 info = _session_info(new_agent, session) _emit("session.info", sid, info) - _restart_slash_worker(session) + _restart_slash_worker(sid, session) return info @@ -2745,8 +2893,10 @@ def _init_session(sid: str, key: str, agent, history: list, cols: int = 80): logger.debug("failed to persist resumed session cwd", exc_info=True) _register_session_cwd(_sessions[sid]) try: - _sessions[sid]["slash_worker"] = _SlashWorker( - key, getattr(agent, "model", _resolve_model()) + _attach_worker( + sid, + _sessions[sid], + _SlashWorker(key, getattr(agent, "model", _resolve_model())), ) except Exception: # Defer hard-failure to slash.exec; chat still works without slash worker. @@ -3141,6 +3291,7 @@ def _(rid, params: dict) -> dict: "agent_error": None, "agent_ready": ready, "attached_images": [], + "close_on_disconnect": is_truthy_value(params.get("close_on_disconnect", False)), "cols": cols, "created_at": now, "edit_snapshots": {}, @@ -3584,10 +3735,26 @@ def _(rid, params: dict) -> dict: except Exception as e: return _err(rid, 5036, f"could not enumerate active sessions: {e}") + # Liveness filter (#38950): a session whose teardown has begun (``_finalized``) + # is dead — its agent/worker are being released and it is no longer + # attachable — but it can briefly remain in ``_sessions`` until the reaper + # pops it (the WS grace-reap and idle reaper both set ``_finalized`` inside + # ``_teardown_session`` before the pop). Counting these inflated the footer's + # "N sessions" count, which only ever went up until a gateway restart. Drop + # them here so the count reflects genuinely attachable sessions. We do NOT + # filter on ``transport is _detached_ws_transport`` (the WS-detached drop + # sentinel): a detached session is still attachable via a quick reconnect / + # session.resume until the grace-reap finalizes it, and a standalone + # ``hermes --tui`` session legitimately rides the real stdio transport and + # must stay visible. # Keep the natural creation/insertion order from ``_sessions``. The # frontend marks the focused session with ``current``; it should not jump to # the top just because the user switched to it. - rows = [_session_live_item(sid, session, current) for sid, session in snapshot] + rows = [ + _session_live_item(sid, session, current) + for sid, session in snapshot + if not session.get("_finalized") + ] return _ok(rid, {"sessions": rows}) @@ -4002,17 +4169,13 @@ def _(rid, params: dict) -> dict: @method("session.close") def _(rid, params: dict) -> dict: sid = params.get("session_id", "") - with _sessions_lock: - current = _sessions.get(sid) - if not current: - return _ok(rid, {"closed": False}) + # Serialize against the WS-orphan reaper (which also pops under + # _session_resume_lock) so a disconnect-reap and an explicit close can't + # both tear the same session down. _close_session_by_id is the single + # idempotent teardown path (pop + _teardown_session) and returns False + # when the session is already gone. with _session_resume_lock: - with _sessions_lock: - session = _sessions.pop(sid, None) - if not session: - return _ok(rid, {"closed": False}) - _teardown_session(session) - return _ok(rid, {"closed": True}) + return _ok(rid, {"closed": _close_session_by_id(sid, end_reason="tui_close")}) @method("session.branch") @@ -7793,7 +7956,7 @@ def _(rid, params: dict) -> dict: session["session_key"], getattr(session.get("agent"), "model", _resolve_model()), ) - session["slash_worker"] = worker + _attach_worker(params.get("session_id", ""), session, worker) except Exception as e: return _err(rid, 5030, f"slash worker start failed: {e}") diff --git a/tui_gateway/slash_worker.py b/tui_gateway/slash_worker.py index 49436d57449..fce8ec3e26b 100644 --- a/tui_gateway/slash_worker.py +++ b/tui_gateway/slash_worker.py @@ -19,8 +19,22 @@ from cli import HermesCLI from rich.console import Console # Env-overridable so the integration test can drive sub-second timing. -_WATCHDOG_POLL_S = float(os.environ.get("HERMES_SLASH_WATCHDOG_POLL_S", 2.0)) -_ORPHAN_GRACE_S = float(os.environ.get("HERMES_SLASH_WATCHDOG_GRACE_S", 5.0)) +def _env_float(name: str, default: float) -> float: + """Parse a float env knob, falling back to ``default`` on absent/malformed + values. A bare ``float(os.environ.get(...))`` would raise ValueError at + import time on a typo (e.g. ``HERMES_SLASH_WATCHDOG_POLL_S=2s``) and kill + the worker before it can serve a single command.""" + raw = os.environ.get(name) + if not raw: + return default + try: + return float(raw) + except (TypeError, ValueError): + return default + + +_WATCHDOG_POLL_S = max(0.05, _env_float("HERMES_SLASH_WATCHDOG_POLL_S", 2.0)) +_ORPHAN_GRACE_S = max(0.0, _env_float("HERMES_SLASH_WATCHDOG_GRACE_S", 5.0)) _in_flight = threading.Event() # set while a command is executing diff --git a/tui_gateway/ws.py b/tui_gateway/ws.py index 2e88f457802..738ed9b1b80 100644 --- a/tui_gateway/ws.py +++ b/tui_gateway/ws.py @@ -283,41 +283,44 @@ async def handle_ws(ws: Any) -> None: ) break finally: + reaped_sessions = 0 detached_sessions = 0 - reaped_scheduled = 0 if transport is not None: transport.close() - # Detach the transport from any sessions it owned so later emits - # do not crash into a closed socket or fall through to desktop - # stdout logs. Schedule a grace-delayed reap; a quick reconnect / - # session.resume re-binds a live transport and cancels it (see - # _ws_session_is_orphaned). - for _sid, sess in list(server._sessions.items()): - if sess.get("transport") is transport: - sess["transport"] = server._detached_ws_transport - detached_sessions += 1 - try: - server._schedule_ws_orphan_reap(_sid) - reaped_scheduled += 1 - except Exception: - _log.exception( - "ws orphan-reap schedule failed peer=%s sid=%s", - peer, - _sid, - ) + # Reap sessions this transport owned (close_on_disconnect sidecar + # sessions) or detach the rest to the drop sentinel so later emits + # don't crash into a closed socket or fall through to desktop stdout + # logs. Detached sessions are handed to the grace-windowed WS-orphan + # reaper inside _close_sessions_for_transport (a quick reconnect / + # session.resume cancels it). This is the single WS-disconnect + # teardown path. + # + # Offloaded: _close_session_by_id does a blocking worker.close() + # (terminate + waits) plus a synchronous DB write — inline that + # would freeze the uvicorn event loop for every other live + # connection. + try: + reaped_sessions, detached_sessions = await asyncio.to_thread( + server._close_sessions_for_transport, + transport, + end_reason="ws_disconnect", + ) + except Exception: + _log.exception("ws transport teardown failed peer=%s", peer) try: await ws.close() except Exception as exc: _log.debug("ws close failed peer=%s error=%s", peer, exc) _log.info( "ws closed peer=%s reason=%s messages=%d parse_errors=%d " - "dispatch_crashes=%d send_failures=%d detached_sessions=%d", + "dispatch_crashes=%d send_failures=%d reaped_sessions=%d detached_sessions=%d", peer, disconnect_reason, messages, parse_errors, dispatch_crashes, send_failures, + reaped_sessions, detached_sessions, ) diff --git a/web/src/components/ChatSidebar.tsx b/web/src/components/ChatSidebar.tsx index ec8ffa442e8..e24ddfa5b10 100644 --- a/web/src/components/ChatSidebar.tsx +++ b/web/src/components/ChatSidebar.tsx @@ -120,7 +120,11 @@ export function ChatSidebar({ channel, className }: ChatSidebarProps) { if (cancelled) { return; } - return gw.request<{ session_id: string }>("session.create", {}); + // close_on_disconnect: the gateway reaps this sidecar session (and its + // slash_worker subprocess) when the WS drops, instead of leaking it. + return gw.request<{ session_id: string }>("session.create", { + close_on_disconnect: true, + }); }) .then((created) => { if (cancelled || !created?.session_id) {