refactor: consolidate gateway session metadata into state.db

Move gateway routing metadata (session_key, platform, chat_type, origin,
display_name, memory_flushed) from sessions.json into the sessions table
in state.db. This eliminates the dual-file dependency that caused the
mcp_serve polling bug (#8925) and makes state.db the single source of
truth for session discovery.

Changes:
- Schema v7 migration: add 6 new columns to sessions table with backfill
  from existing sessions.json during migration
- New SessionDB methods: set_gateway_metadata(), list_gateway_sessions(),
  find_session_by_origin(), set_memory_flushed()
- Gateway session.py: write routing metadata to state.db on every
  session create/reset/switch
- Rewire all consumers (mcp_serve, mirror, channel_directory, status)
  to query state.db first with sessions.json fallback for pre-migration
  databases
- mcp_serve _poll_once: simplified to watch only state.db mtime (fixes
  the split-mtime bug from #8925 as a side effect)

sessions.json continues to be written by the gateway for now but is no
longer read as the primary source by any consumer. Can be made optional
in a future PR.
This commit is contained in:
Teknium 2026-04-13 05:07:35 -07:00
parent e77f135ed8
commit 14a5e56f6f
No known key found for this signature in database
8 changed files with 347 additions and 28 deletions

View file

@ -145,12 +145,49 @@ def _build_slack(adapter) -> List[Dict[str, str]]:
def _build_from_sessions(platform_name: str) -> List[Dict[str, str]]: def _build_from_sessions(platform_name: str) -> List[Dict[str, str]]:
"""Pull known channels/contacts from sessions.json origin data.""" """Pull known channels/contacts from state.db gateway metadata.
Falls back to sessions.json for pre-migration databases.
"""
entries = []
# Primary: query state.db
try:
from hermes_state import SessionDB
db = SessionDB()
try:
rows = db.list_gateway_sessions(platform=platform_name)
finally:
db.close()
if rows:
seen_ids = set()
for row in rows:
origin_json = row.get("origin_json")
if not origin_json:
continue
try:
origin = json.loads(origin_json)
except (json.JSONDecodeError, TypeError):
continue
entry_id = _session_entry_id(origin)
if not entry_id or entry_id in seen_ids:
continue
seen_ids.add(entry_id)
entries.append({
"id": entry_id,
"name": _session_entry_name(origin),
"type": row.get("chat_type", "dm"),
"thread_id": origin.get("thread_id"),
})
return entries
except Exception as e:
logger.debug("Channel directory: state.db lookup failed, falling back: %s", e)
# Fallback: read sessions.json
sessions_path = get_hermes_home() / "sessions" / "sessions.json" sessions_path = get_hermes_home() / "sessions" / "sessions.json"
if not sessions_path.exists(): if not sessions_path.exists():
return [] return []
entries = []
try: try:
with open(sessions_path, encoding="utf-8") as f: with open(sessions_path, encoding="utf-8") as f:
data = json.load(f) data = json.load(f)

View file

@ -67,10 +67,23 @@ def _find_session_id(platform: str, chat_id: str, thread_id: Optional[str] = Non
""" """
Find the active session_id for a platform + chat_id pair. Find the active session_id for a platform + chat_id pair.
Scans sessions.json entries and matches where origin.chat_id == chat_id Queries state.db for matching sessions. Falls back to sessions.json
on the right platform. DM session keys don't embed the chat_id for pre-migration databases.
(e.g. "agent:main:telegram:dm"), so we check the origin dict.
""" """
# Primary: query state.db
try:
from hermes_state import SessionDB
db = SessionDB()
try:
row = db.find_session_by_origin(platform, chat_id, thread_id=thread_id)
finally:
db.close()
if row:
return row.get("id")
except Exception as e:
logger.debug("Mirror: state.db lookup failed, falling back to sessions.json: %s", e)
# Fallback: read sessions.json
if not _SESSIONS_INDEX.exists(): if not _SESSIONS_INDEX.exists():
return None return None

View file

@ -1819,6 +1819,11 @@ class GatewayRunner:
with self.session_store._lock: with self.session_store._lock:
entry.memory_flushed = True entry.memory_flushed = True
self.session_store._save() self.session_store._save()
if self._session_db:
try:
self._session_db.set_memory_flushed(entry.session_id)
except Exception:
pass
logger.debug( logger.debug(
"Memory flush completed for session %s", "Memory flush completed for session %s",
entry.session_id, entry.session_id,
@ -1836,6 +1841,11 @@ class GatewayRunner:
with self.session_store._lock: with self.session_store._lock:
entry.memory_flushed = True entry.memory_flushed = True
self.session_store._save() self.session_store._save()
if self._session_db:
try:
self._session_db.set_memory_flushed(entry.session_id)
except Exception:
pass
_flush_failures.pop(entry.session_id, None) _flush_failures.pop(entry.session_id, None)
else: else:
logger.debug( logger.debug(

View file

@ -764,6 +764,19 @@ class SessionStore:
self._db.create_session(**db_create_kwargs) self._db.create_session(**db_create_kwargs)
except Exception as e: except Exception as e:
print(f"[gateway] Warning: Failed to create SQLite session: {e}") print(f"[gateway] Warning: Failed to create SQLite session: {e}")
# Write gateway routing metadata to state.db so it can serve
# as the single source of truth (replacing sessions.json reads).
try:
self._db.set_gateway_metadata(
session_id=entry.session_id,
session_key=entry.session_key,
platform=entry.platform.value if entry.platform else None,
chat_type=entry.chat_type,
origin_json=json.dumps(entry.origin.to_dict()) if entry.origin else None,
display_name=entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return entry return entry
@ -869,6 +882,17 @@ class SessionStore:
self._db.create_session(**db_create_kwargs) self._db.create_session(**db_create_kwargs)
except Exception as e: except Exception as e:
logger.debug("Session DB operation failed: %s", e) logger.debug("Session DB operation failed: %s", e)
try:
self._db.set_gateway_metadata(
session_id=new_entry.session_id,
session_key=new_entry.session_key,
platform=new_entry.platform.value if new_entry.platform else None,
chat_type=new_entry.chat_type,
origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None,
display_name=new_entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return new_entry return new_entry
@ -918,6 +942,20 @@ class SessionStore:
except Exception as e: except Exception as e:
logger.debug("Session DB end_session failed: %s", e) logger.debug("Session DB end_session failed: %s", e)
# Update gateway metadata on the target session
if self._db and new_entry:
try:
self._db.set_gateway_metadata(
session_id=new_entry.session_id,
session_key=new_entry.session_key,
platform=new_entry.platform.value if new_entry.platform else None,
chat_type=new_entry.chat_type,
origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None,
display_name=new_entry.display_name,
)
except Exception as e:
logger.debug("Failed to write gateway metadata to state.db: %s", e)
return new_entry return new_entry
def list_sessions(self, active_minutes: Optional[int] = None) -> List[SessionEntry]: def list_sessions(self, active_minutes: Optional[int] = None) -> List[SessionEntry]:

View file

@ -421,7 +421,21 @@ def show_status(args):
print(color("◆ Sessions", Colors.CYAN, Colors.BOLD)) print(color("◆ Sessions", Colors.CYAN, Colors.BOLD))
sessions_file = get_hermes_home() / "sessions" / "sessions.json" sessions_file = get_hermes_home() / "sessions" / "sessions.json"
if sessions_file.exists(): # Primary: count gateway sessions from state.db
_session_count_shown = False
try:
from hermes_state import SessionDB
_sdb = SessionDB()
try:
_gw_sessions = _sdb.list_gateway_sessions()
finally:
_sdb.close()
print(f" Active: {len(_gw_sessions)} session(s)")
_session_count_shown = True
except Exception:
pass
# Fallback: sessions.json
if not _session_count_shown and sessions_file.exists():
import json import json
try: try:
with open(sessions_file, encoding="utf-8") as f: with open(sessions_file, encoding="utf-8") as f:

View file

@ -31,7 +31,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db" DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 6 SCHEMA_VERSION = 7
SCHEMA_SQL = """ SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version ( CREATE TABLE IF NOT EXISTS schema_version (
@ -65,6 +65,12 @@ CREATE TABLE IF NOT EXISTS sessions (
cost_source TEXT, cost_source TEXT,
pricing_version TEXT, pricing_version TEXT,
title TEXT, title TEXT,
session_key TEXT,
platform TEXT,
chat_type TEXT,
origin_json TEXT,
display_name TEXT,
memory_flushed INTEGER DEFAULT 0,
FOREIGN KEY (parent_session_id) REFERENCES sessions(id) FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
); );
@ -329,6 +335,36 @@ class SessionDB:
except sqlite3.OperationalError: except sqlite3.OperationalError:
pass # Column already exists pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6") cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: add gateway routing metadata columns to sessions table.
# These columns allow state.db to serve as the single source of
# truth for session discovery, replacing sessions.json reads.
for col_name, col_type in [
("session_key", "TEXT"),
("platform", "TEXT"),
("chat_type", "TEXT"),
("origin_json", "TEXT"),
("display_name", "TEXT"),
("memory_flushed", "INTEGER DEFAULT 0"),
]:
try:
safe = col_name.replace('"', '""')
cursor.execute(
f'ALTER TABLE sessions ADD COLUMN "{safe}" {col_type}'
)
except sqlite3.OperationalError:
pass # Column already exists
# Create index on session_key for fast lookups
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_session_key "
"ON sessions(session_key)"
)
except sqlite3.OperationalError:
pass
# Backfill from sessions.json if it exists
self._backfill_gateway_metadata_v7(cursor)
cursor.execute("UPDATE schema_version SET version = 7")
# Unique title index — always ensure it exists (safe to run after migrations # Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point) # since the title column is guaranteed to exist at this point)
@ -340,6 +376,15 @@ class SessionDB:
except sqlite3.OperationalError: except sqlite3.OperationalError:
pass # Index already exists pass # Index already exists
# session_key index for gateway metadata lookups
try:
cursor.execute(
"CREATE INDEX IF NOT EXISTS idx_sessions_session_key "
"ON sessions(session_key)"
)
except sqlite3.OperationalError:
pass
# FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably) # FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably)
try: try:
cursor.execute("SELECT * FROM messages_fts LIMIT 0") cursor.execute("SELECT * FROM messages_fts LIMIT 0")
@ -348,6 +393,37 @@ class SessionDB:
self._conn.commit() self._conn.commit()
def _backfill_gateway_metadata_v7(self, cursor):
    """Backfill gateway routing metadata from sessions.json during v7 migration."""
    try:
        sessions_file = get_hermes_home() / "sessions" / "sessions.json"
        if not sessions_file.exists():
            return
        with open(sessions_file, "r", encoding="utf-8") as fh:
            index = json.load(fh)
        for key, record in index.items():
            sid = record.get("session_id", "")
            if not sid:
                continue
            origin = record.get("origin")
            row_values = (
                record.get("session_key", key),
                record.get("platform", ""),
                record.get("chat_type", "dm"),
                json.dumps(origin) if origin else None,
                record.get("display_name"),
                1 if record.get("memory_flushed") else 0,
                sid,
            )
            # Guard on session_key IS NULL so re-running the migration
            # never clobbers metadata already written by the gateway.
            cursor.execute(
                """UPDATE sessions SET
                    session_key = ?, platform = ?, chat_type = ?,
                    origin_json = ?, display_name = ?, memory_flushed = ?
                    WHERE id = ? AND session_key IS NULL""",
                row_values,
            )
    except Exception as e:
        logger.debug("v7 backfill from sessions.json failed (non-fatal): %s", e)
# ========================================================================= # =========================================================================
# Session lifecycle # Session lifecycle
# ========================================================================= # =========================================================================
@ -382,6 +458,112 @@ class SessionDB:
self._execute_write(_do) self._execute_write(_do)
return session_id return session_id
def set_gateway_metadata(
    self,
    session_id: str,
    session_key: Optional[str] = None,
    platform: Optional[str] = None,
    chat_type: Optional[str] = None,
    origin_json: Optional[str] = None,
    display_name: Optional[str] = None,
) -> None:
    """Write gateway routing metadata for a session.

    Called by the gateway after creating or resuming a session so that
    state.db becomes the single source of truth for session discovery.
    Uses UPDATE (not UPSERT) — the session row must already exist.

    Only keyword arguments that are not None are written; passing None
    leaves the existing column value untouched. If every metadata
    argument is None, no SQL is executed at all.
    """
    # Build the SET clause only from the fields the caller supplied so
    # partial updates never null out previously written columns.
    sets = []
    params: list = []
    for column, value in (
        ("session_key", session_key),
        ("platform", platform),
        ("chat_type", chat_type),
        ("origin_json", origin_json),
        ("display_name", display_name),
    ):
        if value is not None:
            sets.append(f"{column} = ?")
            params.append(value)
    if not sets:
        return  # Nothing to update.
    params.append(session_id)
    sql = f"UPDATE sessions SET {', '.join(sets)} WHERE id = ?"

    def _do(conn):
        conn.execute(sql, params)

    self._execute_write(_do)
def set_memory_flushed(self, session_id: str, flushed: bool = True) -> None:
    """Mark a session as having its memory flushed."""
    flag = 1 if flushed else 0

    def _write(conn):
        conn.execute(
            "UPDATE sessions SET memory_flushed = ? WHERE id = ?",
            (flag, session_id),
        )

    self._execute_write(_write)
def list_gateway_sessions(self, platform: Optional[str] = None) -> List[Dict[str, Any]]:
    """List sessions that have gateway routing metadata.

    Returns dicts with: id, session_key, platform, chat_type,
    origin_json, display_name, source, started_at, ended_at, title,
    message_count, memory_flushed.

    When ``platform`` is given, only sessions for that platform are
    returned. Only sessions with a non-NULL session_key are included
    (i.e. sessions that were created through the gateway, not bare
    CLI sessions).
    """
    where = "WHERE session_key IS NOT NULL"
    params: list = []
    if platform:
        where += " AND platform = ?"
        params.append(platform)
    with self._lock:
        rows = self._conn.execute(
            f"""SELECT id, session_key, platform, chat_type, origin_json,
                       display_name, source, started_at, ended_at, title,
                       message_count, memory_flushed
                FROM sessions {where}
                ORDER BY started_at DESC""",
            params,
        ).fetchall()
    # Convert sqlite3.Row objects to plain dicts so callers don't need
    # a live connection (or sqlite3.Row semantics) to use the result.
    return [dict(r) for r in rows]
def find_session_by_origin(
    self, platform: str, chat_id: str, thread_id: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Find the most recent session for a platform + chat_id pair.

    Searches the origin_json column for matching chat_id. When
    ``thread_id`` is given, also matches on thread_id. Returns the
    session dict or None.
    """
    # Use JSON extraction for matching. SQLite json_extract is
    # available in all modern builds (3.9+).
    sql = """
        SELECT id, session_key, platform, chat_type, origin_json,
               display_name, source, started_at, ended_at, title,
               memory_flushed
        FROM sessions
        WHERE platform = ?
          AND json_extract(origin_json, '$.chat_id') = ?
          AND session_key IS NOT NULL
    """
    # NOTE(review): json_extract returns an INTEGER when the origin
    # stored chat_id as a JSON number; that would not compare equal to
    # the TEXT str(chat_id) bound here — confirm origins always
    # serialize chat_id/thread_id as strings.
    params: list = [platform, str(chat_id)]
    if thread_id is not None:
        sql += " AND json_extract(origin_json, '$.thread_id') = ?"
        params.append(str(thread_id))
    sql += " ORDER BY started_at DESC LIMIT 1"
    with self._lock:
        row = self._conn.execute(sql, params).fetchone()
    return dict(row) if row else None
def end_session(self, session_id: str, end_reason: str) -> None: def end_session(self, session_id: str, end_reason: str) -> None:
"""Mark a session as ended.""" """Mark a session as ended."""
def _do(conn): def _do(conn):

View file

@ -79,11 +79,45 @@ def _get_session_db():
def _load_sessions_index() -> dict: def _load_sessions_index() -> dict:
"""Load the gateway sessions.json index directly. """Load gateway session metadata from state.db.
Returns a dict of session_key -> entry_dict with platform routing info. Returns a dict of session_key -> entry_dict with platform routing info.
This avoids importing the full SessionStore which needs GatewayConfig. Falls back to reading sessions.json when state.db has no gateway metadata
(pre-migration databases).
""" """
try:
from hermes_state import SessionDB
db = SessionDB()
try:
rows = db.list_gateway_sessions()
finally:
db.close()
if rows:
result = {}
for row in rows:
sk = row.get("session_key")
if not sk:
continue
entry = {
"session_key": sk,
"session_id": row.get("id", ""),
"platform": row.get("platform", ""),
"chat_type": row.get("chat_type", "dm"),
"display_name": row.get("display_name"),
"memory_flushed": bool(row.get("memory_flushed", 0)),
}
origin_json = row.get("origin_json")
if origin_json:
try:
entry["origin"] = json.loads(origin_json)
except (json.JSONDecodeError, TypeError):
pass
result[sk] = entry
return result
except Exception as e:
logger.debug("Failed to load sessions from state.db: %s", e)
# Fallback: read sessions.json for pre-migration databases
sessions_file = _get_sessions_dir() / "sessions.json" sessions_file = _get_sessions_dir() / "sessions.json"
if not sessions_file.exists(): if not sessions_file.exists():
return {} return {}
@ -200,8 +234,7 @@ class EventBridge:
self._last_poll_timestamps: Dict[str, float] = {} # session_key -> unix timestamp self._last_poll_timestamps: Dict[str, float] = {} # session_key -> unix timestamp
# In-memory approval tracking (populated from events) # In-memory approval tracking (populated from events)
self._pending_approvals: Dict[str, dict] = {} self._pending_approvals: Dict[str, dict] = {}
# mtime cache — skip expensive work when files haven't changed # mtime cache — skip expensive work when state.db hasn't changed
self._sessions_json_mtime: float = 0.0
self._state_db_mtime: float = 0.0 self._state_db_mtime: float = 0.0
self._cached_sessions_index: dict = {} self._cached_sessions_index: dict = {}
@ -327,21 +360,10 @@ class EventBridge:
def _poll_once(self, db): def _poll_once(self, db):
"""Check for new messages across all sessions. """Check for new messages across all sessions.
Uses mtime checks on sessions.json and state.db to skip work Uses mtime check on state.db to skip work when nothing has changed
when nothing has changed makes 200ms polling essentially free. makes 200ms polling essentially free.
""" """
# Check if sessions.json has changed (mtime check is ~1μs) # Check if state.db has changed (mtime check is ~1μs)
sessions_file = _get_sessions_dir() / "sessions.json"
try:
sj_mtime = sessions_file.stat().st_mtime if sessions_file.exists() else 0.0
except OSError:
sj_mtime = 0.0
if sj_mtime != self._sessions_json_mtime:
self._sessions_json_mtime = sj_mtime
self._cached_sessions_index = _load_sessions_index()
# Check if state.db has changed
try: try:
from hermes_constants import get_hermes_home from hermes_constants import get_hermes_home
db_file = get_hermes_home() / "state.db" db_file = get_hermes_home() / "state.db"
@ -353,10 +375,13 @@ class EventBridge:
except OSError: except OSError:
db_mtime = 0.0 db_mtime = 0.0
if db_mtime == self._state_db_mtime and sj_mtime == self._sessions_json_mtime: if db_mtime == self._state_db_mtime:
return # Nothing changed since last poll — skip entirely return # Nothing changed since last poll — skip entirely
self._state_db_mtime = db_mtime self._state_db_mtime = db_mtime
# Reload the session index from state.db on every change since
# new sessions may have been created.
self._cached_sessions_index = _load_sessions_index()
entries = self._cached_sessions_index entries = self._cached_sessions_index
for session_key, entry in entries.items(): for session_key, entry in entries.items():

View file

@ -935,7 +935,7 @@ class TestSchemaInit:
def test_schema_version(self, db): def test_schema_version(self, db):
cursor = db._conn.execute("SELECT version FROM schema_version") cursor = db._conn.execute("SELECT version FROM schema_version")
version = cursor.fetchone()[0] version = cursor.fetchone()[0]
assert version == 6 assert version == 7
def test_title_column_exists(self, db): def test_title_column_exists(self, db):
"""Verify the title column was created in the sessions table.""" """Verify the title column was created in the sessions table."""
@ -996,7 +996,7 @@ class TestSchemaInit:
# Verify migration # Verify migration
cursor = migrated_db._conn.execute("SELECT version FROM schema_version") cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
assert cursor.fetchone()[0] == 6 assert cursor.fetchone()[0] == 7
# Verify title column exists and is NULL for existing sessions # Verify title column exists and is NULL for existing sessions
session = migrated_db.get_session("existing") session = migrated_db.get_session("existing")