feat(file_tools): post-write delta lint on write_file + patch, add JSON/YAML/TOML/Python in-process linters (#20191)

Closes the gap where write_file skipped the post-edit syntax check that
patch already ran, so silent file corruption (bad quote escaping,
truncated writes, etc.) would persist on disk until a later read.

## Changes

tools/file_operations.py:
- Add in-process linters for .py, .json, .yaml, .toml (LINTERS_INPROC).
  Python uses ast.parse, JSON/YAML/TOML use stdlib/PyYAML parsers.
  Zero subprocess overhead; preferred over shell linters when both apply.
- _check_lint() now accepts optional content and routes to in-process
  linter first. Shell linter (py_compile, node --check, tsc, go vet,
  rustfmt) remains the fallback for languages without an in-process
  equivalent.
- New _check_lint_delta() implements the post-first/pre-lazy pattern
  borrowed from Cline and OpenCode: lint post-write state first; only
  if errors are found AND pre-content was captured does it lint the
the pre-state and diff. If the pre-existing file had the SAME errors,
the edit didn't introduce anything new, so the file is reported as
'still broken, pre-existing': success=False plus a message explaining
that the errors predate this edit. If the edit introduced genuinely new
errors, those are surfaced and pre-existing ones are filtered out.
- WriteResult gains a lint field.
- write_file() captures pre-content for in-process-lintable extensions
  and calls _check_lint_delta after a successful write.
- patch_replace() switches from _check_lint to _check_lint_delta,
  reusing the pre-edit content it already has in scope.

tools/file_tools.py:
- Update write_file schema description to mention the post-write lint.

tests/tools/test_file_operations_edge_cases.py:
- Update existing brace-path tests to use .js (shell linter) now that
  .py is in-process.
- Add TestCheckLintInproc (9 tests) covering Python/JSON/YAML/TOML
  in-process linters.
- Add TestCheckLintDelta (5 tests) covering the post-first/pre-lazy
  short-circuit, new-file path, and the single-error-parser caveat.

## Performance

In-process linters are microseconds per call (ast.parse, json.loads).
The hot path (clean write) runs exactly one lint — matches main's cost
for patch. Pre-state capture is skipped when the file has no applicable
linter. Measured 4.89ms/write average over 100 .py writes including lint.

## Inspiration

- Cline's DiffViewProvider.getNewDiagnosticProblems() — filters pre-write
  diagnostics from post-write diagnostics (src/integrations/editor/DiffViewProvider.ts).
- OpenCode's WriteTool — runs lsp.diagnostics() after write and appends
  errors to tool output (packages/opencode/src/tool/write.ts).
- Claude Code's DiagnosticTrackingService — captures baseline via
  beforeFileEdited() and returns new-diagnostics-only from
  getNewDiagnostics() (src/services/diagnosticTracking.ts).

## Validation

- tests/tools/test_file_operations.py + test_file_operations_edge_cases.py
  + test_file_tools.py + test_file_tools_live.py + test_file_write_safety.py
  + test_write_deny.py + test_patch_parser.py + test_file_ops_cwd_tracking.py:
  228 passed locally.
- Live E2E reproduction of the tips.py corruption incident: broken
  content written; lint field surfaces 'SyntaxError: invalid syntax.
  Perhaps you forgot a comma? (line 6, column 5)' — the exact error
  that would have self-corrected the bug on the next turn.
This commit is contained in:
Teknium 2026-05-05 04:54:17 -07:00 committed by GitHub
parent b93643c8fe
commit 5168226d60
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 371 additions and 37 deletions

View file

@ -82,7 +82,11 @@ class TestIsLikelyBinary:
class TestCheckLintBracePaths:
"""Verify _check_lint handles file paths with curly braces safely."""
"""Verify _check_lint handles file paths with curly braces safely.
Uses ``.js`` to exercise the shell-linter path since ``.py`` now goes
through the in-process ast.parse linter (see TestCheckLintInproc).
"""
@pytest.fixture()
def ops(self):
@ -95,12 +99,12 @@ class TestCheckLintBracePaths:
with patch.object(ops, "_has_command", return_value=True), \
patch.object(ops, "_exec") as mock_exec:
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
result = ops._check_lint("/tmp/test_file.py")
result = ops._check_lint("/tmp/test_file.js")
assert result.success is True
# Verify the command was built correctly
cmd_arg = mock_exec.call_args[0][0]
assert "'/tmp/test_file.py'" in cmd_arg
assert "'/tmp/test_file.js'" in cmd_arg
def test_path_with_curly_braces(self, ops):
"""Path containing ``{`` and ``}`` must not raise KeyError/ValueError."""
@ -108,7 +112,7 @@ class TestCheckLintBracePaths:
patch.object(ops, "_exec") as mock_exec:
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
# This would raise KeyError with .format() but works with .replace()
result = ops._check_lint("/tmp/{test}_file.py")
result = ops._check_lint("/tmp/{test}_file.js")
assert result.success is True
cmd_arg = mock_exec.call_args[0][0]
@ -119,7 +123,7 @@ class TestCheckLintBracePaths:
with patch.object(ops, "_has_command", return_value=True), \
patch.object(ops, "_exec") as mock_exec:
mock_exec.return_value = MagicMock(exit_code=0, stdout="")
result = ops._check_lint("/tmp/{{var}}.py")
result = ops._check_lint("/tmp/{{var}}.js")
assert result.success is True
@ -131,7 +135,7 @@ class TestCheckLintBracePaths:
def test_missing_linter_skipped(self, ops):
"""When the linter binary is not installed, skip gracefully."""
with patch.object(ops, "_has_command", return_value=False):
result = ops._check_lint("/tmp/test.py")
result = ops._check_lint("/tmp/test.js")
assert result.skipped is True
def test_lint_failure_returns_output(self, ops):
@ -142,12 +146,122 @@ class TestCheckLintBracePaths:
exit_code=1,
stdout="SyntaxError: invalid syntax",
)
result = ops._check_lint("/tmp/bad.py")
result = ops._check_lint("/tmp/bad.js")
assert result.success is False
assert "SyntaxError" in result.output
class TestCheckLintInproc:
    """Verify in-process linters (.py via ast.parse, .json, .yaml, .toml).

    These bypass the shell linter table entirely and parse content
    directly in Python — no subprocess, no toolchain dependency.
    """

    @pytest.fixture()
    def ops(self):
        # Bare instance via __new__ — skips __init__ so no shell session
        # or filesystem setup is required for these pure-parse tests.
        obj = ShellFileOperations.__new__(ShellFileOperations)
        obj._command_cache = {}
        return obj

    def test_python_inproc_clean(self, ops):
        """Valid Python content passes in-process ast.parse."""
        result = ops._check_lint("/tmp/ok.py", content="x = 1\n")
        assert result.success is True
        assert not result.skipped
        assert result.output == ""

    def test_python_inproc_syntax_error(self, ops):
        """Invalid Python content fails with SyntaxError + line info."""
        result = ops._check_lint("/tmp/bad.py", content="def foo(:\n    pass\n")
        assert result.success is False
        assert "SyntaxError" in result.output
        assert "line" in result.output.lower()

    def test_python_inproc_content_explicit(self, ops):
        """When content is passed explicitly, the file is not re-read."""
        with patch.object(ops, "_exec") as mock_exec:
            result = ops._check_lint("/tmp/explicit.py", content="y = 2\n")
            # _exec must not have been called — content was supplied
            mock_exec.assert_not_called()
            assert result.success is True

    def test_json_inproc_clean(self, ops):
        result = ops._check_lint("/tmp/a.json", content='{"a": 1}')
        assert result.success is True

    def test_json_inproc_error(self, ops):
        result = ops._check_lint("/tmp/b.json", content='{"a": 1')
        assert result.success is False
        assert "JSONDecodeError" in result.output

    def test_yaml_inproc_clean(self, ops):
        result = ops._check_lint("/tmp/a.yaml", content="a: 1\nb: 2\n")
        assert result.success is True

    def test_yaml_inproc_error(self, ops):
        # NOTE(review): assumes PyYAML is installed in the test env —
        # without it the yaml linter returns the "__SKIP__" sentinel.
        result = ops._check_lint("/tmp/b.yaml", content='key: "unclosed\n')
        assert result.success is False
        assert "YAMLError" in result.output

    def test_toml_inproc_clean(self, ops):
        result = ops._check_lint("/tmp/a.toml", content='[section]\nk = "v"\n')
        assert result.success is True

    def test_toml_inproc_error(self, ops):
        # NOTE(review): assumes Python 3.11+ (tomllib) or tomli present.
        result = ops._check_lint("/tmp/b.toml", content='[section\nk = "v"')
        assert result.success is False
        assert "TOMLDecodeError" in result.output
