mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-26 06:01:49 +00:00
feat: add term_index inverted index for instant session search
Adds a term-based inverted index (term_index table, schema v7) that eliminates LLM summarization from the default search path. The fast path returns session metadata and match counts in ~1ms vs 10-15s for the full FTS5+LLM pipeline. Key changes: - term_index table: (term, message_id, session_id) WITHOUT ROWID for clustered B-tree lookups. Populated at write time in append_message (best-effort, never blocks inserts). - stop_words.py: 179-word NLTK English stop list, no stemming - term_index.py: extract_terms() for term extraction - session_search_tool.py: fast=True default, _fast_search for term index path, _full_search preserves original behavior, CJK query fallback to slow path - Auto-reindex on v7 migration: _init_schema returns needs_reindex flag, __init__ calls reindex_term_index() after migration - Swap strategy for reindex: builds into temp table, then atomic swap in single transaction (no empty-index window) - get_child_session_ids(): public API replacing db._lock/db._conn access in _fast_search - mode field in search results: 'fast' or 'full' - Cascade deletes: clear_messages, delete_session, prune_sessions all clean term_index entries Benchmarks on production DB (47.7 MB, 29,435 messages): - Term index reindex: 1,152,587 entries from 29,435 messages in 4s - Fast path: 1-4ms (no LLM) - Slow path: 10,000-16,000ms (FTS5 + LLM summarization) - Speedup: 4,000-15,000x on full round-trip 195 tests passing (48 term_index + 149 hermes_state). 12 regression tests from red-team QA covering: param binding, child session resolution, cascade deletes, CJK fallback.
This commit is contained in:
parent
de1a3922ed
commit
410456c599
6 changed files with 1097 additions and 15 deletions
271
hermes_state.py
271
hermes_state.py
|
|
@ -31,7 +31,7 @@ T = TypeVar("T")
|
|||
|
||||
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
|
||||
|
||||
SCHEMA_VERSION = 8
|
||||
SCHEMA_VERSION = 9
|
||||
|
||||
SCHEMA_SQL = """
|
||||
CREATE TABLE IF NOT EXISTS schema_version (
|
||||
|
|
@ -95,6 +95,13 @@ CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
|
|||
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
|
||||
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
|
||||
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS term_index (
|
||||
term TEXT NOT NULL,
|
||||
message_id INTEGER NOT NULL REFERENCES messages(id),
|
||||
session_id TEXT NOT NULL REFERENCES sessions(id),
|
||||
PRIMARY KEY (term, message_id)
|
||||
) WITHOUT ROWID;
|
||||
"""
|
||||
|
||||
FTS_SQL = """
|
||||
|
|
@ -164,7 +171,14 @@ class SessionDB:
|
|||
self._conn.execute("PRAGMA journal_mode=WAL")
|
||||
self._conn.execute("PRAGMA foreign_keys=ON")
|
||||
|
||||
self._init_schema()
|
||||
needs_reindex = self._init_schema()
|
||||
|
||||
# If we just migrated to v9, backfill the term index from existing
|
||||
# messages. This runs outside _init_schema so we can use
|
||||
# _execute_write (which manages its own transactions).
|
||||
if needs_reindex:
|
||||
logger.info("v7 migration detected — backfilling term index")
|
||||
self.reindex_term_index()
|
||||
|
||||
# ── Core write helper ──
|
||||
|
||||
|
|
@ -257,11 +271,17 @@ class SessionDB:
|
|||
self._conn = None
|
||||
|
||||
def _init_schema(self):
|
||||
"""Create tables and FTS if they don't exist, run migrations."""
|
||||
"""Create tables and FTS if they don't exist, run migrations.
|
||||
|
||||
Returns True if a v9 migration was performed (term_index created
|
||||
and needs backfill), False otherwise.
|
||||
"""
|
||||
cursor = self._conn.cursor()
|
||||
|
||||
cursor.executescript(SCHEMA_SQL)
|
||||
|
||||
needs_reindex = False
|
||||
|
||||
# Check schema version and run migrations
|
||||
cursor.execute("SELECT version FROM schema_version LIMIT 1")
|
||||
row = cursor.fetchone()
|
||||
|
|
@ -356,6 +376,24 @@ class SessionDB:
|
|||
except sqlite3.OperationalError:
|
||||
pass # Column already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 8")
|
||||
if current_version < 9:
|
||||
# v9: add term_index table for inverted-index session search.
|
||||
# This is the clustered (term, message_id) WITHOUT ROWID table
|
||||
# used by the session_search fast path. After creating the
|
||||
# table, we backfill from existing messages.
|
||||
try:
|
||||
cursor.execute("""
|
||||
CREATE TABLE IF NOT EXISTS term_index (
|
||||
term TEXT NOT NULL,
|
||||
message_id INTEGER NOT NULL REFERENCES messages(id),
|
||||
session_id TEXT NOT NULL REFERENCES sessions(id),
|
||||
PRIMARY KEY (term, message_id)
|
||||
) WITHOUT ROWID
|
||||
""")
|
||||
except sqlite3.OperationalError:
|
||||
pass # Table already exists
|
||||
cursor.execute("UPDATE schema_version SET version = 9")
|
||||
needs_reindex = True
|
||||
|
||||
# Unique title index — always ensure it exists (safe to run after migrations
|
||||
# since the title column is guaranteed to exist at this point)
|
||||
|
|
@ -375,6 +413,8 @@ class SessionDB:
|
|||
|
||||
self._conn.commit()
|
||||
|
||||
return needs_reindex
|
||||
|
||||
# =========================================================================
|
||||
# Session lifecycle
|
||||
# =========================================================================
|
||||
|
|
@ -569,6 +609,23 @@ class SessionDB:
|
|||
row = cursor.fetchone()
|
||||
return dict(row) if row else None
|
||||
|
||||
def get_child_session_ids(self, *parent_ids: str) -> List[str]:
    """List the sessions that are direct children of the given parents.

    A child is any ``sessions`` row whose ``parent_session_id`` equals one
    of *parent_ids* (for example delegation or compression child sessions
    of a parent conversation).  Only one level is inspected: children of
    children are not followed.

    Args:
        *parent_ids: Session IDs to look up children for.

    Returns:
        The ``id`` of every matching child session, in whatever order the
        database yields them.  An empty list when no IDs are supplied.
    """
    if len(parent_ids) == 0:
        return []

    # One bound "?" per parent keeps the IN (...) list fully parameterized.
    marks = ",".join(["?"] * len(parent_ids))
    query = f"SELECT id FROM sessions WHERE parent_session_id IN ({marks})"

    with self._lock:
        child_rows = self._conn.execute(query, list(parent_ids)).fetchall()

    return [child["id"] for child in child_rows]
