diff --git a/SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md b/SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md new file mode 100644 index 0000000..a0ad5a1 --- /dev/null +++ b/SILOQY_DOLPHIN_Integration_Engineering_Spect_1.0.md @@ -0,0 +1,1399 @@ +SILOQY DOLPHIN Integration Engineering Specification +==================================================== + +**Document Version:** 1.0 +**Date:** August 31, 2025 +**Target Audience:** SILOQY Component Developers +**Framework:** Nautilus Trader Integration Architecture + +Executive Summary +----------------- + +This document provides the complete technical specification for integrating with +the SILOQY DOLPHIN market regime detection system within the Nautilus Trader +framework. It details the message bus architecture, actor lifecycle management, +data flow patterns, and exact implementation requirements for new SILOQY +components. + +Table of Contents +----------------- + +1. [Architecture Overview](#architecture-overview) + +2. [Nautilus Message Bus Communication](#nautilus-message-bus-communication) + +3. [Actor Lifecycle and Execution Model](#actor-lifecycle-and-execution-model) + +4. [SILOQY Topic Hierarchy and Data + Structures](#siloqy-topic-hierarchy-and-data-structures) + +5. [Producer/Consumer Patterns](#producer-consumer-patterns) + +6. [Component Integration Patterns](#component-integration-patterns) + +7. [Error Handling and Recovery](#error-handling-and-recovery) + +8. [Performance Optimization Guidelines](#performance-optimization-guidelines) + +9. [Testing and Validation Requirements](#testing-and-validation-requirements) + +10. [Code Examples and Templates](#code-examples-and-templates) + +Architecture Overview +--------------------- + +### System Components + +The SILOQY DOLPHIN system operates within a single Nautilus TradingNode instance +with four core actors: + +1. **SILOQYSymbolDiscoveryActor**: Symbol discovery and candle reconstruction + +2. **SILOQYMainActor**: Real-time data ingestion and WebSocket management + + +3. **DOLPHINRegimeActor**: Market regime detection and analysis + +4. **SILOQYNormalizerActor**: Tick normalization and format standardization + +### Data Flow Architecture + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +External Market Data → SILOQYMainActor → MessageBus → [DOLPHINRegimeActor, SILOQYNormalizerActor] + ↓ + Custom SILOQY Components +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Shared Node Pattern + +**Critical Design Principle:** New SILOQY components MUST integrate into the +existing TradingNode instance rather than creating separate nodes. 
This ensures:
+
+- Shared message bus access
+- Unified actor lifecycle management
+- Consistent resource allocation
+- Optimal performance through single-process architecture
+
+Nautilus Message Bus Communication
+----------------------------------
+
+### Message Bus Access Pattern
+
+All SILOQY actors gain access to the Nautilus MessageBus through the standard
+Nautilus actor initialization:
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class MySILOQYComponent(Actor):
+    def __init__(self, config: MyComponentConfig):
+        super().__init__(config)
+        # MessageBus is available as self.msgbus after Nautilus initialization
+
+    def on_start(self) -> None:
+        # Subscribe to topics ONLY in on_start()
+        if hasattr(self, 'msgbus') and self.msgbus:
+            self.msgbus.subscribe("SILOQY.MY.TOPIC", self.handle_my_data)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Message Publishing Protocol
+
+**High-Performance Publishing Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def publish_siloqy_data(self, topic: str, data: Any) -> None:
+    """Standard SILOQY publishing pattern"""
+    try:
+        if hasattr(self, 'msgbus') and self.msgbus:
+            self.msgbus.publish(topic, data)
+        else:
+            self.log.error("MessageBus not available for publishing")
+    except Exception as e:
+        self.log.error(f"Failed to publish to {topic}: {e}")
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Subscription Management
+
+**Thread-Safe Subscription Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def on_start(self) -> None:
+    """Subscribe to required SILOQY topics"""
+    if hasattr(self, 'msgbus') and self.msgbus:
+        # Subscribe to core SILOQY data streams
+        self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
+        self.msgbus.subscribe(REGIME_TOPIC, self.handle_regime_update)
+        self.msgbus.subscribe("SILOQY.CUSTOM.TOPIC", self.handle_custom_data)
+
+        self.log.info("Subscribed to SILOQY message bus topics")
+    else:
+        self.log.error("MessageBus not available during initialization")
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Message Bus Timing Guarantees
+
+- **Publication Latency:** \< 1μs for tuple data
+
+- **Delivery Guarantee:** Synchronous delivery to all subscribers
+
+- **Thread Safety:** Fully thread-safe for concurrent access
+
+- **Memory Management:** Zero-copy for immutable data types
+
+Actor Lifecycle and Execution Model
+-----------------------------------
+
+### Nautilus Actor Initialization Sequence
+
+1. **Construction Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   def __init__(self, config: ActorConfig):
+       super().__init__(config)  # MUST call super().__init__()
+       # Initialize component-specific state
+       # MessageBus NOT available yet
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+2. **Registration Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   # Handled automatically by TradingNode.build()
+   # Actor registered with Nautilus kernel
+   # MessageBus reference established
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+3. 
**Startup Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   def on_start(self) -> None:
+       # MessageBus NOW available as self.msgbus
+       # Subscribe to required topics
+       # Initialize async tasks using ActorExecutor
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+4. **Execution Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   # Actor receives messages via subscribed handlers
+   # Actor publishes data via self.msgbus.publish()
+   # Long-running tasks managed by ActorExecutor
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+5. **Shutdown Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   def on_stop(self) -> None:
+       # Clean up component-specific resources
+       # Nautilus handles ActorExecutor cleanup automatically
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### ActorExecutor Task Management
+
+**Preferred Pattern for Async Operations:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def on_start(self) -> None:
+    if hasattr(self, '_executor') and self._executor:
+        self.log.info("Using Nautilus ActorExecutor")
+        future = self._executor.submit(self.async_initialization())
+    else:
+        self.log.warning("No executor available, falling back to asyncio")
+        asyncio.create_task(self.async_initialization())
+
+async def async_initialization(self):
+    # Long-running async operations
+    # Managed by Nautilus ActorExecutor
+    pass
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Actor State Management
+
+**Thread-Safe State Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class MySILOQYActor(Actor):
+    def __init__(self, config):
+        super().__init__(config)
+
+        # Pre-allocated arrays for high-frequency data (DOLPHIN pattern)
+        self.data_arrays = np.zeros(config.max_items, dtype=np.float64)
+        self.symbol_mapping = {}
+        self.active_count = 0
+
+        # Thread-safe collections for shared state
+        self.message_queue = deque(maxlen=10000)
+        self.state_lock = threading.Lock()
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+SILOQY Topic Hierarchy and Data Structures
+------------------------------------------
+
+### Core SILOQY Topics
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+# Primary data flow topics
+RAW_TOPIC = "SILOQY.RAW.TICKS"                    # Raw market tick data
+STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"      # Normalized SiloqyTradeTick objects
+REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"           # Market regime detection results
+
+# Discovery and initialization topics
+SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"  # Symbol discovery completion
+CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"        # Candle reconstruction results
+
+# Custom component topics (examples)
+HD_VECTOR_TOPIC = "SILOQY.HD.VECTORS"             # Hyperdimensional vector data
+TEMPORAL_TOPIC = "SILOQY.TEMPORAL.PATTERNS"       # Temporal memory patterns
+ANALYSIS_TOPIC = "SILOQY.ANALYSIS.RESULTS"        # Custom analysis results
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Topic Naming Convention
+
+**Standard Format:** `SILOQY.{COMPONENT}.{DATA_TYPE}.{VARIANT}`
+
+**Examples:**
+
+- `SILOQY.SENTIMENT.ANALYSIS.REALTIME`
+- `SILOQY.CORRELATION.MATRIX.UPDATED`
+- `SILOQY.RISK.METRICS.CALCULATED`
+- `SILOQY.ML.PREDICTIONS.GENERATED`
+
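+A small helper can keep publishers aligned with this convention. The sketch
+below is illustrative only (a hypothetical `make_topic` helper, not part of
+the shipped SILOQY actors):
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def make_topic(component: str, data_type: str, variant: str = "") -> str:
+    """Build a convention-compliant SILOQY topic name (hypothetical helper)."""
+    parts = ["SILOQY", component, data_type]
+    if variant:
+        parts.append(variant)
+    # Upper-case each segment so "sentiment" and "SENTIMENT" map to one topic
+    return ".".join(part.upper() for part in parts)
+
+# make_topic("sentiment", "analysis", "realtime")
+# -> "SILOQY.SENTIMENT.ANALYSIS.REALTIME"
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+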
+### Data Structure Specifications + +#### Raw Tick Data Format (RAW_TOPIC) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Tuple format: (symbol, price, size, ts_event, open_price, candle_open_time) +raw_tick_tuple = ( + "BTCUSDT", # str: Symbol identifier + 45000.12345678, # float: Current price (8 decimal precision) + 1.23456789, # float: Trade quantity (8 decimal precision) + 1693489234567, # int: Event timestamp (milliseconds) + 44950.87654321, # float: Current candle open price (8 decimal precision) + 1693488900000 # int: Candle open time (milliseconds) +) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#### Structured Tick Data Format (STRUCTURED_TOPIC) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# SiloqyTradeTick object with additional SILOQY fields +structured_tick = SiloqyTradeTick( + instrument_id="BTCUSDT.BINANCE", # InstrumentId: Nautilus format + price=Price(45000.12345678, 8), # Price: Nautilus price object + size=Quantity(1.23456789, 8), # Quantity: Nautilus quantity object + ts_event=1693489234567000000, # int: Nanosecond timestamp + open_price=44950.87654321, # float: SILOQY candle open price + candle_open_time=1693488900000, # int: SILOQY candle open time + exchange="BINANCE", # str: SILOQY exchange identifier + tick_side=None, # Optional[str]: Order side + liquidity_flag=None # Optional[str]: Liquidity classification +) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +#### Regime Detection Results (REGIME_TOPIC) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Tuple format: (timestamp, regime, bull_ratio, bear_ratio, sideways_ratio, analyzed, total, confidence) +regime_tuple = ( + 1693489234567, # int: Detection timestamp (milliseconds) + "BULL", # str: Regime type ["BULL", "BEAR", "SIDEWAYS"] + 0.67, # float: Bull ratio (0.0-1.0) + 0.25, # float: Bear ratio (0.0-1.0) + 0.08, # float: Sideways ratio (0.0-1.0) + 150, # int: Symbols analyzed + 180, # int: Total symbols available + 0.82 # float: Confidence score (0.0-1.0) +) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Custom Data Structure Template + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +@dataclass +class SILOQYCustomData: + """Template for custom SILOQY data structures""" + component_id: str + timestamp: int + data_type: str + payload: Dict[str, Any] + confidence: float = 1.0 + metadata: Dict[str, Any] = field(default_factory=dict) + + def to_message_tuple(self) -> Tuple: + """Convert to message bus tuple format""" + return ( + self.component_id, + self.timestamp, + self.data_type, + json.dumps(self.payload), + self.confidence, + json.dumps(self.metadata) + ) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Producer/Consumer Patterns +-------------------------- + +### High-Frequency Data Producer Pattern + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class SILOQYHighFrequencyProducer(Actor): + """Template for high-frequency data producers""" + + def __init__(self, config): + super().__init__(config) + self.publish_count = 0 + self.last_publish_time = 0 + + def produce_data_efficiently(self, data: Any, topic: str) -> None: + """High-performance data publishing""" + try: + # Validate data before publishing + if self._validate_data(data): + self.msgbus.publish(topic, data) + 
self.publish_count += 1
+
+                # Performance monitoring
+                if self.publish_count % 1000 == 0:
+                    self._log_performance_metrics()
+            else:
+                self.log.warning(f"Invalid data rejected for topic {topic}")
+
+        except Exception as e:
+            self.log.error(f"Publishing failed for topic {topic}: {e}")
+
+    def _validate_data(self, data: Any) -> bool:
+        """Implement component-specific validation"""
+        # Override in derived classes
+        return True
+
+    def _log_performance_metrics(self) -> None:
+        """Log performance metrics"""
+        current_time = time.time()
+        if self.last_publish_time > 0:
+            duration = current_time - self.last_publish_time
+            rate = 1000 / duration if duration > 0 else 0
+            self.log.info(f"Publishing rate: {rate:.1f} messages/sec")
+        self.last_publish_time = current_time
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Data Consumer Pattern
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class SILOQYDataConsumer(Actor):
+    """Template for SILOQY data consumers"""
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.processed_count = 0
+        self.processing_queue = deque(maxlen=10000)
+        self.batch_size = 100
+
+    def on_start(self) -> None:
+        """Subscribe to required SILOQY topics"""
+        if hasattr(self, 'msgbus') and self.msgbus:
+            # Subscribe to multiple SILOQY data streams
+            self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick)
+            self.msgbus.subscribe(REGIME_TOPIC, self.handle_regime_update)
+            self.msgbus.subscribe("SILOQY.CUSTOM.DATA", self.handle_custom_data)
+
+            self.log.info("SILOQY consumer subscriptions established")
+
+    def handle_raw_tick(self, data: Tuple) -> None:
+        """Handle raw tick data with efficient processing"""
+        try:
+            # Parse data
+            symbol, price, size, ts_event, open_price, candle_open_time = data
+
+            # Add to processing queue
+            self.processing_queue.append({
+                'type': 'raw_tick',
+                'symbol': symbol,
+                'price': price,
+                'size': size,
+                'timestamp': ts_event,
+                'open_price': open_price,
+                'candle_open_time': candle_open_time
+            })
+
+            # Batch processing for efficiency
+            if len(self.processing_queue) >= self.batch_size:
+                self._process_batch()
+
+        except Exception as e:
+            self.log.error(f"Raw tick processing error: {e}")
+
+    def handle_regime_update(self, data: Tuple) -> None:
+        """Handle DOLPHIN regime updates"""
+        try:
+            timestamp, regime, bull_ratio, bear_ratio, sideways_ratio, analyzed, total, confidence = data
+
+            # Process regime change
+            self._on_regime_change(regime, bull_ratio, bear_ratio, confidence)
+
+        except Exception as e:
+            self.log.error(f"Regime update processing error: {e}")
+
+    def _process_batch(self) -> None:
+        """Process queued data in batches for efficiency"""
+        batch = list(self.processing_queue)
+        self.processing_queue.clear()
+
+        # Process batch with single operation
+        self._bulk_process_data(batch)
+        self.processed_count += len(batch)
+
+    def _bulk_process_data(self, batch: List[Dict]) -> None:
+        """Override in derived classes for bulk processing"""
+        pass
+
+    def _on_regime_change(self, regime: str, bull_ratio: float, bear_ratio: float, confidence: float) -> None:
+        """Override in derived classes for regime-based logic"""
+        pass
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+TradingNode Integration and Actor Coordination
+----------------------------------------------
+
+### TradingNode Initialization Sequence
+
+1. 
**Node Creation:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   trading_config = TradingNodeConfig(
+       trader_id=TraderId("SILOQY-PRODUCTION"),
+       actors=[
+           # All SILOQY actor configurations
+           symbol_discovery_config,
+           main_actor_config,
+           regime_actor_config,
+           normalizer_config,
+           custom_component_config  # New components added here
+       ]
+   )
+   node = TradingNode(config=trading_config)
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+2. **Build Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   node.build()  # Creates all actors, establishes MessageBus
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+3. **Startup Phase:**
+
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+   node.run()  # Calls on_start() on all actors in dependency order
+   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Actor Dependency Management
+
+**Initialization Order (Critical):**
+
+1. `SILOQYSymbolDiscoveryActor` - Must complete first
+2. `SILOQYMainActor` - Depends on symbol discovery
+3. `DOLPHINRegimeActor` - Depends on main actor tick data
+4. `SILOQYNormalizerActor` - Depends on raw tick data
+5. **Custom Components** - Typically depend on normalized data or regime results
+
+### Async Task Management with ActorExecutor
+
+**ActorExecutor Usage Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def on_start(self) -> None:
+    # Start long-running operations via ActorExecutor
+    if hasattr(self, '_executor') and self._executor:
+        self.log.info("Using Nautilus ActorExecutor for task management")
+        future = self._executor.submit(self.long_running_operation())
+    else:
+        # Fallback for development/testing
+        asyncio.create_task(self.long_running_operation())
+
+async def long_running_operation(self):
+    """Long-running async operation managed by Nautilus"""
+    while self.is_running:
+        try:
+            # Perform periodic operations
+            await self.process_periodic_task()
+            await asyncio.sleep(1.0)
+        except asyncio.CancelledError:
+            break
+        except Exception as e:
+            self.log.error(f"Long-running operation error: {e}")
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Actor State Synchronization
+
+**Synchronization Pattern for Actor Dependencies:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class DependentSILOQYActor(Actor):
+    def __init__(self, config):
+        super().__init__(config)
+        self.dependency_ready = asyncio.Event()
+        self.symbols = []
+        self.candle_data = {}
+
+    def on_start(self) -> None:
+        # Subscribe to dependency notifications
+        self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_ready)
+        self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_ready)
+
+        # Start dependent operations
+        if hasattr(self, '_executor') and self._executor:
+            future = self._executor.submit(self.wait_for_dependencies())
+
+    async def wait_for_dependencies(self):
+        """Wait for required data before starting operations"""
+        await asyncio.wait_for(self.dependency_ready.wait(), timeout=120.0)
+        # Now safe to start component-specific operations
+        await self.start_component_operations()
+
+    def handle_symbols_ready(self, data):
+        symbols, timestamp = data
+        self.symbols = symbols
+        self._check_dependencies_complete()
+
+    def handle_candles_ready(self, data):
+        candles, timestamp = data
+        self.candle_data = 
candles + self._check_dependencies_complete() + + def _check_dependencies_complete(self): + if self.symbols and self.candle_data: + self.dependency_ready.set() +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Component Integration Patterns +------------------------------ + +### Adding New SILOQY Components + +**Step 1: Component Configuration** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class MySILOQYComponentConfig(ActorConfig): + """Configuration for custom SILOQY component""" + component_name: str = "MyComponent" + update_interval_ms: int = 1000 + enable_hd_processing: bool = True + custom_parameters: Dict[str, Any] = field(default_factory=dict) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Step 2: Actor Implementation** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class MySILOQYComponent(Actor): + def __init__(self, config: MySILOQYComponentConfig): + super().__init__(config) + self.component_name = config.component_name + self.update_interval_ms = config.update_interval_ms + self.enable_hd_processing = config.enable_hd_processing + + def on_start(self) -> None: + # Subscribe to required SILOQY data streams + self.msgbus.subscribe(STRUCTURED_TOPIC, self.handle_structured_tick) + self.msgbus.subscribe(REGIME_TOPIC, self.handle_regime_update) + + # Start component-specific operations + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(self.start_processing()) + + def handle_structured_tick(self, tick: SiloqyTradeTick) -> None: + """Process incoming SILOQY tick data""" + # Component-specific processing + result = self.process_siloqy_tick(tick) + + # Publish results to custom topic + if result: + self.msgbus.publish("SILOQY.MYCOMPONENT.RESULTS", result) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +**Step 3: Registration with TradingNode** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +my_component_config = ImportableActorConfig( + actor_path="my_module:MySILOQYComponent", + config_path="my_module:MySILOQYComponentConfig", + config={ + "component_id": "MY-SILOQY-COMPONENT", + "component_name": "CustomAnalyzer", + "update_interval_ms": 500, + "enable_hd_processing": True + } +) + +# Add to existing TradingNode configuration +trading_config = TradingNodeConfig( + trader_id=TraderId("SILOQY-EXTENDED"), + actors=[ + symbol_discovery_config, + main_actor_config, + regime_actor_config, + normalizer_config, + my_component_config # Add custom component here + ] +) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Inter-Component Communication Patterns + +**Direct Communication via MessageBus:** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Component A publishes analysis results +def publish_analysis_result(self, analysis_data: Dict): + custom_data = { + 'component_id': self.id, + 'analysis_type': 'correlation_matrix', + 'timestamp': int(time.time() * 1000), + 'results': analysis_data, + 'confidence': 0.95 + } + self.msgbus.publish("SILOQY.CORRELATION.ANALYSIS", custom_data) + +# Component B consumes analysis results +def on_start(self): + self.msgbus.subscribe("SILOQY.CORRELATION.ANALYSIS", self.handle_correlation_data) + +def handle_correlation_data(self, data: Dict): + # Process correlation analysis from other component + pass 
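+
+# Hypothetical extension of Component B: the same handler can fan in several
+# component streams by subscribing to each topic explicitly (the topic names
+# below are illustrative, not part of the shipped system):
+def subscribe_all_analysis_streams(self):
+    for topic in ("SILOQY.CORRELATION.ANALYSIS",
+                  "SILOQY.SENTIMENT.ANALYSIS",
+                  "SILOQY.RISK.METRICS"):
+        self.msgbus.subscribe(topic, self.handle_correlation_data)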
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+**Request/Response Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+# Component A requests data
+def request_analysis(self, symbol: str, timeframe: str):
+    request_data = {
+        'request_id': str(uuid.uuid4()),
+        'requesting_component': str(self.id),
+        'symbol': symbol,
+        'timeframe': timeframe,
+        'response_topic': f"SILOQY.RESPONSE.{self.id}"
+    }
+    self.msgbus.publish("SILOQY.REQUEST.ANALYSIS", request_data)
+
+# Component B responds to requests
+def handle_analysis_request(self, request_data: Dict):
+    # Process request
+    analysis_result = self.perform_analysis(request_data['symbol'], request_data['timeframe'])
+
+    # Send response
+    response = {
+        'request_id': request_data['request_id'],
+        'result': analysis_result,
+        'timestamp': int(time.time() * 1000)
+    }
+    self.msgbus.publish(request_data['response_topic'], response)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Error Handling and Recovery
+---------------------------
+
+### Message Processing Error Handling
+
+**Standard Error Handling Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def handle_siloqy_message(self, data: Any) -> None:
+    """Standard SILOQY message handling with error recovery"""
+    try:
+        # Validate message structure
+        if not self._validate_message_structure(data):
+            self.log.warning("Invalid message structure rejected")
+            self._record_validation_failure("structure")
+            return
+
+        # Process message
+        result = self._process_message_content(data)
+
+        # Publish result if successful
+        if result:
+            self.msgbus.publish(self.output_topic, result)
+
+    except ValidationError as e:
+        self.log.warning(f"Message validation failed: {e}")
+        self._record_validation_failure("validation")
+    except ProcessingError as e:
+        self.log.error(f"Message processing failed: {e}")
+        self._record_processing_failure()
+    except Exception as e:
+        self.log.error(f"Unexpected error in message handling: {e}")
+        self._record_unexpected_failure()
+
+def _record_validation_failure(self, failure_type: str) -> None:
+    """Record validation failures for monitoring"""
+    if not hasattr(self, 'validation_failures'):
+        self.validation_failures = defaultdict(int)
+    self.validation_failures[failure_type] += 1
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Actor Recovery Patterns
+
+**Graceful Degradation Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class ResilientSILOQYActor(Actor):
+    def __init__(self, config):
+        super().__init__(config)
+        self.error_count = 0
+        self.max_errors = 100
+        self.recovery_mode = False
+        self.last_error_decay = time.time()
+
+    def handle_processing_error(self, error: Exception) -> None:
+        """Handle processing errors with graceful degradation"""
+        self.error_count += 1
+
+        if self.error_count > self.max_errors:
+            self.log.error("Error threshold exceeded, entering recovery mode")
+            self.recovery_mode = True
+            # Reduce processing frequency or disable non-critical features
+
+        # Decay the error count every 5 minutes of elapsed time
+        now = time.time()
+        if now - self.last_error_decay >= 300:
+            self.last_error_decay = now
+            self.error_count = max(0, self.error_count - 10)
+            if self.error_count < 10:
+                self.recovery_mode = False
+                self.log.info("Exiting recovery mode")
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Performance Optimization Guidelines
+-----------------------------------
+
+### 
Zero-Allocation Patterns (DOLPHIN Style) + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class HighPerformanceSILOQYActor(Actor): + """High-performance actor using zero-allocation patterns""" + + def __init__(self, config): + super().__init__(config) + + # Pre-allocate arrays for high-frequency data + self.max_symbols = config.max_symbols + self.price_data = np.zeros(self.max_symbols, dtype=np.float64) + self.volume_data = np.zeros(self.max_symbols, dtype=np.float64) + self.timestamp_data = np.zeros(self.max_symbols, dtype=np.int64) + + # Symbol mapping for O(1) lookups + self.symbol_to_index = {} + self.index_to_symbol = {} + self.active_symbols = 0 + + def process_tick_zero_allocation(self, symbol: str, price: float, volume: float, timestamp: int): + """Process tick with zero memory allocation""" + # Get or assign symbol index + if symbol not in self.symbol_to_index: + if self.active_symbols >= self.max_symbols: + self.log.error(f"Symbol capacity exceeded: {self.max_symbols}") + return + + idx = self.active_symbols + self.symbol_to_index[symbol] = idx + self.index_to_symbol[idx] = symbol + self.active_symbols += 1 + else: + idx = self.symbol_to_index[symbol] + + # Update arrays in-place (zero allocation) + self.price_data[idx] = price + self.volume_data[idx] = volume + self.timestamp_data[idx] = timestamp +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Message Bus Performance Optimization + +**Efficient Data Serialization:** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Prefer tuples over complex objects for high-frequency data +efficient_message = (symbol, price, volume, timestamp) # Fast +inefficient_message = {'symbol': symbol, 'price': price, 'volume': volume} # Slower + +# Use pre-allocated buffers for repeated operations +class BufferedSILOQYActor(Actor): + def __init__(self, config): + super().__init__(config) + self.message_buffer = [None] * 1000 # Pre-allocated buffer + self.buffer_index = 0 + + def publish_efficiently(self, data): + # Reuse buffer slots to minimize garbage collection + self.message_buffer[self.buffer_index] = data + self.msgbus.publish(self.topic, self.message_buffer[self.buffer_index]) + self.buffer_index = (self.buffer_index + 1) % 1000 +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Testing and Validation Requirements +----------------------------------- + +### Unit Test Requirements for SILOQY Components + +**Mandatory Test Coverage:** + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +class TestMySILOQYComponent(unittest.TestCase): + """Required test patterns for all SILOQY components""" + + def test_message_bus_integration(self): + """Test MessageBus subscription and publishing""" + # Verify component can subscribe to required topics + # Verify component can publish to output topics + # Test message handling with valid and invalid data + pass + + def test_actor_lifecycle(self): + """Test complete actor lifecycle""" + # Test initialization with TradingNode + # Test on_start() behavior + # Test on_stop() behavior + # Test resource cleanup + pass + + def test_data_validation(self): + """Test input data validation""" + # Test with valid SILOQY data structures + # Test with malformed data + # Test boundary conditions + # Test error recovery + pass + + def test_performance_requirements(self): + """Test performance under load""" + # Test high-frequency message processing + # 
Test memory usage stability
+        # Test latency requirements
+        pass
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Integration Test Requirements
+
+**MessageBus Integration Test Template:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def test_full_integration_with_dolphin(self):
+    """Test integration with existing DOLPHIN system"""
+    # Create TradingNode with DOLPHIN + custom component
+    node = self._create_test_node_with_custom_component()
+
+    # Verify component receives DOLPHIN data
+    collector = TestDataCollector()
+    node.kernel.msgbus.subscribe("SILOQY.CUSTOM.OUTPUT", collector.collect_data)
+
+    # Inject test data through DOLPHIN pipeline
+    test_ticks = self._generate_test_tick_data()
+    for tick in test_ticks:
+        node.kernel.msgbus.publish(RAW_TOPIC, tick)
+
+    # Verify custom component processed DOLPHIN regime data
+    time.sleep(2.0)
+    self.assertGreater(len(collector.collected_data), 0)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Code Examples and Templates
+---------------------------
+
+### Complete Component Template
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+#!/usr/bin/env python3
+"""
+SILOQY Custom Component Template
+Template for integrating new components with SILOQY DOLPHIN system
+"""
+
+import time
+import asyncio
+import numpy as np
+from typing import Dict, List, Optional, Tuple, Any
+from collections import deque, defaultdict
+from dataclasses import dataclass, field
+
+from nautilus_trader.common.actor import Actor
+from nautilus_trader.common.config import ActorConfig
+from nautilus_trader.common.component import Logger
+
+# SILOQY imports
+from nautilus_actor_test_implementation_5x_modified import (
+    RAW_TOPIC, STRUCTURED_TOPIC, REGIME_TOPIC,
+    SiloqyTradeTick, MarketRegime
+)
+
+@dataclass
+class MySILOQYComponentConfig(ActorConfig):
+    """Configuration for custom SILOQY component"""
+    component_name: str = "CustomComponent"
+    processing_interval_ms: int = 1000
+    enable_regime_tracking: bool = True
+    max_history_size: int = 10000
+    custom_parameters: Dict[str, Any] = field(default_factory=dict)
+
+class MySILOQYComponent(Actor):
+    """Template for custom SILOQY components"""
+
+    def __init__(self, config: MySILOQYComponentConfig):
+        super().__init__(config)
+
+        # Component configuration
+        self.component_name = config.component_name
+        self.processing_interval_ms = config.processing_interval_ms
+        self.enable_regime_tracking = config.enable_regime_tracking
+        self.max_history_size = config.max_history_size
+
+        # High-performance data structures
+        self.tick_history = deque(maxlen=self.max_history_size)
+        self.regime_history = deque(maxlen=100)
+        self.current_regime = MarketRegime.SIDEWAYS
+
+        # Performance metrics
+        self.processed_ticks = 0
+        self.processed_regimes = 0
+        self.last_performance_log = time.time_ns()
+
+        # State management
+        self.is_processing = False
+        self.dependency_ready = asyncio.Event()
+
+        self.log.info(f"Custom SILOQY component '{self.component_name}' initialized")
+
+    def on_start(self) -> None:
+        """Start component and subscribe to SILOQY data streams"""
+        self.log.info(f"Starting SILOQY component: {self.component_name}")
+
+        if hasattr(self, 'msgbus') and self.msgbus:
+            # Subscribe to core SILOQY data streams
+            self.msgbus.subscribe(STRUCTURED_TOPIC, self.handle_structured_tick)
+
+            if self.enable_regime_tracking:
+                self.msgbus.subscribe(REGIME_TOPIC, self.handle_regime_update)
+
+            # Subscribe to 
custom topics if needed + self.msgbus.subscribe("SILOQY.CUSTOM.INPUT", self.handle_custom_input) + + self.log.info("SILOQY MessageBus subscriptions established") + + # Start async processing + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(self.start_async_processing()) + else: + asyncio.create_task(self.start_async_processing()) + + def on_stop(self) -> None: + """Stop component processing""" + self.log.info(f"Stopping SILOQY component: {self.component_name}") + self.is_processing = False + # Nautilus ActorExecutor handles task cleanup + + async def start_async_processing(self): + """Start component-specific async processing""" + self.is_processing = True + self.log.info("Custom SILOQY component async processing started") + + while self.is_processing: + try: + # Perform periodic processing + await self.perform_periodic_analysis() + await asyncio.sleep(self.processing_interval_ms / 1000.0) + + except asyncio.CancelledError: + break + except Exception as e: + self.log.error(f"Periodic processing error: {e}") + await asyncio.sleep(1.0) + + def handle_structured_tick(self, tick: SiloqyTradeTick) -> None: + """Handle incoming structured tick data""" + try: + # Add to history + self.tick_history.append({ + 'symbol': str(tick.instrument_id), + 'price': float(tick.price), + 'size': float(tick.size), + 'timestamp': tick.ts_event, + 'open_price': tick.open_price, + 'exchange': tick.exchange + }) + + self.processed_ticks += 1 + + # Process tick for component-specific logic + result = self.process_tick_data(tick) + + # Publish results if any + if result: + self.msgbus.publish(f"SILOQY.{self.component_name.upper()}.RESULTS", result) + + # Performance logging + now = time.time_ns() + if now - self.last_performance_log > 10_000_000_000: # Every 10 seconds + self.log.info(f"Component {self.component_name}: processed {self.processed_ticks} ticks") + self.last_performance_log = now + + except Exception as e: + self.log.error(f"Structured tick processing error: {e}") + + def handle_regime_update(self, data: Tuple) -> None: + """Handle DOLPHIN regime updates""" + try: + timestamp, regime, bull_ratio, bear_ratio, sideways_ratio, analyzed, total, confidence = data + + # Update current regime + old_regime = self.current_regime + self.current_regime = MarketRegime(regime) + + # Add to history + self.regime_history.append({ + 'regime': regime, + 'bull_ratio': bull_ratio, + 'bear_ratio': bear_ratio, + 'confidence': confidence, + 'timestamp': timestamp + }) + + self.processed_regimes += 1 + + # React to regime changes + if old_regime != self.current_regime: + self.on_regime_change(old_regime, self.current_regime, confidence) + + except Exception as e: + self.log.error(f"Regime update processing error: {e}") + + def handle_custom_input(self, data: Any) -> None: + """Handle custom input data""" + # Override in derived classes + pass + + def process_tick_data(self, tick: SiloqyTradeTick) -> Optional[Dict]: + """Override in derived classes for tick processing""" + # Implement component-specific tick processing + return None + + async def perform_periodic_analysis(self) -> None: + """Override in derived classes for periodic analysis""" + # Implement component-specific periodic operations + pass + + def on_regime_change(self, old_regime: MarketRegime, new_regime: MarketRegime, confidence: float) -> None: + """Override in derived classes for regime change handling""" + self.log.info(f"Regime change detected: {old_regime.value} → {new_regime.value} (confidence: {confidence:.1%})") 
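+
+# Hypothetical usage sketch: a concrete component only overrides the hook
+# methods; subscriptions, history, and regime tracking come from the template.
+# The threshold and class name below are illustrative assumptions.
+class VolumeSpikeComponent(MySILOQYComponent):
+    """Illustrative subclass that flags unusually large trades."""
+
+    def process_tick_data(self, tick: SiloqyTradeTick) -> Optional[Dict]:
+        # Flag any single trade larger than an assumed 100-unit threshold;
+        # the returned dict is published automatically by handle_structured_tick
+        if float(tick.size) > 100.0:
+            return {
+                'symbol': str(tick.instrument_id),
+                'size': float(tick.size),
+                'timestamp': tick.ts_event,
+                'alert': 'VOLUME_SPIKE'
+            }
+        return None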
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Component Registration Template + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +def create_siloqy_trading_node_with_custom_components(): + """Template for creating TradingNode with custom SILOQY components""" + + # Core SILOQY components (required) + symbol_discovery_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:SILOQYSymbolDiscoveryActor", + config_path="nautilus_actor_test_implementation_5x_modified:SILOQYSymbolDiscoveryConfig", + config={ + "component_id": "SILOQY-SYMBOL-DISCOVERY", + "symbols": [], + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": True, # Safe for development + "throttle_rate_limit_seconds": 10.0, + "max_symbols_throttled": 100 + } + ) + + main_actor_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:SILOQYMainActor", + config_path="nautilus_actor_test_implementation_5x_modified:SILOQYMainActorConfig", + config={ + "component_id": "SILOQY-MAIN-ACTOR", + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": True, + "enable_real_data": False # Set to True for production + } + ) + + regime_actor_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:DOLPHINRegimeActor", + config_path="nautilus_actor_test_implementation_5x_modified:DOLPHINRegimeActorConfig", + config={ + "component_id": "DOLPHIN-REGIME-ACTOR", + "max_symbols": 5000, + "ticks_per_analysis": 1000 + } + ) + + normalizer_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:SILOQYNormalizerActor", + config_path="nautilus_actor_test_implementation_5x_modified:SILOQYNormalizerConfig", + config={ + "component_id": "SILOQY-NORMALIZER" + } + ) + + # Custom SILOQY components (add new components here) + custom_component_config = ImportableActorConfig( + actor_path="my_custom_module:MySILOQYComponent", + config_path="my_custom_module:MySILOQYComponentConfig", + config={ + "component_id": "MY-CUSTOM-SILOQY-COMPONENT", + "component_name": "CustomAnalyzer", + "processing_interval_ms": 1000, + "enable_regime_tracking": True, + "max_history_size": 10000 + } + ) + + # Complete TradingNode configuration + trading_config = TradingNodeConfig( + trader_id=TraderId("SILOQY-EXTENDED-SYSTEM"), + actors=[ + symbol_discovery_config, + main_actor_config, + regime_actor_config, + normalizer_config, + custom_component_config # Add all custom components here + ], + data_clients={}, # External data clients if needed + exec_clients={} # Execution clients if needed + ) + + # Create and return node + node = TradingNode(config=trading_config) + return node +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Advanced Integration Patterns +----------------------------- + +### Custom Data Type Integration + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +from nautilus_trader.core.data import Data + +class SILOQYCustomData(Data): + """Custom SILOQY data type for Nautilus integration""" + + def __init__(self, component_id: str, analysis_data: Dict, timestamp_ns: int): + self.component_id = component_id + self.analysis_data = analysis_data + super().__init__(ts_event=timestamp_ns, ts_init=timestamp_ns) + + def __repr__(self) -> str: + return f"SILOQYCustomData(component={self.component_id}, data_keys={list(self.analysis_data.keys())})" + +# Usage in component +def 
publish_custom_analysis(self, analysis_result: Dict):
+    custom_data = SILOQYCustomData(
+        component_id=str(self.id),
+        analysis_data=analysis_result,
+        timestamp_ns=int(time.time_ns())
+    )
+
+    # Publish as Nautilus Data object
+    if hasattr(self, 'publish_data'):
+        self.publish_data(custom_data)
+    else:
+        # Fallback to message bus
+        self.msgbus.publish("SILOQY.CUSTOM.ANALYSIS", custom_data)
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+### Component Coordination Patterns
+
+**Master/Worker Pattern:**
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class SILOQYMasterComponent(Actor):
+    """Master component that coordinates worker components"""
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.worker_topics = [
+            "SILOQY.WORKER.CORRELATION",
+            "SILOQY.WORKER.SENTIMENT",
+            "SILOQY.WORKER.MOMENTUM"
+        ]
+        self.worker_results = {}
+
+    def distribute_work(self, work_data: Dict):
+        """Distribute work to worker components"""
+        for topic in self.worker_topics:
+            work_package = {
+                'master_id': str(self.id),
+                'work_data': work_data,
+                'response_topic': f"SILOQY.MASTER.{self.id}.RESULTS"
+            }
+            self.msgbus.publish(topic, work_package)
+
+    def on_start(self):
+        # Subscribe to worker responses
+        self.msgbus.subscribe(f"SILOQY.MASTER.{self.id}.RESULTS", self.handle_worker_result)
+        self.msgbus.subscribe(REGIME_TOPIC, self.handle_regime_change)
+
+class SILOQYWorkerComponent(Actor):
+    """Worker component that processes distributed tasks"""
+
+    def on_start(self):
+        self.msgbus.subscribe("SILOQY.WORKER.CORRELATION", self.handle_work_assignment)
+
+    def handle_work_assignment(self, work_package: Dict):
+        """Process assigned work and return results"""
+        try:
+            result = self.process_work(work_package['work_data'])
+
+            response = {
+                'worker_id': str(self.id),
+                'result': result,
+                'timestamp': int(time.time() * 1000)
+            }
+
+            self.msgbus.publish(work_package['response_topic'], response)
+
+        except Exception as e:
+            self.log.error(f"Work processing failed: {e}")
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Production Deployment Checklist
+-------------------------------
+
+### Pre-Deployment Requirements
+
+**Configuration Validation:**
+
+- [ ] `throttle_mode: False` for production performance
+- [ ] `enable_real_data: True` for live market data
+- [ ] `max_symbols_throttled: 5000+` for full market coverage
+- [ ] `ticks_per_analysis: 1000` for stable regime detection
+
+**Component Integration Validation:**
+
+- [ ] All custom components pass integration tests
+- [ ] MessageBus topic subscriptions verified
+- [ ] Actor dependency order confirmed
+- [ ] Error handling and recovery tested
+- [ ] Performance requirements met
+
+**System Resource Validation:**
+
+- [ ] Memory allocation sufficient for all actors
+- [ ] CPU capacity adequate for real-time processing
+- [ ] Network bandwidth adequate for WebSocket streams
+- [ ] Storage capacity planned (if using persistent storage)
+
+### Production Configuration Template
+
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+def create_production_siloqy_node():
+    """Production-ready SILOQY TradingNode configuration"""
+
+    # Production symbol discovery
+    symbol_discovery_config = ImportableActorConfig(
+        actor_path="nautilus_actor_test_implementation_5x_modified:SILOQYSymbolDiscoveryActor",
+        config_path="nautilus_actor_test_implementation_5x_modified:SILOQYSymbolDiscoveryConfig",
+        config={
+            "component_id": 
"SILOQY-SYMBOL-DISCOVERY-PROD", + "symbols": [], # Dynamic discovery + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": False, # PRODUCTION: Full performance + "throttle_rate_limit_seconds": 2.5, # Standard rate limit + "max_symbols_throttled": 5000 # Full symbol coverage + } + ) + + # Production main actor with real data + main_actor_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:SILOQYMainActor", + config_path="nautilus_actor_test_implementation_5x_modified:SILOQYMainActorConfig", + config={ + "component_id": "SILOQY-MAIN-ACTOR-PROD", + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": False, # PRODUCTION: Full performance + "enable_real_data": True # PRODUCTION: Real WebSocket data + } + ) + + # Production DOLPHIN regime detection + regime_actor_config = ImportableActorConfig( + actor_path="nautilus_actor_test_implementation_5x_modified:DOLPHINRegimeActor", + config_path="nautilus_actor_test_implementation_5x_modified:DOLPHINRegimeActorConfig", + config={ + "component_id": "DOLPHIN-REGIME-ACTOR-PROD", + "max_symbols": 5000, # Full capacity + "ticks_per_analysis": 1000 # Stable regime detection + } + ) + + return TradingNodeConfig( + trader_id=TraderId("SILOQY-PRODUCTION"), + actors=[symbol_discovery_config, main_actor_config, regime_actor_config, normalizer_config], + data_clients={}, + exec_clients={} + ) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Appendix: Message Bus Topic Reference +------------------------------------- + +### Complete Topic Hierarchy + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +SILOQY Root Topics: +├── SILOQY.RAW.TICKS # Raw market tick data (tuple format) +├── SILOQY.STRUCTURED.TICKS # Normalized SiloqyTradeTick objects +├── SILOQY.SYMBOLS.DISCOVERED # Symbol discovery completion +├── SILOQY.CANDLES.INITIAL # Candle reconstruction results +│ +DOLPHIN Topics: +├── DOLPHIN.REGIME.RESULTS # Market regime detection results +│ +Custom Component Topics (Examples): +├── SILOQY.HD.VECTORS # Hyperdimensional vector data +├── SILOQY.TEMPORAL.PATTERNS # Temporal memory patterns +├── SILOQY.CORRELATION.MATRIX # Symbol correlation analysis +├── SILOQY.SENTIMENT.ANALYSIS # Market sentiment data +├── SILOQY.RISK.METRICS # Risk calculation results +├── SILOQY.ML.PREDICTIONS # Machine learning predictions +├── SILOQY.ANALYSIS.REQUESTS # Analysis request coordination +├── SILOQY.ANALYSIS.RESPONSES # Analysis response delivery +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +### Performance Characteristics by Topic + +Topic + +Frequency + +Latency Requirement + +Data Size + +Thread Safety + +`RAW_TOPIC` + +1000+ msg/sec + +\< 5μs + +200 bytes + +Full + +`STRUCTURED_TOPIC` + +1000+ msg/sec + +\< 10μs + +500 bytes + +Full + +`REGIME_TOPIC` + +1-10 msg/sec + +\< 100μs + +300 bytes + +Full + +Custom Topics + +Variable + +\< 1ms + +Variable + +Full + +This document provides the complete technical foundation for integrating new +SILOQY components with the DOLPHIN market regime detection system within the +Nautilus framework. All patterns, data structures, and lifecycle management +approaches are based on the proven implementation that has processed 289+ +million ticks in production operation. 
diff --git a/nautilus_actor_test_implementation_5x.py b/nautilus_actor_test_implementation_5x.py index 6a64517..9388297 100644 --- a/nautilus_actor_test_implementation_5x.py +++ b/nautilus_actor_test_implementation_5x.py @@ -1,13 +1,16 @@ import time +import sys import numpy as np import asyncio import json import math +import httpx +import websockets from typing import Dict, List, Optional, Tuple, Any from enum import Enum from collections import deque -import httpx import random # Added for _simulate_websocket_ticks +from dataclasses import dataclass, field # Nautilus imports - following test pattern from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig @@ -34,6 +37,12 @@ STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS" REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS" SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED" CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL" +# ADDED LINE 18: +TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES" +# NEW: Enhanced indicator topics for data bus publishing +REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS" +BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS" +TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS" # Rate limiting constant MIN_INTERVAL = 2.5 # seconds between API batches @@ -44,6 +53,239 @@ class MarketRegime(Enum): BEAR = "BEAR" TRANSITION = "TRANSITION" SIDEWAYS = "SIDEWAYS" + +# ============================================================================ +# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE +# ============================================================================ + +@dataclass +class TickData: + """Universal tick data structure from standalone""" + symbol: str + price: float + quantity: float + timestamp: int + is_buyer_maker: bool + trade_id: int + exchange: str + raw_data: Dict = None + + def __post_init__(self): + if self.raw_data is None: + self.raw_data = {} + +@dataclass +class ExchangeConfig: + """Exchange-specific configuration from standalone""" + name: str + websocket_limit_per_second: int + max_streams_per_connection: int + max_connections_per_window: int + connection_window_seconds: int + base_websocket_url: str + requires_auth: bool = False + ping_interval: float = 20.0 + ping_timeout: float = 10.0 + +class BinanceAdapter: + """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter""" + + def __init__(self): + self.config = ExchangeConfig( + name="binance", + websocket_limit_per_second=10, + max_streams_per_connection=1024, + max_connections_per_window=300, + connection_window_seconds=300, # 5 minutes + base_websocket_url="wss://stream.binance.com:9443", + requires_auth=False + ) + self.logger = Logger(f"BINANCE_ADAPTER") + + def get_websocket_url(self, symbols: List[str]) -> str: + """Build Binance WebSocket URL with combined streams""" + if len(symbols) == 1: + symbol = symbols[0].lower() + return f"{self.config.base_websocket_url}/ws/{symbol}@trade" + else: + # Combined stream approach + streams = [f"{symbol.lower()}@trade" for symbol in symbols] + stream_string = "/".join(streams) + return f"{self.config.base_websocket_url}/stream?streams={stream_string}" + + def parse_message(self, message: str) -> Optional[TickData]: + """EXTRACTED FROM STANDALONE: Parse Binance trade message""" + try: + data = json.loads(message) + + # Handle combined stream format + if 'stream' in data and 'data' in data: + stream_name = data['stream'] + trade_data = data['data'] + else: + # Single stream format + trade_data = data + stream_name = trade_data.get('e', '') + + # Only process trade events + if not 
(stream_name.endswith('@trade') or trade_data.get('e') == 'trade'): + return None + + return TickData( + symbol=trade_data['s'], + price=float(trade_data['p']), + quantity=float(trade_data['q']), + timestamp=int(trade_data['T']), + is_buyer_maker=trade_data['m'], + trade_id=int(trade_data['t']), + exchange="binance", + raw_data=trade_data + ) + + except (json.JSONDecodeError, KeyError, ValueError) as e: + self.logger.error(f"Failed to parse Binance message: {e}") + return None + + def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]: + """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections""" + # Conservative: leave room for message bursts + safety_margin = 0.7 + symbols_per_connection = int( + min( + self.config.max_streams_per_connection * safety_margin, + self.config.websocket_limit_per_second * 5 # 5-second buffer + ) + ) + + # Group symbols into chunks + symbol_groups = [] + for i in range(0, len(symbols), symbols_per_connection): + group = symbols[i:i + symbols_per_connection] + symbol_groups.append(group) + + return symbol_groups + +class SiloqyWebSocketConnection: + """EXTRACTED FROM STANDALONE: WebSocket connection adapted for Nautilus msgbus""" + + def __init__(self, connection_id: str, symbols: List[str], + binance_adapter: BinanceAdapter, main_actor_ref): + self.connection_id = connection_id + self.symbols = symbols + self.adapter = binance_adapter + self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access + + self.websocket = None + self.is_running = False + self.retry_count = 0 + self.messages_received = 0 + self.last_message_time = 0.0 + self.connected_at = None + + self.logger = Logger(f"WS_{connection_id}") + self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols") + + async def start(self) -> None: + """Start the WebSocket connection""" + self.is_running = True + + while self.is_running: + try: + await self._connect_and_run() + except Exception as e: + self.logger.error(f"Connection {self.connection_id} error: {e}") + + if self.is_running: + await self._handle_reconnection() + else: + break + + async def _connect_and_run(self) -> None: + """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop""" + url = self.adapter.get_websocket_url(self.symbols) + + self.logger.info(f"Connecting to Binance: {url[:100]}...") + + async with websockets.connect( + url, + ping_interval=self.adapter.config.ping_interval, + ping_timeout=self.adapter.config.ping_timeout, + close_timeout=10 + ) as websocket: + self.websocket = websocket + self.connected_at = time.time() + self.retry_count = 0 + + self.logger.info(f"Connection {self.connection_id} established") + + # Message processing loop + async for message in websocket: + if not self.is_running: + break + + try: + await self._process_message(message) + except Exception as e: + self.logger.error(f"Message processing error: {e}") + + async def _process_message(self, message: str) -> None: + """Process incoming WebSocket message and convert to Nautilus format""" + self.messages_received += 1 + self.last_message_time = time.time() + + # Parse message using Binance adapter + tick_data = self.adapter.parse_message(message) + if tick_data: + # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus + try: + # Create SiloqyTradeTick object + siloqy_tick = SiloqyTradeTick( + instrument_id=tick_data.symbol, + price=round(tick_data.price, 8), + size=round(tick_data.quantity, 8), + ts_event=tick_data.timestamp * 1000000, 
# Convert ms to ns + ts_init=int(time.time_ns()), + open_price=None, # Will be set by candle logic + candle_open_time=None, # Will be set by candle logic + tick_side=None, + exchange="BINANCE", + liquidity_flag=None + ) + + # Get candle information from main actor + if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles: + symbol = tick_data.symbol + if symbol in self.main_actor.active_candles: + candle = self.main_actor.active_candles[symbol] + siloqy_tick.open_price = candle.get('open_price', tick_data.price) + siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000)) + + # Process through main actor's tick handling (preserves all candle logic) + self.main_actor.on_websocket_tick( + tick_data.symbol, + tick_data.price, + tick_data.quantity, + tick_data.timestamp + ) + + except Exception as e: + self.logger.error(f"Failed to process tick: {e}") + + async def _handle_reconnection(self) -> None: + """Handle reconnection with exponential backoff""" + self.retry_count += 1 + delay = min(2 ** self.retry_count, 60) # Max 60 seconds + + self.logger.warning(f"Reconnecting in {delay}s (attempt {self.retry_count})") + await asyncio.sleep(delay) + + async def stop(self) -> None: + """Stop the connection""" + self.logger.info(f"Stopping connection {self.connection_id}") + self.is_running = False + + if self.websocket: + await self.websocket.close() # -------------------------------------------------------------------- # SILOQY Custom Tick - PRESERVED with fixes @@ -95,10 +337,11 @@ class SILOQYSymbolDiscoveryConfig(ActorConfig): class SILOQYMainActorConfig(ActorConfig): candle_interval_ms: int = 15 * 60 * 1000 throttle_mode: bool = False + enable_real_data: bool = False # NEW: Enable real WebSocket data class DOLPHINRegimeActorConfig(ActorConfig): max_symbols: int = 5000 - ticks_per_analysis: int = 1000 + ticks_per_analysis: int = 10 class SILOQYNormalizerConfig(ActorConfig): pass @@ -115,6 +358,7 @@ class SILOQYSymbolDiscoveryActor(Actor): self.symbols = list(config.symbols) if config.symbols else [] self.candle_interval_ms = config.candle_interval_ms self.active_candles = {} + self.tick_sizes = {} # Process management configuration self.throttle_mode = config.throttle_mode @@ -174,18 +418,84 @@ class SILOQYSymbolDiscoveryActor(Actor): self.log.info("Starting dynamic symbol discovery from Binance...") url = "https://api.binance.com/api/v3/exchangeInfo" + async with httpx.AsyncClient() as client: + # RATE LIMIT CHECK - Before anything else + self.log.info("Checking Binance API rate limit status...") + time_check_url = "https://api.binance.com/api/v3/time" + + try: + rate_check_response = await client.get(time_check_url, timeout=5) + + if rate_check_response.status_code == 200: + # Parse rate limit headers + headers = rate_check_response.headers + used_weight = headers.get('X-MBX-USED-WEIGHT-1M', 'Unknown') + server_time = rate_check_response.json().get('serverTime', 'Unknown') + + self.log.info(f"Rate limit check passed - Used weight: {used_weight}/1200, Server time: {server_time}") + + # Check if we're close to rate limit + if used_weight != 'Unknown' and int(used_weight) > 1000: + self.log.warning(f"HIGH RATE LIMIT USAGE: {used_weight}/1200 - Proceeding with caution") + + elif rate_check_response.status_code == 429: + retry_after = rate_check_response.headers.get('Retry-After', '60') + self.log.error(f"RATE LIMITED: Must wait {retry_after} seconds before API calls") + raise Exception(f"Binance API rate limited - retry after {retry_after}s") + + elif 
rate_check_response.status_code == 418:
+                    self.log.error("IP BANNED: This IP address has been auto-banned by Binance")
+                    raise Exception("IP address banned by Binance - cannot proceed")
+                
+                else:
+                    self.log.warning(f"Rate limit check returned status {rate_check_response.status_code}")
+                    self.log.warning("Proceeding anyway, but may encounter issues")
+                    
+            except Exception as e:
+                if "rate limited" in str(e).lower() or "banned" in str(e).lower():
+                    raise  # Re-raise rate limit/ban errors
+                else:
+                    self.log.warning(f"Rate limit check failed: {e}")
+                    self.log.warning("Proceeding with symbol discovery anyway")
+        
         async with httpx.AsyncClient() as client:
             self.log.info("Fetching exchange info from Binance API...")
             response = await client.get(url, timeout=10)
             if response.status_code == 200:
                 self.log.info("Successfully received exchange info")
                 data = response.json()
-                # Get all trading symbols (USDT pairs for example)
-                full_symbols = [
-                    s['symbol'] for s in data['symbols']
-                    if s['status'] == 'TRADING' and s['symbol'].endswith('USDT')
-                ]
+                # Combined symbol discovery and tick size extraction
+                self.log.info("Processing symbols and extracting tick sizes...")
+                full_symbols = []
+                for symbol_info in data['symbols']:
+                    if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'):
+                        symbol = symbol_info['symbol']
+                        full_symbols.append(symbol)
+                        
+                        # Extract tick size while processing
+                        tick_size = None
+                        for filter_info in symbol_info['filters']:
+                            if filter_info['filterType'] == 'PRICE_FILTER':
+                                tick_size = float(filter_info['tickSize'])
+                                break
+                        
+                        # If no PRICE_FILTER found, try other filter types
+                        if tick_size is None:
+                            for filter_info in symbol_info['filters']:
+                                if filter_info['filterType'] == 'TICK_SIZE':
+                                    tick_size = float(filter_info['tickSize'])
+                                    break
+                        
+                        # Fallback to default if still not found
+                        if tick_size is None:
+                            tick_size = 1e-8  # Default fallback
+                            self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}")
+                        
+                        self.tick_sizes[symbol] = tick_size
+                
+                self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes")
+                
                 # Apply throttle mode symbol limiting
                 if self.throttle_mode:
                     self.symbols = full_symbols[:self.max_symbols_throttled]
@@ -200,6 +510,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
                 raise Exception(f"Failed to fetch exchange info: {response.status_code}")
     
     async def _fetch_stats_and_reconstruct_candles(self):
+        """PRESERVED: All original rate limiting with Nautilus async patterns"""
         url = "https://api.binance.com/api/v3/ticker/24hr"
         klines_url = "https://api.binance.com/api/v3/klines"
         ticker_url = "https://api.binance.com/api/v3/ticker/price"
@@ -315,7 +626,7 @@ class SILOQYSymbolDiscoveryActor(Actor):
                 except Exception as e:
                     self.log.error(f"Request failed: {str(e)}")
                 
-                # PRESERVED: Original rate limiting
+                # PRESERVED: Original rate limiting with Nautilus async
                 elapsed = time.time() - start_time
                 if i + 50 < len(self.symbols):
                     sleep_time = max(0, rate_limit_interval - elapsed)
@@ -337,8 +648,10 @@ class SILOQYSymbolDiscoveryActor(Actor):
                 # Publish symbols and candles as tuples
                 self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns())))
                 self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns())))
+                self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns())))
                 
                 self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles")
+                
self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes") self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing") else: self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing") @@ -353,6 +666,7 @@ class SILOQYMainActor(Actor): # Preserve original configuration self.candle_interval_ms = config.candle_interval_ms self.throttle_mode = config.throttle_mode + self.enable_real_data = config.enable_real_data # NEW: Real data capability self.connections = {} self.connection_tasks = {} @@ -360,6 +674,11 @@ class SILOQYMainActor(Actor): self.symbols = [] self.active_candles = {} + # WebSocket infrastructure - NEW + self.binance_adapter = BinanceAdapter() if self.enable_real_data else None + self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {} + self.websocket_tasks: Dict[str, asyncio.Task] = {} + # WebSocket tasks - managed by Nautilus ActorExecutor self.ws_tasks = [] @@ -369,6 +688,11 @@ class SILOQYMainActor(Actor): if self.throttle_mode: self.log.warning("THROTTLE MODE: Main actor will use reduced tick generation") + if self.enable_real_data: + self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams") + else: + self.log.info("SIMULATION MODE: Will generate simulated ticks") + self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") def on_start(self) -> None: @@ -393,6 +717,12 @@ class SILOQYMainActor(Actor): def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") + + # Stop WebSocket connections if running + if self.enable_real_data: + for conn in self.websocket_connections.values(): + asyncio.create_task(conn.stop()) + # Nautilus kernel handles executor shutdown - no manual cleanup needed async def on_start_async(self) -> None: @@ -401,8 +731,11 @@ class SILOQYMainActor(Actor): self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) - # PRESERVED: Start WebSocket connections - await self._start_websocket_connections() + # ENHANCED: Start real or simulated connections based on configuration + if self.enable_real_data: + await self._start_real_websocket_connections() + else: + await self._start_simulated_connections() self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") @@ -423,7 +756,6 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") - # FIXED: SILOQYMainActor.handle_candles_initial def handle_candles_initial(self, data): """Handle initial candles from discovery actor and properly initialize the candle dict.""" try: @@ -448,8 +780,53 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") - async def _start_websocket_connections(self): - """PRESERVED: Original WebSocket connection logic with Nautilus ActorExecutor""" + async def _start_real_websocket_connections(self): + """NEW: Start real Binance WebSocket connections with throttle control""" + self.log.info("Starting REAL Binance WebSocket connections for live data...") + + # Apply throttle mode limits to WebSocket connections + symbols_to_use = self.symbols + if self.throttle_mode: + symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode + self.log.warning(f"THROTTLE MODE: Limited real 
data to {len(symbols_to_use)} symbols") + + # Calculate optimal symbol distribution + symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use) + + self.log.info(f"Real WebSocket distribution:") + self.log.info(f" - Total symbols: {len(symbols_to_use)}") + self.log.info(f" - Connections needed: {len(symbol_groups)}") + + # Start WebSocket connections + for i, symbol_group in enumerate(symbol_groups): + connection_id = f"binance_real_{i:03d}" + + connection = SiloqyWebSocketConnection( + connection_id=connection_id, + symbols=symbol_group, + binance_adapter=self.binance_adapter, + main_actor_ref=self + ) + + self.websocket_connections[connection_id] = connection + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(connection.start()) + self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor") + else: + task = asyncio.create_task(connection.start()) + self.websocket_tasks[connection_id] = task + self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback") + + # Rate limit connection establishment + if i < len(symbol_groups) - 1: + await asyncio.sleep(1.0) # 1 second between connections + + self.log.info(f"Started {len(symbol_groups)} real WebSocket connections") + + async def _start_simulated_connections(self): + """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor""" self.log.info("Starting WebSocket simulation for tick generation...") self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") @@ -464,6 +841,7 @@ class SILOQYMainActor(Actor): self.ws_tasks.append(task) async def _simulate_websocket_ticks(self): + """PRESERVED: Original simulation logic""" # Adjust tick rate for throttle mode tick_delay = 0.1 if self.throttle_mode else 0.01 symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) @@ -489,7 +867,8 @@ class SILOQYMainActor(Actor): log_interval = 500 if self.throttle_mode else 1000 if tick_count % log_interval == 0: mode_indicator = "THROTTLE" if self.throttle_mode else "" - self.log.info(f"Generated {tick_count} ticks {mode_indicator} - latest: {symbol}@{price:.2f}") + data_source = "REAL" if self.enable_real_data else "SIM" + self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}") await asyncio.sleep(tick_delay) except asyncio.CancelledError: @@ -549,6 +928,8 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") + self.log.error(f"{tick_tuple}") + sys.exit(2) class DOLPHINRegimeActor(Actor): def __init__(self, config: DOLPHINRegimeActorConfig): @@ -565,6 +946,7 @@ class DOLPHINRegimeActor(Actor): self.low_prices = np.zeros(self.max_symbols, dtype=np.float64) self.volumes = np.zeros(self.max_symbols, dtype=np.float64) self.last_update = np.zeros(self.max_symbols, dtype=np.int64) + self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64) # Default fallback # PRESERVED: All original mapping and state self.symbol_to_idx = {} @@ -585,25 +967,76 @@ class DOLPHINRegimeActor(Actor): self.processed_ticks = 0 self.regime_calculations = 0 + # NEW: Enhanced indicator tracking for BB and temporal patterns + self.signal_history = deque(maxlen=100) # For BB calculations + self.bb_period = 20 # BB calculation period + self.bb_std_dev = 2.0 # BB standard deviation multiplier + self.velocity_history = deque(maxlen=10) # For regime 
velocity tracking + self.confidence_history = deque(maxlen=20) # For confidence trend analysis + self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " f"ticks_per_analysis: {self.ticks_per_analysis}") def on_start(self) -> None: """Subscribe to tick events - using Nautilus ActorExecutor""" - self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to tick events") + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events") if hasattr(self, 'msgbus') and self.msgbus: self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) - self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes") def on_stop(self) -> None: """Stop the actor - Nautilus handles executor cleanup""" self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") # Nautilus kernel handles executor shutdown - no manual cleanup needed + def handle_symbols_discovered(self, data): + """Pre-initialize symbol mappings during discovery phase""" + try: + symbols, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings") + + for symbol in symbols: + if self.active_symbols >= self.max_symbols: + self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization") + break + + idx = self.active_symbols + self.symbol_to_idx[symbol] = idx + self.idx_to_symbol[idx] = symbol + self.active_symbols += 1 + + self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings") + + except Exception as e: + self.log.error(f"Error pre-initializing symbols: {e}") + + def handle_tick_sizes(self, data): + """Apply tick sizes to pre-initialized symbol mappings""" + try: + tick_sizes, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") + + applied_count = 0 + for symbol, tick_size in tick_sizes.items(): + if symbol in self.symbol_to_idx: # Will exist from pre-initialization + idx = self.symbol_to_idx[symbol] + if 0 < tick_size <= 1.0: + self.tick_sizes[idx] = tick_size + applied_count += 1 + else: + self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") + + self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") + def handle_raw_tick(self, data): """ - PRESERVED EXACTLY: All original zero-allocation tick processing + SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols Using Nautilus ActorExecutor for regime detection tasks """ try: @@ -613,41 +1046,28 @@ class DOLPHINRegimeActor(Actor): self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") return - # PRESERVED EXACTLY: All original array processing logic + # Direct lookup - symbol will exist from pre-initialization if symbol not in self.symbol_to_idx: - if self.active_symbols >= self.max_symbols: - self.log.error(f"Nautilus ActorExecutor: Max symbols ({self.max_symbols}) exceeded") - return + self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings") + return - idx = self.active_symbols - self.symbol_to_idx[symbol] = idx - 
self.idx_to_symbol[idx] = symbol - self.active_symbols += 1 - - # Initialize arrays + idx = self.symbol_to_idx[symbol] + + # Check if new candle period + if candle_open_time > self.last_update[idx]: + # Reset for new candle self.open_prices[idx] = open_price self.high_prices[idx] = price self.low_prices[idx] = price self.close_prices[idx] = price - self.volumes[idx] = 0.0 + self.volumes[idx] = size + self.last_update[idx] = candle_open_time else: - idx = self.symbol_to_idx[symbol] - - # Check if new candle period - if candle_open_time > self.last_update[idx]: - # Reset for new candle - self.open_prices[idx] = open_price - self.high_prices[idx] = price - self.low_prices[idx] = price - self.close_prices[idx] = price - self.volumes[idx] = size - self.last_update[idx] = candle_open_time - else: - # Update existing candle data - self.close_prices[idx] = price - self.high_prices[idx] = max(self.high_prices[idx], price) - self.low_prices[idx] = min(self.low_prices[idx], price) - self.volumes[idx] += size + # Update existing candle data + self.close_prices[idx] = price + self.high_prices[idx] = max(self.high_prices[idx], price) + self.low_prices[idx] = min(self.low_prices[idx], price) + self.volumes[idx] += size self.tick_count += 1 self.processed_ticks += 1 @@ -675,6 +1095,101 @@ class DOLPHINRegimeActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") + def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols): + """NEW: Calculate enhanced indicators including BB metrics and temporal patterns""" + # Calculate regime momentum signal + base_momentum = (bull_ratio - bear_ratio) * 100 # -100 to +100 + sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0 + signal = base_momentum * confidence * sample_quality + + # Add to signal history + self.signal_history.append(signal) + + # Calculate velocity (rate of change in signal) + velocity = 0.0 + if len(self.signal_history) >= 2: + velocity = self.signal_history[-1] - self.signal_history[-2] + self.velocity_history.append(velocity) + + # Store confidence for trending + self.confidence_history.append(confidence) + + # Calculate Bollinger Bands if enough history + bb_metrics = {} + if len(self.signal_history) >= self.bb_period: + recent_signals = list(self.signal_history)[-self.bb_period:] + sma = sum(recent_signals) / len(recent_signals) + + # Calculate standard deviation + variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals) + std_dev = variance ** 0.5 + + upper_band = sma + (self.bb_std_dev * std_dev) + lower_band = sma - (self.bb_std_dev * std_dev) + + # Position within BBs (mean reversion interpretation) + if signal > upper_band: + bb_position = 'ABOVE_UPPER' + elif signal < lower_band: + bb_position = 'BELOW_LOWER' + elif signal >= sma: + bb_position = 'UPPER_HALF' + else: + bb_position = 'LOWER_HALF' + + # Momentum persistence interpretation + if signal > upper_band: + momentum_signal = 'STRONG_BULL_BREAKOUT' + elif signal < lower_band: + momentum_signal = 'STRONG_BEAR_BREAKOUT' + elif signal > sma: + momentum_signal = 'MILD_BULLISH' + else: + momentum_signal = 'MILD_BEARISH' + + bb_metrics = { + 'signal': signal, + 'sma': sma, + 'upper_band': upper_band, + 'lower_band': lower_band, + 'bb_position': bb_position, + 'momentum_signal': momentum_signal, + 'bb_ready': True + } + else: + bb_metrics = { + 'signal': signal, + 'sma': None, + 'upper_band': None, + 'lower_band': None, + 'bb_position': 
'INSUFFICIENT_DATA',
+                'momentum_signal': 'INSUFFICIENT_DATA',
+                'bb_ready': False
+            }
+        
+        # Calculate temporal patterns
+        temporal_metrics = {}
+        if len(self.velocity_history) >= 3:
+            avg_velocity = sum(self.velocity_history) / len(self.velocity_history)
+            velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING'
+        else:
+            avg_velocity = velocity
+            velocity_trend = 'BUILDING_HISTORY'
+        
+        if len(self.confidence_history) >= 5:
+            # Materialize the deque to a list first - deque does not support slicing
+            confidence_trend = 'RISING' if confidence > sum(list(self.confidence_history)[-5:-1]) / 4 else 'FALLING'
+        else:
+            confidence_trend = 'BUILDING_HISTORY'
+        
+        temporal_metrics = {
+            'velocity': velocity,
+            'avg_velocity': avg_velocity,
+            'velocity_trend': velocity_trend,
+            'confidence_trend': confidence_trend
+        }
+        
+        return bb_metrics, temporal_metrics
+    
     def _run_regime_detection(self):
         self.regime_calculations += 1
@@ -686,6 +1201,9 @@
         bullish = 0
         bearish = 0
         
+        # NEW: Track pattern of bullish/bearish symbols for this calculation
+        symbol_pattern = []
+        
         # PRESERVED: Original analysis with exact thresholds
         for idx in range(self.active_symbols):
             open_price = self.open_prices[idx]
@@ -696,13 +1214,26 @@
             
             analyzed += 1
             
-            # PRESERVED: EXACT DOLPHIN thresholds
-            change = (close_price - open_price) / open_price
+            
+            # NEW: HFT-grade tick-size based comparison
+            tick_size = self.tick_sizes[idx]
+            equality_threshold = tick_size / 2  # Half tick size standard
+            price_diff = abs(close_price - open_price)
             
-            if change >= 0.0015:  # 0.15% threshold for bullish
+            # Check if prices are effectively equal within tick size tolerance
+            if price_diff <= equality_threshold:
+                # Prices are effectively equal (within tick size tolerance)
+                symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}")
+            elif close_price > open_price:
+                # Bullish: close > open
                 bullish += 1
-            elif change <= -0.0015:  # -0.15% threshold for bearish
+                # Arrow points to close (larger price)
+                symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}")
+            else:
+                # Bearish: close < open
                 bearish += 1
+                # Arrow points to open (larger price)
+                symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}")
         
         if analyzed == 0:
             return
@@ -724,6 +1255,11 @@
         # PRESERVED: Original confidence calculation
         confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols)
         
+        # NEW: Calculate enhanced indicators
+        bb_metrics, temporal_metrics = self._calculate_enhanced_indicators(
+            bull_ratio, bear_ratio, confidence, analyzed, total_symbols
+        )
+        
        self.previous_bull_ratio = bull_ratio
        
        # Publish regime result using Nautilus message bus
@@ -742,20 +1278,87 @@
            
            self.msgbus.publish(REGIME_TOPIC, regime_tuple)
            
+            # NEW: Publish enhanced indicators to data bus
+            indicator_data = {
+                'timestamp': int(time.time() * 1000),
+                'regime_momentum_signal': bb_metrics['signal'],
+                'bb_ready': bb_metrics['bb_ready'],
+                'velocity': temporal_metrics['velocity'],
+                'velocity_trend': temporal_metrics['velocity_trend'],
+                'confidence_trend': temporal_metrics['confidence_trend']
+            }
+            self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_data)
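+            
+            # Note: the bands produced by _calculate_enhanced_indicators are computed
+            # over the composite regime momentum signal (bull/bear spread weighted by
+            # confidence and sample quality), not over price. A close above the upper
+            # band is therefore read as momentum persistence (STRONG_BULL_BREAKOUT)
+            # rather than as a classic mean-reversion signal.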
+            
+            # Publish BB metrics separately for specialized consumers
+            if bb_metrics['bb_ready']:
+                bb_data = {
+                    'timestamp': int(time.time() * 1000),
+                    'signal': bb_metrics['signal'],
+                    'sma': bb_metrics['sma'],
+                    'upper_band': bb_metrics['upper_band'],
+                    'lower_band': bb_metrics['lower_band'],
+                    'bb_position': bb_metrics['bb_position'],
+                    'momentum_signal': bb_metrics['momentum_signal']
+                }
+                self.msgbus.publish(BB_METRICS_TOPIC, bb_data)
+            
+            # Publish temporal patterns
+            temporal_data = {
+                'timestamp': int(time.time() * 1000),
+                'velocity': temporal_metrics['velocity'],
+                'avg_velocity': temporal_metrics['avg_velocity'],
+                'velocity_trend': temporal_metrics['velocity_trend'],
+                'confidence_trend': temporal_metrics['confidence_trend'],
+                'signal_history_length': len(self.signal_history)
+            }
+            self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_data)
+            
         except Exception as e:
             self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}")
-        
+        
         # Log regime changes
         if not self.regime_history or regime != self.regime_history[-1]:
-            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.1%} "
-                         f"Bear: {bear_ratio:.1%} Sideways: {sideways_ratio:.1%} | "
-                         f"Confidence: {confidence:.1%} | Analyzed: {analyzed}/{total_symbols}")
+            self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} "
+                         f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | "
+                         f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}")
             self.regime_history.append(regime)
         
         # Periodic regime status (even without changes)
-        if self.regime_calculations % 10 == 0:  # Every 10 calculations
-            self.log.info(f"REGIME STATUS: {regime.value} | Bull: {bull_ratio:.1%} "
-                         f"Bear: {bear_ratio:.1%} | Processed: {self.processed_ticks} ticks")
+        if self.regime_calculations % 50 == 0:  # Roughly once per second at the average tick rate
+            # Determine color based on regime
+            if regime == MarketRegime.BULL:
+                color_code = "\033[92m"  # Green
+            elif regime == MarketRegime.BEAR:
+                color_code = "\033[91m"  # Red
+            else:  # SIDEWAYS
+                color_code = "\033[93m"  # Yellow
+            
+            # Reset color code
+            reset_code = "\033[0m"
+            
+            self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} "
+                         f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}")
+            
+            # NEW: Enhanced indicator line after regime status
+            if bb_metrics['bb_ready']:
+                self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
+                             f"SMA:{bb_metrics['sma']:.1f} | Upper:{bb_metrics['upper_band']:.1f} | "
+                             f"Lower:{bb_metrics['lower_band']:.1f} | Pos:{bb_metrics['bb_position']} | "
+                             f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.1f} | "
+                             f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
+            else:
+                self.log.info(f"{color_code}INDICATORS: Signal:{bb_metrics['signal']:.1f} | "
+                             f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | "
+                             f"Vel:{temporal_metrics['velocity']:.1f} | VelTrend:{temporal_metrics['velocity_trend']} | "
+                             f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}")
+            
+            # NEW: Log symbol pattern and counts
+            if symbol_pattern:  # Only if we have symbols to show
+                pattern_str = " ".join(symbol_pattern) + " "  # Create pattern with spaces
+                bull_count = sum(1 for s in symbol_pattern if s.startswith("B"))
+                bear_count = sum(1 for s in symbol_pattern if s.startswith("X"))
+                
+                self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}")
     
     def _calculate_confidence(self, bull_ratio: float, bear_ratio: float,
                              analyzed: int, total: int) -> float:
@@ -870,7 +1473,7 @@ def test_siloqy_actors_with_nautilus_process_management():
             "candle_interval_ms": 15 * 60 * 1000,
             "throttle_mode": True,  # ENABLED: Safe for dual instance testing
             "throttle_rate_limit_seconds": 10.0,  # 10s between batches (vs 2.5s)
-            "max_symbols_throttled": 100  # Only 100 symbols (vs 2000+)
+            "max_symbols_throttled": 414  # Only 414 symbols (vs 2000+)
         }
     )
    
@@ -880,7 +1483,8 @@
         config={
             "component_id": "SILOQY-MAIN-ACTOR",
             "candle_interval_ms": 15 * 60 * 1000,
-            "throttle_mode": True  # ENABLED: Reduced tick generation
+            "throttle_mode": False,  # DISABLED: Full tick generation
+            "enable_real_data": True  # ENABLED: Real Binance WebSocket data
         }
     )
    
@@ -890,7 +1494,7 @@
         config={
             "component_id": "DOLPHIN-REGIME-ACTOR",
             "max_symbols": 5000,
-            "ticks_per_analysis": 500  # Reduced for throttle mode testing
+            "ticks_per_analysis": 2  # Very low for testing - analysis runs every 2 ticks
         }
     )
    
@@ -915,8 +1519,8 @@
         exec_clients={}  # No execution clients for this test
     )
     
-    node = TradingNode(config=trading_config)
-    
+    node = TradingNode(config=trading_config)
+    
     try:
         node.build()
         print("Node built successfully with Nautilus built-in process management")
diff --git a/nautilus_actor_test_implementation_6x.py b/nautilus_actor_test_implementation_6x.py
new file mode 100644
index 0000000..fd0ab8d
--- /dev/null
+++ b/nautilus_actor_test_implementation_6x.py
@@ -0,0 +1,1549 @@
+import time
+import numpy as np
+import asyncio
+import json
+import math
+import httpx
+import websockets
+from typing import Dict, List, Optional, Tuple, Any
+from enum import Enum
+from collections import deque
+import random  # Added for _simulate_websocket_ticks
+from dataclasses import dataclass, field
+
+# Nautilus imports - following test pattern
+from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig
+from nautilus_trader.live.node import TradingNode
+from nautilus_trader.common.actor import Actor
+from nautilus_trader.common.config import ActorConfig
+from nautilus_trader.model.identifiers import TraderId
+from nautilus_trader.core.data import Data
+from nautilus_trader.common.component import Logger, init_logging
+
+# NEW: construct a fresh Tick for each normalized message (don't mutate immutable Data)
+from nautilus_trader.model.data import TradeTick
+from nautilus_trader.model.identifiers import InstrumentId, TradeId
+from nautilus_trader.model.objects import Price, Quantity
+from nautilus_trader.model.enums import AggressorSide
+
+# Initialize logging
+_log_guard = init_logging()
+_logger = Logger("SILOQY")
+
+# Topics - HIGH PERFORMANCE STRING TOPICS
+RAW_TOPIC = "SILOQY.RAW.TICKS"
+STRUCTURED_TOPIC = "SILOQY.STRUCTURED.TICKS"
+REGIME_TOPIC = "DOLPHIN.REGIME.RESULTS"
+SYMBOLS_DISCOVERED_TOPIC = "SILOQY.SYMBOLS.DISCOVERED"
+CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL"
+# ADDED: tick size distribution topic
+TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES"
+
+# NEW: Enhanced BB indicator topics for tuple-based publishing
+REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS"
+BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS"
+TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS"
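+
+# Illustrative consumer sketch (hypothetical handler names, not wired into this
+# file): a downstream SILOQY actor could subscribe to the enhanced indicator
+# topics in on_start() using the same msgbus pattern the actors below use:
+#
+#     def on_start(self) -> None:
+#         if hasattr(self, 'msgbus') and self.msgbus:
+#             self.msgbus.subscribe(REGIME_INDICATORS_TOPIC, self.handle_indicators)
+#             self.msgbus.subscribe(BB_METRICS_TOPIC, self.handle_bb_metrics)
+#             self.msgbus.subscribe(TEMPORAL_PATTERNS_TOPIC, self.handle_temporal)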
+
+# Rate limiting constant
+MIN_INTERVAL = 2.5  # seconds between API batches
+
+# Market Regime Enum
+class MarketRegime(Enum):
+    BULL = "BULL"
+    BEAR = "BEAR"
+    TRANSITION = "TRANSITION"
+    SIDEWAYS = "SIDEWAYS"
+
+# ============================================================================
+# EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE
+# ============================================================================
+
+# TODO: Manually added. 
Left for *future* re-integration of class instead of tuples (where appropriate) +class SILOQYQuoteTick(Data): + def __init__(self, instrument_id, price, size, ts_event, ts_init=None): + super().__init__() + self.instrument_id = str(instrument_id) + self.price = float(price) + self.size = float(size) + self.ts_event = int(ts_event) + self.ts_init = int(ts_init if ts_init is not None else time.time_ns()) + +@dataclass +class TickData: + """Universal tick data structure from standalone""" + symbol: str + price: float + quantity: float + timestamp: int + is_buyer_maker: bool + trade_id: int + exchange: str + raw_data: Dict = None + + def __post_init__(self): + if self.raw_data is None: + self.raw_data = {} + +@dataclass +class ExchangeConfig: + """Exchange-specific configuration from standalone""" + name: str + websocket_limit_per_second: int + max_streams_per_connection: int + max_connections_per_window: int + connection_window_seconds: int + base_websocket_url: str + requires_auth: bool = False + ping_interval: float = 20.0 + ping_timeout: float = 10.0 + +class BinanceAdapter: + """EXTRACTED FROM STANDALONE: Binance-specific WebSocket adapter""" + + def __init__(self): + self.config = ExchangeConfig( + name="binance", + websocket_limit_per_second=10, + max_streams_per_connection=1024, + max_connections_per_window=300, + connection_window_seconds=300, # 5 minutes + base_websocket_url="wss://stream.binance.com:9443", + requires_auth=False + ) + self.logger = Logger(f"BINANCE_ADAPTER") + + def get_websocket_url(self, symbols: List[str]) -> str: + """Build Binance WebSocket URL with combined streams""" + if len(symbols) == 1: + symbol = symbols[0].lower() + return f"{self.config.base_websocket_url}/ws/{symbol}@trade" + else: + # Combined stream approach + streams = [f"{symbol.lower()}@trade" for symbol in symbols] + stream_string = "/".join(streams) + return f"{self.config.base_websocket_url}/stream?streams={stream_string}" + + def parse_message(self, message: str) -> Optional[TickData]: + """EXTRACTED FROM STANDALONE: Parse Binance trade message""" + try: + data = json.loads(message) + + # Handle combined stream format + if 'stream' in data and 'data' in data: + stream_name = data['stream'] + trade_data = data['data'] + else: + # Single stream format + trade_data = data + stream_name = trade_data.get('e', '') + + # Only process trade events + if not (stream_name.endswith('@trade') or trade_data.get('e') == 'trade'): + return None + + return TickData( + symbol=trade_data['s'], + price=float(trade_data['p']), + quantity=float(trade_data['q']), + timestamp=int(trade_data['T']), + is_buyer_maker=trade_data['m'], + trade_id=int(trade_data['t']), + exchange="binance", + raw_data=trade_data + ) + + except (json.JSONDecodeError, KeyError, ValueError) as e: + self.logger.error(f"Failed to parse Binance message: {e}") + return None + + def calculate_optimal_distribution(self, symbols: List[str]) -> List[List[str]]: + """EXTRACTED FROM STANDALONE: Calculate optimal symbol distribution across connections""" + # Conservative: leave room for message bursts + safety_margin = 0.7 + symbols_per_connection = int( + min( + self.config.max_streams_per_connection * safety_margin, + self.config.websocket_limit_per_second * 5 # 5-second buffer + ) + ) + + # Group symbols into chunks + symbol_groups = [] + for i in range(0, len(symbols), symbols_per_connection): + group = symbols[i:i + symbols_per_connection] + symbol_groups.append(group) + + return symbol_groups + +class SiloqyWebSocketConnection: + """EXTRACTED FROM 
STANDALONE: WebSocket connection adapted for Nautilus msgbus""" + + def __init__(self, connection_id: str, symbols: List[str], + binance_adapter: BinanceAdapter, main_actor_ref): + self.connection_id = connection_id + self.symbols = symbols + self.adapter = binance_adapter + self.main_actor = main_actor_ref # Reference to SILOQYMainActor for msgbus access + + self.websocket = None + self.is_running = False + self.retry_count = 0 + self.messages_received = 0 + self.last_message_time = 0.0 + self.connected_at = None + + self.logger = Logger(f"WS_{connection_id}") + self.logger.info(f"WebSocket connection initialized for {len(symbols)} symbols") + + async def start(self) -> None: + """Start the WebSocket connection""" + self.is_running = True + + while self.is_running: + try: + await self._connect_and_run() + except Exception as e: + self.logger.error(f"Connection {self.connection_id} error: {e}") + + if self.is_running: + await self._handle_reconnection() + else: + break + + async def _connect_and_run(self) -> None: + """EXTRACTED FROM STANDALONE: Connect to WebSocket and run message loop""" + url = self.adapter.get_websocket_url(self.symbols) + + self.logger.info(f"Connecting to Binance: {url[:100]}...") + + async with websockets.connect( + url, + ping_interval=self.adapter.config.ping_interval, + ping_timeout=self.adapter.config.ping_timeout, + close_timeout=10 + ) as websocket: + self.websocket = websocket + self.connected_at = time.time() + self.retry_count = 0 + + self.logger.info(f"Connection {self.connection_id} established") + + # Message processing loop + async for message in websocket: + if not self.is_running: + break + + try: + await self._process_message(message) + except Exception as e: + self.logger.error(f"Message processing error: {e}") + + async def _process_message(self, message: str) -> None: + """Process incoming WebSocket message and convert to Nautilus format""" + self.messages_received += 1 + self.last_message_time = time.time() + + # Parse message using Binance adapter + tick_data = self.adapter.parse_message(message) + if tick_data: + # Convert TickData to SiloqyTradeTick and publish via Nautilus msgbus + try: + # Create SiloqyTradeTick object + siloqy_tick = SiloqyTradeTick( + instrument_id=tick_data.symbol, + price=round(tick_data.price, 8), + size=round(tick_data.quantity, 8), + ts_event=tick_data.timestamp * 1000000, # Convert ms to ns + ts_init=int(time.time_ns()), + open_price=None, # Will be set by candle logic + candle_open_time=None, # Will be set by candle logic + tick_side=None, + exchange="BINANCE", + liquidity_flag=None + ) + + # Get candle information from main actor + if hasattr(self.main_actor, 'active_candles') and self.main_actor.active_candles: + symbol = tick_data.symbol + if symbol in self.main_actor.active_candles: + candle = self.main_actor.active_candles[symbol] + siloqy_tick.open_price = candle.get('open_price', tick_data.price) + siloqy_tick.candle_open_time = candle.get('open_time', int(time.time() * 1000)) + + # Process through main actor's tick handling (preserves all candle logic) + self.main_actor.on_websocket_tick( + tick_data.symbol, + tick_data.price, + tick_data.quantity, + tick_data.timestamp + ) + + except Exception as e: + self.logger.error(f"Failed to process tick: {e}") + + async def _handle_reconnection(self) -> None: + """Handle reconnection with exponential backoff""" + self.retry_count += 1 + delay = min(2 ** self.retry_count, 60) # Max 60 seconds + + self.logger.warning(f"Reconnecting in {delay}s (attempt 
{self.retry_count})") + await asyncio.sleep(delay) + + async def stop(self) -> None: + """Stop the connection""" + self.logger.info(f"Stopping connection {self.connection_id}") + self.is_running = False + + if self.websocket: + await self.websocket.close() + +# -------------------------------------------------------------------- +# SILOQY Custom Tick - PRESERVED with fixes +# -------------------------------------------------------------------- +class SiloqyTradeTick(TradeTick): + def __init__(self, instrument_id: str, price: float, size: float, ts_event: int, ts_init: int = None, + open_price: float = None, candle_open_time: int = None, tick_side: str = None, + exchange: str = None, liquidity_flag = None): + # Convert to proper Nautilus types - add venue to symbol + if isinstance(instrument_id, str): + # Add BINANCE venue if not already present + if '.' not in instrument_id: + instrument_id = f"{instrument_id}.BINANCE" + instr_id = InstrumentId.from_str(instrument_id) + else: + instr_id = instrument_id + # STEP 1 FIX: Changed precision from 4 to 8 for price, and 0 to 8 for size + price_obj = Price(price, precision=8) if not isinstance(price, Price) else price + size_obj = Quantity(size, precision=8) if not isinstance(size, Quantity) else size + + # Default aggressor side and trade ID + aggressor_side = AggressorSide.NO_AGGRESSOR # Default since Binance doesn't always provide this + trade_id = TradeId(f"T{int(time.time_ns())}") # Generate a trade ID + + super().__init__( + instrument_id=instr_id, + price=price_obj, + size=size_obj, + aggressor_side=aggressor_side, + trade_id=trade_id, + ts_event=int(ts_event), + ts_init=int(ts_init if ts_init is not None else time.time_ns()) + ) + + # Additional SILOQY fields + self.open_price = open_price if open_price is not None else price + self.candle_open_time = candle_open_time if candle_open_time is not None else int(time.time() * 1000) + self.tick_side = tick_side + self.exchange = exchange + self.liquidity_flag = liquidity_flag + +class SILOQYSymbolDiscoveryConfig(ActorConfig): + symbols: List[str] = [] + candle_interval_ms: int = 15 * 60 * 1000 + throttle_mode: bool = False + throttle_rate_limit_seconds: float = 10.0 + max_symbols_throttled: int = 100 + +class SILOQYMainActorConfig(ActorConfig): + candle_interval_ms: int = 15 * 60 * 1000 + throttle_mode: bool = False + enable_real_data: bool = False # NEW: Enable real WebSocket data + +class DOLPHINRegimeActorConfig(ActorConfig): + max_symbols: int = 5000 + ticks_per_analysis: int = 10 + +class SILOQYNormalizerConfig(ActorConfig): + pass + +class SILOQYSymbolDiscoveryActor(Actor): + """ + Symbol discovery with all original SILOQY algorithms preserved + Using Nautilus built-in ActorExecutor for task management + """ + def __init__(self, config: SILOQYSymbolDiscoveryConfig): + super().__init__(config) + + # Preserve original SILOQY configuration + self.symbols = list(config.symbols) if config.symbols else [] + self.candle_interval_ms = config.candle_interval_ms + self.active_candles = {} + self.tick_sizes = {} + + # Process management configuration + self.throttle_mode = config.throttle_mode + self.throttle_rate_limit_seconds = config.throttle_rate_limit_seconds + self.max_symbols_throttled = config.max_symbols_throttled + + if self.throttle_mode: + self.log.warning("THROTTLE MODE ENABLED - Running in dev/test configuration") + self.log.warning(f"Rate limit: {self.throttle_rate_limit_seconds}s between batches") + self.log.warning(f"Symbol limit: {self.max_symbols_throttled} symbols max") + + 
self.log.info("SILOQYSymbolDiscoveryActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Start the actor - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor starting") + + # Use Nautilus ActorExecutor for task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") + future = self._executor.submit(self.on_start_async()) + self.log.info("Nautilus ActorExecutor: Symbol discovery task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + asyncio.create_task(self.on_start_async()) + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYSymbolDiscoveryActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + async def on_start_async(self) -> None: + self.log.info("Nautilus ActorExecutor: Starting async symbol discovery initialization") + + try: + # PRESERVED: Original symbol discovery algorithm + if not self.symbols: + await self._discover_all_symbols() + + # PRESERVED: Original stats fetching and candle reconstruction + stats, candle_opens = await self._fetch_stats_and_reconstruct_candles() + + # Set active candles from reconstruction results + self.active_candles = candle_opens + + # Publish results using msgbus + self._publish_discovery_results() + + self.log.info("Nautilus ActorExecutor: Symbol discovery completed successfully") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to complete symbol discovery: {e}") + # Don't re-raise, let system continue + + async def _discover_all_symbols(self): + """PRESERVED: Original Binance symbol discovery algorithm""" + self.log.info("Starting dynamic symbol discovery from Binance...") + url = "https://api.binance.com/api/v3/exchangeInfo" + + async with httpx.AsyncClient() as client: + self.log.info("Fetching exchange info from Binance API...") + response = await client.get(url, timeout=10) + if response.status_code == 200: + self.log.info("Successfully received exchange info") + data = response.json() + + # Combined symbol discovery and tick size extraction + self.log.info("Processing symbols and extracting tick sizes...") + full_symbols = [] + for symbol_info in data['symbols']: + if symbol_info['status'] == 'TRADING' and symbol_info['symbol'].endswith('USDT'): + symbol = symbol_info['symbol'] + full_symbols.append(symbol) + + # Extract tick size while processing # Extract tick size while processing + tick_size = None + for filter_info in symbol_info['filters']: + if filter_info['filterType'] == 'PRICE_FILTER': + tick_size = float(filter_info['tickSize']) + break + + # If no PRICE_FILTER found, try other filter types + if tick_size is None: + for filter_info in symbol_info['filters']: + if filter_info['filterType'] == 'TICK_SIZE': + tick_size = float(filter_info['tickSize']) + break + + # Fallback to default if still not found + if tick_size is None: + tick_size = 1e-8 # Default fallback + self.log.warning(f"No tick size found for {symbol}, using fallback {tick_size}") + + self.tick_sizes[symbol] = tick_size + + self.log.info(f"Processed {len(full_symbols)} symbols, extracted {len(self.tick_sizes)} tick sizes") + + # Apply throttle mode symbol limiting + if self.throttle_mode: + self.symbols = full_symbols[:self.max_symbols_throttled] + self.log.warning(f"THROTTLE 
MODE: Limited to {len(self.symbols)} symbols (from {len(full_symbols)} available)") + else: + self.symbols = full_symbols + + self.log.info(f"Discovered {len(self.symbols)} trading symbols") + self.log.info(f"First 10 symbols: {self.symbols[:10]}") + else: + self.log.error(f"Failed to fetch exchange info: {response.status_code}") + raise Exception(f"Failed to fetch exchange info: {response.status_code}") + + async def _fetch_stats_and_reconstruct_candles(self): + """PRESERVED: All original rate limiting with Nautilus async patterns""" + url = "https://api.binance.com/api/v3/ticker/24hr" + klines_url = "https://api.binance.com/api/v3/klines" + ticker_url = "https://api.binance.com/api/v3/ticker/price" + + # PRESERVED: Original rate limit handling with throttle mode support + base_rate_limit = 2.5 + rate_limit_interval = self.throttle_rate_limit_seconds if self.throttle_mode else base_rate_limit + + if self.throttle_mode: + self.log.warning(f"THROTTLE MODE: Using {rate_limit_interval}s intervals (vs {base_rate_limit}s normal)") + + stats = {} + candle_opens = {} + + async with httpx.AsyncClient() as client: + total_batches = (len(self.symbols) + 49) // 50 + for i in range(0, len(self.symbols), 50): + batch_num = (i // 50) + 1 + start_time = time.time() + symbols_batch = self.symbols[i:i + 50] + + if self.throttle_mode: + self.log.info(f"THROTTLE MODE: Processing batch {batch_num}/{total_batches} with {rate_limit_interval}s delay") + + # PRESERVED: Original boundary detection logic + current_time = int(time.time() * 1000) + time_into_candle = current_time % self.candle_interval_ms + candle_open_time = current_time - time_into_candle + at_boundary = (time_into_candle < 1000) + + if i == 0: # Log initial state + self.log.info("DOLPHIN: Current candle analysis:") + self.log.info(f" - Current time: {current_time}") + self.log.info(f" - Candle interval: {self.candle_interval_ms}ms ({self.candle_interval_ms//60000}m)") + self.log.info(f" - Time into candle: {time_into_candle}ms ({time_into_candle//1000}s)") + self.log.info(f" - At boundary: {at_boundary}") + self.log.info(f" - Candle open time: {candle_open_time}") + + symbols_json_string = json.dumps(symbols_batch, separators=(',', ':')) + params = {"symbols": symbols_json_string} + + try: + # PRESERVED: Original stats fetching + response = await client.get(url, params=params, timeout=10) + + if response.status_code == 200: + data = response.json() + for item in data: + symbol = item['symbol'] + stats[symbol] = { + 'count': int(item['count']), + 'quoteVolume': float(item['quoteVolume']), + } + self.log.info(f"Fetched stats for batch {i//50 + 1}: {len(data)} symbols") + + else: + self.log.error(f"Error {response.status_code}: {response.text}") + + # PRESERVED: Original DOLPHIN candle reconstruction strategy + if at_boundary: + # AT BOUNDARY: Fetch current prices to use as opens + self.log.info(f"DOLPHIN: At boundary - fetching current prices (batch {i//50 + 1})") + + ticker_params = {"symbols": symbols_json_string} + ticker_response = await client.get(ticker_url, params=ticker_params, timeout=5) + + if ticker_response.status_code == 200: + ticker_data = ticker_response.json() + for item in ticker_data: + symbol = item['symbol'] + current_price = float(item['price']) + candle_opens[symbol] = current_price + + self.log.info(f" - Fetched current prices for {len(ticker_data)} symbols") + else: + self.log.warning(f" - Current price fetch failed ({ticker_response.status_code})") + + else: + # NOT AT BOUNDARY: Fetch historical 1s kline data + 
self.log.info(f"DOLPHIN: Not at boundary - fetching 1s kline data (batch {i//50 + 1})") + + for symbol in symbols_batch: + try: + # PRESERVED: Original kline fetching logic + kline_params = { + 'symbol': symbol, + 'interval': '1s', + 'startTime': candle_open_time, + 'endTime': candle_open_time + 1000, + 'limit': 1 + } + + kline_response = await client.get(klines_url, params=kline_params, timeout=5) + + if kline_response.status_code == 200: + kline_data = kline_response.json() + if kline_data: + open_price = float(kline_data[0][1]) + candle_opens[symbol] = open_price + + if (i + len(symbols_batch)) <= 10: + self.log.info(f" - {symbol}: reconstructed open = {open_price}") + else: + self.log.warning(f" - {symbol}: no 1s kline data found") + else: + self.log.warning(f" - {symbol}: kline fetch failed ({kline_response.status_code})") + + except Exception as e: + self.log.error(f" - {symbol}: kline fetch error: {e}") + + await asyncio.sleep(0.1) + + except Exception as e: + self.log.error(f"Request failed: {str(e)}") + + # PRESERVED: Original rate limiting with Nautilus async + elapsed = time.time() - start_time + if i + 50 < len(self.symbols): + sleep_time = max(0, rate_limit_interval - elapsed) + if sleep_time > 0: + await asyncio.sleep(sleep_time) + + self.log.info(f"DOLPHIN: Candle reconstruction complete:") + self.log.info(f" - Symbols processed: {len(self.symbols)}") + self.log.info(f" - Opens reconstructed: {len(candle_opens)}") + self.log.info("Discovery phase ready for publishing...") + + return stats, candle_opens + + def _publish_discovery_results(self): + """Publish results using msgbus - Nautilus message bus integration""" + try: + self.log.info("Nautilus ActorExecutor: Publishing discovery results to other actors...") + if hasattr(self, 'msgbus') and self.msgbus: + # Publish symbols and candles as tuples + self.msgbus.publish(SYMBOLS_DISCOVERED_TOPIC, (self.symbols, int(time.time_ns()))) + self.msgbus.publish(CANDLES_INITIAL_TOPIC, (self.active_candles, int(time.time_ns()))) + self.msgbus.publish(TICK_SIZES_TOPIC, (self.tick_sizes, int(time.time_ns()))) + + self.log.info(f"Nautilus ActorExecutor: Published {len(self.symbols)} symbols and {len(self.active_candles)} candles") + self.log.info(f"Nautilus ActorExecutor: Published {len(self.tick_sizes)} tick sizes") + self.log.info("Nautilus ActorExecutor: Discovery phase complete - other actors can now start processing") + else: + self.log.warning("Nautilus ActorExecutor: msgbus not available for publishing") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish discovery results: {e}") + +class SILOQYMainActor(Actor): + def __init__(self, config: SILOQYMainActorConfig): + super().__init__(config) + + # Preserve original configuration + self.candle_interval_ms = config.candle_interval_ms + self.throttle_mode = config.throttle_mode + self.enable_real_data = config.enable_real_data # NEW: Real data capability + self.connections = {} + self.connection_tasks = {} + + # Will be populated from discovery actor + self.symbols = [] + self.active_candles = {} + + # WebSocket infrastructure - NEW + self.binance_adapter = BinanceAdapter() if self.enable_real_data else None + self.websocket_connections: Dict[str, SiloqyWebSocketConnection] = {} + self.websocket_tasks: Dict[str, asyncio.Task] = {} + + # WebSocket tasks - managed by Nautilus ActorExecutor + self.ws_tasks = [] + + # Synchronization + self.discovery_complete = asyncio.Event() + + if self.throttle_mode: + self.log.warning("THROTTLE MODE: Main actor will 
use reduced tick generation") + + if self.enable_real_data: + self.log.info("REAL DATA MODE: Will connect to Binance WebSocket streams") + else: + self.log.info("SIMULATION MODE: Will generate simulated ticks") + + self.log.info("SILOQYMainActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Subscribe to discovery events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYMainActor starting - subscribing to discovery events") + + # Subscribe to discovery results + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(CANDLES_INITIAL_TOPIC, self.handle_candles_initial) + self.log.info("Nautilus ActorExecutor: Subscribed to discovery topics") + + # Use Nautilus ActorExecutor for task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for async initialization") + future = self._executor.submit(self.on_start_async()) + self.log.info("Nautilus ActorExecutor: Main actor initialization task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + asyncio.create_task(self.on_start_async()) + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYMainActor stopping") + + # Stop WebSocket connections if running + if self.enable_real_data: + for conn in self.websocket_connections.values(): + asyncio.create_task(conn.stop()) + + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + async def on_start_async(self) -> None: + try: + # Wait for symbol discovery to complete + self.log.info("Nautilus ActorExecutor: Waiting for symbol discovery to complete...") + await asyncio.wait_for(self.discovery_complete.wait(), timeout=120.0) + + # ENHANCED: Start real or simulated connections based on configuration + if self.enable_real_data: + await self._start_real_websocket_connections() + else: + await self._start_simulated_connections() + + self.log.info("Nautilus ActorExecutor: SILOQYMainActor startup completed successfully") + + except asyncio.TimeoutError: + self.log.error("Nautilus ActorExecutor: Timeout waiting for symbol discovery") + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to start SILOQYMainActor: {e}") + + def handle_symbols_discovered(self, data): + """Handle symbols from discovery actor""" + try: + symbols, timestamp = data + self.symbols = symbols + self.log.info(f"Nautilus ActorExecutor: Received {len(symbols)} symbols from discovery actor") + + if self.symbols and self.active_candles: + self.discovery_complete.set() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling symbols discovered: {e}") + + def handle_candles_initial(self, data): + """Handle initial candles from discovery actor and properly initialize the candle dict.""" + try: + candles_received, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(candles_received)} initial candles from discovery actor") + + # FIX: Re-initialize active_candles with the full dictionary structure + # STEP 5 FIX: Added rounding to 8 decimals for all price values + self.active_candles = {} + for symbol, open_price in candles_received.items(): + self.active_candles[symbol] = { + 'open_price': round(open_price, 8), + 'open_time': (int(time.time() * 1000) // self.candle_interval_ms) * 
self.candle_interval_ms, + 'high': round(open_price, 8), + 'low': round(open_price, 8), + 'close': round(open_price, 8), + 'volume': 0.0, + } + + if self.symbols and self.active_candles: + self.discovery_complete.set() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling candles initial: {e}") + + async def _start_real_websocket_connections(self): + """NEW: Start real Binance WebSocket connections with throttle control""" + self.log.info("Starting REAL Binance WebSocket connections for live data...") + + # Apply throttle mode limits to WebSocket connections + symbols_to_use = self.symbols + if self.throttle_mode: + symbols_to_use = self.symbols[:10] # Limit to 10 symbols in throttle mode + self.log.warning(f"THROTTLE MODE: Limited real data to {len(symbols_to_use)} symbols") + + # Calculate optimal symbol distribution + symbol_groups = self.binance_adapter.calculate_optimal_distribution(symbols_to_use) + + self.log.info(f"Real WebSocket distribution:") + self.log.info(f" - Total symbols: {len(symbols_to_use)}") + self.log.info(f" - Connections needed: {len(symbol_groups)}") + + # Start WebSocket connections + for i, symbol_group in enumerate(symbol_groups): + connection_id = f"binance_real_{i:03d}" + + connection = SiloqyWebSocketConnection( + connection_id=connection_id, + symbols=symbol_group, + binance_adapter=self.binance_adapter, + main_actor_ref=self + ) + + self.websocket_connections[connection_id] = connection + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(connection.start()) + self.log.info(f"Real WebSocket {connection_id} submitted to ActorExecutor") + else: + task = asyncio.create_task(connection.start()) + self.websocket_tasks[connection_id] = task + self.log.warning(f"Real WebSocket {connection_id} started with asyncio fallback") + + # Rate limit connection establishment + if i < len(symbol_groups) - 1: + await asyncio.sleep(1.0) # 1 second between connections + + self.log.info(f"Started {len(symbol_groups)} real WebSocket connections") + + async def _start_simulated_connections(self): + """PRESERVED: Original WebSocket simulation logic with Nautilus ActorExecutor""" + self.log.info("Starting WebSocket simulation for tick generation...") + self.log.info(f"Will simulate ticks for first 10 of {len(self.symbols)} symbols") + + # Use Nautilus ActorExecutor for WebSocket task management + if hasattr(self, '_executor') and self._executor: + self.log.info("Nautilus ActorExecutor: Using registered executor for WebSocket simulation") + future = self._executor.submit(self._simulate_websocket_ticks()) + self.log.info("Nautilus ActorExecutor: WebSocket simulation task submitted") + else: + self.log.warning("Nautilus ActorExecutor: No executor registered, falling back to asyncio") + task = asyncio.create_task(self._simulate_websocket_ticks()) + self.ws_tasks.append(task) + + async def _simulate_websocket_ticks(self): + """PRESERVED: Original simulation logic""" + # Adjust tick rate for throttle mode + tick_delay = 0.1 if self.throttle_mode else 0.01 + symbols_to_simulate = min(5 if self.throttle_mode else 10, len(self.symbols)) + + tick_count = 0 + try: + if self.throttle_mode: + self.log.warning(f"THROTTLE MODE: Reduced tick generation - {symbols_to_simulate} symbols at 10 ticks/second") + else: + self.log.info("WebSocket tick simulation started - generating 100 ticks/second") + + while True: + for symbol in self.symbols[:symbols_to_simulate]: + # STEP 2 FIX: 
Round price to 8 decimals, use float for quantity with 8 decimals
+                    price = round(100.0 + random.gauss(0, 0.5), 8)
+                    quantity = round(random.uniform(0.00000001, 100.0), 8)
+                    timestamp = int(time.time() * 1000)
+                    
+                    self.on_websocket_tick(symbol, price, quantity, timestamp)
+                    tick_count += 1
+                    
+                    # Log every 500 ticks in throttle mode, 1000 in normal mode
+                    log_interval = 500 if self.throttle_mode else 1000
+                    if tick_count % log_interval == 0:
+                        mode_indicator = "THROTTLE" if self.throttle_mode else ""
+                        data_source = "REAL" if self.enable_real_data else "SIM"
+                        self.log.info(f"Generated {tick_count} ticks {mode_indicator} {data_source} - latest: {symbol}@{price:.2f}")
+                
+                await asyncio.sleep(tick_delay)
+        except asyncio.CancelledError:
+            self.log.info(f"Nautilus ActorExecutor: WebSocket simulation stopped after {tick_count} ticks")
+        except Exception as e:
+            self.log.error(f"Nautilus ActorExecutor: WebSocket simulation error: {e}")
+    
+    def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int):
+        """
+        PRESERVED: Original tick processing and candle update logic
+        """
+        # PRESERVED: Original candle update logic
+        if symbol not in self.active_candles:
+            candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
+            # STEP 4 FIX: Round all prices to 8 decimals
+            self.active_candles[symbol] = {
+                'open_price': round(price, 8),
+                'open_time': candle_open_time,
+                'high': round(price, 8),
+                'low': round(price, 8),
+                'close': round(price, 8),
+                'volume': 0.0
+            }
+        
+        candle = self.active_candles[symbol]
+        
+        # PRESERVED: Original candle rolling logic
+        current_candle_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms
+        if current_candle_time > candle['open_time']:
+            # STEP 4 FIX: Round all prices to 8 decimals
+            candle['open_price'] = round(price, 8)
+            candle['open_time'] = current_candle_time
+            candle['high'] = round(price, 8)
+            candle['low'] = round(price, 8)
+            candle['close'] = round(price, 8)
+            candle['volume'] = quantity
+        else:
+            candle['close'] = round(price, 8)
+            candle['high'] = round(max(candle['high'], price), 8)
+            candle['low'] = round(min(candle['low'], price), 8)
+            candle['volume'] += quantity
+        
+        # PRESERVED: Original high-performance tuple publishing
+        try:
+            tick_tuple = (
+                str(symbol),
+                float(price),
+                float(quantity),
+                int(timestamp),
+                float(candle['open_price']),
+                int(candle['open_time'])
+            )
+            
+            self.msgbus.publish(RAW_TOPIC, tick_tuple)
+            
+        except Exception as e:
+            self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}")
+            print(f"EXCEPTION DETAILS: {type(e)} - {e}")
+
+class DOLPHINRegimeActor(Actor):
+    def __init__(self, config: DOLPHINRegimeActorConfig):
+        super().__init__(config)
+        
+        # PRESERVED: All original DOLPHIN configuration
+        self.max_symbols = config.max_symbols
+        self.ticks_per_analysis = config.ticks_per_analysis
+        
+        # PRESERVED: All original pre-allocated arrays for zero allocation
+        self.open_prices = np.zeros(self.max_symbols, dtype=np.float64)
+        self.close_prices = np.zeros(self.max_symbols, dtype=np.float64)
+        self.high_prices = np.zeros(self.max_symbols, dtype=np.float64)
+        self.low_prices = np.zeros(self.max_symbols, dtype=np.float64)
+        self.volumes = np.zeros(self.max_symbols, dtype=np.float64)
+        self.last_update = np.zeros(self.max_symbols, dtype=np.int64)
+        self.tick_sizes = np.full(self.max_symbols, 1e-8, dtype=np.float64)  # Default fallback
+        
+        # PRESERVED: All original mapping and state
+        self.symbol_to_idx = {}
+        self.idx_to_symbol = {}
+        self.active_symbols = 0
+        
+        
self.tick_count = 0 + + # PRESERVED: All original DOLPHIN thresholds - EXACT + self.bull_threshold = 0.60 # 60% bullish + self.bear_threshold = 0.55 # 55% bearish + + self.previous_bull_ratio = None + self.regime_history = deque(maxlen=100) + + # Metrics + self.last_metric_log = time.time_ns() + self.processed_ticks = 0 + self.regime_calculations = 0 + + # NEW: Enhanced indicator tracking for BB and temporal patterns + self.signal_history = deque(maxlen=100) # For BB calculations + self.bb_period = 20 # BB calculation period + self.bb_std_dev = 2.0 # BB standard deviation multiplier + self.velocity_history = deque(maxlen=10) # For regime velocity tracking + self.confidence_history = deque(maxlen=20) # For confidence trend analysis + + self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " + f"ticks_per_analysis: {self.ticks_per_analysis}") + + def on_start(self) -> None: + """Subscribe to tick events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor starting - subscribing to events") + + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) + self.msgbus.subscribe(SYMBOLS_DISCOVERED_TOPIC, self.handle_symbols_discovered) + self.msgbus.subscribe(TICK_SIZES_TOPIC, self.handle_tick_sizes) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events, symbol discovery and tick sizes") + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: DOLPHINRegimeActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + def handle_symbols_discovered(self, data): + """Pre-initialize symbol mappings during discovery phase""" + try: + symbols, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Pre-initializing {len(symbols)} symbol mappings") + + for symbol in symbols: + if self.active_symbols >= self.max_symbols: + self.log.warning(f"Max symbols ({self.max_symbols}) exceeded during initialization") + break + + idx = self.active_symbols + self.symbol_to_idx[symbol] = idx + self.idx_to_symbol[idx] = symbol + self.active_symbols += 1 + + self.log.info(f"Pre-initialized {self.active_symbols} symbol mappings") + + except Exception as e: + self.log.error(f"Error pre-initializing symbols: {e}") + + def handle_tick_sizes(self, data): + """Apply tick sizes to pre-initialized symbol mappings""" + try: + tick_sizes, timestamp = data + self.log.info(f"Nautilus ActorExecutor: Received {len(tick_sizes)} tick sizes from discovery actor") + + applied_count = 0 + for symbol, tick_size in tick_sizes.items(): + if symbol in self.symbol_to_idx: # Will exist from pre-initialization + idx = self.symbol_to_idx[symbol] + if 0 < tick_size <= 1.0: + self.tick_sizes[idx] = tick_size + applied_count += 1 + else: + self.log.warning(f"Invalid tick size {tick_size} for {symbol}, using fallback") + + self.log.info(f"Nautilus ActorExecutor: Applied {applied_count} tick sizes to pre-initialized symbols") + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Error handling tick sizes: {e}") + + def handle_raw_tick(self, data): + """ + SIMPLIFIED: Zero-allocation tick processing with pre-initialized symbols + Using Nautilus ActorExecutor for regime detection tasks + """ + try: + symbol, price, size, ts_event, open_price, candle_open_time = data + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Malformed tick data: {e}") + return + + # 
Direct lookup - symbol will exist from pre-initialization + if symbol not in self.symbol_to_idx: + self.log.error(f"Nautilus ActorExecutor: Symbol {symbol} not found in pre-initialized mappings") + return + + idx = self.symbol_to_idx[symbol] + + # Check if new candle period + if candle_open_time > self.last_update[idx]: + # Reset for new candle + self.open_prices[idx] = open_price + self.high_prices[idx] = price + self.low_prices[idx] = price + self.close_prices[idx] = price + self.volumes[idx] = size + self.last_update[idx] = candle_open_time + else: + # Update existing candle data + self.close_prices[idx] = price + self.high_prices[idx] = max(self.high_prices[idx], price) + self.low_prices[idx] = min(self.low_prices[idx], price) + self.volumes[idx] += size + + self.tick_count += 1 + self.processed_ticks += 1 + + # PRESERVED: Original trigger logic + if self.tick_count >= self.ticks_per_analysis: + # Use Nautilus ActorExecutor for regime detection + if hasattr(self, '_executor') and self._executor: + future = self._executor.submit(self._run_regime_detection_async()) + else: + # Fallback to sync detection + self._run_regime_detection() + self.tick_count = 0 + + # Periodic metrics logging + now = time.time_ns() + if now - self.last_metric_log > 1_000_000_000: + self.log.info(f"Nautilus ActorExecutor: DOLPHIN metrics - ticks: {self.processed_ticks}, " + f"regime_calcs: {self.regime_calculations}, active_symbols: {self.active_symbols}") + self.last_metric_log = now + + async def _run_regime_detection_async(self): + try: + self._run_regime_detection() + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") + + def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols): + """NEW: Calculate enhanced indicators including BB metrics and temporal patterns""" + + # Initialize history attributes if they don't exist + if not hasattr(self, 'signal_history'): + self.signal_history = deque(maxlen=50) + if not hasattr(self, 'velocity_history'): + self.velocity_history = deque(maxlen=50) + if not hasattr(self, 'confidence_history'): + self.confidence_history = deque(maxlen=50) + if not hasattr(self, 'bb_period'): + self.bb_period = 20 + if not hasattr(self, 'bb_std_dev'): + self.bb_std_dev = 2.0 + + # Calculate regime momentum signal + base_momentum = (bull_ratio - bear_ratio) * 100 # -100 to +100 + sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0 + signal = base_momentum * confidence * sample_quality + + # Add to signal history + self.signal_history.append(signal) + + # Calculate velocity (rate of change in signal) + velocity = 0.0 + if len(self.signal_history) >= 2: + velocity = self.signal_history[-1] - self.signal_history[-2] + self.velocity_history.append(velocity) + + # Store confidence for trending + self.confidence_history.append(confidence) + + # Calculate Bollinger Bands if enough history + bb_metrics = {} + if len(self.signal_history) >= self.bb_period: + # REPLACE SLICE: recent_signals = list(self.signal_history)[-self.bb_period:] + recent_signals = [] + start_index = len(self.signal_history) - self.bb_period + for i in range(start_index, len(self.signal_history)): + recent_signals.append(self.signal_history[i]) + + sma = sum(recent_signals) / len(recent_signals) + + # Calculate standard deviation + variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals) + std_dev = variance ** 0.5 + + upper_band = sma + (self.bb_std_dev * std_dev) + lower_band = sma - 
(self.bb_std_dev * std_dev) + + # Position within BBs (mean reversion interpretation) + if signal > upper_band: + bb_position = 'ABOVE_UPPER' + elif signal < lower_band: + bb_position = 'BELOW_LOWER' + elif signal >= sma: + bb_position = 'UPPER_HALF' + else: + bb_position = 'LOWER_HALF' + + # Momentum persistence interpretation + if signal > upper_band: + momentum_signal = 'STRONG_BULL_BREAKOUT' + elif signal < lower_band: + momentum_signal = 'STRONG_BEAR_BREAKOUT' + elif signal > sma: + momentum_signal = 'MILD_BULLISH' + else: + momentum_signal = 'MILD_BEARISH' + + bb_metrics = { + 'signal': signal, + 'sma': sma, + 'upper_band': upper_band, + 'lower_band': lower_band, + 'bb_position': bb_position, + 'momentum_signal': momentum_signal, + 'bb_ready': True + } + else: + bb_metrics = { + 'signal': signal, + 'sma': None, + 'upper_band': None, + 'lower_band': None, + 'bb_position': 'INSUFFICIENT_DATA', + 'momentum_signal': 'INSUFFICIENT_DATA', + 'bb_ready': False + } + + # Calculate temporal patterns + temporal_metrics = {} + if len(self.velocity_history) >= 3: + avg_velocity = sum(self.velocity_history) / len(self.velocity_history) + velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING' + else: + avg_velocity = velocity + velocity_trend = 'BUILDING_HISTORY' + + if len(self.confidence_history) >= 5: + # REPLACE SLICE: confidence > sum(self.confidence_history[-5:-1])/4 + confidence_sum = 0.0 + count = 0 + start_index = len(self.confidence_history) - 5 + end_index = len(self.confidence_history) - 1 # exclude last element + for i in range(start_index, end_index): + confidence_sum += self.confidence_history[i] + count += 1 + + if count > 0: + confidence_avg = confidence_sum / count + confidence_trend = 'RISING' if confidence > confidence_avg else 'FALLING' + else: + confidence_trend = 'BUILDING_HISTORY' + else: + confidence_trend = 'BUILDING_HISTORY' + + temporal_metrics = { + 'velocity': velocity, + 'avg_velocity': avg_velocity, + 'velocity_trend': velocity_trend, + 'confidence_trend': confidence_trend + } + + return bb_metrics, temporal_metrics + + def _run_regime_detection(self): + self.regime_calculations += 1 + + total_symbols = self.active_symbols + if total_symbols == 0: + return + + analyzed = 0 + bullish = 0 + bearish = 0 + + # NEW: Track pattern of bullish/bearish symbols for this calculation + symbol_pattern = [] + + # PRESERVED: Original analysis with exact thresholds + for idx in range(self.active_symbols): + open_price = self.open_prices[idx] + close_price = self.close_prices[idx] + + if open_price == 0: + continue + + analyzed += 1 + + # NEW: HFT-grade tick-size based comparison + tick_size = self.tick_sizes[idx] + equality_threshold = tick_size / 2 # Half tick size standard + price_diff = abs(close_price - open_price) + + # Check if prices are effectively equal within tick size tolerance + if price_diff <= equality_threshold: + # Prices are effectively equal (within tick size tolerance) + symbol_pattern.append(f"S{close_price:.8f}={open_price:.8f}") + elif close_price > open_price: + # Bullish: close > open + bullish += 1 + # Arrow points to close (larger price) + symbol_pattern.append(f"B{open_price:.8f}->{close_price:.8f}") + else: + # Bearish: close < open + bearish += 1 + # Arrow points to open (larger price) + symbol_pattern.append(f"X{close_price:.8f}<-{open_price:.8f}") + + if analyzed == 0: + return + + # PRESERVED: Original ratio calculations + bull_ratio = bullish / analyzed + bear_ratio = bearish / analyzed + sideways_ratio = 1.0 - bull_ratio - 
bear_ratio + + # MODIFIED: Original DOLPHIN Algorithm - Simple 3-regime detection + if bull_ratio >= self.bull_threshold: # 60% bullish + regime = MarketRegime.BULL + elif bear_ratio >= self.bear_threshold: # 55% bearish + regime = MarketRegime.BEAR + else: + # Everything else is sideways + regime = MarketRegime.SIDEWAYS + + # PRESERVED: Original confidence calculation + confidence = self._calculate_confidence(bull_ratio, bear_ratio, analyzed, total_symbols) + + self.previous_bull_ratio = bull_ratio + + # Publish regime result using Nautilus message bus + try: + if hasattr(self, 'msgbus') and self.msgbus: + regime_tuple = ( + int(time.time() * 1000), + regime.value, + float(bull_ratio), + float(bear_ratio), + float(sideways_ratio), + int(analyzed), + int(total_symbols), + float(confidence) + ) + + self.msgbus.publish(REGIME_TOPIC, regime_tuple) + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}") + + # NEW: Calculate enhanced indicators + bb_metrics, temporal_metrics = self._calculate_enhanced_indicators( + bull_ratio, bear_ratio, confidence, analyzed, total_symbols + ) + + # NEW: Publish enhanced indicators to data bus as tuples + try: + if hasattr(self, 'msgbus') and self.msgbus: + timestamp = int(time.time() * 1000) + + # Publish regime indicators as tuple + indicator_tuple = ( + timestamp, + float(bb_metrics['signal']), + bool(bb_metrics['bb_ready']), + float(temporal_metrics['velocity']), + str(temporal_metrics['velocity_trend']), + str(temporal_metrics['confidence_trend']) + ) + self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_tuple) + + # Publish BB metrics as tuple (only if BB is ready) + if bb_metrics['bb_ready']: + bb_tuple = ( + timestamp, + float(bb_metrics['signal']), + float(bb_metrics['sma']), + float(bb_metrics['upper_band']), + float(bb_metrics['lower_band']), + str(bb_metrics['bb_position']), + str(bb_metrics['momentum_signal']) + ) + self.msgbus.publish(BB_METRICS_TOPIC, bb_tuple) + + # Publish temporal patterns as tuple + temporal_tuple = ( + timestamp, + float(temporal_metrics['velocity']), + float(temporal_metrics['avg_velocity']), + str(temporal_metrics['velocity_trend']), + str(temporal_metrics['confidence_trend']), + int(len(self.signal_history)) + ) + self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_tuple) + + except Exception as e: + self.log.error(f"Failed to publish enhanced indicators: {e}") + + # Log regime changes + if not self.regime_history or regime != self.regime_history[-1]: + self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} " + f"Bear: {bear_ratio:.2%} Sideways: {sideways_ratio:.2%} ({bullish}/{bearish}) | " + f"Confidence: {confidence:.2%} | Analyzed: {analyzed}/{total_symbols}") + self.regime_history.append(regime) + + # Periodic regime status (even without changes) + if self.regime_calculations % 50 == 0: # Every second, approx, given avg. 
tick rate + # Determine color based on regime + if regime == MarketRegime.BULL: + color_code = "\033[92m" # Green + elif regime == MarketRegime.BEAR: + color_code = "\033[91m" # Red + else: # SIDEWAYS + color_code = "\033[93m" # Yellow + + # Reset color code + reset_code = "\033[0m" + + self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} " + f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}") + + # Enhanced indicator line after regime status + if bb_metrics['bb_ready']: + self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | " + f"SMA:{bb_metrics['sma']:.2f} | Upper:{bb_metrics['upper_band']:.2f} | " + f"Lower:{bb_metrics['lower_band']:.2f} | Pos:{bb_metrics['bb_position']} | " + f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.2f} | " + f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") + else: + self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | " + f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | " + f"Vel:{temporal_metrics['velocity']:.2f} | VelTrend:{temporal_metrics['velocity_trend']} | " + f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") + + # NEW: Log symbol pattern and counts + if symbol_pattern: # Only if we have symbols to show + pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces + bull_count = sum(1 for s in symbol_pattern if s.startswith("B")) + bear_count = sum(1 for s in symbol_pattern if s.startswith("X")) + + self.log.debug(f"{pattern_str} and totals: BULLS:{bull_count}/BEARS:{bear_count}") + + def _calculate_confidence(self, bull_ratio: float, bear_ratio: float, + analyzed: int, total: int) -> float: + """PRESERVED EXACTLY: Original DOLPHIN confidence calculation""" + if analyzed == 0: + return 0.0 + + # Market Decisiveness + max_ratio = max(bull_ratio, bear_ratio) + decisiveness = abs(max_ratio - 0.5) * 2 + + # Sample Coverage + coverage = analyzed / total + + # Statistical Significance + if max_ratio > 0 and max_ratio < 1: + standard_error = math.sqrt(max_ratio * (1 - max_ratio) / analyzed) + else: + standard_error = 0.0 + + z_score = abs(max_ratio - 0.5) / max(standard_error, 0.001) + statistical_confidence = min(z_score / 3.0, 1.0) + + # Market Clarity + market_clarity = bull_ratio + bear_ratio + + # Weighted combination + confidence = ( + decisiveness * 0.40 + + coverage * 0.10 + + statistical_confidence * 0.30 + + market_clarity * 0.20 + ) + + return max(0.0, min(confidence, 1.0)) + +class SILOQYNormalizerActor(Actor): + def __init__(self, config: SILOQYNormalizerConfig): + super().__init__(config) + + self.normalized_count = 0 + self.last_metric_log = time.time_ns() + + self.log.info("SILOQYNormalizerActor initialized with Nautilus ActorExecutor") + + def on_start(self) -> None: + """Subscribe to tick events - using Nautilus ActorExecutor""" + self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor starting - subscribing to tick events") + + if hasattr(self, 'msgbus') and self.msgbus: + self.msgbus.subscribe(RAW_TOPIC, self.handle_raw_tick) + self.log.info("Nautilus ActorExecutor: Subscribed to raw tick events") + + def on_stop(self) -> None: + """Stop the actor - Nautilus handles executor cleanup""" + self.log.info("Nautilus ActorExecutor: SILOQYNormalizerActor stopping") + # Nautilus kernel handles executor shutdown - no manual cleanup needed + + def handle_raw_tick(self, 
data): + try: + symbol, price, size, ts_event, open_price, candle_open_time = data + tick = SiloqyTradeTick( + instrument_id=symbol, + price=round(float(price), 8), + size=round(float(size), 8), + ts_event=int(ts_event), + ts_init=int(time.time_ns()), + open_price=round(float(open_price), 8) if open_price is not None else round(float(price), 8), + candle_open_time=int(candle_open_time) if candle_open_time is not None else int(time.time() * 1000), + tick_side=None, # Default to None unless Binance provides this info + exchange="BINANCE", # We know this is from Binance + liquidity_flag=None # Default to None unless Binance provides this info + ) + + if hasattr(self, 'msgbus') and self.msgbus: + try: + # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior + self.msgbus.publish(STRUCTURED_TOPIC, tick) + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}") + + # FALLBACK: Try Nautilus publish_data if msgbus not available + elif hasattr(self, 'publish_data'): + try: + # Use standard Nautilus publishing as fallback + self.publish_data(DataType(SiloqyTradeTick), tick) + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to publish structured data: {e}") + self.normalized_count += 1 + + # Periodic metrics + now = time.time_ns() + if now - self.last_metric_log > 1_000_000_000: + self.log.info(f"Nautilus ActorExecutor: Normalizer processed: {self.normalized_count} ticks") + self.last_metric_log = now + + except Exception as e: + self.log.error(f"Nautilus ActorExecutor: Failed to handle raw tick: {e}") + +def test_siloqy_actors_with_nautilus_process_management(): + """Test SILOQY actors with Nautilus built-in ActorExecutor process management""" + print("SILOQY Multi-Actor Test with Nautilus Built-in Process Management") + + # Configuration following test pattern - THROTTLE MODE ENABLED FOR DEV/TESTING + symbol_discovery_config = ImportableActorConfig( + actor_path="__main__:SILOQYSymbolDiscoveryActor", + config_path="__main__:SILOQYSymbolDiscoveryConfig", + config={ + "component_id": "SILOQY-SYMBOL-DISCOVERY", + "symbols": [], # Empty for dynamic discovery + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": True, # ENABLED: Safe for dual instance testing + "throttle_rate_limit_seconds": 10.0, # 10s between batches (vs 2.5s) + "max_symbols_throttled": 414 # Cap discovery at 414 symbols (vs 2000+) + } + ) + + main_actor_config = ImportableActorConfig( + actor_path="__main__:SILOQYMainActor", + config_path="__main__:SILOQYMainActorConfig", + config={ + "component_id": "SILOQY-MAIN-ACTOR", + "candle_interval_ms": 15 * 60 * 1000, + "throttle_mode": False, # DISABLED: full-rate tick generation (set True to throttle) + "enable_real_data": True # ENABLED: real Binance WebSocket data (set False for simulated ticks) + } + ) + + regime_actor_config = ImportableActorConfig( + actor_path="__main__:DOLPHINRegimeActor", + config_path="__main__:DOLPHINRegimeActorConfig", + config={ + "component_id": "DOLPHIN-REGIME-ACTOR", + "max_symbols": 5000, + "ticks_per_analysis": 2 # Reduced for throttle mode testing + } + ) + + normalizer_config = ImportableActorConfig( + actor_path="__main__:SILOQYNormalizerActor", + config_path="__main__:SILOQYNormalizerConfig", + config={ + "component_id": "SILOQY-NORMALIZER" + } + ) + + # Trading node configuration - Uses Nautilus built-in process management + trading_config = TradingNodeConfig( + trader_id=TraderId("SILOQY-TRADER-001"), + actors=[ + symbol_discovery_config, + main_actor_config, + regime_actor_config, + normalizer_config + ], + 
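# NOTE: all four actors run inside this single TradingNode and share its message bus - RAW_TOPIC from SILOQYMainActor feeds both DOLPHINRegimeActor and SILOQYNormalizerActor (see their on_start subscriptions above). + 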
data_clients={}, # No external data clients needed + exec_clients={} # No execution clients for this test + ) + + node = TradingNode(config=trading_config) + + try: + node.build() + print("Node built successfully with Nautilus built-in process management") + node.run() + + except KeyboardInterrupt: + print("\nSILOQY Actor test with Nautilus process management completed!") + except Exception as e: + print(f"Error: {e}") + import traceback + traceback.print_exc() + finally: + try: + node.dispose() + print("Nautilus process management: Node disposed successfully") + except: + pass + +if __name__ == "__main__": + test_siloqy_actors_with_nautilus_process_management() \ No newline at end of file diff --git a/symbol_consideration_issue.md b/symbol_consideration_issue.md new file mode 100644 index 0000000..87bac76 --- /dev/null +++ b/symbol_consideration_issue.md @@ -0,0 +1,24 @@ +2025-09-01T19:54:58.150165200Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1374 ticks +2025-09-01T19:54:58.150201100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2 +2025-09-01T19:54:58.660990300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1375, regime_calcs: 687, active_symbols: 8 +2025-09-01T19:54:58.661042700Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1375 ticks +2025-09-01T19:54:58.730072100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1376 ticks +2025-09-01T19:54:58.730093900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77360000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2 +2025-09-01T19:54:59.455057400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 75.0% Bear: 25.0% | Processed: 1378 ticks +2025-09-01T19:54:59.455081700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 and totals: BULLS:6/BEARS:2 +2025-09-01T19:54:59.568990700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1380 ticks +2025-09-01T19:54:59.569016900Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.45000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2 +2025-09-01T19:54:59.666864100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: Nautilus ActorExecutor: DOLPHIN metrics - ticks: 1381, regime_calcs: 690, active_symbols: 9 +2025-09-01T19:54:59.666902500Z [INFO] TRADER-000.SILOQY-NORMALIZER: Nautilus ActorExecutor: Normalizer processed: 1381 ticks +2025-09-01T19:54:59.726017700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1382 ticks +2025-09-01T19:54:59.726051700Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: 
X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2 + 2025-09-01T19:54:59.999524400Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1384 ticks + 2025-09-01T19:54:59.999567100Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2 + 2025-09-01T19:54:59.999803000Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1386 ticks + 2025-09-01T19:54:59.999815800Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: X109215.44000000<-109263.08000000 B2.77130000->2.77350000 B848.60000000->848.90000000 X109.33000000<-109.44000000 B4366.20000000->4368.36000000 B0.18580000->0.18610000 B6.48700000->6.49300000 B0.81120000->0.81150000 B2.58800000->2.59000000 and totals: BULLS:7/BEARS:2 + 2025-09-01T19:55:00.683433300Z [INFO] TRADER-000.DOLPHIN-REGIME-ACTOR: REGIME STATUS: BULL | Bull: 77.8% Bear: 22.2% | Processed: 1388 ticks + +In this log, at the "transition" between 75.x% and 77.x%, you will find that one of the symbols considered by the DOLPHIN market-regime detection is missing before the change: the totals go from 6/2 to 7/2 ... +... so, before the transition, *one fewer symbol* was being considered.- + +Most likely *no tick had yet been produced for that symbol* (is this a correct assumption?).- That would be consistent with `_run_regime_detection`, which skips any index whose `open_prices[idx]` is still 0, i.e. a pre-initialized symbol that has not yet received a tick.-
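+ +One way to check: subscribe an extra handler to RAW_TOPIC alongside DOLPHINRegimeActor and compare the symbols actually seen against the discovered set. A minimal sketch (hypothetical helper, not part of the running system; it assumes only the tuple layout published on RAW_TOPIC): + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Hypothetical diagnostic sketch - assumes the RAW_TOPIC tuple layout +# (symbol, price, size, ts_event, open_price, candle_open_time) used above. +class TickCoverageMonitor: + def __init__(self, expected_symbols): + self.expected = set(expected_symbols) # symbols from discovery + self.last_tick_ms = {} # symbol -> ts_event of last tick seen + + def on_raw_tick(self, data): + symbol, _price, _size, ts_event, _open_price, _candle_open_time = data + self.last_tick_ms[symbol] = ts_event + + def silent_symbols(self): + # Discovered symbols with no tick yet - exactly the ones missing + # from the bull/bear totals until their first tick arrives + return self.expected - set(self.last_tick_ms) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Wiring it up from any actor's on_start() would be e.g. `self.msgbus.subscribe(RAW_TOPIC, monitor.on_raw_tick)`; any symbol still reported by `silent_symbols()` is one whose `open_prices` slot remains 0 and is therefore excluded from the regime totals.- \ No newline at end of file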