diff --git a/gateway/run.py b/gateway/run.py index f5a411244aa..73700e3b529 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -4240,13 +4240,19 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew # current run finishes (or is interrupted). Skip this for a # successful steer — the text already landed inside the run and # must NOT also be replayed as a next-turn user message. + # + # Route through _queue_or_replace_pending_event (the same FIFO + # infrastructure used by busy queue-mode and /queue) rather than a + # raw merge_pending_message_event(merge_text=True). The raw merge + # newline-joins consecutive TEXT follow-ups into a SINGLE pending + # turn, destroying message boundaries — so two separate user + # messages sent while the agent was busy (interrupt mode, or a + # steer that fell back to queue) arrived as one mashed-together + # turn (#43066 sub-bug 2). The FIFO path gives each text its own + # turn in arrival order while still preserving photo-burst / album + # merge semantics for media. if not steered: - merge_pending_message_event( - adapter._pending_messages, - session_key, - event, - merge_text=event.message_type == MessageType.TEXT, - ) + self._queue_or_replace_pending_event(session_key, event) is_queue_mode = effective_mode == "queue" is_steer_mode = effective_mode == "steer" diff --git a/tests/gateway/test_busy_session_ack.py b/tests/gateway/test_busy_session_ack.py index c5517c5f638..c58031fdb5c 100644 --- a/tests/gateway/test_busy_session_ack.py +++ b/tests/gateway/test_busy_session_ack.py @@ -312,13 +312,14 @@ class TestBusySessionAck: agent.steer = MagicMock(return_value=False) # rejected runner._running_agents[sk] = agent - with patch("gateway.run.merge_pending_message_event") as mock_merge: - await runner._handle_active_session_busy_message(event, sk) + await runner._handle_active_session_busy_message(event, sk) agent.steer.assert_called_once() agent.interrupt.assert_not_called() - # Fell back to queue semantics: event was merged into pending messages - mock_merge.assert_called_once() + # Fell back to queue semantics: event was stored for the next turn + # via the FIFO path (each follow-up its own turn — no newline-merge + # that would mash separate messages together, #43066). + assert adapter._pending_messages.get(sk) is event # Ack uses queue-mode wording (not steer, not interrupt) call_kwargs = adapter._send_with_retry.call_args @@ -340,16 +341,61 @@ class TestBusySessionAck: # Agent is still being set up — sentinel in place runner._running_agents[sk] = sentinel - with patch("gateway.run.merge_pending_message_event") as mock_merge: - await runner._handle_active_session_busy_message(event, sk) + await runner._handle_active_session_busy_message(event, sk) - # Event was queued instead of steered - mock_merge.assert_called_once() + # Event was queued instead of steered (FIFO path, #43066) + assert adapter._pending_messages.get(sk) is event call_kwargs = adapter._send_with_retry.call_args content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "") assert "Queued for the next turn" in content + @pytest.mark.asyncio + async def test_interrupt_mode_text_followups_fifo_not_merged(self): + """Two TEXT follow-ups during a busy turn (interrupt mode) must each + get their OWN next-turn slot via FIFO — NOT newline-merged into one + mashed-together turn (#43066 sub-bug 2). Before the fix the + interrupt/steer-fallback path called merge_pending_message_event + with merge_text=True, collapsing 'first' and 'second' into + 'first\\nsecond' and destroying message boundaries.""" + runner, _sentinel = _make_runner() + runner._busy_input_mode = "interrupt" + runner._queued_events = {} + adapter = _make_adapter() + + # Both events must share the SAME platform object so they resolve to + # the same adapter (a fresh MagicMock per event would not). + shared_platform = Platform.TELEGRAM + + def _evt(text): + src = SessionSource( + platform=shared_platform, chat_id="123", + chat_type="dm", user_id="user1", + ) + return MessageEvent(text=text, message_type=MessageType.TEXT, + source=src, message_id=f"m-{text[:5]}") + + first = _evt("first message") + second = _evt("second message") + sk = build_session_key(first.source) + runner.adapters[shared_platform] = adapter + + agent = MagicMock() + agent._active_children = [] # real list → not demoted to queue + runner._running_agents[sk] = agent + + await runner._handle_active_session_busy_message(first, sk) + runner._busy_ack_ts = {} # avoid the 30s ack-debounce early return + await runner._handle_active_session_busy_message(second, sk) + + # First lands in the head slot; second goes to the FIFO overflow — + # they are NOT merged into a single pending event. + head = adapter._pending_messages.get(sk) + assert head is first + assert head.text == "first message" # not "first message\nsecond message" + overflow = runner._queued_events.get(sk, []) + assert [e.text for e in overflow] == ["second message"] + @pytest.mark.asyncio async def test_debounce_suppresses_rapid_acks(self): """Second message within 30s should NOT send another ack."""