Files
siloqy/prod/clean_arch/violet/test_violet_cadence.py

97 lines
3.4 KiB
Python
Raw Normal View History

"""V3b: Cadence Control Plane — defaults, independence, control-plane, Q boundaries."""
from __future__ import annotations
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from hypothesis import given, settings, strategies as st
from prod.clean_arch.violet.cadence import (
INSTA_Q_NS, OBF_Q_NS, SCAN_Q_NS, Action, CadenceControlPlane, CadenceKnob,
)
def test_defaults_match_spec_table():
cp = CadenceControlPlane()
assert cp.get(Action.CATASTROPHIC_SL).q_ns == INSTA_Q_NS
assert cp.get(Action.ADVSL).q_ns == INSTA_Q_NS
assert cp.get(Action.TP).q_ns == SCAN_Q_NS
assert cp.get(Action.CONSUME_OBF).q_ns == OBF_Q_NS
assert cp.get(Action.ENTRY).q_ns == SCAN_Q_NS
# SL-class defaults strictly tighter than TP (the safety deviation).
assert cp.get(Action.CATASTROPHIC_SL).q_ns < cp.get(Action.TP).q_ns
# every action is registered (universality) and evaluates every tick.
for a in Action:
assert cp.get(a).evaluate_every_tick is True
def test_set_is_independent_per_action():
cp = CadenceControlPlane()
before = {a: cp.get(a).q_ns for a in Action}
cp.set(Action.TP, q_ns=1_000_000_000)
assert cp.get(Action.TP).q_ns == 1_000_000_000
assert cp.get(Action.TP).source == "control_plane"
# nothing else moved
for a in Action:
if a is Action.TP:
continue
assert cp.get(a).q_ns == before[a]
assert cp.get(a).source == "default"
def test_control_plane_override_beats_default_absence_falls_back():
cp = CadenceControlPlane()
overrides = {Action.TP: {"q_ns": 250_000_000}, Action.ENTRY: {"enabled": False}}
n = cp.refresh_from(lambda a: overrides.get(a))
assert n == 2
assert cp.get(Action.TP).q_ns == 250_000_000 # overridden
assert cp.get(Action.ENTRY).enabled is False
assert cp.get(Action.SIZING).q_ns == SCAN_Q_NS # untouched → default floor
def test_negative_q_rejected_at_construction():
with pytest.raises(ValueError):
CadenceKnob(action=Action.TP, q_ns=-1)
def test_due_insta_actuates_every_tick():
cp = CadenceControlPlane()
assert cp.due(Action.CATASTROPHIC_SL, now_ns=1000, last_actuation_ns=999) is True
# never-actuated always due
assert cp.due(Action.TP, now_ns=0, last_actuation_ns=None) is True
def test_due_scan_respects_quantum():
cp = CadenceControlPlane()
last = 1_000_000_000
assert cp.due(Action.TP, now_ns=last + SCAN_Q_NS - 1, last_actuation_ns=last) is False
assert cp.due(Action.TP, now_ns=last + SCAN_Q_NS, last_actuation_ns=last) is True
def test_disabled_action_never_due():
cp = CadenceControlPlane()
cp.set(Action.V7_EXIT, enabled=False)
assert cp.due(Action.V7_EXIT, now_ns=10**12, last_actuation_ns=None) is False
@given(
q=st.integers(min_value=0, max_value=10_000_000_000),
elapsed=st.integers(min_value=0, max_value=20_000_000_000),
)
@settings(max_examples=100, deadline=None)
def test_due_boundary_property(q, elapsed):
cp = CadenceControlPlane()
cp.set(Action.ENTRY, q_ns=q)
last = 5_000_000_000
due = cp.due(Action.ENTRY, now_ns=last + elapsed, last_actuation_ns=last)
assert due == (q == 0 or elapsed >= q)
def test_snapshot_surfaces_all_actions():
snap = CadenceControlPlane().snapshot()
assert set(snap.keys()) == {a.value for a in Action}
assert all({"q_ns", "enabled", "source"} <= set(v) for v in snap.values())