"""VIOLET V3c: VioletDecisionEngine — reactor-resident SHADOW decision engine. Composes the L1 alpha wrappers (V3a, live-kernel-faithful) with the Cadence Control Plane (V3b) into a reactor-resident engine that, on each scan, produces a shadow ``ShadowDecision`` — and NOTHING ELSE. No venue, no intent, no execution: the decision brain is online but MUTED (the V3 mandate). Execution stays off in the separate, disabled ``PinkDirectRuntime`` path behind the ObserveOnlyVenue guard. This is a NEW engine, distinct from ``prod.clean_arch.dita.decision.DecisionEngine`` (PINK's, built on the untrusted ``blue_parity`` distillation and disabled in V1). V3 models LIVE BLUE via the parity-validated wrappers, runs as a pure shadow alongside, and feeds the V3d parity harness. Decision contract mirrors BLUE/dita gating (short-regime): a decision fires only when ``vel_div < entry_threshold`` AND ``vol_ok`` AND the IRP selector returns a survivor; sizing is the cubic-convex conviction leverage × alpha fraction. ``target_exposure = capital × fraction × conviction_leverage`` (the conviction side of the dual-leverage — exchange leverage is L3, never here). Actuation is gated by the cadence knobs for ENTRY/SIZING (Q=scan by default); evaluation happens every call (shadow-logged). """ from __future__ import annotations from collections import deque from typing import Deque, Dict, List, Optional from pydantic import Field from .alpha_wrappers import AssetPick, SizeDecision, VioletAssetSelector, VioletBetSizer from .cadence import Action, CadenceControlPlane from .domain import StrictModel, Symbol, typed class ShadowDecision(StrictModel): """One muted decision — what BLUE *would* do this scan. Never executed.""" ts_ns: int = Field(ge=0) scan_number: int = Field(ge=0) asset: Symbol side: str vel_div: float = Field(allow_inf_nan=False) fraction: float = Field(ge=0.0, allow_inf_nan=False) conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) notional_fraction: float = Field(ge=0.0, allow_inf_nan=False) target_exposure: float = Field(ge=0.0, allow_inf_nan=False) ars_score: float = Field(allow_inf_nan=False) bucket_idx: int = Field(ge=0, le=3) actuated: bool class VioletDecisionEngine: """Reactor-resident shadow engine: scans in, ShadowDecisions out (no execution). ``max_leverage`` (and the other sizer knobs) are EXPLICIT — pinned from live BLUE by the parity harness, never a drifted default (V3a doctrine). """ def __init__( self, *, control_plane: Optional[CadenceControlPlane] = None, lookback: int = 0, min_alignment: float = 0.0, base_fraction: float = 0.20, min_leverage: float = 0.5, max_leverage: float = 9.0, entry_vel_div_threshold: float = -0.02, regime_direction: int = -1, ): self.cp = control_plane or CadenceControlPlane() self.selector = VioletAssetSelector(lookback_horizon=lookback, min_alignment=min_alignment) self.sizer = VioletBetSizer( base_fraction=base_fraction, min_leverage=min_leverage, max_leverage=max_leverage, vel_div_threshold=entry_vel_div_threshold, ) self.entry_threshold = float(entry_vel_div_threshold) self.regime_direction = int(regime_direction) self.lookback = int(lookback) if lookback > 0 else self.selector.lookback # per-asset rolling price history (scan-accumulated; bookkeeping, not alpha) self._history: Dict[str, Deque[float]] = {} self._last_scan_number = -1 self._last_entry_actuation_ns: Optional[int] = None # shadow-delta telemetry (evaluate vs actuate) self.evaluations = 0 self.actuations = 0 self.suppressed_by_cadence = 0 # ── scan accumulation (mirrors PinkAssetPicker.observe bookkeeping) ────────── def observe(self, scan_payload: Optional[dict], scan_number: int) -> bool: """Append one bar per NEW scan. Dedupe on scan_number (poll > scan rate).""" payload = scan_payload or {} sn = int(scan_number or 0) if sn <= self._last_scan_number: return False assets = payload.get("assets") or [] prices = payload.get("asset_prices") or [] if not (isinstance(assets, list) and isinstance(prices, list) and assets): return False maxlen = self.lookback + 1 for asset, price in zip(assets, prices): try: px = float(price) except (TypeError, ValueError): continue if px <= 0: continue sym = str(asset).upper() hist = self._history.get(sym) if hist is None: hist = deque(maxlen=maxlen) self._history[sym] = hist hist.append(px) self._last_scan_number = sn return True def _market_data(self) -> Dict[str, List[float]]: need = self.lookback + 1 return {s: list(h) for s, h in self._history.items() if len(h) >= need} # ── the muted decision ────────────────────────────────────────────────────── @typed def decide( self, *, now_ns: int, scan_number: int, capital: float, vel_div: float, vol_ok: bool = True, ) -> Optional[ShadowDecision]: """Evaluate the would-be decision (always); actuate only when ENTRY cadence is due. Returns the ShadowDecision when a short signal fires, else None. ``vel_div`` is the chosen-asset velocity-divergence signal (the entry gate); in V3e wiring it is extracted from the scan, as dita's ``fields.vdiv`` is. """ self.evaluations += 1 # BLUE/dita short-regime gate: no signal unless vel_div below threshold + vol_ok. if vel_div >= self.entry_threshold or not vol_ok: return None pick: Optional[AssetPick] = self.selector.pick(self._market_data(), self.regime_direction) if pick is None: return None # cadence: evaluate-always, actuate-at-Q (ENTRY knob). actuate = self.cp.due(Action.ENTRY, now_ns, self._last_entry_actuation_ns) if not actuate: self.suppressed_by_cadence += 1 return None self._last_entry_actuation_ns = int(now_ns) self.actuations += 1 size: SizeDecision = self.sizer.calculate( capital=capital, vel_div=vel_div, trade_direction=self.regime_direction, ) return ShadowDecision( ts_ns=int(now_ns), scan_number=int(scan_number), asset=pick.asset, side=pick.side, vel_div=float(vel_div), fraction=size.fraction, conviction_leverage=size.conviction_leverage, notional_fraction=size.notional_fraction, target_exposure=float(capital) * size.notional_fraction, ars_score=pick.ars_score, bucket_idx=size.bucket_idx, actuated=True, )