From ccd4116635fd5031fbb9ce53ad0db904e1bf0fef Mon Sep 17 00:00:00 2001 From: kshitijk4poor <82637225+kshitijk4poor@users.noreply.github.com> Date: Wed, 22 Apr 2026 23:39:02 +0530 Subject: [PATCH] fix(state): replace version-gated migrations with declarative column reconciliation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The linear migration chain (if current_version < N: ALTER TABLE ADD COLUMN) broke when commit a7d78d3b inserted a new v7 migration (reasoning_content) and renumbered the old v7 (api_call_count) to v8. Users who updated between Apr 15–22 already had schema_version=7 from the old numbering, so the new v7 block was skipped entirely — reasoning_content was never created, causing 'sqlite3.OperationalError: no such column: reasoning_content' on /continue. Root cause: two independent sources of truth (SCHEMA_SQL for new DBs vs a manually-maintained version chain for existing DBs) that could desync when migrations were reordered. Fix: replace the 8-block version chain with _reconcile_columns(), which on every startup diffs live table columns (PRAGMA table_info) against the columns declared in SCHEMA_SQL and ADDs any that are missing. This follows the Beets/sqlite-utils pattern used across mature SQLite projects — the CREATE TABLE definition becomes the single source of truth for schema. Adding a new column is now a single change: add it to SCHEMA_SQL. No migration block to write, no version number to bump, no possibility of skipped columns from reordering. The schema_version table is retained for future data migrations that cannot be handled declaratively (row transforms), but zero version-gated ADD COLUMN blocks remain. --- hermes_state.py | 211 +++++++++++++++++++++---------------- tests/test_hermes_state.py | 138 ++++++++++++++++++++++++ 2 files changed, 257 insertions(+), 92 deletions(-) diff --git a/hermes_state.py b/hermes_state.py index 0ea9815b5..8990ceccd 100644 --- a/hermes_state.py +++ b/hermes_state.py @@ -256,109 +256,136 @@ class SessionDB: self._conn.close() self._conn = None + @staticmethod + def _parse_schema_columns(schema_sql: str) -> Dict[str, Dict[str, str]]: + """Extract expected columns per table from SCHEMA_SQL. + + Uses an in-memory SQLite database to parse the SQL — SQLite itself + handles all syntax (DEFAULT expressions with commas, inline + REFERENCES, CHECK constraints, etc.) so there are zero regex + edge cases. The in-memory DB is opened, the schema DDL is + executed, and PRAGMA table_info extracts the column metadata. + + Adding a column to SCHEMA_SQL is all that's needed; the + reconciliation loop picks it up automatically. + """ + ref = sqlite3.connect(":memory:") + try: + ref.executescript(schema_sql) + table_columns: Dict[str, Dict[str, str]] = {} + for (tbl,) in ref.execute( + "SELECT name FROM sqlite_master " + "WHERE type='table' AND name NOT LIKE 'sqlite_%'" + ).fetchall(): + cols: Dict[str, str] = {} + for row in ref.execute( + f'PRAGMA table_info("{tbl}")' + ).fetchall(): + # row: (cid, name, type, notnull, dflt_value, pk) + col_name = row[1] + col_type = row[2] or "" + notnull = row[3] + default = row[4] + pk = row[5] + # Reconstruct the type expression for ALTER TABLE ADD COLUMN + parts = [col_type] if col_type else [] + if notnull and not pk: + parts.append("NOT NULL") + if default is not None: + parts.append(f"DEFAULT {default}") + cols[col_name] = " ".join(parts) + table_columns[tbl] = cols + return table_columns + finally: + ref.close() + + def _reconcile_columns(self, cursor: sqlite3.Cursor) -> None: + """Ensure live tables have every column declared in SCHEMA_SQL. + + Follows the Beets/sqlite-utils pattern: the CREATE TABLE definition + in SCHEMA_SQL is the single source of truth for the desired schema. + On every startup this method diffs the live columns (via PRAGMA + table_info) against the declared columns, and ADDs any that are + missing. + + This makes column additions a declarative operation — just add + the column to SCHEMA_SQL and it appears on the next startup. + Version-gated migration blocks are no longer needed for ADD COLUMN. + """ + expected = self._parse_schema_columns(SCHEMA_SQL) + for table_name, declared_cols in expected.items(): + # Get current columns from the live table + try: + rows = cursor.execute( + f'PRAGMA table_info("{table_name}")' + ).fetchall() + except sqlite3.OperationalError: + continue # Table doesn't exist yet (shouldn't happen after executescript) + live_cols = set() + for row in rows: + # PRAGMA table_info returns (cid, name, type, notnull, dflt_value, pk) + name = row[1] if isinstance(row, (tuple, list)) else row["name"] + live_cols.add(name) + + for col_name, col_type in declared_cols.items(): + if col_name not in live_cols: + safe_name = col_name.replace('"', '""') + try: + cursor.execute( + f'ALTER TABLE "{table_name}" ADD COLUMN "{safe_name}" {col_type}' + ) + except sqlite3.OperationalError as exc: + # Expected: "duplicate column name" from a race or + # re-run. Unexpected: "Cannot add a NOT NULL column + # with default value NULL" from a schema mistake. + # Log at DEBUG so it's visible in agent.log. + logger.debug( + "reconcile %s.%s: %s", table_name, col_name, exc, + ) + def _init_schema(self): - """Create tables and FTS if they don't exist, run migrations.""" + """Create tables and FTS if they don't exist, reconcile columns. + + Schema management follows the declarative reconciliation pattern + (Beets, sqlite-utils): SCHEMA_SQL is the single source of truth. + On existing databases, _reconcile_columns() diffs live columns + against SCHEMA_SQL and ADDs any missing ones. This eliminates + the version-gated migration chain for column additions, making + it impossible for reordered or inserted migrations to skip columns. + + The schema_version table is retained for future data migrations + (transforming existing rows) which cannot be handled declaratively. + """ cursor = self._conn.cursor() cursor.executescript(SCHEMA_SQL) - # Check schema version and run migrations + # ── Declarative column reconciliation ────────────────────────── + # Diff live tables against SCHEMA_SQL and ADD any missing columns. + # This is idempotent and self-healing: even if a version-gated + # migration was skipped (e.g. due to version renumbering), the + # column gets created here. + self._reconcile_columns(cursor) + + # ── Schema version bookkeeping ───────────────────────────────── + # Bump to current so future data migrations (if any) can gate on + # version. No version-gated column additions remain. cursor.execute("SELECT version FROM schema_version LIMIT 1") row = cursor.fetchone() if row is None: - cursor.execute("INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,)) + cursor.execute( + "INSERT INTO schema_version (version) VALUES (?)", + (SCHEMA_VERSION,), + ) else: current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0] - if current_version < 2: - # v2: add finish_reason column to messages - try: - cursor.execute("ALTER TABLE messages ADD COLUMN finish_reason TEXT") - except sqlite3.OperationalError: - pass # Column already exists - cursor.execute("UPDATE schema_version SET version = 2") - if current_version < 3: - # v3: add title column to sessions - try: - cursor.execute("ALTER TABLE sessions ADD COLUMN title TEXT") - except sqlite3.OperationalError: - pass # Column already exists - cursor.execute("UPDATE schema_version SET version = 3") - if current_version < 4: - # v4: add unique index on title (NULLs allowed, only non-NULL must be unique) - try: - cursor.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_title_unique " - "ON sessions(title) WHERE title IS NOT NULL" - ) - except sqlite3.OperationalError: - pass # Index already exists - cursor.execute("UPDATE schema_version SET version = 4") - if current_version < 5: - new_columns = [ - ("cache_read_tokens", "INTEGER DEFAULT 0"), - ("cache_write_tokens", "INTEGER DEFAULT 0"), - ("reasoning_tokens", "INTEGER DEFAULT 0"), - ("billing_provider", "TEXT"), - ("billing_base_url", "TEXT"), - ("billing_mode", "TEXT"), - ("estimated_cost_usd", "REAL"), - ("actual_cost_usd", "REAL"), - ("cost_status", "TEXT"), - ("cost_source", "TEXT"), - ("pricing_version", "TEXT"), - ] - for name, column_type in new_columns: - try: - # name and column_type come from the hardcoded tuple above, - # not user input. Double-quote identifier escaping is applied - # as defense-in-depth; SQLite DDL cannot be parameterized. - safe_name = name.replace('"', '""') - cursor.execute(f'ALTER TABLE sessions ADD COLUMN "{safe_name}" {column_type}') - except sqlite3.OperationalError: - pass - cursor.execute("UPDATE schema_version SET version = 5") - if current_version < 6: - # v6: add reasoning columns to messages table — preserves assistant - # reasoning text and structured reasoning_details across gateway - # session turns. Without these, reasoning chains are lost on - # session reload, breaking multi-turn reasoning continuity for - # providers that replay reasoning (OpenRouter, OpenAI, Nous). - for col_name, col_type in [ - ("reasoning", "TEXT"), - ("reasoning_details", "TEXT"), - ("codex_reasoning_items", "TEXT"), - ]: - try: - safe = col_name.replace('"', '""') - cursor.execute( - f'ALTER TABLE messages ADD COLUMN "{safe}" {col_type}' - ) - except sqlite3.OperationalError: - pass # Column already exists - cursor.execute("UPDATE schema_version SET version = 6") - if current_version < 7: - # v7: preserve provider-native reasoning_content separately from - # normalized reasoning text. Kimi/Moonshot replay can require - # this field on assistant tool-call messages when thinking is on. - try: - cursor.execute('ALTER TABLE messages ADD COLUMN "reasoning_content" TEXT') - except sqlite3.OperationalError: - pass # Column already exists - cursor.execute("UPDATE schema_version SET version = 7") - if current_version < 8: - # v8: add api_call_count column to sessions — tracks the number - # of individual LLM API calls made within a session (as opposed - # to the session count itself). - try: - cursor.execute( - 'ALTER TABLE sessions ADD COLUMN "api_call_count" INTEGER DEFAULT 0' - ) - except sqlite3.OperationalError: - pass # Column already exists - cursor.execute("UPDATE schema_version SET version = 8") + if current_version < SCHEMA_VERSION: + cursor.execute( + "UPDATE schema_version SET version = ?", + (SCHEMA_VERSION,), + ) - # Unique title index — always ensure it exists (safe to run after migrations - # since the title column is guaranteed to exist at this point) + # Unique title index — always ensure it exists try: cursor.execute( "CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_title_unique " diff --git a/tests/test_hermes_state.py b/tests/test_hermes_state.py index f405cf8bd..cbc5165e2 100644 --- a/tests/test_hermes_state.py +++ b/tests/test_hermes_state.py @@ -1254,6 +1254,144 @@ class TestSchemaInit: migrated_db.close() + def test_reconciliation_adds_missing_columns(self, tmp_path): + """Columns present in SCHEMA_SQL but missing from the live table + are added by _reconcile_columns regardless of schema_version. + + Regression test: commit a7d78d3b inserted a new v7 migration + (reasoning_content) and renumbered the old v7 (api_call_count) + to v8. Users already at the old v7 had schema_version >= 7, + so the new v7 block was skipped and reasoning_content was never + created — causing 'no such column' on /continue. + """ + import sqlite3 + + db_path = tmp_path / "gap_test.db" + conn = sqlite3.connect(str(db_path)) + # Simulate the old v7 state: api_call_count exists, reasoning_content does NOT + conn.executescript(""" + CREATE TABLE schema_version (version INTEGER NOT NULL); + INSERT INTO schema_version (version) VALUES (7); + + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + source TEXT NOT NULL, + user_id TEXT, + model TEXT, + model_config TEXT, + system_prompt TEXT, + parent_session_id TEXT, + started_at REAL NOT NULL, + ended_at REAL, + end_reason TEXT, + message_count INTEGER DEFAULT 0, + tool_call_count INTEGER DEFAULT 0, + input_tokens INTEGER DEFAULT 0, + output_tokens INTEGER DEFAULT 0, + cache_read_tokens INTEGER DEFAULT 0, + cache_write_tokens INTEGER DEFAULT 0, + reasoning_tokens INTEGER DEFAULT 0, + billing_provider TEXT, + billing_base_url TEXT, + billing_mode TEXT, + estimated_cost_usd REAL, + actual_cost_usd REAL, + cost_status TEXT, + cost_source TEXT, + pricing_version TEXT, + title TEXT, + api_call_count INTEGER DEFAULT 0 + ); + + CREATE TABLE messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + session_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT, + tool_call_id TEXT, + tool_calls TEXT, + tool_name TEXT, + timestamp REAL NOT NULL, + token_count INTEGER, + finish_reason TEXT, + reasoning TEXT, + reasoning_details TEXT, + codex_reasoning_items TEXT + ); + """) + conn.execute( + "INSERT INTO sessions (id, source, started_at) VALUES (?, ?, ?)", + ("s1", "cli", 1000.0), + ) + conn.execute( + "INSERT INTO messages (session_id, role, content, timestamp) " + "VALUES (?, ?, ?, ?)", + ("s1", "assistant", "hello", 1001.0), + ) + conn.commit() + # Verify reasoning_content is absent + cols = {r[1] for r in conn.execute("PRAGMA table_info(messages)").fetchall()} + assert "reasoning_content" not in cols + conn.close() + + # Open with SessionDB — reconciliation should add the missing column + migrated_db = SessionDB(db_path=db_path) + + msg_cols = { + r[1] + for r in migrated_db._conn.execute("PRAGMA table_info(messages)").fetchall() + } + assert "reasoning_content" in msg_cols + + # The query that used to crash must now work + cursor = migrated_db._conn.execute( + "SELECT role, content, reasoning, reasoning_content, " + "reasoning_details, codex_reasoning_items " + "FROM messages WHERE session_id = ?", + ("s1",), + ) + row = cursor.fetchone() + assert row is not None + assert row[0] == "assistant" + assert row[3] is None # reasoning_content NULL for old rows + + migrated_db.close() + + def test_reconciliation_is_idempotent(self, tmp_path): + """Opening the same database twice doesn't error or duplicate columns.""" + db_path = tmp_path / "idempotent.db" + db1 = SessionDB(db_path=db_path) + cols1 = {r[1] for r in db1._conn.execute("PRAGMA table_info(messages)").fetchall()} + db1.close() + + db2 = SessionDB(db_path=db_path) + cols2 = {r[1] for r in db2._conn.execute("PRAGMA table_info(messages)").fetchall()} + db2.close() + + assert cols1 == cols2 + + def test_schema_sql_is_source_of_truth(self, db): + """Every column in SCHEMA_SQL exists in the live database. + + This is the architectural invariant: SCHEMA_SQL declares the + desired schema, _reconcile_columns ensures it matches reality. + """ + from hermes_state import SCHEMA_SQL + + expected = SessionDB._parse_schema_columns(SCHEMA_SQL) + for table_name, declared_cols in expected.items(): + live_cols = { + r[1] + for r in db._conn.execute( + f'PRAGMA table_info("{table_name}")' + ).fetchall() + } + for col_name in declared_cols: + assert col_name in live_cols, ( + f"Column {col_name} declared in SCHEMA_SQL for {table_name} " + f"but missing from live DB. Live columns: {live_cols}" + ) + class TestTitleUniqueness: """Tests for unique title enforcement and title-based lookups."""