feat: Initial backend setup for Music Analyser

- Created FastAPI backend structure.
- Implemented Spotify Recently Played ingestion logic.
- Set up SQLite database with SQLAlchemy models.
- Added AI Service using Google Gemini.
- Created helper scripts for auth and background worker.
- Added Dockerfile and GitHub Actions workflow.
This commit is contained in:
google-labs-jules[bot]
2025-12-24 17:26:01 +00:00
parent a458eb00db
commit a97997a17a
16 changed files with 579 additions and 2 deletions

11
backend/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

18
backend/app/database.py Normal file
View File

@@ -0,0 +1,18 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
SQLALCHEMY_DATABASE_URL = "sqlite:///./music.db"
engine = create_engine(
SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

81
backend/app/ingest.py Normal file
View File

@@ -0,0 +1,81 @@
import asyncio
import os
from datetime import datetime
from sqlalchemy.orm import Session
from .models import Track, PlayHistory
from .database import SessionLocal
from .services.spotify_client import SpotifyClient
from dateutil import parser
# Initialize Spotify Client (env vars will be populated later)
def get_spotify_client():
return SpotifyClient(
client_id=os.getenv("SPOTIFY_CLIENT_ID"),
client_secret=os.getenv("SPOTIFY_CLIENT_SECRET"),
refresh_token=os.getenv("SPOTIFY_REFRESH_TOKEN"),
)
async def ingest_recently_played(db: Session):
client = get_spotify_client()
try:
items = await client.get_recently_played(limit=50)
except Exception as e:
print(f"Error connecting to Spotify: {e}")
return
print(f"Fetched {len(items)} items from Spotify.")
for item in items:
track_data = item["track"]
played_at_str = item["played_at"]
played_at = parser.isoparse(played_at_str)
# 1. Check if track exists, if not create it
track_id = track_data["id"]
track = db.query(Track).filter(Track.id == track_id).first()
if not track:
print(f"New track found: {track_data['name']}")
track = Track(
id=track_id,
name=track_data["name"],
artist=", ".join([a["name"] for a in track_data["artists"]]),
album=track_data["album"]["name"],
duration_ms=track_data["duration_ms"],
popularity=track_data["popularity"],
raw_data=track_data
)
db.add(track)
db.commit() # Commit immediately so ID exists for foreign key
# 2. Check if this specific play instance exists
# We assume (track_id, played_at) is unique enough
exists = db.query(PlayHistory).filter(
PlayHistory.track_id == track_id,
PlayHistory.played_at == played_at
).first()
if not exists:
print(f" recording play: {track_data['name']} at {played_at}")
play = PlayHistory(
track_id=track_id,
played_at=played_at,
context_uri=item.get("context", {}).get("uri") if item.get("context") else None
)
db.add(play)
db.commit()
async def run_worker():
"""Simulates a background worker loop."""
db = SessionLocal()
try:
while True:
print("Worker: Polling Spotify...")
await ingest_recently_played(db)
print("Worker: Sleeping for 60 seconds...")
await asyncio.sleep(60)
except Exception as e:
print(f"Worker crashed: {e}")
finally:
db.close()

36
backend/app/main.py Normal file
View File

@@ -0,0 +1,36 @@
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
from .database import engine, Base, get_db
from .models import PlayHistory as PlayHistoryModel, Track as TrackModel
from . import schemas
from .ingest import ingest_recently_played
import asyncio
from typing import List
from dotenv import load_dotenv
load_dotenv()
# Create tables
Base.metadata.create_all(bind=engine)
app = FastAPI(title="Music Analyser Backend")
@app.get("/")
def read_root():
return {"status": "ok", "message": "Music Analyser API is running"}
@app.get("/history", response_model=List[schemas.PlayHistory])
def get_history(limit: int = 50, db: Session = Depends(get_db)):
history = db.query(PlayHistoryModel).order_by(PlayHistoryModel.played_at.desc()).limit(limit).all()
return history
@app.post("/trigger-ingest")
async def trigger_ingest(db: Session = Depends(get_db)):
"""Manually trigger the ingestion process (useful for testing)"""
await ingest_recently_played(db)
return {"status": "Ingestion triggered"}
@app.get("/tracks", response_model=List[schemas.Track])
def get_tracks(limit: int = 50, db: Session = Depends(get_db)):
tracks = db.query(TrackModel).limit(limit).all()
return tracks

39
backend/app/models.py Normal file
View File

@@ -0,0 +1,39 @@
from sqlalchemy import Column, Integer, String, DateTime, JSON, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from datetime import datetime
from .database import Base
class Track(Base):
__tablename__ = "tracks"
id = Column(String, primary_key=True, index=True) # Spotify ID
name = Column(String)
artist = Column(String)
album = Column(String)
duration_ms = Column(Integer)
popularity = Column(Integer, nullable=True)
# Store raw full JSON response for future-proofing analysis
raw_data = Column(JSON, nullable=True)
# AI Analysis fields
lyrics_summary = Column(String, nullable=True)
genre_tags = Column(String, nullable=True) # JSON list stored as string or just raw JSON
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
plays = relationship("PlayHistory", back_populates="track")
class PlayHistory(Base):
__tablename__ = "play_history"
id = Column(Integer, primary_key=True, index=True)
track_id = Column(String, ForeignKey("tracks.id"))
played_at = Column(DateTime, index=True) # The timestamp from Spotify
# Context (album, playlist, etc.)
context_uri = Column(String, nullable=True)
track = relationship("Track", back_populates="plays")

32
backend/app/schemas.py Normal file
View File

@@ -0,0 +1,32 @@
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime
class TrackBase(BaseModel):
id: str
name: str
artist: str
album: str
duration_ms: int
popularity: Optional[int] = None
lyrics_summary: Optional[str] = None
genre_tags: Optional[str] = None
class Track(TrackBase):
created_at: datetime
updated_at: datetime
class Config:
from_attributes = True
class PlayHistoryBase(BaseModel):
track_id: str
played_at: datetime
context_uri: Optional[str] = None
class PlayHistory(PlayHistoryBase):
id: int
track: Track
class Config:
from_attributes = True

View File

@@ -0,0 +1,40 @@
import os
import google.generativeai as genai
from typing import List
from ..models import PlayHistory, Track
class AIService:
def __init__(self, api_key: str):
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('models/gemini-2.0-flash')
def generate_analysis(self, plays: List[PlayHistory]) -> str:
"""
Generates a summary analysis of the provided play history.
"""
if not plays:
return "No listening history available to analyze."
# Prepare a simple text representation of the history
history_text = "Here is my recent listening history:\n"
for play in plays:
history_text += f"- {play.track.name} by {play.track.artist} (Played at {play.played_at})\n"
prompt = f"""
You are a music taste analyst.
Analyze the following listening history and provide a short, fun, and insightful summary.
Identify the vibe, top artists, and any interesting patterns (e.g. "You started with high energy and chilled out").
Keep it under 200 words.
{history_text}
"""
try:
response = self.model.generate_content(prompt)
return response.text
except Exception as e:
return f"AI Analysis failed: {str(e)}"
# Singleton accessor
def get_ai_service():
return AIService(api_key=os.getenv("GEMINI_API_KEY"))

View File

@@ -0,0 +1,70 @@
import os
import base64
import time
import httpx
from fastapi import HTTPException
SPOTIFY_TOKEN_URL = "https://accounts.spotify.com/api/token"
SPOTIFY_API_BASE = "https://api.spotify.com/v1"
class SpotifyClient:
def __init__(self, client_id: str, client_secret: str, refresh_token: str):
self.client_id = client_id
self.client_secret = client_secret
self.refresh_token = refresh_token
self.access_token = None
self.token_expires_at = 0
async def get_access_token(self):
"""Returns a valid access token, refreshing if necessary."""
if self.access_token and time.time() < self.token_expires_at:
return self.access_token
print("Refreshing Spotify Access Token...")
async with httpx.AsyncClient() as client:
auth_str = f"{self.client_id}:{self.client_secret}"
b64_auth = base64.b64encode(auth_str.encode()).decode()
response = await client.post(
SPOTIFY_TOKEN_URL,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
},
headers={"Authorization": f"Basic {b64_auth}"},
)
if response.status_code != 200:
print(f"Failed to refresh token: {response.text}")
raise Exception("Could not refresh Spotify token")
data = response.json()
self.access_token = data["access_token"]
# expires_in is usually 3600 seconds. buffer by 60s
self.token_expires_at = time.time() + data["expires_in"] - 60
return self.access_token
async def get_recently_played(self, limit=50):
token = await self.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.get(
f"{SPOTIFY_API_BASE}/me/player/recently-played",
params={"limit": limit},
headers={"Authorization": f"Bearer {token}"},
)
if response.status_code != 200:
print(f"Error fetching recently played: {response.text}")
return []
return response.json().get("items", [])
async def get_track(self, track_id: str):
token = await self.get_access_token()
async with httpx.AsyncClient() as client:
response = await client.get(
f"{SPOTIFY_API_BASE}/tracks/{track_id}",
headers={"Authorization": f"Bearer {token}"},
)
if response.status_code != 200:
return None
return response.json()

11
backend/requirements.txt Normal file
View File

@@ -0,0 +1,11 @@
fastapi==0.109.2
uvicorn==0.27.1
sqlalchemy==2.0.27
httpx==0.26.0
python-dotenv==1.0.1
pydantic==2.6.1
pydantic-settings==2.1.0
google-generativeai==0.3.2
tenacity==8.2.3
python-dateutil==2.9.0.post0
requests==2.31.0

28
backend/run_ingest.py Normal file
View File

@@ -0,0 +1,28 @@
import asyncio
import sys
import os
# Add the current directory to sys.path to allow imports from app
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from dotenv import load_dotenv
load_dotenv()
from app.database import SessionLocal, Base, engine
from app.ingest import ingest_recently_played
# Ensure tables exist
Base.metadata.create_all(bind=engine)
async def main():
print("Starting manual ingestion...")
db = SessionLocal()
try:
await ingest_recently_played(db)
print("Ingestion complete.")
finally:
db.close()
if __name__ == "__main__":
asyncio.run(main())

22
backend/run_worker.py Normal file
View File

@@ -0,0 +1,22 @@
import asyncio
import sys
import os
from dotenv import load_dotenv
load_dotenv()
# Add the current directory to sys.path
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
from app.database import Base, engine
from app.ingest import run_worker
# Ensure tables exist
Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
print("Starting Background Worker...")
try:
asyncio.run(run_worker())
except KeyboardInterrupt:
print("Worker stopped.")

