diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs index 9a706f7..4e8eb95 100644 --- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -863,6 +863,7 @@ impl KernelCore { let exit_ratio = slot.next_exit_ratio(); let base_size = if slot.initial_size > 0.0 { slot.initial_size } else { slot.size }; let exit_size = (base_size * exit_ratio).max(0.0); + let exit_prev_state = slot.fsm_state.clone(); slot.fsm_state = TradeStage::EXIT_REQUESTED; slot.attach_exit_order(VenueOrder { internal_trade_id: slot.trade_id.clone(), @@ -888,7 +889,7 @@ impl KernelCore { self.commit_slot(slot.clone()); let transition = self.transition( &slot, - TradeStage::POSITION_OPEN, + exit_prev_state, slot.fsm_state.clone(), "EXIT_INTENT", None, @@ -1072,6 +1073,45 @@ impl KernelCore { }; } + // I13: Reject stray events on completed slots. A delayed venue event + // (e.g. a fill that arrives after the slot is already closed) must not + // reactivate the slot or corrupt its FSM state. Record the event_id + // so a repeat of the same stray is caught by the dedup guard above. + if slot.closed { + let prev_state = slot.fsm_state.clone(); + let transition = self.transition( + &slot, + prev_state.clone(), + prev_state.clone(), + "TERMINAL_STATE", + Some(&event), + control_mode, + control_verbosity, + ); + Self::append_event_id(&mut slot, &event.event_id); + self.commit_slot(slot.clone()); + return KernelResult { + outcome: KernelOutcome { + accepted: false, + slot_id: slot.slot_id, + trade_id: slot.trade_id.clone(), + state: slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::TERMINAL_STATE, + transitions: vec![transition], + details: json!({ + "event_kind": event.kind, + "reason": "TERMINAL_STATE", + }) + .as_object() + .cloned() + .unwrap_or_default(), + ..KernelOutcome::default() + }, + slot: slot.clone(), + snapshot: self.snapshot(), + }; + } + if slot.fsm_state == TradeStage::STALE_STATE_RECONCILING { let prev_state = slot.fsm_state.clone(); let transition = self.transition( @@ -1339,6 +1379,16 @@ impl KernelCore { event.size } .max(0.0); + // Accumulate incremental fills. WS events carry lastFilledQty + // (incremental per-event); REST/snapshot events are cumulative but + // arrive as a single FULL_FILL with prev_filled == 0, so the sum + // equals fill_size in that case — no change in behavior. + let prev_filled = slot + .active_entry_order + .as_ref() + .map(|order| order.filled_size) + .unwrap_or(0.0); + let accumulated = prev_filled + fill_size; let intended_size = slot .active_entry_order .as_ref() @@ -1350,7 +1400,7 @@ impl KernelCore { venue_client_id: event.venue_client_id.clone(), side: slot.side.clone(), intended_size, - filled_size: fill_size, + filled_size: accumulated, average_fill_price: event.price, status: if partial { VenueOrderStatus::PARTIALLY_FILLED @@ -1363,12 +1413,11 @@ impl KernelCore { map }, }); + // Set initial_size from the intended order size on first fill only. if slot.initial_size <= 0.0 { - slot.initial_size = fill_size; - } else { - slot.initial_size = slot.initial_size.max(fill_size); + slot.initial_size = if intended_size > 0.0 { intended_size } else { accumulated }; } - slot.size = fill_size; + slot.size = accumulated; if event.price > 0.0 { slot.entry_price = event.price; } @@ -1474,7 +1523,16 @@ fn cstr_to_string(ptr: *const c_char) -> Result { } fn into_c_string(value: &str) -> *mut c_char { - CString::new(value).unwrap().into_raw() + // Strip embedded NUL bytes so CString::new never panics. A NUL in a JSON + // payload (e.g. from a malformed exchange response) would otherwise crash + // the process via unwrap(). + match CString::new(value) { + Ok(cs) => cs.into_raw(), + Err(_) => { + let sanitized = value.replace('\0', "\\u0000"); + CString::new(sanitized).unwrap_or_else(|_| CString::new("").unwrap()).into_raw() + } + } } /// Build a well-formed INVALID_INTENT KernelResult JSON string. Used when the diff --git a/prod/clean_arch/dita_v2/real_control_plane.py b/prod/clean_arch/dita_v2/real_control_plane.py index 35dcd3b..0139b91 100644 --- a/prod/clean_arch/dita_v2/real_control_plane.py +++ b/prod/clean_arch/dita_v2/real_control_plane.py @@ -12,7 +12,7 @@ from .control import BackendMode, ControlPlane, ControlUpdate, KernelControlSnap _ZINC_ADAPTER_PATH = Path(__file__).resolve().parents[3] / "zinc" / "adapters" / "python" if _ZINC_ADAPTER_PATH.exists() and str(_ZINC_ADAPTER_PATH) not in sys.path: - sys.path.insert(0, str(_ZINC_ADAPTER_PATH)) + sys.path.append(str(_ZINC_ADAPTER_PATH)) try: # pragma: no cover - exercised in integration tests from zinc import SharedRegion diff --git a/prod/clean_arch/dita_v2/real_zinc_plane.py b/prod/clean_arch/dita_v2/real_zinc_plane.py index f54277c..3a13375 100644 --- a/prod/clean_arch/dita_v2/real_zinc_plane.py +++ b/prod/clean_arch/dita_v2/real_zinc_plane.py @@ -21,7 +21,7 @@ from .control import KernelControlSnapshot _ZINC_ADAPTER_PATH = Path(__file__).resolve().parents[3] / "zinc" / "adapters" / "python" if _ZINC_ADAPTER_PATH.exists() and str(_ZINC_ADAPTER_PATH) not in sys.path: - sys.path.insert(0, str(_ZINC_ADAPTER_PATH)) + sys.path.append(str(_ZINC_ADAPTER_PATH)) try: # pragma: no cover - exercised in integration tests from zinc import SharedRegion