Files
siloqy/prod/clean_arch/dita_v2/bingx_user_stream.py
Codex e38ec77221 PINK: fix fee-sign bug + WARN-unfreeze — 451/451 tests green
Defect A (fee sign): bingx_user_stream._normalise_order flipped to
  fee = -raw_fee so BingX negative-n costs arrive as positive kernel
  costs.  k_maker_rebates no longer accumulates phantom rebates.

Defect B (opening fee dropped): fill_qty now falls back to "z"
  (cumFilledQty) when "l" (lastFilledQty) is zero/absent, so
  apply_predicted_fill computes a non-zero opening-leg fee.

Architectural fix (WARN unfreezes): lib.rs reconcile() now unfreezes
  capital_frozen on WARN as well as OK.  WARN (0.01-20 USDT delta) is
  normal in-flight settlement — only ERROR (≥20, unexplained) should
  halt ENTERs.  The old keep-state logic trapped the kernel permanently
  frozen after the first trade's ENTER predicted-fee phase pushed delta
  briefly into ERROR.

Acceptance criterion: |k_capital - bingx_balance| < 1 USDT, frozen=False
after every round-trip trade — verified numerically against T-1/T-2
ground truth from the CRITICAL doc.

Docs: CRITICAL_AGENT-TODO_ACCOUNTING_BUGFIX.md §12-13 (fix record),
      CAPITAL_BOOKKEEPING_DESIGN.md §8 (kernel spec), SYSTEM_BIBLE §11.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-08 11:08:31 +02:00

457 lines
18 KiB
Python

