67 production .py modules that the running PINK service imports but which were never committed: prod/bingx/ (HTTP client, market/user streams, journal, config), prod/clean_arch/ adapters/persistence/runtime/dita/dita_v2 production modules and their co-located tests. Rule going forward: every module imported by launch_dolphin_pink.py / pink_direct.py must appear in git ls-files. Excludes _backup dirs, __pycache__, and non-code files. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
140 lines
4.6 KiB
Python
140 lines
4.6 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import gzip
|
|
import json
|
|
import uuid
|
|
from collections.abc import Awaitable
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
|
|
EventHandler = Callable[[dict[str, Any]], Awaitable[None]]
|
|
HealthHandler = Callable[[bool], None]
|
|
|
|
|
|
class BingxMarketStream:
|
|
"""
|
|
Public (unauthenticated) BingX swap-market WebSocket stream.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
*,
|
|
ws_url: str,
|
|
on_event: EventHandler,
|
|
on_health: HealthHandler | None = None,
|
|
reconnect_initial_ms: int = 500,
|
|
reconnect_max_ms: int = 10_000,
|
|
http_timeout_secs: int = 10,
|
|
) -> None:
|
|
self._ws_url = ws_url
|
|
self._on_event = on_event
|
|
self._on_health = on_health
|
|
self._reconnect_initial_ms = int(reconnect_initial_ms)
|
|
self._reconnect_max_ms = int(reconnect_max_ms)
|
|
self._http_timeout_secs = int(http_timeout_secs)
|
|
|
|
self._closed = asyncio.Event()
|
|
self._session: aiohttp.ClientSession | None = None
|
|
|
|
# dataType -> subscription id
|
|
self._subscriptions: dict[str, str] = {}
|
|
self._subscriptions_changed = asyncio.Event()
|
|
|
|
def subscribe(self, data_type: str) -> None:
|
|
if data_type in self._subscriptions:
|
|
return
|
|
self._subscriptions[data_type] = str(uuid.uuid4())
|
|
self._subscriptions_changed.set()
|
|
|
|
def unsubscribe(self, data_type: str) -> None:
|
|
if data_type not in self._subscriptions:
|
|
return
|
|
self._subscriptions.pop(data_type, None)
|
|
self._subscriptions_changed.set()
|
|
|
|
async def run_forever(self) -> None:
|
|
delay_ms = self._reconnect_initial_ms
|
|
while not self._closed.is_set():
|
|
try:
|
|
await self._consume()
|
|
delay_ms = self._reconnect_initial_ms
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception:
|
|
if self._closed.is_set():
|
|
break
|
|
await asyncio.sleep(delay_ms / 1000.0)
|
|
delay_ms = min(delay_ms * 2, self._reconnect_max_ms)
|
|
finally:
|
|
self._notify_health(False)
|
|
await self.close()
|
|
|
|
async def close(self) -> None:
|
|
self._closed.set()
|
|
if self._session is not None and not self._session.closed:
|
|
await self._session.close()
|
|
|
|
async def _consume(self) -> None:
|
|
session = await self._get_session()
|
|
async with session.ws_connect(
|
|
self._ws_url,
|
|
autoping=False,
|
|
autoclose=False,
|
|
heartbeat=None,
|
|
compress=0,
|
|
max_msg_size=0,
|
|
) as ws:
|
|
self._notify_health(True)
|
|
await self._flush_subscriptions(ws)
|
|
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.CLOSED:
|
|
break
|
|
if msg.type == aiohttp.WSMsgType.ERROR:
|
|
raise ws.exception() or RuntimeError("BingX market socket error")
|
|
|
|
if self._subscriptions_changed.is_set():
|
|
await self._flush_subscriptions(ws)
|
|
|
|
text = self._decode_message(msg)
|
|
if text is None:
|
|
continue
|
|
if text == "Ping" or "ping" in text.lower():
|
|
await ws.send_str("Pong")
|
|
continue
|
|
payload = json.loads(text)
|
|
await self._on_event(payload)
|
|
|
|
async def _flush_subscriptions(self, ws: aiohttp.ClientWebSocketResponse) -> None:
|
|
self._subscriptions_changed.clear()
|
|
for data_type, sub_id in list(self._subscriptions.items()):
|
|
await ws.send_json({"id": sub_id, "reqType": "sub", "dataType": data_type})
|
|
|
|
async def _get_session(self) -> aiohttp.ClientSession:
|
|
if self._session is None or self._session.closed:
|
|
timeout = aiohttp.ClientTimeout(total=None, sock_connect=self._http_timeout_secs)
|
|
connector = aiohttp.TCPConnector(limit=2, ttl_dns_cache=300)
|
|
self._session = aiohttp.ClientSession(timeout=timeout, connector=connector)
|
|
return self._session
|
|
|
|
def _notify_health(self, healthy: bool) -> None:
|
|
if self._on_health is not None:
|
|
self._on_health(healthy)
|
|
|
|
@staticmethod
|
|
def _decode_message(msg: aiohttp.WSMessage) -> str | None:
|
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
|
return str(msg.data)
|
|
if msg.type == aiohttp.WSMsgType.BINARY:
|
|
data = bytes(msg.data)
|
|
try:
|
|
return gzip.decompress(data).decode("utf-8")
|
|
except OSError:
|
|
return data.decode("utf-8")
|
|
return None
|
|
|