hermes-agent/tests/tools/test_mcp_oauth.py
Teknium 0ff7c09e2f
feat(mcp-oauth): stdin paste-back fallback for headless OAuth flow (#32053)
When the user runs OAuth on a remote/SSH machine without a port forward,
the OAuth provider redirects to http://127.0.0.1:<port>/callback which
only the listener on the remote machine can receive — the user's browser
on another box just shows a connection error.

_wait_for_callback() now races the HTTP listener against a stdin reader
on interactive TTYs. The user can copy the URL from the browser's address
bar after authorization (which contains code=...&state=...) and paste it
back at the prompt. Whichever fills the result dict first wins; the HTTP
listener remains the primary path for local sessions and SSH tunnels.

Accepts any of:
  - Full local redirect URL: http://127.0.0.1:N/callback?code=...&state=...
  - Provider URL after redirect: https://mcp.linear.app/callback?code=...&state=...
  - Just the query string: ?code=...&state=... or code=...&state=...

The paste thread only spawns when _is_interactive() is true, preserving
the existing 'no input() in headless runs' invariant — verified by
TestWaitForCallbackPasteIntegration.test_paste_prompt_NOT_shown_when_noninteractive.

The SSH-session hint in _redirect_handler is updated to surface the paste
option as the primary remedy, with ssh -L tunneling as the alternative.
2026-05-25 05:20:05 -07:00

756 lines
28 KiB
Python

"""Tests for tools/mcp_oauth.py — OAuth 2.1 PKCE support for MCP servers."""
import json
import os
import stat
import sys
from io import BytesIO
from pathlib import Path
from unittest.mock import patch, MagicMock, AsyncMock
import pytest
import asyncio
from tools.mcp_oauth import (
HermesTokenStorage,
OAuthNonInteractiveError,
build_oauth_auth,
remove_oauth_tokens,
_find_free_port,
_can_open_browser,
_is_interactive,
_wait_for_callback,
_make_callback_handler,
_redirect_handler,
_paste_callback_reader,
)
# ---------------------------------------------------------------------------
# HermesTokenStorage
# ---------------------------------------------------------------------------
class TestHermesTokenStorage:
def test_roundtrip_tokens(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("test-server")
import asyncio
# Initially empty
assert asyncio.run(storage.get_tokens()) is None
# Save and retrieve
mock_token = MagicMock()
mock_token.model_dump.return_value = {
"access_token": "abc123",
"token_type": "Bearer",
"refresh_token": "ref456",
}
asyncio.run(storage.set_tokens(mock_token))
# File exists with correct permissions
token_path = tmp_path / "mcp-tokens" / "test-server.json"
assert token_path.exists()
data = json.loads(token_path.read_text())
assert data["access_token"] == "abc123"
@pytest.mark.skipif(sys.platform.startswith("win"), reason="POSIX mode bits not enforced on Windows")
def test_token_file_created_with_0o600(self, tmp_path, monkeypatch):
"""Tokens must land on disk at 0o600 with no umask-default exposure window.
Regression for the TOCTOU race where ``write_text`` + post-write
``chmod`` briefly left credentials at the process umask (commonly
0o644 = world-readable) before tightening to owner-only. Mirrors
the fix shipped for ``agent/google_oauth.py`` in #19673.
"""
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("perm-test-server")
import asyncio
mock_token = MagicMock()
mock_token.model_dump.return_value = {
"access_token": "secret-abc",
"token_type": "Bearer",
"refresh_token": "secret-ref",
}
asyncio.run(storage.set_tokens(mock_token))
token_path = tmp_path / "mcp-tokens" / "perm-test-server.json"
assert token_path.exists()
mode = stat.S_IMODE(token_path.stat().st_mode)
assert mode == 0o600, f"token file mode {oct(mode)} != 0o600 — TOCTOU race regressed"
parent_mode = stat.S_IMODE(token_path.parent.stat().st_mode)
assert parent_mode == 0o700, (
f"token parent dir mode {oct(parent_mode)} != 0o700 — siblings can traverse"
)
def test_roundtrip_client_info(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("test-server")
import asyncio
assert asyncio.run(storage.get_client_info()) is None
mock_client = MagicMock()
mock_client.model_dump.return_value = {
"client_id": "hermes-123",
"client_secret": "secret",
}
asyncio.run(storage.set_client_info(mock_client))
client_path = tmp_path / "mcp-tokens" / "test-server.client.json"
assert client_path.exists()
def test_remove_cleans_up(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("test-server")
# Create files
d = tmp_path / "mcp-tokens"
d.mkdir(parents=True)
(d / "test-server.json").write_text("{}")
(d / "test-server.client.json").write_text("{}")
storage.remove()
assert not (d / "test-server.json").exists()
assert not (d / "test-server.client.json").exists()
def test_has_cached_tokens(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("my-server")
assert not storage.has_cached_tokens()
d = tmp_path / "mcp-tokens"
d.mkdir(parents=True)
(d / "my-server.json").write_text('{"access_token": "x", "token_type": "Bearer"}')
assert storage.has_cached_tokens()
def test_corrupt_tokens_returns_none(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("bad-server")
d = tmp_path / "mcp-tokens"
d.mkdir(parents=True)
(d / "bad-server.json").write_text("NOT VALID JSON{{{")
import asyncio
assert asyncio.run(storage.get_tokens()) is None
def test_corrupt_client_info_returns_none(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("bad-server")
d = tmp_path / "mcp-tokens"
d.mkdir(parents=True)
(d / "bad-server.client.json").write_text("GARBAGE")
import asyncio
assert asyncio.run(storage.get_client_info()) is None
# ---------------------------------------------------------------------------
# build_oauth_auth
# ---------------------------------------------------------------------------
class TestBuildOAuthAuth:
def test_returns_oauth_provider(self, tmp_path, monkeypatch):
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
auth = build_oauth_auth("test", "https://example.com/mcp")
assert isinstance(auth, OAuthClientProvider)
def test_returns_none_without_sdk(self, monkeypatch):
import tools.mcp_oauth as mod
monkeypatch.setattr(mod, "_OAUTH_AVAILABLE", False)
result = build_oauth_auth("test", "https://example.com")
assert result is None
def test_pre_registered_client_id_stored(self, tmp_path, monkeypatch):
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
build_oauth_auth("slack", "https://slack.example.com/mcp", {
"client_id": "my-app-id",
"client_secret": "my-secret",
"scope": "channels:read",
})
client_path = tmp_path / "mcp-tokens" / "slack.client.json"
assert client_path.exists()
data = json.loads(client_path.read_text())
assert data["client_id"] == "my-app-id"
assert data["client_secret"] == "my-secret"
def test_scope_passed_through(self, tmp_path, monkeypatch):
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
provider = build_oauth_auth("scoped", "https://example.com/mcp", {
"scope": "read write admin",
})
assert provider is not None
assert provider.context.client_metadata.scope == "read write admin"
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
class TestUtilities:
def test_find_free_port_returns_int(self):
port = _find_free_port()
assert isinstance(port, int)
assert 1024 <= port <= 65535
def test_find_free_port_unique(self):
"""Two consecutive calls should return different ports (usually)."""
ports = {_find_free_port() for _ in range(5)}
# At least 2 different ports out of 5 attempts
assert len(ports) >= 2
def test_can_open_browser_false_in_ssh(self, monkeypatch):
monkeypatch.setenv("SSH_CLIENT", "1.2.3.4 1234 22")
assert _can_open_browser() is False
def test_can_open_browser_false_without_display(self, monkeypatch):
monkeypatch.delenv("SSH_CLIENT", raising=False)
monkeypatch.delenv("SSH_TTY", raising=False)
monkeypatch.delenv("DISPLAY", raising=False)
monkeypatch.delenv("WAYLAND_DISPLAY", raising=False)
# Mock os.name and uname for non-macOS, non-Windows
monkeypatch.setattr(os, "name", "posix")
monkeypatch.setattr(os, "uname", lambda: type("", (), {"sysname": "Linux"})())
assert _can_open_browser() is False
def test_can_open_browser_true_with_display(self, monkeypatch):
monkeypatch.delenv("SSH_CLIENT", raising=False)
monkeypatch.delenv("SSH_TTY", raising=False)
monkeypatch.setenv("DISPLAY", ":0")
monkeypatch.setattr(os, "name", "posix")
assert _can_open_browser() is True
class TestRedirectHandlerSshHint:
"""_redirect_handler must print an SSH tunnel hint on remote sessions."""
def _run(self, coro):
return asyncio.get_event_loop().run_until_complete(coro)
def test_ssh_hint_shown_on_ssh_session(self, monkeypatch, capsys):
import tools.mcp_oauth as mco
monkeypatch.setattr(mco, "_oauth_port", 49200)
monkeypatch.setenv("SSH_CLIENT", "1.2.3.4 1234 22")
monkeypatch.delenv("SSH_TTY", raising=False)
monkeypatch.setattr(mco, "_can_open_browser", lambda: False)
self._run(_redirect_handler("https://example.com/auth?foo=bar"))
err = capsys.readouterr().err
assert "49200" in err
assert "ssh -N -L" in err
assert "Remote session detected" in err
def test_ssh_hint_shown_via_ssh_tty(self, monkeypatch, capsys):
import tools.mcp_oauth as mco
monkeypatch.setattr(mco, "_oauth_port", 49201)
monkeypatch.delenv("SSH_CLIENT", raising=False)
monkeypatch.setenv("SSH_TTY", "/dev/pts/1")
monkeypatch.setattr(mco, "_can_open_browser", lambda: False)
self._run(_redirect_handler("https://example.com/auth"))
err = capsys.readouterr().err
assert "49201" in err
assert "ssh -N -L" in err
def test_no_ssh_hint_on_local_session(self, monkeypatch, capsys):
import tools.mcp_oauth as mco
monkeypatch.setattr(mco, "_oauth_port", 49202)
monkeypatch.delenv("SSH_CLIENT", raising=False)
monkeypatch.delenv("SSH_TTY", raising=False)
monkeypatch.setattr(mco, "_can_open_browser", lambda: True)
monkeypatch.setattr("webbrowser.open", lambda url, **kw: True)
self._run(_redirect_handler("https://example.com/auth"))
err = capsys.readouterr().err
assert "ssh -N -L" not in err
def test_no_ssh_hint_when_port_not_set(self, monkeypatch, capsys):
import tools.mcp_oauth as mco
monkeypatch.setattr(mco, "_oauth_port", None)
monkeypatch.setenv("SSH_CLIENT", "1.2.3.4 1234 22")
monkeypatch.setattr(mco, "_can_open_browser", lambda: False)
self._run(_redirect_handler("https://example.com/auth"))
err = capsys.readouterr().err
assert "ssh -N -L" not in err
# ---------------------------------------------------------------------------
# Path traversal protection
# ---------------------------------------------------------------------------
class TestPathTraversal:
"""Verify server_name is sanitized to prevent path traversal."""
def test_path_traversal_blocked(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("../../.ssh/config")
path = storage._tokens_path()
# Should stay within mcp-tokens directory
assert "mcp-tokens" in str(path)
assert ".ssh" not in str(path.resolve())
def test_dots_and_slashes_sanitized(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("../../../etc/passwd")
path = storage._tokens_path()
resolved = path.resolve()
assert resolved.is_relative_to((tmp_path / "mcp-tokens").resolve())
def test_normal_name_unchanged(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("my-mcp-server")
assert "my-mcp-server.json" in str(storage._tokens_path())
def test_special_chars_sanitized(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
storage = HermesTokenStorage("server@host:8080/path")
path = storage._tokens_path()
assert "@" not in path.name
assert ":" not in path.name
assert "/" not in path.stem
# ---------------------------------------------------------------------------
# Callback handler isolation
# ---------------------------------------------------------------------------
class TestCallbackHandlerIsolation:
"""Verify concurrent OAuth flows don't share state."""
def test_independent_result_dicts(self):
_, result_a = _make_callback_handler()
_, result_b = _make_callback_handler()
result_a["auth_code"] = "code_A"
result_b["auth_code"] = "code_B"
assert result_a["auth_code"] == "code_A"
assert result_b["auth_code"] == "code_B"
def test_handler_writes_to_own_result(self):
HandlerClass, result = _make_callback_handler()
assert result["auth_code"] is None
# Simulate a GET request
handler = HandlerClass.__new__(HandlerClass)
handler.path = "/callback?code=test123&state=mystate"
handler.wfile = BytesIO()
handler.send_response = MagicMock()
handler.send_header = MagicMock()
handler.end_headers = MagicMock()
handler.do_GET()
assert result["auth_code"] == "test123"
assert result["state"] == "mystate"
def test_handler_captures_error(self):
HandlerClass, result = _make_callback_handler()
handler = HandlerClass.__new__(HandlerClass)
handler.path = "/callback?error=access_denied"
handler.wfile = BytesIO()
handler.send_response = MagicMock()
handler.send_header = MagicMock()
handler.end_headers = MagicMock()
handler.do_GET()
assert result["auth_code"] is None
assert result["error"] == "access_denied"
# ---------------------------------------------------------------------------
# Port sharing
# ---------------------------------------------------------------------------
class TestOAuthPortSharing:
"""Verify build_oauth_auth and _wait_for_callback use the same port."""
def test_port_stored_globally(self, tmp_path, monkeypatch):
import tools.mcp_oauth as mod
mod._oauth_port = None
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
build_oauth_auth("test-port", "https://example.com/mcp")
assert mod._oauth_port is not None
assert isinstance(mod._oauth_port, int)
assert 1024 <= mod._oauth_port <= 65535
# ---------------------------------------------------------------------------
# remove_oauth_tokens
# ---------------------------------------------------------------------------
class TestRemoveOAuthTokens:
def test_removes_files(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
d = tmp_path / "mcp-tokens"
d.mkdir()
(d / "myserver.json").write_text("{}")
(d / "myserver.client.json").write_text("{}")
remove_oauth_tokens("myserver")
assert not (d / "myserver.json").exists()
assert not (d / "myserver.client.json").exists()
def test_no_error_when_files_missing(self, tmp_path, monkeypatch):
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
remove_oauth_tokens("nonexistent") # should not raise
# ---------------------------------------------------------------------------
# Non-interactive / startup-safety tests
# ---------------------------------------------------------------------------
class TestIsInteractive:
"""_is_interactive() detects headless/daemon/container environments."""
def test_false_when_stdin_not_tty(self, monkeypatch):
mock_stdin = MagicMock()
mock_stdin.isatty.return_value = False
monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
assert _is_interactive() is False
def test_true_when_stdin_is_tty(self, monkeypatch):
mock_stdin = MagicMock()
mock_stdin.isatty.return_value = True
monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
assert _is_interactive() is True
def test_false_when_stdin_has_no_isatty(self, monkeypatch):
"""Some environments replace stdin with an object without isatty()."""
mock_stdin = object() # no isatty attribute
monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
assert _is_interactive() is False
class TestWaitForCallbackNoBlocking:
"""_wait_for_callback() must never call input() — it raises instead."""
def test_raises_on_timeout_instead_of_input(self):
"""When no auth code arrives, raises OAuthNonInteractiveError."""
import tools.mcp_oauth as mod
import asyncio
mod._oauth_port = _find_free_port()
async def instant_sleep(_seconds):
pass
with patch.object(mod.asyncio, "sleep", instant_sleep):
with patch("builtins.input", side_effect=AssertionError("input() must not be called")):
with pytest.raises(OAuthNonInteractiveError, match="callback timed out"):
asyncio.run(_wait_for_callback())
class TestBuildOAuthAuthNonInteractive:
"""build_oauth_auth() in non-interactive mode."""
def test_noninteractive_without_cached_tokens_warns(self, tmp_path, monkeypatch, caplog):
"""Without cached tokens, non-interactive mode logs a clear warning."""
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
mock_stdin = MagicMock()
mock_stdin.isatty.return_value = False
monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
import logging
with caplog.at_level(logging.WARNING, logger="tools.mcp_oauth"):
auth = build_oauth_auth("atlassian", "https://mcp.atlassian.com/v1/mcp")
assert auth is not None
assert "no cached tokens found" in caplog.text.lower()
assert "non-interactive" in caplog.text.lower()
def test_noninteractive_with_cached_tokens_no_warning(self, tmp_path, monkeypatch, caplog):
"""With cached tokens, non-interactive mode logs no 'no cached tokens' warning."""
try:
from mcp.client.auth import OAuthClientProvider
except ImportError:
pytest.skip("MCP SDK auth not available")
monkeypatch.setenv("HERMES_HOME", str(tmp_path))
mock_stdin = MagicMock()
mock_stdin.isatty.return_value = False
monkeypatch.setattr("tools.mcp_oauth.sys.stdin", mock_stdin)
# Pre-populate cached tokens
d = tmp_path / "mcp-tokens"
d.mkdir(parents=True)
(d / "atlassian.json").write_text(json.dumps({
"access_token": "cached",
"token_type": "Bearer",
}))
import logging
with caplog.at_level(logging.WARNING, logger="tools.mcp_oauth"):
auth = build_oauth_auth("atlassian", "https://mcp.atlassian.com/v1/mcp")
assert auth is not None
assert "no cached tokens found" not in caplog.text.lower()
# ---------------------------------------------------------------------------
# Extracted helper tests (Task 3 of MCP OAuth consolidation)
# ---------------------------------------------------------------------------
def test_build_client_metadata_basic():
"""_build_client_metadata returns metadata with expected defaults."""
pytest.importorskip("mcp")
from tools.mcp_oauth import _build_client_metadata, _configure_callback_port
cfg = {"client_name": "Test Client"}
_configure_callback_port(cfg)
md = _build_client_metadata(cfg)
assert md.client_name == "Test Client"
assert "authorization_code" in md.grant_types
assert "refresh_token" in md.grant_types
def test_build_client_metadata_without_secret_is_public():
"""Without client_secret, token endpoint auth is 'none' (public client)."""
pytest.importorskip("mcp")
from tools.mcp_oauth import _build_client_metadata, _configure_callback_port
cfg = {}
_configure_callback_port(cfg)
md = _build_client_metadata(cfg)
assert md.token_endpoint_auth_method == "none"
def test_build_client_metadata_with_secret_is_confidential():
"""With client_secret, token endpoint auth is 'client_secret_post'."""
pytest.importorskip("mcp")
from tools.mcp_oauth import _build_client_metadata, _configure_callback_port
cfg = {"client_secret": "shh"}
_configure_callback_port(cfg)
md = _build_client_metadata(cfg)
assert md.token_endpoint_auth_method == "client_secret_post"
def test_configure_callback_port_picks_free_port():
"""_configure_callback_port(0) picks a free port in the ephemeral range."""
from tools.mcp_oauth import _configure_callback_port
cfg = {"redirect_port": 0}
port = _configure_callback_port(cfg)
assert 1024 < port < 65536
assert cfg["_resolved_port"] == port
def test_configure_callback_port_uses_explicit_port():
"""An explicit redirect_port is preserved."""
from tools.mcp_oauth import _configure_callback_port
cfg = {"redirect_port": 54321}
port = _configure_callback_port(cfg)
assert port == 54321
assert cfg["_resolved_port"] == 54321
def test_build_oauth_auth_preserves_server_url_path():
"""server_url with path is forwarded to OAuthClientProvider unmodified.
Regression for #16015: previously ``_parse_base_url`` stripped the path,
collapsing ``https://mcp.notion.com/mcp`` to ``https://mcp.notion.com`` and
breaking RFC 9728 protected-resource validation against servers whose PRM
advertises a path-scoped resource (Notion). The MCP SDK strips the path
itself for authorization-server discovery via
``OAuthContext.get_authorization_base_url``; Hermes must not pre-strip.
"""
from tools import mcp_oauth
captured: dict = {}
class _FakeProvider:
def __init__(self, **kwargs):
captured.update(kwargs)
with patch.object(mcp_oauth, "_OAUTH_AVAILABLE", True), \
patch.object(mcp_oauth, "OAuthClientProvider", _FakeProvider), \
patch.object(mcp_oauth, "_is_interactive", return_value=True), \
patch.object(mcp_oauth, "_maybe_preregister_client"), \
patch.object(mcp_oauth, "HermesTokenStorage") as mock_storage_cls:
mock_storage_cls.return_value = MagicMock(has_cached_tokens=lambda: True)
build_oauth_auth(
server_name="notion",
server_url="https://mcp.notion.com/mcp",
oauth_config={},
)
assert captured["server_url"] == "https://mcp.notion.com/mcp"
class TestPasteCallbackReader:
"""_paste_callback_reader parses redirect URLs / query strings from stdin."""
def _empty_result(self):
return {"auth_code": None, "state": None, "error": None}
def test_parses_full_local_redirect_url(self, monkeypatch):
result = self._empty_result()
monkeypatch.setattr(
"sys.stdin",
MagicMock(readline=lambda: "http://127.0.0.1:37949/callback?code=abc&state=xyz\n"),
)
_paste_callback_reader(result)
assert result["auth_code"] == "abc"
assert result["state"] == "xyz"
assert result["error"] is None
def test_parses_remote_provider_url(self, monkeypatch):
"""User pastes the URL their browser ended up on, including a real host."""
result = self._empty_result()
url = "https://mcp.linear.app/callback?code=deadbeef&state=eyJ0ZXN0Ijoi"
monkeypatch.setattr("sys.stdin", MagicMock(readline=lambda: url + "\n"))
_paste_callback_reader(result)
assert result["auth_code"] == "deadbeef"
assert result["state"] == "eyJ0ZXN0Ijoi"
def test_parses_bare_query_string(self, monkeypatch):
result = self._empty_result()
monkeypatch.setattr(
"sys.stdin",
MagicMock(readline=lambda: "code=token123&state=st1\n"),
)
_paste_callback_reader(result)
assert result["auth_code"] == "token123"
assert result["state"] == "st1"
def test_parses_leading_question_mark(self, monkeypatch):
result = self._empty_result()
monkeypatch.setattr(
"sys.stdin",
MagicMock(readline=lambda: "?code=tok&state=stA\n"),
)
_paste_callback_reader(result)
assert result["auth_code"] == "tok"
assert result["state"] == "stA"
def test_captures_error_param(self, monkeypatch):
result = self._empty_result()
monkeypatch.setattr(
"sys.stdin",
MagicMock(readline=lambda: "https://example/cb?error=access_denied\n"),
)
_paste_callback_reader(result)
assert result["auth_code"] is None
assert result["error"] == "access_denied"
def test_empty_input_noop(self, monkeypatch):
result = self._empty_result()
monkeypatch.setattr("sys.stdin", MagicMock(readline=lambda: ""))
_paste_callback_reader(result)
assert result["auth_code"] is None
assert result["error"] is None
def test_garbage_input_noop(self, monkeypatch, capsys):
result = self._empty_result()
monkeypatch.setattr(
"sys.stdin", MagicMock(readline=lambda: "not a url at all\n")
)
_paste_callback_reader(result)
assert result["auth_code"] is None
assert result["error"] is None
err = capsys.readouterr().err
assert "did not contain" in err or "Could not parse" in err
def test_skips_when_http_listener_already_won(self, monkeypatch):
"""If HTTP listener filled the result first, paste must not overwrite."""
result = {"auth_code": "from_http", "state": "http_state", "error": None}
monkeypatch.setattr(
"sys.stdin",
MagicMock(readline=lambda: "code=from_paste&state=paste_state\n"),
)
_paste_callback_reader(result)
assert result["auth_code"] == "from_http"
assert result["state"] == "http_state"
def test_swallows_stdin_errors(self, monkeypatch):
"""OSError / interrupt on readline must not propagate."""
result = self._empty_result()
def raise_oserror():
raise OSError("stdin closed")
monkeypatch.setattr("sys.stdin", MagicMock(readline=raise_oserror))
_paste_callback_reader(result) # must not raise
assert result["auth_code"] is None
class TestWaitForCallbackPasteIntegration:
"""_wait_for_callback offers the paste prompt only when interactive."""
def test_paste_prompt_shown_on_tty(self, monkeypatch, capsys):
import tools.mcp_oauth as mod
mod._oauth_port = _find_free_port()
monkeypatch.setattr(mod, "_is_interactive", lambda: True)
# Make stdin readline block forever so HTTP listener path drives the test;
# we just want to verify the prompt was printed and the thread spawned.
def block_forever():
import threading
threading.Event().wait()
monkeypatch.setattr("sys.stdin", MagicMock(readline=block_forever))
async def instant_sleep(_):
pass
with patch.object(mod.asyncio, "sleep", instant_sleep):
with pytest.raises(OAuthNonInteractiveError):
asyncio.run(_wait_for_callback())
err = capsys.readouterr().err
assert "paste the redirect URL" in err
def test_paste_prompt_NOT_shown_when_noninteractive(self, monkeypatch, capsys):
"""Preserves existing invariant: no input() / paste prompt in headless runs."""
import tools.mcp_oauth as mod
mod._oauth_port = _find_free_port()
monkeypatch.setattr(mod, "_is_interactive", lambda: False)
async def instant_sleep(_):
pass
with patch.object(mod.asyncio, "sleep", instant_sleep):
with patch("builtins.input", side_effect=AssertionError("input() must not be called")):
with pytest.raises(OAuthNonInteractiveError):
asyncio.run(_wait_for_callback())
err = capsys.readouterr().err
assert "paste the redirect URL" not in err