PINK DITAv2: kernel-level finiteness guard (no more null-string crash on inf/NaN)
The aborted hard cutover crash-looped with "Rust kernel returned null string" from process_intent on the first live trading step. Root cause (reproduced): a non-finite (inf/NaN) numeric field reaching the kernel — Python json.dumps emits the Infinity/NaN token, serde_json rejects it at parse, and the FFI returned null. Magnitude is fine; only finiteness was the problem.

Defense in depth, kernel catches it:
- Rust FFI (lib.rs): dita_kernel_process_intent_json / _on_venue_event_json now return a clean INVALID_INTENT KernelResult on parse failure (incl. Infinity/NaN tokens) AND on serialize failure (a non-finite value produced internally) — never a null string.
- Python bridge (rust_backend.py): ExecutionKernel.process_intent validates intent finiteness/bounds (target_size, reference_price, limit_price, leverage, exit_leg_ratios; size>=0) BEFORE the FFI and rejects INVALID_INTENT, naming the offending field+value.
- contracts.py: add KernelDiagnosticCode.INVALID_INTENT.
- pink_direct.py: on INVALID_INTENT, log full upstream provenance (snapshot.price, capital, leverage, sizes) so the numerical SOURCE can be located on the next live run.
- on_venue_event bridge tolerates the fallback's null slot (uses the live slot).

Verified: kernel recompiled; offline 65 + 7 new guard tests green (no regression); direct-FFI inf payload -> INVALID_INTENT (no null crash).

