Files
siloqy/nautilus_actor_test_implementation_6x.py

1549 lines
69 KiB
Python

import time
import numpy as np
import asyncio
import json
import math
import httpx
import websockets
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from collections import deque
import random # Added for _simulate_websocket_ticks
from dataclasses import dataclass, field
# Nautilus imports - following test pattern
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
from nautilus_trader.live.node import TradingNode
from nautilus_trader.common.actor import Actor
from nautilus_trader.common.config import ActorConfig
from nautilus_trader.model.identifiers import TraderId
from nautilus_trader.core.data import Data
from nautilus_trader.common.component import Logger, init_logging
# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
from nautilus_trader.model.data import TradeTick
from nautilus_trader.model.identifiers import InstrumentId, TradeId
from nautilus_trader.model.objects import Price, Quantity
from nautilus_trader.model.enums import AggressorSide
# Initialize logging once at import time.
# NOTE(review): the return value is kept in a module-level name, presumably so
# the logging guard stays alive for the life of the process - confirm against
# the Nautilus `init_logging` documentation.
_log_guard = init_logging()
_logger = Logger("SILOQY")
# Message-bus topics (plain strings, published with raw tuples for speed).
# RAW_TOPIC carries the per-tick tuple published by
# SILOQYMainActor.on_websocket_tick:
#   (symbol, price, quantity, timestamp, candle_open_price, candle_open_time)
RAW_TOPIC = "SILOQY.RAW.TICKS"
STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
# Discovery results, each published as a (payload, timestamp_ns) tuple by
# SILOQYSymbolDiscoveryActor._publish_discovery_results.
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
# Per-symbol tick sizes, published alongside the discovery results.
TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES"
# Enhanced BB indicator topics for tuple-based publishing
REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS"
BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS"
TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS"
# Rate limiting constant.
# NOTE(review): not referenced in the visible part of this file; the discovery
# actor uses its own local 2.5 s base interval - confirm before changing.
MIN_INTERVAL = 2.5  # seconds between API batches
# Market Regime Enum
class MarketRegime(Enum):
    """Closed set of market regime labels.

    Each member's value is the member's own name as an upper-case string.
    """

    BULL = "BULL"
    BEAR = "BEAR"
    TRANSITION = "TRANSITION"
    SIDEWAYS = "SIDEWAYS"
# ============================================================================
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
# ============================================================================
# TODO: Manually added. Kept for *future* re-integration of a class-based tick instead of tuples (where appropriate)
class SILOQYQuoteTick(Data):
    """Minimal tick record derived from the Nautilus ``Data`` base class.

    All inputs are coerced to plain Python types on construction:
    the instrument id to ``str``, price and size to ``float``, and both
    timestamps to ``int``.
    """

    def __init__(self, instrument_id, price, size, ts_event, ts_init=None):
        """Store the coerced tick fields.

        Args:
            instrument_id: Anything convertible with ``str()``.
            price: Anything convertible with ``float()``.
            size: Anything convertible with ``float()``.
            ts_event: Event timestamp, converted with ``int()``.
            ts_init: Initialisation timestamp; when ``None`` the current
                ``time.time_ns()`` value is used.
        """
        super().__init__()
        self.instrument_id = str(instrument_id)
        self.price = float(price)
        self.size = float(size)
        self.ts_event = int(ts_event)
        # Fall back to "now" only when the caller supplied no init timestamp.
        if ts_init is None:
            ts_init = time.time_ns()
        self.ts_init = int(ts_init)
@dataclass
class TickData:
    """Universal tick data structure from standalone.

    One parsed trade as produced by an exchange adapter. ``raw_data`` holds
    the original decoded payload; it defaults to a fresh empty dict per
    instance, and an explicit ``None`` is normalised to ``{}`` as well.
    """

    symbol: str
    price: float
    quantity: float
    # Callers in this file multiply this by 1_000_000 to obtain nanoseconds,
    # i.e. they treat it as epoch milliseconds.
    timestamp: int
    is_buyer_maker: bool
    trade_id: int
    exchange: str
    # Optional: the previous ``Dict = None`` annotation contradicted its default.
    raw_data: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Normalise a missing payload to a per-instance dict (never shared).
        if self.raw_data is None:
            self.raw_data = {}
@dataclass
class ExchangeConfig:
    """Exchange-specific configuration from standalone.

    Static limits and connection parameters for one exchange's WebSocket API.
    """

    # Exchange identifier (BinanceAdapter sets "binance").
    name: str
    # Used by BinanceAdapter.calculate_optimal_distribution (multiplied by 5)
    # as one of the two caps on symbols per connection.
    websocket_limit_per_second: int
    # Upper bound on streams multiplexed over one connection; the adapter
    # applies a 0.7 safety margin to it.
    max_streams_per_connection: int
    # NOTE(review): the two connection-window fields are not read in the
    # visible part of this file.
    max_connections_per_window: int
    connection_window_seconds: int
    # Base URL to which "/ws/..." or "/stream?streams=..." is appended.
    base_websocket_url: str
    requires_auth: bool = False
    # Passed straight through to websockets.connect(ping_interval=...,
    # ping_timeout=...); presumably seconds - confirm against that library.
    ping_interval: float = 20.0
    ping_timeout: float = 10.0
class BinanceAdapter:
    """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter.

    Builds trade-stream URLs, parses raw trade messages into ``TickData``
    and splits a symbol list into per-connection groups.
    """

    def __init__(self):
        self.config = ExchangeConfig(
            name="binance",
            websocket_limit_per_second=10,
            max_streams_per_connection=1024,
            max_connections_per_window=300,
            connection_window_seconds=300,  # 5 minutes
            base_websocket_url="wss://stream.binance.com:9443",
            requires_auth=False,
        )
        self.logger = Logger("BINANCE_ADAPTER")

    def get_websocket_url(self, symbols: List[str]) -> str:
        """Build Binance WebSocket URL with combined streams.

        A single symbol uses the ``/ws/<symbol>@trade`` endpoint; any other
        count uses ``/stream?streams=`` with ``/``-joined stream names.
        Symbols are lower-cased.
        """
        if len(symbols) == 1:
            symbol = symbols[0].lower()
            return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
        # Combined stream approach
        stream_string = "/".join(f"{symbol.lower()}@trade" for symbol in symbols)
        return f"{self.config.base_websocket_url}/stream?streams={stream_string}"

    def parse_message(self, message: str) -> Optional["TickData"]:
        """EXTRACTED FROM STANDALONE: Parse Binance trade message.

        Accepts both the combined-stream envelope (``{"stream", "data"}``)
        and the single-stream payload.

        Returns:
            A ``TickData`` for trade events; ``None`` for anything that is
            not a trade event or cannot be parsed (parse failures are
            logged, non-trade messages are not).
        """
        try:
            data = json.loads(message)
            # Anything that is not a JSON object cannot be a trade event.
            if not isinstance(data, dict):
                return None
            # Handle combined stream format
            if 'stream' in data and 'data' in data:
                stream_name = data['stream']
                trade_data = data['data']
            else:
                # Single stream format
                trade_data = data
                stream_name = trade_data.get('e', '')
            if not isinstance(trade_data, dict):
                return None
            # Only process trade events
            named_trade_stream = isinstance(stream_name, str) and stream_name.endswith('@trade')
            if not (named_trade_stream or trade_data.get('e') == 'trade'):
                return None
            # Convert every field before building the result so that a bad
            # field is reported through the handler below.
            symbol = trade_data['s']
            price = float(trade_data['p'])
            quantity = float(trade_data['q'])
            timestamp = int(trade_data['T'])
            is_buyer_maker = trade_data['m']
            trade_id = int(trade_data['t'])
        except (json.JSONDecodeError, KeyError, ValueError, TypeError) as e:
            # TypeError covers null/odd-typed fields and non-text messages.
            self.logger.error(f"Failed to parse Binance message: {e}")
            return None
        return TickData(
            symbol=symbol,
            price=price,
            quantity=quantity,
            timestamp=timestamp,
            is_buyer_maker=is_buyer_maker,
            trade_id=trade_id,
            exchange="binance",
            raw_data=trade_data,
        )

    def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
        """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections.

        Splits ``symbols`` into consecutive groups whose size is the smaller
        of 70% of ``max_streams_per_connection`` and five times
        ``websocket_limit_per_second``, but never less than one symbol.

        Returns:
            The list of groups, in input order; empty for empty input.
        """
        # Conservative: leave room for message bursts
        safety_margin = 0.7
        symbols_per_connection = int(
            min(
                self.config.max_streams_per_connection * safety_margin,
                self.config.websocket_limit_per_second * 5,  # 5-second buffer
            )
        )
        # A chunk size of 0 would make range() raise; always progress.
        symbols_per_connection = max(1, symbols_per_connection)
        # Group symbols into chunks
        return [
            symbols[start:start + symbols_per_connection]
            for start in range(0, len(symbols), symbols_per_connection)
        ]
class SiloqyWebSocketConnection:
    """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus.

    Owns one WebSocket connection for a group of symbols, parses each
    message with the supplied adapter and forwards parsed trades to the main
    actor's ``on_websocket_tick``. ``start()`` reconnects with exponential
    backoff until ``stop()`` is called.
    """

    def __init__(self, connection_id: str, symbols: List[str],
                 binance_adapter: "BinanceAdapter", main_actor_ref):
        self.connection_id = connection_id
        self.symbols = symbols
        self.adapter = binance_adapter
        self.main_actor = main_actor_ref  # Reference to SILOQYMainActor for msgbus access
        self.websocket = None
        self.is_running = False
        self.retry_count = 0
        self.messages_received = 0
        self.last_message_time = 0.0
        self.connected_at = None
        self.logger = Logger(f"WS_{connection_id}")
        self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")

    async def start(self) -> None:
        """Start the WebSocket connection and keep it alive until stopped.

        Every end of a session (error or a return without one) is followed
        by a backoff delay before reconnecting, so a server that keeps
        closing the socket cannot cause a tight reconnect loop.
        """
        self.is_running = True
        while self.is_running:
            try:
                await self._connect_and_run()
            except Exception as e:
                self.logger.error(f"Connection {self.connection_id} error: {e}")
            if not self.is_running:
                break
            await self._handle_reconnection()

    async def _connect_and_run(self) -> None:
        """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop.

        Returns when the message iterator ends or ``is_running`` becomes
        false; connection errors propagate to ``start()``.
        """
        url = self.adapter.get_websocket_url(self.symbols)
        self.logger.info(f"Connecting to Binance: {url[:100]}...")
        async with websockets.connect(
            url,
            ping_interval=self.adapter.config.ping_interval,
            ping_timeout=self.adapter.config.ping_timeout,
            close_timeout=10,
        ) as websocket:
            self.websocket = websocket
            self.connected_at = time.time()
            # A successful connect resets the backoff sequence.
            self.retry_count = 0
            self.logger.info(f"Connection {self.connection_id} established")
            # Message processing loop
            async for message in websocket:
                if not self.is_running:
                    break
                try:
                    await self._process_message(message)
                except Exception as e:
                    # One bad message must not tear down the connection.
                    self.logger.error(f"Message processing error: {e}")

    async def _process_message(self, message: str) -> None:
        """Parse one incoming message and forward a trade to the main actor.

        Updates the receive counters for every message. Messages the adapter
        cannot turn into a tick are ignored. The main actor's
        ``on_websocket_tick`` performs the candle update and publishing, so
        no intermediate tick object is built here.
        """
        self.messages_received += 1
        self.last_message_time = time.time()
        # Parse message using Binance adapter
        tick_data = self.adapter.parse_message(message)
        if not tick_data:
            return
        try:
            # Process through main actor's tick handling (preserves all candle logic)
            self.main_actor.on_websocket_tick(
                tick_data.symbol,
                tick_data.price,
                tick_data.quantity,
                tick_data.timestamp,
            )
        except Exception as e:
            self.logger.error(f"Failed to process tick: {e}")

    async def _handle_reconnection(self) -> None:
        """Handle reconnection with exponential backoff (2, 4, ... capped at 60 s)."""
        self.retry_count += 1
        # Cap the exponent as well as the result so the power stays small
        # no matter how long the outage lasts.
        delay = min(2 ** min(self.retry_count, 6), 60)  # Max 60 seconds
        self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
        await asyncio.sleep(delay)

    async def stop(self) -> None:
        """Stop the connection: end the reconnect loop and close the socket."""
        self.logger.info(f"Stopping connection {self.connection_id}")
        self.is_running = False
        if self.websocket:
            await self.websocket.close()
# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
# --------------------------------------------------------------------
class SiloqyTradeTick(TradeTick):
    """``TradeTick`` subclass carrying extra SILOQY candle/exchange fields.

    Accepts plain Python values and converts them to Nautilus types:
    a string instrument id without a ``.`` gets the ``.BINANCE`` venue
    suffix, and non-``Price`` / non-``Quantity`` values are wrapped with
    precision 8. The aggressor side is always ``NO_AGGRESSOR`` and the trade
    id is generated from the current ``time.time_ns()`` value.
    """

    def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None,
                 open_price: float = None, candle_open_time: int = None, tick_side: str = None,
                 exchange: str = None, liquidity_flag = None):
        # Resolve the instrument id; only strings need parsing.
        if not isinstance(instrument_id, str):
            instr_id = instrument_id
        else:
            # Append the BINANCE venue when the symbol carries none.
            if '.' not in instrument_id:
                instrument_id = f"{instrument_id}.BINANCE"
            instr_id = InstrumentId.from_str(instrument_id)

        # Wrap raw numbers with precision 8; pass Nautilus objects through.
        if isinstance(price, Price):
            price_obj = price
        else:
            price_obj = Price(price, precision=8)
        if isinstance(size, Quantity):
            size_obj = size
        else:
            size_obj = Quantity(size, precision=8)

        # No aggressor information is carried; the trade id is synthesized.
        generated_trade_id = TradeId(f"T{int(time.time_ns())}")

        super().__init__(
            instrument_id=instr_id,
            price=price_obj,
            size=size_obj,
            aggressor_side=AggressorSide.NO_AGGRESSOR,
            trade_id=generated_trade_id,
            ts_event=int(ts_event),
            ts_init=int(ts_init if ts_init is not None else time.time_ns()),
        )

        # Additional SILOQY fields; missing candle data falls back to the
        # tick's own price and the current wall-clock time in milliseconds.
        if open_price is None:
            open_price = price
        if candle_open_time is None:
            candle_open_time = int(time.time() * 1000)
        self.open_price = open_price
        self.candle_open_time = candle_open_time
        self.tick_side = tick_side
        self.exchange = exchange
        self.liquidity_flag = liquidity_flag
