fix(kanban): rebuild legacy TEXT-PK tables to INTEGER AUTOINCREMENT on open

Legacy kanban boards (pre-AUTOINCREMENT schema) crashed the gateway
notifier on every tick — int(None) on a NULL id in unseen_events_for_sub
— silently losing all kanban notifications. CREATE TABLE IF NOT EXISTS
skips existing tables regardless of schema and _add_column_if_missing
only adds columns, so neither could fix a drifted primary-key type.

_rebuild_drifted_tables() detects the legacy shape via PRAGMA table_info
and rebuilds task_events/task_comments/task_runs (TEXT PK -> INTEGER
AUTOINCREMENT) and kanban_notify_subs.last_event_id (TEXT/NULL -> INTEGER
NOT NULL DEFAULT 0), preserving data. The whole pass is one transaction
so an interruption can't leave a table half-renamed, and recreates every
index DROP TABLE would otherwise take down (including idx_events_run).

Co-authored-by: liuhao1024 <liuhao1024@users.noreply.github.com>
This commit is contained in:
teknium1 2026-05-30 00:31:52 -07:00 committed by Teknium
parent 16882cfded
commit c70dca3a88
2 changed files with 273 additions and 0 deletions

View file

@ -1637,6 +1637,140 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
(new, old),
)
_rebuild_drifted_tables(conn)
# Legacy DBs defined these tables with a ``TEXT PRIMARY KEY`` id (or, for
# ``kanban_notify_subs``, a nullable ``TEXT last_event_id``). The current
# schema uses ``INTEGER PRIMARY KEY AUTOINCREMENT`` / ``INTEGER NOT NULL
# DEFAULT 0``. ``CREATE TABLE IF NOT EXISTS`` skips existing tables
# regardless of schema and ``_add_column_if_missing`` only adds columns, so
# neither can fix a drifted column type — the table must be rebuilt. See
# #35096.
#
# Each entry pairs the canonical CREATE TABLE with the CREATE INDEX
# statements that DROP TABLE would otherwise take down with it (including
# ``idx_events_run``, added by the additive pass above). To guard against
# this list drifting from SCHEMA_SQL, ``test_rebuilt_schema_matches_fresh_db``
# asserts that each rebuilt table is structurally identical (column name,
# type, NOT NULL, PK position, and index names) to the same table in a fresh DB.
# Rebuild recipe per table: ``(CREATE TABLE statement, (CREATE INDEX statements, ...))``.
# Dict order is the order in which tables are checked and rebuilt.
_REBUILD_SPECS = {
    # Event log: ``id`` is the integer key; both indexes are recreated after the rebuild.
    "task_events": (
        "CREATE TABLE task_events ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " task_id TEXT NOT NULL,"
        " run_id INTEGER,"
        " kind TEXT NOT NULL,"
        " payload TEXT,"
        " created_at INTEGER NOT NULL)",
        (
            "CREATE INDEX idx_events_task ON task_events(task_id, created_at)",
            "CREATE INDEX idx_events_run ON task_events(run_id, id)",
        ),
    ),
    # Comments attached to a task.
    "task_comments": (
        "CREATE TABLE task_comments ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " task_id TEXT NOT NULL,"
        " author TEXT NOT NULL,"
        " body TEXT NOT NULL,"
        " created_at INTEGER NOT NULL)",
        (
            "CREATE INDEX idx_comments_task ON task_comments(task_id, created_at)",
        ),
    ),
    # Per-task run records.
    "task_runs": (
        "CREATE TABLE task_runs ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " task_id TEXT NOT NULL,"
        " profile TEXT,"
        " step_key TEXT,"
        " status TEXT NOT NULL,"
        " claim_lock TEXT,"
        " claim_expires INTEGER,"
        " worker_pid INTEGER,"
        " max_runtime_seconds INTEGER,"
        " last_heartbeat_at INTEGER,"
        " started_at INTEGER NOT NULL,"
        " ended_at INTEGER,"
        " outcome TEXT,"
        " summary TEXT,"
        " metadata TEXT,"
        " error TEXT)",
        (
            "CREATE INDEX idx_runs_task ON task_runs(task_id, started_at)",
            "CREATE INDEX idx_runs_status ON task_runs(status)",
        ),
    ),
    # Notification subscriptions: no ``id`` column; the drifted column here is
    # ``last_event_id`` (must be INTEGER NOT NULL DEFAULT 0).
    "kanban_notify_subs": (
        "CREATE TABLE kanban_notify_subs ("
        " task_id TEXT NOT NULL,"
        " platform TEXT NOT NULL,"
        " chat_id TEXT NOT NULL,"
        " thread_id TEXT NOT NULL DEFAULT '',"
        " user_id TEXT,"
        " notifier_profile TEXT,"
        " created_at INTEGER NOT NULL,"
        " last_event_id INTEGER NOT NULL DEFAULT 0,"
        " PRIMARY KEY (task_id, platform, chat_id, thread_id))",
        (
            "CREATE INDEX idx_notify_task ON kanban_notify_subs(task_id)",
        ),
    ),
}
def _table_has_drifted(conn: sqlite3.Connection, table: str) -> bool:
    """Report whether ``table`` still has the legacy (pre-AUTOINCREMENT) shape.

    Args:
        conn: Open connection whose rows support lookup by column name
            (``row["name"]``).
        table: Name of the table to inspect via ``PRAGMA table_info``.

    Returns:
        ``False`` when the table does not exist or lacks the column being
        checked. For ``kanban_notify_subs``, ``True`` when ``last_event_id``
        is not declared ``INTEGER``. For every other table, ``True`` when
        ``id`` is not both declared ``INTEGER`` and part of the primary key.
    """
    rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    if not rows:
        # An empty PRAGMA result means the table is absent: nothing to rebuild.
        return False

    by_name = {row["name"]: row for row in rows}

    if table == "kanban_notify_subs":
        cursor_col = by_name.get("last_event_id")
        if cursor_col is None:
            return False
        return (cursor_col["type"] or "").upper() != "INTEGER"

    # task_events / task_comments / task_runs: ``id`` must be an INTEGER PK.
    id_col = by_name.get("id")
    if id_col is None:
        return False
    declared_integer = (id_col["type"] or "").upper() == "INTEGER"
    in_primary_key = bool(id_col["pk"])
    return not (declared_integer and in_primary_key)
