fix(auxiliary): enforce Codex Responses stream timeout

## Summary
- Forwards chat-completions `timeout` into the Codex Responses stream call.
- Adds total elapsed-time enforcement while the Responses stream is still yielding events.
- Closes the underlying client on timeout to unblock stalled streams, then raises `TimeoutError`.
- Adds focused tests for timeout forwarding and total timeout enforcement.

## Why
The Codex auxiliary adapter can be used by non-interactive auxiliary work such as context compression. If the stream keeps yielding progress-like events but never completes, SDK socket/read timeouts do not necessarily protect the full operation. This makes the CLI look stuck until the user force-interrupts the whole session.

This is a refreshed upstream-ready version of the earlier fork fix around `d3f08e9a0` / PR #3.

## Verification
- `python -m py_compile agent/auxiliary_client.py tests/agent/test_auxiliary_client.py`
- `python -m pytest -o addopts='' tests/agent/test_auxiliary_client.py::TestCodexAuxiliaryAdapterTimeout -q`
- `git diff --check`
This commit is contained in:
acc001k 2026-05-07 14:18:20 +02:00 committed by Teknium
parent fd13b7d2b9
commit 5533ad7644
2 changed files with 133 additions and 0 deletions

View file

@@ -602,6 +602,14 @@ class _CodexCompletionsAdapter:
"store": False,
}
# Preserve the chat.completions timeout contract. This adapter is used
# by auxiliary calls such as context compression; if the timeout is not
# forwarded and enforced, a Codex Responses stream can sit behind a
# dead-looking CLI until the user force-interrupts the whole session.
timeout = kwargs.get("timeout")
if timeout is not None:
resp_kwargs["timeout"] = timeout
# Note: the Codex endpoint (chatgpt.com/backend-api/codex) does NOT
# support max_output_tokens or temperature — omit to avoid 400 errors.
@@ -659,6 +667,37 @@ class _CodexCompletionsAdapter:
text_parts: List[str] = []
tool_calls_raw: List[Any] = []
usage = None
total_timeout = timeout if isinstance(timeout, (int, float)) and timeout > 0 else None
deadline = time.monotonic() + float(total_timeout) if total_timeout else None
timed_out = threading.Event()
timeout_timer: Optional[threading.Timer] = None
def _timeout_message() -> str:
return f"Codex auxiliary Responses stream exceeded {float(total_timeout):.1f}s total timeout"
def _close_client_on_timeout() -> None:
timed_out.set()
close = getattr(self._client, "close", None)
if callable(close):
try:
close()
except Exception:
logger.debug("Codex auxiliary: client close during timeout failed", exc_info=True)
def _check_cancelled() -> None:
if deadline is not None and time.monotonic() >= deadline:
timed_out.set()
raise TimeoutError(_timeout_message())
try:
from tools.interrupt import is_interrupted
if is_interrupted():
raise InterruptedError("Codex auxiliary Responses stream interrupted")
except InterruptedError:
raise
except Exception:
# Interrupt state is a best-effort UX hook; never make it a
# new failure mode for auxiliary calls.
pass
try:
# Collect output items and text deltas during streaming —
@@ -667,8 +706,14 @@ class _CodexCompletionsAdapter:
collected_output_items: List[Any] = []
collected_text_deltas: List[str] = []
has_function_calls = False
if total_timeout:
timeout_timer = threading.Timer(float(total_timeout), _close_client_on_timeout)
timeout_timer.daemon = True
timeout_timer.start()
_check_cancelled()
with self._client.responses.stream(**resp_kwargs) as stream:
for _event in stream:
_check_cancelled()
_etype = getattr(_event, "type", "")
if _etype == "response.output_item.done":
_done = getattr(_event, "item", None)
@@ -680,6 +725,7 @@ class _CodexCompletionsAdapter:
collected_text_deltas.append(_delta)
elif "function_call" in _etype:
has_function_calls = True
_check_cancelled()
final = stream.get_final_response()
# Backfill empty output from collected stream events
@@ -739,8 +785,13 @@ class _CodexCompletionsAdapter:
total_tokens=getattr(resp_usage, "total_tokens", 0),
)
except Exception as exc:
if timed_out.is_set():
raise TimeoutError(_timeout_message()) from exc
logger.debug("Codex auxiliary Responses API call failed: %s", exc)
raise
finally:
if timeout_timer is not None:
timeout_timer.cancel()
content = "".join(text_parts).strip() or None

