fix(kanban): retry corrupt-board dispatch after quarantine

This commit is contained in:
Donovan Yohan 2026-05-27 13:52:18 +00:00 committed by Teknium
parent fc47b7285c
commit c94ad89818
2 changed files with 144 additions and 16 deletions

View file

@ -5424,7 +5424,13 @@ class GatewayRunner:
HEALTH_WINDOW = 6
bad_ticks = 0
last_warn_at = 0
disabled_corrupt_boards: dict[str, tuple[str, int | None, int | None]] = {}
# Avoid hot-looping corrupt-looking board DBs, but do not suppress
# same-fingerprint retries forever: transient WAL/open races can
# surface as "database disk image is malformed" for one tick.
CORRUPT_BOARD_RETRY_AFTER_SECONDS = 300
disabled_corrupt_boards: dict[
str, tuple[tuple[str, int | None, int | None], float]
] = {}
def _board_db_fingerprint(slug: str) -> tuple[str, int | None, int | None]:
path = _kb.kanban_db_path(slug)
@ -5439,6 +5445,9 @@ class GatewayRunner:
return (resolved, stat.st_mtime_ns, stat.st_size)
def _is_corrupt_board_db_error(exc: Exception) -> bool:
corrupt_guard_error = getattr(_kb, "KanbanDbCorruptError", None)
if corrupt_guard_error is not None and isinstance(exc, corrupt_guard_error):
return True
if not isinstance(exc, sqlite3.DatabaseError):
return False
msg = str(exc).lower()
@ -5458,14 +5467,27 @@ class GatewayRunner:
"""
conn = None
fingerprint = _board_db_fingerprint(slug)
disabled_fingerprint = disabled_corrupt_boards.get(slug)
if disabled_fingerprint == fingerprint:
return None
if disabled_fingerprint is not None:
logger.info(
"kanban dispatcher: board %s database changed; retrying dispatch",
slug,
)
disabled_entry = disabled_corrupt_boards.get(slug)
if disabled_entry is not None:
disabled_fingerprint, disabled_at = disabled_entry
age = time.monotonic() - disabled_at
if (
disabled_fingerprint == fingerprint
and age < CORRUPT_BOARD_RETRY_AFTER_SECONDS
):
return None
if disabled_fingerprint == fingerprint:
logger.info(
"kanban dispatcher: board %s database fingerprint unchanged "
"after %.0fs quarantine; retrying dispatch",
slug,
age,
)
else:
logger.info(
"kanban dispatcher: board %s database changed; retrying dispatch",
slug,
)
disabled_corrupt_boards.pop(slug, None)
try:
conn = _kb.connect(board=slug)
@ -5485,20 +5507,32 @@ class GatewayRunner:
)
except sqlite3.DatabaseError as exc:
if _is_corrupt_board_db_error(exc):
disabled_corrupt_boards[slug] = fingerprint
disabled_corrupt_boards[slug] = (fingerprint, time.monotonic())
logger.error(
"kanban dispatcher: board %s database %s is not a valid "
"SQLite database; disabling dispatch for this board "
"until the file changes or the gateway restarts. Move "
"or restore the file, then run `hermes kanban init` if "
"you need a fresh board.",
"SQLite database; pausing dispatch for this board until "
"the file changes, the gateway restarts, or the "
"quarantine timer expires. Move or restore the file, "
"then run `hermes kanban init` if you need a fresh board.",
slug,
fingerprint[0],
)
return None
logger.exception("kanban dispatcher: tick failed on board %s", slug)
return None
except Exception:
except Exception as exc:
if _is_corrupt_board_db_error(exc):
disabled_corrupt_boards[slug] = (fingerprint, time.monotonic())
logger.error(
"kanban dispatcher: board %s database %s is not a valid "
"SQLite database; pausing dispatch for this board until "
"the file changes, the gateway restarts, or the "
"quarantine timer expires. Move or restore the file, "
"then run `hermes kanban init` if you need a fresh board.",
slug,
fingerprint[0],
)
return None
logger.exception("kanban dispatcher: tick failed on board %s", slug)
return None
finally:

View file

@ -3602,8 +3602,9 @@ def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch):
)
@pytest.mark.parametrize("corrupt_exc", ["sqlite", "guard"])
def test_gateway_dispatcher_disables_corrupt_board_without_traceback(
monkeypatch, tmp_path, caplog
monkeypatch, tmp_path, caplog, corrupt_exc
):
"""Corrupt board DBs log one actionable error and stop retrying per tick."""
import asyncio
@ -3645,6 +3646,12 @@ def test_gateway_dispatcher_disables_corrupt_board_without_traceback(
def _connect(*args, **kwargs):
calls["connect"] += 1
if corrupt_exc == "guard":
raise _kb.KanbanDbCorruptError(
corrupt_db,
corrupt_db.with_suffix(".db.corrupt.test.bak"),
"sqlite refused to open file: database disk image is malformed",
)
raise sqlite3.DatabaseError("file is not a database")
async def _to_thread(fn, *args, **kwargs):
@ -3682,6 +3689,93 @@ def test_gateway_dispatcher_disables_corrupt_board_without_traceback(
assert calls["connect"] == 5
def test_gateway_dispatcher_retries_corrupt_board_after_quarantine(
    monkeypatch, tmp_path, caplog
):
    """A corrupt-looking board is retried after the quarantine TTL expires.

    The board file never changes during the test, so the only thing that can
    trigger a second connection attempt is the passage of (fake) monotonic
    time.  The test drives three dispatcher ticks and expects:

    * two "not a valid SQLite database" errors (first attempt + retry), and
    * one "database fingerprint unchanged" info line announcing the retry.
    """
    import asyncio
    import inspect
    import logging
    import sqlite3
    # Imported locally like the other stdlib modules so this test does not
    # rely on a module-level ``import time`` being present.
    import time

    from gateway.run import GatewayRunner
    import hermes_cli.config as _cfg_mod
    import hermes_cli.kanban_db as _kb

    # Bypass ``__init__``; only the attributes the watcher reads are set.
    runner = object.__new__(GatewayRunner)
    runner._running = True

    # A file that exists but is not a SQLite database.
    corrupt_db = tmp_path / "kanban.db"
    corrupt_db.write_text("not sqlite", encoding="utf-8")

    monkeypatch.setattr(
        _cfg_mod,
        "load_config",
        lambda: {
            "kanban": {
                "dispatch_in_gateway": True,
                "dispatch_interval_seconds": 1,
            }
        },
    )
    monkeypatch.setattr(
        _kb,
        "list_boards",
        lambda include_archived=False: [{"slug": _kb.DEFAULT_BOARD}],
    )
    monkeypatch.setattr(
        _kb,
        "read_board_metadata",
        lambda slug: {"slug": slug},
    )
    monkeypatch.setattr(_kb, "kanban_db_path", lambda board=None: corrupt_db)

    # Capture the real clock BEFORE patching: ``gateway.run.time`` is the
    # shared ``time`` module, so the patch below is process-wide and every
    # non-dispatcher caller (asyncio, pytest, ...) must keep real time.
    real_monotonic = time.monotonic
    # Scripted clock readings handed to the dispatcher, in call order.
    # NOTE(review): intended as "quarantine stamped at 1000, re-checked 1s
    # later (still quarantined), re-checked 301s later (expired), then
    # re-stamped" -- this depends on how often gateway/run.py reads the clock.
    time_values = iter([1000.0, 1001.0, 1301.0, 1301.0])

    def _monotonic_for_gateway_dispatcher():
        """Return scripted time to gateway/run.py callers, real time otherwise."""
        caller = inspect.currentframe().f_back  # type: ignore[union-attr]
        code = caller.f_code if caller is not None else None
        filename = code.co_filename if code is not None else ""
        # Normalise separators so the suffix check also matches on Windows,
        # where ``co_filename`` uses backslashes.
        if filename.replace("\\", "/").endswith("gateway/run.py"):
            # Once the script is exhausted, stay pinned at the final reading.
            return next(time_values, 1301.0)
        return real_monotonic()

    monkeypatch.setattr("gateway.run.time.monotonic", _monotonic_for_gateway_dispatcher)

    calls = {"tick": 0}

    def _connect(*args, **kwargs):
        # Every connection attempt looks like a corrupt database.
        raise sqlite3.DatabaseError("file is not a database")

    async def _to_thread(fn, *args, **kwargs):
        # Run the work inline and count dispatcher ticks; stop the watcher
        # loop after the third tick so the test terminates.
        result = fn(*args, **kwargs)
        if getattr(fn, "__name__", "") == "_tick_once":
            calls["tick"] += 1
            if calls["tick"] >= 3:
                runner._running = False
        return result

    async def _sleep(_delay):
        # Skip real waiting between ticks.
        return None

    monkeypatch.setattr(_kb, "connect", _connect)
    monkeypatch.setattr("gateway.run.asyncio.to_thread", _to_thread)
    monkeypatch.setattr("gateway.run.asyncio.sleep", _sleep)

    with caplog.at_level(logging.INFO, logger="gateway.run"):
        asyncio.run(
            asyncio.wait_for(
                runner._kanban_dispatcher_watcher(),
                timeout=3.0,
            )
        )

    messages = [record.getMessage() for record in caplog.records]
    # One error for the initial failure, one for the post-quarantine retry.
    assert sum("not a valid SQLite database" in msg for msg in messages) == 2
    assert any("database fingerprint unchanged" in msg for msg in messages)
    assert calls["tick"] == 3
# ---------------------------------------------------------------------------
# Hallucination gate (created_cards verify + prose scan)
# ---------------------------------------------------------------------------