From 7e9dd9ca456ec9abf0cec67a3ecb7618fb71b984 Mon Sep 17 00:00:00 2001 From: Dilee Date: Fri, 24 Apr 2026 14:17:44 +0300 Subject: [PATCH] Add native Spotify tools with PKCE auth --- hermes_cli/auth.py | 567 +++++++++++++++++++++++++- hermes_cli/auth_commands.py | 47 +++ hermes_cli/main.py | 13 +- tests/hermes_cli/test_spotify_auth.py | 88 ++++ tests/tools/test_spotify_client.py | 244 +++++++++++ tools/providers/__init__.py | 1 + tools/providers/spotify_client.py | 435 ++++++++++++++++++++ tools/spotify_tool.py | 530 ++++++++++++++++++++++++ toolsets.py | 14 + 9 files changed, 1936 insertions(+), 3 deletions(-) create mode 100644 tests/hermes_cli/test_spotify_auth.py create mode 100644 tests/tools/test_spotify_client.py create mode 100644 tools/providers/__init__.py create mode 100644 tools/providers/spotify_client.py create mode 100644 tools/spotify_tool.py diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py index ed6b8f462..6e8774bf5 100644 --- a/hermes_cli/auth.py +++ b/hermes_cli/auth.py @@ -33,8 +33,10 @@ import webbrowser from contextlib import contextmanager from dataclasses import dataclass, field from datetime import datetime, timezone +from http.server import BaseHTTPRequestHandler, HTTPServer from pathlib import Path from typing import Any, Dict, List, Optional +from urllib.parse import parse_qs, urlencode, urlparse import httpx import yaml @@ -81,6 +83,25 @@ CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56" QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token" QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 +DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL = "https://accounts.spotify.com" +DEFAULT_SPOTIFY_API_BASE_URL = "https://api.spotify.com/v1" +DEFAULT_SPOTIFY_REDIRECT_URI = "http://127.0.0.1:43827/spotify/callback" +SPOTIFY_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120 +DEFAULT_SPOTIFY_SCOPE = " ".join(( + "user-modify-playback-state", + "user-read-playback-state", + "user-read-currently-playing", + "user-read-recently-played", + "playlist-read-private", + "playlist-read-collaborative", + "playlist-modify-public", + "playlist-modify-private", + "user-library-read", + "user-library-modify", +)) +SERVICE_PROVIDER_NAMES: Dict[str, str] = { + "spotify": "Spotify", +} # Google Gemini OAuth (google-gemini-cli provider, Cloud Code Assist backend) DEFAULT_GEMINI_CLOUDCODE_BASE_URL = "cloudcode-pa://google" @@ -795,6 +816,34 @@ def _save_provider_state(auth_store: Dict[str, Any], provider_id: str, state: Di auth_store["active_provider"] = provider_id +def _store_provider_state( + auth_store: Dict[str, Any], + provider_id: str, + state: Dict[str, Any], + *, + set_active: bool = True, +) -> None: + providers = auth_store.setdefault("providers", {}) + if not isinstance(providers, dict): + auth_store["providers"] = {} + providers = auth_store["providers"] + providers[provider_id] = state + if set_active: + auth_store["active_provider"] = provider_id + + +def is_known_auth_provider(provider_id: str) -> bool: + normalized = (provider_id or "").strip().lower() + return normalized in PROVIDER_REGISTRY or normalized in SERVICE_PROVIDER_NAMES + + +def get_auth_provider_display_name(provider_id: str) -> str: + normalized = (provider_id or "").strip().lower() + if normalized in PROVIDER_REGISTRY: + return PROVIDER_REGISTRY[normalized].name + return SERVICE_PROVIDER_NAMES.get(normalized, provider_id) + + def read_credential_pool(provider_id: Optional[str] = None) -> Dict[str, Any]: """Return the persisted credential pool, or one provider slice.""" auth_store = _load_auth_store() @@ -1429,8 +1478,520 @@ def get_gemini_oauth_auth_status() -> Dict[str, Any]: "email": creds.email, "project_id": creds.project_id, } +# Spotify auth — PKCE tokens stored in ~/.hermes/auth.json +# ============================================================================= +def _spotify_scope_list(raw_scope: Optional[str] = None) -> List[str]: + scope_text = (raw_scope or DEFAULT_SPOTIFY_SCOPE).strip() + scopes = [part for part in scope_text.split() if part] + seen: set[str] = set() + ordered: List[str] = [] + for scope in scopes: + if scope not in seen: + seen.add(scope) + ordered.append(scope) + return ordered + + +def _spotify_scope_string(raw_scope: Optional[str] = None) -> str: + return " ".join(_spotify_scope_list(raw_scope)) + + +def _spotify_client_id( + explicit: Optional[str] = None, + state: Optional[Dict[str, Any]] = None, +) -> str: + from hermes_cli.config import get_env_value + + candidates = ( + explicit, + get_env_value("HERMES_SPOTIFY_CLIENT_ID"), + get_env_value("SPOTIFY_CLIENT_ID"), + state.get("client_id") if isinstance(state, dict) else None, + ) + for candidate in candidates: + cleaned = str(candidate or "").strip() + if cleaned: + return cleaned + raise AuthError( + "Spotify client_id is required. Set HERMES_SPOTIFY_CLIENT_ID or pass --client-id.", + provider="spotify", + code="spotify_client_id_missing", + ) + + +def _spotify_redirect_uri( + explicit: Optional[str] = None, + state: Optional[Dict[str, Any]] = None, +) -> str: + from hermes_cli.config import get_env_value + + candidates = ( + explicit, + get_env_value("HERMES_SPOTIFY_REDIRECT_URI"), + get_env_value("SPOTIFY_REDIRECT_URI"), + state.get("redirect_uri") if isinstance(state, dict) else None, + DEFAULT_SPOTIFY_REDIRECT_URI, + ) + for candidate in candidates: + cleaned = str(candidate or "").strip() + if cleaned: + return cleaned + return DEFAULT_SPOTIFY_REDIRECT_URI + + +def _spotify_api_base_url(state: Optional[Dict[str, Any]] = None) -> str: + from hermes_cli.config import get_env_value + + candidates = ( + get_env_value("HERMES_SPOTIFY_API_BASE_URL"), + state.get("api_base_url") if isinstance(state, dict) else None, + DEFAULT_SPOTIFY_API_BASE_URL, + ) + for candidate in candidates: + cleaned = str(candidate or "").strip().rstrip("/") + if cleaned: + return cleaned + return DEFAULT_SPOTIFY_API_BASE_URL + + +def _spotify_accounts_base_url(state: Optional[Dict[str, Any]] = None) -> str: + from hermes_cli.config import get_env_value + + candidates = ( + get_env_value("HERMES_SPOTIFY_ACCOUNTS_BASE_URL"), + state.get("accounts_base_url") if isinstance(state, dict) else None, + DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL, + ) + for candidate in candidates: + cleaned = str(candidate or "").strip().rstrip("/") + if cleaned: + return cleaned + return DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL + + +def _spotify_code_verifier(length: int = 64) -> str: + raw = base64.urlsafe_b64encode(os.urandom(length)).decode("ascii") + return raw.rstrip("=")[:128] + + +def _spotify_code_challenge(code_verifier: str) -> str: + digest = hashlib.sha256(code_verifier.encode("utf-8")).digest() + return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=") + + +def _spotify_build_authorize_url( + *, + client_id: str, + redirect_uri: str, + scope: str, + state: str, + code_challenge: str, + accounts_base_url: str, +) -> str: + query = urlencode({ + "client_id": client_id, + "response_type": "code", + "redirect_uri": redirect_uri, + "scope": scope, + "state": state, + "code_challenge_method": "S256", + "code_challenge": code_challenge, + }) + return f"{accounts_base_url}/authorize?{query}" + + +def _spotify_validate_redirect_uri(redirect_uri: str) -> tuple[str, int, str]: + parsed = urlparse(redirect_uri) + if parsed.scheme != "http": + raise AuthError( + "Spotify PKCE redirect_uri must use http://localhost or http://127.0.0.1.", + provider="spotify", + code="spotify_redirect_invalid", + ) + host = parsed.hostname or "" + if host not in {"127.0.0.1", "localhost"}: + raise AuthError( + "Spotify PKCE redirect_uri must point to localhost or 127.0.0.1.", + provider="spotify", + code="spotify_redirect_invalid", + ) + if not parsed.port: + raise AuthError( + "Spotify PKCE redirect_uri must include an explicit localhost port.", + provider="spotify", + code="spotify_redirect_invalid", + ) + return host, parsed.port, parsed.path or "/" + + +def _make_spotify_callback_handler(expected_path: str) -> tuple[type[BaseHTTPRequestHandler], dict[str, Any]]: + result: dict[str, Any] = { + "code": None, + "state": None, + "error": None, + "error_description": None, + } + + class _SpotifyCallbackHandler(BaseHTTPRequestHandler): + def do_GET(self) -> None: # noqa: N802 + parsed = urlparse(self.path) + if parsed.path != expected_path: + self.send_response(404) + self.end_headers() + self.wfile.write(b"Not found.") + return + + params = parse_qs(parsed.query) + result["code"] = params.get("code", [None])[0] + result["state"] = params.get("state", [None])[0] + result["error"] = params.get("error", [None])[0] + result["error_description"] = params.get("error_description", [None])[0] + + self.send_response(200) + self.send_header("Content-Type", "text/html; charset=utf-8") + self.end_headers() + if result["error"]: + body = "

