mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-05-18 04:41:56 +00:00
Wraps every sync->async coroutine-scheduling site in the codebase with a new agent.async_utils.safe_schedule_threadsafe() helper that closes the coroutine on scheduling failure (closed loop, shutdown race, etc.) instead of leaking it, which previously surfaced as 'coroutine was never awaited' RuntimeWarnings plus reference leaks. 22 production call sites were migrated across the codebase: - acp_adapter/events.py, acp_adapter/permissions.py - agent/lsp/manager.py - cron/scheduler.py (media + text delivery paths) - gateway/platforms/feishu.py (5 sites, via the existing _submit_on_loop helper, which now delegates to safe_schedule_threadsafe) - gateway/run.py (10 sites: telegram rename, agent:step hook, status callback, interim+bg-review, clarify send, exec-approval button+text, temp-bubble cleanup, channel-directory refresh) - plugins/memory/hindsight, plugins/platforms/google_chat - tools/browser_supervisor.py (3), browser_cdp_tool.py, computer_use/cua_backend.py, slash_confirm.py - tools/environments/modal.py (_AsyncWorker) - tools/mcp_tool.py (2 + 8 _run_on_mcp_loop callers converted to factory-style so the coroutine is never constructed on a dead loop) - tui_gateway/ws.py. Tests: new tests/agent/test_async_utils.py covers helper behavior under live loop, dead loop, None loop, and scheduling exceptions. Regression tests were added at three PR-original sites (acp events, acp permissions, mcp loop runner), mirroring the contributor's intent. 
Live-tested end-to-end: - Helper stress test: 1500 schedules across live/dead/race scenarios, zero leaked coroutines - Race exercised: 5000 schedules with the loop killed mid-flight, 100 ok / 4900 None returns, zero leaks - hermes chat -q with terminal tool call (exercises the step_callback bridge) - MCP probe against failing subprocess servers + factory path - Real gateway daemon boot + SIGINT shutdown across multiple platform adapter inits - WSTransport 100 live + 50 dead-loop writes - Cron delivery path live + dead loop. Salvages PR #2657 — adopts the contributor's intent across a much wider site list, using a single centralized helper instead of inline try/except at each site. 3 of the original PR's 6 sites no longer exist on main (environments/patches.py deleted, DingTalk refactored to native async); the equivalent fix lives in tools/environments/modal.py instead. Co-authored-by: JithendraNara <jithendranaidunara@gmail.com>
143 lines
5.6 KiB
Python
143 lines
5.6 KiB
Python
"""Tests for MCP tool structuredContent preservation."""
|
|
|
|
import asyncio
|
|
import json
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from tools import mcp_tool
|
|
|
|
|
|
class _FakeContentBlock:
    """Bare-bones stand-in for an MCP content block.

    Only the ``text`` and ``type`` attributes are provided; nothing else.
    """

    def __init__(self, text: str, block_type: str = "text"):
        # The block kind is stored under ``type``; it defaults to "text".
        self.text, self.type = text, block_type
|
|
|
|
|
|
class _FakeCallToolResult:
    """Lightweight substitute for a tool-call result object.

    The stored attribute names ``structuredContent`` and ``isError`` are
    camelCase on purpose, to match the real MCP SDK Pydantic model
    (``mcp.types.CallToolResult``).
    """

    def __init__(self, content, is_error=False, structuredContent=None):
        # The keyword is snake_case ``is_error``, but it is stored under the
        # camelCase attribute ``isError`` to line up with the SDK model.
        self.content = content
        self.structuredContent = structuredContent
        self.isError = is_error
|
|
|
|
|
|
def _fake_run_on_mcp_loop(coro_or_factory, timeout=30):
    """Run an MCP coroutine directly in a fresh event loop.

    Test replacement for ``mcp_tool._run_on_mcp_loop``.

    Args:
        coro_or_factory: Either a coroutine object, or a zero-argument callable
            that returns one. A callable is only invoked once the fresh loop
            is running, so the coroutine is never constructed outside it.
        timeout: Accepted for signature compatibility with the real helper;
            ignored here.

    Returns:
        Whatever the coroutine returns. Exceptions it raises propagate.
    """
    loop = asyncio.new_event_loop()
    try:
        # `_rpc_lock` must be created inside the loop that awaits it, or asyncio
        # raises "attached to a different loop". Build it here and attach it to
        # whatever fake server is currently registered under _servers.
        async def _install_lock_and_run():
            for srv in list(mcp_tool._servers.values()):
                if getattr(srv, "_rpc_lock", None) is None:
                    srv._rpc_lock = asyncio.Lock()
            # Resolve a factory only now that this loop is running.
            coro = (
                coro_or_factory() if callable(coro_or_factory) else coro_or_factory
            )
            return await coro

        return loop.run_until_complete(_install_lock_and_run())
    finally:
        loop.close()
