Files
siloqy/prod/clean_arch/dita_v2/bingx_venue.py
Codex 535eea855d PINK: cancel_async, S2 task guard, 29 new regression tests — 346/346 green
Bug fixes:
  1. bingx_venue.py: add cancel_async() — async cancel that awaits backend.cancel()
     directly in the main event loop. The sync cancel() path goes through _run()
     → thread-pool → asyncio.run() in a new thread, but aiohttp is bound to the
     main loop → deadlock. Identical root cause as the old sync submit() → fixed
     via submit_async. Remove dead cancel_order branch (BingxDirectExecutionAdapter
     has cancel, not cancel_order).

  2. rust_backend.py: process_intent_async CANCEL path now uses cancel_async when
     available (matching the submit_async pattern for ENTER/EXIT). Sync cancel()
     fallback kept for MockVenueAdapter compat.

  3. bingx_direct.py: guard S2 background refresh task per symbol. Old code discarded
     the task reference; rapid submits piled up concurrent _refresh_state_background
     calls all writing self._state in arbitrary completion order (stale last-writer-
     wins). Now: skip creating a new task if one is already pending for the symbol;
     store reference and clear via done-callback.

Test additions (test_bingx_bugs.py, 29 tests):
  - cancel_async: awaitable, calls backend.cancel directly, maps all statuses
  - process_intent_async CANCEL: dispatches cancel_async / falls back to sync
  - S2 guard: task stored, no duplicates while pending, new task after done
  - _events_from_submit with None snapshots: FILLED/NEW/REJECTED/PARTIAL/RATE_LIMITED
  - _filled_size_from_snapshots(None, None): safe 0.0 return
  - _events_from_cancel: before/after completely ignored
  - connect(): no double refresh_state, no-op if backend has no connect
  - submit() sync with None snapshots: FULL_FILL still emitted
  - cancel() branch audit: uses cancel not cancel_order, raises for no-cancel backend

Fix: test_exchange_event_seam_parity.py TestMockSubscribe — replace deprecated
asyncio.get_event_loop().run_until_complete() with asyncio.run() (Python 3.12
raises RuntimeError when event loop is closed by earlier suite tests).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 16:02:13 +02:00

717 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""DITAv2 BingX venue adapter.
This is a thin normalization layer over the existing direct BingX execution
surface. It converts BingX REST/account/order payloads into DITAv2
``VenueEvent`` / ``VenueOrder`` objects without reimplementing exchange logic.
"""
from __future__ import annotations
import asyncio
import concurrent.futures
import inspect
import itertools
import re
import threading
from datetime import datetime, timezone
from typing import Any, Iterable, List, Optional
from prod.clean_arch.dita import DecisionAction as LegacyDecisionAction
from prod.clean_arch.dita import Intent as LegacyIntent
from prod.clean_arch.dita import TradeSide as LegacyTradeSide
from prod.bingx.http import BingxHttpError
from .contracts import (
KernelCommandType,
KernelEventKind,
KernelIntent,
TradeSide,
VenueEvent,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from .utils import json_safe
from .utils import safe_float
from .venue import VenueAdapter
def _row_text(row: dict[str, Any], *keys: str, default: str = "") -> str:
for key in keys:
value = row.get(key)
if value is None:
continue
text = str(value)
if text:
return text
return default
def _row_float(row: dict[str, Any], *keys: str, default: float = 0.0) -> float:
for key in keys:
try:
value = float(row.get(key) or 0.0)
except Exception:
continue
if value == value and value not in (float("inf"), float("-inf")) and value != 0.0:
return value
return default
def _normalize_status(status: str) -> str:
return str(status or "").strip().upper()
def _trade_side_from_row(row: dict[str, Any], *, fallback: TradeSide = TradeSide.FLAT) -> TradeSide:
side_raw = _row_text(row, "side", "positionSide", default="").upper()
signed_qty = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
if side_raw in {"BUY", "LONG"}:
return TradeSide.LONG
if side_raw in {"SELL", "SHORT"}:
return TradeSide.SHORT
if signed_qty < 0:
return TradeSide.SHORT
if signed_qty > 0:
return TradeSide.LONG
return fallback
def _http_error_status(exc_msg: str) -> str:
"""Map a BingxHttpError message to a venue status string.
HTTP 429 and 5xx are transient → RATE_LIMITED so the slot can retry.
4xx (non-429) are genuine client-side rejections → REJECTED.
Transport / DNS / circuit-breaker errors (no HTTP prefix) are transient.
"""
m = exc_msg.upper()
if "HTTP 429" in m:
return "RATE_LIMITED"
for code in ("500", "501", "502", "503", "504"):
if f"HTTP {code}" in m:
return "RATE_LIMITED"
if "HTTP 4" in m:
return "REJECTED"
return "RATE_LIMITED"
def _venue_event_status_from_row(status: str) -> VenueEventStatus:
normalized = _normalize_status(status)
if normalized in {"NEW", "ACKED", "PENDING", "CREATED"}:
return VenueEventStatus.ACKED
if normalized in {"RATE_LIMITED", "THROTTLED"}:
return VenueEventStatus.RATE_LIMITED
if normalized in {"PARTIALLY_FILLED", "PARTIAL_FILL"}:
return VenueEventStatus.PARTIALLY_FILLED
if normalized in {"FILLED", "FULL_FILL"}:
return VenueEventStatus.FILLED
if normalized in {"CANCELED", "CANCELLED", "EXPIRED"}:
return VenueEventStatus.CANCELED
if normalized in {"REJECTED", "FAILED"}:
return VenueEventStatus.REJECTED
if normalized in {"CANCEL_REJECTED", "CANCEL_REJECT"}:
return VenueEventStatus.CANCELED_REJECTED
return VenueEventStatus.ACKED
def _venue_order_status_from_row(status: str) -> VenueOrderStatus:
normalized = _normalize_status(status)
if normalized in {"NEW", "ACKED", "PENDING", "CREATED"}:
return VenueOrderStatus.NEW
if normalized in {"RATE_LIMITED", "THROTTLED"}:
return VenueOrderStatus.NEW
if normalized in {"PARTIALLY_FILLED", "PARTIAL_FILL"}:
return VenueOrderStatus.PARTIALLY_FILLED
if normalized in {"FILLED", "FULL_FILL"}:
return VenueOrderStatus.FILLED
if normalized in {"CANCELED", "CANCELLED", "EXPIRED"}:
return VenueOrderStatus.CANCELED
if normalized in {"REJECTED", "FAILED"}:
return VenueOrderStatus.REJECTED
return VenueOrderStatus.NEW
def _position_qty(row: dict[str, Any]) -> float:
qty = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
if qty != 0.0:
return abs(qty)
return abs(_row_float(row, "executedQty", "filledQty", "z", default=0.0))
def _position_price(row: dict[str, Any]) -> float:
return _row_float(row, "entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price", "lastFillPrice", "tradePrice")
def _mapping_for_snapshot(rows: Iterable[dict[str, Any]]) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
for row in rows:
client_id = _row_text(row, "clientOrderID", "clientOrderId", default="")
order_id = _row_text(row, "orderId", "orderID", "id", default="")
key = client_id or order_id
if key:
mapping[key] = dict(row)
if order_id and order_id not in mapping:
mapping[order_id] = dict(row)
return mapping
def _venue_order_from_row(
row: dict[str, Any],
*,
internal_trade_id: str = "",
fallback_side: TradeSide = TradeSide.FLAT,
) -> VenueOrder:
side = _trade_side_from_row(row, fallback=fallback_side)
client_id = _row_text(row, "clientOrderID", "clientOrderId", default="")
order_id = _row_text(row, "orderId", "orderID", "id", default="")
intended = _row_float(row, "origQty", "quantity", "q", "positionAmt", "positionQty", default=0.0)
if intended <= 0:
intended = _position_qty(row)
return VenueOrder(
internal_trade_id=internal_trade_id or client_id or order_id,
venue_order_id=order_id,
venue_client_id=client_id,
side=side,
intended_size=abs(float(intended or 0.0)),
filled_size=abs(_row_float(row, "executedQty", "filledQty", "z", "lastFilledQty", default=0.0)),
average_fill_price=_position_price(row),
status=_venue_order_status_from_row(_row_text(row, "status", "X", default="NEW")),
metadata={"raw": dict(row)},
)
def _event_id(seq: itertools.count) -> str:
return f"EV-{next(seq):08d}"
def _rate_limit_retry_after_ms(row: dict[str, Any]) -> int:
raw_retry = row.get("retryAfter") or row.get("retry_after_ms") or row.get("retryAfterMs")
if raw_retry is None:
msg = _row_text(row, "msg", "message", default="")
match = re.search(r"unblocked after (\d+)", msg)
if match:
try:
ts = int(match.group(1))
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
return max(0, ts - now_ms)
except Exception:
return 0
return 0
try:
return max(0, int(float(raw_retry)))
except Exception:
return 0
class BingxVenueAdapter(VenueAdapter):
"""Normalizes BingX execution responses into DITAv2 venue events."""
# Shared thread-pool executor reused across all adapter instances and
# all calls. Threads are created once and recycled, eliminating the
# per-call creation/destruction overhead of the old pattern.
_EXECUTOR: concurrent.futures.ThreadPoolExecutor | None = None
_EXECUTOR_LOCK: threading.Lock = threading.Lock()
@classmethod
def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
if cls._EXECUTOR is None:
with cls._EXECUTOR_LOCK:
if cls._EXECUTOR is None:
# max_workers=3 so three concurrent HTTP calls (balance,
# positions, openOrders) can proceed simultaneously without
# serialising on the pool.
cls._EXECUTOR = concurrent.futures.ThreadPoolExecutor(
max_workers=3,
thread_name_prefix="bingx_adapter",
)
return cls._EXECUTOR
def __init__(self, backend: Any | None = None, *, config: Any | None = None) -> None:
if backend is None:
if config is None:
raise ValueError("BingxVenueAdapter requires a backend or config")
from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter
backend = BingxDirectExecutionAdapter(config)
self.backend = backend
self._event_seq = itertools.count(1)
# Thread-safe snapshot cache — reads from a snapshot may arrive from
# the kernel thread while _backend_snapshot writes from the pool thread.
self._snap_lock = threading.Lock()
self._last_snapshot = None
self._snapshot_ready = threading.Event()
self._snapshot_ready.set() # initially ready (no pending write)
# Maximum seconds to wait for a single backend HTTP call. BingX REST
# round-trips are ~0.52 s in normal conditions; 30 s is generous enough
# to survive transient slowdowns without hanging the process forever (O5).
_BACKEND_TIMEOUT_S: float = 30.0
def _run(self, result: Any) -> Any:
if inspect.isawaitable(result):
try:
asyncio.get_running_loop()
except RuntimeError:
return asyncio.run(result)
# Inside a running event loop: submit to the shared singleton
# executor so threads are reused across calls.
pool = self._get_executor()
try:
return pool.submit(asyncio.run, result).result(timeout=self._BACKEND_TIMEOUT_S)
except TimeoutError as exc:
raise TimeoutError(
f"BingX backend call exceeded {self._BACKEND_TIMEOUT_S}s timeout"
) from exc
return result
def close(self) -> None:
"""V2: release the class-level thread-pool and any backend HTTP session."""
executor = self.__class__._EXECUTOR
if executor is not None:
with self.__class__._EXECUTOR_LOCK:
if self.__class__._EXECUTOR is executor:
self.__class__._EXECUTOR = None
executor.shutdown(wait=False)
_maybe_close_backend = getattr(self.backend, "close", None)
if _maybe_close_backend is not None:
try:
_maybe_close_backend()
except Exception:
pass
def _call_backend(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
method = getattr(self.backend, method_name, None)
if method is None:
raise AttributeError(f"backend has no method {method_name}")
return self._run(method(*args, **kwargs))
def _backend_snapshot(self, *, include_history: bool = False, timeout_ms: float = 5000.0):
"""Fetch a fresh snapshot from the backend and cache it thread-safely.
Design (industry best-practice reader-writer pattern):
- A caller that needs a fresh snapshot *waits* on ``_snapshot_ready``
before reading, so it never sees a stale partial write.
- While a snapshot fetch is in-flight, the lock is cleared; concurrent
callers block on ``_snapshot_ready`` with a timeout. If the fetch
succeeds in time they get the fresh snapshot; if it times out they
fall back to ``_last_snapshot`` (an eventually-consistent design —
stale data that *was* consistent is safer than no data).
- The write is guarded by ``_snap_lock`` so concurrent writes are
serialised and ``_last_snapshot`` is never partially assigned.
"""
if not self._snapshot_ready.wait(timeout=timeout_ms / 1000.0):
# Timeout waiting for a previous snapshot write — return the
# last-known-good snapshot rather than blocking the caller.
with self._snap_lock:
return self._last_snapshot
self._snapshot_ready.clear()
try:
snapshot = self._call_backend("refresh_state", None, include_history=include_history)
except Exception:
self._snapshot_ready.set()
raise
with self._snap_lock:
self._last_snapshot = snapshot
self._snapshot_ready.set()
return snapshot
@staticmethod
def _legacy_intent(intent: KernelIntent) -> LegacyIntent:
action = LegacyDecisionAction.ENTER if intent.action == KernelCommandType.ENTER else LegacyDecisionAction.EXIT
side = LegacyTradeSide.SHORT if intent.side == TradeSide.SHORT else LegacyTradeSide.LONG
metadata = dict(intent.metadata)
metadata["_order_type"] = getattr(intent, "order_type", "MARKET")
metadata["_limit_price"] = float(getattr(intent, "limit_price", 0.0) or 0.0)
return LegacyIntent(
timestamp=intent.timestamp,
trade_id=intent.trade_id,
decision_id=intent.intent_id,
asset=intent.asset,
action=action,
side=side,
reason=intent.reason,
target_size=float(intent.target_size),
leverage=float(intent.leverage),
reference_price=float(intent.reference_price),
confidence=1.0,
bars_held=0,
exit_leg_ratios=tuple(intent.exit_leg_ratios or (1.0,)),
metadata=metadata,
)
def connect(self) -> bool:
result = getattr(self.backend, "connect", None)
if result is not None:
self._run(result())
# backend.connect() already called refresh_state() — no second fetch needed
return True
async def cancel_async(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
"""Async cancel — runs in the caller's event loop, no thread-pool deadlock.
The sync cancel() path goes through _call_backend → _run → thread-pool →
asyncio.run() in a new thread. The aiohttp session is bound to the main
event loop, so using it from a different loop deadlocks — same bug that
was fixed for submit via submit_async. This version awaits backend.cancel()
directly in the caller's (main) event loop.
"""
cancel_fn = getattr(self.backend, "cancel", None)
if cancel_fn is not None:
response = await cancel_fn(order, reason=reason)
else:
response = None
return self._events_from_cancel(order, response, None, None, reason=reason)
def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
# _events_from_cancel never reads before/after — snapshots are dead weight.
# NOTE: if backend.cancel is async (BingxDirectExecutionAdapter), this sync
# path goes through the thread-pool and will deadlock in a running event loop.
# Use cancel_async() from async contexts (process_intent_async already does).
response = None
if hasattr(self.backend, "cancel"):
response = self._call_backend("cancel", order, reason=reason)
else:
client = getattr(self.backend, "_client", None)
instrument_symbol = ""
if hasattr(self.backend, "_instrument_venue_symbol"):
asset = str(order.metadata.get("asset") or "")
if not asset:
slot_id = int(order.metadata.get("slot_id", 0) or 0)
if hasattr(self, "_kernel_ref") and self._kernel_ref is not None:
try:
asset = self._kernel_ref.slot(slot_id).asset
except Exception:
pass
if not asset:
asset = str(order.metadata.get("asset") or "")
instrument_symbol = str(self.backend._instrument_venue_symbol(asset)) if asset else ""
if client is None or not instrument_symbol:
raise RuntimeError("backend does not expose a cancel surface")
params = {"symbol": instrument_symbol}
if order.venue_order_id:
params["orderId"] = order.venue_order_id
else:
params["clientOrderId"] = order.venue_client_id
try:
response = self._run(client.signed_delete("/openApi/swap/v2/trade/order", params))
except BingxHttpError as exc:
# W10: map HTTP error class to status — 429/5xx are transient, 4xx are real rejections
response = {"status": _http_error_status(str(exc)), "msg": str(exc), "orderId": order.venue_order_id, "clientOrderId": order.venue_client_id}
return self._events_from_cancel(order, response, None, None, reason=reason)
def open_orders(self) -> List[VenueOrder]:
snapshot = self._backend_snapshot(include_history=False)
return [_venue_order_from_row(row) for row in (snapshot.open_orders or [])]
def open_positions(self) -> List[dict[str, Any]]:
snapshot = self._backend_snapshot(include_history=False)
return [dict(row) for row in (snapshot.open_positions or {}).values()]
async def reconcile(self) -> List[VenueEvent]: # type: ignore[override]
"""Fetch open-order state from BingX and return any pending VenueEvents.
WHY ASYNC: the old sync version called _backend_snapshot() → _call_backend()
→ _run() → pool.submit(asyncio.run, coro).result(timeout=30s). That spawned
a *new* event loop in a thread-pool thread. The BingxHttpClient (aiohttp
session) is bound to the *main* event loop — using it from a different loop
silently deadlocks. BingX VST responds in ~500ms; the deadlock made every
reconcile call block the main event loop for the full 30s timeout.
FIX: declare async, call backend.refresh_state() directly with await so it
runs in the *caller's* (main) event loop where the session lives.
pump_venue_events() already has `if inspect.isawaitable(events): await events`
— zero caller changes required.
include_history=False: all_orders/all_fills require a symbol (symbol=None
skips them anyway), so include_history=True was fetching nothing extra.
"""
try:
snapshot = await self.backend.refresh_state(None, include_history=False)
except Exception as exc:
import logging as _log
_log.getLogger(__name__).warning("reconcile: refresh_state failed: %s", exc)
return []
return self._events_from_snapshot(snapshot)
def submit(self, intent: KernelIntent) -> List[VenueEvent]:
# Snapshots dropped: receipt executedQty fields take precedence (same as submit_async)
receipt = self._call_backend("submit_intent", self._legacy_intent(intent))
return self._events_from_submit(intent, receipt, None, None)
async def submit_async(self, intent: KernelIntent) -> List[VenueEvent]:
"""Async submit — runs in the caller's event loop, no thread-pool deadlock.
The sync submit() calls _backend_snapshot() × 2 + submit_intent via
_run() → asyncio.run() in a thread-pool → new event loop → aiohttp
session (main-loop-bound) deadlocks → 30s timeout on every ENTER/EXIT.
This version awaits the backend directly. The before/after snapshots
are omitted: fill size comes from the receipt's executedQty field, and
the WS account stream delivers FULL_FILL events independently.
Passing None for snapshots makes _filled_size_from_snapshots return 0.0
(a safe fallback; the receipt fields take precedence).
"""
receipt = await self.backend.submit_intent(self._legacy_intent(intent))
return self._events_from_submit(intent, receipt, None, None)
def _events_from_submit(self, intent: KernelIntent, receipt: Any, before, after) -> List[VenueEvent]: # noqa: ANN001
ack_row = dict(getattr(receipt, "raw_ack", {}) or {})
status = _normalize_status(getattr(receipt, "status", "") or _row_text(ack_row, "status", default="NEW"))
order_id = _row_text(ack_row, "orderId", "orderID", default=str(getattr(receipt, "order_id", "") or ""))
client_order_id = _row_text(ack_row, "clientOrderID", "clientOrderId", default=str(getattr(receipt, "client_order_id", "") or intent.intent_id))
if status in {"RATE_LIMITED", "THROTTLED"}:
return [
VenueEvent(
timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=KernelEventKind.RATE_LIMITED,
status=VenueEventStatus.RATE_LIMITED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(getattr(receipt, "price", 0.0), 0.0),
size=float(intent.target_size or 0.0),
filled_size=0.0,
remaining_size=float(intent.target_size or 0.0),
reason=_row_text(ack_row, "msg", "message", default="BINGX_RATE_LIMITED"),
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value, "retry_after_ms": _rate_limit_retry_after_ms(ack_row)},
)
]
base_event = VenueEvent(
timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=KernelEventKind.ORDER_ACK,
status=VenueEventStatus.ACKED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(getattr(receipt, "price", 0.0), 0.0),
size=float(intent.target_size or 0.0),
filled_size=0.0,
remaining_size=float(intent.target_size or 0.0),
reason="",
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
)
if status in {"REJECTED", "FAILED"}:
return [
VenueEvent(
**{**base_event.__dict__, "event_id": _event_id(self._event_seq), "kind": KernelEventKind.ORDER_REJECT, "status": VenueEventStatus.REJECTED, "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED")},
)
]
# Extract friction fields annotated by submit_intent (Gap 1/2/3).
fee_estimated = float(ack_row.get("_fee_estimated") or 0.0)
fee_source = str(ack_row.get("_fee_source") or "")
is_maker_est = bool(ack_row.get("_is_maker_est", False))
mark_at_submit = float(ack_row.get("_mark_at_submit") or 0.0)
slippage_bps = float(ack_row.get("_slippage_bps") or 0.0)
exchange_ts = int(ack_row.get("_exchange_ts") or 0)
events = [base_event]
fill_status = _venue_event_status_from_row(status)
filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0)
snapshot_fill_size = self._filled_size_from_snapshots(before, after, intent.asset)
if filled_size <= 0:
filled_size = snapshot_fill_size
emit_fill = fill_status in {VenueEventStatus.PARTIALLY_FILLED, VenueEventStatus.FILLED} or snapshot_fill_size > 0.0
if emit_fill:
if filled_size <= 0:
filled_size = float(intent.target_size or 0.0)
remaining_size = max(0.0, float(intent.target_size or 0.0) - float(filled_size))
fill_kind = KernelEventKind.FULL_FILL if fill_status == VenueEventStatus.FILLED or remaining_size <= 1e-12 else KernelEventKind.PARTIAL_FILL
events.append(
VenueEvent(
timestamp=base_event.timestamp,
event_id=_event_id(self._event_seq),
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=fill_kind,
status=VenueEventStatus.FILLED if fill_kind == KernelEventKind.FULL_FILL else VenueEventStatus.PARTIALLY_FILLED,
venue_order_id=order_id,
venue_client_id=client_order_id,
side=intent.side,
asset=intent.asset,
price=safe_float(_row_float(ack_row, "avgPrice", "ap", "price", "lastFillPrice", default=getattr(receipt, "price", 0.0)), 0.0),
size=float(intent.target_size or 0.0),
filled_size=float(filled_size),
remaining_size=float(remaining_size),
reason="",
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
# Gap 1/2/3: fee + friction fields populated from submit_intent annotations.
# fee_source="ESTIMATED_*" until WS FILL_SETTLED updates it to "WS_SETTLED".
fee=fee_estimated,
fee_asset="USDT",
fee_source=fee_source,
is_maker=is_maker_est,
exchange_ts=exchange_ts,
slippage_bps=slippage_bps,
mark_at_submit=mark_at_submit,
)
)
return events
def _events_from_cancel(self, order: VenueOrder, response: Any, before, after, *, reason: str = "") -> List[VenueEvent]: # noqa: ANN001
raw = response if isinstance(response, dict) else {}
status = _normalize_status(_row_text(raw, "status", default="CANCELED"))
if status in {"RATE_LIMITED", "THROTTLED"}:
return [
VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=order.internal_trade_id or order.venue_client_id,
slot_id=int(order.metadata.get("slot_id", 0) or 0),
kind=KernelEventKind.RATE_LIMITED,
status=VenueEventStatus.RATE_LIMITED,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
asset=str(order.metadata.get("asset") or ""),
price=safe_float(_row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price), 0.0),
size=float(order.intended_size or 0.0),
filled_size=float(order.filled_size or 0.0),
remaining_size=float(order.remaining_size),
reason=reason or _row_text(raw, "msg", "message", default="BINGX_RATE_LIMITED"),
raw_payload=raw or {"orderId": order.venue_order_id, "clientOrderId": order.venue_client_id, "status": status or "RATE_LIMITED"},
metadata={**dict(order.metadata), "retry_after_ms": _rate_limit_retry_after_ms(raw)},
)
]
event_status = _venue_event_status_from_row(status)
kind = KernelEventKind.CANCEL_ACK if event_status == VenueEventStatus.CANCELED else KernelEventKind.CANCEL_REJECT
if event_status == VenueEventStatus.CANCELED_REJECTED:
kind = KernelEventKind.CANCEL_REJECT
return [
VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=order.internal_trade_id or order.venue_client_id,
slot_id=int(order.metadata.get("slot_id", 0) or 0),
kind=kind,
status=event_status,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
asset=str(order.metadata.get("asset") or ""),
price=safe_float(_row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price), 0.0),
size=float(order.intended_size or 0.0),
filled_size=float(order.filled_size or 0.0),
remaining_size=float(order.remaining_size),
reason=reason or _row_text(raw, "msg", "message", default="BINGX_CANCEL_ACK" if kind == KernelEventKind.CANCEL_ACK else "BINGX_CANCEL_REJECT"),
raw_payload=raw or {"orderId": order.venue_order_id, "clientOrderId": order.venue_client_id, "status": status or event_status.value},
metadata=dict(order.metadata),
)
]
def _events_from_snapshot(self, snapshot: Any) -> List[VenueEvent]: # noqa: ANN001
events: list[VenueEvent] = []
seen: set[tuple[str, str, str]] = set()
for row in getattr(snapshot, "open_orders", []) or []:
if not isinstance(row, dict):
continue
event = self._event_from_row(row, slot_id=0)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
for row in getattr(snapshot, "all_orders", []) or []:
if not isinstance(row, dict):
continue
event = self._event_from_row(row, slot_id=0)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
for row in getattr(snapshot, "all_fills", []) or []:
if not isinstance(row, dict):
continue
event = self._fill_event_from_row(row)
key = (event.venue_client_id, event.venue_order_id, event.kind.value)
if key not in seen:
seen.add(key)
events.append(event)
return events
def _event_from_row(self, row: dict[str, Any], *, slot_id: int) -> VenueEvent:
status = _normalize_status(_row_text(row, "status", "X", default="NEW"))
event_status = _venue_event_status_from_row(status)
kind = {
VenueEventStatus.ACKED: KernelEventKind.ORDER_ACK,
VenueEventStatus.PARTIALLY_FILLED: KernelEventKind.PARTIAL_FILL,
VenueEventStatus.FILLED: KernelEventKind.FULL_FILL,
VenueEventStatus.CANCELED: KernelEventKind.CANCEL_ACK,
VenueEventStatus.REJECTED: KernelEventKind.ORDER_REJECT,
VenueEventStatus.CANCELED_REJECTED: KernelEventKind.CANCEL_REJECT,
VenueEventStatus.RATE_LIMITED: KernelEventKind.RATE_LIMITED,
}.get(event_status, KernelEventKind.ORDER_ACK)
size = _row_float(row, "origQty", "quantity", "q", "positionAmt", default=0.0)
filled = _row_float(row, "executedQty", "cumFilledQty", "filledQty", "z", "lastFilledQty", default=0.0)
if filled <= 0.0 and kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}:
filled = size
return VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
slot_id=slot_id,
kind=kind,
status=event_status,
venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
side=_trade_side_from_row(row),
asset=_row_text(row, "symbol", default=""),
price=safe_float(_row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0), 0.0),
size=abs(float(size or 0.0)),
filled_size=abs(float(filled or 0.0)),
remaining_size=max(0.0, abs(float(size or 0.0)) - abs(float(filled or 0.0))),
reason=_row_text(row, "msg", "message", default=""),
raw_payload=dict(row),
metadata={"source": "bingx"},
)
def _fill_event_from_row(self, row: dict[str, Any]) -> VenueEvent:
status = _normalize_status(_row_text(row, "status", "X", default="FILLED"))
event_status = _venue_event_status_from_row(status)
kind = KernelEventKind.FULL_FILL if event_status == VenueEventStatus.FILLED else KernelEventKind.PARTIAL_FILL
return VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=_event_id(self._event_seq),
trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
slot_id=0,
kind=kind,
status=event_status,
venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
side=_trade_side_from_row(row),
asset=_row_text(row, "symbol", default=""),
price=safe_float(_row_float(row, "lastFillPrice", "L", "price", "ap", default=0.0), 0.0),
size=abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)),
filled_size=abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0)),
remaining_size=max(0.0, abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)) - abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0))),
reason=_row_text(row, "msg", "message", default=""),
raw_payload=dict(row),
metadata={"source": "bingx"},
)
@staticmethod
def _filled_size_from_snapshots(before: Any, after: Any, asset: str) -> float: # noqa: ANN001
def _lookup(snapshot: Any) -> float:
positions = getattr(snapshot, "open_positions", {}) or {}
for key, row in positions.items():
symbol = _row_text(row, "symbol", default=str(key))
if symbol.replace("-", "").replace("_", "").upper() == asset.replace("-", "").replace("_", "").upper():
return _position_qty(row)
return 0.0
before_qty = _lookup(before)
after_qty = _lookup(after)
diff = abs(before_qty - after_qty)
return diff