226 lines
9.6 KiB
Python
226 lines
9.6 KiB
Python
|
|
from __future__ import annotations

import asyncio
from typing import Any

from nautilus_trader.cache.cache import Cache
from nautilus_trader.common.component import LiveClock
from nautilus_trader.common.component import MessageBus
from nautilus_trader.common.enums import LogColor
from nautilus_trader.common.providers import InstrumentProvider
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.data.messages import SubscribeOrderBook
from nautilus_trader.data.messages import SubscribeQuoteTicks
from nautilus_trader.data.messages import UnsubscribeOrderBook
from nautilus_trader.data.messages import UnsubscribeQuoteTicks
from nautilus_trader.live.data_client import LiveMarketDataClient
from nautilus_trader.model.data import BookOrder
from nautilus_trader.model.data import OrderBookDelta
from nautilus_trader.model.data import OrderBookDeltas
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.model.enums import BookAction
from nautilus_trader.model.enums import OrderSide
from nautilus_trader.model.enums import RecordFlag
from nautilus_trader.model.identifiers import ClientId
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.objects import Price
from nautilus_trader.model.objects import Quantity

from .data_config import BingxDataClientConfig
from .enums import BINGX_VENUE
from .http import BingxHttpClient
from .market_stream import BingxMarketStream
from .urls import get_public_ws_url
|
||
|
|
|
||
|
|
|
||
|
|
class BingxMarketDataClient(LiveMarketDataClient):
    """
    Nautilus `LiveMarketDataClient` for BingX USDT-M perpetuals.

    Quote tick subscriptions are served from the ``<symbol>@bookTicker``
    channel and order book subscriptions from the ``<symbol>@incrDepth``
    channel of a `BingxMarketStream`.

    Parameters
    ----------
    loop : asyncio.AbstractEventLoop
        The event loop for the client.
    client : BingxHttpClient
        The BingX HTTP client (stored on the instance).
    msgbus : MessageBus
        The message bus for the client.
    cache : Cache
        The cache for the client.
    clock : LiveClock
        The clock for the client.
    instrument_provider : InstrumentProvider
        The provider used to resolve instruments and their raw symbols.
    config : BingxDataClientConfig
        The client configuration (websocket URL override, reconnect delays,
        HTTP timeout and default book depth are read from it).
    name : str, optional
        The custom client ID. Defaults to ``config.venue.value``.

    """

    def __init__(
        self,
        loop: asyncio.AbstractEventLoop,
        client: BingxHttpClient,
        msgbus: MessageBus,
        cache: Cache,
        clock: LiveClock,
        instrument_provider: InstrumentProvider,
        config: BingxDataClientConfig,
        name: str | None = None,
    ) -> None:
        super().__init__(
            loop=loop,
            client_id=ClientId(name or config.venue.value),
            venue=BINGX_VENUE,
            msgbus=msgbus,
            cache=cache,
            clock=clock,
            instrument_provider=instrument_provider,
        )
        self._client = client
        self._cfg = config

        # An explicit URL in the config wins over the per-environment default.
        ws_url = config.base_url_ws_market or get_public_ws_url(config.environment)
        self._ws_url = ws_url
        self._stream = BingxMarketStream(
            ws_url=ws_url,
            on_event=self._handle_ws_event,
            on_health=self._handle_ws_health,
            reconnect_initial_ms=int(config.ws_reconnect_initial_ms),
            reconnect_max_ms=int(config.ws_reconnect_max_ms),
            http_timeout_secs=int(config.http_timeout_secs),
        )
        self._stream_task: asyncio.Task | None = None

        # Venue raw symbol -> Nautilus instrument ID (filled in `_connect`).
        self._raw_to_instrument_id: dict[str, InstrumentId] = {}
        # Last `lastUpdateId` seen per instrument, used for gap detection.
        self._book_sequences: dict[InstrumentId, int] = {}
        # Instruments with an active quote subscription.
        self._quote_subs: set[InstrumentId] = set()
        # Instrument -> max levels per side to emit (0 means no limit).
        self._book_subs: dict[InstrumentId, int] = {}

    def _instrument_for(self, instrument_id: InstrumentId) -> Any | None:
        """
        Return the instrument for `instrument_id`, or ``None`` if unknown.
        """
        # `find` is a direct lookup; `get_all()` would build the whole
        # instrument mapping again for every call.
        return self._instrument_provider.find(instrument_id)

    def _send_all_instruments_to_data_engine(self) -> None:
        """
        Publish every loaded instrument and add every loaded currency to the cache.
        """
        for instrument in self._instrument_provider.get_all().values():
            self._handle_data(instrument)

        for currency in self._instrument_provider.currencies().values():
            self._cache.add_currency(currency)

    async def _connect(self) -> None:
        """
        Load instruments, build the raw-symbol index and start the stream task.
        """
        await self._instrument_provider.initialize()
        self._send_all_instruments_to_data_engine()

        for instrument_id, instrument in self._instrument_provider.get_all().items():
            raw = getattr(instrument, "raw_symbol", None)
            if raw is None:
                continue
            self._raw_to_instrument_id[str(raw)] = instrument_id

        self._log.info(f"BingX market WS {self._ws_url}", LogColor.BLUE)
        self._stream_task = self.create_task(self._stream.run_forever(), log_msg="bingx_market_stream")  # type: ignore[arg-type]

    async def _disconnect(self) -> None:
        """
        Cancel the stream task, close the stream and drop sequence state.
        """
        if self._stream_task is not None:
            self._stream_task.cancel()
            self._stream_task = None
        await self._stream.close()
        # Sequences from the closed session must not be compared against the
        # first messages of a later session.
        self._book_sequences.clear()

    async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
        """
        Subscribe to ``<raw_symbol>@bookTicker`` for the commanded instrument.
        """
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            self._log.warning(f"BingX quote subscription skipped, instrument not found: {command.instrument_id}")
            return
        self._quote_subs.add(command.instrument_id)
        self._stream.subscribe(f"{instrument.raw_symbol}@bookTicker")

    async def _unsubscribe_quote_ticks(self, command: UnsubscribeQuoteTicks) -> None:
        """
        Stop emitting quotes for the instrument and unsubscribe its channel.
        """
        self._quote_subs.discard(command.instrument_id)
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            return
        self._stream.unsubscribe(f"{instrument.raw_symbol}@bookTicker")

    async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
        """
        Subscribe to ``<raw_symbol>@incrDepth`` for the commanded instrument.

        The depth limit is taken from the command, falling back to the
        configured `depth_level`; a resolved value of 0 means no limit.
        """
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            self._log.warning(f"BingX book subscription skipped, instrument not found: {command.instrument_id}")
            return
        self._book_subs[command.instrument_id] = int(command.depth or self._cfg.depth_level or 0)
        self._stream.subscribe(f"{instrument.raw_symbol}@incrDepth")

    async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
        """
        Stop emitting book deltas for the instrument and unsubscribe its channel.
        """
        self._book_subs.pop(command.instrument_id, None)
        # Forget the last sequence so a later re-subscription does not report
        # a spurious gap against this subscription's final update.
        self._book_sequences.pop(command.instrument_id, None)
        instrument = self._instrument_for(command.instrument_id)
        if instrument is None:
            return
        self._stream.unsubscribe(f"{instrument.raw_symbol}@incrDepth")

    async def _subscribe_order_book_depth(self, command: SubscribeOrderBook) -> None:
        """
        Handle a depth subscription exactly like a deltas subscription.
        """
        await self._subscribe_order_book_deltas(command)

    async def _unsubscribe_order_book_depth(self, command: UnsubscribeOrderBook) -> None:
        """
        Handle a depth unsubscription exactly like a deltas unsubscription.
        """
        await self._unsubscribe_order_book_deltas(command)

    async def _handle_ws_event(self, payload: dict[str, Any]) -> None:
        """
        Convert one websocket message into Nautilus data and publish it.

        Messages without a ``dataType`` or a dict ``data`` body, for unknown
        symbols, or for channels/instruments without an active subscription
        are ignored. Messages whose fields cannot be parsed are logged and
        dropped instead of raising into the stream callback.
        """
        data_type = str(payload.get("dataType") or "")
        data = payload.get("data")
        if not data_type or not isinstance(data, dict):
            return

        # Prefer the symbol in the body; fall back to the channel name prefix.
        sym = str(data.get("s") or data.get("symbol") or "") or data_type.split("@", 1)[0]
        instrument_id = self._raw_to_instrument_id.get(sym)
        if instrument_id is None:
            return

        is_quote = data_type.endswith("@bookTicker") and instrument_id in self._quote_subs
        is_book = data_type.endswith("@incrDepth") and instrument_id in self._book_subs
        if not (is_quote or is_book):
            return

        parsed: QuoteTick | OrderBookDeltas | None
        try:
            ts_init = self._clock.timestamp_ns()
            ts_ms = int(data.get("T") or 0)
            # Fall back to the local clock when the venue sends no timestamp.
            ts_event = millis_to_nanos(ts_ms) if ts_ms else ts_init
            if is_quote:
                parsed = self._parse_quote_tick(instrument_id, data, ts_event, ts_init)
            else:
                parsed = self._parse_book_deltas(instrument_id, data, ts_event, ts_init)
        except (TypeError, ValueError) as exc:
            self._log.warning(f"BingX {data_type} message dropped, malformed payload: {exc}")
            return

        if parsed is not None:
            self._handle_data(parsed)

    def _parse_quote_tick(
        self,
        instrument_id: InstrumentId,
        data: dict[str, Any],
        ts_event: int,
        ts_init: int,
    ) -> QuoteTick | None:
        """
        Build a `QuoteTick` from a bookTicker body, or ``None`` if a price is missing.
        """
        bid = data.get("b")
        ask = data.get("a")
        if not bid or not ask:
            # A quote with a fabricated zero price is worse than no quote.
            return None
        return QuoteTick(
            instrument_id,
            Price.from_str(str(bid)),
            Price.from_str(str(ask)),
            Quantity.from_str(str(data.get("B") or "0")),
            Quantity.from_str(str(data.get("A") or "0")),
            ts_event,
            ts_init,
        )

    def _parse_book_deltas(
        self,
        instrument_id: InstrumentId,
        data: dict[str, Any],
        ts_event: int,
        ts_init: int,
    ) -> OrderBookDeltas | None:
        """
        Build `OrderBookDeltas` from an incrDepth body, or ``None`` if there is nothing to emit.

        A body with ``action == "all"`` is treated as a snapshot and is
        preceded by a CLEAR; an incremental body whose ``lastUpdateId`` is not
        exactly the previous one plus one is also preceded by a CLEAR.
        """
        bids = data.get("bids")
        asks = data.get("asks")
        if not isinstance(bids, list) or not isinstance(asks, list):
            return None

        sequence = int(data.get("lastUpdateId") or 0)
        changes: list[tuple[BookAction, BookOrder | None]] = []
        if str(data.get("action") or "") == "all":
            changes.append((BookAction.CLEAR, None))
        else:
            prev = self._book_sequences.get(instrument_id)
            if prev is not None and sequence and sequence != prev + 1:
                # NOTE(review): after the CLEAR only this increment's levels are
                # applied, so the book stays partial until the next snapshot -
                # confirm whether a resubscribe should be triggered here.
                self._log.warning(
                    f"BingX book sequence gap for {instrument_id}: expected {prev + 1}, got {sequence}",
                )
                changes.append((BookAction.CLEAR, None))

        depth = self._book_subs[instrument_id]
        changes.extend(self._parse_book_side(OrderSide.BUY, bids, depth))
        changes.extend(self._parse_book_side(OrderSide.SELL, asks, depth))

        # Recorded only after the rows parsed, so a dropped malformed message
        # surfaces as a gap on the next one rather than being silently skipped.
        if sequence:
            self._book_sequences[instrument_id] = sequence
        if not changes:
            return None

        # Mark the final delta so consumers can tell where the batch ends.
        last = len(changes) - 1
        deltas = [
            OrderBookDelta(
                instrument_id,
                action,
                order,
                RecordFlag.F_LAST if idx == last else 0,
                sequence,
                ts_event,
                ts_init,
            )
            for idx, (action, order) in enumerate(changes)
        ]
        return OrderBookDeltas(instrument_id, deltas)

    @staticmethod
    def _parse_book_side(
        side: OrderSide,
        rows: list,
        depth: int,
    ) -> list[tuple[BookAction, BookOrder | None]]:
        """
        Convert ``[price, size, ...]`` rows of one side into (action, order) pairs.

        Rows that are not sequences of at least two items are skipped. A zero
        size becomes a DELETE, anything else an UPDATE. At most `depth` rows
        are converted; a non-positive `depth` means no limit.
        """
        # NOTE(review): truncating incremental updates to `depth` rows may drop
        # changes the venue sent - confirm this is intended for `incrDepth`.
        changes: list[tuple[BookAction, BookOrder | None]] = []
        for row in rows:
            if depth > 0 and len(changes) >= depth:
                break
            if not isinstance(row, (list, tuple)) or len(row) < 2:
                continue
            size = Quantity.from_str(str(row[1]))
            price = Price.from_str(str(row[0]))
            if size.as_double() == 0.0:
                order = BookOrder(side, price, Quantity.from_str("0"), 0)
                changes.append((BookAction.DELETE, order))
            else:
                changes.append((BookAction.UPDATE, BookOrder(side, price, size, 0)))
        return changes

    def _handle_ws_health(self, healthy: bool) -> None:
        """
        Log a health transition reported by the market stream.
        """
        if healthy:
            self._log.info("BingX market WS healthy", LogColor.GREEN)
        else:
            self._log.warning("BingX market WS unhealthy")
|