Files
MusicAnalyser/backend/app/services/stats_service.py
bnair123 887e78bf47 Add skip tracking, compressed heatmap, listening log, docs, tests, and OpenAI support
Major changes:
- Add skip tracking: poll currently-playing every 15s, detect skips (<30s listened)
- Add listening-log and sessions API endpoints
- Fix ReccoBeats client to extract spotify_id from href response
- Compress heatmap from 24 hours to 6 x 4-hour blocks
- Add OpenAI support in narrative service (use max_completion_tokens for new models)
- Add ListeningLog component with timeline and list views
- Update all frontend components to use real data (album art, play counts)
- Add docker-compose external network (dockernet) support
- Add comprehensive documentation (API, DATA_MODEL, ARCHITECTURE, FRONTEND)
- Add unit tests for ingest and API endpoints
2025-12-30 00:15:01 +04:00

1074 lines
37 KiB
Python

import logging
import math
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import numpy as np
from sklearn.cluster import KMeans
from sqlalchemy import distinct, func
from sqlalchemy.orm import Session, joinedload

from ..models import Artist, PlayHistory, Track
logger = logging.getLogger(__name__)


class StatsService:
    """Derive listening statistics for a time window from stored play history.

    Every ``compute_*`` method interprets ``period_start``/``period_end`` as
    the half-open interval ``[period_start, period_end)``. Using one convention
    everywhere means a play that lands exactly on a boundary is counted in one
    period only, which matters for ``compute_comparison`` where the previous
    period ends exactly where the current one starts.
    """

    # Two plays further apart than this start a new session.
    SESSION_GAP_MINUTES = 20
    # Assumed length of a session that contains a single play.
    SINGLE_PLAY_SESSION_MINUTES = 3.0
    # Only plays at most this far apart count as a track-to-track transition.
    MAX_TRANSITION_GAP_SECONDS = 300
    # How many entries each "top" list holds.
    TOP_N = 5

    def __init__(self, db: "Session"):
        """Keep the database session that every query in this service uses."""
        self.db = db

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _period_query(self, period_start: datetime, period_end: datetime):
        """Return a PlayHistory query limited to ``[period_start, period_end)``.

        Callers chain ``options``/``order_by`` onto the returned query. Keeping
        the filter in one place guarantees all statistics share the same
        boundary semantics.
        """
        return self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )

    def _tracks_by_id(self, plays) -> Dict[Any, Any]:
        """Load the Track rows referenced by ``plays`` in one query, keyed by id."""
        track_ids = list({p.track_id for p in plays})
        tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
        return {t.id: t for t in tracks}

    @staticmethod
    def _track_image(track):
        """Return the track's image URL, falling back to the first album image
        stored in ``raw_data``; ``None`` when neither is available."""
        if track.image_url:
            return track.image_url
        album = (track.raw_data or {}).get("album") or {}
        images = album.get("images")
        if images:
            return images[0].get("url")
        return None

    @staticmethod
    def _gini(shares) -> float:
        """Return the Gini coefficient of ``shares`` (0 for empty or all-zero input)."""
        n = len(shares)
        total = sum(shares)
        if n == 0 or total == 0:
            return 0
        ordered = sorted(shares)
        weighted = sum(rank * x for rank, x in enumerate(ordered, start=1))
        return (2 * weighted) / (n * total) - (n + 1) / n

    @staticmethod
    def _streaks(active_dates):
        """Return ``(trailing_streak, longest_streak)`` in days.

        ``trailing_streak`` is the run of consecutive dates that ends on the
        latest date in ``active_dates``; both values are 0 for empty input.
        """
        ordered = sorted(active_dates)
        if not ordered:
            return 0, 0
        current = longest = 1
        for earlier, later in zip(ordered, ordered[1:]):
            if (later - earlier).days == 1:
                current += 1
            else:
                longest = max(longest, current)
                current = 1
        return current, max(longest, current)

    @staticmethod
    def _release_year(raw_data) -> Optional[int]:
        """Extract the album release year from a track's ``raw_data``.

        Returns ``None`` when the album, the release date, or a parseable
        leading year component is missing.
        """
        album = (raw_data or {}).get("album") or {}
        release_date = album.get("release_date")
        if not release_date:
            return None
        try:
            return int(release_date.split("-")[0])
        except (AttributeError, ValueError):
            # Not a string, or the year part is not numeric: ignore this track.
            return None

    @staticmethod
    def _context_type(context_uri) -> str:
        """Classify a context URI by the first matching keyword it contains.

        The keywords are tested in a fixed order (playlist, album, artist,
        collection); anything else, including a missing URI, is "unknown".
        """
        if not context_uri:
            return "unknown"
        for kind in ("playlist", "album", "artist", "collection"):
            if kind in context_uri:
                return kind
        return "unknown"

    @staticmethod
    def _is_explicit(track) -> bool:
        """Return True when the track's ``raw_data`` carries a truthy "explicit" flag."""
        return bool(track and track.raw_data and track.raw_data.get("explicit"))

    # ------------------------------------------------------------------
    # Statistics
    # ------------------------------------------------------------------
    def compute_comparison(
        self,
        current_stats: Dict[str, Any],
        period_start: datetime,
        period_end: datetime,
    ) -> Dict[str, Any]:
        """
        Calculates deltas vs the previous period of the same length.

        ``current_stats`` must contain the "volume", "vibe" and "taste"
        sections as produced by the matching ``compute_*`` methods. Deltas for
        mood and popularity are only emitted when both periods provide the
        underlying value.
        """
        duration = period_end - period_start
        prev_end = period_start
        prev_start = prev_end - duration

        # Only the key metrics are recomputed for the previous period.
        prev_volume = self.compute_volume_stats(prev_start, prev_end)
        prev_vibe = self.compute_vibe_stats(prev_start, prev_end)
        prev_taste = self.compute_taste_stats(prev_start, prev_end)

        deltas = {}

        # Plays
        curr_plays = current_stats["volume"]["total_plays"]
        prev_plays = prev_volume["total_plays"]
        deltas["plays_delta"] = curr_plays - prev_plays
        deltas["plays_pct_change"] = self._pct_change(curr_plays, prev_plays)

        # Energy & Valence (mood_quadrant: x = valence, y = energy)
        curr_vibe = current_stats["vibe"]
        if "mood_quadrant" in curr_vibe and "mood_quadrant" in prev_vibe:
            curr_mood = curr_vibe["mood_quadrant"]
            prev_mood = prev_vibe["mood_quadrant"]
            deltas["energy_delta"] = round(curr_mood["y"] - prev_mood["y"], 2)
            deltas["valence_delta"] = round(curr_mood["x"] - prev_mood["x"], 2)

        # Popularity
        curr_taste = current_stats["taste"]
        if "avg_popularity" in curr_taste and "avg_popularity" in prev_taste:
            deltas["popularity_delta"] = round(
                curr_taste["avg_popularity"] - prev_taste["avg_popularity"],
                1,
            )

        return {
            "previous_period": {
                "start": prev_start.isoformat(),
                "end": prev_end.isoformat(),
            },
            "deltas": deltas,
        }

    def compute_volume_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Calculates volume metrics including Concentration (HHI, Gini, Entropy) and Top Lists.

        Plays whose track cannot be loaded still count towards ``total_plays``
        but contribute nothing to the per-track/artist/album/genre tallies.
        """
        # Eager-load tracks and their artists so the loop below issues no
        # further queries.
        plays = (
            self._period_query(period_start, period_end)
            .options(joinedload(PlayHistory.track).joinedload(Track.artists))
            .all()
        )
        total_plays = len(plays)
        if total_plays == 0:
            return self._empty_volume_stats()

        total_ms = 0
        track_counts = Counter()
        artist_counts = Counter()
        genre_counts = Counter()
        album_counts = Counter()
        # Lookup tables so the top lists can be rendered without DB hits.
        track_map = {}
        artist_map = {}
        album_map = {}

        for p in plays:
            t = p.track
            if not t:
                continue
            total_ms += t.duration_ms or 0

            track_counts[t.id] += 1
            track_map[t.id] = t

            # Prefer the album id/name from raw_data, fall back to Track.album.
            album_info = (t.raw_data or {}).get("album") or {}
            album_id = album_info.get("id", t.album)
            album_name = album_info.get("name", t.album)
            album_counts[album_id] += 1
            if album_id not in album_map:
                album_map[album_id] = {
                    "name": album_name,
                    "image": self._track_image(t),
                }

            # Iterate Artist objects (not a joined name string).
            for artist in t.artists:
                artist_counts[artist.id] += 1
                if artist.id not in artist_map:
                    artist_map[artist.id] = {
                        "name": artist.name,
                        "image": artist.image_url,
                    }
                # artist.genres is iterated as a list of genre names.
                if artist.genres:
                    for g in artist.genres:
                        genre_counts[g] += 1

        unique_tracks = len(track_counts)
        one_and_done = sum(1 for c in track_counts.values() if c == 1)
        shares = [c / total_plays for c in track_counts.values()]

        top_tracks = [
            {
                "name": track_map[tid].name,
                "artist": ", ".join(a.name for a in track_map[tid].artists),
                "image": self._track_image(track_map[tid]),
                "count": c,
            }
            for tid, c in track_counts.most_common(self.TOP_N)
        ]
        top_artists = [
            {
                "name": artist_map[aid]["name"],
                "id": aid,
                "image": artist_map[aid]["image"],
                "count": c,
            }
            for aid, c in artist_counts.most_common(self.TOP_N)
        ]
        top_albums = [
            {
                "name": album_map[aid]["name"],
                "image": album_map[aid]["image"],
                "count": c,
            }
            for aid, c in album_counts.most_common(self.TOP_N)
        ]
        top_genres = [
            {"name": name, "count": c}
            for name, c in genre_counts.most_common(self.TOP_N)
        ]

        # Concentration: HHI is the sum of squared play shares.
        hhi = sum(s**2 for s in shares)
        gini = self._gini(shares)

        # Genre entropy: -SUM(p * ln(p)) over genre occurrence shares.
        total_genre_occurrences = sum(genre_counts.values())
        genre_entropy = 0
        if total_genre_occurrences > 0:
            genre_entropy = -sum(
                p * math.log(p)
                for p in (c / total_genre_occurrences for c in genre_counts.values())
                if p > 0
            )

        top_5_share = sum(entry["count"] for entry in top_tracks) / total_plays

        return {
            "total_plays": total_plays,
            "estimated_minutes": int(total_ms / 60000),
            "unique_tracks": unique_tracks,
            "unique_artists": len(artist_counts),
            "unique_albums": len(album_counts),
            "unique_genres": len(genre_counts),
            "top_tracks": top_tracks,
            "top_artists": top_artists,
            "top_albums": top_albums,
            "top_genres": top_genres,
            "repeat_rate": round((total_plays - unique_tracks) / total_plays, 3),
            "one_and_done_rate": round(one_and_done / unique_tracks, 3)
            if unique_tracks
            else 0,
            "concentration": {
                "hhi": round(hhi, 4),
                "gini": round(gini, 4),
                "top_1_share": round(max(shares), 3) if shares else 0,
                "top_5_share": round(top_5_share, 3),
                "genre_entropy": round(genre_entropy, 2),
            },
        }

    def compute_time_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Includes Part-of-Day buckets, Listening Streaks, Active Days, and 2D Heatmap.

        Returns an empty dict when the period has no plays. Hours and weekdays
        are taken from ``played_at`` as stored, without timezone conversion.
        """
        plays = (
            self._period_query(period_start, period_end)
            .order_by(PlayHistory.played_at.asc())
            .all()
        )
        if not plays:
            return {}

        # Granular heatmap: 7 weekdays x 24 hours.
        heatmap = [[0] * 24 for _ in range(7)]
        # Compressed heatmap: 7 weekdays x 6 four-hour blocks.
        heatmap_compressed = [[0] * 6 for _ in range(7)]
        block_labels = [
            "12am-4am",
            "4am-8am",
            "8am-12pm",
            "12pm-4pm",
            "4pm-8pm",
            "8pm-12am",
        ]
        hourly_counts = [0] * 24
        weekday_counts = [0] * 7
        part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
        active_dates = set()

        for p in plays:
            h = p.played_at.hour
            d = p.played_at.weekday()
            heatmap[d][h] += 1
            heatmap_compressed[d][h // 4] += 1  # hours 0-3 -> block 0, ..., 20-23 -> block 5
            hourly_counts[h] += 1
            weekday_counts[d] += 1
            active_dates.add(p.played_at.date())

            if 6 <= h < 12:
                part_of_day["morning"] += 1
            elif 12 <= h < 18:
                part_of_day["afternoon"] += 1
            elif h >= 18:
                part_of_day["evening"] += 1
            else:  # hours 0-5
                part_of_day["night"] += 1

        current_streak, longest_streak = self._streaks(active_dates)
        weekend_plays = weekday_counts[5] + weekday_counts[6]
        active_days_count = len(active_dates)

        return {
            "heatmap": heatmap,
            "heatmap_compressed": heatmap_compressed,
            "block_labels": block_labels,
            "hourly_distribution": hourly_counts,
            "peak_hour": hourly_counts.index(max(hourly_counts)),
            "weekday_distribution": weekday_counts,
            "daily_distribution": weekday_counts,
            "weekend_share": round(weekend_plays / len(plays), 2),
            "part_of_day": part_of_day,
            "listening_streak": current_streak,
            "longest_streak": longest_streak,
            "active_days": active_days_count,
            "avg_plays_per_active_day": round(len(plays) / active_days_count, 1)
            if active_days_count
            else 0,
        }

    def compute_session_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Includes Micro-sessions, Marathon sessions, Energy Arcs, Median metrics, and Session List.

        A session is a run of plays in which consecutive plays start no more
        than ``SESSION_GAP_MINUTES`` apart. Returns ``{"count": 0}`` when the
        period has no plays.
        """
        plays = (
            self._period_query(period_start, period_end)
            .options(joinedload(PlayHistory.track))
            .order_by(PlayHistory.played_at.asc())
            .all()
        )
        if not plays:
            return {"count": 0}

        # 1. Sessionization
        sessions = []
        current_session = [plays[0]]
        for prev_play, play in zip(plays, plays[1:]):
            gap_min = (play.played_at - prev_play.played_at).total_seconds() / 60
            if gap_min > self.SESSION_GAP_MINUTES:
                sessions.append(current_session)
                current_session = []
            current_session.append(play)
        sessions.append(current_session)

        # 2. Per-session analysis
        lengths_min = []
        micro_sessions = 0
        marathon_sessions = 0
        energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
        start_hour_dist = [0] * 24
        session_list = []  # Metadata for the timeline view

        for sess in sessions:
            start_t = sess[0].played_at
            end_t = sess[-1].played_at
            start_hour_dist[start_t.hour] += 1

            # Duration runs from the first to the last play start; a lone
            # play has no span, so an approximate song length is used.
            if len(sess) > 1:
                duration = (end_t - start_t).total_seconds() / 60
            else:
                duration = self.SINGLE_PLAY_SESSION_MINUTES
            lengths_min.append(duration)

            sess_type = "Standard"
            if len(sess) <= 3:
                micro_sessions += 1
                sess_type = "Micro"
            elif len(sess) >= 20:
                marathon_sessions += 1
                sess_type = "Marathon"

            session_list.append(
                {
                    "start_time": start_t.isoformat(),
                    "end_time": end_t.isoformat(),
                    "duration_minutes": round(duration, 1),
                    "track_count": len(sess),
                    "type": sess_type,
                }
            )

            # Energy arc: compare the energy of the first and last track.
            first_t = sess[0].track
            last_t = sess[-1].track
            if (
                first_t
                and last_t
                and first_t.energy is not None
                and last_t.energy is not None
            ):
                energy_change = last_t.energy - first_t.energy
                if energy_change > 0.1:
                    energy_arcs["rising"] += 1
                elif energy_change < -0.1:
                    energy_arcs["falling"] += 1
                else:
                    energy_arcs["flat"] += 1
            else:
                energy_arcs["unknown"] += 1

        active_days = len({p.played_at.date() for p in plays})
        sessions_per_day = len(sessions) / active_days if active_days else 0

        return {
            "count": len(sessions),
            "avg_tracks": round(len(plays) / len(sessions), 1),
            "avg_minutes": round(float(np.mean(lengths_min)), 1),
            "median_minutes": round(float(np.median(lengths_min)), 1),
            "longest_session_minutes": round(max(lengths_min), 1),
            "sessions_per_day": round(sessions_per_day, 1),
            "start_hour_distribution": start_hour_dist,
            "micro_session_rate": round(micro_sessions / len(sessions), 2),
            "marathon_session_rate": round(marathon_sessions / len(sessions), 2),
            "energy_arcs": energy_arcs,
            "session_list": session_list,
        }

    def compute_vibe_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Aggregates Audio Features + Calculates Whiplash + Clustering + Harmonic Profile.

        Returns an empty dict when the period has no plays. Feature statistics
        are weighted by plays (a track played twice contributes twice).
        """
        plays = (
            self._period_query(period_start, period_end)
            .order_by(PlayHistory.played_at.asc())
            .all()
        )
        if not plays:
            return {}

        track_map = self._tracks_by_id(plays)

        feature_keys = [
            "energy",
            "valence",
            "danceability",
            "tempo",
            "acousticness",
            "instrumentalness",
            "liveness",
            "speechiness",
            "loudness",
        ]
        features = {k: [] for k in feature_keys}
        # Clustering rows: [energy, valence, danceability, acousticness]
        cluster_keys = ("energy", "valence", "danceability", "acousticness")
        cluster_data = []
        keys = []
        modes = []
        tempo_zones = {"chill": 0, "groove": 0, "hype": 0}
        # Absolute feature jumps between consecutive plays (for Whiplash).
        transitions = {"tempo": [], "energy": [], "valence": []}
        previous_track = None
        previous_played_at = None

        for p in plays:
            t = track_map.get(p.track_id)
            if not t:
                # An unresolved play breaks the chain: the plays around it
                # are not consecutive, so no transition is recorded across it.
                previous_track = None
                continue

            # Collect each feature independently so one missing value does
            # not discard the track's other features.
            for key in feature_keys:
                val = getattr(t, key, None)
                if val is not None:
                    features[key].append(val)

            # Clustering needs all four dimensions present.
            if all(getattr(t, k) is not None for k in cluster_keys):
                cluster_data.append([getattr(t, k) for k in cluster_keys])

            if t.key is not None:
                keys.append(t.key)
            if t.mode is not None:
                modes.append(t.mode)

            if t.tempo is not None:
                if t.tempo < 100:
                    tempo_zones["chill"] += 1
                elif t.tempo < 130:
                    tempo_zones["groove"] += 1
                else:
                    tempo_zones["hype"] += 1

            if previous_track is not None:
                time_diff = (p.played_at - previous_played_at).total_seconds()
                if time_diff < self.MAX_TRANSITION_GAP_SECONDS:
                    for feature, jumps in transitions.items():
                        cur_val = getattr(t, feature)
                        prev_val = getattr(previous_track, feature)
                        if cur_val is not None and prev_val is not None:
                            jumps.append(abs(cur_val - prev_val))

            previous_track = t
            previous_played_at = p.played_at

        # Mean, standard deviation and percentiles per feature.
        stats = {}
        for key, values in features.items():
            if values:
                avg_val = float(np.mean(values))
                stats[key] = round(avg_val, 3)
                stats[f"avg_{key}"] = avg_val
                stats[f"std_{key}"] = float(np.std(values))
                stats[f"p10_{key}"] = float(np.percentile(values, 10))
                stats[f"p50_{key}"] = float(np.percentile(values, 50))
                stats[f"p90_{key}"] = float(np.percentile(values, 90))
            else:
                stats[key] = 0.0
                stats[f"avg_{key}"] = None

        # Derived metrics
        if stats.get("avg_energy") is not None and stats.get("avg_valence") is not None:
            stats["mood_quadrant"] = {
                "x": round(stats["avg_valence"], 2),
                "y": round(stats["avg_energy"], 2),
            }
            avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2
            stats["consistency_score"] = round(1.0 - avg_std, 2)

        if (
            stats.get("avg_tempo") is not None
            and stats.get("avg_danceability") is not None
        ):
            stats["rhythm_profile"] = {
                "avg_tempo": round(stats["avg_tempo"], 1),
                "avg_danceability": round(stats["avg_danceability"], 2),
            }

        if (
            stats.get("avg_acousticness") is not None
            and stats.get("avg_instrumentalness") is not None
        ):
            stats["texture_profile"] = {
                "acousticness": round(stats["avg_acousticness"], 2),
                "instrumentalness": round(stats["avg_instrumentalness"], 2),
            }

        # Whiplash: mean absolute jump per feature, 0 when nothing qualified.
        stats["whiplash"] = {
            feature: round(float(np.mean(jumps)), 2) if jumps else 0
            for feature, jumps in transitions.items()
        }

        # Tempo zones as shares of plays with a known tempo.
        total_tempo = sum(tempo_zones.values())
        if total_tempo > 0:
            stats["tempo_zones"] = {
                k: round(v / total_tempo, 2) for k, v in tempo_zones.items()
            }
        else:
            stats["tempo_zones"] = {}

        # Harmonic profile (mode == 1 is counted as major, anything else as minor).
        if modes:
            major_count = sum(1 for m in modes if m == 1)
            stats["harmonic_profile"] = {
                "major_pct": round(major_count / len(modes), 2),
                "minor_pct": round((len(modes) - major_count) / len(modes), 2),
            }

        if keys:
            # Map integer keys 0-11 to pitch-class names; other values are skipped.
            pitch_class = [
                "C",
                "C#",
                "D",
                "D#",
                "E",
                "F",
                "F#",
                "G",
                "G#",
                "A",
                "A#",
                "B",
            ]
            key_counts = Counter(pitch_class[k] for k in keys if 0 <= k < 12)
            stats["top_keys"] = [
                {"key": label, "count": c} for label, c in key_counts.most_common(3)
            ]

        stats["clusters"] = self._cluster_vibes(cluster_data)
        return stats

    def _cluster_vibes(self, cluster_data) -> list:
        """Group plays into up to three K-Means clusters and name each one.

        Returns a list of ``{"name", "share", "features"}`` dicts sorted by
        share, or an empty list when there are fewer than five rows or the
        clustering fails (it is a best-effort extra, never fatal).
        """
        if len(cluster_data) < 5:  # Need enough data points
            return []
        try:
            kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
            labels = kmeans.fit_predict(cluster_data)
            clusters = []
            for i in range(3):
                count = int(np.sum(labels == i))
                if count == 0:
                    continue
                # Cast to built-in floats so the result stays JSON-friendly.
                c_energy, c_valence, c_dance, c_acoustic = (
                    float(v) for v in kmeans.cluster_centers_[i]
                )

                # Heuristic naming from the centroid.
                name = "Mixed Vibe"
                if c_energy > 0.7:
                    name = "High Energy"
                elif c_acoustic > 0.7:
                    name = "Acoustic / Chill"
                elif c_valence < 0.3:
                    name = "Melancholy"
                elif c_dance > 0.7:
                    name = "Dance / Groove"

                clusters.append(
                    {
                        "name": name,
                        "share": round(count / len(cluster_data), 2),
                        "features": {
                            "energy": round(c_energy, 2),
                            "valence": round(c_valence, 2),
                            "danceability": round(c_dance, 2),
                            "acousticness": round(c_acoustic, 2),
                        },
                    }
                )
            return sorted(clusters, key=lambda c: c["share"], reverse=True)
        except Exception:
            logger.warning("Clustering failed", exc_info=True)
            return []

    def compute_era_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Includes Nostalgia Gap and granular decade breakdown.

        Release years come from ``raw_data["album"]["release_date"]``; plays
        without a parseable year are ignored. Returns
        ``{"musical_age": None}`` when no year could be determined.
        """
        plays = (
            self._period_query(period_start, period_end)
            .options(joinedload(PlayHistory.track))
            .all()
        )

        years = []
        for p in plays:
            year = self._release_year(p.track.raw_data) if p.track else None
            if year is not None:
                years.append(year)

        if not years:
            return {"musical_age": None}

        # Average release year, weighted by plays.
        avg_year = sum(years) / len(years)
        current_year = datetime.now(timezone.utc).year

        decades = Counter(f"{(y // 10) * 10}s" for y in years)
        total = len(years)
        dist = {label: round(c / total, 3) for label, c in decades.items()}

        return {
            "musical_age": int(avg_year),
            "nostalgia_gap": int(current_year - avg_year),
            # Share of plays released in the current decade.
            "freshness_score": dist.get(f"{current_year // 10 * 10}s", 0),
            "decade_distribution": dist,
        }

    def compute_skip_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Implements boredom skip detection:
        (next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s)

        ``skip_rate`` is the number of detected skips divided by the number of
        plays in the period. Plays whose track or duration is unknown are
        never counted as skips.
        """
        plays = (
            self._period_query(period_start, period_end)
            .order_by(PlayHistory.played_at.asc())
            .all()
        )
        if len(plays) < 2:
            return {"skip_rate": 0, "total_skips": 0}

        track_map = self._tracks_by_id(plays)
        skips = 0
        for current_play, next_play in zip(plays, plays[1:]):
            track = track_map.get(current_play.track_id)
            if not track or not track.duration_ms:
                continue
            diff_seconds = (
                next_play.played_at - current_play.played_at
            ).total_seconds()
            duration_sec = track.duration_ms / 1000.0
            # The next play started more than 10s before this track could
            # have finished, so it was cut short.
            if diff_seconds < (duration_sec - 10):
                skips += 1

        return {"total_skips": skips, "skip_rate": round(skips / len(plays), 3)}

    def compute_context_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Analyzes context_uri to determine if user listens to Playlists, Albums, or Artists.

        Returns an empty dict when the period has no plays.
        """
        plays = self._period_query(period_start, period_end).all()
        if not plays:
            return {}

        context_counts = {
            "playlist": 0,
            "album": 0,
            "artist": 0,
            "collection": 0,
            "unknown": 0,
        }
        # Plays per distinct context URI (plays without a context are excluded).
        unique_contexts = Counter()
        for p in plays:
            context_counts[self._context_type(p.context_uri)] += 1
            if p.context_uri:
                unique_contexts[p.context_uri] += 1

        total = len(plays)
        breakdown = {k: round(v / total, 2) for k, v in context_counts.items()}

        return {
            "type_breakdown": breakdown,
            "album_purist_score": breakdown.get("album", 0),
            "playlist_dependency": breakdown.get("playlist", 0),
            "context_loyalty": round(len(plays) / len(unique_contexts), 2)
            if unique_contexts
            else 0,
            # URIs only; resolving them to names needs metadata not stored here.
            "top_context_uris": [
                {"uri": uri, "count": c}
                for uri, c in unique_contexts.most_common(self.TOP_N)
            ],
        }

    def compute_taste_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Mainstream vs. Hipster analysis based on Track.popularity (0-100).

        Returns an empty dict when the period has no plays, and zeroed
        ``avg_popularity``/``hipster_score`` when no played track has a
        popularity value.
        """
        plays = self._period_query(period_start, period_end).all()
        if not plays:
            return {}

        track_map = self._tracks_by_id(plays)
        pop_values = []
        for p in plays:
            t = track_map.get(p.track_id)
            if t and t.popularity is not None:
                pop_values.append(t.popularity)

        if not pop_values:
            return {"avg_popularity": 0, "hipster_score": 0}

        avg_pop = float(np.mean(pop_values))
        # Underground: popularity < 30. Mainstream: popularity > 70.
        underground_plays = sum(1 for x in pop_values if x < 30)
        mainstream_plays = sum(1 for x in pop_values if x > 70)

        return {
            "avg_popularity": round(avg_pop, 1),
            "hipster_score": round((underground_plays / len(pop_values)) * 100, 1),
            "mainstream_score": round((mainstream_plays / len(pop_values)) * 100, 1),
            "obscurity_rating": round(100 - avg_pop, 1),
        }

    def compute_lifecycle_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Determines if tracks are 'New Discoveries' or 'Old Favorites'.

        A track is a new discovery when it has no play before
        ``period_start``. Returns an empty dict when the period has no plays.
        """
        # 1. Tracks played in this period
        current_plays = self._period_query(period_start, period_end).all()
        if not current_plays:
            return {}
        current_track_ids = {p.track_id for p in current_plays}

        # 2. Which of those tracks already appear in history before the period
        old_tracks_query = self.db.query(distinct(PlayHistory.track_id)).filter(
            PlayHistory.track_id.in_(list(current_track_ids)),
            PlayHistory.played_at < period_start,
        )
        old_track_ids = {row[0] for row in old_tracks_query.all()}

        # 3. Discovery metrics
        new_discoveries = current_track_ids - old_track_ids
        plays_on_new = sum(1 for p in current_plays if p.track_id in new_discoveries)
        total_plays = len(current_plays)

        return {
            "discovery_count": len(new_discoveries),
            "discovery_rate": round(plays_on_new / total_plays, 3),
            "recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3),
        }

    def compute_explicit_stats(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """
        Analyzes explicit content consumption.

        The explicit flag is read from ``Track.raw_data["explicit"]``; plays
        without a loadable track count as non-explicit. The result always
        carries the same keys, whether or not the period has plays.
        """
        plays = (
            self._period_query(period_start, period_end)
            .options(joinedload(PlayHistory.track))
            .all()
        )
        if not plays:
            return {
                "explicit_rate": 0,
                "total_explicit_plays": 0,
                "hourly_explicit_distribution": [0.0] * 24,
                # Legacy key of the former empty result, kept for existing consumers.
                "hourly_explicit_rate": [],
            }

        explicit_count = 0
        hourly_explicit = [0] * 24
        hourly_total = [0] * 24
        for p in plays:
            h = p.played_at.hour
            hourly_total[h] += 1
            if self._is_explicit(p.track):
                explicit_count += 1
                hourly_explicit[h] += 1

        # Per-hour explicit share; hours without plays report 0.0.
        hourly_rates = [
            round(explicit / total, 2) if total > 0 else 0.0
            for explicit, total in zip(hourly_explicit, hourly_total)
        ]

        return {
            "explicit_rate": round(explicit_count / len(plays), 3),
            "total_explicit_plays": explicit_count,
            "hourly_explicit_distribution": hourly_rates,
        }

    def generate_full_report(
        self, period_start: datetime, period_end: datetime
    ) -> Dict[str, Any]:
        """Run every statistic for the period and attach the comparison
        against the preceding period of equal length."""
        # 1. Calculate all current stats
        current_stats = {
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat(),
            },
            "volume": self.compute_volume_stats(period_start, period_end),
            "time_habits": self.compute_time_stats(period_start, period_end),
            "sessions": self.compute_session_stats(period_start, period_end),
            "context": self.compute_context_stats(period_start, period_end),
            "vibe": self.compute_vibe_stats(period_start, period_end),
            "era": self.compute_era_stats(period_start, period_end),
            "taste": self.compute_taste_stats(period_start, period_end),
            "lifecycle": self.compute_lifecycle_stats(period_start, period_end),
            "flags": self.compute_explicit_stats(period_start, period_end),
            "skips": self.compute_skip_stats(period_start, period_end),
        }
        # 2. Calculate Comparison
        current_stats["comparison"] = self.compute_comparison(
            current_stats, period_start, period_end
        )
        return current_stats

    def _empty_volume_stats(self):
        """Return the volume-stats payload used when a period has no plays."""
        return {
            "total_plays": 0,
            "estimated_minutes": 0,
            "unique_tracks": 0,
            "unique_artists": 0,
            "unique_albums": 0,
            "unique_genres": 0,
            "top_tracks": [],
            "top_artists": [],
            "top_albums": [],
            "top_genres": [],
            "repeat_rate": 0,
            "one_and_done_rate": 0,
            "concentration": {},
        }

    def _pct_change(self, curr, prev):
        """Return the percentage change from ``prev`` to ``curr``, rounded to
        one decimal.

        With ``prev == 0`` a true percentage is undefined, so growth from
        zero is reported as 100.0 and no change as 0.0.
        """
        if prev == 0:
            return 100.0 if curr > 0 else 0.0
        return round(((curr - prev) / prev) * 100, 1)