Root cause: open_positions()/open_orders() called _backend_snapshot() -> _call_backend() -> _run() -> pool.submit(asyncio.run, coro), which spawned a temporary event loop in a worker thread. The httpx AsyncClient was created inside that temporary loop, and the loop was closed immediately afterwards. All subsequent HTTP calls raised "Event loop is closed" or "asyncio.locks.Event bound to different loop". The crash triggered WS stream reconnects; each reconnect re-ran reconcile with N>1 BingX positions and orphaned all but the largest.
Fix: open_positions()/open_orders() now read backend._state (populated by await backend.connect() in the main loop), falling back to _backend_snapshot() for callers without a connected backend.
Fixes test_bingx_bugs::TestConnectNoDoubleRefresh: connect() is now async.
New test_orphan_prevention.py: 23 tests covering all 5 orphan mechanisms:
A. open_positions/open_orders use backend._state and never hit the thread pool.
B. connect() is awaitable; backend.connect() runs in the main event loop.
C. Reconcile guard: >1 position logs ERROR and takes only the largest.
D. clientOrderId p-action-base36-rand4 on every order.
E. EXIT sizing capped to kernel slot_size.
391 passed, 2 skipped, 0 failed across all 14 test files.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
741 lines
36 KiB
Python
741 lines
36 KiB
Python
"""DITAv2 BingX venue adapter.
|
||
|
||
This is a thin normalization layer over the existing direct BingX execution
|
||
surface. It converts BingX REST/account/order payloads into DITAv2
|
||
``VenueEvent`` / ``VenueOrder`` objects without reimplementing exchange logic.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import concurrent.futures
|
||
import inspect
|
||
import itertools
|
||
import re
|
||
import threading
|
||
from datetime import datetime, timezone
|
||
from typing import Any, Iterable, List, Optional
|
||
|
||
from prod.clean_arch.dita import DecisionAction as LegacyDecisionAction
|
||
from prod.clean_arch.dita import Intent as LegacyIntent
|
||
from prod.clean_arch.dita import TradeSide as LegacyTradeSide
|
||
|
||
from prod.bingx.http import BingxHttpError
|
||
|
||
from .contracts import (
|
||
KernelCommandType,
|
||
KernelEventKind,
|
||
KernelIntent,
|
||
TradeSide,
|
||
VenueEvent,
|
||
VenueEventStatus,
|
||
VenueOrder,
|
||
VenueOrderStatus,
|
||
)
|
||
from .utils import json_safe
|
||
from .utils import safe_float
|
||
from .venue import VenueAdapter
|
||
|
||
|
||
def _row_text(row: dict[str, Any], *keys: str, default: str = "") -> str:
    """Return the first non-empty stringified value stored under ``keys``.

    Keys are probed in the order given. A key that is absent, maps to
    ``None``, or whose ``str()`` form is empty is skipped.

    Args:
        row: Mapping to read from.
        *keys: Candidate key names, highest priority first.
        default: Value returned when no key yields text.

    Returns:
        The first non-empty ``str(value)``, or ``default``.
    """
    candidates = (row.get(name) for name in keys)
    rendered = (str(item) for item in candidates if item is not None)
    return next((piece for piece in rendered if piece), default)
|
||
|
||
|
||
def _row_float(row: dict[str, Any], *keys: str, default: float = 0.0) -> float:
    """Return the first finite, non-zero float stored under ``keys``.

    Keys are probed in order. A value is skipped when it is falsy, cannot be
    converted by ``float()``, is NaN, is +/- infinity, or equals zero.

    Args:
        row: Mapping to read from.
        *keys: Candidate key names, highest priority first.
        default: Value returned when no key yields a usable number.

    Returns:
        The first usable float, or ``default``.
    """
    infinities = (float("inf"), float("-inf"))
    for name in keys:
        try:
            number = float(row.get(name) or 0.0)
        except Exception:
            # Unparseable value: try the next candidate key.
            continue
        is_nan = number != number  # NaN is the only float unequal to itself
        if is_nan or number in infinities or number == 0.0:
            continue
        return number
    return default
|
||
|
||
|
||
def _normalize_status(status: str) -> str:
    """Canonicalise a status token.

    Falsy input (``None``, ``""``, ``0``) becomes ``""``; anything else is
    stringified, stripped of surrounding whitespace and upper-cased.
    """
    if not status:
        return ""
    return str(status).strip().upper()
|
||
|
||
|
||
def _trade_side_from_row(row: dict[str, Any], *, fallback: TradeSide = TradeSide.FLAT) -> TradeSide:
    """Derive the trade side of a venue row.

    An explicit label wins: the first non-empty value of ``side`` /
    ``positionSide`` (upper-cased) maps BUY/LONG to ``TradeSide.LONG`` and
    SELL/SHORT to ``TradeSide.SHORT``. Otherwise the sign of the first usable
    quantity field decides. ``fallback`` is returned when neither source is
    conclusive.
    """
    label = _row_text(row, "side", "positionSide", default="").upper()
    quantity = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)

    explicit = {
        "BUY": TradeSide.LONG,
        "LONG": TradeSide.LONG,
        "SELL": TradeSide.SHORT,
        "SHORT": TradeSide.SHORT,
    }.get(label)
    if explicit is not None:
        return explicit

    # No recognised label: fall back to the sign of the quantity.
    if quantity < 0:
        return TradeSide.SHORT
    if quantity > 0:
        return TradeSide.LONG
    return fallback
|
||
|
||
|
||
def _http_error_status(exc_msg: str) -> str:
    """Map a BingxHttpError message to a venue status string.

    The message is matched case-insensitively by substring:

    * ``HTTP 429`` or ``HTTP 500``..``HTTP 504`` -> ``"RATE_LIMITED"``
      (transient, so the slot can retry).
    * any other ``HTTP 4`` marker -> ``"REJECTED"`` (client-side rejection).
    * everything else (other codes, transport / DNS / circuit-breaker errors
      with no HTTP prefix) -> ``"RATE_LIMITED"``.
    """
    text = exc_msg.upper()
    transient_markers = (
        "HTTP 429",
        "HTTP 500",
        "HTTP 501",
        "HTTP 502",
        "HTTP 503",
        "HTTP 504",
    )
    if any(marker in text for marker in transient_markers):
        return "RATE_LIMITED"
    return "REJECTED" if "HTTP 4" in text else "RATE_LIMITED"
|
||
|
||
|
||
def _venue_event_status_from_row(status: str) -> VenueEventStatus:
    """Translate a raw venue status token into a ``VenueEventStatus``.

    The token is normalised first (stripped, upper-cased). Unrecognised
    tokens map to ``VenueEventStatus.ACKED``.
    """
    token = _normalize_status(status)
    alias_table = (
        (("NEW", "ACKED", "PENDING", "CREATED"), VenueEventStatus.ACKED),
        (("RATE_LIMITED", "THROTTLED"), VenueEventStatus.RATE_LIMITED),
        (("PARTIALLY_FILLED", "PARTIAL_FILL"), VenueEventStatus.PARTIALLY_FILLED),
        (("FILLED", "FULL_FILL"), VenueEventStatus.FILLED),
        (("CANCELED", "CANCELLED", "EXPIRED"), VenueEventStatus.CANCELED),
        (("REJECTED", "FAILED"), VenueEventStatus.REJECTED),
        (("CANCEL_REJECTED", "CANCEL_REJECT"), VenueEventStatus.CANCELED_REJECTED),
    )
    for aliases, mapped in alias_table:
        if token in aliases:
            return mapped
    # Unknown tokens are treated as a plain acknowledgement.
    return VenueEventStatus.ACKED
|
||
|
||
|
||
def _venue_order_status_from_row(status: str) -> VenueOrderStatus:
    """Translate a raw venue status token into a ``VenueOrderStatus``.

    The token is normalised first (stripped, upper-cased). Rate-limit tokens
    and unrecognised tokens both map to ``VenueOrderStatus.NEW``.
    """
    token = _normalize_status(status)
    alias_table = (
        # Throttled orders are still considered NEW at the order level.
        (("NEW", "ACKED", "PENDING", "CREATED", "RATE_LIMITED", "THROTTLED"), VenueOrderStatus.NEW),
        (("PARTIALLY_FILLED", "PARTIAL_FILL"), VenueOrderStatus.PARTIALLY_FILLED),
        (("FILLED", "FULL_FILL"), VenueOrderStatus.FILLED),
        (("CANCELED", "CANCELLED", "EXPIRED"), VenueOrderStatus.CANCELED),
        (("REJECTED", "FAILED"), VenueOrderStatus.REJECTED),
    )
    for aliases, mapped in alias_table:
        if token in aliases:
            return mapped
    return VenueOrderStatus.NEW
|
||
|
||
|
||
def _position_qty(row: dict[str, Any]) -> float:
    """Return the absolute position quantity of a venue row.

    The position-size fields are consulted first; when none of them yields a
    usable value the executed/filled quantity fields are used instead.
    Returns ``0.0`` when nothing is found.
    """
    signed = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
    if signed == 0.0:
        signed = _row_float(row, "executedQty", "filledQty", "z", default=0.0)
    return abs(signed)
|
||
|
||
|
||
def _position_price(row: dict[str, Any]) -> float:
    """Return the first usable price field of a venue row, or ``0.0``."""
    price_keys = (
        "entryPrice",
        "avgPrice",
        "avgEntryPrice",
        "ep",
        "ap",
        "price",
        "lastFillPrice",
        "tradePrice",
    )
    return _row_float(row, *price_keys)
|
||
|
||
|
||
def _mapping_for_snapshot(rows: Iterable[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index order rows by client order id and by venue order id.

    Each row is stored (as a shallow copy) under its client order id when it
    has one, otherwise under its venue order id. The venue order id is also
    registered as a secondary key unless that key is already taken. Rows with
    neither identifier are dropped.
    """
    index: dict[str, dict[str, Any]] = {}
    for entry in rows:
        client_ref = _row_text(entry, "clientOrderID", "clientOrderId", default="")
        venue_ref = _row_text(entry, "orderId", "orderID", "id", default="")
        primary = client_ref or venue_ref
        if primary:
            index[primary] = dict(entry)
        if venue_ref:
            # Secondary key; never overwrites an existing entry.
            index.setdefault(venue_ref, dict(entry))
    return index
|
||
|
||
|
||
def _venue_order_from_row(
    row: dict[str, Any],
    *,
    internal_trade_id: str = "",
    fallback_side: TradeSide = TradeSide.FLAT,
) -> VenueOrder:
    """Build a ``VenueOrder`` from a raw venue order row.

    Args:
        row: Raw order payload.
        internal_trade_id: Trade id to use; when empty the client order id,
            then the venue order id, is used instead.
        fallback_side: Side to use when the row does not determine one.

    Returns:
        A ``VenueOrder`` whose sizes are absolute values and whose
        ``metadata["raw"]`` holds a shallow copy of ``row``.
    """
    client_ref = _row_text(row, "clientOrderID", "clientOrderId", default="")
    venue_ref = _row_text(row, "orderId", "orderID", "id", default="")

    requested = _row_float(row, "origQty", "quantity", "q", "positionAmt", "positionQty", default=0.0)
    if requested <= 0:
        # No positive order quantity: fall back to the position quantity.
        requested = _position_qty(row)

    executed = _row_float(row, "executedQty", "filledQty", "z", "lastFilledQty", default=0.0)
    raw_status = _row_text(row, "status", "X", default="NEW")

    return VenueOrder(
        internal_trade_id=internal_trade_id or client_ref or venue_ref,
        venue_order_id=venue_ref,
        venue_client_id=client_ref,
        side=_trade_side_from_row(row, fallback=fallback_side),
        intended_size=abs(float(requested or 0.0)),
        filled_size=abs(executed),
        average_fill_price=_position_price(row),
        status=_venue_order_status_from_row(raw_status),
        metadata={"raw": dict(row)},
    )
