#!/usr/bin/env python3
"""
ADAPTER: HazelcastDataFeed
==========================
Implementation of DataFeedPort using Hazelcast.

Consumes the canonical NG7/NG8 Hazelcast payload that BLUE already uses.
The legacy scan_bridge bundle format is intentionally not part of the live
contract.
"""

import json
import logging
import time
from datetime import datetime
from typing import Optional, Callable, Dict, Any, List

# Port interface
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from ports.data_feed import DataFeedPort, MarketSnapshot, ACBUpdate

try:
    from .eigen_scan_normalizer import normalize_ng7_scan
except ImportError:  # pragma: no cover - direct script fallback
    from prod.clean_arch.adapters.eigen_scan_normalizer import normalize_ng7_scan

logger = logging.getLogger("HazelcastDataFeed")


class HazelcastDataFeed(DataFeedPort):
    """
    ADAPTER: Hazelcast implementation of DataFeedPort.

    Reads from DolphinNG6 output via Hazelcast maps:
    - DOLPHIN_FEATURES: Price + eigenvalues (ALWAYS SYNCED)
    - DOLPHIN_SAFETY: Posture/mode
    - DOLPHIN_STATE_*: Portfolio state

    No sync issues - all data written atomically by DolphinNG6.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store the configuration; no connection is opened until connect().

        Args:
            config: Adapter configuration. connect() reads the optional
                'hazelcast' sub-dict with keys 'cluster' and 'host'.
        """
        self.config = config
        self.hz_client = None
        self.features_map = None
        self.safety_map = None
        self._last_snapshot: Optional[MarketSnapshot] = None
        self._latency_ms = 0.0

    def _detach_client(self):
        """Clear the client and map handles and return the detached client.

        The handles are cleared before any shutdown call so the adapter never
        keeps references to a client that is being (or has been) shut down.
        """
        client = self.hz_client
        self.hz_client = None
        self.features_map = None
        self.safety_map = None
        return client

    async def connect(self) -> bool:
        """Connect to Hazelcast cluster.

        Returns:
            True when the client was created and the features map answered a
            size() call; False on any failure (the error is logged).
        """
        # NOTE(review): the client and the .blocking() map proxies are called
        # synchronously inside this coroutine - confirm that is acceptable for
        # the event loop this adapter runs on.
        try:
            import hazelcast

            hz_config = self.config.get('hazelcast', {})
            cluster = hz_config.get('cluster', 'dolphin')
            host = hz_config.get('host', 'localhost:5701')

            logger.info(f"Connecting to Hazelcast: {host} (cluster: {cluster})")

            self.hz_client = hazelcast.HazelcastClient(
                cluster_name=cluster,
                cluster_members=[host],
            )

            # Get reference to maps
            self.features_map = self.hz_client.get_map('DOLPHIN_FEATURES').blocking()
            self.safety_map = self.hz_client.get_map('DOLPHIN_SAFETY').blocking()

            # Test connection
            size = self.features_map.size()
            logger.info(f"[✓] Connected. Features map: {size} entries")
            return True

        except Exception as e:
            logger.error(f"[✗] Connection failed: {e}")
            # A failure after the client was created would otherwise leave a
            # running client behind together with half-initialised handles.
            client = self._detach_client()
            if client is not None:
                try:
                    client.shutdown()
                except Exception as shutdown_error:
                    logger.warning(
                        f"Hazelcast client shutdown after failed connect raised: {shutdown_error}"
                    )
            return False

    async def disconnect(self):
        """Clean disconnect.

        Shuts the client down if one exists and clears the client/map handles,
        so a repeated call is a no-op. Errors raised by shutdown() propagate.
        """
        if self.hz_client:
            client = self._detach_client()
            client.shutdown()
            logger.info("[✓] Disconnected from Hazelcast")

    @staticmethod
    def _extract_eigenvalues(scan: Dict[str, Any]) -> List[float]:
        """Return eigenvalues from a scan dict as a list of floats.

        Prefers eigenvalue_tracking['lambda_max'] (as a one-element list);
        otherwise falls back to the non-None entries of scan['eigenvalues'].
        """
        tracking = scan.get('eigenvalue_tracking') or {}
        if isinstance(tracking, dict):
            lambda_max = tracking.get('lambda_max')
            if lambda_max is not None:
                return [float(lambda_max)]
        return [float(v) for v in (scan.get('eigenvalues') or []) if v is not None]

    @staticmethod
    def _velocity_divergence(scan: Dict[str, Any]):
        """Return the velocity divergence for a scan dict, or None.

        Prefer an explicit vel_div if present (returned as-is); otherwise
        compute it from 50 vs 750 velocity tracking on the single-result
        schema. None is returned when either velocity is missing or not
        convertible to float.
        """
        explicit = scan.get('vel_div')
        if explicit is not None:
            return explicit

        multi = scan.get('multi_window_results') or {}
        if not isinstance(multi, dict):
            return None

        def tracking_for(window: str):
            entry = multi.get(window)
            return entry.get('tracking_data', {}) if isinstance(entry, dict) else {}

        try:
            v50 = float(tracking_for('50').get('lambda_max_velocity'))
            v750 = float(tracking_for('750').get('lambda_max_velocity'))
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Missing/None velocity, non-dict tracking_data, or a value that
            # float() rejects: no divergence can be computed.
            return None
        return v50 - v750

    async def get_latest_snapshot(self, symbol: str = "BTCUSDT") -> Optional[MarketSnapshot]:
        """
        Get latest synchronized snapshot from Hazelcast.

        Consumes the canonical single-result NG7/NG8 Hazelcast payload.
        BLUE is the contract: PINK follows it.

        Args:
            symbol: Asset the caller expects; compared case-insensitively with
                the asset named in the payload.

        Returns:
            A fresh MarketSnapshot on success; None when the payload is for a
            different asset; the previously cached snapshot (possibly None)
            when the map entry is empty, the payload is not a JSON object, the
            adapter is not connected, or any error occurs while reading.
        """
        try:
            if self.features_map is None:
                logger.error("Error reading snapshot: not connected to Hazelcast")
                return self._last_snapshot

            # Monotonic clock: immune to wall-clock adjustments.
            started = time.perf_counter()

            raw = self.features_map.get("latest_eigen_scan")
            if not raw:
                return self._last_snapshot  # Return cached if available

            data = json.loads(raw)
            if isinstance(data, dict) and data.get("version") == "NG7":
                data = normalize_ng7_scan(data)

            if not isinstance(data, dict):
                logger.error(
                    f"Error reading snapshot: unexpected payload type {type(data).__name__}"
                )
                return self._last_snapshot

            # The scan fields live under "result" when it is a dict, otherwise
            # at the top level of the payload.
            result = data.get("result")
            scan = result if isinstance(result, dict) else data

            scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper()
            if scan_asset and symbol and scan_asset != str(symbol).upper():
                logger.warning(f"Symbol {symbol} not in latest_eigen_scan asset {scan_asset}")
                return None

            price_value = float(scan.get('price') or 0.0)
            eigenvalues = self._extract_eigenvalues(scan)
            vel_div = self._velocity_divergence(scan)

            irp_alignment = scan.get('irp_alignment')
            if irp_alignment is None:
                irp_alignment = scan.get('confidence')

            # Build snapshot
            # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
            # kept naive here so the timestamp type seen by callers is
            # unchanged.
            snapshot = MarketSnapshot(
                timestamp=datetime.utcnow(),  # Or parse from data['timestamp']
                symbol=symbol,
                price=price_value,
                eigenvalues=eigenvalues,
                velocity_divergence=vel_div,
                irp_alignment=irp_alignment,
                scan_number=data.get('scan_number'),
                source="hazelcast",
                scan_payload=data,
            )

            self._last_snapshot = snapshot

            # Calculate latency (only updated on a successful read)
            self._latency_ms = (time.perf_counter() - started) * 1000

            return snapshot

        except Exception as e:
            logger.error(f"Error reading snapshot: {e}")
            return self._last_snapshot

    async def subscribe_snapshots(self, callback: Callable[[MarketSnapshot], None]):
        """
        Subscribe to snapshot updates via polling (listener not critical).

        Polling every 5s matches DolphinNG6 pulse.

        NOTE(review): this method only logs; `callback` is neither stored nor
        invoked here, so any polling has to be driven by the caller.
        """
        logger.info("[✓] Snapshot subscription ready (polling mode)")

    async def get_acb_update(self) -> Optional[ACBUpdate]:
        """Get ACB update from Hazelcast.

        Returns:
            An ACBUpdate built from DOLPHIN_FEATURES["acb_boost"], with
            defaults boost=1.0, beta=0.5, cut=0.0, posture='APEX' for missing
            keys; None when the entry is empty, is not a JSON object, the
            adapter is not connected, or reading fails (the error is logged).
        """
        try:
            if self.features_map is None:
                logger.error("ACB read error: not connected to Hazelcast")
                return None

            # DOLPHIN_FEATURES["acb_boost"] — written by acb_processor_service.py
            raw = self.features_map.get("acb_boost")
            if not raw:
                return None

            data = json.loads(raw)
            if not isinstance(data, dict):
                logger.error(
                    f"ACB read error: unexpected payload type {type(data).__name__}"
                )
                return None

            return ACBUpdate(
                timestamp=datetime.utcnow(),
                boost=data.get('boost', 1.0),
                beta=data.get('beta', 0.5),
                cut=data.get('cut', 0.0),
                posture=data.get('posture', 'APEX')
            )
        except Exception as e:
            logger.error(f"ACB read error: {e}")
        return None

    def get_latency_ms(self) -> float:
        """Return last measured latency."""
        return self._latency_ms

    def health_check(self) -> bool:
        """Check Hazelcast connection health.

        Returns:
            True when a client exists and the features map answers size();
            False otherwise.
        """
        if not self.hz_client or self.features_map is None:
            return False
        try:
            # Quick ping
            self.features_map.size()
            return True
        except Exception:
            # Any client/network error means unhealthy; KeyboardInterrupt and
            # SystemExit are deliberately not swallowed.
            return False