from __future__ import annotations import asyncio import gzip import json import uuid from collections.abc import Awaitable from collections.abc import Callable from typing import Any import aiohttp EventHandler = Callable[[dict[str, Any]], Awaitable[None]] HealthHandler = Callable[[bool], None] class BingxMarketStream: """ Public (unauthenticated) BingX swap-market WebSocket stream. """ def __init__( self, *, ws_url: str, on_event: EventHandler, on_health: HealthHandler | None = None, reconnect_initial_ms: int = 500, reconnect_max_ms: int = 10_000, http_timeout_secs: int = 10, ) -> None: self._ws_url = ws_url self._on_event = on_event self._on_health = on_health self._reconnect_initial_ms = int(reconnect_initial_ms) self._reconnect_max_ms = int(reconnect_max_ms) self._http_timeout_secs = int(http_timeout_secs) self._closed = asyncio.Event() self._session: aiohttp.ClientSession | None = None # dataType -> subscription id self._subscriptions: dict[str, str] = {} self._subscriptions_changed = asyncio.Event() def subscribe(self, data_type: str) -> None: if data_type in self._subscriptions: return self._subscriptions[data_type] = str(uuid.uuid4()) self._subscriptions_changed.set() def unsubscribe(self, data_type: str) -> None: if data_type not in self._subscriptions: return self._subscriptions.pop(data_type, None) self._subscriptions_changed.set() async def run_forever(self) -> None: delay_ms = self._reconnect_initial_ms while not self._closed.is_set(): try: await self._consume() delay_ms = self._reconnect_initial_ms except asyncio.CancelledError: raise except Exception: if self._closed.is_set(): break await asyncio.sleep(delay_ms / 1000.0) delay_ms = min(delay_ms * 2, self._reconnect_max_ms) finally: self._notify_health(False) await self.close() async def close(self) -> None: self._closed.set() if self._session is not None and not self._session.closed: await self._session.close() async def _consume(self) -> None: session = await self._get_session() async with session.ws_connect( self._ws_url, autoping=False, autoclose=False, heartbeat=None, compress=0, max_msg_size=0, ) as ws: self._notify_health(True) await self._flush_subscriptions(ws) async for msg in ws: if msg.type == aiohttp.WSMsgType.CLOSED: break if msg.type == aiohttp.WSMsgType.ERROR: raise ws.exception() or RuntimeError("BingX market socket error") if self._subscriptions_changed.is_set(): await self._flush_subscriptions(ws) text = self._decode_message(msg) if text is None: continue if text == "Ping" or "ping" in text.lower(): await ws.send_str("Pong") continue payload = json.loads(text) await self._on_event(payload) async def _flush_subscriptions(self, ws: aiohttp.ClientWebSocketResponse) -> None: self._subscriptions_changed.clear() for data_type, sub_id in list(self._subscriptions.items()): await ws.send_json({"id": sub_id, "reqType": "sub", "dataType": data_type}) async def _get_session(self) -> aiohttp.ClientSession: if self._session is None or self._session.closed: timeout = aiohttp.ClientTimeout(total=None, sock_connect=self._http_timeout_secs) connector = aiohttp.TCPConnector(limit=2, ttl_dns_cache=300) self._session = aiohttp.ClientSession(timeout=timeout, connector=connector) return self._session def _notify_health(self, healthy: bool) -> None: if self._on_health is not None: self._on_health(healthy) @staticmethod def _decode_message(msg: aiohttp.WSMessage) -> str | None: if msg.type == aiohttp.WSMsgType.TEXT: return str(msg.data) if msg.type == aiohttp.WSMsgType.BINARY: data = bytes(msg.data) try: return gzip.decompress(data).decode("utf-8") except OSError: return data.decode("utf-8") return None