From 283bb810e7211acc38171d3171bb6049ff6d4dba Mon Sep 17 00:00:00 2001 From: Sanghyuk Seo Date: Thu, 28 May 2026 02:42:18 +0900 Subject: [PATCH] fix(agent): tolerate large codex stream prefill --- agent/chat_completion_helpers.py | 144 +++++++++++++++++--- tests/agent/test_codex_ttfb_watchdog.py | 168 +++++++++++++++++++++++- 2 files changed, 294 insertions(+), 18 deletions(-) diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index 6ef4fe24365..ce83dd04907 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -129,6 +129,24 @@ def estimate_request_context_tokens(api_payload: Any) -> int: return _chars(api_payload) // 4 +def _is_openai_codex_backend(agent) -> bool: + base_url_lower = str(getattr(agent, "_base_url_lower", "") or "") + base_url_hostname = str(getattr(agent, "_base_url_hostname", "") or "") + return ( + getattr(agent, "provider", None) == "openai-codex" + or ( + base_url_hostname == "chatgpt.com" + and "/backend-api/codex" in base_url_lower + ) + ) + + +def _env_float(name: str, default: float) -> float: + try: + return float(os.getenv(name, str(default))) + except (TypeError, ValueError): + return default + def interruptible_api_call(agent, api_kwargs: dict): """ @@ -256,32 +274,89 @@ def interruptible_api_call(agent, api_kwargs: dict): # apply richer recovery (credential rotation, provider fallback). _stale_timeout = agent._compute_non_stream_stale_timeout(api_kwargs) - # ── Time-to-first-byte (TTFB) watchdog for the Codex Responses stream ── + # ── Codex Responses stream watchdogs ──────────────────────────────── # The chatgpt.com/backend-api/codex endpoint has an intermittent failure # mode where it accepts the connection but never emits a single stream # event (observed directly: 0 events, no HTTP status, the socket just # hangs). A fresh reconnect succeeds in ~2s, but the wall-clock stale # timeout (often 180–900s) makes us wait minutes before retrying. While no # stream event has arrived yet we apply a much shorter TTFB cutoff so the - # main retry loop can reconnect promptly. Once the first event arrives the - # stream is healthy, so we fall back to the wall-clock stale timeout and - # never interrupt a legitimate long generation. Gated to codex_responses: - # only that path streams events incrementally (the chat_completions - # non-stream, anthropic and bedrock branches here have no first-event - # signal). The marker advances on *any* event (see codex_runtime), so - # reasoning-only / tool-call-only turns are not mistaken for a stall. - # Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS (0 disables). - _ttfb_enabled = agent.api_mode == "codex_responses" - try: - _ttfb_timeout = float(os.getenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "45")) - except (TypeError, ValueError): - _ttfb_timeout = 45.0 + # main retry loop can reconnect promptly. Large subscription-backed Codex + # requests can legitimately spend tens of seconds in backend admission / + # prompt prefill before the first SSE event, so the no-byte TTFB watchdog + # is disabled for large chatgpt.com/backend-api/codex requests. A second + # failure mode emits an opening SSE frame and then stalls forever in SSL + # read; for that we watch the gap since the last Codex stream event. This + # matches Codex CLI's stream_idle_timeout model: any valid SSE event is + # activity. Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS and + # HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS (0 disables each). + _codex_watchdog_enabled = agent.api_mode == "codex_responses" + _openai_codex_backend = _is_openai_codex_backend(agent) + _est_tokens_for_codex_watchdog = estimate_request_context_tokens(api_kwargs) + if _codex_watchdog_enabled and _openai_codex_backend: + if _est_tokens_for_codex_watchdog > 100_000: + _stale_timeout = max(_stale_timeout, 1200.0) + elif _est_tokens_for_codex_watchdog > 50_000: + _stale_timeout = max(_stale_timeout, 900.0) + elif _est_tokens_for_codex_watchdog > 25_000: + _stale_timeout = max(_stale_timeout, 600.0) + + if _est_tokens_for_codex_watchdog > 100_000: + _codex_idle_timeout_default = 180.0 + elif _est_tokens_for_codex_watchdog > 50_000: + _codex_idle_timeout_default = 120.0 + elif _est_tokens_for_codex_watchdog > 10_000: + _codex_idle_timeout_default = 60.0 + else: + _codex_idle_timeout_default = 12.0 + + _ttfb_enabled = _codex_watchdog_enabled + _ttfb_timeout = _env_float("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", 12.0) if _ttfb_timeout <= 0: _ttfb_enabled = False - if _ttfb_enabled: + elif _openai_codex_backend: + _ttfb_disable_above = _env_float("HERMES_CODEX_TTFB_DISABLE_ABOVE_TOKENS", 25_000.0) + _ttfb_strict = os.environ.get("HERMES_CODEX_TTFB_STRICT", "").strip().lower() in { + "1", "true", "yes", "on" + } + if ( + not _ttfb_strict + and _ttfb_disable_above > 0 + and _est_tokens_for_codex_watchdog >= _ttfb_disable_above + ): + _ttfb_enabled = False + logger.info( + "Disabling openai-codex no-byte TTFB watchdog for large request " + "(context=~%s tokens >= %.0f). Waiting for backend response instead. " + "Set HERMES_CODEX_TTFB_STRICT=1 to force early reconnects.", + f"{_est_tokens_for_codex_watchdog:,}", + _ttfb_disable_above, + ) + else: + _ttfb_cap = _env_float("HERMES_CODEX_TTFB_MAX_SECONDS", 20.0) + if _ttfb_cap > 0 and _ttfb_timeout > _ttfb_cap: + logger.info( + "Capping openai-codex no-byte TTFB timeout from %.0fs to %.0fs " + "(context=~%s tokens). Set HERMES_CODEX_TTFB_MAX_SECONDS to tune.", + _ttfb_timeout, + _ttfb_cap, + f"{_est_tokens_for_codex_watchdog:,}", + ) + _ttfb_timeout = _ttfb_cap + + _codex_idle_enabled = _codex_watchdog_enabled + _codex_idle_timeout = _env_float( + "HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS", + _codex_idle_timeout_default, + ) + if _codex_idle_timeout <= 0: + _codex_idle_enabled = False + + if _codex_watchdog_enabled: # Reset before the worker starts so a marker left over from a previous # call on this agent can't be misread as first-byte for this one. agent._codex_stream_last_event_ts = None + agent._codex_stream_last_progress_ts = None _call_start = time.time() agent._touch_activity("waiting for non-streaming API response") @@ -361,6 +436,45 @@ def interruptible_api_call(agent, api_kwargs: dict): ) break + # Stream-idle detector: the Codex backend emitted at least one SSE + # frame, then stopped emitting events. Valid keepalive / in_progress + # frames refresh _codex_stream_last_event_ts and should not be killed. + _last_codex_event_ts = getattr(agent, "_codex_stream_last_event_ts", None) + if ( + _codex_idle_enabled + and _last_codex_event_ts is not None + and (time.time() - _last_codex_event_ts) > _codex_idle_timeout + ): + _event_stale_elapsed = time.time() - _last_codex_event_ts + logger.warning( + "Codex stream produced no SSE events for %.0fs after first byte " + "(threshold %.0fs, model=%s, context=~%s tokens). Killing " + "connection so the retry loop can reconnect.", + _event_stale_elapsed, + _codex_idle_timeout, + api_kwargs.get("model", "unknown"), + f"{_est_tokens_for_codex_watchdog:,}", + ) + agent._emit_status( + f"⚠️ Codex stream sent no events for {int(_event_stale_elapsed)}s " + f"after first byte (model: {api_kwargs.get('model', 'unknown')}). " + f"Reconnecting." + ) + try: + _close_request_client_once("codex_stream_idle_kill") + except Exception: + pass + agent._touch_activity( + f"codex stream killed after {int(_event_stale_elapsed)}s with no SSE events" + ) + t.join(timeout=2.0) + if result["error"] is None and result["response"] is None: + result["error"] = TimeoutError( + f"Codex stream produced no SSE events for {int(_event_stale_elapsed)}s " + f"after first byte (threshold: {int(_codex_idle_timeout)}s)" + ) + break + # Stale-call detector: kill the connection if no response # arrives within the configured timeout. if _elapsed > _stale_timeout: diff --git a/tests/agent/test_codex_ttfb_watchdog.py b/tests/agent/test_codex_ttfb_watchdog.py index f649e40b49a..57466a81834 100644 --- a/tests/agent/test_codex_ttfb_watchdog.py +++ b/tests/agent/test_codex_ttfb_watchdog.py @@ -4,9 +4,9 @@ The chatgpt.com/backend-api/codex endpoint has an intermittent failure mode where it accepts the connection but never emits a single stream event. The watchdog in ``interruptible_api_call`` kills such a connection at a short TTFB cutoff (instead of waiting out the much longer wall-clock stale timeout) so the -retry loop can reconnect promptly. Once any stream event arrives, the stream is -considered healthy and only the wall-clock stale timeout applies — long -generations must never be interrupted by the TTFB cutoff. +retry loop can reconnect promptly. Once any stream event arrives, the TTFB +watchdog is satisfied and a separate idle watchdog handles streams that stop +emitting SSE events. The "bytes flowing" signal is ``agent._codex_stream_last_event_ts``, set on *any* event by ``codex_runtime.run_codex_stream`` — so reasoning-only or @@ -148,6 +148,49 @@ def test_ttfb_includes_silent_hang_hint_for_gpt_5_5(tmp_path, monkeypatch): stop["flag"] = True +def test_ttfb_high_env_is_capped_for_openai_codex(tmp_path, monkeypatch): + """A stale local env value like 90s must not make openai-codex wait 90s + before reconnecting when the backend emits no SSE frames.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "90") + monkeypatch.setenv("HERMES_CODEX_TTFB_MAX_SECONDS", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + + stop = {"flag": False} + + def fake_hang(api_kwargs, client=None, on_first_delta=None): + deadline = time.time() + 30 + while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested: + time.sleep(0.02) + raise RuntimeError("connection closed") + + monkeypatch.setattr(agent, "_run_codex_stream", fake_hang) + + t0 = time.time() + try: + with pytest.raises(TimeoutError) as excinfo: + h.interruptible_api_call(agent, {"model": "gpt-5.4", "input": "hi"}) + elapsed = time.time() - t0 + assert "TTFB threshold: 1s" in str(excinfo.value) + assert "codex_ttfb_kill" in closes + assert elapsed < 15, f"TTFB watchdog ignored cap and took {elapsed:.1f}s" + finally: + stop["flag"] = True + + def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch): """Once a stream event has arrived, a generation that runs past the TTFB cutoff is NOT killed by the watchdog — it completes normally.""" @@ -186,6 +229,51 @@ def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch): assert "codex_ttfb_kill" not in closes +def test_event_idle_kills_after_first_event_then_silence(tmp_path, monkeypatch): + """If Codex emits an opening SSE event and then goes silent, kill it via + the stream-idle watchdog instead of waiting for the long non-stream stale + timeout.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "10") + monkeypatch.setenv("HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, + "_abort_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + monkeypatch.setattr( + agent, + "_close_request_openai_client", + lambda c, reason=None: closes.append(reason), + ) + + stop = {"flag": False} + + def fake_stream(api_kwargs, client=None, on_first_delta=None): + agent._codex_stream_last_event_ts = time.time() + deadline = time.time() + 30 + while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested: + time.sleep(0.02) + raise RuntimeError("connection closed") + + monkeypatch.setattr(agent, "_run_codex_stream", fake_stream) + + try: + with pytest.raises(TimeoutError) as excinfo: + h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"}) + assert "after first byte" in str(excinfo.value) + assert "codex_stream_idle_kill" in closes + assert "codex_ttfb_kill" not in closes + finally: + stop["flag"] = True + + def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch): """Setting HERMES_CODEX_TTFB_TIMEOUT_SECONDS=0 disables the TTFB watchdog; a no-event stall then falls through to the (here, 60s) stale timeout, so a @@ -219,3 +307,77 @@ def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch): resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"}) assert resp is sentinel assert "codex_ttfb_kill" not in closes + + +def test_large_codex_request_waits_instead_of_ttfb_reconnect(tmp_path, monkeypatch): + """Large Codex inputs can legitimately take longer than the small-request + first-byte cutoff before the first SSE frame. Preserve the full input and + wait instead of killing/retrying at TTFB.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", lambda c, reason=None: closes.append(reason) + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", lambda c, reason=None: closes.append(reason) + ) + + sentinel = SimpleNamespace(ok=True) + + def fake_stream(api_kwargs, client=None, on_first_delta=None): + # No event marker for 2s: this would trip the 1s TTFB watchdog on a + # small request, but should be allowed for a large request. + time.sleep(2.0) + return sentinel + + monkeypatch.setattr(agent, "_run_codex_stream", fake_stream) + + large_input = "x" * 120_000 # ~30k estimated tokens, above large-request gate. + resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": large_input}) + assert resp is sentinel + assert "codex_ttfb_kill" not in closes + + +def test_large_codex_request_strict_ttfb_env_still_reconnects(tmp_path, monkeypatch): + """Operators can force the old early-reconnect behavior for large inputs + with HERMES_CODEX_TTFB_STRICT=1.""" + from agent import chat_completion_helpers as h + + agent = _make_codex_agent(tmp_path, monkeypatch) + monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1") + monkeypatch.setenv("HERMES_CODEX_TTFB_STRICT", "1") + + closes: list = [] + dummy_client = SimpleNamespace() + monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr( + agent, "_abort_request_openai_client", lambda c, reason=None: closes.append(reason) + ) + monkeypatch.setattr( + agent, "_close_request_openai_client", lambda c, reason=None: closes.append(reason) + ) + + stop = {"flag": False} + + def fake_hang(api_kwargs, client=None, on_first_delta=None): + deadline = time.time() + 30 + while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested: + time.sleep(0.02) + raise RuntimeError("connection closed") + + monkeypatch.setattr(agent, "_run_codex_stream", fake_hang) + + large_input = "x" * 120_000 + try: + with pytest.raises(TimeoutError) as excinfo: + h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": large_input}) + assert "TTFB threshold: 1s" in str(excinfo.value) + assert "codex_ttfb_kill" in closes + finally: + stop["flag"] = True