Two bugs causing INVALID_INTENT_PARSE at FFI boundary: 1. Dangling pointer: ctypes.c_char_p stores a raw C pointer without incrementing the Python refcount. Temporaries passed inline are freed by CPython before the Rust FFI call executes, giving Rust a dangling pointer whose freed memory looks like truncated JSON (column 41). Fix: assign bytes to local vars (_pb/_mb/_vb) to hold refs alive. 2. venue.submit guard: process_intent() called venue.submit() even when the kernel returned INVALID_INTENT, cascading a 30s BingX timeout into a fatal crash. Fix: gate on outcome.accepted. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1018 lines
43 KiB
Python
1018 lines
43 KiB
Python
"""Rust-backed DITAv2 execution kernel.
|
||
|
||
This module keeps the Python API shape stable while moving the kernel state
|
||
machine into a Rust shared library. Slot views write through to the backend on
|
||
assignment, then the Python side mirrors the resulting state into Zinc and the
|
||
existing projections/journals.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from dataclasses import asdict
|
||
from datetime import datetime, timezone
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||
import ctypes
|
||
import json
|
||
import math
|
||
import os
|
||
import subprocess
|
||
import sys
|
||
|
||
from .account import AccountProjection
|
||
from .control import ControlPlane, ControlUpdate, KernelControlSnapshot, KernelVerbosity, build_control_plane
|
||
from .contracts import (
|
||
KernelCommandType,
|
||
KernelDiagnosticCode,
|
||
KernelEventKind,
|
||
KernelIntent,
|
||
KernelOutcome,
|
||
KernelSeverity,
|
||
KernelTransition,
|
||
TradeSide,
|
||
TradeSlot,
|
||
TradeStage,
|
||
VenueEvent,
|
||
VenueOrder,
|
||
VenueOrderStatus,
|
||
VenueEventStatus,
|
||
)
|
||
from .journal import KernelJournal, MemoryKernelJournal
|
||
from .mock_venue import MockVenueAdapter
|
||
from .projection import HazelcastProjection
|
||
from .projection import build_projection
|
||
from .utils import json_safe
|
||
from .venue import VenueAdapter
|
||
from .zinc_plane import InMemoryZincPlane, ZincPlane
|
||
|
||
|
||
def _repo_root() -> Path:
|
||
return Path(__file__).resolve().parents[3]
|
||
|
||
|
||
# ── Rust FFI JSON encoding ────────────────────────────────────────────────────
|
||
#
|
||
# All JSON that crosses the Python→Rust boundary via ctypes.c_char_p MUST be
|
||
# null-byte-free. ctypes.c_char_p is a C null-terminated string: the first
|
||
# 0x00 byte silently terminates the string, so Rust's serde_json parser only
|
||
# sees a truncated payload → "premature end of input at column N".
|
||
#
|
||
# Root cause: json.dumps(..., ensure_ascii=False).encode("utf-8") can produce
|
||
# 0x00 bytes when any string value contains the Unicode null character U+0000
|
||
# (e.g. from BingX event IDs, venue order IDs, or metadata fields).
|
||
#
|
||
# Two-layer fix:
|
||
# 1. _json_null_clean() — strip null chars from all string values before JSON
|
||
# 2. ensure_ascii=True — encode to ASCII; guarantees no 0x00 in output
|
||
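#    (UTF-8 encodes U+0000 as 0x00; ASCII refuses non-ASCII entirely)
#    NOTE(review): Python's json encoder appears to escape U+0000 as the
#    six-character text \u0000 even with ensure_ascii=False, so a raw 0x00 byte
#    should not come out of json.dumps at all — confirm the root cause above.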
|
||
#
|
||
# Long-term path: change the Rust FFI to accept (ptr, len) pairs instead of
|
||
# null-terminated strings, removing the c_char_p truncation class entirely.
|
||
# That requires Rust changes; this fix is safe and sufficient until then.
|
||
|
||
def _json_null_clean(obj: Any) -> Any:
|
||
"""Recursively replace U+0000 null chars in string values with U+FFFD.
|
||
|
||
Null chars are invisible in logs and repr() but produce 0x00 bytes in
|
||
UTF-8 encoding that silently truncate ctypes c_char_p payloads.
|
||
"""
|
||
if isinstance(obj, str):
|
||
return obj.replace("\x00", "<EFBFBD>") if "\x00" in obj else obj
|
||
if isinstance(obj, dict):
|
||
return {_json_null_clean(k): _json_null_clean(v) for k, v in obj.items()}
|
||
if isinstance(obj, (list, tuple)):
|
||
return [_json_null_clean(v) for v in obj]
|
||
return obj
|
||
|
||
|
||
def _to_rust_bytes(obj: Any) -> bytes:
    """Encode *obj* as compact ASCII JSON bytes for a ``ctypes.c_char_p`` argument.

    The value is passed through ``json_safe`` and ``_json_null_clean`` and then
    dumped with ``ensure_ascii=True``.  Should a 0x00 byte still be present in
    the encoded output, an error is logged and each such byte is replaced by
    ``b"?"`` so the anomaly is visible instead of silent.
    """
    sanitized = _json_null_clean(json_safe(obj))
    text = json.dumps(sanitized, separators=(",", ":"), ensure_ascii=True)
    payload = text.encode("ascii")
    if b"\x00" not in payload:
        return payload
    # Defensive path only: report the anomaly, then neutralise the byte.
    import logging as _log
    _log.getLogger(__name__).error(
        "BUG: null byte in Rust-bound JSON after sanitization — replacing. "
        "Inspect _json_null_clean() for uncovered type."
    )
    return payload.replace(b"\x00", b"?")
|
||
|
||
|
||
def _crate_dir() -> Path:
|
||
return Path(__file__).resolve().with_name("_rust_kernel")
|
||
|
||
|
||
def _library_path() -> Path:
    """Return the expected release-build path of the kernel shared library.

    The file name depends on the platform: ``.dylib`` when ``sys.platform`` is
    ``"darwin"``, ``.dll`` when ``os.name`` is ``"nt"``, ``.so`` otherwise.
    """
    if sys.platform == "darwin":
        filename = "libdita_v2_kernel.dylib"
    else:
        filename = "dita_v2_kernel.dll" if os.name == "nt" else "libdita_v2_kernel.so"
    return _crate_dir().joinpath("target", "release", filename)
|
||
|
||
|
||
def _build_library() -> None:
    """Run ``cargo build --release`` for the Rust kernel crate.

    Raises:
        FileNotFoundError: the crate directory does not exist.
        subprocess.CalledProcessError: cargo exited non-zero (``check=True``).
    """
    crate = _crate_dir()
    if not crate.exists():
        raise FileNotFoundError(f"Missing Rust kernel crate: {crate}")
    manifest = str(crate / "Cargo.toml")
    command = ["cargo", "build", "--release", "--manifest-path", manifest]
    subprocess.run(command, cwd=_repo_root(), check=True)
|
||
|
||
|
||
def _ensure_library() -> Path:
    """Return the shared-library path, triggering a build first if it is absent."""
    library = _library_path()
    if library.exists():
        return library
    _build_library()
    return library
|
||
|
||
|
||
class _RustKernelLib:
|
||
def __init__(self) -> None:
|
||
path = _ensure_library()
|
||
self.lib = ctypes.CDLL(str(path))
|
||
self.lib.dita_kernel_create.argtypes = [ctypes.c_size_t]
|
||
self.lib.dita_kernel_create.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_destroy.argtypes = [ctypes.c_void_p]
|
||
self.lib.dita_kernel_destroy.restype = None
|
||
self.lib.dita_kernel_free_string.argtypes = [ctypes.c_void_p]
|
||
self.lib.dita_kernel_free_string.restype = None
|
||
self.lib.dita_kernel_get_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t]
|
||
self.lib.dita_kernel_get_slot_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_set_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_char_p]
|
||
self.lib.dita_kernel_set_slot_json.restype = ctypes.c_int
|
||
self.lib.dita_kernel_process_intent_json.argtypes = [
|
||
ctypes.c_void_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
]
|
||
self.lib.dita_kernel_process_intent_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_on_venue_event_json.argtypes = [
|
||
ctypes.c_void_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
]
|
||
self.lib.dita_kernel_on_venue_event_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_reconcile_slots_json.argtypes = [
|
||
ctypes.c_void_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
ctypes.c_char_p,
|
||
]
|
||
self.lib.dita_kernel_reconcile_slots_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_snapshot_json.argtypes = [ctypes.c_void_p]
|
||
self.lib.dita_kernel_snapshot_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_set_seed_capital.argtypes = [ctypes.c_void_p, ctypes.c_double]
|
||
self.lib.dita_kernel_set_seed_capital.restype = ctypes.c_int
|
||
self.lib.dita_kernel_set_exchange_config_json.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
|
||
self.lib.dita_kernel_set_exchange_config_json.restype = ctypes.c_int
|
||
self.lib.dita_kernel_calibrate_fee_json.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
|
||
self.lib.dita_kernel_calibrate_fee_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_on_account_event_json.argtypes = [
|
||
ctypes.c_void_p,
|
||
ctypes.c_char_p,
|
||
]
|
||
self.lib.dita_kernel_on_account_event_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_save_state_json.argtypes = [ctypes.c_void_p]
|
||
self.lib.dita_kernel_save_state_json.restype = ctypes.c_void_p
|
||
self.lib.dita_kernel_restore_state_json.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
|
||
self.lib.dita_kernel_restore_state_json.restype = ctypes.c_int
|
||
self.lib.dita_kernel_is_capital_frozen.argtypes = [ctypes.c_void_p]
|
||
self.lib.dita_kernel_is_capital_frozen.restype = ctypes.c_int
|
||
|
||
def create(self, max_slots: int) -> ctypes.c_void_p:
|
||
handle = self.lib.dita_kernel_create(ctypes.c_size_t(max_slots))
|
||
if not handle:
|
||
raise RuntimeError("dita_kernel_create failed")
|
||
return ctypes.c_void_p(handle)
|
||
|
||
def destroy(self, handle: ctypes.c_void_p) -> None:
|
||
if handle and handle.value:
|
||
self.lib.dita_kernel_destroy(handle)
|
||
|
||
def _take_string(self, raw: ctypes.c_void_p) -> str:
|
||
if not raw:
|
||
raise RuntimeError("Rust kernel returned null string")
|
||
text = ctypes.cast(raw, ctypes.c_char_p).value
|
||
if text is None:
|
||
self.lib.dita_kernel_free_string(raw)
|
||
raise RuntimeError("Rust kernel returned empty string")
|
||
try:
|
||
return text.decode("utf-8")
|
||
finally:
|
||
self.lib.dita_kernel_free_string(raw)
|
||
|
||
def get_slot_json(self, handle: ctypes.c_void_p, slot_id: int) -> Dict[str, Any]:
|
||
raw = self.lib.dita_kernel_get_slot_json(handle, ctypes.c_size_t(slot_id))
|
||
if not raw:
|
||
raise IndexError(f"Invalid slot id: {slot_id}")
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def set_slot_json(self, handle: ctypes.c_void_p, slot_id: int, payload: Dict[str, Any]) -> None:
|
||
rc = self.lib.dita_kernel_set_slot_json(handle, ctypes.c_size_t(slot_id), ctypes.c_char_p(_to_rust_bytes(payload)))
|
||
if rc != 0:
|
||
raise RuntimeError(f"dita_kernel_set_slot_json failed rc={rc}")
|
||
|
||
def process_intent(
|
||
self,
|
||
handle: ctypes.c_void_p,
|
||
payload: Dict[str, Any],
|
||
*,
|
||
mode: str,
|
||
verbosity: str,
|
||
) -> Dict[str, Any]:
|
||
# Keep local refs so CPython's ref-count doesn't free the bytes objects
|
||
# before the Rust FFI call completes. ctypes.c_char_p stores a raw pointer
|
||
# without incrementing the Python refcount; a temporary would be freed
|
||
# after c_char_p() returns, giving Rust a dangling pointer.
|
||
_pb = _to_rust_bytes(payload)
|
||
_mb = mode.encode("ascii")
|
||
_vb = verbosity.encode("ascii")
|
||
raw = self.lib.dita_kernel_process_intent_json(
|
||
handle,
|
||
ctypes.c_char_p(_pb),
|
||
ctypes.c_char_p(_mb),
|
||
ctypes.c_char_p(_vb),
|
||
)
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def on_venue_event(
|
||
self,
|
||
handle: ctypes.c_void_p,
|
||
payload: Dict[str, Any],
|
||
*,
|
||
mode: str,
|
||
verbosity: str,
|
||
) -> Dict[str, Any]:
|
||
_pb = _to_rust_bytes(payload)
|
||
_mb = mode.encode("ascii")
|
||
_vb = verbosity.encode("ascii")
|
||
raw = self.lib.dita_kernel_on_venue_event_json(
|
||
handle,
|
||
ctypes.c_char_p(_pb),
|
||
ctypes.c_char_p(_mb),
|
||
ctypes.c_char_p(_vb),
|
||
)
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def reconcile_slots(
|
||
self,
|
||
handle: ctypes.c_void_p,
|
||
payload: Sequence[Dict[str, Any]],
|
||
*,
|
||
mode: str,
|
||
verbosity: str,
|
||
) -> Dict[str, Any]:
|
||
_pb = _to_rust_bytes(list(payload))
|
||
_mb = mode.encode("ascii")
|
||
_vb = verbosity.encode("ascii")
|
||
raw = self.lib.dita_kernel_reconcile_slots_json(
|
||
handle,
|
||
ctypes.c_char_p(_pb),
|
||
ctypes.c_char_p(_mb),
|
||
ctypes.c_char_p(_vb),
|
||
)
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def snapshot(self, handle: ctypes.c_void_p) -> Dict[str, Any]:
|
||
raw = self.lib.dita_kernel_snapshot_json(handle)
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def set_seed_capital(self, handle: ctypes.c_void_p, seed: float) -> bool:
|
||
rc = self.lib.dita_kernel_set_seed_capital(handle, ctypes.c_double(seed))
|
||
return rc == 0
|
||
|
||
def set_exchange_config(self, handle: ctypes.c_void_p, config: Dict[str, Any]) -> bool:
|
||
encoded = _to_rust_bytes(config)
|
||
rc = self.lib.dita_kernel_set_exchange_config_json(handle, ctypes.c_char_p(encoded))
|
||
return rc == 0
|
||
|
||
def calibrate_fee(self, handle: ctypes.c_void_p, fill_price: float, fill_qty: float, actual_fee: float, is_maker: bool = False) -> Dict[str, Any]:
|
||
payload = _to_rust_bytes({"fill_price": fill_price, "fill_qty": fill_qty, "actual_fee": actual_fee, "is_maker": is_maker})
|
||
raw = self.lib.dita_kernel_calibrate_fee_json(handle, ctypes.c_char_p(payload))
|
||
if not raw:
|
||
return {}
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def on_account_event(
|
||
self, handle: ctypes.c_void_p, event: Dict[str, Any]
|
||
) -> Dict[str, Any]:
|
||
_eb = _to_rust_bytes(event)
|
||
raw = self.lib.dita_kernel_on_account_event_json(handle, ctypes.c_char_p(_eb))
|
||
if not raw:
|
||
return {}
|
||
return json.loads(self._take_string(raw))
|
||
|
||
def save_state(self, handle: ctypes.c_void_p) -> str:
|
||
"""Serialise full kernel state (slots + account + fee calibration) to JSON."""
|
||
raw = self.lib.dita_kernel_save_state_json(handle)
|
||
if not raw:
|
||
raise RuntimeError("dita_kernel_save_state_json returned NULL")
|
||
return self._take_string(raw)
|
||
|
||
def restore_state(self, handle: ctypes.c_void_p, json_str: str) -> bool:
|
||
"""Restore kernel from a previously saved JSON blob. Returns True on success."""
|
||
# json_str comes from Rust's save_state_json — re-encode via _to_rust_bytes so any
|
||
# embedded null chars are sanitized rather than silently truncating the restore.
|
||
encoded = _to_rust_bytes(json.loads(json_str))
|
||
rc = self.lib.dita_kernel_restore_state_json(handle, ctypes.c_char_p(encoded))
|
||
return rc == 0
|
||
|
||
def is_capital_frozen(self, handle: ctypes.c_void_p) -> bool:
|
||
"""Return True if the kernel's capital is frozen (reconcile ERROR active)."""
|
||
rc = self.lib.dita_kernel_is_capital_frozen(handle)
|
||
return rc == 1
|
||
|
||
|
||
_RUST: _RustKernelLib | None = None  # lazy init — avoids Rust build on import


def _get_rust() -> _RustKernelLib:
    """Return the module-wide ``_RustKernelLib``, constructing it on first use."""
    global _RUST
    instance = _RUST
    if instance is None:
        instance = _RustKernelLib()
        _RUST = instance
    return instance
|
||
|
||
|
||
def _safe_enum(enum_cls, raw: str, default):
|
||
"""H6: parse enum from FFI string without crashing on unknown variants."""
|
||
try:
|
||
return enum_cls(raw)
|
||
except (ValueError, KeyError):
|
||
return default
|
||
|
||
|
||
def _slot_to_payload(slot: TradeSlot) -> Dict[str, Any]:
|
||
return slot.to_dict()
|
||
|
||
|
||
def _order_to_payload(order: Optional[VenueOrder]) -> Optional[Dict[str, Any]]:
|
||
if order is None:
|
||
return None
|
||
return {
|
||
"internal_trade_id": order.internal_trade_id,
|
||
"venue_order_id": order.venue_order_id,
|
||
"venue_client_id": order.venue_client_id,
|
||
"side": order.side.value,
|
||
"intended_size": float(order.intended_size or 0.0),
|
||
"filled_size": float(order.filled_size or 0.0),
|
||
"average_fill_price": float(order.average_fill_price or 0.0),
|
||
"status": order.status.value,
|
||
"metadata": dict(order.metadata),
|
||
}
|
||
|
||
|
||
def _order_from_payload(payload: Optional[Dict[str, Any]], *, trade_id: str) -> Optional[VenueOrder]:
|
||
if not isinstance(payload, dict):
|
||
return None
|
||
return VenueOrder(
|
||
internal_trade_id=trade_id,
|
||
venue_order_id=str(payload.get("venue_order_id", "")),
|
||
venue_client_id=str(payload.get("venue_client_id", "")),
|
||
side=TradeSide(str(payload.get("side", TradeSide.FLAT.value))),
|
||
intended_size=float(payload.get("intended_size", 0.0)),
|
||
filled_size=float(payload.get("filled_size", 0.0)),
|
||
average_fill_price=float(payload.get("average_fill_price", 0.0)),
|
||
status=VenueOrderStatus(str(payload.get("status", VenueOrderStatus.NEW.value))),
|
||
metadata=dict(payload.get("metadata", {})),
|
||
)
|
||
|
||
|
||
def _slot_from_payload(payload: Dict[str, Any]) -> TradeSlot:
    """Rebuild a ``TradeSlot`` from the dict decoded off the Rust side.

    Missing keys fall back to neutral defaults; timestamps are parsed with
    ``datetime.fromisoformat`` only when present and truthy; ``fsm_state``
    tolerates unknown variants via ``_safe_enum``.
    """
    get = payload.get

    def _number(key: str) -> float:
        return float(get(key, 0.0))

    def _moment(key: str) -> Optional[datetime]:
        return datetime.fromisoformat(payload[key]) if get(key) else None

    def _order(key: str) -> Optional[VenueOrder]:
        return _order_from_payload(get(key), trade_id=str(get("trade_id", "")))

    return TradeSlot(
        slot_id=int(get("slot_id", 0)),
        trade_id=str(get("trade_id", "")),
        asset=str(get("asset", "")),
        side=TradeSide(str(get("side", TradeSide.FLAT.value))),
        entry_price=_number("entry_price"),
        size=_number("size"),
        initial_size=_number("initial_size"),
        leverage=_number("leverage"),
        entry_time=_moment("entry_time"),
        unrealized_pnl=_number("unrealized_pnl"),
        realized_pnl=_number("realized_pnl"),
        closed=bool(get("closed", False)),
        exit_leg_ratios=tuple(float(ratio) for ratio in get("exit_leg_ratios", (1.0,))),
        active_leg_index=int(get("active_leg_index", 0)),
        active_exit_order=_order("active_exit_order"),
        active_entry_order=_order("active_entry_order"),
        fsm_state=_safe_enum(TradeStage, str(get("fsm_state", TradeStage.IDLE.value)), TradeStage.IDLE),
        close_reason=str(get("close_reason", "")),
        last_event_time=_moment("last_event_time"),
        seen_event_ids=tuple(str(seen) for seen in get("seen_event_ids", ())),
        metadata=dict(get("metadata", {})),
    )
|
||
|
||
|
||
def _first_invalid_intent_field(intent: KernelIntent) -> Optional[tuple[str, float]]:
|
||
"""Return (field, value) for the first non-finite or out-of-bounds numeric
|
||
field on an intent, or None if all are sane. Guards the kernel boundary
|
||
against inf/NaN that would otherwise crash serde_json serialization."""
|
||
scalar_checks = (
|
||
("target_size", float(intent.target_size if intent.target_size is not None else 0.0)),
|
||
("reference_price", float(intent.reference_price if intent.reference_price is not None else 0.0)),
|
||
("leverage", float(intent.leverage if intent.leverage is not None else 0.0)),
|
||
("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)),
|
||
)
|
||
for name, value in scalar_checks:
|
||
if not math.isfinite(value):
|
||
return (name, value)
|
||
for idx, ratio in enumerate(intent.exit_leg_ratios or ()): # type: ignore[union-attr]
|
||
rv = float(ratio if ratio is not None else 0.0)
|
||
if not math.isfinite(rv):
|
||
return (f"exit_leg_ratios[{idx}]", rv)
|
||
size = float(intent.target_size if intent.target_size is not None else 0.0)
|
||
if size < 0.0:
|
||
return ("target_size", size)
|
||
return None
|
||
|
||
|
||
def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]:
|
||
return {
|
||
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
|
||
"intent_id": intent.intent_id,
|
||
"trade_id": intent.trade_id,
|
||
"slot_id": intent.slot_id,
|
||
"asset": intent.asset,
|
||
"side": intent.side.value,
|
||
"action": intent.action.value,
|
||
"reference_price": float(intent.reference_price or 0.0),
|
||
"target_size": float(intent.target_size or 0.0),
|
||
"leverage": float(intent.leverage or 0.0),
|
||
"exit_leg_ratios": list(intent.exit_leg_ratios),
|
||
"reason": intent.reason,
|
||
"metadata": dict(intent.metadata),
|
||
"stage": intent.stage.value,
|
||
"order_type": getattr(intent, "order_type", "MARKET"),
|
||
"limit_price": float(getattr(intent, "limit_price", 0.0) or 0.0),
|
||
}
|
||
|
||
|
||
def _event_to_payload(event: VenueEvent) -> Dict[str, Any]:
|
||
return {
|
||
"timestamp": event.timestamp.isoformat() if hasattr(event.timestamp, "isoformat") else str(event.timestamp),
|
||
"event_id": event.event_id,
|
||
"trade_id": event.trade_id,
|
||
"slot_id": event.slot_id,
|
||
"kind": event.kind.value,
|
||
"status": event.status.value,
|
||
"venue_order_id": event.venue_order_id,
|
||
"venue_client_id": event.venue_client_id,
|
||
"side": event.side.value,
|
||
"asset": event.asset,
|
||
"price": float(event.price or 0.0),
|
||
"size": float(event.size or 0.0),
|
||
"filled_size": float(event.filled_size or 0.0),
|
||
"remaining_size": float(event.remaining_size or 0.0),
|
||
"reason": event.reason,
|
||
"raw_payload": dict(event.raw_payload),
|
||
"metadata": dict(event.metadata),
|
||
}
|
||
|
||
|
||
def _transition_from_payload(payload: Dict[str, Any]) -> KernelTransition:
    """Rebuild a ``KernelTransition`` from its decoded dict form.

    ``timestamp`` is required and parsed with ``datetime.fromisoformat``; the
    other keys are optional.  Unknown stage names fall back to
    ``TradeStage.IDLE`` through ``_safe_enum``.
    """
    get = payload.get

    def _text(key: str) -> str:
        return str(get(key, ""))

    def _stage(key: str) -> TradeStage:
        return _safe_enum(TradeStage, str(get(key, TradeStage.IDLE.value)), TradeStage.IDLE)

    return KernelTransition(
        timestamp=datetime.fromisoformat(payload["timestamp"]),
        trade_id=_text("trade_id"),
        slot_id=int(get("slot_id", 0)),
        prev_state=_stage("prev_state"),
        next_state=_stage("next_state"),
        trigger=_text("trigger"),
        intent_id=_text("intent_id"),
        event_id=_text("event_id"),
        control_mode=_text("control_mode"),
        control_verbosity=_text("control_verbosity"),
        details=dict(get("details", {})),
    )
|
||
|
||
|
||
def _outcome_from_payload(payload: Dict[str, Any]) -> KernelOutcome:
    """Rebuild a ``KernelOutcome`` (with its transitions and events) from a dict.

    Each entry of ``transitions`` goes through ``_transition_from_payload``;
    each entry of ``emitted_events`` becomes a ``VenueEvent`` and must carry a
    ``timestamp``.  ``state`` tolerates unknown variants via ``_safe_enum``;
    the other enum fields are constructed directly.
    """
    get = payload.get

    def _event(row: Dict[str, Any]) -> VenueEvent:
        field = row.get
        return VenueEvent(
            timestamp=datetime.fromisoformat(row["timestamp"]),
            event_id=str(field("event_id", "")),
            trade_id=str(field("trade_id", "")),
            slot_id=int(field("slot_id", 0)),
            kind=KernelEventKind(str(field("kind", KernelEventKind.ORDER_ACK.value))),
            status=VenueEventStatus(str(field("status", VenueEventStatus.ACKED.value))),
            venue_order_id=str(field("venue_order_id", "")),
            venue_client_id=str(field("venue_client_id", "")),
            side=TradeSide(str(field("side", TradeSide.FLAT.value))),
            asset=str(field("asset", "")),
            price=float(field("price", 0.0)),
            size=float(field("size", 0.0)),
            filled_size=float(field("filled_size", 0.0)),
            remaining_size=float(field("remaining_size", 0.0)),
            reason=str(field("reason", "")),
            raw_payload=dict(field("raw_payload", {})),
            metadata=dict(field("metadata", {})),
        )

    return KernelOutcome(
        accepted=bool(get("accepted", False)),
        slot_id=int(get("slot_id", 0)),
        trade_id=str(get("trade_id", "")),
        state=_safe_enum(TradeStage, str(get("state", TradeStage.IDLE.value)), TradeStage.IDLE),
        diagnostic_code=KernelDiagnosticCode(str(get("diagnostic_code", KernelDiagnosticCode.OK.value))),
        severity=KernelSeverity(str(get("severity", KernelSeverity.INFO.value))),
        transitions=tuple(map(_transition_from_payload, get("transitions", []))),
        emitted_events=tuple(map(_event, get("emitted_events", []))),
        details=dict(get("details", {})),
    )
|
||
|
||
|
||
def _enum_text(value: Any) -> str:
|
||
if hasattr(value, "value"):
|
||
return str(getattr(value, "value"))
|
||
return str(value)
|
||
|
||
|
||
class KernelSlotView:
    """Write-through view over a Rust-backed slot.

    The view stores only the owning kernel and a slot id.  Every attribute
    read fetches a fresh ``TradeSlot`` via ``kernel._get_slot``; every
    attribute write or mutating helper fetches the slot, changes it and pushes
    it back with ``kernel._set_slot``.
    """

    # Names stored on the view itself rather than forwarded to the slot.
    _OWN_ATTRS = frozenset({"_kernel", "_slot_id"})

    def __init__(self, kernel: "ExecutionKernel", slot_id: int) -> None:
        # Bypass our own __setattr__, which would try to write through.
        object.__setattr__(self, "_kernel", kernel)
        object.__setattr__(self, "_slot_id", int(slot_id))

    @property
    def slot_id(self) -> int:
        """Id of the slot this view is bound to."""
        return object.__getattribute__(self, "_slot_id")

    def _snapshot(self) -> TradeSlot:
        """Fetch the current slot state from the kernel."""
        return self._kernel._get_slot(self.slot_id)

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attribute reads to a fresh slot snapshot.

        Raises:
            AttributeError: the slot has no such attribute, or the view's own
                storage (``_kernel`` / ``_slot_id``) has not been set yet.
        """
        # __getattr__ only runs after normal lookup failed.  If one of our own
        # fields is missing (instance made via __new__, e.g. by copy/pickle),
        # calling _snapshot() would look it up again and recurse without end.
        if name in self._OWN_ATTRS:
            raise AttributeError(name)
        slot = self._snapshot()
        if hasattr(slot, name):
            return getattr(slot, name)
        raise AttributeError(name)

    def __setattr__(self, name: str, value: Any) -> None:
        """Write *value* to the slot attribute *name* and push the slot back.

        Raises:
            AttributeError: the slot has no attribute called *name*.
        """
        if name in self._OWN_ATTRS:
            object.__setattr__(self, name, value)
            return
        slot = self._snapshot()
        if not hasattr(slot, name):
            raise AttributeError(name)
        setattr(slot, name, value)
        self._kernel._set_slot(slot)

    def to_dict(self) -> Dict[str, Any]:
        """Return the current slot as a dict."""
        return self._snapshot().to_dict()

    def is_free(self) -> bool:
        """Return the current slot's ``is_free()``."""
        return self._snapshot().is_free()

    def is_open(self) -> bool:
        """Return the current slot's ``is_open()``."""
        return self._snapshot().is_open()

    def mark_price(self, price: float) -> None:
        """Apply ``mark_price(price)`` to the slot and write it back."""
        slot = self._snapshot()
        slot.mark_price(price)
        self._kernel._set_slot(slot)

    def next_exit_ratio(self) -> float:
        """Return the current slot's ``next_exit_ratio()``."""
        return self._snapshot().next_exit_ratio()

    def consume_exit_leg(self) -> float:
        """Call ``consume_exit_leg()`` on the slot, write it back, return the ratio."""
        slot = self._snapshot()
        ratio = slot.consume_exit_leg()
        self._kernel._set_slot(slot)
        return ratio

    def attach_entry_order(self, order: VenueOrder) -> None:
        """Set the slot's ``active_entry_order`` and write the slot back."""
        slot = self._snapshot()
        slot.active_entry_order = order
        self._kernel._set_slot(slot)

    def attach_exit_order(self, order: VenueOrder) -> None:
        """Set the slot's ``active_exit_order`` and write the slot back."""
        slot = self._snapshot()
        slot.active_exit_order = order
        self._kernel._set_slot(slot)

    def __repr__(self) -> str:  # pragma: no cover - debugging helper
        return f"KernelSlotView(slot_id={self.slot_id}, state={self._snapshot().fsm_state.value})"
|
||
|
||
|
||
class KernelStateView:
    """Slot views plus the three lookup indexes mirrored from the backend snapshot."""

    # Snapshot keys copied onto the view by refresh(), in assignment order.
    _INDEX_NAMES = ("active_trade_index", "venue_order_index", "client_order_index")

    def __init__(self, kernel: "ExecutionKernel") -> None:
        self._kernel = kernel
        views = []
        for slot_id in range(kernel.max_slots):
            views.append(KernelSlotView(kernel, slot_id))
        self.slots = views
        self.active_trade_index: Dict[str, int] = {}
        self.venue_order_index: Dict[str, int] = {}
        self.client_order_index: Dict[str, int] = {}
        self.refresh()

    def refresh(self) -> None:
        """Re-read the backend snapshot and replace the three index dicts.

        A key missing from the snapshot yields an empty dict.
        """
        snapshot = self._kernel._snapshot_backend()
        for name in self._INDEX_NAMES:
            setattr(self, name, dict(snapshot.get(name, {})))
|
||
|
||
|
||
class ExecutionKernel:
|
||
"""Rust-backed multi-slot execution kernel."""
|
||
|
||
def __init__(
    self,
    *,
    max_slots: int = 10,
    control_plane: Optional[ControlPlane] = None,
    venue: Optional[VenueAdapter] = None,
    journal: Optional[KernelJournal] = None,
    account: Optional[AccountProjection] = None,
    projection: Optional[HazelcastProjection] = None,
    projection_client: Optional[Any] = None,
    zinc_plane: Optional[ZincPlane] = None,
) -> None:
    """Create the Rust backend and wire up the Python-side collaborators.

    Any collaborator left as ``None`` gets a default instance;
    ``projection_client`` is only used when the default projection is built.
    After the backend is created, the current control snapshot is written to
    the projection and Zinc, the state view is built, and the account observes
    every slot.  Finally any non-free slots already present in Zinc are passed
    to ``reconcile_from_slots``.
    """
    self.max_slots = int(max_slots)
    # Defaults are chosen with ``is None`` rather than ``or`` so that a
    # caller-supplied collaborator that happens to be falsy (for example an
    # empty container-like journal) is not silently replaced.
    self.control_plane = build_control_plane() if control_plane is None else control_plane
    self.venue = MockVenueAdapter() if venue is None else venue
    self.journal = MemoryKernelJournal() if journal is None else journal
    self.account = AccountProjection() if account is None else account
    self.projection = build_projection(client=projection_client) if projection is None else projection
    self.zinc_plane = InMemoryZincPlane() if zinc_plane is None else zinc_plane
    self._backend = _get_rust().create(self.max_slots)
    self._control_snapshot = self.control_plane.read()
    self._last_settled_pnl: Dict[int, float] = {}
    self.projection.write_control(self._control_snapshot)
    self.zinc_plane.update_control(self._control_snapshot)
    self.state = KernelStateView(self)
    self.account.observe_slots([self._get_slot(slot_id) for slot_id in range(self.max_slots)])
    # I14: restore any non-idle slot state that survived in Zinc across
    # a restart. A fresh kernel has all slots IDLE; if Zinc holds slots
    # from a prior session the kernel must re-anchor them so the FSM
    # correctly reflects open/working positions on re-entry.
    _zinc_live = [s for s in self.zinc_plane.read_slots() if not s.is_free()]
    if _zinc_live:
        self.reconcile_from_slots(_zinc_live)
|
||
|
||
def close(self) -> None:
    """Release the Rust kernel handle deterministically (O10).

    Safe to call multiple times, and safe on an instance whose ``__init__``
    failed before the handle was created.  After close() the handle is
    ``None`` and the kernel must not be used any more.
    """
    # getattr: __del__ also runs for half-constructed instances on which
    # _backend was never assigned.
    backend = getattr(self, "_backend", None)
    if backend is None:
        return
    self._backend = None  # prevent double-free via __del__
    try:
        _get_rust().destroy(backend)
    except Exception:
        # Deliberate best-effort: close() is reached from __del__, where an
        # escaping exception cannot be handled by any caller.
        pass
|
||
|
||
def __enter__(self) -> "ExecutionKernel":
|
||
return self
|
||
|
||
def __exit__(self, *_: object) -> None:
|
||
self.close()
|
||
|
||
def __del__(self) -> None: # pragma: no cover - backup for non-with use
|
||
self.close()
|
||
|
||
@property
def control(self) -> KernelControlSnapshot:
    """Control snapshot read from the control plane on every access."""
    snapshot = self.control_plane.read()
    return snapshot
|
||
|
||
def update_control(self, update: ControlUpdate) -> KernelControlSnapshot:
    """Apply *update* to the control plane and mirror the resulting snapshot.

    The new snapshot is cached on the kernel, written to the projection, then
    pushed to the Zinc plane, and finally returned.
    """
    fresh = self.control_plane.update(update)
    self._control_snapshot = fresh
    self.projection.write_control(fresh)
    self.zinc_plane.update_control(fresh)
    return fresh
|
||
|
||
def _snapshot_backend(self) -> Dict[str, Any]:
    """Return the backend snapshot for this kernel's handle as a decoded dict."""
    rust = _get_rust()
    return rust.snapshot(self._backend)
|
||
|
||
def _get_slot(self, slot_id: int) -> TradeSlot:
    """Fetch slot *slot_id* from the backend and rebuild it as a ``TradeSlot``."""
    raw_slot = _get_rust().get_slot_json(self._backend, slot_id)
    return _slot_from_payload(raw_slot)
|
||
|
||
def _set_slot(self, slot: TradeSlot, *, journal: bool = False) -> None:
    """Write *slot* to the backend, then re-mirror state on the Python side.

    After the write, the state indexes are refreshed, the account observes all
    slots as re-read from the backend, and the freshly re-read slot is written
    to the projection and to the Zinc plane.  The ``journal`` flag is accepted
    but not used by this method.
    """
    encoded_slot = _slot_to_payload(slot)
    _get_rust().set_slot_json(self._backend, slot.slot_id, encoded_slot)
    self.state.refresh()

    all_slots = []
    for index in range(self.max_slots):
        all_slots.append(self._get_slot(index))
    self.account.observe_slots(all_slots)

    # Re-read so the mirrors receive what the backend actually stored.
    refreshed = self._get_slot(slot.slot_id)
    self.projection.write_slot(refreshed)
    self.zinc_plane.write_slot(refreshed)
|
||
|
||
def slot(self, slot_id: int) -> KernelSlotView:
    """Return the view for *slot_id*.

    Raises:
        IndexError: ``int(slot_id)`` is outside ``[0, max_slots)``.
    """
    index = int(slot_id)
    if index < 0 or index >= self.max_slots:
        raise IndexError(slot_id)
    return self.state.slots[index]
|
||
|
||
def free_slot(self) -> Optional[KernelSlotView]:
    """Return the first slot view whose ``is_free()`` is true, or ``None``."""
    return next((view for view in self.state.slots if view.is_free()), None)
|
||
|
||
def _record_transitions(self, transitions: Iterable[KernelTransition], slot: TradeSlot, event: Optional[VenueEvent]) -> None:
|
||
if self.control.debug_clickhouse_enabled:
|
||
for transition in transitions:
|
||
self.journal.record_transition(
|
||
transition=transition,
|
||
slot=slot,
|
||
event=event,
|
||
control=self.control,
|
||
)
|
||
|
||
def process_intent(self, intent: KernelIntent) -> KernelOutcome:
    """Run one intent through the Rust kernel and, when applicable, the venue.

    Steps, in order:

    1. Publish the intent on the Zinc plane.
    2. Reject an out-of-range ``slot_id`` (``INVALID_SLOT_ID``) or a field
       flagged by ``_first_invalid_intent_field`` (``INVALID_INTENT``)
       without calling the backend.
    3. Hand the intent to the Rust backend and refresh the slot views.
    4. For an accepted ENTER/EXIT, submit to the venue. For a CANCEL, cancel
       the active exit order or, failing that, an entry order that is still
       in a cancellable stage. Each venue event is fed to ``on_venue_event``.
    5. If the venue emitted a RATE_LIMITED event, replace the outcome with a
       rejected ``RATE_LIMITED`` warning.
    6. Mirror the final slot into the account, the projection and the Zinc
       plane, and journal the kernel transitions.

    Returns:
        The final ``KernelOutcome``: kernel transitions followed by the
        venue-driven ones, the venue events, the slot's final trade id and
        state, and the severity of the (possibly rate-limited) outcome.
    """
    self.zinc_plane.publish_intent(intent)

    slot_id = int(intent.slot_id)
    if not (0 <= slot_id < self.max_slots):
        return KernelOutcome(
            accepted=False,
            slot_id=slot_id,
            trade_id=intent.trade_id,
            state=TradeStage.IDLE,
            diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
            details={"reason": "INVALID_SLOT_ID", "slot_id": slot_id, "intent_id": intent.intent_id},
        )

    # Finiteness / sanity guard at the kernel boundary. A non-finite (inf/NaN)
    # numeric field would make the Rust core's serde_json serialization return
    # a null string (panic). Reject cleanly with INVALID_INTENT instead, naming
    # the offending field + value so the upstream numerical source can be located.
    bad_field = _first_invalid_intent_field(intent)
    if bad_field is not None:
        name, value = bad_field
        return KernelOutcome(
            accepted=False,
            slot_id=slot_id,
            trade_id=intent.trade_id,
            state=self._get_slot(slot_id).fsm_state,
            diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
            severity=KernelSeverity.WARNING,
            details={
                "reason": "INVALID_INTENT",
                "field": name,
                "value": str(value),
                "intent_id": intent.intent_id,
                "action": intent.action.value,
                "asset": intent.asset,
            },
        )

    payload = _intent_to_payload(intent)
    result = _get_rust().process_intent(
        self._backend,
        payload,
        mode=_enum_text(self.control.mode),
        verbosity=_enum_text(self.control.verbosity),
    )
    outcome = _outcome_from_payload(result["outcome"])
    self.state.refresh()

    # A freshly accepted entry starts a new PnL settlement baseline.
    if intent.action == KernelCommandType.ENTER and outcome.accepted:
        self._last_settled_pnl[intent.slot_id] = 0.0

    emitted_events = []
    if outcome.accepted and intent.action in {KernelCommandType.ENTER, KernelCommandType.EXIT}:
        # Only kernel-accepted orders reach the venue.
        emitted_events = self.venue.submit(intent)
    elif intent.action == KernelCommandType.CANCEL:
        # NOTE: this branch does not check outcome.accepted.
        slot_view = self.slot(intent.slot_id)
        if slot_view.active_exit_order is not None:
            emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason)
        elif slot_view.active_entry_order is not None and slot_view.fsm_state in {
            TradeStage.ENTRY_WORKING,
            TradeStage.ORDER_REQUESTED,
            TradeStage.ORDER_SENT,
            TradeStage.IDLE,
        }:
            emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason)

    # Replay every venue event into the kernel and collect its transitions.
    all_venue_transitions: List[KernelTransition] = []
    for event in emitted_events:
        evt_outcome = self.on_venue_event(event)
        all_venue_transitions.extend(evt_outcome.transitions)

    final_slot = self._get_slot(outcome.slot_id)

    rate_limit_event = next((event for event in emitted_events if event.kind == KernelEventKind.RATE_LIMITED), None)
    if rate_limit_event is not None:
        rate_limit_details = dict(outcome.details)
        rate_limit_details.update(
            {
                "reason": rate_limit_event.reason or "RATE_LIMITED",
                "retry_after_ms": int(rate_limit_event.metadata.get("retry_after_ms", 0) or 0),
                "venue_event_kind": rate_limit_event.kind.value,
                "severity": KernelSeverity.WARNING.value,
                "release_eta": "few minutes",
                "retryable": True,
            }
        )
        outcome = KernelOutcome(
            accepted=False,
            slot_id=outcome.slot_id,
            trade_id=outcome.trade_id,
            state=final_slot.fsm_state,
            diagnostic_code=KernelDiagnosticCode.RATE_LIMITED,
            severity=KernelSeverity.WARNING,
            transitions=outcome.transitions,
            emitted_events=outcome.emitted_events,
            details=rate_limit_details,
        )

    all_transitions = list(outcome.transitions) + all_venue_transitions
    final_outcome = KernelOutcome(
        accepted=outcome.accepted,
        slot_id=outcome.slot_id,
        trade_id=final_slot.trade_id,
        state=final_slot.fsm_state,
        diagnostic_code=outcome.diagnostic_code,
        # Carry the severity through; otherwise the WARNING set for a
        # rate-limited outcome would be replaced by the default.
        severity=outcome.severity,
        transitions=tuple(all_transitions),
        emitted_events=tuple(emitted_events),
        details=dict(outcome.details),
    )

    slots = [self._get_slot(i) for i in range(self.max_slots)]
    self.account.observe_slots(slots)
    current = self._get_slot(final_slot.slot_id)
    self.projection.write_slot(current)
    self.zinc_plane.write_slot(current)
    # Only the kernel's own transitions are journaled here; each venue
    # event is passed to on_venue_event above, which journals its own.
    self._record_transitions(outcome.transitions, final_slot, None)
    return final_outcome
|
||
|
||
def on_venue_event(self, event: VenueEvent) -> KernelOutcome:
    """Feed one venue event to the Rust kernel and mirror the result.

    Any change in the slot's realized PnL since the last settlement is
    settled on the account. The account then observes every slot, the slot
    is re-read and written to the projection and the Zinc plane, and the
    outcome's transitions are journaled together with the event.

    Returns:
        The ``KernelOutcome`` decoded from the backend reply.
    """
    reply = _get_rust().on_venue_event(
        self._backend,
        _event_to_payload(event),
        mode=_enum_text(self.control.mode),
        verbosity=_enum_text(self.control.verbosity),
    )
    outcome = _outcome_from_payload(reply["outcome"])

    # An INVALID_* fallback result carries a null slot; fall back to the
    # kernel's current slot so settlement/bookkeeping stays consistent.
    raw_slot = reply.get("slot")
    if raw_slot:
        slot = _slot_from_payload(raw_slot)
    else:
        slot = self._get_slot(int(outcome.slot_id))
    self.state.refresh()

    # Settle only the PnL that has not been settled for this slot yet.
    already_settled = self._last_settled_pnl.get(slot.slot_id, 0.0)
    pnl_delta = slot.realized_pnl - already_settled
    if abs(pnl_delta) > 1e-12:
        self.account.settle(pnl_delta)
        self._last_settled_pnl[slot.slot_id] = slot.realized_pnl

    every_slot = [self._get_slot(index) for index in range(self.max_slots)]
    self.account.observe_slots(every_slot)

    stored = self._get_slot(slot.slot_id)
    self.projection.write_slot(stored)
    self.zinc_plane.write_slot(stored)
    self._record_transitions(outcome.transitions, slot, event)
    return outcome
|
||
|
||
def mark_price(self, asset: str, price: float) -> None:
    """Mark every open slot on ``asset`` to ``price``, then refresh the account.

    The account observes all slots afterwards, whether or not any slot
    matched.
    """
    open_on_asset = (
        view for view in self.state.slots
        if view.asset == asset and view.is_open()
    )
    for view in open_on_asset:
        view.mark_price(price)

    refreshed = [self._get_slot(index) for index in range(self.max_slots)]
    self.account.observe_slots(refreshed)
|
||
|
||
def reconcile_from_slots(self, slots: Sequence[TradeSlot]) -> KernelOutcome:
    """Ask the Rust kernel to reconcile its state against ``slots``.

    On a rejected outcome nothing is mirrored. On acceptance the slot views
    are refreshed, the account observes every slot, and each slot is
    written to the projection and the Zinc plane.

    Returns:
        The ``KernelOutcome`` decoded from the backend reply.
    """
    encoded = [_slot_to_payload(item) for item in slots]
    reply = _get_rust().reconcile_slots(
        self._backend,
        encoded,
        mode=_enum_text(self.control.mode),
        verbosity=_enum_text(self.control.verbosity),
    )
    outcome = _outcome_from_payload(reply["outcome"])
    if outcome.accepted:
        self.state.refresh()
        refreshed = [self._get_slot(index) for index in range(self.max_slots)]
        self.account.observe_slots(refreshed)
        for stored in refreshed:
            self.projection.write_slot(stored)
            self.zinc_plane.write_slot(stored)
    return outcome
|
||
|
||
def set_seed_capital(self, seed: float) -> None:
    """Set the kernel's seed capital for K-value fold. Call once at init.

    ``seed`` is coerced with ``float()`` before it is handed to the backend.
    """
    rust = _get_rust()
    rust.set_seed_capital(self._backend, float(seed))
|
||
|
||
def set_exchange_config(self, config: Dict[str, Any]) -> None:
    """Load the exchange fee schedule into the kernel's fee prediction model.

    ``config`` keys (all optional; unset keys keep their defaults):

        taker_rate             float  fraction (0.0005 = 0.05%)
        maker_rate             float  fraction (0.0002 = 0.02%)
        lot_step               float  quantity increment
        tick_size              float  price increment
        funding_interval_secs  int    seconds between funding payments
                                      (28800 = 8 h)

    After this call the kernel can predict fees at fill time, so K-capital
    tracks E-wallet without waiting for the WS FILL_SETTLED event.
    """
    rust = _get_rust()
    rust.set_exchange_config(self._backend, config)
|
||
|
||
def calibrate_fee(
    self,
    fill_price: float,
    fill_qty: float,
    actual_fee: float,
    is_maker: bool = False,
) -> Dict[str, Any]:
    """Validate the fee model against one known fill.

    Call once on startup with a recent fill from account history before
    enabling live trading. If the status is ERROR, the fee model needs
    manual review before K-capital figures can be trusted.

    Returns:
        The backend's calibration report, with keys:

        expected_fee         model prediction before calibration
        actual_fee           exchange-reported value
        ratio                actual / expected
        deviation_pct        |ratio - 1| × 100
        calibration_status   "OK" (<1%) / "WARN" (<5%) / "ERROR" (≥5%)
        calibration_ratio    updated EMA of actual/expected ratio
        calibration_samples  fills seen so far
    """
    rust = _get_rust()
    # Coerce to plain Python scalars before handing values to the backend.
    price = float(fill_price)
    quantity = float(fill_qty)
    fee = float(actual_fee)
    maker = bool(is_maker)
    return rust.calibrate_fee(self._backend, price, quantity, fee, maker)
|
||
|
||
def on_account_event(self, event: Dict[str, Any]) -> Dict[str, Any]:
    """Apply an account-level exchange event atomically to the kernel.

    Args:
        event: Must have "kind" in {"FILL_SETTLED", "ACCOUNT_UPDATE",
            "FUNDING_FEE"} plus the relevant numeric fields (see the Rust
            FFI doc).

    Returns:
        The resulting account state dict, including reconcile_status,
        available_capital (E rules when present), k_capital, event_seq,
        capital_frozen (bool) and duplicate_event (bool if deduplicated).
    """
    rust = _get_rust()
    return rust.on_account_event(self._backend, event)
|
||
|
||
# ------------------------------------------------------------------
|
||
# Snapshot / restore — session-to-session state continuity
|
||
# ------------------------------------------------------------------
|
||
|
||
def save_state(self) -> str:
    """Serialise the full kernel state (slots + account + fee calibration) to JSON.

    Treat the returned string as opaque: hand it unchanged to
    ``restore_state()`` at the next session start to resume without losing
    fee calibration or slot state.
    """
    rust = _get_rust()
    return rust.save_state(self._backend)
|
||
|
||
def restore_state(self, json_str: str) -> bool:
    """Restore the kernel from a previously saved state JSON blob.

    Safe to call on a fresh kernel (e.g. after startup) before any trades.

    Returns:
        True on success. False, leaving the state unchanged, on:

        - schema version mismatch
        - slot count mismatch
        - parse error
        - non-finite capital
    """
    rust = _get_rust()
    return rust.restore_state(self._backend, json_str)
|
||
|
||
def is_capital_frozen(self) -> bool:
    """Return True if the kernel's capital is frozen (reconcile ERROR active).

    While frozen, process_intent rejects all ENTER intents with
    CAPITAL_FROZEN until the next ACCOUNT_UPDATE that brings reconcile back
    to OK.
    """
    rust = _get_rust()
    return rust.is_capital_frozen(self._backend)
|
||
|
||
def snapshot(self) -> Dict[str, Any]:
    """Return a combined view of control, slots and account state.

    The account section merges the Python ``AccountProjection`` snapshot
    (legacy fields) with the ``account`` section of the Rust snapshot.
    A kernel field missing from the Rust snapshot falls back to the Python
    value or to a neutral default.
    """
    # Merge kernel Rust snapshot (includes AccountState) with Python state.
    kernel_account = _get_rust().snapshot(self._backend).get("account", {})
    control_view = self.control.as_dict()
    slot_views = [self._get_slot(view.slot_id).to_dict() for view in self.state.slots]

    projected = self.account.snapshot

    # Legacy fields (Python AccountProjection) — backward compat
    legacy_names = (
        "capital",
        "equity",
        "realized_pnl",
        "unrealized_pnl",
        "open_positions",
        "open_notional",
        "leverage",
    )
    account_view: Dict[str, Any] = {name: getattr(projected, name) for name in legacy_names}

    # V2 — kernel atomic K/E account (E rules; K is parallel check).
    # Each pair is (key, fallback used when the Rust snapshot lacks the key).
    kernel_fallbacks = (
        ("k_capital", projected.capital),
        ("k_realized_pnl", projected.realized_pnl),
        ("k_fees_paid", projected.fees_paid),
        ("k_funding_net", 0.0),
        ("e_wallet_balance", 0.0),
        ("e_available_margin", 0.0),
        ("e_used_margin", 0.0),
        ("e_maint_margin", 0.0),
        # available_capital: E rules when present, K as fallback
        ("available_capital", projected.capital),
        ("reconcile_status", "OK"),
        ("reconcile_delta", 0.0),
        ("reconcile_explanation", ""),
        ("event_seq", 0),
    )
    for key, fallback in kernel_fallbacks:
        account_view[key] = kernel_account.get(key, fallback)

    return {
        "control": control_view,
        "slots": slot_views,
        "account": account_view,
    }
|