feat(stream-retry): add upstream + timing diagnostics to drop log (#23005)

The previous PR (#22993) gave us a structured WARNING per stream drop
but the only diagnostic was 'error_type=APIError error=Network
connection lost.' — same nothing the user started with. To actually
diagnose why subagents drop streams disproportionately we need to know
WHERE the drop happened.

Adds three breadcrumbs to the agent.log WARNING:

1. Inner exception chain. openai SDK wraps httpx errors as
   APIConnectionError / APIError so the catch site only sees the
   wrapper. _flatten_exception_chain walks __cause__/__context__ up to
   4 levels deep and renders 'Outer(msg) <- Inner(msg)' so we can
   tell ConnectError vs RemoteProtocolError vs ReadError vs
   ProxyError without enabling verbose mode.

2. Upstream HTTP headers. Snapshots cf-ray, x-openrouter-provider,
   x-openrouter-model, x-openrouter-id, x-request-id, server, via,
   etc. from stream.response immediately after open (so they survive
   even when the stream dies before the first chunk). These answer
   'is one CF edge / one downstream provider responsible, or random?'

3. Per-attempt counters. bytes streamed, chunk count, elapsed time on
   the dying attempt, and time-to-first-byte. Distinguishes 'couldn't
   connect at all' (0s, 0 bytes) from 'died after 30s mid-stream'
   (very different root causes — first is auth/routing, second is
   upstream idle-kill or proxy timeout).

Plumbing:

- _stream_diag_init / _stream_diag_capture_response live on AIAgent
  and produce a per-attempt dict held on request_client_holder['diag']
  for closure access from the retry block.
- _call_chat_completions and _call_anthropic both initialize the diag
  and increment counters per chunk/event (best-effort, never raises in
  the streaming hot path).
- _log_stream_retry / _emit_stream_drop accept an optional diag and
  render the new fields. Final-exhaustion log goes through the same
  helper so it gets the same diagnostic dump.
- UI status line gains a brief 'after Xs' suffix when timing is
  available — distinguishes 'connect failed' from 'died mid-stream'
  at a glance without grepping logs.

Sample WARNING after this change:

  Stream drop mid tool-call on attempt 2/3 — retrying.
    subagent_id=sa-2-cafef00d depth=1 provider=openrouter
    base_url=https://openrouter.ai/api/v1
    error_type=APIError error=Connection error.
    chain=APIError(Connection error.) <- RemoteProtocolError(peer
      closed connection without sending complete message body)
    http_status=200 bytes=12400 chunks=47 elapsed=12.00s ttfb=0.83s
    upstream=[cf-ray=8f1a2b3c4d5e6f7g-LAX
      x-openrouter-provider=Anthropic
      x-openrouter-id=gen-abc123 server=cloudflare]

Tests: 10 covering diag init, header capture (whitelist enforced for
PII), exception-chain walking + depth cap, log content with full diag,
log content without diag (placeholders), UI elapsed-suffix on/off.
This commit is contained in:
Teknium 2026-05-09 22:49:35 -07:00 committed by GitHub
parent 5a70d9b6be
commit 126cbffb8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 420 additions and 120 deletions

View file

@ -2822,6 +2822,100 @@ class AIAgent:
except Exception: except Exception:
logger.debug("status_callback error in _emit_warning", exc_info=True) logger.debug("status_callback error in _emit_warning", exc_info=True)
# Headers we capture from the dying stream's HTTP response so post-mortem
# diagnosis can answer "which CF edge / which OpenRouter downstream
# provider / which request id". Names are written lowercase and looked up
# one by one with ``headers.get(name)``; httpx exposes response headers as
# ``httpx.Headers``, which matches names case-insensitively.
# Anything not listed here is never copied into the diagnostic dict.
# NOTE(review): "x-forwarded-for" typically carries client IP addresses —
# confirm it is intentionally allowed into agent.log.
_STREAM_DIAG_HEADERS = (
"cf-ray",
"cf-cache-status",
"x-openrouter-provider",
"x-openrouter-model",
"x-openrouter-id",
"x-request-id",
"x-vercel-id",
"via",
"server",
"x-forwarded-for",
)
@staticmethod
def _stream_diag_init() -> Dict[str, Any]:
"""Return a fresh per-attempt diagnostic dict.
Mutated in-place by the streaming functions and read from the retry
block when a stream dies. Lives on ``request_client_holder`` so it
survives across the closure boundary.
"""
return {
"started_at": time.time(),
"first_chunk_at": None,
"chunks": 0,
"bytes": 0,
"headers": {},
"http_status": None,
}
def _stream_diag_capture_response(
self, diag: Dict[str, Any], http_response: Any
) -> None:
"""Snapshot interesting headers + HTTP status from the live stream.
Called once at stream open (before iterating chunks) so the metadata
survives even if the stream dies before any chunk arrives. Failures
are swallowed diag is best-effort.
"""
if http_response is None or not isinstance(diag, dict):
return
try:
diag["http_status"] = getattr(http_response, "status_code", None)
except Exception:
pass
try:
headers = getattr(http_response, "headers", None) or {}
captured: Dict[str, str] = {}
for name in self._STREAM_DIAG_HEADERS:
try:
val = headers.get(name)
if val:
# Truncate single-value to keep log lines bounded.
captured[name] = str(val)[:120]
except Exception:
continue
diag["headers"] = captured
except Exception:
pass
@staticmethod
def _flatten_exception_chain(error: BaseException) -> str:
"""Return a compact ``Outer(msg) <- Inner(msg) <- ...`` rendering.
OpenAI SDK wraps httpx errors as ``APIConnectionError`` /
``APIError`` and only the wrapper's class is visible at the catch
site but the underlying ``RemoteProtocolError`` /
``ConnectError`` / ``ReadError`` is what tells us WHY the stream
died. Walks ``__cause__`` then ``__context__`` (deduped, max 4
deep) to surface the chain in one line.
"""
seen: List[BaseException] = []
link: Optional[BaseException] = error
while link is not None and len(seen) < 4:
if link in seen:
break
seen.append(link)
nxt = getattr(link, "__cause__", None) or getattr(
link, "__context__", None
)
if nxt is None or nxt is link:
break
link = nxt
parts: List[str] = []
for e in seen:
msg = str(e).strip().replace("\n", " ")
if len(msg) > 140:
msg = msg[:140] + ""
parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__)
return " <- ".join(parts) if parts else type(error).__name__
def _log_stream_retry( def _log_stream_retry(
self, self,
*, *,
@ -2830,6 +2924,7 @@ class AIAgent:
attempt: int, attempt: int,
max_attempts: int, max_attempts: int,
mid_tool_call: bool, mid_tool_call: bool,
diag: Optional[Dict[str, Any]] = None,
) -> None: ) -> None:
"""Record a transient stream-drop and retry to ``agent.log``. """Record a transient stream-drop and retry to ``agent.log``.
@ -2837,6 +2932,13 @@ class AIAgent:
of UI verbosity. Subagents in particular benefit because their of UI verbosity. Subagents in particular benefit because their
retries no longer spam the parent's terminal — but the file log keeps retries no longer spam the parent's terminal — but the file log keeps
full detail (provider, error class, attempt, base_url, subagent_id). full detail (provider, error class, attempt, base_url, subagent_id).
When *diag* is provided (the per-attempt stream-diagnostic dict from
``_stream_diag_init``), the WARNING also captures upstream headers
(cf-ray, x-openrouter-provider, x-openrouter-id), HTTP status, bytes
streamed before the drop, and elapsed time on the dying attempt.
These are the breadcrumbs needed to answer "is one CF edge / one
downstream provider responsible, or is it random across runs?"
""" """
try: try:
try: try:
@ -2845,10 +2947,47 @@ class AIAgent:
_summary = str(error) _summary = str(error)
if _summary and len(_summary) > 240: if _summary and len(_summary) > 240:
_summary = _summary[:240] + "" _summary = _summary[:240] + ""
# Inner-cause chain (httpx errors hide under openai.APIError).
try:
_chain = self._flatten_exception_chain(error)
except Exception:
_chain = type(error).__name__
# Per-attempt counters and upstream headers.
_now = time.time()
_bytes = 0
_chunks = 0
_elapsed = 0.0
_ttfb = None
_headers_repr = "-"
_http_status = "-"
if isinstance(diag, dict):
try:
_bytes = int(diag.get("bytes") or 0)
_chunks = int(diag.get("chunks") or 0)
_started = float(diag.get("started_at") or _now)
_elapsed = max(0.0, _now - _started)
_first = diag.get("first_chunk_at")
if _first is not None:
_ttfb = max(0.0, float(_first) - _started)
headers = diag.get("headers") or {}
if isinstance(headers, dict) and headers:
_headers_repr = " ".join(
f"{k}={v}" for k, v in headers.items()
)
if diag.get("http_status") is not None:
_http_status = str(diag.get("http_status"))
except Exception:
pass
logger.warning( logger.warning(
"Stream %s on attempt %s/%s — retrying. " "Stream %s on attempt %s/%s — retrying. "
"subagent_id=%s depth=%s provider=%s base_url=%s " "subagent_id=%s depth=%s provider=%s base_url=%s "
"error_type=%s error=%s", "error_type=%s error=%s "
"chain=%s "
"http_status=%s bytes=%d chunks=%d elapsed=%.2fs ttfb=%s "
"upstream=[%s]",
kind, kind,
attempt, attempt,
max_attempts, max_attempts,
@ -2858,6 +2997,13 @@ class AIAgent:
self.base_url or "-", self.base_url or "-",
type(error).__name__, type(error).__name__,
_summary, _summary,
_chain,
_http_status,
_bytes,
_chunks,
_elapsed,
f"{_ttfb:.2f}s" if _ttfb is not None else "-",
_headers_repr,
extra={"mid_tool_call": mid_tool_call}, extra={"mid_tool_call": mid_tool_call},
) )
except Exception: except Exception:
@ -2870,6 +3016,7 @@ class AIAgent:
attempt: int, attempt: int,
max_attempts: int, max_attempts: int,
mid_tool_call: bool, mid_tool_call: bool,
diag: Optional[Dict[str, Any]] = None,
) -> None: ) -> None:
"""Emit a single user-visible line for a stream drop+retry. """Emit a single user-visible line for a stream drop+retry.
@ -2877,13 +3024,14 @@ class AIAgent:
parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix`` parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix``
so they're easy to attribute. All cases also write a structured so they're easy to attribute. All cases also write a structured
WARNING to ``agent.log`` via :meth:`_log_stream_retry` with the full WARNING to ``agent.log`` via :meth:`_log_stream_retry` with the full
diagnostic detail (subagent_id, provider, base_url, error_type) for diagnostic detail (subagent_id, provider, base_url, error_type,
post-hoc analysis. cf-ray, x-openrouter-provider, bytes/chunks, elapsed) for post-hoc
analysis.
Replaces the older two-line `` Connection dropped `` + The user-visible status line is intentionally compact: provider,
``🔄 Reconnected `` pair with a single information-dense line that error class, attempt N/M, plus ``after Xs`` when the stream dropped
names the provider (so multi-provider sessions can tell who dropped) mid-flight. Full diagnostic detail goes to ``agent.log`` only
and the error class without ambiguity. ``hermes logs --level WARNING | grep "Stream drop"`` to inspect.
""" """
kind = "drop mid tool-call" if mid_tool_call else "drop" kind = "drop mid tool-call" if mid_tool_call else "drop"
self._log_stream_retry( self._log_stream_retry(
@ -2892,11 +3040,23 @@ class AIAgent:
attempt=attempt, attempt=attempt,
max_attempts=max_attempts, max_attempts=max_attempts,
mid_tool_call=mid_tool_call, mid_tool_call=mid_tool_call,
diag=diag,
) )
provider = self.provider or "provider" provider = self.provider or "provider"
# Compose a brief "after Xs" suffix when we have timing data — helps
# the user distinguish "couldn't connect" (0s) from "died after 30s
# of streaming" (likely upstream idle-kill or proxy timeout).
_suffix = ""
if isinstance(diag, dict):
try:
started = diag.get("started_at")
if started is not None:
_suffix = f" after {max(0.0, time.time() - float(started)):.1f}s"
except Exception:
pass
try: try:
self._emit_status( self._emit_status(
f"⚠️ {provider} stream {kind} ({type(error).__name__}) " f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} "
f"— reconnecting, retry {attempt}/{max_attempts}" f"— reconnecting, retry {attempt}/{max_attempts}"
) )
self._touch_activity( self._touch_activity(
@ -7382,7 +7542,7 @@ class AIAgent:
return result["response"] return result["response"]
result = {"response": None, "error": None, "partial_tool_names": []} result = {"response": None, "error": None, "partial_tool_names": []}
request_client_holder = {"client": None} request_client_holder = {"client": None, "diag": None}
first_delta_fired = {"done": False} first_delta_fired = {"done": False}
deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback)
# Wall-clock timestamp of the last real streaming chunk. The outer # Wall-clock timestamp of the last real streaming chunk. The outer
@ -7444,12 +7604,21 @@ class AIAgent:
# attempt's start, not a previous attempt's last chunk. # attempt's start, not a previous attempt's last chunk.
last_chunk_time["t"] = time.time() last_chunk_time["t"] = time.time()
self._touch_activity("waiting for provider response (streaming)") self._touch_activity("waiting for provider response (streaming)")
# Initialize per-attempt stream diagnostics so the retry block can
# reach for them after the stream dies. Lives on
# ``request_client_holder["diag"]`` for closure access.
_diag = self._stream_diag_init()
request_client_holder["diag"] = _diag
stream = request_client_holder["client"].chat.completions.create(**stream_kwargs) stream = request_client_holder["client"].chat.completions.create(**stream_kwargs)
# Capture rate limit headers from the initial HTTP response. # Capture rate limit headers from the initial HTTP response.
# The OpenAI SDK Stream object exposes the underlying httpx # The OpenAI SDK Stream object exposes the underlying httpx
# response via .response before any chunks are consumed. # response via .response before any chunks are consumed.
self._capture_rate_limits(getattr(stream, "response", None)) self._capture_rate_limits(getattr(stream, "response", None))
# Snapshot diagnostic headers (cf-ray, x-openrouter-provider, etc.)
# so they survive even when the stream dies before any chunk
# arrives. Best-effort; never raises.
self._stream_diag_capture_response(_diag, getattr(stream, "response", None))
# Log OpenRouter response cache status when present. # Log OpenRouter response cache status when present.
self._check_openrouter_cache_status(getattr(stream, "response", None)) self._check_openrouter_cache_status(getattr(stream, "response", None))
@ -7472,6 +7641,24 @@ class AIAgent:
last_chunk_time["t"] = time.time() last_chunk_time["t"] = time.time()
self._touch_activity("receiving stream response") self._touch_activity("receiving stream response")
# Update per-attempt diagnostic counters. Best-effort —
# failures are swallowed so the streaming hot path is never
# interrupted by diagnostic accounting.
try:
_diag["chunks"] = int(_diag.get("chunks", 0)) + 1
if _diag.get("first_chunk_at") is None:
_diag["first_chunk_at"] = last_chunk_time["t"]
# Approximate byte size from the chunk's repr — exact wire
# bytes aren't exposed by the SDK, but len(repr(chunk)) is
# a stable proxy for "how much content arrived" that
# survives stub provider differences.
try:
_diag["bytes"] = int(_diag.get("bytes", 0)) + len(repr(chunk))
except Exception:
pass
except Exception:
pass
if self._interrupt_requested: if self._interrupt_requested:
break break
@ -7666,8 +7853,21 @@ class AIAgent:
# Reset stale-stream timer for this attempt # Reset stale-stream timer for this attempt
last_chunk_time["t"] = time.time() last_chunk_time["t"] = time.time()
# Per-attempt diagnostic dict for the retry block to consume.
_diag = self._stream_diag_init()
request_client_holder["diag"] = _diag
# Use the Anthropic SDK's streaming context manager # Use the Anthropic SDK's streaming context manager
with self._anthropic_client.messages.stream(**api_kwargs) as stream: with self._anthropic_client.messages.stream(**api_kwargs) as stream:
# The Anthropic SDK exposes the raw httpx response on
# ``stream.response``. Snapshot diagnostic headers
# immediately so they survive a stream that dies before the
# first event.
try:
self._stream_diag_capture_response(
_diag, getattr(stream, "response", None)
)
except Exception:
pass
for event in stream: for event in stream:
# Update stale-stream timer on every event so the # Update stale-stream timer on every event so the
# outer poll loop knows data is flowing. Without # outer poll loop knows data is flowing. Without
@ -7678,6 +7878,18 @@ class AIAgent:
last_chunk_time["t"] = time.time() last_chunk_time["t"] = time.time()
self._touch_activity("receiving stream response") self._touch_activity("receiving stream response")
# Update per-attempt diagnostic counters (best-effort).
try:
_diag["chunks"] = int(_diag.get("chunks", 0)) + 1
if _diag.get("first_chunk_at") is None:
_diag["first_chunk_at"] = last_chunk_time["t"]
try:
_diag["bytes"] = int(_diag.get("bytes", 0)) + len(repr(event))
except Exception:
pass
except Exception:
pass
if self._interrupt_requested: if self._interrupt_requested:
break break
@ -7831,6 +8043,7 @@ class AIAgent:
attempt=_stream_attempt + 2, attempt=_stream_attempt + 2,
max_attempts=_max_stream_retries + 1, max_attempts=_max_stream_retries + 1,
mid_tool_call=True, mid_tool_call=True,
diag=request_client_holder.get("diag"),
) )
stale = request_client_holder.get("client") stale = request_client_holder.get("client")
if stale is not None: if stale is not None:
@ -7885,6 +8098,7 @@ class AIAgent:
attempt=_stream_attempt + 2, attempt=_stream_attempt + 2,
max_attempts=_max_stream_retries + 1, max_attempts=_max_stream_retries + 1,
mid_tool_call=False, mid_tool_call=False,
diag=request_client_holder.get("diag"),
) )
# Close the stale request client before retry # Close the stale request client before retry
stale = request_client_holder.get("client") stale = request_client_holder.get("client")
@ -7903,19 +8117,18 @@ class AIAgent:
pass pass
continue continue
# Retries exhausted. Log the final failure with # Retries exhausted. Log the final failure with
# full diagnostic detail and surface a status # full diagnostic detail (chain, headers,
# line — subagent lines get the ``[subagent-N]`` # bytes/elapsed) via the same helper used for
# log_prefix so the parent can attribute them. # mid-flight retries — subagent lines get the
logger.warning( # ``[subagent-N]`` log_prefix so the parent can
"Streaming exhausted %s retries on transient " # attribute them.
"error: subagent_id=%s depth=%s provider=%s " self._log_stream_retry(
"error_type=%s error=%s", kind="exhausted",
_max_stream_retries + 1, error=e,
getattr(self, "_subagent_id", None) or "-", attempt=_max_stream_retries + 1,
getattr(self, "_delegate_depth", 0), max_attempts=_max_stream_retries + 1,
self.provider or "-", mid_tool_call=False,
type(e).__name__, diag=request_client_holder.get("diag"),
e,
) )
self._emit_status( self._emit_status(
"❌ Connection to provider failed after " "❌ Connection to provider failed after "

View file

@ -1,24 +1,24 @@
"""Tests for the structured stream-drop log + clearer single-line status. """Tests for richer stream-drop diagnostics in agent.log.
Regression coverage for the change that: When a subagent's stream drops mid-tool-call, the WARNING in agent.log must
carry enough breadcrumbs to answer "WHY did it drop" without requiring a
verbose-mode rerun. Specifically:
1. Removed ``logger.setLevel(ERROR)`` on tools/run_agent/etc. in quiet mode. - Inner exception chain (httpx errors wrapped by openai SDK)
It was clobbering the root logger's file handlers (agent.log/errors.log) - Upstream HTTP headers (cf-ray, x-openrouter-provider, x-openrouter-id, ...)
because Python checks logger-level before handler propagation, so - HTTP status of the dying response
subagent stream-drop diagnostics were never written to disk. - Bytes streamed and chunks received before the drop
2. Replaced the two `` Connection dropped `` + ``🔄 Reconnected `` - Elapsed time on the attempt + time-to-first-byte
``_emit_status`` calls with a single ``_emit_stream_drop`` helper that:
- Always writes a structured WARNING to ``agent.log``. Plus the user-visible UI line gains an ``after Xs`` suffix when timing data
- Always emits exactly ONE user-visible status line per drop (no is available, distinguishing "couldn't connect at all" from "died mid-stream
follow-up "Reconnected" line) that names the provider and error after N seconds" (very different root causes).
class so multi-provider sessions can attribute it cleanly.
- Subagent lines get the ``[subagent-N]`` ``log_prefix`` automatically
via ``_emit_status`` ``_vprint``.
""" """
from __future__ import annotations from __future__ import annotations
import logging import logging
import time
from unittest.mock import patch from unittest.mock import patch
import pytest import pytest
@ -37,124 +37,211 @@ def _make_agent() -> AIAgent:
) )
def test_quiet_mode_does_not_clobber_runagent_logger_level(): def test_stream_diag_init_returns_well_formed_dict():
"""quiet_mode must not raise the ``run_agent`` logger above WARNING. diag = AIAgent._stream_diag_init()
assert "started_at" in diag
Setting ``setLevel(ERROR)`` on the logger filters records *before* root assert diag["chunks"] == 0
logger handlers (agent.log/errors.log) ever see them. Stream-drop assert diag["bytes"] == 0
diagnostics must reach the file handlers regardless of console verbosity. assert diag["first_chunk_at"] is None
""" assert diag["http_status"] is None
_ = _make_agent() assert diag["headers"] == {}
for name in ("run_agent", "tools", "trajectory_compressor", "cron", "hermes_cli"):
logger = logging.getLogger(name)
assert logger.getEffectiveLevel() <= logging.WARNING, (
f"{name} logger blocked at level {logger.getEffectiveLevel()}"
f"file handlers will lose WARNING records"
)
def test_log_stream_retry_writes_structured_warning(caplog): class _FakeHeaders:
def __init__(self, d): self._d = {k.lower(): v for k, v in d.items()}
def get(self, k, default=None): return self._d.get(k.lower(), default)
class _FakeResponse:
    """Minimal stand-in for an HTTP response: a status code plus headers."""

    def __init__(self, headers, status=200):
        # Wrap the plain dict so lookups go through _FakeHeaders.get().
        self.headers = _FakeHeaders(headers)
        self.status_code = status
def test_stream_diag_capture_response_collects_known_headers():
    """Whitelisted headers and the HTTP status are copied into the diag."""
    upstream_headers = {
        "cf-ray": "8f1a2b3c4d5e6f7g-LAX",
        "x-openrouter-provider": "Anthropic",
        "x-openrouter-id": "gen-abc123",
        "x-request-id": "req-xyz",
        "server": "cloudflare",
        "irrelevant-header": "ignored",
    }
    agent = _make_agent()
    diag = AIAgent._stream_diag_init()

    agent._stream_diag_capture_response(diag, _FakeResponse(upstream_headers))

    assert diag["http_status"] == 200
    captured = diag["headers"]
    for name in ("cf-ray", "x-openrouter-provider", "x-openrouter-id", "server"):
        assert captured[name] == upstream_headers[name]
    # Headers not in _STREAM_DIAG_HEADERS must not be captured (PII surface).
    assert "irrelevant-header" not in captured
def test_stream_diag_capture_response_safe_with_none():
    """A missing response is a silent no-op: no exception, diag untouched."""
    diag = AIAgent._stream_diag_init()
    _make_agent()._stream_diag_capture_response(diag, None)
    assert diag["headers"] == {}
def test_flatten_exception_chain_walks_cause():
    """Every ``__cause__`` link of a three-deep chain shows up in the output."""
    innermost = ConnectionError("upstream closed")
    wrapped = TimeoutError("timed out")
    outermost = RuntimeError("wrapper")
    wrapped.__cause__ = innermost
    outermost.__cause__ = wrapped

    rendered = AIAgent._flatten_exception_chain(outermost)

    for expected in ("RuntimeError", "TimeoutError", "ConnectionError", " <- "):
        assert expected in rendered