|
|
|
|
|
|
@pytest.fixture
def _patch_mcp_server():
    """Register a fake ``test-server`` and reroute MCP loop calls locally.

    Installs a fake server in ``mcp_tool._servers`` and swaps
    ``tools.mcp_tool._run_on_mcp_loop`` for ``_fake_run_on_mcp_loop``, so
    ``_make_tool_handler`` can execute. Yields the MagicMock session so each
    test can configure ``call_tool`` on it.
    """
    session = MagicMock()
    # Leave `_rpc_lock` unset here. The tool handler's call path acquires it to
    # serialize JSON-RPC against the server, and an asyncio.Lock has to be
    # created on the loop that awaits it. _fake_run_on_mcp_loop builds one
    # inside the fresh loop it spins up.
    server = SimpleNamespace(session=session, _rpc_lock=None)
    with patch.dict(mcp_tool._servers, {"test-server": server}):
        with patch("tools.mcp_tool._run_on_mcp_loop", side_effect=_fake_run_on_mcp_loop):
            yield session
|
|
|
|
|
|
class TestStructuredContentPreservation:
    """Check that CallToolResult.structuredContent makes it into tool output."""

    @staticmethod
    def _invoke(session, result):
        """Stub ``session.call_tool`` with *result*, run the handler, decode JSON."""
        session.call_tool = AsyncMock(return_value=result)
        tool_handler = mcp_tool._make_tool_handler("test-server", "my-tool", 30.0)
        return json.loads(tool_handler({}))

    def test_text_only_result(self, _patch_mcp_server):
        """Without structuredContent the output carries only the text result."""
        decoded = self._invoke(
            _patch_mcp_server,
            _FakeCallToolResult(content=[_FakeContentBlock("hello")]),
        )
        assert decoded == {"result": "hello"}

    def test_both_content_and_structured(self, _patch_mcp_server):
        """Text content and structuredContent both appear in the output."""
        structured = {"value": "secret-123", "revealed": True}
        decoded = self._invoke(
            _patch_mcp_server,
            _FakeCallToolResult(
                content=[_FakeContentBlock("OK")],
                structuredContent=structured,
            ),
        )
        # Text content stays the primary result; structuredContent rides along.
        assert decoded["result"] == "OK"
        assert decoded["structuredContent"] == structured

    def test_both_content_and_structured_desktop_commander(self, _patch_mcp_server):
        """Desktop Commander style: file body in content, metadata alongside.

        The agent has to be able to see the actual file contents.
        """
        body = "import os\nprint('hello')\n"
        meta = {"fileName": "main.py", "filePath": "/tmp/main.py", "fileType": "python"}
        decoded = self._invoke(
            _patch_mcp_server,
            _FakeCallToolResult(
                content=[_FakeContentBlock(body)],
                structuredContent=meta,
            ),
        )
        assert decoded["result"] == body
        assert decoded["structuredContent"] == meta

    def test_structured_content_none_falls_back_to_text(self, _patch_mcp_server):
        """An explicit structuredContent=None yields the plain text result."""
        decoded = self._invoke(
            _patch_mcp_server,
            _FakeCallToolResult(
                content=[_FakeContentBlock("done")],
                structuredContent=None,
            ),
        )
        assert decoded == {"result": "done"}

    def test_empty_text_with_structured_content(self, _patch_mcp_server):
        """With no content blocks, structuredContent becomes the result."""
        structured = {"status": "ok", "data": [1, 2, 3]}
        decoded = self._invoke(
            _patch_mcp_server,
            _FakeCallToolResult(content=[], structuredContent=structured),
        )
        assert decoded["result"] == structured
|