diff --git a/gateway/run.py b/gateway/run.py index c2be5f57135..5851b16fb98 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5424,7 +5424,13 @@ class GatewayRunner: HEALTH_WINDOW = 6 bad_ticks = 0 last_warn_at = 0 - disabled_corrupt_boards: dict[str, tuple[str, int | None, int | None]] = {} + # Avoid hot-looping corrupt-looking board DBs, but do not suppress + # same-fingerprint retries forever: transient WAL/open races can + # surface as "database disk image is malformed" for one tick. + CORRUPT_BOARD_RETRY_AFTER_SECONDS = 300 + disabled_corrupt_boards: dict[ + str, tuple[tuple[str, int | None, int | None], float] + ] = {} def _board_db_fingerprint(slug: str) -> tuple[str, int | None, int | None]: path = _kb.kanban_db_path(slug) @@ -5439,6 +5445,9 @@ class GatewayRunner: return (resolved, stat.st_mtime_ns, stat.st_size) def _is_corrupt_board_db_error(exc: Exception) -> bool: + corrupt_guard_error = getattr(_kb, "KanbanDbCorruptError", None) + if corrupt_guard_error is not None and isinstance(exc, corrupt_guard_error): + return True if not isinstance(exc, sqlite3.DatabaseError): return False msg = str(exc).lower() @@ -5458,14 +5467,27 @@ class GatewayRunner: """ conn = None fingerprint = _board_db_fingerprint(slug) - disabled_fingerprint = disabled_corrupt_boards.get(slug) - if disabled_fingerprint == fingerprint: - return None - if disabled_fingerprint is not None: - logger.info( - "kanban dispatcher: board %s database changed; retrying dispatch", - slug, - ) + disabled_entry = disabled_corrupt_boards.get(slug) + if disabled_entry is not None: + disabled_fingerprint, disabled_at = disabled_entry + age = time.monotonic() - disabled_at + if ( + disabled_fingerprint == fingerprint + and age < CORRUPT_BOARD_RETRY_AFTER_SECONDS + ): + return None + if disabled_fingerprint == fingerprint: + logger.info( + "kanban dispatcher: board %s database fingerprint unchanged " + "after %.0fs quarantine; retrying dispatch", + slug, + age, + ) + else: + logger.info( + "kanban dispatcher: board %s database changed; retrying dispatch", + slug, + ) disabled_corrupt_boards.pop(slug, None) try: conn = _kb.connect(board=slug) @@ -5485,20 +5507,32 @@ class GatewayRunner: ) except sqlite3.DatabaseError as exc: if _is_corrupt_board_db_error(exc): - disabled_corrupt_boards[slug] = fingerprint + disabled_corrupt_boards[slug] = (fingerprint, time.monotonic()) logger.error( "kanban dispatcher: board %s database %s is not a valid " - "SQLite database; disabling dispatch for this board " - "until the file changes or the gateway restarts. Move " - "or restore the file, then run `hermes kanban init` if " - "you need a fresh board.", + "SQLite database; pausing dispatch for this board until " + "the file changes, the gateway restarts, or the " + "quarantine timer expires. Move or restore the file, " + "then run `hermes kanban init` if you need a fresh board.", slug, fingerprint[0], ) return None logger.exception("kanban dispatcher: tick failed on board %s", slug) return None - except Exception: + except Exception as exc: + if _is_corrupt_board_db_error(exc): + disabled_corrupt_boards[slug] = (fingerprint, time.monotonic()) + logger.error( + "kanban dispatcher: board %s database %s is not a valid " + "SQLite database; pausing dispatch for this board until " + "the file changes, the gateway restarts, or the " + "quarantine timer expires. Move or restore the file, " + "then run `hermes kanban init` if you need a fresh board.", + slug, + fingerprint[0], + ) + return None logger.exception("kanban dispatcher: tick failed on board %s", slug) return None finally: diff --git a/tests/hermes_cli/test_kanban_core_functionality.py b/tests/hermes_cli/test_kanban_core_functionality.py index a97ddbbe15b..5b645d318f9 100644 --- a/tests/hermes_cli/test_kanban_core_functionality.py +++ b/tests/hermes_cli/test_kanban_core_functionality.py @@ -3602,8 +3602,9 @@ def test_gateway_dispatcher_watcher_env_truthy_uses_config(monkeypatch): ) +@pytest.mark.parametrize("corrupt_exc", ["sqlite", "guard"]) def test_gateway_dispatcher_disables_corrupt_board_without_traceback( - monkeypatch, tmp_path, caplog + monkeypatch, tmp_path, caplog, corrupt_exc ): """Corrupt board DBs log one actionable error and stop retrying per tick.""" import asyncio @@ -3645,6 +3646,12 @@ def test_gateway_dispatcher_disables_corrupt_board_without_traceback( def _connect(*args, **kwargs): calls["connect"] += 1 + if corrupt_exc == "guard": + raise _kb.KanbanDbCorruptError( + corrupt_db, + corrupt_db.with_suffix(".db.corrupt.test.bak"), + "sqlite refused to open file: database disk image is malformed", + ) raise sqlite3.DatabaseError("file is not a database") async def _to_thread(fn, *args, **kwargs): @@ -3682,6 +3689,93 @@ def test_gateway_dispatcher_disables_corrupt_board_without_traceback( assert calls["connect"] == 5 +def test_gateway_dispatcher_retries_corrupt_board_after_quarantine( + monkeypatch, tmp_path, caplog +): + """A corrupt-looking board is retried after the quarantine TTL expires.""" + import asyncio + import inspect + import logging + import sqlite3 + + from gateway.run import GatewayRunner + import hermes_cli.config as _cfg_mod + import hermes_cli.kanban_db as _kb + + runner = object.__new__(GatewayRunner) + runner._running = True + corrupt_db = tmp_path / "kanban.db" + corrupt_db.write_text("not sqlite", encoding="utf-8") + + monkeypatch.setattr( + _cfg_mod, + "load_config", + lambda: { + "kanban": { + "dispatch_in_gateway": True, + "dispatch_interval_seconds": 1, + } + }, + ) + monkeypatch.setattr( + _kb, + "list_boards", + lambda include_archived=False: [{"slug": _kb.DEFAULT_BOARD}], + ) + monkeypatch.setattr( + _kb, + "read_board_metadata", + lambda slug: {"slug": slug}, + ) + monkeypatch.setattr(_kb, "kanban_db_path", lambda board=None: corrupt_db) + + real_monotonic = time.monotonic + time_values = iter([1000.0, 1001.0, 1301.0, 1301.0]) + + def _monotonic_for_gateway_dispatcher(): + caller = inspect.currentframe().f_back # type: ignore[union-attr] + code = caller.f_code if caller is not None else None + filename = code.co_filename if code is not None else "" + if filename.endswith("gateway/run.py"): + return next(time_values, 1301.0) + return real_monotonic() + + monkeypatch.setattr("gateway.run.time.monotonic", _monotonic_for_gateway_dispatcher) + + calls = {"tick": 0} + + def _connect(*args, **kwargs): + raise sqlite3.DatabaseError("file is not a database") + + async def _to_thread(fn, *args, **kwargs): + result = fn(*args, **kwargs) + if getattr(fn, "__name__", "") == "_tick_once": + calls["tick"] += 1 + if calls["tick"] >= 3: + runner._running = False + return result + + async def _sleep(_delay): + return None + + monkeypatch.setattr(_kb, "connect", _connect) + monkeypatch.setattr("gateway.run.asyncio.to_thread", _to_thread) + monkeypatch.setattr("gateway.run.asyncio.sleep", _sleep) + + with caplog.at_level(logging.INFO, logger="gateway.run"): + asyncio.run( + asyncio.wait_for( + runner._kanban_dispatcher_watcher(), + timeout=3.0, + ) + ) + + messages = [record.getMessage() for record in caplog.records] + assert sum("not a valid SQLite database" in msg for msg in messages) == 2 + assert any("database fingerprint unchanged" in msg for msg in messages) + assert calls["tick"] == 3 + + # --------------------------------------------------------------------------- # Hallucination gate (created_cards verify + prose scan) # ---------------------------------------------------------------------------