def test_flatten_exception_chain_caps_depth():
    """Chain renders exactly the 4 outermost layers so log lines stay bounded.

    Builds an 8-deep ``__cause__`` chain (messages "7" outermost down to
    "0" innermost) and pins the full rendering.  An upper bound on the
    separator count alone would also pass if the chain were never walked.
    """
    prev = ValueError("0")
    for i in range(1, 8):
        nxt = ValueError(str(i))
        nxt.__cause__ = prev
        prev = nxt
    chain = AIAgent._flatten_exception_chain(prev)
    # 4 layers + 3 separators, starting from the outermost exception.
    assert chain == (
        "ValueError(7) <- ValueError(6) <- ValueError(5) <- ValueError(4)"
    )
    assert chain.count("<-") == 3
def test_log_stream_retry_includes_diagnostic_fields(caplog):
agent = _make_agent() agent = _make_agent()
agent._delegate_depth = 1 agent._delegate_depth = 1
agent._subagent_id = "sa-7-cafef00d" agent._subagent_id = "sa-3-deadbeef"
agent.provider = "openrouter" agent.provider = "openrouter"
err = ConnectionError("Network connection lost.") diag = AIAgent._stream_diag_init()
diag["http_status"] = 200
diag["headers"] = {
"cf-ray": "8f1a2b3c4d5e6f7g-LAX",
"x-openrouter-provider": "Anthropic",
"x-openrouter-id": "gen-xyz789",
}
diag["chunks"] = 12
diag["bytes"] = 4096
# Simulate 5s elapsed with first chunk at 0.5s.
diag["started_at"] = time.time() - 5.0
diag["first_chunk_at"] = diag["started_at"] + 0.5
inner = ConnectionError("peer closed")
outer = RuntimeError("Connection error.")
outer.__cause__ = inner
with caplog.at_level(logging.WARNING, logger="run_agent"): with caplog.at_level(logging.WARNING, logger="run_agent"):
agent._log_stream_retry( agent._log_stream_retry(
kind="drop mid tool-call", kind="drop mid tool-call",
error=err, error=outer,
attempt=2, attempt=2,
max_attempts=3, max_attempts=3,
mid_tool_call=True, mid_tool_call=True,
diag=diag,
) )
matching = [r for r in caplog.records if "Stream drop mid tool-call" in r.getMessage()] msg = next(
assert matching, f"no stream-drop WARNING captured; got {[r.getMessage() for r in caplog.records]}" r.getMessage() for r in caplog.records
msg = matching[0].getMessage() if "Stream drop mid tool-call" in r.getMessage()
assert "subagent_id=sa-7-cafef00d" in msg
assert "depth=1" in msg
assert "provider=openrouter" in msg
assert "base_url=https://openrouter.ai/api/v1" in msg
assert "error_type=ConnectionError" in msg
@pytest.mark.parametrize("depth", [0, 1])
def test_emit_stream_drop_emits_status_line(depth):
"""Both top-level and subagent paths emit exactly one status line.
Subagent lines get the ``[subagent-N]`` log_prefix via the parent's
``_vprint`` plumbing this test only checks that ``_emit_status`` is
called once with the right content. No "Reconnected" follow-up.
"""
agent = _make_agent()
agent._delegate_depth = depth
if depth > 0:
agent._subagent_id = "sa-2-cafe"
agent.provider = "openrouter"
with patch.object(agent, "_emit_status") as mock_emit:
agent._emit_stream_drop(
error=ConnectionError("boom"),
attempt=2,
max_attempts=3,
mid_tool_call=True,
)
assert mock_emit.call_count == 1, (
f"expected exactly one _emit_status call (no Reconnected follow-up), "
f"got {mock_emit.call_count}"
) )
msg = mock_emit.call_args.args[0]
assert "openrouter" in msg, f"provider name missing from status: {msg}" # Identity
assert "stream drop" in msg assert "subagent_id=sa-3-deadbeef" in msg
assert "ConnectionError" in msg assert "provider=openrouter" in msg
assert "retry 2/3" in msg
# Single line — no separate "Reconnected" message. But the line itself # Inner-cause chain
# should mention reconnecting so the user knows we're recovering. assert "RuntimeError" in msg and "ConnectionError" in msg
assert "reconnect" in msg.lower()
# Counters and timing
assert "http_status=200" in msg
assert "bytes=4096" in msg
assert "chunks=12" in msg
# elapsed should be roughly 5s; allow some slack.
assert "elapsed=" in msg
assert "ttfb=0.50s" in msg
# Upstream headers
assert "cf-ray=8f1a2b3c4d5e6f7g-LAX" in msg
assert "x-openrouter-provider=Anthropic" in msg
assert "x-openrouter-id=gen-xyz789" in msg
@pytest.mark.parametrize("mid_tool_call", [True, False]) def test_log_stream_retry_works_without_diag(caplog):
def test_emit_stream_drop_always_writes_to_log(caplog, mid_tool_call): """diag is optional — older callers / unit tests still work."""
"""Both subagent and top-level drops produce a WARNING in agent.log."""
agent = _make_agent() agent = _make_agent()
agent._delegate_depth = 1 if mid_tool_call else 0 agent._delegate_depth = 0
agent.provider = "openrouter" agent.provider = "openrouter"
if mid_tool_call:
agent._subagent_id = "sa-99-feed"
with caplog.at_level(logging.WARNING, logger="run_agent"): with caplog.at_level(logging.WARNING, logger="run_agent"):
agent._emit_stream_drop( agent._log_stream_retry(
error=TimeoutError("read timeout"), kind="drop",
error=ConnectionError("x"),
attempt=2, attempt=2,
max_attempts=3, max_attempts=3,
mid_tool_call=mid_tool_call, mid_tool_call=False,
) )
found = [r for r in caplog.records if r.getMessage().startswith("Stream drop")] msg = next(r.getMessage() for r in caplog.records if "Stream drop" in r.getMessage())
assert found, "expected at least one Stream drop WARNING record" # Without diag, the structured fields show "-" placeholders.
msg = found[0].getMessage() assert "http_status=-" in msg
assert "error_type=TimeoutError" in msg assert "upstream=[-]" in msg
assert "provider=openrouter" in msg assert "bytes=0" in msg
assert "chunks=0" in msg
assert "ttfb=-" in msg
def test_emit_stream_drop_provider_named_when_multi_provider(): def test_emit_stream_drop_ui_includes_elapsed_when_available():
"""The user-visible line must name the provider so multi-provider
sessions can tell which subagent dropped (the original two-line message
only said 'provider', forcing a log dive)."""
agent = _make_agent() agent = _make_agent()
agent._delegate_depth = 1 agent.provider = "openrouter"
agent._subagent_id = "sa-1"
agent.provider = "anthropic" diag = AIAgent._stream_diag_init()
diag["started_at"] = time.time() - 8.0 # 8s on the wire before drop
with patch.object(agent, "_emit_status") as mock_emit: with patch.object(agent, "_emit_status") as mock_emit:
agent._emit_stream_drop( agent._emit_stream_drop(
error=ConnectionError("x"), error=ConnectionError("x"),
attempt=3, attempt=2,
max_attempts=3,
mid_tool_call=True,
diag=diag,
)
msg = mock_emit.call_args.args[0]
# Suffix with elapsed time helps distinguish "couldn't connect" (0s)
# from "died mid-stream after a while".
assert "after" in msg and "s" in msg
def test_emit_stream_drop_ui_omits_suffix_without_diag():
"""When there's no diag, no suffix — line stays compact."""
agent = _make_agent()
agent.provider = "openrouter"
with patch.object(agent, "_emit_status") as mock_emit:
agent._emit_stream_drop(
error=ConnectionError("x"),
attempt=2,
max_attempts=3, max_attempts=3,
mid_tool_call=False, mid_tool_call=False,
) )
msg = mock_emit.call_args.args[0] msg = mock_emit.call_args.args[0]
assert "anthropic" in msg # No "after Xs" suffix when diag is not provided.
assert " after " not in msg
# Still names the provider and error class.
assert "openrouter" in msg
assert "ConnectionError" in msg
def test_quiet_mode_does_not_clobber_runagent_logger_level():
"""Regression guard for the parent fix — must persist across this PR."""
_ = _make_agent()
for name in ("run_agent", "tools", "trajectory_compressor", "cron", "hermes_cli"):
logger = logging.getLogger(name)
assert logger.getEffectiveLevel() <= logging.WARNING