fix(agent): aggregate anthropic aux calls via stream

This commit is contained in:
tt-a1i 2026-06-19 16:51:41 +08:00 committed by kshitij
parent 5e93075fd5
commit 46f9d53468
5 changed files with 221 additions and 7 deletions

View file

@ -2535,3 +2535,56 @@ def sanitize_anthropic_kwargs(api_kwargs: Any, *, log_prefix: str = "") -> Any:
sorted(leaked),
)
return api_kwargs
def _is_stream_unavailable_error(exc: Exception) -> bool:
"""Return True when an Anthropic stream call should fall back to create()."""
err_lower = str(exc).lower()
if "stream" in err_lower and "not supported" in err_lower:
return True
if "invokemodelwithresponsestream" in err_lower:
from agent.bedrock_adapter import is_streaming_access_denied_error
return is_streaming_access_denied_error(exc)
return False
def create_anthropic_message(
    client: Any,
    api_kwargs: dict,
    *,
    log_prefix: str = "",
    prefer_stream: bool = True,
) -> Any:
    """Issue an Anthropic Messages request, preferring the streaming endpoint.

    SSE-only Anthropic-compatible gateways may answer ``messages.create()``
    with ``text/event-stream``; the SDK can surface that as raw text, and
    callers expecting a Message then crash on ``.content``. To match the main
    turn path, the request is sent through
    ``messages.stream().get_final_message()`` whenever possible.
    ``messages.create()`` is used only when streaming is not preferred, when
    the client exposes no callable ``stream``, or when the stream attempt fails
    with an error that ``_is_stream_unavailable_error`` recognises (for
    example restricted Bedrock roles).

    Args:
        client: Object exposing a ``messages`` API with ``stream`` and/or
            ``create``.
        api_kwargs: Request keyword arguments. They are passed through
            ``sanitize_anthropic_kwargs`` first; a ``stream`` key is never
            forwarded to either endpoint.
        log_prefix: Prefix used for sanitizer and fallback log messages.
        prefer_stream: When false, skip the streaming attempt entirely.

    Returns:
        Whatever ``get_final_message()`` or ``messages.create()`` returns.

    Raises:
        Exception: Any streaming error not classified as "stream unavailable"
            is re-raised unchanged; errors from ``create()`` propagate as-is.
    """
    sanitize_anthropic_kwargs(api_kwargs, log_prefix=log_prefix)

    messages_api = getattr(client, "messages", None)
    open_stream = getattr(messages_api, "stream", None)

    # Work on a copy so the caller's dict keeps its own ``stream`` entry.
    request_kwargs = dict(api_kwargs)
    request_kwargs.pop("stream", None)

    use_stream = prefer_stream and callable(open_stream)
    if use_stream:
        try:
            with open_stream(**request_kwargs) as stream:
                return stream.get_final_message()
        except Exception as exc:
            # Only "streaming is unavailable" errors justify a second request.
            if not _is_stream_unavailable_error(exc):
                raise
            logger.debug(
                "%sAnthropic Messages stream unavailable; falling back to "
                "messages.create(): %s",
                log_prefix,
                exc,
            )

    return messages_api.create(**request_kwargs)

View file

