From 126cbffb8ad5a9f0fb9bd99a1569d36c40a570fe Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sat, 9 May 2026 22:49:35 -0700 Subject: [PATCH] feat(stream-retry): add upstream + timing diagnostics to drop log (#23005) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous PR (#22993) gave us a structured WARNING per stream drop but the only diagnostic was 'error_type=APIError error=Network connection lost.' — same nothing the user started with. To actually diagnose why subagents drop streams disproportionately we need to know WHERE the drop happened. Adds three breadcrumbs to the agent.log WARNING: 1. Inner exception chain. openai SDK wraps httpx errors as APIConnectionError / APIError so the catch site only sees the wrapper. _flatten_exception_chain walks __cause__/__context__ up to 4 levels deep and renders 'Outer(msg) <- Inner(msg)' so we can tell ConnectError vs RemoteProtocolError vs ReadError vs ProxyError without enabling verbose mode. 2. Upstream HTTP headers. Snapshots cf-ray, x-openrouter-provider, x-openrouter-model, x-openrouter-id, x-request-id, server, via, etc. from stream.response immediately after open (so they survive even when the stream dies before the first chunk). These answer 'is one CF edge / one downstream provider responsible, or random?' 3. Per-attempt counters. bytes streamed, chunk count, elapsed time on the dying attempt, and time-to-first-byte. Distinguishes 'couldn't connect at all' (0s, 0 bytes) from 'died after 30s mid-stream' (very different root causes — first is auth/routing, second is upstream idle-kill or proxy timeout). Plumbing: - _stream_diag_init / _stream_diag_capture_response live on AIAgent and produce a per-attempt dict held on request_client_holder['diag'] for closure access from the retry block. 
- _call_chat_completions and _call_anthropic both initialize the diag and increment counters per chunk/event (best-effort, never raises in the streaming hot path). - _log_stream_retry / _emit_stream_drop accept an optional diag and render the new fields. Final-exhaustion log goes through the same helper so it gets the same diagnostic dump. - UI status line gains a brief 'after Xs' suffix when timing is available — distinguishes 'connect failed' from 'died mid-stream' at a glance without grepping logs. Sample WARNING after this change: Stream drop mid tool-call on attempt 2/3 — retrying. subagent_id=sa-2-cafef00d depth=1 provider=openrouter base_url=https://openrouter.ai/api/v1 error_type=APIError error=Connection error. chain=APIError(Connection error.) <- RemoteProtocolError(peer closed connection without sending complete message body) http_status=200 bytes=12400 chunks=47 elapsed=12.00s ttfb=0.83s upstream=[cf-ray=8f1a2b3c4d5e6f7g-LAX x-openrouter-provider=Anthropic x-openrouter-id=gen-abc123 server=cloudflare] Tests: 10 covering diag init, header capture (whitelist enforced for PII), exception-chain walking + depth cap, log content with full diag, log content without diag (placeholders), UI elapsed-suffix on/off. --- run_agent.py | 257 ++++++++++++++++-- tests/run_agent/test_stream_drop_logging.py | 283 +++++++++++++------- 2 files changed, 420 insertions(+), 120 deletions(-) diff --git a/run_agent.py b/run_agent.py index 8413af96d72..8c3e28e68d3 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2822,6 +2822,100 @@ class AIAgent: except Exception: logger.debug("status_callback error in _emit_warning", exc_info=True) + # Headers we capture from the dying stream's HTTP response so post-mortem + # diagnosis can answer "which CF edge / which OpenRouter downstream + # provider / which request id". Lowercased; httpx ``Headers`` lookups are case-insensitive. 
+ _STREAM_DIAG_HEADERS = ( + "cf-ray", + "cf-cache-status", + "x-openrouter-provider", + "x-openrouter-model", + "x-openrouter-id", + "x-request-id", + "x-vercel-id", + "via", + "server", + "x-forwarded-for", + ) + + @staticmethod + def _stream_diag_init() -> Dict[str, Any]: + """Return a fresh per-attempt diagnostic dict. + + Mutated in-place by the streaming functions and read from the retry + block when a stream dies. Lives on ``request_client_holder`` so it + survives across the closure boundary. + """ + return { + "started_at": time.time(), + "first_chunk_at": None, + "chunks": 0, + "bytes": 0, + "headers": {}, + "http_status": None, + } + + def _stream_diag_capture_response( + self, diag: Dict[str, Any], http_response: Any + ) -> None: + """Snapshot interesting headers + HTTP status from the live stream. + + Called once at stream open (before iterating chunks) so the metadata + survives even if the stream dies before any chunk arrives. Failures + are swallowed — diag is best-effort. + """ + if http_response is None or not isinstance(diag, dict): + return + try: + diag["http_status"] = getattr(http_response, "status_code", None) + except Exception: + pass + try: + headers = getattr(http_response, "headers", None) or {} + captured: Dict[str, str] = {} + for name in self._STREAM_DIAG_HEADERS: + try: + val = headers.get(name) + if val: + # Truncate single-value to keep log lines bounded. + captured[name] = str(val)[:120] + except Exception: + continue + diag["headers"] = captured + except Exception: + pass + + @staticmethod + def _flatten_exception_chain(error: BaseException) -> str: + """Return a compact ``Outer(msg) <- Inner(msg) <- ...`` rendering. + + OpenAI SDK wraps httpx errors as ``APIConnectionError`` / + ``APIError`` and only the wrapper's class is visible at the catch + site — but the underlying ``RemoteProtocolError`` / + ``ConnectError`` / ``ReadError`` is what tells us WHY the stream + died. 
Walks ``__cause__`` then ``__context__`` (deduped, max 4 + deep) to surface the chain in one line. + """ + seen: List[BaseException] = [] + link: Optional[BaseException] = error + while link is not None and len(seen) < 4: + if link in seen: + break + seen.append(link) + nxt = getattr(link, "__cause__", None) or getattr( + link, "__context__", None + ) + if nxt is None or nxt is link: + break + link = nxt + parts: List[str] = [] + for e in seen: + msg = str(e).strip().replace("\n", " ") + if len(msg) > 140: + msg = msg[:140] + "…" + parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__) + return " <- ".join(parts) if parts else type(error).__name__ + def _log_stream_retry( self, *, @@ -2830,6 +2924,7 @@ class AIAgent: attempt: int, max_attempts: int, mid_tool_call: bool, + diag: Optional[Dict[str, Any]] = None, ) -> None: """Record a transient stream-drop and retry to ``agent.log``. @@ -2837,6 +2932,13 @@ class AIAgent: of UI verbosity. Subagents in particular benefit because their retries no longer spam the parent's terminal — but the file log keeps full detail (provider, error class, attempt, base_url, subagent_id). + + When *diag* is provided (the per-attempt stream-diagnostic dict from + ``_stream_diag_init``), the WARNING also captures upstream headers + (cf-ray, x-openrouter-provider, x-openrouter-id), HTTP status, bytes + streamed before the drop, and elapsed time on the dying attempt. + These are the breadcrumbs needed to answer "is one CF edge / one + downstream provider responsible, or is it random across runs?" """ try: try: @@ -2845,10 +2947,47 @@ class AIAgent: _summary = str(error) if _summary and len(_summary) > 240: _summary = _summary[:240] + "…" + + # Inner-cause chain (httpx errors hide under openai.APIError). + try: + _chain = self._flatten_exception_chain(error) + except Exception: + _chain = type(error).__name__ + + # Per-attempt counters and upstream headers. 
+ _now = time.time() + _bytes = 0 + _chunks = 0 + _elapsed = 0.0 + _ttfb = None + _headers_repr = "-" + _http_status = "-" + if isinstance(diag, dict): + try: + _bytes = int(diag.get("bytes") or 0) + _chunks = int(diag.get("chunks") or 0) + _started = float(diag.get("started_at") or _now) + _elapsed = max(0.0, _now - _started) + _first = diag.get("first_chunk_at") + if _first is not None: + _ttfb = max(0.0, float(_first) - _started) + headers = diag.get("headers") or {} + if isinstance(headers, dict) and headers: + _headers_repr = " ".join( + f"{k}={v}" for k, v in headers.items() + ) + if diag.get("http_status") is not None: + _http_status = str(diag.get("http_status")) + except Exception: + pass + logger.warning( "Stream %s on attempt %s/%s — retrying. " "subagent_id=%s depth=%s provider=%s base_url=%s " - "error_type=%s error=%s", + "error_type=%s error=%s " + "chain=%s " + "http_status=%s bytes=%d chunks=%d elapsed=%.2fs ttfb=%s " + "upstream=[%s]", kind, attempt, max_attempts, @@ -2858,6 +2997,13 @@ class AIAgent: self.base_url or "-", type(error).__name__, _summary, + _chain, + _http_status, + _bytes, + _chunks, + _elapsed, + f"{_ttfb:.2f}s" if _ttfb is not None else "-", + _headers_repr, extra={"mid_tool_call": mid_tool_call}, ) except Exception: @@ -2870,6 +3016,7 @@ class AIAgent: attempt: int, max_attempts: int, mid_tool_call: bool, + diag: Optional[Dict[str, Any]] = None, ) -> None: """Emit a single user-visible line for a stream drop+retry. @@ -2877,13 +3024,14 @@ class AIAgent: parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix`` so they're easy to attribute. All cases also write a structured WARNING to ``agent.log`` via :meth:`_log_stream_retry` with the full - diagnostic detail (subagent_id, provider, base_url, error_type) for - post-hoc analysis. + diagnostic detail (subagent_id, provider, base_url, error_type, + cf-ray, x-openrouter-provider, bytes/chunks, elapsed) for post-hoc + analysis. 
- Replaces the older two-line ``⚠️ Connection dropped …`` + - ``🔄 Reconnected …`` pair with a single information-dense line that - names the provider (so multi-provider sessions can tell who dropped) - and the error class without ambiguity. + The user-visible status line is intentionally compact: provider, + error class, attempt N/M, plus ``after Xs`` when the stream dropped + mid-flight. Full diagnostic detail goes to ``agent.log`` only — + ``hermes logs --level WARNING | grep "Stream drop"`` to inspect. """ kind = "drop mid tool-call" if mid_tool_call else "drop" self._log_stream_retry( @@ -2892,11 +3040,23 @@ class AIAgent: attempt=attempt, max_attempts=max_attempts, mid_tool_call=mid_tool_call, + diag=diag, ) provider = self.provider or "provider" + # Compose a brief "after Xs" suffix when we have timing data — helps + # the user distinguish "couldn't connect" (0s) from "died after 30s + # of streaming" (likely upstream idle-kill or proxy timeout). + _suffix = "" + if isinstance(diag, dict): + try: + started = diag.get("started_at") + if started is not None: + _suffix = f" after {max(0.0, time.time() - float(started)):.1f}s" + except Exception: + pass try: self._emit_status( - f"⚠️ {provider} stream {kind} ({type(error).__name__}) " + f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} " f"— reconnecting, retry {attempt}/{max_attempts}" ) self._touch_activity( @@ -7382,7 +7542,7 @@ class AIAgent: return result["response"] result = {"response": None, "error": None, "partial_tool_names": []} - request_client_holder = {"client": None} + request_client_holder = {"client": None, "diag": None} first_delta_fired = {"done": False} deltas_were_sent = {"yes": False} # Track if any deltas were fired (for fallback) # Wall-clock timestamp of the last real streaming chunk. The outer @@ -7444,12 +7604,21 @@ class AIAgent: # attempt's start, not a previous attempt's last chunk. 
last_chunk_time["t"] = time.time() self._touch_activity("waiting for provider response (streaming)") + # Initialize per-attempt stream diagnostics so the retry block can + # reach for them after the stream dies. Lives on + # ``request_client_holder["diag"]`` for closure access. + _diag = self._stream_diag_init() + request_client_holder["diag"] = _diag stream = request_client_holder["client"].chat.completions.create(**stream_kwargs) # Capture rate limit headers from the initial HTTP response. # The OpenAI SDK Stream object exposes the underlying httpx # response via .response before any chunks are consumed. self._capture_rate_limits(getattr(stream, "response", None)) + # Snapshot diagnostic headers (cf-ray, x-openrouter-provider, etc.) + # so they survive even when the stream dies before any chunk + # arrives. Best-effort; never raises. + self._stream_diag_capture_response(_diag, getattr(stream, "response", None)) # Log OpenRouter response cache status when present. self._check_openrouter_cache_status(getattr(stream, "response", None)) @@ -7472,6 +7641,24 @@ class AIAgent: last_chunk_time["t"] = time.time() self._touch_activity("receiving stream response") + # Update per-attempt diagnostic counters. Best-effort — + # failures are swallowed so the streaming hot path is never + # interrupted by diagnostic accounting. + try: + _diag["chunks"] = int(_diag.get("chunks", 0)) + 1 + if _diag.get("first_chunk_at") is None: + _diag["first_chunk_at"] = last_chunk_time["t"] + # Approximate byte size from the chunk's repr — exact wire + # bytes aren't exposed by the SDK, but len(repr(chunk)) is + # a stable proxy for "how much content arrived" that + # survives stub provider differences. 
+ try: + _diag["bytes"] = int(_diag.get("bytes", 0)) + len(repr(chunk)) + except Exception: + pass + except Exception: + pass + if self._interrupt_requested: break @@ -7666,8 +7853,21 @@ class AIAgent: # Reset stale-stream timer for this attempt last_chunk_time["t"] = time.time() + # Per-attempt diagnostic dict for the retry block to consume. + _diag = self._stream_diag_init() + request_client_holder["diag"] = _diag # Use the Anthropic SDK's streaming context manager with self._anthropic_client.messages.stream(**api_kwargs) as stream: + # The Anthropic SDK exposes the raw httpx response on + # ``stream.response``. Snapshot diagnostic headers + # immediately so they survive a stream that dies before the + # first event. + try: + self._stream_diag_capture_response( + _diag, getattr(stream, "response", None) + ) + except Exception: + pass for event in stream: # Update stale-stream timer on every event so the # outer poll loop knows data is flowing. Without @@ -7678,6 +7878,18 @@ class AIAgent: last_chunk_time["t"] = time.time() self._touch_activity("receiving stream response") + # Update per-attempt diagnostic counters (best-effort). 
+ try: + _diag["chunks"] = int(_diag.get("chunks", 0)) + 1 + if _diag.get("first_chunk_at") is None: + _diag["first_chunk_at"] = last_chunk_time["t"] + try: + _diag["bytes"] = int(_diag.get("bytes", 0)) + len(repr(event)) + except Exception: + pass + except Exception: + pass + if self._interrupt_requested: break @@ -7831,6 +8043,7 @@ class AIAgent: attempt=_stream_attempt + 2, max_attempts=_max_stream_retries + 1, mid_tool_call=True, + diag=request_client_holder.get("diag"), ) stale = request_client_holder.get("client") if stale is not None: @@ -7885,6 +8098,7 @@ class AIAgent: attempt=_stream_attempt + 2, max_attempts=_max_stream_retries + 1, mid_tool_call=False, + diag=request_client_holder.get("diag"), ) # Close the stale request client before retry stale = request_client_holder.get("client") @@ -7903,19 +8117,18 @@ class AIAgent: pass continue # Retries exhausted. Log the final failure with - # full diagnostic detail and surface a status - # line — subagent lines get the ``[subagent-N]`` - # log_prefix so the parent can attribute them. - logger.warning( - "Streaming exhausted %s retries on transient " - "error: subagent_id=%s depth=%s provider=%s " - "error_type=%s error=%s", - _max_stream_retries + 1, - getattr(self, "_subagent_id", None) or "-", - getattr(self, "_delegate_depth", 0), - self.provider or "-", - type(e).__name__, - e, + # full diagnostic detail (chain, headers, + # bytes/elapsed) via the same helper used for + # mid-flight retries — subagent lines get the + # ``[subagent-N]`` log_prefix so the parent can + # attribute them. 
+ self._log_stream_retry( + kind="exhausted", + error=e, + attempt=_max_stream_retries + 1, + max_attempts=_max_stream_retries + 1, + mid_tool_call=False, + diag=request_client_holder.get("diag"), ) self._emit_status( "❌ Connection to provider failed after " diff --git a/tests/run_agent/test_stream_drop_logging.py b/tests/run_agent/test_stream_drop_logging.py index b319725f76b..f424a4f403f 100644 --- a/tests/run_agent/test_stream_drop_logging.py +++ b/tests/run_agent/test_stream_drop_logging.py @@ -1,24 +1,24 @@ -"""Tests for the structured stream-drop log + clearer single-line status. +"""Tests for richer stream-drop diagnostics in agent.log. -Regression coverage for the change that: +When a subagent's stream drops mid-tool-call, the WARNING in agent.log must +carry enough breadcrumbs to answer "WHY did it drop" without requiring a +verbose-mode rerun. Specifically: -1. Removed ``logger.setLevel(ERROR)`` on tools/run_agent/etc. in quiet mode. - It was clobbering the root logger's file handlers (agent.log/errors.log) - because Python checks logger-level before handler propagation, so - subagent stream-drop diagnostics were never written to disk. -2. Replaced the two ``⚠️ Connection dropped …`` + ``🔄 Reconnected …`` - ``_emit_status`` calls with a single ``_emit_stream_drop`` helper that: - - Always writes a structured WARNING to ``agent.log``. - - Always emits exactly ONE user-visible status line per drop (no - follow-up "Reconnected" line) that names the provider and error - class so multi-provider sessions can attribute it cleanly. - - Subagent lines get the ``[subagent-N]`` ``log_prefix`` automatically - via ``_emit_status`` → ``_vprint``. +- Inner exception chain (httpx errors wrapped by openai SDK) +- Upstream HTTP headers (cf-ray, x-openrouter-provider, x-openrouter-id, ...) 
+- HTTP status of the dying response +- Bytes streamed and chunks received before the drop +- Elapsed time on the attempt + time-to-first-byte + +Plus the user-visible UI line gains an ``after Xs`` suffix when timing data +is available, distinguishing "couldn't connect at all" from "died mid-stream +after N seconds" (very different root causes). """ from __future__ import annotations import logging +import time from unittest.mock import patch import pytest @@ -37,124 +37,211 @@ def _make_agent() -> AIAgent: ) -def test_quiet_mode_does_not_clobber_runagent_logger_level(): - """quiet_mode must not raise the ``run_agent`` logger above WARNING. - - Setting ``setLevel(ERROR)`` on the logger filters records *before* root - logger handlers (agent.log/errors.log) ever see them. Stream-drop - diagnostics must reach the file handlers regardless of console verbosity. - """ - _ = _make_agent() - for name in ("run_agent", "tools", "trajectory_compressor", "cron", "hermes_cli"): - logger = logging.getLogger(name) - assert logger.getEffectiveLevel() <= logging.WARNING, ( - f"{name} logger blocked at level {logger.getEffectiveLevel()} — " - f"file handlers will lose WARNING records" - ) +def test_stream_diag_init_returns_well_formed_dict(): + diag = AIAgent._stream_diag_init() + assert "started_at" in diag + assert diag["chunks"] == 0 + assert diag["bytes"] == 0 + assert diag["first_chunk_at"] is None + assert diag["http_status"] is None + assert diag["headers"] == {} -def test_log_stream_retry_writes_structured_warning(caplog): +class _FakeHeaders: + def __init__(self, d): self._d = {k.lower(): v for k, v in d.items()} + def get(self, k, default=None): return self._d.get(k.lower(), default) + + +class _FakeResponse: + def __init__(self, headers, status=200): + self.status_code = status + self.headers = _FakeHeaders(headers) + + +def test_stream_diag_capture_response_collects_known_headers(): + agent = _make_agent() + diag = AIAgent._stream_diag_init() + resp = _FakeResponse({ + 
"cf-ray": "8f1a2b3c4d5e6f7g-LAX", + "x-openrouter-provider": "Anthropic", + "x-openrouter-id": "gen-abc123", + "x-request-id": "req-xyz", + "server": "cloudflare", + "irrelevant-header": "ignored", + }) + agent._stream_diag_capture_response(diag, resp) + assert diag["http_status"] == 200 + assert diag["headers"]["cf-ray"] == "8f1a2b3c4d5e6f7g-LAX" + assert diag["headers"]["x-openrouter-provider"] == "Anthropic" + assert diag["headers"]["x-openrouter-id"] == "gen-abc123" + assert diag["headers"]["server"] == "cloudflare" + # Headers not in _STREAM_DIAG_HEADERS must not be captured (PII surface). + assert "irrelevant-header" not in diag["headers"] + + +def test_stream_diag_capture_response_safe_with_none(): + agent = _make_agent() + diag = AIAgent._stream_diag_init() + agent._stream_diag_capture_response(diag, None) + # Must not raise; diag stays initialized. + assert diag["headers"] == {} + + +def test_flatten_exception_chain_walks_cause(): + inner = ConnectionError("upstream closed") + middle = TimeoutError("timed out") + middle.__cause__ = inner + outer = RuntimeError("wrapper") + outer.__cause__ = middle + chain = AIAgent._flatten_exception_chain(outer) + assert "RuntimeError" in chain + assert "TimeoutError" in chain + assert "ConnectionError" in chain + assert " <- " in chain + + +def test_flatten_exception_chain_caps_depth(): + """Chain renders no more than 4 deep so log lines stay bounded.""" + e0 = ValueError("0") + prev = e0 + for i in range(1, 8): + nxt = ValueError(str(i)) + nxt.__cause__ = prev + prev = nxt + chain = AIAgent._flatten_exception_chain(prev) + # 4 layers + 3 separators max. 
+ assert chain.count("<-") <= 3 + + +def test_log_stream_retry_includes_diagnostic_fields(caplog): agent = _make_agent() agent._delegate_depth = 1 - agent._subagent_id = "sa-7-cafef00d" + agent._subagent_id = "sa-3-deadbeef" agent.provider = "openrouter" - err = ConnectionError("Network connection lost.") + diag = AIAgent._stream_diag_init() + diag["http_status"] = 200 + diag["headers"] = { + "cf-ray": "8f1a2b3c4d5e6f7g-LAX", + "x-openrouter-provider": "Anthropic", + "x-openrouter-id": "gen-xyz789", + } + diag["chunks"] = 12 + diag["bytes"] = 4096 + # Simulate 5s elapsed with first chunk at 0.5s. + diag["started_at"] = time.time() - 5.0 + diag["first_chunk_at"] = diag["started_at"] + 0.5 + + inner = ConnectionError("peer closed") + outer = RuntimeError("Connection error.") + outer.__cause__ = inner + with caplog.at_level(logging.WARNING, logger="run_agent"): agent._log_stream_retry( kind="drop mid tool-call", - error=err, + error=outer, attempt=2, max_attempts=3, mid_tool_call=True, + diag=diag, ) - matching = [r for r in caplog.records if "Stream drop mid tool-call" in r.getMessage()] - assert matching, f"no stream-drop WARNING captured; got {[r.getMessage() for r in caplog.records]}" - msg = matching[0].getMessage() - assert "subagent_id=sa-7-cafef00d" in msg - assert "depth=1" in msg - assert "provider=openrouter" in msg - assert "base_url=https://openrouter.ai/api/v1" in msg - assert "error_type=ConnectionError" in msg - - -@pytest.mark.parametrize("depth", [0, 1]) -def test_emit_stream_drop_emits_status_line(depth): - """Both top-level and subagent paths emit exactly one status line. - - Subagent lines get the ``[subagent-N]`` log_prefix via the parent's - ``_vprint`` plumbing — this test only checks that ``_emit_status`` is - called once with the right content. No "Reconnected" follow-up. 
- """ - agent = _make_agent() - agent._delegate_depth = depth - if depth > 0: - agent._subagent_id = "sa-2-cafe" - agent.provider = "openrouter" - - with patch.object(agent, "_emit_status") as mock_emit: - agent._emit_stream_drop( - error=ConnectionError("boom"), - attempt=2, - max_attempts=3, - mid_tool_call=True, - ) - - assert mock_emit.call_count == 1, ( - f"expected exactly one _emit_status call (no Reconnected follow-up), " - f"got {mock_emit.call_count}" + msg = next( + r.getMessage() for r in caplog.records + if "Stream drop mid tool-call" in r.getMessage() ) - msg = mock_emit.call_args.args[0] - assert "openrouter" in msg, f"provider name missing from status: {msg}" - assert "stream drop" in msg - assert "ConnectionError" in msg - assert "retry 2/3" in msg - # Single line — no separate "Reconnected" message. But the line itself - # should mention reconnecting so the user knows we're recovering. - assert "reconnect" in msg.lower() + + # Identity + assert "subagent_id=sa-3-deadbeef" in msg + assert "provider=openrouter" in msg + + # Inner-cause chain + assert "RuntimeError" in msg and "ConnectionError" in msg + + # Counters and timing + assert "http_status=200" in msg + assert "bytes=4096" in msg + assert "chunks=12" in msg + # elapsed should be roughly 5s; allow some slack. 
+ assert "elapsed=" in msg + assert "ttfb=0.50s" in msg + + # Upstream headers + assert "cf-ray=8f1a2b3c4d5e6f7g-LAX" in msg + assert "x-openrouter-provider=Anthropic" in msg + assert "x-openrouter-id=gen-xyz789" in msg -@pytest.mark.parametrize("mid_tool_call", [True, False]) -def test_emit_stream_drop_always_writes_to_log(caplog, mid_tool_call): - """Both subagent and top-level drops produce a WARNING in agent.log.""" +def test_log_stream_retry_works_without_diag(caplog): + """diag is optional — older callers / unit tests still work.""" agent = _make_agent() - agent._delegate_depth = 1 if mid_tool_call else 0 + agent._delegate_depth = 0 agent.provider = "openrouter" - if mid_tool_call: - agent._subagent_id = "sa-99-feed" with caplog.at_level(logging.WARNING, logger="run_agent"): - agent._emit_stream_drop( - error=TimeoutError("read timeout"), + agent._log_stream_retry( + kind="drop", + error=ConnectionError("x"), attempt=2, max_attempts=3, - mid_tool_call=mid_tool_call, + mid_tool_call=False, ) - found = [r for r in caplog.records if r.getMessage().startswith("Stream drop")] - assert found, "expected at least one Stream drop WARNING record" - msg = found[0].getMessage() - assert "error_type=TimeoutError" in msg - assert "provider=openrouter" in msg + msg = next(r.getMessage() for r in caplog.records if "Stream drop" in r.getMessage()) + # Without diag, the structured fields show "-" placeholders. 
+ assert "http_status=-" in msg + assert "upstream=[-]" in msg + assert "bytes=0" in msg + assert "chunks=0" in msg + assert "ttfb=-" in msg -def test_emit_stream_drop_provider_named_when_multi_provider(): - """The user-visible line must name the provider so multi-provider - sessions can tell which subagent dropped (the original two-line message - only said 'provider', forcing a log dive).""" +def test_emit_stream_drop_ui_includes_elapsed_when_available(): agent = _make_agent() - agent._delegate_depth = 1 - agent._subagent_id = "sa-1" - agent.provider = "anthropic" + agent.provider = "openrouter" + + diag = AIAgent._stream_diag_init() + diag["started_at"] = time.time() - 8.0 # 8s on the wire before drop with patch.object(agent, "_emit_status") as mock_emit: agent._emit_stream_drop( error=ConnectionError("x"), - attempt=3, + attempt=2, + max_attempts=3, + mid_tool_call=True, + diag=diag, + ) + + msg = mock_emit.call_args.args[0] + # Suffix with elapsed time helps distinguish "couldn't connect" (0s) + # from "died mid-stream after a while". + assert "after" in msg and "s" in msg + + +def test_emit_stream_drop_ui_omits_suffix_without_diag(): + """When there's no diag, no suffix — line stays compact.""" + agent = _make_agent() + agent.provider = "openrouter" + + with patch.object(agent, "_emit_status") as mock_emit: + agent._emit_stream_drop( + error=ConnectionError("x"), + attempt=2, max_attempts=3, mid_tool_call=False, ) msg = mock_emit.call_args.args[0] - assert "anthropic" in msg + # No "after Xs" suffix when diag is not provided. + assert " after " not in msg + # Still names the provider and error class. 
+ assert "openrouter" in msg + assert "ConnectionError" in msg + + +def test_quiet_mode_does_not_clobber_runagent_logger_level(): + """Regression guard for the parent fix — must persist across this PR.""" + _ = _make_agent() + for name in ("run_agent", "tools", "trajectory_compressor", "cron", "hermes_cli"): + logger = logging.getLogger(name) + assert logger.getEffectiveLevel() <= logging.WARNING