mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
fix(gateway): recover from hung agents — /stop force-unlocks session (#3104)
When an agent thread hangs (truly blocked, never checks _interrupt_requested), /stop now force-cleans _running_agents to unlock the session immediately. Two changes: - Early /stop intercept in the running-agent guard: bypasses normal command dispatch to force-interrupt and unlock the session. Follows the same pattern as the existing /new intercept. - Sentinel /stop: force-cleans the sentinel instead of returning 'nothing to stop yet', so /stop during slow startup actually unlocks the session. Follow-up improvements over original PR: - Consolidated duplicate resolve_command imports into single early resolution - Updated _handle_stop_command to also force-clean for consistency - Removed 10-minute hard timeout on the executor (would kill legitimate long-running agent tasks; the /stop force-clean handles recovery) Cherry-picked from Mibayy's PR #2498. Co-authored-by: Mibayy <Mibayy@users.noreply.github.com>
This commit is contained in:
parent
f46542b6c6
commit
59575d6a91
2 changed files with 125 additions and 14 deletions
|
|
@ -1556,6 +1556,30 @@ class GatewayRunner:
|
|||
if event.get_command() == "status":
|
||||
return await self._handle_status_command(event)
|
||||
|
||||
# Resolve the command once for all early-intercept checks below.
|
||||
from hermes_cli.commands import resolve_command as _resolve_cmd_inner
|
||||
_evt_cmd = event.get_command()
|
||||
_cmd_def_inner = _resolve_cmd_inner(_evt_cmd) if _evt_cmd else None
|
||||
|
||||
# /stop must hard-kill the session when an agent is running.
|
||||
# A soft interrupt (agent.interrupt()) doesn't help when the agent
|
||||
# is truly hung — the executor thread is blocked and never checks
|
||||
# _interrupt_requested. Force-clean _running_agents so the session
|
||||
# is unlocked and subsequent messages are processed normally.
|
||||
if _cmd_def_inner and _cmd_def_inner.name == "stop":
|
||||
running_agent = self._running_agents.get(_quick_key)
|
||||
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
||||
running_agent.interrupt("Stop requested")
|
||||
# Force-clean: remove the session lock regardless of agent state
|
||||
adapter = self.adapters.get(source.platform)
|
||||
if adapter and hasattr(adapter, 'get_pending_message'):
|
||||
adapter.get_pending_message(_quick_key) # consume and discard
|
||||
self._pending_messages.pop(_quick_key, None)
|
||||
if _quick_key in self._running_agents:
|
||||
del self._running_agents[_quick_key]
|
||||
logger.info("HARD STOP for session %s — session lock released", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
|
||||
# /reset and /new must bypass the running-agent guard so they
|
||||
# actually dispatch as commands instead of being queued as user
|
||||
# text (which would be fed back to the agent with the same
|
||||
|
|
@ -1563,9 +1587,6 @@ class GatewayRunner:
|
|||
# clear the adapter's pending queue so the stale "/reset" text
|
||||
# doesn't get re-processed as a user message after the
|
||||
# interrupt completes.
|
||||
from hermes_cli.commands import resolve_command as _resolve_cmd_inner
|
||||
_evt_cmd = event.get_command()
|
||||
_cmd_def_inner = _resolve_cmd_inner(_evt_cmd) if _evt_cmd else None
|
||||
if _cmd_def_inner and _cmd_def_inner.name == "new":
|
||||
running_agent = self._running_agents.get(_quick_key)
|
||||
if running_agent and running_agent is not _AGENT_PENDING_SENTINEL:
|
||||
|
|
@ -1623,8 +1644,11 @@ class GatewayRunner:
|
|||
if running_agent is _AGENT_PENDING_SENTINEL:
|
||||
# Agent is being set up but not ready yet.
|
||||
if event.get_command() == "stop":
|
||||
# Nothing to interrupt — agent hasn't started yet.
|
||||
return "⏳ The agent is still starting up — nothing to stop yet."
|
||||
# Force-clean the sentinel so the session is unlocked.
|
||||
if _quick_key in self._running_agents:
|
||||
del self._running_agents[_quick_key]
|
||||
logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — session unlocked."
|
||||
# Queue the message so it will be picked up after the
|
||||
# agent starts.
|
||||
adapter = self.adapters.get(source.platform)
|
||||
|
|
@ -2729,17 +2753,32 @@ class GatewayRunner:
|
|||
return "\n".join(lines)
|
||||
|
||||
async def _handle_stop_command(self, event: MessageEvent) -> str:
|
||||
"""Handle /stop command - interrupt a running agent."""
|
||||
"""Handle /stop command - interrupt a running agent.
|
||||
|
||||
When an agent is truly hung (blocked thread that never checks
|
||||
_interrupt_requested), the early intercept in _handle_message()
|
||||
handles /stop before this method is reached. This handler fires
|
||||
only through normal command dispatch (no running agent) or as a
|
||||
fallback. Force-clean the session lock in all cases for safety.
|
||||
"""
|
||||
source = event.source
|
||||
session_entry = self.session_store.get_or_create_session(source)
|
||||
session_key = session_entry.session_key
|
||||
|
||||
agent = self._running_agents.get(session_key)
|
||||
if agent is _AGENT_PENDING_SENTINEL:
|
||||
return "⏳ The agent is still starting up — nothing to stop yet."
|
||||
# Force-clean the sentinel so the session is unlocked.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20])
|
||||
return "⚡ Force-stopped. The agent was still starting — session unlocked."
|
||||
if agent:
|
||||
agent.interrupt()
|
||||
return "⚡ Stopping the current task... The agent will finish its current step and respond."
|
||||
agent.interrupt("Stop requested")
|
||||
# Force-clean the session lock so a truly hung agent doesn't
|
||||
# keep it locked forever.
|
||||
if session_key in self._running_agents:
|
||||
del self._running_agents[session_key]
|
||||
return "⚡ Force-stopped. The session is unlocked — you can send a new message."
|
||||
else:
|
||||
return "No active task to stop."
|
||||
|
||||
|
|
|
|||
|
|
@ -198,12 +198,12 @@ async def test_command_messages_do_not_leave_sentinel():
|
|||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Test 6: /stop during sentinel returns helpful message
|
||||
# Test 6: /stop during sentinel force-cleans and unlocks session
|
||||
# ------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_during_sentinel_returns_message():
|
||||
async def test_stop_during_sentinel_force_cleans_session():
|
||||
"""If /stop arrives while the sentinel is set (agent still starting),
|
||||
it should return a helpful message instead of crashing or queuing."""
|
||||
it should force-clean the sentinel and unlock the session."""
|
||||
runner = _make_runner()
|
||||
event1 = _make_event(text="hello")
|
||||
session_key = build_session_key(event1.source)
|
||||
|
|
@ -221,11 +221,16 @@ async def test_stop_during_sentinel_returns_message():
|
|||
# Sentinel should be set
|
||||
assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL
|
||||
|
||||
# Send /stop — should get a message, not crash
|
||||
# Send /stop — should force-clean the sentinel
|
||||
stop_event = _make_event(text="/stop")
|
||||
result = await runner._handle_message(stop_event)
|
||||
assert result is not None, "/stop during sentinel should return a message"
|
||||
assert "starting up" in result.lower()
|
||||
assert "force-stopped" in result.lower() or "unlocked" in result.lower()
|
||||
|
||||
# Sentinel must be cleaned up
|
||||
assert session_key not in runner._running_agents, (
|
||||
"/stop must remove sentinel so the session is unlocked"
|
||||
)
|
||||
|
||||
# Should NOT be queued as pending
|
||||
adapter = runner.adapters[Platform.TELEGRAM]
|
||||
|
|
@ -235,6 +240,73 @@ async def test_stop_during_sentinel_returns_message():
|
|||
await task1
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Test 6b: /stop hard-kills a running agent and unlocks session
|
||||
# ------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_hard_kills_running_agent():
|
||||
"""When /stop arrives while a real agent is running, it must:
|
||||
1. Call interrupt() on the agent
|
||||
2. Force-clean _running_agents to unlock the session
|
||||
3. Return a confirmation message
|
||||
This fixes the bug where a hung agent kept the session locked
|
||||
forever — showing 'writing...' but never producing output."""
|
||||
runner = _make_runner()
|
||||
session_key = build_session_key(
|
||||
SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm")
|
||||
)
|
||||
|
||||
# Simulate a running (possibly hung) agent
|
||||
fake_agent = MagicMock()
|
||||
runner._running_agents[session_key] = fake_agent
|
||||
|
||||
# Send /stop
|
||||
stop_event = _make_event(text="/stop")
|
||||
result = await runner._handle_message(stop_event)
|
||||
|
||||
# Agent must have been interrupted
|
||||
fake_agent.interrupt.assert_called_once_with("Stop requested")
|
||||
|
||||
# Session must be unlocked
|
||||
assert session_key not in runner._running_agents, (
|
||||
"/stop must remove the agent from _running_agents so the session is unlocked"
|
||||
)
|
||||
|
||||
# Must return a confirmation
|
||||
assert result is not None
|
||||
assert "force-stopped" in result.lower() or "unlocked" in result.lower()
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Test 6c: /stop clears pending messages to prevent stale replays
|
||||
# ------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_stop_clears_pending_messages():
|
||||
"""When /stop hard-kills a running agent, any pending messages
|
||||
queued during the run must be discarded."""
|
||||
runner = _make_runner()
|
||||
session_key = build_session_key(
|
||||
SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm")
|
||||
)
|
||||
|
||||
fake_agent = MagicMock()
|
||||
runner._running_agents[session_key] = fake_agent
|
||||
runner._pending_messages[session_key] = "some queued text"
|
||||
|
||||
# Queue a pending message in the adapter too
|
||||
adapter = runner.adapters[Platform.TELEGRAM]
|
||||
adapter._pending_messages[session_key] = _make_event(text="queued")
|
||||
adapter.get_pending_message = MagicMock(return_value=_make_event(text="queued"))
|
||||
adapter.has_pending_interrupt = MagicMock(return_value=False)
|
||||
|
||||
stop_event = _make_event(text="/stop")
|
||||
await runner._handle_message(stop_event)
|
||||
|
||||
# Pending messages must be cleared
|
||||
assert session_key not in runner._pending_messages
|
||||
adapter.get_pending_message.assert_called_once_with(session_key)
|
||||
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Test 7: Shutdown skips sentinel entries
|
||||
# ------------------------------------------------------------------
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue