# commit 96735125f3d588401b514330d23970d0e4f9f1e9
# Author: HJ Normey
# Date:   Sat Aug 30 20:00:45 2025 +0200
#
#     Working starting point: Nautilus integration (throttled)
#
# nautilus_actor_test_implementation_5x.py
#
# SILOQY multi-actor pipeline on the Nautilus Trader runtime:
#   discovery -> tick generation -> DOLPHIN regime detection -> normalization.
# All inter-actor communication uses the Nautilus message bus with raw tuples
# on string topics for the hot path.

import time
import numpy as np
import asyncio
import json
import math
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from collections import deque
import httpx
import random  # Added for _simulate_websocket_ticks

# Nautilus imports - following test pattern
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
from nautilus_trader.live.node import TradingNode
from nautilus_trader.common.actor import Actor
from nautilus_trader.common.config import ActorConfig
from nautilus_trader.model.identifiers import TraderId
# FIX: DataType was referenced in SILOQYNormalizerActor's fallback publish path
# but never imported, causing a NameError whenever that branch executed.
from nautilus_trader.core.data import Data, DataType
from nautilus_trader.common.component import Logger, init_logging

# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.identifiers import InstrumentId, TradeId
from nautilus_trader.model.objects import Price, Quantity
from nautilus_trader.model.enums import AggressorSide

# Initialize logging
_log_guard = init_logging()
_logger = Logger("SILOQY")

# Topics - HIGH PERFORMANCE STRING TOPICS
RAW_TOPIC = "SILOQY.RAW.TICKS"
STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"

# Rate limiting constant
MIN_INTERVAL = 2.5  # seconds between API batches


# Market Regime Enum
class MarketRegime(Enum):
    """Closed set of market regimes emitted by DOLPHINRegimeActor."""
    BULL = "BULL"
    BEAR = "BEAR"
    TRANSITION = "TRANSITION"
    SIDEWAYS = "SIDEWAYS"


# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
# --------------------------------------------------------------------
class SiloqyTradeTick(TradeTick):
    """TradeTick subclass carrying extra SILOQY candle/venue metadata.

    Accepts a bare symbol string (venue BINANCE is appended when missing)
    or a ready InstrumentId, and plain floats for price/size (converted to
    Price/Quantity with 8-decimal precision).
    """

    def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None,
                 open_price: float = None, candle_open_time: int = None, tick_side: str = None,
                 exchange: str = None, liquidity_flag=None):
        # Convert to proper Nautilus types - add venue to symbol
        if isinstance(instrument_id, str):
            # Add BINANCE venue if not already present
            if '.' not in instrument_id:
                instrument_id = f"{instrument_id}.BINANCE"
            instr_id = InstrumentId.from_str(instrument_id)
        else:
            instr_id = instrument_id
        # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size
        price_obj = Price(price, precision=8) if not isinstance(price, Price) else price
        size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size

        # Default aggressor side and trade ID
        aggressor_side = AggressorSide.NO_AGGRESSOR  # Default since Binance doesn't always provide this
        trade_id = TradeId(f"T{int(time.time_ns())}")  # Generate a trade ID

        super().__init__(
            instrument_id=instr_id,
            price=price_obj,
            size=size_obj,
            aggressor_side=aggressor_side,
            trade_id=trade_id,
            ts_event=int(ts_event),
            ts_init=int(ts_init if ts_init is not None else time.time_ns())
        )

        # Additional SILOQY fields
        self.open_price = open_price if open_price is not None else price
        self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000)
        self.tick_side = tick_side
        self.exchange = exchange
        self.liquidity_flag = liquidity_flag


class SILOQYSymbolDiscoveryConfig(ActorConfig):
    """Config for SILOQYSymbolDiscoveryActor."""
    symbols: List[str] = []
    candle_interval_ms: int = 15 * 60 * 1000
    throttle_mode: bool = False
    throttle_rate_limit_seconds: float = 10.0
    max_symbols_throttled: int = 100


class SILOQYMainActorConfig(ActorConfig):
    """Config for SILOQYMainActor."""
    candle_interval_ms: int = 15 * 60 * 1000
    throttle_mode: bool = False


class DOLPHINRegimeActorConfig(ActorConfig):
    """Config for DOLPHINRegimeActor."""
    max_symbols: int = 5000
    ticks_per_analysis: int = 1000