Spotify authorization failed.

You can close this tab." + else: + body = "

Spotify authorization received.

You can close this tab." + self.wfile.write(body.encode("utf-8")) + + def log_message(self, format: str, *args: Any) -> None: # noqa: A003 + return + + return _SpotifyCallbackHandler, result + + +def _spotify_wait_for_callback( + redirect_uri: str, + *, + timeout_seconds: float = 180.0, +) -> dict[str, Any]: + host, port, path = _spotify_validate_redirect_uri(redirect_uri) + handler_cls, result = _make_spotify_callback_handler(path) + + class _ReuseHTTPServer(HTTPServer): + allow_reuse_address = True + + try: + server = _ReuseHTTPServer((host, port), handler_cls) + except OSError as exc: + raise AuthError( + f"Could not bind Spotify callback server on {host}:{port}: {exc}", + provider="spotify", + code="spotify_callback_bind_failed", + ) from exc + + thread = threading.Thread(target=server.serve_forever, kwargs={"poll_interval": 0.1}, daemon=True) + thread.start() + deadline = time.time() + max(5.0, timeout_seconds) + try: + while time.time() < deadline: + if result["code"] or result["error"]: + return result + time.sleep(0.1) + finally: + server.shutdown() + server.server_close() + thread.join(timeout=1.0) + raise AuthError( + "Spotify authorization timed out waiting for the local callback.", + provider="spotify", + code="spotify_callback_timeout", + ) + + +def _spotify_token_payload_to_state( + token_payload: Dict[str, Any], + *, + client_id: str, + redirect_uri: str, + requested_scope: str, + accounts_base_url: str, + api_base_url: str, + previous_state: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + now = datetime.now(timezone.utc) + expires_in = _coerce_ttl_seconds(token_payload.get("expires_in", 0)) + expires_at = datetime.fromtimestamp(now.timestamp() + expires_in, tz=timezone.utc) + state = dict(previous_state or {}) + state.update({ + "client_id": client_id, + "redirect_uri": redirect_uri, + "accounts_base_url": accounts_base_url, + "api_base_url": api_base_url, + "scope": requested_scope, + "granted_scope": str(token_payload.get("scope") or requested_scope).strip(), + "token_type": str(token_payload.get("token_type", "Bearer") or "Bearer").strip() or "Bearer", + "access_token": str(token_payload.get("access_token", "") or "").strip(), + "refresh_token": str( + token_payload.get("refresh_token") + or state.get("refresh_token") + or "" + ).strip(), + "obtained_at": now.isoformat(), + "expires_at": expires_at.isoformat(), + "expires_in": expires_in, + "auth_type": "oauth_pkce", + }) + return state + + +def _spotify_exchange_code_for_tokens( + *, + client_id: str, + code: str, + redirect_uri: str, + code_verifier: str, + accounts_base_url: str, + timeout_seconds: float = 20.0, +) -> Dict[str, Any]: + try: + response = httpx.post( + f"{accounts_base_url}/api/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "client_id": client_id, + "grant_type": "authorization_code", + "code": code, + "redirect_uri": redirect_uri, + "code_verifier": code_verifier, + }, + timeout=timeout_seconds, + ) + except Exception as exc: + raise AuthError( + f"Spotify token exchange failed: {exc}", + provider="spotify", + code="spotify_token_exchange_failed", + ) from exc + + if response.status_code >= 400: + detail = response.text.strip() + raise AuthError( + "Spotify token exchange failed." + + (f" Response: {detail}" if detail else ""), + provider="spotify", + code="spotify_token_exchange_failed", + ) + payload = response.json() + if not isinstance(payload, dict) or not str(payload.get("access_token", "") or "").strip(): + raise AuthError( + "Spotify token response did not include an access_token.", + provider="spotify", + code="spotify_token_exchange_invalid", + ) + return payload + + +def _refresh_spotify_oauth_state( + state: Dict[str, Any], + *, + timeout_seconds: float = 20.0, +) -> Dict[str, Any]: + refresh_token = str(state.get("refresh_token", "") or "").strip() + if not refresh_token: + raise AuthError( + "Spotify refresh token missing. Run `hermes auth spotify` again.", + provider="spotify", + code="spotify_refresh_token_missing", + relogin_required=True, + ) + + client_id = _spotify_client_id(state=state) + accounts_base_url = _spotify_accounts_base_url(state) + try: + response = httpx.post( + f"{accounts_base_url}/api/token", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={ + "grant_type": "refresh_token", + "refresh_token": refresh_token, + "client_id": client_id, + }, + timeout=timeout_seconds, + ) + except Exception as exc: + raise AuthError( + f"Spotify token refresh failed: {exc}", + provider="spotify", + code="spotify_refresh_failed", + ) from exc + + if response.status_code >= 400: + detail = response.text.strip() + raise AuthError( + "Spotify token refresh failed. Run `hermes auth spotify` again." + + (f" Response: {detail}" if detail else ""), + provider="spotify", + code="spotify_refresh_failed", + relogin_required=True, + ) + + payload = response.json() + if not isinstance(payload, dict) or not str(payload.get("access_token", "") or "").strip(): + raise AuthError( + "Spotify refresh response did not include an access_token.", + provider="spotify", + code="spotify_refresh_invalid", + relogin_required=True, + ) + + return _spotify_token_payload_to_state( + payload, + client_id=client_id, + redirect_uri=_spotify_redirect_uri(state=state), + requested_scope=str(state.get("scope") or DEFAULT_SPOTIFY_SCOPE), + accounts_base_url=accounts_base_url, + api_base_url=_spotify_api_base_url(state), + previous_state=state, + ) + + +def resolve_spotify_runtime_credentials( + *, + force_refresh: bool = False, + refresh_if_expiring: bool = True, + refresh_skew_seconds: int = SPOTIFY_ACCESS_TOKEN_REFRESH_SKEW_SECONDS, +) -> Dict[str, Any]: + with _auth_store_lock(): + auth_store = _load_auth_store() + state = _load_provider_state(auth_store, "spotify") + if not state: + raise AuthError( + "Spotify is not authenticated. Run `hermes auth spotify` first.", + provider="spotify", + code="spotify_auth_missing", + relogin_required=True, + ) + + should_refresh = bool(force_refresh) + if not should_refresh and refresh_if_expiring: + should_refresh = _is_expiring(state.get("expires_at"), refresh_skew_seconds) + if should_refresh: + state = _refresh_spotify_oauth_state(state) + _store_provider_state(auth_store, "spotify", state, set_active=False) + _save_auth_store(auth_store) + + access_token = str(state.get("access_token", "") or "").strip() + if not access_token: + raise AuthError( + "Spotify access token missing. Run `hermes auth spotify` again.", + provider="spotify", + code="spotify_access_token_missing", + relogin_required=True, + ) + + return { + "provider": "spotify", + "access_token": access_token, + "api_key": access_token, + "token_type": str(state.get("token_type", "Bearer") or "Bearer"), + "base_url": _spotify_api_base_url(state), + "scope": str(state.get("granted_scope") or state.get("scope") or "").strip(), + "client_id": _spotify_client_id(state=state), + "redirect_uri": _spotify_redirect_uri(state=state), + "expires_at": state.get("expires_at"), + "refresh_token": str(state.get("refresh_token", "") or "").strip(), + } + + +def get_spotify_auth_status() -> Dict[str, Any]: + state = get_provider_auth_state("spotify") + if not state: + return {"logged_in": False} + + expires_at = state.get("expires_at") + refresh_token = str(state.get("refresh_token", "") or "").strip() + return { + "logged_in": bool(refresh_token or not _is_expiring(expires_at, 0)), + "auth_type": state.get("auth_type", "oauth_pkce"), + "client_id": state.get("client_id"), + "redirect_uri": state.get("redirect_uri"), + "scope": state.get("granted_scope") or state.get("scope"), + "expires_at": expires_at, + "api_base_url": state.get("api_base_url"), + "has_refresh_token": bool(refresh_token), + } + + +def login_spotify_command(args) -> None: + existing_state = get_provider_auth_state("spotify") or {} + client_id = _spotify_client_id(getattr(args, "client_id", None), existing_state) + redirect_uri = _spotify_redirect_uri(getattr(args, "redirect_uri", None), existing_state) + scope = _spotify_scope_string(getattr(args, "scope", None) or existing_state.get("scope")) + accounts_base_url = _spotify_accounts_base_url(existing_state) + api_base_url = _spotify_api_base_url(existing_state) + open_browser = not getattr(args, "no_browser", False) + + code_verifier = _spotify_code_verifier() + code_challenge = _spotify_code_challenge(code_verifier) + state_nonce = uuid.uuid4().hex + authorize_url = _spotify_build_authorize_url( + client_id=client_id, + redirect_uri=redirect_uri, + scope=scope, + state=state_nonce, + code_challenge=code_challenge, + accounts_base_url=accounts_base_url, + ) + + print("Starting Spotify PKCE login...") + print(f"Client ID: {client_id}") + print(f"Redirect URI: {redirect_uri}") + print("Make sure this redirect URI is allow-listed in your Spotify app settings.") + print() + print("Open this URL to authorize Hermes:") + print(authorize_url) + print() + + if open_browser and not _is_remote_session(): + try: + opened = webbrowser.open(authorize_url) + except Exception: + opened = False + if opened: + print("Browser opened for Spotify authorization.") + else: + print("Could not open the browser automatically; use the URL above.") + + callback = _spotify_wait_for_callback( + redirect_uri, + timeout_seconds=float(getattr(args, "timeout", None) or 180.0), + ) + if callback.get("error"): + detail = callback.get("error_description") or callback["error"] + raise SystemExit(f"Spotify authorization failed: {detail}") + if callback.get("state") != state_nonce: + raise SystemExit("Spotify authorization failed: state mismatch.") + + token_payload = _spotify_exchange_code_for_tokens( + client_id=client_id, + code=str(callback.get("code") or ""), + redirect_uri=redirect_uri, + code_verifier=code_verifier, + accounts_base_url=accounts_base_url, + timeout_seconds=float(getattr(args, "timeout", None) or 20.0), + ) + spotify_state = _spotify_token_payload_to_state( + token_payload, + client_id=client_id, + redirect_uri=redirect_uri, + requested_scope=scope, + accounts_base_url=accounts_base_url, + api_base_url=api_base_url, + ) + + with _auth_store_lock(): + auth_store = _load_auth_store() + _store_provider_state(auth_store, "spotify", spotify_state, set_active=False) + saved_to = _save_auth_store(auth_store) + + print("Spotify login successful!") + print(f" Auth state: {saved_to}") + print(" Provider state saved under providers.spotify") # ============================================================================= # SSH / remote session detection @@ -2744,6 +3305,8 @@ def get_external_process_provider_status(provider_id: str) -> Dict[str, Any]: def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]: """Generic auth status dispatcher.""" target = provider_id or get_active_provider() + if target == "spotify": + return get_spotify_auth_status() if target == "nous": return get_nous_auth_status() if target == "openai-codex": @@ -3658,7 +4221,7 @@ def logout_command(args) -> None: """Clear auth state for a provider.""" provider_id = getattr(args, "provider", None) - if provider_id and provider_id not in PROVIDER_REGISTRY: + if provider_id and not is_known_auth_provider(provider_id): print(f"Unknown provider: {provider_id}") raise SystemExit(1) @@ -3669,8 +4232,8 @@ def logout_command(args) -> None: print("No provider is currently logged in.") return - provider_name = PROVIDER_REGISTRY[target].name if target in PROVIDER_REGISTRY else target config_matches = _config_provider_matches(target) + provider_name = get_auth_provider_display_name(target) if clear_provider_auth(target) or config_matches: _reset_config_provider() diff --git a/hermes_cli/auth_commands.py b/hermes_cli/auth_commands.py index aa092a058..94ea2559c 100644 --- a/hermes_cli/auth_commands.py +++ b/hermes_cli/auth_commands.py @@ -408,6 +408,44 @@ def auth_reset_command(args) -> None: print(f"Reset status on {count} {provider} credentials") +def auth_status_command(args) -> None: + provider = _normalize_provider(getattr(args, "provider", "") or "") + if not provider: + raise SystemExit("Provider is required. Example: `hermes auth status spotify`.") + status = auth_mod.get_auth_status(provider) + if not status.get("logged_in"): + reason = status.get("error") + if reason: + print(f"{provider}: logged out ({reason})") + else: + print(f"{provider}: logged out") + return + + print(f"{provider}: logged in") + for key in ("auth_type", "client_id", "redirect_uri", "scope", "expires_at", "api_base_url"): + value = status.get(key) + if value: + print(f" {key}: {value}") + + +def auth_logout_command(args) -> None: + auth_mod.logout_command(SimpleNamespace(provider=getattr(args, "provider", None))) + + +def auth_spotify_command(args) -> None: + action = str(getattr(args, "spotify_action", "") or "login").strip().lower() + if action in {"", "login"}: + auth_mod.login_spotify_command(args) + return + if action == "status": + auth_status_command(SimpleNamespace(provider="spotify")) + return + if action == "logout": + auth_logout_command(SimpleNamespace(provider="spotify")) + return + raise SystemExit(f"Unknown Spotify auth action: {action}") + + def _interactive_auth() -> None: """Interactive credential pool management when `hermes auth` is called bare.""" # Show current pool status first @@ -605,5 +643,14 @@ def auth_command(args) -> None: if action == "reset": auth_reset_command(args) return + if action == "status": + auth_status_command(args) + return + if action == "logout": + auth_logout_command(args) + return + if action == "spotify": + auth_spotify_command(args) + return # No subcommand — launch interactive mode _interactive_auth() diff --git a/hermes_cli/main.py b/hermes_cli/main.py index 14cca2f03..96ae9adc8 100644 --- a/hermes_cli/main.py +++ b/hermes_cli/main.py @@ -7316,7 +7316,7 @@ For more help on a command: ) logout_parser.add_argument( "--provider", - choices=["nous", "openai-codex"], + choices=["nous", "openai-codex", "spotify"], default=None, help="Provider to log out from (default: active provider)", ) @@ -7373,6 +7373,17 @@ For more help on a command: "reset", help="Clear exhaustion status for all credentials for a provider" ) auth_reset.add_argument("provider", help="Provider id") + auth_status = auth_subparsers.add_parser("status", help="Show auth status for a provider") + auth_status.add_argument("provider", help="Provider id") + auth_logout = auth_subparsers.add_parser("logout", help="Log out a provider and clear stored auth state") + auth_logout.add_argument("provider", help="Provider id") + auth_spotify = auth_subparsers.add_parser("spotify", help="Authenticate Hermes with Spotify via PKCE") + auth_spotify.add_argument("spotify_action", nargs="?", choices=["login", "status", "logout"], default="login") + auth_spotify.add_argument("--client-id", help="Spotify app client_id (or set HERMES_SPOTIFY_CLIENT_ID)") + auth_spotify.add_argument("--redirect-uri", help="Allow-listed localhost redirect URI for your Spotify app") + auth_spotify.add_argument("--scope", help="Override requested Spotify scopes") + auth_spotify.add_argument("--no-browser", action="store_true", help="Do not attempt to open the browser automatically") + auth_spotify.add_argument("--timeout", type=float, help="Callback/token exchange timeout in seconds") auth_parser.set_defaults(func=cmd_auth) # ========================================================================= diff --git a/tests/hermes_cli/test_spotify_auth.py b/tests/hermes_cli/test_spotify_auth.py new file mode 100644 index 000000000..4873d5f9d --- /dev/null +++ b/tests/hermes_cli/test_spotify_auth.py @@ -0,0 +1,88 @@ +from __future__ import annotations + +from types import SimpleNamespace + +import pytest + +from hermes_cli import auth as auth_mod + + +def test_store_provider_state_can_skip_active_provider() -> None: + auth_store = {"active_provider": "nous", "providers": {}} + + auth_mod._store_provider_state( + auth_store, + "spotify", + {"access_token": "abc"}, + set_active=False, + ) + + assert auth_store["active_provider"] == "nous" + assert auth_store["providers"]["spotify"]["access_token"] == "abc" + + +def test_resolve_spotify_runtime_credentials_refreshes_without_changing_active_provider( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("HERMES_HOME", str(tmp_path)) + + with auth_mod._auth_store_lock(): + store = auth_mod._load_auth_store() + store["active_provider"] = "nous" + auth_mod._store_provider_state( + store, + "spotify", + { + "client_id": "spotify-client", + "redirect_uri": "http://127.0.0.1:43827/spotify/callback", + "api_base_url": auth_mod.DEFAULT_SPOTIFY_API_BASE_URL, + "accounts_base_url": auth_mod.DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL, + "scope": auth_mod.DEFAULT_SPOTIFY_SCOPE, + "access_token": "expired-token", + "refresh_token": "refresh-token", + "token_type": "Bearer", + "expires_at": "2000-01-01T00:00:00+00:00", + }, + set_active=False, + ) + auth_mod._save_auth_store(store) + + monkeypatch.setattr( + auth_mod, + "_refresh_spotify_oauth_state", + lambda state, timeout_seconds=20.0: { + **state, + "access_token": "fresh-token", + "expires_at": "2099-01-01T00:00:00+00:00", + }, + ) + + creds = auth_mod.resolve_spotify_runtime_credentials() + + assert creds["access_token"] == "fresh-token" + persisted = auth_mod.get_provider_auth_state("spotify") + assert persisted is not None + assert persisted["access_token"] == "fresh-token" + assert auth_mod.get_active_provider() == "nous" + + +def test_auth_spotify_status_command_reports_logged_in(capsys, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + auth_mod, + "get_auth_status", + lambda provider=None: { + "logged_in": True, + "auth_type": "oauth_pkce", + "client_id": "spotify-client", + "redirect_uri": "http://127.0.0.1:43827/spotify/callback", + "scope": "user-library-read", + }, + ) + + from hermes_cli.auth_commands import auth_status_command + + auth_status_command(SimpleNamespace(provider="spotify")) + output = capsys.readouterr().out + assert "spotify: logged in" in output + assert "client_id: spotify-client" in output diff --git a/tests/tools/test_spotify_client.py b/tests/tools/test_spotify_client.py new file mode 100644 index 000000000..17157ec4e --- /dev/null +++ b/tests/tools/test_spotify_client.py @@ -0,0 +1,244 @@ +from __future__ import annotations + +import json + +import pytest + +from tools.providers import spotify_client as spotify_mod +from tools import spotify_tool + + +class _FakeResponse: + def __init__(self, status_code: int, payload: dict | None = None, *, text: str = "", headers: dict | None = None): + self.status_code = status_code + self._payload = payload + self.text = text or (json.dumps(payload) if payload is not None else "") + self.headers = headers or {"content-type": "application/json"} + self.content = self.text.encode("utf-8") if self.text else b"" + + def json(self): + if self._payload is None: + raise ValueError("no json") + return self._payload + + +class _StubSpotifyClient: + def __init__(self, payload): + self.payload = payload + + def get_currently_playing(self, *, market=None): + return self.payload + + +def test_spotify_client_retries_once_after_401(monkeypatch: pytest.MonkeyPatch) -> None: + calls: list[str] = [] + tokens = iter([ + { + "access_token": "token-1", + "base_url": "https://api.spotify.com/v1", + }, + { + "access_token": "token-2", + "base_url": "https://api.spotify.com/v1", + }, + ]) + + monkeypatch.setattr( + spotify_mod, + "resolve_spotify_runtime_credentials", + lambda **kwargs: next(tokens), + ) + + def fake_request(method, url, headers=None, params=None, json=None, timeout=None): + calls.append(headers["Authorization"]) + if len(calls) == 1: + return _FakeResponse(401, {"error": {"message": "expired token"}}) + return _FakeResponse(200, {"devices": [{"id": "dev-1"}]}) + + monkeypatch.setattr(spotify_mod.httpx, "request", fake_request) + + client = spotify_mod.SpotifyClient() + payload = client.get_devices() + + assert payload["devices"][0]["id"] == "dev-1" + assert calls == ["Bearer token-1", "Bearer token-2"] + + +def test_normalize_spotify_uri_accepts_urls() -> None: + uri = spotify_mod.normalize_spotify_uri( + "https://open.spotify.com/track/7ouMYWpwJ422jRcDASZB7P", + "track", + ) + assert uri == "spotify:track:7ouMYWpwJ422jRcDASZB7P" + + +@pytest.mark.parametrize( + ("status_code", "path", "payload", "expected"), + [ + ( + 403, + "/me/player/play", + {"error": {"message": "Premium required"}}, + "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account and an active Spotify Connect device.", + ), + ( + 404, + "/me/player", + {"error": {"message": "Device not found"}}, + "Spotify could not find an active playback device or player session for this request.", + ), + ( + 429, + "/search", + {"error": {"message": "rate limit"}}, + "Spotify rate limit exceeded. Retry after 7 seconds.", + ), + ], +) +def test_spotify_client_formats_friendly_api_errors( + monkeypatch: pytest.MonkeyPatch, + status_code: int, + path: str, + payload: dict, + expected: str, +) -> None: + monkeypatch.setattr( + spotify_mod, + "resolve_spotify_runtime_credentials", + lambda **kwargs: { + "access_token": "token-1", + "base_url": "https://api.spotify.com/v1", + }, + ) + + def fake_request(method, url, headers=None, params=None, json=None, timeout=None): + return _FakeResponse(status_code, payload, headers={"content-type": "application/json", "Retry-After": "7"}) + + monkeypatch.setattr(spotify_mod.httpx, "request", fake_request) + + client = spotify_mod.SpotifyClient() + with pytest.raises(spotify_mod.SpotifyAPIError) as exc: + client.request("GET", path) + + assert str(exc.value) == expected + + +def test_get_currently_playing_returns_explanatory_empty_payload(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + spotify_mod, + "resolve_spotify_runtime_credentials", + lambda **kwargs: { + "access_token": "token-1", + "base_url": "https://api.spotify.com/v1", + }, + ) + + def fake_request(method, url, headers=None, params=None, json=None, timeout=None): + return _FakeResponse(204, None, text="", headers={"content-type": "application/json"}) + + monkeypatch.setattr(spotify_mod.httpx, "request", fake_request) + + client = spotify_mod.SpotifyClient() + payload = client.get_currently_playing() + + assert payload == { + "status_code": 204, + "empty": True, + "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.", + } + + +def test_spotify_activity_now_playing_returns_explanatory_empty_result(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + spotify_tool, + "_spotify_client", + lambda: _StubSpotifyClient({ + "status_code": 204, + "empty": True, + "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.", + }), + ) + + payload = json.loads(spotify_tool._handle_spotify_activity({"action": "now_playing"})) + + assert payload == { + "success": True, + "action": "now_playing", + "is_playing": False, + "status_code": 204, + "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.", + } + + +def test_library_contains_uses_generic_library_endpoint(monkeypatch: pytest.MonkeyPatch) -> None: + seen: list[tuple[str, str, dict | None]] = [] + + monkeypatch.setattr( + spotify_mod, + "resolve_spotify_runtime_credentials", + lambda **kwargs: { + "access_token": "token-1", + "base_url": "https://api.spotify.com/v1", + }, + ) + + def fake_request(method, url, headers=None, params=None, json=None, timeout=None): + seen.append((method, url, params)) + return _FakeResponse(200, [True]) + + monkeypatch.setattr(spotify_mod.httpx, "request", fake_request) + + client = spotify_mod.SpotifyClient() + payload = client.library_contains(uris=["spotify:album:abc", "spotify:track:def"]) + + assert payload == [True] + assert seen == [ + ( + "GET", + "https://api.spotify.com/v1/me/library/contains", + {"uris": "spotify:album:abc,spotify:track:def"}, + ) + ] + + +@pytest.mark.parametrize( + ("method_name", "item_key", "item_value", "expected_uris"), + [ + ("remove_saved_tracks", "track_ids", ["track-a", "track-b"], ["spotify:track:track-a", "spotify:track:track-b"]), + ("remove_saved_albums", "album_ids", ["album-a"], ["spotify:album:album-a"]), + ], +) +def test_library_remove_uses_generic_library_endpoint( + monkeypatch: pytest.MonkeyPatch, + method_name: str, + item_key: str, + item_value: list[str], + expected_uris: list[str], +) -> None: + seen: list[tuple[str, str, dict | None]] = [] + + monkeypatch.setattr( + spotify_mod, + "resolve_spotify_runtime_credentials", + lambda **kwargs: { + "access_token": "token-1", + "base_url": "https://api.spotify.com/v1", + }, + ) + + def fake_request(method, url, headers=None, params=None, json=None, timeout=None): + seen.append((method, url, params)) + return _FakeResponse(200, {}) + + monkeypatch.setattr(spotify_mod.httpx, "request", fake_request) + + client = spotify_mod.SpotifyClient() + getattr(client, method_name)(**{item_key: item_value}) + + assert seen == [ + ( + "DELETE", + "https://api.spotify.com/v1/me/library", + {"uris": ",".join(expected_uris)}, + ) + ] diff --git a/tools/providers/__init__.py b/tools/providers/__init__.py new file mode 100644 index 000000000..25dd42409 --- /dev/null +++ b/tools/providers/__init__.py @@ -0,0 +1 @@ +"""Provider-specific native tool clients.""" diff --git a/tools/providers/spotify_client.py b/tools/providers/spotify_client.py new file mode 100644 index 000000000..2195cc20a --- /dev/null +++ b/tools/providers/spotify_client.py @@ -0,0 +1,435 @@ +"""Thin Spotify Web API helper used by Hermes native tools.""" + +from __future__ import annotations + +import json +from typing import Any, Dict, Iterable, Optional +from urllib.parse import urlparse + +import httpx + +from hermes_cli.auth import ( + AuthError, + resolve_spotify_runtime_credentials, +) + + +class SpotifyError(RuntimeError): + """Base Spotify tool error.""" + + +class SpotifyAuthRequiredError(SpotifyError): + """Raised when the user needs to authenticate with Spotify first.""" + + +class SpotifyAPIError(SpotifyError): + """Structured Spotify API failure.""" + + def __init__( + self, + message: str, + *, + status_code: Optional[int] = None, + response_body: Optional[str] = None, + ) -> None: + super().__init__(message) + self.status_code = status_code + self.response_body = response_body + self.path = None + + +class SpotifyClient: + def __init__(self) -> None: + self._runtime = self._resolve_runtime(refresh_if_expiring=True) + + def _resolve_runtime(self, *, force_refresh: bool = False, refresh_if_expiring: bool = True) -> Dict[str, Any]: + try: + return resolve_spotify_runtime_credentials( + force_refresh=force_refresh, + refresh_if_expiring=refresh_if_expiring, + ) + except AuthError as exc: + raise SpotifyAuthRequiredError(str(exc)) from exc + + @property + def base_url(self) -> str: + return str(self._runtime.get("base_url") or "").rstrip("/") + + def _headers(self) -> Dict[str, str]: + return { + "Authorization": f"Bearer {self._runtime['access_token']}", + "Content-Type": "application/json", + } + + def request( + self, + method: str, + path: str, + *, + params: Optional[Dict[str, Any]] = None, + json_body: Optional[Dict[str, Any]] = None, + allow_retry_on_401: bool = True, + empty_response: Optional[Dict[str, Any]] = None, + ) -> Any: + url = f"{self.base_url}{path}" + response = httpx.request( + method, + url, + headers=self._headers(), + params=_strip_none(params), + json=_strip_none(json_body) if json_body is not None else None, + timeout=30.0, + ) + if response.status_code == 401 and allow_retry_on_401: + self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True) + return self.request( + method, + path, + params=params, + json_body=json_body, + allow_retry_on_401=False, + ) + if response.status_code >= 400: + self._raise_api_error(response, method=method, path=path) + if response.status_code == 204 or not response.content: + return empty_response or {"success": True, "status_code": response.status_code, "empty": True} + if "application/json" in response.headers.get("content-type", ""): + return response.json() + return {"success": True, "text": response.text} + + def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> None: + detail = response.text.strip() + message = _friendly_spotify_error_message( + status_code=response.status_code, + detail=_extract_spotify_error_detail(response, fallback=detail), + method=method, + path=path, + retry_after=response.headers.get("Retry-After"), + ) + error = SpotifyAPIError(message, status_code=response.status_code, response_body=detail) + error.path = path + raise error + + def get_devices(self) -> Any: + return self.request("GET", "/me/player/devices") + + def transfer_playback(self, *, device_id: str, play: bool = False) -> Any: + return self.request("PUT", "/me/player", json_body={ + "device_ids": [device_id], + "play": play, + }) + + def get_playback_state(self, *, market: Optional[str] = None) -> Any: + return self.request( + "GET", + "/me/player", + params={"market": market}, + empty_response={ + "status_code": 204, + "empty": True, + "message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.", + }, + ) + + def get_currently_playing(self, *, market: Optional[str] = None) -> Any: + return self.request( + "GET", + "/me/player/currently-playing", + params={"market": market}, + empty_response={ + "status_code": 204, + "empty": True, + "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.", + }, + ) + + def start_playback( + self, + *, + device_id: Optional[str] = None, + context_uri: Optional[str] = None, + uris: Optional[list[str]] = None, + offset: Optional[Dict[str, Any]] = None, + position_ms: Optional[int] = None, + ) -> Any: + return self.request( + "PUT", + "/me/player/play", + params={"device_id": device_id}, + json_body={ + "context_uri": context_uri, + "uris": uris, + "offset": offset, + "position_ms": position_ms, + }, + ) + + def pause_playback(self, *, device_id: Optional[str] = None) -> Any: + return self.request("PUT", "/me/player/pause", params={"device_id": device_id}) + + def skip_next(self, *, device_id: Optional[str] = None) -> Any: + return self.request("POST", "/me/player/next", params={"device_id": device_id}) + + def skip_previous(self, *, device_id: Optional[str] = None) -> Any: + return self.request("POST", "/me/player/previous", params={"device_id": device_id}) + + def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any: + return self.request("PUT", "/me/player/seek", params={ + "position_ms": position_ms, + "device_id": device_id, + }) + + def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any: + return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id}) + + def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any: + return self.request("PUT", "/me/player/shuffle", params={"state": str(bool(state)).lower(), "device_id": device_id}) + + def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any: + return self.request("PUT", "/me/player/volume", params={ + "volume_percent": volume_percent, + "device_id": device_id, + }) + + def get_queue(self) -> Any: + return self.request("GET", "/me/player/queue") + + def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any: + return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id}) + + def search( + self, + *, + query: str, + search_types: list[str], + limit: int = 10, + offset: int = 0, + market: Optional[str] = None, + include_external: Optional[str] = None, + ) -> Any: + return self.request("GET", "/search", params={ + "q": query, + "type": ",".join(search_types), + "limit": limit, + "offset": offset, + "market": market, + "include_external": include_external, + }) + + def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any: + return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset}) + + def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any: + return self.request("GET", f"/playlists/{playlist_id}", params={"market": market}) + + def create_playlist( + self, + *, + name: str, + public: bool = False, + collaborative: bool = False, + description: Optional[str] = None, + ) -> Any: + return self.request("POST", "/me/playlists", json_body={ + "name": name, + "public": public, + "collaborative": collaborative, + "description": description, + }) + + def add_playlist_items( + self, + *, + playlist_id: str, + uris: list[str], + position: Optional[int] = None, + ) -> Any: + return self.request("POST", f"/playlists/{playlist_id}/items", json_body={ + "uris": uris, + "position": position, + }) + + def remove_playlist_items( + self, + *, + playlist_id: str, + uris: list[str], + snapshot_id: Optional[str] = None, + ) -> Any: + return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={ + "items": [{"uri": uri} for uri in uris], + "snapshot_id": snapshot_id, + }) + + def update_playlist_details( + self, + *, + playlist_id: str, + name: Optional[str] = None, + public: Optional[bool] = None, + collaborative: Optional[bool] = None, + description: Optional[str] = None, + ) -> Any: + return self.request("PUT", f"/playlists/{playlist_id}", json_body={ + "name": name, + "public": public, + "collaborative": collaborative, + "description": description, + }) + + def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any: + return self.request("GET", f"/albums/{album_id}", params={"market": market}) + + def get_album_tracks(self, *, album_id: str, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any: + return self.request("GET", f"/albums/{album_id}/tracks", params={ + "limit": limit, + "offset": offset, + "market": market, + }) + + def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any: + return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market}) + + def save_library_items(self, *, uris: list[str]) -> Any: + return self.request("PUT", "/me/library", params={"uris": ",".join(uris)}) + + def library_contains(self, *, uris: list[str]) -> Any: + return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)}) + + def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any: + return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market}) + + def remove_saved_tracks(self, *, track_ids: list[str]) -> Any: + uris = [f"spotify:track:{track_id}" for track_id in track_ids] + return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)}) + + def remove_saved_albums(self, *, album_ids: list[str]) -> Any: + uris = [f"spotify:album:{album_id}" for album_id in album_ids] + return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)}) + + def get_recently_played( + self, + *, + limit: int = 20, + after: Optional[int] = None, + before: Optional[int] = None, + ) -> Any: + return self.request("GET", "/me/player/recently-played", params={ + "limit": limit, + "after": after, + "before": before, + }) + + +def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str: + detail = fallback + try: + payload = response.json() + if isinstance(payload, dict): + error_obj = payload.get("error") + if isinstance(error_obj, dict): + detail = str(error_obj.get("message") or detail) + elif isinstance(error_obj, str): + detail = error_obj + except Exception: + pass + return detail.strip() + + +def _friendly_spotify_error_message( + *, + status_code: int, + detail: str, + method: str, + path: str, + retry_after: Optional[str], +) -> str: + normalized_detail = detail.lower() + is_playback_path = path.startswith("/me/player") + + if status_code == 401: + return "Spotify authentication failed or expired. Run `hermes auth spotify` again." + + if status_code == 403: + if is_playback_path: + return ( + "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account " + "and an active Spotify Connect device." + ) + if "scope" in normalized_detail or "permission" in normalized_detail: + return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions." + return "Spotify rejected the request. The account may not have permission for this action." + + if status_code == 404: + if is_playback_path: + return "Spotify could not find an active playback device or player session for this request." + return "Spotify resource not found." + + if status_code == 429: + message = "Spotify rate limit exceeded." + if retry_after: + message += f" Retry after {retry_after} seconds." + return message + + if detail: + return detail + return f"Spotify API request failed with status {status_code}." + + +def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]: + if not payload: + return {} + return {key: value for key, value in payload.items() if value is not None} + + +def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str: + cleaned = (value or "").strip() + if not cleaned: + raise SpotifyError("Spotify id/uri/url is required.") + if cleaned.startswith("spotify:"): + parts = cleaned.split(":") + if len(parts) >= 3: + item_type = parts[1] + if expected_type and item_type != expected_type: + raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.") + return parts[2] + if "open.spotify.com" in cleaned: + parsed = urlparse(cleaned) + path_parts = [part for part in parsed.path.split("/") if part] + if len(path_parts) >= 2: + item_type, item_id = path_parts[0], path_parts[1] + if expected_type and item_type != expected_type: + raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.") + return item_id + return cleaned + + +def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str: + cleaned = (value or "").strip() + if not cleaned: + raise SpotifyError("Spotify URI/url/id is required.") + if cleaned.startswith("spotify:"): + if expected_type: + parts = cleaned.split(":") + if len(parts) >= 3 and parts[1] != expected_type: + raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.") + return cleaned + item_id = normalize_spotify_id(cleaned, expected_type) + if expected_type: + return f"spotify:{expected_type}:{item_id}" + return cleaned + + +def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]: + uris: list[str] = [] + for value in values: + uri = normalize_spotify_uri(str(value), expected_type) + if uri not in uris: + uris.append(uri) + if not uris: + raise SpotifyError("At least one Spotify item is required.") + return uris + + +def compact_json(data: Any) -> str: + return json.dumps(data, ensure_ascii=False) diff --git a/tools/spotify_tool.py b/tools/spotify_tool.py new file mode 100644 index 000000000..dc88953c9 --- /dev/null +++ b/tools/spotify_tool.py @@ -0,0 +1,530 @@ +"""Native Spotify tools for Hermes.""" + +from __future__ import annotations + +from typing import Any, Dict, List + +from hermes_cli.auth import get_auth_status +from tools.providers.spotify_client import ( + SpotifyAPIError, + SpotifyAuthRequiredError, + SpotifyClient, + SpotifyError, + normalize_spotify_id, + normalize_spotify_uri, + normalize_spotify_uris, +) +from tools.registry import registry, tool_error, tool_result + + +def _check_spotify_available() -> bool: + try: + return bool(get_auth_status("spotify").get("logged_in")) + except Exception: + return False + + +def _spotify_client() -> SpotifyClient: + return SpotifyClient() + + +def _spotify_tool_error(exc: Exception) -> str: + if isinstance(exc, (SpotifyError, SpotifyAuthRequiredError)): + return tool_error(str(exc)) + if isinstance(exc, SpotifyAPIError): + return tool_error(str(exc), status_code=exc.status_code) + return tool_error(f"Spotify tool failed: {type(exc).__name__}: {exc}") + + +def _coerce_limit(raw: Any, *, default: int = 20, minimum: int = 1, maximum: int = 50) -> int: + try: + value = int(raw) + except Exception: + value = default + return max(minimum, min(maximum, value)) + + +def _coerce_bool(raw: Any, default: bool = False) -> bool: + if isinstance(raw, bool): + return raw + if isinstance(raw, str): + cleaned = raw.strip().lower() + if cleaned in {"1", "true", "yes", "on"}: + return True + if cleaned in {"0", "false", "no", "off"}: + return False + return default + + +def _as_list(raw: Any) -> List[str]: + if raw is None: + return [] + if isinstance(raw, list): + return [str(item).strip() for item in raw if str(item).strip()] + return [str(raw).strip()] if str(raw).strip() else [] + + +def _describe_empty_playback(payload: Any, *, action: str) -> dict | None: + if not isinstance(payload, dict) or not payload.get("empty"): + return None + if action == "get_currently_playing": + return { + "success": True, + "action": action, + "is_playing": False, + "status_code": payload.get("status_code", 204), + "message": payload.get("message") or "Spotify is not currently playing anything.", + } + if action == "get_state": + return { + "success": True, + "action": action, + "has_active_device": False, + "status_code": payload.get("status_code", 204), + "message": payload.get("message") or "No active Spotify playback session was found.", + } + return None + + +def _handle_spotify_playback(args: dict, **kw) -> str: + action = str(args.get("action") or "get_state").strip().lower() + client = _spotify_client() + try: + if action == "get_state": + payload = client.get_playback_state(market=args.get("market")) + empty_result = _describe_empty_playback(payload, action=action) + return tool_result(empty_result or payload) + if action == "get_currently_playing": + payload = client.get_currently_playing(market=args.get("market")) + empty_result = _describe_empty_playback(payload, action=action) + return tool_result(empty_result or payload) + if action == "play": + offset = args.get("offset") + if isinstance(offset, dict): + payload_offset = {k: v for k, v in offset.items() if v is not None} + else: + payload_offset = None + uris = normalize_spotify_uris(_as_list(args.get("uris")), "track") if args.get("uris") else None + context_uri = None + if args.get("context_uri"): + raw_context = str(args.get("context_uri")) + context_type = None + if raw_context.startswith("spotify:album:") or "/album/" in raw_context: + context_type = "album" + elif raw_context.startswith("spotify:playlist:") or "/playlist/" in raw_context: + context_type = "playlist" + elif raw_context.startswith("spotify:artist:") or "/artist/" in raw_context: + context_type = "artist" + context_uri = normalize_spotify_uri(raw_context, context_type) + result = client.start_playback( + device_id=args.get("device_id"), + context_uri=context_uri, + uris=uris, + offset=payload_offset, + position_ms=args.get("position_ms"), + ) + return tool_result({"success": True, "action": action, "result": result}) + if action == "pause": + result = client.pause_playback(device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "next": + result = client.skip_next(device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "previous": + result = client.skip_previous(device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "seek": + if args.get("position_ms") is None: + return tool_error("position_ms is required for action='seek'") + result = client.seek(position_ms=int(args["position_ms"]), device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "set_repeat": + state = str(args.get("state") or "").strip().lower() + if state not in {"track", "context", "off"}: + return tool_error("state must be one of: track, context, off") + result = client.set_repeat(state=state, device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "set_shuffle": + result = client.set_shuffle(state=_coerce_bool(args.get("state")), device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + if action == "set_volume": + if args.get("volume_percent") is None: + return tool_error("volume_percent is required for action='set_volume'") + result = client.set_volume(volume_percent=max(0, min(100, int(args["volume_percent"]))), device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "result": result}) + return tool_error(f"Unknown spotify_playback action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_devices(args: dict, **kw) -> str: + action = str(args.get("action") or "list").strip().lower() + client = _spotify_client() + try: + if action == "list": + return tool_result(client.get_devices()) + if action == "transfer": + device_id = str(args.get("device_id") or "").strip() + if not device_id: + return tool_error("device_id is required for action='transfer'") + result = client.transfer_playback(device_id=device_id, play=_coerce_bool(args.get("play"))) + return tool_result({"success": True, "action": action, "result": result}) + return tool_error(f"Unknown spotify_devices action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_queue(args: dict, **kw) -> str: + action = str(args.get("action") or "get").strip().lower() + client = _spotify_client() + try: + if action == "get": + return tool_result(client.get_queue()) + if action == "add": + uri = normalize_spotify_uri(str(args.get("uri") or ""), None) + result = client.add_to_queue(uri=uri, device_id=args.get("device_id")) + return tool_result({"success": True, "action": action, "uri": uri, "result": result}) + return tool_error(f"Unknown spotify_queue action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_search(args: dict, **kw) -> str: + client = _spotify_client() + query = str(args.get("query") or "").strip() + if not query: + return tool_error("query is required") + raw_types = _as_list(args.get("types") or args.get("type") or ["track"]) + search_types = [value.lower() for value in raw_types if value.lower() in {"album", "artist", "playlist", "track", "show", "episode", "audiobook"}] + if not search_types: + return tool_error("types must contain one or more of: album, artist, playlist, track, show, episode, audiobook") + try: + return tool_result(client.search( + query=query, + search_types=search_types, + limit=_coerce_limit(args.get("limit"), default=10), + offset=max(0, int(args.get("offset") or 0)), + market=args.get("market"), + include_external=args.get("include_external"), + )) + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_playlists(args: dict, **kw) -> str: + action = str(args.get("action") or "list").strip().lower() + client = _spotify_client() + try: + if action == "list": + return tool_result(client.get_my_playlists( + limit=_coerce_limit(args.get("limit"), default=20), + offset=max(0, int(args.get("offset") or 0)), + )) + if action == "get": + playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist") + return tool_result(client.get_playlist(playlist_id=playlist_id, market=args.get("market"))) + if action == "create": + name = str(args.get("name") or "").strip() + if not name: + return tool_error("name is required for action='create'") + return tool_result(client.create_playlist( + name=name, + public=_coerce_bool(args.get("public")), + collaborative=_coerce_bool(args.get("collaborative")), + description=args.get("description"), + )) + if action == "add_items": + playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist") + uris = normalize_spotify_uris(_as_list(args.get("uris"))) + return tool_result(client.add_playlist_items( + playlist_id=playlist_id, + uris=uris, + position=args.get("position"), + )) + if action == "remove_items": + playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist") + uris = normalize_spotify_uris(_as_list(args.get("uris"))) + return tool_result(client.remove_playlist_items( + playlist_id=playlist_id, + uris=uris, + snapshot_id=args.get("snapshot_id"), + )) + if action == "update_details": + playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist") + return tool_result(client.update_playlist_details( + playlist_id=playlist_id, + name=args.get("name"), + public=args.get("public"), + collaborative=args.get("collaborative"), + description=args.get("description"), + )) + return tool_error(f"Unknown spotify_playlists action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_albums(args: dict, **kw) -> str: + action = str(args.get("action") or "get").strip().lower() + client = _spotify_client() + try: + album_id = normalize_spotify_id(str(args.get("album_id") or args.get("id") or ""), "album") + if action == "get": + return tool_result(client.get_album(album_id=album_id, market=args.get("market"))) + if action == "tracks": + return tool_result(client.get_album_tracks( + album_id=album_id, + limit=_coerce_limit(args.get("limit"), default=20), + offset=max(0, int(args.get("offset") or 0)), + market=args.get("market"), + )) + return tool_error(f"Unknown spotify_albums action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_saved_tracks(args: dict, **kw) -> str: + action = str(args.get("action") or "list").strip().lower() + client = _spotify_client() + try: + if action == "list": + return tool_result(client.get_saved_tracks( + limit=_coerce_limit(args.get("limit"), default=20), + offset=max(0, int(args.get("offset") or 0)), + market=args.get("market"), + )) + if action == "save": + uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "track") + return tool_result(client.save_library_items(uris=uris)) + if action == "remove": + track_ids = [normalize_spotify_id(item, "track") for item in _as_list(args.get("ids") or args.get("items"))] + if not track_ids: + return tool_error("ids/items is required for action='remove'") + return tool_result(client.remove_saved_tracks(track_ids=track_ids)) + return tool_error(f"Unknown spotify_saved_tracks action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_saved_albums(args: dict, **kw) -> str: + action = str(args.get("action") or "list").strip().lower() + client = _spotify_client() + try: + if action == "list": + return tool_result(client.get_saved_albums( + limit=_coerce_limit(args.get("limit"), default=20), + offset=max(0, int(args.get("offset") or 0)), + market=args.get("market"), + )) + if action == "save": + uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "album") + return tool_result(client.save_library_items(uris=uris)) + if action == "remove": + album_ids = [normalize_spotify_id(item, "album") for item in _as_list(args.get("ids") or args.get("items"))] + if not album_ids: + return tool_error("ids/items is required for action='remove'") + return tool_result(client.remove_saved_albums(album_ids=album_ids)) + return tool_error(f"Unknown spotify_saved_albums action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +def _handle_spotify_activity(args: dict, **kw) -> str: + action = str(args.get("action") or "now_playing").strip().lower() + client = _spotify_client() + try: + if action == "now_playing": + payload = client.get_currently_playing(market=args.get("market")) + if isinstance(payload, dict) and payload.get("empty"): + return tool_result({ + "success": True, + "action": action, + "is_playing": False, + "status_code": payload.get("status_code", 204), + "message": payload.get("message") or "Spotify is not currently playing anything.", + }) + return tool_result(payload) + if action == "recently_played": + after = args.get("after") + before = args.get("before") + if after and before: + return tool_error("Provide only one of 'after' or 'before'") + return tool_result(client.get_recently_played( + limit=_coerce_limit(args.get("limit"), default=20), + after=int(after) if after is not None else None, + before=int(before) if before is not None else None, + )) + return tool_error(f"Unknown spotify_activity action: {action}") + except Exception as exc: + return _spotify_tool_error(exc) + + +COMMON_STRING = {"type": "string"} + +SPOTIFY_PLAYBACK_SCHEMA = { + "name": "spotify_playback", + "description": "Control Spotify playback or inspect the active playback state.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["get_state", "get_currently_playing", "play", "pause", "next", "previous", "seek", "set_repeat", "set_shuffle", "set_volume"]}, + "device_id": COMMON_STRING, + "market": COMMON_STRING, + "context_uri": COMMON_STRING, + "uris": {"type": "array", "items": COMMON_STRING}, + "offset": {"type": "object"}, + "position_ms": {"type": "integer"}, + "state": {"description": "For set_repeat use track/context/off. For set_shuffle use boolean-like true/false.", "oneOf": [{"type": "string"}, {"type": "boolean"}]}, + "volume_percent": {"type": "integer"}, + }, + "required": ["action"], + }, +} + +SPOTIFY_DEVICES_SCHEMA = { + "name": "spotify_devices", + "description": "List Spotify Connect devices or transfer playback to a different device.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "transfer"]}, + "device_id": COMMON_STRING, + "play": {"type": "boolean"}, + }, + "required": ["action"], + }, +} + +SPOTIFY_QUEUE_SCHEMA = { + "name": "spotify_queue", + "description": "Inspect the user's Spotify queue or add an item to it.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["get", "add"]}, + "uri": COMMON_STRING, + "device_id": COMMON_STRING, + }, + "required": ["action"], + }, +} + +SPOTIFY_SEARCH_SCHEMA = { + "name": "spotify_search", + "description": "Search the Spotify catalog for tracks, albums, artists, playlists, shows, or episodes.", + "parameters": { + "type": "object", + "properties": { + "query": COMMON_STRING, + "types": {"type": "array", "items": COMMON_STRING}, + "type": COMMON_STRING, + "limit": {"type": "integer"}, + "offset": {"type": "integer"}, + "market": COMMON_STRING, + "include_external": COMMON_STRING, + }, + "required": ["query"], + }, +} + +SPOTIFY_PLAYLISTS_SCHEMA = { + "name": "spotify_playlists", + "description": "List, inspect, create, update, and modify Spotify playlists.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "get", "create", "add_items", "remove_items", "update_details"]}, + "playlist_id": COMMON_STRING, + "market": COMMON_STRING, + "limit": {"type": "integer"}, + "offset": {"type": "integer"}, + "name": COMMON_STRING, + "description": COMMON_STRING, + "public": {"type": "boolean"}, + "collaborative": {"type": "boolean"}, + "uris": {"type": "array", "items": COMMON_STRING}, + "position": {"type": "integer"}, + "snapshot_id": COMMON_STRING, + }, + "required": ["action"], + }, +} + +SPOTIFY_ALBUMS_SCHEMA = { + "name": "spotify_albums", + "description": "Fetch Spotify album metadata or album tracks.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["get", "tracks"]}, + "album_id": COMMON_STRING, + "id": COMMON_STRING, + "market": COMMON_STRING, + "limit": {"type": "integer"}, + "offset": {"type": "integer"}, + }, + "required": ["action"], + }, +} + +SPOTIFY_SAVED_TRACKS_SCHEMA = { + "name": "spotify_saved_tracks", + "description": "List, save, or remove the user's saved Spotify tracks.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "save", "remove"]}, + "limit": {"type": "integer"}, + "offset": {"type": "integer"}, + "market": COMMON_STRING, + "uris": {"type": "array", "items": COMMON_STRING}, + "ids": {"type": "array", "items": COMMON_STRING}, + "items": {"type": "array", "items": COMMON_STRING}, + }, + "required": ["action"], + }, +} + +SPOTIFY_SAVED_ALBUMS_SCHEMA = { + "name": "spotify_saved_albums", + "description": "List, save, or remove the user's saved Spotify albums.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["list", "save", "remove"]}, + "limit": {"type": "integer"}, + "offset": {"type": "integer"}, + "market": COMMON_STRING, + "uris": {"type": "array", "items": COMMON_STRING}, + "ids": {"type": "array", "items": COMMON_STRING}, + "items": {"type": "array", "items": COMMON_STRING}, + }, + "required": ["action"], + }, +} + +SPOTIFY_ACTIVITY_SCHEMA = { + "name": "spotify_activity", + "description": "Inspect now playing or recently played Spotify activity.", + "parameters": { + "type": "object", + "properties": { + "action": {"type": "string", "enum": ["now_playing", "recently_played"]}, + "market": COMMON_STRING, + "limit": {"type": "integer"}, + "after": {"type": "integer"}, + "before": {"type": "integer"}, + }, + "required": ["action"], + }, +} + + +registry.register(name="spotify_playback", toolset="spotify", schema=SPOTIFY_PLAYBACK_SCHEMA, handler=_handle_spotify_playback, check_fn=_check_spotify_available, emoji="🎵") +registry.register(name="spotify_devices", toolset="spotify", schema=SPOTIFY_DEVICES_SCHEMA, handler=_handle_spotify_devices, check_fn=_check_spotify_available, emoji="🔈") +registry.register(name="spotify_queue", toolset="spotify", schema=SPOTIFY_QUEUE_SCHEMA, handler=_handle_spotify_queue, check_fn=_check_spotify_available, emoji="📻") +registry.register(name="spotify_search", toolset="spotify", schema=SPOTIFY_SEARCH_SCHEMA, handler=_handle_spotify_search, check_fn=_check_spotify_available, emoji="🔎") +registry.register(name="spotify_playlists", toolset="spotify", schema=SPOTIFY_PLAYLISTS_SCHEMA, handler=_handle_spotify_playlists, check_fn=_check_spotify_available, emoji="📚") +registry.register(name="spotify_albums", toolset="spotify", schema=SPOTIFY_ALBUMS_SCHEMA, handler=_handle_spotify_albums, check_fn=_check_spotify_available, emoji="💿") +registry.register(name="spotify_saved_tracks", toolset="spotify", schema=SPOTIFY_SAVED_TRACKS_SCHEMA, handler=_handle_spotify_saved_tracks, check_fn=_check_spotify_available, emoji="❤️") +registry.register(name="spotify_saved_albums", toolset="spotify", schema=SPOTIFY_SAVED_ALBUMS_SCHEMA, handler=_handle_spotify_saved_albums, check_fn=_check_spotify_available, emoji="💽") +registry.register(name="spotify_activity", toolset="spotify", schema=SPOTIFY_ACTIVITY_SCHEMA, handler=_handle_spotify_activity, check_fn=_check_spotify_available, emoji="🕘") diff --git a/toolsets.py b/toolsets.py index b3cdb2e7a..5fd67fda7 100644 --- a/toolsets.py +++ b/toolsets.py @@ -60,6 +60,10 @@ _HERMES_CORE_TOOLS = [ "send_message", # Home Assistant smart home control (gated on HASS_TOKEN via check_fn) "ha_list_entities", "ha_get_state", "ha_list_services", "ha_call_service", + # Spotify playback and library tools (gated on Spotify auth via check_fn) + "spotify_playback", "spotify_devices", "spotify_queue", "spotify_search", + "spotify_playlists", "spotify_albums", "spotify_saved_tracks", + "spotify_saved_albums", "spotify_activity", ] @@ -217,6 +221,16 @@ TOOLSETS = { "includes": [] }, + "spotify": { + "description": "Native Spotify playback, search, playlist, album, library, and activity tools", + "tools": [ + "spotify_playback", "spotify_devices", "spotify_queue", "spotify_search", + "spotify_playlists", "spotify_albums", "spotify_saved_tracks", + "spotify_saved_albums", "spotify_activity", + ], + "includes": [] + }, + # Scenario-specific toolsets