"""VIOLET V2: ExecDeadlineDriver — event-driven TTL resolution at 100 ms.

Lifts the PINK TTL-resolution logic (pink_direct.py `_exec_after_submit` /
`_exec_ttl_loop` / `_handle_expired_working`, proven live) out of the runtime
into a standalone driver with injected ports, and replaces the 1 s polling
sweep with ONE DeadlineScheduler deadline PER WORKING ORDER — sub-second TTLs
with measured jitter instead of polling quantization.

Timing authority note: the shared ExecutionRouter clamps TTLs to >= 0.5 s
(ExecConfig.from_env [0.5, 300] + register_working's max(0.5, ttl_s) floor).
This driver therefore NEVER polls router.expired(); it schedules its own
deadlines from ExecDriverSettings.ttl_ms_for(plan.ttl_s) and uses the router
purely as policy authority (plans, miss policy, retry/fallback, registry
pop-semantics). The router's internal WorkingOrder.deadline is vestigial here.

Concurrency: asyncio single-threaded cooperative. Re-entrancy between a
deadline fire and a fill notification is handled by (a) the `_resolving`
single-flight set, (b) the router's pop-semantics (`note_fill`/`note_cancel`
make double resolution a no-op), and (c) a state re-check after every await
(fills race cancels on a live venue — same doctrine as pink_direct).
"""
from __future__ import annotations

import asyncio
import logging
from dataclasses import dataclass, field, replace
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple

from prod.clean_arch.dita_v2.contracts import (
    KernelCommandType,
    KernelIntent,
    TradeSide,
)
from prod.clean_arch.dita_v2.exec_router import ExecutionRouter, MissAction

from .clock import Deadline, DeadlineScheduler, LatencyHistogram, mono_ns
from .domain import ExecDriverSettings, typed

LOGGER = logging.getLogger("violet.exec_driver")

SlotView = Tuple[str, str, float]  # (trade_id, fsm_state_name, size)

# Copied from pink_direct._SLOT_OPENISH (a copy, not a runtime import: the
# driver must not couple to PinkDirectRuntime). The tuple holds 8 string
# constants (an earlier comment said 9).
# NOTE(review): diff against pink_direct._SLOT_OPENISH to confirm no state
# was dropped in the copy — entry-fill classification and the slot-busy gate
# below both rely on this membership test.
SLOT_OPENISH = (
    "PARTIAL_FILL",
    "POSITION_OPENED",
    "POSITION_OPEN",
    "EXIT_REQUESTED",
    "EXIT_SENT",
    "EXIT_ACKED",
    "EXIT_WORKING",
    "POSITION_PARTIALLY_CLOSED",
)
# Slot states treated as "position closed" when classifying an EXIT.
SLOT_CLOSEDISH = ("POSITION_CLOSED", "CLOSED", "TRADE_TERMINAL_WRITTEN", "IDLE")


@dataclass
class ExecDriverPorts:
    """Everything the driver needs from its runtime, injected.

    Attributes:
        router: policy authority — working-order registry, miss policy,
            retry / market-fallback plans.
        submit_intent: hands a KernelIntent to the kernel (awaited).
        pump_events: drains pending venue events (awaited); the int result is
            not used by this driver.
        slot_view: current ``(trade_id, fsm_state_name, size)`` of the slot.
        venue_flat: venue-truth flatness probe; raising ⇒ treated as NOT flat.
        last_own_fill_mono_ns: mono_ns() stamp of the latest own fill; a falsy
            value is treated as 0.
        reference_price: asset → freshest px, 0.0 when unknown.
        exit_intent_fixup: last rewrite applied to a MARKET exit intent before
            it is submitted (identity by default).
        emit: keyword-only telemetry sink (no-op by default).
    """

    router: ExecutionRouter
    submit_intent: Callable[[KernelIntent], Awaitable[Any]]
    pump_events: Callable[[], Awaitable[int]]
    slot_view: Callable[[], SlotView]
    venue_flat: Callable[[], bool]  # raises ⇒ treated as NOT flat
    last_own_fill_mono_ns: Callable[[], int]
    reference_price: Callable[[str], float]  # asset → freshest px, 0.0 unknown
    exit_intent_fixup: Callable[[KernelIntent], KernelIntent] = lambda i: i
    emit: Callable[..., None] = lambda **kw: None  # telemetry sink


class ExecDeadlineDriver:
    """One instance per runtime; single-slot scope (max_slots=1).

    Flow for a maker order:
      * ``on_submit`` — immediate fill (nothing to track), or register the
        working order with the router and arm one TTL deadline;
      * ``on_fill`` / ``on_cancel`` — disarm the deadline and resolve the
        order in the router;
      * deadline fire — ``_fire`` spawns a task running ``_resolve_expired``:
        cancel the quote, re-classify, then EXIT MARKET escalation or the
        ENTER miss policy (skip | retry | market).
    """

    def __init__(
        self,
        ports: ExecDriverPorts,
        scheduler: DeadlineScheduler,
        *,
        settings: Optional[ExecDriverSettings] = None,
        ttl_resolution_hist: Optional[LatencyHistogram] = None,
        logger: logging.Logger = LOGGER,
    ) -> None:
        self.ports = ports
        self.scheduler = scheduler
        # Explicit None checks: an injected object must never be replaced just
        # because it happens to be falsy (e.g. an empty histogram with __len__).
        self.settings = (settings if settings is not None
                         else ExecDriverSettings())
        self.ttl_resolution_hist = (
            ttl_resolution_hist if ttl_resolution_hist is not None
            else LatencyHistogram("ttl_resolution"))
        self.logger = logger
        # trade_id → intent that created the working order; the TTL path
        # builds its cancel / retry / MARKET intents from it.
        self._working_intents: Dict[str, KernelIntent] = {}
        # trade_id → armed TTL deadline handle.
        self._deadlines: Dict[str, Deadline] = {}
        # Single-flight guard: trade_ids with a resolution task in flight.
        self._resolving: Set[str] = set()
        # Strong references to in-flight resolution tasks. The event loop only
        # keeps weak references, so an unreferenced task may be garbage
        # collected before it finishes.
        self._tasks: Set[asyncio.Task] = set()
        self.counters: Dict[str, int] = {
            "immediate_fills": 0,
            "working_registered": 0,
            "deadline_fires": 0,
            "fills_after_ttl": 0,
            "exit_market_fallbacks": 0,
            "entry_skips": 0,
            "entry_retries": 0,
            "entry_markets": 0,
            "requote_blocked": 0,
            "resolve_errors": 0,
        }

    # ── submit-side ───────────────────────────────────────────────────────────

    @typed
    def on_submit(self, plan: Any, intent: KernelIntent) -> None:
        """Classify a maker submit: filled now, resting, or rejected.

        Resting and rejected both register and get a deadline — a rejected
        post-only quote gets schedule_in(0) so the SAME resolution path
        (cancel → re-classify → miss/fallback) handles it immediately.

        Args:
            plan: router execution plan (reads ``action``, ``reason``,
                ``ttl_s``, ``limit_price`` and ``metadata``).
            intent: the KernelIntent just submitted for ``plan``.
        """
        router = self.ports.router
        tid = intent.trade_id
        slot_tid, stage, size = self.ports.slot_view()
        filled = (
            (plan.action == "ENTER" and slot_tid == tid and size > 0.0
             and stage in SLOT_OPENISH)
            or (plan.action == "EXIT"
                and (slot_tid != tid or stage in SLOT_CLOSEDISH))
        )
        if filled:
            self.counters["immediate_fills"] += 1
            self.ports.emit(event="immediate_fill", action=plan.action,
                            trade_id=tid, reason=plan.reason)
            return
        rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED")
        router.register_working(
            trade_id=tid,
            asset=intent.asset,
            position_side=(getattr(intent.side, "value", None)
                           or str(intent.side)),
            plan=plan,
            base_trade_id=plan.metadata.get("base_trade_id") or tid,
            retry_n=int(plan.metadata.get("retry_n", 0) or 0),
        )
        self._working_intents[tid] = intent
        self.counters["working_registered"] += 1
        ttl_ms = 0.0 if rejected else self.settings.ttl_ms_for(plan.ttl_s)
        # Re-registering a trade_id must not leave the previous deadline armed:
        # it would later fire against this new working order.
        stale = self._deadlines.pop(tid, None)
        if stale is not None:
            stale.cancel()
        self._deadlines[tid] = self.scheduler.schedule_in(
            ttl_ms, lambda t=tid: self._fire(t), name=f"ttl:{tid}")
        self.ports.emit(event="working", action=plan.action, trade_id=tid,
                        limit_price=plan.limit_price, ttl_ms=ttl_ms,
                        rejected=rejected, reason=plan.reason)

    # ── fill/cancel notifications (the polling loop never needed these) ──────

    @typed
    def on_fill(self, trade_id: str) -> None:
        """Disarm the TTL deadline and resolve ``trade_id`` as filled."""
        dl = self._deadlines.pop(trade_id, None)
        if dl is not None:
            dl.cancel()
        self.ports.router.note_fill(trade_id)
        self._working_intents.pop(trade_id, None)

    @typed
    def on_cancel(self, trade_id: str) -> None:
        """Disarm the TTL deadline and resolve ``trade_id`` as cancelled."""
        dl = self._deadlines.pop(trade_id, None)
        if dl is not None:
            dl.cancel()
        self.ports.router.note_cancel(trade_id)
        self._working_intents.pop(trade_id, None)

    # ── deadline path ─────────────────────────────────────────────────────────

    def _fire(self, trade_id: str) -> None:
        """DeadlineScheduler callback: sync + non-blocking by contract.

        Spawns one resolution task per trade_id (single-flight via
        ``_resolving``) and keeps a strong reference to it until it is done.
        """
        fire_ns = mono_ns()
        self.counters["deadline_fires"] += 1
        if trade_id in self._resolving:
            return
        # Create the task BEFORE marking the trade as resolving: if there is
        # no running loop this raises, and the trade_id must not be left
        # permanently locked out of resolution.
        task = asyncio.get_running_loop().create_task(
            self._resolve_expired(trade_id, fire_ns),
            name=f"violet_exec_resolve:{trade_id}",
        )
        self._resolving.add(trade_id)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _resolve_expired(self, trade_id: str, fire_ns: int) -> None:
        """Run one resolution, log and count any error, release single-flight."""
        try:
            await self._resolve_expired_inner(trade_id, fire_ns)
        except Exception as exc:  # noqa: BLE001
            self.counters["resolve_errors"] += 1
            self.logger.error("exec_driver: resolution failed for %s: %s",
                              trade_id, exc, exc_info=True)
        finally:
            self._resolving.discard(trade_id)
            self._deadlines.pop(trade_id, None)

    async def _resolve_expired_inner(self, trade_id: str, fire_ns: int) -> None:
        """Resolve one expired working order.

        Steps: (1) drain late events, (2) cancel the quote, (3) re-classify
        after the cancel round-trip, then (4) EXIT → MARKET escalation on the
        same trade_id, or (5) ENTER → router miss policy, gated on slot and
        venue state.
        """
        router = self.ports.router
        wo = router.working(trade_id)
        if wo is None:
            return  # already resolved (fill/cancel notification raced us)

        # 1. Drain late venue events — the quote may already be filled.
        await self.ports.pump_events()
        if router.working(trade_id) is None:
            return

        def _entry_filled() -> bool:
            slot_tid, stage, size = self.ports.slot_view()
            return slot_tid == trade_id and size > 0.0 and stage in SLOT_OPENISH

        def _exit_done() -> bool:
            slot_tid, stage, size = self.ports.slot_view()
            return (slot_tid != trade_id or size <= 0.0
                    or stage in SLOT_CLOSEDISH)

        # 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is
        # harmless). ttl_resolution = fire → cancel-intent handed to the
        # kernel: the gated V2 latency metric.
        # `base` is also the snapshot used for escalation below: if the runtime
        # delivers on_cancel/on_fill from inside the awaits that follow (e.g.
        # the ack of this very cancel), _working_intents no longer holds it.
        base = self._working_intents.get(trade_id)
        cancel_intent = KernelIntent(
            timestamp=datetime.now(timezone.utc),
            intent_id=f"{trade_id}-ttlcxl",
            trade_id=trade_id,
            slot_id=0,
            asset=wo.asset,
            side=(base.side if base is not None else TradeSide.SHORT),
            action=KernelCommandType.CANCEL,
            reference_price=(base.reference_price if base is not None else 0.0),
            target_size=(base.target_size if base is not None else 0.0),
            leverage=(base.leverage if base is not None else 1.0),
            reason="exec_driver:ttl_expired",
        )
        self.ttl_resolution_hist.record(mono_ns() - fire_ns)
        try:
            await self.ports.submit_intent(cancel_intent)
        except Exception as exc:  # noqa: BLE001
            self.logger.warning("exec_driver: ttl-cancel %s failed: %s",
                                trade_id, exc)
        await self.ports.pump_events()

        # 3. Re-classify after the cancel round-trip (fill may have raced it).
        if ((wo.action == "ENTER" and _entry_filled())
                or (wo.action == "EXIT" and _exit_done())):
            router.note_fill(trade_id)
            self._working_intents.pop(trade_id, None)
            self.counters["fills_after_ttl"] += 1
            self.ports.emit(event="fill_after_ttl", trade_id=trade_id)
            return
        router.note_cancel(trade_id)  # no-op if a notification already did it
        popped = self._working_intents.pop(trade_id, None)
        base_intent = popped if popped is not None else base

        # 4. EXIT: never strand a position — MARKET escalation, SAME trade_id.
        if wo.action == "EXIT":
            _tid, plan = router.market_fallback_plan(wo)
            if _exit_done():
                return
            if base_intent is None:
                self.logger.error(
                    "exec_driver: exit TTL %s: no base intent — MARKET "
                    "fallback impossible, position may be stranded", trade_id)
                return
            market_exit = replace(
                base_intent,
                intent_id=f"{trade_id}-mkt",
                order_type="MARKET",
                limit_price=0.0,
                timestamp=datetime.now(timezone.utc),
                metadata={**(base_intent.metadata or {}),
                          "_time_in_force": "GTC",
                          "_exec_reason": plan.reason},
            )
            market_exit = self.ports.exit_intent_fixup(market_exit)
            self.counters["exit_market_fallbacks"] += 1
            self.logger.warning("exec_driver: exit TTL → MARKET fallback %s",
                                trade_id)
            self.ports.emit(event="exit_market_fallback", trade_id=trade_id)
            await self.ports.submit_intent(market_exit)
            await self.ports.pump_events()
            return

        # 5. ENTER miss policy: skip | retry (bounded) | market.
        action = router.entry_miss_action(wo)
        self.ports.emit(event="entry_miss", trade_id=trade_id, action=action,
                        retry_n=wo.retry_n)
        if action == MissAction.SKIP or base_intent is None:
            self.counters["entry_skips"] += 1
            return
        slot_tid, stage, size = self.ports.slot_view()
        if size > 0.0 or stage in SLOT_OPENISH:
            # Slot occupied (raced fill of remainder / another trade) — never
            # double-enter.
            self.counters["entry_skips"] += 1
            self.logger.warning(
                "exec_driver: entry miss %s: slot busy (%s) — skip",
                trade_id, stage)
            return
        # Venue-truth gate: re-quote ONLY when the venue is provably flat AND
        # no own fill landed in the hot window. Ambiguity → skip; a skipped
        # entry is always safe, a doubled position is not.
        if not self._safe_to_requote():
            self.counters["requote_blocked"] += 1
            self.ports.emit(event="requote_blocked", trade_id=trade_id)
            return
        ref = 0.0
        try:
            ref = float(self.ports.reference_price(wo.asset) or 0.0)
        except Exception:  # noqa: BLE001
            ref = 0.0
        if ref <= 0.0:
            ref = float(wo.plan.limit_price or 0.0)
        if action == MissAction.RETRY:
            new_tid, plan = router.retry_plan(wo, reference_price=ref)
            self.counters["entry_retries"] += 1
        else:  # MARKET
            new_tid, plan = router.market_fallback_plan(wo)
            self.counters["entry_markets"] += 1
        new_intent = replace(
            base_intent,
            trade_id=new_tid,
            intent_id=new_tid,
            timestamp=datetime.now(timezone.utc),
            reference_price=(ref if ref > 0.0 else base_intent.reference_price),
            order_type=plan.order_type,
            limit_price=float(plan.limit_price or 0.0),
            metadata={**(base_intent.metadata or {}),
                      "_time_in_force": ("PostOnly" if plan.post_only
                                         else "GTC"),
                      "_exec_reason": plan.reason},
        )
        self.logger.info("exec_driver: entry %s → %s as %s",
                         trade_id, action, new_tid)
        await self.ports.submit_intent(new_intent)
        if plan.is_maker:
            self.on_submit(plan, new_intent)  # deadline chains
        await self.ports.pump_events()

    # ── guards / observability ────────────────────────────────────────────────

    def _safe_to_requote(self) -> bool:
        """Fail SAFE: recent own fill, any live position, or probe error ⇒ no.

        Both probes (last own fill and venue flatness) are guarded, so a
        raising port yields False instead of aborting the resolution.
        """
        try:
            last_fill_ns = int(self.ports.last_own_fill_mono_ns() or 0)
            if mono_ns() - last_fill_ns < self.settings.requote_hot_window_ns:
                return False
            return bool(self.ports.venue_flat())
        except Exception as exc:  # noqa: BLE001
            self.logger.warning(
                "exec_driver: requote probe failed (%s) — fail safe", exc)
            return False

    def snapshot(self) -> Dict[str, Any]:
        """Return a point-in-time view of working orders, deadlines and metrics."""
        return {
            "working": [w.trade_id for w in self.ports.router.working_orders()],
            "pending_deadlines": len(self._deadlines),
            "resolving": len(self._resolving),
            "counters": dict(self.counters),
            "ttl_resolution": self.ttl_resolution_hist.to_dict(),
        }

    async def drain(self, timeout_s: float = 5.0) -> bool:
        """Quiesce: True when no pending deadlines / in-flight resolutions /
        working orders remain within the timeout (polled every 5 ms)."""
        deadline = mono_ns() + int(timeout_s * 1e9)
        while mono_ns() < deadline:
            if (not self._deadlines and not self._resolving
                    and not self.ports.router.working_orders()):
                return True
            await asyncio.sleep(0.005)
        return False