From 615ad97928f042d401b83142fd7d9daf50f48915 Mon Sep 17 00:00:00 2001 From: xxxigm <54813621+xxxigm@users.noreply.github.com> Date: Thu, 11 Jun 2026 08:21:38 +0700 Subject: [PATCH] fix(streaming): stop socket read timeout from preempting stale-stream detector (#43570) * fix(streaming): stop socket read timeout from preempting stale-stream detector The stale-stream detector is deliberately scaled to 180-300s so reasoning models (e.g. Opus) can pause mid-stream during extended thinking. But the httpx socket read timeout stayed at a flat 120s for cloud providers and fired first, tearing down healthy reasoning streams before the detector (which owns retry + diagnostics) could act. Symptom: every Copilot/Opus turn dies with ReadTimeout at a consistent ~125s and never completes. Floor the cloud socket read timeout at the stale-stream timeout so it can no longer fire before the detector. Local providers and explicit HERMES_STREAM_READ_TIMEOUT / request_timeout_seconds overrides are unchanged. * test(streaming): pin read-timeout >= stale-stream invariant for cloud reasoning streams Cover the contract that the httpx socket read timeout is never shorter than the stale-stream detector for cloud providers on the default: small contexts floor to 180s, >50K to 240s, >100K to 300s; explicit overrides win; local providers and the unresolved-value fallback are unaffected. 
--- agent/chat_completion_helpers.py | 28 +++++ tests/agent/test_stream_read_timeout_floor.py | 111 ++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 tests/agent/test_stream_read_timeout_floor.py diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index ce066d55640..783e85b3e93 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -1698,6 +1698,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= # poll loop uses this to detect stale connections that keep receiving # SSE keep-alive pings but no actual data. last_chunk_time = {"t": time.time()} + # Stale-stream patience, shared between the httpx socket read timeout + # (built in ``_call_chat_completions`` below) and the stale-stream detector + # (computed further down, before the worker thread starts). Initialized + # here so the read-timeout builder can floor itself at the stale value and + # never fire before the detector. ``None`` until the detector value is + # resolved, so the builder degrades to its plain default if it ever runs + # first. + _stream_stale_timeout = None def _fire_first_delta(): if not first_delta_fired["done"] and on_first_delta: @@ -1734,6 +1742,26 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= "Local provider detected (%s) — stream read timeout raised to %.0fs", agent.base_url, _stream_read_timeout, ) + elif ( + _stream_read_timeout == 120.0 + and _stream_stale_timeout is not None + and _stream_stale_timeout != float("inf") + and _stream_stale_timeout > _stream_read_timeout + ): + # Cloud reasoning models (e.g. Opus) routinely pause mid-stream + # for minutes during extended thinking. 
The stale-stream + # detector is deliberately scaled up to tolerate this (180–300s, + # see the stale-timeout block below), but the raw httpx socket + # read timeout defaulted to a flat 120s and fired *first* — + # tearing down a healthy reasoning stream before the stale + # detector (which owns retry + diagnostics) could act. Keep the + # socket read timeout in step with the detector so it no longer + # preempts it. + _stream_read_timeout = _stream_stale_timeout + logger.debug( + "Cloud reasoning stream — read timeout raised to %.0fs to " + "match stale-stream detector", _stream_read_timeout, + ) # Cap connect/pool at 60s even when provider timeout is higher. # connect/pool cover TCP handshake, not model inference. _conn_cap = min(_base_timeout, 60.0) if _provider_timeout_cfg is not None else 30.0 diff --git a/tests/agent/test_stream_read_timeout_floor.py b/tests/agent/test_stream_read_timeout_floor.py new file mode 100644 index 00000000000..0949866a046 --- /dev/null +++ b/tests/agent/test_stream_read_timeout_floor.py @@ -0,0 +1,111 @@ +"""Stream read timeout must never preempt the stale-stream detector. + +Reasoning models (e.g. Opus) routinely pause mid-stream for minutes during +extended thinking. The stale-stream detector is deliberately scaled up to +tolerate this (180s base, raised to 240s/300s for large contexts). The httpx +socket read timeout, however, defaulted to a flat 120s for cloud providers and +fired *first* — tearing down a healthy reasoning stream before the stale +detector (which owns retry + diagnostics) could act. + +These tests pin the invariant: for a cloud provider on the default read +timeout, the httpx socket read timeout is floored at the stale-stream timeout +so it can never fire before the detector. They mirror the inline logic in +``agent/chat_completion_helpers.py`` (the real builder lives deep inside a +worker thread, so — like ``test_local_stream_timeout.py`` — the resolution is +reproduced here rather than driven end-to-end). 
+""" + +import os + +import pytest + +from agent.model_metadata import is_local_endpoint + + +def _resolve_stale_timeout(base_url, est_tokens, stale_base=180.0): + """Mirror of the stale-stream detector resolution.""" + if stale_base == 180.0 and base_url and is_local_endpoint(base_url): + return float("inf") # detector disabled for local providers + if est_tokens > 100_000: + return max(stale_base, 300.0) + if est_tokens > 50_000: + return max(stale_base, 240.0) + return stale_base + + +def _resolve_read_timeout(base_url, stale_timeout, base_timeout=1800.0): + """Mirror of the httpx socket read-timeout builder (cloud branch).""" + read_timeout = float(os.getenv("HERMES_STREAM_READ_TIMEOUT", 120.0)) + if read_timeout == 120.0 and base_url and is_local_endpoint(base_url): + read_timeout = base_timeout + elif ( + read_timeout == 120.0 + and stale_timeout is not None + and stale_timeout != float("inf") + and stale_timeout > read_timeout + ): + read_timeout = stale_timeout + return read_timeout + + +CLOUD_URLS = [ + "https://api.githubcopilot.com", + "https://api.openai.com", + "https://openrouter.ai/api", + "https://api.anthropic.com", +] + + +class TestCloudReadTimeoutFloor: + @pytest.fixture(autouse=True) + def _clear_env(self): + with pytest.MonkeyPatch.context() as mp: + mp.delenv("HERMES_STREAM_READ_TIMEOUT", raising=False) + yield + + @pytest.mark.parametrize("base_url", CLOUD_URLS) + @pytest.mark.parametrize("est_tokens", [0, 10_000, 60_000, 150_000]) + def test_read_timeout_never_below_stale(self, base_url, est_tokens): + """Core invariant: the socket read timeout >= the stale detector.""" + stale = _resolve_stale_timeout(base_url, est_tokens) + read = _resolve_read_timeout(base_url, stale) + assert read >= stale + + @pytest.mark.parametrize("base_url", CLOUD_URLS) + def test_small_context_floored_to_stale_base(self, base_url): + """Reported case: ~120s timeouts on Copilot are raised to the 180s base.""" + stale = _resolve_stale_timeout(base_url, 
est_tokens=37_000) + read = _resolve_read_timeout(base_url, stale) + assert read == 180.0 + + @pytest.mark.parametrize("base_url", CLOUD_URLS) + def test_large_context_tracks_scaled_stale(self, base_url): + """Big contexts scale the stale detector; the read timeout follows.""" + assert _resolve_read_timeout(base_url, _resolve_stale_timeout(base_url, 60_000)) == 240.0 + assert _resolve_read_timeout(base_url, _resolve_stale_timeout(base_url, 150_000)) == 300.0 + + def test_user_override_is_respected(self): + """An explicit HERMES_STREAM_READ_TIMEOUT is never overridden by the floor.""" + with pytest.MonkeyPatch.context() as mp: + mp.setenv("HERMES_STREAM_READ_TIMEOUT", "90") + stale = _resolve_stale_timeout("https://api.githubcopilot.com", est_tokens=0) + assert _resolve_read_timeout("https://api.githubcopilot.com", stale) == 90.0 + + +class TestLocalUnaffected: + @pytest.fixture(autouse=True) + def _clear_env(self): + with pytest.MonkeyPatch.context() as mp: + mp.delenv("HERMES_STREAM_READ_TIMEOUT", raising=False) + yield + + def test_local_still_raised_to_base(self): + """Local providers keep their existing behavior (raise to base timeout).""" + stale = _resolve_stale_timeout("http://localhost:11434", est_tokens=0) + assert stale == float("inf") # detector disabled for local + read = _resolve_read_timeout("http://localhost:11434", stale) + assert read == 1800.0 # not clamped by inf + + def test_stale_none_falls_back_to_default(self): + """If the stale value is unresolved, the read timeout keeps its default.""" + assert _resolve_read_timeout("https://api.githubcopilot.com", None) == 120.0