Files
MusicAnalyser/backend/app/ingest.py
bnair123 93e7c13f3d feat: implement AI-curated playlist service and dashboard integration
- Added hierarchical AGENTS.md knowledge base
- Implemented PlaylistService with 6h themed and 24h devotion mix logic
- Integrated AI theme generation for 6h playlists via Gemini/OpenAI
- Added /playlists/refresh and metadata endpoints to API
- Updated background worker with scheduled playlist curation
- Created frontend PlaylistsSection, Tooltip components and integrated into Dashboard
- Added Alembic migration for playlist tracking columns
- Fixed Docker healthcheck with curl installation
2025-12-30 09:45:19 +04:00

429 lines
14 KiB
Python

from .services.stats_service import StatsService
from .services.narrative_service import NarrativeService
from .services.playlist_service import PlaylistService
import asyncio
import os
import time
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from .models import Track, PlayHistory, Artist, AnalysisSnapshot
from .database import SessionLocal
from .services.spotify_client import SpotifyClient
from .services.reccobeats_client import ReccoBeatsClient
from .services.genius_client import GeniusClient
from dateutil import parser
class PlaybackTracker:
    """Mutable state carried between currently-playing polls.

    A freshly constructed tracker represents "nothing is being tracked": no
    track id, no start time, no previous poll, and zero accumulated listening.
    """

    def __init__(self):
        # Identity and timing of the play being observed (None while idle).
        self.current_track_id = None
        self.track_start_time = None
        self.last_poll_time = None
        # Listening time accumulated so far and the progress seen on the last poll.
        self.accumulated_listen_ms = 0
        self.last_progress_ms = 0
        # Whether playback was paused on the most recent poll.
        self.is_paused = False
def get_spotify_client():
    """Build a SpotifyClient from the SPOTIFY_* environment variables.

    Unset (or empty) variables are passed through as empty strings, never None.
    """

    def _env(name):
        return str(os.getenv(name) or "")

    return SpotifyClient(
        client_id=_env("SPOTIFY_CLIENT_ID"),
        client_secret=_env("SPOTIFY_CLIENT_SECRET"),
        refresh_token=_env("SPOTIFY_REFRESH_TOKEN"),
    )
def get_reccobeats_client():
    """Return a new ReccoBeatsClient built with its default arguments."""
    client = ReccoBeatsClient()
    return client
def get_genius_client():
    """Return a new GeniusClient built with its default arguments."""
    client = GeniusClient()
    return client
async def ensure_artists_exist(db: Session, artists_data: list):
    """Resolve artist payloads to Artist rows, creating any that are missing.

    Each payload is read for ``id``, ``name`` and an optional ``images`` list.
    Existing rows are looked up by id. Missing artists are created with empty
    genres and the first image URL (or None), and added to the session. This
    function does not commit. The returned list follows the input order.
    """
    resolved = []
    for payload in artists_data:
        artist_id = payload["id"]
        found = db.query(Artist).filter(Artist.id == artist_id).first()
        if found:
            resolved.append(found)
            continue
        images = payload.get("images")
        created = Artist(
            id=artist_id,
            name=payload["name"],
            genres=[],
            image_url=images[0]["url"] if images else None,
        )
        db.add(created)
        resolved.append(created)
    return resolved
# Audio-feature columns copied verbatim (same key names) from a feature payload.
_AUDIO_FEATURE_FIELDS = (
    "danceability",
    "energy",
    "key",
    "loudness",
    "mode",
    "speechiness",
    "acousticness",
    "instrumentalness",
    "liveness",
    "valence",
    "tempo",
)


def _primary_artist_name(track: Track) -> str:
    """Return the first credited artist of ``track``.

    Prefers the structured artist list stored in ``raw_data``, so names that
    contain commas (e.g. "Tyler, The Creator") are not truncated. Falls back to
    the first comma-separated entry of the joined ``artist`` string.
    """
    raw = track.raw_data
    if isinstance(raw, dict):
        artists = raw.get("artists")
        if isinstance(artists, list) and artists:
            first = artists[0]
            if isinstance(first, dict) and first.get("name"):
                return str(first["name"])
    return str(track.artist).split(",")[0]


