"""VIOLET reactor clock primitives (Stage V0).

NOT a metronome. The system "clock" is:

- ONE monotonic timebase (``mono_ns``) stamped on every event;
- per-plane sequence clocks (``PlaneClock``) with staleness budgets — each
  data plane (scan ~5-6 s, venue push ~ms, account events) advances at its
  own rate and is judged stale against its own budget;
- a deadline scheduler (``DeadlineScheduler``) for genuinely time-based work
  (router TTLs, SL guards): a timer heap driven by ONE coroutine, woken
  EARLY via asyncio.Event when an earlier deadline is inserted — the
  early-wake, not the tick, is what keeps jitter inside budget.

Latency accounting uses ``LatencyHistogram``: raw reservoir samples with
exact order-statistic percentiles (no bucket interpolation) so the V0 gate
numbers are not artifacts of bucketing.
"""

from __future__ import annotations

import asyncio
import heapq
import json
import math
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


def mono_ns() -> int:
    """The single VIOLET timebase. Wall-clock is for report metadata only."""
    return time.monotonic_ns()


# ── Latency accounting ───────────────────────────────────────────────────────


class LatencyHistogram:
    """Raw-reservoir latency recorder with exact percentiles.

    Keeps up to ``reservoir`` raw samples (first-N retention: V0 storms are
    sized below the reservoir so retention is total; the cap only guards
    memory if a harness is misconfigured). Percentiles are exact order
    statistics over the retained samples; ``count``, min and max cover every
    recorded sample, including those dropped past the cap.
    """

    def __init__(self, name: str, *, reservoir: int = 200_000) -> None:
        """Create an empty recorder.

        Args:
            name: Label used in ``to_dict`` / ``report`` output.
            reservoir: Maximum number of raw samples retained.
        """
        self.name = name
        self.reservoir = int(reservoir)
        self._samples: List[int] = []
        self._overflow = 0
        self._max_ns = 0
        self._min_ns: Optional[int] = None

    def record(self, dt_ns: int) -> None:
        """Record one sample (nanoseconds).

        Min/max always track the sample; the raw value is retained only
        while the reservoir has room, otherwise it is counted as overflow.
        """
        dt = int(dt_ns)
        if self._min_ns is None:
            # First sample seeds both extremes. Seeding max from the sample
            # (rather than from 0) keeps max correct for negative deltas.
            self._min_ns = dt
            self._max_ns = dt
        else:
            if dt > self._max_ns:
                self._max_ns = dt
            if dt < self._min_ns:
                self._min_ns = dt
        if len(self._samples) < self.reservoir:
            self._samples.append(dt)
        else:
            self._overflow += 1

    @property
    def count(self) -> int:
        """Total samples recorded: retained plus overflow-dropped."""
        return len(self._samples) + self._overflow

    @staticmethod
    def _nearest_rank(ordered: List[int], p: float) -> int:
        """Return the nearest-rank order statistic of an already-sorted list.

        Returns 0 for an empty list. The rank is clamped to ``[1, n]`` so
        ``p <= 0`` yields the minimum and ``p >= 1`` the maximum.
        """
        n = len(ordered)
        if n == 0:
            return 0
        # p * n carries binary floating-point noise (0.07 * 100 evaluates to
        # 7.000000000000001), which a bare ceil() would push up one rank.
        # Rounding to 9 decimals removes that noise before the ceiling.
        rank = math.ceil(round(p * n, 9))
        return ordered[max(1, min(n, rank)) - 1]

    def percentile_ns(self, p: float) -> int:
        """Exact order statistic (nearest-rank) over retained samples.

        nearest-rank: the smallest value whose cumulative fraction >= p,
        i.e. rank = ceil(p * n). (round(p*n + 0.5) is WRONG: banker's
        rounding makes round(5.5) == 6, shifting p50 of 1..10 to 6.)

        Args:
            p: Fraction in ``[0, 1]``; values outside are clamped by rank.

        Returns:
            The selected sample in nanoseconds, or 0 when nothing is retained.
        """
        return self._nearest_rank(sorted(self._samples), p)

    def to_dict(self) -> Dict[str, Any]:
        """Summarise the recorder as a plain dict with millisecond values."""
        ms = 1e-6
        # Sort once and reuse it for every percentile.
        ordered = sorted(self._samples)
        pick = self._nearest_rank
        return {
            "name": self.name,
            "count": self.count,
            "retained": len(self._samples),
            "overflow_dropped": self._overflow,
            "min_ms": (self._min_ns or 0) * ms,
            "p50_ms": pick(ordered, 0.50) * ms,
            "p90_ms": pick(ordered, 0.90) * ms,
            "p99_ms": pick(ordered, 0.99) * ms,
            "p999_ms": pick(ordered, 0.999) * ms,
            "max_ms": self._max_ns * ms,
        }

    def report(self) -> str:
        """Return a one-line, fixed-width text summary of ``to_dict()``."""
        d = self.to_dict()
        return (
            f"{d['name']:<24} n={d['count']:<8} "
            f"p50={d['p50_ms']:8.3f}ms p90={d['p90_ms']:8.3f}ms "
            f"p99={d['p99_ms']:8.3f}ms p99.9={d['p999_ms']:8.3f}ms "
            f"max={d['max_ms']:8.3f}ms"
        )


