From 78698381af7ed7efa7e0c8e634af01dab4334511 Mon Sep 17 00:00:00 2001 From: Wesley Simplicio Date: Sat, 9 May 2026 12:12:24 -0300 Subject: [PATCH] fix(kanban): make _migrate_add_optional_columns idempotent on concurrent open ALTER TABLE calls inside _migrate_add_optional_columns were guarded by a snapshot of PRAGMA table_info taken at function entry. When the gateway dispatcher opens the kanban DB twice per tick (once in _tick_once_for_board and once via init_db's discard-and-reconnect path), a second connection can run the same migration before the first one commits, causing: sqlite3.OperationalError: duplicate column name: consecutive_failures This crashed the dispatcher on every first tick after a gateway restart (subsequent ticks succeeded because the columns were then present). Fix: introduce _add_column_if_missing() which wraps ALTER TABLE in a try/except that swallows OperationalError whose message contains 'duplicate column name'. All ALTER TABLE calls in _migrate_add_optional_columns are routed through this helper. Closes #21708 --- hermes_cli/kanban_db.py | 71 ++++++++++++++++++------- tests/hermes_cli/test_kanban_db.py | 83 ++++++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 18 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index 0af557e3e2c..aa3655b1762 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -963,6 +963,25 @@ def init_db( return path +def _add_column_if_missing( + conn: sqlite3.Connection, table: str, column: str, ddl: str +) -> bool: + """Run ``ALTER TABLE ADD COLUMN ``, idempotent across races. + + Returns ``True`` when the column was actually added by this call. + Swallows ``duplicate column name`` errors so a concurrent connection + that ran the same migration first does not crash the dispatcher tick + (issue #21708). + """ + try: + conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}") + return True + except sqlite3.OperationalError as exc: + if "duplicate column name" in str(exc).lower(): + return False + raise + + def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: """Add columns that were introduced after v1 release to legacy DBs. @@ -970,11 +989,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: """ cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")} if "tenant" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN tenant TEXT") + _add_column_if_missing(conn, "tasks", "tenant", "tenant TEXT") if "result" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN result TEXT") + _add_column_if_missing(conn, "tasks", "result", "result TEXT") if "idempotency_key" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN idempotency_key TEXT") + _add_column_if_missing( + conn, "tasks", "idempotency_key", "idempotency_key TEXT" + ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_tasks_idempotency " "ON tasks(idempotency_key)" @@ -997,37 +1018,51 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: # the *original* snapshot; this is intentional and safe as long as # no step depends on a column added by a previous step in the same call. if "consecutive_failures" not in cols: - conn.execute( - "ALTER TABLE tasks ADD COLUMN consecutive_failures " - "INTEGER NOT NULL DEFAULT 0" + added = _add_column_if_missing( + conn, + "tasks", + "consecutive_failures", + "consecutive_failures INTEGER NOT NULL DEFAULT 0", ) - if "spawn_failures" in cols: + if added and "spawn_failures" in cols: conn.execute( "UPDATE tasks SET consecutive_failures = COALESCE(spawn_failures, 0)" ) if "worker_pid" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER") + _add_column_if_missing(conn, "tasks", "worker_pid", "worker_pid INTEGER") if "last_failure_error" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN last_failure_error TEXT") - if "last_spawn_error" in cols: + added = _add_column_if_missing( + conn, "tasks", "last_failure_error", "last_failure_error TEXT" + ) + if added and "last_spawn_error" in cols: conn.execute( "UPDATE tasks SET last_failure_error = last_spawn_error" ) if "max_runtime_seconds" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER") + _add_column_if_missing( + conn, "tasks", "max_runtime_seconds", "max_runtime_seconds INTEGER" + ) if "last_heartbeat_at" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER") + _add_column_if_missing( + conn, "tasks", "last_heartbeat_at", "last_heartbeat_at INTEGER" + ) if "current_run_id" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN current_run_id INTEGER") + _add_column_if_missing( + conn, "tasks", "current_run_id", "current_run_id INTEGER" + ) if "workflow_template_id" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN workflow_template_id TEXT") + _add_column_if_missing( + conn, "tasks", "workflow_template_id", "workflow_template_id TEXT" + ) if "current_step_key" not in cols: - conn.execute("ALTER TABLE tasks ADD COLUMN current_step_key TEXT") + _add_column_if_missing( + conn, "tasks", "current_step_key", "current_step_key TEXT" + ) if "skills" not in cols: # JSON array of skill names the dispatcher force-loads into the # worker (additive to the built-in `kanban-worker`). NULL is fine # for existing rows. - conn.execute("ALTER TABLE tasks ADD COLUMN skills TEXT") + _add_column_if_missing(conn, "tasks", "skills", "skills TEXT") if "max_retries" not in cols: # Per-task override for the consecutive-failure circuit breaker. @@ -1035,13 +1070,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None: # config, then ``DEFAULT_FAILURE_LIMIT``. Existing rows get NULL, # which is the correct default (they keep the global behaviour # they were getting before the column existed). - conn.execute("ALTER TABLE tasks ADD COLUMN max_retries INTEGER") + _add_column_if_missing(conn, "tasks", "max_retries", "max_retries INTEGER") # task_events gained a run_id column; back-fill it as NULL for # historical events (they predate runs and can't be attributed). ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")} if "run_id" not in ev_cols: - conn.execute("ALTER TABLE task_events ADD COLUMN run_id INTEGER") + _add_column_if_missing(conn, "task_events", "run_id", "run_id INTEGER") conn.execute( "CREATE INDEX IF NOT EXISTS idx_events_run " "ON task_events(run_id, id)" diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index af9fb1da43c..b750139f454 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -1116,3 +1116,86 @@ def test_unlink_tasks_triggers_recompute_ready(kanban_home): "child should promote to ready immediately after unlink_tasks " "removes its last blocking dependency" ) +# --------------------------------------------------------------------------- +# _add_column_if_missing / _migrate_add_optional_columns idempotency (#21708) +# --------------------------------------------------------------------------- + +def test_add_column_if_missing_is_idempotent_on_race(kanban_home): + """``_add_column_if_missing`` must swallow 'duplicate column name' errors. + + Regression for #21708: the kanban dispatcher opens the DB twice per tick + (once via _tick_once_for_board, once via init_db's discard-and-reconnect + path). A second concurrent connection runs _migrate_add_optional_columns + before the first one commits, so ALTER TABLE raises OperationalError with + 'duplicate column name: consecutive_failures'. Without the idempotency + guard that crashes the dispatcher on the first tick after every restart. + """ + import sqlite3 + + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + conn.execute( + "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)" + ) + + # First call adds the column — returns True. + added = kb._add_column_if_missing(conn, "tasks", "extra_col", "extra_col TEXT") + assert added is True + cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")} + assert "extra_col" in cols + + # Second call on same connection — column already exists — must return + # False without raising, simulating the race the dispatcher hits. + added_again = kb._add_column_if_missing( + conn, "tasks", "extra_col", "extra_col TEXT" + ) + assert added_again is False + + conn.close() + + +def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home): + """Full _migrate_add_optional_columns must not raise when columns already + exist (issue #21708 race window — two connections migrate concurrently).""" + import sqlite3 + + # Schema already in fully-migrated state (all optional columns present). + conn = sqlite3.connect(":memory:") + conn.row_factory = sqlite3.Row + conn.execute( + """ + CREATE TABLE tasks ( + id INTEGER PRIMARY KEY, + title TEXT NOT NULL, + tenant TEXT, + result TEXT, + idempotency_key TEXT, + consecutive_failures INTEGER NOT NULL DEFAULT 0, + worker_pid INTEGER, + last_failure_error TEXT, + max_runtime_seconds INTEGER, + last_heartbeat_at INTEGER, + current_run_id INTEGER, + workflow_template_id TEXT, + current_step_key TEXT, + skills TEXT, + max_retries INTEGER + ) + """ + ) + conn.execute( + """ + CREATE TABLE task_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + task_id TEXT NOT NULL DEFAULT '', + run_id INTEGER, + kind TEXT NOT NULL DEFAULT '', + payload TEXT, + created_at INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + + # Running migration on an already-migrated schema must not raise. + kb._migrate_add_optional_columns(conn) + conn.close()