async def _enrich_audio_features(db: Session, recco_client: ReccoBeatsClient) -> None:
    """Fill audio-feature columns for up to 50 tracks whose danceability is NULL."""
    tracks = db.query(Track).filter(Track.danceability == None).limit(50).all()  # noqa: E711
    if not tracks:
        return
    print(f"Enriching {len(tracks)} tracks with audio features...")
    ids = [str(t.id) for t in tracks]
    features_list = await recco_client.get_audio_features(ids) or []
    # A payload may identify its track as "spotify_id" or "id"; index by whichever is set.
    features_map = {}
    for features in features_list:
        tid = features.get("spotify_id") or features.get("id")
        if tid:
            features_map[tid] = features
    for track in tracks:
        data = features_map.get(track.id)
        if data:
            for field in _AUDIO_FEATURE_FIELDS:
                setattr(track, field, data.get(field))
    # NOTE(review): tracks with no matching payload keep danceability NULL and are
    # re-selected every run; enough of them could starve newer tracks.
    db.commit()


async def _enrich_artists(db: Session, spotify_client: SpotifyClient) -> None:
    """Fill missing genres/image URLs for up to 50 artists using Spotify data."""
    artists = (
        db.query(Artist)
        .filter((Artist.genres == None) | (Artist.image_url == None))  # noqa: E711
        .limit(50)
        .all()
    )
    if not artists:
        return
    print(f"Enriching {len(artists)} artists with genres/images...")
    artist_ids = [str(a.id) for a in artists]
    artist_data_map = {}
    for start in range(0, len(artist_ids), 50):
        chunk = artist_ids[start : start + 50]
        for a_data in await spotify_client.get_artists(chunk) or []:
            # Falsy entries (e.g. an id Spotify could not resolve) are skipped.
            if not a_data:
                continue
            images = a_data.get("images")
            artist_data_map[a_data["id"]] = {
                "genres": a_data.get("genres", []),
                "image_url": images[0]["url"] if images else None,
            }
    for artist in artists:
        data = artist_data_map.get(artist.id)
        if data:
            if artist.genres is None:
                artist.genres = data["genres"]
            if artist.image_url is None:
                artist.image_url = data["image_url"]
        elif artist.genres is None:
            # No data returned: store [] so the genres-NULL condition stops matching.
            artist.genres = []
    db.commit()


async def _enrich_lyrics(db: Session, genius_client: GeniusClient) -> None:
    """Fetch Genius lyrics for the 10 most recently updated tracks that lack them."""
    if not genius_client.genius:
        return
    tracks = (
        db.query(Track)
        .filter(Track.lyrics == None)  # noqa: E711
        .order_by(Track.updated_at.desc())
        .limit(10)
        .all()
    )
    if not tracks:
        return
    print(f"Enriching {len(tracks)} tracks with lyrics (Genius)...")
    loop = asyncio.get_running_loop()
    for track in tracks:
        artist_name = _primary_artist_name(track)
        print(f"Searching Genius for: {track.name} by {artist_name}")
        # search_song is a synchronous call; run it in the default executor so
        # the event loop is not blocked during the lookup.
        data = await loop.run_in_executor(
            None, genius_client.search_song, str(track.name), artist_name
        )
        if data:
            track.lyrics = data["lyrics"]
            if not track.image_url and data.get("image_url"):
                track.image_url = data["image_url"]
        else:
            # "" marks "searched, nothing found" so the track is not selected again.
            track.lyrics = ""
        # Commit per track so a later failure does not discard earlier results.
        db.commit()


async def _run_enrichment_stage(db: Session, label: str, stage) -> None:
    """Await one enrichment coroutine; on failure log it and roll back the session."""
    try:
        await stage
    except Exception as e:
        print(f"Enrichment error ({label}): {e}")
        db.rollback()


async def enrich_tracks(
    db: Session,
    spotify_client: SpotifyClient,
    recco_client: ReccoBeatsClient,
    genius_client: GeniusClient,
):
    """Best-effort enrichment of stored tracks and artists.

    Runs three independent stages in order:
    1. audio features from ReccoBeats (up to 50 tracks),
    2. artist genres/images from Spotify (up to 50 artists),
    3. lyrics from Genius (up to 10 tracks; skipped when Genius is not configured).

    Each stage commits its own work. An exception in one stage is logged and
    rolled back, and the remaining stages still run, so failures of the
    external services are not propagated to the caller.
    """
    await _run_enrichment_stage(
        db, "audio features", _enrich_audio_features(db, recco_client)
    )
    await _run_enrichment_stage(db, "artists", _enrich_artists(db, spotify_client))
    await _run_enrichment_stage(db, "lyrics", _enrich_lyrics(db, genius_client))
async def ingest_recently_played(db: Session):
    """Import the last 50 recently-played items from Spotify into the database.

    For each item, the Track (and its Artist rows) is created if missing, and
    artists are back-filled from ``raw_data`` for tracks stored without any.
    A PlayHistory row (source "recently_played") is inserted unless one with
    the same track id and exact ``played_at`` already exists. Enrichment runs
    afterwards. If the Spotify request fails, the error is logged and nothing
    is written.
    """
    spotify_client = get_spotify_client()
    recco_client = get_reccobeats_client()
    genius_client = get_genius_client()
    try:
        items = await spotify_client.get_recently_played(limit=50)
    except Exception as e:
        print(f"Error connecting to Spotify: {e}")
        return
    print(f"Fetched {len(items)} items from Spotify.")
    for item in items:
        track_data = item.get("track") or {}
        track_id = track_data.get("id")
        if not track_id:
            # Without an id the play cannot be keyed to a Track row (presumably a
            # local file or malformed item), so inserting it would fail on commit.
            continue
        played_at = parser.isoparse(item["played_at"])
        track = db.query(Track).filter(Track.id == track_id).first()
        if not track:
            print(f"New track found: {track_data['name']}")
            album = track_data.get("album") or {}
            album_images = album.get("images")
            artists_data = track_data.get("artists") or []
            track = Track(
                id=track_id,
                name=track_data["name"],
                artist=", ".join(a["name"] for a in artists_data),
                album=album.get("name", "Unknown"),
                image_url=album_images[0]["url"] if album_images else None,
                duration_ms=track_data.get("duration_ms"),
                popularity=track_data.get("popularity"),
                raw_data=track_data,
            )
            track.artists = await ensure_artists_exist(db, artists_data)
            db.add(track)
            db.commit()
        # Back-fill artist links for tracks stored before artists were tracked.
        if not track.artists and track.raw_data and "artists" in track.raw_data:
            track.artists = await ensure_artists_exist(db, track.raw_data["artists"])
            db.commit()
        exists = (
            db.query(PlayHistory)
            .filter(
                PlayHistory.track_id == track_id, PlayHistory.played_at == played_at
            )
            .first()
        )
        if not exists:
            print(f" recording play: {track_data['name']} at {played_at}")
            play = PlayHistory(
                track_id=track_id,
                played_at=played_at,
                context_uri=(item.get("context") or {}).get("uri"),
                source="recently_played",
            )
            db.add(play)
            db.commit()
    await enrich_tracks(db, spotify_client, recco_client, genius_client)
def _recover_worker_session(db: Session, message: str) -> None:
    """Log a failed worker task and roll back the session so it stays usable."""
    print(message)
    try:
        db.rollback()
    except Exception as rollback_error:
        print(f"Worker: session rollback failed: {rollback_error}")


async def _run_daily_refresh(
    db: Session, playlist_service: PlaylistService, now: datetime
) -> None:
    """Store a "daily_auto" analysis snapshot for the last day and curate the daily playlist."""
    period_start = now - timedelta(days=1)
    stats_json = StatsService(db).generate_full_report(period_start, now)
    narrative_json = NarrativeService().generate_full_narrative(stats_json)
    db.add(
        AnalysisSnapshot(
            period_start=period_start,
            period_end=now,
            period_label="daily_auto",
            metrics_payload=stats_json,
            narrative_report=narrative_json,
        )
    )
    db.commit()
    await playlist_service.curate_daily_playlist(period_start, now)


