From 9281ad0da8cb2e93ff2d5f114634675579f715cc Mon Sep 17 00:00:00 2001 From: HJ Normey Date: Sat, 6 Sep 2025 11:49:30 +0200 Subject: [PATCH] main: committing new 'starting' point, after continued (fixed) 'slice' error: Nautilus wasn't taking well to SiloqyTrade|QuoteTick data-class, trying to slice. Back to tuplets --- nautilus_actor_test_implementation_6x.py | 1341 ++++++++++++++++++++++ 1 file changed, 1341 insertions(+) create mode 100644 nautilus_actor_test_implementation_6x.py diff --git a/nautilus_actor_test_implementation_6x.py b/nautilus_actor_test_implementation_6x.py new file mode 100644 index 0000000..529efac --- /dev/null +++ b/nautilus_actor_test_implementation_6x.py @@ -0,0 +1,1341 @@ +import time +import numpy as np +import asyncio +import json +import math +import httpx +import websockets +from typing import Dict, List, Optional, Tuple, Any +from enum import Enum +from collections import deque +import random # Added for _simulate_websocket_ticks +from dataclasses import dataclass, field + +# Nautilus imports - following test pattern +from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig +from nautilus_trader.live.node import TradingNode +from nautilus_trader.common.actor import Actor +from nautilus_trader.common.config import ActorConfig +from nautilus_trader.model.identifiers import TraderId +from nautilus_trader.core.data import Data +from nautilus_trader.common.component import Logger, init_logging + +# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data) +from nautilus_trader.model.data import TradeTick +from nautilus_trader.model.identifiers import InstrumentId, TradeId +from nautilus_trader.model.objects import Price, Quantity +from nautilus_trader.model.enums import AggressorSide + +# Initialize logging +_log_guard = init_logging() +_logger = Logger("SILOQY") + +# Topics - HIGH PERFORMANCE STRING TOPICS +RAW_TOPIC = "SILOQY.RAW.TICKS" +STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS" +REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS" +SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED" +CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL" +# ADDED LINE 18: +TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES" + +# Rate limiting constant +MIN_INTERVAL = 2.5 # seconds between API batches + +# Market Regime Enum +class MarketRegime(Enum): + BULL = "BULL" + BEAR = "BEAR" + TRANSITION = "TRANSITION" + SIDEWAYS = "SIDEWAYS" + +# ============================================================================ +# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE +# ============================================================================ + +@dataclass +class TickData: + """Universal tick data structure from standalone""" + symbol: str + price: float + quantity: float + timestamp: int + is_buyer_maker: bool + trade_id: int + exchange: str + raw_data: Dict = None + + def __post_init__(self): + if self.raw_data is None: + self.raw_data = {} + +@dataclass +class ExchangeConfig: + """Exchange-specific configuration from standalone""" + name: str + websocket_limit_per_second: int + max_streams_per_connection: int + max_connections_per_window: int + connection_window_seconds: int + base_websocket_url: str + requires_auth: bool = False + ping_interval: float = 20.0 + ping_timeout: float = 10.0 + +class BinanceAdapter: + """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter""" + + def __init__(self): + self.config = ExchangeConfig( + name="binance", + websocket_limit_per_second=10, + max_streams_per_connection=1024, + max_connections_per_window=300, + connection_window_seconds=300, # 5 minutes + base_websocket_url="wss://stream.binance.com:9443", + requires_auth=False + ) + self.logger = Logger(f"BINANCE_ADAPTER") + + def get_websocket_url(self, symbols: List[str]) -> str: + """Build Binance WebSocket URL with combined streams""" + if len(symbols) == 1: + symbol = symbols[0].lower() + return f"{self.config.base_websocket_url}/ws/{symbol}@trade" + else: + # Combined stream approach + streams = [f"{symbol.lower()}@trade" for symbol in symbols] + stream_string = "/".join(streams) + return f"{self.config.base_websocket_url}/stream?streams={stream_string}" + + def parse_message(self, message: str) -> Optional[TickData]: + """EXTRACTED FROM STANDALONE: Parse Binance trade message""" + try: + data = json.loads(message) + + # Handle combined stream format + if 'stream' in data and 'data' in data: + stream_name = data['stream'] + trade_data = data['data'] + else: + # Single stream format + trade_data = data + stream_name = trade_data.get('e', '') + + # Only process trade events + if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'): + return None + + return TickData( + symbol=trade_data['s'], + price=float(trade_data['p']), + quantity=float(trade_data['q']), + timestamp=int(trade_data['T']), + is_buyer_maker=trade_data['m'], + trade_id=int(trade_data['t']), + exchange="binance", + raw_data=trade_data + ) + + except (json.JSONDecodeError, KeyError, ValueError) as e: + self.logger.error(f"Failed to parse Binance message: {e}") + return None + + def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]: + """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections""" + # Conservative: leave room for message bursts + safety_margin = 0.7 + symbols_per_connection = int( + min( + self.config.max_streams_per_connection * safety_margin, + self.config.websocket_limit_per_second * 5 # 5-second buffer + ) + ) + + # Group symbols into chunks + symbol_groups = [] + for i in range(0, len(symbols), symbols_per_connection): + group = symbols[i:i + symbols_per_connection] + symbol_groups.append(group) + + return symbol_groups + +class SiloqyWebSocketConnection: + """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus""" + + def __init__(self, connection_id: str, symbols: List[str], + binance_adapter: BinanceAdapter, main_actor_ref): + self.connection_id = connection_id + self.symbols = symbols + self.adapter = binance_adapter + self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access + + self.websocket = None + self.is_running = False + self.retry_count = 0 + self.messages_received = 0 + self.last_message_time = 0.0 + self.connected_at = None + + self.logger = Logger(f"WS_{connection_id}") + self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols") + + async def start(self) -> None: + """Start the WebSocket connection""" + self.is_running = True + + while self.is_running: + try: + await self._connect_and_run() + except Exception as e: + self.logger.error(f"Connection {self.connection_id} error: {e}") + + if self.is_running: + await self._handle_reconnection() + else: + break + + async def _connect_and_run(self) -> None: + """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop""" + url = self.adapter.get_websocket_url(self.symbols) + + self.logger.info(f"Connecting to Binance: {url[:100]}...") + + async with websockets.connect( + url, + ping_interval=self.adapter.config.ping_interval, + ping_timeout=self.adapter.config.ping_timeout, + close_timeout=10 + ) as websocket: + self.websocket = websocket + self.connected_at = time.time() + self.retry_count = 0 + + self.logger.info(f"Connection {self.connection_id} established") + + # Message processing loop + async for message in websocket: + if not self.is_running: + break + + try: + await self._process_message(message) + except Exception as e: + self.logger.error(f"Message processing error: {e}") + + async def _process_message(self, message: str) -> None: + """Process incoming WebSocket message and convert to Nautilus format""" + self.messages_received += 1 + self.last_message_time = time.time() + + # Parse message using Binance adapter + tick_data = self.adapter.parse_message(message) + if tick_data: + # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus + try: + # Create SiloqyTradeTick object + siloqy_tick = SiloqyTradeTick( + instrument_id=tick_data.symbol, + price=round(tick_data.price, 8), + size=round(tick_data.quantity, 8), + ts_event=tick_data.timestamp * 1000000, # Convert ms to ns + ts_init=int(time.time_ns()), + open_price=None, # Will be set by candle logic + candle_open_time=None, # Will be set by candle logic + tick_side=None, + exchange="BINANCE", + liquidity_flag=None + ) + + # Get candle information from main actor + if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles: + symbol = tick_data.symbol + if symbol in self.main_actor.active_candles: + candle = self.main_actor.active_candles[symbol] + siloqy_tick.open_price = candle.get('open_price', tick_data.price) + siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000)) + + # Process through main actor's tick handling (preserves all candle logic) + self.main_actor.on_websocket_tick( + tick_data.symbol, + tick_data.price, + tick_data.quantity, + tick_data.timestamp + ) + + except Exception as e: + self.logger.error(f"Failed to process tick: {e}") + + async def _handle_reconnection(self) -> None: + """Handle reconnection with exponential backoff""" + self.retry_count += 1 + delay = min(2 ** self.retry_count, 60) # Max 60 seconds + + self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})") + await asyncio.sleep(delay) + + async def stop(self) -> None: + """Stop the connection""" + self.logger.info(f"Stopping connection {self.connection_id}") + self.is_running = False + + if self.websocket: + await self.websocket.close() + +# -------------------------------------------------------------------- +# SILOQY Custom Tick - PRESERVED with fixes +# -------------------------------------------------------------------- +class SiloqyTradeTick(TradeTick): + def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None, + open_price: float = None, candle_open_time: int = None, tick_side: str = None, + exchange: str = None, liquidity_flag = None): + # Convert to proper Nautilus types - add venue to symbol + if isinstance(instrument_id, str): + # Add BINANCE venue if not already present + if '.' not in instrument_id: + instrument_id = f"{instrument_id}.BINANCE" + instr_id = InstrumentId.from_str(instrument_id) + else: + instr_id = instrument_id + # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size + price_obj = Price(price, precision=8) if not isinstance(price, Price) else price + size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size + + # Default aggressor side and trade ID + aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this + trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID + + super().__init__( + instrument_id=instr_id, + price=price_obj, + size=size_obj, + aggressor_side=aggressor_side, + trade_id=trade_id, + ts_event=int(ts_event), + ts_init=int(ts_init if ts_init is not None else time.time_ns()) + ) + + # Additional SILOQY fields + self.open_price = open_price if open_price is not None else price + self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000) + self.tick_side = tick_side + self.exchange = exchange + self.liquidity_flag = liquidity_flag + +class SILOQYSymbolDiscoveryConfig(ActorConfig): + symbols: List[str] = [] + candle_interval_ms: int = 15 * 60 * 1000 + throttle_mode: bool = False + throttle_rate_limit_seconds: float = 10.0 + max_symbols_throttled: int = 100 + +class SILOQYMainActorConfig(ActorConfig): + candle_interval_ms: int = 15 * 60 * 1000 + throttle_mode: bool = False + enable_real_data: bool = False # NEW: Enable real WebSocket data + +class DOLPHINRegimeActorConfig(ActorConfig): + max_symbols: int = 5000 + ticks_per_analysis: int = 10 + +class SILOQYNormalizerConfig(ActorConfig): + pass + +class SILOQYSymbolDiscoveryActor(Actor): + """ + Symbol discovery with all original SILOQY algorithms preserved + Using Nautilus built-in ActorExecutor for task management + """ + def __init__(self, config: SILOQYSymbolDiscoveryConfig): + super().__init__(config) + + # Preserve original SILOQY configuration + self.symbols = list(config.symbols) if config.symbols else [] + self.candle_interval_ms = config.candle_interval_ms + self.active_candles = {} + self.tick_sizes = {} + + # Process management configuration + self.throttle_mode = config.throttle_mode + self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds + self.max_symbols_throttled = config.max_symbols_throttled + + if self.throttle_mode: + self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration") + self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches") + self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max") + + self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Start the actor - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting") + + # Use Nautilus ActorExecutor for task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") + future = self._executor.submit(self.on_start_async()) + self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + asyncio.create_task(self.on_start_async()) + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + async def on_start_async(self) -> None: + self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization") + + try: + # PRESERVED: Original symbol discovery algorithm + if not self.symbols: + await self._discover_all_symbols() + + # PRESERVED: Original stats fetching and candle reconstruction + stats, candle_opens = await self._fetch_stats_and_reconstruct_candles() + + # Set active candles from reconstruction results + self.active_candles = candle_opens + + # Publish results using msgbus + self._publish_discovery_results() + + self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}") + # Don't re-raise, let system continue + + async def _discover_all_symbols(self): + """PRESERVED: Original Binance symbol discovery algorithm""" + self.log.info("Starting dynamic symbol discovery from Binance...") + url = "https://api.binance.com/api/v3/exchangeInfo" + + async with httpx.AsyncClient() as client: + self.log.info("Fetching exchange info from Binance API...") + response = await client.get(url, timeout=10) + if response.status_code == 200: + self.log.info("Successfully received exchange info") + data = response.json() + + # Combined symbol discovery and tick size extraction + self.log.info("Processing symbols and extracting tick sizes...") + full_symbols = [] + for symbol_info in data['symbols']: + if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'): + symbol = symbol_info['symbol'] + full_symbols.append(symbol) + + # Extract tick size while processing # Extract tick size while processing + tick_size = None + for filter_info in symbol_info['filters']: + if filter_info['filterType'] == 'PRICE_FILTER': + tick_size = float(filter_info['tickSize']) + break + + # If no PRICE_FILTER found, try other filter types + if tick_size is None: + for filter_info in symbol_info['filters']: + if filter_info['filterType'] == 'TICK_SIZE': + tick_size = float(filter_info['tickSize']) + break + + # Fallback to default if still not found + if tick_size is None: + tick_size = 1e-8 # Default fallback + self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}") + + self.tick_sizes[symbol] = tick_size + + self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes") + + # Apply throttle mode symbol limiting + if self.throttle_mode: + self.symbols = full_symbols[:self.max_symbols_throttled] + self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)") + else: + self.symbols = full_symbols + + self.log.info(f"Discovered {len(self.symbols)} trading symbols") + self.log.info(f"First 10 symbols: {self.symbols[:10]}") + else: + self.log.error(f"Failed to fetch exchange info: {response.status_code}") + raise Exception(f"Failed to fetch exchange info: {response.status_code}") + + async def _fetch_stats_and_reconstruct_candles(self): + """PRESERVED: All original rate limiting with Nautilus async patterns""" + url = "https://api.binance.com/api/v3/ticker/24hr" + klines_url = "https://api.binance.com/api/v3/klines" + ticker_url = "https://api.binance.com/api/v3/ticker/price" + + # PRESERVED: Original rate limit handling with throttle mode support + base_rate_limit = 2.5 + rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit + + if self.throttle_mode: + self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)") + + stats = {} + candle_opens = {} + + async with httpx.AsyncClient() as client: + total_batches = (len(self.symbols) + 49) // 50 + for i in range(0, len(self.symbols), 50): + batch_num = (i // 50) + 1 + start_time = time.time() + symbols_batch = self.symbols[i:i + 50] + + if self.throttle_mode: + self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay") + + # PRESERVED: Original boundary detection logic + current_time = int(time.time() * 1000) + time_into_candle = current_time % self.candle_interval_ms + candle_open_time = current_time - time_into_candle + at_boundary = (time_into_candle < 1000) + + if i == 0: # Log initial state + self.log.info("DOLPHIN: Current candle analysis:") + self.log.info(f" - Current time: {current_time}") + self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)") + self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)") + self.log.info(f" - At boundary: {at_boundary}") + self.log.info(f" - Candle open time: {candle_open_time}") + + symbols_json_string = json.dumps(symbols_batch, separators=(',', ':')) + params = {"symbols": symbols_json_string} + + try: + # PRESERVED: Original stats fetching + response = await client.get(url, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + for item in data: + symbol = item['symbol'] + stats[symbol] = { + 'count': int(item['count']), + 'quoteVolume': float(item['quoteVolume']), + } + self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols") + + else: + self.log.error(f"Error {response.status_code}: {response.text}") + + # PRESERVED: Original DOLPHIN candle reconstruction strategy + if at_boundary: + # AT BOUNDARY: Fetch current prices to use as opens + self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})") + + ticker_params = {"symbols": symbols_json_string} + ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5) + + if ticker_response.status_code == 200: + ticker_data = ticker_response.json() + for item in ticker_data: + symbol = item['symbol'] + current_price = float(item['price']) + candle_opens[symbol] = current_price + + self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols") + else: + self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})") + + else: + # NOT AT BOUNDARY: Fetch historical 1s kline data + self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})") + + for symbol in symbols_batch: + try: + # PRESERVED: Original kline fetching logic + kline_params = { + 'symbol': symbol, + 'interval': '1s', + 'startTime': candle_open_time, + 'endTime': candle_open_time + 1000, + 'limit': 1 + } + + kline_response = await client.get(klines_url, params=kline_params, timeout=5) + + if kline_response.status_code == 200: + kline_data = kline_response.json() + if kline_data: + open_price = float(kline_data[0][1]) + candle_opens[symbol] = open_price + + if (i + len(symbols_batch)) <= 10: + self.log.info(f" - {symbol}: reconstructed open = {open_price}") + else: + self.log.warning(f" - {symbol}: no 1s kline data found") + else: + self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})") + + except Exception as e: + self.log.error(f" - {symbol}: kline fetch error: {e}") + + await asyncio.sleep(0.1) + + except Exception as e: + self.log.error(f"Request failed: {str(e)}") + + # PRESERVED: Original rate limiting with Nautilus async + elapsed = time.time() - start_time + if i + 50 < len(self.symbols): + sleep_time = max(0, rate_limit_interval - elapsed) + if sleep_time > 0: + await asyncio.sleep(sleep_time) + + self.log.info(f"DOLPHIN: Candle reconstruction complete:") + self.log.info(f" - Symbols processed: {len(self.symbols)}") + self.log.info(f" - Opens reconstructed: {len(candle_opens)}") + self.log.info("Discovery phase ready for publishing...") + + return stats, candle_opens + + def _publish_discovery_results(self): + """Publish results using msgbus - Nautilus message bus integration""" + try: + self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...") + if hasattr(self, 'msgbus') and self.msgbus: + # Publish symbols and candles as tuples + self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns()))) + self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns()))) + self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns()))) + + self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles") + self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes") + self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing") + else: + self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}") + +class SILOQYMainActor(Actor): + def __init__(self, config: SILOQYMainActorConfig): + super().__init__(config) + + # Preserve original configuration + self.candle_interval_ms = config.candle_interval_ms + self.throttle_mode = config.throttle_mode + self.enable_real_data = config.enable_real_data # NEW: Real data capability + self.connections = {} + self.connection_tasks = {} + + # Will be populated from discovery actor + self.symbols = [] + self.active_candles = {} + + # WebSocket infrastructure - NEW + self.binance_adapter = BinanceAdapter() if self.enable_real_data else None + self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {} + self.websocket_tasks: Dict[str, asyncio.Task] = {} + + # WebSocket tasks - managed by Nautilus ActorExecutor + self.ws_tasks = [] + + # Synchronization + self.discovery_complete = asyncio.Event() + + if self.throttle_mode: + self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") + + if self.enable_real_data: + self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams") + else: + self.log.info("SIMULATION MODE: Will generate simulated ticks") + + self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Subscribe to discovery events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events") + + # Subscribe to discovery results + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial) + self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics") + + # Use Nautilus ActorExecutor for task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") + future = self._executor.submit(self.on_start_async()) + self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + asyncio.create_task(self.on_start_async()) + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") + + # Stop WebSocket connections if running + if self.enable_real_data: + for conn in self.websocket_connections.values(): + asyncio.create_task(conn.stop()) + + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + async def on_start_async(self) -> None: + try: + # Wait for symbol discovery to complete + self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") + await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) + + # ENHANCED: Start real or simulated connections based on configuration + if self.enable_real_data: + await self._start_real_websocket_connections() + else: + await self._start_simulated_connections() + + self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") + + except asyncio.TimeoutError: + self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery") + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}") + + def handle_symbols_discovered(self, data): + """Handle symbols from discovery actor""" + try: + symbols, timestamp = data + self.symbols = symbols + self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor") + + if self.symbols and self.active_candles: + self.discovery_complete.set() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") + + def handle_candles_initial(self, data): + """Handle initial candles from discovery actor and properly initialize the candle dict.""" + try: + candles_received, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor") + + # FIX: Re-initialize active_candles with the full dictionary structure + # STEP 5 FIX: Added rounding to 8 decimals for all price values + self.active_candles = {} + for symbol, open_price in candles_received.items(): + self.active_candles[symbol] = { + 'open_price': round(open_price, 8), + 'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms, + 'high': round(open_price, 8), + 'low': round(open_price, 8), + 'close': round(open_price, 8), + 'volume': 0.0, + } + + if self.symbols and self.active_candles: + self.discovery_complete.set() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") + + async def _start_real_websocket_connections(self): + """NEW: Start real Binance WebSocket connections with throttle control""" + self.log.info("Starting REAL Binance WebSocket connections for live data...") + + # Apply throttle mode limits to WebSocket connections + symbols_to_use = self.symbols + if self.throttle_mode: + symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode + self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols") + + # Calculate optimal symbol distribution + symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use) + + self.log.info(f"Real WebSocket distribution:") + self.log.info(f" - Total symbols: {len(symbols_to_use)}") + self.log.info(f" - Connections needed: {len(symbol_groups)}") + + # Start WebSocket connections + for i, symbol_group in enumerate(symbol_groups): + connection_id = f"binance_real_{i:03d}" + + connection = SiloqyWebSocketConnection( + connection_id=connection_id, + symbols=symbol_group, + binance_adapter=self.binance_adapter, + main_actor_ref=self + ) + + self.websocket_connections[connection_id] = connection + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(connection.start()) + self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor") + else: + task = asyncio.create_task(connection.start()) + self.websocket_tasks[connection_id] = task + self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback") + + # Rate limit connection establishment + if i < len(symbol_groups) - 1: + await asyncio.sleep(1.0) # 1 second between connections + + self.log.info(f"Started {len(symbol_groups)} real WebSocket connections") + + async def _start_simulated_connections(self): + """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor""" + self.log.info("Starting WebSocket simulation for tick generation...") + self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation") + future = self._executor.submit(self._simulate_websocket_ticks()) + self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + task = asyncio.create_task(self._simulate_websocket_ticks()) + self.ws_tasks.append(task) + + async def _simulate_websocket_ticks(self): + """PRESERVED: Original simulation logic""" + # Adjust tick rate for throttle mode + tick_delay = 0.1 if self.throttle_mode else 0.01 + symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) + + tick_count = 0 + try: + if self.throttle_mode: + self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second") + else: + self.log.info("WebSocket tick simulation started - generating 100 ticks/second") + + while True: + for symbol in self.symbols[:symbols_to_simulate]: + # STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals + price = round(100.0 + random.gauss(0, 0.5), 8) + quantity = round(random.uniform(0.00000001, 100.0), 8) + timestamp = int(time.time() * 1000) + + self.on_websocket_tick(symbol, price, quantity, timestamp) + tick_count += 1 + + # Log every 500 ticks in throttle mode, 1000 in normal mode + log_interval = 500 if self.throttle_mode else 1000 + if tick_count % log_interval == 0: + mode_indicator = "THROTTLE" if self.throttle_mode else "" + data_source = "REAL" if self.enable_real_data else "SIM" + self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}") + + await asyncio.sleep(tick_delay) + except asyncio.CancelledError: + self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks") + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}") + + def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int): + """ + PRESERVED: Original tick processing and candle update logic + """ + # PRESERVED: Original candle update logic + if symbol not in self.active_candles: + candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms + # STEP 4 FIX: Round all prices to 8 decimals + self.active_candles[symbol] = { + 'open_price': round(price, 8), + 'open_time': candle_open_time, + 'high': round(price, 8), + 'low': round(price, 8), + 'close': round(price, 8), + 'volume': 0.0 + } + + candle = self.active_candles[symbol] + + # PRESERVED: Original candle rolling logic + current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms + if current_candle_time > candle['open_time']: + # STEP 4 FIX: Round all prices to 8 decimals + candle['open_price'] = round(price, 8) + candle['open_time'] = current_candle_time + candle['high'] = round(price, 8) + candle['low'] = round(price, 8) + candle['close'] = round(price, 8) + candle['volume'] = quantity + else: + candle['close'] = round(price, 8) + candle['high'] = round(max(candle['high'], price), 8) + candle['low'] = round(min(candle['low'], price), 8) + candle['volume'] += quantity + + # PRESERVED: Original high-performance tuple publishing + try: + if hasattr(self, 'msgbus') and self.msgbus: + # STEP 3 FIX: Round all float values to 8 decimals in the tuple + tick_tuple = ( + symbol, + round(float(price), 8), + round(float(quantity), 8), + int(timestamp), + round(float(candle['open_price']), 8), + int(candle['open_time']) + ) + + self.msgbus.publish(RAW_TOPIC, tick_tuple) + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") + +class DOLPHINRegimeActor(Actor): + def __init__(self, config: DOLPHINRegimeActorConfig): + super().__init__(config) + + # PRESERVED: All original DOLPHIN configuration + self.max_symbols = config.max_symbols + self.ticks_per_analysis = config.ticks_per_analysis + + # PRESERVED: All original pre-allocated arrays for zero allocation + self.open_prices = np.zeros(self.max_symbols, dtype=np.float64) + self.close_prices = np.zeros(self.max_symbols, dtype=np.float64) + self.high_prices = np.zeros(self.max_symbols, dtype=np.float64) + self.low_prices = np.zeros(self.max_symbols, dtype=np.float64) + self.volumes = np.zeros(self.max_symbols, dtype=np.float64) + self.last_update = np.zeros(self.max_symbols, dtype=np.int64) + self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64) # Default fallback + + # PRESERVED: All original mapping and state + self.symbol_to_idx = {} + self.idx_to_symbol = {} + self.active_symbols = 0 + + self.tick_count = 0 + + # PRESERVED: All original DOLPHIN thresholds - EXACT + self.bull_threshold = 0.60 # 60% bullish + self.bear_threshold = 0.55 # 55% bearish + + self.previous_bull_ratio = None + self.regime_history = deque(maxlen=100) + + # Metrics + self.last_metric_log = time.time_ns() + self.processed_ticks = 0 + self.regime_calculations = 0 + + self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " + f"ticks_per_analysis: {self.ticks_per_analysis}") + + def on_start(self) -> None: + """Subscribe to tick events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events") + + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes") + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + def handle_symbols_discovered(self, data): + """Pre-initialize symbol mappings during discovery phase""" + try: + symbols, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings") + + for symbol in symbols: + if self.active_symbols >= self.max_symbols: + self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization") + break + + idx = self.active_symbols + self.symbol_to_idx[symbol] = idx + self.idx_to_symbol[idx] = symbol + self.active_symbols += 1 + + self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings") + + except Exception as e: + self.log.error(f"Error pre-initializing symbols: {e}") + + def handle_tick_sizes(self, data): + """Apply tick sizes to pre-initialized symbol mappings""" + try: + tick_sizes, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") + + applied_count = 0 + for symbol, tick_size in tick_sizes.items(): + if symbol in self.symbol_to_idx: # Will exist from pre-initialization + idx = self.symbol_to_idx[symbol] + if 0 < tick_size <= 1.0: + self.tick_sizes[idx] = tick_size + applied_count += 1 + else: + self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") + + self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") + + def handle_raw_tick(self, data): + """ + SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols + Using Nautilus ActorExecutor for regime detection tasks + """ + try: + symbol, price, size, ts_event, open_price, candle_open_time = data + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") + return + + # Direct lookup - symbol will exist from pre-initialization + if symbol not in self.symbol_to_idx: + self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings") + return + + idx = self.symbol_to_idx[symbol] + + # Check if new candle period + if candle_open_time > self.last_update[idx]: + # Reset for new candle + self.open_prices[idx] = open_price + self.high_prices[idx] = price + self.low_prices[idx] = price + self.close_prices[idx] = price + self.volumes[idx] = size + self.last_update[idx] = candle_open_time + else: + # Update existing candle data + self.close_prices[idx] = price + self.high_prices[idx] = max(self.high_prices[idx], price) + self.low_prices[idx] = min(self.low_prices[idx], price) + self.volumes[idx] += size + + self.tick_count += 1 + self.processed_ticks += 1 + + # PRESERVED: Original trigger logic + if self.tick_count >= self.ticks_per_analysis: + # Use Nautilus ActorExecutor for regime detection + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(self._run_regime_detection_async()) + else: + # Fallback to sync detection + self._run_regime_detection() + self.tick_count = 0 + + # Periodic metrics logging + now = time.time_ns() + if now - self.last_metric_log > 1_000_000_000: + self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, " + f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}") + self.last_metric_log = now + + async def _run_regime_detection_async(self): + try: + self._run_regime_detection() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") + + def _run_regime_detection(self): + self.regime_calculations += 1 + + total_symbols = self.active_symbols + if total_symbols == 0: + return + + analyzed = 0 + bullish = 0 + bearish = 0 + + # NEW: Track pattern of bullish/bearish symbols for this calculation + symbol_pattern = [] + + # PRESERVED: Original analysis with exact thresholds + for idx in range(self.active_symbols): + open_price = self.open_prices[idx] + close_price = self.close_prices[idx] + + if open_price == 0: + continue + + analyzed += 1 + + + # NEW: HFT-grade tick-size based comparison + tick_size = self.tick_sizes[idx] + equality_threshold = tick_size / 2 # Half tick size standard + price_diff = abs(close_price - open_price) + + # Check if prices are effectively equal within tick size tolerance + if price_diff <= equality_threshold: + # Prices are effectively equal (within tick size tolerance) + symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}") + elif close_price > open_price: + # Bullish: close > open + bullish += 1 + # Arrow points to close (larger price) + symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}") + else: + # Bearish: close < open + bearish += 1 + # Arrow points to open (larger price) + symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}") + + if analyzed == 0: + return + + # PRESERVED: Original ratio calculations + bull_ratio = bullish / analyzed + bear_ratio = bearish / analyzed + sideways_ratio = 1.0 - bull_ratio - bear_ratio + + # MODIFIED: Original DOLPHIN Algorithm - Simple 3-regime detection + if bull_ratio >= self.bull_threshold: # 60% bullish + regime = MarketRegime.BULL + elif bear_ratio >= self.bear_threshold: # 55% bearish + regime = MarketRegime.BEAR + else: + # Everything else is sideways + regime = MarketRegime.SIDEWAYS + + # PRESERVED: Original confidence calculation + confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols) + + self.previous_bull_ratio = bull_ratio + + # Publish regime result using Nautilus message bus + try: + if hasattr(self, 'msgbus') and self.msgbus: + regime_tuple = ( + int(time.time() * 1000), + regime.value, + float(bull_ratio), + float(bear_ratio), + float(sideways_ratio), + int(analyzed), + int(total_symbols), + float(confidence) + ) + + self.msgbus.publish(REGIME_TOPIC, regime_tuple) + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}") + + # Log regime changes + if not self.regime_history or regime != self.regime_history[-1]: + self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} " + f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | " + f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}") + self.regime_history.append(regime) + + # Periodic regime status (even without changes) + if self.regime_calculations % 50 == 0: # Every second, approx, given avg. tick rate + # Determine color based on regime + if regime == MarketRegime.BULL: + color_code = "\033[92m" # Green + elif regime == MarketRegime.BEAR: + color_code = "\033[91m" # Red + else: # SIDEWAYS + color_code = "\033[93m" # Yellow + + # Reset color code + reset_code = "\033[0m" + + self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} " + f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}") + + # NEW: Log symbol pattern and counts + if symbol_pattern: # Only if we have symbols to show + pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces + bull_count = sum(1 for s in symbol_pattern if s.startswith("B")) + bear_count = sum(1 for s in symbol_pattern if s.startswith("X")) + + self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}") + + def _calculate_confidence(self, bull_ratio: float, bear_ratio: float, + analyzed: int, total: int) -> float: + """PRESERVED EXACTLY: Original DOLPHIN confidence calculation""" + if analyzed == 0: + return 0.0 + + # Market Decisiveness + max_ratio = max(bull_ratio, bear_ratio) + decisiveness = abs(max_ratio - 0.5) * 2 + + # Sample Coverage + coverage = analyzed / total + + # Statistical Significance + if max_ratio > 0 and max_ratio < 1: + standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed) + else: + standard_error = 0.0 + + z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001) + statistical_confidence = min(z_score / 3.0, 1.0) + + # Market Clarity + market_clarity = bull_ratio + bear_ratio + + # Weighted combination + confidence = ( + decisiveness * 0.40 + + coverage * 0.10 + + statistical_confidence * 0.30 + + market_clarity * 0.20 + ) + + return max(0.0, min(confidence, 1.0)) + +class SILOQYNormalizerActor(Actor): + def __init__(self, config: SILOQYNormalizerConfig): + super().__init__(config) + + self.normalized_count = 0 + self.last_metric_log = time.time_ns() + + self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Subscribe to tick events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events") + + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + def handle_raw_tick(self, data): + try: + symbol, price, size, ts_event, open_price, candle_open_time = data + tick = SiloqyTradeTick( + instrument_id=symbol, + price=round(float(price), 8), + size=round(float(size), 8), + ts_event=int(ts_event), + ts_init=int(time.time_ns()), + open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8), + candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000), + tick_side=None, # Default to None unless Binance provides this info + exchange="BINANCE", # We know this is from Binance + liquidity_flag=None # Default to None unless Binance provides this info + ) + + if hasattr(self, 'msgbus') and self.msgbus: + try: + # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior + self.msgbus.publish(STRUCTURED_TOPIC, tick) + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}") + + # FALLBACK: Try Nautilus publish_data if msgbus not available + elif hasattr(self, 'publish_data'): + try: + # Use standard Nautilus publishing as fallback + self.publish_data(DataType(SiloqyTradeTick), tick) + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}") + self.normalized_count += 1 + + # Periodic metrics + now = time.time_ns() + if now - self.last_metric_log > 1_000_000_000: + self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks") + self.last_metric_log = now + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}") + +def test_siloqy_actors_with_nautilus_process_management(): + """Test SILOQY actors with Nautilus built-in ActorExecutor process management""" + print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management") + + # Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING + symbol_discovery_config = ImportableActorConfig( + actor_path="__main__:SILOQYSymbolDiscoveryActor", + config_path="__main__:SILOQYSymbolDiscoveryConfig", + config={ + "component_id": "SILOQY-SYMBOL-DISCOVERY", + "symbols": [], # Empty for dynamic discovery + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": True, # ENABLED: Safe for dual instance testing + "throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s) + "max_symbols_throttled": 414 # Only 100 symbols (vs 2000+) + } + ) + + main_actor_config = ImportableActorConfig( + actor_path="__main__:SILOQYMainActor", + config_path="__main__:SILOQYMainActorConfig", + config={ + "component_id": "SILOQY-MAIN-ACTOR", + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": False, # ENABLED: Reduced tick generation + "enable_real_data": True # CHANGE TO True for real WebSocket data + } + ) + + regime_actor_config = ImportableActorConfig( + actor_path="__main__:DOLPHINRegimeActor", + config_path="__main__:DOLPHINRegimeActorConfig", + config={ + "component_id": "DOLPHIN-REGIME-ACTOR", + "max_symbols": 5000, + "ticks_per_analysis": 2 # Reduced for throttle mode testing + } + ) + + normalizer_config = ImportableActorConfig( + actor_path="__main__:SILOQYNormalizerActor", + config_path="__main__:SILOQYNormalizerConfig", + config={ + "component_id": "SILOQY-NORMALIZER" + } + ) + + # Trading node configuration - Uses Nautilus built-in process management + trading_config = TradingNodeConfig( + trader_id=TraderId("SILOQY-TRADER-001"), + actors=[ + symbol_discovery_config, + main_actor_config, + regime_actor_config, + normalizer_config + ], + data_clients={}, # No external data clients needed + exec_clients={} # No execution clients for this test + ) + + node = TradingNode(config=trading_config) + + try: + node.build() + print("Node built successfully with Nautilus built-in process management") + node.run() + + except KeyboardInterrupt: + print("\nSILOQY Actor test with Nautilus process management completed!") + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + finally: + try: + node.dispose() + print("Nautilus process management: Node disposed successfully") + except: + pass + +if __name__ == "__main__": + test_siloqy_actors_with_nautilus_process_management() \ No newline at end of file