feat(agent): buffer retry/fallback status, surface only on terminal failure (#33816)

Users report that the CLI/gateway floods them with confusing retry chatter
during transient failures: a single 429 can produce 10+ "Provider/Endpoint/
Retrying in 5s..." lines before the request eventually succeeds. The same
firehose hits Telegram, Discord, Slack, etc. via _emit_status.

This patch defers all retry/fallback/compression status messages until we
know the outcome:
  - if the turn ultimately succeeds (any path: primary recovers, fallback
    activates, compression unsticks the request), the buffer is silently
    dropped — the user sees nothing.
  - if every retry and fallback exhausts and the turn fails, the buffer
    is flushed at the terminal-failure return so the user sees the full
    retry trace alongside the final error.

Backend logging (agent.log) is unchanged — every emission site still
writes to logger.warning/info, so post-mortem diagnosis is intact.

## What changed

run_agent.py: four new methods on AIAgent:
  _buffer_status(msg)   — defer an _emit_status call
  _buffer_vprint(msg)   — defer a _vprint(force=True) line
  _clear_status_buffer() — drop pending messages on success
  _flush_status_buffer() — replay pending messages on terminal failure

agent/conversation_loop.py:
  - converted ~30 mid-process emit/vprint sites in the retry, fallback,
    compression, empty-response, and stream-watchdog paths to the buffered
    helpers
  - added _flush_status_buffer() at every terminal-failure return so users
    still see the trace when it actually matters
  - added _clear_status_buffer() at the "non-empty assistant content"
    point (NOT at "API call returned bytes" — empty responses still loop
    through the empty-retry path and would otherwise lose their trace
    between iterations)
  - silenced the two "(´;ω;`) oops, retrying..." / "(╥_╥) error,
    retrying..." spinner final-frame messages — the spinner now stops
    cleanly so retries leave no visible residue

agent/chat_completion_helpers.py: same conversion for codex TTFB / stale-
stream / fallback-activation status messages.

agent/stream_diag.py: _emit_stream_drop now buffers instead of emitting
directly.

## Tests

tests/run_agent/test_retry_status_buffer.py: 7 unit tests covering
accumulate→flush, clear-on-success, mixed kinds, empty-buffer no-op,
re-buffer after flush, exception swallowing.

Updated 3 existing tests that mocked _emit_status to also mock (or use)
_buffer_status:
  - tests/run_agent/test_run_agent.py::test_empty_response_emits_status_for_gateway
  - tests/run_agent/test_stream_drop_logging.py (2 tests)
  - tests/agent/test_codex_ttfb_watchdog.py (TTFB hint test)

## Validation

Live test: hermes chat -q against an unreachable endpoint with no fallback
exhausts retries and prints the full trace at the end. Same flow against
a working endpoint prints zero retry chatter.
This commit is contained in:
Teknium 2026-05-28 04:53:27 -07:00 committed by GitHub
parent e0572a6def
commit 67011cc0d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 354 additions and 96 deletions

View file

@ -403,13 +403,13 @@ def interruptible_api_call(agent, api_kwargs: dict):
_elapsed, _ttfb_timeout, api_kwargs.get("model", "unknown"),
)
if _silent_hint:
agent._emit_status(
agent._buffer_status(
f"⚠️ No first byte from provider in {int(_elapsed)}s "
f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). "
f"Reconnecting. {_silent_hint}"
)
else:
agent._emit_status(
agent._buffer_status(
f"⚠️ No first byte from provider in {int(_elapsed)}s "
f"(codex stream, model: {api_kwargs.get('model', 'unknown')}). "
f"Reconnecting."
@ -455,7 +455,7 @@ def interruptible_api_call(agent, api_kwargs: dict):
api_kwargs.get("model", "unknown"),
f"{_est_tokens_for_codex_watchdog:,}",
)
agent._emit_status(
agent._buffer_status(
f"⚠️ Codex stream sent no events for {int(_event_stale_elapsed)}s "
f"after first byte (model: {api_kwargs.get('model', 'unknown')}). "
f"Reconnecting."
@ -493,13 +493,13 @@ def interruptible_api_call(agent, api_kwargs: dict):
api_kwargs.get("model", "unknown"), f"{_est_ctx:,}",
)
if _silent_hint:
agent._emit_status(
agent._buffer_status(
f"⚠️ No response from provider for {int(_elapsed)}s "
f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). "
f"{_silent_hint}"
)
else:
agent._emit_status(
agent._buffer_status(
f"⚠️ No response from provider for {int(_elapsed)}s "
f"(non-streaming, model: {api_kwargs.get('model', 'unknown')}). "
f"Aborting call."
@ -1262,7 +1262,7 @@ def try_activate_fallback(agent, reason: "FailoverReason | None" = None) -> bool
api_mode=agent.api_mode,
)
agent._emit_status(
agent._buffer_status(
f"🔄 Primary model failed — switching to fallback: "
f"{fb_model} via {fb_provider}"
)
@ -2251,7 +2251,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
mid_tool_call=False,
diag=request_client_holder.get("diag"),
)
agent._emit_status(
agent._buffer_status(
"❌ Provider returned malformed streaming data after "
f"{_max_stream_retries + 1} attempts. "
"The provider may be experiencing issues — "
@ -2358,7 +2358,7 @@ def interruptible_streaming_api_call(agent, api_kwargs: dict, *, on_first_delta=
_stale_elapsed, _stream_stale_timeout,
api_kwargs.get("model", "unknown"), f"{_est_ctx:,}",
)
agent._emit_status(
agent._buffer_status(
f"⚠️ No response from provider for {int(_stale_elapsed)}s "
f"(model: {api_kwargs.get('model', 'unknown')}, "
f"context: ~{_est_ctx:,} tokens). "

View file

@ -1151,17 +1151,18 @@ def run_conversation(
f"Nous Portal rate limit active — "
f"resets in {_fmt_nous_remaining(_nous_remaining)}."
)
agent._vprint(
f"{agent.log_prefix}{_nous_msg} Trying fallback...",
force=True,
agent._buffer_vprint(
f"{_nous_msg} Trying fallback..."
)
agent._emit_status(f"{_nous_msg}")
agent._buffer_status(f"{_nous_msg}")
if agent._try_activate_fallback():
retry_count = 0
compression_attempts = 0
primary_recovery_attempted = False
continue
# No fallback available — return with clear message
# No fallback available — surface buffered context
# so user sees the rate-limit message that led here.
agent._flush_status_buffer()
agent._persist_session(messages, conversation_history)
return {
"final_response": (
@ -1384,9 +1385,10 @@ def run_conversation(
error_details.append("response.choices is empty")
if response_invalid:
# Stop spinner before printing error messages
# Stop spinner silently — retry status is now buffered
# and only surfaced if every retry+fallback exhausts.
if thinking_spinner:
thinking_spinner.stop("(´;ω;`) oops, retrying...")
thinking_spinner.stop("")
thinking_spinner = None
if agent.thinking_callback:
agent.thinking_callback("")
@ -1399,7 +1401,7 @@ def run_conversation(
# rate-limit symptom. Switch to fallback immediately
# rather than retrying with extended backoff.
if agent._fallback_index < len(agent._fallback_chain):
agent._emit_status("⚠️ Empty/malformed response — switching to fallback...")
agent._buffer_status("⚠️ Empty/malformed response — switching to fallback...")
if agent._try_activate_fallback():
retry_count = 0
compression_attempts = 0
@ -1461,20 +1463,22 @@ def run_conversation(
else:
_failure_hint = f"response time {api_duration:.1f}s"
agent._vprint(f"{agent.log_prefix}⚠️ Invalid API response (attempt {retry_count}/{max_retries}): {', '.join(error_details)}", force=True)
agent._vprint(f"{agent.log_prefix} 🏢 Provider: {provider_name}", force=True)
agent._buffer_vprint(f"⚠️ Invalid API response (attempt {retry_count}/{max_retries}): {', '.join(error_details)}")
agent._buffer_vprint(f" 🏢 Provider: {provider_name}")
cleaned_provider_error = agent._clean_error_message(error_msg)
agent._vprint(f"{agent.log_prefix} 📝 Provider message: {cleaned_provider_error}", force=True)
agent._vprint(f"{agent.log_prefix} ⏱️ {_failure_hint}", force=True)
agent._buffer_vprint(f" 📝 Provider message: {cleaned_provider_error}")
agent._buffer_vprint(f" ⏱️ {_failure_hint}")
if retry_count >= max_retries:
# Try fallback before giving up
agent._emit_status(f"⚠️ Max retries ({max_retries}) for invalid responses — trying fallback...")
agent._buffer_status(f"⚠️ Max retries ({max_retries}) for invalid responses — trying fallback...")
if agent._try_activate_fallback():
retry_count = 0
compression_attempts = 0
primary_recovery_attempted = False
continue
# Terminal — flush buffered retry trace so user sees what happened.
agent._flush_status_buffer()
agent._emit_status(f"❌ Max retries ({max_retries}) exceeded for invalid responses. Giving up.")
logger.error(f"{agent.log_prefix}Invalid API response after {max_retries} retries.")
agent._persist_session(messages, conversation_history)
@ -1488,7 +1492,7 @@ def run_conversation(
# Backoff before retry — jittered exponential: 5s base, 120s cap
wait_time = jittered_backoff(retry_count, base_delay=5.0, max_delay=120.0)
agent._vprint(f"{agent.log_prefix}⏳ Retrying in {wait_time:.1f}s ({_failure_hint})...", force=True)
agent._buffer_vprint(f"⏳ Retrying in {wait_time:.1f}s ({_failure_hint})...")
logger.warning(f"Invalid API response (retry {retry_count}/{max_retries}): {', '.join(error_details)} | Provider: {provider_name}")
# Sleep in small increments to stay responsive to interrupts
@ -1715,14 +1719,14 @@ def run_conversation(
if assistant_message is not None and _trunc_has_tool_calls:
if truncated_tool_call_retries < 1:
truncated_tool_call_retries += 1
agent._vprint(
f"{agent.log_prefix}⚠️ Truncated tool call detected — retrying API call...",
force=True,
agent._buffer_vprint(
f"⚠️ Truncated tool call detected — retrying API call..."
)
# Don't append the broken response to messages;
# just re-run the same API call from the current
# message state, giving the model another chance.
continue
agent._flush_status_buffer()
agent._vprint(
f"{agent.log_prefix}⚠️ Truncated tool call response detected again — refusing to execute incomplete tool arguments.",
force=True,
@ -1756,6 +1760,7 @@ def run_conversation(
}
else:
# First message was truncated - mark as failed
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ First response truncated - cannot recover", force=True)
agent._persist_session(messages, conversation_history)
return {
@ -1907,6 +1912,11 @@ def run_conversation(
)
has_retried_429 = False # Reset on success
# Note: don't clear the retry buffer here — an "API call
# success" only means we got bytes back, not that we got
# usable content. Empty responses still loop through the
# empty-retry path below; the buffer is cleared when
# genuinely successful content is detected later (~L4127).
# Clear Nous rate limit state on successful request —
# proves the limit has reset and other sessions can
# resume hitting Nous.
@ -1933,9 +1943,10 @@ def run_conversation(
break
except Exception as api_error:
# Stop spinner before printing error messages
# Stop spinner silently — retry status is buffered and
# only flushed when every retry+fallback is exhausted.
if thinking_spinner:
thinking_spinner.stop("(╥_╥) error, retrying...")
thinking_spinner.stop("")
thinking_spinner = None
if agent.thinking_callback:
agent.thinking_callback("")
@ -1990,14 +2001,12 @@ def run_conversation(
if _surrogates_found or _is_surrogate_error:
agent._unicode_sanitization_passes += 1
if _surrogates_found:
agent._vprint(
f"{agent.log_prefix}⚠️ Stripped invalid surrogate characters from messages. Retrying...",
force=True,
agent._buffer_vprint(
f"⚠️ Stripped invalid surrogate characters from messages. Retrying..."
)
else:
agent._vprint(
f"{agent.log_prefix}⚠️ Surrogate encoding error — retrying after full-payload sanitization...",
force=True,
agent._buffer_vprint(
f"⚠️ Surrogate encoding error — retrying after full-payload sanitization..."
)
continue
if _is_ascii_codec:
@ -2325,7 +2334,7 @@ def run_conversation(
codex_auth_retry_attempted = True
if agent._try_refresh_codex_client_credentials(force=True):
_label = "xAI OAuth" if agent.provider == "xai-oauth" else "Codex"
agent._vprint(f"{agent.log_prefix}🔐 {_label} auth refreshed after 401. Retrying request...")
agent._buffer_vprint(f"🔐 {_label} auth refreshed after 401. Retrying request...")
continue
if (
agent.api_mode == "chat_completions"
@ -2366,7 +2375,7 @@ def run_conversation(
):
copilot_auth_retry_attempted = True
if agent._try_refresh_copilot_client_credentials():
agent._vprint(f"{agent.log_prefix}🔐 Copilot credentials refreshed after 401. Retrying request...")
agent._buffer_vprint(f"🔐 Copilot credentials refreshed after 401. Retrying request...")
continue
if (
agent.api_mode == "anthropic_messages"
@ -2541,41 +2550,37 @@ def run_conversation(
_base = getattr(agent, "base_url", "unknown")
_model = getattr(agent, "model", "unknown")
_status_code_str = f" [HTTP {status_code}]" if status_code else ""
agent._vprint(f"{agent.log_prefix}⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}{_status_code_str}", force=True)
agent._vprint(f"{agent.log_prefix} 🔌 Provider: {_provider} Model: {_model}", force=True)
agent._vprint(f"{agent.log_prefix} 🌐 Endpoint: {_base}", force=True)
agent._vprint(f"{agent.log_prefix} 📝 Error: {_error_summary}", force=True)
agent._buffer_vprint(f"⚠️ API call failed (attempt {retry_count}/{max_retries}): {error_type}{_status_code_str}")
agent._buffer_vprint(f" 🔌 Provider: {_provider} Model: {_model}")
agent._buffer_vprint(f" 🌐 Endpoint: {_base}")
agent._buffer_vprint(f" 📝 Error: {_error_summary}")
if status_code and status_code < 500:
_err_body = getattr(api_error, "body", None)
_err_body_str = str(_err_body)[:300] if _err_body else None
if _err_body_str:
agent._vprint(f"{agent.log_prefix} 📋 Details: {_err_body_str}", force=True)
agent._vprint(f"{agent.log_prefix} ⏱️ Elapsed: {elapsed_time:.2f}s Context: {len(api_messages)} msgs, ~{approx_tokens:,} tokens")
agent._buffer_vprint(f" 📋 Details: {_err_body_str}")
agent._buffer_vprint(f" ⏱️ Elapsed: {elapsed_time:.2f}s Context: {len(api_messages)} msgs, ~{approx_tokens:,} tokens")
# Actionable hint for OpenRouter "no tool endpoints" error.
# This fires regardless of whether fallback succeeds — the
# user needs to know WHY their model failed so they can fix
# their provider routing, not just silently fall back.
# Buffered like the rest of the retry trace — surfaced only
# if every retry+fallback exhausts. Avoids spamming users
# who recover automatically via fallback.
if (
agent._is_openrouter_url()
and "support tool use" in error_msg
):
agent._vprint(
f"{agent.log_prefix} 💡 No OpenRouter providers for {_model} support tool calling with your current settings.",
force=True,
agent._buffer_vprint(
f" 💡 No OpenRouter providers for {_model} support tool calling with your current settings."
)
if agent.providers_allowed:
agent._vprint(
f"{agent.log_prefix} Your provider_routing.only restriction is filtering out tool-capable providers.",
force=True,
agent._buffer_vprint(
f" Your provider_routing.only restriction is filtering out tool-capable providers."
)
agent._vprint(
f"{agent.log_prefix} Try removing the restriction or adding providers that support tools for this model.",
force=True,
agent._buffer_vprint(
f" Try removing the restriction or adding providers that support tools for this model."
)
agent._vprint(
f"{agent.log_prefix} Check which providers support tools: https://openrouter.ai/models/{_model}",
force=True,
agent._buffer_vprint(
f" Check which providers support tools: https://openrouter.ai/models/{_model}"
)
# Check for interrupt before deciding to retry
@ -2625,11 +2630,10 @@ def run_conversation(
# user later enables extra usage the 1M limit
# should come back automatically.
compressor._context_probe_persistable = False
agent._vprint(
f"{agent.log_prefix}⚠️ Anthropic long-context tier "
agent._buffer_vprint(
f"⚠️ Anthropic long-context tier "
f"requires extra usage — reducing context: "
f"{old_ctx:,}{_reduced_ctx:,} tokens",
force=True,
f"{old_ctx:,}{_reduced_ctx:,} tokens"
)
compression_attempts += 1
@ -2645,7 +2649,7 @@ def run_conversation(
# messages to the new session, not skipping them.
conversation_history = None
if len(messages) < original_len or old_ctx > _reduced_ctx:
agent._emit_status(
agent._buffer_status(
f"🗜️ Context reduced to {_reduced_ctx:,} tokens "
f"(was {old_ctx:,}), retrying..."
)
@ -2675,11 +2679,11 @@ def run_conversation(
)
if not pool_may_recover:
if classified.reason == FailoverReason.billing:
agent._emit_status(
agent._buffer_status(
"⚠️ Billing or credits exhausted — switching to fallback provider..."
)
else:
agent._emit_status("⚠️ Rate limited — switching to fallback provider...")
agent._buffer_status("⚠️ Rate limited — switching to fallback provider...")
if agent._try_activate_fallback(reason=classified.reason):
retry_count = 0
compression_attempts = 0
@ -2791,6 +2795,8 @@ def run_conversation(
if is_payload_too_large:
compression_attempts += 1
if compression_attempts > max_compression_attempts:
# Terminal — surface the buffered retry trace.
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached for payload-too-large error.", force=True)
agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True)
logger.error(f"{agent.log_prefix}413 compression failed after {max_compression_attempts} attempts.")
@ -2804,7 +2810,7 @@ def run_conversation(
"failed": True,
"compression_exhausted": True,
}
agent._emit_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...")
agent._buffer_status(f"⚠️ Request payload too large (413) — compression attempt {compression_attempts}/{max_compression_attempts}...")
original_len = len(messages)
messages, active_system_prompt = agent._compress_context(
@ -2817,11 +2823,14 @@ def run_conversation(
conversation_history = None
if len(messages) < original_len:
agent._emit_status(f"🗜️ Compressed {original_len}{len(messages)} messages, retrying...")
agent._buffer_status(f"🗜️ Compressed {original_len}{len(messages)} messages, retrying...")
time.sleep(2) # Brief pause between compression retries
restart_with_compressed_messages = True
break
else:
# Terminal — surface buffered context so the user
# sees what compression attempts were made.
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Payload too large and cannot compress further.", force=True)
agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True)
logger.error(f"{agent.log_prefix}413 payload too large. Cannot compress further.")
@ -2865,16 +2874,16 @@ def run_conversation(
# touching context_length or triggering compression.
safe_out = max(1, available_out - 64) # small safety margin
agent._ephemeral_max_output_tokens = safe_out
agent._vprint(
f"{agent.log_prefix}⚠️ Output cap too large for current prompt — "
agent._buffer_vprint(
f"⚠️ Output cap too large for current prompt — "
f"retrying with max_tokens={safe_out:,} "
f"(available_tokens={available_out:,}; context_length unchanged at {old_ctx:,})",
force=True,
f"(available_tokens={available_out:,}; context_length unchanged at {old_ctx:,})"
)
# Still count against compression_attempts so we don't
# loop forever if the error keeps recurring.
compression_attempts += 1
if compression_attempts > max_compression_attempts:
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached.", force=True)
agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True)
logger.error(f"{agent.log_prefix}Context compression failed after {max_compression_attempts} attempts.")
@ -2910,13 +2919,12 @@ def run_conversation(
)
if parsed_limit and parsed_limit < old_ctx:
new_ctx = parsed_limit
agent._vprint(f"{agent.log_prefix}Context limit detected from API: {new_ctx:,} tokens (was {old_ctx:,})", force=True)
agent._buffer_vprint(f"Context limit detected from API: {new_ctx:,} tokens (was {old_ctx:,})")
elif minimax_delta_only_overflow:
new_ctx = old_ctx
agent._vprint(
f"{agent.log_prefix}Provider reported overflow amount only; "
f"keeping context_length at {old_ctx:,} tokens and compressing.",
force=True,
agent._buffer_vprint(
f"Provider reported overflow amount only; "
f"keeping context_length at {old_ctx:,} tokens and compressing."
)
else:
# Step down to the next probe tier
@ -2943,12 +2951,13 @@ def run_conversation(
compressor._context_probe_persistable = bool(
parsed_limit and parsed_limit == new_ctx
)
agent._vprint(f"{agent.log_prefix}⚠️ Context length exceeded — stepping down: {old_ctx:,}{new_ctx:,} tokens", force=True)
agent._buffer_vprint(f"⚠️ Context length exceeded — stepping down: {old_ctx:,}{new_ctx:,} tokens")
else:
agent._vprint(f"{agent.log_prefix}⚠️ Context length exceeded at minimum tier — attempting compression...", force=True)
agent._buffer_vprint(f"⚠️ Context length exceeded at minimum tier — attempting compression...")
compression_attempts += 1
if compression_attempts > max_compression_attempts:
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Max compression attempts ({max_compression_attempts}) reached.", force=True)
agent._vprint(f"{agent.log_prefix} 💡 Try /new to start a fresh conversation, or /compress to retry compression.", force=True)
logger.error(f"{agent.log_prefix}Context compression failed after {max_compression_attempts} attempts.")
@ -2962,7 +2971,7 @@ def run_conversation(
"failed": True,
"compression_exhausted": True,
}
agent._emit_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...")
agent._buffer_status(f"🗜️ Context too large (~{approx_tokens:,} tokens) — compressing ({compression_attempts}/{max_compression_attempts})...")
original_len = len(messages)
messages, active_system_prompt = agent._compress_context(
@ -2976,12 +2985,13 @@ def run_conversation(
if len(messages) < original_len or new_ctx and new_ctx < old_ctx:
if len(messages) < original_len:
agent._emit_status(f"🗜️ Compressed {original_len}{len(messages)} messages, retrying...")
agent._buffer_status(f"🗜️ Compressed {original_len}{len(messages)} messages, retrying...")
time.sleep(2) # Brief pause between compression retries
restart_with_compressed_messages = True
break
else:
# Can't compress further and already at minimum tier
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Context length exceeded and cannot compress further.", force=True)
agent._vprint(f"{agent.log_prefix} 💡 The conversation has accumulated too much content. Try /new to start fresh, or /compress to manually trigger compression.", force=True)
logger.error(f"{agent.log_prefix}Context length exceeded: {approx_tokens:,} tokens. Cannot compress further.")
@ -3070,7 +3080,7 @@ def run_conversation(
if is_client_error:
# Try fallback before aborting — a different provider
# may not have the same issue (rate limit, auth, etc.)
agent._emit_status(f"⚠️ Non-retryable error (HTTP {status_code}) — trying fallback...")
agent._buffer_status(f"⚠️ Non-retryable error (HTTP {status_code}) — trying fallback...")
if agent._try_activate_fallback():
retry_count = 0
compression_attempts = 0
@ -3080,6 +3090,9 @@ def run_conversation(
agent._dump_api_request_debug(
api_kwargs, reason="non_retryable_client_error", error=api_error,
)
# Terminal — flush buffered context so the user sees
# what was tried before the abort.
agent._flush_status_buffer()
agent._emit_status(
f"❌ Non-retryable error (HTTP {status_code}): "
f"{agent._summarize_api_error(api_error)}"
@ -3165,12 +3178,14 @@ def run_conversation(
retry_count = 0
continue
# Try fallback before giving up entirely
agent._emit_status(f"⚠️ Max retries ({max_retries}) exhausted — trying fallback...")
agent._buffer_status(f"⚠️ Max retries ({max_retries}) exhausted — trying fallback...")
if agent._try_activate_fallback():
retry_count = 0
compression_attempts = 0
primary_recovery_attempted = False
continue
# Terminal — flush buffered retry/fallback trace.
agent._flush_status_buffer()
_final_summary = agent._summarize_api_error(api_error)
_billing_guidance = ""
if classified.reason == FailoverReason.billing:
@ -3270,9 +3285,9 @@ def run_conversation(
pass
wait_time = _retry_after if _retry_after else jittered_backoff(retry_count, base_delay=2.0, max_delay=60.0)
if is_rate_limited:
agent._emit_status(f"⏱️ Rate limited. Waiting {wait_time:.1f}s (attempt {retry_count + 1}/{max_retries})...")
agent._buffer_status(f"⏱️ Rate limited. Waiting {wait_time:.1f}s (attempt {retry_count + 1}/{max_retries})...")
else:
agent._emit_status(f"⏳ Retrying in {wait_time:.1f}s (attempt {retry_count}/{max_retries})...")
agent._buffer_status(f"⏳ Retrying in {wait_time:.1f}s (attempt {retry_count}/{max_retries})...")
logger.warning(
"Retrying API call in %ss (attempt %s/%s) %s error=%s",
wait_time,
@ -3431,14 +3446,15 @@ def run_conversation(
if has_incomplete_scratchpad(assistant_message.content or ""):
agent._incomplete_scratchpad_retries += 1
agent._vprint(f"{agent.log_prefix}⚠️ Incomplete <REASONING_SCRATCHPAD> detected (opened but never closed)")
agent._buffer_vprint(f"⚠️ Incomplete <REASONING_SCRATCHPAD> detected (opened but never closed)")
if agent._incomplete_scratchpad_retries <= 2:
agent._vprint(f"{agent.log_prefix}🔄 Retrying API call ({agent._incomplete_scratchpad_retries}/2)...")
agent._buffer_vprint(f"🔄 Retrying API call ({agent._incomplete_scratchpad_retries}/2)...")
# Don't add the broken message, just retry
continue
else:
# Max retries - discard this turn and save as partial
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Max retries (2) for incomplete scratchpad. Saving as partial.", force=True)
agent._incomplete_scratchpad_retries = 0
@ -3546,9 +3562,10 @@ def run_conversation(
available = ", ".join(sorted(agent.valid_tool_names))
invalid_name = invalid_tool_calls[0]
invalid_preview = invalid_name[:80] + "..." if len(invalid_name) > 80 else invalid_name
agent._vprint(f"{agent.log_prefix}⚠️ Unknown tool '{invalid_preview}' — sending error to model for agent-correction ({agent._invalid_tool_retries}/3)")
agent._buffer_vprint(f"⚠️ Unknown tool '{invalid_preview}' — sending error to model for agent-correction ({agent._invalid_tool_retries}/3)")
if agent._invalid_tool_retries >= 3:
agent._flush_status_buffer()
agent._vprint(f"{agent.log_prefix}❌ Max retries (3) for invalid tool calls exceeded. Stopping as partial.", force=True)
agent._invalid_tool_retries = 0
agent._persist_session(messages, conversation_history)
@ -3632,16 +3649,16 @@ def run_conversation(
agent._invalid_json_retries += 1
tool_name, error_msg = invalid_json_args[0]
agent._vprint(f"{agent.log_prefix}⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}")
agent._buffer_vprint(f"⚠️ Invalid JSON in tool call arguments for '{tool_name}': {error_msg}")
if agent._invalid_json_retries < 3:
agent._vprint(f"{agent.log_prefix}🔄 Retrying API call ({agent._invalid_json_retries}/3)...")
agent._buffer_vprint(f"🔄 Retrying API call ({agent._invalid_json_retries}/3)...")
# Don't add anything to messages, just retry the API call
continue
else:
# Instead of returning partial, inject tool error results so the model can recover.
# Using tool results (not user messages) preserves role alternation.
agent._vprint(f"{agent.log_prefix}⚠️ Injecting recovery tool results for invalid JSON...")
agent._buffer_vprint(f"⚠️ Injecting recovery tool results for invalid JSON...")
agent._invalid_json_retries = 0 # Reset for next attempt
# Append the assistant message with its (broken) tool_calls
@ -3949,7 +3966,7 @@ def run_conversation(
"Empty response after tool calls — nudging model "
"to continue processing"
)
agent._emit_status(
agent._buffer_status(
"⚠️ Model returned empty after tool calls — "
"nudging to continue"
)
@ -3995,7 +4012,7 @@ def run_conversation(
"prefilling to continue (%d/2)",
agent._thinking_prefill_retries,
)
agent._emit_status(
agent._buffer_status(
f"↻ Thinking-only response — prefilling to continue "
f"({agent._thinking_prefill_retries}/2)"
)
@ -4030,7 +4047,7 @@ def run_conversation(
"retry %d/3 (model=%s)",
agent._empty_content_retries, agent.model,
)
agent._emit_status(
agent._buffer_status(
f"⚠️ Empty response from model — retrying "
f"({agent._empty_content_retries}/3)"
)
@ -4049,13 +4066,13 @@ def run_conversation(
agent._empty_content_retries, agent.model,
agent.provider,
)
agent._emit_status(
agent._buffer_status(
"⚠️ Model returning empty responses — "
"switching to fallback provider..."
)
if agent._try_activate_fallback():
agent._empty_content_retries = 0
agent._emit_status(
agent._buffer_status(
f"↻ Switched to fallback: {agent.model} "
f"({agent.provider})"
)
@ -4069,6 +4086,9 @@ def run_conversation(
# Exhausted retries and fallback chain (or no
# fallback configured). Fall through to the
# "(empty)" terminal.
# Surface the buffered retry/fallback trace so the
# user can see what was attempted before "(empty)".
agent._flush_status_buffer()
_turn_exit_reason = "empty_response_exhausted"
reasoning_text = agent._extract_reasoning(assistant_message)
agent._drop_trailing_empty_response_scaffolding(messages)
@ -4113,6 +4133,9 @@ def run_conversation(
# Reset retry counter/signature on successful content
agent._empty_content_retries = 0
agent._thinking_prefill_retries = 0
# Successful content reached — drop any buffered retry
# status from earlier failed attempts in this turn.
agent._clear_status_buffer()
if (
agent.api_mode == "codex_responses"

View file

@ -258,7 +258,7 @@ def emit_stream_drop(
except Exception:
pass
try:
agent._emit_status(
agent._buffer_status(
f"⚠️ {provider} stream {kind} ({type(error).__name__}){_suffix} "
f"— reconnecting, retry {attempt}/{max_attempts}"
)

View file

@ -801,6 +801,83 @@ class AIAgent:
except Exception:
logger.debug("status_callback error in _emit_warning", exc_info=True)
# ── Buffered retry/fallback status ────────────────────────────────────
# Retry and fallback chains were flooding the CLI/gateway with status
# noise that users found confusing: a single transient 429 could produce
# 10+ "Provider/Endpoint/Retrying in 5s..." lines before the request
# eventually succeeded. The buffered helpers below capture these
# status messages instead of emitting them immediately. They are
# flushed (shown to the user) ONLY when every retry and fallback has
# been exhausted; on success they are silently dropped. Backend logs
# (agent.log) are unaffected — every individual emission site still
# writes to ``logger.warning`` / ``logger.info`` for diagnosis.
def _buffer_status(self, message: str) -> None:
"""Buffer a retry/fallback status message.
Stored as a (kind, text) tuple where ``kind`` is one of:
- ``"status"`` -> replays via ``_emit_status``
- ``"vprint"`` -> replays via ``_vprint(force=True)``
- ``"warn"`` -> replays via ``_emit_warning``
Used to defer noisy retry chatter until we know whether the
turn ultimately recovered or failed.
"""
try:
buf = getattr(self, "_retry_status_buffer", None)
if buf is None:
buf = []
self._retry_status_buffer = buf
buf.append(("status", message))
except Exception:
# Never break the retry loop on a buffer hiccup.
pass
def _buffer_vprint(self, message: str) -> None:
"""Buffer a vprint(force=True) retry/fallback line."""
try:
buf = getattr(self, "_retry_status_buffer", None)
if buf is None:
buf = []
self._retry_status_buffer = buf
buf.append(("vprint", message))
except Exception:
pass
def _clear_status_buffer(self) -> None:
"""Drop buffered retry messages — call on successful recovery."""
try:
buf = getattr(self, "_retry_status_buffer", None)
if buf:
buf.clear()
except Exception:
pass
def _flush_status_buffer(self) -> None:
"""Emit buffered retry messages — call on terminal failure.
Surfaces the full retry/fallback trace so the user can see what
was tried before the turn gave up.
"""
try:
buf = getattr(self, "_retry_status_buffer", None)
if not buf:
return
# Drain first so a callback exception doesn't double-emit.
messages = list(buf)
buf.clear()
for kind, msg in messages:
try:
if kind == "status":
self._emit_status(msg)
elif kind == "warn":
self._emit_warning(msg)
else:
self._vprint(f"{self.log_prefix}{msg}", force=True)
except Exception:
pass
except Exception:
pass
def _disable_codex_reasoning_replay(
self,
messages: Optional[List[Dict[str, Any]]] = None,

View file

@ -114,6 +114,7 @@ def test_ttfb_includes_silent_hang_hint_for_gpt_5_5(tmp_path, monkeypatch):
statuses: list[str] = []
dummy_client = SimpleNamespace()
monkeypatch.setattr(agent, "_create_request_openai_client", lambda **k: dummy_client)
monkeypatch.setattr(agent, "_buffer_status", lambda msg: statuses.append(msg))
monkeypatch.setattr(agent, "_emit_status", lambda msg: statuses.append(msg))
monkeypatch.setattr(
agent, "_abort_request_openai_client",

View file

@ -0,0 +1,157 @@
"""Tests for the retry/fallback status buffer helpers on AIAgent.
These helpers defer noisy retry chatter (rate-limit retries, fallback
switches, compression attempts) so users only see the trace when
everything ultimately fails. On successful recovery the buffer is
silently dropped.
"""
from __future__ import annotations
import pytest
from run_agent import AIAgent
def _make_bare_agent():
"""Construct an AIAgent without running __init__ — we only need the
buffered-status helpers, which are pure-Python and depend only on a
handful of attributes."""
agent = object.__new__(AIAgent)
agent.log_prefix = ""
agent.status_callback = None
agent.suppress_status_output = False
agent._mute_post_response = False
agent._executing_tools = False
agent._print_fn = None
return agent
def test_buffer_status_accumulates_then_flushes(capsys):
agent = _make_bare_agent()
emitted = []
agent._emit_status = lambda msg: emitted.append(("status", msg))
agent._buffer_status("⏳ Retrying...")
agent._buffer_status("⚠️ Fallback...")
# Nothing emitted yet — they are buffered.
assert emitted == []
assert agent._retry_status_buffer == [
("status", "⏳ Retrying..."),
("status", "⚠️ Fallback..."),
]
# Flush surfaces them in order through _emit_status.
agent._flush_status_buffer()
assert emitted == [
("status", "⏳ Retrying..."),
("status", "⚠️ Fallback..."),
]
# Buffer is drained.
assert agent._retry_status_buffer == []
def test_clear_drops_buffered_messages_silently():
agent = _make_bare_agent()
emitted = []
agent._emit_status = lambda msg: emitted.append(msg)
agent._buffer_status("⏳ Retrying...")
agent._buffer_status("⚠️ Fallback...")
agent._clear_status_buffer()
# Nothing was emitted — clear is the success path.
assert emitted == []
assert agent._retry_status_buffer == []
# Subsequent flush is a no-op.
agent._flush_status_buffer()
assert emitted == []
def test_buffer_vprint_replays_via_vprint_with_log_prefix():
agent = _make_bare_agent()
agent.log_prefix = "[abc] "
seen = []
agent._vprint = lambda msg, force=False, **kw: seen.append((msg, force))
agent._buffer_vprint("⚠️ API call failed")
agent._flush_status_buffer()
# Replays through _vprint with force=True and the agent's log_prefix
# prepended (matching the original direct-emit format).
assert seen == [("[abc] ⚠️ API call failed", True)]
def test_flush_empty_buffer_is_noop():
agent = _make_bare_agent()
emitted = []
agent._emit_status = lambda msg: emitted.append(msg)
agent._vprint = lambda msg, force=False, **kw: emitted.append(msg)
# No buffer attribute yet — flush should be a quiet no-op.
agent._flush_status_buffer()
assert emitted == []
# Even after touching the buffer (via clear on an empty/missing buffer).
agent._clear_status_buffer()
agent._flush_status_buffer()
assert emitted == []
def test_re_buffer_after_flush_works():
agent = _make_bare_agent()
emitted = []
agent._emit_status = lambda msg: emitted.append(msg)
agent._buffer_status("first")
agent._flush_status_buffer()
agent._buffer_status("second")
agent._flush_status_buffer()
assert emitted == ["first", "second"]
def test_mixed_kinds_replay_through_correct_channels():
agent = _make_bare_agent()
agent.log_prefix = ""
statuses = []
vprints = []
warns = []
agent._emit_status = lambda msg: statuses.append(msg)
agent._vprint = lambda msg, force=False, **kw: vprints.append((msg, force))
agent._emit_warning = lambda msg: warns.append(msg)
agent._buffer_status("status-1")
agent._buffer_vprint("vprint-1")
# Manually mix in a "warn" record to verify the dispatch still works.
agent._retry_status_buffer.append(("warn", "warn-1"))
agent._buffer_status("status-2")
agent._flush_status_buffer()
assert statuses == ["status-1", "status-2"]
assert vprints == [("vprint-1", True)]
assert warns == ["warn-1"]
def test_flush_swallows_callback_exceptions():
agent = _make_bare_agent()
seen = []
def boom(msg):
seen.append(msg)
raise RuntimeError("simulated callback failure")
agent._emit_status = boom
agent._buffer_status("first")
agent._buffer_status("second")
# Should not raise even though _emit_status raises for every message.
agent._flush_status_buffer()
# Both messages were attempted.
assert seen == ["first", "second"]
# Buffer drained regardless of failures.
assert agent._retry_status_buffer == []

View file

@ -203,7 +203,7 @@ def test_emit_stream_drop_ui_includes_elapsed_when_available():
diag = AIAgent._stream_diag_init()
diag["started_at"] = time.time() - 8.0 # 8s on the wire before drop
with patch.object(agent, "_emit_status") as mock_emit:
with patch.object(agent, "_buffer_status") as mock_emit:
agent._emit_stream_drop(
error=ConnectionError("x"),
attempt=2,
@ -223,7 +223,7 @@ def test_emit_stream_drop_ui_omits_suffix_without_diag():
agent = _make_agent()
agent.provider = "openrouter"
with patch.object(agent, "_emit_status") as mock_emit:
with patch.object(agent, "_buffer_status") as mock_emit:
agent._emit_stream_drop(
error=ConnectionError("x"),
attempt=2,