diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index c89e697c98d..9ad61f87817 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1181,8 +1181,16 @@ def connect( # See hermes_state._WAL_INCOMPAT_MARKERS for detection logic. from hermes_state import apply_wal_with_fallback apply_wal_with_fallback(conn, db_label=f"kanban.db ({path.name})") - conn.execute("PRAGMA synchronous=NORMAL") + # FULL (was NORMAL): fsync before each checkpoint to narrow the + # crash window that can leave a b-tree page header torn. + conn.execute("PRAGMA synchronous=FULL") conn.execute("PRAGMA foreign_keys=ON") + # Zero freed pages so a later torn write cannot expose stale + # cell content; persisted in the DB header for new DBs. + conn.execute("PRAGMA secure_delete=ON") + # Surface corrupt cells as read errors instead of silent + # wrong-data returns. + conn.execute("PRAGMA cell_size_check=ON") needs_init = resolved not in _INITIALIZED_PATHS if needs_init: # Idempotent: runs CREATE TABLE IF NOT EXISTS + the additive diff --git a/tests/hermes_cli/test_kanban_db.py b/tests/hermes_cli/test_kanban_db.py index 883cf8f4d5d..c90fa4582b7 100644 --- a/tests/hermes_cli/test_kanban_db.py +++ b/tests/hermes_cli/test_kanban_db.py @@ -3339,3 +3339,67 @@ def test_maybe_emit_scratch_tip_skips_non_scratch_workspaces(kanban_home, caplog ).fetchall() assert "tip_scratch_workspace" not in [e["kind"] for e in events] + +# --------------------------------------------------------------------------- +# Connection pragmas (secure_delete, cell_size_check, synchronous=FULL) +# --------------------------------------------------------------------------- + + +def test_connect_sets_secure_delete_on(tmp_path): + """secure_delete=ON must be active on every new connection.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path=db_path) as conn: + row = conn.execute("PRAGMA secure_delete").fetchone() + assert row[0] == 1, f"expected secure_delete=1, got {row[0]}" + + +def test_connect_sets_cell_size_check_on(tmp_path): + """cell_size_check=ON must be active on every new connection.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path=db_path) as conn: + row = conn.execute("PRAGMA cell_size_check").fetchone() + assert row[0] == 1, f"expected cell_size_check=1, got {row[0]}" + + +def test_connect_sets_synchronous_full(tmp_path): + """synchronous must be FULL (=2), not NORMAL (=1).""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path=db_path) as conn: + row = conn.execute("PRAGMA synchronous").fetchone() + assert row[0] == 2, f"expected synchronous=2 (FULL), got {row[0]}" + + +def test_connect_pragmas_applied_on_reconnect(tmp_path): + """All three pragmas must be re-applied on every connect(), not just the first.""" + db_path = tmp_path / "kanban.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + # First connection: write a task and close. + with kb.connect(db_path=db_path) as conn: + kb.create_task(conn, title="reconnect-check") + # Force re-init path by discarding path cache. + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + # Second connection: pragmas must still be applied. + with kb.connect(db_path=db_path) as conn: + assert conn.execute("PRAGMA secure_delete").fetchone()[0] == 1 + assert conn.execute("PRAGMA cell_size_check").fetchone()[0] == 1 + assert conn.execute("PRAGMA synchronous").fetchone()[0] == 2 + + + +def test_pragmas_not_accidentally_disabled_by_migrate_path(tmp_path): + """Migration path must not reset connection pragmas.""" + db_path = tmp_path / "legacy.db" + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + # Initialise with a fresh connect so schema + init run. + with kb.connect(db_path=db_path) as conn: + kb.create_task(conn, title="pre-migration-task") + # Simulate a re-entry through the init/migration path by discarding path cache. + kb._INITIALIZED_PATHS.discard(str(db_path.resolve())) + with kb.connect(db_path=db_path) as conn: + assert conn.execute("PRAGMA secure_delete").fetchone()[0] == 1 + assert conn.execute("PRAGMA cell_size_check").fetchone()[0] == 1 + assert conn.execute("PRAGMA synchronous").fetchone()[0] == 2 +