fix(agent): retry malformed anthropic stream parser errors

This commit is contained in:
helix4u 2026-05-15 22:33:16 -06:00 committed by Teknium
parent 53637fb17d
commit 9c304a7f56
2 changed files with 131 additions and 9 deletions

View file

@ -3027,6 +3027,24 @@ class AIAgent:
parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__)
return " <- ".join(parts) if parts else type(error).__name__
def _is_provider_stream_parse_error(self, error: BaseException) -> bool:
    """Tell whether *error* is a malformed-stream failure from the SDK parser.

    Anthropic-compatible streaming providers occasionally emit a broken
    event-stream frame, which the Anthropic SDK reports as a bare
    ``ValueError`` (for example ``expected ident at line 1 column 149``).
    Such failures originate on the provider's wire format rather than in
    local request validation, so callers route them through the same
    retry path used for a truncated JSON body.

    Returns:
        ``True`` only when the agent is in ``anthropic_messages`` mode and
        *error* is a ``ValueError`` (other than ``UnicodeEncodeError`` /
        ``json.JSONDecodeError``) whose message carries the parser's
        ``expected ident at line`` marker; ``False`` otherwise.
    """
    # Only the Anthropic messages transport goes through this SDK parser.
    if getattr(self, "api_mode", None) != "anthropic_messages":
        return False

    # UnicodeEncodeError and JSONDecodeError are ValueError subclasses that
    # are deliberately not classified here.
    is_plain_value_error = isinstance(error, ValueError) and not isinstance(
        error, (UnicodeEncodeError, json.JSONDecodeError)
    )

    # Case-insensitive match on the parser's signature phrase.
    return (
        is_plain_value_error
        and "expected ident at line" in str(error).strip().lower()
    )
