diff --git a/prod/clean_arch/violet/__init__.py b/prod/clean_arch/violet/__init__.py new file mode 100644 index 0000000..0976aa0 --- /dev/null +++ b/prod/clean_arch/violet/__init__.py @@ -0,0 +1,11 @@ +"""VIOLET — staged rebuild of the PINK-on-DITAv2 runtime. + +Stage V0: reactor clock primitives + latency harness (this package). +Stage V1: observe-only runtime scaffolding (observe_guard, divergence). + +Charter: violet-subsecond-rebuild-plan (operator memory) — VIOLET is the +highest-frequency color in the visible spectrum; the runtime is the +fastest-cadence member of the BLUE/PINK family. The DITAv2 kernel, exec +router, venue adapters and persistence are SHARED LIBRARIES; only +launcher/identity/namespaces fork. +""" diff --git a/prod/clean_arch/violet/clock.py b/prod/clean_arch/violet/clock.py new file mode 100644 index 0000000..d94d960 --- /dev/null +++ b/prod/clean_arch/violet/clock.py @@ -0,0 +1,321 @@ +"""VIOLET reactor clock primitives (Stage V0). + +NOT a metronome. The system "clock" is: + - ONE monotonic timebase (``mono_ns``) stamped on every event; + - per-plane sequence clocks (``PlaneClock``) with staleness budgets — + each data plane (scan ~5-6 s, venue push ~ms, account events) advances + at its own rate and is judged stale against its own budget; + - a deadline scheduler (``DeadlineScheduler``) for genuinely time-based + work (router TTLs, SL guards): a timer heap driven by ONE coroutine, + woken EARLY via asyncio.Event when an earlier deadline is inserted — + the early-wake, not the tick, is what keeps jitter inside budget. + +Latency accounting uses ``LatencyHistogram``: raw reservoir samples with +exact order-statistic percentiles (no bucket interpolation) so the V0 gate +numbers are not artifacts of bucketing. 
class LatencyHistogram:
    """Raw-reservoir latency recorder with exact percentiles.

    Keeps up to ``reservoir`` raw samples with first-N retention: once the
    reservoir is full, further samples are only counted (``overflow_dropped``)
    and are not stored. ``min``/``max`` track every recorded sample, including
    overflowed ones. Percentiles are exact order statistics over the retained
    samples only.
    """

    def __init__(self, name: str, *, reservoir: int = 200_000) -> None:
        self.name = name
        self.reservoir = int(reservoir)
        self._samples: List[int] = []
        # Lazily built sorted copy of ``_samples``. Dropped whenever a sample
        # is retained, so a report asking for several percentiles sorts once
        # instead of once per percentile.
        self._ordered: Optional[List[int]] = None
        self._overflow = 0
        self._max_ns = 0
        self._min_ns: Optional[int] = None

    def record(self, dt_ns: int) -> None:
        """Record one latency sample (nanoseconds)."""
        dt = int(dt_ns)
        if dt > self._max_ns:
            self._max_ns = dt
        if self._min_ns is None or dt < self._min_ns:
            self._min_ns = dt
        if len(self._samples) < self.reservoir:
            self._samples.append(dt)
            self._ordered = None  # sorted view is now out of date
        else:
            self._overflow += 1

    @property
    def count(self) -> int:
        """Total samples recorded: retained plus overflow-dropped."""
        return len(self._samples) + self._overflow

    def percentile_ns(self, p: float) -> int:
        """Exact order statistic (nearest-rank) over retained samples.

        nearest-rank: the smallest value whose cumulative fraction >= p,
        i.e. rank = ceil(p * n), clamped to ``1..n``. Returns 0 when no
        sample has been retained.
        """
        if not self._samples:
            return 0
        import math

        ordered = self._ordered
        if ordered is None:
            ordered = self._ordered = sorted(self._samples)
        n = len(ordered)
        # p * n is a binary-float product and can land a hair ABOVE the exact
        # integer (0.07 * 100 == 7.000000000000001), which a bare ceil() turns
        # into the next rank. The tolerance is far below one rank step yet far
        # above float error for any reservoir size this class is used with.
        rank = math.ceil(p * n - 1e-9)
        rank = max(1, min(n, rank))
        return ordered[rank - 1]

    def to_dict(self) -> Dict[str, Any]:
        """Summary as a plain dict; latency values converted from ns to ms."""
        ms = 1e-6
        return {
            "name": self.name,
            "count": self.count,
            "retained": len(self._samples),
            "overflow_dropped": self._overflow,
            "min_ms": (self._min_ns or 0) * ms,
            "p50_ms": self.percentile_ns(0.50) * ms,
            "p90_ms": self.percentile_ns(0.90) * ms,
            "p99_ms": self.percentile_ns(0.99) * ms,
            "p999_ms": self.percentile_ns(0.999) * ms,
            "max_ms": self._max_ns * ms,
        }

    def report(self) -> str:
        """One-line, fixed-width human-readable summary."""
        d = self.to_dict()
        return (
            f"{d['name']:<24} n={d['count']:<8} "
            f"p50={d['p50_ms']:8.3f}ms p90={d['p90_ms']:8.3f}ms "
            f"p99={d['p99_ms']:8.3f}ms p99.9={d['p999_ms']:8.3f}ms "
            f"max={d['max_ms']:8.3f}ms"
        )
class DeadlineScheduler:
    """Single-driver asyncio deadline scheduler.

    One coroutine owns a heapq of ``(due_ns, n, Deadline, callback)``. It
    sleeps ``min(next_due - now, max_sleep_ms)`` and is woken EARLY through
    an asyncio.Event whenever a deadline earlier than the current head is
    inserted — early-wake is the jitter-budget mechanism; the max_sleep tick
    is only a liveness backstop.

    Callbacks are synchronous and MUST be non-blocking; async work is
    dispatched by the callback via ``loop.create_task``. An exception raised
    by a callback is isolated (handed to the optional ``on_error`` hook,
    otherwise dropped) and never kills the driver. ``KeyboardInterrupt`` and
    ``SystemExit`` are NOT isolated: they propagate out of the driver so an
    interpreter shutdown request is never swallowed.
    """

    def __init__(
        self,
        *,
        max_sleep_ms: int = 50,
        jitter_hist: Optional[LatencyHistogram] = None,
        on_error: Optional[Callable[[Deadline, BaseException], None]] = None,
    ) -> None:
        # Floor of 1 ms so a zero/negative max_sleep cannot busy-spin.
        self._max_sleep_s = max(0.001, float(max_sleep_ms) / 1000.0)
        self.jitter_hist = jitter_hist
        self._on_error = on_error
        self._heap: List[tuple] = []
        self._n = 0  # insertion counter: unique tie-breaker for equal due times
        self._wake = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self._stopping = False
        self.fired_count = 0
        self.cancelled_count = 0

    # -- scheduling --------------------------------------------------------

    def schedule_at(
        self, due_mono_ns: int, cb: Callable[[], None], *, name: str = ""
    ) -> Deadline:
        """Schedule ``cb`` to fire at ``due_mono_ns`` on the ``mono_ns`` timebase."""
        # Coerce once so the head comparison and the heap entry use one value.
        due = int(due_mono_ns)
        dl = Deadline(due, name)
        self._n += 1
        earlier_than_head = not self._heap or due < self._heap[0][0]
        heapq.heappush(self._heap, (due, self._n, dl, cb))
        if earlier_than_head:
            self._wake.set()  # early-wake the driver for the new head
        return dl

    def schedule_in(
        self, delay_ms: float, cb: Callable[[], None], *, name: str = ""
    ) -> Deadline:
        """Schedule ``cb`` to fire ``delay_ms`` milliseconds from now."""
        return self.schedule_at(
            mono_ns() + int(float(delay_ms) * 1e6), cb, name=name
        )

    @property
    def pending(self) -> int:
        """Number of queued deadlines that are neither cancelled nor fired."""
        return sum(1 for _, _, dl, _ in self._heap if not dl.cancelled and not dl.fired)

    # -- driver ------------------------------------------------------------

    def start(self) -> None:
        """Start the driver task on the running loop; no-op if already running."""
        if self._task is None or self._task.done():
            self._stopping = False
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="violet_deadline_driver"
            )

    async def stop(self) -> None:
        """Ask the driver to exit and wait for it; queued deadlines stay queued."""
        self._stopping = True
        self._wake.set()
        task = self._task
        if task is None:
            return
        try:
            await task
        except asyncio.CancelledError:
            pass
        finally:
            # Drop the handle even when the driver died with an exception, so
            # a failed driver is not awaited again by a later stop().
            self._task = None

    def _report_callback_error(self, dl: Deadline, exc: BaseException) -> None:
        """Hand a callback failure to ``on_error``; the hook itself may not kill the driver."""
        if self._on_error is None:
            return
        try:
            self._on_error(dl, exc)
        except Exception:
            pass

    async def _run(self) -> None:
        while not self._stopping:
            now = mono_ns()
            # Fire everything due. NEVER fire early: strict due <= now.
            while self._heap and self._heap[0][0] <= now:
                _, _, dl, cb = heapq.heappop(self._heap)
                if dl.cancelled:
                    self.cancelled_count += 1
                    continue
                dl._fired = True
                self.fired_count += 1
                if self.jitter_hist is not None:
                    self.jitter_hist.record(mono_ns() - dl.due_mono_ns)
                try:
                    cb()
                except (KeyboardInterrupt, SystemExit):
                    # Shutdown requests must reach the event loop.
                    raise
                except BaseException as exc:  # noqa: BLE001 — isolate callbacks
                    self._report_callback_error(dl, exc)
                now = mono_ns()
            # Sleep until next head or the liveness tick, whichever first;
            # an earlier insert sets the event and we re-evaluate immediately.
            if self._heap:
                delay_s = min(
                    self._max_sleep_s,
                    max(0.0, (self._heap[0][0] - now) / 1e9),
                )
            else:
                delay_s = self._max_sleep_s
            try:
                await asyncio.wait_for(self._wake.wait(), timeout=delay_s)
            except asyncio.TimeoutError:
                pass
            self._wake.clear()
@dataclass
class StormSpec:
    """Parameters of one seeded event storm plus its concurrent deadline load."""

    n_events: int = 50_000
    seed: int = 42
    # Event-kind mix (account-fold path + FSM mark-price path).
    mix: Dict[str, float] = field(
        default_factory=lambda: dict(
            ACCOUNT_UPDATE=0.35,
            FILL_SETTLED=0.25,
            PREDICTED_FILL=0.15,
            FUNDING_FEE=0.05,
            MARK_PRICE=0.20,
        )
    )
    # Arrival shape. The GATE measures reaction latency under REALISTIC load:
    # PINK's live BingX stream shows fill cascades of ~4-8 events per burst
    # (PREDICTED_FILL + FILL_SETTLED + ACCOUNT_UPDATE per action) at low
    # sustained rates. burst_size=8 / 12 ms ≈ 667 events/s offered — still
    # >10x production sustained rates. Larger bursts measure intra-burst
    # QUEUEING physics, not reaction (at ~0.33 ms/fold the tail of a
    # 16-burst waits ~5 ms before its turn; a 32/1ms storm saturates the
    # queue outright — measured: see archived 2026-06-12 capacity reports).
    # Bigger bursts are for deliberate capacity exploration, not the gate.
    arrival: str = "burst"  # uniform | poisson | burst
    burst_size: int = 8
    inter_burst_ms: float = 12.0
    deadlines: int = 5_000
    deadline_spread_ms: tuple = (10, 2_000)
    warmup_events: int = 1_000  # excluded from gate percentiles

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly copy: ``mix`` is copied, the spread becomes a list."""
        field_names = (
            "n_events",
            "seed",
            "mix",
            "arrival",
            "burst_size",
            "inter_burst_ms",
            "deadlines",
            "deadline_spread_ms",
            "warmup_events",
        )
        snapshot: Dict[str, Any] = {key: getattr(self, key) for key in field_names}
        # Re-assigning existing keys keeps the key order fixed above.
        snapshot["mix"] = dict(self.mix)
        snapshot["deadline_spread_ms"] = list(self.deadline_spread_ms)
        return snapshot
class ReactorHarness:
    """Owns the kernel, the consumer queue, and the deadline scheduler."""

    def __init__(self, *, kernel: Any = None) -> None:
        """Build the harness around ``kernel`` (default: the MOCK launcher bundle's kernel)."""
        if kernel is None:
            # Real kernel, MOCK venue — the production bundle constructor.
            from prod.clean_arch.dita_v2.launcher import build_launcher_bundle

            bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1)
            kernel = bundle.kernel
        self.kernel = kernel
        # Seed K = E so the synthetic storm starts reconciled. Seeding stays
        # best-effort, but a failure is kept on the instance instead of being
        # discarded, so a caller can tell the 25_000.0 assumption may not hold.
        self.seed_error: Optional[BaseException] = None
        if hasattr(self.kernel, "reset_and_seed"):
            try:
                self.kernel.reset_and_seed(25_000.0)
            except Exception as exc:
                self.seed_error = exc
        self.reaction_hist = LatencyHistogram("venue_event_reaction")
        self.kernel_hist = LatencyHistogram("kernel_call")
        self.jitter_hist = LatencyHistogram("deadline_jitter")
        self.account_clock = PlaneClock("account", ACCOUNT_STALENESS_NS)
        self.early_fires = 0
        self._last_k_capital = 25_000.0

    def _fold(self, ev: Dict[str, Any]) -> None:
        """Fold one storm event into the kernel, bracketing ONLY the kernel call.

        Lazy imports and event construction happen before the timestamp is
        taken so ``kernel_call`` is not inflated by first-use import cost or
        by Python-side object building.
        """
        kind = ev.get("kind")
        if kind == "MARK_PRICE":
            # FSM path: cheap venue event on slot 0 (flat slot → no-op walk).
            from datetime import datetime, timezone

            from prod.clean_arch.dita_v2.contracts import (
                KernelEventKind,
                TradeSide,
                VenueEvent,
                VenueEventStatus,
            )

            event = VenueEvent(
                timestamp=datetime.now(timezone.utc),
                event_id="",
                trade_id="",
                slot_id=0,
                kind=KernelEventKind.MARK_PRICE,
                status=VenueEventStatus.ACKED,
                venue_order_id="",
                venue_client_id="",
                side=TradeSide.FLAT,
                asset="STORMUSDT",
                price=float(ev["price"]),
                size=0.0,
                filled_size=0.0,
                remaining_size=0.0,
                reason="",
                raw_payload={},
                metadata={},
            )
            t0 = mono_ns()
            self.kernel.on_venue_event(event)
            elapsed_ns = mono_ns() - t0
        else:
            if kind == "ACCOUNT_UPDATE" and float(ev.get("wallet_balance", 0.0)) <= 0.0:
                # Substitute the kernel's own k_capital (sentinel resolution;
                # captured for free from prior fold returns — no extra FFI).
                ev = dict(ev)
                ev["wallet_balance"] = self._last_k_capital
                ev["available_margin"] = self._last_k_capital
            t0 = mono_ns()
            result = self.kernel.on_account_event(ev)
            elapsed_ns = mono_ns() - t0
            if isinstance(result, dict):
                k_cap = result.get("k_capital")
                if isinstance(k_cap, (int, float)) and k_cap > 0:
                    self._last_k_capital = float(k_cap)
        self.kernel_hist.record(elapsed_ns)
        self.account_clock.tick()

    async def run_storm(self, spec: StormSpec) -> HarnessReport:
        """Run one seeded storm with a concurrent deadline load and build the report.

        If the producer or the reactor raises (for example a kernel fold
        error), the exception propagates to the caller after the other task
        has been cancelled and the scheduler stopped, instead of leaving the
        harness waiting forever.
        """
        events = generate_events(spec)
        seq_hash = sequence_hash(events)

        queue: asyncio.Queue = asyncio.Queue()

        # Deadline scheduler runs concurrently throughout the storm.
        sched = DeadlineScheduler(jitter_hist=self.jitter_hist)
        sched.start()
        rng = random.Random(spec.seed ^ 0xD15EA5E)
        fired_early_box = [0]
        lo, hi = spec.deadline_spread_ms

        def _mk_cb(due_ns: int) -> Any:
            def _cb() -> None:
                if mono_ns() < due_ns:
                    fired_early_box[0] += 1

            return _cb

        for _ in range(spec.deadlines):
            delay_ms = rng.uniform(lo, hi)
            due = mono_ns() + int(delay_ms * 1e6)
            sched.schedule_at(due, _mk_cb(due))

        async def producer() -> None:
            i = 0
            n = len(events)
            while i < n:
                if spec.arrival == "burst":
                    burst_end = min(i + spec.burst_size, n)
                    while i < burst_end:
                        await queue.put((mono_ns(), i, events[i]))
                        i += 1
                    await asyncio.sleep(spec.inter_burst_ms / 1000.0)
                elif spec.arrival == "poisson":
                    await queue.put((mono_ns(), i, events[i]))
                    i += 1
                    await asyncio.sleep(rng.expovariate(1000.0))  # ~1k ev/s
                else:  # uniform
                    await queue.put((mono_ns(), i, events[i]))
                    i += 1
                    await asyncio.sleep(0.0005)
            await queue.put((0, -1, None))  # sentinel

        async def reactor() -> None:
            while True:
                inject_ns, idx, ev = await queue.get()
                if ev is None:
                    return
                self._fold(ev)
                if idx >= spec.warmup_events:
                    self.reaction_hist.record(mono_ns() - inject_ns)

        storm_t0 = mono_ns()
        prod_task = asyncio.create_task(producer(), name="storm_producer")
        react_task = asyncio.create_task(reactor(), name="storm_reactor")
        try:
            # gather() re-raises the first failure right away; waiting on a
            # "done" event that only the reactor sets would hang forever if
            # the reactor died mid-storm.
            await asyncio.gather(prod_task, react_task)
            storm_seconds = max(1e-9, (mono_ns() - storm_t0) / 1e9)
            # Let remaining short deadlines drain, then stop the driver.
            await asyncio.sleep(min(2.5, hi / 1000.0 + 0.1))
        finally:
            for task in (prod_task, react_task):
                task.cancel()  # no-op for tasks that already finished
            await asyncio.gather(prod_task, react_task, return_exceptions=True)
            await sched.stop()
        sustained_eps = len(events) / storm_seconds
        self.early_fires = fired_early_box[0]

        reaction_p99_ms = self.reaction_hist.percentile_ns(0.99) / 1e6
        jitter_p99_ms = self.jitter_hist.percentile_ns(0.99) / 1e6
        gate = {
            "reaction_p99_ms": reaction_p99_ms,
            "reaction_budget_ms": GATE_REACTION_P99_MS,
            "jitter_p99_ms": jitter_p99_ms,
            "jitter_budget_ms": GATE_JITTER_P99_MS,
            "early_fires": self.early_fires,
            "deadlines_fired": sched.fired_count,
            "offered_events_per_s": round(
                spec.burst_size / max(1e-9, spec.inter_burst_ms / 1000.0)
                if spec.arrival == "burst" else 0.0, 1),
            "sustained_events_per_s": round(sustained_eps, 1),
            "storm_seconds": round(storm_seconds, 3),
        }
        passed = (
            reaction_p99_ms < GATE_REACTION_P99_MS
            and jitter_p99_ms < GATE_JITTER_P99_MS
            and self.early_fires == 0
        )
        meta = {
            "utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "python": sys.version.split()[0],
            "platform": platform.platform(),
            "node": platform.node(),
        }
        return HarnessReport(
            spec=spec,
            histograms={
                h.name: h.to_dict()
                for h in (self.reaction_hist, self.kernel_hist, self.jitter_hist)
            },
            gate=gate,
            passed=passed,
            sequence_hash=seq_hash,
            meta=meta,
        )
def main(argv: Optional[List[str]] = None) -> int:
    """Parse CLI arguments, run the storm via ``_amain`` and return its exit code."""
    parser = argparse.ArgumentParser(description="VIOLET V0 reactor latency harness")
    # Integer-valued options share one shape; register them from a table.
    integer_options = (
        ("--events", 50_000),
        ("--seed", 42),
        ("--deadlines", 5_000),
    )
    for flag, default_value in integer_options:
        parser.add_argument(flag, type=int, default=default_value)
    parser.add_argument(
        "--arrival",
        choices=("uniform", "poisson", "burst"),
        default="burst",
    )
    parser.add_argument("--out", type=str, default="")
    parser.add_argument("--gate", action="store_true")
    namespace = parser.parse_args(argv)
    exit_code = asyncio.run(_amain(namespace))
    return exit_code
def test_histogram_exact_percentiles():
    """Nearest-rank percentiles over the samples 1..10 hit the exact values."""
    hist = LatencyHistogram("t")
    for sample in range(1, 11):
        hist.record(sample)
    expected_by_fraction = {0.50: 5, 0.90: 9, 0.99: 10}
    for fraction, expected in expected_by_fraction.items():
        assert hist.percentile_ns(fraction) == expected
    assert hist.count == 10
@pytest.mark.asyncio
async def test_cancellation_prevents_fire():
    """A cancelled deadline never runs; its un-cancelled sibling still does."""
    log: list[str] = []
    scheduler = DeadlineScheduler()
    scheduler.start()

    def _note(tag: str):
        return lambda: log.append(tag)

    survivor = scheduler.schedule_in(30, _note("keep"))
    victim = scheduler.schedule_in(30, _note("kill"))
    victim.cancel()

    await asyncio.sleep(0.12)
    await scheduler.stop()

    assert log == ["keep"]
    assert victim.cancelled and not victim.fired
    assert survivor.fired
