diff --git a/cron/scheduler.py b/cron/scheduler.py index 2ca012ea05..27690ac5e2 100644 --- a/cron/scheduler.py +++ b/cron/scheduler.py @@ -1308,6 +1308,17 @@ def tick(verbose: bool = True, adapters=None, loop=None) -> int: _futures.append(_tick_pool.submit(_ctx.run, _process_job, job)) _results.extend(f.result() for f in _futures) + # Best-effort sweep of MCP stdio subprocesses that survived their + # session teardown during this tick. Runs AFTER every job has + # finished so active sessions (including live user chats) are + # never touched — only PIDs explicitly detected as orphans in + # tools.mcp_tool._run_stdio's finally block are reaped. + try: + from tools.mcp_tool import _kill_orphaned_mcp_children + _kill_orphaned_mcp_children() + except Exception as _e: + logger.debug("Post-tick MCP orphan cleanup failed: %s", _e) + return sum(_results) finally: if fcntl: diff --git a/tests/tools/test_mcp_stability.py b/tests/tools/test_mcp_stability.py index 7a500dad51..2cee822e3e 100644 --- a/tests/tools/test_mcp_stability.py +++ b/tests/tools/test_mcp_stability.py @@ -81,37 +81,51 @@ class TestStdioPidTracking: def test_kill_orphaned_noop_when_empty(self): """_kill_orphaned_mcp_children does nothing when no PIDs tracked.""" - from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock + from tools.mcp_tool import ( + _kill_orphaned_mcp_children, + _orphan_stdio_pids, + _stdio_pids, + _lock, + ) with _lock: _stdio_pids.clear() + _orphan_stdio_pids.clear() # Should not raise _kill_orphaned_mcp_children() def test_kill_orphaned_handles_dead_pids(self): """_kill_orphaned_mcp_children gracefully handles already-dead PIDs.""" - from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock + from tools.mcp_tool import ( + _kill_orphaned_mcp_children, + _orphan_stdio_pids, + _lock, + ) # Use a PID that definitely doesn't exist fake_pid = 999999999 with _lock: - _stdio_pids[fake_pid] = "test" + _orphan_stdio_pids.add(fake_pid) # Should not raise (ProcessLookupError is caught) _kill_orphaned_mcp_children() with _lock: - assert fake_pid not in _stdio_pids + assert fake_pid not in _orphan_stdio_pids def test_kill_orphaned_uses_sigkill_when_available(self, monkeypatch): """SIGTERM-first then SIGKILL after 2s for orphan cleanup.""" - from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock + from tools.mcp_tool import ( + _kill_orphaned_mcp_children, + _orphan_stdio_pids, + _lock, + ) fake_pid = 424242 with _lock: - _stdio_pids.clear() - _stdio_pids[fake_pid] = "test" + _orphan_stdio_pids.clear() + _orphan_stdio_pids.add(fake_pid) fake_sigkill = 9 monkeypatch.setattr(signal, "SIGKILL", fake_sigkill, raising=False) @@ -128,16 +142,20 @@ class TestStdioPidTracking: mock_sleep.assert_called_once_with(2) with _lock: - assert fake_pid not in _stdio_pids + assert fake_pid not in _orphan_stdio_pids def test_kill_orphaned_falls_back_without_sigkill(self, monkeypatch): """Without SIGKILL, SIGTERM is used for both phases.""" - from tools.mcp_tool import _kill_orphaned_mcp_children, _stdio_pids, _lock + from tools.mcp_tool import ( + _kill_orphaned_mcp_children, + _orphan_stdio_pids, + _lock, + ) fake_pid = 434343 with _lock: - _stdio_pids.clear() - _stdio_pids[fake_pid] = "test" + _orphan_stdio_pids.clear() + _orphan_stdio_pids.add(fake_pid) monkeypatch.delattr(signal, "SIGKILL", raising=False) @@ -150,7 +168,7 @@ class TestStdioPidTracking: assert mock_sleep.called with _lock: - assert fake_pid not in _stdio_pids + assert fake_pid not in _orphan_stdio_pids # --------------------------------------------------------------------------- diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index 565dbfca0e..e02219d7bc 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -1044,33 +1044,51 @@ class MCPServerTask: # Snapshot child PIDs before spawning so we can track the new one. pids_before = _snapshot_child_pids() + new_pids: set = set() # Redirect subprocess stderr into a shared log file so MCP servers # (FastMCP banners, slack-mcp startup JSON, etc.) don't dump onto # the user's TTY and corrupt the TUI. Preserves debuggability via # ~/.hermes/logs/mcp-stderr.log. _write_stderr_log_header(self.name) _errlog = _get_mcp_stderr_log() - async with stdio_client(server_params, errlog=_errlog) as (read_stream, write_stream): - # Capture the newly spawned subprocess PID for force-kill cleanup. - new_pids = _snapshot_child_pids() - pids_before + try: + async with stdio_client(server_params, errlog=_errlog) as ( + read_stream, + write_stream, + ): + # Capture the newly spawned subprocess PID for force-kill cleanup. + new_pids = _snapshot_child_pids() - pids_before + if new_pids: + with _lock: + for _pid in new_pids: + _stdio_pids[_pid] = self.name + async with ClientSession( + read_stream, write_stream, **sampling_kwargs + ) as session: + await session.initialize() + self.session = session + await self._discover_tools() + self._ready.set() + # stdio transport does not use OAuth, but we still honor + # _reconnect_event (e.g. future manual /mcp refresh) for + # consistency with _run_http. + await self._wait_for_lifecycle_event() + finally: + # Runs on clean exit, exceptions, AND asyncio cancellation. + # If any of the spawned PIDs are still alive, the SDK's + # teardown failed (common when the task is cancelled mid-way + # on Linux, where setsid() children escape the parent cgroup). + # Mark them as orphans so the next cleanup sweep can reap them. if new_pids: with _lock: for _pid in new_pids: - _stdio_pids[_pid] = self.name - async with ClientSession(read_stream, write_stream, **sampling_kwargs) as session: - await session.initialize() - self.session = session - await self._discover_tools() - self._ready.set() - # stdio transport does not use OAuth, but we still honor - # _reconnect_event (e.g. future manual /mcp refresh) for - # consistency with _run_http. - await self._wait_for_lifecycle_event() - # Context exited cleanly — subprocess was terminated by the SDK. - if new_pids: - with _lock: - for _pid in new_pids: - _stdio_pids.pop(_pid, None) + _stdio_pids.pop(_pid, None) + for pid in new_pids: + try: + os.kill(pid, 0) # signal 0: probe liveness only + except (ProcessLookupError, PermissionError, OSError): + continue # process already exited — nothing to do + _orphan_stdio_pids.add(pid) async def _run_http(self, config: dict): """Run the server using HTTP/StreamableHTTP transport.""" @@ -1718,6 +1736,13 @@ _lock = threading.Lock() # normal server shutdown. _stdio_pids: Dict[int, str] = {} # pid -> server_name +# PIDs that survived their session context exit (SDK teardown failed to +# terminate them). These are detected in _run_stdio's finally block and +# can be cleaned up asynchronously by _kill_orphaned_mcp_children(). +# Separate from _stdio_pids so cleanup sweeps never race with active +# sessions (e.g. concurrent cron jobs or live user chats). +_orphan_stdio_pids: set = set() + def _snapshot_child_pids() -> set: """Return a set of current child process PIDs. @@ -2959,21 +2984,34 @@ def shutdown_mcp_servers(): _stop_mcp_loop() -def _kill_orphaned_mcp_children() -> None: - """Graceful shutdown of MCP stdio subprocesses that survived loop cleanup. +def _kill_orphaned_mcp_children(include_active: bool = False) -> None: + """Best-effort graceful shutdown of stdio MCP subprocesses to reap orphans. - Sends SIGTERM first, waits 2 seconds, then escalates to SIGKILL. - This prevents shared-resource collisions when multiple hermes processes - run on the same host (each has its own _stdio_pids dict). + Orphans are PIDs that survived their session context exit (SDK teardown + did not terminate the process — common on Linux when stdio children escape + the parent cgroup on cancellation). By default only entries in + ``_orphan_stdio_pids`` are reaped so concurrent cron jobs and live user + sessions are not disrupted. - Only kills PIDs tracked in ``_stdio_pids`` — never arbitrary children. + Sends SIGTERM, waits 2 seconds, then escalates to SIGKILL for any + survivors, avoiding shared-resource collisions when multiple hermes + processes run on the same host (each has its own ``_stdio_pids`` dict). + + With ``include_active=True`` also kills every PID in ``_stdio_pids`` — + used only at final shutdown, after the MCP event loop has stopped and no + sessions can still be in flight. """ import signal as _signal import time as _time with _lock: - pids = dict(_stdio_pids) - _stdio_pids.clear() + pids: Dict[int, str] = {} + for opid in _orphan_stdio_pids: + pids[opid] = "orphan" + _orphan_stdio_pids.clear() + if include_active: + pids.update(dict(_stdio_pids)) + _stdio_pids.clear() # Fast path: no tracked stdio PIDs to reap. Skip the SIGTERM/sleep/SIGKILL # dance entirely — otherwise every MCP-free shutdown pays a 2s sleep tax. @@ -3022,5 +3060,6 @@ def _stop_mcp_loop(): except Exception: pass # After closing the loop, any stdio subprocesses that survived the - # graceful shutdown are now orphaned. Force-kill them. - _kill_orphaned_mcp_children() + # graceful shutdown are now orphaned — include active PIDs too + # since the loop is gone and no session can still be in flight. + _kill_orphaned_mcp_children(include_active=True)