mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): enhance message handling during agent tasks with queue mode support
This commit is contained in:
parent
692ae6dd07
commit
9d147f7fde
2 changed files with 54 additions and 21 deletions
|
|
@ -1551,27 +1551,23 @@ class GatewayRunner:
|
||||||
)
|
)
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# --- Normal busy case (agent actively running a task) ---
|
# Normal busy case (agent actively running a task)
|
||||||
# The user sent a message while the agent is working. Interrupt the
|
|
||||||
# agent immediately so it stops the current tool-calling loop and
|
|
||||||
# processes the new message. The pending message is stored in the
|
|
||||||
# adapter so the base adapter picks it up once the interrupted run
|
|
||||||
# returns. A brief ack tells the user what's happening (debounced
|
|
||||||
# to avoid spam when they fire multiple messages quickly).
|
|
||||||
|
|
||||||
adapter = self.adapters.get(event.source.platform)
|
adapter = self.adapters.get(event.source.platform)
|
||||||
if not adapter:
|
if not adapter:
|
||||||
return False # let default path handle it
|
return False # let default path handle it
|
||||||
|
|
||||||
# Store the message so it's processed as the next turn after the
|
# Store the message so it's processed as the next turn after the
|
||||||
# interrupt causes the current run to exit.
|
# current run finishes (or is interrupted).
|
||||||
from gateway.platforms.base import merge_pending_message_event
|
from gateway.platforms.base import merge_pending_message_event
|
||||||
merge_pending_message_event(adapter._pending_messages, session_key, event)
|
merge_pending_message_event(adapter._pending_messages, session_key, event)
|
||||||
|
|
||||||
# Interrupt the running agent — this aborts in-flight tool calls and
|
is_queue_mode = self._busy_input_mode == "queue"
|
||||||
# causes the agent loop to exit at the next check point.
|
|
||||||
|
# If not in queue mode, interrupt the running agent immediately.
|
||||||
|
# This aborts in-flight tool calls and causes the agent loop to exit
|
||||||
|
# at the next check point.
|
||||||
running_agent = self._running_agents.get(session_key)
|
running_agent = self._running_agents.get(session_key)
|
||||||
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
if not is_queue_mode and running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
||||||
try:
|
try:
|
||||||
running_agent.interrupt(event.text)
|
running_agent.interrupt(event.text)
|
||||||
except Exception:
|
except Exception:
|
||||||
|
|
@ -1583,7 +1579,7 @@ class GatewayRunner:
|
||||||
now = time.time()
|
now = time.time()
|
||||||
last_ack = self._busy_ack_ts.get(session_key, 0)
|
last_ack = self._busy_ack_ts.get(session_key, 0)
|
||||||
if now - last_ack < _BUSY_ACK_COOLDOWN:
|
if now - last_ack < _BUSY_ACK_COOLDOWN:
|
||||||
return True # interrupt sent, ack already delivered recently
|
return True # interrupt sent (if not queue), ack already delivered recently
|
||||||
|
|
||||||
self._busy_ack_ts[session_key] = now
|
self._busy_ack_ts[session_key] = now
|
||||||
|
|
||||||
|
|
@ -1608,10 +1604,16 @@ class GatewayRunner:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
status_detail = f" ({', '.join(status_parts)})" if status_parts else ""
|
status_detail = f" ({', '.join(status_parts)})" if status_parts else ""
|
||||||
message = (
|
if is_queue_mode:
|
||||||
f"⚡ Interrupting current task{status_detail}. "
|
message = (
|
||||||
f"I'll respond to your message shortly."
|
f"⏳ Queued for the next turn{status_detail}. "
|
||||||
)
|
f"I'll respond once the current task finishes."
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
message = (
|
||||||
|
f"⚡ Interrupting current task{status_detail}. "
|
||||||
|
f"I'll respond to your message shortly."
|
||||||
|
)
|
||||||
|
|
||||||
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
|
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
|
|
@ -95,6 +95,7 @@ class TestBusySessionAck:
|
||||||
async def test_sends_ack_when_agent_running(self):
|
async def test_sends_ack_when_agent_running(self):
|
||||||
"""First message during busy session should get a status ack."""
|
"""First message during busy session should get a status ack."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event = _make_event(text="Are you working?")
|
event = _make_event(text="Are you working?")
|
||||||
|
|
@ -127,16 +128,42 @@ class TestBusySessionAck:
|
||||||
assert "Interrupting" in content or "respond" in content
|
assert "Interrupting" in content or "respond" in content
|
||||||
assert "/stop" not in content # no need — we ARE interrupting
|
assert "/stop" not in content # no need — we ARE interrupting
|
||||||
|
|
||||||
# Verify message was queued in adapter pending
|
|
||||||
assert sk in adapter._pending_messages
|
|
||||||
|
|
||||||
# Verify agent interrupt was called
|
# Verify agent interrupt was called
|
||||||
agent.interrupt.assert_called_once_with("Are you working?")
|
agent.interrupt.assert_called_once_with("Are you working?")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
|
||||||
|
"""When busy_input_mode is 'queue', message is queued WITHOUT interrupt."""
|
||||||
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "queue"
|
||||||
|
adapter = _make_adapter()
|
||||||
|
|
||||||
|
event = _make_event(text="Add this to queue")
|
||||||
|
sk = build_session_key(event.source)
|
||||||
|
runner.adapters[event.source.platform] = adapter
|
||||||
|
|
||||||
|
agent = MagicMock()
|
||||||
|
runner._running_agents[sk] = agent
|
||||||
|
|
||||||
|
with patch("gateway.run.merge_pending_message_event"):
|
||||||
|
await runner._handle_active_session_busy_message(event, sk)
|
||||||
|
|
||||||
|
# VERIFY: Agent was NOT interrupted
|
||||||
|
agent.interrupt.assert_not_called()
|
||||||
|
|
||||||
|
# VERIFY: Ack sent with queue-specific wording
|
||||||
|
adapter._send_with_retry.assert_called_once()
|
||||||
|
call_kwargs = adapter._send_with_retry.call_args
|
||||||
|
content = call_kwargs.kwargs.get("content") or call_kwargs[1].get("content", "")
|
||||||
|
assert "Queued for the next turn" in content
|
||||||
|
assert "respond once the current task finishes" in content
|
||||||
|
assert "Interrupting" not in content
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_debounce_suppresses_rapid_acks(self):
|
async def test_debounce_suppresses_rapid_acks(self):
|
||||||
"""Second message within 30s should NOT send another ack."""
|
"""Second message within 30s should NOT send another ack."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event1 = _make_event(text="hello?")
|
event1 = _make_event(text="hello?")
|
||||||
|
|
@ -172,13 +199,14 @@ class TestBusySessionAck:
|
||||||
assert result2 is True
|
assert result2 is True
|
||||||
assert adapter._send_with_retry.call_count == 1 # still 1, no new ack
|
assert adapter._send_with_retry.call_count == 1 # still 1, no new ack
|
||||||
|
|
||||||
# But interrupt should still be called for both
|
# But interrupt should still be called for both (since we are in interrupt mode)
|
||||||
assert agent.interrupt.call_count == 2
|
assert agent.interrupt.call_count == 2
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_ack_after_cooldown_expires(self):
|
async def test_ack_after_cooldown_expires(self):
|
||||||
"""After 30s cooldown, a new message should send a fresh ack."""
|
"""After 30s cooldown, a new message should send a fresh ack."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event = _make_event(text="hello?")
|
event = _make_event(text="hello?")
|
||||||
|
|
@ -212,6 +240,7 @@ class TestBusySessionAck:
|
||||||
async def test_includes_status_detail(self):
|
async def test_includes_status_detail(self):
|
||||||
"""Ack message should include iteration and tool info when available."""
|
"""Ack message should include iteration and tool info when available."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event = _make_event(text="yo")
|
event = _make_event(text="yo")
|
||||||
|
|
@ -243,6 +272,7 @@ class TestBusySessionAck:
|
||||||
"""Draining case should still produce the drain-specific message."""
|
"""Draining case should still produce the drain-specific message."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
runner._draining = True
|
runner._draining = True
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event = _make_event(text="hello")
|
event = _make_event(text="hello")
|
||||||
|
|
@ -264,6 +294,7 @@ class TestBusySessionAck:
|
||||||
async def test_pending_sentinel_no_interrupt(self):
|
async def test_pending_sentinel_no_interrupt(self):
|
||||||
"""When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
|
"""When agent is PENDING_SENTINEL, don't call interrupt (it has no method)."""
|
||||||
runner, sentinel = _make_runner()
|
runner, sentinel = _make_runner()
|
||||||
|
runner._busy_input_mode = "interrupt"
|
||||||
adapter = _make_adapter()
|
adapter = _make_adapter()
|
||||||
|
|
||||||
event = _make_event(text="hey")
|
event = _make_event(text="hey")
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue