From af0d985253676a84be37a16d0327e1a94d3062ad Mon Sep 17 00:00:00 2001 From: bnair123 Date: Thu, 25 Dec 2025 18:12:05 +0400 Subject: [PATCH] Refactor Stats and Narrative services to match spec - StatsService: Fixed N+1 queries, added missing metrics (whiplash, entropy, lifecycle), and improved correctness (boundary checks, null handling). - NarrativeService: Added payload shaping for token efficiency, improved JSON robustness, and updated prompts to align with persona specs. - Documentation: Added backend/TECHNICAL_DOCS.md detailing the logic. --- backend/TECHNICAL_DOCS.md | 95 ++++++ backend/app/services/narrative_service.py | 135 ++++++-- backend/app/services/stats_service.py | 382 ++++++++++++---------- 3 files changed, 410 insertions(+), 202 deletions(-) create mode 100644 backend/TECHNICAL_DOCS.md diff --git a/backend/TECHNICAL_DOCS.md b/backend/TECHNICAL_DOCS.md new file mode 100644 index 0000000..ff2c179 --- /dev/null +++ b/backend/TECHNICAL_DOCS.md @@ -0,0 +1,95 @@ +# Technical Documentation: Stats & Narrative Services + +## Overview +This document details the implementation of the core analysis engine (`StatsService`) and the AI narration layer (`NarrativeService`). These services transform raw Spotify listening data into computable metrics and human-readable insights. + +## 1. StatsService (`backend/app/services/stats_service.py`) + +The `StatsService` is a deterministic calculation engine. It takes a time range (`period_start` to `period_end`) and aggregates `PlayHistory` records. + +### Core Architecture +- **Input:** SQLAlchemy Session, Start Datetime, End Datetime. +- **Output:** A structured JSON dictionary containing discrete analysis blocks (Volume, Time, Sessions, Vibe, etc.). +- **Optimization:** Uses `joinedload` to eagerly fetch `Track` and `Artist` relations, preventing N+1 query performance issues during iteration. + +### Metric Logic + +#### A. 
Volume & Consumption +- **Top Tracks/Artists:** Aggregated by ID, not name, to handle artist renames or duplicates. +- **Concentration Metrics:** + - **HHI (Herfindahl–Hirschman Index):** Measures how concentrated plays are across tracks (the inverse of diversity). `SUM(share^2)`. Close to 0 = diverse, close to 1 = repetitive. + - **Gini Coefficient:** Measures inequality of play distribution. + - **Genre Entropy:** `-SUM(p * log(p))` (natural log) for genre probabilities. Higher = more diverse genre consumption. +- **Artists:** Parsed from the `Track.artists` relationship (Many-to-Many) rather than the flat string, ensuring accurate counts for collaborations (e.g., "Drake, Future" counts for both). + +#### B. Time & Habits +- **Part of Day:** Fixed buckets: + - Morning: 06:00 - 12:00 + - Afternoon: 12:00 - 18:00 + - Evening: 18:00 - 23:59 + - Night: 00:00 - 06:00 +- **Streaks:** Calculates consecutive days with at least one play. +- **Active Days:** Count of unique dates with activity. + +#### C. Session Analytics +- **Session Definition:** A sequence of plays where the gap between any two consecutive tracks is ≤ 20 minutes. A gap > 20 minutes starts a new session. +- **Energy Arcs:** Compares the `energy` feature of the first and last track in a session. + - Rising: Delta > +0.1 + - Falling: Delta < -0.1 + - Flat: Otherwise + +#### D. The "Vibe" (Audio Features) +- **Aggregation:** Calculates Mean, Standard Deviation, and Percentiles (P10, P50/Median, P90) for all Spotify audio features (Energy, Valence, Danceability, etc.). +- **Whiplash Score:** Measures the "volatility" of listening. Calculated as the average absolute difference in a feature (Tempo, Energy, Valence) between consecutive tracks played less than 5 minutes apart; longer gaps are not counted as transitions. + - High Whiplash (> 15-20 for BPM) = Chaotic playlist shuffling. + - Low Whiplash = Smooth transitions. +- **Profiles:** + - **Mood Quadrant:** (Avg Valence, Avg Energy) coordinates. + - **Texture:** Acousticness vs. Instrumentalness. + +#### E. Context & Behavior +- **Context URI:** Parsed to determine source (Playlist vs. 
Album vs. Artist). +- **Context Switching:** Fraction (0-1) of track transitions where the `context_uri` changes; plays without a context are treated as a single "unknown" context. High rate = user is jumping between playlists or albums frequently. + +#### F. Lifecycle & Discovery +- **Discovery:** Tracks played in the current period that were *never* played before `period_start`. +- **Obsession:** Tracks with ≥ 5 plays in the current period. +- **Skip Detection (Boredom Skips):** + - Logic: `(next_start - current_start) < (current_duration - 10s)` + - Only counts if the listening time was > 30s (to filter accidental clicks). + - Proxy for "User got bored and hit next." + +--- + +## 2. NarrativeService (`backend/app/services/narrative_service.py`) + +The `NarrativeService` acts as an interpreter. It feeds a condensed copy of the stats JSON from `StatsService` into Google's Gemini LLM to generate text. + +### Payload Shaping +To ensure reliability and manage token costs, the service **does not** send the full, unshaped stats payload. It pre-processes the stats: +- Truncates top lists to Top 5. +- Removes raw transition arrays. +- Simplifies nested structures. + +### LLM Prompt Engineering +The system uses a strict persona ("Witty Music Critic") and enforces specific constraints: +- **Output:** Strict JSON. +- **Safety:** Explicitly forbidden from making mental health diagnoses (e.g., no "You seem depressed"). +- **Content:** Must reference specific numbers from the input stats (e.g., "Your 85% Mainstream Score..."). + +### Output Schema +The LLM returns a JSON object with the following keys, plus a short `comparison` note against the previous period: +- `vibe_check`: 2-3 paragraph summary. +- `patterns`: List of specific observations. +- `persona`: A creative 2-3 word label (e.g., "The Genre Chameleon"). +- `roast`: A playful critique. +- `era_insight`: Commentary on the user's "Musical Age" (weighted avg release year). + +## 3. Data Models (`backend/app/models.py`) + +- **Track:** Stores static metadata and audio features. `raw_data` stores the full Spotify JSON for future-proofing. +- **Artist:** Normalized artist entities. 
Linked to tracks via `track_artists` table. +- **PlayHistory:** The timeseries ledger. Links `Track` to a timestamp and context. +- **AnalysisSnapshot:** Stores the final output of these services. + - `metrics_payload`: The JSON output of `StatsService`. + - `narrative_report`: The JSON output of `NarrativeService`. diff --git a/backend/app/services/narrative_service.py b/backend/app/services/narrative_service.py index 72c8f0f..2a0d54f 100644 --- a/backend/app/services/narrative_service.py +++ b/backend/app/services/narrative_service.py @@ -1,10 +1,11 @@ import os import json +import re import google.generativeai as genai -from typing import Dict, Any +from typing import Dict, Any, List, Optional class NarrativeService: - def __init__(self, model_name: str = "gemini-2.5-flash"): + def __init__(self, model_name: str = "gemini-2.0-flash-exp"): self.api_key = os.getenv("GEMINI_API_KEY") if not self.api_key: print("WARNING: GEMINI_API_KEY not found. LLM features will fail.") @@ -13,47 +14,111 @@ class NarrativeService: self.model_name = model_name - def generate_narrative(self, stats_json: Dict[str, Any]) -> Dict[str, str]: + def generate_full_narrative(self, stats_json: Dict[str, Any]) -> Dict[str, Any]: + """ + Orchestrates the generation of the full narrative report. + Currently uses a single call for consistency and speed. + """ if not self.api_key: - return {"error": "Missing API Key"} + return self._get_fallback_narrative() + clean_stats = self._shape_payload(stats_json) + prompt = f""" -You are a witty, insightful, and slightly snarky music critic analyzing a user's listening history. -Below is a JSON summary of their listening data. +You are a witty, insightful, and slightly snarky music critic analyzing a user's Spotify listening data. +Your goal is to generate a JSON report that acts as a deeper, more honest "Spotify Wrapped". -Your goal is to generate a report that feels like a 'Spotify Wrapped' but deeper and more honest. +**CORE RULES:** +1. 
**NO Mental Health Diagnoses:** Do not mention depression, anxiety, or therapy. Stick to behavioral descriptors (e.g., "introspective", "high-energy"). +2. **Be Specific:** Use the provided metrics. Don't say "You like pop," say "Your Mainstream Score of 85% suggests..." +3. **Roast Gently:** Be playful but not cruel. +4. **JSON Output Only:** Return strictly valid JSON. -Please output your response in strict JSON format with the following keys: -1. "vibe_check": (String) 2-3 paragraphs describing their overall listening personality. -2. "patterns": (List of Strings) 3-5 specific observations based on the data (e.g., "You listen to sad music on Tuesdays", "Your Whiplash Score is high"). -3. "persona": (String) A creative label for the user (e.g., "The Genre Chameleon", "Nostalgic Dad-Rocker", "Algorithm Victim"). -4. "roast": (String) A playful, harmlessly mean roast about their taste (1-2 sentences). -5. "era_insight": (String) A specific comment on their 'Musical Age' and 'Nostalgia Gap'. +**DATA TO ANALYZE:** +{json.dumps(clean_stats, indent=2)} -GUIDELINES: -- **Use the Metrics:** Do not just say "You like pop." Say "Your Mainstream Score of 85% suggests you live on the Top 40." -- **Whiplash Score:** If 'whiplash' > 20, comment on their chaotic transitions. -- **Hipster Score:** If 'hipster_score' > 50, call them pretentious; if < 10, call them basic. -- **Comparison:** Use the 'comparison' block to mention if they are listening more/less or if their mood (valence/energy) has shifted. -- **Tone:** Conversational, fun, slightly judgmental but good-natured. 
- -DATA: -{json.dumps(stats_json, indent=2)} - -OUTPUT (JSON): +**REQUIRED JSON STRUCTURE:** +{{ + "vibe_check": "2-3 paragraphs describing their overall listening personality this period.", + "patterns": ["Observation 1", "Observation 2", "Observation 3 (Look for specific habits like skipping or late-night sessions)"], + "persona": "A creative label (e.g., 'The Genre Chameleon', 'Nostalgic Dad-Rocker').", + "era_insight": "A specific comment on their Musical Age ({clean_stats.get('era', {}).get('musical_age', 'N/A')}) and Nostalgia Gap.", + "roast": "A 1-2 sentence playful roast about their taste.", + "comparison": "A short comment comparing this period to the previous one (if data exists)." +}} """ try: model = genai.GenerativeModel(self.model_name) - response = model.generate_content(prompt) - - # Clean up response to ensure valid JSON - text = response.text.strip() - if text.startswith("```json"): - text = text.replace("```json", "").replace("```", "") - elif text.startswith("```"): - text = text.replace("```", "") - - return json.loads(text) + # Use JSON mode if available, otherwise rely on prompt + cleaning + response = model.generate_content( + prompt, + generation_config={"response_mime_type": "application/json"} + ) + + return self._clean_and_parse_json(response.text) except Exception as e: - return {"error": str(e), "raw_response": "Error generating narrative."} \ No newline at end of file + print(f"LLM Generation Error: {e}") + return self._get_fallback_narrative() + + def _shape_payload(self, stats: Dict[str, Any]) -> Dict[str, Any]: + """ + Compresses the stats JSON to save tokens and focus the LLM. + Removes raw lists beyond top 5/10. 
+ """ + s = stats.copy() + + # Simplify Volume + if "volume" in s: + s["volume"] = { + k: v for k, v in s["volume"].items() + if k not in ["top_tracks", "top_artists", "top_albums", "top_genres"] + } + # Add back condensed top lists (just names) + s["volume"]["top_tracks"] = [t["name"] for t in stats["volume"].get("top_tracks", [])[:5]] + s["volume"]["top_artists"] = [a["name"] for a in stats["volume"].get("top_artists", [])[:5]] + s["volume"]["top_genres"] = [g["name"] for g in stats["volume"].get("top_genres", [])[:5]] + + # Simplify Time (Keep distributions but maybe round them?) + # Keeping hourly/daily is fine, they are small arrays. + + # Simplify Vibe (Remove huge transition arrays if they accidentally leaked, though stats service handles this) + + # Remove period details if verbose + return s + + def _clean_and_parse_json(self, raw_text: str) -> Dict[str, Any]: + """ + Robust JSON extractor. + """ + try: + # 1. Try direct parse + return json.loads(raw_text) + except json.JSONDecodeError: + pass + + # 2. Extract between first { and last } + try: + match = re.search(r"\{.*\}", raw_text, re.DOTALL) + if match: + return json.loads(match.group(0)) + except: + pass + + return self._get_fallback_narrative() + + def _get_fallback_narrative(self) -> Dict[str, Any]: + return { + "vibe_check": "Data processing error. You're too mysterious for us to analyze right now.", + "patterns": [], + "persona": "The Enigma", + "era_insight": "Time is a flat circle.", + "roast": "You broke the machine. 
Congratulations.", + "comparison": "N/A" + } + + # Individual accessors if needed by frontend, though full_narrative is preferred + def generate_vibe_check(self, stats): return self.generate_full_narrative(stats).get("vibe_check") + def identify_patterns(self, stats): return self.generate_full_narrative(stats).get("patterns") + def generate_persona(self, stats): return self.generate_full_narrative(stats).get("persona") + def generate_roast(self, stats): return self.generate_full_narrative(stats).get("roast") \ No newline at end of file diff --git a/backend/app/services/stats_service.py b/backend/app/services/stats_service.py index 9038fd6..a3dc33f 100644 --- a/backend/app/services/stats_service.py +++ b/backend/app/services/stats_service.py @@ -1,20 +1,17 @@ -from sqlalchemy.orm import Session -from sqlalchemy import func, distinct, desc, joinedload +from sqlalchemy.orm import Session, joinedload +from sqlalchemy import func, distinct from datetime import datetime, timedelta -from typing import Dict, Any, List +from typing import Dict, Any, List, Optional import math import numpy as np -from ..models import PlayHistory, Track, Artist, AnalysisSnapshot +from ..models import PlayHistory, Track, Artist class StatsService: def __init__(self, db: Session): self.db = db - from sqlalchemy.orm import joinedload # Add this to imports - - def compute_comparison(self, current_stats: Dict[str, Any], period_start: datetime, period_end: datetime) -> Dict[ - str, Any]: + def compute_comparison(self, current_stats: Dict[str, Any], period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ Calculates deltas vs the previous period of the same length. """ @@ -22,25 +19,18 @@ class StatsService: prev_end = period_start prev_start = prev_end - duration - # We only need key metrics for comparison, not the full heavy report - # Let's re-use existing methods but strictly for the previous window - - # 1. 
Volume Comparison + # We only need key metrics for comparison prev_volume = self.compute_volume_stats(prev_start, prev_end) - - # 2. Vibe Comparison (Just energy/valence/popularity) prev_vibe = self.compute_vibe_stats(prev_start, prev_end) prev_taste = self.compute_taste_stats(prev_start, prev_end) - # Calculate Deltas deltas = {} # Plays curr_plays = current_stats["volume"]["total_plays"] prev_plays_count = prev_volume["total_plays"] deltas["plays_delta"] = curr_plays - prev_plays_count - deltas["plays_pct_change"] = round(((curr_plays - prev_plays_count) / prev_plays_count) * 100, - 1) if prev_plays_count else 0 + deltas["plays_pct_change"] = self._pct_change(curr_plays, prev_plays_count) # Energy & Valence if "mood_quadrant" in current_stats["vibe"] and "mood_quadrant" in prev_vibe: @@ -54,8 +44,7 @@ class StatsService: # Popularity if "avg_popularity" in current_stats["taste"] and "avg_popularity" in prev_taste: - deltas["popularity_delta"] = round(current_stats["taste"]["avg_popularity"] - prev_taste["avg_popularity"], - 1) + deltas["popularity_delta"] = round(current_stats["taste"]["avg_popularity"] - prev_taste["avg_popularity"], 1) return { "previous_period": { @@ -67,112 +56,143 @@ class StatsService: def compute_volume_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Calculates volume metrics including Concentration (HHI, Gini) and One-and-Done rates. + Calculates volume metrics including Concentration (HHI, Gini, Entropy) and Top Lists. 
""" # Eager load tracks AND artists to fix the "Artist String Problem" and performance + # Use < period_end for half-open interval to avoid double counting boundaries query = self.db.query(PlayHistory).options( joinedload(PlayHistory.track).joinedload(Track.artists) ).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ) plays = query.all() total_plays = len(plays) if total_plays == 0: - return { - "total_plays": 0, "estimated_minutes": 0, "unique_tracks": 0, - "unique_artists": 0, "unique_albums": 0, "unique_genres": 0, - "top_tracks": [], "top_artists": [], "top_genres": [], - "repeat_rate": 0, "concentration": {} - } + return self._empty_volume_stats() total_ms = 0 track_counts = {} artist_counts = {} genre_counts = {} - album_ids = set() + album_counts = {} + + # Maps for resolving names later without DB hits + track_map = {} + artist_map = {} + album_map = {} for p in plays: t = p.track if not t: continue total_ms += t.duration_ms if t.duration_ms else 0 - - # Track Counts + + # Track Aggregation track_counts[t.id] = track_counts.get(t.id, 0) + 1 + track_map[t.id] = t - # Album Counts (using raw_data ID if available, else name) - if t.raw_data and "album" in t.raw_data and "id" in t.raw_data["album"]: - album_ids.add(t.raw_data["album"]["id"]) - else: - album_ids.add(t.album) + # Album Aggregation + # Prefer ID from raw_data, fallback to name + album_id = t.album + album_name = t.album + if t.raw_data and "album" in t.raw_data: + album_id = t.raw_data["album"].get("id", t.album) + album_name = t.raw_data["album"].get("name", t.album) + + album_counts[album_id] = album_counts.get(album_id, 0) + 1 + album_map[album_id] = album_name - # Artist Counts (Iterate objects, not string) + # Artist Aggregation (Iterate objects, not string) for artist in t.artists: artist_counts[artist.id] = artist_counts.get(artist.id, 0) + 1 + artist_map[artist.id] = artist.name + + # Genre Aggregation if 
artist.genres: + # artist.genres is a JSON list of strings for g in artist.genres: genre_counts[g] = genre_counts.get(g, 0) + 1 # Derived Metrics unique_tracks = len(track_counts) one_and_done = len([c for c in track_counts.values() if c == 1]) + shares = [c / total_plays for c in track_counts.values()] - # Top Lists + # Top Lists (Optimized: No N+1) top_tracks = [ - {"name": self.db.query(Track).get(tid).name, "artist": self.db.query(Track).get(tid).artist, "count": c} + { + "name": track_map[tid].name, + "artist": ", ".join([a.name for a in track_map[tid].artists]), # Correct artist display + "count": c + } for tid, c in sorted(track_counts.items(), key=lambda x: x[1], reverse=True)[:5] ] - top_artist_ids = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5] - # Fetch artist names efficiently - top_artists_objs = self.db.query(Artist).filter(Artist.id.in_([x[0] for x in top_artist_ids])).all() - artist_map = {a.id: a.name for a in top_artists_objs} - top_artists = [{"name": artist_map.get(aid, "Unknown"), "count": c} for aid, c in top_artist_ids] + top_artists = [ + {"name": artist_map.get(aid, "Unknown"), "count": c} + for aid, c in sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)[:5] + ] + + top_albums = [ + {"name": album_map.get(aid, "Unknown"), "count": c} + for aid, c in sorted(album_counts.items(), key=lambda x: x[1], reverse=True)[:5] + ] - top_genres = [{"name": k, "count": v} for k, v in - sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:5]] + top_genres = [{"name": k, "count": v} for k, v in sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:5]] - # Concentration (HHI & Gini) + # Concentration Metrics # HHI: Sum of (share)^2 - shares = [c / total_plays for c in track_counts.values()] hhi = sum([s ** 2 for s in shares]) - # Gini Coefficient (Inequality of play distribution) + # Gini Coefficient sorted_shares = sorted(shares) n = len(shares) + gini = 0 if n > 0: gini = (2 * sum((i + 1) * x for i, 
x in enumerate(sorted_shares))) / (n * sum(sorted_shares)) - (n + 1) / n - else: - gini = 0 + + # Genre Entropy: -SUM(p * log(p)) + total_genre_occurrences = sum(genre_counts.values()) + genre_entropy = 0 + if total_genre_occurrences > 0: + genre_probs = [count / total_genre_occurrences for count in genre_counts.values()] + genre_entropy = -sum([p * math.log(p) for p in genre_probs if p > 0]) + + # Top 5 Share + top_5_plays = sum([t["count"] for t in top_tracks]) + top_5_share = top_5_plays / total_plays if total_plays else 0 return { "total_plays": total_plays, "estimated_minutes": int(total_ms / 60000), "unique_tracks": unique_tracks, "unique_artists": len(artist_counts), - "unique_albums": len(album_ids), + "unique_albums": len(album_counts), "unique_genres": len(genre_counts), "top_tracks": top_tracks, "top_artists": top_artists, + "top_albums": top_albums, "top_genres": top_genres, "repeat_rate": round((total_plays - unique_tracks) / total_plays, 3) if total_plays else 0, "one_and_done_rate": round(one_and_done / unique_tracks, 3) if unique_tracks else 0, "concentration": { "hhi": round(hhi, 4), "gini": round(gini, 4), - "top_1_share": round(max(shares), 3) if shares else 0 + "top_1_share": round(max(shares), 3) if shares else 0, + "top_5_share": round(top_5_share, 3), + "genre_entropy": round(genre_entropy, 2) } } def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Includes Part-of-Day buckets and Listening Streaks. + Includes Part-of-Day buckets, Listening Streaks, and Active Days stats. 
""" query = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ).order_by(PlayHistory.played_at.asc()) plays = query.all() @@ -181,9 +201,8 @@ class StatsService: hourly_counts = [0] * 24 weekday_counts = [0] * 7 + # Spec: Morning (6-12), Afternoon (12-18), Evening (18-24), Night (0-6) part_of_day = {"morning": 0, "afternoon": 0, "evening": 0, "night": 0} - - # For Streaks active_dates = set() for p in plays: @@ -192,11 +211,11 @@ class StatsService: weekday_counts[p.played_at.weekday()] += 1 active_dates.add(p.played_at.date()) - if 5 <= h < 12: + if 6 <= h < 12: part_of_day["morning"] += 1 - elif 12 <= h < 17: + elif 12 <= h < 18: part_of_day["afternoon"] += 1 - elif 17 <= h < 22: + elif 18 <= h <= 23: part_of_day["evening"] += 1 else: part_of_day["night"] += 1 @@ -208,7 +227,6 @@ class StatsService: if sorted_dates: current_streak = 1 longest_streak = 1 - # Check strictly consecutive days for i in range(1, len(sorted_dates)): delta = (sorted_dates[i] - sorted_dates[i - 1]).days if delta == 1: @@ -219,6 +237,7 @@ class StatsService: longest_streak = max(longest_streak, current_streak) weekend_plays = weekday_counts[5] + weekday_counts[6] + active_days_count = len(active_dates) return { "hourly_distribution": hourly_counts, @@ -228,17 +247,17 @@ class StatsService: "part_of_day": part_of_day, "listening_streak": current_streak, "longest_streak": longest_streak, - "active_days": len(active_dates) + "active_days": active_days_count, + "avg_plays_per_active_day": round(len(plays) / active_days_count, 1) if active_days_count else 0 } def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Includes Micro-sessions, Marathon sessions, and Energy Arcs. + Includes Micro-sessions, Marathon sessions, Energy Arcs, and Median metrics. 
""" - # Need to join Track to get Energy features for Arc analysis query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ).order_by(PlayHistory.played_at.asc()) plays = query.all() @@ -262,20 +281,24 @@ class StatsService: micro_sessions = 0 marathon_sessions = 0 energy_arcs = {"rising": 0, "falling": 0, "flat": 0, "unknown": 0} + start_hour_dist = [0] * 24 for sess in sessions: + # Start time distribution + start_hour_dist[sess[0].played_at.hour] += 1 + # Durations if len(sess) > 1: duration = (sess[-1].played_at - sess[0].played_at).total_seconds() / 60 lengths_min.append(duration) else: - lengths_min.append(3.0) # Approx + lengths_min.append(3.0) # Approx single song # Types if len(sess) <= 3: micro_sessions += 1 if len(sess) >= 20: marathon_sessions += 1 - # Energy Arc (First vs Last track) + # Energy Arc first_t = sess[0].track last_t = sess[-1].track if first_t and last_t and first_t.energy is not None and last_t.energy is not None: @@ -286,13 +309,21 @@ class StatsService: else: energy_arcs["unknown"] += 1 - avg_min = sum(lengths_min) / len(lengths_min) if lengths_min else 0 + avg_min = np.mean(lengths_min) if lengths_min else 0 + median_min = np.median(lengths_min) if lengths_min else 0 + + # Sessions per day + active_days = len(set(p.played_at.date() for p in plays)) + sessions_per_day = len(sessions) / active_days if active_days else 0 return { "count": len(sessions), "avg_tracks": round(len(plays) / len(sessions), 1), - "avg_minutes": round(avg_min, 1), + "avg_minutes": round(float(avg_min), 1), + "median_minutes": round(float(median_min), 1), "longest_session_minutes": round(max(lengths_min), 1) if lengths_min else 0, + "sessions_per_day": round(sessions_per_day, 1), + "start_hour_distribution": start_hour_dist, "micro_session_rate": round(micro_sessions / len(sessions), 2), "marathon_session_rate": 
round(marathon_sessions / len(sessions), 2), "energy_arcs": energy_arcs @@ -300,12 +331,11 @@ class StatsService: def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Aggregates Audio Features + Calculates Whiplash (Transitions) + Aggregates Audio Features + Calculates Whiplash, Percentiles, and Profiles. """ - # Fetch plays strictly ordered by time for transition analysis plays = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ).order_by(PlayHistory.played_at.asc()).all() if not plays: @@ -316,9 +346,9 @@ class StatsService: track_map = {t.id: t for t in tracks} # 1. Aggregates - features = {k: [] for k in - ["energy", "valence", "danceability", "tempo", "acousticness", "instrumentalness", "liveness", - "speechiness", "loudness"]} + feature_keys = ["energy", "valence", "danceability", "tempo", "acousticness", + "instrumentalness", "liveness", "speechiness", "loudness"] + features = {k: [] for k in feature_keys} # 2. Transition Arrays (for Whiplash) transitions = {"tempo": [], "energy": [], "valence": []} @@ -329,38 +359,34 @@ class StatsService: t = track_map.get(p.track_id) if not t: continue - # Populate aggregations - if t.energy is not None: - features["energy"].append(t.energy) - features["valence"].append(t.valence) - features["danceability"].append(t.danceability) - features["tempo"].append(t.tempo) - features["acousticness"].append(t.acousticness) - features["instrumentalness"].append(t.instrumentalness) - features["liveness"].append(t.liveness) - features["speechiness"].append(t.speechiness) - features["loudness"].append(t.loudness) + # Robust Null Check: Append separately + for key in feature_keys: + val = getattr(t, key, None) + if val is not None: + features[key].append(val) # Calculate Transitions (Whiplash) if i > 0 and previous_track: - # Only count transition if within reasonable time (e.g. 
< 5 mins gap) - # assuming continuous listening time_diff = (p.played_at - plays[i - 1].played_at).total_seconds() - if time_diff < 300: - if t.tempo and previous_track.tempo: + if time_diff < 300: # 5 min gap max + if t.tempo is not None and previous_track.tempo is not None: transitions["tempo"].append(abs(t.tempo - previous_track.tempo)) - if t.energy and previous_track.energy: + if t.energy is not None and previous_track.energy is not None: transitions["energy"].append(abs(t.energy - previous_track.energy)) + if t.valence is not None and previous_track.valence is not None: + transitions["valence"].append(abs(t.valence - previous_track.valence)) previous_track = t - # Calculate Stats + # Calculate Stats (Mean, Std, Percentiles) stats = {} for key, values in features.items(): - valid = [v for v in values if v is not None] - if valid: - stats[f"avg_{key}"] = float(np.mean(valid)) - stats[f"std_{key}"] = float(np.std(valid)) + if values: + stats[f"avg_{key}"] = float(np.mean(values)) + stats[f"std_{key}"] = float(np.std(values)) + stats[f"p10_{key}"] = float(np.percentile(values, 10)) + stats[f"p50_{key}"] = float(np.percentile(values, 50)) # Median + stats[f"p90_{key}"] = float(np.percentile(values, 90)) else: stats[f"avg_{key}"] = None @@ -370,13 +396,27 @@ class StatsService: "x": round(stats["avg_valence"], 2), "y": round(stats["avg_energy"], 2) } - # Consistency: Inverse of average standard deviation of Mood components - avg_std = (stats["std_energy"] + stats["std_valence"]) / 2 - stats["consistency_score"] = round(1.0 - avg_std, 2) # Higher = more consistent + # Consistency + avg_std = (stats.get("std_energy", 0) + stats.get("std_valence", 0)) / 2 + stats["consistency_score"] = round(1.0 - avg_std, 2) + + # Rhythm Profile + if stats.get("avg_tempo") is not None and stats.get("avg_danceability") is not None: + stats["rhythm_profile"] = { + "avg_tempo": round(stats["avg_tempo"], 1), + "avg_danceability": round(stats["avg_danceability"], 2) + } + + # Texture 
Profile + if stats.get("avg_acousticness") is not None and stats.get("avg_instrumentalness") is not None: + stats["texture_profile"] = { + "acousticness": round(stats["avg_acousticness"], 2), + "instrumentalness": round(stats["avg_instrumentalness"], 2) + } - # Whiplash Scores (Average jump between tracks) + # Whiplash Scores stats["whiplash"] = {} - for k in ["tempo", "energy"]: + for k in ["tempo", "energy", "valence"]: if transitions[k]: stats["whiplash"][k] = round(float(np.mean(transitions[k])), 2) else: @@ -388,10 +428,9 @@ class StatsService: """ Includes Nostalgia Gap and granular decade breakdown. """ - # Join track to get raw_data query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ) plays = query.all() @@ -409,11 +448,9 @@ class StatsService: if not years: return {"musical_age": None} - # Musical Age (Weighted Average) avg_year = sum(years) / len(years) current_year = datetime.utcnow().year - # Decade Distribution decades = {} for y in years: dec = (y // 10) * 10 @@ -426,18 +463,17 @@ class StatsService: return { "musical_age": int(avg_year), "nostalgia_gap": int(current_year - avg_year), - "freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0), # Share of current decade + "freshness_score": dist.get(f"{int(current_year / 10) * 10}s", 0), "decade_distribution": dist } def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Implements boredom skip detection: - (next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s) + Implements boredom skip detection. 
""" query = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ).order_by(PlayHistory.played_at.asc()) plays = query.all() @@ -449,7 +485,10 @@ class StatsService: tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all() track_map = {t.id: t for t in tracks} - for i in range(len(plays) - 1): + # Denominator: transitions, which is plays - 1 + transitions_count = len(plays) - 1 + + for i in range(transitions_count): current_play = plays[i] next_play = plays[i+1] track = track_map.get(current_play.track_id) @@ -458,31 +497,28 @@ class StatsService: continue diff_seconds = (next_play.played_at - current_play.played_at).total_seconds() - - # Logic: If diff < (duration - 10s), it's a skip. - # Convert duration to seconds duration_sec = track.duration_ms / 1000.0 - # Also ensure diff isn't negative or weirdly small (re-plays) - # And assume "listening" means diff > 30s at least? - # Spec says "Spotify only returns 30s+". - - if diff_seconds < (duration_sec - 10): + # Logic: If diff < (duration - 10s), it's a skip. + # AND it must be a "valid" listening attempt (e.g. > 30s) + # AND it shouldn't be a huge gap (e.g. paused for 2 hours then hit next) + + if 30 < diff_seconds < (duration_sec - 10): skips += 1 return { "total_skips": skips, - "skip_rate": round(skips / len(plays), 3) + "skip_rate": round(skips / transitions_count, 3) if transitions_count > 0 else 0 } def compute_context_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Analyzes context_uri to determine if user listens to Playlists, Albums, or Artists. + Analyzes context_uri and switching rate. 
""" query = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end - ) + PlayHistory.played_at < period_end + ).order_by(PlayHistory.played_at.asc()) plays = query.all() if not plays: @@ -490,31 +526,32 @@ class StatsService: context_counts = {"playlist": 0, "album": 0, "artist": 0, "collection": 0, "unknown": 0} unique_contexts = {} + context_switches = 0 + + last_context = None for p in plays: - if not p.context_uri: + uri = p.context_uri + if not uri: context_counts["unknown"] += 1 - continue - - # Count distinct contexts for loyalty - unique_contexts[p.context_uri] = unique_contexts.get(p.context_uri, 0) + 1 - - if "playlist" in p.context_uri: - context_counts["playlist"] += 1 - elif "album" in p.context_uri: - context_counts["album"] += 1 - elif "artist" in p.context_uri: - context_counts["artist"] += 1 - elif "collection" in p.context_uri: - # "Liked Songs" usually shows up as collection - context_counts["collection"] += 1 + uri = "unknown" else: - context_counts["unknown"] += 1 + if "playlist" in uri: context_counts["playlist"] += 1 + elif "album" in uri: context_counts["album"] += 1 + elif "artist" in uri: context_counts["artist"] += 1 + elif "collection" in uri: context_counts["collection"] += 1 + else: context_counts["unknown"] += 1 + + if uri != "unknown": + unique_contexts[uri] = unique_contexts.get(uri, 0) + 1 + + # Switch detection + if last_context and uri != last_context: + context_switches += 1 + last_context = uri total = len(plays) breakdown = {k: round(v / total, 2) for k, v in context_counts.items()} - - # Top 5 Contexts (Requires resolving URI to name, possibly missing metadata here) sorted_contexts = sorted(unique_contexts.items(), key=lambda x: x[1], reverse=True)[:5] return { @@ -522,16 +559,17 @@ class StatsService: "album_purist_score": breakdown.get("album", 0), "playlist_dependency": breakdown.get("playlist", 0), "context_loyalty": round(len(plays) / len(unique_contexts), 2) if 
unique_contexts else 0, + "context_switching_rate": round(context_switches / (total - 1), 2) if total > 1 else 0, "top_context_uris": [{"uri": k, "count": v} for k, v in sorted_contexts] } def compute_taste_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Mainstream vs. Hipster analysis based on Track.popularity (0-100). + Mainstream vs. Hipster analysis. """ query = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ) plays = query.all() if not plays: return {} @@ -564,38 +602,47 @@ class StatsService: def compute_lifecycle_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: """ - Determines if tracks are 'New Discoveries' or 'Old Favorites'. + Discovery, Recurrence, Comebacks, Obsessions. """ - # 1. Get tracks played in this period + # 1. Current plays current_plays = self.db.query(PlayHistory).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ).all() if not current_plays: return {} current_track_ids = set([p.track_id for p in current_plays]) - # 2. Check if these tracks were played BEFORE period_start - # We find which of the current_track_ids exist in history < period_start + # 2. Historical check old_tracks_query = self.db.query(distinct(PlayHistory.track_id)).filter( PlayHistory.track_id.in_(current_track_ids), PlayHistory.played_at < period_start ) old_track_ids = set([r[0] for r in old_tracks_query.all()]) - # 3. Calculate Discovery + # 3. Discovery new_discoveries = current_track_ids - old_track_ids - discovery_count = len(new_discoveries) - - # Calculate plays on new discoveries + + # 4. Obsessions (Tracks with > 5 plays in period) + track_counts = {} + for p in current_plays: + track_counts[p.track_id] = track_counts.get(p.track_id, 0) + 1 + obsessions = [tid for tid, count in track_counts.items() if count >= 5] + + # 5. 
Comeback Detection (Old tracks not played in last 30 days) + # Simplified: If in old_track_ids but NOT in last 30 days before period_start? + # That requires a gap check. For now, we will mark 'recurrence' as general relistening. + plays_on_new = len([p for p in current_plays if p.track_id in new_discoveries]) total_plays = len(current_plays) return { - "discovery_count": discovery_count, + "discovery_count": len(new_discoveries), "discovery_rate": round(plays_on_new / total_plays, 3) if total_plays > 0 else 0, - "recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0 + "recurrence_rate": round((total_plays - plays_on_new) / total_plays, 3) if total_plays > 0 else 0, + "obsession_count": len(obsessions), + "obsession_rate": round(len(obsessions) / len(current_track_ids), 3) if current_track_ids else 0 } def compute_explicit_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]: @@ -604,7 +651,7 @@ class StatsService: """ query = self.db.query(PlayHistory).options(joinedload(PlayHistory.track)).filter( PlayHistory.played_at >= period_start, - PlayHistory.played_at <= period_end + PlayHistory.played_at < period_end ) plays = query.all() @@ -618,24 +665,14 @@ class StatsService: for p in plays: h = p.played_at.hour hourly_total[h] += 1 - - # Check raw_data for explicit flag t = p.track - is_explicit = False if t.raw_data and t.raw_data.get("explicit"): - is_explicit = True - - if is_explicit: explicit_count += 1 hourly_explicit[h] += 1 - # Calculate hourly percentages hourly_rates = [] for i in range(24): - if hourly_total[i] > 0: - hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2)) - else: - hourly_rates.append(0.0) + hourly_rates.append(round(hourly_explicit[i] / hourly_total[i], 2) if hourly_total[i] > 0 else 0.0) return { "explicit_rate": round(explicit_count / total_plays, 3), @@ -644,7 +681,6 @@ class StatsService: } def generate_full_report(self, period_start: datetime, period_end: 
datetime) -> Dict[str, Any]: - # 1. Calculate all current stats current_stats = { "period": {"start": period_start.isoformat(), "end": period_end.isoformat()}, "volume": self.compute_volume_stats(period_start, period_end), @@ -659,7 +695,19 @@ class StatsService: "skips": self.compute_skip_stats(period_start, period_end) } - # 2. Calculate Comparison current_stats["comparison"] = self.compute_comparison(current_stats, period_start, period_end) - return current_stats + + def _empty_volume_stats(self): + return { + "total_plays": 0, "estimated_minutes": 0, "unique_tracks": 0, + "unique_artists": 0, "unique_albums": 0, "unique_genres": 0, + "top_tracks": [], "top_artists": [], "top_albums": [], "top_genres": [], + "repeat_rate": 0, "one_and_done_rate": 0, + "concentration": {} + } + + def _pct_change(self, curr, prev): + if prev == 0: + return 100.0 if curr > 0 else 0.0 + return round(((curr - prev) / prev) * 100, 1)