From dae5782e212e673cbfe68f4e746f021c429ba899 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 23 Apr 2026 20:13:48 +0000 Subject: [PATCH] fix(bedrock): evict cached boto3 client on stale-connection errors MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem When a pooled HTTPS connection to the Bedrock runtime goes stale (NAT timeout, VPN flap, server-side TCP RST, proxy idle cull), the next Converse call surfaces as one of: * botocore.exceptions.ConnectionClosedError / ReadTimeoutError / EndpointConnectionError / ConnectTimeoutError * urllib3.exceptions.ProtocolError * A bare AssertionError raised from inside urllib3 or botocore (internal connection-pool invariant check) The agent loop retries the request 3x, but the cached boto3 client in _bedrock_runtime_client_cache is reused across retries — so every attempt hits the same dead connection pool and fails identically. Only a process restart clears the cache and lets the user keep working. The bare-AssertionError variant is particularly user-hostile because str(AssertionError()) is an empty string, so the retry banner shows: ⚠️ API call failed: AssertionError 📝 Error: with no hint of what went wrong. ## Fix Add two helpers to agent/bedrock_adapter.py: * is_stale_connection_error(exc) — classifies exceptions that indicate dead-client/dead-socket state. Matches botocore ConnectionError + HTTPClientError subtrees, urllib3 ProtocolError / NewConnectionError, and AssertionError raised from a frame whose module name starts with urllib3., botocore., or boto3.. Application-level AssertionErrors are intentionally excluded. * invalidate_runtime_client(region) — per-region counterpart to the existing reset_client_cache(). Evicts a single cached client so the next call rebuilds it (and its connection pool). 
Wire both into the Converse call sites: * call_converse() / call_converse_stream() in bedrock_adapter.py (defense-in-depth for any future caller) * The two direct client.converse(**kwargs) / client.converse_stream(**kwargs) call sites in run_agent.py (the paths the agent loop actually uses) On a stale-connection exception, the client is evicted and the exception re-raised unchanged. The agent's existing retry loop then builds a fresh client on the next attempt and recovers without requiring a process restart. ## Tests tests/agent/test_bedrock_adapter.py gets three new classes (14 tests): * TestInvalidateRuntimeClient — per-region eviction correctness; non-cached region returns False. * TestIsStaleConnectionError — classifies botocore ConnectionClosedError / EndpointConnectionError / ReadTimeoutError, urllib3 ProtocolError, library-internal AssertionError (both urllib3.* and botocore.* frames), and correctly ignores application-level AssertionError and unrelated exceptions (ValueError, KeyError). * TestCallConverseInvalidatesOnStaleError — end-to-end: stale error evicts the cached client, non-stale error (validation) leaves it alone, successful call leaves it cached. All 116 tests in test_bedrock_adapter.py pass. Signed-off-by: Andre Kurait --- agent/bedrock_adapter.py | 132 +++++++++++++++++- run_agent.py | 22 ++- tests/agent/test_bedrock_adapter.py | 207 ++++++++++++++++++++++++++++ 3 files changed, 357 insertions(+), 4 deletions(-) diff --git a/agent/bedrock_adapter.py b/agent/bedrock_adapter.py index 9e4297581..48674a562 100644 --- a/agent/bedrock_adapter.py +++ b/agent/bedrock_adapter.py @@ -87,6 +87,114 @@ def reset_client_cache(): _bedrock_control_client_cache.clear() +def invalidate_runtime_client(region: str) -> bool: + """Evict the cached ``bedrock-runtime`` client for a single region. + + Per-region counterpart to :func:`reset_client_cache`. 
Used by the converse + call wrappers to discard clients whose underlying HTTP connection has + gone stale, so the next call allocates a fresh client (with a fresh + connection pool) instead of reusing a dead socket. + + Returns True if a cached entry was evicted, False if the region was not + cached. + """ + existed = region in _bedrock_runtime_client_cache + _bedrock_runtime_client_cache.pop(region, None) + return existed + + +# --------------------------------------------------------------------------- +# Stale-connection detection +# --------------------------------------------------------------------------- +# +# boto3 caches its HTTPS connection pool inside the client object. When a +# pooled connection is killed out from under us (NAT timeout, VPN flap, +# server-side TCP RST, proxy idle cull, etc.), the next use surfaces as +# one of a handful of low-level exceptions — most commonly +# ``botocore.exceptions.ConnectionClosedError`` or +# ``urllib3.exceptions.ProtocolError``. urllib3 also trips an internal +# ``assert`` in a couple of paths (connection pool state checks, chunked +# response readers) which bubbles up as a bare ``AssertionError`` with an +# empty ``str(exc)``. +# +# In all of these cases the client is the problem, not the request: retrying +# with the same cached client reproduces the failure until the process +# restarts. The fix is to evict the region's cached client so the next +# attempt builds a new one. + +_STALE_LIB_MODULE_PREFIXES = ( + "urllib3.", + "botocore.", + "boto3.", +) + + +def _traceback_frames_modules(exc: BaseException): + """Yield ``__name__``-style module strings for each frame in exc's traceback.""" + tb = getattr(exc, "__traceback__", None) + while tb is not None: + frame = tb.tb_frame + module = frame.f_globals.get("__name__", "") + yield module or "" + tb = tb.tb_next + + +def is_stale_connection_error(exc: BaseException) -> bool: + """Return True if ``exc`` indicates a dead/stale Bedrock HTTP connection. 
+ + Matches: + * ``botocore.exceptions.ConnectionError`` and its subclasses + (``EndpointConnectionError``, ``ConnectTimeoutError``), plus + ``HTTPClientError`` (``ConnectionClosedError``, ``ReadTimeoutError``). + * ``urllib3.exceptions.ProtocolError`` / ``NewConnectionError`` / + ``ConnectionError`` (best-effort import — urllib3 is a transitive + dependency of botocore so it is always available in practice). + * Bare ``AssertionError`` whose traceback includes a frame inside urllib3, + botocore, or boto3. These are internal-invariant failures (typically + triggered by corrupted connection-pool state after a dropped socket) and + are recoverable by swapping the client. + + ``AssertionError``s whose traceback never enters those libraries (plain + application or test asserts) are intentionally not matched — only + library-internal asserts signal stale connection state. + """ + # botocore: the canonical signal. HTTPClientError is the parent of + # ConnectionClosedError and ReadTimeoutError; ConnectionError is the parent + # of EndpointConnectionError, ConnectTimeoutError, and ProxyConnectionError. + # They are sibling branches under BotoCoreError, so both are matched. 
+ try: + from botocore.exceptions import ( + ConnectionError as BotoConnectionError, + HTTPClientError, + ) + botocore_errors: tuple = (BotoConnectionError, HTTPClientError) + except ImportError: # pragma: no cover — botocore always present with boto3 + botocore_errors = () + if botocore_errors and isinstance(exc, botocore_errors): + return True + + # urllib3: low-level transport failures + try: + from urllib3.exceptions import ( + ProtocolError, + NewConnectionError, + ConnectionError as Urllib3ConnectionError, + ) + urllib3_errors = (ProtocolError, NewConnectionError, Urllib3ConnectionError) + except ImportError: # pragma: no cover + urllib3_errors = () + if urllib3_errors and isinstance(exc, urllib3_errors): + return True + + # Library-internal AssertionError (urllib3 / botocore / boto3) + if isinstance(exc, AssertionError): + for module in _traceback_frames_modules(exc): + if any(module.startswith(prefix) for prefix in _STALE_LIB_MODULE_PREFIXES): + return True + + return False + + # --------------------------------------------------------------------------- # AWS credential detection # --------------------------------------------------------------------------- @@ -787,7 +895,17 @@ def call_converse( guardrail_config=guardrail_config, ) - response = client.converse(**kwargs) + try: + response = client.converse(**kwargs) + except Exception as exc: + if is_stale_connection_error(exc): + logger.warning( + "bedrock: stale-connection error on converse(region=%s, model=%s): " + "%s — evicting cached client so the next call reconnects.", + region, model, type(exc).__name__, + ) + invalidate_runtime_client(region) + raise return normalize_converse_response(response) @@ -819,7 +937,17 @@ def call_converse_stream( guardrail_config=guardrail_config, ) - response = client.converse_stream(**kwargs) + try: + response = client.converse_stream(**kwargs) + except Exception as exc: + if is_stale_connection_error(exc): + logger.warning( + "bedrock: stale-connection error on 
converse_stream(region=%s, " + "model=%s): %s — evicting cached client so the next call reconnects.", + region, model, type(exc).__name__, + ) + invalidate_runtime_client(region) + raise return normalize_converse_stream_events(response) diff --git a/run_agent.py b/run_agent.py index 855b67a84..3265b2292 100644 --- a/run_agent.py +++ b/run_agent.py @@ -5255,12 +5255,21 @@ class AIAgent: # bedrock responses like chat_completions responses. from agent.bedrock_adapter import ( _get_bedrock_runtime_client, + invalidate_runtime_client, + is_stale_connection_error, normalize_converse_response, ) region = api_kwargs.pop("__bedrock_region__", "us-east-1") api_kwargs.pop("__bedrock_converse__", None) client = _get_bedrock_runtime_client(region) - raw_response = client.converse(**api_kwargs) + try: + raw_response = client.converse(**api_kwargs) + except Exception as _bedrock_exc: + # Evict the cached client on stale-connection failures + # so the outer retry loop builds a fresh client/pool. + if is_stale_connection_error(_bedrock_exc): + invalidate_runtime_client(region) + raise result["response"] = normalize_converse_response(raw_response) else: request_client_holder["client"] = self._create_request_openai_client(reason="chat_completion_request") @@ -5513,12 +5522,21 @@ class AIAgent: try: from agent.bedrock_adapter import ( _get_bedrock_runtime_client, + invalidate_runtime_client, + is_stale_connection_error, stream_converse_with_callbacks, ) region = api_kwargs.pop("__bedrock_region__", "us-east-1") api_kwargs.pop("__bedrock_converse__", None) client = _get_bedrock_runtime_client(region) - raw_response = client.converse_stream(**api_kwargs) + try: + raw_response = client.converse_stream(**api_kwargs) + except Exception as _bedrock_exc: + # Evict the cached client on stale-connection failures + # so the outer retry loop builds a fresh client/pool. 
+ if is_stale_connection_error(_bedrock_exc): + invalidate_runtime_client(region) + raise def _on_text(text): _fire_first() diff --git a/tests/agent/test_bedrock_adapter.py b/tests/agent/test_bedrock_adapter.py index d12be7b88..fea136604 100644 --- a/tests/agent/test_bedrock_adapter.py +++ b/tests/agent/test_bedrock_adapter.py @@ -1230,3 +1230,210 @@ class TestEmptyTextBlockFix: from agent.bedrock_adapter import _convert_content_to_converse blocks = _convert_content_to_converse("Hello") assert blocks[0]["text"] == "Hello" + + +# --------------------------------------------------------------------------- +# Stale-connection detection and per-region client invalidation +# --------------------------------------------------------------------------- + +class TestInvalidateRuntimeClient: + """Per-region eviction used to discard dead/stale bedrock-runtime clients.""" + + def test_evicts_only_the_target_region(self): + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + invalidate_runtime_client, + reset_client_cache, + ) + reset_client_cache() + _bedrock_runtime_client_cache["us-east-1"] = "dead-client" + _bedrock_runtime_client_cache["us-west-2"] = "live-client" + + evicted = invalidate_runtime_client("us-east-1") + + assert evicted is True + assert "us-east-1" not in _bedrock_runtime_client_cache + assert _bedrock_runtime_client_cache["us-west-2"] == "live-client" + + def test_returns_false_when_region_not_cached(self): + from agent.bedrock_adapter import invalidate_runtime_client, reset_client_cache + reset_client_cache() + assert invalidate_runtime_client("eu-west-1") is False + + +class TestIsStaleConnectionError: + """Classifier that decides whether an exception warrants client eviction.""" + + def test_detects_botocore_connection_closed_error(self): + from agent.bedrock_adapter import is_stale_connection_error + from botocore.exceptions import ConnectionClosedError + exc = ConnectionClosedError(endpoint_url="https://bedrock.example") + assert 
is_stale_connection_error(exc) is True + + def test_detects_botocore_endpoint_connection_error(self): + from agent.bedrock_adapter import is_stale_connection_error + from botocore.exceptions import EndpointConnectionError + exc = EndpointConnectionError(endpoint_url="https://bedrock.example") + assert is_stale_connection_error(exc) is True + + def test_detects_botocore_read_timeout(self): + from agent.bedrock_adapter import is_stale_connection_error + from botocore.exceptions import ReadTimeoutError + exc = ReadTimeoutError(endpoint_url="https://bedrock.example") + assert is_stale_connection_error(exc) is True + + def test_detects_urllib3_protocol_error(self): + from agent.bedrock_adapter import is_stale_connection_error + from urllib3.exceptions import ProtocolError + exc = ProtocolError("Connection broken") + assert is_stale_connection_error(exc) is True + + def test_detects_library_internal_assertion_error(self): + """A bare AssertionError raised from inside urllib3/botocore signals + a corrupted connection-pool invariant and should trigger eviction.""" + from agent.bedrock_adapter import is_stale_connection_error + + # Fabricate an AssertionError whose traceback's last frame belongs + # to a module named "urllib3.connectionpool". We do this by exec'ing + # a tiny `assert False` under a fake globals dict — the resulting + # frame's ``f_globals["__name__"]`` is what the classifier inspects. 
+ fake_globals = {"__name__": "urllib3.connectionpool"} + try: + exec("def _boom():\n assert False\n_boom()", fake_globals) + except AssertionError as exc: + assert is_stale_connection_error(exc) is True + else: + pytest.fail("AssertionError not raised") + + def test_detects_botocore_internal_assertion_error(self): + """Same as above but for a frame inside the botocore namespace.""" + from agent.bedrock_adapter import is_stale_connection_error + fake_globals = {"__name__": "botocore.httpsession"} + try: + exec("def _boom():\n assert False\n_boom()", fake_globals) + except AssertionError as exc: + assert is_stale_connection_error(exc) is True + else: + pytest.fail("AssertionError not raised") + + def test_ignores_application_assertion_error(self): + """AssertionError from application code (not urllib3/botocore) should + NOT be classified as stale — those are real test/code bugs.""" + from agent.bedrock_adapter import is_stale_connection_error + try: + assert False, "test-only" # noqa: B011 + except AssertionError as exc: + assert is_stale_connection_error(exc) is False + + def test_ignores_unrelated_exceptions(self): + from agent.bedrock_adapter import is_stale_connection_error + assert is_stale_connection_error(ValueError("bad input")) is False + assert is_stale_connection_error(KeyError("missing")) is False + + +class TestCallConverseInvalidatesOnStaleError: + """call_converse / call_converse_stream evict the cached client when the + boto3 call raises a stale-connection error — so the next invocation + reconnects instead of reusing the dead socket.""" + + def test_converse_evicts_client_on_stale_error(self): + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + call_converse, + reset_client_cache, + ) + from botocore.exceptions import ConnectionClosedError + + reset_client_cache() + dead_client = MagicMock() + dead_client.converse.side_effect = ConnectionClosedError( + endpoint_url="https://bedrock.example", + ) + 
_bedrock_runtime_client_cache["us-east-1"] = dead_client + + with pytest.raises(ConnectionClosedError): + call_converse( + region="us-east-1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + messages=[{"role": "user", "content": "hi"}], + ) + + assert "us-east-1" not in _bedrock_runtime_client_cache, ( + "stale client should have been evicted so the retry reconnects" + ) + + def test_converse_stream_evicts_client_on_stale_error(self): + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + call_converse_stream, + reset_client_cache, + ) + from botocore.exceptions import ConnectionClosedError + + reset_client_cache() + dead_client = MagicMock() + dead_client.converse_stream.side_effect = ConnectionClosedError( + endpoint_url="https://bedrock.example", + ) + _bedrock_runtime_client_cache["us-east-1"] = dead_client + + with pytest.raises(ConnectionClosedError): + call_converse_stream( + region="us-east-1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + messages=[{"role": "user", "content": "hi"}], + ) + + assert "us-east-1" not in _bedrock_runtime_client_cache + + def test_converse_does_not_evict_on_non_stale_error(self): + """Non-stale errors (e.g. 
ValidationException) leave the client cache alone.""" + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + call_converse, + reset_client_cache, + ) + from botocore.exceptions import ClientError + + reset_client_cache() + live_client = MagicMock() + live_client.converse.side_effect = ClientError( + error_response={"Error": {"Code": "ValidationException", "Message": "bad"}}, + operation_name="Converse", + ) + _bedrock_runtime_client_cache["us-east-1"] = live_client + + with pytest.raises(ClientError): + call_converse( + region="us-east-1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + messages=[{"role": "user", "content": "hi"}], + ) + + assert _bedrock_runtime_client_cache.get("us-east-1") is live_client, ( + "validation errors do not indicate a dead connection — keep the client" + ) + + def test_converse_leaves_successful_client_in_cache(self): + from agent.bedrock_adapter import ( + _bedrock_runtime_client_cache, + call_converse, + reset_client_cache, + ) + + reset_client_cache() + live_client = MagicMock() + live_client.converse.return_value = { + "output": {"message": {"role": "assistant", "content": [{"text": "hi"}]}}, + "stopReason": "end_turn", + "usage": {"inputTokens": 1, "outputTokens": 1, "totalTokens": 2}, + } + _bedrock_runtime_client_cache["us-east-1"] = live_client + + call_converse( + region="us-east-1", + model="anthropic.claude-3-sonnet-20240229-v1:0", + messages=[{"role": "user", "content": "hi"}], + ) + + assert _bedrock_runtime_client_cache.get("us-east-1") is live_client