mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-23 05:31:23 +00:00
fix(auxiliary): evict cached client on timeout/connection error (#23482)
A Codex auxiliary timeout closes the underlying OpenAI client (so the streaming hang doesn't sit until the user kills the session), but the cached wrapper kept pointing at the now-dead transport. Subsequent auxiliary calls (compression retry, memory flush, background review, title generation routed via provider: main) reused that closed client and failed fast with 'Connection error' until the gateway restarted — even though the main agent route was healthy the whole time.

Sync `_get_cached_client` had no liveness check (async did, via loop identity), and the connection-error fallback in `call_llm` only fired on the auto provider path, so an explicit provider — including the common `auxiliary.compression.provider: main` shape — never evicted.

Three fixes:

* New `_evict_cached_client_instance(target)` helper that drops the cache entry whose stored client is `target` (or wraps it via `_real_client`, for `CodexAuxiliaryClient`).
* `_CodexCompletionsAdapter._close_client_on_timeout` evicts the wrapper after closing the inner OpenAI client.
* `call_llm` and `async_call_llm` evict on `_is_connection_error` before re-raising, regardless of whether the provider is auto.

Net effect: one timeout costs one summary attempt + the existing 30s compressor cooldown; the next compaction rebuilds the client and works. Non-connection errors (4xx/5xx) do not evict, so cache hits stay stable.

Closes #23432
This commit is contained in:
parent
ae83a54be4
commit
e5bce320db
2 changed files with 245 additions and 0 deletions
|
|
@ -706,6 +706,16 @@ class _CodexCompletionsAdapter:
|
||||||
close()
|
close()
|
||||||
except Exception:
|
except Exception:
|
||||||
logger.debug("Codex auxiliary: client close during timeout failed", exc_info=True)
|
logger.debug("Codex auxiliary: client close during timeout failed", exc_info=True)
|
||||||
|
# The cached auxiliary client wraps this same ``self._client``
|
||||||
|
# (or *is* a ``CodexAuxiliaryClient`` whose ``_real_client`` is
|
||||||
|
# this instance). After we close the httpx transport above, the
|
||||||
|
# cache must drop that entry — otherwise the next auxiliary call
|
||||||
|
# (compression retry, memory flush, etc.) reuses the dead client
|
||||||
|
# and fails fast with a connection error. See issue #23432.
|
||||||
|
try:
|
||||||
|
_evict_cached_client_instance(self._client)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Codex auxiliary: cache eviction on timeout failed", exc_info=True)
|
||||||
|
|
||||||
def _check_cancelled() -> None:
|
def _check_cancelled() -> None:
|
||||||
if deadline is not None and time.monotonic() >= deadline:
|
if deadline is not None and time.monotonic() >= deadline:
|
||||||
|
|
@ -1984,6 +1994,37 @@ def _evict_cached_clients(provider: str) -> None:
|
||||||
_client_cache.pop(key, None)
|
_client_cache.pop(key, None)
|
||||||
|
|
||||||
|
|
||||||
|
def _evict_cached_client_instance(target: Any) -> bool:
    """Remove every cache entry that holds *target*, directly or wrapped.

    Intended for the case where one particular cached client can no longer
    be used (for example its transport was closed after a timeout), so that
    the following auxiliary call builds a new client instead of picking the
    broken one out of the cache.

    An entry matches when its stored client *is* ``target`` (identity, not
    equality) or when the stored client exposes a ``_real_client`` attribute
    that is ``target`` — the latter covers ``CodexAuxiliaryClient`` shims
    that wrap the underlying client.

    Args:
        target: The client instance to look for. ``None`` never matches.

    Returns:
        ``True`` if one or more entries were removed, ``False`` otherwise.
    """
    if target is None:
        return False

    removed = 0
    with _client_cache_lock:
        # Iterate over a snapshot so entries can be deleted while scanning.
        for cache_key, record in list(_client_cache.items()):
            client = record[0] if record is not None else None
            if client is None:
                continue
            inner = getattr(client, "_real_client", None)
            if client is not target and inner is not target:
                continue
            del _client_cache[cache_key]
            removed += 1
    return removed > 0
|
||||||
|
|
||||||
|
|
||||||
def _pool_cache_hint(
|
def _pool_cache_hint(
|
||||||
provider: str,
|
provider: str,
|
||||||
*,
|
*,
|
||||||
|
|
@ -4200,6 +4241,17 @@ def call_llm(
|
||||||
base_url=str(getattr(fb_client, "base_url", "") or ""))
|
base_url=str(getattr(fb_client, "base_url", "") or ""))
|
||||||
return _validate_llm_response(
|
return _validate_llm_response(
|
||||||
fb_client.chat.completions.create(**fb_kwargs), task)
|
fb_client.chat.completions.create(**fb_kwargs), task)
|
||||||
|
# Connection/timeout errors leave the cached client poisoned (closed
|
||||||
|
# httpx transport, half-read stream, dead async loop). Drop it from
|
||||||
|
# the cache regardless of whether we found a fallback above so the
|
||||||
|
# next auxiliary call rebuilds a fresh client instead of reusing the
|
||||||
|
# dead one. See issue #23432.
|
||||||
|
if _is_connection_error(first_err):
|
||||||
|
try:
|
||||||
|
_evict_cached_client_instance(client)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Auxiliary: cache eviction after connection error failed",
|
||||||
|
exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -4517,4 +4569,12 @@ async def async_call_llm(
|
||||||
fb_kwargs["model"] = async_fb_model
|
fb_kwargs["model"] = async_fb_model
|
||||||
return _validate_llm_response(
|
return _validate_llm_response(
|
||||||
await async_fb.chat.completions.create(**fb_kwargs), task)
|
await async_fb.chat.completions.create(**fb_kwargs), task)
|
||||||
|
# Mirror the sync path: drop poisoned clients on connection/timeout
|
||||||
|
# so the next aux call rebuilds. See issue #23432.
|
||||||
|
if _is_connection_error(first_err):
|
||||||
|
try:
|
||||||
|
_evict_cached_client_instance(client)
|
||||||
|
except Exception:
|
||||||
|
logger.debug("Auxiliary (async): cache eviction after connection error failed",
|
||||||
|
exc_info=True)
|
||||||
raise
|
raise
|
||||||
|
|
|
||||||
|
|
@ -2123,6 +2123,191 @@ class TestCodexAuxiliaryAdapterTimeout:
|
||||||
assert time.monotonic() - started < 0.14
|
assert time.monotonic() - started < 0.14
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Issue #23432 — auxiliary timeout poisons cached client; later aux calls fail
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestAuxiliaryClientPoisonedCacheEviction:
    """Regression tests for cache eviction after connection/timeout errors.

    If the cached auxiliary client is left in place after such a failure,
    the next auxiliary call (compression retry, memory flush, background
    review) picks up the closed httpx transport again and fails with
    ``Connection error`` although the main provider route is healthy.
    See https://github.com/NousResearch/hermes-agent/issues/23432.
    """

    @staticmethod
    def _replace_cache(entries):
        """Empty the auxiliary client cache, then store *entries*, under its lock.

        Passing an empty mapping simply clears the cache; tests use that for
        cleanup.
        """
        from agent.auxiliary_client import _client_cache, _client_cache_lock

        with _client_cache_lock:
            _client_cache.clear()
            _client_cache.update(entries)

    def test_evict_cached_client_instance_drops_direct_match(self):
        """Only the entry storing the target client itself is removed."""
        from agent.auxiliary_client import (
            _client_cache,
            _evict_cached_client_instance,
        )

        doomed_key = ("openrouter", False, None, None, None)
        kept_key = ("anthropic", False, None, None, None)
        target = MagicMock(name="target_client")
        other = MagicMock(name="other_client")
        self._replace_cache({
            doomed_key: (target, "x", None),
            kept_key: (other, "y", None),
        })
        try:
            assert _evict_cached_client_instance(target) is True
            assert doomed_key not in _client_cache
            assert kept_key in _client_cache
        finally:
            self._replace_cache({})

    def test_evict_cached_client_instance_walks_codex_wrapper(self):
        """Evicting by the inner OpenAI client also removes the Codex shim."""
        from agent.auxiliary_client import (
            CodexAuxiliaryClient,
            _client_cache,
            _evict_cached_client_instance,
        )

        inner = SimpleNamespace(
            api_key="k",
            base_url="https://chatgpt.com/backend-api/codex",
            responses=SimpleNamespace(stream=lambda **k: None),
            close=lambda: None,
        )
        shim = CodexAuxiliaryClient(inner, "gpt-5.5")
        shim_key = ("openai-codex", False, None, None, None)
        self._replace_cache({shim_key: (shim, "gpt-5.5", None)})
        try:
            # The cache stores the wrapper, but we evict by the wrapped client.
            assert _evict_cached_client_instance(inner) is True
            assert shim_key not in _client_cache
        finally:
            self._replace_cache({})

    def test_evict_cached_client_instance_handles_none_and_misses(self):
        """``None`` and clients that are not cached both report no eviction."""
        from agent.auxiliary_client import _evict_cached_client_instance

        assert _evict_cached_client_instance(None) is False
        assert _evict_cached_client_instance(MagicMock()) is False

    def test_codex_timeout_evicts_cached_wrapper(self):
        """A timed-out adapter call closes the client and evicts its cache entry."""
        from agent.auxiliary_client import (
            CodexAuxiliaryClient,
            _CodexCompletionsAdapter,
            _client_cache,
        )

        class SlowAliveStream:
            """Stream that keeps emitting progress events slower than the timeout."""

            def __enter__(self):
                return self

            def __exit__(self, exc_type, exc, tb):
                return False

            def __iter__(self):
                for _ in range(20):
                    time.sleep(0.01)
                    yield SimpleNamespace(type="response.in_progress")

            def get_final_response(self):  # pragma: no cover — timeout fires first
                return SimpleNamespace(output=[], usage=None)

        closed = {"flag": False}

        class FakeClient:
            """Minimal client stand-in that records whether close() ran."""

            def __init__(self):
                self.responses = SimpleNamespace(stream=lambda **k: SlowAliveStream())
                self.api_key = "k"
                self.base_url = "https://chatgpt.com/backend-api/codex"

            def close(self):
                closed["flag"] = True

        fake_real = FakeClient()
        shim = CodexAuxiliaryClient(fake_real, "gpt-5.5")
        cache_key = ("openai-codex", False, None, None, None)
        self._replace_cache({cache_key: (shim, "gpt-5.5", None)})
        try:
            adapter = _CodexCompletionsAdapter(fake_real, "gpt-5.5")
            with pytest.raises(TimeoutError):
                adapter.create(
                    messages=[{"role": "user", "content": "x"}],
                    timeout=0.05,
                )
            assert closed["flag"] is True, "timeout closer must close inner client"
            assert cache_key not in _client_cache, (
                "timeout closer must evict cache entry that wraps the closed client"
            )
        finally:
            self._replace_cache({})

    def test_call_llm_evicts_on_connection_error_with_explicit_provider(self):
        """A connection error on an explicit provider drops the cached client.

        Reporter scenario: ``auxiliary.compression.provider: main`` resolves
        to ``openai-codex``, so the auto-provider fallback chain does not
        run; the client poisoned by an earlier timeout must still be evicted
        so the next call rebuilds it.
        """
        from agent.auxiliary_client import _client_cache

        poisoned = MagicMock(name="poisoned_client")
        poisoned.base_url = "https://chatgpt.com/backend-api/codex"
        poisoned.chat.completions.create.side_effect = ConnectionError("transport closed")

        cache_key = ("openai-codex", False, None, None, None)
        self._replace_cache({cache_key: (poisoned, "gpt-5.5", None)})

        resolve_patch = patch(
            "agent.auxiliary_client._resolve_task_provider_model",
            return_value=("openai-codex", "gpt-5.5", None, None, None),
        )
        cached_patch = patch(
            "agent.auxiliary_client._get_cached_client",
            return_value=(poisoned, "gpt-5.5"),
        )
        try:
            with resolve_patch, cached_patch, pytest.raises(ConnectionError):
                call_llm(
                    task="compression",
                    messages=[{"role": "user", "content": "x"}],
                )
            assert cache_key not in _client_cache, (
                "connection error must evict cached client so the next call rebuilds"
            )
        finally:
            self._replace_cache({})

    @pytest.mark.asyncio
    async def test_async_call_llm_evicts_on_connection_error_with_explicit_provider(self):
        """Async counterpart: a connection error evicts the cached async client."""
        from agent.auxiliary_client import _client_cache

        poisoned = MagicMock(name="poisoned_async_client")
        poisoned.base_url = "https://chatgpt.com/backend-api/codex"
        poisoned.chat.completions.create = AsyncMock(side_effect=ConnectionError("transport closed"))

        cache_key = ("openai-codex", True, None, None, None)
        self._replace_cache({cache_key: (poisoned, "gpt-5.5", None)})

        resolve_patch = patch(
            "agent.auxiliary_client._resolve_task_provider_model",
            return_value=("openai-codex", "gpt-5.5", None, None, None),
        )
        cached_patch = patch(
            "agent.auxiliary_client._get_cached_client",
            return_value=(poisoned, "gpt-5.5"),
        )
        try:
            with resolve_patch, cached_patch, pytest.raises(ConnectionError):
                await async_call_llm(
                    task="compression",
                    messages=[{"role": "user", "content": "x"}],
                )
            assert cache_key not in _client_cache
        finally:
            self._replace_cache({})
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# _build_call_kwargs — tool dedup at API boundary
|
# _build_call_kwargs — tool dedup at API boundary
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue