From 43a3f119fc68ae6b2d3e9fa20eacbbb1480d9e32 Mon Sep 17 00:00:00 2001 From: Carlton <276689385+carltonawong@users.noreply.github.com> Date: Tue, 26 May 2026 17:15:01 -0700 Subject: [PATCH] fix(agent): recover Codex streams with null output --- agent/auxiliary_client.py | 105 +++++++++++----- agent/codex_runtime.py | 113 +++++++++++++----- tests/agent/test_auxiliary_client.py | 39 ++++++ .../test_run_agent_codex_responses.py | 55 +++++++++ 4 files changed, 250 insertions(+), 62 deletions(-) diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index e6d42dd2165..18197ae309e 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -107,6 +107,32 @@ from utils import base_url_host_matches, base_url_hostname, normalize_proxy_env_ logger = logging.getLogger(__name__) +def _responses_null_output_iterable_error(exc: BaseException) -> bool: + """True when the OpenAI SDK trips over terminal response.output=None.""" + text = str(exc) + return isinstance(exc, TypeError) and "NoneType" in text and "not iterable" in text + + +def _responses_backfilled_response(output_items: List[Any], text_parts: List[str], *, has_function_calls: bool, model: str = None) -> Optional[Any]: + """Build a minimal Responses-like object from already streamed events.""" + if output_items: + return SimpleNamespace(output=list(output_items), usage=None, status="completed", model=model) + if text_parts and not has_function_calls: + assembled = "".join(text_parts) + return SimpleNamespace( + output=[SimpleNamespace( + type="message", + role="assistant", + status="completed", + content=[SimpleNamespace(type="output_text", text=assembled)], + )], + usage=None, + status="completed", + model=model, + ) + return None + + def _safe_isinstance(obj: Any, maybe_type: Any) -> bool: """Return False instead of raising when a patched symbol is not a type.""" try: @@ -796,44 +822,61 @@ class _CodexCompletionsAdapter: timeout_timer.daemon = True timeout_timer.start() _check_cancelled() + final = None with self._client.responses.stream(**resp_kwargs) as stream: - for _event in stream: + try: + for _event in stream: + _check_cancelled() + _etype = getattr(_event, "type", "") + if _etype == "response.output_item.done": + _done = getattr(_event, "item", None) + if _done is not None: + collected_output_items.append(_done) + elif "output_text.delta" in _etype: + _delta = getattr(_event, "delta", "") + if _delta: + collected_text_deltas.append(_delta) + elif "function_call" in _etype: + has_function_calls = True _check_cancelled() - _etype = getattr(_event, "type", "") - if _etype == "response.output_item.done": - _done = getattr(_event, "item", None) - if _done is not None: - collected_output_items.append(_done) - elif "output_text.delta" in _etype: - _delta = getattr(_event, "delta", "") - if _delta: - collected_text_deltas.append(_delta) - elif "function_call" in _etype: - has_function_calls = True - _check_cancelled() - final = stream.get_final_response() + final = stream.get_final_response() + except TypeError as exc: + if not _responses_null_output_iterable_error(exc): + raise + final = _responses_backfilled_response( + collected_output_items, + collected_text_deltas, + has_function_calls=has_function_calls, + model=resp_kwargs.get("model"), + ) + if final is None: + raise + logger.debug( + "Codex auxiliary Responses stream parser hit response.output=None; " + "recovered from streamed events (items=%d, text_parts=%d)", + len(collected_output_items), + len(collected_text_deltas), + ) + + if final is None: + raise RuntimeError("Codex auxiliary Responses stream did not return a final response") # Backfill empty output from collected stream events _output = getattr(final, "output", None) - if isinstance(_output, list) and not _output: - if collected_output_items: - final.output = list(collected_output_items) + if _output is None or (isinstance(_output, list) and not _output): + recovered = _responses_backfilled_response( + collected_output_items, + collected_text_deltas, + has_function_calls=has_function_calls, + model=resp_kwargs.get("model"), + ) + if recovered is not None: + final.output = recovered.output logger.debug( - "Codex auxiliary: backfilled %d output items from stream events", + "Codex auxiliary: backfilled missing output from stream events " + "(items=%d, text_parts=%d)", len(collected_output_items), - ) - elif collected_text_deltas and not has_function_calls: - # Only synthesize text when no tool calls were streamed — - # a function_call response with incidental text should not - # be collapsed into a plain-text message. - assembled = "".join(collected_text_deltas) - final.output = [SimpleNamespace( - type="message", role="assistant", status="completed", - content=[SimpleNamespace(type="output_text", text=assembled)], - )] - logger.debug( - "Codex auxiliary: synthesized from %d deltas (%d chars)", - len(collected_text_deltas), len(assembled), + len(collected_text_deltas), ) # Extract text and tool calls from the Responses output. diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 8c5dff39bff..609a41c1451 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -176,6 +176,37 @@ def run_codex_app_server_turn( +def _responses_null_output_iterable_error(exc: BaseException) -> bool: + """True when the OpenAI SDK trips over terminal response.output=None.""" + text = str(exc) + return isinstance(exc, TypeError) and "NoneType" in text and "not iterable" in text + + +def _codex_backfilled_response(output_items: list, text_parts: list, *, has_tool_calls: bool, model: str = None): + """Build a minimal Responses-like object from events already streamed.""" + if output_items: + return SimpleNamespace( + output=list(output_items), + usage=None, + status="completed", + model=model, + ) + if text_parts and not has_tool_calls: + assembled = "".join(text_parts) + return SimpleNamespace( + output=[SimpleNamespace( + type="message", + role="assistant", + status="completed", + content=[SimpleNamespace(type="output_text", text=assembled)], + )], + usage=None, + status="completed", + model=model, + ) + return None + + def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta: callable = None): """Execute one streaming Responses API request and return the final response.""" import httpx as _httpx @@ -251,24 +282,20 @@ def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta # but get_final_response() can return an empty output list. # Backfill from collected items or synthesize from deltas. _out = getattr(final_response, "output", None) - if isinstance(_out, list) and not _out: - if collected_output_items: - final_response.output = list(collected_output_items) + if _out is None or (isinstance(_out, list) and not _out): + recovered = _codex_backfilled_response( + collected_output_items, + agent._codex_streamed_text_parts, + has_tool_calls=has_tool_calls, + model=api_kwargs.get("model"), + ) + if recovered is not None: + final_response.output = recovered.output logger.debug( - "Codex stream: backfilled %d output items from stream events", + "Codex stream: backfilled missing output from stream events " + "(items=%d, text_parts=%d)", len(collected_output_items), - ) - elif agent._codex_streamed_text_parts and not has_tool_calls: - assembled = "".join(agent._codex_streamed_text_parts) - final_response.output = [SimpleNamespace( - type="message", - role="assistant", - status="completed", - content=[SimpleNamespace(type="output_text", text=assembled)], - )] - logger.debug( - "Codex stream: synthesized output from %d text deltas (%d chars)", - len(agent._codex_streamed_text_parts), len(assembled), + len(agent._codex_streamed_text_parts), ) return final_response except (_httpx.RemoteProtocolError, _httpx.ReadTimeout, _httpx.ConnectError, ConnectionError) as exc: @@ -287,6 +314,30 @@ def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta exc, ) return agent._run_codex_create_stream_fallback(api_kwargs, client=active_client) + except TypeError as exc: + if _responses_null_output_iterable_error(exc): + recovered = _codex_backfilled_response( + collected_output_items, + agent._codex_streamed_text_parts, + has_tool_calls=has_tool_calls, + model=api_kwargs.get("model"), + ) + if recovered is not None: + logger.debug( + "Codex Responses stream parser hit response.output=None; " + "recovered from streamed events (items=%d, text_parts=%d). %s", + len(collected_output_items), + len(agent._codex_streamed_text_parts), + agent._client_log_context(), + ) + return recovered + logger.debug( + "Codex Responses stream parser hit response.output=None without " + "recoverable events; falling back to create(stream=True). %s", + agent._client_log_context(), + ) + return agent._run_codex_create_stream_fallback(api_kwargs, client=active_client) + raise except RuntimeError as exc: err_text = str(exc) missing_completed = "response.completed" in err_text @@ -355,6 +406,7 @@ def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None terminal_response = None collected_output_items: list = [] collected_text_deltas: list = [] + has_tool_calls = False try: for event in stream_or_response: agent._touch_activity("receiving stream response") @@ -404,6 +456,8 @@ def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None delta = event.get("delta", "") if delta: collected_text_deltas.append(delta) + elif event_type and "function_call" in event_type: + has_tool_calls = True if event_type not in {"response.completed", "response.incomplete", "response.failed"}: continue @@ -414,23 +468,20 @@ def run_codex_create_stream_fallback(agent, api_kwargs: dict, client: Any = None if terminal_response is not None: # Backfill empty output from collected stream events _out = getattr(terminal_response, "output", None) - if isinstance(_out, list) and not _out: - if collected_output_items: - terminal_response.output = list(collected_output_items) + if _out is None or (isinstance(_out, list) and not _out): + recovered = _codex_backfilled_response( + collected_output_items, + collected_text_deltas, + has_tool_calls=has_tool_calls, + model=fallback_kwargs.get("model"), + ) + if recovered is not None: + terminal_response.output = recovered.output logger.debug( - "Codex fallback stream: backfilled %d output items", + "Codex fallback stream: backfilled missing output " + "(items=%d, text_parts=%d)", len(collected_output_items), - ) - elif collected_text_deltas: - assembled = "".join(collected_text_deltas) - terminal_response.output = [SimpleNamespace( - type="message", role="assistant", - status="completed", - content=[SimpleNamespace(type="output_text", text=assembled)], - )] - logger.debug( - "Codex fallback stream: synthesized from %d deltas (%d chars)", - len(collected_text_deltas), len(assembled), + len(collected_text_deltas), ) return terminal_response finally: diff --git a/tests/agent/test_auxiliary_client.py b/tests/agent/test_auxiliary_client.py index 20c30c7ea9e..eb99629961d 100644 --- a/tests/agent/test_auxiliary_client.py +++ b/tests/agent/test_auxiliary_client.py @@ -2554,6 +2554,45 @@ class TestCodexAuxiliaryAdapterTimeout: assert time.monotonic() - started < 0.14 +class TestCodexAuxiliaryAdapterNullOutputRecovery: + def test_recovers_output_item_when_sdk_raises_during_iteration(self): + """Regression for #11179 in auxiliary calls such as compression/title generation.""" + + output_item = SimpleNamespace( + type="message", + content=[SimpleNamespace(type="output_text", text="aux survived")], + ) + + class NullOutputParseStream: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def __iter__(self): + yield SimpleNamespace(type="response.output_item.done", item=output_item) + raise TypeError("'NoneType' object is not iterable") + + def get_final_response(self): # pragma: no cover - iterator fails first + raise AssertionError("get_final_response should not be reached") + + class FakeResponses: + def __init__(self): + self.create = MagicMock() + + def stream(self, **kwargs): + return NullOutputParseStream() + + fake_client = SimpleNamespace(responses=FakeResponses()) + adapter = _CodexCompletionsAdapter(fake_client, "gpt-5.5") + + response = adapter.create(messages=[{"role": "user", "content": "summarize"}]) + + assert response.choices[0].message.content == "aux survived" + fake_client.responses.create.assert_not_called() + + # --------------------------------------------------------------------------- # Issue #23432 — auxiliary timeout poisons cached client; later aux calls fail # --------------------------------------------------------------------------- diff --git a/tests/run_agent/test_run_agent_codex_responses.py b/tests/run_agent/test_run_agent_codex_responses.py index bc575cc676f..fad3f68ffe3 100644 --- a/tests/run_agent/test_run_agent_codex_responses.py +++ b/tests/run_agent/test_run_agent_codex_responses.py @@ -186,6 +186,27 @@ class _FakeCreateStream: self.closed = True +class _IteratorTypeErrorStream: + """Mimic the SDK raising while parsing response.completed.output=None.""" + + def __init__(self, events_before_error): + self._events_before_error = list(events_before_error) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def __iter__(self): + for event in self._events_before_error: + yield event + raise TypeError("'NoneType' object is not iterable") + + def get_final_response(self): # pragma: no cover - iterator fails first + raise AssertionError("get_final_response should not be reached") + + def _codex_request_kwargs(): return { "model": "gpt-5-codex", @@ -484,6 +505,40 @@ def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch): assert response.output[0].content[0].text == "streamed create ok" +def test_run_codex_stream_falls_back_when_stream_iteration_parses_null_output(monkeypatch): + """Regression for #11179: the SDK can raise while iterating response.completed. + + The failure happens before get_final_response(), so post-loop backfill alone is + not enough. Preserve already streamed output_item.done events. + """ + agent = _build_agent(monkeypatch) + output_item = SimpleNamespace( + type="message", + status="completed", + content=[SimpleNamespace(type="output_text", text="stream item survived")], + ) + calls = {"stream": 0} + + def _fake_stream(**kwargs): + calls["stream"] += 1 + return _IteratorTypeErrorStream([ + SimpleNamespace(type="response.output_item.done", item=output_item), + ]) + + def _unexpected_create(**kwargs): # pragma: no cover - recovery should avoid fallback call + raise AssertionError("create fallback should not be needed when output items were collected") + + agent.client = SimpleNamespace( + responses=SimpleNamespace(stream=_fake_stream, create=_unexpected_create), + ) + + response = agent._run_codex_stream(_codex_request_kwargs()) + + assert calls["stream"] == 1 + assert response.output == [output_item] + assert response.status == "completed" + + def test_run_conversation_codex_plain_text(monkeypatch): agent = _build_agent(monkeypatch) monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))