mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): enhance message handling during agent tasks with queue mode support
This commit is contained in:
parent
692ae6dd07
commit
9d147f7fde
2 changed files with 54 additions and 21 deletions
|
|
@ -1551,27 +1551,23 @@ class GatewayRunner:
|
|||
)
|
||||
return True
|
||||
|
||||
# --- Normal busy case (agent actively running a task) ---
|
||||
# The user sent a message while the agent is working. Interrupt the
|
||||
# agent immediately so it stops the current tool-calling loop and
|
||||
# processes the new message. The pending message is stored in the
|
||||
# adapter so the base adapter picks it up once the interrupted run
|
||||
# returns. A brief ack tells the user what's happening (debounced
|
||||
# to avoid spam when they fire multiple messages quickly).
|
||||
|
||||
# Normal busy case (agent actively running a task)
|
||||
adapter = self.adapters.get(event.source.platform)
|
||||
if not adapter:
|
||||
return False # let default path handle it
|
||||
|
||||
# Store the message so it's processed as the next turn after the
|
||||
# interrupt causes the current run to exit.
|
||||
# current run finishes (or is interrupted).
|
||||
from gateway.platforms.base import merge_pending_message_event
|
||||
merge_pending_message_event(adapter._pending_messages, session_key, event)
|
||||
|
||||
# Interrupt the running agent — this aborts in-flight tool calls and
|
||||
# causes the agent loop to exit at the next check point.
|
||||
is_queue_mode = self._busy_input_mode == "queue"
|
||||
|
||||
# If not in queue mode, interrupt the running agent immediately.
|
||||
# This aborts in-flight tool calls and causes the agent loop to exit
|
||||
# at the next check point.
|
||||
running_agent = self._running_agents.get(session_key)
|
||||
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
||||
if not is_queue_mode and running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
||||
try:
|
||||
running_agent.interrupt(event.text)
|
||||
except Exception:
|
||||
|
|
@ -1583,7 +1579,7 @@ class GatewayRunner:
|
|||
now = time.time()
|
||||
last_ack = self._busy_ack_ts.get(session_key, 0)
|
||||
if now - last_ack < _BUSY_ACK_COOLDOWN:
|
||||
return True # interrupt sent, ack already delivered recently
|
||||
return True # interrupt sent (if not queue), ack already delivered recently
|
||||
|
||||
self._busy_ack_ts[session_key] = now
|
||||
|
||||
|
|
@ -1608,10 +1604,16 @@ class GatewayRunner:
|
|||
pass
|
||||
|
||||
status_detail = f" ({', '.join(status_parts)})" if status_parts else ""
|
||||
message = (
|
||||
f"⚡ Interrupting current task{status_detail}. "
|
||||
f"I'll respond to your message shortly."
|
||||
)
|
||||
if is_queue_mode:
|
||||
message = (
|
||||
f"⏳ Queued for the next turn{status_detail}. "
|
||||
f"I'll respond once the current task finishes."
|
||||
)
|
||||
else:
|
||||
message = (
|
||||
f"⚡ Interrupting current task{status_detail}. "
|
||||
f"I'll respond to your message shortly."
|
||||
)
|
||||
|
||||
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -95,6 +95,7 @@ class TestBusySessionAck:
|
|||
async def test_sends_ack_when_agent_running(self):
|
||||
"""First message during busy session should get a status ack."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="Are you working?")
|
||||
|
|
@ -127,16 +128,42 @@ class TestBusySessionAck:
|
|||
assert "Interrupting" in content or "respond" in content
|
||||
assert "/stop" not in content # no need — we ARE interrupting
|
||||
|
||||
# Verify message was queued in adapter pending
|
||||
assert sk in adapter._pending_messages
|
||||
|
||||
# Verify agent interrupt was called
|
||||
agent.interrupt.assert_called_once_with("Are you working?")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
|
||||
"""When busy_input_mode is 'queue', message is queued WITHOUT interrupt."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "queue"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="Add this to queue")
|
||||
sk = build_session_key(event.source)
|
||||
runner.adapters[event.source.platform] = adapter
|
||||
|
||||
agent = MagicMock()
|
||||
runner._running_agents[sk] = agent
|
||||
|
||||
with patch("gateway.run.merge_pending_message_event"):
|
||||
await runner._handle_active_session_busy_message(event, sk)
|
||||
|
||||
# VERIFY: Agent was NOT interrupted
|
||||
agent.interrupt.assert_not_called()
|
||||
|
||||
# VERIFY: Ack sent with queue-specific wording
|
||||
adapter._send_with_retry.assert_called_once()
|
||||
call_kwargs = adapter._send_with_retry.call_args
|
||||
content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
|
||||
assert "Queued for the next turn" in content
|
||||
assert "respond once the current task finishes" in content
|
||||
assert "Interrupting" not in content
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_debounce_suppresses_rapid_acks(self):
|
||||
"""Second message within 30s should NOT send another ack."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event1 = _make_event(text="hello?")
|
||||
|
|
@ -172,13 +199,14 @@ class TestBusySessionAck:
|
|||
assert result2 is True
|
||||
assert adapter._send_with_retry.call_count == 1 # still 1, no new ack
|
||||
|
||||
# But interrupt should still be called for both
|
||||
# But interrupt should still be called for both (since we are in interrupt mode)
|
||||
assert agent.interrupt.call_count == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_ack_after_cooldown_expires(self):
|
||||
"""After 30s cooldown, a new message should send a fresh ack."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="hello?")
|
||||
|
|
@ -212,6 +240,7 @@ class TestBusySessionAck:
|
|||
async def test_includes_status_detail(self):
|
||||
"""Ack message should include iteration and tool info when available."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="yo")
|
||||
|
|
@ -243,6 +272,7 @@ class TestBusySessionAck:
|
|||
"""Draining case should still produce the drain-specific message."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._draining = True
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="hello")
|
||||
|
|
@ -264,6 +294,7 @@ class TestBusySessionAck:
|
|||
async def test_pending_sentinel_no_interrupt(self):
|
||||
"""When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
|
||||
runner, sentinel = _make_runner()
|
||||
runner._busy_input_mode = "interrupt"
|
||||
adapter = _make_adapter()
|
||||
|
||||
event = _make_event(text="hey")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue