feat: implement AI-curated playlist service and dashboard integration

- Added hierarchical AGENTS.md knowledge base
- Implemented PlaylistService with 6h themed and 24h devotion mix logic
- Integrated AI theme generation for 6h playlists via Gemini/OpenAI
- Added /playlists/refresh and metadata endpoints to API
- Updated background worker with scheduled playlist curation
- Created frontend PlaylistsSection, Tooltip components and integrated into Dashboard
- Added Alembic migration for playlist tracking columns
- Fixed Docker healthcheck with curl installation
This commit is contained in:
bnair123
2025-12-30 09:45:19 +04:00
parent fa28b98c1a
commit 93e7c13f3d
18 changed files with 1037 additions and 295 deletions

View File

@@ -19,27 +19,21 @@ class StatsService:
period_start: datetime,
period_end: datetime,
) -> Dict[str, Any]:
"""
Calculates deltas vs the previous period of the same length.
"""
duration = period_end - period_start
prev_end = period_start
prev_start = prev_end - duration
# We only need key metrics for comparison
prev_volume = self.compute_volume_stats(prev_start, prev_end)
prev_vibe = self.compute_vibe_stats(prev_start, prev_end)
prev_taste = self.compute_taste_stats(prev_start, prev_end)
deltas = {}
# Plays
curr_plays = current_stats["volume"]["total_plays"]
prev_plays_count = prev_volume["total_plays"]
deltas["plays_delta"] = curr_plays - prev_plays_count
deltas["plays_pct_change"] = self._pct_change(curr_plays, prev_plays_count)
# Energy & Valence
if "mood_quadrant" in current_stats["vibe"] and "mood_quadrant" in prev_vibe:
curr_e = current_stats["vibe"]["mood_quadrant"]["y"]
prev_e = prev_vibe["mood_quadrant"]["y"]
@@ -49,7 +43,6 @@ class StatsService:
prev_v = prev_vibe["mood_quadrant"]["x"]
deltas["valence_delta"] = round(curr_v - prev_v, 2)
# Popularity
if (
"avg_popularity" in current_stats["taste"]
and "avg_popularity" in prev_taste
@@ -70,11 +63,6 @@ class StatsService:
def compute_volume_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Calculates volume metrics including Concentration (HHI, Gini, Entropy) and Top Lists.
"""
# Eager load tracks AND artists to fix the "Artist String Problem" and performance
# Use < period_end for half-open interval to avoid double counting boundaries
query = (
self.db.query(PlayHistory)
.options(joinedload(PlayHistory.track).joinedload(Track.artists))
@@ -95,12 +83,10 @@ class StatsService:
genre_counts = {}
album_counts = {}
# Maps for resolving names/images later without DB hits
track_map = {}
artist_map = {}
album_map = {}
# Helper to safely get image
def get_track_image(t):
if t.image_url:
return t.image_url
@@ -116,13 +102,9 @@ class StatsService:
continue
total_ms += t.duration_ms if t.duration_ms else 0
# Track Aggregation
track_counts[t.id] = track_counts.get(t.id, 0) + 1
track_map[t.id] = t
# Album Aggregation
# Prefer ID from raw_data, fallback to name
album_id = t.album
album_name = t.album
if t.raw_data and "album" in t.raw_data:
@@ -130,11 +112,9 @@ class StatsService:
album_name = t.raw_data["album"].get("name", t.album)
album_counts[album_id] = album_counts.get(album_id, 0) + 1
# Store tuple of (name, image_url)
if album_id not in album_map:
album_map[album_id] = {"name": album_name, "image": get_track_image(t)}
# Artist Aggregation (Iterate objects, not string)
for artist in t.artists:
artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1
if artist.id not in artist_map:
@@ -143,20 +123,17 @@ class StatsService:
"image": artist.image_url,
}
# Genre Aggregation
if artist.genres:
# artist.genres is a JSON list of strings
for g in artist.genres:
genre_counts[g] = genre_counts.get(g, 0) + 1
# Derived Metrics
unique_tracks = len(track_counts)
one_and_done = len([c for c in track_counts.values() if c == 1])
shares = [c / total_plays for c in track_counts.values()]
# Top Lists (Optimized: No N+1)
top_tracks = [
{
"id": tid,
"name": track_map[tid].name,
"artist": ", ".join([a.name for a in track_map[tid].artists]),
"image": get_track_image(track_map[tid]),
@@ -197,11 +174,8 @@ class StatsService:
]
]
# Concentration Metrics
# HHI: Sum of (share)^2
hhi = sum([s**2 for s in shares])
# Gini Coefficient
sorted_shares = sorted(shares)
n = len(shares)
gini = 0
@@ -210,7 +184,6 @@ class StatsService:
n * sum(sorted_shares)
) - (n + 1) / n
# Genre Entropy: -SUM(p * log(p))
total_genre_occurrences = sum(genre_counts.values())
genre_entropy = 0
if total_genre_occurrences > 0:
@@ -219,7 +192,6 @@ class StatsService:
]
genre_entropy = -sum([p * math.log(p) for p in genre_probs if p > 0])
# Top 5 Share
top_5_plays = sum([t["count"] for t in top_tracks])
top_5_share = top_5_plays / total_plays if total_plays else 0
@@ -252,9 +224,6 @@ class StatsService:
def compute_time_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Includes Part-of-Day buckets, Listening Streaks, Active Days, and 2D Heatmap.
"""
query = (
self.db.query(PlayHistory)
.filter(
@@ -266,12 +235,9 @@ class StatsService:
plays = query.all()
if not plays:
return {}
return self._empty_time_stats()
# Heatmap: 7 days x 24 hours (granular) and 7 days x 6 blocks (compressed)
heatmap = [[0 for _ in range(24)] for _ in range(7)]
# Compressed heatmap: 6 x 4-hour blocks per day
# Blocks: 0-4 (Night), 4-8 (Early Morning), 8-12 (Morning), 12-16 (Afternoon), 16-20 (Evening), 20-24 (Night)
heatmap_compressed = [[0 for _ in range(6)] for _ in range(7)]
block_labels = [
"12am-4am",
@@ -292,13 +258,8 @@ class StatsService:
h = p.played_at.hour
d = p.played_at.weekday()
# Populate Heatmap (granular)
heatmap[d][h] += 1
# Populate compressed heatmap (4-hour blocks)
block_idx = (
h // 4
) # 0-3 -> 0, 4-7 -> 1, 8-11 -> 2, 12-15 -> 3, 16-19 -> 4, 20-23 -> 5
block_idx = h // 4
heatmap_compressed[d][block_idx] += 1
hourly_counts[h] += 1
@@ -314,7 +275,6 @@ class StatsService:
else:
part_of_day["night"] += 1
# Calculate Streak
sorted_dates = sorted(list(active_dates))
current_streak = 0
longest_streak = 0
@@ -354,9 +314,6 @@ class StatsService:
def compute_session_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Includes Micro-sessions, Marathon sessions, Energy Arcs, Median metrics, and Session List.
"""
query = (
self.db.query(PlayHistory)
.options(joinedload(PlayHistory.track))
@@ -369,12 +326,11 @@ class StatsService:
plays = query.all()
if not plays:
return {"count": 0}
return self._empty_session_stats()
sessions = []
current_session = [plays[0]]
# 1. Sessionization (Gap > 20 mins)
for i in range(1, len(plays)):
diff = (plays[i].played_at - plays[i - 1].played_at).total_seconds() / 60
if diff > 20:
@@ -383,31 +339,26 @@ class StatsService:
current_session.append(plays[i])
sessions.append(current_session)
# 2. Analyze Sessions
lengths_min = []
micro_sessions = 0
marathon_sessions = 0
energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
start_hour_dist = [0] * 24
session_list = [] # Metadata for timeline
session_list = []
for sess in sessions:
start_t = sess[0].played_at
end_t = sess[-1].played_at
# Start time distribution
start_hour_dist[start_t.hour] += 1
# Durations
if len(sess) > 1:
duration = (end_t - start_t).total_seconds() / 60
lengths_min.append(duration)
else:
duration = 3.0 # Approx single song
duration = 3.0
lengths_min.append(duration)
# Types
sess_type = "Standard"
if len(sess) <= 3:
micro_sessions += 1
@@ -416,7 +367,6 @@ class StatsService:
marathon_sessions += 1
sess_type = "Marathon"
# Store Session Metadata
session_list.append(
{
"start_time": start_t.isoformat(),
@@ -427,14 +377,13 @@ class StatsService:
}
)
# Energy Arc
first_t = sess[0].track
last_t = sess[-1].track
if (
first_t
and last_t
and first_t.energy is not None
and last_t.energy is not None
and getattr(first_t, "energy", None) is not None
and getattr(last_t, "energy", None) is not None
):
diff = last_t.energy - first_t.energy
if diff > 0.1:
@@ -448,8 +397,6 @@ class StatsService:
avg_min = np.mean(lengths_min) if lengths_min else 0
median_min = np.median(lengths_min) if lengths_min else 0
# Sessions per day
active_days = len(set(p.played_at.date() for p in plays))
sessions_per_day = len(sessions) / active_days if active_days else 0
@@ -470,9 +417,6 @@ class StatsService:
def compute_vibe_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Aggregates Audio Features + Calculates Whiplash + Clustering + Harmonic Profile.
"""
plays = (
self.db.query(PlayHistory)
.filter(
@@ -484,13 +428,12 @@ class StatsService:
)
if not plays:
return {}
return self._empty_vibe_stats()
track_ids = list(set([p.track_id for p in plays]))
tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
track_map = {t.id: t for t in tracks}
# 1. Aggregates
feature_keys = [
"energy",
"valence",
@@ -503,18 +446,11 @@ class StatsService:
"loudness",
]
features = {k: [] for k in feature_keys}
# For Clustering: List of [energy, valence, danceability, acousticness]
cluster_data = []
# For Harmonic & Tempo
keys = []
modes = []
tempo_zones = {"chill": 0, "groove": 0, "hype": 0}
# 2. Transition Arrays (for Whiplash)
transitions = {"tempo": [], "energy": [], "valence": []}
previous_track = None
for i, p in enumerate(plays):
@@ -522,29 +458,25 @@ class StatsService:
if not t:
continue
# Robust Null Check: Append separately
for key in feature_keys:
val = getattr(t, key, None)
if val is not None:
features[key].append(val)
# Cluster Data (only if all 4 exist)
if all(
getattr(t, k) is not None
getattr(t, k, None) is not None
for k in ["energy", "valence", "danceability", "acousticness"]
):
cluster_data.append(
[t.energy, t.valence, t.danceability, t.acousticness]
)
# Harmonic
if t.key is not None:
if getattr(t, "key", None) is not None:
keys.append(t.key)
if t.mode is not None:
if getattr(t, "mode", None) is not None:
modes.append(t.mode)
# Tempo Zones
if t.tempo is not None:
if getattr(t, "tempo", None) is not None:
if t.tempo < 100:
tempo_zones["chill"] += 1
elif t.tempo < 130:
@@ -552,93 +484,100 @@ class StatsService:
else:
tempo_zones["hype"] += 1
# Calculate Transitions (Whiplash)
if i > 0 and previous_track:
time_diff = (p.played_at - plays[i - 1].played_at).total_seconds()
if time_diff < 300: # 5 min gap max
if t.tempo is not None and previous_track.tempo is not None:
if time_diff < 300:
if (
getattr(t, "tempo", None) is not None
and getattr(previous_track, "tempo", None) is not None
):
transitions["tempo"].append(abs(t.tempo - previous_track.tempo))
if t.energy is not None and previous_track.energy is not None:
if (
getattr(t, "energy", None) is not None
and getattr(previous_track, "energy", None) is not None
):
transitions["energy"].append(
abs(t.energy - previous_track.energy)
)
if t.valence is not None and previous_track.valence is not None:
if (
getattr(t, "valence", None) is not None
and getattr(previous_track, "valence", None) is not None
):
transitions["valence"].append(
abs(t.valence - previous_track.valence)
)
previous_track = t
# Calculate Stats (Mean, Std, Percentiles)
stats = {}
stats_res = {}
for key, values in features.items():
valid = [v for v in values if v is not None]
if valid:
avg_val = float(np.mean(valid))
stats[key] = round(avg_val, 3)
stats[f"avg_{key}"] = avg_val
stats[f"std_{key}"] = float(np.std(valid))
stats[f"p10_{key}"] = float(np.percentile(valid, 10))
stats[f"p50_{key}"] = float(np.percentile(valid, 50))
stats[f"p90_{key}"] = float(np.percentile(valid, 90))
stats_res[key] = round(avg_val, 3)
stats_res[f"avg_{key}"] = avg_val
stats_res[f"std_{key}"] = float(np.std(valid))
stats_res[f"p10_{key}"] = float(np.percentile(valid, 10))
stats_res[f"p50_{key}"] = float(np.percentile(valid, 50))
stats_res[f"p90_{key}"] = float(np.percentile(valid, 90))
else:
stats[key] = 0.0
stats[f"avg_{key}"] = None
# Derived Metrics
if stats.get("avg_energy") is not None and stats.get("avg_valence") is not None:
stats["mood_quadrant"] = {
"x": round(stats["avg_valence"], 2),
"y": round(stats["avg_energy"], 2),
}
avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2
stats["consistency_score"] = round(1.0 - avg_std, 2)
stats_res[key] = 0.0
stats_res[f"avg_{key}"] = None
if (
stats.get("avg_tempo") is not None
and stats.get("avg_danceability") is not None
stats_res.get("avg_energy") is not None
and stats_res.get("avg_valence") is not None
):
stats["rhythm_profile"] = {
"avg_tempo": round(stats["avg_tempo"], 1),
"avg_danceability": round(stats["avg_danceability"], 2),
stats_res["mood_quadrant"] = {
"x": round(stats_res["avg_valence"], 2),
"y": round(stats_res["avg_energy"], 2),
}
avg_std = (
stats_res.get("std_energy", 0) + stats_res.get("std_valence", 0)
) / 2
stats_res["consistency_score"] = round(1.0 - avg_std, 2)
if (
stats_res.get("avg_tempo") is not None
and stats_res.get("avg_danceability") is not None
):
stats_res["rhythm_profile"] = {
"avg_tempo": round(stats_res["avg_tempo"], 1),
"avg_danceability": round(stats_res["avg_danceability"], 2),
}
if (
stats.get("avg_acousticness") is not None
and stats.get("avg_instrumentalness") is not None
stats_res.get("avg_acousticness") is not None
and stats_res.get("avg_instrumentalness") is not None
):
stats["texture_profile"] = {
"acousticness": round(stats["avg_acousticness"], 2),
"instrumentalness": round(stats["avg_instrumentalness"], 2),
stats_res["texture_profile"] = {
"acousticness": round(stats_res["avg_acousticness"], 2),
"instrumentalness": round(stats_res["avg_instrumentalness"], 2),
}
# Whiplash
stats["whiplash"] = {}
stats_res["whiplash"] = {}
for k in ["tempo", "energy", "valence"]:
if transitions[k]:
stats["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
stats_res["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
else:
stats["whiplash"][k] = 0
stats_res["whiplash"][k] = 0
# Tempo Zones
total_tempo = sum(tempo_zones.values())
if total_tempo > 0:
stats["tempo_zones"] = {
stats_res["tempo_zones"] = {
k: round(v / total_tempo, 2) for k, v in tempo_zones.items()
}
else:
stats["tempo_zones"] = {}
stats_res["tempo_zones"] = {}
# Harmonic Profile
if modes:
major_count = len([m for m in modes if m == 1])
stats["harmonic_profile"] = {
stats_res["harmonic_profile"] = {
"major_pct": round(major_count / len(modes), 2),
"minor_pct": round((len(modes) - major_count) / len(modes), 2),
}
if keys:
# Map integers to pitch class notation
pitch_class = [
"C",
"C#",
@@ -658,32 +597,25 @@ class StatsService:
if 0 <= k < 12:
label = pitch_class[k]
key_counts[label] = key_counts.get(label, 0) + 1
stats["top_keys"] = [
stats_res["top_keys"] = [
{"key": k, "count": v}
for k, v in sorted(
key_counts.items(), key=lambda x: x[1], reverse=True
)[:3]
]
# CLUSTERING (K-Means)
if len(cluster_data) >= 5: # Need enough data points
if len(cluster_data) >= 5:
try:
# Features: energy, valence, danceability, acousticness
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
kmeans = KMeans(n_clusters=3, random_state=42, n_init="auto")
labels = kmeans.fit_predict(cluster_data)
# Analyze clusters
clusters = []
for i in range(3):
mask = labels == i
count = np.sum(mask)
if count == 0:
continue
centroid = kmeans.cluster_centers_[i]
share = count / len(cluster_data)
# Heuristic Naming
c_energy, c_valence, c_dance, c_acoustic = centroid
name = "Mixed Vibe"
if c_energy > 0.7:
@@ -694,7 +626,6 @@ class StatsService:
name = "Melancholy"
elif c_dance > 0.7:
name = "Dance / Groove"
clusters.append(
{
"name": name,
@@ -707,25 +638,20 @@ class StatsService:
},
}
)
# Sort by share
stats["clusters"] = sorted(
stats_res["clusters"] = sorted(
clusters, key=lambda x: x["share"], reverse=True
)
except Exception as e:
print(f"Clustering failed: {e}")
stats["clusters"] = []
stats_res["clusters"] = []
else:
stats["clusters"] = []
stats_res["clusters"] = []
return stats
return stats_res
def compute_era_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Includes Nostalgia Gap and granular decade breakdown.
"""
query = (
self.db.query(PlayHistory)
.options(joinedload(PlayHistory.track))
@@ -750,11 +676,9 @@ class StatsService:
if not years:
return {"musical_age": None}
# Musical Age (Weighted Average)
avg_year = sum(years) / len(years)
current_year = datetime.utcnow().year
# Decade Distribution
decades = {}
for y in years:
dec = (y // 10) * 10
@@ -767,19 +691,13 @@ class StatsService:
return {
"musical_age": int(avg_year),
"nostalgia_gap": int(current_year - avg_year),
"freshness_score": dist.get(
f"{int(current_year / 10) * 10}s", 0
), # Share of current decade
"freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0),
"decade_distribution": dist,
}
def compute_skip_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Implements boredom skip detection:
(next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s)
"""
query = (
self.db.query(PlayHistory)
.filter(
@@ -803,21 +721,14 @@ class StatsService:
next_play = plays[i + 1]
track = track_map.get(current_play.track_id)
if not track or not track.duration_ms:
if not track or not getattr(track, "duration_ms", None):
continue
diff_seconds = (
next_play.played_at - current_play.played_at
).total_seconds()
# Logic: If diff < (duration - 10s), it's a skip.
# Convert duration to seconds
duration_sec = track.duration_ms / 1000.0
# Also ensure diff isn't negative or weirdly small (re-plays)
# And assume "listening" means diff > 30s at least?
# Spec says "Spotify only returns 30s+".
if diff_seconds < (duration_sec - 10):
skips += 1
@@ -826,9 +737,6 @@ class StatsService:
def compute_context_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Analyzes context_uri to determine if user listens to Playlists, Albums, or Artists.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start, PlayHistory.played_at <= period_end
)
@@ -851,7 +759,6 @@ class StatsService:
context_counts["unknown"] += 1
continue
# Count distinct contexts for loyalty
unique_contexts[p.context_uri] = unique_contexts.get(p.context_uri, 0) + 1
if "playlist" in p.context_uri:
@@ -861,15 +768,12 @@ class StatsService:
elif "artist" in p.context_uri:
context_counts["artist"] += 1
elif "collection" in p.context_uri:
# "Liked Songs" usually shows up as collection
context_counts["collection"] += 1
else:
context_counts["unknown"] += 1
total = len(plays)
breakdown = {k: round(v / total, 2) for k, v in context_counts.items()}
# Top 5 Contexts (Requires resolving URI to name, possibly missing metadata here)
sorted_contexts = sorted(
unique_contexts.items(), key=lambda x: x[1], reverse=True
)[:5]
@@ -887,9 +791,6 @@ class StatsService:
def compute_taste_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Mainstream vs. Hipster analysis based on Track.popularity (0-100).
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start, PlayHistory.played_at <= period_end
)
@@ -904,15 +805,13 @@ class StatsService:
pop_values = []
for p in plays:
t = track_map.get(p.track_id)
if t and t.popularity is not None:
if t and getattr(t, "popularity", None) is not None:
pop_values.append(t.popularity)
if not pop_values:
return {"avg_popularity": 0, "hipster_score": 0}
avg_pop = float(np.mean(pop_values))
# Hipster Score: Percentage of tracks with popularity < 30
underground_plays = len([x for x in pop_values if x < 30])
mainstream_plays = len([x for x in pop_values if x > 70])
@@ -926,10 +825,6 @@ class StatsService:
def compute_lifecycle_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Determines if tracks are 'New Discoveries' or 'Old Favorites'.
"""
# 1. Get tracks played in this period
current_plays = (
self.db.query(PlayHistory)
.filter(
@@ -943,20 +838,14 @@ class StatsService:
return {}
current_track_ids = set([p.track_id for p in current_plays])
# 2. Check if these tracks were played BEFORE period_start
# We find which of the current_track_ids exist in history < period_start
old_tracks_query = self.db.query(distinct(PlayHistory.track_id)).filter(
PlayHistory.track_id.in_(current_track_ids),
PlayHistory.played_at < period_start,
)
old_track_ids = set([r[0] for r in old_tracks_query.all()])
# 3. Calculate Discovery
new_discoveries = current_track_ids - old_track_ids
discovery_count = len(new_discoveries)
# Calculate plays on new discoveries
plays_on_new = len([p for p in current_plays if p.track_id in new_discoveries])
total_plays = len(current_plays)
@@ -973,9 +862,6 @@ class StatsService:
def compute_explicit_stats(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
"""
Analyzes explicit content consumption.
"""
query = (
self.db.query(PlayHistory)
.options(joinedload(PlayHistory.track))
@@ -987,7 +873,7 @@ class StatsService:
plays = query.all()
if not plays:
return {"explicit_rate": 0, "hourly_explicit_rate": []}
return {"explicit_rate": 0, "hourly_explicit_distribution": []}
total_plays = len(plays)
explicit_count = 0
@@ -997,18 +883,11 @@ class StatsService:
for p in plays:
h = p.played_at.hour
hourly_total[h] += 1
# Check raw_data for explicit flag
t = p.track
is_explicit = False
if t.raw_data and t.raw_data.get("explicit"):
is_explicit = True
if is_explicit:
if t and t.raw_data and t.raw_data.get("explicit"):
explicit_count += 1
hourly_explicit[h] += 1
# Calculate hourly percentages
hourly_rates = []
for i in range(24):
if hourly_total[i] > 0:
@@ -1025,7 +904,6 @@ class StatsService:
def generate_full_report(
self, period_start: datetime, period_end: datetime
) -> Dict[str, Any]:
# 1. Calculate all current stats
current_stats = {
"period": {
"start": period_start.isoformat(),
@@ -1043,7 +921,6 @@ class StatsService:
"skips": self.compute_skip_stats(period_start, period_end),
}
# 2. Calculate Comparison
current_stats["comparison"] = self.compute_comparison(
current_stats, period_start, period_end
)
@@ -1064,7 +941,53 @@ class StatsService:
"top_genres": [],
"repeat_rate": 0,
"one_and_done_rate": 0,
"concentration": {},
"concentration": {
"hhi": 0,
"gini": 0,
"top_1_share": 0,
"top_5_share": 0,
"genre_entropy": 0,
},
}
def _empty_time_stats(self):
return {
"heatmap": [],
"heatmap_compressed": [],
"block_labels": [],
"hourly_distribution": [0] * 24,
"peak_hour": None,
"weekday_distribution": [0] * 7,
"daily_distribution": [0] * 7,
"weekend_share": 0,
"part_of_day": {"morning": 0, "afternoon": 0, "evening": 0, "night": 0},
"listening_streak": 0,
"longest_streak": 0,
"active_days": 0,
"avg_plays_per_active_day": 0,
}
def _empty_session_stats(self):
return {
"count": 0,
"avg_tracks": 0,
"avg_minutes": 0,
"median_minutes": 0,
"longest_session_minutes": 0,
"sessions_per_day": 0,
"start_hour_distribution": [0] * 24,
"micro_session_rate": 0,
"marathon_session_rate": 0,
"energy_arcs": {"rising": 0, "falling": 0, "flat": 0, "unknown": 0},
"session_list": [],
}
def _empty_vibe_stats(self):
return {
"avg_energy": 0,
"avg_valence": 0,
"mood_quadrant": {"x": 0, "y": 0},
"clusters": [],
}
def _pct_change(self, curr, prev):