diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py
index 0af557e3e2c..aa3655b1762 100644
--- a/hermes_cli/kanban_db.py
+++ b/hermes_cli/kanban_db.py
@@ -963,6 +963,25 @@ def init_db(
return path
+def _add_column_if_missing(
+    conn: sqlite3.Connection, table: str, column: str, ddl: str
+) -> bool:
+    """Run ``ALTER TABLE <table> ADD COLUMN <ddl>``, idempotent across races.
+
+    ``table`` and ``ddl`` are interpolated into the statement verbatim, so
+    pass trusted literals only. ``ddl`` is the full column definition and
+    must start with ``column`` (e.g. ``"tenant"`` / ``"tenant TEXT"``).
+
+    Returns ``True`` when this call added the column and ``False`` when
+    SQLite reports ``duplicate column name: <column>`` because a concurrent
+    connection ran the same migration first (issue #21708). Any other
+    ``sqlite3.OperationalError`` propagates to the caller.
+    """
+    try:
+        conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
+    except sqlite3.OperationalError as exc:
+        # Forgive only the duplicate we expected, never an unrelated column.
+        if f"duplicate column name: {column}".lower() in str(exc).lower():
+            return False
+        raise
+    return True
+
+
def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"""Add columns that were introduced after v1 release to legacy DBs.
@@ -970,11 +989,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"""
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
if "tenant" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN tenant TEXT")
+ _add_column_if_missing(conn, "tasks", "tenant", "tenant TEXT")
if "result" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN result TEXT")
+ _add_column_if_missing(conn, "tasks", "result", "result TEXT")
if "idempotency_key" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN idempotency_key TEXT")
+ _add_column_if_missing(
+ conn, "tasks", "idempotency_key", "idempotency_key TEXT"
+ )
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tasks_idempotency "
"ON tasks(idempotency_key)"
@@ -997,37 +1018,51 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
# the *original* snapshot; this is intentional and safe as long as
# no step depends on a column added by a previous step in the same call.
if "consecutive_failures" not in cols:
- conn.execute(
- "ALTER TABLE tasks ADD COLUMN consecutive_failures "
- "INTEGER NOT NULL DEFAULT 0"
+ added = _add_column_if_missing(
+ conn,
+ "tasks",
+ "consecutive_failures",
+ "consecutive_failures INTEGER NOT NULL DEFAULT 0",
)
- if "spawn_failures" in cols:
+ if added and "spawn_failures" in cols:
conn.execute(
"UPDATE tasks SET consecutive_failures = COALESCE(spawn_failures, 0)"
)
if "worker_pid" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER")
+ _add_column_if_missing(conn, "tasks", "worker_pid", "worker_pid INTEGER")
if "last_failure_error" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN last_failure_error TEXT")
- if "last_spawn_error" in cols:
+ added = _add_column_if_missing(
+ conn, "tasks", "last_failure_error", "last_failure_error TEXT"
+ )
+ if added and "last_spawn_error" in cols:
conn.execute(
"UPDATE tasks SET last_failure_error = last_spawn_error"
)
if "max_runtime_seconds" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER")
+ _add_column_if_missing(
+ conn, "tasks", "max_runtime_seconds", "max_runtime_seconds INTEGER"
+ )
if "last_heartbeat_at" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER")
+ _add_column_if_missing(
+ conn, "tasks", "last_heartbeat_at", "last_heartbeat_at INTEGER"
+ )
if "current_run_id" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN current_run_id INTEGER")
+ _add_column_if_missing(
+ conn, "tasks", "current_run_id", "current_run_id INTEGER"
+ )
if "workflow_template_id" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN workflow_template_id TEXT")
+ _add_column_if_missing(
+ conn, "tasks", "workflow_template_id", "workflow_template_id TEXT"
+ )
if "current_step_key" not in cols:
- conn.execute("ALTER TABLE tasks ADD COLUMN current_step_key TEXT")
+ _add_column_if_missing(
+ conn, "tasks", "current_step_key", "current_step_key TEXT"
+ )
if "skills" not in cols:
# JSON array of skill names the dispatcher force-loads into the
# worker (additive to the built-in `kanban-worker`). NULL is fine
# for existing rows.
- conn.execute("ALTER TABLE tasks ADD COLUMN skills TEXT")
+ _add_column_if_missing(conn, "tasks", "skills", "skills TEXT")
if "max_retries" not in cols:
# Per-task override for the consecutive-failure circuit breaker.
@@ -1035,13 +1070,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
# config, then ``DEFAULT_FAILURE_LIMIT``. Existing rows get NULL,
# which is the correct default (they keep the global behaviour
# they were getting before the column existed).
- conn.execute("ALTER TABLE tasks ADD COLUMN max_retries INTEGER")
+ _add_column_if_missing(conn, "tasks", "max_retries", "max_retries INTEGER")
# task_events gained a run_id column; back-fill it as NULL for
# historical events (they predate runs and can't be attributed).
ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")}
if "run_id" not in ev_cols:
- conn.execute("ALTER TABLE task_events ADD COLUMN run_id INTEGER")
+ _add_column_if_missing(conn, "task_events", "run_id", "run_id INTEGER")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_events_run "
"ON task_events(run_id, id)"
diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py
index af9fb1da43c..b750139f454 100644
--- a/tests/hermes_cli/test_kanban_db.py
+++ b/tests/hermes_cli/test_kanban_db.py
@@ -1116,3 +1116,86 @@ def test_unlink_tasks_triggers_recompute_ready(kanban_home):
"child should promote to ready immediately after unlink_tasks "
"removes its last blocking dependency"
)
+# ---------------------------------------------------------------------------
+# _add_column_if_missing / _migrate_add_optional_columns idempotency (#21708)
+# ---------------------------------------------------------------------------
+
+def test_add_column_if_missing_is_idempotent_on_race(kanban_home):
+    """``_add_column_if_missing`` must swallow 'duplicate column name' errors.
+
+    Regression for #21708: a connection that loses the migration race gets
+    'duplicate column name: ...' from ALTER TABLE, which used to crash the
+    dispatcher tick. Adding the same column twice simulates that race;
+    unrelated OperationalErrors must still propagate.
+    """
+    import sqlite3
+
+    conn = sqlite3.connect(":memory:")
+    conn.row_factory = sqlite3.Row
+    try:
+        conn.execute(
+            "CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)"
+        )
+        call = (conn, "tasks", "extra_col", "extra_col TEXT")
+        # First call adds the column; the second finds it already there.
+        assert kb._add_column_if_missing(*call) is True
+        info = conn.execute("PRAGMA table_info(tasks)")
+        assert "extra_col" in {row["name"] for row in info}
+        assert kb._add_column_if_missing(*call) is False
+
+        # A missing table is not a duplicate column and must not be hidden.
+        try:
+            kb._add_column_if_missing(conn, "no_such_table", "c", "c TEXT")
+        except sqlite3.OperationalError as exc:
+            assert "no such table" in str(exc)
+        else:
+            raise AssertionError("non-duplicate error was swallowed")
+    finally:
+        conn.close()
+
+
+def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home):
+    """_migrate_add_optional_columns must not raise when the optional columns
+    appear after its column snapshot was taken (issue #21708 race window)."""
+    import sqlite3
+
+    class StaleSnapshotConn(sqlite3.Connection):
+        # Report only ``id`` for every ``PRAGMA table_info`` so each guarded
+        # ALTER runs against the fully-migrated schema below and collides,
+        # as if a concurrent connection had finished the migration first.
+        def execute(self, sql, *params):
+            if sql.startswith("PRAGMA table_info("):
+                sql = "SELECT 'id' AS name"
+            return super().execute(sql, *params)
+
+    conn = sqlite3.connect(":memory:", factory=StaleSnapshotConn)
+    conn.row_factory = sqlite3.Row
+    try:
+        conn.execute(
+            """
+            CREATE TABLE tasks (
+                id INTEGER PRIMARY KEY, title TEXT NOT NULL,
+                tenant TEXT, result TEXT, idempotency_key TEXT,
+                consecutive_failures INTEGER NOT NULL DEFAULT 0,
+                worker_pid INTEGER, last_failure_error TEXT,
+                max_runtime_seconds INTEGER, last_heartbeat_at INTEGER,
+                current_run_id INTEGER, workflow_template_id TEXT,
+                current_step_key TEXT, skills TEXT, max_retries INTEGER
+            )
+            """
+        )
+        conn.execute(
+            """
+            CREATE TABLE task_events (
+                id INTEGER PRIMARY KEY AUTOINCREMENT,
+                task_id TEXT NOT NULL DEFAULT '', run_id INTEGER,
+                kind TEXT NOT NULL DEFAULT '', payload TEXT,
+                created_at INTEGER NOT NULL DEFAULT 0
+            )
+            """
+        )
+
+        # Every ALTER now raises 'duplicate column name'; none may escape.
+        kb._migrate_add_optional_columns(conn)
+    finally:
+        conn.close()