diff --git a/Observability/dolphin_status_pink.py b/Observability/dolphin_status_pink.py index aff94c2..677b308 100644 --- a/Observability/dolphin_status_pink.py +++ b/Observability/dolphin_status_pink.py @@ -1607,13 +1607,46 @@ def gear_rows(eng, safe, acb, exf, hb, obf_universe=None): return sig_row, trade_row, fill +def _normalize_eng_for_tui(eng: dict) -> dict: + """Translate DITAv2 Hz engine_snapshot fields to TUI-expected NAUTILUS-era names. + + PinkHzStateWriter already writes the TUI-compatible aliases; this function is a + defensive fallback in case any older or partial snapshot lands in Hz. + """ + if not eng: + return {} + # If it already has the NAUTILUS-era field, it's either the new DITAv2 format + # (which writes both) or old NAUTILUS format — either way, return as-is. + if "trades_executed" in eng: + return eng + # Minimal translation for any snapshot that only has DITAv2 fields + out = dict(eng) + slot = eng.get("slot") or {} + size = float(slot.get("size") or 0.0) + ep = float(slot.get("entry_price") or 0.0) + out.setdefault("trades_executed", eng.get("trade_seq", 0)) + out.setdefault("current_leverage", eng.get("our_leverage", 0.0)) + out.setdefault("leverage_abs_cap", 3.0) + out.setdefault("open_notional", size * ep) + out.setdefault("last_scan_number", eng.get("scan_number", 0)) + out.setdefault("scans_processed", eng.get("scan_number", 0)) + out.setdefault("last_vel_div", 0.0) + out.setdefault("vol_ok", True) + out.setdefault("bar_idx", 0) + # open_positions: TUI expects a list (len() is called on it) + open_int = int(eng.get("open_positions", 0)) + if not isinstance(eng.get("open_positions"), list): + out["open_positions"] = [slot] if open_int > 0 and slot else [] + return out + + def render(hz): global START_CAP, CAP_PEAK # ── DITAv2 kernel + BingX account (reads kernel snapshot file, not Hz) ── ditav2_section = _render_ditav2_section() - eng = _get(hz, PINK_STATE_MAP, "engine_snapshot") + eng = _normalize_eng_for_tui(_get(hz, PINK_STATE_MAP, "engine_snapshot")) cap = _get(hz, PINK_STATE_MAP, "capital_checkpoint") safe = _get(hz, PINK_SAFETY_MAP, "latest") hb = _get(hz, PINK_HEARTBEAT_MAP, PINK_HEARTBEAT_KEY) diff --git a/prod/clean_arch/adapters/hazelcast_feed.py b/prod/clean_arch/adapters/hazelcast_feed.py new file mode 100644 index 0000000..2803ab0 --- /dev/null +++ b/prod/clean_arch/adapters/hazelcast_feed.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 +""" +ADAPTER: HazelcastDataFeed +========================== +Implementation of DataFeedPort using Hazelcast. + +Consumes the canonical NG7/NG8 Hazelcast payload that BLUE already uses. +The legacy scan_bridge bundle format is intentionally not part of the live +contract. +""" + +import json +import logging +from datetime import datetime +from typing import Optional, Callable, Dict, Any + +# Port interface +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent)) +from ports.data_feed import DataFeedPort, MarketSnapshot, ACBUpdate + +try: + from .eigen_scan_normalizer import normalize_ng7_scan +except ImportError: # pragma: no cover - direct script fallback + from prod.clean_arch.adapters.eigen_scan_normalizer import normalize_ng7_scan + +logger = logging.getLogger("HazelcastDataFeed") + + +class HazelcastDataFeed(DataFeedPort): + """ + ADAPTER: Hazelcast implementation of DataFeedPort. + + Reads from DolphinNG6 output via Hazelcast maps: + - DOLPHIN_FEATURES: Price + eigenvalues (ALWAYS SYNCED) + - DOLPHIN_SAFETY: Posture/mode + - DOLPHIN_STATE_*: Portfolio state + + No sync issues - all data written atomically by DolphinNG6. + """ + + def __init__(self, config: Dict[str, Any]): + self.config = config + self.hz_client = None + self.features_map = None + self.safety_map = None + self._last_snapshot: Optional[MarketSnapshot] = None + self._latency_ms = 0.0 + + async def connect(self) -> bool: + """Connect to Hazelcast cluster.""" + try: + import hazelcast + + hz_config = self.config.get('hazelcast', {}) + cluster = hz_config.get('cluster', 'dolphin') + host = hz_config.get('host', 'localhost:5701') + + logger.info(f"Connecting to Hazelcast: {host} (cluster: {cluster})") + + self.hz_client = hazelcast.HazelcastClient( + cluster_name=cluster, + cluster_members=[host], + ) + + # Get reference to maps + self.features_map = self.hz_client.get_map('DOLPHIN_FEATURES').blocking() + self.safety_map = self.hz_client.get_map('DOLPHIN_SAFETY').blocking() + + # Test connection + size = self.features_map.size() + logger.info(f"[✓] Connected. Features map: {size} entries") + + return True + + except Exception as e: + logger.error(f"[✗] Connection failed: {e}") + return False + + async def disconnect(self): + """Clean disconnect.""" + if self.hz_client: + self.hz_client.shutdown() + logger.info("[✓] Disconnected from Hazelcast") + + async def get_latest_snapshot(self, symbol: str = "BTCUSDT") -> Optional[MarketSnapshot]: + """ + Get latest synchronized snapshot from Hazelcast. + + Consumes the canonical single-result NG7/NG8 Hazelcast payload. + BLUE is the contract: PINK follows it. + """ + try: + start = datetime.utcnow() + + raw = self.features_map.get("latest_eigen_scan") + if not raw: + return self._last_snapshot # Return cached if available + + data = json.loads(raw) + if isinstance(data, dict) and data.get("version") == "NG7": + data = normalize_ng7_scan(data) + + result = data.get("result") if isinstance(data, dict) else None + if isinstance(result, dict): + scan = result + scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper() + else: + scan = data if isinstance(data, dict) else {} + scan_asset = str(scan.get("asset") or data.get("target_asset") or symbol).upper() + + if scan_asset and symbol and scan_asset != str(symbol).upper(): + logger.warning(f"Symbol {symbol} not in latest_eigen_scan asset {scan_asset}") + return None + + price_value = float(scan.get('price') or 0.0) + eigenvalues = [] + eigen_tracking = scan.get('eigenvalue_tracking') or {} + if isinstance(eigen_tracking, dict): + lambda_max = eigen_tracking.get('lambda_max') + if lambda_max is not None: + eigenvalues = [float(lambda_max)] + if not eigenvalues: + eigenvalues = [float(v) for v in (scan.get('eigenvalues') or []) if v is not None] + + # Prefer an explicit vel_div if present; otherwise compute it from + # 50 vs 750 velocity tracking on the single-result schema. + vel_div = scan.get('vel_div') + if vel_div is None: + multi = scan.get('multi_window_results') or {} + if isinstance(multi, dict): + w50 = multi.get('50', {}).get('tracking_data', {}) if isinstance(multi.get('50'), dict) else {} + w750 = multi.get('750', {}).get('tracking_data', {}) if isinstance(multi.get('750'), dict) else {} + try: + v50 = float(w50.get('lambda_max_velocity')) + v750 = float(w750.get('lambda_max_velocity')) + vel_div = v50 - v750 + except Exception: + vel_div = None + irp_alignment = scan.get('irp_alignment') + if irp_alignment is None: + irp_alignment = scan.get('confidence') + + # Build snapshot + snapshot = MarketSnapshot( + timestamp=datetime.utcnow(), # Or parse from data['timestamp'] + symbol=symbol, + price=price_value, + eigenvalues=[float(e) for e in eigenvalues] if eigenvalues else [], + velocity_divergence=vel_div, + irp_alignment=irp_alignment, + scan_number=data.get('scan_number'), + source="hazelcast", + scan_payload=data, + ) + + self._last_snapshot = snapshot + + # Calculate latency + self._latency_ms = (datetime.utcnow() - start).total_seconds() * 1000 + + return snapshot + + except Exception as e: + logger.error(f"Error reading snapshot: {e}") + return self._last_snapshot + + async def subscribe_snapshots(self, callback: Callable[[MarketSnapshot], None]): + """ + Subscribe to snapshot updates via polling (listener not critical). + + Polling every 5s matches DolphinNG6 pulse. + """ + logger.info("[✓] Snapshot subscription ready (polling mode)") + + async def get_acb_update(self) -> Optional[ACBUpdate]: + """Get ACB update from Hazelcast.""" + try: + # DOLPHIN_FEATURES["acb_boost"] — written by acb_processor_service.py + raw = self.features_map.get("acb_boost") + if raw: + data = json.loads(raw) + return ACBUpdate( + timestamp=datetime.utcnow(), + boost=data.get('boost', 1.0), + beta=data.get('beta', 0.5), + cut=data.get('cut', 0.0), + posture=data.get('posture', 'APEX') + ) + except Exception as e: + logger.error(f"ACB read error: {e}") + return None + + def get_latency_ms(self) -> float: + """Return last measured latency.""" + return self._latency_ms + + def health_check(self) -> bool: + """Check Hazelcast connection health.""" + if not self.hz_client: + return False + try: + # Quick ping + self.features_map.size() + return True + except: + return False diff --git a/prod/clean_arch/dita_v2/hazelcast_projection.py b/prod/clean_arch/dita_v2/hazelcast_projection.py index 3948ed6..a3a015d 100644 --- a/prod/clean_arch/dita_v2/hazelcast_projection.py +++ b/prod/clean_arch/dita_v2/hazelcast_projection.py @@ -131,31 +131,56 @@ class PinkHzStateWriter: acc_dict: dict, posture: str = "APEX", our_leverage: float = 0.0, + scan_number: int = 0, + vel_div: float = 0.0, + vol_ok: bool = True, ) -> None: - """Write full engine state. Called after every kernel mutation (non-blocking).""" + """Write full engine state. Called after every kernel mutation (non-blocking). + + Field names mirror DOLPHIN_STATE_BLUE["engine_snapshot"] where possible so + the existing PINK TUI panels (gear_rows, capital panel, etc.) work without + modification. DITAv2-specific fields are additive. + """ + open_pos_int = int(acc_dict.get("open_positions", 0)) + trade_seq = int(acc_dict.get("trade_seq", 0)) + size = float(slot_dict.get("size") or 0.0) + ep = float(slot_dict.get("entry_price") or 0.0) + open_notional = size * ep payload: dict[str, Any] = { + # Core (BLUE-compatible names) "strategy": "pink", "capital": acc_dict.get("capital", 0.0), "equity": acc_dict.get("equity", 0.0), "available_capital": acc_dict.get("available_capital", 0.0), "pnl": acc_dict.get("realized_pnl_total", 0.0), "fee_total": acc_dict.get("fee_total", 0.0), - "open_positions": int(acc_dict.get("open_positions", 0)), - "trade_seq": int(acc_dict.get("trade_seq", 0)), "posture": posture, "capital_frozen": bool(acc_dict.get("capital_frozen", False)), + "updated_at": _utcnow_iso(), + # TUI-compatible aliases (NAUTILUS-era field names expected by gear_rows etc.) + "trades_executed": trade_seq, + "current_leverage": our_leverage, + "leverage_abs_cap": 3.0, + "open_notional": open_notional, + "open_positions": [slot_dict] if open_pos_int > 0 else [], + "last_scan_number": scan_number, + "scans_processed": scan_number, + "last_vel_div": vel_div, + "vol_ok": vol_ok, + "bar_idx": scan_number, + # DITAv2-native fields + "trade_seq": trade_seq, "our_leverage": our_leverage, "slot": slot_dict, - "updated_at": _utcnow_iso(), } _hz_write_no_wait(self._state_map, "engine_snapshot", _json_encode(payload)) - # Compact "latest" key — same shape as BLUE's DOLPHIN_STATE_BLUE["latest"] + # Compact "latest" — same shape as BLUE's DOLPHIN_STATE_BLUE["latest"] _hz_write_no_wait(self._state_map, "latest", _json_encode({ "strategy": "pink", "capital": payload["capital"], "date": _today_iso(), "pnl": payload["pnl"], - "trades": payload["trade_seq"], + "trades": trade_seq, "posture": posture, "updated_at": payload["updated_at"], })) diff --git a/prod/clean_arch/dita_v2/test_flaws.py b/prod/clean_arch/dita_v2/test_flaws.py index 284cada..bebc8cc 100644 --- a/prod/clean_arch/dita_v2/test_flaws.py +++ b/prod/clean_arch/dita_v2/test_flaws.py @@ -1392,3 +1392,90 @@ class TestVolOkGate: assert decision.action.value not in ("VOL_GATE",), ( "vol_ok=True must not block on vol_ok gate" ) + + +# ============================================================ +# DC gate (_dc_contradicts) — SYSTEM BIBLE §4.2 +# ============================================================ + +class TestDCGate: + """Direction Confirmation gate: rising price over 7-tick window blocks SHORT entry.""" + + def _rt(self, prices: list): + """Build a minimal PinkDirectRuntime-like object with price history populated.""" + from collections import deque + import types + + obj = types.SimpleNamespace() + obj._price_history = deque(prices, maxlen=10) + + # Bind the method to obj + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + obj._dc_contradicts = lambda **kw: PinkDirectRuntime._dc_contradicts(obj, **kw) + return obj + + def test_rising_price_contradicts(self): + # p[-8] = 100.0, p[-1] = 100.2 → chg = +20 bps → CONTRADICT + prices = [100.0] + [100.05] * 6 + [100.2] + rt = self._rt(prices) + assert rt._dc_contradicts(), "Rising price must be DC CONTRADICT" + + def test_falling_price_confirms(self): + # p[-8] = 100.0, p[-1] = 99.9 → chg = -10 bps → CONFIRM (not a block) + prices = [100.0] + [99.95] * 6 + [99.9] + rt = self._rt(prices) + assert not rt._dc_contradicts(), "Falling price must NOT be DC CONTRADICT" + + def test_flat_price_neutral(self): + # < 0.75 bps change → NEUTRAL + prices = [100.0] * 8 + rt = self._rt(prices) + assert not rt._dc_contradicts(), "Flat price must NOT be DC CONTRADICT" + + def test_insufficient_history_neutral(self): + # < 8 prices → not enough data → NEUTRAL (allow entry) + prices = [100.0, 100.5] # only 2 entries + rt = self._rt(prices) + assert not rt._dc_contradicts(), "Insufficient history must NOT block entry" + + def test_exactly_threshold_neutral(self): + # Exactly 0.75 bps → NOT a CONTRADICT (> not >=) + prices = [100.0] + [100.0] * 6 + [100.0075] # 0.75 bps exactly + rt = self._rt(prices) + assert not rt._dc_contradicts(), "Exactly at threshold must NOT be CONTRADICT" + + +# ============================================================ +# TUI normalization — _normalize_eng_for_tui +# ============================================================ + +class TestNormalizeEngForTui: + """_normalize_eng_for_tui translates DITAv2 Hz snapshot to NAUTILUS-era field names.""" + + def _norm(self, eng: dict) -> dict: + from Observability.dolphin_status_pink import _normalize_eng_for_tui + return _normalize_eng_for_tui(eng) + + def test_empty_returns_empty(self): + assert self._norm({}) == {} + + def test_already_nautilus_format_passthrough(self): + eng = {"trades_executed": 5, "capital": 25000.0} + out = self._norm(eng) + assert out is eng or out["trades_executed"] == 5 + + def test_ditav2_format_adds_trades_executed(self): + eng = {"trade_seq": 7, "capital": 25000.0, "slot": {}} + out = self._norm(eng) + assert out["trades_executed"] == 7, "trade_seq must be aliased as trades_executed" + + def test_open_positions_becomes_list(self): + eng = {"open_positions": 1, "slot": {"size": 0.5, "entry_price": 50000.0}} + out = self._norm(eng) + assert isinstance(out["open_positions"], list), "open_positions int must become list" + assert len(out["open_positions"]) == 1 + + def test_zero_open_positions_empty_list(self): + eng = {"open_positions": 0, "slot": {}} + out = self._norm(eng) + assert out["open_positions"] == [], "zero open_positions must become empty list" diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 8850d30..bb3bb30 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -274,9 +274,19 @@ class PinkDirectRuntime: _enter_frozen: bool = field(default=False, init=False, repr=False, compare=False) # Last known posture — carried into Hz writes for TUI/algo monitoring _last_posture: str = field(default="APEX", init=False, repr=False, compare=False) + # Scan-derived fields for Hz writes and DC gate + _last_scan_number: int = field(default=0, init=False, repr=False, compare=False) + _last_vel_div: float = field(default=0.0, init=False, repr=False, compare=False) + _last_vol_ok: bool = field(default=True, init=False, repr=False, compare=False) + # Price history for Direction Confirmation (DC) gate — last 10 prices (5 needed for 7-bar) + _price_history: Any = field(default=None, init=False, repr=False, compare=False) + # ACB boost — multiplied into intent leverage (SYSTEM BIBLE §10); default=1.0 (no-op) + _last_acb_boost: float = field(default=1.0, init=False, repr=False, compare=False) async def connect(self, initial_capital: float = 25000.0) -> None: """Connect data feed, venue, seed capital from exchange, start WS stream.""" + from collections import deque + self._price_history = deque(maxlen=10) await self.data_feed.connect() venue = self.kernel.venue if hasattr(venue, "connect"): @@ -593,21 +603,57 @@ class PinkDirectRuntime: if isinstance(scan_payload.get("esof_payload"), dict) else None, ) - # Track posture for Hz writes + # Track scan-derived fields for Hz writes and DC gate self._last_posture = str(scan_payload.get("posture") or "APEX") + self._last_vel_div = float(scan_payload.get("vel_div") or scan_payload.get("velocity_divergence") or 0.0) + self._last_vol_ok = bool(scan_payload.get("vol_ok", True)) + self._last_scan_number = int(scan_payload.get("scan_number") or snapshot.scan_number or 0) + # ACB boost — read from scan_payload (scan bridge may embed it) or Hz direct + acb_data = scan_payload.get("acb_boost") or {} + if isinstance(acb_data, dict) and "boost" in acb_data: + self._last_acb_boost = max(0.1, float(acb_data.get("boost", 1.0))) + else: + # Fall back to Hz direct read (non-blocking — features_map is blocking proxy) + try: + feed = getattr(self.data_feed, "features_map", None) + if feed is not None: + raw = feed.get("acb_boost") + if raw: + import json as _json + d = _json.loads(raw) + self._last_acb_boost = max(0.1, float(d.get("boost", 1.0))) + except Exception: + pass # Hz read failure must never affect trading return dict( getattr(runtime, "latest_bundle_dict", {}) or bundle.as_dict() ) except Exception: return {} + def _dc_contradicts(self, lookback: int = 7, min_bps: float = 0.75) -> bool: + """Direction Confirmation gate (SYSTEM BIBLE §4.2, champion config). + + Returns True if the price over the last `lookback` ticks ROSE by ≥ min_bps bps + (0.75 bps). A rising price contradicts a SHORT signal → block ENTER. + + Champion: dc_skip_contradicts=True, dc_leverage_boost=1.0 (no boost). + """ + hist = self._price_history + if hist is None or len(hist) < lookback + 1: + return False # not enough history → NEUTRAL, allow entry + p0 = hist[-lookback - 1] + p1 = hist[-1] + if p0 <= 0: + return False + chg_bps = (p1 - p0) / p0 * 10_000.0 + return chg_bps > min_bps # rising price → CONTRADICT → skip + def _hz_publish(self, slot_dict: dict, acc: dict) -> None: """Fire-and-forget Hz write after any kernel state change. Computes system leverage (our_leverage = notional/capital) for the Hz - snapshot — this is the PINK/BLUE dual-leverage invariant: system leverage - reflects real margin utilisation; exchange leverage (1-3x cap) is set at - the BingX API level and never touches this path. + snapshot — PINK/BLUE dual-leverage invariant: system leverage reflects real + margin utilisation; exchange leverage (1-3x cap) is set at BingX API level. """ if self.hz_state_writer is None: return @@ -617,7 +663,12 @@ class PinkDirectRuntime: capital = float(acc.get("capital") or 0.0) our_leverage = (size * ep / capital) if capital > 1e-10 else 0.0 self.hz_state_writer.write_engine_snapshot( - slot_dict, acc, posture=self._last_posture, our_leverage=our_leverage + slot_dict, acc, + posture=self._last_posture, + our_leverage=our_leverage, + scan_number=self._last_scan_number, + vel_div=self._last_vel_div, + vol_ok=self._last_vol_ok, ) except Exception: pass @@ -780,13 +831,25 @@ class PinkDirectRuntime: closed=False, ) + # Price history for DC gate — update before decide() so current tick is included + if self._price_history is not None and snapshot.price and snapshot.price > 0: + self._price_history.append(float(snapshot.price)) + context = DecisionContext( # E-provided available_capital when present (E rules); K-fallback otherwise. capital=float(acc.get("available_capital") or acc.get("capital", 0.0)), open_positions=int(acc.get("open_positions", 0)), trade_seq=int(acc.get("trade_seq", 0)), ) + # DC gate (Direction Confirmation, SYSTEM BIBLE §4.2): + # Check BEFORE DecisionEngine so a CONTRADICT voids the ENTER without + # touching the kernel. Champion params: 7-tick lookback, 0.75 bps threshold. + # dc_skip_contradicts = True → rising price during short window = HOLD. + dc_blocked = self._dc_contradicts() decision = self.decision_engine.decide(snapshot, context, legacy_position) + if dc_blocked and decision.action == DecisionAction.ENTER: + import dataclasses + decision = dataclasses.replace(decision, action=DecisionAction.HOLD, reason="DC_CONTRADICT") self._emit("decision", decision=decision) intent_context = IntentContext( @@ -797,6 +860,13 @@ class PinkDirectRuntime: plan = self.intent_engine.plan(decision, intent_context, legacy_position) intent = plan.intent + # ACB boost (SYSTEM BIBLE §10): multiply intent leverage by the current boost + # factor from acb_processor_service. Capped at exchange_leverage_cap (3x). + if self._last_acb_boost != 1.0 and intent is not None: + import dataclasses as _dc + boosted_lev = min(3.0, max(1.0, float(intent.leverage or 1.0) * self._last_acb_boost)) + intent = _dc.replace(intent, leverage=boosted_lev) + if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}: kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0)