View file

@@ -3,7 +3,9 @@
import json
import logging
import os
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import patch, MagicMock, AsyncMock
import pytest
@@ -24,6 +26,7 @@ from agent.auxiliary_client import (
_normalize_aux_provider,
_try_payment_fallback,
_resolve_auto,
_CodexCompletionsAdapter,
)
@@ -1894,6 +1897,85 @@ class TestVisionAutoSkipsKimiCoding:
})
class TestCodexAuxiliaryAdapterTimeout:
    """Timeout behavior of ``_CodexCompletionsAdapter`` for auxiliary calls.

    Covers two contracts introduced by this change: (1) the chat-completions
    ``timeout`` kwarg is forwarded into the Responses ``stream(...)`` call,
    and (2) total elapsed time is enforced even while the stream keeps
    yielding events, surfacing as ``TimeoutError``.
    """

    def test_forwards_timeout_to_responses_stream(self):
        # Minimal stream stub: yields no events and returns a single
        # "message" output item from get_final_response().
        class FakeStream:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def __iter__(self):
                return iter(())

            def get_final_response(self):
                return SimpleNamespace(
                    output=[SimpleNamespace(
                        type="message",
                        content=[SimpleNamespace(type="output_text", text="summary")],
                    )],
                    usage=None,
                )

        # Records the kwargs passed to responses.stream() so the test can
        # assert that `timeout` was forwarded verbatim.
        class FakeResponses:
            def __init__(self):
                self.kwargs = None

            def stream(self, **kwargs):
                self.kwargs = kwargs
                return FakeStream()

        fake_client = SimpleNamespace(responses=FakeResponses())
        adapter = _CodexCompletionsAdapter(fake_client, "gpt-5.5")
        response = adapter.create(
            messages=[{"role": "user", "content": "summarize this"}],
            timeout=12.5,
        )
        # The adapter must forward the timeout unchanged and still surface
        # the final message text through the chat-completions shape.
        assert fake_client.responses.kwargs["timeout"] == 12.5
        assert response.choices[0].message.content == "summary"

    def test_enforces_total_timeout_while_stream_keeps_emitting_events(self):
        # Stream that stays "alive" by emitting progress events (~0.15s
        # total across 5 events), so per-event reads succeed and only a
        # total-elapsed-time check can bound the call.
        class SlowAliveStream:
            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def __iter__(self):
                for _ in range(5):
                    time.sleep(0.03)
                    yield SimpleNamespace(type="response.in_progress")

            def get_final_response(self):
                return SimpleNamespace(
                    output=[SimpleNamespace(
                        type="message",
                        content=[SimpleNamespace(type="output_text", text="late")],
                    )],
                    usage=None,
                )

        class FakeResponses:
            def stream(self, **kwargs):
                return SlowAliveStream()

        # `close` is present because the adapter closes the client on
        # timeout to unblock a stalled stream.
        fake_client = SimpleNamespace(responses=FakeResponses(), close=lambda: None)
        adapter = _CodexCompletionsAdapter(fake_client, "gpt-5.5")
        started = time.monotonic()
        with pytest.raises(TimeoutError):
            adapter.create(
                messages=[{"role": "user", "content": "summarize this"}],
                timeout=0.05,
            )
        # Elapsed time must be bounded by the 0.05s enforced timeout (plus
        # slack), not by the stream's full ~0.15s of event emission.
        assert time.monotonic() - started < 0.14
# ---------------------------------------------------------------------------
# _build_call_kwargs — tool dedup at API boundary
# ---------------------------------------------------------------------------