fix(streaming): silent retry when stream dies mid tool-call (#14151)

When the streaming connection dropped AFTER user-visible text was
delivered but a tool call was in flight, we stubbed the turn with a
'⚠ Stream stalled mid tool-call; Ask me to retry' warning — costing
an iteration and breaking the flow.  Users report this happening
increasingly often on long SSE streams through flaky provider routes.

Fix: in the existing inner stream-retry loop, relax the
deltas_were_sent short-circuit.  If a tool call was in flight
(partial_tool_names populated) AND the error is a transient connection
error (timeout, RemoteProtocolError, SSE 'connection lost', etc.),
silently retry instead of bailing out.  Fire a brief 'Connection
dropped mid tool-call; reconnecting…' marker so the user understands
the preamble is about to be re-streamed.

Researched how Claude Code (tombstone + non-streaming fallback),
OpenCode (blind Effect.retry wrapping whole stream), and Clawdbot
(gate: stopReason==error + output==0 + !hadPotentialSideEffects)
handle this.  Chose the narrow Clawdbot-style gate: retry only when
(a) a tool call was actually in flight (otherwise the existing
stub-with-recovered-text is correct for pure-text stalls) and
(b) the error is transient.  Side-effect safety is automatic — no
tool has been dispatched within this single API call yet.

UX trade-off: user sees preamble text twice on retry (OpenCode-style).
Strictly better than a lost action with a 'retry manually' message.
If retries exhaust, falls through to the existing stub-with-warning
path so the user isn't left with zero signal.

Tests: 3 new tests in TestSilentRetryMidToolCall covering
(1) silent retry recovers tool call; (2) exhausted retries fall back
to stub; (3) text-only stalls don't trigger retry.  30/30 pass.
This commit is contained in:
Teknium 2026-04-22 13:47:33 -07:00 committed by GitHub
parent 88564ad8bc
commit ea67e49574
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 339 additions and 10 deletions

View file

@ -5826,16 +5826,6 @@ class AIAgent:
result["response"] = _call_chat_completions() result["response"] = _call_chat_completions()
return # success return # success
except Exception as e: except Exception as e:
if deltas_were_sent["yes"]:
# Streaming failed AFTER some tokens were already
# delivered. Don't retry or fall back — partial
# content already reached the user.
logger.warning(
"Streaming failed after partial delivery, not retrying: %s", e
)
result["error"] = e
return
_is_timeout = isinstance( _is_timeout = isinstance(
e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout) e, (_httpx.ReadTimeout, _httpx.ConnectTimeout, _httpx.PoolTimeout)
) )
@ -5843,6 +5833,123 @@ class AIAgent:
e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError) e, (_httpx.ConnectError, _httpx.RemoteProtocolError, ConnectionError)
) )
# If the stream died AFTER some tokens were delivered:
# normally we don't retry (the user already saw text,
# retrying would duplicate it). BUT: if a tool call
# was in-flight when the stream died, silently aborting
# discards the tool call entirely. In that case we
# prefer to retry — the user sees a brief
# "reconnecting" marker + duplicated preamble text,
# which is strictly better than a failed action with
# a "retry manually" message. Limit this to transient
# connection errors (Clawdbot-style narrow gate): no
# tool has executed yet within this API call, so
# silent retry is safe wrt side-effects.
if deltas_were_sent["yes"]:
_partial_tool_in_flight = bool(
result.get("partial_tool_names")
)
_is_sse_conn_err_preview = False
if not _is_timeout and not _is_conn_err:
from openai import APIError as _APIError
if isinstance(e, _APIError) and not getattr(e, "status_code", None):
_err_lower_preview = str(e).lower()
_SSE_PREVIEW_PHRASES = (
"connection lost",
"connection reset",
"connection closed",
"connection terminated",
"network error",
"network connection",
"terminated",
"peer closed",
"broken pipe",
"upstream connect error",
)
_is_sse_conn_err_preview = any(
phrase in _err_lower_preview
for phrase in _SSE_PREVIEW_PHRASES
)
_is_transient = (
_is_timeout or _is_conn_err or _is_sse_conn_err_preview
)
_can_silent_retry = (
_partial_tool_in_flight
and _is_transient
and _stream_attempt < _max_stream_retries
)
if not _can_silent_retry:
# Either no tool call was in-flight (so the
# turn was a pure text response — current
# stub-with-recovered-text behaviour is
# correct), or retries are exhausted, or the
# error isn't transient. Fall through to the
# stub path.
logger.warning(
"Streaming failed after partial delivery, not retrying: %s", e
)
result["error"] = e
return
# Tool call was in-flight AND error is transient:
# retry silently. Clear per-attempt state so the
# next stream starts clean. Fire a "reconnecting"
# marker so the user sees why the preamble is
# about to be re-streamed.
logger.info(
"Streaming attempt %s/%s died mid tool-call "
"(%s: %s) after user-visible text; retrying "
"silently to avoid losing the action. "
"Preamble will re-stream.",
_stream_attempt + 1,
_max_stream_retries + 1,
type(e).__name__,
e,
)
try:
self._fire_stream_delta(
"\n\n⚠ Connection dropped mid tool-call; "
"reconnecting…\n\n"
)
except Exception:
pass
# Reset the streamed-text buffer so the retry's
# fresh preamble doesn't get double-recorded in
# _current_streamed_assistant_text (which would
# pollute the interim-visible-text comparison).
try:
self._reset_stream_delivery_tracking()
except Exception:
pass
# Reset in-memory accumulators so the next
# attempt's chunks don't concat onto the dead
# stream's partial JSON.
result["partial_tool_names"] = []
deltas_were_sent["yes"] = False
first_delta_fired["done"] = False
self._emit_status(
f"⚠️ Connection dropped mid tool-call "
f"({type(e).__name__}). Reconnecting… "
f"(attempt {_stream_attempt + 2}/{_max_stream_retries + 1})"
)
self._touch_activity(
f"stream retry {_stream_attempt + 2}/{_max_stream_retries + 1} "
f"mid tool-call after {type(e).__name__}"
)
stale = request_client_holder.get("client")
if stale is not None:
self._close_request_openai_client(
stale, reason="stream_mid_tool_retry_cleanup"
)
request_client_holder["client"] = None
try:
self._replace_primary_openai_client(
reason="stream_mid_tool_retry_pool_cleanup"
)
except Exception:
pass
self._emit_status("🔄 Reconnected — resuming…")
continue
# SSE error events from proxies (e.g. OpenRouter sends # SSE error events from proxies (e.g. OpenRouter sends
# {"error":{"message":"Network connection lost."}}) are # {"error":{"message":"Network connection lost."}}) are
# raised as APIError by the OpenAI SDK. These are # raised as APIError by the OpenAI SDK. These are

