Files
siloqy/prod/clean_arch/dita_v2/test_flaws.py

1482 lines
59 KiB
Python
Raw Normal View History

"""Comprehensive test battery for all 13 CRITICAL DITAv2 flaws.

Each test targets one specific flaw: it fails while the flaw is present
(pre-fix) and passes once the flaw is addressed. Tests use the
MockVenueAdapter to avoid requiring live BingX connectivity.

Run with:
    python -m pytest prod/clean_arch/dita_v2/test_flaws.py -v
"""
from __future__ import annotations
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import asyncio
from datetime import datetime, timezone
from typing import Any, Dict, List
import pytest
from prod.clean_arch.dita_v2.contracts import (
KernelCommandType,
KernelDiagnosticCode,
KernelEventKind,
KernelIntent,
KernelOutcome,
KernelSeverity,
KernelTransition,
TradeSide,
TradeSlot,
TradeStage,
VenueEvent,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from prod.clean_arch.dita_v2.mock_venue import MockVenueAdapter, MockVenueScenario
from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel
from prod.clean_arch.dita_v2.account import AccountProjection
# Short module-level aliases: ``E`` is the shorthand the tests below use for
# intent actions (``E.ENTER``, ``E.EXIT``, ``E.CANCEL``); ``TS`` is the
# matching shorthand for ``TradeSide``.
E = KernelCommandType
TS = TradeSide
def _mk_intent(
    action: KernelCommandType = KernelCommandType.ENTER,
    trade_id: str = "t1",
    slot_id: int = 0,
    asset: str = "BTCUSDT",
    side: TradeSide = TradeSide.SHORT,
    price: float = 100.0,
    size: float = 1.0,
    leverage: float = 1.0,
    exit_leg_ratios: tuple = (1.0,),
    **kw,
) -> KernelIntent:
    """Build a ``KernelIntent`` stamped with the current UTC time.

    ``price`` is forwarded as ``reference_price`` and ``size`` as
    ``target_size``. Two optional keywords are consumed from ``**kw``:
    ``intent_id`` (defaults to ``trade_id``) and ``reason`` (defaults to
    ``"auto_"`` followed by the lower-cased action value). Every other extra
    keyword is handed to the intent as its ``metadata`` dict.
    """
    # Pop the reserved keys first so that only genuine extras reach metadata.
    intent_id = kw.pop("intent_id", trade_id)
    reason = kw.pop("reason", f"auto_{action.value.lower()}")
    return KernelIntent(
        timestamp=datetime.now(timezone.utc),
        intent_id=intent_id,
        trade_id=trade_id,
        slot_id=slot_id,
        asset=asset,
        side=side,
        action=action,
        reference_price=price,
        target_size=size,
        leverage=leverage,
        exit_leg_ratios=exit_leg_ratios,
        reason=reason,
        metadata=kw,
    )
def _mk_venue_event(
    kind: KernelEventKind,
    trade_id: str = "t1",
    slot_id: int = 0,
    side: TradeSide = TradeSide.SHORT,
    asset: str = "BTCUSDT",
    price: float = 100.0,
    size: float = 1.0,
    filled_size: float = 1.0,
    remaining_size: float = 0.0,
    event_id: str = "",
    venue_order_id: str = "V-1",
    venue_client_id: str = "t1:t1",
    status: VenueEventStatus = VenueEventStatus.FILLED,
    reason: str = "",
) -> VenueEvent:
    """Build a ``VenueEvent`` stamped with the current UTC time.

    When ``event_id`` is empty, an id of the form ``ev-<kind value>-<trade_id>``
    is generated; all other arguments are passed through unchanged.
    """
    resolved_event_id = event_id if event_id else f"ev-{kind.value}-{trade_id}"
    fields = {
        "timestamp": datetime.now(timezone.utc),
        "event_id": resolved_event_id,
        "trade_id": trade_id,
        "slot_id": slot_id,
        "kind": kind,
        "status": status,
        "venue_order_id": venue_order_id,
        "venue_client_id": venue_client_id,
        "side": side,
        "asset": asset,
        "price": price,
        "size": size,
        "filled_size": filled_size,
        "remaining_size": remaining_size,
        "reason": reason,
    }
    return VenueEvent(**fields)
def _fresh_kernel(
    *,
    scenario: MockVenueScenario = None,
    max_slots: int = 2,
    capital: float = 25000.0,
) -> ExecutionKernel:
    """Create an ``ExecutionKernel`` wired to a ``MockVenueAdapter``.

    A default ``MockVenueScenario()`` is used when ``scenario`` is not given.
    The account snapshot's ``capital``, ``peak_capital`` and ``equity`` are
    all seeded with ``capital`` before the kernel is returned.
    """
    if not scenario:
        scenario = MockVenueScenario()
    kernel = ExecutionKernel(
        max_slots=max_slots,
        venue=MockVenueAdapter(scenario=scenario),
    )
    # Seed every balance field with the same starting amount.
    for balance_field in ("capital", "peak_capital", "equity"):
        setattr(kernel.account.snapshot, balance_field, capital)
    return kernel
# ============================================================
# FLAW 1: Entry-order cancellation is structurally broken
# ============================================================
class TestFlaw1EntryCancel:
    """A CANCEL intent must work for entry orders, not only for exit orders."""

    @staticmethod
    def _ack_only_kernel():
        """Kernel whose mock venue is configured with no fill on submit."""
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        return _fresh_kernel(scenario=no_fill)

    def test_cancel_entry_order_accepted_by_rust(self):
        """An unfilled ENTER followed by CANCEL for the same trade is accepted."""
        kernel = self._ack_only_kernel()
        entered = kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ce1"))
        assert entered.accepted, f"ENTER rejected: {entered.diagnostic_code}"
        working_states = {TradeStage.ORDER_REQUESTED, TradeStage.ENTRY_WORKING}
        assert kernel._get_slot(0).fsm_state in working_states
        cancelled = kernel.process_intent(_mk_intent(action=E.CANCEL, trade_id="ce1"))
        assert cancelled.accepted, (
            f"CANCEL for entry order should be accepted, got "
            f"accepted={cancelled.accepted} "
            f"diag={cancelled.diagnostic_code}"
        )

    def test_cancel_entry_order_calls_venue_cancel(self):
        """CANCEL is accepted while an entry order is attached to the slot."""
        kernel = self._ack_only_kernel()
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ce2"))
        # NOTE(review): this is the only test reading the slot through
        # ``k.slot(0)``; every other test uses ``k._get_slot(0)`` — confirm both
        # accessors exist. The test also only checks acceptance; it does not
        # observe a venue-side cancel call.
        attached_order = kernel.slot(0).active_entry_order
        assert attached_order is not None, "Entry order should be attached"
        cancelled = kernel.process_intent(_mk_intent(action=E.CANCEL, trade_id="ce2"))
        assert cancelled.accepted, f"CANCEL not accepted: {cancelled.diagnostic_code}"

    def test_cancel_entry_no_fill_returns_to_idle(self):
        """Cancelling an entry that never filled must leave the slot free."""
        kernel = self._ack_only_kernel()
        for action in (E.ENTER, E.CANCEL):
            kernel.process_intent(_mk_intent(action=action, trade_id="ce3"))
        slot = kernel._get_slot(0)
        assert slot.is_free(), (
            f"Slot should be free/IDLE after entry cancel, "
            f"got state={slot.fsm_state} closed={slot.closed} "
            f"entry_order={slot.active_entry_order} exit_order={slot.active_exit_order} "
            f"size={slot.size}"
        )

    def test_cancel_entry_with_partial_fill(self):
        """A half-filled entry must leave a non-zero size on the slot."""
        half_fill = MockVenueScenario(partial_fill_ratio=0.5)
        kernel = _fresh_kernel(scenario=half_fill)
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ce4", size=0.002))
        # NOTE(review): despite the test name, no CANCEL intent is issued here;
        # only the partially filled size is checked — confirm this is intended.
        assert kernel._get_slot(0).size > 0, "Should have partial fill"

    def test_cancel_entry_then_reenter(self):
        """A new ENTER must be accepted after the previous entry was cancelled."""
        kernel = self._ack_only_kernel()
        for action in (E.ENTER, E.CANCEL):
            kernel.process_intent(_mk_intent(action=action, trade_id="ce5a"))
        reentry = kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ce5b"))
        assert reentry.accepted, f"Re-entry after cancel should succeed: {reentry.diagnostic_code}"
# ============================================================
# FLAW 2: Rust CANCEL_ACK has no entry-order reset path
# ============================================================
class TestFlaw2CancelAckEntry:
    """A CANCEL_ACK for an entry order must reset the slot to IDLE."""

    @staticmethod
    def _cancel_ack_for(order, trade_id):
        """Build a CANCELED-status CANCEL_ACK event addressed to ``order``."""
        return _mk_venue_event(
            kind=KernelEventKind.CANCEL_ACK,
            trade_id=trade_id,
            venue_order_id=order.venue_order_id,
            venue_client_id=order.venue_client_id,
            status=VenueEventStatus.CANCELED,
        )

    def test_cancel_ack_resets_entry_working_to_idle(self):
        """CANCEL_ACK on a live entry order clears the slot back to IDLE."""
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        kernel = _fresh_kernel(scenario=no_fill)
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ca1"))

        before = kernel._get_slot(0)
        assert before.active_entry_order is not None
        kernel.on_venue_event(self._cancel_ack_for(before.active_entry_order, "ca1"))

        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.IDLE, (
            f"Slot should be IDLE after CANCEL_ACK on entry, got {slot.fsm_state}"
        )
        assert slot.active_entry_order is None, "Entry order should be cleared"
        assert slot.trade_id == "", "Trade ID should be cleared"
        assert slot.size == 0.0, "Size should be zero"

    def test_cancel_ack_exit_still_works(self):
        """The exit-order CANCEL_ACK path must keep working.

        The scenario sets ``exit_partial_fill_ratio=0.5``: the test first
        asserts the entry reached POSITION_OPEN, then that the exit order is
        still attached after the EXIT intent, so the CANCEL_ACK is delivered
        against a live exit order.
        """
        kernel = _fresh_kernel(scenario=MockVenueScenario(exit_partial_fill_ratio=0.5))
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ca2", size=0.002))
        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"Entry should fill fully, got {slot.fsm_state}"
        )

        kernel.process_intent(_mk_intent(action=E.EXIT, trade_id="ca2", size=0.002))
        slot = kernel._get_slot(0)
        assert slot.active_exit_order is not None, (
            "Exit order must remain live after a partial exit fill"
        )

        kernel.on_venue_event(self._cancel_ack_for(slot.active_exit_order, "ca2"))
        slot = kernel._get_slot(0)
        assert slot.active_exit_order is None, "Exit order should be cleared by CANCEL_ACK"
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"Exit cancel must return slot to POSITION_OPEN, got {slot.fsm_state}"
        )
# ============================================================
# FLAW 3: Outcome mixes pre/post-venue state
# ============================================================
class TestFlaw3OutcomeConsistency:
    """The outcome returned by process_intent must agree with the slot."""

    def test_outcome_state_matches_actual_slot(self):
        """``outcome.state`` must equal the slot's state read after the call."""
        kernel = _fresh_kernel()
        result = kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="oc1"))
        slot = kernel._get_slot(0)
        assert result.state == slot.fsm_state, (
            f"Outcome state {result.state} != actual slot state {slot.fsm_state}"
        )

    def test_outcome_transitions_includes_venue_events(self):
        """An accepted ENTER must report at least one transition."""
        kernel = _fresh_kernel()
        result = kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="oc2"))
        # Collected up front so the failure message can list what was seen.
        transition_triggers = [step.trigger for step in result.transitions]
        assert len(result.transitions) >= 1, (
            f"Should have at least 1 transition, got triggers: {transition_triggers}"
        )
# ============================================================
# FLAW 4: Multi-leg exit final leg can double-close
# ============================================================
class TestFlaw4DoubleClose:
    """The final leg of a multi-leg exit must close the slot only once."""

    @staticmethod
    def _enter_then_exit_in_two_legs(trade_id):
        """Enter 0.002, then exit in two 0.001 legs; return the kernel."""
        kernel = _fresh_kernel(scenario=MockVenueScenario())
        steps = (
            (E.ENTER, 0.002, (0.5, 1.0)),
            (E.EXIT, 0.001, (0.5, 1.0)),
            (E.EXIT, 0.001, (1.0,)),
        )
        for action, size, leg_ratios in steps:
            kernel.process_intent(
                _mk_intent(
                    action=action,
                    trade_id=trade_id,
                    size=size,
                    exit_leg_ratios=leg_ratios,
                )
            )
        return kernel

    def test_single_close_after_final_leg(self):
        """After the last leg the slot is flagged closed and in CLOSED state."""
        slot = self._enter_then_exit_in_two_legs("dc1")._get_slot(0)
        assert slot.closed, "Slot should be closed after final leg"
        assert slot.fsm_state == TradeStage.CLOSED

    def test_no_extra_entry_order_clear_on_close(self):
        """After the last leg the order references on the slot stay consistent."""
        slot = self._enter_then_exit_in_two_legs("dc2")._get_slot(0)
        assert slot.active_exit_order is None, "Exit order should be cleared"
        # The entry order may either be gone or still be present as FILLED.
        assert slot.active_entry_order is None or slot.active_entry_order.status == VenueOrderStatus.FILLED
# ============================================================
# FLAW 5: Capital settlement only triggers on terminal states
# ============================================================
class TestFlaw5CapitalSettleOnPartialFill:
    """Realized PnL must be settled into account capital when an exit fills."""

    def test_partial_exit_settles_pnl_incrementally(self):
        """Capital must move by exactly the slot's realized PnL.

        A SHORT is entered at 100 and exited at 90, so the test first requires
        a strictly positive realized PnL and then requires the capital delta
        to match it within 1e-9. Because the PnL is asserted non-zero, the
        capital check cannot pass vacuously.
        """
        kernel = _fresh_kernel()
        cap_before = kernel.account.snapshot.capital

        # Open the SHORT at 100.
        kernel.process_intent(
            _mk_intent(action=E.ENTER, trade_id="ps1", side=TradeSide.SHORT, price=100.0, size=0.002)
        )
        assert kernel._get_slot(0).fsm_state == TradeStage.POSITION_OPEN

        # Close it at 90, i.e. below the entry price.
        kernel.process_intent(
            _mk_intent(action=E.EXIT, trade_id="ps1", side=TradeSide.SHORT, price=90.0, size=0.002)
        )
        slot = kernel._get_slot(0)
        assert slot.realized_pnl > 0.0, (
            f"SHORT exit below entry must realize positive PnL, got {slot.realized_pnl}"
        )

        cap_after = kernel.account.snapshot.capital
        capital_delta = cap_after - cap_before
        assert abs(capital_delta - slot.realized_pnl) < 1e-9, (
            f"Capital delta {capital_delta} != realized_pnl {slot.realized_pnl} "
            f"(before={cap_before} after={cap_after})"
        )
# ============================================================
# FLAW 6: _legacy_intent silently drops order_type and limit_price
# ============================================================
class TestFlaw6LegacyIntentDrop:
    """``_legacy_intent`` must carry over ``order_type`` and ``limit_price``."""

    @staticmethod
    def _legacy_limit_intent(trade_id):
        """Convert a LIMIT @ 50000.0 ENTER intent through ``_legacy_intent``."""
        from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter

        limit_intent = _mk_intent(
            action=E.ENTER,
            trade_id=trade_id,
            order_type="LIMIT",
            limit_price=50000.0,
        )
        return BingxVenueAdapter._legacy_intent(limit_intent)

    def test_legacy_intent_preserves_order_type(self):
        """``order_type`` must survive as an attribute or a metadata entry."""
        legacy = self._legacy_limit_intent("li1")
        # Accepted locations: attribute, "_order_type" or "order_type" metadata key.
        preserved = (
            getattr(legacy, "order_type", None) == "LIMIT"
            or legacy.metadata.get("_order_type") == "LIMIT"
            or legacy.metadata.get("order_type") == "LIMIT"
        )
        assert preserved, (
            f"order_type not preserved in legacy intent. "
            f"Legacy fields: {dir(legacy)}, metadata: {legacy.metadata}"
        )

    def test_legacy_intent_preserves_limit_price(self):
        """``limit_price`` must survive as an attribute or a metadata entry."""
        legacy = self._legacy_limit_intent("li2")
        # Accepted locations: attribute, "_limit_price" or "limit_price" metadata key.
        preserved = (
            getattr(legacy, "limit_price", 0) == 50000.0
            or legacy.metadata.get("_limit_price") == 50000.0
            or legacy.metadata.get("limit_price") == 50000.0
        )
        assert preserved, (
            f"limit_price not preserved in legacy intent. "
            f"Legacy metadata: {legacy.metadata}"
        )
