mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
fix(xai): surface provider 'error' SSE frame in Codex fallback stream (#27184)
xAI's Responses stream emits 'type=error' as the FIRST SSE frame when an
OAuth account is unsubscribed/exhausted or rejects the encrypted-reasoning
replay introduced in the May 2026 SuperGrok rollout. The SDK helper
raises RuntimeError("Expected to have received response.created before
error"), which the caller correctly routes to
_run_codex_create_stream_fallback. The fallback then opens a new stream
that emits the same 'error' frame — but the fallback loop only handled
{response.completed, response.incomplete, response.failed} and silently
skipped 'error' events. Result: the loop fell off the end of the stream
and raised the useless 'fallback did not emit a terminal response'
RuntimeError, which the classifier marked retryable=True, so the request
was retried 3x before failing with no clue what went wrong.
Now: 'error' frames raise a synthesized _StreamErrorEvent with an OpenAI
SDK-shaped .body so _summarize_api_error, _extract_api_error_context,
_is_entitlement_failure, and classify_api_error all see the real
provider message. Users on unsubscribed accounts now see 'do not have
an active Grok subscription' once, instead of three generic RuntimeErrors.
Verified end-to-end: classifier returns reason=auth retryable=False;
entitlement detector matches even with status_code=None; summarizer
returns the full xAI message.
Tests: 4 new in TestCodexFallbackErrorEvent covering xAI subscription
message, dict-shaped events, summarizer integration, and the empty-stream
case (must still raise the original RuntimeError so 'truncated mid-flight'
stays distinguishable from 'provider rejected the call').
This commit is contained in:
parent
e21cb8d145
commit
2b193907d6
2 changed files with 209 additions and 0 deletions
67
run_agent.py
67
run_agent.py
|
|
@ -1110,6 +1110,45 @@ def _qwen_portal_headers() -> dict:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class _StreamErrorEvent(Exception):
|
||||||
|
"""Synthesized provider error surfaced from a Responses ``error`` SSE frame.
|
||||||
|
|
||||||
|
Some Codex-style Responses backends (xAI for subscription/quota
|
||||||
|
failures, custom relays under malformed-tool-call conditions) emit a
|
||||||
|
standalone ``type=error`` frame instead of routing the failure
|
||||||
|
through ``response.failed`` or returning an HTTP 4xx. The fallback
|
||||||
|
streaming path raises this exception so ``_summarize_api_error`` and
|
||||||
|
``_extract_api_error_context`` see a familiar ``.body`` /
|
||||||
|
``.status_code`` shape and the entitlement detector can match the
|
||||||
|
underlying provider message ("do not have an active Grok
|
||||||
|
subscription", etc.).
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
message: str,
|
||||||
|
*,
|
||||||
|
code: Optional[str] = None,
|
||||||
|
param: Optional[str] = None,
|
||||||
|
status_code: Optional[int] = None,
|
||||||
|
) -> None:
|
||||||
|
super().__init__(message)
|
||||||
|
self.message = message
|
||||||
|
self.code = code
|
||||||
|
self.param = param
|
||||||
|
self.status_code = status_code
|
||||||
|
# OpenAI SDK-shaped body so _extract_api_error_context /
|
||||||
|
# _summarize_api_error / classify_api_error all pick it up.
|
||||||
|
self.body: Dict[str, Any] = {
|
||||||
|
"error": {
|
||||||
|
"message": message,
|
||||||
|
"code": code,
|
||||||
|
"param": param,
|
||||||
|
"type": "error",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class AIAgent:
|
class AIAgent:
|
||||||
"""
|
"""
|
||||||
AI Agent with tool calling capabilities.
|
AI Agent with tool calling capabilities.
|
||||||
|
|
@ -7212,6 +7251,34 @@ class AIAgent:
|
||||||
if not event_type and isinstance(event, dict):
|
if not event_type and isinstance(event, dict):
|
||||||
event_type = event.get("type")
|
event_type = event.get("type")
|
||||||
|
|
||||||
|
# ``error`` SSE frames carry the provider's real failure
|
||||||
|
# reason (subscription / quota / model-not-available /
|
||||||
|
# rejected-reasoning-replay) but never appear in the
|
||||||
|
# ``{completed, incomplete, failed}`` terminal set, so the
|
||||||
|
# raw loop below would silently consume them and end with
|
||||||
|
# "did not emit a terminal response". xAI in particular
|
||||||
|
# emits ``type=error`` as the FIRST frame for OAuth
|
||||||
|
# accounts whose Grok subscription is missing/exhausted —
|
||||||
|
# the SDK's stream helper raises ``RuntimeError(Expected
|
||||||
|
# to have received response.created before error)`` which
|
||||||
|
# the caller catches and routes here, expecting this
|
||||||
|
# fallback to surface the message. Synthesize an
|
||||||
|
# APIError-shaped exception so ``_summarize_api_error``
|
||||||
|
# and the credential-pool entitlement detector see the
|
||||||
|
# real text instead of a generic RuntimeError.
|
||||||
|
if event_type == "error":
|
||||||
|
err_message = getattr(event, "message", None)
|
||||||
|
if not err_message and isinstance(event, dict):
|
||||||
|
err_message = event.get("message")
|
||||||
|
err_code = getattr(event, "code", None)
|
||||||
|
if not err_code and isinstance(event, dict):
|
||||||
|
err_code = event.get("code")
|
||||||
|
err_param = getattr(event, "param", None)
|
||||||
|
if not err_param and isinstance(event, dict):
|
||||||
|
err_param = event.get("param")
|
||||||
|
err_message = (err_message or "stream emitted error event").strip()
|
||||||
|
raise _StreamErrorEvent(err_message, code=err_code, param=err_param)
|
||||||
|
|
||||||
# Collect output items and text deltas for backfill
|
# Collect output items and text deltas for backfill
|
||||||
if event_type == "response.output_item.done":
|
if event_type == "response.output_item.done":
|
||||||
done_item = getattr(event, "item", None)
|
done_item = getattr(event, "item", None)
|
||||||
|
|
|
||||||
|
|
@ -1586,3 +1586,145 @@ class TestCopilotACPStreamingDecision:
|
||||||
_use_streaming = False
|
_use_streaming = False
|
||||||
|
|
||||||
assert _use_streaming is True
|
assert _use_streaming is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestCodexFallbackErrorEvent:
|
||||||
|
"""Provider ``error`` SSE frames must surface the real message,
|
||||||
|
not the generic "did not emit a terminal response" RuntimeError.
|
||||||
|
|
||||||
|
xAI emits ``type=error`` as the FIRST frame on the Responses stream
|
||||||
|
when an OAuth account is unsubscribed/exhausted (May 2026
|
||||||
|
SuperGrok rollout). The SDK helper raises
|
||||||
|
``RuntimeError("Expected to have received response.created before
|
||||||
|
error")`` which the caller catches and routes to
|
||||||
|
``_run_codex_create_stream_fallback``. The fallback then opens a
|
||||||
|
NEW stream that emits the same ``type=error`` frame; before this
|
||||||
|
fix it ignored the event entirely and raised a useless RuntimeError.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _make_agent(self):
|
||||||
|
from run_agent import AIAgent
|
||||||
|
agent = AIAgent(
|
||||||
|
api_key="test-key",
|
||||||
|
base_url="https://api.x.ai/v1",
|
||||||
|
provider="xai-oauth",
|
||||||
|
model="grok-4.3",
|
||||||
|
quiet_mode=True,
|
||||||
|
skip_context_files=True,
|
||||||
|
skip_memory=True,
|
||||||
|
)
|
||||||
|
agent.api_mode = "codex_responses"
|
||||||
|
agent._touch_activity = lambda desc: None
|
||||||
|
return agent
|
||||||
|
|
||||||
|
def test_fallback_raises_synthesized_error_with_xai_subscription_message(self):
|
||||||
|
from run_agent import _StreamErrorEvent
|
||||||
|
|
||||||
|
agent = self._make_agent()
|
||||||
|
|
||||||
|
error_event = SimpleNamespace(
|
||||||
|
type="error",
|
||||||
|
message=(
|
||||||
|
"Forbidden: The caller does not have permission to execute the specified operation. "
|
||||||
|
"'You have either run out of available resources or do not have an active Grok subscription.'"
|
||||||
|
),
|
||||||
|
code="permission_denied",
|
||||||
|
param=None,
|
||||||
|
sequence_number=1,
|
||||||
|
)
|
||||||
|
|
||||||
|
class _FakeStream:
|
||||||
|
def __iter__(self_inner):
|
||||||
|
return iter([error_event])
|
||||||
|
def close(self_inner):
|
||||||
|
return None
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.responses.create.return_value = _FakeStream()
|
||||||
|
|
||||||
|
with pytest.raises(_StreamErrorEvent) as excinfo:
|
||||||
|
agent._run_codex_create_stream_fallback(
|
||||||
|
{"model": "grok-4.3", "instructions": "hi", "input": []},
|
||||||
|
client=mock_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
exc = excinfo.value
|
||||||
|
assert "active Grok subscription" in str(exc)
|
||||||
|
assert exc.code == "permission_denied"
|
||||||
|
assert isinstance(exc.body, dict)
|
||||||
|
assert exc.body["error"]["message"] == error_event.message
|
||||||
|
# _extract_api_error_context reads .body["error"]["message"] — make sure
|
||||||
|
# the entitlement detector will find the subscription phrase there.
|
||||||
|
assert "active Grok subscription" in exc.body["error"]["message"]
|
||||||
|
|
||||||
|
def test_fallback_dict_event_payload_is_also_handled(self):
|
||||||
|
"""Some relays deliver events as plain dicts instead of model
|
||||||
|
objects; the dict branch in the loop must surface them too."""
|
||||||
|
from run_agent import _StreamErrorEvent
|
||||||
|
|
||||||
|
agent = self._make_agent()
|
||||||
|
|
||||||
|
error_event = {
|
||||||
|
"type": "error",
|
||||||
|
"message": "rate_limited",
|
||||||
|
"code": "rate_limit_exceeded",
|
||||||
|
}
|
||||||
|
|
||||||
|
class _FakeStream:
|
||||||
|
def __iter__(self_inner):
|
||||||
|
return iter([error_event])
|
||||||
|
def close(self_inner):
|
||||||
|
return None
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.responses.create.return_value = _FakeStream()
|
||||||
|
|
||||||
|
with pytest.raises(_StreamErrorEvent) as excinfo:
|
||||||
|
agent._run_codex_create_stream_fallback(
|
||||||
|
{"model": "grok-4.3", "instructions": "hi", "input": []},
|
||||||
|
client=mock_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert "rate_limited" in str(excinfo.value)
|
||||||
|
assert excinfo.value.code == "rate_limit_exceeded"
|
||||||
|
|
||||||
|
def test_fallback_surfaces_message_useful_to_summarizer(self):
|
||||||
|
"""The synthesized exception must be readable by
|
||||||
|
``_summarize_api_error`` so the user-facing log line shows the
|
||||||
|
real provider message instead of a generic class name."""
|
||||||
|
from run_agent import AIAgent, _StreamErrorEvent
|
||||||
|
|
||||||
|
agent = self._make_agent()
|
||||||
|
exc = _StreamErrorEvent(
|
||||||
|
"You have either run out of available resources or do not have an active Grok subscription.",
|
||||||
|
code="permission_denied",
|
||||||
|
)
|
||||||
|
|
||||||
|
summary = AIAgent._summarize_api_error(exc)
|
||||||
|
assert "active Grok subscription" in summary
|
||||||
|
|
||||||
|
def test_fallback_still_raises_terminal_error_when_no_error_event(self):
|
||||||
|
"""Streams that simply end without any terminal event (and no
|
||||||
|
``error`` frame) must continue to raise the original
|
||||||
|
``"did not emit a terminal response"`` RuntimeError so callers
|
||||||
|
can distinguish "stream truncated mid-flight" from "provider
|
||||||
|
rejected the call"."""
|
||||||
|
agent = self._make_agent()
|
||||||
|
|
||||||
|
# Empty stream — no events at all
|
||||||
|
class _FakeStream:
|
||||||
|
def __iter__(self_inner):
|
||||||
|
return iter([])
|
||||||
|
def close(self_inner):
|
||||||
|
return None
|
||||||
|
|
||||||
|
mock_client = MagicMock()
|
||||||
|
mock_client.responses.create.return_value = _FakeStream()
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError) as excinfo:
|
||||||
|
agent._run_codex_create_stream_fallback(
|
||||||
|
{"model": "grok-4.3", "instructions": "hi", "input": []},
|
||||||
|
client=mock_client,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert "did not emit a terminal response" in str(excinfo.value)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue