From 9c304a7f569ebf17efe120d5b61a3a745c6dc532 Mon Sep 17 00:00:00 2001
From: helix4u <4317663+helix4u@users.noreply.github.com>
Date: Fri, 15 May 2026 22:33:16 -0600
Subject: [PATCH] fix(agent): retry malformed anthropic stream parser errors
---
run_agent.py | 57 ++++++++++++++++++---
tests/run_agent/test_streaming.py | 83 ++++++++++++++++++++++++++++++-
2 files changed, 131 insertions(+), 9 deletions(-)
diff --git a/run_agent.py b/run_agent.py
index b3cde9eb1ea..88d5c95fcd8 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -3027,6 +3027,24 @@ class AIAgent:
parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__)
return " <- ".join(parts) if parts else type(error).__name__
+ def _is_provider_stream_parse_error(self, error: BaseException) -> bool:
+ """Return True for malformed provider streaming data from SDK parsers.
+
+ Some Anthropic-compatible streaming providers can send a malformed
+ event-stream frame. The Anthropic SDK surfaces that as a plain
+ ``ValueError`` such as ``expected ident at line 1 column 149``. That
+ is provider wire-format trouble, not local request validation, so it
+ should follow the same retry path as a truncated JSON body.
+ """
+ if getattr(self, "api_mode", None) != "anthropic_messages":
+ return False
+ if not isinstance(error, ValueError):
+ return False
+ if isinstance(error, (UnicodeEncodeError, json.JSONDecodeError)):
+ return False
+ message = str(error).strip().lower()
+ return "expected ident at line" in message
+
def _log_stream_retry(
self,
*,
@@ -5080,6 +5098,12 @@ class AIAgent:
"""
raw = str(error)
+ if (
+ isinstance(error, ValueError)
+ and "expected ident at line" in raw.lower()
+ ):
+ return f"Malformed provider streaming response: {raw[:300]}"
+
# Cloudflare / proxy HTML pages: grab the
for a clean summary
if "]*>([^<]+)", raw, re.IGNORECASE)
@@ -8528,6 +8552,7 @@ class AIAgent:
_is_conn_err = isinstance(
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
)
+ _is_stream_parse_err = self._is_provider_stream_parse_error(e)
# If the stream died AFTER some tokens were delivered:
# normally we don't retry (the user already saw text,
@@ -8567,7 +8592,10 @@ class AIAgent:
for phrase in _SSE_PREVIEW_PHRASES
)
_is_transient = (
- _is_timeout or _is_conn_err or _is_sse_conn_err_preview
+ _is_timeout
+ or _is_conn_err
+ or _is_sse_conn_err_preview
+ or _is_stream_parse_err
)
_can_silent_retry = (
_partial_tool_in_flight
@@ -8665,7 +8693,7 @@ class AIAgent:
for phrase in _SSE_CONN_PHRASES
)
- if _is_timeout or _is_conn_err or _is_sse_conn_err:
+ if _is_timeout or _is_conn_err or _is_sse_conn_err or _is_stream_parse_err:
# Transient network / timeout error. Retry the
# streaming request with a fresh connection first.
if _stream_attempt < _max_stream_retries:
@@ -8706,12 +8734,20 @@ class AIAgent:
mid_tool_call=False,
diag=request_client_holder.get("diag"),
)
- self._emit_status(
- "❌ Connection to provider failed after "
- f"{_max_stream_retries + 1} attempts. "
- "The provider may be experiencing issues — "
- "try again in a moment."
- )
+ if _is_stream_parse_err:
+ self._emit_status(
+ "❌ Provider returned malformed streaming data after "
+ f"{_max_stream_retries + 1} attempts. "
+ "The provider may be experiencing issues — "
+ "try again in a moment."
+ )
+ else:
+ self._emit_status(
+ "❌ Connection to provider failed after "
+ f"{_max_stream_retries + 1} attempts. "
+ "The provider may be experiencing issues — "
+ "try again in a moment."
+ )
else:
_err_lower = str(e).lower()
_is_stream_unsupported = (
@@ -14509,11 +14545,16 @@ class AIAgent:
# provider/network failure (malformed response body,
# truncated stream, routing layer corruption), not a
# local programming bug, and should be retried (#14782).
+ # Exclude Anthropic stream parser ValueErrors for the
+ # same reason: third-party Anthropic-compatible providers
+ # can emit malformed event-stream frames that SDK parsers
+ # raise as plain ValueError.
is_local_validation_error = (
isinstance(api_error, (ValueError, TypeError))
and not isinstance(
api_error, (UnicodeEncodeError, json.JSONDecodeError)
)
+ and not self._is_provider_stream_parse_error(api_error)
# ssl.SSLError (and its subclass SSLCertVerificationError)
# inherits from OSError *and* ValueError via Python MRO,
# so the isinstance(ValueError) check above would
diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py
index e636498c462..1ce140f82bf 100644
--- a/tests/run_agent/test_streaming.py
+++ b/tests/run_agent/test_streaming.py
@@ -999,6 +999,88 @@ class TestAnthropicStreamCallbacks:
assert touch_calls.count("receiving stream response") == len(events)
+ @patch("run_agent.AIAgent._replace_primary_openai_client")
+ def test_anthropic_stream_parser_valueerror_retries_before_delivery(
+ self, mock_replace, monkeypatch,
+ ):
+ """Malformed Anthropic event-stream frames retry instead of surfacing HTTP None."""
+ from run_agent import AIAgent
+
+ agent = AIAgent(
+ api_key="test-key",
+ base_url="https://api.minimax.io/anthropic",
+ provider="minimax",
+ model="MiniMax-M2.7",
+ quiet_mode=True,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ agent.api_mode = "anthropic_messages"
+ agent._interrupt_requested = False
+ monkeypatch.setenv("HERMES_STREAM_RETRIES", "1")
+
+ class _BadStream:
+ response = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *_args):
+ return False
+
+ def __iter__(self):
+ raise ValueError("expected ident at line 1 column 149")
+
+ final_message = SimpleNamespace(content=[], stop_reason="end_turn")
+ good_stream = MagicMock()
+ good_stream.__enter__ = MagicMock(return_value=good_stream)
+ good_stream.__exit__ = MagicMock(return_value=False)
+ good_stream.__iter__ = MagicMock(return_value=iter([]))
+ good_stream.get_final_message.return_value = final_message
+
+ agent._anthropic_client = MagicMock()
+ agent._anthropic_client.messages.stream.side_effect = [
+ _BadStream(),
+ good_stream,
+ ]
+
+ response = agent._interruptible_streaming_api_call({})
+
+ assert response is final_message
+ assert agent._anthropic_client.messages.stream.call_count == 2
+ assert mock_replace.call_count == 1
+
+ @patch("run_agent.AIAgent._replace_primary_openai_client")
+ def test_generic_anthropic_valueerror_still_propagates_without_stream_retry(
+ self, mock_replace, monkeypatch,
+ ):
+ """Only known provider stream parser ValueErrors are treated as transient."""
+ from run_agent import AIAgent
+
+ agent = AIAgent(
+ api_key="test-key",
+ base_url="https://api.minimax.io/anthropic",
+ provider="minimax",
+ model="MiniMax-M2.7",
+ quiet_mode=True,
+ skip_context_files=True,
+ skip_memory=True,
+ )
+ agent.api_mode = "anthropic_messages"
+ agent._interrupt_requested = False
+ monkeypatch.setenv("HERMES_STREAM_RETRIES", "1")
+
+ agent._anthropic_client = MagicMock()
+ agent._anthropic_client.messages.stream.side_effect = ValueError(
+ "invalid local request shape"
+ )
+
+ with pytest.raises(ValueError, match="invalid local request shape"):
+ agent._interruptible_streaming_api_call({})
+
+ assert agent._anthropic_client.messages.stream.call_count == 1
+ assert mock_replace.call_count == 0
+
class TestPartialToolCallWarning:
"""Regression: when a stream dies mid tool-call argument generation after
@@ -1504,4 +1586,3 @@ class TestCopilotACPStreamingDecision:
_use_streaming = False
assert _use_streaming is True
-