diff --git a/tests/tools/test_mcp_circuit_breaker.py b/tests/tools/test_mcp_circuit_breaker.py index 0173fa52afe..4f5a89477d0 100644 --- a/tests/tools/test_mcp_circuit_breaker.py +++ b/tests/tools/test_mcp_circuit_breaker.py @@ -179,6 +179,107 @@ def test_circuit_breaker_reopens_on_probe_failure(monkeypatch, tmp_path): _cleanup(mcp_tool, "srv") +def test_half_open_probe_on_dead_session_requests_reconnect(monkeypatch, tmp_path): + """A half-open probe against a server with no live session must request + a transport reconnect and return a clean error — NOT write into a dead + pipe or permanently re-arm the breaker. + + This is the #16788 wedge: a dead stdio subprocess leaves ``session=None`` + (the run loop parked after exhausting retries). The old handler bumped + the breaker every cooldown forever; the fix signals ``_reconnect_event`` + so the parked task revives and rebuilds the transport. + """ + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + from tools import mcp_tool + from tools.mcp_tool import _make_tool_handler + + server = _install_stub_server(mcp_tool, "srv", None) + # Simulate a dead/parked transport: no live session. + server.session = None + # Drive _signal_reconnect down its direct .set() path (no live loop). + monkeypatch.setattr(mcp_tool, "_mcp_loop", None) + + try: + mcp_tool._server_error_counts["srv"] = mcp_tool._CIRCUIT_BREAKER_THRESHOLD + fake_now = [1000.0] + + def _fake_monotonic(): + return fake_now[0] + + monkeypatch.setattr(mcp_tool.time, "monotonic", _fake_monotonic) + mcp_tool._server_breaker_opened_at["srv"] = fake_now[0] + cooldown = getattr(mcp_tool, "_CIRCUIT_BREAKER_COOLDOWN_SEC", 60.0) + + # Advance past cooldown → next call is a half-open probe. + fake_now[0] += cooldown + 1.0 + + handler = _make_tool_handler("srv", "tool1", 10.0) + result = handler({}) + parsed = json.loads(result) + + # Clean "reconnecting" error, and a reconnect was actually signalled. + assert "reconnect" in parsed.get("error", "").lower(), parsed + server._reconnect_event.set.assert_called_once() + finally: + _cleanup(mcp_tool, "srv") + + +def test_half_open_dead_session_recovers_after_reconnect(monkeypatch, tmp_path): + """Once the transport comes back (session repopulated + breaker reset by + the run loop), the next call must go straight through — proving the wedge + is escapable, not just deferred. + """ + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + from tools import mcp_tool + from tools.mcp_tool import _make_tool_handler + + async def _call_tool_success(*a, **kw): + result = MagicMock() + result.isError = False + block = MagicMock() + block.text = "ok" + result.content = [block] + result.structuredContent = None + return result + + server = _install_stub_server(mcp_tool, "srv", _call_tool_success) + server.session = None # transport down at first + monkeypatch.setattr(mcp_tool, "_mcp_loop", None) + mcp_tool._ensure_mcp_loop() + + try: + mcp_tool._server_error_counts["srv"] = mcp_tool._CIRCUIT_BREAKER_THRESHOLD + fake_now = [1000.0] + monkeypatch.setattr(mcp_tool.time, "monotonic", lambda: fake_now[0]) + mcp_tool._server_breaker_opened_at["srv"] = fake_now[0] + cooldown = getattr(mcp_tool, "_CIRCUIT_BREAKER_COOLDOWN_SEC", 60.0) + fake_now[0] += cooldown + 1.0 + + handler = _make_tool_handler("srv", "tool1", 10.0) + + # Probe 1: transport down → reconnect requested, clean error. + parsed = json.loads(handler({})) + assert "reconnect" in parsed.get("error", "").lower(), parsed + + # Simulate the run loop rebuilding the session + resetting the breaker + # (what _run_stdio does on successful re-init). + live = MagicMock() + live.call_tool = _call_tool_success + server.session = live + mcp_tool._reset_server_error("srv") + + # Advance past the re-armed cooldown so the next call is a fresh probe. + fake_now[0] += cooldown + 1.0 + + # Next call goes straight through. + parsed = json.loads(handler({})) + assert parsed.get("result") == "ok", parsed + finally: + _cleanup(mcp_tool, "srv") + + def test_circuit_breaker_cleared_on_reconnect(monkeypatch, tmp_path): """When the auth-recovery path successfully reconnects the server, the breaker should be cleared so subsequent calls aren't gated on a @@ -250,3 +351,98 @@ def test_circuit_breaker_cleared_on_reconnect(monkeypatch, tmp_path): ) finally: _cleanup(mcp_tool, "srv") + + +def test_run_loop_parks_instead_of_exiting_then_revives(monkeypatch, tmp_path): + """The run loop must NOT exit when the reconnect budget is exhausted. + + It deregisters tools and parks as a dormant listener; a later + ``_reconnect_event`` revives it and re-enters the transport. This is the + structural fix for #16788 — without a live task, no half-open probe could + ever bring a dead stdio server back. + """ + import asyncio + + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + from tools import mcp_tool + from tools.mcp_tool import MCPServerTask + + # Shrink the budget and collapse backoff sleeps (but still yield control + # to the loop) so the test runs fast without starving the scheduler. + monkeypatch.setattr(mcp_tool, "_MAX_RECONNECT_RETRIES", 2) + + _real_sleep = asyncio.sleep + + async def _fast_sleep(_delay, *a, **kw): + await _real_sleep(0) + + monkeypatch.setattr(mcp_tool.asyncio, "sleep", _fast_sleep) + + state = {"transport_calls": 0, "deregistered": 0, "revived": False} + + async def _scenario(): + class _Task(MCPServerTask): + def _is_http(self): + return False + + def _deregister_tools(self): + state["deregistered"] += 1 + self._registered_tool_names = [] + + async def _run_stdio(self, config): + state["transport_calls"] += 1 + # First connect succeeds (sets _ready) then immediately + # fails, as if the subprocess died — the post-ready failure + # path that counts toward the reconnect budget. + if state["transport_calls"] == 1: + self.session = object() + self._ready.set() + self.session = None + raise RuntimeError("subprocess died") + # Keep failing until the budget is exhausted and the loop + # parks, UNLESS we've been revived after parking. + if state["revived"]: + self.session = object() + self._ready.set() + await self._wait_for_lifecycle_event() + return + raise RuntimeError("still down") + + task = _Task("srv") + task._registered_tool_names = ["srv__tool"] + + run_task = asyncio.ensure_future(task.run({"command": "x"})) + + # Wait until the loop has parked (it deregisters tools right before + # blocking on _wait_for_reconnect_or_shutdown). + for _ in range(500): + await _real_sleep(0) + if state["deregistered"] >= 1: + break + # Give the loop one more tick to settle into the park wait. + await _real_sleep(0) + assert not run_task.done(), "run loop exited instead of parking" + assert state["deregistered"] >= 1, "tools not deregistered on park" + + # Revive it: a reconnect signal must wake the parked task. + state["revived"] = True + before = state["transport_calls"] + task._reconnect_event.set() + for _ in range(500): + await _real_sleep(0) + if state["transport_calls"] > before: + break + assert state["transport_calls"] > before, ( + "parked task did not re-enter transport on reconnect signal" + ) + + # Clean shutdown. + task._shutdown_event.set() + task._reconnect_event.set() + try: + await asyncio.wait_for(run_task, timeout=2) + except (asyncio.TimeoutError, asyncio.CancelledError, Exception): + run_task.cancel() + + asyncio.run(_scenario()) diff --git a/tools/mcp_tool.py b/tools/mcp_tool.py index dead8ca2046..6b73fac7ad4 100644 --- a/tools/mcp_tool.py +++ b/tools/mcp_tool.py @@ -1759,6 +1759,41 @@ class MCPServerTask: self._reconnect_event.clear() return "reconnect" + async def _wait_for_reconnect_or_shutdown(self) -> str: + """Block until a reconnect or shutdown is requested while parked. + + Used by :meth:`run` after the reconnect budget is exhausted. The + task stays alive (so ``_reconnect_event`` always has a listener) but + does no work until something explicitly asks it to come back — + the circuit-breaker half-open probe, OAuth recovery, or a manual + ``/mcp`` refresh. + + Returns: + ``"shutdown"`` if the server should exit the run loop entirely, + ``"reconnect"`` if it should rebuild the transport. The reconnect + event is cleared before returning so the next park cycle starts + from a fresh signal. Shutdown takes precedence. + """ + shutdown_task = asyncio.ensure_future(self._shutdown_event.wait()) + reconnect_task = asyncio.ensure_future(self._reconnect_event.wait()) + try: + await asyncio.wait( + {shutdown_task, reconnect_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + finally: + for t in (shutdown_task, reconnect_task): + if not t.done(): + t.cancel() + try: + await t + except (asyncio.CancelledError, Exception): + pass + if self._shutdown_event.is_set(): + return "shutdown" + self._reconnect_event.clear() + return "reconnect" + async def _run_stdio(self, config: dict): """Run the server using stdio transport.""" if not _MCP_AVAILABLE: @@ -1857,6 +1892,10 @@ class MCPServerTask: self.session = session await self._discover_tools() self._ready.set() + # Session is live again: clear any breaker state from a + # prior outage so the first call after recovery isn't + # gated on a stale consecutive-failure count (#16788). + _reset_server_error(self.name) # stdio transport does not use OAuth, but we still honor # _reconnect_event (e.g. future manual /mcp refresh) for # consistency with _run_http. @@ -2088,6 +2127,10 @@ class MCPServerTask: self.session = session await self._discover_tools() self._ready.set() + # Session is live again: clear any breaker state from a + # prior outage so the first call after recovery isn't + # gated on a stale consecutive-failure count (#16788). + _reset_server_error(self.name) reason = await self._wait_for_lifecycle_event() if reason == "reconnect": logger.info( @@ -2137,6 +2180,10 @@ class MCPServerTask: self.session = session await self._discover_tools() self._ready.set() + # Session is live again: clear any breaker state from + # a prior outage so the first call after recovery + # isn't gated on a stale failure count (#16788). + _reset_server_error(self.name) reason = await self._wait_for_lifecycle_event() if reason == "reconnect": logger.info( @@ -2160,6 +2207,10 @@ class MCPServerTask: self.session = session await self._discover_tools() self._ready.set() + # Session is live again: clear any breaker state from a + # prior outage so the first call after recovery isn't + # gated on a stale consecutive-failure count (#16788). + _reset_server_error(self.name) reason = await self._wait_for_lifecycle_event() if reason == "reconnect": logger.info( @@ -2298,6 +2349,15 @@ class MCPServerTask: "manual refresh)", self.name, ) + # A clean transport return only happens after a session was + # successfully established and then asked to rebuild (auth + # recovery / manual refresh / breaker-driven reconnect). That + # is proof the server is reachable, so clear the consecutive- + # failure budget — otherwise transient drops accumulated over + # a long-lived session would eventually exhaust it and + # permanently kill an otherwise-healthy server. + retries = 0 + backoff = 1.0 # Reset the session reference; _run_http/_run_stdio will # repopulate it on successful re-entry. self.session = None @@ -2374,10 +2434,32 @@ class MCPServerTask: if retries > _MAX_RECONNECT_RETRIES: logger.warning( "MCP server '%s' failed after %d reconnection attempts, " - "giving up: %s", + "parking until a reconnect is requested: %s", self.name, _MAX_RECONNECT_RETRIES, exc, ) - return + # Do NOT return — exiting the task orphans the server: + # nothing would ever listen for _reconnect_event again, + # so a half-open circuit-breaker probe could never revive + # it and the server would be permanently wedged for the + # life of the process (#16788). Instead, drop the phantom + # tools from the registry and park as a dormant listener. + # A future _reconnect_event.set() — from the breaker's + # half-open probe, OAuth recovery, or a manual /mcp + # refresh — wakes us to rebuild the transport (respawning + # a dead stdio subprocess in the process). + self._deregister_tools() + self._reconnect_event.clear() + parked = await self._wait_for_reconnect_or_shutdown() + if parked == "shutdown": + return + logger.info( + "MCP server '%s': reconnect requested while parked; " + "rebuilding transport.", + self.name, + ) + retries = 0 + backoff = 1.0 + continue logger.warning( "MCP server '%s' connection lost (attempt %d/%d), " @@ -2403,8 +2485,6 @@ class MCPServerTask: async def shutdown(self): """Signal the Task to exit and wait for clean resource teardown.""" - from tools.registry import registry - self._shutdown_event.set() # Defensive: if _wait_for_lifecycle_event is blocking, we need ANY # event to unblock it. _shutdown_event alone is sufficient (the @@ -2430,11 +2510,24 @@ class MCPServerTask: task.cancel() await asyncio.gather(*self._pending_refresh_tasks, return_exceptions=True) self._pending_refresh_tasks.clear() + self._deregister_tools() + self.session = None + + def _deregister_tools(self) -> None: + """Drop this server's tools from the global registry (idempotent). + + Pulls the server's tool schemas out of the registry so the agent + stops advertising them to the model. Called on shutdown AND when the + reconnect budget is exhausted, so a dead server never leaves phantom + tool definitions bloating the prompt cache and producing "not + connected" errors on every turn. + """ + from tools.registry import registry + for tool_name in list(getattr(self, "_registered_tool_names", [])): registry.deregister(tool_name) _forget_mcp_tool_server(tool_name) self._registered_tool_names = [] - self.session = None # --------------------------------------------------------------------------- @@ -2491,6 +2584,31 @@ def _reset_server_error(server_name: str) -> None: _server_error_counts[server_name] = 0 _server_breaker_opened_at.pop(server_name, None) + +def _signal_reconnect(server: Any) -> bool: + """Ask a server task to rebuild its transport, thread-safely. + + The tool handlers run on caller threads, while the server task and its + ``_reconnect_event`` live on the background MCP loop. Setting an + asyncio.Event from another thread must go through + ``loop.call_soon_threadsafe``; only fall back to a direct ``.set()`` + when the loop isn't running (e.g. unit tests that drive the handler + synchronously). + + Returns True if a reconnect signal was delivered, False if the server + has no reconnect machinery (nothing to revive). + """ + event = getattr(server, "_reconnect_event", None) + if event is None: + return False + loop = _mcp_loop + if loop is not None and loop.is_running(): + loop.call_soon_threadsafe(event.set) + else: + event.set() + return True + + # --------------------------------------------------------------------------- # Auth-failure detection helpers (Task 6 of MCP OAuth consolidation) # --------------------------------------------------------------------------- @@ -3176,12 +3294,35 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float): with _lock: server = _servers.get(server_name) - if not server or not server.session: + if not server: _bump_server_error(server_name) return json.dumps({ "error": f"MCP server '{server_name}' is not connected" }, ensure_ascii=False) + if not server.session: + # No live session — the server task is reconnecting, or it has + # exhausted its retry budget and parked (e.g. a dead stdio + # subprocess). Probing here would write into a dead/absent + # transport and re-arm the breaker forever (#16788). Instead, + # ask the (always-present) server task to rebuild the transport + # — which respawns a dead stdio subprocess — and return a clean + # "reconnecting" error so the model backs off without burning + # iterations. The breaker resets once the fresh session + # initializes (_run_stdio/_run_http call _reset_server_error). + _bump_server_error(server_name) + if _signal_reconnect(server): + return json.dumps({ + "error": ( + f"MCP server '{server_name}' transport is down; " + f"reconnect requested. Do NOT retry this tool " + f"immediately — give it a few seconds to come back." + ) + }, ensure_ascii=False) + return json.dumps({ + "error": f"MCP server '{server_name}' is not connected" + }, ensure_ascii=False) + async def _call(): async with server._rpc_lock: # Snapshot the agent's context so an elicitation callback