mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): prune stale SessionStore entries to bound memory + disk (#11789)
SessionStore._entries grew unbounded. Every unique
(platform, chat_id, thread_id, user_id) tuple ever seen was kept in
RAM and rewritten to sessions.json on every message. A Discord bot
in 100 servers x 100 channels x ~100 rotating users accumulates on
the order of 10^5 entries after a few months; each sessions.json
write becomes an O(n) fsync. Nothing trimmed this — there was no
TTL, no cap, no eviction path.
Changes
-------
* SessionStore.prune_old_entries(max_age_days) — drops entries whose
updated_at is older than the cutoff. Preserves:
- suspended entries (user paused them via /stop for later resume)
- entries with an active background process attached
Pruning is functionally identical to a natural reset-policy expiry:
SQLite transcript stays, session_key -> session_id mapping dropped,
returning user gets a fresh session.
* GatewayConfig.session_store_max_age_days (default 90; 0 disables).
Serialized in to_dict/from_dict, coerced from bad types / negatives
to safe defaults. No migration needed — missing field -> 90 days.
* _session_expiry_watcher calls prune_old_entries once per hour
(first tick is immediate). Uses the existing watcher loop so no
new background task is created.
Why not more aggressive
-----------------------
90 days is long enough that legitimate long-idle users (seasonal,
vacation, etc.) aren't surprised — pruning just means they get a
fresh session on return, same outcome they'd get from any other
reset-policy trigger. Admins can lower it via config; 0 disables.
Tests
-----
tests/gateway/test_session_store_prune.py — 17 cases covering:
* entry age based on updated_at, not created_at
* max_age_days=0 disables; negative coerces to 0
* suspended + active-process entries are skipped
* _save fires iff something was removed
* disk JSON reflects post-prune state
* thread safety against concurrent readers
* config field roundtrips + graceful fallback on bad values
* watcher gate logic (first tick prunes, subsequent within 1h don't)
119 broader session/gateway tests remain green.
This commit is contained in:
parent
f362083c64
commit
eb07c05646
4 changed files with 361 additions and 0 deletions
|
|
@ -258,6 +258,13 @@ class GatewayConfig:
|
|||
# Streaming configuration
|
||||
streaming: StreamingConfig = field(default_factory=StreamingConfig)
|
||||
|
||||
# Session store pruning: drop SessionEntry records older than this many
|
||||
# days from the in-memory dict and sessions.json. Keeps the store from
|
||||
# growing unbounded in gateways serving many chats/threads/users over
|
||||
# months. Pruning is invisible to users — if they resume, they get a
|
||||
# fresh session exactly as if the reset policy had fired. 0 = disabled.
|
||||
session_store_max_age_days: int = 90
|
||||
|
||||
def get_connected_platforms(self) -> List[Platform]:
|
||||
"""Return list of platforms that are enabled and configured."""
|
||||
connected = []
|
||||
|
|
@ -365,6 +372,7 @@ class GatewayConfig:
|
|||
"thread_sessions_per_user": self.thread_sessions_per_user,
|
||||
"unauthorized_dm_behavior": self.unauthorized_dm_behavior,
|
||||
"streaming": self.streaming.to_dict(),
|
||||
"session_store_max_age_days": self.session_store_max_age_days,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
|
|
@ -412,6 +420,13 @@ class GatewayConfig:
|
|||
"pair",
|
||||
)
|
||||
|
||||
try:
|
||||
session_store_max_age_days = int(data.get("session_store_max_age_days", 90))
|
||||
if session_store_max_age_days < 0:
|
||||
session_store_max_age_days = 0
|
||||
except (TypeError, ValueError):
|
||||
session_store_max_age_days = 90
|
||||
|
||||
return cls(
|
||||
platforms=platforms,
|
||||
default_reset_policy=default_policy,
|
||||
|
|
@ -426,6 +441,7 @@ class GatewayConfig:
|
|||
thread_sessions_per_user=_coerce_bool(thread_sessions_per_user, False),
|
||||
unauthorized_dm_behavior=unauthorized_dm_behavior,
|
||||
streaming=StreamingConfig.from_dict(data.get("streaming", {})),
|
||||
session_store_max_age_days=session_store_max_age_days,
|
||||
)
|
||||
|
||||
def get_unauthorized_dm_behavior(self, platform: Optional[Platform] = None) -> str:
|
||||
|
|
|
|||
|
|
@ -2178,6 +2178,30 @@ class GatewayRunner:
|
|||
)
|
||||
except Exception as _e:
|
||||
logger.debug("Idle agent sweep failed: %s", _e)
|
||||
|
||||
# Periodically prune stale SessionStore entries. The
|
||||
# in-memory dict (and sessions.json) would otherwise grow
|
||||
# unbounded in gateways serving many rotating chats /
|
||||
# threads / users over long time windows. Pruning is
|
||||
# invisible to users — a resumed session just gets a
|
||||
# fresh session_id, exactly as if the reset policy fired.
|
||||
_last_prune_ts = getattr(self, "_last_session_store_prune_ts", 0.0)
|
||||
_prune_interval = 3600.0 # once per hour
|
||||
if time.time() - _last_prune_ts > _prune_interval:
|
||||
try:
|
||||
_max_age = int(
|
||||
getattr(self.config, "session_store_max_age_days", 0) or 0
|
||||
)
|
||||
if _max_age > 0:
|
||||
_pruned = self.session_store.prune_old_entries(_max_age)
|
||||
if _pruned:
|
||||
logger.info(
|
||||
"SessionStore prune: dropped %d stale entries",
|
||||
_pruned,
|
||||
)
|
||||
except Exception as _e:
|
||||
logger.debug("SessionStore prune failed: %s", _e)
|
||||
self._last_session_store_prune_ts = time.time()
|
||||
except Exception as e:
|
||||
logger.debug("Session expiry watcher error: %s", e)
|
||||
# Sleep in small increments so we can stop quickly
|
||||
|
|
|
|||
|
|
@ -802,6 +802,57 @@ class SessionStore:
|
|||
return True
|
||||
return False
|
||||
|
||||
def prune_old_entries(self, max_age_days: int) -> int:
|
||||
"""Drop SessionEntry records older than max_age_days.
|
||||
|
||||
Pruning is based on ``updated_at`` (last activity), not ``created_at``.
|
||||
A session that's been active within the window is kept regardless of
|
||||
how old it is. Entries marked ``suspended`` are kept — the user
|
||||
explicitly paused them for later resume. Entries held by an active
|
||||
process (via has_active_processes_fn) are also kept so long-running
|
||||
background work isn't orphaned.
|
||||
|
||||
Pruning is functionally identical to a natural reset-policy expiry:
|
||||
the transcript in SQLite stays, but the session_key → session_id
|
||||
mapping is dropped and the user starts a fresh session on return.
|
||||
|
||||
``max_age_days <= 0`` disables pruning; returns 0 immediately.
|
||||
Returns the number of entries removed.
|
||||
"""
|
||||
if max_age_days is None or max_age_days <= 0:
|
||||
return 0
|
||||
from datetime import timedelta
|
||||
|
||||
cutoff = _now() - timedelta(days=max_age_days)
|
||||
removed_keys: list[str] = []
|
||||
|
||||
with self._lock:
|
||||
self._ensure_loaded_locked()
|
||||
for key, entry in list(self._entries.items()):
|
||||
if entry.suspended:
|
||||
continue
|
||||
# Never prune sessions with an active background process
|
||||
# attached — the user may still be waiting on output.
|
||||
if self._has_active_processes_fn is not None:
|
||||
try:
|
||||
if self._has_active_processes_fn(entry.session_id):
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
if entry.updated_at < cutoff:
|
||||
removed_keys.append(key)
|
||||
for key in removed_keys:
|
||||
self._entries.pop(key, None)
|
||||
if removed_keys:
|
||||
self._save()
|
||||
|
||||
if removed_keys:
|
||||
logger.info(
|
||||
"SessionStore pruned %d entries older than %d days",
|
||||
len(removed_keys), max_age_days,
|
||||
)
|
||||
return len(removed_keys)
|
||||
|
||||
def suspend_recently_active(self, max_age_seconds: int = 120) -> int:
|
||||
"""Mark recently-active sessions as suspended.
|
||||
|
||||
|
|
|
|||
270
tests/gateway/test_session_store_prune.py
Normal file
270
tests/gateway/test_session_store_prune.py
Normal file
|
|
@ -0,0 +1,270 @@
|
|||
"""Tests for SessionStore.prune_old_entries and the gateway watcher that calls it.
|
||||
|
||||
The SessionStore in-memory dict (and its backing sessions.json) grew
|
||||
unbounded — every unique (platform, chat_id, thread_id, user_id) tuple
|
||||
ever seen was kept forever, regardless of how stale it became. These
|
||||
tests pin the prune behaviour:
|
||||
|
||||
* Entries older than max_age_days (by updated_at) are removed
|
||||
* Entries marked ``suspended`` are preserved (user-paused)
|
||||
* Entries with an active process attached are preserved
|
||||
* max_age_days <= 0 disables pruning entirely
|
||||
* sessions.json is rewritten with the post-prune dict
|
||||
* The ``updated_at`` field — not ``created_at`` — drives the decision
|
||||
(so a long-running-but-still-active session isn't pruned)
|
||||
"""
|
||||
|
||||
import json
|
||||
import threading
|
||||
from datetime import datetime, timedelta
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from gateway.config import GatewayConfig, Platform, SessionResetPolicy
|
||||
from gateway.session import SessionEntry, SessionStore
|
||||
|
||||
|
||||
def _make_store(tmp_path, max_age_days: int = 90, has_active_processes_fn=None):
|
||||
"""Build a SessionStore bypassing SQLite/disk-load side effects."""
|
||||
config = GatewayConfig(
|
||||
default_reset_policy=SessionResetPolicy(mode="none"),
|
||||
session_store_max_age_days=max_age_days,
|
||||
)
|
||||
with patch("gateway.session.SessionStore._ensure_loaded"):
|
||||
store = SessionStore(
|
||||
sessions_dir=tmp_path,
|
||||
config=config,
|
||||
has_active_processes_fn=has_active_processes_fn,
|
||||
)
|
||||
store._db = None
|
||||
store._loaded = True
|
||||
return store
|
||||
|
||||
|
||||
def _entry(key: str, age_days: float, *, suspended: bool = False,
|
||||
session_id: str | None = None) -> SessionEntry:
|
||||
now = datetime.now()
|
||||
return SessionEntry(
|
||||
session_key=key,
|
||||
session_id=session_id or f"sid_{key}",
|
||||
created_at=now - timedelta(days=age_days + 30), # arbitrary older
|
||||
updated_at=now - timedelta(days=age_days),
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
suspended=suspended,
|
||||
)
|
||||
|
||||
|
||||
class TestPruneBasics:
|
||||
def test_prune_removes_entries_past_max_age(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
store._entries["old"] = _entry("old", age_days=100)
|
||||
store._entries["fresh"] = _entry("fresh", age_days=5)
|
||||
|
||||
removed = store.prune_old_entries(max_age_days=90)
|
||||
|
||||
assert removed == 1
|
||||
assert "old" not in store._entries
|
||||
assert "fresh" in store._entries
|
||||
|
||||
def test_prune_uses_updated_at_not_created_at(self, tmp_path):
|
||||
"""A session created long ago but updated recently must be kept."""
|
||||
store = _make_store(tmp_path)
|
||||
now = datetime.now()
|
||||
entry = SessionEntry(
|
||||
session_key="long-lived",
|
||||
session_id="sid",
|
||||
created_at=now - timedelta(days=365), # ancient
|
||||
updated_at=now - timedelta(days=3), # but just chatted
|
||||
platform=Platform.TELEGRAM,
|
||||
chat_type="dm",
|
||||
)
|
||||
store._entries["long-lived"] = entry
|
||||
|
||||
removed = store.prune_old_entries(max_age_days=30)
|
||||
|
||||
assert removed == 0
|
||||
assert "long-lived" in store._entries
|
||||
|
||||
def test_prune_disabled_when_max_age_is_zero(self, tmp_path):
|
||||
store = _make_store(tmp_path, max_age_days=0)
|
||||
for i in range(5):
|
||||
store._entries[f"s{i}"] = _entry(f"s{i}", age_days=365)
|
||||
|
||||
assert store.prune_old_entries(0) == 0
|
||||
assert len(store._entries) == 5
|
||||
|
||||
def test_prune_disabled_when_max_age_is_negative(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
store._entries["s"] = _entry("s", age_days=365)
|
||||
|
||||
assert store.prune_old_entries(-1) == 0
|
||||
assert "s" in store._entries
|
||||
|
||||
def test_prune_skips_suspended_entries(self, tmp_path):
|
||||
"""/stop-suspended sessions must be kept for later resume."""
|
||||
store = _make_store(tmp_path)
|
||||
store._entries["suspended"] = _entry(
|
||||
"suspended", age_days=1000, suspended=True
|
||||
)
|
||||
store._entries["idle"] = _entry("idle", age_days=1000)
|
||||
|
||||
removed = store.prune_old_entries(max_age_days=90)
|
||||
|
||||
assert removed == 1
|
||||
assert "suspended" in store._entries
|
||||
assert "idle" not in store._entries
|
||||
|
||||
def test_prune_skips_entries_with_active_processes(self, tmp_path):
|
||||
"""Sessions with active bg processes aren't pruned even if old."""
|
||||
active_session_ids = {"sid_active"}
|
||||
|
||||
def _has_active(session_id: str) -> bool:
|
||||
return session_id in active_session_ids
|
||||
|
||||
store = _make_store(tmp_path, has_active_processes_fn=_has_active)
|
||||
store._entries["active"] = _entry(
|
||||
"active", age_days=1000, session_id="sid_active"
|
||||
)
|
||||
store._entries["idle"] = _entry(
|
||||
"idle", age_days=1000, session_id="sid_idle"
|
||||
)
|
||||
|
||||
removed = store.prune_old_entries(max_age_days=90)
|
||||
|
||||
assert removed == 1
|
||||
assert "active" in store._entries
|
||||
assert "idle" not in store._entries
|
||||
|
||||
def test_prune_does_not_write_disk_when_no_removals(self, tmp_path):
|
||||
"""If nothing is evictable, _save() should NOT be called."""
|
||||
store = _make_store(tmp_path)
|
||||
store._entries["fresh1"] = _entry("fresh1", age_days=1)
|
||||
store._entries["fresh2"] = _entry("fresh2", age_days=2)
|
||||
|
||||
save_calls = []
|
||||
store._save = lambda: save_calls.append(1)
|
||||
|
||||
assert store.prune_old_entries(max_age_days=90) == 0
|
||||
assert save_calls == []
|
||||
|
||||
def test_prune_writes_disk_after_removal(self, tmp_path):
|
||||
store = _make_store(tmp_path)
|
||||
store._entries["stale"] = _entry("stale", age_days=500)
|
||||
store._entries["fresh"] = _entry("fresh", age_days=1)
|
||||
|
||||
save_calls = []
|
||||
store._save = lambda: save_calls.append(1)
|
||||
|
||||
store.prune_old_entries(max_age_days=90)
|
||||
assert save_calls == [1]
|
||||
|
||||
def test_prune_is_thread_safe(self, tmp_path):
|
||||
"""Prune acquires _lock internally; concurrent update_session is safe."""
|
||||
store = _make_store(tmp_path)
|
||||
for i in range(20):
|
||||
age = 1000 if i % 2 == 0 else 1
|
||||
store._entries[f"s{i}"] = _entry(f"s{i}", age_days=age)
|
||||
|
||||
results = []
|
||||
|
||||
def _pruner():
|
||||
results.append(store.prune_old_entries(max_age_days=90))
|
||||
|
||||
def _reader():
|
||||
# Mimic a concurrent update_session reader iterating under lock.
|
||||
with store._lock:
|
||||
list(store._entries.keys())
|
||||
|
||||
threads = [threading.Thread(target=_pruner)]
|
||||
threads += [threading.Thread(target=_reader) for _ in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join(timeout=5)
|
||||
assert not t.is_alive()
|
||||
|
||||
# Exactly one pruner ran; removed exactly the 10 stale entries.
|
||||
assert results == [10]
|
||||
assert len(store._entries) == 10
|
||||
for i in range(20):
|
||||
if i % 2 == 1: # fresh
|
||||
assert f"s{i}" in store._entries
|
||||
|
||||
|
||||
class TestPrunePersistsToDisk:
|
||||
def test_prune_rewrites_sessions_json(self, tmp_path):
|
||||
"""After prune, sessions.json on disk reflects the new dict."""
|
||||
config = GatewayConfig(
|
||||
default_reset_policy=SessionResetPolicy(mode="none"),
|
||||
session_store_max_age_days=90,
|
||||
)
|
||||
store = SessionStore(sessions_dir=tmp_path, config=config)
|
||||
store._db = None
|
||||
# Force-populate without calling get_or_create to avoid DB side-effects
|
||||
store._entries["stale"] = _entry("stale", age_days=500)
|
||||
store._entries["fresh"] = _entry("fresh", age_days=1)
|
||||
store._loaded = True
|
||||
store._save()
|
||||
|
||||
# Verify pre-prune state on disk.
|
||||
saved_pre = json.loads((tmp_path / "sessions.json").read_text())
|
||||
assert set(saved_pre.keys()) == {"stale", "fresh"}
|
||||
|
||||
# Prune and check disk.
|
||||
store.prune_old_entries(max_age_days=90)
|
||||
saved_post = json.loads((tmp_path / "sessions.json").read_text())
|
||||
assert set(saved_post.keys()) == {"fresh"}
|
||||
|
||||
|
||||
class TestGatewayConfigSerialization:
|
||||
def test_session_store_max_age_days_defaults_to_90(self):
|
||||
cfg = GatewayConfig()
|
||||
assert cfg.session_store_max_age_days == 90
|
||||
|
||||
def test_session_store_max_age_days_roundtrips(self):
|
||||
cfg = GatewayConfig(session_store_max_age_days=30)
|
||||
restored = GatewayConfig.from_dict(cfg.to_dict())
|
||||
assert restored.session_store_max_age_days == 30
|
||||
|
||||
def test_session_store_max_age_days_missing_defaults_90(self):
|
||||
"""Loading an old config (pre-this-field) falls back to default."""
|
||||
restored = GatewayConfig.from_dict({})
|
||||
assert restored.session_store_max_age_days == 90
|
||||
|
||||
def test_session_store_max_age_days_negative_coerced_to_zero(self):
|
||||
"""A negative value (accidental or hostile) becomes 0 (disabled)."""
|
||||
restored = GatewayConfig.from_dict({"session_store_max_age_days": -5})
|
||||
assert restored.session_store_max_age_days == 0
|
||||
|
||||
def test_session_store_max_age_days_bad_type_falls_back(self):
|
||||
"""Non-int values fall back to the default, not a crash."""
|
||||
restored = GatewayConfig.from_dict({"session_store_max_age_days": "nope"})
|
||||
assert restored.session_store_max_age_days == 90
|
||||
|
||||
|
||||
class TestGatewayWatcherCallsPrune:
|
||||
"""The session_expiry_watcher should call prune_old_entries once per hour."""
|
||||
|
||||
def test_prune_gate_fires_on_first_tick(self):
|
||||
"""First watcher tick has _last_prune_ts=0, so the gate opens."""
|
||||
import time as _t
|
||||
|
||||
last_ts = 0.0
|
||||
prune_interval = 3600.0
|
||||
now = _t.time()
|
||||
|
||||
# Mirror the production gate check in _session_expiry_watcher.
|
||||
should_prune = (now - last_ts) > prune_interval
|
||||
assert should_prune is True
|
||||
|
||||
def test_prune_gate_suppresses_within_interval(self):
|
||||
import time as _t
|
||||
|
||||
last_ts = _t.time() - 600 # 10 minutes ago
|
||||
prune_interval = 3600.0
|
||||
now = _t.time()
|
||||
|
||||
should_prune = (now - last_ts) > prune_interval
|
||||
assert should_prune is False
|
||||
Loading…
Add table
Add a link
Reference in a new issue