def _stream_event_field(event: Any, name: str) -> Any:
    """Return field *name* from a stream event, or ``None`` if it is absent.

    Accepts both attribute-style events (e.g. pydantic models) and plain
    ``dict`` events.
    """
    if isinstance(event, dict):
        return event.get(name)
    return getattr(event, name, None)


def _dump_to_dict(obj: Any) -> Optional[Dict[str, Any]]:
    """Best-effort conversion of a model-like object to a plain ``dict``.

    Tries ``model_dump()`` and then ``to_dict()``. Returns ``None`` when
    neither method exists, when a method raises, or when the result is not a
    ``dict``.
    """
    for method_name in ("model_dump", "to_dict"):
        method = getattr(obj, method_name, None)
        if not callable(method):
            continue
        try:
            dumped = method()
        except Exception:
            # Deliberately broad: this runs while another exception is being
            # handled, and a serialisation failure must never replace the
            # stream error we are trying to explain.
            continue
        if isinstance(dumped, dict):
            return dumped
    return None


def extract_stream_error_payload(exc: BaseException) -> Optional[Dict[str, Any]]:
    """Extract the original service-side error payload from an Anthropic SDK
    streaming RuntimeError.

    The SDK's streaming accumulator (``anthropic.lib.streaming._messages.
    accumulate_event``) raises::

        RuntimeError('Unexpected event order, got <type> before "message_start"')

    when the server sends a non-``message_start`` event as the first stream
    event. In practice this happens when Bedrock/Anthropic returns an
    ``error`` event (throttling, overloaded, 5xx, validation) at the start
    of the stream. The accumulator's type guard rejects the event without
    forwarding the payload, so the caller sees the cryptic "event order"
    message with no context.

    The raw event object is still present in the raising frame's locals.
    This helper walks ``exc.__traceback__`` to the innermost frame, reads
    its ``event`` local, and returns a ``dict`` that may contain:

    * ``"event_type"`` -- the event's ``type`` field, when truthy;
    * ``"error"`` -- the event's ``error`` field as a ``dict`` (a value that
      cannot be converted is wrapped as ``{"message": str(error)}``);
    * ``"raw_event"`` -- a dump of the whole event, only when no non-empty
      ``"error"`` could be recovered.

    Args:
        exc: The exception caught around the streaming call.

    Returns:
        The recovered payload, or ``None`` if *exc* is not the expected
        ``RuntimeError``, has no traceback, or the innermost frame holds no
        usable ``event`` local.

    This function is intended to be called from an ``except`` block and is
    written to never raise on a malformed event.
    """
    if not isinstance(exc, RuntimeError):
        return None
    if "Unexpected event order" not in str(exc):
        return None

    tb = exc.__traceback__
    if tb is None:
        # Exception was constructed but never raised.
        return None
    # The last traceback entry is the frame that executed the ``raise``.
    while tb.tb_next is not None:
        tb = tb.tb_next

    event = tb.tb_frame.f_locals.get("event")
    if event is None:
        return None

    payload: Dict[str, Any] = {}

    event_type = _stream_event_field(event, "type")
    if event_type:
        payload["event_type"] = event_type

    err = _stream_event_field(event, "error")
    if err is not None:
        # err may be a pydantic-style model, a dict, or something printable.
        if isinstance(err, dict):
            payload["error"] = err
        else:
            dumped_err = _dump_to_dict(err)
            if dumped_err is None:
                dumped_err = {"message": str(err)}
            payload["error"] = dumped_err

    # Final fallback: dump the entire event if we got nothing useful.
    if not payload.get("error"):
        if isinstance(event, dict):
            raw_event: Optional[Dict[str, Any]] = dict(event)
        else:
            raw_event = _dump_to_dict(event)
        if raw_event is not None:
            payload["raw_event"] = raw_event

    return payload or None


