Files
siloqy/prod/clean_arch/dita_v2/rust_backend.py
Codex b3b28bb44a PINK: kernel fee prediction + calibration loop
ExchangeFeeConfig in AccountState:
  taker_rate, maker_rate, lot_step, tick_size, funding_interval_secs
  calibration_ratio: EMA of actual/expected, updated on every fill

Kernel now predicts fees at fill time (PREDICTED_FILL event):
  k_capital updated immediately without waiting for WS FILL_SETTLED
  When actual fee arrives, prediction is replaced and ratio recalibrated
  Reconcile delta: 0.000000 (was ~0.9 USDT in canary without prediction)

Calibration loop on connect():
  Fetches recent fill history, validates model vs exchange actuals
  deviation < 1pct -> OK; < 5pct -> WARN; >= 5pct -> ERROR (pre-trade gate)

New FFI: dita_kernel_set_exchange_config_json, dita_kernel_calibrate_fee_json
New ExecutionKernel methods: set_exchange_config(), calibrate_fee()
pink_direct.py: loads BingX fee config on connect, calibrates before stream

131/131 offline pass.
2026-06-01 23:45:50 +02:00

865 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Rust-backed DITAv2 execution kernel.
This module keeps the Python API shape stable while moving the kernel state
machine into a Rust shared library. Slot views write through to the backend on
assignment, then the Python side mirrors the resulting state into Zinc and the
existing projections/journals.
"""
from __future__ import annotations
from dataclasses import asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence
import ctypes
import json
import math
import os
import subprocess
import sys
from .account import AccountProjection
from .control import ControlPlane, ControlUpdate, KernelControlSnapshot, KernelVerbosity, build_control_plane
from .contracts import (
KernelCommandType,
KernelDiagnosticCode,
KernelEventKind,
KernelIntent,
KernelOutcome,
KernelSeverity,
KernelTransition,
TradeSide,
TradeSlot,
TradeStage,
VenueEvent,
VenueOrder,
VenueOrderStatus,
VenueEventStatus,
)
from .journal import KernelJournal, MemoryKernelJournal
from .mock_venue import MockVenueAdapter
from .projection import HazelcastProjection
from .projection import build_projection
from .utils import json_safe
from .venue import VenueAdapter
from .zinc_plane import InMemoryZincPlane, ZincPlane
def _repo_root() -> Path:
    """Return the directory used as repository root.

    This is the fourth ancestor of this module's resolved path, i.e. three
    levels above the directory that contains the module.
    """
    module_file = Path(__file__).resolve()
    ancestors = module_file.parents
    return ancestors[3]
def _crate_dir() -> Path:
    """Return the ``_rust_kernel`` directory that sits beside this module."""
    module_file = Path(__file__).resolve()
    return module_file.parent / "_rust_kernel"
def _library_path() -> Path:
    """Return the expected release-build path of the kernel shared library.

    The file name follows the platform: ``.dylib`` when ``sys.platform`` is
    ``"darwin"``, ``.dll`` when ``os.name`` is ``"nt"``, ``.so`` otherwise.
    """
    if sys.platform == "darwin":
        filename = "libdita_v2_kernel.dylib"
    else:
        filename = "dita_v2_kernel.dll" if os.name == "nt" else "libdita_v2_kernel.so"
    return _crate_dir().joinpath("target", "release", filename)
def _build_library() -> None:
    """Compile the Rust kernel crate in release mode by invoking ``cargo``.

    The command runs with the repository root as working directory.

    Raises:
        FileNotFoundError: if the crate directory does not exist.
        subprocess.CalledProcessError: if ``cargo build`` exits non-zero
            (``check=True``).
    """
    crate = _crate_dir()
    if not crate.exists():
        raise FileNotFoundError(f"Missing Rust kernel crate: {crate}")
    manifest = crate / "Cargo.toml"
    command = ["cargo", "build", "--release", "--manifest-path", str(manifest)]
    subprocess.run(command, cwd=_repo_root(), check=True)
def _ensure_library() -> Path:
    """Return the shared-library path, building the crate first if the file is absent."""
    library = _library_path()
    if library.exists():
        return library
    _build_library()
    return library
class _RustKernelLib:
    """ctypes binding around the DITAv2 Rust kernel shared library.

    Structured arguments cross the FFI boundary as UTF-8 JSON. Functions whose
    ``restype`` is ``c_void_p`` return a string owned by the library;
    ``_take_string`` decodes it and hands it back to
    ``dita_kernel_free_string``.
    """

    def __init__(self) -> None:
        """Load the library (building it when missing) and declare FFI signatures."""
        self.lib = ctypes.CDLL(str(_ensure_library()))
        handle_t = ctypes.c_void_p
        text_t = ctypes.c_char_p
        index_t = ctypes.c_size_t
        # Shared shape of the calls that take (handle, payload, mode, verbosity).
        control_call = [handle_t, text_t, text_t, text_t]
        signatures = (
            ("dita_kernel_create", [index_t], handle_t),
            ("dita_kernel_destroy", [handle_t], None),
            ("dita_kernel_free_string", [handle_t], None),
            ("dita_kernel_get_slot_json", [handle_t, index_t], handle_t),
            ("dita_kernel_set_slot_json", [handle_t, index_t, text_t], ctypes.c_int),
            ("dita_kernel_process_intent_json", control_call, handle_t),
            ("dita_kernel_on_venue_event_json", control_call, handle_t),
            ("dita_kernel_reconcile_slots_json", control_call, handle_t),
            ("dita_kernel_snapshot_json", [handle_t], handle_t),
            ("dita_kernel_set_seed_capital", [handle_t, ctypes.c_double], ctypes.c_int),
            ("dita_kernel_set_exchange_config_json", [handle_t, text_t], ctypes.c_int),
            ("dita_kernel_calibrate_fee_json", [handle_t, text_t], handle_t),
            ("dita_kernel_on_account_event_json", [handle_t, text_t], handle_t),
        )
        for symbol, argtypes, restype in signatures:
            function = getattr(self.lib, symbol)
            function.argtypes = list(argtypes)
            function.restype = restype

    @staticmethod
    def _encode(payload: Any) -> bytes:
        """Return *payload* as compact UTF-8 JSON after ``json_safe`` coercion.

        Every JSON-taking FFI call uses this helper so all payloads are
        encoded the same way.
        """
        text = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False)
        return text.encode("utf-8")

    def _call_with_control(
        self,
        function: Any,
        handle: ctypes.c_void_p,
        payload: Any,
        mode: str,
        verbosity: str,
    ) -> Dict[str, Any]:
        """Invoke a ``(handle, json, mode, verbosity)`` FFI function and parse its reply."""
        encoded = self._encode(payload)
        raw = function(
            handle,
            ctypes.c_char_p(encoded),
            ctypes.c_char_p(mode.encode("utf-8")),
            ctypes.c_char_p(verbosity.encode("utf-8")),
        )
        return json.loads(self._take_string(raw))

    def create(self, max_slots: int) -> ctypes.c_void_p:
        """Create a kernel with *max_slots* slots and return its handle.

        Raises:
            RuntimeError: if the library returns a null handle.
        """
        handle = self.lib.dita_kernel_create(ctypes.c_size_t(max_slots))
        if not handle:
            raise RuntimeError("dita_kernel_create failed")
        return ctypes.c_void_p(handle)

    def destroy(self, handle: ctypes.c_void_p) -> None:
        """Destroy the kernel behind *handle*; null handles are ignored."""
        if handle and handle.value:
            self.lib.dita_kernel_destroy(handle)

    def _take_string(self, raw: ctypes.c_void_p) -> str:
        """Decode a library-owned C string as UTF-8 and release it.

        Raises:
            RuntimeError: if *raw* is null or the cast yields no bytes.
        """
        if not raw:
            raise RuntimeError("Rust kernel returned null string")
        try:
            text = ctypes.cast(raw, ctypes.c_char_p).value
            if text is None:
                raise RuntimeError("Rust kernel returned empty string")
            return text.decode("utf-8")
        finally:
            # Released exactly once on every path, including decode errors.
            self.lib.dita_kernel_free_string(raw)

    def get_slot_json(self, handle: ctypes.c_void_p, slot_id: int) -> Dict[str, Any]:
        """Return the JSON state of slot *slot_id*.

        Raises:
            IndexError: if the library returns null for this slot id.
        """
        raw = self.lib.dita_kernel_get_slot_json(handle, ctypes.c_size_t(slot_id))
        if not raw:
            raise IndexError(f"Invalid slot id: {slot_id}")
        return json.loads(self._take_string(raw))

    def set_slot_json(self, handle: ctypes.c_void_p, slot_id: int, payload: Dict[str, Any]) -> None:
        """Overwrite slot *slot_id* with *payload*.

        Raises:
            RuntimeError: if the library returns a non-zero code.
        """
        encoded = self._encode(payload)
        rc = self.lib.dita_kernel_set_slot_json(handle, ctypes.c_size_t(slot_id), ctypes.c_char_p(encoded))
        if rc != 0:
            raise RuntimeError(f"dita_kernel_set_slot_json failed rc={rc}")

    def process_intent(
        self,
        handle: ctypes.c_void_p,
        payload: Dict[str, Any],
        *,
        mode: str,
        verbosity: str,
    ) -> Dict[str, Any]:
        """Submit an intent payload and return the parsed JSON reply."""
        return self._call_with_control(
            self.lib.dita_kernel_process_intent_json, handle, payload, mode, verbosity
        )

    def on_venue_event(
        self,
        handle: ctypes.c_void_p,
        payload: Dict[str, Any],
        *,
        mode: str,
        verbosity: str,
    ) -> Dict[str, Any]:
        """Feed a venue event payload to the kernel and return the parsed JSON reply."""
        return self._call_with_control(
            self.lib.dita_kernel_on_venue_event_json, handle, payload, mode, verbosity
        )

    def reconcile_slots(
        self,
        handle: ctypes.c_void_p,
        payload: Sequence[Dict[str, Any]],
        *,
        mode: str,
        verbosity: str,
    ) -> Dict[str, Any]:
        """Send a list of slot payloads for reconciliation and return the parsed reply."""
        return self._call_with_control(
            self.lib.dita_kernel_reconcile_slots_json, handle, list(payload), mode, verbosity
        )

    def snapshot(self, handle: ctypes.c_void_p) -> Dict[str, Any]:
        """Return the kernel snapshot as a parsed JSON object."""
        raw = self.lib.dita_kernel_snapshot_json(handle)
        return json.loads(self._take_string(raw))

    def set_seed_capital(self, handle: ctypes.c_void_p, seed: float) -> bool:
        """Set the seed capital; return ``True`` when the library reports code 0."""
        rc = self.lib.dita_kernel_set_seed_capital(handle, ctypes.c_double(seed))
        return rc == 0

    def set_exchange_config(self, handle: ctypes.c_void_p, config: Dict[str, Any]) -> bool:
        """Send the exchange config; return ``True`` when the library reports code 0."""
        encoded = self._encode(config)
        rc = self.lib.dita_kernel_set_exchange_config_json(handle, ctypes.c_char_p(encoded))
        return rc == 0

    def calibrate_fee(self, handle: ctypes.c_void_p, fill_price: float, fill_qty: float, actual_fee: float) -> Dict[str, Any]:
        """Send one fill to the fee calibration call.

        Returns the parsed reply, or ``{}`` when the library returns null.
        """
        encoded = self._encode({"fill_price": fill_price, "fill_qty": fill_qty, "actual_fee": actual_fee})
        raw = self.lib.dita_kernel_calibrate_fee_json(handle, ctypes.c_char_p(encoded))
        if not raw:
            return {}
        return json.loads(self._take_string(raw))

    def on_account_event(
        self, handle: ctypes.c_void_p, event: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Send an account event; return the parsed reply, or ``{}`` on a null reply."""
        encoded = self._encode(event)
        raw = self.lib.dita_kernel_on_account_event_json(handle, ctypes.c_char_p(encoded))
        if not raw:
            return {}
        return json.loads(self._take_string(raw))
_RUST: Optional["_RustKernelLib"] = None  # created lazily so importing this module never triggers a Rust build


def _get_rust() -> "_RustKernelLib":
    """Return the module-wide ``_RustKernelLib``, creating it on first use."""
    global _RUST
    binding = _RUST
    if binding is None:
        binding = _RustKernelLib()
        _RUST = binding
    return binding
def _slot_to_payload(slot: "TradeSlot") -> Dict[str, Any]:
    """Serialize a slot for the backend by delegating to its ``to_dict`` method."""
    payload = slot.to_dict()
    return payload
def _order_to_payload(order: Optional["VenueOrder"]) -> Optional[Dict[str, Any]]:
    """Convert a venue order into a JSON-ready dict; ``None`` passes through.

    ``side`` and ``status`` are emitted as their ``.value``; falsy numeric
    fields (including ``None``) become ``0.0``; metadata is shallow-copied.
    """
    if order is None:
        return None
    payload: Dict[str, Any] = {
        field: getattr(order, field)
        for field in ("internal_trade_id", "venue_order_id", "venue_client_id")
    }
    payload["side"] = order.side.value
    for field in ("intended_size", "filled_size", "average_fill_price"):
        payload[field] = float(getattr(order, field) or 0.0)
    payload["status"] = order.status.value
    payload["metadata"] = dict(order.metadata)
    return payload
