fix(kanban): harden corrupt-db backup against CodeQL path-injection findings

Path.resolve() before any I/O and confine backup writes to the resolved
parent directory. Adds explicit parent-equality assertions so static
analyzers see the containment guarantee, and walks WAL/SHM sidecars
through the same resolved-parent path so accidental .. segments are
collapsed before shutil.copy2.

Functionally equivalent to the original PR; preserves the corrupt bytes
to <db>.corrupt.<ts>.bak in the same directory, still raises
KanbanDbCorruptError from connect(). E2E with Stefan's exact hex header
+ malformed pages still passes. 163/163 kanban tests still pass.
This commit is contained in:
teknium1 2026-05-23 02:46:59 -07:00 committed by Teknium
parent 4f835f7e43
commit c4b8f5efee

View file

@ -1031,26 +1031,46 @@ def _backup_corrupt_db(path: Path) -> Optional[Path]:
Returns the backup path of the main DB file, or ``None`` if the copy
itself failed (the caller still raises loudly in that case).
Writes are confined to the original DB's parent directory. The
backup basename is derived purely from ``path.name``, never from
caller-supplied directory segments no traversal is possible.
"""
# Resolve once and pin the parent so subsequent path operations cannot
# escape it. ``Path.resolve()`` collapses any ``..`` segments and
# symlinks, and we only ever write inside ``parent``.
resolved = path.resolve()
parent = resolved.parent
base_name = resolved.name # basename only
stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
candidate = path.with_name(f"{path.name}.corrupt.{stamp}.bak")
candidate = parent / f"{base_name}.corrupt.{stamp}.bak"
# Defensive: candidate must still be inside parent after construction.
# f-string interpolation of ``base_name`` cannot escape ``parent``
# because ``base_name`` is itself a resolved basename, but assert it
# anyway so static analyzers can see the containment guarantee.
if candidate.parent != parent:
return None
counter = 0
while candidate.exists():
counter += 1
candidate = path.with_name(
f"{path.name}.corrupt.{stamp}.{counter}.bak"
)
candidate = parent / f"{base_name}.corrupt.{stamp}.{counter}.bak"
if candidate.parent != parent:
return None
try:
shutil.copy2(path, candidate)
shutil.copy2(resolved, candidate)
except OSError:
return None
for suffix in ("-wal", "-shm"):
sidecar = path.with_name(path.name + suffix)
if sidecar.exists():
try:
shutil.copy2(sidecar, candidate.with_name(candidate.name + suffix))
except OSError:
pass
sidecar = parent / (base_name + suffix)
if sidecar.parent != parent or not sidecar.exists():
continue
try:
sidecar_backup = parent / (candidate.name + suffix)
if sidecar_backup.parent != parent:
continue
shutil.copy2(sidecar, sidecar_backup)
except OSError:
pass
return candidate
@ -1070,17 +1090,31 @@ def _guard_existing_db_is_healthy(path: Path) -> None:
No-op for missing files, zero-byte files (treated as fresh), and
paths already proven healthy this process (cache hit).
Path-trust note: ``path`` arrives via :func:`connect`, which itself
resolves it from an explicit ``db_path`` argument, the
:func:`kanban_db_path` env-var chain, or the kanban-home default
all sources Hermes treats as user-controlled-but-trusted on the
user's own machine. We additionally resolve the path here and
confine all filesystem writes to its parent directory so any
accidental ``..`` segments are collapsed before any I/O happens.
"""
# Resolve before any I/O. ``Path.resolve()`` normalizes ``..`` and
# symlinks, giving us a canonical path whose parent dir we can pin.
try:
if not path.exists() or path.stat().st_size == 0:
resolved = path.resolve()
except OSError:
return
try:
if not resolved.exists() or resolved.stat().st_size == 0:
return
except OSError:
return
if str(path.resolve()) in _INITIALIZED_PATHS:
if str(resolved) in _INITIALIZED_PATHS:
return
reason: Optional[str] = None
try:
probe = sqlite3.connect(str(path), timeout=5, isolation_level=None)
probe = sqlite3.connect(str(resolved), timeout=5, isolation_level=None)
try:
row = probe.execute("PRAGMA integrity_check").fetchone()
finally:
@ -1094,8 +1128,8 @@ def _guard_existing_db_is_healthy(path: Path) -> None:
reason = f"sqlite refused to open file: {exc}"
if reason is None:
return
backup = _backup_corrupt_db(path)
raise KanbanDbCorruptError(path, backup, reason)
backup = _backup_corrupt_db(resolved)
raise KanbanDbCorruptError(resolved, backup, reason)
def connect(