Files
MusicAnalyser/backend/app/main.py
bnair123 887e78bf47 Add skip tracking, compressed heatmap, listening log, docs, tests, and OpenAI support
Major changes:
- Add skip tracking: poll currently-playing every 15s, detect skips (<30s listened)
- Add listening-log and sessions API endpoints
- Fix ReccoBeats client to extract spotify_id from href response
- Compress heatmap from 24 hours to 6 x 4-hour blocks
- Add OpenAI support in narrative service (use max_completion_tokens for new models)
- Add ListeningLog component with timeline and list views
- Update all frontend components to use real data (album art, play counts)
- Add docker-compose external network (dockernet) support
- Add comprehensive documentation (API, DATA_MODEL, ARCHITECTURE, FRONTEND)
- Add unit tests for ingest and API endpoints
2025-12-30 00:15:01 +04:00

207 lines
6.2 KiB
Python

from fastapi import FastAPI, Depends, HTTPException, BackgroundTasks, Query
from sqlalchemy.orm import Session, joinedload
from datetime import datetime, timedelta
from typing import List, Optional
from dotenv import load_dotenv
from .database import engine, Base, get_db
from .models import (
PlayHistory as PlayHistoryModel,
Track as TrackModel,
AnalysisSnapshot,
)
from . import schemas
from .ingest import ingest_recently_played
from .services.stats_service import StatsService
from .services.narrative_service import NarrativeService
# Read environment variables from a .env file (python-dotenv) before the
# rest of the module-level setup runs.
load_dotenv()
# Issue CREATE TABLE for every model registered on Base that does not yet
# exist in the database behind `engine`. Runs at import time.
Base.metadata.create_all(bind=engine)
from fastapi.middleware.cors import CORSMiddleware
# The ASGI application object that all route decorators below attach to.
app = FastAPI(title="Music Analyser Backend")
# CORS: only the two localhost origins listed here may call the API from a
# browser; credentials, every method and every header are permitted for them.
# NOTE(review): presumably these are the frontend dev/served origins - confirm.
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:5173", "http://localhost:8991"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/")
def read_root():
    """Liveness probe: return a static payload saying the API is running."""
    payload = {"status": "ok", "message": "Music Analyser API is running"}
    return payload
@app.get("/history", response_model=List[schemas.PlayHistory])
def get_history(limit: int = 50, db: Session = Depends(get_db)):
    """Return the most recent play-history rows, newest first.

    Args:
        limit: Maximum number of rows to return.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A list of ``PlayHistory`` rows ordered by ``played_at`` descending.
    """
    newest_first = PlayHistoryModel.played_at.desc()
    query = db.query(PlayHistoryModel).order_by(newest_first).limit(limit)
    return query.all()
@app.get("/tracks", response_model=List[schemas.Track])
def get_tracks(limit: int = 50, db: Session = Depends(get_db)):
    """Return up to ``limit`` tracks.

    The query applies no ORDER BY, so row order is whatever the database
    yields.
    """
    track_query = db.query(TrackModel)
    return track_query.limit(limit).all()
@app.post("/trigger-ingest")
async def trigger_ingest(
    background_tasks: BackgroundTasks, db: Session = Depends(get_db)
):
    """Queue a Spotify ingestion run and reply without waiting for it.

    ``ingest_recently_played`` is scheduled as a FastAPI background task and
    receives the session injected for this request.

    NOTE(review): the session comes from the request-scoped ``get_db``
    dependency; confirm it is still open when the background task executes.
    """
    background_tasks.add_task(ingest_recently_played, db)
    acknowledgement = {"status": "Ingestion started in background"}
    return acknowledgement
@app.post("/trigger-analysis")
def trigger_analysis(
    days: int = 30,
    model_name: str = "gpt-5-mini-2025-08-07",
    db: Session = Depends(get_db),
):
    """
    Runs the full analysis pipeline (Stats + LLM) for the last X days.
    Returns the computed metrics and narrative immediately.

    Args:
        days: Size of the look-back window, counted back from "now" (UTC).
        model_name: Model identifier handed to ``NarrativeService`` and
            stored on the snapshot as ``model_used``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.

    Returns:
        A dict with ``status``, the new ``snapshot_id``, the analysed
        ``period`` and the ``metrics`` / ``narrative`` payloads.

    Raises:
        HTTPException: 404 when the stats report contains zero plays for the
            window; 500 (with the error text as detail) for any other failure.
    """
    try:
        end_date = datetime.utcnow()
        start_date = end_date - timedelta(days=days)

        # 1. Compute Stats
        stats_service = StatsService(db)
        stats_json = stats_service.generate_full_report(start_date, end_date)
        if stats_json["volume"]["total_plays"] == 0:
            raise HTTPException(
                status_code=404, detail="No plays found in the specified period."
            )

        # 2. Generate Narrative
        narrative_service = NarrativeService(model_name=model_name)
        narrative_json = narrative_service.generate_full_narrative(stats_json)

        # 3. Save Snapshot
        snapshot = AnalysisSnapshot(
            period_start=start_date,
            period_end=end_date,
            period_label=f"last_{days}_days",
            metrics_payload=stats_json,
            narrative_report=narrative_json,
            model_used=model_name,
        )
        db.add(snapshot)
        db.commit()
        # Reload so database-generated fields (e.g. the primary key) are set.
        db.refresh(snapshot)

        return {
            "status": "success",
            "snapshot_id": snapshot.id,
            "period": {"start": start_date, "end": end_date},
            "metrics": stats_json,
            "narrative": narrative_json,
        }
    except HTTPException:
        raise  # Re-raise HTTPExceptions as-is (404, etc.)
    except Exception as e:
        print(f"Analysis Failed: {e}")
        # A failed add/commit leaves the session in an aborted transaction;
        # roll back so the session is clean for whoever touches it next.
        db.rollback()
        # Chain the cause so the original traceback is kept in server logs.
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/snapshots")
def get_snapshots(limit: int = 10, db: Session = Depends(get_db)):
    """Return up to ``limit`` analysis snapshots, ordered by
    ``AnalysisSnapshot.date`` descending."""
    latest_first = AnalysisSnapshot.date.desc()
    snapshots = db.query(AnalysisSnapshot).order_by(latest_first).limit(limit).all()
    return snapshots