def _order_from_payload(payload: Optional[Dict[str, Any]], *, trade_id: str) -> Optional["VenueOrder"]:
    """Rebuild a ``VenueOrder`` from a backend dict.

    Any non-dict input (such as ``None``) yields ``None``. The order's
    ``internal_trade_id`` comes from *trade_id*, not from the payload. Missing
    keys fall back to empty strings, ``0.0``, ``TradeSide.FLAT``,
    ``VenueOrderStatus.NEW`` and an empty metadata dict.
    """
    if not isinstance(payload, dict):
        return None
    read = payload.get

    def text(key: str, default: Any = "") -> str:
        return str(read(key, default))

    def number(key: str) -> float:
        return float(read(key, 0.0))

    return VenueOrder(
        internal_trade_id=trade_id,
        venue_order_id=text("venue_order_id"),
        venue_client_id=text("venue_client_id"),
        side=TradeSide(text("side", TradeSide.FLAT.value)),
        intended_size=number("intended_size"),
        filled_size=number("filled_size"),
        average_fill_price=number("average_fill_price"),
        status=VenueOrderStatus(text("status", VenueOrderStatus.NEW.value)),
        metadata=dict(read("metadata", {})),
    )
def _slot_from_payload(payload: Dict[str, Any]) -> "TradeSlot":
    """Rebuild a ``TradeSlot`` from the backend's slot dict.

    Missing keys fall back to zero / empty values, ``TradeSide.FLAT``,
    ``TradeStage.IDLE`` and a single full exit leg ``(1.0,)``. Falsy
    timestamps become ``None``; others are parsed with
    ``datetime.fromisoformat``. Both embedded orders are rebuilt with the
    slot's own trade id.
    """
    read = payload.get
    trade_id = str(read("trade_id", ""))

    def number(key: str) -> float:
        return float(read(key, 0.0))

    def moment(key: str) -> Optional[datetime]:
        raw = read(key)
        return datetime.fromisoformat(raw) if raw else None

    def order(key: str) -> Optional["VenueOrder"]:
        return _order_from_payload(read(key), trade_id=trade_id)

    return TradeSlot(
        slot_id=int(read("slot_id", 0)),
        trade_id=trade_id,
        asset=str(read("asset", "")),
        side=TradeSide(str(read("side", TradeSide.FLAT.value))),
        entry_price=number("entry_price"),
        size=number("size"),
        initial_size=number("initial_size"),
        leverage=number("leverage"),
        entry_time=moment("entry_time"),
        unrealized_pnl=number("unrealized_pnl"),
        realized_pnl=number("realized_pnl"),
        closed=bool(read("closed", False)),
        exit_leg_ratios=tuple(float(ratio) for ratio in read("exit_leg_ratios", (1.0,))),
        active_leg_index=int(read("active_leg_index", 0)),
        active_exit_order=order("active_exit_order"),
        active_entry_order=order("active_entry_order"),
        fsm_state=TradeStage(str(read("fsm_state", TradeStage.IDLE.value))),
        close_reason=str(read("close_reason", "")),
        last_event_time=moment("last_event_time"),
        seen_event_ids=tuple(str(seen) for seen in read("seen_event_ids", ())),
        metadata=dict(read("metadata", {})),
    )
