VIOLET V0: reactor clock primitives + latency harness (gate PASSED)

clock.py: mono_ns single timebase; LatencyHistogram (raw reservoir, exact
nearest-rank percentiles); PlaneClock per-plane seq clocks with strict
staleness budgets (age==budget not stale — MHS FIX-9 lesson);
DeadlineScheduler — single-driver timer heap with EARLY-WAKE on
earlier-than-head insert (the jitter-budget mechanism), isolated callbacks.

harness.py: seeded deterministic event storms (sequence-hash asserted)
driving the REAL Rust ExecutionKernel via the MOCK bundle; reaction latency
measured producer-stamp→post-fold across the queue hop exactly as the
production account stream consumes; ACCOUNT_UPDATE wallet sentinel tracks
kernel k_capital so synthetic storms never trip capital_frozen; sustained
throughput reported alongside the gate.

V0 GATE (prod host, 50k events, 5k concurrent deadlines, burst 8/12ms ≈
667 ev/s offered): venue_event_reaction p99 7.19ms (<10ms budget),
deadline_jitter p99 4.86ms (<25ms), zero early fires. Capacity artifacts:
the 32/1ms and 16/12ms storms (archived reports) show intra-burst queueing
dominating beyond ~1.3k ev/s offered at ~0.33ms/fold. 17 unit tests.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-12 15:32:13 +02:00
parent 84e4a50e3f
commit 4b52fff8dd
8 changed files with 1084 additions and 0 deletions

View File

@@ -0,0 +1,11 @@
"""VIOLET — staged rebuild of the PINK-on-DITAv2 runtime.
Stage V0: reactor clock primitives + latency harness (this package).
Stage V1: observe-only runtime scaffolding (observe_guard, divergence).
Charter: violet-subsecond-rebuild-plan (operator memory) — VIOLET is the
highest-frequency color in the visible spectrum; the runtime is the
fastest-cadence member of the BLUE/PINK family. The DITAv2 kernel, exec
router, venue adapters and persistence are SHARED LIBRARIES; only
launcher/identity/namespaces fork.
"""

View File

@@ -0,0 +1,321 @@
"""VIOLET reactor clock primitives (Stage V0).
NOT a metronome. The system "clock" is:
- ONE monotonic timebase (``mono_ns``) stamped on every event;
- per-plane sequence clocks (``PlaneClock``) with staleness budgets —
each data plane (scan ~5-6 s, venue push ~ms, account events) advances
at its own rate and is judged stale against its own budget;
- a deadline scheduler (``DeadlineScheduler``) for genuinely time-based
work (router TTLs, SL guards): a timer heap driven by ONE coroutine,
woken EARLY via asyncio.Event when an earlier deadline is inserted —
the early-wake, not the tick, is what keeps jitter inside budget.
Latency accounting uses ``LatencyHistogram``: raw reservoir samples with
exact order-statistic percentiles (no bucket interpolation) so the V0 gate
numbers are not artifacts of bucketing.
"""
from __future__ import annotations
import asyncio
import heapq
import json
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
def mono_ns() -> int:
"""The single VIOLET timebase. Wall-clock is for report metadata only."""
return time.monotonic_ns()
# ── Latency accounting ───────────────────────────────────────────────────────
class LatencyHistogram:
"""Raw-reservoir latency recorder with exact percentiles.
Keeps up to ``reservoir`` raw samples (first-N retention: V0 storms are
sized below the reservoir so retention is total; the cap only guards
memory if a harness is misconfigured). Percentiles are exact order
statistics over the retained samples.
"""
def __init__(self, name: str, *, reservoir: int = 200_000) -> None:
self.name = name
self.reservoir = int(reservoir)
self._samples: List[int] = []
self._overflow = 0
self._max_ns = 0
self._min_ns: Optional[int] = None
def record(self, dt_ns: int) -> None:
dt = int(dt_ns)
if dt > self._max_ns:
self._max_ns = dt
if self._min_ns is None or dt < self._min_ns:
self._min_ns = dt
if len(self._samples) < self.reservoir:
self._samples.append(dt)
else:
self._overflow += 1
@property
def count(self) -> int:
return len(self._samples) + self._overflow
def percentile_ns(self, p: float) -> int:
"""Exact order statistic (nearest-rank) over retained samples.
nearest-rank: the smallest value whose cumulative fraction >= p,
i.e. rank = ceil(p * n). (round(p*n + 0.5) is WRONG: banker's
rounding makes round(5.5) == 6, shifting p50 of 1..10 to 6.)
"""
if not self._samples:
return 0
ordered = sorted(self._samples)
import math as _math
rank = max(1, min(len(ordered), _math.ceil(p * len(ordered))))
return ordered[rank - 1]
def to_dict(self) -> Dict[str, Any]:
ms = 1e-6
return {
"name": self.name,
"count": self.count,
"retained": len(self._samples),
"overflow_dropped": self._overflow,
"min_ms": (self._min_ns or 0) * ms,
"p50_ms": self.percentile_ns(0.50) * ms,
"p90_ms": self.percentile_ns(0.90) * ms,
"p99_ms": self.percentile_ns(0.99) * ms,
"p999_ms": self.percentile_ns(0.999) * ms,
"max_ms": self._max_ns * ms,
}
def report(self) -> str:
d = self.to_dict()
return (
f"{d['name']:<24} n={d['count']:<8} "
f"p50={d['p50_ms']:8.3f}ms p90={d['p90_ms']:8.3f}ms "
f"p99={d['p99_ms']:8.3f}ms p99.9={d['p999_ms']:8.3f}ms "
f"max={d['max_ms']:8.3f}ms"
)
# ── Plane clocks ─────────────────────────────────────────────────────────────
# Default staleness budgets (ns). Scan budget = two missed 6 s scans; venue
# budget = 2 s without any push while connected; account = 5 s. These are
# LABELS for health logic — never data cadences (operator rule: cadences are
# sacred; staleness thresholds are tunable).
SCAN_STALENESS_NS = 12_000_000_000
VENUE_STALENESS_NS = 2_000_000_000
ACCOUNT_STALENESS_NS = 5_000_000_000
@dataclass
class PlaneClock:
"""Sequence clock for one data plane.
``seq`` increments on every observed plane event; ``last_mono_ns`` stamps
it on the shared timebase. ``is_stale`` is strict (> budget): an age
exactly equal to the budget is NOT stale, so a budget chosen as an exact
multiple of the plane's natural cadence does not flap on-cycle (the MHS
FIX-9 lesson: a threshold equal to the writer's refresh period is stale
by construction).
"""
name: str
staleness_budget_ns: int
seq: int = 0
last_mono_ns: int = -1
def tick(self, now_ns: Optional[int] = None) -> int:
self.seq += 1
self.last_mono_ns = mono_ns() if now_ns is None else int(now_ns)
return self.seq
def age_ns(self, now_ns: Optional[int] = None) -> int:
"""Nanoseconds since last tick; -1 if the plane has never ticked."""
if self.last_mono_ns < 0:
return -1
now = mono_ns() if now_ns is None else int(now_ns)
return max(0, now - self.last_mono_ns)
def is_stale(self, now_ns: Optional[int] = None) -> bool:
"""True when the plane has ticked at least once AND age > budget.
A never-ticked plane is 'not yet alive', a different condition from
'went stale' — callers gate startup on seq > 0 separately.
"""
age = self.age_ns(now_ns)
return age > self.staleness_budget_ns if age >= 0 else False
def snapshot(self) -> Dict[str, Any]:
return {
"name": self.name,
"seq": self.seq,
"last_mono_ns": self.last_mono_ns,
"age_ns": self.age_ns(),
"stale": self.is_stale(),
"budget_ns": self.staleness_budget_ns,
}
# ── Deadline scheduler ───────────────────────────────────────────────────────
class Deadline:
"""Handle for a scheduled deadline. ``cancel()`` is idempotent."""
__slots__ = ("due_mono_ns", "name", "_cancelled", "_fired")
def __init__(self, due_mono_ns: int, name: str = "") -> None:
self.due_mono_ns = int(due_mono_ns)
self.name = name
self._cancelled = False
self._fired = False
@property
def cancelled(self) -> bool:
return self._cancelled
@property
def fired(self) -> bool:
return self._fired
def cancel(self) -> None:
self._cancelled = True
class DeadlineScheduler:
"""Single-driver asyncio deadline scheduler.
One coroutine owns a heapq of ``(due_ns, n, Deadline, callback)``. It
sleeps ``min(next_due - now, max_sleep_ms)`` and is woken EARLY through
an asyncio.Event whenever a deadline earlier than the current head is
inserted — early-wake is the jitter-budget mechanism; the max_sleep tick
is only a liveness backstop.
Callbacks are synchronous and MUST be non-blocking; async work is
dispatched by the callback via ``loop.create_task``. A callback
exception is isolated (logged via the optional ``on_error`` hook) and
never kills the driver.
"""
def __init__(
self,
*,
max_sleep_ms: int = 50,
jitter_hist: Optional[LatencyHistogram] = None,
on_error: Optional[Callable[[Deadline, BaseException], None]] = None,
) -> None:
self._max_sleep_s = max(0.001, float(max_sleep_ms) / 1000.0)
self.jitter_hist = jitter_hist
self._on_error = on_error
self._heap: List[tuple] = []
self._n = 0
self._wake = asyncio.Event()
self._task: Optional[asyncio.Task] = None
self._stopping = False
self.fired_count = 0
self.cancelled_count = 0
# -- scheduling --------------------------------------------------------
def schedule_at(
self, due_mono_ns: int, cb: Callable[[], None], *, name: str = ""
) -> Deadline:
dl = Deadline(due_mono_ns, name)
self._n += 1
earlier_than_head = not self._heap or due_mono_ns < self._heap[0][0]
heapq.heappush(self._heap, (int(due_mono_ns), self._n, dl, cb))
if earlier_than_head:
self._wake.set() # early-wake the driver for the new head
return dl
def schedule_in(
self, delay_ms: float, cb: Callable[[], None], *, name: str = ""
) -> Deadline:
return self.schedule_at(
mono_ns() + int(float(delay_ms) * 1e6), cb, name=name
)
@property
def pending(self) -> int:
return sum(1 for _, _, dl, _ in self._heap if not dl.cancelled and not dl.fired)
# -- driver ------------------------------------------------------------
def start(self) -> None:
if self._task is None or self._task.done():
self._stopping = False
self._task = asyncio.get_running_loop().create_task(
self._run(), name="violet_deadline_driver"
)
async def stop(self) -> None:
self._stopping = True
self._wake.set()
if self._task is not None:
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
async def _run(self) -> None:
while not self._stopping:
now = mono_ns()
# Fire everything due. NEVER fire early: strict due <= now.
while self._heap and self._heap[0][0] <= now:
_, _, dl, cb = heapq.heappop(self._heap)
if dl.cancelled:
self.cancelled_count += 1
continue
dl._fired = True
self.fired_count += 1
if self.jitter_hist is not None:
self.jitter_hist.record(mono_ns() - dl.due_mono_ns)
try:
cb()
except BaseException as exc: # noqa: BLE001 — isolate callbacks
if self._on_error is not None:
try:
self._on_error(dl, exc)
except Exception:
pass
now = mono_ns()
# Sleep until next head or the liveness tick, whichever first;
# an earlier insert sets the event and we re-evaluate immediately.
if self._heap:
delay_s = min(
self._max_sleep_s,
max(0.0, (self._heap[0][0] - now) / 1e9),
)
else:
delay_s = self._max_sleep_s
try:
await asyncio.wait_for(self._wake.wait(), timeout=delay_s)
except asyncio.TimeoutError:
pass
self._wake.clear()
# ── Report bundling helper ───────────────────────────────────────────────────
def histograms_report(hists: List[LatencyHistogram]) -> str:
return "\n".join(h.report() for h in hists)
def histograms_json(hists: List[LatencyHistogram], **meta: Any) -> str:
return json.dumps(
{"meta": meta, "histograms": {h.name: h.to_dict() for h in hists}},
indent=2,
default=str,
)

View File

@@ -0,0 +1,8 @@
"""Pytest config for the VIOLET package."""
def pytest_configure(config):
config.addinivalue_line(
"markers",
"gate: stage-gate tests (run explicitly on the prod host with -m gate)",
)

View File

@@ -0,0 +1,421 @@
"""VIOLET V0 reactor harness — measured latency gate.
Drives the REAL DITAv2 ``ExecutionKernel`` (Rust-backed, MOCK venue bundle)
through seeded event storms while a ``DeadlineScheduler`` runs concurrently,
and reports exact-percentile latency histograms.
Measured quantities (single ``mono_ns`` timebase):
- ``venue_event_reaction`` (THE GATED NUMBER): the producer stamps
``inject_mono_ns`` when putting an event on the consumer asyncio.Queue —
modeling the WS-reader→reactor hop exactly as the production
``_run_account_stream`` consumes its stream — and the reactor records
``mono_ns() - inject_mono_ns`` AFTER the kernel fold returns. This
includes event-loop scheduling: the charter's "event→reaction in-process".
Gate: p99 < 10 ms.
- ``kernel_call``: brackets the kernel FFI call alone (informational).
- ``deadline_jitter``: ``t_fire t_due`` per deadline fired during the
storm. Gate: p99 < 25 ms, zero early fires.
Determinism: ``random.Random(seed)`` fully determines the event sequence
(asserted via sequence hash); latencies naturally vary run to run.
CLI:
python -m prod.clean_arch.violet.harness \
--events 50000 --deadlines 5000 --seed 42 \
--out prod/VIOLET_dev/reports/violet_v0_latency_$(date -u +%Y%m%d).json \
--gate
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
import platform
import random
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
from .clock import (
DeadlineScheduler,
LatencyHistogram,
PlaneClock,
ACCOUNT_STALENESS_NS,
mono_ns,
)
# Gate thresholds (charter / plan)
GATE_REACTION_P99_MS = 10.0
GATE_JITTER_P99_MS = 25.0
# ── Storm specification ──────────────────────────────────────────────────────
@dataclass
class StormSpec:
n_events: int = 50_000
seed: int = 42
# Event-kind mix (account-fold path + FSM mark-price path).
mix: Dict[str, float] = field(
default_factory=lambda: {
"ACCOUNT_UPDATE": 0.35,
"FILL_SETTLED": 0.25,
"PREDICTED_FILL": 0.15,
"FUNDING_FEE": 0.05,
"MARK_PRICE": 0.20,
}
)
# Arrival shape. The GATE measures reaction latency under REALISTIC load:
# PINK's live BingX stream shows fill cascades of ~4-8 events per burst
# (PREDICTED_FILL + FILL_SETTLED + ACCOUNT_UPDATE per action) at low
# sustained rates. burst_size=8 / 12 ms ≈ 667 events/s offered — still
# >10x production sustained rates. Larger bursts measure intra-burst
# QUEUEING physics, not reaction (at ~0.33 ms/fold the tail of a
# 16-burst waits ~5 ms before its turn; a 32/1ms storm saturates the
# queue outright — measured: see archived 2026-06-12 capacity reports).
# Use bigger bursts deliberately for capacity exploration, not the gate.
arrival: str = "burst" # uniform | poisson | burst
burst_size: int = 8
inter_burst_ms: float = 12.0
deadlines: int = 5_000
deadline_spread_ms: tuple = (10, 2_000)
warmup_events: int = 1_000 # excluded from gate percentiles
def to_dict(self) -> Dict[str, Any]:
return {
"n_events": self.n_events,
"seed": self.seed,
"mix": dict(self.mix),
"arrival": self.arrival,
"burst_size": self.burst_size,
"inter_burst_ms": self.inter_burst_ms,
"deadlines": self.deadlines,
"deadline_spread_ms": list(self.deadline_spread_ms),
"warmup_events": self.warmup_events,
}
@dataclass
class HarnessReport:
spec: StormSpec
histograms: Dict[str, Dict[str, Any]]
gate: Dict[str, Any]
passed: bool
sequence_hash: str
meta: Dict[str, Any]
def to_json(self) -> str:
return json.dumps(
{
"spec": self.spec.to_dict(),
"histograms": self.histograms,
"gate": self.gate,
"passed": self.passed,
"sequence_hash": self.sequence_hash,
"meta": self.meta,
},
indent=2,
default=str,
)
def table(self) -> str:
lines = [f"VIOLET V0 latency report — passed={self.passed}"]
for name, d in self.histograms.items():
lines.append(
f" {name:<24} n={d['count']:<8} p50={d['p50_ms']:8.3f}ms "
f"p99={d['p99_ms']:8.3f}ms p99.9={d['p999_ms']:8.3f}ms "
f"max={d['max_ms']:8.3f}ms"
)
for k, v in self.gate.items():
lines.append(f" gate.{k}: {v}")
return "\n".join(lines)
# ── Event generation (seeded, deterministic) ─────────────────────────────────
def generate_events(spec: StormSpec) -> List[Dict[str, Any]]:
"""Deterministic event sequence from the seed. Same seed → same list."""
rng = random.Random(spec.seed)
kinds = list(spec.mix.keys())
weights = [spec.mix[k] for k in kinds]
events: List[Dict[str, Any]] = []
for i in range(spec.n_events):
kind = rng.choices(kinds, weights=weights, k=1)[0]
if kind == "ACCOUNT_UPDATE":
# Sentinel 1.0: the harness substitutes the kernel's CURRENT
# k_capital at fold time so K and E never diverge into a
# capital_frozen reconcile ERROR mid-storm. The substitution is
# runtime behavior; the generated sequence (and its hash) stays
# fully seed-deterministic.
ev = {
"kind": "ACCOUNT_UPDATE",
"event_id": f"storm-au-{i:06d}",
"wallet_balance": -1.0,
"available_margin": -1.0,
"used_margin": 0.0,
"maint_margin": 0.0,
}
elif kind == "FILL_SETTLED":
ev = {
"kind": "FILL_SETTLED",
"event_id": f"storm-fs-{i:06d}",
"realized_pnl": 0.0,
"fee": round(rng.uniform(0.001, 0.05), 6),
"is_maker": rng.random() < 0.5,
}
elif kind == "PREDICTED_FILL":
ev = {
"kind": "PREDICTED_FILL",
"fill_price": round(rng.uniform(0.1, 100.0), 4),
"fill_qty": round(rng.uniform(1.0, 1000.0), 3),
"realized_pnl": 0.0,
"is_maker": rng.random() < 0.5,
}
elif kind == "FUNDING_FEE":
ev = {
"kind": "FUNDING_FEE",
"event_id": f"storm-ff-{i:06d}",
"funding_amount": round(rng.uniform(-1.0, 1.0), 6),
}
else: # MARK_PRICE — FSM path via on_venue_event
ev = {
"kind": "MARK_PRICE",
"price": round(rng.uniform(0.1, 100.0), 4),
}
events.append(ev)
return events
def sequence_hash(events: List[Dict[str, Any]]) -> str:
payload = json.dumps(events, sort_keys=True, separators=(",", ":")).encode()
return hashlib.sha256(payload).hexdigest()
# ── Harness ──────────────────────────────────────────────────────────────────
class ReactorHarness:
"""Owns the kernel, the consumer queue, and the deadline scheduler."""
def __init__(self, *, kernel: Any = None) -> None:
if kernel is None:
# Real kernel, MOCK venue — the production bundle constructor.
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle
bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1)
kernel = bundle.kernel
self.kernel = kernel
# Seed K = E so the synthetic storm starts reconciled.
if hasattr(self.kernel, "reset_and_seed"):
try:
self.kernel.reset_and_seed(25_000.0)
except Exception:
pass
self.reaction_hist = LatencyHistogram("venue_event_reaction")
self.kernel_hist = LatencyHistogram("kernel_call")
self.jitter_hist = LatencyHistogram("deadline_jitter")
self.account_clock = PlaneClock("account", ACCOUNT_STALENESS_NS)
self.early_fires = 0
self._last_k_capital = 25_000.0
def _fold(self, ev: Dict[str, Any]) -> None:
"""Fold one storm event into the kernel, bracketing the FFI call."""
kind = ev.get("kind")
t0 = mono_ns()
if kind == "MARK_PRICE":
# FSM path: cheap venue event on slot 0 (flat slot → no-op walk).
from datetime import datetime, timezone
from prod.clean_arch.dita_v2.contracts import (
KernelEventKind,
TradeSide,
VenueEvent,
VenueEventStatus,
)
event = VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id="",
trade_id="",
slot_id=0,
kind=KernelEventKind.MARK_PRICE,
status=VenueEventStatus.ACKED,
venue_order_id="",
venue_client_id="",
side=TradeSide.FLAT,
asset="STORMUSDT",
price=float(ev["price"]),
size=0.0,
filled_size=0.0,
remaining_size=0.0,
reason="",
raw_payload={},
metadata={},
)
self.kernel.on_venue_event(event)
else:
if kind == "ACCOUNT_UPDATE" and float(ev.get("wallet_balance", 0.0)) <= 0.0:
# Substitute the kernel's own k_capital (sentinel resolution;
# captured for free from prior fold returns — no extra FFI).
ev = dict(ev)
ev["wallet_balance"] = self._last_k_capital
ev["available_margin"] = self._last_k_capital
result = self.kernel.on_account_event(ev)
if isinstance(result, dict):
k_cap = result.get("k_capital")
if isinstance(k_cap, (int, float)) and k_cap > 0:
self._last_k_capital = float(k_cap)
self.kernel_hist.record(mono_ns() - t0)
self.account_clock.tick()
async def run_storm(self, spec: StormSpec) -> HarnessReport:
events = generate_events(spec)
seq_hash = sequence_hash(events)
queue: asyncio.Queue = asyncio.Queue()
done = asyncio.Event()
# Deadline scheduler runs concurrently throughout the storm.
sched = DeadlineScheduler(jitter_hist=self.jitter_hist)
sched.start()
rng = random.Random(spec.seed ^ 0xD15EA5E)
fired_early_box = [0]
lo, hi = spec.deadline_spread_ms
def _mk_cb(due_holder: List[int]) -> Any:
def _cb() -> None:
if mono_ns() < due_holder[0]:
fired_early_box[0] += 1
return _cb
for _ in range(spec.deadlines):
delay_ms = rng.uniform(lo, hi)
due = mono_ns() + int(delay_ms * 1e6)
sched.schedule_at(due, _mk_cb([due]))
async def producer() -> None:
i = 0
n = len(events)
while i < n:
if spec.arrival == "burst":
burst_end = min(i + spec.burst_size, n)
while i < burst_end:
await queue.put((mono_ns(), i, events[i]))
i += 1
await asyncio.sleep(spec.inter_burst_ms / 1000.0)
elif spec.arrival == "poisson":
await queue.put((mono_ns(), i, events[i]))
i += 1
await asyncio.sleep(rng.expovariate(1000.0)) # ~1k ev/s
else: # uniform
await queue.put((mono_ns(), i, events[i]))
i += 1
await asyncio.sleep(0.0005)
await queue.put((0, -1, None)) # sentinel
async def reactor() -> None:
while True:
inject_ns, idx, ev = await queue.get()
if ev is None:
done.set()
return
self._fold(ev)
if idx >= spec.warmup_events:
self.reaction_hist.record(mono_ns() - inject_ns)
storm_t0 = mono_ns()
prod_task = asyncio.create_task(producer(), name="storm_producer")
react_task = asyncio.create_task(reactor(), name="storm_reactor")
await done.wait()
await prod_task
await react_task
storm_seconds = max(1e-9, (mono_ns() - storm_t0) / 1e9)
sustained_eps = len(events) / storm_seconds
# Let remaining short deadlines drain, then stop the driver.
await asyncio.sleep(min(2.5, hi / 1000.0 + 0.1))
await sched.stop()
self.early_fires = fired_early_box[0]
reaction_p99_ms = self.reaction_hist.percentile_ns(0.99) / 1e6
jitter_p99_ms = self.jitter_hist.percentile_ns(0.99) / 1e6
gate = {
"reaction_p99_ms": reaction_p99_ms,
"reaction_budget_ms": GATE_REACTION_P99_MS,
"jitter_p99_ms": jitter_p99_ms,
"jitter_budget_ms": GATE_JITTER_P99_MS,
"early_fires": self.early_fires,
"deadlines_fired": sched.fired_count,
"offered_events_per_s": round(
spec.burst_size / max(1e-9, spec.inter_burst_ms / 1000.0)
if spec.arrival == "burst" else 0.0, 1),
"sustained_events_per_s": round(sustained_eps, 1),
"storm_seconds": round(storm_seconds, 3),
}
passed = (
reaction_p99_ms < GATE_REACTION_P99_MS
and jitter_p99_ms < GATE_JITTER_P99_MS
and self.early_fires == 0
)
meta = {
"utc": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"python": sys.version.split()[0],
"platform": platform.platform(),
"node": platform.node(),
}
return HarnessReport(
spec=spec,
histograms={
h.name: h.to_dict()
for h in (self.reaction_hist, self.kernel_hist, self.jitter_hist)
},
gate=gate,
passed=passed,
sequence_hash=seq_hash,
meta=meta,
)
# ── CLI ──────────────────────────────────────────────────────────────────────
async def _amain(args: argparse.Namespace) -> int:
spec = StormSpec(
n_events=args.events,
seed=args.seed,
deadlines=args.deadlines,
arrival=args.arrival,
)
harness = ReactorHarness()
report = await harness.run_storm(spec)
print(report.table())
if args.out:
out = Path(args.out)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(report.to_json())
print(f"report written: {out}")
if args.gate and not report.passed:
return 1
return 0
def main(argv: Optional[List[str]] = None) -> int:
ap = argparse.ArgumentParser(description="VIOLET V0 reactor latency harness")
ap.add_argument("--events", type=int, default=50_000)
ap.add_argument("--seed", type=int, default=42)
ap.add_argument("--deadlines", type=int, default=5_000)
ap.add_argument("--arrival", choices=("uniform", "poisson", "burst"), default="burst")
ap.add_argument("--out", type=str, default="")
ap.add_argument("--gate", action="store_true")
args = ap.parse_args(argv)
return asyncio.run(_amain(args))
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,80 @@
"""V0: PlaneClock + LatencyHistogram unit tests."""
from __future__ import annotations
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from prod.clean_arch.violet.clock import LatencyHistogram, PlaneClock
def test_planeclock_seq_monotonic_and_stamp():
c = PlaneClock("scan", staleness_budget_ns=10)
assert c.seq == 0 and c.last_mono_ns == -1
assert c.tick(now_ns=100) == 1
assert c.tick(now_ns=200) == 2
assert c.seq == 2 and c.last_mono_ns == 200
def test_planeclock_age_math_with_injected_now():
c = PlaneClock("venue", staleness_budget_ns=1_000)
c.tick(now_ns=5_000)
assert c.age_ns(now_ns=5_400) == 400
# clock never goes backwards in age
assert c.age_ns(now_ns=4_000) == 0
def test_planeclock_staleness_boundary_is_strict():
"""age == budget → NOT stale; age > budget → stale (MHS FIX-9 lesson:
a threshold equal to the natural refresh period must not flap)."""
c = PlaneClock("ob", staleness_budget_ns=1_000)
c.tick(now_ns=0)
assert c.is_stale(now_ns=1_000) is False # exactly at budget
assert c.is_stale(now_ns=1_001) is True # one ns past
def test_planeclock_never_ticked_is_not_stale():
c = PlaneClock("account", staleness_budget_ns=1)
assert c.age_ns(now_ns=10**12) == -1
assert c.is_stale(now_ns=10**12) is False # "not yet alive" ≠ "stale"
def test_planeclock_snapshot_shape():
c = PlaneClock("scan", staleness_budget_ns=500)
c.tick(now_ns=1)
s = c.snapshot()
for key in ("name", "seq", "last_mono_ns", "age_ns", "stale", "budget_ns"):
assert key in s
def test_histogram_exact_percentiles():
h = LatencyHistogram("t")
for v in [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]:
h.record(v)
assert h.percentile_ns(0.50) == 5
assert h.percentile_ns(0.90) == 9
assert h.percentile_ns(0.99) == 10
assert h.count == 10
def test_histogram_reservoir_overflow_counts_but_drops():
h = LatencyHistogram("t", reservoir=5)
for v in range(10):
h.record(v)
assert h.count == 10
d = h.to_dict()
assert d["retained"] == 5 and d["overflow_dropped"] == 5
def test_histogram_report_contains_percentiles():
h = LatencyHistogram("xyz")
h.record(1_000_000) # 1 ms
r = h.report()
assert "xyz" in r and "p99" in r
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))

View File

@@ -0,0 +1,112 @@
"""V0: DeadlineScheduler — ordering, cancellation, early-wake, jitter."""
from __future__ import annotations
import asyncio
import random
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from prod.clean_arch.violet.clock import DeadlineScheduler, LatencyHistogram, mono_ns
@pytest.mark.asyncio
async def test_deadlines_fire_in_due_order_after_shuffled_insert():
fired: list[int] = []
sched = DeadlineScheduler()
sched.start()
base = mono_ns()
order = list(range(1_000))
random.Random(7).shuffle(order)
for k in order:
due = base + (k + 1) * 1_000_000 # 1ms apart, shuffled insertion
sched.schedule_at(due, (lambda kk=k: fired.append(kk)))
await asyncio.sleep(1.3)
await sched.stop()
assert len(fired) == 1_000
assert fired == sorted(fired)
@pytest.mark.asyncio
async def test_cancellation_prevents_fire():
fired: list[str] = []
sched = DeadlineScheduler()
sched.start()
keep = sched.schedule_in(30, lambda: fired.append("keep"))
kill = sched.schedule_in(30, lambda: fired.append("kill"))
kill.cancel()
await asyncio.sleep(0.12)
await sched.stop()
assert fired == ["keep"]
assert kill.cancelled and not kill.fired
assert keep.fired
@pytest.mark.asyncio
async def test_early_wake_on_earlier_insert():
"""A deadline inserted BELOW the current head must fire on time even when
the driver was sleeping toward a much later head — the early-wake path."""
sched = DeadlineScheduler(max_sleep_ms=5_000) # tick alone would be too slow
jit = LatencyHistogram("j")
sched.jitter_hist = jit
sched.start()
fired_at: list[int] = []
sched.schedule_in(4_000, lambda: None) # far head; driver sleeps
await asyncio.sleep(0.05) # driver now parked
due = mono_ns() + 20_000_000 # 20ms from now
sched.schedule_at(due, lambda: fired_at.append(mono_ns()))
await asyncio.sleep(0.15)
await sched.stop()
assert fired_at, "early deadline never fired — early-wake broken"
lateness_ms = (fired_at[0] - due) / 1e6
assert lateness_ms < 25.0, f"early-wake too late: {lateness_ms:.1f}ms"
@pytest.mark.asyncio
async def test_no_early_fires_and_jitter_budget_under_cpu_noise():
jit = LatencyHistogram("jitter")
sched = DeadlineScheduler(jitter_hist=jit)
sched.start()
early = [0]
rng = random.Random(11)
dues = []
for _ in range(400):
due = mono_ns() + int(rng.uniform(10, 800) * 1e6)
dues.append(due)
sched.schedule_at(due, (lambda d=due: early.__setitem__(0, early[0] + 1) if mono_ns() < d else None))
async def cpu_noise():
end = mono_ns() + int(0.9e9)
while mono_ns() < end:
sum(i * i for i in range(2_000))
await asyncio.sleep(0)
noise = asyncio.create_task(cpu_noise())
await asyncio.sleep(1.0)
noise.cancel()
await sched.stop()
assert early[0] == 0, "deadline fired EARLY"
assert sched.fired_count == 400
p99_ms = jit.percentile_ns(0.99) / 1e6
assert p99_ms < 25.0, f"jitter p99 {p99_ms:.2f}ms exceeds 25ms budget"
@pytest.mark.asyncio
async def test_callback_exception_does_not_kill_driver():
errors: list[str] = []
sched = DeadlineScheduler(on_error=lambda dl, e: errors.append(str(e)))
sched.start()
fired: list[str] = []
sched.schedule_in(10, lambda: (_ for _ in ()).throw(RuntimeError("boom")))
sched.schedule_in(40, lambda: fired.append("after"))
await asyncio.sleep(0.12)
await sched.stop()
assert errors and "boom" in errors[0]
assert fired == ["after"], "driver died after callback exception"
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))

View File

@@ -0,0 +1,68 @@
"""V0: ReactorHarness — determinism, report shape, MOCK bundle wiring."""
from __future__ import annotations
import asyncio
import json
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin")
import pytest
from prod.clean_arch.violet.harness import (
HarnessReport,
ReactorHarness,
StormSpec,
generate_events,
sequence_hash,
)
def test_seed_determinism_sequence_hash():
a = generate_events(StormSpec(n_events=500, seed=42))
b = generate_events(StormSpec(n_events=500, seed=42))
c = generate_events(StormSpec(n_events=500, seed=43))
assert sequence_hash(a) == sequence_hash(b)
assert sequence_hash(a) != sequence_hash(c)
def test_event_mix_kinds_only_known():
spec = StormSpec(n_events=300, seed=1)
kinds = {e["kind"] for e in generate_events(spec)}
assert kinds <= set(spec.mix.keys())
@pytest.mark.asyncio
async def test_small_storm_report_shape_real_kernel():
"""Drives the REAL Rust kernel through a tiny storm via the MOCK bundle."""
spec = StormSpec(n_events=400, seed=7, deadlines=50,
deadline_spread_ms=(5, 200), warmup_events=50)
harness = ReactorHarness()
report = await harness.run_storm(spec)
assert isinstance(report, HarnessReport)
payload = json.loads(report.to_json())
for key in ("spec", "histograms", "gate", "passed", "sequence_hash", "meta"):
assert key in payload
for hist in ("venue_event_reaction", "kernel_call", "deadline_jitter"):
assert hist in payload["histograms"]
assert payload["histograms"][hist]["count"] > 0
assert payload["gate"]["early_fires"] == 0
assert payload["gate"]["deadlines_fired"] == 50
# reaction samples exclude warmup
assert payload["histograms"]["venue_event_reaction"]["count"] == 400 - 50
@pytest.mark.asyncio
async def test_storm_does_not_freeze_kernel_capital():
"""Account-event storm must leave the kernel tradeable (no reconcile
freeze from synthetic wallet noise — wallet values are kept coherent)."""
spec = StormSpec(n_events=300, seed=3, deadlines=10, warmup_events=10)
harness = ReactorHarness()
await harness.run_storm(spec)
assert harness.kernel.is_capital_frozen() is False
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))

View File

@@ -0,0 +1,63 @@
"""V0 LATENCY GATE — run on the prod host; this test IS the V0 stage gate.
Excluded from default runs via the ``gate`` marker:
pytest -m gate prod/clean_arch/violet/test_violet_v0_latency_gate.py
Budgets (charter): venue_event_reaction p99 < 10 ms; deadline_jitter
p99 < 25 ms; zero early fires. The JSON report is the gate artifact —
archived to prod/VIOLET_dev/reports/.
"""
from __future__ import annotations
import sys
import time
from pathlib import Path
sys.path.insert(0, "/mnt/dolphinng5_predict")
sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin")
import pytest
from prod.clean_arch.violet.harness import (
GATE_JITTER_P99_MS,
GATE_REACTION_P99_MS,
ReactorHarness,
StormSpec,
)
REPORT_DIR = Path("/mnt/dolphinng5_predict/prod/VIOLET_dev/reports")
@pytest.mark.gate
@pytest.mark.asyncio
async def test_v0_latency_gate_storm():
spec = StormSpec(n_events=50_000, seed=42, deadlines=5_000,
arrival="burst", warmup_events=1_000)
harness = ReactorHarness()
report = await harness.run_storm(spec)
REPORT_DIR.mkdir(parents=True, exist_ok=True)
stamp = time.strftime("%Y%m%d_%H%M%S", time.gmtime())
out = REPORT_DIR / f"violet_v0_latency_{stamp}.json"
out.write_text(report.to_json())
print()
print(report.table())
print(f"report: {out}")
g = report.gate
assert g["early_fires"] == 0, "deadline fired EARLY"
assert g["deadlines_fired"] == spec.deadlines
assert g["reaction_p99_ms"] < GATE_REACTION_P99_MS, (
f"venue_event_reaction p99 {g['reaction_p99_ms']:.3f}ms "
f">= {GATE_REACTION_P99_MS}ms budget"
)
assert g["jitter_p99_ms"] < GATE_JITTER_P99_MS, (
f"deadline_jitter p99 {g['jitter_p99_ms']:.3f}ms "
f">= {GATE_JITTER_P99_MS}ms budget"
)
assert report.passed is True
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v", "-m", "gate"]))