hermes-agent/agent/bedrock_adapter.py
Andre Kurait dae5782e21
fix(bedrock): evict cached boto3 client on stale-connection errors
## Problem

When a pooled HTTPS connection to the Bedrock runtime goes stale (NAT
timeout, VPN flap, server-side TCP RST, proxy idle cull), the next
Converse call surfaces as one of:

  * botocore.exceptions.ConnectionClosedError / ReadTimeoutError /
    EndpointConnectionError / ConnectTimeoutError
  * urllib3.exceptions.ProtocolError
  * A bare AssertionError raised from inside urllib3 or botocore
    (internal connection-pool invariant check)

The agent loop retries the request 3x, but the cached boto3 client in
_bedrock_runtime_client_cache is reused across retries — so every
attempt hits the same dead connection pool and fails identically.
Only a process restart clears the cache and lets the user keep working.

The bare-AssertionError variant is particularly user-hostile because
str(AssertionError()) is an empty string, so the retry banner shows:

    ⚠️  API call failed: AssertionError
       📝 Error:

with no hint of what went wrong.

## Fix

Add two helpers to agent/bedrock_adapter.py:

  * is_stale_connection_error(exc) — classifies exceptions that
    indicate dead-client/dead-socket state. Matches botocore
    ConnectionError + HTTPClientError subtrees, urllib3
    ProtocolError / NewConnectionError, and AssertionError
    raised from a frame whose module name starts with urllib3.,
    botocore., or boto3.. Application-level AssertionErrors are
    intentionally excluded.

  * invalidate_runtime_client(region) — per-region counterpart to
    the existing reset_client_cache(). Evicts a single cached
    client so the next call rebuilds it (and its connection pool).

Wire both into the Converse call sites:

  * call_converse() / call_converse_stream() in
    bedrock_adapter.py (defense-in-depth for any future caller)
  * The two direct client.converse(**kwargs) /
    client.converse_stream(**kwargs) call sites in run_agent.py
    (the paths the agent loop actually uses)

On a stale-connection exception, the client is evicted and the
exception re-raised unchanged. The agent's existing retry loop then
builds a fresh client on the next attempt and recovers without
requiring a process restart.

## Tests

tests/agent/test_bedrock_adapter.py gets three new classes (14 tests):

  * TestInvalidateRuntimeClient — per-region eviction correctness;
    non-cached region returns False.
  * TestIsStaleConnectionError — classifies botocore
    ConnectionClosedError / EndpointConnectionError /
    ReadTimeoutError, urllib3 ProtocolError, library-internal
    AssertionError (both urllib3.* and botocore.* frames), and
    correctly ignores application-level AssertionError and
    unrelated exceptions (ValueError, KeyError).
  * TestCallConverseInvalidatesOnStaleError — end-to-end: stale
    error evicts the cached client, non-stale error (validation)
    leaves it alone, successful call leaves it cached.

All 116 tests in test_bedrock_adapter.py pass.

Signed-off-by: Andre Kurait <andrekurait@gmail.com>
2026-04-23 20:26:55 +00:00

1226 lines
47 KiB
Python