def format_stream_error_message(exc: RuntimeError, payload: Dict[str, Any]) -> str:
    """Format a recovered stream error payload into a human-readable message
    suitable for re-raising.

    Produces output like::

        overloaded_error: Bedrock is currently overloaded, please retry
        (anthropic SDK: Unexpected event order, got error before "message_start")

    (shown wrapped here; the returned string is a single line).

    Args:
        exc: The original SDK exception; its ``str()`` is appended as context.
        payload: A payload as returned by ``extract_stream_error_payload``.
            ``None`` or an empty dict is tolerated.

    Returns:
        ``"<head> (anthropic SDK: <exc>)"`` where ``<head>`` is, in order of
        preference: ``"<type>: <message>"``, the message alone, the type
        alone, a rendering of ``payload["raw_event"]``, or a note naming
        ``payload["event_type"]`` (``unknown`` when missing).
    """
    payload = payload or {}
    err = payload.get("error")

    err_type = None
    err_msg = None
    if isinstance(err, dict):
        err_type = err.get("type")
        err_msg = err.get("message")
    elif err:
        # A bare (non-dict) error value is still the best description we have.
        err_msg = str(err)

    if err_type and err_msg:
        head = f"{err_type}: {err_msg}"
    elif err_msg:
        head = str(err_msg)
    elif err_type:
        head = str(err_type)
    elif payload.get("raw_event"):
        head = f"service error event: {payload['raw_event']}"
    else:
        head = f"service returned event_type={payload.get('event_type', 'unknown')}"

    return f"{head} (anthropic SDK: {exc})"
Without + # this, the detector kills healthy long-running + # Opus streams after 180 s even when events are + # actively arriving (the chat_completions path + # already does this at the top of its chunk loop). + last_chunk_time["t"] = time.time() + self._touch_activity("receiving stream response") - if self._interrupt_requested: - break + if self._interrupt_requested: + break - event_type = getattr(event, "type", None) + event_type = getattr(event, "type", None) - if event_type == "content_block_start": - block = getattr(event, "content_block", None) - if block and getattr(block, "type", None) == "tool_use": - has_tool_use = True - tool_name = getattr(block, "name", None) - if tool_name: - _fire_first_delta() - self._fire_tool_gen_started(tool_name) - - elif event_type == "content_block_delta": - delta = getattr(event, "delta", None) - if delta: - delta_type = getattr(delta, "type", None) - if delta_type == "text_delta": - text = getattr(delta, "text", "") - if text and not has_tool_use: + if event_type == "content_block_start": + block = getattr(event, "content_block", None) + if block and getattr(block, "type", None) == "tool_use": + has_tool_use = True + tool_name = getattr(block, "name", None) + if tool_name: _fire_first_delta() - self._fire_stream_delta(text) - deltas_were_sent["yes"] = True - elif delta_type == "thinking_delta": - thinking_text = getattr(delta, "thinking", "") - if thinking_text: - _fire_first_delta() - self._fire_reasoning_delta(thinking_text) + self._fire_tool_gen_started(tool_name) - # Return the native Anthropic Message for downstream processing - return stream.get_final_message() + elif event_type == "content_block_delta": + delta = getattr(event, "delta", None) + if delta: + delta_type = getattr(delta, "type", None) + if delta_type == "text_delta": + text = getattr(delta, "text", "") + if text and not has_tool_use: + _fire_first_delta() + self._fire_stream_delta(text) + deltas_were_sent["yes"] = True + elif delta_type == 
"thinking_delta": + thinking_text = getattr(delta, "thinking", "") + if thinking_text: + _fire_first_delta() + self._fire_reasoning_delta(thinking_text) + + # Return the native Anthropic Message for downstream processing + return stream.get_final_message() + except RuntimeError as _stream_rt_err: + # The Anthropic SDK's streaming accumulator raises + # `RuntimeError('Unexpected event order, got before + # "message_start"')` when the first SSE event isn't + # message_start — typically when Bedrock / Anthropic returns + # an `error` event first (throttling, overload, 5xx, + # validation). The SDK has the payload but discards it + # via construct_type_unchecked, leaving callers with a + # cryptic "event order" message. Recover the original + # payload from the raising frame's locals and re-raise + # with meaningful context so the user (and retry logic) + # can see what actually happened. + from agent.anthropic_adapter import ( + extract_stream_error_payload, + format_stream_error_message, + ) + _payload = extract_stream_error_payload(_stream_rt_err) + if _payload is not None: + _msg = format_stream_error_message(_stream_rt_err, _payload) + # Re-raise as RuntimeError with the real message so + # logs / _summarize_api_error surface actionable context. + # (We keep it as RuntimeError rather than attempting to + # construct a typed Anthropic SDK exception because + # APIStatusError subclasses require an httpx.Response + # we don't have at this point. The outer retry loop's + # classifier handles RuntimeError retry eligibility via + # substring matching on the message.) + raise RuntimeError(_msg) from _stream_rt_err + # Couldn't recover a payload — let the original exception + # propagate unchanged. 
"""Tests for anthropic_adapter.extract_stream_error_payload and
format_stream_error_message.

Regression coverage for the SDK streaming accumulator behaviour where a
Bedrock/Anthropic error event arriving at the start of a stream surfaces
as a cryptic "Unexpected event order" RuntimeError with no payload.
"""
import pytest

from anthropic.lib.streaming._messages import accumulate_event
from agent.anthropic_adapter import (
    extract_stream_error_payload,
    format_stream_error_message,
)


def _error_event(error_type, message):
    """Build the dict form of a service-side ``error`` stream event."""
    return {"type": "error", "error": {"type": error_type, "message": message}}


def _trigger_sdk_accumulator_error(event_payload):
    """Pass *event_payload* to ``accumulate_event`` with no snapshot and
    return the RuntimeError it raises."""
    try:
        accumulate_event(event=event_payload, current_snapshot=None)
    except RuntimeError as sdk_error:
        return sdk_error
    raise AssertionError("accumulate_event did not raise")


class TestExtractStreamErrorPayload:
    """Recovering the service-side error event from the traceback of the
    SDK's RuntimeError."""

    def test_recovers_overloaded_error_payload(self):
        sdk_error = _trigger_sdk_accumulator_error(
            _error_event(
                "overloaded_error",
                "Bedrock is currently overloaded, please retry",
            )
        )

        recovered = extract_stream_error_payload(sdk_error)

        assert recovered is not None
        assert recovered["event_type"] == "error"
        recovered_error = recovered["error"]
        assert recovered_error["type"] == "overloaded_error"
        assert "overloaded" in recovered_error["message"]

    def test_recovers_rate_limit_error_payload(self):
        sdk_error = _trigger_sdk_accumulator_error(
            _error_event(
                "rate_limit_error",
                "Number of request tokens has exceeded your per-minute rate limit",
            )
        )

        recovered = extract_stream_error_payload(sdk_error)

        assert recovered is not None
        assert recovered["error"]["type"] == "rate_limit_error"

    def test_returns_none_for_unrelated_runtime_error(self):
        """A RuntimeError with a different message must not match."""
        unrelated = RuntimeError("something completely different")
        recovered = extract_stream_error_payload(unrelated)
        assert recovered is None

    def test_returns_none_for_non_runtime_error(self):
        """A matching message on a non-RuntimeError is still rejected."""
        wrong_type = ValueError("Unexpected event order, got error before \"message_start\"")
        recovered = extract_stream_error_payload(wrong_type)
        assert recovered is None

    def test_returns_none_for_runtime_error_without_traceback(self):
        """A never-raised RuntimeError (no traceback) yields None, not a crash."""
        never_raised = RuntimeError("Unexpected event order, got error before \"message_start\"")
        recovered = extract_stream_error_payload(never_raised)
        assert recovered is None


class TestFormatStreamErrorMessage:
    """Rendering a recovered payload as a human-readable error message."""

    def test_formats_typed_error_with_type_and_message(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order, got error"),
            {
                "event_type": "error",
                "error": {"type": "overloaded_error", "message": "Please retry"},
            },
        )

        for expected_fragment in ("overloaded_error", "Please retry"):
            assert expected_fragment in rendered
        # The original SDK message is kept as trailing context.
        assert "anthropic SDK" in rendered

    def test_formats_payload_with_only_message(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"error": {"message": "bare message"}},
        )

        assert "bare message" in rendered

    def test_formats_payload_with_only_type(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"error": {"type": "invalid_request_error"}},
        )

        assert "invalid_request_error" in rendered

    def test_formats_raw_event_fallback(self):
        rendered = format_stream_error_message(
            RuntimeError("Unexpected event order"),
            {"raw_event": {"type": "unknown", "foo": "bar"}},
        )

        assert any(fragment in rendered for fragment in ("unknown", "foo"))


class TestEndToEndRecovery:
    """End-to-end: the SDK raises, then extract + format yield an actionable
    message."""

    def test_sdk_error_event_becomes_readable_message(self):
        sdk_error = _trigger_sdk_accumulator_error(
            _error_event(
                "overloaded_error",
                "Bedrock is currently overloaded, please retry",
            )
        )

        recovered = extract_stream_error_payload(sdk_error)
        rendered = format_stream_error_message(sdk_error, recovered)

        # The message carries the actual root cause, not just the SDK's
        # cryptic fallback text.
        assert "overloaded_error" in rendered
        assert "Bedrock is currently overloaded" in rendered
        assert "anthropic SDK:" in rendered