Compare commits

..

5 Commits

Author SHA1 Message Date
bnair123
49ea34041e Fix CI/CD: remove GHA cache, split build jobs
All checks were successful
CI/CD Pipeline / test (push) Successful in 1m45s
CI/CD Pipeline / build-engine (push) Successful in 2m56s
CI/CD Pipeline / build-ui (push) Successful in 2m57s
- Remove cache-from/cache-to that caused timeout errors on Gitea
- Split engine and UI builds into parallel jobs
- Both images now build independently (one failure won't block the other)
2025-12-27 21:26:47 +04:00
bnair123
42feef50c6 Add trading engine, UI dashboard, and entry point stubs
Some checks failed
CI/CD Pipeline / test (push) Successful in 1m49s
CI/CD Pipeline / build-and-push (push) Failing after 6m31s
- Add main.py: Async trading engine with regime detection, Supertrend
  strategy, position sizing, and order execution with retry logic
- Add app.py: Streamlit dashboard for monitoring (placeholder UI)
- Add backtest.py: Stub entry point for tf-backtest CLI
- Add optimize.py: Stub entry point for tf-optimize CLI
- Update package __init__.py files with version and docstrings

All quality checks pass:
- ruff check: 0 errors
- ruff format: formatted
- mypy src/: 0 errors (strict)
- pytest: 191 passed, 68% coverage
2025-12-27 21:14:08 +04:00
bnair123
30af8e7c70 Format code with ruff to pass CI format check
Some checks failed
CI/CD Pipeline / build-and-push (push) Has been cancelled
CI/CD Pipeline / test (push) Has been cancelled
2025-12-27 19:12:19 +04:00
bnair123
524e6d7498 Fix lint: move imports before pytestmark in test_streamer.py
Some checks failed
CI/CD Pipeline / test (push) Failing after 1m21s
CI/CD Pipeline / build-and-push (push) Has been skipped
2025-12-27 19:09:11 +04:00
bnair123
02c0995d26 Fix CI/CD: use public git.thefetagroup.com URL
Some checks failed
CI/CD Pipeline / test (push) Failing after 2m7s
CI/CD Pipeline / build-and-push (push) Has been skipped
2025-12-27 19:05:20 +04:00
12 changed files with 738 additions and 17 deletions

View File

@@ -10,8 +10,7 @@ on:
env: env:
REGISTRY: gitea.thefetagroup.com REGISTRY: gitea.thefetagroup.com
IMAGE_NAME: bnair/cryptobot IMAGE_NAME: bnair/cryptobot
# Internal Docker network URL for git operations GITEA_URL: https://git.thefetagroup.com
GITEA_INTERNAL_URL: http://gitea:3000
jobs: jobs:
test: test:
@@ -19,7 +18,7 @@ jobs:
steps: steps:
- name: Checkout code - name: Checkout code
run: | run: |
git clone --depth 1 ${{ env.GITEA_INTERNAL_URL }}/bnair/CryptoTrading.git . git clone --depth 1 ${{ env.GITEA_URL }}/bnair/CryptoTrading.git .
git fetch origin ${{ github.sha }} --depth 1 git fetch origin ${{ github.sha }} --depth 1
git checkout ${{ github.sha }} git checkout ${{ github.sha }}
@@ -54,14 +53,14 @@ jobs:
- name: Run tests - name: Run tests
run: pytest --cov=tradefinder --cov-report=term-missing run: pytest --cov=tradefinder --cov-report=term-missing
build-and-push: build-engine:
runs-on: ubuntu-latest runs-on: ubuntu-latest
needs: test needs: test
if: startsWith(github.ref, 'refs/tags/v') if: startsWith(github.ref, 'refs/tags/v')
steps: steps:
- name: Checkout code - name: Checkout code
run: | run: |
git clone --depth 1 ${{ env.GITEA_INTERNAL_URL }}/bnair/CryptoTrading.git . git clone --depth 1 ${{ env.GITEA_URL }}/bnair/CryptoTrading.git .
git fetch origin ${{ github.ref_name }} --depth 1 git fetch origin ${{ github.ref_name }} --depth 1
git checkout ${{ github.ref_name }} git checkout ${{ github.ref_name }}
@@ -89,8 +88,31 @@ jobs:
tags: | tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.VERSION }} ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.VERSION }}
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
cache-from: type=gha
cache-to: type=gha,mode=max build-ui:
runs-on: ubuntu-latest
needs: test
if: startsWith(github.ref, 'refs/tags/v')
steps:
- name: Checkout code
run: |
git clone --depth 1 ${{ env.GITEA_URL }}/bnair/CryptoTrading.git .
git fetch origin ${{ github.ref_name }} --depth 1
git checkout ${{ github.ref_name }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Extract version from tag
id: version
run: echo "VERSION=${{ github.ref_name }}" >> $GITHUB_OUTPUT
- name: Login to Gitea Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: Build and push UI image - name: Build and push UI image
uses: docker/build-push-action@v5 uses: docker/build-push-action@v5
@@ -102,5 +124,3 @@ jobs:
tags: | tags: |
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ui:${{ steps.version.outputs.VERSION }} ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ui:${{ steps.version.outputs.VERSION }}
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ui:latest ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}-ui:latest
cache-from: type=gha
cache-to: type=gha,mode=max

