diff --git a/gateway/config.py b/gateway/config.py index 1258e08990..799b151b7f 100644 --- a/gateway/config.py +++ b/gateway/config.py @@ -258,6 +258,13 @@ class GatewayConfig: # Streaming configuration streaming: StreamingConfig = field(default_factory=StreamingConfig) + # Session store pruning: drop SessionEntry records older than this many + # days from the in-memory dict and sessions.json. Keeps the store from + # growing unbounded in gateways serving many chats/threads/users over + # months. Pruning is invisible to users — if they resume, they get a + # fresh session exactly as if the reset policy had fired. 0 = disabled. + session_store_max_age_days: int = 90 + def get_connected_platforms(self) -> List[Platform]: """Return list of platforms that are enabled and configured.""" connected = [] @@ -365,6 +372,7 @@ class GatewayConfig: "thread_sessions_per_user": self.thread_sessions_per_user, "unauthorized_dm_behavior": self.unauthorized_dm_behavior, "streaming": self.streaming.to_dict(), + "session_store_max_age_days": self.session_store_max_age_days, } @classmethod @@ -412,6 +420,13 @@ class GatewayConfig: "pair", ) + try: + session_store_max_age_days = int(data.get("session_store_max_age_days", 90)) + if session_store_max_age_days < 0: + session_store_max_age_days = 0 + except (TypeError, ValueError): + session_store_max_age_days = 90 + return cls( platforms=platforms, default_reset_policy=default_policy, @@ -426,6 +441,7 @@ class GatewayConfig: thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False), unauthorized_dm_behavior=unauthorized_dm_behavior, streaming=StreamingConfig.from_dict(data.get("streaming", {})), + session_store_max_age_days=session_store_max_age_days, ) def get_unauthorized_dm_behavior(self, platform: Optional[Platform] = None) -> str: diff --git a/gateway/run.py b/gateway/run.py index ca07254268..e642d9df02 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -2178,6 +2178,30 @@ class GatewayRunner: ) except Exception as _e: logger.debug("Idle agent sweep failed: %s", _e) + + # Periodically prune stale SessionStore entries. The + # in-memory dict (and sessions.json) would otherwise grow + # unbounded in gateways serving many rotating chats / + # threads / users over long time windows. Pruning is + # invisible to users — a resumed session just gets a + # fresh session_id, exactly as if the reset policy fired. + _last_prune_ts = getattr(self, "_last_session_store_prune_ts", 0.0) + _prune_interval = 3600.0 # once per hour + if time.time() - _last_prune_ts > _prune_interval: + try: + _max_age = int( + getattr(self.config, "session_store_max_age_days", 0) or 0 + ) + if _max_age > 0: + _pruned = self.session_store.prune_old_entries(_max_age) + if _pruned: + logger.info( + "SessionStore prune: dropped %d stale entries", + _pruned, + ) + except Exception as _e: + logger.debug("SessionStore prune failed: %s", _e) + self._last_session_store_prune_ts = time.time() except Exception as e: logger.debug("Session expiry watcher error: %s", e) # Sleep in small increments so we can stop quickly diff --git a/gateway/session.py b/gateway/session.py index f057d1cfc0..4cb623128c 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -802,6 +802,57 @@ class SessionStore: return True return False + def prune_old_entries(self, max_age_days: int) -> int: + """Drop SessionEntry records older than max_age_days. + + Pruning is based on ``updated_at`` (last activity), not ``created_at``. + A session that's been active within the window is kept regardless of + how old it is. Entries marked ``suspended`` are kept — the user + explicitly paused them for later resume. Entries held by an active + process (via has_active_processes_fn) are also kept so long-running + background work isn't orphaned. + + Pruning is functionally identical to a natural reset-policy expiry: + the transcript in SQLite stays, but the session_key → session_id + mapping is dropped and the user starts a fresh session on return. + + ``max_age_days <= 0`` disables pruning; returns 0 immediately. + Returns the number of entries removed. + """ + if max_age_days is None or max_age_days <= 0: + return 0 + from datetime import timedelta + + cutoff = _now() - timedelta(days=max_age_days) + removed_keys: list[str] = [] + + with self._lock: + self._ensure_loaded_locked() + for key, entry in list(self._entries.items()): + if entry.suspended: + continue + # Never prune sessions with an active background process + # attached — the user may still be waiting on output. + if self._has_active_processes_fn is not None: + try: + if self._has_active_processes_fn(entry.session_id): + continue + except Exception: + pass + if entry.updated_at < cutoff: + removed_keys.append(key) + for key in removed_keys: + self._entries.pop(key, None) + if removed_keys: + self._save() + + if removed_keys: + logger.info( + "SessionStore pruned %d entries older than %d days", + len(removed_keys), max_age_days, + ) + return len(removed_keys) + def suspend_recently_active(self, max_age_seconds: int = 120) -> int: """Mark recently-active sessions as suspended. diff --git a/tests/gateway/test_session_store_prune.py b/tests/gateway/test_session_store_prune.py new file mode 100644 index 0000000000..9b1dca2971 --- /dev/null +++ b/tests/gateway/test_session_store_prune.py @@ -0,0 +1,270 @@ +"""Tests for SessionStore.prune_old_entries and the gateway watcher that calls it. + +The SessionStore in-memory dict (and its backing sessions.json) grew +unbounded — every unique (platform, chat_id, thread_id, user_id) tuple +ever seen was kept forever, regardless of how stale it became. These +tests pin the prune behaviour: + + * Entries older than max_age_days (by updated_at) are removed + * Entries marked ``suspended`` are preserved (user-paused) + * Entries with an active process attached are preserved + * max_age_days <= 0 disables pruning entirely + * sessions.json is rewritten with the post-prune dict + * The ``updated_at`` field — not ``created_at`` — drives the decision + (so a long-running-but-still-active session isn't pruned) +""" + +import json +import threading +from datetime import datetime, timedelta +from unittest.mock import patch + +import pytest + +from gateway.config import GatewayConfig, Platform, SessionResetPolicy +from gateway.session import SessionEntry, SessionStore + + +def _make_store(tmp_path, max_age_days: int = 90, has_active_processes_fn=None): + """Build a SessionStore bypassing SQLite/disk-load side effects.""" + config = GatewayConfig( + default_reset_policy=SessionResetPolicy(mode="none"), + session_store_max_age_days=max_age_days, + ) + with patch("gateway.session.SessionStore._ensure_loaded"): + store = SessionStore( + sessions_dir=tmp_path, + config=config, + has_active_processes_fn=has_active_processes_fn, + ) + store._db = None + store._loaded = True + return store + + +def _entry(key: str, age_days: float, *, suspended: bool = False, + session_id: str | None = None) -> SessionEntry: + now = datetime.now() + return SessionEntry( + session_key=key, + session_id=session_id or f"sid_{key}", + created_at=now - timedelta(days=age_days + 30), # arbitrary older + updated_at=now - timedelta(days=age_days), + platform=Platform.TELEGRAM, + chat_type="dm", + suspended=suspended, + ) + + +class TestPruneBasics: + def test_prune_removes_entries_past_max_age(self, tmp_path): + store = _make_store(tmp_path) + store._entries["old"] = _entry("old", age_days=100) + store._entries["fresh"] = _entry("fresh", age_days=5) + + removed = store.prune_old_entries(max_age_days=90) + + assert removed == 1 + assert "old" not in store._entries + assert "fresh" in store._entries + + def test_prune_uses_updated_at_not_created_at(self, tmp_path): + """A session created long ago but updated recently must be kept.""" + store = _make_store(tmp_path) + now = datetime.now() + entry = SessionEntry( + session_key="long-lived", + session_id="sid", + created_at=now - timedelta(days=365), # ancient + updated_at=now - timedelta(days=3), # but just chatted + platform=Platform.TELEGRAM, + chat_type="dm", + ) + store._entries["long-lived"] = entry + + removed = store.prune_old_entries(max_age_days=30) + + assert removed == 0 + assert "long-lived" in store._entries + + def test_prune_disabled_when_max_age_is_zero(self, tmp_path): + store = _make_store(tmp_path, max_age_days=0) + for i in range(5): + store._entries[f"s{i}"] = _entry(f"s{i}", age_days=365) + + assert store.prune_old_entries(0) == 0 + assert len(store._entries) == 5 + + def test_prune_disabled_when_max_age_is_negative(self, tmp_path): + store = _make_store(tmp_path) + store._entries["s"] = _entry("s", age_days=365) + + assert store.prune_old_entries(-1) == 0 + assert "s" in store._entries + + def test_prune_skips_suspended_entries(self, tmp_path): + """/stop-suspended sessions must be kept for later resume.""" + store = _make_store(tmp_path) + store._entries["suspended"] = _entry( + "suspended", age_days=1000, suspended=True + ) + store._entries["idle"] = _entry("idle", age_days=1000) + + removed = store.prune_old_entries(max_age_days=90) + + assert removed == 1 + assert "suspended" in store._entries + assert "idle" not in store._entries + + def test_prune_skips_entries_with_active_processes(self, tmp_path): + """Sessions with active bg processes aren't pruned even if old.""" + active_session_ids = {"sid_active"} + + def _has_active(session_id: str) -> bool: + return session_id in active_session_ids + + store = _make_store(tmp_path, has_active_processes_fn=_has_active) + store._entries["active"] = _entry( + "active", age_days=1000, session_id="sid_active" + ) + store._entries["idle"] = _entry( + "idle", age_days=1000, session_id="sid_idle" + ) + + removed = store.prune_old_entries(max_age_days=90) + + assert removed == 1 + assert "active" in store._entries + assert "idle" not in store._entries + + def test_prune_does_not_write_disk_when_no_removals(self, tmp_path): + """If nothing is evictable, _save() should NOT be called.""" + store = _make_store(tmp_path) + store._entries["fresh1"] = _entry("fresh1", age_days=1) + store._entries["fresh2"] = _entry("fresh2", age_days=2) + + save_calls = [] + store._save = lambda: save_calls.append(1) + + assert store.prune_old_entries(max_age_days=90) == 0 + assert save_calls == [] + + def test_prune_writes_disk_after_removal(self, tmp_path): + store = _make_store(tmp_path) + store._entries["stale"] = _entry("stale", age_days=500) + store._entries["fresh"] = _entry("fresh", age_days=1) + + save_calls = [] + store._save = lambda: save_calls.append(1) + + store.prune_old_entries(max_age_days=90) + assert save_calls == [1] + + def test_prune_is_thread_safe(self, tmp_path): + """Prune acquires _lock internally; concurrent update_session is safe.""" + store = _make_store(tmp_path) + for i in range(20): + age = 1000 if i % 2 == 0 else 1 + store._entries[f"s{i}"] = _entry(f"s{i}", age_days=age) + + results = [] + + def _pruner(): + results.append(store.prune_old_entries(max_age_days=90)) + + def _reader(): + # Mimic a concurrent update_session reader iterating under lock. + with store._lock: + list(store._entries.keys()) + + threads = [threading.Thread(target=_pruner)] + threads += [threading.Thread(target=_reader) for _ in range(4)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=5) + assert not t.is_alive() + + # Exactly one pruner ran; removed exactly the 10 stale entries. + assert results == [10] + assert len(store._entries) == 10 + for i in range(20): + if i % 2 == 1: # fresh + assert f"s{i}" in store._entries + + +class TestPrunePersistsToDisk: + def test_prune_rewrites_sessions_json(self, tmp_path): + """After prune, sessions.json on disk reflects the new dict.""" + config = GatewayConfig( + default_reset_policy=SessionResetPolicy(mode="none"), + session_store_max_age_days=90, + ) + store = SessionStore(sessions_dir=tmp_path, config=config) + store._db = None + # Force-populate without calling get_or_create to avoid DB side-effects + store._entries["stale"] = _entry("stale", age_days=500) + store._entries["fresh"] = _entry("fresh", age_days=1) + store._loaded = True + store._save() + + # Verify pre-prune state on disk. + saved_pre = json.loads((tmp_path / "sessions.json").read_text()) + assert set(saved_pre.keys()) == {"stale", "fresh"} + + # Prune and check disk. + store.prune_old_entries(max_age_days=90) + saved_post = json.loads((tmp_path / "sessions.json").read_text()) + assert set(saved_post.keys()) == {"fresh"} + + +class TestGatewayConfigSerialization: + def test_session_store_max_age_days_defaults_to_90(self): + cfg = GatewayConfig() + assert cfg.session_store_max_age_days == 90 + + def test_session_store_max_age_days_roundtrips(self): + cfg = GatewayConfig(session_store_max_age_days=30) + restored = GatewayConfig.from_dict(cfg.to_dict()) + assert restored.session_store_max_age_days == 30 + + def test_session_store_max_age_days_missing_defaults_90(self): + """Loading an old config (pre-this-field) falls back to default.""" + restored = GatewayConfig.from_dict({}) + assert restored.session_store_max_age_days == 90 + + def test_session_store_max_age_days_negative_coerced_to_zero(self): + """A negative value (accidental or hostile) becomes 0 (disabled).""" + restored = GatewayConfig.from_dict({"session_store_max_age_days": -5}) + assert restored.session_store_max_age_days == 0 + + def test_session_store_max_age_days_bad_type_falls_back(self): + """Non-int values fall back to the default, not a crash.""" + restored = GatewayConfig.from_dict({"session_store_max_age_days": "nope"}) + assert restored.session_store_max_age_days == 90 + + +class TestGatewayWatcherCallsPrune: + """The session_expiry_watcher should call prune_old_entries once per hour.""" + + def test_prune_gate_fires_on_first_tick(self): + """First watcher tick has _last_prune_ts=0, so the gate opens.""" + import time as _t + + last_ts = 0.0 + prune_interval = 3600.0 + now = _t.time() + + # Mirror the production gate check in _session_expiry_watcher. + should_prune = (now - last_ts) > prune_interval + assert should_prune is True + + def test_prune_gate_suppresses_within_interval(self): + import time as _t + + last_ts = _t.time() - 600 # 10 minutes ago + prune_interval = 3600.0 + now = _t.time() + + should_prune = (now - last_ts) > prune_interval + assert should_prune is False