hermes-agent/tests/gateway/test_session_store_prune.py
Teknium eb07c05646
fix(gateway): prune stale SessionStore entries to bound memory + disk (#11789)
SessionStore._entries grew unbounded.  Every unique
(platform, chat_id, thread_id, user_id) tuple ever seen was kept in
RAM and rewritten to sessions.json on every message.  A Discord bot
in 100 servers x 100 channels x ~100 rotating users accumulates on
the order of 10^5 entries after a few months; each sessions.json
write becomes an O(n) fsync.  Nothing trimmed this — there was no
TTL, no cap, no eviction path.

Changes
-------
* SessionStore.prune_old_entries(max_age_days) — drops entries whose
  updated_at is older than the cutoff.  Preserves:
    - suspended entries (user paused them via /stop for later resume)
    - entries with an active background process attached
  Pruning is functionally identical to a natural reset-policy expiry:
  SQLite transcript stays, session_key -> session_id mapping dropped,
  returning user gets a fresh session.

* GatewayConfig.session_store_max_age_days (default 90; 0 disables).
  Serialized in to_dict/from_dict, coerced from bad types / negatives
  to safe defaults.  No migration needed — missing field -> 90 days.

* _session_expiry_watcher calls prune_old_entries once per hour
  (first tick is immediate).  Uses the existing watcher loop so no
  new background task is created.

Why not more aggressive
-----------------------
90 days is long enough that legitimate long-idle users (seasonal,
vacation, etc.) aren't surprised — pruning just means they get a
fresh session on return, same outcome they'd get from any other
reset-policy trigger.  Admins can lower it via config; 0 disables.

Tests
-----
tests/gateway/test_session_store_prune.py — 17 cases covering:
  * entry age based on updated_at, not created_at
  * max_age_days=0 disables; negative coerces to 0
  * suspended + active-process entries are skipped
  * _save fires iff something was removed
  * disk JSON reflects post-prune state
  * thread safety against concurrent readers
  * config field roundtrips + graceful fallback on bad values
  * watcher gate logic (first tick prunes, subsequent within 1h don't)

119 broader session/gateway tests remain green.
2026-04-17 13:48:49 -07:00

270 lines
9.9 KiB
Python

"""Tests for SessionStore.prune_old_entries and the gateway watcher that calls it.
The SessionStore in-memory dict (and its backing sessions.json) grew
unbounded — every unique (platform, chat_id, thread_id, user_id) tuple
ever seen was kept forever, regardless of how stale it became. These
tests pin the prune behaviour:
* Entries older than max_age_days (by updated_at) are removed
* Entries marked ``suspended`` are preserved (user-paused)
* Entries with an active process attached are preserved
* max_age_days <= 0 disables pruning entirely
* sessions.json is rewritten with the post-prune dict
* The ``updated_at`` field — not ``created_at`` — drives the decision
(so a long-running-but-still-active session isn't pruned)
"""
import json
import threading
from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
from gateway.config import GatewayConfig, Platform, SessionResetPolicy
from gateway.session import SessionEntry, SessionStore
def _make_store(tmp_path, max_age_days: int = 90, has_active_processes_fn=None):
"""Build a SessionStore bypassing SQLite/disk-load side effects."""
config = GatewayConfig(
default_reset_policy=SessionResetPolicy(mode="none"),
session_store_max_age_days=max_age_days,
)
with patch("gateway.session.SessionStore._ensure_loaded"):
store = SessionStore(
sessions_dir=tmp_path,
config=config,
has_active_processes_fn=has_active_processes_fn,
)
store._db = None
store._loaded = True
return store
def _entry(key: str, age_days: float, *, suspended: bool = False,
session_id: str | None = None) -> SessionEntry:
now = datetime.now()
return SessionEntry(
session_key=key,
session_id=session_id or f"sid_{key}",
created_at=now - timedelta(days=age_days + 30), # arbitrary older
updated_at=now - timedelta(days=age_days),
platform=Platform.TELEGRAM,
chat_type="dm",
suspended=suspended,
)
class TestPruneBasics:
def test_prune_removes_entries_past_max_age(self, tmp_path):
store = _make_store(tmp_path)
store._entries["old"] = _entry("old", age_days=100)
store._entries["fresh"] = _entry("fresh", age_days=5)
removed = store.prune_old_entries(max_age_days=90)
assert removed == 1
assert "old" not in store._entries
assert "fresh" in store._entries
def test_prune_uses_updated_at_not_created_at(self, tmp_path):
"""A session created long ago but updated recently must be kept."""
store = _make_store(tmp_path)
now = datetime.now()
entry = SessionEntry(
session_key="long-lived",
session_id="sid",
created_at=now - timedelta(days=365), # ancient
updated_at=now - timedelta(days=3), # but just chatted
platform=Platform.TELEGRAM,
chat_type="dm",
)
store._entries["long-lived"] = entry
removed = store.prune_old_entries(max_age_days=30)
assert removed == 0
assert "long-lived" in store._entries
def test_prune_disabled_when_max_age_is_zero(self, tmp_path):
store = _make_store(tmp_path, max_age_days=0)
for i in range(5):
store._entries[f"s{i}"] = _entry(f"s{i}", age_days=365)
assert store.prune_old_entries(0) == 0
assert len(store._entries) == 5
def test_prune_disabled_when_max_age_is_negative(self, tmp_path):
store = _make_store(tmp_path)
store._entries["s"] = _entry("s", age_days=365)
assert store.prune_old_entries(-1) == 0
assert "s" in store._entries
def test_prune_skips_suspended_entries(self, tmp_path):
"""/stop-suspended sessions must be kept for later resume."""
store = _make_store(tmp_path)
store._entries["suspended"] = _entry(
"suspended", age_days=1000, suspended=True
)
store._entries["idle"] = _entry("idle", age_days=1000)
removed = store.prune_old_entries(max_age_days=90)
assert removed == 1
assert "suspended" in store._entries
assert "idle" not in store._entries
def test_prune_skips_entries_with_active_processes(self, tmp_path):
"""Sessions with active bg processes aren't pruned even if old."""
active_session_ids = {"sid_active"}
def _has_active(session_id: str) -> bool:
return session_id in active_session_ids
store = _make_store(tmp_path, has_active_processes_fn=_has_active)
store._entries["active"] = _entry(
"active", age_days=1000, session_id="sid_active"
)
store._entries["idle"] = _entry(
"idle", age_days=1000, session_id="sid_idle"
)
removed = store.prune_old_entries(max_age_days=90)
assert removed == 1
assert "active" in store._entries
assert "idle" not in store._entries
def test_prune_does_not_write_disk_when_no_removals(self, tmp_path):
"""If nothing is evictable, _save() should NOT be called."""
store = _make_store(tmp_path)
store._entries["fresh1"] = _entry("fresh1", age_days=1)
store._entries["fresh2"] = _entry("fresh2", age_days=2)
save_calls = []
store._save = lambda: save_calls.append(1)
assert store.prune_old_entries(max_age_days=90) == 0
assert save_calls == []
def test_prune_writes_disk_after_removal(self, tmp_path):
store = _make_store(tmp_path)
store._entries["stale"] = _entry("stale", age_days=500)
store._entries["fresh"] = _entry("fresh", age_days=1)
save_calls = []
store._save = lambda: save_calls.append(1)
store.prune_old_entries(max_age_days=90)
assert save_calls == [1]
def test_prune_is_thread_safe(self, tmp_path):
"""Prune acquires _lock internally; concurrent update_session is safe."""
store = _make_store(tmp_path)
for i in range(20):
age = 1000 if i % 2 == 0 else 1
store._entries[f"s{i}"] = _entry(f"s{i}", age_days=age)
results = []
def _pruner():
results.append(store.prune_old_entries(max_age_days=90))
def _reader():
# Mimic a concurrent update_session reader iterating under lock.
with store._lock:
list(store._entries.keys())
threads = [threading.Thread(target=_pruner)]
threads += [threading.Thread(target=_reader) for _ in range(4)]
for t in threads:
t.start()
for t in threads:
t.join(timeout=5)
assert not t.is_alive()
# Exactly one pruner ran; removed exactly the 10 stale entries.
assert results == [10]
assert len(store._entries) == 10
for i in range(20):
if i % 2 == 1: # fresh
assert f"s{i}" in store._entries
class TestPrunePersistsToDisk:
def test_prune_rewrites_sessions_json(self, tmp_path):
"""After prune, sessions.json on disk reflects the new dict."""
config = GatewayConfig(
default_reset_policy=SessionResetPolicy(mode="none"),
session_store_max_age_days=90,
)
store = SessionStore(sessions_dir=tmp_path, config=config)
store._db = None
# Force-populate without calling get_or_create to avoid DB side-effects
store._entries["stale"] = _entry("stale", age_days=500)
store._entries["fresh"] = _entry("fresh", age_days=1)
store._loaded = True
store._save()
# Verify pre-prune state on disk.
saved_pre = json.loads((tmp_path / "sessions.json").read_text())
assert set(saved_pre.keys()) == {"stale", "fresh"}
# Prune and check disk.
store.prune_old_entries(max_age_days=90)
saved_post = json.loads((tmp_path / "sessions.json").read_text())
assert set(saved_post.keys()) == {"fresh"}
class TestGatewayConfigSerialization:
def test_session_store_max_age_days_defaults_to_90(self):
cfg = GatewayConfig()
assert cfg.session_store_max_age_days == 90
def test_session_store_max_age_days_roundtrips(self):
cfg = GatewayConfig(session_store_max_age_days=30)
restored = GatewayConfig.from_dict(cfg.to_dict())
assert restored.session_store_max_age_days == 30
def test_session_store_max_age_days_missing_defaults_90(self):
"""Loading an old config (pre-this-field) falls back to default."""
restored = GatewayConfig.from_dict({})
assert restored.session_store_max_age_days == 90
def test_session_store_max_age_days_negative_coerced_to_zero(self):
"""A negative value (accidental or hostile) becomes 0 (disabled)."""
restored = GatewayConfig.from_dict({"session_store_max_age_days": -5})
assert restored.session_store_max_age_days == 0
def test_session_store_max_age_days_bad_type_falls_back(self):
"""Non-int values fall back to the default, not a crash."""
restored = GatewayConfig.from_dict({"session_store_max_age_days": "nope"})
assert restored.session_store_max_age_days == 90
class TestGatewayWatcherCallsPrune:
"""The session_expiry_watcher should call prune_old_entries once per hour."""
def test_prune_gate_fires_on_first_tick(self):
"""First watcher tick has _last_prune_ts=0, so the gate opens."""
import time as _t
last_ts = 0.0
prune_interval = 3600.0
now = _t.time()
# Mirror the production gate check in _session_expiry_watcher.
should_prune = (now - last_ts) > prune_interval
assert should_prune is True
def test_prune_gate_suppresses_within_interval(self):
import time as _t
last_ts = _t.time() - 600 # 10 minutes ago
prune_interval = 3600.0
now = _t.time()
should_prune = (now - last_ts) > prune_interval
assert should_prune is False