From 67011cc0d76b7047320b2760e948b4e4488c24ca Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Thu, 28 May 2026 04:53:27 -0700 Subject: [PATCH] feat(agent): buffer retry/fallback status, surface only on terminal failure (#33816) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Users report that the CLI/gateway floods them with confusing retry chatter during transient failures: a single 429 can produce 10+ "Provider/Endpoint/Retrying in 5s..." lines before the request eventually succeeds. The same firehose hits Telegram, Discord, Slack, etc. via _emit_status. This patch defers all retry/fallback/compression status messages until we know the outcome: - if the turn ultimately succeeds (any path: primary recovers, fallback activates, compression unsticks the request), the buffer is silently dropped — the user sees nothing. - if every retry and fallback exhausts and the turn fails, the buffer is flushed at the terminal-failure return so the user sees the full retry trace alongside the final error. Backend logging (agent.log) is unchanged — every emission site still writes to logger.warning/info, so post-mortem diagnosis is intact. 
## What changed run_agent.py: four new methods on AIAgent: _buffer_status(msg) — defer an _emit_status call _buffer_vprint(msg) — defer a _vprint(force=True) line _clear_status_buffer() — drop pending messages on success _flush_status_buffer() — replay pending messages on terminal failure agent/conversation_loop.py: - converted ~30 mid-process emit/vprint sites in the retry, fallback, compression, empty-response, and stream-watchdog paths to the buffered helpers - added _flush_status_buffer() at every terminal-failure return so users still see the trace when it actually matters - added _clear_status_buffer() at the "non-empty assistant content" point (NOT at "API call returned bytes" — empty responses still loop through the empty-retry path and would otherwise lose their trace between iterations) - silenced the two "(´;ω;`) oops, retrying..." / "(╥_╥) error, retrying..." spinner final-frame messages — the spinner now stops cleanly so retries leave no visible residue agent/chat_completion_helpers.py: same conversion for codex TTFB / stale-stream / fallback-activation status messages. agent/stream_diag.py: _emit_stream_drop now buffers instead of emitting directly. ## Tests tests/run_agent/test_retry_status_buffer.py: 7 unit tests covering accumulate→flush, clear-on-success, mixed kinds, empty-buffer no-op, re-buffer after flush, exception swallowing. Updated 3 existing tests that mocked _emit_status to also mock (or use) _buffer_status: - tests/run_agent/test_run_agent.py::test_empty_response_emits_status_for_gateway - tests/run_agent/test_stream_drop_logging.py (2 tests) - tests/agent/test_codex_ttfb_watchdog.py (TTFB hint test) ## Validation Live test: hermes chat -q against an unreachable endpoint with no fallback exhausts retries and prints the full trace at the end. Same flow against a working endpoint prints zero retry chatter. 
--- agent/chat_completion_helpers.py | 16 +- agent/conversation_loop.py | 193 +++++++++++--------- agent/stream_diag.py | 2 +- run_agent.py | 77 ++++++++ tests/agent/test_codex_ttfb_watchdog.py | 1 + tests/run_agent/test_retry_status_buffer.py | 157 ++++++++++++++++ tests/run_agent/test_stream_drop_logging.py | 4 +- 7 files changed, 354 insertions(+), 96 deletions(-) create mode 100644 tests/run_agent/test_retry_status_buffer.py diff --git a/agent/chat_completion_helpers.py b/agent/chat_completion_helpers.py index ce83dd04907..35d0477cf67 100644 --- a/agent/chat_completion_helpers.py +++ b/agent/chat_completion_helpers.py @@ -403,13 +403,13 @@ def interruptible_api_call(agent, api_kwargs: dict): _elapsed, _ttfb_timeout, api_kwargs.get("model", "unknown"), ) if _silent_hint: - agent._emit_status( + agent._buffer_status( f"⚠️ No first byte from provider in {int(_elapsed)}s " f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). " f"Reconnecting. {_silent_hint}" ) else: - agent._emit_status( + agent._buffer_status( f"⚠️ No first byte from provider in {int(_elapsed)}s " f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). " f"Reconnecting." @@ -455,7 +455,7 @@ def interruptible_api_call(agent, api_kwargs: dict): api_kwargs.get("model", "unknown"), f"{_est_tokens_for_codex_watchdog:,}", ) - agent._emit_status( + agent._buffer_status( f"⚠️ Codex stream sent no events for {int(_event_stale_elapsed)}s " f"after first byte (model: {api_kwargs.get('model', 'unknown')}). " f"Reconnecting." @@ -493,13 +493,13 @@ def interruptible_api_call(agent, api_kwargs: dict): api_kwargs.get("model", "unknown"), f"{_est_ctx:,}", ) if _silent_hint: - agent._emit_status( + agent._buffer_status( f"⚠️ No response from provider for {int(_elapsed)}s " f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). 
" f"{_silent_hint}" ) else: - agent._emit_status( + agent._buffer_status( f"⚠️ No response from provider for {int(_elapsed)}s " f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). " f"Aborting call." @@ -1262,7 +1262,7 @@ def try_activate_fallback(agent, reason: "FailoverReason | None" = None) -> bool api_mode=agent.api_mode, ) - agent._emit_status( + agent._buffer_status( f"🔄 Primary model failed — switching to fallback: " f"{fb_model} via {fb_provider}" ) @@ -2251,7 +2251,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= mid_tool_call=False, diag=request_client_holder.get("diag"), ) - agent._emit_status( + agent._buffer_status( "❌ Provider returned malformed streaming data after " f"{_max_stream_retries + 1} attempts. " "The provider may be experiencing issues — " @@ -2358,7 +2358,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta= _stale_elapsed, _stream_stale_timeout, api_kwargs.get("model", "unknown"), f"{_est_ctx:,}", ) - agent._emit_status( + agent._buffer_status( f"⚠️ No response from provider for {int(_stale_elapsed)}s " f"(model: {api_kwargs.get('model', 'unknown')}, " f"context: ~{_est_ctx:,} tokens). " diff --git a/agent/conversation_loop.py b/agent/conversation_loop.py index f21dd183d4b..49ce9dbb376 100644 --- a/agent/conversation_loop.py +++ b/agent/conversation_loop.py @@ -1151,17 +1151,18 @@ def run_conversation( f"Nous Portal rate limit active — " f"resets in {_fmt_nous_remaining(_nous_remaining)}." ) - agent._vprint( - f"{agent.log_prefix}⏳ {_nous_msg} Trying fallback...", - force=True, + agent._buffer_vprint( + f"⏳ {_nous_msg} Trying fallback..." 
) - agent._emit_status(f"⏳ {_nous_msg}") + agent._buffer_status(f"⏳ {_nous_msg}") if agent._try_activate_fallback(): retry_count = 0 compression_attempts = 0 primary_recovery_attempted = False continue - # No fallback available — return with clear message + # No fallback available — surface buffered context + # so user sees the rate-limit message that led here. + agent._flush_status_buffer() agent._persist_session(messages, conversation_history) return { "final_response": ( @@ -1384,9 +1385,10 @@ def run_conversation( error_details.append("response.choices is empty") if response_invalid: - # Stop spinner before printing error messages + # Stop spinner silently — retry status is now buffered + # and only surfaced if every retry+fallback exhausts. if thinking_spinner: - thinking_spinner.stop("(´;ω;`) oops, retrying...") + thinking_spinner.stop("") thinking_spinner = None if agent.thinking_callback: agent.thinking_callback("") @@ -1399,7 +1401,7 @@ def run_conversation( # rate-limit symptom. Switch to fallback immediately # rather than retrying with extended backoff. 
if agent._fallback_index < len(agent._fallback_chain): - agent._emit_status("⚠️ Empty/malformed response — switching to fallback...") + agent._buffer_status("⚠️ Empty/malformed response — switching to fallback...") if agent._try_activate_fallback(): retry_count = 0 compression_attempts = 0 @@ -1461,20 +1463,22 @@ def run_conversation( else: _failure_hint = f"response time {api_duration:.1f}s" - agent._vprint(f"{agent.log_prefix}⚠️ Invalid API response (attempt {retry_count}/{max_retries}): {', '.join(error_details)}", force=True) - agent._vprint(f"{agent.log_prefix} 🏢 Provider: {provider_name}", force=True) + agent._buffer_vprint(f"⚠️ Invalid API response (attempt {retry_count}/{max_retries}): {', '.join(error_details)}") + agent._buffer_vprint(f" 🏢 Provider: {provider_name}") cleaned_provider_error = agent._clean_error_message(error_msg) - agent._vprint(f"{agent.log_prefix} 📝 Provider message: {cleaned_provider_error}", force=True) - agent._vprint(f"{agent.log_prefix} ⏱️ {_failure_hint}", force=True) + agent._buffer_vprint(f" 📝 Provider message: {cleaned_provider_error}") + agent._buffer_vprint(f" ⏱️ {_failure_hint}") if retry_count >= max_retries: # Try fallback before giving up - agent._emit_status(f"⚠️ Max retries ({max_retries}) for invalid responses — trying fallback...") + agent._buffer_status(f"⚠️ Max retries ({max_retries}) for invalid responses — trying fallback...") if agent._try_activate_fallback(): retry_count = 0 compression_attempts = 0 primary_recovery_attempted = False continue + # Terminal — flush buffered retry trace so user sees what happened. + agent._flush_status_buffer() agent._emit_status(f"❌ Max retries ({max_retries}) exceeded for invalid responses. 
Giving up.") logger.error(f"{agent.log_prefix}Invalid API response after {max_retries} retries.") agent._persist_session(messages, conversation_history) @@ -1488,7 +1492,7 @@ def run_conversation( # Backoff before retry — jittered exponential: 5s base, 120s cap wait_time = jittered_backoff(retry_count, base_delay=5.0, max_delay=120.0) - agent._vprint(f"{agent.log_prefix}⏳ Retrying in {wait_time:.1f}s ({_failure_hint})...", force=True) + agent._buffer_vprint(f"⏳ Retrying in {wait_time:.1f}s ({_failure_hint})...") logger.warning(f"Invalid API response (retry {retry_count}/{max_retries}): {', '.join(error_details)} | Provider: {provider_name}") # Sleep in small increments to stay responsive to interrupts @@ -1715,14 +1719,14 @@ def run_conversation( if assistant_message is not None and _trunc_has_tool_calls: if truncated_tool_call_retries < 1: truncated_tool_call_retries += 1 - agent._vprint( - f"{agent.log_prefix}⚠️ Truncated tool call detected — retrying API call...", - force=True, + agent._buffer_vprint( + f"⚠️ Truncated tool call detected — retrying API call..." ) # Don't append the broken response to messages; # just re-run the same API call from the current # message state, giving the model another chance. continue + agent._flush_status_buffer() agent._vprint( f"{agent.log_prefix}⚠️ Truncated tool call response detected again — refusing to execute incomplete tool arguments.", force=True, @@ -1756,6 +1760,7 @@ def run_conversation( } else: # First message was truncated - mark as failed + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ First response truncated - cannot recover", force=True) agent._persist_session(messages, conversation_history) return { @@ -1907,6 +1912,11 @@ def run_conversation( ) has_retried_429 = False # Reset on success + # Note: don't clear the retry buffer here — an "API call + # success" only means we got bytes back, not that we got + # usable content. 
Empty responses still loop through the + # empty-retry path below; the buffer is cleared when + # genuinely successful content is detected later (~L4127). # Clear Nous rate limit state on successful request — # proves the limit has reset and other sessions can # resume hitting Nous. @@ -1933,9 +1943,10 @@ def run_conversation( break except Exception as api_error: - # Stop spinner before printing error messages + # Stop spinner silently — retry status is buffered and + # only flushed when every retry+fallback is exhausted. if thinking_spinner: - thinking_spinner.stop("(╥_╥) error, retrying...") + thinking_spinner.stop("") thinking_spinner = None if agent.thinking_callback: agent.thinking_callback("") @@ -1990,14 +2001,12 @@ def run_conversation( if _surrogates_found or _is_surrogate_error: agent._unicode_sanitization_passes += 1 if _surrogates_found: - agent._vprint( - f"{agent.log_prefix}⚠️ Stripped invalid surrogate characters from messages. Retrying...", - force=True, + agent._buffer_vprint( + f"⚠️ Stripped invalid surrogate characters from messages. Retrying..." ) else: - agent._vprint( - f"{agent.log_prefix}⚠️ Surrogate encoding error — retrying after full-payload sanitization...", - force=True, + agent._buffer_vprint( + f"⚠️ Surrogate encoding error — retrying after full-payload sanitization..." ) continue if _is_ascii_codec: @@ -2325,7 +2334,7 @@ def run_conversation( codex_auth_retry_attempted = True if agent._try_refresh_codex_client_credentials(force=True): _label = "xAI OAuth" if agent.provider == "xai-oauth" else "Codex" - agent._vprint(f"{agent.log_prefix}🔐 {_label} auth refreshed after 401. Retrying request...") + agent._buffer_vprint(f"🔐 {_label} auth refreshed after 401. 
Retrying request...") continue if ( agent.api_mode == "chat_completions" @@ -2366,7 +2375,7 @@ def run_conversation( ): copilot_auth_retry_attempted = True if agent._try_refresh_copilot_client_credentials(): - agent._vprint(f"{agent.log_prefix}🔐 Copilot credentials refreshed after 401. Retrying request...") + agent._buffer_vprint(f"🔐 Copilot credentials refreshed after 401. Retrying request...") continue if ( agent.api_mode == "anthropic_messages" @@ -2541,41 +2550,37 @@ def run_conversation( _base = getattr(agent, "base_url", "unknown") _model = getattr(agent, "model", "unknown") _status_code_str = f" [HTTP {status_code}]" if status_code else "" - agent._vprint(f"{agent.log_prefix}⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}{_status_code_str}", force=True) - agent._vprint(f"{agent.log_prefix} 🔌 Provider: {_provider} Model: {_model}", force=True) - agent._vprint(f"{agent.log_prefix} 🌐 Endpoint: {_base}", force=True) - agent._vprint(f"{agent.log_prefix} 📝 Error: {_error_summary}", force=True) + agent._buffer_vprint(f"⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}{_status_code_str}") + agent._buffer_vprint(f" 🔌 Provider: {_provider} Model: {_model}") + agent._buffer_vprint(f" 🌐 Endpoint: {_base}") + agent._buffer_vprint(f" 📝 Error: {_error_summary}") if status_code and status_code < 500: _err_body = getattr(api_error, "body", None) _err_body_str = str(_err_body)[:300] if _err_body else None if _err_body_str: - agent._vprint(f"{agent.log_prefix} 📋 Details: {_err_body_str}", force=True) - agent._vprint(f"{agent.log_prefix} ⏱️ Elapsed: {elapsed_time:.2f}s Context: {len(api_messages)} msgs, ~{approx_tokens:,} tokens") + agent._buffer_vprint(f" 📋 Details: {_err_body_str}") + agent._buffer_vprint(f" ⏱️ Elapsed: {elapsed_time:.2f}s Context: {len(api_messages)} msgs, ~{approx_tokens:,} tokens") # Actionable hint for OpenRouter "no tool endpoints" error. 
- # This fires regardless of whether fallback succeeds — the - # user needs to know WHY their model failed so they can fix - # their provider routing, not just silently fall back. + # Buffered like the rest of the retry trace — surfaced only + # if every retry+fallback exhausts. Avoids spamming users + # who recover automatically via fallback. if ( agent._is_openrouter_url() and "support tool use" in error_msg ): - agent._vprint( - f"{agent.log_prefix} 💡 No OpenRouter providers for {_model} support tool calling with your current settings.", - force=True, + agent._buffer_vprint( + f" 💡 No OpenRouter providers for {_model} support tool calling with your current settings." ) if agent.providers_allowed: - agent._vprint( - f"{agent.log_prefix} Your provider_routing.only restriction is filtering out tool-capable providers.", - force=True, + agent._buffer_vprint( + f" Your provider_routing.only restriction is filtering out tool-capable providers." ) - agent._vprint( - f"{agent.log_prefix} Try removing the restriction or adding providers that support tools for this model.", - force=True, + agent._buffer_vprint( + f" Try removing the restriction or adding providers that support tools for this model." ) - agent._vprint( - f"{agent.log_prefix} Check which providers support tools: https://openrouter.ai/models/{_model}", - force=True, + agent._buffer_vprint( + f" Check which providers support tools: https://openrouter.ai/models/{_model}" ) # Check for interrupt before deciding to retry @@ -2625,11 +2630,10 @@ def run_conversation( # user later enables extra usage the 1M limit # should come back automatically. 
compressor._context_probe_persistable = False - agent._vprint( - f"{agent.log_prefix}⚠️ Anthropic long-context tier " + agent._buffer_vprint( + f"⚠️ Anthropic long-context tier " f"requires extra usage — reducing context: " - f"{old_ctx:,} → {_reduced_ctx:,} tokens", - force=True, + f"{old_ctx:,} → {_reduced_ctx:,} tokens" ) compression_attempts += 1 @@ -2645,7 +2649,7 @@ def run_conversation( # messages to the new session, not skipping them. conversation_history = None if len(messages) < original_len or old_ctx > _reduced_ctx: - agent._emit_status( + agent._buffer_status( f"🗜️ Context reduced to {_reduced_ctx:,} tokens " f"(was {old_ctx:,}), retrying..." ) @@ -2675,11 +2679,11 @@ def run_conversation( ) if not pool_may_recover: if classified.reason == FailoverReason.billing: - agent._emit_status( + agent._buffer_status( "⚠️ Billing or credits exhausted — switching to fallback provider..." ) else: - agent._emit_status("⚠️ Rate limited — switching to fallback provider...") + agent._buffer_status("⚠️ Rate limited — switching to fallback provider...") if agent._try_activate_fallback(reason=classified.reason): retry_count = 0 compression_attempts = 0 @@ -2791,6 +2795,8 @@ def run_conversation( if is_payload_too_large: compression_attempts += 1 if compression_attempts > max_compression_attempts: + # Terminal — surface the buffered retry trace. 
+ agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached for payload-too-large error.", force=True) agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True) logger.error(f"{agent.log_prefix}413 compression failed after {max_compression_attempts} attempts.") @@ -2804,7 +2810,7 @@ def run_conversation( "failed": True, "compression_exhausted": True, } - agent._emit_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...") + agent._buffer_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...") original_len = len(messages) messages, active_system_prompt = agent._compress_context( @@ -2817,11 +2823,14 @@ def run_conversation( conversation_history = None if len(messages) < original_len: - agent._emit_status(f"🗜️ Compressed {original_len} → {len(messages)} messages, retrying...") + agent._buffer_status(f"🗜️ Compressed {original_len} → {len(messages)} messages, retrying...") time.sleep(2) # Brief pause between compression retries restart_with_compressed_messages = True break else: + # Terminal — surface buffered context so the user + # sees what compression attempts were made. + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Payload too large and cannot compress further.", force=True) agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True) logger.error(f"{agent.log_prefix}413 payload too large. Cannot compress further.") @@ -2865,16 +2874,16 @@ def run_conversation( # touching context_length or triggering compression. 
safe_out = max(1, available_out - 64) # small safety margin agent._ephemeral_max_output_tokens = safe_out - agent._vprint( - f"{agent.log_prefix}⚠️ Output cap too large for current prompt — " + agent._buffer_vprint( + f"⚠️ Output cap too large for current prompt — " f"retrying with max_tokens={safe_out:,} " - f"(available_tokens={available_out:,}; context_length unchanged at {old_ctx:,})", - force=True, + f"(available_tokens={available_out:,}; context_length unchanged at {old_ctx:,})" ) # Still count against compression_attempts so we don't # loop forever if the error keeps recurring. compression_attempts += 1 if compression_attempts > max_compression_attempts: + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached.", force=True) agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True) logger.error(f"{agent.log_prefix}Context compression failed after {max_compression_attempts} attempts.") @@ -2910,13 +2919,12 @@ def run_conversation( ) if parsed_limit and parsed_limit < old_ctx: new_ctx = parsed_limit - agent._vprint(f"{agent.log_prefix}Context limit detected from API: {new_ctx:,} tokens (was {old_ctx:,})", force=True) + agent._buffer_vprint(f"Context limit detected from API: {new_ctx:,} tokens (was {old_ctx:,})") elif minimax_delta_only_overflow: new_ctx = old_ctx - agent._vprint( - f"{agent.log_prefix}Provider reported overflow amount only; " - f"keeping context_length at {old_ctx:,} tokens and compressing.", - force=True, + agent._buffer_vprint( + f"Provider reported overflow amount only; " + f"keeping context_length at {old_ctx:,} tokens and compressing." 
) else: # Step down to the next probe tier @@ -2943,12 +2951,13 @@ def run_conversation( compressor._context_probe_persistable = bool( parsed_limit and parsed_limit == new_ctx ) - agent._vprint(f"{agent.log_prefix}⚠️ Context length exceeded — stepping down: {old_ctx:,} → {new_ctx:,} tokens", force=True) + agent._buffer_vprint(f"⚠️ Context length exceeded — stepping down: {old_ctx:,} → {new_ctx:,} tokens") else: - agent._vprint(f"{agent.log_prefix}⚠️ Context length exceeded at minimum tier — attempting compression...", force=True) + agent._buffer_vprint(f"⚠️ Context length exceeded at minimum tier — attempting compression...") compression_attempts += 1 if compression_attempts > max_compression_attempts: + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached.", force=True) agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True) logger.error(f"{agent.log_prefix}Context compression failed after {max_compression_attempts} attempts.") @@ -2962,7 +2971,7 @@ def run_conversation( "failed": True, "compression_exhausted": True, } - agent._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...") + agent._buffer_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...") original_len = len(messages) messages, active_system_prompt = agent._compress_context( @@ -2976,12 +2985,13 @@ def run_conversation( if len(messages) < original_len or new_ctx and new_ctx < old_ctx: if len(messages) < original_len: - agent._emit_status(f"🗜️ Compressed {original_len} → {len(messages)} messages, retrying...") + agent._buffer_status(f"🗜️ Compressed {original_len} → {len(messages)} messages, retrying...") time.sleep(2) # Brief pause between compression retries restart_with_compressed_messages = True break else: # 
Can't compress further and already at minimum tier + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Context length exceeded and cannot compress further.", force=True) agent._vprint(f"{agent.log_prefix} 💡 The conversation has accumulated too much content. Try /new to start fresh, or /compress to manually trigger compression.", force=True) logger.error(f"{agent.log_prefix}Context length exceeded: {approx_tokens:,} tokens. Cannot compress further.") @@ -3070,7 +3080,7 @@ def run_conversation( if is_client_error: # Try fallback before aborting — a different provider # may not have the same issue (rate limit, auth, etc.) - agent._emit_status(f"⚠️ Non-retryable error (HTTP {status_code}) — trying fallback...") + agent._buffer_status(f"⚠️ Non-retryable error (HTTP {status_code}) — trying fallback...") if agent._try_activate_fallback(): retry_count = 0 compression_attempts = 0 @@ -3080,6 +3090,9 @@ def run_conversation( agent._dump_api_request_debug( api_kwargs, reason="non_retryable_client_error", error=api_error, ) + # Terminal — flush buffered context so the user sees + # what was tried before the abort. + agent._flush_status_buffer() agent._emit_status( f"❌ Non-retryable error (HTTP {status_code}): " f"{agent._summarize_api_error(api_error)}" @@ -3165,12 +3178,14 @@ def run_conversation( retry_count = 0 continue # Try fallback before giving up entirely - agent._emit_status(f"⚠️ Max retries ({max_retries}) exhausted — trying fallback...") + agent._buffer_status(f"⚠️ Max retries ({max_retries}) exhausted — trying fallback...") if agent._try_activate_fallback(): retry_count = 0 compression_attempts = 0 primary_recovery_attempted = False continue + # Terminal — flush buffered retry/fallback trace. 
+ agent._flush_status_buffer() _final_summary = agent._summarize_api_error(api_error) _billing_guidance = "" if classified.reason == FailoverReason.billing: @@ -3270,9 +3285,9 @@ def run_conversation( pass wait_time = _retry_after if _retry_after else jittered_backoff(retry_count, base_delay=2.0, max_delay=60.0) if is_rate_limited: - agent._emit_status(f"⏱️ Rate limited. Waiting {wait_time:.1f}s (attempt {retry_count + 1}/{max_retries})...") + agent._buffer_status(f"⏱️ Rate limited. Waiting {wait_time:.1f}s (attempt {retry_count + 1}/{max_retries})...") else: - agent._emit_status(f"⏳ Retrying in {wait_time:.1f}s (attempt {retry_count}/{max_retries})...") + agent._buffer_status(f"⏳ Retrying in {wait_time:.1f}s (attempt {retry_count}/{max_retries})...") logger.warning( "Retrying API call in %ss (attempt %s/%s) %s error=%s", wait_time, @@ -3431,14 +3446,15 @@ def run_conversation( if has_incomplete_scratchpad(assistant_message.content or ""): agent._incomplete_scratchpad_retries += 1 - agent._vprint(f"{agent.log_prefix}⚠️ Incomplete detected (opened but never closed)") + agent._buffer_vprint(f"⚠️ Incomplete detected (opened but never closed)") if agent._incomplete_scratchpad_retries <= 2: - agent._vprint(f"{agent.log_prefix}🔄 Retrying API call ({agent._incomplete_scratchpad_retries}/2)...") + agent._buffer_vprint(f"🔄 Retrying API call ({agent._incomplete_scratchpad_retries}/2)...") # Don't add the broken message, just retry continue else: # Max retries - discard this turn and save as partial + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Max retries (2) for incomplete scratchpad. Saving as partial.", force=True) agent._incomplete_scratchpad_retries = 0 @@ -3546,9 +3562,10 @@ def run_conversation( available = ", ".join(sorted(agent.valid_tool_names)) invalid_name = invalid_tool_calls[0] invalid_preview = invalid_name[:80] + "..." 
if len(invalid_name) > 80 else invalid_name - agent._vprint(f"{agent.log_prefix}⚠️ Unknown tool '{invalid_preview}' — sending error to model for agent-correction ({agent._invalid_tool_retries}/3)") + agent._buffer_vprint(f"⚠️ Unknown tool '{invalid_preview}' — sending error to model for agent-correction ({agent._invalid_tool_retries}/3)") if agent._invalid_tool_retries >= 3: + agent._flush_status_buffer() agent._vprint(f"{agent.log_prefix}❌ Max retries (3) for invalid tool calls exceeded. Stopping as partial.", force=True) agent._invalid_tool_retries = 0 agent._persist_session(messages, conversation_history) @@ -3632,16 +3649,16 @@ def run_conversation( agent._invalid_json_retries += 1 tool_name, error_msg = invalid_json_args[0] - agent._vprint(f"{agent.log_prefix}⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}") + agent._buffer_vprint(f"⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}") if agent._invalid_json_retries < 3: - agent._vprint(f"{agent.log_prefix}🔄 Retrying API call ({agent._invalid_json_retries}/3)...") + agent._buffer_vprint(f"🔄 Retrying API call ({agent._invalid_json_retries}/3)...") # Don't add anything to messages, just retry the API call continue else: # Instead of returning partial, inject tool error results so the model can recover. # Using tool results (not user messages) preserves role alternation. 
- agent._vprint(f"{agent.log_prefix}⚠️ Injecting recovery tool results for invalid JSON...") + agent._buffer_vprint(f"⚠️ Injecting recovery tool results for invalid JSON...") agent._invalid_json_retries = 0 # Reset for next attempt # Append the assistant message with its (broken) tool_calls @@ -3949,7 +3966,7 @@ def run_conversation( "Empty response after tool calls — nudging model " "to continue processing" ) - agent._emit_status( + agent._buffer_status( "⚠️ Model returned empty after tool calls — " "nudging to continue" ) @@ -3995,7 +4012,7 @@ def run_conversation( "prefilling to continue (%d/2)", agent._thinking_prefill_retries, ) - agent._emit_status( + agent._buffer_status( f"↻ Thinking-only response — prefilling to continue " f"({agent._thinking_prefill_retries}/2)" ) @@ -4030,7 +4047,7 @@ def run_conversation( "retry %d/3 (model=%s)", agent._empty_content_retries, agent.model, ) - agent._emit_status( + agent._buffer_status( f"⚠️ Empty response from model — retrying " f"({agent._empty_content_retries}/3)" ) @@ -4049,13 +4066,13 @@ def run_conversation( agent._empty_content_retries, agent.model, agent.provider, ) - agent._emit_status( + agent._buffer_status( "⚠️ Model returning empty responses — " "switching to fallback provider..." ) if agent._try_activate_fallback(): agent._empty_content_retries = 0 - agent._emit_status( + agent._buffer_status( f"↻ Switched to fallback: {agent.model} " f"({agent.provider})" ) @@ -4069,6 +4086,9 @@ def run_conversation( # Exhausted retries and fallback chain (or no # fallback configured). Fall through to the # "(empty)" terminal. + # Surface the buffered retry/fallback trace so the + # user can see what was attempted before "(empty)". 
+ agent._flush_status_buffer() _turn_exit_reason = "empty_response_exhausted" reasoning_text = agent._extract_reasoning(assistant_message) agent._drop_trailing_empty_response_scaffolding(messages) @@ -4113,6 +4133,9 @@ def run_conversation( # Reset retry counter/signature on successful content agent._empty_content_retries = 0 agent._thinking_prefill_retries = 0 + # Successful content reached — drop any buffered retry + # status from earlier failed attempts in this turn. + agent._clear_status_buffer() if ( agent.api_mode == "codex_responses" diff --git a/agent/stream_diag.py b/agent/stream_diag.py index c4d8c54f470..cd10e74367a 100644 --- a/agent/stream_diag.py +++ b/agent/stream_diag.py @@ -258,7 +258,7 @@ def emit_stream_drop( except Exception: pass try: - agent._emit_status( + agent._buffer_status( f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} " f"— reconnecting, retry {attempt}/{max_attempts}" ) diff --git a/run_agent.py b/run_agent.py index a43eac9bdbf..6d3af390b6d 100644 --- a/run_agent.py +++ b/run_agent.py @@ -801,6 +801,83 @@ class AIAgent: except Exception: logger.debug("status_callback error in _emit_warning", exc_info=True) + # ── Buffered retry/fallback status ──────────────────────────────────── + # Retry and fallback chains were flooding the CLI/gateway with status + # noise that users found confusing: a single transient 429 could produce + # 10+ "Provider/Endpoint/Retrying in 5s..." lines before the request + # eventually succeeded. The buffered helpers below capture these + # status messages instead of emitting them immediately. They are + # flushed (shown to the user) ONLY when every retry and fallback has + # been exhausted; on success they are silently dropped. Backend logs + # (agent.log) are unaffected — every individual emission site still + # writes to ``logger.warning`` / ``logger.info`` for diagnosis. + + def _buffer_status(self, message: str) -> None: + """Buffer a retry/fallback status message. 
+ + Stored as a (kind, text) tuple where ``kind`` is one of: + - ``"status"`` -> replays via ``_emit_status`` + - ``"vprint"`` -> replays via ``_vprint(force=True)`` + - ``"warn"`` -> replays via ``_emit_warning`` + Used to defer noisy retry chatter until we know whether the + turn ultimately recovered or failed. + """ + try: + buf = getattr(self, "_retry_status_buffer", None) + if buf is None: + buf = [] + self._retry_status_buffer = buf + buf.append(("status", message)) + except Exception: + # Never break the retry loop on a buffer hiccup. + pass + + def _buffer_vprint(self, message: str) -> None: + """Buffer a vprint(force=True) retry/fallback line.""" + try: + buf = getattr(self, "_retry_status_buffer", None) + if buf is None: + buf = [] + self._retry_status_buffer = buf + buf.append(("vprint", message)) + except Exception: + pass + + def _clear_status_buffer(self) -> None: + """Drop buffered retry messages — call on successful recovery.""" + try: + buf = getattr(self, "_retry_status_buffer", None) + if buf: + buf.clear() + except Exception: + pass + + def _flush_status_buffer(self) -> None: + """Emit buffered retry messages — call on terminal failure. + + Surfaces the full retry/fallback trace so the user can see what + was tried before the turn gave up. + """ + try: + buf = getattr(self, "_retry_status_buffer", None) + if not buf: + return + # Drain first so a callback exception doesn't double-emit. 
+ messages = list(buf) + buf.clear() + for kind, msg in messages: + try: + if kind == "status": + self._emit_status(msg) + elif kind == "warn": + self._emit_warning(msg) + else: + self._vprint(f"{self.log_prefix}{msg}", force=True) + except Exception: + pass + except Exception: + pass + def _disable_codex_reasoning_replay( self, messages: Optional[List[Dict[str, Any]]] = None, diff --git a/tests/agent/test_codex_ttfb_watchdog.py b/tests/agent/test_codex_ttfb_watchdog.py index 57466a81834..02f3e750c7c 100644 --- a/tests/agent/test_codex_ttfb_watchdog.py +++ b/tests/agent/test_codex_ttfb_watchdog.py @@ -114,6 +114,7 @@ def test_ttfb_includes_silent_hang_hint_for_gpt_5_5(tmp_path, monkeypatch): statuses: list[str] = [] dummy_client = SimpleNamespace() monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client) + monkeypatch.setattr(agent, "_buffer_status", lambda msg: statuses.append(msg)) monkeypatch.setattr(agent, "_emit_status", lambda msg: statuses.append(msg)) monkeypatch.setattr( agent, "_abort_request_openai_client", diff --git a/tests/run_agent/test_retry_status_buffer.py b/tests/run_agent/test_retry_status_buffer.py new file mode 100644 index 00000000000..a47f19fa502 --- /dev/null +++ b/tests/run_agent/test_retry_status_buffer.py @@ -0,0 +1,157 @@ +"""Tests for the retry/fallback status buffer helpers on AIAgent. + +These helpers defer noisy retry chatter (rate-limit retries, fallback +switches, compression attempts) so users only see the trace when +everything ultimately fails. On successful recovery the buffer is +silently dropped. 
+""" + +from __future__ import annotations + +import pytest + +from run_agent import AIAgent + + +def _make_bare_agent(): + """Construct an AIAgent without running __init__ — we only need the + buffered-status helpers, which are pure-Python and depend only on a + handful of attributes.""" + agent = object.__new__(AIAgent) + agent.log_prefix = "" + agent.status_callback = None + agent.suppress_status_output = False + agent._mute_post_response = False + agent._executing_tools = False + agent._print_fn = None + return agent + + +def test_buffer_status_accumulates_then_flushes(capsys): + agent = _make_bare_agent() + emitted = [] + agent._emit_status = lambda msg: emitted.append(("status", msg)) + + agent._buffer_status("⏳ Retrying...") + agent._buffer_status("⚠️ Fallback...") + + # Nothing emitted yet — they are buffered. + assert emitted == [] + assert agent._retry_status_buffer == [ + ("status", "⏳ Retrying..."), + ("status", "⚠️ Fallback..."), + ] + + # Flush surfaces them in order through _emit_status. + agent._flush_status_buffer() + assert emitted == [ + ("status", "⏳ Retrying..."), + ("status", "⚠️ Fallback..."), + ] + # Buffer is drained. + assert agent._retry_status_buffer == [] + + +def test_clear_drops_buffered_messages_silently(): + agent = _make_bare_agent() + emitted = [] + agent._emit_status = lambda msg: emitted.append(msg) + + agent._buffer_status("⏳ Retrying...") + agent._buffer_status("⚠️ Fallback...") + agent._clear_status_buffer() + + # Nothing was emitted — clear is the success path. + assert emitted == [] + assert agent._retry_status_buffer == [] + + # Subsequent flush is a no-op. 
+ agent._flush_status_buffer() + assert emitted == [] + + +def test_buffer_vprint_replays_via_vprint_with_log_prefix(): + agent = _make_bare_agent() + agent.log_prefix = "[abc] " + seen = [] + agent._vprint = lambda msg, force=False, **kw: seen.append((msg, force)) + + agent._buffer_vprint("⚠️ API call failed") + agent._flush_status_buffer() + + # Replays through _vprint with force=True and the agent's log_prefix + # prepended (matching the original direct-emit format). + assert seen == [("[abc] ⚠️ API call failed", True)] + + +def test_flush_empty_buffer_is_noop(): + agent = _make_bare_agent() + emitted = [] + agent._emit_status = lambda msg: emitted.append(msg) + agent._vprint = lambda msg, force=False, **kw: emitted.append(msg) + + # No buffer attribute yet — flush should be a quiet no-op. + agent._flush_status_buffer() + assert emitted == [] + + # Even after touching the buffer (via clear on an empty/missing buffer). + agent._clear_status_buffer() + agent._flush_status_buffer() + assert emitted == [] + + +def test_re_buffer_after_flush_works(): + agent = _make_bare_agent() + emitted = [] + agent._emit_status = lambda msg: emitted.append(msg) + + agent._buffer_status("first") + agent._flush_status_buffer() + agent._buffer_status("second") + agent._flush_status_buffer() + + assert emitted == ["first", "second"] + + +def test_mixed_kinds_replay_through_correct_channels(): + agent = _make_bare_agent() + agent.log_prefix = "" + statuses = [] + vprints = [] + warns = [] + agent._emit_status = lambda msg: statuses.append(msg) + agent._vprint = lambda msg, force=False, **kw: vprints.append((msg, force)) + agent._emit_warning = lambda msg: warns.append(msg) + + agent._buffer_status("status-1") + agent._buffer_vprint("vprint-1") + # Manually mix in a "warn" record to verify the dispatch still works. 
+ agent._retry_status_buffer.append(("warn", "warn-1")) + agent._buffer_status("status-2") + + agent._flush_status_buffer() + + assert statuses == ["status-1", "status-2"] + assert vprints == [("vprint-1", True)] + assert warns == ["warn-1"] + + +def test_flush_swallows_callback_exceptions(): + agent = _make_bare_agent() + seen = [] + + def boom(msg): + seen.append(msg) + raise RuntimeError("simulated callback failure") + + agent._emit_status = boom + + agent._buffer_status("first") + agent._buffer_status("second") + # Should not raise even though _emit_status raises for every message. + agent._flush_status_buffer() + + # Both messages were attempted. + assert seen == ["first", "second"] + # Buffer drained regardless of failures. + assert agent._retry_status_buffer == [] diff --git a/tests/run_agent/test_stream_drop_logging.py b/tests/run_agent/test_stream_drop_logging.py index f424a4f403f..bcb6ddd1a12 100644 --- a/tests/run_agent/test_stream_drop_logging.py +++ b/tests/run_agent/test_stream_drop_logging.py @@ -203,7 +203,7 @@ def test_emit_stream_drop_ui_includes_elapsed_when_available(): diag = AIAgent._stream_diag_init() diag["started_at"] = time.time() - 8.0 # 8s on the wire before drop - with patch.object(agent, "_emit_status") as mock_emit: + with patch.object(agent, "_buffer_status") as mock_emit: agent._emit_stream_drop( error=ConnectionError("x"), attempt=2, @@ -223,7 +223,7 @@ def test_emit_stream_drop_ui_omits_suffix_without_diag(): agent = _make_agent() agent.provider = "openrouter" - with patch.object(agent, "_emit_status") as mock_emit: + with patch.object(agent, "_buffer_status") as mock_emit: agent._emit_stream_drop( error=ConnectionError("x"), attempt=2,