tools: normalize file tool pagination bounds

This commit is contained in:
Yukipukii1 2026-04-22 08:38:01 +03:00 committed by Teknium
parent 3e652f75b2
commit 40619b393f
5 changed files with 145 additions and 3 deletions

View file

@@ -19,6 +19,8 @@ from tools.file_operations import (
BINARY_EXTENSIONS, BINARY_EXTENSIONS,
IMAGE_EXTENSIONS, IMAGE_EXTENSIONS,
MAX_LINE_LENGTH, MAX_LINE_LENGTH,
normalize_read_pagination,
normalize_search_pagination,
) )
@@ -192,6 +194,17 @@ def file_ops(mock_env):
class TestShellFileOpsHelpers: class TestShellFileOpsHelpers:
def test_normalize_read_pagination_clamps_invalid_values(self):
    """Junk read_file pagination inputs collapse to safe (offset, limit) bounds."""
    expectations = [
        ({"offset": 0, "limit": 0}, (1, 1)),
        ({"offset": -10, "limit": -5}, (1, 1)),
        ({"offset": "bad", "limit": "bad"}, (1, 500)),
        ({"offset": 2, "limit": 999999}, (2, 2000)),
    ]
    for kwargs, expected in expectations:
        assert normalize_read_pagination(**kwargs) == expected
def test_normalize_search_pagination_clamps_invalid_values(self):
    """Junk search pagination inputs collapse to safe (offset, limit) bounds."""
    expectations = [
        ({"offset": -10, "limit": -5}, (0, 1)),
        ({"offset": "bad", "limit": "bad"}, (0, 50)),
        ({"offset": 3, "limit": 0}, (3, 1)),
    ]
    for kwargs, expected in expectations:
        assert normalize_search_pagination(**kwargs) == expected
def test_escape_shell_arg_simple(self, file_ops): def test_escape_shell_arg_simple(self, file_ops):
assert file_ops._escape_shell_arg("hello") == "'hello'" assert file_ops._escape_shell_arg("hello") == "'hello'"

View file

@@ -146,3 +146,61 @@ class TestCheckLintBracePaths:
assert result.success is False assert result.success is False
assert "SyntaxError" in result.output assert "SyntaxError" in result.output
# =========================================================================
# Pagination bounds
# =========================================================================
class TestPaginationBounds:
    """Invalid pagination inputs should not leak into shell commands."""

    @staticmethod
    def _scripted_exec(log, script):
        """Build an ``_exec`` stand-in: records each command in *log* and
        answers with the stdout paired to the first matching prefix."""
        def runner(command, *args, **kwargs):
            log.append(command)
            for prefix, stdout in script:
                if command.startswith(prefix):
                    return MagicMock(exit_code=0, stdout=stdout)
            return MagicMock(exit_code=0, stdout="")
        return runner

    def test_read_file_clamps_offset_and_limit_before_building_sed_range(self):
        env = MagicMock()
        env.cwd = "/tmp"
        ops = ShellFileOperations(env)
        seen = []
        exec_stub = self._scripted_exec(seen, [
            ("wc -c", "12"),
            ("head -c", "line1\nline2\n"),
            ("sed -n", "line1\n"),
            ("wc -l", "2"),
        ])

        with patch.object(ops, "_exec", side_effect=exec_stub):
            outcome = ops.read_file("notes.txt", offset=0, limit=0)

        assert outcome.error is None
        assert " 1|line1" in outcome.content
        # offset=0/limit=0 must have been clamped to 1/1 before the sed range
        # was formatted — never "sed -n '0,-1p' ...".
        sed_calls = [c for c in seen if c.startswith("sed -n")]
        assert sed_calls == ["sed -n '1,1p' 'notes.txt'"]

    def test_search_clamps_offset_and_limit_before_building_head_pipeline(self):
        env = MagicMock()
        env.cwd = "/tmp"
        ops = ShellFileOperations(env)
        seen = []
        exec_stub = self._scripted_exec(seen, [
            ("test -e", "exists"),
            ("rg --files", "a.py\n"),
        ])

        with patch.object(ops, "_has_command", side_effect=lambda cmd: cmd == "rg"), \
             patch.object(ops, "_exec", side_effect=exec_stub):
            outcome = ops.search("*.py", target="files", path=".", offset=-4, limit=-2)

        assert outcome.files == ["a.py"]
        # limit=-2 must have been clamped to 1 before the head pipeline was built.
        rg_calls = [c for c in seen if c.startswith("rg --files")]
        assert rg_calls
        assert "| head -n 1" in rg_calls[0]

