fix(state): replace version-gated migrations with declarative column reconciliation

The linear migration chain (if current_version < N: ALTER TABLE ADD COLUMN)
broke when commit a7d78d3b inserted a new v7 migration (reasoning_content)
and renumbered the old v7 (api_call_count) to v8. Users who updated between
Apr 15–22 already had schema_version=7 from the old numbering, so the new
v7 block was skipped entirely — reasoning_content was never created, causing
'sqlite3.OperationalError: no such column: reasoning_content' on /continue.

Root cause: two independent sources of truth (SCHEMA_SQL for new DBs vs a
manually-maintained version chain for existing DBs) that could desync when
migrations were reordered.

Fix: replace the 8-block version chain with _reconcile_columns(), which on
every startup diffs live table columns (PRAGMA table_info) against the
columns declared in SCHEMA_SQL and ADDs any that are missing. This follows
the Beets/sqlite-utils pattern used across mature SQLite projects — the
CREATE TABLE definition becomes the single source of truth for schema.

Adding a new column is now a single change: add it to SCHEMA_SQL. No
migration block to write, no version number to bump, no possibility of
skipped columns from reordering.

The schema_version table is retained for future data migrations that
cannot be handled declaratively (row transforms), but zero version-gated
ADD COLUMN blocks remain.
This commit is contained in:
kshitijk4poor 2026-04-22 23:39:02 +05:30
parent bc5da42b2c
commit ccd4116635
2 changed files with 257 additions and 92 deletions

View file

@ -256,109 +256,136 @@ class SessionDB:
self._conn.close() self._conn.close()
self._conn = None self._conn = None
@staticmethod
def _parse_schema_columns(schema_sql: str) -> Dict[str, Dict[str, str]]:
    """Extract the declared columns for every table in *schema_sql*.

    The DDL is executed against a throwaway in-memory SQLite database
    and the resulting schema is read back with PRAGMA table_info.
    Letting SQLite parse its own dialect sidesteps every regex edge
    case (DEFAULT expressions containing commas, inline REFERENCES,
    CHECK constraints, etc.).

    Returns a mapping of table name -> {column name -> type expression}
    where the type expression is suitable for use in
    ALTER TABLE ... ADD COLUMN. Adding a column to SCHEMA_SQL is all
    that's needed; the reconciliation loop picks it up automatically.
    """
    scratch = sqlite3.connect(":memory:")
    try:
        scratch.executescript(schema_sql)
        table_names = scratch.execute(
            "SELECT name FROM sqlite_master "
            "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
        ).fetchall()
        result: Dict[str, Dict[str, str]] = {}
        for (table,) in table_names:
            columns: Dict[str, str] = {}
            info = scratch.execute(f'PRAGMA table_info("{table}")').fetchall()
            # Each row is (cid, name, type, notnull, dflt_value, pk).
            for _cid, name, declared_type, notnull, default, pk in info:
                # Rebuild the type expression for ALTER TABLE ADD COLUMN.
                pieces = [declared_type] if declared_type else []
                if notnull and not pk:
                    pieces.append("NOT NULL")
                if default is not None:
                    # dflt_value is the literal SQL text of the default,
                    # so it can be re-emitted verbatim.
                    pieces.append(f"DEFAULT {default}")
                columns[name] = " ".join(pieces)
            result[table] = columns
        return result
    finally:
        scratch.close()
def _reconcile_columns(self, cursor: sqlite3.Cursor) -> None:
    """Add to the live database any column SCHEMA_SQL declares but a
    table is missing.

    Declarative reconciliation (the Beets/sqlite-utils approach):
    the CREATE TABLE definitions in SCHEMA_SQL are the single source
    of truth. On every startup the live columns (PRAGMA table_info)
    are diffed against the declared ones and any missing column is
    ADDed, so a column addition is just an edit to SCHEMA_SQL —
    no version-gated migration block is required.
    """
    for table, declared in self._parse_schema_columns(SCHEMA_SQL).items():
        try:
            info = cursor.execute(f'PRAGMA table_info("{table}")').fetchall()
        except sqlite3.OperationalError:
            # Table doesn't exist yet (shouldn't happen after
            # executescript); don't fail startup over it.
            continue
        # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
        present = {
            row[1] if isinstance(row, (tuple, list)) else row["name"]
            for row in info
        }
        for column, type_expr in declared.items():
            if column in present:
                continue
            escaped = column.replace('"', '""')
            try:
                cursor.execute(
                    f'ALTER TABLE "{table}" ADD COLUMN "{escaped}" {type_expr}'
                )
            except sqlite3.OperationalError as exc:
                # Expected: "duplicate column name" from a race or
                # re-run. Unexpected: "Cannot add a NOT NULL column
                # with default value NULL" from a schema mistake.
                # Log at DEBUG so it's visible in agent.log.
                logger.debug(
                    "reconcile %s.%s: %s", table, column, exc,
                )
