PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
TUI Hz fix: - hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era field aliases (trades_executed, current_leverage, open_positions as list, last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital panel work with no TUI changes. - dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation added; render() uses it on every Hz read. DC gate (SYSTEM BIBLE §4.2, champion config): - pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold. Rising price (chg > 0.75 bps) blocks ENTER via dataclasses.replace(HOLD, DC_CONTRADICT). Price history deque initialized in connect(); dc_skip_contradicts=True enforced. ACB boost (SYSTEM BIBLE §10): - hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost" (DOLPHIN_FEATURES key written by acb_processor_service.py). - pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first (scan bridge may embed it), then Hz direct fallback. Applied to intent.leverage via dataclasses.replace() after IntentEngine.plan(), capped at 3x. - _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload. OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new Hz map connections + symbol routing. Gap documented; requires separate decision. Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total, all green. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
208
prod/clean_arch/adapters/hazelcast_feed.py
Normal file
208
prod/clean_arch/adapters/hazelcast_feed.py
Normal file
@@ -0,0 +1,208 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
ADAPTER: HazelcastDataFeed
|
||||
==========================
|
||||
Implementation of DataFeedPort using Hazelcast.
|
||||
|
||||
Consumes the canonical NG7/NG8 Hazelcast payload that BLUE already uses.
|
||||
The legacy scan_bridge bundle format is intentionally not part of the live
|
||||
contract.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import Optional, Callable, Dict, Any
|
||||
|
||||
# Port interface
|
||||
import sys
|
||||
from pathlib import Path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
from ports.data_feed import DataFeedPort, MarketSnapshot, ACBUpdate
|
||||
|
||||
try:
|
||||
from .eigen_scan_normalizer import normalize_ng7_scan
|
||||
except ImportError: # pragma: no cover - direct script fallback
|
||||
from prod.clean_arch.adapters.eigen_scan_normalizer import normalize_ng7_scan
|
||||
|
||||
logger = logging.getLogger("HazelcastDataFeed")
|
||||
|
||||
|
||||
class HazelcastDataFeed(DataFeedPort):
|
||||
"""
|
||||
ADAPTER: Hazelcast implementation of DataFeedPort.
|
||||
|
||||
Reads from DolphinNG6 output via Hazelcast maps:
|
||||
- DOLPHIN_FEATURES: Price + eigenvalues (ALWAYS SYNCED)
|
||||
- DOLPHIN_SAFETY: Posture/mode
|
||||
- DOLPHIN_STATE_*: Portfolio state
|
||||
|
||||
No sync issues - all data written atomically by DolphinNG6.
|
||||
"""
|
||||
|
||||
def __init__(self, config: Dict[str, Any]):
|
||||
self.config = config
|
||||
self.hz_client = None
|
||||
self.features_map = None
|
||||
self.safety_map = None
|
||||
self._last_snapshot: Optional[MarketSnapshot] = None
|
||||
self._latency_ms = 0.0
|
||||
|
||||
async def connect(self) -> bool:
|
||||
"""Connect to Hazelcast cluster."""
|
||||
try:
|
||||
import hazelcast
|
||||
|
||||
hz_config = self.config.get('hazelcast', {})
|
||||
cluster = hz_config.get('cluster', 'dolphin')
|
||||
host = hz_config.get('host', 'localhost:5701')
|
||||
|
||||
logger.info(f"Connecting to Hazelcast: {host} (cluster: {cluster})")
|
||||
|
||||
self.hz_client = hazelcast.HazelcastClient(
|
||||
cluster_name=cluster,
|
||||
cluster_members=[host],
|
||||
)
|
||||
|
||||
# Get reference to maps
|
||||
self.features_map = self.hz_client.get_map('DOLPHIN_FEATURES').blocking()
|
||||
self.safety_map = self.hz_client.get_map('DOLPHIN_SAFETY').blocking()
|
||||
|
||||
# Test connection
|
||||
size = self.features_map.size()
|
||||
logger.info(f"[✓] Connected. Features map: {size} entries")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"[✗] Connection failed: {e}")
|
||||
return False
|
||||
|
||||
async def disconnect(self):
|
||||
"""Clean disconnect."""
|
||||
if self.hz_client:
|
||||
self.hz_client.shutdown()
|
||||
logger.info("[✓] Disconnected from Hazelcast")
|
||||
|
||||
async def get_latest_snapshot(self, symbol: str = "BTCUSDT") -> Optional[MarketSnapshot]:
|
||||
"""
|
||||
Get latest synchronized snapshot from Hazelcast.
|
||||
|
||||
Consumes the canonical single-result NG7/NG8 Hazelcast payload.
|
||||
BLUE is the contract: PINK follows it.
|
||||
"""
|
||||
try:
|
||||
start = datetime.utcnow()
|
||||
|
||||
raw = self.features_map.get("latest_eigen_scan")
|
||||
if not raw:
|
||||
return self._last_snapshot # Return cached if available
|
||||
|
||||
data = json.loads(raw)
|
||||
if isinstance(data, dict) and data.get("version") == "NG7":
|
||||
data = normalize_ng7_scan(data)
|
||||
|
||||
result = data.get("result") if isinstance(data, dict) else None
|
||||
if isinstance(result, dict):
|
||||
scan = result
|
||||
scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper()
|
||||
else:
|
||||
scan = data if isinstance(data, dict) else {}
|
||||
scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper()
|
||||
|
||||
if scan_asset and symbol and scan_asset != str(symbol).upper():
|
||||
logger.warning(f"Symbol {symbol} not in latest_eigen_scan asset {scan_asset}")
|
||||
return None
|
||||
|
||||
price_value = float(scan.get('price') or 0.0)
|
||||
eigenvalues = []
|
||||
eigen_tracking = scan.get('eigenvalue_tracking') or {}
|
||||
if isinstance(eigen_tracking, dict):
|
||||
lambda_max = eigen_tracking.get('lambda_max')
|
||||
if lambda_max is not None:
|
||||
eigenvalues = [float(lambda_max)]
|
||||
if not eigenvalues:
|
||||
eigenvalues = [float(v) for v in (scan.get('eigenvalues') or []) if v is not None]
|
||||
|
||||
# Prefer an explicit vel_div if present; otherwise compute it from
|
||||
# 50 vs 750 velocity tracking on the single-result schema.
|
||||
vel_div = scan.get('vel_div')
|
||||
if vel_div is None:
|
||||
multi = scan.get('multi_window_results') or {}
|
||||
if isinstance(multi, dict):
|
||||
w50 = multi.get('50', {}).get('tracking_data', {}) if isinstance(multi.get('50'), dict) else {}
|
||||
w750 = multi.get('750', {}).get('tracking_data', {}) if isinstance(multi.get('750'), dict) else {}
|
||||
try:
|
||||
v50 = float(w50.get('lambda_max_velocity'))
|
||||
v750 = float(w750.get('lambda_max_velocity'))
|
||||
vel_div = v50 - v750
|
||||
except Exception:
|
||||
vel_div = None
|
||||
irp_alignment = scan.get('irp_alignment')
|
||||
if irp_alignment is None:
|
||||
irp_alignment = scan.get('confidence')
|
||||
|
||||
# Build snapshot
|
||||
snapshot = MarketSnapshot(
|
||||
timestamp=datetime.utcnow(), # Or parse from data['timestamp']
|
||||
symbol=symbol,
|
||||
price=price_value,
|
||||
eigenvalues=[float(e) for e in eigenvalues] if eigenvalues else [],
|
||||
velocity_divergence=vel_div,
|
||||
irp_alignment=irp_alignment,
|
||||
scan_number=data.get('scan_number'),
|
||||
source="hazelcast",
|
||||
scan_payload=data,
|
||||
)
|
||||
|
||||
self._last_snapshot = snapshot
|
||||
|
||||
# Calculate latency
|
||||
self._latency_ms = (datetime.utcnow() - start).total_seconds() * 1000
|
||||
|
||||
return snapshot
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading snapshot: {e}")
|
||||
return self._last_snapshot
|
||||
|
||||
async def subscribe_snapshots(self, callback: Callable[[MarketSnapshot], None]):
|
||||
"""
|
||||
Subscribe to snapshot updates via polling (listener not critical).
|
||||
|
||||
Polling every 5s matches DolphinNG6 pulse.
|
||||
"""
|
||||
logger.info("[✓] Snapshot subscription ready (polling mode)")
|
||||
|
||||
async def get_acb_update(self) -> Optional[ACBUpdate]:
|
||||
"""Get ACB update from Hazelcast."""
|
||||
try:
|
||||
# DOLPHIN_FEATURES["acb_boost"] — written by acb_processor_service.py
|
||||
raw = self.features_map.get("acb_boost")
|
||||
if raw:
|
||||
data = json.loads(raw)
|
||||
return ACBUpdate(
|
||||
timestamp=datetime.utcnow(),
|
||||
boost=data.get('boost', 1.0),
|
||||
beta=data.get('beta', 0.5),
|
||||
cut=data.get('cut', 0.0),
|
||||
posture=data.get('posture', 'APEX')
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"ACB read error: {e}")
|
||||
return None
|
||||
|
||||
def get_latency_ms(self) -> float:
|
||||
"""Return last measured latency."""
|
||||
return self._latency_ms
|
||||
|
||||
def health_check(self) -> bool:
|
||||
"""Check Hazelcast connection health."""
|
||||
if not self.hz_client:
|
||||
return False
|
||||
try:
|
||||
# Quick ping
|
||||
self.features_map.size()
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
Reference in New Issue
Block a user