mirror of
https://github.com/NousResearch/hermes-agent.git
synced 2026-04-25 00:51:20 +00:00
Fix variable name breakage (run_agent, hermes_constants, etc.) where import rewriter changed 'import X' to 'import hermes_agent.Y' but test code still referenced 'X' as a variable name. Fix package-vs-module confusion (cli.auth, cli.models, cli.ui) where single files became directories. Fix hardcoded file paths in tests pointing to old locations. Fix tool registry to discover tools in subpackage directories. Fix stale import in hermes_agent/tools/__init__.py. Part of #14182, #14183
1111 lines
44 KiB
Python
1111 lines
44 KiB
Python
"""
|
|
Tests for mcp_serve — Hermes MCP server.
|
|
|
|
Three layers of tests:
|
|
1. Unit tests — helpers, content extraction, attachment parsing
|
|
2. EventBridge tests — queue mechanics, cursors, waiters, concurrency
|
|
3. End-to-end tests — call actual MCP tools through FastMCP's tool manager
|
|
with real session data in SQLite and sessions.json
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
import threading
|
|
from pathlib import Path
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture(autouse=True)
def _isolate_hermes_home(tmp_path, monkeypatch):
    """Point HERMES_HOME at the per-test temporary directory.

    The ``HERMES_HOME`` environment variable is always set to ``tmp_path``.
    In addition, ``hermes_agent.constants.get_hermes_home`` is replaced with
    a function returning ``tmp_path``; an ``ImportError`` or
    ``AttributeError`` raised while doing so is ignored.

    Returns:
        The ``tmp_path`` directory.
    """
    home = tmp_path
    monkeypatch.setenv("HERMES_HOME", str(home))
    try:
        from hermes_agent import constants as hermes_constants

        monkeypatch.setattr(hermes_constants, "get_hermes_home", lambda: home)
    except (ImportError, AttributeError):
        # Best effort: the environment variable above is still in place.
        pass
    return home
|
|
|
|
|
|
@pytest.fixture
def sessions_dir(tmp_path):
    """Create (if missing) and return the ``sessions`` directory under ``tmp_path``."""
    target = tmp_path / "sessions"
    target.mkdir(parents=True, exist_ok=True)
    return target
|
|
|
|
|
|
@pytest.fixture
def sample_sessions():
    """Return three sample session records keyed by their session key.

    The records cover a Telegram DM (Alice), a Discord group (Bob) and a
    Slack group (Carol).  Each record carries token counters and an
    ``origin`` sub-dict describing the chat and user.
    """

    def _record(key, session_id, platform, chat_type, person, created, updated,
                tokens_in, tokens_out, chat_id, chat_name, user_id):
        # In every sample the display name equals the origin user name and
        # total_tokens equals input_tokens + output_tokens.
        return {
            "session_key": key,
            "session_id": session_id,
            "platform": platform,
            "chat_type": chat_type,
            "display_name": person,
            "created_at": created,
            "updated_at": updated,
            "input_tokens": tokens_in,
            "output_tokens": tokens_out,
            "total_tokens": tokens_in + tokens_out,
            "origin": {
                "platform": platform,
                "chat_id": chat_id,
                "chat_name": chat_name,
                "chat_type": chat_type,
                "user_id": user_id,
                "user_name": person,
                "thread_id": None,
                "chat_topic": None,
            },
        }

    records = (
        _record(
            "agent:main:telegram:dm:123456", "20260329_120000_abc123",
            "telegram", "dm", "Alice",
            "2026-03-29T12:00:00", "2026-03-29T14:30:00",
            50000, 2000, "123456", "Alice", "123456",
        ),
        _record(
            "agent:main:discord:group:789:456", "20260329_100000_def456",
            "discord", "group", "Bob",
            "2026-03-29T10:00:00", "2026-03-29T13:00:00",
            30000, 1000, "789", "#general", "456",
        ),
        _record(
            "agent:main:slack:group:C1234:U5678", "20260328_090000_ghi789",
            "slack", "group", "Carol",
            "2026-03-28T09:00:00", "2026-03-28T11:00:00",
            10000, 500, "C1234", "#engineering", "U5678",
        ),
    )
    return {record["session_key"]: record for record in records}
|
|
|
|
|
|
@pytest.fixture
def populated_sessions_dir(sessions_dir, sample_sessions):
    """Write ``sample_sessions`` as JSON into ``sessions.json`` and return the directory."""
    index_file = sessions_dir / "sessions.json"
    index_file.write_text(json.dumps(sample_sessions))
    return sessions_dir
|
|
|
|
|
|
def _create_test_db(db_path, session_id, messages):
    """Create a minimal SQLite DB mimicking the hermes_state schema.

    Creates the ``sessions`` and ``messages`` tables if they do not exist,
    registers ``session_id`` as a ``'gateway'`` session, and inserts one row
    per entry of ``messages``.

    Args:
        db_path: Location of the database file (converted with ``str``).
        session_id: Session id stored on the session row and every message.
        messages: Iterable of dicts.  ``role`` is required; ``content``
            (default ``""``), ``timestamp`` (default
            ``"2026-03-29T12:00:00"``) and ``tool_calls`` are optional.
            List/dict content and truthy ``tool_calls`` are stored as JSON
            text.

    Raises:
        KeyError: If a message has no ``role`` key.
    """

    def _as_row(msg):
        content = msg.get("content", "")
        if isinstance(content, (list, dict)):
            # Structured (multipart) content is persisted as JSON text.
            content = json.dumps(content)
        tool_calls = json.dumps(msg["tool_calls"]) if msg.get("tool_calls") else None
        return (
            session_id,
            msg["role"],
            content,
            msg.get("timestamp", "2026-03-29T12:00:00"),
            tool_calls,
        )

    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                source TEXT DEFAULT 'cli',
                started_at TEXT,
                message_count INTEGER DEFAULT 0
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_id TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT,
                tool_call_id TEXT,
                tool_calls TEXT,
                tool_name TEXT,
                timestamp TEXT,
                token_count INTEGER DEFAULT 0,
                finish_reason TEXT,
                reasoning TEXT,
                reasoning_details TEXT,
                codex_reasoning_items TEXT
            )
        """)
        conn.execute(
            "INSERT OR IGNORE INTO sessions (id, source, started_at, message_count) VALUES (?, 'gateway', ?, ?)",
            (session_id, "2026-03-29T12:00:00", len(messages)),
        )
        conn.executemany(
            "INSERT INTO messages (session_id, role, content, timestamp, tool_calls) VALUES (?, ?, ?, ?, ?)",
            [_as_row(msg) for msg in messages],
        )
        conn.commit()
    finally:
        # Close the handle even when a statement fails so the DB file is
        # never left open by a broken test setup.
        conn.close()
