Compare commits
14 Commits: 45f6e4da1c, 9281ad0da8, 6d12eb9497, ba220f2380, fde38cbef1, 4748a7ba62, 4d8ec0ddc6, 2902dd0791, af0bfbe100, c9cffb4b53, 0f9a4a6f9c, 8b59e0ed4e, b5cd68263c, 9f1dbffc2a
SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md (new file, 1399 lines)
File diff suppressed because it is too large
@@ -1,13 +1,16 @@
 import time
+import sys
 import numpy as np
 import asyncio
 import json
 import math
+import httpx
+import websockets
 from typing import Dict, List, Optional, Tuple, Any
 from enum import Enum
 from collections import deque
-import httpx
 import random  # Added for _simulate_websocket_ticks
+from dataclasses import dataclass, field

 # Nautilus imports - following test pattern
 from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
@@ -34,6 +37,12 @@ STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
 REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
 SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
 CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
+# ADDED LINE 18:
+TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES"
+# NEW: Enhanced indicator topics for data bus publishing
+REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS"
+BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS"
+TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS"

 # Rate limiting constant
 MIN_INTERVAL = 2.5  # seconds between API batches
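The topics added above carry plain Python payloads published directly on the Nautilus message bus; the publishing side appears further down in this diff. A minimal consumer sketch, assuming the Actor base class and the self.msgbus.subscribe(...) pattern already used in this file (the actor and handler names are illustrative, not part of the commit):

class IndicatorConsumerActor(Actor):  # illustrative name, not part of this commit
    def on_start(self) -> None:
        # Same guarded subscribe pattern used by DOLPHINRegimeActor later in this diff
        if hasattr(self, 'msgbus') and self.msgbus:
            self.msgbus.subscribe(BB_METRICS_TOPIC, self.handle_bb_metrics)
            self.msgbus.subscribe(TEMPORAL_PATTERNS_TOPIC, self.handle_temporal_patterns)

    def handle_bb_metrics(self, data):
        # data is the plain dict built in _run_regime_detection below
        self.log.info(f"BB position: {data['bb_position']} | signal: {data['signal']:.1f}")

    def handle_temporal_patterns(self, data):
        self.log.info(f"Velocity trend: {data['velocity_trend']} | confidence trend: {data['confidence_trend']}")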
@@ -44,6 +53,239 @@ class MarketRegime(Enum):
     BEAR = "BEAR"
     TRANSITION = "TRANSITION"
     SIDEWAYS = "SIDEWAYS"

+# ============================================================================
+# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
+# ============================================================================
+
+@dataclass
+class TickData:
+    """Universal tick data structure from standalone"""
+    symbol: str
+    price: float
+    quantity: float
+    timestamp: int
+    is_buyer_maker: bool
+    trade_id: int
+    exchange: str
+    raw_data: Dict = None
+
+    def __post_init__(self):
+        if self.raw_data is None:
+            self.raw_data = {}
+
+@dataclass
+class ExchangeConfig:
+    """Exchange-specific configuration from standalone"""
+    name: str
+    websocket_limit_per_second: int
+    max_streams_per_connection: int
+    max_connections_per_window: int
+    connection_window_seconds: int
+    base_websocket_url: str
+    requires_auth: bool = False
+    ping_interval: float = 20.0
+    ping_timeout: float = 10.0
+
+class BinanceAdapter:
+    """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
+
+    def __init__(self):
+        self.config = ExchangeConfig(
+            name="binance",
+            websocket_limit_per_second=10,
+            max_streams_per_connection=1024,
+            max_connections_per_window=300,
+            connection_window_seconds=300,  # 5 minutes
+            base_websocket_url="wss://stream.binance.com:9443",
+            requires_auth=False
+        )
+        self.logger = Logger(f"BINANCE_ADAPTER")
+
+    def get_websocket_url(self, symbols: List[str]) -> str:
+        """Build Binance WebSocket URL with combined streams"""
+        if len(symbols) == 1:
+            symbol = symbols[0].lower()
+            return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
+        else:
+            # Combined stream approach
+            streams = [f"{symbol.lower()}@trade" for symbol in symbols]
+            stream_string = "/".join(streams)
+            return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
+
+    def parse_message(self, message: str) -> Optional[TickData]:
+        """EXTRACTED FROM STANDALONE: Parse Binance trade message"""
+        try:
+            data = json.loads(message)
+
+            # Handle combined stream format
+            if 'stream' in data and 'data' in data:
+                stream_name = data['stream']
+                trade_data = data['data']
+            else:
+                # Single stream format
+                trade_data = data
+                stream_name = trade_data.get('e', '')
+
+            # Only process trade events
+            if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
+                return None
+
+            return TickData(
+                symbol=trade_data['s'],
+                price=float(trade_data['p']),
+                quantity=float(trade_data['q']),
+                timestamp=int(trade_data['T']),
+                is_buyer_maker=trade_data['m'],
+                trade_id=int(trade_data['t']),
+                exchange="binance",
+                raw_data=trade_data
+            )
+
+        except (json.JSONDecodeError, KeyError, ValueError) as e:
+            self.logger.error(f"Failed to parse Binance message: {e}")
+            return None
+
+    def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
+        """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
+        # Conservative: leave room for message bursts
+        safety_margin = 0.7
+        symbols_per_connection = int(
+            min(
+                self.config.max_streams_per_connection * safety_margin,
+                self.config.websocket_limit_per_second * 5  # 5-second buffer
+            )
+        )
+
+        # Group symbols into chunks
+        symbol_groups = []
+        for i in range(0, len(symbols), symbols_per_connection):
+            group = symbols[i:i + symbols_per_connection]
+            symbol_groups.append(group)
+
+        return symbol_groups
+
+class SiloqyWebSocketConnection:
+    """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
+
+    def __init__(self, connection_id: str, symbols: List[str],
+                 binance_adapter: BinanceAdapter, main_actor_ref):
+        self.connection_id = connection_id
+        self.symbols = symbols
+        self.adapter = binance_adapter
+        self.main_actor = main_actor_ref  # Reference to SILOQYMainActor for msgbus access
+
+        self.websocket = None
+        self.is_running = False
+        self.retry_count = 0
+        self.messages_received = 0
+        self.last_message_time = 0.0
+        self.connected_at = None
+
+        self.logger = Logger(f"WS_{connection_id}")
+        self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
+
+    async def start(self) -> None:
+        """Start the WebSocket connection"""
+        self.is_running = True
+
+        while self.is_running:
+            try:
+                await self._connect_and_run()
+            except Exception as e:
+                self.logger.error(f"Connection {self.connection_id} error: {e}")
+
+                if self.is_running:
+                    await self._handle_reconnection()
+                else:
+                    break
+
+    async def _connect_and_run(self) -> None:
+        """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
+        url = self.adapter.get_websocket_url(self.symbols)
+
+        self.logger.info(f"Connecting to Binance: {url[:100]}...")
+
+        async with websockets.connect(
+            url,
+            ping_interval=self.adapter.config.ping_interval,
+            ping_timeout=self.adapter.config.ping_timeout,
+            close_timeout=10
+        ) as websocket:
+            self.websocket = websocket
+            self.connected_at = time.time()
+            self.retry_count = 0
+
+            self.logger.info(f"Connection {self.connection_id} established")
+
+            # Message processing loop
+            async for message in websocket:
+                if not self.is_running:
+                    break
+
+                try:
+                    await self._process_message(message)
+                except Exception as e:
+                    self.logger.error(f"Message processing error: {e}")
+
+    async def _process_message(self, message: str) -> None:
+        """Process incoming WebSocket message and convert to Nautilus format"""
+        self.messages_received += 1
+        self.last_message_time = time.time()
+
+        # Parse message using Binance adapter
+        tick_data = self.adapter.parse_message(message)
+        if tick_data:
+            # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
+            try:
+                # Create SiloqyTradeTick object
+                siloqy_tick = SiloqyTradeTick(
+                    instrument_id=tick_data.symbol,
+                    price=round(tick_data.price, 8),
+                    size=round(tick_data.quantity, 8),
+                    ts_event=tick_data.timestamp * 1000000,  # Convert ms to ns
+                    ts_init=int(time.time_ns()),
+                    open_price=None,  # Will be set by candle logic
+                    candle_open_time=None,  # Will be set by candle logic
+                    tick_side=None,
+                    exchange="BINANCE",
+                    liquidity_flag=None
+                )
+
+                # Get candle information from main actor
+                if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
+                    symbol = tick_data.symbol
+                    if symbol in self.main_actor.active_candles:
+                        candle = self.main_actor.active_candles[symbol]
+                        siloqy_tick.open_price = candle.get('open_price', tick_data.price)
+                        siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
+
+                # Process through main actor's tick handling (preserves all candle logic)
+                self.main_actor.on_websocket_tick(
+                    tick_data.symbol,
+                    tick_data.price,
+                    tick_data.quantity,
+                    tick_data.timestamp
+                )
+
+            except Exception as e:
+                self.logger.error(f"Failed to process tick: {e}")
+
+    async def _handle_reconnection(self) -> None:
+        """Handle reconnection with exponential backoff"""
+        self.retry_count += 1
+        delay = min(2 ** self.retry_count, 60)  # Max 60 seconds
+
+        self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
+        await asyncio.sleep(delay)
+
+    async def stop(self) -> None:
+        """Stop the connection"""
+        self.logger.info(f"Stopping connection {self.connection_id}")
+        self.is_running = False
+
+        if self.websocket:
+            await self.websocket.close()
+
 # --------------------------------------------------------------------
 # SILOQY Custom Tick - PRESERVED with fixes
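For a sense of how calculate_optimal_distribution above splits a symbol universe: with the configured limits, symbols_per_connection = int(min(1024 * 0.7, 10 * 5)) = 50. A standalone sketch of the same grouping math (the symbol names and the 414 count are illustrative, not from the commit):

# Illustrative check of the grouping math, outside the adapter
symbols = [f"SYM{i}USDT" for i in range(414)]            # hypothetical symbol list
symbols_per_connection = int(min(1024 * 0.7, 10 * 5))    # = 50, as in BinanceAdapter
groups = [symbols[i:i + symbols_per_connection]
          for i in range(0, len(symbols), symbols_per_connection)]
print(len(groups), [len(g) for g in groups])              # 9 connections: eight of 50, one of 14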
@@ -95,10 +337,11 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
 class SILOQYMainActorConfig(ActorConfig):
     candle_interval_ms: int = 15 * 60 * 1000
     throttle_mode: bool = False
+    enable_real_data: bool = False  # NEW: Enable real WebSocket data

 class DOLPHINRegimeActorConfig(ActorConfig):
     max_symbols: int = 5000
-    ticks_per_analysis: int = 1000
+    ticks_per_analysis: int = 10

 class SILOQYNormalizerConfig(ActorConfig):
     pass
@@ -115,6 +358,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
         self.symbols = list(config.symbols) if config.symbols else []
         self.candle_interval_ms = config.candle_interval_ms
         self.active_candles = {}
+        self.tick_sizes = {}

         # Process management configuration
         self.throttle_mode = config.throttle_mode
@@ -174,18 +418,84 @@ class SILOQYSymbolDiscoveryActor(Actor):
         self.log.info("Starting dynamic symbol discovery from Binance...")
         url = "https://api.binance.com/api/v3/exchangeInfo"

+        async with httpx.AsyncClient() as client:
+            # RATE LIMIT CHECK - Before anything else
+            self.log.info("Checking Binance API rate limit status...")
+            time_check_url = "https://api.binance.com/api/v3/time"
+
+            try:
+                rate_check_response = await client.get(time_check_url, timeout=5)
+
+                if rate_check_response.status_code == 200:
+                    # Parse rate limit headers
+                    headers = rate_check_response.headers
+                    used_weight = headers.get('X-MBX-USED-WEIGHT-1M', 'Unknown')
+                    server_time = rate_check_response.json().get('serverTime', 'Unknown')
+
+                    self.log.info(f"Rate limit check passed - Used weight: {used_weight}/1200, Server time: {server_time}")
+
+                    # Check if we're close to rate limit
+                    if used_weight != 'Unknown' and int(used_weight) > 1000:
+                        self.log.warning(f"HIGH RATE LIMIT USAGE: {used_weight}/1200 - Proceeding with caution")
+
+                elif rate_check_response.status_code == 429:
+                    retry_after = rate_check_response.headers.get('Retry-After', '60')
+                    self.log.error(f"RATE LIMITED: Must wait {retry_after} seconds before API calls")
+                    raise Exception(f"Binance API rate limited - retry after {retry_after}s")
+
+                elif rate_check_response.status_code == 418:
+                    self.log.error("IP BANNED: This IP address has been auto-banned by Binance")
+                    raise Exception("IP address banned by Binance - cannot proceed")
+
+                else:
+                    self.log.warning(f"Rate limit check returned status {rate_check_response.status_code}")
+                    self.log.warning("Proceeding anyway, but may encounter issues")
+
+            except Exception as e:
+                if "rate limited" in str(e).lower() or "banned" in str(e).lower():
+                    raise  # Re-raise rate limit/ban errors
+                else:
+                    self.log.warning(f"Rate limit check failed: {e}")
+                    self.log.warning("Proceeding with symbol discovery anyway")
+
         async with httpx.AsyncClient() as client:
             self.log.info("Fetching exchange info from Binance API...")
             response = await client.get(url, timeout=10)
             if response.status_code == 200:
                 self.log.info("Successfully received exchange info")
                 data = response.json()
-                # Get all trading symbols (USDT pairs for example)
-                full_symbols = [
-                    s['symbol'] for s in data['symbols']
-                    if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
-                ]
+
+                # Combined symbol discovery and tick size extraction
+                self.log.info("Processing symbols and extracting tick sizes...")
+                full_symbols = []
+                for symbol_info in data['symbols']:
+                    if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'):
+                        symbol = symbol_info['symbol']
+                        full_symbols.append(symbol)
+
+                        # Extract tick size while processing
+                        tick_size = None
+                        for filter_info in symbol_info['filters']:
+                            if filter_info['filterType'] == 'PRICE_FILTER':
+                                tick_size = float(filter_info['tickSize'])
+                                break
+
+                        # If no PRICE_FILTER found, try other filter types
+                        if tick_size is None:
+                            for filter_info in symbol_info['filters']:
+                                if filter_info['filterType'] == 'TICK_SIZE':
+                                    tick_size = float(filter_info['tickSize'])
+                                    break
+
+                        # Fallback to default if still not found
+                        if tick_size is None:
+                            tick_size = 1e-8  # Default fallback
+                            self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
+
+                        self.tick_sizes[symbol] = tick_size
+
+                self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes")

                 # Apply throttle mode symbol limiting
                 if self.throttle_mode:
                     self.symbols = full_symbols[:self.max_symbols_throttled]
@@ -200,6 +510,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
                 raise Exception(f"Failed to fetch exchange info: {response.status_code}")

     async def _fetch_stats_and_reconstruct_candles(self):
+        """PRESERVED: All original rate limiting with Nautilus async patterns"""
         url = "https://api.binance.com/api/v3/ticker/24hr"
         klines_url = "https://api.binance.com/api/v3/klines"
         ticker_url = "https://api.binance.com/api/v3/ticker/price"
@@ -315,7 +626,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
             except Exception as e:
                 self.log.error(f"Request failed: {str(e)}")

-            # PRESERVED: Original rate limiting
+            # PRESERVED: Original rate limiting with Nautilus async
             elapsed = time.time() - start_time
             if i + 50 < len(self.symbols):
                 sleep_time = max(0, rate_limit_interval - elapsed)
@@ -337,8 +648,10 @@ class SILOQYSymbolDiscoveryActor(Actor):
             # Publish symbols and candles as tuples
             self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
             self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
+            self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns())))

             self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
+            self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes")
             self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing")
         else:
             self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing")
@@ -353,6 +666,7 @@ class SILOQYMainActor(Actor):
         # Preserve original configuration
         self.candle_interval_ms = config.candle_interval_ms
         self.throttle_mode = config.throttle_mode
+        self.enable_real_data = config.enable_real_data  # NEW: Real data capability
         self.connections = {}
         self.connection_tasks = {}

@@ -360,6 +674,11 @@ class SILOQYMainActor(Actor):
         self.symbols = []
         self.active_candles = {}
+
+        # WebSocket infrastructure - NEW
+        self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
+        self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
+        self.websocket_tasks: Dict[str, asyncio.Task] = {}

         # WebSocket tasks - managed by Nautilus ActorExecutor
         self.ws_tasks = []

@@ -369,6 +688,11 @@ class SILOQYMainActor(Actor):
         if self.throttle_mode:
             self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
+
+        if self.enable_real_data:
+            self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
+        else:
+            self.log.info("SIMULATION MODE: Will generate simulated ticks")

         self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")

     def on_start(self) -> None:
@@ -393,6 +717,12 @@ class SILOQYMainActor(Actor):
     def on_stop(self) -> None:
         """Stop the actor - Nautilus handles executor cleanup"""
         self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
+
+        # Stop WebSocket connections if running
+        if self.enable_real_data:
+            for conn in self.websocket_connections.values():
+                asyncio.create_task(conn.stop())
+
         # Nautilus kernel handles executor shutdown - no manual cleanup needed

     async def on_start_async(self) -> None:
@@ -401,8 +731,11 @@ class SILOQYMainActor(Actor):
             self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
             await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)

-            # PRESERVED: Start WebSocket connections
-            await self._start_websocket_connections()
+            # ENHANCED: Start real or simulated connections based on configuration
+            if self.enable_real_data:
+                await self._start_real_websocket_connections()
+            else:
+                await self._start_simulated_connections()

             self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")

@@ -423,7 +756,6 @@ class SILOQYMainActor(Actor):
         except Exception as e:
             self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")

-    # FIXED: SILOQYMainActor.handle_candles_initial
     def handle_candles_initial(self, data):
         """Handle initial candles from discovery actor and properly initialize the candle dict."""
         try:
@@ -448,8 +780,53 @@ class SILOQYMainActor(Actor):
         except Exception as e:
             self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")

-    async def _start_websocket_connections(self):
-        """PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
+    async def _start_real_websocket_connections(self):
+        """NEW: Start real Binance WebSocket connections with throttle control"""
+        self.log.info("Starting REAL Binance WebSocket connections for live data...")
+
+        # Apply throttle mode limits to WebSocket connections
+        symbols_to_use = self.symbols
+        if self.throttle_mode:
+            symbols_to_use = self.symbols[:10]  # Limit to 10 symbols in throttle mode
+            self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
+
+        # Calculate optimal symbol distribution
+        symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
+
+        self.log.info(f"Real WebSocket distribution:")
+        self.log.info(f"  - Total symbols: {len(symbols_to_use)}")
+        self.log.info(f"  - Connections needed: {len(symbol_groups)}")
+
+        # Start WebSocket connections
+        for i, symbol_group in enumerate(symbol_groups):
+            connection_id = f"binance_real_{i:03d}"
+
+            connection = SiloqyWebSocketConnection(
+                connection_id=connection_id,
+                symbols=symbol_group,
+                binance_adapter=self.binance_adapter,
+                main_actor_ref=self
+            )
+
+            self.websocket_connections[connection_id] = connection
+
+            # Use Nautilus ActorExecutor for WebSocket task management
+            if hasattr(self, '_executor') and self._executor:
+                future = self._executor.submit(connection.start())
+                self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
+            else:
+                task = asyncio.create_task(connection.start())
+                self.websocket_tasks[connection_id] = task
+                self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
+
+            # Rate limit connection establishment
+            if i < len(symbol_groups) - 1:
+                await asyncio.sleep(1.0)  # 1 second between connections
+
+        self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
+
+    async def _start_simulated_connections(self):
+        """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
         self.log.info("Starting WebSocket simulation for tick generation...")
         self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")

@@ -464,6 +841,7 @@ class SILOQYMainActor(Actor):
             self.ws_tasks.append(task)

     async def _simulate_websocket_ticks(self):
+        """PRESERVED: Original simulation logic"""
         # Adjust tick rate for throttle mode
         tick_delay = 0.1 if self.throttle_mode else 0.01
         symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
@@ -489,7 +867,8 @@ class SILOQYMainActor(Actor):
                 log_interval = 500 if self.throttle_mode else 1000
                 if tick_count % log_interval == 0:
                     mode_indicator = "THROTTLE" if self.throttle_mode else ""
-                    self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
+                    data_source = "REAL" if self.enable_real_data else "SIM"
+                    self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")

                 await asyncio.sleep(tick_delay)
         except asyncio.CancelledError:
@@ -549,6 +928,8 @@ class SILOQYMainActor(Actor):

         except Exception as e:
             self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
+            self.log.error(f"{tick_tuple}")
+            sys.exit(2)

 class DOLPHINRegimeActor(Actor):
     def __init__(self, config: DOLPHINRegimeActorConfig):
@@ -565,6 +946,7 @@ class DOLPHINRegimeActor(Actor):
         self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
         self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
         self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
+        self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64)  # Default fallback

         # PRESERVED: All original mapping and state
         self.symbol_to_idx = {}
@@ -585,25 +967,76 @@ class DOLPHINRegimeActor(Actor):
         self.processed_ticks = 0
         self.regime_calculations = 0
+
+        # NEW: Enhanced indicator tracking for BB and temporal patterns
+        self.signal_history = deque(maxlen=100)  # For BB calculations
+        self.bb_period = 20  # BB calculation period
+        self.bb_std_dev = 2.0  # BB standard deviation multiplier
+        self.velocity_history = deque(maxlen=10)  # For regime velocity tracking
+        self.confidence_history = deque(maxlen=20)  # For confidence trend analysis

         self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, "
                       f"ticks_per_analysis: {self.ticks_per_analysis}")

     def on_start(self) -> None:
         """Subscribe to tick events - using Nautilus ActorExecutor"""
-        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events")
+        self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events")

         if hasattr(self, 'msgbus') and self.msgbus:
             self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
-            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events")
+            self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered)
+            self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes)
+            self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes")

     def on_stop(self) -> None:
         """Stop the actor - Nautilus handles executor cleanup"""
         self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping")
         # Nautilus kernel handles executor shutdown - no manual cleanup needed

+    def handle_symbols_discovered(self, data):
+        """Pre-initialize symbol mappings during discovery phase"""
+        try:
+            symbols, timestamp = data
+            self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings")
+
+            for symbol in symbols:
+                if self.active_symbols >= self.max_symbols:
+                    self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization")
+                    break
+
+                idx = self.active_symbols
+                self.symbol_to_idx[symbol] = idx
+                self.idx_to_symbol[idx] = symbol
+                self.active_symbols += 1
+
+            self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings")
+
+        except Exception as e:
+            self.log.error(f"Error pre-initializing symbols: {e}")
+
+    def handle_tick_sizes(self, data):
+        """Apply tick sizes to pre-initialized symbol mappings"""
+        try:
+            tick_sizes, timestamp = data
+            self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor")
+
+            applied_count = 0
+            for symbol, tick_size in tick_sizes.items():
+                if symbol in self.symbol_to_idx:  # Will exist from pre-initialization
+                    idx = self.symbol_to_idx[symbol]
+                    if 0 < tick_size <= 1.0:
+                        self.tick_sizes[idx] = tick_size
+                        applied_count += 1
+                    else:
+                        self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback")
+
+            self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols")
+
+        except Exception as e:
+            self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}")
+
     def handle_raw_tick(self, data):
         """
-        PRESERVED EXACTLY: All original zero-allocation tick processing
+        SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols
         Using Nautilus ActorExecutor for regime detection tasks
         """
         try:
@@ -613,41 +1046,28 @@ class DOLPHINRegimeActor(Actor):
             self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}")
             return

-        # PRESERVED EXACTLY: All original array processing logic
+        # Direct lookup - symbol will exist from pre-initialization
         if symbol not in self.symbol_to_idx:
-            if self.active_symbols >= self.max_symbols:
-                self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded")
-                return
-
-            idx = self.active_symbols
-            self.symbol_to_idx[symbol] = idx
-            self.idx_to_symbol[idx] = symbol
-            self.active_symbols += 1
-
-            # Initialize arrays
-            self.open_prices[idx] = open_price
-            self.high_prices[idx] = price
-            self.low_prices[idx] = price
-            self.close_prices[idx] = price
-            self.volumes[idx] = 0.0
-        else:
-            idx = self.symbol_to_idx[symbol]
-
-            # Check if new candle period
-            if candle_open_time > self.last_update[idx]:
-                # Reset for new candle
-                self.open_prices[idx] = open_price
-                self.high_prices[idx] = price
-                self.low_prices[idx] = price
-                self.close_prices[idx] = price
-                self.volumes[idx] = size
-                self.last_update[idx] = candle_open_time
-            else:
-                # Update existing candle data
-                self.close_prices[idx] = price
-                self.high_prices[idx] = max(self.high_prices[idx], price)
-                self.low_prices[idx] = min(self.low_prices[idx], price)
-                self.volumes[idx] += size
+            self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings")
+            return
+
+        idx = self.symbol_to_idx[symbol]
+
+        # Check if new candle period
+        if candle_open_time > self.last_update[idx]:
+            # Reset for new candle
+            self.open_prices[idx] = open_price
+            self.high_prices[idx] = price
+            self.low_prices[idx] = price
+            self.close_prices[idx] = price
+            self.volumes[idx] = size
+            self.last_update[idx] = candle_open_time
+        else:
+            # Update existing candle data
+            self.close_prices[idx] = price
+            self.high_prices[idx] = max(self.high_prices[idx], price)
+            self.low_prices[idx] = min(self.low_prices[idx], price)
+            self.volumes[idx] += size

         self.tick_count += 1
         self.processed_ticks += 1
@@ -675,6 +1095,101 @@ class DOLPHINRegimeActor(Actor):
         except Exception as e:
             self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}")

+    def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols):
+        """NEW: Calculate enhanced indicators including BB metrics and temporal patterns"""
+        # Calculate regime momentum signal
+        base_momentum = (bull_ratio - bear_ratio) * 100  # -100 to +100
+        sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0
+        signal = base_momentum * confidence * sample_quality
+
+        # Add to signal history
+        self.signal_history.append(signal)
+
+        # Calculate velocity (rate of change in signal)
+        velocity = 0.0
+        if len(self.signal_history) >= 2:
+            velocity = self.signal_history[-1] - self.signal_history[-2]
+            self.velocity_history.append(velocity)
+
+        # Store confidence for trending
+        self.confidence_history.append(confidence)
+
+        # Calculate Bollinger Bands if enough history
+        bb_metrics = {}
+        if len(self.signal_history) >= self.bb_period:
+            recent_signals = list(self.signal_history)[-self.bb_period:]
+            sma = sum(recent_signals) / len(recent_signals)
+
+            # Calculate standard deviation
+            variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals)
+            std_dev = variance ** 0.5
+
+            upper_band = sma + (self.bb_std_dev * std_dev)
+            lower_band = sma - (self.bb_std_dev * std_dev)
+
+            # Position within BBs (mean reversion interpretation)
+            if signal > upper_band:
+                bb_position = 'ABOVE_UPPER'
+            elif signal < lower_band:
+                bb_position = 'BELOW_LOWER'
+            elif signal >= sma:
+                bb_position = 'UPPER_HALF'
+            else:
+                bb_position = 'LOWER_HALF'
+
+            # Momentum persistence interpretation
+            if signal > upper_band:
+                momentum_signal = 'STRONG_BULL_BREAKOUT'
+            elif signal < lower_band:
+                momentum_signal = 'STRONG_BEAR_BREAKOUT'
+            elif signal > sma:
+                momentum_signal = 'MILD_BULLISH'
+            else:
+                momentum_signal = 'MILD_BEARISH'
+
+            bb_metrics = {
+                'signal': signal,
+                'sma': sma,
+                'upper_band': upper_band,
+                'lower_band': lower_band,
+                'bb_position': bb_position,
+                'momentum_signal': momentum_signal,
+                'bb_ready': True
+            }
+        else:
+            bb_metrics = {
+                'signal': signal,
+                'sma': None,
+                'upper_band': None,
+                'lower_band': None,
+                'bb_position': 'INSUFFICIENT_DATA',
+                'momentum_signal': 'INSUFFICIENT_DATA',
+                'bb_ready': False
+            }
+
+        # Calculate temporal patterns
+        temporal_metrics = {}
+        if len(self.velocity_history) >= 3:
+            avg_velocity = sum(self.velocity_history) / len(self.velocity_history)
+            velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING'
+        else:
+            avg_velocity = velocity
+            velocity_trend = 'BUILDING_HISTORY'
+
+        if len(self.confidence_history) >= 5:
+            confidence_trend = 'RISING' if confidence > sum(self.confidence_history[-5:-1])/4 else 'FALLING'
+        else:
+            confidence_trend = 'BUILDING_HISTORY'
+
+        temporal_metrics = {
+            'velocity': velocity,
+            'avg_velocity': avg_velocity,
+            'velocity_trend': velocity_trend,
+            'confidence_trend': confidence_trend
+        }
+
+        return bb_metrics, temporal_metrics
+
     def _run_regime_detection(self):
         self.regime_calculations += 1

@@ -686,6 +1201,9 @@ class DOLPHINRegimeActor(Actor):
         bullish = 0
         bearish = 0
+
+        # NEW: Track pattern of bullish/bearish symbols for this calculation
+        symbol_pattern = []

         # PRESERVED: Original analysis with exact thresholds
         for idx in range(self.active_symbols):
             open_price = self.open_prices[idx]
@@ -696,13 +1214,26 @@ class DOLPHINRegimeActor(Actor):

             analyzed += 1

-            # PRESERVED: EXACT DOLPHIN thresholds
-            change = (close_price - open_price) / open_price
-
-            if change >= 0.0015:  # 0.15% threshold for bullish
+            # NEW: HFT-grade tick-size based comparison
+            tick_size = self.tick_sizes[idx]
+            equality_threshold = tick_size / 2  # Half tick size standard
+            price_diff = abs(close_price - open_price)
+
+            # Check if prices are effectively equal within tick size tolerance
+            if price_diff <= equality_threshold:
+                # Prices are effectively equal (within tick size tolerance)
+                symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}")
+            elif close_price > open_price:
+                # Bullish: close > open
                 bullish += 1
-            elif change <= -0.0015:  # -0.15% threshold for bearish
+                # Arrow points to close (larger price)
+                symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}")
+            else:
+                # Bearish: close < open
                 bearish += 1
+                # Arrow points to open (larger price)
+                symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}")

         if analyzed == 0:
             return
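The hunk above replaces the fixed plus/minus 0.15% change thresholds with a per-symbol equality band of half a tick size. A standalone sketch of that classification rule (the function name and the example prices and tick sizes are illustrative, not from the commit):

def classify(open_price: float, close_price: float, tick_size: float) -> str:
    # Half tick size as the "effectively unchanged" band, per the hunk above
    equality_threshold = tick_size / 2
    if abs(close_price - open_price) <= equality_threshold:
        return "SIDEWAYS"   # would become an S... entry in symbol_pattern
    return "BULL" if close_price > open_price else "BEAR"

print(classify(109263.08, 109263.084, 0.01))   # SIDEWAYS: a 0.004 move is inside the 0.005 band
print(classify(2.7713, 2.7736, 0.0001))        # BULL: close > open by well over half a tick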
@@ -724,6 +1255,11 @@ class DOLPHINRegimeActor(Actor):
         # PRESERVED: Original confidence calculation
         confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
+
+        # NEW: Calculate enhanced indicators
+        bb_metrics, temporal_metrics = self._calculate_enhanced_indicators(
+            bull_ratio, bear_ratio, confidence, analyzed, total_symbols
+        )

         self.previous_bull_ratio = bull_ratio

         # Publish regime result using Nautilus message bus
@@ -742,20 +1278,87 @@ class DOLPHINRegimeActor(Actor):

                 self.msgbus.publish(REGIME_TOPIC, regime_tuple)
+
+                # NEW: Publish enhanced indicators to data bus
+                indicator_data = {
+                    'timestamp': int(time.time() * 1000),
+                    'regime_momentum_signal': bb_metrics['signal'],
+                    'bb_ready': bb_metrics['bb_ready'],
+                    'velocity': temporal_metrics['velocity'],
+                    'velocity_trend': temporal_metrics['velocity_trend'],
+                    'confidence_trend': temporal_metrics['confidence_trend']
+                }
+                self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_data)
+
+                # Publish BB metrics separately for specialized consumers
+                if bb_metrics['bb_ready']:
+                    bb_data = {
+                        'timestamp': int(time.time() * 1000),
+                        'signal': bb_metrics['signal'],
+                        'sma': bb_metrics['sma'],
+                        'upper_band': bb_metrics['upper_band'],
+                        'lower_band': bb_metrics['lower_band'],
+                        'bb_position': bb_metrics['bb_position'],
+                        'momentum_signal': bb_metrics['momentum_signal']
+                    }
+                    self.msgbus.publish(BB_METRICS_TOPIC, bb_data)
+
+                # Publish temporal patterns
+                temporal_data = {
+                    'timestamp': int(time.time() * 1000),
+                    'velocity': temporal_metrics['velocity'],
+                    'avg_velocity': temporal_metrics['avg_velocity'],
+                    'velocity_trend': temporal_metrics['velocity_trend'],
+                    'confidence_trend': temporal_metrics['confidence_trend'],
+                    'signal_history_length': len(self.signal_history)
+                }
+                self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_data)
+
             except Exception as e:
                 self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")

         # Log regime changes
         if not self.regime_history or regime != self.regime_history[-1]:
-            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
-                          f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
-                          f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
+            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} "
+                          f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | "
+                          f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}")
             self.regime_history.append(regime)

         # Periodic regime status (even without changes)
-        if self.regime_calculations % 10 == 0:  # Every 10 calculations
-            self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
-                          f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
+        if self.regime_calculations % 50 == 0:  # Every second, approx, given avg. tick rate
+            # Determine color based on regime
+            if regime == MarketRegime.BULL:
+                color_code = "\033[92m"  # Green
+            elif regime == MarketRegime.BEAR:
+                color_code = "\033[91m"  # Red
+            else:  # SIDEWAYS
+                color_code = "\033[93m"  # Yellow
+
+            # Reset color code
+            reset_code = "\033[0m"
+
+            self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} "
+                          f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}")
+
+            # NEW: Enhanced indicator line after regime status
+            if bb_metrics['bb_ready']:
+                self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
+                              f"SMA:{bb_metrics['sma']:.1f} | Upper:{bb_metrics['upper_band']:.1f} | "
+                              f"Lower:{bb_metrics['lower_band']:.1f} | Pos:{bb_metrics['bb_position']} | "
+                              f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.1f} | "
+                              f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
+            else:
+                self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
+                              f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | "
+                              f"Vel:{temporal_metrics['velocity']:.1f} | VelTrend:{temporal_metrics['velocity_trend']} | "
+                              f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
+
+            # NEW: Log symbol pattern and counts
+            if symbol_pattern:  # Only if we have symbols to show
+                pattern_str = " ".join(symbol_pattern) + " "  # Create pattern with spaces
+                bull_count = sum(1 for s in symbol_pattern if s.startswith("B"))
+                bear_count = sum(1 for s in symbol_pattern if s.startswith("X"))
+
+                self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}")

     def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
                               analyzed: int, total: int) -> float:
@@ -870,7 +1473,7 @@ def test_siloqy_actors_with_nautilus_process_management():
             "candle_interval_ms": 15 * 60 * 1000,
             "throttle_mode": True,  # ENABLED: Safe for dual instance testing
             "throttle_rate_limit_seconds": 10.0,  # 10s between batches (vs 2.5s)
-            "max_symbols_throttled": 100  # Only 100 symbols (vs 2000+)
+            "max_symbols_throttled": 414  # Only 100 symbols (vs 2000+)
         }
     )

@@ -880,7 +1483,8 @@ def test_siloqy_actors_with_nautilus_process_management():
         config={
             "component_id": "SILOQY-MAIN-ACTOR",
             "candle_interval_ms": 15 * 60 * 1000,
-            "throttle_mode": True  # ENABLED: Reduced tick generation
+            "throttle_mode": False,  # ENABLED: Reduced tick generation
+            "enable_real_data": True  # CHANGE TO True for real WebSocket data
         }
     )

@@ -890,7 +1494,7 @@ def test_siloqy_actors_with_nautilus_process_management():
         config={
             "component_id": "DOLPHIN-REGIME-ACTOR",
             "max_symbols": 5000,
-            "ticks_per_analysis": 500  # Reduced for throttle mode testing
+            "ticks_per_analysis": 2  # Reduced for throttle mode testing
         }
     )

@@ -915,8 +1519,8 @@ def test_siloqy_actors_with_nautilus_process_management():
         exec_clients={}  # No execution clients for this test
     )

     node = TradingNode(config=trading_config)

     try:
         node.build()
         print("Node built successfully with Nautilus built-in process management")
nautilus_actor_test_implementation_6x.py (new file, 1549 lines)
File diff suppressed because it is too large

symbol_consideration_issue.md (new file, 24 lines)
@@ -0,0 +1,24 @@
2025-09-01T19:54:58.150165200Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1374 ticks
2025-09-01T19:54:58.150201100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:58.660990300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1375, regime_calcs: 687, active_symbols: 8
2025-09-01T19:54:58.661042700Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1375 ticks
2025-09-01T19:54:58.730072100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1376 ticks
2025-09-01T19:54:58.730093900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:59.455057400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1378 ticks
2025-09-01T19:54:59.455081700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2
2025-09-01T19:54:59.568990700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1380 ticks
2025-09-01T19:54:59.569016900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.666864100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1381, regime_calcs: 690, active_symbols: 9
2025-09-01T19:54:59.666902500Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1381 ticks
2025-09-01T19:54:59.726017700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1382 ticks
2025-09-01T19:54:59.726051700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.999524400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1384 ticks
2025-09-01T19:54:59.999567100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:54:59.999803000Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1386 ticks
2025-09-01T19:54:59.999815800Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2
2025-09-01T19:55:00.683433300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1388 ticks

In this log, at the "transition" between 75.x% and 77.x%, you will find that one of the symbols being considered in the DOLPHIN market-regime detection is missing: the totals go from 6/2 to 7/2 ...

... so, before the transition, *one less symbol* is being considered.-

Most likely *no tick has been produced for that symbol* (is this a correct assumption?).-
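One hedged way to check that assumption from inside DOLPHINRegimeActor: symbols whose mappings were pre-initialized but which have never received a tick still hold the array defaults (last_update == 0), so listing them at each calculation would show which discovered symbols have produced no tick yet. A sketch only, with an illustrative method name, not part of this commit:

# Illustrative diagnostic, assuming the pre-initialized mappings and numpy arrays from the diff above
def log_unticked_symbols(self):
    unticked = [self.idx_to_symbol[idx]
                for idx in range(self.active_symbols)
                if self.last_update[idx] == 0]
    if unticked:
        self.log.debug(f"{len(unticked)} discovered symbols with no ticks yet: {unticked[:10]}")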