0
.todo Normal file
View File

View File

@@ -0,0 +1,3 @@
"""TradeFinder - Automated crypto trading system."""
__version__ = "0.1.0"

View File

@@ -0,0 +1,25 @@
"""Backtesting entry point stub for future implementation."""
import asyncio
import structlog
from tradefinder.core.config import get_settings
logger = structlog.get_logger(__name__)
async def run_backtest() -> None:
"""Placeholder for the asynchronous backtesting workflow."""
settings = get_settings()
logger.info("Backtester not yet implemented", settings=settings)
await asyncio.sleep(0)
def main() -> None:
"""CLI entry that runs the backtest placeholder."""
asyncio.run(run_backtest())
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,433 @@
"""Assemble and run the live trading engine."""
from __future__ import annotations
import asyncio
import logging
import signal
import time
from decimal import Decimal
from functools import partial
import structlog
from structlog import dev as structlog_dev
from structlog import processors as structlog_processors
from structlog import stdlib as structlog_stdlib
from tradefinder.adapters.binance_usdm import BinanceUSDMAdapter
from tradefinder.adapters.types import (
MarginType,
Order,
OrderRequest,
OrderType,
PositionSide,
Side,
)
from tradefinder.core.config import LogFormat, Settings, get_settings
from tradefinder.core.regime import RegimeClassifier
from tradefinder.core.risk import PortfolioRisk, RiskEngine
from tradefinder.data.fetcher import DataFetcher
from tradefinder.data.storage import DataStorage
from tradefinder.strategies.signals import SignalType
from tradefinder.strategies.supertrend import SupertrendStrategy
logger = structlog.get_logger(__name__)
DEFAULT_LOOP_INTERVAL = 60
FETCH_LIMIT = 512
MAX_FETCH_ATTEMPTS = 3
MAX_ORDER_ATTEMPTS = 3
RETRY_DELAY_SECONDS = 2
def _configure_logging(settings: Settings) -> None:
"""Set up structlog and the underlying stdlib logger."""
level = getattr(logging, settings.log_level, logging.INFO)
logging.basicConfig(level=level)
renderer = (
structlog_dev.ConsoleRenderer()
if settings.log_format == LogFormat.CONSOLE
else structlog_processors.JSONRenderer()
)
structlog.configure(
processors=[
structlog_stdlib.add_logger_name,
structlog_stdlib.add_log_level,
structlog_processors.TimeStamper(fmt="iso"),
structlog_processors.StackInfoRenderer(),
structlog_processors.format_exc_info,
renderer,
],
context_class=dict,
logger_factory=structlog_stdlib.LoggerFactory(),
wrapper_class=structlog_stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
global logger
logger = structlog.get_logger(__name__)
async def _async_main() -> None:
"""Asynchronous entry point that orchestrates the trading loop."""
settings = get_settings()
_configure_logging(settings)
storage = DataStorage(settings.duckdb_path)
storage.connect()
storage.initialize_schema()
adapter: BinanceUSDMAdapter | None = None
fetcher: DataFetcher | None = None
shutdown_event = asyncio.Event()
try:
_register_signal_handlers(shutdown_event)
if settings.is_paper_trading:
logger.info(
"Paper trading enabled, skipping exchange connectivity",
mode=settings.trading_mode.value,
symbols=settings.symbols_list,
)
else:
adapter = BinanceUSDMAdapter(settings)
await _initialize_adapter(adapter, settings.symbols_list)
fetcher = DataFetcher(adapter, storage)
strategy = SupertrendStrategy()
risk_engine = RiskEngine(PortfolioRisk())
regime_classifier = RegimeClassifier()
logger.info(
"Starting trading loop",
mode=settings.trading_mode.value,
loop_interval=DEFAULT_LOOP_INTERVAL,
symbols=settings.symbols_list,
)
await _run_trading_loop(
settings,
storage,
strategy,
regime_classifier,
risk_engine,
adapter,
fetcher,
shutdown_event,
)
except asyncio.CancelledError:
logger.info("Trading loop cancelled")
raise
except Exception as error:
logger.exception("Trading engine failed", error=str(error))
raise
finally:
if adapter is not None:
await adapter.disconnect()
storage.disconnect()
logger.info("Trading engine shutdown")
def main() -> None:
"""Sync wrapper that starts the async trading engine."""
asyncio.run(_async_main())
async def _run_trading_loop(
settings: Settings,
storage: DataStorage,
strategy: SupertrendStrategy,
regime_classifier: RegimeClassifier,
risk_engine: RiskEngine,
adapter: BinanceUSDMAdapter | None,
fetcher: DataFetcher | None,
shutdown_event: asyncio.Event,
) -> None:
"""Perform trading cycles until a shutdown signal is received."""
timeframe = settings.signal_timeframe
while not shutdown_event.is_set():
loop_start = time.monotonic()
for symbol in settings.symbols_list:
if shutdown_event.is_set():
break
await _process_symbol(
settings,
storage,
strategy,
regime_classifier,
risk_engine,
adapter,
fetcher,
symbol,
timeframe,
shutdown_event,
)
if shutdown_event.is_set():
break
elapsed = time.monotonic() - loop_start
await _sleep_or_cancel(DEFAULT_LOOP_INTERVAL - elapsed, shutdown_event)
async def _process_symbol(
settings: Settings,
storage: DataStorage,
strategy: SupertrendStrategy,
regime_classifier: RegimeClassifier,
risk_engine: RiskEngine,
adapter: BinanceUSDMAdapter | None,
fetcher: DataFetcher | None,
symbol: str,
timeframe: str,
shutdown_event: asyncio.Event,
) -> None:
"""Fetch candles, classify the regime, and execute signals for a single symbol."""
if shutdown_event.is_set():
return
await _fetch_latest_candles(fetcher, symbol, timeframe)
candles = storage.get_candles(symbol, timeframe, limit=FETCH_LIMIT)
if not candles:
logger.debug("Skipping symbol with no candles", symbol=symbol, timeframe=timeframe)
return
regime = regime_classifier.classify(candles)
signal = strategy.generate_signal(candles)
if signal is None or not signal.is_entry:
logger.debug(
"No actionable signal",
symbol=symbol,
regime=regime.name,
strategy=strategy.name,
)
return
signal_symbol = signal.symbol or symbol
try:
equity = await _determine_equity(settings, adapter)
except RuntimeError as error:
logger.warning("Unable to determine equity", error=str(error))
return
risk_pct = Decimal(str(settings.risk.per_trade_pct))
try:
position_size = risk_engine.calculate_position_size(
equity, signal.price, signal.stop_loss, risk_pct
)
except ValueError as error:
logger.warning("Position sizing failed", symbol=symbol, error=str(error))
return
if position_size <= Decimal("0"):
logger.warning("Computed non-positive position size", symbol=symbol)
return
risk_amount = risk_engine.calculate_risk_amount(position_size, signal.price, signal.stop_loss)
strategy_cap_pct = Decimal(str(settings.risk.max_equity_per_strategy_pct))
total_cap_pct = Decimal("100")
if not risk_engine.can_allocate_strategy(
strategy.name,
risk_amount,
equity,
max_per_strategy_pct=strategy_cap_pct,
max_total_exposure_pct=total_cap_pct,
):
logger.warning(
"Risk limits block allocation",
symbol=symbol,
strategy=strategy.name,
)
return
if adapter is None:
logger.info(
"Paper trading signal",
symbol=signal_symbol,
signal_type=signal.signal_type.value,
regime=regime.name,
strategy=strategy.name,
)
return
entry_side, position_side = _map_signal_to_sides(signal.signal_type)
entry_request = OrderRequest(
symbol=signal_symbol,
side=entry_side,
position_side=position_side,
order_type=OrderType.LIMIT,
quantity=position_size,
price=signal.price,
)
try:
entry_order = await _safe_create_order(adapter, entry_request)
logger.info(
"Entry order placed",
order_id=entry_order.id,
symbol=entry_order.symbol,
side=entry_order.side.value,
strategy=strategy.name,
)
except Exception as error:
logger.error(
"Failed to place entry order",
symbol=signal_symbol,
error=str(error),
)
return
stop_side = Side.SELL if entry_side == Side.BUY else Side.BUY
stop_request = OrderRequest(
symbol=signal_symbol,
side=stop_side,
position_side=position_side,
order_type=OrderType.STOP_MARKET,
quantity=position_size,
stop_price=signal.stop_loss,
reduce_only=True,
)
try:
stop_order = await _safe_create_order(adapter, stop_request)
logger.info(
"Stop loss order placed",
order_id=stop_order.id,
symbol=stop_order.symbol,
stop_price=stop_order.stop_price,
)
except Exception as error:
logger.error(
"Failed to place stop loss order",
symbol=signal_symbol,
error=str(error),
)
async def _fetch_latest_candles(fetcher: DataFetcher | None, symbol: str, timeframe: str) -> None:
"""Fetch new candles if possible and read the latest batch from storage."""
if fetcher is None:
return
last_error: Exception | None = None
for attempt in range(1, MAX_FETCH_ATTEMPTS + 1):
try:
await fetcher.fetch_latest_candles(symbol, timeframe, limit=FETCH_LIMIT)
return
except Exception as error:
last_error = error
logger.warning(
"Candle sync failed",
symbol=symbol,
attempt=attempt,
error=str(error),
)
await asyncio.sleep(RETRY_DELAY_SECONDS * attempt)
if last_error is not None:
logger.error("Unable to synchronize candles", symbol=symbol, error=str(last_error))
async def _safe_create_order(adapter: BinanceUSDMAdapter, request: OrderRequest) -> Order:
"""Retry order creation to handle transient exchange errors."""
last_error: Exception | None = None
for attempt in range(1, MAX_ORDER_ATTEMPTS + 1):
try:
return await adapter.create_order(request)
except Exception as error:
last_error = error
logger.warning(
"Order attempt failed",
symbol=request.symbol,
attempt=attempt,
error=str(error),
)
await asyncio.sleep(RETRY_DELAY_SECONDS * attempt)
logger.error(
"Max retries reached while creating order",
symbol=request.symbol,
error=str(last_error) if last_error else "unknown",
)
raise last_error or RuntimeError("Order request failed without exception details")
def _map_signal_to_sides(signal_type: SignalType) -> tuple[Side, PositionSide]:
"""Map a signal direction to a trade side."""
if signal_type == SignalType.ENTRY_LONG:
return Side.BUY, PositionSide.LONG
return Side.SELL, PositionSide.SHORT
async def _determine_equity(settings: Settings, adapter: BinanceUSDMAdapter | None) -> Decimal:
"""Return the equity that should be used for sizing positions."""
if settings.is_paper_trading:
return Decimal(str(settings.paper_equity_usdt))
if adapter is None:
raise RuntimeError("Adapter is required for non-paper trading")
balance = await adapter.get_balance()
return balance.total_equity
def _register_signal_handlers(shutdown_event: asyncio.Event) -> None:
"""Register signal handlers that trigger a graceful shutdown."""
loop = asyncio.get_running_loop()
def _handle(sig: signal.Signals) -> None:
logger.info("Shutdown signal received", signal=sig.name)
shutdown_event.set()
def _handle_signal(signum: int, frame: object) -> None:
shutdown_event.set()
for sig in (signal.SIGINT, signal.SIGTERM):
try:
loop.add_signal_handler(sig, partial(_handle, sig))
except (NotImplementedError, RuntimeError):
signal.signal(sig, _handle_signal)
async def _sleep_or_cancel(delay: float, shutdown_event: asyncio.Event) -> None:
"""Sleep but exit early if a shutdown signal is received."""
if delay <= 0:
return
try:
await asyncio.wait_for(shutdown_event.wait(), timeout=delay)
except TimeoutError:
pass
async def _initialize_adapter(adapter: BinanceUSDMAdapter, symbols: list[str]) -> None:
"""Configure the adapter for hedge mode and isolated margin symbols."""
await adapter.connect()
await adapter.configure_hedge_mode(True)
for symbol in symbols:
await adapter.configure_margin_type(symbol, MarginType.ISOLATED)

View File

@@ -0,0 +1,21 @@
"""Stub optimizer entry point; real logic coming with Optuna."""
import asyncio
import structlog
from tradefinder.core.config import get_settings
logger = structlog.get_logger(__name__)
async def run_optimizer() -> None:
"""Placeholder async optimizer run; future Optuna logic belongs here."""
settings = get_settings()
# TODO: add Optuna study setup and walk-forward execution pipeline.
logger.info("Optimizer not yet implemented", trading_mode=settings.trading_mode.value)
def main() -> None:
"""Sync wrapper so python -m tradefinder.core.optimize works."""
asyncio.run(run_optimizer())

View File

@@ -172,7 +172,9 @@ class RiskEngine:
if equity <= Decimal("0"): if equity <= Decimal("0"):
raise ValueError("Equity must be positive to allocate exposure") raise ValueError("Equity must be positive to allocate exposure")
if risk_amount <= Decimal("0"): if risk_amount <= Decimal("0"):
logger.debug("Risk amount is non-positive", strategy=strategy, risk_amount=str(risk_amount)) logger.debug(
"Risk amount is non-positive", strategy=strategy, risk_amount=str(risk_amount)
)
return False return False
strategy_pct = max_per_strategy_pct or self._max_per_strategy_pct strategy_pct = max_per_strategy_pct or self._max_per_strategy_pct

View File

@@ -45,7 +45,9 @@ class SupertrendStrategy(Strategy):
multiplier=float(self._multiplier), multiplier=float(self._multiplier),
) )
direction_col = next((col for col in supertrend.columns if col.startswith("SUPERTd_")), None) direction_col = next(
(col for col in supertrend.columns if col.startswith("SUPERTd_")), None
)
if direction_col is None: if direction_col is None:
logger.debug("Supertrend direction series missing", strategy=self.name) logger.debug("Supertrend direction series missing", strategy=self.name)
return None return None
@@ -110,7 +112,11 @@ class SupertrendStrategy(Strategy):
def get_stop_loss(self, entry_price: Decimal, side: Side) -> Decimal: def get_stop_loss(self, entry_price: Decimal, side: Side) -> Decimal:
"""Use ATR buffer for Supertrend stop loss.""" """Use ATR buffer for Supertrend stop loss."""
atr_buffer = self._last_atr if self._last_atr and self._last_atr > Decimal("0") else entry_price * Decimal("0.02") atr_buffer = (
self._last_atr
if self._last_atr and self._last_atr > Decimal("0")
else entry_price * Decimal("0.02")
)
if side == Side.BUY: if side == Side.BUY:
stop = entry_price - atr_buffer stop = entry_price - atr_buffer
else: else:
@@ -150,7 +156,14 @@ class SupertrendStrategy(Strategy):
@staticmethod @staticmethod
def _trend_level(supertrend: pd.DataFrame) -> Decimal | None: def _trend_level(supertrend: pd.DataFrame) -> Decimal | None:
trend_col = next((col for col in supertrend.columns if col.startswith("SUPERT_") and not col.startswith("SUPERTd_")), None) trend_col = next(
(
col
for col in supertrend.columns
if col.startswith("SUPERT_") and not col.startswith("SUPERTd_")
),
None,
)
if trend_col is None: if trend_col is None:
return None return None
return SupertrendStrategy._decimal_from_series_tail(supertrend[trend_col]) return SupertrendStrategy._decimal_from_series_tail(supertrend[trend_col])

