This commit is contained in:
Andre Kurait 2026-04-24 19:25:26 -05:00 committed by GitHub
commit f4b989a769
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 295 additions and 38 deletions

View file

@ -363,6 +363,102 @@ def _common_betas_for_base_url(base_url: str | None) -> list[str]:
return _COMMON_BETAS
def extract_stream_error_payload(exc: BaseException) -> Optional[Dict[str, Any]]:
    """Extract the original service-side error payload from an Anthropic SDK
    streaming RuntimeError.

    The SDK's streaming accumulator (``anthropic.lib.streaming._messages.
    accumulate_event``) raises::

        RuntimeError('Unexpected event order, got <type> before "message_start"')

    when the server sends a non-``message_start`` event as the first stream
    event. In practice this happens when Bedrock/Anthropic returns an
    ``error`` event (throttling, overloaded, 5xx, validation) at the start
    of the stream. The accumulator's type guard rejects the event without
    forwarding the payload, so the caller sees the cryptic "event order"
    message with no context.

    The raw event object with the full error payload intact is still
    preserved in the raising frame's locals. This helper walks
    ``exc.__traceback__`` to the innermost frame, reads its ``event`` local
    and returns a ``dict`` like::

        {
            "event_type": "error",
            "error": {
                "type": "overloaded_error",
                "message": "Bedrock is currently overloaded",
            },
        }

    Because this helper is intended to run inside an ``except`` handler, a
    failure while serialising the recovered error object is never allowed to
    escape (it would mask the exception being handled); the next
    serialisation strategy is tried instead.

    Args:
        exc: The exception caught around the streaming call.

    Returns:
        A dict holding any of the keys ``event_type`` (the event's ``type``
        attribute), ``error`` (the serialised ``error`` attribute) and
        ``raw_event`` (a full dump of the event, only when no usable
        ``error`` was found). ``None`` if the exception is not the expected
        RuntimeError, has no traceback, or no recoverable event payload is
        present (e.g. a different failure mode).
    """
    if not isinstance(exc, RuntimeError):
        return None
    if "Unexpected event order" not in str(exc):
        return None

    # The last entry of the traceback chain is the frame that actually
    # executed the ``raise``.
    tb = getattr(exc, "__traceback__", None)
    if tb is None:
        # Exception was constructed but never raised: nothing to inspect.
        return None
    while tb.tb_next is not None:
        tb = tb.tb_next

    event = tb.tb_frame.f_locals.get("event")
    if event is None:
        return None

    # The SDK misconstructs the event as RawMessageStartEvent via
    # construct_type_unchecked, but the actual fields (type, error) remain
    # accessible on the pydantic model.
    payload: Dict[str, Any] = {}
    event_type = getattr(event, "type", None)
    if event_type:
        payload["event_type"] = event_type

    err = getattr(event, "error", None)
    if err is not None:
        # err may be a pydantic model, a dict, or a str. Try the structured
        # dumpers first; if one is missing or raises, fall through to the
        # next strategy rather than propagating a secondary exception.
        for dumper_name in ("model_dump", "to_dict"):
            dumper = getattr(err, dumper_name, None)
            if not callable(dumper):
                continue
            try:
                detail = dumper()
            except Exception:
                # Best-effort recovery: a broken dumper must not hide the
                # SDK error the caller is currently handling.
                continue
            break
        else:
            detail = err if isinstance(err, dict) else {"message": str(err)}
        payload["error"] = detail

    # Final fallback: dump the entire event model if we got nothing useful
    if not payload.get("error") and hasattr(event, "model_dump"):
        try:
            payload["raw_event"] = event.model_dump()
        except Exception:
            # Deliberately ignored: raw_event is optional extra context.
            pass

    return payload or None
def format_stream_error_message(exc: RuntimeError, payload: Dict[str, Any]) -> str:
    """Format a recovered stream error payload into a human-readable message
    suitable for re-raising.

    Produces output like::

        overloaded_error: Bedrock is currently overloaded, please retry (anthropic SDK: Unexpected event order, got error before "message_start")

    The leading part is chosen from the most specific information present in
    *payload*, in this order: error type and message, message only, type
    only, the ``raw_event`` dump, and finally the bare ``event_type``.

    Args:
        exc: The original SDK exception; its text is appended as context.
        payload: Mapping with optional ``error``, ``raw_event`` and
            ``event_type`` keys.

    Returns:
        The formatted single-line message.
    """
    err = payload.get("error")
    if isinstance(err, dict):
        err_type = err.get("type")
        err_msg = err.get("message")
    else:
        # A non-mapping detail (e.g. a bare string) is still the best
        # description available, so surface it as the message instead of
        # silently discarding it.
        err_type = None
        err_msg = str(err) if err else None

    if err_type and err_msg:
        head = f"{err_type}: {err_msg}"
    elif err_msg:
        head = str(err_msg)
    elif err_type:
        head = err_type
    elif payload.get("raw_event"):
        head = f"service error event: {payload['raw_event']}"
    else:
        head = f"service returned event_type={payload.get('event_type', 'unknown')}"

    # Keep the SDK's own text so the original failure stays searchable.
    return f"{head} (anthropic SDK: {exc})"
