fix(agent): tolerate large codex stream prefill

This commit is contained in:
Sanghyuk Seo 2026-05-28 02:42:18 +09:00 committed by Teknium
parent 486d632cc2
commit 283bb810e7
2 changed files with 294 additions and 18 deletions

View file

@ -129,6 +129,24 @@ def estimate_request_context_tokens(api_payload: Any) -> int:
return _chars(api_payload) // 4
def _is_openai_codex_backend(agent) -> bool:
base_url_lower = str(getattr(agent, "_base_url_lower", "") or "")
base_url_hostname = str(getattr(agent, "_base_url_hostname", "") or "")
return (
getattr(agent, "provider", None) == "openai-codex"
or (
base_url_hostname == "chatgpt.com"
and "/backend-api/codex" in base_url_lower
)
)
def _env_float(name: str, default: float) -> float:
try:
return float(os.getenv(name, str(default)))
except (TypeError, ValueError):
return default
def interruptible_api_call(agent, api_kwargs: dict):
"""
@ -256,32 +274,89 @@ def interruptible_api_call(agent, api_kwargs: dict):
# apply richer recovery (credential rotation, provider fallback).
_stale_timeout = agent._compute_non_stream_stale_timeout(api_kwargs)
# ── Time-to-first-byte (TTFB) watchdog for the Codex Responses stream ──
# ── Codex Responses stream watchdogs ────────────────────────────────
# The chatgpt.com/backend-api/codex endpoint has an intermittent failure
# mode where it accepts the connection but never emits a single stream
# event (observed directly: 0 events, no HTTP status, the socket just
# hangs). A fresh reconnect succeeds in ~2s, but the wall-clock stale
# timeout (often 180900s) makes us wait minutes before retrying. While no
# stream event has arrived yet we apply a much shorter TTFB cutoff so the
# main retry loop can reconnect promptly. Once the first event arrives the
# stream is healthy, so we fall back to the wall-clock stale timeout and
# never interrupt a legitimate long generation. Gated to codex_responses:
# only that path streams events incrementally (the chat_completions
# non-stream, anthropic and bedrock branches here have no first-event
# signal). The marker advances on *any* event (see codex_runtime), so
# reasoning-only / tool-call-only turns are not mistaken for a stall.
# Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS (0 disables).
_ttfb_enabled = agent.api_mode == "codex_responses"
try:
_ttfb_timeout = float(os.getenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "45"))
except (TypeError, ValueError):
_ttfb_timeout = 45.0
# main retry loop can reconnect promptly. Large subscription-backed Codex
# requests can legitimately spend tens of seconds in backend admission /
# prompt prefill before the first SSE event, so the no-byte TTFB watchdog
# is disabled for large chatgpt.com/backend-api/codex requests. A second
# failure mode emits an opening SSE frame and then stalls forever in SSL
# read; for that we watch the gap since the last Codex stream event. This
# matches Codex CLI's stream_idle_timeout model: any valid SSE event is
# activity. Operators can tune via HERMES_CODEX_TTFB_TIMEOUT_SECONDS and
# HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS (0 disables each).
_codex_watchdog_enabled = agent.api_mode == "codex_responses"
_openai_codex_backend = _is_openai_codex_backend(agent)
_est_tokens_for_codex_watchdog = estimate_request_context_tokens(api_kwargs)
if _codex_watchdog_enabled and _openai_codex_backend:
if _est_tokens_for_codex_watchdog > 100_000:
_stale_timeout = max(_stale_timeout, 1200.0)
elif _est_tokens_for_codex_watchdog > 50_000:
_stale_timeout = max(_stale_timeout, 900.0)
elif _est_tokens_for_codex_watchdog > 25_000:
_stale_timeout = max(_stale_timeout, 600.0)
if _est_tokens_for_codex_watchdog > 100_000:
_codex_idle_timeout_default = 180.0
elif _est_tokens_for_codex_watchdog > 50_000:
_codex_idle_timeout_default = 120.0
elif _est_tokens_for_codex_watchdog > 10_000:
_codex_idle_timeout_default = 60.0
else:
_codex_idle_timeout_default = 12.0
_ttfb_enabled = _codex_watchdog_enabled
_ttfb_timeout = _env_float("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", 12.0)
if _ttfb_timeout <= 0:
_ttfb_enabled = False
if _ttfb_enabled:
elif _openai_codex_backend:
_ttfb_disable_above = _env_float("HERMES_CODEX_TTFB_DISABLE_ABOVE_TOKENS", 25_000.0)
_ttfb_strict = os.environ.get("HERMES_CODEX_TTFB_STRICT", "").strip().lower() in {
"1", "true", "yes", "on"
}
if (
not _ttfb_strict
and _ttfb_disable_above > 0
and _est_tokens_for_codex_watchdog >= _ttfb_disable_above
):
_ttfb_enabled = False
logger.info(
"Disabling openai-codex no-byte TTFB watchdog for large request "
"(context=~%s tokens >= %.0f). Waiting for backend response instead. "
"Set HERMES_CODEX_TTFB_STRICT=1 to force early reconnects.",
f"{_est_tokens_for_codex_watchdog:,}",
_ttfb_disable_above,
)
else:
_ttfb_cap = _env_float("HERMES_CODEX_TTFB_MAX_SECONDS", 20.0)
if _ttfb_cap > 0 and _ttfb_timeout > _ttfb_cap:
logger.info(
"Capping openai-codex no-byte TTFB timeout from %.0fs to %.0fs "
"(context=~%s tokens). Set HERMES_CODEX_TTFB_MAX_SECONDS to tune.",
_ttfb_timeout,
_ttfb_cap,
f"{_est_tokens_for_codex_watchdog:,}",
)
_ttfb_timeout = _ttfb_cap
_codex_idle_enabled = _codex_watchdog_enabled
_codex_idle_timeout = _env_float(
"HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS",
_codex_idle_timeout_default,
)
if _codex_idle_timeout <= 0:
_codex_idle_enabled = False
if _codex_watchdog_enabled:
# Reset before the worker starts so a marker left over from a previous
# call on this agent can't be misread as first-byte for this one.
agent._codex_stream_last_event_ts = None
agent._codex_stream_last_progress_ts = None
_call_start = time.time()
agent._touch_activity("waiting for non-streaming API response")
@ -361,6 +436,45 @@ def interruptible_api_call(agent, api_kwargs: dict):
)
break
# Stream-idle detector: the Codex backend emitted at least one SSE
# frame, then stopped emitting events. Valid keepalive / in_progress
# frames refresh _codex_stream_last_event_ts and should not be killed.
_last_codex_event_ts = getattr(agent, "_codex_stream_last_event_ts", None)
if (
_codex_idle_enabled
and _last_codex_event_ts is not None
and (time.time() - _last_codex_event_ts) > _codex_idle_timeout
):
_event_stale_elapsed = time.time() - _last_codex_event_ts
logger.warning(
"Codex stream produced no SSE events for %.0fs after first byte "
"(threshold %.0fs, model=%s, context=~%s tokens). Killing "
"connection so the retry loop can reconnect.",
_event_stale_elapsed,
_codex_idle_timeout,
api_kwargs.get("model", "unknown"),
f"{_est_tokens_for_codex_watchdog:,}",
)
agent._emit_status(
f"⚠️ Codex stream sent no events for {int(_event_stale_elapsed)}s "
f"after first byte (model: {api_kwargs.get('model', 'unknown')}). "
f"Reconnecting."
)
try:
_close_request_client_once("codex_stream_idle_kill")
except Exception:
pass
agent._touch_activity(
f"codex stream killed after {int(_event_stale_elapsed)}s with no SSE events"
)
t.join(timeout=2.0)
if result["error"] is None and result["response"] is None:
result["error"] = TimeoutError(
f"Codex stream produced no SSE events for {int(_event_stale_elapsed)}s "
f"after first byte (threshold: {int(_codex_idle_timeout)}s)"
)
break
# Stale-call detector: kill the connection if no response
# arrives within the configured timeout.
if _elapsed > _stale_timeout:

View file

@ -4,9 +4,9 @@ The chatgpt.com/backend-api/codex endpoint has an intermittent failure mode
where it accepts the connection but never emits a single stream event. The
watchdog in ``interruptible_api_call`` kills such a connection at a short TTFB
cutoff (instead of waiting out the much longer wall-clock stale timeout) so the
retry loop can reconnect promptly. Once any stream event arrives, the stream is
considered healthy and only the wall-clock stale timeout applies long
generations must never be interrupted by the TTFB cutoff.
retry loop can reconnect promptly. Once any stream event arrives, the TTFB
watchdog is satisfied and a separate idle watchdog handles streams that stop
emitting SSE events.
The "bytes flowing" signal is ``agent._codex_stream_last_event_ts``, set on
*any* event by ``codex_runtime.run_codex_stream`` so reasoning-only or
@ -148,6 +148,49 @@ def test_ttfb_includes_silent_hang_hint_for_gpt_5_5(tmp_path, monkeypatch):
stop["flag"] = True
def test_ttfb_high_env_is_capped_for_openai_codex(tmp_path, monkeypatch):
"""A stale local env value like 90s must not make openai-codex wait 90s
before reconnecting when the backend emits no SSE frames."""
from agent import chat_completion_helpers as h
agent = _make_codex_agent(tmp_path, monkeypatch)
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "90")
monkeypatch.setenv("HERMES_CODEX_TTFB_MAX_SECONDS", "1")
closes: list = []
dummy_client = SimpleNamespace()
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
monkeypatch.setattr(
agent, "_abort_request_openai_client",
lambda c, reason=None: closes.append(reason),
)
monkeypatch.setattr(
agent, "_close_request_openai_client",
lambda c, reason=None: closes.append(reason),
)
stop = {"flag": False}
def fake_hang(api_kwargs, client=None, on_first_delta=None):
deadline = time.time() + 30
while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested:
time.sleep(0.02)
raise RuntimeError("connection closed")
monkeypatch.setattr(agent, "_run_codex_stream", fake_hang)
t0 = time.time()
try:
with pytest.raises(TimeoutError) as excinfo:
h.interruptible_api_call(agent, {"model": "gpt-5.4", "input": "hi"})
elapsed = time.time() - t0
assert "TTFB threshold: 1s" in str(excinfo.value)
assert "codex_ttfb_kill" in closes
assert elapsed < 15, f"TTFB watchdog ignored cap and took {elapsed:.1f}s"
finally:
stop["flag"] = True
def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch):
"""Once a stream event has arrived, a generation that runs past the TTFB
cutoff is NOT killed by the watchdog it completes normally."""
@ -186,6 +229,51 @@ def test_ttfb_does_not_kill_when_events_flow(tmp_path, monkeypatch):
assert "codex_ttfb_kill" not in closes
def test_event_idle_kills_after_first_event_then_silence(tmp_path, monkeypatch):
"""If Codex emits an opening SSE event and then goes silent, kill it via
the stream-idle watchdog instead of waiting for the long non-stream stale
timeout."""
from agent import chat_completion_helpers as h
agent = _make_codex_agent(tmp_path, monkeypatch)
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "10")
monkeypatch.setenv("HERMES_CODEX_EVENT_STALE_TIMEOUT_SECONDS", "1")
closes: list = []
dummy_client = SimpleNamespace()
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
monkeypatch.setattr(
agent,
"_abort_request_openai_client",
lambda c, reason=None: closes.append(reason),
)
monkeypatch.setattr(
agent,
"_close_request_openai_client",
lambda c, reason=None: closes.append(reason),
)
stop = {"flag": False}
def fake_stream(api_kwargs, client=None, on_first_delta=None):
agent._codex_stream_last_event_ts = time.time()
deadline = time.time() + 30
while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested:
time.sleep(0.02)
raise RuntimeError("connection closed")
monkeypatch.setattr(agent, "_run_codex_stream", fake_stream)
try:
with pytest.raises(TimeoutError) as excinfo:
h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"})
assert "after first byte" in str(excinfo.value)
assert "codex_stream_idle_kill" in closes
assert "codex_ttfb_kill" not in closes
finally:
stop["flag"] = True
def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch):
"""Setting HERMES_CODEX_TTFB_TIMEOUT_SECONDS=0 disables the TTFB watchdog;
a no-event stall then falls through to the (here, 60s) stale timeout, so a
@ -219,3 +307,77 @@ def test_ttfb_disabled_via_env_zero(tmp_path, monkeypatch):
resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": "hi"})
assert resp is sentinel
assert "codex_ttfb_kill" not in closes
def test_large_codex_request_waits_instead_of_ttfb_reconnect(tmp_path, monkeypatch):
"""Large Codex inputs can legitimately take longer than the small-request
first-byte cutoff before the first SSE frame. Preserve the full input and
wait instead of killing/retrying at TTFB."""
from agent import chat_completion_helpers as h
agent = _make_codex_agent(tmp_path, monkeypatch)
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1")
closes: list = []
dummy_client = SimpleNamespace()
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
monkeypatch.setattr(
agent, "_abort_request_openai_client", lambda c, reason=None: closes.append(reason)
)
monkeypatch.setattr(
agent, "_close_request_openai_client", lambda c, reason=None: closes.append(reason)
)
sentinel = SimpleNamespace(ok=True)
def fake_stream(api_kwargs, client=None, on_first_delta=None):
# No event marker for 2s: this would trip the 1s TTFB watchdog on a
# small request, but should be allowed for a large request.
time.sleep(2.0)
return sentinel
monkeypatch.setattr(agent, "_run_codex_stream", fake_stream)
large_input = "x" * 120_000 # ~30k estimated tokens, above large-request gate.
resp = h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": large_input})
assert resp is sentinel
assert "codex_ttfb_kill" not in closes
def test_large_codex_request_strict_ttfb_env_still_reconnects(tmp_path, monkeypatch):
"""Operators can force the old early-reconnect behavior for large inputs
with HERMES_CODEX_TTFB_STRICT=1."""
from agent import chat_completion_helpers as h
agent = _make_codex_agent(tmp_path, monkeypatch)
monkeypatch.setenv("HERMES_CODEX_TTFB_TIMEOUT_SECONDS", "1")
monkeypatch.setenv("HERMES_CODEX_TTFB_STRICT", "1")
closes: list = []
dummy_client = SimpleNamespace()
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
monkeypatch.setattr(
agent, "_abort_request_openai_client", lambda c, reason=None: closes.append(reason)
)
monkeypatch.setattr(
agent, "_close_request_openai_client", lambda c, reason=None: closes.append(reason)
)
stop = {"flag": False}
def fake_hang(api_kwargs, client=None, on_first_delta=None):
deadline = time.time() + 30
while time.time() < deadline and not stop["flag"] and not agent._interrupt_requested:
time.sleep(0.02)
raise RuntimeError("connection closed")
monkeypatch.setattr(agent, "_run_codex_stream", fake_hang)
large_input = "x" * 120_000
try:
with pytest.raises(TimeoutError) as excinfo:
h.interruptible_api_call(agent, {"model": "gpt-5.5", "input": large_input})
assert "TTFB threshold: 1s" in str(excinfo.value)
assert "codex_ttfb_kill" in closes
finally:
stop["flag"] = True