domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106 UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen, extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl override exists because the shared router clamps TTLs >= 0.5s), ExecGateReport schema, beartype 'typed' decorator with DOLPHIN_VIOLET_BEARTYPE=0 kill-switch. divergence.py: rows now parse through DivergenceRow before the sink — malformed rows die at the source with a rate-limited WARNING + counter, never at the head of the CH spool. Properties (hypothesis, derandomized): ExecutionRouter state machine (fill/retry mutual exclusion via pop-semantics, R1 exit escalation same trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram percentile laws (member-of-samples, monotone, extremes), DivergenceRow parse laws. 34 new tests; violet suite 64 green; router 77 green; zero shared-file edits. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
163 lines
6.5 KiB
Python
163 lines
6.5 KiB
Python
"""VIOLET V-TYPES domain layer (Sprint 2, V2a).
|
|
|
|
Doctrine: PARSE, DON'T VALIDATE. Raw ints/floats/strs never cross a module
|
|
boundary; refined types are constructed once at ingress and construction
|
|
failure is a loud reject at the source — never a poisoned row downstream.
|
|
|
|
Motivating incident (2026-06-12): BLUE wrote ``bars_held=-106`` into a
|
|
UInt16 ClickHouse column → the insert was rejected forever → an 18.7M-row
|
|
spool jam → zombie trade resurrections booking fake losses. A negative
|
|
bars_held was REPRESENTABLE in the code path at all; that is the defect
|
|
class this module kills by construction (see ``BarsHeld``).
|
|
|
|
Layers provided here:
|
|
- refined scalar aliases (``Annotated[...]`` + pydantic ``Field`` ranges)
|
|
reused by every violet boundary model;
|
|
- boundary models: ``DivergenceRow`` (DDL-shaped, validated before any row
|
|
reaches the CH spool), ``ExecDriverSettings`` (the env boundary of the V2
|
|
exec driver), ``ExecGateReport`` (the V2 gate JSON schema);
|
|
- the ``typed`` decorator: ``beartype`` runtime enforcement with the
|
|
``DOLPHIN_VIOLET_BEARTYPE=0`` kill-switch (the V2 gate measures its
|
|
latency cost before permanent adoption).
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from typing import Annotated, Any, Dict, Optional
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field
|
|
|
|
__all__ = [
|
|
"typed",
|
|
"MonoNs", "EpochMs", "Ms", "Bps", "Px", "Qty", "Seq",
|
|
"BarsHeld", "TradeId", "SessionId", "Symbol",
|
|
"StrictModel", "DivergenceRow", "ExecDriverSettings", "ExecGateReport",
|
|
]
|
|
|
|
|
|
# ── beartype kill-switch ──────────────────────────────────────────────────────
|
|
# Evaluated at import: a process-level switch, not a per-call toggle. The V2
|
|
# gate runs once per setting and records the latency delta in its report.
|
|
|
|
def _beartype_enabled() -> bool:
|
|
raw = os.environ.get("DOLPHIN_VIOLET_BEARTYPE", "1")
|
|
return str(raw).strip().lower() not in ("0", "false", "no", "off")
|
|
|
|
|
|
if _beartype_enabled():
|
|
from beartype import beartype as typed
|
|
else:
|
|
def typed(fn): # type: ignore[misc] # identity when killed
|
|
return fn
|
|
|
|
|
|
# ── refined scalar aliases ────────────────────────────────────────────────────
|
|
|
|
MonoNs = Annotated[int, Field(ge=0)]
|
|
EpochMs = Annotated[int, Field(gt=0)]
|
|
Ms = Annotated[float, Field(ge=0, allow_inf_nan=False)]
|
|
Bps = Annotated[float, Field(allow_inf_nan=False)]
|
|
Px = Annotated[float, Field(gt=0, allow_inf_nan=False)]
|
|
Qty = Annotated[float, Field(ge=0, allow_inf_nan=False)]
|
|
Seq = Annotated[int, Field(ge=0)]
|
|
# UInt16 in ClickHouse — the bars_held=-106 incident type. 0..65535, period.
|
|
BarsHeld = Annotated[int, Field(ge=0, le=65535)]
|
|
TradeId = Annotated[str, Field(min_length=1, max_length=64)]
|
|
SessionId = Annotated[str, Field(min_length=1, max_length=64)]
|
|
Symbol = Annotated[str, Field(min_length=1, max_length=32, pattern=r"^[A-Z0-9\-_]+$")]
|
|
|
|
|
|
class StrictModel(BaseModel):
|
|
"""Frozen + extra=forbid: a violet boundary model is a contract, not a bag."""
|
|
|
|
model_config = ConfigDict(frozen=True, extra="forbid")
|
|
|
|
|
|
# ── boundary models ───────────────────────────────────────────────────────────
|
|
|
|
class DivergenceRow(StrictModel):
|
|
"""One ``dolphin_violet.violet_feed_divergence`` row.
|
|
|
|
Field set MUST equal the DDL column set in
|
|
``prod/clickhouse/violet/20_violet_feed_divergence.sql`` (asserted by
|
|
test_violet_domain). Constructed in FeedDivergenceMonitor.on_scan before
|
|
the sink — a malformed row dies here, not at the head of the CH spool.
|
|
"""
|
|
|
|
ts: EpochMs # DateTime64(3) — epoch milliseconds
|
|
session_id: SessionId
|
|
asset: Symbol
|
|
scan_price: Px
|
|
venue_mid: Px
|
|
divergence_bps: Bps
|
|
scan_seq: Seq
|
|
venue_seq: Seq
|
|
mono_ns: MonoNs
|
|
|
|
|
|
class ExecDriverSettings(StrictModel):
|
|
"""Env boundary of the V2 exec driver.
|
|
|
|
``ttl_override_ms`` exists because the shared router clamps TTLs to
|
|
>= 0.5 s (exec_router.py from_env [0.5, 300] + register_working
|
|
``max(0.5, ttl_s)`` floor): the VIOLET driver is the timing authority
|
|
and schedules ``min(plan.ttl_s, override)`` on its own DeadlineScheduler.
|
|
``None`` (env value "plan") means honor plan.ttl_s verbatim.
|
|
"""
|
|
|
|
ttl_override_ms: Optional[Ms] = 100.0
|
|
requote_hot_window_ns: MonoNs = 5_000_000_000 # mirrors pink_direct hot window
|
|
resolve_grace_ms: Ms = 0.0
|
|
|
|
@classmethod
|
|
def from_env(cls, env: Optional[Dict[str, str]] = None) -> "ExecDriverSettings":
|
|
"""Parse the DOLPHIN_VIOLET_EXEC_* env boundary. A malformed value
|
|
raises at boot — loud reject at the source, never a silent default."""
|
|
e: Any = os.environ if env is None else env
|
|
raw_ttl = str(e.get("DOLPHIN_VIOLET_EXEC_TTL_MS", "100") or "100").strip().lower()
|
|
ttl: Optional[float]
|
|
ttl = None if raw_ttl in ("plan", "none", "off") else float(raw_ttl)
|
|
hot_s = float(e.get("DOLPHIN_VIOLET_EXEC_REQUOTE_HOT_S", "5") or "5")
|
|
grace = float(e.get("DOLPHIN_VIOLET_EXEC_RESOLVE_GRACE_MS", "0") or "0")
|
|
return cls(
|
|
ttl_override_ms=ttl,
|
|
requote_hot_window_ns=int(hot_s * 1_000_000_000),
|
|
resolve_grace_ms=grace,
|
|
)
|
|
|
|
def ttl_ms_for(self, plan_ttl_s: float) -> float:
|
|
"""Effective deadline for one working order, in milliseconds.
|
|
|
|
Taker plans carry ttl_s=0 (no TTL management) — but the driver only
|
|
schedules deadlines for maker plans, so a 0 here means "override
|
|
alone" rather than "instant".
|
|
"""
|
|
plan_ms = max(0.0, float(plan_ttl_s)) * 1000.0
|
|
if self.ttl_override_ms is None:
|
|
return plan_ms
|
|
if plan_ms <= 0.0:
|
|
return float(self.ttl_override_ms)
|
|
return min(plan_ms, float(self.ttl_override_ms))
|
|
|
|
|
|
class ExecGateReport(StrictModel):
|
|
"""Schema for ``prod/VIOLET_dev/reports/violet_v2_exec_gate_*.json``."""
|
|
|
|
generated_utc: str
|
|
host: str
|
|
script: Dict[str, Any]
|
|
cycles: Seq
|
|
scenarios: Dict[str, int]
|
|
jitter: Dict[str, Any]
|
|
ttl_resolution: Dict[str, Any]
|
|
kernel_call: Dict[str, Any] = Field(default_factory=dict)
|
|
early_fires: Seq
|
|
stuck_orders: Seq
|
|
pending_deadlines: Seq
|
|
terminals_ok: bool
|
|
accounting_ok: bool
|
|
deterministic: bool
|
|
beartype: Dict[str, Any] = Field(default_factory=dict)
|
|
passed: bool
|