diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index f710697a03e..8fe6bcd20cb 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -256,6 +256,33 @@ def interruptible_api_call(agent, api_kwargs: dict): # apply richer recovery (credential rotation, provider fallback). _stale_timeout = agent._compute_non_stream_stale_timeout(api_kwargs) + # ── Time-to-first-byte (TTFB) watchdog for the Codex Responses stream ── + # The chatgpt.com/backend-api/codex endpoint has an intermittent failure + # mode where it accepts the connection but never emits a single stream + # event (observed directly: 0 events, no HTTP status, the socket just + # hangs). A fresh reconnect succeeds in ~2s, but the wall-clock stale + # timeout (often 180–900s) makes us wait minutes before retrying. While no + # stream event has arrived yet we apply a much shorter TTFB cutoff so the + # main retry loop can reconnect promptly. Once the first event arrives the + # stream is healthy, so we fall back to the wall-clock stale timeout and + # never interrupt a legitimate long generation. Gated to codex_responses: + # only that path streams events incrementally (the chat_completions + # non-stream, anthropic and bedrock branches here have no first-event + # signal). The marker advances on *any* event (see codex_runtime), so + # reasoning-only / tool-call-only turns are not mistaken for a stall. + # Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS (0 disables). + _ttfb_enabled = agent.api_mode == "codex_responses" + try: + _ttfb_timeout = float(os.getenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "45")) + except (TypeError, ValueError): + _ttfb_timeout = 45.0 + if _ttfb_timeout <= 0: + _ttfb_enabled = False + if _ttfb_enabled: + # Reset before the worker starts so a marker left over from a previous + # call on this agent can't be misread as first-byte for this one. + agent._codex_stream_last_event_ts = None + _call_start = time.time() agent._touch_activity("waiting for non-streaming API response") @@ -274,9 +301,48 @@ def interruptible_api_call(agent, api_kwargs: dict): f"waiting for non-streaming response ({int(_elapsed)}s elapsed)" ) + _elapsed = time.time() - _call_start + + # TTFB detector: the Codex stream has produced no event at all and + # we're past the first-byte cutoff → the backend opened the + # connection but isn't responding. Kill it so the retry loop can + # reconnect (a fresh connection typically succeeds in seconds), + # instead of waiting out the much longer wall-clock stale timeout. + if ( + _ttfb_enabled + and _elapsed > _ttfb_timeout + and getattr(agent, "_codex_stream_last_event_ts", None) is None + ): + logger.warning( + "Codex stream produced no bytes within TTFB cutoff " + "(%.0fs > %.0fs, model=%s). Backend accepted the connection " + "but sent no stream events. Killing connection so the retry " + "loop can reconnect.", + _elapsed, _ttfb_timeout, api_kwargs.get("model", "unknown"), + ) + agent._emit_status( + f"⚠️ No first byte from provider in {int(_elapsed)}s " + f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). " + f"Reconnecting." + ) + try: + _close_request_client_once("codex_ttfb_kill") + except Exception: + pass + agent._touch_activity( + f"codex stream killed after {int(_elapsed)}s with no first byte" + ) + # Wait briefly for the worker to notice the closed connection. + t.join(timeout=2.0) + if result["error"] is None and result["response"] is None: + result["error"] = TimeoutError( + f"Codex stream produced no bytes within {int(_elapsed)}s " + f"(TTFB threshold: {int(_ttfb_timeout)}s)" + ) + break + # Stale-call detector: kill the connection if no response # arrives within the configured timeout. - _elapsed = time.time() - _call_start if _elapsed > _stale_timeout: _est_ctx = estimate_request_context_tokens(api_kwargs) _silent_hint: Optional[str] = None diff --git a/agent/codex_runtime.py b/agent/codex_runtime.py index 02b788f5777..8c5dff39bff 100644 --- a/agent/codex_runtime.py +++ b/agent/codex_runtime.py @@ -19,6 +19,7 @@ from __future__ import annotations import json import logging import os +import time from types import SimpleNamespace from typing import Any, Dict, List @@ -194,6 +195,11 @@ def run_codex_stream(agent, api_kwargs: dict, client: Any = None, on_first_delta try: with active_client.responses.stream(**api_kwargs) as stream: for event in stream: + # Mark stream activity for the TTFB watchdog in + # interruptible_api_call. The Codex backend can accept the + # connection but never emit a single event; this timestamp + # staying None tells the watchdog no bytes are flowing. + agent._codex_stream_last_event_ts = time.time() agent._touch_activity("receiving stream response") if agent._interrupt_requested: break diff --git a/tests/agent/test_codex_ttfb_watchdog.py b/tests/agent/test_codex_ttfb_watchdog.py new file mode 100644 index 00000000000..9898c46261f --- /dev/null +++ b/tests/agent/test_codex_ttfb_watchdog.py @@ -0,0 +1,175 @@ +"""Regression tests for the Codex time-to-first-byte (TTFB) watchdog. + +The chatgpt.com/backend-api/codex endpoint has an intermittent failure mode +where it accepts the connection but never emits a single stream event. The +watchdog in ``interruptible_api_call`` kills such a connection at a short TTFB +cutoff (instead of waiting out the much longer wall-clock stale timeout) so the +retry loop can reconnect promptly. Once any stream event arrives, the stream is +considered healthy and only the wall-clock stale timeout applies — long +generations must never be interrupted by the TTFB cutoff. + +The "bytes flowing" signal is ``agent._codex_stream_last_event_ts``, set on +*any* event by ``codex_runtime.run_codex_stream`` — so reasoning-only or +tool-call-only turns (which emit no output-text deltas) are not mistaken for a +stall. +""" + +from __future__ import annotations + +import sys +import time +import types +from types import SimpleNamespace + +import pytest + +# Stub optional heavy imports so run_agent imports cleanly in isolation. +sys.modules.setdefault("fire", types.SimpleNamespace(Fire=lambda *a, **k: None)) +sys.modules.setdefault("firecrawl", types.SimpleNamespace(Firecrawl=object)) +sys.modules.setdefault("fal_client", types.SimpleNamespace()) + + +def _make_codex_agent(tmp_path, monkeypatch): + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + (tmp_path / ".env").write_text("", encoding="utf-8") + (tmp_path / "config.yaml").write_text("{}\n", encoding="utf-8") + from run_agent import AIAgent + + agent = AIAgent( + model="gpt-5.5", + provider="openai-codex", + api_key="sk-dummy", + base_url="https://chatgpt.com/backend-api/codex", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + platform="cli", + ) + # The watchdog is gated on the codex_responses api_mode; assert/force it so + # the test is robust to detection-logic changes elsewhere. + agent.api_mode = "codex_responses" + monkeypatch.setattr(agent, "_emit_status", lambda *a, **k: None) + # Keep the wall-clock stale timeout high so any early kill is unambiguously + # the TTFB path, not the stale-call path. + monkeypatch.setattr( + agent, "_compute_non_stream_stale_timeout", lambda *a, **k: 60.0 + ) + return agent + + +def test_ttfb_kills_when_no_stream_event(tmp_path, monkeypatch): + """Backend accepts the connection but emits no event -> killed at the TTFB + cutoff, well before the 60s wall-clock stale timeout, with a retryable + TimeoutError and a ``codex_ttfb_kill`` close reason.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + + stop = {"flag": False} + + def fake_hang(api_kwargs, client=None, on_first_delta=None): + # Never set _codex_stream_last_event_ts: simulate zero events arriving. + deadline = time.time() + 30 + while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested: + time.sleep(0.02) + raise RuntimeError("connection closed") + + monkeypatch.setattr(agent, "_run_codex_stream", fake_hang) + + t0 = time.time() + try: + with pytest.raises(TimeoutError) as excinfo: + h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"}) + elapsed = time.time() - t0 + assert "TTFB" in str(excinfo.value) + assert "codex_ttfb_kill" in closes + # ~1s cutoff + 2s join grace; must be far under the 60s stale timeout. + assert elapsed < 15, f"TTFB watchdog took {elapsed:.1f}s" + finally: + stop["flag"] = True + + +def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch): + """Once a stream event has arrived, a generation that runs past the TTFB + cutoff is NOT killed by the watchdog — it completes normally.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + + sentinel = SimpleNamespace(ok=True) + + def fake_stream(api_kwargs, client=None, on_first_delta=None): + # Bytes flowing: mark stream activity right away, then keep generating + # past the 1s TTFB cutoff before returning a real response. + agent._codex_stream_last_event_ts = time.time() + if on_first_delta: + on_first_delta() + time.sleep(2.0) + return sentinel + + monkeypatch.setattr(agent, "_run_codex_stream", fake_stream) + + resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"}) + assert resp is sentinel + assert "codex_ttfb_kill" not in closes + + +def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch): + """Setting HERMES_CODEX_TTFB_TIMEOUT_SECONDS=0 disables the TTFB watchdog; + a no-event stall then falls through to the (here, 60s) stale timeout, so a + short hang is NOT killed by TTFB.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "0") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + + sentinel = SimpleNamespace(ok=True) + + def fake_stream(api_kwargs, client=None, on_first_delta=None): + # No event marker, but only briefly — well under the 60s stale timeout. + time.sleep(2.0) + return sentinel + + monkeypatch.setattr(agent, "_run_codex_stream", fake_stream) + + resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"}) + assert resp is sentinel + assert "codex_ttfb_kill" not in closes