diff --git a/gateway/run.py b/gateway/run.py index 20ac741c89..a2081e1fea 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5226,22 +5226,31 @@ class GatewayRunner: self._effective_model = None self._effective_provider = None - # Check if we were interrupted and have a pending message + # Check if we were interrupted OR have a queued message (/queue). result = result_holder[0] adapter = self.adapters.get(source.platform) - # Get pending message from adapter if interrupted. + # Get pending message from adapter. # Use session_key (not source.chat_id) to match adapter's storage keys. pending = None - if result and result.get("interrupted") and adapter: - pending_event = adapter.get_pending_message(session_key) if session_key else None - if pending_event: - pending = pending_event.text - elif result.get("interrupt_message"): - pending = result.get("interrupt_message") + if result and adapter and session_key: + if result.get("interrupted"): + # Interrupted — consume the interrupt message + pending_event = adapter.get_pending_message(session_key) + if pending_event: + pending = pending_event.text + elif result.get("interrupt_message"): + pending = result.get("interrupt_message") + else: + # Normal completion — check for /queue'd messages that were + # stored without triggering an interrupt. + pending_event = adapter.get_pending_message(session_key) + if pending_event: + pending = pending_event.text + logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) if pending: - logger.debug("Processing interrupted message: '%s...'", pending[:40]) + logger.debug("Processing pending message: '%s...'", pending[:40]) # Clear the adapter's interrupt event so the next _run_agent call # doesn't immediately re-trigger the interrupt before the new agent @@ -5263,11 +5272,25 @@ class GatewayRunner: adapter.queue_message(session_key, pending) return result_holder[0] or {"final_response": response, "messages": history} - # Don't send the interrupted response to the user — it's just noise - # like "Operation interrupted." They already know they sent a new - # message, so go straight to processing it. - - # Now process the pending message with updated history + was_interrupted = result.get("interrupted") + if not was_interrupted: + # Queued message after normal completion — deliver the first + # response before processing the queued follow-up. + # Skip if streaming already delivered it. + _sc = stream_consumer_holder[0] + _already_streamed = _sc and getattr(_sc, "already_sent", False) + first_response = result.get("final_response", "") + if first_response and not _already_streamed: + try: + await adapter.send(source.chat_id, first_response, + metadata=getattr(event, "metadata", None)) + except Exception as e: + logger.warning("Failed to send first response before queued message: %s", e) + # else: interrupted — discard the interrupted response ("Operation + # interrupted." is just noise; the user already knows they sent a + # new message). + + # Process the pending message with updated history updated_history = result.get("messages", history) return await self._run_agent( message=pending, diff --git a/tests/gateway/test_queue_consumption.py b/tests/gateway/test_queue_consumption.py new file mode 100644 index 0000000000..2a4dd4ff02 --- /dev/null +++ b/tests/gateway/test_queue_consumption.py @@ -0,0 +1,165 @@ +"""Tests for /queue message consumption after normal agent completion. + +Verifies that messages queued via /queue (which store in +adapter._pending_messages WITHOUT triggering an interrupt) are consumed +after the agent finishes its current task — not silently dropped. +""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + PlatformConfig, + Platform, +) + + +# --------------------------------------------------------------------------- +# Minimal adapter for testing pending message storage +# --------------------------------------------------------------------------- + +class _StubAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + + async def send(self, chat_id, content, reply_to=None, metadata=None): + from gateway.platforms.base import SendResult + return SendResult(success=True, message_id="msg-1") + + async def get_chat_info(self, chat_id): + return {"id": chat_id, "type": "dm"} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestQueueMessageStorage: + """Verify /queue stores messages correctly in adapter._pending_messages.""" + + def test_queue_stores_message_in_pending(self): + adapter = _StubAdapter() + session_key = "telegram:user:123" + event = MessageEvent( + text="do this next", + message_type=MessageType.TEXT, + source=MagicMock(chat_id="123", platform=Platform.TELEGRAM), + message_id="q1", + ) + adapter._pending_messages[session_key] = event + + assert session_key in adapter._pending_messages + assert adapter._pending_messages[session_key].text == "do this next" + + def test_get_pending_message_consumes_and_clears(self): + adapter = _StubAdapter() + session_key = "telegram:user:123" + event = MessageEvent( + text="queued prompt", + message_type=MessageType.TEXT, + source=MagicMock(chat_id="123", platform=Platform.TELEGRAM), + message_id="q2", + ) + adapter._pending_messages[session_key] = event + + retrieved = adapter.get_pending_message(session_key) + assert retrieved is not None + assert retrieved.text == "queued prompt" + # Should be consumed (cleared) + assert adapter.get_pending_message(session_key) is None + + def test_queue_does_not_set_interrupt_event(self): + """The whole point of /queue — no interrupt signal.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + # Simulate an active session (agent running) + adapter._active_sessions[session_key] = asyncio.Event() + + # Store a queued message (what /queue does) + event = MessageEvent( + text="queued", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="q3", + ) + adapter._pending_messages[session_key] = event + + # The interrupt event should NOT be set + assert not adapter._active_sessions[session_key].is_set() + assert not adapter.has_pending_interrupt(session_key) + + def test_regular_message_sets_interrupt_event(self): + """Contrast: regular messages DO trigger interrupt.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + adapter._active_sessions[session_key] = asyncio.Event() + + # Simulate regular message arrival (what handle_message does) + event = MessageEvent( + text="new message", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="m1", + ) + adapter._pending_messages[session_key] = event + adapter._active_sessions[session_key].set() # this is what handle_message does + + assert adapter.has_pending_interrupt(session_key) + + +class TestQueueConsumptionAfterCompletion: + """Verify that pending messages are consumed after normal completion.""" + + def test_pending_message_available_after_normal_completion(self): + """After agent finishes without interrupt, pending message should + still be retrievable from adapter._pending_messages.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + # Simulate: agent starts, /queue stores a message, agent finishes + adapter._active_sessions[session_key] = asyncio.Event() + event = MessageEvent( + text="process this after", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="q4", + ) + adapter._pending_messages[session_key] = event + + # Agent finishes (no interrupt) + del adapter._active_sessions[session_key] + + # The queued message should still be retrievable + retrieved = adapter.get_pending_message(session_key) + assert retrieved is not None + assert retrieved.text == "process this after" + + def test_multiple_queues_last_one_wins(self): + """If user /queue's multiple times, last message overwrites.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + for text in ["first", "second", "third"]: + event = MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=MagicMock(), + message_id=f"q-{text}", + ) + adapter._pending_messages[session_key] = event + + retrieved = adapter.get_pending_message(session_key) + assert retrieved.text == "third"