diff --git a/tests/test_tui_gateway_server.py b/tests/test_tui_gateway_server.py
index c0f5239035..533516b95d 100644
--- a/tests/test_tui_gateway_server.py
+++ b/tests/test_tui_gateway_server.py
@@ -949,3 +949,162 @@ def test_mirror_slash_side_effects_allowed_when_idle(monkeypatch):
     # Should NOT contain "session busy" — the switch went through.
     assert "session busy" not in warning
     assert applied["model"]
+
+
+# ---------------------------------------------------------------------------
+# session.create / session.close race: fast /new churn must not orphan the
+# slash_worker subprocess or the global approval-notify registration.
+# ---------------------------------------------------------------------------
+
+
+def test_session_create_close_race_does_not_orphan_worker(monkeypatch):
+    """Regression guard: if session.close runs while session.create's
+    _build thread is still constructing the agent, the build thread
+    must detect the orphan and clean up the slash_worker + notify
+    registration it's about to install. Without the cleanup those
+    resources leak: the subprocess stays alive until atexit and the
+    notify callback lingers in the global registry."""
+    import threading
+    import time
+
+    closed_workers: list[str] = []
+    unregistered_keys: list[str] = []
+
+    class _FakeWorker:
+        def __init__(self, key, model):
+            self.key = key
+            self._closed = False
+
+        def close(self):
+            self._closed = True
+            closed_workers.append(self.key)
+
+    class _FakeAgent:
+        def __init__(self):
+            self.model = "x"
+            self.provider = "openrouter"
+            self.base_url = ""
+            self.api_key = ""
+
+    # Make _build block until we release it, simulating slow agent init.
+    release_build = threading.Event()
+
+    def _slow_make_agent(sid, key):
+        release_build.wait(timeout=3.0)
+        return _FakeAgent()
+
+    # Stub everything _build touches.
+    monkeypatch.setattr(server, "_make_agent", _slow_make_agent)
+    monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
+    monkeypatch.setattr(server, "_get_db", lambda: types.SimpleNamespace(create_session=lambda *a, **kw: None))
+    monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
+    monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
+    monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
+    monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
+
+    # Shim register/unregister to observe leaks.
+    import tools.approval as _approval
+    monkeypatch.setattr(_approval, "register_gateway_notify",
+                        lambda key, cb: None)
+    monkeypatch.setattr(_approval, "unregister_gateway_notify",
+                        lambda key: unregistered_keys.append(key))
+    monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
+
+    # Start: session.create spawns the _build thread and returns synchronously.
+    resp = server.handle_request({
+        "id": "1", "method": "session.create", "params": {"cols": 80},
+    })
+    assert resp.get("result"), f"got error: {resp.get('error')}"
+    sid = resp["result"]["session_id"]
+
+    # The build thread is blocked in _slow_make_agent. Close the session
+    # NOW: this pops _sessions[sid] before _build can install the
+    # worker/notify.
+    close_resp = server.handle_request({
+        "id": "2", "method": "session.close", "params": {"session_id": sid},
+    })
+    assert close_resp.get("result", {}).get("closed") is True
+
+    # At this point session.close saw slash_worker=None (not yet
+    # installed), so it closed nothing. Release the build thread and
+    # let it finish: it should detect the orphan, clean up the worker
+    # it just allocated, and unregister the notify.
+    release_build.set()
+
+    # Give the build thread a moment to run through its finally block.
+    for _ in range(100):
+        if closed_workers:
+            break
+        time.sleep(0.02)
+
+    assert len(closed_workers) == 1, (
+        f"orphan worker was not cleaned up: closed_workers={closed_workers}"
+    )
+    # Notify may be unregistered by both session.close (unconditional)
+    # and the orphan-cleanup path; the key guarantee is that the build
+    # thread makes at least one unregister call (any prior close has
+    # already popped the callback, so the duplicate is a no-op).
+    assert len(unregistered_keys) >= 1, (
+        f"orphan notify registration was not unregistered: "
+        f"unregistered_keys={unregistered_keys}"
+    )
+
+
+def test_session_create_no_race_keeps_worker_alive(monkeypatch):
+    """Regression guard: when session.close does NOT race, the build
+    thread must install the worker + notify normally and leave them
+    alone (no over-eager cleanup)."""
+    closed_workers: list[str] = []
+    unregistered_keys: list[str] = []
+
+    class _FakeWorker:
+        def __init__(self, key, model):
+            self.key = key
+
+        def close(self):
+            closed_workers.append(self.key)
+
+    class _FakeAgent:
+        def __init__(self):
+            self.model = "x"
+            self.provider = "openrouter"
+            self.base_url = ""
+            self.api_key = ""
+
+    monkeypatch.setattr(server, "_make_agent", lambda sid, key: _FakeAgent())
+    monkeypatch.setattr(server, "_SlashWorker", _FakeWorker)
+    monkeypatch.setattr(server, "_get_db", lambda: types.SimpleNamespace(create_session=lambda *a, **kw: None))
+    monkeypatch.setattr(server, "_session_info", lambda _a: {"model": "x"})
+    monkeypatch.setattr(server, "_probe_credentials", lambda _a: None)
+    monkeypatch.setattr(server, "_wire_callbacks", lambda _sid: None)
+    monkeypatch.setattr(server, "_emit", lambda *a, **kw: None)
+
+    import tools.approval as _approval
+    monkeypatch.setattr(_approval, "register_gateway_notify", lambda key, cb: None)
+    monkeypatch.setattr(_approval, "unregister_gateway_notify",
+                        lambda key: unregistered_keys.append(key))
+    monkeypatch.setattr(_approval, "load_permanent_allowlist", lambda: None)
+
+    resp = server.handle_request({
+        "id": "1", "method": "session.create", "params": {"cols": 80},
+    })
+    sid = resp["result"]["session_id"]
+
+    # Wait for the build to finish (ready event inside the session dict).
+    session = server._sessions[sid]
+    session["agent_ready"].wait(timeout=2.0)
+
+    # The build finished without a close race, so nothing should have
+    # been cleaned up by the orphan check.
+    assert closed_workers == [], (
+        f"build thread closed its own worker despite no race: {closed_workers}"
+    )
+    assert unregistered_keys == [], (
+        f"build thread unregistered its own notify despite no race: {unregistered_keys}"
+    )
+
+    # The session should have the live worker installed.
+    assert session.get("slash_worker") is not None
+
+    # Cleanup
+    server._sessions.pop(sid, None)
diff --git a/tui_gateway/server.py b/tui_gateway/server.py
index 00f8346191..70dff3b17b 100644
--- a/tui_gateway/server.py
+++ b/tui_gateway/server.py
@@ -1088,7 +1088,23 @@ def _(rid, params: dict) -> dict:
         }
 
     def _build() -> None:
-        session = _sessions[sid]
+        session = _sessions.get(sid)
+        if session is None:
+            # session.close ran before the build thread got scheduled.
+            ready.set()
+            return
+
+        # Track what we allocate so we can clean up if session.close
+        # races us to the finish line. session.close pops _sessions[sid]
+        # unconditionally and tries to close the slash_worker it finds;
+        # if _build is still mid-construction when close runs, close
+        # finds slash_worker=None / notify unregistered and returns
+        # cleanly, leaving us (the build thread) to later install the
+        # worker + notify on an orphaned session dict. The finally
+        # block below detects the orphan and cleans up instead of
+        # leaking a subprocess and a global notify registration.
+        worker = None
+        notify_registered = False
         try:
             tokens = _set_session_context(key)
             try:
@@ -1100,13 +1116,15 @@ def _(rid, params: dict) -> dict:
             session["agent"] = agent
 
             try:
-                session["slash_worker"] = _SlashWorker(key, getattr(agent, "model", _resolve_model()))
+                worker = _SlashWorker(key, getattr(agent, "model", _resolve_model()))
+                session["slash_worker"] = worker
             except Exception:
                 pass
 
             try:
                 from tools.approval import register_gateway_notify, load_permanent_allowlist
                 register_gateway_notify(key, lambda data: _emit("approval.request", sid, data))
+                notify_registered = True
                 load_permanent_allowlist()
             except Exception:
                 pass
@@ -1122,6 +1140,23 @@ def _(rid, params: dict) -> dict:
             session["agent_error"] = str(e)
             _emit("error", sid, {"message": f"agent init failed: {e}"})
         finally:
+            # Orphan check: if session.close raced us and popped
+            # _sessions[sid] while we were building, the dict we just
+            # populated is unreachable. Clean up the subprocess and
+            # the global notify registration ourselves; session.close
+            # couldn't see them at the time it ran.
+            if _sessions.get(sid) is not session:
+                if worker is not None:
+                    try:
+                        worker.close()
+                    except Exception:
+                        pass
+                if notify_registered:
+                    try:
+                        from tools.approval import unregister_gateway_notify
+                        unregister_gateway_notify(key)
+                    except Exception:
+                        pass
             ready.set()
 
     threading.Thread(target=_build, daemon=True).start()
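For reviewers who want the race in isolation, here is a minimal, self-contained sketch of the orphan-cleanup pattern the server change implements. The names in it (`_sessions`, `FakeResource`, `create`, `close`, `gate`) are stand-ins invented for this sketch, not the gateway's real API; the load-bearing idea is the identity check in the builder's `finally`, mirroring `_build`'s `_sessions.get(sid) is not session`.

```python
# Minimal sketch (hypothetical names) of the orphan-cleanup pattern:
# a builder thread allocates a resource for a session; if close() pops
# the session while the build is still in flight, close() sees no
# resource, so the builder's finally block must release what it
# allocated itself.
import threading

_sessions: dict[str, dict] = {}


class FakeResource:
    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


def create(sid: str, gate: threading.Event) -> threading.Thread:
    session: dict = {"resource": None}
    _sessions[sid] = session

    def _build() -> None:
        resource = None
        try:
            gate.wait(timeout=3.0)     # simulate slow construction
            resource = FakeResource()  # allocation close() cannot see yet
            session["resource"] = resource
        finally:
            # Orphan check: if close() already popped the session, this
            # dict is unreachable and nobody else will free the resource.
            if _sessions.get(sid) is not session and resource is not None:
                resource.close()

    t = threading.Thread(target=_build, daemon=True)
    t.start()
    return t


def close(sid: str) -> None:
    session = _sessions.pop(sid, None)
    if session is not None and session.get("resource") is not None:
        session["resource"].close()


if __name__ == "__main__":
    gate = threading.Event()
    t = create("s1", gate)
    close("s1")   # races ahead: the builder has installed nothing yet
    gate.set()    # let the build finish; its finally frees the orphan
    t.join(timeout=2.0)
```

One design note on the patch itself: the identity comparison (`is not session`) rather than a mere `sid not in _sessions` also stays correct if a new session is created under the same sid after the close, since the build thread then sees a different dict and still treats its own allocation as orphaned.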