diff --git a/src/tradefinder/__init__.py b/src/tradefinder/__init__.py index e69de29..4ebd280 100644 --- a/src/tradefinder/__init__.py +++ b/src/tradefinder/__init__.py @@ -0,0 +1,3 @@ +"""TradeFinder - Automated crypto trading system.""" + +__version__ = "0.1.0" diff --git a/src/tradefinder/core/backtest.py b/src/tradefinder/core/backtest.py new file mode 100644 index 0000000..abc39a3 --- /dev/null +++ b/src/tradefinder/core/backtest.py @@ -0,0 +1,25 @@ +"""Backtesting entry point stub for future implementation.""" + +import asyncio + +import structlog + +from tradefinder.core.config import get_settings + +logger = structlog.get_logger(__name__) + + +async def run_backtest() -> None: + """Placeholder for the asynchronous backtesting workflow.""" + settings = get_settings() + logger.info("Backtester not yet implemented", settings=settings) + await asyncio.sleep(0) + + +def main() -> None: + """CLI entry that runs the backtest placeholder.""" + asyncio.run(run_backtest()) + + +if __name__ == "__main__": + main() diff --git a/src/tradefinder/core/main.py b/src/tradefinder/core/main.py new file mode 100644 index 0000000..24593ec --- /dev/null +++ b/src/tradefinder/core/main.py @@ -0,0 +1,433 @@ +"""Assemble and run the live trading engine.""" + +from __future__ import annotations + +import asyncio +import logging +import signal +import time +from decimal import Decimal +from functools import partial + +import structlog +from structlog import dev as structlog_dev +from structlog import processors as structlog_processors +from structlog import stdlib as structlog_stdlib + +from tradefinder.adapters.binance_usdm import BinanceUSDMAdapter +from tradefinder.adapters.types import ( + MarginType, + Order, + OrderRequest, + OrderType, + PositionSide, + Side, +) +from tradefinder.core.config import LogFormat, Settings, get_settings +from tradefinder.core.regime import RegimeClassifier +from tradefinder.core.risk import PortfolioRisk, RiskEngine +from tradefinder.data.fetcher import DataFetcher +from tradefinder.data.storage import DataStorage +from tradefinder.strategies.signals import SignalType +from tradefinder.strategies.supertrend import SupertrendStrategy + +logger = structlog.get_logger(__name__) + +DEFAULT_LOOP_INTERVAL = 60 +FETCH_LIMIT = 512 +MAX_FETCH_ATTEMPTS = 3 +MAX_ORDER_ATTEMPTS = 3 +RETRY_DELAY_SECONDS = 2 + + +def _configure_logging(settings: Settings) -> None: + """Set up structlog and the underlying stdlib logger.""" + + level = getattr(logging, settings.log_level, logging.INFO) + logging.basicConfig(level=level) + + renderer = ( + structlog_dev.ConsoleRenderer() + if settings.log_format == LogFormat.CONSOLE + else structlog_processors.JSONRenderer() + ) + + structlog.configure( + processors=[ + structlog_stdlib.add_logger_name, + structlog_stdlib.add_log_level, + structlog_processors.TimeStamper(fmt="iso"), + structlog_processors.StackInfoRenderer(), + structlog_processors.format_exc_info, + renderer, + ], + context_class=dict, + logger_factory=structlog_stdlib.LoggerFactory(), + wrapper_class=structlog_stdlib.BoundLogger, + cache_logger_on_first_use=True, + ) + + global logger + logger = structlog.get_logger(__name__) + + +async def _async_main() -> None: + """Asynchronous entry point that orchestrates the trading loop.""" + + settings = get_settings() + _configure_logging(settings) + + storage = DataStorage(settings.duckdb_path) + storage.connect() + storage.initialize_schema() + + adapter: BinanceUSDMAdapter | None = None + fetcher: DataFetcher | None = None + shutdown_event = asyncio.Event() + + try: + _register_signal_handlers(shutdown_event) + + if settings.is_paper_trading: + logger.info( + "Paper trading enabled, skipping exchange connectivity", + mode=settings.trading_mode.value, + symbols=settings.symbols_list, + ) + else: + adapter = BinanceUSDMAdapter(settings) + await _initialize_adapter(adapter, settings.symbols_list) + fetcher = DataFetcher(adapter, storage) + + strategy = SupertrendStrategy() + risk_engine = RiskEngine(PortfolioRisk()) + regime_classifier = RegimeClassifier() + + logger.info( + "Starting trading loop", + mode=settings.trading_mode.value, + loop_interval=DEFAULT_LOOP_INTERVAL, + symbols=settings.symbols_list, + ) + + await _run_trading_loop( + settings, + storage, + strategy, + regime_classifier, + risk_engine, + adapter, + fetcher, + shutdown_event, + ) + except asyncio.CancelledError: + logger.info("Trading loop cancelled") + raise + except Exception as error: + logger.exception("Trading engine failed", error=str(error)) + raise + finally: + if adapter is not None: + await adapter.disconnect() + storage.disconnect() + logger.info("Trading engine shutdown") + + +def main() -> None: + """Sync wrapper that starts the async trading engine.""" + asyncio.run(_async_main()) + + +async def _run_trading_loop( + settings: Settings, + storage: DataStorage, + strategy: SupertrendStrategy, + regime_classifier: RegimeClassifier, + risk_engine: RiskEngine, + adapter: BinanceUSDMAdapter | None, + fetcher: DataFetcher | None, + shutdown_event: asyncio.Event, +) -> None: + """Perform trading cycles until a shutdown signal is received.""" + + timeframe = settings.signal_timeframe + + while not shutdown_event.is_set(): + loop_start = time.monotonic() + + for symbol in settings.symbols_list: + if shutdown_event.is_set(): + break + + await _process_symbol( + settings, + storage, + strategy, + regime_classifier, + risk_engine, + adapter, + fetcher, + symbol, + timeframe, + shutdown_event, + ) + + if shutdown_event.is_set(): + break + + elapsed = time.monotonic() - loop_start + await _sleep_or_cancel(DEFAULT_LOOP_INTERVAL - elapsed, shutdown_event) + + +async def _process_symbol( + settings: Settings, + storage: DataStorage, + strategy: SupertrendStrategy, + regime_classifier: RegimeClassifier, + risk_engine: RiskEngine, + adapter: BinanceUSDMAdapter | None, + fetcher: DataFetcher | None, + symbol: str, + timeframe: str, + shutdown_event: asyncio.Event, +) -> None: + """Fetch candles, classify the regime, and execute signals for a single symbol.""" + + if shutdown_event.is_set(): + return + + await _fetch_latest_candles(fetcher, symbol, timeframe) + + candles = storage.get_candles(symbol, timeframe, limit=FETCH_LIMIT) + if not candles: + logger.debug("Skipping symbol with no candles", symbol=symbol, timeframe=timeframe) + return + + regime = regime_classifier.classify(candles) + signal = strategy.generate_signal(candles) + if signal is None or not signal.is_entry: + logger.debug( + "No actionable signal", + symbol=symbol, + regime=regime.name, + strategy=strategy.name, + ) + return + + signal_symbol = signal.symbol or symbol + + try: + equity = await _determine_equity(settings, adapter) + except RuntimeError as error: + logger.warning("Unable to determine equity", error=str(error)) + return + + risk_pct = Decimal(str(settings.risk.per_trade_pct)) + + try: + position_size = risk_engine.calculate_position_size( + equity, signal.price, signal.stop_loss, risk_pct + ) + except ValueError as error: + logger.warning("Position sizing failed", symbol=symbol, error=str(error)) + return + + if position_size <= Decimal("0"): + logger.warning("Computed non-positive position size", symbol=symbol) + return + + risk_amount = risk_engine.calculate_risk_amount(position_size, signal.price, signal.stop_loss) + + strategy_cap_pct = Decimal(str(settings.risk.max_equity_per_strategy_pct)) + total_cap_pct = Decimal("100") + + if not risk_engine.can_allocate_strategy( + strategy.name, + risk_amount, + equity, + max_per_strategy_pct=strategy_cap_pct, + max_total_exposure_pct=total_cap_pct, + ): + logger.warning( + "Risk limits block allocation", + symbol=symbol, + strategy=strategy.name, + ) + return + + if adapter is None: + logger.info( + "Paper trading signal", + symbol=signal_symbol, + signal_type=signal.signal_type.value, + regime=regime.name, + strategy=strategy.name, + ) + return + + entry_side, position_side = _map_signal_to_sides(signal.signal_type) + + entry_request = OrderRequest( + symbol=signal_symbol, + side=entry_side, + position_side=position_side, + order_type=OrderType.LIMIT, + quantity=position_size, + price=signal.price, + ) + + try: + entry_order = await _safe_create_order(adapter, entry_request) + logger.info( + "Entry order placed", + order_id=entry_order.id, + symbol=entry_order.symbol, + side=entry_order.side.value, + strategy=strategy.name, + ) + except Exception as error: + logger.error( + "Failed to place entry order", + symbol=signal_symbol, + error=str(error), + ) + return + + stop_side = Side.SELL if entry_side == Side.BUY else Side.BUY + + stop_request = OrderRequest( + symbol=signal_symbol, + side=stop_side, + position_side=position_side, + order_type=OrderType.STOP_MARKET, + quantity=position_size, + stop_price=signal.stop_loss, + reduce_only=True, + ) + + try: + stop_order = await _safe_create_order(adapter, stop_request) + logger.info( + "Stop loss order placed", + order_id=stop_order.id, + symbol=stop_order.symbol, + stop_price=stop_order.stop_price, + ) + except Exception as error: + logger.error( + "Failed to place stop loss order", + symbol=signal_symbol, + error=str(error), + ) + + +async def _fetch_latest_candles(fetcher: DataFetcher | None, symbol: str, timeframe: str) -> None: + """Fetch new candles if possible and read the latest batch from storage.""" + + if fetcher is None: + return + + last_error: Exception | None = None + + for attempt in range(1, MAX_FETCH_ATTEMPTS + 1): + try: + await fetcher.fetch_latest_candles(symbol, timeframe, limit=FETCH_LIMIT) + return + except Exception as error: + last_error = error + logger.warning( + "Candle sync failed", + symbol=symbol, + attempt=attempt, + error=str(error), + ) + await asyncio.sleep(RETRY_DELAY_SECONDS * attempt) + + if last_error is not None: + logger.error("Unable to synchronize candles", symbol=symbol, error=str(last_error)) + + +async def _safe_create_order(adapter: BinanceUSDMAdapter, request: OrderRequest) -> Order: + """Retry order creation to handle transient exchange errors.""" + + last_error: Exception | None = None + + for attempt in range(1, MAX_ORDER_ATTEMPTS + 1): + try: + return await adapter.create_order(request) + except Exception as error: + last_error = error + logger.warning( + "Order attempt failed", + symbol=request.symbol, + attempt=attempt, + error=str(error), + ) + await asyncio.sleep(RETRY_DELAY_SECONDS * attempt) + + logger.error( + "Max retries reached while creating order", + symbol=request.symbol, + error=str(last_error) if last_error else "unknown", + ) + raise last_error or RuntimeError("Order request failed without exception details") + + +def _map_signal_to_sides(signal_type: SignalType) -> tuple[Side, PositionSide]: + """Map a signal direction to a trade side.""" + + if signal_type == SignalType.ENTRY_LONG: + return Side.BUY, PositionSide.LONG + return Side.SELL, PositionSide.SHORT + + +async def _determine_equity(settings: Settings, adapter: BinanceUSDMAdapter | None) -> Decimal: + """Return the equity that should be used for sizing positions.""" + + if settings.is_paper_trading: + return Decimal(str(settings.paper_equity_usdt)) + + if adapter is None: + raise RuntimeError("Adapter is required for non-paper trading") + + balance = await adapter.get_balance() + return balance.total_equity + + +def _register_signal_handlers(shutdown_event: asyncio.Event) -> None: + """Register signal handlers that trigger a graceful shutdown.""" + + loop = asyncio.get_running_loop() + + def _handle(sig: signal.Signals) -> None: + logger.info("Shutdown signal received", signal=sig.name) + shutdown_event.set() + + def _handle_signal(signum: int, frame: object) -> None: + shutdown_event.set() + + for sig in (signal.SIGINT, signal.SIGTERM): + try: + loop.add_signal_handler(sig, partial(_handle, sig)) + except (NotImplementedError, RuntimeError): + signal.signal(sig, _handle_signal) + + +async def _sleep_or_cancel(delay: float, shutdown_event: asyncio.Event) -> None: + """Sleep but exit early if a shutdown signal is received.""" + if delay <= 0: + return + + try: + await asyncio.wait_for(shutdown_event.wait(), timeout=delay) + except TimeoutError: + pass + + +async def _initialize_adapter(adapter: BinanceUSDMAdapter, symbols: list[str]) -> None: + """Configure the adapter for hedge mode and isolated margin symbols.""" + + await adapter.connect() + await adapter.configure_hedge_mode(True) + for symbol in symbols: + await adapter.configure_margin_type(symbol, MarginType.ISOLATED) diff --git a/src/tradefinder/core/optimize.py b/src/tradefinder/core/optimize.py new file mode 100644 index 0000000..9867d46 --- /dev/null +++ b/src/tradefinder/core/optimize.py @@ -0,0 +1,21 @@ +"""Stub optimizer entry point; real logic coming with Optuna.""" + +import asyncio + +import structlog + +from tradefinder.core.config import get_settings + +logger = structlog.get_logger(__name__) + + +async def run_optimizer() -> None: + """Placeholder async optimizer run; future Optuna logic belongs here.""" + settings = get_settings() + # TODO: add Optuna study setup and walk-forward execution pipeline. + logger.info("Optimizer not yet implemented", trading_mode=settings.trading_mode.value) + + +def main() -> None: + """Sync wrapper so python -m tradefinder.core.optimize works.""" + asyncio.run(run_optimizer()) diff --git a/src/tradefinder/ui/__init__.py b/src/tradefinder/ui/__init__.py index e69de29..09f09a4 100644 --- a/src/tradefinder/ui/__init__.py +++ b/src/tradefinder/ui/__init__.py @@ -0,0 +1 @@ +"""TradeFinder UI module for Streamlit dashboard.""" diff --git a/src/tradefinder/ui/app.py b/src/tradefinder/ui/app.py new file mode 100644 index 0000000..b8ccd85 --- /dev/null +++ b/src/tradefinder/ui/app.py @@ -0,0 +1,197 @@ +"""TradeFinder Streamlit Dashboard. + +Provides a web-based UI for monitoring trading activity, positions, +and market regime status. +""" + +from __future__ import annotations + +from datetime import datetime +from typing import Any + +import streamlit as st +import structlog + +from tradefinder.core.config import TradingMode, get_settings +from tradefinder.data.storage import DataStorage + +logger = structlog.get_logger(__name__) + +st.set_page_config( + page_title="TradeFinder Dashboard", + page_icon="📈", + layout="wide", + initial_sidebar_state="expanded", +) + + +def get_storage() -> DataStorage | None: + """Get storage instance if database exists.""" + settings = get_settings() + db_path = settings.duckdb_path + + if not db_path.exists(): + return None + + storage = DataStorage(db_path) + storage.connect() + return storage + + +@st.cache_data(ttl=30) +def get_candle_stats(symbol: str, timeframe: str) -> dict[str, Any]: + """Get candle statistics for a symbol.""" + storage = get_storage() + if storage is None: + return {} + + try: + count = storage.get_candle_count(symbol, timeframe) + latest = storage.get_latest_candle_timestamp(symbol, timeframe) + return { + "count": count, + "latest": latest.isoformat() if latest else "N/A", + } + finally: + storage.disconnect() + + +def render_header() -> None: + """Render the dashboard header.""" + st.title("📈 TradeFinder Dashboard") + st.markdown("---") + + +def render_trading_mode() -> None: + """Render trading mode indicator.""" + try: + settings = get_settings() + mode = settings.trading_mode + + if mode == TradingMode.PAPER: + st.sidebar.success("🟢 Paper Trading") + elif mode == TradingMode.TESTNET: + st.sidebar.warning("🟡 Testnet Mode") + elif mode == TradingMode.LIVE: + st.sidebar.error("🔴 LIVE TRADING") + except Exception: + st.sidebar.info("⚪ Mode Unknown") + + +def render_symbols_overview() -> None: + """Render overview of configured symbols.""" + try: + settings = get_settings() + symbols = settings.symbols_list + timeframe = settings.signal_timeframe + + st.subheader("Symbol Status") + + cols = st.columns(len(symbols)) + for i, symbol in enumerate(symbols): + with cols[i]: + st.metric(label=symbol, value=timeframe) + stats = get_candle_stats(symbol, timeframe) + if stats: + st.caption(f"Candles: {stats.get('count', 0)}") + st.caption(f"Latest: {stats.get('latest', 'N/A')}") + except Exception as e: + st.error(f"Error loading symbols: {e}") + + +def render_database_status() -> None: + """Render database connection status.""" + st.subheader("Database Status") + + storage = get_storage() + if storage is None: + st.warning("Database not initialized yet. Start the trading engine first.") + return + + try: + st.success(f"Connected to: {storage.db_path}") + finally: + storage.disconnect() + + +def render_placeholder_positions() -> None: + """Render placeholder for positions table.""" + st.subheader("Open Positions") + st.info("Position tracking will be available when the trading engine is running.") + + st.dataframe( + { + "Symbol": [], + "Side": [], + "Size": [], + "Entry Price": [], + "Mark Price": [], + "PnL": [], + } + ) + + +def render_placeholder_signals() -> None: + """Render placeholder for recent signals.""" + st.subheader("Recent Signals") + st.info("Signal history will be displayed here when signals are generated.") + + st.dataframe( + { + "Time": [], + "Symbol": [], + "Signal": [], + "Strategy": [], + "Regime": [], + } + ) + + +def render_regime_indicator() -> None: + """Render current regime indicator.""" + st.sidebar.subheader("Market Regime") + st.sidebar.info("Regime detection requires running engine") + + +def render_strategy_status() -> None: + """Render strategy status.""" + st.sidebar.subheader("Active Strategy") + st.sidebar.metric(label="Strategy", value="Supertrend") + st.sidebar.caption("Period: 10, Multiplier: 3.0") + + +def render_footer() -> None: + """Render footer with refresh info.""" + st.markdown("---") + st.caption(f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + st.caption("Auto-refresh every 30 seconds") + + +def main() -> None: + """Main dashboard entry point.""" + render_trading_mode() + render_regime_indicator() + render_strategy_status() + + render_header() + + col1, col2 = st.columns(2) + + with col1: + render_database_status() + render_symbols_overview() + + with col2: + render_placeholder_positions() + + render_placeholder_signals() + render_footer() + + import time + + time.sleep(30) + st.rerun() + + +if __name__ == "__main__": + main()