diff --git a/hermes_cli/kanban_db.py b/hermes_cli/kanban_db.py index c2447bdcb21..33de8945ff5 100644 --- a/hermes_cli/kanban_db.py +++ b/hermes_cli/kanban_db.py @@ -1031,26 +1031,46 @@ def _backup_corrupt_db(path: Path) -> Optional[Path]: Returns the backup path of the main DB file, or ``None`` if the copy itself failed (the caller still raises loudly in that case). + + Writes are confined to the original DB's parent directory. The + backup basename is derived purely from ``path.name``, never from + caller-supplied directory segments — no traversal is possible. """ + # Resolve once and pin the parent so subsequent path operations cannot + # escape it. ``Path.resolve()`` collapses any ``..`` segments and + # symlinks, and we only ever write inside ``parent``. + resolved = path.resolve() + parent = resolved.parent + base_name = resolved.name # basename only stamp = datetime.now().strftime("%Y%m%d_%H%M%S") - candidate = path.with_name(f"{path.name}.corrupt.{stamp}.bak") + candidate = parent / f"{base_name}.corrupt.{stamp}.bak" + # Defensive: candidate must still be inside parent after construction. + # f-string interpolation of ``base_name`` cannot escape ``parent`` + # because ``base_name`` is itself a resolved basename, but assert it + # anyway so static analyzers can see the containment guarantee. + if candidate.parent != parent: + return None counter = 0 while candidate.exists(): counter += 1 - candidate = path.with_name( - f"{path.name}.corrupt.{stamp}.{counter}.bak" - ) + candidate = parent / f"{base_name}.corrupt.{stamp}.{counter}.bak" + if candidate.parent != parent: + return None try: - shutil.copy2(path, candidate) + shutil.copy2(resolved, candidate) except OSError: return None for suffix in ("-wal", "-shm"): - sidecar = path.with_name(path.name + suffix) - if sidecar.exists(): - try: - shutil.copy2(sidecar, candidate.with_name(candidate.name + suffix)) - except OSError: - pass + sidecar = parent / (base_name + suffix) + if sidecar.parent != parent or not sidecar.exists(): + continue + try: + sidecar_backup = parent / (candidate.name + suffix) + if sidecar_backup.parent != parent: + continue + shutil.copy2(sidecar, sidecar_backup) + except OSError: + pass return candidate @@ -1070,17 +1090,31 @@ def _guard_existing_db_is_healthy(path: Path) -> None: No-op for missing files, zero-byte files (treated as fresh), and paths already proven healthy this process (cache hit). + + Path-trust note: ``path`` arrives via :func:`connect`, which itself + resolves it from an explicit ``db_path`` argument, the + :func:`kanban_db_path` env-var chain, or the kanban-home default — + all sources Hermes treats as user-controlled-but-trusted on the + user's own machine. We additionally resolve the path here and + confine all filesystem writes to its parent directory so any + accidental ``..`` segments are collapsed before any I/O happens. """ + # Resolve before any I/O. ``Path.resolve()`` normalizes ``..`` and + # symlinks, giving us a canonical path whose parent dir we can pin. try: - if not path.exists() or path.stat().st_size == 0: + resolved = path.resolve() + except OSError: + return + try: + if not resolved.exists() or resolved.stat().st_size == 0: return except OSError: return - if str(path.resolve()) in _INITIALIZED_PATHS: + if str(resolved) in _INITIALIZED_PATHS: return reason: Optional[str] = None try: - probe = sqlite3.connect(str(path), timeout=5, isolation_level=None) + probe = sqlite3.connect(str(resolved), timeout=5, isolation_level=None) try: row = probe.execute("PRAGMA integrity_check").fetchone() finally: @@ -1094,8 +1128,8 @@ def _guard_existing_db_is_healthy(path: Path) -> None: reason = f"sqlite refused to open file: {exc}" if reason is None: return - backup = _backup_corrupt_db(path) - raise KanbanDbCorruptError(path, backup, reason) + backup = _backup_corrupt_db(resolved) + raise KanbanDbCorruptError(resolved, backup, reason) def connect(