"""Background ingestion worker.

Polls Spotify for recently played tracks, stores new tracks, artists and
play events, then enriches stored rows with audio features (ReccoBeats),
artist genres/images (Spotify) and lyrics (Genius).
"""
import asyncio
import os
from datetime import datetime

from dateutil import parser
from sqlalchemy.orm import Session

from .database import SessionLocal
from .models import Artist, PlayHistory, Track
from .services.genius_client import GeniusClient
from .services.reccobeats_client import ReccoBeatsClient
from .services.spotify_client import SpotifyClient

# Seconds between Spotify polls in run_worker().
POLL_INTERVAL_SECONDS = 60
# Maximum rows handled per enrichment pass, keeping each cycle's API usage bounded.
AUDIO_FEATURES_BATCH = 50
ARTIST_BATCH = 50
LYRICS_BATCH = 10
# Artist IDs sent per SpotifyClient.get_artists() call
# (presumably the Spotify API per-request limit -- confirm).
SPOTIFY_ARTISTS_CHUNK = 50

# Track columns copied verbatim from an audio-features payload.
_AUDIO_FEATURE_FIELDS = (
    "danceability",
    "energy",
    "key",
    "loudness",
    "mode",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
)


# Initialize Clients
def get_spotify_client():
    """Build a SpotifyClient from the SPOTIFY_* environment variables."""
    return SpotifyClient(
        client_id=os.getenv("SPOTIFY_CLIENT_ID"),
        client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
        refresh_token=os.getenv("SPOTIFY_REFRESH_TOKEN"),
    )


def get_reccobeats_client():
    """Build a ReccoBeatsClient with its default configuration."""
    return ReccoBeatsClient()


def get_genius_client():
    """Build a GeniusClient with its default configuration."""
    return GeniusClient()


async def ensure_artists_exist(db: Session, artists_data: list) -> list:
    """Return Artist rows for the artists in *artists_data*, creating missing ones.

    New rows are added to *db* but not committed; the caller commits.
    Entries without an ``id`` (Spotify local files) are skipped, and an ID
    that appears more than once yields a single Artist in the result.

    Args:
        db: Active SQLAlchemy session.
        artists_data: Spotify artist dicts (at least ``id`` and ``name``;
            ``images`` optional).

    Returns:
        The Artist objects, in first-seen order.
    """
    artist_objects = []
    seen_ids = set()
    for a_data in artists_data:
        artist_id = a_data.get("id")
        # Null IDs cannot be primary keys; repeated IDs would create
        # duplicate rows / association entries.
        if not artist_id or artist_id in seen_ids:
            continue
        seen_ids.add(artist_id)

        artist = db.query(Artist).filter(Artist.id == artist_id).first()
        if artist is None:
            # Track-linked artist payloads rarely carry images, but use one if present.
            images = a_data.get("images") or []
            # genres is deliberately left unset (NULL) so the Spotify
            # enrichment pass selects this artist and fills its genres; that
            # pass only writes genres when they are None.
            # NOTE(review): assumes the Artist.genres column has no non-NULL default.
            artist = Artist(
                id=artist_id,
                name=a_data["name"],
                image_url=images[0]["url"] if images else None,
            )
            db.add(artist)
        artist_objects.append(artist)
    return artist_objects


def _feature_track_keys(feature: dict) -> list:
    """Return the keys under which an audio-features payload may be matched.

    Uses the payload's ``id`` plus the last path segment of ``href``/``uri``
    when present, so entries that reference the Spotify track by URL or
    ``spotify:track:<id>`` URI still match the Spotify track ID.
    NOTE(review): which field carries the Spotify ID depends on
    ReccoBeatsClient's response shape -- confirm.
    """
    keys = []
    if feature.get("id"):
        keys.append(feature["id"])
    for field in ("href", "uri"):
        ref = feature.get(field)
        if isinstance(ref, str) and ref:
            tail = ref.split("?", 1)[0].rstrip("/").replace(":", "/").rsplit("/", 1)[-1]
            if tail:
                keys.append(tail)
    return keys


def _primary_artist_name(track) -> str:
    """Return the first credited artist's name for *track*.

    Prefers the exact name from the stored Spotify payload, because splitting
    the joined ``artist`` string on "," breaks names such as "Tyler, The Creator".
    """
    raw = track.raw_data if isinstance(track.raw_data, dict) else {}
    raw_artists = raw.get("artists") or []
    if raw_artists and isinstance(raw_artists[0], dict) and raw_artists[0].get("name"):
        return raw_artists[0]["name"]
    return (track.artist or "").split(",")[0].strip()


