feat: add term_index inverted index for instant session search

Adds a term-based inverted index (term_index table, schema v9) that
eliminates LLM summarization from the default search path. The fast
path returns session metadata and match counts in ~1ms vs 10-15s for
the full FTS5+LLM pipeline.

Key changes:
- term_index table: (term, message_id, session_id) WITHOUT ROWID
  for clustered B-tree lookups. Populated at write time in
  append_message (best-effort, never blocks inserts).
- stop_words.py: 179-word NLTK English stop list, no stemming
- term_index.py: extract_terms() for term extraction
- session_search_tool.py: fast=True default, _fast_search for term
  index path, _full_search preserves original behavior, CJK query
  fallback to slow path
- Auto-reindex on v9 migration: _init_schema returns needs_reindex
  flag, __init__ calls reindex_term_index() after migration
- Swap strategy for reindex: builds into temp table, then atomic
  swap in single transaction (no empty-index window)
- get_child_session_ids(): public API replacing db._lock/db._conn
  access in _fast_search
- mode field in search results: 'fast' or 'full'
- Cascade deletes: clear_messages, delete_session, prune_sessions
  all clean term_index entries

Benchmarks on production DB (47.7 MB, 29,435 messages):
  - Term index reindex: 1,152,587 entries from 29,435 messages in 4s
  - Fast path: 1-4ms (no LLM)
  - Slow path: 10,000-16,000ms (FTS5 + LLM summarization)
  - Speedup: 4,000-15,000x on full round-trip

195 tests passing (48 term_index + 149 hermes_state).
12 regression tests from red-team QA covering: param binding,
child session resolution, cascade deletes, CJK fallback.
This commit is contained in:
AJ 2026-04-21 22:31:36 -04:00 committed by AJ
parent 4fade39c90
commit 9d758706ef
6 changed files with 1097 additions and 15 deletions

View file

@ -31,7 +31,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"
SCHEMA_VERSION = 8
SCHEMA_VERSION = 9
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -95,6 +95,13 @@ CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
CREATE TABLE IF NOT EXISTS term_index (
term TEXT NOT NULL,
message_id INTEGER NOT NULL REFERENCES messages(id),
session_id TEXT NOT NULL REFERENCES sessions(id),
PRIMARY KEY (term, message_id)
) WITHOUT ROWID;
"""
FTS_SQL = """
@ -164,7 +171,14 @@ class SessionDB:
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA foreign_keys=ON")
self._init_schema()
needs_reindex = self._init_schema()
# If we just migrated to v7, backfill the term index from existing
# messages. This runs outside _init_schema so we can use
# _execute_write (which manages its own transactions).
if needs_reindex:
logger.info("v7 migration detected — backfilling term index")
self.reindex_term_index()
# ── Core write helper ──
@ -257,11 +271,17 @@ class SessionDB:
self._conn = None
def _init_schema(self):
"""Create tables and FTS if they don't exist, run migrations."""
"""Create tables and FTS if they don't exist, run migrations.
Returns True if a v7 migration was performed (term_index created
and needs backfill), False otherwise.
"""
cursor = self._conn.cursor()
cursor.executescript(SCHEMA_SQL)
needs_reindex = False
# Check schema version and run migrations
cursor.execute("SELECT version FROM schema_version LIMIT 1")
row = cursor.fetchone()
@ -356,6 +376,24 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 8")
if current_version < 9:
# v9: add term_index table for inverted-index session search.
# This is the clustered (term, message_id) WITHOUT ROWID table
# used by the session_search fast path. After creating the
# table, we backfill from existing messages.
try:
cursor.execute("""
CREATE TABLE IF NOT EXISTS term_index (
term TEXT NOT NULL,
message_id INTEGER NOT NULL REFERENCES messages(id),
session_id TEXT NOT NULL REFERENCES sessions(id),
PRIMARY KEY (term, message_id)
) WITHOUT ROWID
""")
except sqlite3.OperationalError:
pass # Table already exists
cursor.execute("UPDATE schema_version SET version = 9")
needs_reindex = True
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@ -375,6 +413,8 @@ class SessionDB:
self._conn.commit()
return needs_reindex
# =========================================================================
# Session lifecycle
# =========================================================================
@ -569,6 +609,23 @@ class SessionDB:
row = cursor.fetchone()
return dict(row) if row else None
def get_child_session_ids(self, *parent_ids: str) -> List[str]:
    """Return IDs of sessions whose parent_session_id is in *parent_ids*.

    Useful for finding delegation/compression child sessions that belong
    to a parent conversation. Does NOT recurse -- only direct children
    are returned.
    """
    if not parent_ids:
        return []
    marks = ",".join(["?"] * len(parent_ids))
    query = f"SELECT id FROM sessions WHERE parent_session_id IN ({marks})"
    with self._lock:
        rows = self._conn.execute(query, list(parent_ids)).fetchall()
    return [row["id"] for row in rows]
def resolve_session_id(self, session_id_or_prefix: str) -> Optional[str]:
"""Resolve an exact or uniquely prefixed session ID to the full ID.
@ -1015,6 +1072,22 @@ class SessionDB:
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
(session_id,),
)
# Insert terms into inverted index (delayed import avoids
# circular dependency — hermes_state is imported by nearly
# everything at startup, term_index must not be top-level)
if content:
try:
from term_index import extract_terms
terms = extract_terms(content)
if terms:
conn.executemany(
"INSERT OR IGNORE INTO term_index (term, message_id, session_id) VALUES (?, ?, ?)",
[(t, msg_id, session_id) for t in terms],
)
except Exception:
pass # Term indexing is best-effort; never block a message insert
return msg_id
return self._execute_write(_do)
@ -1395,6 +1468,188 @@ class SessionDB:
return matches
# =========================================================================
# Term index search (inverted index fast path)
# =========================================================================
def search_by_terms(
    self,
    terms: List[str],
    exclude_sources: Optional[List[str]] = None,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """
    Search sessions using the term_index inverted index.

    Takes a list of query terms, finds all message IDs for each term,
    intersects them (AND logic), and returns matching sessions with
    metadata and match counts. This is the fast path for session
    search -- no LLM calls needed.

    Args:
        terms: Raw query terms. Stop words are filtered here too (belt
            and suspenders -- extract_terms already filters at index time).
        exclude_sources: Session sources to omit from results.
        limit: Maximum number of sessions to return.

    Returns:
        Dicts with session_id, source, model, session_started, title,
        term_count (distinct query terms matched) and match_count
        (distinct messages matched), ordered by match_count then recency.
    """
    if not terms:
        return []
    # Delayed import avoids circular dependency (same pattern as
    # append_message's term extraction).
    from stop_words import is_stop_word

    filtered = [t.lower() for t in terms if t and not is_stop_word(t.lower())]
    if not filtered:
        return []

    # A single query handles both the single- and multi-term cases:
    # GROUP BY session_id + HAVING COUNT(DISTINCT term) = N enforces AND
    # semantics, and is trivially satisfied when N == 1. This replaces a
    # duplicated single-term code path whose result rows inconsistently
    # lacked the term_count column.
    term_placeholders = ",".join("?" for _ in filtered)
    params: List[Any] = list(filtered)
    exclude_sql = ""
    if exclude_sources:
        exclude_sql = f" AND s.source NOT IN ({','.join('?' for _ in exclude_sources)})"
        params.extend(exclude_sources)
    # Bind order must mirror SQL order: terms, excludes, HAVING N, LIMIT.
    params.extend([len(filtered), limit])
    sql = f"""
        SELECT ti.session_id,
               s.source,
               s.model,
               s.started_at AS session_started,
               s.title,
               COUNT(DISTINCT ti.term) AS term_count,
               COUNT(DISTINCT ti.message_id) AS match_count
        FROM term_index ti
        JOIN sessions s ON s.id = ti.session_id
        WHERE ti.term IN ({term_placeholders})
        {exclude_sql}
        GROUP BY ti.session_id
        HAVING COUNT(DISTINCT ti.term) = ?
        ORDER BY match_count DESC, s.started_at DESC
        LIMIT ?
    """
    with self._lock:
        try:
            cursor = self._conn.execute(sql, params)
            return [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            # term_index may not exist yet (e.g. mid-migration) --
            # degrade to "no fast-path results" rather than raising.
            logger.debug("term_index query failed", exc_info=True)
            return []
def reindex_term_index(self, batch_size: int = 500) -> int:
    """
    Rebuild the term_index from existing messages.

    Processes messages in batches to avoid holding the write lock too
    long. Returns the total number of term entries inserted.

    Uses a swap strategy: builds a temporary table, then swaps it into
    place in a single transaction. This avoids the empty-index window
    that would occur with a simple clear-and-repopulate.

    Batches are paged by keyset (WHERE id > last seen id) rather than
    LIMIT/OFFSET: OFFSET rescans every skipped row (quadratic over the
    whole reindex) and can skip or duplicate rows when messages are
    inserted concurrently, whereas the id cursor is stable and O(n).
    """
    from term_index import extract_terms

    # Total is used for logging only; the loop below is driven by the
    # keyset cursor, so concurrent inserts cannot desync it.
    with self._lock:
        total = self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    if total == 0:
        return 0

    # Stage into a scratch table so concurrent readers keep seeing the
    # old index until the atomic swap at the end.
    def _create_temp(conn):
        conn.execute("""
            CREATE TABLE IF NOT EXISTS _term_index_new (
                term TEXT NOT NULL,
                message_id INTEGER NOT NULL,
                session_id TEXT NOT NULL,
                PRIMARY KEY (term, message_id)
            ) WITHOUT ROWID
        """)
        # Clear leftovers from a previously interrupted reindex.
        conn.execute("DELETE FROM _term_index_new")

    self._execute_write(_create_temp)

    inserted = 0
    last_id = 0  # assumes positive autoincrement message ids; 0 precedes all
    while True:
        # Read one batch outside the write lock.
        with self._lock:
            rows = self._conn.execute(
                "SELECT id, session_id, content FROM messages "
                "WHERE id > ? ORDER BY id LIMIT ?",
                (last_id, batch_size),
            ).fetchall()
        if not rows:
            break
        last_id = rows[-1]["id"]

        # Extract terms for the batch.
        entries = []
        for row in rows:
            for term in extract_terms(row["content"] or ""):
                entries.append((term, row["id"], row["session_id"]))

        # Write the batch to the scratch table. Bind `entries` as a
        # default arg so the closure captures this batch, not the loop var.
        if entries:
            def _insert(conn, _entries=entries):
                conn.executemany(
                    "INSERT OR IGNORE INTO _term_index_new (term, message_id, session_id) VALUES (?, ?, ?)",
                    _entries,
                )
                return len(_entries)

            inserted += self._execute_write(_insert)

    # Swap: replace term_index with the new table in one transaction.
    # Concurrent readers see either the old index or the new one --
    # never an empty table.
    def _swap(conn):
        conn.execute("DELETE FROM term_index")
        conn.execute("""
            INSERT INTO term_index (term, message_id, session_id)
            SELECT term, message_id, session_id FROM _term_index_new
        """)
        conn.execute("DROP TABLE _term_index_new")

    self._execute_write(_swap)
    logger.info("Reindexed term_index: %d entries from %d messages", inserted, total)
    return inserted
def search_sessions(
self,
source: str = None,
@ -1466,8 +1721,14 @@ class SessionDB:
return results
def clear_messages(self, session_id: str) -> None:
"""Delete all messages for a session and reset its counters."""
"""Delete all messages for a session and reset its counters.
Also removes stale term_index entries that reference the deleted messages.
"""
def _do(conn):
conn.execute(
"DELETE FROM term_index WHERE session_id = ?", (session_id,)
)
conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
@ -1496,6 +1757,7 @@ class SessionDB:
"WHERE parent_session_id = ?",
(session_id,),
)
conn.execute("DELETE FROM term_index WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
return True
@ -1536,6 +1798,7 @@ class SessionDB:
)
for sid in session_ids:
conn.execute("DELETE FROM term_index WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
return len(session_ids)

42
stop_words.py Normal file
View file

@ -0,0 +1,42 @@
"""Stop word list for term index extraction.
Uses the well-known NLTK English stop word list (179 words) as a baseline.
This module is self-contained -- no external dependencies.
"""
# Standard English stop words (NLTK list, public domain)
# Covers articles, conjunctions, prepositions, pronouns, auxiliary verbs,
# and common function words. Intentionally excludes short tech terms
# that overlap (e.g., "go", "it" as in IT/InfoTech handled by context).
_STOP_WORDS = frozenset(
w.lower() for w in [
"i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you",
"your", "yours", "yourself", "yourselves", "he", "him", "his",
"himself", "she", "her", "hers", "herself", "it", "its", "itself",
"they", "them", "their", "theirs", "themselves", "what", "which",
"who", "whom", "this", "that", "these", "those", "am", "is", "are",
"was", "were", "be", "been", "being", "have", "has", "had", "having",
"do", "does", "did", "doing", "a", "an", "the", "and", "but", "if",
"or", "because", "as", "until", "while", "of", "at", "by", "for",
"with", "about", "against", "between", "through", "during", "before",
"after", "above", "below", "to", "from", "up", "down", "in", "out",
"on", "off", "over", "under", "again", "further", "then", "once",
"here", "there", "when", "where", "why", "how", "all", "both", "each",
"few", "more", "most", "other", "some", "such", "no", "nor", "not",
"only", "own", "same", "so", "than", "too", "very", "s", "t", "can",
"will", "just", "don", "should", "now", "d", "ll", "m", "o", "re",
"ve", "y", "ain", "aren", "couldn", "didn", "doesn", "hadn", "hasn",
"haven", "isn", "ma", "mightn", "mustn", "needn", "shan", "shouldn",
"wasn", "weren", "won", "wouldn",
]
)
def is_stop_word(word: str) -> bool:
"""Check if a word is a stop word. Case-insensitive."""
return word.lower() in _STOP_WORDS
def get_stop_words() -> frozenset:
"""Return the full stop word set (for inspection/bulk use)."""
return _STOP_WORDS

44
term_index.py Normal file
View file

@ -0,0 +1,44 @@
"""Term index — inverted index extraction for session search fast path.
Extracts non-stop-word terms from message content for insertion into the
term_index table in SessionDB. Terms are lowercased, punctuation-stripped
(with preservation of path-like strings), and deduplicated per message.
"""
import re
from stop_words import is_stop_word
# Matches "words" including paths (foo/bar), filenames (file.py), and
# hyphenated terms (self-hosted). Filters out most punctuation but
# preserves dots in filenames and slashes in paths.
# Strategy: split on whitespace first, then strip leading/trailing punctuation.
_TERM_RE = re.compile(r"[a-zA-Z0-9][\w./\-]*[a-zA-Z0-9]|[a-zA-Z0-9]")
def extract_terms(content: str) -> list[str]:
"""Extract non-stop-word terms from message content.
Returns a deduplicated, lowercased list of terms.
Stops words, pure punctuation, and empty strings are excluded.
"""
if not content:
return []
# Find candidate tokens
raw_tokens = _TERM_RE.findall(content)
seen = set()
terms = []
for token in raw_tokens:
lower = token.lower()
# Skip stop words
if is_stop_word(lower):
continue
# Skip single characters except meaningful ones
# (but these are already handled by stop words for 'a', 'I', etc.)
# Deduplicate within this message
if lower not in seen:
seen.add(lower)
terms.append(lower)
return terms

View file

@ -1173,7 +1173,7 @@ class TestSchemaInit:
def test_schema_version(self, db):
cursor = db._conn.execute("SELECT version FROM schema_version")
version = cursor.fetchone()[0]
assert version == 8
assert version == 9
def test_title_column_exists(self, db):
"""Verify the title column was created in the sessions table."""
@ -1229,12 +1229,12 @@ class TestSchemaInit:
conn.commit()
conn.close()
# Open with SessionDB — should migrate to v8
# Open with SessionDB — should migrate to v9
migrated_db = SessionDB(db_path=db_path)
# Verify migration
cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
assert cursor.fetchone()[0] == 8
assert cursor.fetchone()[0] == 9
# Verify title column exists and is NULL for existing sessions
session = migrated_db.get_session("existing")

603
tests/test_term_index.py Normal file
View file

@ -0,0 +1,603 @@
"""Tests for term_index — inverted index for session search fast path.
Covers: stop word filtering, term extraction, term insertion at write time,
term-based search with session-level results, multi-term intersection.
"""
import time
import pytest
from pathlib import Path
from hermes_state import SessionDB
@pytest.fixture()
def db(tmp_path):
    """Create a SessionDB backed by a throwaway file under tmp_path."""
    db_path = tmp_path / "test_state.db"
    session_db = SessionDB(db_path=db_path)
    yield session_db
    # Explicitly release the SQLite handle so tmp_path cleanup works
    # even on platforms that refuse to delete open files.
    session_db.close()
# =========================================================================
# Stop word filtering
# =========================================================================
class TestStopWords:
    """Unit tests for the stop_words module (no DB involved)."""

    def test_common_english_words_are_stopped(self):
        """Core function words must all be in the stop list."""
        from stop_words import is_stop_word
        for w in ["the", "and", "is", "in", "it", "of", "to", "a", "was", "for"]:
            assert is_stop_word(w), f"'{w}' should be a stop word"

    def test_case_insensitive_stop_words(self):
        """Lookup must be case-insensitive."""
        from stop_words import is_stop_word
        assert is_stop_word("The")
        assert is_stop_word("AND")
        assert is_stop_word("Is")

    def test_non_stop_words_pass(self):
        """Domain/tech vocabulary must remain searchable."""
        from stop_words import is_stop_word
        for w in ["docker", "kubernetes", "python", "hermes", "session"]:
            assert not is_stop_word(w), f"'{w}' should NOT be a stop word"

    def test_short_words_not_auto_stopped(self):
        """Single letters and 2-letter words that aren't in the list should pass."""
        from stop_words import is_stop_word
        # 'go' is a real tech term, 'I' is a stop word
        assert not is_stop_word("go")
        assert is_stop_word("I")
# =========================================================================
# Term extraction
# =========================================================================
class TestTermExtraction:
    """Unit tests for term_index.extract_terms (no DB involved)."""

    def test_extracts_words_from_content(self):
        from term_index import extract_terms
        terms = extract_terms("docker compose up -d")
        assert "docker" in terms
        assert "compose" in terms

    def test_strips_punctuation(self):
        from term_index import extract_terms
        terms = extract_terms("It's working! Check the file.py, okay?")
        assert "working" in terms
        assert "file.py" in terms  # dots in filenames preserved
        assert "okay" in terms

    def test_filters_stop_words(self):
        from term_index import extract_terms
        terms = extract_terms("the docker container is running in the background")
        assert "the" not in terms
        assert "is" not in terms
        assert "in" not in terms
        assert "docker" in terms
        assert "container" in terms
        assert "running" in terms

    def test_case_folded(self):
        from term_index import extract_terms
        terms = extract_terms("Docker DOCKER docker")
        # All three spellings should collapse to a single lowercased term.
        assert len(terms) == len(set(terms)), "Terms should be deduplicated after case folding"

    def test_empty_content(self):
        from term_index import extract_terms
        terms = extract_terms("")
        assert terms == []

    def test_none_content(self):
        from term_index import extract_terms
        terms = extract_terms(None)
        assert terms == []

    def test_preserves_paths_and_commands(self):
        from term_index import extract_terms
        terms = extract_terms("edited /etc/hosts and ran git push origin main")
        # The token regex requires an alphanumeric first char, so a
        # leading "/" may be stripped -- accept either spelling.
        assert "/etc/hosts" in terms or "etc/hosts" in terms  # path fragment
        assert "git" in terms
        assert "push" in terms
