import asyncio import aiohttp import time import json import logging import numpy as np from collections import defaultdict import sys from pathlib import Path # Add project root to path sys.path.insert(0, str(Path(__file__).parent.parent)) from nautilus_dolphin.nautilus.alpha_orchestrator import NDAlphaEngine from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine from nautilus_dolphin.nautilus.ob_provider import OBProvider, OBSnapshot from external_factors.ob_stream_service import OBStreamService logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s') logger = logging.getLogger("PaperTrade") class LiveOBAdapter(OBProvider): """Adapts continuous OBStreamService to the OBProvider interface for the Engine.""" def __init__(self, stream_service: OBStreamService): self.stream = stream_service self._last_snaps = {} self.assets = stream_service.assets def _update_snap(self, asset: str, data: dict): if data: self._last_snaps[asset] = OBSnapshot( timestamp=data['timestamp'], asset=asset, bid_notional=data['bid_notional'].copy(), ask_notional=data['ask_notional'].copy(), bid_depth=data['bid_depth'].copy(), ask_depth=data['ask_depth'].copy() ) def get_snapshot(self, asset: str, timestamp: float): # Always returns the absolute latest snapshot regardless of requested timestamp # In a real live environment, the engine asks for "now" return self._last_snaps.get(asset) def get_assets(self): return self.assets def get_all_timestamps(self, asset: str): return np.array([time.time()], dtype=np.float64) async def fetch_historical_klines(session: aiohttp.ClientSession, symbol: str, limit: int = 1500) -> list: """Fetch 1500 recent 5m klines to warm up the engine.""" url = f"https://fapi.binance.com/fapi/v1/klines?symbol={symbol}&interval=5m&limit={limit}" async with session.get(url) as resp: data = await resp.json() # [openTime, open, high, low, close, volume, closeTime, quoteVolume, count, takerBuyVolume, takerBuyQuoteVolume, ignore] return [{"ts": d[0]/1000.0, "c": float(d[4])} for d in data] async def run_paper_trade(): assets = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"] logger.info(f"Starting Paper Trading Environment for {assets}...") # 1. Initialize Engine & Components ENGINE_KWARGS = dict( initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05, min_leverage=0.5, max_leverage=5.0, leverage_convexity=3.0, fraction=0.20, fixed_tp_pct=0.0099, stop_pct=1.0, max_hold_bars=120, use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75, dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5, use_asset_selection=True, min_irp_alignment=0.45, use_sp_fees=True, use_sp_slippage=True, sp_maker_entry_rate=0.62, sp_maker_exit_rate=0.50, use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40, lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42, ) engine = NDAlphaEngine(**ENGINE_KWARGS) # 2. Start Live OB Streamer stream_service = OBStreamService(assets=assets) asyncio.create_task(stream_service.stream()) logger.info("Connecting OB stream...") await asyncio.sleep(5) # Let it authenticate and fetch REST models live_ob_adapter = LiveOBAdapter(stream_service) ob_feature_engine = OBFeatureEngine(live_ob_adapter) engine.set_ob_engine(ob_feature_engine) # 3. Fetch Warmup Data (last ~5 days of 5m bars) async with aiohttp.ClientSession() as session: logger.info("Fetching warmup 5m klines from Binance...") history = {a: await fetch_historical_klines(session, a) for a in assets} # Align by timestamp all_ts = set() for a in assets: for k in history[a]: all_ts.add(k['ts']) sorted_ts = sorted(list(all_ts)) logger.info(f"Warming up engine with {len(sorted_ts)} historical bars...") ph = defaultdict(list) b_idx = 0 all_vols = [] engine.regime_direction = -1 engine.regime_size_mult = 1.0 # Quick warmup loop for ts in sorted_ts: prices = {} for a in assets: # Find closest price match = next((k['c'] for k in reversed(history[a]) if k['ts'] <= ts), None) if match: prices[a] = match ph[a].append(match) if len(ph[a]) > 500: ph[a] = ph[a][-500:] # Calculate vel_div (using BTCUSDT as proxy) btc_hist = ph["BTCUSDT"] vd = 0.0 vrok = False if len(btc_hist) >= 50: seg = btc_hist[-50:] vd = float(np.std(np.diff(seg)/np.array(seg[:-1]))) all_vols.append(vd) if len(all_vols) > 100: vol_p60 = float(np.percentile(all_vols, 60)) if vd > vol_p60: vrok = True # Manually pump the live adapter so OB feature engine registers "something" # We don't have historical OB for warmup, so we'll just inject Neutral zeros or live if connected for a in assets: snap = await stream_service.get_depth_buckets(a) if snap: live_ob_adapter._update_snap(a, snap) engine.process_bar(bar_idx=b_idx, vel_div=vd, prices=prices, vol_regime_ok=vrok, price_histories=ph) b_idx += 1 # Evaluate warmup metrics def print_metrics(prefix="[WARMUP METRICS]"): tr = engine.trade_history w = [t for t in tr if t.pnl_absolute > 0] l = [t for t in tr if t.pnl_absolute <= 0] gw = sum(t.pnl_absolute for t in w) if w else 0 gl = abs(sum(t.pnl_absolute for t in l)) if l else 0 roi = (engine.capital - 25000) / 25000 * 100 pf_val = gw / gl if gl > 0 else 999 wr = len(w) / len(tr) * 100 if tr else 0.0 logger.info(f"{prefix} Capital: ${engine.capital:,.2f} | ROI: {roi:+.2f}% | PF: {pf_val:.3f} | WR: {wr:.1f}% | Trades: {len(tr)}") print_metrics() logger.info("=====================================================================") logger.info(">>> WARMUP COMPLETE. ENTERING LIVE PAPER TRADING MODE. <<<") logger.info("=====================================================================") # 4. Live Tick Loop (Every 60s to simulate fast intra-bar ticks or 5m bars) # We tick every 20 seconds, updating the price and OB, and letting SmartPlacer evaluate last_5m = sorted_ts[-1] if sorted_ts else time.time() while True: try: await asyncio.sleep(20) # 20 second tick current_ts = time.time() # Fetch latest prices from Live Stream instead of REST prices = {} for a in assets: snap = await stream_service.get_depth_buckets(a) if snap: live_ob_adapter._update_snap(a, snap) # Use mid price prices[a] = (snap['best_bid'] + snap['best_ask']) / 2.0 if not prices: logger.warning("No live prices available yet...") continue # If 5 minutes elapsed, it's a new bar, update histories is_new_bar = (current_ts - last_5m) >= 300 if is_new_bar: last_5m = current_ts b_idx += 1 for a in assets: if a in prices: ph[a].append(prices[a]) if len(ph[a]) > 500: ph[a] = ph[a][-500:] # Update vd btc_hist = ph["BTCUSDT"] if len(btc_hist) >= 50: seg = btc_hist[-50:] vd = float(np.std(np.diff(seg)/np.array(seg[:-1]))) all_vols.append(vd) else: # Intra-bar read, VD remains same btc_hist = ph.get("BTCUSDT", []) if len(btc_hist) >= 50: seg = btc_hist[-50:] vd = float(np.std(np.diff(seg)/np.array(seg[:-1]))) else: vd = 0.0 vrok = len(all_vols) > 100 and vd > float(np.percentile(all_vols, 60)) # Process tick engine.process_bar(bar_idx=b_idx, vel_div=vd, prices=prices, vol_regime_ok=vrok, price_histories=ph) # Show live OB features safely btc_ob = ob_feature_engine.get_signal("BTCUSDT", b_idx) macro = ob_feature_engine.get_macro() imb_str = f"Imb: {btc_ob.imbalance:+.3f}" if btc_ob else "Imb: N/A" reg_map = {0: "CALM", -1: "NEUTRAL", 1: "STRESS"} reg_str = reg_map.get(macro.regime_signal, "UKWN") print_metrics(prefix=f"[LIVE {time.strftime('%H:%M:%S')}] {imb_str} | Macro: {reg_str} |") except Exception as e: logger.error(f"Live Loop Exception: {e}", exc_info=True) await asyncio.sleep(5) if __name__ == "__main__": try: asyncio.run(run_paper_trade()) except KeyboardInterrupt: logger.info("Paper trading stopped.")