class SILOQYSymbolDiscoveryConfig(ActorConfig):
    """Configuration for ``SILOQYSymbolDiscoveryActor``."""

    # Pre-selected symbols; when empty the actor discovers them itself.
    symbols: List[str] = []
    # Candle length in milliseconds (default: 15 minutes).
    candle_interval_ms: int = 15 * 60 * 1000
    # Dev/test mode: caps the symbol count and slows REST batching.
    throttle_mode: bool = False
    # Seconds between REST batches while throttle_mode is on.
    throttle_rate_limit_seconds: float = 10.0
    # Maximum number of discovered symbols kept while throttle_mode is on.
    max_symbols_throttled: int = 100
class SILOQYMainActorConfig(ActorConfig):
    """Configuration for ``SILOQYMainActor``."""

    # Candle length in milliseconds (default: 15 minutes).
    candle_interval_ms: int = 15 * 60 * 1000
    # Reduces simulated tick rate / symbol count and limits real data to
    # the first 10 symbols.
    throttle_mode: bool = False
    # True: connect to Binance WebSocket streams; False: simulate ticks.
    enable_real_data: bool = False  # NEW: Enable real WebSocket data
class DOLPHINRegimeActorConfig(ActorConfig):
    """Configuration for ``DOLPHINRegimeActor``."""

    # Length of the actor's pre-allocated per-symbol numpy arrays.
    max_symbols: int = 5000
    # NOTE(review): stored by the actor; presumably the number of ticks
    # between regime analyses - its use is outside the visible code.
    ticks_per_analysis: int = 10
class SILOQYNormalizerConfig(ActorConfig):
    """Configuration for the SILOQY normalizer actor; defines no options of its own."""

    pass
