From 468984baab3ad58cc0b7ad5bd9248fed50dc31ce Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 1 Jun 2026 21:22:01 +0200 Subject: [PATCH] PINK: Rust kernel atomic K/E account layer (AccountState + FFI) AccountState in KernelCore/KernelSnapshot: - K-values: seed_capital, k_realized_pnl, k_fees_paid, k_funding_net - E-facts: e_wallet_balance, e_available_margin, e_used_margin, e_maint_margin - Cached: k_capital, available_capital (E rules when present; K fallback) - Reconcile: OK/WARN(<20)/ERROR(>=20 delta) runs atomically on every event New FFI: dita_kernel_set_seed_capital(handle, seed: f64) -> i32 dita_kernel_on_account_event_json(handle, payload) -> *char Kinds: FILL_SETTLED | ACCOUNT_UPDATE | FUNDING_FEE rust_backend.py: wires set_seed_capital() and on_account_event(); snapshot()[account] exposes both legacy and V2 fields. Smoke-tested: fill->E_update->funding->re-sync all produce correct K/E values and reconcile transitions (OK->OK->WARN->OK). 89/89 offline tests pass. --- .../dita_v2/_rust_kernel/src/lib.rs | 2041 +++++++++++++++++ prod/clean_arch/dita_v2/rust_backend.py | 808 +++++++ 2 files changed, 2849 insertions(+) create mode 100644 prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs create mode 100644 prod/clean_arch/dita_v2/rust_backend.py diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs new file mode 100644 index 0000000..2a4c1e7 --- /dev/null +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -0,0 +1,2041 @@ +#![allow(non_camel_case_types)] + +use std::collections::HashMap; +use std::ffi::{c_char, CStr, CString}; +use std::ptr; + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Map, Value}; + +const MAX_SEEN_EVENT_IDS: usize = 256; + +#[repr(C)] +pub struct KernelHandle { + core: KernelCore, +} + +macro_rules! string_enum { + ( + $(#[$meta:meta])* + enum $name:ident { + $( $variant:ident ),+ $(,)? + } + ) => { + $(#[$meta])* + #[derive(Debug, Clone, PartialEq, Eq)] + enum $name { + $( $variant ),+ + } + + impl $name { + fn as_str(&self) -> &'static str { + match self { + $( Self::$variant => stringify!($variant), )+ + } + } + } + + impl Serialize for $name { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(self.as_str()) + } + } + + impl<'de> Deserialize<'de> for $name { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct Visitor; + + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = $name; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str(concat!("a valid ", stringify!($name))) + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + match value { + $( stringify!($variant) => Ok($name::$variant), )+ + _ => Err(E::custom(format!("invalid {}: {}", stringify!($name), value))), + } + } + } + + deserializer.deserialize_str(Visitor) + } + } + }; +} + +string_enum! { + /// Trade side. + enum TradeSide { + LONG, + SHORT, + FLAT, + } +} + +impl Default for TradeSide { + fn default() -> Self { + Self::FLAT + } +} + +string_enum! { + /// Execution stage for a trade slot. + enum TradeStage { + IDLE, + DECISION_CREATED, + INTENT_CREATED, + ORDER_REQUESTED, + ORDER_SENT, + ORDER_ACKED, + ORDER_REJECTED, + ENTRY_WORKING, + PARTIAL_FILL, + POSITION_OPENED, + POSITION_OPEN, + EXIT_REQUESTED, + EXIT_SENT, + EXIT_ACKED, + EXIT_REJECTED, + EXIT_WORKING, + POSITION_PARTIALLY_CLOSED, + POSITION_CLOSED, + CLOSED, + TRADE_TERMINAL_WRITTEN, + STALE_STATE_RECONCILING, + } +} + +impl Default for TradeStage { + fn default() -> Self { + Self::IDLE + } +} + +string_enum! { + /// Kernel command types. + enum KernelCommandType { + ENTER, + EXIT, + MARK_PRICE, + RECONCILE, + CONTROL, + CANCEL, + } +} + +string_enum! { + /// Normalized venue event kinds. + enum KernelEventKind { + ORDER_ACK, + ORDER_REJECT, + RATE_LIMITED, + PARTIAL_FILL, + FULL_FILL, + CANCEL_ACK, + CANCEL_REJECT, + MARK_PRICE, + RECONCILE, + CONTROL, + } +} + +string_enum! { + /// Structured diagnostic codes emitted by the kernel. + enum KernelDiagnosticCode { + OK, + INVALID_SLOT_ID, + UNSUPPORTED_INTENT, + SLOT_BUSY, + NO_OPEN_POSITION, + NO_ACTIVE_EXIT_ORDER, + RATE_LIMITED, + UNKNOWN_EVENT_KIND, + ORDER_REJECTED, + ENTRY_ORDER_REJECTED, + EXIT_ORDER_REJECTED, + CANCEL_REJECTED, + STALE_STATE_RECONCILE, + RECONCILED, + DUPLICATE_EVENT, + UNRESOLVED_SLOT, + INVALID_TRANSITION, + TERMINAL_STATE, + } +} + +impl Default for KernelDiagnosticCode { + fn default() -> Self { + Self::OK + } +} + +string_enum! { + /// Severity classification for kernel outcomes. + enum KernelSeverity { + INFO, + WARNING, + ERROR, + CRITICAL, + } +} + +impl Default for KernelSeverity { + fn default() -> Self { + Self::INFO + } +} + +string_enum! { + /// Order status surface mirrored from venue truth. + enum VenueOrderStatus { + NEW, + ACKED, + PARTIALLY_FILLED, + FILLED, + CANCELED, + REJECTED, + } +} + +impl Default for VenueOrderStatus { + fn default() -> Self { + Self::NEW + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum VenueEventStatus { + ACKED, + REJECTED, + RATE_LIMITED, + PARTIALLY_FILLED, + FILLED, + CANCELED, + CANCELED_REJECTED, +} + +impl VenueEventStatus { + fn as_str(&self) -> &'static str { + match self { + Self::ACKED => "ACKED", + Self::REJECTED => "REJECTED", + Self::RATE_LIMITED => "RATE_LIMITED", + Self::PARTIALLY_FILLED => "PARTIALLY_FILLED", + Self::FILLED => "FILLED", + Self::CANCELED => "CANCELED", + Self::CANCELED_REJECTED => "CANCEL_REJECTED", + } + } +} + +impl Serialize for VenueEventStatus { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(self.as_str()) + } +} + +impl<'de> Deserialize<'de> for VenueEventStatus { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct Visitor; + + impl<'de> serde::de::Visitor<'de> for Visitor { + type Value = VenueEventStatus; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("a valid VenueEventStatus") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + match value { + "ACKED" => Ok(VenueEventStatus::ACKED), + "REJECTED" => Ok(VenueEventStatus::REJECTED), + "RATE_LIMITED" => Ok(VenueEventStatus::RATE_LIMITED), + "PARTIALLY_FILLED" => Ok(VenueEventStatus::PARTIALLY_FILLED), + "FILLED" => Ok(VenueEventStatus::FILLED), + "CANCELED" => Ok(VenueEventStatus::CANCELED), + "CANCEL_REJECTED" => Ok(VenueEventStatus::CANCELED_REJECTED), + _ => Err(E::custom(format!("invalid VenueEventStatus: {}", value))), + } + } + } + + deserializer.deserialize_str(Visitor) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct VenueOrder { + internal_trade_id: String, + venue_order_id: String, + venue_client_id: String, + side: TradeSide, + intended_size: f64, + filled_size: f64, + average_fill_price: f64, + status: VenueOrderStatus, + #[serde(default)] + metadata: Map, +} + +impl VenueOrder { +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct TradeSlot { + slot_id: usize, + #[serde(default)] + trade_id: String, + #[serde(default)] + asset: String, + #[serde(default)] + side: TradeSide, + #[serde(default)] + entry_price: f64, + #[serde(default)] + size: f64, + #[serde(default)] + initial_size: f64, + #[serde(default)] + leverage: f64, + #[serde(default)] + entry_time: Option>, + #[serde(default)] + unrealized_pnl: f64, + #[serde(default)] + realized_pnl: f64, + #[serde(default)] + closed: bool, + #[serde(default)] + exit_leg_ratios: Vec, + #[serde(default)] + active_leg_index: usize, + #[serde(default)] + active_exit_order: Option, + #[serde(default)] + active_entry_order: Option, + #[serde(default)] + fsm_state: TradeStage, + #[serde(default)] + close_reason: String, + #[serde(default)] + last_event_time: Option>, + #[serde(default)] + seen_event_ids: Vec, + #[serde(default)] + metadata: Map, +} + +impl Default for TradeSlot { + fn default() -> Self { + Self { + slot_id: 0, + trade_id: String::new(), + asset: String::new(), + side: TradeSide::FLAT, + entry_price: 0.0, + size: 0.0, + initial_size: 0.0, + leverage: 0.0, + entry_time: None, + unrealized_pnl: 0.0, + realized_pnl: 0.0, + closed: false, + exit_leg_ratios: vec![1.0], + active_leg_index: 0, + active_exit_order: None, + active_entry_order: None, + fsm_state: TradeStage::IDLE, + close_reason: String::new(), + last_event_time: None, + seen_event_ids: Vec::new(), + metadata: Map::new(), + } + } +} + +impl TradeSlot { + fn is_free(&self) -> bool { + matches!(self.fsm_state, TradeStage::IDLE | TradeStage::CLOSED) + && self.size <= 0.0 + && self.active_entry_order.is_none() + && self.active_exit_order.is_none() + } + + fn mark_price(&mut self, price: f64) { + if !price.is_finite() || price <= 0.0 { + return; + } + if self.entry_price <= 0.0 { + self.entry_price = price; + } + if self.entry_price <= 0.0 || self.size <= 0.0 { + self.unrealized_pnl = 0.0; + return; + } + let mut delta = (price - self.entry_price) / self.entry_price; + if self.side == TradeSide::SHORT { + delta = -delta; + } + self.unrealized_pnl = delta * self.size * self.entry_price * self.leverage; + self.metadata + .insert("mark_price".to_string(), Value::from(price)); + } + + fn next_exit_ratio(&self) -> f64 { + self.exit_leg_ratios + .get(self.active_leg_index) + .copied() + .unwrap_or(1.0) + .clamp(0.0, 1.0) + } + + fn consume_exit_leg(&mut self) -> f64 { + let ratio = self.next_exit_ratio(); + let max_index = self.exit_leg_ratios.len().max(1); + self.active_leg_index = (self.active_leg_index + 1).min(max_index); + ratio + } + + fn attach_entry_order(&mut self, order: VenueOrder) { + self.active_entry_order = Some(order); + } + + fn attach_exit_order(&mut self, order: VenueOrder) { + self.active_exit_order = Some(order); + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct KernelIntent { + timestamp: DateTime, + intent_id: String, + trade_id: String, + slot_id: i64, + asset: String, + side: TradeSide, + action: KernelCommandType, + reference_price: f64, + target_size: f64, + leverage: f64, + #[serde(default)] + exit_leg_ratios: Vec, + #[serde(default)] + reason: String, + #[serde(default)] + metadata: Map, + #[serde(default)] + stage: TradeStage, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct VenueEvent { + timestamp: DateTime, + event_id: String, + trade_id: String, + slot_id: i64, + kind: KernelEventKind, + status: VenueEventStatus, + #[serde(default)] + venue_order_id: String, + #[serde(default)] + venue_client_id: String, + #[serde(default)] + side: TradeSide, + #[serde(default)] + asset: String, + #[serde(default)] + price: f64, + #[serde(default)] + size: f64, + #[serde(default)] + filled_size: f64, + #[serde(default)] + remaining_size: f64, + #[serde(default)] + reason: String, + #[serde(default)] + raw_payload: Map, + #[serde(default)] + metadata: Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct KernelTransition { + timestamp: DateTime, + trade_id: String, + slot_id: usize, + prev_state: TradeStage, + next_state: TradeStage, + trigger: String, + #[serde(default)] + intent_id: String, + #[serde(default)] + event_id: String, + #[serde(default)] + control_mode: String, + #[serde(default)] + control_verbosity: String, + #[serde(default)] + details: Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct KernelOutcome { + accepted: bool, + slot_id: usize, + trade_id: String, + state: TradeStage, + diagnostic_code: KernelDiagnosticCode, + #[serde(default)] + severity: KernelSeverity, + #[serde(default)] + transitions: Vec, + #[serde(default)] + emitted_events: Vec, + #[serde(default)] + details: Map, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct KernelSnapshot { + slots: Vec, + active_trade_index: HashMap, + venue_order_index: HashMap, + client_order_index: HashMap, + account: AccountState, +} + +/// Per-kernel account ledger — K-values + E-facts + reconcile. +/// +/// K-values are computed deterministically from the event stream the kernel +/// has observed. E-facts are stored exactly as received from the exchange. +/// When E-facts are present they are authoritative (E rules); K is a +/// parallel integrity check. On every event the reconcile classifier runs +/// atomically: unexplained divergence between K and E is ERROR. +/// +/// Persisted to Zinc on every mutation so a crash/restart can reload +/// exact state rather than starting from zero. +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct AccountState { + // --- K-values: kernel's parallel computation ----------------------- + /// Starting capital set at kernel init (seed). + seed_capital: f64, + /// Σ realized PnL from all closed exit fills. + k_realized_pnl: f64, + /// Σ fees paid (always ≥ 0; reduces k_capital). + k_fees_paid: f64, + /// Net funding paid (positive = paid out; negative = received). + /// k_capital = seed + k_realized − k_fees − k_funding_net + k_funding_net: f64, + + // --- E-facts: exchange truth, rules when present ------------------- + /// Exchange wallet balance. 0.0 means not yet received. + e_wallet_balance: f64, + /// Exchange-reported available margin. + e_available_margin: f64, + /// Exchange-reported used margin. + e_used_margin: f64, + /// Exchange-reported maintenance margin. + e_maint_margin: f64, + + // --- Versioning + reconcile ---------------------------------------- + /// Monotonically increasing; bumped on every account event. + event_seq: u64, + /// "OK" | "WARN" | "ERROR" + reconcile_status: String, + /// |k_capital − e_wallet_balance|; 0 when no E-facts yet. + reconcile_delta: f64, + /// Human-readable explanation of any divergence. + reconcile_explanation: String, + /// Cached seed+realized−fees−funding_net. Kept as a field so + /// the snapshot JSON is self-contained (serde cannot call methods). + k_capital: f64, + /// Cached available_capital: e_available_margin when E-facts present, + /// k_capital as fallback. E rules. + available_capital: f64, +} + +impl AccountState { + /// Classify K vs E divergence and update all cached fields atomically. + /// Called at the end of every apply_* method. + fn reconcile(&mut self) { + // Recompute cached derived fields + let raw = self.seed_capital + self.k_realized_pnl - self.k_fees_paid - self.k_funding_net; + self.k_capital = if raw.is_finite() { raw } else { self.seed_capital }; + // E rules for available capital; K as fallback when no E-facts yet + self.available_capital = if self.e_wallet_balance > 0.0 { + self.e_available_margin + } else { + self.k_capital + }; + self.reconcile_delta = 0.0; + if self.e_wallet_balance <= 0.0 { + self.reconcile_status = "OK".to_string(); + self.reconcile_explanation = "NO_E_FACTS".to_string(); + return; + } + let delta = (self.k_capital - self.e_wallet_balance).abs(); + self.reconcile_delta = delta; + if delta < 0.01 { + self.reconcile_status = "OK".to_string(); + self.reconcile_explanation = String::new(); + } else if delta < 20.0 { + self.reconcile_status = "WARN".to_string(); + self.reconcile_explanation = format!( + "UNSETTLED|delta={:.6}|k={:.4}|e={:.4}", + delta, self.k_capital, self.e_wallet_balance + ); + } else { + self.reconcile_status = "ERROR".to_string(); + self.reconcile_explanation = format!( + "UNEXPLAINED|delta={:.6}|k={:.4}|e={:.4}", + delta, self.k_capital, self.e_wallet_balance + ); + } + } + + fn apply_fill_settled(&mut self, realized_pnl: f64, fee: f64) { + if realized_pnl.is_finite() { + self.k_realized_pnl += realized_pnl; + } + if fee.is_finite() && fee >= 0.0 { + self.k_fees_paid += fee; + } + self.event_seq += 1; + self.reconcile(); + } + + fn apply_account_update( + &mut self, + wallet_balance: f64, + available_margin: f64, + used_margin: f64, + maint_margin: f64, + ) { + // E-facts: store exactly as received; exchange always rules. + if wallet_balance.is_finite() && wallet_balance > 0.0 { + self.e_wallet_balance = wallet_balance; + } + if available_margin.is_finite() { + self.e_available_margin = available_margin; + } + if used_margin.is_finite() { + self.e_used_margin = used_margin; + } + if maint_margin.is_finite() { + self.e_maint_margin = maint_margin; + } + self.event_seq += 1; + self.reconcile(); + } + + fn apply_funding_fee(&mut self, amount: f64) { + if amount.is_finite() { + // amount > 0 = received (increases capital → reduces net funding paid) + // amount < 0 = paid (decreases capital → increases net funding paid) + self.k_funding_net -= amount; + } + self.event_seq += 1; + self.reconcile(); + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +struct KernelResult { + outcome: KernelOutcome, + slot: TradeSlot, + snapshot: KernelSnapshot, +} + +#[derive(Debug, Default)] +struct KernelCore { + slots: Vec, + active_trade_index: HashMap, + venue_order_index: HashMap, + client_order_index: HashMap, + account: AccountState, +} + +impl KernelCore { + fn new(max_slots: usize) -> Self { + let mut slots = Vec::with_capacity(max_slots); + for slot_id in 0..max_slots { + let mut slot = TradeSlot::default(); + slot.slot_id = slot_id; + slots.push(slot); + } + let mut core = Self { + slots, + active_trade_index: HashMap::new(), + venue_order_index: HashMap::new(), + client_order_index: HashMap::new(), + account: AccountState::default(), + }; + core.rebuild_indexes(); + core + } + + fn on_account_event(&mut self, payload: &str) -> Value { + let parsed: Value = match serde_json::from_str(payload) { + Ok(v) => v, + Err(e) => return json!({"error": format!("parse: {}", e)}), + }; + let kind = parsed.get("kind").and_then(|v| v.as_str()).unwrap_or("").to_uppercase(); + match kind.as_str() { + "FILL_SETTLED" => { + let realized = parsed.get("realized_pnl").and_then(|v| v.as_f64()).unwrap_or(0.0); + let fee = parsed.get("fee").and_then(|v| v.as_f64()).unwrap_or(0.0); + self.account.apply_fill_settled(realized, fee); + } + "ACCOUNT_UPDATE" => { + let wb = parsed.get("wallet_balance").and_then(|v| v.as_f64()).unwrap_or(0.0); + let am = parsed.get("available_margin").and_then(|v| v.as_f64()).unwrap_or(0.0); + let um = parsed.get("used_margin").and_then(|v| v.as_f64()).unwrap_or(0.0); + let mm = parsed.get("maint_margin").and_then(|v| v.as_f64()).unwrap_or(0.0); + self.account.apply_account_update(wb, am, um, mm); + } + "FUNDING_FEE" => { + let amount = parsed.get("funding_amount").and_then(|v| v.as_f64()).unwrap_or(0.0); + self.account.apply_funding_fee(amount); + } + _ => return json!({"error": format!("unknown account event kind: {}", kind)}), + } + serde_json::to_value(&self.account).unwrap_or(json!({"error":"serialize"})) + } + + fn snapshot(&self) -> KernelSnapshot { + KernelSnapshot { + slots: self.slots.clone(), + active_trade_index: self.active_trade_index.clone(), + venue_order_index: self.venue_order_index.clone(), + client_order_index: self.client_order_index.clone(), + account: self.account.clone(), + } + } + + fn rebuild_indexes(&mut self) { + self.active_trade_index.clear(); + self.venue_order_index.clear(); + self.client_order_index.clear(); + for slot in &self.slots { + if !slot.trade_id.is_empty() { + self.active_trade_index.insert(slot.trade_id.clone(), slot.slot_id); + } + if let Some(order) = &slot.active_entry_order { + self.client_order_index.insert(order.venue_client_id.clone(), slot.slot_id); + if !order.venue_order_id.is_empty() { + self.venue_order_index.insert(order.venue_order_id.clone(), slot.slot_id); + } + } + if let Some(order) = &slot.active_exit_order { + self.client_order_index.insert(order.venue_client_id.clone(), slot.slot_id); + if !order.venue_order_id.is_empty() { + self.venue_order_index.insert(order.venue_order_id.clone(), slot.slot_id); + } + } + } + } + + fn slot(&self, slot_id: usize) -> Option<&TradeSlot> { + self.slots.get(slot_id) + } + + fn commit_slot(&mut self, slot: TradeSlot) { + let slot_id = slot.slot_id; + if slot_id < self.slots.len() { + self.slots[slot_id] = slot; + self.rebuild_indexes(); + } + } + + fn resolve_slot(&self, event: &VenueEvent) -> usize { + let slot_id = event.slot_id; + if slot_id >= 0 { + let slot_id = slot_id as usize; + if slot_id < self.slots.len() { + return slot_id; + } + } + if let Some(slot_id) = self.active_trade_index.get(&event.trade_id) { + return *slot_id; + } + if let Some(slot_id) = self.venue_order_index.get(&event.venue_order_id) { + return *slot_id; + } + if let Some(slot_id) = self.client_order_index.get(&event.venue_client_id) { + return *slot_id; + } + self.slots.first().map(|slot| slot.slot_id).unwrap_or(0) + } + + fn transition( + &self, + slot: &TradeSlot, + prev: TradeStage, + next_state: TradeStage, + trigger: &str, + event: Option<&VenueEvent>, + control_mode: &str, + control_verbosity: &str, + ) -> KernelTransition { + KernelTransition { + timestamp: event + .map(|e| e.timestamp) + .unwrap_or_else(Utc::now), + trade_id: slot.trade_id.clone(), + slot_id: slot.slot_id, + prev_state: prev, + next_state, + trigger: trigger.to_string(), + intent_id: event.map(|e| e.venue_client_id.clone()).unwrap_or_default(), + event_id: event.map(|e| e.event_id.clone()).unwrap_or_default(), + control_mode: control_mode.to_string(), + control_verbosity: control_verbosity.to_string(), + details: json!({ + "asset": slot.asset, + "side": slot.side, + "closed": slot.closed, + }) + .as_object() + .cloned() + .unwrap_or_default(), + } + } + + fn realized_pnl(slot: &TradeSlot, exit_price: f64, exit_size: f64) -> f64 { + if slot.entry_price <= 0.0 || exit_size <= 0.0 { + return 0.0; + } + let mut delta = (exit_price - slot.entry_price) / slot.entry_price; + if slot.side == TradeSide::SHORT { + delta = -delta; + } + let notional = exit_size * slot.entry_price * slot.leverage.max(1.0); + delta * notional + } + + fn append_event_id(slot: &mut TradeSlot, event_id: &str) { + if event_id.is_empty() { + return; + } + if slot.seen_event_ids.iter().any(|seen| seen == event_id) { + return; + } + slot.seen_event_ids.push(event_id.to_string()); + if slot.seen_event_ids.len() > MAX_SEEN_EVENT_IDS { + let overflow = slot.seen_event_ids.len() - MAX_SEEN_EVENT_IDS; + slot.seen_event_ids.drain(0..overflow); + } + } + + fn validate_slot(slot: &TradeSlot) -> Result<(), String> { + match slot.fsm_state { + TradeStage::IDLE => { + if slot.size.abs() > 1e-12 { + return Err(format!("IDLE slot {} has nonzero size {}", slot.slot_id, slot.size)); + } + } + TradeStage::POSITION_OPEN | TradeStage::ENTRY_WORKING | TradeStage::EXIT_WORKING => { + if slot.size.abs() <= 1e-12 && !slot.active_entry_order.is_some() { + return Err(format!( + "{} slot {} has zero size and no entry order", + slot.fsm_state.as_str(), + slot.slot_id + )); + } + if slot.asset.is_empty() { + return Err(format!("{} slot {} has empty asset", slot.fsm_state.as_str(), slot.slot_id)); + } + } + TradeStage::CLOSED => { + if !slot.closed { + return Err(format!("CLOSED slot {} has closed=false", slot.slot_id)); + } + } + _ => {} + } + if slot.size < 0.0 { + return Err(format!("slot {} has negative size {}", slot.slot_id, slot.size)); + } + Ok(()) + } + + fn set_slot_from_json(&mut self, slot_id: usize, json: &str) -> Result<(), String> { + if slot_id >= self.slots.len() { + return Err("INVALID_SLOT_ID".to_string()); + } + let slot: TradeSlot = serde_json::from_str(json).map_err(|err| err.to_string())?; + self.slots[slot_id] = slot; + self.rebuild_indexes(); + Ok(()) + } + + fn process_intent( + &mut self, + intent: KernelIntent, + control_mode: &str, + control_verbosity: &str, + ) -> KernelResult { + let slot_id = intent.slot_id; + if slot_id < 0 || slot_id as usize >= self.slots.len() { + let slot_id = slot_id.max(0) as usize; + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id, + trade_id: intent.trade_id.clone(), + state: TradeStage::IDLE, + diagnostic_code: KernelDiagnosticCode::INVALID_SLOT_ID, + details: json!({ + "reason": "INVALID_SLOT_ID", + "slot_id": intent.slot_id, + "intent_id": intent.intent_id, + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: TradeSlot::default(), + snapshot: self.snapshot(), + }; + } + let mut slot = self.slots[slot_id as usize].clone(); + if matches!(intent.action, KernelCommandType::ENTER) { + if !slot.is_free() && !slot.trade_id.is_empty() && slot.trade_id != intent.trade_id { + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::SLOT_BUSY, + details: json!({"reason": "SLOT_BUSY"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + slot.trade_id = intent.trade_id.clone(); + slot.asset = intent.asset.clone(); + slot.side = intent.side.clone(); + slot.leverage = if intent.leverage.is_finite() && intent.leverage > 0.0 { + intent.leverage + } else { + 1.0 + }; + slot.entry_time = Some(intent.timestamp); + slot.entry_price = 0.0; + slot.size = 0.0; + slot.initial_size = 0.0; + slot.unrealized_pnl = 0.0; + slot.realized_pnl = 0.0; + slot.exit_leg_ratios = if intent.exit_leg_ratios.is_empty() { + vec![1.0] + } else { + intent.exit_leg_ratios.clone() + }; + slot.active_leg_index = 0; + slot.active_entry_order = None; + slot.active_exit_order = None; + slot.close_reason.clear(); + slot.closed = false; + slot.last_event_time = None; + slot.fsm_state = TradeStage::ORDER_REQUESTED; + slot.attach_entry_order(VenueOrder { + internal_trade_id: intent.trade_id.clone(), + venue_order_id: String::new(), + venue_client_id: format!("{}:{}", intent.trade_id, intent.intent_id), + side: intent.side.clone(), + intended_size: intent.target_size.max(0.0), + filled_size: 0.0, + average_fill_price: 0.0, + status: VenueOrderStatus::NEW, + metadata: json!({ + "slot_id": slot.slot_id, + "asset": intent.asset, + "reference_price": intent.reference_price, + "leverage": intent.leverage, + "reason": intent.reason, + "action": intent.action, + }) + .as_object() + .cloned() + .unwrap_or_default(), + }); + self.commit_slot(slot.clone()); + let transition = self.transition( + &slot, + TradeStage::IDLE, + slot.fsm_state.clone(), + "ENTER_INTENT", + None, + control_mode, + control_verbosity, + ); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::OK, + transitions: vec![transition], + details: json!({"action": "ENTER"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + if matches!(intent.action, KernelCommandType::EXIT) { + if slot.is_free() || slot.closed || slot.size <= 0.0 { + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::NO_OPEN_POSITION, + details: json!({"reason": "NO_OPEN_POSITION"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + let exit_ratio = slot.next_exit_ratio(); + let base_size = if slot.initial_size > 0.0 { slot.initial_size } else { slot.size }; + let exit_size = (base_size * exit_ratio).max(0.0); + let exit_prev_state = slot.fsm_state.clone(); + slot.fsm_state = TradeStage::EXIT_REQUESTED; + slot.attach_exit_order(VenueOrder { + internal_trade_id: slot.trade_id.clone(), + venue_order_id: String::new(), + venue_client_id: format!("{}:{}", intent.trade_id, intent.intent_id), + side: intent.side.clone(), + intended_size: exit_size, + filled_size: 0.0, + average_fill_price: 0.0, + status: VenueOrderStatus::NEW, + metadata: json!({ + "slot_id": slot.slot_id, + "asset": intent.asset, + "reference_price": intent.reference_price, + "leverage": intent.leverage, + "reason": intent.reason, + "action": intent.action, + }) + .as_object() + .cloned() + .unwrap_or_default(), + }); + self.commit_slot(slot.clone()); + let transition = self.transition( + &slot, + exit_prev_state, + slot.fsm_state.clone(), + "EXIT_INTENT", + None, + control_mode, + control_verbosity, + ); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::OK, + transitions: vec![transition], + details: json!({"action": "EXIT", "exit_size": exit_size}) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + if matches!(intent.action, KernelCommandType::MARK_PRICE) { + slot.mark_price(intent.reference_price); + self.commit_slot(slot.clone()); + let transition = self.transition( + &slot, + slot.fsm_state.clone(), + slot.fsm_state.clone(), + "MARK_PRICE", + None, + control_mode, + control_verbosity, + ); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::OK, + transitions: vec![transition], + details: json!({"action": "MARK_PRICE"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + if matches!(intent.action, KernelCommandType::RECONCILE) { + let prev = slot.fsm_state.clone(); + slot.fsm_state = TradeStage::STALE_STATE_RECONCILING; + self.commit_slot(slot.clone()); + let transition = self.transition( + &slot, + prev, + slot.fsm_state.clone(), + "RECONCILE", + None, + control_mode, + control_verbosity, + ); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::STALE_STATE_RECONCILE, + transitions: vec![transition], + details: json!({"action": "RECONCILE"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + if matches!(intent.action, KernelCommandType::CANCEL) { + let has_cancellable_exit = slot.active_exit_order.is_some(); + let has_cancellable_entry = slot.active_entry_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::ENTRY_WORKING + | TradeStage::ORDER_REQUESTED + | TradeStage::ORDER_SENT + | TradeStage::IDLE + ); + if !has_cancellable_exit && !has_cancellable_entry { + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::NO_ACTIVE_EXIT_ORDER, + details: json!({"reason": "NO_ACTIVE_EXIT_ORDER"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::OK, + details: json!({"action": "CANCEL"}).as_object().cloned().unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::UNSUPPORTED_INTENT, + details: json!({ + "reason": "UNSUPPORTED_INTENT", + "intent_action": intent.action, + "intent_id": intent.intent_id, + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + } + } + + fn on_venue_event( + &mut self, + event: VenueEvent, + control_mode: &str, + control_verbosity: &str, + ) -> KernelResult { + let slot_id = self.resolve_slot(&event); + let mut slot = self.slots[slot_id].clone(); + + if !event.event_id.is_empty() && slot.seen_event_ids.iter().any(|seen| seen == &event.event_id) { + let prev_state = slot.fsm_state.clone(); + let transition = self.transition( + &slot, + prev_state.clone(), + prev_state.clone(), + "DUPLICATE_EVENT", + Some(&event), + control_mode, + control_verbosity, + ); + self.commit_slot(slot.clone()); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::DUPLICATE_EVENT, + transitions: vec![transition], + details: json!({ + "event_kind": event.kind, + "reason": "DUPLICATE_EVENT", + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + + // I13: Reject stray events on completed slots. A delayed venue event + // (e.g. a fill that arrives after the slot is already closed) must not + // reactivate the slot or corrupt its FSM state. Record the event_id + // so a repeat of the same stray is caught by the dedup guard above. + if slot.closed { + let prev_state = slot.fsm_state.clone(); + let transition = self.transition( + &slot, + prev_state.clone(), + prev_state.clone(), + "TERMINAL_STATE", + Some(&event), + control_mode, + control_verbosity, + ); + Self::append_event_id(&mut slot, &event.event_id); + self.commit_slot(slot.clone()); + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::TERMINAL_STATE, + transitions: vec![transition], + details: json!({ + "event_kind": event.kind, + "reason": "TERMINAL_STATE", + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + + if slot.fsm_state == TradeStage::STALE_STATE_RECONCILING { + let prev_state = slot.fsm_state.clone(); + let transition = self.transition( + &slot, + prev_state.clone(), + prev_state.clone(), + "STALE_STATE_RECONCILE", + Some(&event), + control_mode, + control_verbosity, + ); + Self::append_event_id(&mut slot, &event.event_id); + self.commit_slot(slot.clone()); + return KernelResult { + outcome: KernelOutcome { + accepted: event.kind == KernelEventKind::RECONCILE, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::STALE_STATE_RECONCILE, + transitions: vec![transition], + details: json!({ + "event_kind": event.kind, + "reason": "STALE_STATE_RECONCILING", + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + + let prev_state = slot.fsm_state.clone(); + let mut accepted = true; + let mut diagnostic_code = KernelDiagnosticCode::OK; + + // Propagate the venue's order id onto the working order whenever the + // exchange provides one — for ALL order types and event kinds (ACK, + // partial/full fill). Orders are created at submit time with an empty + // venue_order_id; recording the assigned id lets a later cancel + // reference the real exchange order (essential for resting LIMIT cancel; + // harmless for MARKET, which fills synchronously). Only fills empty ids + // (never overwrites) and targets the currently-active order. + if !event.venue_order_id.is_empty() { + let target = if slot.active_entry_order.is_some() { + slot.active_entry_order.as_mut() + } else { + slot.active_exit_order.as_mut() + }; + if let Some(order) = target { + if order.venue_order_id.is_empty() { + order.venue_order_id = event.venue_order_id.clone(); + } + if !event.venue_client_id.is_empty() && order.venue_client_id.is_empty() { + order.venue_client_id = event.venue_client_id.clone(); + } + } + } + + match event.kind { + KernelEventKind::ORDER_ACK => { + if slot.active_entry_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::IDLE + | TradeStage::ORDER_REQUESTED + | TradeStage::ORDER_SENT + | TradeStage::ENTRY_WORKING + ) + { + slot.fsm_state = TradeStage::ENTRY_WORKING; + } else if slot.active_exit_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::POSITION_OPEN + | TradeStage::EXIT_REQUESTED + | TradeStage::EXIT_SENT + | TradeStage::EXIT_WORKING + ) + { + slot.fsm_state = TradeStage::EXIT_WORKING; + } else if slot.active_entry_order.as_ref().map(|o| o.status == VenueOrderStatus::FILLED).unwrap_or(false) + && matches!( + slot.fsm_state, + TradeStage::POSITION_OPEN | TradeStage::EXIT_WORKING | TradeStage::CLOSED + ) + { + diagnostic_code = KernelDiagnosticCode::DUPLICATE_EVENT; + } else if slot.active_entry_order.is_some() { + slot.fsm_state = TradeStage::ENTRY_WORKING; + } else if slot.active_exit_order.is_some() { + slot.fsm_state = TradeStage::EXIT_WORKING; + } + } + KernelEventKind::ORDER_REJECT => { + if slot.active_entry_order.is_some() && slot.fsm_state != TradeStage::POSITION_OPEN { + slot.active_entry_order = None; + slot.trade_id.clear(); + slot.asset.clear(); + slot.side = TradeSide::FLAT; + slot.size = 0.0; + slot.initial_size = 0.0; + slot.closed = false; + slot.close_reason = if event.reason.is_empty() { + "ORDER_REJECTED".to_string() + } else { + event.reason.clone() + }; + slot.fsm_state = TradeStage::IDLE; + diagnostic_code = KernelDiagnosticCode::ENTRY_ORDER_REJECTED; + } else if slot.active_exit_order.is_some() { + slot.active_exit_order = None; + slot.fsm_state = TradeStage::POSITION_OPEN; + diagnostic_code = KernelDiagnosticCode::EXIT_ORDER_REJECTED; + } else { + slot.fsm_state = TradeStage::IDLE; + diagnostic_code = KernelDiagnosticCode::ORDER_REJECTED; + } + } + KernelEventKind::RATE_LIMITED => { + accepted = false; + diagnostic_code = KernelDiagnosticCode::RATE_LIMITED; + slot.close_reason = if event.reason.is_empty() { + "RATE_LIMITED".to_string() + } else { + event.reason.clone() + }; + } + KernelEventKind::PARTIAL_FILL => { + self.apply_fill(&mut slot, &event, true); + } + KernelEventKind::FULL_FILL => { + self.apply_fill(&mut slot, &event, false); + } + KernelEventKind::CANCEL_ACK => { + if slot.active_exit_order.is_some() { + slot.active_exit_order = None; + slot.fsm_state = TradeStage::POSITION_OPEN; + } else if slot.active_entry_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::ENTRY_WORKING + | TradeStage::ORDER_REQUESTED + | TradeStage::ORDER_SENT + | TradeStage::IDLE + ) + { + slot.active_entry_order = None; + slot.trade_id.clear(); + slot.asset.clear(); + slot.side = TradeSide::FLAT; + slot.size = 0.0; + slot.initial_size = 0.0; + slot.unrealized_pnl = 0.0; + slot.realized_pnl = 0.0; + slot.fsm_state = TradeStage::IDLE; + slot.closed = false; + } + } + KernelEventKind::CANCEL_REJECT => { + if slot.fsm_state == TradeStage::EXIT_WORKING { + slot.fsm_state = TradeStage::EXIT_WORKING; + } + diagnostic_code = KernelDiagnosticCode::CANCEL_REJECTED; + } + KernelEventKind::MARK_PRICE => { + slot.mark_price(event.price); + } + KernelEventKind::RECONCILE => { + slot.fsm_state = TradeStage::STALE_STATE_RECONCILING; + } + KernelEventKind::CONTROL => { + accepted = false; + diagnostic_code = KernelDiagnosticCode::UNKNOWN_EVENT_KIND; + } + } + + Self::append_event_id(&mut slot, &event.event_id); + self.commit_slot(slot.clone()); + let mut details = json!({"event_kind": event.kind}) + .as_object() + .cloned() + .unwrap_or_default(); + if event.kind == KernelEventKind::RATE_LIMITED { + details.insert( + "venue_event_kind".to_string(), + Value::String(event.kind.as_str().to_string()), + ); + details.insert("severity".to_string(), Value::String("WARNING".to_string())); + details.insert( + "reason".to_string(), + Value::String(if event.reason.is_empty() { + "RATE_LIMITED".to_string() + } else { + event.reason.clone() + }), + ); + if let Some(retry_after_ms) = event + .metadata + .get("retry_after_ms") + .and_then(|value| value.as_i64()) + { + details.insert("retry_after_ms".to_string(), Value::from(retry_after_ms)); + } + details.insert("release_eta".to_string(), Value::String("few minutes".to_string())); + details.insert("retryable".to_string(), Value::Bool(true)); + } + let transition = self.transition( + &slot, + prev_state, + slot.fsm_state.clone(), + match event.kind { + KernelEventKind::ORDER_ACK => "ORDER_ACK", + KernelEventKind::ORDER_REJECT => "ORDER_REJECT", + KernelEventKind::RATE_LIMITED => "RATE_LIMITED", + KernelEventKind::PARTIAL_FILL => "PARTIAL_FILL", + KernelEventKind::FULL_FILL => "FULL_FILL", + KernelEventKind::CANCEL_ACK => "CANCEL_ACK", + KernelEventKind::CANCEL_REJECT => "CANCEL_REJECT", + KernelEventKind::MARK_PRICE => "MARK_PRICE", + KernelEventKind::RECONCILE => "RECONCILE", + KernelEventKind::CONTROL => "UNKNOWN_EVENT", + }, + Some(&event), + control_mode, + control_verbosity, + ); + KernelResult { + outcome: KernelOutcome { + accepted, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code, + severity: if event.kind == KernelEventKind::RATE_LIMITED { + KernelSeverity::WARNING + } else { + KernelSeverity::INFO + }, + transitions: vec![transition], + details, + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + } + } + + fn apply_fill(&mut self, slot: &mut TradeSlot, event: &VenueEvent, partial: bool) { + if slot.active_entry_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::ORDER_REQUESTED + | TradeStage::ORDER_SENT + | TradeStage::ENTRY_WORKING + | TradeStage::IDLE + ) + { + let fill_size = if event.filled_size > 0.0 { + event.filled_size + } else { + event.size + } + .max(0.0); + // Accumulate incremental fills. WS events carry lastFilledQty + // (incremental per-event); REST/snapshot events are cumulative but + // arrive as a single FULL_FILL with prev_filled == 0, so the sum + // equals fill_size in that case — no change in behavior. + let prev_filled = slot + .active_entry_order + .as_ref() + .map(|order| order.filled_size) + .unwrap_or(0.0); + let accumulated = prev_filled + fill_size; + let intended_size = slot + .active_entry_order + .as_ref() + .map(|order| order.intended_size) + .unwrap_or(event.size); + slot.active_entry_order = Some(VenueOrder { + internal_trade_id: slot.trade_id.clone(), + venue_order_id: event.venue_order_id.clone(), + venue_client_id: event.venue_client_id.clone(), + side: slot.side.clone(), + intended_size, + filled_size: accumulated, + average_fill_price: event.price, + status: if partial { + VenueOrderStatus::PARTIALLY_FILLED + } else { + VenueOrderStatus::FILLED + }, + metadata: { + let mut map = Map::new(); + map.insert("slot_id".to_string(), Value::from(slot.slot_id as i64)); + map + }, + }); + // Set initial_size from the intended order size on first fill only. + if slot.initial_size <= 0.0 { + slot.initial_size = if intended_size > 0.0 { intended_size } else { accumulated }; + } + slot.size = accumulated; + if event.price > 0.0 { + slot.entry_price = event.price; + } + slot.unrealized_pnl = 0.0; + slot.last_event_time = Some(event.timestamp); + if partial { + slot.fsm_state = TradeStage::ENTRY_WORKING; + } else { + slot.fsm_state = TradeStage::POSITION_OPEN; + slot.active_entry_order = Some(VenueOrder { + internal_trade_id: slot.trade_id.clone(), + venue_order_id: event.venue_order_id.clone(), + venue_client_id: event.venue_client_id.clone(), + side: slot.side.clone(), + intended_size: slot.size, + filled_size: slot.size, + average_fill_price: event.price, + status: VenueOrderStatus::FILLED, + metadata: { + let mut map = Map::new(); + map.insert("slot_id".to_string(), Value::from(slot.slot_id as i64)); + map + }, + }); + } + return; + } + + if slot.active_exit_order.is_some() + && matches!( + slot.fsm_state, + TradeStage::EXIT_REQUESTED + | TradeStage::EXIT_SENT + | TradeStage::EXIT_WORKING + | TradeStage::POSITION_OPEN + ) + { + let fill_size = if event.filled_size > 0.0 { + event.filled_size + } else { + event.size + } + .max(0.0); + let realized = Self::realized_pnl(slot, event.price, fill_size); + slot.realized_pnl += realized; + slot.size = (slot.size - fill_size).max(0.0); + slot.mark_price(event.price); + slot.last_event_time = Some(event.timestamp); + + let all_legs_done = slot.active_leg_index >= slot.exit_leg_ratios.len(); + let should_close = (slot.size <= 1e-12 || (!partial && all_legs_done)); + + if !partial { + slot.consume_exit_leg(); + } + + if should_close && slot.size <= 1e-12 { + slot.closed = true; + slot.close_reason = if event.reason.is_empty() { + slot.close_reason.clone() + } else { + event.reason.clone() + }; + slot.fsm_state = TradeStage::CLOSED; + slot.active_exit_order = None; + slot.active_entry_order = None; + } else if !partial && !all_legs_done { + slot.fsm_state = TradeStage::POSITION_OPEN; + slot.active_exit_order = None; + } else if partial { + slot.fsm_state = TradeStage::EXIT_WORKING; + slot.active_exit_order = Some(VenueOrder { + internal_trade_id: slot.trade_id.clone(), + venue_order_id: event.venue_order_id.clone(), + venue_client_id: event.venue_client_id.clone(), + side: slot.side.clone(), + intended_size: fill_size, + filled_size: fill_size, + average_fill_price: event.price, + status: VenueOrderStatus::PARTIALLY_FILLED, + metadata: { + let mut map = Map::new(); + map.insert("slot_id".to_string(), Value::from(slot.slot_id as i64)); + map + }, + }); + } else { + slot.fsm_state = TradeStage::POSITION_OPEN; + slot.active_exit_order = None; + } + } + } +} + +fn cstr_to_string(ptr: *const c_char) -> Result { + if ptr.is_null() { + return Err("NULL_POINTER".to_string()); + } + unsafe { CStr::from_ptr(ptr) } + .to_str() + .map(|s| s.to_string()) + .map_err(|err| err.to_string()) +} + +fn into_c_string(value: &str) -> *mut c_char { + // Strip embedded NUL bytes so CString::new never panics. A NUL in a JSON + // payload (e.g. from a malformed exchange response) would otherwise crash + // the process via unwrap(). + match CString::new(value) { + Ok(cs) => cs.into_raw(), + Err(_) => { + let sanitized = value.replace('\0', "\\u0000"); + CString::new(sanitized).unwrap_or_else(|_| CString::new("").unwrap()).into_raw() + } + } +} + +/// Build a well-formed INVALID_INTENT KernelResult JSON string. Used when the +/// kernel cannot safely process a request — a payload that fails to parse +/// (e.g. non-finite Infinity/NaN tokens serde rejects) or a result that fails +/// to serialize (a non-finite value produced internally). This keeps the kernel +/// from ever returning a null string on non-finite input/output: it rejects +/// cleanly with a diagnostic instead of panicking. +fn invalid_intent_cstring(reason: &str, detail: &str) -> *mut c_char { + let fallback = serde_json::json!({ + "outcome": { + "accepted": false, + "slot_id": 0, + "trade_id": "", + "state": "IDLE", + "diagnostic_code": "INVALID_INTENT", + "severity": "WARNING", + "transitions": [], + "emitted_events": [], + "details": {"reason": reason, "detail": detail} + }, + "slot": serde_json::Value::Null, + "snapshot": serde_json::Value::Null + }); + into_c_string(&fallback.to_string()) +} + +fn with_handle_mut(handle: *mut KernelHandle, f: F) -> Result +where + F: FnOnce(&mut KernelCore) -> Result, +{ + if handle.is_null() { + return Err("NULL_HANDLE".to_string()); + } + let handle = unsafe { &mut *handle }; + f(&mut handle.core) +} + +#[no_mangle] +pub extern "C" fn dita_kernel_create(max_slots: usize) -> *mut KernelHandle { + let handle = KernelHandle { + core: KernelCore::new(max_slots.max(1)), + }; + Box::into_raw(Box::new(handle)) +} + +#[no_mangle] +pub extern "C" fn dita_kernel_destroy(handle: *mut KernelHandle) { + if !handle.is_null() { + unsafe { + drop(Box::from_raw(handle)); + } + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_free_string(ptr: *mut c_char) { + if !ptr.is_null() { + unsafe { + drop(CString::from_raw(ptr)); + } + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_get_slot_json(handle: *mut KernelHandle, slot_id: usize) -> *mut c_char { + match with_handle_mut(handle, |core| { + core.slot(slot_id) + .map(|slot| serde_json::to_string(slot).map_err(|err| err.to_string())) + .unwrap_or_else(|| Err("INVALID_SLOT_ID".to_string())) + }) { + Ok(json) => into_c_string(&json), + Err(_) => ptr::null_mut(), + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_set_slot_json(handle: *mut KernelHandle, slot_id: usize, payload: *const c_char) -> i32 { + let payload = match cstr_to_string(payload) { + Ok(value) => value, + Err(_) => return -22, + }; + match with_handle_mut(handle, |core| core.set_slot_from_json(slot_id, &payload)) { + Ok(()) => 0, + Err(_) => -22, + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_process_intent_json( + handle: *mut KernelHandle, + payload: *const c_char, + control_mode: *const c_char, + control_verbosity: *const c_char, +) -> *mut c_char { + let payload = match cstr_to_string(payload) { + Ok(value) => value, + Err(_) => return ptr::null_mut(), + }; + let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string()); + let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string()); + // Reject non-parseable payloads (incl. non-finite Infinity/NaN tokens serde + // refuses) with a clean INVALID_INTENT, never a null string. + let intent: KernelIntent = match serde_json::from_str(&payload) { + Ok(value) => value, + Err(err) => return invalid_intent_cstring("INVALID_INTENT_PARSE", &err.to_string()), + }; + match with_handle_mut(handle, |core| { + Ok::<_, String>(core.process_intent(intent, &control_mode, &control_verbosity)) + }) { + // A serialize failure means a non-finite value reached the output; reject + // cleanly rather than returning null. + Ok(result) => match serde_json::to_string(&result) { + Ok(s) => into_c_string(&s), + Err(err) => invalid_intent_cstring("INVALID_INTENT_SERIALIZE", &err.to_string()), + }, + Err(_) => ptr::null_mut(), + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_on_venue_event_json( + handle: *mut KernelHandle, + payload: *const c_char, + control_mode: *const c_char, + control_verbosity: *const c_char, +) -> *mut c_char { + let payload = match cstr_to_string(payload) { + Ok(value) => value, + Err(_) => return ptr::null_mut(), + }; + let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string()); + let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string()); + let event: VenueEvent = match serde_json::from_str(&payload) { + Ok(value) => value, + Err(err) => return invalid_intent_cstring("INVALID_EVENT_PARSE", &err.to_string()), + }; + match with_handle_mut(handle, |core| { + Ok::<_, String>(core.on_venue_event(event, &control_mode, &control_verbosity)) + }) { + Ok(result) => match serde_json::to_string(&result) { + Ok(s) => into_c_string(&s), + Err(err) => invalid_intent_cstring("INVALID_EVENT_SERIALIZE", &err.to_string()), + }, + Err(_) => ptr::null_mut(), + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_reconcile_slots_json( + handle: *mut KernelHandle, + payload: *const c_char, + control_mode: *const c_char, + control_verbosity: *const c_char, +) -> *mut c_char { + let payload = match cstr_to_string(payload) { + Ok(value) => value, + Err(_) => return ptr::null_mut(), + }; + let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string()); + let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string()); + match with_handle_mut(handle, |core| { + let slots: Vec = serde_json::from_str(&payload).map_err(|err| err.to_string())?; + let mut validation_errors: Vec = Vec::new(); + for slot in &slots { + if let Err(e) = KernelCore::validate_slot(slot) { + validation_errors.push(e); + } + } + if !validation_errors.is_empty() { + let outcome = KernelOutcome { + accepted: false, + slot_id: 0, + trade_id: String::new(), + state: TradeStage::STALE_STATE_RECONCILING, + diagnostic_code: KernelDiagnosticCode::STALE_STATE_RECONCILE, + details: json!({ + "reason": "VALIDATION_FAILED", + "errors": validation_errors, + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }; + let snapshot = core.snapshot(); + return Ok(KernelResult { + outcome, + slot: snapshot.slots.first().cloned().unwrap_or_default(), + snapshot, + }); + } + for slot in slots { + if slot.slot_id < core.slots.len() { + core.slots[slot.slot_id] = slot.clone(); + } + } + core.rebuild_indexes(); + let snapshot = core.snapshot(); + let outcome = KernelOutcome { + accepted: true, + slot_id: 0, + trade_id: String::new(), + state: TradeStage::STALE_STATE_RECONCILING, + diagnostic_code: KernelDiagnosticCode::RECONCILED, + details: json!({ + "reconciled_slots": snapshot.slots.len(), + "control_mode": control_mode, + "control_verbosity": control_verbosity, + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }; + Ok(KernelResult { + outcome, + slot: snapshot.slots.first().cloned().unwrap_or_default(), + snapshot, + }) + }) { + Ok(result) => serde_json::to_string(&result).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()), + Err(_) => ptr::null_mut(), + } +} + +#[no_mangle] +pub extern "C" fn dita_kernel_snapshot_json(handle: *mut KernelHandle) -> *mut c_char { + match with_handle_mut(handle, |core| Ok(core.snapshot())) { + Ok(snapshot) => serde_json::to_string(&snapshot).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()), + Err(_) => ptr::null_mut(), + } +} + +/// Set the seed capital for the kernel's K-value fold. +/// Must be called once at kernel init (before any account events). +/// Returns 0 on success, -1 on error. +#[no_mangle] +pub extern "C" fn dita_kernel_set_seed_capital( + handle: *mut KernelHandle, + seed_capital: f64, +) -> i32 { + if !seed_capital.is_finite() || seed_capital < 0.0 { + return -1; + } + match with_handle_mut(handle, |core| { + core.account.seed_capital = seed_capital; + core.account.reconcile(); + Ok(()) + }) { + Ok(_) => 0, + Err(_) => -1, + } +} + +/// Apply an account-level event atomically: fold K-values, store E-facts, +/// run reconcile, bump event_seq — all in one call. +/// +/// Payload JSON schema (kind field selects which other fields are read): +/// FILL_SETTLED : { "kind":"FILL_SETTLED", "realized_pnl": f64, "fee": f64 } +/// ACCOUNT_UPDATE: { "kind":"ACCOUNT_UPDATE", "wallet_balance": f64, +/// "available_margin": f64, "used_margin": f64, "maint_margin": f64 } +/// FUNDING_FEE : { "kind":"FUNDING_FEE", "funding_amount": f64 } +/// funding_amount > 0 = received; < 0 = paid +/// +/// Returns JSON: { event_seq, k_capital, e_wallet_balance, available_capital, +/// reconcile_status, reconcile_delta, reconcile_explanation, ... } +/// Returns null on handle error. +#[no_mangle] +pub extern "C" fn dita_kernel_on_account_event_json( + handle: *mut KernelHandle, + payload: *const c_char, +) -> *mut c_char { + let payload_str = match unsafe { CStr::from_ptr(payload) }.to_str() { + Ok(s) => s.to_string(), + Err(_) => return ptr::null_mut(), + }; + match with_handle_mut(handle, |core| Ok(core.on_account_event(&payload_str))) { + Ok(result) => { + let s = serde_json::to_string(&result).unwrap_or_else(|_| "{}".to_string()); + into_c_string(&s) + } + Err(_) => ptr::null_mut(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn mk_intent() -> KernelIntent { + KernelIntent { + timestamp: Utc::now(), + intent_id: "intent-1".to_string(), + trade_id: "trade-1".to_string(), + slot_id: 0, + asset: "BTCUSDT".to_string(), + side: TradeSide::SHORT, + action: KernelCommandType::ENTER, + reference_price: 100.0, + target_size: 1.0, + leverage: 2.0, + exit_leg_ratios: vec![1.0], + reason: String::new(), + metadata: Map::new(), + stage: TradeStage::INTENT_CREATED, + } + } + + #[test] + fn enter_then_ack_fill() { + let mut core = KernelCore::new(2); + let res = core.process_intent(mk_intent(), "DEBUG", "TRACE"); + assert!(res.outcome.accepted); + assert_eq!(res.slot.fsm_state, TradeStage::ORDER_REQUESTED); + let evt = VenueEvent { + timestamp: Utc::now(), + event_id: "evt-1".to_string(), + trade_id: "trade-1".to_string(), + slot_id: 0, + kind: KernelEventKind::ORDER_ACK, + status: VenueEventStatus::ACKED, + venue_order_id: "V1".to_string(), + venue_client_id: "trade-1:intent-1".to_string(), + side: TradeSide::SHORT, + asset: "BTCUSDT".to_string(), + price: 100.0, + size: 1.0, + filled_size: 1.0, + remaining_size: 0.0, + reason: String::new(), + raw_payload: Map::new(), + metadata: Map::new(), + }; + let ack = core.on_venue_event(evt, "DEBUG", "TRACE"); + assert!(ack.outcome.accepted); + assert_eq!(ack.slot.fsm_state, TradeStage::ENTRY_WORKING); + } +} diff --git a/prod/clean_arch/dita_v2/rust_backend.py b/prod/clean_arch/dita_v2/rust_backend.py new file mode 100644 index 0000000..801c4c3 --- /dev/null +++ b/prod/clean_arch/dita_v2/rust_backend.py @@ -0,0 +1,808 @@ +"""Rust-backed DITAv2 execution kernel. + +This module keeps the Python API shape stable while moving the kernel state +machine into a Rust shared library. Slot views write through to the backend on +assignment, then the Python side mirrors the resulting state into Zinc and the +existing projections/journals. +""" + +from __future__ import annotations + +from dataclasses import asdict +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Dict, Iterable, List, Optional, Sequence +import ctypes +import json +import math +import os +import subprocess +import sys + +from .account import AccountProjection +from .control import ControlPlane, ControlUpdate, KernelControlSnapshot, KernelVerbosity, build_control_plane +from .contracts import ( + KernelCommandType, + KernelDiagnosticCode, + KernelEventKind, + KernelIntent, + KernelOutcome, + KernelSeverity, + KernelTransition, + TradeSide, + TradeSlot, + TradeStage, + VenueEvent, + VenueOrder, + VenueOrderStatus, + VenueEventStatus, +) +from .journal import KernelJournal, MemoryKernelJournal +from .mock_venue import MockVenueAdapter +from .projection import HazelcastProjection +from .projection import build_projection +from .utils import json_safe +from .venue import VenueAdapter +from .zinc_plane import InMemoryZincPlane, ZincPlane + + +def _repo_root() -> Path: + return Path(__file__).resolve().parents[3] + + +def _crate_dir() -> Path: + return Path(__file__).resolve().with_name("_rust_kernel") + + +def _library_path() -> Path: + if sys.platform == "darwin": + name = "libdita_v2_kernel.dylib" + elif os.name == "nt": + name = "dita_v2_kernel.dll" + else: + name = "libdita_v2_kernel.so" + return _crate_dir() / "target" / "release" / name + + +def _build_library() -> None: + crate_dir = _crate_dir() + if not crate_dir.exists(): + raise FileNotFoundError(f"Missing Rust kernel crate: {crate_dir}") + subprocess.run( + ["cargo", "build", "--release", "--manifest-path", str(crate_dir / "Cargo.toml")], + cwd=_repo_root(), + check=True, + ) + + +def _ensure_library() -> Path: + path = _library_path() + if not path.exists(): + _build_library() + return path + + +class _RustKernelLib: + def __init__(self) -> None: + path = _ensure_library() + self.lib = ctypes.CDLL(str(path)) + self.lib.dita_kernel_create.argtypes = [ctypes.c_size_t] + self.lib.dita_kernel_create.restype = ctypes.c_void_p + self.lib.dita_kernel_destroy.argtypes = [ctypes.c_void_p] + self.lib.dita_kernel_destroy.restype = None + self.lib.dita_kernel_free_string.argtypes = [ctypes.c_void_p] + self.lib.dita_kernel_free_string.restype = None + self.lib.dita_kernel_get_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t] + self.lib.dita_kernel_get_slot_json.restype = ctypes.c_void_p + self.lib.dita_kernel_set_slot_json.argtypes = [ctypes.c_void_p, ctypes.c_size_t, ctypes.c_char_p] + self.lib.dita_kernel_set_slot_json.restype = ctypes.c_int + self.lib.dita_kernel_process_intent_json.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ] + self.lib.dita_kernel_process_intent_json.restype = ctypes.c_void_p + self.lib.dita_kernel_on_venue_event_json.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ] + self.lib.dita_kernel_on_venue_event_json.restype = ctypes.c_void_p + self.lib.dita_kernel_reconcile_slots_json.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ] + self.lib.dita_kernel_reconcile_slots_json.restype = ctypes.c_void_p + self.lib.dita_kernel_snapshot_json.argtypes = [ctypes.c_void_p] + self.lib.dita_kernel_snapshot_json.restype = ctypes.c_void_p + self.lib.dita_kernel_set_seed_capital.argtypes = [ctypes.c_void_p, ctypes.c_double] + self.lib.dita_kernel_set_seed_capital.restype = ctypes.c_int + self.lib.dita_kernel_on_account_event_json.argtypes = [ + ctypes.c_void_p, + ctypes.c_char_p, + ] + self.lib.dita_kernel_on_account_event_json.restype = ctypes.c_void_p + + def create(self, max_slots: int) -> ctypes.c_void_p: + handle = self.lib.dita_kernel_create(ctypes.c_size_t(max_slots)) + if not handle: + raise RuntimeError("dita_kernel_create failed") + return ctypes.c_void_p(handle) + + def destroy(self, handle: ctypes.c_void_p) -> None: + if handle and handle.value: + self.lib.dita_kernel_destroy(handle) + + def _take_string(self, raw: ctypes.c_void_p) -> str: + if not raw: + raise RuntimeError("Rust kernel returned null string") + text = ctypes.cast(raw, ctypes.c_char_p).value + if text is None: + self.lib.dita_kernel_free_string(raw) + raise RuntimeError("Rust kernel returned empty string") + try: + return text.decode("utf-8") + finally: + self.lib.dita_kernel_free_string(raw) + + def get_slot_json(self, handle: ctypes.c_void_p, slot_id: int) -> Dict[str, Any]: + raw = self.lib.dita_kernel_get_slot_json(handle, ctypes.c_size_t(slot_id)) + if not raw: + raise IndexError(f"Invalid slot id: {slot_id}") + return json.loads(self._take_string(raw)) + + def set_slot_json(self, handle: ctypes.c_void_p, slot_id: int, payload: Dict[str, Any]) -> None: + encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8") + rc = self.lib.dita_kernel_set_slot_json(handle, ctypes.c_size_t(slot_id), ctypes.c_char_p(encoded)) + if rc != 0: + raise RuntimeError(f"dita_kernel_set_slot_json failed rc={rc}") + + def process_intent( + self, + handle: ctypes.c_void_p, + payload: Dict[str, Any], + *, + mode: str, + verbosity: str, + ) -> Dict[str, Any]: + encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8") + raw = self.lib.dita_kernel_process_intent_json( + handle, + ctypes.c_char_p(encoded), + ctypes.c_char_p(mode.encode("utf-8")), + ctypes.c_char_p(verbosity.encode("utf-8")), + ) + return json.loads(self._take_string(raw)) + + def on_venue_event( + self, + handle: ctypes.c_void_p, + payload: Dict[str, Any], + *, + mode: str, + verbosity: str, + ) -> Dict[str, Any]: + encoded = json.dumps(json_safe(payload), separators=(",", ":"), ensure_ascii=False).encode("utf-8") + raw = self.lib.dita_kernel_on_venue_event_json( + handle, + ctypes.c_char_p(encoded), + ctypes.c_char_p(mode.encode("utf-8")), + ctypes.c_char_p(verbosity.encode("utf-8")), + ) + return json.loads(self._take_string(raw)) + + def reconcile_slots( + self, + handle: ctypes.c_void_p, + payload: Sequence[Dict[str, Any]], + *, + mode: str, + verbosity: str, + ) -> Dict[str, Any]: + encoded = json.dumps(json_safe(list(payload)), separators=(",", ":"), ensure_ascii=False).encode("utf-8") + raw = self.lib.dita_kernel_reconcile_slots_json( + handle, + ctypes.c_char_p(encoded), + ctypes.c_char_p(mode.encode("utf-8")), + ctypes.c_char_p(verbosity.encode("utf-8")), + ) + return json.loads(self._take_string(raw)) + + def snapshot(self, handle: ctypes.c_void_p) -> Dict[str, Any]: + raw = self.lib.dita_kernel_snapshot_json(handle) + return json.loads(self._take_string(raw)) + + def set_seed_capital(self, handle: ctypes.c_void_p, seed: float) -> bool: + rc = self.lib.dita_kernel_set_seed_capital(handle, ctypes.c_double(seed)) + return rc == 0 + + def on_account_event( + self, handle: ctypes.c_void_p, event: Dict[str, Any] + ) -> Dict[str, Any]: + encoded = json.dumps(json_safe(event), separators=(",", ":"), ensure_ascii=False).encode("utf-8") + raw = self.lib.dita_kernel_on_account_event_json(handle, ctypes.c_char_p(encoded)) + if not raw: + return {} + return json.loads(self._take_string(raw)) + + +_RUST: _RustKernelLib | None = None # lazy init — avoids Rust build on import + + +def _get_rust() -> _RustKernelLib: + global _RUST + if _RUST is None: + _RUST = _RustKernelLib() + return _RUST + + +def _slot_to_payload(slot: TradeSlot) -> Dict[str, Any]: + return slot.to_dict() + + +def _order_to_payload(order: Optional[VenueOrder]) -> Optional[Dict[str, Any]]: + if order is None: + return None + return { + "internal_trade_id": order.internal_trade_id, + "venue_order_id": order.venue_order_id, + "venue_client_id": order.venue_client_id, + "side": order.side.value, + "intended_size": float(order.intended_size or 0.0), + "filled_size": float(order.filled_size or 0.0), + "average_fill_price": float(order.average_fill_price or 0.0), + "status": order.status.value, + "metadata": dict(order.metadata), + } + + +def _order_from_payload(payload: Optional[Dict[str, Any]], *, trade_id: str) -> Optional[VenueOrder]: + if not isinstance(payload, dict): + return None + return VenueOrder( + internal_trade_id=trade_id, + venue_order_id=str(payload.get("venue_order_id", "")), + venue_client_id=str(payload.get("venue_client_id", "")), + side=TradeSide(str(payload.get("side", TradeSide.FLAT.value))), + intended_size=float(payload.get("intended_size", 0.0)), + filled_size=float(payload.get("filled_size", 0.0)), + average_fill_price=float(payload.get("average_fill_price", 0.0)), + status=VenueOrderStatus(str(payload.get("status", VenueOrderStatus.NEW.value))), + metadata=dict(payload.get("metadata", {})), + ) + + +def _slot_from_payload(payload: Dict[str, Any]) -> TradeSlot: + return TradeSlot( + slot_id=int(payload.get("slot_id", 0)), + trade_id=str(payload.get("trade_id", "")), + asset=str(payload.get("asset", "")), + side=TradeSide(str(payload.get("side", TradeSide.FLAT.value))), + entry_price=float(payload.get("entry_price", 0.0)), + size=float(payload.get("size", 0.0)), + initial_size=float(payload.get("initial_size", 0.0)), + leverage=float(payload.get("leverage", 0.0)), + entry_time=datetime.fromisoformat(payload["entry_time"]) if payload.get("entry_time") else None, + unrealized_pnl=float(payload.get("unrealized_pnl", 0.0)), + realized_pnl=float(payload.get("realized_pnl", 0.0)), + closed=bool(payload.get("closed", False)), + exit_leg_ratios=tuple(float(r) for r in payload.get("exit_leg_ratios", (1.0,))), + active_leg_index=int(payload.get("active_leg_index", 0)), + active_exit_order=_order_from_payload(payload.get("active_exit_order"), trade_id=str(payload.get("trade_id", ""))), + active_entry_order=_order_from_payload(payload.get("active_entry_order"), trade_id=str(payload.get("trade_id", ""))), + fsm_state=TradeStage(str(payload.get("fsm_state", TradeStage.IDLE.value))), + close_reason=str(payload.get("close_reason", "")), + last_event_time=datetime.fromisoformat(payload["last_event_time"]) if payload.get("last_event_time") else None, + seen_event_ids=tuple(str(event_id) for event_id in payload.get("seen_event_ids", ())), + metadata=dict(payload.get("metadata", {})), + ) + + +def _first_invalid_intent_field(intent: KernelIntent) -> Optional[tuple[str, float]]: + """Return (field, value) for the first non-finite or out-of-bounds numeric + field on an intent, or None if all are sane. Guards the kernel boundary + against inf/NaN that would otherwise crash serde_json serialization.""" + scalar_checks = ( + ("target_size", float(intent.target_size if intent.target_size is not None else 0.0)), + ("reference_price", float(intent.reference_price if intent.reference_price is not None else 0.0)), + ("leverage", float(intent.leverage if intent.leverage is not None else 0.0)), + ("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)), + ) + for name, value in scalar_checks: + if not math.isfinite(value): + return (name, value) + for idx, ratio in enumerate(intent.exit_leg_ratios or ()): # type: ignore[union-attr] + rv = float(ratio if ratio is not None else 0.0) + if not math.isfinite(rv): + return (f"exit_leg_ratios[{idx}]", rv) + size = float(intent.target_size if intent.target_size is not None else 0.0) + if size < 0.0: + return ("target_size", size) + return None + + +def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]: + return { + "timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp), + "intent_id": intent.intent_id, + "trade_id": intent.trade_id, + "slot_id": intent.slot_id, + "asset": intent.asset, + "side": intent.side.value, + "action": intent.action.value, + "reference_price": float(intent.reference_price or 0.0), + "target_size": float(intent.target_size or 0.0), + "leverage": float(intent.leverage or 0.0), + "exit_leg_ratios": list(intent.exit_leg_ratios), + "reason": intent.reason, + "metadata": dict(intent.metadata), + "stage": intent.stage.value, + "order_type": getattr(intent, "order_type", "MARKET"), + "limit_price": float(getattr(intent, "limit_price", 0.0) or 0.0), + } + + +def _event_to_payload(event: VenueEvent) -> Dict[str, Any]: + return { + "timestamp": event.timestamp.isoformat() if hasattr(event.timestamp, "isoformat") else str(event.timestamp), + "event_id": event.event_id, + "trade_id": event.trade_id, + "slot_id": event.slot_id, + "kind": event.kind.value, + "status": event.status.value, + "venue_order_id": event.venue_order_id, + "venue_client_id": event.venue_client_id, + "side": event.side.value, + "asset": event.asset, + "price": float(event.price or 0.0), + "size": float(event.size or 0.0), + "filled_size": float(event.filled_size or 0.0), + "remaining_size": float(event.remaining_size or 0.0), + "reason": event.reason, + "raw_payload": dict(event.raw_payload), + "metadata": dict(event.metadata), + } + + +def _transition_from_payload(payload: Dict[str, Any]) -> KernelTransition: + return KernelTransition( + timestamp=datetime.fromisoformat(payload["timestamp"]), + trade_id=str(payload.get("trade_id", "")), + slot_id=int(payload.get("slot_id", 0)), + prev_state=TradeStage(str(payload.get("prev_state", TradeStage.IDLE.value))), + next_state=TradeStage(str(payload.get("next_state", TradeStage.IDLE.value))), + trigger=str(payload.get("trigger", "")), + intent_id=str(payload.get("intent_id", "")), + event_id=str(payload.get("event_id", "")), + control_mode=str(payload.get("control_mode", "")), + control_verbosity=str(payload.get("control_verbosity", "")), + details=dict(payload.get("details", {})), + ) + + +def _outcome_from_payload(payload: Dict[str, Any]) -> KernelOutcome: + return KernelOutcome( + accepted=bool(payload.get("accepted", False)), + slot_id=int(payload.get("slot_id", 0)), + trade_id=str(payload.get("trade_id", "")), + state=TradeStage(str(payload.get("state", TradeStage.IDLE.value))), + diagnostic_code=KernelDiagnosticCode(str(payload.get("diagnostic_code", KernelDiagnosticCode.OK.value))), + severity=KernelSeverity(str(payload.get("severity", KernelSeverity.INFO.value))), + transitions=tuple(_transition_from_payload(row) for row in payload.get("transitions", [])), + emitted_events=tuple( + VenueEvent( + timestamp=datetime.fromisoformat(row["timestamp"]), + event_id=str(row.get("event_id", "")), + trade_id=str(row.get("trade_id", "")), + slot_id=int(row.get("slot_id", 0)), + kind=KernelEventKind(str(row.get("kind", KernelEventKind.ORDER_ACK.value))), + status=VenueEventStatus(str(row.get("status", VenueEventStatus.ACKED.value))), + venue_order_id=str(row.get("venue_order_id", "")), + venue_client_id=str(row.get("venue_client_id", "")), + side=TradeSide(str(row.get("side", TradeSide.FLAT.value))), + asset=str(row.get("asset", "")), + price=float(row.get("price", 0.0)), + size=float(row.get("size", 0.0)), + filled_size=float(row.get("filled_size", 0.0)), + remaining_size=float(row.get("remaining_size", 0.0)), + reason=str(row.get("reason", "")), + raw_payload=dict(row.get("raw_payload", {})), + metadata=dict(row.get("metadata", {})), + ) + for row in payload.get("emitted_events", []) + ), + details=dict(payload.get("details", {})), + ) + + +def _enum_text(value: Any) -> str: + if hasattr(value, "value"): + return str(getattr(value, "value")) + return str(value) + + +class KernelSlotView: + """Write-through view over a Rust-backed slot.""" + + def __init__(self, kernel: "ExecutionKernel", slot_id: int) -> None: + object.__setattr__(self, "_kernel", kernel) + object.__setattr__(self, "_slot_id", int(slot_id)) + + @property + def slot_id(self) -> int: + return object.__getattribute__(self, "_slot_id") + + def _snapshot(self) -> TradeSlot: + return self._kernel._get_slot(self.slot_id) + + def __getattr__(self, name: str) -> Any: + slot = self._snapshot() + if hasattr(slot, name): + return getattr(slot, name) + raise AttributeError(name) + + def __setattr__(self, name: str, value: Any) -> None: + if name in {"_kernel", "_slot_id"}: + object.__setattr__(self, name, value) + return + slot = self._snapshot() + if not hasattr(slot, name): + raise AttributeError(name) + setattr(slot, name, value) + self._kernel._set_slot(slot) + + def to_dict(self) -> Dict[str, Any]: + return self._snapshot().to_dict() + + def is_free(self) -> bool: + return self._snapshot().is_free() + + def is_open(self) -> bool: + return self._snapshot().is_open() + + def mark_price(self, price: float) -> None: + slot = self._snapshot() + slot.mark_price(price) + self._kernel._set_slot(slot) + + def next_exit_ratio(self) -> float: + return self._snapshot().next_exit_ratio() + + def consume_exit_leg(self) -> float: + slot = self._snapshot() + ratio = slot.consume_exit_leg() + self._kernel._set_slot(slot) + return ratio + + def attach_entry_order(self, order: VenueOrder) -> None: + slot = self._snapshot() + slot.active_entry_order = order + self._kernel._set_slot(slot) + + def attach_exit_order(self, order: VenueOrder) -> None: + slot = self._snapshot() + slot.active_exit_order = order + self._kernel._set_slot(slot) + + def __repr__(self) -> str: # pragma: no cover - debugging helper + return f"KernelSlotView(slot_id={self.slot_id}, state={self._snapshot().fsm_state.value})" + + +class KernelStateView: + def __init__(self, kernel: "ExecutionKernel") -> None: + self._kernel = kernel + self.slots = [KernelSlotView(kernel, slot_id) for slot_id in range(kernel.max_slots)] + self.active_trade_index: Dict[str, int] = {} + self.venue_order_index: Dict[str, int] = {} + self.client_order_index: Dict[str, int] = {} + self.refresh() + + def refresh(self) -> None: + snapshot = self._kernel._snapshot_backend() + self.active_trade_index = dict(snapshot.get("active_trade_index", {})) + self.venue_order_index = dict(snapshot.get("venue_order_index", {})) + self.client_order_index = dict(snapshot.get("client_order_index", {})) + + +class ExecutionKernel: + """Rust-backed multi-slot execution kernel.""" + + def __init__( + self, + *, + max_slots: int = 10, + control_plane: Optional[ControlPlane] = None, + venue: Optional[VenueAdapter] = None, + journal: Optional[KernelJournal] = None, + account: Optional[AccountProjection] = None, + projection: Optional[HazelcastProjection] = None, + projection_client: Optional[Any] = None, + zinc_plane: Optional[ZincPlane] = None, + ) -> None: + self.max_slots = int(max_slots) + self.control_plane = control_plane or build_control_plane() + self.venue = venue or MockVenueAdapter() + self.journal = journal or MemoryKernelJournal() + self.account = account or AccountProjection() + self.projection = projection or build_projection(client=projection_client) + self.zinc_plane = zinc_plane or InMemoryZincPlane() + self._backend = _get_rust().create(self.max_slots) + self._control_snapshot = self.control_plane.read() + self._last_settled_pnl: Dict[int, float] = {} + self.projection.write_control(self._control_snapshot) + self.zinc_plane.update_control(self._control_snapshot) + self.state = KernelStateView(self) + self.account.observe_slots([self._get_slot(slot_id) for slot_id in range(self.max_slots)]) + + def __del__(self) -> None: # pragma: no cover - cleanup best effort + backend = getattr(self, "_backend", None) + if backend is not None: + try: + _get_rust().destroy(backend) + except Exception: + pass + + @property + def control(self) -> KernelControlSnapshot: + return self.control_plane.read() + + def update_control(self, update: ControlUpdate) -> KernelControlSnapshot: + snapshot = self.control_plane.update(update) + self._control_snapshot = snapshot + self.projection.write_control(snapshot) + self.zinc_plane.update_control(snapshot) + return snapshot + + def _snapshot_backend(self) -> Dict[str, Any]: + return _get_rust().snapshot(self._backend) + + def _get_slot(self, slot_id: int) -> TradeSlot: + return _slot_from_payload(_get_rust().get_slot_json(self._backend, slot_id)) + + def _set_slot(self, slot: TradeSlot, *, journal: bool = False) -> None: + payload = _slot_to_payload(slot) + _get_rust().set_slot_json(self._backend, slot.slot_id, payload) + self.state.refresh() + slots = [self._get_slot(slot_id) for slot_id in range(self.max_slots)] + self.account.observe_slots(slots) + current = self._get_slot(slot.slot_id) + self.projection.write_slot(current) + self.zinc_plane.write_slot(current) + + def slot(self, slot_id: int) -> KernelSlotView: + if not (0 <= int(slot_id) < self.max_slots): + raise IndexError(slot_id) + return self.state.slots[int(slot_id)] + + def free_slot(self) -> Optional[KernelSlotView]: + for slot in self.state.slots: + if slot.is_free(): + return slot + return None + + def _record_transitions(self, transitions: Iterable[KernelTransition], slot: TradeSlot, event: Optional[VenueEvent]) -> None: + if self.control.debug_clickhouse_enabled: + for transition in transitions: + self.journal.record_transition( + transition=transition, + slot=slot, + event=event, + control=self.control, + ) + + def process_intent(self, intent: KernelIntent) -> KernelOutcome: + self.zinc_plane.publish_intent(intent) + if not (0 <= int(intent.slot_id) < self.max_slots): + return KernelOutcome( + accepted=False, + slot_id=int(intent.slot_id), + trade_id=intent.trade_id, + state=TradeStage.IDLE, + diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID, + details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id}, + ) + # Finiteness / sanity guard at the kernel boundary. A non-finite (inf/NaN) + # numeric field would make the Rust core's serde_json serialization return + # a null string (panic). Reject cleanly with INVALID_INTENT instead, naming + # the offending field + value so the upstream numerical source can be located. + bad_field = _first_invalid_intent_field(intent) + if bad_field is not None: + name, value = bad_field + return KernelOutcome( + accepted=False, + slot_id=int(intent.slot_id), + trade_id=intent.trade_id, + state=self._get_slot(int(intent.slot_id)).fsm_state, + diagnostic_code=KernelDiagnosticCode.INVALID_INTENT, + severity=KernelSeverity.WARNING, + details={ + "reason": "INVALID_INTENT", + "field": name, + "value": str(value), + "intent_id": intent.intent_id, + "action": intent.action.value, + "asset": intent.asset, + }, + ) + payload = _intent_to_payload(intent) + result = _get_rust().process_intent( + self._backend, + payload, + mode=_enum_text(self.control.mode), + verbosity=_enum_text(self.control.verbosity), + ) + outcome = _outcome_from_payload(result["outcome"]) + self.state.refresh() + if intent.action == KernelCommandType.ENTER and outcome.accepted: + self._last_settled_pnl[intent.slot_id] = 0.0 + emitted_events = [] + all_venue_transitions: List[KernelTransition] = [] + if intent.action in {KernelCommandType.ENTER, KernelCommandType.EXIT}: + emitted_events = self.venue.submit(intent) + for event in emitted_events: + evt_outcome = self.on_venue_event(event) + all_venue_transitions.extend(evt_outcome.transitions) + elif intent.action == KernelCommandType.CANCEL: + slot_view = self.slot(intent.slot_id) + if slot_view.active_exit_order is not None: + emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason) + elif slot_view.active_entry_order is not None and slot_view.fsm_state in { + TradeStage.ENTRY_WORKING, + TradeStage.ORDER_REQUESTED, + TradeStage.ORDER_SENT, + TradeStage.IDLE, + }: + emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason) + else: + emitted_events = [] + for event in emitted_events: + evt_outcome = self.on_venue_event(event) + all_venue_transitions.extend(evt_outcome.transitions) + + final_slot = self._get_slot(outcome.slot_id) + rate_limit_event = next((event for event in emitted_events if event.kind == KernelEventKind.RATE_LIMITED), None) + if rate_limit_event is not None: + rate_limit_details = dict(outcome.details) + rate_limit_details.update( + { + "reason": rate_limit_event.reason or "RATE_LIMITED", + "retry_after_ms": int(rate_limit_event.metadata.get("retry_after_ms", 0) or 0), + "venue_event_kind": rate_limit_event.kind.value, + "severity": KernelSeverity.WARNING.value, + "release_eta": "few minutes", + "retryable": True, + } + ) + outcome = KernelOutcome( + accepted=False, + slot_id=outcome.slot_id, + trade_id=outcome.trade_id, + state=final_slot.fsm_state, + diagnostic_code=KernelDiagnosticCode.RATE_LIMITED, + severity=KernelSeverity.WARNING, + transitions=outcome.transitions, + emitted_events=outcome.emitted_events, + details=rate_limit_details, + ) + all_transitions = list(outcome.transitions) + all_venue_transitions + final_outcome = KernelOutcome( + accepted=outcome.accepted, + slot_id=outcome.slot_id, + trade_id=final_slot.trade_id, + state=final_slot.fsm_state, + diagnostic_code=outcome.diagnostic_code, + transitions=tuple(all_transitions), + emitted_events=tuple(emitted_events), + details=dict(outcome.details), + ) + slots = [self._get_slot(i) for i in range(self.max_slots)] + self.account.observe_slots(slots) + current = self._get_slot(final_slot.slot_id) + self.projection.write_slot(current) + self.zinc_plane.write_slot(current) + self._record_transitions(outcome.transitions, final_slot, None) + return final_outcome + + def on_venue_event(self, event: VenueEvent) -> KernelOutcome: + result = _get_rust().on_venue_event( + self._backend, + _event_to_payload(event), + mode=_enum_text(self.control.mode), + verbosity=_enum_text(self.control.verbosity), + ) + outcome = _outcome_from_payload(result["outcome"]) + # An INVALID_* fallback result carries a null slot; fall back to the + # kernel's current slot so settlement/bookkeeping stays consistent. + slot_payload = result.get("slot") + slot = _slot_from_payload(slot_payload) if slot_payload else self._get_slot(int(outcome.slot_id)) + self.state.refresh() + incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0) + if abs(incremental_pnl) > 1e-12: + self.account.settle(incremental_pnl) + self._last_settled_pnl[slot.slot_id] = slot.realized_pnl + slots = [self._get_slot(i) for i in range(self.max_slots)] + self.account.observe_slots(slots) + current = self._get_slot(slot.slot_id) + self.projection.write_slot(current) + self.zinc_plane.write_slot(current) + self._record_transitions(outcome.transitions, slot, event) + return outcome + + def mark_price(self, asset: str, price: float) -> None: + for slot in self.state.slots: + if slot.asset == asset and slot.is_open(): + slot.mark_price(price) + self.account.observe_slots([self._get_slot(i) for i in range(self.max_slots)]) + + def reconcile_from_slots(self, slots: Sequence[TradeSlot]) -> KernelOutcome: + payload = [_slot_to_payload(slot) for slot in slots] + result = _get_rust().reconcile_slots( + self._backend, + payload, + mode=_enum_text(self.control.mode), + verbosity=_enum_text(self.control.verbosity), + ) + outcome = _outcome_from_payload(result["outcome"]) + if not outcome.accepted: + return outcome + self.state.refresh() + slots = [self._get_slot(i) for i in range(self.max_slots)] + self.account.observe_slots(slots) + for current in slots: + self.projection.write_slot(current) + self.zinc_plane.write_slot(current) + return outcome + + def set_seed_capital(self, seed: float) -> None: + """Set the kernel's seed capital for K-value fold. Call once at init.""" + _get_rust().set_seed_capital(self._backend, float(seed)) + + def on_account_event(self, event: Dict[str, Any]) -> Dict[str, Any]: + """ + Apply an account-level exchange event atomically to the kernel. + + event dict must have "kind" in {"FILL_SETTLED","ACCOUNT_UPDATE","FUNDING_FEE"} + plus the relevant numeric fields (see Rust FFI doc). + + Returns the resulting account state dict including reconcile_status, + available_capital (E rules when present), k_capital, event_seq. + """ + return _get_rust().on_account_event(self._backend, event) + + def snapshot(self) -> Dict[str, Any]: + # Merge kernel Rust snapshot (includes AccountState) with Python state. + rust_snap = _get_rust().snapshot(self._backend) + rust_account = rust_snap.get("account", {}) + return { + "control": self.control.as_dict(), + "slots": [self._get_slot(slot.slot_id).to_dict() for slot in self.state.slots], + "account": { + # Legacy fields (Python AccountProjection) — backward compat + "capital": self.account.snapshot.capital, + "equity": self.account.snapshot.equity, + "realized_pnl": self.account.snapshot.realized_pnl, + "unrealized_pnl": self.account.snapshot.unrealized_pnl, + "open_positions": self.account.snapshot.open_positions, + "open_notional": self.account.snapshot.open_notional, + "leverage": self.account.snapshot.leverage, + # V2 — kernel atomic K/E account (E rules; K is parallel check) + "k_capital": rust_account.get("k_capital", self.account.snapshot.capital), + "k_realized_pnl": rust_account.get("k_realized_pnl", self.account.snapshot.realized_pnl), + "k_fees_paid": rust_account.get("k_fees_paid", self.account.snapshot.fees_paid), + "k_funding_net": rust_account.get("k_funding_net", 0.0), + "e_wallet_balance": rust_account.get("e_wallet_balance", 0.0), + "e_available_margin": rust_account.get("e_available_margin", 0.0), + "e_used_margin": rust_account.get("e_used_margin", 0.0), + "e_maint_margin": rust_account.get("e_maint_margin", 0.0), + # available_capital: E rules when present, K as fallback + "available_capital": rust_account.get("available_capital", self.account.snapshot.capital), + "reconcile_status": rust_account.get("reconcile_status", "OK"), + "reconcile_delta": rust_account.get("reconcile_delta", 0.0), + "reconcile_explanation": rust_account.get("reconcile_explanation", ""), + "event_seq": rust_account.get("event_seq", 0), + }, + }