diff --git a/gateway/run.py b/gateway/run.py index 3f950685f1c..917ce2a28cc 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3328,11 +3328,52 @@ class GatewayRunner: except Exception: return False + # Hard cap on per-session pending follow-ups for busy_input_mode=queue + # (and the draining/steer-fallback/subagent-demotion paths that share + # this entry point). Without a cap, a stuck agent + a rapid-fire user + # could grow the overflow list unboundedly. 32 turns of queued + # follow-ups is far beyond any realistic conversational backlog while + # still small enough to never threaten memory. + _BUSY_QUEUE_MAX_PENDING = 32 + def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None: adapter = self.adapters.get(event.source.platform) if not adapter: return - merge_pending_message_event(adapter._pending_messages, session_key, event) + # #28503 — Previously this called ``merge_pending_message_event`` + # with the default ``merge_text=False``, which silently OVERWROTE + # the single pending slot when consecutive text messages arrived + # in ``busy_input_mode: queue``. Route through the FIFO + # infrastructure shared with ``/queue`` so each follow-up gets + # its own turn in arrival order. Photo bursts still merge into + # the head slot via ``merge_pending_message_event`` (album + # semantics); everything else appends to the overflow tail. + pending_slot = getattr(adapter, "_pending_messages", None) + existing = pending_slot.get(session_key) if isinstance(pending_slot, dict) else None + if existing is not None and ( + getattr(existing, "message_type", None) == MessageType.PHOTO + or event.message_type == MessageType.PHOTO + or bool(getattr(existing, "media_urls", None)) + or bool(getattr(event, "media_urls", None)) + ): + # Preserve photo-burst / media-merge semantics for the head slot. + merge_pending_message_event( + adapter._pending_messages, + session_key, + event, + merge_text=event.message_type == MessageType.TEXT, + ) + return + + if self._queue_depth(session_key, adapter=adapter) >= self._BUSY_QUEUE_MAX_PENDING: + logger.warning( + "Dropping busy-mode follow-up for session %s — pending queue at cap (%d).", + session_key, + self._BUSY_QUEUE_MAX_PENDING, + ) + return + + self._enqueue_fifo(session_key, event, adapter) async def _handle_active_session_busy_message(self, event: MessageEvent, session_key: str) -> bool: # --- Authorization gate (#17775) --- diff --git a/tests/gateway/test_queue_consumption.py b/tests/gateway/test_queue_consumption.py index 178d1965af9..792d7b7ea52 100644 --- a/tests/gateway/test_queue_consumption.py +++ b/tests/gateway/test_queue_consumption.py @@ -360,3 +360,95 @@ class TestQueueConsumptionAfterCompletion: e.text for e in runner._queued_events[session_key] ] assert collected == texts + + +class TestBusyInputModeQueueFifo: + """Regression coverage for issue #28503. + + ``busy_input_mode: queue`` rapid follow-ups used to silently overwrite + a single pending slot, losing every message except the last. The + runner's busy/queue/steer-fallback entry point now routes through + the same FIFO infrastructure as ``/queue``, so each follow-up gets + its own turn in arrival order. + """ + + def _make_runner_and_adapter(self): + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._queued_events = {} + adapter = _StubAdapter() + runner.adapters = {Platform.TELEGRAM: adapter} + return runner, adapter + + def _text_event(self, text: str) -> MessageEvent: + source = MagicMock(chat_id="c1", platform=Platform.TELEGRAM) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id=f"m-{text}", + ) + + def test_rapid_text_followups_are_queued_in_fifo_order(self): + """Five rapid texts in queue mode must all survive (none silently dropped).""" + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:fifo" + + texts = ["one", "two", "three", "four", "five"] + for text in texts: + runner._queue_or_replace_pending_event(session_key, self._text_event(text)) + + # Head slot keeps the first; overflow keeps the rest in order. + assert adapter._pending_messages[session_key].text == "one" + assert [e.text for e in runner._queued_events[session_key]] == [ + "two", + "three", + "four", + "five", + ] + assert runner._queue_depth(session_key, adapter=adapter) == len(texts) + + def test_queue_respects_bounded_cap(self): + """Beyond the per-session cap, follow-ups are dropped (with a warning).""" + from gateway.run import GatewayRunner + + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:cap" + + cap = GatewayRunner._BUSY_QUEUE_MAX_PENDING + for i in range(cap + 5): + runner._queue_or_replace_pending_event( + session_key, self._text_event(f"msg-{i:03d}") + ) + + # Exactly ``cap`` follow-ups retained (head + cap-1 in overflow). + assert runner._queue_depth(session_key, adapter=adapter) == cap + assert adapter._pending_messages[session_key].text == "msg-000" + # The last accepted overflow item is msg-{cap-1}. + assert runner._queued_events[session_key][-1].text == f"msg-{cap - 1:03d}" + + def test_photo_burst_still_merges_in_head_slot(self): + """Photo bursts must keep album-merge semantics, not split into N turns.""" + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:burst" + + source = MagicMock(chat_id="c1", platform=Platform.TELEGRAM) + for i in range(3): + runner._queue_or_replace_pending_event( + session_key, + MessageEvent( + text="", + message_type=MessageType.PHOTO, + source=source, + message_id=f"p-{i}", + media_urls=[f"http://example.com/{i}.jpg"], + media_types=["image/jpeg"], + ), + ) + + # Single merged head event with all three media URLs. + assert session_key not in runner._queued_events or not runner._queued_events[session_key] + head = adapter._pending_messages[session_key] + assert head.message_type == MessageType.PHOTO + assert len(head.media_urls) == 3