fix(gateway): FIFO busy-mode text follow-ups instead of newline-merging them

When the agent is busy and the user sends multiple text follow-ups, the
interrupt-mode and steer-fallback path stored them via
merge_pending_message_event(merge_text=True), which newline-joins
consecutive TEXT messages into a SINGLE pending turn — collapsing two
separate user messages into one mashed-together turn and destroying the
message boundaries the user sees (#43066 sub-bug 2).

Route that storage through _queue_or_replace_pending_event (the same FIFO
infrastructure used by busy queue-mode and /queue) so each follow-up gets
its own next-turn slot in arrival order, while still preserving
photo-burst / album merge semantics for media. Pure queue-mode already
used FIFO; this brings the interrupt/steer-fallback path in line.

The sibling defect in #43066 (assistant messages lost after compaction)
was already fixed on main by the identity-tracking flush rewrite (#46053)
plus the pre-rotation flush (#47202), so this only addresses the
remaining busy-message-merge half.

Co-authored-by: KiruyaMomochi <65301509+KiruyaMomochi@users.noreply.github.com>
This commit is contained in:
teknium1 2026-06-20 12:39:40 -07:00 committed by Teknium
parent 170ef24c8f
commit c11c510b42
2 changed files with 66 additions and 14 deletions

View file

@ -4240,13 +4240,19 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
# current run finishes (or is interrupted). Skip this for a
# successful steer — the text already landed inside the run and
# must NOT also be replayed as a next-turn user message.
#
# Route through _queue_or_replace_pending_event (the same FIFO
# infrastructure used by busy queue-mode and /queue) rather than a
# raw merge_pending_message_event(merge_text=True). The raw merge
# newline-joins consecutive TEXT follow-ups into a SINGLE pending
# turn, destroying message boundaries — so two separate user
# messages sent while the agent was busy (interrupt mode, or a
# steer that fell back to queue) arrived as one mashed-together
# turn (#43066 sub-bug 2). The FIFO path gives each text its own
# turn in arrival order while still preserving photo-burst / album
# merge semantics for media.
if not steered:
merge_pending_message_event(
adapter._pending_messages,
session_key,
event,
merge_text=event.message_type == MessageType.TEXT,
)
self._queue_or_replace_pending_event(session_key, event)
is_queue_mode = effective_mode == "queue"
is_steer_mode = effective_mode == "steer"

View file

@ -312,13 +312,14 @@ class TestBusySessionAck:
agent.steer = MagicMock(return_value=False) # rejected
runner._running_agents[sk] = agent
with patch("gateway.run.merge_pending_message_event") as mock_merge:
await runner._handle_active_session_busy_message(event, sk)
await runner._handle_active_session_busy_message(event, sk)
agent.steer.assert_called_once()
agent.interrupt.assert_not_called()
# Fell back to queue semantics: event was merged into pending messages
mock_merge.assert_called_once()
# Fell back to queue semantics: event was stored for the next turn
# via the FIFO path (each follow-up its own turn — no newline-merge
# that would mash separate messages together, #43066).
assert adapter._pending_messages.get(sk) is event
# Ack uses queue-mode wording (not steer, not interrupt)
call_kwargs = adapter._send_with_retry.call_args
@ -340,16 +341,61 @@ class TestBusySessionAck:
# Agent is still being set up — sentinel in place
runner._running_agents[sk] = sentinel
with patch("gateway.run.merge_pending_message_event") as mock_merge:
await runner._handle_active_session_busy_message(event, sk)
await runner._handle_active_session_busy_message(event, sk)
# Event was queued instead of steered
mock_merge.assert_called_once()
# Event was queued instead of steered (FIFO path, #43066)
assert adapter._pending_messages.get(sk) is event
call_kwargs = adapter._send_with_retry.call_args
content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
assert "Queued for the next turn" in content
@pytest.mark.asyncio
async def test_interrupt_mode_text_followups_fifo_not_merged(self):
"""Two TEXT follow-ups during a busy turn (interrupt mode) must each
get their OWN next-turn slot via FIFO NOT newline-merged into one
mashed-together turn (#43066 sub-bug 2). Before the fix the
interrupt/steer-fallback path called merge_pending_message_event
with merge_text=True, collapsing 'first' and 'second' into
'first\\nsecond' and destroying message boundaries."""
runner, _sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
runner._queued_events = {}
adapter = _make_adapter()
# Both events must share the SAME platform object so they resolve to
# the same adapter (a fresh MagicMock per event would not).
shared_platform = Platform.TELEGRAM
def _evt(text):
src = SessionSource(
platform=shared_platform, chat_id="123",
chat_type="dm", user_id="user1",
)
return MessageEvent(text=text, message_type=MessageType.TEXT,
source=src, message_id=f"m-{text[:5]}")
first = _evt("first message")
second = _evt("second message")
sk = build_session_key(first.source)
runner.adapters[shared_platform] = adapter
agent = MagicMock()
agent._active_children = [] # real list → not demoted to queue
runner._running_agents[sk] = agent
await runner._handle_active_session_busy_message(first, sk)
runner._busy_ack_ts = {} # avoid the 30s ack-debounce early return
await runner._handle_active_session_busy_message(second, sk)
# First lands in the head slot; second goes to the FIFO overflow —
# they are NOT merged into a single pending event.
head = adapter._pending_messages.get(sk)
assert head is first
assert head.text == "first message" # not "first message\nsecond message"
overflow = runner._queued_events.get(sk, [])
assert [e.text for e in overflow] == ["second message"]
@pytest.mark.asyncio
async def test_debounce_suppresses_rapid_acks(self):
"""Second message within 30s should NOT send another ack."""