View file

@@ -45,6 +45,19 @@ class TestReadFileHandler:
read_file_tool("/tmp/big.txt", offset=10, limit=20) read_file_tool("/tmp/big.txt", offset=10, limit=20)
mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 10, 20) mock_ops.read_file.assert_called_once_with("/tmp/big.txt", 10, 20)
@patch("tools.file_tools._get_file_ops")
def test_invalid_offset_and_limit_are_normalized_before_dispatch(self, mock_get):
    """Zero offset/limit are clamped to (1, 1) before reaching file ops."""
    fake_result = MagicMock()
    fake_result.content = "line1"
    fake_result.to_dict.return_value = {"content": "line1", "total_lines": 1}
    ops_stub = MagicMock()
    ops_stub.read_file.return_value = fake_result
    mock_get.return_value = ops_stub

    from tools.file_tools import read_file_tool

    read_file_tool("/tmp/big.txt", offset=0, limit=0)
    ops_stub.read_file.assert_called_once_with("/tmp/big.txt", 1, 1)
@patch("tools.file_tools._get_file_ops") @patch("tools.file_tools._get_file_ops")
def test_exception_returns_error_json(self, mock_get): def test_exception_returns_error_json(self, mock_get):
mock_get.side_effect = RuntimeError("terminal not available") mock_get.side_effect = RuntimeError("terminal not available")
@@ -191,6 +204,21 @@ class TestSearchHandler:
limit=10, offset=5, output_mode="count", context=2, limit=10, offset=5, output_mode="count", context=2,
) )
@patch("tools.file_tools._get_file_ops")
def test_search_normalizes_invalid_pagination_before_dispatch(self, mock_get):
    """Negative offset/limit are clamped to (0, 1) before reaching file ops."""
    fake_result = MagicMock()
    fake_result.to_dict.return_value = {"files": []}
    ops_stub = MagicMock()
    ops_stub.search.return_value = fake_result
    mock_get.return_value = ops_stub

    from tools.file_tools import search_tool

    search_tool(pattern="class", target="files", path="/src", limit=-5, offset=-2)
    ops_stub.search.assert_called_once_with(
        pattern="class", path="/src", target="files", file_glob=None,
        limit=1, offset=0, output_mode="content", context=0,
    )
@patch("tools.file_tools._get_file_ops") @patch("tools.file_tools._get_file_ops")
def test_search_exception_returns_error(self, mock_get): def test_search_exception_returns_error(self, mock_get):
mock_get.side_effect = RuntimeError("no terminal") mock_get.side_effect = RuntimeError("no terminal")

View file