# ============================================================
# FLAW 7: Mock venue partial_fill_ratio applies to both entry and exit
# ============================================================
class TestFlaw7MockVenueRatios:
    """The mock venue must accept separate fill ratios for entry and exit."""

    def test_entry_exit_different_ratios(self):
        """With entry ratio 1.0 and exit ratio 0.5 the entry opens the position."""
        scenario = MockVenueScenario(
            entry_partial_fill_ratio=1.0,
            exit_partial_fill_ratio=0.5,
        )
        kernel = _fresh_kernel(scenario=scenario)
        outcome = kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="mv1", size=0.002))
        assert outcome.accepted
        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, f"Entry should fill fully: {slot.fsm_state}"

    def test_per_action_type_ratios(self):
        """An exit ratio of 0.3 must not reduce the size filled on entry."""
        kernel = _fresh_kernel(
            scenario=MockVenueScenario(
                entry_partial_fill_ratio=1.0,
                exit_partial_fill_ratio=0.3,
            )
        )
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="mv2", size=0.001))
        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN
        assert slot.size == 0.001
# ============================================================
# FLAW 8: Per-asset price precision helper does not exist
# ============================================================
class TestFlaw8PricePrecision:
    """A ``_format_price`` helper must exist for LIMIT order support."""

    def test_format_price_exists_in_bingx_direct(self):
        """``BingxDirectExecutionAdapter`` exposes ``_format_price``.

        The test is skipped when the adapter module cannot be imported.
        """
        try:
            from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter
        except ImportError:
            # pytest.skip raises, so the assertion below is never reached here.
            pytest.skip("bingx_direct not importable in this environment")
        assert hasattr(BingxDirectExecutionAdapter, "_format_price"), (
            "_format_price method missing from BingxDirectExecutionAdapter"
        )
# ============================================================
# FLAW 9: Cancel path falls back to trade_id as symbol
# ============================================================
class TestFlaw9CancelSymbolFallback:
    """Cancel must resolve the symbol from the asset, never from trade_id."""

    @staticmethod
    def _live_entry_order(trade_id, asset):
        """Submit an unfilled ENTER and return the slot's active entry order."""
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        kernel = _fresh_kernel(scenario=no_fill)
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id=trade_id, asset=asset))
        return kernel._get_slot(0).active_entry_order

    def test_cancel_uses_slot_asset_not_trade_id(self):
        """The live entry order's metadata must record the intent's asset."""
        entry_order = self._live_entry_order("cs1", "TRXUSDT")
        # The scenario emits no fill on submit, so the order must still be attached.
        assert entry_order is not None, (
            "ACK-only entry must leave the entry order live for cancel-symbol fallback"
        )
        metadata = entry_order.metadata
        assert metadata.get("asset") == "TRXUSDT", (
            f"Entry order metadata should contain asset. Got: {metadata}"
        )

    def test_mock_venue_cancel_event_has_asset(self):
        """The entry order's metadata must hold an ``asset`` or a ``slot_id``."""
        order = self._live_entry_order("cs2", "XRPUSDT")
        assert order is not None
        assert order.metadata.get("asset") is not None or order.metadata.get("slot_id") is not None
# ============================================================
# FLAW 10: Event dedup window is bounded at 64
# ============================================================
class TestFlaw10EventDedup:
    """The per-slot event dedup window must cope with more than 64 events."""

    @staticmethod
    def _feed_mark_prices(kernel, trade_id, id_prefix, count=70):
        """Deliver ``count`` MARK_PRICE events with ids ``<id_prefix>-NNNN``."""
        for seq in range(count):
            kernel.on_venue_event(
                _mk_venue_event(
                    kind=KernelEventKind.MARK_PRICE,
                    trade_id=trade_id,
                    event_id=f"{id_prefix}-{seq:04d}",
                    price=100.0 + seq * 0.01,
                    size=0.0,
                    filled_size=0.0,
                )
            )

    def test_dedup_window_accepts_many_events(self):
        """After 70 distinct events the slot must remember at least 70 ids."""
        kernel = _fresh_kernel()
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ed1"))
        self._feed_mark_prices(kernel, "ed1", "mp")
        slot = kernel._get_slot(0)
        assert len(slot.seen_event_ids) >= 70, (
            f"Expected >= 70 seen_event_ids, got {len(slot.seen_event_ids)}"
        )

    def test_dedup_eviction_does_not_accept_old_event(self):
        """Replaying the first of 70 event ids must be flagged as a duplicate."""
        kernel = _fresh_kernel()
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="ed2"))
        self._feed_mark_prices(kernel, "ed2", "mp2")
        # Re-send the very first id of the flood, with a different price.
        replay = _mk_venue_event(
            kind=KernelEventKind.MARK_PRICE,
            trade_id="ed2",
            event_id="mp2-0000",
            price=99.0,
            size=0.0,
            filled_size=0.0,
        )
        result = kernel.on_venue_event(replay)
        assert result.diagnostic_code == KernelDiagnosticCode.DUPLICATE_EVENT, (
            f"Old evicted event should still be deduplicated, "
            f"got {result.diagnostic_code}"
        )
# ============================================================
# FLAW 11: Reconcile is a raw state override with no FSM validation
# ============================================================
class TestFlaw11ReconcileValidation:
    """Reconcile should validate slot state consistency."""

    def test_reconcile_rejects_position_open_with_zero_size(self):
        """POSITION_OPEN with zero size must not survive a reconcile.

        The check is made on the resulting slot, not on the reconcile return
        value: after the call the slot must not be POSITION_OPEN with size 0.
        """
        k = _fresh_kernel()
        bad_slot = TradeSlot(
            slot_id=0,
            fsm_state=TradeStage.POSITION_OPEN,
            size=0.0,
            asset="BTCUSDT",
            trade_id="bad1",
        )
        # Return value intentionally ignored; only the slot state is asserted.
        k.reconcile_from_slots([bad_slot])
        slot = k._get_slot(0)
        assert slot.fsm_state != TradeStage.POSITION_OPEN or slot.size > 0, (
            f"Reconcile should reject POSITION_OPEN with size=0, "
            f"got state={slot.fsm_state} size={slot.size}"
        )

    def test_reconcile_rejects_idle_with_nonzero_size(self):
        """IDLE with a non-zero size must not survive a reconcile.

        The check is made on the resulting slot, not on the reconcile return
        value: after the call the slot must not be IDLE with size != 0.
        """
        k = _fresh_kernel()
        bad_slot = TradeSlot(
            slot_id=0,
            fsm_state=TradeStage.IDLE,
            size=5.0,
            asset="BTCUSDT",
            trade_id="bad2",
        )
        # Return value intentionally ignored; only the slot state is asserted.
        k.reconcile_from_slots([bad_slot])
        slot = k._get_slot(0)
        assert slot.size == 0.0 or slot.fsm_state != TradeStage.IDLE, (
            f"Reconcile should reject IDLE with size > 0, "
            f"got state={slot.fsm_state} size={slot.size}"
        )

    def test_reconcile_accepts_valid_slot(self):
        """Reconciling with the kernel's own slot data must be accepted."""
        k = _fresh_kernel()
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="rv1"))
        slot_data = k._get_slot(0)
        result = k.reconcile_from_slots([slot_data])
        assert result.accepted
