diff --git a/run_agent.py b/run_agent.py index 185431671b..bdfc17efa0 100644 --- a/run_agent.py +++ b/run_agent.py @@ -3836,7 +3836,17 @@ class AIAgent: self._flush_messages_to_session_db(messages, conversation_history) def _drop_trailing_empty_response_scaffolding(self, messages: List[Dict]) -> None: - """Remove private empty-response retry/failure scaffolding from transcript tails.""" + """Remove private empty-response retry/failure scaffolding from transcript tails. + + Also rewinds past any trailing tool-result / assistant(tool_calls) pair + that the failed iteration left hanging. Without this, the tail ends at + a raw ``tool`` message and the next user turn lands as + ``...tool, user, user`` — a protocol-invalid sequence that most + providers silently reject (returns empty content), causing the + empty-retry loop to fire forever. See #. + """ + # Pass 1: strip the flagged scaffolding messages themselves. + dropped_scaffolding = False while ( messages and isinstance(messages[-1], dict) @@ -3846,6 +3856,137 @@ class AIAgent: ) ): messages.pop() + dropped_scaffolding = True + + # Pass 2: if we stripped scaffolding, rewind through any trailing + # tool-result messages plus the assistant(tool_calls) message that + # produced them. This preserves role alternation so the next user + # message follows a user or assistant message, not an orphan tool + # result. Only runs when scaffolding was actually present — normal + # conversation tails (real tool loops mid-progress) are untouched. + if not dropped_scaffolding: + return + + # Drop any trailing tool-result messages + while ( + messages + and isinstance(messages[-1], dict) + and messages[-1].get("role") == "tool" + ): + messages.pop() + + # Drop the assistant message that issued the tool calls, if the tail + # now ends in an assistant-with-tool_calls (the pair that owned the + # just-popped tool results). Without this, the tail is + # ``assistant(tool_calls=...)`` with no tool answers, which some + # providers also reject. + if ( + messages + and isinstance(messages[-1], dict) + and messages[-1].get("role") == "assistant" + and messages[-1].get("tool_calls") + ): + messages.pop() + + def _repair_message_sequence(self, messages: List[Dict]) -> int: + """Collapse malformed role-alternation left in the live history. + + Providers (OpenAI, OpenRouter, Anthropic) expect strict alternation: + after the system message, user/tool alternates with assistant, with + no two consecutive user messages and no tool-result that doesn't + follow an assistant-with-tool_calls. Violations cause silent empty + responses on most providers, which triggers the empty-retry loop. + + This runs right before the API call as a defensive belt — by the + time it fires, the scaffolding strip should already have prevented + most shapes, but external callers (gateway multi-queue replay, + session resume, cron, explicit conversation_history passed in by + host code) can feed in already-broken histories. + + Repairs applied: + 1. Stray ``tool`` messages whose ``tool_call_id`` doesn't match + any preceding assistant tool_call — dropped. + 2. Consecutive ``user`` messages — merged with newline separator + so no user input is lost. + + Deliberately does NOT rewind orphan ``assistant(tool_calls)+tool`` + pairs that precede a user message — that pattern IS valid when the + previous turn completed normally and the user jumped in to redirect + before the model got a continuation turn (the ongoing dialog + pattern). The empty-response scaffolding stripper handles the + genuinely-broken variant via its flag-gated rewind. + + Returns the number of repairs made (for logging/telemetry). + """ + if not messages: + return 0 + + repairs = 0 + + # Pass 1: drop stray tool messages that don't follow a known + # assistant tool_call_id. Uses a rolling set of known ids refreshed + # on each assistant message. + known_tool_ids: set = set() + filtered: List[Dict] = [] + for msg in messages: + if not isinstance(msg, dict): + filtered.append(msg) + continue + role = msg.get("role") + if role == "assistant": + known_tool_ids = set() + for tc in (msg.get("tool_calls") or []): + tc_id = tc.get("id") if isinstance(tc, dict) else None + if tc_id: + known_tool_ids.add(tc_id) + filtered.append(msg) + elif role == "tool": + tc_id = msg.get("tool_call_id") + if tc_id and tc_id in known_tool_ids: + filtered.append(msg) + else: + repairs += 1 + else: + if role == "user": + # A user turn closes the tool-result run; subsequent + # tool messages without a fresh assistant tool_call + # are orphans. + known_tool_ids = set() + filtered.append(msg) + + # Pass 2: merge consecutive user messages. Preserves all user input + # so nothing the user typed is lost. + merged: List[Dict] = [] + for msg in filtered: + if ( + merged + and isinstance(msg, dict) + and msg.get("role") == "user" + and isinstance(merged[-1], dict) + and merged[-1].get("role") == "user" + ): + prev = merged[-1] + prev_content = prev.get("content", "") + new_content = msg.get("content", "") + # Only merge plain-text content; leave multimodal (list) + # content alone — collapsing image/audio blocks risks + # mangling the attachment structure. + if isinstance(prev_content, str) and isinstance(new_content, str): + prev["content"] = ( + (prev_content + "\n\n" + new_content) + if prev_content and new_content + else (prev_content or new_content) + ) + repairs += 1 + continue + merged.append(msg) + + if repairs > 0: + # Rewrite in place so downstream paths (persistence, return + # value, session DB flush) see the repaired sequence. + messages[:] = merged + + return repairs def _flush_messages_to_session_db(self, messages: List[Dict], conversation_history: List[Dict] = None): """Persist any un-flushed messages to the SQLite session store. @@ -11133,6 +11274,21 @@ class AIAgent: self.session_id or "-", ) + # Defensive: repair malformed role-alternation before API call. + # Catches cases where the history got wedged into a + # ``tool → user`` or ``user → user`` tail (e.g. after empty- + # response scaffolding was stripped and a new user message + # landed after an orphan tool result). Most providers return + # empty content on malformed sequences, which would otherwise + # retrigger the empty-retry loop indefinitely. + repaired_seq = self._repair_message_sequence(messages) + if repaired_seq > 0: + request_logger.info( + "Repaired %s message-alternation violations before request (session=%s)", + repaired_seq, + self.session_id or "-", + ) + api_messages = [] for idx, msg in enumerate(messages): api_msg = msg.copy() diff --git a/tests/run_agent/test_empty_response_recovery_persistence.py b/tests/run_agent/test_empty_response_recovery_persistence.py index d31a1ff8d2..24c637a2fe 100644 --- a/tests/run_agent/test_empty_response_recovery_persistence.py +++ b/tests/run_agent/test_empty_response_recovery_persistence.py @@ -21,9 +21,21 @@ def _agent_with_stubbed_persistence(): def test_persist_session_strips_trailing_empty_recovery_scaffolding(): + """After stripping scaffolding, also rewind past orphan trailing tool-result + messages that the failed iteration left behind. Otherwise the next user + message lands after a bare ``tool`` and produces a protocol-invalid + sequence that most providers silently fail on, retriggering the empty- + retry loop indefinitely. + """ agent = _agent_with_stubbed_persistence() messages = [ {"role": "user", "content": "run the task"}, + { + "role": "assistant", + "content": "", + "tool_calls": [{"id": "call_1", "type": "function", + "function": {"name": "x", "arguments": "{}"}}], + }, {"role": "tool", "content": "{}", "tool_call_id": "call_1"}, { "role": "assistant", @@ -42,9 +54,11 @@ def test_persist_session_strips_trailing_empty_recovery_scaffolding(): AIAgent._persist_session(agent, messages, conversation_history=[]) + # After strip + rewind, only the original user message remains. The + # assistant(tool_calls) + tool pair is dropped because its iteration + # never produced a real response. assert messages == [ {"role": "user", "content": "run the task"}, - {"role": "tool", "content": "{}", "tool_call_id": "call_1"}, ] assert agent.saved_session_logs[-1] == messages assert all(not msg.get("_empty_recovery_synthetic") for msg in messages) diff --git a/tests/run_agent/test_message_sequence_repair.py b/tests/run_agent/test_message_sequence_repair.py new file mode 100644 index 0000000000..fd1db95e84 --- /dev/null +++ b/tests/run_agent/test_message_sequence_repair.py @@ -0,0 +1,201 @@ +"""Tests for pre-API-call message-sequence repair. + +Covers ``_repair_message_sequence`` and the extended +``_drop_trailing_empty_response_scaffolding`` behavior that rewinds past +orphan tool-result tails. Together these prevent the self-reinforcing empty- +response loop observed in session 20260507_044111_fa7e65, where a tool-result +followed directly by a user message produced silent empty responses from +providers (violating role alternation), which retriggered the empty-retry +recovery every turn. +""" + +from run_agent import AIAgent + + +def _bare_agent(): + return AIAgent.__new__(AIAgent) + + +# ── _drop_trailing_empty_response_scaffolding ────────────────────────────── + +def test_drop_scaffolding_rewinds_orphan_tool_tail(): + """When scaffolding is stripped, also rewind the orphan assistant+tool pair.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "task"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t1", "type": "function", + "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t1", "content": "out"}, + {"role": "assistant", "content": "(empty)", + "_empty_terminal_sentinel": True}, + ] + + AIAgent._drop_trailing_empty_response_scaffolding(agent, messages) + + assert messages == [{"role": "user", "content": "task"}] + + +def test_drop_scaffolding_keeps_tail_when_no_scaffolding(): + """Mid-iteration tool results must NOT be rewound — only if scaffolding fires.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "task"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t1", "type": "function", + "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t1", "content": "out"}, + ] + original = [dict(m) for m in messages] + + AIAgent._drop_trailing_empty_response_scaffolding(agent, messages) + + assert messages == original + + +def test_drop_scaffolding_handles_multiple_parallel_tool_results(): + """Parallel tool calls (one assistant → many tool results) all rewound together.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": "task"}, + {"role": "assistant", "content": "", + "tool_calls": [ + {"id": "t1", "type": "function", + "function": {"name": "f", "arguments": "{}"}}, + {"id": "t2", "type": "function", + "function": {"name": "g", "arguments": "{}"}}, + ]}, + {"role": "tool", "tool_call_id": "t1", "content": "out1"}, + {"role": "tool", "tool_call_id": "t2", "content": "out2"}, + {"role": "assistant", "content": "(empty)", + "_empty_terminal_sentinel": True}, + ] + + AIAgent._drop_trailing_empty_response_scaffolding(agent, messages) + + assert messages == [{"role": "user", "content": "task"}] + + +# ── _repair_message_sequence ─────────────────────────────────────────────── + +def test_repair_merges_consecutive_user_messages(): + agent = _bare_agent() + messages = [ + {"role": "user", "content": "first"}, + {"role": "user", "content": "second"}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs == 1 + assert len(messages) == 1 + assert messages[0]["role"] == "user" + assert messages[0]["content"] == "first\n\nsecond" + + +def test_repair_preserves_user_content_when_one_side_empty(): + agent = _bare_agent() + messages = [ + {"role": "user", "content": ""}, + {"role": "user", "content": "real message"}, + ] + + AIAgent._repair_message_sequence(agent, messages) + + assert messages == [{"role": "user", "content": "real message"}] + + +def test_repair_does_not_rewind_ongoing_dialog_tool_pair(): + """assistant(tool_calls) + tool + user is a VALID pattern (user redirect + before the model gets its continuation turn). Repair must not touch it — + only the flag-gated scaffolding strip rewinds, and only when the + empty-recovery scaffolding was actually present. + """ + agent = _bare_agent() + messages = [ + {"role": "user", "content": "Q1"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t1", "type": "function", + "function": {"name": "f", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t1", "content": "out"}, + {"role": "user", "content": "Q2"}, + ] + original = [dict(m) for m in messages] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs == 0 + assert messages == original + + +def test_repair_drops_stray_tool_with_unknown_tool_call_id(): + agent = _bare_agent() + messages = [ + {"role": "user", "content": "hi"}, + {"role": "assistant", "content": "hello"}, + {"role": "tool", "tool_call_id": "orphan", "content": "stray"}, + {"role": "user", "content": "real"}, + ] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs >= 1 + assert all(m.get("role") != "tool" for m in messages) + + +def test_repair_leaves_valid_conversation_unchanged(): + agent = _bare_agent() + messages = [ + {"role": "user", "content": "list files"}, + {"role": "assistant", "content": "", + "tool_calls": [{"id": "t1", "type": "function", + "function": {"name": "ls", "arguments": "{}"}}]}, + {"role": "tool", "tool_call_id": "t1", "content": "a.txt b.txt"}, + {"role": "assistant", "content": "Found 2 files"}, + {"role": "user", "content": "more"}, + ] + original = [dict(m) for m in messages] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs == 0 + assert messages == original + + +def test_repair_preserves_multimodal_user_content(): + """Multimodal (list) content must NOT be merged — risks mangling attachments.""" + agent = _bare_agent() + messages = [ + {"role": "user", "content": [{"type": "text", "text": "hi"}, + {"type": "image_url", "image_url": {"url": "..."}}]}, + {"role": "user", "content": "follow-up"}, + ] + + AIAgent._repair_message_sequence(agent, messages) + + # The multimodal user message stays as a distinct message — no merge + assert len(messages) == 2 + assert isinstance(messages[0]["content"], list) + + +def test_repair_empty_messages_returns_zero(): + agent = _bare_agent() + messages = [] + + repairs = AIAgent._repair_message_sequence(agent, messages) + + assert repairs == 0 + assert messages == [] + + +def test_repair_preserves_system_messages(): + agent = _bare_agent() + messages = [ + {"role": "system", "content": "You are..."}, + {"role": "user", "content": "hi"}, + ] + original = [dict(m) for m in messages] + + AIAgent._repair_message_sequence(agent, messages) + + assert messages == original