diff --git a/gateway/session.py b/gateway/session.py index 905de41e622..d2f5b500750 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -905,11 +905,11 @@ class SessionStore: # in state.db. A hard gateway crash (exit code 1) skips the graceful # shutdown path, so sessions.json is never cleared and is left pointing # at ended sessions. On the next startup those stale entries act as live - # routing keys, but get_or_create_session() reuses them as long as the - # time/policy reset checks pass — it never consults end_reason — so every - # incoming message is silently routed into a closed session. Pruning here - # (lock already held) is cheap: one lookup per routing key, once at - # startup, and self-heals into a fresh session on the next message. + # routing keys. get_or_create_session() only consulted end_reason at + # startup (here) until #54878 added a routing-time guard for the + # live-gateway case; this startup prune still self-heals crash-left + # entries before the first message arrives. Pruning here (lock already + # held) is cheap: one lookup per routing key, once at startup. self._prune_stale_sessions_locked() def _prune_stale_sessions_locked(self) -> None: @@ -1148,6 +1148,32 @@ class SessionStore: return False + def _is_session_ended_in_db(self, session_id: str) -> bool: + """Return True iff state.db has this session with a non-null end_reason. + + Mirrors the staleness test in ``_prune_stale_sessions_locked``: + - no DB handle / no session_id -> False (can't tell — keep) + - row absent (legacy / not yet persisted) -> False (keep) + - end_reason is None -> False (alive — keep) + - end_reason not None -> True (ended — stale) + + Used by ``get_or_create_session`` to self-heal at routing time: + ``_prune_stale_sessions_locked`` only runs at startup, so a session + ended in the DB while the gateway stays alive (any path that finalizes + the row without clearing sessions.json) would otherwise be reused as a + live routing key and silently swallow every subsequent message until + the next restart (#54878 — the live-gateway variant of #52804/FM9). + DB errors are non-fatal — never block routing on a failed lookup. + """ + db = getattr(self, "_db", None) + if not db or not session_id: + return False + try: + row = db.get_session(session_id) + except Exception: + return False + return bool(row is not None and row.get("end_reason") is not None) + def _should_reset(self, entry: SessionEntry, source: SessionSource) -> Optional[str]: """ Check if a session should be reset based on policy. @@ -1243,39 +1269,73 @@ class SessionStore: if session_key in self._entries and not force_new: entry = self._entries[session_key] - # Auto-reset sessions marked as suspended (e.g. after /stop - # broke a stuck loop — #7536). ``suspended`` is the hard - # forced-wipe signal and always wins over ``resume_pending``, - # so repeated interrupted restarts that escalate via the - # existing ``.restart_failure_counts`` stuck-loop counter - # still converge to a clean slate. - if entry.suspended: - reset_reason = "suspended" - elif entry.resume_pending: - # Restart-interrupted session: preserve the session_id - # and return the existing entry so the transcript reloads - # intact, but still honour normal daily/idle reset policy. - reset_reason = self._should_reset(entry, source) + # Self-heal stale routing: if this session_key still points at + # a session that has ALREADY been ended in state.db (end_reason + # set), the in-memory sessions.json entry is stale. Reusing it + # would route every incoming message into a closed session and + # silently drop it — with no log, no error, no response — until + # the gateway restarts and _prune_stale_sessions_locked() clears + # it (#54878 — the live-gateway variant of #52804/FM9, which + # only the startup prune previously caught). + # + # Drop the stale entry and fall through to the recovery path + # below. Leaving db_end_session_id None routes us into + # _recover_session_from_db, whose finder + # (hermes_state.find_latest_gateway_session_for_peer) selects + # rows WHERE `ended_at IS NULL OR end_reason = 'agent_close'` + # — so it REOPENS gateway-cleanup-ended ('agent_close') rows and + # resumes the SAME session_id (transcript preserved), but returns + # None for any other end_reason (e.g. /new), which then correctly + # starts a fresh session. + if self._is_session_ended_in_db(entry.session_id): + logger.warning( + "gateway.session: routing key %r -> %s is ended in " + "state.db but still live in sessions.json; dropping " + "stale entry and recovering/recreating the session " + "(#54878)", + session_key, entry.session_id, + ) + self._entries.pop(session_key, None) + was_auto_reset = False + auto_reset_reason = None + reset_had_activity = False + # Fall through to the recovery/create path below; the + # stale entry is gone so we must NOT consult its + # suspended/resume/reset state. + else: + # Auto-reset sessions marked as suspended (e.g. after /stop + # broke a stuck loop — #7536). ``suspended`` is the hard + # forced-wipe signal and always wins over ``resume_pending``, + # so repeated interrupted restarts that escalate via the + # existing ``.restart_failure_counts`` stuck-loop counter + # still converge to a clean slate. + if entry.suspended: + reset_reason = "suspended" + elif entry.resume_pending: + # Restart-interrupted session: preserve the session_id + # and return the existing entry so the transcript reloads + # intact, but still honour normal daily/idle reset policy. + reset_reason = self._should_reset(entry, source) + if not reset_reason: + entry.updated_at = now + self._save() + return entry + else: + reset_reason = self._should_reset(entry, source) if not reset_reason: entry.updated_at = now self._save() return entry - else: - reset_reason = self._should_reset(entry, source) - if not reset_reason: - entry.updated_at = now - self._save() - return entry - else: - # Session is being auto-reset. - was_auto_reset = True - auto_reset_reason = reset_reason - # Track whether the expired session had any real conversation. - # total_tokens is never written (token counts migrated to - # agent-direct persistence) so it is always 0 — use - # last_prompt_tokens, which is updated on every turn. - reset_had_activity = entry.last_prompt_tokens > 0 - db_end_session_id = entry.session_id + else: + # Session is being auto-reset. + was_auto_reset = True + auto_reset_reason = reset_reason + # Track whether the expired session had any real + # conversation. total_tokens is never written (token + # counts migrated to agent-direct persistence) so it is + # always 0 — use last_prompt_tokens, updated every turn. + reset_had_activity = entry.last_prompt_tokens > 0 + db_end_session_id = entry.session_id else: was_auto_reset = False auto_reset_reason = None diff --git a/tests/gateway/test_session_store_runtime_stale_guard.py b/tests/gateway/test_session_store_runtime_stale_guard.py new file mode 100644 index 00000000000..262d1a489bb --- /dev/null +++ b/tests/gateway/test_session_store_runtime_stale_guard.py @@ -0,0 +1,200 @@ +"""Runtime self-heal for stale sessions.json routing entries (#54878). + +`_prune_stale_sessions_locked` only runs at gateway startup. A session ended +in state.db while the gateway stays alive (e.g. any path that finalizes the +row without clearing sessions.json) leaves a stale `session_key -> session_id` +mapping whose session has `end_reason` set. Before this fix, +`get_or_create_session` returned that stale entry as a live routing key (it +never consulted end_reason), so every subsequent message was silently routed +into a closed session and dropped — no log, no error, no response — until the +next restart pruned it. + +This is the live-gateway variant of #52804/FM9 (#52808/#54138 startup prune), +which required an actual gateway *crash*. Here the guard inside +`get_or_create_session` detects the ended row at routing time and drops the +stale entry, falling through to `_recover_session_from_db` (which reopens +`agent_close`-ended rows and resumes the SAME session_id, preserving the +transcript) or, failing recovery, to a fresh session. +""" + +from datetime import datetime, timedelta +from unittest.mock import MagicMock, patch + +from gateway.config import GatewayConfig, Platform, SessionResetPolicy +from gateway.session import SessionEntry, SessionSource, SessionStore + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_entry(key: str, session_id: str, **kw) -> SessionEntry: + now = datetime.now() + return SessionEntry( + session_key=key, + session_id=session_id, + created_at=now - timedelta(hours=2), + updated_at=now - timedelta(hours=1), + platform=Platform.TELEGRAM, + chat_type="dm", + **kw, + ) + + +def _db_returning(rows: dict) -> MagicMock: + """SessionDB mock where get_session maps session_id -> row dict.""" + db = MagicMock() + db.get_session.side_effect = lambda sid: rows.get(sid) + # By default recovery finds nothing (forces a fresh session). + db.find_latest_gateway_session_for_peer.return_value = None + db.reopen_session.return_value = None + db.create_session.return_value = None + return db + + +def _make_store_with_db(tmp_path, db_mock) -> SessionStore: + """Build a SessionStore with a mock SessionDB, bypassing disk load.""" + config = GatewayConfig(default_reset_policy=SessionResetPolicy(mode="none")) + with patch("gateway.session.SessionStore._ensure_loaded"): + store = SessionStore(sessions_dir=tmp_path, config=config) + store._db = db_mock + store._loaded = True + return store + + +def _source() -> SessionSource: + # session_key for this peer is deterministic; matches the entry key we seed. + return SessionSource( + platform=Platform.TELEGRAM, + chat_id="8494508720", + chat_type="dm", + user_id="8494508720", + ) + + +# --------------------------------------------------------------------------- +# _is_session_ended_in_db helper +# --------------------------------------------------------------------------- + +class TestIsSessionEndedInDb: + def test_ended_row_is_stale(self, tmp_path): + db = _db_returning({"sid": {"end_reason": "agent_close", "id": "sid"}}) + store = _make_store_with_db(tmp_path, db) + assert store._is_session_ended_in_db("sid") is True + + def test_alive_row_not_stale(self, tmp_path): + db = _db_returning({"sid": {"end_reason": None, "id": "sid"}}) + store = _make_store_with_db(tmp_path, db) + assert store._is_session_ended_in_db("sid") is False + + def test_absent_row_not_stale(self, tmp_path): + # Not yet persisted / legacy — must NOT be treated as ended, else a + # freshly-created in-memory session would be wrongly discarded. + db = _db_returning({}) + store = _make_store_with_db(tmp_path, db) + assert store._is_session_ended_in_db("sid_absent") is False + + def test_no_db_not_stale(self, tmp_path): + store = _make_store_with_db(tmp_path, _db_returning({})) + store._db = None + assert store._is_session_ended_in_db("sid") is False + + def test_empty_session_id_not_stale(self, tmp_path): + store = _make_store_with_db(tmp_path, _db_returning({})) + assert store._is_session_ended_in_db("") is False + + def test_db_error_not_stale(self, tmp_path): + db = MagicMock() + db.get_session.side_effect = Exception("DB locked") + store = _make_store_with_db(tmp_path, db) + # On error, never block routing — treat as not-stale (keep). + assert store._is_session_ended_in_db("sid") is False + + +# --------------------------------------------------------------------------- +# get_or_create_session — runtime self-heal +# --------------------------------------------------------------------------- + +class TestRuntimeStaleGuard: + def test_stale_agent_close_entry_recovered_preserving_session_id(self, tmp_path): + """Stale `agent_close` entry → recovery reopens the SAME session_id.""" + source = _source() + db = _db_returning({"sid_stale": {"end_reason": "agent_close", "id": "sid_stale"}}) + # Recovery finds the agent_close row and reopens it (transcript-preserving). + db.find_latest_gateway_session_for_peer.return_value = { + "id": "sid_stale", + "started_at": (datetime.now() - timedelta(hours=2)).timestamp(), + } + store = _make_store_with_db(tmp_path, db) + key = store._generate_session_key(source) + store._entries[key] = _make_entry(key, "sid_stale") + + result = store.get_or_create_session(source) + + # SAME session_id (resumed), not a brand-new one, and not silently + # routed into the closed entry. + assert result.session_id == "sid_stale" + db.reopen_session.assert_called_once_with("sid_stale") + # A brand-new session row must NOT have been created. + db.create_session.assert_not_called() + + def test_stale_entry_creates_fresh_when_recovery_returns_none(self, tmp_path): + """Stale entry, no recoverable row → brand-new session (no silent drop).""" + source = _source() + # Ended with a non-recoverable reason (e.g. /new) → finder returns None. + db = _db_returning({"sid_stale": {"end_reason": "new_command", "id": "sid_stale"}}) + db.find_latest_gateway_session_for_peer.return_value = None + store = _make_store_with_db(tmp_path, db) + key = store._generate_session_key(source) + store._entries[key] = _make_entry(key, "sid_stale") + + result = store.get_or_create_session(source) + + assert result.session_id != "sid_stale" + # A fresh session row was created for the new session_id. + db.create_session.assert_called_once() + assert store._entries[key].session_id == result.session_id + + def test_live_entry_returned_unchanged(self, tmp_path): + """A session still alive in the DB is returned as-is (no churn).""" + source = _source() + db = _db_returning({"sid_live": {"end_reason": None, "id": "sid_live"}}) + store = _make_store_with_db(tmp_path, db) + key = store._generate_session_key(source) + store._entries[key] = _make_entry(key, "sid_live") + + result = store.get_or_create_session(source) + + assert result.session_id == "sid_live" + db.find_latest_gateway_session_for_peer.assert_not_called() + db.create_session.assert_not_called() + + def test_stale_check_wins_over_suspended(self, tmp_path): + """A stale entry that is ALSO suspended is still dropped via the stale + path — we must not consult the dead entry's reset/suspend state.""" + source = _source() + db = _db_returning({"sid_stale": {"end_reason": "agent_close", "id": "sid_stale"}}) + db.find_latest_gateway_session_for_peer.return_value = None # → fresh + store = _make_store_with_db(tmp_path, db) + key = store._generate_session_key(source) + store._entries[key] = _make_entry(key, "sid_stale", suspended=True) + + result = store.get_or_create_session(source) + + # Did not return the stale (suspended) entry; created a fresh session. + assert result.session_id != "sid_stale" + db.create_session.assert_called_once() + + def test_force_new_skips_stale_check(self, tmp_path): + """force_new short-circuits the whole existing-entry branch; the stale + DB lookup must not even run.""" + source = _source() + db = _db_returning({"sid_old": {"end_reason": "agent_close", "id": "sid_old"}}) + store = _make_store_with_db(tmp_path, db) + key = store._generate_session_key(source) + store._entries[key] = _make_entry(key, "sid_old") + + result = store.get_or_create_session(source, force_new=True) + + assert result.session_id != "sid_old" + db.get_session.assert_not_called()