def _log_stream_retry(
self,
*,
@ -5080,6 +5098,12 @@ class AIAgent:
"""
raw = str(error)
if (
isinstance(error, ValueError)
and "expected ident at line" in raw.lower()
):
return f"Malformed provider streaming response: {raw[:300]}"
# Cloudflare / proxy HTML pages: grab the <title> for a clean summary
if "<!DOCTYPE" in raw or "<html" in raw:
m = re.search(r"<title[^>]*>([^<]+)</title>", raw, re.IGNORECASE)
@ -8528,6 +8552,7 @@ class AIAgent:
_is_conn_err = isinstance(
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
)
_is_stream_parse_err = self._is_provider_stream_parse_error(e)
# If the stream died AFTER some tokens were delivered:
# normally we don't retry (the user already saw text,
@ -8567,7 +8592,10 @@ class AIAgent:
for phrase in _SSE_PREVIEW_PHRASES
)
_is_transient = (
_is_timeout or _is_conn_err or _is_sse_conn_err_preview
_is_timeout
or _is_conn_err
or _is_sse_conn_err_preview
or _is_stream_parse_err
)
_can_silent_retry = (
_partial_tool_in_flight
@ -8665,7 +8693,7 @@ class AIAgent:
for phrase in _SSE_CONN_PHRASES
)
if _is_timeout or _is_conn_err or _is_sse_conn_err:
if _is_timeout or _is_conn_err or _is_sse_conn_err or _is_stream_parse_err:
# Transient network / timeout error. Retry the
# streaming request with a fresh connection first.
if _stream_attempt < _max_stream_retries:
@ -8706,12 +8734,20 @@ class AIAgent:
mid_tool_call=False,
diag=request_client_holder.get("diag"),
)
self._emit_status(
"❌ Connection to provider failed after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "
"try again in a moment."
)
if _is_stream_parse_err:
self._emit_status(
"❌ Provider returned malformed streaming data after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "
"try again in a moment."
)
else:
self._emit_status(
"❌ Connection to provider failed after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "
"try again in a moment."
)
else:
_err_lower = str(e).lower()
_is_stream_unsupported = (
@ -14509,11 +14545,16 @@ class AIAgent:
# provider/network failure (malformed response body,
# truncated stream, routing layer corruption), not a
# local programming bug, and should be retried (#14782).
# Exclude Anthropic stream parser ValueErrors for the
# same reason: third-party Anthropic-compatible providers
# can emit malformed event-stream frames that SDK parsers
# raise as plain ValueError.
is_local_validation_error = (
isinstance(api_error, (ValueError, TypeError))
and not isinstance(
api_error, (UnicodeEncodeError, json.JSONDecodeError)
)
and not self._is_provider_stream_parse_error(api_error)
# ssl.SSLError (and its subclass SSLCertVerificationError)
# inherits from OSError *and* ValueError via Python MRO,
# so the isinstance(ValueError) check above would

View file

@ -999,6 +999,88 @@ class TestAnthropicStreamCallbacks:
assert touch_calls.count("receiving stream response") == len(events)
@patch("run_agent.AIAgent._replace_primary_openai_client")
def test_anthropic_stream_parser_valueerror_retries_before_delivery(
    self, mock_replace, monkeypatch,
):
    """A parser ``ValueError`` raised before any delivery triggers one retry.

    The first stream raises the SDK's malformed-frame ``ValueError`` as soon
    as it is iterated; the second stream is empty and yields a final message.
    The call is expected to return that message after exactly two stream
    attempts and one primary-client replacement.
    """
    from run_agent import AIAgent

    class _MalformedStream:
        """Context-manager stream whose iteration fails like the SDK parser."""

        response = None

        def __enter__(self):
            return self

        def __exit__(self, *_args):
            return False

        def __iter__(self):
            raise ValueError("expected ident at line 1 column 149")

    agent_kwargs = dict(
        api_key="test-key",
        base_url="https://api.minimax.io/anthropic",
        provider="minimax",
        model="MiniMax-M2.7",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent = AIAgent(**agent_kwargs)
    agent.api_mode = "anthropic_messages"
    agent._interrupt_requested = False
    monkeypatch.setenv("HERMES_STREAM_RETRIES", "1")

    # Healthy second attempt: no events, just a final message to hand back.
    expected_message = SimpleNamespace(content=[], stop_reason="end_turn")
    healthy_stream = MagicMock()
    healthy_stream.__enter__.return_value = healthy_stream
    healthy_stream.__exit__.return_value = False
    healthy_stream.__iter__.return_value = iter([])
    healthy_stream.get_final_message.return_value = expected_message

    # First call fails on iteration, second call succeeds.
    client = MagicMock()
    client.messages.stream.side_effect = [_MalformedStream(), healthy_stream]
    agent._anthropic_client = client

    result = agent._interruptible_streaming_api_call({})

    assert result is expected_message
    assert agent._anthropic_client.messages.stream.call_count == 2
    assert mock_replace.call_count == 1
@patch("run_agent.AIAgent._replace_primary_openai_client")
def test_generic_anthropic_valueerror_still_propagates_without_stream_retry(
    self, mock_replace, monkeypatch,
):
    """A ``ValueError`` without the parser marker is raised, not retried.

    The stream factory raises a ``ValueError`` whose message does not match
    the malformed-frame signature. The call is expected to re-raise it after
    a single stream attempt and without replacing the primary client.
    """
    from run_agent import AIAgent

    agent_kwargs = dict(
        api_key="test-key",
        base_url="https://api.minimax.io/anthropic",
        provider="minimax",
        model="MiniMax-M2.7",
        quiet_mode=True,
        skip_context_files=True,
        skip_memory=True,
    )
    agent = AIAgent(**agent_kwargs)
    agent.api_mode = "anthropic_messages"
    agent._interrupt_requested = False
    monkeypatch.setenv("HERMES_STREAM_RETRIES", "1")

    # Every attempt to open a stream fails with an unrelated ValueError.
    client = MagicMock()
    client.messages.stream.side_effect = ValueError(
        "invalid local request shape"
    )
    agent._anthropic_client = client

    with pytest.raises(ValueError, match="invalid local request shape"):
        agent._interruptible_streaming_api_call({})

    assert agent._anthropic_client.messages.stream.call_count == 1
    assert mock_replace.call_count == 0
class TestPartialToolCallWarning:
"""Regression: when a stream dies mid tool-call argument generation after
@ -1504,4 +1586,3 @@ class TestCopilotACPStreamingDecision:
_use_streaming = False
assert _use_streaming is True