"""Poll Spotify's "recently played" feed and persist it via SQLAlchemy."""

import asyncio
import os
from datetime import datetime

from dateutil import parser
from sqlalchemy.orm import Session

from .models import Track, PlayHistory
from .database import SessionLocal
from .services.spotify_client import SpotifyClient


def get_spotify_client():
    """Build a ``SpotifyClient`` from the ``SPOTIFY_*`` environment variables.

    The variables are read when this function is called rather than at
    import time, so they may be populated after the module is imported.
    A variable that is not set is passed through as ``None``.
    """
    return SpotifyClient(
        client_id=os.getenv("SPOTIFY_CLIENT_ID"),
        client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
        refresh_token=os.getenv("SPOTIFY_REFRESH_TOKEN"),
    )


def _ensure_track(db, track_data):
    """Insert a ``Track`` row for *track_data* unless its id is already stored."""
    track_id = track_data["id"]
    existing = db.query(Track).filter(Track.id == track_id).first()
    if existing is not None:
        return

    print(f"New track found: {track_data['name']}")
    db.add(
        Track(
            id=track_id,
            name=track_data["name"],
            artist=", ".join(a["name"] for a in track_data["artists"]),
            album=track_data["album"]["name"],
            duration_ms=track_data["duration_ms"],
            popularity=track_data["popularity"],
            raw_data=track_data,
        )
    )
    # Commit immediately so the row exists before a PlayHistory row
    # referencing it is inserted.
    db.commit()


def _record_play(db, item, track_data, played_at):
    """Insert a ``PlayHistory`` row unless this exact play is already stored."""
    track_id = track_data["id"]
    # (track_id, played_at) is assumed to be unique enough to identify a play.
    already_recorded = (
        db.query(PlayHistory)
        .filter(
            PlayHistory.track_id == track_id,
            PlayHistory.played_at == played_at,
        )
        .first()
    )
    if already_recorded:
        return

    print(f" recording play: {track_data['name']} at {played_at}")
    # "context" may be absent or null; both cases yield context_uri=None.
    context = item.get("context") or {}
    db.add(
        PlayHistory(
            track_id=track_id,
            played_at=played_at,
            context_uri=context.get("uri"),
        )
    )
    db.commit()


async def ingest_recently_played(db: Session):
    """Fetch up to 50 recently played items and store the new ones.

    For each item, the track is inserted if its id is not stored yet, then
    the play itself is inserted unless a row with the same
    ``(track_id, played_at)`` already exists. Each insert is committed on
    its own, so rows stored before a failure stay stored.

    Args:
        db: Session used for all queries and commits. It is owned by the
            caller and is not closed here.

    Returns:
        None. If the Spotify request raises, the error is printed and the
        function returns without touching the database.

    Raises:
        Exception: Anything raised while processing the fetched items is
            re-raised after the session has been rolled back.
    """
    client = get_spotify_client()
    try:
        items = await client.get_recently_played(limit=50)
    except Exception as e:
        print(f"Error connecting to Spotify: {e}")
        return

    print(f"Fetched {len(items)} items from Spotify.")

    # NOTE(review): the Session calls below are synchronous, so they run
    # without yielding to the event loop.
    try:
        for item in items:
            track_data = item.get("track")
            # Without a track id there is nothing to key the Track row or
            # the PlayHistory reference on, so the item is skipped.
            # NOTE(review): presumably only unusual items (e.g. local
            # files) lack an id -- confirm against the Spotify API.
            if not track_data or not track_data.get("id"):
                print("Skipping item without a track id.")
                continue

            played_at = parser.isoparse(item["played_at"])
            _ensure_track(db, track_data)
            _record_play(db, item, track_data, played_at)
    except Exception:
        # Discard pending state so the caller's session is not left in a
        # failed transaction, then let the caller see the error.
        db.rollback()
        raise


async def run_worker():
    """Simulate a background worker that polls Spotify every 60 seconds.

    One session from ``SessionLocal`` is opened up front, reused for every
    poll, and closed when the loop ends. Any ``Exception`` escaping a poll
    is printed and ends the loop.
    """
    # NOTE(review): a single failed poll stops the worker for good; confirm
    # that this is the intended behaviour rather than retrying.
    db = SessionLocal()
    try:
        while True:
            print("Worker: Polling Spotify...")
            await ingest_recently_played(db)
            print("Worker: Sleeping for 60 seconds...")
            await asyncio.sleep(60)
    except Exception as e:
        print(f"Worker crashed: {e}")
    finally:
        db.close()