feat(nautilus): initial integration

This commit is contained in:
2025-08-31 12:39:26 +02:00
parent 34932dda3d
commit 9f1dbffc2a

View File

@@ -3,10 +3,11 @@ import numpy as np
import asyncio
import json
import math
import httpx
import websockets
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from collections import deque
import httpx
import random # Added for _simulate_websocket_ticks
# Nautilus imports - following test pattern
@@ -44,6 +45,239 @@ class MarketRegime(Enum):
BEAR = "BEAR"
TRANSITION = "TRANSITION"
SIDEWAYS = "SIDEWAYS"
# ============================================================================
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
# ============================================================================
@dataclass
class TickData:
"""Universal tick data structure from standalone"""
symbol: str
price: float
quantity: float
timestamp: int
is_buyer_maker: bool
trade_id: int
exchange: str
raw_data: Dict = None
def __post_init__(self):
if self.raw_data is None:
self.raw_data = {}
@dataclass
class ExchangeConfig:
"""Exchange-specific configuration from standalone"""
name: str
websocket_limit_per_second: int
max_streams_per_connection: int
max_connections_per_window: int
connection_window_seconds: int
base_websocket_url: str
requires_auth: bool = False
ping_interval: float = 20.0
ping_timeout: float = 10.0
class BinanceAdapter:
"""EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
def __init__(self):
self.config = ExchangeConfig(
name="binance",
websocket_limit_per_second=10,
max_streams_per_connection=1024,
max_connections_per_window=300,
connection_window_seconds=300, # 5 minutes
base_websocket_url="wss://stream.binance.com:9443",
requires_auth=False
)
self.logger = Logger(f"BINANCE_ADAPTER")
def get_websocket_url(self, symbols: List[str]) -> str:
"""Build Binance WebSocket URL with combined streams"""
if len(symbols) == 1:
symbol = symbols[0].lower()
return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
else:
# Combined stream approach
streams = [f"{symbol.lower()}@trade" for symbol in symbols]
stream_string = "/".join(streams)
return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
def parse_message(self, message: str) -> Optional[TickData]:
"""EXTRACTED FROM STANDALONE: Parse Binance trade message"""
try:
data = json.loads(message)
# Handle combined stream format
if 'stream' in data and 'data' in data:
stream_name = data['stream']
trade_data = data['data']
else:
# Single stream format
trade_data = data
stream_name = trade_data.get('e', '')
# Only process trade events
if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
return None
return TickData(
symbol=trade_data['s'],
price=float(trade_data['p']),
quantity=float(trade_data['q']),
timestamp=int(trade_data['T']),
is_buyer_maker=trade_data['m'],
trade_id=int(trade_data['t']),
exchange="binance",
raw_data=trade_data
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
self.logger.error(f"Failed to parse Binance message: {e}")
return None
def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
"""EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
# Conservative: leave room for message bursts
safety_margin = 0.7
symbols_per_connection = int(
min(
self.config.max_streams_per_connection * safety_margin,
self.config.websocket_limit_per_second * 5 # 5-second buffer
)
)
# Group symbols into chunks
symbol_groups = []
for i in range(0, len(symbols), symbols_per_connection):
group = symbols[i:i + symbols_per_connection]
symbol_groups.append(group)
return symbol_groups
class SiloqyWebSocketConnection:
"""EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
def __init__(self, connection_id: str, symbols: List[str],
binance_adapter: BinanceAdapter, main_actor_ref):
self.connection_id = connection_id
self.symbols = symbols
self.adapter = binance_adapter
self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access
self.websocket = None
self.is_running = False
self.retry_count = 0
self.messages_received = 0
self.last_message_time = 0.0
self.connected_at = None
self.logger = Logger(f"WS_{connection_id}")
self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
async def start(self) -> None:
"""Start the WebSocket connection"""
self.is_running = True
while self.is_running:
try:
await self._connect_and_run()
except Exception as e:
self.logger.error(f"Connection {self.connection_id} error: {e}")
if self.is_running:
await self._handle_reconnection()
else:
break
async def _connect_and_run(self) -> None:
"""EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
url = self.adapter.get_websocket_url(self.symbols)
self.logger.info(f"Connecting to Binance: {url[:100]}...")
async with websockets.connect(
url,
ping_interval=self.adapter.config.ping_interval,
ping_timeout=self.adapter.config.ping_timeout,
close_timeout=10
) as websocket:
self.websocket = websocket
self.connected_at = time.time()
self.retry_count = 0
self.logger.info(f"Connection {self.connection_id} established")
# Message processing loop
async for message in websocket:
if not self.is_running:
break
try:
await self._process_message(message)
except Exception as e:
self.logger.error(f"Message processing error: {e}")
async def _process_message(self, message: str) -> None:
"""Process incoming WebSocket message and convert to Nautilus format"""
self.messages_received += 1
self.last_message_time = time.time()
# Parse message using Binance adapter
tick_data = self.adapter.parse_message(message)
if tick_data:
# Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
try:
# Create SiloqyTradeTick object
siloqy_tick = SiloqyTradeTick(
instrument_id=tick_data.symbol,
price=round(tick_data.price, 8),
size=round(tick_data.quantity, 8),
ts_event=tick_data.timestamp * 1000000, # Convert ms to ns
ts_init=int(time.time_ns()),
open_price=None, # Will be set by candle logic
candle_open_time=None, # Will be set by candle logic
tick_side=None,
exchange="BINANCE",
liquidity_flag=None
)
# Get candle information from main actor
if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
symbol = tick_data.symbol
if symbol in self.main_actor.active_candles:
candle = self.main_actor.active_candles[symbol]
siloqy_tick.open_price = candle.get('open_price', tick_data.price)
siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
# Process through main actor's tick handling (preserves all candle logic)
self.main_actor.on_websocket_tick(
tick_data.symbol,
tick_data.price,
tick_data.quantity,
tick_data.timestamp
)
except Exception as e:
self.logger.error(f"Failed to process tick: {e}")
async def _handle_reconnection(self) -> None:
"""Handle reconnection with exponential backoff"""
self.retry_count += 1
delay = min(2 ** self.retry_count, 60) # Max 60 seconds
self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
await asyncio.sleep(delay)
async def stop(self) -> None:
"""Stop the connection"""
self.logger.info(f"Stopping connection {self.connection_id}")
self.is_running = False
if self.websocket:
await self.websocket.close()
# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
@@ -95,6 +329,7 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
class SILOQYMainActorConfig(ActorConfig):
candle_interval_ms: int = 15 * 60 * 1000
throttle_mode: bool = False
enable_real_data: bool = False # NEW: Enable real WebSocket data
class DOLPHINRegimeActorConfig(ActorConfig):
max_symbols: int = 5000
@@ -200,6 +435,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
async def _fetch_stats_and_reconstruct_candles(self):
"""PRESERVED: All original rate limiting with Nautilus async patterns"""
url = "https://api.binance.com/api/v3/ticker/24hr"
klines_url = "https://api.binance.com/api/v3/klines"
ticker_url = "https://api.binance.com/api/v3/ticker/price"
@@ -315,7 +551,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
except Exception as e:
self.log.error(f"Request failed: {str(e)}")
# PRESERVED: Original rate limiting
# PRESERVED: Original rate limiting with Nautilus async
elapsed = time.time() - start_time
if i + 50 < len(self.symbols):
sleep_time = max(0, rate_limit_interval - elapsed)
@@ -353,6 +589,7 @@ class SILOQYMainActor(Actor):
# Preserve original configuration
self.candle_interval_ms = config.candle_interval_ms
self.throttle_mode = config.throttle_mode
self.enable_real_data = config.enable_real_data # NEW: Real data capability
self.connections = {}
self.connection_tasks = {}
@@ -360,6 +597,11 @@ class SILOQYMainActor(Actor):
self.symbols = []
self.active_candles = {}
# WebSocket infrastructure - NEW
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
self.websocket_tasks: Dict[str, asyncio.Task] = {}
# WebSocket tasks - managed by Nautilus ActorExecutor
self.ws_tasks = []
@@ -369,6 +611,11 @@ class SILOQYMainActor(Actor):
if self.throttle_mode:
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
if self.enable_real_data:
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
else:
self.log.info("SIMULATION MODE: Will generate simulated ticks")
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None:
@@ -393,6 +640,12 @@ class SILOQYMainActor(Actor):
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
# Stop WebSocket connections if running
if self.enable_real_data:
for conn in self.websocket_connections.values():
asyncio.create_task(conn.stop())
# Nautilus kernel handles executor shutdown - no manual cleanup needed
async def on_start_async(self) -> None:
@@ -401,8 +654,11 @@ class SILOQYMainActor(Actor):
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
# PRESERVED: Start WebSocket connections
await self._start_websocket_connections()
# ENHANCED: Start real or simulated connections based on configuration
if self.enable_real_data:
await self._start_real_websocket_connections()
else:
await self._start_simulated_connections()
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
@@ -423,7 +679,6 @@ class SILOQYMainActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
# FIXED: SILOQYMainActor.handle_candles_initial
def handle_candles_initial(self, data):
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
try:
@@ -448,8 +703,53 @@ class SILOQYMainActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
async def _start_websocket_connections(self):
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
async def _start_real_websocket_connections(self):
"""NEW: Start real Binance WebSocket connections with throttle control"""
self.log.info("Starting REAL Binance WebSocket connections for live data...")
# Apply throttle mode limits to WebSocket connections
symbols_to_use = self.symbols
if self.throttle_mode:
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
# Calculate optimal symbol distribution
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
self.log.info(f"Real WebSocket distribution:")
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
self.log.info(f" - Connections needed: {len(symbol_groups)}")
# Start WebSocket connections
for i, symbol_group in enumerate(symbol_groups):
connection_id = f"binance_real_{i:03d}"
connection = SiloqyWebSocketConnection(
connection_id=connection_id,
symbols=symbol_group,
binance_adapter=self.binance_adapter,
main_actor_ref=self
)
self.websocket_connections[connection_id] = connection
# Use Nautilus ActorExecutor for WebSocket task management
if hasattr(self, '_executor') and self._executor:
future = self._executor.submit(connection.start())
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
else:
task = asyncio.create_task(connection.start())
self.websocket_tasks[connection_id] = task
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
# Rate limit connection establishment
if i < len(symbol_groups) - 1:
await asyncio.sleep(1.0) # 1 second between connections
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
async def _start_simulated_connections(self):
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
self.log.info("Starting WebSocket simulation for tick generation...")
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
@@ -464,6 +764,7 @@ class SILOQYMainActor(Actor):
self.ws_tasks.append(task)
async def _simulate_websocket_ticks(self):
"""PRESERVED: Original simulation logic"""
# Adjust tick rate for throttle mode
tick_delay = 0.1 if self.throttle_mode else 0.01
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
@@ -489,7 +790,8 @@ class SILOQYMainActor(Actor):
log_interval = 500 if self.throttle_mode else 1000
if tick_count % log_interval == 0:
mode_indicator = "THROTTLE" if self.throttle_mode else ""
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
data_source = "REAL" if self.enable_real_data else "SIM"
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
await asyncio.sleep(tick_delay)
except asyncio.CancelledError:
@@ -880,7 +1182,8 @@ def test_siloqy_actors_with_nautilus_process_management():
config={
"component_id": "SILOQY-MAIN-ACTOR",
"candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True # ENABLED: Reduced tick generation
"throttle_mode": True, # ENABLED: Reduced tick generation
"enable_real_data": False # CHANGE TO True for real WebSocket data
}
)