async def _enrich_audio_features(db: Session, recco_client: ReccoBeatsClient) -> None:
    """Fill audio-feature columns for up to AUDIO_FEATURES_BATCH tracks lacking them."""
    tracks = (
        db.query(Track)
        .filter(Track.danceability.is_(None))
        .limit(AUDIO_FEATURES_BATCH)
        .all()
    )
    if not tracks:
        return

    print(f"Enriching {len(tracks)} tracks with audio features...")
    features_list = await recco_client.get_audio_features([t.id for t in tracks])

    features_map = {}
    for feature in features_list or []:
        if not isinstance(feature, dict):
            continue
        for key in _feature_track_keys(feature):
            features_map[key] = feature

    # NOTE(review): tracks with no returned features keep danceability NULL and
    # are re-selected every pass; with a fixed LIMIT they can crowd out newer tracks.
    for track in tracks:
        data = features_map.get(track.id)
        if data:
            for field in _AUDIO_FEATURE_FIELDS:
                setattr(track, field, data.get(field))
    db.commit()


async def _enrich_artist_metadata(db: Session, spotify_client: SpotifyClient) -> None:
    """Fill missing genres / image URLs for up to ARTIST_BATCH artists from Spotify."""
    artists = (
        db.query(Artist)
        .filter(Artist.genres.is_(None) | Artist.image_url.is_(None))
        .limit(ARTIST_BATCH)
        .all()
    )
    if not artists:
        return

    print(f"Enriching {len(artists)} artists with genres/images...")
    artist_ids = [a.id for a in artists]
    fetched = {}
    for start in range(0, len(artist_ids), SPOTIFY_ARTISTS_CHUNK):
        chunk = artist_ids[start:start + SPOTIFY_ARTISTS_CHUNK]
        for a_data in (await spotify_client.get_artists(chunk)) or []:
            if not a_data:
                continue
            images = a_data.get("images") or []
            fetched[a_data["id"]] = {
                "genres": a_data.get("genres", []),
                "image_url": images[0]["url"] if images else None,
            }

    # NOTE(review): an artist Spotify returns without images keeps image_url NULL
    # and is re-selected every pass.
    for artist in artists:
        data = fetched.get(artist.id)
        if data:
            if artist.genres is None:
                artist.genres = data["genres"]
            if artist.image_url is None:
                artist.image_url = data["image_url"]
        elif artist.genres is None:
            artist.genres = []  # Prevent retry loop
    db.commit()


async def _enrich_lyrics(db: Session, genius_client: GeniusClient) -> None:
    """Fetch lyrics (and fallback cover art) from Genius for up to LYRICS_BATCH tracks."""
    # genius_client.genius is falsy when the client is unusable
    # (presumably no API token configured).
    if not genius_client.genius:
        return
    # Newest-updated tracks first; the cap limits Genius traffic per cycle.
    tracks = (
        db.query(Track)
        .filter(Track.lyrics.is_(None))
        .order_by(Track.updated_at.desc())
        .limit(LYRICS_BATCH)
        .all()
    )
    if not tracks:
        return

    print(f"Enriching {len(tracks)} tracks with lyrics (Genius)...")
    loop = asyncio.get_running_loop()
    for track in tracks:
        artist_name = _primary_artist_name(track)
        print(f"Searching Genius for: {track.name} by {artist_name}")
        try:
            # GeniusClient is synchronous; run it off the event loop so the
            # loop is not blocked for the duration of the HTTP call.
            data = await loop.run_in_executor(
                None, genius_client.search_song, track.name, artist_name
            )
        except Exception as e:
            # Leave lyrics NULL so the track is retried on a later cycle.
            print(f"Genius lookup failed for {track.name}: {e}")
            continue

        if data:
            # "" (not None) marks the track as attempted, preventing a retry loop.
            track.lyrics = data.get("lyrics") or ""
            # Fallback: if Spotify gave no album art, use Genius's image.
            if not track.image_url and data.get("image_url"):
                track.image_url = data["image_url"]
        else:
            track.lyrics = ""  # Mark as empty to prevent retry loop
    db.commit()


