fix(streaming): rebuild Anthropic client on stream cleanup instead of OpenAI client

interruptible_streaming_api_call() has three connection-pool cleanup
sites that called _replace_primary_openai_client() unconditionally.
For api_mode=anthropic_messages this has two consequences:

1. _replace_primary_openai_client() fails (OPENAI_API_KEY unset on
   Anthropic-only configs), so dead connections are never purged.
2. The stale-stream detector's outer-poll site (L1977) is the only
   mechanism that can interrupt the worker thread while it blocks in
   `for event in stream:`. Because the Anthropic client is never closed,
   the thread stays blocked until the 900 s httpx read-timeout fires,
   producing a visible 15-minute hang for Telegram/gateway users on
   claude-opus-4-7.

Fix: mirror the existing interrupt-path pattern (L1989-1997) at all
three cleanup sites — if api_mode == "anthropic_messages", call
_anthropic_client.close() + _rebuild_anthropic_client() instead of
_replace_primary_openai_client(). _rebuild_anthropic_client() handles
both direct Anthropic and Bedrock-hosted Claude correctly, unlike the
inline build_anthropic_client() calls in open PR #14430.

PR #14430 (open) covers only the outer stale-detector site (L1977).
PR #23678 (open) covers only the inner retry sites (L1774, L1833).
This PR covers all three sites and uses _rebuild_anthropic_client()
for Bedrock parity.

Fixes #28161
This commit is contained in:
EloquentBrush0x 2026-05-19 01:44:57 +03:00 committed by Teknium
parent 6f1a176b33
commit a0b9663c7c
2 changed files with 187 additions and 16 deletions

View file

@ -2459,12 +2459,19 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
diag=request_client_holder.get("diag"),
)
_close_request_client_once("stream_mid_tool_retry_cleanup")
try:
agent._replace_primary_openai_client(
reason="stream_mid_tool_retry_pool_cleanup"
)
except Exception:
pass
if agent.api_mode == "anthropic_messages":
try:
agent._anthropic_client.close()
agent._rebuild_anthropic_client()
except Exception:
pass
else:
try:
agent._replace_primary_openai_client(
reason="stream_mid_tool_retry_pool_cleanup"
)
except Exception:
pass
continue
# SSE error events from proxies (e.g. OpenRouter sends
@ -2512,12 +2519,19 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
_close_request_client_once("stream_retry_cleanup")
# Also rebuild the primary client to purge
# any dead connections from the pool.
try:
agent._replace_primary_openai_client(
reason="stream_retry_pool_cleanup"
)
except Exception:
pass
if agent.api_mode == "anthropic_messages":
try:
agent._anthropic_client.close()
agent._rebuild_anthropic_client()
except Exception:
pass
else:
try:
agent._replace_primary_openai_client(
reason="stream_retry_pool_cleanup"
)
except Exception:
pass
continue
# Retries exhausted. Log the final failure with
# full diagnostic detail (chain, headers,
@ -2688,10 +2702,17 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
pass
# Rebuild the primary client too — its connection pool
# may hold dead sockets from the same provider outage.
try:
agent._replace_primary_openai_client(reason="stale_stream_pool_cleanup")
except Exception:
pass
if agent.api_mode == "anthropic_messages":
try:
agent._anthropic_client.close()
agent._rebuild_anthropic_client()
except Exception:
pass
else:
try:
agent._replace_primary_openai_client(reason="stale_stream_pool_cleanup")
except Exception:
pass
# Reset the timer so we don't kill repeatedly while
# the inner thread processes the closure.
last_chunk_time["t"] = time.time()

View file

@ -0,0 +1,150 @@
"""Anthropic stream cleanup must call _anthropic_client.close() + _rebuild_anthropic_client(),
not _replace_primary_openai_client(), to avoid 15-minute hangs on Anthropic-native configs.
Three cleanup sites in chat_completion_helpers.interruptible_streaming_api_call() were
calling _replace_primary_openai_client() unconditionally. For api_mode=anthropic_messages
this silently fails (no OPENAI_API_KEY) and leaves the in-flight httpx stream unclosed,
blocking the worker thread until the 900s httpx read-timeout fires.
Tests cover:
- stream_retry_pool_cleanup (connection error on fresh stream, L1836)
- stale_stream_pool_cleanup (outer poll loop detects stale stream, L1987)
The third site, stream_mid_tool_retry_pool_cleanup, is not exercised here.
Fixes #28161
"""
import threading
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import httpx
import pytest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_anthropic_agent(**kwargs):
    """Build an ``AIAgent`` switched to ``anthropic_messages`` mode for tests.

    Keyword arguments override the baseline constructor settings below.
    After construction the agent's ``_anthropic_client`` is replaced with a
    ``MagicMock`` so tests can script ``messages.stream`` and inspect
    ``close()`` calls, and a placeholder Anthropic API key is attached.
    """
    # Imported inside the helper rather than at module top.
    from run_agent import AIAgent

    # Caller-supplied kwargs come last so they win over the baseline values.
    ctor_args = {
        "api_key": "test-key",
        "base_url": "https://example.com/v1",
        "model": "claude-opus-4-7",
        "quiet_mode": True,
        "skip_context_files": True,
        "skip_memory": True,
        **kwargs,
    }
    agent = AIAgent(**ctor_args)

    agent.api_mode = "anthropic_messages"
    agent._anthropic_client = MagicMock()
    agent._anthropic_api_key = "test-anthropic-key"
    return agent
