Files
MusicAnalyser/backend/app/services/stats_service.py
bnair123 93e7c13f3d feat: implement AI-curated playlist service and dashboard integration
- Added hierarchical AGENTS.md knowledge base
- Implemented PlaylistService with 6h themed and 24h devotion mix logic
- Integrated AI theme generation for 6h playlists via Gemini/OpenAI
- Added /playlists/refresh and metadata endpoints to API
- Updated background worker with scheduled playlist curation
- Created frontend PlaylistsSection, Tooltip components and integrated into Dashboard
- Added Alembic migration for playlist tracking columns
- Fixed Docker healthcheck with curl installation
2025-12-30 09:45:19 +04:00

997 lines
34 KiB
Python

from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func, distinct
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import math
import numpy as np
from sklearn.cluster import KMeans
from ..models import PlayHistory, Track, Artist
class StatsService:
    """Aggregates stored Spotify play history into dashboard statistics.

    Each ``compute_*`` method runs its own query over ``PlayHistory`` for a
    time window and returns a JSON-serialisable dict; ``generate_full_report``
    combines every section plus a period-over-period comparison.

    NOTE(review): window bounds are inconsistent across methods — some filter
    ``played_at < period_end`` while others use ``<=``; confirm which boundary
    behavior is intended before relying on exact edge dates.
    """

    def __init__(self, db: Session):
        # SQLAlchemy session used by every compute_* query.
        self.db = db
def compute_comparison(
    self,
    current_stats: Dict[str, Any],
    period_start: datetime,
    period_end: datetime,
) -> Dict[str, Any]:
    """Compare ``current_stats`` against the immediately preceding window.

    The previous window has the same length as [period_start, period_end)
    and ends exactly at period_start. Returns the previous period bounds
    plus deltas for plays, mood quadrant (energy/valence) and popularity;
    mood/popularity deltas are included only when both periods expose them.
    """
    span = period_end - period_start
    previous_end = period_start
    previous_start = previous_end - span

    prev_volume = self.compute_volume_stats(previous_start, previous_end)
    prev_vibe = self.compute_vibe_stats(previous_start, previous_end)
    prev_taste = self.compute_taste_stats(previous_start, previous_end)

    deltas: Dict[str, Any] = {}
    plays_now = current_stats["volume"]["total_plays"]
    plays_before = prev_volume["total_plays"]
    deltas["plays_delta"] = plays_now - plays_before
    deltas["plays_pct_change"] = self._pct_change(plays_now, plays_before)

    curr_vibe = current_stats["vibe"]
    if "mood_quadrant" in curr_vibe and "mood_quadrant" in prev_vibe:
        # y axis is energy, x axis is valence (see compute_vibe_stats).
        deltas["energy_delta"] = round(
            curr_vibe["mood_quadrant"]["y"] - prev_vibe["mood_quadrant"]["y"], 2
        )
        deltas["valence_delta"] = round(
            curr_vibe["mood_quadrant"]["x"] - prev_vibe["mood_quadrant"]["x"], 2
        )

    curr_taste = current_stats["taste"]
    if "avg_popularity" in curr_taste and "avg_popularity" in prev_taste:
        deltas["popularity_delta"] = round(
            curr_taste["avg_popularity"] - prev_taste["avg_popularity"], 1
        )

    return {
        "previous_period": {
            "start": previous_start.isoformat(),
            "end": previous_end.isoformat(),
        },
        "deltas": deltas,
    }
