Compare commits
1 Commits
main
...
9f1dbffc2a
| Author | SHA1 | Date | |
|---|---|---|---|
| 9f1dbffc2a |
@@ -3,10 +3,11 @@ import numpy as np
|
||||
import asyncio
|
||||
import json
|
||||
import math
|
||||
import httpx
|
||||
import websockets
|
||||
from typing import Dict, List, Optional, Tuple, Any
|
||||
from enum import Enum
|
||||
from collections import deque
|
||||
import httpx
|
||||
import random # Added for _simulate_websocket_ticks
|
||||
|
||||
# Nautilus imports - following test pattern
|
||||
@@ -45,6 +46,239 @@ class MarketRegime(Enum):
|
||||
TRANSITION = "TRANSITION"
|
||||
SIDEWAYS = "SIDEWAYS"
|
||||
|
||||
# ============================================================================
|
||||
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
|
||||
# ============================================================================
|
||||
|
||||
@dataclass
|
||||
class TickData:
|
||||
"""Universal tick data structure from standalone"""
|
||||
symbol: str
|
||||
price: float
|
||||
quantity: float
|
||||
timestamp: int
|
||||
is_buyer_maker: bool
|
||||
trade_id: int
|
||||
exchange: str
|
||||
raw_data: Dict = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.raw_data is None:
|
||||
self.raw_data = {}
|
||||
|
||||
@dataclass
|
||||
class ExchangeConfig:
|
||||
"""Exchange-specific configuration from standalone"""
|
||||
name: str
|
||||
websocket_limit_per_second: int
|
||||
max_streams_per_connection: int
|
||||
max_connections_per_window: int
|
||||
connection_window_seconds: int
|
||||
base_websocket_url: str
|
||||
requires_auth: bool = False
|
||||
ping_interval: float = 20.0
|
||||
ping_timeout: float = 10.0
|
||||
|
||||
class BinanceAdapter:
|
||||
"""EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
|
||||
|
||||
def __init__(self):
|
||||
self.config = ExchangeConfig(
|
||||
name="binance",
|
||||
websocket_limit_per_second=10,
|
||||
max_streams_per_connection=1024,
|
||||
max_connections_per_window=300,
|
||||
connection_window_seconds=300, # 5 minutes
|
||||
base_websocket_url="wss://stream.binance.com:9443",
|
||||
requires_auth=False
|
||||
)
|
||||
self.logger = Logger(f"BINANCE_ADAPTER")
|
||||
|
||||
def get_websocket_url(self, symbols: List[str]) -> str:
|
||||
"""Build Binance WebSocket URL with combined streams"""
|
||||
if len(symbols) == 1:
|
||||
symbol = symbols[0].lower()
|
||||
return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
|
||||
else:
|
||||
# Combined stream approach
|
||||
streams = [f"{symbol.lower()}@trade" for symbol in symbols]
|
||||
stream_string = "/".join(streams)
|
||||
return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
|
||||
|
||||
def parse_message(self, message: str) -> Optional[TickData]:
|
||||
"""EXTRACTED FROM STANDALONE: Parse Binance trade message"""
|
||||
try:
|
||||
data = json.loads(message)
|
||||
|
||||
# Handle combined stream format
|
||||
if 'stream' in data and 'data' in data:
|
||||
stream_name = data['stream']
|
||||
trade_data = data['data']
|
||||
else:
|
||||
# Single stream format
|
||||
trade_data = data
|
||||
stream_name = trade_data.get('e', '')
|
||||
|
||||
# Only process trade events
|
||||
if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
|
||||
return None
|
||||
|
||||
return TickData(
|
||||
symbol=trade_data['s'],
|
||||
price=float(trade_data['p']),
|
||||
quantity=float(trade_data['q']),
|
||||
timestamp=int(trade_data['T']),
|
||||
is_buyer_maker=trade_data['m'],
|
||||
trade_id=int(trade_data['t']),
|
||||
exchange="binance",
|
||||
raw_data=trade_data
|
||||
)
|
||||
|
||||
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
||||
self.logger.error(f"Failed to parse Binance message: {e}")
|
||||
return None
|
||||
|
||||
def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
|
||||
"""EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
|
||||
# Conservative: leave room for message bursts
|
||||
safety_margin = 0.7
|
||||
symbols_per_connection = int(
|
||||
min(
|
||||
self.config.max_streams_per_connection * safety_margin,
|
||||
self.config.websocket_limit_per_second * 5 # 5-second buffer
|
||||
)
|
||||
)
|
||||
|
||||
# Group symbols into chunks
|
||||
symbol_groups = []
|
||||
for i in range(0, len(symbols), symbols_per_connection):
|
||||
group = symbols[i:i + symbols_per_connection]
|
||||
symbol_groups.append(group)
|
||||
|
||||
return symbol_groups
|
||||
|
||||
class SiloqyWebSocketConnection:
|
||||
"""EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
|
||||
|
||||
def __init__(self, connection_id: str, symbols: List[str],
|
||||
binance_adapter: BinanceAdapter, main_actor_ref):
|
||||
self.connection_id = connection_id
|
||||
self.symbols = symbols
|
||||
self.adapter = binance_adapter
|
||||
self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access
|
||||
|
||||
self.websocket = None
|
||||
self.is_running = False
|
||||
self.retry_count = 0
|
||||
self.messages_received = 0
|
||||
self.last_message_time = 0.0
|
||||
self.connected_at = None
|
||||
|
||||
self.logger = Logger(f"WS_{connection_id}")
|
||||
self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the WebSocket connection"""
|
||||
self.is_running = True
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
await self._connect_and_run()
|
||||
except Exception as e:
|
||||
self.logger.error(f"Connection {self.connection_id} error: {e}")
|
||||
|
||||
if self.is_running:
|
||||
await self._handle_reconnection()
|
||||
else:
|
||||
break
|
||||
|
||||
async def _connect_and_run(self) -> None:
|
||||
"""EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
|
||||
url = self.adapter.get_websocket_url(self.symbols)
|
||||
|
||||
self.logger.info(f"Connecting to Binance: {url[:100]}...")
|
||||
|
||||
async with websockets.connect(
|
||||
url,
|
||||
ping_interval=self.adapter.config.ping_interval,
|
||||
ping_timeout=self.adapter.config.ping_timeout,
|
||||
close_timeout=10
|
||||
) as websocket:
|
||||
self.websocket = websocket
|
||||
self.connected_at = time.time()
|
||||
self.retry_count = 0
|
||||
|
||||
self.logger.info(f"Connection {self.connection_id} established")
|
||||
|
||||
# Message processing loop
|
||||
async for message in websocket:
|
||||
if not self.is_running:
|
||||
break
|
||||
|
||||
try:
|
||||
await self._process_message(message)
|
||||
except Exception as e:
|
||||
self.logger.error(f"Message processing error: {e}")
|
||||
|
||||
async def _process_message(self, message: str) -> None:
|
||||
"""Process incoming WebSocket message and convert to Nautilus format"""
|
||||
self.messages_received += 1
|
||||
self.last_message_time = time.time()
|
||||
|
||||
# Parse message using Binance adapter
|
||||
tick_data = self.adapter.parse_message(message)
|
||||
if tick_data:
|
||||
# Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
|
||||
try:
|
||||
# Create SiloqyTradeTick object
|
||||
siloqy_tick = SiloqyTradeTick(
|
||||
instrument_id=tick_data.symbol,
|
||||
price=round(tick_data.price, 8),
|
||||
size=round(tick_data.quantity, 8),
|
||||
ts_event=tick_data.timestamp * 1000000, # Convert ms to ns
|
||||
ts_init=int(time.time_ns()),
|
||||
open_price=None, # Will be set by candle logic
|
||||
candle_open_time=None, # Will be set by candle logic
|
||||
tick_side=None,
|
||||
exchange="BINANCE",
|
||||
liquidity_flag=None
|
||||
)
|
||||
|
||||
# Get candle information from main actor
|
||||
if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
|
||||
symbol = tick_data.symbol
|
||||
if symbol in self.main_actor.active_candles:
|
||||
candle = self.main_actor.active_candles[symbol]
|
||||
siloqy_tick.open_price = candle.get('open_price', tick_data.price)
|
||||
siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
|
||||
|
||||
# Process through main actor's tick handling (preserves all candle logic)
|
||||
self.main_actor.on_websocket_tick(
|
||||
tick_data.symbol,
|
||||
tick_data.price,
|
||||
tick_data.quantity,
|
||||
tick_data.timestamp
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Failed to process tick: {e}")
|
||||
|
||||
async def _handle_reconnection(self) -> None:
|
||||
"""Handle reconnection with exponential backoff"""
|
||||
self.retry_count += 1
|
||||
delay = min(2 ** self.retry_count, 60) # Max 60 seconds
|
||||
|
||||
self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
|
||||
await asyncio.sleep(delay)
|
||||
|
||||
async def stop(self) -> None:
|
||||
"""Stop the connection"""
|
||||
self.logger.info(f"Stopping connection {self.connection_id}")
|
||||
self.is_running = False
|
||||
|
||||
if self.websocket:
|
||||
await self.websocket.close()
|
||||
|
||||
# --------------------------------------------------------------------
|
||||
# SILOQY Custom Tick - PRESERVED with fixes
|
||||
# --------------------------------------------------------------------
|
||||
@@ -95,6 +329,7 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
|
||||
class SILOQYMainActorConfig(ActorConfig):
|
||||
candle_interval_ms: int = 15 * 60 * 1000
|
||||
throttle_mode: bool = False
|
||||
enable_real_data: bool = False # NEW: Enable real WebSocket data
|
||||
|
||||
class DOLPHINRegimeActorConfig(ActorConfig):
|
||||
max_symbols: int = 5000
|
||||
@@ -200,6 +435,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
|
||||
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
|
||||
|
||||
async def _fetch_stats_and_reconstruct_candles(self):
|
||||
"""PRESERVED: All original rate limiting with Nautilus async patterns"""
|
||||
url = "https://api.binance.com/api/v3/ticker/24hr"
|
||||
klines_url = "https://api.binance.com/api/v3/klines"
|
||||
ticker_url = "https://api.binance.com/api/v3/ticker/price"
|
||||
@@ -315,7 +551,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
|
||||
except Exception as e:
|
||||
self.log.error(f"Request failed: {str(e)}")
|
||||
|
||||
# PRESERVED: Original rate limiting
|
||||
# PRESERVED: Original rate limiting with Nautilus async
|
||||
elapsed = time.time() - start_time
|
||||
if i + 50 < len(self.symbols):
|
||||
sleep_time = max(0, rate_limit_interval - elapsed)
|
||||
@@ -353,6 +589,7 @@ class SILOQYMainActor(Actor):
|
||||
# Preserve original configuration
|
||||
self.candle_interval_ms = config.candle_interval_ms
|
||||
self.throttle_mode = config.throttle_mode
|
||||
self.enable_real_data = config.enable_real_data # NEW: Real data capability
|
||||
self.connections = {}
|
||||
self.connection_tasks = {}
|
||||
|
||||
@@ -360,6 +597,11 @@ class SILOQYMainActor(Actor):
|
||||
self.symbols = []
|
||||
self.active_candles = {}
|
||||
|
||||
# WebSocket infrastructure - NEW
|
||||
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
|
||||
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
|
||||
self.websocket_tasks: Dict[str, asyncio.Task] = {}
|
||||
|
||||
# WebSocket tasks - managed by Nautilus ActorExecutor
|
||||
self.ws_tasks = []
|
||||
|
||||
@@ -369,6 +611,11 @@ class SILOQYMainActor(Actor):
|
||||
if self.throttle_mode:
|
||||
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
|
||||
|
||||
if self.enable_real_data:
|
||||
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
|
||||
else:
|
||||
self.log.info("SIMULATION MODE: Will generate simulated ticks")
|
||||
|
||||
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
|
||||
|
||||
def on_start(self) -> None:
|
||||
@@ -393,6 +640,12 @@ class SILOQYMainActor(Actor):
|
||||
def on_stop(self) -> None:
|
||||
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
|
||||
|
||||
# Stop WebSocket connections if running
|
||||
if self.enable_real_data:
|
||||
for conn in self.websocket_connections.values():
|
||||
asyncio.create_task(conn.stop())
|
||||
|
||||
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||
|
||||
async def on_start_async(self) -> None:
|
||||
@@ -401,8 +654,11 @@ class SILOQYMainActor(Actor):
|
||||
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
|
||||
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
|
||||
|
||||
# PRESERVED: Start WebSocket connections
|
||||
await self._start_websocket_connections()
|
||||
# ENHANCED: Start real or simulated connections based on configuration
|
||||
if self.enable_real_data:
|
||||
await self._start_real_websocket_connections()
|
||||
else:
|
||||
await self._start_simulated_connections()
|
||||
|
||||
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
|
||||
|
||||
@@ -423,7 +679,6 @@ class SILOQYMainActor(Actor):
|
||||
except Exception as e:
|
||||
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
|
||||
|
||||
# FIXED: SILOQYMainActor.handle_candles_initial
|
||||
def handle_candles_initial(self, data):
|
||||
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
|
||||
try:
|
||||
@@ -448,8 +703,53 @@ class SILOQYMainActor(Actor):
|
||||
except Exception as e:
|
||||
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
|
||||
|
||||
async def _start_websocket_connections(self):
|
||||
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
|
||||
async def _start_real_websocket_connections(self):
|
||||
"""NEW: Start real Binance WebSocket connections with throttle control"""
|
||||
self.log.info("Starting REAL Binance WebSocket connections for live data...")
|
||||
|
||||
# Apply throttle mode limits to WebSocket connections
|
||||
symbols_to_use = self.symbols
|
||||
if self.throttle_mode:
|
||||
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
|
||||
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
|
||||
|
||||
# Calculate optimal symbol distribution
|
||||
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
|
||||
|
||||
self.log.info(f"Real WebSocket distribution:")
|
||||
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
|
||||
self.log.info(f" - Connections needed: {len(symbol_groups)}")
|
||||
|
||||
# Start WebSocket connections
|
||||
for i, symbol_group in enumerate(symbol_groups):
|
||||
connection_id = f"binance_real_{i:03d}"
|
||||
|
||||
connection = SiloqyWebSocketConnection(
|
||||
connection_id=connection_id,
|
||||
symbols=symbol_group,
|
||||
binance_adapter=self.binance_adapter,
|
||||
main_actor_ref=self
|
||||
)
|
||||
|
||||
self.websocket_connections[connection_id] = connection
|
||||
|
||||
# Use Nautilus ActorExecutor for WebSocket task management
|
||||
if hasattr(self, '_executor') and self._executor:
|
||||
future = self._executor.submit(connection.start())
|
||||
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
|
||||
else:
|
||||
task = asyncio.create_task(connection.start())
|
||||
self.websocket_tasks[connection_id] = task
|
||||
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
|
||||
|
||||
# Rate limit connection establishment
|
||||
if i < len(symbol_groups) - 1:
|
||||
await asyncio.sleep(1.0) # 1 second between connections
|
||||
|
||||
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
|
||||
|
||||
async def _start_simulated_connections(self):
|
||||
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
|
||||
self.log.info("Starting WebSocket simulation for tick generation...")
|
||||
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
|
||||
|
||||
@@ -464,6 +764,7 @@ class SILOQYMainActor(Actor):
|
||||
self.ws_tasks.append(task)
|
||||
|
||||
async def _simulate_websocket_ticks(self):
|
||||
"""PRESERVED: Original simulation logic"""
|
||||
# Adjust tick rate for throttle mode
|
||||
tick_delay = 0.1 if self.throttle_mode else 0.01
|
||||
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
|
||||
@@ -489,7 +790,8 @@ class SILOQYMainActor(Actor):
|
||||
log_interval = 500 if self.throttle_mode else 1000
|
||||
if tick_count % log_interval == 0:
|
||||
mode_indicator = "THROTTLE" if self.throttle_mode else ""
|
||||
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
|
||||
data_source = "REAL" if self.enable_real_data else "SIM"
|
||||
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
|
||||
|
||||
await asyncio.sleep(tick_delay)
|
||||
except asyncio.CancelledError:
|
||||
@@ -880,7 +1182,8 @@ def test_siloqy_actors_with_nautilus_process_management():
|
||||
config={
|
||||
"component_id": "SILOQY-MAIN-ACTOR",
|
||||
"candle_interval_ms": 15 * 60 * 1000,
|
||||
"throttle_mode": True # ENABLED: Reduced tick generation
|
||||
"throttle_mode": True, # ENABLED: Reduced tick generation
|
||||
"enable_real_data": False # CHANGE TO True for real WebSocket data
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user