From 35f773c459a3602f253311328809363463f181d5 Mon Sep 17 00:00:00 2001 From: Wesley Simplicio Date: Sat, 9 May 2026 12:33:23 -0300 Subject: [PATCH] fix(context_compressor): treat streaming premature-close as transient error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Problem: When a provider or proxy drops a streaming response mid-flight (httpcore raises RemoteProtocolError: "incomplete chunked read", "peer closed connection", "response ended prematurely", etc.), _generate_summary would not classify it as a transient error. Instead of retrying on the main model, it entered the generic 60-second cooldown, leaving context growing unbounded until the cooldown expired. Issue #18458. Root cause: _is_connection_error in auxiliary_client.py did not match httpcore's streaming premature-close error substrings. context_compressor.py's _generate_summary except block never called _is_connection_error, so those errors fell through to the 60-second generic cooldown rather than triggering the retry-on-main fallback path used for timeouts. Fix: 1. auxiliary_client.py — extend _is_connection_error keyword list with: "incomplete chunked read", "peer closed connection", "response ended prematurely", "unexpected eof", "remoteprotocolerror", "localprotocolerror". Also guard the `from openai import ...` with try/except ImportError so the function works in environments without the openai package. 2. context_compressor.py — import _is_connection_error and call it in _generate_summary's except block as _is_streaming_closed. Include _is_streaming_closed in the fallback-to-main condition (alongside _is_model_not_found, _is_timeout, _is_json_decode) and use the shorter 30s transient cooldown for streaming-closed errors. 
Tests: 4 new regression tests in TestStreamingClosedFallback: - test_incomplete_chunked_read_falls_back_to_main - test_peer_closed_connection_falls_back_to_main - test_streaming_closed_on_main_uses_short_cooldown (stash-verified) - test_non_streaming_unknown_error_still_uses_long_cooldown Co-Authored-By: Claude Opus 4.7 --- agent/auxiliary_client.py | 20 +++- agent/context_compressor.py | 22 +++-- tests/agent/test_context_compressor.py | 125 +++++++++++++++++++++++++ 3 files changed, 157 insertions(+), 10 deletions(-) diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 3eefab1632b..874b7d7b963 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -1840,10 +1840,12 @@ def _is_connection_error(exc: Exception) -> bool: distinct from API errors (4xx/5xx) which indicate the provider IS reachable but returned an error. """ - from openai import APIConnectionError, APITimeoutError - - if isinstance(exc, (APIConnectionError, APITimeoutError)): - return True + try: + from openai import APIConnectionError, APITimeoutError + if isinstance(exc, (APIConnectionError, APITimeoutError)): + return True + except ImportError: + pass # urllib3 / httpx / httpcore connection errors err_type = type(exc).__name__ if any(kw in err_type for kw in ("Connection", "Timeout", "DNS", "SSL")): @@ -1853,6 +1855,16 @@ def _is_connection_error(exc: Exception) -> bool: "connection refused", "name or service not known", "no route to host", "network is unreachable", "timed out", "connection reset", + # httpcore / httpx streaming premature-close errors. These surface + # when a proxy or provider drops the connection mid-stream and are + # transient by nature — the request should be retried or rerouted. + # See issue #18458. 
+        "incomplete chunked read",
+        "peer closed connection",
+        "response ended prematurely",
+        "unexpected eof",
+        "remoteprotocolerror",
+        "localprotocolerror",
     )):
         return True
     return False
diff --git a/agent/context_compressor.py b/agent/context_compressor.py
index 5f0792be882..885b0ca7895 100644
--- a/agent/context_compressor.py
+++ b/agent/context_compressor.py
@@ -23,7 +23,7 @@ import re
 import time
 from typing import Any, Dict, List, Optional
 
-from agent.auxiliary_client import call_llm
+from agent.auxiliary_client import call_llm, _is_connection_error
 from agent.context_engine import ContextEngine
 from agent.model_metadata import (
     MINIMUM_CONTEXT_LENGTH,
@@ -1000,6 +1000,14 @@ The user has requested that this compaction PRIORITISE preserving all informatio
                 isinstance(e, json.JSONDecodeError)
                 or "expecting value" in _err_str
             )
+            # httpcore / httpx streaming premature-close errors carry
+            # characteristic substrings ("incomplete chunked read", "peer closed
+            # connection", "response ended prematurely", "unexpected eof").
+            # They are transient network events, so fall back to the main model
+            # instead of entering a 60-second cooldown. _is_connection_error
+            # also matches timeouts; exclude those so they keep their own
+            # "timed out" reason and cooldown. See issue #18458.
+            _is_streaming_closed = _is_connection_error(e) and not _is_timeout
             if _is_json_decode and not _is_model_not_found and not _is_timeout:
                 logger.error(
                     "Context compression failed: auxiliary LLM returned a "
@@ -1012,7 +1020,7 @@ The user has requested that this compaction PRIORITISE preserving all informatio
                     e,
                 )
             if (
-                (_is_model_not_found or _is_timeout or _is_json_decode)
+                (_is_model_not_found or _is_timeout or _is_json_decode or _is_streaming_closed)
                 and self.summary_model
                 and self.summary_model != self.model
                 and not getattr(self, "_summary_model_fallen_back", False)
@@ -1021,6 +1029,8 @@ The user has requested that this compaction PRIORITISE preserving all informatio
                     _reason = "returned invalid JSON"
                 elif _is_model_not_found:
                     _reason = "unavailable"
+                elif _is_streaming_closed:
+                    _reason = "closed stream prematurely"
                 else:
                     _reason = "timed out"
                 self._fallback_to_main_for_compression(e, _reason)
@@ -1043,10 +1053,10 @@ The user has requested that this compaction PRIORITISE preserving all informatio
                 self._fallback_to_main_for_compression(e, "failed")
                 return self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
 
-            # Transient errors (timeout, rate limit, network, JSON decode) —
-            # shorter cooldown for JSON decode since the body shape can flip
-            # back to valid quickly when an upstream proxy recovers.
-            _transient_cooldown = 30 if _is_json_decode else 60
+            # Transient errors (timeout, rate limit, network, JSON decode,
+            # streaming premature-close) — 30s for JSON decode and non-timeout
+            # connection drops (both can self-resolve quickly), 60s otherwise.
+            _transient_cooldown = 30 if (_is_json_decode or _is_streaming_closed) else 60
             self._summary_failure_cooldown_until = time.monotonic() + _transient_cooldown
             err_text = str(e).strip() or e.__class__.__name__
             if len(err_text) > 220:
diff --git a/tests/agent/test_context_compressor.py b/tests/agent/test_context_compressor.py
index 7817930851e..97a7c7b3d0f 100644
--- a/tests/agent/test_context_compressor.py
+++ b/tests/agent/test_context_compressor.py
@@ -499,6 +499,131 @@ class TestSummaryFallbackToMainModel:
         assert c._summary_failure_cooldown_until == 1030.0
 
 
+class TestStreamingClosedFallback:
+    """httpcore / httpx streaming premature-close errors must be classified the
+    same as timeouts so the compressor retries on the main model instead of
+    entering a 60-second cooldown. Issue #18458.
+
+    ``_is_connection_error`` is patched so these tests cover only the *wiring*
+    — that ``_generate_summary`` calls it and acts on its result — and do not
+    depend on ``openai`` being importable (the real function tries
+    ``from openai import ...`` in its body). NOTE(review): confirm the classifier
+    itself is covered by ``test_auxiliary_client.py::TestIsConnectionError``.
+    """
+
+    def _msgs(self):
+        return [
+            {"role": "user", "content": "do something"},
+            {"role": "assistant", "content": "ok"},
+        ]
+
+    def test_incomplete_chunked_read_falls_back_to_main(self):
+        """``httpcore.RemoteProtocolError: incomplete chunked read`` triggers
+        the retry-on-main path when ``_is_connection_error`` returns True."""
+        mock_ok = MagicMock()
+        mock_ok.choices = [MagicMock()]
+        mock_ok.choices[0].message.content = "summary via main model"
+
+        err = Exception("RemoteProtocolError: incomplete chunked read")
+
+        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
+            c = ContextCompressor(
+                model="main-model",
+                summary_model_override="aux-stream-model",
+                quiet_mode=True,
+            )
+
+        with patch(
+            "agent.context_compressor.call_llm",
+            side_effect=[err, mock_ok],
+        ) as mock_call, patch(
+            "agent.context_compressor._is_connection_error",
+            return_value=True,
+        ):
+            result = c._generate_summary(self._msgs())
+
+        assert mock_call.call_count == 2
+        assert mock_call.call_args_list[0].kwargs.get("model") == "aux-stream-model"
+        assert "model" not in mock_call.call_args_list[1].kwargs
+        assert result is not None
+        assert "summary via main model" in result
+
+    def test_peer_closed_connection_falls_back_to_main(self):
+        """``peer closed connection`` triggers the retry-on-main path."""
+        mock_ok = MagicMock()
+        mock_ok.choices = [MagicMock()]
+        mock_ok.choices[0].message.content = "summary ok"
+
+        err = Exception("peer closed connection without sending complete message body")
+
+        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
+            c = ContextCompressor(
+                model="main-model",
+                summary_model_override="aux-model",
+                quiet_mode=True,
+            )
+
+        with patch(
+            "agent.context_compressor.call_llm",
+            side_effect=[err, mock_ok],
+        ) as mock_call, patch(
+            "agent.context_compressor._is_connection_error",
+            return_value=True,
+        ):
+            result = c._generate_summary(self._msgs())
+
+        assert mock_call.call_count == 2
+        assert result is not None
+
+    def test_streaming_closed_on_main_uses_short_cooldown(self):
+        """When already on the main model, a streaming-closed error should use
+        the 30s cooldown, not the default 60s — these errors are transient."""
+        err = Exception("RemoteProtocolError: response ended prematurely")
+
+        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
+            c = ContextCompressor(
+                model="main-model",
+                # No summary_model_override → no fallback path.
+                quiet_mode=True,
+            )
+
+        with patch(
+            "agent.context_compressor.call_llm",
+            side_effect=err,
+        ), patch(
+            "agent.context_compressor._is_connection_error",
+            return_value=True,
+        ), patch("agent.context_compressor.time.monotonic", return_value=1000.0):
+            result = c._generate_summary(self._msgs())
+
+        assert result is None
+        # Streaming-closed should use the 30s short cooldown.
+        assert c._summary_failure_cooldown_until == 1030.0
+
+    def test_non_streaming_unknown_error_still_uses_long_cooldown(self):
+        """Unclassified errors should retain the 60s default cooldown to
+        prevent hammering a broken provider."""
+        err = Exception("Internal Server Error: something unexpected happened")
+
+        with patch("agent.context_compressor.get_model_context_length", return_value=100000):
+            c = ContextCompressor(
+                model="main-model",
+                quiet_mode=True,
+            )
+
+        with patch(
+            "agent.context_compressor.call_llm",
+            side_effect=err,
+        ), patch(
+            "agent.context_compressor._is_connection_error",
+            return_value=False,
+        ), patch("agent.context_compressor.time.monotonic", return_value=1000.0):
+            result = c._generate_summary(self._msgs())
+
+        assert result is None
+        assert c._summary_failure_cooldown_until == 1060.0
+
+
 class TestAuxModelFallbackSurfacedToCallers:
     """When summary_model fails but retry-on-main succeeds, compress() must
     expose the aux-model failure via _last_aux_model_failure_{model,error}