diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index cbbc9139462..4a220a0e356 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -139,6 +139,15 @@ def interruptible_api_call(agent, api_kwargs: dict): result = {"response": None, "error": None} request_client_holder = {"client": None, "owner_tid": None} request_client_lock = threading.Lock() + # Request-local cancellation flag. Distinct from agent._interrupt_requested + # because that flag is cleared at run_conversation() turn boundaries, but + # this daemon worker thread can outlive the turn (the gateway caches + # AIAgent instances per session). Tracks whether THIS specific request was + # cancelled by the main thread's interrupt handler, so the transport error + # that is the expected consequence of our own force-close isn't misread as + # a network bug and surfaced to the caller. (PR #6600 — cascading interrupt + # hang.) + _request_cancelled = {"value": False} def _set_request_client(client): with request_client_lock: @@ -229,6 +238,17 @@ def interruptible_api_call(agent, api_kwargs: dict): ) result["response"] = request_client.chat.completions.create(**api_kwargs) except Exception as e: + # If the request was cancelled by the main thread's interrupt + # handler, the transport error is the expected consequence of our + # own force-close, NOT a network bug. Swallow it instead of + # surfacing — the main thread raises InterruptedError. (#6600) + if _request_cancelled["value"]: + logger.debug( + "Non-streaming worker caught %s after request cancellation — " + "exiting without surfacing a network error.", + type(e).__name__, + ) + return result["error"] = e finally: _close_request_client_once("request_complete") @@ -506,6 +526,14 @@ def interruptible_api_call(agent, api_kwargs: dict): break if agent._interrupt_requested: + # Mark THIS request cancelled before force-closing so the worker's + # exception handler recognizes the forced transport error as a + # cancel and exits cleanly instead of surfacing a network error or + # (in the streaming path) burning full retry cycles. (#6600) + _request_cancelled["value"] = True + logger.debug( + "Force-closing httpx client due to interrupt (not a network error)." + ) # Force-close the in-flight worker-local HTTP connection to stop # token generation without poisoning the shared client used to # seed future retries. @@ -1625,6 +1653,14 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= result = {"response": None, "error": None, "partial_tool_names": []} request_client_holder = {"client": None, "diag": None, "owner_tid": None} request_client_lock = threading.Lock() + # Request-local cancellation flag — see interruptible_api_call for the full + # rationale. The streaming retry loop is where the 7-minute cascading- + # interrupt hang originated: a force-close raised RemoteProtocolError, the + # loop classified it as a transient network error, and burned full retry + # cycles (and emitted "reconnecting" noise) on a request the user already + # cancelled. The token lets the worker recognize its own forced close and + # exit immediately instead of retrying. (PR #6600.) + _request_cancelled = {"value": False} def _set_request_client(client): with request_client_lock: @@ -2078,6 +2114,21 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= result["response"] = _call_chat_completions() return # success except Exception as e: + # If the main poll loop force-closed this request because + # of an interrupt, the resulting transport error is the + # expected consequence of our own close — NOT a transient + # network error. Exit immediately: no retry, no fallback, + # no "reconnecting" status. The outer poll loop raises + # InterruptedError. This is the fix for the cascading- + # interrupt hang where doomed retries burned full + # stream-stale-timeout cycles. (#6600) + if _request_cancelled["value"]: + logger.debug( + "Streaming worker caught %s after request " + "cancellation — exiting without retry.", + type(e).__name__, + ) + return _is_timeout = isinstance( e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout) ) @@ -2387,6 +2438,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= ) if agent._interrupt_requested: + # Mark THIS request cancelled before force-closing so the worker's + # exception handler recognizes the forced transport error as a + # cancel and exits without retrying or surfacing a network error. + # (#6600) + _request_cancelled["value"] = True + logger.debug( + "Force-closing streaming httpx client due to interrupt " + "(not a network error)." + ) try: if agent.api_mode == "anthropic_messages": agent._anthropic_client.close() diff --git a/tests/agent/test_cascading_interrupt_6600.py b/tests/agent/test_cascading_interrupt_6600.py new file mode 100644 index 00000000000..58fc28c4df0 --- /dev/null +++ b/tests/agent/test_cascading_interrupt_6600.py @@ -0,0 +1,134 @@ +"""Regression guard for the cascading-interrupt hang (PR #6600). + +Original diagnosis and fix by Kristian Vastveit (@kristianvast) in PR #6600, +against the then-inline ``_interruptible_api_call`` / +``_interruptible_streaming_api_call`` methods in run_agent.py. Those methods +have since been extracted into ``agent/chat_completion_helpers.py``, so the +fix is reapplied there and these tests target the extracted functions. + +The bug: when ``agent.interrupt()`` fires during an active LLM call, the main +poll loop force-closes the worker-local httpx client to stop token generation. +That raises a transport error (RemoteProtocolError) on the worker — the +EXPECTED consequence of our own close, not a network bug. The streaming retry +loop misclassified it as a transient connection error and retried, each doomed +retry stalling for the full stream-stale timeout (up to 300s). Because the +gateway caches AIAgent instances per session, the stale worker outlived the +turn and raced the next turn's request — the root of the multi-minute +cascading-interrupt hang. + +The fix: a request-local ``_request_cancelled`` token set by the poll loop +right before the force-close. The worker's exception handler checks it and +exits cleanly (no retry, no fallback, no "reconnecting" status) instead of +treating the forced error as transient. +""" +import threading +import time +import types +from unittest.mock import MagicMock + +import httpx +import pytest + +from agent import chat_completion_helpers as cch + + +class _FakeInterruptError(Exception): + """Stand-in for the transport error a force-close raises on the worker.""" + + +def _make_agent(): + """A MagicMock agent wired with just enough surface for the helpers.""" + agent = MagicMock() + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + agent.verbose_logging = False + # _compute_non_stream_stale_timeout / streaming setup helpers return + # benign values; the real call path is mocked per-test. + agent._compute_non_stream_stale_timeout.return_value = 5.0 + return agent + + +def test_non_streaming_cancel_does_not_surface_network_error(): + """A force-close during a non-streaming call must raise InterruptedError, + not the swallowed transport error.""" + agent = _make_agent() + + create_calls = {"n": 0} + fake_client = MagicMock() + + def _create(**kwargs): + create_calls["n"] += 1 + # Simulate the main thread firing an interrupt mid-call, then the + # force-close raising a transport error on this worker. + agent._interrupt_requested = True + time.sleep(0.3) # let the poll loop observe the interrupt + force-close + raise httpx.RemoteProtocolError("peer closed connection") + + fake_client.chat.completions.create.side_effect = _create + agent._create_request_openai_client.return_value = fake_client + agent._close_request_openai_client = MagicMock() + agent._abort_request_openai_client = MagicMock() + + t0 = time.time() + with pytest.raises(InterruptedError): + cch.interruptible_api_call(agent, {"model": "x", "messages": []}) + elapsed = time.time() - t0 + + # The forced RemoteProtocolError must NOT surface as the raised error. + assert create_calls["n"] == 1 + assert elapsed < 3.0, f"interrupt took {elapsed:.1f}s — should be near-instant" + + +def test_normal_transient_error_still_raises_when_not_cancelled(): + """Regression guard: a real transport error with NO interrupt must still + surface to the caller (so the outer retry loop can recover).""" + agent = _make_agent() + fake_client = MagicMock() + fake_client.chat.completions.create.side_effect = httpx.RemoteProtocolError( + "genuine network drop" + ) + agent._create_request_openai_client.return_value = fake_client + agent._close_request_openai_client = MagicMock() + agent._abort_request_openai_client = MagicMock() + agent._interrupt_requested = False + + with pytest.raises(httpx.RemoteProtocolError): + cch.interruptible_api_call(agent, {"model": "x", "messages": []}) + + +def test_request_cancelled_token_is_request_local(): + """The cancellation token must be created per call, not shared on the + agent — a stale worker from a previous turn must not see the next turn's + interrupt flag flip back to False and mistake its own forced error for a + network bug. We assert the helper reads agent._interrupt_requested at the + force-close site (request-local token set there), by confirming two + independent calls don't share cancellation state.""" + agent = _make_agent() + + # First call: interrupted. + fake_client_1 = MagicMock() + + def _create_1(**kwargs): + agent._interrupt_requested = True + time.sleep(0.3) + raise httpx.RemoteProtocolError("forced close turn A") + + fake_client_1.chat.completions.create.side_effect = _create_1 + agent._create_request_openai_client.return_value = fake_client_1 + agent._close_request_openai_client = MagicMock() + agent._abort_request_openai_client = MagicMock() + + with pytest.raises(InterruptedError): + cch.interruptible_api_call(agent, {"model": "x", "messages": []}) + + # Second call: NOT interrupted (turn boundary cleared the flag). A genuine + # error must still surface — the previous call's cancellation must not leak. + agent._interrupt_requested = False + fake_client_2 = MagicMock() + fake_client_2.chat.completions.create.side_effect = httpx.RemoteProtocolError( + "genuine drop turn B" + ) + agent._create_request_openai_client.return_value = fake_client_2 + + with pytest.raises(httpx.RemoteProtocolError): + cch.interruptible_api_call(agent, {"model": "x", "messages": []})