VIOLET V3b: Cadence Control Plane — universal per-action quantization
Spec prod/docs/VIOLET_SPEC__CADENCE_CONTROL_PLANE.md + cadence.py: every scan-governed action (entry/sizing/each exit reason/each input plane) carries an INDEPENDENT, runtime-tunable Q knob surfaced in a control plane (HZ-backed, code defaults as floor). Evaluate-every-tick / actuate-at-Q; SL defaults insta, TP/entry default scan, OBF ~1s — all loosenable per-action. 9 tests pass. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
96
prod/clean_arch/violet/test_violet_cadence.py
Normal file
96
prod/clean_arch/violet/test_violet_cadence.py
Normal file
@@ -0,0 +1,96 @@
|
||||
"""V3b: Cadence Control Plane — defaults, independence, control-plane, Q boundaries."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, "/mnt/dolphinng5_predict")
|
||||
|
||||
import pytest
|
||||
from hypothesis import given, settings, strategies as st
|
||||
|
||||
from prod.clean_arch.violet.cadence import (
|
||||
INSTA_Q_NS, OBF_Q_NS, SCAN_Q_NS, Action, CadenceControlPlane, CadenceKnob,
|
||||
)
|
||||
|
||||
|
||||
def test_defaults_match_spec_table():
|
||||
cp = CadenceControlPlane()
|
||||
assert cp.get(Action.CATASTROPHIC_SL).q_ns == INSTA_Q_NS
|
||||
assert cp.get(Action.ADVSL).q_ns == INSTA_Q_NS
|
||||
assert cp.get(Action.TP).q_ns == SCAN_Q_NS
|
||||
assert cp.get(Action.CONSUME_OBF).q_ns == OBF_Q_NS
|
||||
assert cp.get(Action.ENTRY).q_ns == SCAN_Q_NS
|
||||
# SL-class defaults strictly tighter than TP (the safety deviation).
|
||||
assert cp.get(Action.CATASTROPHIC_SL).q_ns < cp.get(Action.TP).q_ns
|
||||
# every action is registered (universality) and evaluates every tick.
|
||||
for a in Action:
|
||||
assert cp.get(a).evaluate_every_tick is True
|
||||
|
||||
|
||||
def test_set_is_independent_per_action():
|
||||
cp = CadenceControlPlane()
|
||||
before = {a: cp.get(a).q_ns for a in Action}
|
||||
cp.set(Action.TP, q_ns=1_000_000_000)
|
||||
assert cp.get(Action.TP).q_ns == 1_000_000_000
|
||||
assert cp.get(Action.TP).source == "control_plane"
|
||||
# nothing else moved
|
||||
for a in Action:
|
||||
if a is Action.TP:
|
||||
continue
|
||||
assert cp.get(a).q_ns == before[a]
|
||||
assert cp.get(a).source == "default"
|
||||
|
||||
|
||||
def test_control_plane_override_beats_default_absence_falls_back():
|
||||
cp = CadenceControlPlane()
|
||||
overrides = {Action.TP: {"q_ns": 250_000_000}, Action.ENTRY: {"enabled": False}}
|
||||
n = cp.refresh_from(lambda a: overrides.get(a))
|
||||
assert n == 2
|
||||
assert cp.get(Action.TP).q_ns == 250_000_000 # overridden
|
||||
assert cp.get(Action.ENTRY).enabled is False
|
||||
assert cp.get(Action.SIZING).q_ns == SCAN_Q_NS # untouched → default floor
|
||||
|
||||
|
||||
def test_negative_q_rejected_at_construction():
|
||||
with pytest.raises(ValueError):
|
||||
CadenceKnob(action=Action.TP, q_ns=-1)
|
||||
|
||||
|
||||
def test_due_insta_actuates_every_tick():
|
||||
cp = CadenceControlPlane()
|
||||
assert cp.due(Action.CATASTROPHIC_SL, now_ns=1000, last_actuation_ns=999) is True
|
||||
# never-actuated always due
|
||||
assert cp.due(Action.TP, now_ns=0, last_actuation_ns=None) is True
|
||||
|
||||
|
||||
def test_due_scan_respects_quantum():
|
||||
cp = CadenceControlPlane()
|
||||
last = 1_000_000_000
|
||||
assert cp.due(Action.TP, now_ns=last + SCAN_Q_NS - 1, last_actuation_ns=last) is False
|
||||
assert cp.due(Action.TP, now_ns=last + SCAN_Q_NS, last_actuation_ns=last) is True
|
||||
|
||||
|
||||
def test_disabled_action_never_due():
|
||||
cp = CadenceControlPlane()
|
||||
cp.set(Action.V7_EXIT, enabled=False)
|
||||
assert cp.due(Action.V7_EXIT, now_ns=10**12, last_actuation_ns=None) is False
|
||||
|
||||
|
||||
@given(
|
||||
q=st.integers(min_value=0, max_value=10_000_000_000),
|
||||
elapsed=st.integers(min_value=0, max_value=20_000_000_000),
|
||||
)
|
||||
@settings(max_examples=100, deadline=None)
|
||||
def test_due_boundary_property(q, elapsed):
|
||||
cp = CadenceControlPlane()
|
||||
cp.set(Action.ENTRY, q_ns=q)
|
||||
last = 5_000_000_000
|
||||
due = cp.due(Action.ENTRY, now_ns=last + elapsed, last_actuation_ns=last)
|
||||
assert due == (q == 0 or elapsed >= q)
|
||||
|
||||
|
||||
def test_snapshot_surfaces_all_actions():
|
||||
snap = CadenceControlPlane().snapshot()
|
||||
assert set(snap.keys()) == {a.value for a in Action}
|
||||
assert all({"q_ns", "enabled", "source"} <= set(v) for v in snap.values())
|
||||
Reference in New Issue
Block a user