Files
siloqy/prod/clean_arch/violet/live_blue_source.py

362 lines
13 KiB
Python

"""VIOLET V3.4c: read BLUE-published live organs from Hazelcast.
This is VIOLET-only. BLUE is untouched.
The adapter reads the published BLUE surfaces that already exist in HZ and
translates them into ``SizingFactors`` for the shadow path:
- ``posture`` from ``DOLPHIN_STATE_BLUE.latest_nautilus`` / ``engine_snapshot``
- ``esof_score`` from ``DOLPHIN_FEATURES.esof_latest`` or ``esof_advisor_latest``
- ``acb_boost`` / ``acb_beta`` from ``DOLPHIN_FEATURES.acb_boost``
- ``mc_scale`` from ``DOLPHIN_FEATURES.mc_forewarner_latest``
- OB market consensus from the live ``asset_*_ob`` maps via BLUE's own
``OBFeatureEngine``
The remaining DC signal is left neutral here for now. It needs the same live
signal-history path BLUE uses and should be added as a separate mirror step.
"""
from __future__ import annotations
import json
import sys
from collections import deque
from collections.abc import Mapping
from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any, Deque, Iterable, Optional
import hazelcast
import numpy as np
_PROJECT_ROOT = Path(__file__).resolve().parents[3]
for _p in (str(_PROJECT_ROOT), str(_PROJECT_ROOT / "nautilus_dolphin")):
if _p not in sys.path:
sys.path.insert(0, _p)
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
from nautilus_dolphin.nautilus.ob_provider import OBSnapshot, OBProvider
from nautilus_dolphin.nautilus.alpha_signal_generator import AlphaSignalGenerator
from .alpha_wrappers import VioletAssetSelector
from .decision_engine import SizingFactors
from .live_factor_source import esof_score_from_features, posture_from_engine_snapshot
from .live_factors import extract_live_sizing_factors
def _jsonish(value: Any) -> Any:
if isinstance(value, str):
try:
return json.loads(value)
except Exception:
return value
return value
def _coerce_float(value: Any, default: Optional[float] = None) -> Optional[float]:
try:
if value is None:
return default
out = float(value)
if not np.isfinite(out):
return default
return out
except (TypeError, ValueError):
return default
def _read_hz_map(client: hazelcast.HazelcastClient, map_name: str, key: str) -> Any:
try:
return client.get_map(map_name).blocking().get(key)
except Exception:
return None
def _map_status_to_mc_scale(payload: Any) -> float:
data = _jsonish(payload)
if isinstance(data, Mapping):
status = str(data.get("status", "")).upper()
else:
status = str(data).upper()
return 0.5 if status == "ORANGE" else 1.0
def _extract_acb(payload: Any) -> tuple[float, float]:
data = _jsonish(payload)
if isinstance(data, Mapping):
boost = _coerce_float(data.get("boost"), 1.0) or 1.0
beta = _coerce_float(data.get("beta"), 0.0) or 0.0
return max(0.0, boost), max(0.0, beta)
return 1.0, 0.0
def _scan_view(payload: Any) -> Mapping[str, Any]:
"""Normalize NG7 nested or NG8 flat scan payloads into one mapping."""
data = _jsonish(payload)
if not isinstance(data, Mapping):
return {}
view: dict[str, Any] = dict(data)
result = data.get("result")
if isinstance(result, Mapping):
view.update(result)
return view
def _scan_assets(scan: Mapping[str, Any]) -> list[str]:
assets = scan.get("assets")
if isinstance(assets, list) and assets:
return [str(a).upper() for a in assets if a is not None]
target = scan.get("target_asset") or scan.get("asset")
return [str(target).upper()] if target else []
def _scan_prices(scan: Mapping[str, Any]) -> list[float]:
prices = scan.get("asset_prices") or scan.get("prices")
if not isinstance(prices, list):
return []
out: list[float] = []
for value in prices:
px = _coerce_float(value, None)
if px is None or px <= 0:
continue
out.append(px)
return out
@dataclass
class LiveBlueScanHistory:
"""Stateful scan history mirror for DC confirmation.
BLUE computes `dc_status` from the active asset price history. This helper keeps
a read-only replay of the published scan stream so VIOLET can reproduce that
state without touching BLUE or inventing a new schema.
"""
maxlen: int = 32
trade_direction: int = -1
_prices: dict[str, Deque[float]] = field(default_factory=dict)
last_scan_number: Optional[int] = None
def ingest_scan(self, scan_payload: Any) -> Mapping[str, Any]:
scan = _scan_view(scan_payload)
assets = _scan_assets(scan)
prices = _scan_prices(scan)
for asset, price in zip(assets, prices):
if price <= 0:
continue
hist = self._prices.setdefault(asset, deque(maxlen=self.maxlen))
hist.append(float(price))
sn = _coerce_float(scan.get("scan_number"), None)
if sn is not None:
self.last_scan_number = int(sn)
return scan
def price_history(self, asset: str) -> list[float]:
return list(self._prices.get(str(asset).upper(), ()))
def market_data(self, lookback: int) -> dict[str, list[float]]:
need = max(1, int(lookback) + 1)
return {asset: list(hist) for asset, hist in self._prices.items() if len(hist) >= need}
def dc_status(self, scan_payload: Any, *, already_ingested: bool = False) -> str:
scan = _scan_view(scan_payload) if already_ingested else self.ingest_scan(scan_payload)
asset = _scan_assets(scan)
if not asset:
return "NONE"
vel_div = _coerce_float(scan.get("vel_div"), None)
if vel_div is None:
return "NONE"
history = self.price_history(asset[0])
if not history:
return "NONE"
signal_gen = AlphaSignalGenerator()
sig = signal_gen.generate(
vel_div=vel_div,
vel_div_history=None,
asset_price_history=history,
trade_direction=self.trade_direction,
asset=asset[0],
current_timestamp=_coerce_float(scan.get("timestamp"), 0.0) or 0.0,
)
return sig.dc_status
class HazelcastOBProvider(OBProvider):
"""Read the current BLUE OB shards directly from Hazelcast."""
def __init__(self, client: hazelcast.HazelcastClient):
self.client = client
def _asset_keys(self) -> list[str]:
try:
keys = self.client.get_map("DOLPHIN_FEATURES").blocking().key_set()
except Exception:
return []
assets = []
for key in keys:
if not isinstance(key, str) or not key.startswith("asset_") or not key.endswith("_ob"):
continue
asset = key[len("asset_"):-len("_ob")]
if asset and asset not in assets:
assets.append(asset)
return sorted(assets)
def _read_snapshot(self, asset: str) -> Optional[OBSnapshot]:
raw = _read_hz_map(self.client, "DOLPHIN_FEATURES", f"asset_{asset}_ob")
data = _jsonish(raw)
if not isinstance(data, Mapping):
return None
bid_notional = np.array(
[_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_notional", [0, 0, 0, 0, 0])][:5],
dtype=np.float64,
)
ask_notional = np.array(
[_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_notional", [0, 0, 0, 0, 0])][:5],
dtype=np.float64,
)
bid_depth = np.array(
[_coerce_float(v, 0.0) or 0.0 for v in data.get("bid_depth", [0, 0, 0, 0, 0])][:5],
dtype=np.float64,
)
ask_depth = np.array(
[_coerce_float(v, 0.0) or 0.0 for v in data.get("ask_depth", [0, 0, 0, 0, 0])][:5],
dtype=np.float64,
)
ts = _coerce_float(data.get("timestamp"), 0.0) or 0.0
if (
bid_notional.shape != (5,) or ask_notional.shape != (5,)
or bid_depth.shape != (5,) or ask_depth.shape != (5,)
):
return None
if np.any(bid_notional < 0) or np.any(ask_notional < 0):
return None
if np.any(bid_depth < 0) or np.any(ask_depth < 0):
return None
return OBSnapshot(
timestamp=ts,
asset=asset,
bid_notional=bid_notional,
ask_notional=ask_notional,
bid_depth=bid_depth,
ask_depth=ask_depth,
)
def get_snapshot(self, asset: str, timestamp: float) -> Optional[OBSnapshot]:
return self._read_snapshot(asset)
def get_assets(self) -> list[str]:
return self._asset_keys()
def get_all_timestamps(self, asset: str) -> np.ndarray:
snap = self._read_snapshot(asset)
if snap is None:
return np.array([], dtype=np.float64)
return np.array([snap.timestamp], dtype=np.float64)
def get_snapshot_count(self, asset: str) -> int:
return 1 if self._read_snapshot(asset) is not None else 0
@dataclass(frozen=True)
class LiveBlueSourceResult:
factors: SizingFactors
acb_boost: float
acb_beta: float
mc_scale: float
posture: str
dc_status: str
selected_asset: str
def source_live_blue_sizing_factors(
client: hazelcast.HazelcastClient,
*,
assets: Optional[Iterable[str]] = None,
scan_history: Optional[LiveBlueScanHistory] = None,
selector: Optional[VioletAssetSelector] = None,
) -> LiveBlueSourceResult:
"""Read the BLUE-published live surfaces and return a typed factor plane."""
scan_history = scan_history or LiveBlueScanHistory()
selector = selector or VioletAssetSelector()
engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "latest_nautilus")
if engine_snapshot_raw is None:
engine_snapshot_raw = _read_hz_map(client, "DOLPHIN_STATE_BLUE", "engine_snapshot")
engine_snapshot = _jsonish(engine_snapshot_raw)
if isinstance(engine_snapshot, str):
try:
engine_snapshot = json.loads(engine_snapshot)
except Exception:
engine_snapshot = {}
posture = posture_from_engine_snapshot(engine_snapshot if isinstance(engine_snapshot, Mapping) else None)
esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_latest")
if esof_raw is None:
esof_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "esof_advisor_latest")
esof_score = esof_score_from_features(esof_raw)
acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost")
if acb_raw is None:
acb_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "acb_boost_short")
acb_boost, acb_beta = _extract_acb(acb_raw)
mc_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "mc_forewarner_latest")
mc_scale = _map_status_to_mc_scale(mc_raw)
scan_raw = _read_hz_map(client, "DOLPHIN_FEATURES", "latest_eigen_scan")
scan = scan_history.ingest_scan(scan_raw)
scan_assets = _scan_assets(scan)
trade_direction = int(
_coerce_float(
engine_snapshot.get("trade_direction_runtime") if isinstance(engine_snapshot, Mapping) else None,
None,
)
or _coerce_float(
engine_snapshot.get("trade_direction_base") if isinstance(engine_snapshot, Mapping) else None,
None,
)
or -1
)
candidate_market = scan_history.market_data(selector.lookback)
pick = selector.pick(candidate_market, regime_direction=trade_direction)
selected_asset = pick.asset if pick is not None else (scan_assets[0] if scan_assets else "")
dc_status = scan_history.dc_status(scan, already_ingested=True)
ob_provider = HazelcastOBProvider(client)
ob_engine = OBFeatureEngine(ob_provider)
ob_assets = list(assets) if assets is not None else (scan_assets or ob_provider.get_assets())
if ob_assets:
try:
ob_engine.step_live(ob_assets, bar_idx=0)
market = ob_engine.get_market(0, ob_assets)
ob_median_imbalance = float(market.median_imbalance)
ob_agreement_pct = float(market.agreement_pct)
except Exception:
ob_median_imbalance = None
ob_agreement_pct = None
else:
ob_median_imbalance = None
ob_agreement_pct = None
hz_snapshot = {
"boost": acb_boost,
"beta": acb_beta,
"mc_scale": mc_scale,
"esof_score": esof_score,
"ob_median_imbalance": ob_median_imbalance,
"ob_agreement_pct": ob_agreement_pct,
"dc_status": dc_status,
"posture": posture,
}
factors = extract_live_sizing_factors(hz_snapshot=hz_snapshot)
return LiveBlueSourceResult(
factors=factors,
acb_boost=acb_boost,
acb_beta=acb_beta,
mc_scale=mc_scale,
posture=posture,
dc_status=dc_status,
selected_asset=selected_asset,
)