fix(gateway): preserve new input on interrupted replay cleanup

This commit is contained in:
Teknium 2026-06-14 04:29:05 -07:00
parent 5191c1c2ce
commit 2c174bce24
4 changed files with 211 additions and 74 deletions

View file

@ -677,13 +677,15 @@ def _build_gateway_agent_history(
clean_msg = {k: v for k, v in msg.items() if k not in {"timestamp", "observed"}}
agent_history.append(clean_msg)
elif content:
# Skip gateway-injected auto-continue / background-process notes
# that were persisted as user messages during interrupted turns.
# Replaying them tells the LLM to re-process old work instead of
# addressing the user's new message — which is the root cause of
# infinite re-transcription / re-analysis loops.
if role == "user" and _is_auto_continue_noise(content):
continue
# Strip gateway-injected auto-continue notes that were persisted
# as part of user messages during interrupted turns. Keep the
# user's real text after the note, but never replay the recovery
# instruction itself — that is what caused infinite re-execution
# loops for interrupted long-running tools.
if role == "user":
content = _strip_auto_continue_noise(content)
if not content:
continue
# Simple text message - just need role and content.
if msg.get("mirror"):
mirror_src = msg.get("mirror_source", "another session")
@ -766,78 +768,93 @@ def _is_interrupted_tool_result(content: Any) -> bool:
"""Return True if a tool result indicates the tool was interrupted."""
if not isinstance(content, str):
return False
if "[Command interrupted]" in content:
lowered = content.lower()
if "[command interrupted]" in lowered:
return True
if '"exit_code": 130' in content or '"exit_code": -1' in content:
if "interrupt" in content.lower() or "Command interrupted" in content:
return True
if "exit_code" in lowered and ("130" in lowered or "-1" in lowered):
return "interrupt" in lowered
return False
def _strip_interrupted_tool_tails(
agent_history: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
"""Strip trailing tool-call sequences whose results were all interrupted.
"""Strip interrupted assistant→tool sequences from replay history.
When the gateway interrupts a turn mid-tool-execution, the session DB stores
the tool_calls + tool results (which contain "[Command interrupted]" /
exit_code 130). Replaying these to the LLM on the next turn causes it to
re-execute the same tools this function detects those tail sequences and
removes them before they reach the model.
Older interrupted gateway turns can be followed by a queued real user
message, so the interrupted assistant/tool block is not necessarily the
final tail by the time we rebuild replay history. Remove any contiguous
assistant(tool_calls) + tool-result block that contains an interrupted tool
result, while preserving successful tool-call sequences intact.
"""
if not agent_history:
return agent_history
cleaned: List[Dict[str, Any]] = []
i = 0
n = len(agent_history)
tool_tail_end = n
while tool_tail_end > 0 and agent_history[tool_tail_end - 1].get("role") == "tool":
tool_tail_end -= 1
while i < n:
msg = agent_history[i]
if msg.get("role") == "assistant" and "tool_calls" in msg:
j = i + 1
tool_results: List[Dict[str, Any]] = []
while j < n and agent_history[j].get("role") == "tool":
tool_results.append(agent_history[j])
j += 1
if tool_results and any(
_is_interrupted_tool_result(m.get("content", ""))
for m in tool_results
):
logger.debug(
"Stripping interrupted assistant→tool replay block "
"(indices %d%d, tool_results=%d)",
i, j - 1, len(tool_results),
)
i = j
continue
if msg.get("role") == "tool" and _is_interrupted_tool_result(msg.get("content", "")):
logger.debug("Stripping orphan interrupted tool result from replay history")
i += 1
continue
cleaned.append(msg)
i += 1
if tool_tail_end == n:
return agent_history
tail_tool_results = agent_history[tool_tail_end:]
if not all(
_is_interrupted_tool_result(m.get("content", ""))
for m in tail_tool_results
):
return agent_history
assistant_idx = tool_tail_end - 1
if assistant_idx < 0:
return agent_history
if (
agent_history[assistant_idx].get("role") != "assistant"
or "tool_calls" not in agent_history[assistant_idx]
):
return agent_history
logger.debug(
"Stripping %d interrupted tool result(s) + triggering assistant from "
"replay history (indices %d%d)",
len(tail_tool_results), assistant_idx, n - 1,
)
return agent_history[:assistant_idx]
return cleaned
_AUTO_CONTINUE_NOTE_PREFIX = "[System note: Your previous turn"
_BG_PROCESS_NOTE_PREFIX = "[IMPORTANT: Background process"
_AUTO_CONTINUE_FALLBACK_PREFIX = "[System note: A new message"
def _is_auto_continue_noise(content: Any) -> bool:
"""Return True if this user-message content is a gateway-injected
auto-continue or background-process note that should NOT be replayed
as a real user turn."""
auto-continue note that should NOT be replayed as a real user turn."""
if not isinstance(content, str):
return False
return (
content.startswith(_AUTO_CONTINUE_NOTE_PREFIX)
or content.startswith(_BG_PROCESS_NOTE_PREFIX)
or content.startswith(_AUTO_CONTINUE_FALLBACK_PREFIX)
)
def _strip_auto_continue_noise(content: Any) -> Any:
"""Remove persisted gateway auto-continue note prefix from user text.
Older gateway builds prepended the recovery note directly to the user
message, so the transcript row can contain both the synthetic note and
the user's real question. Strip one or more leading synthetic notes while
preserving any real text that follows.
"""
if not _is_auto_continue_noise(content):
return content
text = str(content)
while _is_auto_continue_noise(text):
end = text.find("]")
if end < 0:
return ""
text = text[end + 1 :].lstrip()
return text
# Tools in this set return their deliverable artifact as a JSON payload with a
# local-file path field rather than a literal ``MEDIA:`` tag (e.g. image_generate
# returns ``{"success": true, "image": "/abs/path.png"}``). The auto-append path
@ -14655,6 +14672,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
except Exception as _e:
logger.error("Failed to send approval request: %s", _e)
# Keep real user text separate from API-only recovery guidance. If
# an auto-continue note is prepended below, persist the original
# message so stale guidance never replays as user-authored text.
_persist_user_message_override: Optional[Any] = None
# Prepend pending model switch note so the model knows about the switch
_pending_notes = getattr(self, '_pending_model_notes', {})
_msn = _pending_notes.pop(session_key, None) if session_key else None
@ -14715,6 +14737,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
if _reason == "shutdown_timeout"
else "a gateway interruption"
)
_persist_user_message_override = message
message = (
f"[System note: A new message has arrived. The previous turn "
f"was interrupted by {_reason_phrase}. "
@ -14725,6 +14748,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
+ message
)
elif _has_fresh_tool_tail:
_persist_user_message_override = message
message = (
"[System note: A new message has arrived. The conversation "
"history contains pending tool outputs from an interrupted turn. "
@ -14787,7 +14811,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
"conversation_history": agent_history,
"task_id": session_id,
}
if observed_group_context:
if _persist_user_message_override is not None:
_conversation_kwargs["persist_user_message"] = _persist_user_message_override
elif observed_group_context:
_conversation_kwargs["persist_user_message"] = message
result = agent.run_conversation(_api_run_message, **_conversation_kwargs)
finally:

View file

@ -47,6 +47,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json"
AUTHOR_MAP = {
"kenmege@yahoo.com": "Kenmege",
"dkobi16@gmail.com": "Diyoncrz18",
"arnaud@nolimitdevelopment.com": "ali-nld",
"sswdarius@gmail.com": "necoweb3",
"peterhao@Peters-MacBook-Air.local": "pinguarmy",
"adalsteinnhelgason@Aalsteinns-MacBook-Pro-3.local": "AIalliAI",

View file

@ -1,9 +1,9 @@
"""Tests for the auto-continue feature (#4493).
"""Tests for the auto-continue feature (#4493 / #45232).
When the gateway restarts mid-agent-work, the session transcript ends on a
When the gateway restarts mid-agent-work, the session transcript can end on a
tool result that the agent never processed. The auto-continue logic detects
this and prepends a system note to the next user message so the model
finishes the interrupted work before addressing the new input.
this and prepends an API-only system note to the next user message so the model
does not re-execute stale interrupted tool calls before addressing new input.
"""
@ -18,11 +18,10 @@ def _simulate_auto_continue(agent_history: list, user_message: str) -> str:
message = user_message
if agent_history and agent_history[-1].get("role") == "tool":
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "
"tool outputs you haven't responded to yet. Please finish processing "
"those results and summarize what was accomplished, then address the "
"user's new message below.]\n\n"
"[System note: A new message has arrived. The conversation "
"history contains pending tool outputs from an interrupted turn. "
"IGNORE those pending results. Address the user's NEW message "
"below FIRST. Do NOT re-execute old tool calls from the history.]\n\n"
+ message
)
return message
@ -42,6 +41,8 @@ class TestAutoDetection:
result = _simulate_auto_continue(history, "what happened?")
assert "[System note:" in result
assert "interrupted" in result
assert "NEW message" in result
assert "Do NOT re-execute" in result
assert "what happened?" in result
def test_trailing_assistant_message_no_note(self):
@ -92,3 +93,105 @@ class TestAutoDetection:
note_end = result.index("]\n\n")
user_msg_start = result.index("now do X")
assert user_msg_start > note_end
class TestInterruptedReplayFiltering:
def test_interrupted_tool_tail_is_removed_from_agent_history(self):
from gateway.run import _build_gateway_agent_history
history = [
{"role": "user", "content": "transcribe this video"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}},
],
},
{
"role": "tool",
"tool_call_id": "call_1",
"content": '{"exit_code": 130, "output": "[Command interrupted]"}',
},
]
agent_history, observed_context = _build_gateway_agent_history(history)
assert observed_context is None
assert agent_history == [{"role": "user", "content": "transcribe this video"}]
def test_mixed_tail_with_one_interrupted_result_is_removed(self):
from gateway.run import _build_gateway_agent_history
history = [
{"role": "user", "content": "search and transcribe"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "call_1", "function": {"name": "web_search", "arguments": "{}"}},
{"id": "call_2", "function": {"name": "terminal", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "call_1", "content": "found URL"},
{
"role": "tool",
"tool_call_id": "call_2",
"content": '{"exit_code": 130, "output": "[Command interrupted]"}',
},
]
agent_history, _observed_context = _build_gateway_agent_history(history)
assert agent_history == [{"role": "user", "content": "search and transcribe"}]
def test_successful_tool_tail_is_preserved(self):
from gateway.run import _build_gateway_agent_history
history = [
{"role": "user", "content": "deploy"},
{
"role": "assistant",
"content": None,
"tool_calls": [
{"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}},
],
},
{"role": "tool", "tool_call_id": "call_1", "content": "deployed successfully"},
]
agent_history, _observed_context = _build_gateway_agent_history(history)
assert agent_history[-1]["role"] == "tool"
assert agent_history[-1]["content"] == "deployed successfully"
def test_persisted_auto_continue_note_is_not_replayed(self):
from gateway.run import _build_gateway_agent_history
history = [
{"role": "user", "content": "first real question"},
{
"role": "user",
"content": (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s).]\n\nsecond real question"
),
},
{"role": "assistant", "content": "answer"},
{
"role": "user",
"content": (
"[System note: A new message has arrived. The conversation "
"history contains pending tool outputs from an interrupted turn.]\n\nthird"
),
},
]
agent_history, _observed_context = _build_gateway_agent_history(history)
assert agent_history == [
{"role": "user", "content": "first real question"},
{"role": "user", "content": "second real question"},
{"role": "assistant", "content": "answer"},
{"role": "user", "content": "third"},
]

View file

@ -154,20 +154,20 @@ def _simulate_note_injection(
else "a gateway interruption"
)
message = (
f"[System note: Your previous turn in this session was interrupted "
f"by {reason_phrase}. The conversation history below is intact. "
f"If it contains unfinished tool result(s), process them first and "
f"summarize what was accomplished, then address the user's new "
f"message below.]\n\n"
f"[System note: A new message has arrived. The previous turn "
f"was interrupted by {reason_phrase}. "
f"Address the user's NEW message below FIRST. "
f"Do NOT re-execute old tool calls — skip any unfinished "
f"work from the conversation history and focus on what the "
f"user is asking now.]\n\n"
+ message
)
elif has_fresh_tool_tail:
message = (
"[System note: Your previous turn was interrupted before you could "
"process the last tool result(s). The conversation history contains "
"tool outputs you haven't responded to yet. Please finish processing "
"those results and summarize what was accomplished, then address the "
"user's new message below.]\n\n"
"[System note: A new message has arrived. The conversation "
"history contains pending tool outputs from an interrupted turn. "
"IGNORE those pending results. Address the user's NEW message "
"below FIRST. Do NOT re-execute old tool calls from the history.]\n\n"
+ message
)
return message
@ -442,6 +442,8 @@ class TestResumePendingSystemNote:
)
assert "[System note:" in result
assert "gateway restart" in result
assert "NEW message" in result
assert "Do NOT re-execute" in result
assert "what happened?" in result
def test_resume_pending_shutdown_note_mentions_shutdown(self):
@ -466,6 +468,7 @@ class TestResumePendingSystemNote:
result = _simulate_note_injection(history, "ping", resume_entry=entry)
assert "[System note:" in result
assert "gateway restart" in result
assert "NEW message" in result
def test_resume_pending_subsumes_tool_tail_note(self):
"""When BOTH conditions are true, the restart-resume note wins —
@ -495,7 +498,8 @@ class TestResumePendingSystemNote:
]
result = _simulate_note_injection(history, "ping", resume_entry=None)
assert "[System note:" in result
assert "tool result" in result
assert "pending tool outputs" in result
assert "Do NOT re-execute" in result
def test_stale_resume_pending_does_not_inject_restart_note(self):
"""Old restart markers must not revive an unrelated stale task.
@ -533,7 +537,8 @@ class TestResumePendingSystemNote:
]
result = _simulate_note_injection(history, "ping", resume_entry=None)
assert "[System note:" in result
assert "tool result" in result
assert "pending tool outputs" in result
assert "Do NOT re-execute" in result
def test_stale_tool_tail_does_not_inject_auto_continue_note(self):
"""The core bug fix: stale tool-tail must not revive a dead task.
@ -624,7 +629,8 @@ class TestResumePendingSystemNote:
history, "ping", resume_entry=None, window_secs=0,
)
assert "[System note:" in result
assert "tool result" in result
assert "pending tool outputs" in result
assert "Do NOT re-execute" in result
def test_legacy_history_without_timestamps_still_injects(self):
"""Transcripts predating timestamp persistence must keep the old
@ -637,7 +643,8 @@ class TestResumePendingSystemNote:
]
result = _simulate_note_injection(history, "ping", resume_entry=None)
assert "[System note:" in result
assert "tool result" in result
assert "pending tool outputs" in result
assert "Do NOT re-execute" in result
def test_no_note_when_nothing_to_resume(self):
history = [