def compute_volume_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Aggregate play-volume stats for plays in [period_start, period_end).

    Returns totals (plays, minutes, unique tracks/artists/albums/genres),
    top-5 lists for tracks/artists/albums/genres, repeat and one-and-done
    rates, and concentration metrics (HHI, Gini, top-N shares, genre
    entropy). Empty periods return the zeroed payload from
    _empty_volume_stats().
    """
    # Eager-load track + artists so the counting loop does no extra queries.
    query = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track).joinedload(Track.artists))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
    )
    plays = query.all()
    total_plays = len(plays)
    if total_plays == 0:
        return self._empty_volume_stats()
    total_ms = 0
    # Per-entity play counters plus lookup maps for display metadata.
    track_counts = {}
    artist_counts = {}
    genre_counts = {}
    album_counts = {}
    track_map = {}
    artist_map = {}
    album_map = {}

    def get_track_image(t):
        # Prefer the stored image URL; fall back to the first album image
        # embedded in the raw Spotify payload, if any.
        if t.image_url:
            return t.image_url
        if t.raw_data and "album" in t.raw_data and "images" in t.raw_data["album"]:
            imgs = t.raw_data["album"]["images"]
            if imgs:
                return imgs[0].get("url")
        return None

    for p in plays:
        t = p.track
        if not t:
            # Orphaned history row (track not synced yet) — skip it.
            continue
        total_ms += t.duration_ms if t.duration_ms else 0
        track_counts[t.id] = track_counts.get(t.id, 0) + 1
        track_map[t.id] = t
        # Album id/name default to the plain album string on the Track row;
        # the raw Spotify payload overrides them when present.
        album_id = t.album
        album_name = t.album
        if t.raw_data and "album" in t.raw_data:
            album_id = t.raw_data["album"].get("id", t.album)
            album_name = t.raw_data["album"].get("name", t.album)
        album_counts[album_id] = album_counts.get(album_id, 0) + 1
        if album_id not in album_map:
            album_map[album_id] = {"name": album_name, "image": get_track_image(t)}
        for artist in t.artists:
            artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1
            if artist.id not in artist_map:
                artist_map[artist.id] = {
                    "name": artist.name,
                    "image": artist.image_url,
                }
            if artist.genres:
                # Genres hang off artists, so one play can count toward
                # several genres (once per credited artist).
                for g in artist.genres:
                    genre_counts[g] = genre_counts.get(g, 0) + 1
    unique_tracks = len(track_counts)
    one_and_done = len([c for c in track_counts.values() if c == 1])
    # Per-track share of total plays; feeds the HHI/Gini metrics below.
    shares = [c / total_plays for c in track_counts.values()]
    top_tracks = [
        {
            "id": tid,
            "name": track_map[tid].name,
            "artist": ", ".join([a.name for a in track_map[tid].artists]),
            "image": get_track_image(track_map[tid]),
            "count": c,
        }
        for tid, c in sorted(
            track_counts.items(), key=lambda x: x[1], reverse=True
        )[:5]
    ]
    top_artists = [
        {
            "name": artist_map[aid]["name"],
            "id": aid,
            "image": artist_map[aid]["image"],
            "count": c,
        }
        for aid, c in sorted(
            artist_counts.items(), key=lambda x: x[1], reverse=True
        )[:5]
    ]
    top_albums = [
        {
            "name": album_map[aid]["name"],
            "image": album_map[aid]["image"],
            "count": c,
        }
        for aid, c in sorted(
            album_counts.items(), key=lambda x: x[1], reverse=True
        )[:5]
    ]
    top_genres = [
        {"name": k, "count": v}
        for k, v in sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[
            :5
        ]
    ]
    # Herfindahl-Hirschman index: sum of squared shares (1.0 = one track).
    hhi = sum([s**2 for s in shares])
    sorted_shares = sorted(shares)
    n = len(shares)
    gini = 0
    if n > 0:
        # Gini coefficient over play shares: 0 = perfectly even listening,
        # approaching 1 = plays concentrated on very few tracks.
        gini = (2 * sum((i + 1) * x for i, x in enumerate(sorted_shares))) / (
            n * sum(sorted_shares)
        ) - (n + 1) / n
    total_genre_occurrences = sum(genre_counts.values())
    genre_entropy = 0
    if total_genre_occurrences > 0:
        # Shannon entropy (natural log) of the genre distribution.
        genre_probs = [
            count / total_genre_occurrences for count in genre_counts.values()
        ]
        genre_entropy = -sum([p * math.log(p) for p in genre_probs if p > 0])
    top_5_plays = sum([t["count"] for t in top_tracks])
    top_5_share = top_5_plays / total_plays if total_plays else 0
    return {
        "total_plays": total_plays,
        "estimated_minutes": int(total_ms / 60000),
        "unique_tracks": unique_tracks,
        "unique_artists": len(artist_counts),
        "unique_albums": len(album_counts),
        "unique_genres": len(genre_counts),
        "top_tracks": top_tracks,
        "top_artists": top_artists,
        "top_albums": top_albums,
        "top_genres": top_genres,
        "repeat_rate": round((total_plays - unique_tracks) / total_plays, 3)
        if total_plays
        else 0,
        "one_and_done_rate": round(one_and_done / unique_tracks, 3)
        if unique_tracks
        else 0,
        "concentration": {
            "hhi": round(hhi, 4),
            "gini": round(gini, 4),
            "top_1_share": round(max(shares), 3) if shares else 0,
            "top_5_share": round(top_5_share, 3),
            "genre_entropy": round(genre_entropy, 2),
        },
    }
def compute_time_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Temporal listening habits for [period_start, period_end).

    Builds a 7x24 weekday/hour heatmap (plus a 7x6 four-hour-block
    version), hourly and weekday distributions, part-of-day buckets,
    weekend share, day streaks and per-active-day averages.
    """
    rows = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not rows:
        return self._empty_time_stats()

    heatmap = [[0] * 24 for _ in range(7)]
    compressed = [[0] * 6 for _ in range(7)]
    block_labels = [
        "12am-4am",
        "4am-8am",
        "8am-12pm",
        "12pm-4pm",
        "4pm-8pm",
        "8pm-12am",
    ]
    by_hour = [0] * 24
    by_weekday = [0] * 7
    part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
    dates_seen = set()

    for row in rows:
        hour = row.played_at.hour
        day = row.played_at.weekday()  # Monday == 0
        heatmap[day][hour] += 1
        compressed[day][hour // 4] += 1
        by_hour[hour] += 1
        by_weekday[day] += 1
        dates_seen.add(row.played_at.date())
        if 6 <= hour < 12:
            bucket = "morning"
        elif 12 <= hour < 18:
            bucket = "afternoon"
        elif 18 <= hour <= 23:
            bucket = "evening"
        else:
            bucket = "night"  # hours 0-5
        part_of_day[bucket] += 1

    # Streaks over the set of calendar dates with at least one play;
    # "listening_streak" is the trailing run at the end of the period.
    ordered = sorted(dates_seen)
    streak = 0
    longest = 0
    if ordered:
        streak = longest = 1
        for prev_day, day in zip(ordered, ordered[1:]):
            if (day - prev_day).days == 1:
                streak += 1
            else:
                longest = max(longest, streak)
                streak = 1
        longest = max(longest, streak)

    active_days = len(dates_seen)
    weekend_total = by_weekday[5] + by_weekday[6]
    return {
        "heatmap": heatmap,
        "heatmap_compressed": compressed,
        "block_labels": block_labels,
        "hourly_distribution": by_hour,
        "peak_hour": by_hour.index(max(by_hour)),
        "weekday_distribution": by_weekday,
        "daily_distribution": by_weekday,
        "weekend_share": round(weekend_total / len(rows), 2),
        "part_of_day": part_of_day,
        "listening_streak": streak,
        "longest_streak": longest,
        "active_days": active_days,
        "avg_plays_per_active_day": round(len(rows) / active_days, 1)
        if active_days
        else 0,
    }
def compute_session_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Segment plays into listening sessions and summarise them.

    A new session starts whenever more than 20 minutes pass between
    consecutive plays. Returns session count, length statistics,
    start-hour distribution, micro (<=3 tracks) / marathon (>=20 tracks)
    rates, per-session energy arcs and the full session list.
    """
    query = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
    )
    plays = query.all()
    if not plays:
        return self._empty_session_stats()
    # Split the chronologically ordered plays on gaps > 20 minutes.
    sessions = []
    current_session = [plays[0]]
    for i in range(1, len(plays)):
        diff = (plays[i].played_at - plays[i - 1].played_at).total_seconds() / 60
        if diff > 20:
            sessions.append(current_session)
            current_session = []
        current_session.append(plays[i])
    sessions.append(current_session)
    lengths_min = []
    micro_sessions = 0
    marathon_sessions = 0
    energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
    start_hour_dist = [0] * 24
    session_list = []
    for sess in sessions:
        start_t = sess[0].played_at
        end_t = sess[-1].played_at
        start_hour_dist[start_t.hour] += 1
        if len(sess) > 1:
            duration = (end_t - start_t).total_seconds() / 60
            lengths_min.append(duration)
        else:
            # Single-play session has no measurable span; assume ~3 minutes.
            duration = 3.0
            lengths_min.append(duration)
        sess_type = "Standard"
        if len(sess) <= 3:
            micro_sessions += 1
            sess_type = "Micro"
        elif len(sess) >= 20:
            marathon_sessions += 1
            sess_type = "Marathon"
        session_list.append(
            {
                "start_time": start_t.isoformat(),
                "end_time": end_t.isoformat(),
                "duration_minutes": round(duration, 1),
                "track_count": len(sess),
                "type": sess_type,
            }
        )
        # Energy arc: first vs last track energy with a ±0.1 dead band.
        # NOTE(review): a single-track session compares the track with
        # itself (diff == 0) and is therefore counted as "flat" — confirm
        # that is intended rather than "unknown".
        first_t = sess[0].track
        last_t = sess[-1].track
        if (
            first_t
            and last_t
            and getattr(first_t, "energy", None) is not None
            and getattr(last_t, "energy", None) is not None
        ):
            diff = last_t.energy - first_t.energy
            if diff > 0.1:
                energy_arcs["rising"] += 1
            elif diff < -0.1:
                energy_arcs["falling"] += 1
            else:
                energy_arcs["flat"] += 1
        else:
            energy_arcs["unknown"] += 1
    avg_min = np.mean(lengths_min) if lengths_min else 0
    median_min = np.median(lengths_min) if lengths_min else 0
    active_days = len(set(p.played_at.date() for p in plays))
    sessions_per_day = len(sessions) / active_days if active_days else 0
    return {
        "count": len(sessions),
        "avg_tracks": round(len(plays) / len(sessions), 1),
        "avg_minutes": round(float(avg_min), 1),
        "median_minutes": round(float(median_min), 1),
        "longest_session_minutes": round(max(lengths_min), 1) if lengths_min else 0,
        "sessions_per_day": round(sessions_per_day, 1),
        "start_hour_distribution": start_hour_dist,
        "micro_session_rate": round(micro_sessions / len(sessions), 2),
        "marathon_session_rate": round(marathon_sessions / len(sessions), 2),
        "energy_arcs": energy_arcs,
        "session_list": session_list,
    }
def compute_vibe_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Audio-feature ("vibe") profile for plays in [period_start, period_end).

    Computes per-feature means/std/percentiles, a mood quadrant
    (x = valence, y = energy), rhythm and texture profiles,
    track-to-track "whiplash" deltas, tempo zones, key/mode breakdowns
    and a 3-cluster KMeans grouping over (energy, valence, danceability,
    acousticness). Returns _empty_vibe_stats() when there are no plays.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if not plays:
        return self._empty_vibe_stats()
    # Fetch the distinct tracks once; features are read off the Track rows.
    track_ids = list(set([p.track_id for p in plays]))
    tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
    track_map = {t.id: t for t in tracks}
    feature_keys = [
        "energy",
        "valence",
        "danceability",
        "tempo",
        "acousticness",
        "instrumentalness",
        "liveness",
        "speechiness",
        "loudness",
    ]
    # Feature values are collected per PLAY, so repeats weigh more.
    features = {k: [] for k in feature_keys}
    cluster_data = []
    keys = []
    modes = []
    tempo_zones = {"chill": 0, "groove": 0, "hype": 0}
    # Absolute feature jumps between consecutive plays < 5 minutes apart.
    transitions = {"tempo": [], "energy": [], "valence": []}
    previous_track = None
    for i, p in enumerate(plays):
        t = track_map.get(p.track_id)
        if not t:
            continue
        for key in feature_keys:
            val = getattr(t, key, None)
            if val is not None:
                features[key].append(val)
        # Only fully-featured tracks participate in clustering.
        if all(
            getattr(t, k, None) is not None
            for k in ["energy", "valence", "danceability", "acousticness"]
        ):
            cluster_data.append(
                [t.energy, t.valence, t.danceability, t.acousticness]
            )
        if getattr(t, "key", None) is not None:
            keys.append(t.key)
        if getattr(t, "mode", None) is not None:
            modes.append(t.mode)
        if getattr(t, "tempo", None) is not None:
            # BPM buckets: < 100 chill, 100-129 groove, >= 130 hype.
            if t.tempo < 100:
                tempo_zones["chill"] += 1
            elif t.tempo < 130:
                tempo_zones["groove"] += 1
            else:
                tempo_zones["hype"] += 1
        if i > 0 and previous_track:
            time_diff = (p.played_at - plays[i - 1].played_at).total_seconds()
            if time_diff < 300:
                if (
                    getattr(t, "tempo", None) is not None
                    and getattr(previous_track, "tempo", None) is not None
                ):
                    transitions["tempo"].append(abs(t.tempo - previous_track.tempo))
                if (
                    getattr(t, "energy", None) is not None
                    and getattr(previous_track, "energy", None) is not None
                ):
                    transitions["energy"].append(
                        abs(t.energy - previous_track.energy)
                    )
                if (
                    getattr(t, "valence", None) is not None
                    and getattr(previous_track, "valence", None) is not None
                ):
                    transitions["valence"].append(
                        abs(t.valence - previous_track.valence)
                    )
        previous_track = t
    stats_res = {}
    # Per-feature summary: "<key>" holds the rounded mean for display;
    # "avg_/std_/p10_/p50_/p90_<key>" carry the unrounded statistics.
    for key, values in features.items():
        valid = [v for v in values if v is not None]
        if valid:
            avg_val = float(np.mean(valid))
            stats_res[key] = round(avg_val, 3)
            stats_res[f"avg_{key}"] = avg_val
            stats_res[f"std_{key}"] = float(np.std(valid))
            stats_res[f"p10_{key}"] = float(np.percentile(valid, 10))
            stats_res[f"p50_{key}"] = float(np.percentile(valid, 50))
            stats_res[f"p90_{key}"] = float(np.percentile(valid, 90))
        else:
            stats_res[key] = 0.0
            stats_res[f"avg_{key}"] = None
    if (
        stats_res.get("avg_energy") is not None
        and stats_res.get("avg_valence") is not None
    ):
        # Mood quadrant: x = valence, y = energy (both 0..1 averages).
        stats_res["mood_quadrant"] = {
            "x": round(stats_res["avg_valence"], 2),
            "y": round(stats_res["avg_energy"], 2),
        }
        avg_std = (
            stats_res.get("std_energy", 0) + stats_res.get("std_valence", 0)
        ) / 2
        # Lower energy/valence spread => higher consistency score.
        stats_res["consistency_score"] = round(1.0 - avg_std, 2)
    if (
        stats_res.get("avg_tempo") is not None
        and stats_res.get("avg_danceability") is not None
    ):
        stats_res["rhythm_profile"] = {
            "avg_tempo": round(stats_res["avg_tempo"], 1),
            "avg_danceability": round(stats_res["avg_danceability"], 2),
        }
    if (
        stats_res.get("avg_acousticness") is not None
        and stats_res.get("avg_instrumentalness") is not None
    ):
        stats_res["texture_profile"] = {
            "acousticness": round(stats_res["avg_acousticness"], 2),
            "instrumentalness": round(stats_res["avg_instrumentalness"], 2),
        }
    # Mean absolute change between back-to-back plays ("whiplash").
    stats_res["whiplash"] = {}
    for k in ["tempo", "energy", "valence"]:
        if transitions[k]:
            stats_res["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
        else:
            stats_res["whiplash"][k] = 0
    total_tempo = sum(tempo_zones.values())
    if total_tempo > 0:
        stats_res["tempo_zones"] = {
            k: round(v / total_tempo, 2) for k, v in tempo_zones.items()
        }
    else:
        stats_res["tempo_zones"] = {}
    if modes:
        # mode == 1 is treated as major, anything else as minor.
        major_count = len([m for m in modes if m == 1])
        stats_res["harmonic_profile"] = {
            "major_pct": round(major_count / len(modes), 2),
            "minor_pct": round((len(modes) - major_count) / len(modes), 2),
        }
    if keys:
        # Map pitch-class integers (0-11) to note names.
        pitch_class = [
            "C",
            "C#",
            "D",
            "D#",
            "E",
            "F",
            "F#",
            "G",
            "G#",
            "A",
            "A#",
            "B",
        ]
        key_counts = {}
        for k in keys:
            if 0 <= k < 12:
                label = pitch_class[k]
                key_counts[label] = key_counts.get(label, 0) + 1
        stats_res["top_keys"] = [
            {"key": k, "count": v}
            for k, v in sorted(
                key_counts.items(), key=lambda x: x[1], reverse=True
            )[:3]
        ]
    # Cluster plays into up to 3 named vibes; needs at least 5 samples.
    if len(cluster_data) >= 5:
        try:
            kmeans = KMeans(n_clusters=3, random_state=42, n_init="auto")
            labels = kmeans.fit_predict(cluster_data)
            clusters = []
            for i in range(3):
                mask = labels == i
                count = np.sum(mask)
                if count == 0:
                    continue
                centroid = kmeans.cluster_centers_[i]
                share = count / len(cluster_data)
                c_energy, c_valence, c_dance, c_acoustic = centroid
                # Name by the first matching centroid threshold, in priority
                # order: energy, acousticness, low valence, danceability.
                name = "Mixed Vibe"
                if c_energy > 0.7:
                    name = "High Energy"
                elif c_acoustic > 0.7:
                    name = "Acoustic / Chill"
                elif c_valence < 0.3:
                    name = "Melancholy"
                elif c_dance > 0.7:
                    name = "Dance / Groove"
                clusters.append(
                    {
                        "name": name,
                        "share": round(share, 2),
                        "features": {
                            "energy": round(c_energy, 2),
                            "valence": round(c_valence, 2),
                            "danceability": round(c_dance, 2),
                            "acousticness": round(c_acoustic, 2),
                        },
                    }
                )
            stats_res["clusters"] = sorted(
                clusters, key=lambda x: x["share"], reverse=True
            )
        except Exception as e:
            # Clustering is best-effort; never fail the whole report.
            print(f"Clustering failed: {e}")
            stats_res["clusters"] = []
    else:
        stats_res["clusters"] = []
    return stats_res
def compute_era_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Release-era profile of the period's plays.

    Extracts release years from each track's raw Spotify album payload and
    returns the average release year ("musical_age"), the gap to the
    current year, the share of plays from the current decade
    ("freshness_score") and the full decade distribution. Returns
    {"musical_age": None} when no release years are available.
    """
    query = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at < period_end,
        )
    )
    plays = query.all()
    years: List[int] = []
    for p in plays:
        t = p.track
        if t and t.raw_data and "album" in t.raw_data:
            rd = t.raw_data["album"].get("release_date")
            if rd:
                # Spotify release_date precision varies: "YYYY", "YYYY-MM"
                # or "YYYY-MM-DD"; the leading segment is always the year.
                try:
                    years.append(int(str(rd).split("-")[0]))
                except ValueError:
                    # Fix: was a bare `except:` that swallowed everything
                    # (including KeyboardInterrupt). Only a malformed year
                    # string should be skipped.
                    continue
    if not years:
        return {"musical_age": None}
    avg_year = sum(years) / len(years)
    current_year = datetime.utcnow().year
    decades: Dict[str, int] = {}
    for y in years:
        label = f"{(y // 10) * 10}s"
        decades[label] = decades.get(label, 0) + 1
    total = len(years)
    dist = {k: round(v / total, 3) for k, v in decades.items()}
    return {
        "musical_age": int(avg_year),
        "nostalgia_gap": int(current_year - avg_year),
        "freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0),
        "decade_distribution": dist,
    }