|
||
|
||
|
||
def _event_id(seq: itertools.count) -> str:
    """Consume the next value of ``seq`` and format it as ``EV-`` + 8 digits."""
    number = next(seq)
    return f"EV-{number:08d}"
|
||
|
||
|
||
def _rate_limit_retry_after_ms(row: dict[str, Any]) -> int:
    """Extract a retry delay in milliseconds from a rate-limit payload.

    An explicit hint (``retryAfter``, ``retry_after_ms`` or ``retryAfterMs``,
    first truthy one wins) is converted to a non-negative int. Without a
    hint, the message text is searched for ``unblocked after <digits>``; the
    digits are read as an epoch timestamp in milliseconds and the remaining
    time until then is returned. Any parse failure yields ``0``.
    """
    hint = row.get("retryAfter") or row.get("retry_after_ms") or row.get("retryAfterMs")
    if hint is not None:
        try:
            return max(0, int(float(hint)))
        except Exception:
            return 0

    text = _row_text(row, "msg", "message", default="")
    found = re.search(r"unblocked after (\d+)", text)
    if not found:
        return 0
    try:
        deadline_ms = int(found.group(1))
        current_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
        return max(0, deadline_ms - current_ms)
    except Exception:
        return 0
|
||
|
||
|
||
class BingxVenueAdapter(VenueAdapter):
    """Normalizes BingX execution responses into DITAv2 venue events.

    The adapter wraps a backend object (by default a
    ``BingxDirectExecutionAdapter`` built from ``config``) and translates its
    receipts, cancel responses and state snapshots into ``VenueEvent`` /
    ``VenueOrder`` objects.

    Two call styles coexist:

    * ``connect``, ``reconcile``, ``submit_async`` and ``cancel_async`` call
      the backend from the caller's event loop and await the result when it
      is awaitable.
    * ``submit``, ``cancel`` and ``_backend_snapshot`` are synchronous and go
      through ``_run``, which drives awaitables with ``asyncio.run`` (on a
      pool thread when an event loop is already running in this thread).
    """

    # Shared thread-pool executor reused across all adapter instances and
    # all calls.  Threads are created once and recycled, eliminating the
    # per-call creation/destruction overhead of the old pattern.
    _EXECUTOR: concurrent.futures.ThreadPoolExecutor | None = None
    _EXECUTOR_LOCK: threading.Lock = threading.Lock()

    # Maximum seconds to wait for a single backend HTTP call.  BingX REST
    # round-trips are ~0.5–2 s in normal conditions; 30 s is generous enough
    # to survive transient slowdowns without hanging the process forever (O5).
    _BACKEND_TIMEOUT_S: float = 30.0

    @classmethod
    def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
        """Return the shared executor, creating it on first use.

        The ``None`` check is repeated under ``_EXECUTOR_LOCK`` so that
        concurrent first callers create only one pool.
        """
        if cls._EXECUTOR is None:
            with cls._EXECUTOR_LOCK:
                if cls._EXECUTOR is None:
                    # max_workers=3 so three concurrent HTTP calls (balance,
                    # positions, openOrders) can proceed simultaneously without
                    # serialising on the pool.
                    cls._EXECUTOR = concurrent.futures.ThreadPoolExecutor(
                        max_workers=3,
                        thread_name_prefix="bingx_adapter",
                    )
        return cls._EXECUTOR

    def __init__(self, backend: Any | None = None, *, config: Any | None = None) -> None:
        """Create the adapter around ``backend`` or build one from ``config``.

        Args:
            backend: Pre-built backend object. When ``None`` a
                ``BingxDirectExecutionAdapter(config)`` is created.
            config: Configuration for the default backend; required when
                ``backend`` is ``None``.

        Raises:
            ValueError: neither ``backend`` nor ``config`` was supplied.
        """
        if backend is None:
            if config is None:
                raise ValueError("BingxVenueAdapter requires a backend or config")
            # Imported lazily so the direct adapter is only loaded when needed.
            from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter

            backend = BingxDirectExecutionAdapter(config)
        self.backend = backend
        self._event_seq = itertools.count(1)
        # Thread-safe snapshot cache — reads from a snapshot may arrive from
        # the kernel thread while _backend_snapshot writes from the pool thread.
        self._snap_lock = threading.Lock()
        self._last_snapshot = None
        self._snapshot_ready = threading.Event()
        self._snapshot_ready.set()  # initially ready (no pending write)

    def _run(self, result: Any) -> Any:
        """Resolve ``result`` synchronously.

        Non-awaitables are returned unchanged. Awaitables are driven with
        ``asyncio.run``: directly when no event loop is running in this
        thread, otherwise on the shared executor while this thread blocks for
        at most ``_BACKEND_TIMEOUT_S`` seconds.

        Raises:
            TimeoutError: the pool-thread call did not finish in time. A
                timeout error raised by the call itself propagates unchanged.
        """
        if not inspect.isawaitable(result):
            return result
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop in this thread: drive the coroutine right here.
            return asyncio.run(result)

        # Inside a running event loop: submit to the shared singleton
        # executor so threads are reused across calls.
        future = self._get_executor().submit(asyncio.run, result)
        try:
            return future.result(timeout=self._BACKEND_TIMEOUT_S)
        except (TimeoutError, concurrent.futures.TimeoutError) as exc:
            # Before Python 3.11 Future.result() raises the distinct
            # concurrent.futures.TimeoutError, hence both names above.
            if future.done() and not future.cancelled() and future.exception() is exc:
                # The backend call itself raised this error; do not relabel it.
                raise
            # Stop a still-queued call from running after the caller gave up
            # (a call that is already executing cannot be cancelled).
            if future.cancel() and inspect.iscoroutine(result):
                result.close()
            raise TimeoutError(
                f"BingX backend call exceeded {self._BACKEND_TIMEOUT_S}s timeout"
            ) from exc

    def close(self) -> None:
        """V2: release the class-level thread-pool and any backend HTTP session.

        The shared executor is shut down without waiting and reset so a later
        call can lazily create a new one. ``backend.close()`` is then invoked
        when present; if it returns an awaitable, that awaitable is scheduled
        on the running loop, or run to completion when no loop is running.
        Backend close failures are logged at debug level and never raised.
        """
        cls = self.__class__
        executor = cls._EXECUTOR
        if executor is not None:
            with cls._EXECUTOR_LOCK:
                # Only tear down the pool we observed; another thread may have
                # already replaced it.
                if cls._EXECUTOR is executor:
                    cls._EXECUTOR = None
                    executor.shutdown(wait=False)

        close_backend = getattr(self.backend, "close", None)
        if close_backend is None:
            return
        try:
            outcome = close_backend()
            if inspect.isawaitable(outcome):
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    async def _drain() -> None:
                        await outcome

                    asyncio.run(_drain())
                else:
                    # Keep a reference so the task is not garbage-collected
                    # before it has run.
                    self._backend_close_task = asyncio.ensure_future(outcome)
        except Exception as exc:
            # Best-effort cleanup: report, but never fail the caller's shutdown.
            import logging as _log

            _log.getLogger(__name__).debug("close: backend close failed: %s", exc)

    def _call_backend(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke ``backend.<method_name>`` and resolve its result via ``_run``.

        Raises:
            AttributeError: the backend has no such method (or it is ``None``).
        """
        method = getattr(self.backend, method_name, None)
        if method is None:
            raise AttributeError(f"backend has no method {method_name}")
        return self._run(method(*args, **kwargs))

    def _backend_snapshot(self, *, include_history: bool = False, timeout_ms: float = 5000.0):
        """Fetch a fresh snapshot from the backend and cache it.

        Behaviour:

        - The caller first waits up to ``timeout_ms`` for ``_snapshot_ready``,
          which is cleared while a fetch is in flight.
        - If that wait times out, the last cached snapshot is returned instead
          (it is ``None`` when no fetch has succeeded yet).
        - Otherwise ``backend.refresh_state(None, include_history=...)`` is
          called through ``_call_backend`` and the result is stored in
          ``_last_snapshot`` under ``_snap_lock``.
        - ``_snapshot_ready`` is set again whether the fetch succeeds or
          raises; exceptions from the backend propagate to the caller.

        NOTE(review): the wait and the following ``clear()`` are two separate
        steps, so two threads may both start a fetch — confirm whether callers
        rely on fetches being strictly serialised.
        """
        if not self._snapshot_ready.wait(timeout=timeout_ms / 1000.0):
            # Timeout waiting for a previous snapshot write — return the
            # last-known-good snapshot rather than blocking the caller.
            with self._snap_lock:
                return self._last_snapshot

        self._snapshot_ready.clear()
        try:
            snapshot = self._call_backend("refresh_state", None, include_history=include_history)
            with self._snap_lock:
                self._last_snapshot = snapshot
        finally:
            # Always release waiters, on success and on failure alike.
            self._snapshot_ready.set()
        return snapshot

    @staticmethod
    def _legacy_intent(intent: KernelIntent) -> LegacyIntent:
        """Convert a ``KernelIntent`` into the legacy ``Intent`` the backend takes.

        Any action other than ``KernelCommandType.ENTER`` becomes EXIT, and any
        side other than ``TradeSide.SHORT`` becomes LONG. The order type and
        limit price are carried in metadata under ``_order_type`` and
        ``_limit_price``.
        """
        if intent.action == KernelCommandType.ENTER:
            action = LegacyDecisionAction.ENTER
        else:
            action = LegacyDecisionAction.EXIT
        if intent.side == TradeSide.SHORT:
            side = LegacyTradeSide.SHORT
        else:
            side = LegacyTradeSide.LONG

        metadata = dict(intent.metadata)
        metadata["_order_type"] = getattr(intent, "order_type", "MARKET")
        metadata["_limit_price"] = float(getattr(intent, "limit_price", 0.0) or 0.0)
        return LegacyIntent(
            timestamp=intent.timestamp,
            trade_id=intent.trade_id,
            decision_id=intent.intent_id,
            asset=intent.asset,
            action=action,
            side=side,
            reason=intent.reason,
            target_size=float(intent.target_size),
            leverage=float(intent.leverage),
            reference_price=float(intent.reference_price),
            confidence=1.0,
            bars_held=0,
            exit_leg_ratios=tuple(intent.exit_leg_ratios or (1.0,)),
            metadata=metadata,
        )

    async def connect(self) -> bool:
        """Async connect — awaits backend.connect() in the caller's event loop.

        The old sync path called self._run(backend.connect()) which spawns a
        thread-pool asyncio.run(), creating the httpx AsyncClient in a temporary
        loop that immediately closes.  Every subsequent request then raises
        "Event loop is closed" or "bound to a different event loop".
        Awaiting directly here fixes that: the client is always created in the
        main running loop.

        Returns:
            Always ``True``; a backend without ``connect`` is treated as
            already connected.
        """
        conn_fn = getattr(self.backend, "connect", None)
        if conn_fn is not None:
            result = conn_fn()
            if inspect.isawaitable(result):
                await result
        # backend.connect() already called refresh_state() — no second fetch needed
        return True

    async def cancel_async(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
        """Async cancel — runs in the caller's event loop, no thread-pool deadlock.

        The sync cancel() path goes through _call_backend → _run → thread-pool →
        asyncio.run() in a new thread, i.e. a different event loop from the one
        the backend session is bound to. This version calls backend.cancel()
        directly in the caller's loop and awaits the result when it is
        awaitable.

        NOTE(review): when the backend has no ``cancel`` attribute no request
        is sent, yet the empty response is reported as a cancel
        acknowledgement — confirm this is intended.
        """
        cancel_fn = getattr(self.backend, "cancel", None)
        response = None
        if cancel_fn is not None:
            response = cancel_fn(order, reason=reason)
            if inspect.isawaitable(response):
                response = await response
        return self._events_from_cancel(order, response, None, None, reason=reason)

    def _asset_for_order(self, order: VenueOrder) -> Any:
        """Resolve the asset of ``order`` from metadata, then the kernel slot.

        Returns the metadata ``asset`` when set; otherwise the ``asset`` of the
        slot named by metadata ``slot_id`` on ``self._kernel_ref`` (when that
        attribute exists). Returns ``""`` when nothing is found.
        """
        asset = str(order.metadata.get("asset") or "")
        if asset:
            return asset
        slot_id = int(order.metadata.get("slot_id", 0) or 0)
        kernel = getattr(self, "_kernel_ref", None)
        if kernel is not None:
            try:
                asset = kernel.slot(slot_id).asset
            except Exception:
                # Unknown slot or kernel error: fall through with no asset.
                asset = ""
        return asset or ""

    def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
        """Synchronously cancel ``order`` and return the resulting events.

        Uses ``backend.cancel`` when present; otherwise sends a signed DELETE
        through ``backend._client`` for the order's venue symbol.

        NOTE: if backend.cancel is async (BingxDirectExecutionAdapter), this sync
        path goes through the thread-pool and will deadlock in a running event loop.
        Use cancel_async() from async contexts (process_intent_async already does).

        Raises:
            RuntimeError: the backend exposes neither ``cancel`` nor a usable
                client / symbol pair.
        """
        # _events_from_cancel never reads before/after — snapshots are dead weight.
        if hasattr(self.backend, "cancel"):
            response = self._call_backend("cancel", order, reason=reason)
            return self._events_from_cancel(order, response, None, None, reason=reason)

        client = getattr(self.backend, "_client", None)
        instrument_symbol = ""
        if hasattr(self.backend, "_instrument_venue_symbol"):
            asset = self._asset_for_order(order)
            instrument_symbol = str(self.backend._instrument_venue_symbol(asset)) if asset else ""
        if client is None or not instrument_symbol:
            raise RuntimeError("backend does not expose a cancel surface")

        params = {"symbol": instrument_symbol}
        # Prefer the venue order id; fall back to the client order id.
        if order.venue_order_id:
            params["orderId"] = order.venue_order_id
        else:
            params["clientOrderId"] = order.venue_client_id
        try:
            response = self._run(client.signed_delete("/openApi/swap/v2/trade/order", params))
        except BingxHttpError as exc:
            # W10: map HTTP error class to status — 429/5xx are transient, 4xx are real rejections
            response = {
                "status": _http_error_status(str(exc)),
                "msg": str(exc),
                "orderId": order.venue_order_id,
                "clientOrderId": order.venue_client_id,
            }
        return self._events_from_cancel(order, response, None, None, reason=reason)

    def open_orders(self) -> List[VenueOrder]:
        """Return the currently known open orders as ``VenueOrder`` objects.

        Reads ``backend._state`` when it is set; otherwise falls back to a
        fresh ``_backend_snapshot``.
        """
        # Use backend._state (populated by await backend.connect()) rather than
        # _backend_snapshot() → _call_backend() → _run() → pool.submit(asyncio.run)
        # which spawns a temporary event loop, creates the httpx AsyncClient inside
        # it, then closes that loop — every subsequent HTTP call then raises
        # "Event loop is closed" or "asyncio.locks.Event bound to a different loop".
        source = getattr(self.backend, "_state", None)
        if source is None:
            source = self._backend_snapshot(include_history=False)
        return [_venue_order_from_row(row) for row in (source.open_orders or [])]

    def open_positions(self) -> List[dict[str, Any]]:
        """Return shallow copies of the currently known open position rows.

        Reads ``backend._state`` when it is set; otherwise falls back to a
        fresh ``_backend_snapshot``.
        """
        # Same rationale as open_orders(): prefer cached backend._state to avoid
        # the thread-pool asyncio.run() path that corrupts the httpx session.
        source = getattr(self.backend, "_state", None)
        if source is None:
            source = self._backend_snapshot(include_history=False)
        return [dict(row) for row in (source.open_positions or {}).values()]

    async def reconcile(self) -> List[VenueEvent]:  # type: ignore[override]
        """Fetch open-order state from BingX and return any pending VenueEvents.

        WHY ASYNC: the old sync version called _backend_snapshot() → _call_backend()
        → _run() → pool.submit(asyncio.run, coro).result(timeout=30s), which ran
        the request in a *new* event loop on a pool thread while blocking the
        caller for up to the full timeout. Calling backend.refresh_state()
        here keeps the request in the *caller's* event loop.

        include_history=False: all_orders/all_fills require a symbol (symbol=None
        skips them anyway), so include_history=True was fetching nothing extra.

        Returns:
            Events derived from the snapshot, or ``[]`` when the refresh fails
            (the failure is logged as a warning).
        """
        try:
            snapshot = self.backend.refresh_state(None, include_history=False)
            # Tolerate synchronous backends, like connect() does.
            if inspect.isawaitable(snapshot):
                snapshot = await snapshot
        except Exception as exc:
            import logging as _log

            _log.getLogger(__name__).warning("reconcile: refresh_state failed: %s", exc)
            return []
        return self._events_from_snapshot(snapshot)

    def submit(self, intent: KernelIntent) -> List[VenueEvent]:
        """Synchronously submit ``intent`` and return the resulting events."""
        # Snapshots dropped: receipt executedQty fields take precedence (same as submit_async)
        receipt = self._call_backend("submit_intent", self._legacy_intent(intent))
        return self._events_from_submit(intent, receipt, None, None)

    async def submit_async(self, intent: KernelIntent) -> List[VenueEvent]:
        """Async submit — runs in the caller's event loop, no thread-pool deadlock.

        The backend's ``submit_intent`` is called directly and awaited when it
        returns an awaitable. The before/after snapshots are omitted: fill
        size comes from the receipt's executedQty field. Passing None for
        snapshots makes _filled_size_from_snapshots return 0.0 (a safe
        fallback; the receipt fields take precedence).
        """
        receipt = self.backend.submit_intent(self._legacy_intent(intent))
        if inspect.isawaitable(receipt):
            receipt = await receipt
        return self._events_from_submit(intent, receipt, None, None)

    def _events_from_submit(self, intent: KernelIntent, receipt: Any, before, after) -> List[VenueEvent]:  # noqa: ANN001
        """Translate a submit receipt into venue events.

        Returns one of:

        * a single RATE_LIMITED event when the receipt status is throttled;
        * a single ORDER_REJECT event when it is rejected/failed;
        * an ORDER_ACK event, followed by a PARTIAL_FILL / FULL_FILL event
          when the status or the snapshot difference indicates a fill.

        Args:
            intent: The intent that was submitted.
            receipt: Backend receipt; ``raw_ack``, ``status``, ``order_id``,
                ``client_order_id``, ``timestamp`` and ``price`` are read.
            before: Snapshot taken before the submit, or ``None``.
            after: Snapshot taken after the submit, or ``None``.
        """
        ack_row = dict(getattr(receipt, "raw_ack", {}) or {})
        status = _normalize_status(getattr(receipt, "status", "") or _row_text(ack_row, "status", default="NEW"))
        order_id = _row_text(ack_row, "orderId", "orderID", default=str(getattr(receipt, "order_id", "") or ""))
        client_order_id = _row_text(
            ack_row,
            "clientOrderID",
            "clientOrderId",
            default=str(getattr(receipt, "client_order_id", "") or intent.intent_id),
        )
        target_size = float(intent.target_size or 0.0)

        def _shared_fields() -> dict[str, Any]:
            # Fields common to every event built here; a fresh dict (and a
            # fresh metadata dict) is produced for each event.
            return {
                "trade_id": intent.trade_id,
                "slot_id": intent.slot_id,
                "venue_order_id": order_id,
                "venue_client_id": client_order_id,
                "side": intent.side,
                "asset": intent.asset,
                "size": target_size,
                "raw_payload": ack_row or json_safe(receipt),
                "metadata": {"intent_id": intent.intent_id, "action": intent.action.value},
            }

        if status in {"RATE_LIMITED", "THROTTLED"}:
            throttled = _shared_fields()
            throttled["metadata"]["retry_after_ms"] = _rate_limit_retry_after_ms(ack_row)
            return [
                VenueEvent(
                    timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
                    event_id=_event_id(self._event_seq),
                    kind=KernelEventKind.RATE_LIMITED,
                    status=VenueEventStatus.RATE_LIMITED,
                    price=safe_float(getattr(receipt, "price", 0.0), 0.0),
                    filled_size=0.0,
                    remaining_size=target_size,
                    reason=_row_text(ack_row, "msg", "message", default="BINGX_RATE_LIMITED"),
                    **throttled,
                )
            ]

        base_event = VenueEvent(
            timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
            event_id=_event_id(self._event_seq),
            kind=KernelEventKind.ORDER_ACK,
            status=VenueEventStatus.ACKED,
            price=safe_float(getattr(receipt, "price", 0.0), 0.0),
            filled_size=0.0,
            remaining_size=target_size,
            reason="",
            **_shared_fields(),
        )

        if status in {"REJECTED", "FAILED"}:
            # The reject event is a copy of the ack event with its own id,
            # kind, status and reason.
            reject_fields = {
                **base_event.__dict__,
                "event_id": _event_id(self._event_seq),
                "kind": KernelEventKind.ORDER_REJECT,
                "status": VenueEventStatus.REJECTED,
                "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED"),
            }
            return [VenueEvent(**reject_fields)]

        # Extract friction fields annotated by submit_intent (Gap 1/2/3).
        fee_estimated = float(ack_row.get("_fee_estimated") or 0.0)
        fee_source = str(ack_row.get("_fee_source") or "")
        is_maker_est = bool(ack_row.get("_is_maker_est", False))
        mark_at_submit = float(ack_row.get("_mark_at_submit") or 0.0)
        slippage_bps = float(ack_row.get("_slippage_bps") or 0.0)
        exchange_ts = int(ack_row.get("_exchange_ts") or 0)

        events = [base_event]
        fill_status = _venue_event_status_from_row(status)
        filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0)
        snapshot_fill_size = self._filled_size_from_snapshots(before, after, intent.asset)
        if filled_size <= 0:
            # The receipt carries no fill quantity: use the snapshot difference.
            filled_size = snapshot_fill_size

        reported_fill = fill_status in {VenueEventStatus.PARTIALLY_FILLED, VenueEventStatus.FILLED}
        if not (reported_fill or snapshot_fill_size > 0.0):
            return events

        if filled_size <= 0:
            # A fill was reported without a quantity: assume the full target.
            filled_size = target_size
        remaining_size = max(0.0, target_size - float(filled_size))
        if fill_status == VenueEventStatus.FILLED or remaining_size <= 1e-12:
            fill_kind = KernelEventKind.FULL_FILL
            fill_event_status = VenueEventStatus.FILLED
        else:
            fill_kind = KernelEventKind.PARTIAL_FILL
            fill_event_status = VenueEventStatus.PARTIALLY_FILLED

        events.append(
            VenueEvent(
                timestamp=base_event.timestamp,
                event_id=_event_id(self._event_seq),
                kind=fill_kind,
                status=fill_event_status,
                price=safe_float(
                    _row_float(ack_row, "avgPrice", "ap", "price", "lastFillPrice", default=getattr(receipt, "price", 0.0)),
                    0.0,
                ),
                filled_size=float(filled_size),
                remaining_size=float(remaining_size),
                reason="",
                # Gap 1/2/3: fee + friction fields populated from submit_intent annotations.
                # fee_source="ESTIMATED_*" until WS FILL_SETTLED updates it to "WS_SETTLED".
                fee=fee_estimated,
                fee_asset="USDT",
                fee_source=fee_source,
                is_maker=is_maker_est,
                exchange_ts=exchange_ts,
                slippage_bps=slippage_bps,
                mark_at_submit=mark_at_submit,
                **_shared_fields(),
            )
        )
        return events

    def _events_from_cancel(self, order: VenueOrder, response: Any, before, after, *, reason: str = "") -> List[VenueEvent]:  # noqa: ANN001
        """Translate a cancel response into a single venue event.

        A non-dict response is treated as an empty payload, whose status
        defaults to ``CANCELED``. A throttled status yields a RATE_LIMITED
        event carrying ``retry_after_ms``; a canceled status yields
        CANCEL_ACK; every other status yields CANCEL_REJECT. ``before`` and
        ``after`` are accepted but not read.
        """
        raw = response if isinstance(response, dict) else {}
        status = _normalize_status(_row_text(raw, "status", default="CANCELED"))

        if status in {"RATE_LIMITED", "THROTTLED"}:
            kind = KernelEventKind.RATE_LIMITED
            event_status = VenueEventStatus.RATE_LIMITED
            default_reason = "BINGX_RATE_LIMITED"
            payload_status = status or "RATE_LIMITED"
            metadata = {**dict(order.metadata), "retry_after_ms": _rate_limit_retry_after_ms(raw)}
        else:
            event_status = _venue_event_status_from_row(status)
            if event_status == VenueEventStatus.CANCELED:
                kind = KernelEventKind.CANCEL_ACK
                default_reason = "BINGX_CANCEL_ACK"
            else:
                kind = KernelEventKind.CANCEL_REJECT
                default_reason = "BINGX_CANCEL_REJECT"
            payload_status = status or event_status.value
            metadata = dict(order.metadata)

        fallback_payload = {
            "orderId": order.venue_order_id,
            "clientOrderId": order.venue_client_id,
            "status": payload_status,
        }
        return [
            VenueEvent(
                timestamp=datetime.now(timezone.utc),
                event_id=_event_id(self._event_seq),
                trade_id=order.internal_trade_id or order.venue_client_id,
                slot_id=int(order.metadata.get("slot_id", 0) or 0),
                kind=kind,
                status=event_status,
                venue_order_id=order.venue_order_id,
                venue_client_id=order.venue_client_id,
                side=order.side,
                asset=str(order.metadata.get("asset") or ""),
                price=safe_float(
                    _row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price),
                    0.0,
                ),
                size=float(order.intended_size or 0.0),
                filled_size=float(order.filled_size or 0.0),
                remaining_size=float(order.remaining_size),
                # The caller's reason wins over the venue message.
                reason=reason or _row_text(raw, "msg", "message", default=default_reason),
                raw_payload=raw or fallback_payload,
                metadata=metadata,
            )
        ]

    def _events_from_snapshot(self, snapshot: Any) -> List[VenueEvent]:  # noqa: ANN001
        """Build de-duplicated events from a backend snapshot.

        Rows are taken from ``open_orders``, ``all_orders`` and ``all_fills``
        in that order; non-dict rows are skipped. An event is kept only the
        first time its (client id, order id, kind) triple is seen.
        """
        def _order_event(row: dict[str, Any]) -> VenueEvent:
            return self._event_from_row(row, slot_id=0)

        sources = (
            ("open_orders", _order_event),
            ("all_orders", _order_event),
            ("all_fills", self._fill_event_from_row),
        )
        events: list[VenueEvent] = []
        seen: set[tuple[str, str, str]] = set()
        for attribute, build in sources:
            for row in getattr(snapshot, attribute, []) or []:
                if not isinstance(row, dict):
                    continue
                event = build(row)
                key = (event.venue_client_id, event.venue_order_id, event.kind.value)
                if key in seen:
                    continue
                seen.add(key)
                events.append(event)
        return events

    def _event_from_row(self, row: dict[str, Any], *, slot_id: int) -> VenueEvent:
        """Build a ``VenueEvent`` from a raw order row.

        The event kind follows the row's status; an unrecognised status is
        reported as ORDER_ACK. For fill kinds without a filled quantity the
        order size is used as the filled size.
        """
        status = _normalize_status(_row_text(row, "status", "X", default="NEW"))
        event_status = _venue_event_status_from_row(status)
        kind_by_status = {
            VenueEventStatus.ACKED: KernelEventKind.ORDER_ACK,
            VenueEventStatus.PARTIALLY_FILLED: KernelEventKind.PARTIAL_FILL,
            VenueEventStatus.FILLED: KernelEventKind.FULL_FILL,
            VenueEventStatus.CANCELED: KernelEventKind.CANCEL_ACK,
            VenueEventStatus.REJECTED: KernelEventKind.ORDER_REJECT,
            VenueEventStatus.CANCELED_REJECTED: KernelEventKind.CANCEL_REJECT,
            VenueEventStatus.RATE_LIMITED: KernelEventKind.RATE_LIMITED,
        }
        kind = kind_by_status.get(event_status, KernelEventKind.ORDER_ACK)

        size = _row_float(row, "origQty", "quantity", "q", "positionAmt", default=0.0)
        filled = _row_float(row, "executedQty", "cumFilledQty", "filledQty", "z", "lastFilledQty", default=0.0)
        if filled <= 0.0 and kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}:
            filled = size
        size_abs = abs(float(size or 0.0))
        filled_abs = abs(float(filled or 0.0))

        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id=_event_id(self._event_seq),
            trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
            slot_id=slot_id,
            kind=kind,
            status=event_status,
            venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
            venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
            side=_trade_side_from_row(row),
            asset=_row_text(row, "symbol", default=""),
            price=safe_float(_row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0), 0.0),
            size=size_abs,
            filled_size=filled_abs,
            remaining_size=max(0.0, size_abs - filled_abs),
            reason=_row_text(row, "msg", "message", default=""),
            raw_payload=dict(row),
            metadata={"source": "bingx"},
        )

    def _fill_event_from_row(self, row: dict[str, Any]) -> VenueEvent:
        """Build a fill ``VenueEvent`` from a raw fill row.

        The status defaults to ``FILLED`` when the row has none. A FILLED
        status produces FULL_FILL; every other status produces PARTIAL_FILL.
        """
        status = _normalize_status(_row_text(row, "status", "X", default="FILLED"))
        event_status = _venue_event_status_from_row(status)
        if event_status == VenueEventStatus.FILLED:
            kind = KernelEventKind.FULL_FILL
        else:
            kind = KernelEventKind.PARTIAL_FILL

        # Each quantity is computed once; remaining is their difference.
        total_qty = abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0))
        last_qty = abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0))

        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id=_event_id(self._event_seq),
            trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
            slot_id=0,
            kind=kind,
            status=event_status,
            venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
            venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
            side=_trade_side_from_row(row),
            asset=_row_text(row, "symbol", default=""),
            price=safe_float(_row_float(row, "lastFillPrice", "L", "price", "ap", default=0.0), 0.0),
            size=total_qty,
            filled_size=last_qty,
            remaining_size=max(0.0, total_qty - last_qty),
            reason=_row_text(row, "msg", "message", default=""),
            raw_payload=dict(row),
            metadata={"source": "bingx"},
        )

    @staticmethod
    def _filled_size_from_snapshots(before: Any, after: Any, asset: str) -> float:  # noqa: ANN001
        """Return the absolute change in position size for ``asset``.

        Each snapshot's ``open_positions`` mapping is searched for a row whose
        symbol equals ``asset`` after removing ``-`` / ``_`` and upper-casing.
        A snapshot that is ``None``, has no positions, or has no matching row
        counts as quantity ``0.0``.
        """
        def _canonical(symbol: str) -> str:
            return symbol.replace("-", "").replace("_", "").upper()

        # Normalise the wanted asset once instead of once per position row.
        wanted = _canonical(str(asset or ""))

        def _lookup(snapshot: Any) -> float:
            positions = getattr(snapshot, "open_positions", {}) or {}
            for key, row in positions.items():
                if _canonical(_row_text(row, "symbol", default=str(key))) == wanted:
                    return _position_qty(row)
            return 0.0

        return abs(_lookup(before) - _lookup(after))
|