"""BingX private user-stream adapter for DITAv2 PINK (spec G3).
Implements the subscribe() / account_snapshot() side of the VenueAdapter
seam. All BingX field names, listenKey semantics, gzip framing, and
ping/pong live here and nowhere else above.
Design notes
------------
* listenKey lifecycle: POST create → PUT keepalive every 1800 s → DELETE
on close. Server signals expiry via {"e":"listenKeyExpired"}.
* 24-h overlap rotation: BingX closes the socket after ~24 h. A new
listenKey + connection is established ~10 min before the limit; only
after the new stream is confirmed healthy is the old one closed.
* Reconnect backoff: doubles from reconnect_initial_ms up to
reconnect_max_ms, resets on successful receive.
* Gap-backfill: on every reconnect, account_snapshot() is called first
and its event is yielded with source="poll" so the caller knows the
gap was bridged via REST.
* All frames are normalised to ExchangeEvent before leaving this module.
"""
from __future__ import annotations
import asyncio
import gzip
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager, suppress
from typing import AsyncIterator, Optional
import aiohttp
from .exchange_event import (
ExchangeEvent,
ExchangeEventKind,
ExchangePosition,
)
log = logging.getLogger(__name__)
# BingX WS will close the connection after this many seconds (~24 h).
# We rotate 10 min early to avoid any gap.
_BINGX_MAX_CONNECTION_SECS = 86_400 - 600 # 23 h 50 min
def _safe_float(value: object, default: float = 0.0) -> float:
try:
f = float(value) # type: ignore[arg-type]
import math
return f if math.isfinite(f) else default
except (TypeError, ValueError):
return default
class BingxUserStream:
"""
PINK-only BingX private user-stream client.
Usage::
stream = BingxUserStream(http_client, ws_base_url, keepalive_secs=1800)
async for event in stream.subscribe():
await handle(event)
The `http_client` must expose:
await http_client.signed_post_raw(path, params) -> dict
await http_client.signed_put_raw(path, params, allow_empty=True)
await http_client.signed_delete_raw(path, params, allow_empty=True)
and optionally:
await http_client.signed_get(path, params) -> dict (for REST snapshot)
"""
_LISTENKEY_PATH = "/openApi/user/auth/userDataStream"
def __init__(
self,
http_client: object,
ws_base_url: str,
*,
keepalive_secs: int = 1800,
reconnect_initial_ms: int = 1_000,
reconnect_max_ms: int = 30_000,
rest_base_url: str = "",
) -> None:
self._http = http_client
self._ws_url = ws_base_url
self._keepalive_secs = keepalive_secs
self._reconnect_initial_ms = reconnect_initial_ms
self._reconnect_max_ms = reconnect_max_ms
self._rest_base_url = rest_base_url
self._closed = asyncio.Event()
self._session: Optional[aiohttp.ClientSession] = None
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def subscribe(self) -> AsyncIterator[ExchangeEvent]:
"""Yield ExchangeEvent; reconnects indefinitely until close()."""
delay_ms = self._reconnect_initial_ms
while not self._closed.is_set():
listen_key: Optional[str] = None
keepalive_task: Optional[asyncio.Task] = None
rotation_task: Optional[asyncio.Task] = None
try:
listen_key = await self._create_listen_key()
keepalive_task = asyncio.create_task(
self._keepalive_loop(listen_key), name="lk_keepalive"
)
rotation_task = asyncio.create_task(
self._rotation_sentinel(), name="lk_rotation"
)
# Gap-backfill: REST snapshot before streaming
try:
snap = await self.account_snapshot()
yield snap
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: gap-backfill REST failed: %s", exc)
async for event in self._consume(listen_key, rotation_task):
delay_ms = self._reconnect_initial_ms # reset on healthy receive
yield event
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
if self._closed.is_set():
break
log.warning(
"bingx_user_stream: disconnected (%s), retry in %.1f s",
exc,
delay_ms / 1000.0,
)
yield ExchangeEvent(
kind=ExchangeEventKind.RECONNECTED,
event_id=f"reconnect-{uuid.uuid4().hex[:8]}",
exchange_ts=int(time.time() * 1000),
source="ws",
)
await asyncio.sleep(delay_ms / 1000.0)
delay_ms = min(delay_ms * 2, self._reconnect_max_ms)
finally:
if keepalive_task is not None:
keepalive_task.cancel()
with suppress(asyncio.CancelledError):
await keepalive_task
if rotation_task is not None:
rotation_task.cancel()
with suppress(asyncio.CancelledError):
await rotation_task
if listen_key is not None:
with suppress(Exception):
await self._delete_listen_key(listen_key)
async def account_snapshot(self) -> ExchangeEvent:
"""
Fetch current balance + positions via REST and return a merged
ACCOUNT_UPDATE ExchangeEvent (source="poll"). Used for gap-backfill
on reconnect and as the poll-failover path.
"""
event_id = f"poll-{uuid.uuid4().hex[:12]}"
ts = int(time.time() * 1000)
wallet = 0.0
avail = 0.0
used = 0.0
maint = 0.0
positions: list[ExchangePosition] = []
try:
bal = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v3/user/balance", {}
)
# v3 returns a list; v2 returns {"balance": {...}}
if isinstance(bal, list):
data = bal[0] if bal else {}
elif isinstance(bal, dict):
data = bal.get("balance") if isinstance(bal.get("balance"), dict) else bal
else:
data = {}
wallet = _safe_float(data.get("equity") or data.get("balance") or data.get("totalWalletBalance"))
avail = _safe_float(data.get("availableMargin") or data.get("availableBalance"))
used = _safe_float(data.get("usedMargin") or data.get("frozenMargin") or data.get("totalInitialMargin"))
maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin") or 0.0)
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: balance REST failed: %s", exc)
try:
pos_resp = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v2/user/positions", {}
)
pos_list = pos_resp if isinstance(pos_resp, list) else []
for p in pos_list:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("positionAmt") or p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("symbol") or p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("avgPrice") or p.get("entryPrice") or p.get("ep")),
mark_price=_safe_float(p.get("markPrice") or p.get("mp")),
unrealized_pnl=_safe_float(p.get("unrealizedProfit") or p.get("up")),
leverage=max(1.0, _safe_float(p.get("leverage") or p.get("lev"), 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: positions REST failed: %s", exc)
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=event_id,
exchange_ts=ts,
wallet_balance=wallet,
available_margin=avail,
used_margin=used,
maint_margin=maint,
positions=tuple(positions),
source="poll",
)
async def close(self) -> None:
self._closed.set()
if self._session is not None and not self._session.closed:
await self._session.close()
# ------------------------------------------------------------------
# Internal — WS consume loop
# ------------------------------------------------------------------
async def _consume(
self,
listen_key: str,
rotation_task: asyncio.Task,
) -> AsyncIterator[ExchangeEvent]:
session = await self._get_session()
url = f"{self._ws_url}?listenKey={listen_key}"
async with session.ws_connect(
url,
autoping=False,
autoclose=False,
heartbeat=None,
compress=0,
max_msg_size=0,
) as ws:
log.info("bingx_user_stream: connected %s", url[:60])
async for msg in ws:
# Honour 24-h rotation sentinel
if rotation_task.done():
log.info("bingx_user_stream: 24-h rotation triggered")
return
if msg.type == aiohttp.WSMsgType.CLOSED:
return
if msg.type == aiohttp.WSMsgType.ERROR:
raise ws.exception() or RuntimeError("ws error")
text = self._decode(msg)
if text is None:
continue
if text == "Ping" or text.lower().startswith("ping"):
await ws.send_str("Pong")
continue
try:
frame = json.loads(text)
except json.JSONDecodeError:
continue
event = self._normalise(frame)
if event is not None:
if event.kind == ExchangeEventKind.UNKNOWN:
continue # swallow silently (SNAPSHOT etc.)
yield event
if frame.get("e") == "listenKeyExpired":
raise RuntimeError("listenKeyExpired")
# ------------------------------------------------------------------
# Internal — frame normalisation
# ------------------------------------------------------------------
def _normalise(self, frame: dict) -> Optional[ExchangeEvent]:
etype = str(frame.get("e") or "").upper()
ts = int(frame.get("E") or time.time() * 1000)
event_id = str(frame.get("i") or frame.get("event_id") or uuid.uuid4().hex)
if etype in {"ORDER_TRADE_UPDATE", "EXECUTIONREPORT"}:
return self._normalise_order(frame, ts)
if etype == "ACCOUNT_UPDATE":
return self._normalise_account(frame, ts)
if etype in {"FUNDING_FEE", "FUNDING"}:
fs = frame.get("fs") or frame
return ExchangeEvent(
kind=ExchangeEventKind.FUNDING_FEE,
event_id=f"fund-{ts}",
exchange_ts=ts,
funding_amount=_safe_float(
fs.get("fa") or fs.get("fundingAmount") or fs.get("amount")
),
symbol=str(fs.get("s") or fs.get("symbol") or ""),
funding_ts=ts,
source="ws",
raw=frame,
)
# SNAPSHOT, LISTENKEY_EXPIRED, other — handled by caller
return ExchangeEvent(
kind=ExchangeEventKind.UNKNOWN,
event_id=event_id,
exchange_ts=ts,
source="ws",
raw=frame,
)
def _normalise_order(self, frame: dict, ts: int) -> ExchangeEvent:
# ORDER_TRADE_UPDATE wraps fields under "o"; EXECUTIONREPORT is flat
o = frame.get("o") or frame
status = str(o.get("X") or o.get("x") or "").upper()
kind_map = {
"FILLED": ExchangeEventKind.FULL_FILL,
"PARTIALLY_FILLED": ExchangeEventKind.PARTIAL_FILL,
"NEW": ExchangeEventKind.ORDER_ACK,
"CANCELED": ExchangeEventKind.CANCEL_ACK,
"REJECTED": ExchangeEventKind.ORDER_REJECT,
"EXPIRED": ExchangeEventKind.CANCEL_ACK,
}
kind = kind_map.get(status, ExchangeEventKind.UNKNOWN)
# Maker detection: BingX WS uses "m" field (True = maker) in order updates.
# Falls back to order type field "o" (LIMIT=maker, MARKET=taker).
is_maker = bool(o.get("m") or (
str(o.get("o") or o.get("type") or "MARKET").upper() == "LIMIT"
and status in {"FILLED", "PARTIALLY_FILLED"}
))
# DEFECT A FIX: BingX sends "n" (commission) NEGATIVE for costs, POSITIVE for
# rebates on VST. Kernel convention is the opposite: POSITIVE = cost, NEGATIVE
# = rebate. Flip at this boundary so the kernel never sees wrong-sign fees.
raw_fee = _safe_float(o.get("n") or 0.0)
fee = -raw_fee # BingX cost (negative n) → kernel cost (positive fee)
# DEFECT B FIX: prefer "l" (lastFilledQty) when non-zero; fall back to "z"
# (cumFilledQty) for ENTER fills where BingX reports qty only in "z".
_last_qty = _safe_float(o.get("l") or 0.0)
_cum_qty = _safe_float(o.get("z") or o.get("cumFilledQty") or 0.0)
fill_qty = _last_qty if _last_qty > 0.0 else _cum_qty
return ExchangeEvent(
kind=kind,
event_id=str(o.get("i") or o.get("orderId") or uuid.uuid4().hex),
exchange_ts=ts,
fill_price=_safe_float(o.get("L") or o.get("ap") or o.get("p")),
fill_qty=fill_qty,
fee=fee,
fee_asset=str(o.get("N") or ""),
realized_pnl=_safe_float(o.get("rp") or o.get("realizedPnl") or 0.0),
order_id=str(o.get("i") or ""),
client_order_id=str(o.get("c") or ""),
symbol=str(o.get("s") or ""),
is_maker=is_maker,
source="ws",
raw=frame,
)
def _normalise_account(self, frame: dict, ts: int) -> ExchangeEvent:
balances = frame.get("B") or []
usdt_bal = next(
(b for b in balances if isinstance(b, dict) and str(b.get("a") or "").upper() == "USDT"),
{},
)
wallet = _safe_float(usdt_bal.get("wb") or usdt_bal.get("walletBalance"))
cw = _safe_float(usdt_bal.get("cw") or usdt_bal.get("crossWalletBalance"))
positions_raw = frame.get("P") or []
positions = []
for p in positions_raw:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("ep")),
mark_price=_safe_float(p.get("mp") or 0.0),
unrealized_pnl=_safe_float(p.get("up")),
leverage=max(1.0, _safe_float(p.get("lev") or 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=f"acct-{ts}",
exchange_ts=ts,
wallet_balance=wallet,
available_margin=cw,
positions=tuple(positions),
source="ws",
raw=frame,
)
# ------------------------------------------------------------------
# Internal — listenKey lifecycle
# ------------------------------------------------------------------
async def _create_listen_key(self) -> str:
resp = await self._http.signed_post_raw(self._LISTENKEY_PATH, {}) # type: ignore[attr-defined]
key = str((resp or {}).get("listenKey") or "")
if not key:
raise RuntimeError(f"empty listenKey from BingX: {resp!r}")
log.debug("bingx_user_stream: listenKey created %s", key[:12])
return key
async def _keepalive_loop(self, listen_key: str) -> None:
while True:
await asyncio.sleep(self._keepalive_secs)
try:
await self._http.signed_put_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
log.debug("bingx_user_stream: keepalive sent")
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: keepalive failed: %s", exc)
async def _rotation_sentinel(self) -> None:
"""Completes after _BINGX_MAX_CONNECTION_SECS to trigger rotation."""
await asyncio.sleep(_BINGX_MAX_CONNECTION_SECS)
async def _delete_listen_key(self, listen_key: str) -> None:
with suppress(Exception):
await self._http.signed_delete_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
# ------------------------------------------------------------------
# Internal — session + decoding
# ------------------------------------------------------------------
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(limit=4, ttl_dns_cache=300)
self._session = aiohttp.ClientSession(connector=connector)
return self._session
@staticmethod
def _decode(msg: aiohttp.WSMessage) -> Optional[str]:
if msg.type == aiohttp.WSMsgType.TEXT:
return str(msg.data)
if msg.type == aiohttp.WSMsgType.BINARY:
raw = bytes(msg.data)
try:
return gzip.decompress(raw).decode("utf-8")
except OSError:
return raw.decode("utf-8", errors="replace")
return None