def compute_skip_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Estimate skips within [period_start, period_end].

    A play counts as skipped when the next play begins more than 10
    seconds before the current track could have finished. Plays whose
    track or duration is unknown are ignored.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .order_by(PlayHistory.played_at.asc())
        .all()
    )
    if len(plays) < 2:
        return {"skip_rate": 0, "total_skips": 0}

    distinct_ids = list({p.track_id for p in plays})
    catalog = {
        t.id: t
        for t in self.db.query(Track).filter(Track.id.in_(distinct_ids)).all()
    }

    skips = 0
    for current, following in zip(plays, plays[1:]):
        track = catalog.get(current.track_id)
        if not track or not getattr(track, "duration_ms", None):
            continue
        gap_seconds = (following.played_at - current.played_at).total_seconds()
        # 10-second grace window before the natural end of the track.
        if gap_seconds < (track.duration_ms / 1000.0 - 10):
            skips += 1

    return {"total_skips": skips, "skip_rate": round(skips / len(plays), 3)}
def compute_context_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Break plays down by the Spotify context URI they were played from.

    Classifies each play as playlist/album/artist/collection/unknown by
    substring match on the context URI and reports the share breakdown,
    loyalty (plays per distinct context) and the five busiest contexts.
    Returns {} for an empty period.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {}

    counts = {
        "playlist": 0,
        "album": 0,
        "artist": 0,
        "collection": 0,
        "unknown": 0,
    }
    plays_per_uri: Dict[str, int] = {}
    for play in plays:
        uri = play.context_uri
        if not uri:
            counts["unknown"] += 1
            continue
        plays_per_uri[uri] = plays_per_uri.get(uri, 0) + 1
        # First-match-wins classification, same priority order as keys above.
        for kind in ("playlist", "album", "artist", "collection"):
            if kind in uri:
                counts[kind] += 1
                break
        else:
            counts["unknown"] += 1

    total = len(plays)
    breakdown = {k: round(v / total, 2) for k, v in counts.items()}
    busiest = sorted(plays_per_uri.items(), key=lambda item: item[1], reverse=True)[:5]
    return {
        "type_breakdown": breakdown,
        "album_purist_score": breakdown.get("album", 0),
        "playlist_dependency": breakdown.get("playlist", 0),
        "context_loyalty": round(total / len(plays_per_uri), 2)
        if plays_per_uri
        else 0,
        "top_context_uris": [{"uri": u, "count": c} for u, c in busiest],
    }
