# File listing header (from the source viewer): 1243 lines, 54 KiB, Python
import time
|
|
import numpy as np
|
|
import asyncio
|
|
import json
|
|
import math
|
|
import httpx
|
|
import websockets
|
|
from typing import Dict, List, Optional, Tuple, Any
|
|
from enum import Enum
|
|
from collections import deque
|
|
import random # Added for _simulate_websocket_ticks
|
|
from dataclasses import dataclass, field
|
|
|
|
# Nautilus imports - following test pattern
|
|
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
|
|
from nautilus_trader.live.node import TradingNode
|
|
from nautilus_trader.common.actor import Actor
|
|
from nautilus_trader.common.config import ActorConfig
|
|
from nautilus_trader.model.identifiers import TraderId
|
|
from nautilus_trader.core.data import Data
|
|
from nautilus_trader.common.component import Logger, init_logging
|
|
|
|
# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
|
|
from nautilus_trader.model.data import TradeTick
|
|
from nautilus_trader.model.identifiers import InstrumentId, TradeId
|
|
from nautilus_trader.model.objects import Price, Quantity
|
|
from nautilus_trader.model.enums import AggressorSide
|
|
|
|
# Initialize logging
# NOTE(review): the value returned by init_logging() is bound to a module-level
# name, presumably so it stays referenced for the life of the process — confirm
# against the NautilusTrader logging documentation.
_log_guard = init_logging()
_logger = Logger("SILOQY")
|
|
|
|
# Topics - HIGH PERFORMANCE STRING TOPICS
# Plain string topic names used with msgbus.publish / msgbus.subscribe.
RAW_TOPIC = "SILOQY.RAW.TICKS"
STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"

# Rate limiting constant
MIN_INTERVAL = 2.5  # seconds between API batches
|
|
|
|
# Market Regime Enum
|
|
class MarketRegime(Enum):
    """Closed set of market regime labels.

    Each member's value is the same string as its name, so members can be
    looked up from their string form with ``MarketRegime("BULL")``.
    """

    BULL = "BULL"
    BEAR = "BEAR"
    TRANSITION = "TRANSITION"
    SIDEWAYS = "SIDEWAYS"
|
|
|
|
# ============================================================================
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
# ============================================================================
|
|
|
|
@dataclass
class TickData:
    """Universal tick data structure from standalone.

    One parsed trade event. ``raw_data`` optionally carries the source
    payload the tick was built from; when omitted (or passed as ``None``)
    every instance gets its own fresh empty dict.
    """

    symbol: str
    price: float
    quantity: float
    timestamp: int  # NOTE(review): treated as epoch milliseconds by the consumers in this file — confirm
    is_buyer_maker: bool
    trade_id: int
    exchange: str
    raw_data: Optional[Dict[str, Any]] = None

    def __post_init__(self):
        # Replace the None sentinel with a per-instance dict so instances
        # never share one mutable default.
        if self.raw_data is None:
            self.raw_data = {}
|
|
|
|
@dataclass
class ExchangeConfig:
    """Exchange-specific configuration from standalone.

    Holds the connection limits and WebSocket endpoint for one exchange.
    The first six fields are required; the remaining three have defaults.
    """

    name: str
    websocket_limit_per_second: int
    max_streams_per_connection: int
    max_connections_per_window: int
    connection_window_seconds: int
    base_websocket_url: str
    requires_auth: bool = False
    # Keepalive settings, in seconds, forwarded to websockets.connect().
    ping_interval: float = 20.0
    ping_timeout: float = 10.0
|
|
|
|
class BinanceAdapter:
    """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter.

    Builds trade-stream URLs, parses trade messages into ``TickData`` and
    splits a symbol list into per-connection groups.
    """

    def __init__(self):
        self.config = ExchangeConfig(
            name="binance",
            websocket_limit_per_second=10,
            max_streams_per_connection=1024,
            max_connections_per_window=300,
            connection_window_seconds=300,  # 5 minutes
            base_websocket_url="wss://stream.binance.com:9443",
            requires_auth=False,
        )
        self.logger = Logger("BINANCE_ADAPTER")

    def get_websocket_url(self, symbols: List[str]) -> str:
        """Build Binance WebSocket URL with combined streams.

        A single symbol uses the ``/ws/<symbol>@trade`` endpoint; any other
        count uses ``/stream?streams=`` with the ``@trade`` stream names
        joined by ``/``. Symbols are lower-cased in both cases.
        """
        base_url = self.config.base_websocket_url
        if len(symbols) == 1:
            return f"{base_url}/ws/{symbols[0].lower()}@trade"

        # Combined stream approach
        stream_string = "/".join(f"{symbol.lower()}@trade" for symbol in symbols)
        return f"{base_url}/stream?streams={stream_string}"

    def parse_message(self, message: str) -> Optional["TickData"]:
        """EXTRACTED FROM STANDALONE: Parse Binance trade message.

        Accepts both the combined-stream envelope (``{"stream": ..., "data":
        ...}``) and the single-stream payload.

        Returns:
            A ``TickData`` for trade events, or ``None`` for non-trade
            messages and for anything that cannot be parsed. Parse failures
            are logged and never raised.
        """
        try:
            data = json.loads(message)

            # Valid JSON that is not an object (list, number, null, ...) cannot
            # be a trade event; reject it here instead of failing on .get().
            if not isinstance(data, dict):
                self.logger.error(
                    f"Failed to parse Binance message: unexpected payload type {type(data).__name__}"
                )
                return None

            # Handle combined stream format
            if 'stream' in data and 'data' in data:
                stream_name = data['stream']
                trade_data = data['data']
            else:
                # Single stream format
                trade_data = data
                stream_name = trade_data.get('e', '')

            # Only process trade events
            if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
                return None

            return TickData(
                symbol=trade_data['s'],
                price=float(trade_data['p']),
                quantity=float(trade_data['q']),
                timestamp=int(trade_data['T']),
                is_buyer_maker=trade_data['m'],
                trade_id=int(trade_data['t']),
                exchange="binance",
                raw_data=trade_data,
            )

        except (json.JSONDecodeError, KeyError, ValueError, TypeError, AttributeError) as e:
            # TypeError / AttributeError cover fields of the wrong JSON type
            # (e.g. a null price or a non-string stream name).
            self.logger.error(f"Failed to parse Binance message: {e}")
            return None

    def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
        """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections.

        Returns consecutive slices of ``symbols``. The slice size is the
        smaller of 70% of ``max_streams_per_connection`` and five times
        ``websocket_limit_per_second``, and is never below one.
        """
        # Conservative: leave room for message bursts
        safety_margin = 0.7
        symbols_per_connection = int(
            min(
                self.config.max_streams_per_connection * safety_margin,
                self.config.websocket_limit_per_second * 5,  # 5-second buffer
            )
        )
        # A zero (or negative) chunk size would make range() raise ValueError.
        symbols_per_connection = max(1, symbols_per_connection)

        # Group symbols into chunks
        return [
            symbols[start:start + symbols_per_connection]
            for start in range(0, len(symbols), symbols_per_connection)
        ]
|
|
|
|
class SiloqyWebSocketConnection:
    """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus.

    Owns one WebSocket connection for a group of symbols, parses incoming
    messages with the adapter and forwards each trade to the main actor's
    ``on_websocket_tick``. Reconnects with exponential backoff on errors.
    """

    def __init__(self, connection_id: str, symbols: List[str],
                 binance_adapter: "BinanceAdapter", main_actor_ref):
        self.connection_id = connection_id
        self.symbols = symbols
        self.adapter = binance_adapter
        self.main_actor = main_actor_ref  # Reference to SILOQYMainActor for msgbus access

        self.websocket = None
        self.is_running = False
        self.retry_count = 0
        self.messages_received = 0
        self.last_message_time = 0.0
        self.connected_at = None

        self.logger = Logger(f"WS_{connection_id}")
        self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")

    async def start(self) -> None:
        """Start the WebSocket connection.

        Runs until ``stop()`` clears ``is_running``. A connection error
        triggers a backoff sleep before the next attempt.
        """
        self.is_running = True

        while self.is_running:
            try:
                await self._connect_and_run()
            except Exception as e:
                self.logger.error(f"Connection {self.connection_id} error: {e}")

                if self.is_running:
                    await self._handle_reconnection()
                else:
                    break

    async def _connect_and_run(self) -> None:
        """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop."""
        url = self.adapter.get_websocket_url(self.symbols)

        self.logger.info(f"Connecting to Binance: {url[:100]}...")

        async with websockets.connect(
            url,
            ping_interval=self.adapter.config.ping_interval,
            ping_timeout=self.adapter.config.ping_timeout,
            close_timeout=10,
        ) as websocket:
            self.websocket = websocket
            self.connected_at = time.time()
            self.retry_count = 0  # a successful connect resets the backoff

            self.logger.info(f"Connection {self.connection_id} established")

            # Message processing loop
            async for message in websocket:
                if not self.is_running:
                    break

                try:
                    await self._process_message(message)
                except Exception as e:
                    # One bad message must not tear down the connection.
                    self.logger.error(f"Message processing error: {e}")

    async def _process_message(self, message: str) -> None:
        """Parse one incoming message and hand the trade to the main actor.

        Updates the receive counters, then forwards symbol, price, quantity
        and timestamp of a parsed trade to ``main_actor.on_websocket_tick``.
        Messages the adapter does not turn into a tick are ignored.
        """
        self.messages_received += 1
        self.last_message_time = time.time()

        # Parse message using Binance adapter
        tick_data = self.adapter.parse_message(message)
        if tick_data is None:
            return

        # A SiloqyTradeTick used to be constructed here and then discarded;
        # it was never published, and a failure while constructing it dropped
        # the tick before the candle logic ran. Only the forwarding remains.
        try:
            # Process through main actor's tick handling (preserves all candle logic)
            self.main_actor.on_websocket_tick(
                tick_data.symbol,
                tick_data.price,
                tick_data.quantity,
                tick_data.timestamp,
            )
        except Exception as e:
            self.logger.error(f"Failed to process tick: {e}")

    async def _handle_reconnection(self) -> None:
        """Handle reconnection with exponential backoff."""
        self.retry_count += 1
        # 2 ** 6 already exceeds the 60 s cap, so clamping the exponent gives
        # the same delay without computing ever-larger powers.
        delay = min(2 ** min(self.retry_count, 6), 60)  # Max 60 seconds

        self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
        await asyncio.sleep(delay)

    async def stop(self) -> None:
        """Stop the connection."""
        self.logger.info(f"Stopping connection {self.connection_id}")
        self.is_running = False

        if self.websocket:
            await self.websocket.close()
|
|
|
|
# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
# --------------------------------------------------------------------
|
|
class SiloqyTradeTick(TradeTick):
    """TradeTick carrying additional SILOQY candle fields.

    Accepts plain Python values and converts them to Nautilus types before
    delegating to ``TradeTick.__init__``; the extra SILOQY fields are stored
    as ordinary instance attributes afterwards.
    """

    def __init__(self, instrument_id: str, price: float, size: float, ts_event: int,
                 ts_init: Optional[int] = None, open_price: Optional[float] = None,
                 candle_open_time: Optional[int] = None, tick_side: Optional[str] = None,
                 exchange: Optional[str] = None, liquidity_flag=None):
        """Build the tick.

        Args:
            instrument_id: Symbol string; ``.BINANCE`` is appended when the
                string contains no ``.``. Non-string values are passed
                through unchanged.
            price: Trade price; converted to ``Price`` with precision 8
                unless it already is a ``Price``.
            size: Trade size; converted to ``Quantity`` with precision 8
                unless it already is a ``Quantity``.
            ts_event: Event timestamp, passed to the base class as ``int``.
            ts_init: Init timestamp; defaults to ``time.time_ns()``.
            open_price: Candle open price; defaults to ``price``.
            candle_open_time: Candle open time; defaults to the current wall
                clock in milliseconds.
            tick_side: Stored as given.
            exchange: Stored as given.
            liquidity_flag: Stored as given.
        """
        # Convert to proper Nautilus types - add venue to symbol
        if isinstance(instrument_id, str):
            # Add BINANCE venue if not already present
            if '.' not in instrument_id:
                instrument_id = f"{instrument_id}.BINANCE"
            instr_id = InstrumentId.from_str(instrument_id)
        else:
            instr_id = instrument_id

        # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size
        price_obj = price if isinstance(price, Price) else Price(price, precision=8)
        size_obj = size if isinstance(size, Quantity) else Quantity(size, precision=8)

        # Default aggressor side and trade ID
        aggressor_side = AggressorSide.NO_AGGRESSOR  # Default since Binance doesn't always provide this
        trade_id = TradeId(f"T{int(time.time_ns())}")  # Generate a trade ID

        super().__init__(
            instrument_id=instr_id,
            price=price_obj,
            size=size_obj,
            aggressor_side=aggressor_side,
            trade_id=trade_id,
            ts_event=int(ts_event),
            ts_init=int(ts_init if ts_init is not None else time.time_ns()),
        )

        # Additional SILOQY fields
        self.open_price = open_price if open_price is not None else price
        self.candle_open_time = (
            candle_open_time if candle_open_time is not None else int(time.time() * 1000)
        )
        self.tick_side = tick_side
        self.exchange = exchange
        self.liquidity_flag = liquidity_flag
|
|
|
|
class SILOQYSymbolDiscoveryConfig(ActorConfig):
    """Configuration for SILOQYSymbolDiscoveryActor."""

    # Symbols to use; when empty the actor discovers them itself.
    symbols: List[str] = []
    # Candle length in milliseconds (default: 15 minutes).
    candle_interval_ms: int = 15 * 60 * 1000
    # Dev/test mode: slower API batches and a capped symbol count.
    throttle_mode: bool = False
    # Seconds between API batches while throttle_mode is on.
    throttle_rate_limit_seconds: float = 10.0
    # Maximum number of discovered symbols kept while throttle_mode is on.
    max_symbols_throttled: int = 100
|
|
|
|
class SILOQYMainActorConfig(ActorConfig):
    """Configuration for SILOQYMainActor."""

    # Candle length in milliseconds (default: 15 minutes).
    candle_interval_ms: int = 15 * 60 * 1000
    # Dev/test mode: fewer symbols and a lower tick rate.
    throttle_mode: bool = False
    # NEW: Enable real WebSocket data (otherwise ticks are simulated).
    enable_real_data: bool = False
|
|
|
|
class DOLPHINRegimeActorConfig(ActorConfig):
    """Configuration for DOLPHINRegimeActor."""

    # NOTE(review): presumably the upper bound on tracked symbols — confirm against the actor.
    max_symbols: int = 5000
    # NOTE(review): presumably the number of ticks between regime analyses — confirm against the actor.
    ticks_per_analysis: int = 250
|
|
|
|
class SILOQYNormalizerConfig(ActorConfig):
    """Configuration for the SILOQY normalizer actor; adds no fields of its own."""

    pass
