fix(gateway): preserve resume marker on interrupted restart

This commit is contained in:
Kevin Yan 2026-05-06 16:57:09 -04:00 committed by Teknium
parent fad684b1f3
commit 961a3535fa
2 changed files with 49 additions and 1 deletions

View file

@ -985,6 +985,26 @@ def _normalize_empty_agent_response(
return response
def _should_clear_resume_pending_after_turn(agent_result: dict) -> bool:
"""Return True only when a gateway turn really completed successfully.
Restart recovery uses ``resume_pending`` as a durable marker for sessions
interrupted during gateway drain. A soft interrupt can still bubble out as
a syntactically normal agent result with an empty final response; clearing
the marker in that case loses the recovery signal and startup auto-resume
has nothing to schedule.
"""
if not isinstance(agent_result, dict):
return False
if agent_result.get("interrupted"):
return False
if agent_result.get("failed") or agent_result.get("partial") or agent_result.get("error"):
return False
if agent_result.get("completed") is False:
return False
return True
class GatewayRunner:
"""
Main gateway controller.
@ -6587,7 +6607,7 @@ class GatewayRunner:
# shutdown) — the turn ran to completion, so recovery
# succeeded and subsequent messages should no longer receive
# the restart-interruption system note.
if session_key:
if session_key and _should_clear_resume_pending_after_turn(agent_result):
self._clear_restart_failure_count(session_key)
try:
self.session_store.clear_resume_pending(session_key)
@ -13963,6 +13983,11 @@ class GatewayRunner:
"messages": result.get("messages", []),
"api_calls": result.get("api_calls", 0),
"failed": result.get("failed", False),
"partial": result.get("partial", False),
"completed": result.get("completed"),
"interrupted": result.get("interrupted", False),
"interrupt_message": result.get("interrupt_message"),
"error": result.get("error"),
"compression_exhausted": result.get("compression_exhausted", False),
"tools": tools_holder[0] or [],
"history_offset": len(agent_history),
@ -14078,6 +14103,11 @@ class GatewayRunner:
"last_reasoning": result.get("last_reasoning"),
"messages": result_holder[0].get("messages", []) if result_holder[0] else [],
"api_calls": result_holder[0].get("api_calls", 0) if result_holder[0] else 0,
"completed": result_holder[0].get("completed") if result_holder[0] else None,
"interrupted": result_holder[0].get("interrupted", False) if result_holder[0] else False,
"partial": result_holder[0].get("partial", False) if result_holder[0] else False,
"error": result_holder[0].get("error") if result_holder[0] else None,
"interrupt_message": result_holder[0].get("interrupt_message") if result_holder[0] else None,
"tools": tools_holder[0] or [],
"history_offset": _effective_history_offset,
"last_prompt_tokens": _last_prompt_toks,

View file

@ -39,6 +39,7 @@ from gateway.run import (
_coerce_gateway_timestamp,
_is_fresh_gateway_interruption,
_last_transcript_timestamp,
_should_clear_resume_pending_after_turn,
)
from gateway.session import SessionEntry, SessionSource, SessionStore
from tests.gateway.restart_test_helpers import (
@ -52,6 +53,23 @@ from tests.gateway.restart_test_helpers import (
# ---------------------------------------------------------------------------
def test_resume_pending_is_cleared_only_after_successful_turn():
"""Interrupted/failed drain results must keep the restart recovery marker.
Regression for dogfood failure: during gateway restart the interrupted run
returned an empty final response and was normalized into a user-facing
fallback, but the gateway cleared ``resume_pending`` before startup could
auto-resume it.
"""
assert _should_clear_resume_pending_after_turn({"final_response": "done"}) is True
assert _should_clear_resume_pending_after_turn({"completed": True}) is True
assert _should_clear_resume_pending_after_turn({"interrupted": True}) is False
assert _should_clear_resume_pending_after_turn({"completed": False}) is False
assert _should_clear_resume_pending_after_turn({"failed": True}) is False
assert _should_clear_resume_pending_after_turn({"partial": True}) is False
assert _should_clear_resume_pending_after_turn({"error": "boom"}) is False
def _make_source(platform=Platform.TELEGRAM, chat_id="123", user_id="u1"):
return SessionSource(platform=platform, chat_id=chat_id, user_id=user_id)