def _good_stream_cm():
"""Context manager whose stream yields no events and returns a valid message."""
cm = MagicMock()
stream = MagicMock()
stream.__iter__ = MagicMock(return_value=iter([]))
msg = MagicMock()
msg.content = []
msg.stop_reason = "end_turn"
msg.usage = SimpleNamespace(input_tokens=10, output_tokens=5)
stream.get_final_message = MagicMock(return_value=msg)
cm.__enter__ = MagicMock(return_value=stream)
cm.__exit__ = MagicMock(return_value=False)
return cm
def _failing_stream_cm():
    """Return a mock context manager that fails the moment it is entered.

    Calling ``__enter__`` raises ``httpx.ConnectError`` with the message
    ``"connection reset by peer"``.
    """
    connect_error = httpx.ConnectError("connection reset by peer")
    manager = MagicMock()
    manager.__enter__ = MagicMock(side_effect=connect_error)
    return manager
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
class TestAnthropicStreamPoolCleanup:
    """_replace_primary_openai_client must not be called for api_mode=anthropic_messages.

    Each test drives ``agent._interruptible_streaming_api_call({})`` against a
    mocked Anthropic client and checks that cleanup closes and rebuilds that
    client instead of touching the OpenAI one.
    """

    @pytest.mark.filterwarnings(
        "ignore::pytest.PytestUnhandledThreadExceptionWarning"
    )
    def test_stream_retry_calls_anthropic_rebuild_not_openai(self):
        """Connection error during stream retry → close+rebuild Anthropic client, not OpenAI."""
        agent = _make_anthropic_agent()
        attempt_count = [0]

        def _stream_side_effect(*args, **kwargs):
            # First attempt fails on connect; every later attempt succeeds.
            attempt_count[0] += 1
            if attempt_count[0] == 1:
                return _failing_stream_cm()
            return _good_stream_cm()

        agent._anthropic_client.messages.stream.side_effect = _stream_side_effect

        with patch.object(agent, "_rebuild_anthropic_client") as mock_rebuild:
            with patch.object(
                agent, "_replace_primary_openai_client"
            ) as mock_replace:
                agent._interruptible_streaming_api_call({})

        mock_replace.assert_not_called()
        mock_rebuild.assert_called_once()
        agent._anthropic_client.close.assert_called_once()

    @pytest.mark.filterwarnings(
        "ignore::pytest.PytestUnhandledThreadExceptionWarning"
    )
    def test_stale_stream_calls_anthropic_rebuild_not_openai(self, monkeypatch):
        """Stale-stream outer-poll detector → close+rebuild Anthropic client, not OpenAI.

        The first stream blocks until ``_anthropic_client.close()`` is called,
        so the test also verifies that ``close()`` is what released the worker
        rather than the safety timeout on the wait.
        """
        monkeypatch.setenv("HERMES_STREAM_STALE_TIMEOUT", "0.1")
        agent = _make_anthropic_agent()
        unblock = threading.Event()
        attempt_count = [0]
        # Event.wait() results from the blocked stream: True means close()
        # set the event, False means the 5 s safety timeout elapsed.
        woke_by_close = []

        def _stream_side_effect(*args, **kwargs):
            attempt_count[0] += 1
            if attempt_count[0] == 1:
                # First attempt: stream that yields nothing (triggers stale detector),
                # then raises ConnectError once _anthropic_client.close() unblocks it.
                cm = MagicMock()
                stream = MagicMock()

                def _blocking_gen():
                    # Record how the wait ended. A timeout would otherwise fall
                    # through to the connection-error retry cleanup, which also
                    # closes and rebuilds the client, letting the assertions
                    # below pass without the stale detector ever firing.
                    woke_by_close.append(unblock.wait(timeout=5.0))
                    raise httpx.ConnectError("connection dropped after close()")
                    yield  # make this a generator so next() triggers the wait

                stream.__iter__ = MagicMock(return_value=_blocking_gen())
                cm.__enter__ = MagicMock(return_value=stream)
                cm.__exit__ = MagicMock(return_value=False)
                return cm
            # Second attempt: succeed
            return _good_stream_cm()

        agent._anthropic_client.messages.stream.side_effect = _stream_side_effect
        # close() on the mock Anthropic client unblocks the inner thread.
        agent._anthropic_client.close.side_effect = unblock.set

        with patch.object(agent, "_rebuild_anthropic_client") as mock_rebuild:
            with patch.object(
                agent, "_replace_primary_openai_client"
            ) as mock_replace:
                agent._interruptible_streaming_api_call({})

        mock_replace.assert_not_called()
        # The blocked stream must have been released by close(), not by the
        # safety timeout expiring.
        assert woke_by_close == [True], (
            "blocked stream was not released by _anthropic_client.close(); "
            f"Event.wait() results: {woke_by_close!r}"
        )
        # close() and rebuild called at least once by the stale detector.
        agent._anthropic_client.close.assert_called()
        assert mock_rebuild.call_count >= 1