diff --git a/tests/tools/test_checkpoint_manager.py b/tests/tools/test_checkpoint_manager.py index ef843465f..ae03dc31b 100644 --- a/tests/tools/test_checkpoint_manager.py +++ b/tests/tools/test_checkpoint_manager.py @@ -411,3 +411,68 @@ class TestErrorResilience: # Should not raise result = mgr.ensure_checkpoint(str(work_dir), "test") assert result is False + + +# ========================================================================= +# Security / Input validation +# ========================================================================= + +class TestSecurity: + def test_restore_rejects_argument_injection(self, mgr, work_dir): + mgr.ensure_checkpoint(str(work_dir), "initial") + # Try to pass a git flag as a commit hash + result = mgr.restore(str(work_dir), "--patch") + assert result["success"] is False + assert "Invalid commit hash" in result["error"] + assert "must not start with '-'" in result["error"] + + result = mgr.restore(str(work_dir), "-p") + assert result["success"] is False + assert "Invalid commit hash" in result["error"] + + def test_restore_rejects_invalid_hex_chars(self, mgr, work_dir): + mgr.ensure_checkpoint(str(work_dir), "initial") + # Git hashes should not contain characters like ;, &, | + result = mgr.restore(str(work_dir), "abc; rm -rf /") + assert result["success"] is False + assert "expected 4-64 hex characters" in result["error"] + + result = mgr.diff(str(work_dir), "abc&def") + assert result["success"] is False + assert "expected 4-64 hex characters" in result["error"] + + def test_restore_rejects_path_traversal(self, mgr, work_dir): + mgr.ensure_checkpoint(str(work_dir), "initial") + # Real commit hash but malicious path + checkpoints = mgr.list_checkpoints(str(work_dir)) + target_hash = checkpoints[0]["hash"] + + # Absolute path outside + result = mgr.restore(str(work_dir), target_hash, file_path="/etc/passwd") + assert result["success"] is False + assert "got absolute path" in result["error"] + + # Relative traversal outside path + result = mgr.restore(str(work_dir), target_hash, file_path="../outside_file.txt") + assert result["success"] is False + assert "escapes the working directory" in result["error"] + + def test_restore_accepts_valid_file_path(self, mgr, work_dir): + mgr.ensure_checkpoint(str(work_dir), "initial") + checkpoints = mgr.list_checkpoints(str(work_dir)) + target_hash = checkpoints[0]["hash"] + + # Valid path inside directory + result = mgr.restore(str(work_dir), target_hash, file_path="main.py") + assert result["success"] is True + + # Another valid path with subdirectories + (work_dir / "subdir").mkdir() + (work_dir / "subdir" / "test.txt").write_text("hello") + mgr.new_turn() + mgr.ensure_checkpoint(str(work_dir), "second") + checkpoints = mgr.list_checkpoints(str(work_dir)) + target_hash = checkpoints[0]["hash"] + + result = mgr.restore(str(work_dir), target_hash, file_path="subdir/test.txt") + assert result["success"] is True diff --git a/tools/checkpoint_manager.py b/tools/checkpoint_manager.py index c298aa0bb..3ea6b32fd 100644 --- a/tools/checkpoint_manager.py +++ b/tools/checkpoint_manager.py @@ -21,6 +21,7 @@ into the user's project directory. import hashlib import logging import os +import re import shutil import subprocess from pathlib import Path @@ -64,6 +65,49 @@ _GIT_TIMEOUT: int = max(10, min(60, int(os.getenv("HERMES_CHECKPOINT_TIMEOUT", " # Max files to snapshot — skip huge directories to avoid slowdowns. _MAX_FILES = 50_000 +# Valid git commit hash pattern: 4–40 hex chars (short or full SHA-1/SHA-256). +_COMMIT_HASH_RE = re.compile(r'^[0-9a-fA-F]{4,64}$') + + +# --------------------------------------------------------------------------- +# Input validation helpers +# --------------------------------------------------------------------------- + +def _validate_commit_hash(commit_hash: str) -> Optional[str]: + """Validate a commit hash to prevent git argument injection. + + Returns an error string if invalid, None if valid. + Values starting with '-' would be interpreted as git flags + (e.g., '--patch', '-p') instead of revision specifiers. + """ + if not commit_hash or not commit_hash.strip(): + return "Empty commit hash" + if commit_hash.startswith("-"): + return f"Invalid commit hash (must not start with '-'): {commit_hash!r}" + if not _COMMIT_HASH_RE.match(commit_hash): + return f"Invalid commit hash (expected 4-64 hex characters): {commit_hash!r}" + return None + + +def _validate_file_path(file_path: str, working_dir: str) -> Optional[str]: + """Validate a file path to prevent path traversal outside the working directory. + + Returns an error string if invalid, None if valid. + """ + if not file_path or not file_path.strip(): + return "Empty file path" + # Reject absolute paths — restore targets must be relative to the workdir + if os.path.isabs(file_path): + return f"File path must be relative, got absolute path: {file_path!r}" + # Resolve and check containment within working_dir + abs_workdir = Path(working_dir).resolve() + resolved = (abs_workdir / file_path).resolve() + try: + resolved.relative_to(abs_workdir) + except ValueError: + return f"File path escapes the working directory via traversal: {file_path!r}" + return None + # --------------------------------------------------------------------------- # Shadow repo helpers @@ -311,6 +355,11 @@ class CheckpointManager: Returns dict with success, diff text, and stat summary. """ + # Validate commit_hash to prevent git argument injection + hash_err = _validate_commit_hash(commit_hash) + if hash_err: + return {"success": False, "error": hash_err} + abs_dir = str(Path(working_dir).resolve()) shadow = _shadow_repo_path(abs_dir) @@ -364,7 +413,19 @@ class CheckpointManager: Returns dict with success/error info. """ + # Validate commit_hash to prevent git argument injection + hash_err = _validate_commit_hash(commit_hash) + if hash_err: + return {"success": False, "error": hash_err} + abs_dir = str(Path(working_dir).resolve()) + + # Validate file_path to prevent path traversal outside the working dir + if file_path: + path_err = _validate_file_path(file_path, abs_dir) + if path_err: + return {"success": False, "error": path_err} + shadow = _shadow_repo_path(abs_dir) if not (shadow / "HEAD").exists():