diff --git a/run_agent.py b/run_agent.py index 95926ff807e..f5af556bbcb 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3873,6 +3873,7 @@ class AIAgent: # response.incomplete instead of response.completed). self._codex_streamed_text_parts: list = [] for attempt in range(max_stream_retries + 1): + collected_output_items: list = [] try: with active_client.responses.stream(**api_kwargs) as stream: for event in stream: @@ -3882,6 +3883,8 @@ class AIAgent: # Fire callbacks on text content deltas (suppress during tool calls) if "output_text.delta" in event_type or event_type == "response.output_text.delta": delta_text = getattr(event, "delta", "") + if delta_text: + self._codex_streamed_text_parts.append(delta_text) if delta_text and not has_tool_calls: if not first_delta_fired: first_delta_fired = True @@ -3891,7 +3894,6 @@ class AIAgent: except Exception: pass self._fire_stream_delta(delta_text) - self._codex_streamed_text_parts.append(delta_text) # Track tool calls to suppress text streaming elif "function_call" in event_type: has_tool_calls = True @@ -3900,6 +3902,14 @@ class AIAgent: reasoning_text = getattr(event, "delta", "") if reasoning_text: self._fire_reasoning_delta(reasoning_text) + # Collect completed output items — some backends + # (chatgpt.com/backend-api/codex) stream valid items + # via response.output_item.done but the SDK's + # get_final_response() returns an empty output list. + elif event_type == "response.output_item.done": + done_item = getattr(event, "item", None) + if done_item is not None: + collected_output_items.append(done_item) # Log non-completed terminal events for diagnostics elif event_type in ("response.incomplete", "response.failed"): resp_obj = getattr(event, "response", None) @@ -3912,7 +3922,31 @@ class AIAgent: sum(len(p) for p in self._codex_streamed_text_parts), self._client_log_context(), ) - return stream.get_final_response() + final_response = stream.get_final_response() + # PATCH: ChatGPT Codex backend streams valid output items + # but get_final_response() can return an empty output list. + # Backfill from collected items or synthesize from deltas. + _out = getattr(final_response, "output", None) + if isinstance(_out, list) and not _out: + if collected_output_items: + final_response.output = list(collected_output_items) + logger.debug( + "Codex stream: backfilled %d output items from stream events", + len(collected_output_items), + ) + elif self._codex_streamed_text_parts and not has_tool_calls: + assembled = "".join(self._codex_streamed_text_parts) + final_response.output = [SimpleNamespace( + type="message", + role="assistant", + status="completed", + content=[SimpleNamespace(type="output_text", text=assembled)], + )] + logger.debug( + "Codex stream: synthesized output from %d text deltas (%d chars)", + len(self._codex_streamed_text_parts), len(assembled), + ) + return final_response except (_httpx.RemoteProtocolError, _httpx.ReadTimeout, _httpx.ConnectError, ConnectionError) as exc: if attempt < max_stream_retries: logger.debug( @@ -7383,50 +7417,20 @@ class AIAgent: response_invalid = True error_details.append("response.output is not a list") elif len(output_items) == 0: - # Log diagnostics for empty output + # If we reach here, _run_codex_stream's backfill + # from output_item.done events and text-delta + # synthesis both failed to populate output. _resp_status = getattr(response, "status", None) _resp_incomplete = getattr(response, "incomplete_details", None) - _streamed_parts = getattr(self, "_codex_streamed_text_parts", []) - _streamed_text = "".join(_streamed_parts).strip() if _streamed_parts else "" logging.warning( - "Codex response.output is empty " - "(status=%s, incomplete_details=%s, streamed_chars=%d, " - "output_text=%r, model=%s). %s", - _resp_status, _resp_incomplete, len(_streamed_text), - getattr(response, "output_text", None), + "Codex response.output is empty after stream backfill " + "(status=%s, incomplete_details=%s, model=%s). %s", + _resp_status, _resp_incomplete, getattr(response, "model", None), f"api_mode={self.api_mode} provider={self.provider}", ) - # Recovery: if we streamed text but the final response - # lost it (e.g. response.incomplete from chatgpt backend-api), - # synthesize a minimal response so the user gets the answer - # the model already delivered. - if _streamed_text: - logging.info( - "Recovering %d chars of streamed text as response " - "(status was %s).", len(_streamed_text), _resp_status, - ) - response = SimpleNamespace( - output=[SimpleNamespace( - type="message", - role="assistant", - status="completed", - content=[SimpleNamespace( - type="output_text", - text=_streamed_text, - )], - )], - status=_resp_status or "completed", - model=getattr(response, "model", self.model), - usage=getattr(response, "usage", None), - id=getattr(response, "id", None), - output_text=_streamed_text, - ) - # Clear the accumulated parts so we don't double-recover - self._codex_streamed_text_parts = [] - else: - response_invalid = True - error_details.append("response.output is empty") + response_invalid = True + error_details.append("response.output is empty") elif self.api_mode == "anthropic_messages": content_blocks = getattr(response, "content", None) if response is not None else None if response is None: