fix(streaming): handle Anthropic client in stale stream detector

The streaming stale detector in _run_streaming_api_call() only handled
the OpenAI/chat-completions path when killing a stale connection. When
api_mode is 'anthropic_messages', request_client_holder['client'] is
always None (the Anthropic SDK uses its own internal transport), so the
existing code was a no-op: the stale Anthropic stream was never actually
closed, and no fresh connection was ever established.

This meant that after the stale detector fired, the inner _call_anthropic()
thread would keep waiting on the same dead stream indefinitely. Each
subsequent stale detector tick would emit another 'Reconnecting...' status
message but do nothing to reset the connection, causing the agent to appear
stuck for the full duration of the stale stream's lifetime.
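The detector-plus-timer pattern described above can be sketched as a small watchdog thread. Everything here is illustrative (the names `make_stale_detector`, `threshold_s`, and `poll_s` are not from the actual file); only the shared `last_chunk_time["t"]` dict and the reset-after-firing step mirror the real code:

```python
import threading
import time

def make_stale_detector(last_chunk_time, on_stale, threshold_s=60.0, poll_s=5.0):
    """Watchdog: call on_stale() when no chunk has arrived for threshold_s.

    Hypothetical sketch of the pattern; the real detector lives inside
    _run_streaming_api_call() and shares last_chunk_time ({"t": ...})
    with the thread reading the stream.
    """
    stop = threading.Event()

    def _watch():
        while not stop.is_set():
            if time.time() - last_chunk_time["t"] > threshold_s:
                on_stale()
                # Reset the timer so we don't fire again on every poll
                # while the reconnect is still in progress -- without a
                # working on_stale() this is exactly the repeated
                # 'Reconnecting...' loop the commit message describes.
                last_chunk_time["t"] = time.time()
            stop.wait(poll_s)

    t = threading.Thread(target=_watch, daemon=True)
    t.start()
    return stop  # set() this event to shut the watchdog down
```

The timer reset is what makes the bug quiet rather than noisy: each tick fired at most once per threshold window, so the agent emitted periodic status messages while never recovering.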

The fix mirrors the existing pattern already used in two other locations
in the same file (the non-streaming stale detector at ~line 5235 and the
interrupt handler at ~line 6199): branch on api_mode and call
self._anthropic_client.close() + rebuild via build_anthropic_client()
for the Anthropic path, while keeping the existing OpenAI path unchanged.

The _replace_primary_openai_client() call is also guarded to only run
on the non-Anthropic path, since it operates on the OpenAI client pool
and is irrelevant when using the Anthropic SDK.
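A minimal, self-contained version of the fixed branching, with `FakeClient`, `reset_stale_connection`, and the dict-based agent as hypothetical stand-ins for the real clients and method:

```python
class FakeClient:
    """Stand-in for an SDK client; only records whether close() was called."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def reset_stale_connection(agent, request_client_holder, rebuild_anthropic):
    """Close whichever client the current api_mode actually uses.

    Mirrors the branching in the diff below: on the Anthropic path the
    per-request holder is always None, so the SDK client itself is closed
    and rebuilt; on the OpenAI path the per-request client is closed and
    the primary pool is rebuilt afterwards.
    """
    try:
        if agent["api_mode"] == "anthropic_messages":
            agent["anthropic_client"].close()
            agent["anthropic_client"] = rebuild_anthropic()
        else:
            rc = request_client_holder.get("client")
            if rc is not None:
                rc.close()
    except Exception:
        pass  # cleanup failures must not kill the detector thread
    # The pool rebuild operates on the OpenAI client pool only, so it is
    # skipped entirely on the Anthropic path.
    if agent["api_mode"] != "anthropic_messages":
        agent["primary_pool_rebuilt"] = True
```

Keeping the Anthropic rebuild inside the same try/except as the old close preserves the original invariant that the stale detector itself can never raise.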
xssssrf 2026-04-23 14:48:45 +08:00
parent 88b6eb9ad1
commit 13850714c6


@@ -6177,17 +6177,28 @@ class AIAgent:
                 f"Reconnecting..."
             )
             try:
-                rc = request_client_holder.get("client")
-                if rc is not None:
-                    self._close_request_openai_client(rc, reason="stale_stream_kill")
+                if self.api_mode == "anthropic_messages":
+                    from agent.anthropic_adapter import build_anthropic_client
+                    self._anthropic_client.close()
+                    self._anthropic_client = build_anthropic_client(
+                        self._anthropic_api_key,
+                        getattr(self, "_anthropic_base_url", None),
+                        timeout=get_provider_request_timeout(self.provider, self.model),
+                    )
+                else:
+                    rc = request_client_holder.get("client")
+                    if rc is not None:
+                        self._close_request_openai_client(rc, reason="stale_stream_kill")
             except Exception:
                 pass
             # Rebuild the primary client too — its connection pool
             # may hold dead sockets from the same provider outage.
-            try:
-                self._replace_primary_openai_client(reason="stale_stream_pool_cleanup")
-            except Exception:
-                pass
+            if self.api_mode != "anthropic_messages":
+                try:
+                    self._replace_primary_openai_client(reason="stale_stream_pool_cleanup")
+                except Exception:
+                    pass
             # Reset the timer so we don't kill repeatedly while
             # the inner thread processes the closure.
             last_chunk_time["t"] = time.time()