diff --git a/hermes_cli/auth.py b/hermes_cli/auth.py
index ed6b8f462..6e8774bf5 100644
--- a/hermes_cli/auth.py
+++ b/hermes_cli/auth.py
@@ -33,8 +33,10 @@ import webbrowser
from contextlib import contextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
+from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from typing import Any, Dict, List, Optional
+from urllib.parse import parse_qs, urlencode, urlparse
import httpx
import yaml
@@ -81,6 +83,25 @@ CODEX_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
QWEN_OAUTH_CLIENT_ID = "f0304373b74a44d2b584a3fb70ca9e56"
QWEN_OAUTH_TOKEN_URL = "https://chat.qwen.ai/api/v1/oauth2/token"
QWEN_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
+DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL = "https://accounts.spotify.com"
+DEFAULT_SPOTIFY_API_BASE_URL = "https://api.spotify.com/v1"
+DEFAULT_SPOTIFY_REDIRECT_URI = "http://127.0.0.1:43827/spotify/callback"
+SPOTIFY_ACCESS_TOKEN_REFRESH_SKEW_SECONDS = 120
+DEFAULT_SPOTIFY_SCOPE = " ".join((
+ "user-modify-playback-state",
+ "user-read-playback-state",
+ "user-read-currently-playing",
+ "user-read-recently-played",
+ "playlist-read-private",
+ "playlist-read-collaborative",
+ "playlist-modify-public",
+ "playlist-modify-private",
+ "user-library-read",
+ "user-library-modify",
+))
+SERVICE_PROVIDER_NAMES: Dict[str, str] = {
+ "spotify": "Spotify",
+}
# Google Gemini OAuth (google-gemini-cli provider, Cloud Code Assist backend)
DEFAULT_GEMINI_CLOUDCODE_BASE_URL = "cloudcode-pa://google"
@@ -795,6 +816,34 @@ def _save_provider_state(auth_store: Dict[str, Any], provider_id: str, state: Di
auth_store["active_provider"] = provider_id
+def _store_provider_state(
+ auth_store: Dict[str, Any],
+ provider_id: str,
+ state: Dict[str, Any],
+ *,
+ set_active: bool = True,
+) -> None:
+ providers = auth_store.setdefault("providers", {})
+ if not isinstance(providers, dict):
+ auth_store["providers"] = {}
+ providers = auth_store["providers"]
+ providers[provider_id] = state
+ if set_active:
+ auth_store["active_provider"] = provider_id
+
+
+def is_known_auth_provider(provider_id: str) -> bool:
+ normalized = (provider_id or "").strip().lower()
+ return normalized in PROVIDER_REGISTRY or normalized in SERVICE_PROVIDER_NAMES
+
+
+def get_auth_provider_display_name(provider_id: str) -> str:
+ normalized = (provider_id or "").strip().lower()
+ if normalized in PROVIDER_REGISTRY:
+ return PROVIDER_REGISTRY[normalized].name
+ return SERVICE_PROVIDER_NAMES.get(normalized, provider_id)
+
+
def read_credential_pool(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Return the persisted credential pool, or one provider slice."""
auth_store = _load_auth_store()
@@ -1429,8 +1478,520 @@ def get_gemini_oauth_auth_status() -> Dict[str, Any]:
"email": creds.email,
"project_id": creds.project_id,
}
+# Spotify auth — PKCE tokens stored in ~/.hermes/auth.json
+# =============================================================================
+def _spotify_scope_list(raw_scope: Optional[str] = None) -> List[str]:
+ scope_text = (raw_scope or DEFAULT_SPOTIFY_SCOPE).strip()
+ scopes = [part for part in scope_text.split() if part]
+ seen: set[str] = set()
+ ordered: List[str] = []
+ for scope in scopes:
+ if scope not in seen:
+ seen.add(scope)
+ ordered.append(scope)
+ return ordered
+
+
+def _spotify_scope_string(raw_scope: Optional[str] = None) -> str:
+ return " ".join(_spotify_scope_list(raw_scope))
+
+
+def _spotify_client_id(
+ explicit: Optional[str] = None,
+ state: Optional[Dict[str, Any]] = None,
+) -> str:
+ from hermes_cli.config import get_env_value
+
+ candidates = (
+ explicit,
+ get_env_value("HERMES_SPOTIFY_CLIENT_ID"),
+ get_env_value("SPOTIFY_CLIENT_ID"),
+ state.get("client_id") if isinstance(state, dict) else None,
+ )
+ for candidate in candidates:
+ cleaned = str(candidate or "").strip()
+ if cleaned:
+ return cleaned
+ raise AuthError(
+ "Spotify client_id is required. Set HERMES_SPOTIFY_CLIENT_ID or pass --client-id.",
+ provider="spotify",
+ code="spotify_client_id_missing",
+ )
+
+
+def _spotify_redirect_uri(
+ explicit: Optional[str] = None,
+ state: Optional[Dict[str, Any]] = None,
+) -> str:
+ from hermes_cli.config import get_env_value
+
+ candidates = (
+ explicit,
+ get_env_value("HERMES_SPOTIFY_REDIRECT_URI"),
+ get_env_value("SPOTIFY_REDIRECT_URI"),
+ state.get("redirect_uri") if isinstance(state, dict) else None,
+ DEFAULT_SPOTIFY_REDIRECT_URI,
+ )
+ for candidate in candidates:
+ cleaned = str(candidate or "").strip()
+ if cleaned:
+ return cleaned
+ return DEFAULT_SPOTIFY_REDIRECT_URI
+
+
+def _spotify_api_base_url(state: Optional[Dict[str, Any]] = None) -> str:
+ from hermes_cli.config import get_env_value
+
+ candidates = (
+ get_env_value("HERMES_SPOTIFY_API_BASE_URL"),
+ state.get("api_base_url") if isinstance(state, dict) else None,
+ DEFAULT_SPOTIFY_API_BASE_URL,
+ )
+ for candidate in candidates:
+ cleaned = str(candidate or "").strip().rstrip("/")
+ if cleaned:
+ return cleaned
+ return DEFAULT_SPOTIFY_API_BASE_URL
+
+
+def _spotify_accounts_base_url(state: Optional[Dict[str, Any]] = None) -> str:
+ from hermes_cli.config import get_env_value
+
+ candidates = (
+ get_env_value("HERMES_SPOTIFY_ACCOUNTS_BASE_URL"),
+ state.get("accounts_base_url") if isinstance(state, dict) else None,
+ DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL,
+ )
+ for candidate in candidates:
+ cleaned = str(candidate or "").strip().rstrip("/")
+ if cleaned:
+ return cleaned
+ return DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL
+
+
+def _spotify_code_verifier(length: int = 64) -> str:
+ raw = base64.urlsafe_b64encode(os.urandom(length)).decode("ascii")
+ return raw.rstrip("=")[:128]
+
+
+def _spotify_code_challenge(code_verifier: str) -> str:
+ digest = hashlib.sha256(code_verifier.encode("utf-8")).digest()
+ return base64.urlsafe_b64encode(digest).decode("ascii").rstrip("=")
+
+
+def _spotify_build_authorize_url(
+ *,
+ client_id: str,
+ redirect_uri: str,
+ scope: str,
+ state: str,
+ code_challenge: str,
+ accounts_base_url: str,
+) -> str:
+ query = urlencode({
+ "client_id": client_id,
+ "response_type": "code",
+ "redirect_uri": redirect_uri,
+ "scope": scope,
+ "state": state,
+ "code_challenge_method": "S256",
+ "code_challenge": code_challenge,
+ })
+ return f"{accounts_base_url}/authorize?{query}"
+
+
+def _spotify_validate_redirect_uri(redirect_uri: str) -> tuple[str, int, str]:
+ parsed = urlparse(redirect_uri)
+ if parsed.scheme != "http":
+ raise AuthError(
+ "Spotify PKCE redirect_uri must use http://localhost or http://127.0.0.1.",
+ provider="spotify",
+ code="spotify_redirect_invalid",
+ )
+ host = parsed.hostname or ""
+ if host not in {"127.0.0.1", "localhost"}:
+ raise AuthError(
+ "Spotify PKCE redirect_uri must point to localhost or 127.0.0.1.",
+ provider="spotify",
+ code="spotify_redirect_invalid",
+ )
+ if not parsed.port:
+ raise AuthError(
+ "Spotify PKCE redirect_uri must include an explicit localhost port.",
+ provider="spotify",
+ code="spotify_redirect_invalid",
+ )
+ return host, parsed.port, parsed.path or "/"
+
+
+def _make_spotify_callback_handler(expected_path: str) -> tuple[type[BaseHTTPRequestHandler], dict[str, Any]]:
+ result: dict[str, Any] = {
+ "code": None,
+ "state": None,
+ "error": None,
+ "error_description": None,
+ }
+
+ class _SpotifyCallbackHandler(BaseHTTPRequestHandler):
+ def do_GET(self) -> None: # noqa: N802
+ parsed = urlparse(self.path)
+ if parsed.path != expected_path:
+ self.send_response(404)
+ self.end_headers()
+ self.wfile.write(b"Not found.")
+ return
+
+ params = parse_qs(parsed.query)
+ result["code"] = params.get("code", [None])[0]
+ result["state"] = params.get("state", [None])[0]
+ result["error"] = params.get("error", [None])[0]
+ result["error_description"] = params.get("error_description", [None])[0]
+
+ self.send_response(200)
+ self.send_header("Content-Type", "text/html; charset=utf-8")
+ self.end_headers()
+ if result["error"]:
+ body = "
Spotify authorization failed.
You can close this tab."
+ else:
+ body = "Spotify authorization received.
You can close this tab."
+ self.wfile.write(body.encode("utf-8"))
+
+ def log_message(self, format: str, *args: Any) -> None: # noqa: A003
+ return
+
+ return _SpotifyCallbackHandler, result
+
+
+def _spotify_wait_for_callback(
+ redirect_uri: str,
+ *,
+ timeout_seconds: float = 180.0,
+) -> dict[str, Any]:
+ host, port, path = _spotify_validate_redirect_uri(redirect_uri)
+ handler_cls, result = _make_spotify_callback_handler(path)
+
+ class _ReuseHTTPServer(HTTPServer):
+ allow_reuse_address = True
+
+ try:
+ server = _ReuseHTTPServer((host, port), handler_cls)
+ except OSError as exc:
+ raise AuthError(
+ f"Could not bind Spotify callback server on {host}:{port}: {exc}",
+ provider="spotify",
+ code="spotify_callback_bind_failed",
+ ) from exc
+
+ thread = threading.Thread(target=server.serve_forever, kwargs={"poll_interval": 0.1}, daemon=True)
+ thread.start()
+ deadline = time.time() + max(5.0, timeout_seconds)
+ try:
+ while time.time() < deadline:
+ if result["code"] or result["error"]:
+ return result
+ time.sleep(0.1)
+ finally:
+ server.shutdown()
+ server.server_close()
+ thread.join(timeout=1.0)
+ raise AuthError(
+ "Spotify authorization timed out waiting for the local callback.",
+ provider="spotify",
+ code="spotify_callback_timeout",
+ )
+
+
+def _spotify_token_payload_to_state(
+ token_payload: Dict[str, Any],
+ *,
+ client_id: str,
+ redirect_uri: str,
+ requested_scope: str,
+ accounts_base_url: str,
+ api_base_url: str,
+ previous_state: Optional[Dict[str, Any]] = None,
+) -> Dict[str, Any]:
+ now = datetime.now(timezone.utc)
+ expires_in = _coerce_ttl_seconds(token_payload.get("expires_in", 0))
+ expires_at = datetime.fromtimestamp(now.timestamp() + expires_in, tz=timezone.utc)
+ state = dict(previous_state or {})
+ state.update({
+ "client_id": client_id,
+ "redirect_uri": redirect_uri,
+ "accounts_base_url": accounts_base_url,
+ "api_base_url": api_base_url,
+ "scope": requested_scope,
+ "granted_scope": str(token_payload.get("scope") or requested_scope).strip(),
+ "token_type": str(token_payload.get("token_type", "Bearer") or "Bearer").strip() or "Bearer",
+ "access_token": str(token_payload.get("access_token", "") or "").strip(),
+ "refresh_token": str(
+ token_payload.get("refresh_token")
+ or state.get("refresh_token")
+ or ""
+ ).strip(),
+ "obtained_at": now.isoformat(),
+ "expires_at": expires_at.isoformat(),
+ "expires_in": expires_in,
+ "auth_type": "oauth_pkce",
+ })
+ return state
+
+
+def _spotify_exchange_code_for_tokens(
+ *,
+ client_id: str,
+ code: str,
+ redirect_uri: str,
+ code_verifier: str,
+ accounts_base_url: str,
+ timeout_seconds: float = 20.0,
+) -> Dict[str, Any]:
+ try:
+ response = httpx.post(
+ f"{accounts_base_url}/api/token",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data={
+ "client_id": client_id,
+ "grant_type": "authorization_code",
+ "code": code,
+ "redirect_uri": redirect_uri,
+ "code_verifier": code_verifier,
+ },
+ timeout=timeout_seconds,
+ )
+ except Exception as exc:
+ raise AuthError(
+ f"Spotify token exchange failed: {exc}",
+ provider="spotify",
+ code="spotify_token_exchange_failed",
+ ) from exc
+
+ if response.status_code >= 400:
+ detail = response.text.strip()
+ raise AuthError(
+ "Spotify token exchange failed."
+ + (f" Response: {detail}" if detail else ""),
+ provider="spotify",
+ code="spotify_token_exchange_failed",
+ )
+ payload = response.json()
+ if not isinstance(payload, dict) or not str(payload.get("access_token", "") or "").strip():
+ raise AuthError(
+ "Spotify token response did not include an access_token.",
+ provider="spotify",
+ code="spotify_token_exchange_invalid",
+ )
+ return payload
+
+
+def _refresh_spotify_oauth_state(
+ state: Dict[str, Any],
+ *,
+ timeout_seconds: float = 20.0,
+) -> Dict[str, Any]:
+ refresh_token = str(state.get("refresh_token", "") or "").strip()
+ if not refresh_token:
+ raise AuthError(
+ "Spotify refresh token missing. Run `hermes auth spotify` again.",
+ provider="spotify",
+ code="spotify_refresh_token_missing",
+ relogin_required=True,
+ )
+
+ client_id = _spotify_client_id(state=state)
+ accounts_base_url = _spotify_accounts_base_url(state)
+ try:
+ response = httpx.post(
+ f"{accounts_base_url}/api/token",
+ headers={"Content-Type": "application/x-www-form-urlencoded"},
+ data={
+ "grant_type": "refresh_token",
+ "refresh_token": refresh_token,
+ "client_id": client_id,
+ },
+ timeout=timeout_seconds,
+ )
+ except Exception as exc:
+ raise AuthError(
+ f"Spotify token refresh failed: {exc}",
+ provider="spotify",
+ code="spotify_refresh_failed",
+ ) from exc
+
+ if response.status_code >= 400:
+ detail = response.text.strip()
+ raise AuthError(
+ "Spotify token refresh failed. Run `hermes auth spotify` again."
+ + (f" Response: {detail}" if detail else ""),
+ provider="spotify",
+ code="spotify_refresh_failed",
+ relogin_required=True,
+ )
+
+ payload = response.json()
+ if not isinstance(payload, dict) or not str(payload.get("access_token", "") or "").strip():
+ raise AuthError(
+ "Spotify refresh response did not include an access_token.",
+ provider="spotify",
+ code="spotify_refresh_invalid",
+ relogin_required=True,
+ )
+
+ return _spotify_token_payload_to_state(
+ payload,
+ client_id=client_id,
+ redirect_uri=_spotify_redirect_uri(state=state),
+ requested_scope=str(state.get("scope") or DEFAULT_SPOTIFY_SCOPE),
+ accounts_base_url=accounts_base_url,
+ api_base_url=_spotify_api_base_url(state),
+ previous_state=state,
+ )
+
+
+def resolve_spotify_runtime_credentials(
+ *,
+ force_refresh: bool = False,
+ refresh_if_expiring: bool = True,
+ refresh_skew_seconds: int = SPOTIFY_ACCESS_TOKEN_REFRESH_SKEW_SECONDS,
+) -> Dict[str, Any]:
+ with _auth_store_lock():
+ auth_store = _load_auth_store()
+ state = _load_provider_state(auth_store, "spotify")
+ if not state:
+ raise AuthError(
+ "Spotify is not authenticated. Run `hermes auth spotify` first.",
+ provider="spotify",
+ code="spotify_auth_missing",
+ relogin_required=True,
+ )
+
+ should_refresh = bool(force_refresh)
+ if not should_refresh and refresh_if_expiring:
+ should_refresh = _is_expiring(state.get("expires_at"), refresh_skew_seconds)
+ if should_refresh:
+ state = _refresh_spotify_oauth_state(state)
+ _store_provider_state(auth_store, "spotify", state, set_active=False)
+ _save_auth_store(auth_store)
+
+ access_token = str(state.get("access_token", "") or "").strip()
+ if not access_token:
+ raise AuthError(
+ "Spotify access token missing. Run `hermes auth spotify` again.",
+ provider="spotify",
+ code="spotify_access_token_missing",
+ relogin_required=True,
+ )
+
+ return {
+ "provider": "spotify",
+ "access_token": access_token,
+ "api_key": access_token,
+ "token_type": str(state.get("token_type", "Bearer") or "Bearer"),
+ "base_url": _spotify_api_base_url(state),
+ "scope": str(state.get("granted_scope") or state.get("scope") or "").strip(),
+ "client_id": _spotify_client_id(state=state),
+ "redirect_uri": _spotify_redirect_uri(state=state),
+ "expires_at": state.get("expires_at"),
+ "refresh_token": str(state.get("refresh_token", "") or "").strip(),
+ }
+
+
+def get_spotify_auth_status() -> Dict[str, Any]:
+ state = get_provider_auth_state("spotify")
+ if not state:
+ return {"logged_in": False}
+
+ expires_at = state.get("expires_at")
+ refresh_token = str(state.get("refresh_token", "") or "").strip()
+ return {
+ "logged_in": bool(refresh_token or not _is_expiring(expires_at, 0)),
+ "auth_type": state.get("auth_type", "oauth_pkce"),
+ "client_id": state.get("client_id"),
+ "redirect_uri": state.get("redirect_uri"),
+ "scope": state.get("granted_scope") or state.get("scope"),
+ "expires_at": expires_at,
+ "api_base_url": state.get("api_base_url"),
+ "has_refresh_token": bool(refresh_token),
+ }
+
+
+def login_spotify_command(args) -> None:
+ existing_state = get_provider_auth_state("spotify") or {}
+ client_id = _spotify_client_id(getattr(args, "client_id", None), existing_state)
+ redirect_uri = _spotify_redirect_uri(getattr(args, "redirect_uri", None), existing_state)
+ scope = _spotify_scope_string(getattr(args, "scope", None) or existing_state.get("scope"))
+ accounts_base_url = _spotify_accounts_base_url(existing_state)
+ api_base_url = _spotify_api_base_url(existing_state)
+ open_browser = not getattr(args, "no_browser", False)
+
+ code_verifier = _spotify_code_verifier()
+ code_challenge = _spotify_code_challenge(code_verifier)
+ state_nonce = uuid.uuid4().hex
+ authorize_url = _spotify_build_authorize_url(
+ client_id=client_id,
+ redirect_uri=redirect_uri,
+ scope=scope,
+ state=state_nonce,
+ code_challenge=code_challenge,
+ accounts_base_url=accounts_base_url,
+ )
+
+ print("Starting Spotify PKCE login...")
+ print(f"Client ID: {client_id}")
+ print(f"Redirect URI: {redirect_uri}")
+ print("Make sure this redirect URI is allow-listed in your Spotify app settings.")
+ print()
+ print("Open this URL to authorize Hermes:")
+ print(authorize_url)
+ print()
+
+ if open_browser and not _is_remote_session():
+ try:
+ opened = webbrowser.open(authorize_url)
+ except Exception:
+ opened = False
+ if opened:
+ print("Browser opened for Spotify authorization.")
+ else:
+ print("Could not open the browser automatically; use the URL above.")
+
+ callback = _spotify_wait_for_callback(
+ redirect_uri,
+ timeout_seconds=float(getattr(args, "timeout", None) or 180.0),
+ )
+ if callback.get("error"):
+ detail = callback.get("error_description") or callback["error"]
+ raise SystemExit(f"Spotify authorization failed: {detail}")
+ if callback.get("state") != state_nonce:
+ raise SystemExit("Spotify authorization failed: state mismatch.")
+
+ token_payload = _spotify_exchange_code_for_tokens(
+ client_id=client_id,
+ code=str(callback.get("code") or ""),
+ redirect_uri=redirect_uri,
+ code_verifier=code_verifier,
+ accounts_base_url=accounts_base_url,
+ timeout_seconds=float(getattr(args, "timeout", None) or 20.0),
+ )
+ spotify_state = _spotify_token_payload_to_state(
+ token_payload,
+ client_id=client_id,
+ redirect_uri=redirect_uri,
+ requested_scope=scope,
+ accounts_base_url=accounts_base_url,
+ api_base_url=api_base_url,
+ )
+
+ with _auth_store_lock():
+ auth_store = _load_auth_store()
+ _store_provider_state(auth_store, "spotify", spotify_state, set_active=False)
+ saved_to = _save_auth_store(auth_store)
+
+ print("Spotify login successful!")
+ print(f" Auth state: {saved_to}")
+ print(" Provider state saved under providers.spotify")
# =============================================================================
# SSH / remote session detection
@@ -2744,6 +3305,8 @@ def get_external_process_provider_status(provider_id: str) -> Dict[str, Any]:
def get_auth_status(provider_id: Optional[str] = None) -> Dict[str, Any]:
"""Generic auth status dispatcher."""
target = provider_id or get_active_provider()
+ if target == "spotify":
+ return get_spotify_auth_status()
if target == "nous":
return get_nous_auth_status()
if target == "openai-codex":
@@ -3658,7 +4221,7 @@ def logout_command(args) -> None:
"""Clear auth state for a provider."""
provider_id = getattr(args, "provider", None)
- if provider_id and provider_id not in PROVIDER_REGISTRY:
+ if provider_id and not is_known_auth_provider(provider_id):
print(f"Unknown provider: {provider_id}")
raise SystemExit(1)
@@ -3669,8 +4232,8 @@ def logout_command(args) -> None:
print("No provider is currently logged in.")
return
- provider_name = PROVIDER_REGISTRY[target].name if target in PROVIDER_REGISTRY else target
config_matches = _config_provider_matches(target)
+ provider_name = get_auth_provider_display_name(target)
if clear_provider_auth(target) or config_matches:
_reset_config_provider()
diff --git a/hermes_cli/auth_commands.py b/hermes_cli/auth_commands.py
index aa092a058..94ea2559c 100644
--- a/hermes_cli/auth_commands.py
+++ b/hermes_cli/auth_commands.py
@@ -408,6 +408,44 @@ def auth_reset_command(args) -> None:
print(f"Reset status on {count} {provider} credentials")
+def auth_status_command(args) -> None:
+ provider = _normalize_provider(getattr(args, "provider", "") or "")
+ if not provider:
+ raise SystemExit("Provider is required. Example: `hermes auth status spotify`.")
+ status = auth_mod.get_auth_status(provider)
+ if not status.get("logged_in"):
+ reason = status.get("error")
+ if reason:
+ print(f"{provider}: logged out ({reason})")
+ else:
+ print(f"{provider}: logged out")
+ return
+
+ print(f"{provider}: logged in")
+ for key in ("auth_type", "client_id", "redirect_uri", "scope", "expires_at", "api_base_url"):
+ value = status.get(key)
+ if value:
+ print(f" {key}: {value}")
+
+
+def auth_logout_command(args) -> None:
+ auth_mod.logout_command(SimpleNamespace(provider=getattr(args, "provider", None)))
+
+
+def auth_spotify_command(args) -> None:
+ action = str(getattr(args, "spotify_action", "") or "login").strip().lower()
+ if action in {"", "login"}:
+ auth_mod.login_spotify_command(args)
+ return
+ if action == "status":
+ auth_status_command(SimpleNamespace(provider="spotify"))
+ return
+ if action == "logout":
+ auth_logout_command(SimpleNamespace(provider="spotify"))
+ return
+ raise SystemExit(f"Unknown Spotify auth action: {action}")
+
+
def _interactive_auth() -> None:
"""Interactive credential pool management when `hermes auth` is called bare."""
# Show current pool status first
@@ -605,5 +643,14 @@ def auth_command(args) -> None:
if action == "reset":
auth_reset_command(args)
return
+ if action == "status":
+ auth_status_command(args)
+ return
+ if action == "logout":
+ auth_logout_command(args)
+ return
+ if action == "spotify":
+ auth_spotify_command(args)
+ return
# No subcommand — launch interactive mode
_interactive_auth()
diff --git a/hermes_cli/main.py b/hermes_cli/main.py
index 14cca2f03..96ae9adc8 100644
--- a/hermes_cli/main.py
+++ b/hermes_cli/main.py
@@ -7316,7 +7316,7 @@ For more help on a command:
)
logout_parser.add_argument(
"--provider",
- choices=["nous", "openai-codex"],
+ choices=["nous", "openai-codex", "spotify"],
default=None,
help="Provider to log out from (default: active provider)",
)
@@ -7373,6 +7373,17 @@ For more help on a command:
"reset", help="Clear exhaustion status for all credentials for a provider"
)
auth_reset.add_argument("provider", help="Provider id")
+ auth_status = auth_subparsers.add_parser("status", help="Show auth status for a provider")
+ auth_status.add_argument("provider", help="Provider id")
+ auth_logout = auth_subparsers.add_parser("logout", help="Log out a provider and clear stored auth state")
+ auth_logout.add_argument("provider", help="Provider id")
+ auth_spotify = auth_subparsers.add_parser("spotify", help="Authenticate Hermes with Spotify via PKCE")
+ auth_spotify.add_argument("spotify_action", nargs="?", choices=["login", "status", "logout"], default="login")
+ auth_spotify.add_argument("--client-id", help="Spotify app client_id (or set HERMES_SPOTIFY_CLIENT_ID)")
+ auth_spotify.add_argument("--redirect-uri", help="Allow-listed localhost redirect URI for your Spotify app")
+ auth_spotify.add_argument("--scope", help="Override requested Spotify scopes")
+ auth_spotify.add_argument("--no-browser", action="store_true", help="Do not attempt to open the browser automatically")
+ auth_spotify.add_argument("--timeout", type=float, help="Callback/token exchange timeout in seconds")
auth_parser.set_defaults(func=cmd_auth)
# =========================================================================
diff --git a/tests/hermes_cli/test_spotify_auth.py b/tests/hermes_cli/test_spotify_auth.py
new file mode 100644
index 000000000..4873d5f9d
--- /dev/null
+++ b/tests/hermes_cli/test_spotify_auth.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+from types import SimpleNamespace
+
+import pytest
+
+from hermes_cli import auth as auth_mod
+
+
+def test_store_provider_state_can_skip_active_provider() -> None:
+ auth_store = {"active_provider": "nous", "providers": {}}
+
+ auth_mod._store_provider_state(
+ auth_store,
+ "spotify",
+ {"access_token": "abc"},
+ set_active=False,
+ )
+
+ assert auth_store["active_provider"] == "nous"
+ assert auth_store["providers"]["spotify"]["access_token"] == "abc"
+
+
+def test_resolve_spotify_runtime_credentials_refreshes_without_changing_active_provider(
+ tmp_path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ monkeypatch.setenv("HERMES_HOME", str(tmp_path))
+
+ with auth_mod._auth_store_lock():
+ store = auth_mod._load_auth_store()
+ store["active_provider"] = "nous"
+ auth_mod._store_provider_state(
+ store,
+ "spotify",
+ {
+ "client_id": "spotify-client",
+ "redirect_uri": "http://127.0.0.1:43827/spotify/callback",
+ "api_base_url": auth_mod.DEFAULT_SPOTIFY_API_BASE_URL,
+ "accounts_base_url": auth_mod.DEFAULT_SPOTIFY_ACCOUNTS_BASE_URL,
+ "scope": auth_mod.DEFAULT_SPOTIFY_SCOPE,
+ "access_token": "expired-token",
+ "refresh_token": "refresh-token",
+ "token_type": "Bearer",
+ "expires_at": "2000-01-01T00:00:00+00:00",
+ },
+ set_active=False,
+ )
+ auth_mod._save_auth_store(store)
+
+ monkeypatch.setattr(
+ auth_mod,
+ "_refresh_spotify_oauth_state",
+ lambda state, timeout_seconds=20.0: {
+ **state,
+ "access_token": "fresh-token",
+ "expires_at": "2099-01-01T00:00:00+00:00",
+ },
+ )
+
+ creds = auth_mod.resolve_spotify_runtime_credentials()
+
+ assert creds["access_token"] == "fresh-token"
+ persisted = auth_mod.get_provider_auth_state("spotify")
+ assert persisted is not None
+ assert persisted["access_token"] == "fresh-token"
+ assert auth_mod.get_active_provider() == "nous"
+
+
+def test_auth_spotify_status_command_reports_logged_in(capsys, monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr(
+ auth_mod,
+ "get_auth_status",
+ lambda provider=None: {
+ "logged_in": True,
+ "auth_type": "oauth_pkce",
+ "client_id": "spotify-client",
+ "redirect_uri": "http://127.0.0.1:43827/spotify/callback",
+ "scope": "user-library-read",
+ },
+ )
+
+ from hermes_cli.auth_commands import auth_status_command
+
+ auth_status_command(SimpleNamespace(provider="spotify"))
+ output = capsys.readouterr().out
+ assert "spotify: logged in" in output
+ assert "client_id: spotify-client" in output
diff --git a/tests/tools/test_spotify_client.py b/tests/tools/test_spotify_client.py
new file mode 100644
index 000000000..17157ec4e
--- /dev/null
+++ b/tests/tools/test_spotify_client.py
@@ -0,0 +1,244 @@
+from __future__ import annotations
+
+import json
+
+import pytest
+
+from tools.providers import spotify_client as spotify_mod
+from tools import spotify_tool
+
+
+class _FakeResponse:
+ def __init__(self, status_code: int, payload: dict | None = None, *, text: str = "", headers: dict | None = None):
+ self.status_code = status_code
+ self._payload = payload
+ self.text = text or (json.dumps(payload) if payload is not None else "")
+ self.headers = headers or {"content-type": "application/json"}
+ self.content = self.text.encode("utf-8") if self.text else b""
+
+ def json(self):
+ if self._payload is None:
+ raise ValueError("no json")
+ return self._payload
+
+
+class _StubSpotifyClient:
+ def __init__(self, payload):
+ self.payload = payload
+
+ def get_currently_playing(self, *, market=None):
+ return self.payload
+
+
+def test_spotify_client_retries_once_after_401(monkeypatch: pytest.MonkeyPatch) -> None:
+ calls: list[str] = []
+ tokens = iter([
+ {
+ "access_token": "token-1",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ {
+ "access_token": "token-2",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ ])
+
+ monkeypatch.setattr(
+ spotify_mod,
+ "resolve_spotify_runtime_credentials",
+ lambda **kwargs: next(tokens),
+ )
+
+ def fake_request(method, url, headers=None, params=None, json=None, timeout=None):
+ calls.append(headers["Authorization"])
+ if len(calls) == 1:
+ return _FakeResponse(401, {"error": {"message": "expired token"}})
+ return _FakeResponse(200, {"devices": [{"id": "dev-1"}]})
+
+ monkeypatch.setattr(spotify_mod.httpx, "request", fake_request)
+
+ client = spotify_mod.SpotifyClient()
+ payload = client.get_devices()
+
+ assert payload["devices"][0]["id"] == "dev-1"
+ assert calls == ["Bearer token-1", "Bearer token-2"]
+
+
+def test_normalize_spotify_uri_accepts_urls() -> None:
+ uri = spotify_mod.normalize_spotify_uri(
+ "https://open.spotify.com/track/7ouMYWpwJ422jRcDASZB7P",
+ "track",
+ )
+ assert uri == "spotify:track:7ouMYWpwJ422jRcDASZB7P"
+
+
+@pytest.mark.parametrize(
+ ("status_code", "path", "payload", "expected"),
+ [
+ (
+ 403,
+ "/me/player/play",
+ {"error": {"message": "Premium required"}},
+ "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account and an active Spotify Connect device.",
+ ),
+ (
+ 404,
+ "/me/player",
+ {"error": {"message": "Device not found"}},
+ "Spotify could not find an active playback device or player session for this request.",
+ ),
+ (
+ 429,
+ "/search",
+ {"error": {"message": "rate limit"}},
+ "Spotify rate limit exceeded. Retry after 7 seconds.",
+ ),
+ ],
+)
+def test_spotify_client_formats_friendly_api_errors(
+ monkeypatch: pytest.MonkeyPatch,
+ status_code: int,
+ path: str,
+ payload: dict,
+ expected: str,
+) -> None:
+ monkeypatch.setattr(
+ spotify_mod,
+ "resolve_spotify_runtime_credentials",
+ lambda **kwargs: {
+ "access_token": "token-1",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ )
+
+ def fake_request(method, url, headers=None, params=None, json=None, timeout=None):
+ return _FakeResponse(status_code, payload, headers={"content-type": "application/json", "Retry-After": "7"})
+
+ monkeypatch.setattr(spotify_mod.httpx, "request", fake_request)
+
+ client = spotify_mod.SpotifyClient()
+ with pytest.raises(spotify_mod.SpotifyAPIError) as exc:
+ client.request("GET", path)
+
+ assert str(exc.value) == expected
+
+
+def test_get_currently_playing_returns_explanatory_empty_payload(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr(
+ spotify_mod,
+ "resolve_spotify_runtime_credentials",
+ lambda **kwargs: {
+ "access_token": "token-1",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ )
+
+ def fake_request(method, url, headers=None, params=None, json=None, timeout=None):
+ return _FakeResponse(204, None, text="", headers={"content-type": "application/json"})
+
+ monkeypatch.setattr(spotify_mod.httpx, "request", fake_request)
+
+ client = spotify_mod.SpotifyClient()
+ payload = client.get_currently_playing()
+
+ assert payload == {
+ "status_code": 204,
+ "empty": True,
+ "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
+ }
+
+
+def test_spotify_activity_now_playing_returns_explanatory_empty_result(monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr(
+ spotify_tool,
+ "_spotify_client",
+ lambda: _StubSpotifyClient({
+ "status_code": 204,
+ "empty": True,
+ "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
+ }),
+ )
+
+ payload = json.loads(spotify_tool._handle_spotify_activity({"action": "now_playing"}))
+
+ assert payload == {
+ "success": True,
+ "action": "now_playing",
+ "is_playing": False,
+ "status_code": 204,
+ "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
+ }
+
+
+def test_library_contains_uses_generic_library_endpoint(monkeypatch: pytest.MonkeyPatch) -> None:
+ seen: list[tuple[str, str, dict | None]] = []
+
+ monkeypatch.setattr(
+ spotify_mod,
+ "resolve_spotify_runtime_credentials",
+ lambda **kwargs: {
+ "access_token": "token-1",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ )
+
+ def fake_request(method, url, headers=None, params=None, json=None, timeout=None):
+ seen.append((method, url, params))
+ return _FakeResponse(200, [True])
+
+ monkeypatch.setattr(spotify_mod.httpx, "request", fake_request)
+
+ client = spotify_mod.SpotifyClient()
+ payload = client.library_contains(uris=["spotify:album:abc", "spotify:track:def"])
+
+ assert payload == [True]
+ assert seen == [
+ (
+ "GET",
+ "https://api.spotify.com/v1/me/library/contains",
+ {"uris": "spotify:album:abc,spotify:track:def"},
+ )
+ ]
+
+
+@pytest.mark.parametrize(
+ ("method_name", "item_key", "item_value", "expected_uris"),
+ [
+ ("remove_saved_tracks", "track_ids", ["track-a", "track-b"], ["spotify:track:track-a", "spotify:track:track-b"]),
+ ("remove_saved_albums", "album_ids", ["album-a"], ["spotify:album:album-a"]),
+ ],
+)
+def test_library_remove_uses_generic_library_endpoint(
+ monkeypatch: pytest.MonkeyPatch,
+ method_name: str,
+ item_key: str,
+ item_value: list[str],
+ expected_uris: list[str],
+) -> None:
+ seen: list[tuple[str, str, dict | None]] = []
+
+ monkeypatch.setattr(
+ spotify_mod,
+ "resolve_spotify_runtime_credentials",
+ lambda **kwargs: {
+ "access_token": "token-1",
+ "base_url": "https://api.spotify.com/v1",
+ },
+ )
+
+ def fake_request(method, url, headers=None, params=None, json=None, timeout=None):
+ seen.append((method, url, params))
+ return _FakeResponse(200, {})
+
+ monkeypatch.setattr(spotify_mod.httpx, "request", fake_request)
+
+ client = spotify_mod.SpotifyClient()
+ getattr(client, method_name)(**{item_key: item_value})
+
+ assert seen == [
+ (
+ "DELETE",
+ "https://api.spotify.com/v1/me/library",
+ {"uris": ",".join(expected_uris)},
+ )
+ ]
diff --git a/tools/providers/__init__.py b/tools/providers/__init__.py
new file mode 100644
index 000000000..25dd42409
--- /dev/null
+++ b/tools/providers/__init__.py
@@ -0,0 +1 @@
+"""Provider-specific native tool clients."""
diff --git a/tools/providers/spotify_client.py b/tools/providers/spotify_client.py
new file mode 100644
index 000000000..2195cc20a
--- /dev/null
+++ b/tools/providers/spotify_client.py
@@ -0,0 +1,435 @@
+"""Thin Spotify Web API helper used by Hermes native tools."""
+
+from __future__ import annotations
+
+import json
+from typing import Any, Dict, Iterable, Optional
+from urllib.parse import urlparse
+
+import httpx
+
+from hermes_cli.auth import (
+ AuthError,
+ resolve_spotify_runtime_credentials,
+)
+
+
+class SpotifyError(RuntimeError):
+ """Base Spotify tool error."""
+
+
+class SpotifyAuthRequiredError(SpotifyError):
+ """Raised when the user needs to authenticate with Spotify first."""
+
+
+class SpotifyAPIError(SpotifyError):
+ """Structured Spotify API failure."""
+
+ def __init__(
+ self,
+ message: str,
+ *,
+ status_code: Optional[int] = None,
+ response_body: Optional[str] = None,
+ ) -> None:
+ super().__init__(message)
+ self.status_code = status_code
+ self.response_body = response_body
+ self.path = None
+
+
+class SpotifyClient:
+ def __init__(self) -> None:
+ self._runtime = self._resolve_runtime(refresh_if_expiring=True)
+
+ def _resolve_runtime(self, *, force_refresh: bool = False, refresh_if_expiring: bool = True) -> Dict[str, Any]:
+ try:
+ return resolve_spotify_runtime_credentials(
+ force_refresh=force_refresh,
+ refresh_if_expiring=refresh_if_expiring,
+ )
+ except AuthError as exc:
+ raise SpotifyAuthRequiredError(str(exc)) from exc
+
+ @property
+ def base_url(self) -> str:
+ return str(self._runtime.get("base_url") or "").rstrip("/")
+
+ def _headers(self) -> Dict[str, str]:
+ return {
+ "Authorization": f"Bearer {self._runtime['access_token']}",
+ "Content-Type": "application/json",
+ }
+
+ def request(
+ self,
+ method: str,
+ path: str,
+ *,
+ params: Optional[Dict[str, Any]] = None,
+ json_body: Optional[Dict[str, Any]] = None,
+ allow_retry_on_401: bool = True,
+ empty_response: Optional[Dict[str, Any]] = None,
+ ) -> Any:
+ url = f"{self.base_url}{path}"
+ response = httpx.request(
+ method,
+ url,
+ headers=self._headers(),
+ params=_strip_none(params),
+ json=_strip_none(json_body) if json_body is not None else None,
+ timeout=30.0,
+ )
+ if response.status_code == 401 and allow_retry_on_401:
+ self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True)
+ return self.request(
+ method,
+ path,
+ params=params,
+ json_body=json_body,
+ allow_retry_on_401=False,
+ )
+ if response.status_code >= 400:
+ self._raise_api_error(response, method=method, path=path)
+ if response.status_code == 204 or not response.content:
+ return empty_response or {"success": True, "status_code": response.status_code, "empty": True}
+ if "application/json" in response.headers.get("content-type", ""):
+ return response.json()
+ return {"success": True, "text": response.text}
+
+ def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> None:
+ detail = response.text.strip()
+ message = _friendly_spotify_error_message(
+ status_code=response.status_code,
+ detail=_extract_spotify_error_detail(response, fallback=detail),
+ method=method,
+ path=path,
+ retry_after=response.headers.get("Retry-After"),
+ )
+ error = SpotifyAPIError(message, status_code=response.status_code, response_body=detail)
+ error.path = path
+ raise error
+
+ def get_devices(self) -> Any:
+ return self.request("GET", "/me/player/devices")
+
+ def transfer_playback(self, *, device_id: str, play: bool = False) -> Any:
+ return self.request("PUT", "/me/player", json_body={
+ "device_ids": [device_id],
+ "play": play,
+ })
+
+ def get_playback_state(self, *, market: Optional[str] = None) -> Any:
+ return self.request(
+ "GET",
+ "/me/player",
+ params={"market": market},
+ empty_response={
+ "status_code": 204,
+ "empty": True,
+ "message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.",
+ },
+ )
+
+ def get_currently_playing(self, *, market: Optional[str] = None) -> Any:
+ return self.request(
+ "GET",
+ "/me/player/currently-playing",
+ params={"market": market},
+ empty_response={
+ "status_code": 204,
+ "empty": True,
+ "message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
+ },
+ )
+
+ def start_playback(
+ self,
+ *,
+ device_id: Optional[str] = None,
+ context_uri: Optional[str] = None,
+ uris: Optional[list[str]] = None,
+ offset: Optional[Dict[str, Any]] = None,
+ position_ms: Optional[int] = None,
+ ) -> Any:
+ return self.request(
+ "PUT",
+ "/me/player/play",
+ params={"device_id": device_id},
+ json_body={
+ "context_uri": context_uri,
+ "uris": uris,
+ "offset": offset,
+ "position_ms": position_ms,
+ },
+ )
+
+ def pause_playback(self, *, device_id: Optional[str] = None) -> Any:
+ return self.request("PUT", "/me/player/pause", params={"device_id": device_id})
+
+ def skip_next(self, *, device_id: Optional[str] = None) -> Any:
+ return self.request("POST", "/me/player/next", params={"device_id": device_id})
+
+ def skip_previous(self, *, device_id: Optional[str] = None) -> Any:
+ return self.request("POST", "/me/player/previous", params={"device_id": device_id})
+
+ def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any:
+ return self.request("PUT", "/me/player/seek", params={
+ "position_ms": position_ms,
+ "device_id": device_id,
+ })
+
+ def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any:
+ return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id})
+
+ def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any:
+ return self.request("PUT", "/me/player/shuffle", params={"state": str(bool(state)).lower(), "device_id": device_id})
+
+ def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any:
+ return self.request("PUT", "/me/player/volume", params={
+ "volume_percent": volume_percent,
+ "device_id": device_id,
+ })
+
+ def get_queue(self) -> Any:
+ return self.request("GET", "/me/player/queue")
+
+ def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any:
+ return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id})
+
+ def search(
+ self,
+ *,
+ query: str,
+ search_types: list[str],
+ limit: int = 10,
+ offset: int = 0,
+ market: Optional[str] = None,
+ include_external: Optional[str] = None,
+ ) -> Any:
+ return self.request("GET", "/search", params={
+ "q": query,
+ "type": ",".join(search_types),
+ "limit": limit,
+ "offset": offset,
+ "market": market,
+ "include_external": include_external,
+ })
+
+ def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any:
+ return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset})
+
+ def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any:
+ return self.request("GET", f"/playlists/{playlist_id}", params={"market": market})
+
+ def create_playlist(
+ self,
+ *,
+ name: str,
+ public: bool = False,
+ collaborative: bool = False,
+ description: Optional[str] = None,
+ ) -> Any:
+ return self.request("POST", "/me/playlists", json_body={
+ "name": name,
+ "public": public,
+ "collaborative": collaborative,
+ "description": description,
+ })
+
+ def add_playlist_items(
+ self,
+ *,
+ playlist_id: str,
+ uris: list[str],
+ position: Optional[int] = None,
+ ) -> Any:
+ return self.request("POST", f"/playlists/{playlist_id}/items", json_body={
+ "uris": uris,
+ "position": position,
+ })
+
+ def remove_playlist_items(
+ self,
+ *,
+ playlist_id: str,
+ uris: list[str],
+ snapshot_id: Optional[str] = None,
+ ) -> Any:
+ return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={
+ "items": [{"uri": uri} for uri in uris],
+ "snapshot_id": snapshot_id,
+ })
+
+ def update_playlist_details(
+ self,
+ *,
+ playlist_id: str,
+ name: Optional[str] = None,
+ public: Optional[bool] = None,
+ collaborative: Optional[bool] = None,
+ description: Optional[str] = None,
+ ) -> Any:
+ return self.request("PUT", f"/playlists/{playlist_id}", json_body={
+ "name": name,
+ "public": public,
+ "collaborative": collaborative,
+ "description": description,
+ })
+
+ def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any:
+ return self.request("GET", f"/albums/{album_id}", params={"market": market})
+
+ def get_album_tracks(self, *, album_id: str, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
+ return self.request("GET", f"/albums/{album_id}/tracks", params={
+ "limit": limit,
+ "offset": offset,
+ "market": market,
+ })
+
+ def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
+ return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market})
+
+ def save_library_items(self, *, uris: list[str]) -> Any:
+ return self.request("PUT", "/me/library", params={"uris": ",".join(uris)})
+
+ def library_contains(self, *, uris: list[str]) -> Any:
+ return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)})
+
+ def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
+ return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market})
+
+ def remove_saved_tracks(self, *, track_ids: list[str]) -> Any:
+ uris = [f"spotify:track:{track_id}" for track_id in track_ids]
+ return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
+
+ def remove_saved_albums(self, *, album_ids: list[str]) -> Any:
+ uris = [f"spotify:album:{album_id}" for album_id in album_ids]
+ return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
+
+ def get_recently_played(
+ self,
+ *,
+ limit: int = 20,
+ after: Optional[int] = None,
+ before: Optional[int] = None,
+ ) -> Any:
+ return self.request("GET", "/me/player/recently-played", params={
+ "limit": limit,
+ "after": after,
+ "before": before,
+ })
+
+
+def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str:
+ detail = fallback
+ try:
+ payload = response.json()
+ if isinstance(payload, dict):
+ error_obj = payload.get("error")
+ if isinstance(error_obj, dict):
+ detail = str(error_obj.get("message") or detail)
+ elif isinstance(error_obj, str):
+ detail = error_obj
+ except Exception:
+ pass
+ return detail.strip()
+
+
+def _friendly_spotify_error_message(
+ *,
+ status_code: int,
+ detail: str,
+ method: str,
+ path: str,
+ retry_after: Optional[str],
+) -> str:
+ normalized_detail = detail.lower()
+ is_playback_path = path.startswith("/me/player")
+
+ if status_code == 401:
+ return "Spotify authentication failed or expired. Run `hermes auth spotify` again."
+
+ if status_code == 403:
+ if is_playback_path:
+ return (
+ "Spotify rejected this playback request. Playback control usually requires a Spotify Premium account "
+ "and an active Spotify Connect device."
+ )
+ if "scope" in normalized_detail or "permission" in normalized_detail:
+ return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions."
+ return "Spotify rejected the request. The account may not have permission for this action."
+
+ if status_code == 404:
+ if is_playback_path:
+ return "Spotify could not find an active playback device or player session for this request."
+ return "Spotify resource not found."
+
+ if status_code == 429:
+ message = "Spotify rate limit exceeded."
+ if retry_after:
+ message += f" Retry after {retry_after} seconds."
+ return message
+
+ if detail:
+ return detail
+ return f"Spotify API request failed with status {status_code}."
+
+
+def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
+ if not payload:
+ return {}
+ return {key: value for key, value in payload.items() if value is not None}
+
+
+def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str:
+ cleaned = (value or "").strip()
+ if not cleaned:
+ raise SpotifyError("Spotify id/uri/url is required.")
+ if cleaned.startswith("spotify:"):
+ parts = cleaned.split(":")
+ if len(parts) >= 3:
+ item_type = parts[1]
+ if expected_type and item_type != expected_type:
+ raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
+ return parts[2]
+ if "open.spotify.com" in cleaned:
+ parsed = urlparse(cleaned)
+ path_parts = [part for part in parsed.path.split("/") if part]
+ if len(path_parts) >= 2:
+ item_type, item_id = path_parts[0], path_parts[1]
+ if expected_type and item_type != expected_type:
+ raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
+ return item_id
+ return cleaned
+
+
+def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str:
+ cleaned = (value or "").strip()
+ if not cleaned:
+ raise SpotifyError("Spotify URI/url/id is required.")
+ if cleaned.startswith("spotify:"):
+ if expected_type:
+ parts = cleaned.split(":")
+ if len(parts) >= 3 and parts[1] != expected_type:
+ raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.")
+ return cleaned
+ item_id = normalize_spotify_id(cleaned, expected_type)
+ if expected_type:
+ return f"spotify:{expected_type}:{item_id}"
+ return cleaned
+
+
+def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]:
+ uris: list[str] = []
+ for value in values:
+ uri = normalize_spotify_uri(str(value), expected_type)
+ if uri not in uris:
+ uris.append(uri)
+ if not uris:
+ raise SpotifyError("At least one Spotify item is required.")
+ return uris
+
+
+def compact_json(data: Any) -> str:
+ return json.dumps(data, ensure_ascii=False)
diff --git a/tools/spotify_tool.py b/tools/spotify_tool.py
new file mode 100644
index 000000000..dc88953c9
--- /dev/null
+++ b/tools/spotify_tool.py
@@ -0,0 +1,530 @@
+"""Native Spotify tools for Hermes."""
+
+from __future__ import annotations
+
+from typing import Any, Dict, List
+
+from hermes_cli.auth import get_auth_status
+from tools.providers.spotify_client import (
+ SpotifyAPIError,
+ SpotifyAuthRequiredError,
+ SpotifyClient,
+ SpotifyError,
+ normalize_spotify_id,
+ normalize_spotify_uri,
+ normalize_spotify_uris,
+)
+from tools.registry import registry, tool_error, tool_result
+
+
+def _check_spotify_available() -> bool:
+ try:
+ return bool(get_auth_status("spotify").get("logged_in"))
+ except Exception:
+ return False
+
+
+def _spotify_client() -> SpotifyClient:
+ return SpotifyClient()
+
+
+def _spotify_tool_error(exc: Exception) -> str:
+ if isinstance(exc, (SpotifyError, SpotifyAuthRequiredError)):
+ return tool_error(str(exc))
+ if isinstance(exc, SpotifyAPIError):
+ return tool_error(str(exc), status_code=exc.status_code)
+ return tool_error(f"Spotify tool failed: {type(exc).__name__}: {exc}")
+
+
+def _coerce_limit(raw: Any, *, default: int = 20, minimum: int = 1, maximum: int = 50) -> int:
+ try:
+ value = int(raw)
+ except Exception:
+ value = default
+ return max(minimum, min(maximum, value))
+
+
+def _coerce_bool(raw: Any, default: bool = False) -> bool:
+ if isinstance(raw, bool):
+ return raw
+ if isinstance(raw, str):
+ cleaned = raw.strip().lower()
+ if cleaned in {"1", "true", "yes", "on"}:
+ return True
+ if cleaned in {"0", "false", "no", "off"}:
+ return False
+ return default
+
+
+def _as_list(raw: Any) -> List[str]:
+ if raw is None:
+ return []
+ if isinstance(raw, list):
+ return [str(item).strip() for item in raw if str(item).strip()]
+ return [str(raw).strip()] if str(raw).strip() else []
+
+
+def _describe_empty_playback(payload: Any, *, action: str) -> dict | None:
+ if not isinstance(payload, dict) or not payload.get("empty"):
+ return None
+ if action == "get_currently_playing":
+ return {
+ "success": True,
+ "action": action,
+ "is_playing": False,
+ "status_code": payload.get("status_code", 204),
+ "message": payload.get("message") or "Spotify is not currently playing anything.",
+ }
+ if action == "get_state":
+ return {
+ "success": True,
+ "action": action,
+ "has_active_device": False,
+ "status_code": payload.get("status_code", 204),
+ "message": payload.get("message") or "No active Spotify playback session was found.",
+ }
+ return None
+
+
+def _handle_spotify_playback(args: dict, **kw) -> str:
+ action = str(args.get("action") or "get_state").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "get_state":
+ payload = client.get_playback_state(market=args.get("market"))
+ empty_result = _describe_empty_playback(payload, action=action)
+ return tool_result(empty_result or payload)
+ if action == "get_currently_playing":
+ payload = client.get_currently_playing(market=args.get("market"))
+ empty_result = _describe_empty_playback(payload, action=action)
+ return tool_result(empty_result or payload)
+ if action == "play":
+ offset = args.get("offset")
+ if isinstance(offset, dict):
+ payload_offset = {k: v for k, v in offset.items() if v is not None}
+ else:
+ payload_offset = None
+ uris = normalize_spotify_uris(_as_list(args.get("uris")), "track") if args.get("uris") else None
+ context_uri = None
+ if args.get("context_uri"):
+ raw_context = str(args.get("context_uri"))
+ context_type = None
+ if raw_context.startswith("spotify:album:") or "/album/" in raw_context:
+ context_type = "album"
+ elif raw_context.startswith("spotify:playlist:") or "/playlist/" in raw_context:
+ context_type = "playlist"
+ elif raw_context.startswith("spotify:artist:") or "/artist/" in raw_context:
+ context_type = "artist"
+ context_uri = normalize_spotify_uri(raw_context, context_type)
+ result = client.start_playback(
+ device_id=args.get("device_id"),
+ context_uri=context_uri,
+ uris=uris,
+ offset=payload_offset,
+ position_ms=args.get("position_ms"),
+ )
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "pause":
+ result = client.pause_playback(device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "next":
+ result = client.skip_next(device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "previous":
+ result = client.skip_previous(device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "seek":
+ if args.get("position_ms") is None:
+ return tool_error("position_ms is required for action='seek'")
+ result = client.seek(position_ms=int(args["position_ms"]), device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "set_repeat":
+ state = str(args.get("state") or "").strip().lower()
+ if state not in {"track", "context", "off"}:
+ return tool_error("state must be one of: track, context, off")
+ result = client.set_repeat(state=state, device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "set_shuffle":
+ result = client.set_shuffle(state=_coerce_bool(args.get("state")), device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ if action == "set_volume":
+ if args.get("volume_percent") is None:
+ return tool_error("volume_percent is required for action='set_volume'")
+ result = client.set_volume(volume_percent=max(0, min(100, int(args["volume_percent"]))), device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "result": result})
+ return tool_error(f"Unknown spotify_playback action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_devices(args: dict, **kw) -> str:
+ action = str(args.get("action") or "list").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "list":
+ return tool_result(client.get_devices())
+ if action == "transfer":
+ device_id = str(args.get("device_id") or "").strip()
+ if not device_id:
+ return tool_error("device_id is required for action='transfer'")
+ result = client.transfer_playback(device_id=device_id, play=_coerce_bool(args.get("play")))
+ return tool_result({"success": True, "action": action, "result": result})
+ return tool_error(f"Unknown spotify_devices action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_queue(args: dict, **kw) -> str:
+ action = str(args.get("action") or "get").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "get":
+ return tool_result(client.get_queue())
+ if action == "add":
+ uri = normalize_spotify_uri(str(args.get("uri") or ""), None)
+ result = client.add_to_queue(uri=uri, device_id=args.get("device_id"))
+ return tool_result({"success": True, "action": action, "uri": uri, "result": result})
+ return tool_error(f"Unknown spotify_queue action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_search(args: dict, **kw) -> str:
+ client = _spotify_client()
+ query = str(args.get("query") or "").strip()
+ if not query:
+ return tool_error("query is required")
+ raw_types = _as_list(args.get("types") or args.get("type") or ["track"])
+ search_types = [value.lower() for value in raw_types if value.lower() in {"album", "artist", "playlist", "track", "show", "episode", "audiobook"}]
+ if not search_types:
+ return tool_error("types must contain one or more of: album, artist, playlist, track, show, episode, audiobook")
+ try:
+ return tool_result(client.search(
+ query=query,
+ search_types=search_types,
+ limit=_coerce_limit(args.get("limit"), default=10),
+ offset=max(0, int(args.get("offset") or 0)),
+ market=args.get("market"),
+ include_external=args.get("include_external"),
+ ))
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_playlists(args: dict, **kw) -> str:
+ action = str(args.get("action") or "list").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "list":
+ return tool_result(client.get_my_playlists(
+ limit=_coerce_limit(args.get("limit"), default=20),
+ offset=max(0, int(args.get("offset") or 0)),
+ ))
+ if action == "get":
+ playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
+ return tool_result(client.get_playlist(playlist_id=playlist_id, market=args.get("market")))
+ if action == "create":
+ name = str(args.get("name") or "").strip()
+ if not name:
+ return tool_error("name is required for action='create'")
+ return tool_result(client.create_playlist(
+ name=name,
+ public=_coerce_bool(args.get("public")),
+ collaborative=_coerce_bool(args.get("collaborative")),
+ description=args.get("description"),
+ ))
+ if action == "add_items":
+ playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
+ uris = normalize_spotify_uris(_as_list(args.get("uris")))
+ return tool_result(client.add_playlist_items(
+ playlist_id=playlist_id,
+ uris=uris,
+ position=args.get("position"),
+ ))
+ if action == "remove_items":
+ playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
+ uris = normalize_spotify_uris(_as_list(args.get("uris")))
+ return tool_result(client.remove_playlist_items(
+ playlist_id=playlist_id,
+ uris=uris,
+ snapshot_id=args.get("snapshot_id"),
+ ))
+ if action == "update_details":
+ playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
+ return tool_result(client.update_playlist_details(
+ playlist_id=playlist_id,
+ name=args.get("name"),
+ public=args.get("public"),
+ collaborative=args.get("collaborative"),
+ description=args.get("description"),
+ ))
+ return tool_error(f"Unknown spotify_playlists action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_albums(args: dict, **kw) -> str:
+ action = str(args.get("action") or "get").strip().lower()
+ client = _spotify_client()
+ try:
+ album_id = normalize_spotify_id(str(args.get("album_id") or args.get("id") or ""), "album")
+ if action == "get":
+ return tool_result(client.get_album(album_id=album_id, market=args.get("market")))
+ if action == "tracks":
+ return tool_result(client.get_album_tracks(
+ album_id=album_id,
+ limit=_coerce_limit(args.get("limit"), default=20),
+ offset=max(0, int(args.get("offset") or 0)),
+ market=args.get("market"),
+ ))
+ return tool_error(f"Unknown spotify_albums action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_saved_tracks(args: dict, **kw) -> str:
+ action = str(args.get("action") or "list").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "list":
+ return tool_result(client.get_saved_tracks(
+ limit=_coerce_limit(args.get("limit"), default=20),
+ offset=max(0, int(args.get("offset") or 0)),
+ market=args.get("market"),
+ ))
+ if action == "save":
+ uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "track")
+ return tool_result(client.save_library_items(uris=uris))
+ if action == "remove":
+ track_ids = [normalize_spotify_id(item, "track") for item in _as_list(args.get("ids") or args.get("items"))]
+ if not track_ids:
+ return tool_error("ids/items is required for action='remove'")
+ return tool_result(client.remove_saved_tracks(track_ids=track_ids))
+ return tool_error(f"Unknown spotify_saved_tracks action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_saved_albums(args: dict, **kw) -> str:
+ action = str(args.get("action") or "list").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "list":
+ return tool_result(client.get_saved_albums(
+ limit=_coerce_limit(args.get("limit"), default=20),
+ offset=max(0, int(args.get("offset") or 0)),
+ market=args.get("market"),
+ ))
+ if action == "save":
+ uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "album")
+ return tool_result(client.save_library_items(uris=uris))
+ if action == "remove":
+ album_ids = [normalize_spotify_id(item, "album") for item in _as_list(args.get("ids") or args.get("items"))]
+ if not album_ids:
+ return tool_error("ids/items is required for action='remove'")
+ return tool_result(client.remove_saved_albums(album_ids=album_ids))
+ return tool_error(f"Unknown spotify_saved_albums action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+def _handle_spotify_activity(args: dict, **kw) -> str:
+ action = str(args.get("action") or "now_playing").strip().lower()
+ client = _spotify_client()
+ try:
+ if action == "now_playing":
+ payload = client.get_currently_playing(market=args.get("market"))
+ if isinstance(payload, dict) and payload.get("empty"):
+ return tool_result({
+ "success": True,
+ "action": action,
+ "is_playing": False,
+ "status_code": payload.get("status_code", 204),
+ "message": payload.get("message") or "Spotify is not currently playing anything.",
+ })
+ return tool_result(payload)
+ if action == "recently_played":
+ after = args.get("after")
+ before = args.get("before")
+ if after and before:
+ return tool_error("Provide only one of 'after' or 'before'")
+ return tool_result(client.get_recently_played(
+ limit=_coerce_limit(args.get("limit"), default=20),
+ after=int(after) if after is not None else None,
+ before=int(before) if before is not None else None,
+ ))
+ return tool_error(f"Unknown spotify_activity action: {action}")
+ except Exception as exc:
+ return _spotify_tool_error(exc)
+
+
+COMMON_STRING = {"type": "string"}
+
+SPOTIFY_PLAYBACK_SCHEMA = {
+ "name": "spotify_playback",
+ "description": "Control Spotify playback or inspect the active playback state.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["get_state", "get_currently_playing", "play", "pause", "next", "previous", "seek", "set_repeat", "set_shuffle", "set_volume"]},
+ "device_id": COMMON_STRING,
+ "market": COMMON_STRING,
+ "context_uri": COMMON_STRING,
+ "uris": {"type": "array", "items": COMMON_STRING},
+ "offset": {"type": "object"},
+ "position_ms": {"type": "integer"},
+ "state": {"description": "For set_repeat use track/context/off. For set_shuffle use boolean-like true/false.", "oneOf": [{"type": "string"}, {"type": "boolean"}]},
+ "volume_percent": {"type": "integer"},
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_DEVICES_SCHEMA = {
+ "name": "spotify_devices",
+ "description": "List Spotify Connect devices or transfer playback to a different device.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["list", "transfer"]},
+ "device_id": COMMON_STRING,
+ "play": {"type": "boolean"},
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_QUEUE_SCHEMA = {
+ "name": "spotify_queue",
+ "description": "Inspect the user's Spotify queue or add an item to it.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["get", "add"]},
+ "uri": COMMON_STRING,
+ "device_id": COMMON_STRING,
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_SEARCH_SCHEMA = {
+ "name": "spotify_search",
+ "description": "Search the Spotify catalog for tracks, albums, artists, playlists, shows, or episodes.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "query": COMMON_STRING,
+ "types": {"type": "array", "items": COMMON_STRING},
+ "type": COMMON_STRING,
+ "limit": {"type": "integer"},
+ "offset": {"type": "integer"},
+ "market": COMMON_STRING,
+ "include_external": COMMON_STRING,
+ },
+ "required": ["query"],
+ },
+}
+
+SPOTIFY_PLAYLISTS_SCHEMA = {
+ "name": "spotify_playlists",
+ "description": "List, inspect, create, update, and modify Spotify playlists.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["list", "get", "create", "add_items", "remove_items", "update_details"]},
+ "playlist_id": COMMON_STRING,
+ "market": COMMON_STRING,
+ "limit": {"type": "integer"},
+ "offset": {"type": "integer"},
+ "name": COMMON_STRING,
+ "description": COMMON_STRING,
+ "public": {"type": "boolean"},
+ "collaborative": {"type": "boolean"},
+ "uris": {"type": "array", "items": COMMON_STRING},
+ "position": {"type": "integer"},
+ "snapshot_id": COMMON_STRING,
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_ALBUMS_SCHEMA = {
+ "name": "spotify_albums",
+ "description": "Fetch Spotify album metadata or album tracks.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["get", "tracks"]},
+ "album_id": COMMON_STRING,
+ "id": COMMON_STRING,
+ "market": COMMON_STRING,
+ "limit": {"type": "integer"},
+ "offset": {"type": "integer"},
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_SAVED_TRACKS_SCHEMA = {
+ "name": "spotify_saved_tracks",
+ "description": "List, save, or remove the user's saved Spotify tracks.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["list", "save", "remove"]},
+ "limit": {"type": "integer"},
+ "offset": {"type": "integer"},
+ "market": COMMON_STRING,
+ "uris": {"type": "array", "items": COMMON_STRING},
+ "ids": {"type": "array", "items": COMMON_STRING},
+ "items": {"type": "array", "items": COMMON_STRING},
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_SAVED_ALBUMS_SCHEMA = {
+ "name": "spotify_saved_albums",
+ "description": "List, save, or remove the user's saved Spotify albums.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["list", "save", "remove"]},
+ "limit": {"type": "integer"},
+ "offset": {"type": "integer"},
+ "market": COMMON_STRING,
+ "uris": {"type": "array", "items": COMMON_STRING},
+ "ids": {"type": "array", "items": COMMON_STRING},
+ "items": {"type": "array", "items": COMMON_STRING},
+ },
+ "required": ["action"],
+ },
+}
+
+SPOTIFY_ACTIVITY_SCHEMA = {
+ "name": "spotify_activity",
+ "description": "Inspect now playing or recently played Spotify activity.",
+ "parameters": {
+ "type": "object",
+ "properties": {
+ "action": {"type": "string", "enum": ["now_playing", "recently_played"]},
+ "market": COMMON_STRING,
+ "limit": {"type": "integer"},
+ "after": {"type": "integer"},
+ "before": {"type": "integer"},
+ },
+ "required": ["action"],
+ },
+}
+
+
+registry.register(name="spotify_playback", toolset="spotify", schema=SPOTIFY_PLAYBACK_SCHEMA, handler=_handle_spotify_playback, check_fn=_check_spotify_available, emoji="🎵")
+registry.register(name="spotify_devices", toolset="spotify", schema=SPOTIFY_DEVICES_SCHEMA, handler=_handle_spotify_devices, check_fn=_check_spotify_available, emoji="🔈")
+registry.register(name="spotify_queue", toolset="spotify", schema=SPOTIFY_QUEUE_SCHEMA, handler=_handle_spotify_queue, check_fn=_check_spotify_available, emoji="📻")
+registry.register(name="spotify_search", toolset="spotify", schema=SPOTIFY_SEARCH_SCHEMA, handler=_handle_spotify_search, check_fn=_check_spotify_available, emoji="🔎")
+registry.register(name="spotify_playlists", toolset="spotify", schema=SPOTIFY_PLAYLISTS_SCHEMA, handler=_handle_spotify_playlists, check_fn=_check_spotify_available, emoji="📚")
+registry.register(name="spotify_albums", toolset="spotify", schema=SPOTIFY_ALBUMS_SCHEMA, handler=_handle_spotify_albums, check_fn=_check_spotify_available, emoji="💿")
+registry.register(name="spotify_saved_tracks", toolset="spotify", schema=SPOTIFY_SAVED_TRACKS_SCHEMA, handler=_handle_spotify_saved_tracks, check_fn=_check_spotify_available, emoji="❤️")
+registry.register(name="spotify_saved_albums", toolset="spotify", schema=SPOTIFY_SAVED_ALBUMS_SCHEMA, handler=_handle_spotify_saved_albums, check_fn=_check_spotify_available, emoji="💽")
+registry.register(name="spotify_activity", toolset="spotify", schema=SPOTIFY_ACTIVITY_SCHEMA, handler=_handle_spotify_activity, check_fn=_check_spotify_available, emoji="🕘")
diff --git a/toolsets.py b/toolsets.py
index b3cdb2e7a..5fd67fda7 100644
--- a/toolsets.py
+++ b/toolsets.py
@@ -60,6 +60,10 @@ _HERMES_CORE_TOOLS = [
"send_message",
# Home Assistant smart home control (gated on HASS_TOKEN via check_fn)
"ha_list_entities", "ha_get_state", "ha_list_services", "ha_call_service",
+ # Spotify playback and library tools (gated on Spotify auth via check_fn)
+ "spotify_playback", "spotify_devices", "spotify_queue", "spotify_search",
+ "spotify_playlists", "spotify_albums", "spotify_saved_tracks",
+ "spotify_saved_albums", "spotify_activity",
]
@@ -217,6 +221,16 @@ TOOLSETS = {
"includes": []
},
+ "spotify": {
+ "description": "Native Spotify playback, search, playlist, album, library, and activity tools",
+ "tools": [
+ "spotify_playback", "spotify_devices", "spotify_queue", "spotify_search",
+ "spotify_playlists", "spotify_albums", "spotify_saved_tracks",
+ "spotify_saved_albums", "spotify_activity",
+ ],
+ "includes": []
+ },
+
# Scenario-specific toolsets