mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-29 06:31:32 +00:00
* refactor(codex): drop SDK responses.stream() helper; consume events directly The OpenAI Python SDK's high-level `client.responses.stream(...)` helper does post-hoc typed reconstruction from the terminal `response.completed.response.output` field. The chatgpt.com Codex backend has been observed (today, gpt-5.5) to ship `response.output = null` on terminal frames, which crashes the SDK with `TypeError: 'NoneType' object is not iterable` mid-iteration. Carlton's #32963 patched the symptom by wrapping the helper in try/except and recovering from the same per-event accumulator the SDK was supposed to populate. This PR removes the helper from the call path entirely: we now use `client.responses.create(stream=True)` (raw AsyncIterable of SSE events) and assemble the final response object ourselves from `response.output_item.done` events as they arrive. The terminal event's `output` field is never read for content. Same strategy OpenClaw uses for the same backend. This makes Hermes structurally immune to the bug class, not patched. The next time OpenAI ships a shape change to chatgpt.com's terminal frame, our consumer keeps working because it doesn't read that frame for content — only for usage/status/id. Changes - `agent/codex_runtime.py`: new `_consume_codex_event_stream()` shared consumer; `run_codex_stream()` uses `responses.create(stream=True)`; `run_codex_create_stream_fallback()` collapses into a thin alias since the primary path now does what the fallback used to do. - `agent/auxiliary_client.py`: `_CodexCompletionsAdapter` uses the same consumer; old null-output recovery helpers deleted as unreferenced. - Tests migrated: fixtures that mocked `responses.stream` now mock `responses.create` returning a raw iterable. New regression test asserts the auxiliary path returns streamed items even when the terminal event's `output` is literally `null`. 
Validation - Live: tested against fresh OAuth on `chatgpt.com/backend-api/codex` with `gpt-5.5` — response built correctly with `response.output=null` on the terminal frame, all events consumed, usage/reasoning tokens propagated. - `tests/run_agent/test_run_agent_codex_responses.py` + `tests/agent/test_auxiliary_client.py`: 242 passed. * test+fix(codex): migrate streaming tests, raise on truncated streams CI surfaced 10 test failures across tests/run_agent/test_streaming.py and tests/run_agent/test_codex_xai_oauth_recovery.py — both files had their own `responses.stream(...)` mocks I missed in the first sweep. agent/codex_runtime.py: _consume_codex_event_stream() now raises "Codex Responses stream did not emit a terminal response" when the stream ends without any terminal frame AND no usable content. This preserves the signal callers used to get from the SDK's high-level helper, which they distinguished from "completed with empty body" in error handling. Tests migrated: - test_streaming.py: text-delta callback, activity-touch, and remote-protocol-error tests all switch from mocking responses.stream to responses.create returning an iterable of events. - test_codex_xai_oauth_recovery.py: prelude-error tests are recast as wire-error-event tests (the new path raises _StreamErrorEvent directly when the wire emits type=error, which is strictly better than the old two-phase "SDK RuntimeError → retry → fallback"). The retry-on-transport-error test moves from responses.stream side-effect to responses.create side-effect. Verified live against chatgpt.com Codex with gpt-5.5 — AIAgent.chat() through the full codex_responses path returns correctly, 319/319 targeted tests passing.
536 lines
22 KiB
Python
536 lines
22 KiB
Python
"""Codex API runtime — App Server and Responses-API streaming paths.
|
|
|
|
Extracted from :class:`AIAgent` to keep the agent loop file focused.
|
|
Each function takes the parent ``AIAgent`` as its first argument
|
|
(``agent``). AIAgent keeps thin forwarder methods for backward
|
|
compatibility.
|
|
|
|
* ``run_codex_app_server_turn`` — drives one turn through the
|
|
``codex_app_server`` subprocess client (used when a Codex CLI install
|
|
is the active provider).
|
|
* ``run_codex_stream`` — streams a Codex Responses API call (the
|
|
``codex_responses`` api_mode).
|
|
* ``run_codex_create_stream_fallback`` — backward-compatible alias that
forwards to ``run_codex_stream`` (formerly the recovery path used when the
Responses ``stream=True`` initial create failed).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
import time
|
|
from types import SimpleNamespace
|
|
from typing import Any, Dict, List
|
|
|
|
# Module-level logger shared by the functions in this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _retire_codex_session(agent) -> None:
    """Close ``agent._codex_session`` (best effort) and clear the slot.

    Close errors are logged at debug level and otherwise ignored: the session
    is being discarded regardless, and leaving the slot ``None`` makes the
    next turn spawn a fresh one.
    """
    session = getattr(agent, "_codex_session", None)
    if session is not None:
        try:
            session.close()
        except Exception:
            logger.debug("codex app-server session close raised", exc_info=True)
    agent._codex_session = None


def run_codex_app_server_turn(
    agent,
    *,
    user_message: str,
    original_user_message: Any,
    messages: List[Dict[str, Any]],
    effective_task_id: str,
    should_review_memory: bool = False,
) -> Dict[str, Any]:
    """Codex app-server runtime path: run one turn through a `codex app-server` subprocess.

    Hands the entire turn to a ``CodexAppServerSession`` and projects its
    events back into Hermes' ``messages`` list so memory/skill review keep
    working. Called from run_conversation() when
    ``agent.api_mode == "codex_app_server"``.

    Args:
        agent: the owning ``AIAgent``. Its ``_codex_session`` slot is created
            lazily on the first turn and reused until retired.
        user_message: text sent to the session as ``user_input``.
        original_user_message: forwarded to external memory sync.
        messages: the conversation list. The caller has already appended the
            user message; this function extends it in place with the turn's
            projected messages.
        effective_task_id: not used by this path.
        should_review_memory: whether the caller's memory-review trigger
            tripped; combined with the skill trigger computed here.

    Returns:
        The same dict shape as the chat_completions path (``final_response``,
        ``messages``, ``api_calls``, ``completed``, ``partial``, ``error``),
        plus ``codex_thread_id`` / ``codex_turn_id`` when the turn ran.
    """
    from agent.transports.codex_app_server_session import CodexAppServerSession

    # Lazy session: one CodexAppServerSession per AIAgent instance.
    # Spawned on first turn, reused across turns, closed at AIAgent
    # shutdown (see _cleanup hook).
    if getattr(agent, "_codex_session", None) is None:
        cwd = getattr(agent, "session_cwd", None) or os.getcwd()
        # Approval callback: defer to Hermes' standard prompt flow if a
        # CLI thread has installed one. Gateway / cron contexts get the
        # codex-side fail-closed default.
        try:
            from tools.terminal_tool import _get_approval_callback
            approval_callback = _get_approval_callback()
        except Exception:
            approval_callback = None
        agent._codex_session = CodexAppServerSession(
            cwd=cwd,
            approval_callback=approval_callback,
        )

    # NOTE: the user message is ALREADY appended to messages by the
    # standard run_conversation() flow before the early return reaches us.
    # Do NOT append again — that would duplicate.

    try:
        turn = agent._codex_session.run_turn(user_input=user_message)
    except Exception as exc:
        logger.exception("codex app-server turn failed")
        # Crash → unconditionally drop the session so the next turn
        # respawns from scratch instead of reusing a dead client.
        _retire_codex_session(agent)
        return {
            "final_response": (
                f"Codex app-server turn failed: {exc}. "
                "Fall back to default runtime with `/codex-runtime auto`."
            ),
            "messages": messages,
            "api_calls": 0,
            "completed": False,
            "partial": True,
            "error": str(exc),
        }

    # If the turn signalled the underlying client is wedged (deadline
    # blown, post-tool watchdog tripped, OAuth refresh died, subprocess
    # exited), retire the session so the next turn respawns codex
    # rather than riding the broken process. Mirrors openclaw beta.8's
    # "retire timed-out app-server clients" fix.
    if getattr(turn, "should_retire", False):
        logger.warning(
            "codex app-server session retired (turn error: %s)",
            turn.error,
        )
        _retire_codex_session(agent)

    # Splice projected messages into the conversation. The projector emits
    # standard {role, content, tool_calls, tool_call_id} entries, which
    # is exactly what curator.py / sessions DB expect.
    if turn.projected_messages:
        messages.extend(turn.projected_messages)

    # Counter ticks for the agent-improvement loop.
    # _turns_since_memory and _user_turn_count are ALREADY incremented in the
    # run_conversation() pre-loop block, so we do NOT touch them here — that
    # would double-count. Only _iters_since_skill needs an explicit increment,
    # since the chat_completions loop bumps it per tool iteration and that
    # loop is bypassed on this path.
    agent._iters_since_skill = (
        getattr(agent, "_iters_since_skill", 0) + turn.tool_iterations
    )

    # Check the skill nudge AFTER iters were incremented — same pattern the
    # chat_completions path uses.
    should_review_skills = False
    if (
        agent._skill_nudge_interval > 0
        and agent._iters_since_skill >= agent._skill_nudge_interval
        and "skill_manage" in agent.valid_tool_names
    ):
        should_review_skills = True
        agent._iters_since_skill = 0

    # External memory provider sync (mirrors the default path). Skipped on
    # interrupt/error to avoid feeding partial transcripts to memory.
    if not turn.interrupted and turn.error is None:
        try:
            agent._sync_external_memory_for_turn(
                original_user_message=original_user_message,
                final_response=turn.final_text,
                interrupted=False,
            )
        except Exception:
            logger.debug("external memory sync raised", exc_info=True)

    # Background review fork — same cadence + signature as the default
    # path. Only fires when a trigger actually tripped AND we have a real
    # final response.
    if (
        turn.final_text
        and not turn.interrupted
        and (should_review_memory or should_review_skills)
    ):
        try:
            agent._spawn_background_review(
                messages_snapshot=list(messages),
                review_memory=should_review_memory,
                review_skills=should_review_skills,
            )
        except Exception:
            logger.debug("background review spawn raised", exc_info=True)

    return {
        "final_response": turn.final_text,
        "messages": messages,
        "api_calls": 1,  # one app-server "turn" maps to one logical API call
        "completed": not turn.interrupted and turn.error is None,
        "partial": turn.interrupted or turn.error is not None,
        "error": turn.error,
        "codex_thread_id": turn.thread_id,
        "codex_turn_id": turn.turn_id,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Event-driven Responses streaming
|
|
#
|
|
# OpenAI ships its consumer Codex backend (chatgpt.com/backend-api/codex) on
|
|
# a different schedule from the openai Python SDK. The high-level
|
|
# ``client.responses.stream(...)`` helper reconstructs a typed Response from
|
|
# the terminal ``response.completed`` event's ``response.output`` field, and
|
|
# when that field drifts to ``null`` (gpt-5.5, May 2026) the SDK raises
|
|
# ``TypeError: 'NoneType' object is not iterable`` mid-iteration.
|
|
#
|
|
# We sidestep the whole class of failure by going one level lower:
|
|
# ``client.responses.create(stream=True)`` returns a raw (sync) iterable of
|
|
# SSE events, and we assemble the final response object purely from
|
|
# ``response.output_item.done`` events as they arrive. We never read
|
|
# ``response.completed.response.output`` for content reconstruction, so the
|
|
# backend can return ``null``, ``[]``, a string, or omit the field entirely
|
|
# and we don't care.
|
|
#
|
|
# This mirrors what the OpenClaw TS implementation does for the same backend
|
|
# and is structurally immune to the bug class rather than patched.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
_TERMINAL_EVENT_TYPES = frozenset({
|
|
"response.completed",
|
|
"response.incomplete",
|
|
"response.failed",
|
|
})
|
|
|
|
|
|
def _event_field(event: Any, name: str, default: Any = None) -> Any:
|
|
"""Field access that handles both attr-style (SDK objects) and dict (raw JSON) events."""
|
|
value = getattr(event, name, None)
|
|
if value is None and isinstance(event, dict):
|
|
value = event.get(name, default)
|
|
return value if value is not None else default
|
|
|
|
|
|
def _raise_stream_error(event: Any) -> None:
|
|
"""Raise a ``_StreamErrorEvent`` from a ``type=error`` SSE frame.
|
|
|
|
Imported lazily so this module stays importable from places that don't
|
|
pull in ``run_agent`` (e.g. plugin code, doc tools).
|
|
"""
|
|
from run_agent import _StreamErrorEvent
|
|
message = (_event_field(event, "message", "") or "stream emitted error event").strip()
|
|
raise _StreamErrorEvent(
|
|
message,
|
|
code=_event_field(event, "code"),
|
|
param=_event_field(event, "param"),
|
|
)
|
|
|
|
|
|
def _consume_codex_event_stream(
|
|
event_iter: Any,
|
|
*,
|
|
model: str,
|
|
on_text_delta=None,
|
|
on_reasoning_delta=None,
|
|
on_first_delta=None,
|
|
on_event=None,
|
|
interrupt_check=None,
|
|
) -> SimpleNamespace:
|
|
"""Consume a Codex Responses SSE event stream and return a final response.
|
|
|
|
The returned object is a ``SimpleNamespace`` shaped like the SDK's typed
|
|
``Response`` for the fields downstream code actually reads:
|
|
|
|
* ``output``: list of output items, assembled from ``response.output_item.done``.
|
|
For tool-call turns this contains the function_call items; for plain-text
|
|
turns it contains a synthesized ``message`` item built from streamed deltas
|
|
if no message item was emitted directly.
|
|
* ``output_text``: assembled text from ``response.output_text.delta`` deltas.
|
|
* ``usage``: copied from the terminal event's ``response.usage`` (when present).
|
|
* ``status``: ``completed`` / ``incomplete`` / ``failed`` (or ``completed`` if
|
|
the stream ended without a terminal frame but produced content).
|
|
* ``id``: ``response.id`` when present.
|
|
* ``incomplete_details``: passed through for ``response.incomplete`` frames.
|
|
* ``error``: passed through for ``response.failed`` frames.
|
|
* ``model``: from kwargs (the wire model name is not authoritative).
|
|
|
|
Critically, we never read ``response.output`` from the terminal event for
|
|
content reconstruction — only ``usage``, ``status``, ``id``. That field
|
|
being ``null`` / ``[]`` / missing is fine.
|
|
|
|
Callbacks:
|
|
|
|
* ``on_text_delta(str)`` — fires per ``response.output_text.delta``, suppressed
|
|
once a function_call event is seen (so tool-call turns don't bleed text
|
|
into the chat).
|
|
* ``on_reasoning_delta(str)`` — fires per ``response.reasoning.*.delta``.
|
|
* ``on_first_delta()`` — one-shot, fires on the first text delta only.
|
|
* ``on_event(event)`` — fires for every event before any other processing.
|
|
Used for watchdog activity, debug logging, anything wire-shape-agnostic.
|
|
* ``interrupt_check()`` — returns True to break the loop early.
|
|
"""
|
|
collected_output_items: List[Any] = []
|
|
collected_text_deltas: List[str] = []
|
|
has_tool_calls = False
|
|
first_delta_fired = False
|
|
terminal_status: str = "completed"
|
|
terminal_usage: Any = None
|
|
terminal_response_id: str = None
|
|
terminal_incomplete_details: Any = None
|
|
terminal_error: Any = None
|
|
saw_terminal = False
|
|
|
|
for event in event_iter:
|
|
if on_event is not None:
|
|
try:
|
|
on_event(event)
|
|
except (TimeoutError, InterruptedError):
|
|
# Control-flow signals from watchdog/cancellation hooks must
|
|
# propagate, not get swallowed as "debug noise".
|
|
raise
|
|
except Exception:
|
|
# Genuine bugs in third-party debug/log hooks shouldn't break
|
|
# stream consumption.
|
|
logger.debug("Codex stream on_event hook raised", exc_info=True)
|
|
if interrupt_check is not None and interrupt_check():
|
|
break
|
|
|
|
event_type = _event_field(event, "type", "")
|
|
if not isinstance(event_type, str):
|
|
event_type = ""
|
|
|
|
# ``error`` SSE frames carry the provider's real failure reason
|
|
# (subscription / quota / model-not-available / rejected-reasoning-replay)
|
|
# but never appear in the terminal set. Surface them as a structured
|
|
# exception so the credential pool + error classifier see the body.
|
|
if event_type == "error":
|
|
_raise_stream_error(event)
|
|
|
|
if "output_text.delta" in event_type or event_type == "response.output_text.delta":
|
|
delta_text = _event_field(event, "delta", "")
|
|
if delta_text:
|
|
collected_text_deltas.append(delta_text)
|
|
if not has_tool_calls:
|
|
if not first_delta_fired:
|
|
first_delta_fired = True
|
|
if on_first_delta is not None:
|
|
try:
|
|
on_first_delta()
|
|
except Exception:
|
|
logger.debug("Codex stream on_first_delta raised", exc_info=True)
|
|
if on_text_delta is not None:
|
|
try:
|
|
on_text_delta(delta_text)
|
|
except Exception:
|
|
logger.debug("Codex stream on_text_delta raised", exc_info=True)
|
|
continue
|
|
|
|
if "function_call" in event_type:
|
|
has_tool_calls = True
|
|
# fall through — function_call items still get added on output_item.done
|
|
|
|
if "reasoning" in event_type and "delta" in event_type:
|
|
reasoning_text = _event_field(event, "delta", "")
|
|
if reasoning_text and on_reasoning_delta is not None:
|
|
try:
|
|
on_reasoning_delta(reasoning_text)
|
|
except Exception:
|
|
logger.debug("Codex stream on_reasoning_delta raised", exc_info=True)
|
|
continue
|
|
|
|
if event_type == "response.output_item.done":
|
|
done_item = _event_field(event, "item")
|
|
if done_item is not None:
|
|
collected_output_items.append(done_item)
|
|
continue
|
|
|
|
if event_type in _TERMINAL_EVENT_TYPES:
|
|
saw_terminal = True
|
|
resp_obj = _event_field(event, "response")
|
|
if resp_obj is not None:
|
|
terminal_usage = getattr(resp_obj, "usage", None)
|
|
if terminal_usage is None and isinstance(resp_obj, dict):
|
|
terminal_usage = resp_obj.get("usage")
|
|
rid = getattr(resp_obj, "id", None)
|
|
if rid is None and isinstance(resp_obj, dict):
|
|
rid = resp_obj.get("id")
|
|
terminal_response_id = rid
|
|
rstatus = getattr(resp_obj, "status", None)
|
|
if rstatus is None and isinstance(resp_obj, dict):
|
|
rstatus = resp_obj.get("status")
|
|
if isinstance(rstatus, str):
|
|
terminal_status = rstatus
|
|
if event_type == "response.incomplete":
|
|
terminal_incomplete_details = getattr(resp_obj, "incomplete_details", None)
|
|
if terminal_incomplete_details is None and isinstance(resp_obj, dict):
|
|
terminal_incomplete_details = resp_obj.get("incomplete_details")
|
|
if event_type == "response.failed":
|
|
terminal_error = getattr(resp_obj, "error", None)
|
|
if terminal_error is None and isinstance(resp_obj, dict):
|
|
terminal_error = resp_obj.get("error")
|
|
if event_type == "response.completed":
|
|
terminal_status = terminal_status or "completed"
|
|
elif event_type == "response.incomplete":
|
|
terminal_status = terminal_status or "incomplete"
|
|
elif event_type == "response.failed":
|
|
terminal_status = terminal_status or "failed"
|
|
# Stop on terminal event.
|
|
break
|
|
|
|
# Build the final output list. Prefer items observed via output_item.done;
|
|
# if none arrived but we streamed plain text deltas (no tool calls), synthesize
|
|
# a single message item so downstream normalization has something to work with.
|
|
if collected_output_items:
|
|
output = list(collected_output_items)
|
|
elif collected_text_deltas and not has_tool_calls:
|
|
assembled = "".join(collected_text_deltas)
|
|
output = [SimpleNamespace(
|
|
type="message",
|
|
role="assistant",
|
|
status="completed",
|
|
content=[SimpleNamespace(type="output_text", text=assembled)],
|
|
)]
|
|
else:
|
|
output = []
|
|
|
|
# If the stream ended without any terminal event AND produced no usable
|
|
# content (no items, no text deltas), surface that as a RuntimeError so
|
|
# callers can distinguish "stream truncated mid-flight / provider rejected
|
|
# the call" from "stream completed with empty body". This preserves the
|
|
# signal the SDK's high-level helper used to raise as
|
|
# ``RuntimeError("Didn't receive a `response.completed` event.")``.
|
|
if not saw_terminal and not output:
|
|
raise RuntimeError(
|
|
"Codex Responses stream did not emit a terminal response"
|
|
)
|
|
|
|
assembled_text = "".join(collected_text_deltas)
|
|
|
|
final = SimpleNamespace(
|
|
output=output,
|
|
output_text=assembled_text,
|
|
usage=terminal_usage,
|
|
status=terminal_status,
|
|
id=terminal_response_id,
|
|
model=model,
|
|
incomplete_details=terminal_incomplete_details,
|
|
error=terminal_error,
|
|
)
|
|
return final
|
|
|
|
|
|
def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta=None):
    """Execute one streaming Responses API request and return the final response.

    Uses ``responses.create(stream=True)`` (low-level raw event iteration)
    rather than the high-level ``responses.stream(...)`` helper. This makes
    us structurally immune to backend drift in the ``response.completed``
    payload shape — we never let the SDK reconstruct a typed object from
    the terminal event's ``output`` field.

    Args:
        agent: the owning ``AIAgent``. Receives text/reasoning deltas,
            activity touches and ``_codex_streamed_text_parts`` (the text of
            the current attempt).
        api_kwargs: keyword arguments for ``responses.create``. They are
            copied per attempt with ``stream=True`` forced.
        client: optional OpenAI client; defaults to the agent's primary client.
        on_first_delta: optional one-shot callback fired on the first text delta.

    Returns:
        The ``SimpleNamespace`` assembled by ``_consume_codex_event_stream``.
        When ``create`` returns a concrete response object (has ``output``,
        not iterable), that object is returned unchanged.

    Raises:
        InterruptedError: the agent was interrupted before an attempt started.
        httpx.RemoteProtocolError / ReadTimeout / ConnectError, ConnectionError:
            transport failure after the single retry is exhausted.
        Any exception from ``_consume_codex_event_stream`` (e.g. ``RuntimeError``
        on a truncated stream, ``_StreamErrorEvent`` on an error frame).
    """
    import httpx as _httpx

    # Transport failures that get one transparent retry, whether raised while
    # opening the stream or while iterating it.
    retryable_errors = (
        _httpx.RemoteProtocolError,
        _httpx.ReadTimeout,
        _httpx.ConnectError,
        ConnectionError,
    )

    active_client = client or agent._ensure_primary_openai_client(reason="codex_stream_direct")
    max_stream_retries = 1
    # Accumulate streamed text so callers / compat shims can read it.
    agent._codex_streamed_text_parts = []

    def _on_text_delta(text: str) -> None:
        agent._codex_streamed_text_parts.append(text)
        agent._fire_stream_delta(text)

    def _on_reasoning_delta(text: str) -> None:
        agent._fire_reasoning_delta(text)

    def _on_event(event: Any) -> None:
        # TTFB watchdog and activity touch — runs once per SSE event.
        agent._codex_stream_last_event_ts = time.time()
        agent._touch_activity("receiving stream response")

    def _interrupt_check() -> bool:
        return bool(agent._interrupt_requested)

    for attempt in range(max_stream_retries + 1):
        if agent._interrupt_requested:
            raise InterruptedError("Agent interrupted before Codex stream retry")

        # Start every attempt with a clean accumulator so a retry after a
        # mid-stream transport failure doesn't leave the aborted attempt's
        # partial text in front of the successful attempt's text.
        agent._codex_streamed_text_parts = []

        stream_kwargs = dict(api_kwargs)
        stream_kwargs["stream"] = True

        try:
            event_stream = active_client.responses.create(**stream_kwargs)
        except retryable_errors as exc:
            if attempt < max_stream_retries:
                logger.debug(
                    "Codex Responses stream connect failed (attempt %s/%s); retrying. %s error=%s",
                    attempt + 1, max_stream_retries + 1,
                    agent._client_log_context(), exc,
                )
                continue
            raise

        try:
            # Compatibility: some mocks/providers return a concrete response
            # instead of an iterable. Pass it straight through.
            if hasattr(event_stream, "output") and not hasattr(event_stream, "__iter__"):
                return event_stream

            try:
                final = _consume_codex_event_stream(
                    event_stream,
                    model=api_kwargs.get("model"),
                    on_text_delta=_on_text_delta,
                    on_reasoning_delta=_on_reasoning_delta,
                    on_first_delta=on_first_delta,
                    on_event=_on_event,
                    interrupt_check=_interrupt_check,
                )
            except retryable_errors as exc:
                if attempt < max_stream_retries:
                    logger.debug(
                        "Codex Responses stream transport failed mid-iteration "
                        "(attempt %s/%s); retrying. %s error=%s",
                        attempt + 1, max_stream_retries + 1,
                        agent._client_log_context(), exc,
                    )
                    continue
                raise

            if final.status in {"incomplete", "failed"}:
                logger.warning(
                    "Codex Responses stream terminal status=%s "
                    "(incomplete_details=%s, error=%s, streamed_chars=%d). %s",
                    final.status, final.incomplete_details, final.error,
                    sum(len(p) for p in agent._codex_streamed_text_parts),
                    agent._client_log_context(),
                )

            return final
        finally:
            close_fn = getattr(event_stream, "close", None)
            if callable(close_fn):
                try:
                    close_fn()
                except Exception:
                    # Best-effort cleanup: never mask the real result/error.
                    logger.debug("Codex stream close raised", exc_info=True)
|
|
|
|
|
|
def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None):
    """Backward-compatible alias for the unified event-driven path.

    Historically this was the fallback when the SDK's high-level
    ``responses.stream(...)`` helper raised on shape drift. The primary
    path now does exactly what the fallback did, so this just forwards.
    Kept as a public symbol because tests and a small number of call sites
    still reference it by name.

    Args:
        agent: the owning ``AIAgent``, passed through unchanged.
        api_kwargs: Responses API keyword arguments, passed through unchanged.
        client: optional OpenAI client, passed through unchanged.

    Returns:
        Whatever ``run_codex_stream`` returns.

    Note: ``run_codex_stream``'s ``on_first_delta`` parameter is not
    forwarded, so callers of this alias get no first-delta notification.
    """
    return run_codex_stream(agent, api_kwargs, client=client)
|
|
|
|
|
|
# Names exported by ``from <module> import *``. ``_consume_codex_event_stream``
# is listed despite its underscore prefix, so star-imports expose it too.
__all__ = [
    "run_codex_app_server_turn",
    "run_codex_stream",
    "run_codex_create_stream_fallback",
    "_consume_codex_event_stream",
]
|