diff --git a/gateway/run.py b/gateway/run.py index 910ba73b0..de9eb59e8 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -1556,6 +1556,30 @@ class GatewayRunner: if event.get_command() == "status": return await self._handle_status_command(event) + # Resolve the command once for all early-intercept checks below. + from hermes_cli.commands import resolve_command as _resolve_cmd_inner + _evt_cmd = event.get_command() + _cmd_def_inner = _resolve_cmd_inner(_evt_cmd) if _evt_cmd else None + + # /stop must hard-kill the session when an agent is running. + # A soft interrupt (agent.interrupt()) doesn't help when the agent + # is truly hung — the executor thread is blocked and never checks + # _interrupt_requested. Force-clean _running_agents so the session + # is unlocked and subsequent messages are processed normally. + if _cmd_def_inner and _cmd_def_inner.name == "stop": + running_agent = self._running_agents.get(_quick_key) + if running_agent and running_agent is not _AGENT_PENDING_SENTINEL: + running_agent.interrupt("Stop requested") + # Force-clean: remove the session lock regardless of agent state + adapter = self.adapters.get(source.platform) + if adapter and hasattr(adapter, 'get_pending_message'): + adapter.get_pending_message(_quick_key) # consume and discard + self._pending_messages.pop(_quick_key, None) + if _quick_key in self._running_agents: + del self._running_agents[_quick_key] + logger.info("HARD STOP for session %s — session lock released", _quick_key[:20]) + return "⚡ Force-stopped. The session is unlocked — you can send a new message." + # /reset and /new must bypass the running-agent guard so they # actually dispatch as commands instead of being queued as user # text (which would be fed back to the agent with the same @@ -1563,9 +1587,6 @@ class GatewayRunner: # clear the adapter's pending queue so the stale "/reset" text # doesn't get re-processed as a user message after the # interrupt completes. - from hermes_cli.commands import resolve_command as _resolve_cmd_inner - _evt_cmd = event.get_command() - _cmd_def_inner = _resolve_cmd_inner(_evt_cmd) if _evt_cmd else None if _cmd_def_inner and _cmd_def_inner.name == "new": running_agent = self._running_agents.get(_quick_key) if running_agent and running_agent is not _AGENT_PENDING_SENTINEL: @@ -1623,8 +1644,11 @@ class GatewayRunner: if running_agent is _AGENT_PENDING_SENTINEL: # Agent is being set up but not ready yet. if event.get_command() == "stop": - # Nothing to interrupt — agent hasn't started yet. - return "⏳ The agent is still starting up — nothing to stop yet." + # Force-clean the sentinel so the session is unlocked. + if _quick_key in self._running_agents: + del self._running_agents[_quick_key] + logger.info("HARD STOP (pending) for session %s — sentinel cleared", _quick_key[:20]) + return "⚡ Force-stopped. The agent was still starting — session unlocked." # Queue the message so it will be picked up after the # agent starts. adapter = self.adapters.get(source.platform) @@ -2729,17 +2753,32 @@ class GatewayRunner: return "\n".join(lines) async def _handle_stop_command(self, event: MessageEvent) -> str: - """Handle /stop command - interrupt a running agent.""" + """Handle /stop command - interrupt a running agent. + + When an agent is truly hung (blocked thread that never checks + _interrupt_requested), the early intercept in _handle_message() + handles /stop before this method is reached. This handler fires + only through normal command dispatch (no running agent) or as a + fallback. Force-clean the session lock in all cases for safety. + """ source = event.source session_entry = self.session_store.get_or_create_session(source) session_key = session_entry.session_key agent = self._running_agents.get(session_key) if agent is _AGENT_PENDING_SENTINEL: - return "⏳ The agent is still starting up — nothing to stop yet." + # Force-clean the sentinel so the session is unlocked. + if session_key in self._running_agents: + del self._running_agents[session_key] + logger.info("HARD STOP (pending) for session %s — sentinel cleared", session_key[:20]) + return "⚡ Force-stopped. The agent was still starting — session unlocked." if agent: - agent.interrupt() - return "⚡ Stopping the current task... The agent will finish its current step and respond." + agent.interrupt("Stop requested") + # Force-clean the session lock so a truly hung agent doesn't + # keep it locked forever. + if session_key in self._running_agents: + del self._running_agents[session_key] + return "⚡ Force-stopped. The session is unlocked — you can send a new message." else: return "No active task to stop." diff --git a/tests/gateway/test_session_race_guard.py b/tests/gateway/test_session_race_guard.py index 3c11a1a31..9e05a19dc 100644 --- a/tests/gateway/test_session_race_guard.py +++ b/tests/gateway/test_session_race_guard.py @@ -198,12 +198,12 @@ async def test_command_messages_do_not_leave_sentinel(): # ------------------------------------------------------------------ -# Test 6: /stop during sentinel returns helpful message +# Test 6: /stop during sentinel force-cleans and unlocks session # ------------------------------------------------------------------ @pytest.mark.asyncio -async def test_stop_during_sentinel_returns_message(): +async def test_stop_during_sentinel_force_cleans_session(): """If /stop arrives while the sentinel is set (agent still starting), - it should return a helpful message instead of crashing or queuing.""" + it should force-clean the sentinel and unlock the session.""" runner = _make_runner() event1 = _make_event(text="hello") session_key = build_session_key(event1.source) @@ -221,11 +221,16 @@ async def test_stop_during_sentinel_returns_message(): # Sentinel should be set assert runner._running_agents.get(session_key) is _AGENT_PENDING_SENTINEL - # Send /stop — should get a message, not crash + # Send /stop — should force-clean the sentinel stop_event = _make_event(text="/stop") result = await runner._handle_message(stop_event) assert result is not None, "/stop during sentinel should return a message" - assert "starting up" in result.lower() + assert "force-stopped" in result.lower() or "unlocked" in result.lower() + + # Sentinel must be cleaned up + assert session_key not in runner._running_agents, ( + "/stop must remove sentinel so the session is unlocked" + ) # Should NOT be queued as pending adapter = runner.adapters[Platform.TELEGRAM] @@ -235,6 +240,73 @@ async def test_stop_during_sentinel_returns_message(): await task1 +# ------------------------------------------------------------------ +# Test 6b: /stop hard-kills a running agent and unlocks session +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_stop_hard_kills_running_agent(): + """When /stop arrives while a real agent is running, it must: + 1. Call interrupt() on the agent + 2. Force-clean _running_agents to unlock the session + 3. Return a confirmation message + This fixes the bug where a hung agent kept the session locked + forever — showing 'writing...' but never producing output.""" + runner = _make_runner() + session_key = build_session_key( + SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm") + ) + + # Simulate a running (possibly hung) agent + fake_agent = MagicMock() + runner._running_agents[session_key] = fake_agent + + # Send /stop + stop_event = _make_event(text="/stop") + result = await runner._handle_message(stop_event) + + # Agent must have been interrupted + fake_agent.interrupt.assert_called_once_with("Stop requested") + + # Session must be unlocked + assert session_key not in runner._running_agents, ( + "/stop must remove the agent from _running_agents so the session is unlocked" + ) + + # Must return a confirmation + assert result is not None + assert "force-stopped" in result.lower() or "unlocked" in result.lower() + + +# ------------------------------------------------------------------ +# Test 6c: /stop clears pending messages to prevent stale replays +# ------------------------------------------------------------------ +@pytest.mark.asyncio +async def test_stop_clears_pending_messages(): + """When /stop hard-kills a running agent, any pending messages + queued during the run must be discarded.""" + runner = _make_runner() + session_key = build_session_key( + SessionSource(platform=Platform.TELEGRAM, chat_id="12345", chat_type="dm") + ) + + fake_agent = MagicMock() + runner._running_agents[session_key] = fake_agent + runner._pending_messages[session_key] = "some queued text" + + # Queue a pending message in the adapter too + adapter = runner.adapters[Platform.TELEGRAM] + adapter._pending_messages[session_key] = _make_event(text="queued") + adapter.get_pending_message = MagicMock(return_value=_make_event(text="queued")) + adapter.has_pending_interrupt = MagicMock(return_value=False) + + stop_event = _make_event(text="/stop") + await runner._handle_message(stop_event) + + # Pending messages must be cleared + assert session_key not in runner._pending_messages + adapter.get_pending_message.assert_called_once_with(session_key) + + # ------------------------------------------------------------------ # Test 7: Shutdown skips sentinel entries # ------------------------------------------------------------------