mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
When display.busy_input_mode is 'queue', the runner-level PRIORITY block
in _handle_message was still calling running_agent.interrupt() for every
text follow-up to an active session. The adapter-level busy handler
already honors queue mode (commit 9d147f7fd), but this runner-level path
was an unconditional interrupt regardless of config.
Adds a queue-mode branch that queues the follow-up via
_queue_or_replace_pending_event() and returns without interrupting.
Salvages the useful part of #12070 (@knockyai). The config fan-out to
per-platform extra was redundant — runner already loads busy_input_mode
directly via _load_busy_input_mode().
351 lines
13 KiB
Python
351 lines
13 KiB
Python
"""Tests for busy-session acknowledgment when user sends messages during active agent runs.

Verifies that users get an immediate status response instead of total silence
when the agent is working on a task. See PR fix for the @Lonely__MH report.
"""
|
|
import asyncio
|
|
import time
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# ---------------------------------------------------------------------------
# Minimal stubs so we can import gateway code without heavy deps
# ---------------------------------------------------------------------------
|
|
import sys, types
|
|
|
|
# Chat-type constants that gateway code reads from ``telegram.constants``.
_ct = MagicMock()
_ct.configure_mock(SUPERGROUP="supergroup", GROUP="group", PRIVATE="private")

# Bare module objects standing in for the real ``telegram`` package.
_tg = types.ModuleType("telegram")
_tg.constants = types.ModuleType("telegram.constants")
_tg.constants.ChatType = _ct