class SILOQYNormalizerConfig(ActorConfig):
    """Config for SILOQYNormalizerActor (no options)."""
    pass


class SILOQYSymbolDiscoveryActor(Actor):
    """
    Symbol discovery with all original SILOQY algorithms preserved
    Using Nautilus built-in ActorExecutor for task management
    """

    def __init__(self, config: SILOQYSymbolDiscoveryConfig):
        super().__init__(config)

        # Preserve original SILOQY configuration
        self.symbols = list(config.symbols) if config.symbols else []
        self.candle_interval_ms = config.candle_interval_ms
        self.active_candles = {}

        # Process management configuration
        self.throttle_mode = config.throttle_mode
        self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds
        self.max_symbols_throttled = config.max_symbols_throttled

        if self.throttle_mode:
            self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration")
            self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches")
            self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max")

        self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Start the actor - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting")

        # Use Nautilus ActorExecutor for task management
        # NOTE(review): submit() is handed a coroutine object here - assumes the
        # registered ActorExecutor accepts coroutines; confirm against the
        # Nautilus ActorExecutor API.
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
            future = self._executor.submit(self.on_start_async())
            self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            asyncio.create_task(self.on_start_async())

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    async def on_start_async(self) -> None:
        """Run discovery + candle reconstruction, then publish results."""
        self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization")

        try:
            # PRESERVED: Original symbol discovery algorithm
            if not self.symbols:
                await self._discover_all_symbols()

            # PRESERVED: Original stats fetching and candle reconstruction
            stats, candle_opens = await self._fetch_stats_and_reconstruct_candles()

            # Set active candles from reconstruction results
            self.active_candles = candle_opens

            # Publish results using msgbus
            self._publish_discovery_results()

            self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully")

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
            # Don't re-raise, let system continue

    async def _discover_all_symbols(self):
        """PRESERVED: Original Binance symbol discovery algorithm"""
        self.log.info("Starting dynamic symbol discovery from Binance...")
        url = "https://api.binance.com/api/v3/exchangeInfo"

        async with httpx.AsyncClient() as client:
            self.log.info("Fetching exchange info from Binance API...")
            response = await client.get(url, timeout=10)
            if response.status_code == 200:
                self.log.info("Successfully received exchange info")
                data = response.json()
                # Get all trading symbols (USDT pairs for example)
                full_symbols = [
                    s['symbol'] for s in data['symbols']
                    if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
                ]

                # Apply throttle mode symbol limiting
                if self.throttle_mode:
                    self.symbols = full_symbols[:self.max_symbols_throttled]
                    self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
                else:
                    self.symbols = full_symbols

                self.log.info(f"Discovered {len(self.symbols)} trading symbols")
                self.log.info(f"First 10 symbols: {self.symbols[:10]}")
            else:
                self.log.error(f"Failed to fetch exchange info: {response.status_code}")
                raise Exception(f"Failed to fetch exchange info: {response.status_code}")

    async def _fetch_stats_and_reconstruct_candles(self):
        """Fetch 24h stats in batches of 50 and reconstruct current candle opens.

        Returns (stats, candle_opens): per-symbol 24h stats, and per-symbol
        open price for the candle currently in progress.
        """
        url = "https://api.binance.com/api/v3/ticker/24hr"
        klines_url = "https://api.binance.com/api/v3/klines"
        ticker_url = "https://api.binance.com/api/v3/ticker/price"

        # PRESERVED: Original rate limit handling with throttle mode support
        # FIX: use the module-level MIN_INTERVAL constant instead of a duplicated
        # hardcoded 2.5 literal (same value, single source of truth).
        base_rate_limit = MIN_INTERVAL
        rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit

        if self.throttle_mode:
            self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)")

        stats = {}
        candle_opens = {}

        async with httpx.AsyncClient() as client:
            total_batches = (len(self.symbols) + 49) // 50
            for i in range(0, len(self.symbols), 50):
                batch_num = (i // 50) + 1
                start_time = time.time()
                symbols_batch = self.symbols[i:i + 50]

                if self.throttle_mode:
                    self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay")

                # PRESERVED: Original boundary detection logic
                current_time = int(time.time() * 1000)
                time_into_candle = current_time % self.candle_interval_ms
                candle_open_time = current_time - time_into_candle
                at_boundary = (time_into_candle < 1000)

                if i == 0:  # Log initial state
                    self.log.info("DOLPHIN: Current candle analysis:")
                    self.log.info(f"   - Current time: {current_time}")
                    self.log.info(f"   - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)")
                    self.log.info(f"   - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)")
                    self.log.info(f"   - At boundary: {at_boundary}")
                    self.log.info(f"   - Candle open time: {candle_open_time}")

                symbols_json_string = json.dumps(symbols_batch, separators=(',', ':'))
                params = {"symbols": symbols_json_string}

                try:
                    # PRESERVED: Original stats fetching
                    response = await client.get(url, params=params, timeout=10)

                    if response.status_code == 200:
                        data = response.json()
                        for item in data:
                            symbol = item['symbol']
                            stats[symbol] = {
                                'count': int(item['count']),
                                'quoteVolume': float(item['quoteVolume']),
                            }
                        self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols")

                    else:
                        self.log.error(f"Error {response.status_code}: {response.text}")

                    # PRESERVED: Original DOLPHIN candle reconstruction strategy
                    if at_boundary:
                        # AT BOUNDARY: Fetch current prices to use as opens
                        self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})")

                        ticker_params = {"symbols": symbols_json_string}
                        ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5)

                        if ticker_response.status_code == 200:
                            ticker_data = ticker_response.json()
                            for item in ticker_data:
                                symbol = item['symbol']
                                current_price = float(item['price'])
                                candle_opens[symbol] = current_price

                            self.log.info(f"   - Fetched current prices for {len(ticker_data)} symbols")
                        else:
                            self.log.warning(f"   - Current price fetch failed ({ticker_response.status_code})")

                    else:
                        # NOT AT BOUNDARY: Fetch historical 1s kline data
                        self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})")

                        for symbol in symbols_batch:
                            try:
                                # PRESERVED: Original kline fetching logic
                                kline_params = {
                                    'symbol': symbol,
                                    'interval': '1s',
                                    'startTime': candle_open_time,
                                    'endTime': candle_open_time + 1000,
                                    'limit': 1
                                }

                                kline_response = await client.get(klines_url, params=kline_params, timeout=5)

                                if kline_response.status_code == 200:
                                    kline_data = kline_response.json()
                                    if kline_data:
                                        open_price = float(kline_data[0][1])
                                        candle_opens[symbol] = open_price

                                        if (i + len(symbols_batch)) <= 10:
                                            self.log.info(f"   - {symbol}: reconstructed open = {open_price}")
                                    else:
                                        self.log.warning(f"   - {symbol}: no 1s kline data found")
                                else:
                                    self.log.warning(f"   - {symbol}: kline fetch failed ({kline_response.status_code})")

                            except Exception as e:
                                self.log.error(f"   - {symbol}: kline fetch error: {e}")

                            # Small per-symbol pause to avoid hammering the klines endpoint
                            await asyncio.sleep(0.1)

                except Exception as e:
                    self.log.error(f"Request failed: {str(e)}")

                # PRESERVED: Original rate limiting
                elapsed = time.time() - start_time
                if i + 50 < len(self.symbols):
                    sleep_time = max(0, rate_limit_interval - elapsed)
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)

        self.log.info(f"DOLPHIN: Candle reconstruction complete:")
        self.log.info(f"   - Symbols processed: {len(self.symbols)}")
        self.log.info(f"   - Opens reconstructed: {len(candle_opens)}")
        self.log.info("Discovery phase ready for publishing...")

        return stats, candle_opens

    def _publish_discovery_results(self):
        """Publish results using msgbus - Nautilus message bus integration"""
        try:
            self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...")
            if hasattr(self, 'msgbus') and self.msgbus:
                # Publish symbols and candles as tuples
                self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
                self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))

                self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
                self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
            else:
                self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}")


