Files
MusicAnalyser/backend/app/services/stats_service.py
2025-12-25 17:48:41 +04:00

666 lines
26 KiB
Python

import math
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

import numpy as np
from sqlalchemy import desc, distinct, func
from sqlalchemy.orm import Session, joinedload

from ..models import AnalysisSnapshot, Artist, PlayHistory, Track
class StatsService:
def __init__(self, db: Session):
    """Store the SQLAlchemy session that the compute_* methods query through."""
    self.db = db
from sqlalchemy.orm import joinedload # Add this to imports
def compute_comparison(
    self,
    current_stats: Dict[str, Any],
    period_start: datetime,
    period_end: datetime,
) -> Dict[str, Any]:
    """
    Compare the current period with the equally long window right before it.

    The earlier window ends at ``period_start`` and has the same duration as
    ``period_end - period_start``. Volume, vibe and taste stats are recomputed
    for that window and turned into deltas against ``current_stats``.

    Args:
        current_stats: Report for the current period; its ``"volume"``,
            ``"vibe"`` and ``"taste"`` sections are read.
        period_start: Start of the current period.
        period_end: End of the current period.

    Returns:
        A dict with ``"previous_period"`` (ISO start/end strings) and
        ``"deltas"``. Energy/valence deltas appear only when both periods have
        a ``"mood_quadrant"``; the popularity delta only when both have
        ``"avg_popularity"``.
    """
    window = period_end - period_start
    earlier_end = period_start
    earlier_start = earlier_end - window

    # Only the sections needed for the deltas are recomputed, not a full report.
    earlier_volume = self.compute_volume_stats(earlier_start, earlier_end)
    earlier_vibe = self.compute_vibe_stats(earlier_start, earlier_end)
    earlier_taste = self.compute_taste_stats(earlier_start, earlier_end)

    # Plays
    plays_now = current_stats["volume"]["total_plays"]
    plays_before = earlier_volume["total_plays"]
    plays_diff = plays_now - plays_before
    if plays_before:
        pct_change = round((plays_diff / plays_before) * 100, 1)
    else:
        # No baseline to divide by; the percentage is reported as 0.
        pct_change = 0
    deltas = {"plays_delta": plays_diff, "plays_pct_change": pct_change}

    # Energy (y axis) and valence (x axis) of the mood quadrant
    vibe_now = current_stats["vibe"]
    if "mood_quadrant" in vibe_now and "mood_quadrant" in earlier_vibe:
        quadrant_now = vibe_now["mood_quadrant"]
        quadrant_before = earlier_vibe["mood_quadrant"]
        deltas["energy_delta"] = round(quadrant_now["y"] - quadrant_before["y"], 2)
        deltas["valence_delta"] = round(quadrant_now["x"] - quadrant_before["x"], 2)

    # Popularity
    taste_now = current_stats["taste"]
    if "avg_popularity" in taste_now and "avg_popularity" in earlier_taste:
        popularity_shift = taste_now["avg_popularity"] - earlier_taste["avg_popularity"]
        deltas["popularity_delta"] = round(popularity_shift, 1)

    previous_period = {
        "start": earlier_start.isoformat(),
        "end": earlier_end.isoformat(),
    }
    return {"previous_period": previous_period, "deltas": deltas}
