Compare commits

1 Commits

Author SHA1 Message Date
9f1dbffc2a feat(nautilus): initial integration 2025-08-31 12:39:26 +02:00

View File

@@ -3,10 +3,11 @@ import numpy as np
import asyncio import asyncio
import json import json
import math import math
import httpx
import websockets
from typing import Dict, List, Optional, Tuple, Any from typing import Dict, List, Optional, Tuple, Any
from enum import Enum from enum import Enum
from collections import deque from collections import deque
import httpx
import random # Added for _simulate_websocket_ticks import random # Added for _simulate_websocket_ticks
# Nautilus imports - following test pattern # Nautilus imports - following test pattern
@@ -44,6 +45,239 @@ class MarketRegime(Enum):
BEAR = "BEAR" BEAR = "BEAR"
TRANSITION = "TRANSITION" TRANSITION = "TRANSITION"
SIDEWAYS = "SIDEWAYS" SIDEWAYS = "SIDEWAYS"
# ============================================================================
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
# ============================================================================
@dataclass
class TickData:
"""Universal tick data structure from standalone"""
symbol: str
price: float
quantity: float
timestamp: int
is_buyer_maker: bool
trade_id: int
exchange: str
raw_data: Dict = None
def __post_init__(self):
if self.raw_data is None:
self.raw_data = {}
@dataclass
class ExchangeConfig:
"""Exchange-specific configuration from standalone"""
name: str
websocket_limit_per_second: int
max_streams_per_connection: int
max_connections_per_window: int
connection_window_seconds: int
base_websocket_url: str
requires_auth: bool = False
ping_interval: float = 20.0
ping_timeout: float = 10.0
class BinanceAdapter:
"""EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
def __init__(self):
self.config = ExchangeConfig(
name="binance",
websocket_limit_per_second=10,
max_streams_per_connection=1024,
max_connections_per_window=300,
connection_window_seconds=300, # 5 minutes
base_websocket_url="wss://stream.binance.com:9443",
requires_auth=False
)
self.logger = Logger(f"BINANCE_ADAPTER")
def get_websocket_url(self, symbols: List[str]) -> str:
"""Build Binance WebSocket URL with combined streams"""
if len(symbols) == 1:
symbol = symbols[0].lower()
return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
else:
# Combined stream approach
streams = [f"{symbol.lower()}@trade" for symbol in symbols]
stream_string = "/".join(streams)
return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
def parse_message(self, message: str) -> Optional[TickData]:
"""EXTRACTED FROM STANDALONE: Parse Binance trade message"""
try:
data = json.loads(message)
# Handle combined stream format
if 'stream' in data and 'data' in data:
stream_name = data['stream']
trade_data = data['data']
else:
# Single stream format
trade_data = data
stream_name = trade_data.get('e', '')
# Only process trade events
if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
return None
return TickData(
symbol=trade_data['s'],
price=float(trade_data['p']),
quantity=float(trade_data['q']),
timestamp=int(trade_data['T']),
is_buyer_maker=trade_data['m'],
trade_id=int(trade_data['t']),
exchange="binance",
raw_data=trade_data
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
self.logger.error(f"Failed to parse Binance message: {e}")
return None
def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
"""EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
# Conservative: leave room for message bursts
safety_margin = 0.7
symbols_per_connection = int(
min(
self.config.max_streams_per_connection * safety_margin,
self.config.websocket_limit_per_second * 5 # 5-second buffer
)
)
# Group symbols into chunks
symbol_groups = []
for i in range(0, len(symbols), symbols_per_connection):
group = symbols[i:i + symbols_per_connection]
symbol_groups.append(group)
return symbol_groups
class SiloqyWebSocketConnection:
"""EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
def __init__(self, connection_id: str, symbols: List[str],
binance_adapter: BinanceAdapter, main_actor_ref):
self.connection_id = connection_id
self.symbols = symbols
self.adapter = binance_adapter
self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access
self.websocket = None
self.is_running = False
self.retry_count = 0
self.messages_received = 0
self.last_message_time = 0.0
self.connected_at = None
self.logger = Logger(f"WS_{connection_id}")
self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
async def start(self) -> None:
"""Start the WebSocket connection"""
self.is_running = True
while self.is_running:
try:
await self._connect_and_run()
except Exception as e:
self.logger.error(f"Connection {self.connection_id} error: {e}")
if self.is_running:
await self._handle_reconnection()
else:
break
async def _connect_and_run(self) -> None:
"""EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
url = self.adapter.get_websocket_url(self.symbols)
self.logger.info(f"Connecting to Binance: {url[:100]}...")
async with websockets.connect(
url,
ping_interval=self.adapter.config.ping_interval,
ping_timeout=self.adapter.config.ping_timeout,
close_timeout=10
) as websocket:
self.websocket = websocket
self.connected_at = time.time()
self.retry_count = 0
self.logger.info(f"Connection {self.connection_id} established")
# Message processing loop
async for message in websocket:
if not self.is_running:
break
try:
await self._process_message(message)
except Exception as e:
self.logger.error(f"Message processing error: {e}")
async def _process_message(self, message: str) -> None:
"""Process incoming WebSocket message and convert to Nautilus format"""
self.messages_received += 1
self.last_message_time = time.time()
# Parse message using Binance adapter
tick_data = self.adapter.parse_message(message)
if tick_data:
# Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
try:
# Create SiloqyTradeTick object
siloqy_tick = SiloqyTradeTick(
instrument_id=tick_data.symbol,
price=round(tick_data.price, 8),
size=round(tick_data.quantity, 8),
ts_event=tick_data.timestamp * 1000000, # Convert ms to ns
ts_init=int(time.time_ns()),
open_price=None, # Will be set by candle logic
candle_open_time=None, # Will be set by candle logic
tick_side=None,
exchange="BINANCE",
liquidity_flag=None
)
# Get candle information from main actor
if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
symbol = tick_data.symbol
if symbol in self.main_actor.active_candles:
candle = self.main_actor.active_candles[symbol]
siloqy_tick.open_price = candle.get('open_price', tick_data.price)
siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
# Process through main actor's tick handling (preserves all candle logic)
self.main_actor.on_websocket_tick(
tick_data.symbol,
tick_data.price,
tick_data.quantity,
tick_data.timestamp
)
except Exception as e:
self.logger.error(f"Failed to process tick: {e}")
async def _handle_reconnection(self) -> None:
"""Handle reconnection with exponential backoff"""
self.retry_count += 1
delay = min(2 ** self.retry_count, 60) # Max 60 seconds
self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
await asyncio.sleep(delay)
async def stop(self) -> None:
"""Stop the connection"""
self.logger.info(f"Stopping connection {self.connection_id}")
self.is_running = False
if self.websocket:
await self.websocket.close()
# -------------------------------------------------------------------- # --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes # SILOQY Custom Tick - PRESERVED with fixes
@@ -95,6 +329,7 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
class SILOQYMainActorConfig(ActorConfig): class SILOQYMainActorConfig(ActorConfig):
candle_interval_ms: int = 15 * 60 * 1000 candle_interval_ms: int = 15 * 60 * 1000
throttle_mode: bool = False throttle_mode: bool = False
enable_real_data: bool = False # NEW: Enable real WebSocket data
class DOLPHINRegimeActorConfig(ActorConfig): class DOLPHINRegimeActorConfig(ActorConfig):
max_symbols: int = 5000 max_symbols: int = 5000
@@ -200,6 +435,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
raise Exception(f"Failed to fetch exchange info: {response.status_code}") raise Exception(f"Failed to fetch exchange info: {response.status_code}")
async def _fetch_stats_and_reconstruct_candles(self): async def _fetch_stats_and_reconstruct_candles(self):
"""PRESERVED: All original rate limiting with Nautilus async patterns"""
url = "https://api.binance.com/api/v3/ticker/24hr" url = "https://api.binance.com/api/v3/ticker/24hr"
klines_url = "https://api.binance.com/api/v3/klines" klines_url = "https://api.binance.com/api/v3/klines"
ticker_url = "https://api.binance.com/api/v3/ticker/price" ticker_url = "https://api.binance.com/api/v3/ticker/price"
@@ -315,7 +551,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
except Exception as e: except Exception as e:
self.log.error(f"Request failed: {str(e)}") self.log.error(f"Request failed: {str(e)}")
# PRESERVED: Original rate limiting # PRESERVED: Original rate limiting with Nautilus async
elapsed = time.time() - start_time elapsed = time.time() - start_time
if i + 50 < len(self.symbols): if i + 50 < len(self.symbols):
sleep_time = max(0, rate_limit_interval - elapsed) sleep_time = max(0, rate_limit_interval - elapsed)
@@ -353,6 +589,7 @@ class SILOQYMainActor(Actor):
# Preserve original configuration # Preserve original configuration
self.candle_interval_ms = config.candle_interval_ms self.candle_interval_ms = config.candle_interval_ms
self.throttle_mode = config.throttle_mode self.throttle_mode = config.throttle_mode
self.enable_real_data = config.enable_real_data # NEW: Real data capability
self.connections = {} self.connections = {}
self.connection_tasks = {} self.connection_tasks = {}
@@ -360,6 +597,11 @@ class SILOQYMainActor(Actor):
self.symbols = [] self.symbols = []
self.active_candles = {} self.active_candles = {}
# WebSocket infrastructure - NEW
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
self.websocket_tasks: Dict[str, asyncio.Task] = {}
# WebSocket tasks - managed by Nautilus ActorExecutor # WebSocket tasks - managed by Nautilus ActorExecutor
self.ws_tasks = [] self.ws_tasks = []
@@ -369,6 +611,11 @@ class SILOQYMainActor(Actor):
if self.throttle_mode: if self.throttle_mode:
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
if self.enable_real_data:
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
else:
self.log.info("SIMULATION MODE: Will generate simulated ticks")
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None: def on_start(self) -> None:
@@ -393,6 +640,12 @@ class SILOQYMainActor(Actor):
def on_stop(self) -> None: def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup""" """Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
# Stop WebSocket connections if running
if self.enable_real_data:
for conn in self.websocket_connections.values():
asyncio.create_task(conn.stop())
# Nautilus kernel handles executor shutdown - no manual cleanup needed # Nautilus kernel handles executor shutdown - no manual cleanup needed
async def on_start_async(self) -> None: async def on_start_async(self) -> None:
@@ -401,8 +654,11 @@ class SILOQYMainActor(Actor):
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
# PRESERVED: Start WebSocket connections # ENHANCED: Start real or simulated connections based on configuration
await self._start_websocket_connections() if self.enable_real_data:
await self._start_real_websocket_connections()
else:
await self._start_simulated_connections()
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
@@ -423,7 +679,6 @@ class SILOQYMainActor(Actor):
except Exception as e: except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
# FIXED: SILOQYMainActor.handle_candles_initial
def handle_candles_initial(self, data): def handle_candles_initial(self, data):
"""Handle initial candles from discovery actor and properly initialize the candle dict.""" """Handle initial candles from discovery actor and properly initialize the candle dict."""
try: try:
@@ -448,8 +703,53 @@ class SILOQYMainActor(Actor):
except Exception as e: except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
async def _start_websocket_connections(self): async def _start_real_websocket_connections(self):
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor""" """NEW: Start real Binance WebSocket connections with throttle control"""
self.log.info("Starting REAL Binance WebSocket connections for live data...")
# Apply throttle mode limits to WebSocket connections
symbols_to_use = self.symbols
if self.throttle_mode:
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
# Calculate optimal symbol distribution
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
self.log.info(f"Real WebSocket distribution:")
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
self.log.info(f" - Connections needed: {len(symbol_groups)}")
# Start WebSocket connections
for i, symbol_group in enumerate(symbol_groups):
connection_id = f"binance_real_{i:03d}"
connection = SiloqyWebSocketConnection(
connection_id=connection_id,
symbols=symbol_group,
binance_adapter=self.binance_adapter,
main_actor_ref=self
)
self.websocket_connections[connection_id] = connection
# Use Nautilus ActorExecutor for WebSocket task management
if hasattr(self, '_executor') and self._executor:
future = self._executor.submit(connection.start())
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
else:
task = asyncio.create_task(connection.start())
self.websocket_tasks[connection_id] = task
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
# Rate limit connection establishment
if i < len(symbol_groups) - 1:
await asyncio.sleep(1.0) # 1 second between connections
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
async def _start_simulated_connections(self):
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
self.log.info("Starting WebSocket simulation for tick generation...") self.log.info("Starting WebSocket simulation for tick generation...")
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
@@ -464,6 +764,7 @@ class SILOQYMainActor(Actor):
self.ws_tasks.append(task) self.ws_tasks.append(task)
async def _simulate_websocket_ticks(self): async def _simulate_websocket_ticks(self):
"""PRESERVED: Original simulation logic"""
# Adjust tick rate for throttle mode # Adjust tick rate for throttle mode
tick_delay = 0.1 if self.throttle_mode else 0.01 tick_delay = 0.1 if self.throttle_mode else 0.01
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
@@ -489,7 +790,8 @@ class SILOQYMainActor(Actor):
log_interval = 500 if self.throttle_mode else 1000 log_interval = 500 if self.throttle_mode else 1000
if tick_count % log_interval == 0: if tick_count % log_interval == 0:
mode_indicator = "THROTTLE" if self.throttle_mode else "" mode_indicator = "THROTTLE" if self.throttle_mode else ""
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}") data_source = "REAL" if self.enable_real_data else "SIM"
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
await asyncio.sleep(tick_delay) await asyncio.sleep(tick_delay)
except asyncio.CancelledError: except asyncio.CancelledError:
@@ -880,7 +1182,8 @@ def test_siloqy_actors_with_nautilus_process_management():
config={ config={
"component_id": "SILOQY-MAIN-ACTOR", "component_id": "SILOQY-MAIN-ACTOR",
"candle_interval_ms": 15 * 60 * 1000, "candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True # ENABLED: Reduced tick generation "throttle_mode": True, # ENABLED: Reduced tick generation
"enable_real_data": False # CHANGE TO True for real WebSocket data
} }
) )