fix(agent): don't retry interrupt-induced transport errors (cascading-interrupt hang)

When agent.interrupt() fires during an active LLM call, the main poll loop
force-closes the worker-local httpx client to stop token generation. That
raises a transport error (RemoteProtocolError) on the worker thread — the
EXPECTED consequence of our own close, not a network bug.

The streaming retry loop misclassified it as a transient connection error
and retried; each doomed retry stalled for the full stream-stale timeout
(up to 300s). Because the gateway caches AIAgent instances per session, the
stale worker outlived the interrupted turn and raced the next turn's request
on shared client state — the root of the multi-minute cascading-interrupt
hang reported in the wild.

Fix: a request-local _request_cancelled token set by the poll loop right
before the force-close, in both interruptible_api_call (non-streaming) and
interruptible_streaming_api_call. The worker's exception handler checks the
token and exits cleanly — no retry, no fallback, no 'reconnecting' status —
instead of treating the forced error as transient. The token is request-
local (not agent._interrupt_requested, which is cleared at turn boundaries)
so a stale worker outliving its turn still recognizes its own forced close.

Original diagnosis and fix by @kristianvast (PR #6600), against the then-
inline methods in run_agent.py. Those were since extracted into
agent/chat_completion_helpers.py, so the fix is reapplied there.

Co-authored-by: Kristian Vastveit <kristianvast@users.noreply.github.com>
This commit is contained in:
teknium1 2026-06-08 01:55:31 -07:00 committed by Teknium
parent aa6f2775fa
commit dd0d1222a2
2 changed files with 194 additions and 0 deletions

View file

@ -139,6 +139,15 @@ def interruptible_api_call(agent, api_kwargs: dict):
result = {"response": None, "error": None} result = {"response": None, "error": None}
request_client_holder = {"client": None, "owner_tid": None} request_client_holder = {"client": None, "owner_tid": None}
request_client_lock = threading.Lock() request_client_lock = threading.Lock()
# Request-local cancellation flag. Distinct from agent._interrupt_requested
# because that flag is cleared at run_conversation() turn boundaries, but
# this daemon worker thread can outlive the turn (the gateway caches
# AIAgent instances per session). Tracks whether THIS specific request was
# cancelled by the main thread's interrupt handler, so the transport error
# that is the expected consequence of our own force-close isn't misread as
# a network bug and surfaced to the caller. (PR #6600 — cascading interrupt
# hang.)
_request_cancelled = {"value": False}
def _set_request_client(client): def _set_request_client(client):
with request_client_lock: with request_client_lock:
@ -229,6 +238,17 @@ def interruptible_api_call(agent, api_kwargs: dict):
) )
result["response"] = request_client.chat.completions.create(**api_kwargs) result["response"] = request_client.chat.completions.create(**api_kwargs)
except Exception as e: except Exception as e:
# If the request was cancelled by the main thread's interrupt
# handler, the transport error is the expected consequence of our
# own force-close, NOT a network bug. Swallow it instead of
# surfacing — the main thread raises InterruptedError. (#6600)
if _request_cancelled["value"]:
logger.debug(
"Non-streaming worker caught %s after request cancellation — "
"exiting without surfacing a network error.",
type(e).__name__,
)
return
result["error"] = e result["error"] = e
finally: finally:
_close_request_client_once("request_complete") _close_request_client_once("request_complete")
@ -506,6 +526,14 @@ def interruptible_api_call(agent, api_kwargs: dict):
break break
if agent._interrupt_requested: if agent._interrupt_requested:
# Mark THIS request cancelled before force-closing so the worker's
# exception handler recognizes the forced transport error as a
# cancel and exits cleanly instead of surfacing a network error or
# (in the streaming path) burning full retry cycles. (#6600)
_request_cancelled["value"] = True
logger.debug(
"Force-closing httpx client due to interrupt (not a network error)."
)
# Force-close the in-flight worker-local HTTP connection to stop # Force-close the in-flight worker-local HTTP connection to stop
# token generation without poisoning the shared client used to # token generation without poisoning the shared client used to
# seed future retries. # seed future retries.
@ -1625,6 +1653,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
result = {"response": None, "error": None, "partial_tool_names": []} result = {"response": None, "error": None, "partial_tool_names": []}
request_client_holder = {"client": None, "diag": None, "owner_tid": None} request_client_holder = {"client": None, "diag": None, "owner_tid": None}
request_client_lock = threading.Lock() request_client_lock = threading.Lock()
# Request-local cancellation flag — see interruptible_api_call for the full
# rationale. The streaming retry loop is where the 7-minute cascading-
# interrupt hang originated: a force-close raised RemoteProtocolError, the
# loop classified it as a transient network error, and burned full retry
# cycles (and emitted "reconnecting" noise) on a request the user already
# cancelled. The token lets the worker recognize its own forced close and
# exit immediately instead of retrying. (PR #6600.)
_request_cancelled = {"value": False}
def _set_request_client(client): def _set_request_client(client):
with request_client_lock: with request_client_lock:
@ -2078,6 +2114,21 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
result["response"] = _call_chat_completions() result["response"] = _call_chat_completions()
return # success return # success
except Exception as e: except Exception as e:
# If the main poll loop force-closed this request because
# of an interrupt, the resulting transport error is the
# expected consequence of our own close — NOT a transient
# network error. Exit immediately: no retry, no fallback,
# no "reconnecting" status. The outer poll loop raises
# InterruptedError. This is the fix for the cascading-
# interrupt hang where doomed retries burned full
# stream-stale-timeout cycles. (#6600)
if _request_cancelled["value"]:
logger.debug(
"Streaming worker caught %s after request "
"cancellation — exiting without retry.",
type(e).__name__,
)
return
_is_timeout = isinstance( _is_timeout = isinstance(
e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout) e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout)
) )
@ -2387,6 +2438,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
) )
if agent._interrupt_requested: if agent._interrupt_requested:
# Mark THIS request cancelled before force-closing so the worker's
# exception handler recognizes the forced transport error as a
# cancel and exits without retrying or surfacing a network error.
# (#6600)
_request_cancelled["value"] = True
logger.debug(
"Force-closing streaming httpx client due to interrupt "
"(not a network error)."
)
try: try:
if agent.api_mode == "anthropic_messages": if agent.api_mode == "anthropic_messages":
agent._anthropic_client.close() agent._anthropic_client.close()

