"""Unit tests for DataStorage (DuckDB operations).""" import tempfile from datetime import datetime, timedelta from decimal import Decimal from pathlib import Path import pytest from tradefinder.adapters.types import Candle, FundingRate from tradefinder.data.storage import DataStorage class TestDataStorageConnection: """Tests for connection lifecycle.""" def test_connect_creates_database_file(self) -> None: """Database file is created on connect.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" storage = DataStorage(db_path) storage.connect() assert db_path.exists() storage.disconnect() def test_connect_creates_parent_directories(self) -> None: """Parent directories are created if they don't exist.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "nested" / "dir" / "test.duckdb" storage = DataStorage(db_path) storage.connect() assert db_path.parent.exists() storage.disconnect() def test_disconnect_closes_connection(self) -> None: """Disconnect properly closes the connection.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" storage = DataStorage(db_path) storage.connect() storage.disconnect() assert storage._conn is None def test_context_manager_lifecycle(self) -> None: """Context manager properly opens and closes connection.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: assert storage._conn is not None assert storage._conn is None def test_conn_property_raises_when_not_connected(self) -> None: """Accessing conn property raises when not connected.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" storage = DataStorage(db_path) with pytest.raises(RuntimeError, match="Not connected"): _ = storage.conn class TestDataStorageSchema: """Tests for schema initialization.""" def test_initialize_schema_creates_tables(self) -> None: """All tables are created by initialize_schema.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() # Verify tables exist result = storage.conn.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" ).fetchall() tables = {row[0] for row in result} assert "candles" in tables assert "trades" in tables assert "funding_rates" in tables def test_initialize_schema_is_idempotent(self) -> None: """Calling initialize_schema multiple times is safe.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() storage.initialize_schema() # Should not raise class TestDataStorageCandles: """Tests for candle CRUD operations.""" def _make_candle(self, timestamp: datetime) -> Candle: """Create a test candle.""" return Candle( timestamp=timestamp, open=Decimal("50000.00"), high=Decimal("51000.00"), low=Decimal("49000.00"), close=Decimal("50500.00"), volume=Decimal("1000.50"), ) def test_insert_candles_stores_data(self) -> None: """Candles are stored correctly.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() ts = datetime(2024, 1, 1, 12, 0, 0) candles = [self._make_candle(ts)] inserted = storage.insert_candles(candles, "BTCUSDT", "1h") assert inserted == 1 def test_insert_candles_empty_list_returns_zero(self) -> None: """Inserting empty list returns 0.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() inserted = storage.insert_candles([], "BTCUSDT", "1h") assert inserted == 0 def test_insert_candles_handles_duplicates(self) -> None: """Duplicate candles are replaced (upsert behavior).""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() ts = datetime(2024, 1, 1, 12, 0, 0) candles = [self._make_candle(ts)] storage.insert_candles(candles, "BTCUSDT", "1h") storage.insert_candles(candles, "BTCUSDT", "1h") # Duplicate count = storage.get_candle_count("BTCUSDT", "1h") assert count == 1 # Not 2 def test_get_candles_retrieves_data(self) -> None: """Candles can be retrieved correctly.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() ts = datetime(2024, 1, 1, 12, 0, 0) candles = [self._make_candle(ts)] storage.insert_candles(candles, "BTCUSDT", "1h") result = storage.get_candles("BTCUSDT", "1h") assert len(result) == 1 assert result[0].open == Decimal("50000.00") assert result[0].close == Decimal("50500.00") def test_get_candles_respects_time_range(self) -> None: """Candles are filtered by start/end time.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() base_ts = datetime(2024, 1, 1, 0, 0, 0) candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(10)] storage.insert_candles(candles, "BTCUSDT", "1h") # Query middle range start = base_ts + timedelta(hours=3) end = base_ts + timedelta(hours=6) result = storage.get_candles("BTCUSDT", "1h", start=start, end=end) assert len(result) == 4 # hours 3, 4, 5, 6 def test_get_candles_respects_limit(self) -> None: """Candle count is limited correctly.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() base_ts = datetime(2024, 1, 1, 0, 0, 0) candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(10)] storage.insert_candles(candles, "BTCUSDT", "1h") result = storage.get_candles("BTCUSDT", "1h", limit=5) assert len(result) == 5 def test_get_candles_returns_ascending_order(self) -> None: """Candles are returned oldest first.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() base_ts = datetime(2024, 1, 1, 0, 0, 0) # Insert in reverse order candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(5)] storage.insert_candles(candles[::-1], "BTCUSDT", "1h") result = storage.get_candles("BTCUSDT", "1h") timestamps = [c.timestamp for c in result] assert timestamps == sorted(timestamps) def test_get_candles_filters_by_symbol(self) -> None: """Candles are filtered by symbol.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() ts = datetime(2024, 1, 1, 12, 0, 0) candles = [self._make_candle(ts)] storage.insert_candles(candles, "BTCUSDT", "1h") storage.insert_candles(candles, "ETHUSDT", "1h") btc_result = storage.get_candles("BTCUSDT", "1h") eth_result = storage.get_candles("ETHUSDT", "1h") assert len(btc_result) == 1 assert len(eth_result) == 1 def test_get_latest_candle_timestamp(self) -> None: """Latest timestamp is returned correctly.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() base_ts = datetime(2024, 1, 1, 0, 0, 0) candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(5)] storage.insert_candles(candles, "BTCUSDT", "1h") latest = storage.get_latest_candle_timestamp("BTCUSDT", "1h") assert latest == base_ts + timedelta(hours=4) def test_get_latest_candle_timestamp_returns_none_when_empty(self) -> None: """None is returned when no candles exist.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() latest = storage.get_latest_candle_timestamp("BTCUSDT", "1h") assert latest is None def test_get_candle_count(self) -> None: """Candle count is accurate.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() base_ts = datetime(2024, 1, 1, 0, 0, 0) candles = [self._make_candle(base_ts + timedelta(hours=i)) for i in range(7)] storage.insert_candles(candles, "BTCUSDT", "1h") count = storage.get_candle_count("BTCUSDT", "1h") assert count == 7 class TestDataStorageFundingRates: """Tests for funding rate operations.""" def test_insert_funding_rate(self) -> None: """Funding rate is stored correctly.""" with tempfile.TemporaryDirectory() as tmpdir: db_path = Path(tmpdir) / "test.duckdb" with DataStorage(db_path) as storage: storage.initialize_schema() rate = FundingRate( symbol="BTCUSDT", funding_rate=Decimal("0.0001"), funding_time=datetime(2024, 1, 1, 8, 0, 0), mark_price=Decimal("50000.00"), ) storage.insert_funding_rate(rate) # Verify stored result = storage.conn.execute( "SELECT symbol, funding_rate FROM funding_rates" ).fetchone() assert result is not None assert result[0] == "BTCUSDT"