|
|
|
|
|
|
@pytest.fixture
def mock_session_db(tmp_path, populated_sessions_dir):
    """Create a real SQLite DB with test messages and return a reader for it.

    The database is written to ``tmp_path / "state.db"`` and holds six
    messages for session ``20260329_120000_abc123`` (user, assistant and one
    tool message; one user message contains a ``MEDIA:`` tag).

    Returns:
        An object exposing ``get_messages(session_id)``, which returns the
        session's message rows as dicts ordered by row id.
    """
    db_path = tmp_path / "state.db"
    messages = [
        {"role": "user", "content": "Hello Alice!", "timestamp": "2026-03-29T12:00:01"},
        {"role": "assistant", "content": "Hi! How can I help?", "timestamp": "2026-03-29T12:00:05"},
        {"role": "user", "content": "Check the image MEDIA: /tmp/screenshot.png please",
         "timestamp": "2026-03-29T12:01:00"},
        {"role": "assistant", "content": "I see the screenshot. It shows a terminal.",
         "timestamp": "2026-03-29T12:01:10"},
        {"role": "tool", "content": '{"result": "ok"}', "timestamp": "2026-03-29T12:01:15"},
        {"role": "user", "content": "Thanks!", "timestamp": "2026-03-29T12:02:00"},
    ]
    _create_test_db(db_path, "20260329_120000_abc123", messages)

    # Stand-in for the session DB: reads straight from the test database.
    class TestSessionDB:
        def __init__(self):
            self._db_path = db_path

        def get_messages(self, session_id):
            """Return all message rows of ``session_id`` as dicts, ordered by id."""
            conn = sqlite3.connect(str(self._db_path))
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    "SELECT * FROM messages WHERE session_id = ? ORDER BY id",
                    (session_id,),
                ).fetchall()
            finally:
                # Release the handle even if the query raises.
                conn.close()
            result = []
            for r in rows:
                d = dict(r)
                if d.get("tool_calls"):
                    # tool_calls is stored as JSON text; decode it for callers.
                    d["tool_calls"] = json.loads(d["tool_calls"])
                result.append(d)
            return result

    return TestSessionDB()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 1. UNIT TESTS — helpers, extraction, attachments
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestImports:
    """Smoke checks that the MCP serve module imports and exposes its API."""

    def test_import_module(self):
        """The module exposes the server factory, the runner and EventBridge."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        for attr in ("create_mcp_server", "run_mcp_server", "EventBridge"):
            assert hasattr(mcp_serve, attr)

    def test_mcp_available_flag(self):
        """``_MCP_SERVER_AVAILABLE`` is a plain boolean."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        flag = mcp_serve._MCP_SERVER_AVAILABLE
        assert isinstance(flag, bool)
|
|
|
|
|
|
class TestHelpers:
    """Tests for the sessions-directory and sessions-index helpers."""

    def test_get_sessions_dir(self, tmp_path):
        """The sessions directory is ``<HERMES_HOME>/sessions``."""
        from hermes_agent.tools.mcp.serve import _get_sessions_dir

        found = _get_sessions_dir()
        assert found == tmp_path / "sessions"

    def test_load_sessions_index_empty(self, sessions_dir, monkeypatch):
        """With no ``sessions.json`` present the index is an empty dict."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir)
        index = mcp_serve._load_sessions_index()
        assert index == {}

    def test_load_sessions_index_with_data(self, populated_sessions_dir, monkeypatch):
        """All three sample sessions are loaded from ``sessions.json``."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir)
        index = mcp_serve._load_sessions_index()
        assert len(index) == 3

    def test_load_sessions_index_corrupt(self, sessions_dir, monkeypatch):
        """A ``sessions.json`` that is not valid JSON yields an empty dict."""
        broken = sessions_dir / "sessions.json"
        broken.write_text("not json!")
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir)
        index = mcp_serve._load_sessions_index()
        assert index == {}
|
|
|
|
|
|
class TestContentExtraction:
    """Tests for ``_extract_message_content``."""

    def test_text(self):
        """Plain string content is returned unchanged."""
        from hermes_agent.tools.mcp.serve import _extract_message_content

        extracted = _extract_message_content({"content": "Hello"})
        assert extracted == "Hello"

    def test_multipart(self):
        """Text parts are joined with newlines; the image part is dropped."""
        from hermes_agent.tools.mcp.serve import _extract_message_content

        parts = [
            {"type": "text", "text": "A"},
            {"type": "image", "url": "http://x.com/i.png"},
            {"type": "text", "text": "B"},
        ]
        extracted = _extract_message_content({"content": parts})
        assert extracted == "A\nB"

    def test_empty(self):
        """Empty, missing and ``None`` content all yield an empty string."""
        from hermes_agent.tools.mcp.serve import _extract_message_content

        for message in ({"content": ""}, {}, {"content": None}):
            assert _extract_message_content(message) == ""
