mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(agent): don't retry interrupt-induced transport errors (cascading-interrupt hang)
When agent.interrupt() fires during an active LLM call, the main poll loop force-closes the worker-local httpx client to stop token generation. That raises a transport error (RemoteProtocolError) on the worker thread — the EXPECTED consequence of our own close, not a network bug. The streaming retry loop misclassified it as a transient connection error and retried; each doomed retry stalled for the full stream-stale timeout (up to 300s). Because the gateway caches AIAgent instances per session, the stale worker outlived the interrupted turn and raced the next turn's request on shared client state — the root of the multi-minute cascading-interrupt hang reported in the wild. Fix: a request-local _request_cancelled token set by the poll loop right before the force-close, in both interruptible_api_call (non-streaming) and interruptible_streaming_api_call. The worker's exception handler checks the token and exits cleanly — no retry, no fallback, no 'reconnecting' status — instead of treating the forced error as transient. The token is request- local (not agent._interrupt_requested, which is cleared at turn boundaries) so a stale worker outliving its turn still recognizes its own forced close. Original diagnosis and fix by @kristianvast (PR #6600), against the then- inline methods in run_agent.py. Those were since extracted into agent/chat_completion_helpers.py, so the fix is reapplied there. Co-authored-by: Kristian Vastveit <kristianvast@users.noreply.github.com>
This commit is contained in:
parent
aa6f2775fa
commit
dd0d1222a2
2 changed files with 194 additions and 0 deletions
|
|
@ -139,6 +139,15 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||
result = {"response": None, "error": None}
|
||||
request_client_holder = {"client": None, "owner_tid": None}
|
||||
request_client_lock = threading.Lock()
|
||||
# Request-local cancellation flag. Distinct from agent._interrupt_requested
|
||||
# because that flag is cleared at run_conversation() turn boundaries, but
|
||||
# this daemon worker thread can outlive the turn (the gateway caches
|
||||
# AIAgent instances per session). Tracks whether THIS specific request was
|
||||
# cancelled by the main thread's interrupt handler, so the transport error
|
||||
# that is the expected consequence of our own force-close isn't misread as
|
||||
# a network bug and surfaced to the caller. (PR #6600 — cascading interrupt
|
||||
# hang.)
|
||||
_request_cancelled = {"value": False}
|
||||
|
||||
def _set_request_client(client):
|
||||
with request_client_lock:
|
||||
|
|
@ -229,6 +238,17 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||
)
|
||||
result["response"] = request_client.chat.completions.create(**api_kwargs)
|
||||
except Exception as e:
|
||||
# If the request was cancelled by the main thread's interrupt
|
||||
# handler, the transport error is the expected consequence of our
|
||||
# own force-close, NOT a network bug. Swallow it instead of
|
||||
# surfacing — the main thread raises InterruptedError. (#6600)
|
||||
if _request_cancelled["value"]:
|
||||
logger.debug(
|
||||
"Non-streaming worker caught %s after request cancellation — "
|
||||
"exiting without surfacing a network error.",
|
||||
type(e).__name__,
|
||||
)
|
||||
return
|
||||
result["error"] = e
|
||||
finally:
|
||||
_close_request_client_once("request_complete")
|
||||
|
|
@ -506,6 +526,14 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
|||
break
|
||||
|
||||
if agent._interrupt_requested:
|
||||
# Mark THIS request cancelled before force-closing so the worker's
|
||||
# exception handler recognizes the forced transport error as a
|
||||
# cancel and exits cleanly instead of surfacing a network error or
|
||||
# (in the streaming path) burning full retry cycles. (#6600)
|
||||
_request_cancelled["value"] = True
|
||||
logger.debug(
|
||||
"Force-closing httpx client due to interrupt (not a network error)."
|
||||
)
|
||||
# Force-close the in-flight worker-local HTTP connection to stop
|
||||
# token generation without poisoning the shared client used to
|
||||
# seed future retries.
|
||||
|
|
@ -1625,6 +1653,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
result = {"response": None, "error": None, "partial_tool_names": []}
|
||||
request_client_holder = {"client": None, "diag": None, "owner_tid": None}
|
||||
request_client_lock = threading.Lock()
|
||||
# Request-local cancellation flag — see interruptible_api_call for the full
|
||||
# rationale. The streaming retry loop is where the 7-minute cascading-
|
||||
# interrupt hang originated: a force-close raised RemoteProtocolError, the
|
||||
# loop classified it as a transient network error, and burned full retry
|
||||
# cycles (and emitted "reconnecting" noise) on a request the user already
|
||||
# cancelled. The token lets the worker recognize its own forced close and
|
||||
# exit immediately instead of retrying. (PR #6600.)
|
||||
_request_cancelled = {"value": False}
|
||||
|
||||
def _set_request_client(client):
|
||||
with request_client_lock:
|
||||
|
|
@ -2078,6 +2114,21 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
result["response"] = _call_chat_completions()
|
||||
return # success
|
||||
except Exception as e:
|
||||
# If the main poll loop force-closed this request because
|
||||
# of an interrupt, the resulting transport error is the
|
||||
# expected consequence of our own close — NOT a transient
|
||||
# network error. Exit immediately: no retry, no fallback,
|
||||
# no "reconnecting" status. The outer poll loop raises
|
||||
# InterruptedError. This is the fix for the cascading-
|
||||
# interrupt hang where doomed retries burned full
|
||||
# stream-stale-timeout cycles. (#6600)
|
||||
if _request_cancelled["value"]:
|
||||
logger.debug(
|
||||
"Streaming worker caught %s after request "
|
||||
"cancellation — exiting without retry.",
|
||||
type(e).__name__,
|
||||
)
|
||||
return
|
||||
_is_timeout = isinstance(
|
||||
e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout)
|
||||
)
|
||||
|
|
@ -2387,6 +2438,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
|||
)
|
||||
|
||||
if agent._interrupt_requested:
|
||||
# Mark THIS request cancelled before force-closing so the worker's
|
||||
# exception handler recognizes the forced transport error as a
|
||||
# cancel and exits without retrying or surfacing a network error.
|
||||
# (#6600)
|
||||
_request_cancelled["value"] = True
|
||||
logger.debug(
|
||||
"Force-closing streaming httpx client due to interrupt "
|
||||
"(not a network error)."
|
||||
)
|
||||
try:
|
||||
if agent.api_mode == "anthropic_messages":
|
||||
agent._anthropic_client.close()
|
||||
|
|
|
|||
134
tests/agent/test_cascading_interrupt_6600.py
Normal file
134
tests/agent/test_cascading_interrupt_6600.py
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
"""Regression guard for the cascading-interrupt hang (PR #6600).
|
||||
|
||||
Original diagnosis and fix by Kristian Vastveit (@kristianvast) in PR #6600,
|
||||
against the then-inline ``_interruptible_api_call`` /
|
||||
``_interruptible_streaming_api_call`` methods in run_agent.py. Those methods
|
||||
have since been extracted into ``agent/chat_completion_helpers.py``, so the
|
||||
fix is reapplied there and these tests target the extracted functions.
|
||||
|
||||
The bug: when ``agent.interrupt()`` fires during an active LLM call, the main
|
||||
poll loop force-closes the worker-local httpx client to stop token generation.
|
||||
That raises a transport error (RemoteProtocolError) on the worker — the
|
||||
EXPECTED consequence of our own close, not a network bug. The streaming retry
|
||||
loop misclassified it as a transient connection error and retried, each doomed
|
||||
retry stalling for the full stream-stale timeout (up to 300s). Because the
|
||||
gateway caches AIAgent instances per session, the stale worker outlived the
|
||||
turn and raced the next turn's request — the root of the multi-minute
|
||||
cascading-interrupt hang.
|
||||
|
||||
The fix: a request-local ``_request_cancelled`` token set by the poll loop
|
||||
right before the force-close. The worker's exception handler checks it and
|
||||
exits cleanly (no retry, no fallback, no "reconnecting" status) instead of
|
||||
treating the forced error as transient.
|
||||
"""
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
import httpx
|
||||
import pytest
|
||||
|
||||
from agent import chat_completion_helpers as cch
|
||||
|
||||
|
||||
class _FakeInterruptError(Exception):
|
||||
"""Stand-in for the transport error a force-close raises on the worker."""
|
||||
|
||||
|
||||
def _make_agent():
|
||||
"""A MagicMock agent wired with just enough surface for the helpers."""
|
||||
agent = MagicMock()
|
||||
agent.api_mode = "chat_completions"
|
||||
agent._interrupt_requested = False
|
||||
agent.verbose_logging = False
|
||||
# _compute_non_stream_stale_timeout / streaming setup helpers return
|
||||
# benign values; the real call path is mocked per-test.
|
||||
agent._compute_non_stream_stale_timeout.return_value = 5.0
|
||||
return agent
|
||||
|
||||
|
||||
def test_non_streaming_cancel_does_not_surface_network_error():
|
||||
"""A force-close during a non-streaming call must raise InterruptedError,
|
||||
not the swallowed transport error."""
|
||||
agent = _make_agent()
|
||||
|
||||
create_calls = {"n": 0}
|
||||
fake_client = MagicMock()
|
||||
|
||||
def _create(**kwargs):
|
||||
create_calls["n"] += 1
|
||||
# Simulate the main thread firing an interrupt mid-call, then the
|
||||
# force-close raising a transport error on this worker.
|
||||
agent._interrupt_requested = True
|
||||
time.sleep(0.3) # let the poll loop observe the interrupt + force-close
|
||||
raise httpx.RemoteProtocolError("peer closed connection")
|
||||
|
||||
fake_client.chat.completions.create.side_effect = _create
|
||||
agent._create_request_openai_client.return_value = fake_client
|
||||
agent._close_request_openai_client = MagicMock()
|
||||
agent._abort_request_openai_client = MagicMock()
|
||||
|
||||
t0 = time.time()
|
||||
with pytest.raises(InterruptedError):
|
||||
cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||
elapsed = time.time() - t0
|
||||
|
||||
# The forced RemoteProtocolError must NOT surface as the raised error.
|
||||
assert create_calls["n"] == 1
|
||||
assert elapsed < 3.0, f"interrupt took {elapsed:.1f}s — should be near-instant"
|
||||
|
||||
|
||||
def test_normal_transient_error_still_raises_when_not_cancelled():
|
||||
"""Regression guard: a real transport error with NO interrupt must still
|
||||
surface to the caller (so the outer retry loop can recover)."""
|
||||
agent = _make_agent()
|
||||
fake_client = MagicMock()
|
||||
fake_client.chat.completions.create.side_effect = httpx.RemoteProtocolError(
|
||||
"genuine network drop"
|
||||
)
|
||||
agent._create_request_openai_client.return_value = fake_client
|
||||
agent._close_request_openai_client = MagicMock()
|
||||
agent._abort_request_openai_client = MagicMock()
|
||||
agent._interrupt_requested = False
|
||||
|
||||
with pytest.raises(httpx.RemoteProtocolError):
|
||||
cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||
|
||||
|
||||
def test_request_cancelled_token_is_request_local():
|
||||
"""The cancellation token must be created per call, not shared on the
|
||||
agent — a stale worker from a previous turn must not see the next turn's
|
||||
interrupt flag flip back to False and mistake its own forced error for a
|
||||
network bug. We assert the helper reads agent._interrupt_requested at the
|
||||
force-close site (request-local token set there), by confirming two
|
||||
independent calls don't share cancellation state."""
|
||||
agent = _make_agent()
|
||||
|
||||
# First call: interrupted.
|
||||
fake_client_1 = MagicMock()
|
||||
|
||||
def _create_1(**kwargs):
|
||||
agent._interrupt_requested = True
|
||||
time.sleep(0.3)
|
||||
raise httpx.RemoteProtocolError("forced close turn A")
|
||||
|
||||
fake_client_1.chat.completions.create.side_effect = _create_1
|
||||
agent._create_request_openai_client.return_value = fake_client_1
|
||||
agent._close_request_openai_client = MagicMock()
|
||||
agent._abort_request_openai_client = MagicMock()
|
||||
|
||||
with pytest.raises(InterruptedError):
|
||||
cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||
|
||||
# Second call: NOT interrupted (turn boundary cleared the flag). A genuine
|
||||
# error must still surface — the previous call's cancellation must not leak.
|
||||
agent._interrupt_requested = False
|
||||
fake_client_2 = MagicMock()
|
||||
fake_client_2.chat.completions.create.side_effect = httpx.RemoteProtocolError(
|
||||
"genuine drop turn B"
|
||||
)
|
||||
agent._create_request_openai_client.return_value = fake_client_2
|
||||
|
||||
with pytest.raises(httpx.RemoteProtocolError):
|
||||
cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||
Loading…
Add table
Add a link
Reference in a new issue