diff --git a/prod/clean_arch/dita_v2/blue_parity.py b/prod/clean_arch/dita_v2/blue_parity.py new file mode 100644 index 0000000..a50f76f --- /dev/null +++ b/prod/clean_arch/dita_v2/blue_parity.py @@ -0,0 +1,254 @@ +"""BLUE-parity alpha components for the PINK DITAv2 runtime. + +PINK must trade the SAME universe with the SAME sizing curve as BLUE +(operator mandate 2026-06-10). This module wraps BLUE's exact production +kernels — no reimplementation, no drift: + + - ``AlphaAssetSelector`` (nautilus_dolphin/nautilus/alpha_asset_selector.py) + IRP ranking over the scan universe (SYSTEM BIBLE §5). The scan payload + carries ``assets`` + ``asset_prices`` for the full ~50-asset universe on + every scan; PINK accumulates per-asset price history here and ranks at + signal time, exactly like BLUE's engine does. + + - ``AlphaBetSizer`` (nautilus_dolphin/nautilus/alpha_bet_sizer.py) + Cubic-convex dynamic leverage + alpha-layer fraction (SYSTEM BIBLE §6). + Replaces the DITAv2 stub formula that saturated at max_leverage on every + entry (confidence = |vdiv/threshold| ≥ 1 whenever an ENTER is possible). + +Both wrappers are observe()-driven and dedupe on scan_number so the 1 s poll +loop cannot poison histories with repeated scans. + +DUAL-LEVERAGE INVARIANT (prod/docs/FRACTIONAL_LEVERAGE_TO_BINGX_FIX.md, +prod/bingx/leverage.py): the fractional leverage produced here is STRATEGY +conviction — it sizes the quantity. At-exchange leverage is derived from it +at the venue boundary via map_internal_conviction_to_exchange_leverage() +(linear [0.5, 9.0] → [1, cap], bankers rounding, security cap). +""" + +from __future__ import annotations + +import logging +import sys +from collections import deque +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +LOGGER = logging.getLogger("PinkBlueParity") + +_PROJECT_ROOT = str(Path(__file__).resolve().parents[3]) + + +def _import_blue_modules() -> Tuple[Any, Any]: + """Import BLUE's selector/sizer modules, adding their root if needed. + + The real package lives at /nautilus_dolphin/nautilus_dolphin/ — + BLUE runs with /nautilus_dolphin on sys.path (see + nautilus_event_trader.py's proxy_boost_engine import). + """ + try: + from nautilus_dolphin.nautilus import alpha_asset_selector, alpha_bet_sizer + except ImportError: + pkg_root = str(Path(_PROJECT_ROOT) / "nautilus_dolphin") + for p in (pkg_root, _PROJECT_ROOT): + if p not in sys.path: + sys.path.insert(0, p) + # A namespace-package hit from /nautilus_dolphin (no __init__) + # can shadow the real package — drop it so the retry resolves cleanly. + sys.modules.pop("nautilus_dolphin", None) + from nautilus_dolphin.nautilus import alpha_asset_selector, alpha_bet_sizer + return alpha_asset_selector, alpha_bet_sizer + + +class PinkAssetPicker: + """Accumulate universe price history from scan payloads; rank via BLUE IRP. + + BLUE semantics preserved: + - IRP lookback default 50 bars (alpha_asset_selector.IRP_LOOKBACK). + - Kernel gates (noise>500, latency>20, alignment<0.20) apply inside + rank_assets_irp_nb; min_irp_alignment=0.0 matches BLUE gold spec. + - No fallback asset when ranking returns no candidate (Bible §5.5: + "No match → return None"). + """ + + def __init__(self, lookback: int = 0, min_irp_alignment: float = 0.0): + selector_mod, _ = _import_blue_modules() + self._selector_mod = selector_mod + self.lookback = int(lookback) if lookback > 0 else int(selector_mod.IRP_LOOKBACK) + self.min_irp_alignment = float(min_irp_alignment) + self._selector = selector_mod.AlphaAssetSelector(lookback_horizon=self.lookback) + # Per-asset rolling price history. lookback+1 prices = lookback returns. + self._history: Dict[str, deque] = {} + self._last_price: Dict[str, float] = {} + self._last_scan_number: int = -1 + self.scans_observed: int = 0 + + # ── feeding ────────────────────────────────────────────────────────────── + + def observe(self, scan_payload: Optional[dict], scan_number: int) -> bool: + """Append one bar per NEW scan. Returns True if the bar was accepted.""" + payload = scan_payload or {} + sn = int(scan_number or 0) + if sn <= self._last_scan_number: + return False # duplicate / stale scan — poll loop runs faster than scans + assets = payload.get("assets") or [] + prices = payload.get("asset_prices") or [] + if not (isinstance(assets, list) and isinstance(prices, list) and assets): + return False + maxlen = self.lookback + 1 + for asset, price in zip(assets, prices): + try: + px = float(price) + except (TypeError, ValueError): + continue + if px <= 0: + continue + sym = str(asset).upper() + hist = self._history.get(sym) + if hist is None: + hist = deque(maxlen=maxlen) + self._history[sym] = hist + hist.append(px) + self._last_price[sym] = px + self._last_scan_number = sn + self.scans_observed += 1 + return True + + # ── queries ────────────────────────────────────────────────────────────── + + @property + def warm(self) -> bool: + """True once at least one asset has a full lookback window.""" + need = self.lookback + 1 + return any(len(h) >= need for h in self._history.values()) + + def price_of(self, asset: str) -> Optional[float]: + key = str(asset).upper() + px = self._last_price.get(key) + if px is None: + # Venue-format tolerance: reconcile/fill paths write the BingX + # symbol ("FET-USDT") into the slot, while the scan universe keys + # are Binance-style ("FETUSDT"). Without this fallback an open + # slot adopted from the exchange has no price and every policy + # step degrades to HOLD — TP/SL/MAX_HOLD never fire (2026-06-11). + px = self._last_price.get(key.replace("-", "")) + return px + + def pick(self, direction: int = -1) -> Optional[Tuple[str, float, float]]: + """Rank the warm universe; return (asset, last_price, ars) or None. + + ``direction`` is the regime direction (-1 = short regime; PINK is + short-only). BLUE walks the ranked list applying the alignment and + direction gates; first survivor wins. + """ + need = self.lookback + 1 + market_data: Dict[str, List[float]] = { + sym: list(h) for sym, h in self._history.items() if len(h) >= need + } + if not market_data: + return None + try: + rankings = self._selector.rank_assets(market_data, regime_direction=int(direction)) + except Exception as exc: + LOGGER.warning("IRP ranking failed: %s — no asset picked", exc) + return None + expected_action = "SHORT" if int(direction) == -1 else "LONG" + for r in rankings: + if self.min_irp_alignment > 0 and r.metrics.alignment < self.min_irp_alignment: + continue + if r.action != expected_action: + continue + px = self._last_price.get(r.asset) + if px is None or px <= 0: + continue + return r.asset, px, float(r.ars_score) + return None + + +class PinkAlphaSizer: + """BLUE's AlphaBetSizer with vel_div-trend tracking and trade feedback. + + calculate_size() output is BLUE-identical: + strength_score = clamp((threshold − vel_div)/(threshold − extreme), 0, 1) + eff_leverage = min_lev + strength³ × (max_lev − min_lev) (convexity 3) + eff_fraction = alpha layers (bucket boost, streak, trend, extreme) + + vd_trend = vel_div[-1] − vel_div[-10] over the last 10 SCANS (Bible §6.4), + maintained here via observe(); the decision layer only passes capital and + the current vel_div. + """ + + def __init__( + self, + *, + base_fraction: float = 0.20, + min_leverage: float = 0.5, + max_leverage: float = 8.0, + leverage_convexity: float = 3.0, + vel_div_threshold: float = -0.02, + vel_div_extreme: float = -0.05, + use_dynamic_leverage: bool = True, + use_alpha_layers: bool = True, + ): + _, sizer_mod = _import_blue_modules() + self._sizer = sizer_mod.AlphaBetSizer( + base_fraction=base_fraction, + min_leverage=min_leverage, + max_leverage=max_leverage, + leverage_convexity=leverage_convexity, + vel_div_threshold=vel_div_threshold, + vel_div_extreme=vel_div_extreme, + use_dynamic_leverage=use_dynamic_leverage, + use_alpha_layers=use_alpha_layers, + ) + self.min_leverage = float(min_leverage) + self.max_leverage = float(max_leverage) + self._vd_history: deque = deque(maxlen=10) + self._last_scan_number: int = -1 + # Bucket of the most recent calculate_size(); frozen by note_entry() + # so record_close() credits the right bucket even after later scans. + self._pending_bucket: int = 3 + self._open_bucket: Optional[int] = None + + def observe(self, vel_div: Any, scan_number: int) -> None: + sn = int(scan_number or 0) + if sn <= self._last_scan_number: + return + try: + vd = float(vel_div) + except (TypeError, ValueError): + return + self._vd_history.append(vd) + self._last_scan_number = sn + + @property + def vd_trend(self) -> float: + if len(self._vd_history) >= 10: + return float(self._vd_history[-1] - self._vd_history[0]) + return 0.0 + + def calculate_size(self, *, capital: float, vel_div: float) -> dict: + result = self._sizer.calculate_size( + capital=float(capital), + vel_div=float(vel_div), + vel_div_trend=self.vd_trend, + trade_direction=-1, + ) + self._pending_bucket = int(result.get("bucket_idx", 3)) + return result + + # ── trade feedback (bucket boost / streak multiplier inputs) ───────────── + + def note_entry(self) -> None: + """An ENTER sized by the last calculate_size() was actually submitted.""" + self._open_bucket = self._pending_bucket + + def record_close(self, pnl: float) -> None: + """Feed realized PnL of the closed trade back into the alpha layers.""" + bucket = self._open_bucket + self._open_bucket = None + if bucket is None: + return + try: + self._sizer.record_trade(int(bucket), float(pnl)) + except Exception as exc: + LOGGER.warning("sizer record_trade failed: %s", exc)