# ── Plane clocks ─────────────────────────────────────────────────────────────

# Default staleness budgets (ns). Scan budget = two missed 6 s scans; venue
# budget = 2 s without any push while connected; account = 5 s. These are
# LABELS for health logic — never data cadences (operator rule: cadences are
# sacred; staleness thresholds are tunable).
SCAN_STALENESS_NS = 12_000_000_000  # 12 s
VENUE_STALENESS_NS = 2_000_000_000  # 2 s
ACCOUNT_STALENESS_NS = 5_000_000_000  # 5 s


@dataclass
class PlaneClock:
    """Sequence clock for one data plane.

    ``seq`` increments on every observed plane event; ``last_mono_ns`` stamps
    it on the shared timebase. ``is_stale`` is strict (> budget): an age
    exactly equal to the budget is NOT stale, so a budget chosen as an exact
    multiple of the plane's natural cadence does not flap on-cycle (the MHS
    FIX-9 lesson: a threshold equal to the writer's refresh period is stale
    by construction).
    """

    name: str
    staleness_budget_ns: int
    seq: int = 0
    last_mono_ns: int = -1  # -1 means "never ticked"

    def tick(self, now_ns: Optional[int] = None) -> int:
        """Record one plane event and return the new sequence number.

        Args:
            now_ns: Timestamp to stamp; defaults to ``mono_ns()``.
        """
        self.seq += 1
        self.last_mono_ns = mono_ns() if now_ns is None else int(now_ns)
        return self.seq

    def age_ns(self, now_ns: Optional[int] = None) -> int:
        """Nanoseconds since last tick; -1 if the plane has never ticked."""
        if self.last_mono_ns < 0:
            return -1
        now = mono_ns() if now_ns is None else int(now_ns)
        # Clamp so a caller-supplied "now" older than the last tick reads 0.
        return max(0, now - self.last_mono_ns)

    def is_stale(self, now_ns: Optional[int] = None) -> bool:
        """True when the plane has ticked at least once AND age > budget.

        A never-ticked plane is 'not yet alive', a different condition from
        'went stale' — callers gate startup on seq > 0 separately.
        """
        age = self.age_ns(now_ns)
        return age > self.staleness_budget_ns if age >= 0 else False

    def snapshot(self, now_ns: Optional[int] = None) -> Dict[str, Any]:
        """Return the clock state as a plain dict.

        ``age_ns`` and ``stale`` are derived from ONE timestamp so the two
        fields cannot disagree at the budget boundary.

        Args:
            now_ns: Timestamp to evaluate against; defaults to ``mono_ns()``.
        """
        now = mono_ns() if now_ns is None else int(now_ns)
        return {
            "name": self.name,
            "seq": self.seq,
            "last_mono_ns": self.last_mono_ns,
            "age_ns": self.age_ns(now),
            "stale": self.is_stale(now),
            "budget_ns": self.staleness_budget_ns,
        }


# ── Deadline scheduler ───────────────────────────────────────────────────────


class Deadline:
    """Handle for a scheduled deadline.

    ``cancel()`` is idempotent. It only sets a flag; the scheduler inspects
    the flag when the entry is popped from its heap.
    """

    __slots__ = ("due_mono_ns", "name", "_cancelled", "_fired")

    def __init__(self, due_mono_ns: int, name: str = "") -> None:
        self.due_mono_ns = int(due_mono_ns)
        self.name = name
        self._cancelled = False
        self._fired = False

    @property
    def cancelled(self) -> bool:
        """True once ``cancel()`` has been called."""
        return self._cancelled

    @property
    def fired(self) -> bool:
        """True once the scheduler has started running the callback."""
        return self._fired

    def cancel(self) -> None:
        """Mark the deadline cancelled; safe to call repeatedly."""
        self._cancelled = True


