diff --git a/nautilus_actor_test_implementation_5x.py b/nautilus_actor_test_implementation_5x.py index 42be24f..ded83bf 100644 --- a/nautilus_actor_test_implementation_5x.py +++ b/nautilus_actor_test_implementation_5x.py @@ -925,44 +925,64 @@ class DOLPHINRegimeActor(Actor): def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" - self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events") + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) - self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed + def handle_symbols_discovered(self, data): + """Pre-initialize symbol mappings during discovery phase""" + try: + symbols, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings") + + for symbol in symbols: + if self.active_symbols >= self.max_symbols: + self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization") + break + + idx = self.active_symbols + self.symbol_to_idx[symbol] = idx + self.idx_to_symbol[idx] = symbol + self.active_symbols += 1 + + self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings") + + except Exception as e: + self.log.error(f"Error pre-initializing symbols: {e}") + def handle_tick_sizes(self, data): - """Handle tick sizes from discovery actor""" - try: - tick_sizes, timestamp = data - self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") - - # Update the pre-allocated array with actual tick sizes - updated_count = 0 - for symbol, tick_size in tick_sizes.items(): - if symbol in self.symbol_to_idx: - idx = self.symbol_to_idx[symbol] - # Validate tick size - if tick_size <= 0 or tick_size > 1.0: - self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") - self.tick_sizes[idx] = 1e-8 # Use fallback - else: - self.tick_sizes[idx] = tick_size - updated_count += 1 - - self.log.info(f"Nautilus ActorExecutor: Updated {updated_count} tick sizes in pre-allocated array") - - except Exception as e: - self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") + """Apply tick sizes to pre-initialized symbol mappings""" + try: + tick_sizes, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") + + applied_count = 0 + for symbol, tick_size in tick_sizes.items(): + if symbol in self.symbol_to_idx: # Will exist from pre-initialization + idx = self.symbol_to_idx[symbol] + if 0 < tick_size <= 1.0: + self.tick_sizes[idx] = tick_size + applied_count += 1 + else: + self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") + + self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") def handle_raw_tick(self, data): """ - PRESERVED EXACTLY: All original zero-allocation tick processing + SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols Using Nautilus ActorExecutor for regime detection tasks """ try: @@ -972,41 +992,28 @@ class DOLPHINRegimeActor(Actor): self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") return - # PRESERVED EXACTLY: All original array processing logic + # Direct lookup - symbol will exist from pre-initialization if symbol not in self.symbol_to_idx: - if self.active_symbols >= self.max_symbols: - self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded") - return + self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings") + return - idx = self.active_symbols - self.symbol_to_idx[symbol] = idx - self.idx_to_symbol[idx] = symbol - self.active_symbols += 1 - - # Initialize arrays + idx = self.symbol_to_idx[symbol] + + # Check if new candle period + if candle_open_time > self.last_update[idx]: + # Reset for new candle self.open_prices[idx] = open_price self.high_prices[idx] = price self.low_prices[idx] = price self.close_prices[idx] = price - self.volumes[idx] = 0.0 + self.volumes[idx] = size + self.last_update[idx] = candle_open_time else: - idx = self.symbol_to_idx[symbol] - - # Check if new candle period - if candle_open_time > self.last_update[idx]: - # Reset for new candle - self.open_prices[idx] = open_price - self.high_prices[idx] = price - self.low_prices[idx] = price - self.close_prices[idx] = price - self.volumes[idx] = size - self.last_update[idx] = candle_open_time - else: - # Update existing candle data - self.close_prices[idx] = price - self.high_prices[idx] = max(self.high_prices[idx], price) - self.low_prices[idx] = min(self.low_prices[idx], price) - self.volumes[idx] += size + # Update existing candle data + self.close_prices[idx] = price + self.high_prices[idx] = max(self.high_prices[idx], price) + self.low_prices[idx] = min(self.low_prices[idx], price) + self.volumes[idx] += size self.tick_count += 1 self.processed_ticks += 1