diff --git a/README.md b/README.md index df32d54c8..fffdca373 100644 --- a/README.md +++ b/README.md @@ -53,9 +53,9 @@ The package is published on pip install deezer-python -## Basic Use +## Basic Use (synchronous) -Easily query the Deezer API from you Python code. The data returned by the Deezer +Easily query the Deezer API from your Python code. The data returned by the Deezer API is mapped to python resources: ```pycon @@ -64,8 +64,46 @@ API is mapped to python resources: 'Monkey Business' ``` +## Async usage + +An asynchronous client is also available, built on top of `httpx.AsyncClient`. +It mirrors the public API of the synchronous `deezer.Client`, but all network +calls must be awaited and the client must be used in an async context. + +```python +import asyncio +from async_deezer import AsyncClient + + +async def main() -> None: + async with AsyncClient() as client: + album = await client.get_album(680407) + print(album.title) + + +asyncio.run(main()) +``` + +You can work with paginated endpoints using `AsyncPaginatedList` and `async for`: + +```python +from async_deezer import AsyncClient + + +async def main() -> None: + async with AsyncClient() as client: + tracks = client.get_user_tracks() # AsyncPaginatedList + async for track in tracks: + print(track.title) +``` + +Notes: + +- The async client lives in the separate `async_deezer` package inside this + project so it does not change the public API of `deezer-python`. + Ready for more? Look at our whole [documentation](http://deezer-python.readthedocs.io/) -on Read The Docs or have a play in pre-populated Jupyter notebook +on Read The Docs or have a play in a pre-populated Jupyter notebook [on Binder](https://mybinder.org/v2/gh/browniebroke/deezer-python/main?filepath=demo.ipynb). ## Contributors diff --git a/docs/usage.md b/docs/usage.md index 9e1290bac..d278ef5a2 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -15,7 +15,66 @@ with deezer.Client() as client: ... ``` -This is [the recommended way to use it by `httpx`](https://www.python-httpx.org/advanced/clients/#usage, which is the library we use under the hood. +This is [the recommended way to use it by `httpx`](https://www.python-httpx.org/advanced/clients/#usage), which is the library we use under the hood. + +## Async client + +If you are writing an asynchronous application, you can use the async client +provided by the `async_deezer` package. It closely mirrors the public API of +{class}`Client `, but: + +- it is built on top of {class}`httpx.AsyncClient`, and +- methods that perform network I/O are declared as `async` and must be awaited. + +### First steps with the async client + +Instantiate and use the async client inside an async function and context +manager: + +```python +import asyncio + +from async_deezer import AsyncClient + + +async def main() -> None: + async with AsyncClient() as client: + album = await client.get_album(680407) + print(album.title) + + +asyncio.run(main()) +``` + +The same recommendations from the synchronous client apply: using it as a +context manager ensures that network resources are cleaned up properly. + +### Async pagination + +Endpoints that return multiple items, such as `get_user_tracks()` or the +various `search*` methods, return an {class}`AsyncPaginatedList +` when using the async client. +You can iterate over it with `async for`: + +```python +from async_deezer import AsyncClient + + +async def main() -> None: + async with AsyncClient() as client: + tracks = client.get_user_tracks() + async for track in tracks: + print(track.title) +``` + +For random access into the list, use the explicit async helpers instead of +`__getitem__`: + +```python +track = await tracks.aget(0) +first_five = await tracks.aslice(0, 5) +total = await tracks.get_total() +``` From there, you can search for some terms: diff --git a/src/async_deezer/__init__.py b/src/async_deezer/__init__.py new file mode 100644 index 000000000..3bb4816eb --- /dev/null +++ b/src/async_deezer/__init__.py @@ -0,0 +1,3 @@ +from async_deezer.client import AsyncClient + +__all__ = ["AsyncClient"] diff --git a/src/async_deezer/client.py b/src/async_deezer/client.py new file mode 100644 index 000000000..691463ba3 --- /dev/null +++ b/src/async_deezer/client.py @@ -0,0 +1,474 @@ +from __future__ import annotations + +from typing import Any, ClassVar + +import httpx +from httpx._types import HeaderTypes + +from deezer.auth import DeezerQueryAuth +from deezer.exceptions import ( + DeezerErrorResponse, + DeezerHTTPError, + DeezerUnknownResource, +) + +from .pagination import AsyncPaginatedList +from .resources import ( + Album, + Artist, + Chart, + Editorial, + Episode, + Genre, + Playlist, + Podcast, + Radio, + Resource, + Track, + User, +) + + +class AsyncClient(httpx.AsyncClient): + """ + Asynchronous Deezer API client. + + This mirrors the public API of :class:`deezer.Client`, but uses an + :class:`httpx.AsyncClient` under the hood so that all network operations + are non-blocking. + """ + + objects_types: ClassVar[dict[str, type[Resource] | None]] = { + "album": Album, + "artist": Artist, + "chart": Chart, + "editorial": Editorial, + "episode": Episode, + "genre": Genre, + "playlist": Playlist, + "podcast": Podcast, + "radio": Radio, + "track": Track, + "user": User, + } + + def __init__( + self, + access_token: str | None = None, + headers: HeaderTypes | None = None, + ): + if access_token: + deezer_auth = DeezerQueryAuth(access_token=access_token) + else: + deezer_auth = None + super().__init__( + base_url="https://api.deezer.com", + auth=deezer_auth, + headers=headers, + ) + + def _process_json( + self, + item: dict[str, Any], + parent: Resource | None = None, + resource_type: type[Resource] | None = None, + resource_id: int | None = None, + paginate_list: bool = False, + ): + """ + Recursively convert dictionary to :class:`~deezer.Resource` object. + + This is intentionally kept synchronous, as it only manipulates in-memory + Python objects. + """ + if "data" in item: + parsed_data = [self._process_json(i, parent, paginate_list=False) for i in item["data"]] + if not paginate_list: + return parsed_data + item["data"] = parsed_data + return item + + result: dict[str, Any] = {} + for key, value in item.items(): + if isinstance(value, dict) and ("type" in value or "data" in value): + value = self._process_json(value, parent) + result[key] = value + if parent is not None: + result[parent.type] = parent + + if "id" not in result and resource_id is not None: + result["id"] = resource_id + + if "type" in result and result["type"] in self.objects_types: + object_class = self.objects_types[result["type"]] + elif "type" in result or (not resource_type and "id" in result): + # in case any new types are introduced by the API + object_class = Resource + elif resource_type: + object_class = resource_type + elif item.get("results") is True: + return True + else: + raise DeezerUnknownResource(f"Unable to find resource type for {result!r}") + assert object_class is not None # noqa: S101 + return object_class(self, result) + + async def request( # type: ignore[override] + self, + method: str, + path: str, + parent: Resource | None = None, + resource_type: type[Resource] | None = None, + resource_id: int | None = None, + paginate_list: bool = False, + **kwargs: Any, + ): + """ + Make an asynchronous request to the API and parse the response. + """ + response = await super().request( + method, + path, + **kwargs, + ) + try: + response.raise_for_status() + except httpx.HTTPStatusError as exc: + raise DeezerHTTPError.from_http_error(exc) from exc + json_data = response.json() + if not isinstance(json_data, dict): + return json_data + if json_data.get("error"): + raise DeezerErrorResponse(json_data) + return self._process_json( + json_data, + parent=parent, + resource_type=resource_type, + resource_id=resource_id, + paginate_list=paginate_list, + ) + + # High level methods – mirror deezer.Client but return awaitables. + + def _get_paginated_list(self, path: str, params: dict | None = None) -> AsyncPaginatedList: + """ + Build an :class:`AsyncPaginatedList` bound to this client. + + Constructing the object itself is synchronous and does not perform + any network I/O; actual page fetching happens when you iterate over + it with ``async for`` or use its async helpers. + """ + return AsyncPaginatedList(client=self, base_path=path, params=params) + + async def get_album(self, album_id: int) -> Album: + return await self.request("GET", f"album/{album_id}") + + async def get_artist(self, artist_id: int) -> Artist: + return await self.request("GET", f"artist/{artist_id}") + + async def get_chart(self, genre_id: int = 0) -> Chart: + return await self.request( + "GET", + f"chart/{genre_id}", + resource_type=Chart, + resource_id=genre_id, + ) + + async def get_tracks_chart(self, genre_id: int = 0) -> list[Track]: + return await self.request("GET", f"chart/{genre_id}/tracks") + + async def get_albums_chart(self, genre_id: int = 0) -> list[Album]: + return await self.request("GET", f"chart/{genre_id}/albums") + + async def get_artists_chart(self, genre_id: int = 0) -> list[Artist]: + return await self.request("GET", f"chart/{genre_id}/artists") + + async def get_playlists_chart(self, genre_id: int = 0) -> list[Playlist]: + return await self.request("GET", f"chart/{genre_id}/playlists") + + async def get_podcasts_chart(self, genre_id: int = 0) -> list[Podcast]: + return await self.request("GET", f"chart/{genre_id}/podcasts") + + async def get_editorial(self, editorial_id: int) -> Editorial: + return await self.request("GET", f"editorial/{editorial_id}") + + def list_editorials(self) -> AsyncPaginatedList[Editorial]: + return self._get_paginated_list("editorial") + + async def get_episode(self, episode_id: int) -> Episode: + return await self.request("GET", f"episode/{episode_id}") + + async def get_genre(self, genre_id: int) -> Genre: + return await self.request("GET", f"genre/{genre_id}") + + async def list_genres(self) -> list[Genre]: + return await self.request("GET", "genre") + + async def get_playlist(self, playlist_id: int) -> Playlist: + return await self.request("GET", f"playlist/{playlist_id}") + + async def get_podcast(self, podcast_id: int) -> Podcast: + return await self.request("GET", f"podcast/{podcast_id}") + + async def get_radio(self, radio_id: int) -> Radio: + return await self.request("GET", f"radio/{radio_id}") + + async def list_radios(self) -> list[Radio]: + return await self.request("GET", "radio") + + def get_radios_top(self) -> AsyncPaginatedList[Radio]: + return self._get_paginated_list("radio/top") + + async def get_track(self, track_id: int) -> Track: + return await self.request("GET", f"track/{track_id}") + + async def get_user(self, user_id: int | None = None) -> User: + user_id_str = str(user_id) if user_id else "me" + return await self.request("GET", f"user/{user_id_str}") + + def get_user_recommended_tracks(self, **kwargs) -> AsyncPaginatedList[Track]: + from .pagination import AsyncPaginatedList # Import here to avoid circular import + + return AsyncPaginatedList( + client=self, + base_path="user/me/recommendations/tracks", + params=kwargs or None, + ) + + def get_user_recommended_albums(self, **kwargs) -> AsyncPaginatedList[Album]: + from .pagination import AsyncPaginatedList # Import here to avoid circular import + + return AsyncPaginatedList( + client=self, + base_path="user/me/recommendations/albums", + params=kwargs or None, + ) + + def get_user_recommended_artists(self, **kwargs) -> AsyncPaginatedList[Artist]: + from .pagination import AsyncPaginatedList # Import here to avoid circular import + + return AsyncPaginatedList( + client=self, + base_path="user/me/recommendations/artists", + params=kwargs or None, + ) + + def get_user_recommended_playlists(self, **kwargs) -> AsyncPaginatedList[Playlist]: + from .pagination import AsyncPaginatedList # Import here to avoid circular import + + return AsyncPaginatedList( + client=self, + base_path="user/me/recommendations/playlists", + params=kwargs or None, + ) + + def get_user_flow(self, **kwargs) -> AsyncPaginatedList[Track]: + from .pagination import AsyncPaginatedList # Import here to avoid circular import + + return AsyncPaginatedList( + client=self, + base_path="user/me/flow", + params=kwargs or None, + ) + + def get_user_albums(self, user_id: int | None = None) -> AsyncPaginatedList[Album]: + user_id_str = str(user_id) if user_id else "me" + return self._get_paginated_list(f"user/{user_id_str}/albums") + + async def add_user_album(self, album_id: int) -> bool: + return await self.request( + "POST", + "user/me/albums", + params={"album_id": album_id}, + ) + + async def remove_user_album(self, album_id: int) -> bool: + return await self.request( + "DELETE", + "user/me/albums", + params={"album_id": album_id}, + ) + + def get_user_artists(self, user_id: int | None = None) -> AsyncPaginatedList[Artist]: + user_id_str = str(user_id) if user_id else "me" + return self._get_paginated_list(f"user/{user_id_str}/artists") + + async def add_user_artist(self, artist_id: int) -> bool: + return await self.request( + "POST", + "user/me/artists", + params={"artist_id": artist_id}, + ) + + async def remove_user_artist(self, artist_id: int) -> bool: + return await self.request( + "DELETE", + "user/me/artists", + params={"artist_id": artist_id}, + ) + + def get_user_followers(self, user_id: int | None = None) -> AsyncPaginatedList[User]: + user_id_str = str(user_id) if user_id else "me" + return self._get_paginated_list(f"user/{user_id_str}/followers") + + def get_user_followings(self, user_id: int | None = None) -> AsyncPaginatedList[User]: + user_id_str = str(user_id) if user_id else "me" + return self._get_paginated_list(f"user/{user_id_str}/followings") + + async def add_user_following(self, user_id: int) -> bool: + return await self.request( + "POST", + "user/me/followings", + params={"user_id": user_id}, + ) + + async def remove_user_following(self, user_id: int) -> bool: + return await self.request( + "DELETE", + "user/me/followings", + params={"user_id": user_id}, + ) + + def get_user_history(self) -> AsyncPaginatedList[Track]: + return self._get_paginated_list("user/me/history") + + def get_user_tracks(self, user_id: int | None = None) -> AsyncPaginatedList[Track]: + user_id_str = str(user_id) if user_id else "me" + return self._get_paginated_list(f"user/{user_id_str}/tracks") + + async def add_user_track(self, track_id: int) -> bool: + return await self.request( + "POST", + "user/me/tracks", + params={"track_id": track_id}, + ) + + async def remove_user_track(self, track_id: int) -> bool: + return await self.request( + "DELETE", + "user/me/tracks", + params={"track_id": track_id}, + ) + + async def remove_user_playlist(self, playlist_id: int) -> bool: + return await self.request( + "DELETE", + "user/me/playlists", + params={"playlist_id": playlist_id}, + ) + + async def add_user_playlist(self, playlist_id: int) -> bool: + return await self.request( + "POST", + "user/me/playlists", + params={"playlist_id": playlist_id}, + ) + + async def create_playlist(self, playlist_name: str) -> int: + result = await self.request( + "POST", + "user/me/playlists", + params={"title": playlist_name}, + ) + return result.id + + async def delete_playlist(self, playlist_id: int) -> bool: + return await self.request("DELETE", f"playlist/{playlist_id}") + + def _search( + self, + path: str, + query: str = "", + strict: bool | None = None, + ordering: str | None = None, + **advanced_params: str | int | None, + ): + optional_params: dict[str, str] = {} + if strict is True: + optional_params["strict"] = "on" + if ordering: + optional_params["ordering"] = ordering + query_parts: list[str] = [] + if query: + query_parts.append(query) + query_parts.extend( + f'{param_name}:"{param_value}"' + for param_name, param_value in advanced_params.items() + if param_value is not None + ) + + return self._get_paginated_list( + path=f"search/{path}" if path else "search", + params={ + "q": " ".join(query_parts), + **optional_params, + }, + ) + + def search( + self, + query: str = "", + strict: bool | None = None, + ordering: str | None = None, + artist: str | None = None, + album: str | None = None, + track: str | None = None, + label: str | None = None, + dur_min: int | None = None, + dur_max: int | None = None, + bpm_min: int | None = None, + bpm_max: int | None = None, + ) -> AsyncPaginatedList[Track]: + return self._search( + "", + query=query, + strict=strict, + ordering=ordering, + artist=artist, + album=album, + track=track, + label=label, + dur_min=dur_min, + dur_max=dur_max, + bpm_min=bpm_min, + bpm_max=bpm_max, + ) + + def search_albums( + self, + query: str = "", + strict: bool | None = None, + ordering: str | None = None, + ) -> AsyncPaginatedList[Album]: + return self._search( + path="album", + query=query, + strict=strict, + ordering=ordering, + ) + + def search_artists( + self, + query: str = "", + strict: bool | None = None, + ordering: str | None = None, + ) -> AsyncPaginatedList[Artist]: + return self._search( + path="artist", + query=query, + strict=strict, + ordering=ordering, + ) + + def search_playlists( + self, + query: str = "", + strict: bool | None = None, + ordering: str | None = None, + ) -> AsyncPaginatedList[Playlist]: + return self._search( + path="playlist", + query=query, + strict=strict, + ordering=ordering, + ) diff --git a/src/async_deezer/pagination.py b/src/async_deezer/pagination.py new file mode 100644 index 000000000..4dd9a0773 --- /dev/null +++ b/src/async_deezer/pagination.py @@ -0,0 +1,128 @@ +from __future__ import annotations + +from collections.abc import AsyncGenerator +from typing import TYPE_CHECKING, Generic, TypeVar +from urllib.parse import parse_qs, urlparse + +if TYPE_CHECKING: # pragma: no cover - import used for typing only + from async_deezer.client import AsyncClient + from async_deezer.resources import Resource + +ResourceType = TypeVar("ResourceType") +REPR_OUTPUT_SIZE = 5 + + +class AsyncPaginatedList(Generic[ResourceType]): + """ + Asynchronous equivalent of :class:`deezer.pagination.PaginatedList`. + + It exposes an async iterator interface and explicit async helpers instead + of relying on synchronous ``__iter__`` / ``__getitem__`` that may block. + """ + + def __init__( + self, + client: AsyncClient, + base_path: str, + parent: Resource | None = None, + params: dict | None = None, + ): + self._elements: list[ResourceType] = [] + self._client = client + self._base_path = base_path + self._base_params = params or {} + self._next_path: str | None = base_path + self._next_params = params or {} + self._parent = parent + self._total: int | None = None + + def __repr__(self) -> str: + preview = self._elements[: REPR_OUTPUT_SIZE + 1] + data: list[ResourceType | str] = list(preview) + if len(data) > REPR_OUTPUT_SIZE: + data[-1] = "..." + return f"<{self.__class__.__name__} {data!r}>" + + def __aiter__(self) -> AsyncGenerator[ResourceType, None]: + async def _iter() -> AsyncGenerator[ResourceType, None]: + for element in self._elements: + yield element + while self._could_grow(): + for element in await self._grow(): + yield element + + return _iter() + + def _could_grow(self) -> bool: + return self._next_path is not None + + async def _grow(self) -> list[ResourceType]: + new_elements = await self._fetch_next_page() + self._elements.extend(new_elements) + return new_elements + + async def _fetch_next_page(self) -> list[ResourceType]: + assert self._next_path is not None # noqa: S101 + response_payload = await self._client.request( + "GET", + self._next_path, + parent=self._parent, + paginate_list=True, + params=self._next_params, + ) + self._next_path = None + self._total = response_payload.get("total") + next_url = response_payload.get("next", None) + if next_url: + url_bits = urlparse(next_url) + self._next_path = url_bits.path.lstrip("/") + self._next_params = parse_qs(url_bits.query) + return response_payload["data"] + + async def _fetch_to_index(self, index: int) -> None: + while len(self._elements) <= index and self._could_grow(): + await self._grow() + + async def aget(self, index: int) -> ResourceType: + """ + Asynchronously fetch the item at the given index. + + Unlike ``__getitem__`` on the synchronous pagination object, this does + not block the event loop. + """ + await self._fetch_to_index(index) + return self._elements[index] + + async def aslice(self, start: int, stop: int) -> list[ResourceType]: + """ + Asynchronously fetch a slice of items. + + This ensures that enough elements have been loaded from the API to + cover the requested range. + """ + if stop is not None: + await self._fetch_to_index(stop - 1) + else: + while self._could_grow(): + await self._grow() + return self._elements[start:stop] + + async def get_total(self) -> int: + """ + Asynchronously get the total number of items across all pages. + + Mirrors the ``total`` property of :class:`deezer.pagination.PaginatedList`. + """ + if self._total is None: + params = self._base_params.copy() + params["limit"] = 1 + response_payload = await self._client.request( + "GET", + self._base_path, + parent=self._parent, + paginate_list=True, + params=params, + ) + self._total = response_payload["total"] + assert self._total is not None # noqa: S101 + return self._total diff --git a/src/async_deezer/resources/__init__.py b/src/async_deezer/resources/__init__.py new file mode 100644 index 000000000..f04170081 --- /dev/null +++ b/src/async_deezer/resources/__init__.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +from .album import Album +from .artist import Artist +from .chart import Chart +from .editorial import Editorial +from .episode import Episode +from .genre import Genre +from .playlist import Playlist +from .podcast import Podcast +from .radio import Radio +from .resource import Resource +from .track import Track +from .user import User + +__all__ = [ + "Album", + "Artist", + "Chart", + "Editorial", + "Episode", + "Genre", + "Playlist", + "Podcast", + "Radio", + "Resource", + "Track", + "User", +] diff --git a/src/async_deezer/resources/album.py b/src/async_deezer/resources/album.py new file mode 100644 index 000000000..724658960 --- /dev/null +++ b/src/async_deezer/resources/album.py @@ -0,0 +1,71 @@ +from __future__ import annotations + +import datetime as dt +from typing import TYPE_CHECKING + +from deezer.dates import parse_date + +from .artist import Artist +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .genre import Genre + from .track import Track + + +class Album(Resource): + """ + Async counterpart to :class:`deezer.resources.Album`. + """ + + id: int + title: str + upc: str + link: str + share: str + cover: str + cover_small: str + cover_medium: str + cover_big: str + cover_xl: str + md5_image: str + + genre_id: int + genres: list[Genre] + label: str + nb_tracks: int + duration: int + fans: int + release_date: dt.date + record_type: str + available: bool + + alternative: Album + tracklist: str + explicit_lyrics: bool + + explicit_content_lyrics: int + explicit_content_cover: int + contributors: list[Artist] + + artist: Artist + tracks: list[Track] + + _parse_release_date = staticmethod(parse_date) + + def _parse_contributors(self, raw_value): + return [Artist(client=self.client, json=val) for val in raw_value] + + async def get_artist(self) -> Artist: + """ + Get the artist of the Album. + """ + return await self.client.get_artist(self.artist.id) + + def get_tracks(self, **kwargs) -> AsyncPaginatedList[Track]: + """ + Get a paginated list of album's tracks. + """ + return self.get_paginated_list("tracks", params=kwargs or None) diff --git a/src/async_deezer/resources/artist.py b/src/async_deezer/resources/artist.py new file mode 100644 index 000000000..0f9fe3031 --- /dev/null +++ b/src/async_deezer/resources/artist.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .album import Album + from .playlist import Playlist + from .track import Track + + +class Artist(Resource): + """ + Async counterpart to :class:`deezer.resources.Artist`. + """ + + id: int + name: str + link: str + share: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + nb_album: int + nb_fan: int + radio: bool + tracklist: str + + def get_top(self, **kwargs) -> AsyncPaginatedList[Track]: + """Get the top tracks of an artist.""" + return self.get_paginated_list("top", params=kwargs or None) + + def get_related(self, **kwargs) -> AsyncPaginatedList[Artist]: + """Get a list of related artists.""" + return self.get_paginated_list("related", params=kwargs or None) + + async def get_radio(self, **kwargs) -> list[Track]: + """ + Get a list of tracks for the artist radio. + """ + # radio returns tracks from different artists -> no fwd parent + return await self.get_relation("radio", fwd_parent=False, params=kwargs or None) + + def get_albums(self, **kwargs) -> AsyncPaginatedList[Album]: + """Get a paginated list of artist's albums.""" + return self.get_paginated_list("albums", params=kwargs or None) + + def get_playlists(self, **kwargs) -> AsyncPaginatedList[Playlist]: + """Get a paginated list of artist's playlists.""" + return self.get_paginated_list("playlists", params=kwargs or None) diff --git a/src/async_deezer/resources/chart.py b/src/async_deezer/resources/chart.py new file mode 100644 index 000000000..a68765051 --- /dev/null +++ b/src/async_deezer/resources/chart.py @@ -0,0 +1,44 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .album import Album + from .artist import Artist + from .playlist import Playlist + from .podcast import Podcast + from .track import Track + + +class Chart(Resource): + """ + Async counterpart to :class:`deezer.resources.Chart`. + """ + + type = "chart" + + id: int + tracks: list[Track] + albums: list[Album] + artists: list[Artist] + playlists: list[Playlist] + podcasts: list[Podcast] + + def get_tracks(self, **kwargs) -> AsyncPaginatedList[Track]: + return self.get_paginated_list("tracks", params=kwargs or None) + + def get_albums(self, **kwargs) -> AsyncPaginatedList[Album]: + return self.get_paginated_list("albums", params=kwargs or None) + + def get_artists(self, **kwargs) -> AsyncPaginatedList[Artist]: + return self.get_paginated_list("artists", params=kwargs or None) + + def get_playlists(self, **kwargs) -> AsyncPaginatedList[Playlist]: + return self.get_paginated_list("playlists", params=kwargs or None) + + def get_podcasts(self, **kwargs) -> AsyncPaginatedList[Podcast]: + return self.get_paginated_list("podcasts", params=kwargs or None) diff --git a/src/async_deezer/resources/editorial.py b/src/async_deezer/resources/editorial.py new file mode 100644 index 000000000..4494d2f7d --- /dev/null +++ b/src/async_deezer/resources/editorial.py @@ -0,0 +1,37 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .chart import Chart +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .album import Album + + +class Editorial(Resource): + """ + Async counterpart to :class:`deezer.resources.Editorial`. + """ + + id: int + name: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + + async def get_selection(self) -> list[Album]: + """Get a list of albums selected every week by the Deezer Team.""" + return await self.get_relation("selection") + + async def get_chart(self) -> Chart: + """Get top charts for tracks, albums, artists and playlists.""" + return await self.get_relation("charts", resource_type=Chart) + + def get_releases(self, **kwargs) -> AsyncPaginatedList[Album]: + """Get the new releases per genre for the current country.""" + return self.get_paginated_list("releases", params=kwargs or None) diff --git a/src/async_deezer/resources/episode.py b/src/async_deezer/resources/episode.py new file mode 100644 index 000000000..58ffed851 --- /dev/null +++ b/src/async_deezer/resources/episode.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import datetime as dt +from typing import TYPE_CHECKING, Any + +from deezer.dates import parse_datetime + +from .resource import Resource + +if TYPE_CHECKING: + from .podcast import Podcast + + +class Episode(Resource): + """ + Async counterpart to :class:`deezer.resources.Episode`. + """ + + id: int + title: str + description: str + available: bool + release_date: dt.datetime + duration: int + link: str + share: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + podcast: Podcast + + _parse_release_date = staticmethod(parse_datetime) + + def _infer_missing_field(self, item) -> Any: # type: ignore[override] + if item == "link": + return f"https://www.deezer.com/episode/{self.id}" + elif item == "share": + return f"{self.link}?utm_source=deezer&utm_content=episode-{self.id}&utm_medium=web" + return super()._infer_missing_field(item) # delegated to sync Resource via async base + + async def add_bookmark(self, offset: int) -> bool: + """ + Sets a bookmark on the episode. + """ + return await self.client.request( + "POST", + f"episode/{self.id}/bookmark", + params={"offset": offset}, + ) + + async def remove_bookmark(self) -> bool: + """ + Removes the bookmark on the episode. + """ + return await self.client.request("DELETE", f"episode/{self.id}/bookmark") diff --git a/src/async_deezer/resources/genre.py b/src/async_deezer/resources/genre.py new file mode 100644 index 000000000..cc30d1922 --- /dev/null +++ b/src/async_deezer/resources/genre.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .artist import Artist + from .podcast import Podcast + from .radio import Radio + + +class Genre(Resource): + """ + Async counterpart to :class:`deezer.resources.Genre`. + """ + + id: int + name: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + + async def get_artists(self, **kwargs) -> list[Artist]: + """Get all artists for a genre.""" + return await self.get_relation("artists", params=kwargs or None) + + def get_podcasts(self, **kwargs) -> AsyncPaginatedList[Podcast]: + """Get all podcasts for a genre.""" + return self.get_paginated_list("podcasts", params=kwargs or None) + + async def get_radios(self, **kwargs) -> list[Radio]: + """Get all radios for a genre.""" + return await self.get_relation("radios", params=kwargs or None) diff --git a/src/async_deezer/resources/playlist.py b/src/async_deezer/resources/playlist.py new file mode 100644 index 000000000..f77b3722c --- /dev/null +++ b/src/async_deezer/resources/playlist.py @@ -0,0 +1,80 @@ +from __future__ import annotations + +from collections.abc import Iterable +from typing import TYPE_CHECKING + +from deezer.utils import gen_ids + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .track import Track + from .user import User + + +class Playlist(Resource): + """ + Async counterpart to :class:`deezer.resources.Playlist`. + """ + + id: int + title: str + description: str + duration: int + public: bool + is_loved_track: bool + collaborative: bool + nb_tracks: int + unseen_track_count: int + fans: int + link: str + share: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + checksum: str + creator: User + tracks: list[Track] + + def get_tracks(self, **kwargs) -> AsyncPaginatedList[Track]: + """Get tracks from a playlist.""" + return self.get_paginated_list("tracks", params=kwargs or None) + + def get_fans(self, **kwargs) -> AsyncPaginatedList[User]: + """Get fans from a playlist.""" + return self.get_paginated_list("fans", params=kwargs or None) + + async def mark_seen(self) -> bool: + """Mark the playlist as seen.""" + return await self.client.request("POST", f"playlist/{self.id}/seen") + + async def add_tracks(self, tracks: Iterable[int | Track]) -> bool: + """Add tracks to a playlist.""" + track_ids_str = ",".join(str(tid) for tid in gen_ids(tracks)) + return await self.client.request( + "POST", + f"playlist/{self.id}/tracks", + params={"songs": track_ids_str}, + ) + + async def delete_tracks(self, tracks: Iterable[int | Track]) -> bool: + """Delete tracks from a playlist.""" + track_ids_str = ",".join(map(str, gen_ids(tracks))) + return await self.client.request( + "DELETE", + f"playlist/{self.id}/tracks", + params={"songs": track_ids_str}, + ) + + async def reorder_tracks(self, order: Iterable[int | Track]) -> bool: + """Reorder the tracks of a playlist.""" + order_track_ids_str = ",".join(map(str, gen_ids(order))) + return await self.client.request( + "POST", + f"playlist/{self.id}/tracks", + params={"order": order_track_ids_str}, + ) diff --git a/src/async_deezer/resources/podcast.py b/src/async_deezer/resources/podcast.py new file mode 100644 index 000000000..56051d9a2 --- /dev/null +++ b/src/async_deezer/resources/podcast.py @@ -0,0 +1,33 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .episode import Episode + + +class Podcast(Resource): + """ + Async counterpart to :class:`deezer.resources.Podcast`. + """ + + id: int + title: str + description: str + available: bool + fans: int + link: str + share: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + + def get_episodes(self, **kwargs) -> AsyncPaginatedList[Episode]: + """Get episodes from a podcast.""" + return self.get_paginated_list("episodes", params=kwargs or None) diff --git a/src/async_deezer/resources/radio.py b/src/async_deezer/resources/radio.py new file mode 100644 index 000000000..914a08709 --- /dev/null +++ b/src/async_deezer/resources/radio.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .resource import Resource + +if TYPE_CHECKING: + from .track import Track + + +class Radio(Resource): + """ + Async counterpart to :class:`deezer.resources.Radio`. + """ + + id: int + title: str + description: str + share: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + tracklist: str + md5_image: str + + async def get_tracks(self) -> list[Track]: + """ + Get first 40 tracks in the radio. This endpoint is not paginated. + """ + return await self.get_relation("tracks") diff --git a/src/async_deezer/resources/resource.py b/src/async_deezer/resources/resource.py new file mode 100644 index 000000000..082f1376f --- /dev/null +++ b/src/async_deezer/resources/resource.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import datetime as dt +from typing import Any + +from async_deezer.pagination import AsyncPaginatedList +from deezer.resources import Resource as SyncResource + + +class Resource: + """ + Async base class for any Deezer resource. + + This mirrors :class:`deezer.resources.Resource`, but relation helpers and + ``get`` are implemented as async methods, and pagination returns + :class:`async_deezer.pagination.AsyncPaginatedList`. + """ + + id: int + type: str + + _fields: tuple[str, ...] + + def __init__(self, client, json): + self.client = client + for field_name in json.keys(): + parse_func = getattr(self, f"_parse_{field_name}", None) + if callable(parse_func): + json[field_name] = parse_func(json[field_name]) + self._fields = tuple(json.keys()) + for key in json: + setattr(self, key, json[key]) + + def __repr__(self): + name = getattr(self, "name", None) + title = getattr(self, "title", None) + id_ = getattr(self, "id", None) + return f"<{self.__class__.__name__}: {name or title or id_}>" + + def as_dict(self) -> dict[str, Any]: + """Convert resource to dictionary.""" + result = {} + for key in self._fields: + value = getattr(self, key) + if isinstance(value, list): + value = [i.as_dict() if isinstance(i, Resource) else i for i in value] + elif isinstance(value, Resource): + value = value.as_dict() + elif isinstance(value, dt.datetime): + value = f"{value:%Y-%m-%d %H:%M:%S}" + elif isinstance(value, dt.date): + value = value.isoformat() + result[key] = value + return result + + async def get_relation( + self, + relation: str, + resource_type: type[Resource] | None = None, # type: ignore[valid-type] + params: dict | None = None, + fwd_parent: bool = True, + ): + """ + Generic async method to load the relation from any resource. + """ + return await self.client.request( + "GET", + f"{self.type}/{self.id}/{relation}", + parent=self if fwd_parent else None, + resource_type=resource_type, + params=params, + ) + + async def post_relation(self, relation: str, params: dict): + """ + Generic async method to make a POST request to a relation. + """ + return await self.client.request( + "POST", + f"{self.type}/{self.id}/{relation}", + params=params, + ) + + async def delete_relation(self, relation: str, params: dict | None = None): + """ + Generic async method to make a DELETE request to a relation. + """ + return await self.client.request( + "DELETE", + f"{self.type}/{self.id}/{relation}", + params=params, + ) + + def get_paginated_list( + self, + relation: str, + params: dict | None = None, + ) -> AsyncPaginatedList: + """ + Build the async pagination object based on the relation. + + Note that constructing the pagination object is synchronous + network I/O only occurs when you iterate it or use its async helpers. + """ + return AsyncPaginatedList( + client=self.client, + base_path=f"{self.type}/{self.id}/{relation}", + parent=self, + params=params, + ) + + def _infer_missing_field(self, item: str) -> Any: + """ + Hook to infer missing field values, delegating to the sync base class + to keep behavior consistent (e.g. inferred ``link`` / ``share``). + """ + return SyncResource._infer_missing_field(self, item) + + def __getattr__(self, item: str) -> Any: + """ + Fallback attribute access mirroring the synchronous Resource behavior. + """ + return SyncResource.__getattr__(self, item) + + async def get(self): + """Get the resource from the API.""" + return await self.client.request("GET", f"{self.type}/{self.id}") diff --git a/src/async_deezer/resources/track.py b/src/async_deezer/resources/track.py new file mode 100644 index 000000000..572d4f2fa --- /dev/null +++ b/src/async_deezer/resources/track.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import datetime as dt +from typing import TYPE_CHECKING + +from deezer.dates import parse_date + +from .artist import Artist +from .resource import Resource + +if TYPE_CHECKING: + from .album import Album + + +class Track(Resource): + """ + Async counterpart to :class:`deezer.resources.Track`. + """ + + id: int + readable: bool + title: str + title_short: str + title_version: str + unseen: bool + isrc: str + link: str + share: str + duration: int + track_position: int + disk_number: int + rank: int + release_date: dt.date + explicit_lyrics: bool + explicit_content_lyrics: int + explicit_content_cover: int + preview: str + bpm: float + gain: float + available_countries: list[str] + alternative: Track + contributors: list[Artist] + md5_image: str + artist: Artist + album: Album + + _parse_release_date = staticmethod(parse_date) + + def _parse_contributors(self, raw_value): + return [Artist(client=self.client, json=val) for val in raw_value] + + async def get_artist(self) -> Artist: + """Get the artist of the track.""" + return await self.client.get_artist(self.artist.id) + + async def get_album(self) -> Album: + """Get the album of the track.""" + return await self.client.get_album(self.album.id) diff --git a/src/async_deezer/resources/user.py b/src/async_deezer/resources/user.py new file mode 100644 index 000000000..cb7fff138 --- /dev/null +++ b/src/async_deezer/resources/user.py @@ -0,0 +1,117 @@ +from __future__ import annotations + +import datetime as dt +from typing import TYPE_CHECKING + +from deezer.dates import parse_date +from deezer.utils import get_id + +from .resource import Resource + +if TYPE_CHECKING: + from async_deezer.pagination import AsyncPaginatedList + + from .album import Album + from .artist import Artist + from .playlist import Playlist + from .track import Track + + +class User(Resource): + """ + Async counterpart to :class:`deezer.resources.User`. + """ + + id: int + name: str + lastname: str | None + firstname: str | None + email: str | None + status: int | None + birthday: dt.date | None + inscription_date: dt.date + gender: str | None + link: str + picture: str + picture_small: str + picture_medium: str + picture_big: str + picture_xl: str + country: str + lang: str | None + is_kid: bool | None + explicit_content_level: str | None + explicit_content_levels_available: list[str] | None + tracklist: str + + _parse_birthday = staticmethod(parse_date) + _parse_inscription_date = staticmethod(parse_date) + + def get_albums(self, **params) -> AsyncPaginatedList[Album]: + """Get user's favorite albums.""" + return self.get_paginated_list("albums", params=params or None) + + async def add_album(self, album: Album | int): + """Add an album to user's favorite albums.""" + return await self.post_relation("albums", params={"album_id": get_id(album)}) + + async def remove_album(self, album: Album | int): + """Remove an album from user's favorite albums.""" + return await self.delete_relation("albums", params={"album_id": get_id(album)}) + + def get_tracks(self, **kwargs) -> AsyncPaginatedList[Track]: + """Get user's favorite tracks.""" + return self.get_paginated_list("tracks", params=kwargs or None) + + async def add_track(self, track: Track | int): + """Add a track to user's favorite tracks.""" + return await self.post_relation("tracks", params={"track_id": get_id(track)}) + + async def remove_track(self, track: Track | int): + """Remove a track from user's favorite tracks.""" + return await self.delete_relation("tracks", params={"track_id": get_id(track)}) + + def get_artists(self, **params) -> AsyncPaginatedList[Artist]: + """Get user's favorite artists.""" + return self.get_paginated_list("artists", params=params or None) + + async def add_artist(self, artist: Artist | int): + """Add an artist to user's favorite artists.""" + return await self.post_relation("artists", params={"artist_id": get_id(artist)}) + + async def remove_artist(self, artist: Artist | int): + """Remove an artist from user's favorite artists.""" + return await self.delete_relation("artists", params={"artist_id": get_id(artist)}) + + def get_followers(self, **params) -> AsyncPaginatedList[User]: + """Get user's followers.""" + return self.get_paginated_list("followers", params=params or None) + + def get_followings(self, **params) -> AsyncPaginatedList[User]: + """Get user's followings.""" + return self.get_paginated_list("followings", params=params or None) + + async def follow(self, user: User | int): + """Follow a user.""" + return await self.post_relation("followings", params={"user_id": get_id(user)}) + + async def unfollow(self, user: User | int): + """Unfollow a user.""" + return await self.delete_relation("followings", params={"user_id": get_id(user)}) + + def get_playlists(self, **params) -> AsyncPaginatedList[Playlist]: + """Get user's public playlists.""" + return self.get_paginated_list("playlists", params=params or None) + + async def add_playlist(self, playlist: Playlist | int): + """Add a playlist to user's public playlists.""" + return await self.post_relation("playlists", params={"playlist_id": get_id(playlist)}) + + async def remove_playlist(self, playlist: Playlist | int): + """Remove a playlist from user's public playlists.""" + return await self.delete_relation("playlists", params={"playlist_id": get_id(playlist)}) + + async def create_playlist(self, title: str) -> int: + """Create a playlist and return its ID.""" + result = await self.post_relation("playlists", params={"title": title}) + return result.id