From 40619b393fd997cde206557dcf6599d113ca2cf9 Mon Sep 17 00:00:00 2001 From: Yukipukii1 Date: Wed, 22 Apr 2026 08:38:01 +0300 Subject: [PATCH] tools: normalize file tool pagination bounds --- tests/tools/test_file_operations.py | 13 +++++ .../tools/test_file_operations_edge_cases.py | 58 +++++++++++++++++++ tests/tools/test_file_tools.py | 28 +++++++++ tools/file_operations.py | 39 ++++++++++++- tools/file_tools.py | 10 +++- 5 files changed, 145 insertions(+), 3 deletions(-) diff --git a/tests/tools/test_file_operations.py b/tests/tools/test_file_operations.py index b379fefcb..dfd54ba63 100644 --- a/tests/tools/test_file_operations.py +++ b/tests/tools/test_file_operations.py @@ -19,6 +19,8 @@ from tools.file_operations import ( BINARY_EXTENSIONS, IMAGE_EXTENSIONS, MAX_LINE_LENGTH, + normalize_read_pagination, + normalize_search_pagination, ) @@ -192,6 +194,17 @@ def file_ops(mock_env): class TestShellFileOpsHelpers: + def test_normalize_read_pagination_clamps_invalid_values(self): + assert normalize_read_pagination(offset=0, limit=0) == (1, 1) + assert normalize_read_pagination(offset=-10, limit=-5) == (1, 1) + assert normalize_read_pagination(offset="bad", limit="bad") == (1, 500) + assert normalize_read_pagination(offset=2, limit=999999) == (2, 2000) + + def test_normalize_search_pagination_clamps_invalid_values(self): + assert normalize_search_pagination(offset=-10, limit=-5) == (0, 1) + assert normalize_search_pagination(offset="bad", limit="bad") == (0, 50) + assert normalize_search_pagination(offset=3, limit=0) == (3, 1) + def test_escape_shell_arg_simple(self, file_ops): assert file_ops._escape_shell_arg("hello") == "'hello'" diff --git a/tests/tools/test_file_operations_edge_cases.py b/tests/tools/test_file_operations_edge_cases.py index b13dedded..8a4378d2f 100644 --- a/tests/tools/test_file_operations_edge_cases.py +++ b/tests/tools/test_file_operations_edge_cases.py @@ -146,3 +146,61 @@ class TestCheckLintBracePaths: assert result.success is False assert "SyntaxError" in result.output + + +# ========================================================================= +# Pagination bounds +# ========================================================================= + + +class TestPaginationBounds: + """Invalid pagination inputs should not leak into shell commands.""" + + def test_read_file_clamps_offset_and_limit_before_building_sed_range(self): + env = MagicMock() + env.cwd = "/tmp" + ops = ShellFileOperations(env) + commands = [] + + def fake_exec(command, *args, **kwargs): + commands.append(command) + if command.startswith("wc -c"): + return MagicMock(exit_code=0, stdout="12") + if command.startswith("head -c"): + return MagicMock(exit_code=0, stdout="line1\nline2\n") + if command.startswith("sed -n"): + return MagicMock(exit_code=0, stdout="line1\n") + if command.startswith("wc -l"): + return MagicMock(exit_code=0, stdout="2") + return MagicMock(exit_code=0, stdout="") + + with patch.object(ops, "_exec", side_effect=fake_exec): + result = ops.read_file("notes.txt", offset=0, limit=0) + + assert result.error is None + assert " 1|line1" in result.content + sed_commands = [cmd for cmd in commands if cmd.startswith("sed -n")] + assert sed_commands == ["sed -n '1,1p' 'notes.txt'"] + + def test_search_clamps_offset_and_limit_before_building_head_pipeline(self): + env = MagicMock() + env.cwd = "/tmp" + ops = ShellFileOperations(env) + commands = [] + + def fake_exec(command, *args, **kwargs): + commands.append(command) + if command.startswith("test -e"): + return MagicMock(exit_code=0, stdout="exists") + if command.startswith("rg --files"): + return MagicMock(exit_code=0, stdout="a.py\n") + return MagicMock(exit_code=0, stdout="") + + with patch.object(ops, "_has_command", side_effect=lambda cmd: cmd == "rg"), \ + patch.object(ops, "_exec", side_effect=fake_exec): + result = ops.search("*.py", target="files", path=".", offset=-4, limit=-2) + + assert result.files == ["a.py"] + rg_commands = [cmd for cmd in commands if cmd.startswith("rg --files")] + assert rg_commands + assert "| head -n 1" in rg_commands[0] diff --git a/tests/tools/test_file_tools.py b/tests/tools/test_file_tools.py index 1e1fccb66..c2d75bf5d 100644 --- a/tests/tools/test_file_tools.py +++ b/tests/tools/test_file_tools.py @@ -45,6 +45,19 @@ class TestReadFileHandler: read_file_tool("/tmp/big.txt", offset=10, limit=20) mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 10, 20) + @patch("tools.file_tools._get_file_ops") + def test_invalid_offset_and_limit_are_normalized_before_dispatch(self, mock_get): + mock_ops = MagicMock() + result_obj = MagicMock() + result_obj.content = "line1" + result_obj.to_dict.return_value = {"content": "line1", "total_lines": 1} + mock_ops.read_file.return_value = result_obj + mock_get.return_value = mock_ops + + from tools.file_tools import read_file_tool + read_file_tool("/tmp/big.txt", offset=0, limit=0) + mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 1, 1) + @patch("tools.file_tools._get_file_ops") def test_exception_returns_error_json(self, mock_get): mock_get.side_effect = RuntimeError("terminal not available") @@ -191,6 +204,21 @@ class TestSearchHandler: limit=10, offset=5, output_mode="count", context=2, ) + @patch("tools.file_tools._get_file_ops") + def test_search_normalizes_invalid_pagination_before_dispatch(self, mock_get): + mock_ops = MagicMock() + result_obj = MagicMock() + result_obj.to_dict.return_value = {"files": []} + mock_ops.search.return_value = result_obj + mock_get.return_value = mock_ops + + from tools.file_tools import search_tool + search_tool(pattern="class", target="files", path="/src", limit=-5, offset=-2) + mock_ops.search.assert_called_once_with( + pattern="class", path="/src", target="files", file_glob=None, + limit=1, offset=0, output_mode="content", context=0, + ) + @patch("tools.file_tools._get_file_ops") def test_search_exception_returns_error(self, mock_get): mock_get.side_effect = RuntimeError("no terminal") diff --git a/tools/file_operations.py b/tools/file_operations.py index 87ad13968..7e75578b2 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -271,6 +271,40 @@ LINTERS = { MAX_LINES = 2000 MAX_LINE_LENGTH = 2000 MAX_FILE_SIZE = 50 * 1024 # 50KB +DEFAULT_READ_OFFSET = 1 +DEFAULT_READ_LIMIT = 500 +DEFAULT_SEARCH_OFFSET = 0 +DEFAULT_SEARCH_LIMIT = 50 + + +def _coerce_int(value: Any, default: int) -> int: + """Best-effort integer coercion for tool pagination inputs.""" + try: + return int(value) + except (TypeError, ValueError): + return default + + +def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET, + limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]: + """Return safe read_file pagination bounds. + + Tool schemas declare minimum/maximum values, but not every caller or + provider enforces schemas before dispatch. Clamp here so invalid values + cannot leak into sed ranges like ``0,-1p``. + """ + normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET)) + normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT) + normalized_limit = max(1, min(normalized_limit, MAX_LINES)) + return normalized_offset, normalized_limit + + +def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET, + limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]: + """Return safe search pagination bounds for shell head/tail pipelines.""" + normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET)) + normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT)) + return normalized_offset, normalized_limit class ShellFileOperations(FileOperations): @@ -461,8 +495,7 @@ class ShellFileOperations(FileOperations): # Expand ~ and other shell paths path = self._expand_path(path) - # Clamp limit - limit = min(limit, MAX_LINES) + offset, limit = normalize_read_pagination(offset, limit) # Check if file exists and get size (wc -c is POSIX, works on Linux + macOS) stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null" @@ -866,6 +899,8 @@ class ShellFileOperations(FileOperations): Returns: SearchResult with matches or file list """ + offset, limit = normalize_search_pagination(offset, limit) + # Expand ~ and other shell paths path = self._expand_path(path) diff --git a/tools/file_tools.py b/tools/file_tools.py index a2e72e7ec..3b6f45942 100644 --- a/tools/file_tools.py +++ b/tools/file_tools.py @@ -11,7 +11,11 @@ from typing import Optional from agent.file_safety import get_read_block_error from tools.binary_extensions import has_binary_extension -from tools.file_operations import ShellFileOperations +from tools.file_operations import ( + ShellFileOperations, + normalize_read_pagination, + normalize_search_pagination, +) from tools import file_state from agent.redact import redact_sensitive_text @@ -351,6 +355,8 @@ def clear_file_ops_cache(task_id: str = None): def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str: """Read a file with pagination and line numbers.""" try: + offset, limit = normalize_read_pagination(offset, limit) + # ── Device path guard ───────────────────────────────────────── # Block paths that would hang the process (infinite output, # blocking on input). Pure path check — no I/O. @@ -762,6 +768,8 @@ def search_tool(pattern: str, target: str = "content", path: str = ".", task_id: str = "default") -> str: """Search for content or files.""" try: + offset, limit = normalize_search_pagination(offset, limit) + # Track searches to detect *consecutive* repeated search loops. # Include pagination args so users can page through truncated # results without tripping the repeated-search guard.