class SILOQYMainActor(Actor):
    """Waits for discovery results, then generates (simulated) ticks and
    publishes raw tick tuples on RAW_TOPIC while maintaining live candles."""

    def __init__(self, config: SILOQYMainActorConfig):
        super().__init__(config)

        # Preserve original configuration
        self.candle_interval_ms = config.candle_interval_ms
        self.throttle_mode = config.throttle_mode
        self.connections = {}
        self.connection_tasks = {}

        # Will be populated from discovery actor
        self.symbols = []
        self.active_candles = {}

        # WebSocket tasks - managed by Nautilus ActorExecutor
        self.ws_tasks = []

        # Synchronization
        self.discovery_complete = asyncio.Event()

        if self.throttle_mode:
            self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")

        self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Subscribe to discovery events - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events")

        # Subscribe to discovery results
        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
            self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial)
            self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics")

        # Use Nautilus ActorExecutor for task management
        # NOTE(review): submit() receives a coroutine object - assumes the
        # registered ActorExecutor accepts coroutines; confirm against API.
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
            future = self._executor.submit(self.on_start_async())
            self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            asyncio.create_task(self.on_start_async())

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    async def on_start_async(self) -> None:
        """Block until discovery publishes symbols+candles, then start ticks."""
        try:
            # Wait for symbol discovery to complete
            self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
            await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)

            # PRESERVED: Start WebSocket connections
            await self._start_websocket_connections()

            self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")

        except asyncio.TimeoutError:
            self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}")

    def handle_symbols_discovered(self, data):
        """Handle symbols from discovery actor"""
        try:
            symbols, timestamp = data
            self.symbols = symbols
            self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor")

            # Fire only when both topics have arrived (order is not guaranteed)
            if self.symbols and self.active_candles:
                self.discovery_complete.set()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")

    # FIXED: SILOQYMainActor.handle_candles_initial
    def handle_candles_initial(self, data):
        """Handle initial candles from discovery actor and properly initialize the candle dict."""
        try:
            candles_received, timestamp = data
            self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor")

            # FIX: Re-initialize active_candles with the full dictionary structure
            # STEP 5 FIX: Added rounding to 8 decimals for all price values
            self.active_candles = {}
            for symbol, open_price in candles_received.items():
                self.active_candles[symbol] = {
                    'open_price': round(open_price, 8),
                    'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms,
                    'high': round(open_price, 8),
                    'low': round(open_price, 8),
                    'close': round(open_price, 8),
                    'volume': 0.0,
                }

            if self.symbols and self.active_candles:
                self.discovery_complete.set()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")

    async def _start_websocket_connections(self):
        """PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
        self.log.info("Starting WebSocket simulation for tick generation...")
        self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")

        # Use Nautilus ActorExecutor for WebSocket task management
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation")
            future = self._executor.submit(self._simulate_websocket_ticks())
            self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            task = asyncio.create_task(self._simulate_websocket_ticks())
            self.ws_tasks.append(task)

    async def _simulate_websocket_ticks(self):
        """Generate synthetic ticks forever (until cancelled)."""
        # Adjust tick rate for throttle mode
        tick_delay = 0.1 if self.throttle_mode else 0.01
        symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))

        tick_count = 0
        try:
            if self.throttle_mode:
                self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second")
            else:
                self.log.info("WebSocket tick simulation started - generating 100 ticks/second")

            while True:
                for symbol in self.symbols[:symbols_to_simulate]:
                    # STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals
                    price = round(100.0 + random.gauss(0, 0.5), 8)
                    quantity = round(random.uniform(0.00000001, 100.0), 8)
                    timestamp = int(time.time() * 1000)

                    self.on_websocket_tick(symbol, price, quantity, timestamp)
                    tick_count += 1

                    # Log every 500 ticks in throttle mode, 1000 in normal mode
                    log_interval = 500 if self.throttle_mode else 1000
                    if tick_count % log_interval == 0:
                        mode_indicator = "THROTTLE" if self.throttle_mode else ""
                        self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")

                await asyncio.sleep(tick_delay)
        except asyncio.CancelledError:
            self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")

    def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
        """
        PRESERVED: Original tick processing and candle update logic
        """
        # PRESERVED: Original candle update logic
        if symbol not in self.active_candles:
            candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
            # STEP 4 FIX: Round all prices to 8 decimals
            self.active_candles[symbol] = {
                'open_price': round(price, 8),
                'open_time': candle_open_time,
                'high': round(price, 8),
                'low': round(price, 8),
                'close': round(price, 8),
                'volume': 0.0
            }

        candle = self.active_candles[symbol]

        # PRESERVED: Original candle rolling logic
        current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
        if current_candle_time > candle['open_time']:
            # STEP 4 FIX: Round all prices to 8 decimals
            candle['open_price'] = round(price, 8)
            candle['open_time'] = current_candle_time
            candle['high'] = round(price, 8)
            candle['low'] = round(price, 8)
            candle['close'] = round(price, 8)
            candle['volume'] = quantity
        else:
            candle['close'] = round(price, 8)
            candle['high'] = round(max(candle['high'], price), 8)
            candle['low'] = round(min(candle['low'], price), 8)
            candle['volume'] += quantity

        # PRESERVED: Original high-performance tuple publishing
        try:
            if hasattr(self, 'msgbus') and self.msgbus:
                # STEP 3 FIX: Round all float values to 8 decimals in the tuple
                tick_tuple = (
                    symbol,
                    round(float(price), 8),
                    round(float(quantity), 8),
                    int(timestamp),
                    round(float(candle['open_price']), 8),
                    int(candle['open_time'])
                )

                self.msgbus.publish(RAW_TOPIC, tick_tuple)

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")


class DOLPHINRegimeActor(Actor):
    """Zero-allocation regime detector: consumes raw tick tuples, maintains
    per-symbol OHLCV in pre-allocated numpy arrays, and periodically publishes
    a regime classification tuple on REGIME_TOPIC."""

    def __init__(self, config: DOLPHINRegimeActorConfig):
        super().__init__(config)

        # PRESERVED: All original DOLPHIN configuration
        self.max_symbols = config.max_symbols
        self.ticks_per_analysis = config.ticks_per_analysis

        # PRESERVED: All original pre-allocated arrays for zero allocation
        self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
        self.last_update = np.zeros(self.max_symbols, dtype=np.int64)

        # PRESERVED: All original mapping and state
        self.symbol_to_idx = {}
        self.idx_to_symbol = {}
        self.active_symbols = 0

        self.tick_count = 0

        # PRESERVED: All original DOLPHIN thresholds - EXACT
        self.bull_threshold = 0.60  # 60% bullish
        self.bear_threshold = 0.55  # 55% bearish

        self.previous_bull_ratio = None
        self.regime_history = deque(maxlen=100)

        # Metrics
        self.last_metric_log = time.time_ns()
        self.processed_ticks = 0
        self.regime_calculations = 0

        self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
                      f"ticks_per_analysis: {self.ticks_per_analysis}")

    def on_start(self) -> None:
        """Subscribe to tick events - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")

        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    def handle_raw_tick(self, data):
        """
        PRESERVED EXACTLY: All original zero-allocation tick processing
        Using Nautilus ActorExecutor for regime detection tasks
        """
        try:
            symbol, price, size, ts_event, open_price, candle_open_time = data

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
            return

        # PRESERVED EXACTLY: All original array processing logic
        if symbol not in self.symbol_to_idx:
            if self.active_symbols >= self.max_symbols:
                self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
                return

            idx = self.active_symbols
            self.symbol_to_idx[symbol] = idx
            self.idx_to_symbol[idx] = symbol
            self.active_symbols += 1

            # Initialize arrays
            # FIX: record this tick's size and candle time on first sight.
            # Previously volumes started at 0.0 and last_update at 0, so the
            # next tick always triggered a spurious "new candle" reset and the
            # first tick's size was silently dropped.
            self.open_prices[idx] = open_price
            self.high_prices[idx] = price
            self.low_prices[idx] = price
            self.close_prices[idx] = price
            self.volumes[idx] = size
            self.last_update[idx] = candle_open_time
        else:
            idx = self.symbol_to_idx[symbol]

            # Check if new candle period
            if candle_open_time > self.last_update[idx]:
                # Reset for new candle
                self.open_prices[idx] = open_price
                self.high_prices[idx] = price
                self.low_prices[idx] = price
                self.close_prices[idx] = price
                self.volumes[idx] = size
                self.last_update[idx] = candle_open_time
            else:
                # Update existing candle data
                self.close_prices[idx] = price
                self.high_prices[idx] = max(self.high_prices[idx], price)
                self.low_prices[idx] = min(self.low_prices[idx], price)
                self.volumes[idx] += size

        self.tick_count += 1
        self.processed_ticks += 1

        # PRESERVED: Original trigger logic
        if self.tick_count >= self.ticks_per_analysis:
            # Use Nautilus ActorExecutor for regime detection
            # NOTE(review): coroutine object handed to submit() - see on_start.
            if hasattr(self, '_executor') and self._executor:
                future = self._executor.submit(self._run_regime_detection_async())
            else:
                # Fallback to sync detection
                self._run_regime_detection()
            self.tick_count = 0

        # Periodic metrics logging
        now = time.time_ns()
        if now - self.last_metric_log > 1_000_000_000:
            self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, "
                          f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}")
            self.last_metric_log = now

    async def _run_regime_detection_async(self):
        """Async wrapper so detection can be scheduled on the executor."""
        try:
            self._run_regime_detection()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")

    def _run_regime_detection(self):
        """Classify the market regime from per-symbol open->close change."""
        self.regime_calculations += 1

        total_symbols = self.active_symbols
        if total_symbols == 0:
            return

        analyzed = 0
        bullish = 0
        bearish = 0

        # PRESERVED: Original analysis with exact thresholds
        for idx in range(self.active_symbols):
            open_price = self.open_prices[idx]
            close_price = self.close_prices[idx]

            if open_price == 0:
                continue

            analyzed += 1

            # PRESERVED: EXACT DOLPHIN thresholds
            change = (close_price - open_price) / open_price

            if change >= 0.0015:  # 0.15% threshold for bullish
                bullish += 1
            elif change <= -0.0015:  # -0.15% threshold for bearish
                bearish += 1

        if analyzed == 0:
            return

        # PRESERVED: Original ratio calculations
        bull_ratio = bullish / analyzed
        bear_ratio = bearish / analyzed
        sideways_ratio = 1.0 - bull_ratio - bear_ratio

        # PRESERVED: Original regime determination logic
        if bull_ratio >= self.bull_threshold:  # 60% bullish
            regime = MarketRegime.BULL
        elif bear_ratio >= self.bear_threshold:  # 55% bearish
            regime = MarketRegime.BEAR
        else:
            # Check for transition
            if self.previous_bull_ratio is not None:
                ratio_change = abs(bull_ratio - self.previous_bull_ratio)
                if ratio_change >= 0.15:  # 15% change threshold
                    regime = MarketRegime.TRANSITION
                else:
                    regime = MarketRegime.SIDEWAYS
            else:
                regime = MarketRegime.SIDEWAYS

        # PRESERVED: Original confidence calculation
        confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)

        self.previous_bull_ratio = bull_ratio

        # Publish regime result using Nautilus message bus
        try:
            if hasattr(self, 'msgbus') and self.msgbus:
                regime_tuple = (
                    int(time.time() * 1000),
                    regime.value,
                    float(bull_ratio),
                    float(bear_ratio),
                    float(sideways_ratio),
                    int(analyzed),
                    int(total_symbols),
                    float(confidence)
                )

                self.msgbus.publish(REGIME_TOPIC, regime_tuple)

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")

        # Log regime changes
        if not self.regime_history or regime != self.regime_history[-1]:
            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
                          f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
                          f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
            self.regime_history.append(regime)

        # Periodic regime status (even without changes)
        if self.regime_calculations % 10 == 0:  # Every 10 calculations
            self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
                          f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")

    def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
                              analyzed: int, total: int) -> float:
        """PRESERVED EXACTLY: Original DOLPHIN confidence calculation"""
        if analyzed == 0:
            return 0.0

        # Market Decisiveness
        max_ratio = max(bull_ratio, bear_ratio)
        decisiveness = abs(max_ratio - 0.5) * 2

        # Sample Coverage
        coverage = analyzed / total

        # Statistical Significance
        if max_ratio > 0 and max_ratio < 1:
            standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed)
        else:
            standard_error = 0.0

        z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001)
        statistical_confidence = min(z_score / 3.0, 1.0)

        # Market Clarity
        market_clarity = bull_ratio + bear_ratio

        # Weighted combination
        confidence = (
            decisiveness * 0.40 +
            coverage * 0.10 +
            statistical_confidence * 0.30 +
            market_clarity * 0.20
        )

        return max(0.0, min(confidence, 1.0))


class SILOQYNormalizerActor(Actor):
    """Converts raw tick tuples into SiloqyTradeTick objects and republishes
    them on STRUCTURED_TOPIC."""

    def __init__(self, config: SILOQYNormalizerConfig):
        super().__init__(config)

        self.normalized_count = 0
        self.last_metric_log = time.time_ns()

        self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Subscribe to tick events - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events")

        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    def handle_raw_tick(self, data):
        """Normalize a raw tick tuple into a SiloqyTradeTick and publish it."""
        try:
            symbol, price, size, ts_event, open_price, candle_open_time = data
            tick = SiloqyTradeTick(
                instrument_id=symbol,
                price=round(float(price), 8),
                size=round(float(size), 8),
                ts_event=int(ts_event),
                ts_init=int(time.time_ns()),
                open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8),
                candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000),
                tick_side=None,       # Default to None unless Binance provides this info
                exchange="BINANCE",   # We know this is from Binance
                liquidity_flag=None   # Default to None unless Binance provides this info
            )

            if hasattr(self, 'msgbus') and self.msgbus:
                try:
                    # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior
                    self.msgbus.publish(STRUCTURED_TOPIC, tick)

                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")

            # FALLBACK: Try Nautilus publish_data if msgbus not available
            elif hasattr(self, 'publish_data'):
                try:
                    # Use standard Nautilus publishing as fallback
                    # (DataType import fixed at module top - was a NameError before)
                    self.publish_data(DataType(SiloqyTradeTick), tick)
                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}")

            self.normalized_count += 1

            # Periodic metrics
            now = time.time_ns()
            if now - self.last_metric_log > 1_000_000_000:
                self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks")
                self.last_metric_log = now

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}")


