From ea67e49574b0b3c4252cd5ae8472e78c7f2d1d80 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Wed, 22 Apr 2026 13:47:33 -0700 Subject: [PATCH] fix(streaming): silent retry when stream dies mid tool-call (#14151) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the streaming connection dropped AFTER user-visible text was delivered while a tool call was still in flight, we stubbed the turn with a '⚠ Stream stalled mid tool-call; Ask me to retry' warning — costing an iteration and breaking the flow. Users report this happening increasingly often on long SSE streams through flaky provider routes. Fix: in the existing inner stream-retry loop, relax the deltas_were_sent short-circuit. If a tool call was in flight (partial_tool_names populated) AND the error is a transient connection error (timeout, RemoteProtocolError, SSE 'connection lost', etc.), silently retry instead of bailing out. Fire a brief 'Connection dropped mid tool-call; reconnecting…' marker so the user understands the preamble is about to be re-streamed. Researched how Claude Code (tombstone + non-streaming fallback), OpenCode (blind Effect.retry wrapping the whole stream), and Clawdbot (4-way gate: stopReason==error + output==0 + !hadPotentialSideEffects) handle this. Chose the narrow Clawdbot-style gate: retry only when (a) a tool call was actually in flight (otherwise the existing stub-with-recovered-text is correct for pure-text stalls), (b) the error is transient, and (c) the stream-retry budget (_max_stream_retries) is not yet exhausted. Side-effect safety is automatic — no tool has been dispatched within this single API call yet. UX trade-off: the user sees the preamble text twice on retry (OpenCode-style). This is strictly better than a lost action with a 'retry manually' message. If the retries are exhausted, the code falls through to the existing stub-with-warning path so the user isn't left with zero signal. 
Tests: 3 new tests in TestSilentRetryMidToolCall covering (1) silent retry recovers tool call; (2) exhausted retries fall back to stub; (3) text-only stalls don't trigger retry. 30/30 pass. --- run_agent.py | 127 +++++++++++++++-- tests/run_agent/test_streaming.py | 222 ++++++++++++++++++++++++++++++ 2 files changed, 339 insertions(+), 10 deletions(-) diff --git a/run_agent.py b/run_agent.py index ec5e86d78..cc7603fae 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5826,16 +5826,6 @@ class AIAgent: result["response"] = _call_chat_completions() return # success except Exception as e: - if deltas_were_sent["yes"]: - # Streaming failed AFTER some tokens were already - # delivered. Don't retry or fall back — partial - # content already reached the user. - logger.warning( - "Streaming failed after partial delivery, not retrying: %s", e - ) - result["error"] = e - return - _is_timeout = isinstance( e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout) ) @@ -5843,6 +5833,123 @@ class AIAgent: e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) ) + # If the stream died AFTER some tokens were delivered: + # normally we don't retry (the user already saw text, + # retrying would duplicate it). BUT: if a tool call + # was in-flight when the stream died, silently aborting + # discards the tool call entirely. In that case we + # prefer to retry — the user sees a brief + # "reconnecting" marker + duplicated preamble text, + # which is strictly better than a failed action with + # a "retry manually" message. Limit this to transient + # connection errors (Clawdbot-style narrow gate): no + # tool has executed yet within this API call, so + # silent retry is safe wrt side-effects. 
+ if deltas_were_sent["yes"]: + _partial_tool_in_flight = bool( + result.get("partial_tool_names") + ) + _is_sse_conn_err_preview = False + if not _is_timeout and not _is_conn_err: + from openai import APIError as _APIError + if isinstance(e, _APIError) and not getattr(e, "status_code", None): + _err_lower_preview = str(e).lower() + _SSE_PREVIEW_PHRASES = ( + "connection lost", + "connection reset", + "connection closed", + "connection terminated", + "network error", + "network connection", + "terminated", + "peer closed", + "broken pipe", + "upstream connect error", + ) + _is_sse_conn_err_preview = any( + phrase in _err_lower_preview + for phrase in _SSE_PREVIEW_PHRASES + ) + _is_transient = ( + _is_timeout or _is_conn_err or _is_sse_conn_err_preview + ) + _can_silent_retry = ( + _partial_tool_in_flight + and _is_transient + and _stream_attempt < _max_stream_retries + ) + if not _can_silent_retry: + # Either no tool call was in-flight (so the + # turn was a pure text response — current + # stub-with-recovered-text behaviour is + # correct), or retries are exhausted, or the + # error isn't transient. Fall through to the + # stub path. + logger.warning( + "Streaming failed after partial delivery, not retrying: %s", e + ) + result["error"] = e + return + # Tool call was in-flight AND error is transient: + # retry silently. Clear per-attempt state so the + # next stream starts clean. Fire a "reconnecting" + # marker so the user sees why the preamble is + # about to be re-streamed. + logger.info( + "Streaming attempt %s/%s died mid tool-call " + "(%s: %s) after user-visible text; retrying " + "silently to avoid losing the action. 
" + "Preamble will re-stream.", + _stream_attempt + 1, + _max_stream_retries + 1, + type(e).__name__, + e, + ) + try: + self._fire_stream_delta( + "\n\n⚠ Connection dropped mid tool-call; " + "reconnecting…\n\n" + ) + except Exception: + pass + # Reset the streamed-text buffer so the retry's + # fresh preamble doesn't get double-recorded in + # _current_streamed_assistant_text (which would + # pollute the interim-visible-text comparison). + try: + self._reset_stream_delivery_tracking() + except Exception: + pass + # Reset in-memory accumulators so the next + # attempt's chunks don't concat onto the dead + # stream's partial JSON. + result["partial_tool_names"] = [] + deltas_were_sent["yes"] = False + first_delta_fired["done"] = False + self._emit_status( + f"⚠️ Connection dropped mid tool-call " + f"({type(e).__name__}). Reconnecting… " + f"(attempt {_stream_attempt + 2}/{_max_stream_retries + 1})" + ) + self._touch_activity( + f"stream retry {_stream_attempt + 2}/{_max_stream_retries + 1} " + f"mid tool-call after {type(e).__name__}" + ) + stale = request_client_holder.get("client") + if stale is not None: + self._close_request_openai_client( + stale, reason="stream_mid_tool_retry_cleanup" + ) + request_client_holder["client"] = None + try: + self._replace_primary_openai_client( + reason="stream_mid_tool_retry_pool_cleanup" + ) + except Exception: + pass + self._emit_status("🔄 Reconnected — resuming…") + continue + # SSE error events from proxies (e.g. OpenRouter sends # {"error":{"message":"Network connection lost."}}) are # raised as APIError by the OpenAI SDK. 
These are diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index ff99264c7..22eab8114 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -1133,3 +1133,225 @@ class TestPartialToolCallWarning: f"Unexpected warning on text-only partial stream: {content!r}" ) + +class TestSilentRetryMidToolCall: + """Regression: when the stream dies mid tool-call JSON after text was + already delivered, we previously stubbed the turn with a "retry manually" + warning. Now: if the error is a transient connection error AND a tool + call was in flight, silently retry the stream (the user sees a brief + reconnect marker + duplicated preamble, which is strictly better than + a lost action). If no tool call was in flight, or the error isn't + transient, the existing stub-with-warning behaviour is preserved. + """ + + @patch("run_agent.AIAgent._replace_primary_openai_client") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_silent_retry_recovers_tool_call( + self, mock_close, mock_create, mock_replace, + ): + """First attempt: text + partial tool-call + connection drop. + Second attempt: text + complete tool-call. 
Response should contain + the recovered tool call; no warning stub should be returned.""" + from run_agent import AIAgent + import httpx as _httpx + + attempts = {"n": 0} + + def _first_stream(): + yield _make_stream_chunk(content="Let me write the audit: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"path": "/tmp/x", '), + ]) + raise _httpx.RemoteProtocolError("peer closed connection") + + def _second_stream(): + yield _make_stream_chunk(content="Let me write the audit: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta( + index=0, arguments='{"path": "/tmp/x", "content": "hi"}', + ), + ]) + yield _make_stream_chunk(finish_reason="tool_calls") + + def _pick_stream(*a, **kw): + attempts["n"] += 1 + return _first_stream() if attempts["n"] == 1 else _second_stream() + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = _pick_stream + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + fired_deltas: list = [] + agent._fire_stream_delta = lambda text: fired_deltas.append(text) + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "2" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + assert attempts["n"] == 2, ( + f"Expected silent retry (2 attempts), got {attempts['n']}" + ) + # Response should carry the recovered 
tool call, not a warning stub. + msg = response.choices[0].message + tool_calls = getattr(msg, "tool_calls", None) + assert tool_calls, ( + f"Silent retry should recover the tool call, got tool_calls={tool_calls!r} " + f"content={getattr(msg, 'content', None)!r}" + ) + _tc0 = tool_calls[0] + _name = ( + _tc0["function"]["name"] if isinstance(_tc0, dict) + else _tc0.function.name + ) + assert _name == "write_file" + # User saw a reconnect marker between attempts. + assert any("reconnecting" in d.lower() for d in fired_deltas), ( + f"Expected a reconnect marker delta, fired_deltas={fired_deltas}" + ) + # Stub-path warning must NOT appear (this was the whole point). + joined = "".join(fired_deltas) + assert "Stream stalled" not in joined, ( + f"Stub-path warning leaked into silent-retry path: {joined!r}" + ) + + @patch("run_agent.AIAgent._replace_primary_openai_client") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_silent_retry_exhausted_falls_back_to_stub( + self, mock_close, mock_create, mock_replace, + ): + """When all retry attempts fail with connection errors, fall back + to the original stub-with-warning behaviour so the user isn't left + with zero signal.""" + from run_agent import AIAgent + import httpx as _httpx + + def _always_fails(): + yield _make_stream_chunk(content="Let me write the audit: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + raise _httpx.RemoteProtocolError("peer closed connection") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = lambda *a, **kw: _always_fails() + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + 
fired_deltas: list = [] + agent._fire_stream_delta = lambda text: fired_deltas.append(text) + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "1" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + # After retries exhaust, the stub-with-warning path must engage. + content = response.choices[0].message.content or "" + assert "Stream stalled mid tool-call" in content, ( + f"Exhausted-retry fallback dropped the user-visible warning: {content!r}" + ) + assert response.choices[0].message.tool_calls is None + + @patch("run_agent.AIAgent._replace_primary_openai_client") + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_no_silent_retry_for_text_only_stall( + self, mock_close, mock_create, mock_replace, + ): + """Text-only stall (no tool call in flight) must NOT trigger silent + retry — that's the case where the user saw the model's text reply + and retrying would duplicate it with no benefit.""" + from run_agent import AIAgent + import httpx as _httpx + + attempts = {"n": 0} + + def _text_stall(*a, **kw): + attempts["n"] += 1 + + def _gen(): + yield _make_stream_chunk(content="Here's my answer so far") + raise _httpx.RemoteProtocolError("peer closed connection") + return _gen() + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = _text_stall + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + agent._current_streamed_assistant_text = "Here's my answer so far" + + import os as _os + _prev = 
_os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "2" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + # Only one attempt: text-only stall short-circuits retry. + assert attempts["n"] == 1, ( + f"Text-only stall should not silent-retry, got {attempts['n']} attempts" + ) + content = response.choices[0].message.content or "" + assert content == "Here's my answer so far", ( + f"Text-only stall regressed: {content!r}" + ) + assert "Stream stalled" not in content, ( + f"Text-only stall should not emit tool-call warning: {content!r}" + ) +