|
|
|
|
|
|
class TestAttachmentExtraction:
    """Tests for ``_extract_attachments``."""

    def test_image_url_block(self):
        """An ``image_url`` block becomes a single image attachment."""
        from hermes_agent.tools.mcp.serve import _extract_attachments

        block = {"type": "image_url", "image_url": {"url": "http://x.com/pic.jpg"}}
        found = _extract_attachments({"content": [block]})
        assert len(found) == 1
        assert found[0] == {"type": "image", "url": "http://x.com/pic.jpg"}

    def test_media_tag_in_text(self):
        """A ``MEDIA:`` tag inside text becomes a media attachment with its path."""
        from hermes_agent.tools.mcp.serve import _extract_attachments

        found = _extract_attachments({"content": "Here MEDIA: /tmp/out.png done"})
        assert len(found) == 1
        assert found[0] == {"type": "media", "path": "/tmp/out.png"}

    def test_multiple_media_tags(self):
        """Two ``MEDIA:`` tags in one string produce two attachments."""
        from hermes_agent.tools.mcp.serve import _extract_attachments

        found = _extract_attachments({"content": "MEDIA: /a.png and MEDIA: /b.mp3"})
        assert len(found) == 2

    def test_no_attachments(self):
        """Plain text yields an empty attachment list."""
        from hermes_agent.tools.mcp.serve import _extract_attachments

        found = _extract_attachments({"content": "plain text"})
        assert found == []

    def test_image_content_block(self):
        """An ``image`` content block is reported with type ``image``."""
        from hermes_agent.tools.mcp.serve import _extract_attachments

        block = {"type": "image", "url": "http://x.com/p.png"}
        found = _extract_attachments({"content": [block]})
        assert found[0]["type"] == "image"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 2. EVENT BRIDGE TESTS — queue, cursors, waiters, concurrency
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestEventBridge:
    """Queue, cursor, waiter, concurrency and approval tests for EventBridge."""

    def test_create(self):
        """A new bridge starts at cursor 0 with an empty queue."""
        from hermes_agent.tools.mcp.serve import EventBridge

        bridge = EventBridge()
        assert bridge._cursor == 0
        assert bridge._queue == []

    def test_enqueue_and_poll(self):
        """One enqueued event is returned by a poll and advances the cursor to 1."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        event = QueueEvent(cursor=0, type="message", session_key="k1",
                           data={"content": "hi"})
        bridge._enqueue(event)
        polled = bridge.poll_events(after_cursor=0)
        assert len(polled["events"]) == 1
        assert polled["events"][0]["type"] == "message"
        assert polled["next_cursor"] == 1

    def test_cursor_filter(self):
        """Polling after cursor 3 out of five events returns the last two."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        for index in range(5):
            bridge._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{index}"))
        polled = bridge.poll_events(after_cursor=3)
        assert len(polled["events"]) == 2
        assert polled["events"][0]["session_key"] == "s3"

    def test_session_filter(self):
        """Polling with a session key returns only that session's events."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        for key in ("a", "b", "a"):
            bridge._enqueue(QueueEvent(cursor=0, type="message", session_key=key))
        polled = bridge.poll_events(after_cursor=0, session_key="a")
        assert len(polled["events"]) == 2

    def test_poll_empty(self):
        """Polling an empty bridge returns no events and cursor 0."""
        from hermes_agent.tools.mcp.serve import EventBridge

        polled = EventBridge().poll_events(after_cursor=0)
        assert polled["events"] == []
        assert polled["next_cursor"] == 0

    def test_poll_limit(self):
        """``limit`` caps the number of events returned by a poll."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        for index in range(10):
            bridge._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{index}"))
        polled = bridge.poll_events(after_cursor=0, limit=3)
        assert len(polled["events"]) == 3

    def test_wait_immediate(self):
        """Waiting returns an already-queued event."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="t",
                                   data={"content": "hi"}))
        got = bridge.wait_for_event(after_cursor=0, timeout_ms=100)
        assert got is not None
        assert got["type"] == "message"

    def test_wait_timeout(self):
        """Waiting on an empty bridge returns ``None`` after at least 0.1 s."""
        from hermes_agent.tools.mcp.serve import EventBridge

        began = time.monotonic()
        got = EventBridge().wait_for_event(after_cursor=0, timeout_ms=150)
        assert got is None
        assert time.monotonic() - began >= 0.1

    def test_wait_wakes_on_enqueue(self):
        """A waiter running in another thread receives an event enqueued later."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        received = [None]

        def waiter():
            received[0] = bridge.wait_for_event(after_cursor=0, timeout_ms=5000)

        worker = threading.Thread(target=waiter)
        worker.start()
        # Short pause before the event is published from this thread.
        time.sleep(0.05)
        bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="wake"))
        worker.join(timeout=2)
        assert received[0] is not None
        assert received[0]["session_key"] == "wake"

    def test_queue_limit(self):
        """The queue never holds more than ``QUEUE_LIMIT`` events."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent, QUEUE_LIMIT

        bridge = EventBridge()
        for index in range(QUEUE_LIMIT + 50):
            bridge._enqueue(QueueEvent(cursor=0, type="message", session_key=f"s{index}"))
        assert len(bridge._queue) == QUEUE_LIMIT

    def test_concurrent_enqueue(self):
        """Five threads enqueueing 100 events each lose nothing and raise nothing."""
        from hermes_agent.tools.mcp.serve import EventBridge, QueueEvent

        bridge = EventBridge()
        failures = []

        def batch(start):
            try:
                for index in range(100):
                    bridge._enqueue(QueueEvent(cursor=0, type="message",
                                               session_key=f"s{start}_{index}"))
            except Exception as exc:
                failures.append(exc)

        workers = [threading.Thread(target=batch, args=(n,)) for n in range(5)]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        assert not failures
        assert len(bridge._queue) == 500
        assert bridge._cursor == 500

    def test_approvals_lifecycle(self):
        """A pending approval is listed, can be resolved, and then disappears."""
        from hermes_agent.tools.mcp.serve import EventBridge

        bridge = EventBridge()
        bridge._pending_approvals["a1"] = {
            "id": "a1", "kind": "exec",
            "description": "rm -rf /tmp",
            "session_key": "test", "created_at": "2026-03-29T12:00:00",
        }
        assert len(bridge.list_pending_approvals()) == 1
        outcome = bridge.respond_to_approval("a1", "deny")
        assert outcome["resolved"] is True
        assert len(bridge.list_pending_approvals()) == 0

    def test_respond_nonexistent(self):
        """Responding to an unknown approval id returns an error payload."""
        from hermes_agent.tools.mcp.serve import EventBridge

        outcome = EventBridge().respond_to_approval("nope", "deny")
        assert "error" in outcome
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 3. END-TO-END TESTS — call MCP tools through FastMCP server
|
|
# ---------------------------------------------------------------------------
|
|
|
|
@pytest.fixture
def mcp_server_e2e(populated_sessions_dir, mock_session_db, monkeypatch):
    """Create a fully wired MCP server for E2E testing.

    Skips the requesting test when the ``mcp`` package cannot be imported.
    The serve module's ``_get_sessions_dir``, ``_get_session_db`` and
    ``_load_channel_directory`` are patched to return the populated sessions
    directory, the SQLite-backed test reader and an empty directory
    respectively.

    Returns:
        A ``(server, bridge)`` tuple: the created MCP server and the
        ``EventBridge`` it was built with.
    """
    # Only the skip side effect is needed; the imported module is not used.
    pytest.importorskip("mcp", reason="MCP SDK not installed")
    from hermes_agent.tools.mcp import serve as mcp_serve

    monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir)
    monkeypatch.setattr(mcp_serve, "_get_session_db", lambda: mock_session_db)
    monkeypatch.setattr(mcp_serve, "_load_channel_directory", lambda: {})

    bridge = mcp_serve.EventBridge()
    server = mcp_serve.create_mcp_server(event_bridge=bridge)
    return server, bridge
|
|
|
|
|
|
def _run_tool(server, name, args=None):
    """Run one MCP tool to completion on the current event loop.

    Args:
        server: Object whose ``_tool_manager.call_tool(name, args)`` returns
            an awaitable.
        name: Name of the tool to call.
        args: Tool arguments; an empty dict is used when falsy.

    Returns:
        The decoded JSON value when the tool returned a string, otherwise
        the tool's result unchanged.
    """
    loop = asyncio.get_event_loop()
    outcome = loop.run_until_complete(
        server._tool_manager.call_tool(name, args or {})
    )
    if isinstance(outcome, str):
        return json.loads(outcome)
    return outcome
|
|
|
|
|
|
@pytest.fixture
def _event_loop():
    """Ensure an event loop exists for sync tests calling async tools.

    A fresh loop is created and installed as the current loop for the
    duration of the test.

    Yields:
        The newly created event loop.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    yield loop
    loop.close()
    # Do not leave the closed loop installed as the current loop: a later
    # asyncio.get_event_loop() would otherwise hand back a loop that can no
    # longer run anything.
    asyncio.set_event_loop(None)
