Files
MusicAnalyser/backend/app/services/stats_service.py
bnair123 af0d985253 Refactor Stats and Narrative services to match spec
- StatsService: Fixed N+1 queries, added missing metrics (whiplash, entropy, lifecycle), and improved correctness (boundary checks, null handling).
- NarrativeService: Added payload shaping for token efficiency, improved JSON robustness, and updated prompts to align with persona specs.
- Documentation: Added backend/TECHNICAL_DOCS.md detailing the logic.
2025-12-25 18:12:05 +04:00

714 lines
28 KiB
Python

import math
from datetime import datetime, timedelta, timezone
from typing import Dict, Any, List, Optional

import numpy as np
from sqlalchemy import func, distinct
from sqlalchemy.orm import Session, joinedload

from ..models import PlayHistory, Track, Artist
class StatsService:
def __init__(self, db: Session):
    """Keep a reference to the SQLAlchemy session that every query in this service runs on."""
    self.db = db
def compute_comparison(self, current_stats: Dict[str, Any], period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Compare the current period against the window of equal length that ends
    where the current one starts.

    Args:
        current_stats: Report for the current period; its "volume", "vibe" and
            "taste" sections are read.
        period_start: Start of the current period.
        period_end: End of the current period.

    Returns:
        A dict with "previous_period" (ISO-formatted start/end) and "deltas".
        Play deltas are always present; energy/valence and popularity deltas
        are added only when both periods expose the required keys.
    """
    window = period_end - period_start
    previous_end = period_start
    previous_start = previous_end - window

    # Only the sections that feed a delta are recomputed for the earlier window.
    previous_volume = self.compute_volume_stats(previous_start, previous_end)
    previous_vibe = self.compute_vibe_stats(previous_start, previous_end)
    previous_taste = self.compute_taste_stats(previous_start, previous_end)

    # Plays
    plays_now = current_stats["volume"]["total_plays"]
    plays_before = previous_volume["total_plays"]
    deltas = {
        "plays_delta": plays_now - plays_before,
        "plays_pct_change": self._pct_change(plays_now, plays_before),
    }

    # Energy & Valence (mood_quadrant stores valence on x and energy on y)
    current_vibe = current_stats["vibe"]
    if "mood_quadrant" in current_vibe and "mood_quadrant" in previous_vibe:
        mood_now = current_vibe["mood_quadrant"]
        mood_before = previous_vibe["mood_quadrant"]
        deltas["energy_delta"] = round(mood_now["y"] - mood_before["y"], 2)
        deltas["valence_delta"] = round(mood_now["x"] - mood_before["x"], 2)

    # Popularity
    current_taste = current_stats["taste"]
    if "avg_popularity" in current_taste and "avg_popularity" in previous_taste:
        deltas["popularity_delta"] = round(
            current_taste["avg_popularity"] - previous_taste["avg_popularity"], 1
        )

    previous_period = {
        "start": previous_start.isoformat(),
        "end": previous_end.isoformat(),
    }
    return {"previous_period": previous_period, "deltas": deltas}
def compute_volume_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Summarise listening volume for plays in [period_start, period_end).

    Returns totals (plays, minutes estimated from full track durations, unique
    tracks/artists/albums/genres), top-5 lists, repeat and one-and-done rates,
    and a "concentration" section (HHI, Gini, top-1/top-5 share, genre
    entropy). When the window holds no plays the result of
    _empty_volume_stats() is returned instead.
    """
    # Tracks and their artists are eager-loaded so the loops below never fall
    # back to per-row lookups. The upper bound is exclusive so a play sitting on
    # a period boundary is counted in exactly one period.
    window_plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track).joinedload(Track.artists))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .all()
    )
    total_plays = len(window_plays)
    if total_plays == 0:
        return self._empty_volume_stats()

    total_ms = 0
    plays_by_track = {}
    plays_by_artist = {}
    plays_by_genre = {}
    plays_by_album = {}
    # Lookup tables so names can be resolved later without touching the DB.
    tracks_by_id = {}
    artist_names = {}
    album_names = {}

    for play in window_plays:
        track = play.track
        if not track:
            continue
        total_ms += track.duration_ms or 0

        # Track aggregation
        plays_by_track[track.id] = plays_by_track.get(track.id, 0) + 1
        tracks_by_id[track.id] = track

        # Album aggregation: prefer the id/name stored in raw_data, otherwise
        # fall back to the plain album value on the track.
        album_key = album_label = track.album
        raw = track.raw_data
        if raw and "album" in raw:
            album_info = raw["album"]
            album_key = album_info.get("id", track.album)
            album_label = album_info.get("name", track.album)
        plays_by_album[album_key] = plays_by_album.get(album_key, 0) + 1
        album_names[album_key] = album_label

        # Artist aggregation works on the related artist objects.
        for artist in track.artists:
            plays_by_artist[artist.id] = plays_by_artist.get(artist.id, 0) + 1
            artist_names[artist.id] = artist.name
            # Genre aggregation: artist.genres is a JSON list of strings.
            for genre in artist.genres or ():
                plays_by_genre[genre] = plays_by_genre.get(genre, 0) + 1

    # Derived metrics
    per_track_counts = list(plays_by_track.values())
    unique_tracks = len(plays_by_track)
    one_and_done = sum(1 for count in per_track_counts if count == 1)
    shares = [count / total_plays for count in per_track_counts]

    def top_five(counts):
        # Five largest entries; the stable sort keeps first-seen order on ties.
        return sorted(counts.items(), key=lambda item: item[1], reverse=True)[:5]

    top_tracks = []
    for track_id, count in top_five(plays_by_track):
        track = tracks_by_id[track_id]
        top_tracks.append({
            "name": track.name,
            "artist": ", ".join(a.name for a in track.artists),
            "count": count,
        })
    top_artists = [
        {"name": artist_names.get(artist_id, "Unknown"), "count": count}
        for artist_id, count in top_five(plays_by_artist)
    ]
    top_albums = [
        {"name": album_names.get(album_key, "Unknown"), "count": count}
        for album_key, count in top_five(plays_by_album)
    ]
    top_genres = [
        {"name": genre, "count": count}
        for genre, count in top_five(plays_by_genre)
    ]

    # Concentration metrics
    # HHI: sum of squared play shares.
    hhi = sum(share ** 2 for share in shares)

    # Gini coefficient over the ascending share list.
    gini = 0
    n = len(shares)
    if n > 0:
        ascending = sorted(shares)
        weighted = sum((rank + 1) * share for rank, share in enumerate(ascending))
        gini = (2 * weighted) / (n * sum(ascending)) - (n + 1) / n

    # Genre entropy: -SUM(p * log(p)) using the natural logarithm.
    genre_entropy = 0
    genre_total = sum(plays_by_genre.values())
    if genre_total > 0:
        genre_probs = (count / genre_total for count in plays_by_genre.values())
        genre_entropy = -sum(p * math.log(p) for p in genre_probs if p > 0)

    # Share of all plays taken by the five most played tracks.
    top_5_plays = sum(entry["count"] for entry in top_tracks)
    top_5_share = top_5_plays / total_plays if total_plays else 0

    concentration = {
        "hhi": round(hhi, 4),
        "gini": round(gini, 4),
        "top_1_share": round(max(shares), 3) if shares else 0,
        "top_5_share": round(top_5_share, 3),
        "genre_entropy": round(genre_entropy, 2),
    }
    return {
        "total_plays": total_plays,
        "estimated_minutes": int(total_ms / 60000),
        "unique_tracks": unique_tracks,
        "unique_artists": len(plays_by_artist),
        "unique_albums": len(plays_by_album),
        "unique_genres": len(plays_by_genre),
        "top_tracks": top_tracks,
        "top_artists": top_artists,
        "top_albums": top_albums,
        "top_genres": top_genres,
        "repeat_rate": round((total_plays - unique_tracks) / total_plays, 3) if total_plays else 0,
        "one_and_done_rate": round(one_and_done / unique_tracks, 3) if unique_tracks else 0,
        "concentration": concentration,
    }
