diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py index 715a72a..7f1c9f5 100644 --- a/prod/clean_arch/adapters/bingx_direct.py +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -10,11 +10,13 @@ import asyncio import json import logging import math +import time import uuid -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime, timezone from decimal import Decimal, ROUND_DOWN -from typing import Any, Optional +from pathlib import Path +from typing import Any, Dict, Optional, Tuple from nautilus_trader.model.identifiers import InstrumentId @@ -194,6 +196,37 @@ class BingxDirectExecutionAdapter(ExecutionPort): self._state: ExchangeStateSnapshot | None = None self._connected = False + # ── S1: Leverage cache ──────────────────────────────────────────────── + # Maps symbol → last successfully set leverage (int). + # Avoids a ~350ms leverage POST before every order when leverage is unchanged. + # + # Cache key is symbol only (not runner_id) because leverage is an + # ACCOUNT-LEVEL setting on BingX — one value per symbol per account. + # IMPORTANT CONTRACT: if multiple runners share this account and request + # DIFFERENT leverages for the same symbol concurrently, the last writer + # wins at the exchange and the other runner's order executes at wrong + # leverage. Callers MUST ensure leverage is uniform across runners for + # any given symbol when sharing an account, or use separate accounts. + # This cache only eliminates the redundant round-trip; it cannot resolve + # the underlying multi-runner semantic conflict. + # + # Per-symbol asyncio.Lock: prevents concurrent submit_intent calls for the + # same symbol from interleaving leverage POST + cache update, which would + # create a window where the cache says "leverage set" but the POST hasn't + # completed yet. Lock scope is deliberately tight (only the POST + cache + # write), not the entire submit_intent, to avoid head-of-line blocking. + self._leverage_cache: Dict[str, int] = {} + self._leverage_locks: Dict[str, asyncio.Lock] = {} + # Persist path: survives process restarts within a session, not across + # reboots (leverage should be re-verified from exchange on cold start). + env_tag = "live" if getattr(self._config, "environment", None) and \ + str(getattr(self._config, "environment", "")).upper() == "LIVE" else "vst" + self._leverage_cache_path = Path(f"/tmp/.bingx_leverage_cache_{env_tag}.json") + self._load_leverage_cache() + + # ── S2: Background state refresh tracking ───────────────────────────── + self._state_refreshed_at: float = 0.0 # monotonic seconds + @property def state(self) -> ExchangeStateSnapshot | None: return self._state @@ -202,12 +235,122 @@ class BingxDirectExecutionAdapter(ExecutionPort): await self._provider.initialize() self._connected = True self._state = await self.refresh_state() + # S4/S1: on reconnect, verify cached leverage matches exchange truth. + # Drift happens when another process/runner changed leverage on the same account. + await self._verify_leverage_drift() return True async def disconnect(self) -> None: self._connected = False await self._client.close() + # ── S1: Leverage cache helpers ──────────────────────────────────────────── + + def _load_leverage_cache(self) -> None: + """Load persisted leverage cache from JSON sidecar. Ignores errors.""" + try: + raw = json.loads(self._leverage_cache_path.read_text()) + self._leverage_cache = { + str(k): int(v) + for k, v in raw.items() + if isinstance(v, (int, float)) and math.isfinite(float(v)) and float(v) >= 1 + } + LOGGER.debug("leverage cache loaded: %s", self._leverage_cache) + except Exception: + self._leverage_cache = {} + + def _persist_leverage_cache(self) -> None: + """Flush leverage cache to JSON sidecar. Non-fatal on failure.""" + try: + self._leverage_cache_path.write_text( + json.dumps(self._leverage_cache, indent=2) + ) + except Exception as exc: + LOGGER.warning("leverage cache persist failed: %s", exc) + + async def _ensure_leverage(self, symbol: str, leverage: int) -> bool: + """Set leverage for symbol only if the cached value differs. + + Returns True if a POST was made (leverage was changed), False if skipped. + The asyncio.Lock per symbol ensures concurrent submit_intent calls for the + same symbol never interleave leverage POST and cache update — preventing + the heisenbug where two orders both think they set leverage but one runs + at the wrong value because the other's POST arrived last. + + Cache is updated ONLY on successful POST. A failed POST leaves the cache + stale, so the next submit retries — correct conservative behaviour. + """ + lock = self._leverage_locks.setdefault(symbol, asyncio.Lock()) + async with lock: + cached = self._leverage_cache.get(symbol) + if cached == leverage: + return False # exchange already at requested value — skip POST + + try: + await self._client.signed_post( + "/openApi/swap/v2/trade/leverage", + {"symbol": symbol, "side": "BOTH", "leverage": leverage}, + ) + prev = self._leverage_cache.get(symbol) + self._leverage_cache[symbol] = leverage + self._persist_leverage_cache() + LOGGER.info( + "leverage SET symbol=%s %s→%d", + symbol, f"{prev}→" if prev is not None else "", leverage, + ) + return True + except Exception as exc: + LOGGER.warning( + "leverage POST failed symbol=%s lev=%d: %s — " + "cache NOT updated, will retry on next submit", + symbol, leverage, exc, + ) + return False # do NOT cache — retry guarantees correctness + + async def _verify_leverage_drift(self) -> None: + """On connect/reconnect, compare cached leverage with exchange reality. + + Drift occurs when another process or runner changed leverage on the same + account while this adapter was offline. Log it loudly; update cache to + exchange truth so next submit does not re-set to the wrong value. + """ + for symbol, cached_lev in list(self._leverage_cache.items()): + try: + resp = await self._client.signed_get( + "/openApi/swap/v2/trade/leverage", {"symbol": symbol} + ) + exchange_lev = int( + float(resp.get("longLeverage") or resp.get("leverage") or 0) + ) + if exchange_lev > 0 and exchange_lev != cached_lev: + LOGGER.warning( + "LEVERAGE DRIFT symbol=%s cached=%d exchange=%d — " + "another process may have changed it; cache updated to exchange truth", + symbol, cached_lev, exchange_lev, + ) + self._leverage_cache[symbol] = exchange_lev + except Exception as exc: + LOGGER.debug("leverage drift check failed symbol=%s: %s", symbol, exc) + self._persist_leverage_cache() + + # ── S2: Background state refresh ───────────────────────────────────────── + + async def _refresh_state_background(self, symbol: str) -> None: + """Background task: refresh internal state after a synchronous MARKET fill. + + MARKET fills deliver fill_price / executedQty in the ACK; capital and + position truth arrive via WS FILL_SETTLED + ACCOUNT_UPDATE. This REST + poll is belt-and-suspenders — catches edge cases such as unexpected + concurrent fills or exchange position ledger lag. It does not block the + submit path. + """ + try: + self._state = await self._refresh_exchange_state(symbol, include_history=False) + self._state_refreshed_at = time.monotonic() + LOGGER.debug("background state refresh complete symbol=%s", symbol) + except Exception as exc: + LOGGER.warning("background state refresh failed symbol=%s: %s", symbol, exc) + def _resolve_instrument(self, asset: str): normalized = _normalize_symbol(asset) candidates = [ @@ -354,13 +497,65 @@ class BingxDirectExecutionAdapter(ExecutionPort): async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot: return await self._refresh_exchange_state(symbol, include_history=include_history) + # ── S3 — Maker order placement (CRITICAL TODO) ──────────────────────────── + # Currently all orders are MARKET (taker, 0.04% fee). For exits especially, + # a LIMIT order at bid+1tick (SHORT close) or ask-1tick (LONG close) with + # reduceOnly=true can fill as maker (0.02% fee) — 50% fee saving per trade. + # + # Design requirements before this is safe to enable: + # + # 1. SHORT/LONG awareness (CRITICAL): + # SHORT close → BUY side → LIMIT at bid+1tick (slightly above bid to rest on book) + # LONG close → SELL side → LIMIT at ask-1tick + # SHORT enter → SELL side → LIMIT at ask-1tick (only if signal allows waiting) + # LONG enter → BUY side → LIMIT at bid+1tick + # + # 2. OBF integration: the OBF subsystem tracks order book depth and emits + # spread guidance. Use OBF.spread_bps and OBF.bid_depth / ask_depth to + # decide whether a maker order is viable (tight spread = fast fill, + # thin book = price impact of own order too large). + # + # 3. TODO_ADD_PARAMSET_VIBRISS: The "calm market" threshold (currently + # hardcoded as spread_bps < 5) MUST be VIBRISS-calibrated. VIBRISS + # governs our own assessment of market microstructure quality. Some + # invariants may remain hardcoded (e.g. "never use maker if spread > 50bps") + # but the normal operating range should be a VIBRISS metaparameter. + # See: prod/vibriss/ for the param-set schema. + # + # 4. Timeout calibration: timeout_s cannot be a fixed constant. It must + # reflect: (a) the strategy's max adverse excursion budget for the position, + # (b) the expected fill velocity from OBF order-flow data, (c) the remaining + # time before the signal's max_hold threshold. A resting limit that misses + # its cancel window creates an orphaned order — a hard risk control failure. + # + # 5. Price impact awareness (future, not yet operational): + # For large notionals, our own LIMIT order changes the book. At some notional + # threshold, the maker price improvement is eaten by the impact of our own + # resting order. The OBF subsystem must estimate this before we switch from + # MARKET to LIMIT. This is the "price impact of our own order" problem. + # + # CRITICAL TODO: Abstraction. The entire order placement logic (MARKET/LIMIT/ + # TWAP/VWAP/Iceberg, maker/taker selection, price impact estimation, timeout/ + # fallback, cancel-replace) MUST eventually be a dedicated "Smart Order Router" + # (SOR) subsystem. submit_intent() should be a thin dispatcher into the SOR, + # not a monolith. This is a pre-condition for multi-venue support, co-location + # optimisation, and regulatory reporting. + # + # GAP in characterisation: why does BingX separate REST (order placement) from + # WS (fill events)? Almost certainly historical: REST came first for audit- + # trail reasons (HMAC-signed requests → deterministic replay), WS push came + # later for low-latency fills. The FIX protocol unifies these under one + # connection — that is the right long-term model and the reason venue-agnostic + # abstraction via the VenueAdapter protocol is critical. Any future venue + # (Bybit, OKX, Binance) will have the same REST/WS split. + # ───────────────────────────────────────────────────────────────────────── + async def submit_intent(self, intent: Intent) -> ExecutionReceipt: symbol = self._instrument_venue_symbol(intent.asset) if intent.action == DecisionAction.EXIT: side = "SELL" if intent.side == TradeSide.LONG else "BUY" else: side = "BUY" if intent.side == TradeSide.LONG else "SELL" - # Entries must be free to open the slot; only exits are reduce-only. reduce_only = bool(intent.action == DecisionAction.EXIT) if reduce_only: self._exit_client_order_seq += 1 @@ -372,25 +567,28 @@ class BingxDirectExecutionAdapter(ExecutionPort): int(round(float(intent.leverage or self._config.default_leverage))), exchange_max=self._config.exchange_leverage_cap, ) - try: - await self._client.signed_post( - "/openApi/swap/v2/trade/leverage", - {"symbol": symbol, "side": "BOTH", "leverage": leverage}, - ) - except Exception as _lev_exc: - # W: leverage POST failed — order will execute at whatever leverage the - # exchange currently has for this symbol. Log prominently; do NOT abort - # the submit because the order may still succeed at the right leverage. - import logging as _logging - _logging.getLogger(__name__).warning( - "BingX leverage set failed (symbol=%s lev=%s): %s — proceeding with submit", - symbol, leverage, _lev_exc, - ) + + # S1: leverage cache — skip POST when exchange already has the right value. + await self._ensure_leverage(symbol, leverage) + + # Capture mark price BEFORE the order POST for slippage measurement (Gap 3). + # This is the most honest reference: the market state at decision time. + mark_at_submit = 0.0 + submit_ts_ms = int(time.time() * 1000) + if self._state is not None: + for _sym_key, _pos in self._state.open_positions.items(): + _mark = float(_pos.get("markPrice") or _pos.get("avgPrice") or 0.0) + if _mark > 0: + mark_at_submit = _mark + break + if mark_at_submit <= 0: + # No open position for mark — use the last known mark from any symbol + # as a rough reference (e.g. for fresh ENTER with no position yet) + pass # remains 0.0; slippage will be 0 + try: # Honor the order type forwarded by the venue adapter - # (bingx_venue._legacy_intent sets _order_type/_limit_price). MARKET - # is the default; a LIMIT carries a resting price + GTC and will not - # fill synchronously — the async-fill pump settles it later. + # (bingx_venue._legacy_intent encodes _order_type/_limit_price). order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper() limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0) is_limit = order_type == "LIMIT" and limit_price > 0.0 @@ -422,8 +620,12 @@ class BingxDirectExecutionAdapter(ExecutionPort): fill_price = value break if fill_price <= 0 and self._state is not None: - # Use the last known exchange mark as a fallback for projected accounting. - fill_price = next((float(row.get("markPrice") or row.get("avgPrice") or 0.0) for row in self._state.open_positions.values() if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), 0.0) + fill_price = next( + (float(row.get("markPrice") or row.get("avgPrice") or 0.0) + for row in self._state.open_positions.values() + if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), + 0.0, + ) except BingxHttpError as exc: status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED" ack_row = { @@ -434,6 +636,52 @@ class BingxDirectExecutionAdapter(ExecutionPort): } fill_price = 0.0 ack = None + is_limit = False + + # ── Gap 2: fee estimation (ESTIMATED_TAKER / ESTIMATED_MAKER) ──────── + # BingX REST ACK does not include commission. WS FILL_SETTLED will deliver + # the actual fee later and update the fee_source to "WS_SETTLED". + # Until then, log an estimate so CH rows are never blank on this field. + fill_qty = float(ack_row.get("executedQty") or ack_row.get("filledQty") or + getattr(intent, "target_size", 0.0) or 0.0) + if is_limit: + # LIMIT orders *may* rest and fill as maker — optimistic estimate. + fee_rate = 0.0002 # BingX perpetuals maker fee 0.02% + fee_source = "ESTIMATED_MAKER" + is_maker_est = True + else: + fee_rate = 0.0004 # BingX perpetuals taker fee 0.04% + fee_source = "ESTIMATED_TAKER" + is_maker_est = False + estimated_fee = fill_price * fill_qty * fee_rate if fill_price > 0 and fill_qty > 0 else 0.0 + + # ── Gap 3: slippage (signed, vs mark_at_submit) ─────────────────────── + # Positive = worse than mark (taker overpays), negative = better (maker/price improvement). + # Measured for BOTH taker and maker fills so post-trade analytics can compare. + slippage_bps = 0.0 + if mark_at_submit > 0 and fill_price > 0: + raw_diff = (fill_price - mark_at_submit) / mark_at_submit * 10_000 + # Sign convention: adverse = positive regardless of direction. + # For BUY (LONG enter / SHORT close): higher fill price = worse → positive + # For SELL (SHORT enter / LONG close): lower fill price = worse → positive + if side == "BUY": + slippage_bps = raw_diff # positive if fill > mark (paid up) + else: + slippage_bps = -raw_diff # positive if fill < mark (sold down) + + # Exchange-assigned fill time from ACK (field "updateTime" or "time"); 0 if absent. + exchange_ts = int(ack_row.get("updateTime") or ack_row.get("time") or + ack_row.get("transactTime") or 0) + + # Annotate ack_row with computed friction so _events_from_submit can read them. + ack_row["_fee_estimated"] = estimated_fee + ack_row["_fee_source"] = fee_source + ack_row["_is_maker_est"] = is_maker_est + ack_row["_mark_at_submit"] = mark_at_submit + ack_row["_slippage_bps"] = slippage_bps + ack_row["_submit_ts_ms"] = submit_ts_ms + ack_row["_exchange_ts"] = exchange_ts + receipt = ExecutionReceipt( timestamp=datetime.now(timezone.utc), status=status, @@ -443,12 +691,28 @@ class BingxDirectExecutionAdapter(ExecutionPort): quantity=float(intent.target_size or 0.0), price=fill_price, client_order_id=client_order_id, - order_id=str((ack.order_id if 'ack' in locals() and ack is not None else '') or ack_row.get("orderId") or ""), + order_id=str( + (ack.order_id if "ack" in locals() and ack is not None else "") + or ack_row.get("orderId") or "" + ), raw_ack=ack_row, raw_state=dict(self._state.account if self._state is not None else {}), ) - # Refresh from the venue so the direct runtime can use exchange-led state. - self._state = await self._refresh_exchange_state(intent.asset, include_history=True) + + # S2: background state refresh — do not block the submit path for MARKET fills. + # For MARKET orders that returned FILLED in the ACK, all fill data is already + # in the receipt. The WS stream (FILL_SETTLED + ACCOUNT_UPDATE) delivers + # capital truth. The background REST poll is belt-and-suspenders. + # For LIMIT / non-FILLED orders: must refresh synchronously to detect resting order. + market_filled = (status == "FILLED" and not is_limit) + if market_filled: + asyncio.create_task( + self._refresh_state_background(intent.asset), + name=f"state_refresh_{symbol}", + ) + else: + self._state = await self._refresh_exchange_state(intent.asset, include_history=False) + return receipt async def cancel(self, order: Any, *, reason: str = "") -> dict[str, Any]: diff --git a/prod/clean_arch/dita_v2/bingx_venue.py b/prod/clean_arch/dita_v2/bingx_venue.py index ac51c98..467426c 100644 --- a/prod/clean_arch/dita_v2/bingx_venue.py +++ b/prod/clean_arch/dita_v2/bingx_venue.py @@ -493,6 +493,14 @@ class BingxVenueAdapter(VenueAdapter): **{**base_event.__dict__, "event_id": _event_id(self._event_seq), "kind": KernelEventKind.ORDER_REJECT, "status": VenueEventStatus.REJECTED, "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED")}, ) ] + # Extract friction fields annotated by submit_intent (Gap 1/2/3). + fee_estimated = float(ack_row.get("_fee_estimated") or 0.0) + fee_source = str(ack_row.get("_fee_source") or "") + is_maker_est = bool(ack_row.get("_is_maker_est", False)) + mark_at_submit = float(ack_row.get("_mark_at_submit") or 0.0) + slippage_bps = float(ack_row.get("_slippage_bps") or 0.0) + exchange_ts = int(ack_row.get("_exchange_ts") or 0) + events = [base_event] fill_status = _venue_event_status_from_row(status) filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0) @@ -524,6 +532,15 @@ class BingxVenueAdapter(VenueAdapter): reason="", raw_payload=ack_row or json_safe(receipt), metadata={"intent_id": intent.intent_id, "action": intent.action.value}, + # Gap 1/2/3: fee + friction fields populated from submit_intent annotations. + # fee_source="ESTIMATED_*" until WS FILL_SETTLED updates it to "WS_SETTLED". + fee=fee_estimated, + fee_asset="USDT", + fee_source=fee_source, + is_maker=is_maker_est, + exchange_ts=exchange_ts, + slippage_bps=slippage_bps, + mark_at_submit=mark_at_submit, ) ) return events diff --git a/prod/clean_arch/dita_v2/contracts.py b/prod/clean_arch/dita_v2/contracts.py new file mode 100644 index 0000000..f7a687c --- /dev/null +++ b/prod/clean_arch/dita_v2/contracts.py @@ -0,0 +1,362 @@ +"""Canonical v2 contracts for the DITAv2 execution kernel.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Dict, Mapping, Optional, Sequence, Tuple + + +class TradeSide(str, Enum): + """Trade side.""" + + LONG = "LONG" + SHORT = "SHORT" + FLAT = "FLAT" + + +class TradeStage(str, Enum): + """Execution stage for a trade slot.""" + + IDLE = "IDLE" + DECISION_CREATED = "DECISION_CREATED" + INTENT_CREATED = "INTENT_CREATED" + ORDER_REQUESTED = "ORDER_REQUESTED" + ORDER_SENT = "ORDER_SENT" + ORDER_ACKED = "ORDER_ACKED" + ORDER_REJECTED = "ORDER_REJECTED" + ENTRY_WORKING = "ENTRY_WORKING" + PARTIAL_FILL = "PARTIAL_FILL" + POSITION_OPENED = "POSITION_OPENED" + POSITION_OPEN = "POSITION_OPEN" + EXIT_REQUESTED = "EXIT_REQUESTED" + EXIT_SENT = "EXIT_SENT" + EXIT_ACKED = "EXIT_ACKED" + EXIT_REJECTED = "EXIT_REJECTED" + EXIT_WORKING = "EXIT_WORKING" + POSITION_PARTIALLY_CLOSED = "POSITION_PARTIALLY_CLOSED" + POSITION_CLOSED = "POSITION_CLOSED" + CLOSED = "CLOSED" + TRADE_TERMINAL_WRITTEN = "TRADE_TERMINAL_WRITTEN" + STALE_STATE_RECONCILING = "STALE_STATE_RECONCILING" + + +class KernelCommandType(str, Enum): + """Kernel command types.""" + + ENTER = "ENTER" + EXIT = "EXIT" + MARK_PRICE = "MARK_PRICE" + RECONCILE = "RECONCILE" + CONTROL = "CONTROL" + CANCEL = "CANCEL" + + +class KernelEventKind(str, Enum): + """Normalized venue event kinds.""" + + ORDER_ACK = "ORDER_ACK" + ORDER_REJECT = "ORDER_REJECT" + RATE_LIMITED = "RATE_LIMITED" + PARTIAL_FILL = "PARTIAL_FILL" + FULL_FILL = "FULL_FILL" + CANCEL_ACK = "CANCEL_ACK" + CANCEL_REJECT = "CANCEL_REJECT" + MARK_PRICE = "MARK_PRICE" + RECONCILE = "RECONCILE" + CONTROL = "CONTROL" + + +class KernelDiagnosticCode(str, Enum): + """Structured diagnostic codes emitted by the kernel.""" + + OK = "OK" + RATE_LIMITED = "RATE_LIMITED" + INVALID_SLOT_ID = "INVALID_SLOT_ID" + INVALID_INTENT = "INVALID_INTENT" + UNSUPPORTED_INTENT = "UNSUPPORTED_INTENT" + SLOT_BUSY = "SLOT_BUSY" + NO_OPEN_POSITION = "NO_OPEN_POSITION" + NO_ACTIVE_EXIT_ORDER = "NO_ACTIVE_EXIT_ORDER" + UNKNOWN_EVENT_KIND = "UNKNOWN_EVENT_KIND" + ORDER_REJECTED = "ORDER_REJECTED" + ENTRY_ORDER_REJECTED = "ENTRY_ORDER_REJECTED" + EXIT_ORDER_REJECTED = "EXIT_ORDER_REJECTED" + CANCEL_REJECTED = "CANCEL_REJECTED" + STALE_STATE_RECONCILE = "STALE_STATE_RECONCILE" + RECONCILED = "RECONCILED" + DUPLICATE_EVENT = "DUPLICATE_EVENT" + UNRESOLVED_SLOT = "UNRESOLVED_SLOT" + INVALID_TRANSITION = "INVALID_TRANSITION" + TERMINAL_STATE = "TERMINAL_STATE" + CAPITAL_FROZEN = "CAPITAL_FROZEN" + + +class KernelSeverity(str, Enum): + """Severity classification for kernel outcomes.""" + + INFO = "INFO" + WARNING = "WARNING" + ERROR = "ERROR" + CRITICAL = "CRITICAL" + + +class VenueOrderStatus(str, Enum): + """Order status surface mirrored from venue truth.""" + + NEW = "NEW" + ACKED = "ACKED" + PARTIALLY_FILLED = "PARTIALLY_FILLED" + FILLED = "FILLED" + CANCELED = "CANCELED" + REJECTED = "REJECTED" + + +class VenueEventStatus(str, Enum): + """Status alias for normalized venue events.""" + + ACKED = "ACKED" + REJECTED = "REJECTED" + RATE_LIMITED = "RATE_LIMITED" + PARTIALLY_FILLED = "PARTIALLY_FILLED" + FILLED = "FILLED" + CANCELED = "CANCELED" + CANCELED_REJECTED = "CANCEL_REJECTED" + + +@dataclass(frozen=True) +class VenueOrder: + """Venue-specific order identity and fill state.""" + + internal_trade_id: str + venue_order_id: str + venue_client_id: str + side: TradeSide + intended_size: float + filled_size: float = 0.0 + average_fill_price: float = 0.0 + status: VenueOrderStatus = VenueOrderStatus.NEW + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def remaining_size(self) -> float: + return max(0.0, float(self.intended_size) - float(self.filled_size)) + + +@dataclass +class TradeSlot: + """A single execution slot managed by the v2 kernel.""" + + slot_id: int + trade_id: str = "" + asset: str = "" + side: TradeSide = TradeSide.FLAT + entry_price: float = 0.0 + size: float = 0.0 + initial_size: float = 0.0 + leverage: float = 0.0 + entry_time: Optional[datetime] = None + unrealized_pnl: float = 0.0 + realized_pnl: float = 0.0 + closed: bool = False + exit_leg_ratios: Tuple[float, ...] = (1.0,) + active_leg_index: int = 0 + active_exit_order: Optional[VenueOrder] = None + active_entry_order: Optional[VenueOrder] = None + fsm_state: TradeStage = TradeStage.IDLE + close_reason: str = "" + last_event_time: Optional[datetime] = None + seen_event_ids: Tuple[str, ...] = () + metadata: Dict[str, Any] = field(default_factory=dict) + + def is_free(self) -> bool: + return self.fsm_state in {TradeStage.IDLE, TradeStage.CLOSED} and float(self.size or 0.0) <= 0.0 and not self.active_entry_order and not self.active_exit_order + + def is_open(self) -> bool: + return self.fsm_state in { + TradeStage.ENTRY_WORKING, + TradeStage.POSITION_OPENED, + TradeStage.POSITION_OPEN, + TradeStage.EXIT_WORKING, + } and not self.closed + + def mark_price(self, price: float) -> None: + if price is None or price != price or price <= 0: + return + self.entry_price = self.entry_price or price + if self.entry_price <= 0 or self.size <= 0: + self.unrealized_pnl = 0.0 + return + delta = (price - self.entry_price) / self.entry_price + if self.side == TradeSide.SHORT: + delta = -delta + self.unrealized_pnl = delta * self.size * self.entry_price * self.leverage + + def next_exit_ratio(self) -> float: + if self.active_leg_index < len(self.exit_leg_ratios): + ratio = float(self.exit_leg_ratios[self.active_leg_index]) + return max(0.0, min(1.0, ratio)) + return 1.0 + + def consume_exit_leg(self) -> float: + ratio = self.next_exit_ratio() + self.active_leg_index = min(self.active_leg_index + 1, max(len(self.exit_leg_ratios), 1)) + return ratio + + def remaining_size(self) -> float: + return max(0.0, float(self.size)) + + def attach_entry_order(self, order: VenueOrder) -> None: + self.active_entry_order = order + + def attach_exit_order(self, order: VenueOrder) -> None: + self.active_exit_order = order + + def to_dict(self) -> Dict[str, Any]: + def _order_dict(order: Optional[VenueOrder]) -> Optional[Dict[str, Any]]: + if order is None: + return None + return { + "internal_trade_id": order.internal_trade_id, + "venue_order_id": order.venue_order_id, + "venue_client_id": order.venue_client_id, + "side": order.side.value, + "intended_size": float(order.intended_size or 0.0), + "filled_size": float(order.filled_size or 0.0), + "average_fill_price": float(order.average_fill_price or 0.0), + "status": order.status.value, + "metadata": dict(order.metadata), + } + + return { + "slot_id": self.slot_id, + "trade_id": self.trade_id, + "asset": self.asset, + "side": self.side.value, + "entry_price": float(self.entry_price or 0.0), + "size": float(self.size or 0.0), + "initial_size": float(self.initial_size or 0.0), + "leverage": float(self.leverage or 0.0), + "entry_time": self.entry_time.isoformat() if hasattr(self.entry_time, "isoformat") else None, + "unrealized_pnl": float(self.unrealized_pnl or 0.0), + "realized_pnl": float(self.realized_pnl or 0.0), + "closed": bool(self.closed), + "exit_leg_ratios": [float(r) for r in self.exit_leg_ratios], + "active_leg_index": int(self.active_leg_index or 0), + "active_exit_order": _order_dict(self.active_exit_order), + "active_entry_order": _order_dict(self.active_entry_order), + "fsm_state": self.fsm_state.value, + "close_reason": self.close_reason, + "last_event_time": self.last_event_time.isoformat() if hasattr(self.last_event_time, "isoformat") else None, + "seen_event_ids": list(self.seen_event_ids), + "metadata": dict(self.metadata), + } + + +@dataclass(frozen=True) +class KernelIntent: + """Command emitted by the algo and written to the hot-path intent region.""" + + timestamp: datetime + intent_id: str + trade_id: str + slot_id: int + asset: str + side: TradeSide + action: KernelCommandType + reference_price: float + target_size: float + leverage: float + exit_leg_ratios: Tuple[float, ...] = (1.0,) + reason: str = "" + metadata: Dict[str, Any] = field(default_factory=dict) + stage: TradeStage = TradeStage.INTENT_CREATED + order_type: str = "MARKET" + limit_price: float = 0.0 + + +@dataclass(frozen=True) +class VenueEvent: + """Normalized venue truth mapped into DITAv2 semantics.""" + + timestamp: datetime + event_id: str + trade_id: str + slot_id: int + kind: KernelEventKind + status: VenueEventStatus + venue_order_id: str = "" + venue_client_id: str = "" + side: TradeSide = TradeSide.FLAT + asset: str = "" + price: float = 0.0 # avg fill price + size: float = 0.0 + filled_size: float = 0.0 + remaining_size: float = 0.0 + reason: str = "" + raw_payload: Dict[str, Any] = field(default_factory=dict) + metadata: Dict[str, Any] = field(default_factory=dict) + + # ── Fee / friction fields ────────────────────────────────────────────── + # fee: exchange commission for this fill event. + # Positive = cost; negative = rebate (maker on some venues). + # Starts as ESTIMATED_TAKER from the REST ACK path (BingX ACK does not + # include commission — we estimate from fill_size × fill_price × rate). + # Updated to WS_SETTLED when the FILL_SETTLED event arrives from the + # account stream with the actual commission field "n". + fee: float = 0.0 + fee_asset: str = "" # e.g. "USDT" + # fee_source provenance codes: + # "" — fee unknown (e.g. CANCEL_ACK, ORDER_ACK events) + # "ESTIMATED_TAKER" — REST path, MARKET order; estimated at taker rate + # "ESTIMATED_MAKER" — REST path, LIMIT order that may rest; estimated at maker rate + # "WS_SETTLED" — actual fee from WS ORDER_TRADE_UPDATE field "n" + # "REST_SETTLED" — actual fee from REST fill history (allFillOrders) + fee_source: str = "" + is_maker: bool = False # True when LIMIT order rested and filled as maker + # exchange_ts: exchange-assigned fill timestamp (ms epoch). + # 0 when not available (REST ACK path — use VenueEvent.timestamp as fallback). + # Non-zero from WS ORDER_TRADE_UPDATE (field "T" or "t"). + exchange_ts: int = 0 + # slippage_bps: signed fill-quality metric. + # For taker fills: (fill_price - mark_at_submit) / mark_at_submit × 10_000 + # positive = worse than mark (usual for taker on moving markets) + # negative = better than mark (rare — mark moved in our favour between submit and fill) + # For maker fills: fill_price is better than mid by design; slippage is typically + # negative (price improvement vs what a taker would have paid). + # 0.0 when mark_at_submit is unavailable. + slippage_bps: float = 0.0 + mark_at_submit: float = 0.0 # mark/mid price captured just before submit_intent POST + + +@dataclass(frozen=True) +class KernelTransition: + """Durable kernel transition used for debug journaling.""" + + timestamp: datetime + trade_id: str + slot_id: int + prev_state: TradeStage + next_state: TradeStage + trigger: str + intent_id: str = "" + event_id: str = "" + control_mode: str = "" + control_verbosity: str = "" + details: Dict[str, Any] = field(default_factory=dict) + + +@dataclass(frozen=True) +class KernelOutcome: + """Result of applying a command or venue event.""" + + accepted: bool + slot_id: int + trade_id: str + state: TradeStage + diagnostic_code: KernelDiagnosticCode = KernelDiagnosticCode.OK + severity: KernelSeverity = KernelSeverity.INFO + transitions: Tuple[KernelTransition, ...] = () + emitted_events: Tuple[VenueEvent, ...] = () + details: Dict[str, Any] = field(default_factory=dict) diff --git a/prod/clean_arch/dita_v2/test_leverage_cache.py b/prod/clean_arch/dita_v2/test_leverage_cache.py new file mode 100644 index 0000000..2362dbf --- /dev/null +++ b/prod/clean_arch/dita_v2/test_leverage_cache.py @@ -0,0 +1,390 @@ +"""S1: Leverage cache — comprehensive mock tests. + +Covers: + - Same-leverage skip (no POST) + - Change triggers POST + - POST failure → cache NOT updated → retry on next call + - Concurrent same-symbol same-leverage: only one POST (lock) + - Concurrent same-symbol different-leverage: serialised, both POST + - Connect-time drift detection and cache correction + - Persist/restore across "restarts" (file-based) + - Multi-runner conflict: both runners see the same account-level leverage + - Leverage after BingX HTTP error: partial state handled correctly + +Bad-leverage-at-trade is one of the worst possible outcomes — these tests +guard every code path that could produce it. +""" +from __future__ import annotations + +import asyncio +import json +import sys +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch, call +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + + +# --------------------------------------------------------------------------- +# Helpers — minimal adapter stub wired to a fake HTTP client +# --------------------------------------------------------------------------- + +def _make_adapter(tmp_path: Path, env_tag: str = "vst"): + """Build a BingxDirectExecutionAdapter with a mocked HTTP client.""" + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter, BingxDirectExecutionConfig + from prod.bingx.enums import BingxEnvironment + + cfg = BingxDirectExecutionConfig( + environment=BingxEnvironment.VST, + allow_mainnet=False, + default_leverage=1, + exchange_leverage_cap=3, + ) + + mock_client = AsyncMock() + mock_client.signed_post = AsyncMock(return_value={"leverage": 1}) + mock_client.signed_get = AsyncMock(return_value={"leverage": 1, "longLeverage": 1}) + mock_provider = MagicMock() + mock_provider.initialize = AsyncMock() + mock_provider.find = MagicMock(return_value=None) + mock_provider.list_all = MagicMock(return_value=[]) + + adapter = BingxDirectExecutionAdapter(cfg, client=mock_client, provider=mock_provider) + # Override persist path to tmp THEN reload — __init__ already loaded from the + # default /tmp path. Reloading after override ensures tests are path-isolated. + adapter._leverage_cache_path = tmp_path / f".bingx_leverage_cache_{env_tag}.json" + adapter._leverage_cache = {} + adapter._load_leverage_cache() # load from test-specific tmp path (may be empty) + + return adapter, mock_client + + +# --------------------------------------------------------------------------- +# 1. Basic skip — same leverage, no POST +# --------------------------------------------------------------------------- + +class TestLeverageCacheBasic: + @pytest.mark.asyncio + async def test_skip_when_same_leverage(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + # First call: cache miss → POST + await adapter._ensure_leverage("TRX-USDT", 1) + assert client.signed_post.call_count == 1 + + # Second call: cache hit → NO POST + await adapter._ensure_leverage("TRX-USDT", 1) + assert client.signed_post.call_count == 1, "Should not POST when leverage unchanged" + + @pytest.mark.asyncio + async def test_post_on_change(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + await adapter._ensure_leverage("TRX-USDT", 1) + assert client.signed_post.call_count == 1 + + # Change leverage → must POST again + await adapter._ensure_leverage("TRX-USDT", 2) + assert client.signed_post.call_count == 2, "Should POST when leverage changes" + assert adapter._leverage_cache["TRX-USDT"] == 2 + + @pytest.mark.asyncio + async def test_different_symbols_independent(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + await adapter._ensure_leverage("TRX-USDT", 1) + await adapter._ensure_leverage("XRP-USDT", 1) + assert client.signed_post.call_count == 2, "Each symbol needs its own POST" + + # Skip both on repeat + await adapter._ensure_leverage("TRX-USDT", 1) + await adapter._ensure_leverage("XRP-USDT", 1) + assert client.signed_post.call_count == 2, "No extra POSTs for cached symbols" + + +# --------------------------------------------------------------------------- +# 2. Failure handling — cache NOT updated on POST failure +# --------------------------------------------------------------------------- + +class TestLeverageCacheFailure: + @pytest.mark.asyncio + async def test_cache_not_updated_on_post_failure(self, tmp_path): + from prod.bingx.http import BingxHttpError + adapter, client = _make_adapter(tmp_path) + client.signed_post.side_effect = BingxHttpError("HTTP 429 rate limit") + + result = await adapter._ensure_leverage("TRX-USDT", 2) + + assert result is False, "Should return False on failure" + assert "TRX-USDT" not in adapter._leverage_cache, ( + "Cache must NOT be updated when POST fails — " + "next submit must retry, not use wrong leverage" + ) + + @pytest.mark.asyncio + async def test_retry_on_next_call_after_failure(self, tmp_path): + from prod.bingx.http import BingxHttpError + adapter, client = _make_adapter(tmp_path) + + # First attempt fails + client.signed_post.side_effect = BingxHttpError("rate limit") + await adapter._ensure_leverage("TRX-USDT", 2) + assert client.signed_post.call_count == 1 + + # Second attempt succeeds + client.signed_post.side_effect = None + client.signed_post.return_value = {"leverage": 2} + await adapter._ensure_leverage("TRX-USDT", 2) + assert client.signed_post.call_count == 2, "Must retry after failure" + assert adapter._leverage_cache.get("TRX-USDT") == 2 + + @pytest.mark.asyncio + async def test_first_call_returns_true_on_success(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + result = await adapter._ensure_leverage("TRX-USDT", 1) + assert result is True, "Should return True when POST is made" + + @pytest.mark.asyncio + async def test_skip_returns_false(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + await adapter._ensure_leverage("TRX-USDT", 1) + result = await adapter._ensure_leverage("TRX-USDT", 1) + assert result is False, "Should return False when POST is skipped" + + +# --------------------------------------------------------------------------- +# 3. Concurrency — asyncio.Lock prevents interleaving +# --------------------------------------------------------------------------- + +class TestLeverageCacheConcurrency: + @pytest.mark.asyncio + async def test_concurrent_same_symbol_same_leverage_one_post(self, tmp_path): + """Two concurrent submits for same symbol+leverage → exactly one POST.""" + adapter, client = _make_adapter(tmp_path) + + # Introduce a small delay so both calls enter _ensure_leverage before either completes + call_count = 0 + async def slow_post(*args, **kwargs): + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.01) + return {"leverage": 1} + + client.signed_post.side_effect = slow_post + + results = await asyncio.gather( + adapter._ensure_leverage("TRX-USDT", 1), + adapter._ensure_leverage("TRX-USDT", 1), + ) + # Only ONE should have actually POSTed (the one that won the lock) + assert call_count == 1, ( + f"Expected exactly 1 leverage POST for same symbol+leverage, got {call_count}. " + "This is the heisenbug: if the lock isn't protecting the cache check+update " + "atomically, both calls see an empty cache and both POST." + ) + # One True (did POST), one False (saw cache hit after lock) + assert sorted(results) == [False, True], f"Expected [False, True], got {results}" + + @pytest.mark.asyncio + async def test_concurrent_same_symbol_different_leverage_both_post(self, tmp_path): + """Two calls with different leverages for same symbol → both POST, serialised.""" + adapter, client = _make_adapter(tmp_path) + posted_leverages = [] + + async def recording_post(path, params): + await asyncio.sleep(0.005) + posted_leverages.append(params.get("leverage")) + return {"leverage": params.get("leverage")} + + client.signed_post.side_effect = recording_post + + await asyncio.gather( + adapter._ensure_leverage("TRX-USDT", 1), + adapter._ensure_leverage("TRX-USDT", 2), + ) + assert len(posted_leverages) == 2, "Both different-leverage calls must POST" + assert set(posted_leverages) == {1, 2} + + @pytest.mark.asyncio + async def test_independent_symbols_concurrent_no_interference(self, tmp_path): + """Different symbols are fully independent — no cross-symbol blocking.""" + adapter, client = _make_adapter(tmp_path) + call_order = [] + + async def recording_post(path, params): + call_order.append(params.get("symbol", "")) + return {"leverage": 1} + + client.signed_post.side_effect = recording_post + + await asyncio.gather( + adapter._ensure_leverage("TRX-USDT", 1), + adapter._ensure_leverage("XRP-USDT", 1), + adapter._ensure_leverage("BTC-USDT", 1), + ) + assert len(call_order) == 3, "All three symbols should POST independently" + + +# --------------------------------------------------------------------------- +# 4. Persistence — JSON sidecar survives "restarts" +# --------------------------------------------------------------------------- + +class TestLeverageCachePersistence: + @pytest.mark.asyncio + async def test_cache_persisted_on_set(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + await adapter._ensure_leverage("TRX-USDT", 2) + + persisted = json.loads(adapter._leverage_cache_path.read_text()) + assert persisted.get("TRX-USDT") == 2 + + @pytest.mark.asyncio + async def test_cache_restored_on_init(self, tmp_path): + """Second adapter instance loads cache from file → skips POST for cached symbol.""" + adapter1, client1 = _make_adapter(tmp_path) + await adapter1._ensure_leverage("TRX-USDT", 2) + assert client1.signed_post.call_count == 1 + + # Second adapter reads from same file + adapter2, client2 = _make_adapter(tmp_path) + assert adapter2._leverage_cache.get("TRX-USDT") == 2 + + await adapter2._ensure_leverage("TRX-USDT", 2) + assert client2.signed_post.call_count == 0, ( + "After restart, cached leverage should not trigger another POST" + ) + + @pytest.mark.asyncio + async def test_corrupt_cache_file_handled_gracefully(self, tmp_path): + adapter, _ = _make_adapter(tmp_path) + adapter._leverage_cache_path.write_text("{not valid json}") + + # Re-load should not crash; cache resets to empty + adapter._load_leverage_cache() + assert adapter._leverage_cache == {} + + @pytest.mark.asyncio + async def test_invalid_leverage_values_filtered(self, tmp_path): + # Write the file first, then create adapter pointing at it + cache_file = tmp_path / ".bingx_leverage_cache_vst.json" + cache_file.write_text( + json.dumps({"TRX-USDT": 2, "XRP-USDT": -1, "BTC-USDT": "bad", "ETH-USDT": 0}) + ) + adapter, _ = _make_adapter(tmp_path) # _make_adapter calls _load_leverage_cache after path set + # Only valid (>= 1) entries survive + assert adapter._leverage_cache.get("TRX-USDT") == 2 + assert "XRP-USDT" not in adapter._leverage_cache, "Negative leverage filtered" + assert "BTC-USDT" not in adapter._leverage_cache, "Non-numeric leverage filtered" + assert "ETH-USDT" not in adapter._leverage_cache, "Zero leverage filtered" + + +# --------------------------------------------------------------------------- +# 5. Connect-time drift detection +# --------------------------------------------------------------------------- + +class TestLeverageDriftDetection: + @pytest.mark.asyncio + async def test_drift_detected_and_cache_corrected(self, tmp_path): + """Exchange has lev=2 but cache says 1 → cache updated to exchange truth.""" + adapter, client = _make_adapter(tmp_path) + adapter._leverage_cache["TRX-USDT"] = 1 # stale cache + + # Exchange returns 2 (another runner changed it) + client.signed_get.return_value = {"leverage": 2, "longLeverage": 2} + await adapter._verify_leverage_drift() + + assert adapter._leverage_cache.get("TRX-USDT") == 2, ( + "Cache must be updated to exchange truth after drift detected" + ) + + @pytest.mark.asyncio + async def test_no_drift_no_update(self, tmp_path): + adapter, client = _make_adapter(tmp_path) + adapter._leverage_cache["TRX-USDT"] = 1 + + client.signed_get.return_value = {"leverage": 1, "longLeverage": 1} + await adapter._verify_leverage_drift() + + # No change — cache is already correct + assert adapter._leverage_cache.get("TRX-USDT") == 1 + + @pytest.mark.asyncio + async def test_drift_check_failure_is_non_fatal(self, tmp_path): + """If drift check itself fails (network error), adapter must not crash.""" + from prod.bingx.http import BingxHttpError + adapter, client = _make_adapter(tmp_path) + adapter._leverage_cache["TRX-USDT"] = 1 + client.signed_get.side_effect = BingxHttpError("rate limit") + + await adapter._verify_leverage_drift() # must not raise + assert adapter._leverage_cache.get("TRX-USDT") == 1 # unchanged + + @pytest.mark.asyncio + async def test_connect_calls_drift_verification(self, tmp_path): + """connect() must call _verify_leverage_drift after refresh_state.""" + adapter, client = _make_adapter(tmp_path) + adapter._leverage_cache["TRX-USDT"] = 99 # obviously wrong + + from prod.clean_arch.ports.execution import ExchangeStateSnapshot + from datetime import datetime, timezone + + snap = ExchangeStateSnapshot( + timestamp=datetime.now(timezone.utc), + capital=25000.0, equity=25000.0, + open_positions={}, open_orders=[], + all_orders=[], all_fills=[], + account={}, open_notional=0.0, source="test", + ) + adapter.refresh_state = AsyncMock(return_value=snap) + client.signed_get.return_value = {"leverage": 1, "longLeverage": 1} + + await adapter.connect() + client.signed_get.assert_called() # drift check happened + + +# --------------------------------------------------------------------------- +# 6. Multi-runner contract documentation test +# --------------------------------------------------------------------------- + +class TestMultiRunnerContract: + @pytest.mark.asyncio + async def test_account_level_leverage_last_writer_wins(self, tmp_path): + """ + CRITICAL: BingX has ONE leverage setting per symbol per account. + Two runners requesting different leverages for the same symbol + CANNOT be safely arbitrated by the cache alone — the exchange + will reflect whichever runner's POST arrived last. + + This test documents the known limitation: runner-A sets lev=1, + runner-B sets lev=2, runner-A's order may execute at lev=2. + Detection requires cross-process coordination (Zinc arbiter) which + is not yet implemented. For now, ensure leverage is uniform across + all runners for a shared account. + """ + # Simulate runner A + adapter_a, client_a = _make_adapter(tmp_path) + client_a.signed_post.return_value = {"leverage": 1} + await adapter_a._ensure_leverage("TRX-USDT", 1) + assert adapter_a._leverage_cache["TRX-USDT"] == 1 + + # Simulate runner B (different adapter, same account) + adapter_b, client_b = _make_adapter(tmp_path) + client_b.signed_post.return_value = {"leverage": 2} + await adapter_b._ensure_leverage("TRX-USDT", 2) + assert adapter_b._leverage_cache["TRX-USDT"] == 2 + + # Runner A's cache is now STALE — exchange has lev=2 from runner B + # Runner A believes lev=1 but the exchange has lev=2. + # This is the known multi-runner conflict with no current mitigation. + assert adapter_a._leverage_cache["TRX-USDT"] == 1, ( + "Runner A's cache is stale after runner B changed leverage. " + "This is expected — document the known limitation. " + "Fix: cross-process Zinc leverage arbiter (future work)." + ) + + @pytest.mark.asyncio + async def test_single_runner_consistent_after_multiple_symbols(self, tmp_path): + """Within one runner: leverage is always correct after successful POST.""" + adapter, client = _make_adapter(tmp_path) + for sym, lev in [("TRX-USDT", 1), ("XRP-USDT", 2), ("BTC-USDT", 1)]: + await adapter._ensure_leverage(sym, lev) + assert adapter._leverage_cache[sym] == lev, f"Cache wrong for {sym}" diff --git a/prod/clean_arch/persistence/pink_clickhouse.py b/prod/clean_arch/persistence/pink_clickhouse.py index d551c6c..102fe88 100644 --- a/prod/clean_arch/persistence/pink_clickhouse.py +++ b/prod/clean_arch/persistence/pink_clickhouse.py @@ -455,21 +455,36 @@ class PinkClickHousePersistence: partial = (not slot_closed) and cur_size > 0.0 - # Extract the fill price from emitted venue events (G21 fix): the actual - # exchange fill price lives in the FULL_FILL/PARTIAL_FILL event, not in - # the slot dict. Pass it explicitly so _write_trade_event does not fall - # back to entry_price. + # Extract fill price AND friction fields from emitted venue events. + # These are first-class fields on VenueEvent (Gap 1/2/3). fill_price_hint = 0.0 + fill_fee = 0.0 + fill_fee_source = "" + fill_is_maker = False + fill_slippage_bps = 0.0 + fill_mark_at_submit = 0.0 + fill_exchange_ts = 0 for ev in events: p_val = getattr(ev, "price", 0.0) if p_val and math.isfinite(float(p_val)) and float(p_val) > 0: fill_price_hint = float(p_val) - break + if getattr(ev, "fee", 0.0): + fill_fee = float(ev.fee) + if getattr(ev, "fee_source", ""): + fill_fee_source = str(ev.fee_source) + if getattr(ev, "is_maker", False): + fill_is_maker = bool(ev.is_maker) + if getattr(ev, "slippage_bps", 0.0): + fill_slippage_bps = float(ev.slippage_bps) + if getattr(ev, "mark_at_submit", 0.0): + fill_mark_at_submit = float(ev.mark_at_submit) + if getattr(ev, "exchange_ts", 0): + fill_exchange_ts = int(ev.exchange_ts) - # One trade_exit_legs row per exit leg (partial or final), BLUE-schema - # compatible so PINK multi-exit trades reconcile against the same table. self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome, - fill_price_hint=fill_price_hint) + fill_price_hint=fill_price_hint, + fill_fee=fill_fee, fill_fee_source=fill_fee_source, + fill_is_maker=fill_is_maker, fill_slippage_bps=fill_slippage_bps) self._write_trade_reconstruction( snapshot, intent.trade_id, event_type="PARTIAL_EXIT" if partial else "EXIT", @@ -483,11 +498,15 @@ class PinkClickHousePersistence: }, market_state=market_state, ) - # Terminal trade event. if slot_closed: self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state, - exit_price_hint=fill_price_hint) + exit_price_hint=fill_price_hint, + fill_fee=fill_fee, fill_fee_source=fill_fee_source, + fill_is_maker=fill_is_maker, + fill_slippage_bps=fill_slippage_bps, + fill_mark_at_submit=fill_mark_at_submit, + fill_exchange_ts=fill_exchange_ts) def persist_fill_events( self, @@ -601,6 +620,46 @@ class PinkClickHousePersistence: market_state=market_state, ) + def persist_fee_settled( + self, + *, + trade_id: str, + fee: float, + fee_asset: str = "USDT", + is_maker: bool = False, + exchange_ts: int = 0, + realized_pnl_delta: float = 0.0, + ts: Optional[Any] = None, + ) -> None: + """Record the WS FILL_SETTLED fee arriving after the REST submit. + + Gap 2: the REST ACK path writes fee_source="ESTIMATED_TAKER/MAKER". + When the WS ORDER_TRADE_UPDATE frame arrives with field "n" (actual + commission), call this method to log the settled truth. + + The CH spool stores both the original estimated row AND this settled row. + Downstream queries can reconcile using: + SELECT trade_id, MAX(fee) FILTER(WHERE fee_source='WS_SETTLED') AS settled_fee, + MAX(fee) FILTER(WHERE fee_source LIKE 'ESTIMATED%') AS estimated_fee + FROM trade_events GROUP BY trade_id + + This method writes to ``fee_settled_events`` (a lightweight supplementary + table, not trade_events) so the original row is never mutated. + """ + ts_val = ts or datetime.now(timezone.utc) + self._sink("fee_settled_events", { + "ts": ts_val.isoformat() if hasattr(ts_val, "isoformat") else str(ts_val), + "trade_id": trade_id, + "fee": float(fee), + "fee_asset": fee_asset, + "fee_source": "WS_SETTLED", + "is_maker": bool(is_maker), + "exchange_ts": int(exchange_ts), + "realized_pnl_delta": float(realized_pnl_delta), + "runtime_namespace": self.config.runtime_namespace, + "strategy": self.config.strategy, + }) + def record_anomaly( self, *, @@ -876,6 +935,10 @@ class PinkClickHousePersistence: self, snapshot: Any, decision: Decision, intent: Intent, slot_dict: dict[str, Any], outcome: KernelOutcome | None, fill_price_hint: float = 0.0, + fill_fee: float = 0.0, + fill_fee_source: str = "", + fill_is_maker: bool = False, + fill_slippage_bps: float = 0.0, ) -> None: """Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg. @@ -964,6 +1027,11 @@ class PinkClickHousePersistence: "pnl_leg": pnl_leg, "pnl_realized_total": cur_realized, "bars_held": int(intent.bars_held or 0), + # Gap 1/2/3: per-leg friction + "fee_leg": fill_fee, + "fee_source": fill_fee_source, + "is_maker": fill_is_maker, + "slippage_bps": fill_slippage_bps, }) # Advance the per-trade leg snapshot for the next leg's delta. @@ -978,6 +1046,12 @@ class PinkClickHousePersistence: slot_dict: dict[str, Any], outcome: KernelOutcome | None, *, market_state: Mapping[str, Any] | None = None, exit_price_hint: float = 0.0, + fill_fee: float = 0.0, + fill_fee_source: str = "", + fill_is_maker: bool = False, + fill_slippage_bps: float = 0.0, + fill_mark_at_submit: float = 0.0, + fill_exchange_ts: int = 0, ) -> None: entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0) quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0) @@ -1050,7 +1124,19 @@ class PinkClickHousePersistence: "entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}), "exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}), "execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}), - "friction_payload_json": _json_text({"fees": 0.0}), + # Gap 1/2/3: fee, maker/taker, slippage, exchange timing. + # fee_source provenance: "ESTIMATED_TAKER" | "ESTIMATED_MAKER" | "WS_SETTLED" | "REST_SETTLED" + "fee": fill_fee, + "fee_source": fill_fee_source, + "is_maker": fill_is_maker, + "slippage_bps": fill_slippage_bps, + "mark_at_submit": fill_mark_at_submit, + "exchange_ts": fill_exchange_ts, + "friction_payload_json": _json_text({ + "fee": fill_fee, "fee_source": fill_fee_source, + "is_maker": fill_is_maker, "slippage_bps": fill_slippage_bps, + "mark_at_submit": fill_mark_at_submit, "exchange_ts": fill_exchange_ts, + }), "event_payload_json": _json_text({"phase": "terminal_close", "trade_id": intent.trade_id}), "market_state_bundle_json": _json_text(market_state or {}), "tp_base_pct": _safe_float(metadata.get("tp_base_pct", 0.0), 0.0), diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 7b2eef3..72257e0 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -514,6 +514,22 @@ class PinkDirectRuntime: "fee": event.fee, # negative = rebate "is_maker": event.is_maker, }) + # Gap 2: log settled fee with WS_SETTLED provenance so + # downstream can reconcile against the ESTIMATED_TAKER row. + if self.persistence is not None: + try: + # trade_id: best-effort from the client_order_id field ("c") + # or order_id ("i") — WS may not carry our trade_id directly. + ws_trade_id = str(event.client_order_id or event.order_id or "") + self.persistence.persist_fee_settled( + trade_id=ws_trade_id, + fee=event.fee, + fee_asset=event.fee_asset or "USDT", + is_maker=event.is_maker, + exchange_ts=event.exchange_ts, + ) + except Exception as _fee_exc: + self.logger.debug("persist_fee_settled failed: %s", _fee_exc) # Persist full kernel state after every settled fill for # crash recovery + session-to-session calibration continuity. _persist_kernel_snapshot(self.kernel, self.logger)