diff --git a/prod/clean_arch/violet/shadow_live_factors.py b/prod/clean_arch/violet/shadow_live_factors.py new file mode 100644 index 0000000..7889622 --- /dev/null +++ b/prod/clean_arch/violet/shadow_live_factors.py @@ -0,0 +1,79 @@ +"""VIOLET launcher shadow helpers for live BLUE factor sourcing. + +These helpers stay separate from the launcher module so they can be unit-tested +without importing the full launcher import chain. +""" + +from __future__ import annotations + +import logging +import os + +LOGGER = logging.getLogger(__name__) + + +def build_shadow_live_source( + *, + client_factory=None, + selector_factory=None, + source_factory=None, + scan_history_factory=None, +): + """Create the read-only BLUE live-factor mirror for the shadow path.""" + if client_factory is None or selector_factory is None or source_factory is None or scan_history_factory is None: + import hazelcast + + from .alpha_wrappers import VioletAssetSelector + from .live_blue_source import LiveBlueScanHistory, source_live_blue_sizing_factors + + client_factory = client_factory or (lambda: hazelcast.HazelcastClient( + cluster_name=os.environ.get("HZ_CLUSTER", "dolphin"), + cluster_members=[os.environ.get("HZ_HOST", "localhost:5701")], + )) + selector_factory = selector_factory or VioletAssetSelector + source_factory = source_factory or source_live_blue_sizing_factors + scan_history_factory = scan_history_factory or LiveBlueScanHistory + + client = client_factory() + return { + "client": client, + "scan_history": scan_history_factory(), + "selector": selector_factory(), + "live_source": source_factory, + } + + +def shadow_decision_step( + shadow: dict, + payload: dict, + *, + scan_number: int, + now_ns: int, + vel_div: float, + vol_ok: bool, +) -> bool: + """Run one shadow decision against the live BLUE factor plane.""" + shadow["engine"].observe(payload, scan_number) + live_source = shadow.get("live_source") + factors = None + if live_source is not None: + live_result = live_source( + shadow["client"], + scan_history=shadow["scan_history"], + selector=shadow["selector"], + ) + shadow["last_live_source"] = live_result + factors = live_result.factors + if factors is None: + return False + decision = shadow["engine"].decide( + now_ns=now_ns, + scan_number=scan_number, + capital=shadow["capital"], + vel_div=vel_div, + vol_ok=vol_ok, + factors=factors, + ) + if decision is None: + return False + return shadow["journal"].journal(decision, mono_ns=now_ns) diff --git a/prod/clean_arch/violet/test_violet_launcher_shadow_live_factors.py b/prod/clean_arch/violet/test_violet_launcher_shadow_live_factors.py new file mode 100644 index 0000000..620176f --- /dev/null +++ b/prod/clean_arch/violet/test_violet_launcher_shadow_live_factors.py @@ -0,0 +1,155 @@ +"""V3.4b launcher shadow wiring — live BLUE factor plane is mandatory.""" + +from __future__ import annotations + +import sys +from types import SimpleNamespace + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +from prod.clean_arch.violet.decision_engine import ShadowDecision, SizingFactors +from prod.clean_arch.violet.shadow_journal import VioletDecisionJournal + + +def test_build_shadow_includes_live_factor_source(): + from prod.clean_arch.violet import shadow_live_factors as slf + + class FakeClient: + pass + + shadow = slf.build_shadow_live_source( + client_factory=lambda: FakeClient(), + selector_factory=lambda: object(), + source_factory=lambda client, scan_history, selector: object(), + scan_history_factory=lambda: object(), + ) + assert isinstance(shadow["client"], FakeClient) + assert shadow["live_source"] is not None + assert shadow["scan_history"] is not None + assert shadow["selector"] is not None + + +def test_build_shadow_propagates_client_factory_failure(): + from prod.clean_arch.violet import shadow_live_factors as slf + + try: + slf.build_shadow_live_source( + client_factory=lambda: (_ for _ in ()).throw(RuntimeError("hz down")), + selector_factory=lambda: object(), + source_factory=lambda client, scan_history, selector: object(), + scan_history_factory=lambda: object(), + ) + except RuntimeError as exc: + assert "hz down" in str(exc) + else: + raise AssertionError("expected live source failure") + + +def test_shadow_decision_step_uses_live_factors_and_journals(): + from prod.clean_arch.violet import shadow_live_factors as slf + + observed = [] + decided = [] + journal_rows = [] + + class FakeEngine: + def observe(self, payload, scan_number): + observed.append((scan_number, payload["vel_div"])) + + def decide(self, **kwargs): + decided.append(kwargs) + factors = kwargs["factors"] + assert isinstance(factors, SizingFactors) + assert factors.posture == "APEX" + return ShadowDecision( + ts_ns=kwargs["now_ns"], + scan_number=kwargs["scan_number"], + asset="BTCUSDT", + side="SHORT", + vel_div=kwargs["vel_div"], + fraction=0.2, + conviction_leverage=3.0, + notional_fraction=0.6, + target_exposure=41400.0, + ars_score=1.23, + bucket_idx=1, + actuated=True, + base_leverage=1.0, + dc_lev_mult=1.0, + regime_size_mult=1.0, + market_ob_mult=1.0, + esof_size_mult=1.0, + ) + + shadow = { + "engine": FakeEngine(), + "journal": VioletDecisionJournal( + sink=lambda table, row: journal_rows.append((table, row)), + session_id="sess", + ), + "capital": 69_000.0, + "mono_ns": lambda: 123, + "client": object(), + "scan_history": object(), + "selector": object(), + "live_source": lambda client, scan_history, selector: SimpleNamespace( + factors=SizingFactors( + boost=1.4, + beta=0.2, + mc_scale=0.5, + esof_score=0.42, + ob_median_imbalance=0.12, + ob_agreement_pct=0.91, + dc_status="CONFIRM", + posture="APEX", + ), + selected_asset="BTCUSDT", + ), + "live_decisions": 0, + "last_live_source": None, + } + payload = {"vel_div": -0.031, "vol_ok": True} + ok = slf.shadow_decision_step( + shadow, + payload, + scan_number=7, + now_ns=123, + vel_div=-0.031, + vol_ok=True, + ) + assert ok is True + assert observed == [(7, -0.031)] + assert decided and decided[0]["factors"].dc_status == "CONFIRM" + assert shadow["last_live_source"].selected_asset == "BTCUSDT" + assert journal_rows and journal_rows[0][0] == "violet_decisions" + + +def test_shadow_decision_step_skips_without_live_factor_plane(): + from prod.clean_arch.violet import shadow_live_factors as slf + + class FailEngine: + def observe(self, payload, scan_number): + pass + + def decide(self, **kwargs): + raise AssertionError("must not fall back to base-only") + + shadow = { + "engine": FailEngine(), + "journal": VioletDecisionJournal(sink=lambda table, row: None, session_id="sess"), + "capital": 69_000.0, + "mono_ns": lambda: 123, + "client": object(), + "scan_history": object(), + "selector": object(), + "live_source": None, + } + ok = slf.shadow_decision_step( + shadow, + {"vel_div": -0.031, "vol_ok": True}, + scan_number=7, + now_ns=123, + vel_div=-0.031, + vol_ok=True, + ) + assert ok is False diff --git a/prod/docs/RECENT_VIOLET_34D_pending.md b/prod/docs/RECENT_VIOLET_34D_pending.md new file mode 100644 index 0000000..be7669d --- /dev/null +++ b/prod/docs/RECENT_VIOLET_34D_pending.md @@ -0,0 +1,102 @@ +# RECENT_VIOLET_34D_pending + +## What This Work Was + +This pass continued the VIOLET plan after `V3.4c` by wiring the launcher-side +shadow path to the live BLUE factor plane. + +The goal stayed read-only. BLUE code, BLUE schemas, and BLUE data structures +were not modified. The change was entirely on the VIOLET side. + +## Scope + +This pass added a thin shadow-side live-factor attachment and a focused test +surface: + +- `prod/clean_arch/violet/shadow_live_factors.py` +- `prod/clean_arch/violet/test_violet_launcher_shadow_live_factors.py` +- `prod/launch_dolphin_violet.py` + +It reused the existing V3.4c live BLUE source adapter: + +- `prod/clean_arch/violet/live_blue_source.py` +- `prod/clean_arch/violet/live_factor_source.py` +- `prod/clean_arch/violet/live_factors.py` + +## Why This Was Needed + +Before this pass, the Violet shadow launcher still had a base-only decision +path. That meant the `VioletDecisionEngine` could produce a muted decision, but +it did not yet receive the full live factor plane from BLUE’s published +surfaces inside the launcher path. + +The missing piece was the launcher-side wiring: source live BLUE factors, +thread them into `decide(...)`, and keep the journaled breakdown faithful to the +same factor plane BLUE would have seen at that scan. + +## What Was Added + +### 1. Shadow live-factor helper + +`shadow_live_factors.py` now provides two small helpers: + +- `build_shadow_live_source(...)` +- `shadow_decision_step(...)` + +`build_shadow_live_source(...)` assembles the read-only BLUE live factor mirror +for the shadow path. The helper keeps imports lazy so it can be unit-tested +without dragging in the full launcher import chain. + +`shadow_decision_step(...)` runs one muted shadow decision using the live BLUE +factor plane, then journals the result when the decision is actuated. + +### 2. Launcher wiring + +`launch_dolphin_violet.py` now: + +- builds the live-factor shadow source when shadow mode is enabled +- passes the live `SizingFactors` into `VioletDecisionEngine.decide(...)` +- skips the shadow decision instead of silently falling back to base-only when + the live factor plane is missing +- keeps the existing journal path intact + +This preserves the existing muted-shadow architecture while making the shadow +decision reflect the live BLUE factor plane rather than a reduced fallback. + +### 3. Focused tests + +`test_violet_launcher_shadow_live_factors.py` covers: + +- the live-factor helper contract +- failure propagation from the client factory +- the shadow decision step with live factors and journaling +- the no-live-factor skip path + +Because the mount was slow under `pytest`, I verified the helper path directly +with a small execution script instead of waiting on long file-system waits. + +## Exactness Rules Followed + +The pass stayed conservative: + +- no BLUE edits +- no schema edits +- no live fallback to a fake factor plane +- no execution path changes outside the muted shadow branch +- no silent loss of the live factor breakdown + +## Verification + +Direct runtime check: + +- the new helper built successfully with injected factories +- the shadow decision step accepted the live factor plane +- the decision was journaled with the expected Violet journal table + +Observed direct result: + +- `ok` + +`pytest` on this mount was slow and repeatedly stalled in netfs waits, so I did +not treat that as a code failure. + diff --git a/prod/launch_dolphin_violet.py b/prod/launch_dolphin_violet.py index d59d93c..732b79b 100644 --- a/prod/launch_dolphin_violet.py +++ b/prod/launch_dolphin_violet.py @@ -50,6 +50,10 @@ from prod.launch_dolphin_pink import ( # noqa: E402 _resolve_bingx_exchange_leverage_cap, _resolve_bingx_recv_window_ms, ) +from prod.clean_arch.violet.shadow_live_factors import ( # noqa: E402 + build_shadow_live_source, + shadow_decision_step, +) logging.basicConfig( level=logging.INFO, @@ -260,17 +264,18 @@ async def _divergence_driver(divergence, data_feed, poll_s: float, shadow=None) if shadow is not None and started: try: sn = int(payload.get("scan_number") or 0) - shadow["engine"].observe(payload, sn) vd = payload.get("vel_div") if vd is not None: now_ns = shadow["mono_ns"]() - d = shadow["engine"].decide( - now_ns=now_ns, scan_number=sn, - capital=shadow["capital"], vel_div=float(vd), + if shadow_decision_step( + shadow, + payload, + scan_number=sn, + now_ns=now_ns, + vel_div=float(vd), vol_ok=bool(payload.get("vol_ok", True)), - ) - if d is not None: - shadow["journal"].journal(d, mono_ns=now_ns) + ): + shadow["live_decisions"] += 1 except Exception as exc: # noqa: BLE001 — shadow must never die LOGGER.debug("shadow decision failed: %s", exc) except Exception as exc: # noqa: BLE001 — sampling must never die @@ -333,13 +338,29 @@ def _build_shadow(): relaxed = abs(thr - (-0.02)) > 1e-9 engine = VioletDecisionEngine(entry_vel_div_threshold=thr) journal = VioletDecisionJournal(sink=ch_put_violet, session_id=sess) + try: + live_source = build_shadow_live_source() + except Exception as exc: + LOGGER.warning( + "VIOLET shadow live-factor source unavailable (%s) — shadow DISABLED.", + exc, + ) + return None LOGGER.warning( "VIOLET DECISION SHADOW ON (session=%s ref_capital=%.0f entry_thr=%.4f%s) — " "journaling muted decisions to dolphin_violet.violet_decisions; NO orders.", sess, capital, thr, " RELAXED:not-parity-faithful" if relaxed else "", ) - return {"engine": engine, "journal": journal, "capital": capital, "mono_ns": mono_ns} + return { + "engine": engine, + "journal": journal, + "capital": capital, + "mono_ns": mono_ns, + **live_source, + "live_decisions": 0, + "last_live_source": None, + } async def run() -> None: