From c4b8f5efee8cdd3186e465d940bf0e8c98849346 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 23 May 2026 02:46:59 -0700 Subject: [PATCH] fix(kanban): harden corrupt-db backup against CodeQL path-injection findings Path.resolve() before any I/O and confine backup writes to the resolved parent directory. Adds explicit parent-equality assertions so static analyzers see the containment guarantee, and walks WAL/SHM sidecars through the same resolved-parent path so accidental .. segments are collapsed before shutil.copy2. Functionally equivalent to the original PR; preserves the corrupt bytes to .corrupt..bak in the same directory, still raises KanbanDbCorruptError from connect(). E2E with Stefan's exact hex header + malformed pages still passes. 163/163 kanban tests still pass. --- hermes_cli/kanban_db.py | 66 +++++++++++++++++++++++++++++++---------- 1 file changed, 50 insertions(+), 16 deletions(-) diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index c2447bdcb21..33de8945ff5 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1031,26 +1031,46 @@ def _backup_corrupt_db(path: Path) -> Optional[Path]: Returns the backup path of the main DB file, or ``None`` if the copy itself failed (the caller still raises loudly in that case). + + Writes are confined to the original DB's parent directory. The + backup basename is derived purely from ``path.name``, never from + caller-supplied directory segments — no traversal is possible. """ + # Resolve once and pin the parent so subsequent path operations cannot + # escape it. ``Path.resolve()`` collapses any ``..`` segments and + # symlinks, and we only ever write inside ``parent``. + resolved = path.resolve() + parent = resolved.parent + base_name = resolved.name # basename only stamp = datetime.now().strftime("%Y%m%d_%H%M%S") - candidate = path.with_name(f"{path.name}.corrupt.{stamp}.bak") + candidate = parent / f"{base_name}.corrupt.{stamp}.bak" + # Defensive: candidate must still be inside parent after construction. + # f-string interpolation of ``base_name`` cannot escape ``parent`` + # because ``base_name`` is itself a resolved basename, but assert it + # anyway so static analyzers can see the containment guarantee. + if candidate.parent != parent: + return None counter = 0 while candidate.exists(): counter += 1 - candidate = path.with_name( - f"{path.name}.corrupt.{stamp}.{counter}.bak" - ) + candidate = parent / f"{base_name}.corrupt.{stamp}.{counter}.bak" + if candidate.parent != parent: + return None try: - shutil.copy2(path, candidate) + shutil.copy2(resolved, candidate) except OSError: return None for suffix in ("-wal", "-shm"): - sidecar = path.with_name(path.name + suffix) - if sidecar.exists(): - try: - shutil.copy2(sidecar, candidate.with_name(candidate.name + suffix)) - except OSError: - pass + sidecar = parent / (base_name + suffix) + if sidecar.parent != parent or not sidecar.exists(): + continue + try: + sidecar_backup = parent / (candidate.name + suffix) + if sidecar_backup.parent != parent: + continue + shutil.copy2(sidecar, sidecar_backup) + except OSError: + pass return candidate @@ -1070,17 +1090,31 @@ def _guard_existing_db_is_healthy(path: Path) -> None: No-op for missing files, zero-byte files (treated as fresh), and paths already proven healthy this process (cache hit). + + Path-trust note: ``path`` arrives via :func:`connect`, which itself + resolves it from an explicit ``db_path`` argument, the + :func:`kanban_db_path` env-var chain, or the kanban-home default — + all sources Hermes treats as user-controlled-but-trusted on the + user's own machine. We additionally resolve the path here and + confine all filesystem writes to its parent directory so any + accidental ``..`` segments are collapsed before any I/O happens. """ + # Resolve before any I/O. ``Path.resolve()`` normalizes ``..`` and + # symlinks, giving us a canonical path whose parent dir we can pin. try: - if not path.exists() or path.stat().st_size == 0: + resolved = path.resolve() + except OSError: + return + try: + if not resolved.exists() or resolved.stat().st_size == 0: return except OSError: return - if str(path.resolve()) in _INITIALIZED_PATHS: + if str(resolved) in _INITIALIZED_PATHS: return reason: Optional[str] = None try: - probe = sqlite3.connect(str(path), timeout=5, isolation_level=None) + probe = sqlite3.connect(str(resolved), timeout=5, isolation_level=None) try: row = probe.execute("PRAGMA integrity_check").fetchone() finally: @@ -1094,8 +1128,8 @@ def _guard_existing_db_is_healthy(path: Path) -> None: reason = f"sqlite refused to open file: {exc}" if reason is None: return - backup = _backup_corrupt_db(path) - raise KanbanDbCorruptError(path, backup, reason) + backup = _backup_corrupt_db(resolved) + raise KanbanDbCorruptError(resolved, backup, reason) def connect(