diff --git a/nautilus_actor_test_implementation_5x.py b/nautilus_actor_test_implementation_5x.py index 6a64517..bea9427 100644 --- a/nautilus_actor_test_implementation_5x.py +++ b/nautilus_actor_test_implementation_5x.py @@ -3,10 +3,11 @@ import numpy as np import asyncio import json import math +import httpx +import websockets from typing import Dict, List, Optional, Tuple, Any from enum import Enum from collections import deque -import httpx import random # Added for _simulate_websocket_ticks # Nautilus imports - following test pattern @@ -44,6 +45,239 @@ class MarketRegime(Enum): BEAR = "BEAR" TRANSITION = "TRANSITION" SIDEWAYS = "SIDEWAYS" + +# ============================================================================ +# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE +# ============================================================================ + +@dataclass +class TickData: + """Universal tick data structure from standalone""" + symbol: str + price: float + quantity: float + timestamp: int + is_buyer_maker: bool + trade_id: int + exchange: str + raw_data: Dict = None + + def __post_init__(self): + if self.raw_data is None: + self.raw_data = {} + +@dataclass +class ExchangeConfig: + """Exchange-specific configuration from standalone""" + name: str + websocket_limit_per_second: int + max_streams_per_connection: int + max_connections_per_window: int + connection_window_seconds: int + base_websocket_url: str + requires_auth: bool = False + ping_interval: float = 20.0 + ping_timeout: float = 10.0 + +class BinanceAdapter: + """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter""" + + def __init__(self): + self.config = ExchangeConfig( + name="binance", + websocket_limit_per_second=10, + max_streams_per_connection=1024, + max_connections_per_window=300, + connection_window_seconds=300, # 5 minutes + base_websocket_url="wss://stream.binance.com:9443", + requires_auth=False + ) + self.logger = Logger(f"BINANCE_ADAPTER") + + def get_websocket_url(self, symbols: List[str]) -> str: + """Build Binance WebSocket URL with combined streams""" + if len(symbols) == 1: + symbol = symbols[0].lower() + return f"{self.config.base_websocket_url}/ws/{symbol}@trade" + else: + # Combined stream approach + streams = [f"{symbol.lower()}@trade" for symbol in symbols] + stream_string = "/".join(streams) + return f"{self.config.base_websocket_url}/stream?streams={stream_string}" + + def parse_message(self, message: str) -> Optional[TickData]: + """EXTRACTED FROM STANDALONE: Parse Binance trade message""" + try: + data = json.loads(message) + + # Handle combined stream format + if 'stream' in data and 'data' in data: + stream_name = data['stream'] + trade_data = data['data'] + else: + # Single stream format + trade_data = data + stream_name = trade_data.get('e', '') + + # Only process trade events + if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'): + return None + + return TickData( + symbol=trade_data['s'], + price=float(trade_data['p']), + quantity=float(trade_data['q']), + timestamp=int(trade_data['T']), + is_buyer_maker=trade_data['m'], + trade_id=int(trade_data['t']), + exchange="binance", + raw_data=trade_data + ) + + except (json.JSONDecodeError, KeyError, ValueError) as e: + self.logger.error(f"Failed to parse Binance message: {e}") + return None + + def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]: + """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections""" + # Conservative: leave room for message bursts + safety_margin = 0.7 + symbols_per_connection = int( + min( + self.config.max_streams_per_connection * safety_margin, + self.config.websocket_limit_per_second * 5 # 5-second buffer + ) + ) + + # Group symbols into chunks + symbol_groups = [] + for i in range(0, len(symbols), symbols_per_connection): + group = symbols[i:i + symbols_per_connection] + symbol_groups.append(group) + + return symbol_groups + +class SiloqyWebSocketConnection: + """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus""" + + def __init__(self, connection_id: str, symbols: List[str], + binance_adapter: BinanceAdapter, main_actor_ref): + self.connection_id = connection_id + self.symbols = symbols + self.adapter = binance_adapter + self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access + + self.websocket = None + self.is_running = False + self.retry_count = 0 + self.messages_received = 0 + self.last_message_time = 0.0 + self.connected_at = None + + self.logger = Logger(f"WS_{connection_id}") + self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols") + + async def start(self) -> None: + """Start the WebSocket connection""" + self.is_running = True + + while self.is_running: + try: + await self._connect_and_run() + except Exception as e: + self.logger.error(f"Connection {self.connection_id} error: {e}") + + if self.is_running: + await self._handle_reconnection() + else: + break + + async def _connect_and_run(self) -> None: + """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop""" + url = self.adapter.get_websocket_url(self.symbols) + + self.logger.info(f"Connecting to Binance: {url[:100]}...") + + async with websockets.connect( + url, + ping_interval=self.adapter.config.ping_interval, + ping_timeout=self.adapter.config.ping_timeout, + close_timeout=10 + ) as websocket: + self.websocket = websocket + self.connected_at = time.time() + self.retry_count = 0 + + self.logger.info(f"Connection {self.connection_id} established") + + # Message processing loop + async for message in websocket: + if not self.is_running: + break + + try: + await self._process_message(message) + except Exception as e: + self.logger.error(f"Message processing error: {e}") + + async def _process_message(self, message: str) -> None: + """Process incoming WebSocket message and convert to Nautilus format""" + self.messages_received += 1 + self.last_message_time = time.time() + + # Parse message using Binance adapter + tick_data = self.adapter.parse_message(message) + if tick_data: + # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus + try: + # Create SiloqyTradeTick object + siloqy_tick = SiloqyTradeTick( + instrument_id=tick_data.symbol, + price=round(tick_data.price, 8), + size=round(tick_data.quantity, 8), + ts_event=tick_data.timestamp * 1000000, # Convert ms to ns + ts_init=int(time.time_ns()), + open_price=None, # Will be set by candle logic + candle_open_time=None, # Will be set by candle logic + tick_side=None, + exchange="BINANCE", + liquidity_flag=None + ) + + # Get candle information from main actor + if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles: + symbol = tick_data.symbol + if symbol in self.main_actor.active_candles: + candle = self.main_actor.active_candles[symbol] + siloqy_tick.open_price = candle.get('open_price', tick_data.price) + siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000)) + + # Process through main actor's tick handling (preserves all candle logic) + self.main_actor.on_websocket_tick( + tick_data.symbol, + tick_data.price, + tick_data.quantity, + tick_data.timestamp + ) + + except Exception as e: + self.logger.error(f"Failed to process tick: {e}") + + async def _handle_reconnection(self) -> None: + """Handle reconnection with exponential backoff""" + self.retry_count += 1 + delay = min(2 ** self.retry_count, 60) # Max 60 seconds + + self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})") + await asyncio.sleep(delay) + + async def stop(self) -> None: + """Stop the connection""" + self.logger.info(f"Stopping connection {self.connection_id}") + self.is_running = False + + if self.websocket: + await self.websocket.close() # -------------------------------------------------------------------- # SILOQY Custom Tick - PRESERVED with fixes @@ -95,6 +329,7 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig): class SILOQYMainActorConfig(ActorConfig): candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False + enable_real_data: bool = False # NEW: Enable real WebSocket data class DOLPHINRegimeActorConfig(ActorConfig): max_symbols: int = 5000 @@ -200,6 +435,7 @@ class SILOQYSymbolDiscoveryActor(Actor): raise Exception(f"Failed to fetch exchange info: {response.status_code}") async def _fetch_stats_and_reconstruct_candles(self): + """PRESERVED: All original rate limiting with Nautilus async patterns""" url = "https://api.binance.com/api/v3/ticker/24hr" klines_url = "https://api.binance.com/api/v3/klines" ticker_url = "https://api.binance.com/api/v3/ticker/price" @@ -315,7 +551,7 @@ class SILOQYSymbolDiscoveryActor(Actor): except Exception as e: self.log.error(f"Request failed: {str(e)}") - # PRESERVED: Original rate limiting + # PRESERVED: Original rate limiting with Nautilus async elapsed = time.time() - start_time if i + 50 < len(self.symbols): sleep_time = max(0, rate_limit_interval - elapsed) @@ -353,6 +589,7 @@ class SILOQYMainActor(Actor): # Preserve original configuration self.candle_interval_ms = config.candle_interval_ms self.throttle_mode = config.throttle_mode + self.enable_real_data = config.enable_real_data # NEW: Real data capability self.connections = {} self.connection_tasks = {} @@ -360,6 +597,11 @@ class SILOQYMainActor(Actor): self.symbols = [] self.active_candles = {} + # WebSocket infrastructure - NEW + self.binance_adapter = BinanceAdapter() if self.enable_real_data else None + self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {} + self.websocket_tasks: Dict[str, asyncio.Task] = {} + # WebSocket tasks - managed by Nautilus ActorExecutor self.ws_tasks = [] @@ -369,6 +611,11 @@ class SILOQYMainActor(Actor): if self.throttle_mode: self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") + if self.enable_real_data: + self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams") + else: + self.log.info("SIMULATION MODE: Will generate simulated ticks") + self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: @@ -393,6 +640,12 @@ class SILOQYMainActor(Actor): def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") + + # Stop WebSocket connections if running + if self.enable_real_data: + for conn in self.websocket_connections.values(): + asyncio.create_task(conn.stop()) + # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: @@ -401,8 +654,11 @@ class SILOQYMainActor(Actor): self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) - # PRESERVED: Start WebSocket connections - await self._start_websocket_connections() + # ENHANCED: Start real or simulated connections based on configuration + if self.enable_real_data: + await self._start_real_websocket_connections() + else: + await self._start_simulated_connections() self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") @@ -423,7 +679,6 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") - # FIXED: SILOQYMainActor.handle_candles_initial def handle_candles_initial(self, data): """Handle initial candles from discovery actor and properly initialize the candle dict.""" try: @@ -448,8 +703,53 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") - async def _start_websocket_connections(self): - """PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor""" + async def _start_real_websocket_connections(self): + """NEW: Start real Binance WebSocket connections with throttle control""" + self.log.info("Starting REAL Binance WebSocket connections for live data...") + + # Apply throttle mode limits to WebSocket connections + symbols_to_use = self.symbols + if self.throttle_mode: + symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode + self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols") + + # Calculate optimal symbol distribution + symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use) + + self.log.info(f"Real WebSocket distribution:") + self.log.info(f" - Total symbols: {len(symbols_to_use)}") + self.log.info(f" - Connections needed: {len(symbol_groups)}") + + # Start WebSocket connections + for i, symbol_group in enumerate(symbol_groups): + connection_id = f"binance_real_{i:03d}" + + connection = SiloqyWebSocketConnection( + connection_id=connection_id, + symbols=symbol_group, + binance_adapter=self.binance_adapter, + main_actor_ref=self + ) + + self.websocket_connections[connection_id] = connection + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(connection.start()) + self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor") + else: + task = asyncio.create_task(connection.start()) + self.websocket_tasks[connection_id] = task + self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback") + + # Rate limit connection establishment + if i < len(symbol_groups) - 1: + await asyncio.sleep(1.0) # 1 second between connections + + self.log.info(f"Started {len(symbol_groups)} real WebSocket connections") + + async def _start_simulated_connections(self): + """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor""" self.log.info("Starting WebSocket simulation for tick generation...") self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") @@ -464,6 +764,7 @@ class SILOQYMainActor(Actor): self.ws_tasks.append(task) async def _simulate_websocket_ticks(self): + """PRESERVED: Original simulation logic""" # Adjust tick rate for throttle mode tick_delay = 0.1 if self.throttle_mode else 0.01 symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) @@ -489,7 +790,8 @@ class SILOQYMainActor(Actor): log_interval = 500 if self.throttle_mode else 1000 if tick_count % log_interval == 0: mode_indicator = "THROTTLE" if self.throttle_mode else "" - self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}") + data_source = "REAL" if self.enable_real_data else "SIM" + self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}") await asyncio.sleep(tick_delay) except asyncio.CancelledError: @@ -880,7 +1182,8 @@ def test_siloqy_actors_with_nautilus_process_management(): config={ "component_id": "SILOQY-MAIN-ACTOR", "candle_interval_ms": 15 * 60 * 1000, - "throttle_mode": True # ENABLED: Reduced tick generation + "throttle_mode": True, # ENABLED: Reduced tick generation + "enable_real_data": False # CHANGE TO True for real WebSocket data } )