PINK: Rust kernel atomic K/E account layer (AccountState + FFI)
AccountState in KernelCore/KernelSnapshot: - K-values: seed_capital, k_realized_pnl, k_fees_paid, k_funding_net - E-facts: e_wallet_balance, e_available_margin, e_used_margin, e_maint_margin - Cached: k_capital, available_capital (E rules when present; K fallback) - Reconcile: OK/WARN(<20)/ERROR(>=20 delta) runs atomically on every event New FFI: dita_kernel_set_seed_capital(handle, seed: f64) -> i32 dita_kernel_on_account_event_json(handle, payload) -> *char Kinds: FILL_SETTLED | ACCOUNT_UPDATE | FUNDING_FEE rust_backend.py: wires set_seed_capital() and on_account_event(); snapshot()[account] exposes both legacy and V2 fields. Smoke-tested: fill->E_update->funding->re-sync all produce correct K/E values and reconcile transitions (OK->OK->WARN->OK). 89/89 offline tests pass.
This commit is contained in:
808
prod/clean_arch/dita_v2/rust_backend.py
Normal file
808
prod/clean_arch/dita_v2/rust_backend.py
Normal file
@@ -0,0 +1,808 @@
|
||||
"""Rust-backed DITAv2 execution kernel.
|
||||
|
||||
This module keeps the Python API shape stable while moving the kernel state
|
||||
machine into a Rust shared library. Slot views write through to the backend on
|
||||
assignment, then the Python side mirrors the resulting state into Zinc and the
|
||||
existing projections/journals.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||||
import ctypes
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
from .account import AccountProjection
|
||||
from .control import ControlPlane, ControlUpdate, KernelControlSnapshot, KernelVerbosity, build_control_plane
|
||||
from .contracts import (
|
||||
KernelCommandType,
|
||||
KernelDiagnosticCode,
|
||||
KernelEventKind,
|
||||
KernelIntent,
|
||||
KernelOutcome,
|
||||
KernelSeverity,
|
||||
KernelTransition,
|
||||
TradeSide,
|
||||
TradeSlot,
|
||||
TradeStage,
|
||||
VenueEvent,
|
||||
VenueOrder,
|
||||
VenueOrderStatus,
|
||||
VenueEventStatus,
|
||||
)
|
||||
from .journal import KernelJournal, MemoryKernelJournal
|
||||
from .mock_venue import MockVenueAdapter
|
||||
from .projection import HazelcastProjection
|
||||
from .projection import build_projection
|
||||
from .utils import json_safe
|
||||
from .venue import VenueAdapter
|
||||
from .zinc_plane import InMemoryZincPlane, ZincPlane
|
||||
|
||||
|
||||
def _repo_root() -> Path:
|
||||
return Path(__file__).resolve().parents[3]
|
||||
|
||||
|
||||
def _crate_dir() -> Path:
|
||||
return Path(__file__).resolve().with_name("_rust_kernel")
|
||||
|
||||
|
||||
def _library_path() -> Path:
|
||||
if sys.platform == "darwin":
|
||||
name = "libdita_v2_kernel.dylib"
|
||||
elif os.name == "nt":
|
||||
name = "dita_v2_kernel.dll"
|
||||
else:
|
||||
name = "libdita_v2_kernel.so"
|
||||
return _crate_dir() / "target" / "release" / name
|
||||
|
||||
|
||||
def _build_library() -> None:
|
||||
crate_dir = _crate_dir()
|
||||
if not crate_dir.exists():
|
||||
raise FileNotFoundError(f"Missing Rust kernel crate: {crate_dir}")
|
||||
subprocess.run(
|
||||
["cargo", "build", "--release", "--manifest-path", str(crate_dir / "Cargo.toml")],
|
||||
cwd=_repo_root(),
|
||||
check=True,
|
||||
)
|
||||
|
||||
|
||||
def _ensure_library() -> Path:
|
||||
path = _library_path()
|
||||
if not path.exists():
|
||||
_build_library()
|
||||
return path
|
||||
|
||||
|
||||
class _RustKernelLib:
|
||||
def __init__(self) -> None:
|
||||
path = _ensure_library()
|
||||
self.lib = ctypes.CDLL(str(path))
|
||||
self.lib.dita_kernel_create.argtypes = [ctypes.c_size_t]
|
||||
self.lib.dita_kernel_create.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_destroy.argtypes = [ctypes.c_void_p]
|
||||
self.lib.dita_kernel_destroy.restype = None
|
||||
self.lib.dita_kernel_free_string.argtypes = [ctypes.c_void_p]
|
||||
self.lib.dita_kernel_free_string.restype = None
|
||||
self.lib.dita_kernel_get_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
|
||||
self.lib.dita_kernel_get_slot_json.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_set_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_char_p]
|
||||
self.lib.dita_kernel_set_slot_json.restype = ctypes.c_int
|
||||
self.lib.dita_kernel_process_intent_json.argtypes = [
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
]
|
||||
self.lib.dita_kernel_process_intent_json.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_on_venue_event_json.argtypes = [
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
]
|
||||
self.lib.dita_kernel_on_venue_event_json.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_reconcile_slots_json.argtypes = [
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
ctypes.c_char_p,
|
||||
]
|
||||
self.lib.dita_kernel_reconcile_slots_json.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_snapshot_json.argtypes = [ctypes.c_void_p]
|
||||
self.lib.dita_kernel_snapshot_json.restype = ctypes.c_void_p
|
||||
self.lib.dita_kernel_set_seed_capital.argtypes = [ctypes.c_void_p, ctypes.c_double]
|
||||
self.lib.dita_kernel_set_seed_capital.restype = ctypes.c_int
|
||||
self.lib.dita_kernel_on_account_event_json.argtypes = [
|
||||
ctypes.c_void_p,
|
||||
ctypes.c_char_p,
|
||||
]
|
||||
self.lib.dita_kernel_on_account_event_json.restype = ctypes.c_void_p
|
||||
|
||||
def create(self, max_slots: int) -> ctypes.c_void_p:
|
||||
handle = self.lib.dita_kernel_create(ctypes.c_size_t(max_slots))
|
||||
if not handle:
|
||||
raise RuntimeError("dita_kernel_create failed")
|
||||
return ctypes.c_void_p(handle)
|
||||
|
||||
def destroy(self, handle: ctypes.c_void_p) -> None:
|
||||
if handle and handle.value:
|
||||
self.lib.dita_kernel_destroy(handle)
|
||||
|
||||
def _take_string(self, raw: ctypes.c_void_p) -> str:
|
||||
if not raw:
|
||||
raise RuntimeError("Rust kernel returned null string")
|
||||
text = ctypes.cast(raw, ctypes.c_char_p).value
|
||||
if text is None:
|
||||
self.lib.dita_kernel_free_string(raw)
|
||||
raise RuntimeError("Rust kernel returned empty string")
|
||||
try:
|
||||
return text.decode("utf-8")
|
||||
finally:
|
||||
self.lib.dita_kernel_free_string(raw)
|
||||
|
||||
def get_slot_json(self, handle: ctypes.c_void_p, slot_id: int) -> Dict[str, Any]:
|
||||
raw = self.lib.dita_kernel_get_slot_json(handle, ctypes.c_size_t(slot_id))
|
||||
if not raw:
|
||||
raise IndexError(f"Invalid slot id: {slot_id}")
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
def set_slot_json(self, handle: ctypes.c_void_p, slot_id: int, payload: Dict[str, Any]) -> None:
|
||||
encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
rc = self.lib.dita_kernel_set_slot_json(handle, ctypes.c_size_t(slot_id), ctypes.c_char_p(encoded))
|
||||
if rc != 0:
|
||||
raise RuntimeError(f"dita_kernel_set_slot_json failed rc={rc}")
|
||||
|
||||
def process_intent(
|
||||
self,
|
||||
handle: ctypes.c_void_p,
|
||||
payload: Dict[str, Any],
|
||||
*,
|
||||
mode: str,
|
||||
verbosity: str,
|
||||
) -> Dict[str, Any]:
|
||||
encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
raw = self.lib.dita_kernel_process_intent_json(
|
||||
handle,
|
||||
ctypes.c_char_p(encoded),
|
||||
ctypes.c_char_p(mode.encode("utf-8")),
|
||||
ctypes.c_char_p(verbosity.encode("utf-8")),
|
||||
)
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
def on_venue_event(
|
||||
self,
|
||||
handle: ctypes.c_void_p,
|
||||
payload: Dict[str, Any],
|
||||
*,
|
||||
mode: str,
|
||||
verbosity: str,
|
||||
) -> Dict[str, Any]:
|
||||
encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
raw = self.lib.dita_kernel_on_venue_event_json(
|
||||
handle,
|
||||
ctypes.c_char_p(encoded),
|
||||
ctypes.c_char_p(mode.encode("utf-8")),
|
||||
ctypes.c_char_p(verbosity.encode("utf-8")),
|
||||
)
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
def reconcile_slots(
|
||||
self,
|
||||
handle: ctypes.c_void_p,
|
||||
payload: Sequence[Dict[str, Any]],
|
||||
*,
|
||||
mode: str,
|
||||
verbosity: str,
|
||||
) -> Dict[str, Any]:
|
||||
encoded = json.dumps(json_safe(list(payload)), separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
raw = self.lib.dita_kernel_reconcile_slots_json(
|
||||
handle,
|
||||
ctypes.c_char_p(encoded),
|
||||
ctypes.c_char_p(mode.encode("utf-8")),
|
||||
ctypes.c_char_p(verbosity.encode("utf-8")),
|
||||
)
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
def snapshot(self, handle: ctypes.c_void_p) -> Dict[str, Any]:
|
||||
raw = self.lib.dita_kernel_snapshot_json(handle)
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
def set_seed_capital(self, handle: ctypes.c_void_p, seed: float) -> bool:
|
||||
rc = self.lib.dita_kernel_set_seed_capital(handle, ctypes.c_double(seed))
|
||||
return rc == 0
|
||||
|
||||
def on_account_event(
|
||||
self, handle: ctypes.c_void_p, event: Dict[str, Any]
|
||||
) -> Dict[str, Any]:
|
||||
encoded = json.dumps(json_safe(event), separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
raw = self.lib.dita_kernel_on_account_event_json(handle, ctypes.c_char_p(encoded))
|
||||
if not raw:
|
||||
return {}
|
||||
return json.loads(self._take_string(raw))
|
||||
|
||||
|
||||
_RUST: _RustKernelLib | None = None # lazy init — avoids Rust build on import
|
||||
|
||||
|
||||
def _get_rust() -> _RustKernelLib:
|
||||
global _RUST
|
||||
if _RUST is None:
|
||||
_RUST = _RustKernelLib()
|
||||
return _RUST
|
||||
|
||||
|
||||
def _slot_to_payload(slot: TradeSlot) -> Dict[str, Any]:
|
||||
return slot.to_dict()
|
||||
|
||||
|
||||
def _order_to_payload(order: Optional[VenueOrder]) -> Optional[Dict[str, Any]]:
|
||||
if order is None:
|
||||
return None
|
||||
return {
|
||||
"internal_trade_id": order.internal_trade_id,
|
||||
"venue_order_id": order.venue_order_id,
|
||||
"venue_client_id": order.venue_client_id,
|
||||
"side": order.side.value,
|
||||
"intended_size": float(order.intended_size or 0.0),
|
||||
"filled_size": float(order.filled_size or 0.0),
|
||||
"average_fill_price": float(order.average_fill_price or 0.0),
|
||||
"status": order.status.value,
|
||||
"metadata": dict(order.metadata),
|
||||
}
|
||||
|
||||
|
||||
def _order_from_payload(payload: Optional[Dict[str, Any]], *, trade_id: str) -> Optional[VenueOrder]:
|
||||
if not isinstance(payload, dict):
|
||||
return None
|
||||
return VenueOrder(
|
||||
internal_trade_id=trade_id,
|
||||
venue_order_id=str(payload.get("venue_order_id", "")),
|
||||
venue_client_id=str(payload.get("venue_client_id", "")),
|
||||
side=TradeSide(str(payload.get("side", TradeSide.FLAT.value))),
|
||||
intended_size=float(payload.get("intended_size", 0.0)),
|
||||
filled_size=float(payload.get("filled_size", 0.0)),
|
||||
average_fill_price=float(payload.get("average_fill_price", 0.0)),
|
||||
status=VenueOrderStatus(str(payload.get("status", VenueOrderStatus.NEW.value))),
|
||||
metadata=dict(payload.get("metadata", {})),
|
||||
)
|
||||
|
||||
|
||||
def _slot_from_payload(payload: Dict[str, Any]) -> TradeSlot:
|
||||
return TradeSlot(
|
||||
slot_id=int(payload.get("slot_id", 0)),
|
||||
trade_id=str(payload.get("trade_id", "")),
|
||||
asset=str(payload.get("asset", "")),
|
||||
side=TradeSide(str(payload.get("side", TradeSide.FLAT.value))),
|
||||
entry_price=float(payload.get("entry_price", 0.0)),
|
||||
size=float(payload.get("size", 0.0)),
|
||||
initial_size=float(payload.get("initial_size", 0.0)),
|
||||
leverage=float(payload.get("leverage", 0.0)),
|
||||
entry_time=datetime.fromisoformat(payload["entry_time"]) if payload.get("entry_time") else None,
|
||||
unrealized_pnl=float(payload.get("unrealized_pnl", 0.0)),
|
||||
realized_pnl=float(payload.get("realized_pnl", 0.0)),
|
||||
closed=bool(payload.get("closed", False)),
|
||||
exit_leg_ratios=tuple(float(r) for r in payload.get("exit_leg_ratios", (1.0,))),
|
||||
active_leg_index=int(payload.get("active_leg_index", 0)),
|
||||
active_exit_order=_order_from_payload(payload.get("active_exit_order"), trade_id=str(payload.get("trade_id", ""))),
|
||||
active_entry_order=_order_from_payload(payload.get("active_entry_order"), trade_id=str(payload.get("trade_id", ""))),
|
||||
fsm_state=TradeStage(str(payload.get("fsm_state", TradeStage.IDLE.value))),
|
||||
close_reason=str(payload.get("close_reason", "")),
|
||||
last_event_time=datetime.fromisoformat(payload["last_event_time"]) if payload.get("last_event_time") else None,
|
||||
seen_event_ids=tuple(str(event_id) for event_id in payload.get("seen_event_ids", ())),
|
||||
metadata=dict(payload.get("metadata", {})),
|
||||
)
|
||||
|
||||
|
||||
def _first_invalid_intent_field(intent: KernelIntent) -> Optional[tuple[str, float]]:
|
||||
"""Return (field, value) for the first non-finite or out-of-bounds numeric
|
||||
field on an intent, or None if all are sane. Guards the kernel boundary
|
||||
against inf/NaN that would otherwise crash serde_json serialization."""
|
||||
scalar_checks = (
|
||||
("target_size", float(intent.target_size if intent.target_size is not None else 0.0)),
|
||||
("reference_price", float(intent.reference_price if intent.reference_price is not None else 0.0)),
|
||||
("leverage", float(intent.leverage if intent.leverage is not None else 0.0)),
|
||||
("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)),
|
||||
)
|
||||
for name, value in scalar_checks:
|
||||
if not math.isfinite(value):
|
||||
return (name, value)
|
||||
for idx, ratio in enumerate(intent.exit_leg_ratios or ()): # type: ignore[union-attr]
|
||||
rv = float(ratio if ratio is not None else 0.0)
|
||||
if not math.isfinite(rv):
|
||||
return (f"exit_leg_ratios[{idx}]", rv)
|
||||
size = float(intent.target_size if intent.target_size is not None else 0.0)
|
||||
if size < 0.0:
|
||||
return ("target_size", size)
|
||||
return None
|
||||
|
||||
|
||||
def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]:
|
||||
return {
|
||||
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
|
||||
"intent_id": intent.intent_id,
|
||||
"trade_id": intent.trade_id,
|
||||
"slot_id": intent.slot_id,
|
||||
"asset": intent.asset,
|
||||
"side": intent.side.value,
|
||||
"action": intent.action.value,
|
||||
"reference_price": float(intent.reference_price or 0.0),
|
||||
"target_size": float(intent.target_size or 0.0),
|
||||
"leverage": float(intent.leverage or 0.0),
|
||||
"exit_leg_ratios": list(intent.exit_leg_ratios),
|
||||
"reason": intent.reason,
|
||||
"metadata": dict(intent.metadata),
|
||||
"stage": intent.stage.value,
|
||||
"order_type": getattr(intent, "order_type", "MARKET"),
|
||||
"limit_price": float(getattr(intent, "limit_price", 0.0) or 0.0),
|
||||
}
|
||||
|
||||
|
||||
def _event_to_payload(event: VenueEvent) -> Dict[str, Any]:
|
||||
return {
|
||||
"timestamp": event.timestamp.isoformat() if hasattr(event.timestamp, "isoformat") else str(event.timestamp),
|
||||
"event_id": event.event_id,
|
||||
"trade_id": event.trade_id,
|
||||
"slot_id": event.slot_id,
|
||||
"kind": event.kind.value,
|
||||
"status": event.status.value,
|
||||
"venue_order_id": event.venue_order_id,
|
||||
"venue_client_id": event.venue_client_id,
|
||||
"side": event.side.value,
|
||||
"asset": event.asset,
|
||||
"price": float(event.price or 0.0),
|
||||
"size": float(event.size or 0.0),
|
||||
"filled_size": float(event.filled_size or 0.0),
|
||||
"remaining_size": float(event.remaining_size or 0.0),
|
||||
"reason": event.reason,
|
||||
"raw_payload": dict(event.raw_payload),
|
||||
"metadata": dict(event.metadata),
|
||||
}
|
||||
|
||||
|
||||
def _transition_from_payload(payload: Dict[str, Any]) -> KernelTransition:
|
||||
return KernelTransition(
|
||||
timestamp=datetime.fromisoformat(payload["timestamp"]),
|
||||
trade_id=str(payload.get("trade_id", "")),
|
||||
slot_id=int(payload.get("slot_id", 0)),
|
||||
prev_state=TradeStage(str(payload.get("prev_state", TradeStage.IDLE.value))),
|
||||
next_state=TradeStage(str(payload.get("next_state", TradeStage.IDLE.value))),
|
||||
trigger=str(payload.get("trigger", "")),
|
||||
intent_id=str(payload.get("intent_id", "")),
|
||||
event_id=str(payload.get("event_id", "")),
|
||||
control_mode=str(payload.get("control_mode", "")),
|
||||
control_verbosity=str(payload.get("control_verbosity", "")),
|
||||
details=dict(payload.get("details", {})),
|
||||
)
|
||||
|
||||
|
||||
def _outcome_from_payload(payload: Dict[str, Any]) -> KernelOutcome:
|
||||
return KernelOutcome(
|
||||
accepted=bool(payload.get("accepted", False)),
|
||||
slot_id=int(payload.get("slot_id", 0)),
|
||||
trade_id=str(payload.get("trade_id", "")),
|
||||
state=TradeStage(str(payload.get("state", TradeStage.IDLE.value))),
|
||||
diagnostic_code=KernelDiagnosticCode(str(payload.get("diagnostic_code", KernelDiagnosticCode.OK.value))),
|
||||
severity=KernelSeverity(str(payload.get("severity", KernelSeverity.INFO.value))),
|
||||
transitions=tuple(_transition_from_payload(row) for row in payload.get("transitions", [])),
|
||||
emitted_events=tuple(
|
||||
VenueEvent(
|
||||
timestamp=datetime.fromisoformat(row["timestamp"]),
|
||||
event_id=str(row.get("event_id", "")),
|
||||
trade_id=str(row.get("trade_id", "")),
|
||||
slot_id=int(row.get("slot_id", 0)),
|
||||
kind=KernelEventKind(str(row.get("kind", KernelEventKind.ORDER_ACK.value))),
|
||||
status=VenueEventStatus(str(row.get("status", VenueEventStatus.ACKED.value))),
|
||||
venue_order_id=str(row.get("venue_order_id", "")),
|
||||
venue_client_id=str(row.get("venue_client_id", "")),
|
||||
side=TradeSide(str(row.get("side", TradeSide.FLAT.value))),
|
||||
asset=str(row.get("asset", "")),
|
||||
price=float(row.get("price", 0.0)),
|
||||
size=float(row.get("size", 0.0)),
|
||||
filled_size=float(row.get("filled_size", 0.0)),
|
||||
remaining_size=float(row.get("remaining_size", 0.0)),
|
||||
reason=str(row.get("reason", "")),
|
||||
raw_payload=dict(row.get("raw_payload", {})),
|
||||
metadata=dict(row.get("metadata", {})),
|
||||
)
|
||||
for row in payload.get("emitted_events", [])
|
||||
),
|
||||
details=dict(payload.get("details", {})),
|
||||
)
|
||||
|
||||
|
||||
def _enum_text(value: Any) -> str:
|
||||
if hasattr(value, "value"):
|
||||
return str(getattr(value, "value"))
|
||||
return str(value)
|
||||
|
||||
|
||||
class KernelSlotView:
|
||||
"""Write-through view over a Rust-backed slot."""
|
||||
|
||||
def __init__(self, kernel: "ExecutionKernel", slot_id: int) -> None:
|
||||
object.__setattr__(self, "_kernel", kernel)
|
||||
object.__setattr__(self, "_slot_id", int(slot_id))
|
||||
|
||||
@property
|
||||
def slot_id(self) -> int:
|
||||
return object.__getattribute__(self, "_slot_id")
|
||||
|
||||
def _snapshot(self) -> TradeSlot:
|
||||
return self._kernel._get_slot(self.slot_id)
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
slot = self._snapshot()
|
||||
if hasattr(slot, name):
|
||||
return getattr(slot, name)
|
||||
raise AttributeError(name)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
if name in {"_kernel", "_slot_id"}:
|
||||
object.__setattr__(self, name, value)
|
||||
return
|
||||
slot = self._snapshot()
|
||||
if not hasattr(slot, name):
|
||||
raise AttributeError(name)
|
||||
setattr(slot, name, value)
|
||||
self._kernel._set_slot(slot)
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return self._snapshot().to_dict()
|
||||
|
||||
def is_free(self) -> bool:
|
||||
return self._snapshot().is_free()
|
||||
|
||||
def is_open(self) -> bool:
|
||||
return self._snapshot().is_open()
|
||||
|
||||
def mark_price(self, price: float) -> None:
|
||||
slot = self._snapshot()
|
||||
slot.mark_price(price)
|
||||
self._kernel._set_slot(slot)
|
||||
|
||||
def next_exit_ratio(self) -> float:
|
||||
return self._snapshot().next_exit_ratio()
|
||||
|
||||
def consume_exit_leg(self) -> float:
|
||||
slot = self._snapshot()
|
||||
ratio = slot.consume_exit_leg()
|
||||
self._kernel._set_slot(slot)
|
||||
return ratio
|
||||
|
||||
def attach_entry_order(self, order: VenueOrder) -> None:
|
||||
slot = self._snapshot()
|
||||
slot.active_entry_order = order
|
||||
self._kernel._set_slot(slot)
|
||||
|
||||
def attach_exit_order(self, order: VenueOrder) -> None:
|
||||
slot = self._snapshot()
|
||||
slot.active_exit_order = order
|
||||
self._kernel._set_slot(slot)
|
||||
|
||||
def __repr__(self) -> str: # pragma: no cover - debugging helper
|
||||
return f"KernelSlotView(slot_id={self.slot_id}, state={self._snapshot().fsm_state.value})"
|
||||
|
||||
|
||||
class KernelStateView:
|
||||
def __init__(self, kernel: "ExecutionKernel") -> None:
|
||||
self._kernel = kernel
|
||||
self.slots = [KernelSlotView(kernel, slot_id) for slot_id in range(kernel.max_slots)]
|
||||
self.active_trade_index: Dict[str, int] = {}
|
||||
self.venue_order_index: Dict[str, int] = {}
|
||||
self.client_order_index: Dict[str, int] = {}
|
||||
self.refresh()
|
||||
|
||||
def refresh(self) -> None:
|
||||
snapshot = self._kernel._snapshot_backend()
|
||||
self.active_trade_index = dict(snapshot.get("active_trade_index", {}))
|
||||
self.venue_order_index = dict(snapshot.get("venue_order_index", {}))
|
||||
self.client_order_index = dict(snapshot.get("client_order_index", {}))
|
||||
|
||||
|
||||
class ExecutionKernel:
|
||||
"""Rust-backed multi-slot execution kernel."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
max_slots: int = 10,
|
||||
control_plane: Optional[ControlPlane] = None,
|
||||
venue: Optional[VenueAdapter] = None,
|
||||
journal: Optional[KernelJournal] = None,
|
||||
account: Optional[AccountProjection] = None,
|
||||
projection: Optional[HazelcastProjection] = None,
|
||||
projection_client: Optional[Any] = None,
|
||||
zinc_plane: Optional[ZincPlane] = None,
|
||||
) -> None:
|
||||
self.max_slots = int(max_slots)
|
||||
self.control_plane = control_plane or build_control_plane()
|
||||
self.venue = venue or MockVenueAdapter()
|
||||
self.journal = journal or MemoryKernelJournal()
|
||||
self.account = account or AccountProjection()
|
||||
self.projection = projection or build_projection(client=projection_client)
|
||||
self.zinc_plane = zinc_plane or InMemoryZincPlane()
|
||||
self._backend = _get_rust().create(self.max_slots)
|
||||
self._control_snapshot = self.control_plane.read()
|
||||
self._last_settled_pnl: Dict[int, float] = {}
|
||||
self.projection.write_control(self._control_snapshot)
|
||||
self.zinc_plane.update_control(self._control_snapshot)
|
||||
self.state = KernelStateView(self)
|
||||
self.account.observe_slots([self._get_slot(slot_id) for slot_id in range(self.max_slots)])
|
||||
|
||||
def __del__(self) -> None: # pragma: no cover - cleanup best effort
|
||||
backend = getattr(self, "_backend", None)
|
||||
if backend is not None:
|
||||
try:
|
||||
_get_rust().destroy(backend)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@property
|
||||
def control(self) -> KernelControlSnapshot:
|
||||
return self.control_plane.read()
|
||||
|
||||
def update_control(self, update: ControlUpdate) -> KernelControlSnapshot:
|
||||
snapshot = self.control_plane.update(update)
|
||||
self._control_snapshot = snapshot
|
||||
self.projection.write_control(snapshot)
|
||||
self.zinc_plane.update_control(snapshot)
|
||||
return snapshot
|
||||
|
||||
def _snapshot_backend(self) -> Dict[str, Any]:
|
||||
return _get_rust().snapshot(self._backend)
|
||||
|
||||
def _get_slot(self, slot_id: int) -> TradeSlot:
|
||||
return _slot_from_payload(_get_rust().get_slot_json(self._backend, slot_id))
|
||||
|
||||
def _set_slot(self, slot: TradeSlot, *, journal: bool = False) -> None:
|
||||
payload = _slot_to_payload(slot)
|
||||
_get_rust().set_slot_json(self._backend, slot.slot_id, payload)
|
||||
self.state.refresh()
|
||||
slots = [self._get_slot(slot_id) for slot_id in range(self.max_slots)]
|
||||
self.account.observe_slots(slots)
|
||||
current = self._get_slot(slot.slot_id)
|
||||
self.projection.write_slot(current)
|
||||
self.zinc_plane.write_slot(current)
|
||||
|
||||
def slot(self, slot_id: int) -> KernelSlotView:
|
||||
if not (0 <= int(slot_id) < self.max_slots):
|
||||
raise IndexError(slot_id)
|
||||
return self.state.slots[int(slot_id)]
|
||||
|
||||
def free_slot(self) -> Optional[KernelSlotView]:
|
||||
for slot in self.state.slots:
|
||||
if slot.is_free():
|
||||
return slot
|
||||
return None
|
||||
|
||||
def _record_transitions(self, transitions: Iterable[KernelTransition], slot: TradeSlot, event: Optional[VenueEvent]) -> None:
|
||||
if self.control.debug_clickhouse_enabled:
|
||||
for transition in transitions:
|
||||
self.journal.record_transition(
|
||||
transition=transition,
|
||||
slot=slot,
|
||||
event=event,
|
||||
control=self.control,
|
||||
)
|
||||
|
||||
def process_intent(self, intent: KernelIntent) -> KernelOutcome:
|
||||
self.zinc_plane.publish_intent(intent)
|
||||
if not (0 <= int(intent.slot_id) < self.max_slots):
|
||||
return KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=int(intent.slot_id),
|
||||
trade_id=intent.trade_id,
|
||||
state=TradeStage.IDLE,
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
|
||||
details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
|
||||
)
|
||||
# Finiteness / sanity guard at the kernel boundary. A non-finite (inf/NaN)
|
||||
# numeric field would make the Rust core's serde_json serialization return
|
||||
# a null string (panic). Reject cleanly with INVALID_INTENT instead, naming
|
||||
# the offending field + value so the upstream numerical source can be located.
|
||||
bad_field = _first_invalid_intent_field(intent)
|
||||
if bad_field is not None:
|
||||
name, value = bad_field
|
||||
return KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=int(intent.slot_id),
|
||||
trade_id=intent.trade_id,
|
||||
state=self._get_slot(int(intent.slot_id)).fsm_state,
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
|
||||
severity=KernelSeverity.WARNING,
|
||||
details={
|
||||
"reason": "INVALID_INTENT",
|
||||
"field": name,
|
||||
"value": str(value),
|
||||
"intent_id": intent.intent_id,
|
||||
"action": intent.action.value,
|
||||
"asset": intent.asset,
|
||||
},
|
||||
)
|
||||
payload = _intent_to_payload(intent)
|
||||
result = _get_rust().process_intent(
|
||||
self._backend,
|
||||
payload,
|
||||
mode=_enum_text(self.control.mode),
|
||||
verbosity=_enum_text(self.control.verbosity),
|
||||
)
|
||||
outcome = _outcome_from_payload(result["outcome"])
|
||||
self.state.refresh()
|
||||
if intent.action == KernelCommandType.ENTER and outcome.accepted:
|
||||
self._last_settled_pnl[intent.slot_id] = 0.0
|
||||
emitted_events = []
|
||||
all_venue_transitions: List[KernelTransition] = []
|
||||
if intent.action in {KernelCommandType.ENTER, KernelCommandType.EXIT}:
|
||||
emitted_events = self.venue.submit(intent)
|
||||
for event in emitted_events:
|
||||
evt_outcome = self.on_venue_event(event)
|
||||
all_venue_transitions.extend(evt_outcome.transitions)
|
||||
elif intent.action == KernelCommandType.CANCEL:
|
||||
slot_view = self.slot(intent.slot_id)
|
||||
if slot_view.active_exit_order is not None:
|
||||
emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason)
|
||||
elif slot_view.active_entry_order is not None and slot_view.fsm_state in {
|
||||
TradeStage.ENTRY_WORKING,
|
||||
TradeStage.ORDER_REQUESTED,
|
||||
TradeStage.ORDER_SENT,
|
||||
TradeStage.IDLE,
|
||||
}:
|
||||
emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason)
|
||||
else:
|
||||
emitted_events = []
|
||||
for event in emitted_events:
|
||||
evt_outcome = self.on_venue_event(event)
|
||||
all_venue_transitions.extend(evt_outcome.transitions)
|
||||
|
||||
final_slot = self._get_slot(outcome.slot_id)
|
||||
rate_limit_event = next((event for event in emitted_events if event.kind == KernelEventKind.RATE_LIMITED), None)
|
||||
if rate_limit_event is not None:
|
||||
rate_limit_details = dict(outcome.details)
|
||||
rate_limit_details.update(
|
||||
{
|
||||
"reason": rate_limit_event.reason or "RATE_LIMITED",
|
||||
"retry_after_ms": int(rate_limit_event.metadata.get("retry_after_ms", 0) or 0),
|
||||
"venue_event_kind": rate_limit_event.kind.value,
|
||||
"severity": KernelSeverity.WARNING.value,
|
||||
"release_eta": "few minutes",
|
||||
"retryable": True,
|
||||
}
|
||||
)
|
||||
outcome = KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=outcome.slot_id,
|
||||
trade_id=outcome.trade_id,
|
||||
state=final_slot.fsm_state,
|
||||
diagnostic_code=KernelDiagnosticCode.RATE_LIMITED,
|
||||
severity=KernelSeverity.WARNING,
|
||||
transitions=outcome.transitions,
|
||||
emitted_events=outcome.emitted_events,
|
||||
details=rate_limit_details,
|
||||
)
|
||||
all_transitions = list(outcome.transitions) + all_venue_transitions
|
||||
final_outcome = KernelOutcome(
|
||||
accepted=outcome.accepted,
|
||||
slot_id=outcome.slot_id,
|
||||
trade_id=final_slot.trade_id,
|
||||
state=final_slot.fsm_state,
|
||||
diagnostic_code=outcome.diagnostic_code,
|
||||
transitions=tuple(all_transitions),
|
||||
emitted_events=tuple(emitted_events),
|
||||
details=dict(outcome.details),
|
||||
)
|
||||
slots = [self._get_slot(i) for i in range(self.max_slots)]
|
||||
self.account.observe_slots(slots)
|
||||
current = self._get_slot(final_slot.slot_id)
|
||||
self.projection.write_slot(current)
|
||||
self.zinc_plane.write_slot(current)
|
||||
self._record_transitions(outcome.transitions, final_slot, None)
|
||||
return final_outcome
|
||||
|
||||
def on_venue_event(self, event: VenueEvent) -> KernelOutcome:
|
||||
result = _get_rust().on_venue_event(
|
||||
self._backend,
|
||||
_event_to_payload(event),
|
||||
mode=_enum_text(self.control.mode),
|
||||
verbosity=_enum_text(self.control.verbosity),
|
||||
)
|
||||
outcome = _outcome_from_payload(result["outcome"])
|
||||
# An INVALID_* fallback result carries a null slot; fall back to the
|
||||
# kernel's current slot so settlement/bookkeeping stays consistent.
|
||||
slot_payload = result.get("slot")
|
||||
slot = _slot_from_payload(slot_payload) if slot_payload else self._get_slot(int(outcome.slot_id))
|
||||
self.state.refresh()
|
||||
incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0)
|
||||
if abs(incremental_pnl) > 1e-12:
|
||||
self.account.settle(incremental_pnl)
|
||||
self._last_settled_pnl[slot.slot_id] = slot.realized_pnl
|
||||
slots = [self._get_slot(i) for i in range(self.max_slots)]
|
||||
self.account.observe_slots(slots)
|
||||
current = self._get_slot(slot.slot_id)
|
||||
self.projection.write_slot(current)
|
||||
self.zinc_plane.write_slot(current)
|
||||
self._record_transitions(outcome.transitions, slot, event)
|
||||
return outcome
|
||||
|
||||
def mark_price(self, asset: str, price: float) -> None:
|
||||
for slot in self.state.slots:
|
||||
if slot.asset == asset and slot.is_open():
|
||||
slot.mark_price(price)
|
||||
self.account.observe_slots([self._get_slot(i) for i in range(self.max_slots)])
|
||||
|
||||
def reconcile_from_slots(self, slots: Sequence[TradeSlot]) -> KernelOutcome:
|
||||
payload = [_slot_to_payload(slot) for slot in slots]
|
||||
result = _get_rust().reconcile_slots(
|
||||
self._backend,
|
||||
payload,
|
||||
mode=_enum_text(self.control.mode),
|
||||
verbosity=_enum_text(self.control.verbosity),
|
||||
)
|
||||
outcome = _outcome_from_payload(result["outcome"])
|
||||
if not outcome.accepted:
|
||||
return outcome
|
||||
self.state.refresh()
|
||||
slots = [self._get_slot(i) for i in range(self.max_slots)]
|
||||
self.account.observe_slots(slots)
|
||||
for current in slots:
|
||||
self.projection.write_slot(current)
|
||||
self.zinc_plane.write_slot(current)
|
||||
return outcome
|
||||
|
||||
def set_seed_capital(self, seed: float) -> None:
|
||||
"""Set the kernel's seed capital for K-value fold. Call once at init."""
|
||||
_get_rust().set_seed_capital(self._backend, float(seed))
|
||||
|
||||
def on_account_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Apply an account-level exchange event atomically to the kernel.
|
||||
|
||||
event dict must have "kind" in {"FILL_SETTLED","ACCOUNT_UPDATE","FUNDING_FEE"}
|
||||
plus the relevant numeric fields (see Rust FFI doc).
|
||||
|
||||
Returns the resulting account state dict including reconcile_status,
|
||||
available_capital (E rules when present), k_capital, event_seq.
|
||||
"""
|
||||
return _get_rust().on_account_event(self._backend, event)
|
||||
|
||||
def snapshot(self) -> Dict[str, Any]:
|
||||
# Merge kernel Rust snapshot (includes AccountState) with Python state.
|
||||
rust_snap = _get_rust().snapshot(self._backend)
|
||||
rust_account = rust_snap.get("account", {})
|
||||
return {
|
||||
"control": self.control.as_dict(),
|
||||
"slots": [self._get_slot(slot.slot_id).to_dict() for slot in self.state.slots],
|
||||
"account": {
|
||||
# Legacy fields (Python AccountProjection) — backward compat
|
||||
"capital": self.account.snapshot.capital,
|
||||
"equity": self.account.snapshot.equity,
|
||||
"realized_pnl": self.account.snapshot.realized_pnl,
|
||||
"unrealized_pnl": self.account.snapshot.unrealized_pnl,
|
||||
"open_positions": self.account.snapshot.open_positions,
|
||||
"open_notional": self.account.snapshot.open_notional,
|
||||
"leverage": self.account.snapshot.leverage,
|
||||
# V2 — kernel atomic K/E account (E rules; K is parallel check)
|
||||
"k_capital": rust_account.get("k_capital", self.account.snapshot.capital),
|
||||
"k_realized_pnl": rust_account.get("k_realized_pnl", self.account.snapshot.realized_pnl),
|
||||
"k_fees_paid": rust_account.get("k_fees_paid", self.account.snapshot.fees_paid),
|
||||
"k_funding_net": rust_account.get("k_funding_net", 0.0),
|
||||
"e_wallet_balance": rust_account.get("e_wallet_balance", 0.0),
|
||||
"e_available_margin": rust_account.get("e_available_margin", 0.0),
|
||||
"e_used_margin": rust_account.get("e_used_margin", 0.0),
|
||||
"e_maint_margin": rust_account.get("e_maint_margin", 0.0),
|
||||
# available_capital: E rules when present, K as fallback
|
||||
"available_capital": rust_account.get("available_capital", self.account.snapshot.capital),
|
||||
"reconcile_status": rust_account.get("reconcile_status", "OK"),
|
||||
"reconcile_delta": rust_account.get("reconcile_delta", 0.0),
|
||||
"reconcile_explanation": rust_account.get("reconcile_explanation", ""),
|
||||
"event_seq": rust_account.get("event_seq", 0),
|
||||
},
|
||||
}
|
||||
Reference in New Issue
Block a user