View file

@ -0,0 +1,134 @@
"""Regression guard for the cascading-interrupt hang (PR #6600).
Original diagnosis and fix by Kristian Vastveit (@kristianvast) in PR #6600,
against the then-inline ``_interruptible_api_call`` /
``_interruptible_streaming_api_call`` methods in run_agent.py. Those methods
have since been extracted into ``agent/chat_completion_helpers.py``, so the
fix is reapplied there and these tests target the extracted functions.
The bug: when ``agent.interrupt()`` fires during an active LLM call, the main
poll loop force-closes the worker-local httpx client to stop token generation.
That raises a transport error (RemoteProtocolError) on the worker thread — the
EXPECTED consequence of our own close, not a network bug. The streaming retry
loop misclassified it as a transient connection error and retried, each doomed
retry stalling for the full stream-stale timeout (up to 300s). Because the
gateway caches AIAgent instances per session, the stale worker outlived the
turn and raced the next turn's request — the root of the multi-minute
cascading-interrupt hang.
The fix: a request-local ``_request_cancelled`` token set by the poll loop
right before the force-close. The worker's exception handler checks it and
exits cleanly (no retry, no fallback, no "reconnecting" status) instead of
treating the forced error as transient.
"""
import threading
import time
import types
from unittest.mock import MagicMock
import httpx
import pytest
from agent import chat_completion_helpers as cch
class _FakeInterruptError(Exception):
"""Stand-in for the transport error a force-close raises on the worker."""
def _make_agent():
"""A MagicMock agent wired with just enough surface for the helpers."""
agent = MagicMock()
agent.api_mode = "chat_completions"
agent._interrupt_requested = False
agent.verbose_logging = False
# _compute_non_stream_stale_timeout / streaming setup helpers return
# benign values; the real call path is mocked per-test.
agent._compute_non_stream_stale_timeout.return_value = 5.0
return agent
def test_non_streaming_cancel_does_not_surface_network_error():
    """A force-close during a non-streaming call must raise InterruptedError,
    not the swallowed transport error.

    The fake ``create`` flips ``agent._interrupt_requested`` from inside the
    call, sleeps briefly so the poll loop can observe the interrupt, and then
    raises ``httpx.RemoteProtocolError`` to imitate the error a force-close
    produces on the worker. The test also checks that ``create`` ran exactly
    once and that the whole call returned quickly.
    """
    agent = _make_agent()
    create_calls = {"n": 0}
    fake_client = MagicMock()

    def _create(**kwargs):
        create_calls["n"] += 1
        # Simulate the main thread firing an interrupt mid-call, then the
        # force-close raising a transport error on this worker.
        agent._interrupt_requested = True
        time.sleep(0.3)  # let the poll loop observe the interrupt + force-close
        raise httpx.RemoteProtocolError("peer closed connection")

    fake_client.chat.completions.create.side_effect = _create
    agent._create_request_openai_client.return_value = fake_client
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    # Use the monotonic clock: wall-clock time can jump (NTP, manual
    # adjustment) and would make the elapsed-time bound below flaky.
    started = time.monotonic()
    with pytest.raises(InterruptedError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})
    elapsed = time.monotonic() - started

    # The forced RemoteProtocolError must NOT surface as the raised error,
    # and the request must not have been attempted a second time.
    assert create_calls["n"] == 1
    assert elapsed < 3.0, f"interrupt took {elapsed:.1f}s — should be near-instant"