View File

@@ -0,0 +1 @@
"""TradeFinder UI module for Streamlit dashboard."""

197
src/tradefinder/ui/app.py Normal file
View File

@@ -0,0 +1,197 @@
"""TradeFinder Streamlit Dashboard.
Provides a web-based UI for monitoring trading activity, positions,
and market regime status.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
import streamlit as st
import structlog
from tradefinder.core.config import TradingMode, get_settings
from tradefinder.data.storage import DataStorage
logger = structlog.get_logger(__name__)
st.set_page_config(
page_title="TradeFinder Dashboard",
page_icon="📈",
layout="wide",
initial_sidebar_state="expanded",
)
def get_storage() -> DataStorage | None:
"""Get storage instance if database exists."""
settings = get_settings()
db_path = settings.duckdb_path
if not db_path.exists():
return None
storage = DataStorage(db_path)
storage.connect()
return storage
@st.cache_data(ttl=30)
def get_candle_stats(symbol: str, timeframe: str) -> dict[str, Any]:
"""Get candle statistics for a symbol."""
storage = get_storage()
if storage is None:
return {}
try:
count = storage.get_candle_count(symbol, timeframe)
latest = storage.get_latest_candle_timestamp(symbol, timeframe)
return {
"count": count,
"latest": latest.isoformat() if latest else "N/A",
}
finally:
storage.disconnect()
def render_header() -> None:
"""Render the dashboard header."""
st.title("📈 TradeFinder Dashboard")
st.markdown("---")
def render_trading_mode() -> None:
"""Render trading mode indicator."""
try:
settings = get_settings()
mode = settings.trading_mode
if mode == TradingMode.PAPER:
st.sidebar.success("🟢 Paper Trading")
elif mode == TradingMode.TESTNET:
st.sidebar.warning("🟡 Testnet Mode")
elif mode == TradingMode.LIVE:
st.sidebar.error("🔴 LIVE TRADING")
except Exception:
st.sidebar.info("⚪ Mode Unknown")
def render_symbols_overview() -> None:
"""Render overview of configured symbols."""
try:
settings = get_settings()
symbols = settings.symbols_list
timeframe = settings.signal_timeframe
st.subheader("Symbol Status")
cols = st.columns(len(symbols))
for i, symbol in enumerate(symbols):
with cols[i]:
st.metric(label=symbol, value=timeframe)
stats = get_candle_stats(symbol, timeframe)
if stats:
st.caption(f"Candles: {stats.get('count', 0)}")
st.caption(f"Latest: {stats.get('latest', 'N/A')}")
except Exception as e:
st.error(f"Error loading symbols: {e}")
def render_database_status() -> None:
"""Render database connection status."""
st.subheader("Database Status")
storage = get_storage()
if storage is None:
st.warning("Database not initialized yet. Start the trading engine first.")
return
try:
st.success(f"Connected to: {storage.db_path}")
finally:
storage.disconnect()
def render_placeholder_positions() -> None:
"""Render placeholder for positions table."""
st.subheader("Open Positions")
st.info("Position tracking will be available when the trading engine is running.")
st.dataframe(
{
"Symbol": [],
"Side": [],
"Size": [],
"Entry Price": [],
"Mark Price": [],
"PnL": [],
}
)
def render_placeholder_signals() -> None:
"""Render placeholder for recent signals."""
st.subheader("Recent Signals")
st.info("Signal history will be displayed here when signals are generated.")
st.dataframe(
{
"Time": [],
"Symbol": [],
"Signal": [],
"Strategy": [],
"Regime": [],
}
)
def render_regime_indicator() -> None:
"""Render current regime indicator."""
st.sidebar.subheader("Market Regime")
st.sidebar.info("Regime detection requires running engine")
def render_strategy_status() -> None:
"""Render strategy status."""
st.sidebar.subheader("Active Strategy")
st.sidebar.metric(label="Strategy", value="Supertrend")
st.sidebar.caption("Period: 10, Multiplier: 3.0")
def render_footer() -> None:
"""Render footer with refresh info."""
st.markdown("---")
st.caption(f"Last updated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
st.caption("Auto-refresh every 30 seconds")
def main() -> None:
"""Main dashboard entry point."""
render_trading_mode()
render_regime_indicator()
render_strategy_status()
render_header()
col1, col2 = st.columns(2)
with col1:
render_database_status()
render_symbols_overview()
with col2:
render_placeholder_positions()
render_placeholder_signals()
render_footer()
import time
time.sleep(30)
st.rerun()
if __name__ == "__main__":
main()

View File

@@ -12,8 +12,6 @@ from unittest.mock import AsyncMock, Mock, patch
import pytest import pytest
pytestmark = pytest.mark.skip(reason="Async WebSocket tests have timing issues - streamer verified manually")
from tradefinder.core.config import Settings from tradefinder.core.config import Settings
from tradefinder.data.streamer import ( from tradefinder.data.streamer import (
DataStreamer, DataStreamer,
@@ -21,6 +19,10 @@ from tradefinder.data.streamer import (
MarkPriceMessage, MarkPriceMessage,
) )
pytestmark = pytest.mark.skip(
reason="Async WebSocket tests have timing issues - streamer verified manually"
)
@pytest.fixture @pytest.fixture
def settings() -> Settings: def settings() -> Settings:

View File

@@ -126,10 +126,14 @@ class TestSupertrendStrategySignals:
candles = _build_candle_sequence(base_timestamp, partial) candles = _build_candle_sequence(base_timestamp, partial)
assert strategy.generate_signal(candles) is None assert strategy.generate_signal(candles) is None
def test_generate_signal_empty_candles_returns_none(self, default_strategy: SupertrendStrategy) -> None: def test_generate_signal_empty_candles_returns_none(
self, default_strategy: SupertrendStrategy
) -> None:
assert default_strategy.generate_signal([]) is None assert default_strategy.generate_signal([]) is None
def test_generate_signal_none_input_raises_type_error(self, default_strategy: SupertrendStrategy) -> None: def test_generate_signal_none_input_raises_type_error(
self, default_strategy: SupertrendStrategy
) -> None:
with pytest.raises(TypeError): with pytest.raises(TypeError):
default_strategy.generate_signal(None) # type: ignore[arg-type] default_strategy.generate_signal(None) # type: ignore[arg-type]