def compute_volume_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Calculate listening-volume metrics, including concentration (HHI, Gini)
    and the one-and-done rate.

    Plays are selected with ``period_start <= played_at <= period_end``.
    Tracks and their artists are eager-loaded with the plays, and the names
    shown in the top lists are taken from those loaded objects rather than
    from additional per-row queries.

    Returns:
        A dict with ``total_plays``, ``estimated_minutes``, unique
        track/artist/album/genre counts, top-5 ``top_tracks`` /
        ``top_artists`` / ``top_genres``, ``repeat_rate``,
        ``one_and_done_rate`` and ``concentration`` (``hhi``, ``gini``,
        ``top_1_share``). When the period has no plays all numbers are 0,
        the lists are empty and ``concentration`` is an empty dict.
    """
    # Eager load tracks AND artists so the loop below triggers no lazy loads.
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track).joinedload(Track.artists))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    total_plays = len(plays)
    if total_plays == 0:
        return {
            "total_plays": 0, "estimated_minutes": 0, "unique_tracks": 0,
            "unique_artists": 0, "unique_albums": 0, "unique_genres": 0,
            "top_tracks": [], "top_artists": [], "top_genres": [],
            "repeat_rate": 0, "one_and_done_rate": 0, "concentration": {},
        }

    total_ms = 0
    track_counts = {}
    tracks_by_id = {}   # loaded Track objects, reused for the top-tracks list
    artist_counts = {}
    artist_names = {}   # loaded artist names, reused for the top-artists list
    genre_counts = {}
    album_ids = set()
    for p in plays:
        t = p.track
        if not t:
            continue
        total_ms += t.duration_ms or 0

        # Track counts
        track_counts[t.id] = track_counts.get(t.id, 0) + 1
        tracks_by_id[t.id] = t

        # Album identity: prefer the id stored in raw_data, else the album name.
        album_meta = t.raw_data.get("album") if t.raw_data else None
        if album_meta and "id" in album_meta:
            album_ids.add(album_meta["id"])
        elif t.album:
            # A missing album name is not counted as an album of its own.
            album_ids.add(t.album)

        # Artist and genre counts (iterate the related objects, not a string)
        for artist in t.artists:
            artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1
            artist_names[artist.id] = artist.name
            for g in artist.genres or ():
                genre_counts[g] = genre_counts.get(g, 0) + 1

    # Derived metrics
    unique_tracks = len(track_counts)
    one_and_done = sum(1 for c in track_counts.values() if c == 1)

    # Top lists (stable sort: ties keep first-seen order)
    top_tracks = [
        {"name": tracks_by_id[tid].name, "artist": tracks_by_id[tid].artist, "count": c}
        for tid, c in sorted(track_counts.items(), key=lambda kv: kv[1], reverse=True)[:5]
    ]
    top_artists = [
        {"name": artist_names.get(aid, "Unknown"), "count": c}
        for aid, c in sorted(artist_counts.items(), key=lambda kv: kv[1], reverse=True)[:5]
    ]
    top_genres = [
        {"name": name, "count": c}
        for name, c in sorted(genre_counts.items(), key=lambda kv: kv[1], reverse=True)[:5]
    ]

    # Concentration
    # HHI: sum of squared per-track play shares.
    shares = [c / total_plays for c in track_counts.values()]
    hhi = sum(s ** 2 for s in shares)

    # Gini coefficient of the per-track share distribution, computed from the
    # ascending-sorted shares.
    sorted_shares = sorted(shares)
    n = len(sorted_shares)
    if n > 0:
        weighted = sum((i + 1) * x for i, x in enumerate(sorted_shares))
        gini = (2 * weighted) / (n * sum(sorted_shares)) - (n + 1) / n
    else:
        # Every play lacked a track row, so there is no distribution.
        gini = 0

    return {
        "total_plays": total_plays,
        "estimated_minutes": int(total_ms / 60000),
        "unique_tracks": unique_tracks,
        "unique_artists": len(artist_counts),
        "unique_albums": len(album_ids),
        "unique_genres": len(genre_counts),
        "top_tracks": top_tracks,
        "top_artists": top_artists,
        "top_genres": top_genres,
        "repeat_rate": round((total_plays - unique_tracks) / total_plays, 3) if total_plays else 0,
        "one_and_done_rate": round(one_and_done / unique_tracks, 3) if unique_tracks else 0,
        "concentration": {
            "hhi": round(hhi, 4),
            "gini": round(gini, 4),
            "top_1_share": round(max(shares), 3) if shares else 0,
        },
    }
def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Summarise when listening happened: hour and weekday histograms,
    part-of-day buckets and streaks of consecutive active days.

    Returns an empty dict when no play falls inside
    ``period_start <= played_at <= period_end``.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {}

    by_hour = [0] * 24
    by_weekday = [0] * 7
    buckets = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
    days_seen = set()  # calendar dates with at least one play
    for play in plays:
        stamp = play.played_at
        hour = stamp.hour
        by_hour[hour] += 1
        by_weekday[stamp.weekday()] += 1
        days_seen.add(stamp.date())

        # night = 22:00-04:59, morning = 05-11, afternoon = 12-16, evening = 17-21
        if hour < 5 or hour >= 22:
            bucket = "night"
        elif hour < 12:
            bucket = "morning"
        elif hour < 17:
            bucket = "afternoon"
        else:
            bucket = "evening"
        buckets[bucket] += 1

    # Streaks: walk neighbouring active dates; a gap other than exactly one
    # day closes the running streak. At least one date exists at this point.
    ordered_days = sorted(days_seen)
    run = 1
    best = 1
    for earlier, later in zip(ordered_days, ordered_days[1:]):
        if (later - earlier).days == 1:
            run += 1
        else:
            best = max(best, run)
            run = 1
    best = max(best, run)

    # weekday() indexes 5 and 6 are Saturday and Sunday.
    weekend_plays = by_weekday[5] + by_weekday[6]
    return {
        "hourly_distribution": by_hour,
        "peak_hour": by_hour.index(max(by_hour)),
        "weekday_distribution": by_weekday,
        "weekend_share": round(weekend_plays / len(plays), 2),
        "part_of_day": buckets,
        # Streak that is still running at the last active date of the period.
        "listening_streak": run,
        "longest_streak": best,
        "active_days": len(days_seen),
    }
def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Group plays into sessions and describe them: micro-session and
    marathon-session rates, session lengths and energy arcs.

    A new session starts whenever two consecutive plays are more than
    20 minutes apart. Returns ``{"count": 0}`` when the period has no plays.
    """
    # Tracks are loaded together with the plays for the energy-arc analysis.
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {"count": 0}

    # 1. Sessionization (gap > 20 mins opens a new session)
    sessions = [[plays[0]]]
    for before, after in zip(plays, plays[1:]):
        gap_minutes = (after.played_at - before.played_at).total_seconds() / 60
        if gap_minutes > 20:
            sessions.append([])
        sessions[-1].append(after)

    # 2. Analyze sessions
    durations = []
    micro_total = 0
    marathon_total = 0
    arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
    for session in sessions:
        size = len(session)
        opener = session[0]
        closer = session[-1]

        # Duration runs from the first to the last play timestamp; a session
        # with a single play has no span and is approximated as 3 minutes.
        if size > 1:
            durations.append((closer.played_at - opener.played_at).total_seconds() / 60)
        else:
            durations.append(3.0)

        # Session types
        if size <= 3:
            micro_total += 1
        if size >= 20:
            marathon_total += 1

        # Energy arc: energy of the last track compared with the first one.
        first_track = opener.track
        last_track = closer.track
        if (
            first_track
            and last_track
            and first_track.energy is not None
            and last_track.energy is not None
        ):
            change = last_track.energy - first_track.energy
            if change > 0.1:
                arc = "rising"
            elif change < -0.1:
                arc = "falling"
            else:
                arc = "flat"
        else:
            arc = "unknown"
        arcs[arc] += 1

    session_total = len(sessions)
    mean_minutes = sum(durations) / len(durations) if durations else 0
    return {
        "count": session_total,
        "avg_tracks": round(len(plays) / session_total, 1),
        "avg_minutes": round(mean_minutes, 1),
        "longest_session_minutes": round(max(durations), 1) if durations else 0,
        "micro_session_rate": round(micro_total / session_total, 2),
        "marathon_session_rate": round(marathon_total / session_total, 2),
        "energy_arcs": arcs,
    }