# ============================================================
# FLAW 12: Outcome transitions are incomplete — pre-venue only
# ============================================================
class TestFlaw12OutcomeTransitions:
    """process_intent outcome transitions should include venue event transitions."""

    def test_transitions_include_post_venue(self):
        """An ENTER outcome must report at least one venue-driven transition.

        The previous check also accepted ``"ENTER_INTENT"``, so it passed when
        only the pre-venue transition was reported — exactly the flaw this
        class is meant to detect. Only the venue triggers are accepted now.
        """
        k = _fresh_kernel()
        result = k.process_intent(_mk_intent(action=E.ENTER, trade_id="ot1"))
        triggers = [t.trigger for t in result.transitions]
        # NOTE(review): trigger names are taken from the original expectation
        # list — confirm they match what the kernel emits.
        venue_triggers = ("ORDER_ACK", "FULL_FILL")
        assert any(name in triggers for name in venue_triggers), (
            f"Transitions should include venue event triggers. Got: {triggers}"
        )

    def test_transitions_count_matches_lifecycle(self):
        """A full entry lifecycle must report two or more transitions."""
        k = _fresh_kernel()
        result = k.process_intent(_mk_intent(action=E.ENTER, trade_id="ot2"))
        slot = k._get_slot(0)
        assert slot.fsm_state in {TradeStage.POSITION_OPEN, TradeStage.ENTRY_WORKING}, (
            f"Default full-fill entry must open the position, got {slot.fsm_state}"
        )
        assert len(result.transitions) >= 2, (
            f"Full entry should produce >= 2 transitions "
            f"(intent + venue ack/fill), got {len(result.transitions)}: "
            f"{[t.trigger for t in result.transitions]}"
        )
# ============================================================
# FLAW 13: Unsettled realized PnL on re-entry
# ============================================================
class TestFlaw13UnsettledPnlOnReentry:
    """Re-entry should not silently discard unsettled realized PnL."""

    def test_reentry_after_full_close_no_pnl_loss(self):
        """Two enter/exit cycles on one slot must leave capital sane.

        After both cycles capital must stay positive and within 50% of the
        starting capital.
        """
        k = _fresh_kernel()
        cap_before = k.account.snapshot.capital

        # First cycle: open, then close at 100.5.
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="rp1"))
        slot = k._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN
        k.process_intent(
            _mk_intent(action=E.EXIT, trade_id="rp1", price=100.5)
        )
        slot = k._get_slot(0)
        assert slot.is_free()

        # Second cycle on the freed slot: open, then close at 101.0.
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="rp2"))
        k.process_intent(
            _mk_intent(action=E.EXIT, trade_id="rp2", price=101.0)
        )
        cap_after_second = k.account.snapshot.capital
        assert cap_after_second > 0, "Capital should remain positive"
        assert abs(cap_after_second - cap_before) < cap_before * 0.5

    def test_pnl_warning_on_unsettled_reentry(self):
        """A slot freed by a full close must accept a new ENTER."""
        k = _fresh_kernel(scenario=MockVenueScenario())
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="rw1"))
        k.process_intent(_mk_intent(action=E.EXIT, trade_id="rw1"))
        slot = k._get_slot(0)
        assert slot.is_free(), "Full close must free the slot for re-entry"
        # NOTE(review): only acceptance is asserted; no warning is observed
        # here despite the test name — confirm whether one should be checked.
        r = k.process_intent(_mk_intent(action=E.ENTER, trade_id="rw2"))
        assert r.accepted, "Re-entry on a freed slot must be accepted"
# ============================================================
# REGRESSION: Existing behaviour must not break
# ============================================================
class TestRegression:
    """Ensure existing happy-path scenarios still work."""

    def test_basic_entry_exit(self):
        """ENTER then EXIT are both accepted and leave the slot free."""
        k = _fresh_kernel()
        r1 = k.process_intent(_mk_intent(action=E.ENTER, trade_id="re1"))
        assert r1.accepted
        r2 = k.process_intent(_mk_intent(action=E.EXIT, trade_id="re1"))
        assert r2.accepted
        slot = k._get_slot(0)
        assert slot.is_free()

    def test_multi_leg_exit(self):
        """Exiting a 0.002 position in two 0.001 legs frees the slot."""
        k = _fresh_kernel()
        k.process_intent(
            _mk_intent(action=E.ENTER, trade_id="re2", size=0.002, exit_leg_ratios=(0.5, 1.0))
        )
        k.process_intent(
            _mk_intent(action=E.EXIT, trade_id="re2", size=0.001, exit_leg_ratios=(0.5, 1.0))
        )
        k.process_intent(
            _mk_intent(action=E.EXIT, trade_id="re2", size=0.001, exit_leg_ratios=(1.0,))
        )
        slot = k._get_slot(0)
        assert slot.is_free()

    def test_slot_busy_rejection(self):
        """A second ENTER on an occupied slot is rejected with SLOT_BUSY."""
        k = _fresh_kernel()
        r1 = k.process_intent(_mk_intent(action=E.ENTER, trade_id="re3a"))
        assert r1.accepted
        r2 = k.process_intent(_mk_intent(action=E.ENTER, trade_id="re3b"))
        assert not r2.accepted
        assert r2.diagnostic_code == KernelDiagnosticCode.SLOT_BUSY

    def test_exit_on_idle_rejected(self):
        """EXIT without a prior ENTER is rejected."""
        k = _fresh_kernel()
        r = k.process_intent(_mk_intent(action=E.EXIT, trade_id="re4"))
        assert not r.accepted

    def test_reconcile_preserves_state(self):
        """Reconciling with the kernel's own slot keeps the trade id."""
        k = _fresh_kernel()
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="re5"))
        slot_data = k._get_slot(0)
        k.reconcile_from_slots([slot_data])
        slot_after = k._get_slot(0)
        assert slot_after.trade_id == "re5"

    def test_dedup_duplicate_event(self):
        """Delivering the same event twice flags the second as a duplicate."""
        k = _fresh_kernel()
        k.process_intent(_mk_intent(action=E.ENTER, trade_id="re6"))
        dup = _mk_venue_event(
            kind=KernelEventKind.FULL_FILL,
            trade_id="re6",
            event_id="dedup-regression",
            price=100.0,
            size=1.0,
            filled_size=1.0,
        )
        k.on_venue_event(dup)
        result = k.on_venue_event(dup)
        assert result.diagnostic_code == KernelDiagnosticCode.DUPLICATE_EVENT

    def test_ten_cycles_no_leak(self):
        """Ten enter/exit cycles leave the slot free and capital positive."""
        k = _fresh_kernel()
        for i in range(10):
            k.process_intent(_mk_intent(action=E.ENTER, trade_id=f"tc{i}"))
            k.process_intent(_mk_intent(action=E.EXIT, trade_id=f"tc{i}"))
        slot = k._get_slot(0)
        assert slot.is_free()
        assert k.account.snapshot.capital > 0
