diff --git a/run_agent.py b/run_agent.py index 34ab7f450..c5966a173 100644 --- a/run_agent.py +++ b/run_agent.py @@ -8689,6 +8689,11 @@ class AIAgent: self._persist_user_message_override = persist_user_message # Generate unique task_id if not provided to isolate VMs between concurrent tasks effective_task_id = task_id or str(uuid.uuid4()) + # Expose the active task_id so tools running mid-turn (e.g. delegate_task + # in delegate_tool.py) can identify this agent for the cross-agent file + # state registry. Set BEFORE any tool dispatch so snapshots taken at + # child-launch time see the parent's real id, not None. + self._current_task_id = effective_task_id # Reset retry counters and iteration budget at the start of each turn # so subagent usage from a previous turn doesn't eat into the next one. diff --git a/tests/tools/test_delegate.py b/tests/tools/test_delegate.py index 8487c5387..f53da7e55 100644 --- a/tests/tools/test_delegate.py +++ b/tests/tools/test_delegate.py @@ -390,7 +390,7 @@ class TestToolNamePreservation(unittest.TestCase): with patch("run_agent.AIAgent") as MockAgent: mock_child = MagicMock() - def capture_and_return(user_message): + def capture_and_return(user_message, task_id=None): captured["saved"] = list(mock_child._delegate_saved_tool_names) return {"final_response": "ok", "completed": True, "api_calls": 1} @@ -1932,7 +1932,7 @@ class TestOrchestratorEndToEnd(unittest.TestCase): m.thinking_callback = None orch_mock["agent"] = m - def _orchestrator_run(user_message=None): + def _orchestrator_run(user_message=None, task_id=None): # Re-entrant: orchestrator spawns two leaves delegate_task( tasks=[{"goal": "leaf-A"}, {"goal": "leaf-B"}], diff --git a/tests/tools/test_file_state_registry.py b/tests/tools/test_file_state_registry.py new file mode 100644 index 000000000..6038036ae --- /dev/null +++ b/tests/tools/test_file_state_registry.py @@ -0,0 +1,287 @@ +#!/usr/bin/env python3 +"""Tests for the cross-agent FileStateRegistry (tools/file_state.py). + +Covers the three layers added for safe concurrent subagent file edits: + + 1. Cross-agent staleness detection via ``check_stale`` + 2. Per-path serialization via ``lock_path`` + 3. Delegate-completion reminder via ``writes_since`` + +Plus integration through the real ``read_file_tool`` / ``write_file_tool`` +/ ``patch_tool`` handlers so the full hook wiring is exercised. + +Run: + python -m pytest tests/tools/test_file_state_registry.py -v +""" + +from __future__ import annotations + +import json +import os +import tempfile +import threading +import time +import unittest + +from tools import file_state +from tools.file_tools import ( + read_file_tool, + write_file_tool, + patch_tool, +) + + +def _tmp_file(content: str = "initial\n") -> str: + fd, path = tempfile.mkstemp(prefix="hermes_file_state_test_", suffix=".txt") + with os.fdopen(fd, "w") as f: + f.write(content) + return path + + +class FileStateRegistryUnitTests(unittest.TestCase): + """Direct unit tests on the registry singleton.""" + + def setUp(self) -> None: + file_state.get_registry().clear() + self._tmpfiles: list[str] = [] + + def tearDown(self) -> None: + for p in self._tmpfiles: + try: + os.unlink(p) + except OSError: + pass + file_state.get_registry().clear() + + def _mk(self, content: str = "x\n") -> str: + p = _tmp_file(content) + self._tmpfiles.append(p) + return p + + def test_record_read_then_check_stale_returns_none(self): + p = self._mk() + file_state.record_read("A", p) + self.assertIsNone(file_state.check_stale("A", p)) + + def test_sibling_write_flags_other_agent_as_stale(self): + p = self._mk() + file_state.record_read("A", p) + # Simulate sibling writing this file later + time.sleep(0.01) # ensure ts ordering across resolution + file_state.note_write("B", p) + warn = file_state.check_stale("A", p) + self.assertIsNotNone(warn) + self.assertIn("B", warn) + self.assertIn("sibling", warn.lower()) + + def test_write_without_read_flagged(self): + p = self._mk() + # Agent A never read this file. + file_state.note_write("B", p) # another agent touched it + warn = file_state.check_stale("A", p) + self.assertIsNotNone(warn) + + def test_partial_read_flagged_on_write(self): + p = self._mk() + file_state.record_read("A", p, partial=True) + warn = file_state.check_stale("A", p) + self.assertIsNotNone(warn) + self.assertIn("partial", warn.lower()) + + def test_external_mtime_drift_flagged(self): + p = self._mk() + file_state.record_read("A", p) + # Bump the on-disk mtime without going through the registry. + time.sleep(0.01) + os.utime(p, None) + with open(p, "w") as f: + f.write("externally modified\n") + warn = file_state.check_stale("A", p) + self.assertIsNotNone(warn) + self.assertIn("modified since you last read", warn) + + def test_own_write_updates_stamp_so_next_write_is_clean(self): + p = self._mk() + file_state.record_read("A", p) + file_state.note_write("A", p) + # Second write by the same agent — should not be flagged. + self.assertIsNone(file_state.check_stale("A", p)) + + def test_different_paths_dont_interfere(self): + a = self._mk() + b = self._mk() + file_state.record_read("A", a) + file_state.note_write("B", b) + # A reads only `a`; B writes `b`. A writing `a` is NOT stale. + self.assertIsNone(file_state.check_stale("A", a)) + + def test_lock_path_serializes_same_path(self): + p = self._mk() + events: list[tuple[str, int]] = [] + lock = threading.Lock() + + def worker(i: int) -> None: + with file_state.lock_path(p): + with lock: + events.append(("enter", i)) + time.sleep(0.01) + with lock: + events.append(("exit", i)) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)] + for t in threads: + t.start() + for t in threads: + t.join() + + # Every enter must be immediately followed by its matching exit. + self.assertEqual(len(events), 8) + for i in range(0, 8, 2): + self.assertEqual(events[i][0], "enter") + self.assertEqual(events[i + 1][0], "exit") + self.assertEqual(events[i][1], events[i + 1][1]) + + def test_lock_path_is_per_path_not_global(self): + a = self._mk() + b = self._mk() + b_entered = threading.Event() + + def hold_a() -> None: + with file_state.lock_path(a): + b_entered.wait(timeout=2.0) + + def enter_b() -> None: + time.sleep(0.02) # let A grab its lock + with file_state.lock_path(b): + b_entered.set() + + ta = threading.Thread(target=hold_a) + tb = threading.Thread(target=enter_b) + ta.start() + tb.start() + self.assertTrue(b_entered.wait(timeout=3.0)) + ta.join(timeout=3.0) + tb.join(timeout=3.0) + + def test_writes_since_filters_by_parent_read_set(self): + foo = self._mk() + bar = self._mk() + baz = self._mk() + file_state.record_read("parent", foo) + file_state.record_read("parent", bar) + since = time.time() + time.sleep(0.01) + file_state.note_write("child", foo) # parent read this — report + file_state.note_write("child", baz) # parent never saw — skip + + # Caller passes only paths the parent actually read (this is what + # delegate_tool does via ``known_reads(parent_task_id)``). + parent_reads = file_state.known_reads("parent") + out = file_state.writes_since("parent", since, parent_reads) + self.assertIn("child", out) + self.assertIn(foo, out["child"]) + self.assertNotIn(baz, out["child"]) + + def test_writes_since_excludes_the_target_agent(self): + p = self._mk() + file_state.record_read("parent", p) + since = time.time() + time.sleep(0.01) + file_state.note_write("parent", p) # parent's own write + out = file_state.writes_since("parent", since, [p]) + self.assertEqual(out, {}) + + def test_kill_switch_env_var(self): + p = self._mk() + os.environ["HERMES_DISABLE_FILE_STATE_GUARD"] = "1" + try: + file_state.record_read("A", p) + file_state.note_write("B", p) + self.assertIsNone(file_state.check_stale("A", p)) + self.assertEqual(file_state.known_reads("A"), []) + self.assertEqual( + file_state.writes_since("A", 0.0, [p]), + {}, + ) + finally: + del os.environ["HERMES_DISABLE_FILE_STATE_GUARD"] + + +class FileToolsIntegrationTests(unittest.TestCase): + """Integration through the real file_tools handlers. + + These exercise the wiring: read_file_tool → registry.record_read, + write_file_tool / patch_tool → check_stale + lock_path + note_write. + """ + + def setUp(self) -> None: + file_state.get_registry().clear() + self._tmpdir = tempfile.mkdtemp(prefix="hermes_file_state_int_") + + def tearDown(self) -> None: + import shutil + shutil.rmtree(self._tmpdir, ignore_errors=True) + file_state.get_registry().clear() + + def _write_seed(self, name: str, content: str = "seed\n") -> str: + p = os.path.join(self._tmpdir, name) + with open(p, "w") as f: + f.write(content) + return p + + def test_sibling_agent_write_surfaces_warning_through_handler(self): + p = self._write_seed("shared.txt") + r = json.loads(read_file_tool(path=p, task_id="agentA")) + self.assertNotIn("error", r) + + w_b = json.loads(write_file_tool(path=p, content="B wrote\n", task_id="agentB")) + self.assertNotIn("error", w_b) + + w_a = json.loads(write_file_tool(path=p, content="A stale\n", task_id="agentA")) + warn = w_a.get("_warning", "") + self.assertTrue(warn, f"expected warning, got: {w_a}") + # The cross-agent message names the sibling task_id. + self.assertIn("agentB", warn) + self.assertIn("sibling", warn.lower()) + + def test_same_agent_consecutive_writes_no_false_warning(self): + p = self._write_seed("own.txt") + json.loads(read_file_tool(path=p, task_id="agentC")) + w1 = json.loads(write_file_tool(path=p, content="one\n", task_id="agentC")) + self.assertFalse(w1.get("_warning")) + w2 = json.loads(write_file_tool(path=p, content="two\n", task_id="agentC")) + self.assertFalse(w2.get("_warning")) + + def test_patch_tool_also_surfaces_sibling_warning(self): + p = self._write_seed("p.txt", "hello world\n") + json.loads(read_file_tool(path=p, task_id="agentA")) + json.loads(write_file_tool(path=p, content="hello planet\n", task_id="agentB")) + r = json.loads( + patch_tool( + mode="replace", + path=p, + old_string="hello", + new_string="HI", + task_id="agentA", + ) + ) + warn = r.get("_warning", "") + # Patch may fail (sibling changed the content so old_string may not + # match) or succeed — either way, the cross-agent warning should be + # present when old_string still happens to match. What matters is + # that if the patch succeeded or the warning was reported, it names + # the sibling. When old_string doesn't match, the patch itself + # returns an error but the warning is still set from the pre-check. + if warn: + self.assertIn("agentB", warn) + + def test_net_new_file_no_warning(self): + p = os.path.join(self._tmpdir, "brand_new.txt") + # Nobody has read or written this before. + w = json.loads(write_file_tool(path=p, content="hi\n", task_id="agentX")) + self.assertFalse(w.get("_warning")) + self.assertNotIn("error", w) + + +if __name__ == "__main__": + unittest.main() diff --git a/tools/delegate_tool.py b/tools/delegate_tool.py index 29aab35fe..093be11c0 100644 --- a/tools/delegate_tool.py +++ b/tools/delegate_tool.py @@ -27,6 +27,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Dict, List, Optional from toolsets import TOOLSETS +from tools import file_state from utils import base_url_hostname @@ -728,7 +729,22 @@ def _run_single_child( except Exception as e: logger.debug("Progress callback start failed: %s", e) - result = child.run_conversation(user_message=goal) + # File-state coordination: generate a stable child task_id so the + # file_state registry can attribute writes back to this subagent, + # and snapshot the parent's read set at launch time. After the + # child returns we compare to detect "sibling modified files the + # parent previously read" and surface it as a reminder on the + # returned summary. + import uuid as _uuid + child_task_id = f"subagent-{task_index}-{_uuid.uuid4().hex[:8]}" + parent_task_id = getattr(parent_agent, "_current_task_id", None) + wall_start = time.time() + parent_reads_snapshot = ( + list(file_state.known_reads(parent_task_id)) + if parent_task_id else [] + ) + + result = child.run_conversation(user_message=goal, task_id=child_task_id) # Flush any remaining batched progress to gateway if child_progress_cb and hasattr(child_progress_cb, '_flush'): @@ -826,6 +842,36 @@ def _run_single_child( if status == "failed": entry["error"] = result.get("error", "Subagent did not produce a response.") + # Cross-agent file-state reminder. If this subagent wrote any + # files the parent had already read, surface it so the parent + # knows to re-read before editing — the scenario that motivated + # the registry. We check writes by ANY non-parent task_id (not + # just this child's), which also covers transitive writes from + # nested orchestrator→worker chains. + try: + if parent_task_id and parent_reads_snapshot: + sibling_writes = file_state.writes_since( + parent_task_id, wall_start, parent_reads_snapshot + ) + if sibling_writes: + mod_paths = sorted( + {p for paths in sibling_writes.values() for p in paths} + ) + if mod_paths: + reminder = ( + "\n\n[NOTE: subagent modified files the parent " + "previously read — re-read before editing: " + + ", ".join(mod_paths[:8]) + + (f" (+{len(mod_paths) - 8} more)" if len(mod_paths) > 8 else "") + + "]" + ) + if entry.get("summary"): + entry["summary"] = entry["summary"] + reminder + else: + entry["stale_paths"] = mod_paths + except Exception: + logger.debug("file_state sibling-write check failed", exc_info=True) + if child_progress_cb: try: child_progress_cb( diff --git a/tools/file_state.py b/tools/file_state.py new file mode 100644 index 000000000..f22a966e1 --- /dev/null +++ b/tools/file_state.py @@ -0,0 +1,332 @@ +"""Cross-agent file state coordination. + +Prevents mangled edits when concurrent subagents (same process, same +filesystem) touch the same file. Complements the single-agent path-overlap +check in ``run_agent._should_parallelize_tool_batch`` — this module catches +the case where subagent B writes a file that subagent A already read, so +A's next write would overwrite B's changes with stale content. + +Design +------ +A process-wide singleton ``FileStateRegistry`` tracks, per resolved path: + + * per-agent read stamps: {task_id: {path: (mtime, read_ts, partial)}} + * last writer globally: {path: (task_id, write_ts)} + * per-path ``threading.Lock`` for read→modify→write critical sections + +Three public hooks are used by the file tools: + + * ``record_read(task_id, path, *, partial)`` — called by read_file + * ``note_write(task_id, path)`` — called after write_file / patch + * ``check_stale(task_id, path)`` — called BEFORE write_file / patch + +Plus ``lock_path(path)`` — a context-manager returning a per-path lock to +wrap the whole read→modify→write block. And ``writes_since(task_id, +since_ts, paths)`` for the subagent-completion reminder in delegate_tool. + +All methods are no-ops when ``HERMES_DISABLE_FILE_STATE_GUARD=1`` is set. + +This module is intentionally separate from ``_read_tracker`` in +``file_tools.py`` — that tracker is per-task and handles consecutive-read +loop detection, which is a different concern. +""" +from __future__ import annotations + +import os +import threading +import time +from collections import defaultdict +from contextlib import contextmanager +from pathlib import Path +from typing import Dict, Iterable, List, Optional, Tuple + + +# ── Public stamp type ──────────────────────────────────────────────── +# (mtime, read_ts, partial). partial=True when read_file returned a +# windowed view (offset > 1 or limit < total_lines) — writes that happen +# after a partial read should still warn so the model re-reads in full. +ReadStamp = Tuple[float, float, bool] + +# Number of resolved-path entries retained per agent. Bounded to keep +# long sessions from accumulating unbounded state. On overflow we drop +# the oldest entries by insertion order. +_MAX_PATHS_PER_AGENT = 4096 + +# Global last-writer map cap. Same policy. +_MAX_GLOBAL_WRITERS = 4096 + + +class FileStateRegistry: + """Process-wide coordinator for cross-agent file edits.""" + + def __init__(self) -> None: + self._reads: Dict[str, Dict[str, ReadStamp]] = defaultdict(dict) + self._last_writer: Dict[str, Tuple[str, float]] = {} + self._path_locks: Dict[str, threading.Lock] = {} + self._meta_lock = threading.Lock() # guards _path_locks + self._state_lock = threading.Lock() # guards _reads + _last_writer + + # ── Path lock management ──────────────────────────────────────── + def _lock_for(self, resolved: str) -> threading.Lock: + with self._meta_lock: + lock = self._path_locks.get(resolved) + if lock is None: + lock = threading.Lock() + self._path_locks[resolved] = lock + return lock + + @contextmanager + def lock_path(self, resolved: str): + """Acquire the per-path lock for a read→modify→write section. + + Same process, same filesystem — threads on the same path serialize. + Different paths proceed in parallel. + """ + lock = self._lock_for(resolved) + lock.acquire() + try: + yield + finally: + lock.release() + + # ── Read/write accounting ─────────────────────────────────────── + def record_read( + self, + task_id: str, + resolved: str, + *, + partial: bool = False, + mtime: Optional[float] = None, + ) -> None: + if _disabled(): + return + if mtime is None: + try: + mtime = os.path.getmtime(resolved) + except OSError: + return + now = time.time() + with self._state_lock: + agent_reads = self._reads[task_id] + agent_reads[resolved] = (float(mtime), now, bool(partial)) + _cap_dict(agent_reads, _MAX_PATHS_PER_AGENT) + + def note_write( + self, + task_id: str, + resolved: str, + *, + mtime: Optional[float] = None, + ) -> None: + """Record a successful write. + + Updates the global last-writer map AND this agent's own read stamp + (a write is an implicit read — the agent now knows the current + content). + """ + if _disabled(): + return + if mtime is None: + try: + mtime = os.path.getmtime(resolved) + except OSError: + return + now = time.time() + with self._state_lock: + self._last_writer[resolved] = (task_id, now) + _cap_dict(self._last_writer, _MAX_GLOBAL_WRITERS) + # Writer's own view is now up-to-date. + self._reads[task_id][resolved] = (float(mtime), now, False) + _cap_dict(self._reads[task_id], _MAX_PATHS_PER_AGENT) + + def check_stale(self, task_id: str, resolved: str) -> Optional[str]: + """Return a model-facing warning if this write would be stale. + + Three staleness classes, in order of severity: + + 1. Sibling subagent wrote this file after this agent's last read. + 2. External/unknown change (mtime differs from our last read). + 3. Agent never read the file (write-without-read). + + Returns ``None`` when the write is safe. Does not raise — callers + decide whether to block or warn. + """ + if _disabled(): + return None + with self._state_lock: + stamp = self._reads.get(task_id, {}).get(resolved) + last_writer = self._last_writer.get(resolved) + + # Case 3: never read AND we have no write record — net-new file or + # first touch by this agent. Let existing _check_sensitive_path + # and file-exists logic handle it; nothing to warn about here. + if stamp is None and last_writer is None: + return None + + try: + current_mtime = os.path.getmtime(resolved) + except OSError: + # File doesn't exist — write will create it; not stale. + return None + + # Case 1: sibling subagent modified after our last read. + if last_writer is not None: + writer_tid, writer_ts = last_writer + if writer_tid != task_id: + if stamp is None: + return ( + f"{resolved} was modified by sibling subagent " + f"{writer_tid!r} but this agent never read it. " + "Read the file before writing to avoid overwriting " + "the sibling's changes." + ) + read_ts = stamp[1] + if writer_ts > read_ts: + return ( + f"{resolved} was modified by sibling subagent " + f"{writer_tid!r} at {_fmt_ts(writer_ts)} — after " + f"this agent's last read at {_fmt_ts(read_ts)}. " + "Re-read the file before writing." + ) + + # Case 2: external / unknown modification (mtime drifted). + if stamp is not None: + read_mtime, _read_ts, partial = stamp + if current_mtime != read_mtime: + return ( + f"{resolved} was modified since you last read it " + "on disk (external edit or unrecorded writer). " + "Re-read the file before writing." + ) + if partial: + return ( + f"{resolved} was last read with offset/limit pagination " + "(partial view). Re-read the whole file before " + "overwriting it." + ) + + # Case 3b: agent truly never read the file. + if stamp is None: + return ( + f"{resolved} was not read by this agent. " + "Read the file first so you can write an informed edit." + ) + + return None + + # ── Reminder helper for delegate_tool ─────────────────────────── + def writes_since( + self, + exclude_task_id: str, + since_ts: float, + paths: Iterable[str], + ) -> Dict[str, List[str]]: + """Return ``{writer_task_id: [paths]}`` for writes done after + ``since_ts`` by agents OTHER than ``exclude_task_id``. + + Used by delegate_task to append a "subagent modified files the + parent previously read" reminder to the delegation result. + """ + if _disabled(): + return {} + paths_set = set(paths) + out: Dict[str, List[str]] = defaultdict(list) + with self._state_lock: + for p, (writer_tid, ts) in self._last_writer.items(): + if writer_tid == exclude_task_id: + continue + if ts < since_ts: + continue + if p in paths_set: + out[writer_tid].append(p) + return dict(out) + + def known_reads(self, task_id: str) -> List[str]: + """Return the list of resolved paths this agent has read.""" + if _disabled(): + return [] + with self._state_lock: + return list(self._reads.get(task_id, {}).keys()) + + # ── Testing hooks ─────────────────────────────────────────────── + def clear(self) -> None: + """Reset all state. Intended for tests only.""" + with self._state_lock: + self._reads.clear() + self._last_writer.clear() + with self._meta_lock: + self._path_locks.clear() + + +# ── Module-level singleton + helpers ───────────────────────────────── +_registry = FileStateRegistry() + + +def get_registry() -> FileStateRegistry: + return _registry + + +def _disabled() -> bool: + # Re-read each call so tests can toggle via monkeypatch.setenv. + return os.environ.get("HERMES_DISABLE_FILE_STATE_GUARD", "").strip() == "1" + + +def _fmt_ts(ts: float) -> str: + # Short relative wall-clock for error messages; avoids pulling in + # datetime formatting overhead on the hot path. + return time.strftime("%H:%M:%S", time.localtime(ts)) + + +def _cap_dict(d: dict, limit: int) -> None: + """Trim a dict to ``limit`` entries by dropping insertion-order oldest.""" + over = len(d) - limit + if over <= 0: + return + # dict preserves insertion order (PY>=3.7) — pop the oldest keys. + it = iter(d) + for _ in range(over): + try: + d.pop(next(it)) + except (StopIteration, KeyError): + break + + +# ── Convenience wrappers (short names used at call sites) ──────────── +def record_read(task_id: str, resolved_or_path: str | Path, *, partial: bool = False) -> None: + _registry.record_read(task_id, str(resolved_or_path), partial=partial) + + +def note_write(task_id: str, resolved_or_path: str | Path) -> None: + _registry.note_write(task_id, str(resolved_or_path)) + + +def check_stale(task_id: str, resolved_or_path: str | Path) -> Optional[str]: + return _registry.check_stale(task_id, str(resolved_or_path)) + + +def lock_path(resolved_or_path: str | Path): + return _registry.lock_path(str(resolved_or_path)) + + +def writes_since( + exclude_task_id: str, + since_ts: float, + paths: Iterable[str | Path], +) -> Dict[str, List[str]]: + return _registry.writes_since(exclude_task_id, since_ts, [str(p) for p in paths]) + + +def known_reads(task_id: str) -> List[str]: + return _registry.known_reads(task_id) + + +__all__ = [ + "FileStateRegistry", + "get_registry", + "record_read", + "note_write", + "check_stale", + "lock_path", + "writes_since", + "known_reads", +] diff --git a/tools/file_tools.py b/tools/file_tools.py index 5b44ff03d..a2e72e7ec 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -12,6 +12,7 @@ from typing import Optional from agent.file_safety import get_read_block_error from tools.binary_extensions import has_binary_extension from tools.file_operations import ShellFileOperations +from tools import file_state from agent.redact import redact_sensitive_text logger = logging.getLogger(__name__) @@ -483,6 +484,19 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = # accumulate megabytes of dict/set state. See _cap_read_tracker_data. _cap_read_tracker_data(task_data) + # Cross-agent file-state registry (separate from per-task read + # tracker above): records that THIS agent has read this path so + # write/patch can detect sibling-subagent writes that happened + # after our read. Partial read when offset>1 or the read was + # truncated (large file with more content than limit covered). + # Outside the _read_tracker_lock so the registry's own locking + # isn't nested under ours. + try: + _partial = (offset > 1) or bool(result_dict.get("truncated")) + file_state.record_read(task_id, resolved_str, partial=_partial) + except Exception: + logger.debug("file_state.record_read failed", exc_info=True) + if count >= 4: # Hard block: stop returning content to break the loop return json.dumps({ @@ -602,15 +616,43 @@ def write_file_tool(path: str, content: str, task_id: str = "default") -> str: if sensitive_err: return tool_error(sensitive_err) try: - stale_warning = _check_file_staleness(path, task_id) - file_ops = _get_file_ops(task_id) - result = file_ops.write_file(path, content) - result_dict = result.to_dict() - if stale_warning: - result_dict["_warning"] = stale_warning - # Refresh the stored timestamp so consecutive writes by this - # task don't trigger false staleness warnings. - _update_read_timestamp(path, task_id) + # Resolve once for the registry lock + stale check. Failures here + # fall back to the legacy path — write proceeds, per-task staleness + # check below still runs. + try: + _resolved = str(_resolve_path(path)) + except Exception: + _resolved = None + + if _resolved is None: + stale_warning = _check_file_staleness(path, task_id) + file_ops = _get_file_ops(task_id) + result = file_ops.write_file(path, content) + result_dict = result.to_dict() + if stale_warning: + result_dict["_warning"] = stale_warning + _update_read_timestamp(path, task_id) + return json.dumps(result_dict, ensure_ascii=False) + + # Serialize the read→modify→write region per-path so concurrent + # subagents can't interleave on the same file. Different paths + # remain fully parallel. + with file_state.lock_path(_resolved): + # Cross-agent staleness wins over per-task warning when both + # fire — its message names the sibling subagent. + cross_warning = file_state.check_stale(task_id, _resolved) + stale_warning = _check_file_staleness(path, task_id) + file_ops = _get_file_ops(task_id) + result = file_ops.write_file(path, content) + result_dict = result.to_dict() + effective_warning = cross_warning or stale_warning + if effective_warning: + result_dict["_warning"] = effective_warning + # Refresh stamps after the successful write so consecutive + # writes by this task don't trigger false staleness warnings. + _update_read_timestamp(path, task_id) + if not result_dict.get("error"): + file_state.note_write(task_id, _resolved) return json.dumps(result_dict, ensure_ascii=False) except Exception as e: if _is_expected_write_exception(e): @@ -637,36 +679,70 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None, if sensitive_err: return tool_error(sensitive_err) try: - # Check staleness for all files this patch will touch. - stale_warnings = [] + # Resolve paths for locking. Ordered + deduplicated so concurrent + # callers lock in the same order — prevents deadlock on overlapping + # multi-file V4A patches. + _resolved_paths: list[str] = [] + _seen: set[str] = set() for _p in _paths_to_check: - _sw = _check_file_staleness(_p, task_id) - if _sw: - stale_warnings.append(_sw) + try: + _r = str(_resolve_path(_p)) + except Exception: + _r = None + if _r and _r not in _seen: + _resolved_paths.append(_r) + _seen.add(_r) + _resolved_paths.sort() - file_ops = _get_file_ops(task_id) - - if mode == "replace": - if not path: - return tool_error("path required") - if old_string is None or new_string is None: - return tool_error("old_string and new_string required") - result = file_ops.patch_replace(path, old_string, new_string, replace_all) - elif mode == "patch": - if not patch: - return tool_error("patch content required") - result = file_ops.patch_v4a(patch) - else: - return tool_error(f"Unknown mode: {mode}") - - result_dict = result.to_dict() - if stale_warnings: - result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings) - # Refresh stored timestamps for all successfully-patched paths so - # consecutive edits by this task don't trigger false warnings. - if not result_dict.get("error"): + # Acquire per-path locks in sorted order via ExitStack. On single + # path this degenerates to one lock; on empty list (unresolvable) + # it's a no-op and execution falls through unchanged. + from contextlib import ExitStack + with ExitStack() as _locks: + for _r in _resolved_paths: + _locks.enter_context(file_state.lock_path(_r)) + + # Collect warnings — cross-agent registry first (names sibling), + # then per-task tracker as a fallback. + stale_warnings: list[str] = [] + _path_to_resolved: dict[str, str] = {} for _p in _paths_to_check: - _update_read_timestamp(_p, task_id) + try: + _r = str(_resolve_path(_p)) + except Exception: + _r = None + _path_to_resolved[_p] = _r + _cross = file_state.check_stale(task_id, _r) if _r else None + _sw = _cross or _check_file_staleness(_p, task_id) + if _sw: + stale_warnings.append(_sw) + + file_ops = _get_file_ops(task_id) + + if mode == "replace": + if not path: + return tool_error("path required") + if old_string is None or new_string is None: + return tool_error("old_string and new_string required") + result = file_ops.patch_replace(path, old_string, new_string, replace_all) + elif mode == "patch": + if not patch: + return tool_error("patch content required") + result = file_ops.patch_v4a(patch) + else: + return tool_error(f"Unknown mode: {mode}") + + result_dict = result.to_dict() + if stale_warnings: + result_dict["_warning"] = stale_warnings[0] if len(stale_warnings) == 1 else " | ".join(stale_warnings) + # Refresh stored timestamps for all successfully-patched paths so + # consecutive edits by this task don't trigger false warnings. + if not result_dict.get("error"): + for _p in _paths_to_check: + _update_read_timestamp(_p, task_id) + _r = _path_to_resolved.get(_p) + if _r: + file_state.note_write(task_id, _r) result_json = json.dumps(result_dict, ensure_ascii=False) # Hint when old_string not found — saves iterations where the agent # retries with stale content instead of re-reading the file.