def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Aggregate audio features over the period and calculate "whiplash"
    (the average feature jump between consecutive tracks).

    Returns:
        An empty dict when the period has no plays. Otherwise a dict with
        ``avg_<feature>`` / ``std_<feature>`` for each audio feature (both
        ``None`` when no value is available), ``mood_quadrant`` and
        ``consistency_score`` when average energy and valence exist, and
        ``whiplash`` with the mean absolute jump in tempo, energy and
        valence (0 when no transition qualified).
    """
    # Plays must be ordered by time for the transition analysis.
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {}

    track_ids = list({p.track_id for p in plays})
    tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
    track_map = {t.id: t for t in tracks}

    # 1. Aggregates
    feature_names = [
        "energy", "valence", "danceability", "tempo", "acousticness",
        "instrumentalness", "liveness", "speechiness", "loudness",
    ]
    features = {name: [] for name in feature_names}

    # 2. Transition arrays (for whiplash)
    transitions = {"tempo": [], "energy": [], "valence": []}

    previous_track = None
    for i, p in enumerate(plays):
        t = track_map.get(p.track_id)
        if not t:
            # A play without a track row breaks the chain: the next track is
            # not adjacent to the last known one, so no transition is formed
            # across it. This also keeps plays[i - 1] below in sync with
            # previous_track.
            previous_track = None
            continue

        # A track contributes its features only when energy is present; None
        # values of the other features are filtered out before averaging.
        if t.energy is not None:
            for name in feature_names:
                features[name].append(getattr(t, name))

        if previous_track is not None:
            # Only count the transition when the two plays are less than
            # 5 minutes apart, i.e. assumed continuous listening.
            time_diff = (p.played_at - plays[i - 1].played_at).total_seconds()
            if time_diff < 300:
                # Tempo keeps the original truthiness check, so a tempo of 0
                # is treated as missing.
                if t.tempo and previous_track.tempo:
                    transitions["tempo"].append(abs(t.tempo - previous_track.tempo))
                # Energy and valence may legitimately be 0.0, so only None
                # counts as missing.
                for key in ("energy", "valence"):
                    current_value = getattr(t, key)
                    previous_value = getattr(previous_track, key)
                    if current_value is not None and previous_value is not None:
                        transitions[key].append(abs(current_value - previous_value))
        previous_track = t

    # Calculate stats
    stats = {}
    for key, values in features.items():
        valid = [v for v in values if v is not None]
        if valid:
            stats[f"avg_{key}"] = float(np.mean(valid))
            stats[f"std_{key}"] = float(np.std(valid))
        else:
            stats[f"avg_{key}"] = None
            stats[f"std_{key}"] = None

    # Derived metrics
    if stats.get("avg_energy") is not None and stats.get("avg_valence") is not None:
        stats["mood_quadrant"] = {
            "x": round(stats["avg_valence"], 2),
            "y": round(stats["avg_energy"], 2),
        }
        # Consistency: 1 minus the mean standard deviation of the two mood
        # components, so a higher score means more consistent listening.
        avg_std = (stats["std_energy"] + stats["std_valence"]) / 2
        stats["consistency_score"] = round(1.0 - avg_std, 2)

    # Whiplash scores (average jump between tracks)
    stats["whiplash"] = {
        key: round(float(np.mean(jumps)), 2) if jumps else 0
        for key, jumps in transitions.items()
    }
    return stats
def compute_era_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Describe the release era of the music played: average release year,
    nostalgia gap and a per-decade breakdown.

    Release years are read from ``raw_data["album"]["release_date"]`` of each
    played track (the text before the first ``-``). Every play contributes
    its year, so the averages are weighted by play count.

    Returns:
        ``{"musical_age": None}`` when no release year could be read.
        Otherwise ``musical_age`` (average release year, truncated),
        ``nostalgia_gap`` (current UTC year minus that average, truncated),
        ``freshness_score`` (share of plays from the current decade) and
        ``decade_distribution`` (share of plays per decade label).
    """
    # Tracks are loaded together with the plays to reach raw_data.
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )

    years = []
    for p in plays:
        t = p.track
        if not (t and t.raw_data and "album" in t.raw_data):
            continue
        album = t.raw_data["album"]
        release_date = album.get("release_date") if album else None
        if not release_date:
            continue
        try:
            years.append(int(release_date.split("-")[0]))
        except (AttributeError, ValueError):
            # Non-string or malformed release date: skip this play only.
            continue

    if not years:
        return {"musical_age": None}

    # Average release year, weighted by plays.
    avg_year = sum(years) / len(years)
    current_year = datetime.now(timezone.utc).year

    # Decade distribution
    decades = {}
    for y in years:
        label = f"{(y // 10) * 10}s"
        decades[label] = decades.get(label, 0) + 1
    total = len(years)
    dist = {label: round(count / total, 3) for label, count in decades.items()}

    current_decade = f"{(current_year // 10) * 10}s"
    return {
        "musical_age": int(avg_year),
        "nostalgia_gap": int(current_year - avg_year),
        "freshness_score": dist.get(current_decade, 0),  # Share of current decade
        "decade_distribution": dist,
    }
