"""VIOLET V3.4c: read BLUE-published live organs from Hazelcast. This is VIOLET-only. BLUE is untouched. The adapter reads the published BLUE surfaces that already exist in HZ and translates them into ``SizingFactors`` for the shadow path: - ``posture`` from ``DOLPHIN_STATE_BLUE.latest_nautilus`` / ``engine_snapshot`` - ``esof_score`` from ``DOLPHIN_FEATURES.esof_latest`` or ``esof_advisor_latest`` - ``acb_boost`` / ``acb_beta`` from ``DOLPHIN_FEATURES.acb_boost`` - ``mc_scale`` from ``DOLPHIN_FEATURES.mc_forewarner_latest`` - OB market consensus from the live ``asset_*_ob`` maps via BLUE's own ``OBFeatureEngine`` The remaining DC signal is left neutral here for now. It needs the same live signal-history path BLUE uses and should be added as a separate mirror step. """ from __future__ import annotations import json import sys from collections import deque from collections.abc import Mapping from dataclasses import dataclass from dataclasses import field from pathlib import Path from typing import Any, Deque, Iterable, Optional import hazelcast import numpy as np _PROJECT_ROOT = Path(__file__).resolve().parents[3] for _p in (str(_PROJECT_ROOT), str(_PROJECT_ROOT / "nautilus_dolphin")): if _p not in sys.path: sys.path.insert(0, _p) from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine from nautilus_dolphin.nautilus.ob_provider import OBSnapshot, OBProvider from nautilus_dolphin.nautilus.alpha_signal_generator import AlphaSignalGenerator from .alpha_wrappers import VioletAssetSelector from .decision_engine import SizingFactors from .live_factor_source import esof_score_from_features, posture_from_engine_snapshot from .live_factors import extract_live_sizing_factors def _jsonish(value: Any) -> Any: if isinstance(value, str): try: return json.loads(value) except Exception: return value return value def _coerce_float(value: Any, default: Optional[float] = None) -> Optional[float]: try: if value is None: return default out = float(value) if not np.isfinite(out): return default return out except (TypeError, ValueError): return default def _read_hz_map(client: hazelcast.HazelcastClient, map_name: str, key: str) -> Any: try: return client.get_map(map_name).blocking().get(key) except Exception: return None def _map_status_to_mc_scale(payload: Any) -> float: data = _jsonish(payload) if isinstance(data, Mapping): status = str(data.get("status", "")).upper() else: status = str(data).upper() return 0.5 if status == "ORANGE" else 1.0 def _extract_acb(payload: Any) -> tuple[float, float]: data = _jsonish(payload) if isinstance(data, Mapping): boost = _coerce_float(data.get("boost"), 1.0) or 1.0 beta = _coerce_float(data.get("beta"), 0.0) or 0.0 return max(0.0, boost), max(0.0, beta) return 1.0, 0.0 def _scan_view(payload: Any) -> Mapping[str, Any]: """Normalize NG7 nested or NG8 flat scan payloads into one mapping.""" data = _jsonish(payload) if not isinstance(data, Mapping): return {} view: dict[str, Any] = dict(data) result = data.get("result") if isinstance(result, Mapping): view.update(result) return view def _scan_assets(scan: Mapping[str, Any]) -> list[str]: assets = scan.get("assets") if isinstance(assets, list) and assets: return [str(a).upper() for a in assets if a is not None] target = scan.get("target_asset") or scan.get("asset") return [str(target).upper()] if target else [] def _scan_prices(scan: Mapping[str, Any]) -> list[float]: prices = scan.get("asset_prices") or scan.get("prices") if not isinstance(prices, list): return [] out: list[float] = [] for value in prices: px = _coerce_float(value, None) if px is None or px <= 0: continue out.append(px) return out @dataclass class LiveBlueScanHistory: """Stateful scan history mirror for DC confirmation. BLUE computes `dc_status` from the active asset price history. This helper keeps a read-only replay of the published scan stream so VIOLET can reproduce that state without touching BLUE or inventing a new schema. """ maxlen: int = 32 trade_direction: int = -1 _prices: dict[str, Deque[float]] = field(default_factory=dict) last_scan_number: Optional[int] = None def ingest_scan(self, scan_payload: Any) -> Mapping[str, Any]: scan = _scan_view(scan_payload) assets = _scan_assets(scan) prices = _scan_prices(scan) for asset, price in zip(assets, prices): if price <= 0: continue hist = self._prices.setdefault(asset, deque(maxlen=self.maxlen)) hist.append(float(price)) sn = _coerce_float(scan.get("scan_number"), None) if sn is not None: self.last_scan_number = int(sn) return scan def price_history(self, asset: str) -> list[float]: return list(self._prices.get(str(asset).upper(), ())) def market_data(self, lookback: int) -> dict[str, list[float]]: need = max(1, int(lookback) + 1) return {asset: list(hist) for asset, hist in self._prices.items() if len(hist) >= need} def dc_status(self, scan_payload: Any, *, already_ingested: bool = False) -> str: scan = _scan_view(scan_payload) if already_ingested else self.ingest_scan(scan_payload) asset = _scan_assets(scan) if not asset: return "NONE" vel_div = _coerce_float(scan.get("vel_div"), None) if vel_div is None: return "NONE" history = self.price_history(asset[0]) if not history: return "NONE" signal_gen = AlphaSignalGenerator() sig = signal_gen.generate( vel_div=vel_div, vel_div_history=None, asset_price_history=history, trade_direction=self.trade_direction, asset=asset[0], current_timestamp=_coerce_float(scan.get("timestamp"), 0.0) or 0.0, ) return sig.dc_status class HazelcastOBProvider(OBProvider): """Read the current BLUE OB shards directly from Hazelcast.""" def __init__(self, client: hazelcast.HazelcastClient): self.client = client def _asset_keys(self) -> list[str]: try: keys = self.client.get_map("DOLPHIN_FEATURES").blocking().key_set() except Exception: return [] assets = [] for key in keys: if not isinstance(key, str) or not key.startswith("asset_") or not key.endswith("_ob"): continue asset = key[len("asset_"):-len("_ob")] if asset and asset not in assets: assets.append(asset) return sorted(assets) def _read_snapshot(self, asset: str) -> Optional[OBSnapshot]: raw = _read_hz_map(self.client, "DOLPHIN_FEATURES", f"asset_{asset}_ob") data = _jsonish(raw) if not isinstance(data, Mapping): return None bid_notional = np.array( [_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_notional", [0, 0, 0, 0, 0])][:5], dtype=np.float64, ) ask_notional = np.array( [_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_notional", [0, 0, 0, 0, 0])][:5], dtype=np.float64, ) bid_depth = np.array( [_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_depth", [0, 0, 0, 0, 0])][:5], dtype=np.float64, ) ask_depth = np.array( [_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_depth", [0, 0, 0, 0, 0])][:5], dtype=np.float64, ) ts = _coerce_float(data.get("timestamp"), 0.0) or 0.0 if ( bid_notional.shape != (5,) or ask_notional.shape != (5,) or bid_depth.shape != (5,) or ask_depth.shape != (5,) ): return None if np.any(bid_notional < 0) or np.any(ask_notional < 0): return None if np.any(bid_depth < 0) or np.any(ask_depth < 0): return None return OBSnapshot( timestamp=ts, asset=asset, bid_notional=bid_notional, ask_notional=ask_notional, bid_depth=bid_depth, ask_depth=ask_depth, ) def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]: return self._read_snapshot(asset) def get_assets(self) -> list[str]: return self._asset_keys() def get_all_timestamps(self, asset: str) -> np.ndarray: snap = self._read_snapshot(asset) if snap is None: return np.array([], dtype=np.float64) return np.array([snap.timestamp], dtype=np.float64) def get_snapshot_count(self, asset: str) -> int: return 1 if self._read_snapshot(asset) is not None else 0 @dataclass(frozen=True) class LiveBlueSourceResult: factors: SizingFactors acb_boost: float acb_beta: float mc_scale: float posture: str dc_status: str selected_asset: str def source_live_blue_sizing_factors( client: hazelcast.HazelcastClient, *, assets: Optional[Iterable[str]] = None, scan_history: Optional[LiveBlueScanHistory] = None, selector: Optional[VioletAssetSelector] = None, ) -> LiveBlueSourceResult: """Read the BLUE-published live surfaces and return a typed factor plane.""" scan_history = scan_history or LiveBlueScanHistory() selector = selector or VioletAssetSelector() engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "latest_nautilus") if engine_snapshot_raw is None: engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "engine_snapshot") engine_snapshot = _jsonish(engine_snapshot_raw) if isinstance(engine_snapshot, str): try: engine_snapshot = json.loads(engine_snapshot) except Exception: engine_snapshot = {} posture = posture_from_engine_snapshot(engine_snapshot if isinstance(engine_snapshot, Mapping) else None) esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_latest") if esof_raw is None: esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_advisor_latest") esof_score = esof_score_from_features(esof_raw) acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost") if acb_raw is None: acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost_short") acb_boost, acb_beta = _extract_acb(acb_raw) mc_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "mc_forewarner_latest") mc_scale = _map_status_to_mc_scale(mc_raw) scan_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "latest_eigen_scan") scan = scan_history.ingest_scan(scan_raw) scan_assets = _scan_assets(scan) trade_direction = int( _coerce_float( engine_snapshot.get("trade_direction_runtime") if isinstance(engine_snapshot, Mapping) else None, None, ) or _coerce_float( engine_snapshot.get("trade_direction_base") if isinstance(engine_snapshot, Mapping) else None, None, ) or -1 ) candidate_market = scan_history.market_data(selector.lookback) pick = selector.pick(candidate_market, regime_direction=trade_direction) selected_asset = pick.asset if pick is not None else (scan_assets[0] if scan_assets else "") dc_status = scan_history.dc_status(scan, already_ingested=True) ob_provider = HazelcastOBProvider(client) ob_engine = OBFeatureEngine(ob_provider) ob_assets = list(assets) if assets is not None else (scan_assets or ob_provider.get_assets()) if ob_assets: try: ob_engine.step_live(ob_assets, bar_idx=0) market = ob_engine.get_market(0, ob_assets) ob_median_imbalance = float(market.median_imbalance) ob_agreement_pct = float(market.agreement_pct) except Exception: ob_median_imbalance = None ob_agreement_pct = None else: ob_median_imbalance = None ob_agreement_pct = None hz_snapshot = { "boost": acb_boost, "beta": acb_beta, "mc_scale": mc_scale, "esof_score": esof_score, "ob_median_imbalance": ob_median_imbalance, "ob_agreement_pct": ob_agreement_pct, "dc_status": dc_status, "posture": posture, } factors = extract_live_sizing_factors(hz_snapshot=hz_snapshot) return LiveBlueSourceResult( factors=factors, acb_boost=acb_boost, acb_beta=acb_beta, mc_scale=mc_scale, posture=posture, dc_status=dc_status, selected_asset=selected_asset, )