NOTE: this turns the cutover crash into a clean rejection — the upstream source of the non-finite (the live run's inf) still needs locating, now aided by the provenance log.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1477,6 +1477,31 @@ fn into_c_string(value: &str) -> *mut c_char {
|
|||||||
CString::new(value).unwrap().into_raw()
|
CString::new(value).unwrap().into_raw()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build a well-formed INVALID_INTENT KernelResult JSON string. Used when the
|
||||||
|
/// kernel cannot safely process a request — a payload that fails to parse
|
||||||
|
/// (e.g. non-finite Infinity/NaN tokens serde rejects) or a result that fails
|
||||||
|
/// to serialize (a non-finite value produced internally). This keeps the kernel
|
||||||
|
/// from ever returning a null string on non-finite input/output: it rejects
|
||||||
|
/// cleanly with a diagnostic that the caller can decode as a normal result.
|
||||||
|
fn invalid_intent_cstring(reason: &str, detail: &str) -> *mut c_char {
|
||||||
|
let fallback = serde_json::json!({
|
||||||
|
"outcome": {
|
||||||
|
"accepted": false,
|
||||||
|
"slot_id": 0,
|
||||||
|
"trade_id": "",
|
||||||
|
"state": "IDLE",
|
||||||
|
"diagnostic_code": "INVALID_INTENT",
|
||||||
|
"severity": "WARNING",
|
||||||
|
"transitions": [],
|
||||||
|
"emitted_events": [],
|
||||||
|
"details": {"reason": reason, "detail": detail}
|
||||||
|
},
|
||||||
|
"slot": serde_json::Value::Null,
|
||||||
|
"snapshot": serde_json::Value::Null
|
||||||
|
});
|
||||||
|
into_c_string(&fallback.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
fn with_handle_mut<F, R>(handle: *mut KernelHandle, f: F) -> Result<R, String>
|
fn with_handle_mut<F, R>(handle: *mut KernelHandle, f: F) -> Result<R, String>
|
||||||
where
|
where
|
||||||
F: FnOnce(&mut KernelCore) -> Result<R, String>,
|
F: FnOnce(&mut KernelCore) -> Result<R, String>,
|
||||||
@@ -1551,11 +1576,21 @@ pub extern "C" fn dita_kernel_process_intent_json(
|
|||||||
};
|
};
|
||||||
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
||||||
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
||||||
|
// Reject non-parseable payloads (incl. non-finite Infinity/NaN tokens serde
|
||||||
|
// refuses) with a clean INVALID_INTENT, never a null string.
|
||||||
|
let intent: KernelIntent = match serde_json::from_str(&payload) {
|
||||||
|
Ok(value) => value,
|
||||||
|
Err(err) => return invalid_intent_cstring("INVALID_INTENT_PARSE", &err.to_string()),
|
||||||
|
};
|
||||||
match with_handle_mut(handle, |core| {
|
match with_handle_mut(handle, |core| {
|
||||||
let intent: KernelIntent = serde_json::from_str(&payload).map_err(|err| err.to_string())?;
|
Ok::<_, String>(core.process_intent(intent, &control_mode, &control_verbosity))
|
||||||
Ok(core.process_intent(intent, &control_mode, &control_verbosity))
|
|
||||||
}) {
|
}) {
|
||||||
Ok(result) => serde_json::to_string(&result).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()),
|
// If the result cannot be serialized, reject cleanly rather than returning
|
||||||
|
// null. NOTE(review): serde_json writes non-finite floats as null — confirm what fails here.
|
||||||
|
Ok(result) => match serde_json::to_string(&result) {
|
||||||
|
Ok(s) => into_c_string(&s),
|
||||||
|
Err(err) => invalid_intent_cstring("INVALID_INTENT_SERIALIZE", &err.to_string()),
|
||||||
|
},
|
||||||
Err(_) => ptr::null_mut(),
|
Err(_) => ptr::null_mut(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -1573,11 +1608,17 @@ pub extern "C" fn dita_kernel_on_venue_event_json(
|
|||||||
};
|
};
|
||||||
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
||||||
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
||||||
|
let event: VenueEvent = match serde_json::from_str(&payload) {
|
||||||
|
Ok(value) => value,
|
||||||
|
Err(err) => return invalid_intent_cstring("INVALID_EVENT_PARSE", &err.to_string()),
|
||||||
|
};
|
||||||
match with_handle_mut(handle, |core| {
|
match with_handle_mut(handle, |core| {
|
||||||
let event: VenueEvent = serde_json::from_str(&payload).map_err(|err| err.to_string())?;
|
Ok::<_, String>(core.on_venue_event(event, &control_mode, &control_verbosity))
|
||||||
Ok(core.on_venue_event(event, &control_mode, &control_verbosity))
|
|
||||||
}) {
|
}) {
|
||||||
Ok(result) => serde_json::to_string(&result).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()),
|
Ok(result) => match serde_json::to_string(&result) {
|
||||||
|
Ok(s) => into_c_string(&s),
|
||||||
|
Err(err) => invalid_intent_cstring("INVALID_EVENT_SERIALIZE", &err.to_string()),
|
||||||
|
},
|
||||||
Err(_) => ptr::null_mut(),
|
Err(_) => ptr::null_mut(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -74,6 +74,7 @@ class KernelDiagnosticCode(str, Enum):
|
|||||||
OK = "OK"
|
OK = "OK"
|
||||||
RATE_LIMITED = "RATE_LIMITED"
|
RATE_LIMITED = "RATE_LIMITED"
|
||||||
INVALID_SLOT_ID = "INVALID_SLOT_ID"
|
INVALID_SLOT_ID = "INVALID_SLOT_ID"
|
||||||
|
INVALID_INTENT = "INVALID_INTENT"
|
||||||
UNSUPPORTED_INTENT = "UNSUPPORTED_INTENT"
|
UNSUPPORTED_INTENT = "UNSUPPORTED_INTENT"
|
||||||
SLOT_BUSY = "SLOT_BUSY"
|
SLOT_BUSY = "SLOT_BUSY"
|
||||||
NO_OPEN_POSITION = "NO_OPEN_POSITION"
|
NO_OPEN_POSITION = "NO_OPEN_POSITION"
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ from pathlib import Path
|
|||||||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||||||
import ctypes
|
import ctypes
|
||||||
import json
|
import json
|
||||||
|
import math
|
||||||
import os
|
import os
|
||||||
import subprocess
|
import subprocess
|
||||||
import sys
|
import sys
|
||||||
@@ -281,6 +282,29 @@ def _slot_from_payload(payload: Dict[str, Any]) -> TradeSlot:
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _first_invalid_intent_field(intent: KernelIntent) -> Optional[tuple[str, float]]:
|
||||||
|
"""Return (field, value) for the first non-finite or out-of-bounds numeric
|
||||||
|
field on an intent, or None if all are sane. Guards the kernel boundary
|
||||||
|
against inf/NaN, which serde_json rejects when parsing the JSON payload."""
|
||||||
|
scalar_checks = (
|
||||||
|
("target_size", float(intent.target_size if intent.target_size is not None else 0.0)),
|
||||||
|
("reference_price", float(intent.reference_price if intent.reference_price is not None else 0.0)),
|
||||||
|
("leverage", float(intent.leverage if intent.leverage is not None else 0.0)),
|
||||||
|
("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)),
|
||||||
|
)
|
||||||
|
for name, value in scalar_checks:
|
||||||
|
if not math.isfinite(value):
|
||||||
|
return (name, value)
|
||||||
|
for idx, ratio in enumerate(intent.exit_leg_ratios or ()): # type: ignore[union-attr]
|
||||||
|
rv = float(ratio if ratio is not None else 0.0)
|
||||||
|
if not math.isfinite(rv):
|
||||||
|
return (f"exit_leg_ratios[{idx}]", rv)
|
||||||
|
size = float(intent.target_size if intent.target_size is not None else 0.0)
|
||||||
|
if size < 0.0:
|
||||||
|
return ("target_size", size)
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]:
|
def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]:
|
||||||
return {
|
return {
|
||||||
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
|
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
|
||||||
@@ -561,6 +585,29 @@ class ExecutionKernel:
|
|||||||
diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
|
diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
|
||||||
details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
|
details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
|
||||||
)
|
)
|
||||||
|
# Finiteness / sanity guard at the kernel boundary. A non-finite (inf/NaN)
|
||||||
|
# numeric field reaches the FFI as an Infinity/NaN token that serde_json refuses
|
||||||
|
# to parse, which surfaced as a null string. Reject with INVALID_INTENT instead, naming
|
||||||
|
# the offending field + value so the upstream numerical source can be located.
|
||||||
|
bad_field = _first_invalid_intent_field(intent)
|
||||||
|
if bad_field is not None:
|
||||||
|
name, value = bad_field
|
||||||
|
return KernelOutcome(
|
||||||
|
accepted=False,
|
||||||
|
slot_id=int(intent.slot_id),
|
||||||
|
trade_id=intent.trade_id,
|
||||||
|
state=self._get_slot(int(intent.slot_id)).fsm_state,
|
||||||
|
diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
|
||||||
|
severity=KernelSeverity.WARNING,
|
||||||
|
details={
|
||||||
|
"reason": "INVALID_INTENT",
|
||||||
|
"field": name,
|
||||||
|
"value": str(value),
|
||||||
|
"intent_id": intent.intent_id,
|
||||||
|
"action": intent.action.value,
|
||||||
|
"asset": intent.asset,
|
||||||
|
},
|
||||||
|
)
|
||||||
payload = _intent_to_payload(intent)
|
payload = _intent_to_payload(intent)
|
||||||
result = _get_rust().process_intent(
|
result = _get_rust().process_intent(
|
||||||
self._backend,
|
self._backend,
|
||||||
@@ -648,7 +695,10 @@ class ExecutionKernel:
|
|||||||
verbosity=_enum_text(self.control.verbosity),
|
verbosity=_enum_text(self.control.verbosity),
|
||||||
)
|
)
|
||||||
outcome = _outcome_from_payload(result["outcome"])
|
outcome = _outcome_from_payload(result["outcome"])
|
||||||
slot = _slot_from_payload(result["slot"])
|
# An INVALID_* fallback result carries a null slot; fall back to the
|
||||||
|
# kernel's current slot so settlement/bookkeeping stays consistent.
|
||||||
|
slot_payload = result.get("slot")
|
||||||
|
slot = _slot_from_payload(slot_payload) if slot_payload else self._get_slot(int(outcome.slot_id))
|
||||||
self.state.refresh()
|
self.state.refresh()
|
||||||
incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0)
|
incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0)
|
||||||
if abs(incremental_pnl) > 1e-12:
|
if abs(incremental_pnl) > 1e-12:
|
||||||
|
|||||||
@@ -427,6 +427,27 @@ class PinkDirectRuntime:
|
|||||||
kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0)
|
kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0)
|
||||||
outcome = self.kernel.process_intent(kernel_intent)
|
outcome = self.kernel.process_intent(kernel_intent)
|
||||||
|
|
||||||
|
# Locate the source of any non-finite intent the kernel rejected:
|
||||||
|
# log the full upstream provenance (snapshot price, account capital,
|
||||||
|
# leverage, sizing) so a numerical error can be traced to its origin
|
||||||
|
# rather than silently rejected.
|
||||||
|
if outcome.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT:
|
||||||
|
self.logger.error(
|
||||||
|
"INVALID_INTENT rejected by kernel: %s | provenance: "
|
||||||
|
"snapshot.price=%r capital=%r open_positions=%r leverage=%r "
|
||||||
|
"target_size=%r reference_price=%r limit_price=%r action=%s asset=%s",
|
||||||
|
dict(outcome.details or {}),
|
||||||
|
getattr(snapshot, "price", None),
|
||||||
|
context.capital,
|
||||||
|
context.open_positions,
|
||||||
|
getattr(kernel_intent, "leverage", None),
|
||||||
|
getattr(kernel_intent, "target_size", None),
|
||||||
|
getattr(kernel_intent, "reference_price", None),
|
||||||
|
getattr(kernel_intent, "limit_price", None),
|
||||||
|
decision.action.value,
|
||||||
|
intent.asset,
|
||||||
|
)
|
||||||
|
|
||||||
# Read authoritative final state from kernel.
|
# Read authoritative final state from kernel.
|
||||||
final_slot = self.kernel.slot(0)
|
final_slot = self.kernel.slot(0)
|
||||||
slot_dict = final_slot.to_dict()
|
slot_dict = final_slot.to_dict()
|
||||||
|
|||||||
73
prod/tests/test_pink_invalid_intent_guard.py
Normal file
73
prod/tests/test_pink_invalid_intent_guard.py
Normal file
@@ -0,0 +1,73 @@
|
|||||||
|
"""Kernel-level finiteness guard: non-finite (inf/NaN) intents must be rejected
|
||||||
|
with INVALID_INTENT, never crash the kernel ("Rust kernel returned null string").
|
||||||
|
|
||||||
|
Two layers (defense in depth):
|
||||||
|
- Python bridge (ExecutionKernel.process_intent): rejects non-finite/insane fields
|
||||||
|
before the FFI call, naming the offending field for source-location.
|
||||||
|
- Rust kernel (FFI): a payload that fails to parse (incl. the Infinity/NaN tokens
|
||||||
|
serde rejects) or a result that fails to serialize returns a clean INVALID_INTENT
|
||||||
|
outcome instead of a null string.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from prod.clean_arch.dita_v2 import (
|
||||||
|
ExecutionKernel, InMemoryControlPlane, KernelCommandType, KernelControlSnapshot,
|
||||||
|
KernelMode, KernelVerbosity, MemoryKernelJournal, MockVenueAdapter, MockVenueScenario,
|
||||||
|
TradeSide,
|
||||||
|
)
|
||||||
|
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelIntent
|
||||||
|
from prod.clean_arch.dita_v2.rust_backend import _get_rust, _intent_to_payload
|
||||||
|
|
||||||
|
|
||||||
|
def _kernel():
|
||||||
|
return ExecutionKernel(
|
||||||
|
control_plane=InMemoryControlPlane(
|
||||||
|
KernelControlSnapshot(mode=KernelMode.DEBUG, verbosity=KernelVerbosity.TRACE)
|
||||||
|
),
|
||||||
|
venue=MockVenueAdapter(MockVenueScenario(emit_fill_on_submit=True, partial_fill_ratio=1.0)),
|
||||||
|
journal=MemoryKernelJournal(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _intent(size, price, lev=3.0):
|
||||||
|
return KernelIntent(
|
||||||
|
timestamp=datetime.now(timezone.utc), intent_id="i", trade_id="T", slot_id=0,
|
||||||
|
asset="BTCUSDT", side=TradeSide.SHORT, action=KernelCommandType.ENTER,
|
||||||
|
reference_price=price, target_size=size, leverage=lev, exit_leg_ratios=(1.0,), reason="X",
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("size,price,lev,field", [
|
||||||
|
(float("inf"), 100.0, 3.0, "target_size"),
|
||||||
|
(float("nan"), 100.0, 3.0, "target_size"),
|
||||||
|
(0.1, float("inf"), 3.0, "reference_price"),
|
||||||
|
(0.1, 100.0, float("nan"), "leverage"),
|
||||||
|
(-0.1, 100.0, 3.0, "target_size"),
|
||||||
|
])
|
||||||
|
def test_bridge_rejects_nonfinite_intent(size, price, lev, field):
|
||||||
|
out = _kernel().process_intent(_intent(size, price, lev))
|
||||||
|
assert out.accepted is False
|
||||||
|
assert out.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT
|
||||||
|
assert out.details.get("field") == field
|
||||||
|
|
||||||
|
|
||||||
|
def test_finite_intent_still_accepted():
|
||||||
|
out = _kernel().process_intent(_intent(0.15, 100000.0))
|
||||||
|
assert out.accepted is True
|
||||||
|
assert out.diagnostic_code == KernelDiagnosticCode.OK
|
||||||
|
|
||||||
|
|
||||||
|
def test_rust_kernel_rejects_nonfinite_payload_without_null_crash():
|
||||||
|
# Bypass the Python bridge guard: hand a non-finite payload straight to the
|
||||||
|
# Rust FFI (json.dumps emits the Infinity token serde rejects). The kernel
|
||||||
|
# must return a clean INVALID_INTENT outcome, not a null string.
|
||||||
|
k = _kernel()
|
||||||
|
payload = _intent_to_payload(_intent(float("inf"), 100.0))
|
||||||
|
res = _get_rust().process_intent(k._backend, payload, mode="NORMAL", verbosity="QUIET")
|
||||||
|
assert res["outcome"]["diagnostic_code"] == "INVALID_INTENT"
|
||||||
|
assert res["outcome"]["accepted"] is False
|
||||||
Reference in New Issue
Block a user