mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-09 08:21:50 +00:00
fix(agent): don't retry interrupt-induced transport errors (cascading-interrupt hang)
When agent.interrupt() fires during an active LLM call, the main poll loop force-closes the worker-local httpx client to stop token generation. That raises a transport error (RemoteProtocolError) on the worker thread — the EXPECTED consequence of our own close, not a network bug. The streaming retry loop misclassified it as a transient connection error and retried; each doomed retry stalled for the full stream-stale timeout (up to 300s). Because the gateway caches AIAgent instances per session, the stale worker outlived the interrupted turn and raced the next turn's request on shared client state — the root of the multi-minute cascading-interrupt hang reported in the wild. Fix: a request-local _request_cancelled token set by the poll loop right before the force-close, in both interruptible_api_call (non-streaming) and interruptible_streaming_api_call. The worker's exception handler checks the token and exits cleanly — no retry, no fallback, no 'reconnecting' status — instead of treating the forced error as transient. The token is request-local (not agent._interrupt_requested, which is cleared at turn boundaries) so a stale worker outliving its turn still recognizes its own forced close. Original diagnosis and fix by @kristianvast (PR #6600), against the then-inline methods in run_agent.py. Those have since been extracted into agent/chat_completion_helpers.py, so the fix is reapplied there. Co-authored-by: Kristian Vastveit <kristianvast@users.noreply.github.com>
This commit is contained in:
parent
aa6f2775fa
commit
dd0d1222a2
2 changed files with 194 additions and 0 deletions
|
|
@ -139,6 +139,15 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
||||||
result = {"response": None, "error": None}
|
result = {"response": None, "error": None}
|
||||||
request_client_holder = {"client": None, "owner_tid": None}
|
request_client_holder = {"client": None, "owner_tid": None}
|
||||||
request_client_lock = threading.Lock()
|
request_client_lock = threading.Lock()
|
||||||
|
# Request-local cancellation flag. Distinct from agent._interrupt_requested
|
||||||
|
# because that flag is cleared at run_conversation() turn boundaries, but
|
||||||
|
# this daemon worker thread can outlive the turn (the gateway caches
|
||||||
|
# AIAgent instances per session). Tracks whether THIS specific request was
|
||||||
|
# cancelled by the main thread's interrupt handler, so the transport error
|
||||||
|
# that is the expected consequence of our own force-close isn't misread as
|
||||||
|
# a network bug and surfaced to the caller. (PR #6600 — cascading interrupt
|
||||||
|
# hang.)
|
||||||
|
_request_cancelled = {"value": False}
|
||||||
|
|
||||||
def _set_request_client(client):
|
def _set_request_client(client):
|
||||||
with request_client_lock:
|
with request_client_lock:
|
||||||
|
|
@ -229,6 +238,17 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
||||||
)
|
)
|
||||||
result["response"] = request_client.chat.completions.create(**api_kwargs)
|
result["response"] = request_client.chat.completions.create(**api_kwargs)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# If the request was cancelled by the main thread's interrupt
|
||||||
|
# handler, the transport error is the expected consequence of our
|
||||||
|
# own force-close, NOT a network bug. Swallow it instead of
|
||||||
|
# surfacing — the main thread raises InterruptedError. (#6600)
|
||||||
|
if _request_cancelled["value"]:
|
||||||
|
logger.debug(
|
||||||
|
"Non-streaming worker caught %s after request cancellation — "
|
||||||
|
"exiting without surfacing a network error.",
|
||||||
|
type(e).__name__,
|
||||||
|
)
|
||||||
|
return
|
||||||
result["error"] = e
|
result["error"] = e
|
||||||
finally:
|
finally:
|
||||||
_close_request_client_once("request_complete")
|
_close_request_client_once("request_complete")
|
||||||
|
|
@ -506,6 +526,14 @@ def interruptible_api_call(agent, api_kwargs: dict):
|
||||||
break
|
break
|
||||||
|
|
||||||
if agent._interrupt_requested:
|
if agent._interrupt_requested:
|
||||||
|
# Mark THIS request cancelled before force-closing so the worker's
|
||||||
|
# exception handler recognizes the forced transport error as a
|
||||||
|
# cancel and exits cleanly instead of surfacing a network error or
|
||||||
|
# (in the streaming path) burning full retry cycles. (#6600)
|
||||||
|
_request_cancelled["value"] = True
|
||||||
|
logger.debug(
|
||||||
|
"Force-closing httpx client due to interrupt (not a network error)."
|
||||||
|
)
|
||||||
# Force-close the in-flight worker-local HTTP connection to stop
|
# Force-close the in-flight worker-local HTTP connection to stop
|
||||||
# token generation without poisoning the shared client used to
|
# token generation without poisoning the shared client used to
|
||||||
# seed future retries.
|
# seed future retries.
|
||||||
|
|
@ -1625,6 +1653,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||||
result = {"response": None, "error": None, "partial_tool_names": []}
|
result = {"response": None, "error": None, "partial_tool_names": []}
|
||||||
request_client_holder = {"client": None, "diag": None, "owner_tid": None}
|
request_client_holder = {"client": None, "diag": None, "owner_tid": None}
|
||||||
request_client_lock = threading.Lock()
|
request_client_lock = threading.Lock()
|
||||||
|
# Request-local cancellation flag — see interruptible_api_call for the full
|
||||||
|
# rationale. The streaming retry loop is where the 7-minute cascading-
|
||||||
|
# interrupt hang originated: a force-close raised RemoteProtocolError, the
|
||||||
|
# loop classified it as a transient network error, and burned full retry
|
||||||
|
# cycles (and emitted "reconnecting" noise) on a request the user already
|
||||||
|
# cancelled. The token lets the worker recognize its own forced close and
|
||||||
|
# exit immediately instead of retrying. (PR #6600.)
|
||||||
|
_request_cancelled = {"value": False}
|
||||||
|
|
||||||
def _set_request_client(client):
|
def _set_request_client(client):
|
||||||
with request_client_lock:
|
with request_client_lock:
|
||||||
|
|
@ -2078,6 +2114,21 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||||
result["response"] = _call_chat_completions()
|
result["response"] = _call_chat_completions()
|
||||||
return # success
|
return # success
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# If the main poll loop force-closed this request because
|
||||||
|
# of an interrupt, the resulting transport error is the
|
||||||
|
# expected consequence of our own close — NOT a transient
|
||||||
|
# network error. Exit immediately: no retry, no fallback,
|
||||||
|
# no "reconnecting" status. The outer poll loop raises
|
||||||
|
# InterruptedError. This is the fix for the cascading-
|
||||||
|
# interrupt hang where doomed retries burned full
|
||||||
|
# stream-stale-timeout cycles. (#6600)
|
||||||
|
if _request_cancelled["value"]:
|
||||||
|
logger.debug(
|
||||||
|
"Streaming worker caught %s after request "
|
||||||
|
"cancellation — exiting without retry.",
|
||||||
|
type(e).__name__,
|
||||||
|
)
|
||||||
|
return
|
||||||
_is_timeout = isinstance(
|
_is_timeout = isinstance(
|
||||||
e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout)
|
e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout)
|
||||||
)
|
)
|
||||||
|
|
@ -2387,6 +2438,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
|
||||||
)
|
)
|
||||||
|
|
||||||
if agent._interrupt_requested:
|
if agent._interrupt_requested:
|
||||||
|
# Mark THIS request cancelled before force-closing so the worker's
|
||||||
|
# exception handler recognizes the forced transport error as a
|
||||||
|
# cancel and exits without retrying or surfacing a network error.
|
||||||
|
# (#6600)
|
||||||
|
_request_cancelled["value"] = True
|
||||||
|
logger.debug(
|
||||||
|
"Force-closing streaming httpx client due to interrupt "
|
||||||
|
"(not a network error)."
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
if agent.api_mode == "anthropic_messages":
|
if agent.api_mode == "anthropic_messages":
|
||||||
agent._anthropic_client.close()
|
agent._anthropic_client.close()
|
||||||
|
|
|
||||||
134
tests/agent/test_cascading_interrupt_6600.py
Normal file
134
tests/agent/test_cascading_interrupt_6600.py
Normal file
|
|
@ -0,0 +1,134 @@
|
||||||
|
"""Regression guard for the cascading-interrupt hang (PR #6600).
|
||||||
|
|
||||||
|
Original diagnosis and fix by Kristian Vastveit (@kristianvast) in PR #6600,
|
||||||
|
against the then-inline ``_interruptible_api_call`` /
|
||||||
|
``_interruptible_streaming_api_call`` methods in run_agent.py. Those methods
|
||||||
|
have since been extracted into ``agent/chat_completion_helpers.py``, so the
|
||||||
|
fix is reapplied there and these tests target the extracted functions.
|
||||||
|
|
||||||
|
The bug: when ``agent.interrupt()`` fires during an active LLM call, the main
|
||||||
|
poll loop force-closes the worker-local httpx client to stop token generation.
|
||||||
|
That raises a transport error (RemoteProtocolError) on the worker — the
|
||||||
|
EXPECTED consequence of our own close, not a network bug. The streaming retry
|
||||||
|
loop misclassified it as a transient connection error and retried, each doomed
|
||||||
|
retry stalling for the full stream-stale timeout (up to 300s). Because the
|
||||||
|
gateway caches AIAgent instances per session, the stale worker outlived the
|
||||||
|
turn and raced the next turn's request — the root of the multi-minute
|
||||||
|
cascading-interrupt hang.
|
||||||
|
|
||||||
|
The fix: a request-local ``_request_cancelled`` token set by the poll loop
|
||||||
|
right before the force-close. The worker's exception handler checks it and
|
||||||
|
exits cleanly (no retry, no fallback, no "reconnecting" status) instead of
|
||||||
|
treating the forced error as transient.
|
||||||
|
"""
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
import types
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from agent import chat_completion_helpers as cch
|
||||||
|
|
||||||
|
|
||||||
|
class _FakeInterruptError(Exception):
    """Placeholder for the transport failure a worker sees after a force-close.

    Carries no behaviour of its own; it only needs to be a distinct
    ``Exception`` subclass.
    """
|
||||||
|
|
||||||
|
|
||||||
|
def _make_agent():
    """Build a ``MagicMock`` agent exposing only what the helpers read.

    Returns:
        A fresh ``MagicMock`` in ``chat_completions`` mode with no interrupt
        pending, verbose logging off, and a 5-second non-streaming stale
        timeout. The real API call path is left for each test to mock.
    """
    stub = MagicMock()
    stub.configure_mock(
        **{
            "api_mode": "chat_completions",
            "_interrupt_requested": False,
            "verbose_logging": False,
            # Benign value for the stale-timeout helper; tests never rely on
            # the timeout actually elapsing.
            "_compute_non_stream_stale_timeout.return_value": 5.0,
        }
    )
    return stub
|
||||||
|
|
||||||
|
|
||||||
|
def test_non_streaming_cancel_does_not_surface_network_error():
    """A force-close during a non-streaming call must raise InterruptedError.

    The fake ``create`` flips ``agent._interrupt_requested`` mid-call, waits
    briefly, then raises the ``RemoteProtocolError`` a force-closed connection
    would produce. The helper must report the interrupt rather than that
    transport error, must call ``create`` exactly once, and must return
    quickly.
    """
    agent = _make_agent()

    create_calls = {"n": 0}
    fake_client = MagicMock()

    def _create(**kwargs):
        create_calls["n"] += 1
        # Simulate the main thread firing an interrupt mid-call, then the
        # force-close raising a transport error on this worker.
        agent._interrupt_requested = True
        time.sleep(0.3)  # let the poll loop observe the interrupt + force-close
        raise httpx.RemoteProtocolError("peer closed connection")

    fake_client.chat.completions.create.side_effect = _create
    agent._create_request_openai_client.return_value = fake_client
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    # Use the monotonic clock: wall-clock time (time.time) can jump under NTP
    # or manual adjustment and make the duration assertion unreliable.
    started = time.monotonic()
    with pytest.raises(InterruptedError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})
    elapsed = time.monotonic() - started

    # The forced RemoteProtocolError must NOT surface as the raised error,
    # and the request must have been attempted only once.
    assert create_calls["n"] == 1
    assert elapsed < 3.0, f"interrupt took {elapsed:.1f}s — should be near-instant"
|
||||||
|
|
||||||
|
|
||||||
|
def test_normal_transient_error_still_raises_when_not_cancelled():
    """Without an interrupt, a genuine transport error must reach the caller.

    Regression guard for the cancellation shortcut: when no interrupt was
    requested, the ``RemoteProtocolError`` raised by the client has to
    propagate so the outer retry loop can recover.
    """
    agent = _make_agent()
    agent._interrupt_requested = False
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    network_drop = httpx.RemoteProtocolError("genuine network drop")
    failing_client = MagicMock()
    failing_client.chat.completions.create.side_effect = network_drop
    agent._create_request_openai_client.return_value = failing_client

    with pytest.raises(httpx.RemoteProtocolError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||||
|
|
||||||
|
|
||||||
|
def test_request_cancelled_token_is_request_local():
    """Cancellation state from one call must not leak into the next.

    Two back-to-back ``interruptible_api_call`` invocations run on the same
    agent. The first is interrupted mid-request and must raise
    ``InterruptedError``. The second runs after the interrupt flag has been
    cleared (as at a turn boundary) and must surface its genuine
    ``RemoteProtocolError``, showing that the two calls do not share
    cancellation state.
    """
    agent = _make_agent()
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    # Turn A: the request is interrupted while in flight.
    def _interrupted_create(**kwargs):
        agent._interrupt_requested = True
        time.sleep(0.3)
        raise httpx.RemoteProtocolError("forced close turn A")

    interrupted_client = MagicMock()
    interrupted_client.chat.completions.create.side_effect = _interrupted_create
    agent._create_request_openai_client.return_value = interrupted_client

    with pytest.raises(InterruptedError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})

    # Turn B: NOT interrupted (turn boundary cleared the flag). A genuine
    # error must still surface — turn A's cancellation must not carry over.
    agent._interrupt_requested = False
    dropping_client = MagicMock()
    dropping_client.chat.completions.create.side_effect = httpx.RemoteProtocolError(
        "genuine drop turn B"
    )
    agent._create_request_openai_client.return_value = dropping_client

    with pytest.raises(httpx.RemoteProtocolError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})
|
||||||
Loading…
Add table
Add a link
Reference in a new issue