diff --git a/run_agent.py b/run_agent.py index 5c946591c2..cdfab0e943 100644 --- a/run_agent.py +++ b/run_agent.py @@ -4704,6 +4704,11 @@ class AIAgent: Each worker thread gets its own OpenAI client instance. Interrupts only close that worker-local client, so retries and other requests never inherit a closed transport. + + Includes a stale-call detector: if no response arrives within the + configured timeout, the connection is killed and an error raised so + the main retry loop can try again with backoff / credential rotation / + provider fallback. """ result = {"response": None, "error": None} request_client_holder = {"client": None} @@ -4729,10 +4734,86 @@ class AIAgent: if request_client is not None: self._close_request_openai_client(request_client, reason="request_complete") + # ── Stale-call timeout (mirrors streaming stale detector) ──────── + # Non-streaming calls return nothing until the full response is + # ready. Without this, a hung provider can block for the full + # httpx timeout (default 1800s) with zero feedback. The stale + # detector kills the connection early so the main retry loop can + # apply richer recovery (credential rotation, provider fallback). + _stale_base = float(os.getenv("HERMES_API_CALL_STALE_TIMEOUT", 300.0)) + _base_url = getattr(self, "_base_url", None) or "" + if _stale_base == 300.0 and _base_url and is_local_endpoint(_base_url): + _stale_timeout = float("inf") + else: + _est_tokens = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4 + if _est_tokens > 100_000: + _stale_timeout = max(_stale_base, 600.0) + elif _est_tokens > 50_000: + _stale_timeout = max(_stale_base, 450.0) + else: + _stale_timeout = _stale_base + + _call_start = time.time() + self._touch_activity("waiting for non-streaming API response") + t = threading.Thread(target=_call, daemon=True) t.start() + _poll_count = 0 while t.is_alive(): t.join(timeout=0.3) + _poll_count += 1 + + # Touch activity every ~30s so the gateway's inactivity + # monitor knows we're alive while waiting for the response. + if _poll_count % 100 == 0: # 100 × 0.3s = 30s + _elapsed = time.time() - _call_start + self._touch_activity( + f"waiting for non-streaming response ({int(_elapsed)}s elapsed)" + ) + + # Stale-call detector: kill the connection if no response + # arrives within the configured timeout. + _elapsed = time.time() - _call_start + if _elapsed > _stale_timeout: + _est_ctx = sum(len(str(v)) for v in api_kwargs.get("messages", [])) // 4 + logger.warning( + "Non-streaming API call stale for %.0fs (threshold %.0fs). " + "model=%s context=~%s tokens. Killing connection.", + _elapsed, _stale_timeout, + api_kwargs.get("model", "unknown"), f"{_est_ctx:,}", + ) + self._emit_status( + f"⚠️ No response from provider for {int(_elapsed)}s " + f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). " + f"Aborting call." + ) + try: + if self.api_mode == "anthropic_messages": + from agent.anthropic_adapter import build_anthropic_client + + self._anthropic_client.close() + self._anthropic_client = build_anthropic_client( + self._anthropic_api_key, + getattr(self, "_anthropic_base_url", None), + ) + else: + rc = request_client_holder.get("client") + if rc is not None: + self._close_request_openai_client(rc, reason="stale_call_kill") + except Exception: + pass + self._touch_activity( + f"stale non-streaming call killed after {int(_elapsed)}s" + ) + # Wait briefly for the thread to notice the closed connection. + t.join(timeout=2.0) + if result["error"] is None and result["response"] is None: + result["error"] = TimeoutError( + f"Non-streaming API call timed out after {int(_elapsed)}s " + f"with no response (threshold: {int(_stale_timeout)}s)" + ) + break + if self._interrupt_requested: # Force-close the in-flight worker-local HTTP connection to stop # token generation without poisoning the shared client used to @@ -5247,6 +5328,10 @@ class AIAgent: f"({type(e).__name__}). Reconnecting… " f"(attempt {_stream_attempt + 2}/{_max_stream_retries + 1})" ) + self._touch_activity( + f"stream retry {_stream_attempt + 2}/{_max_stream_retries + 1} " + f"after {type(e).__name__}" + ) # Close the stale request client before retry stale = request_client_holder.get("client") if stale is not None: @@ -5270,8 +5355,7 @@ class AIAgent: "try again in a moment." ) logger.warning( - "Streaming exhausted %s retries on transient error, " - "falling back to non-streaming: %s", + "Streaming exhausted %s retries on transient error: %s", _max_stream_retries + 1, e, ) @@ -5282,25 +5366,24 @@ class AIAgent: and "not supported" in _err_lower ) if _is_stream_unsupported: + self._disable_streaming = True self._safe_print( "\n⚠ Streaming is not supported for this " - "model/provider. Falling back to non-streaming.\n" + "model/provider. Switching to non-streaming.\n" " To avoid this delay, set display.streaming: false " "in config.yaml\n" ) logger.info( - "Streaming failed before delivery, falling back to non-streaming: %s", + "Streaming failed before delivery: %s", e, ) - try: - # Reset stale timer — the non-streaming fallback - # uses its own client; prevent the stale detector - # from firing on stale timestamps from failed streams. - last_chunk_time["t"] = time.time() - result["response"] = self._interruptible_api_call(api_kwargs) - except Exception as fallback_err: - result["error"] = fallback_err + # Propagate the error to the main retry loop instead of + # falling back to non-streaming inline. The main loop has + # richer recovery: credential rotation, provider fallback, + # backoff, and — for "stream not supported" — will switch + # to non-streaming on the next attempt via _disable_streaming. + result["error"] = e return finally: request_client = request_client_holder.get("client") @@ -5366,6 +5449,9 @@ class AIAgent: # Reset the timer so we don't kill repeatedly while # the inner thread processes the closure. last_chunk_time["t"] = time.time() + self._touch_activity( + f"stale stream detected after {int(_stale_elapsed)}s, reconnecting" + ) if self._interrupt_requested: try: @@ -8150,7 +8236,12 @@ class AIAgent: self.thinking_callback("") _use_streaming = True - if not self._has_stream_consumers(): + # Provider signaled "stream not supported" on a previous + # attempt — switch to non-streaming for the rest of this + # session instead of re-failing every retry. + if getattr(self, "_disable_streaming", False): + _use_streaming = False + elif not self._has_stream_consumers(): # No display/TTS consumer. Still prefer streaming for # health checking, but skip for Mock clients in tests # (mocks return SimpleNamespace, not stream iterators). @@ -8352,6 +8443,7 @@ class AIAgent: # Sleep in small increments to stay responsive to interrupts sleep_end = time.time() + wait_time + _backoff_touch_counter = 0 while time.time() < sleep_end: if self._interrupt_requested: self._vprint(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.", force=True) @@ -8365,6 +8457,14 @@ class AIAgent: "interrupted": True, } time.sleep(0.2) + # Touch activity every ~30s so the gateway's inactivity + # monitor knows we're alive during backoff waits. + _backoff_touch_counter += 1 + if _backoff_touch_counter % 150 == 0: # 150 × 0.2s = 30s + self._touch_activity( + f"retry backoff ({retry_count}/{max_retries}), " + f"{int(sleep_end - time.time())}s remaining" + ) continue # Retry the API call # Check finish_reason before proceeding @@ -8837,6 +8937,9 @@ class AIAgent: retry_count += 1 elapsed_time = time.time() - api_start_time + self._touch_activity( + f"API error recovery (attempt {retry_count}/{max_retries})" + ) error_type = type(api_error).__name__ error_msg = str(api_error).lower() @@ -9363,6 +9466,7 @@ class AIAgent: # Sleep in small increments so we can respond to interrupts quickly # instead of blocking the entire wait_time in one sleep() call sleep_end = time.time() + wait_time + _backoff_touch_counter = 0 while time.time() < sleep_end: if self._interrupt_requested: self._vprint(f"{self.log_prefix}⚡ Interrupt detected during retry wait, aborting.", force=True) @@ -9376,6 +9480,14 @@ class AIAgent: "interrupted": True, } time.sleep(0.2) # Check interrupt every 200ms + # Touch activity every ~30s so the gateway's inactivity + # monitor knows we're alive during backoff waits. + _backoff_touch_counter += 1 + if _backoff_touch_counter % 150 == 0: # 150 × 0.2s = 30s + self._touch_activity( + f"error retry backoff ({retry_count}/{max_retries}), " + f"{int(sleep_end - time.time())}s remaining" + ) # If the API call was interrupted, skip response processing if interrupted: diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index 49a5a33d1f..7d0ddd1c83 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -3519,8 +3519,8 @@ class TestStreamingApiCall: call_kwargs = agent.client.chat.completions.create.call_args assert call_kwargs[1].get("stream") is True or call_kwargs.kwargs.get("stream") is True - def test_api_exception_falls_back_to_non_streaming(self, agent): - """When streaming fails before any deltas, fallback to non-streaming is attempted.""" + def test_api_exception_propagates_no_non_streaming_fallback(self, agent): + """When streaming fails before any deltas, error propagates to the main retry loop.""" agent.client.chat.completions.create.side_effect = ConnectionError("fail") # Prevent stream retry logic from replacing the mock client with patch.object(agent, "_replace_primary_openai_client", return_value=False): diff --git a/tests/run_agent/test_streaming.py b/tests/run_agent/test_streaming.py index 37a61ac370..1943b06117 100644 --- a/tests/run_agent/test_streaming.py +++ b/tests/run_agent/test_streaming.py @@ -374,13 +374,19 @@ class TestStreamingCallbacks: class TestStreamingFallback: - """Verify fallback to non-streaming on ANY streaming error.""" + """Verify streaming errors propagate to the main retry loop. + + Previously, streaming errors triggered an inline fallback to + non-streaming. Now they propagate so the main retry loop can apply + richer recovery (credential rotation, provider fallback, backoff). + The only special case: 'stream not supported' sets _disable_streaming + so the *next* main-loop retry uses non-streaming automatically. + """ - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream): - """'not supported' error triggers fallback to non-streaming.""" + def test_stream_not_supported_sets_flag_and_raises(self, mock_close, mock_create): + """'not supported' error sets _disable_streaming and propagates.""" from run_agent import AIAgent mock_client = MagicMock() @@ -389,23 +395,6 @@ class TestStreamingFallback: ) mock_create.return_value = mock_client - fallback_response = SimpleNamespace( - id="fallback", - model="test", - choices=[SimpleNamespace( - index=0, - message=SimpleNamespace( - role="assistant", - content="fallback response", - tool_calls=None, - reasoning_content=None, - ), - finish_reason="stop", - )], - usage=None, - ) - mock_non_stream.return_value = fallback_response - agent = AIAgent( model="test/model", quiet_mode=True, @@ -415,16 +404,16 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - response = agent._interruptible_streaming_api_call({}) + with pytest.raises(Exception, match="Streaming is not supported"): + agent._interruptible_streaming_api_call({}) - assert response.choices[0].message.content == "fallback response" - mock_non_stream.assert_called_once() + # The flag should be set so the main retry loop switches to non-streaming + assert agent._disable_streaming is True - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_any_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream): - """ANY streaming error triggers fallback — not just specific messages.""" + def test_non_transport_error_propagates(self, mock_close, mock_create): + """Non-transport streaming errors propagate to the main retry loop.""" from run_agent import AIAgent mock_client = MagicMock() @@ -433,23 +422,6 @@ class TestStreamingFallback: ) mock_create.return_value = mock_client - fallback_response = SimpleNamespace( - id="fallback", - model="test", - choices=[SimpleNamespace( - index=0, - message=SimpleNamespace( - role="assistant", - content="fallback after connection error", - tool_calls=None, - reasoning_content=None, - ), - finish_reason="stop", - )], - usage=None, - ) - mock_non_stream.return_value = fallback_response - agent = AIAgent( model="test/model", quiet_mode=True, @@ -459,24 +431,19 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - response = agent._interruptible_streaming_api_call({}) + with pytest.raises(Exception, match="Connection reset by peer"): + agent._interruptible_streaming_api_call({}) - assert response.choices[0].message.content == "fallback after connection error" - mock_non_stream.assert_called_once() - - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_fallback_error_propagates(self, mock_close, mock_create, mock_non_stream): - """When both streaming AND fallback fail, the fallback error propagates.""" + def test_stream_error_propagates_original(self, mock_close, mock_create): + """The original streaming error propagates (not a fallback error).""" from run_agent import AIAgent mock_client = MagicMock() mock_client.chat.completions.create.side_effect = Exception("stream broke") mock_create.return_value = mock_client - mock_non_stream.side_effect = Exception("Rate limit exceeded") - agent = AIAgent( model="test/model", quiet_mode=True, @@ -486,14 +453,13 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - with pytest.raises(Exception, match="Rate limit exceeded"): + with pytest.raises(Exception, match="stream broke"): agent._interruptible_streaming_api_call({}) - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_exhausted_transient_stream_error_falls_back(self, mock_close, mock_create, mock_non_stream): - """Transient stream errors retry first, then fall back after retries are exhausted.""" + def test_exhausted_transient_stream_error_propagates(self, mock_close, mock_create): + """Transient stream errors retry first, then propagate after retries exhausted.""" from run_agent import AIAgent import httpx @@ -501,23 +467,6 @@ class TestStreamingFallback: mock_client.chat.completions.create.side_effect = httpx.ConnectError("socket closed") mock_create.return_value = mock_client - fallback_response = SimpleNamespace( - id="fallback", - model="test", - choices=[SimpleNamespace( - index=0, - message=SimpleNamespace( - role="assistant", - content="fallback after retries exhausted", - tool_calls=None, - reasoning_content=None, - ), - finish_reason="stop", - )], - usage=None, - ) - mock_non_stream.return_value = fallback_response - agent = AIAgent( model="test/model", quiet_mode=True, @@ -527,23 +476,22 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - response = agent._interruptible_streaming_api_call({}) + with pytest.raises(httpx.ConnectError, match="socket closed"): + agent._interruptible_streaming_api_call({}) - assert response.choices[0].message.content == "fallback after retries exhausted" + # Should have retried 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts) assert mock_client.chat.completions.create.call_count == 3 - mock_non_stream.assert_called_once() assert mock_close.call_count >= 1 - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create, mock_non_stream): + def test_sse_connection_lost_retried_as_transient(self, mock_close, mock_create): """SSE 'Network connection lost' (APIError w/ no status_code) retries like httpx errors. OpenRouter sends {"error":{"message":"Network connection lost."}} as an SSE event when the upstream stream drops. The OpenAI SDK raises APIError from this. It should be retried at the streaming level, same as httpx connection - errors, before falling back to non-streaming. + errors, then propagate to the main retry loop after exhaustion. """ from run_agent import AIAgent import httpx @@ -561,23 +509,6 @@ class TestStreamingFallback: mock_client.chat.completions.create.side_effect = sse_error mock_create.return_value = mock_client - fallback_response = SimpleNamespace( - id="fallback", - model="test", - choices=[SimpleNamespace( - index=0, - message=SimpleNamespace( - role="assistant", - content="fallback after SSE retries", - tool_calls=None, - reasoning_content=None, - ), - finish_reason="stop", - )], - usage=None, - ) - mock_non_stream.return_value = fallback_response - agent = AIAgent( model="test/model", quiet_mode=True, @@ -587,21 +518,18 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - response = agent._interruptible_streaming_api_call({}) + with pytest.raises(OAIAPIError): + agent._interruptible_streaming_api_call({}) - assert response.choices[0].message.content == "fallback after SSE retries" # Should retry 3 times (default HERMES_STREAM_RETRIES=2 → 3 attempts) - # before falling back to non-streaming assert mock_client.chat.completions.create.call_count == 3 - mock_non_stream.assert_called_once() # Connection cleanup should happen for each failed retry assert mock_close.call_count >= 2 - @patch("run_agent.AIAgent._interruptible_api_call") @patch("run_agent.AIAgent._create_request_openai_client") @patch("run_agent.AIAgent._close_request_openai_client") - def test_sse_non_connection_error_falls_back_immediately(self, mock_close, mock_create, mock_non_stream): - """SSE errors that aren't connection-related still fall back immediately (no stream retry).""" + def test_sse_non_connection_error_propagates_immediately(self, mock_close, mock_create): + """SSE errors that aren't connection-related propagate immediately (no stream retry).""" from run_agent import AIAgent import httpx @@ -616,23 +544,6 @@ class TestStreamingFallback: mock_client.chat.completions.create.side_effect = sse_error mock_create.return_value = mock_client - fallback_response = SimpleNamespace( - id="fallback", - model="test", - choices=[SimpleNamespace( - index=0, - message=SimpleNamespace( - role="assistant", - content="fallback no retry", - tool_calls=None, - reasoning_content=None, - ), - finish_reason="stop", - )], - usage=None, - ) - mock_non_stream.return_value = fallback_response - agent = AIAgent( model="test/model", quiet_mode=True, @@ -642,12 +553,11 @@ class TestStreamingFallback: agent.api_mode = "chat_completions" agent._interrupt_requested = False - response = agent._interruptible_streaming_api_call({}) + with pytest.raises(OAIAPIError): + agent._interruptible_streaming_api_call({}) - assert response.choices[0].message.content == "fallback no retry" - # Should NOT retry — goes straight to non-streaming fallback + # Should NOT retry — propagates immediately assert mock_client.chat.completions.create.call_count == 1 - mock_non_stream.assert_called_once() # ── Test: Reasoning Streaming ────────────────────────────────────────────