diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py
index 9f2e182a9f..bd4e6be457 100644
--- a/agent/auxiliary_client.py
+++ b/agent/auxiliary_client.py
@@ -602,6 +602,14 @@ class _CodexCompletionsAdapter:
             "store": False,
         }
 
+        # Preserve the chat.completions timeout contract. This adapter is used
+        # by auxiliary calls such as context compression; if the timeout is not
+        # forwarded and enforced, a Codex Responses stream can sit behind a
+        # dead-looking CLI until the user force-interrupts the whole session.
+        timeout = kwargs.get("timeout")
+        if timeout is not None:
+            resp_kwargs["timeout"] = timeout
+
         # Note: the Codex endpoint (chatgpt.com/backend-api/codex) does NOT
         # support max_output_tokens or temperature — omit to avoid 400 errors.
 
@@ -659,6 +667,49 @@ class _CodexCompletionsAdapter:
         text_parts: List[str] = []
         tool_calls_raw: List[Any] = []
         usage = None
+        # Only plain positive numbers get a wall-clock deadline (bool is
+        # excluded so ``timeout=True`` is not read as one second); any other
+        # timeout value is still forwarded to the SDK but not enforced here.
+        total_timeout = (
+            timeout
+            if isinstance(timeout, (int, float))
+            and not isinstance(timeout, bool)
+            and timeout > 0
+            else None
+        )
+        deadline = (time.monotonic() + float(total_timeout)) if total_timeout else None
+        timed_out = threading.Event()
+        timeout_timer: Optional[threading.Timer] = None
+
+        def _timeout_message() -> str:
+            """Return the error text used for every total-timeout failure."""
+            return f"Codex auxiliary Responses stream exceeded {float(total_timeout):.1f}s total timeout"
+
+        def _close_client_on_timeout() -> None:
+            """Timer callback: flag the timeout, then best-effort close the client."""
+            timed_out.set()
+            close = getattr(self._client, "close", None)
+            if callable(close):
+                try:
+                    close()
+                except Exception:
+                    logger.debug("Codex auxiliary: client close during timeout failed", exc_info=True)
+
+        def _check_cancelled() -> None:
+            """Raise TimeoutError past the deadline or InterruptedError on interrupt."""
+            if deadline is not None and time.monotonic() >= deadline:
+                timed_out.set()
+                raise TimeoutError(_timeout_message())
+            try:
+                from tools.interrupt import is_interrupted
+                if is_interrupted():
+                    raise InterruptedError("Codex auxiliary Responses stream interrupted")
+            except InterruptedError:
+                raise
+            except Exception:
+                # Interrupt state is a best-effort UX hook; never make it a
+                # new failure mode for auxiliary calls.
+                pass
 
         try:
             # Collect output items and text deltas during streaming —
@@ -667,8 +718,14 @@ class _CodexCompletionsAdapter:
             collected_output_items: List[Any] = []
             collected_text_deltas: List[str] = []
             has_function_calls = False
+            if total_timeout:
+                timeout_timer = threading.Timer(float(total_timeout), _close_client_on_timeout)
+                timeout_timer.daemon = True
+                timeout_timer.start()
+            _check_cancelled()
             with self._client.responses.stream(**resp_kwargs) as stream:
                 for _event in stream:
+                    _check_cancelled()
                     _etype = getattr(_event, "type", "")
                     if _etype == "response.output_item.done":
                         _done = getattr(_event, "item", None)
@@ -680,6 +737,7 @@ class _CodexCompletionsAdapter:
                             collected_text_deltas.append(_delta)
                     elif "function_call" in _etype:
                         has_function_calls = True
+                _check_cancelled()
                 final = stream.get_final_response()
 
                 # Backfill empty output from collected stream events
@@ -739,8 +797,16 @@ class _CodexCompletionsAdapter:
                     total_tokens=getattr(resp_usage, "total_tokens", 0),
                 )
         except Exception as exc:
+            # Once the timeout flag is set, report any failure from the
+            # stream as a TimeoutError, chained to the original exception.
+            if timed_out.is_set():
+                raise TimeoutError(_timeout_message()) from exc
             logger.debug("Codex auxiliary Responses API call failed: %s", exc)
             raise
