This commit is contained in:
aj-nt 2026-04-25 00:24:31 +00:00 committed by GitHub
commit 02965c7274
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1840 additions and 15 deletions

View file

@ -31,7 +31,7 @@ T = TypeVar("T")
DEFAULT_DB_PATH = get_hermes_home() / "state.db"

# Current schema version. v9 added the term_index inverted-index table.
# (A stale duplicate assignment "SCHEMA_VERSION = 8" was dead code —
# immediately overwritten — and has been removed.)
SCHEMA_VERSION = 9
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS schema_version (
@ -95,6 +95,13 @@ CREATE INDEX IF NOT EXISTS idx_sessions_source ON sessions(source);
CREATE INDEX IF NOT EXISTS idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX IF NOT EXISTS idx_sessions_started ON sessions(started_at DESC);
CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id, timestamp);
CREATE TABLE IF NOT EXISTS term_index (
term TEXT NOT NULL,
message_id INTEGER NOT NULL REFERENCES messages(id),
session_id TEXT NOT NULL REFERENCES sessions(id),
PRIMARY KEY (term, message_id)
) WITHOUT ROWID;
"""
FTS_SQL = """
@ -164,7 +171,14 @@ class SessionDB:
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA foreign_keys=ON")
self._init_schema()
needs_reindex = self._init_schema()
# If we just migrated to v7, backfill the term index from existing
# messages. This runs outside _init_schema so we can use
# _execute_write (which manages its own transactions).
if needs_reindex:
logger.info("v7 migration detected — backfilling term index")
self.reindex_term_index()
# ── Core write helper ──
@ -257,11 +271,17 @@ class SessionDB:
self._conn = None
def _init_schema(self):
"""Create tables and FTS if they don't exist, run migrations."""
"""Create tables and FTS if they don't exist, run migrations.
Returns True if a v7 migration was performed (term_index created
and needs backfill), False otherwise.
"""
cursor = self._conn.cursor()
cursor.executescript(SCHEMA_SQL)
needs_reindex = False
# Check schema version and run migrations
cursor.execute("SELECT version FROM schema_version LIMIT 1")
row = cursor.fetchone()
@ -356,6 +376,24 @@ class SessionDB:
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 8")
if current_version < 9:
# v9: add term_index table for inverted-index session search.
# This is the clustered (term, message_id) WITHOUT ROWID table
# used by the session_search fast path. After creating the
# table, we backfill from existing messages.
try:
cursor.execute("""
CREATE TABLE IF NOT EXISTS term_index (
term TEXT NOT NULL,
message_id INTEGER NOT NULL REFERENCES messages(id),
session_id TEXT NOT NULL REFERENCES sessions(id),
PRIMARY KEY (term, message_id)
) WITHOUT ROWID
""")
except sqlite3.OperationalError:
pass # Table already exists
cursor.execute("UPDATE schema_version SET version = 9")
needs_reindex = True
# Unique title index — always ensure it exists (safe to run after migrations
# since the title column is guaranteed to exist at this point)
@ -375,6 +413,8 @@ class SessionDB:
self._conn.commit()
return needs_reindex
# =========================================================================
# Session lifecycle
# =========================================================================
@ -569,6 +609,23 @@ class SessionDB:
row = cursor.fetchone()
return dict(row) if row else None
def get_child_session_ids(self, *parent_ids: str) -> List[str]:
"""Return IDs of sessions whose parent_session_id is in *parent_ids*.
Useful for finding delegation/compression child sessions that
belong to a parent conversation. Does NOT recurse only
direct children are returned.
"""
if not parent_ids:
return []
placeholders = ",".join("?" for _ in parent_ids)
with self._lock:
cursor = self._conn.execute(
f"SELECT id FROM sessions WHERE parent_session_id IN ({placeholders})",
list(parent_ids),
)
return [row["id"] for row in cursor.fetchall()]
def resolve_session_id(self, session_id_or_prefix: str) -> Optional[str]:
"""Resolve an exact or uniquely prefixed session ID to the full ID.
@ -1015,6 +1072,24 @@ class SessionDB:
"UPDATE sessions SET message_count = message_count + 1 WHERE id = ?",
(session_id,),
)
# Insert terms into inverted index (delayed import avoids
# circular dependency — hermes_state is imported by nearly
# everything at startup, term_index must not be top-level)
# Skip tool-role messages: their structured JSON output produces
# noise terms (field names, numeric values) with no search value.
if content and role != "tool":
try:
from term_index import extract_terms
terms = extract_terms(content)
if terms:
conn.executemany(
"INSERT OR IGNORE INTO term_index (term, message_id, session_id) VALUES (?, ?, ?)",
[(t, msg_id, session_id) for t in terms],
)
except Exception:
pass # Term indexing is best-effort; never block a message insert
return msg_id
return self._execute_write(_do)
@ -1395,6 +1470,390 @@ class SessionDB:
return matches
# =========================================================================
# Term index search (inverted index fast path)
# =========================================================================
def search_by_terms(
    self,
    terms: List[str],
    exclude_sources: Optional[List[str]] = None,
    limit: int = 10,
) -> List[Dict[str, Any]]:
    """
    Search sessions using the term_index inverted index.

    Takes a list of query terms, finds all message IDs for each term,
    intersects them (AND logic), and returns matching sessions with
    metadata and match counts.

    This is the fast path for session search -- no LLM calls needed.

    Args:
        terms: Raw query terms; stop words are filtered out before querying.
        exclude_sources: Optional session sources to exclude from results.
        limit: Maximum number of sessions to return.

    Returns:
        One dict per matching session (session_id, source, model,
        session_started, title, match_count — plus term_count for
        multi-term queries), ordered by match_count then recency.
        Empty list if no usable terms remain or term_index is absent.
    """
    if not terms:
        return []
    # Filter out stop words from the query (belt and suspenders)
    # Delayed import avoids circular dependency (same pattern as append_message)
    from stop_words import is_stop_word
    filtered = [t.lower() for t in terms if t and not is_stop_word(t.lower())]
    if not filtered:
        return []
    # Build the query. For single terms, a simple WHERE suffices.
    # For multiple terms, we use GROUP BY session_id + HAVING COUNT(DISTINCT term) = N
    # to enforce AND semantics: only sessions containing ALL query terms match.
    if len(filtered) == 1:
        term = filtered[0]
        params: list = [term]
        where_sql = "ti.term = ?"
        if exclude_sources:
            exclude_placeholders = ",".join("?" for _ in exclude_sources)
            where_sql += f" AND s.source NOT IN ({exclude_placeholders})"
            params.extend(exclude_sources)
        sql = f"""
            SELECT ti.session_id,
                   s.source,
                   s.model,
                   s.started_at AS session_started,
                   s.title,
                   COUNT(DISTINCT ti.message_id) AS match_count
            FROM term_index ti
            JOIN sessions s ON s.id = ti.session_id
            WHERE {where_sql}
            GROUP BY ti.session_id
            ORDER BY match_count DESC, s.started_at DESC
            LIMIT ?
        """
        params.append(limit)
    else:
        # Multi-term: GROUP BY + HAVING COUNT(DISTINCT term) = N enforces AND
        term_placeholders = ",".join("?" for _ in filtered)
        params = list(filtered)
        exclude_sql = ""
        if exclude_sources:
            exclude_sql = f" AND s.source NOT IN ({','.join('?' for _ in exclude_sources)})"
            params.extend(exclude_sources)
        # Bind order must mirror placeholder order in the SQL below:
        # terms, [exclude_sources], N (for HAVING), limit.
        params.extend([len(filtered), limit])
        sql = f"""
            SELECT ti.session_id,
                   s.source,
                   s.model,
                   s.started_at AS session_started,
                   s.title,
                   COUNT(DISTINCT ti.term) AS term_count,
                   COUNT(DISTINCT ti.message_id) AS match_count
            FROM term_index ti
            JOIN sessions s ON s.id = ti.session_id
            WHERE ti.term IN ({term_placeholders})
            {exclude_sql}
            GROUP BY ti.session_id
            HAVING COUNT(DISTINCT ti.term) = ?
            ORDER BY match_count DESC, s.started_at DESC
            LIMIT ?
        """
    with self._lock:
        try:
            cursor = self._conn.execute(sql, params)
            results = [dict(row) for row in cursor.fetchall()]
        except sqlite3.OperationalError:
            # Pre-v9 databases may lack term_index — degrade gracefully
            # to "no matches" rather than raising.
            logger.debug("term_index query failed", exc_info=True)
            return []
    return results
def reindex_term_index(self, batch_size: int = 500) -> int:
    """
    Rebuild the term_index from existing messages.

    Processes messages in batches to avoid holding the write lock too
    long. Returns the total number of term entries inserted.

    Uses a swap strategy: builds a temporary table, then swaps it
    into place in a single transaction. This avoids the empty-index
    window that would occur with a simple clear-and-repopulate.

    Args:
        batch_size: Number of message rows read and indexed per batch.

    Returns:
        Total number of (term, message_id) entries inserted.
    """
    # Delayed import — this module is imported early by most of the
    # codebase, so term_index must not be a top-level dependency.
    from term_index import extract_terms

    # Count total messages to index
    with self._lock:
        total = self._conn.execute("SELECT COUNT(*) FROM messages").fetchone()[0]
    if total == 0:
        return 0

    inserted = 0
    offset = 0

    # Create a temporary table with the same schema as term_index.
    # We'll populate this, then swap it into place in a single
    # transaction — no empty-index window for concurrent readers.
    def _create_temp(conn):
        conn.execute("""
            CREATE TABLE IF NOT EXISTS _term_index_new (
                term TEXT NOT NULL,
                message_id INTEGER NOT NULL,
                session_id TEXT NOT NULL,
                PRIMARY KEY (term, message_id)
            ) WITHOUT ROWID
        """)
        # Clear leftovers from a previously interrupted reindex.
        conn.execute("DELETE FROM _term_index_new")

    self._execute_write(_create_temp)

    while offset < total:
        # Read batch outside write lock
        # NOTE(review): LIMIT/OFFSET pagination assumes message rows are
        # not deleted mid-scan; rows appended during the reindex may be
        # missed until the next reindex — confirm this is acceptable.
        with self._lock:
            cursor = self._conn.execute(
                "SELECT id, session_id, role, content FROM messages ORDER BY id LIMIT ? OFFSET ?",
                (batch_size, offset),
            )
            rows = cursor.fetchall()
        if not rows:
            break

        # Extract terms for the batch, skipping tool-role messages
        entries = []
        for row in rows:
            msg_id = row["id"]
            session_id = row["session_id"]
            # Skip tool messages — structured JSON output produces noise terms
            if row["role"] == "tool":
                continue
            content = row["content"] or ""
            terms = extract_terms(content)
            for term in terms:
                entries.append((term, msg_id, session_id))

        # Write batch to temp table
        if entries:
            # Default-arg bind (_entries=entries) snapshots this batch for
            # the closure; INSERT OR IGNORE dedupes on (term, message_id).
            def _insert(conn, _entries=entries):
                conn.executemany(
                    "INSERT OR IGNORE INTO _term_index_new (term, message_id, session_id) VALUES (?, ?, ?)",
                    _entries,
                )
                return len(_entries)

            inserted += self._execute_write(_insert)

        offset += batch_size

    # Swap: replace term_index with the new table in one transaction.
    # Concurrent readers see either the old index or the new one —
    # never an empty table.
    def _swap(conn):
        conn.execute("DELETE FROM term_index")
        conn.execute("""
            INSERT INTO term_index (term, message_id, session_id)
            SELECT term, message_id, session_id FROM _term_index_new
        """)
        conn.execute("DROP TABLE _term_index_new")

    self._execute_write(_swap)

    logger.info("Reindexed term_index: %d entries from %d messages", inserted, total)
    return inserted
def backfill_messages_from_json(
    self,
    sessions_dir: Optional[Path] = None,
) -> int:
    """Scan JSON session files for sessions missing from the messages table.

    Sessions created while running code without ``append_message()`` / term
    index support (e.g. old ``main``) have JSON files on disk but no
    corresponding rows in ``messages``. This method:

    1. Scans ``sessions/session_*.json`` for session IDs.
    2. Finds which sessions are missing from the ``messages`` table
       (either the session row doesn't exist, or it exists but has 0
       messages).
    3. Parses each missing session's JSON file and inserts both the
       session row (if absent) and all its messages.
    4. Reindexes the term index to pick up new terms.

    Args:
        sessions_dir: Directory containing ``session_*.json`` files.
            Defaults to the ``sessions/`` directory next to the DB file.

    Returns:
        The total number of messages inserted.

    Idempotent: sessions that already have messages are skipped, so
    calling this twice returns 0 the second time.
    """
    if sessions_dir is None:
        sessions_dir = DEFAULT_DB_PATH.parent / "sessions"
    sessions_dir = Path(sessions_dir)
    if not sessions_dir.is_dir():
        logger.warning("Sessions directory not found: %s", sessions_dir)
        return 0

    # (fix: removed an unused `import glob as _glob` — discovery uses
    # Path.glob below, the glob module was never referenced)

    # ── Phase 1: discover which sessions need backfill ──────────
    json_files = sorted(sessions_dir.glob("session_*.json"))
    if not json_files:
        return 0

    # Extract session IDs from filenames (session_20260410_022816_1b7721.json
    # -> 20260410_022816_1b7721)
    file_to_sid = {}
    for f in json_files:
        sid = f.stem.replace("session_", "")
        file_to_sid[f] = sid

    # Find session IDs that have no messages in the DB. The INNER JOIN
    # only yields sessions that have at least one message row.
    all_sids = set(file_to_sid.values())
    with self._lock:
        placeholders = ",".join("?" * len(all_sids))
        rows = self._conn.execute(
            f"SELECT s.id FROM sessions s "
            f"INNER JOIN messages m ON m.session_id = s.id "
            f"WHERE s.id IN ({placeholders})",
            list(all_sids),
        ).fetchall()
    sids_with_messages = {r["id"] if hasattr(r, "keys") else r[0] for r in rows}
    sids_needing_backfill = all_sids - sids_with_messages

    if not sids_needing_backfill:
        logger.info("backfill: all sessions already have messages")
        return 0

    logger.info(
        "backfill: %d of %d sessions need message import",
        len(sids_needing_backfill), len(all_sids),
    )

    # ── Phase 2: insert session rows and messages ───────────────
    total_inserted = 0
    # Delayed import avoids circular dependency (same pattern as append_message)
    from term_index import extract_terms

    for f, sid in file_to_sid.items():
        if sid not in sids_needing_backfill:
            continue
        try:
            data = json.loads(f.read_text(encoding="utf-8"))
        except (json.JSONDecodeError, OSError) as exc:
            logger.warning("backfill: skipping %s: %s", f.name, exc)
            continue

        messages = data.get("messages") or []
        if not messages:
            # Create the session row even if empty, so we track it
            self._ensure_session_from_json(sid, data)
            continue

        # Ensure session row exists
        self._ensure_session_from_json(sid, data)

        # Insert messages. We synthesise monotonically increasing
        # timestamps from session_start since JSON messages lack them.
        base_ts = self._parse_session_start(data.get("session_start"))
        msg_idx = 0
        for msg in messages:
            role = msg.get("role", "unknown")
            content = msg.get("content")
            tool_calls = msg.get("tool_calls")
            tool_call_id = msg.get("tool_call_id")
            finish_reason = msg.get("finish_reason")
            reasoning = msg.get("reasoning") if role == "assistant" else None
            # Synthesise timestamp: base_ts + sequential offset
            timestamp = base_ts + msg_idx
            msg_idx += 1

            # Default-arg binds snapshot the per-message values for the
            # closure executed inside _execute_write's transaction.
            def _insert_msg(conn, _sid=sid, _role=role, _content=content,
                            _tc=tool_calls, _tci=tool_call_id,
                            _fr=finish_reason, _reasoning=reasoning,
                            _ts=timestamp, _msg=msg, _extract=extract_terms):
                tool_calls_json = json.dumps(_tc) if _tc else None
                cursor = conn.execute(
                    """INSERT INTO messages (session_id, role, content,
                        tool_call_id, tool_calls, tool_name, timestamp,
                        token_count, finish_reason, reasoning)
                       VALUES (?, ?, ?, ?, ?, NULL, ?, NULL, ?, ?)""",
                    (
                        _sid,
                        _role,
                        _content,
                        _tci,
                        tool_calls_json,
                        _ts,
                        _fr,
                        _reasoning,
                    ),
                )
                msg_id = cursor.lastrowid
                # Increment message_count on the session row
                conn.execute(
                    "UPDATE sessions SET message_count = message_count + 1 "
                    "WHERE id = ?",
                    (_sid,),
                )
                # Term indexing: skip tool-role messages (noise)
                if _content and _role != "tool":
                    try:
                        terms = _extract(_content)
                        if terms:
                            conn.executemany(
                                "INSERT OR IGNORE INTO term_index "
                                "(term, message_id, session_id) VALUES (?, ?, ?)",
                                [(t, msg_id, _sid) for t in terms],
                            )
                    except Exception:
                        pass  # Term indexing is best-effort

                return 1

            total_inserted += self._execute_write(_insert_msg)

    logger.info(
        "backfill: inserted %d messages from %d sessions",
        total_inserted, len(sids_needing_backfill),
    )

    # Phase 3: reindex to ensure consistency (the per-message inserts
    # above already indexed individual messages, but a full reindex
    # guarantees the term_index is consistent with the messages table)
    self.reindex_term_index()
    return total_inserted
def _ensure_session_from_json(self, session_id: str, data: dict) -> None:
    """Insert a sessions row built from *data* unless one already exists.

    Uses INSERT OR IGNORE, so calling this for an already-known session
    is a no-op.
    """
    session_start = data.get("session_start")
    if session_start:
        started_at = self._parse_session_start(session_start)
    else:
        started_at = time.time()
    row = (
        session_id,
        data.get("platform", "cli"),
        data.get("model"),
        data.get("system_prompt"),
        started_at,
    )

    def _write(conn):
        conn.execute(
            """INSERT OR IGNORE INTO sessions (id, source, model, system_prompt, started_at)
               VALUES (?, ?, ?, ?, ?)""",
            row,
        )

    self._execute_write(_write)
@staticmethod
def _parse_session_start(session_start: str) -> float:
"""Parse ISO-format session_start to Unix timestamp.
Falls back to current time if parsing fails.
"""
if not session_start:
return time.time()
for fmt in ("%Y-%m-%dT%H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S"):
try:
from datetime import datetime
dt = datetime.strptime(session_start, fmt)
return dt.timestamp()
except (ValueError, TypeError):
continue
return time.time()
def search_sessions(
self,
source: str = None,
@ -1466,8 +1925,14 @@ class SessionDB:
return results
def clear_messages(self, session_id: str) -> None:
"""Delete all messages for a session and reset its counters."""
"""Delete all messages for a session and reset its counters.
Also removes stale term_index entries that reference the deleted messages.
"""
def _do(conn):
conn.execute(
"DELETE FROM term_index WHERE session_id = ?", (session_id,)
)
conn.execute(
"DELETE FROM messages WHERE session_id = ?", (session_id,)
)
@ -1496,6 +1961,7 @@ class SessionDB:
"WHERE parent_session_id = ?",
(session_id,),
)
conn.execute("DELETE FROM term_index WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
return True
@ -1536,6 +2002,7 @@ class SessionDB:
)
for sid in session_ids:
conn.execute("DELETE FROM term_index WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM messages WHERE session_id = ?", (sid,))
conn.execute("DELETE FROM sessions WHERE id = ?", (sid,))
return len(session_ids)

0
state.db Normal file
View file

79
stop_words.py Normal file
View file

@ -0,0 +1,79 @@
"""Stop word list for term index extraction.
Uses the well-known NLTK English stop word list (179 words) as a baseline,
plus common JSON schema keys from tool output and pure-numeric filter.
This module is self-contained -- no external dependencies.
"""
import re
# Standard English stop words (NLTK list, public domain)
# Covers articles, conjunctions, prepositions, pronouns, auxiliary verbs,
# and common function words. Intentionally excludes short tech terms
# that overlap (e.g., "go", "it" as in IT/InfoTech handled by context).
_ENGLISH_STOP_WORDS = frozenset(
w.lower() for w in [
"i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you",
"your", "yours", "yourself", "yourselves", "he", "him", "his",
"himself", "she", "her", "hers", "herself", "it", "its", "itself",
"they", "them", "their", "theirs", "themselves", "what", "which",
"who", "whom", "this", "that", "these", "those", "am", "is", "are",
"was", "were", "be", "been", "being", "have", "has", "had", "having",
"do", "does", "did", "doing", "a", "an", "the", "and", "but", "if",
"or", "because", "as", "until", "while", "of", "at", "by", "for",
"with", "about", "against", "between", "through", "during", "before",
"after", "above", "below", "to", "from", "up", "down", "in", "out",
"on", "off", "over", "under", "again", "further", "then", "once",
"here", "there", "when", "where", "why", "how", "all", "both", "each",
"few", "more", "most", "other", "some", "such", "no", "nor", "not",
"only", "own", "same", "so", "than", "too", "very", "s", "t", "can",
"will", "just", "don", "should", "now", "d", "ll", "m", "o", "re",
"ve", "y", "ain", "aren", "couldn", "didn", "doesn", "hadn", "hasn",
"haven", "isn", "ma", "mightn", "mustn", "needn", "shan", "shouldn",
"wasn", "weren", "won", "wouldn",
]
)
# JSON schema keys that appear constantly in tool output.
# These are field names from structured tool responses, not semantic content.
# Nobody searches for "exit_code" to find a past session.
_JSON_KEY_STOP_WORDS = frozenset([
"output",
"exit_code",
"error",
"null",
"true",
"false",
"status",
"content",
"message",
"cleared",
"success",
])
# Combined stop word set
_STOP_WORDS = _ENGLISH_STOP_WORDS | _JSON_KEY_STOP_WORDS
# Pattern to detect pure numeric tokens (integers, floats, hex)
_NUMERIC_RE = re.compile(r"^[0-9]+$")
def is_stop_word(word: str) -> bool:
"""Check if a word is a stop word. Case-insensitive."""
return word.lower() in _STOP_WORDS
def is_noise_term(word: str) -> bool:
"""Check if a term is noise that should be excluded from the index.
This covers stop words AND pure numeric tokens, which provide zero
search value. Nobody searches for '0', '1', or '42' to find a session.
"""
lower = word.lower()
return lower in _STOP_WORDS or _NUMERIC_RE.match(lower) is not None
def get_stop_words() -> frozenset:
"""Return the full stop word set (for inspection/bulk use)."""
return _STOP_WORDS

47
term_index.py Normal file
View file

@ -0,0 +1,47 @@
"""Term index — inverted index extraction for session search fast path.
Extracts non-stop-word terms from message content for insertion into the
term_index table in SessionDB. Terms are lowercased, punctuation-stripped
(with preservation of path-like strings), and deduplicated per message.
Noise filtering:
- English stop words (NLTK list)
- JSON schema keys from tool output (output, exit_code, error, etc.)
- Pure numeric tokens (0, 1, 42, etc.)
"""
import re
from stop_words import is_noise_term
# Matches "words" including paths (foo/bar), filenames (file.py), and
# hyphenated terms (self-hosted). Filters out most punctuation but
# preserves dots in filenames and slashes in paths.
# Strategy: split on whitespace first, then strip leading/trailing punctuation.
_TERM_RE = re.compile(r"[a-zA-Z0-9][\w./\-]*[a-zA-Z0-9]|[a-zA-Z0-9]")
def extract_terms(content: str) -> list[str]:
"""Extract non-noise terms from message content.
Returns a deduplicated, lowercased list of terms.
Stop words, JSON keys, pure numerics, and empty strings are excluded.
"""
if not content:
return []
# Find candidate tokens
raw_tokens = _TERM_RE.findall(content)
seen = set()
terms = []
for token in raw_tokens:
lower = token.lower()
# Skip noise: stop words, JSON keys, pure numerics
if is_noise_term(lower):
continue
# Deduplicate within this message
if lower not in seen:
seen.add(lower)
terms.append(lower)
return terms

View file

@ -0,0 +1,387 @@
"""Regression guard for session search coverage: backfill JSON sessions into SQLite.
Session JSON files can exist on disk without corresponding rows in the
``messages`` table. This happens when sessions were created by code that
didn't call ``append_message()`` — for example, the old ``main`` before
term_index support was added, or sessions created via ``/new`` which
resets state before flushing.
``SessionDB.backfill_messages_from_json()`` scans the sessions directory
for JSON files whose session_id is missing from the ``messages`` table,
parses each file, inserts both the session row (if absent) and its
messages, then reindexes the term index to pick up the new data.
"""
import json
import time
from pathlib import Path
import pytest
from hermes_state import SessionDB
@pytest.fixture
def db(tmp_path):
    """A SessionDB backed by a throwaway database under tmp_path."""
    return SessionDB(tmp_path / "state.db")
@pytest.fixture
def sessions_dir(tmp_path):
    """Create a sessions/ directory alongside the DB."""
    path = tmp_path / "sessions"
    path.mkdir()
    return path
def _write_session(path, session_id, messages, model="test:model",
session_start="2026-04-20T12:00:00", platform="cli"):
"""Helper: write a session JSON file."""
data = {
"session_id": session_id,
"model": model,
"platform": platform,
"session_start": session_start,
"last_updated": session_start,
"system_prompt": "",
"tools": [],
"message_count": len(messages),
"messages": messages,
}
Path(path).write_text(json.dumps(data))
return data
# ── Session creation ──────────────────────────────────────────────────────
def test_creates_missing_session_row(db, sessions_dir):
    """Session in JSON but not in DB at all should be created."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [{"role": "user", "content": "Hello"}],
        session_start="2026-04-20T12:00:00",
    )

    assert db.backfill_messages_from_json(sessions_dir) >= 1

    # The session row must now exist with the JSON's metadata.
    created = db._conn.execute(
        "SELECT id, model, source FROM sessions WHERE id = ?", ("abc123",)
    ).fetchone()
    assert created is not None
    assert created["model"] == "test:model"
    assert created["source"] == "cli"