def _rebuild_drifted_tables(conn: sqlite3.Connection) -> None:
    """Rebuild any kanban table whose column types drifted from SCHEMA_SQL.

    Per #35096, legacy boards crash the gateway notifier (``int(None)`` on a
    NULL id in ``unseen_events_for_sub``) and never match the ``id > cursor``
    filter, so kanban notifications are silently lost.

    For each drifted table the pass runs, in this order: RENAME the old table
    to ``<table>_legacy`` -> CREATE the new table -> INSERT the columns both
    shapes share -> DROP the legacy table -> recreate the indexes (DROP TABLE
    takes the old ones down).

    Data handling:

    * ``task_events`` / ``task_comments`` / ``task_runs``: the legacy ``id``
      values are dropped and AUTOINCREMENT assigns fresh ones.
    * ``kanban_notify_subs``: when ``task_events`` is rebuilt in the same
      pass, every event id is reassigned, so ``last_event_id`` is reset to 0
      and the first post-migration tick replays a task's event history once.
      Otherwise the legacy TEXT cursor is cast to INTEGER (NULL -> 0).

    The whole pass runs inside one ``BEGIN IMMEDIATE`` transaction so an
    interruption cannot leave a table half-renamed; on any error it is rolled
    back and the error re-raised. Idempotent: when no table has drifted the
    function returns without opening a transaction.

    NOTE(review): this is expected to run under ``connect()``'s init locks;
    that is not visible from this function - confirm at the call site.
    NOTE(review): SQLite's ALTER TABLE documentation recommends the order
    create-new / copy / drop-old / rename-new, because renaming the old table
    first can rewrite references held by foreign keys, triggers or views.
    Confirm that nothing else in the schema references these four tables.

    Args:
        conn: Open connection whose rows support lookup by column name.

    Raises:
        Exception: whatever the failing statement raised, after a
            best-effort ROLLBACK.
    """
    drifted = [t for t in _REBUILD_SPECS if _table_has_drifted(conn, t)]
    if not drifted:
        return

    # If task_events is being rebuilt, its ids are all reassigned, so any
    # legacy cursor value is meaningless. SQLite's CAST keeps the longest
    # numeric prefix of a TEXT value ('3f2a...' -> 3), which would silently
    # skip renumbered events; reset to 0 instead.
    events_renumbered = "task_events" in drifted
    cursor_expr = (
        "0"
        if events_renumbered
        else "COALESCE(CAST(last_event_id AS INTEGER), 0)"
    )

    conn.execute("BEGIN IMMEDIATE")
    try:
        for table in drifted:
            create_sql, index_sqls = _REBUILD_SPECS[table]
            # Capture the legacy column list before the rename.
            old_cols = [c["name"] for c in conn.execute(f"PRAGMA table_info({table})")]
            _log.info("kanban migration: rebuilding %s to match current schema", table)
            conn.execute(f"ALTER TABLE {table} RENAME TO {table}_legacy")
            conn.execute(create_sql)
            new_cols = {c["name"] for c in conn.execute(f"PRAGMA table_info({table})")}
            if table == "kanban_notify_subs":
                # Copy every shared column verbatim; the cursor column is
                # produced by ``cursor_expr`` (see above).
                shared = [c for c in old_cols if c in new_cols and c != "last_event_id"]
                cols_csv = ", ".join(shared)
                conn.execute(
                    f"INSERT INTO {table} ({cols_csv}, last_event_id) "
                    f"SELECT {cols_csv}, {cursor_expr} "
                    f"FROM {table}_legacy"
                )
            else:
                # Drop the legacy TEXT id; AUTOINCREMENT reassigns it.
                shared = [c for c in old_cols if c in new_cols and c != "id"]
                cols_csv = ", ".join(shared)
                conn.execute(
                    f"INSERT INTO {table} ({cols_csv}) "
                    f"SELECT {cols_csv} FROM {table}_legacy"
                )
            conn.execute(f"DROP TABLE {table}_legacy")
            # DROP TABLE removed the indexes that followed the renamed table.
            for index_sql in index_sqls:
                conn.execute(index_sql)
        conn.execute("COMMIT")
    except Exception:
        try:
            conn.execute("ROLLBACK")
        except sqlite3.OperationalError:
            # Best effort: the transaction may already be gone; the original
            # error below is the one worth surfacing.
            pass
        raise
