supporting-blog-content/spotify-to-elasticsearch/python/services.py (151 lines of code) (raw):

from pathlib import Path import json from typing import Dict, List from elasticsearch import Elasticsearch, helpers, exceptions from rich.console import Console import spotipy from spotipy.oauth2 import SpotifyOAuth from models import SpotifyTrack console = Console() class MetadataCache: def __init__(self, cache_file: Path = Path("metadata_cache.json")): self.cache_file = cache_file self.cache = self._load_cache() def _load_cache(self) -> Dict: if self.cache_file.exists(): return json.loads(self.cache_file.read_text()) return {} def save_cache(self): console.print(f"[green] Saving cache to disk {len(self.cache)}") self.cache_file.write_text(json.dumps(self.cache)) class SpotifyService: def __init__(self, client_id: str, client_secret: str, redirect_uri: str): self.client = spotipy.Spotify( auth_manager=SpotifyOAuth( client_id=client_id, client_secret=client_secret, redirect_uri=redirect_uri, ), requests_timeout=30, ) self.metadata_cache = MetadataCache() def get_tracks_metadata(self, track_ids: str) -> Dict: metadatas = {} to_fetch = [] for track_id in track_ids: if track_id["spotify_track_uri"] is not None and track_id[ "spotify_track_uri" ].startswith("spotify:track:"): track_id = track_id["spotify_track_uri"].replace("spotify:track:", "") if self.metadata_cache.cache.get(track_id, None) is not None: metadatas[track_id] = self.metadata_cache.cache[track_id] else: to_fetch.append(track_id) if len(to_fetch) > 0: spotify_answer = self.client.tracks(to_fetch) if spotify_answer["tracks"] is not None: # Spotify can be a bit annoying and send back an Array that has `None` in it. spotify_answer["tracks"] = [ t for t in spotify_answer["tracks"] if t is not None ] if len(to_fetch) != len(spotify_answer["tracks"]): for missing_id in set(to_fetch) - { t["id"] for t in spotify_answer["tracks"] }: console.print( f"[red] Could not fetch metadata for track id: {missing_id}" ) if len(spotify_answer["tracks"]) > 0: for track in spotify_answer["tracks"]: metadatas[track["id"]] = track self.metadata_cache.cache[track["id"]] = track else: console.print( f"[red] Could not fetch metadata from Spotify. {to_fetch}" ) return metadatas class ElasticsearchService: def __init__(self, es_url: str, api_key: str, index: str = "spotify-history"): self.client = Elasticsearch(hosts=es_url, api_key=api_key, request_timeout=30) self.index = index def check_pipeline(self): pipeline = {"processors": [{"geoip": {"field": "ip", "ignore_failure": True}}]} try: self.client.ingest.put_pipeline(id="spotify", body=pipeline) except exceptions.RequestError as e: console.print( f"[red] Could not ingest the pipeline. Check permissions. {e}" ) def check_index(self): if self.client.indices.exists(index=self.index).body is False: self.client.indices.create( index=self.index, settings={"final_pipeline": "spotify"}, mappings={ "dynamic": "true", "dynamic_date_formats": [ "strict_date_optional_time", "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z", ], "dynamic_templates": [ { "stringsaskeywords": { "match": "*", "match_mapping_type": "string", "mapping": {"type": "keyword"}, } } ], "date_detection": True, "numeric_detection": False, "properties": { "@timestamp": { "type": "date", "format": "strict_date_optional_time", }, "album": {"type": "keyword"}, "artist": {"type": "keyword"}, "dayOfWeek": {"type": "keyword"}, "duration": {"type": "long"}, "explicit": {"type": "boolean"}, "geoip": { "properties": { "city_name": {"type": "keyword"}, "continent_name": {"type": "keyword"}, "country_iso_code": {"type": "keyword"}, "country_name": {"type": "keyword"}, "location": {"type": "geo_point"}, "region_iso_code": {"type": "keyword"}, "region_name": {"type": "keyword"}, } }, "hourOfDay": {"type": "long"}, "ip": {"type": "ip"}, "not_found": {"type": "boolean"}, "offline": {"type": "boolean"}, "platform": {"type": "keyword"}, "played_at": { "type": "date", "format": "strict_date_optional_time", }, "reason_end": {"type": "keyword"}, "reason_start": {"type": "keyword"}, "skipped": {"type": "boolean"}, "title": {"type": "keyword"}, "url": {"type": "keyword"}, "user": {"type": "keyword"}, }, }, ) def bulk_index(self, documents: List[SpotifyTrack]): actions = [ { "_index": self.index, "_id": doc.id, "_source": {**vars(doc), "@timestamp": doc.played_at}, } for doc in documents ] return helpers.bulk( self.client, actions, )