Files
siloqy/prod/clean_arch/adapters/hazelcast_feed.py

209 lines
7.7 KiB
Python
Raw Normal View History

PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green) TUI Hz fix: - hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era field aliases (trades_executed, current_leverage, open_positions as list, last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital panel work with no TUI changes. - dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added; render() uses it on every Hz read. DC gate (SYSTEM BIBLE §4.2, champion config): - pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold. Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT). Price history deque initialized in connect(); dc_skip_contradicts=True enforced. ACB boost (SYSTEM BIBLE §10): - hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key written by acb_processor_service.py). - pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan bridge may embed it), then Hz direct fallback. Applied to intent.leverage via dataclasses.replace() after IntentEngine.plan(), capped at 3x. - _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload. OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new Hz map connections + symbol routing. Gap documented; requires separate decision. Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-03 14:00:48 +02:00
#!/usr/bin/env python3
"""
ADAPTER: HazelcastDataFeed
==========================
Implementation of DataFeedPort using Hazelcast.
Consumes the canonical NG7/NG8 Hazelcast payload that BLUE already uses.
The legacy scan_bridge bundle format is intentionally not part of the live
contract.
"""
import json
import logging
from datetime import datetime
from typing import Optional, Callable, Dict, Any
# Port interface
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from ports.data_feed import DataFeedPort, MarketSnapshot, ACBUpdate
try:
from .eigen_scan_normalizer import normalize_ng7_scan
except ImportError: # pragma: no cover - direct script fallback
from prod.clean_arch.adapters.eigen_scan_normalizer import normalize_ng7_scan
logger = logging.getLogger("HazelcastDataFeed")
class HazelcastDataFeed(DataFeedPort):
    """
    ADAPTER: Hazelcast implementation of DataFeedPort.

    Reads from DolphinNG6 output via Hazelcast maps:
    - DOLPHIN_FEATURES: Price + eigenvalues (ALWAYS SYNCED)
    - DOLPHIN_SAFETY: Posture/mode
    - DOLPHIN_STATE_*: Portfolio state

    No sync issues - all data written atomically by DolphinNG6.

    Keys read from DOLPHIN_FEATURES by this class:
    - "latest_eigen_scan": JSON scan payload (see get_latest_snapshot)
    - "acb_boost": JSON ACB payload (see get_acb_update)

    A handle to DOLPHIN_SAFETY is acquired on connect, but none of the
    methods in this class read from it.
    """

    def __init__(self, config: Dict[str, Any]):
        """
        Store the configuration; no network activity happens here.

        Args:
            config: Adapter configuration. The optional 'hazelcast' sub-dict
                may carry 'cluster' (default 'dolphin') and 'host'
                (default 'localhost:5701').
        """
        self.config = config
        self.hz_client = None
        self.features_map = None
        self.safety_map = None
        # Last successfully built snapshot; served when a read fails or is empty.
        self._last_snapshot: Optional[MarketSnapshot] = None
        self._latency_ms = 0.0

    async def connect(self) -> bool:
        """
        Connect to the Hazelcast cluster and acquire the map proxies.

        Returns:
            True when the client is up and DOLPHIN_FEATURES answered a size()
            probe, False on any failure (the error is logged, never raised).
        """
        client = None
        try:
            # Imported lazily so the module can be loaded without the
            # hazelcast package installed.
            import hazelcast

            hz_config = self.config.get('hazelcast', {})
            cluster = hz_config.get('cluster', 'dolphin')
            host = hz_config.get('host', 'localhost:5701')
            logger.info("Connecting to Hazelcast: %s (cluster: %s)", host, cluster)

            client = hazelcast.HazelcastClient(
                cluster_name=cluster,
                cluster_members=[host],
            )
            # Get reference to maps
            features_map = client.get_map('DOLPHIN_FEATURES').blocking()
            safety_map = client.get_map('DOLPHIN_SAFETY').blocking()
            # Test connection
            size = features_map.size()
        except Exception as e:
            logger.error("[✗] Connection failed: %s", e)
            # Do not leak a half-initialised client: if it was created but a
            # later step failed, shut it down before reporting failure.
            if client is not None:
                try:
                    client.shutdown()
                except Exception as shutdown_err:
                    logger.debug("Client shutdown after failed connect raised: %s", shutdown_err)
            return False

        # Publish the handles only once every step has succeeded.
        self.hz_client = client
        self.features_map = features_map
        self.safety_map = safety_map
        logger.info("[✓] Connected. Features map: %s entries", size)
        return True

    async def disconnect(self):
        """Shut the client down (if connected) and drop the stale handles."""
        client = self.hz_client
        if not client:
            return
        try:
            client.shutdown()
        finally:
            # Clear references even if shutdown() raises, so health_check()
            # and the readers do not touch a dead client.
            self.hz_client = None
            self.features_map = None
            self.safety_map = None
        logger.info("[✓] Disconnected from Hazelcast")

    @staticmethod
    def _extract_eigenvalues(scan: Dict[str, Any]) -> list:
        """Return [lambda_max] from 'eigenvalue_tracking' if present, else the non-null 'eigenvalues' as floats."""
        tracking = scan.get('eigenvalue_tracking') or {}
        if isinstance(tracking, dict):
            lambda_max = tracking.get('lambda_max')
            if lambda_max is not None:
                return [float(lambda_max)]
        return [float(v) for v in (scan.get('eigenvalues') or []) if v is not None]

    @staticmethod
    def _extract_velocity_divergence(scan: Dict[str, Any]):
        """Return the explicit 'vel_div', else window-50 minus window-750 lambda_max_velocity, else None."""
        explicit = scan.get('vel_div')
        if explicit is not None:
            return explicit

        multi = scan.get('multi_window_results') or {}
        if not isinstance(multi, dict):
            return None

        def tracking_data(window: str):
            entry = multi.get(window)
            return entry.get('tracking_data', {}) if isinstance(entry, dict) else {}

        try:
            v50 = float(tracking_data('50').get('lambda_max_velocity'))
            v750 = float(tracking_data('750').get('lambda_max_velocity'))
        except (AttributeError, TypeError, ValueError, OverflowError):
            # Missing, null or non-numeric velocity: divergence is unknown.
            return None
        return v50 - v750

    async def get_latest_snapshot(self, symbol: str = "BTCUSDT") -> Optional[MarketSnapshot]:
        """
        Get latest synchronized snapshot from Hazelcast.

        Consumes the canonical single-result NG7/NG8 Hazelcast payload.
        BLUE is the contract: PINK follows it.

        Args:
            symbol: Asset the caller expects the scan to describe.

        Returns:
            A fresh MarketSnapshot on success. The previously cached snapshot
            (possibly None) when the key is empty, the adapter is not
            connected, or the payload cannot be read. None when the scan
            describes a different asset than ``symbol``.
        """
        import time  # local import; only needed for the latency measurement

        features = self.features_map
        if features is None:
            logger.error("Error reading snapshot: not connected to Hazelcast")
            return self._last_snapshot

        try:
            started = time.perf_counter()
            raw = features.get("latest_eigen_scan")
            if not raw:
                return self._last_snapshot  # Return cached if available

            data = json.loads(raw)
            if isinstance(data, dict) and data.get("version") == "NG7":
                data = normalize_ng7_scan(data)
            if not isinstance(data, dict):
                # A JSON list/scalar cannot carry the scan fields read below.
                logger.error(
                    "Error reading snapshot: unexpected payload type %s",
                    type(data).__name__,
                )
                return self._last_snapshot

            # The scan fields live under "result" when it is a dict,
            # otherwise at the top level of the payload.
            result = data.get("result")
            scan = result if isinstance(result, dict) else data

            scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper()
            if scan_asset and symbol and scan_asset != str(symbol).upper():
                logger.warning("Symbol %s not in latest_eigen_scan asset %s", symbol, scan_asset)
                return None

            irp_alignment = scan.get('irp_alignment')
            if irp_alignment is None:
                irp_alignment = scan.get('confidence')

            # Build snapshot
            snapshot = MarketSnapshot(
                # Read time, not scan time; naive UTC is kept for compatibility.
                timestamp=datetime.utcnow(),  # Or parse from data['timestamp']
                symbol=symbol,
                price=float(scan.get('price') or 0.0),
                eigenvalues=self._extract_eigenvalues(scan),
                # Prefer an explicit vel_div if present; otherwise compute it
                # from 50 vs 750 velocity tracking on the single-result schema.
                velocity_divergence=self._extract_velocity_divergence(scan),
                irp_alignment=irp_alignment,
                scan_number=data.get('scan_number'),
                source="hazelcast",
                scan_payload=data,
            )
            self._last_snapshot = snapshot
            # Monotonic clock: immune to wall-clock adjustments.
            self._latency_ms = (time.perf_counter() - started) * 1000
            return snapshot
        except Exception as e:
            logger.error("Error reading snapshot: %s", e)
            return self._last_snapshot

    async def subscribe_snapshots(self, callback: Callable[[MarketSnapshot], None]):
        """
        Subscribe to snapshot updates via polling (listener not critical).

        Polling every 5s matches DolphinNG6 pulse.

        NOTE: this method only logs readiness. ``callback`` is neither stored
        nor invoked here, so callers must poll get_latest_snapshot() themselves.
        """
        logger.info("[✓] Snapshot subscription ready (polling mode)")

    async def get_acb_update(self) -> Optional[ACBUpdate]:
        """
        Get ACB update from Hazelcast.

        Returns:
            An ACBUpdate built from DOLPHIN_FEATURES["acb_boost"], with
            defaults boost=1.0, beta=0.5, cut=0.0, posture='APEX' for missing
            fields. None when the key is empty, the adapter is not connected,
            or the payload cannot be read (the error is logged).
        """
        features = self.features_map
        if features is None:
            logger.error("ACB read error: not connected to Hazelcast")
            return None

        try:
            # DOLPHIN_FEATURES["acb_boost"] — written by acb_processor_service.py
            raw = features.get("acb_boost")
            if not raw:
                return None
            data = json.loads(raw)
            return ACBUpdate(
                timestamp=datetime.utcnow(),
                boost=data.get('boost', 1.0),
                beta=data.get('beta', 0.5),
                cut=data.get('cut', 0.0),
                posture=data.get('posture', 'APEX'),
            )
        except Exception as e:
            logger.error("ACB read error: %s", e)
            return None

    def get_latency_ms(self) -> float:
        """Return the duration, in milliseconds, of the last successful snapshot read."""
        return self._latency_ms

    def health_check(self) -> bool:
        """
        Check Hazelcast connection health.

        Returns:
            True when a size() probe on DOLPHIN_FEATURES succeeds, False when
            not connected or the probe raises.
        """
        if not self.hz_client or self.features_map is None:
            return False
        try:
            # Quick ping
            self.features_map.size()
        except Exception:
            # Exception, not bare except: KeyboardInterrupt/SystemExit propagate.
            return False
        return True