diff --git a/agent/anthropic_adapter.py b/agent/anthropic_adapter.py index 4a586d7f0fd..03e8b58e16c 100644 --- a/agent/anthropic_adapter.py +++ b/agent/anthropic_adapter.py @@ -2535,3 +2535,56 @@ def sanitize_anthropic_kwargs(api_kwargs: Any, *, log_prefix: str = "") -> Any: sorted(leaked), ) return api_kwargs + + +def _is_stream_unavailable_error(exc: Exception) -> bool: + """Return True when an Anthropic stream call should fall back to create().""" + err_lower = str(exc).lower() + if "stream" in err_lower and "not supported" in err_lower: + return True + if "invokemodelwithresponsestream" in err_lower: + from agent.bedrock_adapter import is_streaming_access_denied_error + + return is_streaming_access_denied_error(exc) + return False + + +def create_anthropic_message( + client: Any, + api_kwargs: dict, + *, + log_prefix: str = "", + prefer_stream: bool = True, +) -> Any: + """Create an Anthropic message, aggregating via stream when available. + + Some Anthropic-compatible gateways are SSE-only: they ignore non-streaming + requests and return ``text/event-stream`` even for ``messages.create()``. + The SDK can surface that as raw text, so callers that expect a Message then + crash on ``.content``. Prefer ``messages.stream().get_final_message()`` to + match the main turn path, falling back to ``create()`` only for providers + that explicitly do not support streaming, such as restricted Bedrock roles. + """ + sanitize_anthropic_kwargs(api_kwargs, log_prefix=log_prefix) + + messages_api = getattr(client, "messages", None) + stream_fn = getattr(messages_api, "stream", None) + if prefer_stream and callable(stream_fn): + stream_kwargs = dict(api_kwargs) + stream_kwargs.pop("stream", None) + try: + with stream_fn(**stream_kwargs) as stream: + return stream.get_final_message() + except Exception as exc: + if not _is_stream_unavailable_error(exc): + raise + logger.debug( + "%sAnthropic Messages stream unavailable; falling back to " + "messages.create(): %s", + log_prefix, + exc, + ) + + create_kwargs = dict(api_kwargs) + create_kwargs.pop("stream", None) + return messages_api.create(**create_kwargs) diff --git a/agent/auxiliary_client.py b/agent/auxiliary_client.py index 86a1c765a78..f28b5f60156 100644 --- a/agent/auxiliary_client.py +++ b/agent/auxiliary_client.py @@ -997,7 +997,7 @@ class _AnthropicCompletionsAdapter: self._is_oauth = is_oauth def create(self, **kwargs) -> Any: - from agent.anthropic_adapter import build_anthropic_kwargs + from agent.anthropic_adapter import build_anthropic_kwargs, create_anthropic_message from agent.transports import get_transport messages = kwargs.get("messages", []) @@ -1041,7 +1041,7 @@ class _AnthropicCompletionsAdapter: if not _forbids_sampling_params(model): anthropic_kwargs["temperature"] = temperature - response = self._client.messages.create(**anthropic_kwargs) + response = create_anthropic_message(self._client, anthropic_kwargs) _transport = get_transport("anthropic_messages") _nr = _transport.normalize_response( response, strip_tool_prefix=self._is_oauth diff --git a/run_agent.py b/run_agent.py index 65b95483e54..7c195b35ca8 100644 --- a/run_agent.py +++ b/run_agent.py @@ -4076,11 +4076,13 @@ class AIAgent: # Defensive: strip Responses-only kwargs that can leak in under an # api_mode-flip race (the Anthropic SDK raises a non-retryable # TypeError on them). See #31673. - from agent.anthropic_adapter import sanitize_anthropic_kwargs - sanitize_anthropic_kwargs( - api_kwargs, log_prefix=getattr(self, "log_prefix", "") + from agent.anthropic_adapter import create_anthropic_message + return create_anthropic_message( + self._anthropic_client, + api_kwargs, + log_prefix=getattr(self, "log_prefix", ""), + prefer_stream=not bool(getattr(self, "_disable_streaming", False)), ) - return self._anthropic_client.messages.create(**api_kwargs) def _rebuild_anthropic_client(self) -> None: """Rebuild the Anthropic client after an interrupt or stale call. diff --git a/tests/agent/test_auxiliary_client.py b/tests/agent/test_auxiliary_client.py index b2960b703c7..8ec6102f2e5 100644 --- a/tests/agent/test_auxiliary_client.py +++ b/tests/agent/test_auxiliary_client.py @@ -38,6 +38,20 @@ def _jwt_with_claims(claims: dict) -> str: return f"{header}.{payload}.sig" +class _FakeAnthropicStream: + def __init__(self, final_message): + self._final_message = final_message + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def get_final_message(self): + return self._final_message + + @pytest.fixture(autouse=True) def _clean_env(monkeypatch): """Strip provider env vars so each test starts clean.""" @@ -990,6 +1004,37 @@ class TestVisionClientFallback: assert client.__class__.__name__ == "AnthropicAuxiliaryClient" assert model == "claude-haiku-4-5-20251001" + def test_anthropic_auxiliary_client_aggregates_stream_response(self): + from agent.auxiliary_client import AnthropicAuxiliaryClient + + final_message = SimpleNamespace( + content=[SimpleNamespace(type="text", text="streamed aux response")], + stop_reason="end_turn", + usage=SimpleNamespace(input_tokens=3, output_tokens=4), + ) + messages_api = SimpleNamespace( + stream=MagicMock(return_value=_FakeAnthropicStream(final_message)), + create=MagicMock(return_value="raw event-stream text"), + ) + real_client = SimpleNamespace(messages=messages_api) + client = AnthropicAuxiliaryClient( + real_client, + "claude-sonnet-4-20250514", + "sk-test", + "https://sse-only.example/v1", + ) + + response = client.chat.completions.create( + messages=[{"role": "user", "content": "summarize"}], + max_tokens=16, + ) + + messages_api.stream.assert_called_once() + messages_api.create.assert_not_called() + assert response.choices[0].message.content == "streamed aux response" + assert response.usage.prompt_tokens == 3 + assert response.usage.completion_tokens == 4 + class TestAuxiliaryPoolAwareness: def test_try_nous_uses_pool_entry(self): diff --git a/tests/run_agent/test_run_agent.py b/tests/run_agent/test_run_agent.py index f2787628d4d..385a296f889 100644 --- a/tests/run_agent/test_run_agent.py +++ b/tests/run_agent/test_run_agent.py @@ -5813,12 +5813,126 @@ class TestAnthropicCredentialRefresh: response = SimpleNamespace(content=[]) agent._anthropic_client = MagicMock() - agent._anthropic_client.messages.create.return_value = response + stream_cm = MagicMock() + stream_cm.__enter__.return_value.get_final_message.return_value = response + agent._anthropic_client.messages.stream.return_value = stream_cm with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=True) as refresh: result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"}) refresh.assert_called_once_with() + agent._anthropic_client.messages.stream.assert_called_once_with(model="claude-sonnet-4-20250514") + agent._anthropic_client.messages.create.assert_not_called() + assert result is response + + def test_anthropic_messages_create_falls_back_when_stream_unavailable(self): + with ( + patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), + patch("run_agent.check_toolset_requirements", return_value={}), + patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()), + ): + agent = AIAgent( + api_key="sk-ant-oat01-current-token", + base_url="https://openrouter.ai/api/v1", + api_mode="anthropic_messages", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + + response = SimpleNamespace(content=[]) + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.stream.side_effect = RuntimeError( + "stream is not supported by this provider" + ) + agent._anthropic_client.messages.create.return_value = response + + with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False): + result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"}) + + agent._anthropic_client.messages.stream.assert_called_once_with(model="claude-sonnet-4-20250514") + agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514") + assert result is response + + def test_anthropic_messages_create_honors_disable_streaming(self): + with ( + patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), + patch("run_agent.check_toolset_requirements", return_value={}), + patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()), + ): + agent = AIAgent( + api_key="sk-ant-oat01-current-token", + base_url="https://openrouter.ai/api/v1", + api_mode="anthropic_messages", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + + response = SimpleNamespace(content=[]) + agent._disable_streaming = True + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.create.return_value = response + + with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False): + result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"}) + + agent._anthropic_client.messages.stream.assert_not_called() + agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514") + assert result is response + + def test_anthropic_messages_create_does_not_mask_bedrock_stream_validation_errors(self): + with ( + patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), + patch("run_agent.check_toolset_requirements", return_value={}), + patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()), + ): + agent = AIAgent( + api_key="sk-ant-oat01-current-token", + base_url="https://bedrock-runtime.us-east-1.amazonaws.com", + api_mode="anthropic_messages", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + + exc = RuntimeError("ValidationException: InvokeModelWithResponseStream input malformed") + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.stream.side_effect = exc + + with ( + patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False), + pytest.raises(RuntimeError, match="input malformed"), + ): + agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"}) + + agent._anthropic_client.messages.create.assert_not_called() + + def test_anthropic_messages_create_falls_back_for_bedrock_stream_access_denied(self): + with ( + patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")), + patch("run_agent.check_toolset_requirements", return_value={}), + patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()), + ): + agent = AIAgent( + api_key="sk-ant-oat01-current-token", + base_url="https://bedrock-runtime.us-east-1.amazonaws.com", + api_mode="anthropic_messages", + quiet_mode=True, + skip_context_files=True, + skip_memory=True, + ) + + response = SimpleNamespace(content=[]) + agent._anthropic_client = MagicMock() + agent._anthropic_client.messages.stream.side_effect = RuntimeError( + "User is not authorized to perform: bedrock:InvokeModelWithResponseStream" + ) + agent._anthropic_client.messages.create.return_value = response + + with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False): + result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"}) + agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514") assert result is response