diff --git a/gateway/run.py b/gateway/run.py index 881f77cb7..db3f8b00d 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1551,27 +1551,23 @@ class GatewayRunner: ) return True - # --- Normal busy case (agent actively running a task) --- - # The user sent a message while the agent is working. Interrupt the - # agent immediately so it stops the current tool-calling loop and - # processes the new message. The pending message is stored in the - # adapter so the base adapter picks it up once the interrupted run - # returns. A brief ack tells the user what's happening (debounced - # to avoid spam when they fire multiple messages quickly). - + # Normal busy case (agent actively running a task) adapter = self.adapters.get(event.source.platform) if not adapter: return False # let default path handle it # Store the message so it's processed as the next turn after the - # interrupt causes the current run to exit. + # current run finishes (or is interrupted). from gateway.platforms.base import merge_pending_message_event merge_pending_message_event(adapter._pending_messages, session_key, event) - # Interrupt the running agent — this aborts in-flight tool calls and - # causes the agent loop to exit at the next check point. + is_queue_mode = self._busy_input_mode == "queue" + + # If not in queue mode, interrupt the running agent immediately. + # This aborts in-flight tool calls and causes the agent loop to exit + # at the next check point. running_agent = self._running_agents.get(session_key) - if running_agent and running_agent is not _AGENT_PENDING_SENTINEL: + if not is_queue_mode and running_agent and running_agent is not _AGENT_PENDING_SENTINEL: try: running_agent.interrupt(event.text) except Exception: @@ -1583,7 +1579,7 @@ class GatewayRunner: now = time.time() last_ack = self._busy_ack_ts.get(session_key, 0) if now - last_ack < _BUSY_ACK_COOLDOWN: - return True # interrupt sent, ack already delivered recently + return True # interrupt sent (if not queue), ack already delivered recently self._busy_ack_ts[session_key] = now @@ -1608,10 +1604,16 @@ class GatewayRunner: pass status_detail = f" ({', '.join(status_parts)})" if status_parts else "" - message = ( - f"⚡ Interrupting current task{status_detail}. " - f"I'll respond to your message shortly." - ) + if is_queue_mode: + message = ( + f"⏳ Queued for the next turn{status_detail}. " + f"I'll respond once the current task finishes." + ) + else: + message = ( + f"⚡ Interrupting current task{status_detail}. " + f"I'll respond to your message shortly." + ) thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None try: diff --git a/tests/gateway/test_busy_session_ack.py b/tests/gateway/test_busy_session_ack.py index 07fe5fa27..52d4c23df 100644 --- a/tests/gateway/test_busy_session_ack.py +++ b/tests/gateway/test_busy_session_ack.py @@ -95,6 +95,7 @@ class TestBusySessionAck: async def test_sends_ack_when_agent_running(self): """First message during busy session should get a status ack.""" runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event = _make_event(text="Are you working?") @@ -127,16 +128,42 @@ class TestBusySessionAck: assert "Interrupting" in content or "respond" in content assert "/stop" not in content # no need — we ARE interrupting - # Verify message was queued in adapter pending - assert sk in adapter._pending_messages - # Verify agent interrupt was called agent.interrupt.assert_called_once_with("Are you working?") + @pytest.mark.asyncio + async def test_queue_mode_suppresses_interrupt_and_updates_ack(self): + """When busy_input_mode is 'queue', message is queued WITHOUT interrupt.""" + runner, sentinel = _make_runner() + runner._busy_input_mode = "queue" + adapter = _make_adapter() + + event = _make_event(text="Add this to queue") + sk = build_session_key(event.source) + runner.adapters[event.source.platform] = adapter + + agent = MagicMock() + runner._running_agents[sk] = agent + + with patch("gateway.run.merge_pending_message_event"): + await runner._handle_active_session_busy_message(event, sk) + + # VERIFY: Agent was NOT interrupted + agent.interrupt.assert_not_called() + + # VERIFY: Ack sent with queue-specific wording + adapter._send_with_retry.assert_called_once() + call_kwargs = adapter._send_with_retry.call_args + content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "") + assert "Queued for the next turn" in content + assert "respond once the current task finishes" in content + assert "Interrupting" not in content + @pytest.mark.asyncio async def test_debounce_suppresses_rapid_acks(self): """Second message within 30s should NOT send another ack.""" runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event1 = _make_event(text="hello?") @@ -172,13 +199,14 @@ class TestBusySessionAck: assert result2 is True assert adapter._send_with_retry.call_count == 1 # still 1, no new ack - # But interrupt should still be called for both + # But interrupt should still be called for both (since we are in interrupt mode) assert agent.interrupt.call_count == 2 @pytest.mark.asyncio async def test_ack_after_cooldown_expires(self): """After 30s cooldown, a new message should send a fresh ack.""" runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event = _make_event(text="hello?") @@ -212,6 +240,7 @@ class TestBusySessionAck: async def test_includes_status_detail(self): """Ack message should include iteration and tool info when available.""" runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event = _make_event(text="yo") @@ -243,6 +272,7 @@ class TestBusySessionAck: """Draining case should still produce the drain-specific message.""" runner, sentinel = _make_runner() runner._draining = True + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event = _make_event(text="hello") @@ -264,6 +294,7 @@ class TestBusySessionAck: async def test_pending_sentinel_no_interrupt(self): """When agent is PENDING_SENTINEL, don't call interrupt (it has no method).""" runner, sentinel = _make_runner() + runner._busy_input_mode = "interrupt" adapter = _make_adapter() event = _make_event(text="hey")