def test_siloqy_actors_with_nautilus_process_management():
    """Test SILOQY actors with Nautilus built-in ActorExecutor process management"""
    print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management")

    # Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING
    symbol_discovery_config = ImportableActorConfig(
        actor_path="__main__:SILOQYSymbolDiscoveryActor",
        config_path="__main__:SILOQYSymbolDiscoveryConfig",
        config={
            "component_id": "SILOQY-SYMBOL-DISCOVERY",
            "symbols": [],  # Empty for dynamic discovery
            "candle_interval_ms": 15 * 60 * 1000,
            "throttle_mode": True,                 # ENABLED: Safe for dual instance testing
            "throttle_rate_limit_seconds": 10.0,   # 10s between batches (vs 2.5s)
            "max_symbols_throttled": 100           # Only 100 symbols (vs 2000+)
        }
    )

    main_actor_config = ImportableActorConfig(
        actor_path="__main__:SILOQYMainActor",
        config_path="__main__:SILOQYMainActorConfig",
        config={
            "component_id": "SILOQY-MAIN-ACTOR",
            "candle_interval_ms": 15 * 60 * 1000,
            "throttle_mode": True  # ENABLED: Reduced tick generation
        }
    )

    regime_actor_config = ImportableActorConfig(
        actor_path="__main__:DOLPHINRegimeActor",
        config_path="__main__:DOLPHINRegimeActorConfig",
        config={
            "component_id": "DOLPHIN-REGIME-ACTOR",
            "max_symbols": 5000,
            "ticks_per_analysis": 500  # Reduced for throttle mode testing
        }
    )

    normalizer_config = ImportableActorConfig(
        actor_path="__main__:SILOQYNormalizerActor",
        config_path="__main__:SILOQYNormalizerConfig",
        config={
            "component_id": "SILOQY-NORMALIZER"
        }
    )

    # Trading node configuration - Uses Nautilus built-in process management
    trading_config = TradingNodeConfig(
        trader_id=TraderId("SILOQY-TRADER-001"),
        actors=[
            symbol_discovery_config,
            main_actor_config,
            regime_actor_config,
            normalizer_config
        ],
        data_clients={},   # No external data clients needed
        exec_clients={}    # No execution clients for this test
    )

    node = TradingNode(config=trading_config)

    try:
        node.build()
        print("Node built successfully with Nautilus built-in process management")
        node.run()

    except KeyboardInterrupt:
        print("\nSILOQY Actor test with Nautilus process management completed!")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        try:
            node.dispose()
            print("Nautilus process management: Node disposed successfully")
        except Exception:
            # FIX: narrowed from a bare except; still best-effort teardown
            pass


if __name__ == "__main__":
    test_siloqy_actors_with_nautilus_process_management()