fix(gateway): enhance message handling during agent tasks with queue mode support

This commit is contained in:
whitehatjr1001 2026-04-22 12:23:32 +05:30 committed by Teknium
parent 692ae6dd07
commit 9d147f7fde
2 changed files with 54 additions and 21 deletions

View file

@ -1551,27 +1551,23 @@ class GatewayRunner:
) )
return True return True
# --- Normal busy case (agent actively running a task) --- # Normal busy case (agent actively running a task)
# The user sent a message while the agent is working. Interrupt the
# agent immediately so it stops the current tool-calling loop and
# processes the new message. The pending message is stored in the
# adapter so the base adapter picks it up once the interrupted run
# returns. A brief ack tells the user what's happening (debounced
# to avoid spam when they fire multiple messages quickly).
adapter = self.adapters.get(event.source.platform) adapter = self.adapters.get(event.source.platform)
if not adapter: if not adapter:
return False # let default path handle it return False # let default path handle it
# Store the message so it's processed as the next turn after the # Store the message so it's processed as the next turn after the
# interrupt causes the current run to exit. # current run finishes (or is interrupted).
from gateway.platforms.base import merge_pending_message_event from gateway.platforms.base import merge_pending_message_event
merge_pending_message_event(adapter._pending_messages, session_key, event) merge_pending_message_event(adapter._pending_messages, session_key, event)
# Interrupt the running agent — this aborts in-flight tool calls and is_queue_mode = self._busy_input_mode == "queue"
# causes the agent loop to exit at the next check point.
# If not in queue mode, interrupt the running agent immediately.
# This aborts in-flight tool calls and causes the agent loop to exit
# at the next check point.
running_agent = self._running_agents.get(session_key) running_agent = self._running_agents.get(session_key)
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL: if not is_queue_mode and running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
try: try:
running_agent.interrupt(event.text) running_agent.interrupt(event.text)
except Exception: except Exception:
@ -1583,7 +1579,7 @@ class GatewayRunner:
now = time.time() now = time.time()
last_ack = self._busy_ack_ts.get(session_key, 0) last_ack = self._busy_ack_ts.get(session_key, 0)
if now - last_ack < _BUSY_ACK_COOLDOWN: if now - last_ack < _BUSY_ACK_COOLDOWN:
return True # interrupt sent, ack already delivered recently return True # interrupt sent (if not queue), ack already delivered recently
self._busy_ack_ts[session_key] = now self._busy_ack_ts[session_key] = now
@ -1608,6 +1604,12 @@ class GatewayRunner:
pass pass
status_detail = f" ({', '.join(status_parts)})" if status_parts else "" status_detail = f" ({', '.join(status_parts)})" if status_parts else ""
if is_queue_mode:
message = (
f"⏳ Queued for the next turn{status_detail}. "
f"I'll respond once the current task finishes."
)
else:
message = ( message = (
f"⚡ Interrupting current task{status_detail}. " f"⚡ Interrupting current task{status_detail}. "
f"I'll respond to your message shortly." f"I'll respond to your message shortly."

View file

@ -95,6 +95,7 @@ class TestBusySessionAck:
async def test_sends_ack_when_agent_running(self): async def test_sends_ack_when_agent_running(self):
"""First message during busy session should get a status ack.""" """First message during busy session should get a status ack."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event(text="Are you working?") event = _make_event(text="Are you working?")
@ -127,16 +128,42 @@ class TestBusySessionAck:
assert "Interrupting" in content or "respond" in content assert "Interrupting" in content or "respond" in content
assert "/stop" not in content # no need — we ARE interrupting assert "/stop" not in content # no need — we ARE interrupting
# Verify message was queued in adapter pending
assert sk in adapter._pending_messages
# Verify agent interrupt was called # Verify agent interrupt was called
agent.interrupt.assert_called_once_with("Are you working?") agent.interrupt.assert_called_once_with("Are you working?")
@pytest.mark.asyncio
async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
"""When busy_input_mode is 'queue', message is queued WITHOUT interrupt."""
runner, sentinel = _make_runner()
runner._busy_input_mode = "queue"
adapter = _make_adapter()
event = _make_event(text="Add this to queue")
sk = build_session_key(event.source)
runner.adapters[event.source.platform] = adapter
agent = MagicMock()
runner._running_agents[sk] = agent
with patch("gateway.run.merge_pending_message_event"):
await runner._handle_active_session_busy_message(event, sk)
# VERIFY: Agent was NOT interrupted
agent.interrupt.assert_not_called()
# VERIFY: Ack sent with queue-specific wording
adapter._send_with_retry.assert_called_once()
call_kwargs = adapter._send_with_retry.call_args
content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
assert "Queued for the next turn" in content
assert "respond once the current task finishes" in content
assert "Interrupting" not in content
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_debounce_suppresses_rapid_acks(self): async def test_debounce_suppresses_rapid_acks(self):
"""Second message within 30s should NOT send another ack.""" """Second message within 30s should NOT send another ack."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event1 = _make_event(text="hello?") event1 = _make_event(text="hello?")
@ -172,13 +199,14 @@ class TestBusySessionAck:
assert result2 is True assert result2 is True
assert adapter._send_with_retry.call_count == 1 # still 1, no new ack assert adapter._send_with_retry.call_count == 1 # still 1, no new ack
# But interrupt should still be called for both # But interrupt should still be called for both (since we are in interrupt mode)
assert agent.interrupt.call_count == 2 assert agent.interrupt.call_count == 2
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_ack_after_cooldown_expires(self): async def test_ack_after_cooldown_expires(self):
"""After 30s cooldown, a new message should send a fresh ack.""" """After 30s cooldown, a new message should send a fresh ack."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event(text="hello?") event = _make_event(text="hello?")
@ -212,6 +240,7 @@ class TestBusySessionAck:
async def test_includes_status_detail(self): async def test_includes_status_detail(self):
"""Ack message should include iteration and tool info when available.""" """Ack message should include iteration and tool info when available."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event(text="yo") event = _make_event(text="yo")
@ -243,6 +272,7 @@ class TestBusySessionAck:
"""Draining case should still produce the drain-specific message.""" """Draining case should still produce the drain-specific message."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._draining = True runner._draining = True
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event(text="hello") event = _make_event(text="hello")
@ -264,6 +294,7 @@ class TestBusySessionAck:
async def test_pending_sentinel_no_interrupt(self): async def test_pending_sentinel_no_interrupt(self):
"""When agent is PENDING_SENTINEL, don't call interrupt (it has no method).""" """When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
runner, sentinel = _make_runner() runner, sentinel = _make_runner()
runner._busy_input_mode = "interrupt"
adapter = _make_adapter() adapter = _make_adapter()
event = _make_event(text="hey") event = _make_event(text="hey")