from __future__ import annotations

import asyncio
from typing import Any

from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.data.messages import SubscribeOrderBook
from nautilus_trader.data.messages import SubscribeQuoteTicks
from nautilus_trader.data.messages import UnsubscribeOrderBook
from nautilus_trader.data.messages import UnsubscribeQuoteTicks
from nautilus_trader.live.data_client import LiveMarketDataClient
from nautilus_trader.model.data import BookOrder
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.enums import BookAction
from nautilus_trader.model.enums import OrderSide
from nautilus_trader.model.enums import RecordFlag
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Price
from nautilus_trader.model.objects import Quantity

from .data_config import BingxDataClientConfig
from .enums import BINGX_VENUE
from .http import BingxHttpClient
from .market_stream import BingxMarketStream
from .urls import get_public_ws_url


class BingxMarketDataClient(LiveMarketDataClient):
    """
    Nautilus `LiveMarketDataClient` for BingX USDT-M perpetuals.

    Subscriptions are forwarded to a `BingxMarketStream` as
    ``<raw_symbol>@bookTicker`` (quotes) and ``<raw_symbol>@incrDepth``
    (order book) topics; incoming payloads are converted to `QuoteTick` and
    `OrderBookDeltas` objects and handed to the data engine.

    Parameters
    ----------
    loop : asyncio.AbstractEventLoop
        The event loop for the client.
    client : BingxHttpClient
        The BingX HTTP client (stored; not used by the streaming paths here).
    msgbus : MessageBus
        The message bus for the client.
    cache : Cache
        The cache for the client.
    clock : LiveClock
        The clock used for `ts_init` and fallback `ts_event` timestamps.
    instrument_provider : InstrumentProvider
        The provider used to resolve instruments and their raw symbols.
    config : BingxDataClientConfig
        The client configuration (WS URL, reconnect timings, default depth).
    name : str, optional
        The client ID; defaults to ``config.venue.value``.

    """

    # Stream topic suffixes appended to the venue raw symbol.
    _QUOTE_CHANNEL = "bookTicker"
    _BOOK_CHANNEL = "incrDepth"

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        client: BingxHttpClient,
        msgbus: MessageBus,
        cache: Cache,
        clock: LiveClock,
        instrument_provider: InstrumentProvider,
        config: BingxDataClientConfig,
        name: str | None = None,
    ) -> None:
        super().__init__(
            loop=loop,
            client_id=ClientId(name or config.venue.value),
            venue=BINGX_VENUE,
            msgbus=msgbus,
            cache=cache,
            clock=clock,
            instrument_provider=instrument_provider,
        )
        self._client = client
        self._cfg = config

        # An explicit URL in the config wins over the environment default.
        ws_url = config.base_url_ws_market or get_public_ws_url(config.environment)
        self._ws_url = ws_url
        self._stream = BingxMarketStream(
            ws_url=ws_url,
            on_event=self._handle_ws_event,
            on_health=self._handle_ws_health,
            reconnect_initial_ms=int(config.ws_reconnect_initial_ms),
            reconnect_max_ms=int(config.ws_reconnect_max_ms),
            http_timeout_secs=int(config.http_timeout_secs),
        )
        self._stream_task: asyncio.Task | None = None

        # Venue raw symbol -> Nautilus instrument ID (filled on connect).
        self._raw_to_instrument_id: dict[str, InstrumentId] = {}
        # Last `lastUpdateId` seen per instrument, used for gap detection.
        self._book_sequences: dict[InstrumentId, int] = {}
        # Instruments with an active quote subscription.
        self._quote_subs: set[InstrumentId] = set()
        # Instruments with an active book subscription -> per-side level cap
        # (a non-positive value means "no cap").
        self._book_subs: dict[InstrumentId, int] = {}

    # -- HELPERS ----------------------------------------------------------------------------------

    def _instrument_for(self, instrument_id: InstrumentId) -> Any | None:
        """
        Return the provider's instrument for `instrument_id`, or ``None``.
        """
        return self._instrument_provider.get_all().get(instrument_id)

    @staticmethod
    def _stream_name(instrument: Any, channel: str) -> str:
        """
        Return the stream topic ``<raw_symbol>@<channel>`` for `instrument`.
        """
        return f"{instrument.raw_symbol!s}@{channel}"

    def _send_all_instruments_to_data_engine(self) -> None:
        """
        Publish every provider instrument and cache every provider currency.
        """
        for instrument in self._instrument_provider.get_all().values():
            self._handle_data(instrument)

        for currency in self._instrument_provider.currencies().values():
            self._cache.add_currency(currency)

    # -- CONNECTION -------------------------------------------------------------------------------

    async def _connect(self) -> None:
        """
        Initialize instruments, build the symbol map and start the stream task.
        """
        await self._instrument_provider.initialize()
        self._send_all_instruments_to_data_engine()

        for instrument_id, instrument in self._instrument_provider.get_all().items():
            raw = getattr(instrument, "raw_symbol", None)
            if raw is None:
                continue
            self._raw_to_instrument_id[str(raw)] = instrument_id

        self._log.info(f"BingX market WS {self._ws_url}", LogColor.BLUE)
        self._stream_task = self.create_task(self._stream.run_forever(), log_msg="bingx_market_stream")  # type: ignore[arg-type]

    async def _disconnect(self) -> None:
        """
        Cancel the stream task (if any) and close the stream.
        """
        if self._stream_task is not None:
            self._stream_task.cancel()
            # Drop the reference so a cancelled task is never cancelled twice.
            self._stream_task = None
        await self._stream.close()

    # -- SUBSCRIPTIONS ----------------------------------------------------------------------------

    async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
        """
        Subscribe to the ``bookTicker`` topic for the commanded instrument.
        """
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            self._log.warning(f"BingX quote subscription skipped, instrument not found: {command.instrument_id}")
            return

        self._quote_subs.add(command.instrument_id)
        self._stream.subscribe(self._stream_name(instrument, self._QUOTE_CHANNEL))

    async def _unsubscribe_quote_ticks(self, command: UnsubscribeQuoteTicks) -> None:
        """
        Unsubscribe from the ``bookTicker`` topic for the commanded instrument.
        """
        self._quote_subs.discard(command.instrument_id)

        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            return
        self._stream.unsubscribe(self._stream_name(instrument, self._QUOTE_CHANNEL))

    async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
        """
        Subscribe to the ``incrDepth`` topic for the commanded instrument.

        The per-side level cap is ``command.depth``, falling back to the
        configured ``depth_level``. If neither is set the cap is 0, which is
        treated as "no cap" when deltas are built.
        """
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            self._log.warning(f"BingX book subscription skipped, instrument not found: {command.instrument_id}")
            return

        # The trailing `or 0` avoids `int(None)` when both sources are unset.
        self._book_subs[command.instrument_id] = int(command.depth or self._cfg.depth_level or 0)
        self._stream.subscribe(self._stream_name(instrument, self._BOOK_CHANNEL))

    async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
        """
        Unsubscribe from the ``incrDepth`` topic for the commanded instrument.
        """
        self._book_subs.pop(command.instrument_id, None)

        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            return
        self._stream.unsubscribe(self._stream_name(instrument, self._BOOK_CHANNEL))

    async def _subscribe_order_book_depth(self, command: SubscribeOrderBook) -> None:
        """
        Delegate to `_subscribe_order_book_deltas` (same underlying topic).
        """
        await self._subscribe_order_book_deltas(command)

    async def _unsubscribe_order_book_depth(self, command: UnsubscribeOrderBook) -> None:
        """
        Delegate to `_unsubscribe_order_book_deltas` (same underlying topic).
        """
        await self._unsubscribe_order_book_deltas(command)

    # -- STREAM HANDLERS --------------------------------------------------------------------------

    async def _handle_ws_event(self, payload: dict[str, Any]) -> None:
        """
        Route one decoded stream payload to the quote or book parser.

        Payloads without a ``dataType`` string and a ``data`` dict, for an
        unknown symbol, or for an instrument without a matching subscription
        are ignored.

        Parameters
        ----------
        payload : dict[str, Any]
            The decoded message; reads ``dataType`` and ``data``.

        """
        data_type = str(payload.get("dataType") or "")
        data = payload.get("data")
        if not isinstance(data, dict) or not data_type:
            return

        # Prefer the symbol carried in the body; otherwise take the topic prefix.
        raw_symbol = str(data.get("s") or data.get("symbol") or "") or data_type.split("@", 1)[0]
        instrument_id = self._raw_to_instrument_id.get(raw_symbol)
        if instrument_id is None:
            return

        ts_init = self._clock.timestamp_ns()
        ts_ms = int(data.get("T") or 0)
        # No venue timestamp -> stamp the event with the local receive time.
        ts_event = millis_to_nanos(ts_ms) if ts_ms else ts_init

        if data_type.endswith(f"@{self._QUOTE_CHANNEL}"):
            if instrument_id in self._quote_subs:
                self._handle_data(self._build_quote_tick(instrument_id, data, ts_event, ts_init))
            return

        if data_type.endswith(f"@{self._BOOK_CHANNEL}") and instrument_id in self._book_subs:
            deltas = self._build_book_deltas(instrument_id, data, ts_event, ts_init)
            if deltas:
                self._handle_data(OrderBookDeltas(instrument_id, deltas))

    @staticmethod
    def _build_quote_tick(
        instrument_id: InstrumentId,
        data: dict[str, Any],
        ts_event: int,
        ts_init: int,
    ) -> QuoteTick:
        """
        Build a `QuoteTick` from ``b``/``a`` (prices) and ``B``/``A`` (sizes).

        Missing or empty fields are parsed as ``"0"``.
        """
        return QuoteTick(
            instrument_id,
            Price.from_str(str(data.get("b") or "0")),
            Price.from_str(str(data.get("a") or "0")),
            Quantity.from_str(str(data.get("B") or "0")),
            Quantity.from_str(str(data.get("A") or "0")),
            ts_event,
            ts_init,
        )

    def _build_book_deltas(
        self,
        instrument_id: InstrumentId,
        data: dict[str, Any],
        ts_event: int,
        ts_init: int,
    ) -> list[OrderBookDelta]:
        """
        Convert one ``incrDepth`` body into an ordered list of deltas.

        A ``CLEAR`` is emitted first for snapshots (``action == "all"``) and
        when ``lastUpdateId`` is not exactly one greater than the previous one
        recorded for the instrument. The final delta carries
        `RecordFlag.F_LAST` so the end of the batch is marked.

        Returns
        -------
        list[OrderBookDelta]
            Empty when ``bids``/``asks`` are not lists or nothing was parsed.

        """
        action = str(data.get("action") or "")
        last_update_id = int(data.get("lastUpdateId") or 0)
        bids = data.get("bids")
        asks = data.get("asks")
        if not isinstance(bids, list) or not isinstance(asks, list):
            return []

        # Collect (action, order) pairs first: the F_LAST flag can only be
        # assigned once the final element is known.
        pending: list[tuple[BookAction, BookOrder | None]] = []

        if action == "all":
            pending.append((BookAction.CLEAR, None))
        else:
            prev = self._book_sequences.get(instrument_id)
            if prev is not None and last_update_id and last_update_id != prev + 1:
                # Updates were missed: discard the existing levels rather
                # than apply this update on top of them.
                self._log.warning(
                    f"BingX book sequence gap for {instrument_id}: "
                    f"expected {prev + 1}, got {last_update_id}; clearing book",
                )
                pending.append((BookAction.CLEAR, None))

        if last_update_id:
            self._book_sequences[instrument_id] = last_update_id

        depth = int(self._book_subs[instrument_id])
        for side, rows in ((OrderSide.BUY, bids), (OrderSide.SELL, asks)):
            pending.extend(self._parse_levels(side, rows, depth))

        final_index = len(pending) - 1
        return [
            OrderBookDelta(
                instrument_id,
                book_action,
                order,
                RecordFlag.F_LAST if index == final_index else 0,
                last_update_id,
                ts_event,
                ts_init,
            )
            for index, (book_action, order) in enumerate(pending)
        ]

    @staticmethod
    def _parse_levels(
        side: OrderSide,
        rows: list,
        depth: int,
    ) -> list[tuple[BookAction, BookOrder]]:
        """
        Parse ``[price, size, ...]`` rows for one side into (action, order) pairs.

        Rows that are not sequences of at least two items are skipped. A zero
        size maps to ``DELETE``, anything else to ``UPDATE``. At most `depth`
        rows are emitted; a non-positive `depth` disables the cap.
        """
        levels: list[tuple[BookAction, BookOrder]] = []
        for row in rows:
            if depth > 0 and len(levels) >= depth:
                break
            if not isinstance(row, (list, tuple)) or len(row) < 2:
                continue

            price = Price.from_str(str(row[0]))
            size = Quantity.from_str(str(row[1]))
            if size.as_double() == 0.0:
                order = BookOrder(side, price, Quantity.from_str("0"), 0)
                levels.append((BookAction.DELETE, order))
            else:
                levels.append((BookAction.UPDATE, BookOrder(side, price, size, 0)))
        return levels

    def _handle_ws_health(self, healthy: bool) -> None:
        """
        Log a stream health transition reported by the market stream.
        """
        if healthy:
            self._log.info("BingX market WS healthy", LogColor.GREEN)
        else:
            self._log.warning("BingX market WS unhealthy")