hermes-agent/tests/tools/test_mcp_oauth.py
Teknium 70768665a4
fix(mcp): consolidate OAuth handling, pick up external token refreshes (#11383)
* feat(mcp-oauth): scaffold MCPOAuthManager

Central manager for per-server MCP OAuth state. Provides
get_or_build_provider (cached), remove (evicts cache + deletes
disk), invalidate_if_disk_changed (mtime watch, core fix for
external-refresh workflow), and handle_401 (dedup'd recovery).

No behavior change yet — existing call sites still use
build_oauth_auth directly. Task 1 of 8 in the MCP OAuth
consolidation (fixes Cthulhu's BetterStack reliability issues).
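
A minimal sketch of the mtime-watch idea behind invalidate_if_disk_changed, assuming one tokens file per server. DiskWatch and its attribute names are hypothetical and are not the MCPOAuthManager code; only the method name and the "missing file returns False" behavior come from this commit message.

```python
import os
import tempfile
from pathlib import Path


class DiskWatch:
    """Hypothetical helper: remembers a file's mtime and reports changes."""

    def __init__(self, path):
        self._path = Path(path)
        self._last_mtime_ns = self._read_mtime_ns()

    def _read_mtime_ns(self):
        try:
            return self._path.stat().st_mtime_ns
        except FileNotFoundError:
            return None  # no tokens on disk yet (pre-auth state)

    def invalidate_if_disk_changed(self):
        """Return True once per external write, False if nothing changed."""
        current = self._read_mtime_ns()
        if current is None or current == self._last_mtime_ns:
            return False
        self._last_mtime_ns = current  # new baseline for the next check
        return True


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        tokens = Path(tmp) / "server.json"
        tokens.write_text("{}")
        # Pin the mtime so the demo does not depend on clock resolution.
        os.utime(tokens, ns=(2_000_000_000, 2_000_000_000))
        watch = DiskWatch(tokens)
        before = watch.invalidate_if_disk_changed()
        # Simulate an external refresh (cron, another CLI instance).
        tokens.write_text('{"access_token": "fresh"}')
        os.utime(tokens, ns=(4_000_000_000, 4_000_000_000))
        after = watch.invalidate_if_disk_changed()
        again = watch.invalidate_if_disk_changed()
        return [before, after, again]
```

In this sketch a True result would be the caller's cue to drop in-memory tokens and re-read storage before the next request.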

* feat(mcp-oauth): add HermesMCPOAuthProvider with pre-flow disk watch

Subclasses the MCP SDK's OAuthClientProvider to inject a disk
mtime check before every async_auth_flow, via the central
manager. When a subclass instance is used, external token
refreshes (cron, another CLI instance) are picked up before
the next API call.

Still dead code: the manager's _build_provider still delegates
to build_oauth_auth and returns the plain OAuthClientProvider.
Task 4 wires this subclass in. Task 2 of 8.
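
The pre-flow hook can be pictured roughly as below. This is a sketch under assumptions: BaseProvider is a stand-in for an httpx.Auth-style class whose async_auth_flow is an async generator, and DiskWatchingProvider and pre_flow_check are hypothetical names, not the HermesMCPOAuthProvider code. The subclass runs the check, then forwards requests and responses to the parent flow unchanged.

```python
import asyncio


class BaseProvider:
    """Stand-in for an httpx.Auth-style provider (hypothetical)."""

    async def async_auth_flow(self, request):
        response = yield request + ":signed"
        if response == 401:
            # The parent flow retries once after a 401.
            yield request + ":retried"


class DiskWatchingProvider(BaseProvider):
    def __init__(self, pre_flow_check):
        self._pre_flow_check = pre_flow_check

    async def async_auth_flow(self, request):
        # Run the disk-freshness check before the parent flow starts.
        self._pre_flow_check()
        inner = super().async_auth_flow(request)
        try:
            outgoing = await inner.__anext__()
            while True:
                # Hand each response back to the parent generator.
                response = yield outgoing
                outgoing = await inner.asend(response)
        except StopAsyncIteration:
            return


async def drive(provider, request, responses):
    """Play the role of the HTTP client: send requests, feed responses."""
    sent = []
    flow = provider.async_auth_flow(request)
    try:
        outgoing = await flow.__anext__()
        for response in responses:
            sent.append(outgoing)
            outgoing = await flow.asend(response)
        sent.append(outgoing)
    except StopAsyncIteration:
        pass
    return sent


def demo():
    checks = []
    provider = DiskWatchingProvider(lambda: checks.append("checked"))
    sent = asyncio.run(drive(provider, "req", [401, 200]))
    return checks, sent
```

Forwarding with asend keeps the parent's retry logic intact, which a plain "async for" delegation would not.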

* refactor(mcp-oauth): extract build_oauth_auth helpers

Decomposes build_oauth_auth into _configure_callback_port,
_build_client_metadata, _maybe_preregister_client, and
_parse_base_url. Public API preserved. These helpers let
MCPOAuthManager._build_provider reuse the same logic in Task 4
instead of duplicating the construction dance.

Also updates the SDK version hint in the warning from 1.10.0 to
1.26.0 (which is what we actually require for the OAuth types
used here). Task 3 of 8.

* feat(mcp-oauth): manager now builds HermesMCPOAuthProvider directly

_build_provider constructs the disk-watching subclass using the
helpers from Task 3, instead of delegating to the plain
build_oauth_auth factory. Any consumer using the manager now gets
pre-flow disk-freshness checks automatically.

build_oauth_auth is preserved as the public API for backwards
compatibility. The code path is now:

    MCPOAuthManager.get_or_build_provider  ->
      _build_provider  ->
        _configure_callback_port
        _build_client_metadata
        _maybe_preregister_client
        _parse_base_url
        HermesMCPOAuthProvider(...)

Task 4 of 8.

* feat(mcp): wire OAuth manager + add _reconnect_event

MCPServerTask gains _reconnect_event alongside _shutdown_event.
When set, _run_http / _run_stdio exit their async-with blocks
cleanly (no exception), and the outer run() loop re-enters the
transport to rebuild the MCP session with fresh credentials.
This is the recovery path for OAuth failures that the SDK's
in-place httpx.Auth cannot handle (e.g. cron externally consumed
the refresh_token, or server-side session invalidation).

_run_http now asks MCPOAuthManager for the OAuth provider
instead of calling build_oauth_auth directly. Config-time,
runtime, and reconnect paths all share one provider instance
with pre-flow disk-watch active.

shutdown() defensively sets both events so there is no race
between reconnect and shutdown signalling.

Task 5 of 8.
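
The interplay of the two events can be sketched as follows. ServerTaskSketch is a hypothetical stand-in, not MCPServerTask: the transport here only waits for either event, whereas the real _run_http / _run_stdio hold an MCP session open inside their async-with blocks.

```python
import asyncio


class ServerTaskSketch:
    def __init__(self):
        self.shutdown_event = asyncio.Event()
        self.reconnect_event = asyncio.Event()
        self.sessions_opened = 0

    async def _run_transport(self):
        # A real transport would open the session here with fresh credentials.
        self.sessions_opened += 1
        waiters = {
            asyncio.ensure_future(self.shutdown_event.wait()),
            asyncio.ensure_future(self.reconnect_event.wait()),
        }
        _done, pending = await asyncio.wait(
            waiters, return_when=asyncio.FIRST_COMPLETED
        )
        for waiter in pending:
            waiter.cancel()
        # Returning normally models a clean exit from the async-with block.

    async def run(self):
        # Outer loop: re-enter the transport after every reconnect signal.
        while not self.shutdown_event.is_set():
            self.reconnect_event.clear()
            await self._run_transport()

    def shutdown(self):
        # Set both events so a pending reconnect cannot mask the shutdown.
        self.shutdown_event.set()
        self.reconnect_event.set()


async def _wait_until(predicate):
    while not predicate():
        await asyncio.sleep(0)


async def demo():
    task = ServerTaskSketch()
    runner = asyncio.ensure_future(task.run())
    await _wait_until(lambda: task.sessions_opened == 1)
    task.reconnect_event.set()  # e.g. an auth failure was recovered
    await _wait_until(lambda: task.sessions_opened == 2)
    task.shutdown()
    await runner
    return task.sessions_opened
```

Checking the shutdown event first in the loop condition is what makes setting both events in shutdown() safe in this sketch.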

* feat(mcp): detect auth failures in tool handlers, trigger reconnect

All 5 MCP tool handlers (tool call, list_resources, read_resource,
list_prompts, get_prompt) now detect auth failures and route
through MCPOAuthManager.handle_401:

  1. If the manager says recovery is viable (disk has fresh tokens,
     or SDK can refresh in-place), signal MCPServerTask._reconnect_event
     to tear down and rebuild the MCP session with fresh credentials,
     then retry the tool call once.

  2. If no recovery path exists, return a structured needs_reauth
     JSON error so the model stops hallucinating manual refresh
     attempts (the 'let me curl the token endpoint' loop Cthulhu
     pasted from Discord).

_is_auth_error catches OAuthFlowError, OAuthTokenError,
OAuthNonInteractiveError, and httpx.HTTPStatusError(401). Non-auth
exceptions still surface via the generic error path unchanged.

Task 6 of 8.
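
The two outcomes above can be sketched as a small wrapper. Everything here is illustrative: AuthError stands in for the exception types that _is_auth_error recognises, recover stands in for MCPOAuthManager.handle_401 plus the reconnect signal, and the JSON field names are assumptions (the commit message only says the error is a structured needs_reauth payload).

```python
import asyncio
import json


class AuthError(Exception):
    """Stand-in for the OAuth / HTTP 401 errors treated as auth failures."""


async def call_with_auth_recovery(server_name, call, recover):
    try:
        return await call()
    except AuthError:
        if await recover():
            try:
                return await call()  # retry exactly once
            except AuthError:
                pass  # recovery did not help; fall through to needs_reauth
        # Field names below are assumed for illustration.
        return json.dumps({
            "error": "needs_reauth",
            "server": server_name,
            "hint": f"run: hermes mcp login {server_name}",
        })


async def demo():
    calls = 0

    async def flaky_call():
        nonlocal calls
        calls += 1
        if calls == 1:
            raise AuthError("401")
        return "ok"

    async def always_fail():
        raise AuthError("401")

    async def recover_ok():
        return True

    async def recover_no():
        return False

    recovered = await call_with_auth_recovery("srv", flaky_call, recover_ok)
    failed = await call_with_auth_recovery("srv", always_fail, recover_no)
    return recovered, json.loads(failed)["error"]
```

Other exception types are deliberately not caught, so they would still reach whatever generic error path the caller has.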

* feat(mcp-cli): route add/remove through manager, add 'hermes mcp login'

cmd_mcp_add and cmd_mcp_remove now go through MCPOAuthManager
instead of calling build_oauth_auth / remove_oauth_tokens
directly. This means CLI config-time state and runtime MCP
session state are backed by the same provider cache — removing
a server evicts the live provider, adding a server populates
the same cache the MCP session will read from.

New 'hermes mcp login <name>' command:
  - Wipes both the on-disk tokens file and the in-memory
    MCPOAuthManager cache
  - Triggers a fresh OAuth browser flow via the existing probe
    path
  - Intended target for the needs_reauth error Task 6 returns
    to the model

Task 7 of 8.

* test(mcp-oauth): end-to-end integration tests

Five new tests exercising the full consolidation with real file
I/O and real imports (no transport mocks):

  1. external_refresh_picked_up_without_restart — Cthulhu's cron
     workflow. External process writes fresh tokens to disk;
     on the next auth flow the manager's mtime-watch flips
     _initialized and the SDK re-reads from storage.

  2. handle_401_deduplicates_concurrent_callers — 10 concurrent
     handlers for the same failed token fire exactly ONE recovery
     attempt (thundering-herd protection).

  3. handle_401_returns_false_when_no_provider — defensive path
     for unknown servers.

  4. invalidate_if_disk_changed_handles_missing_file — pre-auth
     state returns False cleanly.

  5. provider_is_reused_across_reconnects — cache stickiness so
     reconnects preserve the disk-watch baseline mtime.

Task 8 of 8 — consolidation complete.
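
The thundering-herd protection exercised by test 2 can be approximated with one in-flight task per (server, failed token) pair. This is a sketch under assumptions: RecoveryDeduper and its attributes are hypothetical, and the real handle_401 may key and cache differently.

```python
import asyncio


class RecoveryDeduper:
    def __init__(self, recover):
        self._recover = recover
        self._in_flight = {}

    async def handle_401(self, server_name, failed_token):
        key = (server_name, failed_token)
        task = self._in_flight.get(key)
        if task is None:
            # First caller for this key starts the one recovery attempt.
            task = asyncio.ensure_future(self._recover(server_name))
            self._in_flight[key] = task
            task.add_done_callback(
                lambda _t: self._in_flight.pop(key, None)
            )
        # Every caller, first or late, awaits the same attempt.
        return await task


async def demo():
    attempts = 0

    async def recover(_server_name):
        nonlocal attempts
        attempts += 1
        await asyncio.sleep(0)  # yield so the other callers can pile up
        return True

    deduper = RecoveryDeduper(recover)
    results = await asyncio.gather(
        *(deduper.handle_401("srv", "stale-token") for _ in range(10))
    )
    return attempts, results
```

Note that cancelling one waiter would cancel the shared task in this simplified form; wrapping the await in asyncio.shield would be one way to avoid that.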
2026-04-16 21:57:10 -07:00


"""Tests for tools/mcp_oauth.py: OAuth 2.1 PKCE support for MCP servers."""

import json
import os
from io import BytesIO
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock

import pytest

from tools.mcp_oauth import (
    HermesTokenStorage,
    OAuthNonInteractiveError,
    build_oauth_auth,
    remove_oauth_tokens,
    _find_free_port,
    _can_open_browser,
    _is_interactive,
    _wait_for_callback,
    _make_callback_handler,
)

# ---------------------------------------------------------------------------
# HermesTokenStorage
# ---------------------------------------------------------------------------


class TestHermesTokenStorage:
    def test_roundtrip_tokens(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("test-server")
        import asyncio

        # Initially empty
        assert asyncio.run(storage.get_tokens()) is None

        # Save and retrieve
        mock_token = MagicMock()
        mock_token.model_dump.return_value = {
            "access_token": "abc123",
            "token_type": "Bearer",
            "refresh_token": "ref456",
        }
        asyncio.run(storage.set_tokens(mock_token))

        # File exists and contains the saved access token
        token_path = tmp_path / "mcp-tokens" / "test-server.json"
        assert token_path.exists()
        data = json.loads(token_path.read_text())
        assert data["access_token"] == "abc123"

    def test_roundtrip_client_info(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("test-server")
        import asyncio

        assert asyncio.run(storage.get_client_info()) is None

        mock_client = MagicMock()
        mock_client.model_dump.return_value = {
            "client_id": "hermes-123",
            "client_secret": "secret",
        }
        asyncio.run(storage.set_client_info(mock_client))

        client_path = tmp_path / "mcp-tokens" / "test-server.client.json"
        assert client_path.exists()

    def test_remove_cleans_up(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("test-server")

        # Create files
        d = tmp_path / "mcp-tokens"
        d.mkdir(parents=True)
        (d / "test-server.json").write_text("{}")
        (d / "test-server.client.json").write_text("{}")

        storage.remove()
        assert not (d / "test-server.json").exists()
        assert not (d / "test-server.client.json").exists()

    def test_has_cached_tokens(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("my-server")
        assert not storage.has_cached_tokens()

        d = tmp_path / "mcp-tokens"
        d.mkdir(parents=True)
        (d / "my-server.json").write_text('{"access_token": "x", "token_type": "Bearer"}')
        assert storage.has_cached_tokens()

    def test_corrupt_tokens_returns_none(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("bad-server")

        d = tmp_path / "mcp-tokens"
        d.mkdir(parents=True)
        (d / "bad-server.json").write_text("NOT VALID JSON{{{")

        import asyncio
        assert asyncio.run(storage.get_tokens()) is None

    def test_corrupt_client_info_returns_none(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("bad-server")

        d = tmp_path / "mcp-tokens"
        d.mkdir(parents=True)
        (d / "bad-server.client.json").write_text("GARBAGE")

        import asyncio
        assert asyncio.run(storage.get_client_info()) is None

# ---------------------------------------------------------------------------
# build_oauth_auth
# ---------------------------------------------------------------------------


class TestBuildOAuthAuth:
    def test_returns_oauth_provider(self, tmp_path, monkeypatch):
        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        auth = build_oauth_auth("test", "https://example.com/mcp")
        assert isinstance(auth, OAuthClientProvider)

    def test_returns_none_without_sdk(self, monkeypatch):
        import tools.mcp_oauth as mod
        monkeypatch.setattr(mod, "_OAUTH_AVAILABLE", False)
        result = build_oauth_auth("test", "https://example.com")
        assert result is None

    def test_pre_registered_client_id_stored(self, tmp_path, monkeypatch):
        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        build_oauth_auth("slack", "https://slack.example.com/mcp", {
            "client_id": "my-app-id",
            "client_secret": "my-secret",
            "scope": "channels:read",
        })

        client_path = tmp_path / "mcp-tokens" / "slack.client.json"
        assert client_path.exists()
        data = json.loads(client_path.read_text())
        assert data["client_id"] == "my-app-id"
        assert data["client_secret"] == "my-secret"

    def test_scope_passed_through(self, tmp_path, monkeypatch):
        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        provider = build_oauth_auth("scoped", "https://example.com/mcp", {
            "scope": "read write admin",
        })
        assert provider is not None
        assert provider.context.client_metadata.scope == "read write admin"

# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------


class TestUtilities:
    def test_find_free_port_returns_int(self):
        port = _find_free_port()
        assert isinstance(port, int)
        assert 1024 <= port <= 65535

    def test_find_free_port_unique(self):
        """Two consecutive calls should return different ports (usually)."""
        ports = {_find_free_port() for _ in range(5)}
        # At least 2 different ports out of 5 attempts
        assert len(ports) >= 2

    def test_can_open_browser_false_in_ssh(self, monkeypatch):
        monkeypatch.setenv("SSH_CLIENT", "1.2.3.4 1234 22")
        assert _can_open_browser() is False

    def test_can_open_browser_false_without_display(self, monkeypatch):
        monkeypatch.delenv("SSH_CLIENT", raising=False)
        monkeypatch.delenv("SSH_TTY", raising=False)
        monkeypatch.delenv("DISPLAY", raising=False)
        monkeypatch.delenv("WAYLAND_DISPLAY", raising=False)
        # Mock os.name and uname for non-macOS, non-Windows
        monkeypatch.setattr(os, "name", "posix")
        monkeypatch.setattr(os, "uname", lambda: type("", (), {"sysname": "Linux"})())
        assert _can_open_browser() is False

    def test_can_open_browser_true_with_display(self, monkeypatch):
        monkeypatch.delenv("SSH_CLIENT", raising=False)
        monkeypatch.delenv("SSH_TTY", raising=False)
        monkeypatch.setenv("DISPLAY", ":0")
        monkeypatch.setattr(os, "name", "posix")
        assert _can_open_browser() is True

# ---------------------------------------------------------------------------
# Path traversal protection
# ---------------------------------------------------------------------------


class TestPathTraversal:
    """Verify server_name is sanitized to prevent path traversal."""

    def test_path_traversal_blocked(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("../../.ssh/config")
        path = storage._tokens_path()
        # Should stay within mcp-tokens directory
        assert "mcp-tokens" in str(path)
        assert ".ssh" not in str(path.resolve())

    def test_dots_and_slashes_sanitized(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("../../../etc/passwd")
        path = storage._tokens_path()
        resolved = path.resolve()
        assert resolved.is_relative_to((tmp_path / "mcp-tokens").resolve())

    def test_normal_name_unchanged(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("my-mcp-server")
        assert "my-mcp-server.json" in str(storage._tokens_path())

    def test_special_chars_sanitized(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        storage = HermesTokenStorage("server@host:8080/path")
        path = storage._tokens_path()
        assert "@" not in path.name
        assert ":" not in path.name
        assert "/" not in path.stem

# ---------------------------------------------------------------------------
# Callback handler isolation
# ---------------------------------------------------------------------------


class TestCallbackHandlerIsolation:
    """Verify concurrent OAuth flows don't share state."""

    def test_independent_result_dicts(self):
        _, result_a = _make_callback_handler()
        _, result_b = _make_callback_handler()

        result_a["auth_code"] = "code_A"
        result_b["auth_code"] = "code_B"

        assert result_a["auth_code"] == "code_A"
        assert result_b["auth_code"] == "code_B"

    def test_handler_writes_to_own_result(self):
        HandlerClass, result = _make_callback_handler()
        assert result["auth_code"] is None

        # Simulate a GET request
        handler = HandlerClass.__new__(HandlerClass)
        handler.path = "/callback?code=test123&state=mystate"
        handler.wfile = BytesIO()
        handler.send_response = MagicMock()
        handler.send_header = MagicMock()
        handler.end_headers = MagicMock()
        handler.do_GET()

        assert result["auth_code"] == "test123"
        assert result["state"] == "mystate"

    def test_handler_captures_error(self):
        HandlerClass, result = _make_callback_handler()

        handler = HandlerClass.__new__(HandlerClass)
        handler.path = "/callback?error=access_denied"
        handler.wfile = BytesIO()
        handler.send_response = MagicMock()
        handler.send_header = MagicMock()
        handler.end_headers = MagicMock()
        handler.do_GET()

        assert result["auth_code"] is None
        assert result["error"] == "access_denied"

# ---------------------------------------------------------------------------
# Port sharing
# ---------------------------------------------------------------------------


class TestOAuthPortSharing:
    """Verify build_oauth_auth and _wait_for_callback use the same port."""

    def test_port_stored_globally(self, tmp_path, monkeypatch):
        import tools.mcp_oauth as mod
        mod._oauth_port = None

        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        build_oauth_auth("test-port", "https://example.com/mcp")
        assert mod._oauth_port is not None
        assert isinstance(mod._oauth_port, int)
        assert 1024 <= mod._oauth_port <= 65535

# ---------------------------------------------------------------------------
# remove_oauth_tokens
# ---------------------------------------------------------------------------


class TestRemoveOAuthTokens:
    def test_removes_files(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        d = tmp_path / "mcp-tokens"
        d.mkdir()
        (d / "myserver.json").write_text("{}")
        (d / "myserver.client.json").write_text("{}")

        remove_oauth_tokens("myserver")

        assert not (d / "myserver.json").exists()
        assert not (d / "myserver.client.json").exists()

    def test_no_error_when_files_missing(self, tmp_path, monkeypatch):
        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        remove_oauth_tokens("nonexistent")  # should not raise

# ---------------------------------------------------------------------------
# Non-interactive / startup-safety tests
# ---------------------------------------------------------------------------


class TestIsInteractive:
    """_is_interactive() detects headless/daemon/container environments."""

    def test_false_when_stdin_not_tty(self, monkeypatch):
        mock_stdin = MagicMock()
        mock_stdin.isatty.return_value = False
        monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
        assert _is_interactive() is False

    def test_true_when_stdin_is_tty(self, monkeypatch):
        mock_stdin = MagicMock()
        mock_stdin.isatty.return_value = True
        monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
        assert _is_interactive() is True

    def test_false_when_stdin_has_no_isatty(self, monkeypatch):
        """Some environments replace stdin with an object without isatty()."""
        mock_stdin = object()  # no isatty attribute
        monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
        assert _is_interactive() is False


class TestWaitForCallbackNoBlocking:
    """_wait_for_callback() must never call input(); it raises instead."""

    def test_raises_on_timeout_instead_of_input(self):
        """When no auth code arrives, raises OAuthNonInteractiveError."""
        import tools.mcp_oauth as mod
        import asyncio

        mod._oauth_port = _find_free_port()

        async def instant_sleep(_seconds):
            pass

        with patch.object(mod.asyncio, "sleep", instant_sleep):
            with patch("builtins.input", side_effect=AssertionError("input() must not be called")):
                with pytest.raises(OAuthNonInteractiveError, match="callback timed out"):
                    asyncio.run(_wait_for_callback())


class TestBuildOAuthAuthNonInteractive:
    """build_oauth_auth() in non-interactive mode."""

    def test_noninteractive_without_cached_tokens_warns(self, tmp_path, monkeypatch, caplog):
        """Without cached tokens, non-interactive mode logs a clear warning."""
        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        mock_stdin = MagicMock()
        mock_stdin.isatty.return_value = False
        monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)

        import logging
        with caplog.at_level(logging.WARNING, logger="tools.mcp_oauth"):
            auth = build_oauth_auth("atlassian", "https://mcp.atlassian.com/v1/mcp")

        assert auth is not None
        assert "no cached tokens found" in caplog.text.lower()
        assert "non-interactive" in caplog.text.lower()

    def test_noninteractive_with_cached_tokens_no_warning(self, tmp_path, monkeypatch, caplog):
        """With cached tokens, non-interactive mode logs no 'no cached tokens' warning."""
        try:
            from mcp.client.auth import OAuthClientProvider
        except ImportError:
            pytest.skip("MCP SDK auth not available")

        monkeypatch.setenv("HERMES_HOME", str(tmp_path))
        mock_stdin = MagicMock()
        mock_stdin.isatty.return_value = False
        monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)

        # Pre-populate cached tokens
        d = tmp_path / "mcp-tokens"
        d.mkdir(parents=True)
        (d / "atlassian.json").write_text(json.dumps({
            "access_token": "cached",
            "token_type": "Bearer",
        }))

        import logging
        with caplog.at_level(logging.WARNING, logger="tools.mcp_oauth"):
            auth = build_oauth_auth("atlassian", "https://mcp.atlassian.com/v1/mcp")

        assert auth is not None
        assert "no cached tokens found" not in caplog.text.lower()

# ---------------------------------------------------------------------------
# Extracted helper tests (Task 3 of MCP OAuth consolidation)
# ---------------------------------------------------------------------------


def test_build_client_metadata_basic():
    """_build_client_metadata returns metadata with expected defaults."""
    from tools.mcp_oauth import _build_client_metadata, _configure_callback_port

    cfg = {"client_name": "Test Client"}
    _configure_callback_port(cfg)
    md = _build_client_metadata(cfg)

    assert md.client_name == "Test Client"
    assert "authorization_code" in md.grant_types
    assert "refresh_token" in md.grant_types


def test_build_client_metadata_without_secret_is_public():
    """Without client_secret, token endpoint auth is 'none' (public client)."""
    from tools.mcp_oauth import _build_client_metadata, _configure_callback_port

    cfg = {}
    _configure_callback_port(cfg)
    md = _build_client_metadata(cfg)
    assert md.token_endpoint_auth_method == "none"


def test_build_client_metadata_with_secret_is_confidential():
    """With client_secret, token endpoint auth is 'client_secret_post'."""
    from tools.mcp_oauth import _build_client_metadata, _configure_callback_port

    cfg = {"client_secret": "shh"}
    _configure_callback_port(cfg)
    md = _build_client_metadata(cfg)
    assert md.token_endpoint_auth_method == "client_secret_post"


def test_configure_callback_port_picks_free_port():
    """With redirect_port=0, _configure_callback_port picks a free non-privileged port."""
    from tools.mcp_oauth import _configure_callback_port

    cfg = {"redirect_port": 0}
    port = _configure_callback_port(cfg)
    assert 1024 < port < 65536
    assert cfg["_resolved_port"] == port


def test_configure_callback_port_uses_explicit_port():
    """An explicit redirect_port is preserved."""
    from tools.mcp_oauth import _configure_callback_port

    cfg = {"redirect_port": 54321}
    port = _configure_callback_port(cfg)
    assert port == 54321
    assert cfg["_resolved_port"] == 54321


def test_parse_base_url_strips_path():
    """_parse_base_url drops path components for OAuth discovery."""
    from tools.mcp_oauth import _parse_base_url

    assert _parse_base_url("https://example.com/mcp/v1") == "https://example.com"
    assert _parse_base_url("https://example.com") == "https://example.com"
    assert _parse_base_url("https://host.example.com:8080/api") == "https://host.example.com:8080"