diff --git a/prod/clean_arch/violet/live_blue_source.py b/prod/clean_arch/violet/live_blue_source.py new file mode 100644 index 0000000..65157a7 --- /dev/null +++ b/prod/clean_arch/violet/live_blue_source.py @@ -0,0 +1,361 @@ +"""VIOLET V3.4c: read BLUE-published live organs from Hazelcast. + +This is VIOLET-only. BLUE is untouched. + +The adapter reads the published BLUE surfaces that already exist in HZ and +translates them into ``SizingFactors`` for the shadow path: + - ``posture`` from ``DOLPHIN_STATE_BLUE.latest_nautilus`` / ``engine_snapshot`` + - ``esof_score`` from ``DOLPHIN_FEATURES.esof_latest`` or ``esof_advisor_latest`` + - ``acb_boost`` / ``acb_beta`` from ``DOLPHIN_FEATURES.acb_boost`` + - ``mc_scale`` from ``DOLPHIN_FEATURES.mc_forewarner_latest`` + - OB market consensus from the live ``asset_*_ob`` maps via BLUE's own + ``OBFeatureEngine`` + +The remaining DC signal is left neutral here for now. It needs the same live +signal-history path BLUE uses and should be added as a separate mirror step. +""" + +from __future__ import annotations + +import json +import sys +from collections import deque +from collections.abc import Mapping +from dataclasses import dataclass +from dataclasses import field +from pathlib import Path +from typing import Any, Deque, Iterable, Optional + +import hazelcast +import numpy as np + +_PROJECT_ROOT = Path(__file__).resolve().parents[3] +for _p in (str(_PROJECT_ROOT), str(_PROJECT_ROOT / "nautilus_dolphin")): + if _p not in sys.path: + sys.path.insert(0, _p) + +from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine +from nautilus_dolphin.nautilus.ob_provider import OBSnapshot, OBProvider +from nautilus_dolphin.nautilus.alpha_signal_generator import AlphaSignalGenerator + +from .alpha_wrappers import VioletAssetSelector +from .decision_engine import SizingFactors +from .live_factor_source import esof_score_from_features, posture_from_engine_snapshot +from .live_factors import extract_live_sizing_factors + + +def _jsonish(value: Any) -> Any: + if isinstance(value, str): + try: + return json.loads(value) + except Exception: + return value + return value + + +def _coerce_float(value: Any, default: Optional[float] = None) -> Optional[float]: + try: + if value is None: + return default + out = float(value) + if not np.isfinite(out): + return default + return out + except (TypeError, ValueError): + return default + + +def _read_hz_map(client: hazelcast.HazelcastClient, map_name: str, key: str) -> Any: + try: + return client.get_map(map_name).blocking().get(key) + except Exception: + return None + + +def _map_status_to_mc_scale(payload: Any) -> float: + data = _jsonish(payload) + if isinstance(data, Mapping): + status = str(data.get("status", "")).upper() + else: + status = str(data).upper() + return 0.5 if status == "ORANGE" else 1.0 + + +def _extract_acb(payload: Any) -> tuple[float, float]: + data = _jsonish(payload) + if isinstance(data, Mapping): + boost = _coerce_float(data.get("boost"), 1.0) or 1.0 + beta = _coerce_float(data.get("beta"), 0.0) or 0.0 + return max(0.0, boost), max(0.0, beta) + return 1.0, 0.0 + + +def _scan_view(payload: Any) -> Mapping[str, Any]: + """Normalize NG7 nested or NG8 flat scan payloads into one mapping.""" + data = _jsonish(payload) + if not isinstance(data, Mapping): + return {} + view: dict[str, Any] = dict(data) + result = data.get("result") + if isinstance(result, Mapping): + view.update(result) + return view + + +def _scan_assets(scan: Mapping[str, Any]) -> list[str]: + assets = scan.get("assets") + if isinstance(assets, list) and assets: + return [str(a).upper() for a in assets if a is not None] + target = scan.get("target_asset") or scan.get("asset") + return [str(target).upper()] if target else [] + + +def _scan_prices(scan: Mapping[str, Any]) -> list[float]: + prices = scan.get("asset_prices") or scan.get("prices") + if not isinstance(prices, list): + return [] + out: list[float] = [] + for value in prices: + px = _coerce_float(value, None) + if px is None or px <= 0: + continue + out.append(px) + return out + + +@dataclass +class LiveBlueScanHistory: + """Stateful scan history mirror for DC confirmation. + + BLUE computes `dc_status` from the active asset price history. This helper keeps + a read-only replay of the published scan stream so VIOLET can reproduce that + state without touching BLUE or inventing a new schema. + """ + + maxlen: int = 32 + trade_direction: int = -1 + _prices: dict[str, Deque[float]] = field(default_factory=dict) + last_scan_number: Optional[int] = None + + def ingest_scan(self, scan_payload: Any) -> Mapping[str, Any]: + scan = _scan_view(scan_payload) + assets = _scan_assets(scan) + prices = _scan_prices(scan) + for asset, price in zip(assets, prices): + if price <= 0: + continue + hist = self._prices.setdefault(asset, deque(maxlen=self.maxlen)) + hist.append(float(price)) + sn = _coerce_float(scan.get("scan_number"), None) + if sn is not None: + self.last_scan_number = int(sn) + return scan + + def price_history(self, asset: str) -> list[float]: + return list(self._prices.get(str(asset).upper(), ())) + + def market_data(self, lookback: int) -> dict[str, list[float]]: + need = max(1, int(lookback) + 1) + return {asset: list(hist) for asset, hist in self._prices.items() if len(hist) >= need} + + def dc_status(self, scan_payload: Any, *, already_ingested: bool = False) -> str: + scan = _scan_view(scan_payload) if already_ingested else self.ingest_scan(scan_payload) + asset = _scan_assets(scan) + if not asset: + return "NONE" + vel_div = _coerce_float(scan.get("vel_div"), None) + if vel_div is None: + return "NONE" + history = self.price_history(asset[0]) + if not history: + return "NONE" + signal_gen = AlphaSignalGenerator() + sig = signal_gen.generate( + vel_div=vel_div, + vel_div_history=None, + asset_price_history=history, + trade_direction=self.trade_direction, + asset=asset[0], + current_timestamp=_coerce_float(scan.get("timestamp"), 0.0) or 0.0, + ) + return sig.dc_status + + +class HazelcastOBProvider(OBProvider): + """Read the current BLUE OB shards directly from Hazelcast.""" + + def __init__(self, client: hazelcast.HazelcastClient): + self.client = client + + def _asset_keys(self) -> list[str]: + try: + keys = self.client.get_map("DOLPHIN_FEATURES").blocking().key_set() + except Exception: + return [] + assets = [] + for key in keys: + if not isinstance(key, str) or not key.startswith("asset_") or not key.endswith("_ob"): + continue + asset = key[len("asset_"):-len("_ob")] + if asset and asset not in assets: + assets.append(asset) + return sorted(assets) + + def _read_snapshot(self, asset: str) -> Optional[OBSnapshot]: + raw = _read_hz_map(self.client, "DOLPHIN_FEATURES", f"asset_{asset}_ob") + data = _jsonish(raw) + if not isinstance(data, Mapping): + return None + bid_notional = np.array( + [_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_notional", [0, 0, 0, 0, 0])][:5], + dtype=np.float64, + ) + ask_notional = np.array( + [_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_notional", [0, 0, 0, 0, 0])][:5], + dtype=np.float64, + ) + bid_depth = np.array( + [_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_depth", [0, 0, 0, 0, 0])][:5], + dtype=np.float64, + ) + ask_depth = np.array( + [_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_depth", [0, 0, 0, 0, 0])][:5], + dtype=np.float64, + ) + ts = _coerce_float(data.get("timestamp"), 0.0) or 0.0 + if ( + bid_notional.shape != (5,) or ask_notional.shape != (5,) + or bid_depth.shape != (5,) or ask_depth.shape != (5,) + ): + return None + if np.any(bid_notional < 0) or np.any(ask_notional < 0): + return None + if np.any(bid_depth < 0) or np.any(ask_depth < 0): + return None + return OBSnapshot( + timestamp=ts, + asset=asset, + bid_notional=bid_notional, + ask_notional=ask_notional, + bid_depth=bid_depth, + ask_depth=ask_depth, + ) + + def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]: + return self._read_snapshot(asset) + + def get_assets(self) -> list[str]: + return self._asset_keys() + + def get_all_timestamps(self, asset: str) -> np.ndarray: + snap = self._read_snapshot(asset) + if snap is None: + return np.array([], dtype=np.float64) + return np.array([snap.timestamp], dtype=np.float64) + + def get_snapshot_count(self, asset: str) -> int: + return 1 if self._read_snapshot(asset) is not None else 0 + + +@dataclass(frozen=True) +class LiveBlueSourceResult: + factors: SizingFactors + acb_boost: float + acb_beta: float + mc_scale: float + posture: str + dc_status: str + selected_asset: str + + +def source_live_blue_sizing_factors( + client: hazelcast.HazelcastClient, + *, + assets: Optional[Iterable[str]] = None, + scan_history: Optional[LiveBlueScanHistory] = None, + selector: Optional[VioletAssetSelector] = None, +) -> LiveBlueSourceResult: + """Read the BLUE-published live surfaces and return a typed factor plane.""" + scan_history = scan_history or LiveBlueScanHistory() + selector = selector or VioletAssetSelector() + + engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "latest_nautilus") + if engine_snapshot_raw is None: + engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "engine_snapshot") + engine_snapshot = _jsonish(engine_snapshot_raw) + if isinstance(engine_snapshot, str): + try: + engine_snapshot = json.loads(engine_snapshot) + except Exception: + engine_snapshot = {} + + posture = posture_from_engine_snapshot(engine_snapshot if isinstance(engine_snapshot, Mapping) else None) + + esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_latest") + if esof_raw is None: + esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_advisor_latest") + esof_score = esof_score_from_features(esof_raw) + + acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost") + if acb_raw is None: + acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost_short") + acb_boost, acb_beta = _extract_acb(acb_raw) + + mc_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "mc_forewarner_latest") + mc_scale = _map_status_to_mc_scale(mc_raw) + + scan_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "latest_eigen_scan") + scan = scan_history.ingest_scan(scan_raw) + scan_assets = _scan_assets(scan) + trade_direction = int( + _coerce_float( + engine_snapshot.get("trade_direction_runtime") if isinstance(engine_snapshot, Mapping) else None, + None, + ) + or _coerce_float( + engine_snapshot.get("trade_direction_base") if isinstance(engine_snapshot, Mapping) else None, + None, + ) + or -1 + ) + candidate_market = scan_history.market_data(selector.lookback) + pick = selector.pick(candidate_market, regime_direction=trade_direction) + selected_asset = pick.asset if pick is not None else (scan_assets[0] if scan_assets else "") + dc_status = scan_history.dc_status(scan, already_ingested=True) + + ob_provider = HazelcastOBProvider(client) + ob_engine = OBFeatureEngine(ob_provider) + ob_assets = list(assets) if assets is not None else (scan_assets or ob_provider.get_assets()) + if ob_assets: + try: + ob_engine.step_live(ob_assets, bar_idx=0) + market = ob_engine.get_market(0, ob_assets) + ob_median_imbalance = float(market.median_imbalance) + ob_agreement_pct = float(market.agreement_pct) + except Exception: + ob_median_imbalance = None + ob_agreement_pct = None + else: + ob_median_imbalance = None + ob_agreement_pct = None + + hz_snapshot = { + "boost": acb_boost, + "beta": acb_beta, + "mc_scale": mc_scale, + "esof_score": esof_score, + "ob_median_imbalance": ob_median_imbalance, + "ob_agreement_pct": ob_agreement_pct, + "dc_status": dc_status, + "posture": posture, + } + factors = extract_live_sizing_factors(hz_snapshot=hz_snapshot) + return LiveBlueSourceResult( + factors=factors, + acb_boost=acb_boost, + acb_beta=acb_beta, + mc_scale=mc_scale, + posture=posture, + dc_status=dc_status, + selected_asset=selected_asset, + ) diff --git a/prod/clean_arch/violet/test_violet_live_blue_source.py b/prod/clean_arch/violet/test_violet_live_blue_source.py new file mode 100644 index 0000000..c1ccfa4 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_live_blue_source.py @@ -0,0 +1,381 @@ +from __future__ import annotations + +import json +import sys +from dataclasses import dataclass + +import pytest + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import hazelcast + +from prod.clean_arch.violet.decision_engine import SizingFactors +from prod.clean_arch.violet.live_blue_source import ( + HazelcastOBProvider, + LiveBlueScanHistory, + source_live_blue_sizing_factors, +) +from prod.clean_arch.violet.alpha_wrappers import VioletAssetSelector +from nautilus_dolphin.nautilus.alpha_signal_generator import AlphaSignalGenerator + + +@dataclass +class _FakeMap: + payloads: dict + + def get(self, key): + return self.payloads.get(key) + + def key_set(self): + return list(self.payloads.keys()) + + +class _FakeBlocking: + def __init__(self, payloads): + self._payloads = payloads + + def get(self, key): + return self._payloads.get(key) + + def key_set(self): + return list(self._payloads.keys()) + + +class _FakeClient: + def __init__(self, maps): + self._maps = maps + + def get_map(self, name): + return type("M", (), {"blocking": lambda self2: _FakeBlocking(self._maps[name])})() + + +def test_hz_ob_provider_filters_and_parses_latest_payload(): + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "asset_XRPUSDT_ob": json.dumps( + {"timestamp": 2.0, "bid_notional": [-1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": 1.2, "beta": 0.3}), + "mc_forewarner_latest": json.dumps({"status": "ORANGE"}), + "esof_latest": json.dumps({"advisory_score": 0.4}), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": "restored"})}, + } + ) + provider = HazelcastOBProvider(client) # type: ignore[arg-type] + assert provider.get_assets() == ["BTCUSDT", "XRPUSDT"] + snap = provider.get_snapshot("BTCUSDT", 0.0) + assert snap is not None + assert snap.asset == "BTCUSDT" + assert snap.bid_notional.tolist() == [1.0, 2.0, 3.0, 4.0, 5.0] + + +def test_source_live_blue_sizing_factors_unit(monkeypatch): + class FakeEngine: + def __init__(self, provider): + self.provider = provider + def step_live(self, assets, bar_idx): + assert "BTCUSDT" in assets + def get_market(self, ts, assets): + return type("M", (), {"median_imbalance": 0.12, "agreement_pct": 0.91})() + + monkeypatch.setattr("prod.clean_arch.violet.live_blue_source.OBFeatureEngine", FakeEngine) + history = LiveBlueScanHistory(maxlen=16, trade_direction=-1) + for idx, px in enumerate([100.0, 99.5, 99.0, 98.4, 97.8, 97.0, 96.2], start=1): + history.ingest_scan( + { + "scan_number": idx, + "timestamp": float(idx), + "vel_div": -0.031, + "target_asset": "BTCUSDT", + "assets": ["BTCUSDT"], + "asset_prices": [px], + } + ) + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": 1.4, "beta": 0.2}), + "mc_forewarner_latest": json.dumps({"status": "ORANGE"}), + "esof_latest": json.dumps({"advisory_score": 0.4}), + "latest_eigen_scan": json.dumps( + { + "scan_number": 8, + "timestamp": 8.0, + "vel_div": -0.031, + "target_asset": "BTCUSDT", + "assets": ["BTCUSDT"], + "asset_prices": [95.5], + } + ), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": "stalker"})}, + } + ) + res = source_live_blue_sizing_factors( + client, + assets=["BTCUSDT"], + scan_history=history, + selector=VioletAssetSelector(lookback_horizon=7), + ) + assert isinstance(res.factors, SizingFactors) + assert res.factors.posture == "STALKER" + assert res.factors.mc_scale == 0.5 + assert res.factors.boost == 1.4 + assert res.factors.beta == 0.2 + assert res.factors.esof_score == 0.4 + assert res.factors.ob_median_imbalance == 0.12 + assert res.factors.ob_agreement_pct == 0.91 + assert res.factors.dc_status == "CONFIRM" + assert res.selected_asset == "BTCUSDT" + + +def test_source_live_blue_sizing_factors_preserves_skip_contradict(monkeypatch): + class FakeEngine: + def __init__(self, provider): + self.provider = provider + def step_live(self, assets, bar_idx): + pass + def get_market(self, ts, assets): + return type("M", (), {"median_imbalance": 0.0, "agreement_pct": 0.0})() + + monkeypatch.setattr("prod.clean_arch.violet.live_blue_source.OBFeatureEngine", FakeEngine) + history = LiveBlueScanHistory(maxlen=16, trade_direction=-1) + for idx, px in enumerate([100.0, 100.5, 101.0, 101.6, 102.2, 102.9, 103.6], start=1): + history.ingest_scan( + { + "scan_number": idx, + "timestamp": float(idx), + "vel_div": -0.031, + "target_asset": "BTCUSDT", + "assets": ["BTCUSDT"], + "asset_prices": [px], + } + ) + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": 1.4, "beta": 0.2}), + "mc_forewarner_latest": json.dumps({"status": "GREEN"}), + "esof_latest": json.dumps({"advisory_score": 0.4}), + "latest_eigen_scan": json.dumps( + { + "scan_number": 8, + "timestamp": 8.0, + "vel_div": -0.031, + "target_asset": "BTCUSDT", + "assets": ["BTCUSDT"], + "asset_prices": [104.2], + } + ), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": "APEX"})}, + } + ) + res = source_live_blue_sizing_factors( + client, + assets=["BTCUSDT"], + scan_history=history, + selector=VioletAssetSelector(lookback_horizon=7), + ) + assert res.factors.dc_status == "SKIP_CONTRADICT" + + +def test_live_blue_sequence_matches_blue_selector_and_dc_at_each_step(monkeypatch): + class FakeEngine: + def __init__(self, provider): + self.provider = provider + def step_live(self, assets, bar_idx): + pass + def get_market(self, ts, assets): + return type("M", (), {"median_imbalance": 0.0, "agreement_pct": 0.0})() + + monkeypatch.setattr("prod.clean_arch.violet.live_blue_source.OBFeatureEngine", FakeEngine) + selector = VioletAssetSelector(lookback_horizon=7) + history = LiveBlueScanHistory(maxlen=16, trade_direction=-1) + signal_gen = AlphaSignalGenerator() + + scans = [ + { + "scan_number": 1, + "timestamp": 1.0, + "vel_div": -0.010, + "assets": ["BTCUSDT", "ETHUSDT"], + "asset_prices": [100.0, 200.0], + }, + { + "scan_number": 2, + "timestamp": 2.0, + "vel_div": -0.031, + "assets": ["BTCUSDT", "ETHUSDT"], + "asset_prices": [99.0, 198.0], + }, + { + "scan_number": 3, + "timestamp": 3.0, + "vel_div": -0.041, + "assets": ["BTCUSDT", "ETHUSDT"], + "asset_prices": [98.0, 196.0], + }, + { + "scan_number": 4, + "timestamp": 4.0, + "vel_div": -0.031, + "assets": ["BTCUSDT", "ETHUSDT"], + "asset_prices": [97.0, 194.0], + }, + ] + + for idx, scan in enumerate(scans, start=1): + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "asset_ETHUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [2, 3, 4, 5, 6], "ask_notional": [6, 5, 4, 3, 2], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": 1.0, "beta": 0.0}), + "mc_forewarner_latest": json.dumps({"status": "GREEN"}), + "esof_latest": json.dumps({"advisory_score": 0.3}), + "latest_eigen_scan": json.dumps({**scan, "target_asset": "BTCUSDT"}), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": "APEX"})}, + } + ) + res = source_live_blue_sizing_factors( + client, + assets=["BTCUSDT", "ETHUSDT"], + scan_history=history, + selector=selector, + ) + + # BLUE selector parity + market = history.market_data(selector.lookback) + expected_pick = selector.pick(market, regime_direction=-1) + expected_asset = expected_pick.asset if expected_pick is not None else "BTCUSDT" + assert res.selected_asset == expected_asset + + # BLUE signal parity + expected_signal = signal_gen.generate( + vel_div=float(scan["vel_div"]), + vel_div_history=None, + asset_price_history=history.price_history(expected_asset), + trade_direction=-1, + asset=expected_asset, + current_timestamp=float(scan["timestamp"]), + ) + assert res.factors.dc_status == expected_signal.dc_status + + +def test_live_blue_sequence_rejects_anomalous_values_without_poisoning_history(monkeypatch): + class FakeEngine: + def __init__(self, provider): + self.provider = provider + def step_live(self, assets, bar_idx): + pass + def get_market(self, ts, assets): + return type("M", (), {"median_imbalance": 0.0, "agreement_pct": 0.0})() + + monkeypatch.setattr("prod.clean_arch.violet.live_blue_source.OBFeatureEngine", FakeEngine) + history = LiveBlueScanHistory(maxlen=16, trade_direction=-1) + selector = VioletAssetSelector(lookback_horizon=7) + scan = { + "scan_number": 99, + "timestamp": 99.0, + "vel_div": -0.031, + "assets": ["BTCUSDT", "ETHUSDT"], + "asset_prices": [float("nan"), -1.0], + "target_asset": "BTCUSDT", + } + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": 1.0, "beta": 0.0}), + "mc_forewarner_latest": json.dumps({"status": "GREEN"}), + "esof_latest": json.dumps({"advisory_score": 0.3}), + "latest_eigen_scan": json.dumps(scan), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": "APEX"})}, + } + ) + res = source_live_blue_sizing_factors( + client, + assets=["BTCUSDT", "ETHUSDT"], + scan_history=history, + selector=selector, + ) + assert res.selected_asset == "BTCUSDT" + assert res.factors.dc_status == "NONE" + assert history.price_history("BTCUSDT") == [] + assert history.price_history("ETHUSDT") == [] + + +def test_source_live_blue_sizing_factors_handles_anomalies(monkeypatch): + class FakeEngine: + def __init__(self, provider): + self.provider = provider + def step_live(self, assets, bar_idx): + raise RuntimeError("boom") + def get_market(self, ts, assets): + return type("M", (), {"median_imbalance": 0.0, "agreement_pct": 0.0})() + + monkeypatch.setattr("prod.clean_arch.violet.live_blue_source.OBFeatureEngine", FakeEngine) + client = _FakeClient( + { + "DOLPHIN_FEATURES": { + "asset_BTCUSDT_ob": json.dumps( + {"timestamp": 1.0, "bid_notional": [1, 2, 3, 4, 5], "ask_notional": [5, 4, 3, 2, 1], + "bid_depth": [1, 1, 1, 1, 1], "ask_depth": [1, 1, 1, 1, 1]} + ), + "acb_boost": json.dumps({"boost": -9, "beta": "bad"}), + "mc_forewarner_latest": json.dumps({"status": "GREEN"}), + "esof_latest": "not json", + "latest_eigen_scan": json.dumps({"target_asset": "BTCUSDT", "assets": ["BTCUSDT"], "asset_prices": [0]}), + }, + "DOLPHIN_STATE_BLUE": {"latest_nautilus": json.dumps({"posture": ""})}, + } + ) + res = source_live_blue_sizing_factors(client, assets=["BTCUSDT"]) + assert res.factors.posture == "APEX" + assert res.factors.mc_scale == 1.0 + assert res.factors.boost == 0.0 + assert res.factors.beta == 0.0 + assert res.factors.esof_score is None + assert res.factors.ob_median_imbalance is None + assert res.factors.ob_agreement_pct is None + assert res.factors.dc_status == "NONE" + assert res.selected_asset == "BTCUSDT" + + +def test_live_hz_smoke_reads_current_state(): + client = hazelcast.HazelcastClient(cluster_name="dolphin", cluster_members=["localhost:5701"]) + try: + res = source_live_blue_sizing_factors(client, assets=["BTCUSDT", "ETHUSDT", "XRPUSDT"]) + finally: + client.shutdown() + assert isinstance(res.factors, SizingFactors) + assert res.factors.posture in {"APEX", "STALKER", "RESTORED", "TURTLE", "HIBERNATE"} + assert res.factors.mc_scale in {0.5, 1.0}