hermes-agent/tools/providers/spotify_client.py
2026-04-24 05:20:38 -07:00

435 lines
15 KiB
Python

"""Thin Spotify Web API helper used by Hermes native tools."""
from __future__ import annotations
import json
from typing import Any, Dict, Iterable, Optional
from urllib.parse import urlparse
import httpx
from hermes_cli.auth import (
AuthError,
resolve_spotify_runtime_credentials,
)
class SpotifyError(RuntimeError):
"""Base Spotify tool error."""
class SpotifyAuthRequiredError(SpotifyError):
"""Raised when the user needs to authenticate with Spotify first."""
class SpotifyAPIError(SpotifyError):
"""Structured Spotify API failure."""
def __init__(
self,
message: str,
*,
status_code: Optional[int] = None,
response_body: Optional[str] = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.response_body = response_body
self.path = None
class SpotifyClient:
def __init__(self) -> None:
self._runtime = self._resolve_runtime(refresh_if_expiring=True)
def _resolve_runtime(self, *, force_refresh: bool = False, refresh_if_expiring: bool = True) -> Dict[str, Any]:
try:
return resolve_spotify_runtime_credentials(
force_refresh=force_refresh,
refresh_if_expiring=refresh_if_expiring,
)
except AuthError as exc:
raise SpotifyAuthRequiredError(str(exc)) from exc
@property
def base_url(self) -> str:
return str(self._runtime.get("base_url") or "").rstrip("/")
def _headers(self) -> Dict[str, str]:
return {
"Authorization": f"Bearer {self._runtime['access_token']}",
"Content-Type": "application/json",
}
def request(
self,
method: str,
path: str,
*,
params: Optional[Dict[str, Any]] = None,
json_body: Optional[Dict[str, Any]] = None,
allow_retry_on_401: bool = True,
empty_response: Optional[Dict[str, Any]] = None,
) -> Any:
url = f"{self.base_url}{path}"
response = httpx.request(
method,
url,
headers=self._headers(),
params=_strip_none(params),
json=_strip_none(json_body) if json_body is not None else None,
timeout=30.0,
)
if response.status_code == 401 and allow_retry_on_401:
self._runtime = self._resolve_runtime(force_refresh=True, refresh_if_expiring=True)
return self.request(
method,
path,
params=params,
json_body=json_body,
allow_retry_on_401=False,
)
if response.status_code >= 400:
self._raise_api_error(response, method=method, path=path)
if response.status_code == 204 or not response.content:
return empty_response or {"success": True, "status_code": response.status_code, "empty": True}
if "application/json" in response.headers.get("content-type", ""):
return response.json()
return {"success": True, "text": response.text}
def _raise_api_error(self, response: httpx.Response, *, method: str, path: str) -> None:
detail = response.text.strip()
message = _friendly_spotify_error_message(
status_code=response.status_code,
detail=_extract_spotify_error_detail(response, fallback=detail),
method=method,
path=path,
retry_after=response.headers.get("Retry-After"),
)
error = SpotifyAPIError(message, status_code=response.status_code, response_body=detail)
error.path = path
raise error
def get_devices(self) -> Any:
return self.request("GET", "/me/player/devices")
def transfer_playback(self, *, device_id: str, play: bool = False) -> Any:
return self.request("PUT", "/me/player", json_body={
"device_ids": [device_id],
"play": play,
})
def get_playback_state(self, *, market: Optional[str] = None) -> Any:
return self.request(
"GET",
"/me/player",
params={"market": market},
empty_response={
"status_code": 204,
"empty": True,
"message": "No active Spotify playback session was found. Open Spotify on a device and start playback, or transfer playback to an available device.",
},
)
def get_currently_playing(self, *, market: Optional[str] = None) -> Any:
return self.request(
"GET",
"/me/player/currently-playing",
params={"market": market},
empty_response={
"status_code": 204,
"empty": True,
"message": "Spotify is not currently playing anything. Start playback in Spotify and try again.",
},
)
def start_playback(
self,
*,
device_id: Optional[str] = None,
context_uri: Optional[str] = None,
uris: Optional[list[str]] = None,
offset: Optional[Dict[str, Any]] = None,
position_ms: Optional[int] = None,
) -> Any:
return self.request(
"PUT",
"/me/player/play",
params={"device_id": device_id},
json_body={
"context_uri": context_uri,
"uris": uris,
"offset": offset,
"position_ms": position_ms,
},
)
def pause_playback(self, *, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/pause", params={"device_id": device_id})
def skip_next(self, *, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/next", params={"device_id": device_id})
def skip_previous(self, *, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/previous", params={"device_id": device_id})
def seek(self, *, position_ms: int, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/seek", params={
"position_ms": position_ms,
"device_id": device_id,
})
def set_repeat(self, *, state: str, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/repeat", params={"state": state, "device_id": device_id})
def set_shuffle(self, *, state: bool, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/shuffle", params={"state": str(bool(state)).lower(), "device_id": device_id})
def set_volume(self, *, volume_percent: int, device_id: Optional[str] = None) -> Any:
return self.request("PUT", "/me/player/volume", params={
"volume_percent": volume_percent,
"device_id": device_id,
})
def get_queue(self) -> Any:
return self.request("GET", "/me/player/queue")
def add_to_queue(self, *, uri: str, device_id: Optional[str] = None) -> Any:
return self.request("POST", "/me/player/queue", params={"uri": uri, "device_id": device_id})
def search(
self,
*,
query: str,
search_types: list[str],
limit: int = 10,
offset: int = 0,
market: Optional[str] = None,
include_external: Optional[str] = None,
) -> Any:
return self.request("GET", "/search", params={
"q": query,
"type": ",".join(search_types),
"limit": limit,
"offset": offset,
"market": market,
"include_external": include_external,
})
def get_my_playlists(self, *, limit: int = 20, offset: int = 0) -> Any:
return self.request("GET", "/me/playlists", params={"limit": limit, "offset": offset})
def get_playlist(self, *, playlist_id: str, market: Optional[str] = None) -> Any:
return self.request("GET", f"/playlists/{playlist_id}", params={"market": market})
def create_playlist(
self,
*,
name: str,
public: bool = False,
collaborative: bool = False,
description: Optional[str] = None,
) -> Any:
return self.request("POST", "/me/playlists", json_body={
"name": name,
"public": public,
"collaborative": collaborative,
"description": description,
})
def add_playlist_items(
self,
*,
playlist_id: str,
uris: list[str],
position: Optional[int] = None,
) -> Any:
return self.request("POST", f"/playlists/{playlist_id}/items", json_body={
"uris": uris,
"position": position,
})
def remove_playlist_items(
self,
*,
playlist_id: str,
uris: list[str],
snapshot_id: Optional[str] = None,
) -> Any:
return self.request("DELETE", f"/playlists/{playlist_id}/items", json_body={
"items": [{"uri": uri} for uri in uris],
"snapshot_id": snapshot_id,
})
def update_playlist_details(
self,
*,
playlist_id: str,
name: Optional[str] = None,
public: Optional[bool] = None,
collaborative: Optional[bool] = None,
description: Optional[str] = None,
) -> Any:
return self.request("PUT", f"/playlists/{playlist_id}", json_body={
"name": name,
"public": public,
"collaborative": collaborative,
"description": description,
})
def get_album(self, *, album_id: str, market: Optional[str] = None) -> Any:
return self.request("GET", f"/albums/{album_id}", params={"market": market})
def get_album_tracks(self, *, album_id: str, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", f"/albums/{album_id}/tracks", params={
"limit": limit,
"offset": offset,
"market": market,
})
def get_saved_tracks(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", "/me/tracks", params={"limit": limit, "offset": offset, "market": market})
def save_library_items(self, *, uris: list[str]) -> Any:
return self.request("PUT", "/me/library", params={"uris": ",".join(uris)})
def library_contains(self, *, uris: list[str]) -> Any:
return self.request("GET", "/me/library/contains", params={"uris": ",".join(uris)})
def get_saved_albums(self, *, limit: int = 20, offset: int = 0, market: Optional[str] = None) -> Any:
return self.request("GET", "/me/albums", params={"limit": limit, "offset": offset, "market": market})
def remove_saved_tracks(self, *, track_ids: list[str]) -> Any:
uris = [f"spotify:track:{track_id}" for track_id in track_ids]
return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
def remove_saved_albums(self, *, album_ids: list[str]) -> Any:
uris = [f"spotify:album:{album_id}" for album_id in album_ids]
return self.request("DELETE", "/me/library", params={"uris": ",".join(uris)})
def get_recently_played(
self,
*,
limit: int = 20,
after: Optional[int] = None,
before: Optional[int] = None,
) -> Any:
return self.request("GET", "/me/player/recently-played", params={
"limit": limit,
"after": after,
"before": before,
})
def _extract_spotify_error_detail(response: httpx.Response, *, fallback: str) -> str:
detail = fallback
try:
payload = response.json()
if isinstance(payload, dict):
error_obj = payload.get("error")
if isinstance(error_obj, dict):
detail = str(error_obj.get("message") or detail)
elif isinstance(error_obj, str):
detail = error_obj
except Exception:
pass
return detail.strip()
def _friendly_spotify_error_message(
*,
status_code: int,
detail: str,
method: str,
path: str,
retry_after: Optional[str],
) -> str:
normalized_detail = detail.lower()
is_playback_path = path.startswith("/me/player")
if status_code == 401:
return "Spotify authentication failed or expired. Run `hermes auth spotify` again."
if status_code == 403:
if is_playback_path:
return (
"Spotify rejected this playback request. Playback control usually requires a Spotify Premium account "
"and an active Spotify Connect device."
)
if "scope" in normalized_detail or "permission" in normalized_detail:
return "Spotify rejected the request because the current auth scope is insufficient. Re-run `hermes auth spotify` to refresh permissions."
return "Spotify rejected the request. The account may not have permission for this action."
if status_code == 404:
if is_playback_path:
return "Spotify could not find an active playback device or player session for this request."
return "Spotify resource not found."
if status_code == 429:
message = "Spotify rate limit exceeded."
if retry_after:
message += f" Retry after {retry_after} seconds."
return message
if detail:
return detail
return f"Spotify API request failed with status {status_code}."
def _strip_none(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
if not payload:
return {}
return {key: value for key, value in payload.items() if value is not None}
def normalize_spotify_id(value: str, expected_type: Optional[str] = None) -> str:
cleaned = (value or "").strip()
if not cleaned:
raise SpotifyError("Spotify id/uri/url is required.")
if cleaned.startswith("spotify:"):
parts = cleaned.split(":")
if len(parts) >= 3:
item_type = parts[1]
if expected_type and item_type != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
return parts[2]
if "open.spotify.com" in cleaned:
parsed = urlparse(cleaned)
path_parts = [part for part in parsed.path.split("/") if part]
if len(path_parts) >= 2:
item_type, item_id = path_parts[0], path_parts[1]
if expected_type and item_type != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {item_type}.")
return item_id
return cleaned
def normalize_spotify_uri(value: str, expected_type: Optional[str] = None) -> str:
cleaned = (value or "").strip()
if not cleaned:
raise SpotifyError("Spotify URI/url/id is required.")
if cleaned.startswith("spotify:"):
if expected_type:
parts = cleaned.split(":")
if len(parts) >= 3 and parts[1] != expected_type:
raise SpotifyError(f"Expected a Spotify {expected_type}, got {parts[1]}.")
return cleaned
item_id = normalize_spotify_id(cleaned, expected_type)
if expected_type:
return f"spotify:{expected_type}:{item_id}"
return cleaned
def normalize_spotify_uris(values: Iterable[str], expected_type: Optional[str] = None) -> list[str]:
uris: list[str] = []
for value in values:
uri = normalize_spotify_uri(str(value), expected_type)
if uri not in uris:
uris.append(uri)
if not uris:
raise SpotifyError("At least one Spotify item is required.")
return uris
def compact_json(data: Any) -> str:
return json.dumps(data, ensure_ascii=False)