# ============================================================
# I15: CANCEL_REJECT must un-stick EXIT_WORKING slot
# ============================================================
class TestI15CancelRejectUnstick:
    """A CANCEL_REJECT on an exit order must clear ``active_exit_order`` and
    put the slot back in POSITION_OPEN so the exit can be retried."""

    def _enter_to_position_open(self, k: ExecutionKernel, trade_id: str) -> None:
        """Submit an ENTER and require that slot 0 ends up in POSITION_OPEN."""
        outcome = k.process_intent(_mk_intent(action=E.ENTER, trade_id=trade_id))
        assert outcome.accepted, f"ENTER rejected: {outcome.diagnostic_code}"
        slot = k._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"Expected POSITION_OPEN after ENTER, got {slot.fsm_state}"
        )

    @staticmethod
    def _enter_and_inject_fill(trade_id):
        """Return a no-fill-on-submit kernel after ENTER plus a manual FULL_FILL."""
        # The scenario emits no fill on submit, so the fill is injected by hand.
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        kernel = _fresh_kernel(scenario=no_fill)
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id=trade_id))
        kernel.on_venue_event(
            _mk_venue_event(
                kind=KernelEventKind.FULL_FILL,
                trade_id=trade_id,
                event_id=f"fill-{trade_id}",
                price=100.0,
                size=1.0,
                filled_size=1.0,
            )
        )
        return kernel

    @staticmethod
    def _deliver_cancel_reject(kernel, trade_id):
        """Deliver a CANCEL_REJECT event and return the kernel's result."""
        rejection = _mk_venue_event(
            kind=KernelEventKind.CANCEL_REJECT,
            trade_id=trade_id,
            event_id=f"cr-{trade_id}",
            status=VenueEventStatus.CANCELED,
        )
        return kernel.on_venue_event(rejection)

    def test_cancel_reject_exits_working_returns_to_position_open(self):
        """Core I15 regression: CANCEL_REJECT during an exit un-sticks the slot."""
        k_no_fill = self._enter_and_inject_fill("i15b")
        slot = k_no_fill._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"Setup failed: expected POSITION_OPEN, got {slot.fsm_state}"
        )

        # Submit the exit; with no fill emitted the slot stays in an exit state.
        k_no_fill.process_intent(_mk_intent(action=E.EXIT, trade_id="i15b"))
        slot = k_no_fill._get_slot(0)
        exit_states = (TradeStage.EXIT_WORKING, TradeStage.EXIT_REQUESTED, TradeStage.EXIT_SENT)
        assert slot.fsm_state in exit_states, (
            f"Setup failed: expected an exit state, got {slot.fsm_state}"
        )
        assert slot.active_exit_order is not None, "Setup: active_exit_order should be set"

        # Deliver the rejection and check the slot was released.
        result = self._deliver_cancel_reject(k_no_fill, "i15b")
        slot = k_no_fill._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"I15: slot must return to POSITION_OPEN after CANCEL_REJECT, got {slot.fsm_state}"
        )
        assert slot.active_exit_order is None, (
            "I15: active_exit_order must be cleared by CANCEL_REJECT"
        )
        assert result.diagnostic_code == KernelDiagnosticCode.CANCEL_REJECTED

    def test_after_cancel_reject_exit_can_be_resubmitted(self):
        """Once CANCEL_REJECT has been delivered, a fresh EXIT must be accepted."""
        kernel = self._enter_and_inject_fill("i15c")
        kernel.process_intent(_mk_intent(action=E.EXIT, trade_id="i15c"))
        self._deliver_cancel_reject(kernel, "i15c")
        # Retry the exit on the same trade.
        retry = kernel.process_intent(_mk_intent(action=E.EXIT, trade_id="i15c"))
        assert retry.accepted, (
            f"I15: retry EXIT after CANCEL_REJECT must be accepted, got {retry.diagnostic_code}"
        )
# ============================================================
# O5: _run() thread-pool path must time out, not hang forever
# ============================================================
class TestO5RunTimeout:
    """O5: ``BingxVenueAdapter._run()`` must raise ``TimeoutError`` rather than
    hang when the backend call outlasts the configured deadline."""

    @staticmethod
    def _bare_adapter(monkeypatch, timeout_s):
        """Return an adapter created without ``__init__`` and patch the class
        attribute ``_BACKEND_TIMEOUT_S`` to ``timeout_s``."""
        from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter

        adapter = object.__new__(BingxVenueAdapter)
        monkeypatch.setattr(BingxVenueAdapter, "_BACKEND_TIMEOUT_S", timeout_s)
        return adapter

    def test_run_raises_timeout_from_async_context(self, monkeypatch):
        """A 5 s coroutine against a 0.15 s deadline must raise ``TimeoutError``
        when ``_run()`` is invoked from inside a running event loop."""
        # Very short deadline so the test finishes quickly.
        adapter = self._bare_adapter(monkeypatch, 0.15)

        async def _slow_coroutine():
            await asyncio.sleep(5.0)
            return "never"

        async def _run_from_async():
            with pytest.raises(TimeoutError):
                adapter._run(_slow_coroutine())

        asyncio.run(_run_from_async())

    def test_run_returns_normally_within_deadline(self, monkeypatch):
        """A coroutine that finishes at once must have its value returned."""
        adapter = self._bare_adapter(monkeypatch, 2.0)

        async def _fast_coroutine():
            return 42

        async def _run_from_async():
            value = adapter._run(_fast_coroutine())
            assert value == 42

        asyncio.run(_run_from_async())
# ============================================================
# O1: _maybe_close() must not silently skip close from async context
# ============================================================
class TestO1MaybeCloseAsyncSafe:
    """O1: ``_maybe_close()`` must actually run the close coroutine, including
    when it is called from inside an async context."""

    @staticmethod
    def _async_closer(calls):
        """Return an object whose coroutine ``close()`` appends True to ``calls``."""

        class _FakeAsync:
            async def close(self) -> None:
                calls.append(True)

        return _FakeAsync()

    def test_maybe_close_from_sync_context(self):
        """Called from plain synchronous code, the close coroutine must run."""
        from prod.clean_arch.dita_v2.launcher import _maybe_close

        closed = []
        _maybe_close(self._async_closer(closed))
        assert closed == [True], "close() coroutine must run from sync context"

    def test_maybe_close_from_async_context(self):
        """Called from inside a running event loop, the close coroutine must
        still run and no exception may escape."""
        from prod.clean_arch.dita_v2.launcher import _maybe_close

        closed = []
        target = self._async_closer(closed)

        async def _caller():
            _maybe_close(target)

        asyncio.run(_caller())
        assert closed == [True], "close() coroutine must run from async context"

    def test_maybe_close_sync_method_still_works(self):
        """A plain (non-coroutine) ``close()`` must still be invoked."""
        from prod.clean_arch.dita_v2.launcher import _maybe_close

        closed = []

        class _FakeSync:
            def close(self) -> None:
                closed.append(True)

        _maybe_close(_FakeSync())
        assert closed == [True], "sync close() must still be called"
# ============================================================
# V3: seen_event_ids must be cleared on slot reuse (ENTER after CLOSE)
# ============================================================
class TestV3SeenEventIdsClearedOnReuse:
    """V3 regression: a slot's ``seen_event_ids`` must not outlive its trade.

    If IDs recorded for an earlier trade are still present when the slot is
    reused, a new event that happens to carry one of those IDs is dropped as a
    duplicate. The flaw description notes this is certain after a restart,
    because the venue adapter's event sequence starts again at EV-00000001.
    """

    def test_second_trade_fill_not_deduped(self):
        """A fill for the second trade on a slot must reach the FSM even when
        its event_id could collide with one from the first trade."""
        kernel = _fresh_kernel()

        # First trade: open, then close, so slot 0 is free again.
        for action in (E.ENTER, E.EXIT):
            kernel.process_intent(_mk_intent(action=action, trade_id="v3-t1"))
        assert kernel._get_slot(0).is_free(), "Trade 1 must close cleanly"

        # Second trade on the same slot, followed by an injected fill whose
        # event_id is the first ID a restarted adapter would hand out.
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="v3-t2"))
        restart_fill = _mk_venue_event(
            kind=KernelEventKind.FULL_FILL,
            trade_id="v3-t2",
            event_id="EV-00000001",
            price=100.0,
            size=1.0,
            filled_size=1.0,
        )
        result = kernel.on_venue_event(restart_fill)

        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"V3: fill for trade 2 must not be deduped — got {slot.fsm_state}, "
            f"diagnostic={result.diagnostic_code}"
        )

    def test_seen_event_ids_smaller_after_slot_reuse(self):
        """Right after a fresh ENTER on a reused slot, ``seen_event_ids`` must
        hold fewer entries than it did at the end of the previous trade."""
        kernel = _fresh_kernel()

        # Seed trade: a full ENTER + EXIT round trip on slot 0.
        for action in (E.ENTER, E.EXIT):
            kernel.process_intent(_mk_intent(action=action, trade_id="v3-s1"))
        assert kernel._get_slot(0).is_free(), "Seed trade must close cleanly"

        seed_ids = list(kernel._get_slot(0).seen_event_ids)
        assert len(seed_ids) >= 2, "Seed trade must have populated seen_event_ids"

        # New ENTER on the same slot; only this trade's events should remain.
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="v3-s2"))
        fresh_ids = list(kernel._get_slot(0).seen_event_ids)

        # If the set were not cleared, the count would grow instead of shrink.
        seed_count, fresh_count = len(seed_ids), len(fresh_ids)
        assert fresh_count < seed_count, (
            f"V3: seen_event_ids must be cleared on ENTER. "
            f"After seed: {seed_count} IDs, after fresh ENTER: {fresh_count} IDs. "
            f"Expected fewer, not more."
        )
# ============================================================
# V1+V2: LauncherBundle.close() wires kernel; BingxVenueAdapter.close()
# ============================================================
class TestV1V2LauncherAndVenueClose:
    """Shutdown wiring checks.

    V1: closing a LauncherBundle must also close its kernel.
    V2: BingxVenueAdapter must expose a ``close()`` method.
    """

    def test_launcher_bundle_close_calls_kernel_close(self):
        """V1: after ``bundle.close()`` the kernel's ``_backend`` must be None."""
        from prod.clean_arch.dita_v2.launcher import build_launcher_bundle

        launcher = build_launcher_bundle(max_slots=2)
        assert launcher.kernel._backend is not None

        launcher.close()

        backend_after_close = launcher.kernel._backend
        assert backend_after_close is None, (
            "V1: LauncherBundle.close() must call kernel.close()"
        )

    def test_bingx_venue_adapter_has_close(self):
        """V2: an adapter instance must carry a callable ``close`` attribute."""
        from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter

        # object.__new__ skips __init__, so no adapter set-up code runs here.
        adapter = object.__new__(BingxVenueAdapter)
        close_attr = getattr(adapter, "close", None)
        assert callable(close_attr), (
            "V2: BingxVenueAdapter must have a close() method"
        )
# ============================================================
# M9: ORDER_REJECT must NOT nuke a live POSITION_OPEN slot
# ============================================================
class TestM9OrderRejectPositionOpen:
    """M9 regression: an ORDER_REJECT that shows up while the slot is already
    POSITION_OPEN must leave the slot alone; only a reject during the entry
    phase is allowed to send the slot back to IDLE."""

    def _open_position(self, k: ExecutionKernel, trade_id: str) -> None:
        """Submit an ENTER for *trade_id* and require slot 0 to end up open."""
        outcome = k.process_intent(_mk_intent(action=E.ENTER, trade_id=trade_id))
        assert outcome.accepted, f"ENTER failed: {outcome.diagnostic_code}"
        assert k._get_slot(0).fsm_state == TradeStage.POSITION_OPEN

    def test_spurious_reject_does_not_reset_position_open(self):
        """A reject arriving for an already-open position must be refused and
        must not change the slot's FSM state."""
        kernel = _fresh_kernel()
        self._open_position(kernel, "m9a")

        stale_reject = _mk_venue_event(
            kind=KernelEventKind.ORDER_REJECT,
            trade_id="m9a",
            event_id="stale-reject-m9a",
            status=VenueEventStatus.REJECTED,
        )
        result = kernel.on_venue_event(stale_reject)

        slot = kernel._get_slot(0)
        assert slot.fsm_state == TradeStage.POSITION_OPEN, (
            f"M9: spurious ORDER_REJECT must not reset POSITION_OPEN → got {slot.fsm_state}"
        )
        assert not result.accepted, "Spurious reject must be reported as not accepted"

    def test_entry_reject_still_resets_to_idle(self):
        """Regression guard: a reject while the entry order is still active
        must keep resetting the slot to IDLE."""
        # Scenario that does not emit a fill on submit, so the entry order
        # stays active after ENTER.
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        kernel = _fresh_kernel(scenario=no_fill)
        kernel.process_intent(_mk_intent(action=E.ENTER, trade_id="m9b"))

        slot = kernel._get_slot(0)
        assert slot.active_entry_order is not None

        entry_reject = _mk_venue_event(
            kind=KernelEventKind.ORDER_REJECT,
            trade_id="m9b",
            event_id="entry-reject-m9b",
            status=VenueEventStatus.REJECTED,
        )
        kernel.on_venue_event(entry_reject)

        assert kernel._get_slot(0).fsm_state == TradeStage.IDLE, (
            "Entry-phase ORDER_REJECT must still reset slot to IDLE"
        )
# ============================================================
# G4: exit multi-leg — no phantom extra leg after last fill
# ============================================================
class TestG4ExitLegOrdering:
    """G4 regression: a multi-leg exit must finish on its last configured leg.

    The flaw description says ``all_legs_done`` was evaluated before
    ``consume_exit_leg()``, which produced an attempt at a non-existent extra
    leg when the final leg filled.
    """

    def test_two_leg_exit_closes_cleanly(self):
        """Two EXIT intents on a (0.5, 0.5) plan must leave the slot free."""
        ratios = (0.5, 0.5)
        kernel = _fresh_kernel()

        kernel.process_intent(
            _mk_intent(action=E.ENTER, trade_id="g4a", size=2.0, exit_leg_ratios=ratios)
        )
        assert kernel._get_slot(0).fsm_state == TradeStage.POSITION_OPEN

        # First leg: half of the position.
        kernel.process_intent(
            _mk_intent(action=E.EXIT, trade_id="g4a", size=1.0, exit_leg_ratios=ratios)
        )
        slot = kernel._get_slot(0)
        allowed_after_leg_one = (TradeStage.POSITION_OPEN, TradeStage.EXIT_WORKING)
        assert slot.fsm_state in allowed_after_leg_one, (
            f"After leg-1 fill, slot must be POSITION_OPEN or EXIT_WORKING, got {slot.fsm_state}"
        )

        # Second (final) leg: the remainder.
        kernel.process_intent(
            _mk_intent(action=E.EXIT, trade_id="g4a", size=1.0, exit_leg_ratios=ratios)
        )
        slot = kernel._get_slot(0)
        assert slot.is_free(), (
            f"G4: 2-leg exit must fully close slot, got {slot.fsm_state}"
        )

    def test_single_leg_exit_unaffected(self):
        """The default one-leg ENTER/EXIT round trip must still free the slot."""
        kernel = _fresh_kernel()
        for action in (E.ENTER, E.EXIT):
            kernel.process_intent(_mk_intent(action=action, trade_id="g4b"))
        assert kernel._get_slot(0).is_free(), "Single-leg exit must close slot"
# ============================================================
# G9: venue_order_id routed to exit order in exit phase
# ============================================================
class TestG9VenueOrderIdRouting:
    """G9: while the slot is in an exit FSM state, a venue_order_id update must
    land on the exit order rather than on the entry order reference that is
    still present."""

    def test_order_ack_in_exit_phase_fills_exit_order_id(self):
        """After an EXIT is submitted, the active exit order must carry a
        non-empty ``venue_order_id``.

        If the kernel exposes no active exit order at that point, the routing
        under test was never exercised. The test then reports a skip instead
        of passing silently with zero assertions executed.
        """
        # Scenario that does not emit fills on submit; fills are injected by hand.
        no_fill = MockVenueScenario(partial_fill_ratio=0.0, emit_fill_on_submit=False)
        k = _fresh_kernel(scenario=no_fill)

        k.process_intent(_mk_intent(action=E.ENTER, trade_id="g9a"))
        fill = _mk_venue_event(
            kind=KernelEventKind.FULL_FILL,
            trade_id="g9a",
            event_id="fill-g9a",
            price=100.0,
            size=1.0,
            filled_size=1.0,
        )
        k.on_venue_event(fill)
        assert k._get_slot(0).fsm_state == TradeStage.POSITION_OPEN

        # Submit the exit; with this scenario no fill is emitted for it.
        k.process_intent(_mk_intent(action=E.EXIT, trade_id="g9a"))
        exit_order = k._get_slot(0).active_exit_order

        if exit_order is None:
            # Make the vacuous case visible in the test report.
            pytest.skip(
                "G9: no active exit order after EXIT submit; "
                "venue_order_id routing was not exercised"
            )

        assert exit_order.venue_order_id != "", (
            "G9: exit order must have a venue_order_id after ORDER_ACK in exit phase"
        )
# ============================================================
# H6: _safe_enum gracefully handles unknown Rust FFI states
# ============================================================
class TestH6SafeEnum:
    """H6: ``_safe_enum`` must map a value it does not recognise to the
    supplied default instead of raising ValueError."""

    def test_safe_enum_known_value(self):
        """A valid member name resolves to that member."""
        from prod.clean_arch.dita_v2.rust_backend import _safe_enum

        resolved = _safe_enum(TradeStage, "POSITION_OPEN", TradeStage.IDLE)
        assert resolved == TradeStage.POSITION_OPEN

    def test_safe_enum_unknown_value_returns_default(self):
        """An unrecognised name yields the default."""
        from prod.clean_arch.dita_v2.rust_backend import _safe_enum

        resolved = _safe_enum(TradeStage, "UNKNOWN_FUTURE_STATE", TradeStage.IDLE)
        assert resolved == TradeStage.IDLE, (
            "H6: unknown enum variant must return default, not raise ValueError"
        )

    def test_safe_enum_empty_string_returns_default(self):
        """The empty string is treated like any other unknown value."""
        from prod.clean_arch.dita_v2.rust_backend import _safe_enum

        resolved = _safe_enum(TradeStage, "", TradeStage.IDLE)
        assert resolved == TradeStage.IDLE