+        finally:
+            # Always stop the watchdog so it cannot close the client later.
+            if timeout_timer is not None:
+                timeout_timer.cancel()
 
         content = "".join(text_parts).strip() or None
 
diff --git a/tests/agent/test_auxiliary_client.py b/tests/agent/test_auxiliary_client.py
index 16e563a91a..6437c872ce 100644
--- a/tests/agent/test_auxiliary_client.py
+++ b/tests/agent/test_auxiliary_client.py
@@ -3,7 +3,9 @@
 import json
 import logging
 import os
+import time
 from pathlib import Path
+from types import SimpleNamespace
 from unittest.mock import patch, MagicMock, AsyncMock
 
 import pytest
@@ -24,6 +26,7 @@ from agent.auxiliary_client import (
     _normalize_aux_provider,
     _try_payment_fallback,
     _resolve_auto,
+    _CodexCompletionsAdapter,
 )
 
 
@@ -1894,6 +1897,93 @@ class TestVisionAutoSkipsKimiCoding:
         })
 
 
+class TestCodexAuxiliaryAdapterTimeout:
+    """Timeout handling of ``_CodexCompletionsAdapter.create``."""
+
+    def test_forwards_timeout_to_responses_stream(self):
+        """The ``timeout`` kwarg is passed through to ``responses.stream``."""
+        class FakeStream:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, exc_type, exc, tb):
+                return False
+
+            def __iter__(self):
+                return iter(())
+
+            def get_final_response(self):
+                return SimpleNamespace(
+                    output=[SimpleNamespace(
+                        type="message",
+                        content=[SimpleNamespace(type="output_text", text="summary")],
+                    )],
+                    usage=None,
+                )
+
+        class FakeResponses:
+            def __init__(self):
+                self.kwargs = None
+
+            def stream(self, **kwargs):
+                self.kwargs = kwargs
+                return FakeStream()
+
+        fake_client = SimpleNamespace(responses=FakeResponses())
+        adapter = _CodexCompletionsAdapter(fake_client, "gpt-5.5")
+
+        response = adapter.create(
+            messages=[{"role": "user", "content": "summarize this"}],
+            timeout=12.5,
+        )
+
+        assert fake_client.responses.kwargs["timeout"] == 12.5
+        assert response.choices[0].message.content == "summary"
+
+    def test_enforces_total_timeout_while_stream_keeps_emitting_events(self):
+        """A stream that keeps yielding events is still cut off at the total timeout."""
+        class SlowAliveStream:
+            def __enter__(self):
+                return self
+
+            def __exit__(self, exc_type, exc, tb):
+                return False
+
+            def __iter__(self):
+                # 50 events x 30ms = 1.5s if the deadline were not enforced,
+                # which leaves a wide gap below the bound asserted at the end.
+                for _ in range(50):
+                    time.sleep(0.03)
+                    yield SimpleNamespace(type="response.in_progress")
+
+            def get_final_response(self):
+                return SimpleNamespace(
+                    output=[SimpleNamespace(
+                        type="message",
+                        content=[SimpleNamespace(type="output_text", text="late")],
+                    )],
+                    usage=None,
+                )
+
+        class FakeResponses:
+            def stream(self, **kwargs):
+                return SlowAliveStream()
+
+        fake_client = SimpleNamespace(responses=FakeResponses(), close=lambda: None)
+        adapter = _CodexCompletionsAdapter(fake_client, "gpt-5.5")
+
+        started = time.monotonic()
+        with pytest.raises(TimeoutError):
+            adapter.create(
+                messages=[{"role": "user", "content": "summarize this"}],
+                timeout=0.05,
+            )
+
+        # Generous bound: well under the 1.5s an unenforced run would take,
+        # yet tolerant of scheduler jitter on slow CI machines.
+        assert time.monotonic() - started < 1.0
+
+
 # ---------------------------------------------------------------------------
 # _build_call_kwargs — tool dedup at API boundary
 # ---------------------------------------------------------------------------