def _check_file_length_invariant(conn: sqlite3.Connection) -> None:
"""Read the SQLite header page_count and compare against actual file size.

View file

@ -1,11 +1,74 @@
from __future__ import annotations
import sqlite3
import threading
from pathlib import Path
from hermes_cli import kanban_db as kb
def _make_legacy_db(path: Path) -> None:
    """Create a kanban DB at ``path`` whose four #35096 tables use the legacy shape.

    The current ``kb.SCHEMA_SQL`` is applied first, then ``task_events``,
    ``task_comments``, ``task_runs`` and ``kanban_notify_subs`` are dropped
    and recreated with ``TEXT PRIMARY KEY`` ids (and a nullable TEXT
    ``last_event_id``). Every other table keeps the current schema. One task,
    one comment, two events, one run and one subscription are seeded.
    """
    db = sqlite3.connect(str(path))
    db.executescript(kb.SCHEMA_SQL)
    # Swap the four affected tables for their pre-AUTOINCREMENT definitions.
    db.executescript(
        """
DROP TABLE task_events;
DROP TABLE task_comments;
DROP TABLE task_runs;
DROP TABLE kanban_notify_subs;
CREATE TABLE task_comments (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
author TEXT NOT NULL, body TEXT NOT NULL, created_at INTEGER NOT NULL);
CREATE TABLE task_events (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
kind TEXT NOT NULL, payload TEXT, created_at INTEGER NOT NULL);
CREATE TABLE task_runs (id TEXT PRIMARY KEY, task_id TEXT NOT NULL,
profile TEXT, status TEXT NOT NULL, started_at INTEGER NOT NULL);
CREATE TABLE kanban_notify_subs (task_id TEXT NOT NULL, platform TEXT NOT NULL,
chat_id TEXT NOT NULL, thread_id TEXT NOT NULL DEFAULT '', user_id TEXT,
created_at INTEGER NOT NULL, last_event_id TEXT,
PRIMARY KEY (task_id, platform, chat_id, thread_id));
"""
    )
    # Seed rows, executed in order; the subscription's cursor is the
    # non-numeric legacy id 'e-1'.
    seed_statements = (
        "INSERT INTO tasks (id, title, status, created_at) VALUES ('task-1', 'T', 'done', 1000)",
        "INSERT INTO task_comments VALUES ('c-1', 'task-1', 'agent', 'hi', 1500)",
        "INSERT INTO task_events VALUES ('e-1', 'task-1', 'completed', NULL, 2000)",
        "INSERT INTO task_events VALUES ('e-2', 'task-1', 'blocked', NULL, 2100)",
        "INSERT INTO task_runs VALUES ('r-1', 'task-1', 'default', 'done', 1000)",
        "INSERT INTO kanban_notify_subs (task_id, platform, chat_id, created_at, last_event_id) "
        "VALUES ('task-1', 'telegram', '123', 1000, 'e-1')",
    )
    for statement in seed_statements:
        db.execute(statement)
    db.commit()
    db.close()
def _setup_home(tmp_path, monkeypatch) -> Path:
    """Point the kanban DB location at ``tmp_path`` and return the "legacy" board path.

    Creates ``tmp_path/.hermes``, exports it as ``HERMES_HOME``, patches
    ``Path.home`` to return ``tmp_path``, makes sure the parent directory of
    the board's DB path exists, and removes that path from
    ``kb._INITIALIZED_PATHS``.
    """
    hermes_home = tmp_path / ".hermes"
    hermes_home.mkdir()
    monkeypatch.setenv("HERMES_HOME", str(hermes_home))
    monkeypatch.setattr(Path, "home", lambda: tmp_path)

    legacy_db = kb.kanban_db_path(board="legacy")
    legacy_db.parent.mkdir(parents=True, exist_ok=True)
    # Presumably makes the next kb.connect() run its initialization for this
    # path again - confirm against kb.connect.
    kb._INITIALIZED_PATHS.discard(str(legacy_db.resolve()))
    return legacy_db
def _table_struct(conn: sqlite3.Connection, table: str):
    """Summarize a table's structure for equality comparison.

    Returns a ``(columns, indexes)`` pair: ``columns`` is a list of
    ``(name, upper-cased declared type, notnull, pk)`` tuples in
    ``PRAGMA table_info`` order, and ``indexes`` is the sorted list of index
    names from ``PRAGMA index_list`` whose names do not start with
    ``sqlite_``.
    """
    columns = []
    for row in conn.execute(f"PRAGMA table_info({table})"):
        declared_type = (row["type"] or "").upper()
        columns.append((row["name"], declared_type, row["notnull"], row["pk"]))

    index_names = [row["name"] for row in conn.execute(f"PRAGMA index_list({table})")]
    named_indexes = sorted(
        name for name in index_names if not name.startswith("sqlite_")
    )
    return columns, named_indexes
def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
home = tmp_path / ".hermes"
home.mkdir()
@ -36,3 +99,79 @@ def test_connect_initialization_is_thread_safe(tmp_path, monkeypatch):
with kb.connect(board="default") as conn:
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
assert "max_retries" in cols
def test_legacy_text_pk_tables_rebuilt_to_integer_autoincrement(tmp_path, monkeypatch):
    """Opening a pre-AUTOINCREMENT DB migrates it in place.

    Checks that the ``id`` columns become INTEGER primary keys, that
    ``last_event_id`` becomes INTEGER, that the seeded rows are still there,
    that the expected indexes exist, and that a new insert gets an integer id.
    """
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    with kb.connect(db_path) as conn:
        # Column types after the rebuild.
        for table in ("task_events", "task_comments", "task_runs"):
            columns = {row["name"]: row for row in conn.execute(f"PRAGMA table_info({table})")}
            id_col = columns["id"]
            assert id_col["type"].upper() == "INTEGER"
            assert id_col["pk"] == 1
        sub_columns = {
            row["name"]: row
            for row in conn.execute("PRAGMA table_info(kanban_notify_subs)")
        }
        assert sub_columns["last_event_id"]["type"].upper() == "INTEGER"

        # Seeded rows are still present.
        event_rows = conn.execute("SELECT * FROM task_events").fetchall()
        assert len(event_rows) == 2
        comment = conn.execute("SELECT body FROM task_comments").fetchone()
        assert comment["body"] == "hi"
        run_rows = conn.execute("SELECT * FROM task_runs").fetchall()
        assert len(run_rows) == 1

        # The non-numeric legacy cursor ("e-1") comes out as 0.
        sub = conn.execute("SELECT last_event_id FROM kanban_notify_subs").fetchone()
        assert sub["last_event_id"] == 0

        # Every index is back, including idx_events_run.
        present = {row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type='index'")}
        expected = {
            "idx_events_task",
            "idx_events_run",
            "idx_comments_task",
            "idx_runs_task",
            "idx_runs_status",
            "idx_notify_task",
        }
        missing = expected - present
        assert not missing

        # A fresh insert without an explicit id receives an integer id.
        conn.execute("INSERT INTO task_events (task_id, kind, created_at) VALUES ('task-1', 'completed', 3000)")
        newest = conn.execute("SELECT id FROM task_events ORDER BY id DESC LIMIT 1").fetchone()
        new_id = newest["id"]
        assert isinstance(new_id, int)
        assert new_id >= 1
def test_rebuilt_schema_matches_fresh_db(tmp_path, monkeypatch):
    """Each rebuilt table has the same structure as in a freshly created DB.

    Compares ``_table_struct`` output (columns and index names) between a
    migrated legacy board and a new "fresh" board, so the hand-written DDL in
    ``_REBUILD_SPECS`` cannot silently diverge from SCHEMA_SQL.
    """
    legacy_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(legacy_path)

    fresh_path = kb.kanban_db_path(board="fresh")
    fresh_path.parent.mkdir(parents=True, exist_ok=True)
    kb._INITIALIZED_PATHS.discard(str(fresh_path.resolve()))

    rebuilt_tables = ("task_events", "task_comments", "task_runs", "kanban_notify_subs")
    with kb.connect(legacy_path) as migrated, kb.connect(fresh_path) as fresh:
        for table in rebuilt_tables:
            migrated_shape = _table_struct(migrated, table)
            fresh_shape = _table_struct(fresh, table)
            assert migrated_shape == fresh_shape
def test_migration_is_idempotent(tmp_path, monkeypatch):
    """Opening an already-migrated DB a second time leaves schema and rows intact."""
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    # First open performs the migration.
    with kb.connect(db_path):
        pass

    # Clear the path from the initialized set before re-opening (presumably so
    # connect() runs its init/migration pass again - confirm in kb.connect).
    kb._INITIALIZED_PATHS.discard(str(db_path.resolve()))

    with kb.connect(db_path) as conn:
        columns = {row["name"]: row for row in conn.execute("PRAGMA table_info(task_events)")}
        assert columns["id"]["type"].upper() == "INTEGER"
        event_rows = conn.execute("SELECT * FROM task_events").fetchall()
        assert len(event_rows) == 2
def test_unseen_events_for_sub_survives_migrated_db(tmp_path, monkeypatch):
    """``unseen_events_for_sub`` works on a migrated legacy DB.

    Regression check for #35096 (``int(None)`` on a NULL cursor): the call
    must return an integer cursor and a list of events instead of raising.
    """
    db_path = _setup_home(tmp_path, monkeypatch)
    _make_legacy_db(db_path)

    with kb.connect(db_path) as conn:
        result = kb.unseen_events_for_sub(
            conn, task_id="task-1", platform="telegram", chat_id="123"
        )
        cursor, events = result

    assert isinstance(cursor, int)
    assert isinstance(events, list)