"""VIOLET V-TYPES domain layer (Sprint 2, V2a). Doctrine: PARSE, DON'T VALIDATE. Raw ints/floats/strs never cross a module boundary; refined types are constructed once at ingress and construction failure is a loud reject at the source — never a poisoned row downstream. Motivating incident (2026-06-12): BLUE wrote ``bars_held=-106`` into a UInt16 ClickHouse column → the insert was rejected forever → an 18.7M-row spool jam → zombie trade resurrections booking fake losses. A negative bars_held was REPRESENTABLE in the code path at all; that is the defect class this module kills by construction (see ``BarsHeld``). Layers provided here: - refined scalar aliases (``Annotated[...]`` + pydantic ``Field`` ranges) reused by every violet boundary model; - boundary models: ``DivergenceRow`` (DDL-shaped, validated before any row reaches the CH spool), ``ExecDriverSettings`` (the env boundary of the V2 exec driver), ``ExecGateReport`` (the V2 gate JSON schema); - the ``typed`` decorator: ``beartype`` runtime enforcement with the ``DOLPHIN_VIOLET_BEARTYPE=0`` kill-switch (the V2 gate measures its latency cost before permanent adoption). """ from __future__ import annotations import os from typing import Annotated, Any, Dict, Optional from pydantic import BaseModel, ConfigDict, Field __all__ = [ "typed", "MonoNs", "EpochMs", "Ms", "Bps", "Px", "Qty", "Seq", "BarsHeld", "TradeId", "SessionId", "Symbol", "StrictModel", "DivergenceRow", "ExecDriverSettings", "ExecGateReport", ] # ── beartype kill-switch ────────────────────────────────────────────────────── # Evaluated at import: a process-level switch, not a per-call toggle. The V2 # gate runs once per setting and records the latency delta in its report. def _beartype_enabled() -> bool: raw = os.environ.get("DOLPHIN_VIOLET_BEARTYPE", "1") return str(raw).strip().lower() not in ("0", "false", "no", "off") if _beartype_enabled(): from beartype import beartype as typed else: def typed(fn): # type: ignore[misc] # identity when killed return fn # ── refined scalar aliases ──────────────────────────────────────────────────── MonoNs = Annotated[int, Field(ge=0)] EpochMs = Annotated[int, Field(gt=0)] Ms = Annotated[float, Field(ge=0, allow_inf_nan=False)] Bps = Annotated[float, Field(allow_inf_nan=False)] Px = Annotated[float, Field(gt=0, allow_inf_nan=False)] Qty = Annotated[float, Field(ge=0, allow_inf_nan=False)] Seq = Annotated[int, Field(ge=0)] # UInt16 in ClickHouse — the bars_held=-106 incident type. 0..65535, period. BarsHeld = Annotated[int, Field(ge=0, le=65535)] TradeId = Annotated[str, Field(min_length=1, max_length=64)] SessionId = Annotated[str, Field(min_length=1, max_length=64)] Symbol = Annotated[str, Field(min_length=1, max_length=32, pattern=r"^[A-Z0-9\-_]+$")] class StrictModel(BaseModel): """Frozen + extra=forbid: a violet boundary model is a contract, not a bag.""" model_config = ConfigDict(frozen=True, extra="forbid") # ── boundary models ─────────────────────────────────────────────────────────── class DivergenceRow(StrictModel): """One ``dolphin_violet.violet_feed_divergence`` row. Field set MUST equal the DDL column set in ``prod/clickhouse/violet/20_violet_feed_divergence.sql`` (asserted by test_violet_domain). Constructed in FeedDivergenceMonitor.on_scan before the sink — a malformed row dies here, not at the head of the CH spool. """ ts: EpochMs # DateTime64(3) — epoch milliseconds session_id: SessionId asset: Symbol scan_price: Px venue_mid: Px divergence_bps: Bps scan_seq: Seq venue_seq: Seq mono_ns: MonoNs class ExecDriverSettings(StrictModel): """Env boundary of the V2 exec driver. ``ttl_override_ms`` exists because the shared router clamps TTLs to >= 0.5 s (exec_router.py from_env [0.5, 300] + register_working ``max(0.5, ttl_s)`` floor): the VIOLET driver is the timing authority and schedules ``min(plan.ttl_s, override)`` on its own DeadlineScheduler. ``None`` (env value "plan") means honor plan.ttl_s verbatim. """ ttl_override_ms: Optional[Ms] = 100.0 requote_hot_window_ns: MonoNs = 5_000_000_000 # mirrors pink_direct hot window resolve_grace_ms: Ms = 0.0 @classmethod def from_env(cls, env: Optional[Dict[str, str]] = None) -> "ExecDriverSettings": """Parse the DOLPHIN_VIOLET_EXEC_* env boundary. A malformed value raises at boot — loud reject at the source, never a silent default.""" e: Any = os.environ if env is None else env raw_ttl = str(e.get("DOLPHIN_VIOLET_EXEC_TTL_MS", "100") or "100").strip().lower() ttl: Optional[float] ttl = None if raw_ttl in ("plan", "none", "off") else float(raw_ttl) hot_s = float(e.get("DOLPHIN_VIOLET_EXEC_REQUOTE_HOT_S", "5") or "5") grace = float(e.get("DOLPHIN_VIOLET_EXEC_RESOLVE_GRACE_MS", "0") or "0") return cls( ttl_override_ms=ttl, requote_hot_window_ns=int(hot_s * 1_000_000_000), resolve_grace_ms=grace, ) def ttl_ms_for(self, plan_ttl_s: float) -> float: """Effective deadline for one working order, in milliseconds. Taker plans carry ttl_s=0 (no TTL management) — but the driver only schedules deadlines for maker plans, so a 0 here means "override alone" rather than "instant". """ plan_ms = max(0.0, float(plan_ttl_s)) * 1000.0 if self.ttl_override_ms is None: return plan_ms if plan_ms <= 0.0: return float(self.ttl_override_ms) return min(plan_ms, float(self.ttl_override_ms)) class ExecGateReport(StrictModel): """Schema for ``prod/VIOLET_dev/reports/violet_v2_exec_gate_*.json``.""" generated_utc: str host: str script: Dict[str, Any] cycles: Seq scenarios: Dict[str, int] jitter: Dict[str, Any] ttl_resolution: Dict[str, Any] kernel_call: Dict[str, Any] = Field(default_factory=dict) early_fires: Seq stuck_orders: Seq pending_deadlines: Seq terminals_ok: bool accounting_ok: bool deterministic: bool beartype: Dict[str, Any] = Field(default_factory=dict) passed: bool