fix(agent): merge consecutive assistant messages before API replay (#29148, #49147) (#55603)

* fix(agent): merge consecutive assistant messages in repair_message_sequence

Strict OpenAI-compatible providers (DeepSeek v4, Moonshot/Kimi) reject a
replayed history where an assistant message carrying tool_calls is
immediately followed by another assistant message instead of its tool
results — HTTP 400 'An assistant message with tool_calls must be
followed by tool messages...'.

repair_message_sequence (the defensive belt run before every API call)
fixed orphan-tool and consecutive-user shapes but never merged
consecutive assistant messages. Adds a Pass 0 that collapses adjacent
assistant turns into one — union of tool_calls, concatenated content,
carried reasoning_content — covering both reported shapes:
  - parallel tool calls split across two assistant turns (#29148)
  - content-only assistant followed by tool_calls-only assistant (#49147)

A tool result or user turn between two assistants blocks the merge
(distinct, valid rounds). Runs before Pass 1 so the merged union of
tool_call ids is known to the orphan-tool filter.

Closes #29148, #49147.
Co-authored-by: Bartok9 <danielrpike9@gmail.com>
Co-authored-by: woaini30050 <woaini30050@users.noreply.github.com>
Co-authored-by: weidzhou <weidzhou@users.noreply.github.com>

* fix(agent): exempt codex Responses interim turns from assistant merge

The Pass 0 consecutive-assistant merge collapsed codex_responses interim
turns, which legitimately stay separate — each carries its own encrypted
continuation state (codex_reasoning_items / codex_message_items) that
must replay verbatim. Skip the merge when either side is a codex interim
(has codex_reasoning_items / codex_message_items / finish_reason=='incomplete').

Fixes the slice-2 regression in test_run_agent_codex_responses.py
(test_duplicate_detection_distinguishes_different_codex_{reasoning,message_items}).

---------

Co-authored-by: Bartok9 <danielrpike9@gmail.com>
Co-authored-by: woaini30050 <woaini30050@users.noreply.github.com>
Co-authored-by: weidzhou <weidzhou@users.noreply.github.com>
This commit is contained in:
Teknium 2026-06-30 04:22:56 -07:00 committed by GitHub
parent d2d470e321
commit cbe397ef45
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 301 additions and 1 deletions

View file

@ -368,6 +368,18 @@ def repair_message_sequence(agent, messages: List[Dict]) -> int:
host code) can feed in already-broken histories.
Repairs applied:
0. Consecutive ``assistant`` messages with no intervening
``tool``/``user`` turn merged into a single assistant turn
(union of ``tool_calls``, concatenated ``content``). Strict
OpenAI-compatible providers (DeepSeek v4, Moonshot/Kimi) reject
a history where an ``assistant`` message carrying ``tool_calls``
is immediately followed by another ``assistant`` message instead
of its ``tool`` results HTTP 400 "An assistant message with
'tool_calls' must be followed by tool messages". The split
shape is produced by recovery/continuation paths that append an
interim assistant turn (thinking-prefill, codex
incomplete-continuation) or by host-fed / legacy-persisted /
resumed histories. Refs #29148, #49147.
1. Stray ``tool`` messages whose ``tool_call_id`` doesn't match
any preceding assistant tool_call dropped.
2. Consecutive ``user`` messages merged with newline separator
@ -387,12 +399,74 @@ def repair_message_sequence(agent, messages: List[Dict]) -> int:
repairs = 0
# Pass 0: merge consecutive assistant messages. Runs BEFORE Pass 1 so
# the merged turn's union of tool_call ids is known when Pass 1
# validates which tool-result messages are orphans. Two assistant
# messages are only adjacent here when nothing (no tool result, no
# user turn) separates them — an intervening ``tool`` message means
# two distinct, valid tool-call rounds that must NOT be merged.
#
# Codex Responses interim turns are exempt: the codex_responses
# api_mode legitimately keeps multiple consecutive incomplete
# assistant turns in history, each carrying its own encrypted
# continuation state (codex_reasoning_items / codex_message_items)
# that must be replayed verbatim. Collapsing them corrupts the
# Responses replay chain (the duplicate-detection logic at
# conversation_loop.py already de-dups identical codex interims).
def _is_codex_interim(m: Dict) -> bool:
return bool(
m.get("codex_reasoning_items")
or m.get("codex_message_items")
or m.get("finish_reason") == "incomplete"
)
collapsed: List[Dict] = []
for msg in messages:
if (
collapsed
and isinstance(msg, dict)
and msg.get("role") == "assistant"
and isinstance(collapsed[-1], dict)
and collapsed[-1].get("role") == "assistant"
and not _is_codex_interim(msg)
and not _is_codex_interim(collapsed[-1])
):
prev = collapsed[-1]
# Union tool_calls (preserve order, both may carry them).
prev_calls = list(prev.get("tool_calls") or [])
new_calls = list(msg.get("tool_calls") or [])
if new_calls:
prev["tool_calls"] = prev_calls + new_calls
elif prev_calls:
prev["tool_calls"] = prev_calls
# Concatenate plain-text content; leave multimodal (list)
# content on either side alone to avoid mangling attachment
# blocks — fall back to keeping the existing content.
prev_content = prev.get("content")
new_content = msg.get("content")
if isinstance(prev_content, str) and isinstance(new_content, str):
joined = "\n".join(
p for p in (prev_content.strip(), new_content.strip()) if p
)
prev["content"] = joined
elif not prev_content and new_content is not None:
prev["content"] = new_content
# Carry reasoning_content from the later turn only if the
# earlier turn lacks it (strict thinking providers require a
# reasoning_content on the merged tool-call turn; the first
# non-empty one suffices).
if not prev.get("reasoning_content") and msg.get("reasoning_content"):
prev["reasoning_content"] = msg["reasoning_content"]
repairs += 1
continue
collapsed.append(msg)
# Pass 1: drop stray tool messages that don't follow a known
# assistant tool_call_id. Uses a rolling set of known ids refreshed
# on each assistant message.
known_tool_ids: set = set()
filtered: List[Dict] = []
for msg in messages:
for msg in collapsed:
if not isinstance(msg, dict):
filtered.append(msg)
continue

View file

@ -298,3 +298,229 @@ def test_flush_guard_clamps_overshooting_cursor():
# min(5, 2) = 2 → nothing skipped below start_idx, cursor settles at 2
assert agent._last_flushed_db_idx == 2
# ── Pass 0: merge consecutive assistant messages (issue #29148, #49147) ─────
def test_repair_merges_parallel_tool_calls_split_across_assistants():
"""Two adjacent assistant(tool_calls) collapse into one turn (#29148).
DeepSeek v4 rejects a replayed history where parallel calls appear as
separate assistant turns:
assistant(tc=[A]) assistant(tc=[B]) tool(A) tool(B)
The repair must produce:
assistant(tc=[A, B]) tool(A) tool(B)
"""
agent = _bare_agent()
messages = [
{"role": "user", "content": "run both"},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "call_A", "type": "function",
"function": {"name": "session_search", "arguments": "{}"}}]},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "call_B", "type": "function",
"function": {"name": "search_files", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "call_A", "content": "A"},
{"role": "tool", "tool_call_id": "call_B", "content": "B"},
]
repairs = AIAgent._repair_message_sequence(agent, messages)
assert repairs >= 1
assistant_msgs = [m for m in messages if m.get("role") == "assistant"]
assert len(assistant_msgs) == 1
assert {tc["id"] for tc in assistant_msgs[0]["tool_calls"]} == {"call_A", "call_B"}
# Both tool results survive Pass 1 (their ids are in the merged union).
assert sum(1 for m in messages if m.get("role") == "tool") == 2
def test_repair_merges_content_then_toolcalls_split():
"""content-only assistant followed by tool_calls-only assistant merge (#49147).
The recovery/continuation paths can leave:
assistant(content="Let me search") assistant(tool_calls=[A]) tool(A)
which must become:
assistant(content="Let me search", tool_calls=[A]) tool(A)
"""
agent = _bare_agent()
messages = [
{"role": "user", "content": "search"},
{"role": "assistant", "content": "Let me search for that."},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "call_1", "type": "function",
"function": {"name": "session_search", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "call_1", "content": "found"},
]
repairs = AIAgent._repair_message_sequence(agent, messages)
assert repairs >= 1
assistant_msgs = [m for m in messages if m.get("role") == "assistant"]
assert len(assistant_msgs) == 1
merged = assistant_msgs[0]
assert merged["content"] == "Let me search for that."
assert len(merged["tool_calls"]) == 1
assert merged["tool_calls"][0]["id"] == "call_1"
# Tool result still follows immediately.
assert messages[-1]["role"] == "tool"
def test_repair_merges_three_consecutive_assistant_tool_calls():
"""Three adjacent assistant(tool_calls) turns all collapse into one."""
agent = _bare_agent()
messages = [
{"role": "user", "content": "run three"},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "c1", "type": "function",
"function": {"name": "x", "arguments": "{}"}}]},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "c2", "type": "function",
"function": {"name": "y", "arguments": "{}"}}]},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "c3", "type": "function",
"function": {"name": "z", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "c1", "content": "r1"},
{"role": "tool", "tool_call_id": "c2", "content": "r2"},
{"role": "tool", "tool_call_id": "c3", "content": "r3"},
]
repairs = AIAgent._repair_message_sequence(agent, messages)
assert repairs >= 2
assistant_msgs = [m for m in messages if m.get("role") == "assistant"]
assert len(assistant_msgs) == 1
assert len(assistant_msgs[0]["tool_calls"]) == 3
assert sum(1 for m in messages if m.get("role") == "tool") == 3
def test_repair_does_NOT_merge_tool_calls_separated_by_tool_result():
"""A tool result between two assistant(tool_calls) marks distinct rounds.
This is the critical guard: two sequential tool-call rounds must NOT be
collapsed, or the second round's tool result would orphan.
"""
agent = _bare_agent()
messages = [
{"role": "user", "content": "go"},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "t1", "type": "function",
"function": {"name": "f", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "t1", "content": "done"},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "t2", "type": "function",
"function": {"name": "g", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "t2", "content": "done2"},
]
before = sum(1 for m in messages if m.get("role") == "assistant")
AIAgent._repair_message_sequence(agent, messages)
assert sum(1 for m in messages if m.get("role") == "assistant") == before
# Both tool results survive (neither orphaned).
assert sum(1 for m in messages if m.get("role") == "tool") == 2
def test_repair_does_NOT_merge_assistant_separated_by_user():
"""A user turn between two assistants blocks the merge (normal dialog)."""
agent = _bare_agent()
messages = [
{"role": "user", "content": "q1"},
{"role": "assistant", "content": "a1"},
{"role": "user", "content": "q2"},
{"role": "assistant", "content": "a2"},
]
AIAgent._repair_message_sequence(agent, messages)
assert sum(1 for m in messages if m.get("role") == "assistant") == 2
def test_repair_merges_two_text_only_assistants():
"""Two consecutive text-only assistants (no tool_calls) still merge.
The empty-response / thinking-prefill paths can leave two adjacent
text assistants; strict providers reject consecutive same-role turns.
"""
agent = _bare_agent()
messages = [
{"role": "user", "content": "q"},
{"role": "assistant", "content": "First part."},
{"role": "assistant", "content": "Second part."},
]
repairs = AIAgent._repair_message_sequence(agent, messages)
assert repairs >= 1
assistant_msgs = [m for m in messages if m.get("role") == "assistant"]
assert len(assistant_msgs) == 1
assert assistant_msgs[0]["content"] == "First part.\nSecond part."
def test_repair_preserves_reasoning_content_on_merge():
"""Merged tool-call turn keeps a reasoning_content (DeepSeek/Kimi replay)."""
agent = _bare_agent()
messages = [
{"role": "user", "content": "go"},
{"role": "assistant", "content": "", "reasoning_content": "thinking A",
"tool_calls": [{"id": "a", "type": "function",
"function": {"name": "f", "arguments": "{}"}}]},
{"role": "assistant", "content": "",
"tool_calls": [{"id": "b", "type": "function",
"function": {"name": "g", "arguments": "{}"}}]},
{"role": "tool", "tool_call_id": "a", "content": "ra"},
{"role": "tool", "tool_call_id": "b", "content": "rb"},
]
AIAgent._repair_message_sequence(agent, messages)
merged = [m for m in messages if m.get("role") == "assistant"][0]
assert merged.get("reasoning_content") == "thinking A"
def test_repair_noop_on_valid_parallel_format():
"""A correctly-formatted single assistant with multiple tool_calls is unchanged."""
agent = _bare_agent()
messages = [
{"role": "user", "content": "run both"},
{"role": "assistant", "content": "",
"tool_calls": [
{"id": "call_A", "type": "function",
"function": {"name": "session_search", "arguments": "{}"}},
{"id": "call_B", "type": "function",
"function": {"name": "search_files", "arguments": "{}"}},
]},
{"role": "tool", "tool_call_id": "call_A", "content": "A"},
{"role": "tool", "tool_call_id": "call_B", "content": "B"},
]
original_len = len(messages)
repairs = AIAgent._repair_message_sequence(agent, messages)
assert repairs == 0
assert len(messages) == original_len
def test_repair_does_NOT_merge_codex_interim_assistants():
"""Codex Responses interim turns stay separate (encrypted replay state).
The codex_responses api_mode keeps multiple consecutive incomplete
assistant turns, each carrying distinct codex_reasoning_items /
codex_message_items that must replay verbatim. Pass 0 must exempt them.
Refs test_run_agent_codex_responses.py duplicate-detection tests.
"""
agent = _bare_agent()
messages = [
{"role": "user", "content": "think hard"},
{"role": "assistant", "content": "", "finish_reason": "incomplete",
"codex_reasoning_items": [{"encrypted_content": "enc_first"}]},
{"role": "assistant", "content": "", "finish_reason": "incomplete",
"codex_reasoning_items": [{"encrypted_content": "enc_second"}]},
{"role": "assistant", "content": "Final answer."},
]
AIAgent._repair_message_sequence(agent, messages)
interim = [m for m in messages if m.get("finish_reason") == "incomplete"]
assert len(interim) == 2
encs = [m["codex_reasoning_items"][0]["encrypted_content"] for m in interim]
assert "enc_first" in encs and "enc_second" in encs