"""DuckDB storage manager for market data.

Provides async-compatible interface for storing and retrieving
market data using DuckDB.
"""

from datetime import datetime
from decimal import Decimal
from pathlib import Path

import duckdb
import structlog

from tradefinder.adapters.types import Candle, FundingRate
from tradefinder.data.schemas import ALL_SCHEMAS

logger = structlog.get_logger(__name__)


class DataStorage:
    """DuckDB storage manager for market data.

    Usage:
        storage = DataStorage(Path("/data/tradefinder.duckdb"))
        storage.connect()
        storage.initialize_schema()
        storage.insert_candles(candles, "BTCUSDT", "1h")
        storage.disconnect()

    The class is also a context manager: entering connects and exiting
    disconnects, so ``with DataStorage(path) as storage: ...`` is equivalent
    to the explicit connect()/disconnect() pair above.
    """

    def __init__(self, db_path: Path, *, read_only: bool = False) -> None:
        """Initialize storage with database path.

        No connection is opened here; call connect() (or use the instance as
        a context manager) before issuing queries.

        Args:
            db_path: Path to DuckDB database file
            read_only: If True, open database in read-only mode (no locking)
        """
        self.db_path = db_path
        self._read_only = read_only
        self._conn: duckdb.DuckDBPyConnection | None = None

    def connect(self) -> None:
        """Connect to the database.

        In writable mode the parent directory of the database file is created
        first if it does not exist. In read-only mode the filesystem is left
        untouched.
        """
        if not self._read_only:
            self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn = duckdb.connect(str(self.db_path), read_only=self._read_only)
        logger.info(
            "Connected to DuckDB",
            path=str(self.db_path),
            read_only=self._read_only,
        )

    def disconnect(self) -> None:
        """Close database connection.

        Does nothing when no connection is open, so it is safe to call
        repeatedly.
        """
        if self._conn:
            self._conn.close()
            self._conn = None
            logger.info("Disconnected from DuckDB")

    def __enter__(self) -> "DataStorage":
        """Context manager entry: open the connection and return self."""
        self.connect()
        return self

    def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
        """Context manager exit: close the connection.

        Returns None, so an exception raised inside the ``with`` body is not
        suppressed.
        """
        self.disconnect()

    @property
    def conn(self) -> duckdb.DuckDBPyConnection:
        """Get database connection.

        Raises:
            RuntimeError: If connect() has not been called (or the connection
                has since been closed).
        """
        if self._conn is None:
            raise RuntimeError("Not connected. Call connect() first.")
        return self._conn

    def initialize_schema(self) -> None:
        """Create all database tables and indexes.

        Each entry of ALL_SCHEMAS is split on ";" and every non-empty
        statement is executed on its own.
        """
        for schema in ALL_SCHEMAS:
            # Execute each statement separately
            for statement in schema.strip().split(";"):
                statement = statement.strip()
                if statement:
                    self.conn.execute(statement)
        logger.info("Database schema initialized")

    def insert_candles(self, candles: list[Candle], symbol: str, timeframe: str) -> int:
        """Insert candles into the database.

        Args:
            candles: List of Candle objects to insert
            symbol: Trading symbol (e.g., "BTCUSDT")
            timeframe: Candle timeframe (e.g., "1h", "4h")

        Returns:
            Number of candles passed in (0 for an empty list, in which case
            the database is not touched)
        """
        if not candles:
            return 0

        # Prepare data for insertion; numeric fields are converted to float
        # before being bound as parameters.
        data = [
            (
                symbol,
                timeframe,
                c.timestamp,
                float(c.open),
                float(c.high),
                float(c.low),
                float(c.close),
                float(c.volume),
            )
            for c in candles
        ]

        # Use INSERT OR REPLACE to handle duplicates
        self.conn.executemany(
            """
            INSERT OR REPLACE INTO candles
            (symbol, timeframe, timestamp, open, high, low, close, volume)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            data,
        )

        logger.debug(
            "Inserted candles",
            symbol=symbol,
            timeframe=timeframe,
            count=len(candles),
        )
        return len(candles)

    def get_candles(
        self,
        symbol: str,
        timeframe: str,
        start: datetime | None = None,
        end: datetime | None = None,
        limit: int | None = None,
    ) -> list[Candle]:
        """Retrieve candles from the database.

        Args:
            symbol: Trading symbol
            timeframe: Candle timeframe
            start: Start timestamp (inclusive); None means no lower bound
            end: End timestamp (inclusive); None means no upper bound
            limit: Maximum number of candles to return; None means no limit.
                A limit of 0 yields an empty list.

        Returns:
            List of Candle objects, oldest first
        """
        query = """
            SELECT timestamp, open, high, low, close, volume
            FROM candles
            WHERE symbol = ? AND timeframe = ?
        """
        params: list[str | datetime | int] = [symbol, timeframe]

        # Compare against None explicitly: a plain truthiness test would
        # treat limit=0 as "no limit" and return every row.
        if start is not None:
            query += " AND timestamp >= ?"
            params.append(start)
        if end is not None:
            query += " AND timestamp <= ?"
            params.append(end)

        query += " ORDER BY timestamp ASC"

        if limit is not None:
            query += " LIMIT ?"
            params.append(limit)

        result = self.conn.execute(query, params).fetchall()

        # Values go through str() before Decimal() so the Decimal is built
        # from the value's printed form rather than its binary expansion.
        return [
            Candle(
                timestamp=row[0],
                open=Decimal(str(row[1])),
                high=Decimal(str(row[2])),
                low=Decimal(str(row[3])),
                close=Decimal(str(row[4])),
                volume=Decimal(str(row[5])),
            )
            for row in result
        ]

    def get_latest_candle_timestamp(self, symbol: str, timeframe: str) -> datetime | None:
        """Get the timestamp of the most recent candle.

        Args:
            symbol: Trading symbol
            timeframe: Candle timeframe

        Returns:
            Timestamp of latest candle, or None if no candles exist
        """
        result = self.conn.execute(
            """
            SELECT MAX(timestamp)
            FROM candles
            WHERE symbol = ? AND timeframe = ?
            """,
            [symbol, timeframe],
        ).fetchone()

        # MAX() over zero rows produces a single NULL row, so both the row
        # and its only column have to be checked.
        if result is None or result[0] is None:
            return None
        return result[0]

    def insert_funding_rate(self, rate: FundingRate) -> None:
        """Insert a funding rate record.

        Uses INSERT OR REPLACE, so a conflicting existing row is overwritten.

        Args:
            rate: FundingRate object to insert
        """
        self.conn.execute(
            """
            INSERT OR REPLACE INTO funding_rates
            (symbol, funding_rate, funding_time, mark_price)
            VALUES (?, ?, ?, ?)
            """,
            [
                rate.symbol,
                float(rate.funding_rate),
                rate.funding_time,
                float(rate.mark_price),
            ],
        )

    def get_candle_count(self, symbol: str, timeframe: str) -> int:
        """Get the number of candles stored for a symbol/timeframe.

        Args:
            symbol: Trading symbol
            timeframe: Candle timeframe

        Returns:
            Number of candles
        """
        result = self.conn.execute(
            """
            SELECT COUNT(*)
            FROM candles
            WHERE symbol = ? AND timeframe = ?
            """,
            [symbol, timeframe],
        ).fetchone()
        return result[0] if result else 0