fix(gateway): self-heal stale sessions.json routing at message time

Detect a routing key whose session is already ended in state.db
(end_reason set) inside get_or_create_session and drop the stale entry
instead of silently routing the message into a closed session.

Previously the only runtime cleanup of sessions.json was the startup
_prune_stale_sessions_locked (#52808/#54138), which requires a restart.
A session ended while the gateway stays alive — any path that finalizes
the DB row without clearing sessions.json — left a live routing key
pointing at a closed session. get_or_create_session never consulted
end_reason, so it returned that stale entry and every subsequent message
was silently dropped (no log, no error, no response) until the next
restart. This is the live-gateway variant of #52804/FM9, which needed an
actual gateway crash.

The guard drops the stale entry and falls through to
_recover_session_from_db, which reopens agent_close-ended rows and
resumes the SAME session_id (transcript preserved); if the row ended for
a non-recoverable reason (e.g. /new) it correctly starts a fresh
session. A warning is logged so the event is visible (the field
incident reported zero log output).

Adds tests/gateway/test_session_store_runtime_stale_guard.py covering
the _is_session_ended_in_db helper and the end-to-end routing self-heal
(recover-vs-fresh, live-entry untouched, stale-wins-over-suspended,
force_new short-circuit).

Closes #54878.

Co-authored-by: David Gutowsky <david.gutowsky@gmail.com>
This commit is contained in:
David Gutowsky 2026-06-30 12:59:11 +05:30 committed by kshitij
parent 6763d63240
commit 3a83b6bc5d
2 changed files with 294 additions and 34 deletions

View file

@ -905,11 +905,11 @@ class SessionStore:
# in state.db. A hard gateway crash (exit code 1) skips the graceful
# shutdown path, so sessions.json is never cleared and is left pointing
# at ended sessions. On the next startup those stale entries act as live
# routing keys, but get_or_create_session() reuses them as long as the
# time/policy reset checks pass — it never consults end_reason — so every
# incoming message is silently routed into a closed session. Pruning here
# (lock already held) is cheap: one lookup per routing key, once at
# startup, and self-heals into a fresh session on the next message.
# routing keys. get_or_create_session() only consulted end_reason at
# startup (here) until #54878 added a routing-time guard for the
# live-gateway case; this startup prune still self-heals crash-left
# entries before the first message arrives. Pruning here (lock already
# held) is cheap: one lookup per routing key, once at startup.
self._prune_stale_sessions_locked()
def _prune_stale_sessions_locked(self) -> None:
@ -1148,6 +1148,32 @@ class SessionStore:
return False
def _is_session_ended_in_db(self, session_id: str) -> bool:
"""Return True iff state.db has this session with a non-null end_reason.
Mirrors the staleness test in ``_prune_stale_sessions_locked``:
- no DB handle / no session_id -> False (can't tell — keep)
- row absent (legacy / not yet persisted) -> False (keep)
- end_reason is None -> False (alive keep)
- end_reason not None -> True (ended stale)
Used by ``get_or_create_session`` to self-heal at routing time:
``_prune_stale_sessions_locked`` only runs at startup, so a session
ended in the DB while the gateway stays alive (any path that finalizes
the row without clearing sessions.json) would otherwise be reused as a
live routing key and silently swallow every subsequent message until
the next restart (#54878 — the live-gateway variant of #52804/FM9).
DB errors are non-fatal never block routing on a failed lookup.
"""
db = getattr(self, "_db", None)
if not db or not session_id:
return False
try:
row = db.get_session(session_id)
except Exception:
return False
return bool(row is not None and row.get("end_reason") is not None)
def _should_reset(self, entry: SessionEntry, source: SessionSource) -> Optional[str]:
"""
Check if a session should be reset based on policy.
@ -1243,39 +1269,73 @@ class SessionStore:
if session_key in self._entries and not force_new:
entry = self._entries[session_key]
# Auto-reset sessions marked as suspended (e.g. after /stop
# broke a stuck loop — #7536). ``suspended`` is the hard
# forced-wipe signal and always wins over ``resume_pending``,
# so repeated interrupted restarts that escalate via the
# existing ``.restart_failure_counts`` stuck-loop counter
# still converge to a clean slate.
if entry.suspended:
reset_reason = "suspended"
elif entry.resume_pending:
# Restart-interrupted session: preserve the session_id
# and return the existing entry so the transcript reloads
# intact, but still honour normal daily/idle reset policy.
reset_reason = self._should_reset(entry, source)
# Self-heal stale routing: if this session_key still points at
# a session that has ALREADY been ended in state.db (end_reason
# set), the in-memory sessions.json entry is stale. Reusing it
# would route every incoming message into a closed session and
# silently drop it — with no log, no error, no response — until
# the gateway restarts and _prune_stale_sessions_locked() clears
# it (#54878 — the live-gateway variant of #52804/FM9, which
# only the startup prune previously caught).
#
# Drop the stale entry and fall through to the recovery path
# below. Leaving db_end_session_id None routes us into
# _recover_session_from_db, whose finder
# (hermes_state.find_latest_gateway_session_for_peer) selects
# rows WHERE `ended_at IS NULL OR end_reason = 'agent_close'`
# — so it REOPENS gateway-cleanup-ended ('agent_close') rows and
# resumes the SAME session_id (transcript preserved), but returns
# None for any other end_reason (e.g. /new), which then correctly
# starts a fresh session.
if self._is_session_ended_in_db(entry.session_id):
logger.warning(
"gateway.session: routing key %r -> %s is ended in "
"state.db but still live in sessions.json; dropping "
"stale entry and recovering/recreating the session "
"(#54878)",
session_key, entry.session_id,
)
self._entries.pop(session_key, None)
was_auto_reset = False
auto_reset_reason = None
reset_had_activity = False
# Fall through to the recovery/create path below; the
# stale entry is gone so we must NOT consult its
# suspended/resume/reset state.
else:
# Auto-reset sessions marked as suspended (e.g. after /stop
# broke a stuck loop — #7536). ``suspended`` is the hard
# forced-wipe signal and always wins over ``resume_pending``,
# so repeated interrupted restarts that escalate via the
# existing ``.restart_failure_counts`` stuck-loop counter
# still converge to a clean slate.
if entry.suspended:
reset_reason = "suspended"
elif entry.resume_pending:
# Restart-interrupted session: preserve the session_id
# and return the existing entry so the transcript reloads
# intact, but still honour normal daily/idle reset policy.
reset_reason = self._should_reset(entry, source)
if not reset_reason:
entry.updated_at = now
self._save()
return entry
else:
reset_reason = self._should_reset(entry, source)
if not reset_reason:
entry.updated_at = now
self._save()
return entry
else:
reset_reason = self._should_reset(entry, source)
if not reset_reason:
entry.updated_at = now
self._save()
return entry
else:
# Session is being auto-reset.
was_auto_reset = True
auto_reset_reason = reset_reason
# Track whether the expired session had any real conversation.
# total_tokens is never written (token counts migrated to
# agent-direct persistence) so it is always 0 — use
# last_prompt_tokens, which is updated on every turn.
reset_had_activity = entry.last_prompt_tokens > 0
db_end_session_id = entry.session_id
else:
# Session is being auto-reset.
was_auto_reset = True
auto_reset_reason = reset_reason
# Track whether the expired session had any real
# conversation. total_tokens is never written (token
# counts migrated to agent-direct persistence) so it is
# always 0 — use last_prompt_tokens, updated every turn.
reset_had_activity = entry.last_prompt_tokens > 0
db_end_session_id = entry.session_id
else:
was_auto_reset = False
auto_reset_reason = None

View file

@ -0,0 +1,200 @@
"""Runtime self-heal for stale sessions.json routing entries (#54878).
`_prune_stale_sessions_locked` only runs at gateway startup. A session ended
in state.db while the gateway stays alive (e.g. any path that finalizes the
row without clearing sessions.json) leaves a stale `session_key -> session_id`
mapping whose session has `end_reason` set. Before this fix,
`get_or_create_session` returned that stale entry as a live routing key (it
never consulted end_reason), so every subsequent message was silently routed
into a closed session and dropped no log, no error, no response until the
next restart pruned it.
This is the live-gateway variant of #52804/FM9 (#52808/#54138 startup prune),
which required an actual gateway *crash*. Here the guard inside
`get_or_create_session` detects the ended row at routing time and drops the
stale entry, falling through to `_recover_session_from_db` (which reopens
`agent_close`-ended rows and resumes the SAME session_id, preserving the
transcript) or, failing recovery, to a fresh session.
"""
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from gateway.config import GatewayConfig, Platform, SessionResetPolicy
from gateway.session import SessionEntry, SessionSource, SessionStore
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_entry(key: str, session_id: str, **kw) -> SessionEntry:
now = datetime.now()
return SessionEntry(
session_key=key,
session_id=session_id,
created_at=now - timedelta(hours=2),
updated_at=now - timedelta(hours=1),
platform=Platform.TELEGRAM,
chat_type="dm",
**kw,
)
def _db_returning(rows: dict) -> MagicMock:
"""SessionDB mock where get_session maps session_id -> row dict."""
db = MagicMock()
db.get_session.side_effect = lambda sid: rows.get(sid)
# By default recovery finds nothing (forces a fresh session).
db.find_latest_gateway_session_for_peer.return_value = None
db.reopen_session.return_value = None
db.create_session.return_value = None
return db
def _make_store_with_db(tmp_path, db_mock) -> SessionStore:
"""Build a SessionStore with a mock SessionDB, bypassing disk load."""
config = GatewayConfig(default_reset_policy=SessionResetPolicy(mode="none"))
with patch("gateway.session.SessionStore._ensure_loaded"):
store = SessionStore(sessions_dir=tmp_path, config=config)
store._db = db_mock
store._loaded = True
return store
def _source() -> SessionSource:
# session_key for this peer is deterministic; matches the entry key we seed.
return SessionSource(
platform=Platform.TELEGRAM,
chat_id="8494508720",
chat_type="dm",
user_id="8494508720",
)
# ---------------------------------------------------------------------------
# _is_session_ended_in_db helper
# ---------------------------------------------------------------------------
class TestIsSessionEndedInDb:
def test_ended_row_is_stale(self, tmp_path):
db = _db_returning({"sid": {"end_reason": "agent_close", "id": "sid"}})
store = _make_store_with_db(tmp_path, db)
assert store._is_session_ended_in_db("sid") is True
def test_alive_row_not_stale(self, tmp_path):
db = _db_returning({"sid": {"end_reason": None, "id": "sid"}})
store = _make_store_with_db(tmp_path, db)
assert store._is_session_ended_in_db("sid") is False
def test_absent_row_not_stale(self, tmp_path):
# Not yet persisted / legacy — must NOT be treated as ended, else a
# freshly-created in-memory session would be wrongly discarded.
db = _db_returning({})
store = _make_store_with_db(tmp_path, db)
assert store._is_session_ended_in_db("sid_absent") is False
def test_no_db_not_stale(self, tmp_path):
store = _make_store_with_db(tmp_path, _db_returning({}))
store._db = None
assert store._is_session_ended_in_db("sid") is False
def test_empty_session_id_not_stale(self, tmp_path):
store = _make_store_with_db(tmp_path, _db_returning({}))
assert store._is_session_ended_in_db("") is False
def test_db_error_not_stale(self, tmp_path):
db = MagicMock()
db.get_session.side_effect = Exception("DB locked")
store = _make_store_with_db(tmp_path, db)
# On error, never block routing — treat as not-stale (keep).
assert store._is_session_ended_in_db("sid") is False
# ---------------------------------------------------------------------------
# get_or_create_session — runtime self-heal
# ---------------------------------------------------------------------------
class TestRuntimeStaleGuard:
def test_stale_agent_close_entry_recovered_preserving_session_id(self, tmp_path):
"""Stale `agent_close` entry → recovery reopens the SAME session_id."""
source = _source()
db = _db_returning({"sid_stale": {"end_reason": "agent_close", "id": "sid_stale"}})
# Recovery finds the agent_close row and reopens it (transcript-preserving).
db.find_latest_gateway_session_for_peer.return_value = {
"id": "sid_stale",
"started_at": (datetime.now() - timedelta(hours=2)).timestamp(),
}
store = _make_store_with_db(tmp_path, db)
key = store._generate_session_key(source)
store._entries[key] = _make_entry(key, "sid_stale")
result = store.get_or_create_session(source)
# SAME session_id (resumed), not a brand-new one, and not silently
# routed into the closed entry.
assert result.session_id == "sid_stale"
db.reopen_session.assert_called_once_with("sid_stale")
# A brand-new session row must NOT have been created.
db.create_session.assert_not_called()
def test_stale_entry_creates_fresh_when_recovery_returns_none(self, tmp_path):
"""Stale entry, no recoverable row → brand-new session (no silent drop)."""
source = _source()
# Ended with a non-recoverable reason (e.g. /new) → finder returns None.
db = _db_returning({"sid_stale": {"end_reason": "new_command", "id": "sid_stale"}})
db.find_latest_gateway_session_for_peer.return_value = None
store = _make_store_with_db(tmp_path, db)
key = store._generate_session_key(source)
store._entries[key] = _make_entry(key, "sid_stale")
result = store.get_or_create_session(source)
assert result.session_id != "sid_stale"
# A fresh session row was created for the new session_id.
db.create_session.assert_called_once()
assert store._entries[key].session_id == result.session_id
def test_live_entry_returned_unchanged(self, tmp_path):
"""A session still alive in the DB is returned as-is (no churn)."""
source = _source()
db = _db_returning({"sid_live": {"end_reason": None, "id": "sid_live"}})
store = _make_store_with_db(tmp_path, db)
key = store._generate_session_key(source)
store._entries[key] = _make_entry(key, "sid_live")
result = store.get_or_create_session(source)
assert result.session_id == "sid_live"
db.find_latest_gateway_session_for_peer.assert_not_called()
db.create_session.assert_not_called()
def test_stale_check_wins_over_suspended(self, tmp_path):
"""A stale entry that is ALSO suspended is still dropped via the stale
path we must not consult the dead entry's reset/suspend state."""
source = _source()
db = _db_returning({"sid_stale": {"end_reason": "agent_close", "id": "sid_stale"}})
db.find_latest_gateway_session_for_peer.return_value = None # → fresh
store = _make_store_with_db(tmp_path, db)
key = store._generate_session_key(source)
store._entries[key] = _make_entry(key, "sid_stale", suspended=True)
result = store.get_or_create_session(source)
# Did not return the stale (suspended) entry; created a fresh session.
assert result.session_id != "sid_stale"
db.create_session.assert_called_once()
def test_force_new_skips_stale_check(self, tmp_path):
"""force_new short-circuits the whole existing-entry branch; the stale
DB lookup must not even run."""
source = _source()
db = _db_returning({"sid_old": {"end_reason": "agent_close", "id": "sid_old"}})
store = _make_store_with_db(tmp_path, db)
key = store._generate_session_key(source)
store._entries[key] = _make_entry(key, "sid_old")
result = store.get_or_create_session(source, force_new=True)
assert result.session_id != "sid_old"
db.get_session.assert_not_called()