diff --git a/prod/clean_arch/violet/exec_driver.py b/prod/clean_arch/violet/exec_driver.py new file mode 100644 index 0000000..b603f26 --- /dev/null +++ b/prod/clean_arch/violet/exec_driver.py @@ -0,0 +1,345 @@ +"""VIOLET V2: ExecDeadlineDriver — event-driven TTL resolution at 100 ms. + +Lifts the PINK TTL-resolution logic (pink_direct.py `_exec_after_submit` / +`_exec_ttl_loop` / `_handle_expired_working`, proven live) out of the +runtime into a standalone driver with injected ports, and replaces the 1 s +polling sweep with ONE DeadlineScheduler deadline PER WORKING ORDER — +sub-second TTLs with measured jitter instead of polling quantization. + +Timing authority note: the shared ExecutionRouter clamps TTLs to >= 0.5 s +(ExecConfig.from_env [0.5, 300] + register_working's max(0.5, ttl_s) +floor). This driver therefore NEVER polls router.expired(); it schedules +its own deadlines from ExecDriverSettings.ttl_ms_for(plan.ttl_s) and uses +the router purely as policy authority (plans, miss policy, retry/fallback, +registry pop-semantics). The router's internal WorkingOrder.deadline is +vestigial here. + +Concurrency: asyncio single-threaded cooperative. Re-entrancy between a +deadline fire and a fill notification is handled by (a) the `_resolving` +single-flight set, (b) the router's pop-semantics (`note_fill`/`note_cancel` +make double resolution a no-op), and (c) a state re-check after every await +(fills race cancels on a live venue — same doctrine as pink_direct). +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field, replace +from datetime import datetime, timezone +from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple + +from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelIntent, + TradeSide, +) +from prod.clean_arch.dita_v2.exec_router import ExecutionRouter, MissAction + +from .clock import Deadline, DeadlineScheduler, LatencyHistogram, mono_ns +from .domain import ExecDriverSettings, typed + +LOGGER = logging.getLogger("violet.exec_driver") + +SlotView = Tuple[str, str, float] # (trade_id, fsm_state_name, size) + +# Copied from pink_direct._SLOT_OPENISH (9 string constants — a copy, not a +# runtime import: the driver must not couple to PinkDirectRuntime). +SLOT_OPENISH = ("PARTIAL_FILL", "POSITION_OPENED", "POSITION_OPEN", + "EXIT_REQUESTED", "EXIT_SENT", "EXIT_ACKED", "EXIT_WORKING", + "POSITION_PARTIALLY_CLOSED") +SLOT_CLOSEDISH = ("POSITION_CLOSED", "CLOSED", "TRADE_TERMINAL_WRITTEN", "IDLE") + + +@dataclass +class ExecDriverPorts: + """Everything the driver needs from its runtime, injected.""" + + router: ExecutionRouter + submit_intent: Callable[[KernelIntent], Awaitable[Any]] + pump_events: Callable[[], Awaitable[int]] + slot_view: Callable[[], SlotView] + venue_flat: Callable[[], bool] # raises ⇒ treated as NOT flat + last_own_fill_mono_ns: Callable[[], int] + reference_price: Callable[[str], float] # asset → freshest px, 0.0 unknown + exit_intent_fixup: Callable[[KernelIntent], KernelIntent] = lambda i: i + emit: Callable[..., None] = lambda **kw: None # telemetry sink + + +class ExecDeadlineDriver: + """One instance per runtime; single-slot scope (max_slots=1).""" + + def __init__( + self, + ports: ExecDriverPorts, + scheduler: DeadlineScheduler, + *, + settings: Optional[ExecDriverSettings] = None, + ttl_resolution_hist: Optional[LatencyHistogram] = None, + logger: logging.Logger = LOGGER, + ) -> None: + self.ports = ports + self.scheduler = scheduler + self.settings = settings or ExecDriverSettings() + self.ttl_resolution_hist = ttl_resolution_hist or LatencyHistogram( + "ttl_resolution") + self.logger = logger + self._working_intents: Dict[str, KernelIntent] = {} + self._deadlines: Dict[str, Deadline] = {} + self._resolving: Set[str] = set() + self.counters: Dict[str, int] = { + "immediate_fills": 0, "working_registered": 0, "deadline_fires": 0, + "fills_after_ttl": 0, "exit_market_fallbacks": 0, + "entry_skips": 0, "entry_retries": 0, "entry_markets": 0, + "requote_blocked": 0, "resolve_errors": 0, + } + + # ── submit-side ─────────────────────────────────────────────────────────── + + @typed + def on_submit(self, plan: Any, intent: KernelIntent) -> None: + """Classify a maker submit: filled now, resting, or rejected. + + Resting and rejected both register and get a deadline — a rejected + post-only quote gets schedule_in(0) so the SAME resolution path + (cancel → re-classify → miss/fallback) handles it immediately. + """ + router = self.ports.router + tid = intent.trade_id + slot_tid, stage, size = self.ports.slot_view() + filled = ( + (plan.action == "ENTER" and slot_tid == tid and size > 0.0 + and stage in SLOT_OPENISH) + or (plan.action == "EXIT" and ( + slot_tid != tid or stage in SLOT_CLOSEDISH)) + ) + if filled: + self.counters["immediate_fills"] += 1 + self.ports.emit(event="immediate_fill", action=plan.action, + trade_id=tid, reason=plan.reason) + return + rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED") + router.register_working( + trade_id=tid, asset=intent.asset, + position_side=(getattr(intent.side, "value", None) or str(intent.side)), + plan=plan, + base_trade_id=plan.metadata.get("base_trade_id") or tid, + retry_n=int(plan.metadata.get("retry_n", 0) or 0), + ) + self._working_intents[tid] = intent + self.counters["working_registered"] += 1 + ttl_ms = 0.0 if rejected else self.settings.ttl_ms_for(plan.ttl_s) + self._deadlines[tid] = self.scheduler.schedule_in( + ttl_ms, lambda t=tid: self._fire(t), name=f"ttl:{tid}") + self.ports.emit(event="working", action=plan.action, trade_id=tid, + limit_price=plan.limit_price, ttl_ms=ttl_ms, + rejected=rejected, reason=plan.reason) + + # ── fill/cancel notifications (the polling loop never needed these) ────── + + @typed + def on_fill(self, trade_id: str) -> None: + dl = self._deadlines.pop(trade_id, None) + if dl is not None: + dl.cancel() + self.ports.router.note_fill(trade_id) + self._working_intents.pop(trade_id, None) + + @typed + def on_cancel(self, trade_id: str) -> None: + dl = self._deadlines.pop(trade_id, None) + if dl is not None: + dl.cancel() + self.ports.router.note_cancel(trade_id) + self._working_intents.pop(trade_id, None) + + # ── deadline path ───────────────────────────────────────────────────────── + + def _fire(self, trade_id: str) -> None: + """DeadlineScheduler callback: sync + non-blocking by contract.""" + fire_ns = mono_ns() + self.counters["deadline_fires"] += 1 + if trade_id in self._resolving: + return + self._resolving.add(trade_id) + asyncio.get_running_loop().create_task( + self._resolve_expired(trade_id, fire_ns), + name=f"violet_exec_resolve:{trade_id}", + ) + + async def _resolve_expired(self, trade_id: str, fire_ns: int) -> None: + try: + await self._resolve_expired_inner(trade_id, fire_ns) + except Exception as exc: # noqa: BLE001 + self.counters["resolve_errors"] += 1 + self.logger.error("exec_driver: resolution failed for %s: %s", + trade_id, exc, exc_info=True) + finally: + self._resolving.discard(trade_id) + self._deadlines.pop(trade_id, None) + + async def _resolve_expired_inner(self, trade_id: str, fire_ns: int) -> None: + router = self.ports.router + wo = router.working(trade_id) + if wo is None: + return # already resolved (fill/cancel notification raced us) + + # 1. Drain late venue events — the quote may already be filled. + await self.ports.pump_events() + if router.working(trade_id) is None: + return + + def _entry_filled() -> bool: + slot_tid, stage, size = self.ports.slot_view() + return slot_tid == trade_id and size > 0.0 and stage in SLOT_OPENISH + + def _exit_done() -> bool: + slot_tid, stage, size = self.ports.slot_view() + return (slot_tid != trade_id or size <= 0.0 + or stage in SLOT_CLOSEDISH) + + # 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is + # harmless). ttl_resolution = fire → cancel-intent handed to the + # kernel: the gated V2 latency metric. + base = self._working_intents.get(trade_id) + cancel_intent = KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=f"{trade_id}-ttlcxl", + trade_id=trade_id, + slot_id=0, + asset=wo.asset, + side=(base.side if base is not None else TradeSide.SHORT), + action=KernelCommandType.CANCEL, + reference_price=(base.reference_price if base is not None else 0.0), + target_size=(base.target_size if base is not None else 0.0), + leverage=(base.leverage if base is not None else 1.0), + reason="exec_driver:ttl_expired", + ) + self.ttl_resolution_hist.record(mono_ns() - fire_ns) + try: + await self.ports.submit_intent(cancel_intent) + except Exception as exc: # noqa: BLE001 + self.logger.warning("exec_driver: ttl-cancel %s failed: %s", + trade_id, exc) + await self.ports.pump_events() + + # 3. Re-classify after the cancel round-trip (fill may have raced it). + if ((wo.action == "ENTER" and _entry_filled()) + or (wo.action == "EXIT" and _exit_done())): + router.note_fill(trade_id) + self._working_intents.pop(trade_id, None) + self.counters["fills_after_ttl"] += 1 + self.ports.emit(event="fill_after_ttl", trade_id=trade_id) + return + + router.note_cancel(trade_id) + base_intent = self._working_intents.pop(trade_id, None) + + # 4. EXIT: never strand a position — MARKET escalation, SAME trade_id. + if wo.action == "EXIT": + _tid, plan = router.market_fallback_plan(wo) + if base_intent is not None and not _exit_done(): + market_exit = replace( + base_intent, + intent_id=f"{trade_id}-mkt", + order_type="MARKET", limit_price=0.0, + timestamp=datetime.now(timezone.utc), + metadata={**(base_intent.metadata or {}), + "_time_in_force": "GTC", + "_exec_reason": plan.reason}, + ) + market_exit = self.ports.exit_intent_fixup(market_exit) + self.counters["exit_market_fallbacks"] += 1 + self.logger.warning("exec_driver: exit TTL → MARKET fallback %s", + trade_id) + self.ports.emit(event="exit_market_fallback", trade_id=trade_id) + await self.ports.submit_intent(market_exit) + await self.ports.pump_events() + return + + # 5. ENTER miss policy: skip | retry (bounded) | market. + action = router.entry_miss_action(wo) + self.ports.emit(event="entry_miss", trade_id=trade_id, action=action, + retry_n=wo.retry_n) + if action == MissAction.SKIP or base_intent is None: + self.counters["entry_skips"] += 1 + return + slot_tid, stage, size = self.ports.slot_view() + if size > 0.0 or stage in SLOT_OPENISH: + # Slot occupied (raced fill of remainder / another trade) — never + # double-enter. + self.counters["entry_skips"] += 1 + self.logger.warning("exec_driver: entry miss %s: slot busy (%s) — skip", + trade_id, stage) + return + # Venue-truth gate: re-quote ONLY when the venue is provably flat AND + # no own fill landed in the hot window. Ambiguity → skip; a skipped + # entry is always safe, a doubled position is not. + if not self._safe_to_requote(): + self.counters["requote_blocked"] += 1 + self.ports.emit(event="requote_blocked", trade_id=trade_id) + return + ref = 0.0 + try: + ref = float(self.ports.reference_price(wo.asset) or 0.0) + except Exception: # noqa: BLE001 + ref = 0.0 + if ref <= 0.0: + ref = float(wo.plan.limit_price or 0.0) + if action == MissAction.RETRY: + new_tid, plan = router.retry_plan(wo, reference_price=ref) + self.counters["entry_retries"] += 1 + else: # MARKET + new_tid, plan = router.market_fallback_plan(wo) + self.counters["entry_markets"] += 1 + new_intent = replace( + base_intent, + trade_id=new_tid, intent_id=new_tid, + timestamp=datetime.now(timezone.utc), + reference_price=(ref if ref > 0.0 else base_intent.reference_price), + order_type=plan.order_type, + limit_price=float(plan.limit_price or 0.0), + metadata={**(base_intent.metadata or {}), + "_time_in_force": ("PostOnly" if plan.post_only else "GTC"), + "_exec_reason": plan.reason}, + ) + self.logger.info("exec_driver: entry %s → %s as %s", + trade_id, action, new_tid) + await self.ports.submit_intent(new_intent) + if plan.is_maker: + self.on_submit(plan, new_intent) # deadline chains + await self.ports.pump_events() + + # ── guards / observability ──────────────────────────────────────────────── + + def _safe_to_requote(self) -> bool: + """Fail SAFE: recent own fill, any live position, or probe error ⇒ no.""" + if (mono_ns() - int(self.ports.last_own_fill_mono_ns() or 0) + < self.settings.requote_hot_window_ns): + return False + try: + return bool(self.ports.venue_flat()) + except Exception as exc: # noqa: BLE001 + self.logger.warning("exec_driver: requote probe failed (%s) — fail safe", + exc) + return False + + def snapshot(self) -> Dict[str, Any]: + return { + "working": [w.trade_id for w in self.ports.router.working_orders()], + "pending_deadlines": len(self._deadlines), + "resolving": len(self._resolving), + "counters": dict(self.counters), + "ttl_resolution": self.ttl_resolution_hist.to_dict(), + } + + async def drain(self, timeout_s: float = 5.0) -> bool: + """Quiesce: True when no pending deadlines / in-flight resolutions / + working orders remain within the timeout.""" + deadline = mono_ns() + int(timeout_s * 1e9) + while mono_ns() < deadline: + if (not self._deadlines and not self._resolving + and not self.ports.router.working_orders()): + return True + await asyncio.sleep(0.005) + return False diff --git a/prod/clean_arch/violet/scripted_venue.py b/prod/clean_arch/violet/scripted_venue.py new file mode 100644 index 0000000..a766fac --- /dev/null +++ b/prod/clean_arch/violet/scripted_venue.py @@ -0,0 +1,164 @@ +"""VIOLET V2: ScriptedVenue — per-order directives over the MOCK venue. + +Subclasses ``MockVenueAdapter`` (zero edits to the shared module) to give +tests deterministic, per-trade control over whether a maker quote fills +immediately, rests then fills, rests until TTL expiry, rejects post-only, +rejects the cancel, or fills in the race window between TTL fire and the +CANCEL round-trip (the late-WS-fill case production sees). + +Deferred fills are released through ``reconcile()`` — the exact seam the +production runtime drains via ``pump_venue_events`` — NEVER through the +parent's ``subscribe()`` 50 ms poll, which would put an artificial floor +under the V2 latency gate. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from enum import Enum +from typing import Dict, List, Optional, Tuple + +from prod.clean_arch.dita_v2.contracts import ( + KernelEventKind, + KernelIntent, + VenueEvent, + VenueEventStatus, + VenueOrder, + VenueOrderStatus, +) +from prod.clean_arch.dita_v2.mock_venue import MockVenueAdapter, MockVenueScenario + +from .clock import mono_ns +from .domain import typed + + +class Directive(str, Enum): + IMMEDIATE_FILL = "immediate_fill" + REST_THEN_FILL = "rest_then_fill" + REST_THEN_EXPIRE = "rest_then_expire" + POST_ONLY_REJECT = "post_only_reject" + CANCEL_REJECT = "cancel_reject" # rests; cancel is rejected + FILL_RACES_CANCEL = "fill_races_cancel" # rests; fill lands during cancel + + +@dataclass +class _Script: + directive: Directive + fill_delay_ms: float = 0.0 + + +class ScriptedVenue(MockVenueAdapter): + """MockVenueAdapter + per-trade_id directives. + + Directive lookup is longest-prefix: a directive set for ``T1`` also + governs ``T1-r1``/``T1-m`` unless those register their own — retry + chains can change behavior mid-scenario. + """ + + def __init__(self, scenario: Optional[MockVenueScenario] = None): + super().__init__(scenario) + self._scripts: Dict[str, _Script] = {} + self._intents: Dict[str, KernelIntent] = {} # venue_order_id → intent + self._pending_fills: List[Tuple[int, str]] = [] # (due_mono_ns, venue_order_id) + self.submits: List[str] = [] + self.cancels: List[str] = [] + + # ── scripting API ───────────────────────────────────────────────────────── + + @typed + def set_directive(self, trade_id_prefix: str, directive: Directive, + *, fill_delay_ms: float = 0.0) -> None: + self._scripts[trade_id_prefix] = _Script(directive, fill_delay_ms) + + def _script_for(self, trade_id: str) -> Optional[_Script]: + if trade_id in self._scripts: + return self._scripts[trade_id] + best = None + for prefix in self._scripts: + if trade_id.startswith(prefix): + if best is None or len(prefix) > len(best): + best = prefix + return self._scripts.get(best) if best else None + + # ── venue surface ───────────────────────────────────────────────────────── + + def submit(self, intent: KernelIntent) -> List[VenueEvent]: + self.submits.append(intent.trade_id) + script = self._script_for(intent.trade_id) + if script is None or script.directive == Directive.IMMEDIATE_FILL: + return super().submit(intent) # parent default: ACK + FULL_FILL + + order_id = f"V-{next(self._order_seq):08d}" + order = VenueOrder( + internal_trade_id=intent.trade_id, + venue_order_id=order_id, + venue_client_id=f"{intent.trade_id}:{intent.intent_id}", + side=intent.side, + intended_size=float(intent.target_size), + status=VenueOrderStatus.NEW, + metadata={"intent_id": intent.intent_id, "action": intent.action.value, + "slot_id": intent.slot_id, "asset": intent.asset}, + ) + if script.directive == Directive.POST_ONLY_REJECT: + return [self._event_from_order( + intent, order, KernelEventKind.ORDER_REJECT, + VenueEventStatus.REJECTED, reason="POST_ONLY_WOULD_CROSS")] + + # All REST_* directives: the quote rests — ACK only. + self._open_orders[order_id] = order + self._intents[order_id] = intent + if script.directive == Directive.REST_THEN_FILL: + due = mono_ns() + int(script.fill_delay_ms * 1_000_000) + self._pending_fills.append((due, order_id)) + return [self._event_from_order( + intent, order, KernelEventKind.ORDER_ACK, VenueEventStatus.ACKED)] + + def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]: + self.cancels.append(order.internal_trade_id) + script = self._script_for(order.internal_trade_id) + if script is not None and script.directive == Directive.FILL_RACES_CANCEL: + # The fill beat the cancel on the wire: surface it on the next + # reconcile (= production pump), reject this cancel. + self._pending_fills.append((0, order.venue_order_id)) + return [self._event_from_order( + self._dummy_intent(order), order, KernelEventKind.CANCEL_REJECT, + VenueEventStatus.CANCELED_REJECTED, reason="ORDER_ALREADY_FILLED")] + if script is not None and script.directive == Directive.CANCEL_REJECT: + return [self._event_from_order( + self._dummy_intent(order), order, KernelEventKind.CANCEL_REJECT, + VenueEventStatus.CANCELED_REJECTED, reason="MOCK_CANCEL_REJECT")] + return super().cancel(order, reason=reason) + + def reconcile(self) -> List[VenueEvent]: + """Release pending fills whose due time has passed — the production + pump path. Fill price = limit price for LIMIT orders, else reference.""" + now = mono_ns() + out: List[VenueEvent] = [] + keep: List[Tuple[int, str]] = [] + for due, order_id in self._pending_fills: + if now < due: + keep.append((due, order_id)) + continue + order = self._open_orders.get(order_id) + intent = self._intents.get(order_id) + if order is None or intent is None: + continue # cancelled before the fill landed + px = (float(intent.limit_price) if intent.order_type == "LIMIT" + and float(intent.limit_price or 0.0) > 0.0 + else float(intent.reference_price or 0.0)) + out.append(self._event_from_order( + intent, order, KernelEventKind.FULL_FILL, VenueEventStatus.FILLED, + price=px, fill_size=float(intent.target_size), remaining_size=0.0)) + self._open_orders[order_id] = VenueOrder( + internal_trade_id=order.internal_trade_id, + venue_order_id=order.venue_order_id, + venue_client_id=order.venue_client_id, + side=order.side, + intended_size=order.intended_size, + filled_size=float(intent.target_size), + average_fill_price=px, + status=VenueOrderStatus.FILLED, + metadata=dict(order.metadata), + ) + self._pending_fills = keep + return out diff --git a/prod/clean_arch/violet/test_violet_exec_driver.py b/prod/clean_arch/violet/test_violet_exec_driver.py new file mode 100644 index 0000000..d1b6ca3 --- /dev/null +++ b/prod/clean_arch/violet/test_violet_exec_driver.py @@ -0,0 +1,260 @@ +"""V2b: ExecDeadlineDriver — TTL resolution semantics over fake-kernel ports. + +The real ExecutionRouter and the real DeadlineScheduler are used; only the +kernel/venue side is faked so each path (skip, retry, exhaust, market, +exit escalation, fill races, rejected-instant, fail-safe gate) can be +forced deterministically. Kernel-integration runs live in V2c. +""" + +from __future__ import annotations + +import asyncio +import sys + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.dita_v2.exec_router import ExecConfig, ExecutionRouter +from prod.clean_arch.violet.clock import DeadlineScheduler, LatencyHistogram +from prod.clean_arch.violet.exec_driver import ( + ExecDeadlineDriver, + ExecDriverPorts, +) +from prod.clean_arch.violet.domain import ExecDriverSettings +from prod.clean_arch.violet.test_violet_scripted_venue import _intent + +TTL_MS = 30.0 + + +class _FakeRuntime: + """slot_view + submit + pump fakes the driver ports plug into.""" + + def __init__(self): + self.slot = ("", "IDLE", 0.0) + self.submitted = [] # KernelIntents the driver sent + self.pump_hook = None # optional callable run on each pump + self.flat = True + self.last_fill_ns = 0 + + async def submit_intent(self, intent): + self.submitted.append(intent) + return None + + async def pump_events(self): + if self.pump_hook is not None: + self.pump_hook() + return 0 + + def ports(self, router): + return ExecDriverPorts( + router=router, + submit_intent=self.submit_intent, + pump_events=self.pump_events, + slot_view=lambda: self.slot, + venue_flat=lambda: self.flat, + last_own_fill_mono_ns=lambda: self.last_fill_ns, + reference_price=lambda asset: 100.0, + ) + + +def _setup(style="maker_both", miss="skip", retries=1, exhaust="skip"): + rt = _FakeRuntime() + router = ExecutionRouter(ExecConfig( + style=style, entry_miss=miss, entry_retries=retries, + retry_exhaust=exhaust)) + sched = DeadlineScheduler(jitter_hist=LatencyHistogram("jit")) + driver = ExecDeadlineDriver( + rt.ports(router), sched, + settings=ExecDriverSettings(ttl_override_ms=TTL_MS)) + return rt, router, sched, driver + + +def _enter_plan(router, tid, px=100.0): + return router.plan_entry(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", reference_price=px) + + +def _exit_plan(router, tid, px=100.0): + return router.plan_exit(trade_id=tid, asset="BTCUSDT", + position_side="SHORT", reference_price=px, + reason="TAKE_PROFIT") + + +async def _drive(driver, sched, body): + sched.start() + try: + await body() + assert await driver.drain(2.0), driver.snapshot() + finally: + await sched.stop() + + +@pytest.mark.asyncio +async def test_expire_skip_cancels_then_clears(): + rt, router, sched, driver = _setup(miss="skip") + + async def body(): + plan = _enter_plan(router, "E1") + driver.on_submit(plan, _intent("E1", limit_price=plan.limit_price)) + assert router.working("E1") is not None + await asyncio.sleep(TTL_MS / 1000 + 0.1) + + await _drive(driver, sched, body) + cancels = [i for i in rt.submitted if i.intent_id == "E1-ttlcxl"] + assert len(cancels) == 1 and cancels[0].action.value == "CANCEL" + assert router.working_orders() == [] + assert driver.counters["entry_skips"] == 1 + assert driver.ttl_resolution_hist.count == 1 + + +@pytest.mark.asyncio +async def test_retry_chain_then_exhaust_skip(): + rt, router, sched, driver = _setup(miss="retry", retries=1, exhaust="skip") + + async def body(): + plan = _enter_plan(router, "E2") + driver.on_submit(plan, _intent("E2", limit_price=plan.limit_price)) + await asyncio.sleep(2 * (TTL_MS / 1000) + 0.3) # two expiries + + await _drive(driver, sched, body) + tids = [i.trade_id for i in rt.submitted if i.action.value == "ENTER"] + assert tids == ["E2-r1"] # exactly one retry + r1 = next(i for i in rt.submitted if i.trade_id == "E2-r1") + assert r1.order_type == "LIMIT" + assert r1.metadata["_time_in_force"] == "PostOnly" + assert driver.counters["entry_retries"] == 1 + assert driver.counters["entry_skips"] == 1 # the exhaust + assert router.working_orders() == [] + + +@pytest.mark.asyncio +async def test_exhaust_market_submits_market_enter(): + rt, router, sched, driver = _setup(miss="retry", retries=0, exhaust="market") + + async def body(): + plan = _enter_plan(router, "E3") + driver.on_submit(plan, _intent("E3", limit_price=plan.limit_price)) + await asyncio.sleep(TTL_MS / 1000 + 0.15) + + await _drive(driver, sched, body) + markets = [i for i in rt.submitted if i.trade_id == "E3-m"] + assert len(markets) == 1 + assert markets[0].order_type == "MARKET" + assert driver.counters["entry_markets"] == 1 + + +@pytest.mark.asyncio +async def test_exit_expiry_escalates_market_same_trade_id(): + rt, router, sched, driver = _setup() + rt.slot = ("X1", "POSITION_OPEN", 1.0) # position held by X1 + + async def body(): + plan = _exit_plan(router, "X1") + assert plan.is_maker + driver.on_submit(plan, _intent("X1", limit_price=plan.limit_price)) + await asyncio.sleep(TTL_MS / 1000 + 0.15) + + await _drive(driver, sched, body) + mkt = [i for i in rt.submitted if i.intent_id == "X1-mkt"] + assert len(mkt) == 1 + assert mkt[0].trade_id == "X1" # R1: SAME trade lifecycle + assert mkt[0].order_type == "MARKET" + assert mkt[0].metadata["_time_in_force"] == "GTC" + assert driver.counters["exit_market_fallbacks"] == 1 + + +@pytest.mark.asyncio +async def test_fill_racing_cancel_is_noted_not_retried(): + rt, router, sched, driver = _setup(miss="retry", retries=3) + fired = {"n": 0} + + def pump_hook(): + # The SECOND pump (after the cancel) reveals the fill won the race. + fired["n"] += 1 + if fired["n"] >= 2: + rt.slot = ("E4", "POSITION_OPEN", 1.0) + + rt.pump_hook = pump_hook + + async def body(): + plan = _enter_plan(router, "E4") + driver.on_submit(plan, _intent("E4", limit_price=plan.limit_price)) + await asyncio.sleep(TTL_MS / 1000 + 0.15) + + await _drive(driver, sched, body) + assert driver.counters["fills_after_ttl"] == 1 + assert driver.counters["entry_retries"] == 0 # no re-quote after a fill + assert [i.trade_id for i in rt.submitted + if i.action.value == "ENTER"] == [] + + +@pytest.mark.asyncio +async def test_on_fill_cancels_deadline_before_expiry(): + rt, router, sched, driver = _setup() + + async def body(): + plan = _enter_plan(router, "E5") + driver.on_submit(plan, _intent("E5", limit_price=plan.limit_price)) + driver.on_fill("E5") # WS fill notification + await asyncio.sleep(TTL_MS / 1000 + 0.1) + + await _drive(driver, sched, body) + assert rt.submitted == [] # no cancel ever sent + assert driver.counters["deadline_fires"] == 0 + assert router.working_orders() == [] + + +@pytest.mark.asyncio +async def test_post_only_reject_resolves_instantly(): + rt, router, sched, driver = _setup(miss="skip") + rt.slot = ("E6", "ORDER_REJECTED", 0.0) + + async def body(): + plan = _enter_plan(router, "E6") + driver.on_submit(plan, _intent("E6", limit_price=plan.limit_price)) + await asyncio.sleep(0.05) # far below TTL + + await _drive(driver, sched, body) + assert driver.counters["deadline_fires"] == 1 # schedule_in(0) fired now + assert any(i.intent_id == "E6-ttlcxl" for i in rt.submitted) + assert router.working_orders() == [] + + +@pytest.mark.asyncio +async def test_requote_gate_fails_safe_on_probe_error(): + rt, router, sched, driver = _setup(miss="retry", retries=2) + + def boom(): + raise RuntimeError("venue probe down") + + driver.ports.venue_flat = boom + + async def body(): + plan = _enter_plan(router, "E7") + driver.on_submit(plan, _intent("E7", limit_price=plan.limit_price)) + await asyncio.sleep(TTL_MS / 1000 + 0.15) + + await _drive(driver, sched, body) + assert driver.counters["requote_blocked"] == 1 + assert driver.counters["entry_retries"] == 0 # ambiguity ⇒ skip, never quote + assert router.working_orders() == [] + + +@pytest.mark.asyncio +async def test_immediate_fill_never_registers(): + rt, router, sched, driver = _setup() + rt.slot = ("E8", "POSITION_OPENED", 1.0) + + async def body(): + plan = _enter_plan(router, "E8") + driver.on_submit(plan, _intent("E8", limit_price=plan.limit_price)) + + await _drive(driver, sched, body) + assert driver.counters["immediate_fills"] == 1 + assert router.working_orders() == [] + assert driver.snapshot()["pending_deadlines"] == 0 + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"])) diff --git a/prod/clean_arch/violet/test_violet_scripted_venue.py b/prod/clean_arch/violet/test_violet_scripted_venue.py new file mode 100644 index 0000000..e70e2af --- /dev/null +++ b/prod/clean_arch/violet/test_violet_scripted_venue.py @@ -0,0 +1,137 @@ +"""V2b: ScriptedVenue — directive semantics + kernel CANCEL mapping.""" + +from __future__ import annotations + +import sys +import time +from datetime import datetime, timezone + +sys.path.insert(0, "/mnt/dolphinng5_predict") +sys.path.insert(0, "/mnt/dolphinng5_predict/nautilus_dolphin") + +import pytest + +from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelEventKind, + KernelIntent, + TradeSide, + TradeStage, +) +from prod.clean_arch.violet.scripted_venue import Directive, ScriptedVenue + + +def _intent(tid: str, action=KernelCommandType.ENTER, *, order_type="LIMIT", + limit_price=99.5, ref=100.0) -> KernelIntent: + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=tid, trade_id=tid, slot_id=0, + asset="BTCUSDT", side=TradeSide.SHORT, action=action, + reference_price=ref, target_size=1.0, leverage=1.0, + exit_leg_ratios=(1.0,), reason="test", metadata={}, + stage=TradeStage.INTENT_CREATED, + order_type=order_type, limit_price=limit_price, + ) + + +def test_rest_then_expire_acks_only_and_cancel_pops(): + v = ScriptedVenue() + v.set_directive("T1", Directive.REST_THEN_EXPIRE) + events = v.submit(_intent("T1")) + assert [e.kind for e in events] == [KernelEventKind.ORDER_ACK] + assert len(v.open_orders()) == 1 + order = v.open_orders()[0] + assert order.internal_trade_id == "T1" + out = v.cancel(order) + assert [e.kind for e in out] == [KernelEventKind.CANCEL_ACK] + assert v.open_orders() == [] + assert v.reconcile() == [] # nothing ever falls due + + +def test_rest_then_fill_releases_via_reconcile_at_limit_price(): + v = ScriptedVenue() + v.set_directive("T2", Directive.REST_THEN_FILL, fill_delay_ms=10.0) + v.submit(_intent("T2", limit_price=99.25)) + assert v.reconcile() == [] # not due yet + time.sleep(0.02) + fills = v.reconcile() + assert [e.kind for e in fills] == [KernelEventKind.FULL_FILL] + assert fills[0].price == 99.25 # maker fills at the limit + assert fills[0].filled_size == 1.0 + assert v.reconcile() == [] # released exactly once + + +def test_post_only_reject(): + v = ScriptedVenue() + v.set_directive("T3", Directive.POST_ONLY_REJECT) + events = v.submit(_intent("T3")) + assert [e.kind for e in events] == [KernelEventKind.ORDER_REJECT] + assert v.open_orders() == [] + + +def test_fill_races_cancel(): + """Cancel is rejected and the fill surfaces on the next reconcile — + the late-WS-fill-beats-cancel race.""" + v = ScriptedVenue() + v.set_directive("T4", Directive.FILL_RACES_CANCEL) + v.submit(_intent("T4")) + order = v.open_orders()[0] + out = v.cancel(order) + assert [e.kind for e in out] == [KernelEventKind.CANCEL_REJECT] + fills = v.reconcile() + assert [e.kind for e in fills] == [KernelEventKind.FULL_FILL] + assert fills[0].trade_id == "T4" + + +def test_cancel_reject_directive(): + v = ScriptedVenue() + v.set_directive("T5", Directive.CANCEL_REJECT) + v.submit(_intent("T5")) + out = v.cancel(v.open_orders()[0]) + assert [e.kind for e in out] == [KernelEventKind.CANCEL_REJECT] + assert v.reconcile() == [] # no phantom fill + + +def test_prefix_lookup_covers_retries(): + v = ScriptedVenue() + v.set_directive("T6", Directive.REST_THEN_EXPIRE) + v.set_directive("T6-r1", Directive.IMMEDIATE_FILL) + assert [e.kind for e in v.submit(_intent("T6"))] == [KernelEventKind.ORDER_ACK] + kinds = [e.kind for e in v.submit(_intent("T6-r1"))] + assert KernelEventKind.FULL_FILL in kinds # own directive wins + kinds_m = [e.kind for e in v.submit(_intent("T6-m"))] + assert kinds_m == [KernelEventKind.ORDER_ACK] # falls back to T6 prefix + + +def test_no_directive_is_parent_default(): + v = ScriptedVenue() + kinds = [e.kind for e in v.submit(_intent("T7"))] + assert KernelEventKind.FULL_FILL in kinds + + +def test_kernel_cancel_reaches_venue_with_right_order(): + """Full kernel drive: ENTER (resting LIMIT) then CANCEL via + process_intent — the CANCEL must reach ScriptedVenue.cancel with the + venue_order_id of the resting quote and pop it.""" + from prod.clean_arch.dita_v2.launcher import build_launcher_bundle + + venue = ScriptedVenue() + venue.set_directive("T-CXL", Directive.REST_THEN_EXPIRE) + bundle = build_launcher_bundle(venue_mode="MOCK", max_slots=1, venue=venue) + kernel = bundle.kernel + + kernel.process_intent(_intent("T-CXL")) + assert venue.submits == ["T-CXL"] + assert len(venue.open_orders()) == 1 + resting_oid = venue.open_orders()[0].venue_order_id + + cancel = _intent("T-CXL", action=KernelCommandType.CANCEL, + order_type="MARKET", limit_price=0.0) + kernel.process_intent(cancel) + assert venue.cancels == ["T-CXL"] + assert venue.open_orders() == [] # CANCEL_ACK popped it + assert resting_oid # mapping existed end-to-end + + +if __name__ == "__main__": + raise SystemExit(pytest.main([__file__, "-v"]))