import time import numpy as np import asyncio import json import math import httpx import websockets from typing import Dict, List, Optional, Tuple, Any from enum import Enum from collections import deque import random # Added for _simulate_websocket_ticks from dataclasses import dataclass, field # Nautilus imports - following test pattern from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig from nautilus_trader.live.node import TradingNode from nautilus_trader.common.actor import Actor from nautilus_trader.common.config import ActorConfig from nautilus_trader.model.identifiers import TraderId from nautilus_trader.core.data import Data from nautilus_trader.common.component import Logger, init_logging # NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data) from nautilus_trader.model.data import TradeTick from nautilus_trader.model.identifiers import InstrumentId, TradeId from nautilus_trader.model.objects import Price, Quantity from nautilus_trader.model.enums import AggressorSide # Initialize logging _log_guard = init_logging() _logger = Logger("SILOQY") # Topics - HIGH PERFORMANCE STRING TOPICS RAW_TOPIC = "SILOQY.RAW.TICKS" STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS" REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS" SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED" CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL" # ADDED LINE 18: TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES" # NEW: Enhanced indicator topics for data bus publishing REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS" BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS" TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS" # Rate limiting constant MIN_INTERVAL = 2.5 # seconds between API batches # Market Regime Enum class MarketRegime(Enum): BULL = "BULL" BEAR = "BEAR" TRANSITION = "TRANSITION" SIDEWAYS = "SIDEWAYS" # ============================================================================ # EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE # ============================================================================ @dataclass class TickData: """Universal tick data structure from standalone""" symbol: str price: float quantity: float timestamp: int is_buyer_maker: bool trade_id: int exchange: str raw_data: Dict = None def __post_init__(self): if self.raw_data is None: self.raw_data = {} @dataclass class ExchangeConfig: """Exchange-specific configuration from standalone""" name: str websocket_limit_per_second: int max_streams_per_connection: int max_connections_per_window: int connection_window_seconds: int base_websocket_url: str requires_auth: bool = False ping_interval: float = 20.0 ping_timeout: float = 10.0 class BinanceAdapter: """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter""" def __init__(self): self.config = ExchangeConfig( name="binance", websocket_limit_per_second=10, max_streams_per_connection=1024, max_connections_per_window=300, connection_window_seconds=300, # 5 minutes base_websocket_url="wss://stream.binance.com:9443", requires_auth=False ) self.logger = Logger(f"BINANCE_ADAPTER") def get_websocket_url(self, symbols: List[str]) -> str: """Build Binance WebSocket URL with combined streams""" if len(symbols) == 1: symbol = symbols[0].lower() return f"{self.config.base_websocket_url}/ws/{symbol}@trade" else: # Combined stream approach streams = [f"{symbol.lower()}@trade" for symbol in symbols] stream_string = "/".join(streams) return f"{self.config.base_websocket_url}/stream?streams={stream_string}" def parse_message(self, message: str) -> Optional[TickData]: """EXTRACTED FROM STANDALONE: Parse Binance trade message""" try: data = json.loads(message) # Handle combined stream format if 'stream' in data and 'data' in data: stream_name = data['stream'] trade_data = data['data'] else: # Single stream format trade_data = data stream_name = trade_data.get('e', '') # Only process trade events if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'): return None return TickData( symbol=trade_data['s'], price=float(trade_data['p']), quantity=float(trade_data['q']), timestamp=int(trade_data['T']), is_buyer_maker=trade_data['m'], trade_id=int(trade_data['t']), exchange="binance", raw_data=trade_data ) except (json.JSONDecodeError, KeyError, ValueError) as e: self.logger.error(f"Failed to parse Binance message: {e}") return None def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]: """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections""" # Conservative: leave room for message bursts safety_margin = 0.7 symbols_per_connection = int( min( self.config.max_streams_per_connection * safety_margin, self.config.websocket_limit_per_second * 5 # 5-second buffer ) ) # Group symbols into chunks symbol_groups = [] for i in range(0, len(symbols), symbols_per_connection): group = symbols[i:i + symbols_per_connection] symbol_groups.append(group) return symbol_groups class SiloqyWebSocketConnection: """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus""" def __init__(self, connection_id: str, symbols: List[str], binance_adapter: BinanceAdapter, main_actor_ref): self.connection_id = connection_id self.symbols = symbols self.adapter = binance_adapter self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access self.websocket = None self.is_running = False self.retry_count = 0 self.messages_received = 0 self.last_message_time = 0.0 self.connected_at = None self.logger = Logger(f"WS_{connection_id}") self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols") async def start(self) -> None: """Start the WebSocket connection""" self.is_running = True while self.is_running: try: await self._connect_and_run() except Exception as e: self.logger.error(f"Connection {self.connection_id} error: {e}") if self.is_running: await self._handle_reconnection() else: break async def _connect_and_run(self) -> None: """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop""" url = self.adapter.get_websocket_url(self.symbols) self.logger.info(f"Connecting to Binance: {url[:100]}...") async with websockets.connect( url, ping_interval=self.adapter.config.ping_interval, ping_timeout=self.adapter.config.ping_timeout, close_timeout=10 ) as websocket: self.websocket = websocket self.connected_at = time.time() self.retry_count = 0 self.logger.info(f"Connection {self.connection_id} established") # Message processing loop async for message in websocket: if not self.is_running: break try: await self._process_message(message) except Exception as e: self.logger.error(f"Message processing error: {e}") async def _process_message(self, message: str) -> None: """Process incoming WebSocket message and convert to Nautilus format""" self.messages_received += 1 self.last_message_time = time.time() # Parse message using Binance adapter tick_data = self.adapter.parse_message(message) if tick_data: # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus try: # Create SiloqyTradeTick object siloqy_tick = SiloqyTradeTick( instrument_id=tick_data.symbol, price=round(tick_data.price, 8), size=round(tick_data.quantity, 8), ts_event=tick_data.timestamp * 1000000, # Convert ms to ns ts_init=int(time.time_ns()), open_price=None, # Will be set by candle logic candle_open_time=None, # Will be set by candle logic tick_side=None, exchange="BINANCE", liquidity_flag=None ) # Get candle information from main actor if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles: symbol = tick_data.symbol if symbol in self.main_actor.active_candles: candle = self.main_actor.active_candles[symbol] siloqy_tick.open_price = candle.get('open_price', tick_data.price) siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000)) # Process through main actor's tick handling (preserves all candle logic) self.main_actor.on_websocket_tick( tick_data.symbol, tick_data.price, tick_data.quantity, tick_data.timestamp ) except Exception as e: self.logger.error(f"Failed to process tick: {e}") async def _handle_reconnection(self) -> None: """Handle reconnection with exponential backoff""" self.retry_count += 1 delay = min(2 ** self.retry_count, 60) # Max 60 seconds self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})") await asyncio.sleep(delay) async def stop(self) -> None: """Stop the connection""" self.logger.info(f"Stopping connection {self.connection_id}") self.is_running = False if self.websocket: await self.websocket.close() # -------------------------------------------------------------------- # SILOQY Custom Tick - PRESERVED with fixes # -------------------------------------------------------------------- class SiloqyTradeTick(TradeTick): def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None, open_price: float = None, candle_open_time: int = None, tick_side: str = None, exchange: str = None, liquidity_flag = None): # Convert to proper Nautilus types - add venue to symbol if isinstance(instrument_id, str): # Add BINANCE venue if not already present if '.' not in instrument_id: instrument_id = f"{instrument_id}.BINANCE" instr_id = InstrumentId.from_str(instrument_id) else: instr_id = instrument_id # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size price_obj = Price(price, precision=8) if not isinstance(price, Price) else price size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size # Default aggressor side and trade ID aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID super().__init__( instrument_id=instr_id, price=price_obj, size=size_obj, aggressor_side=aggressor_side, trade_id=trade_id, ts_event=int(ts_event), ts_init=int(ts_init if ts_init is not None else time.time_ns()) ) # Additional SILOQY fields self.open_price = open_price if open_price is not None else price self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000) self.tick_side = tick_side self.exchange = exchange self.liquidity_flag = liquidity_flag class SILOQYSymbolDiscoveryConfig(ActorConfig): symbols: List[str] = [] candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False throttle_rate_limit_seconds: float = 10.0 max_symbols_throttled: int = 100 class SILOQYMainActorConfig(ActorConfig): candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False enable_real_data: bool = False # NEW: Enable real WebSocket data class DOLPHINRegimeActorConfig(ActorConfig): max_symbols: int = 5000 ticks_per_analysis: int = 10 class SILOQYNormalizerConfig(ActorConfig): pass class SILOQYSymbolDiscoveryActor(Actor): """ Symbol discovery with all original SILOQY algorithms preserved Using Nautilus built-in ActorExecutor for task management """ def __init__(self, config: SILOQYSymbolDiscoveryConfig): super().__init__(config) # Preserve original SILOQY configuration self.symbols = list(config.symbols) if config.symbols else [] self.candle_interval_ms = config.candle_interval_ms self.active_candles = {} self.tick_sizes = {} # Process management configuration self.throttle_mode = config.throttle_mode self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds self.max_symbols_throttled = config.max_symbols_throttled if self.throttle_mode: self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration") self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches") self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max") self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Start the actor - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting") # Use Nautilus ActorExecutor for task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") future = self._executor.submit(self.on_start_async()) self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") asyncio.create_task(self.on_start_async()) def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization") try: # PRESERVED: Original symbol discovery algorithm if not self.symbols: await self._discover_all_symbols() # PRESERVED: Original stats fetching and candle reconstruction stats, candle_opens = await self._fetch_stats_and_reconstruct_candles() # Set active candles from reconstruction results self.active_candles = candle_opens # Publish results using msgbus self._publish_discovery_results() self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}") # Don't re-raise, let system continue async def _discover_all_symbols(self): """PRESERVED: Original Binance symbol discovery algorithm""" self.log.info("Starting dynamic symbol discovery from Binance...") url = "https://api.binance.com/api/v3/exchangeInfo" async with httpx.AsyncClient() as client: # RATE LIMIT CHECK - Before anything else self.log.info("Checking Binance API rate limit status...") time_check_url = "https://api.binance.com/api/v3/time" try: rate_check_response = await client.get(time_check_url, timeout=5) if rate_check_response.status_code == 200: # Parse rate limit headers headers = rate_check_response.headers used_weight = headers.get('X-MBX-USED-WEIGHT-1M', 'Unknown') server_time = rate_check_response.json().get('serverTime', 'Unknown') self.log.info(f"Rate limit check passed - Used weight: {used_weight}/1200, Server time: {server_time}") # Check if we're close to rate limit if used_weight != 'Unknown' and int(used_weight) > 1000: self.log.warning(f"HIGH RATE LIMIT USAGE: {used_weight}/1200 - Proceeding with caution") elif rate_check_response.status_code == 429: retry_after = rate_check_response.headers.get('Retry-After', '60') self.log.error(f"RATE LIMITED: Must wait {retry_after} seconds before API calls") raise Exception(f"Binance API rate limited - retry after {retry_after}s") elif rate_check_response.status_code == 418: self.log.error("IP BANNED: This IP address has been auto-banned by Binance") raise Exception("IP address banned by Binance - cannot proceed") else: self.log.warning(f"Rate limit check returned status {rate_check_response.status_code}") self.log.warning("Proceeding anyway, but may encounter issues") except Exception as e: if "rate limited" in str(e).lower() or "banned" in str(e).lower(): raise # Re-raise rate limit/ban errors else: self.log.warning(f"Rate limit check failed: {e}") self.log.warning("Proceeding with symbol discovery anyway") async with httpx.AsyncClient() as client: self.log.info("Fetching exchange info from Binance API...") response = await client.get(url, timeout=10) if response.status_code == 200: self.log.info("Successfully received exchange info") data = response.json() # Combined symbol discovery and tick size extraction self.log.info("Processing symbols and extracting tick sizes...") full_symbols = [] for symbol_info in data['symbols']: if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'): symbol = symbol_info['symbol'] full_symbols.append(symbol) # Extract tick size while processing # Extract tick size while processing tick_size = None for filter_info in symbol_info['filters']: if filter_info['filterType'] == 'PRICE_FILTER': tick_size = float(filter_info['tickSize']) break # If no PRICE_FILTER found, try other filter types if tick_size is None: for filter_info in symbol_info['filters']: if filter_info['filterType'] == 'TICK_SIZE': tick_size = float(filter_info['tickSize']) break # Fallback to default if still not found if tick_size is None: tick_size = 1e-8 # Default fallback self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}") self.tick_sizes[symbol] = tick_size self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes") # Apply throttle mode symbol limiting if self.throttle_mode: self.symbols = full_symbols[:self.max_symbols_throttled] self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)") else: self.symbols = full_symbols self.log.info(f"Discovered {len(self.symbols)} trading symbols") self.log.info(f"First 10 symbols: {self.symbols[:10]}") else: self.log.error(f"Failed to fetch exchange info: {response.status_code}") raise Exception(f"Failed to fetch exchange info: {response.status_code}") async def _fetch_stats_and_reconstruct_candles(self): """PRESERVED: All original rate limiting with Nautilus async patterns""" url = "https://api.binance.com/api/v3/ticker/24hr" klines_url = "https://api.binance.com/api/v3/klines" ticker_url = "https://api.binance.com/api/v3/ticker/price" # PRESERVED: Original rate limit handling with throttle mode support base_rate_limit = 2.5 rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit if self.throttle_mode: self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)") stats = {} candle_opens = {} async with httpx.AsyncClient() as client: total_batches = (len(self.symbols) + 49) // 50 for i in range(0, len(self.symbols), 50): batch_num = (i // 50) + 1 start_time = time.time() symbols_batch = self.symbols[i:i + 50] if self.throttle_mode: self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay") # PRESERVED: Original boundary detection logic current_time = int(time.time() * 1000) time_into_candle = current_time % self.candle_interval_ms candle_open_time = current_time - time_into_candle at_boundary = (time_into_candle < 1000) if i == 0: # Log initial state self.log.info("DOLPHIN: Current candle analysis:") self.log.info(f" - Current time: {current_time}") self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)") self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)") self.log.info(f" - At boundary: {at_boundary}") self.log.info(f" - Candle open time: {candle_open_time}") symbols_json_string = json.dumps(symbols_batch, separators=(',', ':')) params = {"symbols": symbols_json_string} try: # PRESERVED: Original stats fetching response = await client.get(url, params=params, timeout=10) if response.status_code == 200: data = response.json() for item in data: symbol = item['symbol'] stats[symbol] = { 'count': int(item['count']), 'quoteVolume': float(item['quoteVolume']), } self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols") else: self.log.error(f"Error {response.status_code}: {response.text}") # PRESERVED: Original DOLPHIN candle reconstruction strategy if at_boundary: # AT BOUNDARY: Fetch current prices to use as opens self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})") ticker_params = {"symbols": symbols_json_string} ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5) if ticker_response.status_code == 200: ticker_data = ticker_response.json() for item in ticker_data: symbol = item['symbol'] current_price = float(item['price']) candle_opens[symbol] = current_price self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols") else: self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})") else: # NOT AT BOUNDARY: Fetch historical 1s kline data self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})") for symbol in symbols_batch: try: # PRESERVED: Original kline fetching logic kline_params = { 'symbol': symbol, 'interval': '1s', 'startTime': candle_open_time, 'endTime': candle_open_time + 1000, 'limit': 1 } kline_response = await client.get(klines_url, params=kline_params, timeout=5) if kline_response.status_code == 200: kline_data = kline_response.json() if kline_data: open_price = float(kline_data[0][1]) candle_opens[symbol] = open_price if (i + len(symbols_batch)) <= 10: self.log.info(f" - {symbol}: reconstructed open = {open_price}") else: self.log.warning(f" - {symbol}: no 1s kline data found") else: self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})") except Exception as e: self.log.error(f" - {symbol}: kline fetch error: {e}") await asyncio.sleep(0.1) except Exception as e: self.log.error(f"Request failed: {str(e)}") # PRESERVED: Original rate limiting with Nautilus async elapsed = time.time() - start_time if i + 50 < len(self.symbols): sleep_time = max(0, rate_limit_interval - elapsed) if sleep_time > 0: await asyncio.sleep(sleep_time) self.log.info(f"DOLPHIN: Candle reconstruction complete:") self.log.info(f" - Symbols processed: {len(self.symbols)}") self.log.info(f" - Opens reconstructed: {len(candle_opens)}") self.log.info("Discovery phase ready for publishing...") return stats, candle_opens def _publish_discovery_results(self): """Publish results using msgbus - Nautilus message bus integration""" try: self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...") if hasattr(self, 'msgbus') and self.msgbus: # Publish symbols and candles as tuples self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns()))) self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns()))) self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns()))) self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles") self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes") self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing") else: self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}") class SILOQYMainActor(Actor): def __init__(self, config: SILOQYMainActorConfig): super().__init__(config) # Preserve original configuration self.candle_interval_ms = config.candle_interval_ms self.throttle_mode = config.throttle_mode self.enable_real_data = config.enable_real_data # NEW: Real data capability self.connections = {} self.connection_tasks = {} # Will be populated from discovery actor self.symbols = [] self.active_candles = {} # WebSocket infrastructure - NEW self.binance_adapter = BinanceAdapter() if self.enable_real_data else None self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {} self.websocket_tasks: Dict[str, asyncio.Task] = {} # WebSocket tasks - managed by Nautilus ActorExecutor self.ws_tasks = [] # Synchronization self.discovery_complete = asyncio.Event() if self.throttle_mode: self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") if self.enable_real_data: self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams") else: self.log.info("SIMULATION MODE: Will generate simulated ticks") self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Subscribe to discovery events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events") # Subscribe to discovery results if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial) self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics") # Use Nautilus ActorExecutor for task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") future = self._executor.submit(self.on_start_async()) self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") asyncio.create_task(self.on_start_async()) def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") # Stop WebSocket connections if running if self.enable_real_data: for conn in self.websocket_connections.values(): asyncio.create_task(conn.stop()) # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: try: # Wait for symbol discovery to complete self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) # ENHANCED: Start real or simulated connections based on configuration if self.enable_real_data: await self._start_real_websocket_connections() else: await self._start_simulated_connections() self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") except asyncio.TimeoutError: self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}") def handle_symbols_discovered(self, data): """Handle symbols from discovery actor""" try: symbols, timestamp = data self.symbols = symbols self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor") if self.symbols and self.active_candles: self.discovery_complete.set() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") def handle_candles_initial(self, data): """Handle initial candles from discovery actor and properly initialize the candle dict.""" try: candles_received, timestamp = data self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor") # FIX: Re-initialize active_candles with the full dictionary structure # STEP 5 FIX: Added rounding to 8 decimals for all price values self.active_candles = {} for symbol, open_price in candles_received.items(): self.active_candles[symbol] = { 'open_price': round(open_price, 8), 'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms, 'high': round(open_price, 8), 'low': round(open_price, 8), 'close': round(open_price, 8), 'volume': 0.0, } if self.symbols and self.active_candles: self.discovery_complete.set() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") async def _start_real_websocket_connections(self): """NEW: Start real Binance WebSocket connections with throttle control""" self.log.info("Starting REAL Binance WebSocket connections for live data...") # Apply throttle mode limits to WebSocket connections symbols_to_use = self.symbols if self.throttle_mode: symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols") # Calculate optimal symbol distribution symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use) self.log.info(f"Real WebSocket distribution:") self.log.info(f" - Total symbols: {len(symbols_to_use)}") self.log.info(f" - Connections needed: {len(symbol_groups)}") # Start WebSocket connections for i, symbol_group in enumerate(symbol_groups): connection_id = f"binance_real_{i:03d}" connection = SiloqyWebSocketConnection( connection_id=connection_id, symbols=symbol_group, binance_adapter=self.binance_adapter, main_actor_ref=self ) self.websocket_connections[connection_id] = connection # Use Nautilus ActorExecutor for WebSocket task management if hasattr(self, '_executor') and self._executor: future = self._executor.submit(connection.start()) self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor") else: task = asyncio.create_task(connection.start()) self.websocket_tasks[connection_id] = task self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback") # Rate limit connection establishment if i < len(symbol_groups) - 1: await asyncio.sleep(1.0) # 1 second between connections self.log.info(f"Started {len(symbol_groups)} real WebSocket connections") async def _start_simulated_connections(self): """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor""" self.log.info("Starting WebSocket simulation for tick generation...") self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") # Use Nautilus ActorExecutor for WebSocket task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation") future = self._executor.submit(self._simulate_websocket_ticks()) self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") task = asyncio.create_task(self._simulate_websocket_ticks()) self.ws_tasks.append(task) async def _simulate_websocket_ticks(self): """PRESERVED: Original simulation logic""" # Adjust tick rate for throttle mode tick_delay = 0.1 if self.throttle_mode else 0.01 symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) tick_count = 0 try: if self.throttle_mode: self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second") else: self.log.info("WebSocket tick simulation started - generating 100 ticks/second") while True: for symbol in self.symbols[:symbols_to_simulate]: # STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals price = round(100.0 + random.gauss(0, 0.5), 8) quantity = round(random.uniform(0.00000001, 100.0), 8) timestamp = int(time.time() * 1000) self.on_websocket_tick(symbol, price, quantity, timestamp) tick_count += 1 # Log every 500 ticks in throttle mode, 1000 in normal mode log_interval = 500 if self.throttle_mode else 1000 if tick_count % log_interval == 0: mode_indicator = "THROTTLE" if self.throttle_mode else "" data_source = "REAL" if self.enable_real_data else "SIM" self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}") await asyncio.sleep(tick_delay) except asyncio.CancelledError: self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks") except Exception as e: self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}") def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int): """ PRESERVED: Original tick processing and candle update logic """ # PRESERVED: Original candle update logic if symbol not in self.active_candles: candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms # STEP 4 FIX: Round all prices to 8 decimals self.active_candles[symbol] = { 'open_price': round(price, 8), 'open_time': candle_open_time, 'high': round(price, 8), 'low': round(price, 8), 'close': round(price, 8), 'volume': 0.0 } candle = self.active_candles[symbol] # PRESERVED: Original candle rolling logic current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms if current_candle_time > candle['open_time']: # STEP 4 FIX: Round all prices to 8 decimals candle['open_price'] = round(price, 8) candle['open_time'] = current_candle_time candle['high'] = round(price, 8) candle['low'] = round(price, 8) candle['close'] = round(price, 8) candle['volume'] = quantity else: candle['close'] = round(price, 8) candle['high'] = round(max(candle['high'], price), 8) candle['low'] = round(min(candle['low'], price), 8) candle['volume'] += quantity # PRESERVED: Original high-performance tuple publishing try: if hasattr(self, 'msgbus') and self.msgbus: # STEP 3 FIX: Round all float values to 8 decimals in the tuple tick_tuple = ( symbol, round(float(price), 8), round(float(quantity), 8), int(timestamp), round(float(candle['open_price']), 8), int(candle['open_time']) ) self.msgbus.publish(RAW_TOPIC, tick_tuple) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") class DOLPHINRegimeActor(Actor): def __init__(self, config: DOLPHINRegimeActorConfig): super().__init__(config) # PRESERVED: All original DOLPHIN configuration self.max_symbols = config.max_symbols self.ticks_per_analysis = config.ticks_per_analysis # PRESERVED: All original pre-allocated arrays for zero allocation self.open_prices = np.zeros(self.max_symbols, dtype=np.float64) self.close_prices = np.zeros(self.max_symbols, dtype=np.float64) self.high_prices = np.zeros(self.max_symbols, dtype=np.float64) self.low_prices = np.zeros(self.max_symbols, dtype=np.float64) self.volumes = np.zeros(self.max_symbols, dtype=np.float64) self.last_update = np.zeros(self.max_symbols, dtype=np.int64) self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64) # Default fallback # PRESERVED: All original mapping and state self.symbol_to_idx = {} self.idx_to_symbol = {} self.active_symbols = 0 self.tick_count = 0 # PRESERVED: All original DOLPHIN thresholds - EXACT self.bull_threshold = 0.60 # 60% bullish self.bear_threshold = 0.55 # 55% bearish self.previous_bull_ratio = None self.regime_history = deque(maxlen=100) # Metrics self.last_metric_log = time.time_ns() self.processed_ticks = 0 self.regime_calculations = 0 # NEW: Enhanced indicator tracking for BB and temporal patterns self.signal_history = deque(maxlen=100) # For BB calculations self.bb_period = 20 # BB calculation period self.bb_std_dev = 2.0 # BB standard deviation multiplier self.velocity_history = deque(maxlen=10) # For regime velocity tracking self.confidence_history = deque(maxlen=20) # For confidence trend analysis self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " f"ticks_per_analysis: {self.ticks_per_analysis}") def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes) self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed def handle_symbols_discovered(self, data): """Pre-initialize symbol mappings during discovery phase""" try: symbols, timestamp = data self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings") for symbol in symbols: if self.active_symbols >= self.max_symbols: self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization") break idx = self.active_symbols self.symbol_to_idx[symbol] = idx self.idx_to_symbol[idx] = symbol self.active_symbols += 1 self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings") except Exception as e: self.log.error(f"Error pre-initializing symbols: {e}") def handle_tick_sizes(self, data): """Apply tick sizes to pre-initialized symbol mappings""" try: tick_sizes, timestamp = data self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") applied_count = 0 for symbol, tick_size in tick_sizes.items(): if symbol in self.symbol_to_idx: # Will exist from pre-initialization idx = self.symbol_to_idx[symbol] if 0 < tick_size <= 1.0: self.tick_sizes[idx] = tick_size applied_count += 1 else: self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") def handle_raw_tick(self, data): """ SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols Using Nautilus ActorExecutor for regime detection tasks """ try: symbol, price, size, ts_event, open_price, candle_open_time = data except Exception as e: self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") return # Direct lookup - symbol will exist from pre-initialization if symbol not in self.symbol_to_idx: self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings") return idx = self.symbol_to_idx[symbol] # Check if new candle period if candle_open_time > self.last_update[idx]: # Reset for new candle self.open_prices[idx] = open_price self.high_prices[idx] = price self.low_prices[idx] = price self.close_prices[idx] = price self.volumes[idx] = size self.last_update[idx] = candle_open_time else: # Update existing candle data self.close_prices[idx] = price self.high_prices[idx] = max(self.high_prices[idx], price) self.low_prices[idx] = min(self.low_prices[idx], price) self.volumes[idx] += size self.tick_count += 1 self.processed_ticks += 1 # PRESERVED: Original trigger logic if self.tick_count >= self.ticks_per_analysis: # Use Nautilus ActorExecutor for regime detection if hasattr(self, '_executor') and self._executor: future = self._executor.submit(self._run_regime_detection_async()) else: # Fallback to sync detection self._run_regime_detection() self.tick_count = 0 # Periodic metrics logging now = time.time_ns() if now - self.last_metric_log > 1_000_000_000: self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, " f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}") self.last_metric_log = now async def _run_regime_detection_async(self): try: self._run_regime_detection() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols): """NEW: Calculate enhanced indicators including BB metrics and temporal patterns""" # Calculate regime momentum signal base_momentum = (bull_ratio - bear_ratio) * 100 # -100 to +100 sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0 signal = base_momentum * confidence * sample_quality # Add to signal history self.signal_history.append(signal) # Calculate velocity (rate of change in signal) velocity = 0.0 if len(self.signal_history) >= 2: velocity = self.signal_history[-1] - self.signal_history[-2] self.velocity_history.append(velocity) # Store confidence for trending self.confidence_history.append(confidence) # Calculate Bollinger Bands if enough history bb_metrics = {} if len(self.signal_history) >= self.bb_period: recent_signals = list(self.signal_history)[-self.bb_period:] sma = sum(recent_signals) / len(recent_signals) # Calculate standard deviation variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals) std_dev = variance ** 0.5 upper_band = sma + (self.bb_std_dev * std_dev) lower_band = sma - (self.bb_std_dev * std_dev) # Position within BBs (mean reversion interpretation) if signal > upper_band: bb_position = 'ABOVE_UPPER' elif signal < lower_band: bb_position = 'BELOW_LOWER' elif signal >= sma: bb_position = 'UPPER_HALF' else: bb_position = 'LOWER_HALF' # Momentum persistence interpretation if signal > upper_band: momentum_signal = 'STRONG_BULL_BREAKOUT' elif signal < lower_band: momentum_signal = 'STRONG_BEAR_BREAKOUT' elif signal > sma: momentum_signal = 'MILD_BULLISH' else: momentum_signal = 'MILD_BEARISH' bb_metrics = { 'signal': signal, 'sma': sma, 'upper_band': upper_band, 'lower_band': lower_band, 'bb_position': bb_position, 'momentum_signal': momentum_signal, 'bb_ready': True } else: bb_metrics = { 'signal': signal, 'sma': None, 'upper_band': None, 'lower_band': None, 'bb_position': 'INSUFFICIENT_DATA', 'momentum_signal': 'INSUFFICIENT_DATA', 'bb_ready': False } # Calculate temporal patterns temporal_metrics = {} if len(self.velocity_history) >= 3: avg_velocity = sum(self.velocity_history) / len(self.velocity_history) velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING' else: avg_velocity = velocity velocity_trend = 'BUILDING_HISTORY' if len(self.confidence_history) >= 5: confidence_trend = 'RISING' if confidence > sum(self.confidence_history[-5:-1])/4 else 'FALLING' else: confidence_trend = 'BUILDING_HISTORY' temporal_metrics = { 'velocity': velocity, 'avg_velocity': avg_velocity, 'velocity_trend': velocity_trend, 'confidence_trend': confidence_trend } return bb_metrics, temporal_metrics def _run_regime_detection(self): self.regime_calculations += 1 total_symbols = self.active_symbols if total_symbols == 0: return analyzed = 0 bullish = 0 bearish = 0 # NEW: Track pattern of bullish/bearish symbols for this calculation symbol_pattern = [] # PRESERVED: Original analysis with exact thresholds for idx in range(self.active_symbols): open_price = self.open_prices[idx] close_price = self.close_prices[idx] if open_price == 0: continue analyzed += 1 # NEW: HFT-grade tick-size based comparison tick_size = self.tick_sizes[idx] equality_threshold = tick_size / 2 # Half tick size standard price_diff = abs(close_price - open_price) # Check if prices are effectively equal within tick size tolerance if price_diff <= equality_threshold: # Prices are effectively equal (within tick size tolerance) symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}") elif close_price > open_price: # Bullish: close > open bullish += 1 # Arrow points to close (larger price) symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}") else: # Bearish: close < open bearish += 1 # Arrow points to open (larger price) symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}") if analyzed == 0: return # PRESERVED: Original ratio calculations bull_ratio = bullish / analyzed bear_ratio = bearish / analyzed sideways_ratio = 1.0 - bull_ratio - bear_ratio # MODIFIED: Original DOLPHIN Algorithm - Simple 3-regime detection if bull_ratio >= self.bull_threshold: # 60% bullish regime = MarketRegime.BULL elif bear_ratio >= self.bear_threshold: # 55% bearish regime = MarketRegime.BEAR else: # Everything else is sideways regime = MarketRegime.SIDEWAYS # PRESERVED: Original confidence calculation confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols) # NEW: Calculate enhanced indicators bb_metrics, temporal_metrics = self._calculate_enhanced_indicators( bull_ratio, bear_ratio, confidence, analyzed, total_symbols ) self.previous_bull_ratio = bull_ratio # Publish regime result using Nautilus message bus try: if hasattr(self, 'msgbus') and self.msgbus: regime_tuple = ( int(time.time() * 1000), regime.value, float(bull_ratio), float(bear_ratio), float(sideways_ratio), int(analyzed), int(total_symbols), float(confidence) ) self.msgbus.publish(REGIME_TOPIC, regime_tuple) # NEW: Publish enhanced indicators to data bus indicator_data = { 'timestamp': int(time.time() * 1000), 'regime_momentum_signal': bb_metrics['signal'], 'bb_ready': bb_metrics['bb_ready'], 'velocity': temporal_metrics['velocity'], 'velocity_trend': temporal_metrics['velocity_trend'], 'confidence_trend': temporal_metrics['confidence_trend'] } self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_data) # Publish BB metrics separately for specialized consumers if bb_metrics['bb_ready']: bb_data = { 'timestamp': int(time.time() * 1000), 'signal': bb_metrics['signal'], 'sma': bb_metrics['sma'], 'upper_band': bb_metrics['upper_band'], 'lower_band': bb_metrics['lower_band'], 'bb_position': bb_metrics['bb_position'], 'momentum_signal': bb_metrics['momentum_signal'] } self.msgbus.publish(BB_METRICS_TOPIC, bb_data) # Publish temporal patterns temporal_data = { 'timestamp': int(time.time() * 1000), 'velocity': temporal_metrics['velocity'], 'avg_velocity': temporal_metrics['avg_velocity'], 'velocity_trend': temporal_metrics['velocity_trend'], 'confidence_trend': temporal_metrics['confidence_trend'], 'signal_history_length': len(self.signal_history) } self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_data) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}") # Log regime changes if not self.regime_history or regime != self.regime_history[-1]: self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} " f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | " f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}") self.regime_history.append(regime) # Periodic regime status (even without changes) if self.regime_calculations % 50 == 0: # Every second, approx, given avg. tick rate # Determine color based on regime if regime == MarketRegime.BULL: color_code = "\033[92m" # Green elif regime == MarketRegime.BEAR: color_code = "\033[91m" # Red else: # SIDEWAYS color_code = "\033[93m" # Yellow # Reset color code reset_code = "\033[0m" self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} " f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}") # NEW: Enhanced indicator line after regime status if bb_metrics['bb_ready']: self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | " f"SMA:{bb_metrics['sma']:.1f} | Upper:{bb_metrics['upper_band']:.1f} | " f"Lower:{bb_metrics['lower_band']:.1f} | Pos:{bb_metrics['bb_position']} | " f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.1f} | " f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") else: self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | " f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | " f"Vel:{temporal_metrics['velocity']:.1f} | VelTrend:{temporal_metrics['velocity_trend']} | " f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") # NEW: Log symbol pattern and counts if symbol_pattern: # Only if we have symbols to show pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces bull_count = sum(1 for s in symbol_pattern if s.startswith("B")) bear_count = sum(1 for s in symbol_pattern if s.startswith("X")) self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}") def _calculate_confidence(self, bull_ratio: float, bear_ratio: float, analyzed: int, total: int) -> float: """PRESERVED EXACTLY: Original DOLPHIN confidence calculation""" if analyzed == 0: return 0.0 # Market Decisiveness max_ratio = max(bull_ratio, bear_ratio) decisiveness = abs(max_ratio - 0.5) * 2 # Sample Coverage coverage = analyzed / total # Statistical Significance if max_ratio > 0 and max_ratio < 1: standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed) else: standard_error = 0.0 z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001) statistical_confidence = min(z_score / 3.0, 1.0) # Market Clarity market_clarity = bull_ratio + bear_ratio # Weighted combination confidence = ( decisiveness * 0.40 + coverage * 0.10 + statistical_confidence * 0.30 + market_clarity * 0.20 ) return max(0.0, min(confidence, 1.0)) class SILOQYNormalizerActor(Actor): def __init__(self, config: SILOQYNormalizerConfig): super().__init__(config) self.normalized_count = 0 self.last_metric_log = time.time_ns() self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed def handle_raw_tick(self, data): try: symbol, price, size, ts_event, open_price, candle_open_time = data tick = SiloqyTradeTick( instrument_id=symbol, price=round(float(price), 8), size=round(float(size), 8), ts_event=int(ts_event), ts_init=int(time.time_ns()), open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8), candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000), tick_side=None, # Default to None unless Binance provides this info exchange="BINANCE", # We know this is from Binance liquidity_flag=None # Default to None unless Binance provides this info ) if hasattr(self, 'msgbus') and self.msgbus: try: # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior self.msgbus.publish(STRUCTURED_TOPIC, tick) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}") # FALLBACK: Try Nautilus publish_data if msgbus not available elif hasattr(self, 'publish_data'): try: # Use standard Nautilus publishing as fallback self.publish_data(DataType(SiloqyTradeTick), tick) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}") self.normalized_count += 1 # Periodic metrics now = time.time_ns() if now - self.last_metric_log > 1_000_000_000: self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks") self.last_metric_log = now except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}") def test_siloqy_actors_with_nautilus_process_management(): """Test SILOQY actors with Nautilus built-in ActorExecutor process management""" print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management") # Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING symbol_discovery_config = ImportableActorConfig( actor_path="__main__:SILOQYSymbolDiscoveryActor", config_path="__main__:SILOQYSymbolDiscoveryConfig", config={ "component_id": "SILOQY-SYMBOL-DISCOVERY", "symbols": [], # Empty for dynamic discovery "candle_interval_ms": 15 * 60 * 1000, "throttle_mode": True, # ENABLED: Safe for dual instance testing "throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s) "max_symbols_throttled": 414 # Only 100 symbols (vs 2000+) } ) main_actor_config = ImportableActorConfig( actor_path="__main__:SILOQYMainActor", config_path="__main__:SILOQYMainActorConfig", config={ "component_id": "SILOQY-MAIN-ACTOR", "candle_interval_ms": 15 * 60 * 1000, "throttle_mode": False, # ENABLED: Reduced tick generation "enable_real_data": True # CHANGE TO True for real WebSocket data } ) regime_actor_config = ImportableActorConfig( actor_path="__main__:DOLPHINRegimeActor", config_path="__main__:DOLPHINRegimeActorConfig", config={ "component_id": "DOLPHIN-REGIME-ACTOR", "max_symbols": 5000, "ticks_per_analysis": 2 # Reduced for throttle mode testing } ) normalizer_config = ImportableActorConfig( actor_path="__main__:SILOQYNormalizerActor", config_path="__main__:SILOQYNormalizerConfig", config={ "component_id": "SILOQY-NORMALIZER" } ) # Trading node configuration - Uses Nautilus built-in process management trading_config = TradingNodeConfig( trader_id=TraderId("SILOQY-TRADER-001"), actors=[ symbol_discovery_config, main_actor_config, regime_actor_config, normalizer_config ], data_clients={}, # No external data clients needed exec_clients={} # No execution clients for this test ) node = TradingNode(config=trading_config) try: node.build() print("Node built successfully with Nautilus built-in process management") node.run() except KeyboardInterrupt: print("\nSILOQY Actor test with Nautilus process management completed!") except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() finally: try: node.dispose() print("Nautilus process management: Node disposed successfully") except: pass if __name__ == "__main__": test_siloqy_actors_with_nautilus_process_management()