diff --git a/nautilus_actor_test_implementation_5x.py b/nautilus_actor_test_implementation_5x.py index 8774a37..9388297 100644 --- a/nautilus_actor_test_implementation_5x.py +++ b/nautilus_actor_test_implementation_5x.py @@ -1,4 +1,5 @@ import time +import sys import numpy as np import asyncio import json @@ -927,6 +928,8 @@ class SILOQYMainActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") + self.log.error(f"{tick_tuple}") + sys.exit(2) class DOLPHINRegimeActor(Actor): def __init__(self, config: DOLPHINRegimeActorConfig): diff --git a/nautilus_actor_test_implementation_6x.py b/nautilus_actor_test_implementation_6x.py index 529efac..fd0ab8d 100644 --- a/nautilus_actor_test_implementation_6x.py +++ b/nautilus_actor_test_implementation_6x.py @@ -39,6 +39,11 @@ CANDLES_INITIAL_TOPIC = "SILOQY.CANDLES.INITIAL" # ADDED LINE 18: TICK_SIZES_TOPIC = "SILOQY.TICK.SIZES" +# NEW: Enhanced BB indicator topics for tuple-based publishing +REGIME_INDICATORS_TOPIC = "DOLPHIN.REGIME.INDICATORS" +BB_METRICS_TOPIC = "DOLPHIN.BB.METRICS" +TEMPORAL_PATTERNS_TOPIC = "DOLPHIN.TEMPORAL.PATTERNS" + # Rate limiting constant MIN_INTERVAL = 2.5 # seconds between API batches @@ -53,6 +58,16 @@ class MarketRegime(Enum): # EXTRACTED FROM STANDALONE: REAL WEBSOCKET INFRASTRUCTURE # ============================================================================ +#TODO TODO: Manually added. Left for *future* re-integration of class instead of tuples (where appropriate) +class SILOQYQuoteTick(Data): + def __init__(self, instrument_id, price, size, ts_event, ts_init=None): + super().__init__() + self.instrument_id = str(instrument_id) + self.price = float(price) + self.size = float(size) + self.ts_event = int(ts_event) + self.ts_init = int(ts_init if ts_init is not None else time.time_ns()) + @dataclass class TickData: """Universal tick data structure from standalone""" @@ -834,7 +849,7 @@ class SILOQYMainActor(Actor): def on_websocket_tick(self, symbol: str, price: float, quantity: float, timestamp: int): """ PRESERVED: Original tick processing and candle update logic - """ + """ # PRESERVED: Original candle update logic if symbol not in self.active_candles: candle_open_time = (timestamp // self.candle_interval_ms) * self.candle_interval_ms @@ -865,24 +880,23 @@ class SILOQYMainActor(Actor): candle['high'] = round(max(candle['high'], price), 8) candle['low'] = round(min(candle['low'], price), 8) candle['volume'] += quantity - + # PRESERVED: Original high-performance tuple publishing try: - if hasattr(self, 'msgbus') and self.msgbus: - # STEP 3 FIX: Round all float values to 8 decimals in the tuple - tick_tuple = ( - symbol, - round(float(price), 8), - round(float(quantity), 8), - int(timestamp), - round(float(candle['open_price']), 8), - int(candle['open_time']) - ) - - self.msgbus.publish(RAW_TOPIC, tick_tuple) + tick_tuple = ( + str(symbol), + float(price), + float(quantity), + int(timestamp), + float(candle['open_price']), + int(candle['open_time']) + ) + + self.msgbus.publish(RAW_TOPIC, tick_tuple) except Exception as e: - self.log.error(f"Nautilus ActorExecutor: Failed to publish tick: {e}") + self.log.error(f"Nautilus ActorExecutor: Faish tick: {e}") + print(f"EXCEPTION DETAILS: {type(e)} - {e}") class DOLPHINRegimeActor(Actor): def __init__(self, config: DOLPHINRegimeActorConfig): @@ -920,6 +934,13 @@ class DOLPHINRegimeActor(Actor): self.processed_ticks = 0 self.regime_calculations = 0 + # NEW: Enhanced indicator tracking for BB and temporal patterns + self.signal_history = deque(maxlen=100) # For BB calculations + self.bb_period = 20 # BB calculation period + self.bb_std_dev = 2.0 # BB standard deviation multiplier + self.velocity_history = deque(maxlen=10) # For regime velocity tracking + self.confidence_history = deque(maxlen=20) # For confidence trend analysis + self.log.info(f"DOLPHINRegimeActor initialized with Nautilus ActorExecutor - max_symbols: {self.max_symbols}, " f"ticks_per_analysis: {self.ticks_per_analysis}") @@ -1041,6 +1062,132 @@ class DOLPHINRegimeActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Regime detection error: {e}") + def _calculate_enhanced_indicators(self, bull_ratio, bear_ratio, confidence, analyzed, total_symbols): + """NEW: Calculate enhanced indicators including BB metrics and temporal patterns""" + + # Initialize history attributes if they don't exist + if not hasattr(self, 'signal_history'): + self.signal_history = deque(maxlen=50) + if not hasattr(self, 'velocity_history'): + self.velocity_history = deque(maxlen=50) + if not hasattr(self, 'confidence_history'): + self.confidence_history = deque(maxlen=50) + if not hasattr(self, 'bb_period'): + self.bb_period = 20 + if not hasattr(self, 'bb_std_dev'): + self.bb_std_dev = 2.0 + + # Calculate regime momentum signal + base_momentum = (bull_ratio - bear_ratio) * 100 # -100 to +100 + sample_quality = min(analyzed / total_symbols, 1.0) if total_symbols > 0 else 0.0 + signal = base_momentum * confidence * sample_quality + + # Add to signal history + self.signal_history.append(signal) + + # Calculate velocity (rate of change in signal) + velocity = 0.0 + if len(self.signal_history) >= 2: + velocity = self.signal_history[-1] - self.signal_history[-2] + self.velocity_history.append(velocity) + + # Store confidence for trending + self.confidence_history.append(confidence) + + # Calculate Bollinger Bands if enough history + bb_metrics = {} + if len(self.signal_history) >= self.bb_period: + # REPLACE SLICE: recent_signals = list(self.signal_history)[-self.bb_period:] + recent_signals = [] + start_index = len(self.signal_history) - self.bb_period + for i in range(start_index, len(self.signal_history)): + recent_signals.append(self.signal_history[i]) + + sma = sum(recent_signals) / len(recent_signals) + + # Calculate standard deviation + variance = sum((x - sma) ** 2 for x in recent_signals) / len(recent_signals) + std_dev = variance ** 0.5 + + upper_band = sma + (self.bb_std_dev * std_dev) + lower_band = sma - (self.bb_std_dev * std_dev) + + # Position within BBs (mean reversion interpretation) + if signal > upper_band: + bb_position = 'ABOVE_UPPER' + elif signal < lower_band: + bb_position = 'BELOW_LOWER' + elif signal >= sma: + bb_position = 'UPPER_HALF' + else: + bb_position = 'LOWER_HALF' + + # Momentum persistence interpretation + if signal > upper_band: + momentum_signal = 'STRONG_BULL_BREAKOUT' + elif signal < lower_band: + momentum_signal = 'STRONG_BEAR_BREAKOUT' + elif signal > sma: + momentum_signal = 'MILD_BULLISH' + else: + momentum_signal = 'MILD_BEARISH' + + bb_metrics = { + 'signal': signal, + 'sma': sma, + 'upper_band': upper_band, + 'lower_band': lower_band, + 'bb_position': bb_position, + 'momentum_signal': momentum_signal, + 'bb_ready': True + } + else: + bb_metrics = { + 'signal': signal, + 'sma': None, + 'upper_band': None, + 'lower_band': None, + 'bb_position': 'INSUFFICIENT_DATA', + 'momentum_signal': 'INSUFFICIENT_DATA', + 'bb_ready': False + } + + # Calculate temporal patterns + temporal_metrics = {} + if len(self.velocity_history) >= 3: + avg_velocity = sum(self.velocity_history) / len(self.velocity_history) + velocity_trend = 'ACCELERATING' if velocity > avg_velocity else 'DECELERATING' + else: + avg_velocity = velocity + velocity_trend = 'BUILDING_HISTORY' + + if len(self.confidence_history) >= 5: + # REPLACE SLICE: confidence > sum(self.confidence_history[-5:-1])/4 + confidence_sum = 0.0 + count = 0 + start_index = len(self.confidence_history) - 5 + end_index = len(self.confidence_history) - 1 # exclude last element + for i in range(start_index, end_index): + confidence_sum += self.confidence_history[i] + count += 1 + + if count > 0: + confidence_avg = confidence_sum / count + confidence_trend = 'RISING' if confidence > confidence_avg else 'FALLING' + else: + confidence_trend = 'BUILDING_HISTORY' + else: + confidence_trend = 'BUILDING_HISTORY' + + temporal_metrics = { + 'velocity': velocity, + 'avg_velocity': avg_velocity, + 'velocity_trend': velocity_trend, + 'confidence_trend': confidence_trend + } + + return bb_metrics, temporal_metrics + def _run_regime_detection(self): self.regime_calculations += 1 @@ -1065,7 +1212,6 @@ class DOLPHINRegimeActor(Actor): analyzed += 1 - # NEW: HFT-grade tick-size based comparison tick_size = self.tick_sizes[idx] equality_threshold = tick_size / 2 # Half tick size standard @@ -1126,7 +1272,55 @@ class DOLPHINRegimeActor(Actor): except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish regime result: {e}") - + + # NEW: Calculate enhanced indicators + bb_metrics, temporal_metrics = self._calculate_enhanced_indicators( + bull_ratio, bear_ratio, confidence, analyzed, total_symbols + ) + + # NEW: Publish enhanced indicators to data bus as tuples + try: + if hasattr(self, 'msgbus') and self.msgbus: + timestamp = int(time.time() * 1000) + + # Publish regime indicators as tuple + indicator_tuple = ( + timestamp, + float(bb_metrics['signal']), + bool(bb_metrics['bb_ready']), + float(temporal_metrics['velocity']), + str(temporal_metrics['velocity_trend']), + str(temporal_metrics['confidence_trend']) + ) + self.msgbus.publish(REGIME_INDICATORS_TOPIC, indicator_tuple) + + # Publish BB metrics as tuple (only if BB is ready) + if bb_metrics['bb_ready']: + bb_tuple = ( + timestamp, + float(bb_metrics['signal']), + float(bb_metrics['sma']), + float(bb_metrics['upper_band']), + float(bb_metrics['lower_band']), + str(bb_metrics['bb_position']), + str(bb_metrics['momentum_signal']) + ) + self.msgbus.publish(BB_METRICS_TOPIC, bb_tuple) + + # Publish temporal patterns as tuple + temporal_tuple = ( + timestamp, + float(temporal_metrics['velocity']), + float(temporal_metrics['avg_velocity']), + str(temporal_metrics['velocity_trend']), + str(temporal_metrics['confidence_trend']), + int(len(self.signal_history)) + ) + self.msgbus.publish(TEMPORAL_PATTERNS_TOPIC, temporal_tuple) + + except Exception as e: + self.log.error(f"Failed to publish enhanced indicators: {e}") + # Log regime changes if not self.regime_history or regime != self.regime_history[-1]: self.log.info(f"REGIME CHANGE: {regime.value} | Bull: {bull_ratio:.2%} " @@ -1149,7 +1343,20 @@ class DOLPHINRegimeActor(Actor): self.log.info(f"{color_code}REGIME STATUS: {regime.value} | Bull: {bull_ratio:.2%} " f"Bear: {bear_ratio:.2%} ({bullish}/{bearish}) | Processed: {self.processed_ticks} ticks{reset_code}") - + + # Enhanced indicator line after regime status + if bb_metrics['bb_ready']: + self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | " + f"SMA:{bb_metrics['sma']:.2f} | Upper:{bb_metrics['upper_band']:.2f} | " + f"Lower:{bb_metrics['lower_band']:.2f} | Pos:{bb_metrics['bb_position']} | " + f"Mom:{bb_metrics['momentum_signal']} | Vel:{temporal_metrics['velocity']:.2f} | " + f"VelTrend:{temporal_metrics['velocity_trend']} | ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") + else: + self.log.info(f"{color_code}BB INDICATORS: Signal:{bb_metrics['signal']:.2f} | " + f"Status:BUILDING_BB_HISTORY ({len(self.signal_history)}/{self.bb_period}) | " + f"Vel:{temporal_metrics['velocity']:.2f} | VelTrend:{temporal_metrics['velocity_trend']} | " + f"ConfTrend:{temporal_metrics['confidence_trend']}{reset_code}") + # NEW: Log symbol pattern and counts if symbol_pattern: # Only if we have symbols to show pattern_str = " ".join(symbol_pattern) + " " # Create pattern with spaces @@ -1235,6 +1442,7 @@ class SILOQYNormalizerActor(Actor): try: # Publish directly to STRUCTURED_TOPIC to match original v3.28 behavior self.msgbus.publish(STRUCTURED_TOPIC, tick) + pass except Exception as e: self.log.error(f"Nautilus ActorExecutor: Failed to publish to STRUCTURED_TOPIC: {e}")