hermes-agent/tests/run_agent/test_413_compression.py
Teknium ec46f5912e
fix(gemini): default native maxOutputTokens + strip OpenAI extra_body on Gemini endpoints (#39730)
* fix: respect disabled auto-compaction on context overflow

Port from anomalyco/opencode#30749.

When compression.enabled is false, NO automatic compaction trigger may
fire. The proactive token-threshold paths (preflight + post-response
should_compress gate) already honoured the setting, but the three
provider-overflow recovery paths in the agent loop — long-context-tier
429, 413 payload-too-large, and context-overflow — called
_compress_context() unconditionally, silently compressing and rotating
the session against the user's explicit choice.

Add a single guard at the top of the overflow-recovery dispatch: when
compression is disabled and the error is one of those three overflow
classes, surface a terminal error (compaction_disabled: True) telling the
user to /compress manually, /new, switch to a larger-context model, or
reduce attachments. Manual /compress (force=True) is unaffected — it never
enters this loop.

Tests: new TestOverflowWithCompactionDisabled (413 + 400 overflow don't
compress when disabled; control case still compresses when enabled).
Existing overflow-recovery tests updated to enable compaction explicitly
(they verify the recovery fires); fixture defaults flipped to True to
match production (compression.enabled defaults to True).

* fix(gemini): default native maxOutputTokens + strip OpenAI extra_body on Gemini endpoints

Two distinct failures hit users on the gemini provider with only Google
AI Studio keys set.

1. Truncation loop: build_gemini_request() only set maxOutputTokens when
   max_tokens was non-None. Hermes passes None to mean "unlimited", but
   Gemini's native generateContent does NOT treat an absent maxOutputTokens
   as full budget — it applies a low internal default and stops early with
   finishReason=MAX_TOKENS, truncating tool calls. The agent then retries
   3x and refuses the incomplete call. Now default to the published 65,535
   ceiling (shared by all current Gemini text models) when max_tokens=None.

2. HTTP 400 on Gemini endpoint: the chat_completions transport assembles
   profile extra_body (Nous portal 'tags', reasoning, provider prefs) and
   sends it via the OpenAI client to whatever base_url is resolved. When a
   profile that emits extra_body (e.g. Nous) is active but the endpoint is a
   native Gemini base_url — typical when only Google creds exist and a
   fallback/aux call lands on Gemini — Google rejects the unknown 'tags'
   field with a non-retryable 400. Strip all non-thinking_config extra_body
   keys when the resolved endpoint is native Gemini.

Verified E2E against real transport code: tags stripped on native Gemini,
preserved on Nous and the /openai compat endpoint; maxOutputTokens=65535
on None, explicit values respected.
2026-06-05 03:53:59 -07:00

907 lines
39 KiB
Python

"""Tests for payload/context-length → compression retry logic in AIAgent.
Verifies that:
- HTTP 413 errors trigger history compression and retry
- HTTP 400 context-length errors trigger compression (not generic 4xx abort)
- Preflight compression proactively compresses oversized sessions before API calls
"""
import pytest
#pytestmark = pytest.mark.skip(reason="Hangs in non-interactive environments")
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from agent.context_compressor import SUMMARY_PREFIX
from run_agent import AIAgent
import run_agent
# ---------------------------------------------------------------------------
# Fast backoff for compression retry tests
# ---------------------------------------------------------------------------
@pytest.fixture(autouse=True)
def _no_compression_sleep(monkeypatch):
"""Short-circuit the 2s time.sleep between compression retries.
Production code has ``time.sleep(2)`` in multiple places after a 413/context
compression, for rate-limit smoothing. Tests assert behavior, not timing.
"""
import time as _time
monkeypatch.setattr(_time, "sleep", lambda *_a, **_k: None)
monkeypatch.setattr(run_agent, "jittered_backoff", lambda *a, **k: 0.0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_tool_defs(*names: str) -> list:
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
def _mock_response(content="Hello", finish_reason="stop", tool_calls=None, usage=None):
msg = SimpleNamespace(
content=content,
tool_calls=tool_calls,
reasoning_content=None,
reasoning=None,
)
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
resp = SimpleNamespace(choices=[choice], model="test/model")
resp.usage = SimpleNamespace(**usage) if usage else None
return resp
def _make_413_error(*, use_status_code=True, message="Request entity too large"):
"""Create an exception that mimics a 413 HTTP error."""
err = Exception(message)
if use_status_code:
err.status_code = 413
return err
@pytest.fixture()
def agent():
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
base_url="https://openrouter.ai/api/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
a._cached_system_prompt = "You are helpful."
a._use_prompt_caching = False
a.tool_delay = 0
# Default matches production (`compression.enabled` defaults to True).
# Overflow-recovery tests below verify that 413 / context-overflow
# errors DO trigger compression; the disabled-path behavior is
# covered explicitly by TestOverflowWithCompactionDisabled.
a.compression_enabled = True
a.save_trajectories = False
return a
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestHTTP413Compression:
"""413 errors should trigger compression, not abort as generic 4xx."""
def test_413_triggers_compression(self, agent):
"""A 413 error should call _compress_context and retry, not abort."""
# First call raises 413; second call succeeds after compression.
err_413 = _make_413_error()
ok_resp = _mock_response(content="Success after compression", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_413, ok_resp]
# Prefill so there are multiple messages for compression to reduce
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
# Compression reduces 3 messages down to 1
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed prompt",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
assert result["completed"] is True
assert result["final_response"] == "Success after compression"
def test_413_not_treated_as_generic_4xx(self, agent):
"""413 must NOT hit the generic 4xx abort path; it should attempt compression."""
err_413 = _make_413_error()
ok_resp = _mock_response(content="Recovered", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_413, ok_resp]
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed",
)
result = agent.run_conversation("hello", conversation_history=prefill)
# If 413 were treated as generic 4xx, result would have "failed": True
assert result.get("failed") is not True
assert result["completed"] is True
def test_413_error_message_detection(self, agent):
"""413 detected via error message string (no status_code attr)."""
err = _make_413_error(use_status_code=False, message="error code: 413")
ok_resp = _mock_response(content="OK", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err, ok_resp]
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
assert result["completed"] is True
def test_413_clears_conversation_history_on_persist(self, agent):
"""After 413-triggered compression, _persist_session must receive None history.
Bug: _compress_context() creates a new session and resets _last_flushed_db_idx=0,
but if conversation_history still holds the original (pre-compression) list,
_flush_messages_to_session_db computes flush_from = max(len(history), 0) which
exceeds len(compressed_messages), so messages[flush_from:] is empty and nothing
is written to the new session → "Session found but has no messages" on resume.
"""
err_413 = _make_413_error()
ok_resp = _mock_response(content="OK", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_413, ok_resp]
big_history = [
{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
for i in range(200)
]
persist_calls = []
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(
agent, "_persist_session",
side_effect=lambda msgs, hist: persist_calls.append(hist),
),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "summary"}],
"compressed prompt",
)
agent.run_conversation("hello", conversation_history=big_history)
assert len(persist_calls) >= 1, "Expected at least one _persist_session call"
for hist in persist_calls:
assert hist is None, (
f"conversation_history should be None after mid-loop compression, "
f"got list with {len(hist)} items"
)
def test_context_overflow_clears_conversation_history_on_persist(self, agent):
"""After context-overflow compression, _persist_session must receive None history."""
err_400 = Exception(
"Error code: 400 - This endpoint's maximum context length is 128000 tokens. "
"However, you requested about 270460 tokens."
)
err_400.status_code = 400
ok_resp = _mock_response(content="OK", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
big_history = [
{"role": "user" if i % 2 == 0 else "assistant", "content": f"msg {i}"}
for i in range(200)
]
persist_calls = []
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(
agent, "_persist_session",
side_effect=lambda msgs, hist: persist_calls.append(hist),
),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "summary"}],
"compressed prompt",
)
agent.run_conversation("hello", conversation_history=big_history)
assert len(persist_calls) >= 1
for hist in persist_calls:
assert hist is None, (
f"conversation_history should be None after context-overflow compression, "
f"got list with {len(hist)} items"
)
def test_400_context_length_triggers_compression(self, agent):
"""A 400 with 'maximum context length' should trigger compression, not abort as generic 4xx.
OpenRouter returns HTTP 400 (not 413) for context-length errors. Before
the fix, this was caught by the generic 4xx handler which aborted
immediately — now it correctly triggers compression+retry.
"""
err_400 = Exception(
"Error code: 400 - {'error': {'message': "
"\"This endpoint's maximum context length is 204800 tokens. "
"However, you requested about 270460 tokens.\", 'code': 400}}"
)
err_400.status_code = 400
ok_resp = _mock_response(content="Recovered after compression", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed prompt",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
# Must NOT have "failed": True (which would mean the generic 4xx handler caught it)
assert result.get("failed") is not True
assert result["completed"] is True
assert result["final_response"] == "Recovered after compression"
def test_400_reduce_length_triggers_compression(self, agent):
"""A 400 with 'reduce the length' should trigger compression."""
err_400 = Exception(
"Error code: 400 - Please reduce the length of the messages"
)
err_400.status_code = 400
ok_resp = _mock_response(content="OK", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
assert result["completed"] is True
def test_context_length_retry_rebuilds_request_after_compression(self, agent):
"""Retry must send the compressed transcript, not the stale oversized payload."""
err_400 = Exception(
"Error code: 400 - {'error': {'message': "
"\"This endpoint's maximum context length is 128000 tokens. "
"Please reduce the length of the messages.\"}}"
)
err_400.status_code = 400
ok_resp = _mock_response(content="Recovered after real compression", finish_reason="stop")
request_payloads = []
def _side_effect(**kwargs):
request_payloads.append(kwargs)
if len(request_payloads) == 1:
raise err_400
return ok_resp
agent.client.chat.completions.create.side_effect = _side_effect
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "compressed summary"}],
"compressed prompt",
)
result = agent.run_conversation("hello", conversation_history=prefill)
assert result["completed"] is True
assert len(request_payloads) == 2
assert len(request_payloads[1]["messages"]) < len(request_payloads[0]["messages"])
assert request_payloads[1]["messages"][0] == {
"role": "system",
"content": "compressed prompt",
}
assert request_payloads[1]["messages"][1] == {
"role": "user",
"content": "compressed summary",
}
def test_413_cannot_compress_further(self, agent):
"""When compression can't reduce messages, return partial result."""
err_413 = _make_413_error()
agent.client.chat.completions.create.side_effect = [err_413]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
# Compression returns same number of messages → can't compress further
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"same prompt",
)
result = agent.run_conversation("hello")
assert result["completed"] is False
assert result.get("partial") is True
assert "413" in result["error"]
class TestPreflightCompression:
"""Preflight compression should compress history before the first API call."""
def test_compress_context_emits_lifecycle_status_before_work(self, agent):
"""Direct context compression should tell gateway users why the turn paused."""
# This test calls _compress_context directly and asserts the FIRST
# status event is the lifecycle "Compacting context" message. With
# compaction enabled the lazy feasibility probe would emit an
# aux-provider warning first (no aux key in the hermetic test env),
# displacing events[0]. The flag value is irrelevant to what this
# test asserts, so disable it to suppress the probe.
agent.compression_enabled = False
events = []
agent.status_callback = lambda ev, msg: events.append((ev, msg))
def _fake_compress(messages, current_tokens=None, focus_topic=None):
events.append(("compress", "started"))
return [{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"}]
with (
patch.object(agent.context_compressor, "compress", side_effect=_fake_compress),
patch.object(agent, "_build_system_prompt", return_value="new system prompt"),
patch("run_agent.estimate_request_tokens_rough", return_value=42),
):
compressed, new_system_prompt = agent._compress_context(
[{"role": "user", "content": "hello"}],
"system prompt",
approx_tokens=1234,
)
assert compressed == [{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"}]
assert new_system_prompt == "new system prompt"
assert events[0][0] == "lifecycle"
assert "Compacting context" in events[0][1]
assert events[1] == ("compress", "started")
def test_preflight_compresses_oversized_history(self, agent):
"""When loaded history exceeds the model's context threshold, compress before API call."""
agent.compression_enabled = True
# Set a small context so the history is "oversized", but large enough
# that the compressed result (2 short messages) fits in a single pass.
agent.context_compressor.context_length = 2000
agent.context_compressor.threshold_tokens = 200
# Build a history that will be large enough to trigger preflight
# (each message ~50 chars ≈ 13 tokens, 40 messages ≈ 520 tokens > 200 threshold)
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message number {i} with some extra text padding"})
big_history.append({"role": "assistant", "content": f"Response number {i} with extra padding here"})
ok_resp = _mock_response(content="After preflight", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
status_messages = []
agent.status_callback = lambda ev, msg: status_messages.append((ev, msg))
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
# Simulate compression reducing messages to a small set that fits
mock_compress.return_value = (
[
{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"},
{"role": "user", "content": "hello"},
],
"new system prompt",
)
result = agent.run_conversation("hello", conversation_history=big_history)
# Preflight compression is a multi-pass loop (up to 3 passes for very
# large sessions, breaking when no further reduction is possible).
# First pass must have received the full oversized history.
assert mock_compress.call_count >= 1, "Preflight compression never ran"
first_call_messages = mock_compress.call_args_list[0].args[0]
assert len(first_call_messages) >= 40, (
f"First preflight pass should see the full history, got "
f"{len(first_call_messages)} messages"
)
assert result["completed"] is True
assert result["final_response"] == "After preflight"
assert any(
ev == "lifecycle" and "Preflight compression" in msg
for ev, msg in status_messages
)
def test_preflight_defers_when_recent_real_usage_fit(self, agent):
"""A noisy rough estimate should not re-compact a recently fitting request."""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 100_000
agent.context_compressor.last_prompt_tokens = 58_000
agent.context_compressor.last_real_prompt_tokens = 58_000
agent.context_compressor.last_rough_tokens_when_real_prompt_fit = 113_000
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded"})
big_history.append({"role": "assistant", "content": f"Response {i} padded"})
ok_resp = _mock_response(
content="Used real fit",
finish_reason="stop",
usage={"prompt_tokens": 59_000, "completion_tokens": 100, "total_tokens": 59_100},
)
agent.client.chat.completions.create.side_effect = [ok_resp]
status_messages = []
agent.status_callback = lambda ev, msg: status_messages.append((ev, msg))
with (
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=114_000),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=big_history)
mock_compress.assert_not_called()
assert result["completed"] is True
assert result["final_response"] == "Used real fit"
assert not any(
ev == "lifecycle" and "Preflight compression" in msg
for ev, msg in status_messages
)
def test_preflight_compresses_when_rough_growth_after_fit_is_large(self, agent):
"""Large rough growth after a fitting request still triggers preflight."""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 100_000
agent.context_compressor.last_prompt_tokens = 58_000
agent.context_compressor.last_real_prompt_tokens = 58_000
agent.context_compressor.last_rough_tokens_when_real_prompt_fit = 113_000
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded"})
big_history.append({"role": "assistant", "content": f"Response {i} padded"})
ok_resp = _mock_response(
content="Compressed after growth",
finish_reason="stop",
usage={"prompt_tokens": 50_000, "completion_tokens": 100, "total_tokens": 50_100},
)
agent.client.chat.completions.create.side_effect = [ok_resp]
# First rough estimate must clear the threshold so preflight fires
# (rough growth since the last fitting request is large, so the
# deferral path is NOT taken). Every estimate after compaction is
# sub-threshold. Use a callable side_effect rather than a fixed list
# so we don't have to predict how many times the loop re-estimates —
# the post-response real-token estimate is an extra call that a
# 2-element list would exhaust (StopIteration).
_rough_calls = {"n": 0}
def _rough_estimate(*_args, **_kwargs):
_rough_calls["n"] += 1
return 125_000 if _rough_calls["n"] == 1 else 40_000
with (
patch("agent.conversation_loop.estimate_request_tokens_rough", side_effect=_rough_estimate),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": f"{SUMMARY_PREFIX}\nPrevious conversation"}],
"new system prompt",
)
result = agent.run_conversation("hello", conversation_history=big_history)
mock_compress.assert_called_once()
assert result["completed"] is True
def test_no_preflight_when_under_threshold(self, agent):
"""When history fits within context, no preflight compression needed."""
agent.compression_enabled = True
# Large context — history easily fits
agent.context_compressor.context_length = 1000000
agent.context_compressor.threshold_tokens = 850000
small_history = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
]
ok_resp = _mock_response(content="No compression needed", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=small_history)
mock_compress.assert_not_called()
assert result["completed"] is True
def test_no_preflight_when_compression_disabled(self, agent):
"""Preflight should not run when compression is disabled."""
agent.compression_enabled = False
agent.context_compressor.context_length = 100
agent.context_compressor.threshold_tokens = 85
big_history = [
{"role": "user", "content": "x" * 1000},
{"role": "assistant", "content": "y" * 1000},
] * 10
ok_resp = _mock_response(content="OK", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=big_history)
mock_compress.assert_not_called()
def test_preflight_respects_anti_thrash(self, agent):
"""Preflight must call ``should_compress()`` so anti-thrash applies.
Regression for #29335 — preflight used to bypass ``should_compress()``
and re-trigger every turn even when the prior two passes each saved
<10% (the canonical infinite-compression-loop signal).
"""
agent.compression_enabled = True
agent.context_compressor.context_length = 2000
agent.context_compressor.threshold_tokens = 200
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded"})
big_history.append({"role": "assistant", "content": f"Response {i} padded"})
ok_resp = _mock_response(content="No preflight", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch.object(agent.context_compressor, "should_compress", return_value=False) as mock_should,
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=big_history)
# The gate consulted should_compress — anti-thrash had a chance to vote.
mock_should.assert_called()
# And vetoed: even though tokens >= threshold, no compression ran.
mock_compress.assert_not_called()
assert result["completed"] is True
def test_preflight_seeds_display_tokens_when_compression_aborts(self, agent):
"""Display must reflect the real context size even when compression no-ops.
Regression: the CLI status bar reads ``last_prompt_tokens``, which only
updated from a *successful* API response. When the loaded history was
oversized but compression failed to reduce it (e.g. the auxiliary
summary model timed out), the bar stayed stuck at the old, smaller
value while the preflight estimate reported a much larger number —
looking permanently out of sync.
"""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 130_000
# Simulate a stale display value from an earlier, smaller turn.
agent.context_compressor.last_prompt_tokens = 74_400
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded text"})
big_history.append({"role": "assistant", "content": f"Response {i} padded text"})
ok_resp = _mock_response(content="After preflight", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669),
# Compression no-ops (returns input unchanged) — mirrors an aux
# summary-model timeout where the messages can't be reduced.
patch.object(agent, "_compress_context", side_effect=lambda msgs, *a, **k: (msgs, agent._cached_system_prompt)),
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=big_history)
assert result["completed"] is True
# The display token count was revised up to the fresh preflight estimate,
# not left at the stale 74_400.
assert agent.context_compressor.last_prompt_tokens == 144_669
def test_preflight_seed_only_revises_upward(self, agent):
"""A larger tracked value must not be clobbered by a smaller estimate."""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 130_000
# A real, larger usage figure is already tracked.
agent.context_compressor.last_prompt_tokens = 160_000
big_history = []
for i in range(20):
big_history.append({"role": "user", "content": f"Message {i} padded text"})
big_history.append({"role": "assistant", "content": f"Response {i} padded text"})
ok_resp = _mock_response(content="After preflight", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [ok_resp]
with (
patch("agent.conversation_loop.estimate_request_tokens_rough", return_value=144_669),
patch.object(agent, "_compress_context", side_effect=lambda msgs, *a, **k: (msgs, agent._cached_system_prompt)),
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
agent.run_conversation("hello", conversation_history=big_history)
# Smaller estimate must not overwrite the larger tracked value.
assert agent.context_compressor.last_prompt_tokens == 160_000
class TestToolResultPreflightCompression:
"""Compression should trigger when tool results push context past the threshold."""
def test_large_tool_results_trigger_compression(self, agent):
"""When tool results push estimated tokens past threshold, compress before next call."""
agent.compression_enabled = True
agent.context_compressor.context_length = 200_000
agent.context_compressor.threshold_tokens = 130_000 # below the 135k reported usage
agent.context_compressor.last_prompt_tokens = 130_000
agent.context_compressor.last_completion_tokens = 5_000
tc = SimpleNamespace(
id="tc1", type="function",
function=SimpleNamespace(name="web_search", arguments='{"query":"test"}'),
)
tool_resp = _mock_response(
content=None, finish_reason="stop", tool_calls=[tc],
usage={"prompt_tokens": 130_000, "completion_tokens": 5_000, "total_tokens": 135_000},
)
ok_resp = _mock_response(
content="Done after compression", finish_reason="stop",
usage={"prompt_tokens": 50_000, "completion_tokens": 100, "total_tokens": 50_100},
)
agent.client.chat.completions.create.side_effect = [tool_resp, ok_resp]
large_result = "x" * 100_000
with (
patch("run_agent.handle_function_call", return_value=large_result),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}], "compressed prompt",
)
result = agent.run_conversation("hello")
mock_compress.assert_called_once()
assert result["completed"] is True
def test_anthropic_prompt_too_long_safety_net(self, agent):
"""Anthropic 'prompt is too long' error triggers compression as safety net."""
err_400 = Exception(
"Error code: 400 - {'type': 'error', 'error': {'type': 'invalid_request_error', "
"'message': 'prompt is too long: 233153 tokens > 200000 maximum'}}"
)
err_400.status_code = 400
ok_resp = _mock_response(content="Recovered", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
prefill = [
{"role": "user", "content": "previous"},
{"role": "assistant", "content": "answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}], "compressed",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
assert result["completed"] is True
# ---------------------------------------------------------------------------
# Disabled auto-compaction on overflow (port of anomalyco/opencode#30749)
# ---------------------------------------------------------------------------
class TestOverflowWithCompactionDisabled:
"""When ``compression.enabled`` is False, NO automatic compaction may
fire — including the provider/request-size overflow recovery paths.
Ported from anomalyco/opencode#30749: the proactive token-threshold
path already honoured the setting, but provider overflow errors
(413 payload-too-large, context-overflow, long-context-tier 429) still
silently compressed + rotated the session. The fix surfaces a terminal
error so the user can compact manually, start fresh, or switch models.
"""
@staticmethod
def _prefill():
return [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
def test_413_does_not_compress_when_disabled(self, agent):
"""413 must NOT call _compress_context when compaction is disabled."""
agent.compression_enabled = False
err_413 = _make_413_error()
# If the guard fails, a second (success) response would be consumed.
agent.client.chat.completions.create.side_effect = [err_413, _mock_response()]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session") as mock_persist,
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=self._prefill())
mock_compress.assert_not_called()
mock_persist.assert_called()
assert result.get("failed") is True
assert result.get("compaction_disabled") is True
assert "auto-compaction is disabled" in result["error"]
def test_context_overflow_does_not_compress_when_disabled(self, agent):
"""400 'prompt is too long' must NOT compress when compaction disabled."""
agent.compression_enabled = False
err_400 = Exception(
"Error code: 400 - {'type': 'error', 'error': {'type': "
"'invalid_request_error', 'message': 'prompt is too long: "
"233153 tokens > 200000 maximum'}}"
)
err_400.status_code = 400
agent.client.chat.completions.create.side_effect = [err_400, _mock_response()]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello", conversation_history=self._prefill())
mock_compress.assert_not_called()
assert result.get("compaction_disabled") is True
def test_413_still_compresses_when_enabled(self, agent):
"""Control: with compaction enabled, 413 still triggers compression.
Guards against the disabled-path guard accidentally swallowing the
enabled path.
"""
agent.compression_enabled = True
err_413 = _make_413_error()
ok_resp = _mock_response(content="Recovered", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_413, ok_resp]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}], "compressed",
)
result = agent.run_conversation("hello", conversation_history=self._prefill())
mock_compress.assert_called_once()
assert result["completed"] is True
assert result.get("compaction_disabled") is not True