From 4d0f2bd241694e91d1172194f7a0f73d2d585ba6 Mon Sep 17 00:00:00 2001 From: Cornna <96944678+ymylive@users.noreply.github.com> Date: Thu, 28 May 2026 18:35:16 +0800 Subject: [PATCH 1/2] fix(gateway): use FIFO queue for busy_input_mode pending messages Closes #28503 --- gateway/run.py | 43 +++++++++++- tests/gateway/test_queue_consumption.py | 92 +++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 1 deletion(-) diff --git a/gateway/run.py b/gateway/run.py index 3f950685f1c..917ce2a28cc 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -3328,11 +3328,52 @@ class GatewayRunner: except Exception: return False + # Hard cap on per-session pending follow-ups for busy_input_mode=queue + # (and the draining/steer-fallback/subagent-demotion paths that share + # this entry point). Without a cap, a stuck agent + a rapid-fire user + # could grow the overflow list unboundedly. 32 turns of queued + # follow-ups is far beyond any realistic conversational backlog while + # still small enough to never threaten memory. + _BUSY_QUEUE_MAX_PENDING = 32 + def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent) -> None: adapter = self.adapters.get(event.source.platform) if not adapter: return - merge_pending_message_event(adapter._pending_messages, session_key, event) + # #28503 — Previously this called ``merge_pending_message_event`` + # with the default ``merge_text=False``, which silently OVERWROTE + # the single pending slot when consecutive text messages arrived + # in ``busy_input_mode: queue``. Route through the FIFO + # infrastructure shared with ``/queue`` so each follow-up gets + # its own turn in arrival order. Photo bursts still merge into + # the head slot via ``merge_pending_message_event`` (album + # semantics); everything else appends to the overflow tail. + pending_slot = getattr(adapter, "_pending_messages", None) + existing = pending_slot.get(session_key) if isinstance(pending_slot, dict) else None + if existing is not None and ( + getattr(existing, "message_type", None) == MessageType.PHOTO + or event.message_type == MessageType.PHOTO + or bool(getattr(existing, "media_urls", None)) + or bool(getattr(event, "media_urls", None)) + ): + # Preserve photo-burst / media-merge semantics for the head slot. + merge_pending_message_event( + adapter._pending_messages, + session_key, + event, + merge_text=event.message_type == MessageType.TEXT, + ) + return + + if self._queue_depth(session_key, adapter=adapter) >= self._BUSY_QUEUE_MAX_PENDING: + logger.warning( + "Dropping busy-mode follow-up for session %s — pending queue at cap (%d).", + session_key, + self._BUSY_QUEUE_MAX_PENDING, + ) + return + + self._enqueue_fifo(session_key, event, adapter) async def _handle_active_session_busy_message(self, event: MessageEvent, session_key: str) -> bool: # --- Authorization gate (#17775) --- diff --git a/tests/gateway/test_queue_consumption.py b/tests/gateway/test_queue_consumption.py index 178d1965af9..792d7b7ea52 100644 --- a/tests/gateway/test_queue_consumption.py +++ b/tests/gateway/test_queue_consumption.py @@ -360,3 +360,95 @@ class TestQueueConsumptionAfterCompletion: e.text for e in runner._queued_events[session_key] ] assert collected == texts + + +class TestBusyInputModeQueueFifo: + """Regression coverage for issue #28503. + + ``busy_input_mode: queue`` rapid follow-ups used to silently overwrite + a single pending slot, losing every message except the last. The + runner's busy/queue/steer-fallback entry point now routes through + the same FIFO infrastructure as ``/queue``, so each follow-up gets + its own turn in arrival order. + """ + + def _make_runner_and_adapter(self): + from gateway.run import GatewayRunner + + runner = GatewayRunner.__new__(GatewayRunner) + runner._queued_events = {} + adapter = _StubAdapter() + runner.adapters = {Platform.TELEGRAM: adapter} + return runner, adapter + + def _text_event(self, text: str) -> MessageEvent: + source = MagicMock(chat_id="c1", platform=Platform.TELEGRAM) + return MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id=f"m-{text}", + ) + + def test_rapid_text_followups_are_queued_in_fifo_order(self): + """Five rapid texts in queue mode must all survive (none silently dropped).""" + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:fifo" + + texts = ["one", "two", "three", "four", "five"] + for text in texts: + runner._queue_or_replace_pending_event(session_key, self._text_event(text)) + + # Head slot keeps the first; overflow keeps the rest in order. + assert adapter._pending_messages[session_key].text == "one" + assert [e.text for e in runner._queued_events[session_key]] == [ + "two", + "three", + "four", + "five", + ] + assert runner._queue_depth(session_key, adapter=adapter) == len(texts) + + def test_queue_respects_bounded_cap(self): + """Beyond the per-session cap, follow-ups are dropped (with a warning).""" + from gateway.run import GatewayRunner + + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:cap" + + cap = GatewayRunner._BUSY_QUEUE_MAX_PENDING + for i in range(cap + 5): + runner._queue_or_replace_pending_event( + session_key, self._text_event(f"msg-{i:03d}") + ) + + # Exactly ``cap`` follow-ups retained (head + cap-1 in overflow). + assert runner._queue_depth(session_key, adapter=adapter) == cap + assert adapter._pending_messages[session_key].text == "msg-000" + # The last accepted overflow item is msg-{cap-1}. + assert runner._queued_events[session_key][-1].text == f"msg-{cap - 1:03d}" + + def test_photo_burst_still_merges_in_head_slot(self): + """Photo bursts must keep album-merge semantics, not split into N turns.""" + runner, adapter = self._make_runner_and_adapter() + session_key = "telegram:user:burst" + + source = MagicMock(chat_id="c1", platform=Platform.TELEGRAM) + for i in range(3): + runner._queue_or_replace_pending_event( + session_key, + MessageEvent( + text="", + message_type=MessageType.PHOTO, + source=source, + message_id=f"p-{i}", + media_urls=[f"http://example.com/{i}.jpg"], + media_types=["image/jpeg"], + ), + ) + + # Single merged head event with all three media URLs. + assert session_key not in runner._queued_events or not runner._queued_events[session_key] + head = adapter._pending_messages[session_key] + assert head.message_type == MessageType.PHOTO + assert len(head.media_urls) == 3 From fec5ca71d8cabb7e770cc6e4a96317a64b970180 Mon Sep 17 00:00:00 2001 From: Cornna <96944678+ymylive@users.noreply.github.com> Date: Wed, 3 Jun 2026 16:24:33 +0800 Subject: [PATCH 2/2] fix: preserve telegram queue fifo during grace window --- gateway/run.py | 15 +++++--- tests/gateway/test_busy_session_ack.py | 52 ++++++++++++++++++++++++++ 2 files changed, 61 insertions(+), 6 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index 917ce2a28cc..bd91061d148 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7902,12 +7902,15 @@ class GatewayRunner: ) adapter = self.adapters.get(source.platform) if adapter: - merge_pending_message_event( - adapter._pending_messages, - _quick_key, - event, - merge_text=True, - ) + if self._busy_input_mode == "queue": + self._enqueue_fifo(_quick_key, event, adapter) + else: + merge_pending_message_event( + adapter._pending_messages, + _quick_key, + event, + merge_text=True, + ) return None running_agent = self._running_agents.get(_quick_key) diff --git a/tests/gateway/test_busy_session_ack.py b/tests/gateway/test_busy_session_ack.py index 7fb3d3210c0..c5517c5f638 100644 --- a/tests/gateway/test_busy_session_ack.py +++ b/tests/gateway/test_busy_session_ack.py @@ -27,6 +27,7 @@ sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext")) from gateway.platforms.base import ( MessageEvent, MessageType, + Platform, SessionSource, build_session_key, ) @@ -66,6 +67,8 @@ def _make_runner(): runner._busy_text_mode = "interrupt" runner.adapters = {} runner.config = MagicMock() + runner.config.group_sessions_per_user = True + runner.config.thread_sessions_per_user = False runner.session_store = None runner.hooks = MagicMock() runner.hooks.emit = AsyncMock() @@ -119,6 +122,55 @@ class TestBusySessionAck: assert sk not in runner._pending_messages running_agent.interrupt.assert_not_called() + @pytest.mark.asyncio + async def test_telegram_grace_followups_respect_queue_fifo(self, monkeypatch): + """Rapid Telegram text follow-ups in queue mode must not merge.""" + from gateway.run import GatewayRunner + + monkeypatch.setenv("HERMES_TELEGRAM_FOLLOWUP_GRACE_SECONDS", "3.0") + + runner, _sentinel = _make_runner() + runner._busy_input_mode = "queue" + runner._queued_events = {} + adapter = _make_adapter() + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="123", + chat_type="dm", + user_id="user1", + ) + sk = build_session_key(source) + runner.adapters[source.platform] = adapter + + agent = MagicMock() + agent.get_activity_summary.return_value = { + "seconds_since_activity": 0.0, + } + runner._running_agents[sk] = agent + runner._running_agents_ts[sk] = time.time() + + events = [ + MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id=f"m-{idx}", + ) + for idx, text in enumerate(("first", "second", "third"), start=1) + ] + + for event in events: + result = await GatewayRunner._handle_message(runner, event) + assert result is None + + assert adapter._pending_messages[sk].text == "first" + assert [event.text for event in runner._queued_events[sk]] == [ + "second", + "third", + ] + agent.interrupt.assert_not_called() + @pytest.mark.asyncio async def test_sends_ack_when_agent_running(self): """First message during busy session should get a status ack."""