def compute_taste_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Popularity-based taste profile for [period_start, period_end].

    Popularity is counted once per PLAY (repeats weigh more). Hipster
    score = share of plays with popularity < 30; mainstream score = share
    with popularity > 70. Returns {} for an empty period.
    """
    plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {}

    distinct_ids = list({p.track_id for p in plays})
    catalog = {
        t.id: t
        for t in self.db.query(Track).filter(Track.id.in_(distinct_ids)).all()
    }
    popularity = [
        catalog[p.track_id].popularity
        for p in plays
        if catalog.get(p.track_id)
        and getattr(catalog[p.track_id], "popularity", None) is not None
    ]
    if not popularity:
        return {"avg_popularity": 0, "hipster_score": 0}

    avg_pop = float(np.mean(popularity))
    sample_size = len(popularity)
    underground = sum(1 for x in popularity if x < 30)
    mainstream = sum(1 for x in popularity if x > 70)
    return {
        "avg_popularity": round(avg_pop, 1),
        "hipster_score": round((underground / sample_size) * 100, 1),
        "mainstream_score": round((mainstream / sample_size) * 100, 1),
        "obscurity_rating": round(100 - avg_pop, 1),
    }
def compute_lifecycle_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Discovery vs recurrence rates for [period_start, period_end].

    A track is a "discovery" when it has no recorded play before
    period_start. Rates are computed over plays, counts over distinct
    tracks. Returns {} for an empty period.
    """
    period_plays = (
        self.db.query(PlayHistory)
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not period_plays:
        return {}

    period_track_ids = {p.track_id for p in period_plays}
    # Tracks from this period that were already heard before it started.
    previously_heard = {
        row[0]
        for row in self.db.query(distinct(PlayHistory.track_id))
        .filter(
            PlayHistory.track_id.in_(period_track_ids),
            PlayHistory.played_at < period_start,
        )
        .all()
    }
    fresh_tracks = period_track_ids - previously_heard
    fresh_plays = sum(1 for p in period_plays if p.track_id in fresh_tracks)
    total = len(period_plays)
    return {
        "discovery_count": len(fresh_tracks),
        "discovery_rate": round(fresh_plays / total, 3) if total > 0 else 0,
        "recurrence_rate": round((total - fresh_plays) / total, 3)
        if total > 0
        else 0,
    }
def compute_explicit_stats(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Explicit-content share overall and per hour of day.

    A play counts as explicit when its track's raw Spotify payload has a
    truthy "explicit" flag. The hourly distribution is a rate per hour
    (0.0 for hours with no plays).
    """
    plays = (
        self.db.query(PlayHistory)
        .options(joinedload(PlayHistory.track))
        .filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end,
        )
        .all()
    )
    if not plays:
        return {"explicit_rate": 0, "hourly_explicit_distribution": []}

    explicit_by_hour = [0] * 24
    totals_by_hour = [0] * 24
    explicit_total = 0
    for play in plays:
        hour = play.played_at.hour
        totals_by_hour[hour] += 1
        track = play.track
        if track and track.raw_data and track.raw_data.get("explicit"):
            explicit_total += 1
            explicit_by_hour[hour] += 1

    hourly_rates = [
        round(explicit_by_hour[h] / totals_by_hour[h], 2)
        if totals_by_hour[h] > 0
        else 0.0
        for h in range(24)
    ]
    return {
        "explicit_rate": round(explicit_total / len(plays), 3),
        "total_explicit_plays": explicit_total,
        "hourly_explicit_distribution": hourly_rates,
    }
def generate_full_report(
    self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
    """Run every stats section for the window and attach a comparison.

    Sections are computed in a fixed order (matching the report's key
    order), then compute_comparison() is run against the assembled report.
    """
    sections = (
        ("volume", self.compute_volume_stats),
        ("time_habits", self.compute_time_stats),
        ("sessions", self.compute_session_stats),
        ("context", self.compute_context_stats),
        ("vibe", self.compute_vibe_stats),
        ("era", self.compute_era_stats),
        ("taste", self.compute_taste_stats),
        ("lifecycle", self.compute_lifecycle_stats),
        ("flags", self.compute_explicit_stats),
        ("skips", self.compute_skip_stats),
    )
    report: Dict[str, Any] = {
        "period": {
            "start": period_start.isoformat(),
            "end": period_end.isoformat(),
        }
    }
    for name, compute in sections:
        report[name] = compute(period_start, period_end)
    report["comparison"] = self.compute_comparison(report, period_start, period_end)
    return report
def _empty_volume_stats(self):
return {
"total_plays": 0,
"estimated_minutes": 0,
"unique_tracks": 0,
"unique_artists": 0,
"unique_albums": 0,
"unique_genres": 0,
"top_tracks": [],
"top_artists": [],
"top_albums": [],
"top_genres": [],
"repeat_rate": 0,
"one_and_done_rate": 0,
"concentration": {
"hhi": 0,
"gini": 0,
"top_1_share": 0,
"top_5_share": 0,
"genre_entropy": 0,
},
}
def _empty_time_stats(self):
return {
"heatmap": [],
"heatmap_compressed": [],
"block_labels": [],
"hourly_distribution": [0] * 24,
"peak_hour": None,
"weekday_distribution": [0] * 7,
"daily_distribution": [0] * 7,
"weekend_share": 0,
"part_of_day": {"morning": 0, "afternoon": 0, "evening": 0, "night": 0},
"listening_streak": 0,
"longest_streak": 0,
"active_days": 0,
"avg_plays_per_active_day": 0,
}
def _empty_session_stats(self):
return {
"count": 0,
"avg_tracks": 0,
"avg_minutes": 0,
"median_minutes": 0,
"longest_session_minutes": 0,
"sessions_per_day": 0,
"start_hour_distribution": [0] * 24,
"micro_session_rate": 0,
"marathon_session_rate": 0,
"energy_arcs": {"rising": 0, "falling": 0, "flat": 0, "unknown": 0},
"session_list": [],
}
def _empty_vibe_stats(self):
return {
"avg_energy": 0,
"avg_valence": 0,
"mood_quadrant": {"x": 0, "y": 0},
"clusters": [],
}
def _pct_change(self, curr, prev):
if prev == 0:
return 100.0 if curr > 0 else 0.0
return round(((curr - prev) / prev) * 100, 1)