fix(file_tools): refresh staleness timestamp after writes (#4390)

After a successful write_file or patch, update the stored read
timestamp to match the file's new modification time. Without this,
consecutive edits by the same task (read → write → write) would
falsely warn on the second write because the stored timestamp still
reflected the original read, not the first write.

Also renames the internal tracker key from 'file_mtimes' to
'read_timestamps' for clarity.
This commit is contained in:
Teknium 2026-04-01 00:50:08 -07:00 committed by GitHub
parent 83dec2b3ec
commit ef2ae3e48f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 36 additions and 7 deletions

View file

@ -136,9 +136,12 @@ _file_ops_cache: dict = {}
# Used to skip re-reads of unchanged files. Reset on
# context compression (the original content is summarised
# away so the model needs the full content again).
# "file_mtimes": dict mapping resolved_path → mtime float at last read.
# Used by write_file and patch to detect when a file was
# modified externally between the agent's read and write.
# "read_timestamps": dict mapping resolved_path → modification-time float
# recorded when the file was last read (or written) by
# this task. Used by write_file and patch to detect
# external changes between the agent's read and write.
# Updated after successful writes so consecutive edits
# by the same task don't trigger false warnings.
_read_tracker_lock = threading.Lock()
_read_tracker: dict = {}
@ -401,7 +404,7 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
try:
_mtime_now = os.path.getmtime(resolved_str)
task_data["dedup"][dedup_key] = _mtime_now
task_data.setdefault("file_mtimes", {})[resolved_str] = _mtime_now
task_data.setdefault("read_timestamps", {})[resolved_str] = _mtime_now
except OSError:
pass # Can't stat — skip tracking for this entry
@ -500,6 +503,24 @@ def notify_other_tool_call(task_id: str = "default"):
task_data["consecutive"] = 0
def _update_read_timestamp(filepath: str, task_id: str) -> None:
    """Refresh the tracked modification time for *filepath* after a write.

    write_file and patch call this once a write succeeds, so that a
    follow-up edit by the same task compares against the post-write
    mtime rather than the original read — preventing false staleness
    warnings on back-to-back edits.
    """
    try:
        key = str(Path(filepath).expanduser().resolve())
        mtime = os.path.getmtime(key)
    except (OSError, ValueError):
        # File can't be resolved or statted — nothing to track.
        return
    with _read_tracker_lock:
        entry = _read_tracker.get(task_id)
        if entry is None:
            return
        entry.setdefault("read_timestamps", {})[key] = mtime
def _check_file_staleness(filepath: str, task_id: str) -> str | None:
"""Check whether a file was modified since the agent last read it.
@ -515,7 +536,7 @@ def _check_file_staleness(filepath: str, task_id: str) -> str | None:
task_data = _read_tracker.get(task_id)
if not task_data:
return None
read_mtime = task_data.get("file_mtimes", {}).get(resolved)
read_mtime = task_data.get("read_timestamps", {}).get(resolved)
if read_mtime is None:
return None # File was never read — nothing to compare against
try:
@ -543,6 +564,9 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
result_dict = result.to_dict()
if stale_warning:
result_dict["_warning"] = stale_warning
# Refresh the stored timestamp so consecutive writes by this
# task don't trigger false staleness warnings.
_update_read_timestamp(path, task_id)
return json.dumps(result_dict, ensure_ascii=False)
except Exception as e:
if _is_expected_write_exception(e):
@ -594,6 +618,11 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
result_dict = result.to_dict()
if stale_warnings:
result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings)
# Refresh stored timestamps for all successfully-patched paths so
# consecutive edits by this task don't trigger false warnings.
if not result_dict.get("error"):
for _p in _paths_to_check:
_update_read_timestamp(_p, task_id)
result_json = json.dumps(result_dict, ensure_ascii=False)
# Hint when old_string not found — saves iterations where the agent
# retries with stale content instead of re-reading the file.