Files
siloqy/prod/clean_arch/violet/cadence.py

171 lines
6.6 KiB
Python
Raw Normal View History

"""VIOLET V3b: Cadence Control Plane — per-action quantization.
Spec: prod/docs/VIOLET_SPEC__CADENCE_CONTROL_PLANE.md (BINDING).
VIOLET is a reactor substrate (V0 clock/DeadlineScheduler); BLUE's scan-quantized
behaviour is a guest hosted on it. Scan cadence is a QUANTIZATION SETTING, not the
architecture. Every scan-governed action is *evaluated* every reactor tick (the
would-be action shadow-logged) and *actuated* only when its per-action quantum Q is
crossed. Each action's Q is:
- UNIVERSAL every action (entry, sizing, each exit reason, each input plane)
has one; nothing is hardcoded at scan. SL merely DEFAULTS tighter.
- INDEPENDENT changing one action's Q never touches another's.
- CONTROL-PLANE readable/writable at runtime (HZ-backed via refresh_from); code
defaults are the floor when the control plane is silent.
- LOOSENABLE Q steps down per-action on its own schedule, each step gated on
the shadow deltas (evaluate-vs-actuate) this layer makes visible.
Exit-priority (CATASTROPHIC/ADVSL > mechanical TP > discretionary v7) is INVARIANT to
Q and is enforced by the decision engine, not here a faster Q never lets a
discretionary exit mask a mechanical one (the LINK -$1,248 bug).
"""
from __future__ import annotations
import enum
from dataclasses import dataclass
from typing import Callable, Dict, Optional
# ── quantum constants (nanoseconds) ───────────────────────────────────────────
INSTA_Q_NS = 0 # actuate every reactor tick (no quantization)
OBF_Q_NS = 1_000_000_000 # ~1s — OBF effective cadence (fastest service)
SCAN_Q_NS = 5_000_000_000 # 5s — NG7 scan cadence (the certification anchor)
class Action(str, enum.Enum):
"""Every scan-governed VIOLET action that carries an independent cadence knob."""
CATASTROPHIC_SL = "catastrophic_sl"
ADVSL = "advsl"
TP = "tp"
V7_EXIT = "v7_exit"
ENTRY = "entry"
SIZING = "sizing"
CONSUME_SCAN = "consume_scan"
CONSUME_OBF = "consume_obf"
CONSUME_EXF = "consume_exf"
CONSUME_ESOF = "consume_esof"
CONSUME_MARAS = "consume_maras"
CONSUME_ACB = "consume_acb"
# Default Q per action (spec §3). SL-class defaults to insta (the safety deviation);
# everything else defaults to its source cadence — all loosenable independently.
_DEFAULT_Q_NS: Dict[Action, int] = {
Action.CATASTROPHIC_SL: INSTA_Q_NS,
Action.ADVSL: INSTA_Q_NS,
Action.TP: SCAN_Q_NS,
Action.V7_EXIT: SCAN_Q_NS,
Action.ENTRY: SCAN_Q_NS,
Action.SIZING: SCAN_Q_NS,
Action.CONSUME_SCAN: SCAN_Q_NS,
Action.CONSUME_OBF: OBF_Q_NS,
Action.CONSUME_EXF: SCAN_Q_NS,
Action.CONSUME_ESOF: SCAN_Q_NS,
Action.CONSUME_MARAS: SCAN_Q_NS,
Action.CONSUME_ACB: SCAN_Q_NS,
}
@dataclass
class CadenceKnob:
"""One action's tunable cadence. Mutable — the control plane is its only mutator.
``q_ns`` is the actuation quantum: 0 actuate every reactor tick (insta); else
actuate once ``q_ns`` has elapsed since the last actuation. ``evaluate_every_tick``
keeps the would-be action computed (and shadow-logged) at full reactor speed
regardless of Q.
"""
action: Action
q_ns: int
evaluate_every_tick: bool = True
enabled: bool = True
source: str = "default" # "default" | "control_plane"
def __post_init__(self) -> None:
if int(self.q_ns) < 0:
raise ValueError(f"q_ns must be >= 0 (got {self.q_ns} for {self.action})")
self.q_ns = int(self.q_ns)
class CadenceControlPlane:
"""Runtime registry of per-action cadence knobs (spec §4).
Seeded from code defaults; ``refresh_from`` layers a runtime provider (HZ map /
env) on top, per action, leaving untouched actions at their current value.
"""
def __init__(self) -> None:
self._knobs: Dict[Action, CadenceKnob] = {
a: CadenceKnob(action=a, q_ns=q) for a, q in _DEFAULT_Q_NS.items()
}
def get(self, action: Action) -> CadenceKnob:
return self._knobs[action]
def set(
self, action: Action, *,
q_ns: Optional[int] = None,
evaluate_every_tick: Optional[bool] = None,
enabled: Optional[bool] = None,
source: str = "control_plane",
) -> CadenceKnob:
"""Independently override one action. Other actions are never touched."""
k = self._knobs[action]
new = CadenceKnob(
action=action,
q_ns=k.q_ns if q_ns is None else int(q_ns),
evaluate_every_tick=k.evaluate_every_tick if evaluate_every_tick is None else bool(evaluate_every_tick),
enabled=k.enabled if enabled is None else bool(enabled),
source=source,
)
self._knobs[action] = new
return new
def refresh_from(self, provider: Callable[[Action], Optional[dict]]) -> int:
"""Pull runtime overrides from a provider (HZ map / env reader).
``provider(action)`` returns a dict of overrides or None (no override keep
current value, i.e. the code-default floor). Returns the count of actions
updated. The control plane stays authoritative for live tuning.
"""
n = 0
for action in Action:
ov = provider(action)
if not ov:
continue
self.set(action, **{k: v for k, v in ov.items()
if k in ("q_ns", "evaluate_every_tick", "enabled")})
n += 1
return n
def due(self, action: Action, now_ns: int, last_actuation_ns: Optional[int]) -> bool:
"""Q-boundary test: should this action ACTUATE now?
Disabled never. ``last_actuation_ns is None`` (never actuated) yes.
q_ns == 0 (insta) every tick. Else once the quantum has elapsed.
"""
k = self._knobs[action]
if not k.enabled:
return False
if last_actuation_ns is None:
return True
if k.q_ns == 0:
return True
return (int(now_ns) - int(last_actuation_ns)) >= k.q_ns
# Alias matching spec §4 caller idiom (evaluate always, then consult to actuate).
should_actuate = due
def snapshot(self) -> Dict[str, dict]:
"""Surfaced view for live inspection (spec §4)."""
return {
a.value: {
"q_ns": k.q_ns, "evaluate_every_tick": k.evaluate_every_tick,
"enabled": k.enabled, "source": k.source,
}
for a, k in self._knobs.items()
}