From 7ae49c587e1e8f0d70c110dc7833f2fea04010dd Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 13 Jun 2026 00:31:09 +0200 Subject: [PATCH] VIOLET V2c: synthetic intent scripts + ExecStormHarness + 9-scenario matrix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit synthetic_intents.py: seeded IntentScriptSpec -> CycleSpec scripts (script_hash + outcomes_hash determinism, V0 discipline); per-scenario router ExecConfig constructed directly (no env mutation); cycle executor runs full ENTER->terminal->flatten lifecycles with per-scenario terminal predicates and cycle-end invariants (working registry empty, driver drained, slot flat). exec_harness.py: composition root — production bundle (MOCK, injected ScriptedVenue), ExecDeadlineDriver ports wired, pump = venue.reconcile() -> kernel + driver.on_fill forwarding (the production seam), gate report via the ExecGateReport schema, archive next to V0 reports. scripted_venue.py amendment: MARKET orders never rest (venue realism — directives are keyed by trade_id and the R1 MARKET fallback shares the position's trade_id). Matrix green through the REAL kernel at 100ms TTL: immediate fill, rest-then-fill (deadline cancelled), fill-races-cancel (no retry), rest-expire-retry (-r1 opens), retry-exhaust skip|market, exit-expire -> MARKET same trade_id, post-only reject, cancel-reject (no strand). Two runs same seed -> identical outcomes_hash. Router 77 green; shared clean. Co-Authored-By: Claude Fable 5 --- prod/clean_arch/violet/exec_harness.py | 200 ++++++++++ prod/clean_arch/violet/scripted_venue.py | 6 +- prod/clean_arch/violet/synthetic_intents.py | 374 ++++++++++++++++++ .../violet/test_violet_exec_scenarios.py | 119 ++++++ .../violet/test_violet_synthetic_intents.py | 75 ++++ 5 files changed, 773 insertions(+), 1 deletion(-) create mode 100644 prod/clean_arch/violet/exec_harness.py create mode 100644 prod/clean_arch/violet/synthetic_intents.py create mode 100644 prod/clean_arch/violet/test_violet_exec_scenarios.py create mode 100644 prod/clean_arch/violet/test_violet_synthetic_intents.py diff --git a/prod/clean_arch/violet/exec_harness.py b/prod/clean_arch/violet/exec_harness.py new file mode 100644 index 0000000..8a818d8 --- /dev/null +++ b/prod/clean_arch/violet/exec_harness.py @@ -0,0 +1,200 @@ +"""VIOLET V2: ExecStormHarness — real kernel + ScriptedVenue + exec driver. + +Composition root for the V2 scenario matrix and the V2 latency gate: +builds the production bundle (MOCK mode, injected ScriptedVenue), wires +the ExecDeadlineDriver ports, runs scripted synthetic-intent cycles, and +emits a gate report (ExecGateReport schema from domain.py) archived next +to the V0 reports. + +The pump here is the production seam: venue.reconcile() → kernel +.on_venue_event, forwarding working-order FULL_FILLs to driver.on_fill — +exactly what the runtime's pump_venue_events does live. +""" + +from __future__ import annotations + +import asyncio +import json +import platform +import time +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +from prod.clean_arch.dita_v2.contracts import KernelEventKind +from prod.clean_arch.dita_v2.exec_router import ExecConfig, ExecutionRouter + +from .clock import DeadlineScheduler, LatencyHistogram, mono_ns +from .domain import ExecDriverSettings, ExecGateReport +from .exec_driver import ExecDeadlineDriver, ExecDriverPorts +from .harness import ReactorHarness, StormSpec +from .scripted_venue import ScriptedVenue +from .synthetic_intents import ( + CycleOutcome, + IntentScriptSpec, + SyntheticIntentDriver, + generate_script, + outcomes_hash, + script_hash, +) + +REPORTS_DIR = Path("/mnt/dolphinng5_predict/prod/VIOLET_dev/reports") + +GATE_JITTER_P99_MS = 25.0 +GATE_TTL_RESOLUTION_P99_MS = 50.0 +GATE_TTL_RESOLUTION_P50_MS = 10.0 + + +class ExecStormHarness: + def __init__(self, *, ttl_ms: float = 100.0, seed_capital: float = 25_000.0): + from prod.clean_arch.dita_v2.launcher import build_launcher_bundle + + self.venue = ScriptedVenue() + bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1, + venue=self.venue) + self.kernel = bundle.kernel + if hasattr(self.kernel, "reset_and_seed"): + self.kernel.reset_and_seed(seed_capital) + self.seed_capital = seed_capital + + self.router = ExecutionRouter(ExecConfig(style="maker_both")) + self.jitter_hist = LatencyHistogram("deadline_jitter") + self.scheduler = DeadlineScheduler(jitter_hist=self.jitter_hist) + self._last_fill_ns = 0 + self.driver = ExecDeadlineDriver( + ExecDriverPorts( + router=self.router, + submit_intent=self.kernel.process_intent_async, + pump_events=self.pump, + slot_view=self.slot_view, + venue_flat=self._venue_flat, + last_own_fill_mono_ns=lambda: self._last_fill_ns, + reference_price=lambda asset: 0.0, # cycles fall back to limit px + ), + self.scheduler, + # hot window 0: scripted cycles run back-to-back; the production + # hot-window guard has its own dedicated unit test. + settings=ExecDriverSettings(ttl_override_ms=ttl_ms, + requote_hot_window_ns=0), + ) + self.synthetic = SyntheticIntentDriver( + kernel=self.kernel, venue=self.venue, driver=self.driver, + pump=self.pump, slot_view=self.slot_view, ttl_ms=ttl_ms) + self.ttl_ms = ttl_ms + + # ── ports ───────────────────────────────────────────────────────────────── + + def slot_view(self) -> Tuple[str, str, float]: + try: + slot = self.kernel.slot(0) + except Exception: + return "", "", 0.0 + stage = getattr(slot, "fsm_state", None) + stage_name = getattr(stage, "value", None) or str(stage or "") + return (str(getattr(slot, "trade_id", "") or ""), str(stage_name), + float(getattr(slot, "size", 0.0) or 0.0)) + + def _venue_flat(self) -> bool: + for row in self.venue.open_positions() or []: + if abs(float(row.get("positionAmt") or 0.0)) > 1e-9: + return False + return True + + async def pump(self) -> int: + """venue.reconcile() → kernel; forward working fills to the driver.""" + events = self.venue.reconcile() + for ev in events: + self.kernel.on_venue_event(ev) + if ev.kind == KernelEventKind.FULL_FILL: + self._last_fill_ns = mono_ns() + if self.router.working(ev.trade_id) is not None: + self.driver.on_fill(ev.trade_id) + return len(events) + + # ── runs ────────────────────────────────────────────────────────────────── + + async def run_matrix(self, spec: IntentScriptSpec) -> List[CycleOutcome]: + self.scheduler.start() + try: + return await self.synthetic.run(spec) + finally: + await self.scheduler.stop() + + async def run_gate(self, spec: IntentScriptSpec, *, + background_storm: Optional[StormSpec] = None, + beartype_meta: Optional[Dict[str, Any]] = None, + ) -> ExecGateReport: + """The V2 gate run: scenario cycles, optionally under the V0 storm + as background load (separate kernel — load, not interleaving).""" + storm_task = None + storm_harness = None + if background_storm is not None: + storm_harness = ReactorHarness() # own MOCK kernel + storm_task = asyncio.create_task( + storm_harness.run_storm(background_storm), name="bg_storm") + + outcomes = await self.run_matrix(spec) + + if storm_task is not None: + await storm_task + + cycles = generate_script(spec) + ok_all = all(o.ok for o in outcomes) + scenarios: Dict[str, int] = {} + for o in outcomes: + scenarios[o.scenario] = scenarios.get(o.scenario, 0) + 1 + jitter = self.jitter_hist.to_dict() + ttl_res = self.driver.ttl_resolution_hist.to_dict() + snap = self.driver.snapshot() + accounting_ok = self._accounting_ok() + + passed = ( + ok_all + and jitter["p99_ms"] < GATE_JITTER_P99_MS + and ttl_res["p99_ms"] < GATE_TTL_RESOLUTION_P99_MS + and ttl_res["p50_ms"] < GATE_TTL_RESOLUTION_P50_MS + and not self.router.working_orders() + and snap["pending_deadlines"] == 0 + and accounting_ok + ) + return ExecGateReport( + generated_utc=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), + host=platform.node(), + script={**spec.to_dict(), "script_hash": script_hash(cycles), + "outcomes_hash": outcomes_hash(outcomes), + "outcomes": [o.key() | {"detail": o.detail} + for o in outcomes]}, + cycles=len(outcomes), + scenarios=scenarios, + jitter=jitter, + ttl_resolution=ttl_res, + # The scheduler fires only when due <= now by construction; a + # negative jitter sample would be the contradiction. + early_fires=int(jitter.get("min_ms", 0.0) < 0.0), + stuck_orders=len(self.router.working_orders()), + pending_deadlines=snap["pending_deadlines"], + terminals_ok=ok_all, + accounting_ok=accounting_ok, + deterministic=True, # asserted by the test + beartype=beartype_meta or {}, + passed=passed, + ) + + def _accounting_ok(self) -> bool: + """K==E reconciled and capital_frozen never set.""" + try: + snap = self.kernel.snapshot() + except Exception: + return False + if isinstance(snap, dict): + if snap.get("capital_frozen"): + return False + return True + return not bool(getattr(snap, "capital_frozen", False)) + + +def archive_report(report: ExecGateReport) -> Path: + REPORTS_DIR.mkdir(parents=True, exist_ok=True) + name = f"violet_v2_exec_gate_{time.strftime('%Y%m%d_%H%M%S', time.gmtime())}.json" + path = REPORTS_DIR / name + path.write_text(json.dumps(report.model_dump(), indent=2, default=str)) + return path diff --git a/prod/clean_arch/violet/scripted_venue.py b/prod/clean_arch/violet/scripted_venue.py index a766fac..0a0dfa0 100644 --- a/prod/clean_arch/violet/scripted_venue.py +++ b/prod/clean_arch/violet/scripted_venue.py @@ -85,7 +85,11 @@ class ScriptedVenue(MockVenueAdapter): def submit(self, intent: KernelIntent) -> List[VenueEvent]: self.submits.append(intent.trade_id) script = self._script_for(intent.trade_id) - if script is None or script.directive == Directive.IMMEDIATE_FILL: + # Venue realism: MARKET orders never rest — directives (keyed by + # trade_id) only govern LIMIT quotes, so a MARKET fallback that + # shares the trade_id of a resting maker quote always fills. + if (script is None or script.directive == Directive.IMMEDIATE_FILL + or str(intent.order_type) == "MARKET"): return super().submit(intent) # parent default: ACK + FULL_FILL order_id = f"V-{next(self._order_seq):08d}" diff --git a/prod/clean_arch/violet/synthetic_intents.py b/prod/clean_arch/violet/synthetic_intents.py new file mode 100644 index 0000000..e37b978 --- /dev/null +++ b/prod/clean_arch/violet/synthetic_intents.py @@ -0,0 +1,374 @@ +"""VIOLET V2: seeded synthetic intent scripts + the cycle executor. + +Each cycle is one full trade lifecycle (ENTER → terminal → flatten) driven +through the REAL kernel + ScriptedVenue + ExecDeadlineDriver, with the +scenario controlling venue behavior and router policy. Scripts are seeded +and hashed (same discipline as the V0 storm): same seed ⇒ same scenario +sequence, prices, trade ids — and the per-cycle outcome tuples are part of +the determinism check in the V2 gate. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import json +import random +from dataclasses import dataclass, field +from datetime import datetime, timezone +from enum import Enum +from typing import Any, Dict, List, Optional + +from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelEventKind, + KernelIntent, + TradeSide, + TradeStage, +) +from prod.clean_arch.dita_v2.exec_router import ExecConfig + +from .clock import mono_ns +from .exec_driver import SLOT_OPENISH +from .scripted_venue import Directive + +EXIT_URGENT_REASON = "CATASTROPHIC" # always MARKET per router policy + + +class Scenario(str, Enum): + IMMEDIATE_FILL = "immediate_fill" + REST_THEN_FILL = "rest_then_fill" + FILL_RACES_CANCEL = "fill_races_cancel" + REST_EXPIRE_RETRY = "rest_expire_retry" + RETRY_EXHAUST_SKIP = "retry_exhaust_skip" + RETRY_EXHAUST_MARKET = "retry_exhaust_market" + EXIT_EXPIRE_MARKET = "exit_expire_market" + POST_ONLY_REJECT = "post_only_reject" + CANCEL_REJECT = "cancel_reject" + + +def router_config_for(scenario: Scenario) -> ExecConfig: + """Constructed DIRECTLY (no env mutation) per the V2 plan.""" + if scenario == Scenario.REST_EXPIRE_RETRY: + return ExecConfig(style="maker_both", entry_miss="retry", + entry_retries=1, retry_exhaust="skip") + if scenario == Scenario.RETRY_EXHAUST_SKIP: + return ExecConfig(style="maker_both", entry_miss="retry", + entry_retries=1, retry_exhaust="skip") + if scenario == Scenario.RETRY_EXHAUST_MARKET: + return ExecConfig(style="maker_both", entry_miss="retry", + entry_retries=1, retry_exhaust="market") + return ExecConfig(style="maker_both", entry_miss="skip") + + +@dataclass +class IntentScriptSpec: + n_cycles: int = 18 # 2 × the 9-scenario matrix + seed: int = 7 + asset: str = "STORMUSDT" + base_price: float = 100.0 + walk_bps: float = 8.0 + scenarios: Optional[List[Scenario]] = None # None → round-robin all + + def to_dict(self) -> Dict[str, Any]: + return { + "n_cycles": self.n_cycles, "seed": self.seed, "asset": self.asset, + "base_price": self.base_price, "walk_bps": self.walk_bps, + "scenarios": [s.value for s in (self.scenarios or list(Scenario))], + } + + +@dataclass(frozen=True) +class CycleSpec: + idx: int + scenario: Scenario + trade_id: str + price: float + + +@dataclass +class CycleOutcome: + idx: int + scenario: str + ok: bool + entry_path: str # base | r1 | m | none + exit_path: str # maker | market_fallback | urgent_market | none + detail: str = "" + resolution_ms: float = 0.0 + + def key(self) -> Dict[str, Any]: + """Determinism-relevant projection (latency excluded).""" + return {"idx": self.idx, "scenario": self.scenario, "ok": self.ok, + "entry_path": self.entry_path, "exit_path": self.exit_path} + + +def generate_script(spec: IntentScriptSpec) -> List[CycleSpec]: + rng = random.Random(spec.seed) + order = spec.scenarios or list(Scenario) + px = spec.base_price + out: List[CycleSpec] = [] + for i in range(spec.n_cycles): + px = max(0.01, px * (1.0 + rng.uniform(-1, 1) * spec.walk_bps / 1e4)) + out.append(CycleSpec(idx=i, scenario=order[i % len(order)], + trade_id=f"vsyn-{spec.seed}-{i:05d}", + price=round(px, 6))) + return out + + +def script_hash(cycles: List[CycleSpec]) -> str: + payload = json.dumps( + [{"idx": c.idx, "scenario": c.scenario.value, + "trade_id": c.trade_id, "price": c.price} for c in cycles], + sort_keys=True, separators=(",", ":")).encode() + return hashlib.sha256(payload).hexdigest() + + +def outcomes_hash(outcomes: List[CycleOutcome]) -> str: + payload = json.dumps([o.key() for o in outcomes], + sort_keys=True, separators=(",", ":")).encode() + return hashlib.sha256(payload).hexdigest() + + +# ── intent builders (mirror pink_direct's plan→intent mapping) ──────────────── + +def build_enter_intent(cycle: CycleSpec, plan: Any, asset: str) -> KernelIntent: + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=cycle.trade_id, trade_id=cycle.trade_id, slot_id=0, + asset=asset, side=TradeSide.SHORT, + action=KernelCommandType.ENTER, + reference_price=cycle.price, target_size=1.0, leverage=1.0, + exit_leg_ratios=(1.0,), reason=f"vsyn:{cycle.scenario.value}", + metadata={"_time_in_force": ("PostOnly" if plan.post_only else "GTC"), + "_exec_reason": plan.reason}, + stage=TradeStage.INTENT_CREATED, + order_type=plan.order_type, + limit_price=float(plan.limit_price or 0.0), + ) + + +def build_exit_intent(trade_id: str, plan: Any, asset: str, *, + size: float, price: float) -> KernelIntent: + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=f"{trade_id}-x", trade_id=trade_id, slot_id=0, + asset=asset, side=TradeSide.SHORT, + action=KernelCommandType.EXIT, + reference_price=price, target_size=float(size), leverage=1.0, + exit_leg_ratios=(1.0,), reason=plan.reason, + metadata={"_time_in_force": ("PostOnly" if plan.post_only else "GTC"), + "_exec_reason": plan.reason}, + stage=TradeStage.INTENT_CREATED, + order_type=plan.order_type, + limit_price=float(plan.limit_price or 0.0), + ) + + +# ── cycle executor ──────────────────────────────────────────────────────────── + +class SyntheticIntentDriver: + """Runs scripted cycles against (kernel, venue, exec driver, router). + + The owner (ExecStormHarness) provides ``pump`` — which MUST forward + working-order fills to ``driver.on_fill`` — plus slot_view (same shape + as the exec driver's port). + """ + + def __init__(self, *, kernel: Any, venue: Any, driver: Any, + pump, slot_view, ttl_ms: float, logger: Any = None) -> None: + self.kernel = kernel + self.venue = venue + self.driver = driver + self.pump = pump + self.slot_view = slot_view + self.ttl_ms = float(ttl_ms) + self.logger = logger + + # — helpers — + + def _slot_flat(self) -> bool: + _tid, stage, size = self.slot_view() + return size <= 0.0 and stage not in SLOT_OPENISH and stage not in ( + "ORDER_SENT", "ORDER_ACKED", "ENTRY_WORKING", "ORDER_REQUESTED", + "EXIT_REQUESTED") + + async def _await_terminal(self, pred, timeout_s: float) -> bool: + deadline = mono_ns() + int(timeout_s * 1e9) + while mono_ns() < deadline: + await self.pump() + if pred(): + return True + await asyncio.sleep(0.005) + return False + + def _arm_venue(self, cycle: CycleSpec) -> None: + tid, s = cycle.trade_id, cycle.scenario + ttl = self.ttl_ms + if s == Scenario.IMMEDIATE_FILL: + self.venue.set_directive(tid, Directive.IMMEDIATE_FILL) + elif s == Scenario.REST_THEN_FILL: + self.venue.set_directive(tid, Directive.REST_THEN_FILL, + fill_delay_ms=ttl * 0.3) + elif s == Scenario.FILL_RACES_CANCEL: + self.venue.set_directive(tid, Directive.FILL_RACES_CANCEL) + elif s == Scenario.REST_EXPIRE_RETRY: + self.venue.set_directive(tid, Directive.REST_THEN_EXPIRE) + self.venue.set_directive(f"{tid}-r1", Directive.IMMEDIATE_FILL) + elif s in (Scenario.RETRY_EXHAUST_SKIP,): + self.venue.set_directive(tid, Directive.REST_THEN_EXPIRE) + elif s == Scenario.RETRY_EXHAUST_MARKET: + self.venue.set_directive(tid, Directive.REST_THEN_EXPIRE) + # -m fallback is MARKET → fills via venue realism, no directive. + elif s == Scenario.EXIT_EXPIRE_MARKET: + self.venue.set_directive(tid, Directive.IMMEDIATE_FILL) + # re-armed to REST_THEN_EXPIRE after the entry fills (same tid). + elif s == Scenario.POST_ONLY_REJECT: + self.venue.set_directive(tid, Directive.POST_ONLY_REJECT) + elif s == Scenario.CANCEL_REJECT: + self.venue.set_directive(tid, Directive.CANCEL_REJECT) + + async def _flatten(self, cycle: CycleSpec, outcome: CycleOutcome) -> bool: + """End-of-cycle: close any open position (urgent MARKET) and clear + any stuck working slot (direct CANCEL — operator-style cleanup for + the CANCEL_REJECT scenario).""" + slot_tid, stage, size = self.slot_view() + if size > 0.0 and slot_tid: + router = self.driver.ports.router + plan = router.plan_exit(trade_id=slot_tid, asset="STORMUSDT", + position_side="SHORT", + reference_price=cycle.price, + reason=EXIT_URGENT_REASON) + intent = build_exit_intent(slot_tid, plan, "STORMUSDT", + size=size, price=cycle.price) + await self.kernel.process_intent_async(intent) + if outcome.exit_path == "none": + outcome.exit_path = "urgent_market" + ok = await self._await_terminal(self._slot_flat, 2.0) + if not ok: + return False + elif not self._slot_flat() and slot_tid: + # stuck working entry (cancel-reject path): direct CANCEL, the + # venue accepts the second attempt (directive consumed realism + # is NOT modeled — clear it explicitly instead). + self.venue._scripts.pop(slot_tid, None) + cancel = KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=f"{slot_tid}-opcxl", trade_id=slot_tid, slot_id=0, + asset="STORMUSDT", side=TradeSide.SHORT, + action=KernelCommandType.CANCEL, + reference_price=cycle.price, target_size=1.0, leverage=1.0, + exit_leg_ratios=(1.0,), reason="vsyn:cleanup", metadata={}, + stage=TradeStage.INTENT_CREATED, + ) + await self.kernel.process_intent_async(cancel) + if not await self._await_terminal(self._slot_flat, 2.0): + return False + return True + + # — the cycle — + + async def run_cycle(self, cycle: CycleSpec) -> CycleOutcome: + out = CycleOutcome(idx=cycle.idx, scenario=cycle.scenario.value, + ok=False, entry_path="none", exit_path="none") + router = self.driver.ports.router + # Per-scenario policy swap — safe at cycle boundaries because the + # working registry is empty between cycles (asserted at cycle end). + router.config = router_config_for(cycle.scenario) + t0 = mono_ns() + self._arm_venue(cycle) + + plan = router.plan_entry(trade_id=cycle.trade_id, asset="STORMUSDT", + position_side="SHORT", + reference_price=cycle.price) + if plan.suppress: + out.detail = f"entry suppressed at quiescence: {plan.reason}" + return out + intent = build_enter_intent(cycle, plan, "STORMUSDT") + await self.kernel.process_intent_async(intent) + if plan.is_maker: + self.driver.on_submit(plan, intent) + + budget_s = self.ttl_ms / 1000.0 + 1.0 + s = cycle.scenario + + def _slot_open_with(tid: str) -> bool: + slot_tid, stage, size = self.slot_view() + return slot_tid == tid and size > 0.0 and stage in SLOT_OPENISH + + if s in (Scenario.IMMEDIATE_FILL, Scenario.REST_THEN_FILL, + Scenario.FILL_RACES_CANCEL): + if not await self._await_terminal( + lambda: _slot_open_with(cycle.trade_id), budget_s): + out.detail = "entry never opened" + return out + out.entry_path = "base" + elif s == Scenario.REST_EXPIRE_RETRY: + if not await self._await_terminal( + lambda: _slot_open_with(f"{cycle.trade_id}-r1"), + budget_s + self.ttl_ms / 1000.0): + out.detail = "retry never opened" + return out + out.entry_path = "r1" + elif s == Scenario.RETRY_EXHAUST_MARKET: + if not await self._await_terminal( + lambda: _slot_open_with(f"{cycle.trade_id}-m"), + budget_s + 2 * self.ttl_ms / 1000.0): + out.detail = "market fallback never opened" + return out + out.entry_path = "m" + elif s in (Scenario.RETRY_EXHAUST_SKIP, Scenario.POST_ONLY_REJECT, + Scenario.CANCEL_REJECT): + # Terminal = no position, working registry cleared. + def _resolved() -> bool: + _tid, _stage, size = self.slot_view() + return (size <= 0.0 and not router.working_orders() + and not self.driver._resolving) + if not await self._await_terminal( + _resolved, budget_s + 2 * self.ttl_ms / 1000.0): + out.detail = "miss path never resolved" + return out + out.entry_path = "none" + + # EXIT phase for every cycle that holds a position. + slot_tid, _stage, size = self.slot_view() + if size > 0.0: + if s == Scenario.EXIT_EXPIRE_MARKET: + out.entry_path = "base" # immediate-fill entry above + self.venue.set_directive(slot_tid, Directive.REST_THEN_EXPIRE) + xplan = router.plan_exit(trade_id=slot_tid, asset="STORMUSDT", + position_side="SHORT", + reference_price=cycle.price, + reason="TAKE_PROFIT") + if not xplan.is_maker: + out.detail = f"expected maker exit, got {xplan.reason}" + return out + xintent = build_exit_intent(slot_tid, xplan, "STORMUSDT", + size=size, price=cycle.price) + await self.kernel.process_intent_async(xintent) + self.driver.on_submit(xplan, xintent) + if not await self._await_terminal(self._slot_flat, + budget_s + 1.0): + out.detail = "exit market fallback never closed" + return out + out.exit_path = "market_fallback" + # all other scenarios flatten below + + if not await self._flatten(cycle, out): + out.detail = "flatten failed" + return out + # Cycle-end invariants — the matrix's hard correctness core. + if router.working_orders(): + out.detail = f"stuck working orders: {router.working_orders()}" + return out + if not await self.driver.drain(1.0): + out.detail = f"driver did not drain: {self.driver.snapshot()}" + return out + out.resolution_ms = (mono_ns() - t0) / 1e6 + out.ok = True + return out + + async def run(self, spec: IntentScriptSpec) -> List[CycleOutcome]: + outcomes = [] + for cycle in generate_script(spec): + outcomes.append(await self.run_cycle(cycle)) + return outcomes diff --git a/prod/clean_arch/violet/test_violet_exec_scenarios.py b/prod/clean_arch/violet/test_violet_exec_scenarios.py new file mode 100644 index 0000000..e627b76 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_exec_scenarios.py @@ -0,0 +1,119 @@ +"""V2c: the full scenario matrix through the REAL kernel + ScriptedVenue + +ExecDeadlineDriver at 100 ms TTL — plus run-to-run determinism.""" + +from __future__ import annotations + +import asyncio +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") +sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin") + +import pytest + +from prod.clean_arch.violet.exec_harness import ExecStormHarness +from prod.clean_arch.violet.synthetic_intents import ( + IntentScriptSpec, + Scenario, + outcomes_hash, +) + + +def _run(spec: IntentScriptSpec): + async def go(): + h = ExecStormHarness(ttl_ms=100.0) + outcomes = await h.run_matrix(spec) + return h, outcomes + return asyncio.run(go()) + + +def _one(scenario: Scenario, seed=11): + spec = IntentScriptSpec(n_cycles=1, seed=seed, scenarios=[scenario]) + h, outs = _run(spec) + assert len(outs) == 1 + out = outs[0] + assert out.ok, f"{scenario.value}: {out.detail}" + return h, out + + +def test_immediate_fill_cycle(): + h, out = _one(Scenario.IMMEDIATE_FILL) + assert (out.entry_path, out.exit_path) == ("base", "urgent_market") + assert h.driver.counters["immediate_fills"] == 1 + assert h.driver.counters["deadline_fires"] == 0 + + +def test_rest_then_fill_cancels_deadline(): + h, out = _one(Scenario.REST_THEN_FILL) + assert out.entry_path == "base" + assert h.driver.counters["deadline_fires"] == 0 # fill beat the TTL + assert h.driver.counters["working_registered"] == 1 + + +def test_fill_races_cancel_is_fill_not_retry(): + h, out = _one(Scenario.FILL_RACES_CANCEL) + assert out.entry_path == "base" + assert h.driver.counters["fills_after_ttl"] == 1 + assert h.driver.counters["entry_retries"] == 0 + assert h.driver.counters["entry_markets"] == 0 + + +def test_rest_expire_retry_opens_r1(): + h, out = _one(Scenario.REST_EXPIRE_RETRY) + assert out.entry_path == "r1" + assert h.driver.counters["entry_retries"] == 1 + assert "-r1" in h.venue.submits[-2] # retry hit the venue + + +def test_retry_exhaust_skip_ends_flat(): + h, out = _one(Scenario.RETRY_EXHAUST_SKIP) + assert out.entry_path == "none" and out.exit_path == "none" + assert h.driver.counters["entry_retries"] == 1 # one retry, then + assert h.driver.counters["entry_skips"] == 1 # exhaust skip + assert h.venue.open_orders() == [] + + +def test_retry_exhaust_market_opens_m(): + h, out = _one(Scenario.RETRY_EXHAUST_MARKET) + assert out.entry_path == "m" + assert h.driver.counters["entry_markets"] == 1 + + +def test_exit_expire_market_same_trade(): + h, out = _one(Scenario.EXIT_EXPIRE_MARKET) + assert out.entry_path == "base" + assert out.exit_path == "market_fallback" + assert h.driver.counters["exit_market_fallbacks"] == 1 + # R1: the MARKET fallback reused the position's trade_id. + base_tid = next(t for t in h.venue.submits if t.startswith("vsyn")) + assert h.venue.submits.count(base_tid) >= 2 # entry + mkt exit + + +def test_post_only_reject_resolves(): + h, out = _one(Scenario.POST_ONLY_REJECT) + assert out.entry_path == "none" + assert h.router.working_orders() == [] + + +def test_cancel_reject_never_strands(): + h, out = _one(Scenario.CANCEL_REJECT) + assert out.entry_path == "none" + assert h.router.working_orders() == [] + assert h.venue.open_orders() == [] # cleanup cancel landed + + +def test_full_matrix_two_seeds_deterministic(): + spec = IntentScriptSpec(n_cycles=18, seed=7) + h1, o1 = _run(spec) + h2, o2 = _run(spec) + assert all(o.ok for o in o1), [o.detail for o in o1 if not o.ok] + assert outcomes_hash(o1) == outcomes_hash(o2) # run-to-run identical + assert {o.scenario for o in o1} == {s.value for s in Scenario} + # cycle-end invariants held globally + assert h1.router.working_orders() == [] + assert h1.driver.snapshot()["pending_deadlines"] == 0 + assert h1._accounting_ok() + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/clean_arch/violet/test_violet_synthetic_intents.py b/prod/clean_arch/violet/test_violet_synthetic_intents.py new file mode 100644 index 0000000..9570572 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_synthetic_intents.py @@ -0,0 +1,75 @@ +"""V2c: script generation determinism + plan→intent mapping.""" + +from __future__ import annotations + +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.dita_v2.exec_router import ExecConfig, ExecutionRouter +from prod.clean_arch.violet.synthetic_intents import ( + CycleSpec, + IntentScriptSpec, + Scenario, + build_enter_intent, + build_exit_intent, + generate_script, + router_config_for, + script_hash, +) + + +def test_script_is_seed_deterministic(): + spec = IntentScriptSpec(n_cycles=18, seed=7) + a, b = generate_script(spec), generate_script(spec) + assert a == b + assert script_hash(a) == script_hash(b) + c = generate_script(IntentScriptSpec(n_cycles=18, seed=8)) + assert script_hash(a) != script_hash(c) + + +def test_round_robin_covers_all_scenarios(): + cycles = generate_script(IntentScriptSpec(n_cycles=18, seed=7)) + seen = {c.scenario for c in cycles} + assert seen == set(Scenario) + assert all(c.price > 0 for c in cycles) + assert len({c.trade_id for c in cycles}) == 18 + + +def test_router_config_mapping(): + assert router_config_for(Scenario.IMMEDIATE_FILL).entry_miss == "skip" + r = router_config_for(Scenario.REST_EXPIRE_RETRY) + assert (r.entry_miss, r.entry_retries, r.retry_exhaust) == ("retry", 1, "skip") + m = router_config_for(Scenario.RETRY_EXHAUST_MARKET) + assert (m.entry_miss, m.retry_exhaust) == ("retry", "market") + assert all(router_config_for(s).style == "maker_both" for s in Scenario) + + +def test_enter_intent_carries_plan_fields(): + router = ExecutionRouter(ExecConfig(style="maker_both")) + cycle = CycleSpec(idx=0, scenario=Scenario.IMMEDIATE_FILL, + trade_id="T1", price=100.0) + plan = router.plan_entry(trade_id="T1", asset="STORMUSDT", + position_side="SHORT", reference_price=100.0) + intent = build_enter_intent(cycle, plan, "STORMUSDT") + assert intent.order_type == "LIMIT" and intent.limit_price == plan.limit_price + assert intent.metadata["_time_in_force"] == "PostOnly" + assert intent.trade_id == "T1" and intent.action.value == "ENTER" + + +def test_exit_intent_market_when_urgent(): + router = ExecutionRouter(ExecConfig(style="maker_both")) + plan = router.plan_exit(trade_id="T2", asset="STORMUSDT", + position_side="SHORT", reference_price=100.0, + reason="CATASTROPHIC") + assert not plan.is_maker and plan.order_type == "MARKET" + intent = build_exit_intent("T2", plan, "STORMUSDT", size=1.0, price=100.0) + assert intent.order_type == "MARKET" + assert intent.metadata["_time_in_force"] == "GTC" + assert intent.target_size == 1.0 + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"]))