def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Detect boredom skips between consecutive plays.

    A play counts as skipped when the next play's timestamp follows it by
    less than the track's duration minus 10 seconds:
    (next.played_at - current.played_at) < (current.duration_ms / 1000 - 10s)

    Pairs whose current track is unknown or has no duration are ignored.
    The rate is skips divided by the number of plays in the period.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    # At least two plays are needed to form a pair.
    if len(plays) < 2:
        return {"skip_rate": 0, "total_skips": 0}

    wanted_ids = list({play.track_id for play in plays})
    found_tracks = self.db.query(Track).filter(Track.id.in_(wanted_ids)).all()
    track_lookup = {track.id: track for track in found_tracks}

    skipped = 0
    for current_play, next_play in zip(plays, plays[1:]):
        track = track_lookup.get(current_play.track_id)
        if track and track.duration_ms:
            elapsed_seconds = (next_play.played_at - current_play.played_at).total_seconds()
            full_length_seconds = track.duration_ms / 1000.0
            # NOTE: very small or zero gaps (re-plays) are not filtered out
            # and therefore count as skips as well.
            if elapsed_seconds < full_length_seconds - 10:
                skipped += 1

    return {
        "total_skips": skipped,
        "skip_rate": round(skipped / len(plays), 3),
    }
def compute_context_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Classify each play by its context_uri (playlist, album, artist,
    collection or unknown) and report the share of every type.

    The type is the first of "playlist", "album", "artist", "collection"
    that occurs as a substring of the URI. Returns an empty dict when the
    period has no plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {}

    # Checked in this order; the first substring match wins.
    known_kinds = ("playlist", "album", "artist", "collection")
    kind_totals = {"playlist": 0, "album": 0, "artist": 0, "collection": 0, "unknown": 0}
    plays_per_uri = {}
    for play in plays:
        uri = play.context_uri
        if not uri:
            kind_totals["unknown"] += 1
            continue
        # Per-URI tally, used for loyalty and the top-contexts list.
        plays_per_uri[uri] = plays_per_uri.get(uri, 0) + 1
        # "collection" is how "Liked Songs" usually shows up.
        kind = next((name for name in known_kinds if name in uri), "unknown")
        kind_totals[kind] += 1

    play_total = len(plays)
    breakdown = {kind: round(count / play_total, 2) for kind, count in kind_totals.items()}

    # Top 5 contexts by play count (URIs only; names are not resolved here).
    ranked = sorted(plays_per_uri.items(), key=lambda pair: pair[1], reverse=True)
    top_five = [{"uri": uri, "count": count} for uri, count in ranked[:5]]

    if plays_per_uri:
        loyalty = round(play_total / len(plays_per_uri), 2)
    else:
        loyalty = 0
    return {
        "type_breakdown": breakdown,
        "album_purist_score": breakdown["album"],
        "playlist_dependency": breakdown["playlist"],
        "context_loyalty": loyalty,
        "top_context_uris": top_five,
    }