|
|
|
|
class SILOQYSymbolDiscoveryActor(Actor):
    """
    Symbol discovery with all original SILOQY algorithms preserved
    Using Nautilus built-in ActorExecutor for task management

    On start the actor discovers tradable symbols (unless a list was
    configured), fetches 24h stats and reconstructs the open price of the
    current candle for each symbol, then publishes both results on the
    message bus.
    """

    def __init__(self, config: SILOQYSymbolDiscoveryConfig):
        super().__init__(config)

        # Preserve original SILOQY configuration
        self.symbols = list(config.symbols) if config.symbols else []
        self.candle_interval_ms = config.candle_interval_ms
        self.active_candles = {}

        # Process management configuration
        self.throttle_mode = config.throttle_mode
        self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds
        self.max_symbols_throttled = config.max_symbols_throttled

        # Reference to the asyncio fallback task; the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected.
        self._discovery_task = None

        if self.throttle_mode:
            self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration")
            self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches")
            self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max")

        self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Start the actor - using Nautilus ActorExecutor"""
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting")

        # Use Nautilus ActorExecutor for task management
        if hasattr(self, '_executor') and self._executor:
            self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
            self._executor.submit(self.on_start_async())
            self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted")
        else:
            self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
            self._discovery_task = asyncio.create_task(self.on_start_async())

    def on_stop(self) -> None:
        """Stop the actor - Nautilus handles executor cleanup"""
        self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping")
        # Nautilus kernel handles executor shutdown - no manual cleanup needed

    async def on_start_async(self) -> None:
        """Run discovery, candle reconstruction and publishing.

        Any failure is logged and swallowed so the rest of the system can
        continue.
        """
        self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization")

        try:
            # PRESERVED: Original symbol discovery algorithm
            if not self.symbols:
                await self._discover_all_symbols()

            # PRESERVED: Original stats fetching and candle reconstruction
            stats, candle_opens = await self._fetch_stats_and_reconstruct_candles()

            # Set active candles from reconstruction results
            self.active_candles = candle_opens

            # Publish results using msgbus
            self._publish_discovery_results()

            self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully")

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}")
            # Don't re-raise, let system continue

    async def _discover_all_symbols(self):
        """PRESERVED: Original Binance symbol discovery algorithm.

        Stores in ``self.symbols`` every symbol whose status is ``TRADING``
        and whose name ends with ``USDT``; in throttle mode only the first
        ``max_symbols_throttled`` are kept.

        Raises:
            RuntimeError: When the exchange-info request does not return
                HTTP 200.
        """
        self.log.info("Starting dynamic symbol discovery from Binance...")
        url = "https://api.binance.com/api/v3/exchangeInfo"

        async with httpx.AsyncClient() as client:
            self.log.info("Fetching exchange info from Binance API...")
            response = await client.get(url, timeout=10)

            if response.status_code != 200:
                self.log.error(f"Failed to fetch exchange info: {response.status_code}")
                raise RuntimeError(f"Failed to fetch exchange info: {response.status_code}")

            self.log.info("Successfully received exchange info")
            data = response.json()

            # Get all trading symbols (USDT pairs for example)
            full_symbols = [
                s['symbol'] for s in data['symbols']
                if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
            ]

            # Apply throttle mode symbol limiting
            if self.throttle_mode:
                self.symbols = full_symbols[:self.max_symbols_throttled]
                self.log.warning(f"THROTTLE MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)")
            else:
                self.symbols = full_symbols

            self.log.info(f"Discovered {len(self.symbols)} trading symbols")
            self.log.info(f"First 10 symbols: {self.symbols[:10]}")

    async def _fetch_stats_and_reconstruct_candles(self):
        """PRESERVED: All original rate limiting with Nautilus async patterns.

        Processes ``self.symbols`` in batches. For each batch it fetches the
        24h ticker stats and then determines the open price of the current
        candle: within the first second of a candle the current ticker price
        is used, otherwise the open of the 1s kline at the candle start.

        Returns:
            ``(stats, candle_opens)``: ``stats`` maps symbol to a dict with
            ``count`` and ``quoteVolume``; ``candle_opens`` maps symbol to
            the reconstructed open price. Symbols whose requests failed are
            simply absent.
        """
        url = "https://api.binance.com/api/v3/ticker/24hr"
        klines_url = "https://api.binance.com/api/v3/klines"
        ticker_url = "https://api.binance.com/api/v3/ticker/price"

        batch_size = 50  # symbols per API request

        # PRESERVED: Original rate limit handling with throttle mode support
        base_rate_limit = 2.5
        rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit

        if self.throttle_mode:
            self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)")

        stats = {}
        candle_opens = {}

        async with httpx.AsyncClient() as client:
            total_batches = (len(self.symbols) + batch_size - 1) // batch_size
            for i in range(0, len(self.symbols), batch_size):
                batch_num = (i // batch_size) + 1
                start_time = time.time()
                symbols_batch = self.symbols[i:i + batch_size]

                if self.throttle_mode:
                    self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay")

                # PRESERVED: Original boundary detection logic
                current_time = int(time.time() * 1000)
                time_into_candle = current_time % self.candle_interval_ms
                candle_open_time = current_time - time_into_candle
                at_boundary = (time_into_candle < 1000)

                if i == 0:  # Log initial state
                    self.log.info("DOLPHIN: Current candle analysis:")
                    self.log.info(f" - Current time: {current_time}")
                    self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)")
                    self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)")
                    self.log.info(f" - At boundary: {at_boundary}")
                    self.log.info(f" - Candle open time: {candle_open_time}")

                symbols_json_string = json.dumps(symbols_batch, separators=(',', ':'))
                params = {"symbols": symbols_json_string}

                try:
                    # PRESERVED: Original stats fetching
                    response = await client.get(url, params=params, timeout=10)

                    if response.status_code == 200:
                        data = response.json()
                        for item in data:
                            symbol = item['symbol']
                            stats[symbol] = {
                                'count': int(item['count']),
                                'quoteVolume': float(item['quoteVolume']),
                            }
                        self.log.info(f"Fetched stats for batch {batch_num}: {len(data)} symbols")

                    else:
                        self.log.error(f"Error {response.status_code}: {response.text}")

                    # PRESERVED: Original DOLPHIN candle reconstruction strategy
                    if at_boundary:
                        # AT BOUNDARY: Fetch current prices to use as opens
                        self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {batch_num})")

                        ticker_params = {"symbols": symbols_json_string}
                        ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5)

                        if ticker_response.status_code == 200:
                            ticker_data = ticker_response.json()
                            for item in ticker_data:
                                symbol = item['symbol']
                                current_price = float(item['price'])
                                candle_opens[symbol] = current_price

                            self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols")
                        else:
                            self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})")

                    else:
                        # NOT AT BOUNDARY: Fetch historical 1s kline data
                        self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {batch_num})")

                        for symbol in symbols_batch:
                            try:
                                # PRESERVED: Original kline fetching logic
                                kline_params = {
                                    'symbol': symbol,
                                    'interval': '1s',
                                    'startTime': candle_open_time,
                                    'endTime': candle_open_time + 1000,
                                    'limit': 1,
                                }

                                kline_response = await client.get(klines_url, params=kline_params, timeout=5)

                                if kline_response.status_code == 200:
                                    kline_data = kline_response.json()
                                    if kline_data:
                                        # Index 1 of the kline row is read as the open price.
                                        open_price = float(kline_data[0][1])
                                        candle_opens[symbol] = open_price

                                        if (i + len(symbols_batch)) <= 10:
                                            self.log.info(f" - {symbol}: reconstructed open = {open_price}")
                                    else:
                                        self.log.warning(f" - {symbol}: no 1s kline data found")
                                else:
                                    self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})")

                            except Exception as e:
                                self.log.error(f" - {symbol}: kline fetch error: {e}")

                            # Small pause between per-symbol kline requests.
                            await asyncio.sleep(0.1)

                except Exception as e:
                    self.log.error(f"Request failed: {str(e)}")

                # PRESERVED: Original rate limiting with Nautilus async
                elapsed = time.time() - start_time
                if i + batch_size < len(self.symbols):
                    sleep_time = max(0, rate_limit_interval - elapsed)
                    if sleep_time > 0:
                        await asyncio.sleep(sleep_time)

        self.log.info("DOLPHIN: Candle reconstruction complete:")
        self.log.info(f" - Symbols processed: {len(self.symbols)}")
        self.log.info(f" - Opens reconstructed: {len(candle_opens)}")
        self.log.info("Discovery phase ready for publishing...")

        return stats, candle_opens

    def _publish_discovery_results(self):
        """Publish results using msgbus - Nautilus message bus integration.

        Publishes ``(symbols, ts_ns)`` and ``(active_candles, ts_ns)`` tuples
        on the discovery topics. Failures are logged, not raised.
        """
        try:
            self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...")
            if hasattr(self, 'msgbus') and self.msgbus:
                # Publish symbols and candles as tuples
                self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
                self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))

                self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
                self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
            else:
                self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}")
|
|
|
|
class SILOQYMainActor(Actor):
|
|
def __init__(self, config: SILOQYMainActorConfig):
|
|
super().__init__(config)
|
|
|
|
# Preserve original configuration
|
|
self.candle_interval_ms = config.candle_interval_ms
|
|
self.throttle_mode = config.throttle_mode
|
|
self.enable_real_data = config.enable_real_data # NEW: Real data capability
|
|
self.connections = {}
|
|
self.connection_tasks = {}
|
|
|
|
# Will be populated from discovery actor
|
|
self.symbols = []
|
|
self.active_candles = {}
|
|
|
|
# WebSocket infrastructure - NEW
|
|
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
|
|
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
|
|
self.websocket_tasks: Dict[str, asyncio.Task] = {}
|
|
|
|
# WebSocket tasks - managed by Nautilus ActorExecutor
|
|
self.ws_tasks = []
|
|
|
|
# Synchronization
|
|
self.discovery_complete = asyncio.Event()
|
|
|
|
if self.throttle_mode:
|
|
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
|
|
|
|
if self.enable_real_data:
|
|
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
|
|
else:
|
|
self.log.info("SIMULATION MODE: Will generate simulated ticks")
|
|
|
|
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
|
|
|
|
def on_start(self) -> None:
|
|
"""Subscribe to discovery events - using Nautilus ActorExecutor"""
|
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events")
|
|
|
|
# Subscribe to discovery results
|
|
if hasattr(self, 'msgbus') and self.msgbus:
|
|
self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
|
|
self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial)
|
|
self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics")
|
|
|
|
# Use Nautilus ActorExecutor for task management
|
|
if hasattr(self, '_executor') and self._executor:
|
|
self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization")
|
|
future = self._executor.submit(self.on_start_async())
|
|
self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted")
|
|
else:
|
|
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
|
|
asyncio.create_task(self.on_start_async())
|
|
|
|
def on_stop(self) -> None:
|
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
|
|
|
|
# Stop WebSocket connections if running
|
|
if self.enable_real_data:
|
|
for conn in self.websocket_connections.values():
|
|
asyncio.create_task(conn.stop())
|
|
|
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
|
|
|
async def on_start_async(self) -> None:
|
|
try:
|
|
# Wait for symbol discovery to complete
|
|
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
|
|
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
|
|
|
|
# ENHANCED: Start real or simulated connections based on configuration
|
|
if self.enable_real_data:
|
|
await self._start_real_websocket_connections()
|
|
else:
|
|
await self._start_simulated_connections()
|
|
|
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
|
|
|
|
except asyncio.TimeoutError:
|
|
self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery")
|
|
except Exception as e:
|
|
self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}")
|
|
|
|
def handle_symbols_discovered(self, data):
|
|
"""Handle symbols from discovery actor"""
|
|
try:
|
|
symbols, timestamp = data
|
|
self.symbols = symbols
|
|
self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor")
|
|
|
|
if self.symbols and self.active_candles:
|
|
self.discovery_complete.set()
|
|
except Exception as e:
|
|
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
|
|
|
|
def handle_candles_initial(self, data):
|
|
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
|
|
try:
|
|
candles_received, timestamp = data
|
|
self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor")
|
|
|
|
# FIX: Re-initialize active_candles with the full dictionary structure
|
|
# STEP 5 FIX: Added rounding to 8 decimals for all price values
|
|
self.active_candles = {}
|
|
for symbol, open_price in candles_received.items():
|
|
self.active_candles[symbol] = {
|
|
'open_price': round(open_price, 8),
|
|
'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * self.candle_interval_ms,
|
|
'high': round(open_price, 8),
|
|
'low': round(open_price, 8),
|
|
'close': round(open_price, 8),
|
|
'volume': 0.0,
|
|
}
|
|
|
|
if self.symbols and self.active_candles:
|
|
self.discovery_complete.set()
|
|
except Exception as e:
|
|
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
|
|
|
|
async def _start_real_websocket_connections(self):
|
|
"""NEW: Start real Binance WebSocket connections with throttle control"""
|
|
self.log.info("Starting REAL Binance WebSocket connections for live data...")
|
|
|
|
# Apply throttle mode limits to WebSocket connections
|
|
symbols_to_use = self.symbols
|
|
if self.throttle_mode:
|
|
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
|
|
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
|
|
|
|
# Calculate optimal symbol distribution
|
|
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
|
|
|
|
self.log.info(f"Real WebSocket distribution:")
|
|
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
|
|
self.log.info(f" - Connections needed: {len(symbol_groups)}")
|
|
|
|
# Start WebSocket connections
|
|
for i, symbol_group in enumerate(symbol_groups):
|
|
connection_id = f"binance_real_{i:03d}"
|
|
|
|
connection = SiloqyWebSocketConnection(
|
|
connection_id=connection_id,
|
|
symbols=symbol_group,
|
|
binance_adapter=self.binance_adapter,
|
|
main_actor_ref=self
|
|
)
|
|
|
|
self.websocket_connections[connection_id] = connection
|
|
|
|
# Use Nautilus ActorExecutor for WebSocket task management
|
|
if hasattr(self, '_executor') and self._executor:
|
|
future = self._executor.submit(connection.start())
|
|
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
|
|
else:
|
|
task = asyncio.create_task(connection.start())
|
|
self.websocket_tasks[connection_id] = task
|
|
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
|
|
|
|
# Rate limit connection establishment
|
|
if i < len(symbol_groups) - 1:
|
|
await asyncio.sleep(1.0) # 1 second between connections
|
|
|
|
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
|
|
|
|
async def _start_simulated_connections(self):
|
|
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
|
|
self.log.info("Starting WebSocket simulation for tick generation...")
|
|
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
|
|
|
|
# Use Nautilus ActorExecutor for WebSocket task management
|
|
if hasattr(self, '_executor') and self._executor:
|
|
self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation")
|
|
future = self._executor.submit(self._simulate_websocket_ticks())
|
|
self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted")
|
|
else:
|
|
self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio")
|
|
task = asyncio.create_task(self._simulate_websocket_ticks())
|
|
self.ws_tasks.append(task)
|
|
|
|
async def _simulate_websocket_ticks(self):
|
|
"""PRESERVED: Original simulation logic"""
|
|
# Adjust tick rate for throttle mode
|
|
tick_delay = 0.1 if self.throttle_mode else 0.01
|
|
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
|
|
|
|
tick_count = 0
|
|
try:
|
|
if self.throttle_mode:
|
|
self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second")
|
|
else:
|
|
self.log.info("WebSocket tick simulation started - generating 100 ticks/second")
|
|
|
|
while True:
|
|
for symbol in self.symbols[:symbols_to_simulate]:
|
|
# STEP 2 FIX: Round price to 8 decimals, use float for quantity with 8 decimals
|
|
price = round(100.0 + random.gauss(0, 0.5), 8)
|
|
quantity = round(random.uniform(0.00000001, 100.0), 8)
|
|
timestamp = int(time.time() * 1000)
|
|
|
|
self.on_websocket_tick(symbol, price, quantity, timestamp)
|
|
tick_count += 1
|
|
|
|
# Log every 500 ticks in throttle mode, 1000 in normal mode
|
|
log_interval = 500 if self.throttle_mode else 1000
|
|
if tick_count % log_interval == 0:
|
|
mode_indicator = "THROTTLE" if self.throttle_mode else ""
|
|
data_source = "REAL" if self.enable_real_data else "SIM"
|
|
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
|
|
|
|
await asyncio.sleep(tick_delay)
|
|
except asyncio.CancelledError:
|
|
self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
|
|
except Exception as e:
|
|
self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")
|
|
|
|
def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
    """
    Fold one tick into the symbol's active candle and publish it as a raw tuple.

    The candle for ``symbol`` is created on first sight, restarted when
    ``timestamp`` falls into a later ``candle_interval_ms`` bucket than the
    stored ``open_time``, and otherwise extended (close, high, low, volume).
    Candle prices are stored rounded to 8 decimals.

    When ``self.msgbus`` exists and is truthy, the tuple
    ``(symbol, price, quantity, timestamp, candle open price, candle open time)``
    is published on ``RAW_TOPIC``. Publishing errors are logged, not raised.
    """
    bucket_ms = self.candle_interval_ms
    bucket_start = (timestamp // bucket_ms) * bucket_ms
    rounded_price = round(price, 8)

    if symbol in self.active_candles:
        candle = self.active_candles[symbol]
    else:
        # First tick for this symbol: seed a candle with zero volume; the
        # update branch below then adds this tick's quantity.
        candle = {
            'open_price': rounded_price,
            'open_time': bucket_start,
            'high': rounded_price,
            'low': rounded_price,
            'close': rounded_price,
            'volume': 0.0,
        }
        self.active_candles[symbol] = candle

    if bucket_start > candle['open_time']:
        # The tick belongs to a later bucket: restart the candle from it.
        candle.update(
            open_price=rounded_price,
            open_time=bucket_start,
            high=rounded_price,
            low=rounded_price,
            close=rounded_price,
            volume=quantity,
        )
    else:
        candle['close'] = rounded_price
        candle['high'] = round(max(candle['high'], price), 8)
        candle['low'] = round(min(candle['low'], price), 8)
        candle['volume'] += quantity

    # Publish a plain tuple; floats are rounded to 8 decimals.
    try:
        bus = getattr(self, 'msgbus', None)
        if bus:
            raw_tick = (
                symbol,
                round(float(price), 8),
                round(float(quantity), 8),
                int(timestamp),
                round(float(candle['open_price']), 8),
                int(candle['open_time']),
            )
            bus.publish(RAW_TOPIC, raw_tick)
    except Exception as e:
        self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
|
|
|
|
class DOLPHINRegimeActor(Actor):
    """Detect the market regime (BULL / BEAR / SIDEWAYS) from raw tick tuples.

    The actor subscribes to ``RAW_TOPIC``, keeps one open/high/low/close/
    volume slot per symbol in NumPy arrays allocated once in ``__init__``,
    and after every ``ticks_per_analysis`` ticks classifies the market and
    publishes a result tuple on ``REGIME_TOPIC``.
    """

    # Fractional open-to-close move (0.15%) at which a symbol counts as
    # bullish (>= +threshold) or bearish (<= -threshold).
    PRICE_CHANGE_THRESHOLD = 0.0015

    def __init__(self, config: DOLPHINRegimeActorConfig):
        """Read ``max_symbols`` / ``ticks_per_analysis`` from ``config`` and allocate state."""
        super().__init__(config)

        self.max_symbols = config.max_symbols
        self.ticks_per_analysis = config.ticks_per_analysis

        # One slot per symbol, allocated up front so tick handling only
        # writes into existing arrays.
        self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
        self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
        # Candle open time last recorded for each slot (0 until first set).
        self.last_update = np.zeros(self.max_symbols, dtype=np.int64)

        # Symbol <-> slot index mapping; slots are handed out in arrival order.
        self.symbol_to_idx = {}
        self.idx_to_symbol = {}
        self.active_symbols = 0

        # Ticks seen since the last regime analysis.
        self.tick_count = 0

        # Share of analysed symbols required to call the regime.
        self.bull_threshold = 0.60  # 60% bullish
        self.bear_threshold = 0.55  # 55% bearish

        self.previous_bull_ratio = None
        # Only regime *changes* are appended (see _run_regime_detection).
        self.regime_history = deque(maxlen=100)

        # Metrics
        self.last_metric_log = time.time_ns()
        self.processed_ticks = 0
        self.regime_calculations = 0

        self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
                      f"ticks_per_analysis: {self.ticks_per_analysis}")

    def on_start(self) -> None:
        """Subscribe ``handle_raw_tick`` to ``RAW_TOPIC`` when a message bus is available."""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")

        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")

    def on_stop(self) -> None:
        """Log the stop; this actor performs no cleanup of its own."""
        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")

    def handle_raw_tick(self, data):
        """Update the symbol's candle slot from one raw tick tuple.

        ``data`` must unpack into ``(symbol, price, size, ts_event,
        open_price, candle_open_time)``; anything else is logged as malformed
        and ignored. Symbols beyond ``max_symbols`` are logged and dropped.
        Every ``ticks_per_analysis`` accepted ticks a regime detection run is
        triggered, and a metrics line is logged at most once per second.
        """
        try:
            symbol, price, size, _ts_event, open_price, candle_open_time = data
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
            return

        idx = self.symbol_to_idx.get(symbol)
        if idx is None:
            if self.active_symbols >= self.max_symbols:
                self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
                return

            idx = self.active_symbols
            self.symbol_to_idx[symbol] = idx
            self.idx_to_symbol[idx] = symbol
            self.active_symbols += 1

            # Record the candle open time and this tick's size right away.
            # Otherwise the slot keeps last_update == 0 and the next tick of
            # the same candle is mistaken for a new candle, discarding this
            # tick's high/low and volume.
            self._start_candle(idx, price, size, open_price, candle_open_time)
        elif candle_open_time > self.last_update[idx]:
            # The publisher moved on to a later candle: reset the slot.
            self._start_candle(idx, price, size, open_price, candle_open_time)
        else:
            # Same candle: extend it.
            self.close_prices[idx] = price
            self.high_prices[idx] = max(self.high_prices[idx], price)
            self.low_prices[idx] = min(self.low_prices[idx], price)
            self.volumes[idx] += size

        self.tick_count += 1
        self.processed_ticks += 1

        if self.tick_count >= self.ticks_per_analysis:
            if hasattr(self, '_executor') and self._executor:
                # NOTE(review): this hands a coroutine object (not a callable)
                # to ``submit``; confirm the executor in use accepts that.
                self._executor.submit(self._run_regime_detection_async())
            else:
                # No executor available: run the detection inline.
                self._run_regime_detection()
            self.tick_count = 0

        # Metrics line at most once per second (time is in nanoseconds).
        now = time.time_ns()
        if now - self.last_metric_log > 1_000_000_000:
            self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, "
                          f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}")
            self.last_metric_log = now

    def _start_candle(self, idx, price, size, open_price, candle_open_time):
        """Initialise slot ``idx`` for a candle that begins with this tick."""
        self.open_prices[idx] = open_price
        self.high_prices[idx] = price
        self.low_prices[idx] = price
        self.close_prices[idx] = price
        self.volumes[idx] = size
        self.last_update[idx] = candle_open_time

    async def _run_regime_detection_async(self):
        """Coroutine wrapper around ``_run_regime_detection`` that logs any error."""
        try:
            self._run_regime_detection()
        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")

    def _run_regime_detection(self):
        """Classify the current regime and publish it on ``REGIME_TOPIC``.

        A symbol is analysed when its open price is non-zero. It counts as
        bullish or bearish when its open-to-close change reaches
        ``PRICE_CHANGE_THRESHOLD`` in either direction. The regime is BULL
        when the bullish share reaches ``bull_threshold``, BEAR when the
        bearish share reaches ``bear_threshold``, otherwise SIDEWAYS.

        The published tuple is ``(timestamp_ms, regime, bull_ratio,
        bear_ratio, sideways_ratio, analyzed, total_symbols, confidence)``.
        """
        self.regime_calculations += 1

        total_symbols = self.active_symbols
        if total_symbols == 0:
            return

        # Vectorised over the active slots; this gives the same counts as
        # checking each symbol in turn.
        opens = self.open_prices[:total_symbols]
        closes = self.close_prices[:total_symbols]
        priced = opens != 0
        analyzed = int(np.count_nonzero(priced))
        if analyzed == 0:
            return

        valid_opens = opens[priced]
        changes = (closes[priced] - valid_opens) / valid_opens
        bullish = int(np.count_nonzero(changes >= self.PRICE_CHANGE_THRESHOLD))
        bearish = int(np.count_nonzero(changes <= -self.PRICE_CHANGE_THRESHOLD))

        bull_ratio = bullish / analyzed
        bear_ratio = bearish / analyzed
        # Derived from counts so float subtraction cannot yield a tiny
        # negative share.
        sideways_ratio = (analyzed - bullish - bearish) / analyzed

        if bull_ratio >= self.bull_threshold:
            regime = MarketRegime.BULL
        elif bear_ratio >= self.bear_threshold:
            regime = MarketRegime.BEAR
        else:
            regime = MarketRegime.SIDEWAYS

        confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)

        self.previous_bull_ratio = bull_ratio

        try:
            if hasattr(self, 'msgbus') and self.msgbus:
                regime_tuple = (
                    int(time.time() * 1000),
                    regime.value,
                    float(bull_ratio),
                    float(bear_ratio),
                    float(sideways_ratio),
                    int(analyzed),
                    int(total_symbols),
                    float(confidence),
                )

                self.msgbus.publish(REGIME_TOPIC, regime_tuple)

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")

        # Log (and remember) the regime only when it differs from the last one.
        if not self.regime_history or regime != self.regime_history[-1]:
            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
                          f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
                          f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
            self.regime_history.append(regime)

        # Status line every 10 calculations, even without a change.
        if self.regime_calculations % 10 == 0:
            self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
                          f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")

    def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
                              analyzed: int, total: int) -> float:
        """Return a confidence score in ``[0.0, 1.0]`` for a regime reading.

        The score is a weighted sum of four terms: decisiveness of the
        dominant side (40%), share of tracked symbols analysed (10%), a
        z-score based significance term capped at 1.0 (30%), and the combined
        bullish plus bearish share (20%). Returns 0.0 when nothing was
        analysed; coverage is treated as 0.0 when ``total`` is not positive.
        """
        if analyzed == 0:
            return 0.0

        # Market decisiveness: distance of the dominant share from 50%.
        max_ratio = max(bull_ratio, bear_ratio)
        decisiveness = abs(max_ratio - 0.5) * 2

        # Sample coverage, guarded against a zero denominator.
        coverage = analyzed / total if total > 0 else 0.0

        # Statistical significance of the dominant share.
        if 0 < max_ratio < 1:
            standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed)
        else:
            standard_error = 0.0

        # The floor of 0.001 keeps the division finite when the error is 0.
        z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001)
        statistical_confidence = min(z_score / 3.0, 1.0)

        # Market clarity: share of symbols that took a side.
        market_clarity = bull_ratio + bear_ratio

        confidence = (
            decisiveness * 0.40 +
            coverage * 0.10 +
            statistical_confidence * 0.30 +
            market_clarity * 0.20
        )

        return max(0.0, min(confidence, 1.0))
|
|
|
|
class SILOQYNormalizerActor(Actor):
    """Convert raw tick tuples from ``RAW_TOPIC`` into ``SiloqyTradeTick`` objects.

    Each structured tick is published on ``STRUCTURED_TOPIC`` through the
    message bus, or through ``publish_data`` when no message bus is available.
    """

    def __init__(self, config: SILOQYNormalizerConfig):
        """Initialise the tick counter and the metrics timestamp."""
        super().__init__(config)

        self.normalized_count = 0
        self.last_metric_log = time.time_ns()

        self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor")

    def on_start(self) -> None:
        """Subscribe ``handle_raw_tick`` to ``RAW_TOPIC`` when a message bus is available."""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events")

        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")

    def on_stop(self) -> None:
        """Log the stop; this actor performs no cleanup of its own."""
        self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping")

    def handle_raw_tick(self, data):
        """Build a ``SiloqyTradeTick`` from one raw tuple and publish it.

        ``data`` must unpack into ``(symbol, price, size, ts_event,
        open_price, candle_open_time)``. A missing ``open_price`` falls back
        to the tick price and a missing ``candle_open_time`` to the current
        time in milliseconds. Every failure is logged instead of raised.
        """
        try:
            symbol, price, size, ts_event, open_price, candle_open_time = data
            tick = SiloqyTradeTick(
                instrument_id=symbol,
                price=round(float(price), 8),
                size=round(float(size), 8),
                ts_event=int(ts_event),
                ts_init=int(time.time_ns()),
                open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8),
                candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000),
                tick_side=None,  # not supplied by the raw tuple
                exchange="BINANCE",
                liquidity_flag=None,  # not supplied by the raw tuple
            )

            if hasattr(self, 'msgbus') and self.msgbus:
                try:
                    self.msgbus.publish(STRUCTURED_TOPIC, tick)
                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")
            elif hasattr(self, 'publish_data'):
                # Fallback when no message bus is available.
                try:
                    # DataType is not among the imports visible at the top of
                    # this file, so it is imported here; without it this
                    # branch could only raise NameError.
                    from nautilus_trader.model.data import DataType

                    self.publish_data(DataType(SiloqyTradeTick), tick)
                except Exception as e:
                    self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}")

            self.normalized_count += 1

            # Metrics line at most once per second (time is in nanoseconds).
            now = time.time_ns()
            if now - self.last_metric_log > 1_000_000_000:
                self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks")
                self.last_metric_log = now

        except Exception as e:
            self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}")
|
|
|
|
def test_siloqy_actors_with_nautilus_process_management():
    """Build and run a TradingNode hosting the four SILOQY / DOLPHIN actors.

    The node is configured with the symbol discovery, main, regime and
    normalizer actors (throttle mode enabled) and no data or execution
    clients. ``KeyboardInterrupt`` ends the run with a completion message;
    any other error is printed with its traceback. The node is always
    disposed afterwards, and a failure to dispose is reported rather than
    silently ignored.
    """
    print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management")

    # Throttle mode is enabled throughout for dev/testing.
    symbol_discovery_config = ImportableActorConfig(
        actor_path="__main__:SILOQYSymbolDiscoveryActor",
        config_path="__main__:SILOQYSymbolDiscoveryConfig",
        config={
            "component_id": "SILOQY-SYMBOL-DISCOVERY",
            "symbols": [],  # Empty for dynamic discovery
            "candle_interval_ms": 15 * 60 * 1000,
            "throttle_mode": True,  # Safe for dual instance testing
            "throttle_rate_limit_seconds": 10.0,  # 10s between batches (vs 2.5s)
            "max_symbols_throttled": 100,  # Only 100 symbols (vs 2000+)
        },
    )

    main_actor_config = ImportableActorConfig(
        actor_path="__main__:SILOQYMainActor",
        config_path="__main__:SILOQYMainActorConfig",
        config={
            "component_id": "SILOQY-MAIN-ACTOR",
            "candle_interval_ms": 15 * 60 * 1000,
            "throttle_mode": True,  # Reduced tick generation
            "enable_real_data": True,  # True for real WebSocket data
        },
    )

    regime_actor_config = ImportableActorConfig(
        actor_path="__main__:DOLPHINRegimeActor",
        config_path="__main__:DOLPHINRegimeActorConfig",
        config={
            "component_id": "DOLPHIN-REGIME-ACTOR",
            "max_symbols": 5000,
            "ticks_per_analysis": 250,  # Reduced for throttle mode testing
        },
    )

    normalizer_config = ImportableActorConfig(
        actor_path="__main__:SILOQYNormalizerActor",
        config_path="__main__:SILOQYNormalizerConfig",
        config={
            "component_id": "SILOQY-NORMALIZER",
        },
    )

    trading_config = TradingNodeConfig(
        trader_id=TraderId("SILOQY-TRADER-001"),
        actors=[
            symbol_discovery_config,
            main_actor_config,
            regime_actor_config,
            normalizer_config,
        ],
        data_clients={},  # No external data clients needed
        exec_clients={},  # No execution clients for this test
    )

    node = TradingNode(config=trading_config)

    try:
        node.build()
        print("Node built successfully with Nautilus built-in process management")
        node.run()

    except KeyboardInterrupt:
        print("\nSILOQY Actor test with Nautilus process management completed!")
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
    finally:
        # Disposal stays best-effort, but a failure is now reported, and
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        try:
            node.dispose()
            print("Nautilus process management: Node disposed successfully")
        except Exception as dispose_error:
            print(f"Nautilus process management: Node dispose failed: {dispose_error}")
|
|
|
|
# Script entry point: run the multi-actor test when this file is executed
# directly rather than imported.
if __name__ == "__main__":
    test_siloqy_actors_with_nautilus_process_management()