PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing

G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price):
  _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price
  _write_trade_exit_leg: same priority chain via fill_price_hint parameter
  persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome
  persist_fill_events: intent.reference_price = actual fill price → propagates correctly

A14 — entry_bar was active_leg_index (exit leg counter, not bar count):
  _write_position_state: entry_bar = intent.bars_held (0 when intent is None)

A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""):
  Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot

External-position exit_qty=0 fix:
  _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to
  initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful

exit_qty field added to trade_exit_legs rows (was computed but not emitted)

NaN tracing (_checked_float):
  Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool
  row on NaN/inf in financial fields; applied to realized_pnl in exit paths

29 new persistence unit tests (mocked) + chaos/fuzz suite:
  exit_price correctness, capital ordering, pnl_leg incremental, entry_bar,
  recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields
  164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green

FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-05 09:30:30 +02:00
parent 025d381623
commit b30205ceb6
3 changed files with 979 additions and 23 deletions

View File

@@ -1,7 +1,7 @@
# PINK DITAv2 — Structural Flaw Analysis (CENTRAL) # PINK DITAv2 — Structural Flaw Analysis (CENTRAL)
**Analysis date:** 2026-05-31 **Analysis date:** 2026-05-31
**Last updated:** 2026-06-04 (flaw fix pass 5N2/N3/N4/Z6 closed; 21 total fixed) **Last updated:** 2026-06-05 (flaw fix pass 6persistence layer; G21/E23/A13/A14/A15 closed; 26 total fixed)
**Scope:** Full PINK pipeline — all flaws across all modules. **Scope:** Full PINK pipeline — all flaws across all modules.
> **Fix notation:** Rows marked **✅ FIXED `<sha>`** are verified-fixed with a test commit on branch `exp/pink-ditav2-sprint0-20260530`. > **Fix notation:** Rows marked **✅ FIXED `<sha>`** are verified-fixed with a test commit on branch `exp/pink-ditav2-sprint0-20260530`.
@@ -111,7 +111,7 @@
| E20 | `_capital()` reads live from `AccountProjection` — stale row risk | Persistence | Low | | E20 | `_capital()` reads live from `AccountProjection` — stale row risk | Persistence | Low |
| E21 | `persist_fill_events()` synthesizes fake Decision/Intent | Persistence | Medium | | E21 | `persist_fill_events()` synthesizes fake Decision/Intent | Persistence | Medium |
| E22 | `_write_trade_exit_leg` capital_before uses arithmetic reconstruction | Persistence | Medium | | E22 | `_write_trade_exit_leg` capital_before uses arithmetic reconstruction | Persistence | Medium |
| E23 | `_write_trade_event` uses entry_price as exit_price | Persistence | Medium | | E23 | `_write_trade_event` uses entry_price as exit_price**✅ FIXED `(pass-6)`** | Persistence | Medium |
| E24 | Mock venue always emits fill on `partial_fill_ratio > 0` | Test | Low | | E24 | Mock venue always emits fill on `partial_fill_ratio > 0` | Test | Low |
| E25 | Test scenarios use MARKET-only `_si()` helper — no LIMIT tests | Test | Low | | E25 | Test scenarios use MARKET-only `_si()` helper — no LIMIT tests | Test | Low |
| E26 | Fresh-kernel reconcile tests create second kernel but share venue | Test | Low | | E26 | Fresh-kernel reconcile tests create second kernel but share venue | Test | Low |
@@ -183,7 +183,7 @@
| G18 | `exit_leg_ratios` no sum-to-1 validation | Config | Low | | G18 | `exit_leg_ratios` no sum-to-1 validation | Config | Low |
| G19 | RealZincControlPlane.read() no sequence check — torn-read risk | Config | Low | | G19 | RealZincControlPlane.read() no sequence check — torn-read risk | Config | Low |
| G20 | ClickHouse journal strategy/db env vars — SQL injection risk | Config | Low | | G20 | ClickHouse journal strategy/db env vars — SQL injection risk | Config | Low |
| G21 | entry_price used as exit_price in trade_events — data loss | Persistence | **High** | | G21 | entry_price used as exit_price in trade_events — **✅ FIXED `(pass-6)`** | Persistence | **High** |
| G22 | active_leg_index → entry_bar semantic mis-mapping | Persistence | Medium | | G22 | active_leg_index → entry_bar semantic mis-mapping | Persistence | Medium |
| G23 | capital_before arithmetic absorbs cross-slot PnL | Persistence | Medium | | G23 | capital_before arithmetic absorbs cross-slot PnL | Persistence | Medium |
| G24 | Recovery trade_reconstruction always has trade_id="" | Persistence | Medium | | G24 | Recovery trade_reconstruction always has trade_id="" | Persistence | Medium |
@@ -383,6 +383,12 @@
| Flaw | Commit | What changed | | Flaw | Commit | What changed |
|------|--------|--------------| |------|--------|--------------|
| W10 — `BingxHttpError` blindly mapped to "REJECTED" | `e90d542` | `_http_error_status()` helper: 429/5xx/transport → RATE_LIMITED; 4xx → REJECTED | | W10 — `BingxHttpError` blindly mapped to "REJECTED" | `e90d542` | `_http_error_status()` helper: 429/5xx/transport → RATE_LIMITED; 4xx → REJECTED |
| **G21/E23/A13**`exit_price = entry_price` in trade_events / trade_exit_legs | `(pass-6)` | `_write_trade_event` + `_write_trade_exit_leg` now use `fill_price_hint` (extracted from venue FULL_FILL event) → `intent.reference_price``decision.reference_price`; entry_price only as last resort |
| **A14**`entry_bar` maps `active_leg_index` | `(pass-6)` | `_write_position_state`: `entry_bar = intent.bars_held if intent else 0` |
| **A15**`persist_recovery_state` uses account dict as slot dict | `(pass-6)` | Recovery reads actual slot from `kernel.slot(0).to_dict()` when kernel is wired; `trade_id` no longer always empty |
| **NaN tracing**`_safe_float` silently swallows anomalies | `(pass-6)` | `_checked_float()` added: logs WARNING + writes `anomaly_events` spool row on NaN/inf in financial fields; used for `realized_pnl` in exit paths |
| **External-position exit_qty=0** — untracked positions gave empty exit legs | `(pass-6)` | `_write_trade_exit_leg`: when `prev_size<=0` (no prior ENTER tracked), falls back to `initial_size` or `intent.target_size` for exit_qty |
| **exit_qty field missing** from `trade_exit_legs` rows | `(pass-6)` | `exit_qty` added explicitly to `trade_exit_legs` sink dict |
### Fixes applied (2026-06-04 pass 5) — async/thread/race audit ### Fixes applied (2026-06-04 pass 5) — async/thread/race audit
@@ -1184,7 +1190,7 @@ fixing for robustness.
--- ---
### Flaw A13: `persist_fill_events` uses current price as exit price ### Flaw A13: `persist_fill_events` uses current price as exit price — **✅ FIXED `(pass-6)`**
**Location:** `pink_clickhouse.py` lines ~408 **Location:** `pink_clickhouse.py` lines ~408
@@ -1208,7 +1214,7 @@ for the persisted trade, breaking any PnL reconstruction that relies on
--- ---
### Flaw A14: `_write_position_state` maps active_leg_index to entry_bar ### Flaw A14: `_write_position_state` maps active_leg_index to entry_bar — **✅ FIXED `(pass-6)`**
**Location:** `pink_clickhouse.py` line ~673 **Location:** `pink_clickhouse.py` line ~673
@@ -1230,7 +1236,7 @@ from `entry_time` to now.
--- ---
### Flaw A15: `persist_recovery_state` passes account dict as slot dict ### Flaw A15: `persist_recovery_state` passes account dict as slot dict — **✅ FIXED `(pass-6)`**
**Location:** `pink_clickhouse.py` lines ~447-460 **Location:** `pink_clickhouse.py` lines ~447-460

