fix(kanban): make _migrate_add_optional_columns idempotent on concurrent open

ALTER TABLE calls inside _migrate_add_optional_columns were guarded by a
snapshot of PRAGMA table_info taken at function entry.  When the gateway
dispatcher opens the kanban DB twice per tick (once in _tick_once_for_board
and once via init_db's discard-and-reconnect path), a second connection can
run the same migration before the first one commits, causing:

  sqlite3.OperationalError: duplicate column name: consecutive_failures

This crashed the dispatcher on every first tick after a gateway restart
(subsequent ticks succeeded because the columns were then present).

Fix: introduce _add_column_if_missing() which wraps ALTER TABLE in a
try/except that swallows OperationalError whose message contains
'duplicate column name'.  All ALTER TABLE calls in
_migrate_add_optional_columns are routed through this helper.

Closes #21708
This commit is contained in:
Wesley Simplicio 2026-05-09 12:12:24 -03:00 committed by Teknium
parent 68854cdcdb
commit 78698381af
2 changed files with 136 additions and 18 deletions

View file

@ -963,6 +963,25 @@ def init_db(
return path
def _add_column_if_missing(
conn: sqlite3.Connection, table: str, column: str, ddl: str
) -> bool:
"""Run ``ALTER TABLE <table> ADD COLUMN <ddl>``, idempotent across races.
Returns ``True`` when the column was actually added by this call.
Swallows ``duplicate column name`` errors so a concurrent connection
that ran the same migration first does not crash the dispatcher tick
(issue #21708).
"""
try:
conn.execute(f"ALTER TABLE {table} ADD COLUMN {ddl}")
return True
except sqlite3.OperationalError as exc:
if "duplicate column name" in str(exc).lower():
return False
raise
def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"""Add columns that were introduced after v1 release to legacy DBs.
@ -970,11 +989,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
"""
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
if "tenant" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN tenant TEXT")
_add_column_if_missing(conn, "tasks", "tenant", "tenant TEXT")
if "result" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN result TEXT")
_add_column_if_missing(conn, "tasks", "result", "result TEXT")
if "idempotency_key" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN idempotency_key TEXT")
_add_column_if_missing(
conn, "tasks", "idempotency_key", "idempotency_key TEXT"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_tasks_idempotency "
"ON tasks(idempotency_key)"
@ -997,37 +1018,51 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
# the *original* snapshot; this is intentional and safe as long as
# no step depends on a column added by a previous step in the same call.
if "consecutive_failures" not in cols:
conn.execute(
"ALTER TABLE tasks ADD COLUMN consecutive_failures "
"INTEGER NOT NULL DEFAULT 0"
added = _add_column_if_missing(
conn,
"tasks",
"consecutive_failures",
"consecutive_failures INTEGER NOT NULL DEFAULT 0",
)
if "spawn_failures" in cols:
if added and "spawn_failures" in cols:
conn.execute(
"UPDATE tasks SET consecutive_failures = COALESCE(spawn_failures, 0)"
)
if "worker_pid" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN worker_pid INTEGER")
_add_column_if_missing(conn, "tasks", "worker_pid", "worker_pid INTEGER")
if "last_failure_error" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN last_failure_error TEXT")
if "last_spawn_error" in cols:
added = _add_column_if_missing(
conn, "tasks", "last_failure_error", "last_failure_error TEXT"
)
if added and "last_spawn_error" in cols:
conn.execute(
"UPDATE tasks SET last_failure_error = last_spawn_error"
)
if "max_runtime_seconds" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN max_runtime_seconds INTEGER")
_add_column_if_missing(
conn, "tasks", "max_runtime_seconds", "max_runtime_seconds INTEGER"
)
if "last_heartbeat_at" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN last_heartbeat_at INTEGER")
_add_column_if_missing(
conn, "tasks", "last_heartbeat_at", "last_heartbeat_at INTEGER"
)
if "current_run_id" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN current_run_id INTEGER")
_add_column_if_missing(
conn, "tasks", "current_run_id", "current_run_id INTEGER"
)
if "workflow_template_id" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN workflow_template_id TEXT")
_add_column_if_missing(
conn, "tasks", "workflow_template_id", "workflow_template_id TEXT"
)
if "current_step_key" not in cols:
conn.execute("ALTER TABLE tasks ADD COLUMN current_step_key TEXT")
_add_column_if_missing(
conn, "tasks", "current_step_key", "current_step_key TEXT"
)
if "skills" not in cols:
# JSON array of skill names the dispatcher force-loads into the
# worker (additive to the built-in `kanban-worker`). NULL is fine
# for existing rows.
conn.execute("ALTER TABLE tasks ADD COLUMN skills TEXT")
_add_column_if_missing(conn, "tasks", "skills", "skills TEXT")
if "max_retries" not in cols:
# Per-task override for the consecutive-failure circuit breaker.
@ -1035,13 +1070,13 @@ def _migrate_add_optional_columns(conn: sqlite3.Connection) -> None:
# config, then ``DEFAULT_FAILURE_LIMIT``. Existing rows get NULL,
# which is the correct default (they keep the global behaviour
# they were getting before the column existed).
conn.execute("ALTER TABLE tasks ADD COLUMN max_retries INTEGER")
_add_column_if_missing(conn, "tasks", "max_retries", "max_retries INTEGER")
# task_events gained a run_id column; back-fill it as NULL for
# historical events (they predate runs and can't be attributed).
ev_cols = {row["name"] for row in conn.execute("PRAGMA table_info(task_events)")}
if "run_id" not in ev_cols:
conn.execute("ALTER TABLE task_events ADD COLUMN run_id INTEGER")
_add_column_if_missing(conn, "task_events", "run_id", "run_id INTEGER")
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_events_run "
"ON task_events(run_id, id)"

View file

@ -1116,3 +1116,86 @@ def test_unlink_tasks_triggers_recompute_ready(kanban_home):
"child should promote to ready immediately after unlink_tasks "
"removes its last blocking dependency"
)
# ---------------------------------------------------------------------------
# _add_column_if_missing / _migrate_add_optional_columns idempotency (#21708)
# ---------------------------------------------------------------------------
def test_add_column_if_missing_is_idempotent_on_race(kanban_home):
"""``_add_column_if_missing`` must swallow 'duplicate column name' errors.
Regression for #21708: the kanban dispatcher opens the DB twice per tick
(once via _tick_once_for_board, once via init_db's discard-and-reconnect
path). A second concurrent connection runs _migrate_add_optional_columns
before the first one commits, so ALTER TABLE raises OperationalError with
'duplicate column name: consecutive_failures'. Without the idempotency
guard that crashes the dispatcher on the first tick after every restart.
"""
import sqlite3
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
"CREATE TABLE tasks (id INTEGER PRIMARY KEY, title TEXT NOT NULL)"
)
# First call adds the column — returns True.
added = kb._add_column_if_missing(conn, "tasks", "extra_col", "extra_col TEXT")
assert added is True
cols = {row["name"] for row in conn.execute("PRAGMA table_info(tasks)")}
assert "extra_col" in cols
# Second call on same connection — column already exists — must return
# False without raising, simulating the race the dispatcher hits.
added_again = kb._add_column_if_missing(
conn, "tasks", "extra_col", "extra_col TEXT"
)
assert added_again is False
conn.close()
def test_migrate_add_optional_columns_tolerates_concurrent_migration(kanban_home):
"""Full _migrate_add_optional_columns must not raise when columns already
exist (issue #21708 race window — two connections migrate concurrently)."""
import sqlite3
# Schema already in fully-migrated state (all optional columns present).
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
conn.execute(
"""
CREATE TABLE tasks (
id INTEGER PRIMARY KEY,
title TEXT NOT NULL,
tenant TEXT,
result TEXT,
idempotency_key TEXT,
consecutive_failures INTEGER NOT NULL DEFAULT 0,
worker_pid INTEGER,
last_failure_error TEXT,
max_runtime_seconds INTEGER,
last_heartbeat_at INTEGER,
current_run_id INTEGER,
workflow_template_id TEXT,
current_step_key TEXT,
skills TEXT,
max_retries INTEGER
)
"""
)
conn.execute(
"""
CREATE TABLE task_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL DEFAULT '',
run_id INTEGER,
kind TEXT NOT NULL DEFAULT '',
payload TEXT,
created_at INTEGER NOT NULL DEFAULT 0
)
"""
)
# Running migration on an already-migrated schema must not raise.
kb._migrate_add_optional_columns(conn)
conn.close()