- Add DataStorage class for DuckDB-based market data persistence - Add DataFetcher for historical candle backfill and sync operations - Add BinanceSpotAdapter for spot wallet balance queries - Add binance_spot_base_url to Settings for spot testnet support - Add comprehensive unit tests (50 new tests, 82 total) - Coverage increased from 62% to 86%
294 lines
12 KiB
Python
294 lines
12 KiB
Python
"""Unit tests for DataStorage (DuckDB operations)."""
|
|
|
|
import tempfile
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
from tradefinder.adapters.types import Candle, FundingRate
|
|
from tradefinder.data.storage import DataStorage
|
|
|
|
|
|
class TestDataStorageConnection:
|
|
"""Tests for connection lifecycle."""
|
|
|
|
def test_connect_creates_database_file(self) -> None:
|
|
"""Database file is created on connect."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
storage = DataStorage(db_path)
|
|
|
|
storage.connect()
|
|
assert db_path.exists()
|
|
storage.disconnect()
|
|
|
|
def test_connect_creates_parent_directories(self) -> None:
|
|
"""Parent directories are created if they don't exist."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "nested" / "dir" / "test.duckdb"
|
|
storage = DataStorage(db_path)
|
|
|
|
storage.connect()
|
|
assert db_path.parent.exists()
|
|
storage.disconnect()
|
|
|
|
def test_disconnect_closes_connection(self) -> None:
|
|
"""Disconnect properly closes the connection."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
storage = DataStorage(db_path)
|
|
|
|
storage.connect()
|
|
storage.disconnect()
|
|
assert storage._conn is None
|
|
|
|
def test_context_manager_lifecycle(self) -> None:
|
|
"""Context manager properly opens and closes connection."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
assert storage._conn is not None
|
|
assert storage._conn is None
|
|
|
|
def test_conn_property_raises_when_not_connected(self) -> None:
|
|
"""Accessing conn property raises when not connected."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
storage = DataStorage(db_path)
|
|
|
|
with pytest.raises(RuntimeError, match="Not connected"):
|
|
_ = storage.conn
|
|
|
|
|
|
class TestDataStorageSchema:
|
|
"""Tests for schema initialization."""
|
|
|
|
def test_initialize_schema_creates_tables(self) -> None:
|
|
"""All tables are created by initialize_schema."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
# Verify tables exist
|
|
result = storage.conn.execute(
|
|
"SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
|
|
).fetchall()
|
|
tables = {row[0] for row in result}
|
|
|
|
assert "candles" in tables
|
|
assert "trades" in tables
|
|
assert "funding_rates" in tables
|
|
|
|
def test_initialize_schema_is_idempotent(self) -> None:
|
|
"""Calling initialize_schema multiple times is safe."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
storage.initialize_schema() # Should not raise
|
|
|
|
|
|
class TestDataStorageCandles:
|
|
"""Tests for candle CRUD operations."""
|
|
|
|
def _make_candle(self, timestamp: datetime) -> Candle:
|
|
"""Create a test candle."""
|
|
return Candle(
|
|
timestamp=timestamp,
|
|
open=Decimal("50000.00"),
|
|
high=Decimal("51000.00"),
|
|
low=Decimal("49000.00"),
|
|
close=Decimal("50500.00"),
|
|
volume=Decimal("1000.50"),
|
|
)
|
|
|
|
def test_insert_candles_stores_data(self) -> None:
|
|
"""Candles are stored correctly."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
ts = datetime(2024, 1, 1, 12, 0, 0)
|
|
candles = [self._make_candle(ts)]
|
|
|
|
inserted = storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
assert inserted == 1
|
|
|
|
def test_insert_candles_empty_list_returns_zero(self) -> None:
|
|
"""Inserting empty list returns 0."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
inserted = storage.insert_candles([], "BTCUSDT", "1h")
|
|
assert inserted == 0
|
|
|
|
def test_insert_candles_handles_duplicates(self) -> None:
|
|
"""Duplicate candles are replaced (upsert behavior)."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
ts = datetime(2024, 1, 1, 12, 0, 0)
|
|
candles = [self._make_candle(ts)]
|
|
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
storage.insert_candles(candles, "BTCUSDT", "1h") # Duplicate
|
|
|
|
count = storage.get_candle_count("BTCUSDT", "1h")
|
|
assert count == 1 # Not 2
|
|
|
|
def test_get_candles_retrieves_data(self) -> None:
|
|
"""Candles can be retrieved correctly."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
ts = datetime(2024, 1, 1, 12, 0, 0)
|
|
candles = [self._make_candle(ts)]
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
|
|
result = storage.get_candles("BTCUSDT", "1h")
|
|
assert len(result) == 1
|
|
assert result[0].open == Decimal("50000.00")
|
|
assert result[0].close == Decimal("50500.00")
|
|
|
|
def test_get_candles_respects_time_range(self) -> None:
|
|
"""Candles are filtered by start/end time."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
base_ts = datetime(2024, 1, 1, 0, 0, 0)
|
|
candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(10)]
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
|
|
# Query middle range
|
|
start = base_ts + timedelta(hours=3)
|
|
end = base_ts + timedelta(hours=6)
|
|
result = storage.get_candles("BTCUSDT", "1h", start=start, end=end)
|
|
|
|
assert len(result) == 4 # hours 3, 4, 5, 6
|
|
|
|
def test_get_candles_respects_limit(self) -> None:
|
|
"""Candle count is limited correctly."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
base_ts = datetime(2024, 1, 1, 0, 0, 0)
|
|
candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(10)]
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
|
|
result = storage.get_candles("BTCUSDT", "1h", limit=5)
|
|
assert len(result) == 5
|
|
|
|
def test_get_candles_returns_ascending_order(self) -> None:
|
|
"""Candles are returned oldest first."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
base_ts = datetime(2024, 1, 1, 0, 0, 0)
|
|
# Insert in reverse order
|
|
candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(5)]
|
|
storage.insert_candles(candles[::-1], "BTCUSDT", "1h")
|
|
|
|
result = storage.get_candles("BTCUSDT", "1h")
|
|
timestamps = [c.timestamp for c in result]
|
|
assert timestamps == sorted(timestamps)
|
|
|
|
def test_get_candles_filters_by_symbol(self) -> None:
|
|
"""Candles are filtered by symbol."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
ts = datetime(2024, 1, 1, 12, 0, 0)
|
|
candles = [self._make_candle(ts)]
|
|
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
storage.insert_candles(candles, "ETHUSDT", "1h")
|
|
|
|
btc_result = storage.get_candles("BTCUSDT", "1h")
|
|
eth_result = storage.get_candles("ETHUSDT", "1h")
|
|
|
|
assert len(btc_result) == 1
|
|
assert len(eth_result) == 1
|
|
|
|
def test_get_latest_candle_timestamp(self) -> None:
|
|
"""Latest timestamp is returned correctly."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
base_ts = datetime(2024, 1, 1, 0, 0, 0)
|
|
candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(5)]
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
|
|
latest = storage.get_latest_candle_timestamp("BTCUSDT", "1h")
|
|
assert latest == base_ts + timedelta(hours=4)
|
|
|
|
def test_get_latest_candle_timestamp_returns_none_when_empty(self) -> None:
|
|
"""None is returned when no candles exist."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
latest = storage.get_latest_candle_timestamp("BTCUSDT", "1h")
|
|
assert latest is None
|
|
|
|
def test_get_candle_count(self) -> None:
|
|
"""Candle count is accurate."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
base_ts = datetime(2024, 1, 1, 0, 0, 0)
|
|
candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(7)]
|
|
storage.insert_candles(candles, "BTCUSDT", "1h")
|
|
|
|
count = storage.get_candle_count("BTCUSDT", "1h")
|
|
assert count == 7
|
|
|
|
|
|
class TestDataStorageFundingRates:
|
|
"""Tests for funding rate operations."""
|
|
|
|
def test_insert_funding_rate(self) -> None:
|
|
"""Funding rate is stored correctly."""
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
db_path = Path(tmpdir) / "test.duckdb"
|
|
with DataStorage(db_path) as storage:
|
|
storage.initialize_schema()
|
|
|
|
rate = FundingRate(
|
|
symbol="BTCUSDT",
|
|
funding_rate=Decimal("0.0001"),
|
|
funding_time=datetime(2024, 1, 1, 8, 0, 0),
|
|
mark_price=Decimal("50000.00"),
|
|
)
|
|
|
|
storage.insert_funding_rate(rate)
|
|
|
|
# Verify stored
|
|
result = storage.conn.execute(
|
|
"SELECT symbol, funding_rate FROM funding_rates"
|
|
).fetchone()
|
|
assert result is not None
|
|
assert result[0] == "BTCUSDT"
|