def build_anthropic_client(api_key: str, base_url: str = None, timeout: float = None):
"""Create an Anthropic client, auto-detecting setup-tokens vs API keys.

View file

@ -6255,49 +6255,81 @@ class AIAgent:
# Reset stale-stream timer for this attempt
last_chunk_time["t"] = time.time()
# Use the Anthropic SDK's streaming context manager
with self._anthropic_client.messages.stream(**api_kwargs) as stream:
for event in stream:
# Update stale-stream timer on every event so the
# outer poll loop knows data is flowing. Without
# this, the detector kills healthy long-running
# Opus streams after 180 s even when events are
# actively arriving (the chat_completions path
# already does this at the top of its chunk loop).
last_chunk_time["t"] = time.time()
self._touch_activity("receiving stream response")
try:
with self._anthropic_client.messages.stream(**api_kwargs) as stream:
for event in stream:
# Update stale-stream timer on every event so the
# outer poll loop knows data is flowing. Without
# this, the detector kills healthy long-running
# Opus streams after 180 s even when events are
# actively arriving (the chat_completions path
# already does this at the top of its chunk loop).
last_chunk_time["t"] = time.time()
self._touch_activity("receiving stream response")
if self._interrupt_requested:
break
if self._interrupt_requested:
break
event_type = getattr(event, "type", None)
event_type = getattr(event, "type", None)
if event_type == "content_block_start":
block = getattr(event, "content_block", None)
if block and getattr(block, "type", None) == "tool_use":
has_tool_use = True
tool_name = getattr(block, "name", None)
if tool_name:
_fire_first_delta()
self._fire_tool_gen_started(tool_name)
elif event_type == "content_block_delta":
delta = getattr(event, "delta", None)
if delta:
delta_type = getattr(delta, "type", None)
if delta_type == "text_delta":
text = getattr(delta, "text", "")
if text and not has_tool_use:
if event_type == "content_block_start":
block = getattr(event, "content_block", None)
if block and getattr(block, "type", None) == "tool_use":
has_tool_use = True
tool_name = getattr(block, "name", None)
if tool_name:
_fire_first_delta()
self._fire_stream_delta(text)
deltas_were_sent["yes"] = True
elif delta_type == "thinking_delta":
thinking_text = getattr(delta, "thinking", "")
if thinking_text:
_fire_first_delta()
self._fire_reasoning_delta(thinking_text)
self._fire_tool_gen_started(tool_name)
# Return the native Anthropic Message for downstream processing
return stream.get_final_message()
elif event_type == "content_block_delta":
delta = getattr(event, "delta", None)
if delta:
delta_type = getattr(delta, "type", None)
if delta_type == "text_delta":
text = getattr(delta, "text", "")
if text and not has_tool_use:
_fire_first_delta()
self._fire_stream_delta(text)
deltas_were_sent["yes"] = True
elif delta_type == "thinking_delta":
thinking_text = getattr(delta, "thinking", "")
if thinking_text:
_fire_first_delta()
self._fire_reasoning_delta(thinking_text)
# Return the native Anthropic Message for downstream processing
return stream.get_final_message()
except RuntimeError as _stream_rt_err:
# The Anthropic SDK's streaming accumulator raises
# `RuntimeError('Unexpected event order, got <type> before
# "message_start"')` when the first SSE event isn't
# message_start — typically when Bedrock / Anthropic returns
# an `error` event first (throttling, overload, 5xx,
# validation). The SDK has the payload but discards it
# via construct_type_unchecked, leaving callers with a
# cryptic "event order" message. Recover the original
# payload from the raising frame's locals and re-raise
# with meaningful context so the user (and retry logic)
# can see what actually happened.
from agent.anthropic_adapter import (
extract_stream_error_payload,
format_stream_error_message,
)
_payload = extract_stream_error_payload(_stream_rt_err)
if _payload is not None:
_msg = format_stream_error_message(_stream_rt_err, _payload)
# Re-raise as RuntimeError with the real message so
# logs / _summarize_api_error surface actionable context.
# (We keep it as RuntimeError rather than attempting to
# construct a typed Anthropic SDK exception because
# APIStatusError subclasses require an httpx.Response
# we don't have at this point. The outer retry loop's
# classifier handles RuntimeError retry eligibility via
# substring matching on the message.)
raise RuntimeError(_msg) from _stream_rt_err
# Couldn't recover a payload — let the original exception
# propagate unchanged.
raise
def _call():
import httpx as _httpx

View file

@ -0,0 +1,129 @@
"""Tests for anthropic_adapter.extract_stream_error_payload and
format_stream_error_message.
Regression tests for the SDK streaming accumulator behaviour where
Bedrock/Anthropic error events at the start of a stream would surface
as a cryptic "Unexpected event order" RuntimeError with no payload.
"""
import pytest
from anthropic.lib.streaming._messages import accumulate_event
from agent.anthropic_adapter import (
extract_stream_error_payload,
format_stream_error_message,
)
def _trigger_sdk_accumulator_error(event_payload):
    """Call the SDK's ``accumulate_event`` with *event_payload* and no
    current snapshot, returning the RuntimeError it raises.

    Fails with AssertionError when the call does not raise RuntimeError.
    """
    try:
        accumulate_event(event=event_payload, current_snapshot=None)
    except RuntimeError as sdk_error:
        return sdk_error
    else:
        raise AssertionError("accumulate_event did not raise")
class TestExtractStreamErrorPayload:
    """extract_stream_error_payload should pull the service-side error event
    back out of the traceback of the SDK's RuntimeError."""

    def test_recovers_overloaded_error_payload(self):
        sdk_event = {
            "type": "error",
            "error": {
                "type": "overloaded_error",
                "message": "Bedrock is currently overloaded, please retry",
            },
        }
        recovered = extract_stream_error_payload(
            _trigger_sdk_accumulator_error(sdk_event)
        )

        assert recovered is not None
        assert recovered["event_type"] == "error"
        detail = recovered["error"]
        assert detail["type"] == "overloaded_error"
        assert "overloaded" in detail["message"]

    def test_recovers_rate_limit_error_payload(self):
        sdk_event = {
            "type": "error",
            "error": {
                "type": "rate_limit_error",
                "message": "Number of request tokens has exceeded your per-minute rate limit",
            },
        }
        recovered = extract_stream_error_payload(
            _trigger_sdk_accumulator_error(sdk_event)
        )

        assert recovered is not None
        assert recovered["error"]["type"] == "rate_limit_error"

    def test_returns_none_for_unrelated_runtime_error(self):
        """A RuntimeError with a different message must not be matched."""
        unrelated = RuntimeError("something completely different")
        assert extract_stream_error_payload(unrelated) is None

    def test_returns_none_for_non_runtime_error(self):
        """A matching message on a non-RuntimeError type is still rejected."""
        wrong_type = ValueError("Unexpected event order, got error before \"message_start\"")
        assert extract_stream_error_payload(wrong_type) is None

    def test_returns_none_for_runtime_error_without_traceback(self):
        """A never-raised RuntimeError (no traceback) yields None, not a crash."""
        never_raised = RuntimeError("Unexpected event order, got error before \"message_start\"")
        assert extract_stream_error_payload(never_raised) is None
