diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index 705743ec708..6d1ae59f3cf 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -2788,7 +2788,30 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= role="assistant", content=_partial_text, tool_calls=None, reasoning_content=None, ) - return SimpleNamespace( + # Detect provider output-layer content filtering (e.g. MiniMax + # "output new_sensitive (1027)", Azure/OpenAI content_filter, + # Anthropic safety refusal). The raw error is about to be + # swallowed into a finish_reason=length stub, so classify it HERE + # while we still have it and stamp the stub. Retrying such a + # content-deterministic filter on the same primary just re-hits + # the filter — the conversation loop reads this tag and activates + # the fallback chain instead of burning continuation retries. + # error_classifier is the single source of truth for "what counts + # as a content filter" (#32421). + _content_filter_terminated = False + try: + from agent.error_classifier import classify_api_error, FailoverReason + _cls = classify_api_error( + result["error"], + provider=str(getattr(agent, "provider", "") or ""), + model=str(getattr(agent, "model", "") or ""), + ) + _content_filter_terminated = ( + _cls.reason == FailoverReason.content_policy_blocked + ) + except Exception: + _content_filter_terminated = False + _stub = SimpleNamespace( id=PARTIAL_STREAM_STUB_ID, model=getattr(agent, "model", "unknown"), choices=[SimpleNamespace( @@ -2797,6 +2820,9 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= usage=None, _dropped_tool_names=_partial_names or None, ) + if _content_filter_terminated: + _stub._content_filter_terminated = True + return _stub raise result["error"] return result["response"] diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index 218bf1bfbec..10825cfd683 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -1699,6 +1699,56 @@ def run_conversation( if agent.api_mode in {"chat_completions", "bedrock_converse", "anthropic_messages"}: assistant_message = _trunc_msg + # ── Content-filter stream stall → fallback (#32421) ── + # When the provider's output-layer safety filter (e.g. + # MiniMax "output new_sensitive (1027)", Azure + # content_filter) kills the stream mid-delivery, the + # raw error was classified at the swallow point and the + # stub tagged ``_content_filter_terminated``. This + # filter is content-deterministic — continuation + # retries against the SAME primary just re-hit it and + # burn paid attempts (the loop used to give up with + # "Response remained truncated after 3 continuation + # attempts" and never consult the fallback chain). + # Escalate to the configured fallback BEFORE retrying. + _cf_terminated = getattr( + response, "_content_filter_terminated", False + ) + if ( + _cf_terminated + and agent._fallback_index < len(agent._fallback_chain) + ): + agent._vprint( + f"{agent.log_prefix}🛡️ Content filter terminated " + f"stream — activating fallback provider...", + force=True, + ) + agent._emit_status( + "Content filter terminated stream; switching to fallback..." + ) + if agent._try_activate_fallback(): + # Roll the partial content (if any was already + # appended in a prior continuation pass) back to + # the last clean turn so the fallback provider + # gets a coherent continuation point. + if truncated_response_parts: + messages = agent._get_messages_up_to_last_assistant(messages) + agent._session_messages = messages + length_continue_retries = 0 + truncated_response_parts = [] + retry_count = 0 + compression_attempts = 0 + _retry.primary_recovery_attempted = False + _retry.restart_with_rebuilt_messages = True + break + # No fallback available — fall through to normal + # continuation (best-effort, may loop). + agent._vprint( + f"{agent.log_prefix}⚠️ No fallback provider " + f"configured — retrying with same provider " + f"(may re-hit filter)...", + force=True, + ) if assistant_message is not None and not _trunc_has_tool_calls: length_continue_retries += 1 interim_msg = agent._build_assistant_message(assistant_message, finish_reason) @@ -3781,6 +3831,17 @@ def run_conversation( _retry.restart_with_compressed_messages = False continue + if _retry.restart_with_rebuilt_messages: + # A content-filter stream stall (#32421) was escalated to the + # fallback chain and the partial content rolled back. Re-issue + # the API call against the now-active fallback provider. Refund + # the budget/count for the stalled attempt so the fallback gets a + # fair turn. + api_call_count -= 1 + agent.iteration_budget.refund() + _retry.restart_with_rebuilt_messages = False + continue + if _retry.restart_with_length_continuation: # Progressively boost the output token budget on each retry. # Retry 1 → 2× base, retry 2 → 3× base, capped at 32 768. diff --git a/agent/turn_retry_state.py b/agent/turn_retry_state.py index 34183bd06be..2298e14c24c 100644 --- a/agent/turn_retry_state.py +++ b/agent/turn_retry_state.py @@ -67,6 +67,11 @@ class TurnRetryState: # ── Restart signals (read by the outer loop after the attempt) ─────── restart_with_compressed_messages: bool = False restart_with_length_continuation: bool = False + # Set when a content-filter stream stall (e.g. MiniMax "new_sensitive") + # has been escalated to the fallback chain: the partial-stream content + # was rolled back off ``messages`` and the loop should re-issue the API + # call against the newly-activated provider (#32421). + restart_with_rebuilt_messages: bool = False def __iter__(self): # Convenience for debugging / tests: iterate (name, value) pairs. diff --git a/scripts/release.py b/scripts/release.py index d64677fd5d1..5608f68b802 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -64,6 +64,7 @@ AUTHOR_MAP = { "8180647+herbalizer404@users.noreply.github.com": "herbalizer404", # PR #49076 + #51835 salvage (auxiliary compression fallback: 403/session-usage payment errors + honor fallback chain when aux provider auth unavailable) "pyxl-dev@users.noreply.github.com": "pyxl-dev", # PR #52230 salvage (include rate-limit in auxiliary capacity-error fallback gate; #52228) "yashiel@skyner.co.za": "yashiels", # PR #53284 salvage (discord markdown table-to-bullet conversion; #21168) + "15205536+595650661@users.noreply.github.com": "595650661", # PR #37851 salvage (classify MiniMax new_sensitive content filter → content_policy_blocked; #32421) "benbenwyb@gmail.com": "benbenlijie", # PR #47205 salvage (named custom-provider extra_body + Z.AI Coding overload adaptive backoff; #50663) "dana@added-value.co.il": "Danamove", # PR #46726 salvage (kill venv-resident pythonw gateway before recreating venv on Windows; #47036/#47557/#47910) "145739220+wgu9@users.noreply.github.com": "wgu9", # PR #51468 salvage (WSL/no-systemd orphan gateway tracking, #51325) diff --git a/tests/agent/test_turn_retry_state.py b/tests/agent/test_turn_retry_state.py index 21b772d6801..83664eb9800 100644 --- a/tests/agent/test_turn_retry_state.py +++ b/tests/agent/test_turn_retry_state.py @@ -30,6 +30,7 @@ EXPECTED_FIELDS = { "auth_failover_attempted", "restart_with_compressed_messages", "restart_with_length_continuation", + "restart_with_rebuilt_messages", } diff --git a/tests/run_agent/test_partial_stream_finish_reason.py b/tests/run_agent/test_partial_stream_finish_reason.py index 80474a97310..62b40d8091f 100644 --- a/tests/run_agent/test_partial_stream_finish_reason.py +++ b/tests/run_agent/test_partial_stream_finish_reason.py @@ -362,3 +362,194 @@ class TestConversationLoopPartialStreamContinuation: # And the final response stitches both halves together. assert "first half of" in result["final_response"] assert "forty-two" in result["final_response"] + + +class TestContentFilterStallActivatesFallback: + """Regression for #32421: a provider output-layer content safety filter + (e.g. MiniMax ``output new_sensitive (1027)``) terminates a streaming + response mid-delivery. The raw error is swallowed into a + finish_reason=length partial-stream stub, so before the fix the loop + burned 3 continuation retries against the SAME primary (re-hitting the + content-deterministic filter every time) and gave up with + ``"Response remained truncated after 3 continuation attempts"`` — the + configured fallback chain was never consulted. + + The fix has three layers: + 1. error_classifier classifies ``new_sensitive`` as + ``content_policy_blocked``. + 2. interruptible_streaming_api_call runs the swallowed error through + that classifier and stamps the stub ``_content_filter_terminated``. + 3. the conversation loop reads the tag and activates fallback BEFORE + burning any continuation retries. + """ + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_streaming_call_tags_content_filter_stub( + self, _mock_close, mock_create, monkeypatch, + ): + """Layer 2: the real streaming path stamps _content_filter_terminated + when the swallowed error matches a content-filter pattern.""" + + def _minimax_stall(): + yield _make_stream_chunk(content="Writing the file: ") + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, tc_id="call_1", name="write_file"), + ]) + yield _make_stream_chunk(tool_calls=[ + _make_tool_call_delta(index=0, arguments='{"path": "/tmp/x", '), + ]) + raise RuntimeError("output new_sensitive (1027) [MiniMax-M2.7]") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = ( + lambda *a, **kw: _minimax_stall() + ) + mock_create.return_value = mock_client + + agent = _make_agent() + agent._fire_stream_delta = lambda text: None + agent._current_streamed_assistant_text = "Writing the file: " + + monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") + response = agent._interruptible_streaming_api_call({}) + + assert response.id == PARTIAL_STREAM_STUB_ID + assert getattr(response, "_content_filter_terminated", False) is True, ( + "MiniMax new_sensitive stream stall must tag the stub so the loop " + "can route to fallback (#32421)." + ) + + @patch("run_agent.AIAgent._create_request_openai_client") + @patch("run_agent.AIAgent._close_request_openai_client") + def test_plain_network_stall_not_tagged( + self, _mock_close, mock_create, monkeypatch, + ): + """A plain network stall (no content-filter signature) must NOT be + tagged — it should still use the normal continuation path, not + switch providers.""" + + def _network_stall(): + yield _make_stream_chunk(content="Writing the file: ") + raise RuntimeError("connection reset by peer") + + mock_client = MagicMock() + mock_client.chat.completions.create.side_effect = ( + lambda *a, **kw: _network_stall() + ) + mock_create.return_value = mock_client + + agent = _make_agent() + agent._fire_stream_delta = lambda text: None + agent._current_streamed_assistant_text = "Writing the file: " + + monkeypatch.setenv("HERMES_STREAM_RETRIES", "0") + response = agent._interruptible_streaming_api_call({}) + + assert response.id == PARTIAL_STREAM_STUB_ID + assert getattr(response, "_content_filter_terminated", False) is False, ( + "A plain network stall must not be misclassified as a content " + "filter — that would needlessly switch providers." + ) + + def test_tagged_stub_activates_fallback_first_pass(self, loop_agent): + """Layer 3: a tagged stub activates fallback on the FIRST pass, with + zero continuation retries burned, and the fallback provider then + completes the turn.""" + from tests.run_agent.test_run_agent import _mock_assistant_msg, _mock_response + + def _filter_stub(): + return SimpleNamespace( + id=PARTIAL_STREAM_STUB_ID, + model="minimax/MiniMax-M2.7", + choices=[SimpleNamespace( + index=0, + message=_mock_assistant_msg(content="Writing the file..."), + finish_reason=FINISH_REASON_LENGTH, + )], + usage=None, + _dropped_tool_names=["write_file"], + _content_filter_terminated=True, + ) + + recovery = _mock_response( + content="Done on the fallback provider.", finish_reason="stop", + ) + loop_agent.client.chat.completions.create.side_effect = [ + _filter_stub(), recovery, + ] + loop_agent._fallback_chain = [ + {"provider": "openrouter", "model": "anthropic/claude-sonnet-4.7"}, + ] + loop_agent._fallback_index = 0 + fb_calls = {"n": 0} + + def _fake_activate(reason=None): + fb_calls["n"] += 1 + loop_agent._fallback_index = len(loop_agent._fallback_chain) + return True + + with ( + patch.object(loop_agent, "_persist_session"), + patch.object(loop_agent, "_save_trajectory"), + patch.object(loop_agent, "_cleanup_task_resources"), + patch.object(loop_agent, "_try_activate_fallback", + side_effect=_fake_activate), + ): + result = loop_agent.run_conversation("write me a long file") + + assert fb_calls["n"] == 1, ( + "Content-filter-tagged stub must activate fallback exactly once, " + "on the first pass — not after exhausting continuation retries." + ) + assert result["final_response"] == "Done on the fallback provider." + assert result["completed"] is True + + def test_tagged_stub_no_fallback_falls_through(self, loop_agent): + """When no fallback chain is configured, a tagged stub falls through + to the normal continuation path (best-effort) rather than crashing.""" + from tests.run_agent.test_run_agent import _mock_assistant_msg, _mock_response + + def _filter_stub(): + return SimpleNamespace( + id=PARTIAL_STREAM_STUB_ID, + model="minimax/MiniMax-M2.7", + choices=[SimpleNamespace( + index=0, + message=_mock_assistant_msg(content="partial "), + finish_reason=FINISH_REASON_LENGTH, + )], + usage=None, + _dropped_tool_names=["write_file"], + _content_filter_terminated=True, + ) + + recovery = _mock_response(content="recovered text", finish_reason="stop") + loop_agent.client.chat.completions.create.side_effect = [ + _filter_stub(), recovery, + ] + # No fallback chain configured. + loop_agent._fallback_chain = [] + loop_agent._fallback_index = 0 + fb_calls = {"n": 0} + + def _fake_activate(reason=None): + fb_calls["n"] += 1 + return False + + with ( + patch.object(loop_agent, "_persist_session"), + patch.object(loop_agent, "_save_trajectory"), + patch.object(loop_agent, "_cleanup_task_resources"), + patch.object(loop_agent, "_try_activate_fallback", + side_effect=_fake_activate), + ): + result = loop_agent.run_conversation("write me a long file") + + # Fallback was not attempted (empty chain gates it out); the loop + # continued normally and produced a response. + assert fb_calls["n"] == 0, ( + "With an empty fallback chain, the loop must not even call " + "_try_activate_fallback — it should fall through to continuation." + ) + assert result["completed"] is True