Refactor Stats and Narrative services to match spec

- StatsService: Fixed N+1 queries, added missing metrics (whiplash, entropy, lifecycle), and improved correctness (boundary checks, null handling).
- NarrativeService: Added payload shaping for token efficiency, improved JSON robustness, and updated prompts to align with persona specs.
- Documentation: Added backend/TECHNICAL_DOCS.md detailing the logic.
This commit is contained in:
bnair123
2025-12-25 18:12:05 +04:00
parent 508d001d7e
commit af0d985253
3 changed files with 410 additions and 202 deletions

View File

@@ -1,10 +1,11 @@
import os
import json
import re
import google.generativeai as genai
from typing import Dict, Any
from typing import Dict, Any, List, Optional
class NarrativeService:
def __init__(self, model_name: str = "gemini-2.5-flash"):
def __init__(self, model_name: str = "gemini-2.0-flash-exp"):
self.api_key = os.getenv("GEMINI_API_KEY")
if not self.api_key:
print("WARNING: GEMINI_API_KEY not found. LLM features will fail.")
@@ -13,47 +14,111 @@ class NarrativeService:
self.model_name = model_name
def generate_narrative(self, stats_json: Dict[str, Any]) -> Dict[str, str]:
def generate_full_narrative(self, stats_json: Dict[str, Any]) -> Dict[str, Any]:
"""
Orchestrates the generation of the full narrative report.
Currently uses a single call for consistency and speed.
"""
if not self.api_key:
return {"error": "Missing API Key"}
return self._get_fallback_narrative()
clean_stats = self._shape_payload(stats_json)
prompt = f"""
You are a witty, insightful, and slightly snarky music critic analyzing a user's listening history.
Below is a JSON summary of their listening data.
You are a witty, insightful, and slightly snarky music critic analyzing a user's Spotify listening data.
Your goal is to generate a JSON report that acts as a deeper, more honest "Spotify Wrapped".
Your goal is to generate a report that feels like a 'Spotify Wrapped' but deeper and more honest.
**CORE RULES:**
1. **NO Mental Health Diagnoses:** Do not mention depression, anxiety, or therapy. Stick to behavioral descriptors (e.g., "introspective", "high-energy").
2. **Be Specific:** Use the provided metrics. Don't say "You like pop," say "Your Mainstream Score of 85% suggests..."
3. **Roast Gently:** Be playful but not cruel.
4. **JSON Output Only:** Return strictly valid JSON.
Please output your response in strict JSON format with the following keys:
1. "vibe_check": (String) 2-3 paragraphs describing their overall listening personality.
2. "patterns": (List of Strings) 3-5 specific observations based on the data (e.g., "You listen to sad music on Tuesdays", "Your Whiplash Score is high").
3. "persona": (String) A creative label for the user (e.g., "The Genre Chameleon", "Nostalgic Dad-Rocker", "Algorithm Victim").
4. "roast": (String) A playful, harmlessly mean roast about their taste (1-2 sentences).
5. "era_insight": (String) A specific comment on their 'Musical Age' and 'Nostalgia Gap'.
**DATA TO ANALYZE:**
{json.dumps(clean_stats, indent=2)}
GUIDELINES:
- **Use the Metrics:** Do not just say "You like pop." Say "Your Mainstream Score of 85% suggests you live on the Top 40."
- **Whiplash Score:** If 'whiplash' > 20, comment on their chaotic transitions.
- **Hipster Score:** If 'hipster_score' > 50, call them pretentious; if < 10, call them basic.
- **Comparison:** Use the 'comparison' block to mention if they are listening more/less or if their mood (valence/energy) has shifted.
- **Tone:** Conversational, fun, slightly judgmental but good-natured.
DATA:
{json.dumps(stats_json, indent=2)}
OUTPUT (JSON):
**REQUIRED JSON STRUCTURE:**
{{
"vibe_check": "2-3 paragraphs describing their overall listening personality this period.",
"patterns": ["Observation 1", "Observation 2", "Observation 3 (Look for specific habits like skipping or late-night sessions)"],
"persona": "A creative label (e.g., 'The Genre Chameleon', 'Nostalgic Dad-Rocker').",
"era_insight": "A specific comment on their Musical Age ({clean_stats.get('era', {}).get('musical_age', 'N/A')}) and Nostalgia Gap.",
"roast": "A 1-2 sentence playful roast about their taste.",
"comparison": "A short comment comparing this period to the previous one (if data exists)."
}}
"""
try:
model = genai.GenerativeModel(self.model_name)
response = model.generate_content(prompt)
# Clean up response to ensure valid JSON
text = response.text.strip()
if text.startswith("```json"):
text = text.replace("```json", "").replace("```", "")
elif text.startswith("```"):
text = text.replace("```", "")
return json.loads(text)
# Use JSON mode if available, otherwise rely on prompt + cleaning
response = model.generate_content(
prompt,
generation_config={"response_mime_type": "application/json"}
)
return self._clean_and_parse_json(response.text)
except Exception as e:
return {"error": str(e), "raw_response": "Error generating narrative."}
print(f"LLM Generation Error: {e}")
return self._get_fallback_narrative()
def _shape_payload(self, stats: Dict[str, Any]) -> Dict[str, Any]:
"""
Compresses the stats JSON to save tokens and focus the LLM.
Removes raw lists beyond top 5/10.
"""
s = stats.copy()
# Simplify Volume
if "volume" in s:
s["volume"] = {
k: v for k, v in s["volume"].items()
if k not in ["top_tracks", "top_artists", "top_albums", "top_genres"]
}
# Add back condensed top lists (just names)
s["volume"]["top_tracks"] = [t["name"] for t in stats["volume"].get("top_tracks", [])[:5]]
s["volume"]["top_artists"] = [a["name"] for a in stats["volume"].get("top_artists", [])[:5]]
s["volume"]["top_genres"] = [g["name"] for g in stats["volume"].get("top_genres", [])[:5]]
# Simplify Time (Keep distributions but maybe round them?)
# Keeping hourly/daily is fine, they are small arrays.
# Simplify Vibe (Remove huge transition arrays if they accidentally leaked, though stats service handles this)
# Remove period details if verbose
return s
def _clean_and_parse_json(self, raw_text: str) -> Dict[str, Any]:
"""
Robust JSON extractor.
"""
try:
# 1. Try direct parse
return json.loads(raw_text)
except json.JSONDecodeError:
pass
# 2. Extract between first { and last }
try:
match = re.search(r"\{.*\}", raw_text, re.DOTALL)
if match:
return json.loads(match.group(0))
except:
pass
return self._get_fallback_narrative()
def _get_fallback_narrative(self) -> Dict[str, Any]:
return {
"vibe_check": "Data processing error. You're too mysterious for us to analyze right now.",
"patterns": [],
"persona": "The Enigma",
"era_insight": "Time is a flat circle.",
"roast": "You broke the machine. Congratulations.",
"comparison": "N/A"
}
# Individual accessors kept for frontend convenience. NOTE: each call runs a
# complete generation pass and discards the other sections -- prefer
# generate_full_narrative when more than one section is needed.
def generate_vibe_check(self, stats):
    """Return only the 'vibe_check' section of a full narrative."""
    return self.generate_full_narrative(stats).get("vibe_check")