View File

@@ -0,0 +1,87 @@
import os
import sys
import webbrowser
import requests
import urllib.parse
from http.server import HTTPServer, BaseHTTPRequestHandler
# Instructions for the user:
# 1. Go to Spotify Developer Dashboard: https://developer.spotify.com/dashboard/
# 2. Create an App.
# 3. Edit Settings -> Redirect URIs -> Add "http://localhost:8888/callback"
# 4. Save Settings.
# 5. Copy Client ID and Client Secret.
# 6. Run this script: python get_refresh_token.py
# CONFIGURATION - You can hardcode these or input them when prompted
SPOTIFY_CLIENT_ID = input("Enter your Spotify Client ID: ").strip()
SPOTIFY_CLIENT_SECRET = input("Enter your Spotify Client Secret: ").strip()
REDIRECT_URI = "http://localhost:8888/callback"
SCOPE = "user-read-recently-played user-read-playback-state"
class RequestHandler(BaseHTTPRequestHandler):
def do_GET(self):
query = urllib.parse.urlparse(self.path).query
params = urllib.parse.parse_qs(query)
if "code" in params:
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"<h1>Got the code! check your terminal.</h1>")
code = params["code"][0]
get_token(code)
# Shut down server
raise KeyboardInterrupt
def get_token(code):
url = "https://accounts.spotify.com/api/token"
payload = {
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"client_id": SPOTIFY_CLIENT_ID,
"client_secret": SPOTIFY_CLIENT_SECRET,
}
response = requests.post(url, data=payload)
if response.status_code == 200:
data = response.json()
print("\n" + "="*50)
print("SUCCESS! HERE ARE YOUR CREDENTIALS")
print("="*50)
print(f"\nSPOTIFY_REFRESH_TOKEN={data['refresh_token']}")
print(f"SPOTIFY_CLIENT_ID={SPOTIFY_CLIENT_ID}")
print(f"SPOTIFY_CLIENT_SECRET={SPOTIFY_CLIENT_SECRET}")
print("\nSave these in your .env file or share them with the agent.")
print("="*50 + "\n")
else:
print("Error getting token:", response.text)
def start_auth():
auth_url = "https://accounts.spotify.com/authorize?" + urllib.parse.urlencode({
"response_type": "code",
"client_id": SPOTIFY_CLIENT_ID,
"scope": SCOPE,
"redirect_uri": REDIRECT_URI,
})
print(f"Opening browser to: {auth_url}")
try:
webbrowser.open(auth_url)
except:
print(f"Could not open browser. Please manually visit: {auth_url}")
server_address = ('', 8888)
httpd = HTTPServer(server_address, RequestHandler)
print("Listening on port 8888...")
try:
httpd.serve_forever()
except KeyboardInterrupt:
pass
httpd.server_close()
if __name__ == "__main__":
start_auth()

0
backend/worker.log Normal file
View File