From ff071fc74c535ce99c8194d306c9f650fe7dcdae Mon Sep 17 00:00:00 2001 From: Teknium <127238744+teknium1@users.noreply.github.com> Date: Sun, 22 Mar 2026 04:56:13 -0700 Subject: [PATCH] fix(gateway): process /queue'd messages after agent completion (#2469) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: respect DashScope v1 runtime mode for alibaba Remove the hardcoded Alibaba branch from resolve_runtime_provider() that forced api_mode='anthropic_messages' regardless of the base URL. Alibaba now goes through the generic API-key provider path, which auto-detects the protocol from the URL: - /apps/anthropic → anthropic_messages (via endswith check) - /v1 → chat_completions (default) This fixes Alibaba setup with OpenAI-compatible DashScope endpoints (e.g. coding-intl.dashscope.aliyuncs.com/v1) that were broken because runtime always forced Anthropic mode even when setup saved a /v1 URL. Based on PR #2024 by @kshitijk4poor. * docs(skill): add split, merge, search examples to ocr-and-documents skill Adds pymupdf examples for PDF splitting, merging, and text search to the existing ocr-and-documents skill. No new dependencies — pymupdf already covers all three operations natively. * fix: replace all production print() calls with logger in rl_training_tool Replace all bare print() calls in production code paths with proper logger calls. - Add `import logging` and module-level `logger = logging.getLogger(__name__)` - Replace print() in _start_training_run() with logger.info() - Replace print() in _stop_training_run() with logger.info() - Replace print(Warning/Note) calls with logger.warning() and logger.info() Using the logging framework allows log level filtering, proper formatting, and log routing instead of always printing to stdout. * fix(gateway): process /queue'd messages after agent completion /queue stored messages in adapter._pending_messages but never consumed them after normal (non-interrupted) completion. The consumption path at line 5219 only checked pending messages when result.get('interrupted') was True — since /queue deliberately doesn't interrupt, queued messages were silently dropped. Now checks adapter._pending_messages after both interrupted AND normal completion. For queued messages (non-interrupt), the first response is delivered before recursing to process the queued follow-up. Skips the direct send when streaming already delivered the response. Reported by GhostMode on Discord. --------- Co-authored-by: kshitijk4poor Co-authored-by: memosr.eth <96793918+memosr@users.noreply.github.com> --- gateway/run.py | 51 ++++++-- tests/gateway/test_queue_consumption.py | 165 ++++++++++++++++++++++++ 2 files changed, 202 insertions(+), 14 deletions(-) create mode 100644 tests/gateway/test_queue_consumption.py diff --git a/gateway/run.py b/gateway/run.py index 20ac741c89..a2081e1fea 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -5226,22 +5226,31 @@ class GatewayRunner: self._effective_model = None self._effective_provider = None - # Check if we were interrupted and have a pending message + # Check if we were interrupted OR have a queued message (/queue). result = result_holder[0] adapter = self.adapters.get(source.platform) - # Get pending message from adapter if interrupted. + # Get pending message from adapter. # Use session_key (not source.chat_id) to match adapter's storage keys. pending = None - if result and result.get("interrupted") and adapter: - pending_event = adapter.get_pending_message(session_key) if session_key else None - if pending_event: - pending = pending_event.text - elif result.get("interrupt_message"): - pending = result.get("interrupt_message") + if result and adapter and session_key: + if result.get("interrupted"): + # Interrupted — consume the interrupt message + pending_event = adapter.get_pending_message(session_key) + if pending_event: + pending = pending_event.text + elif result.get("interrupt_message"): + pending = result.get("interrupt_message") + else: + # Normal completion — check for /queue'd messages that were + # stored without triggering an interrupt. + pending_event = adapter.get_pending_message(session_key) + if pending_event: + pending = pending_event.text + logger.debug("Processing queued message after agent completion: '%s...'", pending[:40]) if pending: - logger.debug("Processing interrupted message: '%s...'", pending[:40]) + logger.debug("Processing pending message: '%s...'", pending[:40]) # Clear the adapter's interrupt event so the next _run_agent call # doesn't immediately re-trigger the interrupt before the new agent @@ -5263,11 +5272,25 @@ class GatewayRunner: adapter.queue_message(session_key, pending) return result_holder[0] or {"final_response": response, "messages": history} - # Don't send the interrupted response to the user — it's just noise - # like "Operation interrupted." They already know they sent a new - # message, so go straight to processing it. - - # Now process the pending message with updated history + was_interrupted = result.get("interrupted") + if not was_interrupted: + # Queued message after normal completion — deliver the first + # response before processing the queued follow-up. + # Skip if streaming already delivered it. + _sc = stream_consumer_holder[0] + _already_streamed = _sc and getattr(_sc, "already_sent", False) + first_response = result.get("final_response", "") + if first_response and not _already_streamed: + try: + await adapter.send(source.chat_id, first_response, + metadata=getattr(event, "metadata", None)) + except Exception as e: + logger.warning("Failed to send first response before queued message: %s", e) + # else: interrupted — discard the interrupted response ("Operation + # interrupted." is just noise; the user already knows they sent a + # new message). + + # Process the pending message with updated history updated_history = result.get("messages", history) return await self._run_agent( message=pending, diff --git a/tests/gateway/test_queue_consumption.py b/tests/gateway/test_queue_consumption.py new file mode 100644 index 0000000000..2a4dd4ff02 --- /dev/null +++ b/tests/gateway/test_queue_consumption.py @@ -0,0 +1,165 @@ +"""Tests for /queue message consumption after normal agent completion. + +Verifies that messages queued via /queue (which store in +adapter._pending_messages WITHOUT triggering an interrupt) are consumed +after the agent finishes its current task — not silently dropped. +""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from gateway.platforms.base import ( + BasePlatformAdapter, + MessageEvent, + MessageType, + PlatformConfig, + Platform, +) + + +# --------------------------------------------------------------------------- +# Minimal adapter for testing pending message storage +# --------------------------------------------------------------------------- + +class _StubAdapter(BasePlatformAdapter): + def __init__(self): + super().__init__(PlatformConfig(enabled=True, token="test"), Platform.TELEGRAM) + + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + self._mark_disconnected() + + async def send(self, chat_id, content, reply_to=None, metadata=None): + from gateway.platforms.base import SendResult + return SendResult(success=True, message_id="msg-1") + + async def get_chat_info(self, chat_id): + return {"id": chat_id, "type": "dm"} + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + +class TestQueueMessageStorage: + """Verify /queue stores messages correctly in adapter._pending_messages.""" + + def test_queue_stores_message_in_pending(self): + adapter = _StubAdapter() + session_key = "telegram:user:123" + event = MessageEvent( + text="do this next", + message_type=MessageType.TEXT, + source=MagicMock(chat_id="123", platform=Platform.TELEGRAM), + message_id="q1", + ) + adapter._pending_messages[session_key] = event + + assert session_key in adapter._pending_messages + assert adapter._pending_messages[session_key].text == "do this next" + + def test_get_pending_message_consumes_and_clears(self): + adapter = _StubAdapter() + session_key = "telegram:user:123" + event = MessageEvent( + text="queued prompt", + message_type=MessageType.TEXT, + source=MagicMock(chat_id="123", platform=Platform.TELEGRAM), + message_id="q2", + ) + adapter._pending_messages[session_key] = event + + retrieved = adapter.get_pending_message(session_key) + assert retrieved is not None + assert retrieved.text == "queued prompt" + # Should be consumed (cleared) + assert adapter.get_pending_message(session_key) is None + + def test_queue_does_not_set_interrupt_event(self): + """The whole point of /queue — no interrupt signal.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + # Simulate an active session (agent running) + adapter._active_sessions[session_key] = asyncio.Event() + + # Store a queued message (what /queue does) + event = MessageEvent( + text="queued", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="q3", + ) + adapter._pending_messages[session_key] = event + + # The interrupt event should NOT be set + assert not adapter._active_sessions[session_key].is_set() + assert not adapter.has_pending_interrupt(session_key) + + def test_regular_message_sets_interrupt_event(self): + """Contrast: regular messages DO trigger interrupt.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + adapter._active_sessions[session_key] = asyncio.Event() + + # Simulate regular message arrival (what handle_message does) + event = MessageEvent( + text="new message", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="m1", + ) + adapter._pending_messages[session_key] = event + adapter._active_sessions[session_key].set() # this is what handle_message does + + assert adapter.has_pending_interrupt(session_key) + + +class TestQueueConsumptionAfterCompletion: + """Verify that pending messages are consumed after normal completion.""" + + def test_pending_message_available_after_normal_completion(self): + """After agent finishes without interrupt, pending message should + still be retrievable from adapter._pending_messages.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + # Simulate: agent starts, /queue stores a message, agent finishes + adapter._active_sessions[session_key] = asyncio.Event() + event = MessageEvent( + text="process this after", + message_type=MessageType.TEXT, + source=MagicMock(), + message_id="q4", + ) + adapter._pending_messages[session_key] = event + + # Agent finishes (no interrupt) + del adapter._active_sessions[session_key] + + # The queued message should still be retrievable + retrieved = adapter.get_pending_message(session_key) + assert retrieved is not None + assert retrieved.text == "process this after" + + def test_multiple_queues_last_one_wins(self): + """If user /queue's multiple times, last message overwrites.""" + adapter = _StubAdapter() + session_key = "telegram:user:123" + + for text in ["first", "second", "third"]: + event = MessageEvent( + text=text, + message_type=MessageType.TEXT, + source=MagicMock(), + message_id=f"q-{text}", + ) + adapter._pending_messages[session_key] = event + + retrieved = adapter.get_pending_message(session_key) + assert retrieved.text == "third"