def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Describe when listening happened for plays in [period_start, period_end).

    Returns hourly and weekday histograms, the peak hour, weekend share,
    part-of-day buckets, streaks over consecutive active dates and plays per
    active day. "listening_streak" is the run of consecutive active days that
    ends on the last active day in the window. Returns {} when there are no
    plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {}

    hourly_counts = [0] * 24
    weekday_counts = [0] * 7
    # Spec: Morning (6-12), Afternoon (12-18), Evening (18-24), Night (0-6)
    part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
    # Six-hour buckets indexed by hour // 6.
    bucket_by_quarter = ("night", "morning", "afternoon", "evening")
    active_dates = set()

    for play in plays:
        moment = play.played_at
        hour = moment.hour
        hourly_counts[hour] += 1
        weekday_counts[moment.weekday()] += 1
        active_dates.add(moment.date())
        part_of_day[bucket_by_quarter[hour // 6]] += 1

    # Streaks: walk neighbouring active dates and count one-day steps.
    ordered_days = sorted(active_dates)
    current_streak = 0
    longest_streak = 0
    if ordered_days:
        current_streak = longest_streak = 1
        for earlier, later in zip(ordered_days, ordered_days[1:]):
            if (later - earlier).days == 1:
                current_streak += 1
            else:
                longest_streak = max(longest_streak, current_streak)
                current_streak = 1
        longest_streak = max(longest_streak, current_streak)

    play_count = len(plays)
    active_days_count = len(active_dates)
    # weekday() numbers Monday as 0, so indexes 5 and 6 are Saturday and Sunday.
    weekend_plays = weekday_counts[5] + weekday_counts[6]

    return {
        "hourly_distribution": hourly_counts,
        "peak_hour": hourly_counts.index(max(hourly_counts)),
        "weekday_distribution": weekday_counts,
        "weekend_share": round(weekend_plays / play_count, 2),
        "part_of_day": part_of_day,
        "listening_streak": current_streak,
        "longest_streak": longest_streak,
        "active_days": active_days_count,
        "avg_plays_per_active_day": round(play_count / active_days_count, 1) if active_days_count else 0,
    }
def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Group plays in [period_start, period_end) into sessions and summarise them.

    A new session starts whenever two consecutive plays are more than 20
    minutes apart. The result covers session count, average tracks, average /
    median / longest duration in minutes, sessions per active day, start-hour
    histogram, micro (<= 3 plays) and marathon (>= 20 plays) rates, and energy
    arcs comparing the first and last track of each session. Returns
    {"count": 0} when there are no plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {"count": 0}

    # 1. Sessionization (gap > 20 mins closes the running session)
    sessions = []
    running = [plays[0]]
    for previous, play in zip(plays, plays[1:]):
        gap_minutes = (play.played_at - previous.played_at).total_seconds() / 60
        if gap_minutes > 20:
            sessions.append(running)
            running = []
        running.append(play)
    sessions.append(running)

    # 2. Analyze sessions
    lengths_min = []
    micro_sessions = 0
    marathon_sessions = 0
    energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
    start_hour_dist = [0] * 24

    for session in sessions:
        opener, closer = session[0], session[-1]
        size = len(session)

        start_hour_dist[opener.played_at.hour] += 1

        # Duration spans first to last played_at; a lone play gets a nominal
        # three minutes.
        if size > 1:
            lengths_min.append((closer.played_at - opener.played_at).total_seconds() / 60)
        else:
            lengths_min.append(3.0)

        if size <= 3:
            micro_sessions += 1
        if size >= 20:
            marathon_sessions += 1

        # Energy arc: last track's energy minus the first's, with a 0.1 dead band.
        first_track = opener.track
        last_track = closer.track
        if not (first_track and last_track) or first_track.energy is None or last_track.energy is None:
            arc = "unknown"
        else:
            change = last_track.energy - first_track.energy
            if change > 0.1:
                arc = "rising"
            elif change < -0.1:
                arc = "falling"
            else:
                arc = "flat"
        energy_arcs[arc] += 1

    avg_min = np.mean(lengths_min) if lengths_min else 0
    median_min = np.median(lengths_min) if lengths_min else 0

    # Sessions per day, counting only days that have at least one play.
    active_days = len({play.played_at.date() for play in plays})
    session_count = len(sessions)
    sessions_per_day = session_count / active_days if active_days else 0

    return {
        "count": session_count,
        "avg_tracks": round(len(plays) / session_count, 1),
        "avg_minutes": round(float(avg_min), 1),
        "median_minutes": round(float(median_min), 1),
        "longest_session_minutes": round(max(lengths_min), 1) if lengths_min else 0,
        "sessions_per_day": round(sessions_per_day, 1),
        "start_hour_distribution": start_hour_dist,
        "micro_session_rate": round(micro_sessions / session_count, 2),
        "marathon_session_rate": round(marathon_sessions / session_count, 2),
        "energy_arcs": energy_arcs,
    }
def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Aggregate audio features for plays in [period_start, period_end).

    For every feature that has at least one non-null value the result holds
    avg_/std_/p10_/p50_/p90_ entries; a feature with no values only gets
    avg_<feature> = None. The derived entries (mood_quadrant,
    consistency_score, rhythm_profile, texture_profile) are present only when
    the averages they depend on exist.

    "whiplash" is the mean absolute change in tempo, energy and valence
    between two consecutive plays whose played_at values are less than 300
    seconds apart; it is 0 for a feature with no qualifying transition. A play
    whose track cannot be resolved breaks the chain, so the plays on either
    side of it are never treated as a transition.

    Returns:
        The stats dict described above, or {} when the window has no plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {}

    # Resolve every distinct track with one query instead of one per play.
    track_ids = list({p.track_id for p in plays})
    tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
    track_map = {t.id: t for t in tracks}

    # 1. Aggregates
    feature_keys = ["energy", "valence", "danceability", "tempo", "acousticness",
                    "instrumentalness", "liveness", "speechiness", "loudness"]
    features = {k: [] for k in feature_keys}

    # 2. Transition arrays (for whiplash)
    whiplash_keys = ("tempo", "energy", "valence")
    transitions = {k: [] for k in whiplash_keys}

    # The previous play and its track are tracked together so the time gap and
    # the feature difference always refer to the same pair of plays.
    previous_play = None
    previous_track = None

    for p in plays:
        t = track_map.get(p.track_id)
        if not t:
            # Unknown track: forget the previous play so the next known track
            # is not compared against a non-adjacent one.
            previous_play = None
            previous_track = None
            continue

        # Null-safe collection: each feature is appended independently so one
        # missing value does not discard the others.
        for key in feature_keys:
            val = getattr(t, key, None)
            if val is not None:
                features[key].append(val)

        # Transitions (whiplash)
        if previous_track is not None:
            time_diff = (p.played_at - previous_play.played_at).total_seconds()
            if time_diff < 300:  # 5 min gap max
                for key in whiplash_keys:
                    current_val = getattr(t, key)
                    previous_val = getattr(previous_track, key)
                    if current_val is not None and previous_val is not None:
                        transitions[key].append(abs(current_val - previous_val))

        previous_play = p
        previous_track = t

    # Mean, standard deviation and percentiles per feature.
    stats = {}
    for key, values in features.items():
        if values:
            stats[f"avg_{key}"] = float(np.mean(values))
            stats[f"std_{key}"] = float(np.std(values))
            stats[f"p10_{key}"] = float(np.percentile(values, 10))
            stats[f"p50_{key}"] = float(np.percentile(values, 50))  # Median
            stats[f"p90_{key}"] = float(np.percentile(values, 90))
        else:
            stats[f"avg_{key}"] = None

    # Derived metrics
    if stats.get("avg_energy") is not None and stats.get("avg_valence") is not None:
        # Valence on x, energy on y.
        stats["mood_quadrant"] = {
            "x": round(stats["avg_valence"], 2),
            "y": round(stats["avg_energy"], 2)
        }
        # Consistency: one minus the mean of the energy and valence spreads.
        avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2
        stats["consistency_score"] = round(1.0 - avg_std, 2)

    # Rhythm profile
    if stats.get("avg_tempo") is not None and stats.get("avg_danceability") is not None:
        stats["rhythm_profile"] = {
            "avg_tempo": round(stats["avg_tempo"], 1),
            "avg_danceability": round(stats["avg_danceability"], 2)
        }

    # Texture profile
    if stats.get("avg_acousticness") is not None and stats.get("avg_instrumentalness") is not None:
        stats["texture_profile"] = {
            "acousticness": round(stats["avg_acousticness"], 2),
            "instrumentalness": round(stats["avg_instrumentalness"], 2)
        }

    # Whiplash scores
    stats["whiplash"] = {}
    for k in whiplash_keys:
        if transitions[k]:
            stats["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
        else:
            stats["whiplash"][k] = 0

    return stats
def compute_era_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Summarise the release years of tracks played in [period_start, period_end).

    The release year is the part of raw_data["album"]["release_date"] before
    the first "-". Plays whose track, album data or release date is missing or
    unparseable are skipped; every remaining play contributes its year once.

    Returns:
        {"musical_age": None} when no release year could be read. Otherwise:
        - "musical_age": mean release year, truncated to an int
        - "nostalgia_gap": current UTC year minus the mean year, truncated
        - "freshness_score": share of plays from the current decade
        - "decade_distribution": share of plays per decade label (e.g. "1990s")
    """
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .all()
    )

    years = []
    for p in plays:
        t = p.track
        raw = t.raw_data if t else None
        # raw_data may lack "album" or hold a null for it; skip instead of crashing.
        album = raw.get("album") if isinstance(raw, dict) else None
        if not isinstance(album, dict):
            continue
        rd = album.get("release_date")
        if not rd:
            continue
        try:
            years.append(int(rd.split("-")[0]))
        except (AttributeError, TypeError, ValueError):
            # Best effort: a malformed release date only drops this play from
            # the era stats. Anything else is allowed to propagate.
            continue

    if not years:
        return {"musical_age": None}

    avg_year = sum(years) / len(years)
    current_year = datetime.now(timezone.utc).year

    decades = {}
    for y in years:
        label = f"{(y // 10) * 10}s"
        decades[label] = decades.get(label, 0) + 1

    total = len(years)
    dist = {k: round(v / total, 3) for k, v in decades.items()}
    current_decade_label = f"{(current_year // 10) * 10}s"

    return {
        "musical_age": int(avg_year),
        "nostalgia_gap": int(current_year - avg_year),
        "freshness_score": dist.get(current_decade_label, 0),
        "decade_distribution": dist
    }
def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Estimate skipped tracks for plays in [period_start, period_end).

    A play counts as a skip when the next play's played_at is more than 30
    seconds but less than (track duration - 10 seconds) after its own. The
    rate divides skips by the number of consecutive pairs (plays - 1).
    Windows with fewer than two plays report zero skips.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if len(plays) < 2:
        return {"skip_rate": 0, "total_skips": 0}

    # Durations come from the tracks, fetched once for all distinct ids.
    track_ids = list({play.track_id for play in plays})
    tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
    track_map = {track.id: track for track in tracks}

    # Denominator: every consecutive pair, including pairs that cannot be
    # judged because the track or its duration is unknown.
    transitions_count = len(plays) - 1

    skips = 0
    for current_play, next_play in zip(plays, plays[1:]):
        track = track_map.get(current_play.track_id)
        if not track or not track.duration_ms:
            continue

        elapsed_sec = (next_play.played_at - current_play.played_at).total_seconds()
        duration_sec = track.duration_ms / 1000.0

        # The lower bound ignores anything under 30s; the upper bound excludes
        # pairs separated by at least the track length minus a 10s tolerance,
        # which also rules out long pauses.
        if 30 < elapsed_sec < (duration_sec - 10):
            skips += 1

    return {
        "total_skips": skips,
        "skip_rate": round(skips / transitions_count, 3) if transitions_count > 0 else 0,
    }
def compute_context_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Break plays in [period_start, period_end) down by their context_uri.

    Each play is classified by the first of "playlist", "album", "artist",
    "collection" found in its context_uri, or "unknown" when the URI is empty
    or matches none of them. Also reports the five most used URIs, plays per
    distinct URI ("context_loyalty") and how often the URI changes between
    consecutive plays. Returns {} when there are no plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return {}

    context_counts = {"playlist": 0, "album": 0, "artist": 0, "collection": 0, "unknown": 0}
    # Checked in this order; the first substring match decides the type.
    known_kinds = ("playlist", "album", "artist", "collection")
    unique_contexts = {}
    context_switches = 0
    last_context = None

    for play in plays:
        uri = play.context_uri
        if not uri:
            context_counts["unknown"] += 1
            uri = "unknown"
        else:
            kind = next((k for k in known_kinds if k in uri), "unknown")
            context_counts[kind] += 1

        # Plays without a URI are left out of the per-URI tally.
        if uri != "unknown":
            unique_contexts[uri] = unique_contexts.get(uri, 0) + 1

        # Switch detection: nothing to compare against on the first play.
        if last_context and uri != last_context:
            context_switches += 1
        last_context = uri

    total = len(plays)
    breakdown = {kind: round(count / total, 2) for kind, count in context_counts.items()}
    busiest = sorted(unique_contexts.items(), key=lambda item: item[1], reverse=True)[:5]

    return {
        "type_breakdown": breakdown,
        "album_purist_score": breakdown.get("album", 0),
        "playlist_dependency": breakdown.get("playlist", 0),
        "context_loyalty": round(len(plays) / len(unique_contexts), 2) if unique_contexts else 0,
        "context_switching_rate": round(context_switches / (total - 1), 2) if total > 1 else 0,
        "top_context_uris": [{"uri": uri, "count": count} for uri, count in busiest],
    }
def compute_taste_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Mainstream vs. hipster breakdown for plays in [period_start, period_end).

    Popularity is taken per play, so a track played often weighs more.
    Returns {} when there are no plays, a two-key zero result when no played
    track has a popularity value, and otherwise the average popularity, the
    percentage of plays below 30 ("hipster_score"), the percentage above 70
    ("mainstream_score") and 100 minus the average ("obscurity_rating").
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .all()
    )
    if not plays:
        return {}

    track_ids = list({play.track_id for play in plays})
    tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
    track_map = {track.id: track for track in tracks}

    # One popularity value per play whose track is known and rated.
    played_tracks = (track_map.get(play.track_id) for play in plays)
    pop_values = [
        track.popularity
        for track in played_tracks
        if track and track.popularity is not None
    ]
    if not pop_values:
        return {"avg_popularity": 0, "hipster_score": 0}

    rated_plays = len(pop_values)
    avg_pop = float(np.mean(pop_values))
    underground_plays = sum(1 for value in pop_values if value < 30)
    mainstream_plays = sum(1 for value in pop_values if value > 70)

    return {
        "avg_popularity": round(avg_pop, 1),
        "hipster_score": round((underground_plays / rated_plays) * 100, 1),
        "mainstream_score": round((mainstream_plays / rated_plays) * 100, 1),
        "obscurity_rating": round(100 - avg_pop, 1),
    }
def compute_lifecycle_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Discovery, recurrence and obsession metrics for [period_start, period_end).

    A track is a discovery when it has no play before period_start. An
    obsession is a track played at least 5 times inside the window. Comeback
    detection is not implemented; "recurrence_rate" is simply the share of
    plays that went to previously heard tracks. Returns {} when there are no
    plays.
    """
    # 1. Plays inside the window
    window_plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .all()
    )
    if not window_plays:
        return {}

    window_track_ids = {play.track_id for play in window_plays}

    # 2. Which of those tracks were already played before the window?
    earlier_rows = (
        self.db.query(distinct(PlayHistory.track_id))
        .filter(
            PlayHistory.track_id.in_(window_track_ids),
            PlayHistory.played_at < period_start,
        )
        .all()
    )
    heard_before = {row[0] for row in earlier_rows}

    # 3. Discovery: everything without an earlier play
    new_discoveries = window_track_ids - heard_before

    # 4. Obsessions: tracks with 5 or more plays in the window
    plays_per_track = {}
    for play in window_plays:
        plays_per_track[play.track_id] = plays_per_track.get(play.track_id, 0) + 1
    obsessions = [track_id for track_id, count in plays_per_track.items() if count >= 5]

    # 5. Comebacks would need a gap check against earlier history, so for now
    # recurrence only means general re-listening.
    plays_on_new = sum(1 for play in window_plays if play.track_id in new_discoveries)
    total_plays = len(window_plays)

    return {
        "discovery_count": len(new_discoveries),
        "discovery_rate": round(plays_on_new / total_plays, 3) if total_plays > 0 else 0,
        "recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0,
        "obsession_count": len(obsessions),
        "obsession_rate": round(len(obsessions) / len(window_track_ids), 3) if window_track_ids else 0,
    }
def compute_explicit_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Measure explicit-content consumption for plays in [period_start, period_end).

    A play is explicit when its track's raw_data has a truthy "explicit"
    entry. Plays whose track or raw_data is missing count as non-explicit but
    still count towards the totals.

    Returns:
        - "explicit_rate": explicit plays / all plays
        - "total_explicit_plays": number of explicit plays
        - "hourly_explicit_distribution": 24 per-hour explicit rates (0.0 for
          hours without plays)
        The same three keys are returned, zeroed, when the window has no
        plays. The empty result also carries the older "hourly_explicit_rate"
        key for callers written against the previous empty shape.
    """
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .all()
    )
    if not plays:
        return {
            "explicit_rate": 0,
            "total_explicit_plays": 0,
            "hourly_explicit_distribution": [0.0] * 24,
            # Legacy key from the previous empty result; kept for compatibility.
            "hourly_explicit_rate": [],
        }

    total_plays = len(plays)
    explicit_count = 0
    hourly_explicit = [0] * 24
    hourly_total = [0] * 24

    for p in plays:
        h = p.played_at.hour
        hourly_total[h] += 1
        t = p.track
        # Guard against plays without a resolvable track, as the other
        # compute_* methods do.
        if t and t.raw_data and t.raw_data.get("explicit"):
            explicit_count += 1
            hourly_explicit[h] += 1

    hourly_rates = [
        round(explicit / total, 2) if total > 0 else 0.0
        for explicit, total in zip(hourly_explicit, hourly_total)
    ]

    return {
        "explicit_rate": round(explicit_count / total_plays, 3),
        "total_explicit_plays": explicit_count,
        "hourly_explicit_distribution": hourly_rates
    }
def generate_full_report(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
    """
    Run every compute_* section for the period and assemble one report dict.

    The sections are computed in a fixed order and stored under "volume",
    "time_habits", "sessions", "context", "vibe", "era", "taste", "lifecycle",
    "flags" and "skips", next to a "period" entry with the ISO-formatted
    bounds. "comparison" is computed last because it reads the finished
    sections.
    """
    window = (period_start, period_end)

    report = {
        "period": {"start": period_start.isoformat(), "end": period_end.isoformat()},
    }
    report["volume"] = self.compute_volume_stats(*window)
    report["time_habits"] = self.compute_time_stats(*window)
    report["sessions"] = self.compute_session_stats(*window)
    report["context"] = self.compute_context_stats(*window)
    report["vibe"] = self.compute_vibe_stats(*window)
    report["era"] = self.compute_era_stats(*window)
    report["taste"] = self.compute_taste_stats(*window)
    report["lifecycle"] = self.compute_lifecycle_stats(*window)
    report["flags"] = self.compute_explicit_stats(*window)
    report["skips"] = self.compute_skip_stats(*window)

    report["comparison"] = self.compute_comparison(report, *window)
    return report
def _empty_volume_stats(self):
    """Return the volume-stats structure used when a period has no plays:
    zero counters and rates, empty top lists and an empty concentration dict."""
    zero_counters = (
        "total_plays", "estimated_minutes", "unique_tracks",
        "unique_artists", "unique_albums", "unique_genres",
    )
    top_lists = ("top_tracks", "top_artists", "top_albums", "top_genres")

    stats = dict.fromkeys(zero_counters, 0)
    # A fresh list per key so callers mutating one list cannot affect another.
    stats.update((name, []) for name in top_lists)
    stats.update(repeat_rate=0, one_and_done_rate=0, concentration={})
    return stats
def _pct_change(self, curr, prev):
    """
    Return the percentage change from prev to curr, rounded to one decimal.

    With a zero baseline the ratio is undefined, so any positive curr is
    reported as 100.0 and everything else as 0.0.
    """
    if prev != 0:
        return round(((curr - prev) / prev) * 100, 1)
    return 100.0 if curr > 0 else 0.0