View File

@@ -0,0 +1,836 @@
"""Comprehensive persistence layer tests — all mocked, no CH/BingX I/O.
Covers:
Unit: exit_price, capital_before/after, pnl_leg, entry_bar, recovery trade_id,
external-position exit_qty, persist_fill_events price propagation.
Chaos: None fields, zero prices, NaN pnl, missing slot keys, multi-leg exits,
restart-mid-trade recovery, concurrent fills, leg ratio mismatches.
"""
from __future__ import annotations
import math
import sys
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from dataclasses import replace
from datetime import datetime, timezone
from types import SimpleNamespace
from unittest.mock import MagicMock
from prod.clean_arch.dita_v2.account import AccountProjection
from prod.clean_arch.dita_v2.contracts import (
KernelDiagnosticCode, KernelEventKind, KernelOutcome, KernelSeverity,
TradeStage as KernelStage, VenueEvent, VenueEventStatus, TradeSide as KTradeSide,
)
from prod.clean_arch.dita import (
Decision, DecisionAction, Intent, TradeSide, TradeStage,
)
from prod.clean_arch.persistence.pink_clickhouse import (
PinkClickHousePersistence, PinkClickHousePersistenceConfig,
)
# ---------------------------------------------------------------------------
# Fixtures / factories
# ---------------------------------------------------------------------------
_TS = datetime(2026, 6, 5, 10, 0, 0, tzinfo=timezone.utc)
def _snap(price: float = 1.1683, symbol: str = "XRP-USDT") -> SimpleNamespace:
return SimpleNamespace(timestamp=_TS, price=price, symbol=symbol)
def _account(capital: float = 25_000.0, pnl: float = 0.0) -> AccountProjection:
acc = AccountProjection()
acc.snapshot.capital = capital
acc.snapshot.peak_capital = capital
acc.snapshot.equity = capital
acc.snapshot.realized_pnl = pnl
return acc
def _decision(
action: DecisionAction = DecisionAction.ENTER,
asset: str = "XRP-USDT",
side: TradeSide = TradeSide.SHORT,
price: float = 1.1683,
size: float = 5027.0,
bars_held: int = 3,
reason: str = "test_reason",
trade_id: str = "", # ignored — Decision has no trade_id; use for _intent pairing only
) -> Decision:
return Decision(
timestamp=_TS, decision_id="d-001", asset=asset, action=action,
side=side, reason=reason, confidence=0.8, velocity_divergence=0.02,
irp_alignment=0.5, reference_price=price, target_size=size,
leverage=1.0, bars_held=bars_held, stage=TradeStage.ORDER_REQUESTED, metadata={},
)
def _intent(
action: DecisionAction = DecisionAction.ENTER,
asset: str = "XRP-USDT",
side: TradeSide = TradeSide.SHORT,
price: float = 1.1683,
size: float = 5027.0,
bars_held: int = 3,
trade_id: str = "XRP-USDT",
) -> Intent:
return Intent(
timestamp=_TS, trade_id=trade_id, decision_id="d-001",
asset=asset, action=action, side=side, reason="test_reason",
target_size=size, leverage=1.0, reference_price=price,
confidence=0.8, exit_leg_ratios=(1.0,), bars_held=bars_held,
stage=TradeStage.ORDER_REQUESTED, metadata={},
)
def _slot(
trade_id: str = "XRP-USDT",
asset: str = "XRP-USDT",
side: str = "SHORT",
entry_price: float = 1.1683,
size: float = 5027.0,
initial_size: float = 5027.0,
realized_pnl: float = 0.0,
closed: bool = False,
active_leg_index: int = 0,
leverage: float = 1.0,
close_reason: str = "",
exit_leg_ratios: tuple = (1.0,),
) -> dict:
return {
"trade_id": trade_id,
"asset": asset,
"side": side,
"entry_price": entry_price,
"size": size,
"initial_size": initial_size,
"realized_pnl": realized_pnl,
"closed": closed,
"active_leg_index": active_leg_index,
"leverage": leverage,
"close_reason": close_reason,
"exit_leg_ratios": list(exit_leg_ratios),
"slot_id": 0,
}
def _outcome(
accepted: bool = True,
trade_id: str = "XRP-USDT",
state: KernelStage = KernelStage.POSITION_OPEN,
events: tuple = (),
) -> KernelOutcome:
return KernelOutcome(
accepted=accepted, slot_id=0, trade_id=trade_id, state=state,
diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO,
transitions=(), emitted_events=events, details={},
)
def _fill_event(
price: float = 1.1700,
size: float = 5027.0,
kind: KernelEventKind = KernelEventKind.FULL_FILL,
trade_id: str = "XRP-USDT",
asset: str = "XRP-USDT",
) -> VenueEvent:
return VenueEvent(
timestamp=_TS, event_id="ev-001", trade_id=trade_id, slot_id=0,
kind=kind, status=VenueEventStatus.FILLED,
venue_order_id="ord-001", venue_client_id="cid-001",
side=KTradeSide.SHORT, asset=asset,
price=price, size=size, filled_size=size, remaining_size=0.0,
reason="", raw_payload={}, metadata={},
)
def _sink_and_persist(capital: float = 25_000.0, pnl: float = 0.0, kernel=None):
rows: list[tuple[str, dict]] = []
acc = _account(capital, pnl)
cfg = PinkClickHousePersistenceConfig(
strategy="pink", runtime_namespace="dita_v2",
strategy_namespace="dita_v2", event_namespace="dita_v2",
initial_capital=25_000.0,
)
p = PinkClickHousePersistence(
account=acc, config=cfg,
sink=lambda t, r: rows.append((t, r)),
v7_sink=lambda t, r: rows.append((t, r)),
kernel=kernel,
)
return p, rows, acc
def _rows_of(rows, table: str) -> list[dict]:
return [r for t, r in rows if t == table]
# ===========================================================================
# 1. EXIT_PRICE — must not equal entry_price
# ===========================================================================
class TestExitPrice:
"""G21: exit_price in trade_events must reflect the fill/order price, not entry."""
def test_exit_price_differs_from_entry_price(self):
"""Entry=1.1683, exit order at 1.1700 — trade_events.exit_price must be 1.1700."""
entry = 1.1683
exit_ref = 1.1700
pnl = 5027.0 * (entry - exit_ref) # SHORT: profit when price falls, loss here
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
# Seed leg state (simulating a prior ENTER)
p._leg_state["XRP-USDT"] = {
"prev_realized": 0.0,
"prev_size": 5027.0,
"prev_leg_id": "",
}
closed_slot = _slot(
entry_price=entry, size=0.0, initial_size=5027.0,
realized_pnl=pnl, closed=True, active_leg_index=1,
)
dec = _decision(action=DecisionAction.EXIT, price=exit_ref)
intent = _intent(action=DecisionAction.EXIT, price=exit_ref)
fill_ev = _fill_event(price=exit_ref, size=5027.0)
outcome = _outcome(state=KernelStage.CLOSED, events=(fill_ev,))
p.persist_result(
snapshot=_snap(), decision=dec, intent=intent,
outcome=outcome, slot_dict=closed_slot, phase="execution",
)
te = _rows_of(rows, "trade_events")
assert te, "trade_events row must be written on closed slot"
assert te[0]["exit_price"] != entry, (
f"exit_price ({te[0]['exit_price']}) must NOT equal entry_price ({entry})"
)
assert abs(te[0]["exit_price"] - exit_ref) < 1e-9, (
f"exit_price ({te[0]['exit_price']}) should be intent.reference_price={exit_ref}"
)
def test_exit_price_fallback_to_decision_price_when_intent_zero(self):
"""If intent.reference_price=0, fall back to decision.reference_price."""
entry = 1.1683
exit_ref = 1.1700
pnl = -85.46
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(entry_price=entry, size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
dec = _decision(action=DecisionAction.EXIT, price=exit_ref)
intent = _intent(action=DecisionAction.EXIT, price=0.0) # zero intent price
outcome = _outcome(state=KernelStage.CLOSED)
p.persist_result(
snapshot=_snap(), decision=dec, intent=intent,
outcome=outcome, slot_dict=closed_slot, phase="execution",
)
te = _rows_of(rows, "trade_events")
assert te, "trade_events must be written"
assert te[0]["exit_price"] > 0, "exit_price must be > 0 (should fall back to decision price)"
assert te[0]["exit_price"] != entry, "exit_price must not be entry_price"
def test_persist_fill_events_exit_price_uses_event_price(self):
"""persist_fill_events must propagate the venue fill price to trade_events.exit_price."""
entry = 1.1683
fill_price = 1.1650 # actual fill from exchange
pnl = 5027.0 * (entry - fill_price) # SHORT profit
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(entry_price=entry, size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
fill_ev = _fill_event(price=fill_price, size=5027.0, kind=KernelEventKind.FULL_FILL)
p.persist_fill_events(
snapshot=_snap(), events=[fill_ev], slot_dict=closed_slot,
)
te = _rows_of(rows, "trade_events")
assert te, "trade_events must be written for closed fill"
assert abs(te[0]["exit_price"] - fill_price) < 1e-9, (
f"exit_price ({te[0]['exit_price']}) must match fill price {fill_price}"
)
# ===========================================================================
# 2. CAPITAL_BEFORE / CAPITAL_AFTER
# ===========================================================================
class TestCapitalFields:
"""capital_before and capital_after must be finite, ordered, and non-null."""
def test_capital_after_equals_account_capital(self):
cap_after = 25_147.32
pnl = 147.32
p, rows, acc = _sink_and_persist(capital=cap_after, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
dec = _decision(action=DecisionAction.EXIT, price=1.16)
intent = _intent(action=DecisionAction.EXIT, price=1.16)
outcome = _outcome(state=KernelStage.CLOSED)
p.persist_result(snapshot=_snap(), decision=dec, intent=intent, outcome=outcome, slot_dict=closed_slot)
te = _rows_of(rows, "trade_events")
assert te
assert abs(te[0]["capital_after"] - cap_after) < 1e-6
assert abs(te[0]["capital_before"] - (cap_after - pnl)) < 1e-6
def test_capital_before_lt_after_on_profitable_trade(self):
"""Profitable SHORT: capital_after > capital_before."""
entry = 1.1683
exit_p = 1.1500
pnl = 5027.0 * (entry - exit_p) # positive PnL for SHORT
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=exit_p),
intent=_intent(action=DecisionAction.EXIT, price=exit_p),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
te = _rows_of(rows, "trade_events")
assert te
assert te[0]["capital_after"] > te[0]["capital_before"]
def test_capital_before_gt_after_on_losing_trade(self):
"""Losing SHORT: capital_after < capital_before."""
entry = 1.1683
exit_p = 1.1900
pnl = 5027.0 * (entry - exit_p) # negative for SHORT when price rises
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=exit_p),
intent=_intent(action=DecisionAction.EXIT, price=exit_p),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
te = _rows_of(rows, "trade_events")
assert te
assert te[0]["capital_after"] < te[0]["capital_before"]
def test_exit_leg_capital_fields_populated(self):
"""trade_exit_legs capital_before and capital_after must be finite floats."""
pnl = 120.0
p, rows, acc = _sink_and_persist(capital=25_000.0 + pnl, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, price=1.16),
intent=_intent(action=DecisionAction.EXIT, price=1.16),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
legs = _rows_of(rows, "trade_exit_legs")
assert legs, "trade_exit_legs must be written"
assert math.isfinite(legs[0]["capital_before"])
assert math.isfinite(legs[0]["capital_after"])
assert legs[0]["capital_before"] > 0
assert legs[0]["capital_after"] > 0
# ===========================================================================
# 3. PNL_LEG — exit quantity correctness
# ===========================================================================
class TestPnlLeg:
def test_pnl_leg_correct_single_leg(self):
"""Single-leg exit: pnl_leg must equal total realized_pnl."""
pnl = 200.0
p, rows, acc = _sink_and_persist(capital=25_200.0, pnl=pnl)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, realized_pnl=pnl, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT),
intent=_intent(action=DecisionAction.EXIT),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
legs = _rows_of(rows, "trade_exit_legs")
assert legs
assert abs(legs[0]["pnl_leg"] - pnl) < 1e-9
def test_exit_qty_nonzero_when_leg_state_tracked(self):
"""When leg_state has prev_size=5027, exit_qty must be 5027 for full close."""
p, rows, acc = _sink_and_persist(capital=25_000.0)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
closed_slot = _slot(size=0.0, initial_size=5027.0, realized_pnl=0.0, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT),
intent=_intent(action=DecisionAction.EXIT, size=5027.0),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
legs = _rows_of(rows, "trade_exit_legs")
assert legs
assert legs[0]["exit_qty"] > 0, "exit_qty must be positive when leg was tracked"
def test_exit_qty_nonzero_for_external_position(self):
"""When leg_state missing (external position), exit_qty must still be > 0."""
# XRP position detected via reconcile — no prior ENTER persist
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=150.0)
# _leg_state["XRP-USDT"] intentionally NOT set
closed_slot = _slot(size=0.0, initial_size=5027.0, realized_pnl=150.0, closed=True, active_leg_index=1)
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT),
intent=_intent(action=DecisionAction.EXIT, size=5027.0),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
legs = _rows_of(rows, "trade_exit_legs")
assert legs, "trade_exit_legs must be written even for external positions"
assert legs[0]["exit_qty"] > 0, (
f"exit_qty ({legs[0]['exit_qty']}) must be > 0 for external position close"
)
def test_multi_leg_pnl_leg_incremental(self):
"""Two-leg exit: each leg's pnl_leg must be the delta, not cumulative."""
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0)
p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""}
# Leg 1: partial exit, 400 qty, pnl = 50
slot_leg1 = _slot("T1", size=600.0, initial_size=1000.0, realized_pnl=50.0,
closed=False, active_leg_index=1, exit_leg_ratios=(0.4, 1.0))
acc.snapshot.capital = 25_050.0
acc.snapshot.realized_pnl = 50.0
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, trade_id="T1"),
intent=_intent(action=DecisionAction.EXIT, trade_id="T1", size=400.0),
outcome=_outcome(state=KernelStage.POSITION_OPEN), slot_dict=slot_leg1,
)
# Leg 2: final exit, 600 qty, additional pnl = 80
slot_leg2 = _slot("T1", size=0.0, initial_size=1000.0, realized_pnl=130.0,
closed=True, active_leg_index=2, exit_leg_ratios=(0.4, 1.0))
acc.snapshot.capital = 25_130.0
acc.snapshot.realized_pnl = 130.0
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.EXIT, trade_id="T1"),
intent=_intent(action=DecisionAction.EXIT, trade_id="T1", size=600.0),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=slot_leg2,
)
legs = _rows_of(rows, "trade_exit_legs")
assert len(legs) == 2, f"Expected 2 exit leg rows, got {len(legs)}"
assert abs(legs[0]["pnl_leg"] - 50.0) < 1e-9, f"Leg 1 pnl_leg={legs[0]['pnl_leg']}, want 50"
assert abs(legs[1]["pnl_leg"] - 80.0) < 1e-9, f"Leg 2 pnl_leg={legs[1]['pnl_leg']}, want 80"
# ===========================================================================
# 4. ENTRY_BAR — must not map active_leg_index
# ===========================================================================
class TestEntryBar:
def test_entry_bar_uses_bars_held_not_leg_index(self):
"""entry_bar in position_state must reflect bars_held, not active_leg_index."""
bars = 7
p, rows, acc = _sink_and_persist()
slot = _slot(active_leg_index=3) # leg_index=3, bars_held=7 — must not use 3
dec = _decision(bars_held=bars)
intent = _intent(bars_held=bars)
p.persist_result(
snapshot=_snap(), decision=dec, intent=intent,
outcome=_outcome(), slot_dict=slot,
)
ps = _rows_of(rows, "position_state")
assert ps
assert ps[0]["entry_bar"] != 3, "entry_bar must not be active_leg_index (3)"
assert ps[0]["entry_bar"] == bars, f"entry_bar should be bars_held={bars}"
def test_entry_bar_zero_when_hold(self):
"""HOLD: entry_bar must be 0 (or bars_held=0), not active_leg_index."""
p, rows, acc = _sink_and_persist()
slot = _slot(active_leg_index=5)
dec = _decision(action=DecisionAction.HOLD, bars_held=0)
intent = _intent(action=DecisionAction.HOLD, bars_held=0)
p.persist_result(snapshot=_snap(), decision=dec, intent=intent,
outcome=None, slot_dict=slot)
ps = _rows_of(rows, "position_state")
assert ps
assert ps[0]["entry_bar"] != 5, "entry_bar must not be active_leg_index"
# ===========================================================================
# 5. RECOVERY — trade_id must come from slot, not account dict
# ===========================================================================
class TestRecoveryPersist:
def test_recovery_trade_id_from_kernel_slot(self):
"""persist_recovery_state must read trade_id from kernel.slot(0), not acc_dict."""
slot_view = MagicMock()
slot_view.to_dict.return_value = {
"trade_id": "XRP-USDT",
"asset": "XRP-USDT",
"side": "SHORT",
"entry_price": 1.1683,
"size": 5027.0,
"initial_size": 5027.0,
"realized_pnl": 0.0,
"closed": False,
"active_leg_index": 0,
"leverage": 1.0,
"close_reason": "",
"exit_leg_ratios": [1.0],
}
kernel = MagicMock()
kernel.slot.return_value = slot_view
kernel.snapshot.return_value = {"account": {}}
kernel.max_slots = 1
p, rows, acc = _sink_and_persist(kernel=kernel)
acc_dict = {"capital": 103747.0, "equity": 103747.0, "realized_pnl": 0.0} # no trade_id!
p.persist_recovery_state(
snapshot=_snap(), acc_dict=acc_dict, phase="recovery",
)
recs = _rows_of(rows, "trade_reconstruction")
assert recs, "trade_reconstruction must be written for recovery"
assert recs[0]["trade_id"] == "XRP-USDT", (
f"trade_id must come from kernel slot, got '{recs[0]['trade_id']}'"
)
def test_recovery_trade_id_empty_when_kernel_not_wired(self):
"""Without kernel, recovery trade_id is empty (not crash)."""
p, rows, acc = _sink_and_persist()
acc_dict = {"capital": 25000.0}
p.persist_recovery_state(snapshot=_snap(), acc_dict=acc_dict)
recs = _rows_of(rows, "trade_reconstruction")
assert recs # row is written, just with empty trade_id
def test_recovery_with_open_position_in_kernel(self):
"""Recovery when kernel has open slot writes OPEN status."""
slot_view = MagicMock()
slot_view.to_dict.return_value = _slot(
trade_id="ATOM-T1", asset="ATOM-USDT", size=200.0, closed=False,
)
kernel = MagicMock()
kernel.slot.return_value = slot_view
kernel.snapshot.return_value = {"account": {}}
kernel.max_slots = 1
p, rows, acc = _sink_and_persist(kernel=kernel)
p.persist_recovery_state(snapshot=_snap(), acc_dict={"capital": 25000.0})
ps = _rows_of(rows, "position_state")
assert ps
assert ps[0]["status"] in ("OPEN", "RECOVERED_OPEN", "FLAT")
# ===========================================================================
# 6. FULL ENTER → EXIT lifecycle — all fields non-null
# ===========================================================================
class TestFullLifecycle:
def test_full_enter_exit_no_null_financial_fields(self):
"""Complete ENTER + EXIT: trade_events must have no null financial fields."""
entry_price = 1.1683
exit_price = 1.1500
qty = 5027.0
pnl = qty * (entry_price - exit_price) # SHORT profit
p, rows, acc = _sink_and_persist(capital=25_000.0)
# --- ENTER ---
snap = _snap(entry_price)
enter_slot = _slot(entry_price=entry_price, size=qty, initial_size=qty)
enter_out = _outcome(state=KernelStage.POSITION_OPEN, events=(_fill_event(entry_price, qty, KernelEventKind.FULL_FILL),))
p.persist_result(
snapshot=snap, decision=_decision(DecisionAction.ENTER, price=entry_price, size=qty),
intent=_intent(DecisionAction.ENTER, price=entry_price, size=qty),
outcome=enter_out, slot_dict=enter_slot, phase="execution",
)
# --- EXIT ---
acc.snapshot.capital = 25_000.0 + pnl
acc.snapshot.realized_pnl = pnl
exit_snap = _snap(exit_price)
exit_slot = _slot(
entry_price=entry_price, size=0.0, initial_size=qty,
realized_pnl=pnl, closed=True, active_leg_index=1,
)
exit_fill = _fill_event(exit_price, qty, KernelEventKind.FULL_FILL)
exit_out = _outcome(state=KernelStage.CLOSED, events=(exit_fill,))
p.persist_result(
snapshot=exit_snap, decision=_decision(DecisionAction.EXIT, price=exit_price, size=qty),
intent=_intent(DecisionAction.EXIT, price=exit_price, size=qty),
outcome=exit_out, slot_dict=exit_slot, phase="execution",
)
te = _rows_of(rows, "trade_events")
assert te, "trade_events must be written on close"
row = te[0]
# Critical financial fields must be non-null and finite
for field in ("pnl", "capital_before", "capital_after", "entry_price", "exit_price"):
assert row[field] is not None, f"trade_events.{field} must not be None"
assert math.isfinite(float(row[field])), f"trade_events.{field} must be finite"
assert row["exit_price"] != row["entry_price"], "exit_price must differ from entry_price"
assert row["capital_after"] > row["capital_before"], "profitable trade: cap_after > cap_before"
def test_enter_writes_entry_filled_reconstruction(self):
"""ENTER fill must write ENTRY_FILLED to trade_reconstruction."""
p, rows, acc = _sink_and_persist()
enter_slot = _slot(size=5027.0, initial_size=5027.0)
fill = _fill_event(1.1683, 5027.0, KernelEventKind.FULL_FILL)
out = _outcome(state=KernelStage.POSITION_OPEN, events=(fill,))
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.ENTER),
intent=_intent(DecisionAction.ENTER),
outcome=out, slot_dict=enter_slot,
)
recs = _rows_of(rows, "trade_reconstruction")
types = [r["event_type"] for r in recs]
assert "ENTRY_FILLED" in types, f"Expected ENTRY_FILLED, got: {types}"
def test_exit_writes_trade_event_and_exit_leg(self):
"""Exit close must write both trade_events and trade_exit_legs."""
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=100.0)
p._leg_state["XRP-USDT"] = {"prev_realized": 0.0, "prev_size": 5027.0, "prev_leg_id": ""}
acc.snapshot.capital = 25_100.0
closed_slot = _slot(size=0.0, realized_pnl=100.0, closed=True, active_leg_index=1)
fill = _fill_event(1.16, 5027.0)
out = _outcome(state=KernelStage.CLOSED, events=(fill,))
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT),
intent=_intent(DecisionAction.EXIT, price=1.16),
outcome=out, slot_dict=closed_slot,
)
assert _rows_of(rows, "trade_events"), "trade_events must be written"
assert _rows_of(rows, "trade_exit_legs"), "trade_exit_legs must be written"
# ===========================================================================
# 7. CHAOS / FUZZ TESTS
# ===========================================================================
class TestChaosPersistence:
"""Resilience: None fields, NaN, zero prices, missing keys — must not crash."""
def test_none_entry_price_in_slot(self):
"""slot_dict.entry_price=None must not crash or produce NaN."""
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=50.0)
p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""}
bad_slot = _slot("T1", entry_price=None, size=0.0, realized_pnl=50.0, closed=True)
bad_slot["entry_price"] = None # override to None
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, price=1.16),
intent=_intent(DecisionAction.EXIT, price=1.16, trade_id="T1"),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=bad_slot,
)
te = _rows_of(rows, "trade_events")
assert te, "Must write trade_events even with None entry_price"
assert math.isfinite(te[0]["capital_after"])
def test_nan_realized_pnl_in_slot(self):
"""NaN realized_pnl must be sanitized to 0.0, not propagated."""
p, rows, acc = _sink_and_persist()
p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""}
bad_slot = _slot("T1", size=0.0, realized_pnl=float("nan"), closed=True)
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T1"),
intent=_intent(DecisionAction.EXIT, trade_id="T1"),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=bad_slot,
)
te = _rows_of(rows, "trade_events")
if te:
assert math.isfinite(te[0]["pnl"]), "NaN pnl must be sanitized"
def test_zero_exit_price_does_not_crash(self):
"""zero reference_price for exit must not produce NaN/inf or crash."""
p, rows, acc = _sink_and_persist()
p._leg_state["T1"] = {"prev_realized": 0.0, "prev_size": 100.0, "prev_leg_id": ""}
closed_slot = _slot("T1", size=0.0, closed=True)
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, price=0.0, trade_id="T1"),
intent=_intent(DecisionAction.EXIT, price=0.0, trade_id="T1"),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=closed_slot,
)
# Must not raise; rows may be sparse but no crash
def test_missing_slot_keys_gracefully_handled(self):
"""Minimally-populated slot_dict must not crash persistence."""
p, rows, acc = _sink_and_persist()
minimal_slot = {"closed": True} # only closed key
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT),
intent=_intent(DecisionAction.EXIT),
outcome=_outcome(state=KernelStage.CLOSED), slot_dict=minimal_slot,
)
# Must not raise
def test_hold_decision_writes_status_snapshot(self):
"""HOLD must still emit status_snapshots, position_state, account_events."""
p, rows, acc = _sink_and_persist()
p.persist_result(
snapshot=_snap(), decision=_decision(action=DecisionAction.HOLD),
intent=_intent(action=DecisionAction.HOLD),
outcome=None, slot_dict=_slot(),
)
assert _rows_of(rows, "status_snapshots"), "HOLD must write status_snapshots"
assert _rows_of(rows, "account_events"), "HOLD must write account_events"
assert _rows_of(rows, "position_state"), "HOLD must write position_state"
def test_infinite_capital_does_not_propagate(self):
"""If account capital is somehow inf, output rows must still be finite."""
p, rows, acc = _sink_and_persist()
acc.snapshot.capital = float("inf") # corrupt
p.persist_result(
snapshot=_snap(), decision=_decision(), intent=_intent(),
outcome=_outcome(), slot_dict=_slot(),
)
# Rows should still be written; capital fields may be 0 or clamped
ss = _rows_of(rows, "status_snapshots")
assert ss
def test_empty_events_tuple_in_outcome(self):
"""Empty emitted_events must not cause ENTER to log ENTRY_FILLED."""
p, rows, acc = _sink_and_persist()
open_slot = _slot(size=5027.0)
out = _outcome(events=()) # no fill events
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.ENTER),
intent=_intent(DecisionAction.ENTER),
outcome=out, slot_dict=open_slot,
)
recs = _rows_of(rows, "trade_reconstruction")
types = [r["event_type"] for r in recs]
# slot_open=True so ENTRY_FILLED IS expected (size>0)
# This is the current design — verify no crash
assert recs is not None
def test_partial_exit_followed_by_final_exit(self):
"""Two-leg exit must produce two trade_exit_legs rows and one trade_events."""
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0)
p._leg_state["T2"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""}
# Partial exit
slot_p = _slot("T2", size=600.0, initial_size=1000.0, realized_pnl=50.0,
active_leg_index=1, exit_leg_ratios=(0.4, 1.0))
acc.snapshot.capital = 25_050.0
acc.snapshot.realized_pnl = 50.0
partial_fill = _fill_event(trade_id="T2", price=1.16, size=400.0, kind=KernelEventKind.PARTIAL_FILL)
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T2"),
intent=_intent(DecisionAction.EXIT, trade_id="T2", size=400.0),
outcome=_outcome(state=KernelStage.POSITION_OPEN, events=(partial_fill,)),
slot_dict=slot_p,
)
# Final exit
slot_f = _slot("T2", size=0.0, initial_size=1000.0, realized_pnl=130.0,
closed=True, active_leg_index=2, exit_leg_ratios=(0.4, 1.0))
acc.snapshot.capital = 25_130.0
acc.snapshot.realized_pnl = 130.0
final_fill = _fill_event(trade_id="T2", price=1.15, size=600.0)
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, trade_id="T2"),
intent=_intent(DecisionAction.EXIT, trade_id="T2", size=600.0),
outcome=_outcome(state=KernelStage.CLOSED, events=(final_fill,)),
slot_dict=slot_f,
)
legs = _rows_of(rows, "trade_exit_legs")
te = _rows_of(rows, "trade_events")
assert len(legs) == 2, f"Expected 2 exit legs, got {len(legs)}"
assert len(te) == 1, f"Expected 1 trade_event, got {len(te)}"
assert all(math.isfinite(l["pnl_leg"]) for l in legs), "All pnl_leg must be finite"
def test_restart_mid_trade_recovery_then_exit(self):
"""Simulates restart: kernel has open position, recovery runs, then exit."""
slot_view = MagicMock()
slot_view.to_dict.return_value = _slot("ATOM-T1", "ATOM-USDT", "SHORT",
entry_price=1.802, size=26152.81,
initial_size=26152.81)
kernel = MagicMock()
kernel.slot.return_value = slot_view
kernel.snapshot.return_value = {"account": {}}
kernel.max_slots = 1
p, rows, acc = _sink_and_persist(capital=25_000.0, kernel=kernel)
# Recovery after restart
p.persist_recovery_state(
snapshot=_snap(), acc_dict={"capital": 25_000.0}, phase="recovery",
)
# Now exit (position detected via reconcile)
acc.snapshot.capital = 25_100.0
acc.snapshot.realized_pnl = 100.0
closed_slot = _slot("ATOM-T1", "ATOM-USDT", "SHORT", entry_price=1.802,
size=0.0, initial_size=26152.81, realized_pnl=100.0,
closed=True)
fill_ev = _fill_event(trade_id="ATOM-T1", asset="ATOM-USDT", price=1.798, size=26152.81)
p.persist_result(
snapshot=_snap(), decision=_decision(DecisionAction.EXIT, asset="ATOM-USDT", trade_id="ATOM-T1"),
intent=_intent(DecisionAction.EXIT, asset="ATOM-USDT", trade_id="ATOM-T1", size=26152.81),
outcome=_outcome(state=KernelStage.CLOSED, events=(fill_ev,)),
slot_dict=closed_slot,
)
te = _rows_of(rows, "trade_events")
assert te, "trade_events must be written for exit after recovery"
assert math.isfinite(te[0]["capital_after"])
def test_persist_fill_events_partial_fill_updates_leg_state(self):
"""PARTIAL_FILL via pump must update leg_state for next leg."""
p, rows, acc = _sink_and_persist(capital=25_000.0, pnl=0.0)
p._leg_state["T3"] = {"prev_realized": 0.0, "prev_size": 1000.0, "prev_leg_id": ""}
partial_slot = _slot("T3", size=600.0, initial_size=1000.0, realized_pnl=40.0)
partial_fill = _fill_event(trade_id="T3", price=1.16, size=400.0, kind=KernelEventKind.PARTIAL_FILL)
acc.snapshot.capital = 25_040.0
p.persist_fill_events(snapshot=_snap(), events=[partial_fill], slot_dict=partial_slot)
# leg_state should now have prev_size=600
assert "T3" in p._leg_state
assert abs(p._leg_state["T3"]["prev_size"] - 600.0) < 1e-9, (
f"leg_state.prev_size should be 600 after partial fill, got {p._leg_state['T3']['prev_size']}"
)

