From 75e71ef66d61aff8210debb6be3611273bd1b2cb Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 14 Jun 2026 20:39:37 +0200 Subject: [PATCH] VIOLET V3e-wire: launcher shadow-decision path (default OFF) + DDL applied launch_dolphin_violet.py: _build_shadow() behind DOLPHIN_VIOLET_DECISION_SHADOW=1 (default OFF -> _build_shadow returns None, zero behavior change, verified). Divergence driver optionally feeds VioletDecisionEngine + journals to dolphin_violet.violet_decisions (NO orders). Self-disables if the table is absent (no CH doom-loop, the 2026-06-11 spool lesson). 22_violet_decisions.sql applied to CH (table present, empty). vel_div sourced from scan_payload['vel_div'] (matches pink_direct). Restart for the DARK soak held for operator. Co-Authored-By: Claude Opus 4.8 --- prod/launch_dolphin_violet.py | 74 +++++++++++++++++++++++++++++++++-- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/prod/launch_dolphin_violet.py b/prod/launch_dolphin_violet.py index 467ed8c..2822719 100644 --- a/prod/launch_dolphin_violet.py +++ b/prod/launch_dolphin_violet.py @@ -221,8 +221,13 @@ def _build_divergence(sink=None): ) -async def _divergence_driver(divergence, data_feed, poll_s: float) -> None: - """Shared scan-sampling loop for both DARK and observe modes.""" +async def _divergence_driver(divergence, data_feed, poll_s: float, shadow=None) -> None: + """Shared scan-sampling loop for both DARK and observe modes. + + When ``shadow`` is provided (DOLPHIN_VIOLET_DECISION_SHADOW=1), each scan is + ALSO fed to the V3 VioletDecisionEngine and any actuated decision is journaled + to dolphin_violet.violet_decisions. This NEVER executes — muted shadow only. + """ # The driver owns this feed instance and is its only connector; an # unconnected HazelcastDataFeed has features_map=None and every poll # raises 'NoneType' has no attribute 'get' at ERROR level (1 Hz). @@ -252,6 +257,22 @@ async def _divergence_driver(divergence, data_feed, poll_s: float) -> None: started = True if started: divergence.on_scan(snapshot) + if shadow is not None and started: + try: + sn = int(payload.get("scan_number") or 0) + shadow["engine"].observe(payload, sn) + vd = payload.get("vel_div") + if vd is not None: + now_ns = shadow["mono_ns"]() + d = shadow["engine"].decide( + now_ns=now_ns, scan_number=sn, + capital=shadow["capital"], vel_div=float(vd), + vol_ok=bool(payload.get("vol_ok", True)), + ) + if d is not None: + shadow["journal"].journal(d, mono_ns=now_ns) + except Exception as exc: # noqa: BLE001 — shadow must never die + LOGGER.debug("shadow decision failed: %s", exc) except Exception as exc: # noqa: BLE001 — sampling must never die LOGGER.debug("divergence scan sample failed: %s", exc) await asyncio.sleep(poll_s) @@ -269,6 +290,52 @@ async def _dark_idle_loop(divergence_task: asyncio.Task | None) -> None: LOGGER.warning(msg) +def _violet_table_present(table: str) -> bool: + """One-table SELECT-probe (additive tables not in REQUIRED_TABLES preflight).""" + ch_url = os.environ.get("CH_URL", "http://localhost:8123") + q = urllib.parse.quote_plus(f"SELECT 0 FROM dolphin_violet.{table} LIMIT 0") + req = urllib.request.Request(f"{ch_url}/?query={q}", method="POST") + req.add_header("X-ClickHouse-User", os.environ.get("CH_USER", "dolphin")) + req.add_header("X-ClickHouse-Key", os.environ.get("CH_PASS", "dolphin_ch_2026")) + try: + urllib.request.urlopen(req, timeout=5).read() + return True + except Exception: + return False + + +def _build_shadow(): + """V3 shadow-decision path (DOLPHIN_VIOLET_DECISION_SHADOW=1, default OFF). + + Returns wiring dict or None. NEVER executes — journals muted decisions only. + Self-disables if violet_decisions is absent so a missing additive DDL can + NEVER cause a CH-flush doom-loop (the 2026-06-11 spool/disk-fill lesson). + """ + if not _env_bool("DOLPHIN_VIOLET_DECISION_SHADOW", False): + return None + if not _violet_table_present("violet_decisions"): + LOGGER.warning( + "VIOLET shadow requested but dolphin_violet.violet_decisions is MISSING " + "— apply 22_violet_decisions.sql; shadow DISABLED (no doom-loop)." + ) + return None + from prod.ch_writer import ch_put_violet + from prod.clean_arch.violet.clock import mono_ns + from prod.clean_arch.violet.decision_engine import VioletDecisionEngine + from prod.clean_arch.violet.shadow_journal import VioletDecisionJournal + + sess = uuid.uuid4().hex + capital = float(os.environ.get("DOLPHIN_VIOLET_SHADOW_CAPITAL", "69000")) + # engine defaults == live BLUE base curve (max_leverage 9.0, vel_div_threshold -0.02). + engine = VioletDecisionEngine() + journal = VioletDecisionJournal(sink=ch_put_violet, session_id=sess) + LOGGER.warning( + "VIOLET DECISION SHADOW ON (session=%s ref_capital=%.0f) — journaling muted " + "decisions to dolphin_violet.violet_decisions; NO orders.", sess, capital, + ) + return {"engine": engine, "journal": journal, "capital": capital, "mono_ns": mono_ns} + + async def run() -> None: _apply_violet_env() poll_s = float(os.environ.get("DOLPHIN_VIOLET_POLL_INTERVAL_SEC", "1.0")) @@ -287,8 +354,9 @@ async def run() -> None: divergence_task = None if _env_bool("DOLPHIN_VIOLET_DARK_DIVERGENCE", True) or _violet_keys_present(): divergence = _build_divergence() + shadow = _build_shadow() divergence_task = asyncio.create_task( - _divergence_driver(divergence, _build_data_feed(), poll_s), + _divergence_driver(divergence, _build_data_feed(), poll_s, shadow=shadow), name="violet_divergence_driver", )