Add native Spotify tools with PKCE auth

This commit is contained in:
Dilee 2026-04-24 14:17:44 +03:00 committed by Teknium
parent 3392d1e422
commit 7e9dd9ca45
9 changed files with 1936 additions and 3 deletions

View file

@ -0,0 +1 @@
"""Provider-specific native tool clients."""

View file

@ -0,0 +1,435 @@
"""Thin Spotify Web API helper used by Hermes native tools."""
from __future__ import annotations
import json
from typing import Any, Dict, Iterable, Optional
from urllib.parse import urlparse
import httpx
from hermes_cli.auth import (
AuthError,
resolve_spotify_runtime_credentials,
)
class SpotifyError(RuntimeError):
"""Base Spotify tool error."""
class SpotifyAuthRequiredError(SpotifyError):
"""Raised when the user needs to authenticate with Spotify first."""
class SpotifyAPIError(SpotifyError):
"""Structured Spotify API failure."""
def __init__(
self,
message: str,
*,
status_code: Optional[int] = None,
response_body: Optional[str] = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
self.path = None
class SpotifyClient:
def __init__(self) -> None:
self._runtime = self._resolve_runtime(refresh_if_expiring=True)
def _resolve_runtime(self, *, force_refresh: bool = False, refresh_if_expiring: bool = True) -> Dict[str, Any]:
try:
return resolve_spotify_runtime_credentials(
force_refresh=force_refresh,
refresh_if_expiring=refresh_if_expiring,
)
except AuthError as exc:
raise SpotifyAuthRequiredError(str(exc)) from exc
@property
def base_url(self) -> str:
return str(self._runtime.get("base_url") or "").rstrip("/")
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self._runtime['access_token']}",
"Content-Type": "application/json",
}
def request(
self,
method: str,
path: str,
*,
params: Optional[Dict[str, Any]] = None,
json_body: Optional[Dict[str, Any]] = None,
allow_retry_on_401: bool = True,
empty_response: Optional[Dict[str, Any]] = None,
) -> Any:
url = f"{self.base_url}{path}"
response = httpx.request(
method,
url,
headers=self._headers(),
params=_strip_none(params),
json=_strip_none(json_body) if json_body is not None else None,
timeout=30.0,
)
if response.status_code == 401 and allow_retry_on_401:
self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True)
return self.request(
method,
path,
params=params,
json_body=json_body,
allow_retry_on_401=False,
)
if response.status_code >= 400:
self._raise_api_error(response, method=method, path=path)
if response.status_code == 204 or not response.content:
return empty_response or {"success": True, "status_code": response.status_code, "empty": True}
if "application/json" in response.headers.get("content-type", ""):
return response.json()
return {"success": True, "text": response.text}
def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> None:
detail = response.text.strip()
message = _friendly_spotify_error_message(
status_code=response.status_code,
detail=_extract_spotify_error_detail(response, fallback=detail),
method=method,
path=path,
retry_after=response.headers.get("Retry-After"),
)
error = SpotifyAPIError(message, status_code=response.status_code, response_body=detail)
error.path = path
raise error
def get_devices(self) -> Any:
return self.request("GET", "/me/player/devices")
def transfer_playback(self, *, device_id: str, play: bool = False) -> Any:
return self.request("PUT", "/me/player", json_body={
"device_ids": [device_id],
"play": play,
})
def get_playback_state(self, *, market: Optional[str] = None) -> Any:
return self.request(
"GET",
"/me/player",
params={"market": market},
empty_response={
"status_code": 204,
"empty": True,
"message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.",
},
)
def get_currently_playing(self, *, market: Optional[str] = None) -> Any:
return self.request(
"GET",
"/me/player/currently-playing",
params={"market": market},
empty_response={
"status_code": 204,
"empty": True,
"message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
},
)
def start_playback(
self,
*,
device_id: Optional[str] = None,
context_uri: Optional[str] = None,
uris: Optional[list[str]] = None,
offset: Optional[Dict[str, Any]] = None,
position_ms: Optional[int] = None,
) -> Any:
return self.request(
"PUT",
"/me/player/play",
params={"device_id": device_id},
json_body={
"context_uri": context_uri,
"uris": uris,
"offset": offset,
"position_ms": position_ms,
},
)
def pause_playback(self, *, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/pause", params={"device_id": device_id})
def skip_next(self, *, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/next", params={"device_id": device_id})
def skip_previous(self, *, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/previous", params={"device_id": device_id})
def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/seek", params={
"position_ms": position_ms,
"device_id": device_id,
})
def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id})
def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/shuffle", params={"state": str(bool(state)).lower(), "device_id": device_id})
def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/volume", params={
"volume_percent": volume_percent,
"device_id": device_id,
})
def get_queue(self) -> Any:
return self.request("GET", "/me/player/queue")
def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id})
def search(
self,
*,
query: str,
search_types: list[str],
limit: int = 10,
offset: int = 0,
market: Optional[str] = None,
include_external: Optional[str] = None,
) -> Any:
return self.request("GET", "/search", params={
"q": query,
"type": ",".join(search_types),
"limit": limit,
"offset": offset,
"market": market,
"include_external": include_external,
})
def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any:
return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset})
def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any:
return self.request("GET", f"/playlists/{playlist_id}", params={"market": market})
def create_playlist(
self,
*,
name: str,
public: bool = False,
collaborative: bool = False,
description: Optional[str] = None,
) -> Any:
return self.request("POST", "/me/playlists", json_body={
"name": name,
"public": public,
"collaborative": collaborative,
"description": description,
})
def add_playlist_items(
self,
*,
playlist_id: str,
uris: list[str],
position: Optional[int] = None,
) -> Any:
return self.request("POST", f"/playlists/{playlist_id}/items", json_body={
"uris": uris,
"position": position,
})
def remove_playlist_items(
self,
*,
playlist_id: str,
uris: list[str],
snapshot_id: Optional[str] = None,
) -> Any:
return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={
"items": [{"uri": uri} for uri in uris],
"snapshot_id": snapshot_id,
})
def update_playlist_details(
self,
*,
playlist_id: str,
name: Optional[str] = None,
public: Optional[bool] = None,
collaborative: Optional[bool] = None,
description: Optional[str] = None,
) -> Any:
return self.request("PUT", f"/playlists/{playlist_id}", json_body={
"name": name,
"public": public,
"collaborative": collaborative,
"description": description,
})
def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any:
return self.request("GET", f"/albums/{album_id}", params={"market": market})
def get_album_tracks(self, *, album_id: str, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", f"/albums/{album_id}/tracks", params={
"limit": limit,
"offset": offset,
"market": market,
})
def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market})
def save_library_items(self, *, uris: list[str]) -> Any:
return self.request("PUT", "/me/library", params={"uris": ",".join(uris)})
def library_contains(self, *, uris: list[str]) -> Any:
return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)})
def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market})
def remove_saved_tracks(self, *, track_ids: list[str]) -> Any:
uris = [f"spotify:track:{track_id}" for track_id in track_ids]
return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
def remove_saved_albums(self, *, album_ids: list[str]) -> Any:
uris = [f"spotify:album:{album_id}" for album_id in album_ids]
return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
def get_recently_played(
self,
*,
limit: int = 20,
after: Optional[int] = None,
before: Optional[int] = None,
) -> Any:
return self.request("GET", "/me/player/recently-played", params={
"limit": limit,
"after": after,
"before": before,
})
def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str:
detail = fallback
try:
payload = response.json()
if isinstance(payload, dict):
error_obj = payload.get("error")
if isinstance(error_obj, dict):
detail = str(error_obj.get("message") or detail)
elif isinstance(error_obj, str):
detail = error_obj
except Exception:
pass
return detail.strip()
def _friendly_spotify_error_message(
*,
status_code: int,
detail: str,
method: str,
path: str,
retry_after: Optional[str],
) -> str:
normalized_detail = detail.lower()
is_playback_path = path.startswith("/me/player")
if status_code == 401:
return "Spotify authentication failed or expired. Run `hermes auth spotify` again."
if status_code == 403:
if is_playback_path:
return (
"Spotify rejected this playback request. Playback control usually requires a Spotify Premium account "
"and an active Spotify Connect device."
)
if "scope" in normalized_detail or "permission" in normalized_detail:
return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions."
return "Spotify rejected the request. The account may not have permission for this action."
if status_code == 404:
if is_playback_path:
return "Spotify could not find an active playback device or player session for this request."
return "Spotify resource not found."
if status_code == 429:
message = "Spotify rate limit exceeded."
if retry_after:
message += f" Retry after {retry_after} seconds."
return message
if detail:
return detail
return f"Spotify API request failed with status {status_code}."
def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not payload:
return {}
return {key: value for key, value in payload.items() if value is not None}
def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str:
cleaned = (value or "").strip()
if not cleaned:
raise SpotifyError("Spotify id/uri/url is required.")
if cleaned.startswith("spotify:"):
parts = cleaned.split(":")
if len(parts) >= 3:
item_type = parts[1]
if expected_type and item_type != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
return parts[2]
if "open.spotify.com" in cleaned:
parsed = urlparse(cleaned)
path_parts = [part for part in parsed.path.split("/") if part]
if len(path_parts) >= 2:
item_type, item_id = path_parts[0], path_parts[1]
if expected_type and item_type != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
return item_id
return cleaned
def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str:
cleaned = (value or "").strip()
if not cleaned:
raise SpotifyError("Spotify URI/url/id is required.")
if cleaned.startswith("spotify:"):
if expected_type:
parts = cleaned.split(":")
if len(parts) >= 3 and parts[1] != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.")
return cleaned
item_id = normalize_spotify_id(cleaned, expected_type)
if expected_type:
return f"spotify:{expected_type}:{item_id}"
return cleaned
def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]:
uris: list[str] = []
for value in values:
uri = normalize_spotify_uri(str(value), expected_type)
if uri not in uris:
uris.append(uri)
if not uris:
raise SpotifyError("At least one Spotify item is required.")
return uris
def compact_json(data: Any) -> str:
return json.dumps(data, ensure_ascii=False)

