"""Assemble and run the live trading engine.""" from __future__ import annotations import asyncio import logging import signal import time from decimal import Decimal from functools import partial import structlog from structlog import dev as structlog_dev from structlog import processors as structlog_processors from structlog import stdlib as structlog_stdlib from tradefinder.adapters.binance_usdm import BinanceUSDMAdapter from tradefinder.adapters.types import ( MarginType, Order, OrderRequest, OrderType, PositionSide, Side, ) from tradefinder.core.config import LogFormat, Settings, get_settings from tradefinder.core.regime import RegimeClassifier from tradefinder.core.risk import PortfolioRisk, RiskEngine from tradefinder.data.fetcher import DataFetcher from tradefinder.data.storage import DataStorage from tradefinder.strategies.signals import SignalType from tradefinder.strategies.supertrend import SupertrendStrategy logger = structlog.get_logger(__name__) DEFAULT_LOOP_INTERVAL = 60 FETCH_LIMIT = 512 MAX_FETCH_ATTEMPTS = 3 MAX_ORDER_ATTEMPTS = 3 RETRY_DELAY_SECONDS = 2 def _configure_logging(settings: Settings) -> None: """Set up structlog and the underlying stdlib logger.""" level = getattr(logging, settings.log_level, logging.INFO) logging.basicConfig(level=level) renderer = ( structlog_dev.ConsoleRenderer() if settings.log_format == LogFormat.CONSOLE else structlog_processors.JSONRenderer() ) structlog.configure( processors=[ structlog_stdlib.add_logger_name, structlog_stdlib.add_log_level, structlog_processors.TimeStamper(fmt="iso"), structlog_processors.StackInfoRenderer(), structlog_processors.format_exc_info, renderer, ], context_class=dict, logger_factory=structlog_stdlib.LoggerFactory(), wrapper_class=structlog_stdlib.BoundLogger, cache_logger_on_first_use=True, ) global logger logger = structlog.get_logger(__name__) async def _async_main() -> None: """Asynchronous entry point that orchestrates the trading loop.""" settings = get_settings() _configure_logging(settings) storage = DataStorage(settings.duckdb_path) storage.connect() storage.initialize_schema() adapter: BinanceUSDMAdapter | None = None fetcher: DataFetcher | None = None shutdown_event = asyncio.Event() try: _register_signal_handlers(shutdown_event) if settings.is_paper_trading: logger.info( "Paper trading enabled, skipping exchange connectivity", mode=settings.trading_mode.value, symbols=settings.symbols_list, ) else: adapter = BinanceUSDMAdapter(settings) await _initialize_adapter(adapter, settings.symbols_list) fetcher = DataFetcher(adapter, storage) strategy = SupertrendStrategy() risk_engine = RiskEngine(PortfolioRisk()) regime_classifier = RegimeClassifier() logger.info( "Starting trading loop", mode=settings.trading_mode.value, loop_interval=DEFAULT_LOOP_INTERVAL, symbols=settings.symbols_list, ) await _run_trading_loop( settings, storage, strategy, regime_classifier, risk_engine, adapter, fetcher, shutdown_event, ) except asyncio.CancelledError: logger.info("Trading loop cancelled") raise except Exception as error: logger.exception("Trading engine failed", error=str(error)) raise finally: if adapter is not None: await adapter.disconnect() storage.disconnect() logger.info("Trading engine shutdown") def main() -> None: """Sync wrapper that starts the async trading engine.""" asyncio.run(_async_main()) async def _run_trading_loop( settings: Settings, storage: DataStorage, strategy: SupertrendStrategy, regime_classifier: RegimeClassifier, risk_engine: RiskEngine, adapter: BinanceUSDMAdapter | None, fetcher: DataFetcher | None, shutdown_event: asyncio.Event, ) -> None: """Perform trading cycles until a shutdown signal is received.""" timeframe = settings.signal_timeframe while not shutdown_event.is_set(): loop_start = time.monotonic() for symbol in settings.symbols_list: if shutdown_event.is_set(): break await _process_symbol( settings, storage, strategy, regime_classifier, risk_engine, adapter, fetcher, symbol, timeframe, shutdown_event, ) if shutdown_event.is_set(): break elapsed = time.monotonic() - loop_start await _sleep_or_cancel(DEFAULT_LOOP_INTERVAL - elapsed, shutdown_event) async def _process_symbol( settings: Settings, storage: DataStorage, strategy: SupertrendStrategy, regime_classifier: RegimeClassifier, risk_engine: RiskEngine, adapter: BinanceUSDMAdapter | None, fetcher: DataFetcher | None, symbol: str, timeframe: str, shutdown_event: asyncio.Event, ) -> None: """Fetch candles, classify the regime, and execute signals for a single symbol.""" if shutdown_event.is_set(): return await _fetch_latest_candles(fetcher, symbol, timeframe) candles = storage.get_candles(symbol, timeframe, limit=FETCH_LIMIT) if not candles: logger.debug("Skipping symbol with no candles", symbol=symbol, timeframe=timeframe) return regime = regime_classifier.classify(candles) signal = strategy.generate_signal(candles) if signal is None or not signal.is_entry: logger.debug( "No actionable signal", symbol=symbol, regime=regime.name, strategy=strategy.name, ) return signal_symbol = signal.symbol or symbol try: equity = await _determine_equity(settings, adapter) except RuntimeError as error: logger.warning("Unable to determine equity", error=str(error)) return risk_pct = Decimal(str(settings.risk.per_trade_pct)) try: position_size = risk_engine.calculate_position_size( equity, signal.price, signal.stop_loss, risk_pct ) except ValueError as error: logger.warning("Position sizing failed", symbol=symbol, error=str(error)) return if position_size <= Decimal("0"): logger.warning("Computed non-positive position size", symbol=symbol) return risk_amount = risk_engine.calculate_risk_amount(position_size, signal.price, signal.stop_loss) strategy_cap_pct = Decimal(str(settings.risk.max_equity_per_strategy_pct)) total_cap_pct = Decimal("100") if not risk_engine.can_allocate_strategy( strategy.name, risk_amount, equity, max_per_strategy_pct=strategy_cap_pct, max_total_exposure_pct=total_cap_pct, ): logger.warning( "Risk limits block allocation", symbol=symbol, strategy=strategy.name, ) return if adapter is None: logger.info( "Paper trading signal", symbol=signal_symbol, signal_type=signal.signal_type.value, regime=regime.name, strategy=strategy.name, ) return entry_side, position_side = _map_signal_to_sides(signal.signal_type) entry_request = OrderRequest( symbol=signal_symbol, side=entry_side, position_side=position_side, order_type=OrderType.LIMIT, quantity=position_size, price=signal.price, ) try: entry_order = await _safe_create_order(adapter, entry_request) logger.info( "Entry order placed", order_id=entry_order.id, symbol=entry_order.symbol, side=entry_order.side.value, strategy=strategy.name, ) except Exception as error: logger.error( "Failed to place entry order", symbol=signal_symbol, error=str(error), ) return stop_side = Side.SELL if entry_side == Side.BUY else Side.BUY stop_request = OrderRequest( symbol=signal_symbol, side=stop_side, position_side=position_side, order_type=OrderType.STOP_MARKET, quantity=position_size, stop_price=signal.stop_loss, reduce_only=True, ) try: stop_order = await _safe_create_order(adapter, stop_request) logger.info( "Stop loss order placed", order_id=stop_order.id, symbol=stop_order.symbol, stop_price=stop_order.stop_price, ) except Exception as error: logger.error( "Failed to place stop loss order", symbol=signal_symbol, error=str(error), ) async def _fetch_latest_candles(fetcher: DataFetcher | None, symbol: str, timeframe: str) -> None: """Fetch new candles if possible and read the latest batch from storage.""" if fetcher is None: return last_error: Exception | None = None for attempt in range(1, MAX_FETCH_ATTEMPTS + 1): try: await fetcher.fetch_latest_candles(symbol, timeframe, limit=FETCH_LIMIT) return except Exception as error: last_error = error logger.warning( "Candle sync failed", symbol=symbol, attempt=attempt, error=str(error), ) await asyncio.sleep(RETRY_DELAY_SECONDS * attempt) if last_error is not None: logger.error("Unable to synchronize candles", symbol=symbol, error=str(last_error)) async def _safe_create_order(adapter: BinanceUSDMAdapter, request: OrderRequest) -> Order: """Retry order creation to handle transient exchange errors.""" last_error: Exception | None = None for attempt in range(1, MAX_ORDER_ATTEMPTS + 1): try: return await adapter.create_order(request) except Exception as error: last_error = error logger.warning( "Order attempt failed", symbol=request.symbol, attempt=attempt, error=str(error), ) await asyncio.sleep(RETRY_DELAY_SECONDS * attempt) logger.error( "Max retries reached while creating order", symbol=request.symbol, error=str(last_error) if last_error else "unknown", ) raise last_error or RuntimeError("Order request failed without exception details") def _map_signal_to_sides(signal_type: SignalType) -> tuple[Side, PositionSide]: """Map a signal direction to a trade side.""" if signal_type == SignalType.ENTRY_LONG: return Side.BUY, PositionSide.LONG return Side.SELL, PositionSide.SHORT async def _determine_equity(settings: Settings, adapter: BinanceUSDMAdapter | None) -> Decimal: """Return the equity that should be used for sizing positions.""" if settings.is_paper_trading: return Decimal(str(settings.paper_equity_usdt)) if adapter is None: raise RuntimeError("Adapter is required for non-paper trading") balance = await adapter.get_balance() return balance.total_equity def _register_signal_handlers(shutdown_event: asyncio.Event) -> None: """Register signal handlers that trigger a graceful shutdown.""" loop = asyncio.get_running_loop() def _handle(sig: signal.Signals) -> None: logger.info("Shutdown signal received", signal=sig.name) shutdown_event.set() def _handle_signal(signum: int, frame: object) -> None: shutdown_event.set() for sig in (signal.SIGINT, signal.SIGTERM): try: loop.add_signal_handler(sig, partial(_handle, sig)) except (NotImplementedError, RuntimeError): signal.signal(sig, _handle_signal) async def _sleep_or_cancel(delay: float, shutdown_event: asyncio.Event) -> None: """Sleep but exit early if a shutdown signal is received.""" if delay <= 0: return try: await asyncio.wait_for(shutdown_event.wait(), timeout=delay) except TimeoutError: pass async def _initialize_adapter(adapter: BinanceUSDMAdapter, symbols: list[str]) -> None: """Configure the adapter for hedge mode and isolated margin symbols.""" await adapter.connect() await adapter.configure_hedge_mode(True) for symbol in symbols: await adapter.configure_margin_type(symbol, MarginType.ISOLATED)