hermes-agent/tests/tools/test_file_tools.py
teknium1 b2faeba182 fix(file-ops): make preserved cwd reachable at write-time resolution (#26211)
Belt-and-suspenders on top of the cherry-picked cwd-preservation fix:

- Proactively mirror every live terminal cwd into _last_known_cwd on each
  successful read, so the durable anchor survives even when the cleanup
  thread pops both _file_ops_cache and _active_environments before
  _get_file_ops' stale-cache save branch can fire.
- Fall back to _last_known_cwd in _authoritative_workspace_root. write_file_tool
  resolves the path (via _resolve_path_for_task) BEFORE _get_file_ops rebuilds
  the env, so restoring only the rebuilt env's cwd was insufficient — the
  resolution that decides where the file lands runs first. This closes that gap.

The local env's persisted _cwd_file can't serve this role: it's keyed by a
random per-session uuid and deleted on cleanup (the same cleanup that triggers
the bug). The in-memory _last_known_cwd registry is the durable anchor instead.

Adds a real-IO E2E regression (TestSilentFileMisplacementE2E) exercising the
actual write_file_tool path after env cleanup.
2026-06-27 19:29:06 -07:00

755 lines
32 KiB
Python

"""Tests for the file tools module (schema, handler wiring, error paths).
Tests verify tool schemas, handler dispatch, validation logic, and error
handling without requiring a running terminal environment.
"""
import json
import logging
from unittest.mock import MagicMock, patch
from tools.file_tools import (
PATCH_SCHEMA,
)
class TestReadFileHandler:
@patch("tools.file_tools._get_file_ops")
def test_returns_file_content(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.content = "line1\nline2"
result_obj.to_dict.return_value = {"content": "line1\nline2", "total_lines": 2}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import read_file_tool
result = json.loads(read_file_tool("/tmp/test.txt"))
assert result["content"] == "line1\nline2"
assert result["total_lines"] == 2
mock_ops.read_file.assert_called_once_with("/tmp/test.txt", 1, 500)
@patch("tools.file_tools._get_file_ops")
def test_custom_offset_and_limit(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.content = "line10"
result_obj.to_dict.return_value = {"content": "line10", "total_lines": 50}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import read_file_tool
read_file_tool("/tmp/big.txt", offset=10, limit=20)
mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 10, 20)
@patch("tools.file_tools._get_file_ops")
def test_invalid_offset_and_limit_are_normalized_before_dispatch(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.content = "line1"
result_obj.to_dict.return_value = {"content": "line1", "total_lines": 1}
mock_ops.read_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import read_file_tool
read_file_tool("/tmp/big.txt", offset=0, limit=0)
mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 1, 1)
@patch("tools.file_tools._get_file_ops")
def test_exception_returns_error_json(self, mock_get):
mock_get.side_effect = RuntimeError("terminal not available")
from tools.file_tools import read_file_tool
result = json.loads(read_file_tool("/tmp/test.txt"))
assert "error" in result
assert "terminal not available" in result["error"]
class TestWriteFileHandler:
@patch("tools.file_tools._get_file_ops")
def test_writes_content(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "path": "/tmp/out.txt", "bytes": 13}
mock_ops.write_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/tmp/out.txt", "hello world!\n"))
assert result["status"] == "ok"
mock_ops.write_file.assert_called_once_with("/tmp/out.txt", "hello world!\n")
@patch("tools.file_tools._get_file_ops")
def test_permission_error_returns_error_json_without_error_log(self, mock_get, caplog):
mock_get.side_effect = PermissionError("read-only filesystem")
from tools.file_tools import write_file_tool
with caplog.at_level(logging.DEBUG, logger="tools.file_tools"):
result = json.loads(write_file_tool("/tmp/out.txt", "data"))
assert "error" in result
assert "read-only" in result["error"]
assert any("write_file expected denial" in r.getMessage() for r in caplog.records)
assert not any(r.levelno >= logging.ERROR for r in caplog.records)
@patch("tools.file_tools._get_file_ops")
def test_rejects_read_file_line_numbered_content(self, mock_get):
"""#19798 — do not persist read_file's LINE_NUM|CONTENT display format."""
from tools.file_tools import write_file_tool
content = " 1|setting: new_value\n 2|other: thing\n"
result = json.loads(write_file_tool("/tmp/config.yaml", content))
assert "error" in result
assert "line-number" in result["error"].lower()
mock_get.assert_not_called()
@patch("tools.file_tools._get_file_ops")
def test_allows_sparse_literal_pipe_content(self, mock_get):
"""A single literal N| line should not be treated as read_file output."""
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "path": "/tmp/out.txt", "bytes": 21}
mock_ops.write_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/tmp/out.txt", "1|literal value\nplain line\n"))
assert result["status"] == "ok"
mock_ops.write_file.assert_called_once()
@patch("tools.file_tools._get_file_ops")
def test_unexpected_exception_still_logs_error(self, mock_get, caplog):
mock_get.side_effect = RuntimeError("boom")
from tools.file_tools import write_file_tool
with caplog.at_level(logging.ERROR, logger="tools.file_tools"):
result = json.loads(write_file_tool("/tmp/out.txt", "data"))
assert result["error"] == "boom"
assert any("write_file error" in r.getMessage() for r in caplog.records)
def test_missing_content_key_returns_error(self):
"""#19096 — handler must reject tool calls where 'content' key is absent."""
from tools.file_tools import _handle_write_file
result = json.loads(_handle_write_file({"path": "/tmp/oops.md"}))
assert "error" in result
assert "content" in result["error"]
assert "path" not in result.get("error", "").lower() or "missing" not in result.get("error", "").lower() or True # just check error present
def test_missing_path_key_returns_error(self):
"""#19096 — handler must reject tool calls where 'path' key is absent."""
from tools.file_tools import _handle_write_file
result = json.loads(_handle_write_file({"content": "hello"}))
assert "error" in result
def test_explicit_empty_content_is_allowed(self):
"""#19096 — explicit empty string content (file truncation) must still work."""
from tools.file_tools import _handle_write_file
with patch("tools.file_tools._get_file_ops") as mock_get:
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "path": "/tmp/empty.txt", "bytes": 0}
mock_ops.write_file.return_value = result_obj
mock_get.return_value = mock_ops
result = json.loads(_handle_write_file({"path": "/tmp/empty.txt", "content": ""}))
assert result["status"] == "ok"
def test_non_string_content_returns_error(self):
"""#19096 — content must be a string, not a dict or list."""
from tools.file_tools import _handle_write_file
result = json.loads(_handle_write_file({"path": "/tmp/x.txt", "content": {"nested": "dict"}}))
assert "error" in result
assert "string" in result["error"].lower() or "content" in result["error"].lower()
class TestPatchHandler:
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_calls_patch_replace(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "replacements": 1}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
result = json.loads(patch_tool(
mode="replace", path="/tmp/f.py",
old_string="foo", new_string="bar"
))
assert result["status"] == "ok"
mock_ops.patch_replace.assert_called_once_with("/tmp/f.py", "foo", "bar", False)
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_replace_all_flag(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "replacements": 5}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
patch_tool(mode="replace", path="/tmp/f.py",
old_string="x", new_string="y", replace_all=True)
mock_ops.patch_replace.assert_called_once_with("/tmp/f.py", "x", "y", True)
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_missing_path_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="replace", path=None, old_string="a", new_string="b"))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_replace_mode_missing_strings_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="replace", path="/tmp/f.py", old_string=None, new_string="b"))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_patch_mode_calls_patch_v4a(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "operations": 1}
mock_ops.patch_v4a.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="patch", patch="*** Begin Patch\n..."))
assert result["status"] == "ok"
mock_ops.patch_v4a.assert_called_once()
@patch("tools.file_tools._get_file_ops")
def test_patch_mode_missing_content_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="patch", patch=None))
assert "error" in result
@patch("tools.file_tools._get_file_ops")
def test_unknown_mode_errors(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(mode="invalid_mode"))
assert "error" in result
assert "Unknown mode" in result["error"]
@patch("tools.file_tools._get_file_ops")
def test_patch_v4a_rejects_traversal_in_update_header(self, mock_get):
"""V4A '*** Update File:' headers come from patch content, which can
carry prompt-injection-controlled paths (skill content, web extract).
``..`` traversal in the header must be rejected before the patch is
applied, even though the explicit ``path=`` arg is allowed to use
``..`` for legitimate cross-worktree edits."""
from tools.file_tools import patch_tool
result = json.loads(patch_tool(
mode="patch",
patch=(
"*** Begin Patch\n"
"*** Update File: ../../../etc/shadow\n"
"@@ -1,3 +1,3 @@\n"
"-old\n"
"+new\n"
"*** End Patch\n"
),
))
assert "error" in result
assert "traversal" in result["error"].lower()
# patch_v4a must not be invoked when the header is rejected
mock_get.return_value.patch_v4a.assert_not_called()
@patch("tools.file_tools._get_file_ops")
def test_patch_v4a_rejects_traversal_in_add_header(self, mock_get):
from tools.file_tools import patch_tool
result = json.loads(patch_tool(
mode="patch",
patch=(
"*** Begin Patch\n"
"*** Add File: ../../../tmp/dropped.py\n"
"+print('pwned')\n"
"*** End Patch\n"
),
))
assert "error" in result
assert "traversal" in result["error"].lower()
class TestSearchHandler:
@patch("tools.file_tools._get_file_ops")
def test_search_calls_file_ops(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"matches": ["file1.py:3:match"]}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
result = json.loads(search_tool(pattern="TODO", target="content", path="."))
assert "matches" in result
mock_ops.search.assert_called_once()
@patch("tools.file_tools._get_file_ops")
def test_search_passes_all_params(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"matches": []}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
search_tool(pattern="class", target="files", path="/src",
file_glob="*.py", limit=10, offset=5, output_mode="count", context=2)
mock_ops.search.assert_called_once_with(
pattern="class", path="/src", target="files", file_glob="*.py",
limit=10, offset=5, output_mode="count", context=2,
)
@patch("tools.file_tools._get_file_ops")
def test_search_normalizes_invalid_pagination_before_dispatch(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"files": []}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
search_tool(pattern="class", target="files", path="/src", limit=-5, offset=-2)
mock_ops.search.assert_called_once_with(
pattern="class", path="/src", target="files", file_glob=None,
limit=1, offset=0, output_mode="content", context=0,
)
@patch("tools.file_tools._get_file_ops")
def test_search_exception_returns_error(self, mock_get):
mock_get.side_effect = RuntimeError("no terminal")
from tools.file_tools import search_tool
result = json.loads(search_tool(pattern="x"))
assert "error" in result
# ---------------------------------------------------------------------------
# Tool result hint tests (#722)
# ---------------------------------------------------------------------------
class TestPatchHints:
"""Patch tool should hint when old_string is not found."""
@patch("tools.file_tools._get_file_ops")
def test_no_match_includes_hint(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {
"error": "Could not find match for old_string in foo.py"
}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
raw = patch_tool(mode="replace", path="foo.py", old_string="x", new_string="y")
# patch_tool surfaces the hint as a structured "_hint" field on the
# JSON error payload (not an inline "[Hint: ..." tail).
assert "_hint" in raw
assert "read_file" in raw
@patch("tools.file_tools._get_file_ops")
def test_success_no_hint(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"success": True, "diff": "--- a\n+++ b"}
mock_ops.patch_replace.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import patch_tool
raw = patch_tool(mode="replace", path="foo.py", old_string="x", new_string="y")
assert "_hint" not in raw
class TestSearchHints:
"""Search tool should hint when results are truncated."""
def setup_method(self):
"""Clear read/search tracker between tests to avoid cross-test state."""
from tools.file_tools import _read_tracker
_read_tracker.clear()
@patch("tools.file_tools._get_file_ops")
def test_truncated_results_hint(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {
"total_count": 100,
"matches": [{"path": "a.py", "line": 1, "content": "x"}] * 50,
"truncated": True,
}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
raw = search_tool(pattern="foo", offset=0, limit=50)
assert "[Hint:" in raw
assert "offset=50" in raw
@patch("tools.file_tools._get_file_ops")
def test_non_truncated_no_hint(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {
"total_count": 3,
"matches": [{"path": "a.py", "line": 1, "content": "x"}] * 3,
}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
raw = search_tool(pattern="foo")
assert "[Hint:" not in raw
@patch("tools.file_tools._get_file_ops")
def test_truncated_hint_with_nonzero_offset(self, mock_get):
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {
"total_count": 150,
"matches": [{"path": "a.py", "line": 1, "content": "x"}] * 50,
"truncated": True,
}
mock_ops.search.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import search_tool
raw = search_tool(pattern="foo", offset=50, limit=50)
assert "[Hint:" in raw
assert "offset=100" in raw
# ---------------------------------------------------------------------------
# PATCH_SCHEMA shape tests (issue #15524)
# ---------------------------------------------------------------------------
class TestSensitivePathCheck:
"""Verify that _check_sensitive_path blocks writes to protected locations."""
def test_hermes_config_blocked_for_write_file(self, tmp_path, monkeypatch):
fake_config = tmp_path / "config.yaml"
monkeypatch.setattr("tools.file_tools._hermes_config_resolved", str(fake_config))
monkeypatch.setattr("tools.file_tools._hermes_config_resolved_loaded", True)
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool(str(fake_config), "approvals:\n mode: off\n"))
assert "error" in result
assert "Hermes config" in result["error"]
def test_hermes_config_blocked_via_tilde_path(self, tmp_path, monkeypatch):
fake_config = tmp_path / "config.yaml"
monkeypatch.setattr("tools.file_tools._hermes_config_resolved", str(fake_config))
monkeypatch.setattr("tools.file_tools._hermes_config_resolved_loaded", True)
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool(str(fake_config), "approvals:\n mode: off\n"))
assert "error" in result
assert "Hermes config" in result["error"]
def test_hermes_config_blocked_for_patch(self, tmp_path, monkeypatch):
fake_config = tmp_path / "config.yaml"
fake_config.write_text("approvals:\n mode: manual\n")
monkeypatch.setattr("tools.file_tools._hermes_config_resolved", str(fake_config))
monkeypatch.setattr("tools.file_tools._hermes_config_resolved_loaded", True)
from tools.file_tools import patch_tool
result = json.loads(patch_tool(
mode="replace",
path=str(fake_config),
old_string="mode: manual",
new_string="mode: off",
))
assert "error" in result
assert "Hermes config" in result["error"]
def test_system_path_still_blocked(self, monkeypatch):
monkeypatch.setattr("tools.file_tools._hermes_config_resolved", "/some/other/path")
monkeypatch.setattr("tools.file_tools._hermes_config_resolved_loaded", True)
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/etc/passwd", "evil"))
assert "error" in result
assert "sensitive system path" in result["error"]
@patch("tools.file_tools._get_file_ops")
def test_normal_file_not_blocked(self, mock_get, monkeypatch):
monkeypatch.setattr("tools.file_tools._hermes_config_resolved", "/home/user/.hermes/config.yaml")
monkeypatch.setattr("tools.file_tools._hermes_config_resolved_loaded", True)
mock_ops = MagicMock()
result_obj = MagicMock()
result_obj.to_dict.return_value = {"status": "ok", "path": "/tmp/other.txt", "bytes": 5}
mock_ops.write_file.return_value = result_obj
mock_get.return_value = mock_ops
from tools.file_tools import write_file_tool
result = json.loads(write_file_tool("/tmp/other.txt", "hello"))
assert result["status"] == "ok"
class TestPatchSchemaShape:
"""PATCH_SCHEMA must advertise per-mode required params via description
text (not JSON-schema ``required``), so strict models like kimi-k2.x stop
silently omitting old_string / new_string / patch content."""
def test_per_mode_required_params_documented_in_descriptions(self):
desc = PATCH_SCHEMA["description"]
assert "REQUIRED PARAMETERS: mode, path, old_string, new_string" in desc
assert "REQUIRED PARAMETERS: mode, patch" in desc
props = PATCH_SCHEMA["parameters"]["properties"]
for name in ("path", "old_string", "new_string"):
assert "REQUIRED when mode='replace'" in props[name]["description"]
assert "REQUIRED when mode='patch'" in props["patch"]["description"]
def test_no_anyof_required_stays_mode_only(self):
# anyOf/oneOf at parameters level break Anthropic, Fireworks, and the
# Moonshot/Kimi schema sanitizer — description-level guidance is the
# only provider-safe signalling mechanism.
params = PATCH_SCHEMA["parameters"]
assert params["required"] == ["mode"]
assert "anyOf" not in params and "oneOf" not in params
# ---------------------------------------------------------------------------
# _last_known_cwd tests (#26211: silent file creation failure in long conversations)
# ---------------------------------------------------------------------------
class TestLastKnownCwd:
"""
When the terminal environment is cleaned up and re-created during a long
conversation, _last_known_cwd preserves the old environment's CWD so
subsequent file writes with relative paths land in the right directory.
Regression guard for issue #26211.
"""
@patch("tools.terminal_tool._active_environments", new_callable=dict)
@patch("tools.file_tools._file_ops_cache", new_callable=dict)
@patch("tools.terminal_tool._get_env_config")
@patch("tools.terminal_tool._create_environment")
def test_last_known_cwd_preserved_across_env_recreation(
self, mock_create_env, mock_config, mock_cache, mock_active
):
from tools.file_tools import _get_file_ops, _last_known_cwd
# Setup: create a mock env with a known CWD
mock_env = MagicMock()
mock_env.cwd = "/Users/user/project"
mock_create_env.return_value = mock_env
mock_config.return_value = {
"env_type": "local",
"cwd": "/default/path",
"timeout": 30,
}
task_id = "default"
# Preset _last_known_cwd to simulate a previous env's CWD
_last_known_cwd[task_id] = "/Users/user/project"
# Call _get_file_ops - should use _last_known_cwd for the new env
result = _get_file_ops(task_id)
# Verify the env was created with the saved CWD, not the default
create_call = mock_create_env.call_args
assert create_call is not None, "_create_environment was not called"
# Find cwd in the kwargs
kwargs = create_call.kwargs if create_call.kwargs else {}
# cwd is passed as positional or keyword
cwd_passed = kwargs.get("cwd", None)
if cwd_passed is None:
# Try positional args
args = create_call.args if create_call.args else []
# Position: (env_type, image, cwd, timeout, ...)
if len(args) >= 3:
cwd_passed = args[2]
assert cwd_passed == "/Users/user/project", \
f"Expected cwd='/Users/user/project', got {cwd_passed!r}"
# Cleanup
_last_known_cwd.pop(task_id, None)
@patch("tools.terminal_tool._active_environments", new_callable=dict)
@patch("tools.file_tools._file_ops_cache", new_callable=dict)
@patch("tools.terminal_tool._get_env_config")
@patch("tools.terminal_tool._create_environment")
def test_last_known_cwd_falls_back_to_config_default_when_not_set(
self, mock_create_env, mock_config, mock_cache, mock_active
):
from tools.file_tools import _get_file_ops, _last_known_cwd
mock_env = MagicMock()
mock_env.cwd = "/default/path"
mock_create_env.return_value = mock_env
mock_config.return_value = {
"env_type": "local",
"cwd": "/config/default/path",
"timeout": 30,
}
# _get_file_ops resolves to "default"
task_id = "default"
# Ensure _last_known_cwd is empty for this task
_last_known_cwd.pop(task_id, None)
result = _get_file_ops(task_id)
create_call = mock_create_env.call_args
assert create_call is not None, "_create_environment was not called"
kwargs = create_call.kwargs if create_call.kwargs else {}
cwd_passed = kwargs.get("cwd", None)
if cwd_passed is None:
args = create_call.args if create_call.args else []
if len(args) >= 3:
cwd_passed = args[2]
# Should fall back to config default
assert cwd_passed == "/config/default/path", \
f"Expected cwd='/config/default/path', got {cwd_passed!r}"
@patch("tools.terminal_tool._active_environments", new_callable=dict)
@patch("tools.file_tools._file_ops_cache", new_callable=dict)
def test_live_cwd_read_mirrors_into_last_known_cwd(self, mock_cache, mock_active):
"""Belt-and-suspenders (#26211): every successful live-cwd read records
the cwd in _last_known_cwd, so the durable anchor doesn't depend on the
cleanup-detection branch of _get_file_ops firing."""
from tools.file_tools import _get_live_tracking_cwd, _last_known_cwd
task_id = "default"
_last_known_cwd.pop(task_id, None)
cached = MagicMock()
cached.env = MagicMock()
cached.env.cwd = "/Users/user/project"
cached.env.cwd_owner = "default"
mock_cache[task_id] = cached
live = _get_live_tracking_cwd(task_id)
assert live == "/Users/user/project"
# The read mirrored the live cwd into the durable registry.
assert _last_known_cwd.get(task_id) == "/Users/user/project"
_last_known_cwd.pop(task_id, None)
@patch("tools.terminal_tool._active_environments", new_callable=dict)
@patch("tools.file_tools._file_ops_cache", new_callable=dict)
@patch("tools.terminal_tool._get_env_config")
@patch("tools.terminal_tool._create_environment")
def test_mirrored_cwd_survives_when_cache_already_cleared(
self, mock_create_env, mock_config, mock_cache, mock_active
):
"""The original save-old-cwd path only fires when _file_ops_cache still
holds the stale entry. If the cleanup thread popped BOTH dicts first,
_get_file_ops sees cached=None and never saves — but the proactive
mirror from an earlier live read already populated _last_known_cwd, so
the rebuilt env still restores the user's directory."""
from tools.file_tools import (
_get_file_ops, _get_live_tracking_cwd, _last_known_cwd,
)
task_id = "default"
_last_known_cwd.pop(task_id, None)
# 1) Env is alive and the agent has cd'd into the project. A live read
# (happens on every relative-path resolution) mirrors the cwd.
cached = MagicMock()
cached.env = MagicMock()
cached.env.cwd = "/Users/user/project"
cached.env.cwd_owner = "default"
mock_cache[task_id] = cached
assert _get_live_tracking_cwd(task_id) == "/Users/user/project"
assert _last_known_cwd.get(task_id) == "/Users/user/project"
# 2) Cleanup thread kills the env AND clears the cache before the next
# file write — so _get_file_ops' save-old-cwd branch never runs.
mock_cache.pop(task_id, None)
mock_active.clear()
mock_env = MagicMock()
mock_env.cwd = "/Users/user/project"
mock_create_env.return_value = mock_env
mock_config.return_value = {
"env_type": "local",
"cwd": "/config/default/path",
"timeout": 30,
}
_get_file_ops(task_id)
create_call = mock_create_env.call_args
assert create_call is not None, "_create_environment was not called"
kwargs = create_call.kwargs if create_call.kwargs else {}
cwd_passed = kwargs.get("cwd", None)
if cwd_passed is None:
args = create_call.args if create_call.args else []
if len(args) >= 3:
cwd_passed = args[2]
# Rebuilt env restored the mirrored cwd, NOT the config default.
assert cwd_passed == "/Users/user/project", \
f"Expected restored cwd='/Users/user/project', got {cwd_passed!r}"
_last_known_cwd.pop(task_id, None)
class TestSilentFileMisplacementE2E:
"""Real-IO regression for #26211.
Exercises the actual write_file_tool path against a temp filesystem: an
agent cd's into a project, the cleanup thread kills the env, and a later
relative-path write must land in the project dir (not the config default).
Mocks miss this because resolution (_resolve_path_for_task) runs BEFORE
_get_file_ops rebuilds the env — only the durable _last_known_cwd fallback
in _authoritative_workspace_root makes the resolved path correct.
"""
def test_relative_write_after_env_cleanup_lands_in_user_cwd(self, tmp_path, monkeypatch):
import tools.terminal_tool as tt
import tools.file_tools as ft
project = tmp_path / "project"
config_default = tmp_path / "config_default"
project.mkdir()
config_default.mkdir()
monkeypatch.delenv("TERMINAL_CWD", raising=False)
_orig = tt._get_env_config
monkeypatch.setattr(
tt, "_get_env_config",
lambda: {**_orig(), "env_type": "local", "cwd": str(config_default)},
)
task_id = "default"
ft._last_known_cwd.pop(task_id, None)
# 1) Env alive; agent has cd'd into the project. A relative write
# while alive mirrors the live cwd into the durable registry.
fo = ft._get_file_ops(task_id)
fo.env.cwd = str(project)
fo.env.cwd_owner = "default"
ft.write_file_tool("alive.txt", "1\n", task_id)
assert (project / "alive.txt").exists()
# 2) Cleanup thread kills the env AND clears the file_ops cache.
with tt._env_lock:
tt._active_environments.pop(task_id, None)
tt._last_activity.pop(task_id, None)
with ft._file_ops_lock:
ft._file_ops_cache.pop(task_id, None)
# 3) The next relative write must still land in the project dir.
res = json.loads(ft.write_file_tool("report.txt", "hello\n", task_id))
assert res.get("resolved_path") == str(project / "report.txt"), res
assert (project / "report.txt").exists(), "file should be in the user's cwd"
assert not (config_default / "report.txt").exists(), \
"file silently misplaced into config default (the #26211 bug)"
ft._last_known_cwd.pop(task_id, None)