"""Account projection for DITAv2.""" from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from enum import Enum from typing import Any, Dict, Iterable, List, Optional import math import time from .contracts import TradeSide, TradeSlot, TradeStage from .utils import safe_float @dataclass class AccountSnapshot: """Derived account state.""" capital: float equity: float realized_pnl: float = 0.0 unrealized_pnl: float = 0.0 open_positions: int = 0 open_notional: float = 0.0 fees_paid: float = 0.0 trade_seq: int = 0 peak_capital: float = 0.0 # E-anchored provenance (Phase 1): "seed" | "e_anchored" | "k_bridged" capital_source: str = "seed" e_wallet_balance: float = 0.0 event_seq: int = 0 @property def leverage(self) -> float: if self.capital <= 0 or self.open_notional <= 0: return 0.0 return self.open_notional / self.capital @dataclass class AccountProjection: """Aggregate account view over all active slots.""" runtime_namespace: str = "dita_v2" strategy_namespace: str = "dita_v2" event_namespace: str = "dita_v2" actor_name: str = "ExecutionKernel" exec_venue: str = "bingx" data_venue: str = "binance" ledger_authority: str = "exchange" min_capital: float = 0.0 max_capital: Optional[float] = None snapshot: AccountSnapshot = field(default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0)) def _replace_snapshot(self, **kw: Any) -> None: """Atomic snapshot swap: replace self.snapshot with a new frozen AccountSnapshot. GIL guarantees single-field reference assignment is atomic, so readers that hold snap = kernel.account.snapshot before use see a consistent view. """ cur = self.snapshot self.snapshot = AccountSnapshot( capital=kw.get("capital", cur.capital), equity=kw.get("equity", cur.equity), realized_pnl=kw.get("realized_pnl", cur.realized_pnl), unrealized_pnl=kw.get("unrealized_pnl", cur.unrealized_pnl), open_positions=kw.get("open_positions", cur.open_positions), open_notional=kw.get("open_notional", cur.open_notional), fees_paid=kw.get("fees_paid", cur.fees_paid), trade_seq=kw.get("trade_seq", cur.trade_seq), peak_capital=kw.get("peak_capital", cur.peak_capital), capital_source=kw.get("capital_source", cur.capital_source), e_wallet_balance=kw.get("e_wallet_balance", cur.e_wallet_balance), event_seq=kw.get("event_seq", cur.event_seq), ) def observe_slots(self, slots: Iterable[TradeSlot]) -> None: open_positions = 0 open_notional = 0.0 unrealized_pnl = 0.0 for slot in slots: if slot.closed or slot.size <= 0: continue if slot.fsm_state in {TradeStage.POSITION_OPEN, TradeStage.POSITION_OPENED, TradeStage.ENTRY_WORKING, TradeStage.EXIT_WORKING}: open_positions += 1 mark = safe_float(slot.entry_price, 0.0) mark = safe_float(slot.metadata.get("mark_price"), mark) open_notional += abs(slot.size) * abs(mark) unrealized_pnl += float(slot.unrealized_pnl or 0.0) self._replace_snapshot( open_positions=open_positions, open_notional=open_notional, unrealized_pnl=unrealized_pnl, equity=self.snapshot.capital + unrealized_pnl if math.isfinite(self.snapshot.capital + unrealized_pnl) else self.snapshot.capital, peak_capital=max(self.snapshot.peak_capital, self.snapshot.capital) if open_notional > 0 and self.snapshot.capital > 0 else self.snapshot.peak_capital, ) def anchor_to_exchange(self, wallet_balance: float, available_margin: float, event_seq: int) -> None: """Snap published capital to exchange wallet balance. The exchange is the ledger of record (E-anchored). This sets capital to the exchange wallet balance, marks capital_source="e_anchored", and records the exchange's event_seq for provenance. Between anchors settle() bridges using capital_source="k_bridged". Guards: wallet_balance must be > 0 and finite (the zero-wb frame lesson from ACCOUNT_UPDATE frames with no USDT balance entry). """ wb = safe_float(wallet_balance, 0.0) if wb <= 0.0 or not math.isfinite(wb): return self.snapshot.capital = wb self.snapshot.e_wallet_balance = wb self.snapshot.capital_source = "e_anchored" self.snapshot.event_seq = int(event_seq) self.snapshot.equity = wb + self.snapshot.unrealized_pnl if not math.isfinite(self.snapshot.equity): self.snapshot.equity = wb self.snapshot.peak_capital = max(self.snapshot.peak_capital, wb) def settle(self, realized_pnl: float, fees: float = 0.0) -> None: rp = safe_float(realized_pnl, 0.0) # Include fees in capital delta (today fees only accumulate in # fees_paid while published capital ignores them between reseeds). net = rp - safe_float(fees, 0.0) new_capital = safe_float(self.snapshot.capital + net, self.snapshot.capital) if self.max_capital is not None: new_capital = min(new_capital, self.max_capital) new_capital = max(self.min_capital, new_capital) new_source = self.snapshot.capital_source if new_source == "e_anchored" and abs(net) > 1e-12: new_source = "k_bridged" new_fees = self.snapshot.fees_paid + safe_float(fees, 0.0) new_equity = new_capital + self.snapshot.unrealized_pnl if not math.isfinite(new_equity): new_equity = new_capital self._replace_snapshot( capital=new_capital, capital_source=new_source, realized_pnl=self.snapshot.realized_pnl + rp, fees_paid=new_fees, equity=new_equity, ) def to_account_event( self, *, timestamp: datetime, trade_id: str, asset: str, side: TradeSide, stage: TradeStage, reason: str, pnl: float = 0.0, pnl_pct: float = 0.0, bars_held: int = 0, metadata: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl return { "timestamp": timestamp.isoformat() if hasattr(timestamp, "isoformat") else str(timestamp), "runtime_namespace": self.runtime_namespace, "strategy_namespace": self.strategy_namespace, "event_namespace": self.event_namespace, "actor_name": self.actor_name, "exec_venue": self.exec_venue, "data_venue": self.data_venue, "ledger_authority": self.ledger_authority, "capital": float(self.snapshot.capital), "equity": float(self.snapshot.equity), "open_positions": int(self.snapshot.open_positions), "current_open_notional": float(self.snapshot.open_notional), "current_account_leverage": float(self.snapshot.leverage), "trade_id": trade_id, "asset": asset, "side": side.value, "reason": reason, "stage": stage.value, "pnl": float(pnl), "pnl_pct": float(pnl_pct), "bars_held": int(bars_held), "metadata": dict(metadata or {}), } # --------------------------------------------------------------------------- # V2 — Dual-ledger, event-sourced, reconciled account (spec G2) # --------------------------------------------------------------------------- class ReconcileStatus(str, Enum): OK = "OK" WARN = "WARN" ERROR = "ERROR" @dataclass(frozen=True) class KBlock: """Kernel-computed values — derived deterministically from the E-fact stream.""" capital: float = 0.0 # seed + Σrealized − Σfee − Σfunding realized_pnl: float = 0.0 unrealized_pnl: float = 0.0 fees_paid: float = 0.0 funding_paid: float = 0.0 open_notional: float = 0.0 # Σ|qty|·mark equity: float = 0.0 # capital + unrealized used_margin: float = 0.0 # Σ notional/leverage available_margin: float = 0.0 # capital − used_margin open_positions: int = 0 peak_capital: float = 0.0 @dataclass(frozen=True) class EPosition: """Single open position as reported by the exchange.""" symbol: str = "" qty: float = 0.0 entry_price: float = 0.0 mark_price: float = 0.0 unrealized_pnl: float = 0.0 leverage: float = 1.0 side: str = "" @dataclass(frozen=True) class EBlock: """Exchange facts — values only the exchange can know.""" wallet_balance: float = 0.0 available_margin: float = 0.0 used_margin: float = 0.0 maint_margin: float = 0.0 positions: tuple = () # tuple[EPosition, ...] last_fill_price: float = 0.0 last_fill_qty: float = 0.0 last_fill_fee: float = 0.0 last_fill_realized_pnl: float = 0.0 last_funding: float = 0.0 @dataclass(frozen=True) class ReconcileResult: """Classification of K-vs-E divergence for one snapshot.""" status: ReconcileStatus = ReconcileStatus.OK deltas: Dict[str, float] = field(default_factory=dict) explanations: List[str] = field(default_factory=list) worst_field: str = "" ts: float = 0.0 def __post_init__(self) -> None: # frozen dataclass — use object.__setattr__ only in __post_init__ if not isinstance(self.deltas, dict): object.__setattr__(self, "deltas", {}) if not isinstance(self.explanations, list): object.__setattr__(self, "explanations", []) @dataclass(frozen=True) class AccountSnapshotV2: """ Immutable versioned snapshot — the atomic unit of account truth. Each exchange event produces exactly one new snapshot; readers hold a reference and are never exposed to a partially-updated state. """ event_seq: int source_event_id: str k: KBlock e: EBlock reconcile: ReconcileResult ts: float = 0.0 @dataclass class ReconcileConfig: """ Bounds for the R1–R6 reconcile rules. All values are config-driven; no magic numbers in the classifier itself. """ capital_epsilon: float = 1e-4 # |δ| < ε → OK (R1, absolute USDT) pending_fee_bound: float = 20.0 # max unsettled fees still in-flight (R1) realized_rounding: float = 0.05 # fee+rounding tolerance for R2 lot_step: float = 0.001 # position qty lot-step for R3 mark_staleness_factor: float = 0.003 # 0.3% mark-price drift tolerance (R4) leverage_rounding_band: float = 2.0 # margin rounding band USDT (R5) def _safe(v: Any, default: float = 0.0) -> float: try: f = float(v) return f if math.isfinite(f) else default except (TypeError, ValueError): return default class AccountProjectionV2: """ Dual-ledger account — tracks K-values (kernel fold) and E-facts (exchange push) independently, reconciles each event, and publishes immutable AccountSnapshotV2 instances. Thread-safety note: Python's GIL makes reference replacement of `_snapshot` atomic for single-field reads. For multi-field consistency callers must hold `_snapshot` locally: `snap = proj.snapshot`. """ def __init__( self, seed_capital: float, *, min_capital: float = 0.0, max_capital: Optional[float] = None, reconcile_config: Optional[ReconcileConfig] = None, ) -> None: self._seed = _safe(seed_capital, 0.0) self._min_capital = min_capital self._max_capital = max_capital self._cfg = reconcile_config or ReconcileConfig() # Running K-value accumulators self._k_realized: float = 0.0 self._k_fees: float = 0.0 self._k_funding: float = 0.0 self._peak_capital: float = self._seed # Latest E-facts (mutable intermediate; frozen into EBlock at snapshot time) self._e_wallet_balance: float = 0.0 self._e_avail_margin: float = 0.0 self._e_used_margin: float = 0.0 self._e_maint_margin: float = 0.0 self._e_positions: List[EPosition] = [] self._e_last_fill_price: float = 0.0 self._e_last_fill_qty: float = 0.0 self._e_last_fill_fee: float = 0.0 self._e_last_fill_realized: float = 0.0 self._e_last_funding: float = 0.0 self._event_seq: int = 0 self._snapshot: AccountSnapshotV2 = self._build(0, "", [], time.time()) # ------------------------------------------------------------------ # E-fact ingestion (called from WS event handlers) # ------------------------------------------------------------------ def apply_fill( self, *, fill_price: float, fill_qty: float, fee: float, realized_pnl: float, ) -> None: self._k_realized += _safe(realized_pnl) self._k_fees += _safe(fee) self._e_last_fill_price = _safe(fill_price) self._e_last_fill_qty = _safe(fill_qty) self._e_last_fill_fee = _safe(fee) self._e_last_fill_realized = _safe(realized_pnl) def apply_funding(self, amount: float) -> None: self._k_funding += _safe(amount) self._e_last_funding = _safe(amount) def apply_balance_update( self, *, wallet_balance: float, available_margin: float, used_margin: float, maint_margin: float, ) -> None: self._e_wallet_balance = _safe(wallet_balance) self._e_avail_margin = _safe(available_margin) self._e_used_margin = _safe(used_margin) self._e_maint_margin = _safe(maint_margin) def apply_position_update(self, positions: List[EPosition]) -> None: self._e_positions = list(positions) # ------------------------------------------------------------------ # Snapshot construction (called after each ingestion step) # ------------------------------------------------------------------ def build_snapshot( self, source_event_id: str, slots: Iterable[TradeSlot], ts: Optional[float] = None, ) -> AccountSnapshotV2: self._event_seq += 1 snap = self._build(self._event_seq, source_event_id, list(slots), ts or time.time()) self._snapshot = snap return snap @property def snapshot(self) -> AccountSnapshotV2: return self._snapshot @property def k_capital(self) -> float: raw = self._seed + self._k_realized - self._k_fees - self._k_funding if self._max_capital is not None: raw = min(raw, self._max_capital) return max(self._min_capital, raw) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _build( self, event_seq: int, source_event_id: str, slots: List[TradeSlot], ts: float, ) -> AccountSnapshotV2: open_notional, unrealized, used_margin, open_positions = self._scan_slots(slots) capital = self.k_capital self._peak_capital = max(self._peak_capital, capital) k = KBlock( capital=capital, realized_pnl=self._k_realized, unrealized_pnl=unrealized, fees_paid=self._k_fees, funding_paid=self._k_funding, open_notional=open_notional, equity=capital + unrealized, used_margin=used_margin, available_margin=max(0.0, capital - used_margin), open_positions=open_positions, peak_capital=self._peak_capital, ) e = EBlock( wallet_balance=self._e_wallet_balance, available_margin=self._e_avail_margin, used_margin=self._e_used_margin, maint_margin=self._e_maint_margin, positions=tuple(self._e_positions), last_fill_price=self._e_last_fill_price, last_fill_qty=self._e_last_fill_qty, last_fill_fee=self._e_last_fill_fee, last_fill_realized_pnl=self._e_last_fill_realized, last_funding=self._e_last_funding, ) reconcile = self._classify(k, e, ts) return AccountSnapshotV2( event_seq=event_seq, source_event_id=source_event_id, k=k, e=e, reconcile=reconcile, ts=ts, ) def _scan_slots( self, slots: List[TradeSlot] ) -> tuple: # (open_notional, unrealized, used_margin, open_count) open_notional = 0.0 unrealized = 0.0 used_margin = 0.0 open_positions = 0 for slot in slots: if slot.closed or slot.size <= 0: continue if slot.fsm_state not in { TradeStage.POSITION_OPEN, TradeStage.POSITION_OPENED, TradeStage.ENTRY_WORKING, TradeStage.EXIT_WORKING, }: continue open_positions += 1 mark = _safe(slot.metadata.get("mark_price") if slot.metadata else None, 0.0) if mark <= 0.0: mark = _safe(slot.entry_price, 0.0) notional = abs(slot.size) * mark open_notional += notional unrealized += _safe(slot.unrealized_pnl) lev = max(1.0, _safe(slot.metadata.get("leverage") if slot.metadata else None, 1.0)) used_margin += notional / lev return open_notional, unrealized, used_margin, open_positions def _classify(self, k: KBlock, e: EBlock, ts: float) -> ReconcileResult: """ Apply reconcile rules R1–R6 (spec §2.3). Returns a ReconcileResult with the worst status seen across all fields. """ cfg = self._cfg status = ReconcileStatus.OK deltas: Dict[str, float] = {} explanations: List[str] = [] worst_field = "" def _escalate(new: ReconcileStatus, field: str) -> None: nonlocal status, worst_field order = {ReconcileStatus.OK: 0, ReconcileStatus.WARN: 1, ReconcileStatus.ERROR: 2} if order[new] > order[status]: status = new worst_field = field # R1: capital vs wallet balance (only meaningful when E-facts are populated) if e.wallet_balance > 0: delta_r1 = abs(k.capital - e.wallet_balance) deltas["capital_vs_wallet"] = k.capital - e.wallet_balance if delta_r1 <= cfg.capital_epsilon: pass # OK elif delta_r1 <= cfg.pending_fee_bound: _escalate(ReconcileStatus.WARN, "capital_vs_wallet") explanations.append(f"UNSETTLED_FEE|capital_vs_wallet|delta={delta_r1:.4f}") else: _escalate(ReconcileStatus.ERROR, "capital_vs_wallet") explanations.append(f"ERROR|capital_vs_wallet|delta={delta_r1:.4f}") # R2: realized PnL vs exchange realized if e.last_fill_realized_pnl != 0: delta_r2 = abs(k.realized_pnl - e.last_fill_realized_pnl) deltas["realized_pnl"] = k.realized_pnl - e.last_fill_realized_pnl if delta_r2 <= cfg.capital_epsilon: pass elif delta_r2 <= cfg.realized_rounding: _escalate(ReconcileStatus.WARN, "realized_pnl") explanations.append(f"LOT_STEP_ROUNDING|realized_pnl|delta={delta_r2:.4f}") else: _escalate(ReconcileStatus.ERROR, "realized_pnl") explanations.append(f"ERROR|realized_pnl|delta={delta_r2:.4f}") # R3: position count (R6) + per-position qty (R3) e_pos_map = {p.symbol: p for p in e.positions} if len(e.positions) > 0: if k.open_positions != len(e_pos_map): deltas["open_positions"] = float(k.open_positions - len(e_pos_map)) _escalate(ReconcileStatus.ERROR, "open_positions") explanations.append( f"ERROR|open_positions|k={k.open_positions}|e={len(e_pos_map)}" ) # R4: open_notional vs exchange notional (mark staleness) if e.used_margin > 0 and k.open_notional > 0: delta_notional = abs(k.open_notional - e.used_margin) deltas["open_notional"] = k.open_notional - e.used_margin staleness_band = k.open_notional * cfg.mark_staleness_factor if delta_notional <= cfg.capital_epsilon: pass elif delta_notional <= staleness_band: _escalate(ReconcileStatus.WARN, "open_notional") explanations.append(f"MARK_PRICE_STALENESS|open_notional|delta={delta_notional:.4f}") else: _escalate(ReconcileStatus.ERROR, "open_notional") explanations.append(f"ERROR|open_notional|delta={delta_notional:.4f}") # R5: used/available margin if e.used_margin > 0: delta_margin = abs(k.used_margin - e.used_margin) deltas["used_margin"] = k.used_margin - e.used_margin if delta_margin <= cfg.capital_epsilon: pass elif delta_margin <= cfg.leverage_rounding_band: _escalate(ReconcileStatus.WARN, "used_margin") explanations.append(f"LEVERAGE_ROUNDING|used_margin|delta={delta_margin:.4f}") else: _escalate(ReconcileStatus.ERROR, "used_margin") explanations.append(f"ERROR|used_margin|delta={delta_margin:.4f}") return ReconcileResult( status=status, deltas=deltas, explanations=explanations, worst_field=worst_field, ts=ts, )