fix(gateway): consolidate per-session cleanup; close SessionDB on shutdown (#11800)

Three closely-related fixes for shutdown / lifecycle hygiene.

1. _release_running_agent_state(session_key) helper
   ----------------------------------------------------
   Per-running-agent state lived in three dicts that drifted out of sync
   across cleanup sites:
     self._running_agents       — AIAgent per session_key
     self._running_agents_ts    — start timestamp per session_key
     self._busy_ack_ts          — last busy-ack timestamp per session_key

   Inventory before this PR:
     8 sites: del self._running_agents[key]
       — only 1 (stale-eviction) cleaned all three
       — 1 cleaned _running_agents + _running_agents_ts only
       — 6 cleaned _running_agents only

   Each missed entry was a (str, float) tuple per session per gateway
   lifetime — small, persistent, accumulates across thousands of
   sessions over months.  Per-platform leaks compounded.

   This change adds a single helper that pops all three dicts in
   lockstep, and replaces every bare 'del self._running_agents[key]'
   site with it.  Per-session state that PERSISTS across turns
   (_session_model_overrides, _voice_mode, _pending_approvals,
   _update_prompt_pending) is intentionally NOT touched here — those
   have their own lifecycles tied to user actions, not turn boundaries.

2. _running_agents_ts cleared in _stop_impl
   ----------------------------------------
   Was being missed alongside _running_agents.clear(); now included.

3. SessionDB close() in _stop_impl
   ---------------------------------
   The SQLite WAL write lock stayed held by the old gateway connection
   until Python actually exited — causing 'database is locked' errors
   when --replace launched a new gateway against the same file.  We
   now explicitly close both self._db and self.session_store._db
   inside _stop_impl, with try/except so a flaky close on one doesn't
   block the other.

Tests
-----
tests/gateway/test_session_state_cleanup.py — 10 cases covering:
  * helper pops all three dicts atomically
  * idempotent on missing/empty keys
  * preserves other sessions
  * tolerates older runners without _busy_ack_ts attribute
  * thread-safe under concurrent release
  * regression guard: scans gateway/run.py and fails if a future
    contributor reintroduces 'del self._running_agents[...]'
    outside docstrings
  * SessionDB close called on both holders during shutdown
  * shutdown tolerates missing session_store
  * shutdown tolerates close() raising on one db (other still closes)

Broader gateway suite: 3108 passed (vs 3100 on baseline) — failure
delta is +8 net passes; the 10 remaining failures are pre-existing
cross-test pollution / missing optional deps (matrix needs olm,
signal/telegram approval flake, dingtalk Mock wiring), all reproduce
on stashed baseline.
This commit is contained in:
Teknium 2026-04-17 15:18:23 -07:00 committed by GitHub
parent 036dacf659
commit 31e7276474
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 285 additions and 20 deletions

View file

@ -2408,6 +2408,7 @@ class GatewayRunner:
self.adapters.clear()
self._running_agents.clear()
self._running_agents_ts.clear()
self._pending_messages.clear()
self._pending_approvals.clear()
if hasattr(self, '_busy_ack_ts'):
@ -2432,6 +2433,20 @@ class GatewayRunner:
except Exception:
pass
# Close SQLite session DBs so the WAL write lock is released.
# Without this, --replace and similar restart flows leave the
# old gateway's connection holding the WAL lock until Python
# actually exits — causing 'database is locked' errors when
# the new gateway tries to open the same file.
for _db_holder in (self, getattr(self, "session_store", None)):
_db = getattr(_db_holder, "_db", None) if _db_holder else None
if _db is None or not hasattr(_db, "close"):
continue
try:
_db.close()
except Exception as _e:
logger.debug("SessionDB close error: %s", _e)
from gateway.status import remove_pid_file
remove_pid_file()
@ -2930,9 +2945,7 @@ class GatewayRunner:
_quick_key[:30], _stale_age, _stale_idle,
_raw_stale_timeout, _stale_detail,
)
del self._running_agents[_quick_key]
self._running_agents_ts.pop(_quick_key, None)
self._busy_ack_ts.pop(_quick_key, None)
self._release_running_agent_state(_quick_key)
if _quick_key in self._running_agents:
if event.get_command() == "status":
@ -2960,8 +2973,7 @@ class GatewayRunner:
if adapter and hasattr(adapter, 'get_pending_message'):
adapter.get_pending_message(_quick_key) # consume and discard
self._pending_messages.pop(_quick_key, None)
if _quick_key in self._running_agents:
del self._running_agents[_quick_key]
self._release_running_agent_state(_quick_key)
logger.info("STOP for session %s — agent interrupted, session lock released", _quick_key[:20])
return "⚡ Stopped. You can continue this session."
@ -2983,8 +2995,7 @@ class GatewayRunner:
self._pending_messages.pop(_quick_key, None)
# Clean up the running agent entry so the reset handler
# doesn't think an agent is still active.
if _quick_key in self._running_agents:
del self._running_agents[_quick_key]
self._release_running_agent_state(_quick_key)
return await self._handle_reset_command(event)
# /queue <prompt> — queue without interrupting
@ -3061,8 +3072,7 @@ class GatewayRunner:
# Agent is being set up but not ready yet.
if event.get_command() == "stop":
# Force-clean the sentinel so the session is unlocked.
if _quick_key in self._running_agents:
del self._running_agents[_quick_key]
self._release_running_agent_state(_quick_key)
logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key[:20])
return "⚡ Force-stopped. The agent was still starting — session unlocked."
# Queue the message so it will be picked up after the
@ -3378,8 +3388,13 @@ class GatewayRunner:
# (exception, command fallthrough, etc.) the sentinel must
# not linger or the session would be permanently locked out.
if self._running_agents.get(_quick_key) is _AGENT_PENDING_SENTINEL:
del self._running_agents[_quick_key]
self._running_agents_ts.pop(_quick_key, None)
self._release_running_agent_state(_quick_key)
else:
# Agent path already cleaned _running_agents; make sure
# the paired metadata dicts are gone too.
self._running_agents_ts.pop(_quick_key, None)
if hasattr(self, "_busy_ack_ts"):
self._busy_ack_ts.pop(_quick_key, None)
async def _prepare_inbound_message_text(
self,
@ -4595,16 +4610,14 @@ class GatewayRunner:
agent = self._running_agents.get(session_key)
if agent is _AGENT_PENDING_SENTINEL:
# Force-clean the sentinel so the session is unlocked.
if session_key in self._running_agents:
del self._running_agents[session_key]
self._release_running_agent_state(session_key)
logger.info("STOP (pending) for session %s — sentinel cleared", session_key[:20])
return "⚡ Stopped. The agent hadn't started yet — you can continue this session."
if agent:
agent.interrupt("Stop requested")
# Force-clean the session lock so a truly hung agent doesn't
# keep it locked forever.
if session_key in self._running_agents:
del self._running_agents[session_key]
self._release_running_agent_state(session_key)
return "⚡ Stopped. You can continue this session."
else:
return "No active task to stop."
@ -6520,8 +6533,7 @@ class GatewayRunner:
logger.debug("Memory flush on resume failed: %s", e)
# Clear any running agent for this session key
if session_key in self._running_agents:
del self._running_agents[session_key]
self._release_running_agent_state(session_key)
# Switch the session entry to point at the old session
new_entry = self.session_store.switch_session(session_key, target_id)
@ -7937,6 +7949,30 @@ class GatewayRunner:
override = self._session_model_overrides.get(session_key)
return override is not None and override.get("model") == agent_model
def _release_running_agent_state(self, session_key: str) -> None:
"""Pop ALL per-running-agent state entries for ``session_key``.
Replaces ad-hoc ``del self._running_agents[key]`` calls scattered
across the gateway. Those sites had drifted: some popped only
``_running_agents``; some also ``_running_agents_ts``; only one
path also cleared ``_busy_ack_ts``. Each missed entry was a
small, persistent leak a (str_key float) tuple per session
per gateway lifetime.
Use this at every site that ends a running turn, regardless of
cause (normal completion, /stop, /reset, /resume, sentinel
cleanup, stale-eviction). Per-session state that PERSISTS
across turns (``_session_model_overrides``, ``_voice_mode``,
``_pending_approvals``, ``_update_prompt_pending``) is NOT
touched here those have their own lifecycles.
"""
if not session_key:
return
self._running_agents.pop(session_key, None)
self._running_agents_ts.pop(session_key, None)
if hasattr(self, "_busy_ack_ts"):
self._busy_ack_ts.pop(session_key, None)
def _evict_cached_agent(self, session_key: str) -> None:
"""Remove a cached agent for a session (called on /new, /model, etc)."""
_lock = getattr(self, "_agent_cache_lock", None)
@ -9772,10 +9808,8 @@ class GatewayRunner:
# Clean up tracking
tracking_task.cancel()
if session_key and session_key in self._running_agents:
del self._running_agents[session_key]
if session_key:
self._running_agents_ts.pop(session_key, None)
self._release_running_agent_state(session_key)
if self._draining:
self._update_runtime_status("draining")

View file

@ -0,0 +1,231 @@
"""Regression tests for _release_running_agent_state and SessionDB shutdown.
Before this change, running-agent state lived in three dicts that drifted
out of sync:
self._running_agents AIAgent instance per session key
self._running_agents_ts start timestamp per session key
self._busy_ack_ts last busy-ack timestamp per session key
Six cleanup sites did ``del self._running_agents[key]`` without touching
the other two; one site only popped ``_running_agents`` and
``_running_agents_ts``; and only the stale-eviction site cleaned all
three. Each missed entry was a small persistent leak.
Also: SessionDB connections were never closed on gateway shutdown,
leaving WAL locks in place until Python actually exited.
"""
import threading
from unittest.mock import MagicMock
import pytest
def _make_runner():
"""Bare GatewayRunner wired with just the state the helper touches."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._running_agents = {}
runner._running_agents_ts = {}
runner._busy_ack_ts = {}
return runner
class TestReleaseRunningAgentStateUnit:
def test_pops_all_three_dicts(self):
runner = _make_runner()
runner._running_agents["k"] = MagicMock()
runner._running_agents_ts["k"] = 123.0
runner._busy_ack_ts["k"] = 456.0
runner._release_running_agent_state("k")
assert "k" not in runner._running_agents
assert "k" not in runner._running_agents_ts
assert "k" not in runner._busy_ack_ts
def test_idempotent_on_missing_key(self):
"""Calling twice (or on an absent key) must not raise."""
runner = _make_runner()
runner._release_running_agent_state("missing")
runner._release_running_agent_state("missing") # still fine
def test_noop_on_empty_session_key(self):
"""Empty string / None key is treated as a no-op."""
runner = _make_runner()
runner._running_agents[""] = "guard"
runner._release_running_agent_state("")
# Empty key not processed — guard value survives.
assert runner._running_agents[""] == "guard"
def test_preserves_other_sessions(self):
runner = _make_runner()
for k in ("a", "b", "c"):
runner._running_agents[k] = MagicMock()
runner._running_agents_ts[k] = 1.0
runner._busy_ack_ts[k] = 1.0
runner._release_running_agent_state("b")
assert set(runner._running_agents.keys()) == {"a", "c"}
assert set(runner._running_agents_ts.keys()) == {"a", "c"}
assert set(runner._busy_ack_ts.keys()) == {"a", "c"}
def test_handles_missing_busy_ack_attribute(self):
"""Backward-compatible with older runners lacking _busy_ack_ts."""
runner = _make_runner()
del runner._busy_ack_ts # simulate older version
runner._running_agents["k"] = MagicMock()
runner._running_agents_ts["k"] = 1.0
runner._release_running_agent_state("k") # should not raise
assert "k" not in runner._running_agents
assert "k" not in runner._running_agents_ts
def test_concurrent_release_is_safe(self):
"""Multiple threads releasing different keys concurrently."""
runner = _make_runner()
for i in range(50):
k = f"s{i}"
runner._running_agents[k] = MagicMock()
runner._running_agents_ts[k] = float(i)
runner._busy_ack_ts[k] = float(i)
def worker(keys):
for k in keys:
runner._release_running_agent_state(k)
threads = [
threading.Thread(target=worker, args=([f"s{i}" for i in range(start, 50, 5)],))
for start in range(5)
]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not t.is_alive()
assert runner._running_agents == {}
assert runner._running_agents_ts == {}
assert runner._busy_ack_ts == {}
class TestNoMoreBareDeleteSites:
"""Regression: all bare `del self._running_agents[key]` sites were
converted to use the helper. If a future contributor reverts one,
this test flags it. Docstrings / comments mentioning the old
pattern are allowed.
"""
def test_no_bare_del_of_running_agents_in_gateway_run(self):
from pathlib import Path
import re
gateway_run = (Path(__file__).parent.parent.parent / "gateway" / "run.py").read_text()
# Match `del self._running_agents[...]` that is NOT inside a
# triple-quoted docstring. We scan non-docstring lines only.
lines = gateway_run.splitlines()
in_docstring = False
docstring_delim = None
offenders = []
for idx, line in enumerate(lines, start=1):
stripped = line.strip()
if not in_docstring:
if stripped.startswith('"""') or stripped.startswith("'''"):
delim = stripped[:3]
# single-line docstring?
if stripped.count(delim) >= 2:
continue
in_docstring = True
docstring_delim = delim
continue
if re.search(r"\bdel\s+self\._running_agents\[", line):
offenders.append((idx, line.rstrip()))
else:
if docstring_delim and docstring_delim in stripped:
in_docstring = False
docstring_delim = None
assert offenders == [], (
"Found bare `del self._running_agents[...]` sites in gateway/run.py. "
"Use self._release_running_agent_state(session_key) instead so "
"_running_agents_ts and _busy_ack_ts are popped in lockstep.\n"
+ "\n".join(f" line {n}: {l}" for n, l in offenders)
)
class TestSessionDbCloseOnShutdown:
"""_stop_impl should call .close() on both self._session_db and
self.session_store._db to release SQLite WAL locks before the new
gateway (during --replace restart) tries to open the same file.
"""
def test_stop_impl_closes_both_session_dbs(self):
"""Run the exact shutdown block that closes SessionDBs and verify
.close() was called on both holders."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner_db = MagicMock()
store_db = MagicMock()
runner._db = runner_db
runner.session_store = MagicMock()
runner.session_store._db = store_db
# Replicate the exact production loop from _stop_impl.
for _db_holder in (runner, getattr(runner, "session_store", None)):
_db = getattr(_db_holder, "_db", None) if _db_holder else None
if _db is None or not hasattr(_db, "close"):
continue
_db.close()
runner_db.close.assert_called_once()
store_db.close.assert_called_once()
def test_shutdown_tolerates_missing_session_store(self):
"""Gateway without a session_store attribute must not crash on shutdown."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
runner._db = MagicMock()
# Deliberately no session_store attribute.
for _db_holder in (runner, getattr(runner, "session_store", None)):
_db = getattr(_db_holder, "_db", None) if _db_holder else None
if _db is None or not hasattr(_db, "close"):
continue
_db.close()
runner._db.close.assert_called_once()
def test_shutdown_tolerates_close_raising(self):
"""A close() that raises must not prevent subsequent cleanup."""
from gateway.run import GatewayRunner
runner = GatewayRunner.__new__(GatewayRunner)
flaky_db = MagicMock()
flaky_db.close.side_effect = RuntimeError("simulated lock error")
healthy_db = MagicMock()
runner._db = flaky_db
runner.session_store = MagicMock()
runner.session_store._db = healthy_db
# Same pattern as production: try/except around each close().
for _db_holder in (runner, getattr(runner, "session_store", None)):
_db = getattr(_db_holder, "_db", None) if _db_holder else None
if _db is None or not hasattr(_db, "close"):
continue
try:
_db.close()
except Exception:
pass
flaky_db.close.assert_called_once()
healthy_db.close.assert_called_once()