mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(bedrock): evict cached boto3 client on stale-connection errors
## Problem
When a pooled HTTPS connection to the Bedrock runtime goes stale (NAT
timeout, VPN flap, server-side TCP RST, proxy idle cull), the next
Converse call surfaces as one of:
* botocore.exceptions.ConnectionClosedError / ReadTimeoutError /
EndpointConnectionError / ConnectTimeoutError
* urllib3.exceptions.ProtocolError
* A bare AssertionError raised from inside urllib3 or botocore
(internal connection-pool invariant check)
The agent loop retries the request 3x, but the cached boto3 client in
_bedrock_runtime_client_cache is reused across retries — so every
attempt hits the same dead connection pool and fails identically.
Only a process restart clears the cache and lets the user keep working.
The bare-AssertionError variant is particularly user-hostile because
str(AssertionError()) is an empty string, so the retry banner shows:
⚠️ API call failed: AssertionError
📝 Error:
with no hint of what went wrong.
## Fix
Add two helpers to agent/bedrock_adapter.py:
* is_stale_connection_error(exc) — classifies exceptions that
indicate dead-client/dead-socket state. Matches botocore
ConnectionError + HTTPClientError subtrees, urllib3
ProtocolError / NewConnectionError, and AssertionError
raised from a frame whose module name starts with urllib3.,
botocore., or boto3.. Application-level AssertionErrors are
intentionally excluded.
* invalidate_runtime_client(region) — per-region counterpart to
the existing reset_client_cache(). Evicts a single cached
client so the next call rebuilds it (and its connection pool).
Wire both into the Converse call sites:
* call_converse() / call_converse_stream() in
bedrock_adapter.py (defense-in-depth for any future caller)
* The two direct client.converse(**kwargs) /
client.converse_stream(**kwargs) call sites in run_agent.py
(the paths the agent loop actually uses)
On a stale-connection exception, the client is evicted and the
exception re-raised unchanged. The agent's existing retry loop then
builds a fresh client on the next attempt and recovers without
requiring a process restart.
## Tests
tests/agent/test_bedrock_adapter.py gets three new classes (14 tests):
* TestInvalidateRuntimeClient — per-region eviction correctness;
non-cached region returns False.
* TestIsStaleConnectionError — classifies botocore
ConnectionClosedError / EndpointConnectionError /
ReadTimeoutError, urllib3 ProtocolError, library-internal
AssertionError (both urllib3.* and botocore.* frames), and
correctly ignores application-level AssertionError and
unrelated exceptions (ValueError, KeyError).
* TestCallConverseInvalidatesOnStaleError — end-to-end: stale
error evicts the cached client, non-stale error (validation)
leaves it alone, successful call leaves it cached.
All 116 tests in test_bedrock_adapter.py pass.
Signed-off-by: Andre Kurait <andrekurait@gmail.com>
This commit is contained in:
parent
7dc6eb9fbf
commit
a9ccb03ccc
3 changed files with 357 additions and 4 deletions
|
|
@ -87,6 +87,114 @@ def reset_client_cache():
|
|||
_bedrock_control_client_cache.clear()
|
||||
|
||||
|
||||
def invalidate_runtime_client(region: str) -> bool:
|
||||
"""Evict the cached ``bedrock-runtime`` client for a single region.
|
||||
|
||||
Per-region counterpart to :func:`reset_client_cache`. Used by the converse
|
||||
call wrappers to discard clients whose underlying HTTP connection has
|
||||
gone stale, so the next call allocates a fresh client (with a fresh
|
||||
connection pool) instead of reusing a dead socket.
|
||||
|
||||
Returns True if a cached entry was evicted, False if the region was not
|
||||
cached.
|
||||
"""
|
||||
existed = region in _bedrock_runtime_client_cache
|
||||
_bedrock_runtime_client_cache.pop(region, None)
|
||||
return existed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stale-connection detection
|
||||
# ---------------------------------------------------------------------------
|
||||
#
|
||||
# boto3 caches its HTTPS connection pool inside the client object. When a
|
||||
# pooled connection is killed out from under us (NAT timeout, VPN flap,
|
||||
# server-side TCP RST, proxy idle cull, etc.), the next use surfaces as
|
||||
# one of a handful of low-level exceptions — most commonly
|
||||
# ``botocore.exceptions.ConnectionClosedError`` or
|
||||
# ``urllib3.exceptions.ProtocolError``. urllib3 also trips an internal
|
||||
# ``assert`` in a couple of paths (connection pool state checks, chunked
|
||||
# response readers) which bubbles up as a bare ``AssertionError`` with an
|
||||
# empty ``str(exc)``.
|
||||
#
|
||||
# In all of these cases the client is the problem, not the request: retrying
|
||||
# with the same cached client reproduces the failure until the process
|
||||
# restarts. The fix is to evict the region's cached client so the next
|
||||
# attempt builds a new one.
|
||||
|
||||
_STALE_LIB_MODULE_PREFIXES = (
|
||||
"urllib3.",
|
||||
"botocore.",
|
||||
"boto3.",
|
||||
)
|
||||
|
||||
|
||||
def _traceback_frames_modules(exc: BaseException):
|
||||
"""Yield ``__name__``-style module strings for each frame in exc's traceback."""
|
||||
tb = getattr(exc, "__traceback__", None)
|
||||
while tb is not None:
|
||||
frame = tb.tb_frame
|
||||
module = frame.f_globals.get("__name__", "")
|
||||
yield module or ""
|
||||
tb = tb.tb_next
|
||||
|
||||
|
||||
def is_stale_connection_error(exc: BaseException) -> bool:
|
||||
"""Return True if ``exc`` indicates a dead/stale Bedrock HTTP connection.
|
||||
|
||||
Matches:
|
||||
* ``botocore.exceptions.ConnectionError`` and subclasses
|
||||
(``ConnectionClosedError``, ``EndpointConnectionError``,
|
||||
``ReadTimeoutError``, ``ConnectTimeoutError``).
|
||||
* ``urllib3.exceptions.ProtocolError`` / ``NewConnectionError`` /
|
||||
``ConnectionError`` (best-effort import — urllib3 is a transitive
|
||||
dependency of botocore so it is always available in practice).
|
||||
* Bare ``AssertionError`` raised from a frame inside urllib3, botocore,
|
||||
or boto3. These are internal-invariant failures (typically triggered
|
||||
by corrupted connection-pool state after a dropped socket) and are
|
||||
recoverable by swapping the client.
|
||||
|
||||
Non-library ``AssertionError``s (from application code or tests) are
|
||||
intentionally not matched — only library-internal asserts signal stale
|
||||
connection state.
|
||||
"""
|
||||
# botocore: the canonical signal — HTTPClientError is the umbrella for
|
||||
# ConnectionClosedError, ReadTimeoutError, EndpointConnectionError,
|
||||
# ConnectTimeoutError, and ProxyConnectionError. ConnectionError covers
|
||||
# the same family via a different branch of the hierarchy.
|
||||
try:
|
||||
from botocore.exceptions import (
|
||||
ConnectionError as BotoConnectionError,
|
||||
HTTPClientError,
|
||||
)
|
||||
botocore_errors: tuple = (BotoConnectionError, HTTPClientError)
|
||||
except ImportError: # pragma: no cover — botocore always present with boto3
|
||||
botocore_errors = ()
|
||||
if botocore_errors and isinstance(exc, botocore_errors):
|
||||
return True
|
||||
|
||||
# urllib3: low-level transport failures
|
||||
try:
|
||||
from urllib3.exceptions import (
|
||||
ProtocolError,
|
||||
NewConnectionError,
|
||||
ConnectionError as Urllib3ConnectionError,
|
||||
)
|
||||
urllib3_errors = (ProtocolError, NewConnectionError, Urllib3ConnectionError)
|
||||
except ImportError: # pragma: no cover
|
||||
urllib3_errors = ()
|
||||
if urllib3_errors and isinstance(exc, urllib3_errors):
|
||||
return True
|
||||
|
||||
# Library-internal AssertionError (urllib3 / botocore / boto3)
|
||||
if isinstance(exc, AssertionError):
|
||||
for module in _traceback_frames_modules(exc):
|
||||
if any(module.startswith(prefix) for prefix in _STALE_LIB_MODULE_PREFIXES):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AWS credential detection
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -787,7 +895,17 @@ def call_converse(
|
|||
guardrail_config=guardrail_config,
|
||||
)
|
||||
|
||||
try:
|
||||
response = client.converse(**kwargs)
|
||||
except Exception as exc:
|
||||
if is_stale_connection_error(exc):
|
||||
logger.warning(
|
||||
"bedrock: stale-connection error on converse(region=%s, model=%s): "
|
||||
"%s — evicting cached client so the next call reconnects.",
|
||||
region, model, type(exc).__name__,
|
||||
)
|
||||
invalidate_runtime_client(region)
|
||||
raise
|
||||
return normalize_converse_response(response)
|
||||
|
||||
|
||||
|
|
@ -819,7 +937,17 @@ def call_converse_stream(
|
|||
guardrail_config=guardrail_config,
|
||||
)
|
||||
|
||||
try:
|
||||
response = client.converse_stream(**kwargs)
|
||||
except Exception as exc:
|
||||
if is_stale_connection_error(exc):
|
||||
logger.warning(
|
||||
"bedrock: stale-connection error on converse_stream(region=%s, "
|
||||
"model=%s): %s — evicting cached client so the next call reconnects.",
|
||||
region, model, type(exc).__name__,
|
||||
)
|
||||
invalidate_runtime_client(region)
|
||||
raise
|
||||
return normalize_converse_stream_events(response)
|
||||
|
||||
|
||||
|
|
|
|||
18
run_agent.py
18
run_agent.py
|
|
@ -5467,12 +5467,21 @@ class AIAgent:
|
|||
# bedrock responses like chat_completions responses.
|
||||
from agent.bedrock_adapter import (
|
||||
_get_bedrock_runtime_client,
|
||||
invalidate_runtime_client,
|
||||
is_stale_connection_error,
|
||||
normalize_converse_response,
|
||||
)
|
||||
region = api_kwargs.pop("__bedrock_region__", "us-east-1")
|
||||
api_kwargs.pop("__bedrock_converse__", None)
|
||||
client = _get_bedrock_runtime_client(region)
|
||||
try:
|
||||
raw_response = client.converse(**api_kwargs)
|
||||
except Exception as _bedrock_exc:
|
||||
# Evict the cached client on stale-connection failures
|
||||
# so the outer retry loop builds a fresh client/pool.
|
||||
if is_stale_connection_error(_bedrock_exc):
|
||||
invalidate_runtime_client(region)
|
||||
raise
|
||||
result["response"] = normalize_converse_response(raw_response)
|
||||
else:
|
||||
request_client_holder["client"] = self._create_request_openai_client(reason="chat_completion_request")
|
||||
|
|
@ -5725,12 +5734,21 @@ class AIAgent:
|
|||
try:
|
||||
from agent.bedrock_adapter import (
|
||||
_get_bedrock_runtime_client,
|
||||
invalidate_runtime_client,
|
||||
is_stale_connection_error,
|
||||
stream_converse_with_callbacks,
|
||||
)
|
||||
region = api_kwargs.pop("__bedrock_region__", "us-east-1")
|
||||
api_kwargs.pop("__bedrock_converse__", None)
|
||||
client = _get_bedrock_runtime_client(region)
|
||||
try:
|
||||
raw_response = client.converse_stream(**api_kwargs)
|
||||
except Exception as _bedrock_exc:
|
||||
# Evict the cached client on stale-connection failures
|
||||
# so the outer retry loop builds a fresh client/pool.
|
||||
if is_stale_connection_error(_bedrock_exc):
|
||||
invalidate_runtime_client(region)
|
||||
raise
|
||||
|
||||
def _on_text(text):
|
||||
_fire_first()
|
||||
|
|
|
|||
|
|
@ -1230,3 +1230,210 @@ class TestEmptyTextBlockFix:
|
|||
from agent.bedrock_adapter import _convert_content_to_converse
|
||||
blocks = _convert_content_to_converse("Hello")
|
||||
assert blocks[0]["text"] == "Hello"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Stale-connection detection and per-region client invalidation
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInvalidateRuntimeClient:
|
||||
"""Per-region eviction used to discard dead/stale bedrock-runtime clients."""
|
||||
|
||||
def test_evicts_only_the_target_region(self):
|
||||
from agent.bedrock_adapter import (
|
||||
_bedrock_runtime_client_cache,
|
||||
invalidate_runtime_client,
|
||||
reset_client_cache,
|
||||
)
|
||||
reset_client_cache()
|
||||
_bedrock_runtime_client_cache["us-east-1"] = "dead-client"
|
||||
_bedrock_runtime_client_cache["us-west-2"] = "live-client"
|
||||
|
||||
evicted = invalidate_runtime_client("us-east-1")
|
||||
|
||||
assert evicted is True
|
||||
assert "us-east-1" not in _bedrock_runtime_client_cache
|
||||
assert _bedrock_runtime_client_cache["us-west-2"] == "live-client"
|
||||
|
||||
def test_returns_false_when_region_not_cached(self):
|
||||
from agent.bedrock_adapter import invalidate_runtime_client, reset_client_cache
|
||||
reset_client_cache()
|
||||
assert invalidate_runtime_client("eu-west-1") is False
|
||||
|
||||
|
||||
class TestIsStaleConnectionError:
|
||||
"""Classifier that decides whether an exception warrants client eviction."""
|
||||
|
||||
def test_detects_botocore_connection_closed_error(self):
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
from botocore.exceptions import ConnectionClosedError
|
||||
exc = ConnectionClosedError(endpoint_url="https://bedrock.example")
|
||||
assert is_stale_connection_error(exc) is True
|
||||
|
||||
def test_detects_botocore_endpoint_connection_error(self):
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
from botocore.exceptions import EndpointConnectionError
|
||||
exc = EndpointConnectionError(endpoint_url="https://bedrock.example")
|
||||
assert is_stale_connection_error(exc) is True
|
||||
|
||||
def test_detects_botocore_read_timeout(self):
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
from botocore.exceptions import ReadTimeoutError
|
||||
exc = ReadTimeoutError(endpoint_url="https://bedrock.example")
|
||||
assert is_stale_connection_error(exc) is True
|
||||
|
||||
def test_detects_urllib3_protocol_error(self):
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
from urllib3.exceptions import ProtocolError
|
||||
exc = ProtocolError("Connection broken")
|
||||
assert is_stale_connection_error(exc) is True
|
||||
|
||||
def test_detects_library_internal_assertion_error(self):
|
||||
"""A bare AssertionError raised from inside urllib3/botocore signals
|
||||
a corrupted connection-pool invariant and should trigger eviction."""
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
|
||||
# Fabricate an AssertionError whose traceback's last frame belongs
|
||||
# to a module named "urllib3.connectionpool". We do this by exec'ing
|
||||
# a tiny `assert False` under a fake globals dict — the resulting
|
||||
# frame's ``f_globals["__name__"]`` is what the classifier inspects.
|
||||
fake_globals = {"__name__": "urllib3.connectionpool"}
|
||||
try:
|
||||
exec("def _boom():\n assert False\n_boom()", fake_globals)
|
||||
except AssertionError as exc:
|
||||
assert is_stale_connection_error(exc) is True
|
||||
else:
|
||||
pytest.fail("AssertionError not raised")
|
||||
|
||||
def test_detects_botocore_internal_assertion_error(self):
|
||||
"""Same as above but for a frame inside the botocore namespace."""
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
fake_globals = {"__name__": "botocore.httpsession"}
|
||||
try:
|
||||
exec("def _boom():\n assert False\n_boom()", fake_globals)
|
||||
except AssertionError as exc:
|
||||
assert is_stale_connection_error(exc) is True
|
||||
else:
|
||||
pytest.fail("AssertionError not raised")
|
||||
|
||||
def test_ignores_application_assertion_error(self):
|
||||
"""AssertionError from application code (not urllib3/botocore) should
|
||||
NOT be classified as stale — those are real test/code bugs."""
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
try:
|
||||
assert False, "test-only" # noqa: B011
|
||||
except AssertionError as exc:
|
||||
assert is_stale_connection_error(exc) is False
|
||||
|
||||
def test_ignores_unrelated_exceptions(self):
|
||||
from agent.bedrock_adapter import is_stale_connection_error
|
||||
assert is_stale_connection_error(ValueError("bad input")) is False
|
||||
assert is_stale_connection_error(KeyError("missing")) is False
|
||||
|
||||
|
||||
class TestCallConverseInvalidatesOnStaleError:
|
||||
"""call_converse / call_converse_stream evict the cached client when the
|
||||
boto3 call raises a stale-connection error — so the next invocation
|
||||
reconnects instead of reusing the dead socket."""
|
||||
|
||||
def test_converse_evicts_client_on_stale_error(self):
|
||||
from agent.bedrock_adapter import (
|
||||
_bedrock_runtime_client_cache,
|
||||
call_converse,
|
||||
reset_client_cache,
|
||||
)
|
||||
from botocore.exceptions import ConnectionClosedError
|
||||
|
||||
reset_client_cache()
|
||||
dead_client = MagicMock()
|
||||
dead_client.converse.side_effect = ConnectionClosedError(
|
||||
endpoint_url="https://bedrock.example",
|
||||
)
|
||||
_bedrock_runtime_client_cache["us-east-1"] = dead_client
|
||||
|
||||
with pytest.raises(ConnectionClosedError):
|
||||
call_converse(
|
||||
region="us-east-1",
|
||||
model="anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
)
|
||||
|
||||
assert "us-east-1" not in _bedrock_runtime_client_cache, (
|
||||
"stale client should have been evicted so the retry reconnects"
|
||||
)
|
||||
|
||||
def test_converse_stream_evicts_client_on_stale_error(self):
|
||||
from agent.bedrock_adapter import (
|
||||
_bedrock_runtime_client_cache,
|
||||
call_converse_stream,
|
||||
reset_client_cache,
|
||||
)
|
||||
from botocore.exceptions import ConnectionClosedError
|
||||
|
||||
reset_client_cache()
|
||||
dead_client = MagicMock()
|
||||
dead_client.converse_stream.side_effect = ConnectionClosedError(
|
||||
endpoint_url="https://bedrock.example",
|
||||
)
|
||||
_bedrock_runtime_client_cache["us-east-1"] = dead_client
|
||||
|
||||
with pytest.raises(ConnectionClosedError):
|
||||
call_converse_stream(
|
||||
region="us-east-1",
|
||||
model="anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
)
|
||||
|
||||
assert "us-east-1" not in _bedrock_runtime_client_cache
|
||||
|
||||
def test_converse_does_not_evict_on_non_stale_error(self):
|
||||
"""Non-stale errors (e.g. ValidationException) leave the client cache alone."""
|
||||
from agent.bedrock_adapter import (
|
||||
_bedrock_runtime_client_cache,
|
||||
call_converse,
|
||||
reset_client_cache,
|
||||
)
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
reset_client_cache()
|
||||
live_client = MagicMock()
|
||||
live_client.converse.side_effect = ClientError(
|
||||
error_response={"Error": {"Code": "ValidationException", "Message": "bad"}},
|
||||
operation_name="Converse",
|
||||
)
|
||||
_bedrock_runtime_client_cache["us-east-1"] = live_client
|
||||
|
||||
with pytest.raises(ClientError):
|
||||
call_converse(
|
||||
region="us-east-1",
|
||||
model="anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
)
|
||||
|
||||
assert _bedrock_runtime_client_cache.get("us-east-1") is live_client, (
|
||||
"validation errors do not indicate a dead connection — keep the client"
|
||||
)
|
||||
|
||||
def test_converse_leaves_successful_client_in_cache(self):
|
||||
from agent.bedrock_adapter import (
|
||||
_bedrock_runtime_client_cache,
|
||||
call_converse,
|
||||
reset_client_cache,
|
||||
)
|
||||
|
||||
reset_client_cache()
|
||||
live_client = MagicMock()
|
||||
live_client.converse.return_value = {
|
||||
"output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}},
|
||||
"stopReason": "end_turn",
|
||||
"usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2},
|
||||
}
|
||||
_bedrock_runtime_client_cache["us-east-1"] = live_client
|
||||
|
||||
call_converse(
|
||||
region="us-east-1",
|
||||
model="anthropic.claude-3-sonnet-20240229-v1:0",
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
)
|
||||
|
||||
assert _bedrock_runtime_client_cache.get("us-east-1") is live_client
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue