"""Account projection for DITAv2.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Dict, Iterable, List, Optional import math import time from .contracts import TradeSide, TradeSlot, TradeStage from .utils import safe_float @dataclass class AccountSnapshot: """Derived account state.""" capital: float equity: float realized_pnl: float = 0.0 unrealized_pnl: float = 0.0 open_positions: int = 0 open_notional: float = 0.0 fees_paid: float = 0.0 trade_seq: int = 0 peak_capital: float = 0.0 @property def leverage(self) -> float: if self.capital <= 0 or self.open_notional <= 0: return 0.0 return self.open_notional / self.capital @dataclass class AccountProjection: """Aggregate account view over all active slots.""" runtime_namespace: str = "dita_v2" strategy_namespace: str = "dita_v2" event_namespace: str = "dita_v2" actor_name: str = "ExecutionKernel" exec_venue: str = "bingx" data_venue: str = "binance" ledger_authority: str = "exchange" min_capital: float = 0.0 max_capital: Optional[float] = None snapshot: AccountSnapshot = field(default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0)) def observe_slots(self, slots: Iterable[TradeSlot]) -> None: open_positions = 0 open_notional = 0.0 unrealized_pnl = 0.0 for slot in slots: if slot.closed or slot.size <= 0: continue if slot.fsm_state in {TradeStage.POSITION_OPEN, TradeStage.POSITION_OPENED, TradeStage.ENTRY_WORKING, TradeStage.EXIT_WORKING}: open_positions += 1 mark = safe_float(slot.entry_price, 0.0) mark = safe_float(slot.metadata.get("mark_price"), mark) open_notional += abs(slot.size) * abs(mark) unrealized_pnl += float(slot.unrealized_pnl or 0.0) self.snapshot.open_positions = open_positions self.snapshot.open_notional = open_notional self.snapshot.unrealized_pnl = unrealized_pnl self.snapshot.equity = self.snapshot.capital + unrealized_pnl if not math.isfinite(self.snapshot.equity): self.snapshot.equity = self.snapshot.capital if open_notional > 0 and self.snapshot.capital > 0: self.snapshot.peak_capital = max(self.snapshot.peak_capital, self.snapshot.capital) def settle(self, realized_pnl: float, fees: float = 0.0) -> None: realized_pnl = safe_float(realized_pnl, 0.0) new_capital = safe_float(self.snapshot.capital + realized_pnl, self.snapshot.capital) if self.max_capital is not None: new_capital = min(new_capital, self.max_capital) new_capital = max(self.min_capital, new_capital) self.snapshot.capital = new_capital self.snapshot.realized_pnl += realized_pnl self.snapshot.fees_paid += safe_float(fees, 0.0) self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl if not math.isfinite(self.snapshot.equity): self.snapshot.equity = self.snapshot.capital def to_account_event( self, *, timestamp: datetime, trade_id: str, asset: str, side: TradeSide, stage: TradeStage, reason: str, pnl: float = 0.0, pnl_pct: float = 0.0, bars_held: int = 0, metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl return { "timestamp": timestamp.isoformat() if hasattr(timestamp, "isoformat") else str(timestamp), "runtime_namespace": self.runtime_namespace, "strategy_namespace": self.strategy_namespace, "event_namespace": self.event_namespace, "actor_name": self.actor_name, "exec_venue": self.exec_venue, "data_venue": self.data_venue, "ledger_authority": self.ledger_authority, "capital": float(self.snapshot.capital), "equity": float(self.snapshot.equity), "open_positions": int(self.snapshot.open_positions), "current_open_notional": float(self.snapshot.open_notional), "current_account_leverage": float(self.snapshot.leverage), "trade_id": trade_id, "asset": asset, "side": side.value, "reason": reason, "stage": stage.value, "pnl": float(pnl), "pnl_pct": float(pnl_pct), "bars_held": int(bars_held), "metadata": dict(metadata or {}), } # --------------------------------------------------------------------------- # V2 — Dual-ledger, event-sourced, reconciled account (spec G2) # --------------------------------------------------------------------------- class ReconcileStatus(str, Enum): OK = "OK" WARN = "WARN" ERROR = "ERROR" @dataclass(frozen=True) class KBlock: """Kernel-computed values — derived deterministically from the E-fact stream.""" capital: float = 0.0 # seed + Σrealized − Σfee − Σfunding realized_pnl: float = 0.0 unrealized_pnl: float = 0.0 fees_paid: float = 0.0 funding_paid: float = 0.0 open_notional: float = 0.0 # Σ|qty|·mark equity: float = 0.0 # capital + unrealized used_margin: float = 0.0 # Σ notional/leverage available_margin: float = 0.0 # capital − used_margin open_positions: int = 0 peak_capital: float = 0.0 @dataclass(frozen=True) class EPosition: """Single open position as reported by the exchange.""" symbol: str = "" qty: float = 0.0 entry_price: float = 0.0 mark_price: float = 0.0 unrealized_pnl: float = 0.0 leverage: float = 1.0 side: str = "" @dataclass(frozen=True) class EBlock: """Exchange facts — values only the exchange can know.""" wallet_balance: float = 0.0 available_margin: float = 0.0 used_margin: float = 0.0 maint_margin: float = 0.0 positions: tuple = () # tuple[EPosition, ...] last_fill_price: float = 0.0 last_fill_qty: float = 0.0 last_fill_fee: float = 0.0 last_fill_realized_pnl: float = 0.0 last_funding: float = 0.0 @dataclass(frozen=True) class ReconcileResult: """Classification of K-vs-E divergence for one snapshot.""" status: ReconcileStatus = ReconcileStatus.OK deltas: Dict[str, float] = field(default_factory=dict) explanations: List[str] = field(default_factory=list) worst_field: str = "" ts: float = 0.0 def __post_init__(self) -> None: # frozen dataclass — use object.__setattr__ only in __post_init__ if not isinstance(self.deltas, dict): object.__setattr__(self, "deltas", {}) if not isinstance(self.explanations, list): object.__setattr__(self, "explanations", []) @dataclass(frozen=True) class AccountSnapshotV2: """ Immutable versioned snapshot — the atomic unit of account truth. Each exchange event produces exactly one new snapshot; readers hold a reference and are never exposed to a partially-updated state. """ event_seq: int source_event_id: str k: KBlock e: EBlock reconcile: ReconcileResult ts: float = 0.0 @dataclass class ReconcileConfig: """ Bounds for the R1–R6 reconcile rules. All values are config-driven; no magic numbers in the classifier itself. """ capital_epsilon: float = 1e-4 # |δ| < ε → OK (R1, absolute USDT) pending_fee_bound: float = 20.0 # max unsettled fees still in-flight (R1) realized_rounding: float = 0.05 # fee+rounding tolerance for R2 lot_step: float = 0.001 # position qty lot-step for R3 mark_staleness_factor: float = 0.003 # 0.3% mark-price drift tolerance (R4) leverage_rounding_band: float = 2.0 # margin rounding band USDT (R5) def _safe(v: Any, default: float = 0.0) -> float: try: f = float(v) return f if math.isfinite(f) else default except (TypeError, ValueError): return default class AccountProjectionV2: """ Dual-ledger account — tracks K-values (kernel fold) and E-facts (exchange push) independently, reconciles each event, and publishes immutable AccountSnapshotV2 instances. Thread-safety note: Python's GIL makes reference replacement of `_snapshot` atomic for single-field reads. For multi-field consistency callers must hold `_snapshot` locally: `snap = proj.snapshot`. """ def __init__( self, seed_capital: float, *, min_capital: float = 0.0, max_capital: Optional[float] = None, reconcile_config: Optional[ReconcileConfig] = None, ) -> None: self._seed = _safe(seed_capital, 0.0) self._min_capital = min_capital self._max_capital = max_capital self._cfg = reconcile_config or ReconcileConfig() # Running K-value accumulators self._k_realized: float = 0.0 self._k_fees: float = 0.0 self._k_funding: float = 0.0 self._peak_capital: float = self._seed # Latest E-facts (mutable intermediate; frozen into EBlock at snapshot time) self._e_wallet_balance: float = 0.0 self._e_avail_margin: float = 0.0 self._e_used_margin: float = 0.0 self._e_maint_margin: float = 0.0 self._e_positions: List[EPosition] = [] self._e_last_fill_price: float = 0.0 self._e_last_fill_qty: float = 0.0 self._e_last_fill_fee: float = 0.0 self._e_last_fill_realized: float = 0.0 self._e_last_funding: float = 0.0 self._event_seq: int = 0 self._snapshot: AccountSnapshotV2 = self._build(0, "", [], time.time()) # ------------------------------------------------------------------ # E-fact ingestion (called from WS event handlers) # ------------------------------------------------------------------ def apply_fill( self, *, fill_price: float, fill_qty: float, fee: float, realized_pnl: float, ) -> None: self._k_realized += _safe(realized_pnl) self._k_fees += _safe(fee) self._e_last_fill_price = _safe(fill_price) self._e_last_fill_qty = _safe(fill_qty) self._e_last_fill_fee = _safe(fee) self._e_last_fill_realized = _safe(realized_pnl) def apply_funding(self, amount: float) -> None: self._k_funding += _safe(amount) self._e_last_funding = _safe(amount) def apply_balance_update( self, *, wallet_balance: float, available_margin: float, used_margin: float, maint_margin: float, ) -> None: self._e_wallet_balance = _safe(wallet_balance) self._e_avail_margin = _safe(available_margin) self._e_used_margin = _safe(used_margin) self._e_maint_margin = _safe(maint_margin) def apply_position_update(self, positions: List[EPosition]) -> None: self._e_positions = list(positions) # ------------------------------------------------------------------ # Snapshot construction (called after each ingestion step) # ------------------------------------------------------------------ def build_snapshot( self, source_event_id: str, slots: Iterable[TradeSlot], ts: Optional[float] = None, ) -> AccountSnapshotV2: self._event_seq += 1 snap = self._build(self._event_seq, source_event_id, list(slots), ts or time.time()) self._snapshot = snap return snap @property def snapshot(self) -> AccountSnapshotV2: return self._snapshot @property def k_capital(self) -> float: raw = self._seed + self._k_realized - self._k_fees - self._k_funding if self._max_capital is not None: raw = min(raw, self._max_capital) return max(self._min_capital, raw) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _build( self, event_seq: int, source_event_id: str, slots: List[TradeSlot], ts: float, ) -> AccountSnapshotV2: open_notional, unrealized, used_margin, open_positions = self._scan_slots(slots) capital = self.k_capital self._peak_capital = max(self._peak_capital, capital) k = KBlock( capital=capital, realized_pnl=self._k_realized, unrealized_pnl=unrealized, fees_paid=self._k_fees, funding_paid=self._k_funding, open_notional=open_notional, equity=capital + unrealized, used_margin=used_margin, available_margin=max(0.0, capital - used_margin), open_positions=open_positions, peak_capital=self._peak_capital, ) e = EBlock( wallet_balance=self._e_wallet_balance, available_margin=self._e_avail_margin, used_margin=self._e_used_margin, maint_margin=self._e_maint_margin, positions=tuple(self._e_positions), last_fill_price=self._e_last_fill_price, last_fill_qty=self._e_last_fill_qty, last_fill_fee=self._e_last_fill_fee, last_fill_realized_pnl=self._e_last_fill_realized, last_funding=self._e_last_funding, ) reconcile = self._classify(k, e, ts) return AccountSnapshotV2( event_seq=event_seq, source_event_id=source_event_id, k=k, e=e, reconcile=reconcile, ts=ts, ) def _scan_slots( self, slots: List[TradeSlot] ) -> tuple: # (open_notional, unrealized, used_margin, open_count) open_notional = 0.0 unrealized = 0.0 used_margin = 0.0 open_positions = 0 for slot in slots: if slot.closed or slot.size <= 0: continue if slot.fsm_state not in { TradeStage.POSITION_OPEN, TradeStage.POSITION_OPENED, TradeStage.ENTRY_WORKING, TradeStage.EXIT_WORKING, }: continue open_positions += 1 mark = _safe(slot.metadata.get("mark_price") if slot.metadata else None, 0.0) if mark <= 0.0: mark = _safe(slot.entry_price, 0.0) notional = abs(slot.size) * mark open_notional += notional unrealized += _safe(slot.unrealized_pnl) lev = max(1.0, _safe(slot.metadata.get("leverage") if slot.metadata else None, 1.0)) used_margin += notional / lev return open_notional, unrealized, used_margin, open_positions def _classify(self, k: KBlock, e: EBlock, ts: float) -> ReconcileResult: """ Apply reconcile rules R1–R6 (spec §2.3). Returns a ReconcileResult with the worst status seen across all fields. """ cfg = self._cfg status = ReconcileStatus.OK deltas: Dict[str, float] = {} explanations: List[str] = [] worst_field = "" def _escalate(new: ReconcileStatus, field: str) -> None: nonlocal status, worst_field order = {ReconcileStatus.OK: 0, ReconcileStatus.WARN: 1, ReconcileStatus.ERROR: 2} if order[new] > order[status]: status = new worst_field = field # R1: capital vs wallet balance (only meaningful when E-facts are populated) if e.wallet_balance > 0: delta_r1 = abs(k.capital - e.wallet_balance) deltas["capital_vs_wallet"] = k.capital - e.wallet_balance if delta_r1 <= cfg.capital_epsilon: pass # OK elif delta_r1 <= cfg.pending_fee_bound: _escalate(ReconcileStatus.WARN, "capital_vs_wallet") explanations.append(f"UNSETTLED_FEE|capital_vs_wallet|delta={delta_r1:.4f}") else: _escalate(ReconcileStatus.ERROR, "capital_vs_wallet") explanations.append(f"ERROR|capital_vs_wallet|delta={delta_r1:.4f}") # R2: realized PnL vs exchange realized if e.last_fill_realized_pnl != 0: delta_r2 = abs(k.realized_pnl - e.last_fill_realized_pnl) deltas["realized_pnl"] = k.realized_pnl - e.last_fill_realized_pnl if delta_r2 <= cfg.capital_epsilon: pass elif delta_r2 <= cfg.realized_rounding: _escalate(ReconcileStatus.WARN, "realized_pnl") explanations.append(f"LOT_STEP_ROUNDING|realized_pnl|delta={delta_r2:.4f}") else: _escalate(ReconcileStatus.ERROR, "realized_pnl") explanations.append(f"ERROR|realized_pnl|delta={delta_r2:.4f}") # R3: position count (R6) + per-position qty (R3) e_pos_map = {p.symbol: p for p in e.positions} if len(e.positions) > 0: if k.open_positions != len(e_pos_map): deltas["open_positions"] = float(k.open_positions - len(e_pos_map)) _escalate(ReconcileStatus.ERROR, "open_positions") explanations.append( f"ERROR|open_positions|k={k.open_positions}|e={len(e_pos_map)}" ) # R4: open_notional vs exchange notional (mark staleness) if e.used_margin > 0 and k.open_notional > 0: delta_notional = abs(k.open_notional - e.used_margin) deltas["open_notional"] = k.open_notional - e.used_margin staleness_band = k.open_notional * cfg.mark_staleness_factor if delta_notional <= cfg.capital_epsilon: pass elif delta_notional <= staleness_band: _escalate(ReconcileStatus.WARN, "open_notional") explanations.append(f"MARK_PRICE_STALENESS|open_notional|delta={delta_notional:.4f}") else: _escalate(ReconcileStatus.ERROR, "open_notional") explanations.append(f"ERROR|open_notional|delta={delta_notional:.4f}") # R5: used/available margin if e.used_margin > 0: delta_margin = abs(k.used_margin - e.used_margin) deltas["used_margin"] = k.used_margin - e.used_margin if delta_margin <= cfg.capital_epsilon: pass elif delta_margin <= cfg.leverage_rounding_band: _escalate(ReconcileStatus.WARN, "used_margin") explanations.append(f"LEVERAGE_ROUNDING|used_margin|delta={delta_margin:.4f}") else: _escalate(ReconcileStatus.ERROR, "used_margin") explanations.append(f"ERROR|used_margin|delta={delta_margin:.4f}") return ReconcileResult( status=status, deltas=deltas, explanations=explanations, worst_field=worst_field, ts=ts, )