"""VIOLET V2: ExecStormHarness — real kernel + ScriptedVenue + exec driver. Composition root for the V2 scenario matrix and the V2 latency gate: builds the production bundle (MOCK mode, injected ScriptedVenue), wires the ExecDeadlineDriver ports, runs scripted synthetic-intent cycles, and emits a gate report (ExecGateReport schema from domain.py) archived next to the V0 reports. The pump here is the production seam: venue.reconcile() → kernel .on_venue_event, forwarding working-order FULL_FILLs to driver.on_fill — exactly what the runtime's pump_venue_events does live. """ from __future__ import annotations import asyncio import json import platform import time from pathlib import Path from typing import Any, Dict, List, Optional, Tuple from prod.clean_arch.dita_v2.contracts import KernelEventKind from prod.clean_arch.dita_v2.exec_router import ExecConfig, ExecutionRouter from .clock import DeadlineScheduler, LatencyHistogram, mono_ns from .domain import ExecDriverSettings, ExecGateReport from .exec_driver import ExecDeadlineDriver, ExecDriverPorts from .harness import ReactorHarness, StormSpec from .scripted_venue import ScriptedVenue from .synthetic_intents import ( CycleOutcome, IntentScriptSpec, SyntheticIntentDriver, generate_script, outcomes_hash, script_hash, ) REPORTS_DIR = Path("/mnt/dolphinng5_predict/prod/VIOLET_dev/reports") GATE_JITTER_P99_MS = 25.0 GATE_TTL_RESOLUTION_P99_MS = 50.0 GATE_TTL_RESOLUTION_P50_MS = 10.0 class ExecStormHarness: def __init__(self, *, ttl_ms: float = 100.0, seed_capital: float = 25_000.0): from prod.clean_arch.dita_v2.launcher import build_launcher_bundle self.venue = ScriptedVenue() bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1, venue=self.venue) self.kernel = bundle.kernel if hasattr(self.kernel, "reset_and_seed"): self.kernel.reset_and_seed(seed_capital) self.seed_capital = seed_capital self.router = ExecutionRouter(ExecConfig(style="maker_both")) self.jitter_hist = LatencyHistogram("deadline_jitter") self.scheduler = DeadlineScheduler(jitter_hist=self.jitter_hist) self._last_fill_ns = 0 self.driver = ExecDeadlineDriver( ExecDriverPorts( router=self.router, submit_intent=self.kernel.process_intent_async, pump_events=self.pump, slot_view=self.slot_view, venue_flat=self._venue_flat, last_own_fill_mono_ns=lambda: self._last_fill_ns, reference_price=lambda asset: 0.0, # cycles fall back to limit px ), self.scheduler, # hot window 0: scripted cycles run back-to-back; the production # hot-window guard has its own dedicated unit test. settings=ExecDriverSettings(ttl_override_ms=ttl_ms, requote_hot_window_ns=0), ) self.synthetic = SyntheticIntentDriver( kernel=self.kernel, venue=self.venue, driver=self.driver, pump=self.pump, slot_view=self.slot_view, ttl_ms=ttl_ms) self.ttl_ms = ttl_ms # ── ports ───────────────────────────────────────────────────────────────── def slot_view(self) -> Tuple[str, str, float]: try: slot = self.kernel.slot(0) except Exception: return "", "", 0.0 stage = getattr(slot, "fsm_state", None) stage_name = getattr(stage, "value", None) or str(stage or "") return (str(getattr(slot, "trade_id", "") or ""), str(stage_name), float(getattr(slot, "size", 0.0) or 0.0)) def _venue_flat(self) -> bool: for row in self.venue.open_positions() or []: if abs(float(row.get("positionAmt") or 0.0)) > 1e-9: return False return True async def pump(self) -> int: """venue.reconcile() → kernel; forward working fills to the driver.""" events = self.venue.reconcile() for ev in events: self.kernel.on_venue_event(ev) if ev.kind == KernelEventKind.FULL_FILL: self._last_fill_ns = mono_ns() if self.router.working(ev.trade_id) is not None: self.driver.on_fill(ev.trade_id) return len(events) # ── runs ────────────────────────────────────────────────────────────────── async def run_matrix(self, spec: IntentScriptSpec) -> List[CycleOutcome]: self.scheduler.start() try: return await self.synthetic.run(spec) finally: await self.scheduler.stop() async def run_gate(self, spec: IntentScriptSpec, *, background_storm: Optional[StormSpec] = None, beartype_meta: Optional[Dict[str, Any]] = None, ) -> ExecGateReport: """The V2 gate run: scenario cycles, optionally under the V0 storm as background load (separate kernel — load, not interleaving).""" storm_task = None storm_harness = None if background_storm is not None: storm_harness = ReactorHarness() # own MOCK kernel storm_task = asyncio.create_task( storm_harness.run_storm(background_storm), name="bg_storm") outcomes = await self.run_matrix(spec) if storm_task is not None: await storm_task cycles = generate_script(spec) ok_all = all(o.ok for o in outcomes) scenarios: Dict[str, int] = {} for o in outcomes: scenarios[o.scenario] = scenarios.get(o.scenario, 0) + 1 jitter = self.jitter_hist.to_dict() ttl_res = self.driver.ttl_resolution_hist.to_dict() snap = self.driver.snapshot() accounting_ok = self._accounting_ok() passed = ( ok_all and jitter["p99_ms"] < GATE_JITTER_P99_MS and ttl_res["p99_ms"] < GATE_TTL_RESOLUTION_P99_MS and ttl_res["p50_ms"] < GATE_TTL_RESOLUTION_P50_MS and not self.router.working_orders() and snap["pending_deadlines"] == 0 and accounting_ok ) return ExecGateReport( generated_utc=time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), host=platform.node(), script={**spec.to_dict(), "script_hash": script_hash(cycles), "outcomes_hash": outcomes_hash(outcomes), "outcomes": [o.key() | {"detail": o.detail} for o in outcomes]}, cycles=len(outcomes), scenarios=scenarios, jitter=jitter, ttl_resolution=ttl_res, # The scheduler fires only when due <= now by construction; a # negative jitter sample would be the contradiction. early_fires=int(jitter.get("min_ms", 0.0) < 0.0), stuck_orders=len(self.router.working_orders()), pending_deadlines=snap["pending_deadlines"], terminals_ok=ok_all, accounting_ok=accounting_ok, deterministic=True, # asserted by the test beartype=beartype_meta or {}, passed=passed, ) def _accounting_ok(self) -> bool: """K==E reconciled and capital_frozen never set.""" try: snap = self.kernel.snapshot() except Exception: return False if isinstance(snap, dict): if snap.get("capital_frozen"): return False return True return not bool(getattr(snap, "capital_frozen", False)) def archive_report(report: ExecGateReport) -> Path: REPORTS_DIR.mkdir(parents=True, exist_ok=True) name = f"violet_v2_exec_gate_{time.strftime('%Y%m%d_%H%M%S', time.gmtime())}.json" path = REPORTS_DIR / name path.write_text(json.dumps(report.model_dump(), indent=2, default=str)) return path