def compute_taste_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Mainstream vs. hipster analysis based on Track.popularity (0-100).

    Every play contributes the popularity of its track, so the scores are
    weighted by plays. Returns an empty dict when the period has no plays,
    and zeroed ``avg_popularity`` / ``hipster_score`` when no played track
    has a popularity value.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {}

    wanted_ids = list({play.track_id for play in plays})
    found_tracks = self.db.query(Track).filter(Track.id.in_(wanted_ids)).all()
    track_lookup = {track.id: track for track in found_tracks}

    # One popularity value per play whose track is known and rated.
    resolved = (track_lookup.get(play.track_id) for play in plays)
    popularity = [track.popularity for track in resolved if track and track.popularity is not None]
    if not popularity:
        return {"avg_popularity": 0, "hipster_score": 0}

    sample_size = len(popularity)
    mean_popularity = float(np.mean(popularity))
    # Underground: popularity below 30. Mainstream: popularity above 70.
    underground = sum(1 for value in popularity if value < 30)
    mainstream = sum(1 for value in popularity if value > 70)

    return {
        "avg_popularity": round(mean_popularity, 1),
        "hipster_score": round((underground / sample_size) * 100, 1),
        "mainstream_score": round((mainstream / sample_size) * 100, 1),
        "obscurity_rating": round(100 - mean_popularity, 1),
    }
