repo hygiene: track the PINK launcher import closure
67 production .py modules that the running PINK service imports but which were never committed: prod/bingx/ (HTTP client, market/user streams, journal, config), prod/clean_arch/ adapters/persistence/runtime/dita/dita_v2 production modules and their co-located tests. Rule going forward: every module imported by launch_dolphin_pink.py / pink_direct.py must appear in git ls-files. Excludes _backup dirs, __pycache__, and non-code files. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
225
prod/bingx/data_client.py
Normal file
225
prod/bingx/data_client.py
Normal file
@@ -0,0 +1,225 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import Any
|
||||
|
||||
from nautilus_trader.cache.cache import Cache
|
||||
from nautilus_trader.common.component import LiveClock
|
||||
from nautilus_trader.common.component import MessageBus
|
||||
from nautilus_trader.common.enums import LogColor
|
||||
from nautilus_trader.common.providers import InstrumentProvider
|
||||
from nautilus_trader.core.datetime import millis_to_nanos
|
||||
from nautilus_trader.data.messages import SubscribeOrderBook
|
||||
from nautilus_trader.data.messages import SubscribeQuoteTicks
|
||||
from nautilus_trader.data.messages import UnsubscribeOrderBook
|
||||
from nautilus_trader.data.messages import UnsubscribeQuoteTicks
|
||||
from nautilus_trader.live.data_client import LiveMarketDataClient
|
||||
from nautilus_trader.model.data import BookOrder
|
||||
from nautilus_trader.model.data import OrderBookDelta
|
||||
from nautilus_trader.model.data import OrderBookDeltas
|
||||
from nautilus_trader.model.data import QuoteTick
|
||||
from nautilus_trader.model.enums import BookAction
|
||||
from nautilus_trader.model.enums import OrderSide
|
||||
from nautilus_trader.model.identifiers import ClientId
|
||||
from nautilus_trader.model.identifiers import InstrumentId
|
||||
from nautilus_trader.model.objects import Price
|
||||
from nautilus_trader.model.objects import Quantity
|
||||
|
||||
from .data_config import BingxDataClientConfig
|
||||
from .enums import BINGX_VENUE
|
||||
from .http import BingxHttpClient
|
||||
from .market_stream import BingxMarketStream
|
||||
from .urls import get_public_ws_url
|
||||
|
||||
|
||||
class BingxMarketDataClient(LiveMarketDataClient):
|
||||
"""
|
||||
Nautilus `LiveMarketDataClient` for BingX USDT-M perpetuals.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
loop: asyncio.AbstractEventLoop,
|
||||
client: BingxHttpClient,
|
||||
msgbus: MessageBus,
|
||||
cache: Cache,
|
||||
clock: LiveClock,
|
||||
instrument_provider: InstrumentProvider,
|
||||
config: BingxDataClientConfig,
|
||||
name: str | None = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
loop=loop,
|
||||
client_id=ClientId(name or config.venue.value),
|
||||
venue=BINGX_VENUE,
|
||||
msgbus=msgbus,
|
||||
cache=cache,
|
||||
clock=clock,
|
||||
instrument_provider=instrument_provider,
|
||||
)
|
||||
self._client = client
|
||||
self._cfg = config
|
||||
|
||||
ws_url = config.base_url_ws_market or get_public_ws_url(config.environment)
|
||||
self._ws_url = ws_url
|
||||
self._stream = BingxMarketStream(
|
||||
ws_url=ws_url,
|
||||
on_event=self._handle_ws_event,
|
||||
on_health=self._handle_ws_health,
|
||||
reconnect_initial_ms=int(config.ws_reconnect_initial_ms),
|
||||
reconnect_max_ms=int(config.ws_reconnect_max_ms),
|
||||
http_timeout_secs=int(config.http_timeout_secs),
|
||||
)
|
||||
self._stream_task: asyncio.Task | None = None
|
||||
|
||||
self._raw_to_instrument_id: dict[str, InstrumentId] = {}
|
||||
self._book_sequences: dict[InstrumentId, int] = {}
|
||||
self._quote_subs: set[InstrumentId] = set()
|
||||
self._book_subs: dict[InstrumentId, int] = {}
|
||||
|
||||
def _instrument_for(self, instrument_id: InstrumentId):
|
||||
return self._instrument_provider.get_all().get(instrument_id)
|
||||
|
||||
def _send_all_instruments_to_data_engine(self) -> None:
|
||||
for instrument in self._instrument_provider.get_all().values():
|
||||
self._handle_data(instrument)
|
||||
|
||||
for currency in self._instrument_provider.currencies().values():
|
||||
self._cache.add_currency(currency)
|
||||
|
||||
async def _connect(self) -> None:
|
||||
await self._instrument_provider.initialize()
|
||||
self._send_all_instruments_to_data_engine()
|
||||
|
||||
for instrument_id, instrument in self._instrument_provider.get_all().items():
|
||||
raw = getattr(instrument, "raw_symbol", None)
|
||||
if raw is None:
|
||||
continue
|
||||
self._raw_to_instrument_id[str(raw)] = instrument_id
|
||||
|
||||
self._log.info(f"BingX market WS {self._ws_url}", LogColor.BLUE)
|
||||
self._stream_task = self.create_task(self._stream.run_forever(), log_msg="bingx_market_stream") # type: ignore[arg-type]
|
||||
|
||||
async def _disconnect(self) -> None:
|
||||
if self._stream_task is not None:
|
||||
self._stream_task.cancel()
|
||||
await self._stream.close()
|
||||
|
||||
async def _subscribe_quote_ticks(self, command: SubscribeQuoteTicks) -> None:
|
||||
instrument = self._instrument_for(command.instrument_id)
|
||||
if instrument is None:
|
||||
self._log.warning(f"BingX quote subscription skipped, instrument not found: {command.instrument_id}")
|
||||
return
|
||||
self._quote_subs.add(command.instrument_id)
|
||||
raw_symbol = str(getattr(instrument, "raw_symbol"))
|
||||
self._stream.subscribe(f"{raw_symbol}@bookTicker")
|
||||
|
||||
async def _unsubscribe_quote_ticks(self, command: UnsubscribeQuoteTicks) -> None:
|
||||
self._quote_subs.discard(command.instrument_id)
|
||||
instrument = self._instrument_for(command.instrument_id)
|
||||
if instrument is None:
|
||||
return
|
||||
raw_symbol = str(getattr(instrument, "raw_symbol"))
|
||||
self._stream.unsubscribe(f"{raw_symbol}@bookTicker")
|
||||
|
||||
async def _subscribe_order_book_deltas(self, command: SubscribeOrderBook) -> None:
|
||||
instrument = self._instrument_for(command.instrument_id)
|
||||
if instrument is None:
|
||||
self._log.warning(f"BingX book subscription skipped, instrument not found: {command.instrument_id}")
|
||||
return
|
||||
self._book_subs[command.instrument_id] = int(command.depth or self._cfg.depth_level)
|
||||
raw_symbol = str(getattr(instrument, "raw_symbol"))
|
||||
self._stream.subscribe(f"{raw_symbol}@incrDepth")
|
||||
|
||||
async def _unsubscribe_order_book_deltas(self, command: UnsubscribeOrderBook) -> None:
|
||||
self._book_subs.pop(command.instrument_id, None)
|
||||
instrument = self._instrument_for(command.instrument_id)
|
||||
if instrument is None:
|
||||
return
|
||||
raw_symbol = str(getattr(instrument, "raw_symbol"))
|
||||
self._stream.unsubscribe(f"{raw_symbol}@incrDepth")
|
||||
|
||||
async def _subscribe_order_book_depth(self, command: SubscribeOrderBook) -> None:
|
||||
await self._subscribe_order_book_deltas(command)
|
||||
|
||||
async def _unsubscribe_order_book_depth(self, command: UnsubscribeOrderBook) -> None:
|
||||
await self._unsubscribe_order_book_deltas(command)
|
||||
|
||||
async def _handle_ws_event(self, payload: dict[str, Any]) -> None:
|
||||
data_type = str(payload.get("dataType") or "")
|
||||
data = payload.get("data")
|
||||
if not isinstance(data, dict) or not data_type:
|
||||
return
|
||||
|
||||
sym = str(data.get("s") or data.get("symbol") or "")
|
||||
if not sym:
|
||||
sym = data_type.split("@", 1)[0]
|
||||
instrument_id = self._raw_to_instrument_id.get(sym)
|
||||
if instrument_id is None:
|
||||
return
|
||||
|
||||
ts_ms = int(data.get("T") or 0)
|
||||
ts_event = millis_to_nanos(ts_ms) if ts_ms else self._clock.timestamp_ns()
|
||||
ts_init = self._clock.timestamp_ns()
|
||||
|
||||
if data_type.endswith("@bookTicker") and instrument_id in self._quote_subs:
|
||||
qt = QuoteTick(
|
||||
instrument_id,
|
||||
Price.from_str(str(data.get("b") or "0")),
|
||||
Price.from_str(str(data.get("a") or "0")),
|
||||
Quantity.from_str(str(data.get("B") or "0")),
|
||||
Quantity.from_str(str(data.get("A") or "0")),
|
||||
ts_event,
|
||||
ts_init,
|
||||
)
|
||||
self._handle_data(qt)
|
||||
return
|
||||
|
||||
if data_type.endswith("@incrDepth") and instrument_id in self._book_subs:
|
||||
action = str(data.get("action") or "")
|
||||
last_update_id = int(data.get("lastUpdateId") or 0)
|
||||
bids = data.get("bids")
|
||||
asks = data.get("asks")
|
||||
if not isinstance(bids, list) or not isinstance(asks, list):
|
||||
return
|
||||
|
||||
deltas: list[OrderBookDelta] = []
|
||||
if action == "all":
|
||||
deltas.append(OrderBookDelta(instrument_id, BookAction.CLEAR, None, 0, last_update_id, ts_event, ts_init))
|
||||
else:
|
||||
prev = self._book_sequences.get(instrument_id)
|
||||
if prev is not None and last_update_id and last_update_id != prev + 1:
|
||||
deltas.append(OrderBookDelta(instrument_id, BookAction.CLEAR, None, 0, last_update_id, ts_event, ts_init))
|
||||
if last_update_id:
|
||||
self._book_sequences[instrument_id] = last_update_id
|
||||
|
||||
depth = int(self._book_subs[instrument_id])
|
||||
|
||||
def _emit(side: OrderSide, rows: list) -> None:
|
||||
n = 0
|
||||
for item in rows:
|
||||
if n >= depth:
|
||||
break
|
||||
if not isinstance(item, (list, tuple)) or len(item) < 2:
|
||||
continue
|
||||
px_s = str(item[0])
|
||||
qty_s = str(item[1])
|
||||
qty = Quantity.from_str(qty_s)
|
||||
if qty.as_double() == 0.0:
|
||||
order = BookOrder(side, Price.from_str(px_s), Quantity.from_str("0"), 0)
|
||||
deltas.append(OrderBookDelta(instrument_id, BookAction.DELETE, order, 0, last_update_id, ts_event, ts_init))
|
||||
else:
|
||||
order = BookOrder(side, Price.from_str(px_s), qty, 0)
|
||||
deltas.append(OrderBookDelta(instrument_id, BookAction.UPDATE, order, 0, last_update_id, ts_event, ts_init))
|
||||
n += 1
|
||||
|
||||
_emit(OrderSide.BUY, bids)
|
||||
_emit(OrderSide.SELL, asks)
|
||||
if deltas:
|
||||
self._handle_data(OrderBookDeltas(instrument_id, deltas))
|
||||
|
||||
def _handle_ws_health(self, healthy: bool) -> None:
|
||||
if healthy:
|
||||
self._log.info("BingX market WS healthy", LogColor.GREEN)
|
||||
else:
|
||||
self._log.warning("BingX market WS unhealthy")
|
||||
Reference in New Issue
Block a user