fix(agent): retry malformed anthropic stream parser errors — port to extracted modules

Original commit 9c304a7f5 by helix4u targeted _flatten_exception_chain,
_summarize_api_error, and the _call streaming retry loop in pre-refactor
run_agent.py. Re-applied to:

  - New _is_provider_stream_parse_error helper → run_agent.py (next
    to _flatten_exception_chain in the AIAgent class)
  - _summarize_api_error early-return for the malformed-streaming
    ValueError → run_agent.py (kept method body)
  - _call streaming retry: _is_stream_parse_err flag wired into
    _is_transient AND the post-exhaustion branch + dedicated
    malformed-streaming user-status string → agent/chat_completion_helpers.py
    (the _call body now lives there)

Co-authored-by: helix4u <4317663+helix4u@users.noreply.github.com>
This commit is contained in:
teknium1 2026-05-16 23:35:54 -07:00
parent f885be030c
commit fe4c87eb28
No known key found for this signature in database
2 changed files with 35 additions and 2 deletions

View file

@ -1632,6 +1632,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
_is_conn_err = isinstance(
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
)
_is_stream_parse_err = agent._is_provider_stream_parse_error(e)
# If the stream died AFTER some tokens were delivered:
# normally we don't retry (the user already saw text,
@ -1671,7 +1672,10 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
for phrase in _SSE_PREVIEW_PHRASES
)
_is_transient = (
_is_timeout or _is_conn_err or _is_sse_conn_err_preview
_is_timeout
or _is_conn_err
or _is_sse_conn_err_preview
or _is_stream_parse_err
)
_can_silent_retry = (
_partial_tool_in_flight
@ -1769,7 +1773,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
for phrase in _SSE_CONN_PHRASES
)
if _is_timeout or _is_conn_err or _is_sse_conn_err:
if _is_timeout or _is_conn_err or _is_sse_conn_err or _is_stream_parse_err:
# Transient network / timeout error. Retry the
# streaming request with a fresh connection first.
if _stream_attempt < _max_stream_retries:
@ -1811,6 +1815,11 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
diag=request_client_holder.get("diag"),
)
agent._emit_status(
"❌ Provider returned malformed streaming data after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "
"try again in a moment."
if _is_stream_parse_err else
"❌ Connection to provider failed after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "

View file

@ -700,6 +700,24 @@ class AIAgent:
from agent.stream_diag import flatten_exception_chain
return flatten_exception_chain(error)
def _is_provider_stream_parse_error(self, error: BaseException) -> bool:
"""Return True for malformed provider streaming data from SDK parsers.
Some Anthropic-compatible streaming providers can send a malformed
event-stream frame. The Anthropic SDK surfaces that as a plain
``ValueError`` such as ``expected ident at line 1 column 149``. That
is provider wire-format trouble, not local request validation, so it
should follow the same retry path as a truncated JSON body.
"""
if getattr(self, "api_mode", None) != "anthropic_messages":
return False
if not isinstance(error, ValueError):
return False
if isinstance(error, (UnicodeEncodeError, json.JSONDecodeError)):
return False
message = str(error).strip().lower()
return "expected ident at line" in message
def _log_stream_retry(
self,
*,
@ -1335,6 +1353,12 @@ class AIAgent:
"""
raw = str(error)
if (
isinstance(error, ValueError)
and "expected ident at line" in raw.lower()
):
return f"Malformed provider streaming response: {raw[:300]}"
# Cloudflare / proxy HTML pages: grab the <title> for a clean summary
if "<!DOCTYPE" in raw or "<html" in raw:
m = re.search(r"<title[^>]*>([^<]+)</title>", raw, re.IGNORECASE)