@@ -271,6 +271,40 @@ LINTERS = {
MAX_LINES = 2000 MAX_LINES = 2000
MAX_LINE_LENGTH = 2000 MAX_LINE_LENGTH = 2000
MAX_FILE_SIZE = 50 * 1024 # 50KB MAX_FILE_SIZE = 50 * 1024 # 50KB
DEFAULT_READ_OFFSET = 1
DEFAULT_READ_LIMIT = 500
DEFAULT_SEARCH_OFFSET = 0
DEFAULT_SEARCH_LIMIT = 50
def _coerce_int(value: Any, default: int) -> int:
"""Best-effort integer coercion for tool pagination inputs."""
try:
return int(value)
except (TypeError, ValueError):
return default
def normalize_read_pagination(offset: Any = DEFAULT_READ_OFFSET,
limit: Any = DEFAULT_READ_LIMIT) -> tuple[int, int]:
"""Return safe read_file pagination bounds.
Tool schemas declare minimum/maximum values, but not every caller or
provider enforces schemas before dispatch. Clamp here so invalid values
cannot leak into sed ranges like ``0,-1p``.
"""
normalized_offset = max(1, _coerce_int(offset, DEFAULT_READ_OFFSET))
normalized_limit = _coerce_int(limit, DEFAULT_READ_LIMIT)
normalized_limit = max(1, min(normalized_limit, MAX_LINES))
return normalized_offset, normalized_limit
def normalize_search_pagination(offset: Any = DEFAULT_SEARCH_OFFSET,
limit: Any = DEFAULT_SEARCH_LIMIT) -> tuple[int, int]:
"""Return safe search pagination bounds for shell head/tail pipelines."""
normalized_offset = max(0, _coerce_int(offset, DEFAULT_SEARCH_OFFSET))
normalized_limit = max(1, _coerce_int(limit, DEFAULT_SEARCH_LIMIT))
return normalized_offset, normalized_limit
class ShellFileOperations(FileOperations): class ShellFileOperations(FileOperations):
@@ -461,8 +495,7 @@ class ShellFileOperations(FileOperations):
# Expand ~ and other shell paths # Expand ~ and other shell paths
path = self._expand_path(path) path = self._expand_path(path)
# Clamp limit offset, limit = normalize_read_pagination(offset, limit)
limit = min(limit, MAX_LINES)
# Check if file exists and get size (wc -c is POSIX, works on Linux + macOS) # Check if file exists and get size (wc -c is POSIX, works on Linux + macOS)
stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null" stat_cmd = f"wc -c < {self._escape_shell_arg(path)} 2>/dev/null"
@@ -866,6 +899,8 @@ class ShellFileOperations(FileOperations):
Returns: Returns:
SearchResult with matches or file list SearchResult with matches or file list
""" """
offset, limit = normalize_search_pagination(offset, limit)
# Expand ~ and other shell paths # Expand ~ and other shell paths
path = self._expand_path(path) path = self._expand_path(path)

View file

@@ -11,7 +11,11 @@ from typing import Optional
from agent.file_safety import get_read_block_error from agent.file_safety import get_read_block_error
from tools.binary_extensions import has_binary_extension from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations from tools.file_operations import (
ShellFileOperations,
normalize_read_pagination,
normalize_search_pagination,
)
from tools import file_state from tools import file_state
from agent.redact import redact_sensitive_text from agent.redact import redact_sensitive_text
@@ -351,6 +355,8 @@ def clear_file_ops_cache(task_id: str = None):
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str: def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers.""" """Read a file with pagination and line numbers."""
try: try:
offset, limit = normalize_read_pagination(offset, limit)
# ── Device path guard ───────────────────────────────────────── # ── Device path guard ─────────────────────────────────────────
# Block paths that would hang the process (infinite output, # Block paths that would hang the process (infinite output,
# blocking on input). Pure path check — no I/O. # blocking on input). Pure path check — no I/O.
@@ -762,6 +768,8 @@ def search_tool(pattern: str, target: str = "content", path: str = ".",
task_id: str = "default") -> str: task_id: str = "default") -> str:
"""Search for content or files.""" """Search for content or files."""
try: try:
offset, limit = normalize_search_pagination(offset, limit)
# Track searches to detect *consecutive* repeated search loops. # Track searches to detect *consecutive* repeated search loops.
# Include pagination args so users can page through truncated # Include pagination args so users can page through truncated
# results without tripping the repeated-search guard. # results without tripping the repeated-search guard.