From 6f5aa80ed0f33e25bff82bd78877021be74c1074 Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 13 Jun 2026 20:29:53 +0200 Subject: [PATCH] VIOLET V3e: shadow-decision journal + DDL (reject-at-source) 22_violet_decisions.sql (dolphin_violet.violet_decisions, DDL-first) + shadow_journal.py: VioletDecisionJournal validates each actuated ShadowDecision via DecisionRow (V-TYPES, allow_inf_nan=False) before the CH sink -- malformed dies at source, never at the spool head. NEVER an order. DecisionRow field set == DDL columns (asserted). 4 tests pass. Launcher wiring + DARK soak = operator step. Co-Authored-By: Claude Opus 4.8 --- prod/clean_arch/violet/shadow_journal.py | 84 +++++++++++++++++++ .../violet/test_violet_shadow_journal.py | 73 ++++++++++++++++ .../clickhouse/violet/22_violet_decisions.sql | 26 ++++++ 3 files changed, 183 insertions(+) create mode 100644 prod/clean_arch/violet/shadow_journal.py create mode 100644 prod/clean_arch/violet/test_violet_shadow_journal.py create mode 100644 prod/clickhouse/violet/22_violet_decisions.sql diff --git a/prod/clean_arch/violet/shadow_journal.py b/prod/clean_arch/violet/shadow_journal.py new file mode 100644 index 0000000..87eeeff --- /dev/null +++ b/prod/clean_arch/violet/shadow_journal.py @@ -0,0 +1,84 @@ +"""VIOLET V3e: shadow-decision journal — V-TYPES row guard + CH sink. + +Sinks each ACTUATED ``ShadowDecision`` to ``dolphin_violet.violet_decisions`` as a +validated row. PARSE, DON'T VALIDATE: a malformed decision dies HERE (rejected to a +counter), never at the head of the CH spool — the bars_held=-106 lesson. NEVER an +order: this journals what BLUE *would* do; execution stays off (ObserveOnlyVenue). +""" + +from __future__ import annotations + +import logging +import time +from typing import Any, Callable, Optional + +from pydantic import Field, ValidationError + +from .decision_engine import ShadowDecision +from .domain import EpochMs, MonoNs, SessionId, StrictModel, Symbol, Seq + +LOGGER = logging.getLogger("violet.shadow_journal") + +TABLE = "violet_decisions" + + +class DecisionRow(StrictModel): + """One ``dolphin_violet.violet_decisions`` row. Field set MUST equal the DDL + column set in prod/clickhouse/violet/22_violet_decisions.sql (asserted by test).""" + + ts: EpochMs # DateTime64(3) — epoch milliseconds + session_id: SessionId + scan_number: Seq + mono_ns: MonoNs + asset: Symbol + side: str = Field(min_length=1, max_length=8) + vel_div: float = Field(allow_inf_nan=False) + fraction: float = Field(ge=0.0, allow_inf_nan=False) + conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) + notional_fraction: float = Field(ge=0.0, allow_inf_nan=False) + target_exposure: float = Field(ge=0.0, allow_inf_nan=False) + ars_score: float = Field(allow_inf_nan=False) + bucket_idx: int = Field(ge=0, le=255) # UInt8 + actuated: int = Field(ge=0, le=1) + + +class VioletDecisionJournal: + """Validate-then-sink shadow decisions. ``sink(table, row_dict)`` mirrors + ch_put_violet / the divergence sink signature.""" + + def __init__(self, *, sink: Callable[[str, dict], Any], session_id: str): + self.sink = sink + self.session_id = session_id + self.rows_emitted = 0 + self.rows_rejected = 0 + + def journal(self, decision: ShadowDecision, *, mono_ns: int) -> bool: + """Build a validated row and sink it. Returns True if emitted.""" + try: + row = DecisionRow( + ts=int(time.time() * 1000), + session_id=self.session_id, + scan_number=int(decision.scan_number), + mono_ns=int(mono_ns), + asset=decision.asset, + side=decision.side, + vel_div=float(decision.vel_div), + fraction=float(decision.fraction), + conviction_leverage=float(decision.conviction_leverage), + notional_fraction=float(decision.notional_fraction), + target_exposure=float(decision.target_exposure), + ars_score=float(decision.ars_score), + bucket_idx=int(decision.bucket_idx), + actuated=1 if decision.actuated else 0, + ) + except ValidationError as exc: + self.rows_rejected += 1 + if self.rows_rejected == 1 or self.rows_rejected % 1000 == 0: + LOGGER.warning( + "shadow decision REJECTED at source (#%d) asset=%s: %s", + self.rows_rejected, getattr(decision, "asset", "?"), exc, + ) + return False + self.sink(TABLE, row.model_dump()) + self.rows_emitted += 1 + return True diff --git a/prod/clean_arch/violet/test_violet_shadow_journal.py b/prod/clean_arch/violet/test_violet_shadow_journal.py new file mode 100644 index 0000000..3622002 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_shadow_journal.py @@ -0,0 +1,73 @@ +"""V3e: shadow-decision journal — DDL-shape parity, sink emission, reject-at-source.""" + +from __future__ import annotations + +import re +import sys +from pathlib import Path +from types import SimpleNamespace + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +from prod.clean_arch.violet.decision_engine import ShadowDecision +from prod.clean_arch.violet.shadow_journal import DecisionRow, TABLE, VioletDecisionJournal + +DDL = Path("/mnt/dolphinng5_predict/prod/clickhouse/violet/22_violet_decisions.sql") + + +def _ddl_columns() -> set: + body = DDL.read_text() + inner = body[body.index("("): body.rindex(")")] + return set(re.findall(r"`(\w+)`", inner)) + + +def _decision(**over): + base = dict( + ts_ns=10**12, scan_number=7, asset="BTCUSDT", side="SHORT", vel_div=-0.2, + fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8, + target_exposure=124200.0, ars_score=1.48, bucket_idx=1, actuated=True, + ) + base.update(over) + return ShadowDecision(**base) + + +def test_decisionrow_fields_equal_ddl_columns(): + assert set(DecisionRow.model_fields.keys()) == _ddl_columns() + + +def test_journal_emits_validated_row_to_sink(): + captured = [] + j = VioletDecisionJournal(sink=lambda t, r: captured.append((t, r)), session_id="sess1") + ok = j.journal(_decision(), mono_ns=999) + assert ok and j.rows_emitted == 1 + table, row = captured[0] + assert table == TABLE + assert set(row.keys()) == _ddl_columns() + assert row["asset"] == "BTCUSDT" and row["actuated"] == 1 + assert row["session_id"] == "sess1" and row["mono_ns"] == 999 + + +def test_reject_at_source_on_malformed_decision(): + captured = [] + j = VioletDecisionJournal(sink=lambda t, r: captured.append(r), session_id="s") + # duck-typed bad decision: non-finite vel_div must be rejected before the sink. + bad = SimpleNamespace( + scan_number=1, asset="BTCUSDT", side="SHORT", vel_div=float("nan"), + fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8, + target_exposure=1.0, ars_score=1.0, bucket_idx=1, actuated=True, + ) + ok = j.journal(bad, mono_ns=1) + assert ok is False + assert j.rows_rejected == 1 and j.rows_emitted == 0 + assert captured == [] # nothing reached the spool + + +def test_negative_exposure_rejected(): + j = VioletDecisionJournal(sink=lambda t, r: None, session_id="s") + bad = SimpleNamespace( + scan_number=1, asset="BTCUSDT", side="SHORT", vel_div=-0.2, + fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8, + target_exposure=-5.0, ars_score=1.0, bucket_idx=1, actuated=True, + ) + assert j.journal(bad, mono_ns=1) is False + assert j.rows_rejected == 1 diff --git a/prod/clickhouse/violet/22_violet_decisions.sql b/prod/clickhouse/violet/22_violet_decisions.sql new file mode 100644 index 0000000..e4a8307 --- /dev/null +++ b/prod/clickhouse/violet/22_violet_decisions.sql @@ -0,0 +1,26 @@ +-- VIOLET V3e: shadow decision journal. One row per ACTUATED ShadowDecision the +-- VioletDecisionEngine emits (the muted "what BLUE would do this scan"). NEVER an +-- order — execution stays off behind the ObserveOnlyVenue guard. session_id is a +-- boot-time UUID (mono_ns is meaningless across restarts without it). +-- Apply via apply_violet_ddl.py (one statement per HTTP POST). + +CREATE TABLE IF NOT EXISTS dolphin_violet.violet_decisions +( + `ts` DateTime64(3, 'UTC'), + `session_id` String, + `scan_number` UInt64, + `mono_ns` UInt64, + `asset` LowCardinality(String), + `side` LowCardinality(String), + `vel_div` Float64, + `fraction` Float64, + `conviction_leverage` Float64, + `notional_fraction` Float64, + `target_exposure` Float64, + `ars_score` Float64, + `bucket_idx` UInt8, + `actuated` UInt8 +) +ENGINE = MergeTree +ORDER BY (asset, ts) +TTL toDate(ts) + toIntervalDay(180);