supporting-blog-content/spotify-to-elasticsearch/python/main.py (157 lines of code) (raw):

import logging from pathlib import Path import typer from datetime import datetime import json from rich.logging import RichHandler from rich.console import Console from rich.progress import ( Progress, SpinnerColumn, BarColumn, TaskProgressColumn, TimeElapsedColumn, ) from services import SpotifyService, ElasticsearchService from models import SpotifyTrack logger = None def try_parsing_date(text): """Attempt to parse a date""" for fmt in ("%Y-%m-%dT%H:%M:%SZ", "%Y-%m-%dT%H:%M:%S.%fZ"): try: return datetime.strptime(text, fmt) except ValueError: logger.error(f"Error parsing date: {text}") pass def process_history_file( file_path: str, spotify_svc: SpotifyService, es_svc: ElasticsearchService, user_name: str, ): """Main processing function""" # Set up rich logging logging.basicConfig( level=logging.INFO, format="%(message)s", handlers=[RichHandler(rich_tracebacks=True)], ) logger = logging.getLogger(__name__) console = Console() with open(file_path) as f: history = json.load(f) console.print(f"[green]Processing {file_path}") documents = [] with Progress( SpinnerColumn(), "[progress.description]{task.description}", BarColumn(), TaskProgressColumn(), TimeElapsedColumn(), ) as progress: task = progress.add_task("[cyan]Processing tracks...", total=len(history)) total_entries = len(history) batch_size = 50 for i in range(0, total_entries, batch_size): entries_batch = history[i : i + batch_size] metadata_batch = spotify_svc.get_tracks_metadata(entries_batch) for entry in entries_batch: try: # let's make sure to only look at songs # we do not support videos, podcats or # anything else yet. if entry["spotify_track_uri"] is not None and entry[ "spotify_track_uri" ].startswith("spotify:track:"): track_id = entry["spotify_track_uri"].replace( "spotify:track:", "" ) metadata = metadata_batch.get(track_id, None) played_at = try_parsing_date(entry["ts"]) if metadata is not None: documents.append( SpotifyTrack( id=str( int( ( played_at - datetime(1970, 1, 1) ).total_seconds() ) ) + "_" + entry["master_metadata_album_artist_name"], artist=[ artist["name"] for artist in metadata["artists"] ], album=metadata["album"]["name"], country=entry["conn_country"], duration=metadata["duration_ms"], explicit=metadata["explicit"], listened_to_pct=( entry["ms_played"] / metadata["duration_ms"] if metadata["duration_ms"] > 0 else None ), listened_to_ms=entry["ms_played"], ip=entry["ip_addr"], reason_start=entry["reason_start"], reason_end=entry["reason_end"], shuffle=entry["shuffle"], skipped=entry["skipped"], offline=entry["offline"], title=metadata["name"], platform=entry["platform"], played_at=played_at, spotify_metadata=metadata, hourOfDay=played_at.hour, dayOfWeek=played_at.strftime("%A"), url=metadata["external_urls"]["spotify"], user=user_name, ) ) else: console.print(f"[red]Metadata not found for track: {entry}") if len(documents) >= 500: console.print( f"[green]Indexing batch of tracks... {len(documents)}" ) es_svc.bulk_index(documents) documents = [] progress.advance(task) except Exception as e: logger.error(f"Error processing track: {e}") spotify_svc.metadata_cache.save_cache() raise if documents: console.print(f"[green]Indexing final batch of tracks... {len(documents)}") es_svc.bulk_index(documents) console.print(f"[green]Done! {file_path} processed!") spotify_svc.metadata_cache.save_cache() app = typer.Typer() @app.command() def process_history( es_url: str = typer.Option(..., help="Elasticsearch URL"), es_api_key: str = typer.Option(..., help="Elasticsearch API Key"), spotify_client_id: str = typer.Option(None, help="Spotify Client ID"), spotify_client_secret: str = typer.Option(None, help="Spotify Client Secret"), user_name: str = typer.Option(None, help="User name"), ): """Setup the services""" if spotify_client_id and spotify_client_secret: spotify_svc = SpotifyService( client_id=spotify_client_id, client_secret=spotify_client_secret, redirect_uri="http://localhost:9100", ) es_svc = ElasticsearchService(es_url=es_url, api_key=es_api_key) # Ensure index exists es_svc.check_index() es_svc.check_pipeline() files = list(Path("to_read").glob("*Audio*.json")) if not files: raise ValueError( "No JSON files found in 'to_read' directory, expected them to be named *Audio*.json, like Streaming_History_Audio_2023_8.json" ) else: for file_path in files: process_history_file(file_path, spotify_svc, es_svc, user_name) move_file(file_path) def move_file(file_path: Path): """Move the file to the 'processed' directory""" processed_dir = Path("processed") processed_dir.mkdir(exist_ok=True) new_path = Path("processed") / file_path.name file_path.rename(new_path) if __name__ == "__main__": app()