mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-06-21 10:22:18 +00:00
fix(mcp): address adversarial review round 2 (stale-publish race, parity holes)
Second review pass (Codex + Hermes subagent). Codex reproduced a real race with a two-thread harness; both converged on the remaining issues.

- Generation-aware publish (fixes a lost-update race): two refresh callers (the late-refresh daemon and the between-turns prologue around turn 1) could each compute a snapshot outside the lock; a SLOWER caller holding an OLDER registry generation could acquire the publish lock after a newer caller and clobber it, deleting just-landed tools. refresh_agent_mcp_tools now captures registry._generation before computing and refuses to publish a stale set; agent._tool_snapshot_generation tracks the published generation.
- Context-engine routing names (_context_engine_tool_names) are now staged on a local and published atomically with the snapshot, and only claimed when this rebuild actually appended the schema — matching agent_init's dedup so a registry/plugin tool of the same name keeps its own dispatch. (Previously mutated live, before the publish lock, and on no-change refreshes.)
- CLI /reload-mcp: self.enabled_toolsets is resolved once at startup, so a server newly ENABLED in config mid-session wasn't picked up (TUI already re-resolved). Merge now-connected MCP server names into the override (unless the user pinned all/*), mirroring startup, and keep self.enabled_toolsets in sync. Closes the CLI/TUI parity hole.
- ACP (acp_adapter/server.py) is now routed through the shared helper — it was a 5th sibling rebuild that re-injected memory tools but NOT context-engine tools and bypassed the atomic/name-diff path (inert today, fragile).
- mcp_startup._resolve_discovery_timeout pulls its default from DEFAULT_CONFIG (single source of truth) instead of a stale hardcoded 5.0 literal.
- Tests: stale-generation-no-clobber, _skip_mcp_refresh honored, timeout fallback uses DEFAULT_CONFIG.
This commit is contained in:
parent
b6e2a54a94
commit
88d523220f
7 changed files with 137 additions and 40 deletions
|
|
@ -823,24 +823,21 @@ class HermesACPAgent(acp.Agent):
|
|||
return
|
||||
|
||||
try:
|
||||
from model_tools import get_tool_definitions
|
||||
from agent.memory_manager import inject_memory_provider_tools
|
||||
from tools.mcp_tool import refresh_agent_mcp_tools
|
||||
|
||||
enabled_toolsets = _expand_acp_enabled_toolsets(
|
||||
getattr(state.agent, "enabled_toolsets", None) or ["hermes-acp"],
|
||||
mcp_server_names=[server.name for server in mcp_servers],
|
||||
)
|
||||
state.agent.enabled_toolsets = enabled_toolsets
|
||||
disabled_toolsets = getattr(state.agent, "disabled_toolsets", None)
|
||||
state.agent.tools = get_tool_definitions(
|
||||
enabled_toolsets=enabled_toolsets,
|
||||
disabled_toolsets=disabled_toolsets,
|
||||
# Route through the shared helper (name-diff, atomic publish, and —
|
||||
# critically — additive-preserving so memory-provider AND context-
|
||||
# engine tools survive). enabled_override applies the ACP-expanded
|
||||
# toolset and stores it on the agent, matching prior behavior.
|
||||
refresh_agent_mcp_tools(
|
||||
state.agent,
|
||||
enabled_override=enabled_toolsets,
|
||||
quiet_mode=True,
|
||||
)
|
||||
state.agent.valid_tool_names = {
|
||||
tool["function"]["name"] for tool in state.agent.tools or []
|
||||
}
|
||||
inject_memory_provider_tools(state.agent)
|
||||
invalidate = getattr(state.agent, "_invalidate_system_prompt", None)
|
||||
if callable(invalidate):
|
||||
invalidate()
|
||||
|
|
|
|||
|
|
@ -535,6 +535,14 @@ def init_agent(
|
|||
# Set on internal forks (e.g. background_review) that must keep ``tools[]``
|
||||
# byte-identical to a parent for provider cache parity.
|
||||
agent._skip_mcp_refresh = False
|
||||
# Registry generation the current tool snapshot was derived from. Lets a
|
||||
# late/concurrent refresh reject a stale (older-generation) rebuild instead
|
||||
# of clobbering a newer one. See tools.mcp_tool.refresh_agent_mcp_tools.
|
||||
try:
|
||||
from tools.registry import registry as _registry
|
||||
agent._tool_snapshot_generation = _registry._generation
|
||||
except Exception:
|
||||
agent._tool_snapshot_generation = 0
|
||||
# Rate limit tracking — updated from x-ratelimit-* response headers
|
||||
# after each API call. Accessed by /usage slash command.
|
||||
agent._rate_limit_state: Optional["RateLimitState"] = None
|
||||
|
|
|
|||
21
cli.py
21
cli.py
|
|
@ -9668,13 +9668,28 @@ class HermesCLI(CLIAgentSetupMixin, CLICommandsMixin):
|
|||
# memory-provider and context-engine tools survive the rebuild).
|
||||
if self.agent is not None:
|
||||
from tools.mcp_tool import refresh_agent_mcp_tools
|
||||
# Explicit reload: re-resolve enabled toolsets so a server the
|
||||
# user just enabled in config this session is picked up.
|
||||
# Explicit reload: pick up MCP servers the user ENABLED in config
|
||||
# this session. self.enabled_toolsets was resolved once at
|
||||
# startup; merge in any now-connected server names (unless the
|
||||
# user pinned `all`/`*`, which already includes everything) so a
|
||||
# freshly-added server isn't filtered out. Mirrors startup, where
|
||||
# MCP server names are part of enabled_toolsets (see __init__).
|
||||
enabled_override = None
|
||||
et = self.enabled_toolsets
|
||||
if et and "all" not in et and "*" not in et:
|
||||
merged = list(et)
|
||||
for _name in sorted(connected_servers):
|
||||
if _name not in merged:
|
||||
merged.append(_name)
|
||||
enabled_override = merged
|
||||
refresh_agent_mcp_tools(
|
||||
self.agent,
|
||||
enabled_override=self.enabled_toolsets,
|
||||
enabled_override=enabled_override,
|
||||
quiet_mode=True,
|
||||
)
|
||||
# Keep the CLI's own list in sync with what the agent now uses.
|
||||
if enabled_override is not None:
|
||||
self.enabled_toolsets = enabled_override
|
||||
|
||||
# Inject a message at the END of conversation history so the
|
||||
# model knows tools changed. Appended after all existing
|
||||
|
|
|
|||
|
|
@ -54,20 +54,22 @@ def start_background_mcp_discovery(*, logger, thread_name: str) -> None:
|
|||
def _resolve_discovery_timeout(explicit: "float | None") -> float:
|
||||
"""Resolve the MCP discovery wait bound: explicit arg > config > default.
|
||||
|
||||
Reads ``mcp_discovery_timeout`` from config.yaml. Kept lazy and
|
||||
fail-safe — a missing/invalid value falls back to the historical 0.75s so
|
||||
a broken config can never make startup hang or crash.
|
||||
Reads ``mcp_discovery_timeout`` from config.yaml, defaulting to the value in
|
||||
``DEFAULT_CONFIG`` (single source of truth) when the key is absent. Kept lazy
|
||||
and fail-safe — a missing/invalid value or a broken config falls back to a
|
||||
short safe bound so startup can never hang or crash.
|
||||
"""
|
||||
if explicit is not None:
|
||||
return explicit
|
||||
try:
|
||||
from hermes_cli.config import load_config
|
||||
from hermes_cli.config import load_config, DEFAULT_CONFIG
|
||||
|
||||
raw = (load_config() or {}).get("mcp_discovery_timeout", 5.0)
|
||||
default = float(DEFAULT_CONFIG.get("mcp_discovery_timeout", 1.5))
|
||||
raw = (load_config() or {}).get("mcp_discovery_timeout", default)
|
||||
val = float(raw)
|
||||
return val if val > 0 else 0.75
|
||||
return val if val > 0 else default
|
||||
except Exception:
|
||||
return 0.75
|
||||
return 1.5
|
||||
|
||||
|
||||
def wait_for_mcp_discovery(timeout: "float | None" = None) -> None:
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ class _FakeAgent:
|
|||
self.valid_tool_names = set()
|
||||
self.enabled_toolsets = None
|
||||
self.disabled_toolsets = None
|
||||
self._skip_mcp_refresh = False
|
||||
self.compression_enabled = False
|
||||
self.context_compressor = types.SimpleNamespace(
|
||||
protect_first_n=2, protect_last_n=2
|
||||
|
|
@ -225,6 +226,21 @@ def test_between_turns_refresh_skipped_when_no_servers():
|
|||
gtd.assert_not_called()
|
||||
|
||||
|
||||
def test_between_turns_refresh_skipped_when_skip_flag_set():
|
||||
"""Internal forks (background_review) set _skip_mcp_refresh to keep tools[]
|
||||
byte-identical to the parent for cache parity — the hook must honor it even
|
||||
when MCP servers are registered."""
|
||||
agent = _FakeAgent()
|
||||
agent._skip_mcp_refresh = True
|
||||
import model_tools
|
||||
|
||||
with patch("tools.mcp_tool.has_registered_mcp_tools", return_value=True), \
|
||||
patch.object(model_tools, "get_tool_definitions") as gtd:
|
||||
_build(agent)
|
||||
|
||||
gtd.assert_not_called()
|
||||
|
||||
|
||||
def test_between_turns_refresh_no_churn_when_unchanged():
|
||||
"""R2: an unchanged tool set leaves the snapshot object identity intact
|
||||
(no needless swap → nothing for the next request prefix to diff against)."""
|
||||
|
|
|
|||
|
|
@ -253,12 +253,35 @@ def test_resolve_discovery_timeout_falls_back_on_bad_value(monkeypatch):
|
|||
from hermes_cli import mcp_startup
|
||||
import hermes_cli.config as cfg
|
||||
|
||||
# Non-positive / unparsable → historical safe default, never hang.
|
||||
# Non-positive → DEFAULT_CONFIG value; unparsable → the except-branch literal (1.5), which this test assumes equals the DEFAULT_CONFIG value. Never hang.
|
||||
default = float(cfg.DEFAULT_CONFIG.get("mcp_discovery_timeout", 1.5))
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": 0})
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == 0.75
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == default
|
||||
|
||||
monkeypatch.setattr(cfg, "load_config", lambda: {"mcp_discovery_timeout": "oops"})
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == 0.75
|
||||
assert mcp_startup._resolve_discovery_timeout(None) == default
|
||||
|
||||
|
||||
def test_stale_generation_refresh_does_not_clobber_newer(monkeypatch):
|
||||
"""A slower refresh that computed an OLDER registry generation must not
|
||||
overwrite a snapshot a newer-generation refresh already published."""
|
||||
from tools import registry as _reg_mod
|
||||
|
||||
agent = _agent(["read_file"])
|
||||
# A newer refresh already published generation = current+5, with two tools.
|
||||
agent._tool_snapshot_generation = _reg_mod.registry._generation + 5
|
||||
agent.tools = [_tool("read_file"), _tool("mcp_new_tool")]
|
||||
agent.valid_tool_names = {"read_file", "mcp_new_tool"}
|
||||
|
||||
import model_tools
|
||||
# This (stale) refresh computes only the old single-tool set.
|
||||
monkeypatch.setattr(model_tools, "get_tool_definitions", lambda **kw: [_tool("read_file")])
|
||||
|
||||
added = mcp_tool.refresh_agent_mcp_tools(agent)
|
||||
|
||||
# Stale write rejected: the newer tool survives.
|
||||
assert added == set()
|
||||
assert "mcp_new_tool" in agent.valid_tool_names
|
||||
|
||||
|
||||
def test_wait_returns_instantly_when_no_discovery_thread(monkeypatch):
|
||||
|
|
|
|||
|
|
@ -4292,6 +4292,7 @@ def refresh_agent_mcp_tools(
|
|||
at a turn boundary, before that turn's ``tools=`` prefix is assembled).
|
||||
"""
|
||||
from model_tools import get_tool_definitions
|
||||
from tools.registry import registry
|
||||
|
||||
# Explicit reloads (/reload-mcp) pass freshly-resolved toolsets so a server
|
||||
# the user just ENABLED in config is picked up; the agent's stored selection
|
||||
|
|
@ -4306,10 +4307,18 @@ def refresh_agent_mcp_tools(
|
|||
enabled = getattr(agent, "enabled_toolsets", None)
|
||||
disabled = getattr(agent, "disabled_toolsets", None)
|
||||
|
||||
# Capture the registry generation this rebuild is derived from BEFORE the
|
||||
# (potentially slow) get_tool_definitions call. Used at publish time to
|
||||
# reject a stale write: if two callers race (e.g. the late-refresh daemon
|
||||
# and the between-turns prologue around turn 1), a slower caller that
|
||||
# computed an OLDER set must not clobber a newer set another caller already
|
||||
# published. ``registry._generation`` bumps on every (de)register.
|
||||
snapshot_generation = registry._generation
|
||||
|
||||
# Registry-derived tools (built-ins + MCP), filtered to the agent's toolsets.
|
||||
# Computed OUTSIDE the lock (get_tool_definitions can be slow); the diff and
|
||||
# publish below happen together in ONE critical section so two concurrent
|
||||
# callers can't compute overlapping ``added`` sets or torn-publish.
|
||||
# callers can't publish a torn snapshot or compute overlapping ``added`` sets.
|
||||
new_defs = list(
|
||||
get_tool_definitions(
|
||||
enabled_toolsets=enabled,
|
||||
|
|
@ -4323,40 +4332,63 @@ def refresh_agent_mcp_tools(
|
|||
# Re-append the post-build injected families that get_tool_definitions does
|
||||
# NOT reproduce, so a refresh never strips them (memory-provider + context-
|
||||
# engine tools). Staged entirely on LOCALS — the live ``agent.tools`` /
|
||||
# ``valid_tool_names`` are never touched until the single atomic publish
|
||||
# below, so a concurrent reader (``build_api_kwargs``) can't see a partial
|
||||
# rebuild or a cross-attribute half-swap.
|
||||
_reinject_post_build_tools(agent, new_defs, new_names)
|
||||
# ``valid_tool_names`` / ``_context_engine_tool_names`` are never touched
|
||||
# until the single atomic publish below, so a concurrent reader
|
||||
# (``build_api_kwargs``) can't see a partial rebuild or a cross-attribute
|
||||
# half-swap. ``staged_engine_names`` are the context-engine routing names
|
||||
# this rebuild actually appended (matching agent_init's dedup-aware add).
|
||||
staged_engine_names = _reinject_post_build_tools(agent, new_defs, new_names)
|
||||
|
||||
# Single atomic read-diff-publish so the returned ``added`` is consistent
|
||||
# with what was actually published, even under concurrent callers.
|
||||
# with what was actually published, even under concurrent callers, and a
|
||||
# stale (older-generation) rebuild can't overwrite a newer published one.
|
||||
with _agent_tools_lock:
|
||||
published_gen = getattr(agent, "_tool_snapshot_generation", -1)
|
||||
if snapshot_generation < published_gen:
|
||||
# A newer snapshot already won; our set is stale — drop it.
|
||||
return set()
|
||||
current = {
|
||||
t["function"]["name"]
|
||||
for t in (getattr(agent, "tools", None) or [])
|
||||
}
|
||||
if new_names == current:
|
||||
return set() # no change → leave the live snapshot untouched (no churn)
|
||||
# No change → leave the live snapshot untouched (no churn), but
|
||||
# record the generation so an in-flight older caller can't clobber.
|
||||
agent._tool_snapshot_generation = max(published_gen, snapshot_generation)
|
||||
return set()
|
||||
agent.tools = new_defs
|
||||
agent.valid_tool_names = new_names
|
||||
# Publish context-engine routing names atomically with the snapshot.
|
||||
engine_names = getattr(agent, "_context_engine_tool_names", None)
|
||||
if isinstance(engine_names, set):
|
||||
engine_names.clear()
|
||||
engine_names.update(staged_engine_names)
|
||||
agent._tool_snapshot_generation = max(published_gen, snapshot_generation)
|
||||
return new_names - current
|
||||
|
||||
|
||||
def _reinject_post_build_tools(agent, tools_list: list, name_set: set) -> None:
|
||||
def _reinject_post_build_tools(agent, tools_list: list, name_set: set) -> set:
|
||||
"""Append memory-provider and context-engine tools onto staged locals.
|
||||
|
||||
Mirrors the post-``get_tool_definitions`` injection in ``agent_init`` so a
|
||||
snapshot rebuild reconstructs the FULL tool surface, not just the
|
||||
registry-derived subset. Operates on the caller's staged ``tools_list`` /
|
||||
``name_set`` (NOT the live agent attributes) so the rebuild stays atomic.
|
||||
registry-derived subset. Operates ONLY on the caller's staged ``tools_list``
|
||||
/ ``name_set`` (never the live agent attributes) so the rebuild stays atomic.
|
||||
Idempotent (skips names already present) and fail-soft.
|
||||
|
||||
Returns the set of context-engine routing names actually appended by THIS
|
||||
rebuild — matching ``agent_init``'s dedup behavior (a name already provided
|
||||
by a registry/plugin tool is NOT claimed for context-engine routing). The
|
||||
caller publishes this into ``agent._context_engine_tool_names`` atomically
|
||||
with the snapshot.
|
||||
"""
|
||||
def _add(schema: dict) -> None:
|
||||
def _add(schema: dict) -> bool:
|
||||
name = schema.get("name", "")
|
||||
if not name or name in name_set:
|
||||
return
|
||||
return False
|
||||
tools_list.append({"type": "function", "function": schema})
|
||||
name_set.add(name)
|
||||
return True
|
||||
|
||||
# Memory-provider tools (mem0/honcho/byterover/supermemory/…).
|
||||
try:
|
||||
|
|
@ -4378,23 +4410,27 @@ def _reinject_post_build_tools(agent, tools_list: list, name_set: set) -> None:
|
|||
# restricted-toolset platform (e.g. platform_toolsets: telegram: []) would
|
||||
# re-leak lcm_* tools the build deliberately excluded, and pay the local-
|
||||
# model latency penalty.
|
||||
staged_engine_names: set = set()
|
||||
try:
|
||||
enabled = getattr(agent, "enabled_toolsets", None)
|
||||
context_engine_allowed = enabled is None or "context_engine" in enabled
|
||||
compressor = getattr(agent, "context_compressor", None)
|
||||
get_schemas = getattr(compressor, "get_tool_schemas", None) if compressor else None
|
||||
if context_engine_allowed and callable(get_schemas):
|
||||
engine_names = getattr(agent, "_context_engine_tool_names", None)
|
||||
for schema in get_schemas():
|
||||
if not isinstance(schema, dict):
|
||||
continue
|
||||
name = schema.get("name", "")
|
||||
_add(schema)
|
||||
if name and isinstance(engine_names, set):
|
||||
engine_names.add(name)
|
||||
# Only claim the routing name when WE appended the schema, so a
|
||||
# name already owned by a registry/plugin tool keeps its own
|
||||
# dispatch (matches agent_init.py's `continue`-before-claim).
|
||||
if _add(schema) and name:
|
||||
staged_engine_names.add(name)
|
||||
except Exception:
|
||||
logger.debug("Context-engine tool re-injection skipped", exc_info=True)
|
||||
|
||||
return staged_engine_names
|
||||
|
||||
|
||||
def shutdown_mcp_servers():
|
||||
"""Close all MCP server connections and stop the background loop.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue