422 lines
16 KiB
Python
422 lines
16 KiB
Python
|
|
"""VIOLET V0 reactor harness — measured latency gate.
|
|||
|
|
|
|||
|
|
Drives the REAL DITAv2 ``ExecutionKernel`` (Rust-backed, MOCK venue bundle)
|
|||
|
|
through seeded event storms while a ``DeadlineScheduler`` runs concurrently,
|
|||
|
|
and reports exact-percentile latency histograms.
|
|||
|
|
|
|||
|
|
Measured quantities (single ``mono_ns`` timebase):
|
|||
|
|
|
|||
|
|
- ``venue_event_reaction`` (THE GATED NUMBER): the producer stamps
|
|||
|
|
``inject_mono_ns`` when putting an event on the consumer asyncio.Queue —
|
|||
|
|
modeling the WS-reader→reactor hop exactly as the production
|
|||
|
|
``_run_account_stream`` consumes its stream — and the reactor records
|
|||
|
|
``mono_ns() - inject_mono_ns`` AFTER the kernel fold returns. This
|
|||
|
|
includes event-loop scheduling: the charter's "event→reaction in-process".
|
|||
|
|
Gate: p99 < 10 ms.
|
|||
|
|
- ``kernel_call``: brackets the kernel FFI call alone (informational).
|
|||
|
|
- ``deadline_jitter``: ``t_fire − t_due`` per deadline fired during the
|
|||
|
|
storm. Gate: p99 < 25 ms, zero early fires.
|
|||
|
|
|
|||
|
|
Determinism: ``random.Random(seed)`` fully determines the event sequence
|
|||
|
|
(recorded in the report as a SHA-256 sequence hash); latencies naturally vary run to run.
|
|||
|
|
|
|||
|
|
CLI:
|
|||
|
|
python -m prod.clean_arch.violet.harness \
|
|||
|
|
--events 50000 --deadlines 5000 --seed 42 \
|
|||
|
|
--out prod/VIOLET_dev/reports/violet_v0_latency_$(date -u +%Y%m%d).json \
|
|||
|
|
--gate
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import argparse
|
|||
|
|
import asyncio
|
|||
|
|
import hashlib
|
|||
|
|
import json
|
|||
|
|
import platform
|
|||
|
|
import random
|
|||
|
|
import sys
|
|||
|
|
import time
|
|||
|
|
from dataclasses import dataclass, field
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Any, Dict, List, Optional
|
|||
|
|
|
|||
|
|
from .clock import (
|
|||
|
|
DeadlineScheduler,
|
|||
|
|
LatencyHistogram,
|
|||
|
|
PlaneClock,
|
|||
|
|
ACCOUNT_STALENESS_NS,
|
|||
|
|
mono_ns,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# Gate thresholds (charter / plan), in milliseconds. Each is a strict upper
# bound: a run passes only when the measured p99 is below its budget.
GATE_REACTION_P99_MS: float = 10.0  # venue_event_reaction p99 budget
GATE_JITTER_P99_MS: float = 25.0  # deadline_jitter p99 budget
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Storm specification ──────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class StormSpec:
    """Parameters of one seeded event storm plus its concurrent deadline load.

    ``seed`` drives event generation, and :meth:`to_dict` produces the
    JSON-ready snapshot that is embedded in the report.
    """

    n_events: int = 50_000
    seed: int = 42
    # Event-kind mix (account-fold path + FSM mark-price path): relative
    # weights handed to ``random.choices`` by the event generator.
    mix: Dict[str, float] = field(
        default_factory=lambda: dict(
            ACCOUNT_UPDATE=0.35,
            FILL_SETTLED=0.25,
            PREDICTED_FILL=0.15,
            FUNDING_FEE=0.05,
            MARK_PRICE=0.20,
        )
    )
    # Arrival shape. The GATE measures reaction latency under REALISTIC load.
    # PINK's live BingX stream shows fill cascades of ~4-8 events per burst
    # (PREDICTED_FILL + FILL_SETTLED + ACCOUNT_UPDATE per action) at low
    # sustained rates; burst_size=8 every 12 ms offers ≈ 667 events/s, which
    # is still >10x production sustained rates. Larger bursts measure
    # intra-burst QUEUEING physics rather than reaction: at ~0.33 ms/fold the
    # tail of a 16-burst waits ~5 ms for its turn, and a 32/1 ms storm
    # saturates the queue outright (measured; see the archived 2026-06-12
    # capacity reports). Use bigger bursts deliberately for capacity
    # exploration, never for the gate.
    arrival: str = "burst"  # one of: uniform | poisson | burst
    burst_size: int = 8
    inter_burst_ms: float = 12.0
    deadlines: int = 5_000
    deadline_spread_ms: tuple = (10, 2_000)  # (lo, hi) bounds for deadline delays
    warmup_events: int = 1_000  # leading events excluded from gate percentiles

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly snapshot of the spec.

        ``mix`` is copied so callers cannot mutate the spec through the
        snapshot. ``deadline_spread_ms`` is converted to a list.
        """
        field_names = (
            "n_events",
            "seed",
            "mix",
            "arrival",
            "burst_size",
            "inter_burst_ms",
            "deadlines",
            "deadline_spread_ms",
            "warmup_events",
        )
        snapshot: Dict[str, Any] = {name: getattr(self, name) for name in field_names}
        snapshot["mix"] = dict(self.mix)
        snapshot["deadline_spread_ms"] = list(self.deadline_spread_ms)
        return snapshot
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
class HarnessReport:
    """Outcome of one storm: spec, histograms, gate verdict, and provenance.

    :meth:`to_json` emits every attribute, with ``spec`` expanded through its
    ``to_dict``. :meth:`table` renders a short human-readable summary.
    """

    spec: StormSpec
    histograms: Dict[str, Dict[str, Any]]  # histogram name -> histogram to_dict()
    gate: Dict[str, Any]  # gate measurements and budgets
    passed: bool
    sequence_hash: str
    meta: Dict[str, Any]  # run provenance (timestamp, interpreter, host)

    def to_json(self) -> str:
        """Serialize as indented JSON; values json cannot encode fall back to ``str``."""
        document = {
            "spec": self.spec.to_dict(),
            "histograms": self.histograms,
            "gate": self.gate,
            "passed": self.passed,
            "sequence_hash": self.sequence_hash,
            "meta": self.meta,
        }
        return json.dumps(document, indent=2, default=str)

    def table(self) -> str:
        """Render a header line, one line per histogram, then one line per gate entry.

        Each histogram dict must provide ``count``, ``p50_ms``, ``p99_ms``,
        ``p999_ms`` and ``max_ms``.
        """
        header = f"VIOLET V0 latency report — passed={self.passed}"
        histogram_rows = [
            f" {name:<24} n={d['count']:<8} p50={d['p50_ms']:8.3f}ms "
            f"p99={d['p99_ms']:8.3f}ms p99.9={d['p999_ms']:8.3f}ms "
            f"max={d['max_ms']:8.3f}ms"
            for name, d in self.histograms.items()
        ]
        gate_rows = [f" gate.{k}: {v}" for k, v in self.gate.items()]
        return "\n".join([header, *histogram_rows, *gate_rows])
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Event generation (seeded, deterministic) ─────────────────────────────────
|
|||
|
|
|
|||
|
|
|
|||
|
|
def generate_events(spec: StormSpec) -> List[Dict[str, Any]]:
    """Return the storm's event list, derived entirely from ``spec.seed``.

    All randomness comes from one ``random.Random(spec.seed)`` drawn in a
    fixed order, so equal specs yield equal lists and equal
    :func:`sequence_hash` values. Any kind in ``spec.mix`` other than the four
    account-fold kinds is emitted as a ``MARK_PRICE`` event.
    """
    rng = random.Random(spec.seed)
    kind_names = list(spec.mix)
    kind_weights = [spec.mix[name] for name in kind_names]

    def _account_update(idx: int) -> Dict[str, Any]:
        # Balances carry the -1.0 sentinel. ReactorHarness._fold replaces it
        # with the kernel's current k_capital at fold time, so K and E never
        # diverge into a capital_frozen reconcile ERROR mid-storm. That
        # substitution happens only at run time; the list built here (and
        # its hash) stays a pure function of the seed.
        return {
            "kind": "ACCOUNT_UPDATE",
            "event_id": f"storm-au-{idx:06d}",
            "wallet_balance": -1.0,
            "available_margin": -1.0,
            "used_margin": 0.0,
            "maint_margin": 0.0,
        }

    def _fill_settled(idx: int) -> Dict[str, Any]:
        fee = round(rng.uniform(0.001, 0.05), 6)
        maker = rng.random() < 0.5
        return {
            "kind": "FILL_SETTLED",
            "event_id": f"storm-fs-{idx:06d}",
            "realized_pnl": 0.0,
            "fee": fee,
            "is_maker": maker,
        }

    def _predicted_fill(idx: int) -> Dict[str, Any]:
        price = round(rng.uniform(0.1, 100.0), 4)
        qty = round(rng.uniform(1.0, 1000.0), 3)
        maker = rng.random() < 0.5
        return {
            "kind": "PREDICTED_FILL",
            "fill_price": price,
            "fill_qty": qty,
            "realized_pnl": 0.0,
            "is_maker": maker,
        }

    def _funding_fee(idx: int) -> Dict[str, Any]:
        amount = round(rng.uniform(-1.0, 1.0), 6)
        return {
            "kind": "FUNDING_FEE",
            "event_id": f"storm-ff-{idx:06d}",
            "funding_amount": amount,
        }

    def _mark_price(idx: int) -> Dict[str, Any]:
        # FSM path (folded via the kernel's on_venue_event).
        return {"kind": "MARK_PRICE", "price": round(rng.uniform(0.1, 100.0), 4)}

    builders = {
        "ACCOUNT_UPDATE": _account_update,
        "FILL_SETTLED": _fill_settled,
        "PREDICTED_FILL": _predicted_fill,
        "FUNDING_FEE": _funding_fee,
    }

    storm: List[Dict[str, Any]] = []
    for idx in range(spec.n_events):
        chosen = rng.choices(kind_names, weights=kind_weights, k=1)[0]
        storm.append(builders.get(chosen, _mark_price)(idx))
    return storm
|
|||
|
|
|
|||
|
|
|
|||
|
|
def sequence_hash(events: List[Dict[str, Any]]) -> str:
    """Return the SHA-256 hex digest of *events* in canonical JSON form.

    Keys are sorted and separators are compact, so the digest depends only on
    the event contents, not on dict insertion order.
    """
    canonical = json.dumps(events, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode())
    return digest.hexdigest()
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Harness ──────────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ReactorHarness:
    """Owns the kernel, the consumer queue, and the deadline scheduler.

    Histograms live on the instance and are only ever appended to, so use a
    fresh harness for each measured storm.

    Gate (see :meth:`run_storm`): reaction p99 < ``GATE_REACTION_P99_MS``,
    deadline-jitter p99 < ``GATE_JITTER_P99_MS``, and zero early fires. Two
    sanity conditions stop the gate from passing vacuously: at least one
    post-warmup reaction sample must have been measured, and every scheduled
    deadline must actually have fired.
    """

    # Seed capital used to start K = E reconciled. It is also the initial
    # substitute for the ACCOUNT_UPDATE balance sentinel.
    _SEED_CAPITAL = 25_000.0
    # Grace period after the last deadline's due time before the scheduler
    # is stopped.
    _DRAIN_MARGIN_S = 0.25

    def __init__(self, *, kernel: Any = None) -> None:
        """Wire up the kernel, histograms and account plane clock.

        Args:
            kernel: Optional pre-built kernel. When ``None``, the production
                launcher bundle is built with ``venue_mode="MOCK"`` and a
                single slot, and its kernel is used.
        """
        if kernel is None:
            # Real kernel, MOCK venue — the production bundle constructor.
            from prod.clean_arch.dita_v2.launcher import build_launcher_bundle

            bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1)
            kernel = bundle.kernel
        self.kernel = kernel

        # Seed K = E so the synthetic storm starts reconciled. This stays
        # best-effort (an injected kernel may not support it), but a failure
        # is kept and reported in the run's meta instead of being discarded.
        self.seed_error: Optional[str] = None
        if hasattr(self.kernel, "reset_and_seed"):
            try:
                self.kernel.reset_and_seed(self._SEED_CAPITAL)
            except Exception as exc:  # deliberate best-effort; surfaced in meta
                self.seed_error = f"{type(exc).__name__}: {exc}"

        self.reaction_hist = LatencyHistogram("venue_event_reaction")
        self.kernel_hist = LatencyHistogram("kernel_call")
        self.jitter_hist = LatencyHistogram("deadline_jitter")
        self.account_clock = PlaneClock("account", ACCOUNT_STALENESS_NS)
        self.early_fires = 0
        self._last_k_capital = self._SEED_CAPITAL

    @staticmethod
    def _mark_price_event(price: float) -> Any:
        """Build a slot-0 MARK_PRICE ``VenueEvent`` (flat slot → no-op FSM walk)."""
        from datetime import datetime, timezone

        from prod.clean_arch.dita_v2.contracts import (
            KernelEventKind,
            TradeSide,
            VenueEvent,
            VenueEventStatus,
        )

        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id="",
            trade_id="",
            slot_id=0,
            kind=KernelEventKind.MARK_PRICE,
            status=VenueEventStatus.ACKED,
            venue_order_id="",
            venue_client_id="",
            side=TradeSide.FLAT,
            asset="STORMUSDT",
            price=price,
            size=0.0,
            filled_size=0.0,
            remaining_size=0.0,
            reason="",
            raw_payload={},
            metadata={},
        )

    def _fold(self, ev: Dict[str, Any]) -> None:
        """Fold one storm event into the kernel.

        ``MARK_PRICE`` events go to ``kernel.on_venue_event`` (FSM path).
        Every other kind goes to ``kernel.on_account_event``. Only the kernel
        call itself is timed into ``kernel_call``. Event construction,
        sentinel substitution and result parsing happen outside the bracket,
        so the histogram measures the FFI call alone.
        """
        kind = ev.get("kind")
        if kind == "MARK_PRICE":
            event = self._mark_price_event(float(ev["price"]))
            t0 = mono_ns()
            self.kernel.on_venue_event(event)
            self.kernel_hist.record(mono_ns() - t0)
        else:
            if kind == "ACCOUNT_UPDATE" and float(ev.get("wallet_balance", 0.0)) <= 0.0:
                # Resolve the balance sentinel with the kernel's own
                # k_capital, captured for free from prior fold returns (no
                # extra FFI call).
                ev = dict(ev)
                ev["wallet_balance"] = self._last_k_capital
                ev["available_margin"] = self._last_k_capital
            t0 = mono_ns()
            result = self.kernel.on_account_event(ev)
            self.kernel_hist.record(mono_ns() - t0)
            if isinstance(result, dict):
                k_cap = result.get("k_capital")
                # bool is an int subclass; never treat True/False as capital.
                if isinstance(k_cap, (int, float)) and not isinstance(k_cap, bool) and k_cap > 0:
                    self._last_k_capital = float(k_cap)
        self.account_clock.tick()

    async def run_storm(self, spec: StormSpec) -> HarnessReport:
        """Drive one seeded storm through the kernel and build the gate report.

        A producer task enqueues ``(inject_mono_ns, index, event)`` tuples
        according to ``spec.arrival``. A reactor task folds them and records
        ``venue_event_reaction`` for every index ``>= spec.warmup_events``.
        ``spec.deadlines`` deadlines are scheduled up front on a concurrently
        running ``DeadlineScheduler``.

        After the storm, the method waits until the last deadline's due time
        plus ``_DRAIN_MARGIN_S``, so no deadline is cut off whatever
        ``deadline_spread_ms`` is. It then stops the scheduler.

        Raises:
            Whatever the producer or reactor raises (e.g. a kernel error
            during a fold). Both tasks are cancelled and the scheduler is
            stopped before the exception propagates, so the call never hangs.
        """
        events = generate_events(spec)
        seq_hash = sequence_hash(events)

        queue: asyncio.Queue = asyncio.Queue()
        rng = random.Random(spec.seed ^ 0xD15EA5E)
        lo, hi = spec.deadline_spread_ms

        # Mutable counters shared with the deadline callbacks and the reactor.
        fired_early_box = [0]
        callbacks_run_box = [0]
        reaction_samples_box = [0]

        def _mk_cb(due_ns: int) -> Any:
            # ``due_ns`` is bound per deadline (no late-binding closure).
            def _cb() -> None:
                callbacks_run_box[0] += 1
                if mono_ns() < due_ns:
                    fired_early_box[0] += 1

            return _cb

        async def producer() -> None:
            i = 0
            n = len(events)
            while i < n:
                if spec.arrival == "burst":
                    burst_end = min(i + spec.burst_size, n)
                    while i < burst_end:
                        await queue.put((mono_ns(), i, events[i]))
                        i += 1
                    await asyncio.sleep(spec.inter_burst_ms / 1000.0)
                elif spec.arrival == "poisson":
                    await queue.put((mono_ns(), i, events[i]))
                    i += 1
                    await asyncio.sleep(rng.expovariate(1000.0))  # ~1k ev/s
                else:  # uniform
                    await queue.put((mono_ns(), i, events[i]))
                    i += 1
                    await asyncio.sleep(0.0005)
            await queue.put((0, -1, None))  # sentinel

        async def reactor() -> None:
            while True:
                inject_ns, idx, ev = await queue.get()
                if ev is None:
                    return
                self._fold(ev)
                if idx >= spec.warmup_events:
                    self.reaction_hist.record(mono_ns() - inject_ns)
                    reaction_samples_box[0] += 1

        # The deadline scheduler runs concurrently throughout the storm.
        sched = DeadlineScheduler(jitter_hist=self.jitter_hist)
        sched.start()
        tasks: List[asyncio.Task] = []
        try:
            last_due = mono_ns()
            for _ in range(spec.deadlines):
                delay_ms = rng.uniform(lo, hi)
                due = mono_ns() + int(delay_ms * 1e6)
                last_due = max(last_due, due)
                sched.schedule_at(due, _mk_cb(due))

            storm_t0 = mono_ns()
            tasks.append(asyncio.create_task(producer(), name="storm_producer"))
            tasks.append(asyncio.create_task(reactor(), name="storm_reactor"))
            # gather() propagates the first failure instead of waiting on a
            # completion signal that a crashed reactor would never send.
            await asyncio.gather(*tasks)
            storm_seconds = max(1e-9, (mono_ns() - storm_t0) / 1e9)
            sustained_eps = len(events) / storm_seconds

            # Let every still-pending deadline come due, then allow a short grace.
            remaining_s = max(0.0, (last_due - mono_ns()) / 1e9)
            await asyncio.sleep(remaining_s + self._DRAIN_MARGIN_S)
        finally:
            pending = [t for t in tasks if not t.done()]
            for task in pending:
                task.cancel()
            if pending:
                await asyncio.gather(*pending, return_exceptions=True)
            await sched.stop()

        self.early_fires = fired_early_box[0]
        deadlines_scheduled = max(0, spec.deadlines)
        deadlines_missed = deadlines_scheduled - callbacks_run_box[0]
        reaction_samples = reaction_samples_box[0]

        reaction_p99_ms = self.reaction_hist.percentile_ns(0.99) / 1e6
        jitter_p99_ms = self.jitter_hist.percentile_ns(0.99) / 1e6
        offered_eps = (
            round(spec.burst_size / max(1e-9, spec.inter_burst_ms / 1000.0), 1)
            if spec.arrival == "burst"
            else 0.0
        )
        gate = {
            "reaction_p99_ms": reaction_p99_ms,
            "reaction_budget_ms": GATE_REACTION_P99_MS,
            "reaction_samples": reaction_samples,
            "jitter_p99_ms": jitter_p99_ms,
            "jitter_budget_ms": GATE_JITTER_P99_MS,
            "early_fires": self.early_fires,
            "deadlines_scheduled": deadlines_scheduled,
            "deadlines_fired": sched.fired_count,
            "deadlines_missed": deadlines_missed,
            "offered_events_per_s": offered_eps,
            "sustained_events_per_s": round(sustained_eps, 1),
            "storm_seconds": round(storm_seconds, 3),
        }
        passed = (
            # Non-vacuity: a p99 over zero samples proves nothing.
            reaction_samples > 0
            and reaction_p99_ms < GATE_REACTION_P99_MS
            and jitter_p99_ms < GATE_JITTER_P99_MS
            and self.early_fires == 0
            # A missed deadline is unbounded jitter that the p99 cannot see.
            and deadlines_missed == 0
        )
        meta = {
            "utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "python": sys.version.split()[0],
            "platform": platform.platform(),
            "node": platform.node(),
            "kernel_seed_error": self.seed_error,
        }
        return HarnessReport(
            spec=spec,
            histograms={
                h.name: h.to_dict()
                for h in (self.reaction_hist, self.kernel_hist, self.jitter_hist)
            },
            gate=gate,
            passed=passed,
            sequence_hash=seq_hash,
            meta=meta,
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── CLI ──────────────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
|
|||
|
|
async def _amain(args: argparse.Namespace) -> int:
    """Run one storm from parsed CLI args, then print and optionally persist the report.

    Returns the process exit code: 1 only when ``--gate`` was requested and
    the report did not pass, otherwise 0.
    """
    storm = StormSpec(
        n_events=args.events,
        seed=args.seed,
        deadlines=args.deadlines,
        arrival=args.arrival,
    )
    report = await ReactorHarness().run_storm(storm)
    print(report.table())

    if args.out:
        target = Path(args.out)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(report.to_json())
        print(f"report written: {target}")

    gate_failed = args.gate and not report.passed
    return 1 if gate_failed else 0
|
|||
|
|
|
|||
|
|
|
|||
|
|
def main(argv: Optional[List[str]] = None) -> int:
    """Parse command-line arguments and run the harness.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Returns:
        The exit code produced by the async entry point.
    """
    parser = argparse.ArgumentParser(description="VIOLET V0 reactor latency harness")
    for flag, default in (("--events", 50_000), ("--seed", 42), ("--deadlines", 5_000)):
        parser.add_argument(flag, type=int, default=default)
    parser.add_argument("--arrival", choices=("uniform", "poisson", "burst"), default="burst")
    parser.add_argument("--out", type=str, default="")
    parser.add_argument("--gate", action="store_true")
    # Parse first: bad arguments exit via SystemExit before any work starts.
    parsed = parser.parse_args(argv)
    return asyncio.run(_amain(parsed))
|
|||
|
|
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())
|