def test_does_not_duplicate_existing_session(db, sessions_dir):
    """Session already in DB should not be re-created (INSERT OR IGNORE)."""
    db.create_session("abc123", source="cli", model="existing:model")
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [{"role": "user", "content": "Hello"}],
    )

    db.backfill_messages_from_json(sessions_dir)

    # Exactly one row survives, still carrying the pre-existing model.
    found = db._conn.execute(
        "SELECT id, model FROM sessions WHERE id = ?", ("abc123",)
    ).fetchall()
    assert len(found) == 1
    assert found[0]["model"] == "existing:model"
# ── Message insertion ─────────────────────────────────────────────────────
def test_inserts_messages_for_empty_session(db, sessions_dir):
    """Session in DB with 0 messages gets its JSON messages inserted."""
    db.create_session("abc123", source="cli")
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi there"},
            {"role": "tool", "content": '{"success": true}'},
        ],
    )

    db.backfill_messages_from_json(sessions_dir)

    rows = db._conn.execute(
        "SELECT role, content FROM messages WHERE session_id = ? ORDER BY id",
        ("abc123",),
    ).fetchall()
    # All 3 messages inserted in order — tool messages included (they
    # have search value in backfill context).
    assert [r["role"] for r in rows] == ["user", "assistant", "tool"]
    assert rows[0]["content"] == "Hello"
