From fe4c87eb28907c467f60335d75680a50e77b15c9 Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 23:35:54 -0700 Subject: [PATCH] =?UTF-8?q?fix(agent):=20retry=20malformed=20anthropic=20s?= =?UTF-8?q?tream=20parser=20errors=20=E2=80=94=20port=20to=20extracted=20m?= =?UTF-8?q?odules?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Original commit 9c304a7f5 by helix4u targeted _flatten_exception_chain, _summarize_api_error, and the _call streaming retry loop in pre-refactor run_agent.py. Re-applied to: - New _is_provider_stream_parse_error helper → run_agent.py (next to _flatten_exception_chain in the AIAgent class) - _summarize_api_error early-return for the malformed-streaming ValueError → run_agent.py (kept method body) - _call streaming retry: _is_stream_parse_err flag wired into _is_transient AND the post-exhaustion branch + dedicated malformed-streaming user-status string → agent/chat_completion_helpers.py (the _call body now lives there) Co-authored-by: helix4u <4317663+helix4u@users.noreply.github.com> --- agent/chat_completion_helpers.py | 13 +++++++++++-- run_agent.py | 24 ++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index d163557b8fa..0b3c394832f 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -1632,6 +1632,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= _is_conn_err = isinstance( e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) ) + _is_stream_parse_err = agent._is_provider_stream_parse_error(e) # If the stream died AFTER some tokens were delivered: # normally we don't retry (the user already saw text, @@ -1671,7 +1672,10 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= for phrase in _SSE_PREVIEW_PHRASES ) 
_is_transient = ( - _is_timeout or _is_conn_err or _is_sse_conn_err_preview + _is_timeout + or _is_conn_err + or _is_sse_conn_err_preview + or _is_stream_parse_err ) _can_silent_retry = ( _partial_tool_in_flight @@ -1769,7 +1773,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= for phrase in _SSE_CONN_PHRASES ) - if _is_timeout or _is_conn_err or _is_sse_conn_err: + if _is_timeout or _is_conn_err or _is_sse_conn_err or _is_stream_parse_err: # Transient network / timeout error. Retry the # streaming request with a fresh connection first. if _stream_attempt < _max_stream_retries: @@ -1811,6 +1815,11 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= diag=request_client_holder.get("diag"), ) agent._emit_status( + "❌ Provider returned malformed streaming data after " + f"{_max_stream_retries + 1} attempts. " + "The provider may be experiencing issues — " + "try again in a moment." + if _is_stream_parse_err else "❌ Connection to provider failed after " f"{_max_stream_retries + 1} attempts. " "The provider may be experiencing issues — " diff --git a/run_agent.py b/run_agent.py index 1cb0ae761e6..f843603a1e5 100644 --- a/run_agent.py +++ b/run_agent.py @@ -700,6 +700,24 @@ class AIAgent: from agent.stream_diag import flatten_exception_chain return flatten_exception_chain(error) + def _is_provider_stream_parse_error(self, error: BaseException) -> bool: + """Return True for malformed provider streaming data from SDK parsers. + + Some Anthropic-compatible streaming providers can send a malformed + event-stream frame. The Anthropic SDK surfaces that as a plain + ``ValueError`` such as ``expected ident at line 1 column 149``. That + is provider wire-format trouble, not local request validation, so it + should follow the same retry path as a truncated JSON body. 
+ """ + if getattr(self, "api_mode", None) != "anthropic_messages": + return False + if not isinstance(error, ValueError): + return False + if isinstance(error, (UnicodeEncodeError, json.JSONDecodeError)): + return False + message = str(error).strip().lower() + return "expected ident at line" in message + def _log_stream_retry( self, *, @@ -1335,6 +1353,12 @@ class AIAgent: """ raw = str(error) + if ( + isinstance(error, ValueError) + and "expected ident at line" in raw.lower() + ): + return f"Malformed provider streaming response: {raw[:300]}" + # Cloudflare / proxy HTML pages: grab the <title> for a clean summary if "<!DOCTYPE" in raw or "<html" in raw: m = re.search(r"<title[^>]*>([^<]+)", raw, re.IGNORECASE)