class DeadlineScheduler:
    """Single-driver asyncio deadline scheduler.

    One coroutine owns a heapq of ``(due_ns, n, Deadline, callback)``. It
    sleeps ``min(next_due - now, max_sleep_ms)`` and is woken EARLY through
    an asyncio.Event whenever a deadline earlier than the current head is
    inserted — early-wake is the jitter-budget mechanism; the max_sleep tick
    is only a liveness backstop.

    Callbacks are synchronous and MUST be non-blocking; async work is
    dispatched by the callback via ``loop.create_task``. A callback exception
    is isolated (logged via the optional ``on_error`` hook) and never kills
    the driver.
    """

    def __init__(
        self,
        *,
        max_sleep_ms: int = 50,
        jitter_hist: Optional[LatencyHistogram] = None,
        on_error: Optional[Callable[[Deadline, BaseException], None]] = None,
    ) -> None:
        """Create a stopped scheduler.

        Args:
            max_sleep_ms: Liveness tick; the driver never sleeps longer than
                this (floored at 1 ms).
            jitter_hist: Optional recorder fed ``fire_time - due`` per firing.
            on_error: Optional hook called with ``(deadline, exception)``
                when a callback raises.
        """
        self._max_sleep_s = max(0.001, float(max_sleep_ms) / 1000.0)
        self.jitter_hist = jitter_hist
        self._on_error = on_error
        self._heap: List[tuple] = []
        self._n = 0  # insertion counter: unique tie-breaker for equal dues
        self._wake = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        self._stopping = False
        self.fired_count = 0
        self.cancelled_count = 0

    # -- scheduling --------------------------------------------------------

    def schedule_at(
        self, due_mono_ns: int, cb: Callable[[], None], *, name: str = ""
    ) -> Deadline:
        """Schedule ``cb`` to run once ``mono_ns()`` reaches ``due_mono_ns``.

        Returns:
            A ``Deadline`` handle that can be cancelled.
        """
        # Normalise once so the head comparison uses the same value that is
        # stored in the heap.
        due = int(due_mono_ns)
        dl = Deadline(due, name)
        self._n += 1
        earlier_than_head = not self._heap or due < self._heap[0][0]
        heapq.heappush(self._heap, (due, self._n, dl, cb))
        if earlier_than_head:
            self._wake.set()  # early-wake the driver for the new head
        return dl

    def schedule_in(
        self, delay_ms: float, cb: Callable[[], None], *, name: str = ""
    ) -> Deadline:
        """Schedule ``cb`` to run ``delay_ms`` milliseconds from now."""
        return self.schedule_at(
            mono_ns() + int(float(delay_ms) * 1e6), cb, name=name
        )

    @property
    def pending(self) -> int:
        """Number of heap entries that are neither cancelled nor fired."""
        return sum(
            1 for _, _, dl, _ in self._heap if not dl.cancelled and not dl.fired
        )

    # -- driver ------------------------------------------------------------

    def start(self) -> None:
        """Start the driver task on the running loop if it is not running."""
        if self._task is None or self._task.done():
            self._stopping = False
            self._task = asyncio.get_running_loop().create_task(
                self._run(), name="violet_deadline_driver"
            )

    async def stop(self) -> None:
        """Ask the driver to exit and wait for it to finish."""
        self._stopping = True
        self._wake.set()  # break the driver out of its sleep immediately
        if self._task is not None:
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _run(self) -> None:
        """Driver loop: fire due deadlines, then sleep until the next one."""
        while not self._stopping:
            now = mono_ns()
            # Fire everything due. NEVER fire early: strict due <= now.
            while self._heap and self._heap[0][0] <= now:
                _, _, dl, cb = heapq.heappop(self._heap)
                if dl.cancelled:
                    self.cancelled_count += 1
                    continue
                dl._fired = True
                self.fired_count += 1
                if self.jitter_hist is not None:
                    self.jitter_hist.record(mono_ns() - dl.due_mono_ns)
                try:
                    cb()
                except BaseException as exc:  # noqa: BLE001 — isolate callbacks
                    if self._on_error is not None:
                        try:
                            self._on_error(dl, exc)
                        except Exception:
                            # A failing error hook must not kill the driver.
                            pass
                # Refresh so deadlines that came due while the callback ran
                # are fired in this pass.
                now = mono_ns()
            # Sleep until next head or the liveness tick, whichever first;
            # an earlier insert sets the event and we re-evaluate immediately.
            if self._heap:
                delay_s = min(
                    self._max_sleep_s,
                    max(0.0, (self._heap[0][0] - now) / 1e9),
                )
            else:
                delay_s = self._max_sleep_s
            try:
                await asyncio.wait_for(self._wake.wait(), timeout=delay_s)
            except asyncio.TimeoutError:
                pass
            self._wake.clear()


# ── Report bundling helper ───────────────────────────────────────────────────


def histograms_report(hists: List[LatencyHistogram]) -> str:
    """Join each histogram's one-line ``report()`` with newlines."""
    return "\n".join(h.report() for h in hists)


def histograms_json(hists: List[LatencyHistogram], **meta: Any) -> str:
    """Serialise histograms plus free-form metadata as indented JSON.

    Histograms are keyed by ``name``; values that are not JSON-serialisable
    are stringified (``default=str``).
    """
    return json.dumps(
        {"meta": meta, "histograms": {h.name: h.to_dict() for h in hists}},
        indent=2,
        default=str,
    )