diff --git a/model_tools.py b/model_tools.py index db19bb67e53..1cbc83096ac 100644 --- a/model_tools.py +++ b/model_tools.py @@ -21,6 +21,7 @@ Public API (signatures preserved from the original 2,400-line version): """ import json +import re import asyncio import logging import threading @@ -485,6 +486,48 @@ _AGENT_LOOP_TOOLS = {"todo", "memory", "session_search", "delegate_task"} _READ_SEARCH_TOOLS = {"read_file", "search_files"} +# ========================================================================= +# Tool error sanitization +# ========================================================================= +# +# Tool exceptions can carry arbitrary text into the model's context as the +# `tool` message content. json.dumps() handles quote/backslash escaping so a +# raw injection of `` won't break message framing, but the model +# still *reads* those tokens and they can confuse downstream tool-call +# parsing or, in adversarial cases, nudge it toward role-confusion framing. +# +# This helper strips structural framing tokens (XML role tags, CDATA, +# markdown code fences) and caps the message at a sane upper bound before it +# becomes part of the conversation. It's defense-in-depth — the json layer +# already prevents framing escape — but cheap and worth having. +# +# Ported from ironclaw#1639. +_TOOL_ERROR_ROLE_TAG_RE = re.compile( + r'', + re.IGNORECASE, +) +_TOOL_ERROR_FENCE_OPEN_RE = re.compile(r'^\s*```(?:json|xml|html|markdown)?\s*', re.MULTILINE) +_TOOL_ERROR_FENCE_CLOSE_RE = re.compile(r'\s*```\s*$', re.MULTILINE) +_TOOL_ERROR_CDATA_RE = re.compile(r'', re.DOTALL) +_TOOL_ERROR_MAX_LEN = 2000 + + +def _sanitize_tool_error(error_msg: str) -> str: + """Strip structural framing tokens from a tool error before showing it to the model. + + See _TOOL_ERROR_ROLE_TAG_RE docstring above for rationale. + """ + if not error_msg: + return "[TOOL_ERROR] " + sanitized = _TOOL_ERROR_ROLE_TAG_RE.sub("", error_msg) + sanitized = _TOOL_ERROR_FENCE_OPEN_RE.sub("", sanitized) + sanitized = _TOOL_ERROR_FENCE_CLOSE_RE.sub("", sanitized) + sanitized = _TOOL_ERROR_CDATA_RE.sub("", sanitized) + if len(sanitized) > _TOOL_ERROR_MAX_LEN: + sanitized = sanitized[:_TOOL_ERROR_MAX_LEN - 3] + "..." + return f"[TOOL_ERROR] {sanitized}" + + # ========================================================================= # Tool argument type coercion # ========================================================================= @@ -824,7 +867,7 @@ def handle_function_call( except Exception as e: error_msg = f"Error executing {function_name}: {str(e)}" logger.exception(error_msg) - return json.dumps({"error": error_msg}, ensure_ascii=False) + return json.dumps({"error": _sanitize_tool_error(error_msg)}, ensure_ascii=False) # ============================================================================= diff --git a/tests/test_sanitize_tool_error.py b/tests/test_sanitize_tool_error.py new file mode 100644 index 00000000000..3a0685bf3d7 --- /dev/null +++ b/tests/test_sanitize_tool_error.py @@ -0,0 +1,137 @@ +"""Tests for `_sanitize_tool_error` in model_tools. + +Ported from ironclaw#1639 — defense-in-depth on tool exception strings before +they enter the model's `tool` message content. Note that `json.dumps()` in +`handle_function_call` already handles quote/backslash escaping at the wire +layer; this helper exists to strip structural framing tokens the model +itself might react to (XML role tags, CDATA, markdown code fences) and to +cap pathological lengths. +""" +from __future__ import annotations + +from model_tools import _sanitize_tool_error, _TOOL_ERROR_MAX_LEN + + +class TestRoleTagStripping: + def test_strips_tool_call_tags(self): + out = _sanitize_tool_error("bad injected happened") + assert "" not in out + assert "" not in out + assert "bad injected happened" in out + + def test_strips_function_call_tags(self): + out = _sanitize_tool_error("x") + assert "" not in out + assert "" not in out + + def test_strips_role_tags(self): + # Each of these should be stripped + for tag in ("system", "assistant", "user", "result", "response", "output", "input"): + raw = f"prefix <{tag}>hi suffix" + out = _sanitize_tool_error(raw) + assert f"<{tag}>" not in out, f"failed to strip <{tag}>" + assert f"" not in out, f"failed to strip " + + def test_role_tag_strip_is_case_insensitive(self): + out = _sanitize_tool_error("x") + assert "<" not in out.replace("[TOOL_ERROR]", "") # only the prefix bracket survives + + def test_unrelated_xml_kept(self): + # We intentionally only strip the role-like tag whitelist, not all XML + out = _sanitize_tool_error("Error parsing line 5") + assert "" in out + + +class TestCDATAStripping: + def test_strips_cdata(self): + out = _sanitize_tool_error("error: here") + assert "" not in out + + def test_strips_multiline_cdata(self): + out = _sanitize_tool_error("a\n\nb") + assert "CDATA" not in out + assert "a" in out and "b" in out + + +class TestCodeFenceStripping: + def test_strips_leading_fence_with_lang(self): + out = _sanitize_tool_error("```json\n{\"x\": 1}") + assert not out.replace("[TOOL_ERROR] ", "").startswith("```") + + def test_strips_trailing_fence(self): + out = _sanitize_tool_error("payload\n```") + assert not out.rstrip().endswith("```") + + def test_strips_bare_fence(self): + out = _sanitize_tool_error("```\nstuff") + assert "```" not in out.split("\n")[0] + + +class TestTruncation: + def test_caps_long_input(self): + long = "A" * (_TOOL_ERROR_MAX_LEN * 2) + out = _sanitize_tool_error(long) + # Total length is prefix + truncated body + body = out[len("[TOOL_ERROR] "):] + assert len(body) == _TOOL_ERROR_MAX_LEN + assert body.endswith("...") + + def test_does_not_truncate_short_input(self): + msg = "short error" + out = _sanitize_tool_error(msg) + assert "..." not in out + assert msg in out + + +class TestEnvelope: + def test_wraps_with_prefix(self): + out = _sanitize_tool_error("oh no") + assert out.startswith("[TOOL_ERROR] ") + + def test_empty_input(self): + out = _sanitize_tool_error("") + assert out == "[TOOL_ERROR] " + + def test_preserves_normal_error_text(self): + msg = "Error executing read_file: FileNotFoundError: /tmp/missing" + out = _sanitize_tool_error(msg) + assert msg in out + + +class TestHandleFunctionCallIntegration: + """Verify handle_function_call routes exception-path errors through the sanitizer. + + Note: the "Unknown tool: ..." early-return in tools/registry.py is a + *different* code path from `except Exception` in handle_function_call — + that one returns directly without sanitization (and there's nothing to + sanitize in a hardcoded format string anyway). This test exercises the + real exception path by passing args that make a known tool raise. + """ + + def test_exception_path_error_is_sanitized(self): + import json + from model_tools import handle_function_call + from tools.registry import registry as _registry + + # Force a known tool to raise with a payload containing role tags. + def boom(_args, **_kwargs): + raise RuntimeError("injected boom") + + all_tools = _registry.get_all_tool_names() + assert all_tools, "no tools registered — test environment broken" + target = all_tools[0] + original = _registry._tools[target].handler + _registry._tools[target].handler = boom + try: + result_str = handle_function_call(target, {}) + finally: + _registry._tools[target].handler = original + + payload = json.loads(result_str) + assert "error" in payload, payload + assert payload["error"].startswith("[TOOL_ERROR] "), payload["error"] + # Role-tag stripping carried through + assert "" not in payload["error"] + assert "" not in payload["error"] + assert "boom" in payload["error"] diff --git a/tools/registry.py b/tools/registry.py index 2639eac74ed..7bb92e85f96 100644 --- a/tools/registry.py +++ b/tools/registry.py @@ -404,7 +404,16 @@ class ToolRegistry: return entry.handler(args, **kwargs) except Exception as e: logger.exception("Tool %s dispatch error: %s", name, e) - return json.dumps({"error": f"Tool execution failed: {type(e).__name__}: {e}"}) + # Route through the sanitizer so framing tokens / CDATA / fences + # in exception strings don't reach the model as structural noise. + # See model_tools._sanitize_tool_error for rationale. + raw = f"Tool execution failed: {type(e).__name__}: {e}" + try: + from model_tools import _sanitize_tool_error + sanitized = _sanitize_tool_error(raw) + except Exception: + sanitized = raw # defensive: never let the sanitizer block error propagation + return json.dumps({"error": sanitized}) # ------------------------------------------------------------------ # Query helpers (replace redundant dicts in model_tools.py)