from collections import Counter
from sqlalchemy.orm import Session
from sqlalchemy import func, distinct, desc
from datetime import datetime, timedelta
from typing import Dict, Any, List, Iterable
import math
import numpy as np
from ..models import PlayHistory, Track, Artist, AnalysisSnapshot


class StatsService:
    """Computes per-period listening statistics from stored Spotify play history.

    Every public ``compute_*`` method takes a ``[period_start, period_end]``
    window (inclusive on both ends) and returns a plain dict ready for JSON
    serialization. ``generate_full_report`` bundles them all.
    """

    def __init__(self, db: Session):
        self.db = db

    # ------------------------------------------------------------------ #
    # Internal helpers                                                   #
    # ------------------------------------------------------------------ #

    def _plays_in_period(self, period_start: datetime, period_end: datetime,
                         ordered: bool = False) -> List[PlayHistory]:
        """Fetch all PlayHistory rows inside the window (optionally time-ordered)."""
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        )
        if ordered:
            query = query.order_by(PlayHistory.played_at.asc())
        return query.all()

    def _track_map(self, track_ids: Iterable) -> Dict[Any, Track]:
        """Bulk-fetch Track rows for the given ids, keyed by id (avoids N+1)."""
        ids = list(set(track_ids))
        if not ids:
            return {}
        tracks = self.db.query(Track).filter(Track.id.in_(ids)).all()
        return {t.id: t for t in tracks}

    # ------------------------------------------------------------------ #
    # Public metrics                                                     #
    # ------------------------------------------------------------------ #

    def compute_volume_stats(self, period_start: datetime,
                             period_end: datetime) -> Dict[str, Any]:
        """Volume metrics: total plays, unique tracks/artists/albums/genres,
        top-5 lists, repeat rate, and a concentration index (HHI).
        """
        plays = self._plays_in_period(period_start, period_end)
        total_plays = len(plays)

        if total_plays == 0:
            return {
                "total_plays": 0,
                "estimated_minutes": 0,
                "unique_tracks": 0,
                "unique_artists": 0,
                "unique_albums": 0,
                "unique_genres": 0,
                "top_tracks": [],
                "top_artists": [],
                "top_genres": [],  # FIX: was missing on the empty path
                "repeat_rate": 0,
                "concentration": {}
            }

        # Bulk-fetch tracks once instead of one query per play.
        track_map = self._track_map(p.track_id for p in plays)

        total_ms = 0
        unique_track_ids = set()
        unique_artist_ids = set()
        # Album ids come from raw_data['album']['id'] when available; the
        # album-name string is a fallback (PlayHistory has no album id).
        unique_album_ids = set()
        genre_counts = Counter()
        track_play_counts = Counter()
        artist_play_counts = Counter()

        for p in plays:
            t = track_map.get(p.track_id)
            if not t:
                continue
            total_ms += t.duration_ms or 0  # FIX: tolerate NULL duration_ms
            unique_track_ids.add(t.id)
            track_play_counts[t.id] += 1
            # NOTE(review): t.artists is a lazy relation -> potential N+1
            # queries for large periods; kept per the original design choice
            # of correctness over eager-loading complexity.
            for artist in t.artists:
                unique_artist_ids.add(artist.id)
                artist_play_counts[artist.id] += 1
                if artist.genres:
                    genre_counts.update(artist.genres)
            if t.raw_data and "album" in t.raw_data:
                # FIX: guard against an album dict without an "id" key.
                unique_album_ids.add(t.raw_data["album"].get("id", t.album))
            else:
                unique_album_ids.add(t.album)  # Fallback

        estimated_minutes = total_ms / 60000

        # Top 5 tracks by play count.
        top_tracks = []
        for tid, count in track_play_counts.most_common(5):
            t = track_map[tid]
            top_tracks.append({
                "name": t.name,
                "artist": t.artist,  # Display string
                "count": count
            })

        # Top 5 artists; names need a bulk fetch since we only counted ids.
        top_artist_pairs = artist_play_counts.most_common(5)
        top_artists_objs = self.db.query(Artist).filter(
            Artist.id.in_([aid for aid, _ in top_artist_pairs])
        ).all()
        artist_name_map = {a.id: a.name for a in top_artists_objs}
        top_artists = [
            {"name": artist_name_map.get(aid, "Unknown"), "count": count}
            for aid, count in top_artist_pairs
        ]

        top_genres = [{"name": g, "count": c}
                      for g, c in genre_counts.most_common(5)]

        unique_tracks_count = len(unique_track_ids)
        repeat_rate = (total_plays - unique_tracks_count) / total_plays
        # HHI (Herfindahl-Hirschman Index): sum of squared play shares,
        # share = track_plays / total_plays. Close to 1 => very concentrated.
        hhi = sum((c / total_plays) ** 2 for c in track_play_counts.values())

        return {
            "total_plays": total_plays,
            "estimated_minutes": int(estimated_minutes),
            "unique_tracks": unique_tracks_count,
            "unique_artists": len(unique_artist_ids),
            "unique_albums": len(unique_album_ids),
            "unique_genres": len(genre_counts),
            "top_tracks": top_tracks,
            "top_artists": top_artists,
            "top_genres": top_genres,
            "repeat_rate": round(repeat_rate, 3),
            "concentration": {
                "hhi": round(hhi, 4),
                # "gini": ... (skip for now to keep it simple)
            }
        }

    def compute_time_stats(self, period_start: datetime,
                           period_end: datetime) -> Dict[str, Any]:
        """Hourly and weekday distributions, peak hour, weekend share."""
        plays = self._plays_in_period(period_start, period_end)

        hourly_counts = [0] * 24
        weekday_counts = [0] * 7  # 0=Mon, 6=Sun

        if not plays:
            # FIX: return the full key set so callers never hit a KeyError
            # on an empty period (previously only hourly_distribution).
            return {
                "hourly_distribution": hourly_counts,
                "peak_hour": None,
                "weekday_distribution": weekday_counts,
                "weekend_share": 0
            }

        for p in plays:
            # NOTE(review): played_at is assumed UTC/naive here; localize
            # upstream if per-user timezones are required — TODO confirm.
            hourly_counts[p.played_at.hour] += 1
            weekday_counts[p.played_at.weekday()] += 1

        peak_hour = hourly_counts.index(max(hourly_counts))
        weekend_share = (weekday_counts[5] + weekday_counts[6]) / len(plays)

        return {
            "hourly_distribution": hourly_counts,
            "peak_hour": peak_hour,
            "weekday_distribution": weekday_counts,
            "weekend_share": round(weekend_share, 2)
        }

    def compute_session_stats(self, period_start: datetime,
                              period_end: datetime) -> Dict[str, Any]:
        """Session metrics. A gap of more than 20 minutes between consecutive
        plays starts a new session.
        """
        plays = self._plays_in_period(period_start, period_end, ordered=True)

        if not plays:
            # FIX: mirror the non-empty key set (previously returned
            # "avg_length_minutes", a key the non-empty path never emits).
            return {"count": 0, "avg_tracks": 0, "avg_minutes": 0,
                    "longest_session_minutes": 0}

        sessions = []
        current_session = [plays[0]]
        for prev, curr in zip(plays, plays[1:]):
            gap_minutes = (curr.played_at - prev.played_at).total_seconds() / 60
            if gap_minutes > 20:  # gap threshold => close current session
                sessions.append(current_session)
                current_session = []
            current_session.append(curr)
        sessions.append(current_session)

        session_lengths_min = []
        for sess in sessions:
            if len(sess) > 1:
                # (end - start) underestimates by roughly one track length;
                # kept for simplicity as in the original design.
                span = (sess[-1].played_at - sess[0].played_at).total_seconds() / 60
                session_lengths_min.append(span)
            else:
                session_lengths_min.append(3.0)  # Approx. one track

        avg_min = sum(session_lengths_min) / len(session_lengths_min)

        return {
            "count": len(sessions),
            "avg_tracks": len(plays) / len(sessions),
            "avg_minutes": round(avg_min, 1),
            "longest_session_minutes": round(max(session_lengths_min), 1)
        }

    def compute_vibe_stats(self, period_start: datetime,
                           period_end: datetime) -> Dict[str, Any]:
        """Aggregates audio features (energy, valence, ...) weighted by play
        count, so the result reflects what was actually HEARD in the period.
        """
        plays = self._plays_in_period(period_start, period_end)
        if not plays:
            return {}

        # FIX: one Counter pass instead of an O(tracks * plays) scan that
        # re-filtered the full play list for every track.
        play_counts = Counter(p.track_id for p in plays)
        track_map = self._track_map(play_counts)

        feature_names = ("energy", "valence", "danceability", "tempo",
                         "acousticness", "instrumentalness", "liveness",
                         "speechiness")
        features: Dict[str, list] = {name: [] for name in feature_names}

        for tid, count in play_counts.items():
            t = track_map.get(tid)
            # energy is used as the "features present" sentinel, as before.
            if t is None or t.energy is None:
                continue
            for name in feature_names:
                # Weight by play count: repeat each value once per play.
                features[name].extend([getattr(t, name)] * count)

        stats: Dict[str, Any] = {}
        for key, values in features.items():
            valid = [v for v in values if v is not None]
            if valid:
                stats[f"avg_{key}"] = float(np.mean(valid))
                stats[f"std_{key}"] = float(np.std(valid))
            else:
                stats[f"avg_{key}"] = None
                stats[f"std_{key}"] = None  # FIX: std key was silently missing

        # Derived metric. FIX: explicit None checks — 0.0 is a legitimate
        # feature average and previously (falsy) suppressed the quadrant.
        if (stats.get("avg_energy") is not None
                and stats.get("avg_valence") is not None):
            stats["mood_quadrant"] = {
                "x": round(stats["avg_valence"], 2),
                "y": round(stats["avg_energy"], 2)
            }

        return stats

    def compute_era_stats(self, period_start: datetime,
                          period_end: datetime) -> Dict[str, Any]:
        """Musical age (mean release year) and per-decade distribution."""
        plays = self._plays_in_period(period_start, period_end)
        track_map = self._track_map(p.track_id for p in plays)

        years = []
        for p in plays:
            t = track_map.get(p.track_id)
            if not (t and t.raw_data):
                continue
            release_date = (t.raw_data.get("album") or {}).get("release_date")
            if not release_date:
                continue
            # Format can be YYYY, YYYY-MM, or YYYY-MM-DD.
            try:
                years.append(int(str(release_date).split("-")[0]))
            except ValueError:  # FIX: was a bare except swallowing everything
                pass

        if not years:
            # FIX: include decade_distribution for a stable key set.
            return {"musical_age": None, "decade_distribution": {}}

        avg_year = sum(years) / len(years)

        decades = Counter(f"{(y // 10) * 10}s" for y in years)
        total = len(years)
        decade_dist = {k: round(v / total, 2) for k, v in decades.items()}

        return {
            "musical_age": int(avg_year),
            "decade_distribution": decade_dist
        }

    def compute_skip_stats(self, period_start: datetime,
                           period_end: datetime) -> Dict[str, Any]:
        """Boredom-skip detection: a play is a skip when the next play starts
        more than 10 seconds before the current track could have finished:
        (next.played_at - current.played_at) < (duration_sec - 10).
        """
        plays = self._plays_in_period(period_start, period_end, ordered=True)
        if len(plays) < 2:
            return {"skip_rate": 0, "total_skips": 0}

        track_map = self._track_map(p.track_id for p in plays)

        skips = 0
        for current_play, next_play in zip(plays, plays[1:]):
            track = track_map.get(current_play.track_id)
            if not track or not track.duration_ms:
                continue
            diff_seconds = (next_play.played_at
                            - current_play.played_at).total_seconds()
            duration_sec = track.duration_ms / 1000.0
            # 10s tolerance absorbs clock jitter and track-end crossfades.
            # NOTE(review): Spotify's recently-played API only reports plays
            # of 30s+ — sub-30s skips are invisible here.
            if diff_seconds < (duration_sec - 10):
                skips += 1

        return {
            "total_skips": skips,
            "skip_rate": round(skips / len(plays), 3)
        }

    def generate_full_report(self, period_start: datetime,
                             period_end: datetime) -> Dict[str, Any]:
        """Bundle every metric family into one report dict for the period."""
        return {
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "volume": self.compute_volume_stats(period_start, period_end),
            "time_habits": self.compute_time_stats(period_start, period_end),
            "sessions": self.compute_session_stats(period_start, period_end),
            "vibe": self.compute_vibe_stats(period_start, period_end),
            "era": self.compute_era_stats(period_start, period_end),
            "skips": self.compute_skip_stats(period_start, period_end)
        }