diff --git a/run_agent.py b/run_agent.py
index 5170178b70..6d69374012 100644
--- a/run_agent.py
+++ b/run_agent.py
@@ -1530,11 +1530,49 @@ class AIAgent:
                         continue
                 if missing_completed:
                     logger.debug(
-                        "Responses stream did not emit response.completed; falling back to non-stream create."
+                        "Responses stream did not emit response.completed; falling back to create(stream=True)."
                     )
-                    return self.client.responses.create(**api_kwargs)
+                    return self._run_codex_create_stream_fallback(api_kwargs)
                 raise
 
+    def _run_codex_create_stream_fallback(self, api_kwargs: dict):
+        """Fallback path for stream completion edge cases on Codex-style Responses backends."""
+        fallback_kwargs = dict(api_kwargs)
+        fallback_kwargs["stream"] = True
+        stream_or_response = self.client.responses.create(**fallback_kwargs)
+
+        # Compatibility shim for mocks or providers that still return a concrete response.
+        if hasattr(stream_or_response, "output"):
+            return stream_or_response
+        if not hasattr(stream_or_response, "__iter__"):
+            return stream_or_response
+
+        terminal_response = None
+        try:
+            for event in stream_or_response:
+                event_type = getattr(event, "type", None)
+                if not event_type and isinstance(event, dict):
+                    event_type = event.get("type")
+                if event_type not in {"response.completed", "response.incomplete", "response.failed"}:
+                    continue
+
+                terminal_response = getattr(event, "response", None)
+                if terminal_response is None and isinstance(event, dict):
+                    terminal_response = event.get("response")
+                if terminal_response is not None:
+                    return terminal_response
+        finally:
+            close_fn = getattr(stream_or_response, "close", None)
+            if callable(close_fn):
+                try:
+                    close_fn()
+                except Exception:
+                    pass
+
+        if terminal_response is not None:
+            return terminal_response
+        raise RuntimeError("Responses create(stream=True) fallback did not emit a terminal response.")
+
     def _interruptible_api_call(self, api_kwargs: dict):
         """
         Run the API call in a background thread so the main conversation loop
diff --git a/tests/test_run_agent_codex_responses.py b/tests/test_run_agent_codex_responses.py
index 7121cc5720..fc7c619802 100644
--- a/tests/test_run_agent_codex_responses.py
+++ b/tests/test_run_agent_codex_responses.py
@@ -144,6 +144,18 @@ class _FakeResponsesStream:
         return self._final_response
 
 
+class _FakeCreateStream:
+    def __init__(self, events):
+        self._events = list(events)
+        self.closed = False
+
+    def __iter__(self):
+        return iter(self._events)
+
+    def close(self):
+        self.closed = True
+
+
 def test_api_mode_uses_explicit_provider_when_codex(monkeypatch):
     _patch_agent_bootstrap(monkeypatch)
     agent = run_agent.AIAgent(
@@ -263,6 +275,42 @@ def test_run_codex_stream_falls_back_to_create_after_stream_completion_error(mon
     assert response.output[0].content[0].text == "create fallback ok"
 
 
+def test_run_codex_stream_fallback_parses_create_stream_events(monkeypatch):
+    agent = _build_agent(monkeypatch)
+    calls = {"stream": 0, "create": 0}
+    create_stream = _FakeCreateStream(
+        [
+            SimpleNamespace(type="response.created"),
+            SimpleNamespace(type="response.in_progress"),
+            SimpleNamespace(type="response.completed", response=_codex_message_response("streamed create ok")),
+        ]
+    )
+
+    def _fake_stream(**kwargs):
+        calls["stream"] += 1
+        return _FakeResponsesStream(
+            final_error=RuntimeError("Didn't receive a `response.completed` event.")
+        )
+
+    def _fake_create(**kwargs):
+        calls["create"] += 1
+        assert kwargs.get("stream") is True
+        return create_stream
+
+    agent.client = SimpleNamespace(
+        responses=SimpleNamespace(
+            stream=_fake_stream,
+            create=_fake_create,
+        )
+    )
+
+    response = agent._run_codex_stream({"model": "gpt-5-codex"})
+    assert calls["stream"] == 2
+    assert calls["create"] == 1
+    assert create_stream.closed is True
+    assert response.output[0].content[0].text == "streamed create ok"
+
+
 def test_run_conversation_codex_plain_text(monkeypatch):
     agent = _build_agent(monkeypatch)
     monkeypatch.setattr(agent, "_interruptible_api_call", lambda api_kwargs: _codex_message_response("OK"))