def test_skips_session_with_existing_messages(db, sessions_dir):
    """Session already having messages in DB should not be re-inserted."""
    db.create_session("abc123", source="cli")
    db.append_message("abc123", role="user", content="Existing")
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [{"role": "user", "content": "From JSON"}],
    )

    db.backfill_messages_from_json(sessions_dir)

    contents = [
        r["content"]
        for r in db._conn.execute(
            "SELECT content FROM messages WHERE session_id = ? ORDER BY id",
            ("abc123",),
        ).fetchall()
    ]
    # Only the original message survives — the JSON one is not imported.
    assert contents == ["Existing"]
def test_synthesizes_timestamps_from_session_start(db, sessions_dir):
    """Messages without timestamps get monotonically increasing timestamps
    derived from session_start."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [
            {"role": "user", "content": "First"},
            {"role": "assistant", "content": "Second"},
        ],
        session_start="2026-04-20T12:00:00",
    )

    db.backfill_messages_from_json(sessions_dir)

    stamps = [
        r["timestamp"]
        for r in db._conn.execute(
            "SELECT timestamp FROM messages WHERE session_id = ? ORDER BY id",
            ("abc123",),
        ).fetchall()
    ]
    # Strictly increasing, anchored near the session-start epoch (~2025+).
    assert stamps[0] < stamps[1]
    assert stamps[0] > 1745000000
def test_handles_message_without_content(db, sessions_dir):
    """Messages with None/empty content should not crash backfill."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": None},  # e.g. function_call only
        ],
    )

    db.backfill_messages_from_json(sessions_dir)

    stored = db._conn.execute(
        "SELECT role, content FROM messages WHERE session_id = ? ORDER BY id",
        ("abc123",),
    ).fetchall()
    assert len(stored) == 2
def test_preserves_tool_calls_and_reasoning(db, sessions_dir):
    """Tool calls, finish_reason, and reasoning should be preserved."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [
            {"role": "user", "content": "Run it"},
            {
                "role": "assistant",
                "content": "Done",
                "finish_reason": "stop",
                "reasoning": "I thought about it",
                "tool_calls": [
                    {"name": "terminal", "arguments": '{"command": "ls"}'},
                ],
            },
        ],
    )

    db.backfill_messages_from_json(sessions_dir)

    rows = db._conn.execute(
        "SELECT role, finish_reason, reasoning FROM messages WHERE session_id = ? ORDER BY id",
        ("abc123",),
    ).fetchall()
    assistant_row = rows[1]
    assert assistant_row["finish_reason"] == "stop"
    assert assistant_row["reasoning"] == "I thought about it"
# ── Term index integration ────────────────────────────────────────────────
def test_backfill_populates_term_index(db, sessions_dir):
    """After backfill, term_index should contain terms from inserted messages."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [{"role": "user", "content": "proton mail himalaya bridge setup"}],
    )

    db.backfill_messages_from_json(sessions_dir)

    indexed = {
        r["term"]
        for r in db._conn.execute(
            "SELECT DISTINCT term FROM term_index WHERE session_id = ?",
            ("abc123",),
        ).fetchall()
    }
    # At least some terms from the content made it into the index.
    assert len(indexed) > 0
def test_backfill_skips_tool_messages_in_term_index(db, sessions_dir):
    """Tool messages should be inserted into messages table but NOT indexed."""
    _write_session(
        sessions_dir / "session_abc123.json",
        "abc123",
        [
            {"role": "user", "content": "search for proton"},
            {"role": "tool", "content": '{"success": true, "results": ["noise_term_xyz"]}'},
        ],
    )

    db.backfill_messages_from_json(sessions_dir)

    # Both rows land in the messages table...
    stored_roles = {
        r["role"]
        for r in db._conn.execute(
            "SELECT role FROM messages WHERE session_id = ?", ("abc123",)
        ).fetchall()
    }
    assert "user" in stored_roles
    assert "tool" in stored_roles

    # ...but nothing from the tool payload reaches the term index.
    indexed = {
        r["term"]
        for r in db._conn.execute(
            "SELECT term FROM term_index WHERE session_id = ?", ("abc123",)
        ).fetchall()
    }
    assert "noise_term_xyz" not in indexed
# ── Edge cases ─────────────────────────────────────────────────────────────
def test_handles_malformed_json_gracefully(db, sessions_dir):
    """Malformed JSON files should be skipped, not crash the backfill."""
    # One valid session...
    _write_session(
        sessions_dir / "session_good.json",
        "good",
        [{"role": "user", "content": "Hello"}],
    )
    # ...plus a corrupted session file and an unrelated JSON file.
    (sessions_dir / "session_bad.json").write_text("{broken json")
    (sessions_dir / "request_dump_xyz.json").write_text('{"foo": "bar"}')

    # The good session is imported; the bad/unrelated files are skipped.
    assert db.backfill_messages_from_json(sessions_dir) >= 1
def test_handles_empty_session_json(db, sessions_dir):
    """Session JSON with empty messages list should create session row only."""
    _write_session(
        sessions_dir / "session_empty.json",
        "empty",
        [],  # No messages
    )

    db.backfill_messages_from_json(sessions_dir)

    # The session row exists...
    assert db._conn.execute(
        "SELECT id FROM sessions WHERE id = ?", ("empty",)
    ).fetchone() is not None
    # ...but no message rows were created.
    assert db._conn.execute(
        "SELECT id FROM messages WHERE session_id = ?", ("empty",)
    ).fetchall() == []
def test_only_processes_session_prefix_files(db, sessions_dir):
    """Files not matching session_*.json pattern should be ignored."""
    _write_session(
        sessions_dir / "session_real.json",
        "real",
        [{"role": "user", "content": "Hello"}],
    )
    # A valid-looking payload under a non-matching filename.
    (sessions_dir / "request_dump_2026.json").write_text(json.dumps({
        "session_id": "dump",
        "messages": [{"role": "user", "content": "Should not be processed"}],
    }))

    db.backfill_messages_from_json(sessions_dir)

    known = {r["id"] for r in db._conn.execute("SELECT id FROM sessions").fetchall()}
    assert "real" in known
    assert "dump" not in known
def test_returns_count_of_inserted_messages(db, sessions_dir):
    """backfill_messages_from_json should return total messages inserted."""
    _write_session(
        sessions_dir / "session_s1.json",
        "s1",
        [
            {"role": "user", "content": "Hello"},
            {"role": "assistant", "content": "Hi"},
        ],
    )
    _write_session(
        sessions_dir / "session_s2.json",
        "s2",
        [
            {"role": "user", "content": "Hey"},
            {"role": "assistant", "content": "Yo"},
            {"role": "user", "content": "What's up"},
        ],
    )

    # 2 messages from s1 + 3 from s2.
    assert db.backfill_messages_from_json(sessions_dir) == 5
def test_idempotent(db, sessions_dir):
    """Running backfill twice should not duplicate messages."""
    _write_session(
        sessions_dir / "session_abc.json",
        "abc",
        [{"role": "user", "content": "Hello"}],
    )

    first = db.backfill_messages_from_json(sessions_dir)
    second = db.backfill_messages_from_json(sessions_dir)

    assert first == 1
    assert second == 0  # nothing new on the second pass

    stored = db._conn.execute(
        "SELECT content FROM messages WHERE session_id = ?", ("abc",)
    ).fetchall()
    assert len(stored) == 1  # not duplicated

View file

@ -1173,7 +1173,7 @@ class TestSchemaInit:
def test_schema_version(self, db):
    """A freshly initialized DB reports the current schema version."""
    row = db._conn.execute("SELECT version FROM schema_version").fetchone()
    assert row[0] == 9
def test_title_column_exists(self, db):
"""Verify the title column was created in the sessions table."""
@ -1229,12 +1229,12 @@ class TestSchemaInit:
conn.commit()
conn.close()
# Open with SessionDB — should migrate to v8
# Open with SessionDB — should migrate to v9
migrated_db = SessionDB(db_path=db_path)
# Verify migration
cursor = migrated_db._conn.execute("SELECT version FROM schema_version")
assert cursor.fetchone()[0] == 8
assert cursor.fetchone()[0] == 9
# Verify title column exists and is NULL for existing sessions
session = migrated_db.get_session("existing")

715
tests/test_term_index.py Normal file
View file

@ -0,0 +1,715 @@
"""Tests for term_index — inverted index for session search fast path.
Covers: stop word filtering, term extraction, term insertion at write time,
term-based search with session-level results, multi-term intersection.
"""
import time
import pytest
from pathlib import Path
from hermes_state import SessionDB
@pytest.fixture()
def db(tmp_path):
    """Provide a SessionDB backed by a throwaway file; close it on teardown."""
    handle = SessionDB(db_path=tmp_path / "test_state.db")
    yield handle
    handle.close()
# =========================================================================
# Stop word filtering
# =========================================================================
class TestStopWords:
    """Behavior of stop_words.is_stop_word."""

    def test_common_english_words_are_stopped(self):
        from stop_words import is_stop_word

        common = ("the", "and", "is", "in", "it", "of", "to", "a", "was", "for")
        missed = [w for w in common if not is_stop_word(w)]
        assert not missed, f"expected stop words, but these passed: {missed}"

    def test_case_insensitive_stop_words(self):
        from stop_words import is_stop_word

        assert all(is_stop_word(w) for w in ("The", "AND", "Is"))

    def test_non_stop_words_pass(self):
        from stop_words import is_stop_word

        for term in ("docker", "kubernetes", "python", "hermes", "session"):
            assert not is_stop_word(term), f"'{term}' should NOT be a stop word"

    def test_short_words_not_auto_stopped(self):
        """Short tokens are stopped only when explicitly listed."""
        from stop_words import is_stop_word

        # 'go' is a real tech term; 'I' is an English stop word.
        assert not is_stop_word("go")
        assert is_stop_word("I")
# =========================================================================
# Term extraction
# =========================================================================
class TestTermExtraction:
    """Behavior of term_index.extract_terms."""

    def test_extracts_words_from_content(self):
        from term_index import extract_terms

        terms = extract_terms("docker compose up -d")
        assert "docker" in terms
        assert "compose" in terms

    def test_strips_punctuation(self):
        from term_index import extract_terms

        terms = extract_terms("It's working! Check the file.py, okay?")
        assert "working" in terms
        assert "file.py" in terms  # dots in filenames preserved
        assert "okay" in terms

    def test_filters_stop_words(self):
        from term_index import extract_terms

        terms = extract_terms("the docker container is running in the background")
        for stop in ("the", "is", "in"):
            assert stop not in terms
        for kept in ("docker", "container", "running"):
            assert kept in terms

    def test_case_folded(self):
        from term_index import extract_terms

        terms = extract_terms("Docker DOCKER docker")
        # BUG FIX: the old assertion (len(terms) == len(set(terms))) passed
        # even with NO case folding — three distinct-case variants are
        # already "deduplicated". Assert the actual contract: all case
        # variants collapse to a single lowercase term.
        assert set(terms) == {"docker"}
        assert len(terms) == 1

    def test_empty_content(self):
        from term_index import extract_terms

        assert extract_terms("") == []

    def test_none_content(self):
        from term_index import extract_terms

        assert extract_terms(None) == []

    def test_preserves_paths_and_commands(self):
        from term_index import extract_terms

        terms = extract_terms("edited /etc/hosts and ran git push origin main")
        assert "/etc/hosts" in terms or "etc/hosts" in terms  # path fragment
        assert "git" in terms
        assert "push" in terms
# =========================================================================
# Term index insertion
# =========================================================================
class TestTermIndexInsertion:
    """Terms are written to term_index as messages are appended."""

    def test_terms_inserted_on_append_message(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="I need to deploy the docker container",
        )
        # The freshly appended message should be discoverable by term.
        hits = db.search_by_terms(["docker"])
        assert len(hits) >= 1
        assert any(hit["session_id"] == "s1" for hit in hits)

    def test_stop_words_not_indexed(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="the and is in of to a",
        )
        # Every token is a stop word, so nothing should be searchable.
        assert len(db.search_by_terms(["the", "and", "is"])) == 0

    def test_same_term_multiple_messages_same_session(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker is great")
        db.append_message(session_id="s1", role="assistant", content="docker compose ready")
        hits = db.search_by_terms(["docker"])
        # The session appears exactly once even with two matching messages.
        assert sum(1 for h in hits if h["session_id"] == "s1") == 1

    def test_term_indexed_across_sessions(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="telegram")
        db.append_message(session_id="s1", role="user", content="fix the docker bug")
        db.append_message(session_id="s2", role="user", content="docker pull failed")
        found = {h["session_id"] for h in db.search_by_terms(["docker"])}
        assert "s1" in found
        assert "s2" in found
# =========================================================================
# Term-based search
# =========================================================================
class TestTermSearch:
    """search_by_terms: session-level results with AND semantics."""

    def test_single_term_search(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="user",
            content="I need to configure kubernetes",
        )
        hits = db.search_by_terms(["kubernetes"])
        assert len(hits) >= 1
        top = hits[0]
        assert top["session_id"] == "s1"
        # Session metadata rides along with each hit.
        assert "source" in top
        assert "started_at" in top or "session_started" in top

    def test_multi_term_intersection(self, db):
        for sid in ("s1", "s2", "s3"):
            db.create_session(session_id=sid, source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        db.append_message(session_id="s2", role="user", content="docker container running")
        db.append_message(session_id="s3", role="user", content="kubernetes networking problem")
        # Only s1 contains both "docker" AND "networking".
        found = {h["session_id"] for h in db.search_by_terms(["docker", "networking"])}
        assert "s1" in found
        assert not ({"s2", "s3"} & found)

    def test_search_returns_empty_for_stop_words_only(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="the and is")
        assert db.search_by_terms(["the", "and"]) == []

    def test_search_excludes_hidden_sources(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker deployment")
        db.append_message(session_id="s2", role="user", content="docker deployment tool")
        found = {
            h["session_id"]
            for h in db.search_by_terms(["docker"], exclude_sources=["tool"])
        }
        assert "s1" in found
        assert "s2" not in found

    def test_search_with_limit(self, db):
        for i in range(5):
            db.create_session(session_id=f"s{i}", source="cli")
            db.append_message(session_id=f"s{i}", role="user", content="python script")
        assert len(db.search_by_terms(["python"], limit=3)) <= 3

    def test_nonexistent_term_returns_empty(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="hello world")
        assert db.search_by_terms(["nonexistent_xyzzy"]) == []

    def test_term_result_includes_match_count(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker docker docker")
        db.append_message(session_id="s1", role="assistant", content="docker ready")
        hits = db.search_by_terms(["docker"])
        assert len(hits) >= 1
        # Each hit reports how many messages in the session matched.
        assert "match_count" in hits[0]
# =========================================================================
# Schema and migration
# =========================================================================
class TestTermIndexSchema:
    """Schema shape and migration behavior for term_index."""

    def test_term_index_table_exists(self, db):
        row = db._conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='term_index'"
        ).fetchone()
        assert row is not None

    def test_term_index_is_without_rowid(self, db):
        row = db._conn.execute(
            "SELECT sql FROM sqlite_master WHERE type='table' AND name='term_index'"
        ).fetchone()
        assert row is not None
        assert "WITHOUT ROWID" in row[0]

    def test_schema_version_bumped(self, db):
        version = db._conn.execute(
            "SELECT version FROM schema_version LIMIT 1"
        ).fetchone()[0]
        assert version >= 9

    def test_existing_data_survives_migration(self, tmp_path):
        """Close and re-open a populated DB: data and term_index survive.

        NOTE(review): this does not actually downgrade to an older schema
        version; it only verifies a close/re-open cycle (during which any
        pending migration would run) keeps sessions intact.
        """
        db_path = tmp_path / "migrate.db"
        first = SessionDB(db_path=db_path)
        first.create_session(session_id="s1", source="cli")
        first.append_message(session_id="s1", role="user", content="hello world")
        first.close()

        # Re-open: data must be intact.
        reopened = SessionDB(db_path=db_path)
        session = reopened.get_session("s1")
        assert session is not None
        assert session["source"] == "cli"
        # term_index must exist after the re-open.
        row = reopened._conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='term_index'"
        ).fetchone()
        assert row is not None
        reopened.close()

    def test_v9_migration_auto_reindexes(self, tmp_path):
        """Opening a DB downgraded to v6 migrates to v9 and backfills term_index."""
        import sqlite3

        db_path = tmp_path / "migrate_v9.db"

        # Step 1: build a DB with messages in two sessions.
        fresh = SessionDB(db_path=db_path)
        fresh.create_session(session_id="s1", source="cli")
        fresh.create_session(session_id="s2", source="cli")
        fresh.append_message(session_id="s1", role="user", content="deploy the kubernetes cluster")
        fresh.append_message(session_id="s2", role="user", content="debug docker networking issue")
        fresh.close()

        # Step 2: hand-downgrade the version and drop term_index so the
        # next open exercises the pre-v7 migration path.
        conn = sqlite3.connect(str(db_path))
        conn.execute("UPDATE schema_version SET version = 6")
        conn.execute("DROP TABLE IF EXISTS term_index")
        conn.commit()
        conn.close()

        # Step 3: SessionDB should migrate to v9 and auto-reindex.
        migrated = SessionDB(db_path=db_path)
        assert migrated._conn.execute(
            "SELECT version FROM schema_version"
        ).fetchone()[0] == 9
        # The backfilled index makes both sessions searchable.
        kube_hits = migrated.search_by_terms(["kubernetes"])
        assert len(kube_hits) >= 1
        assert kube_hits[0]["session_id"] == "s1"
        docker_hits = migrated.search_by_terms(["docker"])
        assert len(docker_hits) >= 1
        assert docker_hits[0]["session_id"] == "s2"
        migrated.close()
# =========================================================================
# Regression tests for red-team QA bugs
# =========================================================================
class TestClearMessagesCleansTermIndex:
    """BUG 3 regression: clear_messages() must purge term_index rows.

    After clearing a session's messages, search_by_terms must return zero
    results for that session — no ghost matches pointing at deleted
    message IDs.
    """

    def test_clear_messages_removes_term_entries(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        # Sanity check: the message got indexed.
        assert len(db.search_by_terms(["docker"])) >= 1
        db.clear_messages(session_id="s1")
        # All term entries for the session must be gone.
        assert db.search_by_terms(["docker"]) == []

    def test_clear_messages_does_not_affect_other_sessions(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker test")
        db.append_message(session_id="s2", role="user", content="docker prod")
        db.clear_messages(session_id="s1")
        # s2 stays searchable; s1 does not.
        remaining = {h["session_id"] for h in db.search_by_terms(["docker"])}
        assert "s2" in remaining
        assert "s1" not in remaining

    def test_clear_messages_no_stray_term_rows(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="kubernetes deployment")
        db.clear_messages(session_id="s1")
        stray = db._conn.execute(
            "SELECT COUNT(*) FROM term_index WHERE session_id = 's1'"
        ).fetchone()[0]
        assert stray == 0
class TestSearchByTermsParamBinding:
    """BUG 1 regression: parameter binding in search_by_terms().

    The multi-term GROUP BY + HAVING path is the one that actually runs.
    Verify binding for single- and multi-term queries, with and without
    exclude_sources.
    """

    def test_single_term_with_exclude_sources(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker deploy")
        db.append_message(session_id="s2", role="user", content="docker deploy")
        found = {
            h["session_id"]
            for h in db.search_by_terms(["docker"], exclude_sources=["tool"])
        }
        assert "s1" in found
        assert "s2" not in found

    def test_multi_term_and_semantics(self, db):
        """AND semantics: only sessions containing ALL terms match."""
        for sid in ("s1", "s2", "s3"):
            db.create_session(session_id=sid, source="cli")
        db.append_message(session_id="s1", role="user", content="docker networking issue")
        db.append_message(session_id="s2", role="user", content="docker container only")
        db.append_message(session_id="s3", role="user", content="networking problem only")
        found = {h["session_id"] for h in db.search_by_terms(["docker", "networking"])}
        assert "s1" in found
        assert not ({"s2", "s3"} & found)

    def test_multi_term_with_exclude_sources(self, db):
        """Multi-term + exclude_sources exercises the mixed binding path."""
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="tool")
        db.append_message(session_id="s1", role="user", content="docker networking setup")
        db.append_message(session_id="s2", role="user", content="docker networking deploy")
        found = {
            h["session_id"]
            for h in db.search_by_terms(
                ["docker", "networking"], exclude_sources=["tool"]
            )
        }
        assert "s1" in found
        assert "s2" not in found

    def test_three_term_intersection(self, db):
        """All three terms must be present for a session to match."""
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker kubernetes aws deployment")
        db.append_message(session_id="s2", role="user", content="docker kubernetes only two terms")
        found = {
            h["session_id"]
            for h in db.search_by_terms(["docker", "kubernetes", "aws"])
        }
        assert "s1" in found
        assert "s2" not in found
class TestDeleteSessionCleansTermIndex:
    """delete_session() and prune_sessions() must clean up term_index."""

    def test_delete_session_removes_term_entries(self, db):
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker deploy")
        db.append_message(session_id="s1", role="assistant", content="docker is running")
        db.delete_session(session_id="s1")
        remaining = db._conn.execute(
            "SELECT COUNT(*) FROM term_index WHERE session_id = 's1'"
        ).fetchone()[0]
        assert remaining == 0

    def test_delete_session_does_not_affect_other_sessions(self, db):
        db.create_session(session_id="s1", source="cli")
        db.create_session(session_id="s2", source="cli")
        db.append_message(session_id="s1", role="user", content="docker one")
        db.append_message(session_id="s2", role="user", content="docker two")
        db.delete_session(session_id="s1")
        found = {h["session_id"] for h in db.search_by_terms(["docker"])}
        assert "s2" in found
        assert "s1" not in found
class TestFastSearchSessionResolution:
    """BUG 2 regression: _fast_search must resolve children to parents.

    A delegation child and its parent both matching a term must collapse
    into a single result (the parent), and excluding the current session
    must cover its entire lineage chain.
    """

    def test_child_resolved_to_parent(self, db):
        """Parent + child matching the same term collapse into one parent hit."""
        import json
        from tools.session_search_tool import _fast_search

        db.create_session(session_id="parent-1", source="cli")
        db.create_session(session_id="child-1", source="cli", parent_session_id="parent-1")
        db.append_message(session_id="parent-1", role="user", content="docker setup question")
        db.append_message(session_id="child-1", role="assistant", content="docker setup done")

        payload = json.loads(
            _fast_search(query="docker", db=db, limit=5, current_session_id=None)
        )
        assert payload["success"]
        hit_ids = [entry["session_id"] for entry in payload["results"]]
        assert "child-1" not in hit_ids, "Child should be resolved to parent"
        assert "parent-1" in hit_ids
        assert len(payload["results"]) == 1

    def test_match_count_accumulates_from_children(self, db):
        """match_count sums matches from the parent and its children."""
        import json
        from tools.session_search_tool import _fast_search

        db.create_session(session_id="p", source="cli")
        db.create_session(session_id="c", source="cli", parent_session_id="p")
        db.append_message(session_id="p", role="user", content="docker question")
        db.append_message(session_id="c", role="assistant", content="docker answer")

        payload = json.loads(
            _fast_search(query="docker", db=db, limit=5, current_session_id=None)
        )
        top = payload["results"][0]
        assert top["session_id"] == "p"
        assert top["match_count"] >= 2, f"Expected accumulated count >= 2, got {top['match_count']}"

    def test_current_session_lineage_excludes_children(self, db):
        """When the current session is a child, its parent is excluded too."""
        import json
        from tools.session_search_tool import _fast_search

        db.create_session(session_id="parent-2", source="cli")
        db.create_session(session_id="child-2", source="cli", parent_session_id="parent-2")
        db.create_session(session_id="unrelated", source="cli")
        db.append_message(session_id="parent-2", role="user", content="docker deploy")
        db.append_message(session_id="child-2", role="assistant", content="docker deployed")
        db.append_message(session_id="unrelated", role="user", content="docker build")

        # Current session = child: exclude parent-2 AND child-2, keep unrelated.
        payload = json.loads(
            _fast_search(query="docker", db=db, limit=5, current_session_id="child-2")
        )
        hit_ids = [entry["session_id"] for entry in payload["results"]]
        assert "parent-2" not in hit_ids, "Parent of current should be excluded"
        assert "child-2" not in hit_ids, "Current child should be excluded"
        assert "unrelated" in hit_ids, "Unrelated session should appear"
class TestGetChildSessionIds:
    """SessionDB.get_child_session_ids — the public API replacing direct
    db._lock/db._conn access in _fast_search."""

    def test_returns_child_ids(self, db):
        db.create_session(session_id="parent", source="cli")
        db.create_session(session_id="child-1", source="delegation", parent_session_id="parent")
        db.create_session(session_id="child-2", source="compression", parent_session_id="parent")
        db.create_session(session_id="orphan", source="cli")
        assert set(db.get_child_session_ids("parent")) == {"child-1", "child-2"}

    def test_returns_empty_for_leaf_session(self, db):
        db.create_session(session_id="leaf", source="cli")
        assert db.get_child_session_ids("leaf") == []

    def test_returns_empty_for_no_args(self, db):
        assert db.get_child_session_ids() == []

    def test_multiple_parent_ids(self, db):
        for pid in ("p1", "p2"):
            db.create_session(session_id=pid, source="cli")
        db.create_session(session_id="c1", source="delegation", parent_session_id="p1")
        db.create_session(session_id="c2", source="delegation", parent_session_id="p2")
        assert set(db.get_child_session_ids("p1", "p2")) == {"c1", "c2"}

    def test_does_not_recurse(self, db):
        """Only direct children are returned, never grandchildren."""
        db.create_session(session_id="root", source="cli")
        db.create_session(session_id="child", source="delegation", parent_session_id="root")
        db.create_session(session_id="grandchild", source="delegation", parent_session_id="child")
        assert db.get_child_session_ids("root") == ["child"]
class TestNoiseReduction:
    """Noise reduction in term indexing.

    Tool-role messages (structured JSON output) produce junk terms like
    'output', 'exit_code', 'null', 'true', 'false'. Pure numeric tokens
    ('0', '1', '2') are never useful search targets. JSON key names that
    appear in tool output schemas should be treated as stop words.
    """

    def test_tool_role_messages_not_indexed(self, db):
        """Tool-role messages are skipped entirely during indexing."""
        db.create_session(session_id="s1", source="cli")
        db.append_message(
            session_id="s1",
            role="tool",
            content='{"output": "docker is running", "exit_code": 0}',
            tool_name="terminal",
        )
        # Even though 'docker' appears in the output string, it lives in
        # structured tool JSON, not natural language — nothing is indexed.
        indexed = db._conn.execute(
            "SELECT COUNT(*) FROM term_index WHERE session_id = 's1'"
        ).fetchone()[0]
        assert indexed == 0

    def test_assistant_role_still_indexed(self, db):
        """Non-tool messages keep getting indexed normally."""
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="docker deploy")
        db.append_message(
            session_id="s1", role="assistant", content="docker is now running"
        )
        assert len(db.search_by_terms(["docker"])) >= 1

    def test_pure_numeric_tokens_filtered(self):
        """Pure numeric tokens are excluded from term extraction."""
        from term_index import extract_terms

        terms = extract_terms("exit code 0 with 42 errors in 123 steps")
        # Numeric tokens provide zero search value.
        for num in ("0", "42", "123"):
            assert num not in terms, f"Pure numeric '{num}' should be filtered"
        # Word tokens survive.
        for word in ("exit", "code", "errors", "steps"):
            assert word in terms

    def test_json_key_stopwords_filtered(self):
        """Common JSON schema keys from tool output are stop words."""
        from stop_words import is_stop_word

        json_keys = (
            "output",
            "exit_code",
            "error",
            "null",
            "true",
            "false",
            "status",
            "content",
            "message",
            "cleared",
            "success",
        )
        for key in json_keys:
            assert is_stop_word(key), f"JSON key '{key}' should be a stop word"

    def test_json_key_stopwords_in_extract_terms(self):
        """extract_terms filters the JSON key stop words too."""
        from term_index import extract_terms

        # Simulates typical tool output content.
        terms = extract_terms(
            '{"output": "hello world", "exit_code": 0, "error": null, "success": true}'
        )
        for junk in ("output", "exit_code", "error", "null", "success", "true", "false"):
            assert junk not in terms, f"JSON key '{junk}' should be filtered"
        # Actual content words survive.
        assert "hello" in terms
        assert "world" in terms

    def test_reindex_skips_tool_messages(self, db):
        """reindex_term_index must not index tool-role messages."""
        db.create_session(session_id="s1", source="cli")
        db.append_message(session_id="s1", role="user", content="deploy docker")
        db.append_message(
            session_id="s1",
            role="tool",
            content='{"output": "docker running", "exit_code": 0}',
        )
        # Wipe the index, then rebuild it from the stored messages.
        db._conn.execute("DELETE FROM term_index")
        db._conn.commit()
        db.reindex_term_index()
        rows = db._conn.execute(
            "SELECT term FROM term_index WHERE session_id = 's1'"
        ).fetchall()
        rebuilt = [row[0] for row in rows]
        for junk in ("output", "exit_code", "0"):
            assert junk not in rebuilt, f"'{junk}' should not be indexed from tool messages"
class TestCJKFallbackInFastSearch:
    """CJK queries must fall through to the slow path even when fast=True.

    extract_terms() splits on whitespace, and CJK languages do not put
    spaces between words, so the term index cannot serve these queries.
    session_search should detect CJK and use the FTS5+LIKE fallback.
    """

    def test_cjk_query_bypasses_fast_path(self, db):
        """A CJK query with fast=True is downgraded to the full path."""
        import json
        from tools.session_search_tool import session_search

        db.create_session(session_id="cjk-1", source="cli")
        db.append_message(session_id="cjk-1", role="user", content="测试中文搜索")

        payload = json.loads(session_search(
            query="中文", db=db, limit=3, fast=True, current_session_id=None
        ))
        assert payload["success"]
        # CJK forces the fallback, so the result must not come from the
        # fast path (mode="fast").
        assert payload.get("mode") != "fast"

    def test_english_query_stays_fast(self, db):
        """Non-CJK queries keep using the fast path."""
        import json
        from tools.session_search_tool import session_search

        db.create_session(session_id="eng-1", source="cli")
        db.append_message(session_id="eng-1", role="user", content="deploy the server")

        payload = json.loads(session_search(
            query="deploy", db=db, limit=3, fast=True, current_session_id=None
        ))
        assert payload["success"]
        assert payload.get("mode") == "fast"

View file

@ -321,11 +321,14 @@ def session_search(
limit: int = 3,
db=None,
current_session_id: str = None,
fast: bool = True,
) -> str:
"""
Search past sessions and return focused summaries of matching conversations.
Uses FTS5 to find matches, then summarizes the top sessions with Gemini Flash.
By default (fast=True), uses the term_index inverted index for instant
results with session metadata and match counts no LLM calls needed.
Set fast=False to use FTS5 + LLM summarization for detailed summaries.
The current session is excluded from results since the agent already has that context.
"""
if db is None:
@ -348,6 +351,120 @@ def session_search(
query = query.strip()
# CJK queries can't be handled by the term index (no word boundaries
# for extract_terms to split on). Fall through to FTS5 + LIKE which
# has a CJK bigram/LIKE fallback.
if fast and db._contains_cjk(query):
fast = False
# ── Fast path: term index (no LLM, ~1ms) ──────────────────────────
if fast:
return _fast_search(query, db, limit, current_session_id)
# ── Slow path: FTS5 + LLM summarization (~5-15s) ───────────────────
return _full_search(query, role_filter, limit, db, current_session_id)
def _fast_search(query: str, db, limit: int, current_session_id: str = None) -> str:
    """Term index fast path: instant search, no LLM calls.

    Extracts searchable terms from *query*, looks them up via
    db.search_by_terms (AND semantics across terms), collapses delegation
    child sessions into their parent root, excludes the current session's
    lineage, and returns a JSON string with up to *limit* session entries.
    """
    from term_index import extract_terms

    # Query was empty or consisted entirely of stop words — nothing to look up.
    terms = extract_terms(query)
    if not terms:
        return json.dumps({
            "success": True,
            "query": query,
            "results": [],
            "count": 0,
            "message": "No searchable terms in query (all stop words or empty).",
        }, ensure_ascii=False)

    # Fetch extra results so we have room after dedup/lineage exclusion
    raw_results = db.search_by_terms(
        terms=terms,
        exclude_sources=list(_HIDDEN_SESSION_SOURCES),
        limit=limit * 3,
    )

    # Resolve child sessions to their parent root, just like _full_search.
    # Delegation stores detailed content in child sessions, but the user
    # sees the parent conversation. Without this, parent + child both
    # containing "docker" would appear as two separate results.
    def _resolve_to_parent(session_id: str) -> str:
        # Walk up the parent chain; `visited` guards against parent cycles.
        visited = set()
        sid = session_id
        while sid and sid not in visited:
            visited.add(sid)
            try:
                session = db.get_session(sid)
                if not session:
                    break
                parent = session.get("parent_session_id")
                if parent:
                    sid = parent
                else:
                    break
            except Exception:
                # Best-effort: on lookup failure, keep the last resolved id.
                break
        return sid

    # Determine current session lineage for exclusion
    current_lineage = set()
    if current_session_id:
        # Walk parent chain AND collect all children
        root = _resolve_to_parent(current_session_id)
        current_lineage.add(root)
        current_lineage.add(current_session_id)
        # Also find any child sessions of the current root
        try:
            children = db.get_child_session_ids(root, current_session_id)
            current_lineage.update(children)
        except Exception:
            pass

    # Deduplicate by resolved (parent) session id, accumulating match
    # counts from children, until `limit` distinct sessions are collected.
    seen_sessions = {}
    for r in raw_results:
        raw_sid = r.get("session_id", "")
        resolved_sid = _resolve_to_parent(raw_sid)
        if resolved_sid in current_lineage or raw_sid in current_lineage:
            continue
        if resolved_sid not in seen_sessions:
            # Sum match_count from child into parent
            seen_sessions[resolved_sid] = dict(r)
            seen_sessions[resolved_sid]["session_id"] = resolved_sid
        else:
            # Accumulate match_count from child sessions
            seen_sessions[resolved_sid]["match_count"] = (
                seen_sessions[resolved_sid].get("match_count", 0)
                + r.get("match_count", 0)
            )
        # NOTE: stopping here means later raw results that map to an
        # already-seen parent no longer add to its match_count — a
        # deliberate trade-off to bound work at `limit` sessions.
        if len(seen_sessions) >= limit:
            break

    # Shape each surviving session into the response entry format.
    entries = []
    for sid, r in seen_sessions.items():
        entries.append({
            "session_id": sid,
            "when": _format_timestamp(r.get("session_started")),
            "source": r.get("source", "unknown"),
            "model": r.get("model"),
            "title": r.get("title"),
            "match_count": r.get("match_count", 0),
        })

    return json.dumps({
        "success": True,
        "query": query,
        "mode": "fast",
        "results": entries,
        "count": len(entries),
        "message": f"Found {len(entries)} matching sessions via term index (instant, no LLM)."
        f" Use fast=False for LLM-summarized results.",
    }, ensure_ascii=False)
def _full_search(query: str, role_filter: str, limit: int, db, current_session_id: str = None) -> str:
"""FTS5 + LLM summarization path (original behavior)."""
try:
# Parse role filter
role_list = None
@ -367,6 +484,7 @@ def session_search(
return json.dumps({
"success": True,
"query": query,
"mode": "full",
"results": [],
"count": 0,
"message": "No matching sessions found.",
@ -506,6 +624,7 @@ def session_search(
return json.dumps({
"success": True,
"query": query,
"mode": "full",
"results": summaries,
"count": len(summaries),
"sessions_searched": len(seen_sessions),
@ -535,7 +654,8 @@ SESSION_SEARCH_SCHEMA = {
"Returns titles, previews, and timestamps. Zero LLM cost, instant. "
"Start here when the user asks what were we working on or what did we do recently.\n"
"2. Keyword search (with query): Search for specific topics across all past sessions. "
"Returns LLM-generated summaries of matching sessions.\n\n"
"By default uses the term index for instant results (no LLM). "
"Set fast=False for detailed LLM-generated summaries.\n\n"
"USE THIS PROACTIVELY when:\n"
"- The user says 'we did this before', 'remember when', 'last time', 'as I mentioned'\n"
"- The user asks about a topic you worked on before but don't have in current context\n"
@ -544,11 +664,15 @@ SESSION_SEARCH_SCHEMA = {
"- The user asks 'what did we do about X?' or 'how did we fix Y?'\n\n"
"Don't hesitate to search when it is actually cross-session -- it's fast and cheap. "
"Better to search and confirm than to guess or ask the user to repeat themselves.\n\n"
"Search syntax: keywords joined with OR for broad recall (elevenlabs OR baseten OR funding), "
"phrases for exact match (\"docker networking\"), boolean (python NOT java), prefix (deploy*). "
"IMPORTANT: Use OR between keywords for best results — FTS5 defaults to AND which misses "
"sessions that only mention some terms. If a broad OR query returns nothing, try individual "
"keyword searches in parallel. Returns summaries of the top matching sessions."
"Search syntax depends on the mode:\n"
"- fast=True (default): Simple keyword search with AND semantics. Multiple words "
"all must appear in a session. No boolean operators or phrase matching. "
"Instant, zero LLM cost.\n"
"- fast=False: FTS5 full syntax — OR for broad recall (elevenlabs OR baseten OR funding), "
"phrases for exact match (\"docker networking\"), boolean (python NOT java), "
"prefix (deploy*). IMPORTANT: Use OR between keywords for best FTS5 results — "
"it defaults to AND which misses sessions that only mention some terms. "
"Slower (5-15s) but returns LLM-summarized results."
),
"parameters": {
"type": "object",
@ -559,13 +683,18 @@ SESSION_SEARCH_SCHEMA = {
},
"role_filter": {
"type": "string",
"description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs.",
"description": "Optional: only search messages from specific roles (comma-separated). E.g. 'user,assistant' to skip tool outputs. Only used when fast=False.",
},
"limit": {
"type": "integer",
"description": "Max sessions to summarize (default: 3, max: 5).",
"default": 3,
},
"fast": {
"type": "boolean",
"description": "When true (default), use the term index for instant results with no LLM cost. When false, use FTS5 + LLM summarization for detailed summaries.",
"default": True,
},
},
"required": [],
},
@ -583,6 +712,7 @@ registry.register(
query=args.get("query") or "",
role_filter=args.get("role_filter"),
limit=args.get("limit", 3),
fast=args.get("fast", True),
db=kw.get("db"),
current_session_id=kw.get("current_session_id")),
check_fn=check_session_search_requirements,