@app.get("/listening-log")
def get_listening_log(
    days: int = Query(default=7, ge=1, le=365),
    limit: int = Query(default=200, ge=1, le=1000),
    db: Session = Depends(get_db),
):
    """Return a flattened listening log for the last ``days`` days.

    Plays are fetched newest first (at most ``limit``) with their track
    eagerly loaded. For a play with no stored ``listened_ms`` that is followed
    in the result by an older play, the listening time is estimated as the gap
    between the two ``played_at`` timestamps, capped at the track duration,
    and the play is marked skipped when that gap is under 30 seconds. The
    oldest play in the result has no older neighbour and is never estimated.

    NOTE(review): the estimate treats ``played_at`` as the moment a play
    ended - confirm against the ingest code.

    Returns:
        ``{"plays": [...], "period": {"start": ..., "end": ...}}`` with ISO
        formatted timestamps.
    """
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=days)

    window = (
        PlayHistoryModel.played_at >= start_date,
        PlayHistoryModel.played_at <= end_date,
    )
    plays = (
        db.query(PlayHistoryModel)
        .options(joinedload(PlayHistoryModel.track))
        .filter(*window)
        .order_by(PlayHistoryModel.played_at.desc())
        .limit(limit)
        .all()
    )

    last_index = len(plays) - 1
    entries = []
    for index, play in enumerate(plays):
        track = play.track
        listened_ms = play.listened_ms
        skipped = play.skipped

        if listened_ms is None and index < last_index:
            older_play = plays[index + 1]
            diff_seconds = (play.played_at - older_play.played_at).total_seconds()
            # NOTE(review): the source's indentation was lost; the skip flag is
            # assumed to be derived only when the track duration is known.
            if track and track.duration_ms:
                capped_seconds = min(diff_seconds, track.duration_ms / 1000.0)
                listened_ms = int(capped_seconds * 1000)
                skipped = diff_seconds < 30

        # Fall back to placeholder metadata when the play has no track row.
        if track:
            name, artist, album = track.name, track.artist, track.album
            image, duration_ms = track.image_url, track.duration_ms
        else:
            name = artist = album = "Unknown"
            image, duration_ms = None, 0

        entries.append(
            {
                "id": play.id,
                "track_id": play.track_id,
                "track_name": name,
                "artist": artist,
                "album": album,
                "image": image,
                "played_at": play.played_at.isoformat(),
                "duration_ms": duration_ms,
                "listened_ms": listened_ms,
                "skipped": skipped,
                "context_uri": play.context_uri,
                "source": play.source,
            }
        )

    period = {"start": start_date.isoformat(), "end": end_date.isoformat()}
    return {"plays": entries, "period": period}
@app.get("/sessions")
def get_sessions(
    days: int = Query(default=7, ge=1, le=365), db: Session = Depends(get_db)
):
    """Return listening sessions and summary figures for the last ``days`` days.

    The numbers come from ``StatsService.compute_session_stats``; any key it
    omits falls back to an empty list (sessions) or ``0`` (summary values).
    """
    end_date = datetime.utcnow()
    start_date = end_date - timedelta(days=days)
    session_stats = StatsService(db).compute_session_stats(start_date, end_date)

    sessions = session_stats.get("session_list", [])
    # Response key -> key used in the stats payload.
    summary_keys = (
        ("count", "count"),
        ("avg_minutes", "avg_minutes"),
        ("micro_rate", "micro_session_rate"),
        ("marathon_rate", "marathon_session_rate"),
    )
    summary = {
        response_key: session_stats.get(stats_key, 0)
        for response_key, stats_key in summary_keys
    }
    return {"sessions": sessions, "summary": summary}