fix(tui-gateway): reap leaked slash_worker sessions on disconnect + active_list liveness (re-scoped onto current main)

Salvaged from #35626 (banditburai) and re-scoped after maintainers landed the
parent-death watchdog (slash_worker.py) and PTY process-group teardown
(pty_bridge.py) directly on main. Those pieces are intentionally NOT included
here — this carries only what is still missing:

- C1 disconnect reap: ws.py's `finally` only re-pointed the dead transport at
  stdio. `_close_sessions_for_transport` now reaps `close_on_disconnect`
  sessions and schedules the grace-reap for the rest, offloaded via
  `asyncio.to_thread` so the blocking worker.close() + DB write never stalls
  the uvicorn loop.
- C2 create/close orphan race: `_attach_worker` stores the worker iff
  `_sessions.get(sid) is session` under the lock (else closes it), applied at
  every spawn site incl. the post-turn `_restart_slash_worker`.
- Single idempotent teardown funnel: session.close, WS disconnect, the
  generous-TTL idle reaper, shutdown, and the WS grace-reap all reach
  `_close_session_by_id` → `_teardown_session`; `_finalized`/`_closed` flags
  make concurrent/double teardown a no-op. `_sessions_lock` upgraded to RLock.
- uvicorn `ws_ping_interval/timeout=20s` so a half-open socket (reverse-proxy
  524) becomes a `WebSocketDisconnect` and the C1 path runs.

Plus two review-driven hardening fixes (mine):

