# File: siloqy/prod/clean_arch/dita_v2/account.py (Python, 510 lines, 19 KiB)
"""Account projection for DITAv2."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Iterable, List, Optional
import math
import time
from .contracts import TradeSide, TradeSlot, TradeStage
from .utils import safe_float
@dataclass
class AccountSnapshot:
    """Mutable record of the account figures derived by the projection."""

    capital: float
    equity: float
    realized_pnl: float = 0.0
    unrealized_pnl: float = 0.0
    open_positions: int = 0
    open_notional: float = 0.0
    fees_paid: float = 0.0
    trade_seq: int = 0
    peak_capital: float = 0.0

    @property
    def leverage(self) -> float:
        """Return open notional over capital; 0.0 if either is zero or negative."""
        nothing_at_risk = self.capital <= 0 or self.open_notional <= 0
        return 0.0 if nothing_at_risk else self.open_notional / self.capital
@dataclass
class AccountProjection:
    """Aggregate account view over all active slots.

    Carries the namespace/venue labels stamped onto every account event and
    a mutable :class:`AccountSnapshot` that is refreshed by
    :meth:`observe_slots` (open exposure) and :meth:`settle` (realized
    results).
    """

    runtime_namespace: str = "dita_v2"
    strategy_namespace: str = "dita_v2"
    event_namespace: str = "dita_v2"
    actor_name: str = "ExecutionKernel"
    exec_venue: str = "bingx"
    data_venue: str = "binance"
    ledger_authority: str = "exchange"
    min_capital: float = 0.0
    max_capital: Optional[float] = None
    snapshot: AccountSnapshot = field(
        default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0)
    )

    def _refresh_equity(self) -> None:
        """Set equity to capital + unrealized PnL, or to capital if that sum is not finite."""
        snap = self.snapshot
        equity = snap.capital + snap.unrealized_pnl
        snap.equity = equity if math.isfinite(equity) else snap.capital

    def observe_slots(self, slots: Iterable[TradeSlot]) -> None:
        """Recompute open exposure figures on the snapshot from *slots*.

        Slots that are closed, have a non-positive size, or are not in one of
        the live FSM stages are ignored. Updates ``open_positions``,
        ``open_notional``, ``unrealized_pnl`` and ``equity``; ``peak_capital``
        is only raised while there is open notional and positive capital.
        """
        live_stages = {
            TradeStage.POSITION_OPEN,
            TradeStage.POSITION_OPENED,
            TradeStage.ENTRY_WORKING,
            TradeStage.EXIT_WORKING,
        }
        open_positions = 0
        open_notional = 0.0
        unrealized_pnl = 0.0
        for slot in slots:
            if slot.closed or slot.size <= 0:
                continue
            if slot.fsm_state not in live_stages:
                continue
            open_positions += 1
            # A slot without metadata must not crash the projection; the
            # entry price then serves as the mark.
            metadata = slot.metadata or {}
            entry_price = safe_float(slot.entry_price, 0.0)
            mark = safe_float(metadata.get("mark_price"), entry_price)
            open_notional += abs(slot.size) * abs(mark)
            unrealized_pnl += float(slot.unrealized_pnl or 0.0)

        snap = self.snapshot
        snap.open_positions = open_positions
        snap.open_notional = open_notional
        snap.unrealized_pnl = unrealized_pnl
        self._refresh_equity()
        if open_notional > 0 and snap.capital > 0:
            snap.peak_capital = max(snap.peak_capital, snap.capital)

    def settle(self, realized_pnl: float, fees: float = 0.0) -> None:
        """Apply a realized result to capital and refresh equity.

        Capital moves by *realized_pnl* and is then clamped to
        ``[min_capital, max_capital]`` (no upper clamp when ``max_capital`` is
        None). *fees* are added to ``fees_paid`` only; they are not deducted
        from capital here.
        """
        snap = self.snapshot
        realized_pnl = safe_float(realized_pnl, 0.0)
        new_capital = safe_float(snap.capital + realized_pnl, snap.capital)
        if self.max_capital is not None:
            new_capital = min(new_capital, self.max_capital)
        snap.capital = max(self.min_capital, new_capital)
        snap.realized_pnl += realized_pnl
        snap.fees_paid += safe_float(fees, 0.0)
        self._refresh_equity()

    def to_account_event(
        self,
        *,
        timestamp: datetime,
        trade_id: str,
        asset: str,
        side: TradeSide,
        stage: TradeStage,
        reason: str,
        pnl: float = 0.0,
        pnl_pct: float = 0.0,
        bars_held: int = 0,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, Any]:
        """Build a flat account-event dict from the current snapshot.

        Equity is refreshed first, with the same non-finite fallback used by
        :meth:`observe_slots` and :meth:`settle`, so the event never carries a
        NaN/inf equity. *metadata* is shallow-copied; ``None`` becomes ``{}``.
        """
        self._refresh_equity()
        snap = self.snapshot
        return {
            # Accept datetime-like objects; anything else is stringified.
            "timestamp": timestamp.isoformat() if hasattr(timestamp, "isoformat") else str(timestamp),
            "runtime_namespace": self.runtime_namespace,
            "strategy_namespace": self.strategy_namespace,
            "event_namespace": self.event_namespace,
            "actor_name": self.actor_name,
            "exec_venue": self.exec_venue,
            "data_venue": self.data_venue,
            "ledger_authority": self.ledger_authority,
            "capital": float(snap.capital),
            "equity": float(snap.equity),
            "open_positions": int(snap.open_positions),
            "current_open_notional": float(snap.open_notional),
            "current_account_leverage": float(snap.leverage),
            "trade_id": trade_id,
            "asset": asset,
            "side": side.value,
            "reason": reason,
            "stage": stage.value,
            "pnl": float(pnl),
            "pnl_pct": float(pnl_pct),
            "bars_held": int(bars_held),
            "metadata": dict(metadata or {}),
        }
# ---------------------------------------------------------------------------
# V2 — Dual-ledger, event-sourced, reconciled account (spec G2)
# ---------------------------------------------------------------------------
class ReconcileStatus(str, Enum):
    """Outcome label attached to a kernel-vs-exchange reconcile result."""

    OK = "OK"
    WARN = "WARN"
    ERROR = "ERROR"
@dataclass(frozen=True)
class KBlock:
    """Kernel-side figures, computed deterministically from the E-fact stream."""

    capital: float = 0.0            # seed + Σrealized − Σfee − Σfunding
    realized_pnl: float = 0.0
    unrealized_pnl: float = 0.0
    fees_paid: float = 0.0
    funding_paid: float = 0.0
    open_notional: float = 0.0      # Σ|qty|·mark
    equity: float = 0.0             # capital + unrealized
    used_margin: float = 0.0        # Σ notional/leverage
    available_margin: float = 0.0   # capital − used_margin
    open_positions: int = 0
    peak_capital: float = 0.0
@dataclass(frozen=True)
class EPosition:
    """One open position, as the exchange reports it."""

    symbol: str = ""
    qty: float = 0.0
    entry_price: float = 0.0
    mark_price: float = 0.0
    unrealized_pnl: float = 0.0
    leverage: float = 1.0
    side: str = ""
@dataclass(frozen=True)
class EBlock:
    """Exchange-side facts: values that only the exchange can know."""

    wallet_balance: float = 0.0
    available_margin: float = 0.0
    used_margin: float = 0.0
    maint_margin: float = 0.0
    positions: tuple = ()  # tuple[EPosition, ...]
    last_fill_price: float = 0.0
    last_fill_qty: float = 0.0
    last_fill_fee: float = 0.0
    last_fill_realized_pnl: float = 0.0
    last_funding: float = 0.0
@dataclass(frozen=True)
class ReconcileResult:
    """How far the K-values diverge from the E-facts for one snapshot."""

    status: ReconcileStatus = ReconcileStatus.OK
    deltas: Dict[str, float] = field(default_factory=dict)
    explanations: List[str] = field(default_factory=list)
    worst_field: str = ""
    ts: float = 0.0

    def __post_init__(self) -> None:
        # The instance is frozen, so wrong-typed containers are replaced via
        # object.__setattr__ (only done here, during construction).
        for attr_name, container_type in (("deltas", dict), ("explanations", list)):
            if not isinstance(getattr(self, attr_name), container_type):
                object.__setattr__(self, attr_name, container_type())
@dataclass(frozen=True)
class AccountSnapshotV2:
    """
    Immutable, versioned snapshot — the atomic unit of account truth.

    Every exchange event yields exactly one new snapshot. Readers keep a
    reference to it and therefore never observe a partially-updated state.
    """

    event_seq: int
    source_event_id: str
    k: KBlock
    e: EBlock
    reconcile: ReconcileResult
    ts: float = 0.0
@dataclass
class ReconcileConfig:
    """
    Tolerances for the R1–R6 reconcile rules.

    Every bound is supplied through this config so the classifier itself
    contains no magic numbers.
    """

    capital_epsilon: float = 1e-4          # |δ| < ε → OK (R1, absolute USDT)
    pending_fee_bound: float = 20.0        # max unsettled fees still in-flight (R1)
    realized_rounding: float = 0.05        # fee + rounding tolerance for R2
    lot_step: float = 0.001                # position qty lot-step for R3
    mark_staleness_factor: float = 0.003   # 0.3% mark-price drift tolerance (R4)
    leverage_rounding_band: float = 2.0    # margin rounding band in USDT (R5)
def _safe(v: Any, default: float = 0.0) -> float:
try:
f = float(v)
return f if math.isfinite(f) else default
except (TypeError, ValueError):
return default
class AccountProjectionV2:
    """
    Dual-ledger account — tracks K-values (kernel fold) and E-facts
    (exchange push) independently, reconciles them on every snapshot, and
    publishes immutable AccountSnapshotV2 instances.

    Thread-safety note: Python's GIL makes reference replacement of
    `_snapshot` atomic for single-field reads. For multi-field consistency
    callers must hold `_snapshot` locally: `snap = proj.snapshot`.
    """

    # Severity ranking used to keep the worst status seen during a reconcile.
    _SEVERITY = {
        ReconcileStatus.OK: 0,
        ReconcileStatus.WARN: 1,
        ReconcileStatus.ERROR: 2,
    }

    def __init__(
        self,
        seed_capital: float,
        *,
        min_capital: float = 0.0,
        max_capital: Optional[float] = None,
        reconcile_config: Optional[ReconcileConfig] = None,
    ) -> None:
        """Start from *seed_capital* with empty accumulators and publish snapshot #0.

        Args:
            seed_capital: Starting capital; non-finite/invalid values become 0.0.
            min_capital: Floor applied to the kernel capital.
            max_capital: Optional ceiling applied to the kernel capital.
            reconcile_config: Tolerances for the reconcile rules; defaults to
                ``ReconcileConfig()``.
        """
        self._seed = _safe(seed_capital, 0.0)
        self._min_capital = min_capital
        self._max_capital = max_capital
        self._cfg = reconcile_config or ReconcileConfig()

        # Running K-value accumulators
        self._k_realized: float = 0.0
        self._k_fees: float = 0.0
        self._k_funding: float = 0.0
        self._peak_capital: float = self._seed

        # Latest E-facts (mutable intermediate; frozen into EBlock at snapshot time)
        self._e_wallet_balance: float = 0.0
        self._e_avail_margin: float = 0.0
        self._e_used_margin: float = 0.0
        self._e_maint_margin: float = 0.0
        self._e_positions: List[EPosition] = []
        self._e_last_fill_price: float = 0.0
        self._e_last_fill_qty: float = 0.0
        self._e_last_fill_fee: float = 0.0
        self._e_last_fill_realized: float = 0.0
        self._e_last_funding: float = 0.0

        self._event_seq: int = 0
        self._snapshot: AccountSnapshotV2 = self._build(0, "", [], time.time())

    # ------------------------------------------------------------------
    # E-fact ingestion (called from WS event handlers)
    # ------------------------------------------------------------------
    def apply_fill(
        self,
        *,
        fill_price: float,
        fill_qty: float,
        fee: float,
        realized_pnl: float,
    ) -> None:
        """Fold a fill into the K accumulators and record it as the latest fill fact.

        Does not publish a snapshot; call :meth:`build_snapshot` afterwards.
        """
        self._k_realized += _safe(realized_pnl)
        self._k_fees += _safe(fee)
        self._e_last_fill_price = _safe(fill_price)
        self._e_last_fill_qty = _safe(fill_qty)
        self._e_last_fill_fee = _safe(fee)
        self._e_last_fill_realized = _safe(realized_pnl)

    def apply_funding(self, amount: float) -> None:
        """Accumulate a funding amount (it is subtracted from kernel capital)."""
        self._k_funding += _safe(amount)
        self._e_last_funding = _safe(amount)

    def apply_balance_update(
        self,
        *,
        wallet_balance: float,
        available_margin: float,
        used_margin: float,
        maint_margin: float,
    ) -> None:
        """Replace the stored exchange balance/margin facts with sanitized values."""
        self._e_wallet_balance = _safe(wallet_balance)
        self._e_avail_margin = _safe(available_margin)
        self._e_used_margin = _safe(used_margin)
        self._e_maint_margin = _safe(maint_margin)

    def apply_position_update(self, positions: List[EPosition]) -> None:
        """Replace the stored exchange positions with a copy of *positions*."""
        self._e_positions = list(positions)

    # ------------------------------------------------------------------
    # Snapshot construction (called after each ingestion step)
    # ------------------------------------------------------------------
    def build_snapshot(
        self,
        source_event_id: str,
        slots: Iterable[TradeSlot],
        ts: Optional[float] = None,
    ) -> AccountSnapshotV2:
        """Advance the event sequence, build a new snapshot and publish it.

        Args:
            source_event_id: Identifier of the event that triggered this snapshot.
            slots: Trade slots to scan for open exposure.
            ts: Snapshot timestamp; the current wall-clock time is used only
                when this is None (an explicit ``0.0`` is honoured).

        Returns:
            The newly published snapshot.
        """
        self._event_seq += 1
        stamp = time.time() if ts is None else ts
        snap = self._build(self._event_seq, source_event_id, list(slots), stamp)
        self._snapshot = snap
        return snap

    @property
    def snapshot(self) -> AccountSnapshotV2:
        """Most recently published snapshot."""
        return self._snapshot

    @property
    def k_capital(self) -> float:
        """Kernel capital: seed + realized − fees − funding, clamped to the configured bounds."""
        raw = self._seed + self._k_realized - self._k_fees - self._k_funding
        if self._max_capital is not None:
            raw = min(raw, self._max_capital)
        return max(self._min_capital, raw)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _build(
        self,
        event_seq: int,
        source_event_id: str,
        slots: List[TradeSlot],
        ts: float,
    ) -> AccountSnapshotV2:
        """Freeze the current K and E state into one reconciled snapshot.

        Side effect: raises the stored peak capital if current capital exceeds it.
        """
        open_notional, unrealized, used_margin, open_positions = self._scan_slots(slots)
        capital = self.k_capital
        self._peak_capital = max(self._peak_capital, capital)
        k = KBlock(
            capital=capital,
            realized_pnl=self._k_realized,
            unrealized_pnl=unrealized,
            fees_paid=self._k_fees,
            funding_paid=self._k_funding,
            open_notional=open_notional,
            equity=capital + unrealized,
            used_margin=used_margin,
            available_margin=max(0.0, capital - used_margin),
            open_positions=open_positions,
            peak_capital=self._peak_capital,
        )
        e = EBlock(
            wallet_balance=self._e_wallet_balance,
            available_margin=self._e_avail_margin,
            used_margin=self._e_used_margin,
            maint_margin=self._e_maint_margin,
            positions=tuple(self._e_positions),
            last_fill_price=self._e_last_fill_price,
            last_fill_qty=self._e_last_fill_qty,
            last_fill_fee=self._e_last_fill_fee,
            last_fill_realized_pnl=self._e_last_fill_realized,
            last_funding=self._e_last_funding,
        )
        reconcile = self._classify(k, e, ts)
        return AccountSnapshotV2(
            event_seq=event_seq,
            source_event_id=source_event_id,
            k=k,
            e=e,
            reconcile=reconcile,
            ts=ts,
        )

    def _scan_slots(
        self, slots: List[TradeSlot]
    ) -> tuple:  # (open_notional, unrealized, used_margin, open_count)
        """Sum exposure over live slots.

        A slot counts when it is not closed, has positive size, and is in one
        of the live FSM stages. The mark comes from ``metadata["mark_price"]``
        and falls back to the entry price when missing or non-positive;
        leverage comes from ``metadata["leverage"]`` and is floored at 1.0.
        """
        live_stages = {
            TradeStage.POSITION_OPEN,
            TradeStage.POSITION_OPENED,
            TradeStage.ENTRY_WORKING,
            TradeStage.EXIT_WORKING,
        }
        open_notional = 0.0
        unrealized = 0.0
        used_margin = 0.0
        open_positions = 0
        for slot in slots:
            if slot.closed or slot.size <= 0:
                continue
            if slot.fsm_state not in live_stages:
                continue
            open_positions += 1
            mark = _safe(slot.metadata.get("mark_price") if slot.metadata else None, 0.0)
            if mark <= 0.0:
                mark = _safe(slot.entry_price, 0.0)
            notional = abs(slot.size) * mark
            open_notional += notional
            unrealized += _safe(slot.unrealized_pnl)
            lev = max(1.0, _safe(slot.metadata.get("leverage") if slot.metadata else None, 1.0))
            used_margin += notional / lev
        return open_notional, unrealized, used_margin, open_positions

    @staticmethod
    def _exchange_notional(positions: Iterable[EPosition]) -> float:
        """Return Σ|qty|·mark over exchange positions (entry price when mark is non-positive)."""
        total = 0.0
        for pos in positions:
            mark = _safe(pos.mark_price)
            if mark <= 0.0:
                mark = _safe(pos.entry_price)
            total += abs(_safe(pos.qty)) * mark
        return total

    def _classify(self, k: KBlock, e: EBlock, ts: float) -> ReconcileResult:
        """
        Apply reconcile rules R1–R6 (spec §2.3).

        Each rule is skipped while the exchange fact it needs is still
        unpopulated (zero/empty). Returns a ReconcileResult carrying the worst
        status seen across all fields, the signed K−E deltas, and one
        explanation string per non-OK finding.
        """
        cfg = self._cfg
        severity = self._SEVERITY
        status = ReconcileStatus.OK
        worst_field = ""
        deltas: Dict[str, float] = {}
        explanations: List[str] = []

        def _escalate(new: ReconcileStatus, field_name: str) -> None:
            # Only a strictly worse status replaces the current worst field.
            nonlocal status, worst_field
            if severity[new] > severity[status]:
                status = new
                worst_field = field_name

        def _grade(field_name: str, signed_delta: float, warn_bound: float, warn_tag: str) -> None:
            # Three tiers: |δ| ≤ ε is OK, |δ| ≤ warn_bound is WARN, else ERROR.
            deltas[field_name] = signed_delta
            magnitude = abs(signed_delta)
            if magnitude <= cfg.capital_epsilon:
                return
            if magnitude <= warn_bound:
                _escalate(ReconcileStatus.WARN, field_name)
                explanations.append(f"{warn_tag}|{field_name}|delta={magnitude:.4f}")
            else:
                _escalate(ReconcileStatus.ERROR, field_name)
                explanations.append(f"ERROR|{field_name}|delta={magnitude:.4f}")

        # R1: capital vs wallet balance (only meaningful when E-facts are populated)
        if e.wallet_balance > 0:
            _grade(
                "capital_vs_wallet",
                k.capital - e.wallet_balance,
                cfg.pending_fee_bound,
                "UNSETTLED_FEE",
            )

        # R2: realized PnL vs exchange realized
        # NOTE(review): k.realized_pnl is cumulative while
        # e.last_fill_realized_pnl covers only the most recent fill, so this
        # delta grows once more than one fill has realized PnL — confirm the
        # intended comparison against the spec.
        if e.last_fill_realized_pnl != 0:
            _grade(
                "realized_pnl",
                k.realized_pnl - e.last_fill_realized_pnl,
                cfg.realized_rounding,
                "LOT_STEP_ROUNDING",
            )

        # R3/R6: position count. Exchange positions are de-duplicated by symbol.
        # NOTE(review): the per-position qty check (R3, cfg.lot_step) is not
        # implemented here; KBlock carries no per-position quantities.
        if e.positions:
            e_count = len({p.symbol for p in e.positions})
            if k.open_positions != e_count:
                deltas["open_positions"] = float(k.open_positions - e_count)
                _escalate(ReconcileStatus.ERROR, "open_positions")
                explanations.append(
                    f"ERROR|open_positions|k={k.open_positions}|e={e_count}"
                )

        # R4: open_notional vs exchange notional (mark staleness).
        # The exchange notional is derived from the reported positions; margin
        # is a different quantity (notional/leverage) and is checked by R5.
        e_notional = self._exchange_notional(e.positions)
        if e_notional > 0 and k.open_notional > 0:
            _grade(
                "open_notional",
                k.open_notional - e_notional,
                k.open_notional * cfg.mark_staleness_factor,
                "MARK_PRICE_STALENESS",
            )

        # R5: used/available margin
        if e.used_margin > 0:
            _grade(
                "used_margin",
                k.used_margin - e.used_margin,
                cfg.leverage_rounding_band,
                "LEVERAGE_ROUNDING",
            )

        return ReconcileResult(
            status=status,
            deltas=deltas,
            explanations=explanations,
            worst_field=worst_field,
            ts=ts,
        )