diff --git a/run_agent.py b/run_agent.py index ffe0ffbe67e..3f4573bac52 100644 --- a/run_agent.py +++ b/run_agent.py @@ -1110,6 +1110,45 @@ def _qwen_portal_headers() -> dict: } +class _StreamErrorEvent(Exception): + """Synthesized provider error surfaced from a Responses ``error`` SSE frame. + + Some Codex-style Responses backends (xAI for subscription/quota + failures, custom relays under malformed-tool-call conditions) emit a + standalone ``type=error`` frame instead of routing the failure + through ``response.failed`` or returning an HTTP 4xx. The fallback + streaming path raises this exception so ``_summarize_api_error`` and + ``_extract_api_error_context`` see a familiar ``.body`` / + ``.status_code`` shape and the entitlement detector can match the + underlying provider message ("do not have an active Grok + subscription", etc.). + """ + + def __init__( + self, + message: str, + *, + code: Optional[str] = None, + param: Optional[str] = None, + status_code: Optional[int] = None, + ) -> None: + super().__init__(message) + self.message = message + self.code = code + self.param = param + self.status_code = status_code + # OpenAI SDK-shaped body so _extract_api_error_context / + # _summarize_api_error / classify_api_error all pick it up. + self.body: Dict[str, Any] = { + "error": { + "message": message, + "code": code, + "param": param, + "type": "error", + } + } + + class AIAgent: """ AI Agent with tool calling capabilities. @@ -7212,6 +7251,34 @@ class AIAgent: if not event_type and isinstance(event, dict): event_type = event.get("type") + # ``error`` SSE frames carry the provider's real failure + # reason (subscription / quota / model-not-available / + # rejected-reasoning-replay) but never appear in the + # ``{completed, incomplete, failed}`` terminal set, so the + # raw loop below would silently consume them and end with + # "did not emit a terminal response". xAI in particular + # emits ``type=error`` as the FIRST frame for OAuth + # accounts whose Grok subscription is missing/exhausted — + # the SDK's stream helper raises ``RuntimeError(Expected + # to have received response.created before error)`` which + # the caller catches and routes here, expecting this + # fallback to surface the message. Synthesize an + # APIError-shaped exception so ``_summarize_api_error`` + # and the credential-pool entitlement detector see the + # real text instead of a generic RuntimeError. + if event_type == "error": + err_message = getattr(event, "message", None) + if not err_message and isinstance(event, dict): + err_message = event.get("message") + err_code = getattr(event, "code", None) + if not err_code and isinstance(event, dict): + err_code = event.get("code") + err_param = getattr(event, "param", None) + if not err_param and isinstance(event, dict): + err_param = event.get("param") + err_message = (err_message or "stream emitted error event").strip() + raise _StreamErrorEvent(err_message, code=err_code, param=err_param) + # Collect output items and text deltas for backfill if event_type == "response.output_item.done": done_item = getattr(event, "item", None) diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 1ce140f82bf..474a568875d 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -1586,3 +1586,145 @@ class TestCopilotACPStreamingDecision: _use_streaming = False assert _use_streaming is True + + +class TestCodexFallbackErrorEvent: + """Provider ``error`` SSE frames must surface the real message, + not the generic "did not emit a terminal response" RuntimeError. + + xAI emits ``type=error`` as the FIRST frame on the Responses stream + when an OAuth account is unsubscribed/exhausted (May 2026 + SuperGrok rollout). The SDK helper raises + ``RuntimeError("Expected to have received response.created before + error")`` which the caller catches and routes to + ``_run_codex_create_stream_fallback``. The fallback then opens a + NEW stream that emits the same ``type=error`` frame; before this + fix it ignored the event entirely and raised a useless RuntimeError. + """ + + def _make_agent(self): + from run_agent import AIAgent + agent = AIAgent( + api_key="test-key", + base_url="https://api.x.ai/v1", + provider="xai-oauth", + model="grok-4.3", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "codex_responses" + agent._touch_activity = lambda desc: None + return agent + + def test_fallback_raises_synthesized_error_with_xai_subscription_message(self): + from run_agent import _StreamErrorEvent + + agent = self._make_agent() + + error_event = SimpleNamespace( + type="error", + message=( + "Forbidden: The caller does not have permission to execute the specified operation. " + "'You have either run out of available resources or do not have an active Grok subscription.'" + ), + code="permission_denied", + param=None, + sequence_number=1, + ) + + class _FakeStream: + def __iter__(self_inner): + return iter([error_event]) + def close(self_inner): + return None + + mock_client = MagicMock() + mock_client.responses.create.return_value = _FakeStream() + + with pytest.raises(_StreamErrorEvent) as excinfo: + agent._run_codex_create_stream_fallback( + {"model": "grok-4.3", "instructions": "hi", "input": []}, + client=mock_client, + ) + + exc = excinfo.value + assert "active Grok subscription" in str(exc) + assert exc.code == "permission_denied" + assert isinstance(exc.body, dict) + assert exc.body["error"]["message"] == error_event.message + # _extract_api_error_context reads .body["error"]["message"] — make sure + # the entitlement detector will find the subscription phrase there. + assert "active Grok subscription" in exc.body["error"]["message"] + + def test_fallback_dict_event_payload_is_also_handled(self): + """Some relays deliver events as plain dicts instead of model + objects; the dict branch in the loop must surface them too.""" + from run_agent import _StreamErrorEvent + + agent = self._make_agent() + + error_event = { + "type": "error", + "message": "rate_limited", + "code": "rate_limit_exceeded", + } + + class _FakeStream: + def __iter__(self_inner): + return iter([error_event]) + def close(self_inner): + return None + + mock_client = MagicMock() + mock_client.responses.create.return_value = _FakeStream() + + with pytest.raises(_StreamErrorEvent) as excinfo: + agent._run_codex_create_stream_fallback( + {"model": "grok-4.3", "instructions": "hi", "input": []}, + client=mock_client, + ) + + assert "rate_limited" in str(excinfo.value) + assert excinfo.value.code == "rate_limit_exceeded" + + def test_fallback_surfaces_message_useful_to_summarizer(self): + """The synthesized exception must be readable by + ``_summarize_api_error`` so the user-facing log line shows the + real provider message instead of a generic class name.""" + from run_agent import AIAgent, _StreamErrorEvent + + agent = self._make_agent() + exc = _StreamErrorEvent( + "You have either run out of available resources or do not have an active Grok subscription.", + code="permission_denied", + ) + + summary = AIAgent._summarize_api_error(exc) + assert "active Grok subscription" in summary + + def test_fallback_still_raises_terminal_error_when_no_error_event(self): + """Streams that simply end without any terminal event (and no + ``error`` frame) must continue to raise the original + ``"did not emit a terminal response"`` RuntimeError so callers + can distinguish "stream truncated mid-flight" from "provider + rejected the call".""" + agent = self._make_agent() + + # Empty stream — no events at all + class _FakeStream: + def __iter__(self_inner): + return iter([]) + def close(self_inner): + return None + + mock_client = MagicMock() + mock_client.responses.create.return_value = _FakeStream() + + with pytest.raises(RuntimeError) as excinfo: + agent._run_codex_create_stream_fallback( + {"model": "grok-4.3", "instructions": "hi", "input": []}, + client=mock_client, + ) + + assert "did not emit a terminal response" in str(excinfo.value)