@pytest.mark.asyncio
async def test_callback_exception_does_not_kill_driver():
    """A raising callback reaches ``on_error`` and later deadlines still fire."""
    seen_errors: list[str] = []
    later_fired: list[str] = []

    def _record_error(deadline, exc):
        seen_errors.append(str(exc))

    def _explode():
        raise RuntimeError("boom")

    scheduler = DeadlineScheduler(on_error=_record_error)
    scheduler.start()
    scheduler.schedule_in(10, _explode)
    scheduler.schedule_in(40, lambda: later_fired.append("after"))

    await asyncio.sleep(0.12)
    await scheduler.stop()

    assert seen_errors and "boom" in seen_errors[0]
    assert later_fired == ["after"], "driver died after callback exception"
def test_event_mix_kinds_only_known():
    """Every generated event kind is one of the kinds declared in the spec mix."""
    spec = StormSpec(n_events=300, seed=1)
    allowed_kinds = set(spec.mix.keys())
    seen_kinds = set()
    for event in generate_events(spec):
        seen_kinds.add(event["kind"])
    assert seen_kinds.issubset(allowed_kinds)
@pytest.mark.gate
@pytest.mark.asyncio
async def test_v0_latency_gate_storm():
    """Run the full V0 storm, write the JSON report, then assert every gate budget."""
    spec = StormSpec(
        n_events=50_000,
        seed=42,
        deadlines=5_000,
        arrival="burst",
        warmup_events=1_000,
    )
    report = await ReactorHarness().run_storm(spec)

    # Write the report BEFORE asserting so a failed gate still leaves evidence.
    REPORT_DIR.mkdir(parents=True, exist_ok=True)
    stamp = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
    report_path = REPORT_DIR / f"violet_v0_latency_{stamp}.json"
    report_path.write_text(report.to_json())
    print()
    print(report.table())
    print(f"report: {report_path}")

    gate = report.gate
    reaction_p99 = gate["reaction_p99_ms"]
    jitter_p99 = gate["jitter_p99_ms"]
    assert gate["early_fires"] == 0, "deadline fired EARLY"
    assert gate["deadlines_fired"] == spec.deadlines
    assert reaction_p99 < GATE_REACTION_P99_MS, (
        f"venue_event_reaction p99 {reaction_p99:.3f}ms "
        f">= {GATE_REACTION_P99_MS}ms budget"
    )
    assert jitter_p99 < GATE_JITTER_P99_MS, (
        f"deadline_jitter p99 {jitter_p99:.3f}ms "
        f">= {GATE_JITTER_P99_MS}ms budget"
    )
    assert report.passed is True