async def _run_enrichment_stage(db: Session, label: str, stage) -> None:
    """Await one enrichment coroutine, isolating its failure from later stages.

    On error the session is rolled back so the next stage starts clean.
    """
    try:
        await stage
    except Exception as e:
        db.rollback()
        print(f"Enrichment step failed ({label}): {e}")


async def enrich_tracks(db: Session, spotify_client: SpotifyClient, recco_client: ReccoBeatsClient, genius_client: GeniusClient):
    """
    Enrichment Pipeline:
    1. Audio Features (ReccoBeats)
    2. Artist Metadata: Genres & Images (Spotify)
    3. Lyrics & Fallback Images (Genius)

    Each stage commits its own work. A failing stage is logged and rolled back
    without preventing the remaining stages from running.
    """
    await _run_enrichment_stage(db, "audio features", _enrich_audio_features(db, recco_client))
    await _run_enrichment_stage(db, "artist metadata", _enrich_artist_metadata(db, spotify_client))
    await _run_enrichment_stage(db, "lyrics", _enrich_lyrics(db, genius_client))


async def _get_or_create_track(db: Session, track_data: dict):
    """Return the Track for *track_data*, inserting it (and its artists) if new.

    Also backfills the artists relationship for existing tracks that have none.
    """
    track_id = track_data["id"]
    track = db.query(Track).filter(Track.id == track_id).first()
    if track is None:
        print(f"New track found: {track_data['name']}")
        album = track_data.get("album") or {}
        album_images = album.get("images") or []
        track = Track(
            id=track_id,
            name=track_data["name"],
            artist=", ".join(a["name"] for a in track_data["artists"]),
            album=track_data["album"]["name"],
            image_url=album_images[0]["url"] if album_images else None,
            duration_ms=track_data["duration_ms"],
            popularity=track_data["popularity"],
            raw_data=track_data,
        )
        track.artists = await ensure_artists_exist(db, track_data.get("artists", []))
        db.add(track)
        db.commit()

    # Tracks stored before artist linking existed: rebuild from the raw payload.
    if not track.artists and track.raw_data and "artists" in track.raw_data:
        track.artists = await ensure_artists_exist(db, track.raw_data["artists"])
        db.commit()
    return track


def _record_play(db: Session, item: dict, track_data: dict, played_at) -> None:
    """Insert a PlayHistory row for (*track*, *played_at*) unless one already exists."""
    track_id = track_data["id"]
    exists = db.query(PlayHistory).filter(
        PlayHistory.track_id == track_id,
        PlayHistory.played_at == played_at,
    ).first()
    if exists:
        return

    print(f" recording play: {track_data['name']} at {played_at}")
    context = item.get("context")
    db.add(PlayHistory(
        track_id=track_id,
        played_at=played_at,
        context_uri=context.get("uri") if context else None,
    ))
    db.commit()


async def ingest_recently_played(db: Session):
    """Fetch recently played items from Spotify, persist them, then run enrichment.

    If the recently-played request raises, the error is printed and the
    function returns without ingesting or enriching.
    """
    spotify_client = get_spotify_client()
    recco_client = get_reccobeats_client()
    genius_client = get_genius_client()

    try:
        items = await spotify_client.get_recently_played(limit=50)
    except Exception as e:
        print(f"Error connecting to Spotify: {e}")
        return

    items = items or []
    print(f"Fetched {len(items)} items from Spotify.")

    for item in items:
        track_data = item.get("track")
        # Local files / unavailable tracks come back with a null id: nothing to key on.
        if not track_data or not track_data.get("id"):
            continue
        played_at = parser.isoparse(item["played_at"])
        await _get_or_create_track(db, track_data)
        _record_play(db, item, track_data, played_at)

    # Enrich
    await enrich_tracks(db, spotify_client, recco_client, genius_client)


async def run_worker(poll_interval: float = POLL_INTERVAL_SECONDS):
    """Poll Spotify forever, sleeping *poll_interval* seconds between cycles.

    A failed cycle is logged and its session work rolled back. The loop then
    continues instead of stopping the worker. The session is closed when the
    coroutine exits, e.g. on cancellation.
    """
    db = SessionLocal()
    try:
        while True:
            print("Worker: Polling Spotify...")
            try:
                await ingest_recently_played(db)
            except Exception as e:
                # Without a rollback the session would stay in a failed state.
                db.rollback()
                print(f"Worker: poll failed, will retry next cycle: {e}")
            print(f"Worker: Sleeping for {poll_interval} seconds...")
            await asyncio.sleep(poll_interval)
    finally:
        db.close()