Working starting point: Nautilus integration (throttled)
This commit is contained in:
947
nautilus_actor_test_implementation_5x.py
Normal file
947
nautilus_actor_test_implementation_5x.py
Normal file
@@ -0,0 +1,947 @@
|
|||||||
|
|
||||||
|
import time
|
||||||
|
import numpy as np
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import math
|
||||||
|
from typing import Dict, List, Optional, Tuple, Any
|
||||||
|
from enum import Enum
|
||||||
|
from collections import deque
|
||||||
|
import httpx
|
||||||
|
import random # Added for _simulate_websocket_ticks
|
||||||
|
|
||||||
|
# Nautilus imports - following test pattern
|
||||||
|
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
|
||||||
|
from nautilus_trader.live.node import TradingNode
|
||||||
|
from nautilus_trader.common.actor import Actor
|
||||||
|
from nautilus_trader.common.config import ActorConfig
|
||||||
|
from nautilus_trader.model.identifiers import TraderId
|
||||||
|
from nautilus_trader.core.data import Data
|
||||||
|
from nautilus_trader.common.component import Logger, init_logging
|
||||||
|
|
||||||
|
# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
|
||||||
|
from nautilus_trader.model.data import TradeTick
|
||||||
|
from nautilus_trader.model.identifiers import InstrumentId, TradeId
|
||||||
|
from nautilus_trader.model.objects import Price, Quantity
|
||||||
|
from nautilus_trader.model.enums import AggressorSide
|
||||||
|
|
||||||
|
# Initialize logging
|
||||||
|
_log_guard = init_logging()
|
||||||
|
_logger = Logger("SILOQY")
|
||||||
|
|
||||||
|
# Topics - HIGH PERFORMANCE STRING TOPICS
|
||||||
|
RAW_TOPIC = "SILOQY.RAW.TICKS"
|
||||||
|
STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
|
||||||
|
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
|
||||||
|
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
|
||||||
|
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
|
||||||
|
|
||||||
|
# Rate limiting constant
|
||||||
|
MIN_INTERVAL = 2.5 # seconds between API batches
|
||||||
|
|
||||||
|
# Market Regime Enum
|
||||||
|
class MarketRegime(Enum):
|
||||||
|
BULL = "BULL"
|
||||||
|
BEAR = "BEAR"
|
||||||
|
TRANSITION = "TRANSITION"
|
||||||
|
SIDEWAYS = "SIDEWAYS"
|
||||||
|
|
||||||
|
# --------------------------------------------------------------------
|
||||||
|
# SILOQY Custom Tick - PRESERVED with fixes
|
||||||
|
# --------------------------------------------------------------------
|
||||||
|
class SiloqyTradeTick(TradeTick):
|
||||||
|
def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None,
|
||||||
|
open_price: float = None, candle_open_time: int = None, tick_side: str = None,
|
||||||
|
exchange: str = None, liquidity_flag = None):
|
||||||
|
# Convert to proper Nautilus types - add venue to symbol
|
||||||
|
if isinstance(instrument_id, str):
|
||||||
|
# Add BINANCE venue if not already present
|
||||||
|
if '.' not in instrument_id:
|
||||||
|
instrument_id = f"{instrument_id}.BINANCE"
|
||||||
|
instr_id = InstrumentId.from_str(instrument_id)
|
||||||
|
else:
|
||||||
|
instr_id = instrument_id
|
||||||
|
# STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size
|
||||||
|
price_obj = Price(price, precision=8) if not isinstance(price, Price) else price
|
||||||
|
size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size
|
||||||
|
|
||||||
|
# Default aggressor side and trade ID
|
||||||
|
aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this
|
||||||
|
trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID
|
||||||
|
|
||||||
|
super().__init__(
|
||||||
|
instrument_id=instr_id,
|
||||||
|
price=price_obj,
|
||||||
|
size=size_obj,
|
||||||
|
aggressor_side=aggressor_side,
|
||||||
|
trade_id=trade_id,
|
||||||
|
ts_event=int(ts_event),
|
||||||
|
ts_init=int(ts_init if ts_init is not None else time.time_ns())
|
||||||
|
)
|
||||||
|
|
||||||
|
# Additional SILOQY fields
|
||||||
|
self.open_price = open_price if open_price is not None else price
|
||||||
|
self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000)
|
||||||
|
self.tick_side = tick_side
|
||||||
|
self.exchange = exchange
|
||||||
|
self.liquidity_flag = liquidity_flag
|
||||||
|
|
||||||
|
class SILOQYSymbolDiscoveryConfig(ActorConfig):
|
||||||
|
symbols: List[str] = []
|
||||||
|
candle_interval_ms: int = 15 * 60 * 1000
|
||||||
|
throttle_mode: bool = False
|
||||||
|
throttle_rate_limit_seconds: float = 10.0
|
||||||
|
max_symbols_throttled: int = 100
|
||||||
|
|
||||||
|
class SILOQYMainActorConfig(ActorConfig):
|
||||||
|
candle_interval_ms: int = 15 * 60 * 1000
|
||||||
|
throttle_mode: bool = False
|
||||||
|
|
||||||
|
class DOLPHINRegimeActorConfig(ActorConfig):
|
||||||
|
max_symbols: int = 5000
|
||||||
|
ticks_per_analysis: int = 1000
|
||||||
|
|
||||||
|
class SILOQYNormalizerConfig(ActorConfig):
|
||||||
|
pass
|
||||||
|
|
||||||
|
class SILOQYSymbolDiscoveryActor(Actor):
|
||||||
|
"""
|
||||||
|
Symbol discovery with all original SILOQY algorithms preserved
|
||||||
|
Using Nautilus built-in ActorExecutor for task management
|
||||||
|
"""
|
||||||
|
def __init__(self, config: SILOQYSymbolDiscoveryConfig):
|
||||||
|
super().__init__(config)
|
||||||
|
|
||||||
|
# Preserve original SILOQY configuration
|
||||||
|
self.symbols = list(config.symbols) if config.symbols else []
|
||||||
|
self.candle_interval_ms = config.candle_interval_ms
|
||||||
|
self.active_candles = {}
|
||||||
|
|
||||||
|
# Process management configuration
|
||||||
|
self.throttle_mode = config.throttle_mode
|
||||||
|
self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds
|
||||||
|
self.max_symbols_throttled = config.max_symbols_throttled
|
||||||
|
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration")
|
||||||
|
self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches")
|
||||||
|
self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max")
|
||||||
|
|
||||||
|
self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor")
|
||||||
|
|
||||||
|
def on_start(self) -> None:
|
||||||
|
"""Start the actor - using Nautilus ActorExecutor"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting")
|
||||||
|
|
||||||
|
# Use Nautilus ActorExecutor for task management
|
||||||
|
if hasattr(self, '_executor') and self._executor:
|
||||||
|
self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
|
||||||
|
future = self._executor.submit(self.on_start_async())
|
||||||
|
self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted")
|
||||||
|
else:
|
||||||
|
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
|
||||||
|
asyncio.create_task(self.on_start_async())
|
||||||
|
|
||||||
|
def on_stop(self) -> None:
|
||||||
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping")
|
||||||
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
|
async def on_start_async(self) -> None:
|
||||||
|
self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# PRESERVED: Original symbol discovery algorithm
|
||||||
|
if not self.symbols:
|
||||||
|
await self._discover_all_symbols()
|
||||||
|
|
||||||
|
# PRESERVED: Original stats fetching and candle reconstruction
|
||||||
|
stats, candle_opens = await self._fetch_stats_and_reconstruct_candles()
|
||||||
|
|
||||||
|
# Set active candles from reconstruction results
|
||||||
|
self.active_candles = candle_opens
|
||||||
|
|
||||||
|
# Publish results using msgbus
|
||||||
|
self._publish_discovery_results()
|
||||||
|
|
||||||
|
self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
|
||||||
|
# Don't re-raise, let system continue
|
||||||
|
|
||||||
|
async def _discover_all_symbols(self):
|
||||||
|
"""PRESERVED: Original Binance symbol discovery algorithm"""
|
||||||
|
self.log.info("Starting dynamic symbol discovery from Binance...")
|
||||||
|
url = "https://api.binance.com/api/v3/exchangeInfo"
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
self.log.info("Fetching exchange info from Binance API...")
|
||||||
|
response = await client.get(url, timeout=10)
|
||||||
|
if response.status_code == 200:
|
||||||
|
self.log.info("Successfully received exchange info")
|
||||||
|
data = response.json()
|
||||||
|
# Get all trading symbols (USDT pairs for example)
|
||||||
|
full_symbols = [
|
||||||
|
s['symbol'] for s in data['symbols']
|
||||||
|
if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
|
||||||
|
]
|
||||||
|
|
||||||
|
# Apply throttle mode symbol limiting
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.symbols = full_symbols[:self.max_symbols_throttled]
|
||||||
|
self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
|
||||||
|
else:
|
||||||
|
self.symbols = full_symbols
|
||||||
|
|
||||||
|
self.log.info(f"Discovered {len(self.symbols)} trading symbols")
|
||||||
|
self.log.info(f"First 10 symbols: {self.symbols[:10]}")
|
||||||
|
else:
|
||||||
|
self.log.error(f"Failed to fetch exchange info: {response.status_code}")
|
||||||
|
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
|
||||||
|
|
||||||
|
async def _fetch_stats_and_reconstruct_candles(self):
|
||||||
|
url = "https://api.binance.com/api/v3/ticker/24hr"
|
||||||
|
klines_url = "https://api.binance.com/api/v3/klines"
|
||||||
|
ticker_url = "https://api.binance.com/api/v3/ticker/price"
|
||||||
|
|
||||||
|
# PRESERVED: Original rate limit handling with throttle mode support
|
||||||
|
base_rate_limit = 2.5
|
||||||
|
rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit
|
||||||
|
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)")
|
||||||
|
|
||||||
|
stats = {}
|
||||||
|
candle_opens = {}
|
||||||
|
|
||||||
|
async with httpx.AsyncClient() as client:
|
||||||
|
total_batches = (len(self.symbols) + 49) // 50
|
||||||
|
for i in range(0, len(self.symbols), 50):
|
||||||
|
batch_num = (i // 50) + 1
|
||||||
|
start_time = time.time()
|
||||||
|
symbols_batch = self.symbols[i:i + 50]
|
||||||
|
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay")
|
||||||
|
|
||||||
|
# PRESERVED: Original boundary detection logic
|
||||||
|
current_time = int(time.time() * 1000)
|
||||||
|
time_into_candle = current_time % self.candle_interval_ms
|
||||||
|
candle_open_time = current_time - time_into_candle
|
||||||
|
at_boundary = (time_into_candle < 1000)
|
||||||
|
|
||||||
|
if i == 0: # Log initial state
|
||||||
|
self.log.info("DOLPHIN: Current candle analysis:")
|
||||||
|
self.log.info(f" - Current time: {current_time}")
|
||||||
|
self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)")
|
||||||
|
self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)")
|
||||||
|
self.log.info(f" - At boundary: {at_boundary}")
|
||||||
|
self.log.info(f" - Candle open time: {candle_open_time}")
|
||||||
|
|
||||||
|
symbols_json_string = json.dumps(symbols_batch, separators=(',', ':'))
|
||||||
|
params = {"symbols": symbols_json_string}
|
||||||
|
|
||||||
|
try:
|
||||||
|
# PRESERVED: Original stats fetching
|
||||||
|
response = await client.get(url, params=params, timeout=10)
|
||||||
|
|
||||||
|
if response.status_code == 200:
|
||||||
|
data = response.json()
|
||||||
|
for item in data:
|
||||||
|
symbol = item['symbol']
|
||||||
|
stats[symbol] = {
|
||||||
|
'count': int(item['count']),
|
||||||
|
'quoteVolume': float(item['quoteVolume']),
|
||||||
|
}
|
||||||
|
self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols")
|
||||||
|
|
||||||
|
else:
|
||||||
|
self.log.error(f"Error {response.status_code}: {response.text}")
|
||||||
|
|
||||||
|
# PRESERVED: Original DOLPHIN candle reconstruction strategy
|
||||||
|
if at_boundary:
|
||||||
|
# AT BOUNDARY: Fetch current prices to use as opens
|
||||||
|
self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})")
|
||||||
|
|
||||||
|
ticker_params = {"symbols": symbols_json_string}
|
||||||
|
ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5)
|
||||||
|
|
||||||
|
if ticker_response.status_code == 200:
|
||||||
|
ticker_data = ticker_response.json()
|
||||||
|
for item in ticker_data:
|
||||||
|
symbol = item['symbol']
|
||||||
|
current_price = float(item['price'])
|
||||||
|
candle_opens[symbol] = current_price
|
||||||
|
|
||||||
|
self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols")
|
||||||
|
else:
|
||||||
|
self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})")
|
||||||
|
|
||||||
|
else:
|
||||||
|
# NOT AT BOUNDARY: Fetch historical 1s kline data
|
||||||
|
self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})")
|
||||||
|
|
||||||
|
for symbol in symbols_batch:
|
||||||
|
try:
|
||||||
|
# PRESERVED: Original kline fetching logic
|
||||||
|
kline_params = {
|
||||||
|
'symbol': symbol,
|
||||||
|
'interval': '1s',
|
||||||
|
'startTime': candle_open_time,
|
||||||
|
'endTime': candle_open_time + 1000,
|
||||||
|
'limit': 1
|
||||||
|
}
|
||||||
|
|
||||||
|
kline_response = await client.get(klines_url, params=kline_params, timeout=5)
|
||||||
|
|
||||||
|
if kline_response.status_code == 200:
|
||||||
|
kline_data = kline_response.json()
|
||||||
|
if kline_data:
|
||||||
|
open_price = float(kline_data[0][1])
|
||||||
|
candle_opens[symbol] = open_price
|
||||||
|
|
||||||
|
if (i + len(symbols_batch)) <= 10:
|
||||||
|
self.log.info(f" - {symbol}: reconstructed open = {open_price}")
|
||||||
|
else:
|
||||||
|
self.log.warning(f" - {symbol}: no 1s kline data found")
|
||||||
|
else:
|
||||||
|
self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f" - {symbol}: kline fetch error: {e}")
|
||||||
|
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Request failed: {str(e)}")
|
||||||
|
|
||||||
|
# PRESERVED: Original rate limiting
|
||||||
|
elapsed = time.time() - start_time
|
||||||
|
if i + 50 < len(self.symbols):
|
||||||
|
sleep_time = max(0, rate_limit_interval - elapsed)
|
||||||
|
if sleep_time > 0:
|
||||||
|
await asyncio.sleep(sleep_time)
|
||||||
|
|
||||||
|
self.log.info(f"DOLPHIN: Candle reconstruction complete:")
|
||||||
|
self.log.info(f" - Symbols processed: {len(self.symbols)}")
|
||||||
|
self.log.info(f" - Opens reconstructed: {len(candle_opens)}")
|
||||||
|
self.log.info("Discovery phase ready for publishing...")
|
||||||
|
|
||||||
|
return stats, candle_opens
|
||||||
|
|
||||||
|
def _publish_discovery_results(self):
|
||||||
|
"""Publish results using msgbus - Nautilus message bus integration"""
|
||||||
|
try:
|
||||||
|
self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...")
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
# Publish symbols and candles as tuples
|
||||||
|
self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
|
||||||
|
self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
|
||||||
|
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
|
||||||
|
self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
|
||||||
|
else:
|
||||||
|
self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}")
|
||||||
|
|
||||||
|
class SILOQYMainActor(Actor):
|
||||||
|
def __init__(self, config: SILOQYMainActorConfig):
|
||||||
|
super().__init__(config)
|
||||||
|
|
||||||
|
# Preserve original configuration
|
||||||
|
self.candle_interval_ms = config.candle_interval_ms
|
||||||
|
self.throttle_mode = config.throttle_mode
|
||||||
|
self.connections = {}
|
||||||
|
self.connection_tasks = {}
|
||||||
|
|
||||||
|
# Will be populated from discovery actor
|
||||||
|
self.symbols = []
|
||||||
|
self.active_candles = {}
|
||||||
|
|
||||||
|
# WebSocket tasks - managed by Nautilus ActorExecutor
|
||||||
|
self.ws_tasks = []
|
||||||
|
|
||||||
|
# Synchronization
|
||||||
|
self.discovery_complete = asyncio.Event()
|
||||||
|
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
|
||||||
|
|
||||||
|
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
|
||||||
|
|
||||||
|
def on_start(self) -> None:
|
||||||
|
"""Subscribe to discovery events - using Nautilus ActorExecutor"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events")
|
||||||
|
|
||||||
|
# Subscribe to discovery results
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
|
||||||
|
self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial)
|
||||||
|
self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics")
|
||||||
|
|
||||||
|
# Use Nautilus ActorExecutor for task management
|
||||||
|
if hasattr(self, '_executor') and self._executor:
|
||||||
|
self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
|
||||||
|
future = self._executor.submit(self.on_start_async())
|
||||||
|
self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted")
|
||||||
|
else:
|
||||||
|
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
|
||||||
|
asyncio.create_task(self.on_start_async())
|
||||||
|
|
||||||
|
def on_stop(self) -> None:
|
||||||
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
|
||||||
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
|
async def on_start_async(self) -> None:
|
||||||
|
try:
|
||||||
|
# Wait for symbol discovery to complete
|
||||||
|
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
|
||||||
|
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
|
||||||
|
|
||||||
|
# PRESERVED: Start WebSocket connections
|
||||||
|
await self._start_websocket_connections()
|
||||||
|
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
|
||||||
|
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery")
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}")
|
||||||
|
|
||||||
|
def handle_symbols_discovered(self, data):
|
||||||
|
"""Handle symbols from discovery actor"""
|
||||||
|
try:
|
||||||
|
symbols, timestamp = data
|
||||||
|
self.symbols = symbols
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor")
|
||||||
|
|
||||||
|
if self.symbols and self.active_candles:
|
||||||
|
self.discovery_complete.set()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
|
||||||
|
|
||||||
|
# FIXED: SILOQYMainActor.handle_candles_initial
|
||||||
|
def handle_candles_initial(self, data):
|
||||||
|
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
|
||||||
|
try:
|
||||||
|
candles_received, timestamp = data
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor")
|
||||||
|
|
||||||
|
# FIX: Re-initialize active_candles with the full dictionary structure
|
||||||
|
# STEP 5 FIX: Added rounding to 8 decimals for all price values
|
||||||
|
self.active_candles = {}
|
||||||
|
for symbol, open_price in candles_received.items():
|
||||||
|
self.active_candles[symbol] = {
|
||||||
|
'open_price': round(open_price, 8),
|
||||||
|
'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms,
|
||||||
|
'high': round(open_price, 8),
|
||||||
|
'low': round(open_price, 8),
|
||||||
|
'close': round(open_price, 8),
|
||||||
|
'volume': 0.0,
|
||||||
|
}
|
||||||
|
|
||||||
|
if self.symbols and self.active_candles:
|
||||||
|
self.discovery_complete.set()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
|
||||||
|
|
||||||
|
async def _start_websocket_connections(self):
|
||||||
|
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
|
||||||
|
self.log.info("Starting WebSocket simulation for tick generation...")
|
||||||
|
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
|
||||||
|
|
||||||
|
# Use Nautilus ActorExecutor for WebSocket task management
|
||||||
|
if hasattr(self, '_executor') and self._executor:
|
||||||
|
self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation")
|
||||||
|
future = self._executor.submit(self._simulate_websocket_ticks())
|
||||||
|
self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted")
|
||||||
|
else:
|
||||||
|
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
|
||||||
|
task = asyncio.create_task(self._simulate_websocket_ticks())
|
||||||
|
self.ws_tasks.append(task)
|
||||||
|
|
||||||
|
async def _simulate_websocket_ticks(self):
|
||||||
|
# Adjust tick rate for throttle mode
|
||||||
|
tick_delay = 0.1 if self.throttle_mode else 0.01
|
||||||
|
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
|
||||||
|
|
||||||
|
tick_count = 0
|
||||||
|
try:
|
||||||
|
if self.throttle_mode:
|
||||||
|
self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second")
|
||||||
|
else:
|
||||||
|
self.log.info("WebSocket tick simulation started - generating 100 ticks/second")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
for symbol in self.symbols[:symbols_to_simulate]:
|
||||||
|
# STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals
|
||||||
|
price = round(100.0 + random.gauss(0, 0.5), 8)
|
||||||
|
quantity = round(random.uniform(0.00000001, 100.0), 8)
|
||||||
|
timestamp = int(time.time() * 1000)
|
||||||
|
|
||||||
|
self.on_websocket_tick(symbol, price, quantity, timestamp)
|
||||||
|
tick_count += 1
|
||||||
|
|
||||||
|
# Log every 500 ticks in throttle mode, 1000 in normal mode
|
||||||
|
log_interval = 500 if self.throttle_mode else 1000
|
||||||
|
if tick_count % log_interval == 0:
|
||||||
|
mode_indicator = "THROTTLE" if self.throttle_mode else ""
|
||||||
|
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
|
||||||
|
|
||||||
|
await asyncio.sleep(tick_delay)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")
|
||||||
|
|
||||||
|
def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
|
||||||
|
"""
|
||||||
|
PRESERVED: Original tick processing and candle update logic
|
||||||
|
"""
|
||||||
|
# PRESERVED: Original candle update logic
|
||||||
|
if symbol not in self.active_candles:
|
||||||
|
candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
|
||||||
|
# STEP 4 FIX: Round all prices to 8 decimals
|
||||||
|
self.active_candles[symbol] = {
|
||||||
|
'open_price': round(price, 8),
|
||||||
|
'open_time': candle_open_time,
|
||||||
|
'high': round(price, 8),
|
||||||
|
'low': round(price, 8),
|
||||||
|
'close': round(price, 8),
|
||||||
|
'volume': 0.0
|
||||||
|
}
|
||||||
|
|
||||||
|
candle = self.active_candles[symbol]
|
||||||
|
|
||||||
|
# PRESERVED: Original candle rolling logic
|
||||||
|
current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
|
||||||
|
if current_candle_time > candle['open_time']:
|
||||||
|
# STEP 4 FIX: Round all prices to 8 decimals
|
||||||
|
candle['open_price'] = round(price, 8)
|
||||||
|
candle['open_time'] = current_candle_time
|
||||||
|
candle['high'] = round(price, 8)
|
||||||
|
candle['low'] = round(price, 8)
|
||||||
|
candle['close'] = round(price, 8)
|
||||||
|
candle['volume'] = quantity
|
||||||
|
else:
|
||||||
|
candle['close'] = round(price, 8)
|
||||||
|
candle['high'] = round(max(candle['high'], price), 8)
|
||||||
|
candle['low'] = round(min(candle['low'], price), 8)
|
||||||
|
candle['volume'] += quantity
|
||||||
|
|
||||||
|
# PRESERVED: Original high-performance tuple publishing
|
||||||
|
try:
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
# STEP 3 FIX: Round all float values to 8 decimals in the tuple
|
||||||
|
tick_tuple = (
|
||||||
|
symbol,
|
||||||
|
round(float(price), 8),
|
||||||
|
round(float(quantity), 8),
|
||||||
|
int(timestamp),
|
||||||
|
round(float(candle['open_price']), 8),
|
||||||
|
int(candle['open_time'])
|
||||||
|
)
|
||||||
|
|
||||||
|
self.msgbus.publish(RAW_TOPIC, tick_tuple)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
|
||||||
|
|
||||||
|
class DOLPHINRegimeActor(Actor):
|
||||||
|
def __init__(self, config: DOLPHINRegimeActorConfig):
|
||||||
|
super().__init__(config)
|
||||||
|
|
||||||
|
# PRESERVED: All original DOLPHIN configuration
|
||||||
|
self.max_symbols = config.max_symbols
|
||||||
|
self.ticks_per_analysis = config.ticks_per_analysis
|
||||||
|
|
||||||
|
# PRESERVED: All original pre-allocated arrays for zero allocation
|
||||||
|
self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
|
||||||
|
self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
|
||||||
|
self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
|
||||||
|
self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
|
||||||
|
self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
|
||||||
|
self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
|
||||||
|
|
||||||
|
# PRESERVED: All original mapping and state
|
||||||
|
self.symbol_to_idx = {}
|
||||||
|
self.idx_to_symbol = {}
|
||||||
|
self.active_symbols = 0
|
||||||
|
|
||||||
|
self.tick_count = 0
|
||||||
|
|
||||||
|
# PRESERVED: All original DOLPHIN thresholds - EXACT
|
||||||
|
self.bull_threshold = 0.60 # 60% bullish
|
||||||
|
self.bear_threshold = 0.55 # 55% bearish
|
||||||
|
|
||||||
|
self.previous_bull_ratio = None
|
||||||
|
self.regime_history = deque(maxlen=100)
|
||||||
|
|
||||||
|
# Metrics
|
||||||
|
self.last_metric_log = time.time_ns()
|
||||||
|
self.processed_ticks = 0
|
||||||
|
self.regime_calculations = 0
|
||||||
|
|
||||||
|
self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
|
||||||
|
f"ticks_per_analysis: {self.ticks_per_analysis}")
|
||||||
|
|
||||||
|
def on_start(self) -> None:
|
||||||
|
"""Subscribe to tick events - using Nautilus ActorExecutor"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")
|
||||||
|
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
|
||||||
|
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
|
||||||
|
|
||||||
|
def on_stop(self) -> None:
|
||||||
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
|
||||||
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
|
def handle_raw_tick(self, data):
|
||||||
|
"""
|
||||||
|
PRESERVED EXACTLY: All original zero-allocation tick processing
|
||||||
|
Using Nautilus ActorExecutor for regime detection tasks
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
symbol, price, size, ts_event, open_price, candle_open_time = data
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
|
||||||
|
return
|
||||||
|
|
||||||
|
# PRESERVED EXACTLY: All original array processing logic
|
||||||
|
if symbol not in self.symbol_to_idx:
|
||||||
|
if self.active_symbols >= self.max_symbols:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
|
||||||
|
return
|
||||||
|
|
||||||
|
idx = self.active_symbols
|
||||||
|
self.symbol_to_idx[symbol] = idx
|
||||||
|
self.idx_to_symbol[idx] = symbol
|
||||||
|
self.active_symbols += 1
|
||||||
|
|
||||||
|
# Initialize arrays
|
||||||
|
self.open_prices[idx] = open_price
|
||||||
|
self.high_prices[idx] = price
|
||||||
|
self.low_prices[idx] = price
|
||||||
|
self.close_prices[idx] = price
|
||||||
|
self.volumes[idx] = 0.0
|
||||||
|
else:
|
||||||
|
idx = self.symbol_to_idx[symbol]
|
||||||
|
|
||||||
|
# Check if new candle period
|
||||||
|
if candle_open_time > self.last_update[idx]:
|
||||||
|
# Reset for new candle
|
||||||
|
self.open_prices[idx] = open_price
|
||||||
|
self.high_prices[idx] = price
|
||||||
|
self.low_prices[idx] = price
|
||||||
|
self.close_prices[idx] = price
|
||||||
|
self.volumes[idx] = size
|
||||||
|
self.last_update[idx] = candle_open_time
|
||||||
|
else:
|
||||||
|
# Update existing candle data
|
||||||
|
self.close_prices[idx] = price
|
||||||
|
self.high_prices[idx] = max(self.high_prices[idx], price)
|
||||||
|
self.low_prices[idx] = min(self.low_prices[idx], price)
|
||||||
|
self.volumes[idx] += size
|
||||||
|
|
||||||
|
self.tick_count += 1
|
||||||
|
self.processed_ticks += 1
|
||||||
|
|
||||||
|
# PRESERVED: Original trigger logic
|
||||||
|
if self.tick_count >= self.ticks_per_analysis:
|
||||||
|
# Use Nautilus ActorExecutor for regime detection
|
||||||
|
if hasattr(self, '_executor') and self._executor:
|
||||||
|
future = self._executor.submit(self._run_regime_detection_async())
|
||||||
|
else:
|
||||||
|
# Fallback to sync detection
|
||||||
|
self._run_regime_detection()
|
||||||
|
self.tick_count = 0
|
||||||
|
|
||||||
|
# Periodic metrics logging
|
||||||
|
now = time.time_ns()
|
||||||
|
if now - self.last_metric_log > 1_000_000_000:
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, "
|
||||||
|
f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}")
|
||||||
|
self.last_metric_log = now
|
||||||
|
|
||||||
|
async def _run_regime_detection_async(self):
|
||||||
|
try:
|
||||||
|
self._run_regime_detection()
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")
|
||||||
|
|
||||||
|
def _run_regime_detection(self):
|
||||||
|
self.regime_calculations += 1
|
||||||
|
|
||||||
|
total_symbols = self.active_symbols
|
||||||
|
if total_symbols == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
analyzed = 0
|
||||||
|
bullish = 0
|
||||||
|
bearish = 0
|
||||||
|
|
||||||
|
# PRESERVED: Original analysis with exact thresholds
|
||||||
|
for idx in range(self.active_symbols):
|
||||||
|
open_price = self.open_prices[idx]
|
||||||
|
close_price = self.close_prices[idx]
|
||||||
|
|
||||||
|
if open_price == 0:
|
||||||
|
continue
|
||||||
|
|
||||||
|
analyzed += 1
|
||||||
|
|
||||||
|
# PRESERVED: EXACT DOLPHIN thresholds
|
||||||
|
change = (close_price - open_price) / open_price
|
||||||
|
|
||||||
|
if change >= 0.0015: # 0.15% threshold for bullish
|
||||||
|
bullish += 1
|
||||||
|
elif change <= -0.0015: # -0.15% threshold for bearish
|
||||||
|
bearish += 1
|
||||||
|
|
||||||
|
if analyzed == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
# PRESERVED: Original ratio calculations
|
||||||
|
bull_ratio = bullish / analyzed
|
||||||
|
bear_ratio = bearish / analyzed
|
||||||
|
sideways_ratio = 1.0 - bull_ratio - bear_ratio
|
||||||
|
|
||||||
|
# PRESERVED: Original regime determination logic
|
||||||
|
if bull_ratio >= self.bull_threshold: # 60% bullish
|
||||||
|
regime = MarketRegime.BULL
|
||||||
|
elif bear_ratio >= self.bear_threshold: # 55% bearish
|
||||||
|
regime = MarketRegime.BEAR
|
||||||
|
else:
|
||||||
|
# Check for transition
|
||||||
|
if self.previous_bull_ratio is not None:
|
||||||
|
ratio_change = abs(bull_ratio - self.previous_bull_ratio)
|
||||||
|
if ratio_change >= 0.15: # 15% change threshold
|
||||||
|
regime = MarketRegime.TRANSITION
|
||||||
|
else:
|
||||||
|
regime = MarketRegime.SIDEWAYS
|
||||||
|
else:
|
||||||
|
regime = MarketRegime.SIDEWAYS
|
||||||
|
|
||||||
|
# PRESERVED: Original confidence calculation
|
||||||
|
confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
|
||||||
|
|
||||||
|
self.previous_bull_ratio = bull_ratio
|
||||||
|
|
||||||
|
# Publish regime result using Nautilus message bus
|
||||||
|
try:
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
regime_tuple = (
|
||||||
|
int(time.time() * 1000),
|
||||||
|
regime.value,
|
||||||
|
float(bull_ratio),
|
||||||
|
float(bear_ratio),
|
||||||
|
float(sideways_ratio),
|
||||||
|
int(analyzed),
|
||||||
|
int(total_symbols),
|
||||||
|
float(confidence)
|
||||||
|
)
|
||||||
|
|
||||||
|
self.msgbus.publish(REGIME_TOPIC, regime_tuple)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")
|
||||||
|
|
||||||
|
# Log regime changes
|
||||||
|
if not self.regime_history or regime != self.regime_history[-1]:
|
||||||
|
self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
|
||||||
|
f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
|
||||||
|
f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
|
||||||
|
self.regime_history.append(regime)
|
||||||
|
|
||||||
|
# Periodic regime status (even without changes)
|
||||||
|
if self.regime_calculations % 10 == 0: # Every 10 calculations
|
||||||
|
self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
|
||||||
|
f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
|
||||||
|
|
||||||
|
def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
|
||||||
|
analyzed: int, total: int) -> float:
|
||||||
|
"""PRESERVED EXACTLY: Original DOLPHIN confidence calculation"""
|
||||||
|
if analyzed == 0:
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
# Market Decisiveness
|
||||||
|
max_ratio = max(bull_ratio, bear_ratio)
|
||||||
|
decisiveness = abs(max_ratio - 0.5) * 2
|
||||||
|
|
||||||
|
# Sample Coverage
|
||||||
|
coverage = analyzed / total
|
||||||
|
|
||||||
|
# Statistical Significance
|
||||||
|
if max_ratio > 0 and max_ratio < 1:
|
||||||
|
standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed)
|
||||||
|
else:
|
||||||
|
standard_error = 0.0
|
||||||
|
|
||||||
|
z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001)
|
||||||
|
statistical_confidence = min(z_score / 3.0, 1.0)
|
||||||
|
|
||||||
|
# Market Clarity
|
||||||
|
market_clarity = bull_ratio + bear_ratio
|
||||||
|
|
||||||
|
# Weighted combination
|
||||||
|
confidence = (
|
||||||
|
decisiveness * 0.40 +
|
||||||
|
coverage * 0.10 +
|
||||||
|
statistical_confidence * 0.30 +
|
||||||
|
market_clarity * 0.20
|
||||||
|
)
|
||||||
|
|
||||||
|
return max(0.0, min(confidence, 1.0))
|
||||||
|
|
||||||
|
class SILOQYNormalizerActor(Actor):
|
||||||
|
def __init__(self, config: SILOQYNormalizerConfig):
|
||||||
|
super().__init__(config)
|
||||||
|
|
||||||
|
self.normalized_count = 0
|
||||||
|
self.last_metric_log = time.time_ns()
|
||||||
|
|
||||||
|
self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor")
|
||||||
|
|
||||||
|
def on_start(self) -> None:
|
||||||
|
"""Subscribe to tick events - using Nautilus ActorExecutor"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events")
|
||||||
|
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
|
||||||
|
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
|
||||||
|
|
||||||
|
def on_stop(self) -> None:
|
||||||
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
|
self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping")
|
||||||
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
|
def handle_raw_tick(self, data):
|
||||||
|
try:
|
||||||
|
symbol, price, size, ts_event, open_price, candle_open_time = data
|
||||||
|
tick = SiloqyTradeTick(
|
||||||
|
instrument_id=symbol,
|
||||||
|
price=round(float(price), 8),
|
||||||
|
size=round(float(size), 8),
|
||||||
|
ts_event=int(ts_event),
|
||||||
|
ts_init=int(time.time_ns()),
|
||||||
|
open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8),
|
||||||
|
candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000),
|
||||||
|
tick_side=None, # Default to None unless Binance provides this info
|
||||||
|
exchange="BINANCE", # We know this is from Binance
|
||||||
|
liquidity_flag=None # Default to None unless Binance provides this info
|
||||||
|
)
|
||||||
|
|
||||||
|
if hasattr(self, 'msgbus') and self.msgbus:
|
||||||
|
try:
|
||||||
|
# Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior
|
||||||
|
self.msgbus.publish(STRUCTURED_TOPIC, tick)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")
|
||||||
|
|
||||||
|
# FALLBACK: Try Nautilus publish_data if msgbus not available
|
||||||
|
elif hasattr(self, 'publish_data'):
|
||||||
|
try:
|
||||||
|
# Use standard Nautilus publishing as fallback
|
||||||
|
self.publish_data(DataType(SiloqyTradeTick), tick)
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}")
|
||||||
|
self.normalized_count += 1
|
||||||
|
|
||||||
|
# Periodic metrics
|
||||||
|
now = time.time_ns()
|
||||||
|
if now - self.last_metric_log > 1_000_000_000:
|
||||||
|
self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks")
|
||||||
|
self.last_metric_log = now
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}")
|
||||||
|
|
||||||
|
def test_siloqy_actors_with_nautilus_process_management():
|
||||||
|
"""Test SILOQY actors with Nautilus built-in ActorExecutor process management"""
|
||||||
|
print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management")
|
||||||
|
|
||||||
|
# Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING
|
||||||
|
symbol_discovery_config = ImportableActorConfig(
|
||||||
|
actor_path="__main__:SILOQYSymbolDiscoveryActor",
|
||||||
|
config_path="__main__:SILOQYSymbolDiscoveryConfig",
|
||||||
|
config={
|
||||||
|
"component_id": "SILOQY-SYMBOL-DISCOVERY",
|
||||||
|
"symbols": [], # Empty for dynamic discovery
|
||||||
|
"candle_interval_ms": 15 * 60 * 1000,
|
||||||
|
"throttle_mode": True, # ENABLED: Safe for dual instance testing
|
||||||
|
"throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s)
|
||||||
|
"max_symbols_throttled": 100 # Only 100 symbols (vs 2000+)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
main_actor_config = ImportableActorConfig(
|
||||||
|
actor_path="__main__:SILOQYMainActor",
|
||||||
|
config_path="__main__:SILOQYMainActorConfig",
|
||||||
|
config={
|
||||||
|
"component_id": "SILOQY-MAIN-ACTOR",
|
||||||
|
"candle_interval_ms": 15 * 60 * 1000,
|
||||||
|
"throttle_mode": True # ENABLED: Reduced tick generation
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
regime_actor_config = ImportableActorConfig(
|
||||||
|
actor_path="__main__:DOLPHINRegimeActor",
|
||||||
|
config_path="__main__:DOLPHINRegimeActorConfig",
|
||||||
|
config={
|
||||||
|
"component_id": "DOLPHIN-REGIME-ACTOR",
|
||||||
|
"max_symbols": 5000,
|
||||||
|
"ticks_per_analysis": 500 # Reduced for throttle mode testing
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
normalizer_config = ImportableActorConfig(
|
||||||
|
actor_path="__main__:SILOQYNormalizerActor",
|
||||||
|
config_path="__main__:SILOQYNormalizerConfig",
|
||||||
|
config={
|
||||||
|
"component_id": "SILOQY-NORMALIZER"
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Trading node configuration - Uses Nautilus built-in process management
|
||||||
|
trading_config = TradingNodeConfig(
|
||||||
|
trader_id=TraderId("SILOQY-TRADER-001"),
|
||||||
|
actors=[
|
||||||
|
symbol_discovery_config,
|
||||||
|
main_actor_config,
|
||||||
|
regime_actor_config,
|
||||||
|
normalizer_config
|
||||||
|
],
|
||||||
|
data_clients={}, # No external data clients needed
|
||||||
|
exec_clients={} # No execution clients for this test
|
||||||
|
)
|
||||||
|
|
||||||
|
node = TradingNode(config=trading_config)
|
||||||
|
|
||||||
|
try:
|
||||||
|
node.build()
|
||||||
|
print("Node built successfully with Nautilus built-in process management")
|
||||||
|
node.run()
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print("\nSILOQY Actor test with Nautilus process management completed!")
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Error: {e}")
|
||||||
|
import traceback
|
||||||
|
traceback.print_exc()
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
node.dispose()
|
||||||
|
print("Nautilus process management: Node disposed successfully")
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
test_siloqy_actors_with_nautilus_process_management()
|
||||||
Reference in New Issue
Block a user