Files
siloqy/nautilus_actor_test_implementation_5x.py

939 lines
42 KiB
Python
Raw Permalink Normal View History

import time
import numpy as np
import asyncio
import json
import math
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from collections import deque
import httpx
import random # Added for _simulate_websocket_ticks
# Nautilus imports - following test pattern
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
from nautilus_trader.live.node import TradingNode
from nautilus_trader.common.actor import Actor
from nautilus_trader.common.config import ActorConfig
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.core.data import Data
from nautilus_trader.common.component import Logger, init_logging
# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.identifiers import InstrumentId, TradeId
from nautilus_trader.model.objects import Price, Quantity
from nautilus_trader.model.enums import AggressorSide
# Initialize logging
_log_guard = init_logging()
_logger = Logger("SILOQY")
# Topics - HIGH PERFORMANCE STRING TOPICS
RAW_TOPIC = "SILOQY.RAW.TICKS"
STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
# Rate limiting constant
MIN_INTERVAL = 2.5 # seconds between API batches
# Market Regime Enum
class MarketRegime(Enum):
BULL = "BULL"
BEAR = "BEAR"
TRANSITION = "TRANSITION"
SIDEWAYS = "SIDEWAYS"
# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
# --------------------------------------------------------------------
class SiloqyTradeTick(TradeTick):
def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None,
open_price: float = None, candle_open_time: int = None, tick_side: str = None,
exchange: str = None, liquidity_flag = None):
# Convert to proper Nautilus types - add venue to symbol
if isinstance(instrument_id, str):
# Add BINANCE venue if not already present
if '.' not in instrument_id:
instrument_id = f"{instrument_id}.BINANCE"
instr_id = InstrumentId.from_str(instrument_id)
else:
instr_id = instrument_id
# STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size
price_obj = Price(price, precision=8) if not isinstance(price, Price) else price
size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size
# Default aggressor side and trade ID
aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this
trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID
super().__init__(
instrument_id=instr_id,
price=price_obj,
size=size_obj,
aggressor_side=aggressor_side,
trade_id=trade_id,
ts_event=int(ts_event),
ts_init=int(ts_init if ts_init is not None else time.time_ns())
)
# Additional SILOQY fields
self.open_price = open_price if open_price is not None else price
self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000)
self.tick_side = tick_side
self.exchange = exchange
self.liquidity_flag = liquidity_flag
class SILOQYSymbolDiscoveryConfig(ActorConfig):
symbols: List[str] = []
candle_interval_ms: int = 15 * 60 * 1000
throttle_mode: bool = False
throttle_rate_limit_seconds: float = 10.0
max_symbols_throttled: int = 100
class SILOQYMainActorConfig(ActorConfig):
candle_interval_ms: int = 15 * 60 * 1000
throttle_mode: bool = False
class DOLPHINRegimeActorConfig(ActorConfig):
max_symbols: int = 5000
ticks_per_analysis: int = 1000
class SILOQYNormalizerConfig(ActorConfig):
pass
class SILOQYSymbolDiscoveryActor(Actor):
"""
Symbol discovery with all original SILOQY algorithms preserved
Using Nautilus built-in ActorExecutor for task management
"""
def __init__(self, config: SILOQYSymbolDiscoveryConfig):
super().__init__(config)
# Preserve original SILOQY configuration
self.symbols = list(config.symbols) if config.symbols else []
self.candle_interval_ms = config.candle_interval_ms
self.active_candles = {}
# Process management configuration
self.throttle_mode = config.throttle_mode
self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds
self.max_symbols_throttled = config.max_symbols_throttled
if self.throttle_mode:
self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration")
self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches")
self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max")
self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None:
"""Start the actor - using Nautilus ActorExecutor"""
self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting")
# Use Nautilus ActorExecutor for task management
if hasattr(self, '_executor') and self._executor:
self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
future = self._executor.submit(self.on_start_async())
self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted")
else:
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
asyncio.create_task(self.on_start_async())
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping")
# Nautilus kernel handles executor shutdown - no manual cleanup needed
async def on_start_async(self) -> None:
self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization")
try:
# PRESERVED: Original symbol discovery algorithm
if not self.symbols:
await self._discover_all_symbols()
# PRESERVED: Original stats fetching and candle reconstruction
stats, candle_opens = await self._fetch_stats_and_reconstruct_candles()
# Set active candles from reconstruction results
self.active_candles = candle_opens
# Publish results using msgbus
self._publish_discovery_results()
self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully")
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
# Don't re-raise, let system continue
async def _discover_all_symbols(self):
"""PRESERVED: Original Binance symbol discovery algorithm"""
self.log.info("Starting dynamic symbol discovery from Binance...")
url = "https://api.binance.com/api/v3/exchangeInfo"
async with httpx.AsyncClient() as client:
self.log.info("Fetching exchange info from Binance API...")
response = await client.get(url, timeout=10)
if response.status_code == 200:
self.log.info("Successfully received exchange info")
data = response.json()
# Get all trading symbols (USDT pairs for example)
full_symbols = [
s['symbol'] for s in data['symbols']
if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
]
# Apply throttle mode symbol limiting
if self.throttle_mode:
self.symbols = full_symbols[:self.max_symbols_throttled]
self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
else:
self.symbols = full_symbols
self.log.info(f"Discovered {len(self.symbols)} trading symbols")
self.log.info(f"First 10 symbols: {self.symbols[:10]}")
else:
self.log.error(f"Failed to fetch exchange info: {response.status_code}")
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
async def _fetch_stats_and_reconstruct_candles(self):
url = "https://api.binance.com/api/v3/ticker/24hr"
klines_url = "https://api.binance.com/api/v3/klines"
ticker_url = "https://api.binance.com/api/v3/ticker/price"
# PRESERVED: Original rate limit handling with throttle mode support
base_rate_limit = 2.5
rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit
if self.throttle_mode:
self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)")
stats = {}
candle_opens = {}
async with httpx.AsyncClient() as client:
total_batches = (len(self.symbols) + 49) // 50
for i in range(0, len(self.symbols), 50):
batch_num = (i // 50) + 1
start_time = time.time()
symbols_batch = self.symbols[i:i + 50]
if self.throttle_mode:
self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay")
# PRESERVED: Original boundary detection logic
current_time = int(time.time() * 1000)
time_into_candle = current_time % self.candle_interval_ms
candle_open_time = current_time - time_into_candle
at_boundary = (time_into_candle < 1000)
if i == 0: # Log initial state
self.log.info("DOLPHIN: Current candle analysis:")
self.log.info(f" - Current time: {current_time}")
self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)")
self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)")
self.log.info(f" - At boundary: {at_boundary}")
self.log.info(f" - Candle open time: {candle_open_time}")
symbols_json_string = json.dumps(symbols_batch, separators=(',', ':'))
params = {"symbols": symbols_json_string}
try:
# PRESERVED: Original stats fetching
response = await client.get(url, params=params, timeout=10)
if response.status_code == 200:
data = response.json()
for item in data:
symbol = item['symbol']
stats[symbol] = {
'count': int(item['count']),
'quoteVolume': float(item['quoteVolume']),
}
self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols")
else:
self.log.error(f"Error {response.status_code}: {response.text}")
# PRESERVED: Original DOLPHIN candle reconstruction strategy
if at_boundary:
# AT BOUNDARY: Fetch current prices to use as opens
self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})")
ticker_params = {"symbols": symbols_json_string}
ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5)
if ticker_response.status_code == 200:
ticker_data = ticker_response.json()
for item in ticker_data:
symbol = item['symbol']
current_price = float(item['price'])
candle_opens[symbol] = current_price
self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols")
else:
self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})")
else:
# NOT AT BOUNDARY: Fetch historical 1s kline data
self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})")
for symbol in symbols_batch:
try:
# PRESERVED: Original kline fetching logic
kline_params = {
'symbol': symbol,
'interval': '1s',
'startTime': candle_open_time,
'endTime': candle_open_time + 1000,
'limit': 1
}
kline_response = await client.get(klines_url, params=kline_params, timeout=5)
if kline_response.status_code == 200:
kline_data = kline_response.json()
if kline_data:
open_price = float(kline_data[0][1])
candle_opens[symbol] = open_price
if (i + len(symbols_batch)) <= 10:
self.log.info(f" - {symbol}: reconstructed open = {open_price}")
else:
self.log.warning(f" - {symbol}: no 1s kline data found")
else:
self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})")
except Exception as e:
self.log.error(f" - {symbol}: kline fetch error: {e}")
await asyncio.sleep(0.1)
except Exception as e:
self.log.error(f"Request failed: {str(e)}")
# PRESERVED: Original rate limiting
elapsed = time.time() - start_time
if i + 50 < len(self.symbols):
sleep_time = max(0, rate_limit_interval - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
self.log.info(f"DOLPHIN: Candle reconstruction complete:")
self.log.info(f" - Symbols processed: {len(self.symbols)}")
self.log.info(f" - Opens reconstructed: {len(candle_opens)}")
self.log.info("Discovery phase ready for publishing...")
return stats, candle_opens
def _publish_discovery_results(self):
"""Publish results using msgbus - Nautilus message bus integration"""
try:
self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...")
if hasattr(self, 'msgbus') and self.msgbus:
# Publish symbols and candles as tuples
self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
else:
self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}")
class SILOQYMainActor(Actor):
def __init__(self, config: SILOQYMainActorConfig):
super().__init__(config)
# Preserve original configuration
self.candle_interval_ms = config.candle_interval_ms
self.throttle_mode = config.throttle_mode
self.connections = {}
self.connection_tasks = {}
# Will be populated from discovery actor
self.symbols = []
self.active_candles = {}
# WebSocket tasks - managed by Nautilus ActorExecutor
self.ws_tasks = []
# Synchronization
self.discovery_complete = asyncio.Event()
if self.throttle_mode:
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None:
"""Subscribe to discovery events - using Nautilus ActorExecutor"""
self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events")
# Subscribe to discovery results
if hasattr(self, 'msgbus') and self.msgbus:
self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial)
self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics")
# Use Nautilus ActorExecutor for task management
if hasattr(self, '_executor') and self._executor:
self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
future = self._executor.submit(self.on_start_async())
self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted")
else:
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
asyncio.create_task(self.on_start_async())
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
# Nautilus kernel handles executor shutdown - no manual cleanup needed
async def on_start_async(self) -> None:
try:
# Wait for symbol discovery to complete
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
# PRESERVED: Start WebSocket connections
await self._start_websocket_connections()
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
except asyncio.TimeoutError:
self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery")
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}")
def handle_symbols_discovered(self, data):
"""Handle symbols from discovery actor"""
try:
symbols, timestamp = data
self.symbols = symbols
self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor")
if self.symbols and self.active_candles:
self.discovery_complete.set()
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
# FIXED: SILOQYMainActor.handle_candles_initial
def handle_candles_initial(self, data):
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
try:
candles_received, timestamp = data
self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor")
# FIX: Re-initialize active_candles with the full dictionary structure
# STEP 5 FIX: Added rounding to 8 decimals for all price values
self.active_candles = {}
for symbol, open_price in candles_received.items():
self.active_candles[symbol] = {
'open_price': round(open_price, 8),
'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms,
'high': round(open_price, 8),
'low': round(open_price, 8),
'close': round(open_price, 8),
'volume': 0.0,
}
if self.symbols and self.active_candles:
self.discovery_complete.set()
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
async def _start_websocket_connections(self):
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
self.log.info("Starting WebSocket simulation for tick generation...")
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
# Use Nautilus ActorExecutor for WebSocket task management
if hasattr(self, '_executor') and self._executor:
self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation")
future = self._executor.submit(self._simulate_websocket_ticks())
self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted")
else:
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
task = asyncio.create_task(self._simulate_websocket_ticks())
self.ws_tasks.append(task)
async def _simulate_websocket_ticks(self):
# Adjust tick rate for throttle mode
tick_delay = 0.1 if self.throttle_mode else 0.01
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
tick_count = 0
try:
if self.throttle_mode:
self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second")
else:
self.log.info("WebSocket tick simulation started - generating 100 ticks/second")
while True:
for symbol in self.symbols[:symbols_to_simulate]:
# STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals
price = round(100.0 + random.gauss(0, 0.5), 8)
quantity = round(random.uniform(0.00000001, 100.0), 8)
timestamp = int(time.time() * 1000)
self.on_websocket_tick(symbol, price, quantity, timestamp)
tick_count += 1
# Log every 500 ticks in throttle mode, 1000 in normal mode
log_interval = 500 if self.throttle_mode else 1000
if tick_count % log_interval == 0:
mode_indicator = "THROTTLE" if self.throttle_mode else ""
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
await asyncio.sleep(tick_delay)
except asyncio.CancelledError:
self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")
def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
"""
PRESERVED: Original tick processing and candle update logic
"""
# PRESERVED: Original candle update logic
if symbol not in self.active_candles:
candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
# STEP 4 FIX: Round all prices to 8 decimals
self.active_candles[symbol] = {
'open_price': round(price, 8),
'open_time': candle_open_time,
'high': round(price, 8),
'low': round(price, 8),
'close': round(price, 8),
'volume': 0.0
}
candle = self.active_candles[symbol]
# PRESERVED: Original candle rolling logic
current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
if current_candle_time > candle['open_time']:
# STEP 4 FIX: Round all prices to 8 decimals
candle['open_price'] = round(price, 8)
candle['open_time'] = current_candle_time
candle['high'] = round(price, 8)
candle['low'] = round(price, 8)
candle['close'] = round(price, 8)
candle['volume'] = quantity
else:
candle['close'] = round(price, 8)
candle['high'] = round(max(candle['high'], price), 8)
candle['low'] = round(min(candle['low'], price), 8)
candle['volume'] += quantity
# PRESERVED: Original high-performance tuple publishing
try:
if hasattr(self, 'msgbus') and self.msgbus:
# STEP 3 FIX: Round all float values to 8 decimals in the tuple
tick_tuple = (
symbol,
round(float(price), 8),
round(float(quantity), 8),
int(timestamp),
round(float(candle['open_price']), 8),
int(candle['open_time'])
)
self.msgbus.publish(RAW_TOPIC, tick_tuple)
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
class DOLPHINRegimeActor(Actor):
def __init__(self, config: DOLPHINRegimeActorConfig):
super().__init__(config)
# PRESERVED: All original DOLPHIN configuration
self.max_symbols = config.max_symbols
self.ticks_per_analysis = config.ticks_per_analysis
# PRESERVED: All original pre-allocated arrays for zero allocation
self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
# PRESERVED: All original mapping and state
self.symbol_to_idx = {}
self.idx_to_symbol = {}
self.active_symbols = 0
self.tick_count = 0
# PRESERVED: All original DOLPHIN thresholds - EXACT
self.bull_threshold = 0.60 # 60% bullish
self.bear_threshold = 0.55 # 55% bearish
self.previous_bull_ratio = None
self.regime_history = deque(maxlen=100)
# Metrics
self.last_metric_log = time.time_ns()
self.processed_ticks = 0
self.regime_calculations = 0
self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
f"ticks_per_analysis: {self.ticks_per_analysis}")
def on_start(self) -> None:
"""Subscribe to tick events - using Nautilus ActorExecutor"""
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")
if hasattr(self, 'msgbus') and self.msgbus:
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
# Nautilus kernel handles executor shutdown - no manual cleanup needed
def handle_raw_tick(self, data):
"""
PRESERVED EXACTLY: All original zero-allocation tick processing
Using Nautilus ActorExecutor for regime detection tasks
"""
try:
symbol, price, size, ts_event, open_price, candle_open_time = data
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
return
# PRESERVED EXACTLY: All original array processing logic
if symbol not in self.symbol_to_idx:
if self.active_symbols >= self.max_symbols:
self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
return
idx = self.active_symbols
self.symbol_to_idx[symbol] = idx
self.idx_to_symbol[idx] = symbol
self.active_symbols += 1
# Initialize arrays
self.open_prices[idx] = open_price
self.high_prices[idx] = price
self.low_prices[idx] = price
self.close_prices[idx] = price
self.volumes[idx] = 0.0
else:
idx = self.symbol_to_idx[symbol]
# Check if new candle period
if candle_open_time > self.last_update[idx]:
# Reset for new candle
self.open_prices[idx] = open_price
self.high_prices[idx] = price
self.low_prices[idx] = price
self.close_prices[idx] = price
self.volumes[idx] = size
self.last_update[idx] = candle_open_time
else:
# Update existing candle data
self.close_prices[idx] = price
self.high_prices[idx] = max(self.high_prices[idx], price)
self.low_prices[idx] = min(self.low_prices[idx], price)
self.volumes[idx] += size
self.tick_count += 1
self.processed_ticks += 1
# PRESERVED: Original trigger logic
if self.tick_count >= self.ticks_per_analysis:
# Use Nautilus ActorExecutor for regime detection
if hasattr(self, '_executor') and self._executor:
future = self._executor.submit(self._run_regime_detection_async())
else:
# Fallback to sync detection
self._run_regime_detection()
self.tick_count = 0
# Periodic metrics logging
now = time.time_ns()
if now - self.last_metric_log > 1_000_000_000:
self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, "
f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}")
self.last_metric_log = now
async def _run_regime_detection_async(self):
try:
self._run_regime_detection()
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")
def _run_regime_detection(self):
self.regime_calculations += 1
total_symbols = self.active_symbols
if total_symbols == 0:
return
analyzed = 0
bullish = 0
bearish = 0
# PRESERVED: Original analysis with exact thresholds
for idx in range(self.active_symbols):
open_price = self.open_prices[idx]
close_price = self.close_prices[idx]
if open_price == 0:
continue
analyzed += 1
# PRESERVED: EXACT DOLPHIN thresholds
change = (close_price - open_price) / open_price
if change >= 0.0015: # 0.15% threshold for bullish
bullish += 1
elif change <= -0.0015: # -0.15% threshold for bearish
bearish += 1
if analyzed == 0:
return
# PRESERVED: Original ratio calculations
bull_ratio = bullish / analyzed
bear_ratio = bearish / analyzed
sideways_ratio = 1.0 - bull_ratio - bear_ratio
# MODIFIED: Original DOLPHIN Algorithm - Simple 3-regime detection
if bull_ratio >= self.bull_threshold: # 60% bullish
regime = MarketRegime.BULL
elif bear_ratio >= self.bear_threshold: # 55% bearish
regime = MarketRegime.BEAR
else:
# Everything else is sideways
regime = MarketRegime.SIDEWAYS
# PRESERVED: Original confidence calculation
confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
self.previous_bull_ratio = bull_ratio
# Publish regime result using Nautilus message bus
try:
if hasattr(self, 'msgbus') and self.msgbus:
regime_tuple = (
int(time.time() * 1000),
regime.value,
float(bull_ratio),
float(bear_ratio),
float(sideways_ratio),
int(analyzed),
int(total_symbols),
float(confidence)
)
self.msgbus.publish(REGIME_TOPIC, regime_tuple)
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")
# Log regime changes
if not self.regime_history or regime != self.regime_history[-1]:
self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
self.regime_history.append(regime)
# Periodic regime status (even without changes)
if self.regime_calculations % 10 == 0: # Every 10 calculations
self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
analyzed: int, total: int) -> float:
"""PRESERVED EXACTLY: Original DOLPHIN confidence calculation"""
if analyzed == 0:
return 0.0
# Market Decisiveness
max_ratio = max(bull_ratio, bear_ratio)
decisiveness = abs(max_ratio - 0.5) * 2
# Sample Coverage
coverage = analyzed / total
# Statistical Significance
if max_ratio > 0 and max_ratio < 1:
standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed)
else:
standard_error = 0.0
z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001)
statistical_confidence = min(z_score / 3.0, 1.0)
# Market Clarity
market_clarity = bull_ratio + bear_ratio
# Weighted combination
confidence = (
decisiveness * 0.40 +
coverage * 0.10 +
statistical_confidence * 0.30 +
market_clarity * 0.20
)
return max(0.0, min(confidence, 1.0))
class SILOQYNormalizerActor(Actor):
def __init__(self, config: SILOQYNormalizerConfig):
super().__init__(config)
self.normalized_count = 0
self.last_metric_log = time.time_ns()
self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None:
"""Subscribe to tick events - using Nautilus ActorExecutor"""
self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events")
if hasattr(self, 'msgbus') and self.msgbus:
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping")
# Nautilus kernel handles executor shutdown - no manual cleanup needed
def handle_raw_tick(self, data):
try:
symbol, price, size, ts_event, open_price, candle_open_time = data
tick = SiloqyTradeTick(
instrument_id=symbol,
price=round(float(price), 8),
size=round(float(size), 8),
ts_event=int(ts_event),
ts_init=int(time.time_ns()),
open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8),
candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000),
tick_side=None, # Default to None unless Binance provides this info
exchange="BINANCE", # We know this is from Binance
liquidity_flag=None # Default to None unless Binance provides this info
)
if hasattr(self, 'msgbus') and self.msgbus:
try:
# Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior
self.msgbus.publish(STRUCTURED_TOPIC, tick)
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")
# FALLBACK: Try Nautilus publish_data if msgbus not available
elif hasattr(self, 'publish_data'):
try:
# Use standard Nautilus publishing as fallback
self.publish_data(DataType(SiloqyTradeTick), tick)
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}")
self.normalized_count += 1
# Periodic metrics
now = time.time_ns()
if now - self.last_metric_log > 1_000_000_000:
self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks")
self.last_metric_log = now
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}")
def test_siloqy_actors_with_nautilus_process_management():
"""Test SILOQY actors with Nautilus built-in ActorExecutor process management"""
print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management")
# Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING
symbol_discovery_config = ImportableActorConfig(
actor_path="__main__:SILOQYSymbolDiscoveryActor",
config_path="__main__:SILOQYSymbolDiscoveryConfig",
config={
"component_id": "SILOQY-SYMBOL-DISCOVERY",
"symbols": [], # Empty for dynamic discovery
"candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True, # ENABLED: Safe for dual instance testing
"throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s)
"max_symbols_throttled": 100 # Only 100 symbols (vs 2000+)
}
)
main_actor_config = ImportableActorConfig(
actor_path="__main__:SILOQYMainActor",
config_path="__main__:SILOQYMainActorConfig",
config={
"component_id": "SILOQY-MAIN-ACTOR",
"candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True # ENABLED: Reduced tick generation
}
)
regime_actor_config = ImportableActorConfig(
actor_path="__main__:DOLPHINRegimeActor",
config_path="__main__:DOLPHINRegimeActorConfig",
config={
"component_id": "DOLPHIN-REGIME-ACTOR",
"max_symbols": 5000,
"ticks_per_analysis": 500 # Reduced for throttle mode testing
}
)
normalizer_config = ImportableActorConfig(
actor_path="__main__:SILOQYNormalizerActor",
config_path="__main__:SILOQYNormalizerConfig",
config={
"component_id": "SILOQY-NORMALIZER"
}
)
# Trading node configuration - Uses Nautilus built-in process management
trading_config = TradingNodeConfig(
trader_id=TraderId("SILOQY-TRADER-001"),
actors=[
symbol_discovery_config,
main_actor_config,
regime_actor_config,
normalizer_config
],
data_clients={}, # No external data clients needed
exec_clients={} # No execution clients for this test
)
node = TradingNode(config=trading_config)
try:
node.build()
print("Node built successfully with Nautilus built-in process management")
node.run()
except KeyboardInterrupt:
print("\nSILOQY Actor test with Nautilus process management completed!")
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
try:
node.dispose()
print("Nautilus process management: Node disposed successfully")
except:
pass
if __name__ == "__main__":
test_siloqy_actors_with_nautilus_process_management()