View file

@ -1133,3 +1133,225 @@ class TestPartialToolCallWarning:
f"Unexpected warning on text-only partial stream: {content!r}" f"Unexpected warning on text-only partial stream: {content!r}"
) )
class TestSilentRetryMidToolCall:
    """Regression: when the stream dies mid tool-call JSON after text was
    already delivered, we previously stubbed the turn with a "retry manually"
    warning. Now: if the error is a transient connection error AND a tool
    call was in flight, silently retry the stream (the user sees a brief
    reconnect marker + duplicated preamble, which is strictly better than
    a lost action). If no tool call was in flight, or the error isn't
    transient, the existing stub-with-warning behaviour is preserved.
    """

    @patch("run_agent.AIAgent._replace_primary_openai_client")
    @patch("run_agent.AIAgent._create_request_openai_client")
    @patch("run_agent.AIAgent._close_request_openai_client")
    def test_silent_retry_recovers_tool_call(
        self, mock_close, mock_create, mock_replace,
    ):
        """First attempt: text + partial tool-call + connection drop.
        Second attempt: text + complete tool-call. Response should contain
        the recovered tool call; no warning stub should be returned."""
        from run_agent import AIAgent
        import httpx as _httpx

        # Counts calls to the mocked streaming endpoint so we can assert
        # that exactly one silent retry occurred (2 attempts total).
        attempts = {"n": 0}

        def _first_stream():
            # Attempt 1: user-visible preamble, tool-call header, then the
            # arguments JSON is cut off by a transient connection error
            # (RemoteProtocolError is one of the errors the gate treats as
            # transient — see the agent-side _is_conn_err check).
            yield _make_stream_chunk(content="Let me write the audit: ")
            yield _make_stream_chunk(tool_calls=[
                _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"),
            ])
            yield _make_stream_chunk(tool_calls=[
                _make_tool_call_delta(index=0, arguments='{"path": "/tmp/x", '),
            ])
            raise _httpx.RemoteProtocolError("peer closed connection")

        def _second_stream():
            # Attempt 2: same preamble (re-streamed), then the complete
            # tool call followed by a normal tool_calls finish.
            yield _make_stream_chunk(content="Let me write the audit: ")
            yield _make_stream_chunk(tool_calls=[
                _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"),
            ])
            yield _make_stream_chunk(tool_calls=[
                _make_tool_call_delta(
                    index=0, arguments='{"path": "/tmp/x", "content": "hi"}',
                ),
            ])
            yield _make_stream_chunk(finish_reason="tool_calls")

        def _pick_stream(*a, **kw):
            # side_effect: first create() call gets the dying stream,
            # every subsequent call gets the healthy one.
            attempts["n"] += 1
            return _first_stream() if attempts["n"] == 1 else _second_stream()

        mock_client = MagicMock()
        mock_client.chat.completions.create.side_effect = _pick_stream
        mock_create.return_value = mock_client

        # Skip flags keep AIAgent construction free of file/memory I/O so
        # the test stays hermetic.
        agent = AIAgent(
            api_key="test-key",
            base_url="https://openrouter.ai/api/v1",
            model="test/model",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        agent.api_mode = "chat_completions"
        agent._interrupt_requested = False

        # Capture everything streamed to the user so we can check for the
        # reconnect marker (and the absence of the stub warning).
        fired_deltas: list = []
        agent._fire_stream_delta = lambda text: fired_deltas.append(text)

        # Pin the retry budget via env var (presumably read by the agent's
        # stream-retry loop — TODO confirm name/semantics against run_agent),
        # and restore the previous value so other tests aren't affected.
        import os as _os
        _prev = _os.environ.get("HERMES_STREAM_RETRIES")
        _os.environ["HERMES_STREAM_RETRIES"] = "2"
        try:
            response = agent._interruptible_streaming_api_call({})
        finally:
            if _prev is None:
                _os.environ.pop("HERMES_STREAM_RETRIES", None)
            else:
                _os.environ["HERMES_STREAM_RETRIES"] = _prev

        assert attempts["n"] == 2, (
            f"Expected silent retry (2 attempts), got {attempts['n']}"
        )
        # Response should carry the recovered tool call, not a warning stub.
        msg = response.choices[0].message
        tool_calls = getattr(msg, "tool_calls", None)
        assert tool_calls, (
            f"Silent retry should recover the tool call, got tool_calls={tool_calls!r} "
            f"content={getattr(msg, 'content', None)!r}"
        )
        # Tool-call entries may be dicts or SDK objects depending on how the
        # agent normalises them — handle both shapes.
        _tc0 = tool_calls[0]
        _name = (
            _tc0["function"]["name"] if isinstance(_tc0, dict)
            else _tc0.function.name
        )
        assert _name == "write_file"
        # User saw a reconnect marker between attempts.
        assert any("reconnecting" in d.lower() for d in fired_deltas), (
            f"Expected a reconnect marker delta, fired_deltas={fired_deltas}"
        )
        # Stub-path warning must NOT appear (this was the whole point).
        joined = "".join(fired_deltas)
        assert "Stream stalled" not in joined, (
            f"Stub-path warning leaked into silent-retry path: {joined!r}"
        )

    @patch("run_agent.AIAgent._replace_primary_openai_client")
    @patch("run_agent.AIAgent._create_request_openai_client")
    @patch("run_agent.AIAgent._close_request_openai_client")
    def test_silent_retry_exhausted_falls_back_to_stub(
        self, mock_close, mock_create, mock_replace,
    ):
        """When all retry attempts fail with connection errors, fall back
        to the original stub-with-warning behaviour so the user isn't left
        with zero signal."""
        from run_agent import AIAgent
        import httpx as _httpx

        def _always_fails():
            # Every attempt delivers text + a partial tool call, then dies
            # with the same transient error — retries can never succeed.
            yield _make_stream_chunk(content="Let me write the audit: ")
            yield _make_stream_chunk(tool_calls=[
                _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"),
            ])
            raise _httpx.RemoteProtocolError("peer closed connection")

        mock_client = MagicMock()
        # Lambda (not a bare generator) so each create() call gets a FRESH
        # failing stream rather than an already-exhausted one.
        mock_client.chat.completions.create.side_effect = lambda *a, **kw: _always_fails()
        mock_create.return_value = mock_client

        agent = AIAgent(
            api_key="test-key",
            base_url="https://openrouter.ai/api/v1",
            model="test/model",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        agent.api_mode = "chat_completions"
        agent._interrupt_requested = False
        fired_deltas: list = []
        agent._fire_stream_delta = lambda text: fired_deltas.append(text)

        # Small retry budget keeps the test fast; restored afterwards.
        import os as _os
        _prev = _os.environ.get("HERMES_STREAM_RETRIES")
        _os.environ["HERMES_STREAM_RETRIES"] = "1"
        try:
            response = agent._interruptible_streaming_api_call({})
        finally:
            if _prev is None:
                _os.environ.pop("HERMES_STREAM_RETRIES", None)
            else:
                _os.environ["HERMES_STREAM_RETRIES"] = _prev

        # After retries exhaust, the stub-with-warning path must engage.
        content = response.choices[0].message.content or ""
        assert "Stream stalled mid tool-call" in content, (
            f"Exhausted-retry fallback dropped the user-visible warning: {content!r}"
        )
        # The half-received tool call must not leak into the stub response.
        assert response.choices[0].message.tool_calls is None

    @patch("run_agent.AIAgent._replace_primary_openai_client")
    @patch("run_agent.AIAgent._create_request_openai_client")
    @patch("run_agent.AIAgent._close_request_openai_client")
    def test_no_silent_retry_for_text_only_stall(
        self, mock_close, mock_create, mock_replace,
    ):
        """Text-only stall (no tool call in flight) must NOT trigger silent
        retry — that's the case where the user saw the model's text reply
        and retrying would duplicate it with no benefit."""
        from run_agent import AIAgent
        import httpx as _httpx

        attempts = {"n": 0}

        def _text_stall(*a, **kw):
            # Count attempts at create() time; the stream delivers only
            # text (no tool-call deltas) before the connection drops.
            attempts["n"] += 1
            def _gen():
                yield _make_stream_chunk(content="Here's my answer so far")
                raise _httpx.RemoteProtocolError("peer closed connection")
            return _gen()

        mock_client = MagicMock()
        mock_client.chat.completions.create.side_effect = _text_stall
        mock_create.return_value = mock_client

        agent = AIAgent(
            api_key="test-key",
            base_url="https://openrouter.ai/api/v1",
            model="test/model",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        agent.api_mode = "chat_completions"
        agent._interrupt_requested = False
        # Pre-seed the recovered-text buffer so the stub path can return the
        # text the user already saw (presumably what the stub reads — TODO
        # confirm against the agent's partial-delivery recovery code).
        agent._current_streamed_assistant_text = "Here's my answer so far"

        # Retry budget > 0 proves the short-circuit (not budget exhaustion)
        # is what prevents a second attempt.
        import os as _os
        _prev = _os.environ.get("HERMES_STREAM_RETRIES")
        _os.environ["HERMES_STREAM_RETRIES"] = "2"
        try:
            response = agent._interruptible_streaming_api_call({})
        finally:
            if _prev is None:
                _os.environ.pop("HERMES_STREAM_RETRIES", None)
            else:
                _os.environ["HERMES_STREAM_RETRIES"] = _prev

        # Only one attempt: text-only stall short-circuits retry.
        assert attempts["n"] == 1, (
            f"Text-only stall should not silent-retry, got {attempts['n']} attempts"
        )
        content = response.choices[0].message.content or ""
        assert content == "Here's my answer so far", (
            f"Text-only stall regressed: {content!r}"
        )
        assert "Stream stalled" not in content, (
            f"Text-only stall should not emit tool-call warning: {content!r}"
        )