def compute_lifecycle_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Split the period's listening into 'New Discoveries' and 'Old Favorites'.

    A track is a new discovery when it has no play earlier than
    ``period_start``. Returns an empty dict when the period has no plays.
    """
    # 1. Plays (and the distinct tracks) inside the period
    plays_in_period = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays_in_period:
        return {}
    played_ids = {play.track_id for play in plays_in_period}

    # 2. Which of those tracks already appear in history before period_start
    earlier_rows = (
        self.db.query(distinct(PlayHistory.track_id))
        .filter(
            PlayHistory.track_id.in_(played_ids),
            PlayHistory.played_at < period_start,
        )
        .all()
    )
    heard_before = {row[0] for row in earlier_rows}

    # 3. Discovery: tracks with no earlier play, and the plays spent on them
    discoveries = played_ids - heard_before
    discovery_plays = sum(1 for play in plays_in_period if play.track_id in discoveries)

    # Non-zero here: the empty case returned above.
    play_total = len(plays_in_period)
    return {
        "discovery_count": len(discoveries),
        "discovery_rate": round(discovery_plays / play_total, 3),
        "recurrence_rate": round((play_total - discovery_plays) / play_total, 3),
    }
def compute_explicit_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Analyze explicit content consumption.

    A play is explicit when its track's ``raw_data`` has a truthy
    ``"explicit"`` entry. Plays without a track, or whose track has no
    ``raw_data``, count as not explicit.

    Returns:
        ``explicit_rate`` (explicit plays / all plays),
        ``total_explicit_plays`` and ``hourly_explicit_distribution`` (24
        values: explicit share of the plays in each hour of day, 0.0 for
        hours without plays). For a period without plays the same keys are
        returned with zero values, plus the ``hourly_explicit_rate`` key that
        the empty result has always carried.
    """
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {
            "explicit_rate": 0,
            "total_explicit_plays": 0,
            "hourly_explicit_distribution": [0.0] * 24,
            # Kept so existing readers of the empty result keep working.
            "hourly_explicit_rate": [],
        }

    total_plays = len(plays)
    explicit_count = 0
    hourly_explicit = [0] * 24
    hourly_total = [0] * 24
    for p in plays:
        h = p.played_at.hour
        hourly_total[h] += 1

        # The explicit flag lives in the track's raw_data; a play whose track
        # row is missing must not crash the report.
        t = p.track
        if t and t.raw_data and t.raw_data.get("explicit"):
            explicit_count += 1
            hourly_explicit[h] += 1

    # Explicit share per hour of day
    hourly_rates = [
        round(explicit / total, 2) if total > 0 else 0.0
        for explicit, total in zip(hourly_explicit, hourly_total)
    ]

    return {
        "explicit_rate": round(explicit_count / total_plays, 3),
        "total_explicit_plays": explicit_count,
        "hourly_explicit_distribution": hourly_rates,
    }
def generate_full_report(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Build the complete statistics report for one period.

    Each section is produced by the matching compute_* method for the same
    window; the ``"comparison"`` section is added last and is computed from
    the report assembled so far.
    """
    # 1. Calculate all current stats, one section per compute method.
    sections = (
        ("volume", self.compute_volume_stats),
        ("time_habits", self.compute_time_stats),
        ("sessions", self.compute_session_stats),
        ("context", self.compute_context_stats),
        ("vibe", self.compute_vibe_stats),
        ("era", self.compute_era_stats),
        ("taste", self.compute_taste_stats),
        ("lifecycle", self.compute_lifecycle_stats),
        ("flags", self.compute_explicit_stats),
        ("skips", self.compute_skip_stats),
    )
    report = {
        "period": {"start": period_start.isoformat(), "end": period_end.isoformat()},
    }
    for section_name, compute in sections:
        report[section_name] = compute(period_start, period_end)

    # 2. Calculate comparison against the preceding window.
    report["comparison"] = self.compute_comparison(report, period_start, period_end)
    return report