Add data layer (DuckDB storage, fetcher) and spot adapter with tests

- Add DataStorage class for DuckDB-based market data persistence
- Add DataFetcher for historical candle backfill and sync operations
- Add BinanceSpotAdapter for spot wallet balance queries
- Add binance_spot_base_url to Settings for spot testnet support
- Add comprehensive unit tests (50 new tests, 82 total)
- Coverage increased from 62% to 86%
This commit is contained in:
bnair123
2025-12-27 14:38:26 +04:00
parent 17d51c4f78
commit 7d63e43b7b
10 changed files with 1635 additions and 1 deletions

View File

@@ -0,0 +1,242 @@
"""DuckDB storage manager for market data.
Provides async-compatible interface for storing and retrieving
market data using DuckDB.
"""
from datetime import datetime
from pathlib import Path
import duckdb
import structlog
from tradefinder.adapters.types import Candle, FundingRate
from tradefinder.data.schemas import ALL_SCHEMAS
logger = structlog.get_logger(__name__)
class DataStorage:
"""DuckDB storage manager for market data.
Usage:
storage = DataStorage(Path("/data/tradefinder.duckdb"))
storage.connect()
storage.initialize_schema()
storage.insert_candles(candles)
storage.disconnect()
"""
def __init__(self, db_path: Path) -> None:
"""Initialize storage with database path.
Args:
db_path: Path to DuckDB database file
"""
self.db_path = db_path
self._conn: duckdb.DuckDBPyConnection | None = None
def connect(self) -> None:
"""Connect to the database."""
# Ensure parent directory exists
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self._conn = duckdb.connect(str(self.db_path))
logger.info("Connected to DuckDB", path=str(self.db_path))
def disconnect(self) -> None:
"""Close database connection."""
if self._conn:
self._conn.close()
self._conn = None
logger.info("Disconnected from DuckDB")
def __enter__(self) -> "DataStorage":
"""Context manager entry."""
self.connect()
return self
def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
"""Context manager exit."""
self.disconnect()
@property
def conn(self) -> duckdb.DuckDBPyConnection:
"""Get database connection."""
if self._conn is None:
raise RuntimeError("Not connected. Call connect() first.")
return self._conn
def initialize_schema(self) -> None:
"""Create all database tables and indexes."""
for schema in ALL_SCHEMAS:
# Execute each statement separately
for statement in schema.strip().split(";"):
statement = statement.strip()
if statement:
self.conn.execute(statement)
logger.info("Database schema initialized")
def insert_candles(self, candles: list[Candle], symbol: str, timeframe: str) -> int:
"""Insert candles into the database.
Args:
candles: List of Candle objects to insert
symbol: Trading symbol (e.g., "BTCUSDT")
timeframe: Candle timeframe (e.g., "1h", "4h")
Returns:
Number of candles inserted
"""
if not candles:
return 0
# Prepare data for insertion
data = [
(
symbol,
timeframe,
c.timestamp,
float(c.open),
float(c.high),
float(c.low),
float(c.close),
float(c.volume),
)
for c in candles
]
# Use INSERT OR REPLACE to handle duplicates
self.conn.executemany(
"""
INSERT OR REPLACE INTO candles
(symbol, timeframe, timestamp, open, high, low, close, volume)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
data,
)
logger.debug(
"Inserted candles",
symbol=symbol,
timeframe=timeframe,
count=len(candles),
)
return len(candles)
def get_candles(
self,
symbol: str,
timeframe: str,
start: datetime | None = None,
end: datetime | None = None,
limit: int | None = None,
) -> list[Candle]:
"""Retrieve candles from the database.
Args:
symbol: Trading symbol
timeframe: Candle timeframe
start: Start timestamp (inclusive)
end: End timestamp (inclusive)
limit: Maximum number of candles to return
Returns:
List of Candle objects, oldest first
"""
query = """
SELECT timestamp, open, high, low, close, volume
FROM candles
WHERE symbol = ? AND timeframe = ?
"""
params: list[str | datetime | int] = [symbol, timeframe]
if start:
query += " AND timestamp >= ?"
params.append(start)
if end:
query += " AND timestamp <= ?"
params.append(end)
query += " ORDER BY timestamp ASC"
if limit:
query += " LIMIT ?"
params.append(limit)
result = self.conn.execute(query, params).fetchall()
from decimal import Decimal
return [
Candle(
timestamp=row[0],
open=Decimal(str(row[1])),
high=Decimal(str(row[2])),
low=Decimal(str(row[3])),
close=Decimal(str(row[4])),
volume=Decimal(str(row[5])),
)
for row in result
]
def get_latest_candle_timestamp(self, symbol: str, timeframe: str) -> datetime | None:
"""Get the timestamp of the most recent candle.
Args:
symbol: Trading symbol
timeframe: Candle timeframe
Returns:
Timestamp of latest candle, or None if no candles exist
"""
result = self.conn.execute(
"""
SELECT MAX(timestamp) FROM candles
WHERE symbol = ? AND timeframe = ?
""",
[symbol, timeframe],
).fetchone()
return result[0] if result and result[0] else None
def insert_funding_rate(self, rate: FundingRate) -> None:
"""Insert a funding rate record.
Args:
rate: FundingRate object to insert
"""
self.conn.execute(
"""
INSERT OR REPLACE INTO funding_rates
(symbol, funding_rate, funding_time, mark_price)
VALUES (?, ?, ?, ?)
""",
[
rate.symbol,
float(rate.funding_rate),
rate.funding_time,
float(rate.mark_price),
],
)
def get_candle_count(self, symbol: str, timeframe: str) -> int:
"""Get the number of candles stored for a symbol/timeframe.
Args:
symbol: Trading symbol
timeframe: Candle timeframe
Returns:
Number of candles
"""
result = self.conn.execute(
"""
SELECT COUNT(*) FROM candles
WHERE symbol = ? AND timeframe = ?
""",
[symbol, timeframe],
).fetchone()
return result[0] if result else 0