mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-22 10:32:00 +00:00
fix(mcp): expose late-connecting MCP tools to the agent (TUI/CLI/gateway)
MCP servers that connect after the agent's one-time tool snapshot were invisible for the whole session. Two root causes, fixed together: 1. The startup discovery wait was a flat 0.75s. HTTP/OAuth servers commonly take 2-6s on a cold connect, so they missed the window and their tools never entered the agent's snapshot. `thread.join(timeout)` already returns the instant discovery completes, so raising the bound costs ~0s for the common case (no MCP / fast servers) and only ever blocks for a genuinely-pending server, capped so a dead server can't freeze startup. The bound is now configurable via `mcp_discovery_timeout` (config.yaml, default 5.0s). 2. Three call sites duplicated the agent tool-snapshot rebuild (the TUI `reload.mcp` RPC, the gateway reload, and the TUI late-binding refresh thread), and the late-refresh detected changes by tool COUNT — missing an equal-size add/remove swap. Consolidated into one shared `tools.mcp_tool.refresh_agent_mcp_tools(agent)` helper that diffs by tool NAME, mutates the agent under a lock (thread-safe), and respects the agent's own enabled/disabled toolsets. The late-binding refresh keeps its pre-first-turn cache-safety guard: it never rebuilds the tool list once a turn has started, so the cached prompt prefix is never invalidated mid-conversation. Tests: new tests/tools/test_refresh_agent_mcp_tools.py covers the name-based diff, in-place mutation, agent-scoped filtering, thread safety, and the config-driven discovery bound (incl. instant-return when nothing is pending). 75 passed across the touched areas.
This commit is contained in:
parent
746c46d610
commit
93d6e73028
7 changed files with 297 additions and 41 deletions
|
|
@ -11656,7 +11656,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
# consented to the prompt-cache invalidation via the slash-confirm
|
||||
# gate in _handle_reload_mcp_command before we reach this point.
|
||||
try:
|
||||
from model_tools import get_tool_definitions
|
||||
from tools.mcp_tool import refresh_agent_mcp_tools
|
||||
_cache = getattr(self, "_agent_cache", None)
|
||||
_cache_lock = getattr(self, "_agent_cache_lock", None)
|
||||
if _cache_lock is not None and _cache:
|
||||
|
|
@ -11668,15 +11668,7 @@ class GatewayRunner(GatewayAuthorizationMixin, GatewayKanbanWatchersMixin, Gatew
|
|||
continue
|
||||
if _agent is None:
|
||||
continue
|
||||
new_defs = get_tool_definitions(
|
||||
enabled_toolsets=getattr(_agent, "enabled_toolsets", None),
|
||||
disabled_toolsets=getattr(_agent, "disabled_toolsets", None),
|
||||
quiet_mode=True,
|
||||
)
|
||||
_agent.tools = new_defs
|
||||
_agent.valid_tool_names = {
|
||||
t["function"]["name"] for t in new_defs
|
||||
} if new_defs else set()
|
||||
refresh_agent_mcp_tools(_agent, quiet_mode=True)
|
||||
except Exception as _exc:
|
||||
logger.debug(
|
||||
"Failed to update cached agent tools after MCP reload: %s",
|
||||
|
|
|
|||
|
|
@ -1201,6 +1201,19 @@ DEFAULT_CONFIG = {
|
|||
# 100K chars ≈ 25–35K tokens across typical tokenisers.
|
||||
"file_read_max_chars": 100_000,
|
||||
|
||||
# Seconds to wait at agent-build time for in-flight MCP server discovery
|
||||
# to finish before the agent snapshots its tool list. MCP discovery runs
|
||||
# in a background thread so a slow/dead server can't freeze startup; this
|
||||
# bounds how long the first agent build blocks on it. The wait returns
|
||||
# the INSTANT discovery completes, so users with no MCP servers (the common
|
||||
# case) or fast servers pay ~0s regardless of this value — the bound is
|
||||
# only reached when a server is genuinely still connecting. The old 0.75s
|
||||
# default was too short for HTTP/OAuth servers (which can take 2–6s on a
|
||||
# cold connect), so their tools were invisible for the whole session.
|
||||
# Slow servers that miss this window are still picked up by the automatic
|
||||
# late-binding refresh, so this is a UX/latency knob, not a correctness one.
|
||||
"mcp_discovery_timeout": 5.0,
|
||||
|
||||
# Tool-output truncation thresholds. When terminal output or a
|
||||
# single read_file page exceeds these limits, Hermes truncates the
|
||||
# payload sent to the model (keeping head + tail for terminal,
|
||||
|
|
|
|||
|
|
@ -51,9 +51,36 @@ def start_background_mcp_discovery(*, logger, thread_name: str) -> None:
|
|||
thread.start()
|
||||
|
||||
|
||||
def wait_for_mcp_discovery(timeout: float = 0.75) -> None:
|
||||
"""Briefly wait for background MCP discovery before the first tool snapshot."""
|
||||
def _resolve_discovery_timeout(explicit: "float | None") -> float:
|
||||
"""Resolve the MCP discovery wait bound: explicit arg > config > default.
|
||||
|
||||
Reads ``mcp_discovery_timeout`` from config.yaml. Kept lazy and
|
||||
fail-safe — a missing/invalid value falls back to the historical 0.75s so
|
||||
a broken config can never make startup hang or crash.
|
||||
"""
|
||||
if explicit is not None:
|
||||
return explicit
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
|
||||
raw = (load_config() or {}).get("mcp_discovery_timeout", 5.0)
|
||||
val = float(raw)
|
||||
return val if val > 0 else 0.75
|
||||
except Exception:
|
||||
return 0.75
|
||||
|
||||
|
||||
def wait_for_mcp_discovery(timeout: "float | None" = None) -> None:
|
||||
"""Wait for background MCP discovery before the first tool snapshot.
|
||||
|
||||
``thread.join(timeout)`` returns the INSTANT discovery completes, so this
|
||||
only ever blocks for the real connect time of a still-pending server —
|
||||
users with no MCP servers or fast servers pay ~0s. The bound (from
|
||||
``mcp_discovery_timeout`` in config) just caps the wait so a dead server
|
||||
can't freeze startup; servers that miss it are picked up by the automatic
|
||||
late-binding refresh.
|
||||
"""
|
||||
thread = _mcp_discovery_thread
|
||||
if thread is None or not thread.is_alive():
|
||||
return
|
||||
thread.join(timeout=timeout)
|
||||
thread.join(timeout=_resolve_discovery_timeout(timeout))
|
||||
|
|
|
|||
173
tests/tools/test_refresh_agent_mcp_tools.py
Normal file
173
tests/tools/test_refresh_agent_mcp_tools.py
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
"""Tests for the shared MCP agent-tool refresh helper and discovery-wait bound.
|
||||
|
||||
``refresh_agent_mcp_tools`` is the single rebuild path used by the TUI
|
||||
``reload.mcp`` RPC, the gateway reload, and the late-binding refresh thread —
|
||||
so a slow MCP server that connects after the agent's one-time tool snapshot is
|
||||
picked up everywhere identically. These assert the *contracts* those callers
|
||||
rely on (name-based diff, in-place mutation, agent-scoped filtering) rather than
|
||||
freezing any particular tool list.
|
||||
"""
|
||||
|
||||
import threading
|
||||
import types
|
||||
|
||||
from tools import mcp_tool
|
||||
|
||||
|
||||
def _tool(name):
|
||||
return {"type": "function", "function": {"name": name, "description": "", "parameters": {}}}
|
||||
|
||||
|
||||
def _agent(tool_names, *, enabled=None, disabled=None):
|
||||
a = types.SimpleNamespace()
|
||||
a.tools = [_tool(n) for n in tool_names]
|
||||
a.valid_tool_names = set(tool_names)
|
||||
a.enabled_toolsets = enabled
|
||||
a.disabled_toolsets = disabled
|
||||
return a
|
||||
|
||||
|
||||
def test_refresh_adds_late_landing_tools(monkeypatch):
|
||||
"""A server that registers after build → its tools land in the snapshot."""
|
||||
agent = _agent(["read_file", "terminal"])
|
||||
|
||||
new_defs = [_tool(n) for n in ("read_file", "terminal", "mcp_granola_get_account_info")]
|
||||
monkeypatch.setattr(mcp_tool, "get_tool_definitions", lambda **kw: new_defs, raising=False)
|
||||
# get_tool_definitions is imported inside the helper from model_tools, so patch there too.
|
||||
import model_tools
|
||||
monkeypatch.setattr(model_tools, "get_tool_definitions", lambda **kw: new_defs)
|
||||
|
||||
added = mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
|
||||
assert added == {"mcp_granola_get_account_info"}
|
||||
assert "mcp_granola_get_account_info" in agent.valid_tool_names
|
||||
assert len(agent.tools) == 3
|
||||
|
||||
|
||||
def test_refresh_no_change_returns_empty_and_leaves_agent_untouched(monkeypatch):
|
||||
"""No new tools → empty set, and the snapshot object is not swapped."""
|
||||
agent = _agent(["read_file", "terminal"])
|
||||
original_tools = agent.tools
|
||||
|
||||
import model_tools
|
||||
monkeypatch.setattr(
|
||||
model_tools, "get_tool_definitions",
|
||||
lambda **kw: [_tool("read_file"), _tool("terminal")],
|
||||
)
|
||||
|
||||
added = mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
|
||||
assert added == set()
|
||||
assert agent.tools is original_tools # not replaced → no churn / no cache thrash
|
||||
|
||||
|
||||
def test_refresh_detects_equal_size_swap(monkeypatch):
|
||||
"""Name-based diff catches an add+remove of equal count (count-compare can't)."""
|
||||
agent = _agent(["a", "old_mcp_tool"]) # 2 tools
|
||||
|
||||
import model_tools
|
||||
# Same COUNT (2) but a different membership: old_mcp_tool removed, new added.
|
||||
monkeypatch.setattr(
|
||||
model_tools, "get_tool_definitions",
|
||||
lambda **kw: [_tool("a"), _tool("new_mcp_tool")],
|
||||
)
|
||||
|
||||
added = mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
|
||||
assert added == {"new_mcp_tool"}
|
||||
assert agent.valid_tool_names == {"a", "new_mcp_tool"}
|
||||
assert "old_mcp_tool" not in agent.valid_tool_names
|
||||
|
||||
|
||||
def test_refresh_passes_agent_toolset_filters(monkeypatch):
|
||||
"""The rebuild re-derives with the agent's OWN enabled/disabled toolsets."""
|
||||
agent = _agent(["a"], enabled=["coding", "granola"], disabled=["messaging"])
|
||||
seen = {}
|
||||
|
||||
import model_tools
|
||||
|
||||
def _capture(**kw):
|
||||
seen.update(kw)
|
||||
return [_tool("a"), _tool("b")]
|
||||
|
||||
monkeypatch.setattr(model_tools, "get_tool_definitions", _capture)
|
||||
|
||||
mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
|
||||
assert seen["enabled_toolsets"] == ["coding", "granola"]
|
||||
assert seen["disabled_toolsets"] == ["messaging"]
|
||||
|
||||
|
||||
def test_refresh_is_thread_safe_under_concurrent_calls(monkeypatch):
|
||||
"""Concurrent refreshes never leave tools / valid_tool_names inconsistent."""
|
||||
agent = _agent(["a"])
|
||||
|
||||
import model_tools
|
||||
defs = [_tool("a"), _tool("b"), _tool("c")]
|
||||
monkeypatch.setattr(model_tools, "get_tool_definitions", lambda **kw: defs)
|
||||
|
||||
errors = []
|
||||
|
||||
def _worker():
|
||||
try:
|
||||
for _ in range(50):
|
||||
mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
# Invariant: valid_tool_names must always match agent.tools.
|
||||
names = {t["function"]["name"] for t in agent.tools}
|
||||
assert agent.valid_tool_names == names
|
||||
except Exception as exc: # pragma: no cover - failure path
|
||||
errors.append(exc)
|
||||
|
||||
threads = [threading.Thread(target=_worker) for _ in range(4)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join(timeout=10)
|
||||
|
||||
assert not errors
|
||||
assert agent.valid_tool_names == {"a", "b", "c"}
|
||||
|
||||
|
||||
# ── discovery-wait bound (mcp_discovery_timeout config) ──────────────────────
|
||||
|
||||
|
||||
def test_resolve_discovery_timeout_explicit_wins(monkeypatch):
|
||||
from hermes_cli import mcp_startup
|
||||
|
||||
assert mcp_startup._resolve_discovery_timeout(2.5) == 2.5
|
||||
|
||||
|
||||
def test_resolve_discovery_timeout_reads_config(monkeypatch):
|
||||
from hermes_cli import mcp_startup
|
||||
|
||||
monkeypatch.setattr(mcp_startup, "load_config", None, raising=False)
|
||||
import hermes_cli.config as cfg
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": 8.0})
|
||||
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == 8.0
|
||||
|
||||
|
||||
def test_resolve_discovery_timeout_falls_back_on_bad_value(monkeypatch):
|
||||
from hermes_cli import mcp_startup
|
||||
import hermes_cli.config as cfg
|
||||
|
||||
# Non-positive / unparsable → historical safe default, never hang.
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": 0})
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == 0.75
|
||||
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": "oops"})
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == 0.75
|
||||
|
||||
|
||||
def test_wait_returns_instantly_when_no_discovery_thread(monkeypatch):
|
||||
"""The common case (no MCP / discovery done) pays ~0s regardless of bound."""
|
||||
import time
|
||||
from hermes_cli import mcp_startup
|
||||
|
||||
monkeypatch.setattr(mcp_startup, "_mcp_discovery_thread", None)
|
||||
import hermes_cli.config as cfg
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": 999.0})
|
||||
|
||||
t0 = time.time()
|
||||
mcp_startup.wait_for_mcp_discovery()
|
||||
assert time.time() - t0 < 0.2 # never blocks on the bound when nothing's pending
|
||||
|
|
@ -4228,6 +4228,61 @@ def probe_mcp_server_tools() -> Dict[str, List[tuple]]:
|
|||
return result
|
||||
|
||||
|
||||
# Serializes in-place mutation of an agent's tool snapshot. The reload RPC,
|
||||
# the gateway reload, and the late-binding refresh thread all swap
|
||||
# ``agent.tools`` / ``agent.valid_tool_names`` after the agent was built; the
|
||||
# agent's run loop reads those during tool iteration, so a concurrent write
|
||||
# mid-read could otherwise expose a half-updated list.
|
||||
_agent_tools_lock = threading.Lock()
|
||||
|
||||
|
||||
def refresh_agent_mcp_tools(agent, *, quiet_mode: bool = True) -> set:
|
||||
"""Re-derive an already-built agent's tool snapshot from the live registry.
|
||||
|
||||
The agent snapshots ``agent.tools`` once at build time and never re-reads
|
||||
the registry (see ``run_agent`` / ``agent_init``). When MCP servers connect
|
||||
*after* that snapshot — a slow HTTP/OAuth server that misses the bounded
|
||||
startup wait, or a ``/reload-mcp`` — their tools are invisible until the
|
||||
snapshot is rebuilt. This is the single shared rebuild used by every such
|
||||
caller (the TUI ``reload.mcp`` RPC, the gateway reload, and the late-binding
|
||||
refresh thread) so they can't drift apart again.
|
||||
|
||||
The rebuild respects the agent's own ``enabled_toolsets`` /
|
||||
``disabled_toolsets`` (the same filtering it was built with), diffs by tool
|
||||
**name** (not count — a count compare misses an equal-size add/remove swap),
|
||||
and mutates the agent under ``_agent_tools_lock``.
|
||||
|
||||
Returns the set of newly-added tool names (empty when nothing changed), so
|
||||
callers can decide whether to notify the user / re-emit session info. The
|
||||
caller owns the prompt-cache contract: this helper does NOT check turn state,
|
||||
because each caller has a different policy (``/reload-mcp`` rebuilds after
|
||||
explicit user consent; the late-binding thread only rebuilds pre-first-turn).
|
||||
"""
|
||||
from model_tools import get_tool_definitions
|
||||
|
||||
with _agent_tools_lock:
|
||||
current = {
|
||||
t["function"]["name"]
|
||||
for t in (getattr(agent, "tools", None) or [])
|
||||
}
|
||||
|
||||
new_defs = get_tool_definitions(
|
||||
enabled_toolsets=getattr(agent, "enabled_toolsets", None),
|
||||
disabled_toolsets=getattr(agent, "disabled_toolsets", None),
|
||||
quiet_mode=quiet_mode,
|
||||
)
|
||||
new_names = {t["function"]["name"] for t in new_defs} if new_defs else set()
|
||||
|
||||
if new_names == current:
|
||||
return set()
|
||||
|
||||
with _agent_tools_lock:
|
||||
agent.tools = new_defs
|
||||
agent.valid_tool_names = new_names
|
||||
|
||||
return new_names - current
|
||||
|
||||
|
||||
def shutdown_mcp_servers():
|
||||
"""Close all MCP server connections and stop the background loop.
|
||||
|
||||
|
|
|
|||
|
|
@ -192,22 +192,32 @@ def _log_exit(reason: str) -> None:
|
|||
print(f"[gateway-exit] {reason}", file=sys.stderr, flush=True)
|
||||
|
||||
|
||||
def wait_for_mcp_discovery(timeout: float = 0.75) -> None:
|
||||
"""Briefly block until background MCP discovery finishes, up to ``timeout``.
|
||||
def wait_for_mcp_discovery(timeout: "float | None" = None) -> None:
|
||||
"""Block until background MCP discovery finishes, up to the resolved bound.
|
||||
|
||||
MCP discovery runs in a daemon thread spawned at startup (see main()) so a
|
||||
slow/dead server can't freeze ``gateway.ready``. But the agent snapshots
|
||||
its tool list ONCE at build time and never re-reads it, so a reachable-but-
|
||||
slow server that finishes connecting *after* the first prompt would be
|
||||
invisible for the whole session. Joining with a short bounded timeout
|
||||
before the first agent build lets already-spawning fast servers land
|
||||
without re-introducing the startup hang: a dead server simply isn't waited
|
||||
on beyond ``timeout``. No-op when no discovery thread was started.
|
||||
invisible for the whole session. Joining with a bounded timeout before the
|
||||
first agent build lets already-spawning servers land without re-introducing
|
||||
the startup hang: ``thread.join(timeout)`` returns the instant discovery
|
||||
completes (so fast/no-MCP startups pay ~0s), and a dead server is simply not
|
||||
waited on beyond the bound. No-op when no discovery thread was started.
|
||||
|
||||
The bound comes from ``mcp_discovery_timeout`` in config (shared with the
|
||||
CLI path via ``hermes_cli.mcp_startup``); ``timeout`` overrides it.
|
||||
"""
|
||||
thread = _mcp_discovery_thread
|
||||
if thread is None or not thread.is_alive():
|
||||
return
|
||||
thread.join(timeout=timeout)
|
||||
try:
|
||||
from hermes_cli.mcp_startup import _resolve_discovery_timeout
|
||||
|
||||
bound = _resolve_discovery_timeout(timeout)
|
||||
except Exception:
|
||||
bound = timeout if timeout is not None else 0.75
|
||||
thread.join(timeout=bound)
|
||||
|
||||
|
||||
def mcp_discovery_in_flight() -> bool:
|
||||
|
|
|
|||
|
|
@ -3557,26 +3557,19 @@ def _schedule_mcp_late_refresh(sid: str, agent) -> None:
|
|||
):
|
||||
return
|
||||
try:
|
||||
from model_tools import get_tool_definitions
|
||||
from tools.mcp_tool import refresh_agent_mcp_tools
|
||||
|
||||
new_defs = get_tool_definitions(
|
||||
enabled_toolsets=_load_enabled_toolsets(),
|
||||
quiet_mode=True,
|
||||
)
|
||||
added = refresh_agent_mcp_tools(agent, quiet_mode=True)
|
||||
except Exception as exc:
|
||||
logger.warning(
|
||||
"Late MCP refresh: get_tool_definitions failed for %s: %s",
|
||||
"Late MCP refresh: tool snapshot rebuild failed for %s: %s",
|
||||
sid,
|
||||
exc,
|
||||
)
|
||||
return
|
||||
# No change (discovery added nothing new) → don't churn the client.
|
||||
if len(new_defs or []) == len(getattr(agent, "tools", []) or []):
|
||||
# No new tools landed (discovery added nothing) → don't churn the client.
|
||||
if not added:
|
||||
return
|
||||
agent.tools = new_defs
|
||||
agent.valid_tool_names = (
|
||||
{t["function"]["name"] for t in new_defs} if new_defs else set()
|
||||
)
|
||||
info = _session_info(agent, session)
|
||||
# Emit outside the lock — write_json must not block under _sessions_lock.
|
||||
_emit("session.info", sid, info)
|
||||
|
|
@ -8414,16 +8407,9 @@ def _(rid, params: dict) -> dict:
|
|||
# The user already consented to the prompt-cache invalidation via
|
||||
# the confirm gate above. Mirrors gateway/run.py::_execute_mcp_reload.
|
||||
try:
|
||||
from model_tools import get_tool_definitions
|
||||
from tools.mcp_tool import refresh_agent_mcp_tools
|
||||
|
||||
new_defs = get_tool_definitions(
|
||||
enabled_toolsets=_load_enabled_toolsets(),
|
||||
quiet_mode=True,
|
||||
)
|
||||
agent.tools = new_defs
|
||||
agent.valid_tool_names = (
|
||||
{t["function"]["name"] for t in new_defs} if new_defs else set()
|
||||
)
|
||||
refresh_agent_mcp_tools(agent, quiet_mode=True)
|
||||
except Exception as _exc:
|
||||
logger.warning(
|
||||
"Failed to refresh cached agent tools after /reload-mcp: %s",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue