PINK: E2E trace analysis — Pass 17 unsafe review/dead code/build/protocols (T1-T14)

Seventeenth pass. Findings: catch_unwind + AssertUnwindSafe leaves partially
mutated state with no rollback (T1, High); HazelcastRowWriter uses a bare
json.dumps, which loses Enum/datetime formatting (T3, High); real_zinc_plane
_slot_from_payload uses direct key access and can raise KeyError (T4, High);
_build_pink_bodies uses str.index("]"), which corrupts the SCENARIOS list
(T5, High); the VenueAdapter protocol is missing connect/disconnect, causing
AttributeError (T6, High); shared-memory writes are non-atomic, leaving a
visible-zero window (T7, High); _slot_from_payload is duplicated in two files,
risking schema drift (T9, Medium); _backup_20260530 is a valid package, so old
code can be imported by accident (T14, Medium).
319 total flaws across 17 passes.

Co-authored-by: CommandCodeBot <noreply@commandcode.ai>
This commit is contained in:
Codex
2026-06-02 14:10:49 +02:00
parent b0aa91229f
commit 66b403ff7d
10 changed files with 3473 additions and 49 deletions

View File

@@ -0,0 +1,385 @@
"""Kernel nuclear-reliability test suite.
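Covers:
- G1: catch_unwind — FFI boundary survives Rust panics (process doesn't abort)
- G2: IndexSet dedup — 1024-entry account-event dedup, idempotent replay
- G3: snapshot/restore — full state round-trip, version check, slot-count check
- G4: capital_frozen — reconcile ERROR blocks ENTERs; OK unfreezes
- I14: startup reconcile from Zinc — non-idle slots are re-anchored after a restart
- O10: ExecutionKernel.close() — idempotent close and context-manager support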
"""
from __future__ import annotations
import json
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from datetime import datetime, timezone
from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel
from prod.clean_arch.dita_v2.contracts import KernelIntent, KernelCommandType, TradeSide
def _kernel(seed: float = 10_000.0) -> ExecutionKernel:
    """Build a four-slot ``ExecutionKernel`` and seed its capital with *seed*."""
    kernel = ExecutionKernel(max_slots=4)
    kernel.set_seed_capital(seed)
    return kernel
def _acct(k: ExecutionKernel) -> dict:
    """Return the ``"account"`` entry of the kernel's current snapshot."""
    snapshot = k.snapshot()
    return snapshot["account"]
# ---------------------------------------------------------------------------
# G2: Account-event dedup — idempotent replay
# ---------------------------------------------------------------------------
class TestAccountEventDedup:
    """G2: account events that repeat an ``event_id`` must be reported as duplicates."""

    @staticmethod
    def _settled(event_id: str, fee: float) -> dict:
        """Return a fresh taker FILL_SETTLED event with zero realized PnL.

        A new dict is built on every call so each ``on_account_event``
        invocation receives its own payload object.
        """
        return {
            "kind": "FILL_SETTLED",
            "event_id": event_id,
            "realized_pnl": 0.0,
            "fee": fee,
            "is_maker": False,
        }

    def test_same_event_id_not_double_applied(self):
        kernel = _kernel(10_000.0)
        kernel.on_account_event(self._settled("fill-001", 5.0))
        # Second submission reuses the same event_id.
        replay = kernel.on_account_event(self._settled("fill-001", 5.0))
        taker_fees = replay.get("k_taker_fees")
        # The fee must be counted once (5.0), not twice (10.0).
        assert taker_fees == pytest.approx(5.0), \
            f"duplicate event applied twice: k_taker_fees={taker_fees}"
        assert replay.get("duplicate_event") is True

    def test_unique_event_ids_all_applied(self):
        kernel = _kernel(10_000.0)
        count = 20
        for idx in range(count):
            kernel.on_account_event(self._settled(f"fill-{idx:04d}", 1.0))
        assert _acct(kernel)["k_fees_paid"] == pytest.approx(float(count))

    def test_dedup_beyond_64_events_no_false_duplicate(self):
        """Prove the new IndexSet (1024 cap) does not evict at 65 like the old Vec did."""
        kernel = _kernel(10_000.0)
        # 100 distinct events: comfortably past the old 64-entry rolling window.
        for idx in range(100):
            kernel.on_account_event(self._settled(f"x-{idx:04d}", 1.0))
        # Re-submitting the very first event must still be flagged.
        replay = kernel.on_account_event(self._settled("x-0000", 1.0))
        assert replay.get("duplicate_event") is True, \
            "event x-0000 should be deduped after 100 events; IndexSet eviction too aggressive"
        assert _acct(kernel)["k_fees_paid"] == pytest.approx(100.0)

    def test_predicted_fill_not_deduplicated(self):
        """PREDICTED_FILL is always applied (stateless preview)."""
        kernel = _kernel(10_000.0)
        predicted = {
            "kind": "PREDICTED_FILL",
            "event_id": "pred-001",
            "fill_price": 100.0,
            "fill_qty": 1.0,
            "realized_pnl": 0.0,
            "is_maker": False,
        }
        # The same payload object is deliberately submitted twice.
        kernel.on_account_event(predicted)
        second = kernel.on_account_event(predicted)
        # Only the duplicate flag is checked here; fee totals are not asserted.
        assert second.get("duplicate_event") is not True

    def test_empty_event_id_never_deduplicated(self):
        """Empty event_id bypasses dedup (some events have no id)."""
        kernel = _kernel(10_000.0)
        kernel.on_account_event(self._settled("", 3.0))
        second = kernel.on_account_event(self._settled("", 3.0))
        # Both events count: 3.0 + 3.0 in fees and no duplicate flag.
        assert second.get("duplicate_event") is not True
        assert _acct(kernel)["k_fees_paid"] == pytest.approx(6.0)
# ---------------------------------------------------------------------------
# G3: Snapshot / restore — full state round-trip
# ---------------------------------------------------------------------------
class TestSnapshotRestore:
    """G3: ``save_state`` / ``restore_state`` round-trip and rejection cases."""

    def test_save_state_returns_valid_json(self):
        """The saved state parses as JSON and has the expected top-level keys."""
        k = _kernel(10_000.0)
        obj = json.loads(k.save_state())
        assert obj["version"] == 1
        assert "slots" in obj
        assert "account" in obj
        assert "snapshot_ts_ms" in obj

    def test_save_includes_fee_calibration(self):
        """A fee calibration sample must show up in the persisted fee_config."""
        k = _kernel(10_000.0)
        k.calibrate_fee(fill_price=100.0, fill_qty=1.0, actual_fee=0.055)  # ratio ≠ 1.0
        obj = json.loads(k.save_state())
        # fee_config should be persisted with updated calibration_ratio
        fc = obj["account"]["fee_config"]
        assert fc["calibration_ratio"] != pytest.approx(1.0)
        assert fc["calibration_samples"] == 1

    def test_restore_round_trip_capital_and_fees(self):
        """Capital, fees and realized PnL must match after save → restore."""
        k1 = _kernel(10_000.0)
        k1.on_account_event({"kind": "FILL_SETTLED", "event_id": "f1",
                             "realized_pnl": 150.0, "fee": 3.5, "is_maker": False})
        k1.on_account_event({"kind": "FILL_SETTLED", "event_id": "f2",
                             "realized_pnl": -30.0, "fee": 1.5, "is_maker": True})
        state = k1.save_state()
        k2 = _kernel(10_000.0)  # fresh kernel, same max_slots
        assert k2.restore_state(state) is True
        a1 = _acct(k1)
        a2 = _acct(k2)
        assert a2["k_capital"] == pytest.approx(a1["k_capital"])
        assert a2["k_fees_paid"] == pytest.approx(a1["k_fees_paid"])
        assert a2["k_realized_pnl"] == pytest.approx(a1["k_realized_pnl"])

    def test_restore_dedup_set_preserved(self):
        """After restore, previously-seen event_ids must still be deduped."""
        k1 = _kernel(10_000.0)
        k1.on_account_event({"kind": "FILL_SETTLED", "event_id": "fill-99",
                             "realized_pnl": 0.0, "fee": 5.0, "is_maker": False})
        state = k1.save_state()
        k2 = _kernel(10_000.0)
        # Fail here, with a clear message, if the restore itself did not work;
        # otherwise a failed restore would surface as a bogus dedup failure.
        assert k2.restore_state(state) is True, \
            "restore_state must succeed before dedup can be checked"
        r = k2.on_account_event({"kind": "FILL_SETTLED", "event_id": "fill-99",
                                 "realized_pnl": 0.0, "fee": 5.0, "is_maker": False})
        assert r.get("duplicate_event") is True, \
            "event fill-99 should be deduped in restored kernel"

    def test_restore_version_mismatch_rejected(self):
        """A state whose ``version`` field was altered must be refused."""
        k = _kernel(10_000.0)
        state_obj = json.loads(k.save_state())
        state_obj["version"] = 999  # wrong version
        result = k.restore_state(json.dumps(state_obj))
        assert result is False

    def test_restore_slot_count_mismatch_rejected(self):
        """State saved with max_slots=4 must be refused by a max_slots=2 kernel."""
        k_4 = ExecutionKernel(max_slots=4)
        k_4.set_seed_capital(10_000.0)
        state = k_4.save_state()
        k_2 = ExecutionKernel(max_slots=2)  # different max_slots
        k_2.set_seed_capital(10_000.0)
        result = k_2.restore_state(state)
        assert result is False

    def test_restore_corrupt_json_rejected(self):
        """Malformed JSON must be refused (False) rather than applied."""
        k = _kernel(10_000.0)
        result = k.restore_state("{not valid json")
        assert result is False

    def test_save_state_snapshot_ts_positive(self):
        """The snapshot timestamp in the saved state must be positive."""
        k = _kernel(10_000.0)
        obj = json.loads(k.save_state())
        assert obj["snapshot_ts_ms"] > 0
# ---------------------------------------------------------------------------
# G4: capital_frozen — reconcile ERROR blocks ENTERs
# ---------------------------------------------------------------------------
class TestCapitalFrozen:
    """G4: a reconcile ERROR freezes capital and blocks ENTERs; an OK reconcile unfreezes."""

    @staticmethod
    def _account_update(balance: float) -> dict:
        """Return an ACCOUNT_UPDATE event reporting *balance* with no margin in use."""
        return {
            "kind": "ACCOUNT_UPDATE",
            "wallet_balance": balance,
            "available_margin": balance,
            "used_margin": 0.0,
            "maint_margin": 0.0,
        }

    @staticmethod
    def _code(result) -> str:
        """Return ``result.diagnostic_code`` as a plain string.

        The code may be an enum member or a bare string; ``.value`` is used
        when present, otherwise ``str()``.
        """
        raw = result.diagnostic_code
        return raw.value if hasattr(raw, "value") else str(raw)

    def _trigger_error(self, k: ExecutionKernel) -> None:
        """Inject a large balance divergence to force reconcile ERROR."""
        # kernel thinks 10_000 → delta = 9_000 > 20
        k.on_account_event(self._account_update(1_000.0))

    def _restore_ok(self, k: ExecutionKernel) -> None:
        """Sync E-facts to match K so reconcile returns OK."""
        k.on_account_event(self._account_update(10_000.0))

    def _mk_intent(self, trade_id: str, slot_id: int = 0) -> KernelIntent:
        """Build a SHORT TRXUSDT ENTER intent for *trade_id* on *slot_id*."""
        return KernelIntent(
            timestamp=datetime.now(tz=timezone.utc),
            intent_id=f"i-{trade_id}",
            trade_id=trade_id,
            slot_id=slot_id,
            asset="TRXUSDT",
            side=TradeSide.SHORT,
            action=KernelCommandType.ENTER,
            reference_price=0.15,
            target_size=100.0,
            leverage=10.0,
        )

    def test_capital_frozen_false_initially(self):
        k = _kernel(10_000.0)
        assert k.is_capital_frozen() is False

    def test_capital_frozen_true_on_error(self):
        k = _kernel(10_000.0)
        self._trigger_error(k)
        assert k.is_capital_frozen() is True

    def test_capital_frozen_in_on_account_event_response(self):
        """The frozen flag must be echoed in later on_account_event responses."""
        k = _kernel(10_000.0)
        self._trigger_error(k)
        r = k.on_account_event({"kind": "FUNDING_FEE", "funding_amount": 0.0})
        assert r.get("capital_frozen") is True

    def test_enter_blocked_when_capital_frozen(self):
        k = _kernel(10_000.0)
        self._trigger_error(k)
        result = k.process_intent(self._mk_intent("test-frozen"))
        assert result.accepted is False, f"ENTER should be blocked; got {result.diagnostic_code}"
        # Same normalisation as test_enter_allowed_after_unfreeze.
        assert self._code(result) == "CAPITAL_FROZEN", \
            f"expected CAPITAL_FROZEN, got {result.diagnostic_code}"

    def test_capital_unfrozen_on_ok_reconcile(self):
        k = _kernel(10_000.0)
        self._trigger_error(k)
        assert k.is_capital_frozen() is True
        self._restore_ok(k)
        assert k.is_capital_frozen() is False

    def test_enter_allowed_after_unfreeze(self):
        k = _kernel(10_000.0)
        self._trigger_error(k)
        self._restore_ok(k)
        # After reconcile OK, ENTERs should be accepted again (FSM may reject for other
        # reasons but NOT capital_frozen)
        result = k.process_intent(self._mk_intent("test-unfrozen"))
        code = self._code(result)
        assert code != "CAPITAL_FROZEN", f"ENTER should not be frozen after OK reconcile; got {code}"

    def test_frozen_snapshot_round_trip(self):
        """capital_frozen flag must survive snapshot/restore."""
        k1 = _kernel(10_000.0)
        self._trigger_error(k1)
        assert k1.is_capital_frozen() is True
        state = k1.save_state()
        k2 = _kernel(10_000.0)
        # Check the restore result so a rejected state is reported as such
        # instead of as a missing frozen flag.
        assert k2.restore_state(state) is True, \
            "restore_state must succeed before the frozen flag can be checked"
        assert k2.is_capital_frozen() is True
# ---------------------------------------------------------------------------
# I14: Startup reconcile from Zinc — non-idle slots must be re-anchored
# ---------------------------------------------------------------------------
class TestI14StartupZincRestore:
    """I14: A freshly-created ExecutionKernel must load any non-idle slot state
    from the ZincPlane so that a restart after a crash doesn't silently treat
    live positions as IDLE and allow duplicate ENTERs."""

    def _mk_intent(self, trade_id: str) -> KernelIntent:
        """Build a SHORT BTCUSDT ENTER intent on slot 0 for *trade_id*."""
        return KernelIntent(
            timestamp=datetime.now(tz=timezone.utc),
            intent_id=trade_id,
            trade_id=trade_id,
            slot_id=0,
            asset="BTCUSDT",
            side=TradeSide.SHORT,
            action=KernelCommandType.ENTER,
            reference_price=100.0,
            target_size=1.0,
            leverage=1.0,
            exit_leg_ratios=(1.0,),
            reason="i14-test",
        )

    def test_fresh_kernel_loads_zinc_non_idle_slots(self):
        """Kernel 2 (simulated restart) must see the slot left by Kernel 1."""
        from prod.clean_arch.dita_v2.zinc_plane import InMemoryZincPlane

        shared_zinc = InMemoryZincPlane()

        # Kernel 1: ENTER a trade so slot 0 is no longer free.
        k1 = ExecutionKernel(max_slots=4, zinc_plane=shared_zinc)
        k1.set_seed_capital(10_000.0)
        result = k1.process_intent(self._mk_intent("i14-trade"))
        assert result.accepted, f"ENTER failed: {result.diagnostic_code}"
        slot_k1 = k1._get_slot(0)
        assert not slot_k1.is_free(), (
            f"Setup: slot should be non-idle after ENTER, got {slot_k1.fsm_state}"
        )

        # Kernel 2: fresh instance with the SAME shared_zinc (simulates restart)
        k2 = ExecutionKernel(max_slots=4, zinc_plane=shared_zinc)
        k2.set_seed_capital(10_000.0)
        slot_k2 = k2._get_slot(0)
        assert not slot_k2.is_free(), (
            f"I14: restarted kernel must not see IDLE for a live slot; "
            f"got fsm_state={slot_k2.fsm_state}"
        )
        assert slot_k2.trade_id == "i14-trade", (
            f"I14: trade_id must survive restart, got {slot_k2.trade_id!r}"
        )

    def test_all_idle_zinc_does_not_corrupt(self):
        """If Zinc only has IDLE slots, startup reconcile is a no-op."""
        from prod.clean_arch.dita_v2.zinc_plane import InMemoryZincPlane

        shared_zinc = InMemoryZincPlane()
        # No trades — Zinc is empty
        k = ExecutionKernel(max_slots=4, zinc_plane=shared_zinc)
        k.set_seed_capital(10_000.0)
        for sid in range(4):
            assert k._get_slot(sid).is_free(), (
                f"Slot {sid} must be IDLE when Zinc has no live state"
            )
# ---------------------------------------------------------------------------
# O10: ExecutionKernel.close() + context manager
# ---------------------------------------------------------------------------
class TestO10KernelClose:
    """O10: ``close()`` clears the backend handle, tolerates repeated calls,
    and is invoked when the kernel is used as a context manager."""

    def test_close_nulls_backend(self):
        kernel = _kernel()
        assert kernel._backend is not None
        kernel.close()
        assert kernel._backend is None, "close() must null _backend to prevent double-free"

    def test_close_idempotent(self):
        kernel = _kernel()
        # Two consecutive closes; the second one must not raise.
        for _ in range(2):
            kernel.close()

    def test_context_manager_calls_close(self):
        with ExecutionKernel(max_slots=2) as kernel:
            kernel.set_seed_capital(5_000.0)
            assert kernel._backend is not None
        # Leaving the ``with`` block must have cleared the handle.
        assert kernel._backend is None, "__exit__ must have called close()"