VIOLET V3e: shadow-decision journal + DDL (reject-at-source)

22_violet_decisions.sql (dolphin_violet.violet_decisions, DDL-first) +
shadow_journal.py: VioletDecisionJournal validates each actuated ShadowDecision
via DecisionRow (V-TYPES, allow_inf_nan=False) before the CH sink -- malformed
dies at source, never at the spool head. NEVER an order. DecisionRow field set ==
DDL columns (asserted). 4 tests pass. Launcher wiring + DARK soak = operator step.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-13 20:29:53 +02:00
parent 1a449074ae
commit 6f5aa80ed0
3 changed files with 183 additions and 0 deletions

View File

@@ -0,0 +1,84 @@
"""VIOLET V3e: shadow-decision journal — V-TYPES row guard + CH sink.
Sinks each ACTUATED ``ShadowDecision`` to ``dolphin_violet.violet_decisions`` as a
validated row. PARSE, DON'T VALIDATE: a malformed decision dies HERE (rejected to a
counter), never at the head of the CH spool — the bars_held=-106 lesson. NEVER an
order: this journals what BLUE *would* do; execution stays off (ObserveOnlyVenue).
"""
from __future__ import annotations
import logging
import time
from typing import Any, Callable, Optional
from pydantic import Field, ValidationError
from .decision_engine import ShadowDecision
from .domain import EpochMs, MonoNs, SessionId, StrictModel, Symbol, Seq
LOGGER = logging.getLogger("violet.shadow_journal")
TABLE = "violet_decisions"
class DecisionRow(StrictModel):
"""One ``dolphin_violet.violet_decisions`` row. Field set MUST equal the DDL
column set in prod/clickhouse/violet/22_violet_decisions.sql (asserted by test)."""
ts: EpochMs # DateTime64(3) — epoch milliseconds
session_id: SessionId
scan_number: Seq
mono_ns: MonoNs
asset: Symbol
side: str = Field(min_length=1, max_length=8)
vel_div: float = Field(allow_inf_nan=False)
fraction: float = Field(ge=0.0, allow_inf_nan=False)
conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False)
notional_fraction: float = Field(ge=0.0, allow_inf_nan=False)
target_exposure: float = Field(ge=0.0, allow_inf_nan=False)
ars_score: float = Field(allow_inf_nan=False)
bucket_idx: int = Field(ge=0, le=255) # UInt8
actuated: int = Field(ge=0, le=1)
class VioletDecisionJournal:
"""Validate-then-sink shadow decisions. ``sink(table, row_dict)`` mirrors
ch_put_violet / the divergence sink signature."""
def __init__(self, *, sink: Callable[[str, dict], Any], session_id: str):
self.sink = sink
self.session_id = session_id
self.rows_emitted = 0
self.rows_rejected = 0
def journal(self, decision: ShadowDecision, *, mono_ns: int) -> bool:
"""Build a validated row and sink it. Returns True if emitted."""
try:
row = DecisionRow(
ts=int(time.time() * 1000),
session_id=self.session_id,
scan_number=int(decision.scan_number),
mono_ns=int(mono_ns),
asset=decision.asset,
side=decision.side,
vel_div=float(decision.vel_div),
fraction=float(decision.fraction),
conviction_leverage=float(decision.conviction_leverage),
notional_fraction=float(decision.notional_fraction),
target_exposure=float(decision.target_exposure),
ars_score=float(decision.ars_score),
bucket_idx=int(decision.bucket_idx),
actuated=1 if decision.actuated else 0,
)
except ValidationError as exc:
self.rows_rejected += 1
if self.rows_rejected == 1 or self.rows_rejected % 1000 == 0:
LOGGER.warning(
"shadow decision REJECTED at source (#%d) asset=%s: %s",
self.rows_rejected, getattr(decision, "asset", "?"), exc,
)
return False
self.sink(TABLE, row.model_dump())
self.rows_emitted += 1
return True

View File

@@ -0,0 +1,73 @@
"""V3e: shadow-decision journal — DDL-shape parity, sink emission, reject-at-source."""
from __future__ import annotations
import re
import sys
from pathlib import Path
from types import SimpleNamespace
sys.path.insert(0, "/mnt/dolphinng5_predict")
from prod.clean_arch.violet.decision_engine import ShadowDecision
from prod.clean_arch.violet.shadow_journal import DecisionRow, TABLE, VioletDecisionJournal
DDL = Path("/mnt/dolphinng5_predict/prod/clickhouse/violet/22_violet_decisions.sql")
def _ddl_columns() -> set:
body = DDL.read_text()
inner = body[body.index("("): body.rindex(")")]
return set(re.findall(r"`(\w+)`", inner))
def _decision(**over):
base = dict(
ts_ns=10**12, scan_number=7, asset="BTCUSDT", side="SHORT", vel_div=-0.2,
fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8,
target_exposure=124200.0, ars_score=1.48, bucket_idx=1, actuated=True,
)
base.update(over)
return ShadowDecision(**base)
def test_decisionrow_fields_equal_ddl_columns():
assert set(DecisionRow.model_fields.keys()) == _ddl_columns()
def test_journal_emits_validated_row_to_sink():
captured = []
j = VioletDecisionJournal(sink=lambda t, r: captured.append((t, r)), session_id="sess1")
ok = j.journal(_decision(), mono_ns=999)
assert ok and j.rows_emitted == 1
table, row = captured[0]
assert table == TABLE
assert set(row.keys()) == _ddl_columns()
assert row["asset"] == "BTCUSDT" and row["actuated"] == 1
assert row["session_id"] == "sess1" and row["mono_ns"] == 999
def test_reject_at_source_on_malformed_decision():
captured = []
j = VioletDecisionJournal(sink=lambda t, r: captured.append(r), session_id="s")
# duck-typed bad decision: non-finite vel_div must be rejected before the sink.
bad = SimpleNamespace(
scan_number=1, asset="BTCUSDT", side="SHORT", vel_div=float("nan"),
fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8,
target_exposure=1.0, ars_score=1.0, bucket_idx=1, actuated=True,
)
ok = j.journal(bad, mono_ns=1)
assert ok is False
assert j.rows_rejected == 1 and j.rows_emitted == 0
assert captured == [] # nothing reached the spool
def test_negative_exposure_rejected():
j = VioletDecisionJournal(sink=lambda t, r: None, session_id="s")
bad = SimpleNamespace(
scan_number=1, asset="BTCUSDT", side="SHORT", vel_div=-0.2,
fraction=0.2, conviction_leverage=9.0, notional_fraction=1.8,
target_exposure=-5.0, ars_score=1.0, bucket_idx=1, actuated=True,
)
assert j.journal(bad, mono_ns=1) is False
assert j.rows_rejected == 1