# ============================================================
# W10: BingxHttpError must not be blindly mapped to "REJECTED"
# ============================================================
class TestW10HttpErrorMapping:
    """W10: ``_http_error_status()`` must separate transient failures from
    genuine client-side rejections.

    The cases below pin 429, 5xx and messages without an ``HTTP`` prefix to
    "RATE_LIMITED", and 4xx client errors to "REJECTED".
    """

    def _status(self, msg: str) -> str:
        """Return the status string ``_http_error_status`` assigns to *msg*."""
        from prod.clean_arch.dita_v2.bingx_venue import _http_error_status

        return _http_error_status(msg)

    def test_429_is_rate_limited(self):
        status = self._status("HTTP 429: Too Many Requests")
        assert status == "RATE_LIMITED", (
            "W10: 429 must map to RATE_LIMITED, not REJECTED"
        )

    def test_503_is_rate_limited(self):
        status = self._status("HTTP 503: Service Unavailable")
        assert status == "RATE_LIMITED", (
            "W10: 503 (transient server error) must map to RATE_LIMITED"
        )

    def test_500_is_rate_limited(self):
        status = self._status("HTTP 500: Internal Server Error")
        assert status == "RATE_LIMITED"

    def test_400_is_rejected(self):
        status = self._status("HTTP 400: invalid symbol")
        assert status == "REJECTED", (
            "W10: genuine 4xx client error must map to REJECTED"
        )

    def test_403_is_rejected(self):
        status = self._status("HTTP 403: Forbidden")
        assert status == "REJECTED"

    def test_transport_error_is_rate_limited(self):
        status = self._status("DELETE /openApi/swap/v2/trade/order failed: ConnectionError")
        assert status == "RATE_LIMITED", (
            "W10: DNS/transport errors (no HTTP prefix) must map to RATE_LIMITED"
        )

    def test_dns_error_is_rate_limited(self):
        status = self._status("Name or service not known")
        assert status == "RATE_LIMITED"
# ============================================================
# PinkHzStateWriter: non-blocking Hz write correctness
# ============================================================
class TestPinkHzStateWriter:
    """Checks on what PinkHzStateWriter writes: which map/key pairs are
    produced, which fields the JSON payloads contain, and that a failing map
    write does not raise out of ``_hz_write_no_wait``."""

    def _make_writer_no_hz(self):
        """Return a PinkHzStateWriter built without ``__init__`` whose two maps
        are mocks that record every ``put`` into ``writer._writes``.

        ``_writes`` maps ``(map_name, key)`` to the value that was put.
        """
        import unittest.mock as mock
        from prod.clean_arch.dita_v2.hazelcast_projection import PinkHzStateWriter

        writer = object.__new__(PinkHzStateWriter)
        writer._writes = {}

        def _recording_map(map_name):
            # Stand-in for a map proxy: put() stores into writer._writes.
            proxy = mock.MagicMock(name=f"map:{map_name}")

            def _record(key, value):
                writer._writes[(map_name, key)] = value

            proxy.put.side_effect = _record
            return proxy

        writer._state_map = _recording_map("DOLPHIN_STATE_PINK")
        writer._pnl_map = _recording_map("DOLPHIN_PNL_PINK")
        writer._client = mock.MagicMock()
        return writer

    def test_engine_snapshot_writes_two_keys(self):
        """One snapshot call must populate both state-map keys."""
        writer = self._make_writer_no_hz()
        slot_state = {"slot_id": 0, "fsm_state": "IDLE"}
        account_state = {"capital": 25000.0, "trade_seq": 42}

        writer.write_engine_snapshot(slot_state, account_state, posture="APEX")

        written = writer._writes
        assert ("DOLPHIN_STATE_PINK", "engine_snapshot") in written, (
            "PinkHzStateWriter must write engine_snapshot key"
        )
        assert ("DOLPHIN_STATE_PINK", "latest") in written, (
            "PinkHzStateWriter must write latest key (BLUE-compatible)"
        )

    def test_engine_snapshot_has_strategy_pink(self):
        """The engine_snapshot payload must tag itself with strategy 'pink'."""
        import json

        writer = self._make_writer_no_hz()
        writer.write_engine_snapshot({"slot_id": 0}, {"capital": 10000.0})

        raw = writer._writes[("DOLPHIN_STATE_PINK", "engine_snapshot")]
        payload = json.loads(raw)
        assert payload["strategy"] == "pink", "engine_snapshot must identify as pink"

    def test_latest_key_has_blue_compatible_fields(self):
        """The 'latest' payload must contain every field in the list below."""
        import json

        writer = self._make_writer_no_hz()
        writer.write_engine_snapshot(
            {"slot_id": 0},
            {"capital": 5000.0, "realized_pnl_total": 123.4, "trade_seq": 7},
        )

        payload = json.loads(writer._writes[("DOLPHIN_STATE_PINK", "latest")])
        required = ("strategy", "capital", "date", "pnl", "trades", "posture", "updated_at")
        for name in required:
            assert name in payload, f"BLUE-compatible 'latest' key missing field: {name}"

    def test_our_leverage_in_snapshot(self):
        """Passing ``our_leverage`` must surface that key in the snapshot."""
        import json

        writer = self._make_writer_no_hz()
        slot_state = {"slot_id": 0, "size": 0.5, "entry_price": 50000.0}
        writer.write_engine_snapshot(slot_state, {"capital": 25000.0}, our_leverage=1.0)

        payload = json.loads(writer._writes[("DOLPHIN_STATE_PINK", "engine_snapshot")])
        assert "our_leverage" in payload, "our_leverage (dual-leverage: system layer) must be in Hz snapshot"

    def test_daily_pnl_write(self):
        """``write_daily_pnl`` must put a row into the PnL map whose ``pnl``
        equals the supplied ``realized_pnl_total``."""
        import json

        writer = self._make_writer_no_hz()
        writer.write_daily_pnl({"realized_pnl_total": 45.6, "capital": 25000.0, "trade_seq": 3})

        # The row key is not fixed by this test; take the first PnL-map entry.
        pnl_keys = [entry for entry in writer._writes if entry[0] == "DOLPHIN_PNL_PINK"]
        key = pnl_keys[0] if pnl_keys else None
        assert key is not None, "write_daily_pnl must write to DOLPHIN_PNL_PINK"

        row = json.loads(writer._writes[key])
        assert row["pnl"] == 45.6

    def test_write_survives_exception(self):
        """A map whose ``put`` raises must not make ``_hz_write_no_wait``
        raise; the test passes simply by the call returning."""
        import unittest.mock as mock
        from prod.clean_arch.dita_v2.hazelcast_projection import _hz_write_no_wait

        failing_map = mock.MagicMock()
        failing_map.put.side_effect = RuntimeError("Hz down")

        _hz_write_no_wait(failing_map, "key", "value")
