2026-06-02 14:10:49 +02:00
|
|
|
|
"""DITAv2 BingX venue adapter.
|
|
|
|
|
|
|
|
|
|
|
|
This is a thin normalization layer over the existing direct BingX execution
|
|
|
|
|
|
surface. It converts BingX REST/account/order payloads into DITAv2
|
|
|
|
|
|
``VenueEvent`` / ``VenueOrder`` objects without reimplementing exchange logic.
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
import concurrent.futures
|
|
|
|
|
|
import inspect
|
|
|
|
|
|
import itertools
|
|
|
|
|
|
import re
|
|
|
|
|
|
import threading
|
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
|
from typing import Any, Iterable, List, Optional
|
|
|
|
|
|
|
|
|
|
|
|
from prod.clean_arch.dita import DecisionAction as LegacyDecisionAction
|
|
|
|
|
|
from prod.clean_arch.dita import Intent as LegacyIntent
|
|
|
|
|
|
from prod.clean_arch.dita import TradeSide as LegacyTradeSide
|
|
|
|
|
|
|
|
|
|
|
|
from prod.bingx.http import BingxHttpError
|
|
|
|
|
|
|
|
|
|
|
|
from .contracts import (
|
|
|
|
|
|
KernelCommandType,
|
|
|
|
|
|
KernelEventKind,
|
|
|
|
|
|
KernelIntent,
|
|
|
|
|
|
TradeSide,
|
|
|
|
|
|
VenueEvent,
|
|
|
|
|
|
VenueEventStatus,
|
|
|
|
|
|
VenueOrder,
|
|
|
|
|
|
VenueOrderStatus,
|
|
|
|
|
|
)
|
|
|
|
|
|
from .utils import json_safe
|
|
|
|
|
|
from .utils import safe_float
|
|
|
|
|
|
from .venue import VenueAdapter
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _row_text(row: dict[str, Any], *keys: str, default: str = "") -> str:
|
|
|
|
|
|
for key in keys:
|
|
|
|
|
|
value = row.get(key)
|
|
|
|
|
|
if value is None:
|
|
|
|
|
|
continue
|
|
|
|
|
|
text = str(value)
|
|
|
|
|
|
if text:
|
|
|
|
|
|
return text
|
|
|
|
|
|
return default
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _row_float(row: dict[str, Any], *keys: str, default: float = 0.0) -> float:
|
|
|
|
|
|
for key in keys:
|
|
|
|
|
|
try:
|
|
|
|
|
|
value = float(row.get(key) or 0.0)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
continue
|
|
|
|
|
|
if value == value and value not in (float("inf"), float("-inf")) and value != 0.0:
|
|
|
|
|
|
return value
|
|
|
|
|
|
return default
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _normalize_status(status: str) -> str:
|
|
|
|
|
|
return str(status or "").strip().upper()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _trade_side_from_row(row: dict[str, Any], *, fallback: TradeSide = TradeSide.FLAT) -> TradeSide:
    """Derive the trade side of a payload row.

    An explicit ``side`` / ``positionSide`` label wins (``BUY``/``LONG`` or
    ``SELL``/``SHORT``, case-insensitive). Otherwise the sign of the first
    non-zero quantity field decides. ``fallback`` is returned when neither
    source gives an answer.
    """
    label = _row_text(row, "side", "positionSide", default="").upper()
    if label in {"BUY", "LONG"}:
        return TradeSide.LONG
    if label in {"SELL", "SHORT"}:
        return TradeSide.SHORT

    # No usable label: fall back to the sign of the quantity.
    quantity = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
    if quantity == 0:
        return fallback
    return TradeSide.SHORT if quantity < 0 else TradeSide.LONG
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-06-02 18:04:33 +02:00
|
|
|
|
def _http_error_status(exc_msg: str) -> str:
|
|
|
|
|
|
"""Map a BingxHttpError message to a venue status string.
|
|
|
|
|
|
|
|
|
|
|
|
HTTP 429 and 5xx are transient → RATE_LIMITED so the slot can retry.
|
|
|
|
|
|
4xx (non-429) are genuine client-side rejections → REJECTED.
|
|
|
|
|
|
Transport / DNS / circuit-breaker errors (no HTTP prefix) are transient.
|
|
|
|
|
|
"""
|
|
|
|
|
|
m = exc_msg.upper()
|
|
|
|
|
|
if "HTTP 429" in m:
|
|
|
|
|
|
return "RATE_LIMITED"
|
|
|
|
|
|
for code in ("500", "501", "502", "503", "504"):
|
|
|
|
|
|
if f"HTTP {code}" in m:
|
|
|
|
|
|
return "RATE_LIMITED"
|
|
|
|
|
|
if "HTTP 4" in m:
|
|
|
|
|
|
return "REJECTED"
|
|
|
|
|
|
return "RATE_LIMITED"
|
|
|
|
|
|
|
|
|
|
|
|
|
2026-06-02 14:10:49 +02:00
|
|
|
|
def _venue_event_status_from_row(status: str) -> VenueEventStatus:
    """Translate a raw venue status string into a ``VenueEventStatus``.

    The input is normalised (trimmed, upper-cased) and matched against the
    alias groups below in order. Unrecognised statuses map to ``ACKED``.
    """
    label = _normalize_status(status)
    alias_groups = (
        (("NEW", "ACKED", "PENDING", "CREATED"), VenueEventStatus.ACKED),
        (("RATE_LIMITED", "THROTTLED"), VenueEventStatus.RATE_LIMITED),
        (("PARTIALLY_FILLED", "PARTIAL_FILL"), VenueEventStatus.PARTIALLY_FILLED),
        (("FILLED", "FULL_FILL"), VenueEventStatus.FILLED),
        (("CANCELED", "CANCELLED", "EXPIRED"), VenueEventStatus.CANCELED),
        (("REJECTED", "FAILED"), VenueEventStatus.REJECTED),
        (("CANCEL_REJECTED", "CANCEL_REJECT"), VenueEventStatus.CANCELED_REJECTED),
    )
    for aliases, mapped in alias_groups:
        if label in aliases:
            return mapped
    return VenueEventStatus.ACKED
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _venue_order_status_from_row(status: str) -> VenueOrderStatus:
    """Translate a raw venue status string into a ``VenueOrderStatus``.

    The input is normalised (trimmed, upper-cased) before matching.
    Rate-limited / throttled statuses and anything unrecognised are
    reported as ``NEW``.
    """
    label = _normalize_status(status)
    alias_groups = (
        (("NEW", "ACKED", "PENDING", "CREATED"), VenueOrderStatus.NEW),
        (("RATE_LIMITED", "THROTTLED"), VenueOrderStatus.NEW),
        (("PARTIALLY_FILLED", "PARTIAL_FILL"), VenueOrderStatus.PARTIALLY_FILLED),
        (("FILLED", "FULL_FILL"), VenueOrderStatus.FILLED),
        (("CANCELED", "CANCELLED", "EXPIRED"), VenueOrderStatus.CANCELED),
        (("REJECTED", "FAILED"), VenueOrderStatus.REJECTED),
    )
    for aliases, mapped in alias_groups:
        if label in aliases:
            return mapped
    return VenueOrderStatus.NEW
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _position_qty(row: dict[str, Any]) -> float:
    """Return the absolute quantity carried by ``row``.

    Position-size fields are preferred. When none holds a usable non-zero
    value the executed / filled quantity fields are used instead. The
    result is ``0.0`` when neither group yields a value.
    """
    size = _row_float(row, "positionAmt", "positionQty", "positionSize", "quantity", "pa", default=0.0)
    if size == 0.0:
        size = _row_float(row, "executedQty", "filledQty", "z", default=0.0)
    return abs(size)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _position_price(row: dict[str, Any]) -> float:
    """Return the first usable price field of ``row`` (``0.0`` if none)."""
    price_keys = (
        "entryPrice",
        "avgPrice",
        "avgEntryPrice",
        "ep",
        "ap",
        "price",
        "lastFillPrice",
        "tradePrice",
    )
    return _row_float(row, *price_keys)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _mapping_for_snapshot(rows: Iterable[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index order rows by client order id and by venue order id.

    Each row is stored (as a shallow copy) under its client order id, or
    under its venue order id when it has no client id. A later row with the
    same primary key replaces an earlier one. The venue order id is also
    registered as a secondary key, but only if that key is not already
    taken. Rows carrying neither id are ignored.
    """
    index: dict[str, dict[str, Any]] = {}
    for entry in rows:
        client_key = _row_text(entry, "clientOrderID", "clientOrderId", default="")
        venue_key = _row_text(entry, "orderId", "orderID", "id", default="")
        primary = client_key or venue_key
        if not primary:
            continue
        index[primary] = dict(entry)
        if venue_key:
            # Secondary key: never overwrite an existing entry.
            index.setdefault(venue_key, dict(entry))
    return index
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _venue_order_from_row(
    row: dict[str, Any],
    *,
    internal_trade_id: str = "",
    fallback_side: TradeSide = TradeSide.FLAT,
) -> VenueOrder:
    """Build a ``VenueOrder`` from a raw order/position row.

    ``internal_trade_id`` falls back to the client order id, then to the
    venue order id. The intended size comes from the order-quantity fields;
    when those are absent or non-positive it falls back to the position
    quantity. The original row is preserved under ``metadata["raw"]``.
    """
    client_ref = _row_text(row, "clientOrderID", "clientOrderId", default="")
    venue_ref = _row_text(row, "orderId", "orderID", "id", default="")

    requested = _row_float(row, "origQty", "quantity", "q", "positionAmt", "positionQty", default=0.0)
    if requested <= 0:
        requested = _position_qty(row)

    executed = _row_float(row, "executedQty", "filledQty", "z", "lastFilledQty", default=0.0)
    raw_status = _row_text(row, "status", "X", default="NEW")

    return VenueOrder(
        internal_trade_id=internal_trade_id or client_ref or venue_ref,
        venue_order_id=venue_ref,
        venue_client_id=client_ref,
        side=_trade_side_from_row(row, fallback=fallback_side),
        intended_size=abs(float(requested or 0.0)),
        filled_size=abs(executed),
        average_fill_price=_position_price(row),
        status=_venue_order_status_from_row(raw_status),
        metadata={"raw": dict(row)},
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _event_id(seq: itertools.count) -> str:
|
|
|
|
|
|
return f"EV-{next(seq):08d}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _rate_limit_retry_after_ms(row: dict[str, Any]) -> int:
|
|
|
|
|
|
raw_retry = row.get("retryAfter") or row.get("retry_after_ms") or row.get("retryAfterMs")
|
|
|
|
|
|
if raw_retry is None:
|
|
|
|
|
|
msg = _row_text(row, "msg", "message", default="")
|
|
|
|
|
|
match = re.search(r"unblocked after (\d+)", msg)
|
|
|
|
|
|
if match:
|
|
|
|
|
|
try:
|
|
|
|
|
|
ts = int(match.group(1))
|
|
|
|
|
|
now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
|
|
|
|
|
|
return max(0, ts - now_ms)
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
return 0
|
|
|
|
|
|
return 0
|
|
|
|
|
|
try:
|
|
|
|
|
|
return max(0, int(float(raw_retry)))
|
|
|
|
|
|
except Exception:
|
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BingxVenueAdapter(VenueAdapter):
    """Normalizes BingX execution responses into DITAv2 venue events.

    The adapter wraps a backend object (by default a
    ``BingxDirectExecutionAdapter`` built from ``config``) and converts its
    receipts, cancel responses and state snapshots into ``VenueEvent`` /
    ``VenueOrder`` objects. Backend methods may be synchronous or may return
    awaitables; both forms are accepted.
    """

    # Shared thread-pool executor reused across all adapter instances and
    # all calls.  Threads are created once and recycled, eliminating the
    # per-call creation/destruction overhead of the old pattern.
    _EXECUTOR: concurrent.futures.ThreadPoolExecutor | None = None
    _EXECUTOR_LOCK: threading.Lock = threading.Lock()

    # Maximum seconds to wait for a single backend HTTP call.  BingX REST
    # round-trips are ~0.5-2 s in normal conditions; 30 s is generous enough
    # to survive transient slowdowns without hanging the process forever (O5).
    _BACKEND_TIMEOUT_S: float = 30.0

    @classmethod
    def _get_executor(cls) -> concurrent.futures.ThreadPoolExecutor:
        """Return the shared executor, creating it on first use.

        The ``None`` check is repeated under ``_EXECUTOR_LOCK`` so that two
        threads racing on first use do not both create a pool.
        """
        if cls._EXECUTOR is None:
            with cls._EXECUTOR_LOCK:
                if cls._EXECUTOR is None:
                    # max_workers=3 so three concurrent HTTP calls (balance,
                    # positions, openOrders) can proceed simultaneously without
                    # serialising on the pool.
                    cls._EXECUTOR = concurrent.futures.ThreadPoolExecutor(
                        max_workers=3,
                        thread_name_prefix="bingx_adapter",
                    )
        return cls._EXECUTOR

    def __init__(self, backend: Any | None = None, *, config: Any | None = None) -> None:
        """Create the adapter around ``backend``, or build one from ``config``.

        Raises:
            ValueError: if neither ``backend`` nor ``config`` is supplied.
        """
        if backend is None:
            if config is None:
                raise ValueError("BingxVenueAdapter requires a backend or config")
            # Imported lazily so the direct adapter is only loaded when needed.
            from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter

            backend = BingxDirectExecutionAdapter(config)
        self.backend = backend
        self._event_seq = itertools.count(1)
        # Snapshot cache guarded by a lock: it may be read by one thread while
        # _backend_snapshot() stores a fresh value from another.
        self._snap_lock = threading.Lock()
        self._last_snapshot = None
        self._snapshot_ready = threading.Event()
        self._snapshot_ready.set()  # initially ready (no pending write)

    def _run(self, result: Any) -> Any:
        """Resolve ``result`` synchronously.

        Non-awaitable values are returned unchanged. An awaitable is driven
        with ``asyncio.run`` directly when the calling thread has no running
        event loop; otherwise it is run on the shared executor (in a new
        event loop on a worker thread) and awaited for at most
        ``_BACKEND_TIMEOUT_S`` seconds.

        NOTE(review): objects bound to the caller's event loop may not work
        from the worker thread's loop; the async entry points (``reconcile``,
        ``submit_async``) exist to avoid this path.

        Raises:
            TimeoutError: if the worker does not finish within the timeout.
        """
        if not inspect.isawaitable(result):
            return result
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return asyncio.run(result)

        # Inside a running event loop: submit to the shared singleton
        # executor so threads are reused across calls.
        future = self._get_executor().submit(asyncio.run, result)
        try:
            return future.result(timeout=self._BACKEND_TIMEOUT_S)
        except concurrent.futures.TimeoutError as exc:
            # concurrent.futures.TimeoutError is only an alias of the builtin
            # TimeoutError from Python 3.11 on, so catch the futures class.
            if future.cancel():
                # The call was still queued: make sure it can never start
                # later, and close the never-awaited coroutine.
                closer = getattr(result, "close", None)
                if callable(closer):
                    closer()
            raise TimeoutError(
                f"BingX backend call exceeded {self._BACKEND_TIMEOUT_S}s timeout"
            ) from exc

    def close(self) -> None:
        """V2: release the class-level thread-pool and any backend HTTP session.

        The shared executor is detached and shut down without waiting; it is
        re-created lazily if another adapter instance still needs it. The
        backend's ``close`` (if any) is then invoked on a best-effort basis:
        failures are logged, never raised. An awaitable returned by an async
        ``close`` is run to completion when no event loop is running, or
        scheduled on the running loop otherwise.
        """
        cls = self.__class__
        executor = cls._EXECUTOR
        if executor is not None:
            with cls._EXECUTOR_LOCK:
                owns_executor = cls._EXECUTOR is executor
                if owns_executor:
                    cls._EXECUTOR = None
            if owns_executor:
                executor.shutdown(wait=False)

        backend_close = getattr(self.backend, "close", None)
        if backend_close is None:
            return
        try:
            outcome = backend_close()
            if inspect.isawaitable(outcome):
                try:
                    asyncio.get_running_loop()
                except RuntimeError:
                    asyncio.run(outcome)
                else:
                    # Keep a reference so the task is not garbage-collected
                    # before it finishes.
                    self._close_task = asyncio.ensure_future(outcome)
        except Exception as exc:
            import logging as _log

            _log.getLogger(__name__).warning("close: backend close failed: %s", exc)

    def _call_backend(self, method_name: str, *args: Any, **kwargs: Any) -> Any:
        """Invoke ``backend.<method_name>`` and resolve its result via ``_run``.

        Raises:
            AttributeError: if the backend has no such attribute.
        """
        method = getattr(self.backend, method_name, None)
        if method is None:
            raise AttributeError(f"backend has no method {method_name}")
        return self._run(method(*args, **kwargs))

    def _backend_snapshot(self, *, include_history: bool = False, timeout_ms: float = 5000.0) -> Any:
        """Fetch a fresh snapshot from the backend and cache it.

        - The caller first waits (up to ``timeout_ms``) on ``_snapshot_ready``
          for any in-flight fetch to finish. On timeout it returns the
          last cached snapshot instead, which is ``None`` if no fetch has
          ever succeeded.
        - While a fetch is in flight the event is cleared, so concurrent
          callers block in that wait. Wait-then-clear is not atomic: two
          callers that pass the wait at the same instant will both fetch.
        - The cache assignment happens under ``_snap_lock``.
        - The event is re-set in a ``finally`` block so that *any* failure,
          including ``BaseException`` subclasses such as cancellation, cannot
          leave later callers permanently stuck on the stale path.
        """
        if not self._snapshot_ready.wait(timeout=timeout_ms / 1000.0):
            # Timeout waiting for a previous snapshot fetch: return the
            # last-known snapshot rather than blocking the caller.
            with self._snap_lock:
                return self._last_snapshot

        self._snapshot_ready.clear()
        try:
            snapshot = self._call_backend("refresh_state", None, include_history=include_history)
            with self._snap_lock:
                self._last_snapshot = snapshot
        finally:
            self._snapshot_ready.set()
        return snapshot

    @staticmethod
    def _legacy_intent(intent: KernelIntent) -> LegacyIntent:
        """Convert a ``KernelIntent`` into the legacy ``Intent`` the backend takes.

        Any action other than ``ENTER`` becomes ``EXIT`` and any side other
        than ``SHORT`` becomes ``LONG``. Order type and limit price travel in
        the metadata under ``_order_type`` / ``_limit_price``.
        """
        action = LegacyDecisionAction.ENTER if intent.action == KernelCommandType.ENTER else LegacyDecisionAction.EXIT
        side = LegacyTradeSide.SHORT if intent.side == TradeSide.SHORT else LegacyTradeSide.LONG
        metadata = dict(intent.metadata)
        metadata["_order_type"] = getattr(intent, "order_type", "MARKET")
        metadata["_limit_price"] = float(getattr(intent, "limit_price", 0.0) or 0.0)
        return LegacyIntent(
            timestamp=intent.timestamp,
            trade_id=intent.trade_id,
            decision_id=intent.intent_id,
            asset=intent.asset,
            action=action,
            side=side,
            reason=intent.reason,
            target_size=float(intent.target_size),
            leverage=float(intent.leverage),
            reference_price=float(intent.reference_price),
            confidence=1.0,
            bars_held=0,
            exit_leg_ratios=tuple(intent.exit_leg_ratios or (1.0,)),
            metadata=metadata,
        )

    def connect(self) -> bool:
        """Connect the backend (if it has ``connect``) and prime the snapshot cache."""
        connector = getattr(self.backend, "connect", None)
        if connector is not None:
            self._run(connector())
        self._backend_snapshot(include_history=True)
        return True

    def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
        """Cancel ``order`` and return the resulting venue events.

        The backend's ``cancel_order`` is preferred, then ``cancel``. If it
        has neither, the request is sent through the backend's raw signed
        client.

        Raises:
            RuntimeError: if no cancel path is available (no raw client, or
                the instrument symbol for the order cannot be resolved).
        """
        snapshot_before = self._backend_snapshot(include_history=True)
        if hasattr(self.backend, "cancel_order"):
            response = self._call_backend("cancel_order", order, reason=reason)
        elif hasattr(self.backend, "cancel"):
            response = self._call_backend("cancel", order, reason=reason)
        else:
            response = self._cancel_via_client(order)
        snapshot_after = self._backend_snapshot(include_history=True)
        return self._events_from_cancel(order, response, snapshot_before, snapshot_after, reason=reason)

    def _cancel_symbol_for(self, order: VenueOrder) -> str:
        """Resolve the venue symbol for ``order``, or ``""`` when unknown."""
        if not hasattr(self.backend, "_instrument_venue_symbol"):
            return ""
        asset = str(order.metadata.get("asset") or "")
        if not asset:
            # No asset on the order: ask the kernel (if attached) for the
            # asset of the order's slot.
            slot_id = int(order.metadata.get("slot_id", 0) or 0)
            kernel = getattr(self, "_kernel_ref", None)
            if kernel is not None:
                try:
                    asset = kernel.slot(slot_id).asset
                except Exception:
                    # Best effort: an unresolved asset surfaces as the
                    # RuntimeError raised by the caller.
                    asset = ""
        return str(self.backend._instrument_venue_symbol(asset)) if asset else ""

    def _cancel_via_client(self, order: VenueOrder) -> Any:
        """Send the cancel through the backend's raw signed REST client."""
        client = getattr(self.backend, "_client", None)
        instrument_symbol = self._cancel_symbol_for(order)
        if client is None or not instrument_symbol:
            raise RuntimeError("backend does not expose a cancel surface")
        params = {"symbol": instrument_symbol}
        if order.venue_order_id:
            params["orderId"] = order.venue_order_id
        else:
            params["clientOrderId"] = order.venue_client_id
        try:
            return self._run(client.signed_delete("/openApi/swap/v2/trade/order", params))
        except BingxHttpError as exc:
            # W10: map HTTP error class to status - 429/5xx are transient, 4xx are real rejections
            return {"status": _http_error_status(str(exc)), "msg": str(exc), "orderId": order.venue_order_id, "clientOrderId": order.venue_client_id}

    def open_orders(self) -> List[VenueOrder]:
        """Return the currently open orders as ``VenueOrder`` objects.

        Returns an empty list when no snapshot is available (the stale
        fallback of ``_backend_snapshot`` can be ``None``).
        """
        snapshot = self._backend_snapshot(include_history=False)
        rows = getattr(snapshot, "open_orders", None) or []
        return [_venue_order_from_row(row) for row in rows]

    def open_positions(self) -> List[dict[str, Any]]:
        """Return shallow copies of the open-position rows of a fresh snapshot.

        Returns an empty list when no snapshot is available.
        """
        snapshot = self._backend_snapshot(include_history=False)
        positions = getattr(snapshot, "open_positions", None) or {}
        return [dict(row) for row in positions.values()]

    async def reconcile(self) -> List[VenueEvent]:  # type: ignore[override]
        """Fetch open-order state from BingX and return any pending VenueEvents.

        WHY ASYNC: the old sync version called _backend_snapshot() ->
        _call_backend() -> _run() -> pool.submit(asyncio.run, coro), which
        spawned a *new* event loop in a thread-pool thread. The
        BingxHttpClient (aiohttp session) is bound to the *main* event loop;
        using it from a different loop silently deadlocks, so every reconcile
        call blocked the main event loop for the full 30s timeout.

        FIX: the backend's refresh_state() result is awaited here, in the
        *caller's* event loop where the session lives. pump_venue_events()
        already awaits awaitable results, so no caller changes are required.
        A synchronous backend result is used as-is.

        include_history=False: all_orders/all_fills require a symbol
        (symbol=None skips them anyway), so include_history=True was fetching
        nothing extra.

        Any failure is logged as a warning and reported as "no events".
        """
        try:
            snapshot = self.backend.refresh_state(None, include_history=False)
            if inspect.isawaitable(snapshot):
                snapshot = await snapshot
        except Exception as exc:
            import logging as _log

            _log.getLogger(__name__).warning("reconcile: refresh_state failed: %s", exc)
            return []
        return self._events_from_snapshot(snapshot)

    def submit(self, intent: KernelIntent) -> List[VenueEvent]:
        """Submit ``intent`` synchronously, bracketed by before/after snapshots."""
        snapshot_before = self._backend_snapshot(include_history=True)
        receipt = self._call_backend("submit_intent", self._legacy_intent(intent))
        snapshot_after = self._backend_snapshot(include_history=True)
        return self._events_from_submit(intent, receipt, snapshot_before, snapshot_after)

    async def submit_async(self, intent: KernelIntent) -> List[VenueEvent]:
        """Async submit: runs in the caller's event loop, no thread-pool deadlock.

        The sync submit() calls _backend_snapshot() twice plus submit_intent
        via _run() -> asyncio.run() in a thread-pool, i.e. on a new event
        loop, where the main-loop-bound aiohttp session deadlocks and every
        ENTER/EXIT hits the 30s timeout.

        This version awaits the backend directly (a synchronous backend
        result is used as-is). The before/after snapshots are omitted: fill
        size comes from the receipt's executedQty field, and the WS account
        stream delivers FULL_FILL events independently. Passing None for the
        snapshots makes _filled_size_from_snapshots return 0.0 (a safe
        fallback; the receipt fields take precedence).
        """
        receipt = self.backend.submit_intent(self._legacy_intent(intent))
        if inspect.isawaitable(receipt):
            receipt = await receipt
        return self._events_from_submit(intent, receipt, None, None)

    def _events_from_submit(self, intent: KernelIntent, receipt: Any, before, after) -> List[VenueEvent]:  # noqa: ANN001
        """Translate a submit receipt into venue events.

        Returns a single RATE_LIMITED event, a single ORDER_REJECT event, or
        an ORDER_ACK optionally followed by one PARTIAL_FILL / FULL_FILL
        event. A fill is emitted when the status says so or when the
        before/after snapshots show a position-size change for the asset.
        """
        ack_row = dict(getattr(receipt, "raw_ack", {}) or {})
        status = _normalize_status(getattr(receipt, "status", "") or _row_text(ack_row, "status", default="NEW"))
        order_id = _row_text(ack_row, "orderId", "orderID", default=str(getattr(receipt, "order_id", "") or ""))
        client_order_id = _row_text(ack_row, "clientOrderID", "clientOrderId", default=str(getattr(receipt, "client_order_id", "") or intent.intent_id))
        target_size = float(intent.target_size or 0.0)

        if status in {"RATE_LIMITED", "THROTTLED"}:
            return [
                VenueEvent(
                    timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
                    event_id=_event_id(self._event_seq),
                    trade_id=intent.trade_id,
                    slot_id=intent.slot_id,
                    kind=KernelEventKind.RATE_LIMITED,
                    status=VenueEventStatus.RATE_LIMITED,
                    venue_order_id=order_id,
                    venue_client_id=client_order_id,
                    side=intent.side,
                    asset=intent.asset,
                    price=safe_float(getattr(receipt, "price", 0.0), 0.0),
                    size=target_size,
                    filled_size=0.0,
                    remaining_size=target_size,
                    reason=_row_text(ack_row, "msg", "message", default="BINGX_RATE_LIMITED"),
                    raw_payload=ack_row or json_safe(receipt),
                    metadata={"intent_id": intent.intent_id, "action": intent.action.value, "retry_after_ms": _rate_limit_retry_after_ms(ack_row)},
                )
            ]

        base_event = VenueEvent(
            timestamp=getattr(receipt, "timestamp", datetime.now(timezone.utc)),
            event_id=_event_id(self._event_seq),
            trade_id=intent.trade_id,
            slot_id=intent.slot_id,
            kind=KernelEventKind.ORDER_ACK,
            status=VenueEventStatus.ACKED,
            venue_order_id=order_id,
            venue_client_id=client_order_id,
            side=intent.side,
            asset=intent.asset,
            price=safe_float(getattr(receipt, "price", 0.0), 0.0),
            size=target_size,
            filled_size=0.0,
            remaining_size=target_size,
            reason="",
            raw_payload=ack_row or json_safe(receipt),
            metadata={"intent_id": intent.intent_id, "action": intent.action.value},
        )

        if status in {"REJECTED", "FAILED"}:
            # The reject event copies the ack's fields and takes a fresh
            # event id (the ack's id is consumed but never emitted).
            return [
                VenueEvent(
                    **{**base_event.__dict__, "event_id": _event_id(self._event_seq), "kind": KernelEventKind.ORDER_REJECT, "status": VenueEventStatus.REJECTED, "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED")},
                )
            ]

        events = [base_event]
        fill_status = _venue_event_status_from_row(status)
        filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0)
        snapshot_fill_size = self._filled_size_from_snapshots(before, after, intent.asset)
        if filled_size <= 0:
            filled_size = snapshot_fill_size

        emit_fill = fill_status in {VenueEventStatus.PARTIALLY_FILLED, VenueEventStatus.FILLED} or snapshot_fill_size > 0.0
        if not emit_fill:
            return events

        if filled_size <= 0:
            # NOTE(review): a fill status with no reported quantity is
            # assumed to have filled the whole target size - confirm.
            filled_size = target_size
        remaining_size = max(0.0, target_size - float(filled_size))
        is_full = fill_status == VenueEventStatus.FILLED or remaining_size <= 1e-12
        fill_kind = KernelEventKind.FULL_FILL if is_full else KernelEventKind.PARTIAL_FILL
        events.append(
            VenueEvent(
                timestamp=base_event.timestamp,
                event_id=_event_id(self._event_seq),
                trade_id=intent.trade_id,
                slot_id=intent.slot_id,
                kind=fill_kind,
                status=VenueEventStatus.FILLED if fill_kind == KernelEventKind.FULL_FILL else VenueEventStatus.PARTIALLY_FILLED,
                venue_order_id=order_id,
                venue_client_id=client_order_id,
                side=intent.side,
                asset=intent.asset,
                price=safe_float(_row_float(ack_row, "avgPrice", "ap", "price", "lastFillPrice", default=getattr(receipt, "price", 0.0)), 0.0),
                size=target_size,
                filled_size=float(filled_size),
                remaining_size=float(remaining_size),
                reason="",
                raw_payload=ack_row or json_safe(receipt),
                metadata={"intent_id": intent.intent_id, "action": intent.action.value},
            )
        )
        return events

    def _events_from_cancel(self, order: VenueOrder, response: Any, before, after, *, reason: str = "") -> List[VenueEvent]:  # noqa: ANN001
        """Translate a cancel response into exactly one venue event.

        A non-dict response is treated as an empty payload, which reads as
        status ``CANCELED``. Rate-limited responses yield a RATE_LIMITED
        event carrying ``retry_after_ms`` in its metadata; a CANCELED status
        yields CANCEL_ACK; every other status yields CANCEL_REJECT. The
        ``before`` / ``after`` snapshots are accepted but not used.
        """
        raw = response if isinstance(response, dict) else {}
        status = _normalize_status(_row_text(raw, "status", default="CANCELED"))
        metadata = dict(order.metadata)

        if status in {"RATE_LIMITED", "THROTTLED"}:
            kind = KernelEventKind.RATE_LIMITED
            event_status = VenueEventStatus.RATE_LIMITED
            default_reason = "BINGX_RATE_LIMITED"
            fallback_status = status or "RATE_LIMITED"
            metadata["retry_after_ms"] = _rate_limit_retry_after_ms(raw)
        else:
            event_status = _venue_event_status_from_row(status)
            if event_status == VenueEventStatus.CANCELED:
                kind = KernelEventKind.CANCEL_ACK
                default_reason = "BINGX_CANCEL_ACK"
            else:
                kind = KernelEventKind.CANCEL_REJECT
                default_reason = "BINGX_CANCEL_REJECT"
            fallback_status = status or event_status.value

        return [
            VenueEvent(
                timestamp=datetime.now(timezone.utc),
                event_id=_event_id(self._event_seq),
                trade_id=order.internal_trade_id or order.venue_client_id,
                slot_id=int(order.metadata.get("slot_id", 0) or 0),
                kind=kind,
                status=event_status,
                venue_order_id=order.venue_order_id,
                venue_client_id=order.venue_client_id,
                side=order.side,
                asset=str(order.metadata.get("asset") or ""),
                price=safe_float(_row_float(raw, "avgPrice", "ap", "price", "lastFillPrice", default=order.average_fill_price), 0.0),
                size=float(order.intended_size or 0.0),
                filled_size=float(order.filled_size or 0.0),
                remaining_size=float(order.remaining_size),
                reason=reason or _row_text(raw, "msg", "message", default=default_reason),
                raw_payload=raw or {"orderId": order.venue_order_id, "clientOrderId": order.venue_client_id, "status": fallback_status},
                metadata=metadata,
            )
        ]

    def _events_from_snapshot(self, snapshot: Any) -> List[VenueEvent]:  # noqa: ANN001
        """Build de-duplicated events from a backend snapshot.

        Rows are taken from ``open_orders``, then ``all_orders``, then
        ``all_fills`` (missing or empty attributes are skipped, as are
        non-dict rows). Only the first event per
        ``(client id, order id, kind)`` is kept. Every slot id is ``0``.
        """
        events: list[VenueEvent] = []
        seen: set[tuple[str, str, str]] = set()

        def _order_event(row: dict[str, Any]) -> VenueEvent:
            return self._event_from_row(row, slot_id=0)

        sources = (
            ("open_orders", _order_event),
            ("all_orders", _order_event),
            ("all_fills", self._fill_event_from_row),
        )
        for attribute, build in sources:
            for row in getattr(snapshot, attribute, []) or []:
                if not isinstance(row, dict):
                    continue
                event = build(row)
                key = (event.venue_client_id, event.venue_order_id, event.kind.value)
                if key not in seen:
                    seen.add(key)
                    events.append(event)
        return events

    def _event_from_row(self, row: dict[str, Any], *, slot_id: int) -> VenueEvent:
        """Build a venue event from one order row.

        The event kind follows the row's status (default ``NEW``). For fill
        kinds with no reported filled quantity the order size is used as the
        filled size.
        """
        status = _normalize_status(_row_text(row, "status", "X", default="NEW"))
        event_status = _venue_event_status_from_row(status)
        kind = {
            VenueEventStatus.ACKED: KernelEventKind.ORDER_ACK,
            VenueEventStatus.PARTIALLY_FILLED: KernelEventKind.PARTIAL_FILL,
            VenueEventStatus.FILLED: KernelEventKind.FULL_FILL,
            VenueEventStatus.CANCELED: KernelEventKind.CANCEL_ACK,
            VenueEventStatus.REJECTED: KernelEventKind.ORDER_REJECT,
            VenueEventStatus.CANCELED_REJECTED: KernelEventKind.CANCEL_REJECT,
            VenueEventStatus.RATE_LIMITED: KernelEventKind.RATE_LIMITED,
        }.get(event_status, KernelEventKind.ORDER_ACK)
        size = _row_float(row, "origQty", "quantity", "q", "positionAmt", default=0.0)
        filled = _row_float(row, "executedQty", "cumFilledQty", "filledQty", "z", "lastFilledQty", default=0.0)
        if filled <= 0.0 and kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}:
            filled = size
        abs_size = abs(float(size or 0.0))
        abs_filled = abs(float(filled or 0.0))
        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id=_event_id(self._event_seq),
            trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
            slot_id=slot_id,
            kind=kind,
            status=event_status,
            venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
            venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
            side=_trade_side_from_row(row),
            asset=_row_text(row, "symbol", default=""),
            price=safe_float(_row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0), 0.0),
            size=abs_size,
            filled_size=abs_filled,
            remaining_size=max(0.0, abs_size - abs_filled),
            reason=_row_text(row, "msg", "message", default=""),
            raw_payload=dict(row),
            metadata={"source": "bingx"},
        )

    def _fill_event_from_row(self, row: dict[str, Any]) -> VenueEvent:
        """Build a fill event from one fill row.

        The kind is FULL_FILL when the row's status (default ``FILLED``)
        maps to FILLED and PARTIAL_FILL otherwise. ``size`` is read from the
        executed-quantity fields and ``filled_size`` from the
        last-filled-quantity fields.
        """
        status = _normalize_status(_row_text(row, "status", "X", default="FILLED"))
        event_status = _venue_event_status_from_row(status)
        kind = KernelEventKind.FULL_FILL if event_status == VenueEventStatus.FILLED else KernelEventKind.PARTIAL_FILL
        executed = abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0))
        last_filled = abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0))
        return VenueEvent(
            timestamp=datetime.now(timezone.utc),
            event_id=_event_id(self._event_seq),
            trade_id=_row_text(row, "tradeId", "trade_id", default=_row_text(row, "clientOrderId", "clientOrderID", default="")),
            slot_id=0,
            kind=kind,
            status=event_status,
            venue_order_id=_row_text(row, "orderId", "orderID", "id", default=""),
            venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""),
            side=_trade_side_from_row(row),
            asset=_row_text(row, "symbol", default=""),
            price=safe_float(_row_float(row, "lastFillPrice", "L", "price", "ap", default=0.0), 0.0),
            size=executed,
            filled_size=last_filled,
            remaining_size=max(0.0, executed - last_filled),
            reason=_row_text(row, "msg", "message", default=""),
            raw_payload=dict(row),
            metadata={"source": "bingx"},
        )

    @staticmethod
    def _filled_size_from_snapshots(before: Any, after: Any, asset: str) -> float:  # noqa: ANN001
        """Return the absolute change in position size for ``asset``.

        Symbols are compared case-insensitively with ``-`` and ``_`` removed.
        A snapshot without a matching position (or a ``None`` snapshot)
        counts as size ``0.0``.
        """

        def _canonical(symbol: str) -> str:
            return symbol.replace("-", "").replace("_", "").upper()

        # Normalise the wanted asset once instead of once per position row.
        wanted = _canonical(str(asset or ""))

        def _lookup(snapshot: Any) -> float:
            positions = getattr(snapshot, "open_positions", {}) or {}
            for key, row in positions.items():
                if _canonical(_row_text(row, "symbol", default=str(key))) == wanted:
                    return _position_qty(row)
            return 0.0

        return abs(_lookup(before) - _lookup(after))
|