diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs index 8798b55..53341d6 100644 --- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -1035,13 +1035,29 @@ impl KernelCore { // realized_pnl and clear the flag. if let Some(slot_id) = parsed.get("slot_id").and_then(|v| v.as_u64()) { let sid = slot_id as usize; - if sid < self.slots.len() && !self.slots[sid].closed { + // NOTE: closed slots MUST be repairable — the most common + // price-less leg is the TERMINAL fill, where the slot is + // already CLOSED by the time FILL_SETTLED arrives. The + // was_skipped flag + account-event dedup keep this + // idempotent; a `!closed` guard would permanently lose + // exactly the legs the repair exists for. + if sid < self.slots.len() { let was_skipped = self.slots[sid].metadata .get("realized_skipped_no_price") .and_then(|v| v.as_bool()) .unwrap_or(false); - if was_skipped && realized.is_finite() && realized != 0.0 { - self.slots[sid].realized_pnl += realized; + // The K-fold field (`realized_pnl`) is 0 on the live + // path because PREDICTED_FILL already folded it — the + // slot repair uses its own field so the same number + // is never double-folded into k_realized_pnl. + // Falls back to `realized_pnl` for callers that only + // send the legacy shape. + let repair = parsed + .get("repair_realized_pnl") + .and_then(|v| v.as_f64()) + .unwrap_or(realized); + if was_skipped && repair.is_finite() && repair != 0.0 { + self.slots[sid].realized_pnl += repair; self.slots[sid].metadata.insert( "realized_skipped_no_price".to_string(), Value::Bool(false), diff --git a/prod/clean_arch/dita_v2/rust_backend.py b/prod/clean_arch/dita_v2/rust_backend.py index 7e30678..aaf41e6 100644 --- a/prod/clean_arch/dita_v2/rust_backend.py +++ b/prod/clean_arch/dita_v2/rust_backend.py @@ -1156,7 +1156,33 @@ class ExecutionKernel: available_capital (E rules when present), k_capital, event_seq, capital_frozen (bool), duplicate_event (bool if deduplicated). """ - return _get_rust().on_account_event(self._backend, event) + result = _get_rust().on_account_event(self._backend, event) + # Settle slot-level repairs into published capital. The Rust + # FILL_SETTLED handler may add exchange realized PnL to a slot whose + # exit leg booked 0 (price-less fill); the settle-baseline diff + # otherwise only runs in on_venue_event, so a repair — especially on + # an already-CLOSED slot (terminal fill, the common case) — would + # never reach AccountProjection.capital. + try: + kind = str(event.get("kind", "") or "").upper() + sid_raw = event.get("slot_id") + if kind == "FILL_SETTLED" and sid_raw is not None: + sid = int(sid_raw) + if 0 <= sid < self.max_slots: + slot = self._get_slot(sid) + incremental = slot.realized_pnl - self._last_settled_pnl.get(sid, 0.0) + if abs(incremental) > 1e-12: + self.account.settle(incremental) + self._last_settled_pnl[sid] = slot.realized_pnl + self.account.observe_slots( + [self._get_slot(i) for i in range(self.max_slots)] + ) + current = self._get_slot(sid) + self.projection.write_slot(current) + self.zinc_plane.write_slot(current) + except Exception: + pass + return result # ------------------------------------------------------------------ # Snapshot / restore — session-to-session state continuity diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 312ade3..2e1c693 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -672,6 +672,14 @@ class PinkDirectRuntime: "realized_pnl": 0.0, # already folded above "fee": event.fee, # negative = rebate "is_maker": event.is_maker, + # Slot-level PnL repair plumbing (Phase 3.2): the + # kernel repairs a price-less exit leg + # (realized_skipped_no_price) from the exchange's + # realized figure. Separate field so the K-fold is + # never double-counted; slot 0 = single-slot kernel + # and the fill ownership filter already passed. + "slot_id": 0, + "repair_realized_pnl": float(event.realized_pnl or 0.0), }) # Gap 2: log settled fee with WS_SETTLED provenance so # downstream can reconcile against the ESTIMATED_TAKER row. diff --git a/prod/tests/test_pink_direct_runtime.py b/prod/tests/test_pink_direct_runtime.py new file mode 100644 index 0000000..ee78a5d --- /dev/null +++ b/prod/tests/test_pink_direct_runtime.py @@ -0,0 +1,498 @@ +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, Optional, List, Dict + +from prod.clean_arch.dita import ( + Decision, + Intent, + DecisionConfig, + DecisionEngine, + IntentEngine, + TradeSide as LegacyTradeSide, +) +from prod.clean_arch.ports.data_feed import MarketSnapshot +from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime, _decision_to_kernel_intent +from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelDiagnosticCode, + KernelIntent, + KernelOutcome, + KernelSeverity, + KernelTransition, + TradeSide, + TradeSlot, + TradeStage, + VenueEvent, + VenueEventStatus, + VenueOrder, + VenueOrderStatus, + KernelEventKind, +) + + +@dataclass +class _FakeFeed: + """Fake Hazelcast data feed — returns canned snapshots.""" + + connected: bool = False + _snapshots: list[MarketSnapshot | None] = field(default_factory=list) + + async def connect(self) -> bool: + self.connected = True + return True + + async def disconnect(self) -> None: + self.connected = False + + async def get_latest_snapshot(self, symbol: str) -> MarketSnapshot | None: + if self._snapshots: + return self._snapshots.pop(0) + return None + + +class _FakeMarketStateRuntime: + """Fake market state runtime — records calls, returns canned bundle.""" + + def __init__(self) -> None: + self.calls: list[dict[str, Any]] = [] + self.latest_bundle_dict: dict[str, Any] = { + "market_fingerprint_choppiness_strength": 0.2, + "market_fingerprint_trend_persistence": 0.4, + "market_state_top_asset_target": "BTCUSDT", + } + + def update_scan_state(self, **kwargs): + self.calls.append(dict(kwargs)) + return type("Bundle", (), {"as_dict": lambda self: dict(kwargs)})() + + +class _FakeKernelAccount: + """Minimal kernel account projection stand-in.""" + + def __init__(self, capital: float = 25000.0): + self.snapshot = type("Snap", (), { + "capital": capital, + "equity": capital, + "peak_capital": capital, + "realized_pnl": 0.0, + "unrealized_pnl": 0.0, + "open_positions": 0, + "open_notional": 0.0, + "leverage": 0.0, + "trade_seq": 0, + "capital_source": "seed", + "e_wallet_balance": 0.0, + "event_seq": 0, + })() + + def anchor_to_exchange(self, wallet_balance: float, + available_margin: float = 0.0, + event_seq: int = 0) -> None: + """Mirror AccountProjection.anchor_to_exchange (Phase 1 contract).""" + wb = float(wallet_balance or 0.0) + if not (wb > 0.0): + return + self.snapshot.capital = wb + self.snapshot.equity = wb + self.snapshot.peak_capital = max(self.snapshot.peak_capital, wb) + self.snapshot.capital_source = "e_anchored" + self.snapshot.e_wallet_balance = wb + self.snapshot.event_seq = int(event_seq or 0) + + def settle(self, realized_pnl: float, fees: float = 0.0) -> None: + delta = float(realized_pnl or 0.0) - float(fees or 0.0) + self.snapshot.capital += delta + self.snapshot.equity = self.snapshot.capital + self.snapshot.realized_pnl += float(realized_pnl or 0.0) + self.snapshot.capital_source = "k_bridged" + + def observe_slots(self, slots) -> None: + return None + + +class _FakeSlotView: + """Minimal slot view stand-in.""" + + def __init__(self, slot_dict: dict | None = None): + d = slot_dict or { + "slot_id": 0, "trade_id": "", "asset": "", "side": "FLAT", + "entry_price": 0.0, "size": 0.0, "initial_size": 0.0, + "leverage": 0.0, "realized_pnl": 0.0, "unrealized_pnl": 0.0, + "closed": False, "close_reason": "", "fsm_state": "IDLE", + "exit_leg_ratios": [], "active_leg_index": 0, + "active_exit_order": None, "active_entry_order": None, + "entry_velocity_divergence": 0.0, "entry_irp_alignment": 0.0, + } + self._d = d + state_str = d.get("fsm_state", "IDLE") + # Map string to enum + for s in TradeStage: + if s.value == state_str: + self.fsm_state = s + break + else: + self.fsm_state = TradeStage.IDLE + + def to_dict(self) -> dict: + return dict(self._d) + + def is_free(self) -> bool: + return self.fsm_state in {TradeStage.IDLE, TradeStage.CLOSED} + + def is_open(self) -> bool: + return self.fsm_state in { + TradeStage.ENTRY_WORKING, TradeStage.POSITION_OPENED, + TradeStage.POSITION_OPEN, TradeStage.EXIT_WORKING, + } + + def mark_price(self, price: float) -> None: + self._d["entry_price"] = price + + +class _FakeVenue: + """Fake venue for runtime tests — simulates position lifecycle.""" + + def __init__(self): + self._capital = 25000.0 + self._position: dict | None = None + self._trade_seq = 0 + self._connected = False + + async def connect(self): + self._connected = True + + async def disconnect(self): + self._connected = False + + async def reconcile(self) -> dict: + return { + "capital": self._capital, + "equity": self._capital, + "open_positions": {} if self._position is None else {self._position["trade_id"]: self._position}, + "open_orders": [], + } + + def open_positions(self) -> list[dict]: + return [dict(self._position)] if self._position else [] + + +class _FakeKernel: + """Fake DITAv2 ExecutionKernel for runtime tests. + + Tracks an internal position lifecycle matching the _FakeVenue. + """ + + def __init__(self, capital: float = 25000.0): + self.max_slots = 1 + self.account = _FakeKernelAccount(capital) + self.venue = _FakeVenue() + self._slots: dict[int, _FakeSlotView] = {0: _FakeSlotView()} + self._capital = capital + self._position: dict | None = None + + def slot(self, slot_id: int) -> _FakeSlotView: + return self._slots.get(slot_id, _FakeSlotView()) + + def snapshot(self) -> dict: + return { + "account": { + "capital": self.account.snapshot.capital, + "equity": self.account.snapshot.equity, + "realized_pnl": self.account.snapshot.realized_pnl, + "unrealized_pnl": self.account.snapshot.unrealized_pnl, + "open_positions": self.account.snapshot.open_positions, + "open_notional": self.account.snapshot.open_notional, + "leverage": self.account.snapshot.leverage, + "trade_seq": self.account.snapshot.trade_seq, + }, + "slots": [self.slot(0).to_dict()], + } + + def process_intent(self, intent: KernelIntent) -> KernelOutcome: + """Simulate entry/exit lifecycle matching old _FakeExecution logic.""" + price = float(intent.reference_price or 0.0) + qty = float(intent.target_size or 0.0) + + if intent.action == KernelCommandType.ENTER: + self._position = { + "trade_id": intent.trade_id, + "asset": intent.asset, + "side": "SHORT" if intent.side == TradeSide.SHORT else "LONG", + "entry_price": price, + "size": qty, + "leverage": float(intent.leverage or 1.0), + } + self._slots[0] = _FakeSlotView({ + "slot_id": 0, "trade_id": intent.trade_id, "asset": intent.asset, + "side": self._position["side"], "entry_price": price, + "size": qty, "initial_size": qty, + "leverage": float(intent.leverage or 1.0), + "realized_pnl": 0.0, "unrealized_pnl": 0.0, + "closed": False, "close_reason": "", "fsm_state": "POSITION_OPEN", + "exit_leg_ratios": list(intent.exit_leg_ratios), "active_leg_index": 0, + "active_exit_order": None, "active_entry_order": None, + }) + self.account.snapshot.open_positions = 1 + self.account.snapshot.open_notional = qty * price + self.account.snapshot.trade_seq += 1 + + elif intent.action == KernelCommandType.EXIT and self._position is not None: + current_qty = float(self._position["size"]) + remaining = max(0.0, current_qty - qty) + entry_price = float(self._position["entry_price"]) + leverage = float(self._position.get("leverage", 1.0)) + pnl_pct = (entry_price - price) / entry_price # short profit + realized = pnl_pct * qty * entry_price * leverage + self._capital += realized + self.account.snapshot.capital = self._capital + self.account.snapshot.realized_pnl += realized + self.account.snapshot.peak_capital = max(self.account.snapshot.peak_capital, self._capital) + self.account.snapshot.equity = self._capital + + if remaining <= 1e-12: + self._position = None + self._slots[0] = _FakeSlotView({ + "slot_id": 0, "trade_id": intent.trade_id, "asset": intent.asset, + "side": "FLAT", "entry_price": 0.0, "size": 0.0, "initial_size": 0.0, + "leverage": 0.0, "realized_pnl": realized, "unrealized_pnl": 0.0, + "closed": True, "close_reason": intent.reason, "fsm_state": "CLOSED", + "exit_leg_ratios": [], "active_leg_index": 1, + "active_exit_order": None, "active_entry_order": None, + }) + self.account.snapshot.open_positions = 0 + self.account.snapshot.open_notional = 0.0 + else: + self._position["size"] = remaining + self._slots[0] = _FakeSlotView({ + "slot_id": 0, "trade_id": intent.trade_id, "asset": intent.asset, + "side": "SHORT", "entry_price": entry_price, "size": remaining, + "initial_size": qty, "leverage": leverage, + "realized_pnl": realized, "unrealized_pnl": 0.0, + "closed": False, "close_reason": "", "fsm_state": "POSITION_OPEN", + "exit_leg_ratios": list(intent.exit_leg_ratios), "active_leg_index": 1, + "active_exit_order": None, "active_entry_order": None, + }) + self.account.snapshot.open_positions = 1 + self.account.snapshot.open_notional = remaining * entry_price + + elif intent.action == KernelCommandType.MARK_PRICE: + if self._position: + self._position["entry_price"] = price + + return KernelOutcome( + accepted=True, + slot_id=0, + trade_id=intent.trade_id, + state=TradeStage.POSITION_OPEN if self._position else TradeStage.IDLE, + diagnostic_code=KernelDiagnosticCode.OK, + severity=KernelSeverity.INFO, + transitions=(), + emitted_events=(), + details={}, + ) + + def mark_price(self, asset: str, price: float) -> None: + self.slot(0).mark_price(price) + + def set_seed_capital(self, capital: float) -> None: + self._capital = capital + self.account.snapshot.capital = capital + + def reset_and_seed(self, capital: float) -> None: + self._capital = capital + self.account.snapshot.capital = capital + + def set_exchange_config(self, config: dict) -> None: + pass + + def on_account_event(self, event: dict) -> dict: + return {} + + async def process_intent_async(self, intent) -> KernelOutcome: + return self.process_intent(intent) + + def on_venue_event(self, event) -> KernelOutcome: + return KernelOutcome( + accepted=True, slot_id=0, trade_id="", + state=TradeStage.IDLE, diagnostic_code=KernelDiagnosticCode.OK, + ) + + def restore_state(self, state_json: str) -> bool: + return True + + def calibrate_fee(self, fill_price: float, fill_qty: float, actual_fee: float, is_maker: bool = False) -> dict: + return {} + + def reconcile_from_slots(self, slots: list) -> KernelOutcome: + # Populate slot from venue position if present + if self.venue._position is not None: + p = self.venue._position + self._position = dict(p) + self.venue._capital = self._capital + self._slots[0] = _FakeSlotView({ + "slot_id": 0, + "trade_id": p.get("trade_id", ""), + "asset": p.get("asset", ""), + "side": p.get("side", "FLAT"), + "entry_price": float(p.get("entry_price", 0.0)), + "size": float(p.get("size", 0.0)), + "initial_size": float(p.get("size", 0.0)), + "leverage": float(p.get("leverage", 1.0)), + "realized_pnl": 0.0, "unrealized_pnl": 0.0, + "closed": False, "close_reason": "", + "fsm_state": "POSITION_OPEN", + "exit_leg_ratios": [1.0], "active_leg_index": 0, + "active_exit_order": None, "active_entry_order": None, + "entry_velocity_divergence": 0.0, + "entry_irp_alignment": 0.0, + }) + self.account.snapshot.open_positions = 1 + self.account.snapshot.open_notional = float(p.get("size", 0)) * float(p.get("entry_price", 0)) + return KernelOutcome( + accepted=True, slot_id=0, trade_id="", + state=TradeStage.IDLE, diagnostic_code=KernelDiagnosticCode.OK, + ) + + +def _snapshot(price: float, vdiv: float, *, symbol: str = "BTCUSDT") -> MarketSnapshot: + return MarketSnapshot( + timestamp=datetime.now(timezone.utc), + symbol=symbol, + price=price, + bid=price * 0.9995, + ask=price * 1.0005, + eigenvalues=[1.0, 0.9, 0.8], + eigenvectors=None, + velocity_divergence=vdiv, + irp_alignment=0.5, + scan_number=int(datetime.now(timezone.utc).timestamp()), + source="pink_direct_runtime_test", + scan_payload={ + "version": "NG7", + "scan_number": int(datetime.now(timezone.utc).timestamp()), + "vel_div": vdiv, + "w50_velocity": 0.01, + "w750_velocity": 0.02, + "posture": "APEX", + "assets": [symbol], + "asset_prices": [price], + "market_fingerprint_choppiness_strength": 0.2, + }, + ) + + +def test_runtime_handles_open_partial_close_and_terminal_close() -> None: + """Full lifecycle: entry → partial exit → terminal exit via DITAv2 kernel.""" + feed = _FakeFeed() + kernel = _FakeKernel(capital=25000.0) + market_state_runtime = _FakeMarketStateRuntime() + cfg = DecisionConfig( + vel_div_threshold=-0.02, + fixed_tp_pct=0.002, + capital_fraction=0.01, + max_leverage=1.0, + exit_leg_ratios=(0.5, 1.0), + policy_version="pink_direct_test", + ) + runtime = PinkDirectRuntime( + data_feed=feed, + kernel=kernel, + decision_engine=DecisionEngine(cfg), + intent_engine=IntentEngine(cfg), + market_state_runtime=market_state_runtime, + ) + + asyncio.run(runtime.connect(initial_capital=25000.0)) + asyncio.run(runtime.step(_snapshot(100.0, -0.1))) + slot = kernel.slot(0) + assert slot.is_open(), f"Expected open slot after entry, got {slot.fsm_state}" + assert slot.to_dict().get("size", 0) > 0 + assert market_state_runtime.calls + + asyncio.run(runtime.step(_snapshot(99.5, 0.05))) + slot = kernel.slot(0) + remaining = slot.to_dict().get("size", 0) + assert remaining > 0, "Should still have position after partial exit" + + asyncio.run(runtime.step(_snapshot(99.3, 0.05))) + slot = kernel.slot(0) + # The decision engine decides whether to exit; what matters is that + # capital was not corrupted (logic should be profitable). + assert kernel.account.snapshot.capital > 25000.0, \ + f"Expected capital > 25000 after profitable trades, got {kernel.account.snapshot.capital}" + + asyncio.run(runtime.disconnect()) + assert feed.connected is False + + +def test_runtime_enter_maps_correct_kernel_intent() -> None: + """Verify the runtime's decision-to-intent translation is correct.""" + from prod.clean_arch.dita import DecisionAction as DAction, TradeStage as TStage + cfg = DecisionConfig(policy_version="pink_direct_test") + runtime = PinkDirectRuntime( + data_feed=_FakeFeed(), + kernel=_FakeKernel(), + decision_engine=DecisionEngine(cfg), + intent_engine=IntentEngine(cfg), + ) + decision = Decision( + timestamp=datetime.now(timezone.utc), + decision_id="d-001", asset="BTCUSDT", + action=DAction.ENTER, + side=LegacyTradeSide.SHORT, + reason="test", confidence=0.8, + velocity_divergence=-0.03, irp_alignment=0.5, + reference_price=65000.0, target_size=0.01, + leverage=2.0, bars_held=0, + stage=TStage.ORDER_REQUESTED, + metadata={}, + ) + intent = Intent( + timestamp=datetime.now(timezone.utc), + trade_id="t-001", decision_id="d-001", + asset="BTCUSDT", + action=DAction.ENTER, + side=LegacyTradeSide.SHORT, + reason="test", target_size=0.01, + leverage=2.0, reference_price=65000.0, + confidence=0.8, bars_held=0, + stage=TStage.INTENT_CREATED, + exit_leg_ratios=(0.5, 1.0), + metadata={}, + ) + ki = _decision_to_kernel_intent(decision, intent, slot_id=0) + assert ki.action == KernelCommandType.ENTER + assert ki.target_size == 0.01 + assert ki.side == TradeSide.SHORT + + +def test_runtime_recovers_from_exchange_state() -> None: + """Startup recovery seeds slot from existing exchange position.""" + feed = _FakeFeed() + kernel = _FakeKernel(capital=25000.0) + # Pre-seed a position in the kernel's venue + kernel.venue._position = { + "trade_id": "BTCUSDT", + "asset": "BTCUSDT", + "side": "SHORT", + "entry_price": 100.0, + "size": 1.5, + "leverage": 1.0, + } + cfg = DecisionConfig(policy_version="pink_direct_test") + runtime = PinkDirectRuntime( + data_feed=feed, + kernel=kernel, + decision_engine=DecisionEngine(cfg), + intent_engine=IntentEngine(cfg), + market_state_runtime=_FakeMarketStateRuntime(), + ) + + asyncio.run(runtime.connect(initial_capital=25000.0)) + slot = kernel.slot(0) + assert slot.is_open(), f"Expected open slot after recovery, got {slot.fsm_state}" + assert slot.to_dict().get("size", 0) == 1.5, \ + f"Expected size 1.5, got {slot.to_dict().get('size')}"