# ============================================================
# vol_ok gate in DecisionEngine
# ============================================================
class TestVolOkGate:
    """DecisionEngine behaviour with respect to the ``vol_ok`` flag carried in
    ``scan_payload``: a False flag must prevent an ENTER decision."""

    def _make_snapshot(self, vol_ok: bool, vdiv: float = -0.03, irp: float = 0.60):
        """Build a BTCUSDT MarketSnapshot at price 50000 with the given
        ``vol_ok`` flag, velocity divergence and IRP alignment."""
        from prod.clean_arch.ports.data_feed import MarketSnapshot

        payload = {"vol_ok": vol_ok, "posture": "APEX"}
        return MarketSnapshot(
            timestamp=datetime.now(timezone.utc),
            symbol="BTCUSDT",
            price=50000.0,
            velocity_divergence=vdiv,
            irp_alignment=irp,
            scan_payload=payload,
        )

    def _engine(self):
        """Return a DecisionEngine configured as short-only with the
        thresholds and limits listed below."""
        from prod.clean_arch.dita.decision import DecisionEngine, DecisionConfig

        settings = dict(
            vel_div_threshold=-0.02,
            vel_div_extreme=-0.05,
            fixed_tp_pct=0.0020,
            max_hold_bars=250,
            capital_fraction=0.20,
            max_leverage=3.0,
            allow_short=True,
            allow_long=False,
        )
        return DecisionEngine(DecisionConfig(**settings))

    def _ctx(self, open_positions: int = 0, capital: float = 25000.0):
        """Return a DecisionContext with the given capital and position count."""
        from prod.clean_arch.dita.contracts import DecisionContext

        return DecisionContext(capital=capital, open_positions=open_positions)

    def test_vol_ok_false_blocks_enter(self):
        """With vol_ok=False the decision must be one of the non-entry actions."""
        engine = self._engine()
        snapshot = self._make_snapshot(vol_ok=False)

        decision = engine.decide(snapshot, self._ctx())

        non_entry_actions = ("HOLD", "NO_ACTION", "SKIP", "VOL_GATE")
        assert decision.action.value in non_entry_actions, (
            f"vol_ok=False must block ENTER, got action={decision.action.value!r} reason={getattr(decision, 'reason', '?')!r}"
        )

    def test_vol_ok_true_allows_enter(self):
        """With vol_ok=True the decision must not be the VOL_GATE action."""
        engine = self._engine()
        snapshot = self._make_snapshot(vol_ok=True)

        decision = engine.decide(snapshot, self._ctx())

        assert decision.action.value not in ("VOL_GATE",), (
            "vol_ok=True must not block on vol_ok gate"
        )
# NOTE(review): the text below appears to be commit-message text and a
# timestamp that ended up inline in this file; kept verbatim as a comment.
#
# PINK: TUI Hz fix + DC gate + ACB boost + 10 new tests (104/104 green)
#
# TUI Hz fix:
# - hazelcast_projection.py: write_engine_snapshot now writes all NAUTILUS-era
#   field aliases (trades_executed, current_leverage, open_positions as list,
#   last_scan_number, last_vel_div, vol_ok, open_notional) so gear_rows/capital
#   panel work with no TUI changes.
# - dolphin_status_pink.py: _normalize_eng_for_tui() safety-net translation
#   added; render() uses it on every Hz read.
#
# DC gate (SYSTEM BIBLE §4.2, champion config):
# - pink_direct.py: _dc_contradicts() — 7-tick lookback, 0.75 bps threshold.
#   Rising price (chg > 0.75 bps) blocks ENTER via
#   dataclasses.replace(HOLD, DC_CONTRADICT). Price history deque initialized
#   in connect(); dc_skip_contradicts=True enforced.
#
# ACB boost (SYSTEM BIBLE §10):
# - hazelcast_feed.py: fix wrong key "latest_acb" → "acb_boost"
#   (DOLPHIN_FEATURES key written by acb_processor_service.py).
# - pink_direct.py: _last_acb_boost read from scan_payload["acb_boost"] first
#   (scan bridge may embed it), then Hz direct fallback. Applied to
#   intent.leverage via dataclasses.replace() after IntentEngine.plan(),
#   capped at 3x.
# - _last_scan_number, _last_vel_div, _last_vol_ok tracked from scan_payload.
#
# OBF gate: NOT implemented. OBF shards (DOLPHIN_FEATURES_SHARD_*) require new
# Hz map connections + symbol routing. Gap documented; requires separate
# decision.
#
# Tests: TestDCGate (5) + TestNormalizeEngForTui (5) — 10 new, 104 total,
# all green.
#
# Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
# 2026-06-03 14:00:48 +02:00
# ============================================================
# DC gate (_dc_contradicts) — SYSTEM BIBLE §4.2
# ============================================================
class TestDCGate:
    """Direction Confirmation gate: ``_dc_contradicts`` must report a
    contradiction only when the price has risen across the lookback window."""

    def _rt(self, prices: list):
        """Return a lightweight stand-in for PinkDirectRuntime.

        The stand-in only carries ``_price_history`` (a deque with maxlen 10
        seeded from *prices*) and a ``_dc_contradicts`` callable that forwards
        to the real ``PinkDirectRuntime._dc_contradicts`` with the stand-in as
        ``self``.
        """
        import types
        from collections import deque

        stub = types.SimpleNamespace()
        stub._price_history = deque(prices, maxlen=10)

        from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime

        def _forward(**kw):
            return PinkDirectRuntime._dc_contradicts(stub, **kw)

        stub._dc_contradicts = _forward
        return stub

    def test_rising_price_contradicts(self):
        # Eight prices: first 100.0, last 100.2, i.e. +20 bps over the window.
        history = [100.0] + [100.05] * 6 + [100.2]
        runtime = self._rt(history)
        assert runtime._dc_contradicts(), "Rising price must be DC CONTRADICT"

    def test_falling_price_confirms(self):
        # Eight prices: first 100.0, last 99.9, i.e. -10 bps; must not block.
        history = [100.0] + [99.95] * 6 + [99.9]
        runtime = self._rt(history)
        assert not runtime._dc_contradicts(), "Falling price must NOT be DC CONTRADICT"

    def test_flat_price_neutral(self):
        # No change at all across eight prices.
        history = [100.0] * 8
        runtime = self._rt(history)
        assert not runtime._dc_contradicts(), "Flat price must NOT be DC CONTRADICT"

    def test_insufficient_history_neutral(self):
        # Only two prices available; the gate must not block on too little data.
        history = [100.0, 100.5]
        runtime = self._rt(history)
        assert not runtime._dc_contradicts(), "Insufficient history must NOT block entry"

    def test_exactly_threshold_neutral(self):
        # Last price is nominally +0.75 bps above the first; sitting on the
        # threshold must not count as a contradiction.
        history = [100.0] + [100.0] * 6 + [100.0075]
        runtime = self._rt(history)
        assert not runtime._dc_contradicts(), "Exactly at threshold must NOT be CONTRADICT"
# ============================================================
# TUI normalization — _normalize_eng_for_tui
# ============================================================
class TestNormalizeEngForTui:
    """``_normalize_eng_for_tui`` converts a DITAv2 Hz engine snapshot into the
    NAUTILUS-era field names the TUI reads."""

    def _norm(self, eng: dict) -> dict:
        """Run *eng* through ``_normalize_eng_for_tui`` and return the result."""
        from Observability.dolphin_status_pink import _normalize_eng_for_tui

        return _normalize_eng_for_tui(eng)

    def test_empty_returns_empty(self):
        """An empty snapshot stays empty."""
        normalized = self._norm({})
        assert normalized == {}

    def test_already_nautilus_format_passthrough(self):
        """A snapshot that already has ``trades_executed`` keeps its value,
        whether the same dict or a copy is returned."""
        snapshot = {"trades_executed": 5, "capital": 25000.0}
        normalized = self._norm(snapshot)
        assert normalized is snapshot or normalized["trades_executed"] == 5

    def test_ditav2_format_adds_trades_executed(self):
        """``trade_seq`` must be exposed under ``trades_executed``."""
        snapshot = {"trade_seq": 7, "capital": 25000.0, "slot": {}}
        normalized = self._norm(snapshot)
        assert normalized["trades_executed"] == 7, "trade_seq must be aliased as trades_executed"

    def test_open_positions_becomes_list(self):
        """An integer ``open_positions`` of 1 must become a one-element list."""
        snapshot = {"open_positions": 1, "slot": {"size": 0.5, "entry_price": 50000.0}}
        normalized = self._norm(snapshot)
        positions = normalized["open_positions"]
        assert isinstance(positions, list), "open_positions int must become list"
        assert len(positions) == 1

    def test_zero_open_positions_empty_list(self):
        """An integer ``open_positions`` of 0 must become an empty list."""
        snapshot = {"open_positions": 0, "slot": {}}
        normalized = self._norm(snapshot)
        assert normalized["open_positions"] == [], "zero open_positions must become empty list"