diff --git a/tests/tools/test_file_operations_edge_cases.py b/tests/tools/test_file_operations_edge_cases.py index a53450a814..bad72f4b6d 100644 --- a/tests/tools/test_file_operations_edge_cases.py +++ b/tests/tools/test_file_operations_edge_cases.py @@ -82,7 +82,11 @@ class TestIsLikelyBinary: class TestCheckLintBracePaths: - """Verify _check_lint handles file paths with curly braces safely.""" + """Verify _check_lint handles file paths with curly braces safely. + + Uses ``.js`` to exercise the shell-linter path since ``.py`` now goes + through the in-process ast.parse linter (see TestCheckLintInproc). + """ @pytest.fixture() def ops(self): @@ -95,12 +99,12 @@ class TestCheckLintBracePaths: with patch.object(ops, "_has_command", return_value=True), \ patch.object(ops, "_exec") as mock_exec: mock_exec.return_value = MagicMock(exit_code=0, stdout="") - result = ops._check_lint("/tmp/test_file.py") + result = ops._check_lint("/tmp/test_file.js") assert result.success is True # Verify the command was built correctly cmd_arg = mock_exec.call_args[0][0] - assert "'/tmp/test_file.py'" in cmd_arg + assert "'/tmp/test_file.js'" in cmd_arg def test_path_with_curly_braces(self, ops): """Path containing ``{`` and ``}`` must not raise KeyError/ValueError.""" @@ -108,7 +112,7 @@ class TestCheckLintBracePaths: patch.object(ops, "_exec") as mock_exec: mock_exec.return_value = MagicMock(exit_code=0, stdout="") # This would raise KeyError with .format() but works with .replace() - result = ops._check_lint("/tmp/{test}_file.py") + result = ops._check_lint("/tmp/{test}_file.js") assert result.success is True cmd_arg = mock_exec.call_args[0][0] @@ -119,7 +123,7 @@ class TestCheckLintBracePaths: with patch.object(ops, "_has_command", return_value=True), \ patch.object(ops, "_exec") as mock_exec: mock_exec.return_value = MagicMock(exit_code=0, stdout="") - result = ops._check_lint("/tmp/{{var}}.py") + result = ops._check_lint("/tmp/{{var}}.js") assert result.success is True @@ -131,7 
+135,7 @@ class TestCheckLintBracePaths: def test_missing_linter_skipped(self, ops): """When the linter binary is not installed, skip gracefully.""" with patch.object(ops, "_has_command", return_value=False): - result = ops._check_lint("/tmp/test.py") + result = ops._check_lint("/tmp/test.js") assert result.skipped is True def test_lint_failure_returns_output(self, ops): @@ -142,12 +146,122 @@ class TestCheckLintBracePaths: exit_code=1, stdout="SyntaxError: invalid syntax", ) - result = ops._check_lint("/tmp/bad.py") + result = ops._check_lint("/tmp/bad.js") assert result.success is False assert "SyntaxError" in result.output +class TestCheckLintInproc: + """Verify in-process linters (.py via ast.parse, .json, .yaml, .toml). + + These bypass the shell linter table entirely and parse content + directly in Python — no subprocess, no toolchain dependency. + """ + + @pytest.fixture() + def ops(self): + obj = ShellFileOperations.__new__(ShellFileOperations) + obj._command_cache = {} + return obj + + def test_python_inproc_clean(self, ops): + """Valid Python content passes in-process ast.parse.""" + result = ops._check_lint("/tmp/ok.py", content="x = 1\n") + assert result.success is True + assert not result.skipped + assert result.output == "" + + def test_python_inproc_syntax_error(self, ops): + """Invalid Python content fails with SyntaxError + line info.""" + result = ops._check_lint("/tmp/bad.py", content="def foo(:\n pass\n") + assert result.success is False + assert "SyntaxError" in result.output + assert "line" in result.output.lower() + + def test_python_inproc_content_explicit(self, ops): + """When content is passed explicitly, the file is not re-read.""" + with patch.object(ops, "_exec") as mock_exec: + result = ops._check_lint("/tmp/explicit.py", content="y = 2\n") + # _exec must not have been called — content was supplied + mock_exec.assert_not_called() + assert result.success is True + + def test_json_inproc_clean(self, ops): + result = 
ops._check_lint("/tmp/a.json", content='{"a": 1}') + assert result.success is True + + def test_json_inproc_error(self, ops): + result = ops._check_lint("/tmp/b.json", content='{"a": 1') + assert result.success is False + assert "JSONDecodeError" in result.output + + def test_yaml_inproc_clean(self, ops): + result = ops._check_lint("/tmp/a.yaml", content="a: 1\nb: 2\n") + assert result.success is True + + def test_yaml_inproc_error(self, ops): + result = ops._check_lint("/tmp/b.yaml", content='key: "unclosed\n') + assert result.success is False + assert "YAMLError" in result.output + + def test_toml_inproc_clean(self, ops): + result = ops._check_lint("/tmp/a.toml", content='[section]\nk = "v"\n') + assert result.success is True + + def test_toml_inproc_error(self, ops): + result = ops._check_lint("/tmp/b.toml", content='[section\nk = "v"') + assert result.success is False + assert "TOMLDecodeError" in result.output + + +class TestCheckLintDelta: + """Verify _check_lint_delta() filters pre-existing errors from post-edit output.""" + + @pytest.fixture() + def ops(self): + obj = ShellFileOperations.__new__(ShellFileOperations) + obj._command_cache = {} + return obj + + def test_clean_post_no_pre_lint(self, ops): + """Hot path: post-write is clean, pre-lint should be skipped entirely.""" + with patch.object(ops, "_check_lint", wraps=ops._check_lint) as wrapped: + r = ops._check_lint_delta("/tmp/a.py", pre_content="x = 0\n", post_content="x = 1\n") + # Post-lint called exactly once (clean), pre-lint never called. 
+ assert wrapped.call_count == 1 + assert r.success is True + + def test_new_file_reports_all_errors(self, ops): + """No pre-content means no delta refinement — all post errors surface.""" + r = ops._check_lint_delta("/tmp/new.py", pre_content=None, post_content="def x(:\n") + assert r.success is False + assert "SyntaxError" in r.output + + def test_broken_file_becomes_good(self, ops): + """Post-clean short-circuits without any delta refinement.""" + r = ops._check_lint_delta("/tmp/fix.py", pre_content="def x(:\n", post_content="def x():\n pass\n") + assert r.success is True + + def test_introduces_new_error_filters_pre(self, ops): + """Delta filter drops pre-existing errors, surfaces only new ones.""" + pre = 'def a(:\n pass\n' # line 1 broken + post = 'def a():\n pass\n\ndef b(:\n pass\n' # line 1 fixed, line 4 broken + r = ops._check_lint_delta("/tmp/d.py", pre_content=pre, post_content=post) + assert r.success is False + assert "New lint errors" in r.output or "line 4" in r.output + + def test_pre_existing_remains_flagged_but_not_new(self, ops): + """Single-error parsers (ast) may miss that post is OK — be cautious.""" + # Pre has line-1 error, post keeps it (and doesn't add anything new) + pre = 'def a(:\n pass\n' + post = 'def a(:\n pass\n\nprint(42)\n' # still line 1 broken + r = ops._check_lint_delta("/tmp/d.py", pre_content=pre, post_content=post) + # File is still broken — don't lie and claim success — but flag it as pre-existing + assert r.success is False + assert "pre-existing" in (r.message or "").lower() + + # ========================================================================= # Pagination bounds # ========================================================================= diff --git a/tools/file_operations.py b/tools/file_operations.py index 6c6dd91c69..3f7343bb25 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -119,9 +119,10 @@ class WriteResult: """Result from writing a file.""" bytes_written: int = 0 dirs_created: bool 
= False + lint: Optional[Dict[str, Any]] = None error: Optional[str] = None warning: Optional[str] = None - + def to_dict(self) -> dict: return {k: v for k, v in self.__dict__.items() if v is not None} @@ -202,10 +203,10 @@ class LintResult: def to_dict(self) -> dict: if self.skipped: return {"status": "skipped", "message": self.message} - return { - "status": "ok" if self.success else "error", - "output": self.output - } + result = {"status": "ok" if self.success else "error", "output": self.output} + if self.message: + result["message"] = self.message + return result @dataclass @@ -303,7 +304,9 @@ class FileOperations(ABC): # Image extensions (subset of binary that we can return as base64) IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'} -# Linters by file extension +# Shell-based linters by file extension. Invoked via _exec() with the +# filesystem path. Cover languages where a compile/type check needs an +# external toolchain (py_compile, node, tsc, go vet, rustfmt). LINTERS = { '.py': 'python -m py_compile {file} 2>&1', '.js': 'node --check {file} 2>&1', @@ -312,6 +315,86 @@ LINTERS = { '.rs': 'rustfmt --check {file} 2>&1', } + +def _lint_json_inproc(content: str) -> tuple[bool, str]: + """In-process JSON syntax check. Returns (ok, error_message).""" + import json as _json + try: + _json.loads(content) + return True, "" + except _json.JSONDecodeError as e: + return False, f"JSONDecodeError: {e.msg} (line {e.lineno}, column {e.colno})" + except Exception as e: # noqa: BLE001 — any parse failure is a lint failure + return False, f"{type(e).__name__}: {e}" + + +def _lint_yaml_inproc(content: str) -> tuple[bool, str]: + """In-process YAML syntax check. Returns (ok, error_message). + + Skipped gracefully if PyYAML isn't installed — YAML parsing is optional. + """ + try: + import yaml as _yaml + except ImportError: + # PyYAML not available — skip silently, caller treats as no linter. 
+ return True, "__SKIP__" + try: + _yaml.safe_load(content) + return True, "" + except _yaml.YAMLError as e: + return False, f"YAMLError: {e}" + except Exception as e: # noqa: BLE001 + return False, f"{type(e).__name__}: {e}" + + +def _lint_toml_inproc(content: str) -> tuple[bool, str]: + """In-process TOML syntax check (stdlib tomllib, Python 3.11+).""" + try: + import tomllib as _toml + except ImportError: + # Pre-3.11 fallback via tomli, if installed. + try: + import tomli as _toml # type: ignore[no-redef] + except ImportError: + return True, "__SKIP__" + try: + _toml.loads(content) + return True, "" + except Exception as e: # tomllib raises TOMLDecodeError, a ValueError subclass + return False, f"{type(e).__name__}: {e}" + + +def _lint_python_inproc(content: str) -> tuple[bool, str]: + """In-process Python syntax check via ast.parse. + + Catches SyntaxError, IndentationError, and everything else the + ast module rejects — matching py_compile's scope but with no + subprocess overhead and no dependency on a ``python`` in PATH. + """ + import ast as _ast + try: + _ast.parse(content) + return True, "" + except SyntaxError as e: + loc = f" (line {e.lineno}, column {e.offset})" if e.lineno else "" + return False, f"{type(e).__name__}: {e.msg}{loc}" + except Exception as e: # noqa: BLE001 + return False, f"{type(e).__name__}: {e}" + + +# In-process linters by file extension. Preferred over shell linters when +# present — no subprocess overhead, microseconds per call. Each callable +# takes file content (str) and returns (ok: bool, error: str). An error +# string of ``"__SKIP__"`` signals the linter isn't available (missing +# dependency) and should be treated as "no linter". 
+LINTERS_INPROC = { + '.py': _lint_python_inproc, + '.json': _lint_json_inproc, + '.yaml': _lint_yaml_inproc, + '.yml': _lint_yaml_inproc, + '.toml': _lint_toml_inproc, +} + # Max limits for read operations MAX_LINES = 2000 MAX_LINE_LENGTH = 2000 @@ -745,12 +828,19 @@ class ShellFileOperations(FileOperations): files. The content never appears in the shell command string — only the file path does. + After the write, runs a post-first / pre-lazy lint check via + ``_check_lint_delta()``. If the new content is clean, the lint + call is O(one parse). If the new content has errors, the pre-write + content is linted too and only errors newly introduced by this + write are surfaced — pre-existing problems are filtered out so + the agent isn't distracted chasing them. + Args: path: File path to write content: Content to write Returns: - WriteResult with bytes written or error + WriteResult with bytes written, lint summary, or error. """ # Expand ~ and other shell paths path = self._expand_path(path) @@ -759,36 +849,58 @@ class ShellFileOperations(FileOperations): if _is_write_denied(path): return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.") + # Capture pre-write content for lint-delta computation. Only do this + # when an in-process OR shell linter exists for this extension — no + # point paying for the read otherwise. For in-process linters we + # pass the content directly; for shell linters the pre-state isn't + # useful (we'd have to re-write-read to lint the old version, which + # defeats the purpose), so we skip the capture and accept the naive + # "all errors" report. + ext = os.path.splitext(path)[1].lower() + pre_content: Optional[str] = None + if ext in LINTERS_INPROC: + # Best-effort read; failure (file missing, permission) leaves + # pre_content as None which makes the delta step degrade + # gracefully to "report all errors". 
+ read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null" + read_result = self._exec(read_cmd) + if read_result.exit_code == 0 and read_result.stdout: + pre_content = read_result.stdout + # Create parent directories parent = os.path.dirname(path) dirs_created = False - + if parent: mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}" mkdir_result = self._exec(mkdir_cmd) if mkdir_result.exit_code == 0: dirs_created = True - + # Write via stdin pipe — content bypasses shell arg parsing entirely, # so there's no ARG_MAX limit regardless of file size. write_cmd = f"cat > {self._escape_shell_arg(path)}" write_result = self._exec(write_cmd, stdin_data=content) - + if write_result.exit_code != 0: return WriteResult(error=f"Failed to write file: {write_result.stdout}") - + # Get bytes written (wc -c is POSIX, works on Linux + macOS) stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null" stat_result = self._exec(stat_cmd) - + try: bytes_written = int(stat_result.stdout.strip()) except ValueError: bytes_written = len(content.encode('utf-8')) - + + # Post-write lint with delta refinement. + lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content) + return WriteResult( bytes_written=bytes_written, - dirs_created=dirs_created + dirs_created=dirs_created, + lint=lint_result.to_dict() if lint_result else None, ) # ========================================================================= @@ -864,10 +976,12 @@ class ShellFileOperations(FileOperations): # Generate diff diff = self._unified_diff(content, new_content, path) - - # Auto-lint - lint_result = self._check_lint(path) - + + # Auto-lint with delta refinement: only surface errors introduced + # by this patch, filtering out pre-existing lint failures so the + # agent isn't distracted by problems that were already there. 
+ lint_result = self._check_lint_delta(path, pre_content=content, post_content=new_content) + return PatchResult( success=True, diff=diff, @@ -905,37 +1019,143 @@ class ShellFileOperations(FileOperations): result = apply_v4a_operations(operations, self) return result - def _check_lint(self, path: str) -> LintResult: + def _check_lint(self, path: str, content: Optional[str] = None) -> LintResult: """ Run syntax check on a file after editing. - + + Prefers the in-process linter for structured formats (JSON, YAML, + TOML) when possible — those parse via the Python stdlib in + microseconds and don't require a subprocess. Falls back to the + shell linter table for compiled/type-checked languages + (py_compile, node --check, tsc, go vet, rustfmt). + Args: - path: File path to lint - + path: File path (used to select the linter + for shell invocation). + content: Optional file content. If provided AND an in-process + linter matches the extension, we lint the content + directly without re-reading the file from disk. Ignored + for shell linters. + Returns: - LintResult with status and any errors + LintResult with status and any errors. """ ext = os.path.splitext(path)[1].lower() - + + # Prefer in-process linter when available. + inproc = LINTERS_INPROC.get(ext) + if inproc is not None: + # Need content — either passed in or read from disk. + if content is None: + read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null" + read_result = self._exec(read_cmd) + if read_result.exit_code != 0: + return LintResult(skipped=True, message=f"Failed to read {path} for lint") + content = read_result.stdout + ok, err = inproc(content) + if err == "__SKIP__": + return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)") + return LintResult(success=ok, output="" if ok else err) + + # Fall back to shell linter. 
if ext not in LINTERS: return LintResult(skipped=True, message=f"No linter for {ext} files") - - # Check if linter command is available + linter_cmd = LINTERS[ext] # Extract the base command (first word) base_cmd = linter_cmd.split()[0] - + if not self._has_command(base_cmd): return LintResult(skipped=True, message=f"{base_cmd} not available") - + # Run linter cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path)) result = self._exec(cmd, timeout=30) - + return LintResult( success=result.exit_code == 0, output=result.stdout.strip() if result.stdout.strip() else "" ) + + def _check_lint_delta(self, path: str, pre_content: Optional[str], + post_content: Optional[str] = None) -> LintResult: + """ + Run post-write lint with pre-write baseline comparison. + + Strategy (post-first, pre-lazy): + 1. Lint the post-write state. If clean → return clean immediately. + This is the hot path and matches _check_lint() in cost. + 2. If post-lint found errors AND we have pre-write content, lint + that too. If the pre-write file was already broken, return only + the *new* errors introduced by this edit — errors that existed + before aren't the agent's problem to chase right now. + 3. If pre_content is None (new file or unavailable), skip the delta + step and return all post-write errors. + + This mirrors Cline's and OpenCode's post-edit LSP pattern: surface + only the errors this specific edit introduced, so the agent doesn't + get distracted by pre-existing problems. + + Args: + path: File path (for linter selection). + pre_content: File content BEFORE the write. Pass None for new + files or when the pre-state isn't available — the + delta refinement is skipped and all post errors + are returned. + post_content: File content AFTER the write. Optional; if None, + the shell linter reads from disk (same as + _check_lint). + + Returns: + LintResult. ``output`` contains either the full post-lint + errors (no pre-state) or just the new-error lines (delta + refinement applied). 
+ """ + post = self._check_lint(path, content=post_content) + + # Hot path: clean post-write, no pre-lint needed. + if post.success or post.skipped: + return post + + # Post-write has errors. If we have pre-content, run the delta + # refinement to filter out pre-existing errors. + if pre_content is None: + return post + + pre = self._check_lint(path, content=pre_content) + if pre.success or pre.skipped or not pre.output: + # Pre-write was clean (or we couldn't lint it) — post errors + # are all new. Return the full post output. + return post + + # Both pre- and post-write had errors. Compute the set-difference + # on non-empty stripped lines. Caveat: single-error parsers + # (ast.parse, json.loads) stop at the first error and don't report + # later ones — if the pre-existing error blocks parsing before + # reaching the edit region, we can't prove the edit is clean. So + # if every post error also appeared pre-edit, we report the file + # as still broken but annotate that this edit introduced nothing + # new on top — the agent knows it's inherited state, not fresh + # damage, without silently dropping the error. + pre_lines = {ln.strip() for ln in pre.output.splitlines() if ln.strip()} + post_lines = [ln for ln in post.output.splitlines() if ln.strip() and ln.strip() not in pre_lines] + + if not post_lines: + # Every error in post was also in pre — this edit didn't make + # anything obviously worse, but the file remains broken and + # the agent should know. 
+            return LintResult(
+                success=False,
+                output=post.output,
+                message="Pre-existing lint errors — this edit didn't introduce new ones but the file is still broken.",
+            )
+
+        return LintResult(
+            success=False,
+            output=(
+                "New lint errors introduced by this edit "
+                "(pre-existing errors filtered out):\n"
+                + "\n".join(post_lines)
+            )
+        )
 
 
     # =========================================================================
     # SEARCH Implementation
diff --git a/tools/file_tools.py b/tools/file_tools.py
index 106bd295be..200287dcbd 100644
--- a/tools/file_tools.py
+++ b/tools/file_tools.py
@@ -1042,7 +1042,7 @@ READ_FILE_SCHEMA = {
 
 WRITE_FILE_SCHEMA = {
     "name": "write_file",
-    "description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits.",
+    "description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits. Auto-runs syntax checks on .py/.json/.yaml/.toml and other linted languages; only NEW errors introduced by this write are surfaced (pre-existing errors are filtered out).",
     "parameters": {
         "type": "object",
         "properties": {