Files
MusicAnalyser/backend/app/ingest.py
google-labs-jules[bot] 6e80e97960 Implement Phase 2 Frontend with Ant Design and verify Data Ingestion
- Created `frontend/` React+Vite app using Ant Design (Dark Theme).
- Implemented `App.jsx` to display listening history and calculated "Vibes".
- Updated `backend/app/ingest.py` to fix ReccoBeats ID parsing.
- Updated `backend/app/schemas.py` to expose audio features to the API.
- Updated `README.md` with detailed Docker hosting instructions.
- Added `TODO.md` for Phase 3 roadmap.
- Cleaned up test scripts.
2025-12-24 22:51:53 +00:00

173 lines
5.9 KiB
Python

import asyncio
import os
from datetime import datetime
from sqlalchemy.orm import Session
from .models import Track, PlayHistory
from .database import SessionLocal
from .services.spotify_client import SpotifyClient
from .services.reccobeats_client import ReccoBeatsClient
from dateutil import parser
# Initialize Spotify Client (env vars will be populated later)
def get_spotify_client():
return SpotifyClient(
client_id=os.getenv("SPOTIFY_CLIENT_ID"),
client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
refresh_token=os.getenv("SPOTIFY_REFRESH_TOKEN"),
)
def get_reccobeats_client():
return ReccoBeatsClient()
async def enrich_tracks(db: Session, spotify_client: SpotifyClient, recco_client: ReccoBeatsClient):
"""
Finds tracks missing genres (Spotify) or audio features (ReccoBeats) and enriches them.
"""
# 1. Enrich Audio Features (via ReccoBeats)
tracks_missing_features = db.query(Track).filter(Track.danceability == None).limit(50).all()
print(f"DEBUG: Found {len(tracks_missing_features)} tracks missing audio features.")
if tracks_missing_features:
print(f"Enriching {len(tracks_missing_features)} tracks with audio features (ReccoBeats)...")
ids = [t.id for t in tracks_missing_features]
features_list = await recco_client.get_audio_features(ids)
features_map = {}
for f in features_list:
tid = f.get("id")
if not tid and "href" in f:
if "tracks/" in f["href"]:
tid = f["href"].split("tracks/")[1].split("?")[0]
elif "track/" in f["href"]:
tid = f["href"].split("track/")[1].split("?")[0]
if tid:
features_map[tid] = f
updated_count = 0
for track in tracks_missing_features:
data = features_map.get(track.id)
if data:
track.danceability = data.get("danceability")
track.energy = data.get("energy")
track.key = data.get("key")
track.loudness = data.get("loudness")
track.mode = data.get("mode")
track.speechiness = data.get("speechiness")
track.acousticness = data.get("acousticness")
track.instrumentalness = data.get("instrumentalness")
track.liveness = data.get("liveness")
track.valence = data.get("valence")
track.tempo = data.get("tempo")
updated_count += 1
print(f"Updated {updated_count} tracks with audio features.")
db.commit()
# 2. Enrich Genres (via Spotify Artists)
tracks_missing_genres = db.query(Track).filter(Track.genres == None).limit(50).all()
if tracks_missing_genres:
print(f"Enriching {len(tracks_missing_genres)} tracks with genres (Spotify)...")
artist_ids = set()
track_artist_map = {}
for t in tracks_missing_genres:
if t.raw_data and "artists" in t.raw_data:
a_ids = [a["id"] for a in t.raw_data["artists"]]
artist_ids.update(a_ids)
track_artist_map[t.id] = a_ids
artist_ids_list = list(artist_ids)
artist_genre_map = {}
for i in range(0, len(artist_ids_list), 50):
chunk = artist_ids_list[i:i+50]
artists_data = await spotify_client.get_artists(chunk)
for a_data in artists_data:
if a_data:
artist_genre_map[a_data["id"]] = a_data.get("genres", [])
for t in tracks_missing_genres:
a_ids = track_artist_map.get(t.id, [])
combined_genres = set()
for a_id in a_ids:
genres = artist_genre_map.get(a_id, [])
combined_genres.update(genres)
t.genres = list(combined_genres)
db.commit()
async def ingest_recently_played(db: Session):
spotify_client = get_spotify_client()
recco_client = get_reccobeats_client()
try:
items = await spotify_client.get_recently_played(limit=50)
except Exception as e:
print(f"Error connecting to Spotify: {e}")
return
print(f"Fetched {len(items)} items from Spotify.")
for item in items:
track_data = item["track"]
played_at_str = item["played_at"]
played_at = parser.isoparse(played_at_str)
track_id = track_data["id"]
track = db.query(Track).filter(Track.id == track_id).first()
if not track:
print(f"New track found: {track_data['name']}")
track = Track(
id=track_id,
name=track_data["name"],
artist=", ".join([a["name"] for a in track_data["artists"]]),
album=track_data["album"]["name"],
duration_ms=track_data["duration_ms"],
popularity=track_data["popularity"],
raw_data=track_data
)
db.add(track)
db.commit()
exists = db.query(PlayHistory).filter(
PlayHistory.track_id == track_id,
PlayHistory.played_at == played_at
).first()
if not exists:
print(f" recording play: {track_data['name']} at {played_at}")
play = PlayHistory(
track_id=track_id,
played_at=played_at,
context_uri=item.get("context", {}).get("uri") if item.get("context") else None
)
db.add(play)
db.commit()
# Enrich
await enrich_tracks(db, spotify_client, recco_client)
async def run_worker():
"""Simulates a background worker loop."""
db = SessionLocal()
try:
while True:
print("Worker: Polling Spotify...")
await ingest_recently_played(db)
print("Worker: Sleeping for 60 seconds...")
await asyncio.sleep(60)
except Exception as e:
print(f"Worker crashed: {e}")
finally:
db.close()