"""VIOLET V3b: Cadence Control Plane — per-action quantization. Spec: prod/docs/VIOLET_SPEC__CADENCE_CONTROL_PLANE.md (BINDING). VIOLET is a reactor substrate (V0 clock/DeadlineScheduler); BLUE's scan-quantized behaviour is a guest hosted on it. Scan cadence is a QUANTIZATION SETTING, not the architecture. Every scan-governed action is *evaluated* every reactor tick (the would-be action shadow-logged) and *actuated* only when its per-action quantum Q is crossed. Each action's Q is: - UNIVERSAL — every action (entry, sizing, each exit reason, each input plane) has one; nothing is hardcoded at scan. SL merely DEFAULTS tighter. - INDEPENDENT — changing one action's Q never touches another's. - CONTROL-PLANE — readable/writable at runtime (HZ-backed via refresh_from); code defaults are the floor when the control plane is silent. - LOOSENABLE — Q steps down per-action on its own schedule, each step gated on the shadow deltas (evaluate-vs-actuate) this layer makes visible. Exit-priority (CATASTROPHIC/ADVSL > mechanical TP > discretionary v7) is INVARIANT to Q and is enforced by the decision engine, not here — a faster Q never lets a discretionary exit mask a mechanical one (the LINK -$1,248 bug). """ from __future__ import annotations import enum from dataclasses import dataclass from typing import Callable, Dict, Optional # ── quantum constants (nanoseconds) ─────────────────────────────────────────── INSTA_Q_NS = 0 # actuate every reactor tick (no quantization) OBF_Q_NS = 1_000_000_000 # ~1s — OBF effective cadence (fastest service) SCAN_Q_NS = 5_000_000_000 # 5s — NG7 scan cadence (the certification anchor) class Action(str, enum.Enum): """Every scan-governed VIOLET action that carries an independent cadence knob.""" CATASTROPHIC_SL = "catastrophic_sl" ADVSL = "advsl" TP = "tp" V7_EXIT = "v7_exit" ENTRY = "entry" SIZING = "sizing" CONSUME_SCAN = "consume_scan" CONSUME_OBF = "consume_obf" CONSUME_EXF = "consume_exf" CONSUME_ESOF = "consume_esof" CONSUME_MARAS = "consume_maras" CONSUME_ACB = "consume_acb" # Default Q per action (spec §3). SL-class defaults to insta (the safety deviation); # everything else defaults to its source cadence — all loosenable independently. _DEFAULT_Q_NS: Dict[Action, int] = { Action.CATASTROPHIC_SL: INSTA_Q_NS, Action.ADVSL: INSTA_Q_NS, Action.TP: SCAN_Q_NS, Action.V7_EXIT: SCAN_Q_NS, Action.ENTRY: SCAN_Q_NS, Action.SIZING: SCAN_Q_NS, Action.CONSUME_SCAN: SCAN_Q_NS, Action.CONSUME_OBF: OBF_Q_NS, Action.CONSUME_EXF: SCAN_Q_NS, Action.CONSUME_ESOF: SCAN_Q_NS, Action.CONSUME_MARAS: SCAN_Q_NS, Action.CONSUME_ACB: SCAN_Q_NS, } @dataclass class CadenceKnob: """One action's tunable cadence. Mutable — the control plane is its only mutator. ``q_ns`` is the actuation quantum: 0 ⇒ actuate every reactor tick (insta); else actuate once ``q_ns`` has elapsed since the last actuation. ``evaluate_every_tick`` keeps the would-be action computed (and shadow-logged) at full reactor speed regardless of Q. """ action: Action q_ns: int evaluate_every_tick: bool = True enabled: bool = True source: str = "default" # "default" | "control_plane" def __post_init__(self) -> None: if int(self.q_ns) < 0: raise ValueError(f"q_ns must be >= 0 (got {self.q_ns} for {self.action})") self.q_ns = int(self.q_ns) class CadenceControlPlane: """Runtime registry of per-action cadence knobs (spec §4). Seeded from code defaults; ``refresh_from`` layers a runtime provider (HZ map / env) on top, per action, leaving untouched actions at their current value. """ def __init__(self) -> None: self._knobs: Dict[Action, CadenceKnob] = { a: CadenceKnob(action=a, q_ns=q) for a, q in _DEFAULT_Q_NS.items() } def get(self, action: Action) -> CadenceKnob: return self._knobs[action] def set( self, action: Action, *, q_ns: Optional[int] = None, evaluate_every_tick: Optional[bool] = None, enabled: Optional[bool] = None, source: str = "control_plane", ) -> CadenceKnob: """Independently override one action. Other actions are never touched.""" k = self._knobs[action] new = CadenceKnob( action=action, q_ns=k.q_ns if q_ns is None else int(q_ns), evaluate_every_tick=k.evaluate_every_tick if evaluate_every_tick is None else bool(evaluate_every_tick), enabled=k.enabled if enabled is None else bool(enabled), source=source, ) self._knobs[action] = new return new def refresh_from(self, provider: Callable[[Action], Optional[dict]]) -> int: """Pull runtime overrides from a provider (HZ map / env reader). ``provider(action)`` returns a dict of overrides or None (no override → keep current value, i.e. the code-default floor). Returns the count of actions updated. The control plane stays authoritative for live tuning. """ n = 0 for action in Action: ov = provider(action) if not ov: continue self.set(action, **{k: v for k, v in ov.items() if k in ("q_ns", "evaluate_every_tick", "enabled")}) n += 1 return n def due(self, action: Action, now_ns: int, last_actuation_ns: Optional[int]) -> bool: """Q-boundary test: should this action ACTUATE now? Disabled ⇒ never. ``last_actuation_ns is None`` (never actuated) ⇒ yes. q_ns == 0 (insta) ⇒ every tick. Else ⇒ once the quantum has elapsed. """ k = self._knobs[action] if not k.enabled: return False if last_actuation_ns is None: return True if k.q_ns == 0: return True return (int(now_ns) - int(last_actuation_ns)) >= k.q_ns # Alias matching spec §4 caller idiom (evaluate always, then consult to actuate). should_actuate = due def snapshot(self) -> Dict[str, dict]: """Surfaced view for live inspection (spec §4).""" return { a.value: { "q_ns": k.q_ns, "evaluate_every_tick": k.evaluate_every_tick, "enabled": k.enabled, "source": k.source, } for a, k in self._knobs.items() }