From 1e331d80bc4218e6a6e1c64158a90c6f2d25772c Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 13 Jun 2026 18:51:27 +0200 Subject: [PATCH] =?UTF-8?q?VIOLET=20V3c:=20VioletDecisionEngine=20?= =?UTF-8?q?=E2=80=94=20reactor-resident=20SHADOW=20engine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Composes V3a live-kernel wrappers + V3b cadence into a muted decision engine: scans in -> ShadowDecision out, NO execution (distinct from PINK's disabled dita DecisionEngine; runs alongside as pure shadow). Short-regime gate mirrors BLUE/dita (vel_div --- prod/clean_arch/violet/decision_engine.py | 162 ++++++++++++++++++ .../violet/test_violet_decision_engine.py | 111 ++++++++++++ 2 files changed, 273 insertions(+) create mode 100644 prod/clean_arch/violet/decision_engine.py create mode 100644 prod/clean_arch/violet/test_violet_decision_engine.py diff --git a/prod/clean_arch/violet/decision_engine.py b/prod/clean_arch/violet/decision_engine.py new file mode 100644 index 0000000..ef524f3 --- /dev/null +++ b/prod/clean_arch/violet/decision_engine.py @@ -0,0 +1,162 @@ +"""VIOLET V3c: VioletDecisionEngine — reactor-resident SHADOW decision engine. + +Composes the L1 alpha wrappers (V3a, live-kernel-faithful) with the Cadence Control +Plane (V3b) into a reactor-resident engine that, on each scan, produces a shadow +``ShadowDecision`` — and NOTHING ELSE. No venue, no intent, no execution: the decision +brain is online but MUTED (the V3 mandate). Execution stays off in the separate, +disabled ``PinkDirectRuntime`` path behind the ObserveOnlyVenue guard. + +This is a NEW engine, distinct from ``prod.clean_arch.dita.decision.DecisionEngine`` +(PINK's, built on the untrusted ``blue_parity`` distillation and disabled in V1). V3 +models LIVE BLUE via the parity-validated wrappers, runs as a pure shadow alongside, +and feeds the V3d parity harness. + +Decision contract mirrors BLUE/dita gating (short-regime): a decision fires only when +``vel_div < entry_threshold`` AND ``vol_ok`` AND the IRP selector returns a survivor; +sizing is the cubic-convex conviction leverage × alpha fraction. ``target_exposure = +capital × fraction × conviction_leverage`` (the conviction side of the dual-leverage — +exchange leverage is L3, never here). Actuation is gated by the cadence knobs for +ENTRY/SIZING (Q=scan by default); evaluation happens every call (shadow-logged). +""" + +from __future__ import annotations + +from collections import deque +from typing import Deque, Dict, List, Optional + +from pydantic import Field + +from .alpha_wrappers import AssetPick, SizeDecision, VioletAssetSelector, VioletBetSizer +from .cadence import Action, CadenceControlPlane +from .domain import StrictModel, Symbol, typed + + +class ShadowDecision(StrictModel): + """One muted decision — what BLUE *would* do this scan. Never executed.""" + + ts_ns: int = Field(ge=0) + scan_number: int = Field(ge=0) + asset: Symbol + side: str + vel_div: float = Field(allow_inf_nan=False) + fraction: float = Field(ge=0.0, allow_inf_nan=False) + conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) + notional_fraction: float = Field(ge=0.0, allow_inf_nan=False) + target_exposure: float = Field(ge=0.0, allow_inf_nan=False) + ars_score: float = Field(allow_inf_nan=False) + bucket_idx: int = Field(ge=0, le=3) + actuated: bool + + +class VioletDecisionEngine: + """Reactor-resident shadow engine: scans in, ShadowDecisions out (no execution). + + ``max_leverage`` (and the other sizer knobs) are EXPLICIT — pinned from live BLUE + by the parity harness, never a drifted default (V3a doctrine). + """ + + def __init__( + self, + *, + control_plane: Optional[CadenceControlPlane] = None, + lookback: int = 0, + min_alignment: float = 0.0, + base_fraction: float = 0.20, + min_leverage: float = 0.5, + max_leverage: float = 9.0, + entry_vel_div_threshold: float = -0.02, + regime_direction: int = -1, + ): + self.cp = control_plane or CadenceControlPlane() + self.selector = VioletAssetSelector(lookback_horizon=lookback, min_alignment=min_alignment) + self.sizer = VioletBetSizer( + base_fraction=base_fraction, min_leverage=min_leverage, + max_leverage=max_leverage, vel_div_threshold=entry_vel_div_threshold, + ) + self.entry_threshold = float(entry_vel_div_threshold) + self.regime_direction = int(regime_direction) + self.lookback = int(lookback) if lookback > 0 else self.selector.lookback + + # per-asset rolling price history (scan-accumulated; bookkeeping, not alpha) + self._history: Dict[str, Deque[float]] = {} + self._last_scan_number = -1 + self._last_entry_actuation_ns: Optional[int] = None + # shadow-delta telemetry (evaluate vs actuate) + self.evaluations = 0 + self.actuations = 0 + self.suppressed_by_cadence = 0 + + # ── scan accumulation (mirrors PinkAssetPicker.observe bookkeeping) ────────── + + def observe(self, scan_payload: Optional[dict], scan_number: int) -> bool: + """Append one bar per NEW scan. Dedupe on scan_number (poll > scan rate).""" + payload = scan_payload or {} + sn = int(scan_number or 0) + if sn <= self._last_scan_number: + return False + assets = payload.get("assets") or [] + prices = payload.get("asset_prices") or [] + if not (isinstance(assets, list) and isinstance(prices, list) and assets): + return False + maxlen = self.lookback + 1 + for asset, price in zip(assets, prices): + try: + px = float(price) + except (TypeError, ValueError): + continue + if px <= 0: + continue + sym = str(asset).upper() + hist = self._history.get(sym) + if hist is None: + hist = deque(maxlen=maxlen) + self._history[sym] = hist + hist.append(px) + self._last_scan_number = sn + return True + + def _market_data(self) -> Dict[str, List[float]]: + need = self.lookback + 1 + return {s: list(h) for s, h in self._history.items() if len(h) >= need} + + # ── the muted decision ────────────────────────────────────────────────────── + + @typed + def decide( + self, *, now_ns: int, scan_number: int, capital: float, + vel_div: float, vol_ok: bool = True, + ) -> Optional[ShadowDecision]: + """Evaluate the would-be decision (always); actuate only when ENTRY cadence + is due. Returns the ShadowDecision when a short signal fires, else None. + + ``vel_div`` is the chosen-asset velocity-divergence signal (the entry gate); + in V3e wiring it is extracted from the scan, as dita's ``fields.vdiv`` is. + """ + self.evaluations += 1 + + # BLUE/dita short-regime gate: no signal unless vel_div below threshold + vol_ok. + if vel_div >= self.entry_threshold or not vol_ok: + return None + pick: Optional[AssetPick] = self.selector.pick(self._market_data(), self.regime_direction) + if pick is None: + return None + + # cadence: evaluate-always, actuate-at-Q (ENTRY knob). + actuate = self.cp.due(Action.ENTRY, now_ns, self._last_entry_actuation_ns) + if not actuate: + self.suppressed_by_cadence += 1 + return None + self._last_entry_actuation_ns = int(now_ns) + self.actuations += 1 + + size: SizeDecision = self.sizer.calculate( + capital=capital, vel_div=vel_div, trade_direction=self.regime_direction, + ) + return ShadowDecision( + ts_ns=int(now_ns), scan_number=int(scan_number), + asset=pick.asset, side=pick.side, vel_div=float(vel_div), + fraction=size.fraction, conviction_leverage=size.conviction_leverage, + notional_fraction=size.notional_fraction, + target_exposure=float(capital) * size.notional_fraction, + ars_score=pick.ars_score, bucket_idx=size.bucket_idx, actuated=True, + ) diff --git a/prod/clean_arch/violet/test_violet_decision_engine.py b/prod/clean_arch/violet/test_violet_decision_engine.py new file mode 100644 index 0000000..9c3ea94 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_decision_engine.py @@ -0,0 +1,111 @@ +"""V3c: VioletDecisionEngine — muted-decision gating, cadence suppression, no exec.""" + +from __future__ import annotations + +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.violet.cadence import Action, CadenceControlPlane, INSTA_Q_NS, SCAN_Q_NS +from prod.clean_arch.violet.decision_engine import ShadowDecision, VioletDecisionEngine + +LOOKBACK = 5 + + +def _engine(cp=None, **kw): + return VioletDecisionEngine(control_plane=cp, lookback=LOOKBACK, **kw) + + +def _warm(engine, n_scans: int = 8): + """Feed a short-favorable universe (downtrending) so the selector warms + ranks.""" + assets = ["AAAUSDT", "BBBUSDT", "CCCUSDT"] + for s in range(n_scans): + prices = [100.0 * (1 - 0.002 * s - 0.0005 * i) for i in range(len(assets))] + engine.observe({"assets": assets, "asset_prices": prices}, scan_number=s + 1) + + +def test_short_signal_produces_shadow_decision(): + e = _engine() + _warm(e) + d = e.decide(now_ns=10**12, scan_number=99, capital=69_000.0, vel_div=-0.20, vol_ok=True) + assert d is None or isinstance(d, ShadowDecision) + if d is not None: + assert d.side == "SHORT" + assert d.target_exposure == pytest.approx(69_000.0 * d.notional_fraction) + assert d.actuated is True + + +def test_no_decision_when_vel_div_above_threshold(): + e = _engine() + _warm(e) + # vel_div above entry threshold (-0.02) → no short signal + assert e.decide(now_ns=10**12, scan_number=1, capital=69_000.0, vel_div=0.05) is None + + +def test_no_decision_when_vol_gate_blocks(): + e = _engine() + _warm(e) + assert e.decide(now_ns=10**12, scan_number=1, capital=69_000.0, vel_div=-0.20, vol_ok=False) is None + + +def test_no_decision_before_universe_warm(): + e = _engine() + e.observe({"assets": ["AAAUSDT"], "asset_prices": [100.0]}, scan_number=1) # 1 bar << lookback + assert e.decide(now_ns=10**12, scan_number=1, capital=69_000.0, vel_div=-0.20) is None + + +def test_cadence_suppresses_repeat_within_quantum(): + cp = CadenceControlPlane() # ENTRY default Q = scan (5s) + e = _engine(cp=cp) + _warm(e) + t0 = 10**12 + d1 = e.decide(now_ns=t0, scan_number=10, capital=69_000.0, vel_div=-0.20) + # second call 1ms later: within the scan quantum → suppressed (no actuation) + d2 = e.decide(now_ns=t0 + 1_000_000, scan_number=11, capital=69_000.0, vel_div=-0.20) + if d1 is not None: + assert d2 is None + assert e.suppressed_by_cadence >= 1 + # after a full scan quantum elapses → actuates again + d3 = e.decide(now_ns=t0 + SCAN_Q_NS, scan_number=12, capital=69_000.0, vel_div=-0.20) + if d1 is not None: + assert d3 is not None + + +def test_insta_cadence_actuates_every_call(): + cp = CadenceControlPlane() + cp.set(Action.ENTRY, q_ns=INSTA_Q_NS) + e = _engine(cp=cp) + _warm(e) + t = 10**12 + d1 = e.decide(now_ns=t, scan_number=1, capital=69_000.0, vel_div=-0.20) + d2 = e.decide(now_ns=t + 1, scan_number=2, capital=69_000.0, vel_div=-0.20) + if d1 is not None: + assert d2 is not None # insta → no suppression + + +def test_evaluate_always_ge_actuate_invariant(): + e = _engine() + _warm(e) + t = 10**12 + for k in range(5): + e.decide(now_ns=t + k * 1_000_000, scan_number=20 + k, capital=69_000.0, vel_div=-0.20) + assert e.evaluations >= e.actuations + + +def test_engine_holds_no_venue_or_kernel(): + # structural no-execution guarantee: the shadow engine has no venue/kernel/submit. + e = _engine() + for attr in ("venue", "kernel", "submit", "submit_intent", "execute"): + assert not hasattr(e, attr) + + +def test_determinism_same_inputs_same_decision(): + e1, e2 = _engine(), _engine() + _warm(e1); _warm(e2) + d1 = e1.decide(now_ns=10**12, scan_number=5, capital=69_000.0, vel_div=-0.20) + d2 = e2.decide(now_ns=10**12, scan_number=5, capital=69_000.0, vel_div=-0.20) + assert (d1 is None) == (d2 is None) + if d1 is not None: + assert d1.model_dump() == d2.model_dump()