fix: escalate read/search blocking, track search loops, filter completed todos

- Block file reads after 3+ re-reads of same region (no content returned)
- Track search_files calls and block repeated identical searches
- Filter completed/cancelled todos from post-compression injection
  to prevent the agent from re-doing finished work
- Add 10 new tests covering all three fixes
This commit is contained in:
0xbyt4 2026-03-08 23:01:21 +03:00
parent 9eee529a7f
commit e2fe1373f3
4 changed files with 167 additions and 9 deletions

View file

@ -19,6 +19,7 @@ from unittest.mock import patch, MagicMock
from tools.file_tools import (
read_file_tool,
search_tool,
get_read_files_summary,
clear_read_tracker,
_read_tracker,
@ -39,9 +40,16 @@ def _fake_read_file(path, offset=1, limit=500):
return _FakeReadResult(content=f"content of {path}", total_lines=10)
class _FakeSearchResult:
"""Minimal stand-in for FileOperations.search return value."""
def to_dict(self):
return {"matches": [{"file": "test.py", "line": 1, "text": "match"}]}
def _make_fake_file_ops():
    """Build a MagicMock that mimics the FileOperations surface the tools use.

    Only ``read_file`` and ``search`` are given deterministic behavior; any
    other attribute access falls through to MagicMock's auto-specced default.
    """
    ops = MagicMock()
    ops.read_file = _fake_read_file
    ops.search = lambda **kwargs: _FakeSearchResult()
    return ops
@ -71,11 +79,23 @@ class TestReadLoopDetection(unittest.TestCase):
self.assertIn("2 times", result["_warning"])
@patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
def test_third_read_is_blocked(self, _mock_ops):
    """3rd read of the same region returns error, no content.

    After two warned re-reads, the third read of the identical region must
    escalate to a hard block: the payload carries an "error" mentioning
    BLOCKED and omits "content" entirely.
    """
    # Prime the tracker with two reads of the same file/region.
    for _ in range(2):
        read_file_tool("/tmp/test.py", task_id="t1")
    result = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
    self.assertIn("error", result)
    self.assertIn("BLOCKED", result["error"])
    self.assertNotIn("content", result)
@patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
def test_fourth_read_still_blocked(self, _mock_ops):
    """Subsequent reads remain blocked with incrementing count."""
    # Three prior reads of the same region put the tracker past the limit.
    for _ in range(3):
        read_file_tool("/tmp/test.py", task_id="t1")
    payload = json.loads(read_file_tool("/tmp/test.py", task_id="t1"))
    self.assertIn("BLOCKED", payload["error"])
    self.assertIn("4 times", payload["error"])
@patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
def test_different_region_no_warning(self, _mock_ops):
@ -267,5 +287,94 @@ class TestCompressionFileHistory(unittest.TestCase):
self.assertIn("do NOT re-read", history_content)
class TestSearchLoopDetection(unittest.TestCase):
    """Verify that search_tool detects and blocks repeated searches."""

    def setUp(self):
        # The search counters live in the shared read-tracker; start clean.
        clear_read_tracker()

    def tearDown(self):
        clear_read_tracker()

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_first_search_no_warning(self, _mock_ops):
        """A fresh pattern produces neither a warning nor an error."""
        payload = json.loads(search_tool("def main", task_id="t1"))
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_second_search_has_warning(self, _mock_ops):
        """Repeating the identical search once yields a count warning."""
        search_tool("def main", task_id="t1")
        payload = json.loads(search_tool("def main", task_id="t1"))
        self.assertIn("_warning", payload)
        self.assertIn("2 times", payload["_warning"])

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_third_search_is_blocked(self, _mock_ops):
        """The third identical search is refused with no matches returned."""
        search_tool("def main", task_id="t1")
        search_tool("def main", task_id="t1")
        payload = json.loads(search_tool("def main", task_id="t1"))
        self.assertIn("error", payload)
        self.assertIn("BLOCKED", payload["error"])
        self.assertNotIn("matches", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_pattern_no_warning(self, _mock_ops):
        """A different pattern is not counted against the previous one."""
        search_tool("def main", task_id="t1")
        payload = json.loads(search_tool("class Foo", task_id="t1"))
        self.assertNotIn("_warning", payload)
        self.assertNotIn("error", payload)

    @patch("tools.file_tools._get_file_ops", return_value=_make_fake_file_ops())
    def test_different_task_isolated(self, _mock_ops):
        """Search repetition is tracked per task_id, not globally."""
        search_tool("def main", task_id="t1")
        payload = json.loads(search_tool("def main", task_id="t2"))
        self.assertNotIn("_warning", payload)
class TestTodoInjectionFiltering(unittest.TestCase):
    """Verify that format_for_injection filters completed/cancelled todos."""

    @staticmethod
    def _fresh_store():
        # Imported lazily, mirroring the original per-test imports, so the
        # module can be collected even if tools.todo_tool is unavailable.
        from tools.todo_tool import TodoStore
        return TodoStore()

    def test_filters_completed_and_cancelled(self):
        """Only active (pending/in_progress) items survive injection."""
        store = self._fresh_store()
        store.write([
            {"id": "1", "content": "Read codebase", "status": "completed"},
            {"id": "2", "content": "Write fix", "status": "in_progress"},
            {"id": "3", "content": "Run tests", "status": "pending"},
            {"id": "4", "content": "Abandoned", "status": "cancelled"},
        ])
        injection = store.format_for_injection()
        self.assertNotIn("Read codebase", injection)
        self.assertNotIn("Abandoned", injection)
        self.assertIn("Write fix", injection)
        self.assertIn("Run tests", injection)

    def test_all_completed_returns_none(self):
        """If every todo is finished, nothing should be injected at all."""
        store = self._fresh_store()
        store.write([
            {"id": "1", "content": "Done", "status": "completed"},
            {"id": "2", "content": "Also done", "status": "cancelled"},
        ])
        self.assertIsNone(store.format_for_injection())

    def test_empty_store_returns_none(self):
        """A store with no todos injects nothing."""
        self.assertIsNone(self._fresh_store().format_for_injection())

    def test_all_active_included(self):
        """With no finished items, every todo appears in the injection."""
        store = self._fresh_store()
        store.write([
            {"id": "1", "content": "Task A", "status": "pending"},
            {"id": "2", "content": "Task B", "status": "in_progress"},
        ])
        injection = store.format_for_injection()
        self.assertIn("Task A", injection)
        self.assertIn("Task B", injection)
# Allow running this test module directly (e.g. `python test_file_tools.py`)
# in addition to discovery via `python -m unittest` / pytest.
if __name__ == "__main__":
    unittest.main()