mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-07-01 12:02:05 +00:00
fix(mcp): never permanently wedge the circuit breaker on a dead transport (#53599)
A long-running gateway session could permanently lose an MCP server: once a stdio subprocess died (or transient drops accumulated over the session), the run loop exhausted its reconnect budget and returned, orphaning the task. With no listener for _reconnect_event, the circuit breaker's half-open probe could never revive the server — every probe hit a dead/absent session, re-armed the 60s cooldown, and looped forever until a full gateway restart (#16788).

Root cause was split ownership of transport liveness between the run loop and the tool handler, plus a permanent give-up path. Fixed by one invariant: a non-shutdown server task is always reconnectable.

- The run loop parks (deregisters phantom tools, then awaits _reconnect_event) instead of returning when the reconnect budget is exhausted, so the task stays alive as a dormant listener.
- The retry budget resets on every successful (re)connect, so a healthy long-lived server can't accumulate lifetime drops into a death sentence.
- A half-open probe with no live session signals a reconnect (reviving a parked/dead task and respawning a dead stdio subprocess) and returns a clean 'reconnecting' error instead of writing into a dead pipe.
- The breaker resets on successful session init across all transports (stdio/HTTP/SSE) — fully transport-agnostic, no PID/pipe polling.

Builds on the closed-PR cluster for this issue: keeps #49255's deregister-on-exhaustion insight and #21006's signal-don't-probe insight, and discards the racy os.kill PID machinery.

Co-authored-by: LeonSGP43 <LeonSGP43@users.noreply.github.com>
Co-authored-by: srojk34 <srojk34@users.noreply.github.com>
This commit is contained in:
parent
dbc925b755
commit
88c02469cc
2 changed files with 343 additions and 6 deletions
|
|
@ -179,6 +179,107 @@ def test_circuit_breaker_reopens_on_probe_failure(monkeypatch, tmp_path):
|
|||
_cleanup(mcp_tool, "srv")
|
||||
|
||||
|
||||
def test_half_open_probe_on_dead_session_requests_reconnect(monkeypatch, tmp_path):
    """Half-open probe with no live session asks for a reconnect.

    Regression test for the #16788 wedge.  A dead stdio subprocess leaves
    ``session=None`` (the run loop has parked after exhausting retries).
    The handler must signal ``_reconnect_event`` and hand back a clean
    "reconnecting" error rather than writing into a dead pipe and
    re-arming the breaker on every cooldown forever.
    """
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    from tools import mcp_tool
    from tools.mcp_tool import _make_tool_handler

    server = _install_stub_server(mcp_tool, "srv", None)
    # Dead/parked transport: the server object exists but has no session.
    server.session = None
    # With no background loop, _signal_reconnect takes its direct .set() path.
    monkeypatch.setattr(mcp_tool, "_mcp_loop", None)

    try:
        # Controllable clock shared with the patched time.monotonic.
        clock = {"now": 1000.0}
        mcp_tool._server_error_counts["srv"] = mcp_tool._CIRCUIT_BREAKER_THRESHOLD
        monkeypatch.setattr(mcp_tool.time, "monotonic", lambda: clock["now"])
        mcp_tool._server_breaker_opened_at["srv"] = clock["now"]
        cooldown_sec = getattr(mcp_tool, "_CIRCUIT_BREAKER_COOLDOWN_SEC", 60.0)

        # Step beyond the cooldown so the next call is the half-open probe.
        clock["now"] += cooldown_sec + 1.0

        call_tool = _make_tool_handler("srv", "tool1", 10.0)
        payload = json.loads(call_tool({}))

        # The caller sees a "reconnecting" error and exactly one reconnect
        # signal was raised on the server task.
        error_text = payload.get("error", "")
        assert "reconnect" in error_text.lower(), payload
        server._reconnect_event.set.assert_called_once()
    finally:
        _cleanup(mcp_tool, "srv")
|
||||
|
||||
|
||||
def test_half_open_dead_session_recovers_after_reconnect(monkeypatch, tmp_path):
    """A wedged server becomes callable again once the transport returns.

    After the run loop repopulates the session and resets the breaker, the
    very next tool call must succeed — showing the wedge can be escaped
    rather than merely postponed.
    """
    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    from tools import mcp_tool
    from tools.mcp_tool import _make_tool_handler

    async def _call_tool_success(*a, **kw):
        # Minimal successful MCP tool result carrying one text block.
        text_block = MagicMock(text="ok")
        return MagicMock(
            isError=False,
            content=[text_block],
            structuredContent=None,
        )

    server = _install_stub_server(mcp_tool, "srv", _call_tool_success)
    server.session = None  # transport starts out down
    monkeypatch.setattr(mcp_tool, "_mcp_loop", None)
    mcp_tool._ensure_mcp_loop()

    try:
        clock = {"now": 1000.0}
        mcp_tool._server_error_counts["srv"] = mcp_tool._CIRCUIT_BREAKER_THRESHOLD
        monkeypatch.setattr(mcp_tool.time, "monotonic", lambda: clock["now"])
        mcp_tool._server_breaker_opened_at["srv"] = clock["now"]
        cooldown_sec = getattr(mcp_tool, "_CIRCUIT_BREAKER_COOLDOWN_SEC", 60.0)
        clock["now"] += cooldown_sec + 1.0

        call_tool = _make_tool_handler("srv", "tool1", 10.0)

        # First probe: transport is down, so we expect the clean
        # "reconnecting" error.
        first = json.loads(call_tool({}))
        assert "reconnect" in first.get("error", "").lower(), first

        # Stand in for the run loop: install a live session and reset the
        # breaker, as _run_stdio does after a successful re-init.
        server.session = MagicMock(call_tool=_call_tool_success)
        mcp_tool._reset_server_error("srv")

        # Move past the cooldown again so the next call counts as a fresh
        # probe.
        clock["now"] += cooldown_sec + 1.0

        # Second call reaches the tool and returns its result.
        second = json.loads(call_tool({}))
        assert second.get("result") == "ok", second
    finally:
        _cleanup(mcp_tool, "srv")
|
||||
|
||||
|
||||
def test_circuit_breaker_cleared_on_reconnect(monkeypatch, tmp_path):
|
||||
"""When the auth-recovery path successfully reconnects the server,
|
||||
the breaker should be cleared so subsequent calls aren't gated on a
|
||||
|
|
@ -250,3 +351,98 @@ def test_circuit_breaker_cleared_on_reconnect(monkeypatch, tmp_path):
|
|||
)
|
||||
finally:
|
||||
_cleanup(mcp_tool, "srv")
|
||||
|
||||
|
||||
def test_run_loop_parks_instead_of_exiting_then_revives(monkeypatch, tmp_path):
    """Exhausting the reconnect budget parks the run loop; a signal revives it.

    Structural regression test for #16788.  When retries run out the task
    must deregister its tools and stay alive as a dormant listener.  A later
    ``_reconnect_event`` must wake it and send it back into the transport —
    without a live task no half-open probe could ever bring a dead stdio
    server back.
    """
    import asyncio

    monkeypatch.setenv("HERMES_HOME", str(tmp_path))

    from tools import mcp_tool
    from tools.mcp_tool import MCPServerTask

    # Small retry budget plus zero-length backoff sleeps keep the test fast;
    # the replacement sleep still yields so the scheduler is not starved.
    monkeypatch.setattr(mcp_tool, "_MAX_RECONNECT_RETRIES", 2)

    _real_sleep = asyncio.sleep

    async def _fast_sleep(_delay, *a, **kw):
        await _real_sleep(0)

    monkeypatch.setattr(mcp_tool.asyncio, "sleep", _fast_sleep)

    state = {"transport_calls": 0, "deregistered": 0, "revived": False}

    async def _spin_until(predicate, ticks=500):
        """Yield to the loop up to *ticks* times, stopping once *predicate* holds."""
        for _ in range(ticks):
            await _real_sleep(0)
            if predicate():
                return

    async def _scenario():
        class _Task(MCPServerTask):
            def _is_http(self):
                return False

            def _deregister_tools(self):
                state["deregistered"] += 1
                self._registered_tool_names = []

            async def _run_stdio(self, config):
                state["transport_calls"] += 1
                attempt = state["transport_calls"]
                if attempt == 1:
                    # Initial connect: become ready, then die straight away
                    # as if the subprocess crashed.  This is the post-ready
                    # failure that is charged to the reconnect budget.
                    self.session = object()
                    self._ready.set()
                    self.session = None
                    raise RuntimeError("subprocess died")
                if not state["revived"]:
                    # Still "down": keep failing so the budget runs out and
                    # the loop parks.
                    raise RuntimeError("still down")
                # Revived after parking: come up and idle until told otherwise.
                self.session = object()
                self._ready.set()
                await self._wait_for_lifecycle_event()
                return

        task = _Task("srv")
        task._registered_tool_names = ["srv__tool"]

        run_task = asyncio.ensure_future(task.run({"command": "x"}))

        # The loop deregisters tools immediately before it blocks in
        # _wait_for_reconnect_or_shutdown, so that counter marks parking.
        await _spin_until(lambda: state["deregistered"] >= 1)
        # One extra tick lets the loop settle into the park wait.
        await _real_sleep(0)
        assert not run_task.done(), "run loop exited instead of parking"
        assert state["deregistered"] >= 1, "tools not deregistered on park"

        # Revival: the reconnect signal must wake the parked task.
        state["revived"] = True
        before = state["transport_calls"]
        task._reconnect_event.set()
        await _spin_until(lambda: state["transport_calls"] > before)
        assert state["transport_calls"] > before, (
            "parked task did not re-enter transport on reconnect signal"
        )

        # Best-effort clean shutdown; cancel the task if it does not exit.
        task._shutdown_event.set()
        task._reconnect_event.set()
        try:
            await asyncio.wait_for(run_task, timeout=2)
        except (asyncio.TimeoutError, asyncio.CancelledError, Exception):
            run_task.cancel()

    asyncio.run(_scenario())
|
||||
|
|
|
|||
|
|
@ -1759,6 +1759,41 @@ class MCPServerTask:
|
|||
self._reconnect_event.clear()
|
||||
return "reconnect"
|
||||
|
||||
async def _wait_for_reconnect_or_shutdown(self) -> str:
    """Block until a reconnect or shutdown is requested while parked.

    Used by :meth:`run` after the reconnect budget is exhausted.  The task
    stays alive (so ``_reconnect_event`` always has a listener) but does no
    work until something explicitly asks it to come back — the
    circuit-breaker half-open probe, OAuth recovery, or a manual ``/mcp``
    refresh.

    Returns:
        ``"shutdown"`` if the server should exit the run loop entirely,
        ``"reconnect"`` if it should rebuild the transport.  The reconnect
        event is cleared before returning ``"reconnect"`` so the next park
        cycle starts from a fresh signal.  Shutdown takes precedence when
        both events are set, and in that case the reconnect event is left
        untouched.

    Raises:
        asyncio.CancelledError: if the parked task itself is cancelled,
            including while the helper waiters are being cleaned up.
    """
    waiters = [
        asyncio.ensure_future(self._shutdown_event.wait()),
        asyncio.ensure_future(self._reconnect_event.wait()),
    ]
    try:
        await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)
    finally:
        # cancel() is a no-op on a waiter that has already finished.
        for waiter in waiters:
            waiter.cancel()
        # return_exceptions=True absorbs each waiter's own CancelledError,
        # while a cancellation aimed at *this* task still propagates.  A
        # blanket ``except CancelledError: pass`` around ``await waiter``
        # would silently drop that outer cancellation request.
        await asyncio.gather(*waiters, return_exceptions=True)

    if self._shutdown_event.is_set():
        return "shutdown"
    self._reconnect_event.clear()
    return "reconnect"
|
||||
|
||||
async def _run_stdio(self, config: dict):
|
||||
"""Run the server using stdio transport."""
|
||||
if not _MCP_AVAILABLE:
|
||||
|
|
@ -1857,6 +1892,10 @@ class MCPServerTask:
|
|||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# Session is live again: clear any breaker state from a
|
||||
# prior outage so the first call after recovery isn't
|
||||
# gated on a stale consecutive-failure count (#16788).
|
||||
_reset_server_error(self.name)
|
||||
# stdio transport does not use OAuth, but we still honor
|
||||
# _reconnect_event (e.g. future manual /mcp refresh) for
|
||||
# consistency with _run_http.
|
||||
|
|
@ -2088,6 +2127,10 @@ class MCPServerTask:
|
|||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# Session is live again: clear any breaker state from a
|
||||
# prior outage so the first call after recovery isn't
|
||||
# gated on a stale consecutive-failure count (#16788).
|
||||
_reset_server_error(self.name)
|
||||
reason = await self._wait_for_lifecycle_event()
|
||||
if reason == "reconnect":
|
||||
logger.info(
|
||||
|
|
@ -2137,6 +2180,10 @@ class MCPServerTask:
|
|||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# Session is live again: clear any breaker state from
|
||||
# a prior outage so the first call after recovery
|
||||
# isn't gated on a stale failure count (#16788).
|
||||
_reset_server_error(self.name)
|
||||
reason = await self._wait_for_lifecycle_event()
|
||||
if reason == "reconnect":
|
||||
logger.info(
|
||||
|
|
@ -2160,6 +2207,10 @@ class MCPServerTask:
|
|||
self.session = session
|
||||
await self._discover_tools()
|
||||
self._ready.set()
|
||||
# Session is live again: clear any breaker state from a
|
||||
# prior outage so the first call after recovery isn't
|
||||
# gated on a stale consecutive-failure count (#16788).
|
||||
_reset_server_error(self.name)
|
||||
reason = await self._wait_for_lifecycle_event()
|
||||
if reason == "reconnect":
|
||||
logger.info(
|
||||
|
|
@ -2298,6 +2349,15 @@ class MCPServerTask:
|
|||
"manual refresh)",
|
||||
self.name,
|
||||
)
|
||||
# A clean transport return only happens after a session was
|
||||
# successfully established and then asked to rebuild (auth
|
||||
# recovery / manual refresh / breaker-driven reconnect). That
|
||||
# is proof the server is reachable, so clear the consecutive-
|
||||
# failure budget — otherwise transient drops accumulated over
|
||||
# a long-lived session would eventually exhaust it and
|
||||
# permanently kill an otherwise-healthy server.
|
||||
retries = 0
|
||||
backoff = 1.0
|
||||
# Reset the session reference; _run_http/_run_stdio will
|
||||
# repopulate it on successful re-entry.
|
||||
self.session = None
|
||||
|
|
@ -2374,10 +2434,32 @@ class MCPServerTask:
|
|||
if retries > _MAX_RECONNECT_RETRIES:
|
||||
logger.warning(
|
||||
"MCP server '%s' failed after %d reconnection attempts, "
|
||||
"giving up: %s",
|
||||
"parking until a reconnect is requested: %s",
|
||||
self.name, _MAX_RECONNECT_RETRIES, exc,
|
||||
)
|
||||
return
|
||||
# Do NOT return — exiting the task orphans the server:
|
||||
# nothing would ever listen for _reconnect_event again,
|
||||
# so a half-open circuit-breaker probe could never revive
|
||||
# it and the server would be permanently wedged for the
|
||||
# life of the process (#16788). Instead, drop the phantom
|
||||
# tools from the registry and park as a dormant listener.
|
||||
# A future _reconnect_event.set() — from the breaker's
|
||||
# half-open probe, OAuth recovery, or a manual /mcp
|
||||
# refresh — wakes us to rebuild the transport (respawning
|
||||
# a dead stdio subprocess in the process).
|
||||
self._deregister_tools()
|
||||
self._reconnect_event.clear()
|
||||
parked = await self._wait_for_reconnect_or_shutdown()
|
||||
if parked == "shutdown":
|
||||
return
|
||||
logger.info(
|
||||
"MCP server '%s': reconnect requested while parked; "
|
||||
"rebuilding transport.",
|
||||
self.name,
|
||||
)
|
||||
retries = 0
|
||||
backoff = 1.0
|
||||
continue
|
||||
|
||||
logger.warning(
|
||||
"MCP server '%s' connection lost (attempt %d/%d), "
|
||||
|
|
@ -2403,8 +2485,6 @@ class MCPServerTask:
|
|||
|
||||
async def shutdown(self):
|
||||
"""Signal the Task to exit and wait for clean resource teardown."""
|
||||
from tools.registry import registry
|
||||
|
||||
self._shutdown_event.set()
|
||||
# Defensive: if _wait_for_lifecycle_event is blocking, we need ANY
|
||||
# event to unblock it. _shutdown_event alone is sufficient (the
|
||||
|
|
@ -2430,11 +2510,24 @@ class MCPServerTask:
|
|||
task.cancel()
|
||||
await asyncio.gather(*self._pending_refresh_tasks, return_exceptions=True)
|
||||
self._pending_refresh_tasks.clear()
|
||||
self._deregister_tools()
|
||||
self.session = None
|
||||
|
||||
def _deregister_tools(self) -> None:
    """Remove this server's tools from the global registry (idempotent).

    Each name in ``_registered_tool_names`` is deregistered and its
    tool-to-server mapping forgotten, so the agent stops advertising the
    tools to the model.  The name list is then emptied and ``self.session``
    is cleared.  Called on shutdown AND when the reconnect budget is
    exhausted, so a dead server never leaves phantom tool definitions
    bloating the prompt cache and producing "not connected" errors on
    every turn.
    """
    from tools.registry import registry

    # Snapshot the names so the loop never iterates a list that a callee
    # might mutate; a missing attribute is treated as "nothing registered".
    registered = tuple(getattr(self, "_registered_tool_names", []))
    for name in registered:
        registry.deregister(name)
        _forget_mcp_tool_server(name)

    self._registered_tool_names = []
    self.session = None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -2491,6 +2584,31 @@ def _reset_server_error(server_name: str) -> None:
|
|||
_server_error_counts[server_name] = 0
|
||||
_server_breaker_opened_at.pop(server_name, None)
|
||||
|
||||
|
||||
def _signal_reconnect(server: Any) -> bool:
    """Request a transport rebuild from a server task in a thread-safe way.

    Tool handlers execute on caller threads, whereas the server task and
    its ``_reconnect_event`` belong to the background MCP loop.  An
    asyncio.Event must not be set directly from a foreign thread, so the
    set is scheduled with ``loop.call_soon_threadsafe`` whenever that loop
    is running.  A plain ``.set()`` is used only when no loop is running
    (e.g. unit tests that drive the handler synchronously).

    Returns:
        True when a reconnect signal was delivered or scheduled, False when
        the server exposes no ``_reconnect_event`` (nothing to revive).
    """
    reconnect_event = getattr(server, "_reconnect_event", None)
    if reconnect_event is None:
        return False

    # Read the module global once so the check and the call see the same loop.
    mcp_loop = _mcp_loop
    loop_is_live = mcp_loop is not None and mcp_loop.is_running()
    if loop_is_live:
        mcp_loop.call_soon_threadsafe(reconnect_event.set)
    else:
        reconnect_event.set()
    return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auth-failure detection helpers (Task 6 of MCP OAuth consolidation)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
|
@ -3176,12 +3294,35 @@ def _make_tool_handler(server_name: str, tool_name: str, tool_timeout: float):
|
|||
|
||||
with _lock:
|
||||
server = _servers.get(server_name)
|
||||
if not server or not server.session:
|
||||
if not server:
|
||||
_bump_server_error(server_name)
|
||||
return json.dumps({
|
||||
"error": f"MCP server '{server_name}' is not connected"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
if not server.session:
|
||||
# No live session — the server task is reconnecting, or it has
|
||||
# exhausted its retry budget and parked (e.g. a dead stdio
|
||||
# subprocess). Probing here would write into a dead/absent
|
||||
# transport and re-arm the breaker forever (#16788). Instead,
|
||||
# ask the (always-present) server task to rebuild the transport
|
||||
# — which respawns a dead stdio subprocess — and return a clean
|
||||
# "reconnecting" error so the model backs off without burning
|
||||
# iterations. The breaker resets once the fresh session
|
||||
# initializes (_run_stdio/_run_http call _reset_server_error).
|
||||
_bump_server_error(server_name)
|
||||
if _signal_reconnect(server):
|
||||
return json.dumps({
|
||||
"error": (
|
||||
f"MCP server '{server_name}' transport is down; "
|
||||
f"reconnect requested. Do NOT retry this tool "
|
||||
f"immediately — give it a few seconds to come back."
|
||||
)
|
||||
}, ensure_ascii=False)
|
||||
return json.dumps({
|
||||
"error": f"MCP server '{server_name}' is not connected"
|
||||
}, ensure_ascii=False)
|
||||
|
||||
async def _call():
|
||||
async with server._rpc_lock:
|
||||
# Snapshot the agent's context so an elicitation callback
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue