fix(kanban): retry write_txn on transient SQLITE_BUSY

This commit is contained in:
yoniebans 2026-06-27 23:11:25 +02:00 committed by Teknium
parent 90c1dc0493
commit 204a67f0c8
2 changed files with 66 additions and 3 deletions

View file

@ -75,6 +75,7 @@ import hashlib
import json
import os
import re
import random
import secrets
import shutil
import sqlite3
@ -2270,6 +2271,38 @@ def _check_file_length_invariant(conn: sqlite3.Connection) -> None:
pass # I/O errors during check are non-fatal; let normal ops continue
# SQLite's own busy_timeout uses a near-deterministic backoff, so concurrent
# writers re-collide in lockstep under a stampede. A jittered retry on the
# transaction boundary breaks that convoy. Mirrors state.db's _execute_write:
# a fixed 20-150ms jitter band (a 20ms floor prevents a near-zero retry from
# busy-spinning back into the collision). Only BEGIN IMMEDIATE and COMMIT are
# retried -- both are idempotent re-issues that touch no transaction body, so a
# CAS inside write_txn is never replayed. kanban keeps fewer retries than
# state.db (5 vs 15) because its 120s busy_timeout already absorbs most waits;
# the retry is the backstop for the tail SQLite returns BUSY on immediately.
_BUSY_MAX_RETRIES = 5
_BUSY_RETRY_MIN_S = 0.020 # 20ms
_BUSY_RETRY_MAX_S = 0.150 # 150ms
def _is_busy_error(exc: BaseException) -> bool:
return isinstance(exc, sqlite3.OperationalError) and (
"database is locked" in str(exc).lower()
or "database is busy" in str(exc).lower()
)
def _execute_boundary_with_retry(conn: sqlite3.Connection, sql: str) -> None:
for attempt in range(_BUSY_MAX_RETRIES + 1):
try:
conn.execute(sql)
return
except sqlite3.OperationalError as exc:
if not _is_busy_error(exc) or attempt == _BUSY_MAX_RETRIES:
raise
time.sleep(random.uniform(_BUSY_RETRY_MIN_S, _BUSY_RETRY_MAX_S))
@contextlib.contextmanager
def write_txn(conn: sqlite3.Connection):
"""Context manager for an IMMEDIATE write transaction.
@ -2282,7 +2315,7 @@ def write_txn(conn: sqlite3.Connection):
a SQLite auto-rollback (which leaves no active transaction) does not
shadow the original exception with a spurious rollback error.
"""
conn.execute("BEGIN IMMEDIATE")
_execute_boundary_with_retry(conn, "BEGIN IMMEDIATE")
try:
yield conn
except Exception:
@ -2295,7 +2328,16 @@ def write_txn(conn: sqlite3.Connection):
pass
raise
else:
conn.execute("COMMIT")
try:
_execute_boundary_with_retry(conn, "COMMIT")
except Exception:
# COMMIT exhausted retries with the txn still open; roll back so the
# connection isn't poisoned for the next BEGIN IMMEDIATE.
try:
conn.execute("ROLLBACK")
except sqlite3.OperationalError:
pass
raise
# Post-commit file-length check: header page_count must match actual file pages.
# A discrepancy means a torn-extend — raise now rather than silently corrupt.
_check_file_length_invariant(conn)

View file

@ -54,6 +54,18 @@ def _no_file_check(monkeypatch):
monkeypatch.setattr(kb, "_check_file_length_invariant", lambda conn: None)
def test_retry_sleep_respects_floor(monkeypatch):
# The jitter has a floor so a retry can't busy-spin back into the collision.
slept = []
monkeypatch.setattr(kb.time, "sleep", lambda s: slept.append(s))
conn = _FakeConn({"BEGIN": [_busy(), _busy(), None]})
with kb.write_txn(conn):
pass
assert slept
assert all(s >= kb._BUSY_RETRY_MIN_S for s in slept)
assert all(s <= kb._BUSY_RETRY_MAX_S for s in slept)
def test_transient_busy_at_begin_is_absorbed():
conn = _FakeConn({"BEGIN": [_busy(), None]})
with kb.write_txn(conn):
@ -99,4 +111,13 @@ def test_clean_path_commits_once():
with kb.write_txn(conn):
pass
assert conn.count("BEGIN") == 1
assert conn.count("COMMIT") == 1
def test_persistent_busy_at_commit_rolls_back():
# Exhausted COMMIT leaves the txn open; write_txn must ROLLBACK before
# re-raising so the connection isn't poisoned for the next transaction.
conn = _FakeConn({"COMMIT": [_busy()] * 50})
with pytest.raises(sqlite3.OperationalError, match="database is locked"):
with kb.write_txn(conn):
pass
assert conn.count("ROLLBACK") == 1