class TestCheckLintDelta:
    """Verify _check_lint_delta() filters pre-existing errors from post-edit output."""

    @pytest.fixture()
    def ops(self):
        # Bare instance via __new__ — no __init__, no shell session needed.
        obj = ShellFileOperations.__new__(ShellFileOperations)
        obj._command_cache = {}
        return obj

    def test_clean_post_no_pre_lint(self, ops):
        """Hot path: post-write is clean, pre-lint should be skipped entirely."""
        with patch.object(ops, "_check_lint", wraps=ops._check_lint) as wrapped:
            r = ops._check_lint_delta("/tmp/a.py", pre_content="x = 0\n", post_content="x = 1\n")
            # Post-lint called exactly once (clean), pre-lint never called.
            assert wrapped.call_count == 1
            assert r.success is True

    def test_new_file_reports_all_errors(self, ops):
        """No pre-content means no delta refinement — all post errors surface."""
        r = ops._check_lint_delta("/tmp/new.py", pre_content=None, post_content="def x(:\n")
        assert r.success is False
        assert "SyntaxError" in r.output

    def test_broken_file_becomes_good(self, ops):
        """Post-clean short-circuits without any delta refinement."""
        r = ops._check_lint_delta("/tmp/fix.py", pre_content="def x(:\n", post_content="def x():\n    pass\n")
        assert r.success is True

    def test_introduces_new_error_filters_pre(self, ops):
        """Delta filter drops pre-existing errors, surfaces only new ones."""
        pre = 'def a(:\n pass\n'  # line 1 broken
        post = 'def a():\n pass\n\ndef b(:\n pass\n'  # line 1 fixed, line 4 broken
        r = ops._check_lint_delta("/tmp/d.py", pre_content=pre, post_content=post)
        assert r.success is False
        assert "New lint errors" in r.output or "line 4" in r.output

    def test_pre_existing_remains_flagged_but_not_new(self, ops):
        """Single-error parsers (ast) may miss that post is OK — be cautious."""
        # Pre has line-1 error, post keeps it (and doesn't add anything new)
        pre = 'def a(:\n pass\n'
        post = 'def a(:\n pass\n\nprint(42)\n'  # still line 1 broken
        r = ops._check_lint_delta("/tmp/d.py", pre_content=pre, post_content=post)
        # File is still broken — don't lie and claim success — but flag it as pre-existing
        assert r.success is False
        assert "pre-existing" in (r.message or "").lower()