async def run_worker():
    """Run the background polling/curation loop until an unhandled error occurs.

    Each iteration (then a 15 s sleep):
    - polls currently-playing;
    - every 4th iteration, ingests recently-played history (and enrichment);
    - at UTC hours 3, 9, 15 and 21, curates the 6-hour playlist, at most
      once per 3600 s after a success;
    - at UTC hour 4, stores a daily analysis snapshot and curates the daily
      playlist, at most once per 80000 s after a success.

    Each task is isolated. On failure it is logged and the shared session is
    rolled back, so one failing task neither stops the loop nor leaves the
    session in a failed-transaction state. The session is closed on exit.
    """
    db = SessionLocal()
    tracker = PlaybackTracker()
    spotify_client = get_spotify_client()
    playlist_service = PlaylistService(
        db=db,
        spotify_client=spotify_client,
        recco_client=get_reccobeats_client(),
        narrative_service=NarrativeService(),
    )
    poll_count = 0
    last_6h_refresh = 0
    last_daily_refresh = 0
    try:
        while True:
            poll_count += 1
            now = datetime.utcnow()
            try:
                await poll_currently_playing(db, spotify_client, tracker)
            except Exception as e:
                _recover_worker_session(db, f"Currently-playing poll error: {e}")
            if poll_count % 4 == 0:
                print("Worker: Polling recently-played...")
                try:
                    await ingest_recently_played(db)
                except Exception as e:
                    _recover_worker_session(db, f"Recently-played ingest error: {e}")
            current_hour = now.hour
            # NOTE(review): the throttles below only advance on success, so a
            # persistently failing refresh is retried every poll for the rest
            # of the hour.
            if current_hour in (3, 9, 15, 21) and (
                time.time() - last_6h_refresh > 3600
            ):
                print(f"Worker: Triggering 6-hour playlist refresh at {now}")
                try:
                    await playlist_service.curate_six_hour_playlist(
                        now - timedelta(hours=6), now
                    )
                    last_6h_refresh = time.time()
                except Exception as e:
                    _recover_worker_session(db, f"6h Refresh Error: {e}")
            if current_hour == 4 and (time.time() - last_daily_refresh > 80000):
                print(
                    f"Worker: Triggering daily playlist refresh and analysis at {now}"
                )
                try:
                    await _run_daily_refresh(db, playlist_service, now)
                    last_daily_refresh = time.time()
                except Exception as e:
                    _recover_worker_session(db, f"Daily Refresh Error: {e}")
            await asyncio.sleep(15)
    except Exception as e:
        print(f"Worker crashed: {e}")
    finally:
        db.close()
async def poll_currently_playing(
    db: Session, spotify_client: SpotifyClient, tracker: PlaybackTracker
):
    """Poll Spotify's currently-playing endpoint once and update ``tracker``.

    - API error: logged; tracker untouched.
    - Nothing playing, a non-track item, or a track without an id: the play
      being tracked (if any) is finalized via ``finalize_track``.
    - Response without an ``item``: ignored.
    - A different track id: the previous play is finalized and the tracker is
      re-seeded for the new track, with its start time back-dated by the
      reported progress. The Track row is created if missing.
    - Same track: wall-clock time since the previous poll is added to the
      listened total when playback was running on both polls.
    """
    try:
        response = await spotify_client.get_currently_playing()
    except Exception as e:
        print(f"Error polling currently-playing: {e}")
        return
    now = datetime.utcnow()
    if not response or response.get("currently_playing_type") != "track":
        if tracker.current_track_id and tracker.last_poll_time:
            finalize_track(db, tracker)
        return
    item = response.get("item")
    if not item:
        return
    current_track_id = item.get("id")
    if not current_track_id:
        # An id-less track (presumably a local file) cannot be stored; treat it
        # like "nothing trackable is playing" instead of inserting Track(id=None).
        if tracker.current_track_id and tracker.last_poll_time:
            finalize_track(db, tracker)
        return
    # progress_ms may be present but null; coerce so timedelta() does not fail.
    current_progress_ms = response.get("progress_ms") or 0
    is_playing = response.get("is_playing", False)
    if current_track_id != tracker.current_track_id:
        if tracker.current_track_id and tracker.last_poll_time:
            finalize_track(db, tracker)
        tracker.current_track_id = current_track_id
        tracker.track_start_time = now - timedelta(milliseconds=current_progress_ms)
        tracker.accumulated_listen_ms = current_progress_ms if is_playing else 0
        tracker.last_progress_ms = current_progress_ms
        tracker.last_poll_time = now
        tracker.is_paused = not is_playing
        await ensure_track_exists(db, item, spotify_client)
        return
    if tracker.last_poll_time:
        time_delta_ms = (now - tracker.last_poll_time).total_seconds() * 1000
        # Only count time when playback was running on the previous poll and now.
        if is_playing and not tracker.is_paused:
            tracker.accumulated_listen_ms += time_delta_ms
    tracker.last_progress_ms = current_progress_ms
    tracker.last_poll_time = now
    tracker.is_paused = not is_playing
