diff --git a/run_agent.py b/run_agent.py index e8d23d39ca..d5ff125e33 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5579,7 +5579,7 @@ class AIAgent: raise result["error"] return result["response"] - result = {"response": None, "error": None} + result = {"response": None, "error": None, "partial_tool_names": []} request_client_holder = {"client": None} first_delta_fired = {"done": False} deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) @@ -5751,6 +5751,14 @@ class AIAgent: tool_gen_notified.add(idx) _fire_first_delta() self._fire_tool_gen_started(name) + # Record the partial tool-call name so the outer + # stub-builder can surface a user-visible warning + # if streaming dies before this tool's arguments + # are fully delivered. Without this, a stall + # during tool-call JSON generation lets the stub + # at line ~6107 return `tool_calls=None`, silently + # discarding the attempted action. + result["partial_tool_names"].append(name) if chunk.choices[0].finish_reason: finish_reason = chunk.choices[0].finish_reason @@ -6117,13 +6125,44 @@ class AIAgent: _partial_text = ( getattr(self, "_current_streamed_assistant_text", "") or "" ).strip() or None - logger.warning( - "Partial stream delivered before error; returning stub " - "response with %s chars of recovered content to prevent " - "duplicate messages: %s", - len(_partial_text or ""), - result["error"], - ) + + # If the stream died while the model was emitting a tool call, + # the stub below will silently set `tool_calls=None` and the + # agent loop will treat the turn as complete — the attempted + # action is lost with no user-facing signal. Append a + # human-visible warning to the stub content so (a) the user + # knows something failed, and (b) the next turn's model sees + # in conversation history what was attempted and can retry. + _partial_names = list(result.get("partial_tool_names") or []) + if _partial_names: + _name_str = ", ".join(_partial_names[:3]) + if len(_partial_names) > 3: + _name_str += f", +{len(_partial_names) - 3} more" + _warn = ( + f"\n\n⚠ Stream stalled mid tool-call " + f"({_name_str}); the action was not executed. " + f"Ask me to retry if you want to continue." + ) + _partial_text = (_partial_text or "") + _warn + # Also fire as a streaming delta so the user sees it now + # instead of only in the persisted transcript. + try: + self._fire_stream_delta(_warn) + except Exception: + pass + logger.warning( + "Partial stream dropped tool call(s) %s after %s chars " + "of text; surfaced warning to user: %s", + _partial_names, len(_partial_text or ""), result["error"], + ) + else: + logger.warning( + "Partial stream delivered before error; returning stub " + "response with %s chars of recovered content to prevent " + "duplicate messages: %s", + len(_partial_text or ""), + result["error"], + ) _stub_msg = SimpleNamespace( role="assistant", content=_partial_text, tool_calls=None, reasoning_content=None, diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 73a9872020..6afe36ee3a 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -952,3 +952,138 @@ class TestAnthropicStreamCallbacks: agent._interruptible_streaming_api_call({}) assert touch_calls.count("receiving stream response") == len(events) + + +class TestPartialToolCallWarning: + """Regression: when a stream dies mid tool-call argument generation after + text was already delivered, the partial-stream stub at run_agent.py + line ~6107 used to silently set ``tool_calls=None`` and return + ``finish_reason=stop``, losing the attempted action with zero user-facing + signal. Live-observed Apr 2026 with MiniMax M2.7 on a 6-minute audit + task — agent streamed commentary, emitted a write_file tool call, + MiniMax stalled for 240 s mid-arguments, stale-stream detector killed + the connection, the stub returned, session ended with no file written + and no error shown. + + Fix: when the stream accumulator captured any tool-call names before the + error, the stub now appends a user-visible warning to content AND fires + it as a stream delta so the user sees it immediately. + """ + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_partial_tool_call_surfaces_warning(self, mock_close, mock_create): + """Stream with text + partial tool-call name + mid-stream error + produces a stub whose content contains the user-visible warning + and whose tool_calls is None.""" + from run_agent import AIAgent + + class _StallError(RuntimeError): + pass + + def _stalling_stream(): + yield _make_stream_chunk(content="Let me write the audit: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"path": "/tmp/x", '), + ]) + raise _StallError("simulated upstream stall") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = lambda *a, **kw: _stalling_stream() + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + + fired_deltas: list = [] + agent._fire_stream_delta = lambda text: fired_deltas.append(text) + agent._current_streamed_assistant_text = "Let me write the audit: " + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "0" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + content = response.choices[0].message.content or "" + assert "Let me write the audit:" in content, ( + f"Partial text not preserved in stub: {content!r}" + ) + assert "Stream stalled mid tool-call" in content, ( + f"Stub content is missing the dropped-tool-call warning; users " + f"get silent failure. Got content={content!r}" + ) + assert "write_file" in content, ( + f"Warning should name the dropped tool. Got: {content!r}" + ) + assert response.choices[0].message.tool_calls is None + assert any("Stream stalled mid tool-call" in d for d in fired_deltas), ( + f"Warning was not surfaced as a live stream delta. " + f"fired_deltas={fired_deltas}" + ) + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_partial_text_only_no_warning(self, mock_close, mock_create): + """Text-only partial stream (no tool call mid-flight) keeps the + pre-fix behaviour: bare recovered text, no warning noise.""" + from run_agent import AIAgent + + class _StallError(RuntimeError): + pass + + def _stalling_stream(): + yield _make_stream_chunk(content="Here's my answer so far") + raise _StallError("simulated upstream stall") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = lambda *a, **kw: _stalling_stream() + mock_create.return_value = mock_client + + agent = AIAgent( + api_key="test-key", + base_url="https://openrouter.ai/api/v1", + model="test/model", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + agent.api_mode = "chat_completions" + agent._interrupt_requested = False + agent._current_streamed_assistant_text = "Here's my answer so far" + + import os as _os + _prev = _os.environ.get("HERMES_STREAM_RETRIES") + _os.environ["HERMES_STREAM_RETRIES"] = "0" + try: + response = agent._interruptible_streaming_api_call({}) + finally: + if _prev is None: + _os.environ.pop("HERMES_STREAM_RETRIES", None) + else: + _os.environ["HERMES_STREAM_RETRIES"] = _prev + + content = response.choices[0].message.content or "" + assert content == "Here's my answer so far", ( + f"Pre-fix behaviour regressed for text-only partial streams: {content!r}" + ) + assert "Stream stalled" not in content, ( + f"Unexpected warning on text-only partial stream: {content!r}" + ) +