"""VIOLET V3e: shadow-decision journal — V-TYPES row guard + CH sink. Sinks each ACTUATED ``ShadowDecision`` to ``dolphin_violet.violet_decisions`` as a validated row. PARSE, DON'T VALIDATE: a malformed decision dies HERE (rejected to a counter), never at the head of the CH spool — the bars_held=-106 lesson. NEVER an order: this journals what BLUE *would* do; execution stays off (ObserveOnlyVenue). """ from __future__ import annotations import logging import time from typing import Any, Callable, Optional from pydantic import Field, ValidationError from .decision_engine import ShadowDecision from .domain import EpochMs, MonoNs, SessionId, StrictModel, Symbol, Seq LOGGER = logging.getLogger("violet.shadow_journal") TABLE = "violet_decisions" class DecisionRow(StrictModel): """One ``dolphin_violet.violet_decisions`` row. Field set MUST equal the DDL column set in prod/clickhouse/violet/22_violet_decisions.sql (asserted by test).""" ts: EpochMs # DateTime64(3) — epoch milliseconds session_id: SessionId scan_number: Seq mono_ns: MonoNs asset: Symbol side: str = Field(min_length=1, max_length=8) vel_div: float = Field(allow_inf_nan=False) fraction: float = Field(ge=0.0, allow_inf_nan=False) conviction_leverage: float = Field(ge=0.0, allow_inf_nan=False) notional_fraction: float = Field(ge=0.0, allow_inf_nan=False) target_exposure: float = Field(ge=0.0, allow_inf_nan=False) ars_score: float = Field(allow_inf_nan=False) bucket_idx: int = Field(ge=0, le=255) # UInt8 actuated: int = Field(ge=0, le=1) class VioletDecisionJournal: """Validate-then-sink shadow decisions. ``sink(table, row_dict)`` mirrors ch_put_violet / the divergence sink signature.""" def __init__(self, *, sink: Callable[[str, dict], Any], session_id: str): self.sink = sink self.session_id = session_id self.rows_emitted = 0 self.rows_rejected = 0 def journal(self, decision: ShadowDecision, *, mono_ns: int) -> bool: """Build a validated row and sink it. Returns True if emitted.""" try: row = DecisionRow( ts=int(time.time() * 1000), session_id=self.session_id, scan_number=int(decision.scan_number), mono_ns=int(mono_ns), asset=decision.asset, side=decision.side, vel_div=float(decision.vel_div), fraction=float(decision.fraction), conviction_leverage=float(decision.conviction_leverage), notional_fraction=float(decision.notional_fraction), target_exposure=float(decision.target_exposure), ars_score=float(decision.ars_score), bucket_idx=int(decision.bucket_idx), actuated=1 if decision.actuated else 0, ) except ValidationError as exc: self.rows_rejected += 1 if self.rows_rejected == 1 or self.rows_rejected % 1000 == 0: LOGGER.warning( "shadow decision REJECTED at source (#%d) asset=%s: %s", self.rows_rejected, getattr(decision, "asset", "?"), exc, ) return False self.sink(TABLE, row.model_dump()) self.rows_emitted += 1 return True