import time import numpy as np import asyncio import json import math from typing import Dict, List, Optional, Tuple, Any from enum import Enum from collections import deque import httpx import random # Added for _simulate_websocket_ticks # Nautilus imports - following test pattern from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig from nautilus_trader.live.node import TradingNode from nautilus_trader.common.actor import Actor from nautilus_trader.common.config import ActorConfig from nautilus_trader.model.identifiers import TraderId from nautilus_trader.core.data import Data from nautilus_trader.common.component import Logger, init_logging # NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data) from nautilus_trader.model.data import TradeTick from nautilus_trader.model.identifiers import InstrumentId, TradeId from nautilus_trader.model.objects import Price, Quantity from nautilus_trader.model.enums import AggressorSide # Initialize logging _log_guard = init_logging() _logger = Logger("SILOQY") # Topics - HIGH PERFORMANCE STRING TOPICS RAW_TOPIC = "SILOQY.RAW.TICKS" STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS" REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS" SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED" CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL" # Rate limiting constant MIN_INTERVAL = 2.5 # seconds between API batches # Market Regime Enum class MarketRegime(Enum): BULL = "BULL" BEAR = "BEAR" TRANSITION = "TRANSITION" SIDEWAYS = "SIDEWAYS" # -------------------------------------------------------------------- # SILOQY Custom Tick - PRESERVED with fixes # -------------------------------------------------------------------- class SiloqyTradeTick(TradeTick): def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None, open_price: float = None, candle_open_time: int = None, tick_side: str = None, exchange: str = None, liquidity_flag = None): # Convert to proper Nautilus types - add venue to symbol if isinstance(instrument_id, str): # Add BINANCE venue if not already present if '.' not in instrument_id: instrument_id = f"{instrument_id}.BINANCE" instr_id = InstrumentId.from_str(instrument_id) else: instr_id = instrument_id # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size price_obj = Price(price, precision=8) if not isinstance(price, Price) else price size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size # Default aggressor side and trade ID aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID super().__init__( instrument_id=instr_id, price=price_obj, size=size_obj, aggressor_side=aggressor_side, trade_id=trade_id, ts_event=int(ts_event), ts_init=int(ts_init if ts_init is not None else time.time_ns()) ) # Additional SILOQY fields self.open_price = open_price if open_price is not None else price self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000) self.tick_side = tick_side self.exchange = exchange self.liquidity_flag = liquidity_flag class SILOQYSymbolDiscoveryConfig(ActorConfig): symbols: List[str] = [] candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False throttle_rate_limit_seconds: float = 10.0 max_symbols_throttled: int = 100 class SILOQYMainActorConfig(ActorConfig): candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False class DOLPHINRegimeActorConfig(ActorConfig): max_symbols: int = 5000 ticks_per_analysis: int = 1000 class SILOQYNormalizerConfig(ActorConfig): pass class SILOQYSymbolDiscoveryActor(Actor): """ Symbol discovery with all original SILOQY algorithms preserved Using Nautilus built-in ActorExecutor for task management """ def __init__(self, config: SILOQYSymbolDiscoveryConfig): super().__init__(config) # Preserve original SILOQY configuration self.symbols = list(config.symbols) if config.symbols else [] self.candle_interval_ms = config.candle_interval_ms self.active_candles = {} # Process management configuration self.throttle_mode = config.throttle_mode self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds self.max_symbols_throttled = config.max_symbols_throttled if self.throttle_mode: self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration") self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches") self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max") self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Start the actor - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting") # Use Nautilus ActorExecutor for task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") future = self._executor.submit(self.on_start_async()) self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") asyncio.create_task(self.on_start_async()) def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization") try: # PRESERVED: Original symbol discovery algorithm if not self.symbols: await self._discover_all_symbols() # PRESERVED: Original stats fetching and candle reconstruction stats, candle_opens = await self._fetch_stats_and_reconstruct_candles() # Set active candles from reconstruction results self.active_candles = candle_opens # Publish results using msgbus self._publish_discovery_results() self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}") # Don't re-raise, let system continue async def _discover_all_symbols(self): """PRESERVED: Original Binance symbol discovery algorithm""" self.log.info("Starting dynamic symbol discovery from Binance...") url = "https://api.binance.com/api/v3/exchangeInfo" async with httpx.AsyncClient() as client: self.log.info("Fetching exchange info from Binance API...") response = await client.get(url, timeout=10) if response.status_code == 200: self.log.info("Successfully received exchange info") data = response.json() # Get all trading symbols (USDT pairs for example) full_symbols = [ s['symbol'] for s in data['symbols'] if s['status'] == 'TRADING' and s['symbol'].endswith('USDT') ] # Apply throttle mode symbol limiting if self.throttle_mode: self.symbols = full_symbols[:self.max_symbols_throttled] self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)") else: self.symbols = full_symbols self.log.info(f"Discovered {len(self.symbols)} trading symbols") self.log.info(f"First 10 symbols: {self.symbols[:10]}") else: self.log.error(f"Failed to fetch exchange info: {response.status_code}") raise Exception(f"Failed to fetch exchange info: {response.status_code}") async def _fetch_stats_and_reconstruct_candles(self): url = "https://api.binance.com/api/v3/ticker/24hr" klines_url = "https://api.binance.com/api/v3/klines" ticker_url = "https://api.binance.com/api/v3/ticker/price" # PRESERVED: Original rate limit handling with throttle mode support base_rate_limit = 2.5 rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit if self.throttle_mode: self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)") stats = {} candle_opens = {} async with httpx.AsyncClient() as client: total_batches = (len(self.symbols) + 49) // 50 for i in range(0, len(self.symbols), 50): batch_num = (i // 50) + 1 start_time = time.time() symbols_batch = self.symbols[i:i + 50] if self.throttle_mode: self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay") # PRESERVED: Original boundary detection logic current_time = int(time.time() * 1000) time_into_candle = current_time % self.candle_interval_ms candle_open_time = current_time - time_into_candle at_boundary = (time_into_candle < 1000) if i == 0: # Log initial state self.log.info("DOLPHIN: Current candle analysis:") self.log.info(f" - Current time: {current_time}") self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)") self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)") self.log.info(f" - At boundary: {at_boundary}") self.log.info(f" - Candle open time: {candle_open_time}") symbols_json_string = json.dumps(symbols_batch, separators=(',', ':')) params = {"symbols": symbols_json_string} try: # PRESERVED: Original stats fetching response = await client.get(url, params=params, timeout=10) if response.status_code == 200: data = response.json() for item in data: symbol = item['symbol'] stats[symbol] = { 'count': int(item['count']), 'quoteVolume': float(item['quoteVolume']), } self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols") else: self.log.error(f"Error {response.status_code}: {response.text}") # PRESERVED: Original DOLPHIN candle reconstruction strategy if at_boundary: # AT BOUNDARY: Fetch current prices to use as opens self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})") ticker_params = {"symbols": symbols_json_string} ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5) if ticker_response.status_code == 200: ticker_data = ticker_response.json() for item in ticker_data: symbol = item['symbol'] current_price = float(item['price']) candle_opens[symbol] = current_price self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols") else: self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})") else: # NOT AT BOUNDARY: Fetch historical 1s kline data self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})") for symbol in symbols_batch: try: # PRESERVED: Original kline fetching logic kline_params = { 'symbol': symbol, 'interval': '1s', 'startTime': candle_open_time, 'endTime': candle_open_time + 1000, 'limit': 1 } kline_response = await client.get(klines_url, params=kline_params, timeout=5) if kline_response.status_code == 200: kline_data = kline_response.json() if kline_data: open_price = float(kline_data[0][1]) candle_opens[symbol] = open_price if (i + len(symbols_batch)) <= 10: self.log.info(f" - {symbol}: reconstructed open = {open_price}") else: self.log.warning(f" - {symbol}: no 1s kline data found") else: self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})") except Exception as e: self.log.error(f" - {symbol}: kline fetch error: {e}") await asyncio.sleep(0.1) except Exception as e: self.log.error(f"Request failed: {str(e)}") # PRESERVED: Original rate limiting elapsed = time.time() - start_time if i + 50 < len(self.symbols): sleep_time = max(0, rate_limit_interval - elapsed) if sleep_time > 0: await asyncio.sleep(sleep_time) self.log.info(f"DOLPHIN: Candle reconstruction complete:") self.log.info(f" - Symbols processed: {len(self.symbols)}") self.log.info(f" - Opens reconstructed: {len(candle_opens)}") self.log.info("Discovery phase ready for publishing...") return stats, candle_opens def _publish_discovery_results(self): """Publish results using msgbus - Nautilus message bus integration""" try: self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...") if hasattr(self, 'msgbus') and self.msgbus: # Publish symbols and candles as tuples self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns()))) self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns()))) self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles") self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing") else: self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}") class SILOQYMainActor(Actor): def __init__(self, config: SILOQYMainActorConfig): super().__init__(config) # Preserve original configuration self.candle_interval_ms = config.candle_interval_ms self.throttle_mode = config.throttle_mode self.connections = {} self.connection_tasks = {} # Will be populated from discovery actor self.symbols = [] self.active_candles = {} # WebSocket tasks - managed by Nautilus ActorExecutor self.ws_tasks = [] # Synchronization self.discovery_complete = asyncio.Event() if self.throttle_mode: self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Subscribe to discovery events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events") # Subscribe to discovery results if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial) self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics") # Use Nautilus ActorExecutor for task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") future = self._executor.submit(self.on_start_async()) self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") asyncio.create_task(self.on_start_async()) def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: try: # Wait for symbol discovery to complete self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) # PRESERVED: Start WebSocket connections await self._start_websocket_connections() self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") except asyncio.TimeoutError: self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery") except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}") def handle_symbols_discovered(self, data): """Handle symbols from discovery actor""" try: symbols, timestamp = data self.symbols = symbols self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor") if self.symbols and self.active_candles: self.discovery_complete.set() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") # FIXED: SILOQYMainActor.handle_candles_initial def handle_candles_initial(self, data): """Handle initial candles from discovery actor and properly initialize the candle dict.""" try: candles_received, timestamp = data self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor") # FIX: Re-initialize active_candles with the full dictionary structure # STEP 5 FIX: Added rounding to 8 decimals for all price values self.active_candles = {} for symbol, open_price in candles_received.items(): self.active_candles[symbol] = { 'open_price': round(open_price, 8), 'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms, 'high': round(open_price, 8), 'low': round(open_price, 8), 'close': round(open_price, 8), 'volume': 0.0, } if self.symbols and self.active_candles: self.discovery_complete.set() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") async def _start_websocket_connections(self): """PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor""" self.log.info("Starting WebSocket simulation for tick generation...") self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") # Use Nautilus ActorExecutor for WebSocket task management if hasattr(self, '_executor') and self._executor: self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation") future = self._executor.submit(self._simulate_websocket_ticks()) self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted") else: self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") task = asyncio.create_task(self._simulate_websocket_ticks()) self.ws_tasks.append(task) async def _simulate_websocket_ticks(self): # Adjust tick rate for throttle mode tick_delay = 0.1 if self.throttle_mode else 0.01 symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) tick_count = 0 try: if self.throttle_mode: self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second") else: self.log.info("WebSocket tick simulation started - generating 100 ticks/second") while True: for symbol in self.symbols[:symbols_to_simulate]: # STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals price = round(100.0 + random.gauss(0, 0.5), 8) quantity = round(random.uniform(0.00000001, 100.0), 8) timestamp = int(time.time() * 1000) self.on_websocket_tick(symbol, price, quantity, timestamp) tick_count += 1 # Log every 500 ticks in throttle mode, 1000 in normal mode log_interval = 500 if self.throttle_mode else 1000 if tick_count % log_interval == 0: mode_indicator = "THROTTLE" if self.throttle_mode else "" self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}") await asyncio.sleep(tick_delay) except asyncio.CancelledError: self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks") except Exception as e: self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}") def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int): """ PRESERVED: Original tick processing and candle update logic """ # PRESERVED: Original candle update logic if symbol not in self.active_candles: candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms # STEP 4 FIX: Round all prices to 8 decimals self.active_candles[symbol] = { 'open_price': round(price, 8), 'open_time': candle_open_time, 'high': round(price, 8), 'low': round(price, 8), 'close': round(price, 8), 'volume': 0.0 } candle = self.active_candles[symbol] # PRESERVED: Original candle rolling logic current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms if current_candle_time > candle['open_time']: # STEP 4 FIX: Round all prices to 8 decimals candle['open_price'] = round(price, 8) candle['open_time'] = current_candle_time candle['high'] = round(price, 8) candle['low'] = round(price, 8) candle['close'] = round(price, 8) candle['volume'] = quantity else: candle['close'] = round(price, 8) candle['high'] = round(max(candle['high'], price), 8) candle['low'] = round(min(candle['low'], price), 8) candle['volume'] += quantity # PRESERVED: Original high-performance tuple publishing try: if hasattr(self, 'msgbus') and self.msgbus: # STEP 3 FIX: Round all float values to 8 decimals in the tuple tick_tuple = ( symbol, round(float(price), 8), round(float(quantity), 8), int(timestamp), round(float(candle['open_price']), 8), int(candle['open_time']) ) self.msgbus.publish(RAW_TOPIC, tick_tuple) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") class DOLPHINRegimeActor(Actor): def __init__(self, config: DOLPHINRegimeActorConfig): super().__init__(config) # PRESERVED: All original DOLPHIN configuration self.max_symbols = config.max_symbols self.ticks_per_analysis = config.ticks_per_analysis # PRESERVED: All original pre-allocated arrays for zero allocation self.open_prices = np.zeros(self.max_symbols, dtype=np.float64) self.close_prices = np.zeros(self.max_symbols, dtype=np.float64) self.high_prices = np.zeros(self.max_symbols, dtype=np.float64) self.low_prices = np.zeros(self.max_symbols, dtype=np.float64) self.volumes = np.zeros(self.max_symbols, dtype=np.float64) self.last_update = np.zeros(self.max_symbols, dtype=np.int64) # PRESERVED: All original mapping and state self.symbol_to_idx = {} self.idx_to_symbol = {} self.active_symbols = 0 self.tick_count = 0 # PRESERVED: All original DOLPHIN thresholds - EXACT self.bull_threshold = 0.60 # 60% bullish self.bear_threshold = 0.55 # 55% bearish self.previous_bull_ratio = None self.regime_history = deque(maxlen=100) # Metrics self.last_metric_log = time.time_ns() self.processed_ticks = 0 self.regime_calculations = 0 self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " f"ticks_per_analysis: {self.ticks_per_analysis}") def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed def handle_raw_tick(self, data): """ PRESERVED EXACTLY: All original zero-allocation tick processing Using Nautilus ActorExecutor for regime detection tasks """ try: symbol, price, size, ts_event, open_price, candle_open_time = data except Exception as e: self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") return # PRESERVED EXACTLY: All original array processing logic if symbol not in self.symbol_to_idx: if self.active_symbols >= self.max_symbols: self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded") return idx = self.active_symbols self.symbol_to_idx[symbol] = idx self.idx_to_symbol[idx] = symbol self.active_symbols += 1 # Initialize arrays self.open_prices[idx] = open_price self.high_prices[idx] = price self.low_prices[idx] = price self.close_prices[idx] = price self.volumes[idx] = 0.0 else: idx = self.symbol_to_idx[symbol] # Check if new candle period if candle_open_time > self.last_update[idx]: # Reset for new candle self.open_prices[idx] = open_price self.high_prices[idx] = price self.low_prices[idx] = price self.close_prices[idx] = price self.volumes[idx] = size self.last_update[idx] = candle_open_time else: # Update existing candle data self.close_prices[idx] = price self.high_prices[idx] = max(self.high_prices[idx], price) self.low_prices[idx] = min(self.low_prices[idx], price) self.volumes[idx] += size self.tick_count += 1 self.processed_ticks += 1 # PRESERVED: Original trigger logic if self.tick_count >= self.ticks_per_analysis: # Use Nautilus ActorExecutor for regime detection if hasattr(self, '_executor') and self._executor: future = self._executor.submit(self._run_regime_detection_async()) else: # Fallback to sync detection self._run_regime_detection() self.tick_count = 0 # Periodic metrics logging now = time.time_ns() if now - self.last_metric_log > 1_000_000_000: self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, " f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}") self.last_metric_log = now async def _run_regime_detection_async(self): try: self._run_regime_detection() except Exception as e: self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") def _run_regime_detection(self): self.regime_calculations += 1 total_symbols = self.active_symbols if total_symbols == 0: return analyzed = 0 bullish = 0 bearish = 0 # PRESERVED: Original analysis with exact thresholds for idx in range(self.active_symbols): open_price = self.open_prices[idx] close_price = self.close_prices[idx] if open_price == 0: continue analyzed += 1 # PRESERVED: EXACT DOLPHIN thresholds change = (close_price - open_price) / open_price if change >= 0.0015: # 0.15% threshold for bullish bullish += 1 elif change <= -0.0015: # -0.15% threshold for bearish bearish += 1 if analyzed == 0: return # PRESERVED: Original ratio calculations bull_ratio = bullish / analyzed bear_ratio = bearish / analyzed sideways_ratio = 1.0 - bull_ratio - bear_ratio # MODIFIED: Original DOLPHIN Algorithm - Simple 3-regime detection if bull_ratio >= self.bull_threshold: # 60% bullish regime = MarketRegime.BULL elif bear_ratio >= self.bear_threshold: # 55% bearish regime = MarketRegime.BEAR else: # Everything else is sideways regime = MarketRegime.SIDEWAYS # PRESERVED: Original confidence calculation confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols) self.previous_bull_ratio = bull_ratio # Publish regime result using Nautilus message bus try: if hasattr(self, 'msgbus') and self.msgbus: regime_tuple = ( int(time.time() * 1000), regime.value, float(bull_ratio), float(bear_ratio), float(sideways_ratio), int(analyzed), int(total_symbols), float(confidence) ) self.msgbus.publish(REGIME_TOPIC, regime_tuple) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}") # Log regime changes if not self.regime_history or regime != self.regime_history[-1]: self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} " f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | " f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}") self.regime_history.append(regime) # Periodic regime status (even without changes) if self.regime_calculations % 10 == 0: # Every 10 calculations self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} " f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks") def _calculate_confidence(self, bull_ratio: float, bear_ratio: float, analyzed: int, total: int) -> float: """PRESERVED EXACTLY: Original DOLPHIN confidence calculation""" if analyzed == 0: return 0.0 # Market Decisiveness max_ratio = max(bull_ratio, bear_ratio) decisiveness = abs(max_ratio - 0.5) * 2 # Sample Coverage coverage = analyzed / total # Statistical Significance if max_ratio > 0 and max_ratio < 1: standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed) else: standard_error = 0.0 z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001) statistical_confidence = min(z_score / 3.0, 1.0) # Market Clarity market_clarity = bull_ratio + bear_ratio # Weighted combination confidence = ( decisiveness * 0.40 + coverage * 0.10 + statistical_confidence * 0.30 + market_clarity * 0.20 ) return max(0.0, min(confidence, 1.0)) class SILOQYNormalizerActor(Actor): def __init__(self, config: SILOQYNormalizerConfig): super().__init__(config) self.normalized_count = 0 self.last_metric_log = time.time_ns() self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed def handle_raw_tick(self, data): try: symbol, price, size, ts_event, open_price, candle_open_time = data tick = SiloqyTradeTick( instrument_id=symbol, price=round(float(price), 8), size=round(float(size), 8), ts_event=int(ts_event), ts_init=int(time.time_ns()), open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8), candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000), tick_side=None, # Default to None unless Binance provides this info exchange="BINANCE", # We know this is from Binance liquidity_flag=None # Default to None unless Binance provides this info ) if hasattr(self, 'msgbus') and self.msgbus: try: # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior self.msgbus.publish(STRUCTURED_TOPIC, tick) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}") # FALLBACK: Try Nautilus publish_data if msgbus not available elif hasattr(self, 'publish_data'): try: # Use standard Nautilus publishing as fallback self.publish_data(DataType(SiloqyTradeTick), tick) except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}") self.normalized_count += 1 # Periodic metrics now = time.time_ns() if now - self.last_metric_log > 1_000_000_000: self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks") self.last_metric_log = now except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}") def test_siloqy_actors_with_nautilus_process_management(): """Test SILOQY actors with Nautilus built-in ActorExecutor process management""" print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management") # Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING symbol_discovery_config = ImportableActorConfig( actor_path="__main__:SILOQYSymbolDiscoveryActor", config_path="__main__:SILOQYSymbolDiscoveryConfig", config={ "component_id": "SILOQY-SYMBOL-DISCOVERY", "symbols": [], # Empty for dynamic discovery "candle_interval_ms": 15 * 60 * 1000, "throttle_mode": True, # ENABLED: Safe for dual instance testing "throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s) "max_symbols_throttled": 100 # Only 100 symbols (vs 2000+) } ) main_actor_config = ImportableActorConfig( actor_path="__main__:SILOQYMainActor", config_path="__main__:SILOQYMainActorConfig", config={ "component_id": "SILOQY-MAIN-ACTOR", "candle_interval_ms": 15 * 60 * 1000, "throttle_mode": True # ENABLED: Reduced tick generation } ) regime_actor_config = ImportableActorConfig( actor_path="__main__:DOLPHINRegimeActor", config_path="__main__:DOLPHINRegimeActorConfig", config={ "component_id": "DOLPHIN-REGIME-ACTOR", "max_symbols": 5000, "ticks_per_analysis": 500 # Reduced for throttle mode testing } ) normalizer_config = ImportableActorConfig( actor_path="__main__:SILOQYNormalizerActor", config_path="__main__:SILOQYNormalizerConfig", config={ "component_id": "SILOQY-NORMALIZER" } ) # Trading node configuration - Uses Nautilus built-in process management trading_config = TradingNodeConfig( trader_id=TraderId("SILOQY-TRADER-001"), actors=[ symbol_discovery_config, main_actor_config, regime_actor_config, normalizer_config ], data_clients={}, # No external data clients needed exec_clients={} # No execution clients for this test ) node = TradingNode(config=trading_config) try: node.build() print("Node built successfully with Nautilus built-in process management") node.run() except KeyboardInterrupt: print("\nSILOQY Actor test with Nautilus process management completed!") except Exception as e: print(f"Error: {e}") import traceback traceback.print_exc() finally: try: node.dispose() print("Nautilus process management: Node disposed successfully") except: pass if __name__ == "__main__": test_siloqy_actors_with_nautilus_process_management()