|
||||
|
||||
def resolve_session_id(self, session_id_or_prefix: str) -> Optional[str]:
|
||||
"""Resolve an exact or uniquely prefixed session ID to the full ID.
|
||||
|
||||
|
|
@ -1015,6 +1072,22 @@ class SessionDB:
|
|||
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
|
||||
(session_id,),
|
||||
)
|
||||
|
||||
# Insert terms into inverted index (delayed import avoids
|
||||
# circular dependency — hermes_state is imported by nearly
|
||||
# everything at startup, term_index must not be top-level)
|
||||
if content:
|
||||
try:
|
||||
from term_index import extract_terms
|
||||
terms = extract_terms(content)
|
||||
if terms:
|
||||
conn.executemany(
|
||||
"INSERT OR IGNORE INTO term_index (term, message_id, session_id) VALUES (?, ?, ?)",
|
||||
[(t, msg_id, session_id) for t in terms],
|
||||
)
|
||||
except Exception:
|
||||
pass # Term indexing is best-effort; never block a message insert
|
||||
|
||||
return msg_id
|
||||
|
||||
return self._execute_write(_do)
|
||||
|
|
@ -1395,6 +1468,188 @@ class SessionDB:
|
|||
|
||||
return matches
|
||||
|
||||
# =========================================================================
|
||||
# Term index search (inverted index fast path)
|
||||
# =========================================================================
|
||||
|
||||
def search_by_terms(
    self,
    terms: List[str],
    exclude_sources: Optional[List[str]] = None,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """Search sessions through the ``term_index`` inverted index.

    This is the fast path for session search -- no LLM calls are made.
    Query terms are lowercased, stop words are dropped, and duplicates are
    collapsed.  A session matches only if it contains *every* remaining
    term (AND semantics, enforced per session rather than per message).

    Args:
        terms: Raw query terms.  Empty strings and stop words are ignored.
        exclude_sources: Optional ``sessions.source`` values to filter out.
        limit: Maximum number of sessions to return.

    Returns:
        One dict per matching session with the keys ``session_id``,
        ``source``, ``model``, ``session_started``, ``title`` and
        ``match_count`` (distinct messages in the session containing any
        of the query terms).  Multi-term queries also include
        ``term_count``.  Rows are ordered by ``match_count`` and then by
        session start time, both descending.  An empty list is returned
        when no usable terms remain or when the query raises
        ``sqlite3.OperationalError``.
    """
    if not terms:
        return []

    # Delayed import avoids a circular dependency (same pattern as
    # append_message).
    from stop_words import is_stop_word

    # Normalize, drop stop words, then de-duplicate while keeping order.
    # Without the de-duplication a query such as ["Docker", "docker"]
    # would demand COUNT(DISTINCT term) = 2 although only one distinct
    # term can ever be found, so nothing would match.
    lowered = (t.lower() for t in terms if t)
    filtered = list(dict.fromkeys(t for t in lowered if not is_stop_word(t)))
    if not filtered:
        return []

    exclude_sql = ""
    exclude_params: list = []
    if exclude_sources:
        exclude_marks = ",".join("?" for _ in exclude_sources)
        exclude_sql = f" AND s.source NOT IN ({exclude_marks})"
        exclude_params = list(exclude_sources)

    if len(filtered) == 1:
        # Single term: a plain equality lookup on the clustered key.
        sql = f"""
            SELECT ti.session_id,
                   s.source,
                   s.model,
                   s.started_at AS session_started,
                   s.title,
                   COUNT(DISTINCT ti.message_id) AS match_count
            FROM term_index ti
            JOIN sessions s ON s.id = ti.session_id
            WHERE ti.term = ?{exclude_sql}
            GROUP BY ti.session_id
            ORDER BY match_count DESC, s.started_at DESC
            LIMIT ?
        """
        params: list = [filtered[0], *exclude_params, limit]
    else:
        # Multi-term: GROUP BY session + HAVING COUNT(DISTINCT term) = N
        # keeps only sessions that contain all N query terms.
        term_marks = ",".join("?" for _ in filtered)
        sql = f"""
            SELECT ti.session_id,
                   s.source,
                   s.model,
                   s.started_at AS session_started,
                   s.title,
                   COUNT(DISTINCT ti.term) AS term_count,
                   COUNT(DISTINCT ti.message_id) AS match_count
            FROM term_index ti
            JOIN sessions s ON s.id = ti.session_id
            WHERE ti.term IN ({term_marks}){exclude_sql}
            GROUP BY ti.session_id
            HAVING COUNT(DISTINCT ti.term) = ?
            ORDER BY match_count DESC, s.started_at DESC
            LIMIT ?
        """
        params = [*filtered, *exclude_params, len(filtered), limit]

    with self._lock:
        try:
            cursor = self._conn.execute(sql, params)
            results = [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            # Best-effort: a failing index query yields an empty result
            # instead of propagating to the caller.
            logger.debug("term_index query failed", exc_info=True)
            return []

    return results
|
||||
|
||||
def reindex_term_index(self, batch_size: int = 500) -> int:
    """Rebuild ``term_index`` from the contents of the ``messages`` table.

    Messages are read in batches of *batch_size* so no single read or
    write holds the lock for long.  Terms are first written to a scratch
    table, ``_term_index_new``; a final write then replaces the contents
    of ``term_index`` with the scratch rows and drops the scratch table,
    so the live index is never left empty while the rebuild is running.

    Batches are walked with keyset pagination (``id > last seen id``)
    rather than ``OFFSET``: each batch seeks straight to its start, and
    rows inserted or deleted between batches cannot shift the window and
    cause messages to be skipped or read twice.

    Args:
        batch_size: Number of messages to read per batch; must be >= 1.

    Returns:
        The number of ``(term, message_id)`` entries written.  ``0`` when
        the ``messages`` table is empty, in which case the index is left
        untouched.

    Raises:
        ValueError: If *batch_size* is smaller than 1.
    """
    if batch_size < 1:
        # With OFFSET paging a negative size never terminated and a size
        # of 0 silently wiped the index; reject both up front.
        raise ValueError("batch_size must be a positive integer")

    # Delayed import avoids a circular dependency (same pattern as
    # append_message).
    from term_index import extract_terms

    with self._lock:
        total = self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]

    if total == 0:
        return 0

    # Scratch table with the same columns as term_index.  It is emptied in
    # case an earlier, interrupted rebuild left rows behind.
    def _create_temp(conn):
        conn.execute("""
            CREATE TABLE IF NOT EXISTS _term_index_new (
                term TEXT NOT NULL,
                message_id INTEGER NOT NULL,
                session_id TEXT NOT NULL,
                PRIMARY KEY (term, message_id)
            ) WITHOUT ROWID
        """)
        conn.execute("DELETE FROM _term_index_new")

    self._execute_write(_create_temp)

    inserted = 0
    scanned = 0
    last_id = None

    while True:
        # Read one batch; the write lock is only taken later, per batch.
        with self._lock:
            if last_id is None:
                cursor = self._conn.execute(
                    "SELECT id, session_id, content FROM messages "
                    "ORDER BY id LIMIT ?",
                    (batch_size,),
                )
            else:
                cursor = self._conn.execute(
                    "SELECT id, session_id, content FROM messages "
                    "WHERE id > ? ORDER BY id LIMIT ?",
                    (last_id, batch_size),
                )
            rows = cursor.fetchall()

        if not rows:
            break

        last_id = rows[-1]["id"]
        scanned += len(rows)

        # De-duplicate terms per message so the returned count equals the
        # number of rows INSERT OR IGNORE actually stores.
        entries = [
            (term, row["id"], row["session_id"])
            for row in rows
            for term in dict.fromkeys(extract_terms(row["content"] or ""))
        ]

        if entries:
            # Bind the batch as a default argument so the closure does not
            # depend on the loop variable.
            def _insert(conn, _entries=entries):
                conn.executemany(
                    "INSERT OR IGNORE INTO _term_index_new (term, message_id, session_id) VALUES (?, ?, ?)",
                    _entries,
                )
                return len(_entries)

            inserted += self._execute_write(_insert)

    # Swap: replace the live index with the rebuilt rows in one write, then
    # discard the scratch table.
    def _swap(conn):
        conn.execute("DELETE FROM term_index")
        conn.execute("""
            INSERT INTO term_index (term, message_id, session_id)
            SELECT term, message_id, session_id FROM _term_index_new
        """)
        conn.execute("DROP TABLE _term_index_new")

    self._execute_write(_swap)

    logger.info("Reindexed term_index: %d entries from %d messages", inserted, scanned)
    return inserted
|
||||
|
||||
def search_sessions(
|
||||
self,
|
||||
source: str = None,
|
||||
|
|
@ -1466,8 +1721,14 @@ class SessionDB:
|
|||
return results
|
||||
|
||||
def clear_messages(self, session_id: str) -> None:
|
||||
"""Delete all messages for a session and reset its counters."""
|
||||
"""Delete all messages for a session and reset its counters.
|
||||
|
||||
Also removes stale term_index entries that reference the deleted messages.
|
||||
"""
|
||||
def _do(conn):
|
||||
conn.execute(
|
||||
"DELETE FROM term_index WHERE session_id = ?", (session_id,)
|
||||
)
|
||||
conn.execute(
|
||||
"DELETE FROM messages WHERE session_id = ?", (session_id,)
|
||||
)
|
||||
|
|
@ -1496,6 +1757,7 @@ class SessionDB:
|
|||
"WHERE parent_session_id = ?",
|
||||
(session_id,),
|
||||
)
|
||||
conn.execute("DELETE FROM term_index WHERE session_id = ?", (session_id,))
|
||||
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
|
||||
conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
|
||||
return True
|
||||
|
|
@ -1536,6 +1798,7 @@ class SessionDB:
|
|||
)
|
||||
|
||||
for sid in session_ids:
|
||||
conn.execute("DELETE FROM term_index WHERE session_id = ?", (sid,))
|
||||
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
|
||||
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
|
||||
return len(session_ids)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue