WIP: feat(nautilus): initial integration #1
@@ -925,44 +925,64 @@ class DOLPHINRegimeActor(Actor):
|
|||||||
|
|
||||||
def on_start(self) -> None:
|
def on_start(self) -> None:
|
||||||
"""Subscribe to tick events - using Nautilus ActorExecutor"""
|
"""Subscribe to tick events - using Nautilus ActorExecutor"""
|
||||||
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")
|
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events")
|
||||||
|
|
||||||
if hasattr(self, 'msgbus') and self.msgbus:
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
|
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
|
||||||
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
|
self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
|
||||||
|
self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes)
|
||||||
|
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes")
|
||||||
|
|
||||||
def on_stop(self) -> None:
|
def on_stop(self) -> None:
|
||||||
"""Stop the actor - Nautilus handles executor cleanup"""
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
|
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
|
||||||
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
|
def handle_symbols_discovered(self, data):
|
||||||
|
"""Pre-initialize symbol mappings during discovery phase"""
|
||||||
|
try:
|
||||||
|
symbols, timestamp = data
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings")
|
||||||
|
|
||||||
|
for symbol in symbols:
|
||||||
|
if self.active_symbols >= self.max_symbols:
|
||||||
|
self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization")
|
||||||
|
break
|
||||||
|
|
||||||
|
idx = self.active_symbols
|
||||||
|
self.symbol_to_idx[symbol] = idx
|
||||||
|
self.idx_to_symbol[idx] = symbol
|
||||||
|
self.active_symbols += 1
|
||||||
|
|
||||||
|
self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Error pre-initializing symbols: {e}")
|
||||||
|
|
||||||
def handle_tick_sizes(self, data):
|
def handle_tick_sizes(self, data):
|
||||||
"""Handle tick sizes from discovery actor"""
|
"""Apply tick sizes to pre-initialized symbol mappings"""
|
||||||
try:
|
try:
|
||||||
tick_sizes, timestamp = data
|
tick_sizes, timestamp = data
|
||||||
self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor")
|
self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor")
|
||||||
|
|
||||||
# Update the pre-allocated array with actual tick sizes
|
applied_count = 0
|
||||||
updated_count = 0
|
for symbol, tick_size in tick_sizes.items():
|
||||||
for symbol, tick_size in tick_sizes.items():
|
if symbol in self.symbol_to_idx: # Will exist from pre-initialization
|
||||||
if symbol in self.symbol_to_idx:
|
idx = self.symbol_to_idx[symbol]
|
||||||
idx = self.symbol_to_idx[symbol]
|
if 0 < tick_size <= 1.0:
|
||||||
# Validate tick size
|
self.tick_sizes[idx] = tick_size
|
||||||
if tick_size <= 0 or tick_size > 1.0:
|
applied_count += 1
|
||||||
self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback")
|
else:
|
||||||
self.tick_sizes[idx] = 1e-8 # Use fallback
|
self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback")
|
||||||
else:
|
|
||||||
self.tick_sizes[idx] = tick_size
|
|
||||||
updated_count += 1
|
|
||||||
|
|
||||||
self.log.info(f"Nautilus ActorExecutor: Updated {updated_count} tick sizes in pre-allocated array")
|
self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}")
|
self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}")
|
||||||
|
|
||||||
def handle_raw_tick(self, data):
|
def handle_raw_tick(self, data):
|
||||||
"""
|
"""
|
||||||
PRESERVED EXACTLY: All original zero-allocation tick processing
|
SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols
|
||||||
Using Nautilus ActorExecutor for regime detection tasks
|
Using Nautilus ActorExecutor for regime detection tasks
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
@@ -972,41 +992,28 @@ class DOLPHINRegimeActor(Actor):
|
|||||||
self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
|
self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
|
||||||
return
|
return
|
||||||
|
|
||||||
# PRESERVED EXACTLY: All original array processing logic
|
# Direct lookup - symbol will exist from pre-initialization
|
||||||
if symbol not in self.symbol_to_idx:
|
if symbol not in self.symbol_to_idx:
|
||||||
if self.active_symbols >= self.max_symbols:
|
self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings")
|
||||||
self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
|
return
|
||||||
return
|
|
||||||
|
|
||||||
idx = self.active_symbols
|
idx = self.symbol_to_idx[symbol]
|
||||||
self.symbol_to_idx[symbol] = idx
|
|
||||||
self.idx_to_symbol[idx] = symbol
|
|
||||||
self.active_symbols += 1
|
|
||||||
|
|
||||||
# Initialize arrays
|
# Check if new candle period
|
||||||
|
if candle_open_time > self.last_update[idx]:
|
||||||
|
# Reset for new candle
|
||||||
self.open_prices[idx] = open_price
|
self.open_prices[idx] = open_price
|
||||||
self.high_prices[idx] = price
|
self.high_prices[idx] = price
|
||||||
self.low_prices[idx] = price
|
self.low_prices[idx] = price
|
||||||
self.close_prices[idx] = price
|
self.close_prices[idx] = price
|
||||||
self.volumes[idx] = 0.0
|
self.volumes[idx] = size
|
||||||
|
self.last_update[idx] = candle_open_time
|
||||||
else:
|
else:
|
||||||
idx = self.symbol_to_idx[symbol]
|
# Update existing candle data
|
||||||
|
self.close_prices[idx] = price
|
||||||
# Check if new candle period
|
self.high_prices[idx] = max(self.high_prices[idx], price)
|
||||||
if candle_open_time > self.last_update[idx]:
|
self.low_prices[idx] = min(self.low_prices[idx], price)
|
||||||
# Reset for new candle
|
self.volumes[idx] += size
|
||||||
self.open_prices[idx] = open_price
|
|
||||||
self.high_prices[idx] = price
|
|
||||||
self.low_prices[idx] = price
|
|
||||||
self.close_prices[idx] = price
|
|
||||||
self.volumes[idx] = size
|
|
||||||
self.last_update[idx] = candle_open_time
|
|
||||||
else:
|
|
||||||
# Update existing candle data
|
|
||||||
self.close_prices[idx] = price
|
|
||||||
self.high_prices[idx] = max(self.high_prices[idx], price)
|
|
||||||
self.low_prices[idx] = min(self.low_prices[idx], price)
|
|
||||||
self.volumes[idx] += size
|
|
||||||
|
|
||||||
self.tick_count += 1
|
self.tick_count += 1
|
||||||
self.processed_ticks += 1
|
self.processed_ticks += 1
|
||||||
|
|||||||
Reference in New Issue
Block a user