from __future__ import annotations import asyncio import json import logging import math import urllib.parse import urllib.request from dataclasses import asdict from dataclasses import is_dataclass from collections.abc import Iterable from datetime import timedelta from decimal import Decimal from hashlib import sha256 from typing import Any from nautilus_trader.accounting.accounts.margin import MarginAccount from nautilus_trader.cache.cache import Cache from nautilus_trader.common.component import LiveClock from nautilus_trader.common.component import MessageBus from nautilus_trader.common.enums import LogColor from nautilus_trader.core.uuid import UUID4 from nautilus_trader.execution.messages import BatchCancelOrders from nautilus_trader.execution.messages import CancelAllOrders from nautilus_trader.execution.messages import CancelOrder from nautilus_trader.execution.messages import GenerateFillReports from nautilus_trader.execution.messages import GenerateOrderStatusReport from nautilus_trader.execution.messages import GenerateOrderStatusReports from nautilus_trader.execution.messages import GeneratePositionStatusReports from nautilus_trader.execution.messages import ModifyOrder from nautilus_trader.execution.messages import SubmitOrder from nautilus_trader.execution.messages import SubmitOrderList from nautilus_trader.execution.reports import ExecutionMassStatus from nautilus_trader.execution.reports import FillReport from nautilus_trader.execution.reports import OrderStatusReport from nautilus_trader.execution.reports import PositionStatusReport from nautilus_trader.live.execution_client import LiveExecutionClient from nautilus_trader.model.enums import AccountType from nautilus_trader.model.enums import OmsType from nautilus_trader.model.enums import OrderSide from nautilus_trader.model.enums import OrderStatus from nautilus_trader.model.enums import OrderType from nautilus_trader.model.enums import PositionSide from nautilus_trader.model.enums import LiquiditySide from nautilus_trader.model.enums import TimeInForce from nautilus_trader.model.enums import order_side_to_str from nautilus_trader.model.identifiers import AccountId from nautilus_trader.model.identifiers import ClientId from nautilus_trader.model.identifiers import ClientOrderId from nautilus_trader.model.identifiers import InstrumentId from nautilus_trader.model.identifiers import PositionId from nautilus_trader.model.identifiers import TradeId from nautilus_trader.model.identifiers import VenueOrderId from nautilus_trader.model.objects import AccountBalance from nautilus_trader.model.objects import Currency from nautilus_trader.model.objects import MarginBalance from nautilus_trader.model.objects import Money from nautilus_trader.model.objects import Price from nautilus_trader.model.objects import Quantity from nautilus_trader.model.orders import Order from .config import BingxExecClientConfig from .enums import BINGX_VENUE from .leverage import LEVERAGE_MAPPING_RULE from .leverage import map_internal_conviction_to_exchange_leverage from .leverage import map_internal_conviction_to_exchange_leverage_target from .leverage import normalize_bingx_leverage_value from .sizing_mode import build_split_sizing_payload from .friction import estimate_friction from ..execution_quality import build_execution_quality_record from .journal import build_snapshot as build_journal_snapshot from .journal import load_latest_snapshot as load_journal_snapshot from .journal import write_snapshot as write_journal_snapshot from .reconciliation import get_terminal_trade_handler try: from prod.ch_writer import ch_put from prod.ch_writer import ts_us as _ch_ts_us except Exception: # pragma: no cover - durable logging remains optional in tests def ch_put(*_args, **_kwargs): # type: ignore[no-redef] return None def _ch_ts_us() -> int: # type: ignore[no-redef] return 0 from .http import BingxHttpClient from .http import BingxHttpError from .instrument_provider import BingxInstrumentProvider from .schemas import BingxOrderAck from .schemas import unwrap_order_payload from .websocket import BingxUserStream LOGGER = logging.getLogger(__name__) def _decimal_text(value: Decimal) -> str: text = format(value.normalize(), "f") if "." in text: text = text.rstrip("0").rstrip(".") return text or "0" def _jsonable_snapshot(value: Any) -> Any: if is_dataclass(value): return asdict(value) if hasattr(value, "__dict__"): return dict(vars(value)) return value def _account_event_observability_tags(client: Any) -> dict[str, str]: resolver = getattr(client, "_pink_observability_tags", None) if callable(resolver): try: tags = resolver() if isinstance(tags, dict): return {str(k): str(v) for k, v in tags.items() if v is not None} except Exception: pass if str(getattr(client, "_journal_strategy", "")).lower() != "pink": return {} return { "runtime_namespace": "pink", "strategy_namespace": "pink", "event_namespace": "pink", "actor_name": "BingxExecutionClient", "exec_venue": BINGX_VENUE.value, "data_venue": BINGX_VENUE.value, } def _rows_from_payload(payload: Any, *keys: str) -> list[dict[str, Any]]: if isinstance(payload, list): return [row for row in payload if isinstance(row, dict)] if isinstance(payload, dict): for key in keys: rows = payload.get(key) if isinstance(rows, list): return [row for row in rows if isinstance(row, dict)] return [] def _capital_from_balance_rows(rows: Any) -> float: if not isinstance(rows, list): LOGGER.warning("BingX account snapshot balances payload is not a list") return 0.0 for row in rows: if not isinstance(row, dict): LOGGER.warning("BingX account snapshot skipped malformed balance row: %r", row) continue capital = 0.0 for key in ("total", "balance", "equity", "availableMargin", "availableBalance", "walletBalance", "free"): try: capital = float(row.get(key, 0.0) or 0.0) except Exception: continue if capital > 0 and math.isfinite(capital): return capital if capital > 0 and math.isfinite(capital): return capital if rows: LOGGER.error("BingX account snapshot contained no usable balance rows") return 0.0 def _open_notional_from_positions(rows: Any) -> float: if not isinstance(rows, dict): LOGGER.warning("BingX position snapshot payload is not a dict") return 0.0 total = 0.0 for row in rows.values(): if not isinstance(row, dict): LOGGER.warning("BingX position snapshot skipped malformed position row: %r", row) continue try: qty = abs( float( row.get("positionAmt") or row.get("positionQty") or row.get("positionSize") or row.get("quantity") or row.get("pa") or 0.0 ) ) if qty <= 0.0: continue notional = row.get("positionValue") or row.get("notional") or row.get("openNotional") if notional is not None: total += abs(float(notional or 0.0)) continue entry = ( row.get("entryPrice") or row.get("avgPrice") or row.get("markPrice") or row.get("avgEntryPrice") or row.get("ep") or row.get("ap") or 0.0 ) total += qty * abs(float(entry or 0.0)) except Exception: LOGGER.warning("BingX position snapshot skipped unreadable position row: %r", row) continue return total def _filled_order_count_from_snapshots(rows: Any) -> int: if not isinstance(rows, list): return 0 seen: set[str] = set() count = 0 for snapshot in rows: if not isinstance(snapshot, dict): continue row = snapshot.get("row") if isinstance(snapshot.get("row"), dict) else snapshot if not isinstance(row, dict): continue status = str(row.get("status") or "").upper() if status and status not in {"FILLED", "CLOSED"}: continue trade_key = str(snapshot.get("_trade_key") or "").strip() if trade_key: base_key = trade_key.split(":", 1)[0] else: base_key = str( row.get("orderId") or row.get("orderID") or row.get("clientOrderId") or row.get("clientOrderID") or "" ).strip() if not base_key or base_key in seen: continue seen.add(base_key) count += 1 return count def _first_balance_row(rows: Any) -> dict[str, Any] | None: if not isinstance(rows, list): return None for row in rows: if isinstance(row, dict): return row return None def _positive_int_or_none(value: Any) -> int | None: try: parsed = int(value) except Exception: return None return parsed if parsed > 0 else None async def _reserve_leverage_update(client: Any, symbol: str, desired: int) -> bool: async with client._state_lock: if client._configured_leverage.get(symbol) == desired: return False inflight = getattr(client, "_leverage_update_inflight", None) if isinstance(inflight, dict) and inflight.get(symbol) == desired: return False if inflight is None: inflight = {} client._leverage_update_inflight = inflight inflight[symbol] = desired return True async def _commit_leverage_update(client: Any, symbol: str, desired: int) -> None: async with client._state_lock: client._configured_leverage[symbol] = desired inflight = getattr(client, "_leverage_update_inflight", None) if isinstance(inflight, dict): inflight.pop(symbol, None) async def _release_leverage_update(client: Any, symbol: str, desired: int) -> None: async with client._state_lock: inflight = getattr(client, "_leverage_update_inflight", None) if isinstance(inflight, dict) and inflight.get(symbol) == desired: inflight.pop(symbol, None) class BingxExecutionClient(LiveExecutionClient): """ Nautilus live execution client for BingX perpetual futures. This adapter is execution-first for DOLPHIN's split-venue architecture: Binance remains the market-data venue while BingX is responsible for execution, account balances, positions, and leverage state. """ _normalize_bingx_leverage_value = staticmethod(normalize_bingx_leverage_value) _map_internal_conviction_to_exchange_leverage = staticmethod( map_internal_conviction_to_exchange_leverage ) _map_internal_conviction_to_exchange_leverage_target = staticmethod( map_internal_conviction_to_exchange_leverage_target ) _build_split_sizing_payload = staticmethod(build_split_sizing_payload) def __init__( self, loop: asyncio.AbstractEventLoop, client: BingxHttpClient, msgbus: MessageBus, cache: Cache, clock: LiveClock, instrument_provider: BingxInstrumentProvider, config: BingxExecClientConfig, name: str | None = None, ) -> None: super().__init__( loop=loop, client_id=ClientId(name or BINGX_VENUE.value), venue=BINGX_VENUE, oms_type=OmsType.NETTING, account_type=AccountType.MARGIN, base_currency=None, instrument_provider=instrument_provider, msgbus=msgbus, cache=cache, clock=clock, config=config, ) self._client = client self._provider = instrument_provider self._config = config suffix = "vst" if config.environment.is_vst else "live" self._set_account_id(AccountId(f"{BINGX_VENUE.value}-{suffix}")) self._poll_tasks: list[asyncio.Task] = [] self._order_snapshots: dict[str, dict[str, Any]] = {} self._fill_snapshots: list[dict[str, Any]] = [] self._fill_snapshots_by_key: dict[str, dict[str, Any]] = {} self._position_snapshots: dict[str, dict[str, Any]] = {} self._order_id_aliases: dict[str, str] = {} self._account_snapshot: dict[str, Any] = {} self._ledger_authority = "exchange" self._reported_terminal_orders: set[str] = set() self._configured_leverage: dict[str, int] = {} self._leverage_update_inflight: dict[str, int] = {} self._state_lock = asyncio.Lock() self._journal_last_fingerprint = "" self._journal_strategy = getattr(config, "journal_strategy", None) or "prodgreen" self._journal_db = getattr(config, "journal_db", None) or "dolphin_prodgreen" self._user_stream: BingxUserStream | None = None self._user_stream_task: asyncio.Task | None = None self._ws_healthy = False self._ws_last_event_ns = 0 self._ws_supports_order_updates = False self._ws_last_health_state = False self._order_update_events: dict[str, asyncio.Event] = {} self._fill_update_events: dict[str, asyncio.Event] = {} self._account_event_last_fingerprint = "" def _pink_observability_tags(self) -> dict[str, str]: if str(self._journal_strategy).lower() != "pink": return {} return { "runtime_namespace": "pink", "strategy_namespace": "pink", "event_namespace": "pink", "actor_name": "BingxExecutionClient", "exec_venue": BINGX_VENUE.value, "data_venue": BINGX_VENUE.value, } def _ensure_account_events_table(self) -> None: ddl = f""" ALTER TABLE {self._journal_db}.account_events ADD COLUMN IF NOT EXISTS runtime_namespace LowCardinality(String) DEFAULT '', ADD COLUMN IF NOT EXISTS strategy_namespace LowCardinality(String) DEFAULT '', ADD COLUMN IF NOT EXISTS event_namespace LowCardinality(String) DEFAULT '', ADD COLUMN IF NOT EXISTS actor_name LowCardinality(String) DEFAULT '', ADD COLUMN IF NOT EXISTS exec_venue LowCardinality(String) DEFAULT '', ADD COLUMN IF NOT EXISTS data_venue LowCardinality(String) DEFAULT '' """ try: req = urllib.request.Request( "http://localhost:8123/", data=ddl.encode(), method="POST", ) req.add_header("X-ClickHouse-User", "dolphin") req.add_header("X-ClickHouse-Key", "dolphin_ch_2026") urllib.request.urlopen(req, timeout=5).close() self._log.info(f"[CH] {self._journal_db}.account_events tag columns ensured") except Exception as exc: self._log.warning(f"[CH] account_events tag column ensure failed: {exc}") async def _connect(self) -> None: self._log.info("Loading BingX instruments...", LogColor.BLUE) await self._provider.initialize() instruments = self._provider.list_all() self._log.info(f"Loaded {len(instruments)} BingX instruments", LogColor.GREEN) for instrument in instruments: self._cache.add_instrument(instrument) for currency in self._provider.currencies().values(): self._cache.add_currency(currency) self._log.info(f"Registered {len(instruments)} BingX instruments in cache", LogColor.GREEN) ensure_account_events_table = getattr(self, "_ensure_account_events_table", None) if callable(ensure_account_events_table): ensure_account_events_table() await self._refresh_account_state() await self._restore_journal_snapshot() await self._persist_journal_snapshot("STARTUP", force=True) await self._await_account_registered(log_registered=False) await self._persist_journal_snapshot("ACCOUNT_REGISTERED", force=True) if self._config.prefer_websocket: self._start_user_stream() self._start_pollers() async def _disconnect(self) -> None: if self._user_stream is not None: await self._user_stream.close() if self._user_stream_task is not None: self._user_stream_task.cancel() self._user_stream_task = None for task in self._poll_tasks: task.cancel() self._poll_tasks.clear() await self._client.close() cancel_pending = getattr(self, "cancel_pending_tasks", None) if cancel_pending is not None: result = cancel_pending() if asyncio.iscoroutine(result): await result async def refresh_account_state(self) -> None: await self._refresh_account_state() async def refresh_positions(self) -> None: await self._refresh_positions() async def refresh_open_orders(self) -> None: await self._refresh_open_orders() async def _await_account_registered( self, *, timeout_secs: float = 5.0, poll_interval_secs: float = 0.05, log_registered: bool = True, ) -> None: account_id = getattr(self, "account_id", "") deadline = self._clock.utc_now().timestamp() + timeout_secs while self._clock.utc_now().timestamp() < deadline: account = self.get_account() if account is not None: if log_registered: self._log.info(f"BingX account registered: {account_id}", LogColor.GREEN) return snapshot = getattr(self, "_account_snapshot", None) if isinstance(snapshot, dict): capital = _capital_from_balance_rows(snapshot.get("balances", [])) if capital > 0: if log_registered: self._log.info( f"BingX account snapshot ready: {account_id}", LogColor.GREEN, ) return await asyncio.sleep(poll_interval_secs) self._log.warning( f"Timed out waiting for BingX account registration: {account_id}; " "continuing with REST/polling fallback" ) def resolve_split_sizing_payload( self, *, sizing_lev: float, capital: float | None = None, mark_price: float | None = None, quantity_step: float | None = None, venue_notional_cap: float | None = None, exchange_leverage_cap: int | None = None, margin_budget_fraction_override: float | None = None, system_fraction_override: float | None = None, control_plane: dict[str, Any] | None = None, notes: dict[str, Any] | None = None, ) -> dict[str, Any] | None: """Build a BingX-ready sizing payload when split mode is enabled.""" return self._build_split_sizing_payload( sizing_mode=getattr(self._config, "sizing_mode", "engine"), sizing_lev=sizing_lev, capital=capital, mark_price=mark_price, quantity_step=quantity_step, venue_notional_cap=venue_notional_cap, exchange_leverage_cap=exchange_leverage_cap, margin_budget_fraction_override=margin_budget_fraction_override, system_fraction_override=system_fraction_override, control_plane=control_plane, hz_client=self._hz_client, defaults=None, notes=notes or {}, ) async def set_leverage(self, instrument_id: InstrumentId, leverage: int) -> None: desired = normalize_bingx_leverage_value( leverage, exchange_max=self._config.exchange_leverage_cap, ) if desired != int(Decimal(str(leverage))): self._log.info( f"BingX leverage quantized from {leverage!r} to integer exchange leverage {desired}", LogColor.YELLOW, ) symbol = self._normalize_symbol(instrument_id.symbol.value) if not await _reserve_leverage_update(self, symbol, desired): return try: await self._client.signed_post( "/openApi/swap/v2/trade/leverage", {"symbol": self._venue_symbol(instrument_id), "side": "BOTH", "leverage": desired}, ) except Exception: await _release_leverage_update(self, symbol, desired) raise await _commit_leverage_update(self, symbol, desired) account = self.get_account() if isinstance(account, MarginAccount): account.set_leverage(instrument_id, Decimal(desired)) persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("LEVERAGE_UPDATE") def snapshot_account_state(self) -> dict[str, Any]: account = dict(getattr(self, "_account_snapshot", {})) balances = account.get("balances", []) capital = _capital_from_balance_rows(balances) peak_capital = capital positions = dict(getattr(self, "_position_snapshots", {})) current_open_notional = _open_notional_from_positions(positions) current_account_leverage = current_open_notional / capital if capital > 0 else 0.0 exchange_leverage = 0 if getattr(self, "_configured_leverage", None): try: exchange_leverage = max( parsed for v in self._configured_leverage.values() if (parsed := _positive_int_or_none(v)) is not None ) except Exception: exchange_leverage = 0 return { "account_id": str(self.account_id), "account_snapshot": dict(getattr(self, "_account_snapshot", {})), "ledger_authority": getattr(self, "_ledger_authority", "exchange"), "order_snapshots": dict(getattr(self, "_order_snapshots", {})), "position_snapshots": dict(getattr(self, "_position_snapshots", {})), "configured_leverage": dict(getattr(self, "_configured_leverage", {})), "current_open_notional": current_open_notional, "current_account_leverage": current_account_leverage, "exchange_leverage": exchange_leverage, "exchange_leverage_mode": "mapped_conservative_integer", "leverage_mapping_rule": LEVERAGE_MAPPING_RULE, "capital": capital, "peak_capital": peak_capital, "ws_healthy": bool(getattr(self, "_ws_healthy", False)), "ws_last_event_ns": int(getattr(self, "_ws_last_event_ns", 0)), "ws_supports_order_updates": bool(getattr(self, "_ws_supports_order_updates", False)), "rate_limits": _jsonable_snapshot(self._client.rate_limit_snapshot()), "circuit_breaker": _jsonable_snapshot(self._client.circuit_breaker.snapshot()), } def _write_account_event(self, reason: str) -> None: account = dict(getattr(self, "_account_snapshot", {})) observability_tags = _account_event_observability_tags(self) strategy_name = str(getattr(self, "_journal_strategy", "") or "").lower() journal_db = str(getattr(self, "_journal_db", "") or "") if not journal_db: if strategy_name == "pink": journal_db = "dolphin_pink" elif strategy_name == "green": journal_db = "dolphin_green" elif strategy_name.startswith("prod"): journal_db = "dolphin_prodgreen" else: journal_db = "dolphin" balances = account.get("balances", []) capital = _capital_from_balance_rows(balances) peak_capital = capital positions = dict(self._position_snapshots) if isinstance(self._position_snapshots, dict) else {} orders = dict(self._order_snapshots) if isinstance(self._order_snapshots, dict) else {} current_open_notional = _open_notional_from_positions(positions) current_account_leverage = current_open_notional / capital if capital > 0 else 0.0 fills_today = len(self._fill_snapshots) if isinstance(self._fill_snapshots, list) else 0 filled_orders_today = _filled_order_count_from_snapshots(self._fill_snapshots) exchange_leverage = 0 if getattr(self, "_configured_leverage", None): try: exchange_leverage = max( parsed for v in self._configured_leverage.values() if (parsed := _positive_int_or_none(v)) is not None ) except Exception: exchange_leverage = 0 row = { "ts": __import__("datetime").datetime.now(__import__("datetime").timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f"), "event_type": reason, "strategy": self._journal_strategy, "posture": "N/A", "capital": capital, "peak_capital": peak_capital, "drawdown_pct": 0.0, "pnl_today": 0.0, "trades_today": filled_orders_today, "open_positions": len(positions), "boost": 1.0, "beta": 1.0, "current_open_notional": current_open_notional, "current_account_leverage": current_account_leverage, "exchange_leverage": exchange_leverage, "exchange_leverage_mode": "mapped_conservative_integer", "leverage_mapping_rule": LEVERAGE_MAPPING_RULE, "notes": json.dumps( { "account_id": str(self.account_id), "ledger_authority": getattr(self, "_ledger_authority", "exchange"), "observability": observability_tags, "fills_today": fills_today, "filled_orders_today": filled_orders_today, "account": account, "orders": orders, "positions": positions, }, sort_keys=True, separators=(",", ":"), default=str, ), } row.update(observability_tags) if capital <= 0.0: self._log.error( "BingX account event has no usable capital: strategy=%s account_id=%s reason=%s", self._journal_strategy, self.account_id, reason, ) stable = { "event_type": row["event_type"], "strategy": row["strategy"], "posture": row["posture"], "capital": row["capital"], "peak_capital": row["peak_capital"], "drawdown_pct": row["drawdown_pct"], "trades_today": row["trades_today"], "open_positions": row["open_positions"], "current_open_notional": row["current_open_notional"], "current_account_leverage": row["current_account_leverage"], "exchange_leverage": row["exchange_leverage"], "boost": row["boost"], "beta": row["beta"], "notes": row["notes"], } fingerprint = sha256( json.dumps(stable, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8") ).hexdigest() if fingerprint == self._account_event_last_fingerprint: return self._account_event_last_fingerprint = fingerprint try: body = json.dumps(row, sort_keys=True, separators=(",", ":"), default=str).encode("utf-8") sql = "INSERT INTO account_events FORMAT JSONEachRow" url = ( "http://localhost:8123/" f"?database={journal_db}&query={urllib.parse.quote(sql)}" ) req = urllib.request.Request(url, data=body, method="POST") req.add_header("X-ClickHouse-User", "dolphin") req.add_header("X-ClickHouse-Key", "dolphin_ch_2026") req.add_header("Content-Type", "application/octet-stream") with urllib.request.urlopen(req, timeout=5) as resp: if resp.status not in (200, 201): raise RuntimeError(f"HTTP {resp.status}") self._log.info(f"[CH] {journal_db} account event written: {reason}") except Exception as exc: self._log.debug(f"Failed to write {journal_db} account event: {exc}") def _journal_payload(self) -> dict[str, Any]: return { "account": dict(getattr(self, "_account_snapshot", {})), "orders": dict(getattr(self, "_order_snapshots", {})), "positions": dict(getattr(self, "_position_snapshots", {})), "configured_leverage": dict(getattr(self, "_configured_leverage", {})), "ws_healthy": bool(getattr(self, "_ws_healthy", False)), "ws_last_event_ns": int(getattr(self, "_ws_last_event_ns", 0)), "ws_supports_order_updates": bool(getattr(self, "_ws_supports_order_updates", False)), "fills": list(getattr(self, "_fill_snapshots", [])), "rate_limits": _jsonable_snapshot(self._client.rate_limit_snapshot()), "circuit_breaker": _jsonable_snapshot(self._client.circuit_breaker.snapshot()), "ledger_authority": getattr(self, "_ledger_authority", "exchange"), "account_id": str(self.account_id), } async def _persist_journal_snapshot(self, reason: str, *, force: bool = False) -> None: payload = self._journal_payload() fingerprint = sha256( json.dumps(payload, sort_keys=True, separators=(",", ":")).encode("utf-8") ).hexdigest() if not force and fingerprint == self._journal_last_fingerprint: return self._journal_last_fingerprint = fingerprint try: snapshot = build_journal_snapshot( strategy=self._journal_strategy, account_id=str(self.account_id), ledger_authority=getattr(self, "_ledger_authority", "exchange"), payload=payload, reason=reason, ) write_journal_snapshot(snapshot) except Exception as exc: self._log.warning(f"Failed to persist BingX journal snapshot: {exc}") async def _persist_health_alarm(self, reason: str, *, force: bool = True) -> None: persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist(reason, force=force) async def _restore_journal_snapshot(self) -> None: if getattr(self, "_account_snapshot", None): return payload = load_journal_snapshot(strategy=self._journal_strategy, account_id=str(self.account_id)) if not payload: return await self._apply_journal_payload(payload) async def _apply_journal_payload(self, payload: dict[str, Any]) -> None: config = getattr(self, "_config", None) is_vst = bool(getattr(config, "is_vst", False)) account = payload.get("account") if isinstance(account, dict): self._account_snapshot = dict(account) orders = payload.get("orders") if isinstance(orders, dict): self._order_snapshots = dict(orders) self._order_id_aliases = {} for key, snapshot in self._order_snapshots.items(): if not isinstance(snapshot, dict): continue order_id = str(snapshot.get("orderId") or snapshot.get("orderID") or "") if order_id: self._order_id_aliases[order_id] = str(key) positions = payload.get("positions") if isinstance(positions, dict): if is_vst and getattr(self, "_ledger_authority", "exchange") == "exchange": # Live BingX is exchange-ledger authoritative. Do not resurrect # historical positions from the journal on startup; the live # REST/websocket refresh path will repopulate `_position_snapshots` # from the actual exchange state. self._position_snapshots = {} else: self._position_snapshots = dict(positions) elif is_vst and getattr(self, "_ledger_authority", "exchange") == "exchange": self._position_snapshots = {} fills = payload.get("fills") if isinstance(fills, list): self._fill_snapshots = [dict(row) for row in fills if isinstance(row, dict)] self._fill_snapshots_by_key = {} for snapshot in self._fill_snapshots: row = snapshot.get("row", {}) if not isinstance(row, dict): continue client_order_id = str(row.get("clientOrderId") or row.get("clientOrderID") or "") order_id = str(row.get("orderId") or row.get("orderID") or "") if client_order_id: self._fill_snapshots_by_key[client_order_id] = dict(row) if order_id: self._fill_snapshots_by_key[order_id] = dict(row) configured = payload.get("configured_leverage") if isinstance(configured, dict): self._configured_leverage = {} for key, value in configured.items(): parsed = _positive_int_or_none(value) if parsed is not None: self._configured_leverage[str(key)] = parsed balances_raw = account.get("balances") if isinstance(account, dict) else None if isinstance(balances_raw, list) and balances_raw: balances: list[AccountBalance] = [] margins: list[MarginBalance] = [] for row in balances_raw: if not isinstance(row, dict): continue asset = Currency.from_str(str(row.get("asset") or "USDT")) total = Money(Decimal(str(row.get("total") or "0")), asset) free = Money(Decimal(str(row.get("free") or "0")), asset) locked = Money(total.as_decimal() - free.as_decimal(), asset) balances.append(AccountBalance(total=total, locked=locked, free=free)) margins.append(MarginBalance(initial=Money(Decimal("0"), asset), maintenance=Money(Decimal("0"), asset))) if balances: clock = getattr(self, "_clock", None) ts_event = clock.timestamp_ns() if clock is not None and hasattr(clock, "timestamp_ns") else 0 self.generate_account_state( balances=balances, margins=margins, reported=True, ts_event=ts_event, info={"source": "journal", "raw": payload}, ) async def _check_state_drift(self, reason: str) -> None: drift: dict[str, Any] = {} account = self.get_account() snapshot = self._account_snapshot balances = snapshot.get("balances") if isinstance(snapshot, dict) else [] first = _first_balance_row(balances) if isinstance(account, MarginAccount) and first is not None: asset = Currency.from_str(str(first.get("asset") or "USDT")) expected_total = Decimal(str(first.get("total") or "0")) expected_free = Decimal(str(first.get("free") or "0")) expected_locked = Decimal(str(first.get("locked") or "0")) observed_total = account.balance_total(asset).as_decimal() observed_free = account.balance_free(asset).as_decimal() observed_locked = account.balance_locked(asset).as_decimal() if abs(observed_total - expected_total) > Decimal("0.00000001"): drift["balance_total"] = [str(expected_total), str(observed_total)] if abs(observed_free - expected_free) > Decimal("0.00000001"): drift["balance_free"] = [str(expected_free), str(observed_free)] if abs(observed_locked - expected_locked) > Decimal("0.00000001"): drift["balance_locked"] = [str(expected_locked), str(observed_locked)] cached_positions = { str(position.instrument_id.symbol.value): str(position.signed_qty) for position in self._cache.positions(venue=self.venue) } snapshot_positions = { str(row.get("symbol") or ""): str(row.get("positionAmt") or "0") for row in self._position_snapshots.values() } if cached_positions != snapshot_positions: drift["positions"] = { "exchange": snapshot_positions, "nautilus": cached_positions, } if drift: self._ws_healthy = False self._ledger_authority = "exchange" self._log.error(f"BingX/Nautilus state drift detected ({reason}): {drift}") persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("BINGX_DRIFT", force=True) def _start_user_stream(self) -> None: self._user_stream = BingxUserStream( client=self._client, config=self._config, on_event=self._handle_user_stream_event, on_health=self._handle_user_stream_health, ) self._user_stream_task = self.create_task( self._user_stream.run_forever(), log_msg="bingx_user_stream", ) def _start_pollers(self) -> None: self._poll_tasks = [ self.create_task( self._poll_open_orders_loop(), log_msg="bingx_poll_open_orders", ), self.create_task( self._poll_positions_loop(), log_msg="bingx_poll_positions", ), self.create_task( self._poll_account_loop(), log_msg="bingx_poll_account", ), ] async def _poll_open_orders_loop(self) -> None: try: while True: if self._should_poll_open_orders(): await self._refresh_open_orders() await asyncio.sleep(self._config.poll_open_orders_interval_ms / 1000.0) except asyncio.CancelledError: return async def _poll_positions_loop(self) -> None: try: while True: if self._should_use_http_polling(): await self._refresh_positions() await asyncio.sleep(self._config.poll_positions_interval_ms / 1000.0) except asyncio.CancelledError: return async def _poll_account_loop(self) -> None: try: while True: if self._should_use_http_polling(): await self._refresh_account_state() await asyncio.sleep(self._config.poll_account_interval_ms / 1000.0) except asyncio.CancelledError: return def _should_use_http_polling(self) -> bool: if not self._config.prefer_websocket: return True if not self._ws_healthy: return True stale_after_ns = int(self._config.ws_event_stale_after_ms) * 1_000_000 if stale_after_ns <= 0: return True return (self._clock.timestamp_ns() - self._ws_last_event_ns) > stale_after_ns def _should_poll_open_orders(self) -> bool: if self._should_use_http_polling(): return True return not self._ws_supports_order_updates def _handle_user_stream_health(self, healthy: bool) -> None: previous = self._ws_healthy self._ws_healthy = healthy if healthy: self._ws_last_event_ns = self._clock.timestamp_ns() if previous != healthy: self.create_task( self._persist_health_alarm("BINGX_WS_UP" if healthy else "BINGX_WS_DOWN"), log_msg="bingx_ws_health", ) async def _handle_user_stream_event(self, payload: dict[str, Any]) -> None: self._ws_last_event_ns = self._clock.timestamp_ns() data = payload.get("data") if isinstance(payload.get("data"), dict) else None event_type = str((data or payload).get("e") or "").upper() data_type = str(payload.get("dataType") or "").lower() if event_type == "ACCOUNT_UPDATE": await self._apply_account_update(data or payload) return if event_type == "ORDER_TRADE_UPDATE": await self._apply_order_trade_update(payload) return if event_type == "EXECUTIONREPORT" or data_type == "spot.executionreport": await self._apply_execution_report(data or payload) return if event_type == "ACCOUNT_CONFIG_UPDATE": await self._apply_account_config_update((data or payload).get("ac")) return if event_type == "SNAPSHOT": # TODO(dolphin): Absorb the initial BingX futures SNAPSHOT burst into a # ws_primed startup/reconnect phase. Use it to warm leverage/margin-mode # caches and suppress false drift alarms / excess REST polling until the # snapshot flood has drained. Do not treat SNAPSHOT as order-execution truth. await self._apply_account_config_update((data or payload).get("ac")) return if event_type == "LISTENKEYEXPIRED": self._ws_healthy = False await self._persist_health_alarm("BINGX_LISTENKEY_EXPIRED") async def _apply_account_update(self, payload: dict[str, Any]) -> None: account = payload.get("a") if not isinstance(account, dict): return updated = False balances_raw = account.get("B") if isinstance(balances_raw, list): balances: list[AccountBalance] = [] margins: list[MarginBalance] = [] for row in balances_raw: if not isinstance(row, dict): continue asset = Currency.from_str(str(row.get("a") or "USDT")) total = Money(Decimal(str(row.get("wb") or "0")), asset) free = Money(Decimal(str(row.get("cw") or row.get("wb") or "0")), asset) locked = Money(total.as_decimal() - free.as_decimal(), asset) balances.append(AccountBalance(total=total, locked=locked, free=free)) margins.append( MarginBalance( initial=Money(Decimal("0"), asset), maintenance=Money(Decimal("0"), asset), ), ) if balances: async with self._state_lock: self._account_snapshot = { "source": "ws", "ts_event": self._clock.timestamp_ns(), "balances": [ { "asset": balance.total.currency.code, "total": _decimal_text(balance.total.as_decimal()), "free": _decimal_text(balance.free.as_decimal()), "locked": _decimal_text(balance.locked.as_decimal()), } for balance in balances ], "margins": [ { "asset": margin.initial.currency.code, "initial": str(margin.initial), "maintenance": str(margin.maintenance), } for margin in margins ], "raw": account, } self.generate_account_state( balances=balances, margins=margins, reported=True, ts_event=self._clock.timestamp_ns(), info={"source": "ws", "raw": account}, ) updated = True positions_raw = account.get("P") if isinstance(positions_raw, list): async with self._state_lock: for row in positions_raw: if not isinstance(row, dict): continue symbol = self._normalize_symbol(str(row.get("s") or "")) amount = Decimal(str(row.get("pa") or "0")) if amount == 0: self._position_snapshots.pop(symbol, None) continue self._position_snapshots[symbol] = { "symbol": row.get("s"), "positionAmt": row.get("pa"), "entryPrice": row.get("ep"), "unRealizedProfit": row.get("up"), "marginType": row.get("mt"), "positionSide": row.get("ps"), "positionID": row.get("positionID") or row.get("positionId"), } updated = True if not updated: return persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("ACCOUNT_UPDATE") self._write_account_event("ACCOUNT_UPDATE") drift_check = getattr(self, "_check_state_drift", None) if drift_check is not None: await drift_check("ACCOUNT_UPDATE") async def _apply_order_trade_update(self, payload: dict[str, Any]) -> None: self._ws_supports_order_updates = True order_update = payload.get("o") if not isinstance(order_update, dict): return client_order_id = str(order_update.get("c") or "") venue_order_id = str(order_update.get("i") or "") if not client_order_id and venue_order_id: async with self._state_lock: client_order_id = self._order_id_aliases.get(venue_order_id, "") row = { "symbol": order_update.get("s"), "clientOrderId": client_order_id, "clientOrderID": client_order_id, "orderId": order_update.get("i"), "orderID": order_update.get("i"), "status": order_update.get("X") or order_update.get("x"), "price": order_update.get("p"), "avgPrice": order_update.get("ap") or order_update.get("L") or order_update.get("p"), "executedQty": order_update.get("z") or "0", "cumFilledQty": order_update.get("z") or "0", "lastFilledQty": order_update.get("l") or "0", "lastFillPrice": order_update.get("L") or order_update.get("ap") or order_update.get("p"), "commission": order_update.get("n") or "0", "commissionAsset": order_update.get("N"), "executionType": order_update.get("x"), "positionID": order_update.get("positionID") or order_update.get("positionId"), "triggerOrderId": order_update.get("triggerOrderId"), "mainOrderId": order_update.get("mainOrderId"), "source": "ws_order_trade_update", "raw": order_update, } cached_order = self._cache.order(ClientOrderId(client_order_id)) if client_order_id else None if cached_order is not None: instrument = self._cache.instrument(cached_order.instrument_id) if instrument is not None: try: row["friction"] = self._build_friction_snapshot( cached_order, row, instrument, feed_source=str(row.get("source") or "ws_order_trade_update"), ) except Exception: pass async with self._state_lock: if client_order_id and venue_order_id: self._order_id_aliases[venue_order_id] = client_order_id if client_order_id: async with self._state_lock: self._order_snapshots[client_order_id] = row signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) if venue_order_id: async with self._state_lock: self._order_snapshots[venue_order_id] = row signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(venue_order_id) persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("ORDER_TRADE_UPDATE") if str(order_update.get("x") or "").upper() == "TRADE_PREVENTION": cached_order = self._cache.order(ClientOrderId(client_order_id)) if cached_order is None: return self.generate_order_rejected( strategy_id=cached_order.strategy_id, instrument_id=cached_order.instrument_id, client_order_id=ClientOrderId(client_order_id), reason="Post-Only would cross spread", ts_event=self._clock.timestamp_ns(), due_post_only=True, ) return emit_progress = getattr(self, "_emit_order_progress", None) if emit_progress is not None: await emit_progress(row) async def _apply_execution_report(self, order_update: dict[str, Any]) -> None: if not isinstance(order_update, dict): return self._ws_supports_order_updates = True payload = { "o": { "s": order_update.get("s"), "c": order_update.get("c") or order_update.get("clientOrderId") or order_update.get("clientOrderID"), "i": order_update.get("i") or order_update.get("orderId") or order_update.get("orderID"), "X": order_update.get("X"), "x": order_update.get("x"), "p": order_update.get("p") or order_update.get("price"), "ap": order_update.get("ap") or order_update.get("avgPrice"), "z": order_update.get("z") or order_update.get("executedQty") or order_update.get("cumFilledQty"), "l": order_update.get("l") or order_update.get("lastFilledQty") or order_update.get("lastExecutedQty"), "L": order_update.get("L") or order_update.get("lastFillPrice") or order_update.get("avgPrice"), "n": order_update.get("n") or order_update.get("commission"), "N": order_update.get("N") or order_update.get("commissionAsset"), "positionID": order_update.get("positionID") or order_update.get("positionId"), "triggerOrderId": order_update.get("triggerOrderId"), "mainOrderId": order_update.get("mainOrderId"), } } await self._apply_order_trade_update(payload) async def _apply_account_config_update(self, payload: Any) -> None: if not isinstance(payload, dict): return symbol = self._normalize_symbol(str(payload.get("s") or "")) if not symbol: return leverage = payload.get("S") or payload.get("l") or 1 parsed = _positive_int_or_none(leverage) if parsed is None: return async with self._state_lock: self._configured_leverage[symbol] = parsed async def _refresh_account_state(self) -> None: try: balances_raw = await self._client.signed_get("/openApi/swap/v2/user/balance") rows_raw = balances_raw if isinstance(balances_raw, list) else balances_raw.get("balance", []) if isinstance(rows_raw, dict): rows = [rows_raw] elif isinstance(rows_raw, list): rows = rows_raw else: rows = [] balances: list[AccountBalance] = [] margins: list[MarginBalance] = [] for row in rows: asset = Currency.from_str(str(row.get("asset", "USDT"))) total = Money(Decimal(str(row.get("balance") or row.get("equity") or "0")), asset) free = Money( Decimal(str(row.get("availableBalance") or row.get("availableMargin") or "0")), asset, ) locked = Money(total.as_decimal() - free.as_decimal(), asset) balances.append(AccountBalance(total=total, locked=locked, free=free)) margins.append( MarginBalance( initial=Money( Decimal(str(row.get("initialMargin") or "0")), asset, ), maintenance=Money( Decimal(str(row.get("maintMargin") or "0")), asset, ), ), ) async with self._state_lock: self.generate_account_state( balances=balances, margins=margins, reported=True, ts_event=self._clock.timestamp_ns(), info={"source": "rest", "raw": balances_raw}, ) self._account_snapshot = { "source": "rest", "ts_event": self._clock.timestamp_ns(), "balances": [ { "asset": balance.total.currency.code, "total": _decimal_text(balance.total.as_decimal()), "free": _decimal_text(balance.free.as_decimal()), "locked": _decimal_text(balance.locked.as_decimal()), } for balance in balances ], "margins": [ { "asset": margin.initial.currency.code, "initial": str(margin.initial), "maintenance": str(margin.maintenance), } for margin in margins ], "raw": balances_raw, } persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("ACCOUNT_REFRESH") self._write_account_event("ACCOUNT_REFRESH") drift_check = getattr(self, "_check_state_drift", None) if drift_check is not None: await drift_check("ACCOUNT_REFRESH") except Exception as exc: self._log.warning(f"Failed to refresh BingX account state: {exc}") await self._persist_health_alarm("BINGX_REST_FAIL") async def _refresh_positions(self) -> None: try: positions = await self._client.signed_get("/openApi/swap/v2/user/positions") rows = _rows_from_payload(positions, "positions", "data") exchange_snapshots: dict[str, dict[str, Any]] = { self._normalize_symbol(str(row.get("symbol", ""))): row for row in rows if Decimal(str(row.get("positionAmt") or row.get("positionQty") or "0")) != 0 } async with self._state_lock: self._position_snapshots = dict(exchange_snapshots) await self._sync_cache_positions_with_snapshot(exchange_snapshots) persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("POSITION_REFRESH") drift_check = getattr(self, "_check_state_drift", None) if drift_check is not None: await drift_check("POSITION_REFRESH") except Exception as exc: self._log.warning(f"Failed to refresh BingX positions: {exc}") await self._persist_health_alarm("BINGX_REST_FAIL") async def _sync_cache_positions_with_snapshot(self, exchange_snapshots: dict[str, dict[str, Any]]) -> None: """Purge stale Nautilus cache positions that no longer exist on BingX. BingX is the authoritative ledger in live/testnet mode. If the exchange reports no open position for a symbol, but Nautilus still has a cached open position, that cached leg must be removed so the actor stops treating it as live exposure. """ cache_positions = list(self._cache.positions(venue=self.venue)) if not cache_positions: return stale_positions = [] for position in cache_positions: try: instrument_id = getattr(position, "instrument_id", None) symbol = str(getattr(getattr(instrument_id, "symbol", None), "value", "") or "") norm_symbol = self._normalize_symbol(symbol) if norm_symbol and norm_symbol not in exchange_snapshots: stale_positions.append(position) except Exception: continue if not stale_positions: return for position in stale_positions: try: position_id = getattr(position, "position_id", None) or getattr(position, "id", None) if position_id is None: continue self._cache.purge_position(position_id, purge_from_database=False) except Exception as exc: self._log.debug(f"Failed to purge stale BingX cache position: {exc}") async def _refresh_open_orders(self) -> None: try: orders = await self._client.signed_get("/openApi/swap/v2/trade/openOrders") rows = _rows_from_payload(orders, "orders", "data") active_client_ids = set() for row in rows: client_order_id = str(row.get("clientOrderID") or row.get("clientOrderId") or "") if not client_order_id: continue active_client_ids.add(client_order_id) async with self._state_lock: self._order_snapshots[client_order_id] = row await self._emit_order_progress(row) async with self._state_lock: snapshots = list(self._order_snapshots.items()) for client_order_id, snapshot in snapshots: if client_order_id in active_client_ids: continue await self._emit_terminal_reconciliation(snapshot) persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("OPEN_ORDER_REFRESH") except Exception as exc: self._log.warning(f"Failed to refresh BingX open orders: {exc}") await self._persist_health_alarm("BINGX_REST_FAIL") async def _emit_order_progress(self, row: dict[str, Any]) -> None: client_order_id = str(row.get("clientOrderID") or row.get("clientOrderId") or "") venue_order_id = str(row.get("orderId") or row.get("orderID") or "") if not client_order_id and venue_order_id: async with self._state_lock: client_order_id = self._order_id_aliases.get(venue_order_id, "") order = self._cache.order(ClientOrderId(client_order_id)) if client_order_id else None if order is None: return venue_order_id = VenueOrderId(venue_order_id or str(row.get("id") or "")) status = str(row.get("status") or "").upper() if status in {"NEW", "PENDING", "CREATED"}: self.generate_order_accepted( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, venue_order_id=venue_order_id, ts_event=self._clock.timestamp_ns(), ) signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) return filled_qty = Decimal(str(row.get("executedQty") or row.get("cumFilledQty") or "0")) last_filled_qty = Decimal(str(row.get("lastFilledQty") or row.get("lastExecutedQty") or "0")) if last_filled_qty > 0: await self._emit_fill_from_row( order=order, row=row, venue_order_id=venue_order_id, last_filled_qty=last_filled_qty, cumulative_filled_qty=filled_qty if filled_qty > 0 else last_filled_qty, ) elif filled_qty > 0: await self._emit_fill_from_row( order=order, row=row, venue_order_id=venue_order_id, last_filled_qty=filled_qty, cumulative_filled_qty=filled_qty, ) if status in {"CANCELED", "CANCELLED"}: async with self._state_lock: if client_order_id in self._reported_terminal_orders: return self._reported_terminal_orders.add(client_order_id) self.generate_order_canceled( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, venue_order_id=venue_order_id, ts_event=self._clock.timestamp_ns(), ) signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) return if status in {"FAILED", "REJECTED"}: async with self._state_lock: if client_order_id in self._reported_terminal_orders: return self._reported_terminal_orders.add(client_order_id) self.generate_order_rejected( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, reason=status, ts_event=self._clock.timestamp_ns(), due_post_only=False, ) signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) return if status == "EXPIRED": async with self._state_lock: if client_order_id in self._reported_terminal_orders: return self._reported_terminal_orders.add(client_order_id) self.generate_order_canceled( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, venue_order_id=venue_order_id, ts_event=self._clock.timestamp_ns(), ) signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) async def _emit_terminal_reconciliation(self, row: dict[str, Any]) -> None: client_order_id = str(row.get("clientOrderID") or row.get("clientOrderId") or "") venue_order_id = str(row.get("orderId") or row.get("orderID") or "") if not client_order_id and venue_order_id: async with self._state_lock: client_order_id = self._order_id_aliases.get(venue_order_id, "") if not client_order_id: return async with self._state_lock: if client_order_id in self._reported_terminal_orders: self._order_snapshots.pop(client_order_id, None) return order = self._cache.order(ClientOrderId(client_order_id)) if order is None: async with self._state_lock: self._order_snapshots.pop(client_order_id, None) return try: detail = await self._client.signed_get( "/openApi/swap/v2/trade/order", { "symbol": self._venue_symbol(order.instrument_id), "clientOrderId": client_order_id, }, ) detail_row = unwrap_order_payload(detail) if isinstance(detail, dict) else None if isinstance(detail_row, dict): detail_row.setdefault("source", "rest_order_detail") await self._emit_order_progress(detail_row) terminal_handler = get_terminal_trade_handler(str(self.account_id)) if terminal_handler is not None: result = terminal_handler( { "account_id": str(self.account_id), "client_order_id": client_order_id, "venue_order_id": venue_order_id, "snapshot_row": dict(row), "detail_row": dict(detail_row), } ) if asyncio.iscoroutine(result): await result finally: async with self._state_lock: self._order_snapshots.pop(client_order_id, None) signal_update = getattr(self, "_signal_order_update", None) if signal_update is not None: signal_update(client_order_id) async def _emit_fill_from_row( self, order: Order, row: dict[str, Any], venue_order_id: VenueOrderId, last_filled_qty: Decimal, cumulative_filled_qty: Decimal, ) -> None: last_qty = Quantity.from_str(str(last_filled_qty)) avg_price_raw = ( row.get("lastFillPrice") or row.get("avgPrice") or row.get("avgFilledPrice") or row.get("price") ) last_px = Price.from_str(str(avg_price_raw or "0")) trade_key = f"{venue_order_id.value}:{cumulative_filled_qty}:{last_px}" instrument = self._cache.instrument(order.instrument_id) if instrument is None: return feed_source = str(row.get("source") or "unknown") friction = self._build_friction_snapshot( order, row, instrument, last_qty=last_qty, last_px=last_px, feed_source=feed_source, ) enriched_row = dict(row) enriched_row["friction"] = friction quality_builder = getattr(self, "_build_execution_quality_record", None) if not callable(quality_builder): quality_builder = BingxExecutionClient._build_execution_quality_record quality_record = quality_builder( order=order, venue_order_id=venue_order_id, row=enriched_row, friction=friction, source=str(enriched_row.get("source") or "unknown"), ) enriched_row["execution_quality"] = quality_record async with self._state_lock: if any(snapshot.get("_trade_key") == trade_key for snapshot in self._fill_snapshots): return self._fill_snapshots.append({"_trade_key": trade_key, "row": enriched_row}) if len(self._fill_snapshots) > 1024: del self._fill_snapshots[:-1024] self._fill_snapshots_by_key[order.client_order_id.value] = dict(enriched_row) self._fill_snapshots_by_key[str(venue_order_id.value)] = dict(enriched_row) self._order_id_aliases[str(venue_order_id.value)] = order.client_order_id.value order_snapshot = self._order_snapshots.get(order.client_order_id.value) if isinstance(order_snapshot, dict): order_snapshot["lastFriction"] = friction order_snapshot["lastExecutionQuality"] = quality_record persist_quality = getattr(self, "_persist_execution_quality", None) if callable(persist_quality): persist_quality(quality_record) else: BingxExecutionClient._persist_execution_quality(self, quality_record) self.generate_order_filled( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, venue_order_id=venue_order_id, venue_position_id=self._position_id_from_snapshot(row), trade_id=TradeId(trade_key), order_side=order.side, order_type=order.order_type, last_qty=last_qty, last_px=last_px, quote_currency=instrument.quote_currency, commission=Money(Decimal(str(friction["commission_quote"] or "0")), instrument.quote_currency), liquidity_side=LiquiditySide[friction["liquidity_side"]], ts_event=self._clock.timestamp_ns(), ) status = str(row.get("status") or "").upper() if status == "FILLED": async with self._state_lock: self._reported_terminal_orders.add(order.client_order_id.value) signal_fill = getattr(self, "_signal_fill_update", None) if signal_fill is not None: signal_fill(order.client_order_id.value) signal_fill(str(venue_order_id.value)) async def _submit_order(self, command: SubmitOrder) -> None: order = command.order self.generate_order_submitted( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, ts_event=self._clock.timestamp_ns(), ) try: per_trade_lev = self._parse_leverage_from_tags(order) self._log.info( f"[BINGX] submit start symbol={self._venue_symbol(order.instrument_id)} " f"side={order.side} qty={order.quantity} " f"lev={per_trade_lev if per_trade_lev is not None else self._config.default_leverage} " f"client_order_id={order.client_order_id.value}" ) await self._ensure_leverage(order.instrument_id, per_trade_leverage=per_trade_lev) submit_payload = self._map_submit_order(order) self._log.info( f"[BINGX] submit payload symbol={submit_payload.get('symbol')} " f"type={submit_payload.get('type')} side={submit_payload.get('side')} " f"qty={submit_payload.get('quantity')}" ) ack_payload = await self._client.signed_post( "/openApi/swap/v2/trade/order", submit_payload, ) ack = BingxOrderAck.from_http(ack_payload) self._log.info( f"[BINGX] submit ack client_order_id={order.client_order_id.value} " f"order_id={ack.order_id} status={ack.status}" ) ack_row_raw = unwrap_order_payload(ack_payload) ack_row = dict(ack_row_raw) if isinstance(ack_row_raw, dict) else {} if ack_row: ack_row.setdefault("orderId", ack.order_id) ack_row.setdefault("orderID", ack.order_id) ack_row.setdefault("clientOrderId", ack.client_order_id or order.client_order_id.value) ack_row.setdefault("clientOrderID", ack.client_order_id or order.client_order_id.value) ack_row.setdefault("symbol", self._venue_symbol(order.instrument_id)) self.generate_order_accepted( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, venue_order_id=VenueOrderId(ack.order_id or order.client_order_id.value), ts_event=self._clock.timestamp_ns(), ) async with self._state_lock: self._order_snapshots[order.client_order_id.value] = { **ack_row, "orderId": ack.order_id, "orderID": ack.order_id, "clientOrderId": order.client_order_id.value, "clientOrderID": order.client_order_id.value, "status": str(ack_row.get("status") or "NEW"), "symbol": self._venue_symbol(order.instrument_id), "raw": ack_payload, } if ack.order_id: self._order_id_aliases[str(ack.order_id)] = order.client_order_id.value terminal_status = str(ack_row.get("status") or "").upper() executed_qty = Decimal(str(ack_row.get("executedQty") or ack_row.get("cumFilledQty") or "0")) if terminal_status in {"FILLED", "PARTIALLY_FILLED", "CANCELED", "CANCELLED", "REJECTED", "FAILED", "EXPIRED"} or executed_qty > 0: ack_progress = dict(ack_row) ack_progress["clientOrderId"] = order.client_order_id.value ack_progress["clientOrderID"] = order.client_order_id.value ack_progress["orderId"] = ack.order_id ack_progress["orderID"] = ack.order_id if ack_progress.get("lastFilledQty") in (None, "", "0", "0.0", "0.0000") and executed_qty > 0: ack_progress["lastFilledQty"] = ack_progress.get("executedQty") or ack_progress.get("cumFilledQty") await self._emit_order_progress(ack_progress) persist = getattr(self, "_persist_journal_snapshot", None) if persist is not None: await persist("ORDER_SUBMIT") except Exception as exc: due_post_only = "post only" in str(exc).lower() self._log.exception( f"[BINGX] submit failed client_order_id={order.client_order_id.value} " f"symbol={self._venue_symbol(order.instrument_id)}", exc, ) self.generate_order_rejected( strategy_id=order.strategy_id, instrument_id=order.instrument_id, client_order_id=order.client_order_id, reason=str(exc), ts_event=self._clock.timestamp_ns(), due_post_only=due_post_only, ) await self._persist_health_alarm("BINGX_ORDER_REJECTED") async def _submit_order_list(self, command: SubmitOrderList) -> None: for order in command.order_list.orders: await self._submit_order(SubmitOrder(order=order, command_id=UUID4(), ts_init=command.ts_init)) async def _modify_order(self, command: ModifyOrder) -> None: order = self._cache.order(command.client_order_id) if order is None: self.generate_order_modify_rejected( strategy_id=command.strategy_id, instrument_id=command.instrument_id, client_order_id=command.client_order_id, venue_order_id=command.venue_order_id, reason="ORDER_NOT_FOUND_IN_CACHE", ts_event=self._clock.timestamp_ns(), ) return try: await self._client.signed_post( "/openApi/swap/v1/trade/cancelReplace", self._map_modify_order(order, command), ) except Exception as exc: self.generate_order_modify_rejected( strategy_id=command.strategy_id, instrument_id=command.instrument_id, client_order_id=command.client_order_id, venue_order_id=command.venue_order_id, reason=str(exc), ts_event=self._clock.timestamp_ns(), ) await self._persist_health_alarm("BINGX_ORDER_MODIFY_FAIL") async def _cancel_order(self, command: CancelOrder) -> None: try: order = self._cache.order(command.client_order_id) instrument_id = order.instrument_id if order is not None else command.instrument_id snapshot = self._order_snapshots.get(command.client_order_id.value, {}) params = {"symbol": self._venue_symbol(instrument_id)} if snapshot.get("orderId"): params["orderId"] = snapshot["orderId"] else: params["clientOrderId"] = command.client_order_id.value await self._client.signed_delete("/openApi/swap/v2/trade/order", params) except Exception as exc: self.generate_order_cancel_rejected( strategy_id=command.strategy_id, instrument_id=command.instrument_id, client_order_id=command.client_order_id, venue_order_id=command.venue_order_id, reason=str(exc), ts_event=self._clock.timestamp_ns(), ) await self._persist_health_alarm("BINGX_ORDER_CANCEL_FAIL") async def _cancel_all_orders(self, command: CancelAllOrders) -> None: try: params = {} if command.instrument_id is not None: params["symbol"] = self._venue_symbol(command.instrument_id) await self._client.signed_delete("/openApi/swap/v2/trade/allOpenOrders", params) except Exception as exc: self._log.warning(f"Cancel all orders failed: {exc}") await self._persist_health_alarm("BINGX_ORDER_CANCEL_ALL_FAIL") async def _batch_cancel_orders(self, command: BatchCancelOrders) -> None: orders = [cancel.client_order_id.value for cancel in command.cancels] if not orders: return try: await self._client.signed_delete( "/openApi/swap/v2/trade/batchOrders", { "symbol": self._venue_symbol(command.cancels[0].instrument_id), "clientOrderIdList": ",".join(orders), }, ) except Exception as exc: for cancel in command.cancels: self.generate_order_cancel_rejected( strategy_id=cancel.strategy_id, instrument_id=cancel.instrument_id, client_order_id=cancel.client_order_id, venue_order_id=cancel.venue_order_id, reason=str(exc), ts_event=self._clock.timestamp_ns(), ) await self._persist_health_alarm("BINGX_BATCH_CANCEL_FAIL") async def generate_order_status_report( self, command: GenerateOrderStatusReport, ) -> OrderStatusReport | None: client_order_id = command.client_order_id.value if command.client_order_id else None if not client_order_id: return None order = self._cache.order(ClientOrderId(client_order_id)) async with self._state_lock: raw_snapshot = self._order_snapshots.get(client_order_id) snapshot = dict(raw_snapshot) if raw_snapshot is not None else None if order is None or snapshot is None: return None return self._build_order_status_report(order, snapshot) async def generate_order_status_reports( self, command: GenerateOrderStatusReports, ) -> list[OrderStatusReport]: reports: list[OrderStatusReport] = [] async with self._state_lock: snapshots = list(self._order_snapshots.items()) for client_order_id, snapshot in snapshots: order = self._cache.order(ClientOrderId(client_order_id)) if order is None: continue if command.instrument_id and order.instrument_id != command.instrument_id: continue reports.append(self._build_order_status_report(order, snapshot)) return reports async def generate_mass_status( self, lookback_mins: int | None = None, ) -> ExecutionMassStatus | None: logger = getattr(self, "_log", None) if logger is not None: logger.info("Generating BingX ExecutionMassStatus...") self.reconciliation_active = True mass_status = ExecutionMassStatus( client_id=self.id, account_id=self.account_id, venue=self.venue, report_id=UUID4(), ts_init=self._clock.timestamp_ns(), ) since = None if lookback_mins is not None: since = self._clock.utc_now() - timedelta(minutes=lookback_mins) try: await asyncio.gather( self._refresh_account_state(), self._refresh_positions(), self._refresh_open_orders(), ) order_reports, fill_reports, position_reports = await asyncio.gather( self.generate_order_status_reports( GenerateOrderStatusReports( instrument_id=None, start=since, end=None, open_only=False, command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ), ), self.generate_fill_reports( GenerateFillReports( instrument_id=None, venue_order_id=None, start=since, end=None, command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ), ), self.generate_position_status_reports( GeneratePositionStatusReports( instrument_id=None, start=since, end=None, command_id=UUID4(), ts_init=self._clock.timestamp_ns(), ), ), ) mass_status.add_order_reports(reports=order_reports) mass_status.add_fill_reports(reports=fill_reports) mass_status.add_position_reports(reports=position_reports) self.reconciliation_active = False return mass_status except Exception as exc: if logger is not None: logger.exception(f"Cannot reconcile BingX execution state: {exc}") self.reconciliation_active = False return None async def generate_fill_reports( self, command: GenerateFillReports, ) -> list[FillReport]: reports: list[FillReport] = [] async with self._state_lock: snapshots = list(self._fill_snapshots) for snapshot in snapshots: row = snapshot.get("row", {}) client_order_id = str(row.get("clientOrderId") or row.get("clientOrderID") or "") if not client_order_id: order_id = str(row.get("orderId") or row.get("orderID") or "") if order_id: async with self._state_lock: client_order_id = self._order_id_aliases.get(order_id, "") order = self._cache.order(ClientOrderId(client_order_id)) if client_order_id else None if order is None: continue instrument = self._cache.instrument(order.instrument_id) if instrument is None: continue friction = row.get("friction") if not isinstance(friction, dict): friction = self._build_friction_snapshot( order, row, instrument, feed_source=str(row.get("source") or "unknown"), ) reports.append( FillReport( account_id=self.account_id, instrument_id=order.instrument_id, venue_order_id=VenueOrderId(str(row.get("orderId") or order.client_order_id.value)), trade_id=TradeId(str(snapshot.get("_trade_key") or row.get("orderId") or "")), order_side=order.side, last_qty=Quantity.from_str(str(row.get("lastFilledQty") or row.get("executedQty") or "0")), last_px=Price.from_str(str(row.get("lastFillPrice") or row.get("avgPrice") or "0")), commission=Money(Decimal(str(friction.get("commission_quote") or "0")), instrument.quote_currency), liquidity_side=LiquiditySide[str(friction.get("liquidity_side") or "NO_LIQUIDITY_SIDE").upper()], report_id=UUID4(), ts_event=self._clock.timestamp_ns(), ts_init=self._clock.timestamp_ns(), client_order_id=order.client_order_id, venue_position_id=self._position_id_from_snapshot(row), ) ) return reports async def generate_position_status_reports( self, command: GeneratePositionStatusReports, ) -> list[PositionStatusReport]: reports: list[PositionStatusReport] = [] async with self._state_lock: rows = list(self._position_snapshots.values()) for row in rows: symbol = self._normalize_symbol(str(row.get("symbol") or "")) if not symbol: continue instrument = None for candidate in self._provider.list_all(): if candidate.symbol.value == symbol or candidate.raw_symbol.value == str(row.get("symbol") or ""): instrument = candidate break if instrument is None: continue qty = Decimal(str(row.get("positionAmt") or "0")) if qty == 0: continue side = PositionSide.LONG if qty > 0 else PositionSide.SHORT reports.append( PositionStatusReport( account_id=self.account_id, instrument_id=instrument.id, position_side=side, quantity=Quantity.from_str(str(abs(qty))), report_id=UUID4(), ts_last=self._clock.timestamp_ns(), ts_init=self._clock.timestamp_ns(), venue_position_id=self._position_id_from_snapshot(row), ) ) return reports def _build_order_status_report( self, order: Order, snapshot: dict[str, Any], ) -> OrderStatusReport: instrument = self._cache.instrument(order.instrument_id) if instrument is None: raise BingxHttpError(f"Instrument {order.instrument_id} not available in cache") quantity = order.quantity filled_qty = Quantity.from_str(str(snapshot.get("executedQty") or snapshot.get("cumFilledQty") or "0")) raw_status = str(snapshot.get("status") or "NEW") status = self._map_order_status(raw_status) if filled_qty.as_decimal() > 0 and filled_qty.as_decimal() < quantity.as_decimal(): status = OrderStatus.PARTIALLY_FILLED elif filled_qty.as_decimal() >= quantity.as_decimal() and status not in {OrderStatus.CANCELED, OrderStatus.REJECTED, OrderStatus.EXPIRED}: status = OrderStatus.FILLED return OrderStatusReport( account_id=self.account_id, instrument_id=order.instrument_id, venue_order_id=VenueOrderId(str(snapshot.get("orderId") or order.client_order_id.value)), order_side=order.side, order_type=order.order_type, time_in_force=order.time_in_force, order_status=status, quantity=quantity, filled_qty=filled_qty, report_id=UUID4(), ts_accepted=self._clock.timestamp_ns(), ts_last=self._clock.timestamp_ns(), ts_init=self._clock.timestamp_ns(), client_order_id=order.client_order_id, price=order.price if order.has_price else None, trigger_price=order.trigger_price if order.has_trigger_price else None, avg_px=Decimal(str(snapshot.get("avgPrice") or snapshot.get("avgFilledPrice") or "0")), post_only=order.is_post_only, reduce_only=order.is_reduce_only, venue_position_id=self._position_id_from_snapshot(snapshot), ) @staticmethod def _parse_leverage_from_tags(order: Order) -> float | None: tags = getattr(order, "tags", None) or [] for tag in tags: if isinstance(tag, str) and (tag.startswith("lev:") or tag.startswith("cm:")): try: return float(tag.split(":", 1)[1]) except (ValueError, IndexError): pass return None async def _ensure_leverage(self, instrument_id: InstrumentId, per_trade_leverage: float | None = None) -> None: symbol = self._normalize_symbol(instrument_id.symbol.value) desired = ( map_internal_conviction_to_exchange_leverage( per_trade_leverage, exchange_max=self._config.exchange_leverage_cap, ) if per_trade_leverage is not None else normalize_bingx_leverage_value( self._config.default_leverage, exchange_max=self._config.exchange_leverage_cap, ) ) if self._config.leverage_by_symbol and symbol in self._config.leverage_by_symbol: desired = normalize_bingx_leverage_value( self._config.leverage_by_symbol[symbol], exchange_max=self._config.exchange_leverage_cap, ) if not await _reserve_leverage_update(self, symbol, desired): return try: await self._client.signed_post( "/openApi/swap/v2/trade/leverage", {"symbol": self._venue_symbol(instrument_id), "side": "BOTH", "leverage": desired}, ) except Exception: await _release_leverage_update(self, symbol, desired) raise await _commit_leverage_update(self, symbol, desired) account = self.get_account() if isinstance(account, MarginAccount): account.set_leverage(instrument_id, Decimal(desired)) def _map_submit_order(self, order: Order) -> dict[str, Any]: close_position = bool(getattr(order, "close_position", False) or getattr(order, "is_close_position", False)) if close_position and order.is_reduce_only: raise BingxHttpError("closePosition cannot be combined with reduceOnly") if close_position and order.order_type not in {OrderType.STOP_MARKET, OrderType.MARKET_IF_TOUCHED}: raise BingxHttpError("closePosition requires STOP_MARKET or TAKE_PROFIT_MARKET") if order.is_post_only and order.order_type != OrderType.LIMIT: raise BingxHttpError("Post-only is only valid for limit orders") params: dict[str, Any] = { "symbol": self._venue_symbol(order.instrument_id), "side": order_side_to_str(order.side), "positionSide": "BOTH", "type": self._map_order_type(order), "quantity": self._format_quantity(order.quantity), "clientOrderId": order.client_order_id.value, "recvWindow": str(_positive_int_or_none(self._config.recv_window_ms) or 5_000), } if order.has_price: params["price"] = self._format_price(order.price) if order.has_trigger_price: params["stopPrice"] = self._format_price(order.trigger_price) params["activationPrice"] = self._format_price(order.trigger_price) params["priceProtect"] = "true" trigger_type = getattr(order, "trigger_type", None) params["workingType"] = "CONTRACT_PRICE" if "CONTRACT" in str(trigger_type).upper() else "MARK_PRICE" if order.is_post_only: params["timeInForce"] = "PostOnly" params["postOnly"] = "true" elif order.order_type != OrderType.MARKET: tif = self._map_time_in_force(order.time_in_force) if tif: params["timeInForce"] = tif tags = tuple(str(tag).lower() for tag in (getattr(order, "tags", None) or ())) exit_tagged = any(tag == "type:exit" or tag.startswith("type:exit,") for tag in tags) if (order.is_reduce_only or exit_tagged or close_position) and self._config.use_reduce_only: params["reduceOnly"] = "true" if order.order_type == OrderType.TRAILING_STOP_MARKET: trailing_value = Decimal(str(getattr(order, "trailing_offset", "1"))) if trailing_value <= 0 or trailing_value > Decimal("10"): raise BingxHttpError("Trailing stop callbackRate must be within (0, 10]") trailing = _decimal_text(trailing_value) params["callbackRate"] = trailing params["priceRate"] = trailing return params def _map_modify_order(self, order: Order, command: ModifyOrder) -> dict[str, Any]: params = self._map_submit_order(order) params["cancelClientOrderId"] = command.client_order_id.value if command.quantity is not None: params["quantity"] = self._format_quantity(command.quantity) if command.price is not None: params["price"] = self._format_price(command.price) if command.trigger_price is not None: params["stopPrice"] = self._format_price(command.trigger_price) params["activationPrice"] = self._format_price(command.trigger_price) return params @staticmethod def _map_order_type(order: Order) -> str: mapping = { OrderType.MARKET: "MARKET", OrderType.LIMIT: "LIMIT", OrderType.STOP_LIMIT: "STOP", OrderType.STOP_MARKET: "STOP_MARKET", OrderType.MARKET_IF_TOUCHED: "TAKE_PROFIT_MARKET", OrderType.LIMIT_IF_TOUCHED: "TAKE_PROFIT", OrderType.TRAILING_STOP_MARKET: "TRAILING_STOP_MARKET", } if order.order_type not in mapping: raise BingxHttpError(f"Unsupported BingX order type {order.order_type}") return mapping[order.order_type] @staticmethod def _map_order_status(status: str) -> OrderStatus: normalized = status.upper() if normalized in {"NEW", "PENDING", "CREATED"}: return OrderStatus.ACCEPTED if normalized in {"PARTIALLY_FILLED", "PARTIALLYFILLED"}: return OrderStatus.PARTIALLY_FILLED if normalized == "FILLED": return OrderStatus.FILLED if normalized in {"CANCELED", "CANCELLED"}: return OrderStatus.CANCELED if normalized in {"FAILED", "REJECTED", "TRADE_PREVENTION"}: return OrderStatus.REJECTED return OrderStatus.ACCEPTED @staticmethod def _map_time_in_force(tif: TimeInForce) -> str | None: mapping = { TimeInForce.GTC: "GTC", TimeInForce.IOC: "IOC", TimeInForce.FOK: "FOK", } return mapping.get(tif) @staticmethod def _normalize_symbol(symbol: str) -> str: return symbol.replace("-", "").replace("-PERP", "").replace("PERP", "") def _venue_symbol(self, instrument_id: InstrumentId) -> str: instrument = self._cache.instrument(instrument_id) if instrument is not None: return instrument.raw_symbol.value symbol = instrument_id.symbol.value if "-" in symbol: return symbol if symbol.endswith("USDT"): return f"{symbol[:-4]}-USDT" return symbol @staticmethod def _format_quantity(quantity: Quantity) -> str: return str(quantity) @staticmethod def _format_price(price: Price) -> str: return str(price) @staticmethod def _position_id_from_snapshot(snapshot: dict[str, Any]) -> PositionId | None: value = snapshot.get("positionID") if value in (None, "", 0, "0"): return None return PositionId(str(value)) @staticmethod def _client_order_id_from_snapshot(value: Any) -> ClientOrderId | None: if value in (None, ""): return None return ClientOrderId(str(value)) def _linked_order_ids_from_snapshot(self, snapshot: dict[str, Any]) -> list[ClientOrderId] | None: ids: list[ClientOrderId] = [] for key in ("triggerOrderId", "mainOrderId"): value = snapshot.get(key) if value not in (None, "", 0, "0"): ids.append(ClientOrderId(str(value))) return ids or None async def _order_key_aliases(self, key: str) -> list[str]: aliases = [key] async with self._state_lock: venue_key = next((venue for venue, client in self._order_id_aliases.items() if client == key), None) client_key = self._order_id_aliases.get(key) if client_key and client_key not in aliases: aliases.append(client_key) if venue_key and venue_key not in aliases: aliases.append(venue_key) return aliases def _signal_order_update(self, key: str) -> None: if not key: return event = self._order_update_events.get(key) if event is None: event = asyncio.Event() self._order_update_events[key] = event event.set() def _signal_fill_update(self, key: str) -> None: if not key: return event = self._fill_update_events.get(key) if event is None: event = asyncio.Event() self._fill_update_events[key] = event event.set() async def wait_for_order_update(self, key: str, *, timeout_s: float = 20.0) -> dict[str, Any] | None: deadline = asyncio.get_running_loop().time() + timeout_s alias_helper = getattr(self, "_order_key_aliases", None) if callable(alias_helper): keys = await alias_helper(key) else: keys = [key] async with self._state_lock: alias = getattr(self, "_order_id_aliases", {}).get(key) if alias and alias not in keys: keys.append(alias) reverse = next((venue for venue, client in getattr(self, "_order_id_aliases", {}).items() if client == key), None) if reverse and reverse not in keys: keys.append(reverse) while asyncio.get_running_loop().time() < deadline: async with self._state_lock: for candidate in keys: row = self._order_snapshots.get(candidate) if isinstance(row, dict): return dict(row) event = self._order_update_events.setdefault(key, asyncio.Event()) remaining = deadline - asyncio.get_running_loop().time() if remaining <= 0: break try: await asyncio.wait_for(event.wait(), timeout=remaining) except TimeoutError: break finally: event.clear() async with self._state_lock: for candidate in keys: row = self._order_snapshots.get(candidate) if isinstance(row, dict): return dict(row) return None async def wait_for_order_terminal(self, key: str, *, timeout_s: float = 20.0) -> dict[str, Any] | None: deadline = asyncio.get_running_loop().time() + timeout_s baseline: dict[str, Any] | None = None alias_helper = getattr(self, "_order_key_aliases", None) if callable(alias_helper): keys = await alias_helper(key) else: keys = [key] async with self._state_lock: alias = getattr(self, "_order_id_aliases", {}).get(key) if alias and alias not in keys: keys.append(alias) reverse = next((venue for venue, client in getattr(self, "_order_id_aliases", {}).items() if client == key), None) if reverse and reverse not in keys: keys.append(reverse) async with self._state_lock: for candidate in keys: row = self._order_snapshots.get(candidate) if isinstance(row, dict): baseline = dict(row) status = str(baseline.get("status") or baseline.get("X") or baseline.get("x") or "").upper() if status in {"FILLED", "CANCELED", "CANCELLED", "REJECTED", "EXPIRED"}: return baseline event = self._order_update_events.setdefault(key, asyncio.Event()) while asyncio.get_running_loop().time() < deadline: remaining = deadline - asyncio.get_running_loop().time() if remaining <= 0: break try: await asyncio.wait_for(event.wait(), timeout=remaining) except TimeoutError: break finally: event.clear() async with self._state_lock: row = None for candidate in keys: row = self._order_snapshots.get(candidate) if isinstance(row, dict): break if not isinstance(row, dict): continue snapshot = dict(row) status = str(snapshot.get("status") or snapshot.get("X") or snapshot.get("x") or "").upper() if status in {"FILLED", "CANCELED", "CANCELLED", "REJECTED", "EXPIRED"}: return snapshot baseline = snapshot return baseline async def wait_for_order_fill(self, key: str, *, timeout_s: float = 20.0) -> dict[str, Any] | None: deadline = asyncio.get_running_loop().time() + timeout_s alias_helper = getattr(self, "_order_key_aliases", None) if callable(alias_helper): keys = await alias_helper(key) else: keys = [key] async with self._state_lock: alias = getattr(self, "_order_id_aliases", {}).get(key) if alias and alias not in keys: keys.append(alias) reverse = next((venue for venue, client in getattr(self, "_order_id_aliases", {}).items() if client == key), None) if reverse and reverse not in keys: keys.append(reverse) while asyncio.get_running_loop().time() < deadline: async with self._state_lock: for candidate in keys: row = self._fill_snapshots_by_key.get(candidate) if isinstance(row, dict): return dict(row) event = self._fill_update_events.setdefault(key, asyncio.Event()) remaining = deadline - asyncio.get_running_loop().time() if remaining <= 0: break try: await asyncio.wait_for(event.wait(), timeout=remaining) except TimeoutError: break finally: event.clear() async with self._state_lock: for candidate in keys: row = self._fill_snapshots_by_key.get(candidate) if isinstance(row, dict): return dict(row) return None def _build_friction_snapshot( self, order: Order, row: dict[str, Any], instrument: Any, *, last_qty: Quantity | None = None, last_px: Price | None = None, feed_source: str = "unknown", ) -> dict[str, Any]: return estimate_friction( order, row, last_qty=last_qty, last_px=last_px, quote_currency=instrument.quote_currency.code, base_currency=instrument.base_currency.code, maker_fee=instrument.maker_fee, taker_fee=instrument.taker_fee, feed_source=feed_source, ) @staticmethod def _build_execution_quality_record( *, order: Order, venue_order_id: VenueOrderId, row: dict[str, Any], friction: dict[str, Any], source: str, ) -> dict[str, Any]: return build_execution_quality_record( record_kind="fill", trade_id=str(row.get("tradeId") or row.get("trade_id") or venue_order_id.value or order.client_order_id.value), strategy=str(order.strategy_id), asset=str(order.instrument_id.symbol.value), side=getattr(order.side, "name", str(order.side)), source=str(source or friction.get("feed_source") or "unknown"), feed_source=str(source or friction.get("feed_source") or "unknown"), client_order_id=order.client_order_id.value, venue_order_id=venue_order_id.value, order_type=getattr(order.order_type, "name", str(order.order_type)), reference_source=str(friction.get("reference_source") or "unavailable"), liquidity_side=str(friction.get("liquidity_side") or "NO_LIQUIDITY_SIDE"), fill_quality_score=friction.get("fill_quality_score"), fill_quality_class=str(friction.get("fill_quality_class") or ""), commission_quote=friction.get("commission_quote"), estimated_fee_quote=friction.get("estimated_fee_quote"), fee_rate=friction.get("fee_rate"), fee_bps=friction.get("fee_bps"), reference_px=friction.get("reference_px"), slippage_quote=friction.get("slippage_quote"), slippage_bps=friction.get("slippage_bps"), gross_friction_quote=friction.get("gross_friction_quote"), net_friction_quote=friction.get("net_friction_quote"), notional_quote=friction.get("notional_quote"), last_qty=friction.get("last_qty"), last_px=friction.get("last_px"), ts=_ch_ts_us(), extra={"row": row, "friction": friction}, ) def _persist_execution_quality(self, record: dict[str, Any]) -> None: ch_put("trade_execution_quality", record)