Fixed and added all the stats_service.py methods

This commit is contained in:
bnair123
2025-12-25 22:17:21 +04:00
parent e7980cc706
commit 9b8f7355fb
9 changed files with 412 additions and 146 deletions

View File

@@ -0,0 +1,35 @@
import os
import lyricsgenius
from typing import Optional, Dict, Any
class GeniusClient:
    """Small wrapper around ``lyricsgenius.Genius`` for lyrics/artwork lookup.

    The access token is read from the ``GENIUS_ACCESS_TOKEN`` environment
    variable. When it is not set, ``self.genius`` is ``None`` and every
    call to :meth:`search_song` returns ``None``.
    """

    def __init__(self):
        self.access_token = os.getenv("GENIUS_ACCESS_TOKEN")
        if self.access_token:
            self.genius = lyricsgenius.Genius(
                self.access_token,
                verbose=False,
                remove_section_headers=True,
            )
        else:
            print("WARNING: GENIUS_ACCESS_TOKEN not found. Lyrics enrichment will be skipped.")
            self.genius = None

    def search_song(self, title: str, artist: str) -> Optional[Dict[str, Any]]:
        """Search Genius for a song and return its lyrics and artwork URLs.

        Args:
            title: Track title; text after the first " - " or "(" is dropped
                before searching.
            artist: Artist name passed through to the Genius search.

        Returns:
            A dict with ``lyrics``, ``image_url`` and ``artist_image_url``
            keys, or ``None`` when the client has no token, nothing was
            found, or the lookup raised.
        """
        if not self.genius:
            return None

        try:
            # Keep only the text before the first " - " and the first "(" so
            # suffixes such as "- Remastered 2011" or "(feat. X)" do not
            # pollute the search query.
            clean_title = title.split(" - ")[0].split("(")[0].strip()
            if not clean_title:
                # Titles that *start* with a parenthesis (or " - ") would
                # otherwise collapse to an empty query; search the full
                # title instead.
                clean_title = title.strip()

            song = self.genius.search_song(clean_title, artist)
            if song:
                return {
                    "lyrics": song.lyrics,
                    "image_url": song.song_art_image_url,
                    "artist_image_url": song.primary_artist.image_url,
                }
        except Exception as e:
            # Best-effort enrichment: report the failure and carry on.
            print(f"Genius Search Error for {title} by {artist}: {e}")

        return None

View File

