diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index ba5f02ba867..f710697a03e 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -34,6 +34,7 @@ from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse, parse_qs, urlunparse from hermes_cli.timeouts import get_provider_request_timeout, get_provider_stale_timeout +from hermes_constants import PARTIAL_STREAM_STUB_ID, FINISH_REASON_LENGTH from agent.error_classifier import classify_api_error, FailoverReason from agent.model_metadata import is_local_endpoint from agent.message_sanitization import ( @@ -2172,37 +2173,15 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= if deltas_were_sent["yes"]: # Streaming failed AFTER some tokens were already delivered to # the platform. Re-raising would let the outer retry loop make - # a new API call, creating a duplicate message. Return a - # partial response stub instead and let the outer loop decide: - # - # - text-only partials → finish_reason="length" so the - # conversation loop persists the partial assistant content - # and asks the model to continue from where the stream - # died (issue #30963: partial stop misclassified as a - # clean completion was exiting the loop with budget - # remaining and an unfinished goal). - # - # - partial mid-tool-call → finish_reason="stop" stays. - # The user-visible warning we append says "Ask me to - # retry if you want to continue", so the agent should - # hand control back rather than auto-retry a tool call - # that may have side-effects. - # - # Recover whatever content was already streamed to the user. - # _current_streamed_assistant_text accumulates text fired - # through _fire_stream_delta, so it has exactly what the - # user saw before the connection died. + # Return a partial response stub with finish_reason="length" + # so the conversation loop's continuation machinery fires. + # tool_calls=None prevents auto-execution of incomplete calls. _partial_text = ( getattr(agent, "_current_streamed_assistant_text", "") or "" ).strip() or None - # If the stream died while the model was emitting a tool call, - # the stub below will silently set `tool_calls=None` and the - # agent loop will treat the turn as complete — the attempted - # action is lost with no user-facing signal. Append a - # human-visible warning to the stub content so (a) the user - # knows something failed, and (b) the next turn's model sees - # in conversation history what was attempted and can retry. + # Append a user-visible warning if tool calls were dropped so + # the user and model both know what was attempted. _partial_names = list(result.get("partial_tool_names") or []) if _partial_names: _name_str = ", ".join(_partial_names[:3]) @@ -2214,8 +2193,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= f"Ask me to retry if you want to continue." ) _partial_text = (_partial_text or "") + _warn - # Also fire as a streaming delta so the user sees it now - # instead of only in the persisted transcript. + # Fire as streaming delta so the user sees it immediately. try: agent._fire_stream_delta(_warn) except Exception: @@ -2225,7 +2203,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= "of text; surfaced warning to user: %s", _partial_names, len(_partial_text or ""), result["error"], ) - _stub_finish_reason = "stop" + _stub_finish_reason = FINISH_REASON_LENGTH else: logger.warning( "Partial stream delivered before error; returning " @@ -2235,18 +2213,19 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= len(_partial_text or ""), result["error"], ) - _stub_finish_reason = "length" + _stub_finish_reason = FINISH_REASON_LENGTH _stub_msg = SimpleNamespace( role="assistant", content=_partial_text, tool_calls=None, reasoning_content=None, ) return SimpleNamespace( - id="partial-stream-stub", + id=PARTIAL_STREAM_STUB_ID, model=getattr(agent, "model", "unknown"), choices=[SimpleNamespace( index=0, message=_stub_msg, finish_reason=_stub_finish_reason, )], usage=None, + _dropped_tool_names=_partial_names or None, ) raise result["error"] return result["response"] diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 049cbcdf67e..2f08810ae1b 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -65,7 +65,7 @@ from agent.prompt_caching import apply_anthropic_cache_control from agent.retry_utils import jittered_backoff from agent.trajectory import has_incomplete_scratchpad from agent.usage_pricing import estimate_usage_cost, normalize_usage -from hermes_constants import display_hermes_home as _dhh_fn +from hermes_constants import display_hermes_home as _dhh_fn, PARTIAL_STREAM_STUB_ID from hermes_logging import set_session_context from tools.schema_sanitizer import strip_pattern_and_format from tools.skill_provenance import set_current_write_origin @@ -229,6 +229,37 @@ def _restore_or_build_system_prompt(agent, system_message, conversation_history) ) +def _get_continuation_prompt(is_partial_stub: bool, dropped_tools: Optional[List[str]] = None) -> str: + if is_partial_stub and dropped_tools: + tool_list = ", ".join(dropped_tools[:3]) + return ( + "[System: Your previous tool call " + f"({tool_list}) was too large and " + "the stream timed out before it " + "could be delivered. Do NOT retry " + "the same tool call with the same " + "large content. Instead, break the " + "content into multiple smaller tool " + "calls (e.g. use multiple patch calls " + "or write smaller files). Each tool " + "call's arguments must be under ~8K " + "tokens to avoid stream timeouts.]" + ) + elif is_partial_stub: + return ( + "[System: The previous response was cut off by a " + "network error mid-stream. Continue exactly where " + "you left off. Do not restart or repeat prior text. " + "Finish the answer directly.]" + ) + else: + return ( + "[System: Your previous response was truncated by the output " + "length limit. Continue exactly where you left off. Do not " + "restart or repeat prior text. Finish the answer directly.]" + ) + + def run_conversation( agent, user_message: str, @@ -1414,7 +1445,7 @@ def run_conversation( finish_reason = "length" if finish_reason == "length": - if getattr(response, "id", "") == "partial-stream-stub": + if getattr(response, "id", "") == PARTIAL_STREAM_STUB_ID: agent._vprint( f"{agent.log_prefix}⚠️ Stream interrupted by network error " f"(finish_reason='length' on partial-stream-stub)", @@ -1518,37 +1549,36 @@ def run_conversation( truncated_response_parts.append(assistant_message.content) if length_continue_retries < 3: - # Distinguish a real output-token truncation - # from a partial-stream-stub network error - # (#30963). Same continuation machinery, - # but the prompt has to tell the truth or - # the model goes off rails ("I wasn't - # truncated, I'm done"). _is_partial_stream_stub = ( - getattr(response, "id", "") == "partial-stream-stub" + getattr(response, "id", "") == PARTIAL_STREAM_STUB_ID ) - if _is_partial_stream_stub: + _dropped_tools = getattr( + response, "_dropped_tool_names", None + ) + + if _is_partial_stream_stub and _dropped_tools: + _tool_list = ", ".join(_dropped_tools[:3]) + agent._vprint( + f"{agent.log_prefix}↻ Stream interrupted mid " + f"tool-call ({_tool_list}) — requesting " + f"chunked retry " + f"({length_continue_retries}/3)..." + ) + elif _is_partial_stream_stub: agent._vprint( f"{agent.log_prefix}↻ Stream interrupted — " f"requesting continuation " f"({length_continue_retries}/3)..." ) - _continue_content = ( - "[System: The previous response was cut off by a " - "network error mid-stream. Continue exactly where " - "you left off. Do not restart or repeat prior text. " - "Finish the answer directly.]" - ) else: agent._vprint( f"{agent.log_prefix}↻ Requesting continuation " f"({length_continue_retries}/3)..." ) - _continue_content = ( - "[System: Your previous response was truncated by the output " - "length limit. Continue exactly where you left off. Do not " - "restart or repeat prior text. Finish the answer directly.]" - ) + + _continue_content = _get_continuation_prompt( + _is_partial_stream_stub, _dropped_tools + ) continue_msg = { "role": "user", "content": _continue_content, diff --git a/hermes_constants.py b/hermes_constants.py index f2d01157664..b54adf01786 100644 --- a/hermes_constants.py +++ b/hermes_constants.py @@ -432,6 +432,14 @@ def apply_ipv4_preference(force: bool = False) -> None: socket.getaddrinfo = _ipv4_getaddrinfo # type: ignore[assignment] +# ─── Streaming Response Constants ──────────────────────────────────────────── + +# Response ID for partial stream stubs used during error recovery +PARTIAL_STREAM_STUB_ID = "partial-stream-stub" + +FINISH_REASON_LENGTH = "length" + + OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1" OPENROUTER_MODELS_URL = f"{OPENROUTER_BASE_URL}/models" diff --git a/tests/run_agent/test_partial_stream_finish_reason.py b/tests/run_agent/test_partial_stream_finish_reason.py index f6948844f43..77aea3353e2 100644 --- a/tests/run_agent/test_partial_stream_finish_reason.py +++ b/tests/run_agent/test_partial_stream_finish_reason.py @@ -5,9 +5,9 @@ Pins the contract: - text-only partial stream → stub.finish_reason == "length" so the conversation loop's existing length-continuation path can keep the agent moving against an unfinished goal. -- partial mid-tool-call → stub.finish_reason == "stop" so the loop - hands control back to the user (matches the user-visible warning - "Ask me to retry if you want to continue"). +- partial mid-tool-call → stub.finish_reason == "length" so the loop + triggers continuation machinery with targeted chunking guidance + instead of ending the turn immediately. - conversation_loop's length-continuation prompt distinguishes a real output-length truncation from a partial-stream-stub network error via response.id. @@ -20,6 +20,9 @@ from unittest.mock import MagicMock, patch import pytest +from hermes_constants import PARTIAL_STREAM_STUB_ID, FINISH_REASON_LENGTH +from agent.conversation_loop import _get_continuation_prompt + # ── Helpers (mirrors test_streaming.py) ──────────────────────────────────── @@ -78,8 +81,8 @@ class TestPartialStreamStubFinishReason: monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") response = agent._interruptible_streaming_api_call({}) - assert response.id == "partial-stream-stub" - assert response.choices[0].finish_reason == "length", ( + assert response.id == PARTIAL_STREAM_STUB_ID + assert response.choices[0].finish_reason == FINISH_REASON_LENGTH, ( "Text-only partial streams must use finish_reason=length so the " "conversation loop continues from where the network died " "(issue #30963)." @@ -89,9 +92,11 @@ class TestPartialStreamStubFinishReason: @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_partial_tool_call_keeps_stop(self, _mock_close, mock_create, monkeypatch): - """Mid-tool-call partials keep finish_reason=stop on purpose — the - warning text asks the user to drive the retry, not the agent.""" + def test_partial_tool_call_uses_length(self, _mock_close, mock_create, monkeypatch): + """Mid-tool-call partials now use finish_reason=length so the + conversation loop's continuation machinery fires — bounded 3-retry + with guidance to break output into smaller chunks (#31998). + tool_calls=None is preserved, so no tool auto-executes.""" def _stalling_stream(): yield _make_stream_chunk(content="Let me write the audit: ") @@ -114,12 +119,18 @@ class TestPartialStreamStubFinishReason: monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") response = agent._interruptible_streaming_api_call({}) - assert response.id == "partial-stream-stub" - assert response.choices[0].finish_reason == "stop", ( - "Partial mid-tool-call must keep finish_reason=stop — the warning " - "appended to content asks the user to retry, so the agent must " - "not auto-replay a tool call with possible side-effects." + assert response.id == PARTIAL_STREAM_STUB_ID + assert response.choices[0].finish_reason == FINISH_REASON_LENGTH, ( + "Partial mid-tool-call must use finish_reason=length so the " + "continuation machinery fires instead of ending the turn " + "immediately (#31998)." ) + assert response.choices[0].message.tool_calls is None, ( + "tool_calls must remain None (no auto-execution of side-effectful " + "tool calls)." + ) + # The stub should carry dropped tool names for continuation prompt + assert getattr(response, "_dropped_tool_names", None) == ["write_file"] content = response.choices[0].message.content or "" assert "Stream stalled mid tool-call" in content assert "write_file" in content @@ -129,30 +140,17 @@ class TestPartialStreamStubFinishReason: class TestLengthContinuationPromptBranching: """When finish_reason=length, the continuation prompt that reaches the - model has to tell the truth: real truncation vs. network interruption. - Lying ("you were truncated") on a partial-stream stub leads the model - to no-op ("I wasn't truncated, I'm done"), defeating recovery.""" + model has to tell the truth: real truncation vs. network interruption + vs. dropped tool call (#31998). Three distinct prompts now exist.""" - def _simulate_branch(self, response_id: str) -> str: + def _simulate_branch(self, response_id: str, dropped_tools=None) -> str: """Return the continuation prompt text the loop would inject for - a `finish_reason=length` response with the given id. Mirrors the - exact branch in agent/conversation_loop.py.""" - response = SimpleNamespace(id=response_id) - if getattr(response, "id", "") == "partial-stream-stub": - return ( - "[System: The previous response was cut off by a " - "network error mid-stream. Continue exactly where " - "you left off. Do not restart or repeat prior text. " - "Finish the answer directly.]" - ) - return ( - "[System: Your previous response was truncated by the output " - "length limit. Continue exactly where you left off. Do not " - "restart or repeat prior text. Finish the answer directly.]" - ) + a `finish_reason=length` response with the given id.""" + is_partial = response_id == PARTIAL_STREAM_STUB_ID + return _get_continuation_prompt(is_partial, dropped_tools) def test_partial_stream_stub_uses_network_prompt(self): - prompt = self._simulate_branch("partial-stream-stub") + prompt = self._simulate_branch(PARTIAL_STREAM_STUB_ID) assert "network error mid-stream" in prompt assert "output length limit" not in prompt @@ -165,6 +163,19 @@ class TestLengthContinuationPromptBranching: prompt = self._simulate_branch("") assert "output length limit" in prompt + def test_dropped_tool_call_uses_chunking_prompt(self): + """When the stub dropped a tool call, the continuation prompt + must guide the model to break its output into smaller chunks + instead of retrying the same large tool call (#31998).""" + prompt = self._simulate_branch( + PARTIAL_STREAM_STUB_ID, dropped_tools=["write_file"], + ) + assert "too large" in prompt + assert "break" in prompt.lower() + assert "write_file" in prompt + assert "network error" not in prompt + assert "output length limit" not in prompt + # ── Integration: live conversation loop ─────────────────────────────────── @@ -208,12 +219,12 @@ class TestConversationLoopPartialStreamContinuation: # First API call: the partial-stream stub (length on partial-stream-stub id). partial_stub = SimpleNamespace( - id="partial-stream-stub", + id=PARTIAL_STREAM_STUB_ID, model="test/model", choices=[SimpleNamespace( index=0, message=_mock_assistant_msg(content="The first half of "), - finish_reason="length", + finish_reason=FINISH_REASON_LENGTH, )], usage=None, )