diff --git a/agent/context_compressor.py b/agent/context_compressor.py index e7a14faf51b..8eadcf26ef8 100644 --- a/agent/context_compressor.py +++ b/agent/context_compressor.py @@ -221,6 +221,114 @@ def _truncate_tool_call_args_json(args: str, head_chars: int = 200) -> str: return json.dumps(shrunken, ensure_ascii=False) +_IMAGE_PART_TYPES = frozenset({"image_url", "input_image", "image"}) + + +def _is_image_part(part: Any) -> bool: + """True if ``part`` is a multimodal image content block. + + Recognizes all three shapes the agent handles: + - OpenAI chat.completions: ``{"type": "image_url", "image_url": ...}`` + - OpenAI Responses API: ``{"type": "input_image", "image_url": "..."}`` + - Anthropic native: ``{"type": "image", "source": {...}}`` + """ + if not isinstance(part, dict): + return False + return part.get("type") in _IMAGE_PART_TYPES + + +def _content_has_images(content: Any) -> bool: + """True if a message's ``content`` is a multimodal list with image parts.""" + if not isinstance(content, list): + return False + return any(_is_image_part(p) for p in content) + + +def _strip_images_from_content(content: Any) -> Any: + """Return a copy of ``content`` with every image part replaced by a + short text placeholder. + + - String content is returned unchanged. + - Non-list, non-string content is returned unchanged. + - List content: image parts become ``{"type": "text", "text": "[Attached + image — stripped after compression]"}``; other parts are preserved as-is. + + Input is never mutated. + """ + if not isinstance(content, list): + return content + if not any(_is_image_part(p) for p in content): + return content + + new_parts: List[Any] = [] + for p in content: + if _is_image_part(p): + new_parts.append({ + "type": "text", + "text": "[Attached image — stripped after compression]", + }) + else: + new_parts.append(p) + return new_parts + + +def _strip_historical_media(messages: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Replace image parts in older messages with placeholder text. + + The anchor is the *last* user message that has any image content. Every + message before that anchor gets its image parts replaced with a short + placeholder so the outgoing request stops re-shipping the same multi-MB + base-64 image blobs on every turn. + + If no user message carries images, the list is returned unchanged. + If the only user message with images is the very first one (nothing + earlier to strip), the list is returned unchanged. + + Shallow copies of touched messages only; input is never mutated. + Port of Kilo-Org/kilocode#9434 (adapted for the OpenAI-style message + shape the hermes compressor emits). + """ + if not messages: + return messages + + # Find the newest user message that carries at least one image part. + # We anchor on image-bearing user messages (not all user messages) so + # a plain text follow-up after a big-image turn still strips the old + # image — matching the problem kilocode#9434 set out to solve. + anchor = -1 + for i in range(len(messages) - 1, -1, -1): + msg = messages[i] + if not isinstance(msg, dict): + continue + if msg.get("role") != "user": + continue + if _content_has_images(msg.get("content")): + anchor = i + break + + if anchor <= 0: + # No image-bearing user message, or it's the very first message — + # nothing before it to strip. + return messages + + changed = False + result: List[Dict[str, Any]] = [] + for i, msg in enumerate(messages): + if i >= anchor or not isinstance(msg, dict): + result.append(msg) + continue + content = msg.get("content") + if not _content_has_images(content): + result.append(msg) + continue + new_msg = msg.copy() + new_msg["content"] = _strip_images_from_content(content) + result.append(new_msg) + changed = True + + return result if changed else messages + + def _summarize_tool_result(tool_name: str, tool_args: str, tool_content: str) -> str: """Create an informative 1-line summary of a tool call + result. @@ -1559,6 +1667,14 @@ The user has requested that this compaction PRIORITISE preserving all informatio compressed = self._sanitize_tool_pairs(compressed) + # Replace image parts in all compressed messages before the newest + # image-bearing user turn with a short text placeholder. Without + # this, tail messages keep their original multi-MB base-64 image + # payloads forever, which can push every subsequent API request + # past the provider's body-size limit and wedge the session. + # Port of Kilo-Org/kilocode#9434. + compressed = _strip_historical_media(compressed) + new_estimate = estimate_messages_tokens_rough(compressed) saved_estimate = display_tokens - new_estimate diff --git a/tests/agent/test_compressor_historical_media.py b/tests/agent/test_compressor_historical_media.py new file mode 100644 index 00000000000..3594ef9bdde --- /dev/null +++ b/tests/agent/test_compressor_historical_media.py @@ -0,0 +1,266 @@ +"""Tests for post-compression historical-media stripping. + +Port of Kilo-Org/kilocode#9434 (adapted for OpenAI-style message lists). +Without this pass, tail messages keep their original multi-MB base-64 image +payloads after context compression, and every subsequent request re-ships +them — sometimes breaching provider body-size limits and wedging the +session. +""" + +from __future__ import annotations + +from unittest.mock import patch + +import pytest + +from agent.context_compressor import ( + ContextCompressor, + _content_has_images, + _is_image_part, + _strip_historical_media, + _strip_images_from_content, +) + + +IMG_URL = { + "type": "image_url", + "image_url": {"url": "data:image/png;base64," + ("A" * 1024)}, +} +INPUT_IMG = { + "type": "input_image", + "image_url": "data:image/png;base64," + ("B" * 1024), +} +ANTHROPIC_IMG = { + "type": "image", + "source": {"type": "base64", "media_type": "image/png", "data": "C" * 1024}, +} +TEXT = {"type": "text", "text": "hi"} +INPUT_TEXT = {"type": "input_text", "text": "hi"} + + +class TestIsImagePart: + def test_openai_chat_shape(self): + assert _is_image_part(IMG_URL) is True + + def test_openai_responses_shape(self): + assert _is_image_part(INPUT_IMG) is True + + def test_anthropic_native_shape(self): + assert _is_image_part(ANTHROPIC_IMG) is True + + def test_text_part_is_not_image(self): + assert _is_image_part(TEXT) is False + assert _is_image_part(INPUT_TEXT) is False + + def test_non_dict_rejected(self): + assert _is_image_part("image") is False + assert _is_image_part(None) is False + assert _is_image_part(42) is False + + +class TestContentHasImages: + def test_string_content(self): + assert _content_has_images("a string") is False + + def test_empty_list(self): + assert _content_has_images([]) is False + + def test_text_only_list(self): + assert _content_has_images([TEXT, TEXT]) is False + + def test_list_with_image(self): + assert _content_has_images([TEXT, IMG_URL]) is True + + def test_none(self): + assert _content_has_images(None) is False + + +class TestStripImagesFromContent: + def test_string_passthrough(self): + assert _strip_images_from_content("hello") == "hello" + + def test_none_passthrough(self): + assert _strip_images_from_content(None) is None + + def test_text_only_passthrough(self): + parts = [TEXT, {"type": "text", "text": "world"}] + assert _strip_images_from_content(parts) == parts + + def test_replaces_image_with_placeholder(self): + parts = [TEXT, IMG_URL] + out = _strip_images_from_content(parts) + assert len(out) == 2 + assert out[0] == TEXT + assert out[1] == { + "type": "text", + "text": "[Attached image — stripped after compression]", + } + + def test_does_not_mutate_input(self): + parts = [IMG_URL, TEXT] + _ = _strip_images_from_content(parts) + assert parts[0] is IMG_URL # original list untouched + assert parts[1] is TEXT + + def test_handles_all_three_shapes(self): + parts = [IMG_URL, INPUT_IMG, ANTHROPIC_IMG, TEXT] + out = _strip_images_from_content(parts) + assert sum(1 for p in out if p.get("type") == "text") == 4 + assert not any(_is_image_part(p) for p in out) + + +class TestStripHistoricalMedia: + def test_empty_passthrough(self): + assert _strip_historical_media([]) == [] + + def test_no_images_anywhere(self): + msgs = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hey"}, + {"role": "user", "content": "bye"}, + ] + assert _strip_historical_media(msgs) is msgs # identity — no copy + + def test_single_image_user_only_first_message(self): + # Only image-bearing user is the first message — nothing before it. + msgs = [ + {"role": "user", "content": [TEXT, IMG_URL]}, + {"role": "assistant", "content": "ok"}, + ] + out = _strip_historical_media(msgs) + assert out is msgs # no-op + # Image still there. + assert _content_has_images(out[0]["content"]) + + def test_strips_older_user_image_keeps_newest(self): + msgs = [ + {"role": "user", "content": [TEXT, IMG_URL]}, # old — strip + {"role": "assistant", "content": "looked at it"}, + {"role": "user", "content": [TEXT, INPUT_IMG]}, # newest — keep + ] + out = _strip_historical_media(msgs) + assert out is not msgs # new list + # First message's image was replaced + assert not _content_has_images(out[0]["content"]) + # Newest user still has its image + assert _content_has_images(out[2]["content"]) + + def test_strips_assistant_and_tool_images_before_anchor(self): + msgs = [ + {"role": "user", "content": [TEXT, IMG_URL]}, # old user + {"role": "assistant", "content": [TEXT, IMG_URL]}, # old assistant + {"role": "tool", "content": [TEXT, IMG_URL], "tool_call_id": "t1"}, + {"role": "user", "content": [TEXT, IMG_URL]}, # newest user — keep + ] + out = _strip_historical_media(msgs) + for i in range(3): + assert not _content_has_images(out[i]["content"]), f"msg {i} still has image" + assert _content_has_images(out[3]["content"]) + + def test_text_only_newest_user_still_strips_older_images(self): + # The anchor is "newest user WITH images". If the newest user is + # text-only, we fall back to the previous image-bearing user turn. + msgs = [ + {"role": "user", "content": [TEXT, IMG_URL]}, + {"role": "assistant", "content": "ok"}, + {"role": "user", "content": [TEXT, IMG_URL]}, # anchor + {"role": "assistant", "content": "done"}, + {"role": "user", "content": "follow-up text only"}, + ] + out = _strip_historical_media(msgs) + # First image-bearing user (index 0) was stripped — it was before the + # newest image-bearing user (index 2). + assert not _content_has_images(out[0]["content"]) + # Anchor (index 2) keeps its image. + assert _content_has_images(out[2]["content"]) + + def test_no_image_bearing_user_is_noop(self): + msgs = [ + {"role": "user", "content": "first"}, + {"role": "assistant", "content": [TEXT, IMG_URL]}, # assistant image only + {"role": "user", "content": "second"}, + ] + out = _strip_historical_media(msgs) + # No image-bearing user anchor → no stripping. + assert out is msgs + assert _content_has_images(out[1]["content"]) + + def test_does_not_mutate_input_messages(self): + msg0 = {"role": "user", "content": [TEXT, IMG_URL]} + msg1 = {"role": "user", "content": [TEXT, IMG_URL]} + msgs = [msg0, msg1] + _ = _strip_historical_media(msgs) + # Originals untouched + assert _content_has_images(msg0["content"]) + assert _content_has_images(msg1["content"]) + + def test_idempotent(self): + msgs = [ + {"role": "user", "content": [TEXT, IMG_URL]}, + {"role": "assistant", "content": "k"}, + {"role": "user", "content": [TEXT, IMG_URL]}, + ] + first = _strip_historical_media(msgs) + second = _strip_historical_media(first) + # Second pass is a no-op — no images left before the anchor. + assert second is first + + def test_non_dict_messages_pass_through(self): + msgs = [ + "not-a-dict", # shouldn't crash + {"role": "user", "content": [TEXT, IMG_URL]}, + {"role": "assistant", "content": "ok"}, + {"role": "user", "content": [TEXT, IMG_URL]}, + ] + out = _strip_historical_media(msgs) + assert out[0] == "not-a-dict" + # Image-bearing user at index 1 is before the anchor (index 3) → stripped. + assert not _content_has_images(out[1]["content"]) + + +class TestCompressIntegration: + """Verify the stripping runs inside ContextCompressor.compress().""" + + @pytest.fixture + def compressor(self): + with patch("agent.context_compressor.get_model_context_length", return_value=100_000): + c = ContextCompressor( + model="test/model", + threshold_percent=0.50, + protect_first_n=1, + protect_last_n=2, + quiet_mode=True, + ) + return c + + def test_compress_strips_historical_images(self, compressor): + # Enough messages to trigger the summarize path. protect_first_n=1 + + # protect_last_n=2 + a middle window of at least 3 with a summary. + msgs = [ + {"role": "system", "content": "sys"}, + {"role": "user", "content": [TEXT, IMG_URL]}, # old image-bearing user + {"role": "assistant", "content": "looked at it"}, + {"role": "user", "content": "follow-up"}, + {"role": "assistant", "content": "ack"}, + {"role": "user", "content": "more"}, + {"role": "assistant", "content": "ok"}, + {"role": "user", "content": [TEXT, IMG_URL]}, # newest image-bearing user (tail) + {"role": "assistant", "content": "done"}, + ] + # Bypass the real LLM summary — return a stub so compress() proceeds. + with patch.object(compressor, "_generate_summary", return_value="SUMMARY TEXT"): + out = compressor.compress(msgs, current_tokens=60_000) + + # Newest user turn with image should still have it (it's in the tail). + user_imgs = [m for m in out if m.get("role") == "user" and _content_has_images(m.get("content"))] + assert len(user_imgs) == 1, ( + "Expected exactly one user message with images after compression " + f"(the newest one); got {len(user_imgs)}" + ) + # No assistant or tool messages should carry images either. + for m in out: + if m is user_imgs[0]: + continue + assert not _content_has_images(m.get("content")), ( + f"Stale image in {m.get('role')!r} message after compression" + )