PINK DITAv2: kernel-level finiteness guard (no more null-string crash on inf/NaN)
The aborted hard cutover crash-looped with "Rust kernel returned null string" from process_intent on the first live trading step. Root cause (reproduced): a non-finite (inf/NaN) numeric field reaching the kernel — Python json.dumps emits the Infinity/NaN token, serde_json rejects it at parse, and the FFI returned null. Magnitude is fine; only finiteness was the problem. Defense in depth, kernel catches it: - Rust FFI (lib.rs): dita_kernel_process_intent_json / _on_venue_event_json now return a clean INVALID_INTENT KernelResult on parse failure (incl. Infinity/NaN tokens) AND on serialize failure (a non-finite produced internally) — never a null string. - Python bridge (rust_backend.py): ExecutionKernel.process_intent validates intent finiteness/bounds (target_size, reference_price, limit_price, leverage, exit_leg_ratios; size>=0) BEFORE the FFI and rejects INVALID_INTENT, naming the offending field+value. - contracts.py: add KernelDiagnosticCode.INVALID_INTENT. - pink_direct.py: on INVALID_INTENT, log full upstream provenance (snapshot.price, capital, leverage, sizes) so the numerical SOURCE can be located on the next live run. - on_venue_event bridge tolerates the fallback's null slot (uses the live slot). Verified: kernel recompiled; offline 65 + 7 new guard tests green (no regression); direct-FFI inf payload -> INVALID_INTENT (no null crash). NOTE: this turns the cutover crash into a clean rejection — the upstream source of the non-finite (the live run's inf) still needs locating, now aided by the provenance log. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -1477,6 +1477,31 @@ fn into_c_string(value: &str) -> *mut c_char {
|
||||
CString::new(value).unwrap().into_raw()
|
||||
}
|
||||
|
||||
/// Build a well-formed INVALID_INTENT KernelResult JSON string. Used when the
|
||||
/// kernel cannot safely process a request — a payload that fails to parse
|
||||
/// (e.g. non-finite Infinity/NaN tokens serde rejects) or a result that fails
|
||||
/// to serialize (a non-finite value produced internally). This keeps the kernel
|
||||
/// from ever returning a null string on non-finite input/output: it rejects
|
||||
/// cleanly with a diagnostic instead of panicking.
|
||||
fn invalid_intent_cstring(reason: &str, detail: &str) -> *mut c_char {
|
||||
let fallback = serde_json::json!({
|
||||
"outcome": {
|
||||
"accepted": false,
|
||||
"slot_id": 0,
|
||||
"trade_id": "",
|
||||
"state": "IDLE",
|
||||
"diagnostic_code": "INVALID_INTENT",
|
||||
"severity": "WARNING",
|
||||
"transitions": [],
|
||||
"emitted_events": [],
|
||||
"details": {"reason": reason, "detail": detail}
|
||||
},
|
||||
"slot": serde_json::Value::Null,
|
||||
"snapshot": serde_json::Value::Null
|
||||
});
|
||||
into_c_string(&fallback.to_string())
|
||||
}
|
||||
|
||||
fn with_handle_mut<F, R>(handle: *mut KernelHandle, f: F) -> Result<R, String>
|
||||
where
|
||||
F: FnOnce(&mut KernelCore) -> Result<R, String>,
|
||||
@@ -1551,11 +1576,21 @@ pub extern "C" fn dita_kernel_process_intent_json(
|
||||
};
|
||||
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
||||
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
||||
// Reject non-parseable payloads (incl. non-finite Infinity/NaN tokens serde
|
||||
// refuses) with a clean INVALID_INTENT, never a null string.
|
||||
let intent: KernelIntent = match serde_json::from_str(&payload) {
|
||||
Ok(value) => value,
|
||||
Err(err) => return invalid_intent_cstring("INVALID_INTENT_PARSE", &err.to_string()),
|
||||
};
|
||||
match with_handle_mut(handle, |core| {
|
||||
let intent: KernelIntent = serde_json::from_str(&payload).map_err(|err| err.to_string())?;
|
||||
Ok(core.process_intent(intent, &control_mode, &control_verbosity))
|
||||
Ok::<_, String>(core.process_intent(intent, &control_mode, &control_verbosity))
|
||||
}) {
|
||||
Ok(result) => serde_json::to_string(&result).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()),
|
||||
// A serialize failure means a non-finite value reached the output; reject
|
||||
// cleanly rather than returning null.
|
||||
Ok(result) => match serde_json::to_string(&result) {
|
||||
Ok(s) => into_c_string(&s),
|
||||
Err(err) => invalid_intent_cstring("INVALID_INTENT_SERIALIZE", &err.to_string()),
|
||||
},
|
||||
Err(_) => ptr::null_mut(),
|
||||
}
|
||||
}
|
||||
@@ -1573,11 +1608,17 @@ pub extern "C" fn dita_kernel_on_venue_event_json(
|
||||
};
|
||||
let control_mode = cstr_to_string(control_mode).unwrap_or_else(|_| "NORMAL".to_string());
|
||||
let control_verbosity = cstr_to_string(control_verbosity).unwrap_or_else(|_| "QUIET".to_string());
|
||||
let event: VenueEvent = match serde_json::from_str(&payload) {
|
||||
Ok(value) => value,
|
||||
Err(err) => return invalid_intent_cstring("INVALID_EVENT_PARSE", &err.to_string()),
|
||||
};
|
||||
match with_handle_mut(handle, |core| {
|
||||
let event: VenueEvent = serde_json::from_str(&payload).map_err(|err| err.to_string())?;
|
||||
Ok(core.on_venue_event(event, &control_mode, &control_verbosity))
|
||||
Ok::<_, String>(core.on_venue_event(event, &control_mode, &control_verbosity))
|
||||
}) {
|
||||
Ok(result) => serde_json::to_string(&result).ok().map(|s| into_c_string(&s)).unwrap_or(ptr::null_mut()),
|
||||
Ok(result) => match serde_json::to_string(&result) {
|
||||
Ok(s) => into_c_string(&s),
|
||||
Err(err) => invalid_intent_cstring("INVALID_EVENT_SERIALIZE", &err.to_string()),
|
||||
},
|
||||
Err(_) => ptr::null_mut(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ class KernelDiagnosticCode(str, Enum):
|
||||
OK = "OK"
|
||||
RATE_LIMITED = "RATE_LIMITED"
|
||||
INVALID_SLOT_ID = "INVALID_SLOT_ID"
|
||||
INVALID_INTENT = "INVALID_INTENT"
|
||||
UNSUPPORTED_INTENT = "UNSUPPORTED_INTENT"
|
||||
SLOT_BUSY = "SLOT_BUSY"
|
||||
NO_OPEN_POSITION = "NO_OPEN_POSITION"
|
||||
|
||||
@@ -14,6 +14,7 @@ from pathlib import Path
|
||||
from typing import Any, Dict, Iterable, List, Optional, Sequence
|
||||
import ctypes
|
||||
import json
|
||||
import math
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
@@ -281,6 +282,29 @@ def _slot_from_payload(payload: Dict[str, Any]) -> TradeSlot:
|
||||
)
|
||||
|
||||
|
||||
def _first_invalid_intent_field(intent: KernelIntent) -> Optional[tuple[str, float]]:
|
||||
"""Return (field, value) for the first non-finite or out-of-bounds numeric
|
||||
field on an intent, or None if all are sane. Guards the kernel boundary
|
||||
against inf/NaN that would otherwise crash serde_json serialization."""
|
||||
scalar_checks = (
|
||||
("target_size", float(intent.target_size if intent.target_size is not None else 0.0)),
|
||||
("reference_price", float(intent.reference_price if intent.reference_price is not None else 0.0)),
|
||||
("leverage", float(intent.leverage if intent.leverage is not None else 0.0)),
|
||||
("limit_price", float(getattr(intent, "limit_price", 0.0) or 0.0)),
|
||||
)
|
||||
for name, value in scalar_checks:
|
||||
if not math.isfinite(value):
|
||||
return (name, value)
|
||||
for idx, ratio in enumerate(intent.exit_leg_ratios or ()): # type: ignore[union-attr]
|
||||
rv = float(ratio if ratio is not None else 0.0)
|
||||
if not math.isfinite(rv):
|
||||
return (f"exit_leg_ratios[{idx}]", rv)
|
||||
size = float(intent.target_size if intent.target_size is not None else 0.0)
|
||||
if size < 0.0:
|
||||
return ("target_size", size)
|
||||
return None
|
||||
|
||||
|
||||
def _intent_to_payload(intent: KernelIntent) -> Dict[str, Any]:
|
||||
return {
|
||||
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
|
||||
@@ -561,6 +585,29 @@ class ExecutionKernel:
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_SLOT_ID,
|
||||
details={"reason": "INVALID_SLOT_ID", "slot_id": int(intent.slot_id), "intent_id": intent.intent_id},
|
||||
)
|
||||
# Finiteness / sanity guard at the kernel boundary. A non-finite (inf/NaN)
|
||||
# numeric field would make the Rust core's serde_json serialization return
|
||||
# a null string (panic). Reject cleanly with INVALID_INTENT instead, naming
|
||||
# the offending field + value so the upstream numerical source can be located.
|
||||
bad_field = _first_invalid_intent_field(intent)
|
||||
if bad_field is not None:
|
||||
name, value = bad_field
|
||||
return KernelOutcome(
|
||||
accepted=False,
|
||||
slot_id=int(intent.slot_id),
|
||||
trade_id=intent.trade_id,
|
||||
state=self._get_slot(int(intent.slot_id)).fsm_state,
|
||||
diagnostic_code=KernelDiagnosticCode.INVALID_INTENT,
|
||||
severity=KernelSeverity.WARNING,
|
||||
details={
|
||||
"reason": "INVALID_INTENT",
|
||||
"field": name,
|
||||
"value": str(value),
|
||||
"intent_id": intent.intent_id,
|
||||
"action": intent.action.value,
|
||||
"asset": intent.asset,
|
||||
},
|
||||
)
|
||||
payload = _intent_to_payload(intent)
|
||||
result = _get_rust().process_intent(
|
||||
self._backend,
|
||||
@@ -648,7 +695,10 @@ class ExecutionKernel:
|
||||
verbosity=_enum_text(self.control.verbosity),
|
||||
)
|
||||
outcome = _outcome_from_payload(result["outcome"])
|
||||
slot = _slot_from_payload(result["slot"])
|
||||
# An INVALID_* fallback result carries a null slot; fall back to the
|
||||
# kernel's current slot so settlement/bookkeeping stays consistent.
|
||||
slot_payload = result.get("slot")
|
||||
slot = _slot_from_payload(slot_payload) if slot_payload else self._get_slot(int(outcome.slot_id))
|
||||
self.state.refresh()
|
||||
incremental_pnl = slot.realized_pnl - self._last_settled_pnl.get(slot.slot_id, 0.0)
|
||||
if abs(incremental_pnl) > 1e-12:
|
||||
|
||||
@@ -427,6 +427,27 @@ class PinkDirectRuntime:
|
||||
kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0)
|
||||
outcome = self.kernel.process_intent(kernel_intent)
|
||||
|
||||
# Locate the source of any non-finite intent the kernel rejected:
|
||||
# log the full upstream provenance (snapshot price, account capital,
|
||||
# leverage, sizing) so a numerical error can be traced to its origin
|
||||
# rather than silently rejected.
|
||||
if outcome.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT:
|
||||
self.logger.error(
|
||||
"INVALID_INTENT rejected by kernel: %s | provenance: "
|
||||
"snapshot.price=%r capital=%r open_positions=%r leverage=%r "
|
||||
"target_size=%r reference_price=%r limit_price=%r action=%s asset=%s",
|
||||
dict(outcome.details or {}),
|
||||
getattr(snapshot, "price", None),
|
||||
context.capital,
|
||||
context.open_positions,
|
||||
getattr(kernel_intent, "leverage", None),
|
||||
getattr(kernel_intent, "target_size", None),
|
||||
getattr(kernel_intent, "reference_price", None),
|
||||
getattr(kernel_intent, "limit_price", None),
|
||||
decision.action.value,
|
||||
intent.asset,
|
||||
)
|
||||
|
||||
# Read authoritative final state from kernel.
|
||||
final_slot = self.kernel.slot(0)
|
||||
slot_dict = final_slot.to_dict()
|
||||
|
||||
73
prod/tests/test_pink_invalid_intent_guard.py
Normal file
73
prod/tests/test_pink_invalid_intent_guard.py
Normal file
@@ -0,0 +1,73 @@
|
||||
"""Kernel-level finiteness guard: non-finite (inf/NaN) intents must be rejected
|
||||
with INVALID_INTENT, never crash the kernel ("Rust kernel returned null string").
|
||||
|
||||
Two layers (defense in depth):
|
||||
- Python bridge (ExecutionKernel.process_intent): rejects non-finite/insane fields
|
||||
before the FFI call, naming the offending field for source-location.
|
||||
- Rust kernel (FFI): a payload that fails to parse (incl. the Infinity/NaN tokens
|
||||
serde rejects) or a result that fails to serialize returns a clean INVALID_INTENT
|
||||
outcome instead of a null string.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import pytest
|
||||
|
||||
from prod.clean_arch.dita_v2 import (
|
||||
ExecutionKernel, InMemoryControlPlane, KernelCommandType, KernelControlSnapshot,
|
||||
KernelMode, KernelVerbosity, MemoryKernelJournal, MockVenueAdapter, MockVenueScenario,
|
||||
TradeSide,
|
||||
)
|
||||
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelIntent
|
||||
from prod.clean_arch.dita_v2.rust_backend import _get_rust, _intent_to_payload
|
||||
|
||||
|
||||
def _kernel():
|
||||
return ExecutionKernel(
|
||||
control_plane=InMemoryControlPlane(
|
||||
KernelControlSnapshot(mode=KernelMode.DEBUG, verbosity=KernelVerbosity.TRACE)
|
||||
),
|
||||
venue=MockVenueAdapter(MockVenueScenario(emit_fill_on_submit=True, partial_fill_ratio=1.0)),
|
||||
journal=MemoryKernelJournal(),
|
||||
)
|
||||
|
||||
|
||||
def _intent(size, price, lev=3.0):
|
||||
return KernelIntent(
|
||||
timestamp=datetime.now(timezone.utc), intent_id="i", trade_id="T", slot_id=0,
|
||||
asset="BTCUSDT", side=TradeSide.SHORT, action=KernelCommandType.ENTER,
|
||||
reference_price=price, target_size=size, leverage=lev, exit_leg_ratios=(1.0,), reason="X",
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("size,price,lev,field", [
|
||||
(float("inf"), 100.0, 3.0, "target_size"),
|
||||
(float("nan"), 100.0, 3.0, "target_size"),
|
||||
(0.1, float("inf"), 3.0, "reference_price"),
|
||||
(0.1, 100.0, float("nan"), "leverage"),
|
||||
(-0.1, 100.0, 3.0, "target_size"),
|
||||
])
|
||||
def test_bridge_rejects_nonfinite_intent(size, price, lev, field):
|
||||
out = _kernel().process_intent(_intent(size, price, lev))
|
||||
assert out.accepted is False
|
||||
assert out.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT
|
||||
assert out.details.get("field") == field
|
||||
|
||||
|
||||
def test_finite_intent_still_accepted():
|
||||
out = _kernel().process_intent(_intent(0.15, 100000.0))
|
||||
assert out.accepted is True
|
||||
assert out.diagnostic_code == KernelDiagnosticCode.OK
|
||||
|
||||
|
||||
def test_rust_kernel_rejects_nonfinite_payload_without_null_crash():
|
||||
# Bypass the Python bridge guard: hand a non-finite payload straight to the
|
||||
# Rust FFI (json.dumps emits the Infinity token serde rejects). The kernel
|
||||
# must return a clean INVALID_INTENT outcome, not a null string.
|
||||
k = _kernel()
|
||||
payload = _intent_to_payload(_intent(float("inf"), 100.0))
|
||||
res = _get_rust().process_intent(k._backend, payload, mode="NORMAL", verbosity="QUIET")
|
||||
assert res["outcome"]["diagnostic_code"] == "INVALID_INTENT"
|
||||
assert res["outcome"]["accepted"] is False
|
||||
Reference in New Issue
Block a user