def _first_invalid_intent_field(intent: "KernelIntent") -> Optional[tuple[str, float]]:
    """Locate the first numeric intent field that must not reach the kernel.

    Checks run in this order: finiteness of ``target_size``,
    ``reference_price``, ``leverage`` and ``limit_price``; finiteness of each
    exit-leg ratio; then a negative ``target_size``. ``None`` values count as
    ``0.0``. Returns ``(field_name, value)`` for the first offender, or
    ``None`` when every check passes.
    """

    def as_float(raw: Any) -> float:
        return float(0.0 if raw is None else raw)

    target_size = as_float(intent.target_size)
    # All scalars are converted up front, before any of them is inspected.
    candidates = [
        ("target_size", target_size),
        ("reference_price", as_float(intent.reference_price)),
        ("leverage", as_float(intent.leverage)),
        ("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)),
    ]
    offender = next(
        ((label, number) for label, number in candidates if not math.isfinite(number)),
        None,
    )
    if offender is not None:
        return offender
    for position, ratio in enumerate(intent.exit_leg_ratios or ()):  # type: ignore[union-attr]
        ratio_value = as_float(ratio)
        if not math.isfinite(ratio_value):
            return (f"exit_leg_ratios[{position}]", ratio_value)
    if target_size < 0.0:
        return ("target_size", target_size)
    return None
def _intent_to_payload(intent: "KernelIntent") -> Dict[str, Any]:
    """Serialize an intent into the JSON-ready dict sent to the kernel.

    Enum fields are emitted as their ``.value``. Falsy numeric fields
    (including ``None``) become ``0.0``. ``order_type`` and ``limit_price``
    are optional attributes defaulting to ``"MARKET"`` and ``0.0``.

    ``exit_leg_ratios`` is normalised exactly like
    ``_first_invalid_intent_field`` reads it: a missing/``None`` sequence
    becomes an empty list and a ``None`` element becomes ``0.0``, so an intent
    that passes validation can always be serialized.
    """
    stamp = intent.timestamp
    ratios = [
        float(ratio if ratio is not None else 0.0)
        for ratio in (intent.exit_leg_ratios or ())
    ]
    return {
        "timestamp": stamp.isoformat() if hasattr(stamp, "isoformat") else str(stamp),
        "intent_id": intent.intent_id,
        "trade_id": intent.trade_id,
        "slot_id": intent.slot_id,
        "asset": intent.asset,
        "side": intent.side.value,
        "action": intent.action.value,
        "reference_price": float(intent.reference_price or 0.0),
        "target_size": float(intent.target_size or 0.0),
        "leverage": float(intent.leverage or 0.0),
        "exit_leg_ratios": ratios,
        "reason": intent.reason,
        "metadata": dict(intent.metadata),
        "stage": intent.stage.value,
        "order_type": getattr(intent, "order_type", "MARKET"),
        "limit_price": float(getattr(intent, "limit_price", 0.0) or 0.0),
    }
def _event_to_payload(event: "VenueEvent") -> Dict[str, Any]:
    """Serialize a venue event into the JSON-ready dict sent to the kernel.

    ``kind``, ``status`` and ``side`` are emitted as their ``.value``; falsy
    numeric fields (including ``None``) become ``0.0``; ``raw_payload`` and
    ``metadata`` are shallow-copied.
    """
    stamp = event.timestamp
    payload: Dict[str, Any] = {
        "timestamp": stamp.isoformat() if hasattr(stamp, "isoformat") else str(stamp),
    }
    for field in ("event_id", "trade_id", "slot_id"):
        payload[field] = getattr(event, field)
    for field in ("kind", "status"):
        payload[field] = getattr(event, field).value
    for field in ("venue_order_id", "venue_client_id"):
        payload[field] = getattr(event, field)
    payload["side"] = event.side.value
    payload["asset"] = event.asset
    for field in ("price", "size", "filled_size", "remaining_size"):
        payload[field] = float(getattr(event, field) or 0.0)
    payload["reason"] = event.reason
    payload["raw_payload"] = dict(event.raw_payload)
    payload["metadata"] = dict(event.metadata)
    return payload
def _transition_from_payload(payload: Dict[str, Any]) -> "KernelTransition":
    """Rebuild a ``KernelTransition`` from a backend dict.

    ``timestamp`` is required and parsed with ``datetime.fromisoformat``.
    Every other key is optional: text fields default to ``""``, ``slot_id``
    to ``0``, both states to ``TradeStage.IDLE`` and ``details`` to ``{}``.
    """
    read = payload.get

    def text(key: str) -> str:
        return str(read(key, ""))

    def stage(key: str) -> "TradeStage":
        return TradeStage(str(read(key, TradeStage.IDLE.value)))

    return KernelTransition(
        timestamp=datetime.fromisoformat(payload["timestamp"]),
        trade_id=text("trade_id"),
        slot_id=int(read("slot_id", 0)),
        prev_state=stage("prev_state"),
        next_state=stage("next_state"),
        trigger=text("trigger"),
        intent_id=text("intent_id"),
        event_id=text("event_id"),
        control_mode=text("control_mode"),
        control_verbosity=text("control_verbosity"),
        details=dict(read("details", {})),
    )
def _outcome_from_payload(payload: Dict[str, Any]) -> "KernelOutcome":
    """Rebuild a ``KernelOutcome`` (with transitions and emitted events) from a backend dict.

    Missing keys fall back to: not accepted, slot ``0``, empty strings,
    ``TradeStage.IDLE``, ``KernelDiagnosticCode.OK``, ``KernelSeverity.INFO``
    and empty collections. Each emitted event needs a ``timestamp`` key; its
    ``kind`` / ``status`` default to ``ORDER_ACK`` / ``ACKED``.
    """
    read = payload.get

    def venue_event(row: Dict[str, Any]) -> "VenueEvent":
        get = row.get

        def text(key: str) -> str:
            return str(get(key, ""))

        def number(key: str) -> float:
            return float(get(key, 0.0))

        return VenueEvent(
            timestamp=datetime.fromisoformat(row["timestamp"]),
            event_id=text("event_id"),
            trade_id=text("trade_id"),
            slot_id=int(get("slot_id", 0)),
            kind=KernelEventKind(str(get("kind", KernelEventKind.ORDER_ACK.value))),
            status=VenueEventStatus(str(get("status", VenueEventStatus.ACKED.value))),
            venue_order_id=text("venue_order_id"),
            venue_client_id=text("venue_client_id"),
            side=TradeSide(str(get("side", TradeSide.FLAT.value))),
            asset=text("asset"),
            price=number("price"),
            size=number("size"),
            filled_size=number("filled_size"),
            remaining_size=number("remaining_size"),
            reason=text("reason"),
            raw_payload=dict(get("raw_payload", {})),
            metadata=dict(get("metadata", {})),
        )

    # Fields are converted in the same order the constructor receives them.
    accepted = bool(read("accepted", False))
    slot_id = int(read("slot_id", 0))
    trade_id = str(read("trade_id", ""))
    state = TradeStage(str(read("state", TradeStage.IDLE.value)))
    diagnostic_code = KernelDiagnosticCode(str(read("diagnostic_code", KernelDiagnosticCode.OK.value)))
    severity = KernelSeverity(str(read("severity", KernelSeverity.INFO.value)))
    transitions = tuple(_transition_from_payload(row) for row in read("transitions", []))
    emitted_events = tuple(venue_event(row) for row in read("emitted_events", []))
    details = dict(read("details", {}))
    return KernelOutcome(
        accepted=accepted,
        slot_id=slot_id,
        trade_id=trade_id,
        state=state,
        diagnostic_code=diagnostic_code,
        severity=severity,
        transitions=transitions,
        emitted_events=emitted_events,
        details=details,
    )
def _enum_text(value: Any) -> str:
    """Return ``str(value.value)`` for enum-like objects, else ``str(value)``."""
    try:
        inner = value.value
    except AttributeError:
        return str(value)
    return str(inner)
class KernelSlotView:
    """Write-through view over a Rust-backed slot.

    The view stores only the owning kernel and a slot id. Every attribute read
    fetches a fresh ``TradeSlot`` from the kernel; every attribute write or
    mutating helper fetches the slot, changes it and writes it back through
    ``kernel._set_slot``.
    """

    # The only attributes stored on the view itself; everything else is
    # forwarded to the slot snapshot.
    _OWN_FIELDS = frozenset({"_kernel", "_slot_id"})

    def __init__(self, kernel: "ExecutionKernel", slot_id: int) -> None:
        # object.__setattr__ bypasses the write-through __setattr__ below.
        object.__setattr__(self, "_kernel", kernel)
        object.__setattr__(self, "_slot_id", int(slot_id))

    @property
    def slot_id(self) -> int:
        """Slot index this view is bound to."""
        return object.__getattribute__(self, "_slot_id")

    def _snapshot(self) -> "TradeSlot":
        """Fetch the current slot state from the kernel."""
        return self._kernel._get_slot(self.slot_id)

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attribute reads to a fresh slot snapshot.

        Raises:
            AttributeError: if the slot has no such attribute, or if the
                view's own fields are requested before they were set.
        """
        # __getattr__ only runs when normal lookup failed. If that happens for
        # our own fields (instance built via __new__ by copy/pickle), going
        # through _snapshot() would re-enter here and recurse without bound.
        if name in KernelSlotView._OWN_FIELDS:
            raise AttributeError(name)
        slot = self._snapshot()
        if hasattr(slot, name):
            return getattr(slot, name)
        raise AttributeError(name)

    def __setattr__(self, name: str, value: Any) -> None:
        """Write *value* onto the slot and push the slot back to the kernel.

        Raises:
            AttributeError: if the slot has no attribute called *name*.
        """
        if name in KernelSlotView._OWN_FIELDS:
            object.__setattr__(self, name, value)
            return
        slot = self._snapshot()
        if not hasattr(slot, name):
            raise AttributeError(name)
        setattr(slot, name, value)
        self._kernel._set_slot(slot)

    def to_dict(self) -> Dict[str, Any]:
        """Return the current slot as a dict."""
        return self._snapshot().to_dict()

    def is_free(self) -> bool:
        """Return the current slot's ``is_free()``."""
        return self._snapshot().is_free()

    def is_open(self) -> bool:
        """Return the current slot's ``is_open()``."""
        return self._snapshot().is_open()

    def mark_price(self, price: float) -> None:
        """Apply ``mark_price(price)`` to the slot and write the slot back."""
        slot = self._snapshot()
        slot.mark_price(price)
        self._kernel._set_slot(slot)

    def next_exit_ratio(self) -> float:
        """Return the current slot's ``next_exit_ratio()``."""
        return self._snapshot().next_exit_ratio()

    def consume_exit_leg(self) -> float:
        """Consume the next exit leg on the slot, write it back, return the ratio."""
        slot = self._snapshot()
        ratio = slot.consume_exit_leg()
        self._kernel._set_slot(slot)
        return ratio

    def attach_entry_order(self, order: "VenueOrder") -> None:
        """Set the slot's ``active_entry_order`` and write the slot back."""
        slot = self._snapshot()
        slot.active_entry_order = order
        self._kernel._set_slot(slot)

    def attach_exit_order(self, order: "VenueOrder") -> None:
        """Set the slot's ``active_exit_order`` and write the slot back."""
        slot = self._snapshot()
        slot.active_exit_order = order
        self._kernel._set_slot(slot)

    def __repr__(self) -> str:  # pragma: no cover - debugging helper
        return f"KernelSlotView(slot_id={self.slot_id}, state={self._snapshot().fsm_state.value})"
class KernelStateView:
    """Python-side view of kernel state: one slot view per slot plus cached indexes.

    The three index dicts are copies taken from the backend snapshot and are
    only as fresh as the last ``refresh()`` call.
    """

    def __init__(self, kernel: "ExecutionKernel") -> None:
        self._kernel = kernel
        self.slots = [KernelSlotView(kernel, index) for index in range(kernel.max_slots)]
        self.active_trade_index: Dict[str, int] = {}
        self.venue_order_index: Dict[str, int] = {}
        self.client_order_index: Dict[str, int] = {}
        self.refresh()

    def refresh(self) -> None:
        """Reload the three lookup indexes from the backend snapshot."""
        backend_state = self._kernel._snapshot_backend()
        for index_name in ("active_trade_index", "venue_order_index", "client_order_index"):
            # A key absent from the snapshot resets that index to empty.
            setattr(self, index_name, dict(backend_state.get(index_name, {})))
class ExecutionKernel:
    """Rust-backed multi-slot execution kernel.

    The slot state machine lives in the Rust library. This class forwards
    intents, venue events and account events to it, then mirrors the resulting
    slot state into the account projection, the projection store and the Zinc
    plane, and (when enabled by control) into the journal.
    """

    def __init__(
        self,
        *,
        max_slots: int = 10,
        control_plane: Optional[ControlPlane] = None,
        venue: Optional[VenueAdapter] = None,
        journal: Optional[KernelJournal] = None,
        account: Optional[AccountProjection] = None,
        projection: Optional[HazelcastProjection] = None,
        projection_client: Optional[Any] = None,
        zinc_plane: Optional[ZincPlane] = None,
    ) -> None:
        """Create the backend kernel and publish the initial control/slot state.

        Any collaborator left as ``None`` is replaced by its default
        (``build_control_plane()``, ``MockVenueAdapter``,
        ``MemoryKernelJournal``, ``AccountProjection``,
        ``build_projection(client=projection_client)``, ``InMemoryZincPlane``).
        """
        self.max_slots = int(max_slots)
        self.control_plane = control_plane or build_control_plane()
        self.venue = venue or MockVenueAdapter()
        self.journal = journal or MemoryKernelJournal()
        self.account = account or AccountProjection()
        self.projection = projection or build_projection(client=projection_client)
        self.zinc_plane = zinc_plane or InMemoryZincPlane()
        self._backend = _get_rust().create(self.max_slots)
        self._control_snapshot = self.control_plane.read()
        # Realized PnL already passed to account.settle(), keyed by slot id.
        self._last_settled_pnl: Dict[int, float] = {}
        self.projection.write_control(self._control_snapshot)
        self.zinc_plane.update_control(self._control_snapshot)
        self.state = KernelStateView(self)
        self.account.observe_slots([self._get_slot(slot_id) for slot_id in range(self.max_slots)])

    def __del__(self) -> None:  # pragma: no cover - cleanup best effort
        backend = getattr(self, "_backend", None)
        if backend is not None:
            try:
                _get_rust().destroy(backend)
            except Exception:
                # Deliberate best effort: finalizers must not raise.
                pass

    @property
    def control(self) -> KernelControlSnapshot:
        """Current control snapshot, read from the control plane on every access."""
        return self.control_plane.read()

    def update_control(self, update: ControlUpdate) -> KernelControlSnapshot:
        """Apply *update* to the control plane and publish the new snapshot."""
        snapshot = self.control_plane.update(update)
        self._control_snapshot = snapshot
        self.projection.write_control(snapshot)
        self.zinc_plane.update_control(snapshot)
        return snapshot

    def _snapshot_backend(self) -> Dict[str, Any]:
        """Return the raw backend snapshot dict."""
        return _get_rust().snapshot(self._backend)

    def _get_slot(self, slot_id: int) -> TradeSlot:
        """Fetch slot *slot_id* from the backend as a ``TradeSlot``."""
        return _slot_from_payload(_get_rust().get_slot_json(self._backend, slot_id))

    def _mirror_slot(self, slot_id: int) -> None:
        """Refresh the account projection from all slots, then publish slot *slot_id*."""
        slots = [self._get_slot(index) for index in range(self.max_slots)]
        self.account.observe_slots(slots)
        current = self._get_slot(slot_id)
        self.projection.write_slot(current)
        self.zinc_plane.write_slot(current)

    def _set_slot(self, slot: TradeSlot, *, journal: bool = False) -> None:
        """Write *slot* to the backend and mirror the stored result.

        ``journal`` is accepted for interface compatibility and is not used.
        """
        payload = _slot_to_payload(slot)
        _get_rust().set_slot_json(self._backend, slot.slot_id, payload)
        self.state.refresh()
        self._mirror_slot(slot.slot_id)

    def slot(self, slot_id: int) -> KernelSlotView:
        """Return the view for *slot_id*.

        Raises:
            IndexError: if *slot_id* is outside ``[0, max_slots)``.
        """
        if not (0 <= int(slot_id) < self.max_slots):
            raise IndexError(slot_id)
        return self.state.slots[int(slot_id)]

    def free_slot(self) -> Optional[KernelSlotView]:
        """Return the first slot view whose slot is free, or ``None``."""
        for slot in self.state.slots:
            if slot.is_free():
                return slot
        return None

    def _record_transitions(self, transitions: Iterable[KernelTransition], slot: TradeSlot, event: Optional[VenueEvent]) -> None:
        """Journal *transitions* when ``debug_clickhouse_enabled`` is set on control."""
        # One control read gives every journal row of this call the same snapshot.
        control = self.control
        if not control.debug_clickhouse_enabled:
            return
        for transition in transitions:
            self.journal.record_transition(
                transition=transition,
                slot=slot,
                event=event,
                control=control,
            )

    def process_intent(self, intent: KernelIntent) -> KernelOutcome:
        """Run one intent through the kernel and the venue.

        Steps: publish the intent to Zinc, reject out-of-range slot ids and
        non-finite/negative numerics without touching the backend, let the
        backend process the intent, call the venue (``submit`` for ENTER/EXIT,
        ``cancel`` for CANCEL), feed every venue event back through
        ``on_venue_event``, then mirror the final slot state.

        Returns:
            The combined outcome: kernel transitions followed by the
            transitions caused by venue events, the venue's emitted events,
            and the kernel's (or rate-limit) diagnostic code and severity.
        """
        self.zinc_plane.publish_intent(intent)
        if not (0 <= int(intent.slot_id) < self.max_slots):
            return KernelOutcome(
                accepted=False,
                slot_id=int(intent.slot_id),
                trade_id=intent.trade_id,
                state=TradeStage.IDLE,
                diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
                details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
            )
        # Finiteness / sanity guard at the kernel boundary. A non-finite
        # (inf/NaN) numeric field would make the Rust core's serde_json
        # serialization return a null string. Reject cleanly with
        # INVALID_INTENT instead, naming the offending field and value so the
        # upstream numerical source can be located.
        bad_field = _first_invalid_intent_field(intent)
        if bad_field is not None:
            name, value = bad_field
            return KernelOutcome(
                accepted=False,
                slot_id=int(intent.slot_id),
                trade_id=intent.trade_id,
                state=self._get_slot(int(intent.slot_id)).fsm_state,
                diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
                severity=KernelSeverity.WARNING,
                details={
                    "reason": "INVALID_INTENT",
                    "field": name,
                    "value": str(value),
                    "intent_id": intent.intent_id,
                    "action": intent.action.value,
                    "asset": intent.asset,
                },
            )
        payload = _intent_to_payload(intent)
        control = self.control
        result = _get_rust().process_intent(
            self._backend,
            payload,
            mode=_enum_text(control.mode),
            verbosity=_enum_text(control.verbosity),
        )
        outcome = _outcome_from_payload(result["outcome"])
        self.state.refresh()
        if intent.action == KernelCommandType.ENTER and outcome.accepted:
            # A newly accepted entry starts with nothing settled for this slot.
            self._last_settled_pnl[intent.slot_id] = 0.0
        emitted_events = []
        all_venue_transitions: List[KernelTransition] = []
        # NOTE(review): the venue is called even when ``outcome.accepted`` is
        # False. If the kernel can reject ENTER/EXIT, the order would still be
        # submitted — confirm against the Rust state machine whether this
        # should be gated on acceptance.
        if intent.action in {KernelCommandType.ENTER, KernelCommandType.EXIT}:
            emitted_events = self.venue.submit(intent)
            for event in emitted_events:
                evt_outcome = self.on_venue_event(event)
                all_venue_transitions.extend(evt_outcome.transitions)
        elif intent.action == KernelCommandType.CANCEL:
            slot_view = self.slot(intent.slot_id)
            # An exit order takes precedence; an entry order is cancelled only
            # in the listed pre-fill states.
            if slot_view.active_exit_order is not None:
                emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason)
            elif slot_view.active_entry_order is not None and slot_view.fsm_state in {
                TradeStage.ENTRY_WORKING,
                TradeStage.ORDER_REQUESTED,
                TradeStage.ORDER_SENT,
                TradeStage.IDLE,
            }:
                emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason)
            else:
                emitted_events = []
            for event in emitted_events:
                evt_outcome = self.on_venue_event(event)
                all_venue_transitions.extend(evt_outcome.transitions)
        final_slot = self._get_slot(outcome.slot_id)
        rate_limit_event = next((event for event in emitted_events if event.kind == KernelEventKind.RATE_LIMITED), None)
        if rate_limit_event is not None:
            # A venue rate limit overrides the kernel's verdict: the intent is
            # reported as not accepted but retryable.
            rate_limit_details = dict(outcome.details)
            rate_limit_details.update(
                {
                    "reason": rate_limit_event.reason or "RATE_LIMITED",
                    "retry_after_ms": int(rate_limit_event.metadata.get("retry_after_ms", 0) or 0),
                    "venue_event_kind": rate_limit_event.kind.value,
                    "severity": KernelSeverity.WARNING.value,
                    "release_eta": "few minutes",
                    "retryable": True,
                }
            )
            outcome = KernelOutcome(
                accepted=False,
                slot_id=outcome.slot_id,
                trade_id=outcome.trade_id,
                state=final_slot.fsm_state,
                diagnostic_code=KernelDiagnosticCode.RATE_LIMITED,
                severity=KernelSeverity.WARNING,
                transitions=outcome.transitions,
                emitted_events=outcome.emitted_events,
                details=rate_limit_details,
            )
        all_transitions = list(outcome.transitions) + all_venue_transitions
        final_outcome = KernelOutcome(
            accepted=outcome.accepted,
            slot_id=outcome.slot_id,
            trade_id=final_slot.trade_id,
            state=final_slot.fsm_state,
            diagnostic_code=outcome.diagnostic_code,
            # Carry the severity through; omitting it reset the kernel's
            # severity and the rate-limit WARNING to the constructor default.
            severity=outcome.severity,
            transitions=tuple(all_transitions),
            emitted_events=tuple(emitted_events),
            details=dict(outcome.details),
        )
        self._mirror_slot(final_slot.slot_id)
        # Venue-event transitions were already journaled by on_venue_event.
        self._record_transitions(outcome.transitions, final_slot, None)
        return final_outcome

    def on_venue_event(self, event: VenueEvent) -> KernelOutcome:
        """Apply one venue event to the kernel, settle new realized PnL and mirror state."""
        control = self.control
        result = _get_rust().on_venue_event(
            self._backend,
            _event_to_payload(event),
            mode=_enum_text(control.mode),
            verbosity=_enum_text(control.verbosity),
        )
        outcome = _outcome_from_payload(result["outcome"])
        # An INVALID_* fallback result carries a null slot; fall back to the
        # kernel's current slot so settlement/bookkeeping stays consistent.
        slot_payload = result.get("slot")
        slot = _slot_from_payload(slot_payload) if slot_payload else self._get_slot(int(outcome.slot_id))
        self.state.refresh()
        # Settle only the part of realized PnL not yet passed to the account.
        incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0)
        if abs(incremental_pnl) > 1e-12:
            self.account.settle(incremental_pnl)
            self._last_settled_pnl[slot.slot_id] = slot.realized_pnl
        self._mirror_slot(slot.slot_id)
        self._record_transitions(outcome.transitions, slot, event)
        return outcome

    def mark_price(self, asset: str, price: float) -> None:
        """Mark every open slot holding *asset* at *price*, then refresh the account."""
        for slot in self.state.slots:
            if slot.asset == asset and slot.is_open():
                slot.mark_price(price)
        self.account.observe_slots([self._get_slot(i) for i in range(self.max_slots)])

    def reconcile_from_slots(self, slots: Sequence[TradeSlot]) -> KernelOutcome:
        """Ask the backend to reconcile against *slots*.

        When the backend accepts, every slot is re-read and mirrored; a
        rejected outcome is returned without mirroring anything.
        """
        payload = [_slot_to_payload(slot) for slot in slots]
        control = self.control
        result = _get_rust().reconcile_slots(
            self._backend,
            payload,
            mode=_enum_text(control.mode),
            verbosity=_enum_text(control.verbosity),
        )
        outcome = _outcome_from_payload(result["outcome"])
        if not outcome.accepted:
            return outcome
        self.state.refresh()
        refreshed = [self._get_slot(i) for i in range(self.max_slots)]
        self.account.observe_slots(refreshed)
        for current in refreshed:
            self.projection.write_slot(current)
            self.zinc_plane.write_slot(current)
        return outcome

    def set_seed_capital(self, seed: float) -> None:
        """Set the kernel's seed capital for K-value fold. Call once at init.

        Raises:
            RuntimeError: if the backend reports failure.
        """
        if not _get_rust().set_seed_capital(self._backend, float(seed)):
            raise RuntimeError("dita_kernel_set_seed_capital failed")

    def set_exchange_config(self, config: Dict[str, Any]) -> None:
        """
        Load the exchange fee schedule into the kernel's fee prediction model.
        config keys (all optional — unset keys keep defaults):
          taker_rate              float  fraction (0.0005 = 0.05%)
          maker_rate              float  fraction (0.0002 = 0.02%)
          lot_step                float  quantity increment
          tick_size               float  price increment
          funding_interval_secs   int    seconds between funding payments (28800 = 8 h)
        After this call the kernel can predict fees at fill time so K-capital
        tracks E-wallet without waiting for the WS FILL_SETTLED event.

        Raises:
            RuntimeError: if the backend reports failure, so a rejected
                config cannot go unnoticed.
        """
        if not _get_rust().set_exchange_config(self._backend, config):
            raise RuntimeError("dita_kernel_set_exchange_config_json failed")

    def calibrate_fee(
        self,
        fill_price: float,
        fill_qty: float,
        actual_fee: float,
    ) -> Dict[str, Any]:
        """
        Validate the fee model against one known fill.
        Returns:
          expected_fee         model prediction before calibration
          actual_fee           exchange-reported value
          ratio                actual / expected
          deviation_pct        |ratio - 1| × 100
          calibration_status   "OK" (<1%) / "WARN" (<5%) / "ERROR" (≥5%)
          calibration_ratio    updated EMA of actual/expected ratio
          calibration_samples  fills seen so far
        Call once on startup with a recent fill from account history before
        enabling live trading. If status == ERROR, the fee model needs manual
        review before K-capital figures can be trusted.
        An empty dict is returned when the backend gives no reply.
        """
        return _get_rust().calibrate_fee(self._backend, float(fill_price), float(fill_qty), float(actual_fee))

    def on_account_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
        """
        Apply an account-level exchange event atomically to the kernel.
        event dict must have "kind" in {"FILL_SETTLED","ACCOUNT_UPDATE","FUNDING_FEE"}
        plus the relevant numeric fields (see Rust FFI doc).
        Returns the resulting account state dict including reconcile_status,
        available_capital (E rules when present), k_capital, event_seq.
        An empty dict is returned when the backend gives no reply.
        """
        return _get_rust().on_account_event(self._backend, event)

    def snapshot(self) -> Dict[str, Any]:
        """Return control, all slots and a merged account view as plain dicts.

        The account section combines the Python ``AccountProjection`` snapshot
        (legacy keys) with the backend's ``account`` object (``k_*`` / ``e_*``
        / reconcile keys); keys absent from the backend fall back to the
        Python values or neutral defaults.
        """
        # Merge kernel Rust snapshot (includes AccountState) with Python state.
        rust_snap = _get_rust().snapshot(self._backend)
        rust_account = rust_snap.get("account", {})
        legacy = self.account.snapshot
        return {
            "control": self.control.as_dict(),
            "slots": [self._get_slot(slot.slot_id).to_dict() for slot in self.state.slots],
            "account": {
                # Legacy fields (Python AccountProjection) — backward compat
                "capital": legacy.capital,
                "equity": legacy.equity,
                "realized_pnl": legacy.realized_pnl,
                "unrealized_pnl": legacy.unrealized_pnl,
                "open_positions": legacy.open_positions,
                "open_notional": legacy.open_notional,
                "leverage": legacy.leverage,
                # V2 — kernel atomic K/E account (E rules; K is parallel check)
                "k_capital": rust_account.get("k_capital", legacy.capital),
                "k_realized_pnl": rust_account.get("k_realized_pnl", legacy.realized_pnl),
                "k_fees_paid": rust_account.get("k_fees_paid", legacy.fees_paid),
                "k_funding_net": rust_account.get("k_funding_net", 0.0),
                "e_wallet_balance": rust_account.get("e_wallet_balance", 0.0),
                "e_available_margin": rust_account.get("e_available_margin", 0.0),
                "e_used_margin": rust_account.get("e_used_margin", 0.0),
                "e_maint_margin": rust_account.get("e_maint_margin", 0.0),
                # available_capital: E rules when present, K as fallback
                "available_capital": rust_account.get("available_capital", legacy.capital),
                "reconcile_status": rust_account.get("reconcile_status", "OK"),
                "reconcile_delta": rust_account.get("reconcile_delta", 0.0),
                "reconcile_explanation": rust_account.get("reconcile_explanation", ""),
                "event_seq": rust_account.get("event_seq", 0),
            },
        }