# ``setdefault`` only registers a stub when no module of that name is
# already present in ``sys.modules``.
sys.modules.setdefault("telegram", _tg)
sys.modules.setdefault("telegram.constants", _tg.constants)
sys.modules.setdefault("telegram.ext", types.ModuleType("telegram.ext"))
|
|
|
|
from gateway.platforms.base import (
|
|
BasePlatformAdapter,
|
|
MessageEvent,
|
|
MessageType,
|
|
SessionSource,
|
|
build_session_key,
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
def _make_event(text="hello", chat_id="123", platform_val="telegram"):
    """Return a text ``MessageEvent`` with a private-chat ``SessionSource``.

    Args:
        text: Message body placed on the event.
        chat_id: Chat identifier placed on the source.
        platform_val: Exposed as ``.value`` on the mocked platform object.

    The user id (``"user1"``) and message id (``"msg1"``) are fixed.
    """
    return MessageEvent(
        text=text,
        message_type=MessageType.TEXT,
        source=SessionSource(
            # The platform is a mock that only carries a ``value`` attribute.
            platform=MagicMock(value=platform_val),
            chat_id=chat_id,
            chat_type="private",
            user_id="user1",
        ),
        message_id="msg1",
    )
|
|
|
|
|
|
def _make_runner():
    """Return ``(runner, sentinel)`` for exercising busy-session handling.

    The runner is a ``GatewayRunner`` allocated with ``object.__new__`` so
    that ``__init__`` does not run; only the attributes assigned below exist
    on it. The second element is ``gateway.run._AGENT_PENDING_SENTINEL``.
    """
    from gateway.run import GatewayRunner, _AGENT_PENDING_SENTINEL

    runner = object.__new__(GatewayRunner)

    # Per-session bookkeeping, all starting out empty.
    runner._running_agents = {}
    runner._running_agents_ts = {}
    runner._pending_messages = {}
    runner._busy_ack_ts = {}
    runner.adapters = {}

    # Plain state.
    runner._draining = False
    runner.session_store = None

    # Mocked collaborators.
    runner.config = MagicMock()
    runner.hooks = MagicMock()
    runner.hooks.emit = AsyncMock()
    runner.pairing_store = MagicMock()
    runner.pairing_store.is_approved.return_value = True

    # Every source counts as authorized.
    runner._is_user_authorized = lambda _source: True

    return runner, _AGENT_PENDING_SENTINEL
|
|
|
|
|
|
def _make_adapter(platform_val="telegram"):
    """Return a ``MagicMock`` standing in for a platform adapter.

    The mock carries an empty ``_pending_messages`` dict, an ``AsyncMock`` as
    ``_send_with_retry``, a ``config`` mock whose ``extra`` is an empty dict,
    and a ``platform`` mock whose ``value`` is *platform_val*.
    """
    stub = MagicMock()
    stub.platform = MagicMock(value=platform_val)
    stub.config = MagicMock()
    stub.config.extra = {}
    stub._send_with_retry = AsyncMock()
    stub._pending_messages = {}
    return stub
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------
|
|
|
|
class TestBusySessionAck:
    """A message that arrives while the agent is running should be acknowledged."""

    @pytest.mark.asyncio
    async def test_handle_message_queue_mode_queues_without_interrupt(self):
        """In queue mode ``_handle_message`` queues a text follow-up without interrupting."""
        from gateway.run import GatewayRunner

        runner, _ = _make_runner()
        adapter = _make_adapter()

        event = _make_event(text="follow up in queue mode")
        session_key = build_session_key(event.source)

        active_agent = MagicMock()
        runner._busy_input_mode = "queue"
        runner._running_agents[session_key] = active_agent
        runner.adapters[event.source.platform] = adapter

        outcome = await GatewayRunner._handle_message(runner, event)

        assert outcome is None
        # The follow-up is parked on the adapter, not on the runner.
        assert session_key in adapter._pending_messages
        assert adapter._pending_messages[session_key] is event
        assert session_key not in runner._pending_messages
        active_agent.interrupt.assert_not_called()

    @pytest.mark.asyncio
    async def test_sends_ack_when_agent_running(self):
        """The first message sent during a busy session gets a status ack."""
        runner, _ = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="Are you working?")
        session_key = build_session_key(event.source)

        # Stand-in for an agent that is mid-run.
        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 21,
            "max_iterations": 60,
            "current_tool": "terminal",
            "last_activity_ts": time.time(),
            "last_activity_desc": "terminal",
            "seconds_since_activity": 1.0,
        }
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = time.time() - 600  # started 10 min ago
        runner.adapters[event.source.platform] = adapter

        handled = await runner._handle_active_session_busy_message(event, session_key)

        assert handled is True

        # Exactly one ack went out.
        adapter._send_with_retry.assert_called_once()
        sent = adapter._send_with_retry.call_args
        content = sent.kwargs.get("content", "")
        if not content and sent.args:
            # Content was passed positionally; search the rendered call instead.
            content = str(sent)
        assert "Interrupting" in content or "respond" in content
        assert "/stop" not in content  # no need — we ARE interrupting

        # The running agent was interrupted with the new message text.
        agent.interrupt.assert_called_once_with("Are you working?")

    @pytest.mark.asyncio
    async def test_queue_mode_suppresses_interrupt_and_updates_ack(self):
        """With busy_input_mode 'queue' the message is queued and the agent is not interrupted."""
        runner, _ = _make_runner()
        runner._busy_input_mode = "queue"
        adapter = _make_adapter()

        event = _make_event(text="Add this to queue")
        session_key = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        agent = MagicMock()
        runner._running_agents[session_key] = agent

        with patch("gateway.run.merge_pending_message_event"):
            await runner._handle_active_session_busy_message(event, session_key)

        # The agent must be left running.
        agent.interrupt.assert_not_called()

        # The ack uses the queue-specific wording.
        adapter._send_with_retry.assert_called_once()
        sent = adapter._send_with_retry.call_args
        content = sent.kwargs.get("content", "")
        assert "Queued for the next turn" in content
        assert "respond once the current task finishes" in content
        assert "Interrupting" not in content

    @pytest.mark.asyncio
    async def test_debounce_suppresses_rapid_acks(self):
        """A second message within 30s must not trigger another ack."""
        runner, _ = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        first_event = _make_event(text="hello?")
        # Share the source so both events resolve to the same platform mock.
        second_event = MessageEvent(
            text="still there?",
            message_type=MessageType.TEXT,
            source=first_event.source,
            message_id="msg2",
        )
        session_key = build_session_key(first_event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 5,
            "max_iterations": 60,
            "current_tool": None,
            "last_activity_ts": time.time(),
            "last_activity_desc": "api_call",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = time.time() - 60
        runner.adapters[first_event.source.platform] = adapter

        # First message: acknowledged.
        first_handled = await runner._handle_active_session_busy_message(
            first_event, session_key
        )
        assert first_handled is True
        assert adapter._send_with_retry.call_count == 1

        # Second message inside the cooldown: handled, but no further ack.
        second_handled = await runner._handle_active_session_busy_message(
            second_event, session_key
        )
        assert second_handled is True
        assert adapter._send_with_retry.call_count == 1  # still 1, no new ack

        # In interrupt mode both messages still interrupt the agent.
        assert agent.interrupt.call_count == 2

    @pytest.mark.asyncio
    async def test_ack_after_cooldown_expires(self):
        """Once the 30s cooldown has passed, a new message is acked again."""
        runner, _ = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hello?")
        session_key = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 10,
            "max_iterations": 60,
            "current_tool": "web_search",
            "last_activity_ts": time.time(),
            "last_activity_desc": "tool",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = time.time() - 120
        runner.adapters[event.source.platform] = adapter

        # Initial ack.
        await runner._handle_active_session_busy_message(event, session_key)
        assert adapter._send_with_retry.call_count == 1

        # Backdate the recorded ack time to simulate an expired cooldown.
        runner._busy_ack_ts[session_key] = time.time() - 31

        # The next message is acknowledged again.
        await runner._handle_active_session_busy_message(event, session_key)
        assert adapter._send_with_retry.call_count == 2

    @pytest.mark.asyncio
    async def test_includes_status_detail(self):
        """The ack text carries iteration, current-tool and elapsed-time details."""
        runner, _ = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="yo")
        session_key = build_session_key(event.source)

        agent = MagicMock()
        agent.get_activity_summary.return_value = {
            "api_call_count": 21,
            "max_iterations": 60,
            "current_tool": "terminal",
            "last_activity_ts": time.time(),
            "last_activity_desc": "terminal",
            "seconds_since_activity": 0.5,
        }
        runner._running_agents[session_key] = agent
        runner._running_agents_ts[session_key] = time.time() - 600  # 10 min
        runner.adapters[event.source.platform] = adapter

        await runner._handle_active_session_busy_message(event, session_key)

        sent = adapter._send_with_retry.call_args
        content = sent.kwargs.get("content", "")
        assert "21/60" in content  # iteration
        assert "terminal" in content  # current tool
        assert "10 min" in content  # elapsed

    @pytest.mark.asyncio
    async def test_draining_still_works(self):
        """While draining, the ack is the drain-specific message."""
        runner, _ = _make_runner()
        runner._draining = True
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hello")
        session_key = build_session_key(event.source)
        runner.adapters[event.source.platform] = adapter

        # Replace the drain-related methods with fixed answers.
        runner._queue_during_drain_enabled = lambda: False
        runner._status_action_gerund = lambda: "restarting"

        handled = await runner._handle_active_session_busy_message(event, session_key)
        assert handled is True

        sent = adapter._send_with_retry.call_args
        content = sent.kwargs.get("content", "")
        assert "restarting" in content

    @pytest.mark.asyncio
    async def test_pending_sentinel_no_interrupt(self):
        """A session holding the pending sentinel is still handled and acked.

        Per the original test note, the sentinel has no ``interrupt`` method,
        so the handler must not try to call it.
        """
        runner, sentinel = _make_runner()
        runner._busy_input_mode = "interrupt"
        adapter = _make_adapter()

        event = _make_event(text="hey")
        session_key = build_session_key(event.source)

        runner._running_agents[session_key] = sentinel
        runner._running_agents_ts[session_key] = time.time()
        runner.adapters[event.source.platform] = adapter

        handled = await runner._handle_active_session_busy_message(event, session_key)
        assert handled is True
        # The ack is still sent.
        adapter._send_with_retry.assert_called_once()

    @pytest.mark.asyncio
    async def test_no_adapter_falls_through(self):
        """Without a registered adapter the handler returns False."""
        runner, _ = _make_runner()

        event = _make_event(text="hello")
        session_key = build_session_key(event.source)

        # Deliberately leave ``runner.adapters`` empty.
        runner._running_agents[session_key] = MagicMock()

        handled = await runner._handle_active_session_busy_message(event, session_key)
        assert handled is False  # not handled, let default path try
|