#!/usr/bin/env python3
"""
ADAPTER: HazelcastDataFeed
==========================
Implementation of DataFeedPort using Hazelcast.

Consumes the canonical NG7/NG8 Hazelcast payload that BLUE already uses.
The legacy scan_bridge bundle format is intentionally not part of the live
contract.
"""
|
||
|
|
|
||
|
|
import json
import logging
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Callable, Dict, Any

# Port interface
sys.path.insert(0, str(Path(__file__).parent.parent))
from ports.data_feed import DataFeedPort, MarketSnapshot, ACBUpdate

try:
    from .eigen_scan_normalizer import normalize_ng7_scan
except ImportError:  # pragma: no cover - direct script fallback
    from prod.clean_arch.adapters.eigen_scan_normalizer import normalize_ng7_scan
|
||
|
|
|
||
|
|
logger = logging.getLogger("HazelcastDataFeed")
|
||
|
|
|
||
|
|
|
||
|
|
class HazelcastDataFeed(DataFeedPort):
    """
    ADAPTER: Hazelcast implementation of DataFeedPort.

    Reads from DolphinNG6 output via Hazelcast maps:
    - DOLPHIN_FEATURES: Price + eigenvalues (ALWAYS SYNCED)
    - DOLPHIN_SAFETY: Posture/mode
    - DOLPHIN_STATE_*: Portfolio state

    No sync issues - all data written atomically by DolphinNG6.

    Within this class only DOLPHIN_FEATURES is actually read (keys
    "latest_eigen_scan" and "acb_boost"). A DOLPHIN_SAFETY handle is opened
    by connect() but is not otherwise used here, and no DOLPHIN_STATE_* map
    is accessed.

    NOTE(review): the module docstring speaks of the NG7/NG8 payload while
    this text mentions DolphinNG6 - confirm which producer name is current.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration; nothing is contacted until connect().

        Args:
            config: Adapter settings. An optional 'hazelcast' sub-dict may
                carry 'cluster' and 'host' (defaults are applied in
                connect()).
        """
        self.config = config
        self.hz_client = None
        self.features_map = None
        self.safety_map = None
        # Last snapshot that was built successfully; used as a fallback when
        # a later read yields nothing or fails.
        self._last_snapshot: Optional[MarketSnapshot] = None
        # Duration of the last successful get_latest_snapshot() call.
        self._latency_ms = 0.0

    async def connect(self) -> bool:
        """
        Connect to the Hazelcast cluster and open the map handles.

        The client and map proxies are only published on ``self`` once every
        step (including a ``size()`` round trip) has succeeded. If anything
        fails after the client was created, that client is shut down so a
        failed attempt does not leak a live connection.

        Returns:
            True on success, False if any step raised (the error is logged).
        """
        client = None
        try:
            import hazelcast

            hz_config = self.config.get('hazelcast', {}) or {}
            cluster = hz_config.get('cluster', 'dolphin')
            host = hz_config.get('host', 'localhost:5701')

            logger.info(f"Connecting to Hazelcast: {host} (cluster: {cluster})")

            client = hazelcast.HazelcastClient(
                cluster_name=cluster,
                cluster_members=[host],
            )

            # Get reference to maps
            features_map = client.get_map('DOLPHIN_FEATURES').blocking()
            safety_map = client.get_map('DOLPHIN_SAFETY').blocking()

            # Test connection with a real round trip
            size = features_map.size()

        except Exception as e:
            logger.error(f"[✗] Connection failed: {e}")
            if client is not None:
                # Do not leave a half-initialised client running.
                try:
                    client.shutdown()
                except Exception as shutdown_error:
                    logger.warning(
                        f"Shutdown after failed connect raised: {shutdown_error}"
                    )
            return False

        self.hz_client = client
        self.features_map = features_map
        self.safety_map = safety_map
        logger.info(f"[✓] Connected. Features map: {size} entries")
        return True

    async def disconnect(self):
        """
        Shut the client down (if any) and drop all Hazelcast handles.

        The handles are cleared even if ``shutdown()`` raises, so later calls
        see the adapter as disconnected instead of using a dead client.
        """
        if self.hz_client:
            try:
                self.hz_client.shutdown()
            finally:
                self.hz_client = None
                self.features_map = None
                self.safety_map = None
            logger.info("[✓] Disconnected from Hazelcast")

    @staticmethod
    def _extract_eigenvalues(scan: Dict[str, Any]) -> list:
        """Return [lambda_max] from 'eigenvalue_tracking' if present, else the non-None 'eigenvalues' as floats."""
        tracking = scan.get('eigenvalue_tracking') or {}
        if isinstance(tracking, dict):
            lambda_max = tracking.get('lambda_max')
            if lambda_max is not None:
                return [float(lambda_max)]
        return [float(v) for v in (scan.get('eigenvalues') or []) if v is not None]

    @staticmethod
    def _extract_velocity_divergence(scan: Dict[str, Any]):
        """Return explicit 'vel_div' if set, else window-50 minus window-750 lambda_max_velocity, else None."""
        explicit = scan.get('vel_div')
        if explicit is not None:
            # Passed through untouched (no float conversion), as before.
            return explicit

        windows = scan.get('multi_window_results') or {}
        if not isinstance(windows, dict):
            return None

        velocities = []
        for window_key in ('50', '750'):
            window = windows.get(window_key)
            tracking = window.get('tracking_data', {}) if isinstance(window, dict) else {}
            if not isinstance(tracking, dict):
                return None
            try:
                velocities.append(float(tracking.get('lambda_max_velocity')))
            except (TypeError, ValueError, OverflowError):
                # Missing or non-numeric velocity: divergence is unknown.
                return None
        return velocities[0] - velocities[1]

    async def get_latest_snapshot(self, symbol: str = "BTCUSDT") -> Optional[MarketSnapshot]:
        """
        Get latest synchronized snapshot from Hazelcast.

        Consumes the canonical single-result NG7/NG8 Hazelcast payload.
        BLUE is the contract: PINK follows it.

        The payload is read from DOLPHIN_FEATURES["latest_eigen_scan"] and
        JSON-decoded; dict payloads with ``version == "NG7"`` are passed
        through ``normalize_ng7_scan`` first. Scan fields are taken from
        ``data["result"]`` when that is a dict, otherwise from the top level.

        Args:
            symbol: Asset the caller expects; compared case-insensitively
                with the asset named in the payload.

        Returns:
            - A new MarketSnapshot on success (also cached).
            - None when the payload is for a different asset.
            - The previously cached snapshot (possibly None) when the map
              entry is empty, the adapter is not connected, the payload is
              not a JSON object, or any error occurs while reading/parsing.
        """
        try:
            # Monotonic clock: immune to wall-clock adjustments.
            start = time.perf_counter()

            if self.features_map is None:
                logger.error("Error reading snapshot: not connected to Hazelcast")
                return self._last_snapshot

            raw = self.features_map.get("latest_eigen_scan")
            if not raw:
                return self._last_snapshot  # Return cached if available

            data = json.loads(raw)
            if isinstance(data, dict) and data.get("version") == "NG7":
                data = normalize_ng7_scan(data)

            if not isinstance(data, dict):
                # A non-object payload cannot carry any of the fields below.
                logger.error(
                    f"Error reading snapshot: unexpected payload type {type(data).__name__}"
                )
                return self._last_snapshot

            result = data.get("result")
            scan = result if isinstance(result, dict) else data

            scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper()
            if scan_asset and symbol and scan_asset != str(symbol).upper():
                logger.warning(f"Symbol {symbol} not in latest_eigen_scan asset {scan_asset}")
                return None

            price_value = float(scan.get('price') or 0.0)
            eigenvalues = self._extract_eigenvalues(scan)

            # Prefer an explicit vel_div if present; otherwise compute it from
            # 50 vs 750 velocity tracking on the single-result schema.
            vel_div = self._extract_velocity_divergence(scan)

            irp_alignment = scan.get('irp_alignment')
            if irp_alignment is None:
                irp_alignment = scan.get('confidence')

            # Build snapshot
            snapshot = MarketSnapshot(
                # NOTE(review): naive UTC read time, not the payload's own
                # timestamp. utcnow() is deprecated since Python 3.12, but a
                # tz-aware value would change what callers receive - confirm
                # before switching.
                timestamp=datetime.utcnow(),
                symbol=symbol,
                price=price_value,
                eigenvalues=eigenvalues,
                velocity_divergence=vel_div,
                irp_alignment=irp_alignment,
                scan_number=data.get('scan_number'),
                source="hazelcast",
                scan_payload=data,
            )

            self._last_snapshot = snapshot

            # Calculate latency
            self._latency_ms = (time.perf_counter() - start) * 1000

            return snapshot

        except Exception as e:
            # Boundary handler: a bad read must not take the feed down.
            logger.error(f"Error reading snapshot: {e}")
            return self._last_snapshot

    async def subscribe_snapshots(self, callback: Callable[[MarketSnapshot], None]):
        """
        Subscribe to snapshot updates via polling (listener not critical).

        Polling every 5s matches DolphinNG6 pulse.

        NOTE: this method only logs that polling mode is in effect. The
        ``callback`` is neither stored nor invoked here, so callers have to
        poll get_latest_snapshot() themselves.
        """
        logger.info("[✓] Snapshot subscription ready (polling mode)")

    async def get_acb_update(self) -> Optional[ACBUpdate]:
        """
        Get ACB update from Hazelcast.

        Reads and JSON-decodes DOLPHIN_FEATURES["acb_boost"]. Missing fields
        fall back to boost=1.0, beta=0.5, cut=0.0, posture='APEX'.

        Returns:
            An ACBUpdate stamped with the current (naive UTC) read time, or
            None when the entry is empty or any error occurs (logged).
        """
        try:
            # DOLPHIN_FEATURES["acb_boost"] — written by acb_processor_service.py
            raw = self.features_map.get("acb_boost")
            if not raw:
                return None

            data = json.loads(raw)
            return ACBUpdate(
                timestamp=datetime.utcnow(),
                boost=data.get('boost', 1.0),
                beta=data.get('beta', 0.5),
                cut=data.get('cut', 0.0),
                posture=data.get('posture', 'APEX'),
            )
        except Exception as e:
            logger.error(f"ACB read error: {e}")
            return None

    def get_latency_ms(self) -> float:
        """Return the duration (ms) of the last successful snapshot read."""
        return self._latency_ms

    def health_check(self) -> bool:
        """
        Check Hazelcast connection health.

        Returns:
            True if a client exists and a ``size()`` call on the features
            map succeeds; False otherwise.
        """
        if not self.hz_client or self.features_map is None:
            return False
        try:
            # Quick ping
            self.features_map.size()
            return True
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            return False
|