mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-12 08:51:53 +00:00
fix(streaming): stop socket read timeout from preempting stale-stream detector (#43570)
* fix(streaming): stop socket read timeout from preempting stale-stream detector The stale-stream detector is deliberately scaled to 180-300s so reasoning models (e.g. Opus) can pause mid-stream during extended thinking. But the httpx socket read timeout stayed at a flat 120s for cloud providers and fired first, tearing down healthy reasoning streams before the detector (which owns retry + diagnostics) could act. Symptom: every Copilot/Opus turn dies with ReadTimeout at a consistent ~125s and never completes. Floor the cloud socket read timeout at the stale-stream timeout so it can no longer fire before the detector. Local providers and explicit HERMES_STREAM_READ_TIMEOUT / request_timeout_seconds overrides are unchanged. * test(streaming): pin read-timeout >= stale-stream invariant for cloud reasoning streams Cover the contract that the httpx socket read timeout is never shorter than the stale-stream detector for cloud providers on the default: small contexts floor to 180s, >=50K to 240s, >=100K to 300s; explicit overrides win; local providers and the unresolved-value fallback are unaffected.
This commit is contained in:
parent
9dd9ef0ec9
commit
615ad97928
2 changed files with 139 additions and 0 deletions
|
|
@ -1698,6 +1698,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
# poll loop uses this to detect stale connections that keep receiving
|
||||
# SSE keep-alive pings but no actual data.
|
||||
last_chunk_time = {"t": time.time()}
|
||||
# Stale-stream patience, shared between the httpx socket read timeout
|
||||
# (built in ``_call_chat_completions`` below) and the stale-stream detector
|
||||
# (computed further down, before the worker thread starts). Initialized
|
||||
# here so the read-timeout builder can floor itself at the stale value and
|
||||
# never fire before the detector. ``None`` until the detector value is
|
||||
# resolved, so the builder degrades to its plain default if it ever runs
|
||||
# first.
|
||||
_stream_stale_timeout = None
|
||||
|
||||
def _fire_first_delta():
|
||||
if not first_delta_fired["done"] and on_first_delta:
|
||||
|
|
@ -1734,6 +1742,26 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
"Local provider detected (%s) — stream read timeout raised to %.0fs",
|
||||
agent.base_url, _stream_read_timeout,
|
||||
)
|
||||
elif (
|
||||
_stream_read_timeout == 120.0
|
||||
and _stream_stale_timeout is not None
|
||||
and _stream_stale_timeout != float("inf")
|
||||
and _stream_stale_timeout > _stream_read_timeout
|
||||
):
|
||||
# Cloud reasoning models (e.g. Opus) routinely pause mid-stream
|
||||
# for minutes during extended thinking. The stale-stream
|
||||
# detector is deliberately scaled up to tolerate this (180–300s,
|
||||
# see the stale-timeout block below), but the raw httpx socket
|
||||
# read timeout defaulted to a flat 120s and fired *first* —
|
||||
# tearing down a healthy reasoning stream before the stale
|
||||
# detector (which owns retry + diagnostics) could act. Keep the
|
||||
# socket read timeout in step with the detector so it no longer
|
||||
# preempts it.
|
||||
_stream_read_timeout = _stream_stale_timeout
|
||||
logger.debug(
|
||||
"Cloud reasoning stream — read timeout raised to %.0fs to "
|
||||
"match stale-stream detector", _stream_read_timeout,
|
||||
)
|
||||
# Cap connect/pool at 60s even when provider timeout is higher.
|
||||
# connect/pool cover TCP handshake, not model inference.
|
||||
_conn_cap = min(_base_timeout, 60.0) if _provider_timeout_cfg is not None else 30.0
|
||||
|
|
|
|||
111
tests/agent/test_stream_read_timeout_floor.py
Normal file
111
tests/agent/test_stream_read_timeout_floor.py
Normal file
|
|
@ -0,0 +1,111 @@
|
|||
"""Stream read timeout must never preempt the stale-stream detector.
|
||||
|
||||
Reasoning models (e.g. Opus) routinely pause mid-stream for minutes during
|
||||
extended thinking. The stale-stream detector is deliberately scaled up to
|
||||
tolerate this (180s base, raised to 240s/300s for large contexts). The httpx
|
||||
socket read timeout, however, defaulted to a flat 120s for cloud providers and
|
||||
fired *first* — tearing down a healthy reasoning stream before the stale
|
||||
detector (which owns retry + diagnostics) could act.
|
||||
|
||||
These tests pin the invariant: for a cloud provider on the default read
|
||||
timeout, the httpx socket read timeout is floored at the stale-stream timeout
|
||||
so it can never fire before the detector. They mirror the inline logic in
|
||||
``agent/chat_completion_helpers.py`` (the real builder lives deep inside a
|
||||
worker thread, so — like ``test_local_stream_timeout.py`` — the resolution is
|
||||
reproduced here rather than driven end-to-end).
|
||||
"""
|
||||
|
||||
import os
|
||||
|
||||
import pytest
|
||||
|
||||
from agent.model_metadata import is_local_endpoint
|
||||
|
||||
|
||||
def _resolve_stale_timeout(base_url, est_tokens, stale_base=180.0):
|
||||
"""Mirror of the stale-stream detector resolution."""
|
||||
if stale_base == 180.0 and base_url and is_local_endpoint(base_url):
|
||||
return float("inf") # detector disabled for local providers
|
||||
if est_tokens > 100_000:
|
||||
return max(stale_base, 300.0)
|
||||
if est_tokens > 50_000:
|
||||
return max(stale_base, 240.0)
|
||||
return stale_base
|
||||
|
||||
|
||||
def _resolve_read_timeout(base_url, stale_timeout, base_timeout=1800.0):
|
||||
"""Mirror of the httpx socket read-timeout builder (cloud branch)."""
|
||||
read_timeout = float(os.getenv("HERMES_STREAM_READ_TIMEOUT", 120.0))
|
||||
if read_timeout == 120.0 and base_url and is_local_endpoint(base_url):
|
||||
read_timeout = base_timeout
|
||||
elif (
|
||||
read_timeout == 120.0
|
||||
and stale_timeout is not None
|
||||
and stale_timeout != float("inf")
|
||||
and stale_timeout > read_timeout
|
||||
):
|
||||
read_timeout = stale_timeout
|
||||
return read_timeout
|
||||
|
||||
|
||||
CLOUD_URLS = [
|
||||
"https://api.githubcopilot.com",
|
||||
"https://api.openai.com",
|
||||
"https://openrouter.ai/api",
|
||||
"https://api.anthropic.com",
|
||||
]
|
||||
|
||||
|
||||
class TestCloudReadTimeoutFloor:
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_env(self):
|
||||
with pytest.MonkeyPatch.context() as mp:
|
||||
mp.delenv("HERMES_STREAM_READ_TIMEOUT", raising=False)
|
||||
yield
|
||||
|
||||
@pytest.mark.parametrize("base_url", CLOUD_URLS)
|
||||
@pytest.mark.parametrize("est_tokens", [0, 10_000, 60_000, 150_000])
|
||||
def test_read_timeout_never_below_stale(self, base_url, est_tokens):
|
||||
"""Core invariant: the socket read timeout >= the stale detector."""
|
||||
stale = _resolve_stale_timeout(base_url, est_tokens)
|
||||
read = _resolve_read_timeout(base_url, stale)
|
||||
assert read >= stale
|
||||
|
||||
@pytest.mark.parametrize("base_url", CLOUD_URLS)
|
||||
def test_small_context_floored_to_stale_base(self, base_url):
|
||||
"""Reported case: ~120s timeouts on Copilot are raised to the 180s base."""
|
||||
stale = _resolve_stale_timeout(base_url, est_tokens=37_000)
|
||||
read = _resolve_read_timeout(base_url, stale)
|
||||
assert read == 180.0
|
||||
|
||||
@pytest.mark.parametrize("base_url", CLOUD_URLS)
|
||||
def test_large_context_tracks_scaled_stale(self, base_url):
|
||||
"""Big contexts scale the stale detector; the read timeout follows."""
|
||||
assert _resolve_read_timeout(base_url, _resolve_stale_timeout(base_url, 60_000)) == 240.0
|
||||
assert _resolve_read_timeout(base_url, _resolve_stale_timeout(base_url, 150_000)) == 300.0
|
||||
|
||||
def test_user_override_is_respected(self):
|
||||
"""An explicit HERMES_STREAM_READ_TIMEOUT is never overridden by the floor."""
|
||||
with pytest.MonkeyPatch.context() as mp:
|
||||
mp.setenv("HERMES_STREAM_READ_TIMEOUT", "90")
|
||||
stale = _resolve_stale_timeout("https://api.githubcopilot.com", est_tokens=0)
|
||||
assert _resolve_read_timeout("https://api.githubcopilot.com", stale) == 90.0
|
||||
|
||||
|
||||
class TestLocalUnaffected:
|
||||
@pytest.fixture(autouse=True)
|
||||
def _clear_env(self):
|
||||
with pytest.MonkeyPatch.context() as mp:
|
||||
mp.delenv("HERMES_STREAM_READ_TIMEOUT", raising=False)
|
||||
yield
|
||||
|
||||
def test_local_still_raised_to_base(self):
|
||||
"""Local providers keep their existing behavior (raise to base timeout)."""
|
||||
stale = _resolve_stale_timeout("http://localhost:11434", est_tokens=0)
|
||||
assert stale == float("inf") # detector disabled for local
|
||||
read = _resolve_read_timeout("http://localhost:11434", stale)
|
||||
assert read == 1800.0 # not clamped by inf
|
||||
|
||||
def test_stale_none_falls_back_to_default(self):
|
||||
"""If the stale value is unresolved, the read timeout keeps its default."""
|
||||
assert _resolve_read_timeout("https://api.githubcopilot.com", None) == 120.0
|
||||
Loading…
Add table
Add a link
Reference in a new issue