@@ -4,6 +4,7 @@ from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import math
import numpy as np
from sklearn.cluster import KMeans
from ..models import PlayHistory, Track, Artist
@@ -78,10 +79,18 @@ class StatsService:
genre_counts = {}
album_counts = {}
# Maps for resolving names later without DB hits
# Maps for resolving names/images later without DB hits
track_map = {}
artist_map = {}
album_map = {}
# Helper to safely get image
def get_track_image(t):
if t.image_url: return t.image_url
if t.raw_data and "album" in t.raw_data and "images" in t.raw_data["album"]:
imgs = t.raw_data["album"]["images"]
if imgs: return imgs[0].get("url")
return None
for p in plays:
t = p.track
@@ -102,12 +111,15 @@ class StatsService:
album_name = t.raw_data["album"].get("name", t.album)
album_counts[album_id] = album_counts.get(album_id, 0) + 1
album_map[album_id] = album_name
# Store tuple of (name, image_url)
if album_id not in album_map:
album_map[album_id] = {"name": album_name, "image": get_track_image(t)}
# Artist Aggregation (Iterate objects, not string)
for artist in t.artists:
artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1
artist_map[artist.id] = artist.name
if artist.id not in artist_map:
artist_map[artist.id] = {"name": artist.name, "image": artist.image_url}
# Genre Aggregation
if artist.genres:
@@ -124,19 +136,20 @@ class StatsService:
top_tracks = [
{
"name": track_map[tid].name,
"artist": ", ".join([a.name for a in track_map[tid].artists]), # Correct artist display
"artist": ", ".join([a.name for a in track_map[tid].artists]),
"image": get_track_image(track_map[tid]),
"count": c
}
for tid, c in sorted(track_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
top_artists = [
{"name": artist_map.get(aid, "Unknown"), "count": c}
{"name": artist_map[aid]["name"], "id": aid, "image": artist_map[aid]["image"], "count": c}
for aid, c in sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
top_albums = [
{"name": album_map.get(aid, "Unknown"), "count": c}
{"name": album_map[aid]["name"], "image": album_map[aid]["image"], "count": c}
for aid, c in sorted(album_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
@@ -188,7 +201,7 @@ class StatsService:
def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Includes Part-of-Day buckets, Listening Streaks, and Active Days stats.
Includes Part-of-Day buckets, Listening Streaks, Active Days, and 2D Heatmap.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
@@ -199,16 +212,24 @@ class StatsService:
if not plays:
return {}
# Heatmap: 7 days x 24 hours
heatmap = [[0 for _ in range(24)] for _ in range(7)]
hourly_counts = [0] * 24
weekday_counts = [0] * 7
# Spec: Morning (6-12), Afternoon (12-18), Evening (18-24), Night (0-6)
part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
active_dates = set()
for p in plays:
h = p.played_at.hour
d = p.played_at.weekday()
# Populate Heatmap
heatmap[d][h] += 1
hourly_counts[h] += 1
weekday_counts[p.played_at.weekday()] += 1
weekday_counts[d] += 1
active_dates.add(p.played_at.date())
if 6 <= h < 12:
@@ -240,6 +261,7 @@ class StatsService:
active_days_count = len(active_dates)
return {
"heatmap": heatmap, # 7x24 Matrix
"hourly_distribution": hourly_counts,
"peak_hour": hourly_counts.index(max(hourly_counts)),
"weekday_distribution": weekday_counts,
@@ -253,7 +275,7 @@ class StatsService:
def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Includes Micro-sessions, Marathon sessions, Energy Arcs, and Median metrics.
Includes Micro-sessions, Marathon sessions, Energy Arcs, Median metrics, and Session List.
"""
query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter(
PlayHistory.played_at >= period_start,
@@ -282,21 +304,41 @@ class StatsService:
marathon_sessions = 0
energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
start_hour_dist = [0] * 24
session_list = [] # Metadata for timeline
for sess in sessions:
start_t = sess[0].played_at
end_t = sess[-1].played_at
# Start time distribution
start_hour_dist[sess[0].played_at.hour] += 1
start_hour_dist[start_t.hour] += 1
# Durations
if len(sess) > 1:
duration = (sess[-1].played_at - sess[0].played_at).total_seconds() / 60
duration = (end_t - start_t).total_seconds() / 60
lengths_min.append(duration)
else:
lengths_min.append(3.0) # Approx single song
duration = 3.0 # Approx single song
lengths_min.append(duration)
# Types
if len(sess) <= 3: micro_sessions += 1
if len(sess) >= 20: marathon_sessions += 1
sess_type = "Standard"
if len(sess) <= 3:
micro_sessions += 1
sess_type = "Micro"
elif len(sess) >= 20:
marathon_sessions += 1
sess_type = "Marathon"
# Store Session Metadata
session_list.append({
"start_time": start_t.isoformat(),
"end_time": end_t.isoformat(),
"duration_minutes": round(duration, 1),
"track_count": len(sess),
"type": sess_type
})
# Energy Arc
first_t = sess[0].track
@@ -326,12 +368,13 @@ class StatsService:
"start_hour_distribution": start_hour_dist,
"micro_session_rate": round(micro_sessions / len(sessions), 2),
"marathon_session_rate": round(marathon_sessions / len(sessions), 2),
"energy_arcs": energy_arcs
"energy_arcs": energy_arcs,
"session_list": session_list
}
def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Aggregates Audio Features + Calculates Whiplash, Percentiles, and Profiles.
Aggregates Audio Features + Calculates Whiplash + Clustering + Harmonic Profile.
"""
plays = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
@@ -349,6 +392,14 @@ class StatsService:
feature_keys = ["energy", "valence", "danceability", "tempo", "acousticness",
"instrumentalness", "liveness", "speechiness", "loudness"]
features = {k: [] for k in feature_keys}
# For Clustering: List of [energy, valence, danceability, acousticness]
cluster_data = []
# For Harmonic & Tempo
keys = []
modes = []
tempo_zones = {"chill": 0, "groove": 0, "hype": 0}
# 2. Transition Arrays (for Whiplash)
transitions = {"tempo": [], "energy": [], "valence": []}
@@ -364,6 +415,20 @@ class StatsService:
val = getattr(t, key, None)
if val is not None:
features[key].append(val)
# Cluster Data (only if all 4 exist)
if all(getattr(t, k) is not None for k in ["energy", "valence", "danceability", "acousticness"]):
cluster_data.append([t.energy, t.valence, t.danceability, t.acousticness])
# Harmonic
if t.key is not None: keys.append(t.key)
if t.mode is not None: modes.append(t.mode)
# Tempo Zones
if t.tempo is not None:
if t.tempo < 100: tempo_zones["chill"] += 1
elif t.tempo < 130: tempo_zones["groove"] += 1
else: tempo_zones["hype"] += 1
# Calculate Transitions (Whiplash)
if i > 0 and previous_track:
@@ -381,12 +446,13 @@ class StatsService:
# Calculate Stats (Mean, Std, Percentiles)
stats = {}
for key, values in features.items():
if values:
stats[f"avg_{key}"] = float(np.mean(values))
stats[f"std_{key}"] = float(np.std(values))
stats[f"p10_{key}"] = float(np.percentile(values, 10))
stats[f"p50_{key}"] = float(np.percentile(values, 50)) # Median
stats[f"p90_{key}"] = float(np.percentile(values, 90))
valid = [v for v in values if v is not None]
if valid:
stats[f"avg_{key}"] = float(np.mean(valid))
stats[f"std_{key}"] = float(np.std(valid))
stats[f"p10_{key}"] = float(np.percentile(valid, 10))
stats[f"p50_{key}"] = float(np.percentile(valid, 50)) # Median
stats[f"p90_{key}"] = float(np.percentile(valid, 90))
else:
stats[f"avg_{key}"] = None
@@ -396,31 +462,97 @@ class StatsService:
"x": round(stats["avg_valence"], 2),
"y": round(stats["avg_energy"], 2)
}
# Consistency
avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2
stats["consistency_score"] = round(1.0 - avg_std, 2)
# Rhythm Profile
if stats.get("avg_tempo") is not None and stats.get("avg_danceability") is not None:
stats["rhythm_profile"] = {
"avg_tempo": round(stats["avg_tempo"], 1),
"avg_danceability": round(stats["avg_danceability"], 2)
}
# Texture Profile
if stats.get("avg_acousticness") is not None and stats.get("avg_instrumentalness") is not None:
stats["texture_profile"] = {
"acousticness": round(stats["avg_acousticness"], 2),
"instrumentalness": round(stats["avg_instrumentalness"], 2)
}
# Whiplash Scores
# Whiplash
stats["whiplash"] = {}
for k in ["tempo", "energy", "valence"]:
if transitions[k]:
stats["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
else:
stats["whiplash"][k] = 0
# Tempo Zones
total_tempo = sum(tempo_zones.values())
if total_tempo > 0:
stats["tempo_zones"] = {k: round(v / total_tempo, 2) for k, v in tempo_zones.items()}
else:
stats["tempo_zones"] = {}
# Harmonic Profile
if modes:
major_count = len([m for m in modes if m == 1])
stats["harmonic_profile"] = {
"major_pct": round(major_count / len(modes), 2),
"minor_pct": round((len(modes) - major_count) / len(modes), 2)
}
if keys:
# Map integers to pitch class notation
pitch_class = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]
key_counts = {}
for k in keys:
if 0 <= k < 12:
label = pitch_class[k]
key_counts[label] = key_counts.get(label, 0) + 1
stats["top_keys"] = [{"key": k, "count": v} for k, v in sorted(key_counts.items(), key=lambda x: x[1], reverse=True)[:3]]
# CLUSTERING (K-Means)
if len(cluster_data) >= 5: # Need enough data points
try:
# Features: energy, valence, danceability, acousticness
kmeans = KMeans(n_clusters=3, random_state=42, n_init=10)
labels = kmeans.fit_predict(cluster_data)
# Analyze clusters
clusters = []
for i in range(3):
mask = (labels == i)
count = np.sum(mask)
if count == 0: continue
centroid = kmeans.cluster_centers_[i]
share = count / len(cluster_data)
# Heuristic Naming
c_energy, c_valence, c_dance, c_acoustic = centroid
name = "Mixed Vibe"
if c_energy > 0.7: name = "High Energy"
elif c_acoustic > 0.7: name = "Acoustic / Chill"
elif c_valence < 0.3: name = "Melancholy"
elif c_dance > 0.7: name = "Dance / Groove"
clusters.append({
"name": name,
"share": round(share, 2),
"features": {
"energy": round(c_energy, 2),
"valence": round(c_valence, 2),
"danceability": round(c_dance, 2),
"acousticness": round(c_acoustic, 2)
}
})
# Sort by share
stats["clusters"] = sorted(clusters, key=lambda x: x["share"], reverse=True)
except Exception as e:
print(f"Clustering failed: {e}")
stats["clusters"] = []
else:
stats["clusters"] = []
return stats
@@ -448,9 +580,11 @@ class StatsService:
if not years:
return {"musical_age": None}
# Musical Age (Weighted Average)
avg_year = sum(years) / len(years)
current_year = datetime.utcnow().year
# Decade Distribution
decades = {}
for y in years:
dec = (y // 10) * 10
@@ -463,17 +597,18 @@ class StatsService:
return {
"musical_age": int(avg_year),
"nostalgia_gap": int(current_year - avg_year),
"freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0),
"freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0), # Share of current decade
"decade_distribution": dist
}
def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Implements boredom skip detection.
Implements boredom skip detection:
(next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s)
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at < period_end
PlayHistory.played_at <= period_end
).order_by(PlayHistory.played_at.asc())
plays = query.all()
@@ -485,10 +620,7 @@ class StatsService:
tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
track_map = {t.id: t for t in tracks}
# Denominator: transitions, which is plays - 1
transitions_count = len(plays) - 1
for i in range(transitions_count):
for i in range(len(plays) - 1):
current_play = plays[i]
next_play = plays[i+1]
track = track_map.get(current_play.track_id)
@@ -497,28 +629,31 @@ class StatsService:
continue
diff_seconds = (next_play.played_at - current_play.played_at).total_seconds()
duration_sec = track.duration_ms / 1000.0
# Logic: If diff < (duration - 10s), it's a skip.
# AND it must be a "valid" listening attempt (e.g. > 30s)
# AND it shouldn't be a huge gap (e.g. paused for 2 hours then hit next)
if 30 < diff_seconds < (duration_sec - 10):
# Convert duration to seconds
duration_sec = track.duration_ms / 1000.0
# Also ensure diff isn't negative or weirdly small (re-plays)
# And assume "listening" means diff > 30s at least?
# Spec says "Spotify only returns 30s+".
if diff_seconds < (duration_sec - 10):
skips += 1
return {
"total_skips": skips,
"skip_rate": round(skips / transitions_count, 3) if transitions_count > 0 else 0
"skip_rate": round(skips / len(plays), 3)
}
def compute_context_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Analyzes context_uri and switching rate.
Analyzes context_uri to determine if user listens to Playlists, Albums, or Artists.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc())
PlayHistory.played_at <= period_end
)
plays = query.all()
if not plays:
@@ -526,32 +661,31 @@ class StatsService:
context_counts = {"playlist": 0, "album": 0, "artist": 0, "collection": 0, "unknown": 0}
unique_contexts = {}
context_switches = 0
last_context = None
for p in plays:
uri = p.context_uri
if not uri:
if not p.context_uri:
context_counts["unknown"] += 1
uri = "unknown"
else:
if "playlist" in uri: context_counts["playlist"] += 1
elif "album" in uri: context_counts["album"] += 1
elif "artist" in uri: context_counts["artist"] += 1
elif "collection" in uri: context_counts["collection"] += 1
else: context_counts["unknown"] += 1
continue
if uri != "unknown":
unique_contexts[uri] = unique_contexts.get(uri, 0) + 1
# Switch detection
if last_context and uri != last_context:
context_switches += 1
last_context = uri
# Count distinct contexts for loyalty
unique_contexts[p.context_uri] = unique_contexts.get(p.context_uri, 0) + 1
if "playlist" in p.context_uri:
context_counts["playlist"] += 1
elif "album" in p.context_uri:
context_counts["album"] += 1
elif "artist" in p.context_uri:
context_counts["artist"] += 1
elif "collection" in p.context_uri:
# "Liked Songs" usually shows up as collection
context_counts["collection"] += 1
else:
context_counts["unknown"] += 1
total = len(plays)
breakdown = {k: round(v / total, 2) for k, v in context_counts.items()}
# Top 5 Contexts (Requires resolving URI to name, possibly missing metadata here)
sorted_contexts = sorted(unique_contexts.items(), key=lambda x: x[1], reverse=True)[:5]
return {
@@ -559,17 +693,16 @@ class StatsService:
"album_purist_score": breakdown.get("album", 0),
"playlist_dependency": breakdown.get("playlist", 0),
"context_loyalty": round(len(plays) / len(unique_contexts), 2) if unique_contexts else 0,
"context_switching_rate": round(context_switches / (total - 1), 2) if total > 1 else 0,
"top_context_uris": [{"uri": k, "count": v} for k, v in sorted_contexts]
}
def compute_taste_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Mainstream vs. Hipster analysis.
Mainstream vs. Hipster analysis based on Track.popularity (0-100).
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at < period_end
PlayHistory.played_at <= period_end
)
plays = query.all()
if not plays: return {}
@@ -602,47 +735,38 @@ class StatsService:
def compute_lifecycle_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Discovery, Recurrence, Comebacks, Obsessions.
Determines if tracks are 'New Discoveries' or 'Old Favorites'.
"""
# 1. Current plays
# 1. Get tracks played in this period
current_plays = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at < period_end
PlayHistory.played_at <= period_end
).all()
if not current_plays: return {}
current_track_ids = set([p.track_id for p in current_plays])
# 2. Historical check
# 2. Check if these tracks were played BEFORE period_start
# We find which of the current_track_ids exist in history < period_start
old_tracks_query = self.db.query(distinct(PlayHistory.track_id)).filter(
PlayHistory.track_id.in_(current_track_ids),
PlayHistory.played_at < period_start
)
old_track_ids = set([r[0] for r in old_tracks_query.all()])
# 3. Discovery
# 3. Calculate Discovery
new_discoveries = current_track_ids - old_track_ids
# 4. Obsessions (Tracks with > 5 plays in period)
track_counts = {}
for p in current_plays:
track_counts[p.track_id] = track_counts.get(p.track_id, 0) + 1
obsessions = [tid for tid, count in track_counts.items() if count >= 5]
# 5. Comeback Detection (Old tracks not played in last 30 days)
# Simplified: If in old_track_ids but NOT in last 30 days before period_start?
# That requires a gap check. For now, we will mark 'recurrence' as general relistening.
discovery_count = len(new_discoveries)
# Calculate plays on new discoveries
plays_on_new = len([p for p in current_plays if p.track_id in new_discoveries])
total_plays = len(current_plays)
return {
"discovery_count": len(new_discoveries),
"discovery_count": discovery_count,
"discovery_rate": round(plays_on_new / total_plays, 3) if total_plays > 0 else 0,
"recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0,
"obsession_count": len(obsessions),
"obsession_rate": round(len(obsessions) / len(current_track_ids), 3) if current_track_ids else 0
"recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0
}
def compute_explicit_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
@@ -651,7 +775,7 @@ class StatsService:
"""
query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at < period_end
PlayHistory.played_at <= period_end
)
plays = query.all()
@@ -665,14 +789,24 @@ class StatsService:
for p in plays:
h = p.played_at.hour
hourly_total[h] += 1
# Check raw_data for explicit flag
t = p.track
is_explicit = False
if t.raw_data and t.raw_data.get("explicit"):
is_explicit = True
if is_explicit:
explicit_count += 1
hourly_explicit[h] += 1
# Calculate hourly percentages
hourly_rates = []
for i in range(24):
hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2) if hourly_total[i] > 0 else 0.0)
if hourly_total[i] > 0:
hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2))
else:
hourly_rates.append(0.0)
return {
"explicit_rate": round(explicit_count / total_plays, 3),
@@ -681,6 +815,7 @@ class StatsService:
}
def generate_full_report(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
# 1. Calculate all current stats
current_stats = {
"period": {"start": period_start.isoformat(), "end": period_end.isoformat()},
"volume": self.compute_volume_stats(period_start, period_end),
@@ -695,7 +830,9 @@ class StatsService:
"skips": self.compute_skip_stats(period_start, period_end)
}
# 2. Calculate Comparison
current_stats["comparison"] = self.compute_comparison(current_stats, period_start, period_end)
return current_stats
def _empty_volume_stats(self):
@@ -710,4 +847,4 @@ class StatsService:
def _pct_change(self, curr, prev):
    """Return the percentage change from ``prev`` to ``curr``.

    The result is rounded to one decimal place. A zero baseline has no
    defined ratio, so it is reported as ``100.0`` when ``curr`` is
    positive and ``0.0`` otherwise.
    """
    if prev == 0:
        return 100.0 if curr > 0 else 0.0
    return round(((curr - prev) / prev) * 100, 1)