- `session.active_list` now skips `_finalized` sessions so the footer
  "N sessions" count reflects attachable sessions instead of only ever
  growing until restart (#38950). Keys on `_finalized` only, NOT the stdio
  sentinel, so a standalone `hermes --tui` session stays visible.
- `_schedule_ws_orphan_reap._reap` pops via `_close_session_by_id`
  (under `_sessions_lock`) instead of `_sessions.pop` under the unrelated
  `_session_resume_lock` (#39591); the resume_lock now only guards the orphan
  re-check against `session.resume`.
- Float env knobs (`HERMES_SLASH_WATCHDOG_*`, `HERMES_TUI_SESSION_TTL_S`)
  parse with a fallback helper so a malformed value can't crash the worker at
  import.

Fixes #32377
Fixes #38950
Addresses #22855

Co-authored-by: banditburai <123342691+banditburai@users.noreply.github.com>
Co-authored-by: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com>
This commit is contained in:
firefly 2026-06-08 19:50:59 +05:30 committed by teknium1
parent 9c9d9113a8
commit ae94ed1728
No known key found for this signature in database
9 changed files with 689 additions and 80 deletions

View file

@ -10142,4 +10142,9 @@ def start_server(
uvicorn.run(
app, host=host, port=port, log_level="warning",
proxy_headers=bool(app.state.auth_required),
# Detect half-open WS connections (reverse-proxy 524, dropped tunnels)
# within ~20-40s so WebSocketDisconnect fires the disconnect→reap path.
# 20s stays under Cloudflare Tunnel's idle timeout, keeping it warm.
ws_ping_interval=20.0,
ws_ping_timeout=20.0,
)

View file

@ -0,0 +1,13 @@
import re
from pathlib import Path
CHAT_SIDEBAR = Path(__file__).resolve().parent.parent / "web/src/components/ChatSidebar.tsx"
def test_sidecar_session_create_requests_close_on_disconnect():
"""The sidecar must opt its session into close_on_disconnect so the gateway
reaps the slash_worker on WS disconnect (the #21370/#21467 leak)."""
source = CHAT_SIDEBAR.read_text(encoding="utf-8")
call = re.search(r'"session\.create",\s*\{(.*?)\}', source, re.DOTALL)
assert call, "sidecar session.create call not found"
assert re.search(r"close_on_disconnect:\s*true", call.group(1))

View file

@ -1,5 +1,6 @@
import json
import os
import subprocess
import sys
import threading
import time
@ -2256,7 +2257,7 @@ def test_config_set_model_global_persists(monkeypatch):
server._sessions["sid"] = _session(agent=_Agent())
monkeypatch.setattr("hermes_cli.model_switch.switch_model", _switch_model)
monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None)
monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
monkeypatch.setattr("hermes_cli.config.save_config", lambda cfg: saved.update(cfg))
@ -2315,7 +2316,7 @@ def test_config_set_model_does_not_leak_inference_provider_env(monkeypatch):
monkeypatch.setattr(
"hermes_cli.model_switch.switch_model", lambda **_kwargs: result
)
monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None)
monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
try:
@ -2376,7 +2377,7 @@ def test_config_set_model_records_per_session_override_not_env(monkeypatch):
monkeypatch.setattr(
"hermes_cli.model_switch.switch_model", lambda **_kwargs: result
)
monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None)
monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
try:
@ -2431,7 +2432,7 @@ def test_config_set_model_switches_agent_without_touching_env(monkeypatch):
monkeypatch.setenv("HERMES_TUI_PROVIDER", "openai-codex")
monkeypatch.delenv("HERMES_MODEL", raising=False)
monkeypatch.delenv("HERMES_INFERENCE_MODEL", raising=False)
monkeypatch.setattr(server, "_restart_slash_worker", lambda session: None)
monkeypatch.setattr(server, "_restart_slash_worker", lambda sid, session: None)
monkeypatch.setattr(server, "_emit", lambda *args, **kwargs: None)
def fake_switch_model(**kwargs):
@ -2583,7 +2584,7 @@ def test_session_compress_syncs_session_key_after_rotation(monkeypatch):
monkeypatch.setattr(server, "_session_info", lambda _agent, *a: {"model": "x"})
restart_calls = []
monkeypatch.setattr(
server, "_restart_slash_worker", lambda s: restart_calls.append(s)
server, "_restart_slash_worker", lambda sid, s: restart_calls.append(s)
)
try:
@ -4518,6 +4519,56 @@ def test_session_active_list_reports_live_sessions(monkeypatch):
assert rows["sid-b"]["preview"] == "writing code"
def test_session_active_list_excludes_finalized_sessions(monkeypatch):
"""#38950: a finalized-but-not-yet-popped session must not inflate the count.
The WS grace-reap and idle reaper set ``_finalized`` inside
``_teardown_session`` before popping the entry from ``_sessions``. During
that window ``session.active_list`` would otherwise still report the dead
session, which is exactly the footer "N sessions" count that only ever grew
until a gateway restart. A live session on the real stdio transport (the
standalone ``hermes --tui`` case) must still be reported.
"""
class _DB:
def get_session_title(self, key):
return {"key-live": "Live", "key-dead": "Dead"}.get(key, "")
previous_sessions = dict(server._sessions)
server._sessions.clear()
monkeypatch.setattr(server, "_get_db", lambda: _DB())
server._sessions["sid-live"] = _session(
agent=types.SimpleNamespace(model="model-live"),
history=[{"role": "user", "content": "still here"}],
session_key="key-live",
created_at=10.0,
last_active=20.0,
)
dead = _session(
agent=types.SimpleNamespace(model="model-dead"),
history=[{"role": "user", "content": "gone"}],
session_key="key-dead",
created_at=11.0,
last_active=21.0,
)
dead["_finalized"] = True
server._sessions["sid-dead"] = dead
try:
resp = server.handle_request(
{
"id": "1",
"method": "session.active_list",
"params": {},
}
)
finally:
server._sessions.clear()
server._sessions.update(previous_sessions)
session_rows = resp["result"]["sessions"]
assert [row["id"] for row in session_rows] == ["sid-live"]
def test_session_activate_returns_inflight_stream_before_completion(monkeypatch):
"""Switching into a still-running live session must hydrate partial output.
@ -6096,3 +6147,270 @@ def test_sniff_image_ext_magic_and_filename():
assert server._sniff_image_ext(b"unknown") == ".png" # fallback
# filename hint wins over magic bytes
assert server._sniff_image_ext(b"\x89PNG", "photo.jpeg") == ".jpeg"
def test_slash_worker_close_reaps_zombie_and_closes_fds():
"""A hung worker is SIGKILLed, the zombie reaped, all pipes closed — once."""
calls = {k: 0 for k in ("terminate", "kill", "wait", "stdin", "stdout", "stderr")}
class FakeStream:
def __init__(self, name):
self.name = name
def close(self):
calls[self.name] += 1
class FakeProc:
stdin, stdout, stderr = (FakeStream(n) for n in ("stdin", "stdout", "stderr"))
def poll(self):
return None # always alive -> forces terminate then kill
def terminate(self):
calls["terminate"] += 1
def kill(self):
calls["kill"] += 1
def wait(self, timeout=None):
calls["wait"] += 1
raise subprocess.TimeoutExpired(cmd="x", timeout=timeout)
worker = object.__new__(server._SlashWorker)
worker.proc = FakeProc()
worker.close()
worker.close() # idempotent
assert calls["terminate"] == 1
assert calls["kill"] == 1
assert calls["wait"] >= 2 # reaped after both terminate and kill
assert calls["stdin"] == calls["stdout"] == calls["stderr"] == 1
def test_close_session_by_id_is_idempotent_and_full(monkeypatch):
"""One call tears the session down fully; a second is a no-op."""
calls = {"worker": 0, "agent": 0, "unreg": 0, "finalize": 0}
class W:
def close(self):
calls["worker"] += 1
class A:
def close(self):
calls["agent"] += 1
monkeypatch.setattr(
server, "_finalize_session",
lambda s, end_reason="tui_close": calls.__setitem__("finalize", calls["finalize"] + 1),
)
monkeypatch.setattr(
"tools.approval.unregister_gateway_notify",
lambda key: calls.__setitem__("unreg", calls["unreg"] + 1), raising=False,
)
server._sessions["sid-1"] = {"session_key": "k1", "agent": A(), "slash_worker": W()}
assert server._close_session_by_id("sid-1", end_reason="ws_disconnect") is True
assert server._close_session_by_id("sid-1", end_reason="ws_disconnect") is False
assert calls == {"worker": 1, "agent": 1, "unreg": 1, "finalize": 1}
assert "sid-1" not in server._sessions
def test_attach_worker_closes_orphan_when_session_already_torn_down():
"""A worker built after its session was reaped must be closed, not orphaned."""
closed = []
class W:
def close(self):
closed.append(True)
server._sessions.pop("gone", None)
detached = {"session_key": "k"} # not in _sessions -> already torn down
server._attach_worker("gone", detached, W())
assert closed == [True]
assert "slash_worker" not in detached
assert "gone" not in server._sessions
def test_attach_worker_stores_worker_on_live_session():
class W:
def close(self):
raise AssertionError("must not close a worker for a live session")
live = {"session_key": "k"}
server._sessions["live"] = live
worker = W()
try:
server._attach_worker("live", live, worker)
assert live["slash_worker"] is worker
finally:
server._sessions.pop("live", None)
def test_restart_slash_worker_closes_orphan_when_session_reaped(monkeypatch):
"""Post-turn restart of a session reaped mid-flight (e.g. close_on_disconnect
fired while `running` flipped false) must close the fresh worker, not orphan it."""
closed = []
class _FakeWorker:
def __init__(self, *a, **k):
pass
def close(self):
closed.append(True)
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
server._sessions.pop("reaped", None)
reaped = {"session_key": "k"} # not in _sessions -> torn down concurrently
server._restart_slash_worker("reaped", reaped)
assert closed == [True]
assert reaped.get("slash_worker") is None
assert "reaped" not in server._sessions
def test_restart_slash_worker_stores_on_live_session(monkeypatch):
class _FakeWorker:
def __init__(self, *a, **k):
pass
def close(self):
pass
monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
live = {"session_key": "k", "slash_worker": None}
server._sessions["live-restart"] = live
try:
server._restart_slash_worker("live-restart", live)
assert isinstance(live["slash_worker"], _FakeWorker)
finally:
server._sessions.pop("live-restart", None)
def test_session_close_rpc_delegates_to_close_session_by_id(monkeypatch):
seen = []
monkeypatch.setattr(
server, "_close_session_by_id",
lambda sid, *, end_reason: bool(seen.append((sid, end_reason))) or True,
)
resp = server.handle_request(
{"id": "1", "method": "session.close", "params": {"session_id": "s9"}}
)
assert resp["result"] == {"closed": True}
assert seen == [("s9", "tui_close")]
def test_close_sessions_for_transport_closes_flagged_repoints_rest(monkeypatch):
seen = []
monkeypatch.setattr(
server, "_close_session_by_id",
lambda sid, *, end_reason: bool(seen.append((sid, end_reason))) or True,
)
transport = object() # the disconnecting transport
server._sessions.clear()
server._sessions["a"] = {"transport": transport, "close_on_disconnect": True}
server._sessions["b"] = {"transport": transport, "close_on_disconnect": False}
try:
server._close_sessions_for_transport(transport, end_reason="ws_disconnect")
assert seen == [("a", "ws_disconnect")] # only the flagged one closed
assert server._sessions["b"]["transport"] is server._stdio_transport # re-pointed
finally:
server._sessions.clear()
def test_session_create_records_close_on_disconnect_flag(monkeypatch):
monkeypatch.setattr(server, "_start_agent_build", lambda sid, session: None)
server._sessions.clear()
try:
on = server.handle_request(
{"id": "1", "method": "session.create", "params": {"close_on_disconnect": True}}
)["result"]["session_id"]
off = server.handle_request(
{"id": "2", "method": "session.create", "params": {}}
)["result"]["session_id"]
assert server._sessions[on]["close_on_disconnect"]
assert not server._sessions[off]["close_on_disconnect"]
finally:
server._sessions.clear()
def test_shutdown_sessions_closes_every_session_via_helper(monkeypatch):
seen = []
monkeypatch.setattr(
server, "_close_session_by_id",
lambda sid, *, end_reason: seen.append((sid, end_reason)),
)
server._sessions.clear()
server._sessions["a"] = {}
server._sessions["b"] = {}
try:
server._shutdown_sessions()
assert sorted(sid for sid, _ in seen) == ["a", "b"]
assert {reason for _, reason in seen} == {"tui_shutdown"}
finally:
server._sessions.clear()
def _idle_evictable_session(now):
"""A session that satisfies every eviction precondition."""
ready = threading.Event()
ready.set()
old = now - 10 * 3600 # well past the 6h TTL
return {
"running": False,
"agent_ready": ready,
"transport": server._stdio_transport, # dead/detached
"last_active": old,
"created_at": old,
}
def test_session_is_evictable_when_idle_dead_and_quiescent(monkeypatch):
monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "")
now = time.time()
assert server._session_is_evictable("s", _idle_evictable_session(now), now) is True
def test_session_not_evictable_violating_each_exemption(monkeypatch):
monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "")
now = time.time()
live_transport = type("T", (), {"_closed": False})()
running = _idle_evictable_session(now) | {"running": True}
assert server._session_is_evictable("s", running, now) is False
starting = _idle_evictable_session(now)
starting["agent_ready"] = threading.Event() # not set -> still starting
assert server._session_is_evictable("s", starting, now) is False
on_socket = _idle_evictable_session(now) | {"transport": live_transport}
assert server._session_is_evictable("s", on_socket, now) is False
recent = _idle_evictable_session(now) | {"last_active": now}
assert server._session_is_evictable("s", recent, now) is False
young = _idle_evictable_session(now) | {"created_at": now}
assert server._session_is_evictable("s", young, now) is False
# Pending input request, even when everything else looks idle.
monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "input")
assert server._session_is_evictable("s", _idle_evictable_session(now), now) is False
def test_reap_idle_sessions_closes_only_evictable(monkeypatch):
closed = []
monkeypatch.setattr(server, "_session_pending_kind", lambda sid: "")
monkeypatch.setattr(
server, "_close_session_by_id",
lambda sid, *, end_reason: closed.append((sid, end_reason)),
)
now = time.time()
server._sessions.clear()
server._sessions["stale"] = _idle_evictable_session(now)
server._sessions["fresh"] = _idle_evictable_session(now) | {"last_active": now}
try:
server._reap_idle_sessions()
assert closed == [("stale", "idle_timeout")]
finally:
server._sessions.clear()

View file

@ -0,0 +1,73 @@
import asyncio
from tui_gateway import server
from tui_gateway import ws as ws_mod
def _run_disconnect(monkeypatch, seed):
"""Drive handle_ws to its disconnect `finally`, seeding sessions against the
live WSTransport the moment it exists. Returns nothing; inspect _sessions."""
monkeypatch.setattr(server, "_finalize_session", lambda s, end_reason="tui_close": None)
created = []
real_transport = ws_mod.WSTransport
monkeypatch.setattr(
ws_mod, "WSTransport",
lambda ws, loop, **kw: created.append(real_transport(ws, loop, **kw)) or created[-1],
)
class FakeWS:
async def accept(self):
pass
async def send_text(self, line):
pass
async def receive_text(self):
seed(created[0]) # transport now exists; attach it to sessions
raise ws_mod._WebSocketDisconnect()
async def close(self):
pass
asyncio.run(ws_mod.handle_ws(FakeWS()))
def test_ws_disconnect_reaps_flagged_session_and_closes_worker(monkeypatch):
closed = []
class FakeWorker:
def close(self):
closed.append(True)
server._sessions.clear()
try:
_run_disconnect(
monkeypatch,
lambda t: server._sessions.update(
flagged={
"transport": t,
"close_on_disconnect": True,
"slash_worker": FakeWorker(),
"session_key": "k",
}
),
)
assert "flagged" not in server._sessions
assert closed == [True]
finally:
server._sessions.clear()
def test_ws_disconnect_preserves_and_repoints_reconnectable_session(monkeypatch):
server._sessions.clear()
try:
_run_disconnect(
monkeypatch,
lambda t: server._sessions.update(
plain={"transport": t, "close_on_disconnect": False, "session_key": "k"}
),
)
assert server._sessions["plain"]["transport"] is server._stdio_transport
finally:
server._sessions.clear()

16
tests/test_web_server.py Normal file
View file

@ -0,0 +1,16 @@
import uvicorn
from hermes_cli import web_server
def test_start_server_enables_ws_ping_for_half_open_detection(monkeypatch):
"""WS ping must be configured so half-open connections (reverse-proxy 524,
dropped tunnels) raise WebSocketDisconnect into the reaping path (#32377)."""
captured = {}
monkeypatch.setattr(uvicorn, "run", lambda *args, **kwargs: captured.update(kwargs))
# Loopback bind => no auth gate, so this reaches uvicorn.run without setup.
web_server.start_server(host="127.0.0.1", port=0, open_browser=False)
assert captured["ws_ping_interval"] == 20.0
assert captured["ws_ping_timeout"] == 20.0

View file

@ -130,7 +130,7 @@ _db = None
_db_error: str | None = None
_stdout_lock = threading.Lock()
_cfg_lock = threading.Lock()
_sessions_lock = threading.Lock()
_sessions_lock = threading.RLock() # reentrant: _close_session_by_id may run under callers that already hold it
_prompt_lock = threading.Lock()
_cfg_cache: dict | None = None
_cfg_mtime: float | None = None
@ -243,6 +243,7 @@ class _SlashWorker:
if model:
argv += ["--model", model]
self._closed = False
self.proc = subprocess.Popen(
argv,
stdin=subprocess.PIPE,
@ -297,15 +298,33 @@ class _SlashWorker:
)
def close(self):
if getattr(self, "_closed", False):
return
self._closed = True
proc = self.proc
try:
if self.proc.poll() is None:
self.proc.terminate()
self.proc.wait(timeout=1)
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=1)
except Exception:
proc.kill()
try:
proc.wait(timeout=1) # reap the zombie SIGKILL leaves behind
except Exception:
pass
except Exception:
try:
self.proc.kill()
proc.kill()
proc.wait(timeout=1)
except Exception:
pass
finally:
for stream in (proc.stdin, proc.stdout, proc.stderr):
try:
stream.close()
except Exception:
pass
def _load_busy_input_mode() -> str:
@ -380,7 +399,7 @@ def _finalize_session(session: dict | None, end_reason: str = "tui_close") -> No
pass
def _teardown_session(session: dict | None) -> None:
def _teardown_session(session: dict | None, *, end_reason: str = "tui_close") -> None:
"""Fully tear down a session: finalize, unregister, close agent + worker.
Shared by ``session.close`` and the orphaned-WS-session reaper. The
@ -392,19 +411,50 @@ def _teardown_session(session: dict | None) -> None:
"""
if not session:
return
_finalize_session(session)
_finalize_session(session, end_reason=end_reason)
try:
from tools.approval import unregister_gateway_notify
unregister_gateway_notify(session["session_key"])
if key := session.get("session_key"):
unregister_gateway_notify(key)
except Exception:
pass
try:
agent = session.get("agent")
if agent and hasattr(agent, "close"):
if agent is not None and hasattr(agent, "close"):
agent.close()
except Exception:
pass
# NOTE: the slash-worker subprocess is already closed inside
# _finalize_session (the single _finalized-guarded chokepoint on main).
# No second worker.close() here — it would be redundant (poll()-guarded,
# harmless, but dead).
def _attach_worker(sid: str, session: dict, worker) -> None:
"""Store worker on session iff sid still maps to it, else close it — a
concurrent teardown already popped the session and would orphan the
worker. Closes the create/close race at every slash-worker spawn site."""
with _sessions_lock:
if _sessions.get(sid) is session:
session["slash_worker"] = worker
return
worker.close()
def _close_session_by_id(sid: str, *, end_reason: str = "tui_close") -> bool:
"""Single idempotent teardown for one session: pop it under the sessions
lock, then finalize, unregister notify, close agent + slash worker via the
shared ``_teardown_session`` path. Returns True iff it closed a live
session. The ``_finalized`` / worker ``_closed`` guards make concurrent or
repeat calls (e.g. session.close racing the WS-orphan reaper) harmless."""
with _sessions_lock:
session = _sessions.pop(sid, None)
if session is None:
return False
_teardown_session(session, end_reason=end_reason)
return True
def _ws_session_is_orphaned(session: dict | None) -> bool:
@ -432,30 +482,125 @@ def _schedule_ws_orphan_reap(sid: str) -> None:
return
def _reap() -> None:
# Serialize the orphan re-check against session.resume (which re-binds a
# live transport under _session_resume_lock and would make this session
# non-orphaned). The actual pop + teardown then goes through the shared
# _close_session_by_id funnel so the dict mutation happens under
# _sessions_lock — consistent with every other _sessions mutator
# (#39591: _reap previously popped under _session_resume_lock, giving no
# mutual exclusion against _init_session / _close_session_by_id, which
# guard with _sessions_lock). _sessions_lock is an RLock and the global
# ordering is always resume_lock -> sessions_lock, so nesting is safe.
with _session_resume_lock:
session = _sessions.get(sid)
if not _ws_session_is_orphaned(session):
if not _ws_session_is_orphaned(_sessions.get(sid)):
return
_sessions.pop(sid, None)
try:
_teardown_session(session)
except Exception:
pass
_close_session_by_id(sid, end_reason="ws_orphan_reap")
timer = threading.Timer(_WS_ORPHAN_REAP_GRACE_S, _reap)
timer.daemon = True
timer.start()
def _close_sessions_for_transport(
transport, *, end_reason: str = "ws_disconnect"
) -> tuple[int, int]:
"""On transport disconnect, reap the sessions that opted into
close_on_disconnect (sidecar/dashboard) immediately via the unified
``_close_session_by_id`` path, and re-point the rest back to stdio so later
emits don't hit a dead socket.
Non-flagged detached sessions are handed to the grace-windowed WS-orphan
reaper (``_schedule_ws_orphan_reap``): a quick reconnect / session.resume
that re-binds a live transport cancels the reap, otherwise the orphan is
torn down through the same idempotent ``_teardown_session`` path. This is
the single WS-disconnect teardown entry point there is no second
independent reap loop in ``handle_ws``.
Returns ``(reaped, detached)`` counts for disconnect-path observability."""
with _sessions_lock:
owned = [(sid, s) for sid, s in _sessions.items() if s.get("transport") is transport]
reaped = 0
detached = 0
for sid, session in owned:
if session.get("close_on_disconnect"):
_close_session_by_id(sid, end_reason=end_reason)
reaped += 1
else:
# Point detached sessions at the drop sentinel (NOT real stdio) so
# _ws_session_is_orphaned recognizes them and the grace-reap can
# actually fire; a standalone `hermes --tui` keeps real _stdio.
session["transport"] = _detached_ws_transport
detached += 1
try:
_schedule_ws_orphan_reap(sid)
except Exception:
pass
return reaped, detached
def _shutdown_sessions() -> None:
with _sessions_lock:
snapshot = list(_sessions.values())
for session in snapshot:
# _finalize_session closes the slash-worker subprocess too.
_finalize_session(session, end_reason="tui_shutdown")
sids = list(_sessions)
for sid in sids:
_close_session_by_id(sid, end_reason="tui_shutdown")
# Last-resort net for any disconnect path that slips past the WS finally. TTL is
# hours-scale because last_active freezes during a long turn and on passive
# viewing — running/pending/starting/live-transport are hard exemptions instead.
try:
_SESSION_TTL_S = float(os.environ.get("HERMES_TUI_SESSION_TTL_S") or 6 * 3600)
except (TypeError, ValueError):
_SESSION_TTL_S = float(6 * 3600)
_SESSION_TTL_S = max(0.0, _SESSION_TTL_S)
_REAPER_SCAN_S = 300.0
def _transport_is_dead(transport) -> bool:
# _detached_ws_transport is the post-WS-disconnect drop sentinel; a session
# parked on it has no live client. _stdio_transport is the REAL transport
# for a standalone `hermes --tui`, so it must NOT count as dead here (doing
# so let the idle reaper evict healthy standalone TUI sessions).
if transport is _detached_ws_transport:
return True
return getattr(transport, "_closed", None) is True
def _session_is_evictable(sid: str, session: dict, now: float) -> bool:
if session.get("running") or _session_pending_kind(sid):
return False
ready = session.get("agent_ready")
if ready is not None and not ready.is_set(): # still starting
return False
if not _transport_is_dead(session.get("transport")):
return False
last_active = float(session.get("last_active") or 0.0)
created_at = float(session.get("created_at") or 0.0)
return (now - last_active) > _SESSION_TTL_S and (now - created_at) > _SESSION_TTL_S
def _reap_idle_sessions() -> None:
now = time.time()
with _sessions_lock:
victims = [sid for sid, s in _sessions.items() if _session_is_evictable(sid, s, now)]
for sid in victims:
_close_session_by_id(sid, end_reason="idle_timeout")
def _start_idle_reaper() -> None:
def _loop():
while True:
time.sleep(_REAPER_SCAN_S)
try:
_reap_idle_sessions()
except Exception:
pass
threading.Thread(target=_loop, daemon=True).start()
atexit.register(_shutdown_sessions)
_start_idle_reaper()
# ── Plumbing ──────────────────────────────────────────────────────────
@ -726,7 +871,7 @@ def _start_agent_build(sid: str, session: dict) -> None:
try:
worker = _SlashWorker(key, getattr(agent, "model", _resolve_model()))
current["slash_worker"] = worker
_attach_worker(sid, current, worker)
except Exception:
pass
@ -771,21 +916,18 @@ def _start_agent_build(sid: str, session: dict) -> None:
finally:
if home_token is not None:
reset_hermes_home_override(home_token)
# _attach_worker already closed the worker if this session was
# reaped mid-build; only the late notify registration can still
# leak (session.close unregistered before _build registered it).
with _sessions_lock:
replaced = _sessions.get(sid) is not current
if replaced:
if worker is not None:
try:
worker.close()
except Exception:
pass
if notify_registered:
try:
from tools.approval import unregister_gateway_notify
if replaced and notify_registered:
try:
from tools.approval import unregister_gateway_notify
unregister_gateway_notify(key)
except Exception:
pass
unregister_gateway_notify(key)
except Exception:
pass
ready.set()
threading.Thread(target=_build, daemon=True).start()
@ -1440,7 +1582,7 @@ def _tool_progress_enabled(sid: str) -> bool:
return _session_tool_progress_mode(sid) != "off"
def _restart_slash_worker(session: dict):
def _restart_slash_worker(sid: str, session: dict):
worker = session.get("slash_worker")
if worker:
try:
@ -1448,12 +1590,18 @@ def _restart_slash_worker(session: dict):
except Exception:
pass
try:
session["slash_worker"] = _SlashWorker(
new_worker = _SlashWorker(
session["session_key"],
getattr(session.get("agent"), "model", _resolve_model()),
)
except Exception:
session["slash_worker"] = None
return
# Route through the same store-iff-still-mapped guard as the spawn sites:
# the post-turn restart runs as `running` flips false, exactly when a
# close_on_disconnect reap can pop this session — a bare store would orphan
# the fresh worker (it self-heals only on gateway exit via the watchdog).
_attach_worker(sid, session, new_worker)
def _persist_model_switch(result) -> None:
@ -1539,7 +1687,7 @@ def _apply_model_switch(sid: str, session: dict, raw_input: str) -> dict:
base_url=result.base_url,
api_mode=result.api_mode,
)
_restart_slash_worker(session)
_restart_slash_worker(sid, session)
_emit("session.info", sid, _session_info(agent, session))
# Record the switch as a PER-SESSION override so a later rebuild of THIS
@ -1692,7 +1840,7 @@ def _sync_session_key_after_compress(
session["pending_title"] = None
if restart_slash_worker:
try:
_restart_slash_worker(session)
_restart_slash_worker(sid, session)
except Exception:
pass
@ -2597,7 +2745,7 @@ def _reset_session_agent(sid: str, session: dict) -> dict:
session["history_version"] = int(session.get("history_version", 0)) + 1
info = _session_info(new_agent, session)
_emit("session.info", sid, info)
_restart_slash_worker(session)
_restart_slash_worker(sid, session)
return info
@ -2745,8 +2893,10 @@ def _init_session(sid: str, key: str, agent, history: list, cols: int = 80):
logger.debug("failed to persist resumed session cwd", exc_info=True)
_register_session_cwd(_sessions[sid])
try:
_sessions[sid]["slash_worker"] = _SlashWorker(
key, getattr(agent, "model", _resolve_model())
_attach_worker(
sid,
_sessions[sid],
_SlashWorker(key, getattr(agent, "model", _resolve_model())),
)
except Exception:
# Defer hard-failure to slash.exec; chat still works without slash worker.
@ -3141,6 +3291,7 @@ def _(rid, params: dict) -> dict:
"agent_error": None,
"agent_ready": ready,
"attached_images": [],
"close_on_disconnect": is_truthy_value(params.get("close_on_disconnect", False)),
"cols": cols,
"created_at": now,
"edit_snapshots": {},
@ -3584,10 +3735,26 @@ def _(rid, params: dict) -> dict:
except Exception as e:
return _err(rid, 5036, f"could not enumerate active sessions: {e}")
# Liveness filter (#38950): a session whose teardown has begun (``_finalized``)
# is dead — its agent/worker are being released and it is no longer
# attachable — but it can briefly remain in ``_sessions`` until the reaper
# pops it (the WS grace-reap and idle reaper both set ``_finalized`` inside
# ``_teardown_session`` before the pop). Counting these inflated the footer's
# "N sessions" count, which only ever went up until a gateway restart. Drop
# them here so the count reflects genuinely attachable sessions. We do NOT
# filter on ``transport is _detached_ws_transport`` (the WS-detached drop
# sentinel): a detached session is still attachable via a quick reconnect /
# session.resume until the grace-reap finalizes it, and a standalone
# ``hermes --tui`` session legitimately rides the real stdio transport and
# must stay visible.
# Keep the natural creation/insertion order from ``_sessions``. The
# frontend marks the focused session with ``current``; it should not jump to
# the top just because the user switched to it.
rows = [_session_live_item(sid, session, current) for sid, session in snapshot]
rows = [
_session_live_item(sid, session, current)
for sid, session in snapshot
if not session.get("_finalized")
]
return _ok(rid, {"sessions": rows})
@ -4002,17 +4169,13 @@ def _(rid, params: dict) -> dict:
@method("session.close")
def _(rid, params: dict) -> dict:
sid = params.get("session_id", "")
with _sessions_lock:
current = _sessions.get(sid)
if not current:
return _ok(rid, {"closed": False})
# Serialize against the WS-orphan reaper (which also pops under
# _session_resume_lock) so a disconnect-reap and an explicit close can't
# both tear the same session down. _close_session_by_id is the single
# idempotent teardown path (pop + _teardown_session) and returns False
# when the session is already gone.
with _session_resume_lock:
with _sessions_lock:
session = _sessions.pop(sid, None)
if not session:
return _ok(rid, {"closed": False})
_teardown_session(session)
return _ok(rid, {"closed": True})
return _ok(rid, {"closed": _close_session_by_id(sid, end_reason="tui_close")})
@method("session.branch")
@ -7793,7 +7956,7 @@ def _(rid, params: dict) -> dict:
session["session_key"],
getattr(session.get("agent"), "model", _resolve_model()),
)
session["slash_worker"] = worker
_attach_worker(params.get("session_id", ""), session, worker)
except Exception as e:
return _err(rid, 5030, f"slash worker start failed: {e}")

View file

@ -19,8 +19,22 @@ from cli import HermesCLI
from rich.console import Console
# Env-overridable so the integration test can drive sub-second timing.
_WATCHDOG_POLL_S = float(os.environ.get("HERMES_SLASH_WATCHDOG_POLL_S", 2.0))
_ORPHAN_GRACE_S = float(os.environ.get("HERMES_SLASH_WATCHDOG_GRACE_S", 5.0))
def _env_float(name: str, default: float) -> float:
"""Parse a float env knob, falling back to ``default`` on absent/malformed
values. A bare ``float(os.environ.get(...))`` would raise ValueError at
import time on a typo (e.g. ``HERMES_SLASH_WATCHDOG_POLL_S=2s``) and kill
the worker before it can serve a single command."""
raw = os.environ.get(name)
if not raw:
return default
try:
return float(raw)
except (TypeError, ValueError):
return default
_WATCHDOG_POLL_S = max(0.05, _env_float("HERMES_SLASH_WATCHDOG_POLL_S", 2.0))
_ORPHAN_GRACE_S = max(0.0, _env_float("HERMES_SLASH_WATCHDOG_GRACE_S", 5.0))
_in_flight = threading.Event() # set while a command is executing

View file

@ -283,41 +283,44 @@ async def handle_ws(ws: Any) -> None:
)
break
finally:
reaped_sessions = 0
detached_sessions = 0
reaped_scheduled = 0
if transport is not None:
transport.close()
# Detach the transport from any sessions it owned so later emits
# do not crash into a closed socket or fall through to desktop
# stdout logs. Schedule a grace-delayed reap; a quick reconnect /
# session.resume re-binds a live transport and cancels it (see
# _ws_session_is_orphaned).
for _sid, sess in list(server._sessions.items()):
if sess.get("transport") is transport:
sess["transport"] = server._detached_ws_transport
detached_sessions += 1
try:
server._schedule_ws_orphan_reap(_sid)
reaped_scheduled += 1
except Exception:
_log.exception(
"ws orphan-reap schedule failed peer=%s sid=%s",
peer,
_sid,
)
# Reap sessions this transport owned (close_on_disconnect sidecar
# sessions) or detach the rest to the drop sentinel so later emits
# don't crash into a closed socket or fall through to desktop stdout
# logs. Detached sessions are handed to the grace-windowed WS-orphan
# reaper inside _close_sessions_for_transport (a quick reconnect /
# session.resume cancels it). This is the single WS-disconnect
# teardown path.
#
# Offloaded: _close_session_by_id does a blocking worker.close()
# (terminate + waits) plus a synchronous DB write — inline that
# would freeze the uvicorn event loop for every other live
# connection.
try:
reaped_sessions, detached_sessions = await asyncio.to_thread(
server._close_sessions_for_transport,
transport,
end_reason="ws_disconnect",
)
except Exception:
_log.exception("ws transport teardown failed peer=%s", peer)
try:
await ws.close()
except Exception as exc:
_log.debug("ws close failed peer=%s error=%s", peer, exc)
_log.info(
"ws closed peer=%s reason=%s messages=%d parse_errors=%d "
"dispatch_crashes=%d send_failures=%d detached_sessions=%d",
"dispatch_crashes=%d send_failures=%d reaped_sessions=%d detached_sessions=%d",
peer,
disconnect_reason,
messages,
parse_errors,
dispatch_crashes,
send_failures,
reaped_sessions,
detached_sessions,
)

View file

@ -120,7 +120,11 @@ export function ChatSidebar({ channel, className }: ChatSidebarProps) {
if (cancelled) {
return;
}
return gw.request<{ session_id: string }>("session.create", {});
// close_on_disconnect: the gateway reaps this sidecar session (and its
// slash_worker subprocess) when the WS drops, instead of leaking it.
return gw.request<{ session_id: string }>("session.create", {
close_on_disconnect: true,
});
})
.then((created) => {
if (cancelled || !created?.session_id) {