Compare commits
5 Commits
main
...
c9cffb4b53
| Author | SHA1 | Date | |
|---|---|---|---|
| c9cffb4b53 | |||
| 0f9a4a6f9c | |||
| 8b59e0ed4e | |||
| b5cd68263c | |||
| 9f1dbffc2a |
1399
SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md
Normal file
1399
SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md
Normal file
File diff suppressed because it is too large
Load Diff
@@ -3,11 +3,13 @@ import numpy as np
|
|||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
import math
|
import math
|
||||||
|
import httpx
|
||||||
|
import websockets
|
||||||
from typing import Dict, List, Optional, Tuple, Any
|
from typing import Dict, List, Optional, Tuple, Any
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
from collections import deque
|
from collections import deque
|
||||||
import httpx
|
|
||||||
import random # Added for _simulate_websocket_ticks
|
import random # Added for _simulate_websocket_ticks
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
|
||||||
# Nautilus imports - following test pattern
|
# Nautilus imports - following test pattern
|
||||||
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
|
from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
|
||||||
@@ -44,6 +46,239 @@ class MarketRegime(Enum):
|
|||||||
BEAR = "BEAR"
|
BEAR = "BEAR"
|
||||||
TRANSITION = "TRANSITION"
|
TRANSITION = "TRANSITION"
|
||||||
SIDEWAYS = "SIDEWAYS"
|
SIDEWAYS = "SIDEWAYS"
|
||||||
|
|
||||||
|
# ============================================================================
|
||||||
|
# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
|
||||||
|
# ============================================================================
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class TickData:
|
||||||
|
"""Universal tick data structure from standalone"""
|
||||||
|
symbol: str
|
||||||
|
price: float
|
||||||
|
quantity: float
|
||||||
|
timestamp: int
|
||||||
|
is_buyer_maker: bool
|
||||||
|
trade_id: int
|
||||||
|
exchange: str
|
||||||
|
raw_data: Dict = None
|
||||||
|
|
||||||
|
def __post_init__(self):
|
||||||
|
if self.raw_data is None:
|
||||||
|
self.raw_data = {}
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class ExchangeConfig:
|
||||||
|
"""Exchange-specific configuration from standalone"""
|
||||||
|
name: str
|
||||||
|
websocket_limit_per_second: int
|
||||||
|
max_streams_per_connection: int
|
||||||
|
max_connections_per_window: int
|
||||||
|
connection_window_seconds: int
|
||||||
|
base_websocket_url: str
|
||||||
|
requires_auth: bool = False
|
||||||
|
ping_interval: float = 20.0
|
||||||
|
ping_timeout: float = 10.0
|
||||||
|
|
||||||
|
class BinanceAdapter:
|
||||||
|
"""EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.config = ExchangeConfig(
|
||||||
|
name="binance",
|
||||||
|
websocket_limit_per_second=10,
|
||||||
|
max_streams_per_connection=1024,
|
||||||
|
max_connections_per_window=300,
|
||||||
|
connection_window_seconds=300, # 5 minutes
|
||||||
|
base_websocket_url="wss://stream.binance.com:9443",
|
||||||
|
requires_auth=False
|
||||||
|
)
|
||||||
|
self.logger = Logger(f"BINANCE_ADAPTER")
|
||||||
|
|
||||||
|
def get_websocket_url(self, symbols: List[str]) -> str:
|
||||||
|
"""Build Binance WebSocket URL with combined streams"""
|
||||||
|
if len(symbols) == 1:
|
||||||
|
symbol = symbols[0].lower()
|
||||||
|
return f"{self.config.base_websocket_url}/ws/{symbol}@trade"
|
||||||
|
else:
|
||||||
|
# Combined stream approach
|
||||||
|
streams = [f"{symbol.lower()}@trade" for symbol in symbols]
|
||||||
|
stream_string = "/".join(streams)
|
||||||
|
return f"{self.config.base_websocket_url}/stream?streams={stream_string}"
|
||||||
|
|
||||||
|
def parse_message(self, message: str) -> Optional[TickData]:
|
||||||
|
"""EXTRACTED FROM STANDALONE: Parse Binance trade message"""
|
||||||
|
try:
|
||||||
|
data = json.loads(message)
|
||||||
|
|
||||||
|
# Handle combined stream format
|
||||||
|
if 'stream' in data and 'data' in data:
|
||||||
|
stream_name = data['stream']
|
||||||
|
trade_data = data['data']
|
||||||
|
else:
|
||||||
|
# Single stream format
|
||||||
|
trade_data = data
|
||||||
|
stream_name = trade_data.get('e', '')
|
||||||
|
|
||||||
|
# Only process trade events
|
||||||
|
if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return TickData(
|
||||||
|
symbol=trade_data['s'],
|
||||||
|
price=float(trade_data['p']),
|
||||||
|
quantity=float(trade_data['q']),
|
||||||
|
timestamp=int(trade_data['T']),
|
||||||
|
is_buyer_maker=trade_data['m'],
|
||||||
|
trade_id=int(trade_data['t']),
|
||||||
|
exchange="binance",
|
||||||
|
raw_data=trade_data
|
||||||
|
)
|
||||||
|
|
||||||
|
except (json.JSONDecodeError, KeyError, ValueError) as e:
|
||||||
|
self.logger.error(f"Failed to parse Binance message: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]:
|
||||||
|
"""EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections"""
|
||||||
|
# Conservative: leave room for message bursts
|
||||||
|
safety_margin = 0.7
|
||||||
|
symbols_per_connection = int(
|
||||||
|
min(
|
||||||
|
self.config.max_streams_per_connection * safety_margin,
|
||||||
|
self.config.websocket_limit_per_second * 5 # 5-second buffer
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Group symbols into chunks
|
||||||
|
symbol_groups = []
|
||||||
|
for i in range(0, len(symbols), symbols_per_connection):
|
||||||
|
group = symbols[i:i + symbols_per_connection]
|
||||||
|
symbol_groups.append(group)
|
||||||
|
|
||||||
|
return symbol_groups
|
||||||
|
|
||||||
|
class SiloqyWebSocketConnection:
|
||||||
|
"""EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus"""
|
||||||
|
|
||||||
|
def __init__(self, connection_id: str, symbols: List[str],
|
||||||
|
binance_adapter: BinanceAdapter, main_actor_ref):
|
||||||
|
self.connection_id = connection_id
|
||||||
|
self.symbols = symbols
|
||||||
|
self.adapter = binance_adapter
|
||||||
|
self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access
|
||||||
|
|
||||||
|
self.websocket = None
|
||||||
|
self.is_running = False
|
||||||
|
self.retry_count = 0
|
||||||
|
self.messages_received = 0
|
||||||
|
self.last_message_time = 0.0
|
||||||
|
self.connected_at = None
|
||||||
|
|
||||||
|
self.logger = Logger(f"WS_{connection_id}")
|
||||||
|
self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols")
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
"""Start the WebSocket connection"""
|
||||||
|
self.is_running = True
|
||||||
|
|
||||||
|
while self.is_running:
|
||||||
|
try:
|
||||||
|
await self._connect_and_run()
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Connection {self.connection_id} error: {e}")
|
||||||
|
|
||||||
|
if self.is_running:
|
||||||
|
await self._handle_reconnection()
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
|
||||||
|
async def _connect_and_run(self) -> None:
|
||||||
|
"""EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop"""
|
||||||
|
url = self.adapter.get_websocket_url(self.symbols)
|
||||||
|
|
||||||
|
self.logger.info(f"Connecting to Binance: {url[:100]}...")
|
||||||
|
|
||||||
|
async with websockets.connect(
|
||||||
|
url,
|
||||||
|
ping_interval=self.adapter.config.ping_interval,
|
||||||
|
ping_timeout=self.adapter.config.ping_timeout,
|
||||||
|
close_timeout=10
|
||||||
|
) as websocket:
|
||||||
|
self.websocket = websocket
|
||||||
|
self.connected_at = time.time()
|
||||||
|
self.retry_count = 0
|
||||||
|
|
||||||
|
self.logger.info(f"Connection {self.connection_id} established")
|
||||||
|
|
||||||
|
# Message processing loop
|
||||||
|
async for message in websocket:
|
||||||
|
if not self.is_running:
|
||||||
|
break
|
||||||
|
|
||||||
|
try:
|
||||||
|
await self._process_message(message)
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Message processing error: {e}")
|
||||||
|
|
||||||
|
async def _process_message(self, message: str) -> None:
|
||||||
|
"""Process incoming WebSocket message and convert to Nautilus format"""
|
||||||
|
self.messages_received += 1
|
||||||
|
self.last_message_time = time.time()
|
||||||
|
|
||||||
|
# Parse message using Binance adapter
|
||||||
|
tick_data = self.adapter.parse_message(message)
|
||||||
|
if tick_data:
|
||||||
|
# Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus
|
||||||
|
try:
|
||||||
|
# Create SiloqyTradeTick object
|
||||||
|
siloqy_tick = SiloqyTradeTick(
|
||||||
|
instrument_id=tick_data.symbol,
|
||||||
|
price=round(tick_data.price, 8),
|
||||||
|
size=round(tick_data.quantity, 8),
|
||||||
|
ts_event=tick_data.timestamp * 1000000, # Convert ms to ns
|
||||||
|
ts_init=int(time.time_ns()),
|
||||||
|
open_price=None, # Will be set by candle logic
|
||||||
|
candle_open_time=None, # Will be set by candle logic
|
||||||
|
tick_side=None,
|
||||||
|
exchange="BINANCE",
|
||||||
|
liquidity_flag=None
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get candle information from main actor
|
||||||
|
if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles:
|
||||||
|
symbol = tick_data.symbol
|
||||||
|
if symbol in self.main_actor.active_candles:
|
||||||
|
candle = self.main_actor.active_candles[symbol]
|
||||||
|
siloqy_tick.open_price = candle.get('open_price', tick_data.price)
|
||||||
|
siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000))
|
||||||
|
|
||||||
|
# Process through main actor's tick handling (preserves all candle logic)
|
||||||
|
self.main_actor.on_websocket_tick(
|
||||||
|
tick_data.symbol,
|
||||||
|
tick_data.price,
|
||||||
|
tick_data.quantity,
|
||||||
|
tick_data.timestamp
|
||||||
|
)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
self.logger.error(f"Failed to process tick: {e}")
|
||||||
|
|
||||||
|
async def _handle_reconnection(self) -> None:
|
||||||
|
"""Handle reconnection with exponential backoff"""
|
||||||
|
self.retry_count += 1
|
||||||
|
delay = min(2 ** self.retry_count, 60) # Max 60 seconds
|
||||||
|
|
||||||
|
self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})")
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
"""Stop the connection"""
|
||||||
|
self.logger.info(f"Stopping connection {self.connection_id}")
|
||||||
|
self.is_running = False
|
||||||
|
|
||||||
|
if self.websocket:
|
||||||
|
await self.websocket.close()
|
||||||
|
|
||||||
# --------------------------------------------------------------------
|
# --------------------------------------------------------------------
|
||||||
# SILOQY Custom Tick - PRESERVED with fixes
|
# SILOQY Custom Tick - PRESERVED with fixes
|
||||||
@@ -95,10 +330,11 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig):
|
|||||||
class SILOQYMainActorConfig(ActorConfig):
|
class SILOQYMainActorConfig(ActorConfig):
|
||||||
candle_interval_ms: int = 15 * 60 * 1000
|
candle_interval_ms: int = 15 * 60 * 1000
|
||||||
throttle_mode: bool = False
|
throttle_mode: bool = False
|
||||||
|
enable_real_data: bool = False # NEW: Enable real WebSocket data
|
||||||
|
|
||||||
class DOLPHINRegimeActorConfig(ActorConfig):
|
class DOLPHINRegimeActorConfig(ActorConfig):
|
||||||
max_symbols: int = 5000
|
max_symbols: int = 5000
|
||||||
ticks_per_analysis: int = 1000
|
ticks_per_analysis: int = 10
|
||||||
|
|
||||||
class SILOQYNormalizerConfig(ActorConfig):
|
class SILOQYNormalizerConfig(ActorConfig):
|
||||||
pass
|
pass
|
||||||
@@ -200,6 +436,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
|
|||||||
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
|
raise Exception(f"Failed to fetch exchange info: {response.status_code}")
|
||||||
|
|
||||||
async def _fetch_stats_and_reconstruct_candles(self):
|
async def _fetch_stats_and_reconstruct_candles(self):
|
||||||
|
"""PRESERVED: All original rate limiting with Nautilus async patterns"""
|
||||||
url = "https://api.binance.com/api/v3/ticker/24hr"
|
url = "https://api.binance.com/api/v3/ticker/24hr"
|
||||||
klines_url = "https://api.binance.com/api/v3/klines"
|
klines_url = "https://api.binance.com/api/v3/klines"
|
||||||
ticker_url = "https://api.binance.com/api/v3/ticker/price"
|
ticker_url = "https://api.binance.com/api/v3/ticker/price"
|
||||||
@@ -315,7 +552,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(f"Request failed: {str(e)}")
|
self.log.error(f"Request failed: {str(e)}")
|
||||||
|
|
||||||
# PRESERVED: Original rate limiting
|
# PRESERVED: Original rate limiting with Nautilus async
|
||||||
elapsed = time.time() - start_time
|
elapsed = time.time() - start_time
|
||||||
if i + 50 < len(self.symbols):
|
if i + 50 < len(self.symbols):
|
||||||
sleep_time = max(0, rate_limit_interval - elapsed)
|
sleep_time = max(0, rate_limit_interval - elapsed)
|
||||||
@@ -353,6 +590,7 @@ class SILOQYMainActor(Actor):
|
|||||||
# Preserve original configuration
|
# Preserve original configuration
|
||||||
self.candle_interval_ms = config.candle_interval_ms
|
self.candle_interval_ms = config.candle_interval_ms
|
||||||
self.throttle_mode = config.throttle_mode
|
self.throttle_mode = config.throttle_mode
|
||||||
|
self.enable_real_data = config.enable_real_data # NEW: Real data capability
|
||||||
self.connections = {}
|
self.connections = {}
|
||||||
self.connection_tasks = {}
|
self.connection_tasks = {}
|
||||||
|
|
||||||
@@ -360,6 +598,11 @@ class SILOQYMainActor(Actor):
|
|||||||
self.symbols = []
|
self.symbols = []
|
||||||
self.active_candles = {}
|
self.active_candles = {}
|
||||||
|
|
||||||
|
# WebSocket infrastructure - NEW
|
||||||
|
self.binance_adapter = BinanceAdapter() if self.enable_real_data else None
|
||||||
|
self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {}
|
||||||
|
self.websocket_tasks: Dict[str, asyncio.Task] = {}
|
||||||
|
|
||||||
# WebSocket tasks - managed by Nautilus ActorExecutor
|
# WebSocket tasks - managed by Nautilus ActorExecutor
|
||||||
self.ws_tasks = []
|
self.ws_tasks = []
|
||||||
|
|
||||||
@@ -369,6 +612,11 @@ class SILOQYMainActor(Actor):
|
|||||||
if self.throttle_mode:
|
if self.throttle_mode:
|
||||||
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
|
self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation")
|
||||||
|
|
||||||
|
if self.enable_real_data:
|
||||||
|
self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams")
|
||||||
|
else:
|
||||||
|
self.log.info("SIMULATION MODE: Will generate simulated ticks")
|
||||||
|
|
||||||
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
|
self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor")
|
||||||
|
|
||||||
def on_start(self) -> None:
|
def on_start(self) -> None:
|
||||||
@@ -393,6 +641,12 @@ class SILOQYMainActor(Actor):
|
|||||||
def on_stop(self) -> None:
|
def on_stop(self) -> None:
|
||||||
"""Stop the actor - Nautilus handles executor cleanup"""
|
"""Stop the actor - Nautilus handles executor cleanup"""
|
||||||
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping")
|
||||||
|
|
||||||
|
# Stop WebSocket connections if running
|
||||||
|
if self.enable_real_data:
|
||||||
|
for conn in self.websocket_connections.values():
|
||||||
|
asyncio.create_task(conn.stop())
|
||||||
|
|
||||||
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
# Nautilus kernel handles executor shutdown - no manual cleanup needed
|
||||||
|
|
||||||
async def on_start_async(self) -> None:
|
async def on_start_async(self) -> None:
|
||||||
@@ -401,8 +655,11 @@ class SILOQYMainActor(Actor):
|
|||||||
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
|
self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...")
|
||||||
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
|
await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0)
|
||||||
|
|
||||||
# PRESERVED: Start WebSocket connections
|
# ENHANCED: Start real or simulated connections based on configuration
|
||||||
await self._start_websocket_connections()
|
if self.enable_real_data:
|
||||||
|
await self._start_real_websocket_connections()
|
||||||
|
else:
|
||||||
|
await self._start_simulated_connections()
|
||||||
|
|
||||||
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
|
self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully")
|
||||||
|
|
||||||
@@ -423,7 +680,6 @@ class SILOQYMainActor(Actor):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
|
self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}")
|
||||||
|
|
||||||
# FIXED: SILOQYMainActor.handle_candles_initial
|
|
||||||
def handle_candles_initial(self, data):
|
def handle_candles_initial(self, data):
|
||||||
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
|
"""Handle initial candles from discovery actor and properly initialize the candle dict."""
|
||||||
try:
|
try:
|
||||||
@@ -448,8 +704,53 @@ class SILOQYMainActor(Actor):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
|
self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}")
|
||||||
|
|
||||||
async def _start_websocket_connections(self):
|
async def _start_real_websocket_connections(self):
|
||||||
"""PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor"""
|
"""NEW: Start real Binance WebSocket connections with throttle control"""
|
||||||
|
self.log.info("Starting REAL Binance WebSocket connections for live data...")
|
||||||
|
|
||||||
|
# Apply throttle mode limits to WebSocket connections
|
||||||
|
symbols_to_use = self.symbols
|
||||||
|
if self.throttle_mode:
|
||||||
|
symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode
|
||||||
|
self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols")
|
||||||
|
|
||||||
|
# Calculate optimal symbol distribution
|
||||||
|
symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use)
|
||||||
|
|
||||||
|
self.log.info(f"Real WebSocket distribution:")
|
||||||
|
self.log.info(f" - Total symbols: {len(symbols_to_use)}")
|
||||||
|
self.log.info(f" - Connections needed: {len(symbol_groups)}")
|
||||||
|
|
||||||
|
# Start WebSocket connections
|
||||||
|
for i, symbol_group in enumerate(symbol_groups):
|
||||||
|
connection_id = f"binance_real_{i:03d}"
|
||||||
|
|
||||||
|
connection = SiloqyWebSocketConnection(
|
||||||
|
connection_id=connection_id,
|
||||||
|
symbols=symbol_group,
|
||||||
|
binance_adapter=self.binance_adapter,
|
||||||
|
main_actor_ref=self
|
||||||
|
)
|
||||||
|
|
||||||
|
self.websocket_connections[connection_id] = connection
|
||||||
|
|
||||||
|
# Use Nautilus ActorExecutor for WebSocket task management
|
||||||
|
if hasattr(self, '_executor') and self._executor:
|
||||||
|
future = self._executor.submit(connection.start())
|
||||||
|
self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor")
|
||||||
|
else:
|
||||||
|
task = asyncio.create_task(connection.start())
|
||||||
|
self.websocket_tasks[connection_id] = task
|
||||||
|
self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback")
|
||||||
|
|
||||||
|
# Rate limit connection establishment
|
||||||
|
if i < len(symbol_groups) - 1:
|
||||||
|
await asyncio.sleep(1.0) # 1 second between connections
|
||||||
|
|
||||||
|
self.log.info(f"Started {len(symbol_groups)} real WebSocket connections")
|
||||||
|
|
||||||
|
async def _start_simulated_connections(self):
|
||||||
|
"""PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor"""
|
||||||
self.log.info("Starting WebSocket simulation for tick generation...")
|
self.log.info("Starting WebSocket simulation for tick generation...")
|
||||||
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
|
self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols")
|
||||||
|
|
||||||
@@ -464,6 +765,7 @@ class SILOQYMainActor(Actor):
|
|||||||
self.ws_tasks.append(task)
|
self.ws_tasks.append(task)
|
||||||
|
|
||||||
async def _simulate_websocket_ticks(self):
|
async def _simulate_websocket_ticks(self):
|
||||||
|
"""PRESERVED: Original simulation logic"""
|
||||||
# Adjust tick rate for throttle mode
|
# Adjust tick rate for throttle mode
|
||||||
tick_delay = 0.1 if self.throttle_mode else 0.01
|
tick_delay = 0.1 if self.throttle_mode else 0.01
|
||||||
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
|
symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols))
|
||||||
@@ -489,7 +791,8 @@ class SILOQYMainActor(Actor):
|
|||||||
log_interval = 500 if self.throttle_mode else 1000
|
log_interval = 500 if self.throttle_mode else 1000
|
||||||
if tick_count % log_interval == 0:
|
if tick_count % log_interval == 0:
|
||||||
mode_indicator = "THROTTLE" if self.throttle_mode else ""
|
mode_indicator = "THROTTLE" if self.throttle_mode else ""
|
||||||
self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}")
|
data_source = "REAL" if self.enable_real_data else "SIM"
|
||||||
|
self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
|
||||||
|
|
||||||
await asyncio.sleep(tick_delay)
|
await asyncio.sleep(tick_delay)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
@@ -686,6 +989,9 @@ class DOLPHINRegimeActor(Actor):
|
|||||||
bullish = 0
|
bullish = 0
|
||||||
bearish = 0
|
bearish = 0
|
||||||
|
|
||||||
|
# NEW: Track pattern of bullish/bearish symbols for this calculation
|
||||||
|
symbol_pattern = []
|
||||||
|
|
||||||
# PRESERVED: Original analysis with exact thresholds
|
# PRESERVED: Original analysis with exact thresholds
|
||||||
for idx in range(self.active_symbols):
|
for idx in range(self.active_symbols):
|
||||||
open_price = self.open_prices[idx]
|
open_price = self.open_prices[idx]
|
||||||
@@ -696,13 +1002,23 @@ class DOLPHINRegimeActor(Actor):
|
|||||||
|
|
||||||
analyzed += 1
|
analyzed += 1
|
||||||
|
|
||||||
# PRESERVED: EXACT DOLPHIN thresholds
|
# NEW: Direct price comparison with epsilon for precision
|
||||||
change = (close_price - open_price) / open_price
|
EPSILON = 1e-10 # Very small tolerance to capture any meaningful price change
|
||||||
|
|
||||||
if change >= 0.0015: # 0.15% threshold for bullish
|
# Check if prices are effectively equal
|
||||||
|
if abs(close_price - open_price) <= EPSILON:
|
||||||
|
# Prices are effectively equal
|
||||||
|
symbol_pattern.append(f"S{close_price:.2f}={open_price:.2f}")
|
||||||
|
elif close_price > open_price:
|
||||||
|
# Bullish: close > open
|
||||||
bullish += 1
|
bullish += 1
|
||||||
elif change <= -0.0015: # -0.15% threshold for bearish
|
# Arrow points to close (larger price)
|
||||||
|
symbol_pattern.append(f"B{open_price:.2f}->{close_price:.2f}")
|
||||||
|
else:
|
||||||
|
# Bearish: close < open
|
||||||
bearish += 1
|
bearish += 1
|
||||||
|
# Arrow points to open (larger price)
|
||||||
|
symbol_pattern.append(f"X{close_price:.2f}<-{open_price:.2f}")
|
||||||
|
|
||||||
if analyzed == 0:
|
if analyzed == 0:
|
||||||
return
|
return
|
||||||
@@ -753,9 +1069,16 @@ class DOLPHINRegimeActor(Actor):
|
|||||||
self.regime_history.append(regime)
|
self.regime_history.append(regime)
|
||||||
|
|
||||||
# Periodic regime status (even without changes)
|
# Periodic regime status (even without changes)
|
||||||
if self.regime_calculations % 10 == 0: # Every 10 calculations
|
if self.regime_calculations % 1 == 0: # Every calculation
|
||||||
self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
|
self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
|
||||||
f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
|
f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
|
||||||
|
|
||||||
|
# NEW: Log symbol pattern and counts
|
||||||
|
if symbol_pattern: # Only if we have symbols to show
|
||||||
|
pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces
|
||||||
|
bull_count = sum(1 for s in symbol_pattern if s.startswith("B"))
|
||||||
|
bear_count = sum(1 for s in symbol_pattern if s.startswith("X"))
|
||||||
|
self.log.info(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}")
|
||||||
|
|
||||||
def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
|
def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
|
||||||
analyzed: int, total: int) -> float:
|
analyzed: int, total: int) -> float:
|
||||||
@@ -880,7 +1203,8 @@ def test_siloqy_actors_with_nautilus_process_management():
|
|||||||
config={
|
config={
|
||||||
"component_id": "SILOQY-MAIN-ACTOR",
|
"component_id": "SILOQY-MAIN-ACTOR",
|
||||||
"candle_interval_ms": 15 * 60 * 1000,
|
"candle_interval_ms": 15 * 60 * 1000,
|
||||||
"throttle_mode": True # ENABLED: Reduced tick generation
|
"throttle_mode": True, # ENABLED: Reduced tick generation
|
||||||
|
"enable_real_data": True # CHANGE TO True for real WebSocket data
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -890,7 +1214,7 @@ def test_siloqy_actors_with_nautilus_process_management():
|
|||||||
config={
|
config={
|
||||||
"component_id": "DOLPHIN-REGIME-ACTOR",
|
"component_id": "DOLPHIN-REGIME-ACTOR",
|
||||||
"max_symbols": 5000,
|
"max_symbols": 5000,
|
||||||
"ticks_per_analysis": 500 # Reduced for throttle mode testing
|
"ticks_per_analysis": 2 # Reduced for throttle mode testing
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -915,8 +1239,9 @@ def test_siloqy_actors_with_nautilus_process_management():
|
|||||||
exec_clients={} # No execution clients for this test
|
exec_clients={} # No execution clients for this test
|
||||||
)
|
)
|
||||||
|
|
||||||
node = TradingNode(config=trading_config)
|
node = TradingNode(config=trading_config)
|
||||||
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
node.build()
|
node.build()
|
||||||
print("Node built successfully with Nautilus built-in process management")
|
print("Node built successfully with Nautilus built-in process management")
|
||||||
|
|||||||
Reference in New Issue
Block a user