fix(xai): surface provider 'error' SSE frame in Codex fallback stream (#27184)

Original commit 2b193907d by Teknium added a new module-level
_StreamErrorEvent class and threaded its raise into
_run_codex_create_stream_fallback in pre-refactor run_agent.py.

  - _StreamErrorEvent class → run_agent.py (module-level, next to
    _qwen_portal_headers; class needs to be top-level for the codex
    runtime to import it)
  - The fallback event-loop's 'type=error' handler → agent/codex_runtime.py
    where run_codex_create_stream_fallback now lives. Imports
    _StreamErrorEvent lazily from run_agent to avoid circular import.

Co-authored-by: Teknium <127238744+teknium1@users.noreply.github.com>
This commit is contained in:
teknium1 2026-05-16 23:41:09 -07:00
parent 80fa92a491
commit aa05ffba53
No known key found for this signature in database
2 changed files with 68 additions and 0 deletions

View file

@ -356,6 +356,35 @@ def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None
if not event_type and isinstance(event, dict):
event_type = event.get("type")
# ``error`` SSE frames carry the provider's real failure
# reason (subscription / quota / model-not-available /
# rejected-reasoning-replay) but never appear in the
# ``{completed, incomplete, failed}`` terminal set, so the
# raw loop below would silently consume them and end with
# "did not emit a terminal response". xAI in particular
# emits ``type=error`` as the FIRST frame for OAuth
# accounts whose Grok subscription is missing/exhausted —
# the SDK's stream helper raises ``RuntimeError(Expected
# to have received response.created before error)`` which
# the caller catches and routes here, expecting this
# fallback to surface the message. Synthesize an
# APIError-shaped exception so ``_summarize_api_error``
# and the credential-pool entitlement detector see the
# real text instead of a generic RuntimeError.
if event_type == "error":
err_message = getattr(event, "message", None)
if not err_message and isinstance(event, dict):
err_message = event.get("message")
err_code = getattr(event, "code", None)
if not err_code and isinstance(event, dict):
err_code = event.get("code")
err_param = getattr(event, "param", None)
if not err_param and isinstance(event, dict):
err_param = event.get("param")
err_message = (err_message or "stream emitted error event").strip()
from run_agent import _StreamErrorEvent
raise _StreamErrorEvent(err_message, code=err_code, param=err_param)
# Collect output items and text deltas for backfill
if event_type == "response.output_item.done":
done_item = getattr(event, "item", None)