"""PINK ClickHouse persistence — DITAv2-backed, reads capital from kernel. Row families preserved (same schema, no new columns): - policy_events / v7_decision_events - position_state - account_events - status_snapshots - trade_events - trade_reconstruction - trade_exit_legs - anomaly_events Capital/peak_capital/trade_seq are read from the kernel's AccountProjection (single authority). No duplicate tracking in this module. """ from __future__ import annotations import json import logging import math from dataclasses import dataclass from datetime import datetime, timezone from enum import Enum from typing import Any, Callable, Mapping, Optional from prod.clean_arch.dita import AccountProjection, Decision, DecisionAction, Intent, TradeSide, TradeStage from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventKind, KernelOutcome from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage Writer = Callable[[str, dict[str, Any]], None] _log = logging.getLogger(__name__) def _json_safe(value: Any) -> Any: if isinstance(value, Enum): return value.value if isinstance(value, dict): return {str(key): _json_safe(val) for key, val in value.items()} if isinstance(value, (list, tuple)): return [_json_safe(item) for item in value] if hasattr(value, "isoformat"): try: return value.isoformat() except Exception: pass if hasattr(value, "__dict__"): try: return _json_safe(dict(vars(value))) except Exception: pass return value def _json_text(value: Any) -> str: return json.dumps(_json_safe(value), separators=(",", ":"), ensure_ascii=False, default=str) def _direction(side: TradeSide) -> int: return -1 if side == TradeSide.SHORT else 1 def _direction_from_str(side: str) -> int: return -1 if side.upper() in ("SHORT", "SELL") else 1 def _notional(size: float, price: float) -> float: if not math.isfinite(size) or not math.isfinite(price): return 0.0 return abs(size) * abs(price) def _safe_float(value: Any, default: float = 0.0) -> float: 
try: out = float(value) except Exception: return default if not math.isfinite(out): return default return out def _checked_float( value: Any, default: float = 0.0, *, field: str = "?", trade_id: str = "", sink: Writer | None = None, ) -> float: """_safe_float with anomaly tracing. Any NaN/inf/non-numeric value is a bug indicator, not a normal condition. Sanitise to ``default`` but log a WARNING and optionally write an ``anomaly_events`` spool row so the trace is queryable in ClickHouse. """ try: out = float(value) except Exception: out = float("nan") if not math.isfinite(out): _log.warning( "NaN/inf in financial field field=%s trade_id=%s raw=%r → replacing with %s", field, trade_id or "?", value, default, ) if sink is not None: try: sink("anomaly_events", { "ts": datetime.now(timezone.utc).isoformat(), "decision_id": "", "trade_id": trade_id, "symbol": "", "anomaly": f"NaN_FINANCIAL_FIELD:{field}", "origin": "persistence_nan_guard", "sensor": field, "detail": f"raw={value!r} replaced_with={default}", "rm_meta": 0.0, }) except Exception: pass return default return out def _decision_summary(decision: Decision | None) -> dict[str, Any]: if decision is None: return {} return { "timestamp": decision.timestamp.isoformat() if hasattr(decision.timestamp, "isoformat") else str(decision.timestamp), "decision_id": decision.decision_id, "asset": decision.asset, "action": decision.action.value, "side": decision.side.value, "reason": decision.reason, "confidence": float(decision.confidence or 0.0), "velocity_divergence": float(decision.velocity_divergence or 0.0), "irp_alignment": float(decision.irp_alignment or 0.0), "reference_price": float(decision.reference_price or 0.0), "target_size": float(decision.target_size or 0.0), "leverage": float(decision.leverage or 0.0), "bars_held": int(decision.bars_held or 0), "stage": decision.stage.value, "metadata": _json_safe(decision.metadata), } def _intent_summary(intent: Intent | None) -> dict[str, Any]: if intent is None: return {} 
return { "timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp), "trade_id": intent.trade_id, "decision_id": intent.decision_id, "asset": intent.asset, "action": intent.action.value, "side": intent.side.value, "reason": intent.reason, "target_size": float(intent.target_size or 0.0), "leverage": float(intent.leverage or 0.0), "reference_price": float(intent.reference_price or 0.0), "confidence": float(intent.confidence or 0.0), "bars_held": int(intent.bars_held or 0), "stage": intent.stage.value, "exit_leg_ratios": [float(r) for r in intent.exit_leg_ratios], "metadata": _json_safe(intent.metadata), } def _outcome_summary(outcome: KernelOutcome | None) -> dict[str, Any]: if outcome is None: return {} return { "accepted": bool(outcome.accepted), "slot_id": int(outcome.slot_id), "trade_id": outcome.trade_id, "state": outcome.state.value, "diagnostic_code": outcome.diagnostic_code.value, "severity": outcome.severity.value, "details": _json_safe(outcome.details), } @dataclass(frozen=True) class PinkClickHousePersistenceConfig: """Row-shape knobs for the PINK ClickHouse mirror.""" strategy: str = "pink" runtime_namespace: str = "pink" strategy_namespace: str = "pink" event_namespace: str = "pink" actor_name: str = "PinkDirectRuntime" exec_venue: str = "bingx" data_venue: str = "binance" ledger_authority: str = "exchange" initial_capital: float = 25_000.0 max_account_leverage: float = 3.0 exchange_leverage_mode: str = "" leverage_mapping_rule: str = "round_half_even_linear_0.5_to_9.0_to_1_to_exchange_cap" class PinkClickHousePersistence: """Durable PINK ClickHouse sink — capital reads from kernel AccountProjection.""" def __init__( self, account: AccountProjection, *, config: PinkClickHousePersistenceConfig | None = None, sink: Writer | None = None, v7_sink: Writer | None = None, kernel: Optional[Any] = None, # ExecutionKernel — optional; enables event_seq/reconcile writes ) -> None: self.account = account 
self._kernel = kernel # set post-construction via set_kernel() if needed self.config = config or PinkClickHousePersistenceConfig( runtime_namespace=account.runtime_namespace, strategy_namespace=account.strategy_namespace, event_namespace=account.event_namespace, actor_name=account.actor_name, exec_venue=account.exec_venue, data_venue=account.data_venue, ledger_authority=account.ledger_authority, initial_capital=float(account.snapshot.capital or 25_000.0), ) self._sink = sink or self._resolve_sink("pink") self._v7_sink = v7_sink or self._resolve_v7_sink("pink") self._leg_state: dict[str, dict[str, Any]] = {} def set_kernel(self, kernel: Any) -> None: """Wire the ExecutionKernel after construction (e.g. from connect()).""" self._kernel = kernel @staticmethod def _resolve_sink(strategy: str) -> Writer: from prod.ch_writer import ch_put_pink return ch_put_pink @staticmethod def _resolve_v7_sink(strategy: str) -> Writer: from prod.ch_writer import ch_put_pink_v7 return ch_put_pink_v7 def _account_event_seq(self) -> int: """event_seq from kernel's atomic account state; 0 when kernel not wired.""" if self._kernel is None: return 0 try: return int(self._kernel.snapshot().get("account", {}).get("event_seq", 0) or 0) except Exception: return 0 def _kernel_account(self) -> dict: """Full kernel account snapshot dict; empty dict when kernel not wired.""" if self._kernel is None: return {} try: return self._kernel.snapshot().get("account", {}) or {} except Exception: return {} def write_reconcile_event( self, *, event_seq: int, ts: Optional[Any] = None, k_capital: float = 0.0, e_wallet_balance: float = 0.0, delta: float = 0.0, status: str = "OK", explanation: str = "", ) -> None: """ Persist one reconcile record to the reconcile_events table. Idempotent: if the same event_seq is written twice, CH dedup (via ReplacingMergeTree on event_seq) keeps the latest row only. 
""" from datetime import datetime, timezone ts_val = ts or datetime.now(timezone.utc).isoformat() self._sink("reconcile_events", { "timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(), "runtime_namespace": self.config.runtime_namespace, "strategy_namespace": self.config.strategy_namespace, "event_seq": int(event_seq), "k_capital": float(k_capital), "e_wallet_balance": float(e_wallet_balance), "delta": float(delta), "reconcile_status": str(status), "explanation": str(explanation), }) def _capital(self) -> float: return float(self.account.snapshot.capital or 0.0) def _peak_capital(self) -> float: return float(getattr(self.account.snapshot, "peak_capital", self._capital()) or self._capital()) def _trade_seq(self) -> int: return int(getattr(self.account.snapshot, "trade_seq", 0) or 0) def _equity(self) -> float: return float(self.account.snapshot.equity or self._capital()) # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def persist_step( self, *, snapshot: Any, decision: Decision, intent: Intent, outcome: KernelOutcome | None = None, slot_dict: dict[str, Any] | None = None, acc_dict: dict[str, Any] | None = None, phase: str = "step", market_state: Mapping[str, Any] | None = None, ) -> None: """Two-phase persist: log the REQUEST, then log the RESULT. REQUEST (:meth:`persist_request`) — the decision/order that was submitted (policy_events + a trade_reconstruction ORDER_REQUESTED row). RESULT (:meth:`persist_result`) — the settled state snapshot plus the per-fill lifecycle rows, gated on *evidence of an actual fill*. A resting LIMIT order (ACK only, no fill) therefore emits state snapshots but no terminal rows; the async-fill pump persists those later via the same result path. The synchronous-MARKET path is unchanged: its FILL event (or the slot's filled/closed state) trips the same gate. 
""" self.persist_request( snapshot=snapshot, decision=decision, intent=intent, phase=phase, market_state=market_state, ) self.persist_result( snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, slot_dict=slot_dict, phase=phase, market_state=market_state, ) def persist_request( self, *, snapshot: Any, decision: Decision, intent: Intent, phase: str = "step", market_state: Mapping[str, Any] | None = None, ) -> None: """Phase 1 — log the requested decision/order (no fill data).""" self._write_policy_event(snapshot, decision, intent, phase=phase) if decision.action in (DecisionAction.ENTER, DecisionAction.EXIT): self._write_trade_reconstruction( snapshot, intent.trade_id, event_type="ORDER_REQUESTED", event_id=f"{intent.trade_id}:request:{decision.action.value.lower()}", payload={ "decision": _decision_summary(decision), "intent": _intent_summary(intent), "market_state": _json_safe(market_state or {}), }, market_state=market_state, ) def persist_result( self, *, snapshot: Any, decision: Decision, intent: Intent, outcome: KernelOutcome | None = None, slot_dict: dict[str, Any] | None = None, phase: str = "step", market_state: Mapping[str, Any] | None = None, ) -> None: """Phase 2 — log the settled state + per-fill lifecycle rows. The state snapshot rows (account_events, position_state, status_snapshots) always reflect the current slot. The lifecycle rows (ENTRY_FILLED / PARTIAL_EXIT / EXIT / trade_events / trade_exit_legs) are emitted only when a fill is *evidenced* — a FULL/PARTIAL_FILL event in ``outcome.emitted_events``, a closed slot, or a slot whose size dropped vs the last leg snapshot. A resting LIMIT (ACK only) emits no terminal rows here. 
""" slot = slot_dict or {} stage = ( TradeStage(decision.stage.value) if hasattr(decision.stage, "value") else TradeStage(decision.stage) if isinstance(decision.stage, str) else TradeStage.ORDER_REQUESTED ) status = self._state_label(slot, phase) self._write_account_event(snapshot, decision, intent, stage=stage, slot_dict=slot) self._write_position_state(snapshot, decision, intent, slot_dict=slot, stage=stage, status=status, market_state=market_state) self._write_status_snapshot(snapshot, decision, intent, slot_dict=slot, phase=phase) if outcome is not None and outcome.diagnostic_code != KernelDiagnosticCode.OK: self._write_anomaly( snapshot, decision, intent, anomaly=outcome.diagnostic_code.value, origin="ditav2_kernel", detail=outcome.details, ) if outcome is None: # Decision-only step (HOLD): state snapshot already written. return events = tuple(outcome.emitted_events or ()) has_fill_evt = any( e.kind in (KernelEventKind.FULL_FILL, KernelEventKind.PARTIAL_FILL) for e in events ) slot_closed = bool(slot.get("closed", False)) cur_size = _safe_float(slot.get("size", 0.0), 0.0) slot_open = (not slot_closed) and cur_size > 0.0 if decision.action == DecisionAction.ENTER: # Emit ENTRY_FILLED only once the entry is actually filled (fill event # or an open slot). A resting LIMIT entry emits nothing here. 
if has_fill_evt or slot_open: self._leg_state[intent.trade_id] = { "prev_realized": 0.0, "prev_size": _safe_float( slot.get("initial_size", slot.get("size", 0.0)), 0.0 ) or _safe_float(intent.target_size, 0.0), "prev_leg_id": "", } self._write_trade_reconstruction( snapshot, intent.trade_id, event_type="ENTRY_FILLED", event_id=f"{intent.trade_id}:entry", payload={ "decision": _decision_summary(decision), "intent": _intent_summary(intent), "outcome": _outcome_summary(outcome), "slot": slot, "market_state": _json_safe(market_state or {}), }, market_state=market_state, ) return if decision.action != DecisionAction.EXIT: return # An exit leg is evidenced by a fill event, a closed slot, or a drop in # remaining size vs the previous leg snapshot. A resting LIMIT exit (no # size change) emits nothing until the async-fill pump observes the fill. prev_size = _safe_float(self._leg_state.get(intent.trade_id, {}).get("prev_size", 0.0), 0.0) exit_filled = has_fill_evt or slot_closed or (prev_size - cur_size > 1e-12) if not exit_filled: return partial = (not slot_closed) and cur_size > 0.0 # Extract the fill price from emitted venue events (G21 fix): the actual # exchange fill price lives in the FULL_FILL/PARTIAL_FILL event, not in # the slot dict. Pass it explicitly so _write_trade_event does not fall # back to entry_price. fill_price_hint = 0.0 for ev in events: p_val = getattr(ev, "price", 0.0) if p_val and math.isfinite(float(p_val)) and float(p_val) > 0: fill_price_hint = float(p_val) break # One trade_exit_legs row per exit leg (partial or final), BLUE-schema # compatible so PINK multi-exit trades reconcile against the same table. 
self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome, fill_price_hint=fill_price_hint) self._write_trade_reconstruction( snapshot, intent.trade_id, event_type="PARTIAL_EXIT" if partial else "EXIT", event_id=f"{intent.trade_id}:{'partial' if partial else 'close'}", payload={ "decision": _decision_summary(decision), "intent": _intent_summary(intent), "outcome": _outcome_summary(outcome), "slot": slot, "market_state": _json_safe(market_state or {}), }, market_state=market_state, ) # Terminal trade event. if slot_closed: self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state, exit_price_hint=fill_price_hint) def persist_fill_events( self, *, snapshot: Any, events: Any, slot_dict: dict[str, Any] | None = None, market_state: Mapping[str, Any] | None = None, ) -> None: """Persist a late (async) venue fill drained by the runtime pump. There is no fresh policy decision for an async fill, so we synthesize a minimal Decision/Intent from the post-fill slot + event and route it through :meth:`persist_result`. Direction (ENTER vs EXIT) is inferred from the slot: a closed slot or a drop in remaining size vs the last leg snapshot is an EXIT; otherwise an opening fill is an ENTER. Capital authority remains the kernel — this only logs the settled result. """ slot = slot_dict or {} event_list = tuple(events or ()) trade_id = str(slot.get("trade_id") or "") asset = str(slot.get("asset") or "") side = self._slot_side(slot) closed = bool(slot.get("closed", False)) cur_size = self._slot_size(slot) leverage = _safe_float(slot.get("leverage", 1.0), 1.0) # Extract fill price from venue events (used as exit_price_hint for G21 fix). 
price = next( (float(getattr(e, "price", 0.0)) for e in event_list if getattr(e, "price", 0.0) and math.isfinite(float(getattr(e, "price", 0.0)))), 0.0, ) or self._slot_entry_price(slot) prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0) is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12) action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER ts = getattr(snapshot, "timestamp", datetime.now(timezone.utc)) decision = Decision( timestamp=ts, decision_id=trade_id or "async", asset=asset, action=action, side=side, reason="ASYNC_FILL", confidence=0.0, velocity_divergence=0.0, irp_alignment=0.0, reference_price=price, target_size=cur_size, leverage=leverage, stage=TradeStage.POSITION_UPDATED, metadata={}, ) intent = Intent( timestamp=ts, trade_id=trade_id, decision_id=trade_id or "async", asset=asset, action=action, side=side, reason="ASYNC_FILL", target_size=cur_size, leverage=leverage, reference_price=price, confidence=0.0, exit_leg_ratios=tuple(slot.get("exit_leg_ratios", (1.0,)) or (1.0,)), metadata={}, ) outcome = KernelOutcome( accepted=True, slot_id=int(slot.get("slot_id", 0) or 0), trade_id=trade_id, state=KernelStage.CLOSED if closed else KernelStage.POSITION_OPEN, diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO, transitions=(), emitted_events=event_list, details={"origin": "async_fill_pump"}, ) self.persist_result( snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, slot_dict=slot, phase="async_fill", market_state=market_state, ) def persist_recovery_state( self, *, snapshot: Any, acc_dict: dict[str, Any] | None = None, phase: str = "recovery", event_type: str = "RECOVERY", market_state: Mapping[str, Any] | None = None, ) -> None: """Persist recovery-only state after kernel reconcile. A15 fix: acc_dict is the kernel account snapshot (capital/equity/pnl), not a slot dict. 
Read the actual slot from kernel.slot(0) so that trade_id, asset, size, and entry_price are correctly populated in the recovery rows. """ # A15: read slot from kernel instead of misusing acc_dict as slot dict. slot_dict: dict[str, Any] = {} if self._kernel is not None: try: slot_view = self._kernel.slot(0) raw = slot_view.to_dict() if hasattr(slot_view, "to_dict") else {} slot_dict = dict(raw) if raw else {} except Exception: pass trade_id = ( str(slot_dict.get("trade_id") or "") or (str(acc_dict.get("trade_id", "")) if acc_dict else "") ) self._write_status_snapshot( snapshot, decision=None, intent=None, slot_dict=slot_dict, phase=phase, ) self._write_account_event( snapshot, decision=None, intent=None, stage=TradeStage.TRADE_TERMINAL_WRITTEN, slot_dict=slot_dict, event_type=event_type, ) self._write_position_state( snapshot, decision=None, intent=None, slot_dict=slot_dict, stage=TradeStage.TRADE_TERMINAL_WRITTEN, status=self._state_label(slot_dict, phase), market_state=market_state, ) self._write_trade_reconstruction( snapshot, trade_id=trade_id, event_type=event_type, event_id=f"recovery:{phase}", payload={"acc_dict": _json_safe(acc_dict or {}), "slot": _json_safe(slot_dict), "phase": phase, "market_state": _json_safe(market_state or {})}, market_state=market_state, ) def record_anomaly( self, *, snapshot: Any, decision: Any, intent: Any, anomaly: str, origin: str = "emergent", sensor: str = "", detail: Any = "", rm_meta: float = 0.0, ) -> None: """Persist a DITA anomaly row with legacy-compatible shape.""" self._sink( "anomaly_events", { "ts": snapshot.timestamp.isoformat(), "decision_id": decision.decision_id, "trade_id": intent.trade_id, "symbol": intent.asset, "anomaly": anomaly, "origin": origin, "sensor": sensor, "detail": _json_text(detail) if not isinstance(detail, str) else detail, "rm_meta": float(rm_meta), }, ) # ------------------------------------------------------------------ # Internal helpers # 
------------------------------------------------------------------ @staticmethod def _state_label(slot_dict: dict[str, Any], phase: str) -> str: if slot_dict.get("closed", False): return "CLOSED" if slot_dict.get("size", 0) > 0: if phase.lower().startswith("recovery"): return "RECOVERED_OPEN" return "OPEN" return "FLAT" def _posture(self, slot_dict: dict[str, Any]) -> str: if slot_dict.get("closed", False) or not slot_dict.get("size", 0): return "FLAT" return str(slot_dict.get("side", "FLAT")) def _slot_entry_price(self, slot_dict: dict[str, Any]) -> float: return _safe_float(slot_dict.get("entry_price", 0.0), 0.0) def _slot_size(self, slot_dict: dict[str, Any]) -> float: return _safe_float(slot_dict.get("size", 0.0), 0.0) def _slot_side(self, slot_dict: dict[str, Any]) -> TradeSide: raw = str(slot_dict.get("side", "FLAT")).upper() if raw == "SHORT": return TradeSide.SHORT if raw == "LONG": return TradeSide.LONG return TradeSide.FLAT def _slot_trade_id(self, slot_dict: dict[str, Any]) -> str: return str(slot_dict.get("trade_id", "")) def _slot_asset(self, slot_dict: dict[str, Any]) -> str: return str(slot_dict.get("asset", "")) # ------------------------------------------------------------------ # Row writers # ------------------------------------------------------------------ def _write_anomaly( self, snapshot: Any, decision: Decision, intent: Intent, *, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "", ) -> None: self._sink("anomaly_events", { "ts": snapshot.timestamp.isoformat(), "decision_id": decision.decision_id, "trade_id": intent.trade_id, "symbol": intent.asset, "anomaly": anomaly, "origin": origin, "sensor": "", "detail": _json_text(detail) if not isinstance(detail, str) else detail, "rm_meta": 0.0, }) def _write_policy_event( self, snapshot: Any, decision: Decision, intent: Intent, *, phase: str, ) -> None: price = _safe_float(decision.reference_price, 0.0) quantity = _safe_float(intent.target_size, 0.0) row = { "ts": 
snapshot.timestamp.isoformat(), "strategy": self.config.strategy, "runtime_namespace": self.config.runtime_namespace, "strategy_namespace": self.config.strategy_namespace, "event_namespace": self.config.event_namespace, "actor_name": self.config.actor_name, "exec_venue": self.config.exec_venue, "data_venue": self.config.data_venue, "source": "ditav2", "trade_id": intent.trade_id, "asset": decision.asset, "side": decision.side.value, "entry_price": price, "current_price": price, "quantity": quantity, "notional": _notional(quantity, price), "leverage": _safe_float(intent.leverage, 1.0), "bar_idx": 0, "decision_seq": self._trade_seq(), "bars_held": int(intent.bars_held or 0), "action": decision.action.value, "reason": decision.reason, "pnl_pct": 0.0, "mfe": 0.0, "mae": 0.0, "mfe_risk": 0.0, "mae_risk": 0.0, "exit_pressure": 0.0, "rv_comp": 0.0, "mae_thresh1": 0.0, "bounce_score": 0.0, "bounce_risk": 0.0, "ob_imbalance": 0.0, "vel_div_entry": float(decision.velocity_divergence or 0.0), "vel_div_now": float(decision.velocity_divergence or 0.0), "v50_vel": 0.0, "v750_vel": 0.0, "exf_funding": 0.0, "exf_dvol": 0.0, "exf_fear_greed": 0.0, "exf_taker": 0.0, "posture": decision.side.value, } self._sink("policy_events", row) self._v7_sink("v7_decision_events", row) def _write_account_event( self, snapshot: Any, decision: Decision | None, intent: Intent | None, *, stage: TradeStage, slot_dict: dict[str, Any], event_type: str | None = None, ) -> None: capital = self._capital() peak_cap = self._peak_capital() is_open = not slot_dict.get("closed", False) and slot_dict.get("size", 0) > 0 open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0 drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap) row = { "ts": snapshot.timestamp.isoformat(), "event_type": event_type or stage.value, "strategy": self.config.strategy, "posture": self._posture(slot_dict), "capital": capital, "peak_capital": peak_cap, 
"drawdown_pct": drawdown_pct, "pnl_today": float(self.account.snapshot.realized_pnl or 0.0), "trades_today": self._trade_seq(), "open_positions": 1 if is_open else 0, "boost": 1.0, "beta": 0.0, "current_open_notional": open_notional, "current_account_leverage": 0.0 if capital <= 0 else open_notional / capital, "exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))), "exchange_leverage_mode": self.config.exchange_leverage_mode, "leverage_mapping_rule": self.config.leverage_mapping_rule, "runtime_namespace": self.config.runtime_namespace, "strategy_namespace": self.config.strategy_namespace, "event_namespace": self.config.event_namespace, "actor_name": self.config.actor_name, "exec_venue": self.config.exec_venue, "data_venue": self.config.data_venue, "notes": _json_text({ "decision_id": None if decision is None else decision.decision_id, "trade_id": None if intent is None else intent.trade_id, "reason": None if intent is None else intent.reason, "stage": stage.value, }), # Phase 4: kernel atomic account versioning "account_event_seq": self._account_event_seq(), "reconcile_status": self._kernel_account().get("reconcile_status", "OK"), } self._sink("account_events", row) # Write a reconcile_events record whenever E-facts are present ka = self._kernel_account() if ka.get("e_wallet_balance", 0.0) > 0: self.write_reconcile_event( event_seq=ka.get("event_seq", 0), ts=snapshot.timestamp, k_capital=ka.get("k_capital", 0.0), e_wallet_balance=ka.get("e_wallet_balance", 0.0), delta=ka.get("reconcile_delta", 0.0), status=ka.get("reconcile_status", "OK"), explanation=ka.get("reconcile_explanation", ""), ) def _write_position_state( self, snapshot: Any, decision: Decision | None, intent: Intent | None, *, slot_dict: dict[str, Any], stage: TradeStage, status: str, market_state: Mapping[str, Any] | None = None, ) -> None: side = self._slot_side(slot_dict) trade_id = self._slot_trade_id(slot_dict) asset = self._slot_asset(slot_dict) if not trade_id and 
intent is not None: trade_id = intent.trade_id asset = intent.asset side = intent.side row = { "ts": snapshot.timestamp.isoformat(), "trade_id": trade_id, "asset": asset, "direction": _direction(side), "entry_price": self._slot_entry_price(slot_dict), "quantity": self._slot_size(slot_dict), "notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)), "leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0), "bucket_id": -1, # A14 fix: active_leg_index is the exit-leg counter, not the bar count. # Use intent.bars_held when available; fall back to 0. "entry_bar": int(intent.bars_held if intent is not None else 0) or 0, "status": status, "exit_reason": slot_dict.get("close_reason", ""), "pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0), "bars_held": 0, "market_state_bundle_json": _json_text(market_state or {}), "tp_base_pct": 0.0, "tp_effective_pct": 0.0, "our_leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0), } self._sink("position_state", row) def _write_status_snapshot( self, snapshot: Any, decision: Decision | None, intent: Intent | None, *, slot_dict: dict[str, Any], phase: str, ) -> None: capital = self._capital() peak_cap = self._peak_capital() is_open = not slot_dict.get("closed", False) and slot_dict.get("size", 0) > 0 open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0 leverage = 0.0 if capital <= 0 else open_notional / capital drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap) row = { "ts": snapshot.timestamp.isoformat(timespec="milliseconds"), "capital": capital, "roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0, "dd_pct": drawdown * 100.0, "trades_executed": self._trade_seq(), "posture": self._posture(slot_dict), "rm": 1.0 if decision is None else max(0.0, min(1.0, decision.confidence)), "vel_div": 0.0 if decision is None else float(decision.velocity_divergence), 
"vol_ok": 1, "phase": phase, "mhs_status": "GREEN", "boost": 1.0, "cat5": 0.0, "conviction_multiplier": 0.0 if intent is None else float(intent.confidence or 0.0), "exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))), "exchange_leverage_mode": self.config.exchange_leverage_mode, "leverage_mapping_rule": self.config.leverage_mapping_rule, "account_capital": capital, "portfolio_capital": capital, "current_open_notional": open_notional, "current_account_leverage": leverage, "remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional), "max_account_leverage": self.config.max_account_leverage, "ledger_authority": self.config.ledger_authority, } self._sink("status_snapshots", row) def _write_trade_exit_leg( self, snapshot: Any, decision: Decision, intent: Intent, slot_dict: dict[str, Any], outcome: KernelOutcome | None, fill_price_hint: float = 0.0, ) -> None: """Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg. The DITAv2 kernel uses a single slot with sequential exit legs rather than BLUE's chained per-leg trade_ids, so the chain_* columns describe the leg sequence within this one trade (root = trade_id). Per-leg deltas (exit_qty, pnl_leg) are computed against the previous leg's snapshot held in ``self._leg_state`` so each row is isolated, not cumulative. """ trade_id = intent.trade_id prev = self._leg_state.get(trade_id) or { "prev_realized": 0.0, "prev_size": _safe_float(slot_dict.get("initial_size", 0.0), 0.0), "prev_leg_id": "", } entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0) # G21 fix: use fill price hint (actual exchange fill) > intent ref > decision ref. 
exit_price = ( fill_price_hint or _safe_float(intent.reference_price, 0.0) or _safe_float(decision.reference_price, 0.0) ) side = self._slot_side(slot_dict) if side == TradeSide.FLAT: side = intent.side leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0) cur_size = self._slot_size(slot_dict) cur_realized = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0, field="realized_pnl", trade_id=trade_id, sink=self._sink) prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0) prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0) # active_leg_index is post-fill (already advanced); the leg that just # filled is therefore one behind. Clamp to a valid ratio index. ratios = slot_dict.get("exit_leg_ratios", []) or [] leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1) fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0 # External-position fix: if leg_state was never seeded (position detected via # reconcile/pump rather than our own ENTER), prev_size=0 would make exit_qty=0. # Fall back to initial_size or intent.target_size so the leg row is meaningful. if prev_size <= 1e-12: # No prior leg tracking — use the slot's initial_size or intent size. 
initial = _safe_float(slot_dict.get("initial_size", 0.0), 0.0) or _safe_float(intent.target_size, 0.0) exit_qty = initial - cur_size if initial > cur_size else initial else: exit_qty = max(0.0, prev_size - cur_size) pnl_leg = cur_realized - prev_realized capital_after = self._capital() capital_before = capital_after - pnl_leg exit_notional = _notional(exit_qty, exit_price or entry_price) remaining_notional = _notional(cur_size, entry_price) denom = abs(exit_qty * entry_price * max(leverage_val, 1e-9)) pnl_pct_leg = pnl_leg / denom if denom > 0 else 0.0 exit_leg_id = f"{trade_id}:leg{leg_index}" self._sink("trade_exit_legs", { "ts": snapshot.timestamp.isoformat(), "date": snapshot.timestamp.date().isoformat(), "strategy": self.config.strategy, "trade_id": trade_id, "chain_root_trade_id": trade_id, "chain_head_leg_id": f"{trade_id}:leg0", "chain_prev_leg_id": str(prev.get("prev_leg_id", "") or ""), "chain_seq": leg_index, "chain_token": trade_id, "chain_mode": "LIVE", "exit_leg_id": exit_leg_id, "exit_seq": leg_index, "command_id": decision.decision_id, "source": "ditav2", "reason": intent.reason, "asset": intent.asset, "side": side.value, "entry_price": entry_price, "exit_price": exit_price, "exit_qty": exit_qty, "fraction": fraction, "capital_before": capital_before, "capital_after": capital_after, "exit_notional": exit_notional, "remaining_notional": remaining_notional, "remaining_qty": cur_size, "pnl_pct_leg": pnl_pct_leg, "pnl_leg": pnl_leg, "pnl_realized_total": cur_realized, "bars_held": int(intent.bars_held or 0), }) # Advance the per-trade leg snapshot for the next leg's delta. 
self._leg_state[trade_id] = {
    "prev_realized": cur_realized,
    "prev_size": cur_size,
    "prev_leg_id": exit_leg_id,
}


def _write_trade_event(
    self,
    snapshot: Any,
    decision: Decision,
    intent: Intent,
    slot_dict: dict[str, Any],
    outcome: KernelOutcome | None,
    *,
    market_state: Mapping[str, Any] | None = None,
    exit_price_hint: float = 0.0,
) -> None:
    """Emit one ``trade_events`` row (phase ``terminal_close``) to the sink.

    Args:
        snapshot: Object exposing ``timestamp``; supplies ``ts`` and ``date``.
        decision: Decision behind the close; supplies ``decision_id``,
            ``velocity_divergence`` and fallback price/confidence.
        intent: Intent being closed; supplies trade id, asset, side, reason,
            bars held, metadata and fallback price/size/leverage.
        slot_dict: Slot state mapping. Keys read here: ``entry_price``,
            ``initial_size`` (falling back to ``size``), ``realized_pnl``
            and ``leverage``. The whole mapping is also serialised into
            ``exit_payload_json``.
        outcome: Kernel outcome, summarised into the exit/execution payloads.
        market_state: Optional bundle serialised verbatim into
            ``market_state_bundle_json``; ``None`` is written as ``{}``.
        exit_price_hint: Explicit exit (fill) price. Takes priority over any
            other price source when it is a finite, non-zero number.
    """
    entry_price = (
        _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
        or _safe_float(intent.reference_price, 0.0)
    )
    quantity = (
        _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0)
        or _safe_float(intent.target_size, 0.0)
    )

    # G21 fix: exit_price is the fill/order price, NOT the entry price.
    # Priority: explicit hint (fill event price) > intent reference price > decision price.
    # Fall back to entry_price only as absolute last resort (avoids the G21 bug where
    # every trade_events row had exit_price == entry_price and PnL reconstruction was zero).
    # The hint is sanitised first: NaN/inf is truthy, so a raw non-finite hint
    # would win the `or` chain and be persisted as the exit price.
    exit_price = (
        _safe_float(exit_price_hint, 0.0)
        or _safe_float(intent.reference_price, 0.0)
        or _safe_float(decision.reference_price, 0.0)
        or _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
    )

    trade_id = intent.trade_id
    pnl = _checked_float(
        slot_dict.get("realized_pnl", 0.0),
        0.0,
        field="realized_pnl",
        trade_id=trade_id,
        sink=self._sink,
    )

    leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
    # PnL as a fraction of the levered entry notional; 0.0 when that is zero.
    denom = abs(quantity * entry_price * max(leverage_val, 1e-9))
    pnl_pct = pnl / denom if denom > 0 else 0.0

    capital_after = self._capital()
    capital_before = capital_after - pnl
    # Read the peak once so peak_capital and drawdown_at_entry in this row
    # are computed from the same value.
    peak_capital = self._peak_capital()
    if peak_capital <= 0:
        drawdown = 0.0
    else:
        drawdown = max(0.0, (peak_capital - capital_before) / peak_capital)

    open_notional = _notional(quantity, exit_price or entry_price)
    # Non-finite confidence / divergence values are sanitised like the other
    # numeric fields instead of being written through as NaN.
    conviction = (
        _safe_float(intent.confidence, 0.0)
        or _safe_float(decision.confidence, 0.0)
    )
    vel_div_entry = _safe_float(decision.velocity_divergence, 0.0)
    # A missing (None) metadata mapping is treated as empty.
    metadata = intent.metadata or {}
    outcome_summary = _outcome_summary(outcome)
    max_account_leverage = self.config.max_account_leverage

    row = {
        "ts": snapshot.timestamp.isoformat(),
        "date": snapshot.timestamp.date().isoformat(),
        "strategy": self.config.strategy,
        "trade_id": trade_id,
        "asset": intent.asset,
        "side": intent.side.value,
        "entry_price": entry_price,
        "exit_price": exit_price,
        "quantity": quantity,
        "pnl": pnl,
        "pnl_pct": pnl_pct,
        "exit_reason": intent.reason,
        "vel_div_entry": vel_div_entry,
        "boost_at_entry": 1.0,
        "beta_at_entry": 0.0,
        "posture": intent.side.value,
        "leverage": leverage_val,
        "conviction_multiplier": conviction,
        "exchange_leverage": int(round(leverage_val)),
        "exchange_leverage_mode": self.config.exchange_leverage_mode,
        "leverage_mapping_rule": self.config.leverage_mapping_rule,
        "runtime_namespace": self.config.runtime_namespace,
        "strategy_namespace": self.config.strategy_namespace,
        "event_namespace": self.config.event_namespace,
        "actor_name": self.config.actor_name,
        "exec_venue": self.config.exec_venue,
        "data_venue": self.config.data_venue,
        "account_capital": capital_after,
        "portfolio_capital": capital_after,
        "current_open_notional": open_notional,
        "remaining_notional_capacity": max(
            0.0, max_account_leverage * capital_after - open_notional
        ),
        "max_account_leverage": max_account_leverage,
        "margin_required": 0.0 if leverage_val <= 0 else open_notional / leverage_val,
        "ledger_authority": self.config.ledger_authority,
        "regime_signal": 0,
        "capital_before": capital_before,
        "capital_after": capital_after,
        "peak_capital": peak_capital,
        "drawdown_at_entry": drawdown,
        "open_positions_count": 0,
        "scan_uuid": decision.decision_id,
        "bars_held": int(intent.bars_held or 0),
        "entry_payload_json": _json_text(
            {"decision": _decision_summary(decision), "intent": _intent_summary(intent)}
        ),
        "exit_payload_json": _json_text(
            {"outcome": outcome_summary, "slot": _json_safe(slot_dict)}
        ),
        "execution_payload_json": _json_text({"outcome": outcome_summary}),
        "friction_payload_json": _json_text({"fees": 0.0}),
        "event_payload_json": _json_text({"phase": "terminal_close", "trade_id": trade_id}),
        "market_state_bundle_json": _json_text(market_state or {}),
        "tp_base_pct": _safe_float(metadata.get("tp_base_pct", 0.0), 0.0),
        "tp_effective_pct": _safe_float(metadata.get("tp_effective_pct", 0.0), 0.0),
        "our_leverage": _safe_float(metadata.get("our_leverage", 0.0), 0.0),
    }
    self._sink("trade_events", row)


def _write_trade_reconstruction(
    self,
    snapshot: Any,
    trade_id: str,
    *,
    event_type: str,
    event_id: str,
    payload: Any,
    market_state: Mapping[str, Any] | None = None,
) -> None:
    """Emit one ``trade_reconstruction`` row to the sink.

    Args:
        snapshot: Object exposing ``timestamp``; supplies the row's ``ts``.
        trade_id: Trade the event belongs to.
        event_type: Written as-is into the ``event_type`` column.
        event_id: Written as-is into the ``event_id`` column.
        payload: Arbitrary value serialised with ``_json_text`` into
            ``payload_json``.
        market_state: Optional bundle serialised into
            ``market_state_bundle_json``; ``None`` is written as ``{}``.
    """
    row = {
        "ts": snapshot.timestamp.isoformat(),
        "trade_id": trade_id,
        "event_type": event_type,
        "event_id": event_id,
        "payload_json": _json_text(payload),
        "market_state_bundle_json": _json_text(market_state or {}),
    }
    self._sink("trade_reconstruction", row)