67 production .py modules that the running PINK service imports but which were never committed: prod/bingx/ (HTTP client, market/user streams, journal, config), prod/clean_arch/ adapters/persistence/runtime/dita/dita_v2 production modules and their co-located tests. Rule going forward: every module imported by launch_dolphin_pink.py / pink_direct.py must appear in git ls-files. Excludes _backup dirs, __pycache__, and non-code files. Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
153 lines
5.8 KiB
Python
153 lines
5.8 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
import gzip
|
|
import json
|
|
from collections.abc import Awaitable
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
from .config import BingxExecClientConfig
|
|
from .http import BingxHttpClient
|
|
from .rate_limits import BingxRateLimitTracker
|
|
from .urls import get_private_ws_url
|
|
|
|
|
|
EventHandler = Callable[[dict[str, Any]], Awaitable[None]]
|
|
HealthHandler = Callable[[bool], None]
|
|
|
|
|
|
class BingxUserStream:
|
|
def __init__(
|
|
self,
|
|
client: BingxHttpClient,
|
|
config: BingxExecClientConfig,
|
|
on_event: EventHandler,
|
|
on_health: HealthHandler | None = None,
|
|
) -> None:
|
|
self._client = client
|
|
self._config = config
|
|
self._on_event = on_event
|
|
self._on_health = on_health
|
|
self._rate_limits: BingxRateLimitTracker = client.rate_limits
|
|
self._closed = asyncio.Event()
|
|
self._session: aiohttp.ClientSession | None = None
|
|
|
|
async def run_forever(self) -> None:
|
|
delay_ms = int(self._config.ws_reconnect_initial_ms)
|
|
max_delay_ms = int(self._config.ws_reconnect_max_ms)
|
|
while not self._closed.is_set():
|
|
listen_key: str | None = None
|
|
keepalive_task: asyncio.Task | None = None
|
|
try:
|
|
listen_key = await self._create_listen_key()
|
|
keepalive_task = asyncio.create_task(self._keepalive_loop(listen_key))
|
|
await self._consume(listen_key)
|
|
delay_ms = int(self._config.ws_reconnect_initial_ms)
|
|
except asyncio.CancelledError:
|
|
raise
|
|
except Exception:
|
|
if self._closed.is_set():
|
|
break
|
|
await asyncio.sleep(delay_ms / 1000.0)
|
|
delay_ms = min(delay_ms * 2, max_delay_ms)
|
|
finally:
|
|
self._notify_health(False)
|
|
if keepalive_task is not None:
|
|
keepalive_task.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await keepalive_task
|
|
if listen_key is not None:
|
|
with contextlib.suppress(Exception, asyncio.CancelledError):
|
|
await self._delete_listen_key(listen_key)
|
|
await self.close()
|
|
|
|
async def close(self) -> None:
|
|
self._closed.set()
|
|
if self._session is not None and not self._session.closed:
|
|
await self._session.close()
|
|
|
|
async def _consume(self, listen_key: str) -> None:
|
|
ws_base = self._config.base_url_ws_private or get_private_ws_url(self._config.environment)
|
|
if not ws_base:
|
|
raise RuntimeError(f"No BingX private WS URL configured for {self._config.environment.value}")
|
|
session = await self._get_session()
|
|
ws_url = f"{ws_base}?listenKey={listen_key}"
|
|
async with session.ws_connect(
|
|
ws_url,
|
|
autoping=False,
|
|
autoclose=False,
|
|
heartbeat=None,
|
|
compress=0,
|
|
max_msg_size=0,
|
|
) as ws:
|
|
self._notify_health(True)
|
|
async for msg in ws:
|
|
if msg.type == aiohttp.WSMsgType.CLOSED:
|
|
break
|
|
if msg.type == aiohttp.WSMsgType.ERROR:
|
|
raise ws.exception() or RuntimeError("BingX user stream socket error")
|
|
text = self._decode_message(msg)
|
|
if text is None:
|
|
continue
|
|
if text == "Ping" or "ping" in text.lower():
|
|
await ws.send_str("Pong")
|
|
continue
|
|
payload = json.loads(text)
|
|
await self._on_event(payload)
|
|
if payload.get("e") == "listenKeyExpired":
|
|
raise RuntimeError("BingX listen key expired")
|
|
|
|
async def _create_listen_key(self) -> str:
|
|
self._rate_limits.count_ws_listenkey_op()
|
|
response = await self._client.signed_post_raw("/openApi/user/auth/userDataStream", {})
|
|
listen_key = str(response.get("listenKey") or "")
|
|
if not listen_key:
|
|
raise RuntimeError("BingX listen key was empty")
|
|
return listen_key
|
|
|
|
async def _keepalive_loop(self, listen_key: str) -> None:
|
|
interval_secs = int(self._config.ws_listenkey_keepalive_interval_secs)
|
|
while not self._closed.is_set():
|
|
await asyncio.sleep(interval_secs)
|
|
self._rate_limits.count_ws_listenkey_op()
|
|
await self._client.signed_put_raw(
|
|
"/openApi/user/auth/userDataStream",
|
|
{"listenKey": listen_key},
|
|
allow_empty=True,
|
|
)
|
|
|
|
async def _delete_listen_key(self, listen_key: str) -> None:
|
|
self._rate_limits.count_ws_listenkey_op()
|
|
await self._client.signed_delete_raw(
|
|
"/openApi/user/auth/userDataStream",
|
|
{"listenKey": listen_key},
|
|
allow_empty=True,
|
|
)
|
|
|
|
async def _get_session(self) -> aiohttp.ClientSession:
|
|
if self._session is None or self._session.closed:
|
|
timeout = aiohttp.ClientTimeout(total=None, sock_connect=self._config.http_timeout_secs)
|
|
connector = aiohttp.TCPConnector(limit=4, ttl_dns_cache=300)
|
|
self._session = aiohttp.ClientSession(timeout=timeout, connector=connector)
|
|
return self._session
|
|
|
|
def _notify_health(self, healthy: bool) -> None:
|
|
if self._on_health is not None:
|
|
self._on_health(healthy)
|
|
|
|
@staticmethod
|
|
def _decode_message(msg: aiohttp.WSMessage) -> str | None:
|
|
if msg.type == aiohttp.WSMsgType.TEXT:
|
|
return str(msg.data)
|
|
if msg.type == aiohttp.WSMsgType.BINARY:
|
|
data = bytes(msg.data)
|
|
try:
|
|
return gzip.decompress(data).decode("utf-8")
|
|
except OSError:
|
|
return data.decode("utf-8")
|
|
return None
|