diff --git a/run_agent.py b/run_agent.py index b3cde9eb1ea..88d5c95fcd8 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3027,6 +3027,24 @@ class AIAgent: parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__) return " <- ".join(parts) if parts else type(error).__name__ + def _is_provider_stream_parse_error(self, error: BaseException) -> bool: + """Return True for malformed provider streaming data from SDK parsers. + + Some Anthropic-compatible streaming providers can send a malformed + event-stream frame. The Anthropic SDK surfaces that as a plain + ``ValueError`` such as ``expected ident at line 1 column 149``. That + is provider wire-format trouble, not local request validation, so it + should follow the same retry path as a truncated JSON body. + """ + if getattr(self, "api_mode", None) != "anthropic_messages": + return False + if not isinstance(error, ValueError): + return False + if isinstance(error, (UnicodeEncodeError, json.JSONDecodeError)): + return False + message = str(error).strip().lower() + return "expected ident at line" in message + def _log_stream_retry( self, *, @@ -5080,6 +5098,12 @@ class AIAgent: """ raw = str(error) + if ( + isinstance(error, ValueError) + and "expected ident at line" in raw.lower() + ): + return f"Malformed provider streaming response: {raw[:300]}" + # Cloudflare / proxy HTML pages: grab the <title> for a clean summary if "<!DOCTYPE" in raw or "<html" in raw: m = re.search(r"<title[^>]*>([^<]+)", raw, re.IGNORECASE) @@ -8528,6 +8552,7 @@ class AIAgent: _is_conn_err = isinstance( e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) ) + _is_stream_parse_err = self._is_provider_stream_parse_error(e) # If the stream died AFTER some tokens were delivered: # normally we don't retry (the user already saw text, @@ -8567,7 +8592,10 @@ class AIAgent: for phrase in _SSE_PREVIEW_PHRASES ) _is_transient = ( - _is_timeout or _is_conn_err or _is_sse_conn_err_preview + _is_timeout + or _is_conn_err + or 
_is_sse_conn_err_preview + or _is_stream_parse_err ) _can_silent_retry = ( _partial_tool_in_flight @@ -8665,7 +8693,7 @@ class AIAgent: for phrase in _SSE_CONN_PHRASES ) - if _is_timeout or _is_conn_err or _is_sse_conn_err: + if _is_timeout or _is_conn_err or _is_sse_conn_err or _is_stream_parse_err: # Transient network / timeout error. Retry the # streaming request with a fresh connection first. if _stream_attempt < _max_stream_retries: @@ -8706,12 +8734,20 @@ class AIAgent: mid_tool_call=False, diag=request_client_holder.get("diag"), ) - self._emit_status( - "❌ Connection to provider failed after " - f"{_max_stream_retries + 1} attempts. " - "The provider may be experiencing issues — " - "try again in a moment." - ) + if _is_stream_parse_err: + self._emit_status( + "❌ Provider returned malformed streaming data after " + f"{_max_stream_retries + 1} attempts. " + "The provider may be experiencing issues — " + "try again in a moment." + ) + else: + self._emit_status( + "❌ Connection to provider failed after " + f"{_max_stream_retries + 1} attempts. " + "The provider may be experiencing issues — " + "try again in a moment." + ) else: _err_lower = str(e).lower() _is_stream_unsupported = ( @@ -14509,11 +14545,16 @@ class AIAgent: # provider/network failure (malformed response body, # truncated stream, routing layer corruption), not a # local programming bug, and should be retried (#14782). + # Exclude Anthropic stream parser ValueErrors for the + # same reason: third-party Anthropic-compatible providers + # can emit malformed event-stream frames that SDK parsers + # raise as plain ValueError. 
is_local_validation_error = ( isinstance(api_error, (ValueError, TypeError)) and not isinstance( api_error, (UnicodeEncodeError, json.JSONDecodeError) ) + and not self._is_provider_stream_parse_error(api_error) # ssl.SSLError (and its subclass SSLCertVerificationError) # inherits from OSError *and* ValueError via Python MRO, # so the isinstance(ValueError) check above would diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index e636498c462..1ce140f82bf 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -999,6 +999,88 @@ class TestAnthropicStreamCallbacks: assert touch_calls.count("receiving stream response") == len(events) + @patch("run_agent.AIAgent._replace_primary_openai_client") + def test_anthropic_stream_parser_valueerror_retries_before_delivery( + self, mock_replace, monkeypatch, + ): + """Malformed Anthropic event-stream frames retry instead of surfacing HTTP None.""" + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://api.minimax.io/anthropic", + provider="minimax", + model="MiniMax-M2.7", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "anthropic_messages" + agent._interrupt_requested = False + monkeypatch.setenv("HERMES_STREAM_RETRIES", "1") + + class _BadStream: + response = None + + def __enter__(self): + return self + + def __exit__(self, *_args): + return False + + def __iter__(self): + raise ValueError("expected ident at line 1 column 149") + + final_message = SimpleNamespace(content=[], stop_reason="end_turn") + good_stream = MagicMock() + good_stream.__enter__ = MagicMock(return_value=good_stream) + good_stream.__exit__ = MagicMock(return_value=False) + good_stream.__iter__ = MagicMock(return_value=iter([])) + good_stream.get_final_message.return_value = final_message + + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.stream.side_effect = [ + _BadStream(), + 
good_stream, + ] + + response = agent._interruptible_streaming_api_call({}) + + assert response is final_message + assert agent._anthropic_client.messages.stream.call_count == 2 + assert mock_replace.call_count == 1 + + @patch("run_agent.AIAgent._replace_primary_openai_client") + def test_generic_anthropic_valueerror_still_propagates_without_stream_retry( + self, mock_replace, monkeypatch, + ): + """Only known provider stream parser ValueErrors are treated as transient.""" + from run_agent import AIAgent + + agent = AIAgent( + api_key="test-key", + base_url="https://api.minimax.io/anthropic", + provider="minimax", + model="MiniMax-M2.7", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "anthropic_messages" + agent._interrupt_requested = False + monkeypatch.setenv("HERMES_STREAM_RETRIES", "1") + + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.stream.side_effect = ValueError( + "invalid local request shape" + ) + + with pytest.raises(ValueError, match="invalid local request shape"): + agent._interruptible_streaming_api_call({}) + + assert agent._anthropic_client.messages.stream.call_count == 1 + assert mock_replace.call_count == 0 + class TestPartialToolCallWarning: """Regression: when a stream dies mid tool-call argument generation after @@ -1504,4 +1586,3 @@ class TestCopilotACPStreamingDecision: _use_streaming = False assert _use_streaming is True -