diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 5e465e87a6f..c0e8372c727 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1637,6 +1637,140 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: (new, old), ) + _rebuild_drifted_tables(conn) + + +# Legacy DBs defined these tables with a ``TEXT PRIMARY KEY`` id (or, for +# ``kanban_notify_subs``, a nullable ``TEXT last_event_id``). The current +# schema uses ``INTEGER PRIMARY KEY AUTOINCREMENT`` / ``INTEGER NOT NULL +# DEFAULT 0``. ``CREATE TABLE IF NOT EXISTS`` skips existing tables +# regardless of schema and ``_add_column_if_missing`` only adds columns, so +# neither can fix a drifted column type — the table must be rebuilt. See +# #35096. +# +# Each entry pairs the canonical CREATE TABLE with the CREATE INDEX +# statements that DROP TABLE would otherwise take down with it (including +# ``idx_events_run``, added by the additive pass above). To guard against +# this list drifting from SCHEMA_SQL, ``test_rebuilt_schema_matches_fresh`` +# asserts a rebuilt legacy DB is byte-identical to a fresh one. +_REBUILD_SPECS = { + "task_events": ( + "CREATE TABLE task_events (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " task_id TEXT NOT NULL, run_id INTEGER, kind TEXT NOT NULL," + " payload TEXT, created_at INTEGER NOT NULL)", + ( + "CREATE INDEX idx_events_task ON task_events(task_id, created_at)", + "CREATE INDEX idx_events_run ON task_events(run_id, id)", + ), + ), + "task_comments": ( + "CREATE TABLE task_comments (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " task_id TEXT NOT NULL, author TEXT NOT NULL, body TEXT NOT NULL," + " created_at INTEGER NOT NULL)", + ("CREATE INDEX idx_comments_task ON task_comments(task_id, created_at)",), + ), + "task_runs": ( + "CREATE TABLE task_runs (" + " id INTEGER PRIMARY KEY AUTOINCREMENT," + " task_id TEXT NOT NULL, profile TEXT, step_key TEXT," + " status TEXT NOT NULL, claim_lock TEXT, claim_expires INTEGER," + " worker_pid INTEGER, max_runtime_seconds INTEGER," + " last_heartbeat_at INTEGER, started_at INTEGER NOT NULL," + " ended_at INTEGER, outcome TEXT, summary TEXT, metadata TEXT," + " error TEXT)", + ( + "CREATE INDEX idx_runs_task ON task_runs(task_id, started_at)", + "CREATE INDEX idx_runs_status ON task_runs(status)", + ), + ), + "kanban_notify_subs": ( + "CREATE TABLE kanban_notify_subs (" + " task_id TEXT NOT NULL, platform TEXT NOT NULL, chat_id TEXT NOT NULL," + " thread_id TEXT NOT NULL DEFAULT '', user_id TEXT," + " notifier_profile TEXT, created_at INTEGER NOT NULL," + " last_event_id INTEGER NOT NULL DEFAULT 0," + " PRIMARY KEY (task_id, platform, chat_id, thread_id))", + ("CREATE INDEX idx_notify_task ON kanban_notify_subs(task_id)",), + ), +} + + +def _table_has_drifted(conn: sqlite3.Connection, table: str) -> bool: + """True when ``table`` still carries the legacy (pre-AUTOINCREMENT) shape.""" + info = conn.execute(f"PRAGMA table_info({table})").fetchall() + if not info: + return False # table absent — nothing to rebuild + if table == "kanban_notify_subs": + lei = next((c for c in info if c["name"] == "last_event_id"), None) + return lei is not None and (lei["type"] or "").upper() != "INTEGER" + # task_events / task_comments / task_runs: id must be INTEGER and a PK. + id_col = next((c for c in info if c["name"] == "id"), None) + if id_col is None: + return False + return not ((id_col["type"] or "").upper() == "INTEGER" and id_col["pk"]) + + +def _rebuild_drifted_tables(conn: sqlite3.Connection) -> None: + """Rebuild any kanban table whose column types drifted from SCHEMA_SQL. + + Old boards crash the gateway notifier (``int(None)`` on a NULL id in + ``unseen_events_for_sub``) and never match the ``id > cursor`` filter, so + every kanban notification is silently lost (#35096). Each affected table is + rebuilt with the standard SQLite pattern — CREATE new → INSERT shared + columns → DROP old → RENAME — recreating its indexes too (DROP TABLE takes + them down). The legacy TEXT ids are dropped (they aren't valid integers); + AUTOINCREMENT assigns fresh ones and ``last_event_id`` cursors reset to 0, + so the first post-migration tick replays a task's event history once — + the safe failure mode for a feature that was already fully broken. + + The whole pass runs in one transaction so an interruption can't leave a + table half-renamed, and under ``connect()``'s init locks so nothing races + it. Idempotent: a correctly-typed DB skips every table and returns without + opening a transaction. + """ + drifted = [t for t in _REBUILD_SPECS if _table_has_drifted(conn, t)] + if not drifted: + return + + conn.execute("BEGIN IMMEDIATE") + try: + for table in drifted: + create_sql, index_sqls = _REBUILD_SPECS[table] + old_cols = [c["name"] for c in conn.execute(f"PRAGMA table_info({table})")] + _log.info("kanban migration: rebuilding %s to match current schema", table) + conn.execute(f"ALTER TABLE {table} RENAME TO {table}_legacy") + conn.execute(create_sql) + new_cols = {c["name"] for c in conn.execute(f"PRAGMA table_info({table})")} + if table == "kanban_notify_subs": + # Cast the legacy TEXT cursor to INTEGER; NULL / non-numeric → 0. + shared = [c for c in old_cols if c in new_cols and c != "last_event_id"] + cols_csv = ", ".join(shared) + conn.execute( + f"INSERT INTO {table} ({cols_csv}, last_event_id) " + f"SELECT {cols_csv}, COALESCE(CAST(last_event_id AS INTEGER), 0) " + f"FROM {table}_legacy" + ) + else: + # Drop the legacy TEXT id; AUTOINCREMENT reassigns it. + shared = [c for c in old_cols if c in new_cols and c != "id"] + cols_csv = ", ".join(shared) + conn.execute( + f"INSERT INTO {table} ({cols_csv}) " + f"SELECT {cols_csv} FROM {table}_legacy" + ) + conn.execute(f"DROP TABLE {table}_legacy") + for index_sql in index_sqls: + conn.execute(index_sql) + conn.execute("COMMIT") + except Exception: + try: + conn.execute("ROLLBACK") + except sqlite3.OperationalError: + pass + raise + def _check_file_length_invariant(conn: sqlite3.Connection) -> None: """Read the SQLite header page_count and compare against actual file size. diff --git a/tests/hermes_cli/test_kanban_db_init.py b/tests/hermes_cli/test_kanban_db_init.py index c400b1d90f9..7db5d2009e6 100644 --- a/tests/hermes_cli/test_kanban_db_init.py +++ b/tests/hermes_cli/test_kanban_db_init.py @@ -1,11 +1,74 @@ from __future__ import annotations +import sqlite3 import threading from pathlib import Path from hermes_cli import kanban_db as kb +def _make_legacy_db(path: Path) -> None: + """Write a kanban DB with the pre-AUTOINCREMENT (TEXT PK) schema for the + four tables #35096 affects, keeping every other table current so the + additive-column migration runs cleanly on top. + """ + conn = sqlite3.connect(str(path)) + conn.executescript(kb.SCHEMA_SQL) + conn.executescript( + """ + DROP TABLE task_events; + DROP TABLE task_comments; + DROP TABLE task_runs; + DROP TABLE kanban_notify_subs; + CREATE TABLE task_comments (id TEXT PRIMARY KEY, task_id TEXT NOT NULL, + author TEXT NOT NULL, body TEXT NOT NULL, created_at INTEGER NOT NULL); + CREATE TABLE task_events (id TEXT PRIMARY KEY, task_id TEXT NOT NULL, + kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL); + CREATE TABLE task_runs (id TEXT PRIMARY KEY, task_id TEXT NOT NULL, + profile TEXT, status TEXT NOT NULL, started_at INTEGER NOT NULL); + CREATE TABLE kanban_notify_subs (task_id TEXT NOT NULL, platform TEXT NOT NULL, + chat_id TEXT NOT NULL, thread_id TEXT NOT NULL DEFAULT '', user_id TEXT, + created_at INTEGER NOT NULL, last_event_id TEXT, + PRIMARY KEY (task_id, platform, chat_id, thread_id)); + """ + ) + conn.execute("INSERT INTO tasks (id, title, status, created_at) VALUES ('task-1', 'T', 'done', 1000)") + conn.execute("INSERT INTO task_comments VALUES ('c-1', 'task-1', 'agent', 'hi', 1500)") + conn.execute("INSERT INTO task_events VALUES ('e-1', 'task-1', 'completed', NULL, 2000)") + conn.execute("INSERT INTO task_events VALUES ('e-2', 'task-1', 'blocked', NULL, 2100)") + conn.execute("INSERT INTO task_runs VALUES ('r-1', 'task-1', 'default', 'done', 1000)") + conn.execute( + "INSERT INTO kanban_notify_subs (task_id, platform, chat_id, created_at, last_event_id) " + "VALUES ('task-1', 'telegram', '123', 1000, 'e-1')" + ) + conn.commit() + conn.close() + + +def _setup_home(tmp_path, monkeypatch) -> Path: + home = tmp_path / ".hermes" + home.mkdir() + monkeypatch.setenv("HERMES_HOME", str(home)) + monkeypatch.setattr(Path, "home", lambda: tmp_path) + db_path = kb.kanban_db_path(board="legacy") + db_path.parent.mkdir(parents=True, exist_ok=True) + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + return db_path + + +def _table_struct(conn: sqlite3.Connection, table: str): + cols = [ + (r["name"], (r["type"] or "").upper(), r["notnull"], r["pk"]) + for r in conn.execute(f"PRAGMA table_info({table})") + ] + idx = sorted( + r["name"] + for r in conn.execute(f"PRAGMA index_list({table})") + if not r["name"].startswith("sqlite_") + ) + return cols, idx + + def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch): home = tmp_path / ".hermes" home.mkdir() @@ -36,3 +99,79 @@ def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch): with kb.connect(board="default") as conn: cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")} assert "max_retries" in cols + + +def test_legacy_text_pk_tables_rebuilt_to_integer_autoincrement(tmp_path, monkeypatch): + """A pre-AUTOINCREMENT DB is migrated in place: id columns become INTEGER + PKs, ``last_event_id`` becomes INTEGER, data is preserved, and indexes + are recreated (DROP TABLE would otherwise take them down).""" + db_path = _setup_home(tmp_path, monkeypatch) + _make_legacy_db(db_path) + + with kb.connect(db_path) as conn: + for table in ("task_events", "task_comments", "task_runs"): + id_col = {r["name"]: r for r in conn.execute(f"PRAGMA table_info({table})")}["id"] + assert id_col["type"].upper() == "INTEGER" and id_col["pk"] == 1 + + lei = {r["name"]: r for r in conn.execute("PRAGMA table_info(kanban_notify_subs)")} + assert lei["last_event_id"]["type"].upper() == "INTEGER" + + # Data preserved across the rebuild. + assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2 + assert conn.execute("SELECT body FROM task_comments").fetchone()["body"] == "hi" + assert len(conn.execute("SELECT * FROM task_runs").fetchall()) == 1 + # Non-numeric legacy cursor ("e-1") casts to 0. + assert conn.execute("SELECT last_event_id FROM kanban_notify_subs").fetchone()["last_event_id"] == 0 + + # Indexes restored, including idx_events_run (added by the additive pass). + indexes = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='index'")} + for name in ("idx_events_task", "idx_events_run", "idx_comments_task", + "idx_runs_task", "idx_runs_status", "idx_notify_task"): + assert name in indexes + + # AUTOINCREMENT actually works after the rebuild. + conn.execute("INSERT INTO task_events (task_id, kind, created_at) VALUES ('task-1', 'completed', 3000)") + new_id = conn.execute("SELECT id FROM task_events ORDER BY id DESC LIMIT 1").fetchone()["id"] + assert isinstance(new_id, int) and new_id >= 1 + + +def test_rebuilt_schema_matches_fresh_db(tmp_path, monkeypatch): + """The rebuilt tables must be structurally identical to a fresh DB, so the + hand-written DDL in ``_REBUILD_SPECS`` can't silently drift from SCHEMA_SQL.""" + legacy_path = _setup_home(tmp_path, monkeypatch) + _make_legacy_db(legacy_path) + fresh_path = kb.kanban_db_path(board="fresh") + fresh_path.parent.mkdir(parents=True, exist_ok=True) + kb._INITIALIZED_PATHS.discard(str(fresh_path.resolve())) + + with kb.connect(legacy_path) as migrated, kb.connect(fresh_path) as fresh: + for table in ("task_events", "task_comments", "task_runs", "kanban_notify_subs"): + assert _table_struct(migrated, table) == _table_struct(fresh, table) + + +def test_migration_is_idempotent(tmp_path, monkeypatch): + """Re-opening an already-migrated DB is a no-op and leaves data intact.""" + db_path = _setup_home(tmp_path, monkeypatch) + _make_legacy_db(db_path) + + with kb.connect(db_path): + pass + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path) as conn: + id_col = {r["name"]: r for r in conn.execute("PRAGMA table_info(task_events)")}["id"] + assert id_col["type"].upper() == "INTEGER" + assert len(conn.execute("SELECT * FROM task_events").fetchall()) == 2 + + +def test_unseen_events_for_sub_survives_migrated_db(tmp_path, monkeypatch): + """The crash that motivated #35096 — ``int(None)`` on a NULL cursor — is + gone after migration; the notifier query returns an integer cursor.""" + db_path = _setup_home(tmp_path, monkeypatch) + _make_legacy_db(db_path) + + with kb.connect(db_path) as conn: + cursor, events = kb.unseen_events_for_sub( + conn, task_id="task-1", platform="telegram", chat_id="123" + ) + assert isinstance(cursor, int) + assert isinstance(events, list)