Files
siloqy/prod/clean_arch/violet/test_violet_synthetic_intents.py

76 lines
2.7 KiB
Python
Raw Normal View History

"""V2c: script generation determinism + plan→intent mapping."""
from __future__ import annotations
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from prod.clean_arch.dita_v2.exec_router import ExecConfig, ExecutionRouter
from prod.clean_arch.violet.synthetic_intents import (
CycleSpec,
IntentScriptSpec,
Scenario,
build_enter_intent,
build_exit_intent,
generate_script,
router_config_for,
script_hash,
)
def test_script_is_seed_deterministic():
spec = IntentScriptSpec(n_cycles=18, seed=7)
a, b = generate_script(spec), generate_script(spec)
assert a == b
assert script_hash(a) == script_hash(b)
c = generate_script(IntentScriptSpec(n_cycles=18, seed=8))
assert script_hash(a) != script_hash(c)
def test_round_robin_covers_all_scenarios():
cycles = generate_script(IntentScriptSpec(n_cycles=18, seed=7))
seen = {c.scenario for c in cycles}
assert seen == set(Scenario)
assert all(c.price > 0 for c in cycles)
assert len({c.trade_id for c in cycles}) == 18
def test_router_config_mapping():
assert router_config_for(Scenario.IMMEDIATE_FILL).entry_miss == "skip"
r = router_config_for(Scenario.REST_EXPIRE_RETRY)
assert (r.entry_miss, r.entry_retries, r.retry_exhaust) == ("retry", 1, "skip")
m = router_config_for(Scenario.RETRY_EXHAUST_MARKET)
assert (m.entry_miss, m.retry_exhaust) == ("retry", "market")
assert all(router_config_for(s).style == "maker_both" for s in Scenario)
def test_enter_intent_carries_plan_fields():
router = ExecutionRouter(ExecConfig(style="maker_both"))
cycle = CycleSpec(idx=0, scenario=Scenario.IMMEDIATE_FILL,
trade_id="T1", price=100.0)
plan = router.plan_entry(trade_id="T1", asset="STORMUSDT",
position_side="SHORT", reference_price=100.0)
intent = build_enter_intent(cycle, plan, "STORMUSDT")
assert intent.order_type == "LIMIT" and intent.limit_price == plan.limit_price
assert intent.metadata["_time_in_force"] == "PostOnly"
assert intent.trade_id == "T1" and intent.action.value == "ENTER"
def test_exit_intent_market_when_urgent():
router = ExecutionRouter(ExecConfig(style="maker_both"))
plan = router.plan_exit(trade_id="T2", asset="STORMUSDT",
position_side="SHORT", reference_price=100.0,
reason="CATASTROPHIC")
assert not plan.is_maker and plan.order_type == "MARKET"
intent = build_exit_intent("T2", plan, "STORMUSDT", size=1.0, price=100.0)
assert intent.order_type == "MARKET"
assert intent.metadata["_time_in_force"] == "GTC"
assert intent.target_size == 1.0
if __name__ == "__main__":
raise SystemExit(pytest.main([__file__, "-v"]))