97 lines
3.4 KiB
Python
97 lines
3.4 KiB
Python
|
|
"""V3b: Cadence Control Plane — defaults, independence, control-plane, Q boundaries."""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import sys
|
||
|
|
|
||
|
|
sys.path.insert(0, "/mnt/dolphinng5_predict")
|
||
|
|
|
||
|
|
import pytest
|
||
|
|
from hypothesis import given, settings, strategies as st
|
||
|
|
|
||
|
|
from prod.clean_arch.violet.cadence import (
|
||
|
|
INSTA_Q_NS, OBF_Q_NS, SCAN_Q_NS, Action, CadenceControlPlane, CadenceKnob,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def test_defaults_match_spec_table():
    """A freshly constructed control plane reports the expected default q_ns per action."""
    plane = CadenceControlPlane()

    # (action, expected default quantum) pairs, checked in this order.
    expected_defaults = (
        (Action.CATASTROPHIC_SL, INSTA_Q_NS),
        (Action.ADVSL, INSTA_Q_NS),
        (Action.TP, SCAN_Q_NS),
        (Action.CONSUME_OBF, OBF_Q_NS),
        (Action.ENTRY, SCAN_Q_NS),
    )
    for action, expected_q_ns in expected_defaults:
        assert plane.get(action).q_ns == expected_q_ns

    # The safety deviation: the catastrophic-SL default must be strictly
    # tighter than the TP default.
    sl_q_ns = plane.get(Action.CATASTROPHIC_SL).q_ns
    tp_q_ns = plane.get(Action.TP).q_ns
    assert sl_q_ns < tp_q_ns

    # Universality: every member of Action is retrievable and is flagged
    # to evaluate on every tick.
    for action in Action:
        knob = plane.get(action)
        assert knob.evaluate_every_tick is True
|
||
|
|
|
||
|
|
|
||
|
|
def test_set_is_independent_per_action():
    """Setting one action's q_ns changes that action only; the rest keep their defaults."""
    plane = CadenceControlPlane()
    baseline = {action: plane.get(action).q_ns for action in Action}

    new_q_ns = 1_000_000_000
    plane.set(Action.TP, q_ns=new_q_ns)

    # The targeted action carries the new value and is attributed to the
    # control plane rather than to the defaults.
    assert plane.get(Action.TP).q_ns == new_q_ns
    assert plane.get(Action.TP).source == "control_plane"

    # Every other action must be exactly where it started.
    untouched = [action for action in Action if action is not Action.TP]
    for action in untouched:
        knob = plane.get(action)
        assert knob.q_ns == baseline[action]
        assert knob.source == "default"
|
||
|
|
|
||
|
|
|
||
|
|
def test_control_plane_override_beats_default_absence_falls_back():
    """refresh_from applies the overrides it is given and leaves other actions at their default."""
    plane = CadenceControlPlane()
    overrides = {
        Action.TP: {"q_ns": 250_000_000},
        Action.ENTRY: {"enabled": False},
    }

    def lookup(a):
        # Yields None for any action that has no override entry.
        return overrides.get(a)

    applied = plane.refresh_from(lookup)

    # Two override entries were supplied, so two are reported.
    assert applied == 2
    # Overridden values are visible through get().
    assert plane.get(Action.TP).q_ns == 250_000_000
    assert plane.get(Action.ENTRY).enabled is False
    # An action with no override still reports the default scan quantum.
    assert plane.get(Action.SIZING).q_ns == SCAN_Q_NS
|
||
|
|
|
||
|
|
|
||
|
|
def test_negative_q_rejected_at_construction():
    """Constructing a CadenceKnob with a negative q_ns raises ValueError."""
    invalid_kwargs = {"action": Action.TP, "q_ns": -1}
    with pytest.raises(ValueError):
        CadenceKnob(**invalid_kwargs)
|
||
|
|
|
||
|
|
|
||
|
|
def test_due_insta_actuates_every_tick():
    """CATASTROPHIC_SL is due 1 ns after its last actuation; a never-actuated action is due."""
    plane = CadenceControlPlane()

    # One nanosecond after the previous actuation is already enough.
    due_next_tick = plane.due(
        Action.CATASTROPHIC_SL,
        now_ns=1000,
        last_actuation_ns=999,
    )
    assert due_next_tick is True

    # No previous actuation (None) means the action is due straight away.
    due_first_time = plane.due(Action.TP, now_ns=0, last_actuation_ns=None)
    assert due_first_time is True
|
||
|
|
|
||
|
|
|
||
|
|
def test_due_scan_respects_quantum():
    """TP is not due 1 ns before SCAN_Q_NS has elapsed and is due exactly at SCAN_Q_NS."""
    plane = CadenceControlPlane()
    last_actuation = 1_000_000_000
    boundary = last_actuation + SCAN_Q_NS

    # One nanosecond short of the quantum: not yet due.
    just_before = plane.due(
        Action.TP, now_ns=boundary - 1, last_actuation_ns=last_actuation
    )
    assert just_before is False

    # Exactly one quantum elapsed: due.
    at_boundary = plane.due(
        Action.TP, now_ns=boundary, last_actuation_ns=last_actuation
    )
    assert at_boundary is True
|
||
|
|
|
||
|
|
|
||
|
|
def test_disabled_action_never_due():
    """An action set to enabled=False is not due, even when it has never actuated."""
    plane = CadenceControlPlane()
    plane.set(Action.V7_EXIT, enabled=False)

    # A large clock value and no prior actuation would otherwise be due.
    is_due = plane.due(
        Action.V7_EXIT,
        now_ns=10**12,
        last_actuation_ns=None,
    )
    assert is_due is False
|
||
|
|
|
||
|
|
|
||
|
|
@given(
    q=st.integers(min_value=0, max_value=10_000_000_000),
    elapsed=st.integers(min_value=0, max_value=20_000_000_000),
)
@settings(max_examples=100, deadline=None)
def test_due_boundary_property(q, elapsed):
    """Property: with quantum q, ENTRY is due iff q is zero or at least q ns have elapsed."""
    plane = CadenceControlPlane()
    plane.set(Action.ENTRY, q_ns=q)

    last_actuation = 5_000_000_000
    now = last_actuation + elapsed
    observed = plane.due(Action.ENTRY, now_ns=now, last_actuation_ns=last_actuation)

    # A zero quantum is always due; otherwise the elapsed time must reach q.
    expected = q == 0 or elapsed >= q
    assert observed == expected
|
||
|
|
|
||
|
|
|
||
|
|
def test_snapshot_surfaces_all_actions():
    """snapshot() is keyed by every Action value and each entry has the core fields."""
    snapshot = CadenceControlPlane().snapshot()

    # Keys are exactly the Action values: none missing, none extra.
    expected_keys = {action.value for action in Action}
    assert set(snapshot.keys()) == expected_keys

    # Each entry must expose at least these fields.
    required_fields = {"q_ns", "enabled", "source"}
    for entry in snapshot.values():
        assert required_fields.issubset(set(entry))
|