View File

@@ -17,6 +17,7 @@ Capital/peak_capital/trade_seq are read from the kernel's AccountProjection
from __future__ import annotations from __future__ import annotations
import json import json
import logging
import math import math
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -28,6 +29,7 @@ from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventK
from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage
Writer = Callable[[str, dict[str, Any]], None] Writer = Callable[[str, dict[str, Any]], None]
_log = logging.getLogger(__name__)
def _json_safe(value: Any) -> Any: def _json_safe(value: Any) -> Any:
@@ -78,6 +80,48 @@ def _safe_float(value: Any, default: float = 0.0) -> float:
return out return out
def _checked_float(
value: Any,
default: float = 0.0,
*,
field: str = "?",
trade_id: str = "",
sink: Writer | None = None,
) -> float:
"""_safe_float with anomaly tracing.
Any NaN/inf/non-numeric value is a bug indicator, not a normal condition.
Sanitise to ``default`` but log a WARNING and optionally write an
``anomaly_events`` spool row so the trace is queryable in ClickHouse.
"""
try:
out = float(value)
except Exception:
out = float("nan")
if not math.isfinite(out):
_log.warning(
"NaN/inf in financial field field=%s trade_id=%s raw=%r → replacing with %s",
field, trade_id or "?", value, default,
)
if sink is not None:
try:
sink("anomaly_events", {
"ts": datetime.now(timezone.utc).isoformat(),
"decision_id": "",
"trade_id": trade_id,
"symbol": "",
"anomaly": f"NaN_FINANCIAL_FIELD:{field}",
"origin": "persistence_nan_guard",
"sensor": field,
"detail": f"raw={value!r} replaced_with={default}",
"rm_meta": 0.0,
})
except Exception:
pass
return default
return out
def _decision_summary(decision: Decision | None) -> dict[str, Any]: def _decision_summary(decision: Decision | None) -> dict[str, Any]:
if decision is None: if decision is None:
return {} return {}
@@ -410,9 +454,22 @@ class PinkClickHousePersistence:
return return
partial = (not slot_closed) and cur_size > 0.0 partial = (not slot_closed) and cur_size > 0.0
# Extract the fill price from emitted venue events (G21 fix): the actual
# exchange fill price lives in the FULL_FILL/PARTIAL_FILL event, not in
# the slot dict. Pass it explicitly so _write_trade_event does not fall
# back to entry_price.
fill_price_hint = 0.0
for ev in events:
p_val = getattr(ev, "price", 0.0)
if p_val and math.isfinite(float(p_val)) and float(p_val) > 0:
fill_price_hint = float(p_val)
break
# One trade_exit_legs row per exit leg (partial or final), BLUE-schema # One trade_exit_legs row per exit leg (partial or final), BLUE-schema
# compatible so PINK multi-exit trades reconcile against the same table. # compatible so PINK multi-exit trades reconcile against the same table.
self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome) self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome,
fill_price_hint=fill_price_hint)
self._write_trade_reconstruction( self._write_trade_reconstruction(
snapshot, intent.trade_id, snapshot, intent.trade_id,
event_type="PARTIAL_EXIT" if partial else "EXIT", event_type="PARTIAL_EXIT" if partial else "EXIT",
@@ -428,7 +485,9 @@ class PinkClickHousePersistence:
) )
# Terminal trade event. # Terminal trade event.
if slot_closed: if slot_closed:
self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state) self._write_trade_event(snapshot, decision, intent, slot, outcome,
market_state=market_state,
exit_price_hint=fill_price_hint)
def persist_fill_events( def persist_fill_events(
self, self,
@@ -455,7 +514,12 @@ class PinkClickHousePersistence:
closed = bool(slot.get("closed", False)) closed = bool(slot.get("closed", False))
cur_size = self._slot_size(slot) cur_size = self._slot_size(slot)
leverage = _safe_float(slot.get("leverage", 1.0), 1.0) leverage = _safe_float(slot.get("leverage", 1.0), 1.0)
price = next((float(getattr(e, "price", 0.0) or 0.0) for e in event_list if getattr(e, "price", 0.0)), 0.0) or self._slot_entry_price(slot) # Extract fill price from venue events (used as exit_price_hint for G21 fix).
price = next(
(float(getattr(e, "price", 0.0)) for e in event_list
if getattr(e, "price", 0.0) and math.isfinite(float(getattr(e, "price", 0.0)))),
0.0,
) or self._slot_entry_price(slot)
prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0) prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0)
is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12) is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12)
action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER
@@ -493,27 +557,47 @@ class PinkClickHousePersistence:
event_type: str = "RECOVERY", event_type: str = "RECOVERY",
market_state: Mapping[str, Any] | None = None, market_state: Mapping[str, Any] | None = None,
) -> None: ) -> None:
"""Persist recovery-only state after kernel reconcile.""" """Persist recovery-only state after kernel reconcile.
slot_dict = acc_dict or {}
A15 fix: acc_dict is the kernel account snapshot (capital/equity/pnl),
not a slot dict. Read the actual slot from kernel.slot(0) so that
trade_id, asset, size, and entry_price are correctly populated in the
recovery rows.
"""
# A15: read slot from kernel instead of misusing acc_dict as slot dict.
slot_dict: dict[str, Any] = {}
if self._kernel is not None:
try:
slot_view = self._kernel.slot(0)
raw = slot_view.to_dict() if hasattr(slot_view, "to_dict") else {}
slot_dict = dict(raw) if raw else {}
except Exception:
pass
trade_id = (
str(slot_dict.get("trade_id") or "")
or (str(acc_dict.get("trade_id", "")) if acc_dict else "")
)
self._write_status_snapshot( self._write_status_snapshot(
snapshot, decision=None, intent=None, slot_dict={}, phase=phase, snapshot, decision=None, intent=None, slot_dict=slot_dict, phase=phase,
) )
self._write_account_event( self._write_account_event(
snapshot, decision=None, intent=None, snapshot, decision=None, intent=None,
stage=TradeStage.TRADE_TERMINAL_WRITTEN, stage=TradeStage.TRADE_TERMINAL_WRITTEN,
slot_dict={}, event_type=event_type, slot_dict=slot_dict, event_type=event_type,
) )
self._write_position_state( self._write_position_state(
snapshot, decision=None, intent=None, snapshot, decision=None, intent=None,
slot_dict={}, stage=TradeStage.TRADE_TERMINAL_WRITTEN, slot_dict=slot_dict, stage=TradeStage.TRADE_TERMINAL_WRITTEN,
status=self._state_label({}, phase), market_state=market_state, status=self._state_label(slot_dict, phase), market_state=market_state,
) )
self._write_trade_reconstruction( self._write_trade_reconstruction(
snapshot, snapshot,
trade_id=acc_dict.get("trade_id", "") if acc_dict else "", trade_id=trade_id,
event_type=event_type, event_type=event_type,
event_id=f"recovery:{phase}", event_id=f"recovery:{phase}",
payload={"acc_dict": _json_safe(acc_dict or {}), "phase": phase, "market_state": _json_safe(market_state or {})}, payload={"acc_dict": _json_safe(acc_dict or {}), "slot": _json_safe(slot_dict), "phase": phase, "market_state": _json_safe(market_state or {})},
market_state=market_state, market_state=market_state,
) )
@@ -736,7 +820,9 @@ class PinkClickHousePersistence:
"notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)), "notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)),
"leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0), "leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0),
"bucket_id": -1, "bucket_id": -1,
"entry_bar": int(slot_dict.get("active_leg_index", 0) or 0), # A14 fix: active_leg_index is the exit-leg counter, not the bar count.
# Use intent.bars_held when available; fall back to 0.
"entry_bar": int(intent.bars_held if intent is not None else 0) or 0,
"status": status, "status": status,
"exit_reason": slot_dict.get("close_reason", ""), "exit_reason": slot_dict.get("close_reason", ""),
"pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0), "pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0),
@@ -789,6 +875,7 @@ class PinkClickHousePersistence:
def _write_trade_exit_leg( def _write_trade_exit_leg(
self, snapshot: Any, decision: Decision, intent: Intent, self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None, slot_dict: dict[str, Any], outcome: KernelOutcome | None,
fill_price_hint: float = 0.0,
) -> None: ) -> None:
"""Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg. """Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg.
@@ -805,14 +892,20 @@ class PinkClickHousePersistence:
"prev_leg_id": "", "prev_leg_id": "",
} }
entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0) entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0)
exit_price = _safe_float(intent.reference_price, 0.0) or _safe_float(decision.reference_price, 0.0) # G21 fix: use fill price hint (actual exchange fill) > intent ref > decision ref.
exit_price = (
fill_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
)
side = self._slot_side(slot_dict) side = self._slot_side(slot_dict)
if side == TradeSide.FLAT: if side == TradeSide.FLAT:
side = intent.side side = intent.side
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0) leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
cur_size = self._slot_size(slot_dict) cur_size = self._slot_size(slot_dict)
cur_realized = _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0) cur_realized = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=trade_id, sink=self._sink)
prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0) prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0)
prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0) prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0)
@@ -822,7 +915,15 @@ class PinkClickHousePersistence:
leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1) leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1)
fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0 fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0
exit_qty = max(0.0, prev_size - cur_size) # External-position fix: if leg_state was never seeded (position detected via
# reconcile/pump rather than our own ENTER), prev_size=0 would make exit_qty=0.
# Fall back to initial_size or intent.target_size so the leg row is meaningful.
if prev_size <= 1e-12:
# No prior leg tracking — use the slot's initial_size or intent size.
initial = _safe_float(slot_dict.get("initial_size", 0.0), 0.0) or _safe_float(intent.target_size, 0.0)
exit_qty = initial - cur_size if initial > cur_size else initial
else:
exit_qty = max(0.0, prev_size - cur_size)
pnl_leg = cur_realized - prev_realized pnl_leg = cur_realized - prev_realized
capital_after = self._capital() capital_after = self._capital()
capital_before = capital_after - pnl_leg capital_before = capital_after - pnl_leg
@@ -852,6 +953,7 @@ class PinkClickHousePersistence:
"side": side.value, "side": side.value,
"entry_price": entry_price, "entry_price": entry_price,
"exit_price": exit_price, "exit_price": exit_price,
"exit_qty": exit_qty,
"fraction": fraction, "fraction": fraction,
"capital_before": capital_before, "capital_before": capital_before,
"capital_after": capital_after, "capital_after": capital_after,
@@ -875,11 +977,23 @@ class PinkClickHousePersistence:
self, snapshot: Any, decision: Decision, intent: Intent, self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None, slot_dict: dict[str, Any], outcome: KernelOutcome | None,
*, market_state: Mapping[str, Any] | None = None, *, market_state: Mapping[str, Any] | None = None,
exit_price_hint: float = 0.0,
) -> None: ) -> None:
entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0) entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0)
quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0) quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0)
exit_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) # G21 fix: exit_price is the fill/order price, NOT the entry price.
pnl = _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0) # Priority: explicit hint (fill event price) > intent reference price > decision price.
# Fall back to entry_price only as absolute last resort (avoids the G21 bug where
# every trade_events row had exit_price == entry_price and PnL reconstruction was zero).
exit_price = (
exit_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
or _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
)
tid = intent.trade_id if intent is not None else ""
pnl = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=tid, sink=self._sink)
pnl_pct = 0.0 pnl_pct = 0.0
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0) leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
denom = abs(quantity * entry_price * max(leverage_val, 1e-9)) denom = abs(quantity * entry_price * max(leverage_val, 1e-9))