diff --git a/gateway/channel_directory.py b/gateway/channel_directory.py index ae2beda9e..c8759a48b 100644 --- a/gateway/channel_directory.py +++ b/gateway/channel_directory.py @@ -145,12 +145,49 @@ def _build_slack(adapter) -> List[Dict[str, str]]: def _build_from_sessions(platform_name: str) -> List[Dict[str, str]]: - """Pull known channels/contacts from sessions.json origin data.""" + """Pull known channels/contacts from state.db gateway metadata. + + Falls back to sessions.json for pre-migration databases. + """ + entries = [] + + # Primary: query state.db + try: + from hermes_state import SessionDB + db = SessionDB() + try: + rows = db.list_gateway_sessions(platform=platform_name) + finally: + db.close() + if rows: + seen_ids = set() + for row in rows: + origin_json = row.get("origin_json") + if not origin_json: + continue + try: + origin = json.loads(origin_json) + except (json.JSONDecodeError, TypeError): + continue + entry_id = _session_entry_id(origin) + if not entry_id or entry_id in seen_ids: + continue + seen_ids.add(entry_id) + entries.append({ + "id": entry_id, + "name": _session_entry_name(origin), + "type": row.get("chat_type", "dm"), + "thread_id": origin.get("thread_id"), + }) + return entries + except Exception as e: + logger.debug("Channel directory: state.db lookup failed, falling back: %s", e) + + # Fallback: read sessions.json sessions_path = get_hermes_home() / "sessions" / "sessions.json" if not sessions_path.exists(): return [] - entries = [] try: with open(sessions_path, encoding="utf-8") as f: data = json.load(f) diff --git a/gateway/mirror.py b/gateway/mirror.py index 0312424f1..974a8897e 100644 --- a/gateway/mirror.py +++ b/gateway/mirror.py @@ -67,10 +67,23 @@ def _find_session_id(platform: str, chat_id: str, thread_id: Optional[str] = Non """ Find the active session_id for a platform + chat_id pair. - Scans sessions.json entries and matches where origin.chat_id == chat_id - on the right platform. DM session keys don't embed the chat_id - (e.g. "agent:main:telegram:dm"), so we check the origin dict. + Queries state.db for matching sessions. Falls back to sessions.json + for pre-migration databases. """ + # Primary: query state.db + try: + from hermes_state import SessionDB + db = SessionDB() + try: + row = db.find_session_by_origin(platform, chat_id, thread_id=thread_id) + finally: + db.close() + if row: + return row.get("id") + except Exception as e: + logger.debug("Mirror: state.db lookup failed, falling back to sessions.json: %s", e) + + # Fallback: read sessions.json if not _SESSIONS_INDEX.exists(): return None diff --git a/gateway/run.py b/gateway/run.py index b54149d04..c0c6a386b 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1819,6 +1819,11 @@ class GatewayRunner: with self.session_store._lock: entry.memory_flushed = True self.session_store._save() + if self._session_db: + try: + self._session_db.set_memory_flushed(entry.session_id) + except Exception: + pass logger.debug( "Memory flush completed for session %s", entry.session_id, @@ -1836,6 +1841,11 @@ class GatewayRunner: with self.session_store._lock: entry.memory_flushed = True self.session_store._save() + if self._session_db: + try: + self._session_db.set_memory_flushed(entry.session_id) + except Exception: + pass _flush_failures.pop(entry.session_id, None) else: logger.debug( diff --git a/gateway/session.py b/gateway/session.py index a11ade898..607484faa 100644 --- a/gateway/session.py +++ b/gateway/session.py @@ -764,6 +764,19 @@ class SessionStore: self._db.create_session(**db_create_kwargs) except Exception as e: print(f"[gateway] Warning: Failed to create SQLite session: {e}") + # Write gateway routing metadata to state.db so it can serve + # as the single source of truth (replacing sessions.json reads). + try: + self._db.set_gateway_metadata( + session_id=entry.session_id, + session_key=entry.session_key, + platform=entry.platform.value if entry.platform else None, + chat_type=entry.chat_type, + origin_json=json.dumps(entry.origin.to_dict()) if entry.origin else None, + display_name=entry.display_name, + ) + except Exception as e: + logger.debug("Failed to write gateway metadata to state.db: %s", e) return entry @@ -869,6 +882,17 @@ class SessionStore: self._db.create_session(**db_create_kwargs) except Exception as e: logger.debug("Session DB operation failed: %s", e) + try: + self._db.set_gateway_metadata( + session_id=new_entry.session_id, + session_key=new_entry.session_key, + platform=new_entry.platform.value if new_entry.platform else None, + chat_type=new_entry.chat_type, + origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None, + display_name=new_entry.display_name, + ) + except Exception as e: + logger.debug("Failed to write gateway metadata to state.db: %s", e) return new_entry @@ -918,6 +942,20 @@ class SessionStore: except Exception as e: logger.debug("Session DB end_session failed: %s", e) + # Update gateway metadata on the target session + if self._db and new_entry: + try: + self._db.set_gateway_metadata( + session_id=new_entry.session_id, + session_key=new_entry.session_key, + platform=new_entry.platform.value if new_entry.platform else None, + chat_type=new_entry.chat_type, + origin_json=json.dumps(new_entry.origin.to_dict()) if new_entry.origin else None, + display_name=new_entry.display_name, + ) + except Exception as e: + logger.debug("Failed to write gateway metadata to state.db: %s", e) + return new_entry def list_sessions(self, active_minutes: Optional[int] = None) -> List[SessionEntry]: diff --git a/hermes_cli/status.py b/hermes_cli/status.py index a7745d65f..9d8556bc8 100644 --- a/hermes_cli/status.py +++ b/hermes_cli/status.py @@ -421,7 +421,21 @@ def show_status(args): print(color("◆ Sessions", Colors.CYAN, Colors.BOLD)) sessions_file = get_hermes_home() / "sessions" / "sessions.json" - if sessions_file.exists(): + # Primary: count gateway sessions from state.db + _session_count_shown = False + try: + from hermes_state import SessionDB + _sdb = SessionDB() + try: + _gw_sessions = _sdb.list_gateway_sessions() + finally: + _sdb.close() + print(f" Active: {len(_gw_sessions)} session(s)") + _session_count_shown = True + except Exception: + pass + # Fallback: sessions.json + if not _session_count_shown and sessions_file.exists(): import json try: with open(sessions_file, encoding="utf-8") as f: diff --git a/hermes_state.py b/hermes_state.py index 5e563666e..a148916bd 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -31,7 +31,7 @@ T = TypeVar("T") DEFAULT_DB_PATH = get_hermes_home() / "state.db" -SCHEMA_VERSION = 6 +SCHEMA_VERSION = 7 SCHEMA_SQL = """ CREATE TABLE IF NOT EXISTS schema_version ( @@ -65,6 +65,12 @@ CREATE TABLE IF NOT EXISTS sessions ( cost_source TEXT, pricing_version TEXT, title TEXT, + session_key TEXT, + platform TEXT, + chat_type TEXT, + origin_json TEXT, + display_name TEXT, + memory_flushed INTEGER DEFAULT 0, FOREIGN KEY (parent_session_id) REFERENCES sessions(id) ); @@ -329,6 +335,36 @@ class SessionDB: except sqlite3.OperationalError: pass # Column already exists cursor.execute("UPDATE schema_version SET version = 6") + if current_version < 7: + # v7: add gateway routing metadata columns to sessions table. + # These columns allow state.db to serve as the single source of + # truth for session discovery, replacing sessions.json reads. + for col_name, col_type in [ + ("session_key", "TEXT"), + ("platform", "TEXT"), + ("chat_type", "TEXT"), + ("origin_json", "TEXT"), + ("display_name", "TEXT"), + ("memory_flushed", "INTEGER DEFAULT 0"), + ]: + try: + safe = col_name.replace('"', '""') + cursor.execute( + f'ALTER TABLE sessions ADD COLUMN "{safe}" {col_type}' + ) + except sqlite3.OperationalError: + pass # Column already exists + # Create index on session_key for fast lookups + try: + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_sessions_session_key " + "ON sessions(session_key)" + ) + except sqlite3.OperationalError: + pass + # Backfill from sessions.json if it exists + self._backfill_gateway_metadata_v7(cursor) + cursor.execute("UPDATE schema_version SET version = 7") # Unique title index — always ensure it exists (safe to run after migrations # since the title column is guaranteed to exist at this point) @@ -340,6 +376,15 @@ class SessionDB: except sqlite3.OperationalError: pass # Index already exists + # session_key index for gateway metadata lookups + try: + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_sessions_session_key " + "ON sessions(session_key)" + ) + except sqlite3.OperationalError: + pass + # FTS5 setup (separate because CREATE VIRTUAL TABLE can't be in executescript with IF NOT EXISTS reliably) try: cursor.execute("SELECT * FROM messages_fts LIMIT 0") @@ -348,6 +393,37 @@ class SessionDB: self._conn.commit() + def _backfill_gateway_metadata_v7(self, cursor): + """Backfill gateway routing metadata from sessions.json during v7 migration.""" + try: + sessions_dir = get_hermes_home() / "sessions" + sessions_file = sessions_dir / "sessions.json" + if not sessions_file.exists(): + return + with open(sessions_file, "r", encoding="utf-8") as f: + data = json.load(f) + for _key, entry in data.items(): + session_id = entry.get("session_id", "") + if not session_id: + continue + session_key = entry.get("session_key", _key) + platform = entry.get("platform", "") + chat_type = entry.get("chat_type", "dm") + display_name = entry.get("display_name") + origin = entry.get("origin") + origin_json = json.dumps(origin) if origin else None + memory_flushed = 1 if entry.get("memory_flushed") else 0 + cursor.execute( + """UPDATE sessions SET + session_key = ?, platform = ?, chat_type = ?, + origin_json = ?, display_name = ?, memory_flushed = ? + WHERE id = ? AND session_key IS NULL""", + (session_key, platform, chat_type, origin_json, + display_name, memory_flushed, session_id), + ) + except Exception as e: + logger.debug("v7 backfill from sessions.json failed (non-fatal): %s", e) + # ========================================================================= # Session lifecycle # ========================================================================= @@ -382,6 +458,112 @@ class SessionDB: self._execute_write(_do) return session_id + def set_gateway_metadata( + self, + session_id: str, + session_key: str = None, + platform: str = None, + chat_type: str = None, + origin_json: str = None, + display_name: str = None, + ) -> None: + """Write gateway routing metadata for a session. + + Called by the gateway after creating or resuming a session so that + state.db becomes the single source of truth for session discovery. + Uses UPDATE (not UPSERT) — the session row must already exist. + """ + sets = [] + params = [] + if session_key is not None: + sets.append("session_key = ?") + params.append(session_key) + if platform is not None: + sets.append("platform = ?") + params.append(platform) + if chat_type is not None: + sets.append("chat_type = ?") + params.append(chat_type) + if origin_json is not None: + sets.append("origin_json = ?") + params.append(origin_json) + if display_name is not None: + sets.append("display_name = ?") + params.append(display_name) + if not sets: + return + params.append(session_id) + sql = f"UPDATE sessions SET {', '.join(sets)} WHERE id = ?" + + def _do(conn): + conn.execute(sql, params) + self._execute_write(_do) + + def set_memory_flushed(self, session_id: str, flushed: bool = True) -> None: + """Mark a session as having its memory flushed.""" + def _do(conn): + conn.execute( + "UPDATE sessions SET memory_flushed = ? WHERE id = ?", + (1 if flushed else 0, session_id), + ) + self._execute_write(_do) + + def list_gateway_sessions(self, platform: str = None) -> List[Dict[str, Any]]: + """List sessions that have gateway routing metadata. + + Returns dicts with: id, session_key, platform, chat_type, + origin_json, display_name, source, started_at, ended_at, title, + message_count, memory_flushed. + + When ``platform`` is given, only sessions for that platform are returned. + Only sessions with a non-NULL session_key are included (i.e. sessions + that were created through the gateway, not bare CLI sessions). + """ + where = "WHERE session_key IS NOT NULL" + params = [] + if platform: + where += " AND platform = ?" + params.append(platform) + with self._lock: + rows = self._conn.execute( + f"""SELECT id, session_key, platform, chat_type, origin_json, + display_name, source, started_at, ended_at, title, + message_count, memory_flushed + FROM sessions {where} + ORDER BY started_at DESC""", + params, + ).fetchall() + return [dict(r) for r in rows] + + def find_session_by_origin( + self, platform: str, chat_id: str, thread_id: str = None, + ) -> Optional[Dict[str, Any]]: + """Find the most recent session for a platform + chat_id pair. + + Searches the origin_json column for matching chat_id. When + ``thread_id`` is given, also matches on thread_id. Returns the + session dict or None. + """ + # Use JSON extraction for matching. SQLite json_extract is + # available in all modern builds (3.9+). + sql = """ + SELECT id, session_key, platform, chat_type, origin_json, + display_name, source, started_at, ended_at, title, + memory_flushed + FROM sessions + WHERE platform = ? + AND json_extract(origin_json, '$.chat_id') = ? + AND session_key IS NOT NULL + """ + params: list = [platform, str(chat_id)] + if thread_id is not None: + sql += " AND json_extract(origin_json, '$.thread_id') = ?" + params.append(str(thread_id)) + sql += " ORDER BY started_at DESC LIMIT 1" + with self._lock: + row = self._conn.execute(sql, params).fetchone() + return dict(row) if row else None + def end_session(self, session_id: str, end_reason: str) -> None: """Mark a session as ended.""" def _do(conn): diff --git a/mcp_serve.py b/mcp_serve.py index e8294d1f9..1cb6eb662 100644 --- a/mcp_serve.py +++ b/mcp_serve.py @@ -79,11 +79,45 @@ def _get_session_db(): def _load_sessions_index() -> dict: - """Load the gateway sessions.json index directly. + """Load gateway session metadata from state.db. Returns a dict of session_key -> entry_dict with platform routing info. - This avoids importing the full SessionStore which needs GatewayConfig. + Falls back to reading sessions.json when state.db has no gateway metadata + (pre-migration databases). """ + try: + from hermes_state import SessionDB + db = SessionDB() + try: + rows = db.list_gateway_sessions() + finally: + db.close() + if rows: + result = {} + for row in rows: + sk = row.get("session_key") + if not sk: + continue + entry = { + "session_key": sk, + "session_id": row.get("id", ""), + "platform": row.get("platform", ""), + "chat_type": row.get("chat_type", "dm"), + "display_name": row.get("display_name"), + "memory_flushed": bool(row.get("memory_flushed", 0)), + } + origin_json = row.get("origin_json") + if origin_json: + try: + entry["origin"] = json.loads(origin_json) + except (json.JSONDecodeError, TypeError): + pass + result[sk] = entry + return result + except Exception as e: + logger.debug("Failed to load sessions from state.db: %s", e) + + # Fallback: read sessions.json for pre-migration databases sessions_file = _get_sessions_dir() / "sessions.json" if not sessions_file.exists(): return {} @@ -200,8 +234,7 @@ class EventBridge: self._last_poll_timestamps: Dict[str, float] = {} # session_key -> unix timestamp # In-memory approval tracking (populated from events) self._pending_approvals: Dict[str, dict] = {} - # mtime cache — skip expensive work when files haven't changed - self._sessions_json_mtime: float = 0.0 + # mtime cache — skip expensive work when state.db hasn't changed self._state_db_mtime: float = 0.0 self._cached_sessions_index: dict = {} @@ -327,21 +360,10 @@ class EventBridge: def _poll_once(self, db): """Check for new messages across all sessions. - Uses mtime checks on sessions.json and state.db to skip work - when nothing has changed — makes 200ms polling essentially free. + Uses mtime check on state.db to skip work when nothing has changed + — makes 200ms polling essentially free. """ - # Check if sessions.json has changed (mtime check is ~1μs) - sessions_file = _get_sessions_dir() / "sessions.json" - try: - sj_mtime = sessions_file.stat().st_mtime if sessions_file.exists() else 0.0 - except OSError: - sj_mtime = 0.0 - - if sj_mtime != self._sessions_json_mtime: - self._sessions_json_mtime = sj_mtime - self._cached_sessions_index = _load_sessions_index() - - # Check if state.db has changed + # Check if state.db has changed (mtime check is ~1μs) try: from hermes_constants import get_hermes_home db_file = get_hermes_home() / "state.db" @@ -353,10 +375,13 @@ class EventBridge: except OSError: db_mtime = 0.0 - if db_mtime == self._state_db_mtime and sj_mtime == self._sessions_json_mtime: + if db_mtime == self._state_db_mtime: return # Nothing changed since last poll — skip entirely self._state_db_mtime = db_mtime + # Reload the session index from state.db on every change since + # new sessions may have been created. + self._cached_sessions_index = _load_sessions_index() entries = self._cached_sessions_index for session_key, entry in entries.items(): diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index 5f9a16a52..0bde29d49 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -935,7 +935,7 @@ class TestSchemaInit: def test_schema_version(self, db): cursor = db._conn.execute("SELECT version FROM schema_version") version = cursor.fetchone()[0] - assert version == 6 + assert version == 7 def test_title_column_exists(self, db): """Verify the title column was created in the sessions table.""" @@ -996,7 +996,7 @@ class TestSchemaInit: # Verify migration cursor = migrated_db._conn.execute("SELECT version FROM schema_version") - assert cursor.fetchone()[0] == 6 + assert cursor.fetchone()[0] == 7 # Verify title column exists and is NULL for existing sessions session = migrated_db.get_session("existing")