From 93feffbcfaf7bac2f795181a5d7c658e33538a34 Mon Sep 17 00:00:00 2001 From: beliefanx Date: Tue, 28 Apr 2026 11:37:22 +0800 Subject: [PATCH] fix(gateway): avoid stale interrupted turn auto-continue --- gateway/run.py | 63 +++++++++++++++++++- tests/gateway/test_restart_resume_pending.py | 59 +++++++++++++++++- 2 files changed, 118 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 21d0299022..edd33c8709 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -40,6 +40,53 @@ from agent.account_usage import fetch_account_usage, render_account_usage_lines # from _enforce_agent_cache_cap() and _session_expiry_watcher() below. _AGENT_CACHE_MAX_SIZE = 128 _AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h +# Only auto-continue interrupted gateway turns while the interruption is fresh. +# Stale tool-tail/resume markers can otherwise revive an unrelated old task +# after a gateway restart when the user's next message starts new work. +_AUTO_CONTINUE_FRESHNESS_SECS = 15 * 60 + + +def _coerce_gateway_timestamp(value: Any) -> Optional[float]: + """Best-effort conversion of stored gateway timestamps to epoch seconds. + + Missing/unparseable timestamps return None so legacy transcripts keep the + historical auto-continue behaviour instead of being silently dropped. + """ + if value is None: + return None + if isinstance(value, datetime): + return value.timestamp() + if isinstance(value, (int, float)): + # Some platform events use milliseconds; Hermes state rows use seconds. + return float(value) / 1000.0 if float(value) > 10_000_000_000 else float(value) + if isinstance(value, str): + text = value.strip() + if not text: + return None + try: + numeric = float(text) + return numeric / 1000.0 if numeric > 10_000_000_000 else numeric + except ValueError: + pass + try: + return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp() + except ValueError: + return None + return None + + +def _is_fresh_gateway_interruption(value: Any, *, now: Optional[float] = None) -> bool: + """Return True when an interruption marker is fresh enough to auto-continue. + + Unknown timestamps are treated as fresh for backward compatibility with + legacy transcripts/tests that predate timestamp persistence. + """ + timestamp = _coerce_gateway_timestamp(value) + if timestamp is None: + return True + current = time.time() if now is None else now + return current - timestamp <= _AUTO_CONTINUE_FRESHNESS_SECS + # --------------------------------------------------------------------------- # SSL certificate auto-detection for NixOS and other non-standard systems. @@ -10513,8 +10560,20 @@ class GatewayRunner: _resume_entry = self.session_store._entries.get(session_key) except Exception: _resume_entry = None + _resume_marked_at = ( + getattr(_resume_entry, "last_resume_marked_at", None) + if _resume_entry is not None + else None + ) _is_resume_pending = bool( - _resume_entry is not None and getattr(_resume_entry, "resume_pending", False) + _resume_entry is not None + and getattr(_resume_entry, "resume_pending", False) + and _is_fresh_gateway_interruption(_resume_marked_at) + ) + _has_fresh_tool_tail = bool( + agent_history + and agent_history[-1].get("role") == "tool" + and _is_fresh_gateway_interruption(agent_history[-1].get("timestamp")) ) if _is_resume_pending: @@ -10534,7 +10593,7 @@ class GatewayRunner: f"message below.]\n\n" + message ) - elif agent_history and agent_history[-1].get("role") == "tool": + elif _has_fresh_tool_tail: message = ( "[System note: Your previous turn was interrupted before you could " "process the last tool result(s). The conversation history contains " diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index c11b2740db..942d8d238d 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -26,12 +26,14 @@ PRs #9850, #9934, #7536): """ import asyncio +import time from datetime import datetime, timedelta from unittest.mock import AsyncMock, MagicMock, patch import pytest from gateway.config import GatewayConfig, Platform, PlatformConfig +from gateway.run import _is_fresh_gateway_interruption from gateway.session import SessionEntry, SessionSource, SessionStore from tests.gateway.restart_test_helpers import ( make_restart_runner, @@ -64,7 +66,16 @@ def _simulate_note_injection( """ message = user_message is_resume_pending = bool( - resume_entry is not None and getattr(resume_entry, "resume_pending", False) + resume_entry is not None + and getattr(resume_entry, "resume_pending", False) + and _is_fresh_gateway_interruption( + getattr(resume_entry, "last_resume_marked_at", None) + ) + ) + has_fresh_tool_tail = bool( + agent_history + and agent_history[-1].get("role") == "tool" + and _is_fresh_gateway_interruption(agent_history[-1].get("timestamp")) ) if is_resume_pending: @@ -84,7 +95,7 @@ def _simulate_note_injection( f"message below.]\n\n" + message ) - elif agent_history and agent_history[-1].get("role") == "tool": + elif has_fresh_tool_tail: message = ( "[System note: Your previous turn was interrupted before you could " "process the last tool result(s). The conversation history contains " @@ -412,6 +423,50 @@ class TestResumePendingSystemNote: assert "[System note:" in result assert "tool result" in result + def test_stale_resume_pending_does_not_inject_restart_note(self): + """Old restart markers must not revive an unrelated stale task.""" + entry = self._pending_entry() + entry.last_resume_marked_at = datetime.now() - timedelta(hours=1) + + result = _simulate_note_injection( + agent_history=[{"role": "assistant", "content": "old in progress"}], + user_message="start a new task", + resume_entry=entry, + ) + + assert result == "start a new task" + + def test_fresh_tool_tail_preserves_auto_continue_note(self): + history = [ + {"role": "assistant", "content": None, "tool_calls": [ + {"id": "c1", "function": {"name": "x", "arguments": "{}"}}, + ]}, + { + "role": "tool", + "tool_call_id": "c1", + "content": "result", + "timestamp": time.time(), + }, + ] + result = _simulate_note_injection(history, "ping", resume_entry=None) + assert "[System note:" in result + assert "tool result" in result + + def test_stale_tool_tail_does_not_inject_auto_continue_note(self): + history = [ + {"role": "assistant", "content": None, "tool_calls": [ + {"id": "c1", "function": {"name": "x", "arguments": "{}"}}, + ]}, + { + "role": "tool", + "tool_call_id": "c1", + "content": "stale result", + "timestamp": time.time() - 3600, + }, + ] + result = _simulate_note_injection(history, "start a new task", resume_entry=None) + assert result == "start a new task" + def test_no_note_when_nothing_to_resume(self): history = [ {"role": "user", "content": "hello"},