From ba01b914ce91da662fc6acf62c4706fa583be1e7 Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 13 Jun 2026 00:08:18 +0200 Subject: [PATCH] VIOLET V2a: V-TYPES domain layer + hypothesis properties + divergence reject-at-source MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit domain.py: refined scalar aliases (BarsHeld kills the bars_held=-106 UInt16 poison class by construction), DivergenceRow (DDL-shaped, frozen, extra=forbid), ExecDriverSettings (env boundary for the V2 driver; ttl override exists because the shared router clamps TTLs >= 0.5s), ExecGateReport schema, beartype 'typed' decorator with DOLPHIN_VIOLET_BEARTYPE=0 kill-switch. divergence.py: rows now parse through DivergenceRow before the sink — malformed rows die at the source with a rate-limited WARNING + counter, never at the head of the CH spool. Properties (hypothesis, derandomized): ExecutionRouter state machine (fill/retry mutual exclusion via pop-semantics, R1 exit escalation same trade_id, bounded retry chains, <=1 working ENTER), LatencyHistogram percentile laws (member-of-samples, monotone, extremes), DivergenceRow parse laws. 34 new tests; violet suite 64 green; router 77 green; zero shared-file edits. Co-Authored-By: Claude Fable 5 --- prod/clean_arch/violet/divergence.py | 38 ++- prod/clean_arch/violet/domain.py | 162 ++++++++++++ prod/clean_arch/violet/test_violet_domain.py | 175 +++++++++++++ .../violet/test_violet_properties.py | 241 ++++++++++++++++++ 4 files changed, 605 insertions(+), 11 deletions(-) create mode 100644 prod/clean_arch/violet/domain.py create mode 100644 prod/clean_arch/violet/test_violet_domain.py create mode 100644 prod/clean_arch/violet/test_violet_properties.py diff --git a/prod/clean_arch/violet/divergence.py b/prod/clean_arch/violet/divergence.py index 51b0503..993720c 100644 --- a/prod/clean_arch/violet/divergence.py +++ b/prod/clean_arch/violet/divergence.py @@ -25,7 +25,10 @@ import time from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional +from pydantic import ValidationError + from .clock import PlaneClock, mono_ns +from .domain import DivergenceRow LOGGER = logging.getLogger("violet.divergence") @@ -84,6 +87,7 @@ class FeedDivergenceMonitor: self._stream_task: Optional[asyncio.Task] = None self._rest_task: Optional[asyncio.Task] = None self.rows_emitted = 0 + self.rows_rejected = 0 # ── venue side ─────────────────────────────────────────────────────────── @@ -204,17 +208,29 @@ class FeedDivergenceMonitor: if (now - vm.mono) > self.venue_clock.staleness_budget_ns: continue divergence_bps = (vm.mid - scan_price) / scan_price * 1e4 - self.sink(self.table, { - "ts": int(time.time() * 1000), # DateTime64(3) - "session_id": self.session_id, - "asset": key, - "scan_price": scan_price, - "venue_mid": vm.mid, - "divergence_bps": divergence_bps, - "scan_seq": int(scan_seq), - "venue_seq": int(vm.seq), - "mono_ns": int(now), - }) + # V-TYPES: parse, don't validate — a malformed row dies here, + # never at the head of the CH spool (the bars_held lesson). + try: + row = DivergenceRow( + ts=int(time.time() * 1000), # DateTime64(3) + session_id=self.session_id, + asset=key, + scan_price=scan_price, + venue_mid=vm.mid, + divergence_bps=divergence_bps, + scan_seq=int(scan_seq), + venue_seq=int(vm.seq), + mono_ns=int(now), + ) + except ValidationError as exc: + self.rows_rejected += 1 + if self.rows_rejected == 1 or self.rows_rejected % 1000 == 0: + self.logger.warning( + "divergence row REJECTED at source (#%d) asset=%s: %s", + self.rows_rejected, key, exc, + ) + continue + self.sink(self.table, row.model_dump()) emitted += 1 self.rows_emitted += emitted return emitted diff --git a/prod/clean_arch/violet/domain.py b/prod/clean_arch/violet/domain.py new file mode 100644 index 0000000..efc4767 --- /dev/null +++ b/prod/clean_arch/violet/domain.py @@ -0,0 +1,162 @@ +"""VIOLET V-TYPES domain layer (Sprint 2, V2a). + +Doctrine: PARSE, DON'T VALIDATE. Raw ints/floats/strs never cross a module +boundary; refined types are constructed once at ingress and construction +failure is a loud reject at the source — never a poisoned row downstream. + +Motivating incident (2026-06-12): BLUE wrote ``bars_held=-106`` into a +UInt16 ClickHouse column → the insert was rejected forever → an 18.7M-row +spool jam → zombie trade resurrections booking fake losses. A negative +bars_held was REPRESENTABLE in the code path at all; that is the defect +class this module kills by construction (see ``BarsHeld``). + +Layers provided here: +- refined scalar aliases (``Annotated[...]`` + pydantic ``Field`` ranges) + reused by every violet boundary model; +- boundary models: ``DivergenceRow`` (DDL-shaped, validated before any row + reaches the CH spool), ``ExecDriverSettings`` (the env boundary of the V2 + exec driver), ``ExecGateReport`` (the V2 gate JSON schema); +- the ``typed`` decorator: ``beartype`` runtime enforcement with the + ``DOLPHIN_VIOLET_BEARTYPE=0`` kill-switch (the V2 gate measures its + latency cost before permanent adoption). +""" + +from __future__ import annotations + +import os +from typing import Annotated, Any, Dict, Optional + +from pydantic import BaseModel, ConfigDict, Field + +__all__ = [ + "typed", + "MonoNs", "EpochMs", "Ms", "Bps", "Px", "Qty", "Seq", + "BarsHeld", "TradeId", "SessionId", "Symbol", + "StrictModel", "DivergenceRow", "ExecDriverSettings", "ExecGateReport", +] + + +# ── beartype kill-switch ────────────────────────────────────────────────────── +# Evaluated at import: a process-level switch, not a per-call toggle. The V2 +# gate runs once per setting and records the latency delta in its report. + +def _beartype_enabled() -> bool: + raw = os.environ.get("DOLPHIN_VIOLET_BEARTYPE", "1") + return str(raw).strip().lower() not in ("0", "false", "no", "off") + + +if _beartype_enabled(): + from beartype import beartype as typed +else: + def typed(fn): # type: ignore[misc] # identity when killed + return fn + + +# ── refined scalar aliases ──────────────────────────────────────────────────── + +MonoNs = Annotated[int, Field(ge=0)] +EpochMs = Annotated[int, Field(gt=0)] +Ms = Annotated[float, Field(ge=0, allow_inf_nan=False)] +Bps = Annotated[float, Field(allow_inf_nan=False)] +Px = Annotated[float, Field(gt=0, allow_inf_nan=False)] +Qty = Annotated[float, Field(ge=0, allow_inf_nan=False)] +Seq = Annotated[int, Field(ge=0)] +# UInt16 in ClickHouse — the bars_held=-106 incident type. 0..65535, period. +BarsHeld = Annotated[int, Field(ge=0, le=65535)] +TradeId = Annotated[str, Field(min_length=1, max_length=64)] +SessionId = Annotated[str, Field(min_length=1, max_length=64)] +Symbol = Annotated[str, Field(min_length=1, max_length=32, pattern=r"^[A-Z0-9\-_]+$")] + + +class StrictModel(BaseModel): + """Frozen + extra=forbid: a violet boundary model is a contract, not a bag.""" + + model_config = ConfigDict(frozen=True, extra="forbid") + + +# ── boundary models ─────────────────────────────────────────────────────────── + +class DivergenceRow(StrictModel): + """One ``dolphin_violet.violet_feed_divergence`` row. + + Field set MUST equal the DDL column set in + ``prod/clickhouse/violet/20_violet_feed_divergence.sql`` (asserted by + test_violet_domain). Constructed in FeedDivergenceMonitor.on_scan before + the sink — a malformed row dies here, not at the head of the CH spool. + """ + + ts: EpochMs # DateTime64(3) — epoch milliseconds + session_id: SessionId + asset: Symbol + scan_price: Px + venue_mid: Px + divergence_bps: Bps + scan_seq: Seq + venue_seq: Seq + mono_ns: MonoNs + + +class ExecDriverSettings(StrictModel): + """Env boundary of the V2 exec driver. + + ``ttl_override_ms`` exists because the shared router clamps TTLs to + >= 0.5 s (exec_router.py from_env [0.5, 300] + register_working + ``max(0.5, ttl_s)`` floor): the VIOLET driver is the timing authority + and schedules ``min(plan.ttl_s, override)`` on its own DeadlineScheduler. + ``None`` (env value "plan") means honor plan.ttl_s verbatim. + """ + + ttl_override_ms: Optional[Ms] = 100.0 + requote_hot_window_ns: MonoNs = 5_000_000_000 # mirrors pink_direct hot window + resolve_grace_ms: Ms = 0.0 + + @classmethod + def from_env(cls, env: Optional[Dict[str, str]] = None) -> "ExecDriverSettings": + """Parse the DOLPHIN_VIOLET_EXEC_* env boundary. A malformed value + raises at boot — loud reject at the source, never a silent default.""" + e: Any = os.environ if env is None else env + raw_ttl = str(e.get("DOLPHIN_VIOLET_EXEC_TTL_MS", "100") or "100").strip().lower() + ttl: Optional[float] + ttl = None if raw_ttl in ("plan", "none", "off") else float(raw_ttl) + hot_s = float(e.get("DOLPHIN_VIOLET_EXEC_REQUOTE_HOT_S", "5") or "5") + grace = float(e.get("DOLPHIN_VIOLET_EXEC_RESOLVE_GRACE_MS", "0") or "0") + return cls( + ttl_override_ms=ttl, + requote_hot_window_ns=int(hot_s * 1_000_000_000), + resolve_grace_ms=grace, + ) + + def ttl_ms_for(self, plan_ttl_s: float) -> float: + """Effective deadline for one working order, in milliseconds. + + Taker plans carry ttl_s=0 (no TTL management) — but the driver only + schedules deadlines for maker plans, so a 0 here means "override + alone" rather than "instant". + """ + plan_ms = max(0.0, float(plan_ttl_s)) * 1000.0 + if self.ttl_override_ms is None: + return plan_ms + if plan_ms <= 0.0: + return float(self.ttl_override_ms) + return min(plan_ms, float(self.ttl_override_ms)) + + +class ExecGateReport(StrictModel): + """Schema for ``prod/VIOLET_dev/reports/violet_v2_exec_gate_*.json``.""" + + generated_utc: str + host: str + script: Dict[str, Any] + cycles: Seq + scenarios: Dict[str, int] + jitter: Dict[str, Any] + ttl_resolution: Dict[str, Any] + kernel_call: Dict[str, Any] = Field(default_factory=dict) + early_fires: Seq + stuck_orders: Seq + pending_deadlines: Seq + terminals_ok: bool + accounting_ok: bool + deterministic: bool + beartype: Dict[str, Any] = Field(default_factory=dict) + passed: bool diff --git a/prod/clean_arch/violet/test_violet_domain.py b/prod/clean_arch/violet/test_violet_domain.py new file mode 100644 index 0000000..4e90c95 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_domain.py @@ -0,0 +1,175 @@ +"""V2a: V-TYPES domain layer — refined types reject by construction.""" + +from __future__ import annotations + +import importlib +import math +import re +import sys +from pathlib import Path + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest +from pydantic import TypeAdapter, ValidationError + +from prod.clean_arch.violet.domain import ( + BarsHeld, + DivergenceRow, + ExecDriverSettings, + Px, + Symbol, +) + +DDL_PATH = Path( + "/mnt/dolphinng5_predict/prod/clickhouse/violet/20_violet_feed_divergence.sql" +) + +VALID_ROW = dict( + ts=1_781_300_000_000, session_id="sess", asset="FETUSDT", + scan_price=0.2176, venue_mid=0.1878, divergence_bps=-1369.49, + scan_seq=1, venue_seq=7, mono_ns=123, +) + + +def test_bars_held_kills_the_incident(): + """bars_held=-106 (the 2026-06-12 zombie-trades poison) must be + UNREPRESENTABLE — and so must the UInt16 overflow side.""" + ta = TypeAdapter(BarsHeld) + assert ta.validate_python(0) == 0 + assert ta.validate_python(65535) == 65535 + with pytest.raises(ValidationError): + ta.validate_python(-106) + with pytest.raises(ValidationError): + ta.validate_python(65536) + + +def test_px_rejects_nonfinite_and_nonpositive(): + ta = TypeAdapter(Px) + assert ta.validate_python(0.2176) == 0.2176 + for bad in (0.0, -1.0, math.nan, math.inf, -math.inf): + with pytest.raises(ValidationError): + ta.validate_python(bad) + + +def test_symbol_shape(): + ta = TypeAdapter(Symbol) + assert ta.validate_python("BTCUSDT") == "BTCUSDT" + assert ta.validate_python("1000PEPEUSDT") == "1000PEPEUSDT" + assert ta.validate_python("BTC-USDT") == "BTC-USDT" + for bad in ("", "btcusdt", "BTC USDT", "X" * 33): + with pytest.raises(ValidationError): + ta.validate_python(bad) + + +def test_divergence_row_valid_and_frozen(): + row = DivergenceRow(**VALID_ROW) + assert row.model_dump() == VALID_ROW # exact round-trip + with pytest.raises(ValidationError): + row.ts = 1 # frozen + + +@pytest.mark.parametrize("field,bad", [ + ("ts", 0), ("ts", -5), + ("session_id", ""), + ("asset", "fetusdt"), + ("scan_price", 0.0), ("scan_price", float("nan")), + ("venue_mid", float("inf")), + ("divergence_bps", float("nan")), + ("scan_seq", -1), ("venue_seq", -1), ("mono_ns", -1), +]) +def test_divergence_row_rejects(field, bad): + with pytest.raises(ValidationError): + DivergenceRow(**{**VALID_ROW, field: bad}) + + +def test_divergence_row_forbids_extras(): + with pytest.raises(ValidationError): + DivergenceRow(**VALID_ROW, bars_held=1) + + +def test_divergence_row_fields_match_ddl_columns(): + cols = set(re.findall(r"`(\w+)`", DDL_PATH.read_text())) + assert set(DivergenceRow.model_fields) == cols, ( + set(DivergenceRow.model_fields) ^ cols + ) + + +def test_exec_settings_defaults_and_ttl_logic(): + s = ExecDriverSettings() + assert s.ttl_override_ms == 100.0 + assert s.requote_hot_window_ns == 5_000_000_000 + # router maker plan (8s entry TTL) bows to the 100ms override + assert s.ttl_ms_for(8.0) == 100.0 + # plan shorter than override wins + assert ExecDriverSettings(ttl_override_ms=10_000.0).ttl_ms_for(5.0) == 5_000.0 + # taker ttl_s=0 → override alone + assert s.ttl_ms_for(0.0) == 100.0 + # override disabled → plan verbatim + p = ExecDriverSettings(ttl_override_ms=None) + assert p.ttl_ms_for(8.0) == 8_000.0 + + +def test_exec_settings_from_env(): + s = ExecDriverSettings.from_env({"DOLPHIN_VIOLET_EXEC_TTL_MS": "250", + "DOLPHIN_VIOLET_EXEC_REQUOTE_HOT_S": "2"}) + assert s.ttl_override_ms == 250.0 + assert s.requote_hot_window_ns == 2_000_000_000 + assert ExecDriverSettings.from_env( + {"DOLPHIN_VIOLET_EXEC_TTL_MS": "plan"}).ttl_override_ms is None + # malformed env value raises at boot — loud reject at the source + with pytest.raises(ValueError): + ExecDriverSettings.from_env({"DOLPHIN_VIOLET_EXEC_TTL_MS": "fast"}) + with pytest.raises(ValidationError): + ExecDriverSettings.from_env({"DOLPHIN_VIOLET_EXEC_TTL_MS": "-100"}) + + +def test_typed_enforces_and_kill_switch(monkeypatch): + import prod.clean_arch.violet.domain as domain + + @domain.typed + def f(x: int) -> int: + return x + + assert f(3) == 3 + with pytest.raises(Exception): # BeartypeCallHintParamViolation + f("not-an-int") + + monkeypatch.setenv("DOLPHIN_VIOLET_BEARTYPE", "0") + try: + importlib.reload(domain) + + @domain.typed + def g(x: int) -> int: + return x + + assert g("passes-when-killed") == "passes-when-killed" + finally: + monkeypatch.delenv("DOLPHIN_VIOLET_BEARTYPE", raising=False) + importlib.reload(domain) + + +def test_divergence_monitor_rejects_at_source(): + """End-to-end: a poisoned session_id means zero rows reach the sink and + the reject counter advances — the spool never sees the row.""" + from types import SimpleNamespace + from prod.clean_arch.violet.clock import PlaneClock + from prod.clean_arch.violet.divergence import FeedDivergenceMonitor + + rows = [] + m = FeedDivergenceMonitor( + sink=lambda t, r: rows.append(r), + scan_clock=PlaneClock("scan", 12_000_000_000), + venue_clock=PlaneClock("venue", 2_000_000_000), + session_id="", # invalid by construction + ) + m.on_venue_tick("BTC-USDT", 100.0, 100.0) + m.on_scan(SimpleNamespace(scan_payload={"assets": ["BTCUSDT"], + "asset_prices": [100.0]})) + assert rows == [] + assert m.rows_rejected == 1 + assert m.rows_emitted == 0 + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/clean_arch/violet/test_violet_properties.py b/prod/clean_arch/violet/test_violet_properties.py new file mode 100644 index 0000000..2f06292 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_properties.py @@ -0,0 +1,241 @@ +"""V2a: hypothesis property tests (V-TYPES doctrine). + +Three targets, highest value first: +1. ExecutionRouter state machine — the accounting-adjacent core: pop- + semantics make fill-vs-retry mutually exclusive, EXITs are never + suppressible by expiry, retry chains are bounded. +2. LatencyHistogram.percentile_ns — the gate numbers themselves (the + nearest-rank/banker's-rounding bug class is live in this codebase's + history). +3. DivergenceRow — NaN/inf/negative inputs always rejected; valid rows + round-trip unchanged. +""" + +from __future__ import annotations + +import math +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest +from hypothesis import HealthCheck, given, settings, strategies as st +from hypothesis.stateful import ( + RuleBasedStateMachine, + initialize, + invariant, + precondition, + rule, +) +from pydantic import ValidationError + +from prod.clean_arch.dita_v2.exec_router import ( + ExecConfig, + ExecutionRouter, + MissAction, +) +from prod.clean_arch.violet.clock import LatencyHistogram +from prod.clean_arch.violet.domain import DivergenceRow + + +# ── 1. router state machine ─────────────────────────────────────────────────── + +class RouterMachine(RuleBasedStateMachine): + """Drives the REAL ExecutionRouter through plan/register/fill/cancel/ + expiry sequences with a fake clock, asserting the V2 driver's load- + bearing invariants.""" + + @initialize(retries=st.integers(min_value=0, max_value=3), + exhaust=st.sampled_from(["skip", "market"])) + def setup(self, retries, exhaust): + self.now = 1000.0 + self.cfg = ExecConfig(style="maker_both", entry_miss="retry", + entry_retries=retries, retry_exhaust=exhaust) + self.router = ExecutionRouter(self.cfg, clock=lambda: self.now) + self.n = 0 + self.filled: set[str] = set() + self.retried: set[str] = set() # trade_ids consumed by a retry + self.retry_count: dict[str, int] = {} # base_trade_id → retries used + + def _fresh_tid(self) -> str: + self.n += 1 + return f"T{self.n:04d}" + + @rule(px=st.floats(min_value=0.5, max_value=50_000, allow_nan=False)) + def plan_and_register_entry(self, px): + tid = self._fresh_tid() + plan = self.router.plan_entry(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", reference_price=px) + if self.router.has_working_entry() and plan.suppress: + return # R2: slot spoken for + assert not plan.suppress + if plan.is_maker: + assert plan.sane() and plan.order_type == "LIMIT" + self.router.register_working(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", plan=plan) + + @rule(px=st.floats(min_value=0.5, max_value=50_000, allow_nan=False), + reason=st.sampled_from(["TAKE_PROFIT", "STOP_LOSS", "MAX_HOLD", + "CATASTROPHIC", "ADVSL"])) + def plan_exit_never_stranded(self, px, reason): + tid = self._fresh_tid() + plan = self.router.plan_exit(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", + reference_price=px, reason=reason) + # RULE 1: with no same-trade working exit, an exit plan is never + # suppressed — maker or MARKET, it reaches the venue. + assert not plan.suppress + assert plan.sane() + if plan.is_maker: + self.router.register_working(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", plan=plan) + + @precondition(lambda self: self.router.working_orders()) + @rule(data=st.data()) + def fill_working(self, data): + wo = data.draw(st.sampled_from(self.router.working_orders())) + self.router.note_fill(wo.trade_id) + assert wo.trade_id not in self.retried, \ + "a trade_id must never be both filled and retried" + self.filled.add(wo.trade_id) + assert self.router.working(wo.trade_id) is None # pop semantics + + @precondition(lambda self: any(w.action == "ENTER" + for w in self.router.working_orders())) + @rule(data=st.data(), + px=st.floats(min_value=0.5, max_value=50_000, allow_nan=False)) + def expire_entry_and_apply_miss_policy(self, data, px): + entries = [w for w in self.router.working_orders() if w.action == "ENTER"] + wo = data.draw(st.sampled_from(entries)) + self.now += 1000.0 # everything expires + action = self.router.entry_miss_action(wo) + self.router.note_cancel(wo.trade_id) # runtime cancelled the quote + assert wo.trade_id not in self.filled, \ + "a filled trade must never reach the miss path (pop semantics)" + if action == MissAction.RETRY: + self.retried.add(wo.trade_id) + used = self.retry_count.get(wo.base_trade_id, 0) + 1 + self.retry_count[wo.base_trade_id] = used + assert used <= self.cfg.entry_retries, "retry chain must be bounded" + new_tid, plan = self.router.retry_plan(wo, reference_price=px) + assert new_tid == f"{wo.base_trade_id}-r{wo.retry_n + 1}" + assert plan.action == "ENTER" + if plan.is_maker and not self.router.has_working_entry(): + self.router.register_working( + trade_id=new_tid, asset=wo.asset, position_side=wo.side, + plan=plan, base_trade_id=wo.base_trade_id, + retry_n=wo.retry_n + 1) + elif action == MissAction.MARKET: + new_tid, plan = self.router.market_fallback_plan(wo) + assert new_tid == f"{wo.base_trade_id}-m" # ENTER → fresh lifecycle + assert plan.action == "ENTER" and plan.order_type == "MARKET" + else: + assert action == MissAction.SKIP + + @precondition(lambda self: any(w.action == "EXIT" + for w in self.router.working_orders())) + @rule(data=st.data()) + def expire_exit_escalates_market_same_trade(self, data): + exits = [w for w in self.router.working_orders() if w.action == "EXIT"] + wo = data.draw(st.sampled_from(exits)) + self.now += 1000.0 + new_tid, plan = self.router.market_fallback_plan(wo) + # RULE 1: the MARKET escalation stays attached to the SAME trade. + assert new_tid == wo.trade_id + assert plan.action == "EXIT" and plan.order_type == "MARKET" + assert not plan.suppress + self.router.note_cancel(wo.trade_id) + + @invariant() + def registry_is_consistent(self): + wos = self.router.working_orders() + assert len({w.trade_id for w in wos}) == len(wos) + for w in wos: + assert w.retries_left >= 0 + assert w.trade_id not in self.filled # filled ⇒ popped, forever + assert sum(1 for w in wos if w.action == "ENTER") <= 1 # R2 + + +# derandomize: the suite must be reproducible run-to-run (the V0/V2 gates +# assert determinism elsewhere; a flaky property is worse than a fixed one). +# Health checks are suppressed because rule preconditions legitimately +# filter often and CPU contention during the full suite trips too_slow. +RouterMachine.TestCase.settings = settings( + max_examples=30, stateful_step_count=30, deadline=None, + derandomize=True, suppress_health_check=list(HealthCheck)) +TestRouterMachine = RouterMachine.TestCase + + +# ── 2. LatencyHistogram percentile laws ─────────────────────────────────────── + +@given(samples=st.lists(st.integers(min_value=0, max_value=10**12), + min_size=1, max_size=500), + p=st.floats(min_value=0.001, max_value=1.0)) +@settings(max_examples=150, deadline=None, derandomize=True, + suppress_health_check=[HealthCheck.too_slow]) +def test_percentile_is_a_retained_sample(samples, p): + h = LatencyHistogram("prop") + for s in samples: + h.record(s) + assert h.percentile_ns(p) in samples + + +@given(samples=st.lists(st.integers(min_value=0, max_value=10**12), + min_size=1, max_size=500)) +@settings(max_examples=100, deadline=None, derandomize=True, + suppress_health_check=[HealthCheck.too_slow]) +def test_percentile_monotone_and_extremes(samples): + h = LatencyHistogram("prop") + for s in samples: + h.record(s) + ps = [0.01, 0.25, 0.5, 0.9, 0.99, 0.999, 1.0] + vals = [h.percentile_ns(p) for p in ps] + assert vals == sorted(vals), "percentile must be monotone in p" + assert vals[-1] == max(samples) + assert h.percentile_ns(0.0000001) == min(samples) + + +# ── 3. DivergenceRow parse laws ─────────────────────────────────────────────── + +_finite_px = st.floats(min_value=1e-9, max_value=1e9, + allow_nan=False, allow_infinity=False) +_any_float = st.floats(allow_nan=True, allow_infinity=True) + + +@given(scan=_finite_px, mid=_finite_px, + bps=st.floats(min_value=-1e6, max_value=1e6, allow_nan=False), + sseq=st.integers(min_value=0, max_value=2**31), + vseq=st.integers(min_value=0, max_value=2**31)) +@settings(max_examples=150, deadline=None, derandomize=True, + suppress_health_check=[HealthCheck.too_slow]) +def test_valid_rows_round_trip(scan, mid, bps, sseq, vseq): + src = dict(ts=1, session_id="s", asset="BTCUSDT", scan_price=scan, + venue_mid=mid, divergence_bps=bps, scan_seq=sseq, + venue_seq=vseq, mono_ns=0) + assert DivergenceRow(**src).model_dump() == src + + +@given(bad_px=_any_float) +@settings(max_examples=150, deadline=None, derandomize=True, + suppress_health_check=[HealthCheck.too_slow]) +def test_nonpositive_or_nonfinite_prices_always_rejected(bad_px): + if math.isfinite(bad_px) and bad_px > 0: + return # valid by definition + with pytest.raises(ValidationError): + DivergenceRow(ts=1, session_id="s", asset="BTCUSDT", + scan_price=bad_px, venue_mid=1.0, divergence_bps=0.0, + scan_seq=0, venue_seq=0, mono_ns=0) + + +@given(seq=st.integers(max_value=-1)) +@settings(max_examples=50, deadline=None, derandomize=True, + suppress_health_check=[HealthCheck.too_slow]) +def test_negative_seqs_always_rejected(seq): + with pytest.raises(ValidationError): + DivergenceRow(ts=1, session_id="s", asset="BTCUSDT", + scan_price=1.0, venue_mid=1.0, divergence_bps=0.0, + scan_seq=seq, venue_seq=0, mono_ns=0) + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"]))