exec_driver.py lifts PINK's TTL-resolution logic (_exec_after_submit / _handle_expired_working, live-proven) into a standalone driver with injected ports (router/submit/pump/slot_view/venue_flat/ref-price) and replaces the 1s polling sweep with one DeadlineScheduler deadline per working order. The driver is the TIMING authority (router clamps TTLs >=0.5s — its internal deadline is vestigial here); the router stays the POLICY authority. R1 preserved verbatim: exit TTL -> MARKET escalation on the SAME trade_id; post-only reject -> schedule_in(0) through the one shared resolution path; venue-truth requote gate fails safe. scripted_venue.py subclasses MockVenueAdapter (zero shared edits) with per-trade directives: IMMEDIATE_FILL / REST_THEN_FILL / REST_THEN_EXPIRE / POST_ONLY_REJECT / CANCEL_REJECT / FILL_RACES_CANCEL; deferred fills release through reconcile() (the production pump seam), never the 50ms subscribe() poll. 17 new tests incl. full-kernel CANCEL->venue mapping, fill-races-cancel, bounded retry chains, on_fill deadline cancellation, fail-safe probe. Router 77 green untouched; shared files clean. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
346 lines
15 KiB
Python
346 lines
15 KiB
Python
"""VIOLET V2: ExecDeadlineDriver — event-driven TTL resolution at 100 ms.

Lifts the PINK TTL-resolution logic (pink_direct.py `_exec_after_submit` /
`_exec_ttl_loop` / `_handle_expired_working`, proven live) out of the
runtime into a standalone driver with injected ports, and replaces the 1 s
polling sweep with ONE DeadlineScheduler deadline PER WORKING ORDER —
sub-second TTLs with measured jitter instead of polling quantization.

Timing authority note: the shared ExecutionRouter clamps TTLs to >= 0.5 s
(ExecConfig.from_env [0.5, 300] + register_working's max(0.5, ttl_s)
floor). This driver therefore NEVER polls router.expired(); it schedules
its own deadlines from ExecDriverSettings.ttl_ms_for(plan.ttl_s) and uses
the router purely as policy authority (plans, miss policy, retry/fallback,
registry pop-semantics). The router's internal WorkingOrder.deadline is
vestigial here.

Concurrency: asyncio single-threaded cooperative. Re-entrancy between a
deadline fire and a fill notification is handled by (a) the `_resolving`
single-flight set, (b) the router's pop-semantics (`note_fill`/`note_cancel`
make double resolution a no-op), and (c) a state re-check after every await
(fills race cancels on a live venue — same doctrine as pink_direct).
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
from dataclasses import dataclass, field, replace
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple
|
|
|
|
from prod.clean_arch.dita_v2.contracts import (
|
|
KernelCommandType,
|
|
KernelIntent,
|
|
TradeSide,
|
|
)
|
|
from prod.clean_arch.dita_v2.exec_router import ExecutionRouter, MissAction
|
|
|
|
from .clock import Deadline, DeadlineScheduler, LatencyHistogram, mono_ns
|
|
from .domain import ExecDriverSettings, typed
|
|
|
|
# Module-level logger; also the default logger handed to the driver.
LOGGER = logging.getLogger("violet.exec_driver")

# Shape of the slot snapshot returned by the slot_view port:
# (trade_id, fsm_state_name, size).
SlotView = Tuple[str, str, float]

# FSM state names that count as "a position is (still) open on the slot".
# Copied from pink_direct._SLOT_OPENISH rather than imported at runtime, so
# the driver does not couple to PinkDirectRuntime.
# NOTE(review): the previous comment described this as "9 string constants"
# but the tuple holds 8 entries — confirm against pink_direct that nothing
# was dropped in the copy.
SLOT_OPENISH = (
    "PARTIAL_FILL",
    "POSITION_OPENED",
    "POSITION_OPEN",
    "EXIT_REQUESTED",
    "EXIT_SENT",
    "EXIT_ACKED",
    "EXIT_WORKING",
    "POSITION_PARTIALLY_CLOSED",
)

# FSM state names that count as "the slot no longer holds a position".
SLOT_CLOSEDISH = (
    "POSITION_CLOSED",
    "CLOSED",
    "TRADE_TERMINAL_WRITTEN",
    "IDLE",
)
|
|
|
|
|
|
@dataclass
class ExecDriverPorts:
    """Bundle of runtime seams injected into the exec driver.

    The driver performs no I/O of its own; every interaction with the
    router, the kernel, the venue and telemetry goes through one of these
    callables. The last two fields are optional and default to no-ops.
    """

    # Shared execution router (plans, miss policy, working-order registry).
    router: ExecutionRouter
    # Async hand-off of a KernelIntent to the runtime.
    submit_intent: Callable[[KernelIntent], Awaitable[Any]]
    # Async drain of pending venue events; resolves to an int
    # (presumably the number of events processed — confirm with the runtime).
    pump_events: Callable[[], Awaitable[int]]
    # Current slot snapshot: (trade_id, fsm_state_name, size).
    slot_view: Callable[[], SlotView]
    # Venue-truth flatness probe; raising is treated as NOT flat.
    venue_flat: Callable[[], bool]
    # Monotonic-ns timestamp of the most recent own fill.
    last_own_fill_mono_ns: Callable[[], int]
    # asset -> freshest price; 0.0 means unknown.
    reference_price: Callable[[str], float]
    # Optional last-moment rewrite of an exit intent; identity by default.
    exit_intent_fixup: Callable[[KernelIntent], KernelIntent] = field(
        default=lambda i: i)
    # Telemetry sink taking keyword arguments; discards them by default.
    emit: Callable[..., None] = field(default=lambda **kw: None)
|
|
|
|
|
|
class ExecDeadlineDriver:
    """Event-driven TTL resolution for working maker orders.

    One instance per runtime; single-slot scope (max_slots=1).

    Each working order registered through ``on_submit`` gets exactly one
    scheduler deadline. When the deadline fires, the order is cancelled,
    the slot is re-classified, and the router's miss/fallback policy decides
    what happens next. ``on_fill`` / ``on_cancel`` retire an order early.

    State kept by the driver:
      * ``_working_intents`` — the intent submitted for each working trade_id
        (template for cancel / retry / market-fallback intents).
      * ``_deadlines``       — the pending scheduler deadline per trade_id.
      * ``_resolving``       — trade_ids with a resolution task in flight
        (single-flight guard).
      * ``_tasks``           — strong references to in-flight resolution
        tasks so they cannot be garbage-collected before completing.
      * ``counters``         — plain integer telemetry counters.
    """

    def __init__(
        self,
        ports: ExecDriverPorts,
        scheduler: DeadlineScheduler,
        *,
        settings: Optional[ExecDriverSettings] = None,
        ttl_resolution_hist: Optional[LatencyHistogram] = None,
        logger: logging.Logger = LOGGER,
    ) -> None:
        """Wire the driver to its ports and scheduler.

        Args:
            ports: injected runtime seams (router, submit, pump, probes).
            scheduler: deadline scheduler used for per-order TTL deadlines.
            settings: driver settings; a default ``ExecDriverSettings()`` is
                created when omitted.
            ttl_resolution_hist: histogram receiving fire→cancel latencies; a
                fresh ``LatencyHistogram("ttl_resolution")`` when omitted.
            logger: logger for warnings/errors.
        """
        self.ports = ports
        self.scheduler = scheduler
        # Explicit None checks: `x or default` would silently swap out an
        # injected object that merely evaluates falsy (e.g. an empty
        # container-like histogram).
        self.settings = (
            settings if settings is not None else ExecDriverSettings())
        self.ttl_resolution_hist = (
            ttl_resolution_hist if ttl_resolution_hist is not None
            else LatencyHistogram("ttl_resolution"))
        self.logger = logger
        self._working_intents: Dict[str, KernelIntent] = {}
        self._deadlines: Dict[str, Deadline] = {}
        self._resolving: Set[str] = set()
        # The event loop only keeps weak references to tasks; hold them here
        # until they finish.
        self._tasks: Set[asyncio.Task] = set()
        self.counters: Dict[str, int] = {
            "immediate_fills": 0, "working_registered": 0, "deadline_fires": 0,
            "fills_after_ttl": 0, "exit_market_fallbacks": 0,
            "entry_skips": 0, "entry_retries": 0, "entry_markets": 0,
            "requote_blocked": 0, "resolve_errors": 0,
        }

    # ── submit-side ───────────────────────────────────────────────────────────

    @typed
    def on_submit(self, plan: Any, intent: KernelIntent) -> None:
        """Classify a maker submit: filled now, resting, or rejected.

        Resting and rejected both register and get a deadline — a rejected
        post-only quote gets schedule_in(0) so the SAME resolution path
        (cancel → re-classify → miss/fallback) handles it immediately.

        Args:
            plan: the router plan the intent was built from (reads
                ``action``, ``reason``, ``ttl_s``, ``limit_price``,
                ``metadata``).
            intent: the intent that was just submitted.
        """
        router = self.ports.router
        tid = intent.trade_id
        slot_tid, stage, size = self.ports.slot_view()
        filled = (
            (plan.action == "ENTER" and slot_tid == tid and size > 0.0
             and stage in SLOT_OPENISH)
            or (plan.action == "EXIT" and (
                slot_tid != tid or stage in SLOT_CLOSEDISH))
        )
        if filled:
            self.counters["immediate_fills"] += 1
            self.ports.emit(event="immediate_fill", action=plan.action,
                            trade_id=tid, reason=plan.reason)
            return
        rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED")
        router.register_working(
            trade_id=tid, asset=intent.asset,
            position_side=(getattr(intent.side, "value", None) or str(intent.side)),
            plan=plan,
            base_trade_id=plan.metadata.get("base_trade_id") or tid,
            retry_n=int(plan.metadata.get("retry_n", 0) or 0),
        )
        self._working_intents[tid] = intent
        self.counters["working_registered"] += 1
        ttl_ms = 0.0 if rejected else self.settings.ttl_ms_for(plan.ttl_s)
        # A re-submit on the same trade_id must not leave the superseded
        # deadline armed: it would fire early against the fresh order.
        stale = self._deadlines.pop(tid, None)
        if stale is not None:
            stale.cancel()
        self._deadlines[tid] = self.scheduler.schedule_in(
            ttl_ms, lambda t=tid: self._fire(t), name=f"ttl:{tid}")
        self.ports.emit(event="working", action=plan.action, trade_id=tid,
                        limit_price=plan.limit_price, ttl_ms=ttl_ms,
                        rejected=rejected, reason=plan.reason)

    # ── fill/cancel notifications (the polling loop never needed these) ──────

    @typed
    def on_fill(self, trade_id: str) -> None:
        """Retire a working order that filled: cancel its deadline, tell the
        router, and drop the stored intent."""
        dl = self._deadlines.pop(trade_id, None)
        if dl is not None:
            dl.cancel()
        self.ports.router.note_fill(trade_id)
        self._working_intents.pop(trade_id, None)

    @typed
    def on_cancel(self, trade_id: str) -> None:
        """Retire a working order that was cancelled: cancel its deadline,
        tell the router, and drop the stored intent."""
        dl = self._deadlines.pop(trade_id, None)
        if dl is not None:
            dl.cancel()
        self.ports.router.note_cancel(trade_id)
        self._working_intents.pop(trade_id, None)

    # ── deadline path ─────────────────────────────────────────────────────────

    def _fire(self, trade_id: str) -> None:
        """DeadlineScheduler callback: sync + non-blocking by contract.

        Spawns the async resolution as a task; a second fire for a trade_id
        that is already being resolved is counted but otherwise ignored.

        Raises:
            RuntimeError: if called with no running event loop.
        """
        fire_ns = mono_ns()
        self.counters["deadline_fires"] += 1
        if trade_id in self._resolving:
            return
        # Fetch the loop BEFORE marking in-flight so a missing loop cannot
        # leave the trade_id stuck in _resolving.
        loop = asyncio.get_running_loop()
        self._resolving.add(trade_id)
        task = loop.create_task(
            self._resolve_expired(trade_id, fire_ns),
            name=f"violet_exec_resolve:{trade_id}",
        )
        # Keep a strong reference until the task completes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _resolve_expired(self, trade_id: str, fire_ns: int) -> None:
        """Run one resolution, containing any error, and always release the
        single-flight guard and the fired deadline entry."""
        try:
            await self._resolve_expired_inner(trade_id, fire_ns)
        except Exception as exc:  # noqa: BLE001
            self.counters["resolve_errors"] += 1
            self.logger.error("exec_driver: resolution failed for %s: %s",
                              trade_id, exc, exc_info=True)
        finally:
            self._resolving.discard(trade_id)
            self._deadlines.pop(trade_id, None)

    async def _resolve_expired_inner(self, trade_id: str, fire_ns: int) -> None:
        """Resolve an expired working order.

        Steps: drain venue events → cancel the quote → re-classify the slot
        → EXIT escalates to MARKET on the same trade_id; ENTER follows the
        router's miss policy (skip / bounded retry / market) behind the
        venue-truth requote gate.

        Args:
            trade_id: the working order whose deadline fired.
            fire_ns: monotonic-ns timestamp taken when the deadline fired.
        """
        router = self.ports.router
        wo = router.working(trade_id)
        if wo is None:
            return  # already resolved (fill/cancel notification raced us)

        # 1. Drain late venue events — the quote may already be filled.
        await self.ports.pump_events()
        if router.working(trade_id) is None:
            return

        def _entry_filled() -> bool:
            slot_tid, stage, size = self.ports.slot_view()
            return slot_tid == trade_id and size > 0.0 and stage in SLOT_OPENISH

        def _exit_done() -> bool:
            slot_tid, stage, size = self.ports.slot_view()
            return (slot_tid != trade_id or size <= 0.0
                    or stage in SLOT_CLOSEDISH)

        # 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is
        #    harmless). ttl_resolution = fire → cancel-intent handed to the
        #    kernel: the gated V2 latency metric.
        base = self._working_intents.get(trade_id)
        cancel_intent = KernelIntent(
            timestamp=datetime.now(timezone.utc),
            intent_id=f"{trade_id}-ttlcxl",
            trade_id=trade_id,
            slot_id=0,
            asset=wo.asset,
            side=(base.side if base is not None else TradeSide.SHORT),
            action=KernelCommandType.CANCEL,
            reference_price=(base.reference_price if base is not None else 0.0),
            target_size=(base.target_size if base is not None else 0.0),
            leverage=(base.leverage if base is not None else 1.0),
            reason="exec_driver:ttl_expired",
        )
        self.ttl_resolution_hist.record(mono_ns() - fire_ns)
        try:
            await self.ports.submit_intent(cancel_intent)
        except Exception as exc:  # noqa: BLE001
            # Best-effort: a failed cancel must not stop re-classification.
            self.logger.warning("exec_driver: ttl-cancel %s failed: %s",
                                trade_id, exc)
        await self.ports.pump_events()

        # 3. Re-classify after the cancel round-trip (fill may have raced it).
        if ((wo.action == "ENTER" and _entry_filled())
                or (wo.action == "EXIT" and _exit_done())):
            router.note_fill(trade_id)
            self._working_intents.pop(trade_id, None)
            self.counters["fills_after_ttl"] += 1
            self.ports.emit(event="fill_after_ttl", trade_id=trade_id)
            return

        router.note_cancel(trade_id)
        base_intent = self._working_intents.pop(trade_id, None)

        # 4. EXIT: never strand a position — MARKET escalation, SAME trade_id.
        if wo.action == "EXIT":
            _tid, plan = router.market_fallback_plan(wo)
            if base_intent is None:
                # Without the original intent there is no template to build
                # the MARKET exit from; surface it loudly instead of
                # returning silently with the exit unresolved.
                self.logger.error(
                    "exec_driver: exit TTL %s has no stored intent — "
                    "cannot escalate to MARKET", trade_id)
                return
            if not _exit_done():
                market_exit = replace(
                    base_intent,
                    intent_id=f"{trade_id}-mkt",
                    order_type="MARKET", limit_price=0.0,
                    timestamp=datetime.now(timezone.utc),
                    metadata={**(base_intent.metadata or {}),
                              "_time_in_force": "GTC",
                              "_exec_reason": plan.reason},
                )
                market_exit = self.ports.exit_intent_fixup(market_exit)
                self.counters["exit_market_fallbacks"] += 1
                self.logger.warning("exec_driver: exit TTL → MARKET fallback %s",
                                    trade_id)
                self.ports.emit(event="exit_market_fallback", trade_id=trade_id)
                await self.ports.submit_intent(market_exit)
                await self.ports.pump_events()
            return

        # 5. ENTER miss policy: skip | retry (bounded) | market.
        action = router.entry_miss_action(wo)
        self.ports.emit(event="entry_miss", trade_id=trade_id, action=action,
                        retry_n=wo.retry_n)
        if action == MissAction.SKIP or base_intent is None:
            self.counters["entry_skips"] += 1
            return
        _slot_tid, stage, size = self.ports.slot_view()
        if size > 0.0 or stage in SLOT_OPENISH:
            # Slot occupied (raced fill of remainder / another trade) — never
            # double-enter.
            self.counters["entry_skips"] += 1
            self.logger.warning("exec_driver: entry miss %s: slot busy (%s) — skip",
                                trade_id, stage)
            return
        # Venue-truth gate: re-quote ONLY when the venue is provably flat AND
        # no own fill landed in the hot window. Ambiguity → skip; a skipped
        # entry is always safe, a doubled position is not.
        if not self._safe_to_requote():
            self.counters["requote_blocked"] += 1
            self.ports.emit(event="requote_blocked", trade_id=trade_id)
            return
        ref = 0.0
        try:
            ref = float(self.ports.reference_price(wo.asset) or 0.0)
        except Exception:  # noqa: BLE001
            ref = 0.0
        if ref <= 0.0:
            # No fresh reference: fall back to the expired plan's limit price.
            ref = float(wo.plan.limit_price or 0.0)
        if action == MissAction.RETRY:
            new_tid, plan = router.retry_plan(wo, reference_price=ref)
            self.counters["entry_retries"] += 1
        else:  # MARKET
            new_tid, plan = router.market_fallback_plan(wo)
            self.counters["entry_markets"] += 1
        new_intent = replace(
            base_intent,
            trade_id=new_tid, intent_id=new_tid,
            timestamp=datetime.now(timezone.utc),
            reference_price=(ref if ref > 0.0 else base_intent.reference_price),
            order_type=plan.order_type,
            limit_price=float(plan.limit_price or 0.0),
            metadata={**(base_intent.metadata or {}),
                      "_time_in_force": ("PostOnly" if plan.post_only else "GTC"),
                      "_exec_reason": plan.reason},
        )
        self.logger.info("exec_driver: entry %s → %s as %s",
                         trade_id, action, new_tid)
        await self.ports.submit_intent(new_intent)
        if plan.is_maker:
            self.on_submit(plan, new_intent)  # deadline chains
        await self.ports.pump_events()

    # ── guards / observability ────────────────────────────────────────────────

    def _safe_to_requote(self) -> bool:
        """Fail SAFE: recent own fill, any live position, or probe error ⇒ no.

        Both probes (last own fill and venue flatness) run inside the guard,
        so a raising probe yields False instead of escaping to the caller.
        """
        try:
            last_fill_ns = int(self.ports.last_own_fill_mono_ns() or 0)
            if mono_ns() - last_fill_ns < self.settings.requote_hot_window_ns:
                return False
            return bool(self.ports.venue_flat())
        except Exception as exc:  # noqa: BLE001
            self.logger.warning("exec_driver: requote probe failed (%s) — fail safe",
                                exc)
            return False

    def snapshot(self) -> Dict[str, Any]:
        """Return a point-in-time view of driver state for observability:
        working trade_ids, pending-deadline and in-flight counts, a copy of
        the counters, and the TTL-resolution histogram as a dict."""
        return {
            "working": [w.trade_id for w in self.ports.router.working_orders()],
            "pending_deadlines": len(self._deadlines),
            "resolving": len(self._resolving),
            "counters": dict(self.counters),
            "ttl_resolution": self.ttl_resolution_hist.to_dict(),
        }

    async def drain(self, timeout_s: float = 5.0) -> bool:
        """Quiesce: True when no pending deadlines / in-flight resolutions /
        working orders remain within the timeout.

        The idle condition is always evaluated at least once, so an already
        idle driver reports True even with ``timeout_s <= 0``, and state
        reached during the final sleep is not missed.
        """
        deadline = mono_ns() + int(timeout_s * 1e9)
        while True:
            if (not self._deadlines and not self._resolving
                    and not self.ports.router.working_orders()):
                return True
            if mono_ns() >= deadline:
                return False
            await asyncio.sleep(0.005)
|