Files
siloqy/prod/clean_arch/dita_v2/bingx_venue.py
Codex a9ba407ae2 PINK: fix reconcile 30s deadlock — async def + direct await
Root cause: _run() → pool.submit(asyncio.run, coro).result(30s) created a
new event loop in a thread-pool thread; aiohttp session is main-loop-bound
→ silent deadlock every step cycle. BingX VST is healthy (544ms gather).

Fix: async def reconcile() + await self.backend.refresh_state() in main loop.
pump_venue_events() already handles isawaitable → zero caller changes.
include_history=False (symbol=None skips history anyway).
Tests: 13/13 passing (async contract, 3 fault paths, <2s timing, gather-10).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 18:46:19 +02:00

669 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""DITAv2 BingX venue adapter.
This is a thin normalization layer over the existing direct BingX execution
surface. It converts BingX REST/account/order payloads into DITAv2
``VenueEvent`` / ``VenueOrder`` objects without reimplementing exchange logic.
"""
from __future__ import annotations
import asyncio
import concurrent.futures
import inspect
import itertools
import re
import threading
from datetime import datetime, timezone
from typing import Any, Iterable, List, Optional
from prod.clean_arch.dita import DecisionAction as LegacyDecisionAction
from prod.clean_arch.dita import Intent as LegacyIntent
from prod.clean_arch.dita import TradeSide as LegacyTradeSide
from prod.bingx.http import BingxHttpError
from .contracts import (
KernelCommandType,
KernelEventKind,
KernelIntent,
TradeSide,
VenueEvent,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from .utils import json_safe
from .utils import safe_float
from .venue import VenueAdapter
def _row_text(row: dict[str, Any], *keys: str, default: str = "") -> str:
for key in keys:
value = row.get(key)
if value is None:
continue
text = str(value)
if text:
return text
return default
def _row_float(row: dict[str, Any], *keys: str, default: float = 0.0) -> float:
for key in keys:
try:
value = float(row.get(key) or 0.0)
except Exception:
continue
if value == value and value not in (float("inf"), float("-inf")) and value != 0.0:
return value
return default
def _normalize_status(status: str) -> str:
return str(status or "").strip().upper()
def _trade_side_from_row(row: dict[str, Any], *, fallback: TradeSide = TradeSide.FLAT) -> TradeSide:
side_raw = _row_text(row, "side", "positionSide", default="").upper()
signed_qty = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
if side_raw in {"BUY", "LONG"}:
return TradeSide.LONG
if side_raw in {"SELL", "SHORT"}:
return TradeSide.SHORT
if signed_qty < 0:
return TradeSide.SHORT
if signed_qty > 0:
return TradeSide.LONG
return fallback
def _http_error_status(exc_msg: str) -> str:
"""Map a BingxHttpError message to a venue status string.
HTTP 429 and 5xx are transient → RATE_LIMITED so the slot can retry.
4xx (non-429) are genuine client-side rejections → REJECTED.
Transport / DNS / circuit-breaker errors (no HTTP prefix) are transient.
"""
m = exc_msg.upper()
if "HTTP 429" in m:
return "RATE_LIMITED"
for code in ("500", "501", "502", "503", "504"):
if f"HTTP {code}" in m:
return "RATE_LIMITED"
if "HTTP 4" in m:
return "REJECTED"
return "RATE_LIMITED"
def _venue_event_status_from_row(status: str) -> VenueEventStatus:
normalized = _normalize_status(status)
if normalized in {"NEW", "ACKED", "PENDING", "CREATED"}:
return VenueEventStatus.ACKED
if normalized in {"RATE_LIMITED", "THROTTLED"}:
return VenueEventStatus.RATE_LIMITED
if normalized in {"PARTIALLY_FILLED", "PARTIAL_FILL"}:
return VenueEventStatus.PARTIALLY_FILLED
if normalized in {"FILLED", "FULL_FILL"}:
return VenueEventStatus.FILLED
if normalized in {"CANCELED", "CANCELLED", "EXPIRED"}:
return VenueEventStatus.CANCELED
if normalized in {"REJECTED", "FAILED"}:
return VenueEventStatus.REJECTED
if normalized in {"CANCEL_REJECTED", "CANCEL_REJECT"}:
return VenueEventStatus.CANCELED_REJECTED
return VenueEventStatus.ACKED
def _venue_order_status_from_row(status: str) -> VenueOrderStatus:
normalized = _normalize_status(status)
if normalized in {"NEW", "ACKED", "PENDING", "CREATED"}:
return VenueOrderStatus.NEW
if normalized in {"RATE_LIMITED", "THROTTLED"}:
return VenueOrderStatus.NEW
if normalized in {"PARTIALLY_FILLED", "PARTIAL_FILL"}:
return VenueOrderStatus.PARTIALLY_FILLED
if normalized in {"FILLED", "FULL_FILL"}:
return VenueOrderStatus.FILLED
if normalized in {"CANCELED", "CANCELLED", "EXPIRED"}:
return VenueOrderStatus.CANCELED
if normalized in {"REJECTED", "FAILED"}:
return VenueOrderStatus.REJECTED
return VenueOrderStatus.NEW
def _position_qty(row: dict[str, Any]) -> float:
qty = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
if qty != 0.0:
return abs(qty)
return abs(_row_float(row, "executedQty", "filledQty", "z", default=0.0))
def _position_price(row: dict[str, Any]) -> float:
return _row_float(row, "entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price", "lastFillPrice", "tradePrice")
def _mapping_for_snapshot(rows: Iterable[dict[str, Any]]) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
for row in rows:
client_id = _row_text(row, "clientOrderID", "clientOrderId", default="")
order_id = _row_text(row, "orderId", "orderID", "id", default="")
key = client_id or order_id
if key:
mapping[key] = dict(row)
if order_id and order_id not in mapping:
mapping[order_id] = dict(row)
return mapping
def _venue_order_from_row(
row: dict[str, Any],
*,
internal_trade_id: str = "",
fallback_side: TradeSide = TradeSide.FLAT,
) -> VenueOrder:
side = _trade_side_from_row(row, fallback=fallback_side)
client_id = _row_text(row, "clientOrderID", "clientOrderId", default="")
order_id = _row_text(row, "orderId", "orderID", "id", default="")
intended = _row_float(row, "origQty", "quantity", "q", "positionAmt", "positionQty", default=0.0)
if intended <= 0:
intended = _position_qty(row)
return VenueOrder(
internal_trade_id=internal_trade_id or client_id or order_id,
venue_order_id=order_id,
venue_client_id=client_id,
side=side,
intended_size=abs(float(intended or 0.0)),
filled_size=abs(_row_float(row, "executedQty", "filledQty", "z", "lastFilledQty", default=0.0)),
average_fill_price=_position_price(row),
status=_venue_order_status_from_row(_row_text(row, "status", "X", default="NEW")),
metadata={"raw": dict(row)},
)
def _event_id(seq: itertools.count) -> str:
return f"EV-{next(seq):08d}"
def _rate_limit_retry_after_ms(row: dict[str, Any]) -> int:
raw_retry = row.get("retryAfter") or row.get("retry_after_ms") or row.get("retryAfterMs")
if raw_retry is None:
msg = _row_text(row, "msg", "message", default="")
match = re.search(r"unblocked after (\d+)", msg)
if match:
try:
ts = int(match.group(1))
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
return max(0, ts - now_ms)
except Exception:
return 0
return 0
try:
return max(0, int(float(raw_retry)))
except Exception:
return 0
class BingxVenueAdapter(VenueAdapter):
"""Normalizes BingX execution responses into DITAv2 venue events."""
# Shared thread-pool executor reused across all adapter instances and
# all calls. Threads are created once and recycled, eliminating the
# per-call creation/destruction overhead of the old pattern.
_EXECUTOR: concurrent.futures.ThreadPoolExecutor | None = None
_EXECUTOR_LOCK: threading.Lock = threading.Lock()
@classmethod
def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
if cls._EXECUTOR is None:
with cls._EXECUTOR_LOCK:
if cls._EXECUTOR is None:
# max_workers=3 so three concurrent HTTP calls (balance,
# positions, openOrders) can proceed simultaneously without
# serialising on the pool.
cls._EXECUTOR = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
thread_name_prefix="bingx_adapter",
)
return cls._EXECUTOR
def __init__(self, backend: Any | None = None, *, config: Any | None = None) -> None:
if backend is None:
if config is None:
raise ValueError("BingxVenueAdapter requires a backend or config")
from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter
backend = BingxDirectExecutionAdapter(config)
self.backend = backend
self._event_seq = itertools.count(1)
# Thread-safe snapshot cache — reads from a snapshot may arrive from
# the kernel thread while _backend_snapshot writes from the pool thread.
self._snap_lock = threading.Lock()
self._last_snapshot = None
self._snapshot_ready = threading.Event()
self._snapshot_ready.set() # initially ready (no pending write)
# Maximum seconds to wait for a single backend HTTP call. BingX REST
# round-trips are ~0.52 s in normal conditions; 30 s is generous enough
# to survive transient slowdowns without hanging the process forever (O5).
_BACKEND_TIMEOUT_S: float = 30.0
def _run(self, result: Any) -> Any:
if inspect.isawaitable(result):
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(result)
# Inside a running event loop: submit to the shared singleton
# executor so threads are reused across calls.
pool = self._get_executor()
try:
return pool.submit(asyncio.run, result).result(timeout=self._BACKEND_TIMEOUT_S)
except TimeoutError as exc:
raise TimeoutError(
f"BingX backend call exceeded {self._BACKEND_TIMEOUT_S}s timeout"
) from exc
return result
def close(self) -> None:
"""V2: release the class-level thread-pool and any backend HTTP session."""
executor = self.__class__._EXECUTOR
if executor is not None:
with self.__class__._EXECUTOR_LOCK:
if self.__class__._EXECUTOR is executor:
self.__class__._EXECUTOR = None
executor.shutdown(wait=False)
_maybe_close_backend = getattr(self.backend, "close", None)
if _maybe_close_backend is not None:
try:
_maybe_close_backend()
except Exception:
pass
def _call_backend(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
method = getattr(self.backend, method_name, None)
if method is None:
raise AttributeError(f"backend has no method {method_name}")
return self._run(method(*args, **kwargs))
def _backend_snapshot(self, *, include_history: bool = False, timeout_ms: float = 5000.0):
"""Fetch a fresh snapshot from the backend and cache it thread-safely.
Design (industry best-practice reader-writer pattern):
- A caller that needs a fresh snapshot *waits* on ``_snapshot_ready``
before reading, so it never sees a stale partial write.
- While a snapshot fetch is in-flight, the lock is cleared; concurrent
callers block on ``_snapshot_ready`` with a timeout. If the fetch
succeeds in time they get the fresh snapshot; if it times out they
fall back to ``_last_snapshot`` (an eventually-consistent design —
stale data that *was* consistent is safer than no data).
- The write is guarded by ``_snap_lock`` so concurrent writes are
serialised and ``_last_snapshot`` is never partially assigned.
"""
if not self._snapshot_ready.wait(timeout=timeout_ms / 1000.0):
# Timeout waiting for a previous snapshot write — return the
# last-known-good snapshot rather than blocking the caller.
with self._snap_lock:
return self._last_snapshot
self._snapshot_ready.clear()
try:
snapshot = self._call_backend("refresh_state", None, include_history=include_history)
except Exception:
self._snapshot_ready.set()
raise
with self._snap_lock:
self._last_snapshot = snapshot
self._snapshot_ready.set()
return snapshot
@staticmethod
def _legacy_intent(intent: KernelIntent) -> LegacyIntent:
action = LegacyDecisionAction.ENTER if intent.action == KernelCommandType.ENTER else LegacyDecisionAction.EXIT
side = LegacyTradeSide.SHORT if intent.side == TradeSide.SHORT else LegacyTradeSide.LONG
metadata = dict(intent.metadata)
metadata["_order_type"] = getattr(intent, "order_type", "MARKET")
metadata["_limit_price"] = float(getattr(intent, "limit_price", 0.0) or 0.0)
return LegacyIntent(
timestamp=intent.timestamp,
trade_id=intent.trade_id,
decision_id=intent.intent_id,
asset=intent.asset,
action=action,
side=side,
reason=intent.reason,
target_size=float(intent.target_size),
leverage=float(intent.leverage),
reference_price=float(intent.reference_price),
confidence=1.0,
bars_held=0,
exit_leg_ratios=tuple(intent.exit_leg_ratios or (1.0,)),
metadata=metadata,
)
def connect(self) -> bool:
result = getattr(self.backend, "connect", None)
if result is not None:
self._run(result())
self._backend_snapshot(include_history=True)
return True
def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
snapshot_before = self._backend_snapshot(include_history=True)
response = None
if hasattr(self.backend, "cancel_order"):
response = self._call_backend("cancel_order", order, reason=reason)
elif hasattr(self.backend, "cancel"):
response = self._call_backend("cancel", order, reason=reason)
else:
client = getattr(self.backend, "_client", None)
instrument_symbol = ""
if hasattr(self.backend, "_instrument_venue_symbol"):
asset = str(order.metadata.get("asset") or "")
if not asset:
slot_id = int(order.metadata.get("slot_id", 0) or 0)
if hasattr(self, "_kernel_ref") and self._kernel_ref is not None:
try:
asset = self._kernel_ref.slot(slot_id).asset
except Exception:
pass
if not asset:
asset = str(order.metadata.get("asset") or "")
instrument_symbol = str(self.backend._instrument_venue_symbol(asset)) if asset else ""
if client is None or not instrument_symbol:
raise RuntimeError("backend does not expose a cancel surface")
params = {"symbol": instrument_symbol}
if order.venue_order_id:
params["orderId"] = order.venue_order_id
else:
params["clientOrderId"] = order.venue_client_id
try:
response = self._run(client.signed_delete("/openApi/swap/v2/trade/order", params))
except BingxHttpError as exc:
# W10: map HTTP error class to status — 429/5xx are transient, 4xx are real rejections
response = {"status": _http_error_status(str(exc)), "msg": str(exc), "orderId": order.venue_order_id, "clientOrderId": order.venue_client_id}
snapshot_after = self._backend_snapshot(include_history=True)
return self._events_from_cancel(order, response, snapshot_before, snapshot_after, reason=reason)
def open_orders(self) -> List[VenueOrder]:
snapshot = self._backend_snapshot(include_history=False)
return [_venue_order_from_row(row) for row in (snapshot.open_orders or [])]
def open_positions(self) -> List[dict[str, Any]]:
snapshot = self._backend_snapshot(include_history=False)
return [dict(row) for row in (snapshot.open_positions or {}).values()]
async def reconcile(self) -> List[VenueEvent]: # type: ignore[override]
"""Fetch open-order state from BingX and return any pending VenueEvents.
WHY ASYNC: the old sync version called _backend_snapshot() → _call_backend()
→ _run() → pool.submit(asyncio.run, coro).result(timeout=30s). That spawned
a *new* event loop in a thread-pool thread. The BingxHttpClient (aiohttp
session) is bound to the *main* event loop — using it from a different loop
silently deadlocks. BingX VST responds in ~500ms; the deadlock made every
reconcile call block the main event loop for the full 30s timeout.
FIX: declare async, call backend.refresh_state() directly with await so it
runs in the *caller's* (main) event loop where the session lives.
pump_venue_events() already has `if inspect.isawaitable(events): await events`
— zero caller changes required.
include_history=False: all_orders/all_fills require a symbol (symbol=None
skips them anyway), so include_history=True was fetching nothing extra.
"""
try:
snapshot = await self.backend.refresh_state(None, include_history=False)
except Exception as exc:
import logging as _log
_log.getLogger(__name__).warning("reconcile: refresh_state failed: %s", exc)
return []
return self._events_from_snapshot(snapshot)
def submit(self, intent: KernelIntent) -> List[VenueEvent]:
snapshot_before = self._backend_snapshot(include_history=True)
receipt = self._call_backend("submit_intent", self._legacy_intent(intent))
snapshot_after = self._backend_snapshot(include_history=True)
return self._events_from_submit(intent, receipt, snapshot_before, snapshot_after)
def _events_from_submit(self, intent: KernelIntent, receipt: Any, before, after) -> List[VenueEvent]: # noqa: ANN001
ack_row = dict(getattr(receipt, "raw_ack", {}) or {})
status = _normalize_status(getattr(receipt, "status", "") or _row_text(ack_row, "status", default="NEW"))
order_id = _row_text(ack_row, "orderId", "orderID", default=str(getattr(receipt, "order_id", "") or ""))
client_order_id = _row_text(ack_row, "clientOrderID", "clientOrderId", default=str(getattr(receipt, "client_order_id", "") or intent.intent_id))
if status in {"RATE_LIMITED", "THROTTLED"}:
return [
VenueEvent(
timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=KernelEventKind.RATE_LIMITED,
status=VenueEventStatus.RATE_LIMITED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(getattr(receipt, "price", 0.0), 0.0),
size=float(intent.target_size or 0.0),
filled_size=0.0,
remaining_size=float(intent.target_size or 0.0),
reason=_row_text(ack_row, "msg", "message", default="BINGX_RATE_LIMITED"),
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value, "retry_after_ms": _rate_limit_retry_after_ms(ack_row)},
)
]
base_event = VenueEvent(
timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=KernelEventKind.ORDER_ACK,
status=VenueEventStatus.ACKED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(getattr(receipt, "price", 0.0), 0.0),
size=float(intent.target_size or 0.0),
filled_size=0.0,
remaining_size=float(intent.target_size or 0.0),
reason="",
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
)
if status in {"REJECTED", "FAILED"}:
return [
VenueEvent(
**{**base_event.__dict__, "event_id": _event_id(self._event_seq), "kind": KernelEventKind.ORDER_REJECT, "status": VenueEventStatus.REJECTED, "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED")},
)
]
events = [base_event]
fill_status = _venue_event_status_from_row(status)
filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0)
snapshot_fill_size = self._filled_size_from_snapshots(before, after, intent.asset)
if filled_size <= 0:
filled_size = snapshot_fill_size
emit_fill = fill_status in {VenueEventStatus.PARTIALLY_FILLED, VenueEventStatus.FILLED} or snapshot_fill_size > 0.0
if emit_fill:
if filled_size <= 0:
filled_size = float(intent.target_size or 0.0)
remaining_size = max(0.0, float(intent.target_size or 0.0) - float(filled_size))
fill_kind = KernelEventKind.FULL_FILL if fill_status == VenueEventStatus.FILLED or remaining_size <= 1e-12 else KernelEventKind.PARTIAL_FILL
events.append(
VenueEvent(
timestamp=base_event.timestamp,
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=fill_kind,
status=VenueEventStatus.FILLED if fill_kind == KernelEventKind.FULL_FILL else VenueEventStatus.PARTIALLY_FILLED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(_row_float(ack_row, "avgPrice", "ap", "price", "lastFillPrice", default=getattr(receipt, "price", 0.0)), 0.0),
size=float(intent.target_size or 0.0),
filled_size=float(filled_size),
remaining_size=float(remaining_size),
reason="",
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
)
)
return events
def _events_from_cancel(self, order: VenueOrder, response: Any, before, after, *, reason: str = "") -> List[VenueEvent]: # noqa: ANN001
raw = response if isinstance(response, dict) else {}
status = _normalize_status(_row_text(raw, "status", default="CANCELED"))
if status in {"RATE_LIMITED", "THROTTLED"}:
return [
VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=order.internal_trade_id or order.venue_client_id,
slot_id=int(order.metadata.get("slot_id", 0) or 0),
kind=KernelEventKind.RATE_LIMITED,
status=VenueEventStatus.RATE_LIMITED,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
asset=str(order.metadata.get("asset") or ""),
price=safe_float(_row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price), 0.0),
size=float(order.intended_size or 0.0),
filled_size=float(order.filled_size or 0.0),
remaining_size=float(order.remaining_size),
reason=reason or _row_text(raw, "msg", "message", default="BINGX_RATE_LIMITED"),
raw_payload=raw or {"orderId": order.venue_order_id, "clientOrderId": order.venue_client_id, "status": status or "RATE_LIMITED"},
metadata={**dict(order.metadata), "retry_after_ms": _rate_limit_retry_after_ms(raw)},
)
]
event_status = _venue_event_status_from_row(status)
kind = KernelEventKind.CANCEL_ACK if event_status == VenueEventStatus.CANCELED else KernelEventKind.CANCEL_REJECT
if event_status == VenueEventStatus.CANCELED_REJECTED:
kind = KernelEventKind.CANCEL_REJECT
return [
VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=order.internal_trade_id or order.venue_client_id,
slot_id=int(order.metadata.get("slot_id", 0) or 0),
kind=kind,
status=event_status,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
asset=str(order.metadata.get("asset") or ""),
price=safe_float(_row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price), 0.0),
size=float(order.intended_size or 0.0),
filled_size=float(order.filled_size or 0.0),
remaining_size=float(order.remaining_size),
reason=reason or _row_text(raw, "msg", "message", default="BINGX_CANCEL_ACK" if kind == KernelEventKind.CANCEL_ACK else "BINGX_CANCEL_REJECT"),
raw_payload=raw or {"orderId": order.venue_order_id, "clientOrderId": order.venue_client_id, "status": status or event_status.value},
metadata=dict(order.metadata),
)
]
def _events_from_snapshot(self, snapshot: Any) -> List[VenueEvent]: # noqa: ANN001
events: list[VenueEvent] = []
seen: set[tuple[str, str, str]] = set()
for row in getattr(snapshot, "open_orders", []) or []:
if not isinstance(row, dict):
continue
event = self._event_from_row(row, slot_id=0)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
for row in getattr(snapshot, "all_orders", []) or []:
if not isinstance(row, dict):
continue
event = self._event_from_row(row, slot_id=0)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
for row in getattr(snapshot, "all_fills", []) or []:
if not isinstance(row, dict):
continue
event = self._fill_event_from_row(row)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
return events
def _event_from_row(self, row: dict[str, Any], *, slot_id: int) -> VenueEvent:
status = _normalize_status(_row_text(row, "status", "X", default="NEW"))
event_status = _venue_event_status_from_row(status)
kind = {
VenueEventStatus.ACKED: KernelEventKind.ORDER_ACK,
VenueEventStatus.PARTIALLY_FILLED: KernelEventKind.PARTIAL_FILL,
VenueEventStatus.FILLED: KernelEventKind.FULL_FILL,
VenueEventStatus.CANCELED: KernelEventKind.CANCEL_ACK,
VenueEventStatus.REJECTED: KernelEventKind.ORDER_REJECT,
VenueEventStatus.CANCELED_REJECTED: KernelEventKind.CANCEL_REJECT,
VenueEventStatus.RATE_LIMITED: KernelEventKind.RATE_LIMITED,
}.get(event_status, KernelEventKind.ORDER_ACK)
size = _row_float(row, "origQty", "quantity", "q", "positionAmt", default=0.0)
filled = _row_float(row, "executedQty", "cumFilledQty", "filledQty", "z", "lastFilledQty", default=0.0)
if filled <= 0.0 and kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}:
filled = size
return VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
slot_id=slot_id,
kind=kind,
status=event_status,
venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
side=_trade_side_from_row(row),
asset=_row_text(row, "symbol", default=""),
price=safe_float(_row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0), 0.0),
size=abs(float(size or 0.0)),
filled_size=abs(float(filled or 0.0)),
remaining_size=max(0.0, abs(float(size or 0.0)) - abs(float(filled or 0.0))),
reason=_row_text(row, "msg", "message", default=""),
raw_payload=dict(row),
metadata={"source": "bingx"},
)
def _fill_event_from_row(self, row: dict[str, Any]) -> VenueEvent:
status = _normalize_status(_row_text(row, "status", "X", default="FILLED"))
event_status = _venue_event_status_from_row(status)
kind = KernelEventKind.FULL_FILL if event_status == VenueEventStatus.FILLED else KernelEventKind.PARTIAL_FILL
return VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
slot_id=0,
kind=kind,
status=event_status,
venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
side=_trade_side_from_row(row),
asset=_row_text(row, "symbol", default=""),
price=safe_float(_row_float(row, "lastFillPrice", "L", "price", "ap", default=0.0), 0.0),
size=abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)),
filled_size=abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0)),
remaining_size=max(0.0, abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)) - abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0))),
reason=_row_text(row, "msg", "message", default=""),
raw_payload=dict(row),
metadata={"source": "bingx"},
)
@staticmethod
def _filled_size_from_snapshots(before: Any, after: Any, asset: str) -> float: # noqa: ANN001
def _lookup(snapshot: Any) -> float:
positions = getattr(snapshot, "open_positions", {}) or {}
for key, row in positions.items():
symbol = _row_text(row, "symbol", default=str(key))
if symbol.replace("-", "").replace("_", "").upper() == asset.replace("-", "").replace("_", "").upper():
return _position_qty(row)
return 0.0
before_qty = _lookup(before)
after_qty = _lookup(after)
diff = abs(before_qty - after_qty)
return diff