|
|
|
|
|
|
class TestE2EConversationsList:
    """End-to-end tests for the ``conversations_list`` tool."""

    def test_list_all(self, mcp_server_e2e, _event_loop):
        """All three sample conversations are listed, one per platform."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list")
        assert listing["count"] == 3
        seen = {conv["platform"] for conv in listing["conversations"]}
        assert seen == {"telegram", "discord", "slack"}

    def test_list_sorted_by_updated(self, mcp_server_e2e, _event_loop):
        """Conversations come back most recently updated first."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list")
        keys = [conv["session_key"] for conv in listing["conversations"]]
        # Telegram (14:30) > Discord (13:00) > Slack (11:00)
        newest_first = (
            "agent:main:telegram:dm:123456",
            "agent:main:discord:group:789:456",
            "agent:main:slack:group:C1234:U5678",
        )
        for position, wanted in enumerate(newest_first):
            assert keys[position] == wanted

    def test_filter_by_platform(self, mcp_server_e2e, _event_loop):
        """Filtering by platform returns only that platform's conversation."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list", {"platform": "discord"})
        assert listing["count"] == 1
        assert listing["conversations"][0]["platform"] == "discord"

    def test_filter_by_platform_case_insensitive(self, mcp_server_e2e, _event_loop):
        """An upper-case platform filter still matches."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list", {"platform": "TELEGRAM"})
        assert listing["count"] == 1

    def test_search_by_name(self, mcp_server_e2e, _event_loop):
        """Searching for a display name finds the matching conversation."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list", {"search": "Alice"})
        assert listing["count"] == 1
        assert listing["conversations"][0]["display_name"] == "Alice"

    def test_search_no_match(self, mcp_server_e2e, _event_loop):
        """A search term matching nothing yields a zero count."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list", {"search": "nobody"})
        assert listing["count"] == 0

    def test_limit(self, mcp_server_e2e, _event_loop):
        """``limit`` caps the number of conversations returned."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "conversations_list", {"limit": 2})
        assert listing["count"] == 2
|
|
|
|
|
|
class TestE2EConversationGet:
    """End-to-end tests for the ``conversation_get`` tool."""

    def test_get_existing(self, mcp_server_e2e, _event_loop):
        """A known session key returns the stored conversation details."""
        server, _bridge = mcp_server_e2e
        details = _run_tool(server, "conversation_get",
                            {"session_key": "agent:main:telegram:dm:123456"})
        wanted = {
            "platform": "telegram",
            "display_name": "Alice",
            "chat_id": "123456",
            "input_tokens": 50000,
        }
        for field, value in wanted.items():
            assert details[field] == value

    def test_get_nonexistent(self, mcp_server_e2e, _event_loop):
        """An unknown session key returns an error payload."""
        server, _bridge = mcp_server_e2e
        details = _run_tool(server, "conversation_get",
                            {"session_key": "nonexistent:key"})
        assert "error" in details
|
|
|
|
|
|
class TestE2EMessagesRead:
    """End-to-end tests for the ``messages_read`` tool."""

    def test_read_messages(self, mcp_server_e2e, _event_loop):
        """Messages are returned and contain no tool-role entries."""
        server, _bridge = mcp_server_e2e
        payload = _run_tool(server, "messages_read",
                            {"session_key": "agent:main:telegram:dm:123456"})
        assert payload["count"] > 0
        # Should filter out tool messages — only user/assistant
        roles = {message["role"] for message in payload["messages"]}
        assert "tool" not in roles
        assert "user" in roles
        assert "assistant" in roles

    def test_read_messages_content(self, mcp_server_e2e, _event_loop):
        """The stored user and assistant texts appear in the result."""
        server, _bridge = mcp_server_e2e
        payload = _run_tool(server, "messages_read",
                            {"session_key": "agent:main:telegram:dm:123456"})
        contents = [message["content"] for message in payload["messages"]]
        for text in ("Hello Alice!", "Hi! How can I help?"):
            assert text in contents

    def test_read_messages_have_ids(self, mcp_server_e2e, _event_loop):
        """Every returned message carries a non-empty ``id``."""
        server, _bridge = mcp_server_e2e
        payload = _run_tool(server, "messages_read",
                            {"session_key": "agent:main:telegram:dm:123456"})
        for message in payload["messages"]:
            assert "id" in message
            assert message["id"]  # non-empty

    def test_read_with_limit(self, mcp_server_e2e, _event_loop):
        """``limit`` caps the number of messages returned."""
        server, _bridge = mcp_server_e2e
        request = {"session_key": "agent:main:telegram:dm:123456", "limit": 2}
        payload = _run_tool(server, "messages_read", request)
        assert payload["count"] == 2

    def test_read_nonexistent_session(self, mcp_server_e2e, _event_loop):
        """An unknown session key returns an error payload."""
        server, _bridge = mcp_server_e2e
        payload = _run_tool(server, "messages_read",
                            {"session_key": "nonexistent:key"})
        assert "error" in payload
|
|
|
|
|
|
class TestE2EAttachmentsFetch:
    """End-to-end tests for the ``attachments_fetch`` tool."""

    def test_fetch_media_from_message(self, mcp_server_e2e, _event_loop):
        """The ``MEDIA:`` path of a stored message is returned as an attachment."""
        server, _bridge = mcp_server_e2e
        # First get message IDs
        listing = _run_tool(server, "messages_read",
                            {"session_key": "agent:main:telegram:dm:123456"})
        # Find the message with MEDIA: tag
        media_msg = next(
            (message for message in listing["messages"] if "MEDIA:" in message["content"]),
            None,
        )
        assert media_msg is not None, "Should have a message with MEDIA: tag"

        fetched = _run_tool(server, "attachments_fetch", {
            "session_key": "agent:main:telegram:dm:123456",
            "message_id": media_msg["id"],
        })
        assert fetched["count"] >= 1
        first = fetched["attachments"][0]
        assert first["type"] == "media"
        assert first["path"] == "/tmp/screenshot.png"

    def test_fetch_from_nonexistent_message(self, mcp_server_e2e, _event_loop):
        """An unknown message id returns an error payload."""
        server, _bridge = mcp_server_e2e
        request = {
            "session_key": "agent:main:telegram:dm:123456",
            "message_id": "99999",
        }
        fetched = _run_tool(server, "attachments_fetch", request)
        assert "error" in fetched

    def test_fetch_from_nonexistent_session(self, mcp_server_e2e, _event_loop):
        """An unknown session key returns an error payload."""
        server, _bridge = mcp_server_e2e
        request = {
            "session_key": "nonexistent:key",
            "message_id": "1",
        }
        fetched = _run_tool(server, "attachments_fetch", request)
        assert "error" in fetched
|
|
|
|
|
|
class TestE2EEventsPoll:
    """End-to-end tests for the ``events_poll`` tool."""

    def test_poll_empty(self, mcp_server_e2e, _event_loop):
        """Polling with nothing queued returns no events and cursor 0."""
        server, _bridge = mcp_server_e2e
        polled = _run_tool(server, "events_poll")
        assert polled["events"] == []
        assert polled["next_cursor"] == 0

    def test_poll_with_events(self, mcp_server_e2e, _event_loop):
        """Two queued messages are returned in order with their content."""
        from hermes_agent.tools.mcp.serve import QueueEvent

        server, bridge = mcp_server_e2e
        for role, text in (("user", "Hello"), ("assistant", "Hi")):
            bridge._enqueue(QueueEvent(cursor=0, type="message",
                                       session_key="agent:main:telegram:dm:123456",
                                       data={"role": role, "content": text}))

        polled = _run_tool(server, "events_poll")
        assert len(polled["events"]) == 2
        assert polled["events"][0]["content"] == "Hello"
        assert polled["events"][1]["content"] == "Hi"
        assert polled["next_cursor"] == 2

    def test_poll_cursor_pagination(self, mcp_server_e2e, _event_loop):
        """``next_cursor`` from one page can be used to fetch the next page."""
        from hermes_agent.tools.mcp.serve import QueueEvent

        server, bridge = mcp_server_e2e
        for index in range(5):
            bridge._enqueue(QueueEvent(cursor=0, type="message",
                                       session_key=f"s{index}"))

        first_page = _run_tool(server, "events_poll", {"limit": 2})
        assert len(first_page["events"]) == 2
        assert first_page["next_cursor"] == 2

        second_page = _run_tool(server, "events_poll",
                                {"after_cursor": first_page["next_cursor"], "limit": 2})
        assert len(second_page["events"]) == 2
        assert second_page["next_cursor"] == 4

    def test_poll_session_filter(self, mcp_server_e2e, _event_loop):
        """Only events of the requested session key are returned."""
        from hermes_agent.tools.mcp.serve import QueueEvent

        server, bridge = mcp_server_e2e
        for key in ("a", "b", "a"):
            bridge._enqueue(QueueEvent(cursor=0, type="message", session_key=key))

        polled = _run_tool(server, "events_poll", {"session_key": "b"})
        assert len(polled["events"]) == 1
|
|
|
|
|
|
class TestE2EEventsWait:
    """End-to-end tests for the ``events_wait`` tool."""

    def test_wait_timeout(self, mcp_server_e2e, _event_loop):
        """With nothing queued the wait ends with a timeout result."""
        server, _bridge = mcp_server_e2e
        waited = _run_tool(server, "events_wait", {"timeout_ms": 100})
        assert waited["event"] is None
        assert waited["reason"] == "timeout"

    def test_wait_with_existing_event(self, mcp_server_e2e, _event_loop):
        """An already queued event is returned with its content."""
        from hermes_agent.tools.mcp.serve import QueueEvent

        server, bridge = mcp_server_e2e
        queued = QueueEvent(cursor=0, type="message",
                            session_key="test",
                            data={"content": "waiting for this"})
        bridge._enqueue(queued)
        waited = _run_tool(server, "events_wait", {"timeout_ms": 100})
        assert waited["event"] is not None
        assert waited["event"]["content"] == "waiting for this"

    def test_wait_caps_timeout(self, mcp_server_e2e, _event_loop):
        """Timeout should be capped at 300000ms (5 min).

        This test only checks that an oversized timeout is accepted and that
        an existing event is still returned.
        """
        from hermes_agent.tools.mcp.serve import QueueEvent

        server, bridge = mcp_server_e2e
        bridge._enqueue(QueueEvent(cursor=0, type="message", session_key="t"))
        # Even with huge timeout, should return immediately since event exists
        waited = _run_tool(server, "events_wait", {"timeout_ms": 999999})
        assert waited["event"] is not None
|
|
|
|
|
|
class TestE2EMessagesSend:
    """End-to-end tests for the ``messages_send`` tool."""

    def test_send_missing_args(self, mcp_server_e2e, _event_loop):
        """An empty target is rejected with an error payload."""
        server, _bridge = mcp_server_e2e
        request = {"target": "", "message": "hi"}
        sent = _run_tool(server, "messages_send", request)
        assert "error" in sent

    def test_send_delegates_to_tool(self, mcp_server_e2e, _event_loop, monkeypatch):
        """Sending forwards a ``send`` action with the target to ``send_message_tool``."""
        server, _bridge = mcp_server_e2e
        canned_reply = json.dumps({"success": True, "platform": "telegram"})
        sender = MagicMock(return_value=canned_reply)
        monkeypatch.setattr("hermes_agent.tools.send_message.send_message_tool", sender)

        sent = _run_tool(server, "messages_send",
                         {"target": "telegram:123456", "message": "Hello!"})
        assert sent["success"] is True
        sender.assert_called_once()
        # First positional argument of the single recorded call.
        forwarded = sender.call_args[0][0]
        assert forwarded["action"] == "send"
        assert forwarded["target"] == "telegram:123456"
|
|
|
|
|
|
class TestE2EChannelsList:
    """End-to-end tests for the ``channels_list`` tool."""

    def test_channels_from_sessions(self, mcp_server_e2e, _event_loop):
        """Each sample session contributes one ``platform:chat_id`` target."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "channels_list")
        assert listing["count"] == 3
        targets = {channel["target"] for channel in listing["channels"]}
        for wanted in ("telegram:123456", "discord:789", "slack:C1234"):
            assert wanted in targets

    def test_channels_platform_filter(self, mcp_server_e2e, _event_loop):
        """Filtering by platform returns only that platform's channel."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "channels_list", {"platform": "slack"})
        assert listing["count"] == 1
        assert listing["channels"][0]["target"] == "slack:C1234"

    def test_channels_with_directory(self, mcp_server_e2e, _event_loop, monkeypatch):
        """The patched channel-directory loader returns both Telegram entries.

        Only the patched function itself is exercised here; the
        ``channels_list`` tool is not called.
        """
        from hermes_agent.tools.mcp import serve as mcp_serve

        fake_directory = {
            "telegram": [
                {"id": "123456", "name": "Alice", "type": "dm"},
                {"id": "-100999", "name": "Dev Group", "type": "group"},
            ],
        }
        monkeypatch.setattr(mcp_serve, "_load_channel_directory", lambda: fake_directory)
        # Need to recreate server to pick up the new mock
        _server, _bridge = mcp_server_e2e
        # The tool closure already captured the old mock, so test the function directly
        loaded = mcp_serve._load_channel_directory()
        assert len(loaded["telegram"]) == 2
|
|
|
|
|
|
class TestE2EPermissions:
    """End-to-end tests for ``permissions_list_open`` and ``permissions_respond``."""

    def test_list_empty(self, mcp_server_e2e, _event_loop):
        """With nothing pending the approvals list is empty."""
        server, _bridge = mcp_server_e2e
        listing = _run_tool(server, "permissions_list_open")
        assert listing["count"] == 0
        assert listing["approvals"] == []

    def test_list_with_approvals(self, mcp_server_e2e, _event_loop):
        """A pending approval placed on the bridge is listed by id."""
        server, bridge = mcp_server_e2e
        pending = {
            "id": "a1", "kind": "exec",
            "description": "sudo rm -rf /",
            "session_key": "test",
            "created_at": "2026-03-29T12:00:00",
        }
        bridge._pending_approvals["a1"] = pending
        listing = _run_tool(server, "permissions_list_open")
        assert listing["count"] == 1
        assert listing["approvals"][0]["id"] == "a1"

    def test_respond_allow(self, mcp_server_e2e, _event_loop):
        """Allowing once resolves the approval and removes it from the open list."""
        server, bridge = mcp_server_e2e
        bridge._pending_approvals["a1"] = {"id": "a1", "kind": "exec"}
        answer = _run_tool(server, "permissions_respond",
                           {"id": "a1", "decision": "allow-once"})
        assert answer["resolved"] is True
        assert answer["decision"] == "allow-once"
        # Should be gone now
        remaining = _run_tool(server, "permissions_list_open")
        assert remaining["count"] == 0

    def test_respond_deny(self, mcp_server_e2e, _event_loop):
        """Denying resolves the approval."""
        server, bridge = mcp_server_e2e
        bridge._pending_approvals["a2"] = {"id": "a2", "kind": "plugin"}
        answer = _run_tool(server, "permissions_respond",
                           {"id": "a2", "decision": "deny"})
        assert answer["resolved"] is True

    def test_respond_invalid_decision(self, mcp_server_e2e, _event_loop):
        """An unrecognised decision value returns an error payload."""
        server, bridge = mcp_server_e2e
        bridge._pending_approvals["a3"] = {"id": "a3", "kind": "exec"}
        answer = _run_tool(server, "permissions_respond",
                           {"id": "a3", "decision": "maybe"})
        assert "error" in answer

    def test_respond_nonexistent(self, mcp_server_e2e, _event_loop):
        """Responding to an unknown approval id returns an error payload."""
        server, _bridge = mcp_server_e2e
        answer = _run_tool(server, "permissions_respond",
                           {"id": "nope", "decision": "deny"})
        assert "error" in answer
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 4. TOOL LISTING — verify all 10 tools are registered
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestToolRegistration:
    """Checks on the set of tools registered with the server's tool manager."""

    def test_all_tools_registered(self, mcp_server_e2e, _event_loop):
        """Exactly the ten expected tool names are registered."""
        server, _bridge = mcp_server_e2e
        tool_names = {registered.name for registered in server._tool_manager.list_tools()}

        expected = {
            "conversations_list",
            "conversation_get",
            "messages_read",
            "attachments_fetch",
            "events_poll",
            "events_wait",
            "messages_send",
            "channels_list",
            "permissions_list_open",
            "permissions_respond",
        }
        assert expected == tool_names, f"Missing: {expected - tool_names}, Extra: {tool_names - expected}"

    def test_tools_have_descriptions(self, mcp_server_e2e, _event_loop):
        """Every registered tool has a non-empty description."""
        server, _bridge = mcp_server_e2e
        registered = server._tool_manager.list_tools()
        for tool in registered:
            assert tool.description, f"Tool {tool.name} has no description"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# 5. SERVER LIFECYCLE / CLI INTEGRATION
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestServerCreation:
    """Tests for ``create_mcp_server`` with and without the MCP SDK."""

    def test_create_server(self, populated_sessions_dir, monkeypatch):
        """A server is created when no bridge is supplied."""
        pytest.importorskip("mcp", reason="MCP SDK not installed")
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir)
        server = mcp_serve.create_mcp_server()
        assert server is not None

    def test_create_with_bridge(self, populated_sessions_dir, monkeypatch):
        """A server is created when an explicit ``EventBridge`` is supplied."""
        pytest.importorskip("mcp", reason="MCP SDK not installed")
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: populated_sessions_dir)
        bridge = mcp_serve.EventBridge()
        server = mcp_serve.create_mcp_server(event_bridge=bridge)
        assert server is not None

    def test_create_without_mcp_sdk(self, monkeypatch):
        """With the availability flag off, creation raises ``ImportError``."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_MCP_SERVER_AVAILABLE", False)
        with pytest.raises(ImportError, match="MCP server requires"):
            mcp_serve.create_mcp_server()
|
|
|
|
|
|
class TestRunMcpServer:
    """Behaviour of ``run_mcp_server`` when the MCP SDK flag is off."""

    def test_run_without_mcp_exits(self, monkeypatch):
        """``run_mcp_server()`` raises ``SystemExit`` with code 1 when the flag is False."""
        from hermes_agent.tools.mcp import serve as mcp_serve

        # Simulate a missing SDK by flipping the module-level availability flag.
        monkeypatch.setattr(mcp_serve, "_MCP_SERVER_AVAILABLE", False)

        with pytest.raises(SystemExit) as raised:
            mcp_serve.run_mcp_server()

        exit_code = raised.value.code
        assert exit_code == 1
|
|
|
|
|
|
class TestCliIntegration:
    """Argument parsing and dispatch for the ``mcp serve`` sub-command."""

    @staticmethod
    def _build_parser():
        """Return a parser exposing ``mcp serve [-v/--verbose]``.

        The parser is assembled locally in this test module; it is a stand-in
        with the same sub-command layout, not the project's real CLI parser.
        """
        import argparse

        parser = argparse.ArgumentParser()
        subs = parser.add_subparsers(dest="command")
        mcp_p = subs.add_parser("mcp")
        mcp_sub = mcp_p.add_subparsers(dest="mcp_action")
        serve_p = mcp_sub.add_parser("serve")
        serve_p.add_argument("-v", "--verbose", action="store_true")
        return parser

    def test_parse_serve(self):
        """``mcp serve`` selects the serve action and leaves ``verbose`` False."""
        args = self._build_parser().parse_args(["mcp", "serve"])
        assert args.command == "mcp"
        assert args.mcp_action == "serve"
        assert args.verbose is False

    def test_parse_serve_verbose(self):
        """``mcp serve --verbose`` sets ``verbose`` to True."""
        args = self._build_parser().parse_args(["mcp", "serve", "--verbose"])
        assert args.verbose is True

    def test_dispatcher_routes_serve(self, monkeypatch, tmp_path):
        """``mcp_command`` forwards a serve request to ``run_mcp_server(verbose=True)``."""
        import argparse

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        # Replace the real server entry point so nothing is actually started.
        mock_run = MagicMock()
        monkeypatch.setattr("hermes_agent.tools.mcp.serve.run_mcp_server", mock_run)

        args = argparse.Namespace(mcp_action="serve", verbose=True)
        from hermes_agent.cli.mcp_config import mcp_command

        mcp_command(args)
        mock_run.assert_called_once_with(verbose=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# 6. EDGE CASES
# ---------------------------------------------------------------------------
|
|
|
|
class TestEdgeCases:
    """Boundary conditions for the sessions index loader and ``EventBridge``."""

    def test_empty_sessions_json(self, sessions_dir, monkeypatch):
        """An empty JSON object on disk loads as an empty dict."""
        index_file = sessions_dir / "sessions.json"
        index_file.write_text("{}")
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir)
        loaded = mcp_serve._load_sessions_index()
        assert loaded == {}

    def test_sessions_without_origin(self, sessions_dir, monkeypatch):
        """An entry that has no ``origin`` key still loads with its ``platform``."""
        key = "agent:main:telegram:dm:111"
        entry = {
            "session_key": key,
            "session_id": "20260329_120000_xyz",
            "platform": "telegram",
            "updated_at": "2026-03-29T12:00:00",
        }
        index_file = sessions_dir / "sessions.json"
        index_file.write_text(json.dumps({key: entry}))
        from hermes_agent.tools.mcp import serve as mcp_serve

        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir)
        entries = mcp_serve._load_sessions_index()
        assert entries[key]["platform"] == "telegram"

    def test_bridge_start_stop(self):
        """``stop()`` clears ``_running`` after it has been set by hand."""
        from hermes_agent.tools.mcp.serve import EventBridge

        bridge = EventBridge()
        assert not bridge._running
        # The flag is flipped directly; no start method is invoked here.
        bridge._running = True
        bridge.stop()
        assert not bridge._running

    def test_truncation(self):
        """Slicing a 5000-character string to 2000 yields 2000 characters."""
        # NOTE(review): this only exercises Python slicing, not any project
        # code — confirm whether a real truncation helper should be called.
        oversized = "x" * 5000
        clipped = oversized[:2000]
        assert len(clipped) == 2000
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# 7. EVENT BRIDGE POLL LOOP E2E — real SQLite DB, mtime optimization
# ---------------------------------------------------------------------------
|
|
|
|
class TestEventBridgePollE2E:
    """End-to-end tests for the EventBridge polling loop with real files."""

    class _SqliteSessionDB:
        """Session-DB stand-in that reads messages straight from a SQLite file.

        Exposes only ``get_messages(sid)``. ``call_count`` records how many
        times it has been invoked so a test can tell whether a poll cycle
        queried the database.
        """

        def __init__(self, db_path):
            self.db_path = db_path
            self.call_count = 0

        def get_messages(self, sid):
            """Return all ``messages`` rows for session *sid*, ordered by id, as dicts."""
            self.call_count += 1
            conn = sqlite3.connect(str(self.db_path))
            try:
                conn.row_factory = sqlite3.Row
                rows = conn.execute(
                    "SELECT * FROM messages WHERE session_id = ? ORDER BY id",
                    (sid,),
                ).fetchall()
            finally:
                # Close even if the query raises, so the handle is not leaked.
                conn.close()
            return [dict(r) for r in rows]

    @staticmethod
    def _make_sessions_dir(tmp_path, monkeypatch):
        """Create ``tmp_path/sessions`` and patch ``_get_sessions_dir`` to return it.

        Returns the ``(mcp_serve, sessions_dir)`` pair.
        """
        from hermes_agent.tools.mcp import serve as mcp_serve

        sessions_dir = tmp_path / "sessions"
        sessions_dir.mkdir()
        monkeypatch.setattr(mcp_serve, "_get_sessions_dir", lambda: sessions_dir)
        return mcp_serve, sessions_dir

    @staticmethod
    def _write_sessions_index(sessions_dir, sessions_data):
        """Serialize *sessions_data* as JSON into ``sessions_dir/sessions.json``."""
        (sessions_dir / "sessions.json").write_text(json.dumps(sessions_data))

    def test_poll_detects_new_messages(self, tmp_path, monkeypatch):
        """Write to SQLite + sessions.json, verify EventBridge picks it up."""
        mcp_serve, sessions_dir = self._make_sessions_dir(tmp_path, monkeypatch)

        session_id = "20260329_150000_poll_test"
        db_path = tmp_path / "state.db"

        # Write sessions.json
        sessions_data = {
            "agent:main:telegram:dm:poll_test": {
                "session_key": "agent:main:telegram:dm:poll_test",
                "session_id": session_id,
                "platform": "telegram",
                "chat_type": "dm",
                "display_name": "PollTest",
                "updated_at": "2026-03-29T15:00:05",
                "origin": {"platform": "telegram", "chat_id": "poll_test"},
            }
        }
        self._write_sessions_index(sessions_dir, sessions_data)

        # Write messages to SQLite
        messages = [
            {"role": "user", "content": "First message",
             "timestamp": "2026-03-29T15:00:01"},
            {"role": "assistant", "content": "Reply",
             "timestamp": "2026-03-29T15:00:03"},
        ]
        _create_test_db(db_path, session_id, messages)

        # Route the module's session-DB factory to the SQLite-backed fake.
        monkeypatch.setattr(
            mcp_serve, "_get_session_db", lambda: self._SqliteSessionDB(db_path)
        )

        bridge = mcp_serve.EventBridge()
        # Run one poll cycle manually
        bridge._poll_once(self._SqliteSessionDB(db_path))

        # Should have found the messages
        result = bridge.poll_events(after_cursor=0)
        assert len(result["events"]) == 2
        assert result["events"][0]["role"] == "user"
        assert result["events"][0]["content"] == "First message"
        assert result["events"][1]["role"] == "assistant"

    def test_poll_skips_when_unchanged(self, tmp_path, monkeypatch):
        """Second poll with no file changes should be a no-op."""
        mcp_serve, sessions_dir = self._make_sessions_dir(tmp_path, monkeypatch)

        session_id = "20260329_150000_skip_test"
        db_path = tmp_path / "state.db"

        sessions_data = {
            "agent:main:telegram:dm:skip": {
                "session_key": "agent:main:telegram:dm:skip",
                "session_id": session_id,
                "platform": "telegram",
                "updated_at": "2026-03-29T15:00:05",
                "origin": {"platform": "telegram", "chat_id": "skip"},
            }
        }
        self._write_sessions_index(sessions_dir, sessions_data)
        _create_test_db(db_path, session_id, [
            {"role": "user", "content": "Hello", "timestamp": "2026-03-29T15:00:01"},
        ])

        db = self._SqliteSessionDB(db_path)
        bridge = mcp_serve.EventBridge()

        # First poll — should process
        bridge._poll_once(db)
        first_calls = db.call_count
        assert first_calls >= 1

        # Second poll — files unchanged, should skip entirely
        bridge._poll_once(db)
        assert db.call_count == first_calls, \
            "Second poll should skip DB queries when files unchanged"

    def test_poll_detects_new_message_after_db_write(self, tmp_path, monkeypatch):
        """Write a new message to the DB after first poll, verify it's detected."""
        mcp_serve, sessions_dir = self._make_sessions_dir(tmp_path, monkeypatch)

        session_id = "20260329_150000_new_msg"
        db_path = tmp_path / "state.db"

        sessions_data = {
            "agent:main:telegram:dm:new": {
                "session_key": "agent:main:telegram:dm:new",
                "session_id": session_id,
                "platform": "telegram",
                "updated_at": "2026-03-29T15:00:05",
                "origin": {"platform": "telegram", "chat_id": "new"},
            }
        }
        self._write_sessions_index(sessions_dir, sessions_data)
        _create_test_db(db_path, session_id, [
            {"role": "user", "content": "First", "timestamp": "2026-03-29T15:00:01"},
        ])

        db = self._SqliteSessionDB(db_path)
        bridge = mcp_serve.EventBridge()

        # First poll
        bridge._poll_once(db)
        r1 = bridge.poll_events(after_cursor=0)
        assert len(r1["events"]) == 1

        # Add a new message to the DB
        conn = sqlite3.connect(str(db_path))
        try:
            conn.execute(
                "INSERT INTO messages (session_id, role, content, timestamp) VALUES (?, ?, ?, ?)",
                (session_id, "assistant", "New reply!", "2026-03-29T15:00:10"),
            )
            conn.commit()
        finally:
            # Close even if the insert fails, so the handle is not leaked.
            conn.close()
        # Touch the DB file to update mtime (WAL mode may not update mtime on small writes)
        os.utime(db_path, None)

        # Update sessions.json updated_at to trigger re-check
        sessions_data["agent:main:telegram:dm:new"]["updated_at"] = "2026-03-29T15:00:10"
        self._write_sessions_index(sessions_dir, sessions_data)

        # Second poll — should detect the new message
        bridge._poll_once(db)
        r2 = bridge.poll_events(after_cursor=r1["next_cursor"])
        assert len(r2["events"]) == 1
        assert r2["events"][0]["content"] == "New reply!"

    def test_poll_interval_is_200ms(self):
        """Verify the poll interval constant."""
        from hermes_agent.tools.mcp.serve import POLL_INTERVAL

        assert POLL_INTERVAL == 0.2
|