@ -997,7 +997,7 @@ class _AnthropicCompletionsAdapter:
self._is_oauth = is_oauth
def create(self, **kwargs) -> Any:
from agent.anthropic_adapter import build_anthropic_kwargs
from agent.anthropic_adapter import build_anthropic_kwargs, create_anthropic_message
from agent.transports import get_transport
messages = kwargs.get("messages", [])
@ -1041,7 +1041,7 @@ class _AnthropicCompletionsAdapter:
if not _forbids_sampling_params(model):
anthropic_kwargs["temperature"] = temperature
response = self._client.messages.create(**anthropic_kwargs)
response = create_anthropic_message(self._client, anthropic_kwargs)
_transport = get_transport("anthropic_messages")
_nr = _transport.normalize_response(
response, strip_tool_prefix=self._is_oauth

View file

@ -4076,11 +4076,13 @@ class AIAgent:
# Defensive: strip Responses-only kwargs that can leak in under an
# api_mode-flip race (the Anthropic SDK raises a non-retryable
# TypeError on them). See #31673.
from agent.anthropic_adapter import sanitize_anthropic_kwargs
sanitize_anthropic_kwargs(
api_kwargs, log_prefix=getattr(self, "log_prefix", "")
from agent.anthropic_adapter import create_anthropic_message
return create_anthropic_message(
self._anthropic_client,
api_kwargs,
log_prefix=getattr(self, "log_prefix", ""),
prefer_stream=not bool(getattr(self, "_disable_streaming", False)),
)
return self._anthropic_client.messages.create(**api_kwargs)
def _rebuild_anthropic_client(self) -> None:
"""Rebuild the Anthropic client after an interrupt or stale call.

View file

@ -38,6 +38,20 @@ def _jwt_with_claims(claims: dict) -> str:
return f"{header}.{payload}.sig"
class _FakeAnthropicStream:
def __init__(self, final_message):
self._final_message = final_message
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def get_final_message(self):
return self._final_message
@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
"""Strip provider env vars so each test starts clean."""
@ -990,6 +1004,37 @@ class TestVisionClientFallback:
assert client.__class__.__name__ == "AnthropicAuxiliaryClient"
assert model == "claude-haiku-4-5-20251001"
def test_anthropic_auxiliary_client_aggregates_stream_response(self):
    """The auxiliary client should take its reply from ``messages.stream()``.

    ``messages.create()`` is stubbed to return raw event-stream text and must
    not be called; the assertions check the text and token counts carried by
    the streamed final message.
    """
    from agent.auxiliary_client import AnthropicAuxiliaryClient

    text_block = SimpleNamespace(type="text", text="streamed aux response")
    token_usage = SimpleNamespace(input_tokens=3, output_tokens=4)
    streamed_message = SimpleNamespace(
        content=[text_block],
        stop_reason="end_turn",
        usage=token_usage,
    )
    fake_messages = SimpleNamespace(
        stream=MagicMock(return_value=_FakeAnthropicStream(streamed_message)),
        create=MagicMock(return_value="raw event-stream text"),
    )
    aux_client = AnthropicAuxiliaryClient(
        SimpleNamespace(messages=fake_messages),
        "claude-sonnet-4-20250514",
        "sk-test",
        "https://sse-only.example/v1",
    )

    completion = aux_client.chat.completions.create(
        messages=[{"role": "user", "content": "summarize"}],
        max_tokens=16,
    )

    # Streaming must be the only endpoint touched.
    fake_messages.stream.assert_called_once()
    fake_messages.create.assert_not_called()

    assert completion.choices[0].message.content == "streamed aux response"
    assert completion.usage.prompt_tokens == 3
    assert completion.usage.completion_tokens == 4
class TestAuxiliaryPoolAwareness:
def test_try_nous_uses_pool_entry(self):

View file

@ -5813,12 +5813,126 @@ class TestAnthropicCredentialRefresh:
response = SimpleNamespace(content=[])
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.create.return_value = response
stream_cm = MagicMock()
stream_cm.__enter__.return_value.get_final_message.return_value = response
agent._anthropic_client.messages.stream.return_value = stream_cm
with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=True) as refresh:
result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
refresh.assert_called_once_with()
agent._anthropic_client.messages.stream.assert_called_once_with(model="claude-sonnet-4-20250514")
agent._anthropic_client.messages.create.assert_not_called()
assert result is response
def test_anthropic_messages_create_falls_back_when_stream_unavailable(self):
# Scenario: messages.stream() raises an error whose text mentions both
# "stream" and "not supported"; the request should then go to messages.create().
# Construct the agent with tool lookup, toolset checks and Anthropic client
# construction patched.
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-current-token",
base_url="https://openrouter.ai/api/v1",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
response = SimpleNamespace(content=[])
# Replace the client with a mock: stream() fails, create() succeeds.
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.stream.side_effect = RuntimeError(
"stream is not supported by this provider"
)
agent._anthropic_client.messages.create.return_value = response
# Credential refresh is stubbed to report that nothing was refreshed.
with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False):
result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
# Each endpoint is called exactly once with the same kwargs, and the
# create() result is what the caller receives.
agent._anthropic_client.messages.stream.assert_called_once_with(model="claude-sonnet-4-20250514")
agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514")
assert result is response
def test_anthropic_messages_create_honors_disable_streaming(self):
# Scenario: with agent._disable_streaming set to True, messages.stream() must
# not be called and the request goes straight to messages.create().
# Construct the agent with tool lookup, toolset checks and Anthropic client
# construction patched.
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-current-token",
base_url="https://openrouter.ai/api/v1",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
response = SimpleNamespace(content=[])
# Opt out of streaming before issuing the request.
agent._disable_streaming = True
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.create.return_value = response
# Credential refresh is stubbed to report that nothing was refreshed.
with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False):
result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
# stream() stays untouched; create() receives the kwargs and its result is returned.
agent._anthropic_client.messages.stream.assert_not_called()
agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514")
assert result is response
def test_anthropic_messages_create_does_not_mask_bedrock_stream_validation_errors(self):
# Scenario: the stream error names InvokeModelWithResponseStream but is a
# ValidationException ("input malformed"). It must propagate to the caller
# and must not trigger a messages.create() fallback.
# Construct the agent (Bedrock runtime base URL) with tool lookup, toolset
# checks and Anthropic client construction patched.
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-current-token",
base_url="https://bedrock-runtime.us-east-1.amazonaws.com",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
exc = RuntimeError("ValidationException: InvokeModelWithResponseStream input malformed")
# Replace the client with a mock whose stream() raises the validation error.
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.stream.side_effect = exc
# The RuntimeError is expected to surface with its original message.
with (
patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False),
pytest.raises(RuntimeError, match="input malformed"),
):
agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
# No fallback request may have been issued.
agent._anthropic_client.messages.create.assert_not_called()
def test_anthropic_messages_create_falls_back_for_bedrock_stream_access_denied(self):
# Scenario: the stream error says the caller is not authorized to perform
# bedrock:InvokeModelWithResponseStream; the request should fall back to
# messages.create().
# Construct the agent (Bedrock runtime base URL) with tool lookup, toolset
# checks and Anthropic client construction patched.
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-current-token",
base_url="https://bedrock-runtime.us-east-1.amazonaws.com",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
response = SimpleNamespace(content=[])
# Replace the client with a mock: stream() is denied, create() succeeds.
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.stream.side_effect = RuntimeError(
"User is not authorized to perform: bedrock:InvokeModelWithResponseStream"
)
agent._anthropic_client.messages.create.return_value = response
# Credential refresh is stubbed to report that nothing was refreshed.
with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=False):
result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
# create() receives the kwargs exactly once and its result is returned.
agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514")
assert result is response