# =========================================================================
# Term index insertion
# =========================================================================
class TestTermIndexInsertion:
    """Write-time indexing: append_message should populate term_index."""

    def test_terms_inserted_on_append_message(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="I need to deploy the docker container",
        )
        # Should be able to find the message by term
        results = db.search_by_terms(["docker"])
        assert len(results) >= 1
        assert any(r["session_id"] == "s1" for r in results)

    def test_stop_words_not_indexed(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="the and is in of to a",
        )
        # All stop words — should find nothing
        results = db.search_by_terms(["the", "and", "is"])
        assert len(results) == 0

    def test_same_term_multiple_messages_same_session(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker is great")
        db.append_message(session_id="s1", role="assistant", content="docker compose ready")
        results = db.search_by_terms(["docker"])
        # Results are grouped per session: one row even with two matches.
        sids = [r["session_id"] for r in results]
        assert sids.count("s1") == 1

    def test_term_indexed_across_sessions(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="telegram")
        db.append_message(session_id="s1", role="user", content="fix the docker bug")
        db.append_message(session_id="s2", role="user", content="docker pull failed")
        results = db.search_by_terms(["docker"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" in sids
# =========================================================================
# Term-based search
# =========================================================================
class TestTermSearch:
    """Read path: search_by_terms result shape, AND semantics, filters."""

    def test_single_term_search(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="I need to configure kubernetes",
        )
        results = db.search_by_terms(["kubernetes"])
        assert len(results) >= 1
        assert results[0]["session_id"] == "s1"
        # Should include session metadata
        assert "source" in results[0]
        assert "started_at" in results[0] or "session_started" in results[0]

    def test_multi_term_intersection(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.create_session(session_id="s3", source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        db.append_message(session_id="s2", role="user", content="docker container running")
        db.append_message(session_id="s3", role="user", content="kubernetes networking problem")
        # Both "docker" AND "networking" should only match s1
        results = db.search_by_terms(["docker", "networking"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids
        assert "s3" not in sids

    def test_search_returns_empty_for_stop_words_only(self, db):
        # Query made entirely of stop words is filtered down to nothing.
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="the and is")
        results = db.search_by_terms(["the", "and"])
        assert results == []

    def test_search_excludes_hidden_sources(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker deployment")
        db.append_message(session_id="s2", role="user", content="docker deployment tool")
        results = db.search_by_terms(["docker"], exclude_sources=["tool"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids

    def test_search_with_limit(self, db):
        for i in range(5):
            sid = f"s{i}"
            db.create_session(session_id=sid, source="cli")
            db.append_message(session_id=sid, role="user", content="python script")
        results = db.search_by_terms(["python"], limit=3)
        assert len(results) <= 3

    def test_nonexistent_term_returns_empty(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="hello world")
        results = db.search_by_terms(["nonexistent_xyzzy"])
        assert results == []

    def test_term_result_includes_match_count(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker docker docker")
        db.append_message(session_id="s1", role="assistant", content="docker ready")
        results = db.search_by_terms(["docker"])
        assert len(results) >= 1
        # Should tell us how many messages matched in the session
        assert "match_count" in results[0]
# =========================================================================
# Schema and migration
# =========================================================================
class TestTermIndexSchema:
    """Schema shape of term_index and the v9 migration path."""

    def test_term_index_table_exists(self, db):
        cursor = db._conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='term_index'"
        )
        assert cursor.fetchone() is not None

    def test_term_index_is_without_rowid(self, db):
        # WITHOUT ROWID clusters rows on (term, message_id) for fast
        # term-prefix lookups.
        cursor = db._conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='term_index'"
        )
        row = cursor.fetchone()
        assert row is not None
        assert "WITHOUT ROWID" in row[0]

    def test_schema_version_bumped(self, db):
        cursor = db._conn.execute("SELECT version FROM schema_version LIMIT 1")
        version = cursor.fetchone()[0]
        assert version >= 9

    def test_existing_data_survives_migration(self, tmp_path):
        """Close and re-open a DB with current code -- data should survive
        the (idempotent) migration path and term_index should exist."""
        db_path = tmp_path / "migrate.db"
        db = SessionDB(db_path=db_path)
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="hello world")
        db.close()
        # Re-open -- migration should run, data intact
        db2 = SessionDB(db_path=db_path)
        session = db2.get_session("s1")
        assert session is not None
        assert session["source"] == "cli"
        # term_index should now exist
        cursor = db2._conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='term_index'"
        )
        assert cursor.fetchone() is not None
        db2.close()

    def test_v9_migration_auto_reindexes(self, tmp_path):
        """When a v6 DB with existing messages is opened, the v9 migration
        should create the term_index and backfill it automatically."""
        db_path = tmp_path / "migrate_v9.db"
        # Step 1: Create a fresh DB, add messages, then manually downgrade
        # to v6 so the next open triggers the migration path.
        db = SessionDB(db_path=db_path)
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="deploy the kubernetes cluster")
        db.append_message(session_id="s2", role="user", content="debug docker networking issue")
        db.close()
        # Step 2: Re-open raw, manually set version to 6 and wipe term_index
        # to simulate a pre-v7 DB.
        import sqlite3
        conn = sqlite3.connect(str(db_path))
        conn.execute("UPDATE schema_version SET version = 6")
        conn.execute("DROP TABLE IF EXISTS term_index")
        conn.commit()
        conn.close()
        # Step 3: Open with SessionDB — should migrate to v9 and auto-reindex.
        db2 = SessionDB(db_path=db_path)
        # Verify version is now 9
        cursor = db2._conn.execute("SELECT version FROM schema_version")
        assert cursor.fetchone()[0] == 9
        # Verify term_index is populated — search should find the terms
        results = db2.search_by_terms(["kubernetes"])
        assert len(results) >= 1
        assert results[0]["session_id"] == "s1"
        results2 = db2.search_by_terms(["docker"])
        assert len(results2) >= 1
        assert results2[0]["session_id"] == "s2"
        db2.close()
# =========================================================================
# Regression tests for red-team QA bugs
# =========================================================================
class TestClearMessagesCleansTermIndex:
    """BUG 3: clear_messages() left stale term_index entries.

    After clearing messages, search_by_terms should return zero results
    for that session, not ghost matches pointing to deleted message IDs.
    """

    def test_clear_messages_removes_term_entries(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        # Confirm indexed
        results = db.search_by_terms(["docker"])
        assert len(results) >= 1
        # Clear messages
        db.clear_messages(session_id="s1")
        # Term entries should be gone
        results = db.search_by_terms(["docker"])
        assert results == []

    def test_clear_messages_does_not_affect_other_sessions(self, db):
        # The cascade delete must be scoped to the cleared session only.
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker test")
        db.append_message(session_id="s2", role="user", content="docker prod")
        db.clear_messages(session_id="s1")
        # s2 should still be searchable
        results = db.search_by_terms(["docker"])
        sids = [r["session_id"] for r in results]
        assert "s2" in sids
        assert "s1" not in sids

    def test_clear_messages_no_stray_term_rows(self, db):
        # Verify at the table level, not just through the search API.
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="kubernetes deployment")
        db.clear_messages(session_id="s1")
        cursor = db._conn.execute(
            "SELECT COUNT(*) FROM term_index WHERE session_id = 's1'"
        )
        assert cursor.fetchone()[0] == 0
class TestSearchByTermsParamBinding:
    """BUG 1: search_by_terms() had dead code with wrong param binding.

    The multi-term GROUP BY + HAVING path is the one that actually runs.
    These tests verify parameter binding is correct for both single and
    multi-term queries, including with exclude_sources.
    """

    def test_single_term_with_exclude_sources(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker deploy")
        db.append_message(session_id="s2", role="user", content="docker deploy")
        results = db.search_by_terms(["docker"], exclude_sources=["tool"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids

    def test_multi_term_and_semantics(self, db):
        """Multi-term search should use AND: only sessions with ALL terms match."""
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.create_session(session_id="s3", source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        db.append_message(session_id="s2", role="user", content="docker container only")
        db.append_message(session_id="s3", role="user", content="networking problem only")
        results = db.search_by_terms(["docker", "networking"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids
        assert "s3" not in sids

    def test_multi_term_with_exclude_sources(self, db):
        """Multi-term + exclude_sources: param binding must be correct."""
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker networking setup")
        db.append_message(session_id="s2", role="user", content="docker networking deploy")
        results = db.search_by_terms(
            ["docker", "networking"], exclude_sources=["tool"]
        )
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids

    def test_three_term_intersection(self, db):
        """Three-term AND: all three must be present in the session."""
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker kubernetes aws deployment")
        db.append_message(session_id="s2", role="user", content="docker kubernetes only two terms")
        results = db.search_by_terms(["docker", "kubernetes", "aws"])
        sids = [r["session_id"] for r in results]
        assert "s1" in sids
        assert "s2" not in sids
class TestDeleteSessionCleansTermIndex:
    """Verify delete_session() and prune_sessions() clean term_index."""

    def test_delete_session_removes_term_entries(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker deploy")
        db.append_message(session_id="s1", role="assistant", content="docker is running")
        db.delete_session(session_id="s1")
        # Check the table directly so stale rows can't hide behind the
        # search API's session JOIN.
        cursor = db._conn.execute(
            "SELECT COUNT(*) FROM term_index WHERE session_id = 's1'"
        )
        assert cursor.fetchone()[0] == 0

    def test_delete_session_does_not_affect_other_sessions(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker one")
        db.append_message(session_id="s2", role="user", content="docker two")
        db.delete_session(session_id="s1")
        results = db.search_by_terms(["docker"])
        sids = [r["session_id"] for r in results]
        assert "s2" in sids
        assert "s1" not in sids
class TestFastSearchSessionResolution:
    """BUG 2: _fast_search didn't resolve child sessions to parent.

    A delegation child and its parent both containing "docker" would appear
    as two separate results. They should be resolved to the parent session.
    Also, current session lineage exclusion must cover the entire chain.
    """

    @staticmethod
    def _search(db, query, current_session_id=None, limit=5):
        # Run the fast path and decode its JSON payload.
        import json
        from tools.session_search_tool import _fast_search
        return json.loads(_fast_search(
            query=query, db=db, limit=limit, current_session_id=current_session_id
        ))

    def test_child_resolved_to_parent(self, db):
        """Parent + child matching same term should return 1 result (parent)."""
        db.create_session(session_id="parent-1", source="cli")
        db.create_session(session_id="child-1", source="cli", parent_session_id="parent-1")
        db.append_message(session_id="parent-1", role="user", content="docker setup question")
        db.append_message(session_id="child-1", role="assistant", content="docker setup done")
        result = self._search(db, "docker")
        assert result["success"]
        sids = [entry["session_id"] for entry in result["results"]]
        # Should collapse to parent, not show both
        assert "child-1" not in sids, "Child should be resolved to parent"
        assert "parent-1" in sids
        assert len(result["results"]) == 1

    def test_match_count_accumulates_from_children(self, db):
        """Match_count should sum parent + child matches."""
        db.create_session(session_id="p", source="cli")
        db.create_session(session_id="c", source="cli", parent_session_id="p")
        db.append_message(session_id="p", role="user", content="docker question")
        db.append_message(session_id="c", role="assistant", content="docker answer")
        top = self._search(db, "docker")["results"][0]
        assert top["session_id"] == "p"
        assert top["match_count"] >= 2, f"Expected accumulated count >= 2, got {top['match_count']}"

    def test_current_session_lineage_excludes_children(self, db):
        """When current session is a child, parent should also be excluded."""
        db.create_session(session_id="parent-2", source="cli")
        db.create_session(session_id="child-2", source="cli", parent_session_id="parent-2")
        db.create_session(session_id="unrelated", source="cli")
        db.append_message(session_id="parent-2", role="user", content="docker deploy")
        db.append_message(session_id="child-2", role="assistant", content="docker deployed")
        db.append_message(session_id="unrelated", role="user", content="docker build")
        # Current session = child -> should exclude parent-2 AND child-2, keep unrelated
        result = self._search(db, "docker", current_session_id="child-2")
        sids = [entry["session_id"] for entry in result["results"]]
        assert "parent-2" not in sids, "Parent of current should be excluded"
        assert "child-2" not in sids, "Current child should be excluded"
        assert "unrelated" in sids, "Unrelated session should appear"
class TestGetChildSessionIds:
    """Tests for SessionDB.get_child_session_ids -- public API replacing
    direct db._lock/db._conn access in _fast_search."""

    def test_returns_child_ids(self, db):
        # Two children under one parent plus an unrelated session.
        db.create_session(session_id="parent", source="cli")
        db.create_session(session_id="child-1", source="delegation", parent_session_id="parent")
        db.create_session(session_id="child-2", source="compression", parent_session_id="parent")
        db.create_session(session_id="orphan", source="cli")
        assert set(db.get_child_session_ids("parent")) == {"child-1", "child-2"}

    def test_returns_empty_for_leaf_session(self, db):
        # A session with no children yields an empty list, not an error.
        db.create_session(session_id="leaf", source="cli")
        assert db.get_child_session_ids("leaf") == []

    def test_returns_empty_for_no_args(self, db):
        # Calling with zero parent ids is valid and yields nothing.
        assert db.get_child_session_ids() == []

    def test_multiple_parent_ids(self, db):
        # Varargs: children of every listed parent are combined.
        for parent in ("p1", "p2"):
            db.create_session(session_id=parent, source="cli")
        db.create_session(session_id="c1", source="delegation", parent_session_id="p1")
        db.create_session(session_id="c2", source="delegation", parent_session_id="p2")
        assert set(db.get_child_session_ids("p1", "p2")) == {"c1", "c2"}

    def test_does_not_recurse(self, db):
        """Only direct children, not grandchildren."""
        db.create_session(session_id="root", source="cli")
        db.create_session(session_id="child", source="delegation", parent_session_id="root")
        db.create_session(session_id="grandchild", source="delegation", parent_session_id="child")
        assert db.get_child_session_ids("root") == ["child"]
class TestCJKFallbackInFastSearch:
    """CJK queries should fall through to the slow path even when fast=True.

    The term index can't handle CJK because extract_terms() splits on
    whitespace, and CJK languages don't use spaces between words.
    session_search should detect this and use the FTS5+LIKE fallback.
    """

    @staticmethod
    def _run(db, query):
        # Invoke the public entry point with fast=True and decode its JSON.
        import json
        from tools.session_search_tool import session_search
        return json.loads(session_search(
            query=query, db=db, limit=3, fast=True, current_session_id=None
        ))

    def test_cjk_query_bypasses_fast_path(self, db):
        """A CJK query with fast=True should be downgraded to fast=False."""
        db.create_session(session_id="cjk-1", source="cli")
        db.append_message(session_id="cjk-1", role="user", content="测试中文搜索")
        # fast=True, but CJK query should fall through to full search
        outcome = self._run(db, "中文")
        assert outcome["success"]
        # mode should be "full" (not "fast") because CJK forced the fallback
        assert outcome.get("mode") != "fast"

    def test_english_query_stays_fast(self, db):
        """Non-CJK queries should still use the fast path."""
        db.create_session(session_id="eng-1", source="cli")
        db.append_message(session_id="eng-1", role="user", content="deploy the server")
        outcome = self._run(db, "deploy")
        assert outcome["success"]
        assert outcome.get("mode") == "fast"

View file

@ -321,11 +321,14 @@ def session_search(
limit: int = 3,
db=None,
current_session_id: str = None,
fast: bool = True,
) -> str:
"""
Search past sessions and return focused summaries of matching conversations.
Uses FTS5 to find matches, then summarizes the top sessions with Gemini Flash.
By default (fast=True), uses the term_index inverted index for instant
results with session metadata and match counts no LLM calls needed.
Set fast=False to use FTS5 + LLM summarization for detailed summaries.
The current session is excluded from results since the agent already has that context.
"""
if db is None:
@ -348,6 +351,120 @@ def session_search(
query = query.strip()
# CJK queries can't be handled by the term index (no word boundaries
# for extract_terms to split on). Fall through to FTS5 + LIKE which
# has a CJK bigram/LIKE fallback.
if fast and db._contains_cjk(query):
fast = False
# ── Fast path: term index (no LLM, ~1ms) ──────────────────────────
if fast:
return _fast_search(query, db, limit, current_session_id)
# ── Slow path: FTS5 + LLM summarization (~5-15s) ───────────────────
return _full_search(query, role_filter, limit, db, current_session_id)
def _fast_search(query: str, db, limit: int, current_session_id: str = None) -> str:
    """Term index fast path: instant search, no LLM calls.

    Extracts terms from the query, looks them up in the term_index
    inverted index (AND semantics), resolves delegation child sessions
    to their parent, excludes the current session's lineage, and returns
    a JSON payload with per-session metadata and match counts.

    Args:
        query: Raw user query string (non-CJK; CJK is routed to the
            slow path by the caller).
        db: SessionDB instance providing search_by_terms / get_session /
            get_child_session_ids.
        limit: Maximum number of distinct (parent) sessions to return.
        current_session_id: Session to exclude (with its lineage) from
            results, since the agent already has that context.

    Returns:
        JSON string with keys: success, query, mode="fast", results,
        count, message.
    """
    from term_index import extract_terms

    terms = extract_terms(query)
    if not terms:
        return json.dumps({
            "success": True,
            "query": query,
            "results": [],
            "count": 0,
            "message": "No searchable terms in query (all stop words or empty).",
        }, ensure_ascii=False)

    # Fetch extra results so we have room after dedup/lineage exclusion.
    raw_results = db.search_by_terms(
        terms=terms,
        exclude_sources=list(_HIDDEN_SESSION_SOURCES),
        limit=limit * 3,
    )

    # Resolve child sessions to their parent root, just like _full_search.
    # Delegation stores detailed content in child sessions, but the user
    # sees the parent conversation. Without this, parent + child both
    # containing "docker" would appear as two separate results.
    def _resolve_to_parent(session_id: str) -> str:
        """Walk parent_session_id links up to the root; cycle-safe."""
        visited = set()
        sid = session_id
        while sid and sid not in visited:
            visited.add(sid)
            try:
                session = db.get_session(sid)
                if not session:
                    break
                parent = session.get("parent_session_id")
                if parent:
                    sid = parent
                else:
                    break
            except Exception:
                # Best-effort: on a lookup error, treat the last good
                # id as the root rather than failing the whole search.
                break
        return sid

    # Determine current session lineage for exclusion.
    current_lineage = set()
    if current_session_id:
        # Walk parent chain AND collect all children.
        root = _resolve_to_parent(current_session_id)
        current_lineage.add(root)
        current_lineage.add(current_session_id)
        # Also find any child sessions of the current root.
        # NOTE(review): get_child_session_ids returns direct children
        # only (no recursion), so grandchildren of the root could in
        # principle still surface — confirm whether chains ever nest
        # deeper than one level in practice.
        try:
            children = db.get_child_session_ids(root, current_session_id)
            current_lineage.update(children)
        except Exception:
            pass

    seen_sessions = {}
    for r in raw_results:
        raw_sid = r.get("session_id", "")
        resolved_sid = _resolve_to_parent(raw_sid)
        if resolved_sid in current_lineage or raw_sid in current_lineage:
            continue
        if resolved_sid in seen_sessions:
            # Accumulate match_count from child sessions.
            seen_sessions[resolved_sid]["match_count"] = (
                seen_sessions[resolved_sid].get("match_count", 0)
                + r.get("match_count", 0)
            )
        elif len(seen_sessions) < limit:
            # First hit for this root: keep the row's metadata under the
            # resolved (parent) id. NOTE(review): if the first hit is a
            # child, its title/model/timestamp are shown for the parent —
            # confirm whether parent metadata should be fetched instead.
            seen_sessions[resolved_sid] = dict(r)
            seen_sessions[resolved_sid]["session_id"] = resolved_sid
        # Bug fix: the previous version broke out of the loop as soon as
        # len(seen_sessions) reached `limit`, which dropped match_count
        # contributions from later rows belonging to already-included
        # sessions. Keep scanning: only *new* sessions are refused above.

    entries = []
    for sid, r in seen_sessions.items():
        entries.append({
            "session_id": sid,
            "when": _format_timestamp(r.get("session_started")),
            "source": r.get("source", "unknown"),
            "model": r.get("model"),
            "title": r.get("title"),
            "match_count": r.get("match_count", 0),
        })

    return json.dumps({
        "success": True,
        "query": query,
        "mode": "fast",
        "results": entries,
        "count": len(entries),
        "message": f"Found {len(entries)} matching sessions via term index (instant, no LLM)."
        f" Use fast=False for LLM-summarized results.",
    }, ensure_ascii=False)
def _full_search(query: str, role_filter: str, limit: int, db, current_session_id: str = None) -> str:
"""FTS5 + LLM summarization path (original behavior)."""
try:
# Parse role filter
role_list = None
@ -367,6 +484,7 @@ def session_search(
return json.dumps({
"success": True,
"query": query,
"mode": "full",
"results": [],
"count": 0,
"message": "No matching sessions found.",
@ -506,6 +624,7 @@ def session_search(
return json.dumps({
"success": True,
"query": query,
"mode": "full",
"results": summaries,
"count": len(summaries),
"sessions_searched": len(seen_sessions),
@ -535,7 +654,8 @@ SESSION_SEARCH_SCHEMA = {
"Returns titles, previews, and timestamps. Zero LLM cost, instant. "
"Start here when the user asks what were we working on or what did we do recently.\n"
"2. Keyword search (with query): Search for specific topics across all past sessions. "
"Returns LLM-generated summaries of matching sessions.\n\n"
"By default uses the term index for instant results (no LLM). "
"Set fast=False for detailed LLM-generated summaries.\n\n"
"USE THIS PROACTIVELY when:\n"
"- The user says 'we did this before', 'remember when', 'last time', 'as I mentioned'\n"
"- The user asks about a topic you worked on before but don't have in current context\n"
@ -544,11 +664,15 @@ SESSION_SEARCH_SCHEMA = {
"- The user asks 'what did we do about X?' or 'how did we fix Y?'\n\n"
"Don't hesitate to search when it is actually cross-session -- it's fast and cheap. "
"Better to search and confirm than to guess or ask the user to repeat themselves.\n\n"
"Search syntax: keywords joined with OR for broad recall (elevenlabs OR baseten OR funding), "
"phrases for exact match (\"docker networking\"), boolean (python NOT java), prefix (deploy*). "
"IMPORTANT: Use OR between keywords for best results — FTS5 defaults to AND which misses "
"sessions that only mention some terms. If a broad OR query returns nothing, try individual "
"keyword searches in parallel. Returns summaries of the top matching sessions."
"Search syntax depends on the mode:\n"
"- fast=True (default): Simple keyword search with AND semantics. Multiple words "
"all must appear in a session. No boolean operators or phrase matching. "
"Instant, zero LLM cost.\n"
"- fast=False: FTS5 full syntax — OR for broad recall (elevenlabs OR baseten OR funding), "
"phrases for exact match (\\\"docker networking\\\"), boolean (python NOT java), "
"prefix (deploy*). IMPORTANT: Use OR between keywords for best FTS5 results — "
"it defaults to AND which misses sessions that only mention some terms. "
"Slower (5-15s) but returns LLM-summarized results."
),
"parameters": {
"type": "object",
@ -559,13 +683,18 @@ SESSION_SEARCH_SCHEMA = {
},
"role_filter": {
"type": "string",
"description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs.",
"description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs. Only used when fast=False.",
},
"limit": {
"type": "integer",
"description": "Max sessions to summarize (default: 3, max: 5).",
"default": 3,
},
"fast": {
"type": "boolean",
"description": "When true (default), use the term index for instant results with no LLM cost. When false, use FTS5 + LLM summarization for detailed summaries.",
"default": True,
},
},
"required": [],
},
@ -583,6 +712,7 @@ registry.register(
query=args.get("query") or "",
role_filter=args.get("role_filter"),
limit=args.get("limit", 3),
fast=args.get("fast", True),
db=kw.get("db"),
current_session_id=kw.get("current_session_id")),
check_fn=check_session_search_requirements,