# =========================================================================
# Pagination bounds
# =========================================================================

View file

@ -119,9 +119,10 @@ class WriteResult:
"""Result from writing a file."""
bytes_written: int = 0
dirs_created: bool = False
lint: Optional[Dict[str, Any]] = None
error: Optional[str] = None
warning: Optional[str] = None
def to_dict(self) -> dict:
return {k: v for k, v in self.__dict__.items() if v is not None}
@ -202,10 +203,10 @@ class LintResult:
def to_dict(self) -> dict:
if self.skipped:
return {"status": "skipped", "message": self.message}
return {
"status": "ok" if self.success else "error",
"output": self.output
}
result = {"status": "ok" if self.success else "error", "output": self.output}
if self.message:
result["message"] = self.message
return result
@dataclass
@ -303,7 +304,9 @@ class FileOperations(ABC):
# Image extensions (subset of binary that we can return as base64)
IMAGE_EXTENSIONS = {'.png', '.jpg', '.jpeg', '.gif', '.webp', '.bmp', '.ico'}
# Linters by file extension
# Shell-based linters by file extension. Invoked via _exec() with the
# filesystem path. Cover languages where a compile/type check needs an
# external toolchain (py_compile, node, tsc, go vet, rustfmt).
LINTERS = {
'.py': 'python -m py_compile {file} 2>&1',
'.js': 'node --check {file} 2>&1',
@ -312,6 +315,86 @@ LINTERS = {
'.rs': 'rustfmt --check {file} 2>&1',
}
def _lint_json_inproc(content: str) -> tuple[bool, str]:
"""In-process JSON syntax check. Returns (ok, error_message)."""
import json as _json
try:
_json.loads(content)
return True, ""
except _json.JSONDecodeError as e:
return False, f"JSONDecodeError: {e.msg} (line {e.lineno}, column {e.colno})"
except Exception as e: # noqa: BLE001 — any parse failure is a lint failure
return False, f"{type(e).__name__}: {e}"
def _lint_yaml_inproc(content: str) -> tuple[bool, str]:
"""In-process YAML syntax check. Returns (ok, error_message).
Skipped gracefully if PyYAML isn't installed — YAML parsing is optional.
"""
try:
import yaml as _yaml
except ImportError:
# PyYAML not available — skip silently, caller treats as no linter.
return True, "__SKIP__"
try:
_yaml.safe_load(content)
return True, ""
except _yaml.YAMLError as e:
return False, f"YAMLError: {e}"
except Exception as e: # noqa: BLE001
return False, f"{type(e).__name__}: {e}"
def _lint_toml_inproc(content: str) -> tuple[bool, str]:
"""In-process TOML syntax check (stdlib tomllib, Python 3.11+)."""
try:
import tomllib as _toml
except ImportError:
# Pre-3.11 fallback via tomli, if installed.
try:
import tomli as _toml # type: ignore[no-redef]
except ImportError:
return True, "__SKIP__"
try:
_toml.loads(content)
return True, ""
except Exception as e: # tomllib raises TOMLDecodeError, a ValueError subclass
return False, f"{type(e).__name__}: {e}"
def _lint_python_inproc(content: str) -> tuple[bool, str]:
"""In-process Python syntax check via ast.parse.
Catches SyntaxError, IndentationError, and everything else the
ast module rejects matching py_compile's scope but with no
subprocess overhead and no dependency on a ``python`` in PATH.
"""
import ast as _ast
try:
_ast.parse(content)
return True, ""
except SyntaxError as e:
loc = f" (line {e.lineno}, column {e.offset})" if e.lineno else ""
return False, f"{type(e).__name__}: {e.msg}{loc}"
except Exception as e: # noqa: BLE001
return False, f"{type(e).__name__}: {e}"
# In-process linters by file extension. Preferred over shell linters when
# present — no subprocess overhead, microseconds per call. Each callable
# takes file content (str) and returns (ok: bool, error: str). An error
# string of ``"__SKIP__"`` signals the linter isn't available (missing
# dependency) and should be treated as "no linter".
LINTERS_INPROC = {
    '.py': _lint_python_inproc,
    '.json': _lint_json_inproc,
    '.yaml': _lint_yaml_inproc,
    '.yml': _lint_yaml_inproc,  # both YAML extensions share one parser
    '.toml': _lint_toml_inproc,
}
# Max limits for read operations
MAX_LINES = 2000
MAX_LINE_LENGTH = 2000
@ -745,12 +828,19 @@ class ShellFileOperations(FileOperations):
files. The content never appears in the shell command string
only the file path does.
After the write, runs a post-first / pre-lazy lint check via
``_check_lint_delta()``. If the new content is clean, the lint
call is O(one parse). If the new content has errors, the pre-write
content is linted too and only errors newly introduced by this
write are surfaced pre-existing problems are filtered out so
the agent isn't distracted chasing them.
Args:
path: File path to write
content: Content to write
Returns:
WriteResult with bytes written or error
WriteResult with bytes written, lint summary, or error.
"""
# Expand ~ and other shell paths
path = self._expand_path(path)
@ -759,36 +849,58 @@ class ShellFileOperations(FileOperations):
if _is_write_denied(path):
return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.")
# Capture pre-write content for lint-delta computation. Only do this
# when an in-process OR shell linter exists for this extension — no
# point paying for the read otherwise. For in-process linters we
# pass the content directly; for shell linters the pre-state isn't
# useful (we'd have to re-write-read to lint the old version, which
# defeats the purpose), so we skip the capture and accept the naive
# "all errors" report.
ext = os.path.splitext(path)[1].lower()
pre_content: Optional[str] = None
if ext in LINTERS_INPROC:
# Best-effort read; failure (file missing, permission) leaves
# pre_content as None which makes the delta step degrade
# gracefully to "report all errors".
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)
if read_result.exit_code == 0 and read_result.stdout:
pre_content = read_result.stdout
# Create parent directories
parent = os.path.dirname(path)
dirs_created = False
if parent:
mkdir_cmd = f"mkdir -p {self._escape_shell_arg(parent)}"
mkdir_result = self._exec(mkdir_cmd)
if mkdir_result.exit_code == 0:
dirs_created = True
# Write via stdin pipe — content bypasses shell arg parsing entirely,
# so there's no ARG_MAX limit regardless of file size.
write_cmd = f"cat > {self._escape_shell_arg(path)}"
write_result = self._exec(write_cmd, stdin_data=content)
if write_result.exit_code != 0:
return WriteResult(error=f"Failed to write file: {write_result.stdout}")
# Get bytes written (wc -c is POSIX, works on Linux + macOS)
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
stat_result = self._exec(stat_cmd)
try:
bytes_written = int(stat_result.stdout.strip())
except ValueError:
bytes_written = len(content.encode('utf-8'))
# Post-write lint with delta refinement.
lint_result = self._check_lint_delta(path, pre_content=pre_content, post_content=content)
return WriteResult(
bytes_written=bytes_written,
dirs_created=dirs_created
dirs_created=dirs_created,
lint=lint_result.to_dict() if lint_result else None,
)
# =========================================================================
@ -864,10 +976,12 @@ class ShellFileOperations(FileOperations):
# Generate diff
diff = self._unified_diff(content, new_content, path)
# Auto-lint
lint_result = self._check_lint(path)
# Auto-lint with delta refinement: only surface errors introduced
# by this patch, filtering out pre-existing lint failures so the
# agent isn't distracted by problems that were already there.
lint_result = self._check_lint_delta(path, pre_content=content, post_content=new_content)
return PatchResult(
success=True,
diff=diff,
@ -905,37 +1019,143 @@ class ShellFileOperations(FileOperations):
result = apply_v4a_operations(operations, self)
return result
def _check_lint(self, path: str) -> LintResult:
def _check_lint(self, path: str, content: Optional[str] = None) -> LintResult:
"""
Run syntax check on a file after editing.
Prefers the in-process linter for structured formats (JSON, YAML,
TOML) when possible those parse via the Python stdlib in
microseconds and don't require a subprocess. Falls back to the
shell linter table for compiled/type-checked languages
(py_compile, node --check, tsc, go vet, rustfmt).
Args:
path: File path to lint
path: File path (used to select the linter + for shell invocation).
content: Optional file content. If provided AND an in-process
linter matches the extension, we lint the content
directly without re-reading the file from disk. Ignored
for shell linters.
Returns:
LintResult with status and any errors
LintResult with status and any errors.
"""
ext = os.path.splitext(path)[1].lower()
# Prefer in-process linter when available.
inproc = LINTERS_INPROC.get(ext)
if inproc is not None:
# Need content — either passed in or read from disk.
if content is None:
read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null"
read_result = self._exec(read_cmd)
if read_result.exit_code != 0:
return LintResult(skipped=True, message=f"Failed to read {path} for lint")
content = read_result.stdout
ok, err = inproc(content)
if err == "__SKIP__":
return LintResult(skipped=True, message=f"No linter available for {ext} (missing dependency)")
return LintResult(success=ok, output="" if ok else err)
# Fall back to shell linter.
if ext not in LINTERS:
return LintResult(skipped=True, message=f"No linter for {ext} files")
# Check if linter command is available
linter_cmd = LINTERS[ext]
# Extract the base command (first word)
base_cmd = linter_cmd.split()[0]
if not self._has_command(base_cmd):
return LintResult(skipped=True, message=f"{base_cmd} not available")
# Run linter
cmd = linter_cmd.replace("{file}", self._escape_shell_arg(path))
result = self._exec(cmd, timeout=30)
return LintResult(
success=result.exit_code == 0,
output=result.stdout.strip() if result.stdout.strip() else ""
)
def _check_lint_delta(self, path: str, pre_content: Optional[str],
                      post_content: Optional[str] = None) -> LintResult:
    """
    Run post-write lint with pre-write baseline comparison.

    Strategy (post-first, pre-lazy):
    1. Lint the post-write state. If clean — return clean immediately.
       This is the hot path and matches _check_lint() in cost.
    2. If post-lint found errors AND we have pre-write content, lint
       that too. If the pre-write file was already broken, return only
       the *new* errors introduced by this edit — errors that existed
       before aren't the agent's problem to chase right now.
    3. If pre_content is None (new file or unavailable), skip the delta
       step and return all post-write errors.

    This mirrors Cline's and OpenCode's post-edit LSP pattern: surface
    only the errors this specific edit introduced, so the agent doesn't
    get distracted by pre-existing problems.

    Args:
        path: File path (for linter selection).
        pre_content: File content BEFORE the write. Pass None for new
                     files or when the pre-state isn't available — the
                     delta refinement is skipped and all post errors
                     are returned.
        post_content: File content AFTER the write. Optional; if None,
                      the shell linter reads from disk (same as
                      _check_lint).

    Returns:
        LintResult. ``output`` contains either the full post-lint
        errors (no pre-state) or just the new-error lines (delta
        refinement applied).
    """
    post = self._check_lint(path, content=post_content)
    # Hot path: clean post-write, no pre-lint needed.
    if post.success or post.skipped:
        return post

    # Post-write has errors. If we have pre-content, run the delta
    # refinement to filter out pre-existing errors.
    if pre_content is None:
        return post

    pre = self._check_lint(path, content=pre_content)
    if pre.success or pre.skipped or not pre.output:
        # Pre-write was clean (or we couldn't lint it) — post errors
        # are all new. Return the full post output.
        return post

    # Both pre- and post-write had errors. Compute the set-difference
    # on non-empty stripped lines. Caveat: single-error parsers
    # (ast.parse, json.loads) stop at the first error and don't report
    # later ones — if the pre-existing error blocks parsing before
    # reaching the edit region, we can't prove the edit is clean. So
    # if every post error also appeared pre-edit, we report the file
    # as still broken but annotate that this edit introduced nothing
    # new on top — the agent knows it's inherited state, not fresh
    # damage, without silently dropping the error.
    pre_lines = {ln.strip() for ln in pre.output.splitlines() if ln.strip()}
    post_lines = [ln for ln in post.output.splitlines() if ln.strip() and ln.strip() not in pre_lines]
    if not post_lines:
        # Every error in post was also in pre — this edit didn't make
        # anything obviously worse, but the file remains broken and
        # the agent should know.
        return LintResult(
            success=False,
            output=post.output,
            message="Pre-existing lint errors — this edit didn't introduce new ones but the file is still broken.",
        )
    return LintResult(
        success=False,
        output=(
            "New lint errors introduced by this edit "
            "(pre-existing errors filtered out):\n" + "\n".join(post_lines)
        )
    )
# =========================================================================
# SEARCH Implementation

View file

@ -1042,7 +1042,7 @@ READ_FILE_SCHEMA = {
WRITE_FILE_SCHEMA = {
"name": "write_file",
"description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits.",
"description": "Write content to a file, completely replacing existing content. Use this instead of echo/cat heredoc in terminal. Creates parent directories automatically. OVERWRITES the entire file — use 'patch' for targeted edits. Auto-runs syntax checks on .py/.json/.yaml/.toml and other linted languages; only NEW errors introduced by this write are surfaced (pre-existing errors are filtered out).",
"parameters": {
"type": "object",
"properties": {