530
tools/spotify_tool.py Normal file
View file

@ -0,0 +1,530 @@
"""Native Spotify tools for Hermes."""
from __future__ import annotations
from typing import Any, Dict, List
from hermes_cli.auth import get_auth_status
from tools.providers.spotify_client import (
SpotifyAPIError,
SpotifyAuthRequiredError,
SpotifyClient,
SpotifyError,
normalize_spotify_id,
normalize_spotify_uri,
normalize_spotify_uris,
)
from tools.registry import registry, tool_error, tool_result
def _check_spotify_available() -> bool:
try:
return bool(get_auth_status("spotify").get("logged_in"))
except Exception:
return False
def _spotify_client() -> SpotifyClient:
return SpotifyClient()
def _spotify_tool_error(exc: Exception) -> str:
if isinstance(exc, (SpotifyError, SpotifyAuthRequiredError)):
return tool_error(str(exc))
if isinstance(exc, SpotifyAPIError):
return tool_error(str(exc), status_code=exc.status_code)
return tool_error(f"Spotify tool failed: {type(exc).__name__}: {exc}")
def _coerce_limit(raw: Any, *, default: int = 20, minimum: int = 1, maximum: int = 50) -> int:
try:
value = int(raw)
except Exception:
value = default
return max(minimum, min(maximum, value))
def _coerce_bool(raw: Any, default: bool = False) -> bool:
if isinstance(raw, bool):
return raw
if isinstance(raw, str):
cleaned = raw.strip().lower()
if cleaned in {"1", "true", "yes", "on"}:
return True
if cleaned in {"0", "false", "no", "off"}:
return False
return default
def _as_list(raw: Any) -> List[str]:
if raw is None:
return []
if isinstance(raw, list):
return [str(item).strip() for item in raw if str(item).strip()]
return [str(raw).strip()] if str(raw).strip() else []
def _describe_empty_playback(payload: Any, *, action: str) -> dict | None:
if not isinstance(payload, dict) or not payload.get("empty"):
return None
if action == "get_currently_playing":
return {
"success": True,
"action": action,
"is_playing": False,
"status_code": payload.get("status_code", 204),
"message": payload.get("message") or "Spotify is not currently playing anything.",
}
if action == "get_state":
return {
"success": True,
"action": action,
"has_active_device": False,
"status_code": payload.get("status_code", 204),
"message": payload.get("message") or "No active Spotify playback session was found.",
}
return None
def _handle_spotify_playback(args: dict, **kw) -> str:
action = str(args.get("action") or "get_state").strip().lower()
client = _spotify_client()
try:
if action == "get_state":
payload = client.get_playback_state(market=args.get("market"))
empty_result = _describe_empty_playback(payload, action=action)
return tool_result(empty_result or payload)
if action == "get_currently_playing":
payload = client.get_currently_playing(market=args.get("market"))
empty_result = _describe_empty_playback(payload, action=action)
return tool_result(empty_result or payload)
if action == "play":
offset = args.get("offset")
if isinstance(offset, dict):
payload_offset = {k: v for k, v in offset.items() if v is not None}
else:
payload_offset = None
uris = normalize_spotify_uris(_as_list(args.get("uris")), "track") if args.get("uris") else None
context_uri = None
if args.get("context_uri"):
raw_context = str(args.get("context_uri"))
context_type = None
if raw_context.startswith("spotify:album:") or "/album/" in raw_context:
context_type = "album"
elif raw_context.startswith("spotify:playlist:") or "/playlist/" in raw_context:
context_type = "playlist"
elif raw_context.startswith("spotify:artist:") or "/artist/" in raw_context:
context_type = "artist"
context_uri = normalize_spotify_uri(raw_context, context_type)
result = client.start_playback(
device_id=args.get("device_id"),
context_uri=context_uri,
uris=uris,
offset=payload_offset,
position_ms=args.get("position_ms"),
)
return tool_result({"success": True, "action": action, "result": result})
if action == "pause":
result = client.pause_playback(device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "next":
result = client.skip_next(device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "previous":
result = client.skip_previous(device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "seek":
if args.get("position_ms") is None:
return tool_error("position_ms is required for action='seek'")
result = client.seek(position_ms=int(args["position_ms"]), device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "set_repeat":
state = str(args.get("state") or "").strip().lower()
if state not in {"track", "context", "off"}:
return tool_error("state must be one of: track, context, off")
result = client.set_repeat(state=state, device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "set_shuffle":
result = client.set_shuffle(state=_coerce_bool(args.get("state")), device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
if action == "set_volume":
if args.get("volume_percent") is None:
return tool_error("volume_percent is required for action='set_volume'")
result = client.set_volume(volume_percent=max(0, min(100, int(args["volume_percent"]))), device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "result": result})
return tool_error(f"Unknown spotify_playback action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_devices(args: dict, **kw) -> str:
action = str(args.get("action") or "list").strip().lower()
client = _spotify_client()
try:
if action == "list":
return tool_result(client.get_devices())
if action == "transfer":
device_id = str(args.get("device_id") or "").strip()
if not device_id:
return tool_error("device_id is required for action='transfer'")
result = client.transfer_playback(device_id=device_id, play=_coerce_bool(args.get("play")))
return tool_result({"success": True, "action": action, "result": result})
return tool_error(f"Unknown spotify_devices action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_queue(args: dict, **kw) -> str:
action = str(args.get("action") or "get").strip().lower()
client = _spotify_client()
try:
if action == "get":
return tool_result(client.get_queue())
if action == "add":
uri = normalize_spotify_uri(str(args.get("uri") or ""), None)
result = client.add_to_queue(uri=uri, device_id=args.get("device_id"))
return tool_result({"success": True, "action": action, "uri": uri, "result": result})
return tool_error(f"Unknown spotify_queue action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_search(args: dict, **kw) -> str:
client = _spotify_client()
query = str(args.get("query") or "").strip()
if not query:
return tool_error("query is required")
raw_types = _as_list(args.get("types") or args.get("type") or ["track"])
search_types = [value.lower() for value in raw_types if value.lower() in {"album", "artist", "playlist", "track", "show", "episode", "audiobook"}]
if not search_types:
return tool_error("types must contain one or more of: album, artist, playlist, track, show, episode, audiobook")
try:
return tool_result(client.search(
query=query,
search_types=search_types,
limit=_coerce_limit(args.get("limit"), default=10),
offset=max(0, int(args.get("offset") or 0)),
market=args.get("market"),
include_external=args.get("include_external"),
))
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_playlists(args: dict, **kw) -> str:
action = str(args.get("action") or "list").strip().lower()
client = _spotify_client()
try:
if action == "list":
return tool_result(client.get_my_playlists(
limit=_coerce_limit(args.get("limit"), default=20),
offset=max(0, int(args.get("offset") or 0)),
))
if action == "get":
playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
return tool_result(client.get_playlist(playlist_id=playlist_id, market=args.get("market")))
if action == "create":
name = str(args.get("name") or "").strip()
if not name:
return tool_error("name is required for action='create'")
return tool_result(client.create_playlist(
name=name,
public=_coerce_bool(args.get("public")),
collaborative=_coerce_bool(args.get("collaborative")),
description=args.get("description"),
))
if action == "add_items":
playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
uris = normalize_spotify_uris(_as_list(args.get("uris")))
return tool_result(client.add_playlist_items(
playlist_id=playlist_id,
uris=uris,
position=args.get("position"),
))
if action == "remove_items":
playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
uris = normalize_spotify_uris(_as_list(args.get("uris")))
return tool_result(client.remove_playlist_items(
playlist_id=playlist_id,
uris=uris,
snapshot_id=args.get("snapshot_id"),
))
if action == "update_details":
playlist_id = normalize_spotify_id(str(args.get("playlist_id") or ""), "playlist")
return tool_result(client.update_playlist_details(
playlist_id=playlist_id,
name=args.get("name"),
public=args.get("public"),
collaborative=args.get("collaborative"),
description=args.get("description"),
))
return tool_error(f"Unknown spotify_playlists action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_albums(args: dict, **kw) -> str:
action = str(args.get("action") or "get").strip().lower()
client = _spotify_client()
try:
album_id = normalize_spotify_id(str(args.get("album_id") or args.get("id") or ""), "album")
if action == "get":
return tool_result(client.get_album(album_id=album_id, market=args.get("market")))
if action == "tracks":
return tool_result(client.get_album_tracks(
album_id=album_id,
limit=_coerce_limit(args.get("limit"), default=20),
offset=max(0, int(args.get("offset") or 0)),
market=args.get("market"),
))
return tool_error(f"Unknown spotify_albums action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_saved_tracks(args: dict, **kw) -> str:
action = str(args.get("action") or "list").strip().lower()
client = _spotify_client()
try:
if action == "list":
return tool_result(client.get_saved_tracks(
limit=_coerce_limit(args.get("limit"), default=20),
offset=max(0, int(args.get("offset") or 0)),
market=args.get("market"),
))
if action == "save":
uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "track")
return tool_result(client.save_library_items(uris=uris))
if action == "remove":
track_ids = [normalize_spotify_id(item, "track") for item in _as_list(args.get("ids") or args.get("items"))]
if not track_ids:
return tool_error("ids/items is required for action='remove'")
return tool_result(client.remove_saved_tracks(track_ids=track_ids))
return tool_error(f"Unknown spotify_saved_tracks action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_saved_albums(args: dict, **kw) -> str:
action = str(args.get("action") or "list").strip().lower()
client = _spotify_client()
try:
if action == "list":
return tool_result(client.get_saved_albums(
limit=_coerce_limit(args.get("limit"), default=20),
offset=max(0, int(args.get("offset") or 0)),
market=args.get("market"),
))
if action == "save":
uris = normalize_spotify_uris(_as_list(args.get("uris") or args.get("items")), "album")
return tool_result(client.save_library_items(uris=uris))
if action == "remove":
album_ids = [normalize_spotify_id(item, "album") for item in _as_list(args.get("ids") or args.get("items"))]
if not album_ids:
return tool_error("ids/items is required for action='remove'")
return tool_result(client.remove_saved_albums(album_ids=album_ids))
return tool_error(f"Unknown spotify_saved_albums action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
def _handle_spotify_activity(args: dict, **kw) -> str:
action = str(args.get("action") or "now_playing").strip().lower()
client = _spotify_client()
try:
if action == "now_playing":
payload = client.get_currently_playing(market=args.get("market"))
if isinstance(payload, dict) and payload.get("empty"):
return tool_result({
"success": True,
"action": action,
"is_playing": False,
"status_code": payload.get("status_code", 204),
"message": payload.get("message") or "Spotify is not currently playing anything.",
})
return tool_result(payload)
if action == "recently_played":
after = args.get("after")
before = args.get("before")
if after and before:
return tool_error("Provide only one of 'after' or 'before'")
return tool_result(client.get_recently_played(
limit=_coerce_limit(args.get("limit"), default=20),
after=int(after) if after is not None else None,
before=int(before) if before is not None else None,
))
return tool_error(f"Unknown spotify_activity action: {action}")
except Exception as exc:
return _spotify_tool_error(exc)
COMMON_STRING = {"type": "string"}
SPOTIFY_PLAYBACK_SCHEMA = {
"name": "spotify_playback",
"description": "Control Spotify playback or inspect the active playback state.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["get_state", "get_currently_playing", "play", "pause", "next", "previous", "seek", "set_repeat", "set_shuffle", "set_volume"]},
"device_id": COMMON_STRING,
"market": COMMON_STRING,
"context_uri": COMMON_STRING,
"uris": {"type": "array", "items": COMMON_STRING},
"offset": {"type": "object"},
"position_ms": {"type": "integer"},
"state": {"description": "For set_repeat use track/context/off. For set_shuffle use boolean-like true/false.", "oneOf": [{"type": "string"}, {"type": "boolean"}]},
"volume_percent": {"type": "integer"},
},
"required": ["action"],
},
}
SPOTIFY_DEVICES_SCHEMA = {
"name": "spotify_devices",
"description": "List Spotify Connect devices or transfer playback to a different device.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["list", "transfer"]},
"device_id": COMMON_STRING,
"play": {"type": "boolean"},
},
"required": ["action"],
},
}
SPOTIFY_QUEUE_SCHEMA = {
"name": "spotify_queue",
"description": "Inspect the user's Spotify queue or add an item to it.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["get", "add"]},
"uri": COMMON_STRING,
"device_id": COMMON_STRING,
},
"required": ["action"],
},
}
SPOTIFY_SEARCH_SCHEMA = {
"name": "spotify_search",
"description": "Search the Spotify catalog for tracks, albums, artists, playlists, shows, or episodes.",
"parameters": {
"type": "object",
"properties": {
"query": COMMON_STRING,
"types": {"type": "array", "items": COMMON_STRING},
"type": COMMON_STRING,
"limit": {"type": "integer"},
"offset": {"type": "integer"},
"market": COMMON_STRING,
"include_external": COMMON_STRING,
},
"required": ["query"],
},
}
SPOTIFY_PLAYLISTS_SCHEMA = {
"name": "spotify_playlists",
"description": "List, inspect, create, update, and modify Spotify playlists.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["list", "get", "create", "add_items", "remove_items", "update_details"]},
"playlist_id": COMMON_STRING,
"market": COMMON_STRING,
"limit": {"type": "integer"},
"offset": {"type": "integer"},
"name": COMMON_STRING,
"description": COMMON_STRING,
"public": {"type": "boolean"},
"collaborative": {"type": "boolean"},
"uris": {"type": "array", "items": COMMON_STRING},
"position": {"type": "integer"},
"snapshot_id": COMMON_STRING,
},
"required": ["action"],
},
}
SPOTIFY_ALBUMS_SCHEMA = {
"name": "spotify_albums",
"description": "Fetch Spotify album metadata or album tracks.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["get", "tracks"]},
"album_id": COMMON_STRING,
"id": COMMON_STRING,
"market": COMMON_STRING,
"limit": {"type": "integer"},
"offset": {"type": "integer"},
},
"required": ["action"],
},
}
SPOTIFY_SAVED_TRACKS_SCHEMA = {
"name": "spotify_saved_tracks",
"description": "List, save, or remove the user's saved Spotify tracks.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["list", "save", "remove"]},
"limit": {"type": "integer"},
"offset": {"type": "integer"},
"market": COMMON_STRING,
"uris": {"type": "array", "items": COMMON_STRING},
"ids": {"type": "array", "items": COMMON_STRING},
"items": {"type": "array", "items": COMMON_STRING},
},
"required": ["action"],
},
}
SPOTIFY_SAVED_ALBUMS_SCHEMA = {
"name": "spotify_saved_albums",
"description": "List, save, or remove the user's saved Spotify albums.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["list", "save", "remove"]},
"limit": {"type": "integer"},
"offset": {"type": "integer"},
"market": COMMON_STRING,
"uris": {"type": "array", "items": COMMON_STRING},
"ids": {"type": "array", "items": COMMON_STRING},
"items": {"type": "array", "items": COMMON_STRING},
},
"required": ["action"],
},
}
SPOTIFY_ACTIVITY_SCHEMA = {
"name": "spotify_activity",
"description": "Inspect now playing or recently played Spotify activity.",
"parameters": {
"type": "object",
"properties": {
"action": {"type": "string", "enum": ["now_playing", "recently_played"]},
"market": COMMON_STRING,
"limit": {"type": "integer"},
"after": {"type": "integer"},
"before": {"type": "integer"},
},
"required": ["action"],
},
}
registry.register(name="spotify_playback", toolset="spotify", schema=SPOTIFY_PLAYBACK_SCHEMA, handler=_handle_spotify_playback, check_fn=_check_spotify_available, emoji="🎵")
registry.register(name="spotify_devices", toolset="spotify", schema=SPOTIFY_DEVICES_SCHEMA, handler=_handle_spotify_devices, check_fn=_check_spotify_available, emoji="🔈")
registry.register(name="spotify_queue", toolset="spotify", schema=SPOTIFY_QUEUE_SCHEMA, handler=_handle_spotify_queue, check_fn=_check_spotify_available, emoji="📻")
registry.register(name="spotify_search", toolset="spotify", schema=SPOTIFY_SEARCH_SCHEMA, handler=_handle_spotify_search, check_fn=_check_spotify_available, emoji="🔎")
registry.register(name="spotify_playlists", toolset="spotify", schema=SPOTIFY_PLAYLISTS_SCHEMA, handler=_handle_spotify_playlists, check_fn=_check_spotify_available, emoji="📚")
registry.register(name="spotify_albums", toolset="spotify", schema=SPOTIFY_ALBUMS_SCHEMA, handler=_handle_spotify_albums, check_fn=_check_spotify_available, emoji="💿")
registry.register(name="spotify_saved_tracks", toolset="spotify", schema=SPOTIFY_SAVED_TRACKS_SCHEMA, handler=_handle_spotify_saved_tracks, check_fn=_check_spotify_available, emoji="❤️")
registry.register(name="spotify_saved_albums", toolset="spotify", schema=SPOTIFY_SAVED_ALBUMS_SCHEMA, handler=_handle_spotify_saved_albums, check_fn=_check_spotify_available, emoji="💽")
registry.register(name="spotify_activity", toolset="spotify", schema=SPOTIFY_ACTIVITY_SCHEMA, handler=_handle_spotify_activity, check_fn=_check_spotify_available, emoji="🕘")