Implement Phase 3 Music Analysis and LLM Engine

- Refactor Database: Add `Artist` model, M2M relationship, and `AnalysisSnapshot` model.
- Backend Services: Implement `StatsService` for computable metrics and `NarrativeService` for Gemini LLM integration.
- Fix Ingestion: Correctly handle multiple artists per track and backfill existing data.
- Testing: Add unit tests for statistics logic and live verification scripts.
- Documentation: Add `PHASE_4_FRONTEND_GUIDE.md`.
google-labs-jules[bot]
2025-12-24 23:16:32 +00:00
parent ab47dd62ca
commit f4432154b6
9 changed files with 942 additions and 30 deletions

PHASE_4_FRONTEND_GUIDE.md Normal file

@@ -0,0 +1,84 @@
# Phase 4 Frontend Implementation Guide
This guide details how to consume the data generated by the Phase 3 Backend (Analysis & LLM Engine) and how to display it in the frontend.
## 1. Data Source
The backend now produces **Analysis Snapshots**. You should create an API endpoint (e.g., `GET /api/analysis/latest`) that returns the most recent snapshot; a sketch of such an endpoint follows the payload example below.
### JSON Payload Structure
The response object contains two main keys: `metrics_payload` (calculated numbers) and `narrative_report` (LLM text).
```json
{
  "id": 1,
  "date": "2024-12-25T12:00:00Z",
  "period_label": "last_30_days",
  "metrics_payload": {
    "volume": { ... },
    "time_habits": { ... },
    "sessions": { ... },
    "vibe": { ... },
    "era": { ... },
    "skips": { ... }
  },
  "narrative_report": {
    "vibe_check": "...",
    "patterns": ["..."],
    "persona": "...",
    "roast": "..."
  }
}
```
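A minimal sketch of that endpoint, assuming the backend is FastAPI (its `app/` layout suggests it) and reusing the Phase 3 `SessionLocal` and `AnalysisSnapshot`; the route wiring and field selection are illustrative, not the committed API:
```python
# Hypothetical endpoint returning the most recent AnalysisSnapshot.
from fastapi import APIRouter, HTTPException
from app.database import SessionLocal
from app.models import AnalysisSnapshot

router = APIRouter()

@router.get("/api/analysis/latest")
def get_latest_analysis():
    db = SessionLocal()
    try:
        snapshot = (
            db.query(AnalysisSnapshot)
            .order_by(AnalysisSnapshot.date.desc())
            .first()
        )
        if not snapshot:
            raise HTTPException(status_code=404, detail="No analysis snapshots yet")
        # JSON columns deserialize to plain dicts, so they can be returned directly
        return {
            "id": snapshot.id,
            "date": snapshot.date,
            "period_label": snapshot.period_label,
            "metrics_payload": snapshot.metrics_payload,
            "narrative_report": snapshot.narrative_report,
        }
    finally:
        db.close()
```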
---
## 2. UI Components & Display Strategy
### A. Hero Section ("The Vibe Check")
**Data Source:** `narrative_report`
- **Headline:** Display `narrative_report.persona` as a large badge/title (e.g., "The Focused Fanatic").
- **Narrative:** Display `narrative_report.vibe_check` as the main text.
- **Roast:** Add a small, dismissible "Roast Me" alert box containing `narrative_report.roast`.
### B. "The Vibe" Radar Chart
**Data Source:** `metrics_payload.vibe`
- Use a **Radar Chart** (Spider Chart) with the following axes (0.0 - 1.0):
- Energy (`avg_energy`)
- Valence (`avg_valence`)
- Danceability (`avg_danceability`)
- Acousticness (`avg_acousticness`)
- Instrumentalness (`avg_instrumentalness`)
- **Tooltip:** Show the exact value.
### C. Listening Habits (Time & Sessions)
**Data Source:** `metrics_payload.time_habits` & `metrics_payload.sessions`
- **Hourly Heatmap:** Use a bar chart for `metrics_payload.time_habits.hourly_distribution` (0-23 hours). Highlight the `peak_hour`.
- **Session Stats:** Display "Average Session" stats:
- `sessions.avg_minutes` (mins)
- `sessions.avg_tracks` (tracks)
- `sessions.count` (total sessions)
### D. Top Favorites
**Data Source:** `metrics_payload.volume`
- **Lists:** Display Top 5 Tracks, Artists, and Genres.
- **Images:** The current snapshot stores only names and counts for simplicity, so the lists carry no artwork. Fetch Artist/Track images from the Spotify API using the corresponding IDs (available in the backend if you expand the serializer), and have the Phase 4 endpoint enrich each entry with Spotify image URLs; a sketch of one possible helper follows this section.
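A hedged sketch of that enrichment, assuming `SpotifyClient.get_artists` (already used in ingestion) returns raw Spotify artist objects, which carry an `images` array, and that the serializer has been expanded to include artist IDs. The helper name and the `id`/`image_url` keys are illustrative:
```python
# Hypothetical helper: decorate top-artist entries with image URLs.
# Assumes each entry has been expanded to carry its Spotify artist "id".
async def add_artist_images(spotify_client, top_artists: list) -> list:
    ids = [a["id"] for a in top_artists if a.get("id")]
    artists_data = await spotify_client.get_artists(ids)
    # In Spotify artist objects the first image is the largest
    image_map = {
        a["id"]: (a.get("images") or [{}])[0].get("url")
        for a in artists_data if a
    }
    for entry in top_artists:
        entry["image_url"] = image_map.get(entry.get("id"))
    return top_artists
```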
### E. Era Analysis
**Data Source:** `metrics_payload.era`
- **Musical Age:** Display `musical_age` (e.g., "1998") prominently.
- **Distribution:** Pie chart for `decade_distribution`.
### F. Attention Span (Skips)
**Data Source:** `metrics_payload.skips`
- **Metric:** Display "Skip Rate" (`skip_rate`) as a percentage.
- **Insight:** "You skipped X tracks this month."
---
## 3. Integration Tips
- **Caching:** The backend stores snapshots. You do NOT need to trigger a calculation on page load. Just fetch the latest snapshot.
- **Theme:** The app uses Ant Design Dark Mode. Stick to Spotify colors (Black/Green/White) but add accent colors based on the "Vibe" (e.g., High Energy = Red/Orange, Low Energy = Blue/Purple).
- **Expansion:** Future snapshots allow for "Trend" views. You can graph `metrics_payload.volume.total_plays` over the last 6 snapshots to show activity trends.
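For instance, a hedged sketch of pulling that trend series out of the snapshot table (the helper name and output shape are illustrative):
```python
# Hypothetical trend query: total plays across the last N snapshots,
# oldest first, ready to feed a line chart.
from app.database import SessionLocal
from app.models import AnalysisSnapshot

def get_play_trend(limit: int = 6) -> list:
    db = SessionLocal()
    try:
        snapshots = (
            db.query(AnalysisSnapshot)
            .order_by(AnalysisSnapshot.date.desc())
            .limit(limit)
            .all()
        )
        return [
            {
                "date": s.date.isoformat(),
                "total_plays": s.metrics_payload["volume"]["total_plays"],
            }
            for s in reversed(snapshots)
        ]
    finally:
        db.close()
```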

backend/alembic/versions/4401cb416661_add_artist_and_snapshot_models.py Normal file

@@ -0,0 +1,63 @@
"""Add Artist and Snapshot models
Revision ID: 4401cb416661
Revises: 707387fe1be2
Create Date: 2025-12-24 23:06:59.235445
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = '4401cb416661'
down_revision: Union[str, Sequence[str], None] = '707387fe1be2'
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Upgrade schema."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('analysis_snapshots',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('date', sa.DateTime(), nullable=True),
        sa.Column('period_start', sa.DateTime(), nullable=True),
        sa.Column('period_end', sa.DateTime(), nullable=True),
        sa.Column('period_label', sa.String(), nullable=True),
        sa.Column('metrics_payload', sa.JSON(), nullable=True),
        sa.Column('narrative_report', sa.JSON(), nullable=True),
        sa.Column('model_used', sa.String(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_analysis_snapshots_date'), 'analysis_snapshots', ['date'], unique=False)
    op.create_index(op.f('ix_analysis_snapshots_id'), 'analysis_snapshots', ['id'], unique=False)
    op.create_table('artists',
        sa.Column('id', sa.String(), nullable=False),
        sa.Column('name', sa.String(), nullable=True),
        sa.Column('genres', sa.JSON(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_artists_id'), 'artists', ['id'], unique=False)
    op.create_table('track_artists',
        sa.Column('track_id', sa.String(), nullable=False),
        sa.Column('artist_id', sa.String(), nullable=False),
        sa.ForeignKeyConstraint(['artist_id'], ['artists.id'], ),
        sa.ForeignKeyConstraint(['track_id'], ['tracks.id'], ),
        sa.PrimaryKeyConstraint('track_id', 'artist_id')
    )
    # ### end Alembic commands ###

def downgrade() -> None:
    """Downgrade schema."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('track_artists')
    op.drop_index(op.f('ix_artists_id'), table_name='artists')
    op.drop_table('artists')
    op.drop_index(op.f('ix_analysis_snapshots_id'), table_name='analysis_snapshots')
    op.drop_index(op.f('ix_analysis_snapshots_date'), table_name='analysis_snapshots')
    op.drop_table('analysis_snapshots')
    # ### end Alembic commands ###


@@ -2,7 +2,7 @@ import asyncio
 import os
 from datetime import datetime
 from sqlalchemy.orm import Session
-from .models import Track, PlayHistory
+from .models import Track, PlayHistory, Artist
 from .database import SessionLocal
 from .services.spotify_client import SpotifyClient
 from .services.reccobeats_client import ReccoBeatsClient
@@ -19,9 +19,32 @@ def get_spotify_client():
 def get_reccobeats_client():
     return ReccoBeatsClient()
+async def ensure_artists_exist(db: Session, artists_data: list):
+    """
+    Ensures that all artists in the list exist in the Artist table.
+    Returns a list of Artist objects.
+    """
+    artist_objects = []
+    for a_data in artists_data:
+        artist_id = a_data["id"]
+        artist = db.query(Artist).filter(Artist.id == artist_id).first()
+        if not artist:
+            artist = Artist(
+                id=artist_id,
+                name=a_data["name"],
+                genres=None  # None = "not fetched yet"; genre enrichment queries for genres == None
+            )
+            db.add(artist)
+        # No commit or flush needed here: adding to the session is enough for
+        # SQLAlchemy to track new artists, and the caller's commit persists them.
+        artist_objects.append(artist)
+    return artist_objects
 async def enrich_tracks(db: Session, spotify_client: SpotifyClient, recco_client: ReccoBeatsClient):
     """
     Finds tracks missing genres (Spotify) or audio features (ReccoBeats) and enriches them.
+    Also enriches Artists with genres.
     """
     # 1. Enrich Audio Features (via ReccoBeats)
@@ -66,39 +89,35 @@ async def enrich_tracks(db: Session, spotify_client: SpotifyClient, recco_client: ReccoBeatsClient):
     print(f"Updated {updated_count} tracks with audio features.")
     db.commit()
-    # 2. Enrich Genres (via Spotify Artists)
-    tracks_missing_genres = db.query(Track).filter(Track.genres == None).limit(50).all()
-    if tracks_missing_genres:
-        print(f"Enriching {len(tracks_missing_genres)} tracks with genres (Spotify)...")
-        artist_ids = set()
-        track_artist_map = {}
-        for t in tracks_missing_genres:
-            if t.raw_data and "artists" in t.raw_data:
-                a_ids = [a["id"] for a in t.raw_data["artists"]]
-                artist_ids.update(a_ids)
-                track_artist_map[t.id] = a_ids
-        artist_ids_list = list(artist_ids)
-        artist_genre_map = {}
+    # 2. Enrich Artist Genres (via Spotify Artists)
+    # Artist.genres == None means "never fetched" (new artists are created with genres=None).
+    # An artist may genuinely have no genres, so a dedicated "genres_checked" flag
+    # may be needed later; for now the None-vs-list distinction is enough.
+    artists_missing_genres = db.query(Artist).filter(Artist.genres == None).limit(50).all()
+    if artists_missing_genres:
+        print(f"Enriching {len(artists_missing_genres)} artists with genres (Spotify)...")
+        artist_ids_list = [a.id for a in artists_missing_genres]
+        artist_data_map = {}
         # Spotify allows fetching 50 artists at a time
         for i in range(0, len(artist_ids_list), 50):
             chunk = artist_ids_list[i:i+50]
             artists_data = await spotify_client.get_artists(chunk)
             for a_data in artists_data:
                 if a_data:
-                    artist_genre_map[a_data["id"]] = a_data.get("genres", [])
-        for t in tracks_missing_genres:
-            a_ids = track_artist_map.get(t.id, [])
-            combined_genres = set()
-            for a_id in a_ids:
-                genres = artist_genre_map.get(a_id, [])
-                combined_genres.update(genres)
-            t.genres = list(combined_genres)
+                    artist_data_map[a_data["id"]] = a_data.get("genres", [])
+        for artist in artists_missing_genres:
+            genres = artist_data_map.get(artist.id)
+            if genres is not None:
+                artist.genres = genres
+            else:
+                # Couldn't fetch: set to an empty list so we don't retry forever
+                artist.genres = []
         db.commit()
@@ -128,15 +147,30 @@ async def ingest_recently_played(db: Session):
             track = Track(
                 id=track_id,
                 name=track_data["name"],
-                artist=", ".join([a["name"] for a in track_data["artists"]]),
+                artist=", ".join([a["name"] for a in track_data["artists"]]),  # Legacy string
                 album=track_data["album"]["name"],
                 duration_ms=track_data["duration_ms"],
                 popularity=track_data["popularity"],
                 raw_data=track_data
             )
+            # Handle Artists Relation
+            artists_data = track_data.get("artists", [])
+            artist_objects = await ensure_artists_exist(db, artists_data)
+            track.artists = artist_objects
             db.add(track)
             db.commit()
+        # Ensure relationships exist even if track existed (e.g. migration).
+        # Check if track has artists linked; if not (and raw_data has them), link them.
+        # FIX: Logic was previously indented improperly inside `if not track`.
+        if not track.artists and track.raw_data and "artists" in track.raw_data:
+            print(f"Backfilling artists for track {track.name}")
+            artist_objects = await ensure_artists_exist(db, track.raw_data["artists"])
+            track.artists = artist_objects
+            db.commit()
         exists = db.query(PlayHistory).filter(
             PlayHistory.track_id == track_id,
             PlayHistory.played_at == played_at

backend/app/models.py

@@ -1,14 +1,32 @@
-from sqlalchemy import Column, Integer, String, DateTime, JSON, ForeignKey, Float
+from sqlalchemy import Column, Integer, String, DateTime, JSON, ForeignKey, Float, Table, Text
 from sqlalchemy.orm import relationship
 from datetime import datetime
 from .database import Base
+# Association Table for the Many-to-Many relationship between Track and Artist
+track_artists = Table(
+    'track_artists',
+    Base.metadata,
+    Column('track_id', String, ForeignKey('tracks.id'), primary_key=True),
+    Column('artist_id', String, ForeignKey('artists.id'), primary_key=True)
+)
+class Artist(Base):
+    __tablename__ = "artists"
+    id = Column(String, primary_key=True, index=True)  # Spotify ID
+    name = Column(String)
+    genres = Column(JSON, nullable=True)  # List of genre strings; None = not fetched yet
+    # Relationships
+    tracks = relationship("Track", secondary=track_artists, back_populates="artists")
 class Track(Base):
     __tablename__ = "tracks"
     id = Column(String, primary_key=True, index=True)  # Spotify ID
     name = Column(String)
-    artist = Column(String)
+    artist = Column(String)  # Display string (e.g. "Drake, Future") - kept for convenience
     album = Column(String)
     duration_ms = Column(Integer)
     popularity = Column(Integer, nullable=True)
@@ -31,17 +49,18 @@ class Track(Base):
     tempo = Column(Float, nullable=True)
     time_signature = Column(Integer, nullable=True)
-    # Genres (stored as JSON list of strings)
+    # Genres (stored as JSON list of strings) - DEPRECATED in favor of Artist.genres but kept for now
     genres = Column(JSON, nullable=True)
     # AI Analysis fields
     lyrics_summary = Column(String, nullable=True)
-    genre_tags = Column(String, nullable=True)  # JSON list stored as string or just raw JSON
+    genre_tags = Column(String, nullable=True)
     created_at = Column(DateTime, default=datetime.utcnow)
     updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
     plays = relationship("PlayHistory", back_populates="track")
+    artists = relationship("Artist", secondary=track_artists, back_populates="tracks")
 class PlayHistory(Base):
@@ -55,3 +74,23 @@ class PlayHistory(Base):
     context_uri = Column(String, nullable=True)
     track = relationship("Track", back_populates="plays")
+class AnalysisSnapshot(Base):
+    """
+    Stores the computed statistics and LLM analysis for a given period.
+    Allows for trend analysis over time.
+    """
+    __tablename__ = "analysis_snapshots"
+    id = Column(Integer, primary_key=True, index=True)
+    date = Column(DateTime, default=datetime.utcnow, index=True)  # When the analysis was run
+    period_start = Column(DateTime)
+    period_end = Column(DateTime)
+    period_label = Column(String)  # e.g., "last_30_days", "monthly_nov_2023"
+    # The heavy lifting: stored as JSON blobs
+    metrics_payload = Column(JSON)   # The input to the LLM (StatsService output)
+    narrative_report = Column(JSON)  # The output from the LLM (NarrativeService output)
+    model_used = Column(String, nullable=True)  # e.g. "gemini-1.5-flash"

backend/app/services/narrative_service.py Normal file

@@ -0,0 +1,67 @@
import os
import json
import google.generativeai as genai
from typing import Dict, Any

class NarrativeService:
    def __init__(self, model_name: str = "gemini-2.5-flash"):
        self.api_key = os.getenv("GEMINI_API_KEY")
        if not self.api_key:
            print("WARNING: GEMINI_API_KEY not found. LLM features will fail.")
        else:
            genai.configure(api_key=self.api_key)
        self.model_name = model_name

    def generate_narrative(self, stats_json: Dict[str, Any]) -> Dict[str, str]:
        if not self.api_key:
            return {"error": "Missing API Key"}
        prompt = f"""
You are analyzing a user's Spotify listening data. Below is a JSON summary of metrics I've computed. Your job is to:
1. Write a narrative "Vibe Check" (2-3 paragraphs) describing their overall listening personality this period.
2. Identify 3-5 notable patterns or anomalies.
3. Provide a "Musical Persona" label (e.g., "Late-Night Binge Listener", "Genre Chameleon", "Album Purist").
4. Write a brief, playful "roast" (1-2 sentences) based on the data.

Guidelines:
- Do NOT recalculate any numbers.
- Use specific metrics to support observations (e.g., "Your whiplash score of 18.3 BPM suggests...").
- Keep tone conversational but insightful.
- Avoid mental health claims; stick to behavioral descriptors.
- Highlight both positive patterns and quirks.

Data:
{json.dumps(stats_json, indent=2)}

Output Format (return valid JSON):
{{
    "vibe_check": "...",
    "patterns": ["...", "..."],
    "persona": "...",
    "roast": "..."
}}
"""
        try:
            # The library accepts short model names like 'gemini-2.5-flash';
            # list_models() returns fully qualified names like 'models/gemini-2.5-flash'.
            # If the short name is rejected, pass the 'models/...' form instead.
            model = genai.GenerativeModel(self.model_name)
            response = model.generate_content(prompt)
            # Clean up response to ensure valid JSON (LLMs sometimes wrap output in markdown fences)
            text = response.text.strip()
            if text.startswith("```json"):
                text = text.replace("```json", "").replace("```", "")
            elif text.startswith("```"):
                text = text.replace("```", "")
            return json.loads(text)
        except Exception as e:
            return {"error": str(e), "raw_response": response.text if 'response' in locals() else "No response"}

backend/app/services/stats_service.py Normal file

@@ -0,0 +1,396 @@
from sqlalchemy.orm import Session
from sqlalchemy import func, distinct, desc
from datetime import datetime, timedelta
from typing import Dict, Any, List
import math
import numpy as np
from ..models import PlayHistory, Track, Artist, AnalysisSnapshot

class StatsService:
    def __init__(self, db: Session):
        self.db = db

    def compute_volume_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Calculates volume metrics: Total Plays, Unique Tracks, Artists, etc.
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        )
        plays = query.all()
        total_plays = len(plays)
        if total_plays == 0:
            return {
                "total_plays": 0,
                "estimated_minutes": 0,
                "unique_tracks": 0,
                "unique_artists": 0,
                "unique_albums": 0,
                "unique_genres": 0,
                "top_tracks": [],
                "top_artists": [],
                "top_genres": [],
                "repeat_rate": 0,
                "concentration": {}
            }
        # Estimated duration: fetch all played Track rows in one bulk query and
        # build an id -> Track map, avoiding a query per play.
        track_ids = [p.track_id for p in plays]
        tracks = self.db.query(Track).filter(Track.id.in_(set(track_ids))).all()
        track_map = {t.id: t for t in tracks}
        total_ms = 0
        unique_track_ids = set()
        unique_artist_ids = set()
        # PlayHistory carries no album ID; we read it from track.raw_data['album']['id']
        # and fall back to the album name string.
        unique_album_ids = set()
        genre_counts = {}
        # For Top Lists
        track_play_counts = {}
        artist_play_counts = {}
        for p in plays:
            t = track_map.get(p.track_id)
            if t:
                total_ms += t.duration_ms
                unique_track_ids.add(t.id)
                # Top Tracks
                track_play_counts[t.id] = track_play_counts.get(t.id, 0) + 1
                # Artists via the M2M relation. Lazy loading here can cause N+1
                # queries; if this gets slow for thousands of plays, eager-load
                # or join PlayHistory -> Track -> Artist in a single query.
                for artist in t.artists:
                    unique_artist_ids.add(artist.id)
                    artist_play_counts[artist.id] = artist_play_counts.get(artist.id, 0) + 1
                    if artist.genres:
                        for g in artist.genres:
                            genre_counts[g] = genre_counts.get(g, 0) + 1
                if t.raw_data and "album" in t.raw_data:
                    unique_album_ids.add(t.raw_data["album"]["id"])
                else:
                    unique_album_ids.add(t.album)  # Fallback to album name
        estimated_minutes = total_ms / 60000
        # Top 5 Tracks
        sorted_tracks = sorted(track_play_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        top_tracks = []
        for tid, count in sorted_tracks:
            t = track_map.get(tid)
            top_tracks.append({
                "name": t.name,
                "artist": t.artist,  # Display string
                "count": count
            })
        # Top 5 Artists (fetch names in bulk)
        top_artist_ids = sorted(artist_play_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        top_artists_objs = self.db.query(Artist).filter(Artist.id.in_([x[0] for x in top_artist_ids])).all()
        artist_name_map = {a.id: a.name for a in top_artists_objs}
        top_artists = []
        for aid, count in top_artist_ids:
            top_artists.append({
                "name": artist_name_map.get(aid, "Unknown"),
                "count": count
            })
        # Top Genres
        sorted_genres = sorted(genre_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        top_genres = [{"name": g, "count": c} for g, c in sorted_genres]
        # Concentration
        unique_tracks_count = len(unique_track_ids)
        repeat_rate = (total_plays - unique_tracks_count) / total_plays if total_plays > 0 else 0
        # HHI (Herfindahl-Hirschman Index): sum of squared play shares, where
        # share = track_plays / total_plays. Higher = listening concentrated on fewer tracks.
        hhi = sum([(c / total_plays) ** 2 for c in track_play_counts.values()])
        return {
            "total_plays": total_plays,
            "estimated_minutes": int(estimated_minutes),
            "unique_tracks": unique_tracks_count,
            "unique_artists": len(unique_artist_ids),
            "unique_albums": len(unique_album_ids),
            "unique_genres": len(genre_counts),
            "top_tracks": top_tracks,
            "top_artists": top_artists,
            "top_genres": top_genres,
            "repeat_rate": round(repeat_rate, 3),
            "concentration": {
                "hhi": round(hhi, 4)
                # "gini": omitted for now to keep it simple
            }
        }

    def compute_time_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Hourly and weekday distribution, peak hour, weekend share.
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        )
        plays = query.all()
        hourly_counts = [0] * 24
        weekday_counts = [0] * 7  # 0=Mon, 6=Sun
        if not plays:
            return {
                "hourly_distribution": hourly_counts,
                "peak_hour": None,
                "weekday_distribution": weekday_counts,
                "weekend_share": 0
            }
        for p in plays:
            # played_at is stored in UTC; convert to the user's local timezone
            # here if local-time habits are wanted.
            h = p.played_at.hour
            d = p.played_at.weekday()
            hourly_counts[h] += 1
            weekday_counts[d] += 1
        peak_hour = hourly_counts.index(max(hourly_counts))
        # Weekend Share
        weekend_plays = weekday_counts[5] + weekday_counts[6]
        weekend_share = weekend_plays / len(plays) if len(plays) > 0 else 0
        return {
            "hourly_distribution": hourly_counts,
            "peak_hour": peak_hour,
            "weekday_distribution": weekday_counts,
            "weekend_share": round(weekend_share, 2)
        }

    def compute_session_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Session logic: a gap of more than 20 minutes starts a new session.
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        ).order_by(PlayHistory.played_at.asc())
        plays = query.all()
        if not plays:
            return {"count": 0, "avg_tracks": 0, "avg_minutes": 0, "longest_session_minutes": 0}
        sessions = []
        current_session = [plays[0]]
        for i in range(1, len(plays)):
            prev = plays[i - 1]
            curr = plays[i]
            diff = (curr.played_at - prev.played_at).total_seconds() / 60
            if diff > 20:
                sessions.append(current_session)
                current_session = []
            current_session.append(curr)
        sessions.append(current_session)
        session_lengths_min = []
        for sess in sessions:
            if len(sess) > 1:
                # (end - start) slightly understates real length because it ignores
                # the duration of the last track; kept simple for now.
                start = sess[0].played_at
                end = sess[-1].played_at
                duration = (end - start).total_seconds() / 60
                session_lengths_min.append(duration)
            else:
                session_lengths_min.append(3.0)  # Approximate one track
        avg_min = sum(session_lengths_min) / len(session_lengths_min) if session_lengths_min else 0
        return {
            "count": len(sessions),
            "avg_tracks": len(plays) / len(sessions),
            "avg_minutes": round(avg_min, 1),
            "longest_session_minutes": round(max(session_lengths_min), 1) if session_lengths_min else 0
        }

    def compute_vibe_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Aggregates Audio Features (Energy, Valence, etc.)
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        )
        plays = query.all()
        track_ids = list(set([p.track_id for p in plays]))
        if not track_ids:
            return {}
        tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
        # Collect features
        features = {
            "energy": [], "valence": [], "danceability": [],
            "tempo": [], "acousticness": [], "instrumentalness": [],
            "liveness": [], "speechiness": []
        }
        for t in tracks:
            # Weight each track by its play count in the period, so the averages
            # reflect what was actually HEARD rather than the unique library.
            play_count = len([p for p in plays if p.track_id == t.id])
            if t.energy is not None:
                for _ in range(play_count):
                    features["energy"].append(t.energy)
                    features["valence"].append(t.valence)
                    features["danceability"].append(t.danceability)
                    features["tempo"].append(t.tempo)
                    features["acousticness"].append(t.acousticness)
                    features["instrumentalness"].append(t.instrumentalness)
                    features["liveness"].append(t.liveness)
                    features["speechiness"].append(t.speechiness)
        stats = {}
        for key, values in features.items():
            valid = [v for v in values if v is not None]
            if valid:
                stats[f"avg_{key}"] = float(np.mean(valid))
                stats[f"std_{key}"] = float(np.std(valid))
            else:
                stats[f"avg_{key}"] = None
        # Derived Metrics
        if stats.get("avg_energy") is not None and stats.get("avg_valence") is not None:
            stats["mood_quadrant"] = {
                "x": round(stats["avg_valence"], 2),
                "y": round(stats["avg_energy"], 2)
            }
        return stats

    def compute_era_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Musical Age and Era Distribution.
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        )
        plays = query.all()
        years = []
        track_ids = list(set([p.track_id for p in plays]))
        tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
        track_map = {t.id: t for t in tracks}
        for p in plays:
            t = track_map.get(p.track_id)
            if t and t.raw_data and "album" in t.raw_data and "release_date" in t.raw_data["album"]:
                rd = t.raw_data["album"]["release_date"]
                # Format can be YYYY, YYYY-MM, or YYYY-MM-DD
                try:
                    year = int(rd.split("-")[0])
                    years.append(year)
                except ValueError:
                    pass
        if not years:
            return {"musical_age": None}
        avg_year = sum(years) / len(years)
        # Decade breakdown
        decades = {}
        for y in years:
            dec = (y // 10) * 10
            label = f"{dec}s"
            decades[label] = decades.get(label, 0) + 1
        total = len(years)
        decade_dist = {k: round(v / total, 2) for k, v in decades.items()}
        return {
            "musical_age": int(avg_year),
            "decade_distribution": decade_dist
        }

    def compute_skip_stats(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        """
        Boredom skip detection: a play counts as skipped when
        (next_track.played_at - current_track.played_at) < (current_track.duration_ms / 1000 - 10s)
        """
        query = self.db.query(PlayHistory).filter(
            PlayHistory.played_at >= period_start,
            PlayHistory.played_at <= period_end
        ).order_by(PlayHistory.played_at.asc())
        plays = query.all()
        if len(plays) < 2:
            return {"skip_rate": 0, "total_skips": 0}
        skips = 0
        track_ids = list(set([p.track_id for p in plays]))
        tracks = self.db.query(Track).filter(Track.id.in_(track_ids)).all()
        track_map = {t.id: t for t in tracks}
        for i in range(len(plays) - 1):
            current_play = plays[i]
            next_play = plays[i + 1]
            track = track_map.get(current_play.track_id)
            if not track or not track.duration_ms:
                continue
            diff_seconds = (next_play.played_at - current_play.played_at).total_seconds()
            duration_sec = track.duration_ms / 1000.0
            # If the gap to the next play is shorter than (duration - 10s), the track
            # was cut short. Spotify's recently-played feed only records plays of 30s
            # or more, so negative or very short gaps (re-plays) are rare; a
            # minimum-gap floor could be added later if needed.
            if diff_seconds < (duration_sec - 10):
                skips += 1
        return {
            "total_skips": skips,
            "skip_rate": round(skips / len(plays), 3)
        }

    def generate_full_report(self, period_start: datetime, period_end: datetime) -> Dict[str, Any]:
        return {
            "period": {
                "start": period_start.isoformat(),
                "end": period_end.isoformat()
            },
            "volume": self.compute_volume_stats(period_start, period_end),
            "time_habits": self.compute_time_stats(period_start, period_end),
            "sessions": self.compute_session_stats(period_start, period_end),
            "vibe": self.compute_vibe_stats(period_start, period_end),
            "era": self.compute_era_stats(period_start, period_end),
            "skips": self.compute_skip_stats(period_start, period_end)
        }

backend/run_analysis.py Normal file

@@ -0,0 +1,82 @@
import os
import sys
import json
from datetime import datetime, timedelta
from app.database import SessionLocal
from app.services.stats_service import StatsService
from app.services.narrative_service import NarrativeService
from app.models import AnalysisSnapshot

def run_analysis_pipeline(days: int = 30, model_name: str = "gemini-2.5-flash"):
    db = SessionLocal()
    try:
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=days)
        print(f"--- Starting Analysis for period: {start_date} to {end_date} ---")
        # 1. Compute Stats
        print("Calculating metrics...")
        stats_service = StatsService(db)
        stats_json = stats_service.generate_full_report(start_date, end_date)
        # Check if we have enough data
        if stats_json["volume"]["total_plays"] == 0:
            print("No plays found in this period. Skipping LLM analysis.")
            return
        print(f"Stats computed. Total Plays: {stats_json['volume']['total_plays']}")
        print(f"Top Artist: {stats_json['volume']['top_artists'][0]['name'] if stats_json['volume']['top_artists'] else 'N/A'}")
        # 2. Generate Narrative
        print(f"Generating Narrative with {model_name}...")
        narrative_service = NarrativeService(model_name=model_name)
        narrative_json = narrative_service.generate_narrative(stats_json)
        if "error" in narrative_json:
            print(f"LLM Error: {narrative_json['error']}")
        else:
            print("Narrative generated successfully.")
            print(f"Persona: {narrative_json.get('persona')}")
        # 3. Save Snapshot
        print("Saving snapshot to database...")
        snapshot = AnalysisSnapshot(
            period_start=start_date,
            period_end=end_date,
            period_label=f"last_{days}_days",
            metrics_payload=stats_json,
            narrative_report=narrative_json,
            model_used=model_name
        )
        db.add(snapshot)
        db.commit()
        print(f"Snapshot saved with ID: {snapshot.id}")
        # 4. Output to file for easy inspection
        output = {
            "snapshot_id": snapshot.id,
            "metrics": stats_json,
            "narrative": narrative_json
        }
        with open("latest_analysis.json", "w") as f:
            json.dump(output, f, indent=2)
        print("Full report saved to latest_analysis.json")
    except Exception as e:
        print(f"Pipeline Failed: {e}")
        import traceback
        traceback.print_exc()
    finally:
        db.close()

if __name__ == "__main__":
    # Optional CLI argument: number of days to analyze (default 30)
    days = 30
    if len(sys.argv) > 1:
        try:
            days = int(sys.argv[1])
        except ValueError:
            pass
    run_analysis_pipeline(days=days)

backend/seed_data.py Normal file

@@ -0,0 +1,78 @@
from datetime import datetime, timedelta
import random
from app.database import SessionLocal
from app.models import Track, Artist, PlayHistory
from app.services.stats_service import StatsService

def seed_db():
    db = SessionLocal()
    # 1. Create Artists
    artists = []
    for i in range(10):
        a = Artist(
            id=f"artist_{i}",
            name=f"Artist {i}",
            genres=[random.choice(["pop", "rock", "jazz", "edm", "hip-hop"]) for _ in range(2)]
        )
        a = db.merge(a)  # merge handles insert/update and returns the session-attached instance
        artists.append(a)
    db.commit()
    print(f"Seeded {len(artists)} artists.")
    # 2. Create Tracks
    tracks = []
    for i in range(50):
        # Random artist
        artist = random.choice(artists)
        t = Track(
            id=f"track_{i}",
            name=f"Track {i}",
            artist=artist.name,  # Legacy display string
            album=f"Album {i % 10}",
            duration_ms=random.randint(180000, 300000),  # 3-5 mins
            popularity=random.randint(10, 90),
            danceability=random.uniform(0.3, 0.9),
            energy=random.uniform(0.3, 0.9),
            valence=random.uniform(0.1, 0.9),
            tempo=random.uniform(80, 160),
            raw_data={"album": {"id": f"album_{i%10}", "release_date": f"{random.randint(2000, 2023)}-01-01"}}
        )
        # Link artist
        t.artists.append(artist)
        t = db.merge(t)
        tracks.append(t)
    db.commit()
    print(f"Seeded {len(tracks)} tracks.")
    # 3. Create Play History (Last 30 days)
    base_time = datetime.utcnow() - timedelta(days=25)
    for i in range(200):
        # Simulate sessions: 80% chance the next play follows quickly (2-6 mins),
        # 20% chance of a long gap (30-600 mins) that starts a new session.
        gap = random.randint(2, 6) if random.random() > 0.2 else random.randint(30, 600)
        base_time += timedelta(minutes=gap)
        if base_time > datetime.utcnow():
            break
        track = random.choice(tracks)
        p = PlayHistory(
            track_id=track.id,
            played_at=base_time,
            context_uri="spotify:playlist:fake"
        )
        db.add(p)
    db.commit()
    print(f"Seeded play history until {base_time}.")
    db.close()

if __name__ == "__main__":
    seed_db()


@@ -0,0 +1,69 @@
import unittest
from datetime import datetime, timedelta
from unittest.mock import MagicMock
from app.services.stats_service import StatsService
from app.models import PlayHistory, Track, Artist

class TestStatsService(unittest.TestCase):
    def setUp(self):
        self.mock_db = MagicMock()
        self.service = StatsService(self.mock_db)

    def test_compute_volume_stats_empty(self):
        # Mock empty query result
        self.mock_db.query.return_value.filter.return_value.all.return_value = []
        start = datetime.utcnow()
        end = datetime.utcnow()
        stats = self.service.compute_volume_stats(start, end)
        self.assertEqual(stats["total_plays"], 0)
        self.assertEqual(stats["unique_tracks"], 0)

    def test_compute_session_stats(self):
        # Create dummy plays
        t1 = datetime(2023, 1, 1, 10, 0, 0)
        t2 = datetime(2023, 1, 1, 10, 5, 0)   # 5 min gap (same session)
        t3 = datetime(2023, 1, 1, 12, 0, 0)   # 1h 55m gap (new session)
        plays = [
            PlayHistory(played_at=t1, track_id="1"),
            PlayHistory(played_at=t2, track_id="2"),
            PlayHistory(played_at=t3, track_id="3"),
        ]
        # Mock the query chain: service.db.query().filter().order_by().all()
        query_mock = self.mock_db.query.return_value.filter.return_value.order_by.return_value
        query_mock.all.return_value = plays
        stats = self.service.compute_session_stats(datetime.utcnow(), datetime.utcnow())
        # Expected: 2 sessions ([t1, t2], [t3])
        self.assertEqual(stats["count"], 2)
        # Avg tracks: 3 plays / 2 sessions = 1.5
        self.assertEqual(stats["avg_tracks"], 1.5)

    def test_compute_skip_stats(self):
        # Track duration = 30s
        track = Track(id="t1", duration_ms=30000)
        # Play 1 at 10:00:00, Play 2 at 10:00:10:
        # gap of 10s < duration - 10s (20s) -> counted as a skip
        p1 = PlayHistory(played_at=datetime(2023, 1, 1, 10, 0, 0), track_id="t1")
        p2 = PlayHistory(played_at=datetime(2023, 1, 1, 10, 0, 10), track_id="t1")
        plays = [p1, p2]
        query_mock = self.mock_db.query.return_value.filter.return_value.order_by.return_value
        query_mock.all.return_value = plays
        # Mock track lookup (query().filter().all())
        self.mock_db.query.return_value.filter.return_value.all.return_value = [track]
        stats = self.service.compute_skip_stats(datetime.utcnow(), datetime.utcnow())
        self.assertEqual(stats["total_skips"], 1)

if __name__ == '__main__':
    unittest.main()