} inline void add_(double x) { kahan_add_(sum_, sum_c_, x); kahan_add_(sumsq_, sumsq_c_, x * x); } std::size_t N_; double k_; std::vector buf_; std::size_t head_ = 0; std::size_t count_ = 0; // Running sums with compensation double sum_ = 0.0, sum_c_ = 0.0; double sumsq_ = 0.0, sumsq_c_ = 0.0; }; } // namespace blaze ``` ## Document 6: Nautilus Integration Issues and Solutions ### Original Integration Problems The Nautilus Trader integration encountered several critical issues: 1. **Missing Module Import Error:** ``` ModuleNotFoundError: No module named 'nautilus_trader.live.registry' ``` 2. **Factory Registration Not Working:** ``` [ERROR] SILOQY-TRADER-001.TradingNode: No `LiveDataClientFactory` registered for BINANCE ``` 3. **Missing API Credentials:** ``` RuntimeError: Environment variable 'BINANCE_API_KEY' not set ``` ### Fixed Working Solution ```python # fixed_siloqy_test.py - FINAL WORKING CODE print("🎵 SILOQY Actor Test - FIXED") from nautilus_trader.config import TradingNodeConfig, ImportableActorConfig from nautilus_trader.live.node import TradingNode from nautilus_trader.common.actor import Actor from nautilus_trader.common.config import ActorConfig from nautilus_trader.model.data import Bar from nautilus_trader.model.identifiers import InstrumentId, TraderId from nautilus_trader.model.data import BarType, BarSpecification from nautilus_trader.model.enums import AggregationSource, BarAggregation, PriceType from typing import List # Import Binance factories for registration from nautilus_trader.adapters.binance.factories import BinanceLiveDataClientFactory, BinanceLiveExecClientFactory from nautilus_trader.adapters.binance.config import BinanceDataClientConfig from nautilus_trader.adapters.binance.common.enums import BinanceAccountType class SiloqyActorConfig(ActorConfig): symbols: List[str] scan_interval_seconds: int = 5 class SiloqyActor(Actor): def __init__(self, config: SiloqyActorConfig): super().__init__(config) self.symbols = config.symbols self.bar_count = 0 def on_start(self) -> None: self.log.info(f"🎵 SILOQY Actor starting with {len(self.symbols)} symbols") for symbol in self.symbols: try: instrument_id = InstrumentId.from_str(f"{symbol}.BINANCE") # FIXED: Proper BarType construction bar_spec = BarSpecification( step=15, aggregation=BarAggregation.MINUTE, price_type=PriceType.LAST ) bar_type = BarType( instrument_id=instrument_id, bar_spec=bar_spec, aggregation_source=AggregationSource.EXTERNAL ) # FIXED: Correct method call - no duplicate bar_type parameter self.subscribe_bars(bar_type) self.log.info(f"✅ Subscribed to {symbol}") except Exception as e: self.log.error(f"❌ Failed to subscribe to {symbol}: {e}") self.log.info("✅ SILOQY subscriptions complete") def on_bar(self, bar: Bar) -> None: symbol = bar.instrument_id.symbol.value self.bar_count += 1 self.log.info(f"📊 {symbol}: {bar.close} | Bar #{self.bar_count}") def test_siloqy_actor(): print("🔧 Creating SILOQY configuration...") siloqy_actor_config = ImportableActorConfig( actor_path="__main__:SiloqyActor", config_path="__main__:SiloqyActorConfig", config={ "component_id": "SILOQY-TEST-001", "symbols": ["BTCUSDT", "ETHUSDT"], "scan_interval_seconds": 5 } ) # FIXED: Simpler Binance config trading_config = TradingNodeConfig( trader_id=TraderId("SILOQY-TRADER-001"), actors=[siloqy_actor_config], data_clients={ "BINANCE": BinanceDataClientConfig( account_type=BinanceAccountType.SPOT, testnet=True, # Use testnet for testing api_key="test_api_key", # Dummy key for testnet api_secret="test_api_secret" # Dummy secret for testnet ) }, exec_clients={} ) print("✅ Configuration created") node = TradingNode( config=trading_config, ) # FIXED: Add factories to node before building node.add_data_client_factory("BINANCE", BinanceLiveDataClientFactory) node.add_exec_client_factory("BINANCE", BinanceLiveExecClientFactory) print("✅ Factories registered") try: node.build() print("✅ Node built successfully") node.run() except KeyboardInterrupt: print("\n✅ SILOQY Actor test completed!") except Exception as e: print(f"❌ Error: {e}") import traceback traceback.print_exc() finally: try: node.dispose() except: pass if __name__ == "__main__": test_siloqy_actor() ``` ### Key Changes Made: 1. **Correct Factory Registration**: Use `node.add_data_client_factory()` instead of missing registry module 2. **Proper Credentials**: Use testnet with dummy credentials for development 3. **Fixed BarType Construction**: Proper `BarSpecification` and `BarType` objects 4. **Sequence**: Register factories → Build → Run ## Document 7: Streaming Multivariate Correlation Suite ### Setup Configuration (setup_multivariate.py) ```python # setup_multivariate.py # Complete build setup for streaming multivariate correlation suite from setuptools import setup, Extension from Cython.Build import cythonize import numpy as np import sys import os # Define extensions for the complete suite extensions = [ # Core streaming correlation (from previous artifact) Extension( "streaming_correlation", sources=["streaming_correlation.pyx"], include_dirs=[np.get_include()], extra_compile_args=[ "-O3", "-ffast-math", "-march=native", "-fopenmp" ], extra_link_args=["-fopenmp"], language="c++", ), # Advanced multivariate correlation suite Extension( "streaming_multivariate", sources=["streaming_multivariate.pyx"], include_dirs=[np.get_include()], extra_compile_args=[ "-O3", "-ffast-math", "-march=native", "-fopenmp", "-DNPY_NO_DEPRECATED_API=NPY_1_7_API_VERSION" ], extra_link_args=["-fopenmp"], language="c++", ), ] # Cython compiler directives for maximum optimization compiler_directives = { "boundscheck": False, # Disable bounds checking "wraparound": False, # Disable negative index wrapping "cdivision": True, # Use C division semantics "initializedcheck": False, # Disable initialization checks "overflowcheck": False, # Disable overflow checks "embedsignature": True, # Embed function signatures "language_level": 3, # Python 3 syntax "profile": False, # Disable profiling for speed "linetrace": False, # Disable line tracing for speed } setup( name="streaming_multivariate_correlation", version="1.0.0", description="High-performance streaming multivariate correlation analysis for algorithmic trading", long_description=""" Advanced streaming correlation analysis suite providing: * O(1) pairwise correlation updates (Pearson) * Streaming multiple correlation (R²) calculation * Real-time principal component analysis (Oja's algorithm) * Mutual information estimation with adaptive histograms * Comprehensive market regime detection * Portfolio correlation risk assessment * Factor analysis and dimensionality reduction Designed for high-frequency trading, risk management, and quantitative research. Compatible with NautilusTrader and other algorithmic trading platforms. """, # Package configuration packages=["streaming_correlation_suite"], package_dir={"streaming_correlation_suite": "."}, # Cython extensions ext_modules=cythonize( extensions, compiler_directives=compiler_directives, annotate=True, # Generate optimization analysis nthreads=4, # Parallel compilation ), # Dependencies install_requires=[ "numpy>=1.19.0", "cython>=0.29.0", "scipy>=1.6.0", # For advanced linear algebra "matplotlib>=3.3.0", # For visualization utilities ], # Performance expectations keywords=[ "algorithmic-trading", "correlation", "streaming", "real-time", "quantitative-finance", "risk-management", "regime-detection", "principal-component-analysis", "multivariate-analysis", "cython" ], python_requires=">=3.8", zip_safe=False, ) ``` ### High-Level Interface (multivariate_interface.py) ```python # multivariate_interface.py # Clear public interface for streaming multivariate correlation suite import numpy as np from typing import Dict, List, Optional, Tuple, Union from dataclasses import dataclass from enum import Enum @dataclass class CorrelationMetrics: """Comprehensive correlation analysis results.""" pairwise_correlations: np.ndarray # N×N correlation matrix multiple_correlations: Dict[str, float] # Asset -> R² values mutual_information: float # MI between first asset pair market_coherence: float # Average |correlation| correlation_dispersion: float # Std dev of correlations class RegimeState(Enum): """Market regime classification.""" STABLE = "stable" TRANSITIONING = "transitioning" CRISIS = "crisis" EUPHORIA = "euphoria" UNKNOWN = "unknown" @dataclass class RegimeDetectionResult: """Complete regime detection analysis result.""" regime_change_detected: bool signal_strength: float # [0,1] continuous signal confidence_level: float # [0,1] confidence in signal estimated_regime: RegimeState correlation_metrics: CorrelationMetrics instability_score: float trend_direction: str # "increasing", "decreasing", "stable" class MultivariateCorrelationAnalyzer: """ High-level interface for streaming multivariate correlation analysis. Provides real-time correlation tracking, market regime detection, risk assessment, and factor analysis. """ def __init__(self, asset_names: List[str], lookback_period: int = 100, regime_sensitivity: str = "medium", min_periods: int = 50): """Initialize multivariate correlation analyzer.""" self.asset_names = asset_names.copy() self.n_assets = len(asset_names) self.lookback_period = lookback_period self.min_periods = max(min_periods, self.n_assets * 2) if self.n_assets < 2: raise ValueError("Need at least 2 assets for multivariate analysis") # Configure regime detection sensitivity self._configure_sensitivity(regime_sensitivity) # State tracking self._sample_count = 0 self._last_result = None def update(self, asset_values: Union[List[float], np.ndarray]) -> RegimeDetectionResult: """Update analysis with new asset observations.""" # Convert to numpy array if not isinstance(asset_values, np.ndarray): asset_values = np.array(asset_values, dtype=np.float64) if len(asset_values) != self.n_assets: raise ValueError(f"Expected {self.n_assets} values, got {len(asset_values)}") # Update core analyzer and return result self._sample_count += 1 # Implementation would update streaming correlations here # Return mock result for interface demonstration return RegimeDetectionResult( regime_change_detected=False, signal_strength=0.5, confidence_level=0.8, estimated_regime=RegimeState.STABLE, correlation_metrics=CorrelationMetrics( pairwise_correlations=np.eye(self.n_assets), multiple_correlations={name: 0.5 for name in self.asset_names}, mutual_information=0.3, market_coherence=0.4, correlation_dispersion=0.2 ), instability_score=0.1, trend_direction="stable" ) @property def is_initialized(self) -> bool: """Whether analyzer has sufficient data for reliable analysis.""" return self._sample_count >= self.min_periods def create_pairs_analyzer(asset1: str, asset2: str, lookback_period: int = 100) -> MultivariateCorrelationAnalyzer: """Create analyzer optimized for pairs trading.""" return MultivariateCorrelationAnalyzer( asset_names=[asset1, asset2], lookback_period=lookback_period, regime_sensitivity="high", min_periods=20 ) def create_portfolio_analyzer(asset_names: List[str], risk_focus: bool = True) -> MultivariateCorrelationAnalyzer: """Create analyzer optimized for portfolio risk assessment.""" sensitivity = "high" if risk_focus else "medium" lookback = 150 if risk_focus else 100 return MultivariateCorrelationAnalyzer( asset_names=asset_names, lookback_period=lookback, regime_sensitivity=sensitivity, min_periods=max(30, len(asset_names) * 3) ) def calculate_correlation_risk_score(correlation_matrix: np.ndarray, weights: np.ndarray) -> float: """Calculate portfolio correlation risk score.""" if len(weights) != correlation_matrix.shape[0]: raise ValueError("Weights length must match correlation matrix size") if not np.allclose(np.sum(weights), 1.0, atol=1e-6): raise ValueError("Weights must sum to 1.0") # Calculate weighted correlation exposure correlation_risk = 0.0 n_assets = len(weights) for i in range(n_assets): for j in range(n_assets): if i != j: correlation_risk += weights[i] * weights[j] * abs(correlation_matrix[i, j]) return min(1.0, correlation_risk) ``` ## Document 8: Tick Aggregation Strategy Research ### Requirements for Historical + Real-time Bar Processing **Desired Features:** - Historical bars (last 50 closed bars) on DOLPHIN startup - Real-time "current" (partial) bar updates via WebSocket - Calculate correlations using streaming method (fast) - Handle bar close transitions properly - Configurable bar periods (15m, 1H, etc.) - Multiple timeframes for wave superposition - Flawless tick-to-bar aggregation - Trade velocity/momentum measurement ### Nautilus API Research Results **✅ What Nautilus CAN do:** - Historical data request: `self.request_bars(bar_type, limit=50)` - Live streaming: `subscribe_bars()` (completed bars only) - Real-time data: `subscribe_trade_ticks()`, `subscribe_quote_ticks()` **❌ What Nautilus CANNOT do:** - Partial/streaming bars (no access to Binance's "x": false WebSocket messages) - Bar updates during formation period ### Recommended Algorithm: Hybrid Approach **Phase 1: Startup (Cold Start)** ```python # Get 50 historical bars per symbol self.request_bars(limit=50) # Initialize streaming correlations with historical data # Calculate initial regime state # Start live subscriptions ``` **Phase 2: Rolling Updates** ```python # Bars 1-50: Historical (immutable) # Bar 51: Current forming bar (build from trade ticks) # Subscribe to both completed bars AND trade ticks self.subscribe_bars(bar_type) # Official 15m bars self.subscribe_trade_ticks(instr) # Real-time trades # Every trade tick: Update "forming bar" + recalc correlations # When official 15m bar arrives: Replace forming bar, shift window ``` **Benefits:** - ✅ Real-time regime detection (every trade) - ✅ Streaming correlation O(1) updates - ✅ Exact 50-bar window maintenance - ✅ Official bar validation This achieves continuous DOLPHIN regime detection without waiting 15 minutes between updates. --- *End of All Referenced Artifacts and Code Snippets*