def finalize_track(db: Session, tracker: PlaybackTracker):
    """Persist the play held in ``tracker`` and reset the tracker to idle.

    A play counts as skipped when fewer than 30000 ms were listened. If a
    PlayHistory row for the same track already exists within ±5 s of the
    tracked start time, it is updated (only when its ``listened_ms`` is still
    None). Otherwise a new row with source "currently_playing" is inserted.
    The session is committed either way. Does nothing, and leaves the tracker
    as-is, when no start time is recorded.
    """
    start = tracker.track_start_time
    if start is None:
        return
    listened_ms = int(tracker.accumulated_listen_ms)
    skipped = listened_ms < 30000
    tolerance = timedelta(seconds=5)
    match = (
        db.query(PlayHistory)
        .filter(
            PlayHistory.track_id == tracker.current_track_id,
            PlayHistory.played_at >= start - tolerance,
            PlayHistory.played_at <= start + tolerance,
        )
        .first()
    )
    if not match:
        db.add(
            PlayHistory(
                track_id=tracker.current_track_id,
                played_at=start,
                listened_ms=listened_ms,
                skipped=skipped,
                source="currently_playing",
            )
        )
    elif match.listened_ms is None:
        # Enrich a row recorded earlier (e.g. by the recently-played ingest).
        match.listened_ms = listened_ms
        match.skipped = skipped
        match.source = "currently_playing"
    db.commit()
    print(
        f"Finalized: {tracker.current_track_id} listened={listened_ms}ms skipped={skipped}"
    )
    tracker.current_track_id = tracker.track_start_time = tracker.last_poll_time = None
    tracker.accumulated_listen_ms = tracker.last_progress_ms = 0
    tracker.is_paused = False
async def ensure_track_exists(
    db: Session, track_data: dict, spotify_client: SpotifyClient
):
    """Make sure a Track row (with its artists) exists for a Spotify track payload.

    Reads ``id``, ``name``, ``artists``, ``album`` (name and images),
    ``duration_ms`` and ``popularity``. A missing ``album`` or ``artists``
    value is tolerated. ``spotify_client`` is currently unused and kept for
    interface compatibility.

    Returns the existing or newly created (and committed) Track, or None when
    the payload has no id and therefore cannot be stored.
    """
    track_id = track_data.get("id")
    if not track_id:
        return None
    track = db.query(Track).filter(Track.id == track_id).first()
    if track:
        return track
    album = track_data.get("album") or {}
    album_images = album.get("images")
    artists_data = track_data.get("artists") or []
    track = Track(
        id=track_id,
        name=track_data["name"],
        artist=", ".join(a["name"] for a in artists_data),
        album=album.get("name", "Unknown"),
        image_url=album_images[0]["url"] if album_images else None,
        duration_ms=track_data.get("duration_ms"),
        popularity=track_data.get("popularity"),
        raw_data=track_data,
    )
    track.artists = await ensure_artists_exist(db, artists_data)
    db.add(track)
    db.commit()
    return track