diff --git a/gateway/run.py b/gateway/run.py index 917ce2a28cc..bd91061d148 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -7902,12 +7902,15 @@ class GatewayRunner: ) adapter = self.adapters.get(source.platform) if adapter: - merge_pending_message_event( - adapter._pending_messages, - _quick_key, - event, - merge_text=True, - ) + if self._busy_input_mode == "queue": + self._enqueue_fifo(_quick_key, event, adapter) + else: + merge_pending_message_event( + adapter._pending_messages, + _quick_key, + event, + merge_text=True, + ) return None running_agent = self._running_agents.get(_quick_key) diff --git a/tests/gateway/test_busy_session_ack.py b/tests/gateway/test_busy_session_ack.py index 7fb3d3210c0..c5517c5f638 100644 --- a/tests/gateway/test_busy_session_ack.py +++ b/tests/gateway/test_busy_session_ack.py @@ -27,6 +27,7 @@ sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext")) from gateway.platforms.base import ( MessageEvent, MessageType, + Platform, SessionSource, build_session_key, ) @@ -66,6 +67,8 @@ def _make_runner(): runner._busy_text_mode = "interrupt" runner.adapters = {} runner.config = MagicMock() + runner.config.group_sessions_per_user = True + runner.config.thread_sessions_per_user = False runner.session_store = None runner.hooks = MagicMock() runner.hooks.emit = AsyncMock() @@ -119,6 +122,55 @@ class TestBusySessionAck: assert sk not in runner._pending_messages running_agent.interrupt.assert_not_called() + @pytest.mark.asyncio + async def test_telegram_grace_followups_respect_queue_fifo(self, monkeypatch): + """Rapid Telegram text follow-ups in queue mode must not merge.""" + from gateway.run import GatewayRunner + + monkeypatch.setenv("HERMES_TELEGRAM_FOLLOWUP_GRACE_SECONDS", "3.0") + + runner, _sentinel = _make_runner() + runner._busy_input_mode = "queue" + runner._queued_events = {} + adapter = _make_adapter() + + source = SessionSource( + platform=Platform.TELEGRAM, + chat_id="123", + chat_type="dm", + user_id="user1", + ) + sk = build_session_key(source) + runner.adapters[source.platform] = adapter + + agent = MagicMock() + agent.get_activity_summary.return_value = { + "seconds_since_activity": 0.0, + } + runner._running_agents[sk] = agent + runner._running_agents_ts[sk] = time.time() + + events = [ + MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=source, + message_id=f"m-{idx}", + ) + for idx, text in enumerate(("first", "second", "third"), start=1) + ] + + for event in events: + result = await GatewayRunner._handle_message(runner, event) + assert result is None + + assert adapter._pending_messages[sk].text == "first" + assert [event.text for event in runner._queued_events[sk]] == [ + "second", + "third", + ] + agent.interrupt.assert_not_called() + @pytest.mark.asyncio async def test_sends_ack_when_agent_running(self): """First message during busy session should get a status ack."""