class TestFormatStreamErrorMessage:
    """format_stream_error_message should turn a recovered payload into a
    readable error string."""

    def test_formats_typed_error_with_type_and_message(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order, got error"),
            {
                "event_type": "error",
                "error": {"type": "overloaded_error", "message": "Please retry"},
            },
        )
        assert "overloaded_error" in rendered
        assert "Please retry" in rendered
        # The SDK's own message is kept as trailing context.
        assert "anthropic SDK" in rendered

    def test_formats_payload_with_only_message(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"error": {"message": "bare message"}},
        )
        assert "bare message" in rendered

    def test_formats_payload_with_only_type(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"error": {"type": "invalid_request_error"}},
        )
        assert "invalid_request_error" in rendered

    def test_formats_raw_event_fallback(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"raw_event": {"type": "unknown", "foo": "bar"}},
        )
        assert "unknown" in rendered or "foo" in rendered
class TestEndToEndRecovery:
    """Full path: the SDK raises, then extract + format yield an actionable
    message."""

    def test_sdk_error_event_becomes_readable_message(self):
        sdk_error = _trigger_sdk_accumulator_error(
            {
                "type": "error",
                "error": {
                    "type": "overloaded_error",
                    "message": "Bedrock is currently overloaded, please retry",
                },
            }
        )
        recovered = extract_stream_error_payload(sdk_error)
        rendered = format_stream_error_message(sdk_error, recovered)

        # The root cause must be visible, not only the SDK's cryptic
        # fallback text.
        for expected in (
            "overloaded_error",
            "Bedrock is currently overloaded",
            "anthropic SDK:",
        ):
            assert expected in rendered