WIP: feat(nautilus): initial integration #1

Draft
hjnormey wants to merge 14 commits from feat/nautilus-dolphin-integration into main
4 changed files with 3637 additions and 61 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,13 +1,16 @@
import time
import sys
import numpy as np
import asyncio
import json
import math
import httpx
import websockets
from typing import Dict, List, Optional, Tuple, Any
from enum import Enum
from collections import deque
import httpx
import random # Added for _simulate_websocket_ticks
from dataclasses import dataclass, field
# Nautilus imports - following test pattern
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
@@ -34,6 +37,12 @@ STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
# ADDED LINE 18:
TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES"
# NEW: Enhanced indicator topics for data bus publishing
REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS"
BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS"
TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS"
# Rate limiting constant
MIN_INTERVAL = 2.5 # seconds between API batches
@@ -44,6 +53,239 @@ class MarketRegime(Enum):
BEAR = "BEAR"
TRANSITION = "TRANSITION"
SIDEWAYS = "SIDEWAYS"
# ============================================================================
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
# ============================================================================
@dataclass
class TickData:
"""Universal tick data structure from standalone"""
symbol: str
price: float
quantity: float
timestamp: int
is_buyer_maker: bool
trade_id: int
exchange: str
raw_data: Dict = None
def __post_init__(self):
if self.raw_data is None:
self.raw_data = {}
@dataclass
class ExchangeConfig:
"""Exchange-specific configuration from standalone"""
name: str
websocket_limit_per_second: int
max_streams_per_connection: int
max_connections_per_window: int
connection_window_seconds: int
base_websocket_url: str
requires_auth: bool = False
ping_interval: float = 20.0
ping_timeout: float = 10.0
class BinanceAdapter:
"""EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
def __init__(self):
self.config = ExchangeConfig(
name="binance",
websocket_limit_per_second=10,
max_streams_per_connection=1024,
max_connections_per_window=300,
connection_window_seconds=300, # 5 minutes
base_websocket_url="wss://stream.binance.com:9443",
requires_auth=False
)
self.logger = Logger(f"BINANCE_ADAPTER")
def get_websocket_url(self, symbols: List[str]) -> str:
"""Build Binance WebSocket URL with combined streams"""
if len(symbols) == 1:
symbol = symbols[0].lower()
return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
else:
# Combined stream approach
streams = [f"{symbol.lower()}@trade" for symbol in symbols]
stream_string = "/".join(streams)
return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
def parse_message(self, message: str) -> Optional[TickData]:
"""EXTRACTED FROM STANDALONE: Parse Binance trade message"""
try:
data = json.loads(message)
# Handle combined stream format
if 'stream' in data and 'data' in data:
stream_name = data['stream']
trade_data = data['data']
else:
# Single stream format
trade_data = data
stream_name = trade_data.get('e', '')
# Only process trade events
if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
return None
return TickData(
symbol=trade_data['s'],
price=float(trade_data['p']),
quantity=float(trade_data['q']),
timestamp=int(trade_data['T']),
is_buyer_maker=trade_data['m'],
trade_id=int(trade_data['t']),
exchange="binance",
raw_data=trade_data
)
except (json.JSONDecodeError, KeyError, ValueError) as e:
self.logger.error(f"Failed to parse Binance message: {e}")
return None
def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
"""EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
# Conservative: leave room for message bursts
safety_margin = 0.7
symbols_per_connection = int(
min(
self.config.max_streams_per_connection * safety_margin,
self.config.websocket_limit_per_second * 5 # 5-second buffer
)
)
# Group symbols into chunks
symbol_groups = []
for i in range(0, len(symbols), symbols_per_connection):
group = symbols[i:i + symbols_per_connection]
symbol_groups.append(group)
return symbol_groups
class SiloqyWebSocketConnection:
"""EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
def __init__(self, connection_id: str, symbols: List[str],
binance_adapter: BinanceAdapter, main_actor_ref):
self.connection_id = connection_id
self.symbols = symbols
self.adapter = binance_adapter
self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access
self.websocket = None
self.is_running = False
self.retry_count = 0
self.messages_received = 0
self.last_message_time = 0.0
self.connected_at = None
self.logger = Logger(f"WS_{connection_id}")
self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
async def start(self) -> None:
"""Start the WebSocket connection"""
self.is_running = True
while self.is_running:
try:
await self._connect_and_run()
except Exception as e:
self.logger.error(f"Connection {self.connection_id} error: {e}")
if self.is_running:
await self._handle_reconnection()
else:
break
async def _connect_and_run(self) -> None:
"""EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
url = self.adapter.get_websocket_url(self.symbols)
self.logger.info(f"Connecting to Binance: {url[:100]}...")
async with websockets.connect(
url,
ping_interval=self.adapter.config.ping_interval,
ping_timeout=self.adapter.config.ping_timeout,
close_timeout=10
) as websocket:
self.websocket = websocket
self.connected_at = time.time()
self.retry_count = 0
self.logger.info(f"Connection {self.connection_id} established")
# Message processing loop
async for message in websocket:
if not self.is_running:
break
try:
await self._process_message(message)
except Exception as e:
self.logger.error(f"Message processing error: {e}")
async def _process_message(self, message: str) -> None:
"""Process incoming WebSocket message and convert to Nautilus format"""
self.messages_received += 1
self.last_message_time = time.time()
# Parse message using Binance adapter
tick_data = self.adapter.parse_message(message)
if tick_data:
# Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
try:
# Create SiloqyTradeTick object
siloqy_tick = SiloqyTradeTick(
instrument_id=tick_data.symbol,
price=round(tick_data.price, 8),
size=round(tick_data.quantity, 8),
ts_event=tick_data.timestamp * 1000000, # Convert ms to ns
ts_init=int(time.time_ns()),
open_price=None, # Will be set by candle logic
candle_open_time=None, # Will be set by candle logic
tick_side=None,
exchange="BINANCE",
liquidity_flag=None
)
# Get candle information from main actor
if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
symbol = tick_data.symbol
if symbol in self.main_actor.active_candles:
candle = self.main_actor.active_candles[symbol]
siloqy_tick.open_price = candle.get('open_price', tick_data.price)
siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
# Process through main actor's tick handling (preserves all candle logic)
self.main_actor.on_websocket_tick(
tick_data.symbol,
tick_data.price,
tick_data.quantity,
tick_data.timestamp
)
except Exception as e:
self.logger.error(f"Failed to process tick: {e}")
async def _handle_reconnection(self) -> None:
"""Handle reconnection with exponential backoff"""
self.retry_count += 1
delay = min(2 ** self.retry_count, 60) # Max 60 seconds
self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
await asyncio.sleep(delay)
async def stop(self) -> None:
"""Stop the connection"""
self.logger.info(f"Stopping connection {self.connection_id}")
self.is_running = False
if self.websocket:
await self.websocket.close()
# --------------------------------------------------------------------
# SILOQY Custom Tick - PRESERVED with fixes
@@ -95,10 +337,11 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
class SILOQYMainActorConfig(ActorConfig):
candle_interval_ms: int = 15 * 60 * 1000
throttle_mode: bool = False
enable_real_data: bool = False # NEW: Enable real WebSocket data
class DOLPHINRegimeActorConfig(ActorConfig):
max_symbols: int = 5000
ticks_per_analysis: int = 1000
ticks_per_analysis: int = 10
class SILOQYNormalizerConfig(ActorConfig):
pass
@@ -115,6 +358,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
self.symbols = list(config.symbols) if config.symbols else []
self.candle_interval_ms = config.candle_interval_ms
self.active_candles = {}
self.tick_sizes = {}
# Process management configuration
self.throttle_mode = config.throttle_mode
@@ -174,18 +418,84 @@ class SILOQYSymbolDiscoveryActor(Actor):
self.log.info("Starting dynamic symbol discovery from Binance...")
url = "https://api.binance.com/api/v3/exchangeInfo"
async with httpx.AsyncClient() as client:
# RATE LIMIT CHECK - Before anything else
self.log.info("Checking Binance API rate limit status...")
time_check_url = "https://api.binance.com/api/v3/time"
try:
rate_check_response = await client.get(time_check_url, timeout=5)
if rate_check_response.status_code == 200:
# Parse rate limit headers
headers = rate_check_response.headers
used_weight = headers.get('X-MBX-USED-WEIGHT-1M', 'Unknown')
server_time = rate_check_response.json().get('serverTime', 'Unknown')
self.log.info(f"Rate limit check passed - Used weight: {used_weight}/1200, Server time: {server_time}")
# Check if we're close to rate limit
if used_weight != 'Unknown' and int(used_weight) > 1000:
self.log.warning(f"HIGH RATE LIMIT USAGE: {used_weight}/1200 - Proceeding with caution")
elif rate_check_response.status_code == 429:
retry_after = rate_check_response.headers.get('Retry-After', '60')
self.log.error(f"RATE LIMITED: Must wait {retry_after} seconds before API calls")
raise Exception(f"Binance API rate limited - retry after {retry_after}s")
elif rate_check_response.status_code == 418:
self.log.error("IP BANNED: This IP address has been auto-banned by Binance")
raise Exception("IP address banned by Binance - cannot proceed")
else:
self.log.warning(f"Rate limit check returned status {rate_check_response.status_code}")
self.log.warning("Proceeding anyway, but may encounter issues")
except Exception as e:
if "rate limited" in str(e).lower() or "banned" in str(e).lower():
raise # Re-raise rate limit/ban errors
else:
self.log.warning(f"Rate limit check failed: {e}")
self.log.warning("Proceeding with symbol discovery anyway")
async with httpx.AsyncClient() as client:
self.log.info("Fetching exchange info from Binance API...")
response = await client.get(url, timeout=10)
if response.status_code == 200:
self.log.info("Successfully received exchange info")
data = response.json()
# Get all trading symbols (USDT pairs for example)
full_symbols = [
s['symbol'] for s in data['symbols']
if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
]
# Combined symbol discovery and tick size extraction
self.log.info("Processing symbols and extracting tick sizes...")
full_symbols = []
for symbol_info in data['symbols']:
if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'):
symbol = symbol_info['symbol']
full_symbols.append(symbol)
# Extract tick size while processing # Extract tick size while processing
tick_size = None
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'PRICE_FILTER':
tick_size = float(filter_info['tickSize'])
break
# If no PRICE_FILTER found, try other filter types
if tick_size is None:
for filter_info in symbol_info['filters']:
if filter_info['filterType'] == 'TICK_SIZE':
tick_size = float(filter_info['tickSize'])
break
# Fallback to default if still not found
if tick_size is None:
tick_size = 1e-8 # Default fallback
self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
self.tick_sizes[symbol] = tick_size
self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes")
# Apply throttle mode symbol limiting
if self.throttle_mode:
self.symbols = full_symbols[:self.max_symbols_throttled]
@@ -200,6 +510,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
async def _fetch_stats_and_reconstruct_candles(self):
"""PRESERVED: All original rate limiting with Nautilus async patterns"""
url = "https://api.binance.com/api/v3/ticker/24hr"
klines_url = "https://api.binance.com/api/v3/klines"
ticker_url = "https://api.binance.com/api/v3/ticker/price"
@@ -315,7 +626,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
except Exception as e:
self.log.error(f"Request failed: {str(e)}")
# PRESERVED: Original rate limiting
# PRESERVED: Original rate limiting with Nautilus async
elapsed = time.time() - start_time
if i + 50 < len(self.symbols):
sleep_time = max(0, rate_limit_interval - elapsed)
@@ -337,8 +648,10 @@ class SILOQYSymbolDiscoveryActor(Actor):
# Publish symbols and candles as tuples
self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns())))
self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes")
self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
else:
self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")
@@ -353,6 +666,7 @@ class SILOQYMainActor(Actor):
# Preserve original configuration
self.candle_interval_ms = config.candle_interval_ms
self.throttle_mode = config.throttle_mode
self.enable_real_data = config.enable_real_data # NEW: Real data capability
self.connections = {}
self.connection_tasks = {}
@@ -360,6 +674,11 @@ class SILOQYMainActor(Actor):
self.symbols = []
self.active_candles = {}
# WebSocket infrastructure - NEW
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
self.websocket_tasks: Dict[str, asyncio.Task] = {}
# WebSocket tasks - managed by Nautilus ActorExecutor
self.ws_tasks = []
@@ -369,6 +688,11 @@ class SILOQYMainActor(Actor):
if self.throttle_mode:
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
if self.enable_real_data:
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
else:
self.log.info("SIMULATION MODE: Will generate simulated ticks")
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
def on_start(self) -> None:
@@ -393,6 +717,12 @@ class SILOQYMainActor(Actor):
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
# Stop WebSocket connections if running
if self.enable_real_data:
for conn in self.websocket_connections.values():
asyncio.create_task(conn.stop())
# Nautilus kernel handles executor shutdown - no manual cleanup needed
async def on_start_async(self) -> None:
@@ -401,8 +731,11 @@ class SILOQYMainActor(Actor):
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
# PRESERVED: Start WebSocket connections
await self._start_websocket_connections()
# ENHANCED: Start real or simulated connections based on configuration
if self.enable_real_data:
await self._start_real_websocket_connections()
else:
await self._start_simulated_connections()
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
@@ -423,7 +756,6 @@ class SILOQYMainActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
# FIXED: SILOQYMainActor.handle_candles_initial
def handle_candles_initial(self, data):
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
try:
@@ -448,8 +780,53 @@ class SILOQYMainActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
async def _start_websocket_connections(self):
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
async def _start_real_websocket_connections(self):
"""NEW: Start real Binance WebSocket connections with throttle control"""
self.log.info("Starting REAL Binance WebSocket connections for live data...")
# Apply throttle mode limits to WebSocket connections
symbols_to_use = self.symbols
if self.throttle_mode:
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
# Calculate optimal symbol distribution
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
self.log.info(f"Real WebSocket distribution:")
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
self.log.info(f" - Connections needed: {len(symbol_groups)}")
# Start WebSocket connections
for i, symbol_group in enumerate(symbol_groups):
connection_id = f"binance_real_{i:03d}"
connection = SiloqyWebSocketConnection(
connection_id=connection_id,
symbols=symbol_group,
binance_adapter=self.binance_adapter,
main_actor_ref=self
)
self.websocket_connections[connection_id] = connection
# Use Nautilus ActorExecutor for WebSocket task management
if hasattr(self, '_executor') and self._executor:
future = self._executor.submit(connection.start())
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
else:
task = asyncio.create_task(connection.start())
self.websocket_tasks[connection_id] = task
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
# Rate limit connection establishment
if i < len(symbol_groups) - 1:
await asyncio.sleep(1.0) # 1 second between connections
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
async def _start_simulated_connections(self):
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
self.log.info("Starting WebSocket simulation for tick generation...")
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
@@ -464,6 +841,7 @@ class SILOQYMainActor(Actor):
self.ws_tasks.append(task)
async def _simulate_websocket_ticks(self):
"""PRESERVED: Original simulation logic"""
# Adjust tick rate for throttle mode
tick_delay = 0.1 if self.throttle_mode else 0.01
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
@@ -489,7 +867,8 @@ class SILOQYMainActor(Actor):
log_interval = 500 if self.throttle_mode else 1000
if tick_count % log_interval == 0:
mode_indicator = "THROTTLE" if self.throttle_mode else ""
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
data_source = "REAL" if self.enable_real_data else "SIM"
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
await asyncio.sleep(tick_delay)
except asyncio.CancelledError:
@@ -549,6 +928,8 @@ class SILOQYMainActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
self.log.error(f"{tick_tuple}")
sys.exit(2)
class DOLPHINRegimeActor(Actor):
def __init__(self, config: DOLPHINRegimeActorConfig):
@@ -565,6 +946,7 @@ class DOLPHINRegimeActor(Actor):
self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64) # Default fallback
# PRESERVED: All original mapping and state
self.symbol_to_idx = {}
@@ -585,25 +967,76 @@ class DOLPHINRegimeActor(Actor):
self.processed_ticks = 0
self.regime_calculations = 0
# NEW: Enhanced indicator tracking for BB and temporal patterns
self.signal_history = deque(maxlen=100) # For BB calculations
self.bb_period = 20 # BB calculation period
self.bb_std_dev = 2.0 # BB standard deviation multiplier
self.velocity_history = deque(maxlen=10) # For regime velocity tracking
self.confidence_history = deque(maxlen=20) # For confidence trend analysis
self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
f"ticks_per_analysis: {self.ticks_per_analysis}")
def on_start(self) -> None:
"""Subscribe to tick events - using Nautilus ActorExecutor"""
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events")
if hasattr(self, 'msgbus') and self.msgbus:
self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes)
self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes")
def on_stop(self) -> None:
"""Stop the actor - Nautilus handles executor cleanup"""
self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
# Nautilus kernel handles executor shutdown - no manual cleanup needed
def handle_symbols_discovered(self, data):
"""Pre-initialize symbol mappings during discovery phase"""
try:
symbols, timestamp = data
self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings")
for symbol in symbols:
if self.active_symbols >= self.max_symbols:
self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization")
break
idx = self.active_symbols
self.symbol_to_idx[symbol] = idx
self.idx_to_symbol[idx] = symbol
self.active_symbols += 1
self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings")
except Exception as e:
self.log.error(f"Error pre-initializing symbols: {e}")
def handle_tick_sizes(self, data):
"""Apply tick sizes to pre-initialized symbol mappings"""
try:
tick_sizes, timestamp = data
self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor")
applied_count = 0
for symbol, tick_size in tick_sizes.items():
if symbol in self.symbol_to_idx: # Will exist from pre-initialization
idx = self.symbol_to_idx[symbol]
if 0 < tick_size <= 1.0:
self.tick_sizes[idx] = tick_size
applied_count += 1
else:
self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback")
self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols")
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}")
def handle_raw_tick(self, data):
"""
PRESERVED EXACTLY: All original zero-allocation tick processing
SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols
Using Nautilus ActorExecutor for regime detection tasks
"""
try:
@@ -613,41 +1046,28 @@ class DOLPHINRegimeActor(Actor):
self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
return
# PRESERVED EXACTLY: All original array processing logic
# Direct lookup - symbol will exist from pre-initialization
if symbol not in self.symbol_to_idx:
if self.active_symbols >= self.max_symbols:
self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
return
self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings")
return
idx = self.active_symbols
self.symbol_to_idx[symbol] = idx
self.idx_to_symbol[idx] = symbol
self.active_symbols += 1
# Initialize arrays
idx = self.symbol_to_idx[symbol]
# Check if new candle period
if candle_open_time > self.last_update[idx]:
# Reset for new candle
self.open_prices[idx] = open_price
self.high_prices[idx] = price
self.low_prices[idx] = price
self.close_prices[idx] = price
self.volumes[idx] = 0.0
self.volumes[idx] = size
self.last_update[idx] = candle_open_time
else:
idx = self.symbol_to_idx[symbol]
# Check if new candle period
if candle_open_time > self.last_update[idx]:
# Reset for new candle
self.open_prices[idx] = open_price
self.high_prices[idx] = price
self.low_prices[idx] = price
self.close_prices[idx] = price
self.volumes[idx] = size
self.last_update[idx] = candle_open_time
else:
# Update existing candle data
self.close_prices[idx] = price
self.high_prices[idx] = max(self.high_prices[idx], price)
self.low_prices[idx] = min(self.low_prices[idx], price)
self.volumes[idx] += size
# Update existing candle data
self.close_prices[idx] = price
self.high_prices[idx] = max(self.high_prices[idx], price)
self.low_prices[idx] = min(self.low_prices[idx], price)
self.volumes[idx] += size
self.tick_count += 1
self.processed_ticks += 1
@@ -675,6 +1095,101 @@ class DOLPHINRegimeActor(Actor):
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")
def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols):
"""NEW: Calculate enhanced indicators including BB metrics and temporal patterns"""
# Calculate regime momentum signal
base_momentum = (bull_ratio - bear_ratio) * 100 # -100 to +100
sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0
signal = base_momentum * confidence * sample_quality
# Add to signal history
self.signal_history.append(signal)
# Calculate velocity (rate of change in signal)
velocity = 0.0
if len(self.signal_history) >= 2:
velocity = self.signal_history[-1] - self.signal_history[-2]
self.velocity_history.append(velocity)
# Store confidence for trending
self.confidence_history.append(confidence)
# Calculate Bollinger Bands if enough history
bb_metrics = {}
if len(self.signal_history) >= self.bb_period:
recent_signals = list(self.signal_history)[-self.bb_period:]
sma = sum(recent_signals) / len(recent_signals)
# Calculate standard deviation
variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals)
std_dev = variance ** 0.5
upper_band = sma + (self.bb_std_dev * std_dev)
lower_band = sma - (self.bb_std_dev * std_dev)
# Position within BBs (mean reversion interpretation)
if signal > upper_band:
bb_position = 'ABOVE_UPPER'
elif signal < lower_band:
bb_position = 'BELOW_LOWER'
elif signal >= sma:
bb_position = 'UPPER_HALF'
else:
bb_position = 'LOWER_HALF'
# Momentum persistence interpretation
if signal > upper_band:
momentum_signal = 'STRONG_BULL_BREAKOUT'
elif signal < lower_band:
momentum_signal = 'STRONG_BEAR_BREAKOUT'
elif signal > sma:
momentum_signal = 'MILD_BULLISH'
else:
momentum_signal = 'MILD_BEARISH'
bb_metrics = {
'signal': signal,
'sma': sma,
'upper_band': upper_band,
'lower_band': lower_band,
'bb_position': bb_position,
'momentum_signal': momentum_signal,
'bb_ready': True
}
else:
bb_metrics = {
'signal': signal,
'sma': None,
'upper_band': None,
'lower_band': None,
'bb_position': 'INSUFFICIENT_DATA',
'momentum_signal': 'INSUFFICIENT_DATA',
'bb_ready': False
}
# Calculate temporal patterns
temporal_metrics = {}
if len(self.velocity_history) >= 3:
avg_velocity = sum(self.velocity_history) / len(self.velocity_history)
velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING'
else:
avg_velocity = velocity
velocity_trend = 'BUILDING_HISTORY'
if len(self.confidence_history) >= 5:
confidence_trend = 'RISING' if confidence > sum(self.confidence_history[-5:-1])/4 else 'FALLING'
else:
confidence_trend = 'BUILDING_HISTORY'
temporal_metrics = {
'velocity': velocity,
'avg_velocity': avg_velocity,
'velocity_trend': velocity_trend,
'confidence_trend': confidence_trend
}
return bb_metrics, temporal_metrics
def _run_regime_detection(self):
self.regime_calculations += 1
@@ -686,6 +1201,9 @@ class DOLPHINRegimeActor(Actor):
bullish = 0
bearish = 0
# NEW: Track pattern of bullish/bearish symbols for this calculation
symbol_pattern = []
# PRESERVED: Original analysis with exact thresholds
for idx in range(self.active_symbols):
open_price = self.open_prices[idx]
@@ -696,13 +1214,26 @@ class DOLPHINRegimeActor(Actor):
analyzed += 1
# PRESERVED: EXACT DOLPHIN thresholds
change = (close_price - open_price) / open_price
# NEW: HFT-grade tick-size based comparison
tick_size = self.tick_sizes[idx]
equality_threshold = tick_size / 2 # Half tick size standard
price_diff = abs(close_price - open_price)
if change >= 0.0015: # 0.15% threshold for bullish
# Check if prices are effectively equal within tick size tolerance
if price_diff <= equality_threshold:
# Prices are effectively equal (within tick size tolerance)
symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}")
elif close_price > open_price:
# Bullish: close > open
bullish += 1
elif change <= -0.0015: # -0.15% threshold for bearish
# Arrow points to close (larger price)
symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}")
else:
# Bearish: close < open
bearish += 1
# Arrow points to open (larger price)
symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}")
if analyzed == 0:
return
@@ -724,6 +1255,11 @@ class DOLPHINRegimeActor(Actor):
# PRESERVED: Original confidence calculation
confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
# NEW: Calculate enhanced indicators
bb_metrics, temporal_metrics = self._calculate_enhanced_indicators(
bull_ratio, bear_ratio, confidence, analyzed, total_symbols
)
self.previous_bull_ratio = bull_ratio
# Publish regime result using Nautilus message bus
@@ -742,20 +1278,87 @@ class DOLPHINRegimeActor(Actor):
self.msgbus.publish(REGIME_TOPIC, regime_tuple)
# NEW: Publish enhanced indicators to data bus
indicator_data = {
'timestamp': int(time.time() * 1000),
'regime_momentum_signal': bb_metrics['signal'],
'bb_ready': bb_metrics['bb_ready'],
'velocity': temporal_metrics['velocity'],
'velocity_trend': temporal_metrics['velocity_trend'],
'confidence_trend': temporal_metrics['confidence_trend']
}
self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_data)
# Publish BB metrics separately for specialized consumers
if bb_metrics['bb_ready']:
bb_data = {
'timestamp': int(time.time() * 1000),
'signal': bb_metrics['signal'],
'sma': bb_metrics['sma'],
'upper_band': bb_metrics['upper_band'],
'lower_band': bb_metrics['lower_band'],
'bb_position': bb_metrics['bb_position'],
'momentum_signal': bb_metrics['momentum_signal']
}
self.msgbus.publish(BB_METRICS_TOPIC, bb_data)
# Publish temporal patterns
temporal_data = {
'timestamp': int(time.time() * 1000),
'velocity': temporal_metrics['velocity'],
'avg_velocity': temporal_metrics['avg_velocity'],
'velocity_trend': temporal_metrics['velocity_trend'],
'confidence_trend': temporal_metrics['confidence_trend'],
'signal_history_length': len(self.signal_history)
}
self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_data)
except Exception as e:
self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")
# Log regime changes
if not self.regime_history or regime != self.regime_history[-1]:
self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} "
f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | "
f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}")
self.regime_history.append(regime)
# Periodic regime status (even without changes)
if self.regime_calculations % 10 == 0: # Every 10 calculations
self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
if self.regime_calculations % 50 == 0: # Every second, approx, given avg. tick rate
# Determine color based on regime
if regime == MarketRegime.BULL:
color_code = "\033[92m" # Green
elif regime == MarketRegime.BEAR:
color_code = "\033[91m" # Red
else: # SIDEWAYS
color_code = "\033[93m" # Yellow
# Reset color code
reset_code = "\033[0m"
self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} "
f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}")
# NEW: Enhanced indicator line after regime status
if bb_metrics['bb_ready']:
self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
f"SMA:{bb_metrics['sma']:.1f} | Upper:{bb_metrics['upper_band']:.1f} | "
f"Lower:{bb_metrics['lower_band']:.1f} | Pos:{bb_metrics['bb_position']} | "
f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.1f} | "
f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
else:
self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | "
f"Vel:{temporal_metrics['velocity']:.1f} | VelTrend:{temporal_metrics['velocity_trend']} | "
f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
# NEW: Log symbol pattern and counts
if symbol_pattern: # Only if we have symbols to show
pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces
bull_count = sum(1 for s in symbol_pattern if s.startswith("B"))
bear_count = sum(1 for s in symbol_pattern if s.startswith("X"))
self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}")
def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
analyzed: int, total: int) -> float:
@@ -870,7 +1473,7 @@ def test_siloqy_actors_with_nautilus_process_management():
"candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True, # ENABLED: Safe for dual instance testing
"throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s)
"max_symbols_throttled": 100 # Only 100 symbols (vs 2000+)
"max_symbols_throttled": 414 # Only 100 symbols (vs 2000+)
}
)
@@ -880,7 +1483,8 @@ def test_siloqy_actors_with_nautilus_process_management():
config={
"component_id": "SILOQY-MAIN-ACTOR",
"candle_interval_ms": 15 * 60 * 1000,
"throttle_mode": True # ENABLED: Reduced tick generation
"throttle_mode": False, # ENABLED: Reduced tick generation
"enable_real_data": True # CHANGE TO True for real WebSocket data
}
)
@@ -890,7 +1494,7 @@ def test_siloqy_actors_with_nautilus_process_management():
config={
"component_id": "DOLPHIN-REGIME-ACTOR",
"max_symbols": 5000,
"ticks_per_analysis": 500 # Reduced for throttle mode testing
"ticks_per_analysis": 2 # Reduced for throttle mode testing
}
)
@@ -915,8 +1519,8 @@ def test_siloqy_actors_with_nautilus_process_management():
exec_clients={} # No execution clients for this test
)
node = TradingNode(config=trading_config)
node = TradingNode(config=trading_config)
try:
node.build()
print("Node built successfully with Nautilus built-in process management")

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,24 @@
2025-09-01T19:54:58.150165200Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1374 ticks
2025-09-01T19:54:58.150201100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:58.660990300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1375, regime_calcs: 687, active_symbols: 8
2025-09-01T19:54:58.661042700Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1375 ticks
2025-09-01T19:54:58.730072100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1376 ticks
2025-09-01T19:54:58.730093900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:59.455057400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1378 ticks
2025-09-01T19:54:59.455081700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:59.568990700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1380 ticks
2025-09-01T19:54:59.569016900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.666864100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1381, regime_calcs: 690, active_symbols: 9
2025-09-01T19:54:59.666902500Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1381 ticks
2025-09-01T19:54:59.726017700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1382 ticks
2025-09-01T19:54:59.726051700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.999524400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1384 ticks
2025-09-01T19:54:59.999567100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.999803000Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1386 ticks
2025-09-01T19:54:59.999815800Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:55:00.683433300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1388 ticks
In this log, at the "transition" between 75.x% and 77.x% you will find one of the symbols being considered in the DOLPHIN market-regime detection is missing totals go from 6/2 to 7/2 ...
... so *one less symbol* is being considered.-
Most likely *no tick has been produced for that symbols* (is this a correct assumption?).-