"""AWS Bedrock Converse API adapter for Hermes Agent.
Provides native integration with Amazon Bedrock using the Converse API,
bypassing the OpenAI-compatible endpoint in favor of direct AWS SDK calls.
This enables full access to the Bedrock ecosystem:
- **Native Converse API**: Unified interface for all Bedrock models
(Claude, Nova, Llama, Mistral, etc.) with streaming support.
- **AWS credential chain**: IAM roles, SSO profiles, environment variables,
instance metadata — zero API key management for AWS-native environments.
- **Dynamic model discovery**: Auto-discovers available foundation models
and cross-region inference profiles via the Bedrock control plane.
- **Guardrails support**: Optional Bedrock Guardrails configuration for
content filtering and safety policies.
- **Inference profiles**: Supports cross-region inference profiles
(us.anthropic.claude-*, global.anthropic.claude-*) for better capacity
and automatic failover.
Architecture follows the same pattern as ``anthropic_adapter.py``:
- All Bedrock-specific logic is isolated in this module.
- Messages/tools are converted between OpenAI format and Converse format.
- Responses are normalized back to OpenAI-compatible objects for the agent loop.
Reference: OpenClaw's ``extensions/amazon-bedrock/`` plugin, which implements
the same Converse API integration in TypeScript via ``@aws-sdk/client-bedrock``.
Requires: ``boto3`` (optional dependency — only needed when using the Bedrock provider).
"""
import json
import logging
import os
import re
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Lazy boto3 import — only loaded when the Bedrock provider is actually used.
# This keeps startup fast for users who don't use Bedrock.
# ---------------------------------------------------------------------------
_bedrock_runtime_client_cache: Dict[str, Any] = {}
_bedrock_control_client_cache: Dict[str, Any] = {}
def _require_boto3():
"""Import boto3, raising a clear error if not installed."""
try:
import boto3
return boto3
except ImportError:
raise ImportError(
"The 'boto3' package is required for the AWS Bedrock provider. "
"Install it with: pip install boto3\n"
"Or install Hermes with Bedrock support: pip install -e '.[bedrock]'"
)
def _get_bedrock_runtime_client(region: str):
"""Get or create a cached ``bedrock-runtime`` client for the given region.
Uses the default AWS credential chain (env vars → profile → instance role).
"""
if region not in _bedrock_runtime_client_cache:
boto3 = _require_boto3()
_bedrock_runtime_client_cache[region] = boto3.client(
"bedrock-runtime", region_name=region,
)
return _bedrock_runtime_client_cache[region]
def _get_bedrock_control_client(region: str):
"""Get or create a cached ``bedrock`` control-plane client for model discovery."""
if region not in _bedrock_control_client_cache:
boto3 = _require_boto3()
_bedrock_control_client_cache[region] = boto3.client(
"bedrock", region_name=region,
)
return _bedrock_control_client_cache[region]
def reset_client_cache():
"""Clear cached boto3 clients. Used in tests and profile switches."""
_bedrock_runtime_client_cache.clear()
_bedrock_control_client_cache.clear()
def invalidate_runtime_client(region: str) -> bool:
"""Evict the cached ``bedrock-runtime`` client for a single region.
Per-region counterpart to :func:`reset_client_cache`. Used by the converse
call wrappers to discard clients whose underlying HTTP connection has
gone stale, so the next call allocates a fresh client (with a fresh
connection pool) instead of reusing a dead socket.
Returns True if a cached entry was evicted, False if the region was not
cached.
"""
existed = region in _bedrock_runtime_client_cache
_bedrock_runtime_client_cache.pop(region, None)
return existed
# ---------------------------------------------------------------------------
# Stale-connection detection
# ---------------------------------------------------------------------------
#
# boto3 caches its HTTPS connection pool inside the client object. When a
# pooled connection is killed out from under us (NAT timeout, VPN flap,
# server-side TCP RST, proxy idle cull, etc.), the next use surfaces as
# one of a handful of low-level exceptions — most commonly
# ``botocore.exceptions.ConnectionClosedError`` or
# ``urllib3.exceptions.ProtocolError``. urllib3 also trips an internal
# ``assert`` in a couple of paths (connection pool state checks, chunked
# response readers) which bubbles up as a bare ``AssertionError`` with an
# empty ``str(exc)``.
#
# In all of these cases the client is the problem, not the request: retrying
# with the same cached client reproduces the failure until the process
# restarts. The fix is to evict the region's cached client so the next
# attempt builds a new one.
_STALE_LIB_MODULE_PREFIXES = (
"urllib3.",
"botocore.",
"boto3.",
)
def _traceback_frames_modules(exc: BaseException):
"""Yield ``__name__``-style module strings for each frame in exc's traceback."""
tb = getattr(exc, "__traceback__", None)
while tb is not None:
frame = tb.tb_frame
module = frame.f_globals.get("__name__", "")
yield module or ""
tb = tb.tb_next
def is_stale_connection_error(exc: BaseException) -> bool:
"""Return True if ``exc`` indicates a dead/stale Bedrock HTTP connection.
Matches:
* ``botocore.exceptions.ConnectionError`` and subclasses
(``ConnectionClosedError``, ``EndpointConnectionError``,
``ReadTimeoutError``, ``ConnectTimeoutError``).
* ``urllib3.exceptions.ProtocolError`` / ``NewConnectionError`` /
``ConnectionError`` (best-effort import — urllib3 is a transitive
dependency of botocore so it is always available in practice).
* Bare ``AssertionError`` raised from a frame inside urllib3, botocore,
or boto3. These are internal-invariant failures (typically triggered
by corrupted connection-pool state after a dropped socket) and are
recoverable by swapping the client.
Non-library ``AssertionError``s (from application code or tests) are
intentionally not matched — only library-internal asserts signal stale
connection state.
"""
# botocore: the canonical signal — HTTPClientError is the umbrella for
# ConnectionClosedError, ReadTimeoutError, EndpointConnectionError,
# ConnectTimeoutError, and ProxyConnectionError. ConnectionError covers
# the same family via a different branch of the hierarchy.
try:
from botocore.exceptions import (
ConnectionError as BotoConnectionError,
HTTPClientError,
)
botocore_errors: tuple = (BotoConnectionError, HTTPClientError)
except ImportError: # pragma: no cover — botocore always present with boto3
botocore_errors = ()
if botocore_errors and isinstance(exc, botocore_errors):
return True
# urllib3: low-level transport failures
try:
from urllib3.exceptions import (
ProtocolError,
NewConnectionError,
ConnectionError as Urllib3ConnectionError,
)
urllib3_errors = (ProtocolError, NewConnectionError, Urllib3ConnectionError)
except ImportError: # pragma: no cover
urllib3_errors = ()
if urllib3_errors and isinstance(exc, urllib3_errors):
return True
# Library-internal AssertionError (urllib3 / botocore / boto3)
if isinstance(exc, AssertionError):
for module in _traceback_frames_modules(exc):
if any(module.startswith(prefix) for prefix in _STALE_LIB_MODULE_PREFIXES):
return True
return False
# ---------------------------------------------------------------------------
# AWS credential detection
# ---------------------------------------------------------------------------
# Priority order matches OpenClaw's resolveAwsSdkEnvVarName():
# 1. AWS_BEARER_TOKEN_BEDROCK (Bedrock-specific bearer token)
# 2. AWS_ACCESS_KEY_ID + AWS_SECRET_ACCESS_KEY (explicit IAM credentials)
# 3. AWS_PROFILE (named profile → SSO, assume-role, etc.)
# 4. Implicit: instance role, ECS task role, Lambda execution role
_AWS_CREDENTIAL_ENV_VARS = [
"AWS_BEARER_TOKEN_BEDROCK",
"AWS_ACCESS_KEY_ID",
"AWS_PROFILE",
# These are checked by boto3's default chain but we list them for
# has_aws_credentials() detection:
"AWS_CONTAINER_CREDENTIALS_RELATIVE_URI",
"AWS_WEB_IDENTITY_TOKEN_FILE",
]
def resolve_aws_auth_env_var(env: Optional[Dict[str, str]] = None) -> Optional[str]:
"""Return the name of the AWS auth source that is active, or None.
Checks environment variables first, then falls back to boto3's credential
chain for implicit sources (EC2 IMDS, ECS task role, etc.).
This mirrors OpenClaw's ``resolveAwsSdkEnvVarName()`` — used to detect
whether the user has any AWS credentials configured without actually
attempting to authenticate.
"""
env = env if env is not None else os.environ
# Bearer token takes highest priority
if env.get("AWS_BEARER_TOKEN_BEDROCK", "").strip():
return "AWS_BEARER_TOKEN_BEDROCK"
# Explicit access key pair
if (env.get("AWS_ACCESS_KEY_ID", "").strip()
and env.get("AWS_SECRET_ACCESS_KEY", "").strip()):
return "AWS_ACCESS_KEY_ID"
# Named profile (SSO, assume-role, etc.)
if env.get("AWS_PROFILE", "").strip():
return "AWS_PROFILE"
# Container credentials (ECS, CodeBuild)
if env.get("AWS_CONTAINER_CREDENTIALS_RELATIVE_URI", "").strip():
return "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI"
# Web identity (EKS IRSA)
if env.get("AWS_WEB_IDENTITY_TOKEN_FILE", "").strip():
return "AWS_WEB_IDENTITY_TOKEN_FILE"
# No env vars — check if boto3 can resolve credentials via IMDS or other
# implicit sources (EC2 instance role, ECS task role, Lambda, etc.)
try:
import botocore.session
session = botocore.session.get_session()
credentials = session.get_credentials()
if credentials is not None:
resolved = credentials.get_frozen_credentials()
if resolved and resolved.access_key:
return "iam-role"
except Exception:
pass
return None
def has_aws_credentials(env: Optional[Dict[str, str]] = None) -> bool:
"""Return True if any AWS credential source is detected.
Checks environment variables first (fast, no I/O), then falls back to
boto3's credential chain which covers EC2 instance roles, ECS task roles,
Lambda execution roles, and other IMDS-based sources that don't set
environment variables.
This two-tier approach mirrors the pattern from OpenClaw PR #62673:
cloud environments (EC2, ECS, Lambda) provide credentials via instance
metadata, not environment variables. The env-var check is a fast path
for local development; the boto3 fallback covers all cloud deployments.
"""
if resolve_aws_auth_env_var(env) is not None:
return True
# Fall back to boto3's credential resolver — this covers EC2 instance
# metadata (IMDS), ECS container credentials, and other implicit sources
# that don't set environment variables.
try:
import botocore.session
session = botocore.session.get_session()
credentials = session.get_credentials()
if credentials is not None:
resolved = credentials.get_frozen_credentials()
if resolved and resolved.access_key:
return True
except Exception:
pass
return False
def resolve_bedrock_region(env: Optional[Dict[str, str]] = None) -> str:
"""Resolve the AWS region for Bedrock API calls.
Priority: AWS_REGION → AWS_DEFAULT_REGION → us-east-1 (fallback).
"""
env = env if env is not None else os.environ
return (
env.get("AWS_REGION", "").strip()
or env.get("AWS_DEFAULT_REGION", "").strip()
or "us-east-1"
)
# ---------------------------------------------------------------------------
# Tool-calling capability detection
# ---------------------------------------------------------------------------
# Some Bedrock models don't support tool/function calling. Sending toolConfig
# to these models causes ValidationException. We maintain a denylist of known
# non-tool-calling model patterns and strip tools for them.
#
# This is a conservative approach: unknown models are assumed to support tools.
# If a model fails with a tool-related ValidationException, add it here.
_NON_TOOL_CALLING_PATTERNS = [
"deepseek.r1", # DeepSeek R1 — reasoning only, no tool support
"deepseek-r1", # Alternate ID format
"stability.", # Image generation models
"cohere.embed", # Embedding models
"amazon.titan-embed", # Embedding models
]
def _model_supports_tool_use(model_id: str) -> bool:
"""Return True if the model is expected to support tool/function calling.
Models in the denylist are known to reject toolConfig in the Converse API.
Unknown models default to True (assume tool support).
"""
model_lower = model_id.lower()
return not any(pattern in model_lower for pattern in _NON_TOOL_CALLING_PATTERNS)
def is_anthropic_bedrock_model(model_id: str) -> bool:
"""Return True if the model is an Anthropic Claude model on Bedrock.
These models should use the AnthropicBedrock SDK path for full feature
parity (prompt caching, thinking budgets, adaptive thinking).
Non-Claude models use the Converse API path.
Matches:
- ``anthropic.claude-*`` (foundation model IDs)
- ``us.anthropic.claude-*`` (US inference profiles)
- ``global.anthropic.claude-*`` (global inference profiles)
- ``eu.anthropic.claude-*`` (EU inference profiles)
"""
model_lower = model_id.lower()
# Strip regional prefix if present
for prefix in ("us.", "global.", "eu.", "ap.", "jp."):
if model_lower.startswith(prefix):
model_lower = model_lower[len(prefix):]
break
return model_lower.startswith("anthropic.claude")
# ---------------------------------------------------------------------------
# Message format conversion: OpenAI → Bedrock Converse
# ---------------------------------------------------------------------------
def convert_tools_to_converse(tools: List[Dict]) -> List[Dict]:
"""Convert OpenAI-format tool definitions to Bedrock Converse ``toolConfig``.
OpenAI format::
{"type": "function", "function": {"name": "...", "description": "...",
"parameters": {"type": "object", "properties": {...}}}}
Converse format::
{"toolSpec": {"name": "...", "description": "...",
"inputSchema": {"json": {"type": "object", "properties": {...}}}}}
"""
if not tools:
return []
result = []
for t in tools:
fn = t.get("function", {})
name = fn.get("name", "")
description = fn.get("description", "")
parameters = fn.get("parameters", {"type": "object", "properties": {}})
result.append({
"toolSpec": {
"name": name,
"description": description,
"inputSchema": {"json": parameters},
}
})
return result
def _convert_content_to_converse(content) -> List[Dict]:
"""Convert OpenAI message content (string or list) to Converse content blocks.
Handles:
- Plain text strings → [{"text": "..."}]
- Content arrays with text/image_url parts → mixed text/image blocks
Filters out empty text blocks — Bedrock's Converse API rejects messages
where a text content block has an empty ``text`` field (ValidationException:
"text content blocks must be non-empty"). Ref: issue #9486.
"""
if content is None:
return [{"text": " "}]
if isinstance(content, str):
return [{"text": content}] if content.strip() else [{"text": " "}]
if isinstance(content, list):
blocks = []
for part in content:
if isinstance(part, str):
blocks.append({"text": part})
continue
if not isinstance(part, dict):
continue
part_type = part.get("type", "")
if part_type == "text":
text = part.get("text", "")
blocks.append({"text": text if text else " "})
elif part_type == "image_url":
image_url = part.get("image_url", {})
url = image_url.get("url", "") if isinstance(image_url, dict) else ""
if url.startswith("data:"):
# data:image/jpeg;base64,/9j/4AAQ...
header, _, data = url.partition(",")
media_type = "image/jpeg"
if header.startswith("data:"):
mime_part = header[5:].split(";")[0]
if mime_part:
media_type = mime_part
blocks.append({
"image": {
"format": media_type.split("/")[-1] if "/" in media_type else "jpeg",
"source": {"bytes": data},
}
})
else:
# Remote URL — Converse doesn't support URLs directly,
# include as text reference for the model.
blocks.append({"text": f"[Image: {url}]"})
return blocks if blocks else [{"text": " "}]
return [{"text": str(content)}]
def convert_messages_to_converse(
messages: List[Dict],
) -> Tuple[Optional[List[Dict]], List[Dict]]:
"""Convert OpenAI-format messages to Bedrock Converse format.
Returns ``(system_prompt, converse_messages)`` where:
- ``system_prompt`` is a list of system content blocks (or None)
- ``converse_messages`` is the conversation in Converse format
Handles:
- System messages → extracted as system prompt
- User messages → ``{"role": "user", "content": [...]}``
- Assistant messages → ``{"role": "assistant", "content": [...]}``
- Tool calls → ``{"toolUse": {"toolUseId": ..., "name": ..., "input": ...}}``
- Tool results → ``{"toolResult": {"toolUseId": ..., "content": [...]}}``
Converse requires strict user/assistant alternation. Consecutive messages
with the same role are merged into a single message.
"""
system_blocks: List[Dict] = []
converse_msgs: List[Dict] = []
for msg in messages:
role = msg.get("role", "")
content = msg.get("content")
if role == "system":
# System messages become the system prompt
if isinstance(content, str) and content.strip():
system_blocks.append({"text": content})
elif isinstance(content, list):
for part in content:
if isinstance(part, dict) and part.get("type") == "text":
system_blocks.append({"text": part.get("text", "")})
elif isinstance(part, str):
system_blocks.append({"text": part})
continue
if role == "tool":
# Tool result messages → merge into the preceding user turn
tool_call_id = msg.get("tool_call_id", "")
result_content = content if isinstance(content, str) else json.dumps(content)
tool_result_block = {
"toolResult": {
"toolUseId": tool_call_id,
"content": [{"text": result_content}],
}
}
# In Converse, tool results go in a "user" role message
if converse_msgs and converse_msgs[-1]["role"] == "user":
converse_msgs[-1]["content"].append(tool_result_block)
else:
converse_msgs.append({
"role": "user",
"content": [tool_result_block],
})
continue
if role == "assistant":
content_blocks = []
# Convert text content
if isinstance(content, str) and content.strip():
content_blocks.append({"text": content})
elif isinstance(content, list):
content_blocks.extend(_convert_content_to_converse(content))
# Convert tool calls
tool_calls = msg.get("tool_calls", [])
for tc in (tool_calls or []):
fn = tc.get("function", {})
args_str = fn.get("arguments", "{}")
try:
args_dict = json.loads(args_str) if isinstance(args_str, str) else args_str
except (json.JSONDecodeError, TypeError):
args_dict = {}
content_blocks.append({
"toolUse": {
"toolUseId": tc.get("id", ""),
"name": fn.get("name", ""),
"input": args_dict,
}
})
if not content_blocks:
content_blocks = [{"text": " "}]
# Merge with previous assistant message if needed (strict alternation)
if converse_msgs and converse_msgs[-1]["role"] == "assistant":
converse_msgs[-1]["content"].extend(content_blocks)
else:
converse_msgs.append({
"role": "assistant",
"content": content_blocks,
})
continue
if role == "user":
content_blocks = _convert_content_to_converse(content)
# Merge with previous user message if needed (strict alternation)
if converse_msgs and converse_msgs[-1]["role"] == "user":
converse_msgs[-1]["content"].extend(content_blocks)
else:
converse_msgs.append({
"role": "user",
"content": content_blocks,
})
continue
# Converse requires the first message to be from the user
if converse_msgs and converse_msgs[0]["role"] != "user":
converse_msgs.insert(0, {"role": "user", "content": [{"text": " "}]})
# Converse requires the last message to be from the user
if converse_msgs and converse_msgs[-1]["role"] != "user":
converse_msgs.append({"role": "user", "content": [{"text": " "}]})
return (system_blocks if system_blocks else None, converse_msgs)
# ---------------------------------------------------------------------------
# Response format conversion: Bedrock Converse → OpenAI
# ---------------------------------------------------------------------------
def _converse_stop_reason_to_openai(stop_reason: str) -> str:
"""Map Bedrock Converse stop reasons to OpenAI finish_reason values."""
mapping = {
"end_turn": "stop",
"stop_sequence": "stop",
"tool_use": "tool_calls",
"max_tokens": "length",
"content_filtered": "content_filter",
"guardrail_intervened": "content_filter",
}
return mapping.get(stop_reason, "stop")
def normalize_converse_response(response: Dict) -> SimpleNamespace:
"""Convert a Bedrock Converse API response to an OpenAI-compatible object.
The agent loop in ``run_agent.py`` expects responses shaped like
``openai.ChatCompletion`` — this function bridges the gap.
Returns a SimpleNamespace with:
- ``.choices[0].message.content`` — text response
- ``.choices[0].message.tool_calls`` — tool call list (if any)
- ``.choices[0].finish_reason`` — stop/tool_calls/length
- ``.usage`` — token usage stats
"""
output = response.get("output", {})
message = output.get("message", {})
content_blocks = message.get("content", [])
stop_reason = response.get("stopReason", "end_turn")
text_parts = []
tool_calls = []
for block in content_blocks:
if "text" in block:
text_parts.append(block["text"])
elif "toolUse" in block:
tu = block["toolUse"]
tool_calls.append(SimpleNamespace(
id=tu.get("toolUseId", ""),
type="function",
function=SimpleNamespace(
name=tu.get("name", ""),
arguments=json.dumps(tu.get("input", {})),
),
))
# Build the message object
msg = SimpleNamespace(
role="assistant",
content="\n".join(text_parts) if text_parts else None,
tool_calls=tool_calls if tool_calls else None,
)
# Build usage stats
usage_data = response.get("usage", {})
usage = SimpleNamespace(
prompt_tokens=usage_data.get("inputTokens", 0),
completion_tokens=usage_data.get("outputTokens", 0),
total_tokens=(
usage_data.get("inputTokens", 0) + usage_data.get("outputTokens", 0)
),
)
finish_reason = _converse_stop_reason_to_openai(stop_reason)
if tool_calls and finish_reason == "stop":
finish_reason = "tool_calls"
choice = SimpleNamespace(
index=0,
message=msg,
finish_reason=finish_reason,
)
return SimpleNamespace(
choices=[choice],
usage=usage,
model=response.get("modelId", ""),
)
# ---------------------------------------------------------------------------
# Streaming response conversion
# ---------------------------------------------------------------------------
def normalize_converse_stream_events(event_stream) -> SimpleNamespace:
"""Consume a Bedrock ConverseStream event stream and build an OpenAI-compatible response.
Processes the stream events in order:
- ``messageStart`` — role info
- ``contentBlockStart`` — new text or toolUse block
- ``contentBlockDelta`` — incremental text or toolUse input
- ``contentBlockStop`` — block complete
- ``messageStop`` — stop reason
- ``metadata`` — usage stats
Returns the same shape as ``normalize_converse_response()``.
"""
return stream_converse_with_callbacks(event_stream)
def stream_converse_with_callbacks(
event_stream,
on_text_delta=None,
on_tool_start=None,
on_reasoning_delta=None,
on_interrupt_check=None,
) -> SimpleNamespace:
"""Process a Bedrock ConverseStream event stream with real-time callbacks.
This is the core streaming function that powers both the CLI's live token
display and the gateway's progressive message updates.
Args:
event_stream: The boto3 ``converse_stream()`` response containing a
``stream`` key with an iterable of events.
on_text_delta: Called with each text chunk as it arrives. Only fires
when no tool_use blocks have been seen (same semantics as the
Anthropic and chat_completions streaming paths).
on_tool_start: Called with the tool name when a toolUse block begins.
Lets the TUI show a spinner while tool arguments are generated.
on_reasoning_delta: Called with reasoning/thinking text chunks.
Bedrock surfaces thinking via ``reasoning`` content block deltas
on supported models (Claude 4.6+).
on_interrupt_check: Called on each event. Should return True if the
agent has been interrupted and streaming should stop.
Returns:
An OpenAI-compatible SimpleNamespace response, identical in shape to
``normalize_converse_response()``.
"""
text_parts: List[str] = []
tool_calls: List[SimpleNamespace] = []
current_tool: Optional[Dict] = None
current_text_buffer: List[str] = []
has_tool_use = False
stop_reason = "end_turn"
usage_data: Dict[str, int] = {}
for event in event_stream.get("stream", []):
# Check for interrupt
if on_interrupt_check and on_interrupt_check():
break
if "contentBlockStart" in event:
start = event["contentBlockStart"].get("start", {})
if "toolUse" in start:
has_tool_use = True
# Flush any accumulated text
if current_text_buffer:
text_parts.append("".join(current_text_buffer))
current_text_buffer = []
current_tool = {
"toolUseId": start["toolUse"].get("toolUseId", ""),
"name": start["toolUse"].get("name", ""),
"input_json": "",
}
if on_tool_start:
on_tool_start(current_tool["name"])
elif "contentBlockDelta" in event:
delta = event["contentBlockDelta"].get("delta", {})
if "text" in delta:
text = delta["text"]
current_text_buffer.append(text)
# Fire text delta callback only when no tool calls are present
# (same semantics as Anthropic/chat_completions streaming)
if on_text_delta and not has_tool_use:
on_text_delta(text)
elif "toolUse" in delta:
if current_tool is not None:
current_tool["input_json"] += delta["toolUse"].get("input", "")
elif "reasoningContent" in delta:
# Claude 4.6+ on Bedrock surfaces thinking via reasoningContent
reasoning = delta["reasoningContent"]
if isinstance(reasoning, dict):
thinking_text = reasoning.get("text", "")
if thinking_text and on_reasoning_delta:
on_reasoning_delta(thinking_text)
elif "contentBlockStop" in event:
if current_tool is not None:
try:
input_dict = json.loads(current_tool["input_json"]) if current_tool["input_json"] else {}
except (json.JSONDecodeError, TypeError):
input_dict = {}
tool_calls.append(SimpleNamespace(
id=current_tool["toolUseId"],
type="function",
function=SimpleNamespace(
name=current_tool["name"],
arguments=json.dumps(input_dict),
),
))
current_tool = None
elif current_text_buffer:
text_parts.append("".join(current_text_buffer))
current_text_buffer = []
elif "messageStop" in event:
stop_reason = event["messageStop"].get("stopReason", "end_turn")
elif "metadata" in event:
meta_usage = event["metadata"].get("usage", {})
usage_data = {
"inputTokens": meta_usage.get("inputTokens", 0),
"outputTokens": meta_usage.get("outputTokens", 0),
}
# Flush remaining text
if current_text_buffer:
text_parts.append("".join(current_text_buffer))
msg = SimpleNamespace(
role="assistant",
content="\n".join(text_parts) if text_parts else None,
tool_calls=tool_calls if tool_calls else None,
)
usage = SimpleNamespace(
prompt_tokens=usage_data.get("inputTokens", 0),
completion_tokens=usage_data.get("outputTokens", 0),
total_tokens=(
usage_data.get("inputTokens", 0) + usage_data.get("outputTokens", 0)
),
)
finish_reason = _converse_stop_reason_to_openai(stop_reason)
if tool_calls and finish_reason == "stop":
finish_reason = "tool_calls"
choice = SimpleNamespace(
index=0,
message=msg,
finish_reason=finish_reason,
)
return SimpleNamespace(
choices=[choice],
usage=usage,
model="",
)
# ---------------------------------------------------------------------------
# High-level API: call Bedrock Converse
# ---------------------------------------------------------------------------
def build_converse_kwargs(
model: str,
messages: List[Dict],
tools: Optional[List[Dict]] = None,
max_tokens: int = 4096,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
stop_sequences: Optional[List[str]] = None,
guardrail_config: Optional[Dict] = None,
) -> Dict[str, Any]:
"""Build kwargs for ``bedrock-runtime.converse()`` or ``converse_stream()``.
Converts OpenAI-format inputs to Converse API parameters.
"""
system_prompt, converse_messages = convert_messages_to_converse(messages)
kwargs: Dict[str, Any] = {
"modelId": model,
"messages": converse_messages,
"inferenceConfig": {
"maxTokens": max_tokens,
},
}
if system_prompt:
kwargs["system"] = system_prompt
if temperature is not None:
kwargs["inferenceConfig"]["temperature"] = temperature
if top_p is not None:
kwargs["inferenceConfig"]["topP"] = top_p
if stop_sequences:
kwargs["inferenceConfig"]["stopSequences"] = stop_sequences
if tools:
converse_tools = convert_tools_to_converse(tools)
if converse_tools:
# Some Bedrock models don't support tool/function calling (e.g.
# DeepSeek R1, reasoning-only models). Sending toolConfig to
# these models causes a ValidationException → retry loop → failure.
# Strip tools for known non-tool-calling models and warn the user.
# Ref: PR #7920 feedback from @ptlally, pattern from PR #4346.
if _model_supports_tool_use(model):
kwargs["toolConfig"] = {"tools": converse_tools}
else:
logger.warning(
"Model %s does not support tool calling — tools stripped. "
"The agent will operate in text-only mode.", model
)
if guardrail_config:
kwargs["guardrailConfig"] = guardrail_config
return kwargs
def call_converse(
region: str,
model: str,
messages: List[Dict],
tools: Optional[List[Dict]] = None,
max_tokens: int = 4096,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
stop_sequences: Optional[List[str]] = None,
guardrail_config: Optional[Dict] = None,
) -> SimpleNamespace:
"""Call Bedrock Converse API (non-streaming) and return an OpenAI-compatible response.
This is the primary entry point for the agent loop when using the Bedrock provider.
"""
client = _get_bedrock_runtime_client(region)
kwargs = build_converse_kwargs(
model=model,
messages=messages,
tools=tools,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop_sequences=stop_sequences,
guardrail_config=guardrail_config,
)
try:
response = client.converse(**kwargs)
except Exception as exc:
if is_stale_connection_error(exc):
logger.warning(
"bedrock: stale-connection error on converse(region=%s, model=%s): "
"%s — evicting cached client so the next call reconnects.",
region, model, type(exc).__name__,
)
invalidate_runtime_client(region)
raise
return normalize_converse_response(response)
def call_converse_stream(
region: str,
model: str,
messages: List[Dict],
tools: Optional[List[Dict]] = None,
max_tokens: int = 4096,
temperature: Optional[float] = None,
top_p: Optional[float] = None,
stop_sequences: Optional[List[str]] = None,
guardrail_config: Optional[Dict] = None,
) -> SimpleNamespace:
"""Call Bedrock ConverseStream API and return an OpenAI-compatible response.
Consumes the full stream and returns the assembled response. For true
streaming with delta callbacks, use ``iter_converse_stream()`` instead.
"""
client = _get_bedrock_runtime_client(region)
kwargs = build_converse_kwargs(
model=model,
messages=messages,
tools=tools,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stop_sequences=stop_sequences,
guardrail_config=guardrail_config,
)
try:
response = client.converse_stream(**kwargs)
except Exception as exc:
if is_stale_connection_error(exc):
logger.warning(
"bedrock: stale-connection error on converse_stream(region=%s, "
"model=%s): %s — evicting cached client so the next call reconnects.",
region, model, type(exc).__name__,
)
invalidate_runtime_client(region)
raise
return normalize_converse_stream_events(response)
# ---------------------------------------------------------------------------
# Model discovery
# ---------------------------------------------------------------------------
_discovery_cache: Dict[str, Any] = {}
_DISCOVERY_CACHE_TTL_SECONDS = 3600
def reset_discovery_cache():
"""Clear the model discovery cache. Used in tests."""
_discovery_cache.clear()
def discover_bedrock_models(
region: str,
provider_filter: Optional[List[str]] = None,
) -> List[Dict[str, Any]]:
"""Discover available Bedrock foundation models and inference profiles.
Returns a list of model info dicts with keys:
- ``id``: Model ID (e.g. "anthropic.claude-sonnet-4-6-20250514-v1:0")
- ``name``: Human-readable name
- ``provider``: Model provider (e.g. "Anthropic", "Amazon", "Meta")
- ``input_modalities``: List of input types (e.g. ["TEXT", "IMAGE"])
- ``output_modalities``: List of output types
- ``streaming``: Whether streaming is supported
Caches results for 1 hour per region to avoid repeated API calls.
Mirrors OpenClaw's ``discoverBedrockModels()`` in
``extensions/amazon-bedrock/discovery.ts``.
"""
import time
cache_key = f"{region}:{','.join(sorted(provider_filter or []))}"
cached = _discovery_cache.get(cache_key)
if cached and (time.time() - cached["timestamp"]) < _DISCOVERY_CACHE_TTL_SECONDS:
return cached["models"]
try:
client = _get_bedrock_control_client(region)
except Exception as e:
logger.warning("Failed to create Bedrock client for model discovery: %s", e)
return []
models = []
seen_ids = set()
filter_set = {f.lower() for f in (provider_filter or [])}
# 1. Discover foundation models
try:
response = client.list_foundation_models()
for summary in response.get("modelSummaries", []):
model_id = (summary.get("modelId") or "").strip()
if not model_id:
continue
# Apply provider filter
if filter_set:
provider_name = (summary.get("providerName") or "").lower()
model_prefix = model_id.split(".")[0].lower() if "." in model_id else ""
if provider_name not in filter_set and model_prefix not in filter_set:
continue
# Only include active, streaming-capable, text-output models
lifecycle = summary.get("modelLifecycle", {})
if lifecycle.get("status", "").upper() != "ACTIVE":
continue
if not summary.get("responseStreamingSupported", False):
continue
output_mods = summary.get("outputModalities", [])
if "TEXT" not in output_mods:
continue
models.append({
"id": model_id,
"name": (summary.get("modelName") or model_id).strip(),
"provider": (summary.get("providerName") or "").strip(),
"input_modalities": summary.get("inputModalities", []),
"output_modalities": output_mods,
"streaming": True,
})
seen_ids.add(model_id.lower())
except Exception as e:
logger.warning("Failed to list Bedrock foundation models: %s", e)
# 2. Discover inference profiles (cross-region, better capacity)
try:
profiles = []
next_token = None
while True:
kwargs = {}
if next_token:
kwargs["nextToken"] = next_token
response = client.list_inference_profiles(**kwargs)
for profile in response.get("inferenceProfileSummaries", []):
profiles.append(profile)
next_token = response.get("nextToken")
if not next_token:
break
for profile in profiles:
profile_id = (profile.get("inferenceProfileId") or "").strip()
if not profile_id:
continue
if profile.get("status") != "ACTIVE":
continue
if profile_id.lower() in seen_ids:
continue
# Apply provider filter to underlying models
if filter_set:
profile_models = profile.get("models", [])
matches = any(
_extract_provider_from_arn(m.get("modelArn", "")).lower() in filter_set
for m in profile_models
)
if not matches:
continue
models.append({
"id": profile_id,
"name": (profile.get("inferenceProfileName") or profile_id).strip(),
"provider": "inference-profile",
"input_modalities": ["TEXT"],
"output_modalities": ["TEXT"],
"streaming": True,
})
seen_ids.add(profile_id.lower())
except Exception as e:
logger.debug("Skipping inference profile discovery: %s", e)
# Sort: global cross-region profiles first (recommended), then alphabetical
models.sort(key=lambda m: (
0 if m["id"].startswith("global.") else 1,
m["name"].lower(),
))
_discovery_cache[cache_key] = {
"timestamp": time.time(),
"models": models,
}
return models
def _extract_provider_from_arn(arn: str) -> str:
"""Extract the model provider from a Bedrock model ARN.
Example: "arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2"
"anthropic"
"""
match = re.search(r"foundation-model/([^.]+)", arn)
return match.group(1) if match else ""
def get_bedrock_model_ids(region: str) -> List[str]:
"""Return a flat list of available Bedrock model IDs for the given region.
Convenience wrapper around ``discover_bedrock_models()`` for use in
the model selection UI.
"""
models = discover_bedrock_models(region)
return [m["id"] for m in models]
# ---------------------------------------------------------------------------
# Error classification — Bedrock-specific exceptions
# ---------------------------------------------------------------------------
# Mirrors OpenClaw's classifyFailoverReason() and matchesContextOverflowError()
# in extensions/amazon-bedrock/register.sync.runtime.ts.
# Patterns that indicate the input context exceeded the model's token limit.
# Used by run_agent.py to trigger context compression instead of retrying.
CONTEXT_OVERFLOW_PATTERNS = [
re.compile(r"ValidationException.*(?:input is too long|max input token|input token.*exceed)", re.IGNORECASE),
re.compile(r"ValidationException.*(?:exceeds? the (?:maximum|max) (?:number of )?(?:input )?tokens)", re.IGNORECASE),
re.compile(r"ModelStreamErrorException.*(?:Input is too long|too many input tokens)", re.IGNORECASE),
]
# Patterns for throttling / rate limit errors — should trigger backoff + retry.
THROTTLE_PATTERNS = [
re.compile(r"ThrottlingException", re.IGNORECASE),
re.compile(r"Too many concurrent requests", re.IGNORECASE),
re.compile(r"ServiceQuotaExceededException", re.IGNORECASE),
]
# Patterns for transient overload — model is temporarily unavailable.
OVERLOAD_PATTERNS = [
re.compile(r"ModelNotReadyException", re.IGNORECASE),
re.compile(r"ModelTimeoutException", re.IGNORECASE),
re.compile(r"InternalServerException", re.IGNORECASE),
]
def is_context_overflow_error(error_message: str) -> bool:
"""Return True if the error indicates the input context was too large.
When this returns True, the agent should compress context and retry
rather than treating it as a fatal error.
"""
return any(p.search(error_message) for p in CONTEXT_OVERFLOW_PATTERNS)
def classify_bedrock_error(error_message: str) -> str:
"""Classify a Bedrock error for retry/failover decisions.
Returns:
- ``"context_overflow"`` — input too long, compress and retry
- ``"rate_limit"`` — throttled, backoff and retry
- ``"overloaded"`` — model temporarily unavailable, retry with delay
- ``"unknown"`` — unclassified error
"""
if is_context_overflow_error(error_message):
return "context_overflow"
if any(p.search(error_message) for p in THROTTLE_PATTERNS):
return "rate_limit"
if any(p.search(error_message) for p in OVERLOAD_PATTERNS):
return "overloaded"
return "unknown"
# ---------------------------------------------------------------------------
# Bedrock model context lengths
# ---------------------------------------------------------------------------
# Static fallback table for models where the Bedrock API doesn't expose
# context window sizes. Used by agent/model_metadata.py when dynamic
# detection is unavailable.
BEDROCK_CONTEXT_LENGTHS: Dict[str, int] = {
# Anthropic Claude models on Bedrock
"anthropic.claude-opus-4-6": 200_000,
"anthropic.claude-sonnet-4-6": 200_000,
"anthropic.claude-sonnet-4-5": 200_000,
"anthropic.claude-haiku-4-5": 200_000,
"anthropic.claude-opus-4": 200_000,
"anthropic.claude-sonnet-4": 200_000,
"anthropic.claude-3-5-sonnet": 200_000,
"anthropic.claude-3-5-haiku": 200_000,
"anthropic.claude-3-opus": 200_000,
"anthropic.claude-3-sonnet": 200_000,
"anthropic.claude-3-haiku": 200_000,
# Amazon Nova
"amazon.nova-pro": 300_000,
"amazon.nova-lite": 300_000,
"amazon.nova-micro": 128_000,
# Meta Llama
"meta.llama4-maverick": 128_000,
"meta.llama4-scout": 128_000,
"meta.llama3-3-70b-instruct": 128_000,
# Mistral
"mistral.mistral-large": 128_000,
# DeepSeek
"deepseek.v3": 128_000,
}
# Default for unknown Bedrock models
BEDROCK_DEFAULT_CONTEXT_LENGTH = 128_000
def get_bedrock_context_length(model_id: str) -> int:
"""Look up the context window size for a Bedrock model.
Uses substring matching so versioned IDs like
``anthropic.claude-sonnet-4-6-20250514-v1:0`` resolve correctly.
"""
model_lower = model_id.lower()
best_key = ""
best_val = BEDROCK_DEFAULT_CONTEXT_LENGTH
for key, val in BEDROCK_CONTEXT_LENGTHS.items():
if key in model_lower and len(key) > len(best_key):
best_key = key
best_val = val
return best_val