Files
siloqy/prod/clean_arch/dita_v2/bingx_user_stream.py
Codex 8135a4ae17 PINK Phase 2 (G3): ExchangeEvent seam + BingxUserStream + mode-parity
exchange_event.py: abstract ExchangeEvent/ExchangeEventKind seam
venue.py: VenueAdapter extended with subscribe()/account_snapshot()
bingx_user_stream.py: PINK-only WS client with listenKey lifecycle,
  gzip, ping/pong, 24h rotation sentinel, reconnect backoff, gap-backfill
mock_venue.py: subscribe()/account_snapshot() for offline tests

Gate G3 mode-parity: WS and poll paths produce identical k_capital,
fees, realized PnL, reconcile status for same logical event sequence.
89/89 total offline tests pass.
2026-06-01 20:33:44 +02:00

434 lines
17 KiB
Python

"""BingX private user-stream adapter for DITAv2 PINK (spec G3).
Implements the subscribe() / account_snapshot() side of the VenueAdapter
seam. All BingX field names, listenKey semantics, gzip framing, and
ping/pong live here and nowhere else above.
Design notes
------------
* listenKey lifecycle: POST create → PUT keepalive every 1800 s → DELETE
on close. Server signals expiry via {"e":"listenKeyExpired"}.
* 24-h overlap rotation: BingX closes the socket after ~24 h. A new
listenKey + connection is established ~10 min before the limit; only
after the new stream is confirmed healthy is the old one closed.
* Reconnect backoff: doubles from reconnect_initial_ms up to
reconnect_max_ms, resets on successful receive.
* Gap-backfill: on every reconnect, account_snapshot() is called first
and its event is yielded with source="poll" so the caller knows the
gap was bridged via REST.
* All frames are normalised to ExchangeEvent before leaving this module.
"""
from __future__ import annotations
import asyncio
import gzip
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager, suppress
from typing import AsyncIterator, Optional
import aiohttp
from .exchange_event import (
ExchangeEvent,
ExchangeEventKind,
ExchangePosition,
)
log = logging.getLogger(__name__)
# BingX WS will close the connection after this many seconds (~24 h).
# We rotate 10 min early to avoid any gap.
_BINGX_MAX_CONNECTION_SECS = 86_400 - 600 # 23 h 50 min
def _safe_float(value: object, default: float = 0.0) -> float:
try:
f = float(value) # type: ignore[arg-type]
import math
return f if math.isfinite(f) else default
except (TypeError, ValueError):
return default
class BingxUserStream:
"""
PINK-only BingX private user-stream client.
Usage::
stream = BingxUserStream(http_client, ws_base_url, keepalive_secs=1800)
async for event in stream.subscribe():
await handle(event)
The `http_client` must expose:
await http_client.signed_post_raw(path, params) -> dict
await http_client.signed_put_raw(path, params, allow_empty=True)
await http_client.signed_delete_raw(path, params, allow_empty=True)
and optionally:
await http_client.signed_get(path, params) -> dict (for REST snapshot)
"""
_LISTENKEY_PATH = "/openApi/user/auth/userDataStream"
def __init__(
self,
http_client: object,
ws_base_url: str,
*,
keepalive_secs: int = 1800,
reconnect_initial_ms: int = 1_000,
reconnect_max_ms: int = 30_000,
rest_base_url: str = "",
) -> None:
self._http = http_client
self._ws_url = ws_base_url
self._keepalive_secs = keepalive_secs
self._reconnect_initial_ms = reconnect_initial_ms
self._reconnect_max_ms = reconnect_max_ms
self._rest_base_url = rest_base_url
self._closed = asyncio.Event()
self._session: Optional[aiohttp.ClientSession] = None
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def subscribe(self) -> AsyncIterator[ExchangeEvent]:
"""Yield ExchangeEvent; reconnects indefinitely until close()."""
delay_ms = self._reconnect_initial_ms
while not self._closed.is_set():
listen_key: Optional[str] = None
keepalive_task: Optional[asyncio.Task] = None
rotation_task: Optional[asyncio.Task] = None
try:
listen_key = await self._create_listen_key()
keepalive_task = asyncio.create_task(
self._keepalive_loop(listen_key), name="lk_keepalive"
)
rotation_task = asyncio.create_task(
self._rotation_sentinel(), name="lk_rotation"
)
# Gap-backfill: REST snapshot before streaming
try:
snap = await self.account_snapshot()
yield snap
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: gap-backfill REST failed: %s", exc)
async for event in self._consume(listen_key, rotation_task):
delay_ms = self._reconnect_initial_ms # reset on healthy receive
yield event
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
if self._closed.is_set():
break
log.warning(
"bingx_user_stream: disconnected (%s), retry in %.1f s",
exc,
delay_ms / 1000.0,
)
yield ExchangeEvent(
kind=ExchangeEventKind.RECONNECTED,
event_id=f"reconnect-{uuid.uuid4().hex[:8]}",
exchange_ts=int(time.time() * 1000),
source="ws",
)
await asyncio.sleep(delay_ms / 1000.0)
delay_ms = min(delay_ms * 2, self._reconnect_max_ms)
finally:
if keepalive_task is not None:
keepalive_task.cancel()
with suppress(asyncio.CancelledError):
await keepalive_task
if rotation_task is not None:
rotation_task.cancel()
with suppress(asyncio.CancelledError):
await rotation_task
if listen_key is not None:
with suppress(Exception):
await self._delete_listen_key(listen_key)
async def account_snapshot(self) -> ExchangeEvent:
"""
Fetch current balance + positions via REST and return a merged
ACCOUNT_UPDATE ExchangeEvent (source="poll"). Used for gap-backfill
on reconnect and as the poll-failover path.
"""
event_id = f"poll-{uuid.uuid4().hex[:12]}"
ts = int(time.time() * 1000)
wallet = 0.0
avail = 0.0
used = 0.0
maint = 0.0
positions: list[ExchangePosition] = []
try:
bal = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v3/user/balance", {}
)
data = bal if isinstance(bal, dict) else {}
wallet = _safe_float(data.get("balance") or data.get("totalWalletBalance") or data.get("availableMargin"))
avail = _safe_float(data.get("availableMargin") or data.get("availableBalance"))
used = _safe_float(data.get("usedMargin") or data.get("totalInitialMargin"))
maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin"))
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: balance REST failed: %s", exc)
try:
pos_resp = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v2/user/positions", {}
)
pos_list = pos_resp if isinstance(pos_resp, list) else []
for p in pos_list:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("positionAmt") or p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("symbol") or p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("avgPrice") or p.get("entryPrice") or p.get("ep")),
mark_price=_safe_float(p.get("markPrice") or p.get("mp")),
unrealized_pnl=_safe_float(p.get("unrealizedProfit") or p.get("up")),
leverage=max(1.0, _safe_float(p.get("leverage") or p.get("lev"), 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: positions REST failed: %s", exc)
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=event_id,
exchange_ts=ts,
wallet_balance=wallet,
available_margin=avail,
used_margin=used,
maint_margin=maint,
positions=tuple(positions),
source="poll",
)
async def close(self) -> None:
self._closed.set()
if self._session is not None and not self._session.closed:
await self._session.close()
# ------------------------------------------------------------------
# Internal — WS consume loop
# ------------------------------------------------------------------
async def _consume(
self,
listen_key: str,
rotation_task: asyncio.Task,
) -> AsyncIterator[ExchangeEvent]:
session = await self._get_session()
url = f"{self._ws_url}?listenKey={listen_key}"
async with session.ws_connect(
url,
autoping=False,
autoclose=False,
heartbeat=None,
compress=0,
max_msg_size=0,
) as ws:
log.info("bingx_user_stream: connected %s", url[:60])
async for msg in ws:
# Honour 24-h rotation sentinel
if rotation_task.done():
log.info("bingx_user_stream: 24-h rotation triggered")
return
if msg.type == aiohttp.WSMsgType.CLOSED:
return
if msg.type == aiohttp.WSMsgType.ERROR:
raise ws.exception() or RuntimeError("ws error")
text = self._decode(msg)
if text is None:
continue
if text == "Ping" or text.lower().startswith("ping"):
await ws.send_str("Pong")
continue
try:
frame = json.loads(text)
except json.JSONDecodeError:
continue
event = self._normalise(frame)
if event is not None:
if event.kind == ExchangeEventKind.UNKNOWN:
continue # swallow silently (SNAPSHOT etc.)
yield event
if frame.get("e") == "listenKeyExpired":
raise RuntimeError("listenKeyExpired")
# ------------------------------------------------------------------
# Internal — frame normalisation
# ------------------------------------------------------------------
def _normalise(self, frame: dict) -> Optional[ExchangeEvent]:
etype = str(frame.get("e") or "").upper()
ts = int(frame.get("E") or time.time() * 1000)
event_id = str(frame.get("i") or frame.get("event_id") or uuid.uuid4().hex)
if etype in {"ORDER_TRADE_UPDATE", "EXECUTIONREPORT"}:
return self._normalise_order(frame, ts)
if etype == "ACCOUNT_UPDATE":
return self._normalise_account(frame, ts)
if etype in {"FUNDING_FEE", "FUNDING"}:
fs = frame.get("fs") or frame
return ExchangeEvent(
kind=ExchangeEventKind.FUNDING_FEE,
event_id=f"fund-{ts}",
exchange_ts=ts,
funding_amount=_safe_float(
fs.get("fa") or fs.get("fundingAmount") or fs.get("amount")
),
symbol=str(fs.get("s") or fs.get("symbol") or ""),
funding_ts=ts,
source="ws",
raw=frame,
)
# SNAPSHOT, LISTENKEY_EXPIRED, other — handled by caller
return ExchangeEvent(
kind=ExchangeEventKind.UNKNOWN,
event_id=event_id,
exchange_ts=ts,
source="ws",
raw=frame,
)
def _normalise_order(self, frame: dict, ts: int) -> ExchangeEvent:
# ORDER_TRADE_UPDATE wraps fields under "o"; EXECUTIONREPORT is flat
o = frame.get("o") or frame
status = str(o.get("X") or o.get("x") or "").upper()
kind_map = {
"FILLED": ExchangeEventKind.FULL_FILL,
"PARTIALLY_FILLED": ExchangeEventKind.PARTIAL_FILL,
"NEW": ExchangeEventKind.ORDER_ACK,
"CANCELED": ExchangeEventKind.CANCEL_ACK,
"REJECTED": ExchangeEventKind.ORDER_REJECT,
"EXPIRED": ExchangeEventKind.CANCEL_ACK,
}
kind = kind_map.get(status, ExchangeEventKind.UNKNOWN)
return ExchangeEvent(
kind=kind,
event_id=str(o.get("i") or o.get("orderId") or uuid.uuid4().hex),
exchange_ts=ts,
fill_price=_safe_float(o.get("L") or o.get("ap") or o.get("p")),
fill_qty=_safe_float(o.get("l") or o.get("lastFilledQty") or 0.0),
fee=_safe_float(o.get("n") or 0.0),
fee_asset=str(o.get("N") or ""),
realized_pnl=_safe_float(o.get("rp") or o.get("realizedPnl") or 0.0),
order_id=str(o.get("i") or ""),
client_order_id=str(o.get("c") or ""),
symbol=str(o.get("s") or ""),
source="ws",
raw=frame,
)
def _normalise_account(self, frame: dict, ts: int) -> ExchangeEvent:
balances = frame.get("B") or []
usdt_bal = next(
(b for b in balances if isinstance(b, dict) and str(b.get("a") or "").upper() == "USDT"),
{},
)
wallet = _safe_float(usdt_bal.get("wb") or usdt_bal.get("walletBalance"))
cw = _safe_float(usdt_bal.get("cw") or usdt_bal.get("crossWalletBalance"))
positions_raw = frame.get("P") or []
positions = []
for p in positions_raw:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("ep")),
mark_price=_safe_float(p.get("mp") or 0.0),
unrealized_pnl=_safe_float(p.get("up")),
leverage=max(1.0, _safe_float(p.get("lev") or 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=f"acct-{ts}",
exchange_ts=ts,
wallet_balance=wallet,
available_margin=cw,
positions=tuple(positions),
source="ws",
raw=frame,
)
# ------------------------------------------------------------------
# Internal — listenKey lifecycle
# ------------------------------------------------------------------
async def _create_listen_key(self) -> str:
resp = await self._http.signed_post_raw(self._LISTENKEY_PATH, {}) # type: ignore[attr-defined]
key = str((resp or {}).get("listenKey") or "")
if not key:
raise RuntimeError(f"empty listenKey from BingX: {resp!r}")
log.debug("bingx_user_stream: listenKey created %s", key[:12])
return key
async def _keepalive_loop(self, listen_key: str) -> None:
while True:
await asyncio.sleep(self._keepalive_secs)
try:
await self._http.signed_put_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
log.debug("bingx_user_stream: keepalive sent")
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: keepalive failed: %s", exc)
async def _rotation_sentinel(self) -> None:
"""Completes after _BINGX_MAX_CONNECTION_SECS to trigger rotation."""
await asyncio.sleep(_BINGX_MAX_CONNECTION_SECS)
async def _delete_listen_key(self, listen_key: str) -> None:
with suppress(Exception):
await self._http.signed_delete_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
# ------------------------------------------------------------------
# Internal — session + decoding
# ------------------------------------------------------------------
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(limit=4, ttl_dns_cache=300)
self._session = aiohttp.ClientSession(connector=connector)
return self._session
@staticmethod
def _decode(msg: aiohttp.WSMessage) -> Optional[str]:
if msg.type == aiohttp.WSMsgType.TEXT:
return str(msg.data)
if msg.type == aiohttp.WSMsgType.BINARY:
raw = bytes(msg.data)
try:
return gzip.decompress(raw).decode("utf-8")
except OSError:
return raw.decode("utf-8", errors="replace")
return None