"""VIOLET V0 reactor harness — measured latency gate. Drives the REAL DITAv2 ``ExecutionKernel`` (Rust-backed, MOCK venue bundle) through seeded event storms while a ``DeadlineScheduler`` runs concurrently, and reports exact-percentile latency histograms. Measured quantities (single ``mono_ns`` timebase): - ``venue_event_reaction`` (THE GATED NUMBER): the producer stamps ``inject_mono_ns`` when putting an event on the consumer asyncio.Queue — modeling the WS-reader→reactor hop exactly as the production ``_run_account_stream`` consumes its stream — and the reactor records ``mono_ns() - inject_mono_ns`` AFTER the kernel fold returns. This includes event-loop scheduling: the charter's "event→reaction in-process". Gate: p99 < 10 ms. - ``kernel_call``: brackets the kernel FFI call alone (informational). - ``deadline_jitter``: ``t_fire − t_due`` per deadline fired during the storm. Gate: p99 < 25 ms, zero early fires. Determinism: ``random.Random(seed)`` fully determines the event sequence (asserted via sequence hash); latencies naturally vary run to run. CLI: python -m prod.clean_arch.violet.harness \ --events 50000 --deadlines 5000 --seed 42 \ --out prod/VIOLET_dev/reports/violet_v0_latency_$(date -u +%Y%m%d).json \ --gate """ from __future__ import annotations import argparse import asyncio import hashlib import json import platform import random import sys import time from dataclasses import dataclass, field from pathlib import Path from typing import Any, Dict, List, Optional from .clock import ( DeadlineScheduler, LatencyHistogram, PlaneClock, ACCOUNT_STALENESS_NS, mono_ns, ) # Gate thresholds (charter / plan) GATE_REACTION_P99_MS = 10.0 GATE_JITTER_P99_MS = 25.0 # ── Storm specification ────────────────────────────────────────────────────── @dataclass class StormSpec: n_events: int = 50_000 seed: int = 42 # Event-kind mix (account-fold path + FSM mark-price path). 
mix: Dict[str, float] = field( default_factory=lambda: { "ACCOUNT_UPDATE": 0.35, "FILL_SETTLED": 0.25, "PREDICTED_FILL": 0.15, "FUNDING_FEE": 0.05, "MARK_PRICE": 0.20, } ) # Arrival shape. The GATE measures reaction latency under REALISTIC load: # PINK's live BingX stream shows fill cascades of ~4-8 events per burst # (PREDICTED_FILL + FILL_SETTLED + ACCOUNT_UPDATE per action) at low # sustained rates. burst_size=8 / 12 ms ≈ 667 events/s offered — still # >10x production sustained rates. Larger bursts measure intra-burst # QUEUEING physics, not reaction (at ~0.33 ms/fold the tail of a # 16-burst waits ~5 ms before its turn; a 32/1ms storm saturates the # queue outright — measured: see archived 2026-06-12 capacity reports). # Use bigger bursts deliberately for capacity exploration, not the gate. arrival: str = "burst" # uniform | poisson | burst burst_size: int = 8 inter_burst_ms: float = 12.0 deadlines: int = 5_000 deadline_spread_ms: tuple = (10, 2_000) warmup_events: int = 1_000 # excluded from gate percentiles def to_dict(self) -> Dict[str, Any]: return { "n_events": self.n_events, "seed": self.seed, "mix": dict(self.mix), "arrival": self.arrival, "burst_size": self.burst_size, "inter_burst_ms": self.inter_burst_ms, "deadlines": self.deadlines, "deadline_spread_ms": list(self.deadline_spread_ms), "warmup_events": self.warmup_events, } @dataclass class HarnessReport: spec: StormSpec histograms: Dict[str, Dict[str, Any]] gate: Dict[str, Any] passed: bool sequence_hash: str meta: Dict[str, Any] def to_json(self) -> str: return json.dumps( { "spec": self.spec.to_dict(), "histograms": self.histograms, "gate": self.gate, "passed": self.passed, "sequence_hash": self.sequence_hash, "meta": self.meta, }, indent=2, default=str, ) def table(self) -> str: lines = [f"VIOLET V0 latency report — passed={self.passed}"] for name, d in self.histograms.items(): lines.append( f" {name:<24} n={d['count']:<8} p50={d['p50_ms']:8.3f}ms " f"p99={d['p99_ms']:8.3f}ms 
p99.9={d['p999_ms']:8.3f}ms " f"max={d['max_ms']:8.3f}ms" ) for k, v in self.gate.items(): lines.append(f" gate.{k}: {v}") return "\n".join(lines) # ── Event generation (seeded, deterministic) ───────────────────────────────── def generate_events(spec: StormSpec) -> List[Dict[str, Any]]: """Deterministic event sequence from the seed. Same seed → same list.""" rng = random.Random(spec.seed) kinds = list(spec.mix.keys()) weights = [spec.mix[k] for k in kinds] events: List[Dict[str, Any]] = [] for i in range(spec.n_events): kind = rng.choices(kinds, weights=weights, k=1)[0] if kind == "ACCOUNT_UPDATE": # Sentinel −1.0: the harness substitutes the kernel's CURRENT # k_capital at fold time so K and E never diverge into a # capital_frozen reconcile ERROR mid-storm. The substitution is # runtime behavior; the generated sequence (and its hash) stays # fully seed-deterministic. ev = { "kind": "ACCOUNT_UPDATE", "event_id": f"storm-au-{i:06d}", "wallet_balance": -1.0, "available_margin": -1.0, "used_margin": 0.0, "maint_margin": 0.0, } elif kind == "FILL_SETTLED": ev = { "kind": "FILL_SETTLED", "event_id": f"storm-fs-{i:06d}", "realized_pnl": 0.0, "fee": round(rng.uniform(0.001, 0.05), 6), "is_maker": rng.random() < 0.5, } elif kind == "PREDICTED_FILL": ev = { "kind": "PREDICTED_FILL", "fill_price": round(rng.uniform(0.1, 100.0), 4), "fill_qty": round(rng.uniform(1.0, 1000.0), 3), "realized_pnl": 0.0, "is_maker": rng.random() < 0.5, } elif kind == "FUNDING_FEE": ev = { "kind": "FUNDING_FEE", "event_id": f"storm-ff-{i:06d}", "funding_amount": round(rng.uniform(-1.0, 1.0), 6), } else: # MARK_PRICE — FSM path via on_venue_event ev = { "kind": "MARK_PRICE", "price": round(rng.uniform(0.1, 100.0), 4), } events.append(ev) return events def sequence_hash(events: List[Dict[str, Any]]) -> str: payload = json.dumps(events, sort_keys=True, separators=(",", ":")).encode() return hashlib.sha256(payload).hexdigest() # ── Harness 
class ReactorHarness:
    """Owns the kernel, the consumer queue, and the deadline scheduler.

    One instance accumulates three latency histograms across its lifetime:
    ``venue_event_reaction`` (inject → fold returned), ``kernel_call`` (the
    kernel call alone) and ``deadline_jitter`` (filled by the scheduler).
    """

    def __init__(self, *, kernel: Any = None) -> None:
        """Create the harness.

        Args:
            kernel: Kernel object to drive. It must expose ``on_venue_event``
                and ``on_account_event``. When ``None``, the real kernel is
                built from the production launcher bundle in MOCK venue mode.
        """
        if kernel is None:
            # Real kernel, MOCK venue — the production bundle constructor.
            from prod.clean_arch.dita_v2.launcher import build_launcher_bundle

            bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1)
            kernel = bundle.kernel
        self.kernel = kernel

        seed_capital = 25_000.0
        # Seed K = E so the synthetic storm starts reconciled.
        if hasattr(self.kernel, "reset_and_seed"):
            try:
                self.kernel.reset_and_seed(seed_capital)
            except Exception:
                # Seeding stays best-effort, but a failure must be visible:
                # the storm would otherwise start from an unknown capital.
                import logging

                logging.getLogger(__name__).warning(
                    "kernel.reset_and_seed(%s) failed; continuing unseeded",
                    seed_capital,
                    exc_info=True,
                )

        self.reaction_hist = LatencyHistogram("venue_event_reaction")
        self.kernel_hist = LatencyHistogram("kernel_call")
        self.jitter_hist = LatencyHistogram("deadline_jitter")
        self.account_clock = PlaneClock("account", ACCOUNT_STALENESS_NS)
        self.early_fires = 0
        # Last k_capital seen in a fold result; resolves the −1.0 sentinel.
        self._last_k_capital = seed_capital

    @staticmethod
    def _mark_price_event(price: float) -> Any:
        """Build the ``VenueEvent`` used for a MARK_PRICE storm event on slot 0."""
        from datetime import datetime, timezone

        from prod.clean_arch.dita_v2.contracts import (
            KernelEventKind,
            TradeSide,
            VenueEvent,
            VenueEventStatus,
        )

        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id="",
            trade_id="",
            slot_id=0,
            kind=KernelEventKind.MARK_PRICE,
            status=VenueEventStatus.ACKED,
            venue_order_id="",
            venue_client_id="",
            side=TradeSide.FLAT,
            asset="STORMUSDT",
            price=price,
            size=0.0,
            filled_size=0.0,
            remaining_size=0.0,
            reason="",
            raw_payload={},
            metadata={},
        )

    def _fold(self, ev: Dict[str, Any]) -> None:
        """Fold one storm event into the kernel, bracketing the kernel call.

        The ``kernel_call`` sample covers only ``on_venue_event`` /
        ``on_account_event``. Event construction, lazy imports and sentinel
        substitution happen outside the bracket, so the histogram matches its
        documented meaning (the kernel call alone).
        """
        kind = ev.get("kind")
        if kind == "MARK_PRICE":
            # FSM path: cheap venue event on slot 0 (flat slot → no-op walk).
            event = self._mark_price_event(float(ev["price"]))
            t0 = mono_ns()
            self.kernel.on_venue_event(event)
            elapsed_ns = mono_ns() - t0
        else:
            if (
                kind == "ACCOUNT_UPDATE"
                and float(ev.get("wallet_balance", 0.0)) <= 0.0
            ):
                # Substitute the kernel's own k_capital (sentinel resolution;
                # captured for free from prior fold returns — no extra FFI).
                # Copy first so the shared generated event is not mutated.
                ev = dict(ev)
                ev["wallet_balance"] = self._last_k_capital
                ev["available_margin"] = self._last_k_capital
            t0 = mono_ns()
            result = self.kernel.on_account_event(ev)
            elapsed_ns = mono_ns() - t0
            if isinstance(result, dict):
                k_cap = result.get("k_capital")
                if isinstance(k_cap, (int, float)) and k_cap > 0:
                    self._last_k_capital = float(k_cap)
        self.kernel_hist.record(elapsed_ns)
        self.account_clock.tick()

    async def run_storm(self, spec: StormSpec) -> HarnessReport:
        """Run one storm described by ``spec`` and return the gated report.

        A producer task stamps and enqueues events; a reactor task folds them
        and records reaction latency for events at or past
        ``spec.warmup_events``. Deadlines are scheduled up front and fire
        while the storm runs.

        Raises:
            Exception: whatever the producer or reactor task raised. The
                sibling task is cancelled and the scheduler is stopped before
                the error propagates, so a failing fold cannot hang the
                harness.
        """
        events = generate_events(spec)
        seq_hash = sequence_hash(events)
        queue: asyncio.Queue = asyncio.Queue()

        # Deadline scheduler runs concurrently throughout the storm.
        sched = DeadlineScheduler(jitter_hist=self.jitter_hist)
        sched.start()
        try:
            rng = random.Random(spec.seed ^ 0xD15EA5E)
            early_fire_count = [0]
            lo, hi = spec.deadline_spread_ms

            def _early_fire_probe(due_ns: int) -> Any:
                # Factory so each callback closes over its own due time.
                def _cb() -> None:
                    if mono_ns() < due_ns:
                        early_fire_count[0] += 1

                return _cb

            for _ in range(spec.deadlines):
                delay_ms = rng.uniform(lo, hi)
                due = mono_ns() + int(delay_ms * 1e6)
                sched.schedule_at(due, _early_fire_probe(due))

            async def producer() -> None:
                i = 0
                n = len(events)
                while i < n:
                    if spec.arrival == "burst":
                        burst_end = min(i + spec.burst_size, n)
                        while i < burst_end:
                            await queue.put((mono_ns(), i, events[i]))
                            i += 1
                        await asyncio.sleep(spec.inter_burst_ms / 1000.0)
                    elif spec.arrival == "poisson":
                        await queue.put((mono_ns(), i, events[i]))
                        i += 1
                        await asyncio.sleep(rng.expovariate(1000.0))  # ~1k ev/s
                    else:  # uniform
                        await queue.put((mono_ns(), i, events[i]))
                        i += 1
                        await asyncio.sleep(0.0005)
                await queue.put((0, -1, None))  # sentinel

            async def reactor() -> None:
                while True:
                    inject_ns, idx, ev = await queue.get()
                    if ev is None:
                        return
                    self._fold(ev)
                    if idx >= spec.warmup_events:
                        self.reaction_hist.record(mono_ns() - inject_ns)

            storm_t0 = mono_ns()
            tasks = (
                asyncio.create_task(producer(), name="storm_producer"),
                asyncio.create_task(reactor(), name="storm_reactor"),
            )
            try:
                # gather surfaces the first failure instead of waiting on a
                # completion flag that a crashed reactor would never set.
                await asyncio.gather(*tasks)
            except BaseException:
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
                raise
            storm_seconds = max(1e-9, (mono_ns() - storm_t0) / 1e9)
            sustained_eps = len(events) / storm_seconds

            # Let remaining short deadlines drain, then stop the driver.
            await asyncio.sleep(min(2.5, hi / 1000.0 + 0.1))
        finally:
            await sched.stop()

        self.early_fires = early_fire_count[0]
        reaction_p99_ms = self.reaction_hist.percentile_ns(0.99) / 1e6
        jitter_p99_ms = self.jitter_hist.percentile_ns(0.99) / 1e6
        gate = {
            "reaction_p99_ms": reaction_p99_ms,
            "reaction_budget_ms": GATE_REACTION_P99_MS,
            "jitter_p99_ms": jitter_p99_ms,
            "jitter_budget_ms": GATE_JITTER_P99_MS,
            "early_fires": self.early_fires,
            "deadlines_fired": sched.fired_count,
            "offered_events_per_s": round(
                spec.burst_size / max(1e-9, spec.inter_burst_ms / 1000.0)
                if spec.arrival == "burst"
                else 0.0,
                1,
            ),
            "sustained_events_per_s": round(sustained_eps, 1),
            "storm_seconds": round(storm_seconds, 3),
        }
        passed = (
            reaction_p99_ms < GATE_REACTION_P99_MS
            and jitter_p99_ms < GATE_JITTER_P99_MS
            and self.early_fires == 0
        )
        meta = {
            "utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "python": sys.version.split()[0],
            "platform": platform.platform(),
            "node": platform.node(),
        }
        return HarnessReport(
            spec=spec,
            histograms={
                h.name: h.to_dict()
                for h in (self.reaction_hist, self.kernel_hist, self.jitter_hist)
            },
            gate=gate,
            passed=passed,
            sequence_hash=seq_hash,
            meta=meta,
        )


# ── CLI ──────────────────────────────────────────────────────────────────────


async def _amain(args: argparse.Namespace) -> int:
    """Run one storm from parsed CLI args and return the process exit code.

    Prints the summary table and writes the JSON report when ``--out`` is
    given. Returns 1 only when ``--gate`` is set and the gate failed,
    otherwise 0.
    """
    spec = StormSpec(
        n_events=args.events,
        seed=args.seed,
        deadlines=args.deadlines,
        arrival=args.arrival,
    )
    harness = ReactorHarness()
    report = await harness.run_storm(spec)
    print(report.table())
    if args.out:
        out = Path(args.out)
        out.parent.mkdir(parents=True, exist_ok=True)
        out.write_text(report.to_json())
        print(f"report written: {out}")
    if args.gate and not report.passed:
        return 1
    return 0


def main(argv: Optional[List[str]] = None) -> int:
    """Parse ``argv`` (defaults to ``sys.argv[1:]``) and run the harness.

    Returns:
        The exit code produced by ``_amain``.
    """
    ap = argparse.ArgumentParser(description="VIOLET V0 reactor latency harness")
    ap.add_argument("--events", type=int, default=50_000)
    ap.add_argument("--seed", type=int, default=42)
    ap.add_argument("--deadlines", type=int, default=5_000)
    ap.add_argument(
        "--arrival",
        choices=("uniform", "poisson", "burst"),
        default="burst",
    )
    ap.add_argument("--out", type=str, default="")
    ap.add_argument("--gate", action="store_true")
    args = ap.parse_args(argv)
    return asyncio.run(_amain(args))


if __name__ == "__main__":
    raise SystemExit(main())