From 2c174bce2408f5d6810b0f16dbd55cb65cd1c6e3 Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 14 Jun 2026 04:29:05 -0700 Subject: [PATCH] fix(gateway): preserve new input on interrupted replay cleanup --- gateway/run.py | 128 +++++++++++-------- scripts/release.py | 1 + tests/gateway/test_auto_continue.py | 121 ++++++++++++++++-- tests/gateway/test_restart_resume_pending.py | 35 +++-- 4 files changed, 211 insertions(+), 74 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index dcf4d1f1f24..b81cd32ff4d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -677,13 +677,15 @@ def _build_gateway_agent_history( clean_msg = {k: v for k, v in msg.items() if k not in {"timestamp", "observed"}} agent_history.append(clean_msg) elif content: - # Skip gateway-injected auto-continue / background-process notes - # that were persisted as user messages during interrupted turns. - # Replaying them tells the LLM to re-process old work instead of - # addressing the user's new message — which is the root cause of - # infinite re-transcription / re-analysis loops. - if role == "user" and _is_auto_continue_noise(content): - continue + # Strip gateway-injected auto-continue notes that were persisted + # as part of user messages during interrupted turns. Keep the + # user's real text after the note, but never replay the recovery + # instruction itself — that is what caused infinite re-execution + # loops for interrupted long-running tools. + if role == "user": + content = _strip_auto_continue_noise(content) + if not content: + continue # Simple text message - just need role and content. if msg.get("mirror"): mirror_src = msg.get("mirror_source", "another session") @@ -766,78 +768,93 @@ def _is_interrupted_tool_result(content: Any) -> bool: """Return True if a tool result indicates the tool was interrupted.""" if not isinstance(content, str): return False - if "[Command interrupted]" in content: + lowered = content.lower() + if "[command interrupted]" in lowered: return True - if '"exit_code": 130' in content or '"exit_code": -1' in content: - if "interrupt" in content.lower() or "Command interrupted" in content: - return True + if "exit_code" in lowered and ("130" in lowered or "-1" in lowered): + return "interrupt" in lowered return False def _strip_interrupted_tool_tails( agent_history: List[Dict[str, Any]], ) -> List[Dict[str, Any]]: - """Strip trailing tool-call sequences whose results were all interrupted. + """Strip interrupted assistant→tool sequences from replay history. - When the gateway interrupts a turn mid-tool-execution, the session DB stores - the tool_calls + tool results (which contain "[Command interrupted]" / - exit_code 130). Replaying these to the LLM on the next turn causes it to - re-execute the same tools — this function detects those tail sequences and - removes them before they reach the model. + Older interrupted gateway turns can be followed by a queued real user + message, so the interrupted assistant/tool block is not necessarily the + final tail by the time we rebuild replay history. Remove any contiguous + assistant(tool_calls) + tool-result block that contains an interrupted tool + result, while preserving successful tool-call sequences intact. """ if not agent_history: return agent_history + cleaned: List[Dict[str, Any]] = [] + i = 0 n = len(agent_history) - tool_tail_end = n - while tool_tail_end > 0 and agent_history[tool_tail_end - 1].get("role") == "tool": - tool_tail_end -= 1 + while i < n: + msg = agent_history[i] + if msg.get("role") == "assistant" and "tool_calls" in msg: + j = i + 1 + tool_results: List[Dict[str, Any]] = [] + while j < n and agent_history[j].get("role") == "tool": + tool_results.append(agent_history[j]) + j += 1 + if tool_results and any( + _is_interrupted_tool_result(m.get("content", "")) + for m in tool_results + ): + logger.debug( + "Stripping interrupted assistant→tool replay block " + "(indices %d–%d, tool_results=%d)", + i, j - 1, len(tool_results), + ) + i = j + continue + if msg.get("role") == "tool" and _is_interrupted_tool_result(msg.get("content", "")): + logger.debug("Stripping orphan interrupted tool result from replay history") + i += 1 + continue + cleaned.append(msg) + i += 1 - if tool_tail_end == n: - return agent_history - - tail_tool_results = agent_history[tool_tail_end:] - if not all( - _is_interrupted_tool_result(m.get("content", "")) - for m in tail_tool_results - ): - return agent_history - - assistant_idx = tool_tail_end - 1 - if assistant_idx < 0: - return agent_history - - if ( - agent_history[assistant_idx].get("role") != "assistant" - or "tool_calls" not in agent_history[assistant_idx] - ): - return agent_history - - logger.debug( - "Stripping %d interrupted tool result(s) + triggering assistant from " - "replay history (indices %d–%d)", - len(tail_tool_results), assistant_idx, n - 1, - ) - return agent_history[:assistant_idx] + return cleaned _AUTO_CONTINUE_NOTE_PREFIX = "[System note: Your previous turn" -_BG_PROCESS_NOTE_PREFIX = "[IMPORTANT: Background process" _AUTO_CONTINUE_FALLBACK_PREFIX = "[System note: A new message" def _is_auto_continue_noise(content: Any) -> bool: """Return True if this user-message content is a gateway-injected - auto-continue or background-process note that should NOT be replayed - as a real user turn.""" + auto-continue note that should NOT be replayed as a real user turn.""" if not isinstance(content, str): return False return ( content.startswith(_AUTO_CONTINUE_NOTE_PREFIX) - or content.startswith(_BG_PROCESS_NOTE_PREFIX) or content.startswith(_AUTO_CONTINUE_FALLBACK_PREFIX) ) + +def _strip_auto_continue_noise(content: Any) -> Any: + """Remove persisted gateway auto-continue note prefix from user text. + + Older gateway builds prepended the recovery note directly to the user + message, so the transcript row can contain both the synthetic note and + the user's real question. Strip one or more leading synthetic notes while + preserving any real text that follows. + """ + if not _is_auto_continue_noise(content): + return content + text = str(content) + while _is_auto_continue_noise(text): + end = text.find("]") + if end < 0: + return "" + text = text[end + 1 :].lstrip() + return text + # Tools in this set return their deliverable artifact as a JSON payload with a # local-file path field rather than a literal ``MEDIA:`` tag (e.g. image_generate # returns ``{"success": true, "image": "/abs/path.png"}``). The auto-append path @@ -14655,6 +14672,11 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew except Exception as _e: logger.error("Failed to send approval request: %s", _e) + # Keep real user text separate from API-only recovery guidance. If + # an auto-continue note is prepended below, persist the original + # message so stale guidance never replays as user-authored text. + _persist_user_message_override: Optional[Any] = None + # Prepend pending model switch note so the model knows about the switch _pending_notes = getattr(self, '_pending_model_notes', {}) _msn = _pending_notes.pop(session_key, None) if session_key else None @@ -14715,6 +14737,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew if _reason == "shutdown_timeout" else "a gateway interruption" ) + _persist_user_message_override = message message = ( f"[System note: A new message has arrived. The previous turn " f"was interrupted by {_reason_phrase}. " @@ -14725,6 +14748,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew + message ) elif _has_fresh_tool_tail: + _persist_user_message_override = message message = ( "[System note: A new message has arrived. The conversation " "history contains pending tool outputs from an interrupted turn. " @@ -14787,7 +14811,9 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew "conversation_history": agent_history, "task_id": session_id, } - if observed_group_context: + if _persist_user_message_override is not None: + _conversation_kwargs["persist_user_message"] = _persist_user_message_override + elif observed_group_context: _conversation_kwargs["persist_user_message"] = message result = agent.run_conversation(_api_run_message, **_conversation_kwargs) finally: diff --git a/scripts/release.py b/scripts/release.py index 5850d751fa9..6fe9ebb051c 100755 --- a/scripts/release.py +++ b/scripts/release.py @@ -47,6 +47,7 @@ ACP_REGISTRY_MANIFEST = REPO_ROOT / "acp_registry" / "agent.json" AUTHOR_MAP = { "kenmege@yahoo.com": "Kenmege", "dkobi16@gmail.com": "Diyoncrz18", + "arnaud@nolimitdevelopment.com": "ali-nld", "sswdarius@gmail.com": "necoweb3", "peterhao@Peters-MacBook-Air.local": "pinguarmy", "adalsteinnhelgason@Aalsteinns-MacBook-Pro-3.local": "AIalliAI", diff --git a/tests/gateway/test_auto_continue.py b/tests/gateway/test_auto_continue.py index eb20abf55df..de3b738944b 100644 --- a/tests/gateway/test_auto_continue.py +++ b/tests/gateway/test_auto_continue.py @@ -1,9 +1,9 @@ -"""Tests for the auto-continue feature (#4493). +"""Tests for the auto-continue feature (#4493 / #45232). -When the gateway restarts mid-agent-work, the session transcript ends on a +When the gateway restarts mid-agent-work, the session transcript can end on a tool result that the agent never processed. The auto-continue logic detects -this and prepends a system note to the next user message so the model -finishes the interrupted work before addressing the new input. +this and prepends an API-only system note to the next user message so the model +does not re-execute stale interrupted tool calls before addressing new input. """ @@ -18,11 +18,10 @@ def _simulate_auto_continue(agent_history: list, user_message: str) -> str: message = user_message if agent_history and agent_history[-1].get("role") == "tool": message = ( - "[System note: Your previous turn was interrupted before you could " - "process the last tool result(s). The conversation history contains " - "tool outputs you haven't responded to yet. Please finish processing " - "those results and summarize what was accomplished, then address the " - "user's new message below.]\n\n" + "[System note: A new message has arrived. The conversation " + "history contains pending tool outputs from an interrupted turn. " + "IGNORE those pending results. Address the user's NEW message " + "below FIRST. Do NOT re-execute old tool calls from the history.]\n\n" + message ) return message @@ -42,6 +41,8 @@ class TestAutoDetection: result = _simulate_auto_continue(history, "what happened?") assert "[System note:" in result assert "interrupted" in result + assert "NEW message" in result + assert "Do NOT re-execute" in result assert "what happened?" in result def test_trailing_assistant_message_no_note(self): @@ -92,3 +93,105 @@ class TestAutoDetection: note_end = result.index("]\n\n") user_msg_start = result.index("now do X") assert user_msg_start > note_end + + +class TestInterruptedReplayFiltering: + def test_interrupted_tool_tail_is_removed_from_agent_history(self): + from gateway.run import _build_gateway_agent_history + + history = [ + {"role": "user", "content": "transcribe this video"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}}, + ], + }, + { + "role": "tool", + "tool_call_id": "call_1", + "content": '{"exit_code": 130, "output": "[Command interrupted]"}', + }, + ] + + agent_history, observed_context = _build_gateway_agent_history(history) + + assert observed_context is None + assert agent_history == [{"role": "user", "content": "transcribe this video"}] + + def test_mixed_tail_with_one_interrupted_result_is_removed(self): + from gateway.run import _build_gateway_agent_history + + history = [ + {"role": "user", "content": "search and transcribe"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "call_1", "function": {"name": "web_search", "arguments": "{}"}}, + {"id": "call_2", "function": {"name": "terminal", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "call_1", "content": "found URL"}, + { + "role": "tool", + "tool_call_id": "call_2", + "content": '{"exit_code": 130, "output": "[Command interrupted]"}', + }, + ] + + agent_history, _observed_context = _build_gateway_agent_history(history) + + assert agent_history == [{"role": "user", "content": "search and transcribe"}] + + def test_successful_tool_tail_is_preserved(self): + from gateway.run import _build_gateway_agent_history + + history = [ + {"role": "user", "content": "deploy"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "call_1", "function": {"name": "terminal", "arguments": "{}"}}, + ], + }, + {"role": "tool", "tool_call_id": "call_1", "content": "deployed successfully"}, + ] + + agent_history, _observed_context = _build_gateway_agent_history(history) + + assert agent_history[-1]["role"] == "tool" + assert agent_history[-1]["content"] == "deployed successfully" + + def test_persisted_auto_continue_note_is_not_replayed(self): + from gateway.run import _build_gateway_agent_history + + history = [ + {"role": "user", "content": "first real question"}, + { + "role": "user", + "content": ( + "[System note: Your previous turn was interrupted before you could " + "process the last tool result(s).]\n\nsecond real question" + ), + }, + {"role": "assistant", "content": "answer"}, + { + "role": "user", + "content": ( + "[System note: A new message has arrived. The conversation " + "history contains pending tool outputs from an interrupted turn.]\n\nthird" + ), + }, + ] + + agent_history, _observed_context = _build_gateway_agent_history(history) + + assert agent_history == [ + {"role": "user", "content": "first real question"}, + {"role": "user", "content": "second real question"}, + {"role": "assistant", "content": "answer"}, + {"role": "user", "content": "third"}, + ] diff --git a/tests/gateway/test_restart_resume_pending.py b/tests/gateway/test_restart_resume_pending.py index 3ccaf801d52..0974b26b4ec 100644 --- a/tests/gateway/test_restart_resume_pending.py +++ b/tests/gateway/test_restart_resume_pending.py @@ -154,20 +154,20 @@ def _simulate_note_injection( else "a gateway interruption" ) message = ( - f"[System note: Your previous turn in this session was interrupted " - f"by {reason_phrase}. The conversation history below is intact. " - f"If it contains unfinished tool result(s), process them first and " - f"summarize what was accomplished, then address the user's new " - f"message below.]\n\n" + f"[System note: A new message has arrived. The previous turn " + f"was interrupted by {reason_phrase}. " + f"Address the user's NEW message below FIRST. " + f"Do NOT re-execute old tool calls — skip any unfinished " + f"work from the conversation history and focus on what the " + f"user is asking now.]\n\n" + message ) elif has_fresh_tool_tail: message = ( - "[System note: Your previous turn was interrupted before you could " - "process the last tool result(s). The conversation history contains " - "tool outputs you haven't responded to yet. Please finish processing " - "those results and summarize what was accomplished, then address the " - "user's new message below.]\n\n" + "[System note: A new message has arrived. The conversation " + "history contains pending tool outputs from an interrupted turn. " + "IGNORE those pending results. Address the user's NEW message " + "below FIRST. Do NOT re-execute old tool calls from the history.]\n\n" + message ) return message @@ -442,6 +442,8 @@ class TestResumePendingSystemNote: ) assert "[System note:" in result assert "gateway restart" in result + assert "NEW message" in result + assert "Do NOT re-execute" in result assert "what happened?" in result def test_resume_pending_shutdown_note_mentions_shutdown(self): @@ -466,6 +468,7 @@ class TestResumePendingSystemNote: result = _simulate_note_injection(history, "ping", resume_entry=entry) assert "[System note:" in result assert "gateway restart" in result + assert "NEW message" in result def test_resume_pending_subsumes_tool_tail_note(self): """When BOTH conditions are true, the restart-resume note wins — @@ -495,7 +498,8 @@ class TestResumePendingSystemNote: ] result = _simulate_note_injection(history, "ping", resume_entry=None) assert "[System note:" in result - assert "tool result" in result + assert "pending tool outputs" in result + assert "Do NOT re-execute" in result def test_stale_resume_pending_does_not_inject_restart_note(self): """Old restart markers must not revive an unrelated stale task. @@ -533,7 +537,8 @@ class TestResumePendingSystemNote: ] result = _simulate_note_injection(history, "ping", resume_entry=None) assert "[System note:" in result - assert "tool result" in result + assert "pending tool outputs" in result + assert "Do NOT re-execute" in result def test_stale_tool_tail_does_not_inject_auto_continue_note(self): """The core bug fix: stale tool-tail must not revive a dead task. @@ -624,7 +629,8 @@ class TestResumePendingSystemNote: history, "ping", resume_entry=None, window_secs=0, ) assert "[System note:" in result - assert "tool result" in result + assert "pending tool outputs" in result + assert "Do NOT re-execute" in result def test_legacy_history_without_timestamps_still_injects(self): """Transcripts predating timestamp persistence must keep the old @@ -637,7 +643,8 @@ class TestResumePendingSystemNote: ] result = _simulate_note_injection(history, "ping", resume_entry=None) assert "[System note:" in result - assert "tool result" in result + assert "pending tool outputs" in result + assert "Do NOT re-execute" in result def test_no_note_when_nothing_to_resume(self): history = [