def _init_schema(self): def _init_schema(self):
"""Create tables and FTS if they don't exist, run migrations.""" """Create tables and FTS if they don't exist, reconcile columns.
Schema management follows the declarative reconciliation pattern
(Beets, sqlite-utils): SCHEMA_SQL is the single source of truth.
On existing databases, _reconcile_columns() diffs live columns
against SCHEMA_SQL and ADDs any missing ones. This eliminates
the version-gated migration chain for column additions, making
it impossible for reordered or inserted migrations to skip columns.
The schema_version table is retained for future data migrations
(transforming existing rows) which cannot be handled declaratively.
"""
cursor = self._conn.cursor() cursor = self._conn.cursor()
cursor.executescript(SCHEMA_SQL) cursor.executescript(SCHEMA_SQL)
# Check schema version and run migrations # ── Declarative column reconciliation ──────────────────────────
# Diff live tables against SCHEMA_SQL and ADD any missing columns.
# This is idempotent and self-healing: even if a version-gated
# migration was skipped (e.g. due to version renumbering), the
# column gets created here.
self._reconcile_columns(cursor)
# ── Schema version bookkeeping ─────────────────────────────────
# Bump to current so future data migrations (if any) can gate on
# version. No version-gated column additions remain.
cursor.execute("SELECT version FROM schema_version LIMIT 1") cursor.execute("SELECT version FROM schema_version LIMIT 1")
row = cursor.fetchone() row = cursor.fetchone()
if row is None: if row is None:
cursor.execute("INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,)) cursor.execute(
"INSERT INTO schema_version (version) VALUES (?)",
(SCHEMA_VERSION,),
)
else: else:
current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0] current_version = row["version"] if isinstance(row, sqlite3.Row) else row[0]
if current_version < 2: if current_version < SCHEMA_VERSION:
# v2: add finish_reason column to messages cursor.execute(
try: "UPDATE schema_version SET version = ?",
cursor.execute("ALTER TABLE messages ADD COLUMN finish_reason TEXT") (SCHEMA_VERSION,),
except sqlite3.OperationalError: )
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 2")
if current_version < 3:
# v3: add title column to sessions
try:
cursor.execute("ALTER TABLE sessions ADD COLUMN title TEXT")
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 3")
if current_version < 4:
# v4: add unique index on title (NULLs allowed, only non-NULL must be unique)
try:
cursor.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_title_unique "
"ON sessions(title) WHERE title IS NOT NULL"
)
except sqlite3.OperationalError:
pass # Index already exists
cursor.execute("UPDATE schema_version SET version = 4")
if current_version < 5:
new_columns = [
("cache_read_tokens", "INTEGER DEFAULT 0"),
("cache_write_tokens", "INTEGER DEFAULT 0"),
("reasoning_tokens", "INTEGER DEFAULT 0"),
("billing_provider", "TEXT"),
("billing_base_url", "TEXT"),
("billing_mode", "TEXT"),
("estimated_cost_usd", "REAL"),
("actual_cost_usd", "REAL"),
("cost_status", "TEXT"),
("cost_source", "TEXT"),
("pricing_version", "TEXT"),
]
for name, column_type in new_columns:
try:
# name and column_type come from the hardcoded tuple above,
# not user input. Double-quote identifier escaping is applied
# as defense-in-depth; SQLite DDL cannot be parameterized.
safe_name = name.replace('"', '""')
cursor.execute(f'ALTER TABLE sessions ADD COLUMN "{safe_name}" {column_type}')
except sqlite3.OperationalError:
pass
cursor.execute("UPDATE schema_version SET version = 5")
if current_version < 6:
# v6: add reasoning columns to messages table — preserves assistant
# reasoning text and structured reasoning_details across gateway
# session turns. Without these, reasoning chains are lost on
# session reload, breaking multi-turn reasoning continuity for
# providers that replay reasoning (OpenRouter, OpenAI, Nous).
for col_name, col_type in [
("reasoning", "TEXT"),
("reasoning_details", "TEXT"),
("codex_reasoning_items", "TEXT"),
]:
try:
safe = col_name.replace('"', '""')
cursor.execute(
f'ALTER TABLE messages ADD COLUMN "{safe}" {col_type}'
)
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 6")
if current_version < 7:
# v7: preserve provider-native reasoning_content separately from
# normalized reasoning text. Kimi/Moonshot replay can require
# this field on assistant tool-call messages when thinking is on.
try:
cursor.execute('ALTER TABLE messages ADD COLUMN "reasoning_content" TEXT')
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 7")
if current_version < 8:
# v8: add api_call_count column to sessions — tracks the number
# of individual LLM API calls made within a session (as opposed
# to the session count itself).
try:
cursor.execute(
'ALTER TABLE sessions ADD COLUMN "api_call_count" INTEGER DEFAULT 0'
)
except sqlite3.OperationalError:
pass # Column already exists
cursor.execute("UPDATE schema_version SET version = 8")
# Unique title index — always ensure it exists (safe to run after migrations # Unique title index — always ensure it exists
# since the title column is guaranteed to exist at this point)
try: try:
cursor.execute( cursor.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_title_unique " "CREATE UNIQUE INDEX IF NOT EXISTS idx_sessions_title_unique "

View file

@ -1254,6 +1254,144 @@ class TestSchemaInit:
migrated_db.close() migrated_db.close()
def test_reconciliation_adds_missing_columns(self, tmp_path):
    """Columns present in SCHEMA_SQL but missing from the live table
    are added by _reconcile_columns regardless of schema_version.

    Regression test: commit a7d78d3b inserted a new v7 migration
    (reasoning_content) and renumbered the old v7 (api_call_count)
    to v8. Users already at the old v7 had schema_version >= 7,
    so the new v7 block was skipped and reasoning_content was never
    created, causing 'no such column' on /continue.
    """
    import sqlite3

    db_path = tmp_path / "gap_test.db"
    legacy = sqlite3.connect(str(db_path))
    # Simulate the old v7 state: api_call_count exists, reasoning_content does NOT
    legacy.executescript("""
        CREATE TABLE schema_version (version INTEGER NOT NULL);
        INSERT INTO schema_version (version) VALUES (7);
        CREATE TABLE sessions (
            id TEXT PRIMARY KEY,
            source TEXT NOT NULL,
            user_id TEXT,
            model TEXT,
            model_config TEXT,
            system_prompt TEXT,
            parent_session_id TEXT,
            started_at REAL NOT NULL,
            ended_at REAL,
            end_reason TEXT,
            message_count INTEGER DEFAULT 0,
            tool_call_count INTEGER DEFAULT 0,
            input_tokens INTEGER DEFAULT 0,
            output_tokens INTEGER DEFAULT 0,
            cache_read_tokens INTEGER DEFAULT 0,
            cache_write_tokens INTEGER DEFAULT 0,
            reasoning_tokens INTEGER DEFAULT 0,
            billing_provider TEXT,
            billing_base_url TEXT,
            billing_mode TEXT,
            estimated_cost_usd REAL,
            actual_cost_usd REAL,
            cost_status TEXT,
            cost_source TEXT,
            pricing_version TEXT,
            title TEXT,
            api_call_count INTEGER DEFAULT 0
        );
        CREATE TABLE messages (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            session_id TEXT NOT NULL,
            role TEXT NOT NULL,
            content TEXT,
            tool_call_id TEXT,
            tool_calls TEXT,
            tool_name TEXT,
            timestamp REAL NOT NULL,
            token_count INTEGER,
            finish_reason TEXT,
            reasoning TEXT,
            reasoning_details TEXT,
            codex_reasoning_items TEXT
        );
    """)
    legacy.execute(
        "INSERT INTO sessions (id, source, started_at) VALUES (?, ?, ?)",
        ("s1", "cli", 1000.0),
    )
    legacy.execute(
        "INSERT INTO messages (session_id, role, content, timestamp) "
        "VALUES (?, ?, ?, ?)",
        ("s1", "assistant", "hello", 1001.0),
    )
    legacy.commit()

    # Verify reasoning_content is absent from the legacy schema.
    before = {
        r[1] for r in legacy.execute("PRAGMA table_info(messages)").fetchall()
    }
    assert "reasoning_content" not in before
    legacy.close()

    # Open with SessionDB — reconciliation should add the missing column.
    reopened = SessionDB(db_path=db_path)
    after = {
        r[1]
        for r in reopened._conn.execute("PRAGMA table_info(messages)").fetchall()
    }
    assert "reasoning_content" in after

    # The query that used to crash must now work.
    row = reopened._conn.execute(
        "SELECT role, content, reasoning, reasoning_content, "
        "reasoning_details, codex_reasoning_items "
        "FROM messages WHERE session_id = ?",
        ("s1",),
    ).fetchone()
    assert row is not None
    assert row[0] == "assistant"
    assert row[3] is None  # reasoning_content NULL for old rows
    reopened.close()
def test_reconciliation_is_idempotent(self, tmp_path):
    """Opening the same database twice doesn't error or duplicate columns."""
    db_path = tmp_path / "idempotent.db"

    def message_columns(session_db):
        # Column names of the live `messages` table.
        rows = session_db._conn.execute("PRAGMA table_info(messages)").fetchall()
        return {row[1] for row in rows}

    first = SessionDB(db_path=db_path)
    first_cols = message_columns(first)
    first.close()

    second = SessionDB(db_path=db_path)
    second_cols = message_columns(second)
    second.close()

    assert first_cols == second_cols
def test_schema_sql_is_source_of_truth(self, db):
    """Every column in SCHEMA_SQL exists in the live database.

    This is the architectural invariant: SCHEMA_SQL declares the
    desired schema, _reconcile_columns ensures it matches reality.
    """
    from hermes_state import SCHEMA_SQL

    declared = SessionDB._parse_schema_columns(SCHEMA_SQL)
    for table_name, columns in declared.items():
        rows = db._conn.execute(
            f'PRAGMA table_info("{table_name}")'
        ).fetchall()
        live_cols = {row[1] for row in rows}
        for col_name in columns:
            assert col_name in live_cols, (
                f"Column {col_name} declared in SCHEMA_SQL for {table_name} "
                f"but missing from live DB. Live columns: {live_cols}"
            )
class TestTitleUniqueness: class TestTitleUniqueness:
"""Tests for unique title enforcement and title-based lookups.""" """Tests for unique title enforcement and title-based lookups."""