fix: strip ANSI at the source — clean terminal output before it reaches the model

Root cause: terminal_tool, execute_code, and process_registry returned raw
subprocess output with ANSI escape sequences intact. The model saw these
in tool results and copied them into file writes.

Previous fix (PR #2532) stripped ANSI at the write point in file_tools.py,
but this was a band-aid — regex on file content risks corrupting legitimate
content, and doesn't prevent ANSI from wasting tokens in the model context.

Source-level fix:
- New tools/ansi_strip.py with comprehensive ECMA-48 regex covering CSI
  (incl. private-mode, colon-separated, intermediate bytes), OSC (both
  terminators), DCS/SOS/PM/APC strings, Fp/Fe/Fs/nF escapes, 8-bit C1
- terminal_tool.py: strip output before returning to model
- code_execution_tool.py: strip stdout/stderr before returning
- process_registry.py: strip output in poll/read_log/wait
- file_tools.py: remove _strip_ansi band-aid (no longer needed)

Verified: `ls --color=always` output returned as clean text to model,
file written from that output contains zero ESC bytes.
This commit is contained in:
Teknium 2026-03-23 06:50:39 -07:00
parent 6302e56e7c
commit 934fbe3c06
No known key found for this signature in database
7 changed files with 236 additions and 21 deletions

View file

@ -0,0 +1,168 @@
"""Comprehensive tests for ANSI escape sequence stripping (ECMA-48).
The strip_ansi function in tools/ansi_strip.py is the source-level fix for
ANSI codes leaking into the model's context via terminal/execute_code output.
It must strip ALL terminal escape sequences while preserving legitimate text.
"""
from tools.ansi_strip import strip_ansi
class TestStripAnsiBasicSGR:
"""Select Graphic Rendition — the most common ANSI sequences."""
def test_reset(self):
assert strip_ansi("\x1b[0m") == ""
def test_color(self):
assert strip_ansi("\x1b[31;1m") == ""
def test_truecolor_semicolon(self):
assert strip_ansi("\x1b[38;2;255;0;0m") == ""
def test_truecolor_colon_separated(self):
"""Modern terminals use colon-separated SGR params."""
assert strip_ansi("\x1b[38:2:255:0:0m") == ""
assert strip_ansi("\x1b[48:2:0:255:0m") == ""
class TestStripAnsiCSIPrivateMode:
"""CSI sequences with ? prefix (DEC private modes)."""
def test_cursor_show_hide(self):
assert strip_ansi("\x1b[?25h") == ""
assert strip_ansi("\x1b[?25l") == ""
def test_alt_screen(self):
assert strip_ansi("\x1b[?1049h") == ""
assert strip_ansi("\x1b[?1049l") == ""
def test_bracketed_paste(self):
assert strip_ansi("\x1b[?2004h") == ""
class TestStripAnsiCSIIntermediate:
"""CSI sequences with intermediate bytes (space, etc.)."""
def test_cursor_shape(self):
assert strip_ansi("\x1b[0 q") == ""
assert strip_ansi("\x1b[2 q") == ""
assert strip_ansi("\x1b[6 q") == ""
class TestStripAnsiOSC:
"""Operating System Command sequences."""
def test_bel_terminator(self):
assert strip_ansi("\x1b]0;title\x07") == ""
def test_st_terminator(self):
assert strip_ansi("\x1b]0;title\x1b\\") == ""
def test_hyperlink_preserves_text(self):
assert strip_ansi(
"\x1b]8;;https://example.com\x1b\\click\x1b]8;;\x1b\\"
) == "click"
class TestStripAnsiDECPrivate:
"""DEC private / Fp escape sequences."""
def test_save_restore_cursor(self):
assert strip_ansi("\x1b7") == ""
assert strip_ansi("\x1b8") == ""
def test_keypad_modes(self):
assert strip_ansi("\x1b=") == ""
assert strip_ansi("\x1b>") == ""
class TestStripAnsiFe:
"""Fe (C1 as 7-bit) escape sequences."""
def test_reverse_index(self):
assert strip_ansi("\x1bM") == ""
def test_reset_terminal(self):
assert strip_ansi("\x1bc") == ""
def test_index_and_newline(self):
assert strip_ansi("\x1bD") == ""
assert strip_ansi("\x1bE") == ""
class TestStripAnsiNF:
"""nF (character set selection) sequences."""
def test_charset_selection(self):
assert strip_ansi("\x1b(A") == ""
assert strip_ansi("\x1b(B") == ""
assert strip_ansi("\x1b(0") == ""
class TestStripAnsiDCS:
"""Device Control String sequences."""
def test_dcs(self):
assert strip_ansi("\x1bP+q\x1b\\") == ""
class TestStripAnsi8BitC1:
"""8-bit C1 control characters."""
def test_8bit_csi(self):
assert strip_ansi("\x9b31m") == ""
assert strip_ansi("\x9b38;2;255;0;0m") == ""
def test_8bit_standalone(self):
assert strip_ansi("\x9c") == ""
assert strip_ansi("\x9d") == ""
assert strip_ansi("\x90") == ""
class TestStripAnsiRealWorld:
"""Real-world contamination scenarios from bug reports."""
def test_colored_shebang(self):
"""The original reported bug: shebang corrupted by color codes."""
assert strip_ansi(
"\x1b[32m#!/usr/bin/env python3\x1b[0m\nprint('hello')"
) == "#!/usr/bin/env python3\nprint('hello')"
def test_stacked_sgr(self):
assert strip_ansi(
"\x1b[1m\x1b[31m\x1b[42mhello\x1b[0m"
) == "hello"
def test_ansi_mid_code(self):
assert strip_ansi(
"def foo(\x1b[33m):\x1b[0m\n return 42"
) == "def foo():\n return 42"
class TestStripAnsiPassthrough:
"""Clean content must pass through unmodified."""
def test_plain_text(self):
assert strip_ansi("normal text") == "normal text"
def test_empty(self):
assert strip_ansi("") == ""
def test_none(self):
assert strip_ansi(None) is None
def test_whitespace_preserved(self):
assert strip_ansi("line1\nline2\ttab") == "line1\nline2\ttab"
def test_unicode_safe(self):
assert strip_ansi("emoji 🎉 and ñ café") == "emoji 🎉 and ñ café"
def test_backslash_in_code(self):
code = "path = 'C:\\\\Users\\\\test'"
assert strip_ansi(code) == code
def test_square_brackets_in_code(self):
"""Array indexing must not be confused with CSI."""
code = "arr[0] = arr[31]"
assert strip_ansi(code) == code

View file

@ -309,3 +309,6 @@ class TestSearchHints:
raw = search_tool(pattern="foo", offset=50, limit=50) raw = search_tool(pattern="foo", offset=50, limit=50)
assert "[Hint:" in raw assert "[Hint:" in raw
assert "offset=100" in raw assert "offset=100" in raw

44
tools/ansi_strip.py Normal file
View file

@ -0,0 +1,44 @@
"""Strip ANSI escape sequences from subprocess output.
Used by terminal_tool, code_execution_tool, and process_registry to clean
command output before returning it to the model. This prevents ANSI codes
from entering the model's context — which is the root cause of models
copying escape sequences into file writes.
Covers the full ECMA-48 spec: CSI (including private-mode ``?`` prefix,
colon-separated params, intermediate bytes), OSC (BEL and ST terminators),
DCS/SOS/PM/APC string sequences, nF multi-byte escapes, Fp/Fe/Fs
single-byte escapes, and 8-bit C1 control characters.
"""
import re
_ANSI_ESCAPE_RE = re.compile(
r"\x1b"
r"(?:"
r"\[[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]" # CSI sequence
r"|\][\s\S]*?(?:\x07|\x1b\\)" # OSC (BEL or ST terminator)
r"|[PX^_][\s\S]*?(?:\x1b\\)" # DCS/SOS/PM/APC strings
r"|[\x20-\x2f]+[\x30-\x7e]" # nF escape sequences
r"|[\x30-\x7e]" # Fp/Fe/Fs single-byte
r")"
r"|\x9b[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e]" # 8-bit CSI
r"|\x9d[\s\S]*?(?:\x07|\x9c)" # 8-bit OSC
r"|[\x80-\x9f]", # Other 8-bit C1 controls
re.DOTALL,
)
# Fast-path check — skip full regex when no escape-like bytes are present.
_HAS_ESCAPE = re.compile(r"[\x1b\x80-\x9f]")
def strip_ansi(text: str) -> str:
"""Remove ANSI escape sequences from text.
Returns the input unchanged (fast path) when no ESC or C1 bytes are
present. Safe to call on any string clean text passes through
with negligible overhead.
"""
if not text or not _HAS_ESCAPE.search(text):
return text
return _ANSI_ESCAPE_RE.sub("", text)

View file

@ -577,6 +577,12 @@ def execute_code(
server_sock = None # prevent double close in finally server_sock = None # prevent double close in finally
rpc_thread.join(timeout=3) rpc_thread.join(timeout=3)
# Strip ANSI escape sequences so the model never sees terminal
# formatting — prevents it from copying escapes into file writes.
from tools.ansi_strip import strip_ansi
stdout_text = strip_ansi(stdout_text)
stderr_text = strip_ansi(stderr_text)
# Build response # Build response
result: Dict[str, Any] = { result: Dict[str, Any] = {
"status": status, "status": status,

View file

@ -5,7 +5,6 @@ import errno
import json import json
import logging import logging
import os import os
import re
import threading import threading
from typing import Optional from typing import Optional
from tools.file_operations import ShellFileOperations from tools.file_operations import ShellFileOperations
@ -13,17 +12,6 @@ from agent.redact import redact_sensitive_text
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Regex to match ANSI escape sequences (CSI codes, OSC codes, simple escapes).
# Models occasionally copy these from terminal output into file content.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[A-Za-z]|\x1b\][^\x07]*\x07|\x1b[()][A-B012]|\x1b[=>]")
def _strip_ansi(text: str) -> str:
"""Remove ANSI escape sequences from text destined for file writes."""
if not text or "\x1b" not in text:
return text
return _ANSI_ESCAPE_RE.sub("", text)
_EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS} _EXPECTED_WRITE_ERRNOS = {errno.EACCES, errno.EPERM, errno.EROFS}
@ -301,7 +289,6 @@ def notify_other_tool_call(task_id: str = "default"):
def write_file_tool(path: str, content: str, task_id: str = "default") -> str: def write_file_tool(path: str, content: str, task_id: str = "default") -> str:
"""Write content to a file.""" """Write content to a file."""
try: try:
content = _strip_ansi(content)
file_ops = _get_file_ops(task_id) file_ops = _get_file_ops(task_id)
result = file_ops.write_file(path, content) result = file_ops.write_file(path, content)
return json.dumps(result.to_dict(), ensure_ascii=False) return json.dumps(result.to_dict(), ensure_ascii=False)
@ -325,13 +312,10 @@ def patch_tool(mode: str = "replace", path: str = None, old_string: str = None,
return json.dumps({"error": "path required"}) return json.dumps({"error": "path required"})
if old_string is None or new_string is None: if old_string is None or new_string is None:
return json.dumps({"error": "old_string and new_string required"}) return json.dumps({"error": "old_string and new_string required"})
old_string = _strip_ansi(old_string)
new_string = _strip_ansi(new_string)
result = file_ops.patch_replace(path, old_string, new_string, replace_all) result = file_ops.patch_replace(path, old_string, new_string, replace_all)
elif mode == "patch": elif mode == "patch":
if not patch: if not patch:
return json.dumps({"error": "patch content required"}) return json.dumps({"error": "patch content required"})
patch = _strip_ansi(patch)
result = file_ops.patch_v4a(patch) result = file_ops.patch_v4a(patch)
else: else:
return json.dumps({"error": f"Unknown mode: {mode}"}) return json.dumps({"error": f"Unknown mode: {mode}"})

View file

@ -426,12 +426,14 @@ class ProcessRegistry:
def poll(self, session_id: str) -> dict: def poll(self, session_id: str) -> dict:
"""Check status and get new output for a background process.""" """Check status and get new output for a background process."""
from tools.ansi_strip import strip_ansi
session = self.get(session_id) session = self.get(session_id)
if session is None: if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"} return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock: with session._lock:
output_preview = session.output_buffer[-1000:] if session.output_buffer else "" output_preview = strip_ansi(session.output_buffer[-1000:]) if session.output_buffer else ""
result = { result = {
"session_id": session.id, "session_id": session.id,
@ -450,12 +452,14 @@ class ProcessRegistry:
def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict: def read_log(self, session_id: str, offset: int = 0, limit: int = 200) -> dict:
"""Read the full output log with optional pagination by lines.""" """Read the full output log with optional pagination by lines."""
from tools.ansi_strip import strip_ansi
session = self.get(session_id) session = self.get(session_id)
if session is None: if session is None:
return {"status": "not_found", "error": f"No process with ID {session_id}"} return {"status": "not_found", "error": f"No process with ID {session_id}"}
with session._lock: with session._lock:
full_output = session.output_buffer full_output = strip_ansi(session.output_buffer)
lines = full_output.splitlines() lines = full_output.splitlines()
total_lines = len(lines) total_lines = len(lines)
@ -486,6 +490,7 @@ class ProcessRegistry:
dict with status ("exited", "timeout", "interrupted", "not_found") dict with status ("exited", "timeout", "interrupted", "not_found")
and output snapshot. and output snapshot.
""" """
from tools.ansi_strip import strip_ansi
from tools.terminal_tool import _interrupt_event from tools.terminal_tool import _interrupt_event
default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180")) default_timeout = int(os.getenv("TERMINAL_TIMEOUT", "180"))
@ -513,7 +518,7 @@ class ProcessRegistry:
result = { result = {
"status": "exited", "status": "exited",
"exit_code": session.exit_code, "exit_code": session.exit_code,
"output": session.output_buffer[-2000:], "output": strip_ansi(session.output_buffer[-2000:]),
} }
if timeout_note: if timeout_note:
result["timeout_note"] = timeout_note result["timeout_note"] = timeout_note
@ -522,7 +527,7 @@ class ProcessRegistry:
if _interrupt_event.is_set(): if _interrupt_event.is_set():
result = { result = {
"status": "interrupted", "status": "interrupted",
"output": session.output_buffer[-1000:], "output": strip_ansi(session.output_buffer[-1000:]),
"note": "User sent a new message -- wait interrupted", "note": "User sent a new message -- wait interrupted",
} }
if timeout_note: if timeout_note:
@ -533,7 +538,7 @@ class ProcessRegistry:
result = { result = {
"status": "timeout", "status": "timeout",
"output": session.output_buffer[-1000:], "output": strip_ansi(session.output_buffer[-1000:]),
} }
if timeout_note: if timeout_note:
result["timeout_note"] = timeout_note result["timeout_note"] = timeout_note

View file

@ -1163,6 +1163,11 @@ def terminal_tool(
) )
output = output[:head_chars] + truncated_notice + output[-tail_chars:] output = output[:head_chars] + truncated_notice + output[-tail_chars:]
# Strip ANSI escape sequences so the model never sees terminal
# formatting — prevents it from copying escapes into file writes.
from tools.ansi_strip import strip_ansi
output = strip_ansi(output)
# Redact secrets from command output (catches env/printenv leaking keys) # Redact secrets from command output (catches env/printenv leaking keys)
from agent.redact import redact_sensitive_text from agent.redact import redact_sensitive_text
output = redact_sensitive_text(output.strip()) if output else "" output = redact_sensitive_text(output.strip()) if output else ""