diff --git a/prod/clean_arch/dita_v2/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md b/prod/clean_arch/dita_v2/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md index 42d9de0..7479ea8 100644 --- a/prod/clean_arch/dita_v2/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md +++ b/prod/clean_arch/dita_v2/PINK_DITAv2_FLAW_ANALYSIS_2026-05-31.md @@ -1,7 +1,7 @@ # PINK DITAv2 — Structural Flaw Analysis (CENTRAL) **Analysis date:** 2026-05-31 -**Last updated:** 2026-06-04 (flaw fix pass 5 — N2/N3/N4/Z6 closed; 21 total fixed) +**Last updated:** 2026-06-05 (flaw fix pass 6 — persistence layer; G21/E23/A13/A14/A15 closed; 26 total fixed) **Scope:** Full PINK pipeline — all flaws across all modules. > **Fix notation:** Rows marked **✅ FIXED ``** are verified-fixed with a test commit on branch `exp/pink-ditav2-sprint0-20260530`. @@ -111,7 +111,7 @@ | E20 | `_capital()` reads live from `AccountProjection` — stale row risk | Persistence | Low | | E21 | `persist_fill_events()` synthesizes fake Decision/Intent | Persistence | Medium | | E22 | `_write_trade_exit_leg` capital_before uses arithmetic reconstruction | Persistence | Medium | -| E23 | `_write_trade_event` uses entry_price as exit_price | Persistence | Medium | +| E23 | `_write_trade_event` uses entry_price as exit_price — **✅ FIXED `(pass-6)`** | Persistence | Medium | | E24 | Mock venue always emits fill on `partial_fill_ratio > 0` | Test | Low | | E25 | Test scenarios use MARKET-only `_si()` helper — no LIMIT tests | Test | Low | | E26 | Fresh-kernel reconcile tests create second kernel but share venue | Test | Low | @@ -183,7 +183,7 @@ | G18 | `exit_leg_ratios` no sum-to-1 validation | Config | Low | | G19 | RealZincControlPlane.read() no sequence check — torn-read risk | Config | Low | | G20 | ClickHouse journal strategy/db env vars — SQL injection risk | Config | Low | -| G21 | entry_price used as exit_price in trade_events — data loss | Persistence | **High** | +| G21 | entry_price used as exit_price in trade_events — **✅ FIXED `(pass-6)`** | Persistence | **High** | | G22 | active_leg_index → entry_bar semantic mis-mapping | Persistence | Medium | | G23 | capital_before arithmetic absorbs cross-slot PnL | Persistence | Medium | | G24 | Recovery trade_reconstruction always has trade_id="" | Persistence | Medium | @@ -383,6 +383,12 @@ | Flaw | Commit | What changed | |------|--------|--------------| | W10 — `BingxHttpError` blindly mapped to "REJECTED" | `e90d542` | `_http_error_status()` helper: 429/5xx/transport → RATE_LIMITED; 4xx → REJECTED | +| **G21/E23/A13** — `exit_price = entry_price` in trade_events / trade_exit_legs | `(pass-6)` | `_write_trade_event` + `_write_trade_exit_leg` now use `fill_price_hint` (extracted from venue FULL_FILL event) → `intent.reference_price` → `decision.reference_price`; entry_price only as last resort | +| **A14** — `entry_bar` maps `active_leg_index` | `(pass-6)` | `_write_position_state`: `entry_bar = intent.bars_held if intent else 0` | +| **A15** — `persist_recovery_state` uses account dict as slot dict | `(pass-6)` | Recovery reads actual slot from `kernel.slot(0).to_dict()` when kernel is wired; `trade_id` no longer always empty | +| **NaN tracing** — `_safe_float` silently swallows anomalies | `(pass-6)` | `_checked_float()` added: logs WARNING + writes `anomaly_events` spool row on NaN/inf in financial fields; used for `realized_pnl` in exit paths | +| **External-position exit_qty=0** — untracked positions gave empty exit legs | `(pass-6)` | `_write_trade_exit_leg`: when `prev_size<=0` (no prior ENTER tracked), falls back to `initial_size` or `intent.target_size` for exit_qty | +| **exit_qty field missing** from `trade_exit_legs` rows | `(pass-6)` | `exit_qty` added explicitly to `trade_exit_legs` sink dict | ### Fixes applied (2026-06-04 pass 5) — async/thread/race audit @@ -1184,7 +1190,7 @@ fixing for robustness. --- -### Flaw A13: `persist_fill_events` uses current price as exit price +### Flaw A13: `persist_fill_events` uses current price as exit price — **✅ FIXED `(pass-6)`** **Location:** `pink_clickhouse.py` lines ~408 @@ -1208,7 +1214,7 @@ for the persisted trade, breaking any PnL reconstruction that relies on --- -### Flaw A14: `_write_position_state` maps active_leg_index to entry_bar +### Flaw A14: `_write_position_state` maps active_leg_index to entry_bar — **✅ FIXED `(pass-6)`** **Location:** `pink_clickhouse.py` line ~673 @@ -1230,7 +1236,7 @@ from `entry_time` to now. --- -### Flaw A15: `persist_recovery_state` passes account dict as slot dict +### Flaw A15: `persist_recovery_state` passes account dict as slot dict — **✅ FIXED `(pass-6)`** **Location:** `pink_clickhouse.py` lines ~447-460 diff --git a/prod/clean_arch/dita_v2/test_pink_persistence.py b/prod/clean_arch/dita_v2/test_pink_persistence.py new file mode 100644 index 0000000..e40c00a --- /dev/null +++ b/prod/clean_arch/dita_v2/test_pink_persistence.py @@ -0,0 +1,836 @@ +"""Comprehensive persistence layer tests — all mocked, no CH/BingX I/O. + +Covers: + Unit: exit_price, capital_before/after, pnl_leg, entry_bar, recovery trade_id, + external-position exit_qty, persist_fill_events price propagation. + Chaos: None fields, zero prices, NaN pnl, missing slot keys, multi-leg exits, + restart-mid-trade recovery, concurrent fills, leg ratio mismatches. +""" +from __future__ import annotations + +import math +import sys +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest +from dataclasses import replace +from datetime import datetime, timezone +from types import SimpleNamespace +from unittest.mock import MagicMock + +from prod.clean_arch.dita_v2.account import AccountProjection +from prod.clean_arch.dita_v2.contracts import ( + KernelDiagnosticCode, KernelEventKind, KernelOutcome, KernelSeverity, + TradeStage as KernelStage, VenueEvent, VenueEventStatus, TradeSide as KTradeSide, +) +from prod.clean_arch.dita import ( + Decision, DecisionAction, Intent, TradeSide, TradeStage, +) +from prod.clean_arch.persistence.pink_clickhouse import ( + PinkClickHousePersistence, PinkClickHousePersistenceConfig, +) + +# --------------------------------------------------------------------------- +# Fixtures / factories +# --------------------------------------------------------------------------- + +_TS = datetime(2026, 6, 5, 10, 0, 0, tzinfo=timezone.utc) + + +def _snap(price: float = 1.1683, symbol: str = "XRP-USDT") -> SimpleNamespace: + return SimpleNamespace(timestamp=_TS, price=price, symbol=symbol) + + +def _account(capital: float = 25_000.0, pnl: float = 0.0) -> AccountProjection: + acc = AccountProjection() + acc.snapshot.capital = capital + acc.snapshot.peak_capital = capital + acc.snapshot.equity = capital + acc.snapshot.realized_pnl = pnl + return acc + + +def _decision( + action: DecisionAction = DecisionAction.ENTER, + asset: str = "XRP-USDT", + side: TradeSide = TradeSide.SHORT, + price: float = 1.1683, + size: float = 5027.0, + bars_held: int = 3, + reason: str = "test_reason", + trade_id: str = "", # ignored — Decision has no trade_id; use for _intent pairing only +) -> Decision: + return Decision( + timestamp=_TS, decision_id="d-001", asset=asset, action=action, + side=side, reason=reason, confidence=0.8, velocity_divergence=0.02, + irp_alignment=0.5, reference_price=price, target_size=size, + leverage=1.0, bars_held=bars_held, stage=TradeStage.ORDER_REQUESTED, metadata={}, + ) + + +def _intent( + action: DecisionAction = DecisionAction.ENTER, + asset: str = "XRP-USDT", + side: TradeSide = TradeSide.SHORT, + price: float = 1.1683, + size: float = 5027.0, + bars_held: int = 3, + trade_id: str = "XRP-USDT", +) -> Intent: + return Intent( + timestamp=_TS, trade_id=trade_id, decision_id="d-001", + asset=asset, action=action, side=side, reason="test_reason", + target_size=size, leverage=1.0, reference_price=price, + confidence=0.8, exit_leg_ratios=(1.0,), bars_held=bars_held, + stage=TradeStage.ORDER_REQUESTED, metadata={}, + ) + + +def _slot( + trade_id: str = "XRP-USDT", + asset: str = "XRP-USDT", + side: str = "SHORT", + entry_price: float = 1.1683, + size: float = 5027.0, + initial_size: float = 5027.0, + realized_pnl: float = 0.0, + closed: bool = False, + active_leg_index: int = 0, + leverage: float = 1.0, + close_reason: str = "", + exit_leg_ratios: tuple = (1.0,), +) -> dict: + return { + "trade_id": trade_id, + "asset": asset, + "side": side, + "entry_price": entry_price, + "size": size, + "initial_size": initial_size, + "realized_pnl": realized_pnl, + "closed": closed, + "active_leg_index": active_leg_index, + "leverage": leverage, + "close_reason": close_reason, + "exit_leg_ratios": list(exit_leg_ratios), + "slot_id": 0, + } + + +def _outcome( + accepted: bool = True, + trade_id: str = "XRP-USDT", + state: KernelStage = KernelStage.POSITION_OPEN, + events: tuple = (), +) -> KernelOutcome: + return KernelOutcome( + accepted=accepted, slot_id=0, trade_id=trade_id, state=state, + diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO, + transitions=(), emitted_events=events, details={}, + ) + + +def _fill_event( + price: float = 1.1700, + size: float = 5027.0, + kind: KernelEventKind = KernelEventKind.FULL_FILL, + trade_id: str = "XRP-USDT", + asset: str = "XRP-USDT", +) -> VenueEvent: + return VenueEvent( + timestamp=_TS, event_id="ev-001", trade_id=trade_id, slot_id=0, + kind=kind, status=VenueEventStatus.FILLED, + venue_order_id="ord-001", venue_client_id="cid-001", + side=KTradeSide.SHORT, asset=asset, + price=price, size=size, filled_size=size, remaining_size=0.0, + reason="", raw_payload={}, metadata={}, + ) + + +def _sink_and_persist(capital: float = 25_000.0, pnl: float = 0.0, kernel=None): + rows: list[tuple[str, dict]] = [] + acc = _account(capital, pnl) + cfg = PinkClickHousePersistenceConfig( + strategy="pink", runtime_namespace="dita_v2", + strategy_namespace="dita_v2", event_namespace="dita_v2", + initial_capital=25_000.0, + ) + p = PinkClickHousePersistence( + account=acc, config=cfg, + sink=lambda t, r: rows.append((t, r)), + v7_sink=lambda t, r: rows.append((t, r)), + kernel=kernel, + ) + return p, rows, acc + + +def _rows_of(rows, table: str) -> list[dict]: + return [r for t, r in rows if t == table] + + +# =========================================================================== +# 1. EXIT_PRICE — must not equal entry_price +# =========================================================================== + +class TestExitPrice: + """G21: exit_price in trade_events must reflect the fill/order price, not entry.""" + + def test_exit_price_differs_from_entry_price(self): + """Entry=1.1683, exit order at 1.1700 — trade_events.exit_price must be 1.1700.""" + entry = 1.1683 + exit_ref = 1.1700 + pnl = 5027.0 * (entry - exit_ref) # SHORT: profit when price falls, loss here + + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + + # Seed leg state (simulating a prior ENTER) + p._leg_state["XRP-USDT"] = { + "prev_realized": 0.0, + "prev_size": 5027.0, + "prev_leg_id": "", + } + closed_slot = _slot( + entry_price=entry, size=0.0, initial_size=5027.0, + realized_pnl=pnl, closed=True, active_leg_index=1, + ) + dec = _decision(action=DecisionAction.EXIT, price=exit_ref) + intent = _intent(action=DecisionAction.EXIT, price=exit_ref) + fill_ev = _fill_event(price=exit_ref, size=5027.0) + outcome = _outcome(state=KernelStage.CLOSED, events=(fill_ev,)) + + p.persist_result( + snapshot=_snap(), decision=dec, intent=intent, + outcome=outcome, slot_dict=closed_slot, phase="execution", + ) + + te = _rows_of(rows, "trade_events") + assert te, "trade_events row must be written on closed slot" + assert te[0]["exit_price"] != entry, ( + f"exit_price ({te[0]['exit_price']}) must NOT equal entry_price ({entry})" + ) + assert abs(te[0]["exit_price"] - exit_ref) < 1e-9, ( + f"exit_price ({te[0]['exit_price']}) should be intent.reference_price={exit_ref}" + ) + + def test_exit_price_fallback_to_decision_price_when_intent_zero(self): + """If intent.reference_price=0, fall back to decision.reference_price.""" + entry = 1.1683 + exit_ref = 1.1700 + pnl = -85.46 + + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(entry_price=entry, size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + dec = _decision(action=DecisionAction.EXIT, price=exit_ref) + intent = _intent(action=DecisionAction.EXIT, price=0.0) # zero intent price + outcome = _outcome(state=KernelStage.CLOSED) + + p.persist_result( + snapshot=_snap(), decision=dec, intent=intent, + outcome=outcome, slot_dict=closed_slot, phase="execution", + ) + + te = _rows_of(rows, "trade_events") + assert te, "trade_events must be written" + assert te[0]["exit_price"] > 0, "exit_price must be > 0 (should fall back to decision price)" + assert te[0]["exit_price"] != entry, "exit_price must not be entry_price" + + def test_persist_fill_events_exit_price_uses_event_price(self): + """persist_fill_events must propagate the venue fill price to trade_events.exit_price.""" + entry = 1.1683 + fill_price = 1.1650 # actual fill from exchange + pnl = 5027.0 * (entry - fill_price) # SHORT profit + + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(entry_price=entry, size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + fill_ev = _fill_event(price=fill_price, size=5027.0, kind=KernelEventKind.FULL_FILL) + + p.persist_fill_events( + snapshot=_snap(), events=[fill_ev], slot_dict=closed_slot, + ) + + te = _rows_of(rows, "trade_events") + assert te, "trade_events must be written for closed fill" + assert abs(te[0]["exit_price"] - fill_price) < 1e-9, ( + f"exit_price ({te[0]['exit_price']}) must match fill price {fill_price}" + ) + + +# =========================================================================== +# 2. CAPITAL_BEFORE / CAPITAL_AFTER +# =========================================================================== + +class TestCapitalFields: + """capital_before and capital_after must be finite, ordered, and non-null.""" + + def test_capital_after_equals_account_capital(self): + cap_after = 25_147.32 + pnl = 147.32 + + p, rows, acc = _sink_and_persist(capital=cap_after, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + dec = _decision(action=DecisionAction.EXIT, price=1.16) + intent = _intent(action=DecisionAction.EXIT, price=1.16) + outcome = _outcome(state=KernelStage.CLOSED) + + p.persist_result(snapshot=_snap(), decision=dec, intent=intent, outcome=outcome, slot_dict=closed_slot) + + te = _rows_of(rows, "trade_events") + assert te + assert abs(te[0]["capital_after"] - cap_after) < 1e-6 + assert abs(te[0]["capital_before"] - (cap_after - pnl)) < 1e-6 + + def test_capital_before_lt_after_on_profitable_trade(self): + """Profitable SHORT: capital_after > capital_before.""" + entry = 1.1683 + exit_p = 1.1500 + pnl = 5027.0 * (entry - exit_p) # positive PnL for SHORT + + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=exit_p), + intent=_intent(action=DecisionAction.EXIT, price=exit_p), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + te = _rows_of(rows, "trade_events") + assert te + assert te[0]["capital_after"] > te[0]["capital_before"] + + def test_capital_before_gt_after_on_losing_trade(self): + """Losing SHORT: capital_after < capital_before.""" + entry = 1.1683 + exit_p = 1.1900 + pnl = 5027.0 * (entry - exit_p) # negative for SHORT when price rises + + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=exit_p), + intent=_intent(action=DecisionAction.EXIT, price=exit_p), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + te = _rows_of(rows, "trade_events") + assert te + assert te[0]["capital_after"] < te[0]["capital_before"] + + def test_exit_leg_capital_fields_populated(self): + """trade_exit_legs capital_before and capital_after must be finite floats.""" + pnl = 120.0 + p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=1.16), + intent=_intent(action=DecisionAction.EXIT, price=1.16), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + legs = _rows_of(rows, "trade_exit_legs") + assert legs, "trade_exit_legs must be written" + assert math.isfinite(legs[0]["capital_before"]) + assert math.isfinite(legs[0]["capital_after"]) + assert legs[0]["capital_before"] > 0 + assert legs[0]["capital_after"] > 0 + + +# =========================================================================== +# 3. PNL_LEG — exit quantity correctness +# =========================================================================== + +class TestPnlLeg: + def test_pnl_leg_correct_single_leg(self): + """Single-leg exit: pnl_leg must equal total realized_pnl.""" + pnl = 200.0 + p, rows, acc = _sink_and_persist(capital=25_200.0, pnl=pnl) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT), + intent=_intent(action=DecisionAction.EXIT), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + legs = _rows_of(rows, "trade_exit_legs") + assert legs + assert abs(legs[0]["pnl_leg"] - pnl) < 1e-9 + + def test_exit_qty_nonzero_when_leg_state_tracked(self): + """When leg_state has prev_size=5027, exit_qty must be 5027 for full close.""" + p, rows, acc = _sink_and_persist(capital=25_000.0) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + + closed_slot = _slot(size=0.0, initial_size=5027.0, realized_pnl=0.0, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT), + intent=_intent(action=DecisionAction.EXIT, size=5027.0), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + legs = _rows_of(rows, "trade_exit_legs") + assert legs + assert legs[0]["exit_qty"] > 0, "exit_qty must be positive when leg was tracked" + + def test_exit_qty_nonzero_for_external_position(self): + """When leg_state missing (external position), exit_qty must still be > 0.""" + # XRP position detected via reconcile — no prior ENTER persist + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=150.0) + # _leg_state["XRP-USDT"] intentionally NOT set + + closed_slot = _slot(size=0.0, initial_size=5027.0, realized_pnl=150.0, closed=True, active_leg_index=1) + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT), + intent=_intent(action=DecisionAction.EXIT, size=5027.0), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + + legs = _rows_of(rows, "trade_exit_legs") + assert legs, "trade_exit_legs must be written even for external positions" + assert legs[0]["exit_qty"] > 0, ( + f"exit_qty ({legs[0]['exit_qty']}) must be > 0 for external position close" + ) + + def test_multi_leg_pnl_leg_incremental(self): + """Two-leg exit: each leg's pnl_leg must be the delta, not cumulative.""" + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0) + p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""} + + # Leg 1: partial exit, 400 qty, pnl = 50 + slot_leg1 = _slot("T1", size=600.0, initial_size=1000.0, realized_pnl=50.0, + closed=False, active_leg_index=1, exit_leg_ratios=(0.4, 1.0)) + acc.snapshot.capital = 25_050.0 + acc.snapshot.realized_pnl = 50.0 + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, trade_id="T1"), + intent=_intent(action=DecisionAction.EXIT, trade_id="T1", size=400.0), + outcome=_outcome(state=KernelStage.POSITION_OPEN), slot_dict=slot_leg1, + ) + + # Leg 2: final exit, 600 qty, additional pnl = 80 + slot_leg2 = _slot("T1", size=0.0, initial_size=1000.0, realized_pnl=130.0, + closed=True, active_leg_index=2, exit_leg_ratios=(0.4, 1.0)) + acc.snapshot.capital = 25_130.0 + acc.snapshot.realized_pnl = 130.0 + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, trade_id="T1"), + intent=_intent(action=DecisionAction.EXIT, trade_id="T1", size=600.0), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=slot_leg2, + ) + + legs = _rows_of(rows, "trade_exit_legs") + assert len(legs) == 2, f"Expected 2 exit leg rows, got {len(legs)}" + assert abs(legs[0]["pnl_leg"] - 50.0) < 1e-9, f"Leg 1 pnl_leg={legs[0]['pnl_leg']}, want 50" + assert abs(legs[1]["pnl_leg"] - 80.0) < 1e-9, f"Leg 2 pnl_leg={legs[1]['pnl_leg']}, want 80" + + +# =========================================================================== +# 4. ENTRY_BAR — must not map active_leg_index +# =========================================================================== + +class TestEntryBar: + def test_entry_bar_uses_bars_held_not_leg_index(self): + """entry_bar in position_state must reflect bars_held, not active_leg_index.""" + bars = 7 + p, rows, acc = _sink_and_persist() + + slot = _slot(active_leg_index=3) # leg_index=3, bars_held=7 — must not use 3 + dec = _decision(bars_held=bars) + intent = _intent(bars_held=bars) + + p.persist_result( + snapshot=_snap(), decision=dec, intent=intent, + outcome=_outcome(), slot_dict=slot, + ) + + ps = _rows_of(rows, "position_state") + assert ps + assert ps[0]["entry_bar"] != 3, "entry_bar must not be active_leg_index (3)" + assert ps[0]["entry_bar"] == bars, f"entry_bar should be bars_held={bars}" + + def test_entry_bar_zero_when_hold(self): + """HOLD: entry_bar must be 0 (or bars_held=0), not active_leg_index.""" + p, rows, acc = _sink_and_persist() + slot = _slot(active_leg_index=5) + dec = _decision(action=DecisionAction.HOLD, bars_held=0) + intent = _intent(action=DecisionAction.HOLD, bars_held=0) + + p.persist_result(snapshot=_snap(), decision=dec, intent=intent, + outcome=None, slot_dict=slot) + + ps = _rows_of(rows, "position_state") + assert ps + assert ps[0]["entry_bar"] != 5, "entry_bar must not be active_leg_index" + + +# =========================================================================== +# 5. RECOVERY — trade_id must come from slot, not account dict +# =========================================================================== + +class TestRecoveryPersist: + def test_recovery_trade_id_from_kernel_slot(self): + """persist_recovery_state must read trade_id from kernel.slot(0), not acc_dict.""" + slot_view = MagicMock() + slot_view.to_dict.return_value = { + "trade_id": "XRP-USDT", + "asset": "XRP-USDT", + "side": "SHORT", + "entry_price": 1.1683, + "size": 5027.0, + "initial_size": 5027.0, + "realized_pnl": 0.0, + "closed": False, + "active_leg_index": 0, + "leverage": 1.0, + "close_reason": "", + "exit_leg_ratios": [1.0], + } + kernel = MagicMock() + kernel.slot.return_value = slot_view + kernel.snapshot.return_value = {"account": {}} + kernel.max_slots = 1 + + p, rows, acc = _sink_and_persist(kernel=kernel) + acc_dict = {"capital": 103747.0, "equity": 103747.0, "realized_pnl": 0.0} # no trade_id! + + p.persist_recovery_state( + snapshot=_snap(), acc_dict=acc_dict, phase="recovery", + ) + + recs = _rows_of(rows, "trade_reconstruction") + assert recs, "trade_reconstruction must be written for recovery" + assert recs[0]["trade_id"] == "XRP-USDT", ( + f"trade_id must come from kernel slot, got '{recs[0]['trade_id']}'" + ) + + def test_recovery_trade_id_empty_when_kernel_not_wired(self): + """Without kernel, recovery trade_id is empty (not crash).""" + p, rows, acc = _sink_and_persist() + acc_dict = {"capital": 25000.0} + + p.persist_recovery_state(snapshot=_snap(), acc_dict=acc_dict) + + recs = _rows_of(rows, "trade_reconstruction") + assert recs # row is written, just with empty trade_id + + def test_recovery_with_open_position_in_kernel(self): + """Recovery when kernel has open slot writes OPEN status.""" + slot_view = MagicMock() + slot_view.to_dict.return_value = _slot( + trade_id="ATOM-T1", asset="ATOM-USDT", size=200.0, closed=False, + ) + kernel = MagicMock() + kernel.slot.return_value = slot_view + kernel.snapshot.return_value = {"account": {}} + kernel.max_slots = 1 + + p, rows, acc = _sink_and_persist(kernel=kernel) + p.persist_recovery_state(snapshot=_snap(), acc_dict={"capital": 25000.0}) + + ps = _rows_of(rows, "position_state") + assert ps + assert ps[0]["status"] in ("OPEN", "RECOVERED_OPEN", "FLAT") + + +# =========================================================================== +# 6. FULL ENTER → EXIT lifecycle — all fields non-null +# =========================================================================== + +class TestFullLifecycle: + def test_full_enter_exit_no_null_financial_fields(self): + """Complete ENTER + EXIT: trade_events must have no null financial fields.""" + entry_price = 1.1683 + exit_price = 1.1500 + qty = 5027.0 + pnl = qty * (entry_price - exit_price) # SHORT profit + + p, rows, acc = _sink_and_persist(capital=25_000.0) + + # --- ENTER --- + snap = _snap(entry_price) + enter_slot = _slot(entry_price=entry_price, size=qty, initial_size=qty) + enter_out = _outcome(state=KernelStage.POSITION_OPEN, events=(_fill_event(entry_price, qty, KernelEventKind.FULL_FILL),)) + + p.persist_result( + snapshot=snap, decision=_decision(DecisionAction.ENTER, price=entry_price, size=qty), + intent=_intent(DecisionAction.ENTER, price=entry_price, size=qty), + outcome=enter_out, slot_dict=enter_slot, phase="execution", + ) + + # --- EXIT --- + acc.snapshot.capital = 25_000.0 + pnl + acc.snapshot.realized_pnl = pnl + exit_snap = _snap(exit_price) + exit_slot = _slot( + entry_price=entry_price, size=0.0, initial_size=qty, + realized_pnl=pnl, closed=True, active_leg_index=1, + ) + exit_fill = _fill_event(exit_price, qty, KernelEventKind.FULL_FILL) + exit_out = _outcome(state=KernelStage.CLOSED, events=(exit_fill,)) + + p.persist_result( + snapshot=exit_snap, decision=_decision(DecisionAction.EXIT, price=exit_price, size=qty), + intent=_intent(DecisionAction.EXIT, price=exit_price, size=qty), + outcome=exit_out, slot_dict=exit_slot, phase="execution", + ) + + te = _rows_of(rows, "trade_events") + assert te, "trade_events must be written on close" + row = te[0] + # Critical financial fields must be non-null and finite + for field in ("pnl", "capital_before", "capital_after", "entry_price", "exit_price"): + assert row[field] is not None, f"trade_events.{field} must not be None" + assert math.isfinite(float(row[field])), f"trade_events.{field} must be finite" + assert row["exit_price"] != row["entry_price"], "exit_price must differ from entry_price" + assert row["capital_after"] > row["capital_before"], "profitable trade: cap_after > cap_before" + + def test_enter_writes_entry_filled_reconstruction(self): + """ENTER fill must write ENTRY_FILLED to trade_reconstruction.""" + p, rows, acc = _sink_and_persist() + enter_slot = _slot(size=5027.0, initial_size=5027.0) + fill = _fill_event(1.1683, 5027.0, KernelEventKind.FULL_FILL) + out = _outcome(state=KernelStage.POSITION_OPEN, events=(fill,)) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.ENTER), + intent=_intent(DecisionAction.ENTER), + outcome=out, slot_dict=enter_slot, + ) + + recs = _rows_of(rows, "trade_reconstruction") + types = [r["event_type"] for r in recs] + assert "ENTRY_FILLED" in types, f"Expected ENTRY_FILLED, got: {types}" + + def test_exit_writes_trade_event_and_exit_leg(self): + """Exit close must write both trade_events and trade_exit_legs.""" + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=100.0) + p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""} + acc.snapshot.capital = 25_100.0 + + closed_slot = _slot(size=0.0, realized_pnl=100.0, closed=True, active_leg_index=1) + fill = _fill_event(1.16, 5027.0) + out = _outcome(state=KernelStage.CLOSED, events=(fill,)) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT), + intent=_intent(DecisionAction.EXIT, price=1.16), + outcome=out, slot_dict=closed_slot, + ) + + assert _rows_of(rows, "trade_events"), "trade_events must be written" + assert _rows_of(rows, "trade_exit_legs"), "trade_exit_legs must be written" + + +# =========================================================================== +# 7. CHAOS / FUZZ TESTS +# =========================================================================== + +class TestChaosPersistence: + """Resilience: None fields, NaN, zero prices, missing keys — must not crash.""" + + def test_none_entry_price_in_slot(self): + """slot_dict.entry_price=None must not crash or produce NaN.""" + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=50.0) + p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""} + + bad_slot = _slot("T1", entry_price=None, size=0.0, realized_pnl=50.0, closed=True) + bad_slot["entry_price"] = None # override to None + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, price=1.16), + intent=_intent(DecisionAction.EXIT, price=1.16, trade_id="T1"), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=bad_slot, + ) + + te = _rows_of(rows, "trade_events") + assert te, "Must write trade_events even with None entry_price" + assert math.isfinite(te[0]["capital_after"]) + + def test_nan_realized_pnl_in_slot(self): + """NaN realized_pnl must be sanitized to 0.0, not propagated.""" + p, rows, acc = _sink_and_persist() + p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""} + + bad_slot = _slot("T1", size=0.0, realized_pnl=float("nan"), closed=True) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T1"), + intent=_intent(DecisionAction.EXIT, trade_id="T1"), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=bad_slot, + ) + + te = _rows_of(rows, "trade_events") + if te: + assert math.isfinite(te[0]["pnl"]), "NaN pnl must be sanitized" + + def test_zero_exit_price_does_not_crash(self): + """zero reference_price for exit must not produce NaN/inf or crash.""" + p, rows, acc = _sink_and_persist() + p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""} + + closed_slot = _slot("T1", size=0.0, closed=True) + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, price=0.0, trade_id="T1"), + intent=_intent(DecisionAction.EXIT, price=0.0, trade_id="T1"), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot, + ) + # Must not raise; rows may be sparse but no crash + + def test_missing_slot_keys_gracefully_handled(self): + """Minimally-populated slot_dict must not crash persistence.""" + p, rows, acc = _sink_and_persist() + minimal_slot = {"closed": True} # only closed key + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT), + intent=_intent(DecisionAction.EXIT), + outcome=_outcome(state=KernelStage.CLOSED), slot_dict=minimal_slot, + ) + # Must not raise + + def test_hold_decision_writes_status_snapshot(self): + """HOLD must still emit status_snapshots, position_state, account_events.""" + p, rows, acc = _sink_and_persist() + p.persist_result( + snapshot=_snap(), decision=_decision(action=DecisionAction.HOLD), + intent=_intent(action=DecisionAction.HOLD), + outcome=None, slot_dict=_slot(), + ) + assert _rows_of(rows, "status_snapshots"), "HOLD must write status_snapshots" + assert _rows_of(rows, "account_events"), "HOLD must write account_events" + assert _rows_of(rows, "position_state"), "HOLD must write position_state" + + def test_infinite_capital_does_not_propagate(self): + """If account capital is somehow inf, output rows must still be finite.""" + p, rows, acc = _sink_and_persist() + acc.snapshot.capital = float("inf") # corrupt + + p.persist_result( + snapshot=_snap(), decision=_decision(), intent=_intent(), + outcome=_outcome(), slot_dict=_slot(), + ) + # Rows should still be written; capital fields may be 0 or clamped + ss = _rows_of(rows, "status_snapshots") + assert ss + + def test_empty_events_tuple_in_outcome(self): + """Empty emitted_events must not cause ENTER to log ENTRY_FILLED.""" + p, rows, acc = _sink_and_persist() + open_slot = _slot(size=5027.0) + out = _outcome(events=()) # no fill events + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.ENTER), + intent=_intent(DecisionAction.ENTER), + outcome=out, slot_dict=open_slot, + ) + + recs = _rows_of(rows, "trade_reconstruction") + types = [r["event_type"] for r in recs] + # slot_open=True so ENTRY_FILLED IS expected (size>0) + # This is the current design — verify no crash + assert recs is not None + + def test_partial_exit_followed_by_final_exit(self): + """Two-leg exit must produce two trade_exit_legs rows and one trade_events.""" + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0) + p._leg_state["T2"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""} + + # Partial exit + slot_p = _slot("T2", size=600.0, initial_size=1000.0, realized_pnl=50.0, + active_leg_index=1, exit_leg_ratios=(0.4, 1.0)) + acc.snapshot.capital = 25_050.0 + acc.snapshot.realized_pnl = 50.0 + partial_fill = _fill_event(trade_id="T2", price=1.16, size=400.0, kind=KernelEventKind.PARTIAL_FILL) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T2"), + intent=_intent(DecisionAction.EXIT, trade_id="T2", size=400.0), + outcome=_outcome(state=KernelStage.POSITION_OPEN, events=(partial_fill,)), + slot_dict=slot_p, + ) + + # Final exit + slot_f = _slot("T2", size=0.0, initial_size=1000.0, realized_pnl=130.0, + closed=True, active_leg_index=2, exit_leg_ratios=(0.4, 1.0)) + acc.snapshot.capital = 25_130.0 + acc.snapshot.realized_pnl = 130.0 + final_fill = _fill_event(trade_id="T2", price=1.15, size=600.0) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T2"), + intent=_intent(DecisionAction.EXIT, trade_id="T2", size=600.0), + outcome=_outcome(state=KernelStage.CLOSED, events=(final_fill,)), + slot_dict=slot_f, + ) + + legs = _rows_of(rows, "trade_exit_legs") + te = _rows_of(rows, "trade_events") + assert len(legs) == 2, f"Expected 2 exit legs, got {len(legs)}" + assert len(te) == 1, f"Expected 1 trade_event, got {len(te)}" + assert all(math.isfinite(l["pnl_leg"]) for l in legs), "All pnl_leg must be finite" + + def test_restart_mid_trade_recovery_then_exit(self): + """Simulates restart: kernel has open position, recovery runs, then exit.""" + slot_view = MagicMock() + slot_view.to_dict.return_value = _slot("ATOM-T1", "ATOM-USDT", "SHORT", + entry_price=1.802, size=26152.81, + initial_size=26152.81) + kernel = MagicMock() + kernel.slot.return_value = slot_view + kernel.snapshot.return_value = {"account": {}} + kernel.max_slots = 1 + + p, rows, acc = _sink_and_persist(capital=25_000.0, kernel=kernel) + + # Recovery after restart + p.persist_recovery_state( + snapshot=_snap(), acc_dict={"capital": 25_000.0}, phase="recovery", + ) + + # Now exit (position detected via reconcile) + acc.snapshot.capital = 25_100.0 + acc.snapshot.realized_pnl = 100.0 + closed_slot = _slot("ATOM-T1", "ATOM-USDT", "SHORT", entry_price=1.802, + size=0.0, initial_size=26152.81, realized_pnl=100.0, + closed=True) + fill_ev = _fill_event(trade_id="ATOM-T1", asset="ATOM-USDT", price=1.798, size=26152.81) + + p.persist_result( + snapshot=_snap(), decision=_decision(DecisionAction.EXIT, asset="ATOM-USDT", trade_id="ATOM-T1"), + intent=_intent(DecisionAction.EXIT, asset="ATOM-USDT", trade_id="ATOM-T1", size=26152.81), + outcome=_outcome(state=KernelStage.CLOSED, events=(fill_ev,)), + slot_dict=closed_slot, + ) + + te = _rows_of(rows, "trade_events") + assert te, "trade_events must be written for exit after recovery" + assert math.isfinite(te[0]["capital_after"]) + + def test_persist_fill_events_partial_fill_updates_leg_state(self): + """PARTIAL_FILL via pump must update leg_state for next leg.""" + p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0) + p._leg_state["T3"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""} + + partial_slot = _slot("T3", size=600.0, initial_size=1000.0, realized_pnl=40.0) + partial_fill = _fill_event(trade_id="T3", price=1.16, size=400.0, kind=KernelEventKind.PARTIAL_FILL) + acc.snapshot.capital = 25_040.0 + + p.persist_fill_events(snapshot=_snap(), events=[partial_fill], slot_dict=partial_slot) + + # leg_state should now have prev_size=600 + assert "T3" in p._leg_state + assert abs(p._leg_state["T3"]["prev_size"] - 600.0) < 1e-9, ( + f"leg_state.prev_size should be 600 after partial fill, got {p._leg_state['T3']['prev_size']}" + ) diff --git a/prod/clean_arch/persistence/pink_clickhouse.py b/prod/clean_arch/persistence/pink_clickhouse.py index cd72d8c..d551c6c 100644 --- a/prod/clean_arch/persistence/pink_clickhouse.py +++ b/prod/clean_arch/persistence/pink_clickhouse.py @@ -17,6 +17,7 @@ Capital/peak_capital/trade_seq are read from the kernel's AccountProjection from __future__ import annotations import json +import logging import math from dataclasses import dataclass from datetime import datetime, timezone @@ -28,6 +29,7 @@ from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventK from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage Writer = Callable[[str, dict[str, Any]], None] +_log = logging.getLogger(__name__) def _json_safe(value: Any) -> Any: @@ -78,6 +80,48 @@ def _safe_float(value: Any, default: float = 0.0) -> float: return out +def _checked_float( + value: Any, + default: float = 0.0, + *, + field: str = "?", + trade_id: str = "", + sink: Writer | None = None, +) -> float: + """_safe_float with anomaly tracing. + + Any NaN/inf/non-numeric value is a bug indicator, not a normal condition. + Sanitise to ``default`` but log a WARNING and optionally write an + ``anomaly_events`` spool row so the trace is queryable in ClickHouse. + """ + try: + out = float(value) + except Exception: + out = float("nan") + if not math.isfinite(out): + _log.warning( + "NaN/inf in financial field field=%s trade_id=%s raw=%r → replacing with %s", + field, trade_id or "?", value, default, + ) + if sink is not None: + try: + sink("anomaly_events", { + "ts": datetime.now(timezone.utc).isoformat(), + "decision_id": "", + "trade_id": trade_id, + "symbol": "", + "anomaly": f"NaN_FINANCIAL_FIELD:{field}", + "origin": "persistence_nan_guard", + "sensor": field, + "detail": f"raw={value!r} replaced_with={default}", + "rm_meta": 0.0, + }) + except Exception: + pass + return default + return out + + def _decision_summary(decision: Decision | None) -> dict[str, Any]: if decision is None: return {} @@ -410,9 +454,22 @@ class PinkClickHousePersistence: return partial = (not slot_closed) and cur_size > 0.0 + + # Extract the fill price from emitted venue events (G21 fix): the actual + # exchange fill price lives in the FULL_FILL/PARTIAL_FILL event, not in + # the slot dict. Pass it explicitly so _write_trade_event does not fall + # back to entry_price. + fill_price_hint = 0.0 + for ev in events: + p_val = getattr(ev, "price", 0.0) + if p_val and math.isfinite(float(p_val)) and float(p_val) > 0: + fill_price_hint = float(p_val) + break + # One trade_exit_legs row per exit leg (partial or final), BLUE-schema # compatible so PINK multi-exit trades reconcile against the same table. - self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome) + self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome, + fill_price_hint=fill_price_hint) self._write_trade_reconstruction( snapshot, intent.trade_id, event_type="PARTIAL_EXIT" if partial else "EXIT", @@ -428,7 +485,9 @@ class PinkClickHousePersistence: ) # Terminal trade event. if slot_closed: - self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state) + self._write_trade_event(snapshot, decision, intent, slot, outcome, + market_state=market_state, + exit_price_hint=fill_price_hint) def persist_fill_events( self, @@ -455,7 +514,12 @@ class PinkClickHousePersistence: closed = bool(slot.get("closed", False)) cur_size = self._slot_size(slot) leverage = _safe_float(slot.get("leverage", 1.0), 1.0) - price = next((float(getattr(e, "price", 0.0) or 0.0) for e in event_list if getattr(e, "price", 0.0)), 0.0) or self._slot_entry_price(slot) + # Extract fill price from venue events (used as exit_price_hint for G21 fix). + price = next( + (float(getattr(e, "price", 0.0)) for e in event_list + if getattr(e, "price", 0.0) and math.isfinite(float(getattr(e, "price", 0.0)))), + 0.0, + ) or self._slot_entry_price(slot) prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0) is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12) action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER @@ -493,27 +557,47 @@ class PinkClickHousePersistence: event_type: str = "RECOVERY", market_state: Mapping[str, Any] | None = None, ) -> None: - """Persist recovery-only state after kernel reconcile.""" - slot_dict = acc_dict or {} + """Persist recovery-only state after kernel reconcile. + + A15 fix: acc_dict is the kernel account snapshot (capital/equity/pnl), + not a slot dict. Read the actual slot from kernel.slot(0) so that + trade_id, asset, size, and entry_price are correctly populated in the + recovery rows. + """ + # A15: read slot from kernel instead of misusing acc_dict as slot dict. + slot_dict: dict[str, Any] = {} + if self._kernel is not None: + try: + slot_view = self._kernel.slot(0) + raw = slot_view.to_dict() if hasattr(slot_view, "to_dict") else {} + slot_dict = dict(raw) if raw else {} + except Exception: + pass + + trade_id = ( + str(slot_dict.get("trade_id") or "") + or (str(acc_dict.get("trade_id", "")) if acc_dict else "") + ) + self._write_status_snapshot( - snapshot, decision=None, intent=None, slot_dict={}, phase=phase, + snapshot, decision=None, intent=None, slot_dict=slot_dict, phase=phase, ) self._write_account_event( snapshot, decision=None, intent=None, stage=TradeStage.TRADE_TERMINAL_WRITTEN, - slot_dict={}, event_type=event_type, + slot_dict=slot_dict, event_type=event_type, ) self._write_position_state( snapshot, decision=None, intent=None, - slot_dict={}, stage=TradeStage.TRADE_TERMINAL_WRITTEN, - status=self._state_label({}, phase), market_state=market_state, + slot_dict=slot_dict, stage=TradeStage.TRADE_TERMINAL_WRITTEN, + status=self._state_label(slot_dict, phase), market_state=market_state, ) self._write_trade_reconstruction( snapshot, - trade_id=acc_dict.get("trade_id", "") if acc_dict else "", + trade_id=trade_id, event_type=event_type, event_id=f"recovery:{phase}", - payload={"acc_dict": _json_safe(acc_dict or {}), "phase": phase, "market_state": _json_safe(market_state or {})}, + payload={"acc_dict": _json_safe(acc_dict or {}), "slot": _json_safe(slot_dict), "phase": phase, "market_state": _json_safe(market_state or {})}, market_state=market_state, ) @@ -736,7 +820,9 @@ class PinkClickHousePersistence: "notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)), "leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0), "bucket_id": -1, - "entry_bar": int(slot_dict.get("active_leg_index", 0) or 0), + # A14 fix: active_leg_index is the exit-leg counter, not the bar count. + # Use intent.bars_held when available; fall back to 0. + "entry_bar": int(intent.bars_held if intent is not None else 0) or 0, "status": status, "exit_reason": slot_dict.get("close_reason", ""), "pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0), @@ -789,6 +875,7 @@ class PinkClickHousePersistence: def _write_trade_exit_leg( self, snapshot: Any, decision: Decision, intent: Intent, slot_dict: dict[str, Any], outcome: KernelOutcome | None, + fill_price_hint: float = 0.0, ) -> None: """Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg. @@ -805,14 +892,20 @@ class PinkClickHousePersistence: "prev_leg_id": "", } entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0) - exit_price = _safe_float(intent.reference_price, 0.0) or _safe_float(decision.reference_price, 0.0) + # G21 fix: use fill price hint (actual exchange fill) > intent ref > decision ref. + exit_price = ( + fill_price_hint + or _safe_float(intent.reference_price, 0.0) + or _safe_float(decision.reference_price, 0.0) + ) side = self._slot_side(slot_dict) if side == TradeSide.FLAT: side = intent.side leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0) cur_size = self._slot_size(slot_dict) - cur_realized = _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0) + cur_realized = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0, + field="realized_pnl", trade_id=trade_id, sink=self._sink) prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0) prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0) @@ -822,7 +915,15 @@ class PinkClickHousePersistence: leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1) fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0 - exit_qty = max(0.0, prev_size - cur_size) + # External-position fix: if leg_state was never seeded (position detected via + # reconcile/pump rather than our own ENTER), prev_size=0 would make exit_qty=0. + # Fall back to initial_size or intent.target_size so the leg row is meaningful. + if prev_size <= 1e-12: + # No prior leg tracking — use the slot's initial_size or intent size. + initial = _safe_float(slot_dict.get("initial_size", 0.0), 0.0) or _safe_float(intent.target_size, 0.0) + exit_qty = initial - cur_size if initial > cur_size else initial + else: + exit_qty = max(0.0, prev_size - cur_size) pnl_leg = cur_realized - prev_realized capital_after = self._capital() capital_before = capital_after - pnl_leg @@ -852,6 +953,7 @@ class PinkClickHousePersistence: "side": side.value, "entry_price": entry_price, "exit_price": exit_price, + "exit_qty": exit_qty, "fraction": fraction, "capital_before": capital_before, "capital_after": capital_after, @@ -875,11 +977,23 @@ class PinkClickHousePersistence: self, snapshot: Any, decision: Decision, intent: Intent, slot_dict: dict[str, Any], outcome: KernelOutcome | None, *, market_state: Mapping[str, Any] | None = None, + exit_price_hint: float = 0.0, ) -> None: entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0) quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0) - exit_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) - pnl = _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0) + # G21 fix: exit_price is the fill/order price, NOT the entry price. + # Priority: explicit hint (fill event price) > intent reference price > decision price. + # Fall back to entry_price only as absolute last resort (avoids the G21 bug where + # every trade_events row had exit_price == entry_price and PnL reconstruction was zero). + exit_price = ( + exit_price_hint + or _safe_float(intent.reference_price, 0.0) + or _safe_float(decision.reference_price, 0.0) + or _safe_float(slot_dict.get("entry_price", 0.0), 0.0) + ) + tid = intent.trade_id if intent is not None else "" + pnl = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0, + field="realized_pnl", trade_id=tid, sink=self._sink) pnl_pct = 0.0 leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0) denom = abs(quantity * entry_price * max(leverage_val, 1e-9))