hermes-agent/tests/test_tui_gateway_ws.py
teknium1 365813a72b
fix: resolve rebase conflict in _teardown_session worker cleanup
Main folded slash_worker.close() into _finalize_session (the single
_finalized-guarded chokepoint) while #42143 was open. The rebase
conflicted with the PR's worker-close in _teardown_session. Keep both —
they target the same #38095 leak and _SlashWorker.close() is
idempotent (_closed/poll()-guarded) — so callers reaching
_teardown_session without the real _finalize_session (and the PR's own
tests, which monkeypatch _finalize_session out) still reap the worker.
Same for _shutdown_sessions, now routed through the unified
_close_session_by_id funnel.
2026-06-08 10:02:05 -07:00

89 lines
2.9 KiB
Python

import asyncio
from tui_gateway import server
from tui_gateway import ws as ws_mod
def _run_disconnect(monkeypatch, seed):
"""Drive handle_ws to its disconnect `finally`, seeding sessions against the
live WSTransport the moment it exists. Returns nothing; inspect _sessions."""
# Disable the grace-reap Timer: detached sessions normally schedule a
# threading.Timer via _schedule_ws_orphan_reap, which would outlive the test
# and fire _reap during interpreter teardown — touching _sessions/DB and
# producing spurious post-run errors under the per-file CI runner. Grace=0
# short-circuits the Timer (see _schedule_ws_orphan_reap) so the test leaves
# no lingering thread.
monkeypatch.setattr(server, "_WS_ORPHAN_REAP_GRACE_S", 0)
# Mirror the real _finalize_session chokepoint: it is the single place that
# closes the slash-worker (#38095). Stub it but keep that behavior so the
# disconnect-reap path still exercises worker teardown.
def _fake_finalize(s, end_reason="tui_close"):
w = s.get("slash_worker")
if w:
w.close()
monkeypatch.setattr(server, "_finalize_session", _fake_finalize)
created = []
real_transport = ws_mod.WSTransport
monkeypatch.setattr(
ws_mod, "WSTransport",
lambda ws, loop, **kw: created.append(real_transport(ws, loop, **kw)) or created[-1],
)
class FakeWS:
async def accept(self):
pass
async def send_text(self, line):
pass
async def receive_text(self):
seed(created[0]) # transport now exists; attach it to sessions
raise ws_mod._WebSocketDisconnect()
async def close(self):
pass
asyncio.run(ws_mod.handle_ws(FakeWS()))
def test_ws_disconnect_reaps_flagged_session_and_closes_worker(monkeypatch):
closed = []
class FakeWorker:
def close(self):
closed.append(True)
server._sessions.clear()
try:
_run_disconnect(
monkeypatch,
lambda t: server._sessions.update(
flagged={
"transport": t,
"close_on_disconnect": True,
"slash_worker": FakeWorker(),
"session_key": "k",
}
),
)
assert "flagged" not in server._sessions
assert closed == [True]
finally:
server._sessions.clear()
def test_ws_disconnect_preserves_and_repoints_reconnectable_session(monkeypatch):
server._sessions.clear()
try:
_run_disconnect(
monkeypatch,
lambda t: server._sessions.update(
plain={"transport": t, "close_on_disconnect": False, "session_key": "k"}
),
)
assert server._sessions["plain"]["transport"] is server._detached_ws_transport
finally:
server._sessions.clear()