def test_normal_transient_error_still_raises_when_not_cancelled():
    """Without an interrupt, a genuine transport failure has to propagate.

    Guards against the cancellation check swallowing real errors: the client
    raises ``httpx.RemoteProtocolError`` while ``_interrupt_requested`` stays
    False, and the same exception type must reach the caller.
    """
    agent = _make_agent()
    agent._interrupt_requested = False
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    network_drop = httpx.RemoteProtocolError("genuine network drop")
    failing_client = MagicMock()
    failing_client.chat.completions.create.side_effect = network_drop
    agent._create_request_openai_client.return_value = failing_client

    request_kwargs = {"model": "x", "messages": []}
    with pytest.raises(httpx.RemoteProtocolError):
        cch.interruptible_api_call(agent, request_kwargs)
def test_request_cancelled_token_is_request_local():
    """Cancellation state must belong to a single call, not to the agent.

    Two calls are made on the same agent. The first is interrupted mid-call
    and must raise InterruptedError. The interrupt flag is then reset, as a
    turn boundary would do, and the second call hits a genuine transport
    error. That error must surface — if cancellation state leaked from the
    first call, it would be swallowed instead.
    """
    agent = _make_agent()
    request_kwargs = {"model": "x", "messages": []}

    # Turn A: the request is interrupted while in flight.
    interrupted_client = MagicMock()

    def _interrupted_create(**kwargs):
        agent._interrupt_requested = True
        time.sleep(0.3)
        raise httpx.RemoteProtocolError("forced close turn A")

    interrupted_client.chat.completions.create.side_effect = _interrupted_create
    agent._create_request_openai_client.return_value = interrupted_client
    agent._close_request_openai_client = MagicMock()
    agent._abort_request_openai_client = MagicMock()

    with pytest.raises(InterruptedError):
        cch.interruptible_api_call(agent, request_kwargs)

    # Turn B: no interrupt (the flag was cleared at the turn boundary). A
    # genuine error must still surface; turn A's cancellation must not leak.
    agent._interrupt_requested = False
    dropping_client = MagicMock()
    dropping_client.chat.completions.create.side_effect = httpx.RemoteProtocolError(
        "genuine drop turn B"
    )
    agent._create_request_openai_client.return_value = dropping_client

    with pytest.raises(httpx.RemoteProtocolError):
        cch.interruptible_api_call(agent, {"model": "x", "messages": []})