mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-25 05:52:34 +00:00
fix(context_compressor): treat streaming premature-close as transient error
Problem: When a provider or proxy drops a streaming response mid-flight (httpcore raises RemoteProtocolError: "incomplete chunked read", "peer closed connection", "response ended prematurely", etc.), _generate_summary would not classify it as a transient error. Instead of retrying on the main model, it entered the generic 60-second cooldown, leaving context growing unbounded until the cooldown expired. Issue #18458. Root cause: _is_connection_error in auxiliary_client.py did not match httpcore's streaming premature-close error substrings. context_compressor.py's _generate_summary except block never called _is_connection_error, so those errors fell through to the 60-second generic cooldown rather than triggering the retry-on-main fallback path used for timeouts. Fix: 1. auxiliary_client.py — extend _is_connection_error keyword list with: "incomplete chunked read", "peer closed connection", "response ended prematurely", "unexpected eof", "remoteprotocolerror", "localprotocolerror". Also guard the `from openai import ...` with try/except ImportError so the function works in environments without the openai package. 2. context_compressor.py — import _is_connection_error and call it in _generate_summary's except block as _is_streaming_closed. Include _is_streaming_closed in the fallback-to-main condition (alongside _is_model_not_found, _is_timeout, _is_json_decode) and use the shorter 30s transient cooldown for streaming-closed errors. Tests: 4 new regression tests in TestStreamingClosedFallback: - test_incomplete_chunked_read_falls_back_to_main - test_peer_closed_connection_falls_back_to_main - test_streaming_closed_on_main_uses_short_cooldown (stash-verified) - test_non_streaming_unknown_error_still_uses_long_cooldown Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
parent
0c5c4d1b8d
commit
35f773c459
3 changed files with 157 additions and 10 deletions
|
|
@ -1840,10 +1840,12 @@ def _is_connection_error(exc: Exception) -> bool:
|
||||||
distinct from API errors (4xx/5xx) which indicate the provider IS
|
distinct from API errors (4xx/5xx) which indicate the provider IS
|
||||||
reachable but returned an error.
|
reachable but returned an error.
|
||||||
"""
|
"""
|
||||||
from openai import APIConnectionError, APITimeoutError
|
try:
|
||||||
|
from openai import APIConnectionError, APITimeoutError
|
||||||
if isinstance(exc, (APIConnectionError, APITimeoutError)):
|
if isinstance(exc, (APIConnectionError, APITimeoutError)):
|
||||||
return True
|
return True
|
||||||
|
except ImportError:
|
||||||
|
pass
|
||||||
# urllib3 / httpx / httpcore connection errors
|
# urllib3 / httpx / httpcore connection errors
|
||||||
err_type = type(exc).__name__
|
err_type = type(exc).__name__
|
||||||
if any(kw in err_type for kw in ("Connection", "Timeout", "DNS", "SSL")):
|
if any(kw in err_type for kw in ("Connection", "Timeout", "DNS", "SSL")):
|
||||||
|
|
@ -1853,6 +1855,16 @@ def _is_connection_error(exc: Exception) -> bool:
|
||||||
"connection refused", "name or service not known",
|
"connection refused", "name or service not known",
|
||||||
"no route to host", "network is unreachable",
|
"no route to host", "network is unreachable",
|
||||||
"timed out", "connection reset",
|
"timed out", "connection reset",
|
||||||
|
# httpcore / httpx streaming premature-close errors. These surface
|
||||||
|
# when a proxy or provider drops the connection mid-stream and are
|
||||||
|
# transient by nature — the request should be retried or rerouted.
|
||||||
|
# See issue #18458.
|
||||||
|
"incomplete chunked read",
|
||||||
|
"peer closed connection",
|
||||||
|
"response ended prematurely",
|
||||||
|
"unexpected eof",
|
||||||
|
"remoteprotocolerror",
|
||||||
|
"localprotocolerror",
|
||||||
)):
|
)):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import re
|
||||||
import time
|
import time
|
||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
from agent.auxiliary_client import call_llm
|
from agent.auxiliary_client import call_llm, _is_connection_error
|
||||||
from agent.context_engine import ContextEngine
|
from agent.context_engine import ContextEngine
|
||||||
from agent.model_metadata import (
|
from agent.model_metadata import (
|
||||||
MINIMUM_CONTEXT_LENGTH,
|
MINIMUM_CONTEXT_LENGTH,
|
||||||
|
|
@ -1000,6 +1000,14 @@ The user has requested that this compaction PRIORITISE preserving all informatio
|
||||||
isinstance(e, json.JSONDecodeError)
|
isinstance(e, json.JSONDecodeError)
|
||||||
or "expecting value" in _err_str
|
or "expecting value" in _err_str
|
||||||
)
|
)
|
||||||
|
# httpcore / httpx streaming premature-close errors surface as
|
||||||
|
# ConnectionError subclasses or plain Exception with characteristic
|
||||||
|
# substrings ("incomplete chunked read", "peer closed connection",
|
||||||
|
# "response ended prematurely", "unexpected eof"). These are
|
||||||
|
# transient network events; treat them like a timeout so we fall
|
||||||
|
# back to the main model instead of entering a 60-second cooldown.
|
||||||
|
# See issue #18458.
|
||||||
|
_is_streaming_closed = _is_connection_error(e)
|
||||||
if _is_json_decode and not _is_model_not_found and not _is_timeout:
|
if _is_json_decode and not _is_model_not_found and not _is_timeout:
|
||||||
logger.error(
|
logger.error(
|
||||||
"Context compression failed: auxiliary LLM returned a "
|
"Context compression failed: auxiliary LLM returned a "
|
||||||
|
|
@ -1012,7 +1020,7 @@ The user has requested that this compaction PRIORITISE preserving all informatio
|
||||||
e,
|
e,
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
(_is_model_not_found or _is_timeout or _is_json_decode)
|
(_is_model_not_found or _is_timeout or _is_json_decode or _is_streaming_closed)
|
||||||
and self.summary_model
|
and self.summary_model
|
||||||
and self.summary_model != self.model
|
and self.summary_model != self.model
|
||||||
and not getattr(self, "_summary_model_fallen_back", False)
|
and not getattr(self, "_summary_model_fallen_back", False)
|
||||||
|
|
@ -1021,6 +1029,8 @@ The user has requested that this compaction PRIORITISE preserving all informatio
|
||||||
_reason = "returned invalid JSON"
|
_reason = "returned invalid JSON"
|
||||||
elif _is_model_not_found:
|
elif _is_model_not_found:
|
||||||
_reason = "unavailable"
|
_reason = "unavailable"
|
||||||
|
elif _is_streaming_closed:
|
||||||
|
_reason = "closed stream prematurely"
|
||||||
else:
|
else:
|
||||||
_reason = "timed out"
|
_reason = "timed out"
|
||||||
self._fallback_to_main_for_compression(e, _reason)
|
self._fallback_to_main_for_compression(e, _reason)
|
||||||
|
|
@ -1043,10 +1053,10 @@ The user has requested that this compaction PRIORITISE preserving all informatio
|
||||||
self._fallback_to_main_for_compression(e, "failed")
|
self._fallback_to_main_for_compression(e, "failed")
|
||||||
return self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
|
return self._generate_summary(turns_to_summarize, focus_topic=focus_topic)
|
||||||
|
|
||||||
# Transient errors (timeout, rate limit, network, JSON decode) —
|
# Transient errors (timeout, rate limit, network, JSON decode,
|
||||||
# shorter cooldown for JSON decode since the body shape can flip
|
# streaming premature-close) — shorter cooldown for JSON decode and
|
||||||
# back to valid quickly when an upstream proxy recovers.
|
# streaming-closed since those conditions can self-resolve quickly.
|
||||||
_transient_cooldown = 30 if _is_json_decode else 60
|
_transient_cooldown = 30 if (_is_json_decode or _is_streaming_closed) else 60
|
||||||
self._summary_failure_cooldown_until = time.monotonic() + _transient_cooldown
|
self._summary_failure_cooldown_until = time.monotonic() + _transient_cooldown
|
||||||
err_text = str(e).strip() or e.__class__.__name__
|
err_text = str(e).strip() or e.__class__.__name__
|
||||||
if len(err_text) > 220:
|
if len(err_text) > 220:
|
||||||
|
|
|
||||||
|
|
@ -499,6 +499,131 @@ class TestSummaryFallbackToMainModel:
|
||||||
assert c._summary_failure_cooldown_until == 1030.0
|
assert c._summary_failure_cooldown_until == 1030.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamingClosedFallback:
|
||||||
|
"""httpcore / httpx streaming premature-close errors must be classified the
|
||||||
|
same as timeouts so the compressor retries on the main model instead of
|
||||||
|
entering a 60-second cooldown. Issue #18458.
|
||||||
|
|
||||||
|
``_is_connection_error`` is patched here because the test venv may not
|
||||||
|
have ``openai`` installed (the real function does ``from openai import ...``
|
||||||
|
inside its body). We test the *wiring* — that `_generate_summary` calls
|
||||||
|
``_is_connection_error`` and acts on its result — not the classifier itself
|
||||||
|
(that's covered in ``test_auxiliary_client.py::TestIsConnectionError``).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _msgs(self):
|
||||||
|
return [
|
||||||
|
{"role": "user", "content": "do something"},
|
||||||
|
{"role": "assistant", "content": "ok"},
|
||||||
|
]
|
||||||
|
|
||||||
|
def test_incomplete_chunked_read_falls_back_to_main(self):
|
||||||
|
"""``httpcore.RemoteProtocolError: incomplete chunked read`` triggers
|
||||||
|
the retry-on-main path when ``_is_connection_error`` returns True."""
|
||||||
|
mock_ok = MagicMock()
|
||||||
|
mock_ok.choices = [MagicMock()]
|
||||||
|
mock_ok.choices[0].message.content = "summary via main model"
|
||||||
|
|
||||||
|
err = Exception("RemoteProtocolError: incomplete chunked read")
|
||||||
|
|
||||||
|
with patch("agent.context_compressor.get_model_context_length", return_value=100000):
|
||||||
|
c = ContextCompressor(
|
||||||
|
model="main-model",
|
||||||
|
summary_model_override="aux-stream-model",
|
||||||
|
quiet_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"agent.context_compressor.call_llm",
|
||||||
|
side_effect=[err, mock_ok],
|
||||||
|
) as mock_call, patch(
|
||||||
|
"agent.context_compressor._is_connection_error",
|
||||||
|
return_value=True,
|
||||||
|
):
|
||||||
|
result = c._generate_summary(self._msgs())
|
||||||
|
|
||||||
|
assert mock_call.call_count == 2
|
||||||
|
assert mock_call.call_args_list[0].kwargs.get("model") == "aux-stream-model"
|
||||||
|
assert "model" not in mock_call.call_args_list[1].kwargs
|
||||||
|
assert result is not None
|
||||||
|
assert "summary via main model" in result
|
||||||
|
|
||||||
|
def test_peer_closed_connection_falls_back_to_main(self):
|
||||||
|
"""``peer closed connection`` triggers the retry-on-main path."""
|
||||||
|
mock_ok = MagicMock()
|
||||||
|
mock_ok.choices = [MagicMock()]
|
||||||
|
mock_ok.choices[0].message.content = "summary ok"
|
||||||
|
|
||||||
|
err = Exception("peer closed connection without sending complete message body")
|
||||||
|
|
||||||
|
with patch("agent.context_compressor.get_model_context_length", return_value=100000):
|
||||||
|
c = ContextCompressor(
|
||||||
|
model="main-model",
|
||||||
|
summary_model_override="aux-model",
|
||||||
|
quiet_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"agent.context_compressor.call_llm",
|
||||||
|
side_effect=[err, mock_ok],
|
||||||
|
) as mock_call, patch(
|
||||||
|
"agent.context_compressor._is_connection_error",
|
||||||
|
return_value=True,
|
||||||
|
):
|
||||||
|
result = c._generate_summary(self._msgs())
|
||||||
|
|
||||||
|
assert mock_call.call_count == 2
|
||||||
|
assert result is not None
|
||||||
|
|
||||||
|
def test_streaming_closed_on_main_uses_short_cooldown(self):
|
||||||
|
"""When already on the main model, a streaming-closed error should use
|
||||||
|
the 30s cooldown, not the default 60s — these errors are transient."""
|
||||||
|
err = Exception("RemoteProtocolError: response ended prematurely")
|
||||||
|
|
||||||
|
with patch("agent.context_compressor.get_model_context_length", return_value=100000):
|
||||||
|
c = ContextCompressor(
|
||||||
|
model="main-model",
|
||||||
|
# No summary_model_override → no fallback path.
|
||||||
|
quiet_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"agent.context_compressor.call_llm",
|
||||||
|
side_effect=err,
|
||||||
|
), patch(
|
||||||
|
"agent.context_compressor._is_connection_error",
|
||||||
|
return_value=True,
|
||||||
|
), patch("agent.context_compressor.time.monotonic", return_value=1000.0):
|
||||||
|
result = c._generate_summary(self._msgs())
|
||||||
|
|
||||||
|
assert result is None
|
||||||
|
# Streaming-closed should use the 30s short cooldown.
|
||||||
|
assert c._summary_failure_cooldown_until == 1030.0
|
||||||
|
|
||||||
|
def test_non_streaming_unknown_error_still_uses_long_cooldown(self):
|
||||||
|
"""Unclassified errors should retain the 60s default cooldown to
|
||||||
|
prevent hammering a broken provider."""
|
||||||
|
err = Exception("Internal Server Error: something unexpected happened")
|
||||||
|
|
||||||
|
with patch("agent.context_compressor.get_model_context_length", return_value=100000):
|
||||||
|
c = ContextCompressor(
|
||||||
|
model="main-model",
|
||||||
|
quiet_mode=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
with patch(
|
||||||
|
"agent.context_compressor.call_llm",
|
||||||
|
side_effect=err,
|
||||||
|
), patch(
|
||||||
|
"agent.context_compressor._is_connection_error",
|
||||||
|
return_value=False,
|
||||||
|
), patch("agent.context_compressor.time.monotonic", return_value=1000.0):
|
||||||
|
result = c._generate_summary(self._msgs())
|
||||||
|
|
||||||
|
assert result is None
|
||||||
|
assert c._summary_failure_cooldown_until == 1060.0
|
||||||
|
|
||||||
|
|
||||||
class TestAuxModelFallbackSurfacedToCallers:
|
class TestAuxModelFallbackSurfacedToCallers:
|
||||||
"""When summary_model fails but retry-on-main succeeds, compress() must
|
"""When summary_model fails but retry-on-main succeeds, compress() must
|
||||||
expose the aux-model failure via _last_aux_model_failure_{model,error}
|
expose the aux-model failure via _last_aux_model_failure_{model,error}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue