mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-24 05:41:40 +00:00
fix(auxiliary): evict async wrappers on poisoned client (follow-up to #23482)
#23482 fixed cache poisoning in the sync path: when a Codex auxiliary timeout closes the underlying OpenAI client, _evict_cached_client_instance walks CodexAuxiliaryClient wrappers via their _real_client attribute and drops the cache entry so the next aux call rebuilds.

The cache key includes async_mode (see _client_cache_key), so the sync and async clients for the same provider live in two distinct entries pointing at the same underlying transport. The fix walked the sync wrapper's _real_client correctly but the async wrappers (AsyncCodexAuxiliaryClient, AsyncAnthropicAuxiliaryClient, AsyncGeminiNativeClient) never exposed _real_client at all, so the async entry survived eviction and kept handing out the poisoned client.

Effect on async aux callers: one timeout now poisons every subsequent async aux call (compression, vision, session_search, title_generation) with 'Connection error' until gateway restart -- even while the sync route recovered as designed in #23482.

Mirror the sync wrapper's _real_client onto each async wrapper so the existing eviction helper finds them. Three changes, one per wrapper:

- AsyncCodexAuxiliaryClient: self._real_client = sync_wrapper._real_client (the underlying OpenAI client)
- AsyncAnthropicAuxiliaryClient: same shape
- AsyncGeminiNativeClient: self._real_client = sync_client (Gemini's native facade is itself the leaf; no OpenAI client beneath it)

Update _evict_cached_client_instance docstring to reflect that it now covers both sync and async wrappers via the same attribute walk.

Test: TestAuxiliaryClientPoisonedCacheEviction.test_evict_cached_client_instance_walks_async_wrapper seeds both sync and async cache entries pointing at the same leaf and asserts both are dropped on a single eviction call. Verified the test fails without the wrapper changes ("async cache entry survived eviction -- wrapper is missing _real_client") and passes with them.

Refs #23482, #23432
This commit is contained in:
parent
1d00716754
commit
111b859e49
3 changed files with 60 additions and 3 deletions
|
|
@@ -900,6 +900,14 @@ class AsyncCodexAuxiliaryClient:
|
||||||
self.chat = _AsyncCodexChatShim(async_adapter)
|
self.chat = _AsyncCodexChatShim(async_adapter)
|
||||||
self.api_key = sync_wrapper.api_key
|
self.api_key = sync_wrapper.api_key
|
||||||
self.base_url = sync_wrapper.base_url
|
self.base_url = sync_wrapper.base_url
|
||||||
|
# Mirror the sync wrapper's _real_client so cache eviction by leaf
|
||||||
|
# OpenAI client (e.g. _close_client_on_timeout in #23482) drops
|
||||||
|
# this async entry too. Without this, sync and async cache entries
|
||||||
|
# diverge on poisoning: the sync entry is evicted but the async
|
||||||
|
# entry keeps reusing the closed transport, failing every
|
||||||
|
# subsequent async aux call with 'Connection error' until the
|
||||||
|
# gateway restarts.
|
||||||
|
self._real_client = sync_wrapper._real_client
|
||||||
|
|
||||||
|
|
||||||
class _AnthropicCompletionsAdapter:
|
class _AnthropicCompletionsAdapter:
|
||||||
|
|
@@ -1035,6 +1043,9 @@ class AsyncAnthropicAuxiliaryClient:
|
||||||
self.chat = _AsyncAnthropicChatShim(async_adapter)
|
self.chat = _AsyncAnthropicChatShim(async_adapter)
|
||||||
self.api_key = sync_wrapper.api_key
|
self.api_key = sync_wrapper.api_key
|
||||||
self.base_url = sync_wrapper.base_url
|
self.base_url = sync_wrapper.base_url
|
||||||
|
# See AsyncCodexAuxiliaryClient: mirror _real_client so cache
|
||||||
|
# eviction on a poisoned underlying client also drops this entry.
|
||||||
|
self._real_client = sync_wrapper._real_client
|
||||||
|
|
||||||
|
|
||||||
def _endpoint_speaks_anthropic_messages(base_url: str) -> bool:
|
def _endpoint_speaks_anthropic_messages(base_url: str) -> bool:
|
||||||
|
|
@@ -2108,9 +2119,13 @@ def _evict_cached_client_instance(target: Any) -> bool:
|
||||||
transport after a timeout, broken streaming session, etc.) so the next
|
transport after a timeout, broken streaming session, etc.) so the next
|
||||||
auxiliary call rebuilds rather than reusing the dead instance.
|
auxiliary call rebuilds rather than reusing the dead instance.
|
||||||
|
|
||||||
Walks ``CodexAuxiliaryClient`` wrappers via their ``_real_client`` so a
|
Walks both sync and async wrappers (``CodexAuxiliaryClient``,
|
||||||
timeout that closes the underlying ``OpenAI`` client also evicts the
|
``AnthropicAuxiliaryClient``, ``AsyncCodexAuxiliaryClient``, etc.) via
|
||||||
Codex shim that exposed it.
|
their ``_real_client`` attribute so a timeout that closes the underlying
|
||||||
|
``OpenAI`` (or native provider) client evicts every cached shim that
|
||||||
|
exposed it. Async wrappers must mirror their sync sibling's
|
||||||
|
``_real_client`` for this to work — otherwise the sync entry is evicted
|
||||||
|
but the async entry survives and keeps reusing the dead transport.
|
||||||
|
|
||||||
Returns True when at least one entry was evicted.
|
Returns True when at least one entry was evicted.
|
||||||
"""
|
"""
|
||||||
|
|
|
||||||
|
|
@@ -945,6 +945,12 @@ class AsyncGeminiNativeClient:
|
||||||
self.api_key = sync_client.api_key
|
self.api_key = sync_client.api_key
|
||||||
self.base_url = sync_client.base_url
|
self.base_url = sync_client.base_url
|
||||||
self.chat = _AsyncGeminiChatNamespace(self)
|
self.chat = _AsyncGeminiChatNamespace(self)
|
||||||
|
# Expose the underlying sync client as _real_client so the auxiliary
|
||||||
|
# cache's eviction-by-leaf-client helper (#23482) can find and drop
|
||||||
|
# this async entry when the sync GeminiNativeClient is poisoned.
|
||||||
|
# GeminiNativeClient is itself the leaf (no OpenAI client beneath
|
||||||
|
# it), so we point at the sync_client directly.
|
||||||
|
self._real_client = sync_client
|
||||||
|
|
||||||
async def _create_chat_completion(self, **kwargs: Any) -> Any:
|
async def _create_chat_completion(self, **kwargs: Any) -> Any:
|
||||||
stream = bool(kwargs.get("stream"))
|
stream = bool(kwargs.get("stream"))
|
||||||
|
|
|
||||||
|
|
@@ -2183,6 +2183,42 @@ class TestAuxiliaryClientPoisonedCacheEviction:
|
||||||
assert _evict_cached_client_instance(None) is False
|
assert _evict_cached_client_instance(None) is False
|
||||||
assert _evict_cached_client_instance(MagicMock()) is False
|
assert _evict_cached_client_instance(MagicMock()) is False
|
||||||
|
|
||||||
|
def test_evict_cached_client_instance_walks_async_wrapper(self):
    """Evicting by leaf client must drop the sync AND the async cache entry.

    The test seeds two cache entries whose keys differ only in the
    async flag: one holds a ``CodexAuxiliaryClient`` and the other an
    ``AsyncCodexAuxiliaryClient`` built from it, so both wrap the same
    leaf client object. A single ``_evict_cached_client_instance(leaf)``
    call is expected to remove both entries. If the async entry were left
    behind it would keep handing out the dead transport, and async aux
    calls (compression, vision, session_search) would keep failing with
    'Connection error' until a gateway restart while the sync route
    recovers.

    Regression test for the async-side gap left by #23482, which covered
    the sync wrapper's ``_real_client`` walk but not the async wrappers.
    """
    from agent.auxiliary_client import (
        AsyncCodexAuxiliaryClient,
        CodexAuxiliaryClient,
        _client_cache,
        _client_cache_lock,
        _evict_cached_client_instance,
    )

    # Stand-in for the leaf client; carries only the attributes set here.
    leaf = SimpleNamespace(
        api_key="k",
        base_url="https://chatgpt.com/backend-api/codex",
        responses=SimpleNamespace(stream=lambda **k: None),
        close=lambda: None,
    )
    sync_shim = CodexAuxiliaryClient(leaf, "gpt-5.5")
    async_shim = AsyncCodexAuxiliaryClient(sync_shim)

    # The two keys differ only in their second element (False vs True).
    sync_key = ("openai-codex", False, None, None, None)
    async_key = ("openai-codex", True, None, None, None)

    with _client_cache_lock:
        _client_cache.clear()
        _client_cache[sync_key] = (sync_shim, "gpt-5.5", None)
        _client_cache[async_key] = (async_shim, "gpt-5.5", None)

    try:
        assert _evict_cached_client_instance(leaf) is True
        assert sync_key not in _client_cache
        assert async_key not in _client_cache, (
            "async cache entry survived eviction — wrapper is missing _real_client"
        )
    finally:
        # Leave the shared module-level cache empty for whatever runs next.
        with _client_cache_lock:
            _client_cache.clear()
|
||||||
|
|
||||||
def test_codex_timeout_evicts_cached_wrapper(self):
|
def test_codex_timeout_evicts_cached_wrapper(self):
|
||||||
"""The timeout closer evicts the cache entry that wraps the closed client."""
|
"""The timeout closer evicts the cache entry that wraps the closed client."""
|
||||||
from agent.auxiliary_client import (
|
from agent.auxiliary_client import (
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue