VIOLET V2b: ScriptedVenue + ExecDeadlineDriver (event-driven TTL @sub-second)
exec_driver.py lifts PINK's TTL-resolution logic (_exec_after_submit / _handle_expired_working, live-proven) into a standalone driver with injected ports (router/submit/pump/slot_view/venue_flat/ref-price) and replaces the 1s polling sweep with one DeadlineScheduler deadline per working order. The driver is the TIMING authority (router clamps TTLs >=0.5s — its internal deadline is vestigial here); the router stays the POLICY authority. R1 preserved verbatim: exit TTL -> MARKET escalation on the SAME trade_id; post-only reject -> schedule_in(0) through the one shared resolution path; venue-truth requote gate fails safe. scripted_venue.py subclasses MockVenueAdapter (zero shared edits) with per-trade directives: IMMEDIATE_FILL / REST_THEN_FILL / REST_THEN_EXPIRE / POST_ONLY_REJECT / CANCEL_REJECT / FILL_RACES_CANCEL; deferred fills release through reconcile() (the production pump seam), never the 50ms subscribe() poll. 17 new tests incl. full-kernel CANCEL->venue mapping, fill-races-cancel, bounded retry chains, on_fill deadline cancellation, fail-safe probe. Router 77 green untouched; shared files clean. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
345
prod/clean_arch/violet/exec_driver.py
Normal file
345
prod/clean_arch/violet/exec_driver.py
Normal file
@@ -0,0 +1,345 @@
|
||||
"""VIOLET V2: ExecDeadlineDriver — event-driven TTL resolution at 100 ms.
|
||||
|
||||
Lifts the PINK TTL-resolution logic (pink_direct.py `_exec_after_submit` /
|
||||
`_exec_ttl_loop` / `_handle_expired_working`, proven live) out of the
|
||||
runtime into a standalone driver with injected ports, and replaces the 1 s
|
||||
polling sweep with ONE DeadlineScheduler deadline PER WORKING ORDER —
|
||||
sub-second TTLs with measured jitter instead of polling quantization.
|
||||
|
||||
Timing authority note: the shared ExecutionRouter clamps TTLs to >= 0.5 s
|
||||
(ExecConfig.from_env [0.5, 300] + register_working's max(0.5, ttl_s)
|
||||
floor). This driver therefore NEVER polls router.expired(); it schedules
|
||||
its own deadlines from ExecDriverSettings.ttl_ms_for(plan.ttl_s) and uses
|
||||
the router purely as policy authority (plans, miss policy, retry/fallback,
|
||||
registry pop-semantics). The router's internal WorkingOrder.deadline is
|
||||
vestigial here.
|
||||
|
||||
Concurrency: asyncio single-threaded cooperative. Re-entrancy between a
|
||||
deadline fire and a fill notification is handled by (a) the `_resolving`
|
||||
single-flight set, (b) the router's pop-semantics (`note_fill`/`note_cancel`
|
||||
make double resolution a no-op), and (c) a state re-check after every await
|
||||
(fills race cancels on a live venue — same doctrine as pink_direct).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass, field, replace
|
||||
from datetime import datetime, timezone
|
||||
from typing import Any, Awaitable, Callable, Dict, Optional, Set, Tuple
|
||||
|
||||
from prod.clean_arch.dita_v2.contracts import (
|
||||
KernelCommandType,
|
||||
KernelIntent,
|
||||
TradeSide,
|
||||
)
|
||||
from prod.clean_arch.dita_v2.exec_router import ExecutionRouter, MissAction
|
||||
|
||||
from .clock import Deadline, DeadlineScheduler, LatencyHistogram, mono_ns
|
||||
from .domain import ExecDriverSettings, typed
|
||||
|
||||
LOGGER = logging.getLogger("violet.exec_driver")
|
||||
|
||||
SlotView = Tuple[str, str, float] # (trade_id, fsm_state_name, size)
|
||||
|
||||
# Copied from pink_direct._SLOT_OPENISH (9 string constants — a copy, not a
|
||||
# runtime import: the driver must not couple to PinkDirectRuntime).
|
||||
SLOT_OPENISH = ("PARTIAL_FILL", "POSITION_OPENED", "POSITION_OPEN",
|
||||
"EXIT_REQUESTED", "EXIT_SENT", "EXIT_ACKED", "EXIT_WORKING",
|
||||
"POSITION_PARTIALLY_CLOSED")
|
||||
SLOT_CLOSEDISH = ("POSITION_CLOSED", "CLOSED", "TRADE_TERMINAL_WRITTEN", "IDLE")
|
||||
|
||||
|
||||
@dataclass
|
||||
class ExecDriverPorts:
|
||||
"""Everything the driver needs from its runtime, injected."""
|
||||
|
||||
router: ExecutionRouter
|
||||
submit_intent: Callable[[KernelIntent], Awaitable[Any]]
|
||||
pump_events: Callable[[], Awaitable[int]]
|
||||
slot_view: Callable[[], SlotView]
|
||||
venue_flat: Callable[[], bool] # raises ⇒ treated as NOT flat
|
||||
last_own_fill_mono_ns: Callable[[], int]
|
||||
reference_price: Callable[[str], float] # asset → freshest px, 0.0 unknown
|
||||
exit_intent_fixup: Callable[[KernelIntent], KernelIntent] = lambda i: i
|
||||
emit: Callable[..., None] = lambda **kw: None # telemetry sink
|
||||
|
||||
|
||||
class ExecDeadlineDriver:
|
||||
"""One instance per runtime; single-slot scope (max_slots=1)."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
ports: ExecDriverPorts,
|
||||
scheduler: DeadlineScheduler,
|
||||
*,
|
||||
settings: Optional[ExecDriverSettings] = None,
|
||||
ttl_resolution_hist: Optional[LatencyHistogram] = None,
|
||||
logger: logging.Logger = LOGGER,
|
||||
) -> None:
|
||||
self.ports = ports
|
||||
self.scheduler = scheduler
|
||||
self.settings = settings or ExecDriverSettings()
|
||||
self.ttl_resolution_hist = ttl_resolution_hist or LatencyHistogram(
|
||||
"ttl_resolution")
|
||||
self.logger = logger
|
||||
self._working_intents: Dict[str, KernelIntent] = {}
|
||||
self._deadlines: Dict[str, Deadline] = {}
|
||||
self._resolving: Set[str] = set()
|
||||
self.counters: Dict[str, int] = {
|
||||
"immediate_fills": 0, "working_registered": 0, "deadline_fires": 0,
|
||||
"fills_after_ttl": 0, "exit_market_fallbacks": 0,
|
||||
"entry_skips": 0, "entry_retries": 0, "entry_markets": 0,
|
||||
"requote_blocked": 0, "resolve_errors": 0,
|
||||
}
|
||||
|
||||
# ── submit-side ───────────────────────────────────────────────────────────
|
||||
|
||||
@typed
|
||||
def on_submit(self, plan: Any, intent: KernelIntent) -> None:
|
||||
"""Classify a maker submit: filled now, resting, or rejected.
|
||||
|
||||
Resting and rejected both register and get a deadline — a rejected
|
||||
post-only quote gets schedule_in(0) so the SAME resolution path
|
||||
(cancel → re-classify → miss/fallback) handles it immediately.
|
||||
"""
|
||||
router = self.ports.router
|
||||
tid = intent.trade_id
|
||||
slot_tid, stage, size = self.ports.slot_view()
|
||||
filled = (
|
||||
(plan.action == "ENTER" and slot_tid == tid and size > 0.0
|
||||
and stage in SLOT_OPENISH)
|
||||
or (plan.action == "EXIT" and (
|
||||
slot_tid != tid or stage in SLOT_CLOSEDISH))
|
||||
)
|
||||
if filled:
|
||||
self.counters["immediate_fills"] += 1
|
||||
self.ports.emit(event="immediate_fill", action=plan.action,
|
||||
trade_id=tid, reason=plan.reason)
|
||||
return
|
||||
rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED")
|
||||
router.register_working(
|
||||
trade_id=tid, asset=intent.asset,
|
||||
position_side=(getattr(intent.side, "value", None) or str(intent.side)),
|
||||
plan=plan,
|
||||
base_trade_id=plan.metadata.get("base_trade_id") or tid,
|
||||
retry_n=int(plan.metadata.get("retry_n", 0) or 0),
|
||||
)
|
||||
self._working_intents[tid] = intent
|
||||
self.counters["working_registered"] += 1
|
||||
ttl_ms = 0.0 if rejected else self.settings.ttl_ms_for(plan.ttl_s)
|
||||
self._deadlines[tid] = self.scheduler.schedule_in(
|
||||
ttl_ms, lambda t=tid: self._fire(t), name=f"ttl:{tid}")
|
||||
self.ports.emit(event="working", action=plan.action, trade_id=tid,
|
||||
limit_price=plan.limit_price, ttl_ms=ttl_ms,
|
||||
rejected=rejected, reason=plan.reason)
|
||||
|
||||
# ── fill/cancel notifications (the polling loop never needed these) ──────
|
||||
|
||||
@typed
|
||||
def on_fill(self, trade_id: str) -> None:
|
||||
dl = self._deadlines.pop(trade_id, None)
|
||||
if dl is not None:
|
||||
dl.cancel()
|
||||
self.ports.router.note_fill(trade_id)
|
||||
self._working_intents.pop(trade_id, None)
|
||||
|
||||
@typed
|
||||
def on_cancel(self, trade_id: str) -> None:
|
||||
dl = self._deadlines.pop(trade_id, None)
|
||||
if dl is not None:
|
||||
dl.cancel()
|
||||
self.ports.router.note_cancel(trade_id)
|
||||
self._working_intents.pop(trade_id, None)
|
||||
|
||||
# ── deadline path ─────────────────────────────────────────────────────────
|
||||
|
||||
def _fire(self, trade_id: str) -> None:
|
||||
"""DeadlineScheduler callback: sync + non-blocking by contract."""
|
||||
fire_ns = mono_ns()
|
||||
self.counters["deadline_fires"] += 1
|
||||
if trade_id in self._resolving:
|
||||
return
|
||||
self._resolving.add(trade_id)
|
||||
asyncio.get_running_loop().create_task(
|
||||
self._resolve_expired(trade_id, fire_ns),
|
||||
name=f"violet_exec_resolve:{trade_id}",
|
||||
)
|
||||
|
||||
async def _resolve_expired(self, trade_id: str, fire_ns: int) -> None:
|
||||
try:
|
||||
await self._resolve_expired_inner(trade_id, fire_ns)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self.counters["resolve_errors"] += 1
|
||||
self.logger.error("exec_driver: resolution failed for %s: %s",
|
||||
trade_id, exc, exc_info=True)
|
||||
finally:
|
||||
self._resolving.discard(trade_id)
|
||||
self._deadlines.pop(trade_id, None)
|
||||
|
||||
async def _resolve_expired_inner(self, trade_id: str, fire_ns: int) -> None:
|
||||
router = self.ports.router
|
||||
wo = router.working(trade_id)
|
||||
if wo is None:
|
||||
return # already resolved (fill/cancel notification raced us)
|
||||
|
||||
# 1. Drain late venue events — the quote may already be filled.
|
||||
await self.ports.pump_events()
|
||||
if router.working(trade_id) is None:
|
||||
return
|
||||
|
||||
def _entry_filled() -> bool:
|
||||
slot_tid, stage, size = self.ports.slot_view()
|
||||
return slot_tid == trade_id and size > 0.0 and stage in SLOT_OPENISH
|
||||
|
||||
def _exit_done() -> bool:
|
||||
slot_tid, stage, size = self.ports.slot_view()
|
||||
return (slot_tid != trade_id or size <= 0.0
|
||||
or stage in SLOT_CLOSEDISH)
|
||||
|
||||
# 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is
|
||||
# harmless). ttl_resolution = fire → cancel-intent handed to the
|
||||
# kernel: the gated V2 latency metric.
|
||||
base = self._working_intents.get(trade_id)
|
||||
cancel_intent = KernelIntent(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
intent_id=f"{trade_id}-ttlcxl",
|
||||
trade_id=trade_id,
|
||||
slot_id=0,
|
||||
asset=wo.asset,
|
||||
side=(base.side if base is not None else TradeSide.SHORT),
|
||||
action=KernelCommandType.CANCEL,
|
||||
reference_price=(base.reference_price if base is not None else 0.0),
|
||||
target_size=(base.target_size if base is not None else 0.0),
|
||||
leverage=(base.leverage if base is not None else 1.0),
|
||||
reason="exec_driver:ttl_expired",
|
||||
)
|
||||
self.ttl_resolution_hist.record(mono_ns() - fire_ns)
|
||||
try:
|
||||
await self.ports.submit_intent(cancel_intent)
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self.logger.warning("exec_driver: ttl-cancel %s failed: %s",
|
||||
trade_id, exc)
|
||||
await self.ports.pump_events()
|
||||
|
||||
# 3. Re-classify after the cancel round-trip (fill may have raced it).
|
||||
if ((wo.action == "ENTER" and _entry_filled())
|
||||
or (wo.action == "EXIT" and _exit_done())):
|
||||
router.note_fill(trade_id)
|
||||
self._working_intents.pop(trade_id, None)
|
||||
self.counters["fills_after_ttl"] += 1
|
||||
self.ports.emit(event="fill_after_ttl", trade_id=trade_id)
|
||||
return
|
||||
|
||||
router.note_cancel(trade_id)
|
||||
base_intent = self._working_intents.pop(trade_id, None)
|
||||
|
||||
# 4. EXIT: never strand a position — MARKET escalation, SAME trade_id.
|
||||
if wo.action == "EXIT":
|
||||
_tid, plan = router.market_fallback_plan(wo)
|
||||
if base_intent is not None and not _exit_done():
|
||||
market_exit = replace(
|
||||
base_intent,
|
||||
intent_id=f"{trade_id}-mkt",
|
||||
order_type="MARKET", limit_price=0.0,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
metadata={**(base_intent.metadata or {}),
|
||||
"_time_in_force": "GTC",
|
||||
"_exec_reason": plan.reason},
|
||||
)
|
||||
market_exit = self.ports.exit_intent_fixup(market_exit)
|
||||
self.counters["exit_market_fallbacks"] += 1
|
||||
self.logger.warning("exec_driver: exit TTL → MARKET fallback %s",
|
||||
trade_id)
|
||||
self.ports.emit(event="exit_market_fallback", trade_id=trade_id)
|
||||
await self.ports.submit_intent(market_exit)
|
||||
await self.ports.pump_events()
|
||||
return
|
||||
|
||||
# 5. ENTER miss policy: skip | retry (bounded) | market.
|
||||
action = router.entry_miss_action(wo)
|
||||
self.ports.emit(event="entry_miss", trade_id=trade_id, action=action,
|
||||
retry_n=wo.retry_n)
|
||||
if action == MissAction.SKIP or base_intent is None:
|
||||
self.counters["entry_skips"] += 1
|
||||
return
|
||||
slot_tid, stage, size = self.ports.slot_view()
|
||||
if size > 0.0 or stage in SLOT_OPENISH:
|
||||
# Slot occupied (raced fill of remainder / another trade) — never
|
||||
# double-enter.
|
||||
self.counters["entry_skips"] += 1
|
||||
self.logger.warning("exec_driver: entry miss %s: slot busy (%s) — skip",
|
||||
trade_id, stage)
|
||||
return
|
||||
# Venue-truth gate: re-quote ONLY when the venue is provably flat AND
|
||||
# no own fill landed in the hot window. Ambiguity → skip; a skipped
|
||||
# entry is always safe, a doubled position is not.
|
||||
if not self._safe_to_requote():
|
||||
self.counters["requote_blocked"] += 1
|
||||
self.ports.emit(event="requote_blocked", trade_id=trade_id)
|
||||
return
|
||||
ref = 0.0
|
||||
try:
|
||||
ref = float(self.ports.reference_price(wo.asset) or 0.0)
|
||||
except Exception: # noqa: BLE001
|
||||
ref = 0.0
|
||||
if ref <= 0.0:
|
||||
ref = float(wo.plan.limit_price or 0.0)
|
||||
if action == MissAction.RETRY:
|
||||
new_tid, plan = router.retry_plan(wo, reference_price=ref)
|
||||
self.counters["entry_retries"] += 1
|
||||
else: # MARKET
|
||||
new_tid, plan = router.market_fallback_plan(wo)
|
||||
self.counters["entry_markets"] += 1
|
||||
new_intent = replace(
|
||||
base_intent,
|
||||
trade_id=new_tid, intent_id=new_tid,
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
reference_price=(ref if ref > 0.0 else base_intent.reference_price),
|
||||
order_type=plan.order_type,
|
||||
limit_price=float(plan.limit_price or 0.0),
|
||||
metadata={**(base_intent.metadata or {}),
|
||||
"_time_in_force": ("PostOnly" if plan.post_only else "GTC"),
|
||||
"_exec_reason": plan.reason},
|
||||
)
|
||||
self.logger.info("exec_driver: entry %s → %s as %s",
|
||||
trade_id, action, new_tid)
|
||||
await self.ports.submit_intent(new_intent)
|
||||
if plan.is_maker:
|
||||
self.on_submit(plan, new_intent) # deadline chains
|
||||
await self.ports.pump_events()
|
||||
|
||||
# ── guards / observability ────────────────────────────────────────────────
|
||||
|
||||
def _safe_to_requote(self) -> bool:
|
||||
"""Fail SAFE: recent own fill, any live position, or probe error ⇒ no."""
|
||||
if (mono_ns() - int(self.ports.last_own_fill_mono_ns() or 0)
|
||||
< self.settings.requote_hot_window_ns):
|
||||
return False
|
||||
try:
|
||||
return bool(self.ports.venue_flat())
|
||||
except Exception as exc: # noqa: BLE001
|
||||
self.logger.warning("exec_driver: requote probe failed (%s) — fail safe",
|
||||
exc)
|
||||
return False
|
||||
|
||||
def snapshot(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"working": [w.trade_id for w in self.ports.router.working_orders()],
|
||||
"pending_deadlines": len(self._deadlines),
|
||||
"resolving": len(self._resolving),
|
||||
"counters": dict(self.counters),
|
||||
"ttl_resolution": self.ttl_resolution_hist.to_dict(),
|
||||
}
|
||||
|
||||
async def drain(self, timeout_s: float = 5.0) -> bool:
|
||||
"""Quiesce: True when no pending deadlines / in-flight resolutions /
|
||||
working orders remain within the timeout."""
|
||||
deadline = mono_ns() + int(timeout_s * 1e9)
|
||||
while mono_ns() < deadline:
|
||||
if (not self._deadlines and not self._resolving
|
||||
and not self.ports.router.working_orders()):
|
||||
return True
|
||||
await asyncio.sleep(0.005)
|
||||
return False
|
||||
Reference in New Issue
Block a user