class SILOQYSymbolDiscoveryActor(Actor):
    """
    Symbol discovery with all original SILOQY algorithms preserved
    Using Nautilus built-in ActorExecutor for task management

    On start the actor (1) discovers trading USDT symbols and their tick
    sizes from the Binance REST API unless symbols were configured,
    (2) fetches 24h stats and reconstructs the current candle's open price
    per symbol, and (3) publishes symbols, candle opens and tick sizes on
    the message bus as ``(payload, timestamp_ns)`` tuples.
    """

    def __init__(self, config: SILOQYSymbolDiscoveryConfig):
        super().__init__(config)
        # Preserve original SILOQY configuration
        self.symbols = list(config.symbols) if config.symbols else []
        self.candle_interval_ms = config.candle_interval_ms
        self.active_candles = {}
        self.tick_sizes = {}
        # Process management configuration
        self.throttle_mode = config.throttle_mode
        self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds
        self.max_symbols_throttled = config.max_symbols_throttled
        # Strong reference to the fallback asyncio task; the event loop only
        # keeps weak references, so an unreferenced task may be collected.
        self._discovery_task = None
        if self.throttle_mode:
            self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration")
            self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches")
            self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max")
        self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Start the actor - using Nautilus ActorExecutor.

        Schedules ``on_start_async`` on the registered executor when one is
        visible, otherwise as an asyncio task on the running loop.
        """
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting")
        # Use Nautilus ActorExecutor for task management
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
            self._executor.submit(self.on_start_async())
            self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            self._discovery_task = asyncio.create_task(self.on_start_async())

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    async def on_start_async(self) -> None:
        """Run discovery, candle reconstruction and publishing.

        Any failure is logged and swallowed so the rest of the system keeps
        running.
        """
        self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization")
        try:
            # PRESERVED: Original symbol discovery algorithm
            if not self.symbols:
                await self._discover_all_symbols()
            # PRESERVED: Original stats fetching and candle reconstruction
            _stats, candle_opens = await self._fetch_stats_and_reconstruct_candles()
            # Set active candles from reconstruction results
            self.active_candles = candle_opens
            # Publish results using msgbus
            self._publish_discovery_results()
            self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
            # Don't re-raise, let system continue

    @staticmethod
    def _extract_tick_size(filters):
        """Return the tick size from a symbol's filter list, or ``None``.

        A ``PRICE_FILTER`` entry takes precedence; a ``TICK_SIZE`` entry is
        only consulted when no ``PRICE_FILTER`` exists.
        """
        for wanted_type in ('PRICE_FILTER', 'TICK_SIZE'):
            for filter_info in filters:
                if filter_info['filterType'] == wanted_type:
                    return float(filter_info['tickSize'])
        return None

    async def _discover_all_symbols(self):
        """PRESERVED: Original Binance symbol discovery algorithm.

        Populates ``self.symbols`` with every symbol whose status is
        ``TRADING`` and whose name ends in ``USDT`` (truncated to
        ``max_symbols_throttled`` in throttle mode) and ``self.tick_sizes``
        with one entry per such symbol.

        Raises:
            RuntimeError: when the exchange-info request does not return 200.
        """
        self.log.info("Starting dynamic symbol discovery from Binance...")
        url = "https://api.binance.com/api/v3/exchangeInfo"
        async with httpx.AsyncClient() as client:
            self.log.info("Fetching exchange info from Binance API...")
            response = await client.get(url, timeout=10)
            if response.status_code != 200:
                self.log.error(f"Failed to fetch exchange info: {response.status_code}")
                raise RuntimeError(f"Failed to fetch exchange info: {response.status_code}")
            self.log.info("Successfully received exchange info")
            data = response.json()
            # Combined symbol discovery and tick size extraction
            self.log.info("Processing symbols and extracting tick sizes...")
            full_symbols = []
            for symbol_info in data['symbols']:
                if not (symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT')):
                    continue
                symbol = symbol_info['symbol']
                full_symbols.append(symbol)
                # Extract tick size while processing
                tick_size = self._extract_tick_size(symbol_info['filters'])
                # Fallback to default if still not found
                if tick_size is None:
                    tick_size = 1e-8  # Default fallback
                    self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
                self.tick_sizes[symbol] = tick_size
            self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes")
            # Apply throttle mode symbol limiting
            if self.throttle_mode:
                self.symbols = full_symbols[:self.max_symbols_throttled]
                self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
            else:
                self.symbols = full_symbols
            self.log.info(f"Discovered {len(self.symbols)} trading symbols")
            self.log.info(f"First 10 symbols: {self.symbols[:10]}")

    async def _fetch_stats_and_reconstruct_candles(self):
        """PRESERVED: All original rate limiting with Nautilus async patterns.

        Processes ``self.symbols`` in batches of 50. For each batch it
        fetches 24h ticker stats, then determines candle open prices:
        within the first second of a candle it uses the current ticker
        price, otherwise it requests the 1s kline at the candle's open time
        for every symbol individually. Batches are spaced by the rate-limit
        interval; a failing batch is logged and skipped.

        Returns:
            ``(stats, candle_opens)`` where ``stats`` maps symbol to
            ``{'count', 'quoteVolume'}`` and ``candle_opens`` maps symbol to
            the reconstructed open price.
        """
        url = "https://api.binance.com/api/v3/ticker/24hr"
        klines_url = "https://api.binance.com/api/v3/klines"
        ticker_url = "https://api.binance.com/api/v3/ticker/price"
        batch_size = 50
        # PRESERVED: Original rate limit handling with throttle mode support
        base_rate_limit = 2.5
        rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit
        if self.throttle_mode:
            self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)")
        stats = {}
        candle_opens = {}
        symbol_count = len(self.symbols)
        total_batches = (symbol_count + batch_size - 1) // batch_size
        async with httpx.AsyncClient() as client:
            for i in range(0, symbol_count, batch_size):
                batch_num = (i // batch_size) + 1
                start_time = time.time()
                symbols_batch = self.symbols[i:i + batch_size]
                if self.throttle_mode:
                    self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay")
                # PRESERVED: Original boundary detection logic
                current_time = int(time.time() * 1000)
                time_into_candle = current_time % self.candle_interval_ms
                candle_open_time = current_time - time_into_candle
                at_boundary = (time_into_candle < 1000)
                if i == 0:  # Log initial state
                    self.log.info("DOLPHIN: Current candle analysis:")
                    self.log.info(f" - Current time: {current_time}")
                    self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)")
                    self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)")
                    self.log.info(f" - At boundary: {at_boundary}")
                    self.log.info(f" - Candle open time: {candle_open_time}")
                symbols_json_string = json.dumps(symbols_batch, separators=(',', ':'))
                params = {"symbols": symbols_json_string}
                try:
                    # PRESERVED: Original stats fetching
                    response = await client.get(url, params=params, timeout=10)
                    if response.status_code == 200:
                        data = response.json()
                        for item in data:
                            stats[item['symbol']] = {
                                'count': int(item['count']),
                                'quoteVolume': float(item['quoteVolume']),
                            }
                        self.log.info(f"Fetched stats for batch {batch_num}: {len(data)} symbols")
                    else:
                        self.log.error(f"Error {response.status_code}: {response.text}")
                    # PRESERVED: Original DOLPHIN candle reconstruction strategy
                    if at_boundary:
                        # AT BOUNDARY: Fetch current prices to use as opens
                        self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {batch_num})")
                        ticker_params = {"symbols": symbols_json_string}
                        ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5)
                        if ticker_response.status_code == 200:
                            ticker_data = ticker_response.json()
                            for item in ticker_data:
                                candle_opens[item['symbol']] = float(item['price'])
                            self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols")
                        else:
                            self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})")
                    else:
                        # NOT AT BOUNDARY: Fetch historical 1s kline data
                        self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {batch_num})")
                        for symbol in symbols_batch:
                            try:
                                # PRESERVED: Original kline fetching logic
                                kline_params = {
                                    'symbol': symbol,
                                    'interval': '1s',
                                    'startTime': candle_open_time,
                                    'endTime': candle_open_time + 1000,
                                    'limit': 1
                                }
                                kline_response = await client.get(klines_url, params=kline_params, timeout=5)
                                if kline_response.status_code == 200:
                                    kline_data = kline_response.json()
                                    if kline_data:
                                        # Index 1 of a kline row is read as the open price.
                                        open_price = float(kline_data[0][1])
                                        candle_opens[symbol] = open_price
                                        # Only detail-log when the whole run is tiny.
                                        if (i + len(symbols_batch)) <= 10:
                                            self.log.info(f" - {symbol}: reconstructed open = {open_price}")
                                    else:
                                        self.log.warning(f" - {symbol}: no 1s kline data found")
                                else:
                                    self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})")
                            except Exception as e:
                                self.log.error(f" - {symbol}: kline fetch error: {e}")
                            # Small pause between per-symbol kline requests.
                            await asyncio.sleep(0.1)
                except Exception as e:
                    self.log.error(f"Request failed: {str(e)}")
                # PRESERVED: Original rate limiting with Nautilus async
                elapsed = time.time() - start_time
                if i + batch_size < symbol_count:
                    sleep_time = max(0, rate_limit_interval - elapsed)
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)
        self.log.info("DOLPHIN: Candle reconstruction complete:")
        self.log.info(f" - Symbols processed: {len(self.symbols)}")
        self.log.info(f" - Opens reconstructed: {len(candle_opens)}")
        self.log.info("Discovery phase ready for publishing...")
        return stats, candle_opens

    def _publish_discovery_results(self):
        """Publish results using msgbus - Nautilus message bus integration.

        Publishes symbols, initial candle opens and tick sizes, each as a
        ``(payload, time.time_ns())`` tuple. Failures are logged, not raised.
        """
        try:
            self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...")
            if hasattr(self, 'msgbus') and self.msgbus:
                # Publish symbols and candles as tuples
                self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
                self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
                self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns())))
                self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
                self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes")
                self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
            else:
                self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}")
class SILOQYMainActor(Actor):
    """Tick source actor: waits for discovery results, then streams ticks.

    After symbols and initial candles arrive on the message bus, the actor
    either opens real Binance WebSocket connections or runs a tick
    simulation. Every tick updates the symbol's in-memory candle and is
    published on ``RAW_TOPIC`` as the tuple
    ``(symbol, price, quantity, timestamp, candle_open_price, candle_open_time)``.
    """

    def __init__(self, config: SILOQYMainActorConfig):
        super().__init__(config)
        # Preserve original configuration
        self.candle_interval_ms = config.candle_interval_ms
        self.throttle_mode = config.throttle_mode
        self.enable_real_data = config.enable_real_data  # NEW: Real data capability
        self.connections = {}
        self.connection_tasks = {}
        # Will be populated from discovery actor
        self.symbols = []
        self.active_candles = {}
        # WebSocket infrastructure - NEW
        self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
        self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
        self.websocket_tasks: Dict[str, asyncio.Task] = {}
        # WebSocket tasks - managed by Nautilus ActorExecutor
        self.ws_tasks = []
        # Strong references to fire-and-forget asyncio tasks; the event loop
        # only keeps weak references to tasks.
        self._startup_task = None
        self._stop_tasks = []
        # Synchronization
        self.discovery_complete = asyncio.Event()
        if self.throttle_mode:
            self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
        if self.enable_real_data:
            self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
        else:
            self.log.info("SIMULATION MODE: Will generate simulated ticks")
        self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Subscribe to discovery events - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events")
        # Subscribe to discovery results
        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
            self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial)
            self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics")
        # Use Nautilus ActorExecutor for task management
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
            self._executor.submit(self.on_start_async())
            self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            self._startup_task = asyncio.create_task(self.on_start_async())

    def on_stop(self) -> None:
        """Stop the actor.

        Schedules a graceful ``stop()`` for every real WebSocket connection
        and cancels the asyncio tasks this actor created itself (the pending
        startup wait and the simulation loop), which would otherwise keep
        running after the actor has stopped.
        """
        self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
        # Stop WebSocket connections if running
        if self.enable_real_data:
            for conn in self.websocket_connections.values():
                self._stop_tasks.append(asyncio.create_task(conn.stop()))
        # The simulation loop runs `while True`; cancellation is its only exit.
        own_tasks = list(self.ws_tasks)
        if self._startup_task is not None:
            own_tasks.append(self._startup_task)
        for task in own_tasks:
            if not task.done():
                task.cancel()
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    async def on_start_async(self) -> None:
        """Wait (up to 120 s) for discovery, then start the tick source."""
        try:
            # Wait for symbol discovery to complete
            self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
            await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
            # ENHANCED: Start real or simulated connections based on configuration
            if self.enable_real_data:
                await self._start_real_websocket_connections()
            else:
                await self._start_simulated_connections()
            self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
        except asyncio.TimeoutError:
            self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}")

    def handle_symbols_discovered(self, data):
        """Handle symbols from discovery actor.

        ``data`` is the ``(symbols, timestamp)`` tuple. Discovery is marked
        complete once both symbols and candles are present.
        """
        try:
            symbols, _timestamp = data
            self.symbols = symbols
            self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor")
            if self.symbols and self.active_candles:
                self.discovery_complete.set()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")

    def handle_candles_initial(self, data):
        """Handle initial candles from discovery actor and properly initialize the candle dict.

        ``data`` is the ``(candle_opens, timestamp)`` tuple, where
        ``candle_opens`` maps symbol to open price. Each entry is expanded
        into a full candle dict with prices rounded to 8 decimals and the
        open time aligned to the current candle interval.
        """
        try:
            candles_received, _timestamp = data
            self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor")
            # Same interval boundary for every symbol, computed once.
            now_ms = int(time.time() * 1000)
            open_time = (now_ms // self.candle_interval_ms) * self.candle_interval_ms
            # FIX: Re-initialize active_candles with the full dictionary structure
            # STEP 5 FIX: Added rounding to 8 decimals for all price values
            self.active_candles = {}
            for symbol, open_price in candles_received.items():
                rounded_open = round(open_price, 8)
                self.active_candles[symbol] = {
                    'open_price': rounded_open,
                    'open_time': open_time,
                    'high': rounded_open,
                    'low': rounded_open,
                    'close': rounded_open,
                    'volume': 0.0,
                }
            if self.symbols and self.active_candles:
                self.discovery_complete.set()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")

    async def _start_real_websocket_connections(self):
        """NEW: Start real Binance WebSocket connections with throttle control.

        Splits the symbols into groups via the adapter and starts one
        connection per group, pausing one second between starts.
        """
        self.log.info("Starting REAL Binance WebSocket connections for live data...")
        # Apply throttle mode limits to WebSocket connections
        symbols_to_use = self.symbols
        if self.throttle_mode:
            symbols_to_use = self.symbols[:10]  # Limit to 10 symbols in throttle mode
            self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
        # Calculate optimal symbol distribution
        symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
        self.log.info("Real WebSocket distribution:")
        self.log.info(f" - Total symbols: {len(symbols_to_use)}")
        self.log.info(f" - Connections needed: {len(symbol_groups)}")
        # Start WebSocket connections
        last_index = len(symbol_groups) - 1
        for i, symbol_group in enumerate(symbol_groups):
            connection_id = f"binance_real_{i:03d}"
            connection = SiloqyWebSocketConnection(
                connection_id=connection_id,
                symbols=symbol_group,
                binance_adapter=self.binance_adapter,
                main_actor_ref=self
            )
            self.websocket_connections[connection_id] = connection
            # Use Nautilus ActorExecutor for WebSocket task management
            if hasattr(self, '_executor') and self._executor:
                self._executor.submit(connection.start())
                self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
            else:
                self.websocket_tasks[connection_id] = asyncio.create_task(connection.start())
                self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
            # Rate limit connection establishment
            if i < last_index:
                await asyncio.sleep(1.0)  # 1 second between connections
        self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")

    async def _start_simulated_connections(self):
        """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
        self.log.info("Starting WebSocket simulation for tick generation...")
        self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
        # Use Nautilus ActorExecutor for WebSocket task management
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation")
            self._executor.submit(self._simulate_websocket_ticks())
            self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            self.ws_tasks.append(asyncio.create_task(self._simulate_websocket_ticks()))

    async def _simulate_websocket_ticks(self):
        """PRESERVED: Original simulation logic.

        Repeatedly generates one random tick for each of the first 10
        symbols (5 in throttle mode) and sleeps 0.01 s (0.1 s in throttle
        mode) between rounds, until cancelled.
        """
        # Adjust tick rate for throttle mode
        tick_delay = 0.1 if self.throttle_mode else 0.01
        symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
        # Log every 500 ticks in throttle mode, 1000 in normal mode
        log_interval = 500 if self.throttle_mode else 1000
        # Threshold-based so a progress line is emitted even when the
        # per-round tick count does not divide the interval.
        next_log_at = log_interval
        tick_count = 0
        try:
            if self.throttle_mode:
                self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second")
            else:
                self.log.info("WebSocket tick simulation started - generating 100 ticks/second")
            while True:
                for symbol in self.symbols[:symbols_to_simulate]:
                    # STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals
                    price = round(100.0 + random.gauss(0, 0.5), 8)
                    quantity = round(random.uniform(0.00000001, 100.0), 8)
                    timestamp = int(time.time() * 1000)
                    self.on_websocket_tick(symbol, price, quantity, timestamp)
                    tick_count += 1
                if tick_count >= next_log_at:
                    mode_indicator = "THROTTLE" if self.throttle_mode else ""
                    data_source = "REAL" if self.enable_real_data else "SIM"
                    self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
                    next_log_at += log_interval
                await asyncio.sleep(tick_delay)
        except asyncio.CancelledError:
            self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")

    def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
        """
        PRESERVED: Original tick processing and candle update logic

        Updates (or creates) the symbol's active candle, rolling it over
        when ``timestamp`` falls into a later interval, then publishes the
        raw tick tuple on ``RAW_TOPIC``. ``timestamp`` is divided by
        ``candle_interval_ms``, so it must be in milliseconds. Publish
        failures are logged, not raised.
        """
        rounded_price = round(price, 8)
        current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
        # PRESERVED: Original candle update logic
        if symbol not in self.active_candles:
            # STEP 4 FIX: Round all prices to 8 decimals
            self.active_candles[symbol] = {
                'open_price': rounded_price,
                'open_time': current_candle_time,
                'high': rounded_price,
                'low': rounded_price,
                'close': rounded_price,
                'volume': 0.0
            }
        candle = self.active_candles[symbol]
        # PRESERVED: Original candle rolling logic
        if current_candle_time > candle['open_time']:
            # New interval: restart the candle from this tick.
            candle['open_price'] = rounded_price
            candle['open_time'] = current_candle_time
            candle['high'] = rounded_price
            candle['low'] = rounded_price
            candle['close'] = rounded_price
            candle['volume'] = quantity
        else:
            candle['close'] = rounded_price
            candle['high'] = round(max(candle['high'], price), 8)
            candle['low'] = round(min(candle['low'], price), 8)
            candle['volume'] += quantity
        # PRESERVED: Original high-performance tuple publishing
        try:
            tick_tuple = (
                str(symbol),
                float(price),
                float(quantity),
                int(timestamp),
                float(candle['open_price']),
                int(candle['open_time'])
            )
            self.msgbus.publish(RAW_TOPIC, tick_tuple)
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {type(e).__name__} - {e}")
class DOLPHINRegimeActor(Actor):
    """Classify the market regime from per-symbol candle data.

    The actor subscribes to raw tick tuples, symbol-discovery messages and
    tick-size messages. It keeps one open/high/low/close/volume slot per symbol
    in fixed-size NumPy arrays and, every ``ticks_per_analysis`` ticks, publishes
    a regime tuple plus Bollinger-band and temporal indicator tuples on the
    message bus.
    """

    def __init__(self, config: DOLPHINRegimeActorConfig):
        """Allocate the per-symbol arrays and reset all detection state.

        Args:
            config: Supplies ``max_symbols`` (array capacity) and
                ``ticks_per_analysis`` (ticks between regime calculations).
        """
        super().__init__(config)

        self.max_symbols = config.max_symbols
        self.ticks_per_analysis = config.ticks_per_analysis

        # Fixed-size per-symbol arrays, indexed through symbol_to_idx.
        self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
        self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
        self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64)  # Default fallback

        # Symbol <-> slot mappings and counters.
        self.symbol_to_idx = {}
        self.idx_to_symbol = {}
        self.active_symbols = 0
        self.tick_count = 0

        # Regime thresholds.
        self.bull_threshold = 0.60  # 60% bullish
        self.bear_threshold = 0.55  # 55% bearish
        self.previous_bull_ratio = None
        self.regime_history = deque(maxlen=100)

        # Metrics
        self.last_metric_log = time.time_ns()
        self.processed_ticks = 0
        self.regime_calculations = 0

        # Histories feeding the Bollinger-band and temporal indicators.
        self.signal_history = deque(maxlen=100)      # For BB calculations
        self.bb_period = 20                          # BB calculation period
        self.bb_std_dev = 2.0                        # BB standard deviation multiplier
        self.velocity_history = deque(maxlen=10)     # For regime velocity tracking
        self.confidence_history = deque(maxlen=20)   # For confidence trend analysis

        self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
                      f"ticks_per_analysis: {self.ticks_per_analysis}")

    def on_start(self) -> None:
        """Subscribe to raw ticks, symbol discovery and tick-size topics."""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events")
        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
            self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes")

    def on_stop(self) -> None:
        """Log the stop; this actor performs no cleanup of its own."""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")

    def handle_symbols_discovered(self, data):
        """Assign an array slot to every newly discovered symbol.

        Args:
            data: ``(symbols, timestamp)`` tuple; only ``symbols`` is used.

        Symbols that already own a slot are skipped, so a repeated discovery
        message (or a duplicate inside one message) cannot orphan a slot or
        inflate ``active_symbols``. Errors are logged, not raised.
        """
        try:
            symbols, timestamp = data
            self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings")
            for symbol in symbols:
                if symbol in self.symbol_to_idx:
                    continue
                if self.active_symbols >= self.max_symbols:
                    self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization")
                    break
                idx = self.active_symbols
                self.symbol_to_idx[symbol] = idx
                self.idx_to_symbol[idx] = symbol
                self.active_symbols += 1
            self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings")
        except Exception as e:
            self.log.error(f"Error pre-initializing symbols: {e}")

    def handle_tick_sizes(self, data):
        """Store tick sizes for symbols that already have a slot.

        Args:
            data: ``(tick_sizes, timestamp)`` where ``tick_sizes`` maps symbol
                to tick size. Values outside ``(0, 1.0]`` are rejected and the
                slot keeps its fallback value. Errors are logged, not raised.
        """
        try:
            tick_sizes, timestamp = data
            self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor")
            applied_count = 0
            for symbol, tick_size in tick_sizes.items():
                if symbol in self.symbol_to_idx:
                    idx = self.symbol_to_idx[symbol]
                    if 0 < tick_size <= 1.0:
                        self.tick_sizes[idx] = tick_size
                        applied_count += 1
                    else:
                        self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback")
            self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols")
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}")

    def handle_raw_tick(self, data):
        """Update the symbol's candle slot and trigger regime detection when due.

        Args:
            data: ``(symbol, price, size, ts_event, open_price,
                candle_open_time)`` tuple. Malformed tuples and unknown symbols
                are logged and dropped.
        """
        try:
            symbol, price, size, ts_event, open_price, candle_open_time = data
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
            return

        if symbol not in self.symbol_to_idx:
            self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings")
            return
        idx = self.symbol_to_idx[symbol]

        if candle_open_time > self.last_update[idx]:
            # New candle period: restart the slot from this tick.
            self.open_prices[idx] = open_price
            self.high_prices[idx] = price
            self.low_prices[idx] = price
            self.close_prices[idx] = price
            self.volumes[idx] = size
            self.last_update[idx] = candle_open_time
        else:
            # Same candle: extend it.
            self.close_prices[idx] = price
            self.high_prices[idx] = max(self.high_prices[idx], price)
            self.low_prices[idx] = min(self.low_prices[idx], price)
            self.volumes[idx] += size

        self.tick_count += 1
        self.processed_ticks += 1

        if self.tick_count >= self.ticks_per_analysis:
            # Detection runs inline. The previous code handed an un-awaited
            # coroutine object to ``_executor.submit``, which cannot execute it,
            # so the synchronous path is the only one that ever did the work.
            try:
                self._run_regime_detection()
            except Exception as e:
                self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")
            self.tick_count = 0

        # Periodic metrics logging (at most once per second).
        now = time.time_ns()
        if now - self.last_metric_log > 1_000_000_000:
            self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, "
                          f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}")
            self.last_metric_log = now

    async def _run_regime_detection_async(self):
        """Coroutine wrapper around ``_run_regime_detection`` that logs errors."""
        try:
            self._run_regime_detection()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")

    def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols):
        """Update the indicator histories and derive BB and temporal metrics.

        Args:
            bull_ratio: Fraction of analyzed symbols classified bullish.
            bear_ratio: Fraction of analyzed symbols classified bearish.
            confidence: Confidence score of the current regime.
            analyzed: Number of symbols that contributed to the ratios.
            total_symbols: Number of symbols with an allocated slot.

        Returns:
            ``(bb_metrics, temporal_metrics)`` dictionaries. ``bb_metrics``
            carries ``bb_ready=False`` and ``None`` bands until ``bb_period``
            signals have been collected.
        """
        # Momentum signal, scaled by confidence and by sample coverage.
        base_momentum = (bull_ratio - bear_ratio) * 100  # -100 to +100
        sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0
        signal = base_momentum * confidence * sample_quality
        self.signal_history.append(signal)

        # Velocity is the change between the two most recent signals.
        velocity = 0.0
        if len(self.signal_history) >= 2:
            velocity = self.signal_history[-1] - self.signal_history[-2]
            # NOTE(review): source indentation was lost; this append is assumed
            # to belong to the branch above - confirm.
            self.velocity_history.append(velocity)

        self.confidence_history.append(confidence)

        if len(self.signal_history) >= self.bb_period:
            # deque does not support slicing, so copy to a list first.
            recent_signals = list(self.signal_history)[-self.bb_period:]
            sma = sum(recent_signals) / len(recent_signals)
            variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals)
            std_dev = variance ** 0.5
            upper_band = sma + (self.bb_std_dev * std_dev)
            lower_band = sma - (self.bb_std_dev * std_dev)

            if signal > upper_band:
                bb_position = 'ABOVE_UPPER'
                momentum_signal = 'STRONG_BULL_BREAKOUT'
            elif signal < lower_band:
                bb_position = 'BELOW_LOWER'
                momentum_signal = 'STRONG_BEAR_BREAKOUT'
            else:
                # A signal exactly on the SMA is UPPER_HALF but MILD_BEARISH.
                bb_position = 'UPPER_HALF' if signal >= sma else 'LOWER_HALF'
                momentum_signal = 'MILD_BULLISH' if signal > sma else 'MILD_BEARISH'

            bb_metrics = {
                'signal': signal,
                'sma': sma,
                'upper_band': upper_band,
                'lower_band': lower_band,
                'bb_position': bb_position,
                'momentum_signal': momentum_signal,
                'bb_ready': True
            }
        else:
            bb_metrics = {
                'signal': signal,
                'sma': None,
                'upper_band': None,
                'lower_band': None,
                'bb_position': 'INSUFFICIENT_DATA',
                'momentum_signal': 'INSUFFICIENT_DATA',
                'bb_ready': False
            }

        if len(self.velocity_history) >= 3:
            avg_velocity = sum(self.velocity_history) / len(self.velocity_history)
            velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING'
        else:
            avg_velocity = velocity
            velocity_trend = 'BUILDING_HISTORY'

        if len(self.confidence_history) >= 5:
            # Compare against the four readings preceding the current one.
            previous = list(self.confidence_history)[-5:-1]
            confidence_avg = sum(previous) / len(previous)
            confidence_trend = 'RISING' if confidence > confidence_avg else 'FALLING'
        else:
            confidence_trend = 'BUILDING_HISTORY'

        temporal_metrics = {
            'velocity': velocity,
            'avg_velocity': avg_velocity,
            'velocity_trend': velocity_trend,
            'confidence_trend': confidence_trend
        }
        return bb_metrics, temporal_metrics

    def _run_regime_detection(self):
        """Classify every active symbol, derive the regime and publish results.

        A symbol is bullish or bearish when its close differs from its open by
        more than half its tick size; otherwise it counts as sideways. Symbols
        whose open price is still zero are skipped. Publishes to REGIME_TOPIC,
        REGIME_INDICATORS_TOPIC, BB_METRICS_TOPIC (only once the bands are
        ready) and TEMPORAL_PATTERNS_TOPIC; publish failures are logged.
        """
        self.regime_calculations += 1
        total_symbols = self.active_symbols
        if total_symbols == 0:
            return

        analyzed = 0
        bullish = 0
        bearish = 0
        symbol_pattern = []  # One token per analyzed symbol, for the debug line.

        for idx in range(self.active_symbols):
            open_price = self.open_prices[idx]
            close_price = self.close_prices[idx]
            if open_price == 0:
                continue
            analyzed += 1

            # Prices within half a tick of each other are treated as equal.
            equality_threshold = self.tick_sizes[idx] / 2
            price_diff = abs(close_price - open_price)
            if price_diff <= equality_threshold:
                symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}")
            elif close_price > open_price:
                bullish += 1
                symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}")
            else:
                bearish += 1
                symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}")

        if analyzed == 0:
            return

        bull_ratio = bullish / analyzed
        bear_ratio = bearish / analyzed
        sideways_ratio = 1.0 - bull_ratio - bear_ratio

        if bull_ratio >= self.bull_threshold:
            regime = MarketRegime.BULL
        elif bear_ratio >= self.bear_threshold:
            regime = MarketRegime.BEAR
        else:
            regime = MarketRegime.SIDEWAYS

        confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
        self.previous_bull_ratio = bull_ratio

        try:
            if hasattr(self, 'msgbus') and self.msgbus:
                regime_tuple = (
                    int(time.time() * 1000),
                    regime.value,
                    float(bull_ratio),
                    float(bear_ratio),
                    float(sideways_ratio),
                    int(analyzed),
                    int(total_symbols),
                    float(confidence)
                )
                self.msgbus.publish(REGIME_TOPIC, regime_tuple)
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")

        bb_metrics, temporal_metrics = self._calculate_enhanced_indicators(
            bull_ratio, bear_ratio, confidence, analyzed, total_symbols
        )

        try:
            if hasattr(self, 'msgbus') and self.msgbus:
                timestamp = int(time.time() * 1000)

                indicator_tuple = (
                    timestamp,
                    float(bb_metrics['signal']),
                    bool(bb_metrics['bb_ready']),
                    float(temporal_metrics['velocity']),
                    str(temporal_metrics['velocity_trend']),
                    str(temporal_metrics['confidence_trend'])
                )
                self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_tuple)

                # Bands are None until enough history exists, so publish only when ready.
                if bb_metrics['bb_ready']:
                    bb_tuple = (
                        timestamp,
                        float(bb_metrics['signal']),
                        float(bb_metrics['sma']),
                        float(bb_metrics['upper_band']),
                        float(bb_metrics['lower_band']),
                        str(bb_metrics['bb_position']),
                        str(bb_metrics['momentum_signal'])
                    )
                    self.msgbus.publish(BB_METRICS_TOPIC, bb_tuple)

                temporal_tuple = (
                    timestamp,
                    float(temporal_metrics['velocity']),
                    float(temporal_metrics['avg_velocity']),
                    str(temporal_metrics['velocity_trend']),
                    str(temporal_metrics['confidence_trend']),
                    int(len(self.signal_history))
                )
                self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_tuple)
        except Exception as e:
            self.log.error(f"Failed to publish enhanced indicators: {e}")

        # Log only when the regime differs from the last recorded one.
        if not self.regime_history or regime != self.regime_history[-1]:
            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} "
                          f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | "
                          f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}")
            self.regime_history.append(regime)

        # Periodic status every 50th calculation, colored by regime.
        if self.regime_calculations % 50 == 0:
            if regime == MarketRegime.BULL:
                color_code = "\033[92m"  # Green
            elif regime == MarketRegime.BEAR:
                color_code = "\033[91m"  # Red
            else:
                color_code = "\033[93m"  # Yellow
            reset_code = "\033[0m"

            self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} "
                          f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}")

            if bb_metrics['bb_ready']:
                self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | "
                              f"SMA:{bb_metrics['sma']:.2f} | Upper:{bb_metrics['upper_band']:.2f} | "
                              f"Lower:{bb_metrics['lower_band']:.2f} | Pos:{bb_metrics['bb_position']} | "
                              f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.2f} | "
                              f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
            else:
                self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | "
                              f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | "
                              f"Vel:{temporal_metrics['velocity']:.2f} | VelTrend:{temporal_metrics['velocity_trend']} | "
                              f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")

        # NOTE(review): source indentation was lost; this debug line is assumed
        # to run on every calculation rather than only with the periodic status - confirm.
        if symbol_pattern:
            pattern_str = " ".join(symbol_pattern) + " "
            # The "B"/"X" token counts equal the bullish/bearish counters.
            self.log.debug(f"{pattern_str} and totals: BULLS:{bullish}/BEARS:{bearish}")

    def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
                              analyzed: int, total: int) -> float:
        """Return a confidence score in ``[0.0, 1.0]`` for the current regime.

        The score is a weighted sum of decisiveness (40%), sample coverage
        (10%), statistical significance (30%) and market clarity (20%).

        Args:
            bull_ratio: Fraction of analyzed symbols classified bullish.
            bear_ratio: Fraction of analyzed symbols classified bearish.
            analyzed: Number of symbols that contributed to the ratios.
            total: Number of symbols with an allocated slot.

        Returns:
            ``0.0`` when nothing was analyzed, otherwise the clamped score.
        """
        if analyzed == 0:
            return 0.0

        # Market decisiveness: distance of the dominant side from 50%.
        max_ratio = max(bull_ratio, bear_ratio)
        decisiveness = abs(max_ratio - 0.5) * 2

        # Sample coverage; a non-positive total contributes nothing instead of raising.
        coverage = analyzed / total if total > 0 else 0.0

        # Statistical significance: z-score of the dominant ratio against 0.5.
        if 0 < max_ratio < 1:
            standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed)
        else:
            standard_error = 0.0
        z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001)
        statistical_confidence = min(z_score / 3.0, 1.0)

        # Market clarity: share of symbols that picked a direction.
        market_clarity = bull_ratio + bear_ratio

        confidence = (
            decisiveness * 0.40 +
            coverage * 0.10 +
            statistical_confidence * 0.30 +
            market_clarity * 0.20
        )
        return max(0.0, min(confidence, 1.0))
class SILOQYNormalizerActor(Actor):
    """Turn raw tick tuples from RAW_TOPIC into SiloqyTradeTick objects.

    Each structured tick is published on STRUCTURED_TOPIC through the message
    bus, or through ``publish_data`` when no message bus is available.
    """

    def __init__(self, config: SILOQYNormalizerConfig):
        """Reset the processed-tick counter and the metrics timer."""
        super().__init__(config)
        self.normalized_count = 0
        self.last_metric_log = time.time_ns()
        self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Subscribe to the raw tick topic."""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events")
        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")

    def on_stop(self) -> None:
        """Log the stop; this actor performs no cleanup of its own."""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping")

    def handle_raw_tick(self, data):
        """Build a SiloqyTradeTick from one raw tuple and publish it.

        Args:
            data: ``(symbol, price, size, ts_event, open_price,
                candle_open_time)`` tuple. A ``None`` open price falls back to
                the tick price; a ``None`` candle open time falls back to the
                current wall-clock time in milliseconds.

        All failures are logged and never raised to the caller.
        """
        try:
            symbol, price, size, ts_event, open_price, candle_open_time = data
            tick = SiloqyTradeTick(
                instrument_id=symbol,
                price=round(float(price), 8),
                size=round(float(size), 8),
                ts_event=int(ts_event),
                ts_init=int(time.time_ns()),
                open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8),
                candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000),
                tick_side=None,       # Default to None unless Binance provides this info
                exchange="BINANCE",   # We know this is from Binance
                liquidity_flag=None   # Default to None unless Binance provides this info
            )

            if hasattr(self, 'msgbus') and self.msgbus:
                try:
                    self.msgbus.publish(STRUCTURED_TOPIC, tick)
                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")
            elif hasattr(self, 'publish_data'):
                # Fallback when no message bus is attached.
                try:
                    # Imported here because the visible file-level imports do
                    # not bring DataType into scope.
                    from nautilus_trader.model.data import DataType
                    self.publish_data(DataType(SiloqyTradeTick), tick)
                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}")

            self.normalized_count += 1

            # Periodic metrics (at most once per second).
            now = time.time_ns()
            if now - self.last_metric_log > 1_000_000_000:
                self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks")
                self.last_metric_log = now
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}")
def test_siloqy_actors_with_nautilus_process_management():
    """Build and run a TradingNode hosting the four SILOQY/DOLPHIN actors.

    The node is configured with the symbol-discovery, main, regime and
    normalizer actors and no data or execution clients. ``node.run()`` blocks
    until interrupted; the node is always disposed afterwards, and a failure
    to dispose is reported rather than silently swallowed.
    """
    print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management")

    symbol_discovery_config = ImportableActorConfig(
        actor_path="__main__:SILOQYSymbolDiscoveryActor",
        config_path="__main__:SILOQYSymbolDiscoveryConfig",
        config={
            "component_id": "SILOQY-SYMBOL-DISCOVERY",
            "symbols": [],                          # Empty for dynamic discovery
            "candle_interval_ms": 15 * 60 * 1000,   # 15-minute candles
            "throttle_mode": True,                  # Throttle mode on for discovery
            "throttle_rate_limit_seconds": 10.0,    # 10s between batches (vs 2.5s)
            "max_symbols_throttled": 414            # Symbol cap while throttled
        }
    )

    main_actor_config = ImportableActorConfig(
        actor_path="__main__:SILOQYMainActor",
        config_path="__main__:SILOQYMainActorConfig",
        config={
            "component_id": "SILOQY-MAIN-ACTOR",
            "candle_interval_ms": 15 * 60 * 1000,   # 15-minute candles
            "throttle_mode": False,                 # Throttle mode off for the main actor
            "enable_real_data": True                # True selects real WebSocket data
        }
    )

    regime_actor_config = ImportableActorConfig(
        actor_path="__main__:DOLPHINRegimeActor",
        config_path="__main__:DOLPHINRegimeActorConfig",
        config={
            "component_id": "DOLPHIN-REGIME-ACTOR",
            "max_symbols": 5000,
            "ticks_per_analysis": 2   # Reduced for throttle mode testing
        }
    )

    normalizer_config = ImportableActorConfig(
        actor_path="__main__:SILOQYNormalizerActor",
        config_path="__main__:SILOQYNormalizerConfig",
        config={
            "component_id": "SILOQY-NORMALIZER"
        }
    )

    trading_config = TradingNodeConfig(
        trader_id=TraderId("SILOQY-TRADER-001"),
        actors=[
            symbol_discovery_config,
            main_actor_config,
            regime_actor_config,
            normalizer_config
        ],
        data_clients={},  # No external data clients needed
        exec_clients={}   # No execution clients for this test
    )

    node = TradingNode(config=trading_config)
    try:
        node.build()
        print("Node built successfully with Nautilus built-in process management")
        node.run()
    except KeyboardInterrupt:
        print("\nSILOQY Actor test with Nautilus process management completed!")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        try:
            node.dispose()
            print("Nautilus process management: Node disposed successfully")
        except Exception as e:
            # Report the failure; KeyboardInterrupt/SystemExit still propagate.
            print(f"Nautilus process management: Node dispose failed: {e}")
# Script entry point: run the multi-actor test node when executed directly.
if __name__ == "__main__":
    test_siloqy_actors_with_nautilus_process_management()