fix(gateway): avoid stale interrupted turn auto-continue

This commit is contained in:
beliefanx 2026-04-28 11:37:22 +08:00 committed by Teknium
parent b61d9b297a
commit 93feffbcfa
2 changed files with 118 additions and 4 deletions

View file

@ -40,6 +40,53 @@ from agent.account_usage import fetch_account_usage, render_account_usage_lines
# from _enforce_agent_cache_cap() and _session_expiry_watcher() below.
_AGENT_CACHE_MAX_SIZE = 128
_AGENT_CACHE_IDLE_TTL_SECS = 3600.0 # evict agents idle for >1h
# Only auto-continue interrupted gateway turns while the interruption is fresh.
# Stale tool-tail/resume markers can otherwise revive an unrelated old task
# after a gateway restart when the user's next message starts new work.
_AUTO_CONTINUE_FRESHNESS_SECS = 15 * 60
def _coerce_gateway_timestamp(value: Any) -> Optional[float]:
"""Best-effort conversion of stored gateway timestamps to epoch seconds.
Missing/unparseable timestamps return None so legacy transcripts keep the
historical auto-continue behaviour instead of being silently dropped.
"""
if value is None:
return None
if isinstance(value, datetime):
return value.timestamp()
if isinstance(value, (int, float)):
# Some platform events use milliseconds; Hermes state rows use seconds.
return float(value) / 1000.0 if float(value) > 10_000_000_000 else float(value)
if isinstance(value, str):
text = value.strip()
if not text:
return None
try:
numeric = float(text)
return numeric / 1000.0 if numeric > 10_000_000_000 else numeric
except ValueError:
pass
try:
return datetime.fromisoformat(text.replace("Z", "+00:00")).timestamp()
except ValueError:
return None
return None
def _is_fresh_gateway_interruption(value: Any, *, now: Optional[float] = None) -> bool:
"""Return True when an interruption marker is fresh enough to auto-continue.
Unknown timestamps are treated as fresh for backward compatibility with
legacy transcripts/tests that predate timestamp persistence.
"""
timestamp = _coerce_gateway_timestamp(value)
if timestamp is None:
return True
current = time.time() if now is None else now
return current - timestamp <= _AUTO_CONTINUE_FRESHNESS_SECS
# ---------------------------------------------------------------------------
# SSL certificate auto-detection for NixOS and other non-standard systems.
@ -10513,8 +10560,20 @@ class GatewayRunner:
_resume_entry = self.session_store._entries.get(session_key)
except Exception:
_resume_entry = None
_resume_marked_at = (
getattr(_resume_entry, "last_resume_marked_at", None)
if _resume_entry is not None
else None
)
_is_resume_pending = bool(
_resume_entry is not None and getattr(_resume_entry, "resume_pending", False)
_resume_entry is not None
and getattr(_resume_entry, "resume_pending", False)
and _is_fresh_gateway_interruption(_resume_marked_at)
)
_has_fresh_tool_tail = bool(
agent_history
and agent_history[-1].get("role") == "tool"
and _is_fresh_gateway_interruption(agent_history[-1].get("timestamp"))
)
if _is_resume_pending:
@ -10534,7 +10593,7 @@ class GatewayRunner:
f"message below.]\n\n"
+ message
)
elif agent_history and agent_history[-1].get("role") == "tool":
elif _has_fresh_tool_tail:
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "

View file

@ -26,12 +26,14 @@ PRs #9850, #9934, #7536):
"""
import asyncio
import time
from datetime import datetime, timedelta
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from gateway.config import GatewayConfig, Platform, PlatformConfig
from gateway.run import _is_fresh_gateway_interruption
from gateway.session import SessionEntry, SessionSource, SessionStore
from tests.gateway.restart_test_helpers import (
make_restart_runner,
@ -64,7 +66,16 @@ def _simulate_note_injection(
"""
message = user_message
is_resume_pending = bool(
resume_entry is not None and getattr(resume_entry, "resume_pending", False)
resume_entry is not None
and getattr(resume_entry, "resume_pending", False)
and _is_fresh_gateway_interruption(
getattr(resume_entry, "last_resume_marked_at", None)
)
)
has_fresh_tool_tail = bool(
agent_history
and agent_history[-1].get("role") == "tool"
and _is_fresh_gateway_interruption(agent_history[-1].get("timestamp"))
)
if is_resume_pending:
@ -84,7 +95,7 @@ def _simulate_note_injection(
f"message below.]\n\n"
+ message
)
elif agent_history and agent_history[-1].get("role") == "tool":
elif has_fresh_tool_tail:
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "
@ -412,6 +423,50 @@ class TestResumePendingSystemNote:
assert "[System note:" in result
assert "tool result" in result
def test_stale_resume_pending_does_not_inject_restart_note(self):
"""Old restart markers must not revive an unrelated stale task."""
entry = self._pending_entry()
entry.last_resume_marked_at = datetime.now() - timedelta(hours=1)
result = _simulate_note_injection(
agent_history=[{"role": "assistant", "content": "old in progress"}],
user_message="start a new task",
resume_entry=entry,
)
assert result == "start a new task"
def test_fresh_tool_tail_preserves_auto_continue_note(self):
history = [
{"role": "assistant", "content": None, "tool_calls": [
{"id": "c1", "function": {"name": "x", "arguments": "{}"}},
]},
{
"role": "tool",
"tool_call_id": "c1",
"content": "result",
"timestamp": time.time(),
},
]
result = _simulate_note_injection(history, "ping", resume_entry=None)
assert "[System note:" in result
assert "tool result" in result
def test_stale_tool_tail_does_not_inject_auto_continue_note(self):
history = [
{"role": "assistant", "content": None, "tool_calls": [
{"id": "c1", "function": {"name": "x", "arguments": "{}"}},
]},
{
"role": "tool",
"tool_call_id": "c1",
"content": "stale result",
"timestamp": time.time() - 3600,
},
]
result = _simulate_note_injection(history, "start a new task", resume_entry=None)
assert result == "start a new task"
def test_no_note_when_nothing_to_resume(self):
history = [
{"role": "user", "content": "hello"},