diff --git a/agent/agent_runtime_helpers.py b/agent/agent_runtime_helpers.py index ac0ba80d6ee..af64541a828 100644 --- a/agent/agent_runtime_helpers.py +++ b/agent/agent_runtime_helpers.py @@ -368,6 +368,18 @@ def repair_message_sequence(agent, messages: List[Dict]) -> int: host code) can feed in already-broken histories. Repairs applied: + 0. Consecutive ``assistant`` messages with no intervening + ``tool``/``user`` turn — merged into a single assistant turn + (union of ``tool_calls``, concatenated ``content``). Strict + OpenAI-compatible providers (DeepSeek v4, Moonshot/Kimi) reject + a history where an ``assistant`` message carrying ``tool_calls`` + is immediately followed by another ``assistant`` message instead + of its ``tool`` results — HTTP 400 "An assistant message with + 'tool_calls' must be followed by tool messages…". The split + shape is produced by recovery/continuation paths that append an + interim assistant turn (thinking-prefill, codex + incomplete-continuation) or by host-fed / legacy-persisted / + resumed histories. Refs #29148, #49147. 1. Stray ``tool`` messages whose ``tool_call_id`` doesn't match any preceding assistant tool_call — dropped. 2. Consecutive ``user`` messages — merged with newline separator @@ -387,12 +399,74 @@ def repair_message_sequence(agent, messages: List[Dict]) -> int: repairs = 0 + # Pass 0: merge consecutive assistant messages. Runs BEFORE Pass 1 so + # the merged turn's union of tool_call ids is known when Pass 1 + # validates which tool-result messages are orphans. Two assistant + # messages are only adjacent here when nothing (no tool result, no + # user turn) separates them — an intervening ``tool`` message means + # two distinct, valid tool-call rounds that must NOT be merged. + # + # Codex Responses interim turns are exempt: the codex_responses + # api_mode legitimately keeps multiple consecutive incomplete + # assistant turns in history, each carrying its own encrypted + # continuation state (codex_reasoning_items / codex_message_items) + # that must be replayed verbatim. Collapsing them corrupts the + # Responses replay chain (the duplicate-detection logic at + # conversation_loop.py already de-dups identical codex interims). + def _is_codex_interim(m: Dict) -> bool: + return bool( + m.get("codex_reasoning_items") + or m.get("codex_message_items") + or m.get("finish_reason") == "incomplete" + ) + + collapsed: List[Dict] = [] + for msg in messages: + if ( + collapsed + and isinstance(msg, dict) + and msg.get("role") == "assistant" + and isinstance(collapsed[-1], dict) + and collapsed[-1].get("role") == "assistant" + and not _is_codex_interim(msg) + and not _is_codex_interim(collapsed[-1]) + ): + prev = collapsed[-1] + # Union tool_calls (preserve order, both may carry them). + prev_calls = list(prev.get("tool_calls") or []) + new_calls = list(msg.get("tool_calls") or []) + if new_calls: + prev["tool_calls"] = prev_calls + new_calls + elif prev_calls: + prev["tool_calls"] = prev_calls + # Concatenate plain-text content; leave multimodal (list) + # content on either side alone to avoid mangling attachment + # blocks — fall back to keeping the existing content. + prev_content = prev.get("content") + new_content = msg.get("content") + if isinstance(prev_content, str) and isinstance(new_content, str): + joined = "\n".join( + p for p in (prev_content.strip(), new_content.strip()) if p + ) + prev["content"] = joined + elif not prev_content and new_content is not None: + prev["content"] = new_content + # Carry reasoning_content from the later turn only if the + # earlier turn lacks it (strict thinking providers require a + # reasoning_content on the merged tool-call turn; the first + # non-empty one suffices). + if not prev.get("reasoning_content") and msg.get("reasoning_content"): + prev["reasoning_content"] = msg["reasoning_content"] + repairs += 1 + continue + collapsed.append(msg) + # Pass 1: drop stray tool messages that don't follow a known # assistant tool_call_id. Uses a rolling set of known ids refreshed # on each assistant message. known_tool_ids: set = set() filtered: List[Dict] = [] - for msg in messages: + for msg in collapsed: if not isinstance(msg, dict): filtered.append(msg) continue diff --git a/tests/run_agent/test_message_sequence_repair.py b/tests/run_agent/test_message_sequence_repair.py index 8fba45ebeb2..93e65193756 100644 --- a/tests/run_agent/test_message_sequence_repair.py +++ b/tests/run_agent/test_message_sequence_repair.py @@ -298,3 +298,229 @@ def test_flush_guard_clamps_overshooting_cursor(): # min(5, 2) = 2 → nothing skipped below start_idx, cursor settles at 2 assert agent._last_flushed_db_idx == 2 + + +# ── Pass 0: merge consecutive assistant messages (issue #29148, #49147) ───── + +def test_repair_merges_parallel_tool_calls_split_across_assistants(): + """Two adjacent assistant(tool_calls) collapse into one turn (#29148). + + DeepSeek v4 rejects a replayed history where parallel calls appear as + separate assistant turns: + assistant(tc=[A]) → assistant(tc=[B]) → tool(A) → tool(B) + The repair must produce: + assistant(tc=[A, B]) → tool(A) → tool(B) + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "run both"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "call_A", "type": "function", + "function": {"name": "session_search", "arguments": "{}"}}]}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "call_B", "type": "function", + "function": {"name": "search_files", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "call_A", "content": "A"}, + {"role": "tool", "tool_call_id": "call_B", "content": "B"}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs >= 1 + assistant_msgs = [m for m in messages if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + assert {tc["id"] for tc in assistant_msgs[0]["tool_calls"]} == {"call_A", "call_B"} + # Both tool results survive Pass 1 (their ids are in the merged union). + assert sum(1 for m in messages if m.get("role") == "tool") == 2 + + +def test_repair_merges_content_then_toolcalls_split(): + """content-only assistant followed by tool_calls-only assistant merge (#49147). + + The recovery/continuation paths can leave: + assistant(content="Let me search") → assistant(tool_calls=[A]) → tool(A) + which must become: + assistant(content="Let me search", tool_calls=[A]) → tool(A) + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "search"}, + {"role": "assistant", "content": "Let me search for that."}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "call_1", "type": "function", + "function": {"name": "session_search", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "call_1", "content": "found"}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs >= 1 + assistant_msgs = [m for m in messages if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + merged = assistant_msgs[0] + assert merged["content"] == "Let me search for that." + assert len(merged["tool_calls"]) == 1 + assert merged["tool_calls"][0]["id"] == "call_1" + # Tool result still follows immediately. + assert messages[-1]["role"] == "tool" + + +def test_repair_merges_three_consecutive_assistant_tool_calls(): + """Three adjacent assistant(tool_calls) turns all collapse into one.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "run three"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c1", "type": "function", + "function": {"name": "x", "arguments": "{}"}}]}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c2", "type": "function", + "function": {"name": "y", "arguments": "{}"}}]}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "c3", "type": "function", + "function": {"name": "z", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "c1", "content": "r1"}, + {"role": "tool", "tool_call_id": "c2", "content": "r2"}, + {"role": "tool", "tool_call_id": "c3", "content": "r3"}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs >= 2 + assistant_msgs = [m for m in messages if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + assert len(assistant_msgs[0]["tool_calls"]) == 3 + assert sum(1 for m in messages if m.get("role") == "tool") == 3 + + +def test_repair_does_NOT_merge_tool_calls_separated_by_tool_result(): + """A tool result between two assistant(tool_calls) marks distinct rounds. + + This is the critical guard: two sequential tool-call rounds must NOT be + collapsed, or the second round's tool result would orphan. + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "go"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t1", "type": "function", + "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t1", "content": "done"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t2", "type": "function", + "function": {"name": "g", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t2", "content": "done2"}, + ] + before = sum(1 for m in messages if m.get("role") == "assistant") + + AIAgent._repair_message_sequence(agent, messages) + + assert sum(1 for m in messages if m.get("role") == "assistant") == before + # Both tool results survive (neither orphaned). + assert sum(1 for m in messages if m.get("role") == "tool") == 2 + + +def test_repair_does_NOT_merge_assistant_separated_by_user(): + """A user turn between two assistants blocks the merge (normal dialog).""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "q1"}, + {"role": "assistant", "content": "a1"}, + {"role": "user", "content": "q2"}, + {"role": "assistant", "content": "a2"}, + ] + + AIAgent._repair_message_sequence(agent, messages) + + assert sum(1 for m in messages if m.get("role") == "assistant") == 2 + + +def test_repair_merges_two_text_only_assistants(): + """Two consecutive text-only assistants (no tool_calls) still merge. + + The empty-response / thinking-prefill paths can leave two adjacent + text assistants; strict providers reject consecutive same-role turns. + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "q"}, + {"role": "assistant", "content": "First part."}, + {"role": "assistant", "content": "Second part."}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs >= 1 + assistant_msgs = [m for m in messages if m.get("role") == "assistant"] + assert len(assistant_msgs) == 1 + assert assistant_msgs[0]["content"] == "First part.\nSecond part." + + +def test_repair_preserves_reasoning_content_on_merge(): + """Merged tool-call turn keeps a reasoning_content (DeepSeek/Kimi replay).""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "go"}, + {"role": "assistant", "content": "", "reasoning_content": "thinking A", + "tool_calls": [{"id": "a", "type": "function", + "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "b", "type": "function", + "function": {"name": "g", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "a", "content": "ra"}, + {"role": "tool", "tool_call_id": "b", "content": "rb"}, + ] + + AIAgent._repair_message_sequence(agent, messages) + + merged = [m for m in messages if m.get("role") == "assistant"][0] + assert merged.get("reasoning_content") == "thinking A" + + +def test_repair_noop_on_valid_parallel_format(): + """A correctly-formatted single assistant with multiple tool_calls is unchanged.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "run both"}, + {"role": "assistant", "content": "", + "tool_calls": [ + {"id": "call_A", "type": "function", + "function": {"name": "session_search", "arguments": "{}"}}, + {"id": "call_B", "type": "function", + "function": {"name": "search_files", "arguments": "{}"}}, + ]}, + {"role": "tool", "tool_call_id": "call_A", "content": "A"}, + {"role": "tool", "tool_call_id": "call_B", "content": "B"}, + ] + original_len = len(messages) + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs == 0 + assert len(messages) == original_len + + +def test_repair_does_NOT_merge_codex_interim_assistants(): + """Codex Responses interim turns stay separate (encrypted replay state). + + The codex_responses api_mode keeps multiple consecutive incomplete + assistant turns, each carrying distinct codex_reasoning_items / + codex_message_items that must replay verbatim. Pass 0 must exempt them. + Refs test_run_agent_codex_responses.py duplicate-detection tests. + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "think hard"}, + {"role": "assistant", "content": "", "finish_reason": "incomplete", + "codex_reasoning_items": [{"encrypted_content": "enc_first"}]}, + {"role": "assistant", "content": "", "finish_reason": "incomplete", + "codex_reasoning_items": [{"encrypted_content": "enc_second"}]}, + {"role": "assistant", "content": "Final answer."}, + ] + + AIAgent._repair_message_sequence(agent, messages) + + interim = [m for m in messages if m.get("finish_reason") == "incomplete"] + assert len(interim) == 2 + encs = [m["codex_reasoning_items"][0]["encrypted_content"] for m in interim] + assert "enc_first" in encs and "enc_second" in encs