def identify_patterns(self, stats):
    """Return only the 'patterns' section of a full narrative."""
    return self.generate_full_narrative(stats).get("patterns")

def generate_persona(self, stats):
    """Return only the 'persona' section of a full narrative."""
    return self.generate_full_narrative(stats).get("persona")

def generate_roast(self, stats):
    """Return only the 'roast' section of a full narrative."""
    return self.generate_full_narrative(stats).get("roast")

View File

@@ -1,20 +1,17 @@
from sqlalchemy.orm import Session
from sqlalchemy import func, distinct, desc, joinedload
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import func, distinct
from datetime import datetime, timedelta
from typing import Dict, Any, List
from typing import Dict, Any, List, Optional
import math
import numpy as np
from ..models import PlayHistory, Track, Artist, AnalysisSnapshot
from ..models import PlayHistory, Track, Artist
class StatsService:
def __init__(self, db: Session):
self.db = db
from sqlalchemy.orm import joinedload # Add this to imports
def compute_comparison(self, current_stats: Dict[str, Any], period_start: datetime, period_end: datetime) -> Dict[
str, Any]:
def compute_comparison(self, current_stats: Dict[str, Any], period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Calculates deltas vs the previous period of the same length.
"""
@@ -22,25 +19,18 @@ class StatsService:
prev_end = period_start
prev_start = prev_end - duration
# We only need key metrics for comparison, not the full heavy report
# Let's re-use existing methods but strictly for the previous window
# 1. Volume Comparison
# We only need key metrics for comparison
prev_volume = self.compute_volume_stats(prev_start, prev_end)
# 2. Vibe Comparison (Just energy/valence/popularity)
prev_vibe = self.compute_vibe_stats(prev_start, prev_end)
prev_taste = self.compute_taste_stats(prev_start, prev_end)
# Calculate Deltas
deltas = {}
# Plays
curr_plays = current_stats["volume"]["total_plays"]
prev_plays_count = prev_volume["total_plays"]
deltas["plays_delta"] = curr_plays - prev_plays_count
deltas["plays_pct_change"] = round(((curr_plays - prev_plays_count) / prev_plays_count) * 100,
1) if prev_plays_count else 0
deltas["plays_pct_change"] = self._pct_change(curr_plays, prev_plays_count)
# Energy & Valence
if "mood_quadrant" in current_stats["vibe"] and "mood_quadrant" in prev_vibe:
@@ -54,8 +44,7 @@ class StatsService:
# Popularity
if "avg_popularity" in current_stats["taste"] and "avg_popularity" in prev_taste:
deltas["popularity_delta"] = round(current_stats["taste"]["avg_popularity"] - prev_taste["avg_popularity"],
1)
deltas["popularity_delta"] = round(current_stats["taste"]["avg_popularity"] - prev_taste["avg_popularity"], 1)
return {
"previous_period": {
@@ -67,112 +56,143 @@ class StatsService:
def compute_volume_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Calculates volume metrics including Concentration (HHI, Gini) and One-and-Done rates.
Calculates volume metrics including Concentration (HHI, Gini, Entropy) and Top Lists.
"""
# Eager load tracks AND artists to fix the "Artist String Problem" and performance
# Use < period_end for half-open interval to avoid double counting boundaries
query = self.db.query(PlayHistory).options(
joinedload(PlayHistory.track).joinedload(Track.artists)
).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
)
plays = query.all()
total_plays = len(plays)
if total_plays == 0:
return {
"total_plays": 0, "estimated_minutes": 0, "unique_tracks": 0,
"unique_artists": 0, "unique_albums": 0, "unique_genres": 0,
"top_tracks": [], "top_artists": [], "top_genres": [],
"repeat_rate": 0, "concentration": {}
}
return self._empty_volume_stats()
total_ms = 0
track_counts = {}
artist_counts = {}
genre_counts = {}
album_ids = set()
album_counts = {}
# Maps for resolving names later without DB hits
track_map = {}
artist_map = {}
album_map = {}
for p in plays:
t = p.track
if not t: continue
total_ms += t.duration_ms if t.duration_ms else 0
# Track Counts
# Track Aggregation
track_counts[t.id] = track_counts.get(t.id, 0) + 1
track_map[t.id] = t
# Album Counts (using raw_data ID if available, else name)
if t.raw_data and "album" in t.raw_data and "id" in t.raw_data["album"]:
album_ids.add(t.raw_data["album"]["id"])
else:
album_ids.add(t.album)
# Album Aggregation
# Prefer ID from raw_data, fallback to name
album_id = t.album
album_name = t.album
if t.raw_data and "album" in t.raw_data:
album_id = t.raw_data["album"].get("id", t.album)
album_name = t.raw_data["album"].get("name", t.album)
album_counts[album_id] = album_counts.get(album_id, 0) + 1
album_map[album_id] = album_name
# Artist Counts (Iterate objects, not string)
# Artist Aggregation (Iterate objects, not string)
for artist in t.artists:
artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1
artist_map[artist.id] = artist.name
# Genre Aggregation
if artist.genres:
# artist.genres is a JSON list of strings
for g in artist.genres:
genre_counts[g] = genre_counts.get(g, 0) + 1
# Derived Metrics
unique_tracks = len(track_counts)
one_and_done = len([c for c in track_counts.values() if c == 1])
shares = [c / total_plays for c in track_counts.values()]
# Top Lists
# Top Lists (Optimized: No N+1)
top_tracks = [
{"name": self.db.query(Track).get(tid).name, "artist": self.db.query(Track).get(tid).artist, "count": c}
{
"name": track_map[tid].name,
"artist": ", ".join([a.name for a in track_map[tid].artists]), # Correct artist display
"count": c
}
for tid, c in sorted(track_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
top_artist_ids = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]
# Fetch artist names efficiently
top_artists_objs = self.db.query(Artist).filter(Artist.id.in_([x[0] for x in top_artist_ids])).all()
artist_map = {a.id: a.name for a in top_artists_objs}
top_artists = [{"name": artist_map.get(aid, "Unknown"), "count": c} for aid, c in top_artist_ids]
top_artists = [
{"name": artist_map.get(aid, "Unknown"), "count": c}
for aid, c in sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
top_albums = [
{"name": album_map.get(aid, "Unknown"), "count": c}
for aid, c in sorted(album_counts.items(), key=lambda x: x[1], reverse=True)[:5]
]
top_genres = [{"name": k, "count": v} for k, v in
sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:5]]
top_genres = [{"name": k, "count": v} for k, v in sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:5]]
# Concentration (HHI & Gini)
# Concentration Metrics
# HHI: Sum of (share)^2
shares = [c / total_plays for c in track_counts.values()]
hhi = sum([s ** 2 for s in shares])
# Gini Coefficient (Inequality of play distribution)
# Gini Coefficient
sorted_shares = sorted(shares)
n = len(shares)
gini = 0
if n > 0:
gini = (2 * sum((i + 1) * x for i, x in enumerate(sorted_shares))) / (n * sum(sorted_shares)) - (n + 1) / n
else:
gini = 0
# Genre Entropy: -SUM(p * log(p))
total_genre_occurrences = sum(genre_counts.values())
genre_entropy = 0
if total_genre_occurrences > 0:
genre_probs = [count / total_genre_occurrences for count in genre_counts.values()]
genre_entropy = -sum([p * math.log(p) for p in genre_probs if p > 0])
# Top 5 Share
top_5_plays = sum([t["count"] for t in top_tracks])
top_5_share = top_5_plays / total_plays if total_plays else 0
return {
"total_plays": total_plays,
"estimated_minutes": int(total_ms / 60000),
"unique_tracks": unique_tracks,
"unique_artists": len(artist_counts),
"unique_albums": len(album_ids),
"unique_albums": len(album_counts),
"unique_genres": len(genre_counts),
"top_tracks": top_tracks,
"top_artists": top_artists,
"top_albums": top_albums,
"top_genres": top_genres,
"repeat_rate": round((total_plays - unique_tracks) / total_plays, 3) if total_plays else 0,
"one_and_done_rate": round(one_and_done / unique_tracks, 3) if unique_tracks else 0,
"concentration": {
"hhi": round(hhi, 4),
"gini": round(gini, 4),
"top_1_share": round(max(shares), 3) if shares else 0
"top_1_share": round(max(shares), 3) if shares else 0,
"top_5_share": round(top_5_share, 3),
"genre_entropy": round(genre_entropy, 2)
}
}
def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Includes Part-of-Day buckets and Listening Streaks.
Includes Part-of-Day buckets, Listening Streaks, and Active Days stats.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc())
plays = query.all()
@@ -181,9 +201,8 @@ class StatsService:
hourly_counts = [0] * 24
weekday_counts = [0] * 7
# Spec: Morning (6-12), Afternoon (12-18), Evening (18-24), Night (0-6)
part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0}
# For Streaks
active_dates = set()
for p in plays:
@@ -192,11 +211,11 @@ class StatsService:
weekday_counts[p.played_at.weekday()] += 1
active_dates.add(p.played_at.date())
if 5 <= h < 12:
if 6 <= h < 12:
part_of_day["morning"] += 1
elif 12 <= h < 17:
elif 12 <= h < 18:
part_of_day["afternoon"] += 1
elif 17 <= h < 22:
elif 18 <= h <= 23:
part_of_day["evening"] += 1
else:
part_of_day["night"] += 1
@@ -208,7 +227,6 @@ class StatsService:
if sorted_dates:
current_streak = 1
longest_streak = 1
# Check strictly consecutive days
for i in range(1, len(sorted_dates)):
delta = (sorted_dates[i] - sorted_dates[i - 1]).days
if delta == 1:
@@ -219,6 +237,7 @@ class StatsService:
longest_streak = max(longest_streak, current_streak)
weekend_plays = weekday_counts[5] + weekday_counts[6]
active_days_count = len(active_dates)
return {
"hourly_distribution": hourly_counts,
@@ -228,17 +247,17 @@ class StatsService:
"part_of_day": part_of_day,
"listening_streak": current_streak,
"longest_streak": longest_streak,
"active_days": len(active_dates)
"active_days": active_days_count,
"avg_plays_per_active_day": round(len(plays) / active_days_count, 1) if active_days_count else 0
}
def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Includes Micro-sessions, Marathon sessions, and Energy Arcs.
Includes Micro-sessions, Marathon sessions, Energy Arcs, and Median metrics.
"""
# Need to join Track to get Energy features for Arc analysis
query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc())
plays = query.all()
@@ -262,20 +281,24 @@ class StatsService:
micro_sessions = 0
marathon_sessions = 0
energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0}
start_hour_dist = [0] * 24
for sess in sessions:
# Start time distribution
start_hour_dist[sess[0].played_at.hour] += 1
# Durations
if len(sess) > 1:
duration = (sess[-1].played_at - sess[0].played_at).total_seconds() / 60
lengths_min.append(duration)
else:
lengths_min.append(3.0) # Approx
lengths_min.append(3.0) # Approx single song
# Types
if len(sess) <= 3: micro_sessions += 1
if len(sess) >= 20: marathon_sessions += 1
# Energy Arc (First vs Last track)
# Energy Arc
first_t = sess[0].track
last_t = sess[-1].track
if first_t and last_t and first_t.energy is not None and last_t.energy is not None:
@@ -286,13 +309,21 @@ class StatsService:
else:
energy_arcs["unknown"] += 1
avg_min = sum(lengths_min) / len(lengths_min) if lengths_min else 0
avg_min = np.mean(lengths_min) if lengths_min else 0
median_min = np.median(lengths_min) if lengths_min else 0
# Sessions per day
active_days = len(set(p.played_at.date() for p in plays))
sessions_per_day = len(sessions) / active_days if active_days else 0
return {
"count": len(sessions),
"avg_tracks": round(len(plays) / len(sessions), 1),
"avg_minutes": round(avg_min, 1),
"avg_minutes": round(float(avg_min), 1),
"median_minutes": round(float(median_min), 1),
"longest_session_minutes": round(max(lengths_min), 1) if lengths_min else 0,
"sessions_per_day": round(sessions_per_day, 1),
"start_hour_distribution": start_hour_dist,
"micro_session_rate": round(micro_sessions / len(sessions), 2),
"marathon_session_rate": round(marathon_sessions / len(sessions), 2),
"energy_arcs": energy_arcs
@@ -300,12 +331,11 @@ class StatsService:
def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Aggregates Audio Features + Calculates Whiplash (Transitions)
Aggregates Audio Features + Calculates Whiplash, Percentiles, and Profiles.
"""
# Fetch plays strictly ordered by time for transition analysis
plays = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc()).all()
if not plays:
@@ -316,9 +346,9 @@ class StatsService:
track_map = {t.id: t for t in tracks}
# 1. Aggregates
features = {k: [] for k in
["energy", "valence", "danceability", "tempo", "acousticness", "instrumentalness", "liveness",
"speechiness", "loudness"]}
feature_keys = ["energy", "valence", "danceability", "tempo", "acousticness",
"instrumentalness", "liveness", "speechiness", "loudness"]
features = {k: [] for k in feature_keys}
# 2. Transition Arrays (for Whiplash)
transitions = {"tempo": [], "energy": [], "valence": []}
@@ -329,38 +359,34 @@ class StatsService:
t = track_map.get(p.track_id)
if not t: continue
# Populate aggregations
if t.energy is not None:
features["energy"].append(t.energy)
features["valence"].append(t.valence)
features["danceability"].append(t.danceability)
features["tempo"].append(t.tempo)
features["acousticness"].append(t.acousticness)
features["instrumentalness"].append(t.instrumentalness)
features["liveness"].append(t.liveness)
features["speechiness"].append(t.speechiness)
features["loudness"].append(t.loudness)
# Robust Null Check: Append separately
for key in feature_keys:
val = getattr(t, key, None)
if val is not None:
features[key].append(val)
# Calculate Transitions (Whiplash)
if i > 0 and previous_track:
# Only count transition if within reasonable time (e.g. < 5 mins gap)
# assuming continuous listening
time_diff = (p.played_at - plays[i - 1].played_at).total_seconds()
if time_diff < 300:
if t.tempo and previous_track.tempo:
if time_diff < 300: # 5 min gap max
if t.tempo is not None and previous_track.tempo is not None:
transitions["tempo"].append(abs(t.tempo - previous_track.tempo))
if t.energy and previous_track.energy:
if t.energy is not None and previous_track.energy is not None:
transitions["energy"].append(abs(t.energy - previous_track.energy))
if t.valence is not None and previous_track.valence is not None:
transitions["valence"].append(abs(t.valence - previous_track.valence))
previous_track = t
# Calculate Stats
# Calculate Stats (Mean, Std, Percentiles)
stats = {}
for key, values in features.items():
valid = [v for v in values if v is not None]
if valid:
stats[f"avg_{key}"] = float(np.mean(valid))
stats[f"std_{key}"] = float(np.std(valid))
if values:
stats[f"avg_{key}"] = float(np.mean(values))
stats[f"std_{key}"] = float(np.std(values))
stats[f"p10_{key}"] = float(np.percentile(values, 10))
stats[f"p50_{key}"] = float(np.percentile(values, 50)) # Median
stats[f"p90_{key}"] = float(np.percentile(values, 90))
else:
stats[f"avg_{key}"] = None
@@ -370,13 +396,27 @@ class StatsService:
"x": round(stats["avg_valence"], 2),
"y": round(stats["avg_energy"], 2)
}
# Consistency: Inverse of average standard deviation of Mood components
avg_std = (stats["std_energy"] + stats["std_valence"]) / 2
stats["consistency_score"] = round(1.0 - avg_std, 2) # Higher = more consistent
# Consistency
avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2
stats["consistency_score"] = round(1.0 - avg_std, 2)
# Rhythm Profile
if stats.get("avg_tempo") is not None and stats.get("avg_danceability") is not None:
stats["rhythm_profile"] = {
"avg_tempo": round(stats["avg_tempo"], 1),
"avg_danceability": round(stats["avg_danceability"], 2)
}
# Texture Profile
if stats.get("avg_acousticness") is not None and stats.get("avg_instrumentalness") is not None:
stats["texture_profile"] = {
"acousticness": round(stats["avg_acousticness"], 2),
"instrumentalness": round(stats["avg_instrumentalness"], 2)
}
# Whiplash Scores (Average jump between tracks)
# Whiplash Scores
stats["whiplash"] = {}
for k in ["tempo", "energy"]:
for k in ["tempo", "energy", "valence"]:
if transitions[k]:
stats["whiplash"][k] = round(float(np.mean(transitions[k])), 2)
else:
@@ -388,10 +428,9 @@ class StatsService:
"""
Includes Nostalgia Gap and granular decade breakdown.
"""
# Join track to get raw_data
query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
)
plays = query.all()
@@ -409,11 +448,9 @@ class StatsService:
if not years:
return {"musical_age": None}
# Musical Age (Weighted Average)
avg_year = sum(years) / len(years)
current_year = datetime.utcnow().year
# Decade Distribution
decades = {}
for y in years:
dec = (y // 10) * 10
@@ -426,18 +463,17 @@ class StatsService:
return {
"musical_age": int(avg_year),
"nostalgia_gap": int(current_year - avg_year),
"freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0), # Share of current decade
"freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0),
"decade_distribution": dist
}
def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Implements boredom skip detection:
(next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s)
Implements boredom skip detection.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc())
plays = query.all()
@@ -449,7 +485,10 @@ class StatsService:
tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
track_map = {t.id: t for t in tracks}
for i in range(len(plays) - 1):
# Denominator: transitions, which is plays - 1
transitions_count = len(plays) - 1
for i in range(transitions_count):
current_play = plays[i]
next_play = plays[i+1]
track = track_map.get(current_play.track_id)
@@ -458,31 +497,28 @@ class StatsService:
continue
diff_seconds = (next_play.played_at - current_play.played_at).total_seconds()
# Logic: If diff < (duration - 10s), it's a skip.
# Convert duration to seconds
duration_sec = track.duration_ms / 1000.0
# Also ensure diff isn't negative or weirdly small (re-plays)
# And assume "listening" means diff > 30s at least?
# Spec says "Spotify only returns 30s+".
if diff_seconds < (duration_sec - 10):
# Logic: If diff < (duration - 10s), it's a skip.
# AND it must be a "valid" listening attempt (e.g. > 30s)
# AND it shouldn't be a huge gap (e.g. paused for 2 hours then hit next)
if 30 < diff_seconds < (duration_sec - 10):
skips += 1
return {
"total_skips": skips,
"skip_rate": round(skips / len(plays), 3)
"skip_rate": round(skips / transitions_count, 3) if transitions_count > 0 else 0
}
def compute_context_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Analyzes context_uri to determine if user listens to Playlists, Albums, or Artists.
Analyzes context_uri and switching rate.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
)
PlayHistory.played_at < period_end
).order_by(PlayHistory.played_at.asc())
plays = query.all()
if not plays:
@@ -490,31 +526,32 @@ class StatsService:
context_counts = {"playlist": 0, "album": 0, "artist": 0, "collection": 0, "unknown": 0}
unique_contexts = {}
context_switches = 0
last_context = None
for p in plays:
if not p.context_uri:
uri = p.context_uri
if not uri:
context_counts["unknown"] += 1
continue
# Count distinct contexts for loyalty
unique_contexts[p.context_uri] = unique_contexts.get(p.context_uri, 0) + 1
if "playlist" in p.context_uri:
context_counts["playlist"] += 1
elif "album" in p.context_uri:
context_counts["album"] += 1
elif "artist" in p.context_uri:
context_counts["artist"] += 1
elif "collection" in p.context_uri:
# "Liked Songs" usually shows up as collection
context_counts["collection"] += 1
uri = "unknown"
else:
context_counts["unknown"] += 1
if "playlist" in uri: context_counts["playlist"] += 1
elif "album" in uri: context_counts["album"] += 1
elif "artist" in uri: context_counts["artist"] += 1
elif "collection" in uri: context_counts["collection"] += 1
else: context_counts["unknown"] += 1
if uri != "unknown":
unique_contexts[uri] = unique_contexts.get(uri, 0) + 1
# Switch detection
if last_context and uri != last_context:
context_switches += 1
last_context = uri
total = len(plays)
breakdown = {k: round(v / total, 2) for k, v in context_counts.items()}
# Top 5 Contexts (Requires resolving URI to name, possibly missing metadata here)
sorted_contexts = sorted(unique_contexts.items(), key=lambda x: x[1], reverse=True)[:5]
return {
@@ -522,16 +559,17 @@ class StatsService:
"album_purist_score": breakdown.get("album", 0),
"playlist_dependency": breakdown.get("playlist", 0),
"context_loyalty": round(len(plays) / len(unique_contexts), 2) if unique_contexts else 0,
"context_switching_rate": round(context_switches / (total - 1), 2) if total > 1 else 0,
"top_context_uris": [{"uri": k, "count": v} for k, v in sorted_contexts]
}
def compute_taste_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Mainstream vs. Hipster analysis based on Track.popularity (0-100).
Mainstream vs. Hipster analysis.
"""
query = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
)
plays = query.all()
if not plays: return {}
@@ -564,38 +602,47 @@ class StatsService:
def compute_lifecycle_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
"""
Determines if tracks are 'New Discoveries' or 'Old Favorites'.
Discovery, Recurrence, Comebacks, Obsessions.
"""
# 1. Get tracks played in this period
# 1. Current plays
current_plays = self.db.query(PlayHistory).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
).all()
if not current_plays: return {}
current_track_ids = set([p.track_id for p in current_plays])
# 2. Check if these tracks were played BEFORE period_start
# We find which of the current_track_ids exist in history < period_start
# 2. Historical check
old_tracks_query = self.db.query(distinct(PlayHistory.track_id)).filter(
PlayHistory.track_id.in_(current_track_ids),
PlayHistory.played_at < period_start
)
old_track_ids = set([r[0] for r in old_tracks_query.all()])
# 3. Calculate Discovery
# 3. Discovery
new_discoveries = current_track_ids - old_track_ids
discovery_count = len(new_discoveries)
# Calculate plays on new discoveries
# 4. Obsessions (Tracks with > 5 plays in period)
track_counts = {}
for p in current_plays:
track_counts[p.track_id] = track_counts.get(p.track_id, 0) + 1
obsessions = [tid for tid, count in track_counts.items() if count >= 5]
# 5. Comeback Detection (Old tracks not played in last 30 days)
# Simplified: If in old_track_ids but NOT in last 30 days before period_start?
# That requires a gap check. For now, we will mark 'recurrence' as general relistening.
plays_on_new = len([p for p in current_plays if p.track_id in new_discoveries])
total_plays = len(current_plays)
return {
"discovery_count": discovery_count,
"discovery_count": len(new_discoveries),
"discovery_rate": round(plays_on_new / total_plays, 3) if total_plays > 0 else 0,
"recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0
"recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0,
"obsession_count": len(obsessions),
"obsession_rate": round(len(obsessions) / len(current_track_ids), 3) if current_track_ids else 0
}
def compute_explicit_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
@@ -604,7 +651,7 @@ class StatsService:
"""
query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter(
PlayHistory.played_at >= period_start,
PlayHistory.played_at <= period_end
PlayHistory.played_at < period_end
)
plays = query.all()
@@ -618,24 +665,14 @@ class StatsService:
for p in plays:
h = p.played_at.hour
hourly_total[h] += 1
# Check raw_data for explicit flag
t = p.track
is_explicit = False
if t.raw_data and t.raw_data.get("explicit"):
is_explicit = True
if is_explicit:
explicit_count += 1
hourly_explicit[h] += 1
# Calculate hourly percentages
hourly_rates = []
for i in range(24):
if hourly_total[i] > 0:
hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2))
else:
hourly_rates.append(0.0)
hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2) if hourly_total[i] > 0 else 0.0)
return {
"explicit_rate": round(explicit_count / total_plays, 3),
@@ -644,7 +681,6 @@ class StatsService:
}
def generate_full_report(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
# 1. Calculate all current stats
current_stats = {
"period": {"start": period_start.isoformat(), "end": period_end.isoformat()},
"volume": self.compute_volume_stats(period_start, period_end),
@@ -659,7 +695,19 @@ class StatsService:
"skips": self.compute_skip_stats(period_start, period_end)
}
# 2. Calculate Comparison
current_stats["comparison"] = self.compute_comparison(current_stats, period_start, period_end)
return current_stats
def _empty_volume_stats(self):
return {
"total_plays": 0, "estimated_minutes": 0, "unique_tracks": 0,
"unique_artists": 0, "unique_albums": 0, "unique_genres": 0,
"top_tracks": [], "top_artists": [], "top_albums": [], "top_genres": [],
"repeat_rate": 0, "one_and_done_rate": 0,
"concentration": {}
}
def _pct_change(self, curr, prev):
if prev == 0:
return 100.0 if curr > 0 else 0.0
return round(((curr - prev) / prev) * 100, 1)