From 57f6762ca085839833b1eb9e2ca9e6cb69abcc4a Mon Sep 17 00:00:00 2001 From: teknium1 <127238744+teknium1@users.noreply.github.com> Date: Sat, 16 May 2026 18:28:17 -0700 Subject: [PATCH] refactor(run_agent): extract stream diagnostics to agent/stream_diag.py MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move the five stream-drop diagnostic helpers + the headers tuple: * STREAM_DIAG_HEADERS — cf-ray, x-openrouter-provider, x-request-id, etc. * stream_diag_init — fresh per-attempt diagnostic dict * stream_diag_capture_response — snapshot upstream headers + HTTP status * flatten_exception_chain — compact Outer(msg) <- Inner(msg) rendering * log_stream_retry — structured WARNING with provider/bytes/elapsed/ttfb * emit_stream_drop — user-facing status line + activity touch AIAgent keeps thin forwarder methods (and exposes the headers tuple as _STREAM_DIAG_HEADERS for back-compat). All test patches and call sites unchanged. tests/run_agent/ + tests/agent/: 4313 passed (same pre-existing test_auxiliary_client failure). run_agent.py: 13470 -> 13227 lines (-243). --- agent/stream_diag.py | 280 +++++++++++++++++++++++++++++++++++++++++++ run_agent.py | 234 ++++-------------------------------- 2 files changed, 303 insertions(+), 211 deletions(-) create mode 100644 agent/stream_diag.py diff --git a/agent/stream_diag.py b/agent/stream_diag.py new file mode 100644 index 00000000000..c4d8c54f470 --- /dev/null +++ b/agent/stream_diag.py @@ -0,0 +1,280 @@ +"""Stream diagnostics — per-attempt counters, exception chains, retry logging. + +When a streaming chat-completions request dies mid-response, we want to +know why: which Cloudflare edge served the request, which OpenRouter +downstream provider answered, how many bytes/chunks we got before the +drop, the HTTP status, the underlying httpx error class. These helpers +collect that info and emit it both to ``agent.log`` (full detail) and to +the user-facing status line (compact). 
+ +All helpers are extracted from :class:`AIAgent` for cleanliness. +``run_agent`` keeps thin forwarder methods so existing call sites and +tests that patch the ``run_agent.AIAgent`` methods keep working. +""" + +from __future__ import annotations + +import logging +import time +from typing import Any, Dict, List, Optional + +logger = logging.getLogger(__name__) + + +# Response headers captured per stream attempt. Names are lowercased; +# httpx's ``Headers`` mapping is case-insensitive, so ``.get()`` matches any +# casing. Captured values are written to agent.log for post-hoc analysis. +STREAM_DIAG_HEADERS = ( + "cf-ray", + "cf-cache-status", + "x-openrouter-provider", + "x-openrouter-model", + "x-openrouter-id", + "x-request-id", + "x-vercel-id", + "via", + "server", + "x-forwarded-for", +) + + +def stream_diag_init() -> Dict[str, Any]: + """Return a fresh per-attempt diagnostic dict. + + Mutated in-place by the streaming functions and read from the retry + block when a stream dies. Lives on ``request_client_holder`` so it + survives across the closure boundary. + """ + return { + "started_at": time.time(), + "first_chunk_at": None, + "chunks": 0, + "bytes": 0, + "headers": {}, + "http_status": None, + } + + +def stream_diag_capture_response(agent: Any, diag: Dict[str, Any], http_response: Any) -> None: + """Snapshot interesting headers + HTTP status from the live stream. + + Called once at stream open (before iterating chunks) so the metadata + survives even if the stream dies before any chunk arrives. Failures + are swallowed — diag is best-effort. + """ + if http_response is None or not isinstance(diag, dict): + return + try: + diag["http_status"] = getattr(http_response, "status_code", None) + except Exception: + pass + try: + headers = getattr(http_response, "headers", None) or {} + captured: Dict[str, str] = {} + # Allow per-agent override of the headers list (back-compat). 
+ target_headers = getattr(agent, "_STREAM_DIAG_HEADERS", STREAM_DIAG_HEADERS) + for name in target_headers: + try: + val = headers.get(name) + if val: + # Truncate single-value to keep log lines bounded. + captured[name] = str(val)[:120] + except Exception: + continue + diag["headers"] = captured + except Exception: + pass + + +def flatten_exception_chain(error: BaseException) -> str: + """Return a compact ``Outer(msg) <- Inner(msg) <- ...`` rendering. + + OpenAI SDK wraps httpx errors as ``APIConnectionError`` / + ``APIError`` and only the wrapper's class is visible at the catch + site — but the underlying ``RemoteProtocolError`` / + ``ConnectError`` / ``ReadError`` is what tells us WHY the stream + died. Walks ``__cause__`` then ``__context__`` (deduped, max 4 + deep) to surface the chain in one line. + """ + seen: List[BaseException] = [] + link: Optional[BaseException] = error + while link is not None and len(seen) < 4: + if link in seen: + break + seen.append(link) + nxt = getattr(link, "__cause__", None) or getattr( + link, "__context__", None + ) + if nxt is None or nxt is link: + break + link = nxt + parts: List[str] = [] + for e in seen: + msg = str(e).strip().replace("\n", " ") + if len(msg) > 140: + msg = msg[:140] + "…" + parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__) + return " <- ".join(parts) if parts else type(error).__name__ + + +def log_stream_retry( + agent: Any, + *, + kind: str, + error: BaseException, + attempt: int, + max_attempts: int, + mid_tool_call: bool, + diag: Optional[Dict[str, Any]] = None, +) -> None: + """Record a transient stream-drop and retry to ``agent.log``. + + Always logs a structured WARNING so users have a breadcrumb regardless + of UI verbosity. Subagents in particular benefit because their + retries no longer spam the parent's terminal — but the file log keeps + full detail (provider, error class, attempt, base_url, subagent_id). 
+ + When *diag* is provided (the per-attempt stream-diagnostic dict from + :func:`stream_diag_init`), the WARNING also captures upstream headers + (cf-ray, x-openrouter-provider, x-openrouter-id), HTTP status, bytes + streamed before the drop, and elapsed time on the dying attempt. + These are the breadcrumbs needed to answer "is one CF edge / one + downstream provider responsible, or is it random across runs?" + """ + try: + try: + _summary = agent._summarize_api_error(error) + except Exception: + _summary = str(error) + if _summary and len(_summary) > 240: + _summary = _summary[:240] + "…" + + # Inner-cause chain (httpx errors hide under openai.APIError). + try: + _chain = flatten_exception_chain(error) + except Exception: + _chain = type(error).__name__ + + # Per-attempt counters and upstream headers. + _now = time.time() + _bytes = 0 + _chunks = 0 + _elapsed = 0.0 + _ttfb = None + _headers_repr = "-" + _http_status = "-" + if isinstance(diag, dict): + try: + _bytes = int(diag.get("bytes") or 0) + _chunks = int(diag.get("chunks") or 0) + _started = float(diag.get("started_at") or _now) + _elapsed = max(0.0, _now - _started) + _first = diag.get("first_chunk_at") + if _first is not None: + _ttfb = max(0.0, float(_first) - _started) + headers = diag.get("headers") or {} + if isinstance(headers, dict) and headers: + _headers_repr = " ".join( + f"{k}={v}" for k, v in headers.items() + ) + if diag.get("http_status") is not None: + _http_status = str(diag.get("http_status")) + except Exception: + pass + + logger.warning( + "Stream %s on attempt %s/%s — retrying. 
" + "subagent_id=%s depth=%s provider=%s base_url=%s " + "error_type=%s error=%s " + "chain=%s " + "http_status=%s bytes=%d chunks=%d elapsed=%.2fs ttfb=%s " + "upstream=[%s]", + kind, + attempt, + max_attempts, + getattr(agent, "_subagent_id", None) or "-", + getattr(agent, "_delegate_depth", 0), + agent.provider or "-", + agent.base_url or "-", + type(error).__name__, + _summary, + _chain, + _http_status, + _bytes, + _chunks, + _elapsed, + f"{_ttfb:.2f}s" if _ttfb is not None else "-", + _headers_repr, + extra={"mid_tool_call": mid_tool_call}, + ) + except Exception: + logger.debug("stream-retry log emit failed", exc_info=True) + + +def emit_stream_drop( + agent: Any, + *, + error: BaseException, + attempt: int, + max_attempts: int, + mid_tool_call: bool, + diag: Optional[Dict[str, Any]] = None, +) -> None: + """Emit a single user-visible line for a stream drop+retry. + + Both top-level agents and subagents announce drops in the UI — the + parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix`` + so they're easy to attribute. All cases also write a structured + WARNING to ``agent.log`` via :func:`log_stream_retry` with the full + diagnostic detail (subagent_id, provider, base_url, error_type, + cf-ray, x-openrouter-provider, bytes/chunks, elapsed) for post-hoc + analysis. + + The user-visible status line is intentionally compact: provider, + error class, attempt N/M, plus ``after Xs`` when the stream dropped + mid-flight. Full diagnostic detail goes to ``agent.log`` only — + ``hermes logs --level WARNING | grep "Stream drop"`` to inspect. 
+ """ + kind = "drop mid tool-call" if mid_tool_call else "drop" + log_stream_retry( + agent, + kind=kind, + error=error, + attempt=attempt, + max_attempts=max_attempts, + mid_tool_call=mid_tool_call, + diag=diag, + ) + provider = agent.provider or "provider" + # Compose a brief "after Xs" suffix when we have timing data — helps + # the user distinguish "couldn't connect" (0s) from "died after 30s + # of streaming" (likely upstream idle-kill or proxy timeout). + _suffix = "" + if isinstance(diag, dict): + try: + started = diag.get("started_at") + if started is not None: + _suffix = f" after {max(0.0, time.time() - float(started)):.1f}s" + except Exception: + pass + try: + agent._emit_status( + f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} " + f"— reconnecting, retry {attempt}/{max_attempts}" + ) + agent._touch_activity( + f"stream retry {attempt}/{max_attempts} " + f"after {type(error).__name__}" + ) + except Exception: + pass + + +__all__ = [ + "STREAM_DIAG_HEADERS", + "stream_diag_init", + "stream_diag_capture_response", + "flatten_exception_chain", + "log_stream_retry", + "emit_stream_drop", +] diff --git a/run_agent.py b/run_agent.py index b5ea98d911d..234a322a480 100644 --- a/run_agent.py +++ b/run_agent.py @@ -2110,99 +2110,28 @@ class AIAgent: except Exception: logger.debug("status_callback error in _emit_warning", exc_info=True) - # Headers we capture from the dying stream's HTTP response so post-mortem - # diagnosis can answer "which CF edge / which OpenRouter downstream - # provider / which request id". Lowercased; httpx returns CIMultiDict. - _STREAM_DIAG_HEADERS = ( - "cf-ray", - "cf-cache-status", - "x-openrouter-provider", - "x-openrouter-model", - "x-openrouter-id", - "x-request-id", - "x-vercel-id", - "via", - "server", - "x-forwarded-for", - ) + # Stream-diagnostic headers tuple kept as a class attribute for backward compat — + # the canonical tuple lives in ``agent.stream_diag.STREAM_DIAG_HEADERS``. 
+ from agent.stream_diag import STREAM_DIAG_HEADERS as _STREAM_DIAG_HEADERS # noqa: E402 @staticmethod def _stream_diag_init() -> Dict[str, Any]: - """Return a fresh per-attempt diagnostic dict. - - Mutated in-place by the streaming functions and read from the retry - block when a stream dies. Lives on ``request_client_holder`` so it - survives across the closure boundary. - """ - return { - "started_at": time.time(), - "first_chunk_at": None, - "chunks": 0, - "bytes": 0, - "headers": {}, - "http_status": None, - } + """Forwarder — see ``agent.stream_diag.stream_diag_init``.""" + from agent.stream_diag import stream_diag_init + return stream_diag_init() def _stream_diag_capture_response( self, diag: Dict[str, Any], http_response: Any ) -> None: - """Snapshot interesting headers + HTTP status from the live stream. - - Called once at stream open (before iterating chunks) so the metadata - survives even if the stream dies before any chunk arrives. Failures - are swallowed — diag is best-effort. - """ - if http_response is None or not isinstance(diag, dict): - return - try: - diag["http_status"] = getattr(http_response, "status_code", None) - except Exception: - pass - try: - headers = getattr(http_response, "headers", None) or {} - captured: Dict[str, str] = {} - for name in self._STREAM_DIAG_HEADERS: - try: - val = headers.get(name) - if val: - # Truncate single-value to keep log lines bounded. - captured[name] = str(val)[:120] - except Exception: - continue - diag["headers"] = captured - except Exception: - pass + """Forwarder — see ``agent.stream_diag.stream_diag_capture_response``.""" + from agent.stream_diag import stream_diag_capture_response + stream_diag_capture_response(self, diag, http_response) @staticmethod def _flatten_exception_chain(error: BaseException) -> str: - """Return a compact ``Outer(msg) <- Inner(msg) <- ...`` rendering. 
- - OpenAI SDK wraps httpx errors as ``APIConnectionError`` / - ``APIError`` and only the wrapper's class is visible at the catch - site — but the underlying ``RemoteProtocolError`` / - ``ConnectError`` / ``ReadError`` is what tells us WHY the stream - died. Walks ``__cause__`` then ``__context__`` (deduped, max 4 - deep) to surface the chain in one line. - """ - seen: List[BaseException] = [] - link: Optional[BaseException] = error - while link is not None and len(seen) < 4: - if link in seen: - break - seen.append(link) - nxt = getattr(link, "__cause__", None) or getattr( - link, "__context__", None - ) - if nxt is None or nxt is link: - break - link = nxt - parts: List[str] = [] - for e in seen: - msg = str(e).strip().replace("\n", " ") - if len(msg) > 140: - msg = msg[:140] + "…" - parts.append(f"{type(e).__name__}({msg})" if msg else type(e).__name__) - return " <- ".join(parts) if parts else type(error).__name__ + """Forwarder — see ``agent.stream_diag.flatten_exception_chain``.""" + from agent.stream_diag import flatten_exception_chain + return flatten_exception_chain(error) def _log_stream_retry( self, @@ -2214,88 +2143,12 @@ class AIAgent: mid_tool_call: bool, diag: Optional[Dict[str, Any]] = None, ) -> None: - """Record a transient stream-drop and retry to ``agent.log``. - - Always logs a structured WARNING so users have a breadcrumb regardless - of UI verbosity. Subagents in particular benefit because their - retries no longer spam the parent's terminal — but the file log keeps - full detail (provider, error class, attempt, base_url, subagent_id). - - When *diag* is provided (the per-attempt stream-diagnostic dict from - ``_stream_diag_init``), the WARNING also captures upstream headers - (cf-ray, x-openrouter-provider, x-openrouter-id), HTTP status, bytes - streamed before the drop, and elapsed time on the dying attempt. - These are the breadcrumbs needed to answer "is one CF edge / one - downstream provider responsible, or is it random across runs?" 
- """ - try: - try: - _summary = self._summarize_api_error(error) - except Exception: - _summary = str(error) - if _summary and len(_summary) > 240: - _summary = _summary[:240] + "…" - - # Inner-cause chain (httpx errors hide under openai.APIError). - try: - _chain = self._flatten_exception_chain(error) - except Exception: - _chain = type(error).__name__ - - # Per-attempt counters and upstream headers. - _now = time.time() - _bytes = 0 - _chunks = 0 - _elapsed = 0.0 - _ttfb = None - _headers_repr = "-" - _http_status = "-" - if isinstance(diag, dict): - try: - _bytes = int(diag.get("bytes") or 0) - _chunks = int(diag.get("chunks") or 0) - _started = float(diag.get("started_at") or _now) - _elapsed = max(0.0, _now - _started) - _first = diag.get("first_chunk_at") - if _first is not None: - _ttfb = max(0.0, float(_first) - _started) - headers = diag.get("headers") or {} - if isinstance(headers, dict) and headers: - _headers_repr = " ".join( - f"{k}={v}" for k, v in headers.items() - ) - if diag.get("http_status") is not None: - _http_status = str(diag.get("http_status")) - except Exception: - pass - - logger.warning( - "Stream %s on attempt %s/%s — retrying. 
" - "subagent_id=%s depth=%s provider=%s base_url=%s " - "error_type=%s error=%s " - "chain=%s " - "http_status=%s bytes=%d chunks=%d elapsed=%.2fs ttfb=%s " - "upstream=[%s]", - kind, - attempt, - max_attempts, - getattr(self, "_subagent_id", None) or "-", - getattr(self, "_delegate_depth", 0), - self.provider or "-", - self.base_url or "-", - type(error).__name__, - _summary, - _chain, - _http_status, - _bytes, - _chunks, - _elapsed, - f"{_ttfb:.2f}s" if _ttfb is not None else "-", - _headers_repr, - extra={"mid_tool_call": mid_tool_call}, - ) - except Exception: - logger.debug("stream-retry log emit failed", exc_info=True) + """Forwarder — see ``agent.stream_diag.log_stream_retry``.""" + from agent.stream_diag import log_stream_retry + log_stream_retry( + self, kind=kind, error=error, attempt=attempt, + max_attempts=max_attempts, mid_tool_call=mid_tool_call, diag=diag, + ) def _emit_stream_drop( self, @@ -2306,53 +2159,12 @@ class AIAgent: mid_tool_call: bool, diag: Optional[Dict[str, Any]] = None, ) -> None: - """Emit a single user-visible line for a stream drop+retry. - - Both top-level agents and subagents announce drops in the UI — the - parent prefixes subagent lines with ``[subagent-N]`` via ``log_prefix`` - so they're easy to attribute. All cases also write a structured - WARNING to ``agent.log`` via :meth:`_log_stream_retry` with the full - diagnostic detail (subagent_id, provider, base_url, error_type, - cf-ray, x-openrouter-provider, bytes/chunks, elapsed) for post-hoc - analysis. - - The user-visible status line is intentionally compact: provider, - error class, attempt N/M, plus ``after Xs`` when the stream dropped - mid-flight. Full diagnostic detail goes to ``agent.log`` only — - ``hermes logs --level WARNING | grep "Stream drop"`` to inspect. 
- """ - kind = "drop mid tool-call" if mid_tool_call else "drop" - self._log_stream_retry( - kind=kind, - error=error, - attempt=attempt, - max_attempts=max_attempts, - mid_tool_call=mid_tool_call, - diag=diag, + """Forwarder — see ``agent.stream_diag.emit_stream_drop``.""" + from agent.stream_diag import emit_stream_drop + emit_stream_drop( + self, error=error, attempt=attempt, max_attempts=max_attempts, + mid_tool_call=mid_tool_call, diag=diag, ) - provider = self.provider or "provider" - # Compose a brief "after Xs" suffix when we have timing data — helps - # the user distinguish "couldn't connect" (0s) from "died after 30s - # of streaming" (likely upstream idle-kill or proxy timeout). - _suffix = "" - if isinstance(diag, dict): - try: - started = diag.get("started_at") - if started is not None: - _suffix = f" after {max(0.0, time.time() - float(started)):.1f}s" - except Exception: - pass - try: - self._emit_status( - f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} " - f"— reconnecting, retry {attempt}/{max_attempts}" - ) - self._touch_activity( - f"stream retry {attempt}/{max_attempts} " - f"after {type(error).__name__}" - ) - except Exception: - pass def _emit_auxiliary_failure(self, task: str, exc: BaseException) -> None: """Surface a compact warning for failed auxiliary work."""