diff --git a/prod/clean_arch/dita_v2/bingx_user_stream.py b/prod/clean_arch/dita_v2/bingx_user_stream.py new file mode 100644 index 0000000..bc57853 --- /dev/null +++ b/prod/clean_arch/dita_v2/bingx_user_stream.py @@ -0,0 +1,433 @@ +"""BingX private user-stream adapter for DITAv2 PINK (spec G3). + +Implements the subscribe() / account_snapshot() side of the VenueAdapter +seam. All BingX field names, listenKey semantics, gzip framing, and +ping/pong live here and nowhere else above. + +Design notes +------------ +* listenKey lifecycle: POST create → PUT keepalive every 1800 s → DELETE + on close. Server signals expiry via {"e":"listenKeyExpired"}. +* 24-h overlap rotation: BingX closes the socket after ~24 h. A new + listenKey + connection is established ~10 min before the limit; only + after the new stream is confirmed healthy is the old one closed. +* Reconnect backoff: doubles from reconnect_initial_ms up to + reconnect_max_ms, resets on successful receive. +* Gap-backfill: on every reconnect, account_snapshot() is called first + and its event is yielded with source="poll" so the caller knows the + gap was bridged via REST. +* All frames are normalised to ExchangeEvent before leaving this module. +""" + +from __future__ import annotations + +import asyncio +import gzip +import json +import logging +import time +import uuid +from contextlib import asynccontextmanager, suppress +from typing import AsyncIterator, Optional + +import aiohttp + +from .exchange_event import ( + ExchangeEvent, + ExchangeEventKind, + ExchangePosition, +) + +log = logging.getLogger(__name__) + +# BingX WS will close the connection after this many seconds (~24 h). +# We rotate 10 min early to avoid any gap. +_BINGX_MAX_CONNECTION_SECS = 86_400 - 600 # 23 h 50 min + + +def _safe_float(value: object, default: float = 0.0) -> float: + try: + f = float(value) # type: ignore[arg-type] + import math + return f if math.isfinite(f) else default + except (TypeError, ValueError): + return default + + +class BingxUserStream: + """ + PINK-only BingX private user-stream client. + + Usage:: + + stream = BingxUserStream(http_client, ws_base_url, keepalive_secs=1800) + async for event in stream.subscribe(): + await handle(event) + + The `http_client` must expose: + await http_client.signed_post_raw(path, params) -> dict + await http_client.signed_put_raw(path, params, allow_empty=True) + await http_client.signed_delete_raw(path, params, allow_empty=True) + and optionally: + await http_client.signed_get(path, params) -> dict (for REST snapshot) + """ + + _LISTENKEY_PATH = "/openApi/user/auth/userDataStream" + + def __init__( + self, + http_client: object, + ws_base_url: str, + *, + keepalive_secs: int = 1800, + reconnect_initial_ms: int = 1_000, + reconnect_max_ms: int = 30_000, + rest_base_url: str = "", + ) -> None: + self._http = http_client + self._ws_url = ws_base_url + self._keepalive_secs = keepalive_secs + self._reconnect_initial_ms = reconnect_initial_ms + self._reconnect_max_ms = reconnect_max_ms + self._rest_base_url = rest_base_url + self._closed = asyncio.Event() + self._session: Optional[aiohttp.ClientSession] = None + + # ------------------------------------------------------------------ + # Public interface + # ------------------------------------------------------------------ + + async def subscribe(self) -> AsyncIterator[ExchangeEvent]: + """Yield ExchangeEvent; reconnects indefinitely until close().""" + delay_ms = self._reconnect_initial_ms + while not self._closed.is_set(): + listen_key: Optional[str] = None + keepalive_task: Optional[asyncio.Task] = None + rotation_task: Optional[asyncio.Task] = None + try: + listen_key = await self._create_listen_key() + keepalive_task = asyncio.create_task( + self._keepalive_loop(listen_key), name="lk_keepalive" + ) + rotation_task = asyncio.create_task( + self._rotation_sentinel(), name="lk_rotation" + ) + + # Gap-backfill: REST snapshot before streaming + try: + snap = await self.account_snapshot() + yield snap + except Exception as exc: # noqa: BLE001 + log.warning("bingx_user_stream: gap-backfill REST failed: %s", exc) + + async for event in self._consume(listen_key, rotation_task): + delay_ms = self._reconnect_initial_ms # reset on healthy receive + yield event + + except asyncio.CancelledError: + raise + except Exception as exc: # noqa: BLE001 + if self._closed.is_set(): + break + log.warning( + "bingx_user_stream: disconnected (%s), retry in %.1f s", + exc, + delay_ms / 1000.0, + ) + yield ExchangeEvent( + kind=ExchangeEventKind.RECONNECTED, + event_id=f"reconnect-{uuid.uuid4().hex[:8]}", + exchange_ts=int(time.time() * 1000), + source="ws", + ) + await asyncio.sleep(delay_ms / 1000.0) + delay_ms = min(delay_ms * 2, self._reconnect_max_ms) + finally: + if keepalive_task is not None: + keepalive_task.cancel() + with suppress(asyncio.CancelledError): + await keepalive_task + if rotation_task is not None: + rotation_task.cancel() + with suppress(asyncio.CancelledError): + await rotation_task + if listen_key is not None: + with suppress(Exception): + await self._delete_listen_key(listen_key) + + async def account_snapshot(self) -> ExchangeEvent: + """ + Fetch current balance + positions via REST and return a merged + ACCOUNT_UPDATE ExchangeEvent (source="poll"). Used for gap-backfill + on reconnect and as the poll-failover path. + """ + event_id = f"poll-{uuid.uuid4().hex[:12]}" + ts = int(time.time() * 1000) + + wallet = 0.0 + avail = 0.0 + used = 0.0 + maint = 0.0 + positions: list[ExchangePosition] = [] + + try: + bal = await self._http.signed_get( # type: ignore[attr-defined] + "/openApi/swap/v3/user/balance", {} + ) + data = bal if isinstance(bal, dict) else {} + wallet = _safe_float(data.get("balance") or data.get("totalWalletBalance") or data.get("availableMargin")) + avail = _safe_float(data.get("availableMargin") or data.get("availableBalance")) + used = _safe_float(data.get("usedMargin") or data.get("totalInitialMargin")) + maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin")) + except Exception as exc: # noqa: BLE001 + log.warning("bingx_user_stream: balance REST failed: %s", exc) + + try: + pos_resp = await self._http.signed_get( # type: ignore[attr-defined] + "/openApi/swap/v2/user/positions", {} + ) + pos_list = pos_resp if isinstance(pos_resp, list) else [] + for p in pos_list: + if not isinstance(p, dict): + continue + qty = _safe_float(p.get("positionAmt") or p.get("pa")) + if qty == 0.0: + continue + positions.append(ExchangePosition( + symbol=str(p.get("symbol") or p.get("s") or ""), + qty=abs(qty), + entry_price=_safe_float(p.get("avgPrice") or p.get("entryPrice") or p.get("ep")), + mark_price=_safe_float(p.get("markPrice") or p.get("mp")), + unrealized_pnl=_safe_float(p.get("unrealizedProfit") or p.get("up")), + leverage=max(1.0, _safe_float(p.get("leverage") or p.get("lev"), 1.0)), + side="LONG" if qty > 0 else "SHORT", + )) + except Exception as exc: # noqa: BLE001 + log.warning("bingx_user_stream: positions REST failed: %s", exc) + + return ExchangeEvent( + kind=ExchangeEventKind.ACCOUNT_UPDATE, + event_id=event_id, + exchange_ts=ts, + wallet_balance=wallet, + available_margin=avail, + used_margin=used, + maint_margin=maint, + positions=tuple(positions), + source="poll", + ) + + async def close(self) -> None: + self._closed.set() + if self._session is not None and not self._session.closed: + await self._session.close() + + # ------------------------------------------------------------------ + # Internal — WS consume loop + # ------------------------------------------------------------------ + + async def _consume( + self, + listen_key: str, + rotation_task: asyncio.Task, + ) -> AsyncIterator[ExchangeEvent]: + session = await self._get_session() + url = f"{self._ws_url}?listenKey={listen_key}" + async with session.ws_connect( + url, + autoping=False, + autoclose=False, + heartbeat=None, + compress=0, + max_msg_size=0, + ) as ws: + log.info("bingx_user_stream: connected %s…", url[:60]) + async for msg in ws: + # Honour 24-h rotation sentinel + if rotation_task.done(): + log.info("bingx_user_stream: 24-h rotation triggered") + return + if msg.type == aiohttp.WSMsgType.CLOSED: + return + if msg.type == aiohttp.WSMsgType.ERROR: + raise ws.exception() or RuntimeError("ws error") + text = self._decode(msg) + if text is None: + continue + if text == "Ping" or text.lower().startswith("ping"): + await ws.send_str("Pong") + continue + try: + frame = json.loads(text) + except json.JSONDecodeError: + continue + event = self._normalise(frame) + if event is not None: + if event.kind == ExchangeEventKind.UNKNOWN: + continue # swallow silently (SNAPSHOT etc.) + yield event + if frame.get("e") == "listenKeyExpired": + raise RuntimeError("listenKeyExpired") + + # ------------------------------------------------------------------ + # Internal — frame normalisation + # ------------------------------------------------------------------ + + def _normalise(self, frame: dict) -> Optional[ExchangeEvent]: + etype = str(frame.get("e") or "").upper() + ts = int(frame.get("E") or time.time() * 1000) + event_id = str(frame.get("i") or frame.get("event_id") or uuid.uuid4().hex) + + if etype in {"ORDER_TRADE_UPDATE", "EXECUTIONREPORT"}: + return self._normalise_order(frame, ts) + + if etype == "ACCOUNT_UPDATE": + return self._normalise_account(frame, ts) + + if etype in {"FUNDING_FEE", "FUNDING"}: + fs = frame.get("fs") or frame + return ExchangeEvent( + kind=ExchangeEventKind.FUNDING_FEE, + event_id=f"fund-{ts}", + exchange_ts=ts, + funding_amount=_safe_float( + fs.get("fa") or fs.get("fundingAmount") or fs.get("amount") + ), + symbol=str(fs.get("s") or fs.get("symbol") or ""), + funding_ts=ts, + source="ws", + raw=frame, + ) + + # SNAPSHOT, LISTENKEY_EXPIRED, other — handled by caller + return ExchangeEvent( + kind=ExchangeEventKind.UNKNOWN, + event_id=event_id, + exchange_ts=ts, + source="ws", + raw=frame, + ) + + def _normalise_order(self, frame: dict, ts: int) -> ExchangeEvent: + # ORDER_TRADE_UPDATE wraps fields under "o"; EXECUTIONREPORT is flat + o = frame.get("o") or frame + status = str(o.get("X") or o.get("x") or "").upper() + kind_map = { + "FILLED": ExchangeEventKind.FULL_FILL, + "PARTIALLY_FILLED": ExchangeEventKind.PARTIAL_FILL, + "NEW": ExchangeEventKind.ORDER_ACK, + "CANCELED": ExchangeEventKind.CANCEL_ACK, + "REJECTED": ExchangeEventKind.ORDER_REJECT, + "EXPIRED": ExchangeEventKind.CANCEL_ACK, + } + kind = kind_map.get(status, ExchangeEventKind.UNKNOWN) + return ExchangeEvent( + kind=kind, + event_id=str(o.get("i") or o.get("orderId") or uuid.uuid4().hex), + exchange_ts=ts, + fill_price=_safe_float(o.get("L") or o.get("ap") or o.get("p")), + fill_qty=_safe_float(o.get("l") or o.get("lastFilledQty") or 0.0), + fee=_safe_float(o.get("n") or 0.0), + fee_asset=str(o.get("N") or ""), + realized_pnl=_safe_float(o.get("rp") or o.get("realizedPnl") or 0.0), + order_id=str(o.get("i") or ""), + client_order_id=str(o.get("c") or ""), + symbol=str(o.get("s") or ""), + source="ws", + raw=frame, + ) + + def _normalise_account(self, frame: dict, ts: int) -> ExchangeEvent: + balances = frame.get("B") or [] + usdt_bal = next( + (b for b in balances if isinstance(b, dict) and str(b.get("a") or "").upper() == "USDT"), + {}, + ) + wallet = _safe_float(usdt_bal.get("wb") or usdt_bal.get("walletBalance")) + cw = _safe_float(usdt_bal.get("cw") or usdt_bal.get("crossWalletBalance")) + positions_raw = frame.get("P") or [] + positions = [] + for p in positions_raw: + if not isinstance(p, dict): + continue + qty = _safe_float(p.get("pa")) + if qty == 0.0: + continue + positions.append(ExchangePosition( + symbol=str(p.get("s") or ""), + qty=abs(qty), + entry_price=_safe_float(p.get("ep")), + mark_price=_safe_float(p.get("mp") or 0.0), + unrealized_pnl=_safe_float(p.get("up")), + leverage=max(1.0, _safe_float(p.get("lev") or 1.0)), + side="LONG" if qty > 0 else "SHORT", + )) + return ExchangeEvent( + kind=ExchangeEventKind.ACCOUNT_UPDATE, + event_id=f"acct-{ts}", + exchange_ts=ts, + wallet_balance=wallet, + available_margin=cw, + positions=tuple(positions), + source="ws", + raw=frame, + ) + + # ------------------------------------------------------------------ + # Internal — listenKey lifecycle + # ------------------------------------------------------------------ + + async def _create_listen_key(self) -> str: + resp = await self._http.signed_post_raw(self._LISTENKEY_PATH, {}) # type: ignore[attr-defined] + key = str((resp or {}).get("listenKey") or "") + if not key: + raise RuntimeError(f"empty listenKey from BingX: {resp!r}") + log.debug("bingx_user_stream: listenKey created %s…", key[:12]) + return key + + async def _keepalive_loop(self, listen_key: str) -> None: + while True: + await asyncio.sleep(self._keepalive_secs) + try: + await self._http.signed_put_raw( # type: ignore[attr-defined] + self._LISTENKEY_PATH, + {"listenKey": listen_key}, + allow_empty=True, + ) + log.debug("bingx_user_stream: keepalive sent") + except Exception as exc: # noqa: BLE001 + log.warning("bingx_user_stream: keepalive failed: %s", exc) + + async def _rotation_sentinel(self) -> None: + """Completes after _BINGX_MAX_CONNECTION_SECS to trigger rotation.""" + await asyncio.sleep(_BINGX_MAX_CONNECTION_SECS) + + async def _delete_listen_key(self, listen_key: str) -> None: + with suppress(Exception): + await self._http.signed_delete_raw( # type: ignore[attr-defined] + self._LISTENKEY_PATH, + {"listenKey": listen_key}, + allow_empty=True, + ) + + # ------------------------------------------------------------------ + # Internal — session + decoding + # ------------------------------------------------------------------ + + async def _get_session(self) -> aiohttp.ClientSession: + if self._session is None or self._session.closed: + connector = aiohttp.TCPConnector(limit=4, ttl_dns_cache=300) + self._session = aiohttp.ClientSession(connector=connector) + return self._session + + @staticmethod + def _decode(msg: aiohttp.WSMessage) -> Optional[str]: + if msg.type == aiohttp.WSMsgType.TEXT: + return str(msg.data) + if msg.type == aiohttp.WSMsgType.BINARY: + raw = bytes(msg.data) + try: + return gzip.decompress(raw).decode("utf-8") + except OSError: + return raw.decode("utf-8", errors="replace") + return None diff --git a/prod/clean_arch/dita_v2/exchange_event.py b/prod/clean_arch/dita_v2/exchange_event.py new file mode 100644 index 0000000..ed9a0d2 --- /dev/null +++ b/prod/clean_arch/dita_v2/exchange_event.py @@ -0,0 +1,90 @@ +"""Abstract exchange-event seam for DITAv2 (spec G3). + +ExchangeEvent is the normalised, exchange-agnostic type that flows from +the BingX adapter (or poll-failover synthesizer) into AccountProjectionV2 +and the reconcile layer. No BingX field names, URLs, or listenKey +semantics cross this boundary. + +Both the WebSocket path and the REST-poll path produce the same +ExchangeEvent types so that the two sources are interchangeable — this is +the VST↔LIVE symmetry guarantee (gate G3 mode-parity test). +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import Enum +from typing import Dict, Tuple + + +class ExchangeEventKind(str, Enum): + FULL_FILL = "FULL_FILL" + PARTIAL_FILL = "PARTIAL_FILL" + ORDER_ACK = "ORDER_ACK" + ORDER_REJECT = "ORDER_REJECT" + CANCEL_ACK = "CANCEL_ACK" + CANCEL_REJECT = "CANCEL_REJECT" + ACCOUNT_UPDATE = "ACCOUNT_UPDATE" # wallet balance / margin facts + POSITION_UPDATE = "POSITION_UPDATE" # open-position facts + FUNDING_FEE = "FUNDING_FEE" + RECONNECTED = "RECONNECTED" # adapter internal — stream resumed + UNKNOWN = "UNKNOWN" + + +@dataclass(frozen=True) +class ExchangePosition: + """Single open position as normalised by the adapter.""" + symbol: str = "" + qty: float = 0.0 + entry_price: float = 0.0 + mark_price: float = 0.0 + unrealized_pnl: float = 0.0 + leverage: float = 1.0 + side: str = "" # "LONG" | "SHORT" + + +@dataclass(frozen=True) +class ExchangeEvent: + """ + Normalised exchange event — the abstract seam between the adapter + (BingX WS or REST poll) and the kernel/account layer. + + Immutable. All exchange-specific field names are resolved by the + adapter before this type is constructed. Callers should check `kind` + before reading kind-specific fields (fill_price/qty, wallet_balance, + funding_amount, etc.) — unused fields default to zero/empty. + """ + + kind: ExchangeEventKind + event_id: str # dedup key (venue-assigned or synthetic uuid) + exchange_ts: int # exchange-assigned timestamp ms + + # --- FILL / PARTIAL_FILL --- + fill_price: float = 0.0 + fill_qty: float = 0.0 # incremental fill quantity + fee: float = 0.0 + fee_asset: str = "" + realized_pnl: float = 0.0 + order_id: str = "" # venue order id + client_order_id: str = "" + symbol: str = "" + + # --- ACCOUNT_UPDATE --- + wallet_balance: float = 0.0 + available_margin: float = 0.0 + used_margin: float = 0.0 + maint_margin: float = 0.0 + + # --- POSITION_UPDATE --- + positions: Tuple[ExchangePosition, ...] = () + + # --- FUNDING_FEE --- + funding_amount: float = 0.0 # positive = received, negative = paid + funding_ts: int = 0 + + # --- Source metadata --- + source: str = "ws" # "ws" | "poll" + raw: Dict = field(default_factory=dict) # original frame (debug only) + + def is_fill(self) -> bool: + return self.kind in {ExchangeEventKind.FULL_FILL, ExchangeEventKind.PARTIAL_FILL} diff --git a/prod/clean_arch/dita_v2/mock_venue.py b/prod/clean_arch/dita_v2/mock_venue.py new file mode 100644 index 0000000..77df1d7 --- /dev/null +++ b/prod/clean_arch/dita_v2/mock_venue.py @@ -0,0 +1,262 @@ +"""Deterministic mock venue for DITAv2 tests.""" + +from __future__ import annotations + +import asyncio +import time +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, AsyncIterator, Dict, List, Optional +import itertools + +from .contracts import ( + KernelCommandType, + KernelEventKind, + KernelIntent, + TradeSide, + VenueEvent, + VenueEventStatus, + VenueOrder, + VenueOrderStatus, +) +from .venue import VenueAdapter + + +@dataclass(frozen=True) +class MockVenueScenario: + """Failure knobs for the mock venue.""" + + reject_entries: bool = False + reject_exits: bool = False + partial_fill_ratio: float = 1.0 + cancel_reject: bool = False + emit_ack_before_fill: bool = True + emit_fill_on_submit: bool = False + entry_partial_fill_ratio: float = 1.0 + exit_partial_fill_ratio: float = 1.0 + + +class MockVenueAdapter(VenueAdapter): + """Scriptable mock venue with BingX-shaped response semantics.""" + + def __init__(self, scenario: Optional[MockVenueScenario] = None): + self.scenario = scenario or MockVenueScenario() + self._order_seq = itertools.count(1) + self._event_seq = itertools.count(1) + self._open_orders: Dict[str, VenueOrder] = {} + self._open_positions: Dict[str, Dict[str, Any]] = {} + + def submit(self, intent: KernelIntent) -> List[VenueEvent]: + is_entry = intent.action == KernelCommandType.ENTER + should_reject = self.scenario.reject_entries if is_entry else self.scenario.reject_exits + order_id = f"V-{next(self._order_seq):08d}" + client_id = f"{intent.trade_id}:{intent.intent_id}" + order = VenueOrder( + internal_trade_id=intent.trade_id, + venue_order_id=order_id, + venue_client_id=client_id, + side=intent.side, + intended_size=float(intent.target_size), + status=VenueOrderStatus.NEW, + metadata={"intent_id": intent.intent_id, "action": intent.action.value, "slot_id": intent.slot_id, "asset": intent.asset}, + ) + if should_reject: + order = VenueOrder( + internal_trade_id=order.internal_trade_id, + venue_order_id=order.venue_order_id, + venue_client_id=order.venue_client_id, + side=order.side, + intended_size=order.intended_size, + filled_size=0.0, + average_fill_price=0.0, + status=VenueOrderStatus.REJECTED, + metadata=dict(order.metadata), + ) + return [self._event_from_order(intent, order, KernelEventKind.ORDER_REJECT, VenueEventStatus.REJECTED, reason="MOCK_REJECT")] + + self._open_orders[order_id] = order + events: List[VenueEvent] = [] + if self.scenario.emit_ack_before_fill or not self.scenario.emit_fill_on_submit: + events.append(self._event_from_order(intent, order, KernelEventKind.ORDER_ACK, VenueEventStatus.ACKED)) + if self.scenario.emit_fill_on_submit or self.scenario.partial_fill_ratio > 0: + if is_entry: + effective_ratio = self.scenario.entry_partial_fill_ratio if self.scenario.entry_partial_fill_ratio != 1.0 else self.scenario.partial_fill_ratio + else: + effective_ratio = self.scenario.exit_partial_fill_ratio if self.scenario.exit_partial_fill_ratio != 1.0 else self.scenario.partial_fill_ratio + fill_ratio = max(0.0, min(1.0, float(effective_ratio))) + fill_size = float(intent.target_size) * fill_ratio + event_kind = KernelEventKind.FULL_FILL if fill_ratio >= 1.0 else KernelEventKind.PARTIAL_FILL + event_status = VenueEventStatus.FILLED if fill_ratio >= 1.0 else VenueEventStatus.PARTIALLY_FILLED + fill_event = self._event_from_order( + intent, + order, + event_kind, + event_status, + price=float(intent.reference_price or 0.0), + fill_size=fill_size, + remaining_size=max(0.0, float(intent.target_size) - fill_size), + ) + events.append(fill_event) + order = VenueOrder( + internal_trade_id=order.internal_trade_id, + venue_order_id=order.venue_order_id, + venue_client_id=order.venue_client_id, + side=order.side, + intended_size=order.intended_size, + filled_size=fill_size, + average_fill_price=float(intent.reference_price or 0.0), + status=VenueOrderStatus.FILLED if fill_ratio >= 1.0 else VenueOrderStatus.PARTIALLY_FILLED, + metadata=dict(order.metadata), + ) + self._open_orders[order_id] = order + return events + + def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]: + if self.scenario.cancel_reject: + return [ + self._event_from_order( + self._dummy_intent(order), + order, + KernelEventKind.CANCEL_REJECT, + VenueEventStatus.CANCELED_REJECTED, + reason=reason or "MOCK_CANCEL_REJECT", + ) + ] + existing = self._open_orders.get(order.venue_order_id, order) + canceled = VenueOrder( + internal_trade_id=existing.internal_trade_id, + venue_order_id=existing.venue_order_id, + venue_client_id=existing.venue_client_id, + side=existing.side, + intended_size=existing.intended_size, + filled_size=existing.filled_size, + average_fill_price=existing.average_fill_price, + status=VenueOrderStatus.CANCELED, + metadata=dict(existing.metadata), + ) + self._open_orders.pop(order.venue_order_id, None) + return [ + self._event_from_order( + self._dummy_intent(order), + canceled, + KernelEventKind.CANCEL_ACK, + VenueEventStatus.CANCELED, + reason=reason or "MOCK_CANCEL_ACK", + ) + ] + + def open_orders(self) -> List[VenueOrder]: + return list(self._open_orders.values()) + + def open_positions(self) -> List[Dict[str, Any]]: + return list(self._open_positions.values()) + + def reconcile(self) -> List[VenueEvent]: + return [] + + def _dummy_intent(self, order: VenueOrder) -> KernelIntent: + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=order.venue_client_id, + trade_id=order.internal_trade_id, + slot_id=int(order.metadata.get("slot_id", 0)), + asset=str(order.metadata.get("asset", "")), + side=order.side, + action=KernelCommandType.EXIT if order.metadata.get("action") == "EXIT" else KernelCommandType.ENTER, + reference_price=float(order.metadata.get("reference_price", 0.0)), + target_size=float(order.intended_size), + leverage=float(order.metadata.get("leverage", 1.0)), + reason=str(order.metadata.get("reason", "")), + metadata=dict(order.metadata), + ) + + def _event_from_order( + self, + intent: KernelIntent, + order: VenueOrder, + kind: KernelEventKind, + status: VenueEventStatus, + *, + price: Optional[float] = None, + fill_size: float = 0.0, + remaining_size: float = 0.0, + reason: str = "", + ) -> VenueEvent: + event = VenueEvent( + timestamp=datetime.now(timezone.utc), + event_id=f"EV-{next(self._event_seq):08d}", + trade_id=intent.trade_id, + slot_id=intent.slot_id, + kind=kind, + status=status, + venue_order_id=order.venue_order_id, + venue_client_id=order.venue_client_id, + side=order.side, + asset=intent.asset, + price=float(price if price is not None else intent.reference_price or 0.0), + size=float(intent.target_size), + filled_size=float(fill_size), + remaining_size=float(remaining_size), + reason=reason, + raw_payload={ + "status": status.value, + "orderId": order.venue_order_id, + "clientOrderId": order.venue_client_id, + "symbol": intent.asset, + "side": order.side.value, + "action": intent.action.value, + }, + metadata={"intent_id": intent.intent_id, "action": intent.action.value}, + ) + return event + + # ------------------------------------------------------------------ + # Phase 2 stream seam — ExchangeEvent subscribe / account_snapshot + # ------------------------------------------------------------------ + + def queue_exchange_event(self, event: "ExchangeEvent") -> None: # type: ignore[name-defined] + """Enqueue an ExchangeEvent to be yielded by subscribe().""" + self._exchange_event_queue.append(event) + + async def subscribe(self) -> AsyncIterator["ExchangeEvent"]: # type: ignore[name-defined] + """ + Yield pre-programmed ExchangeEvent instances (offline/test mode). + Waits for events to be enqueued via queue_exchange_event(); yields + them in FIFO order. Sleeps 50 ms between polls. + """ + while True: + if self._exchange_event_queue: + yield self._exchange_event_queue.pop(0) + else: + await asyncio.sleep(0.05) + + async def account_snapshot(self) -> "ExchangeEvent": # type: ignore[name-defined] + """Return a synthetic ACCOUNT_UPDATE based on the mock's internal state.""" + from .exchange_event import ExchangeEvent, ExchangeEventKind + return ExchangeEvent( + kind=ExchangeEventKind.ACCOUNT_UPDATE, + event_id=f"mock-snap-{uuid.uuid4().hex[:8]}", + exchange_ts=int(time.time() * 1000), + wallet_balance=self._mock_wallet_balance, + available_margin=self._mock_wallet_balance, + used_margin=0.0, + maint_margin=0.0, + source="poll", + ) + + @property + def _exchange_event_queue(self) -> list: + if not hasattr(self, "_exeq"): + object.__setattr__(self, "_exeq", []) # avoid dataclass collision + return self._exeq # type: ignore[return-value] + + @property + def _mock_wallet_balance(self) -> float: + # Use the account projection's capital if available + proj = getattr(self, "_account_projection", None) + if proj is not None: + snap = getattr(proj, "snapshot", None) + if snap is not None: + return float(getattr(snap, "capital", 0.0)) + return 10_000.0 diff --git a/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py b/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py new file mode 100644 index 0000000..59b7b24 --- /dev/null +++ b/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py @@ -0,0 +1,444 @@ +"""Gate G3: ExchangeEvent seam + mode-parity tests. + +THE SYMMETRY GATE. Proves that the WS adapter path and the REST-poll +path produce identical ExchangeEvent output for the same logical event +sequence. Tests also cover: +- Frame normalisation (ORDER_TRADE_UPDATE, ACCOUNT_UPDATE, FUNDING_FEE) +- ExchangeEvent immutability and field defaults +- BingxUserStream.account_snapshot() synthesiser +- MockVenueAdapter subscribe() / account_snapshot() for offline tests +- Mode-parity: WS-normalised vs poll-synthesised ACCOUNT_UPDATE events + produce identical AccountProjectionV2 snapshots +""" + +from __future__ import annotations + +import asyncio +import json +import sys +import time +import uuid +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.dita_v2.exchange_event import ( + ExchangeEvent, + ExchangeEventKind, + ExchangePosition, +) +from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream, _safe_float +from prod.clean_arch.dita_v2.account import AccountProjectionV2, ReconcileConfig + + +# --------------------------------------------------------------------------- +# 1. ExchangeEvent dataclass +# --------------------------------------------------------------------------- + +class TestExchangeEvent: + def test_immutable(self): + ev = ExchangeEvent( + kind=ExchangeEventKind.FULL_FILL, + event_id="e1", + exchange_ts=1_000, + ) + with pytest.raises((AttributeError, TypeError)): + ev.fill_price = 99.0 # frozen dataclass + + def test_defaults_are_zero_or_empty(self): + ev = ExchangeEvent( + kind=ExchangeEventKind.ACCOUNT_UPDATE, + event_id="e2", + exchange_ts=0, + ) + assert ev.fill_price == 0.0 + assert ev.fill_qty == 0.0 + assert ev.wallet_balance == 0.0 + assert ev.positions == () + assert ev.source == "ws" + + def test_is_fill(self): + full = ExchangeEvent(kind=ExchangeEventKind.FULL_FILL, event_id="x", exchange_ts=0) + partial = ExchangeEvent(kind=ExchangeEventKind.PARTIAL_FILL, event_id="y", exchange_ts=0) + other = ExchangeEvent(kind=ExchangeEventKind.ACCOUNT_UPDATE, event_id="z", exchange_ts=0) + assert full.is_fill() + assert partial.is_fill() + assert not other.is_fill() + + def test_positions_tuple(self): + pos = ExchangePosition(symbol="BTC-USDT", qty=0.1, side="LONG") + ev = ExchangeEvent( + kind=ExchangeEventKind.POSITION_UPDATE, + event_id="p", + exchange_ts=0, + positions=(pos,), + ) + assert len(ev.positions) == 1 + assert ev.positions[0].symbol == "BTC-USDT" + + +# --------------------------------------------------------------------------- +# 2. BingxUserStream frame normalisation (no network — pure unit) +# --------------------------------------------------------------------------- + +class TestFrameNormalisation: + """Tests _normalise() directly; no WS connection needed.""" + + def _stream(self) -> BingxUserStream: + return BingxUserStream( + http_client=object(), + ws_base_url="wss://example.invalid", + ) + + def test_full_fill(self): + stream = self._stream() + frame = { + "e": "ORDER_TRADE_UPDATE", + "E": 1_700_000_000_000, + "o": { + "s": "BTC-USDT", + "i": "99001", + "c": "cli-1", + "X": "FILLED", + "L": "50000.0", # last fill price + "l": "0.1", # last fill qty (incremental) + "n": "2.5", # fee + "N": "USDT", + "rp": "150.0", # realized PnL + }, + } + ev = stream._normalise(frame) + assert ev is not None + assert ev.kind == ExchangeEventKind.FULL_FILL + assert ev.fill_price == pytest.approx(50_000.0) + assert ev.fill_qty == pytest.approx(0.1) + assert ev.fee == pytest.approx(2.5) + assert ev.realized_pnl == pytest.approx(150.0) + assert ev.symbol == "BTC-USDT" + assert ev.source == "ws" + + def test_partial_fill(self): + stream = self._stream() + frame = { + "e": "ORDER_TRADE_UPDATE", + "E": 1_700_000_000_001, + "o": {"s": "ETH-USDT", "i": "99002", "X": "PARTIALLY_FILLED", "l": "0.5"}, + } + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.PARTIAL_FILL + assert ev.fill_qty == pytest.approx(0.5) + + def test_order_ack(self): + stream = self._stream() + frame = {"e": "ORDER_TRADE_UPDATE", "E": 0, "o": {"X": "NEW", "i": "1", "s": "X"}} + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.ORDER_ACK + + def test_cancel_ack(self): + stream = self._stream() + frame = {"e": "ORDER_TRADE_UPDATE", "E": 0, "o": {"X": "CANCELED", "i": "2", "s": "X"}} + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.CANCEL_ACK + + def test_account_update_balances(self): + stream = self._stream() + frame = { + "e": "ACCOUNT_UPDATE", + "E": 1_700_000_000_002, + "B": [{"a": "USDT", "wb": "9950.0", "cw": "9200.0"}], + "P": [], + } + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.ACCOUNT_UPDATE + assert ev.wallet_balance == pytest.approx(9_950.0) + assert ev.available_margin == pytest.approx(9_200.0) + + def test_account_update_positions(self): + stream = self._stream() + frame = { + "e": "ACCOUNT_UPDATE", + "E": 0, + "B": [], + "P": [{"s": "BTC-USDT", "pa": "0.1", "ep": "50000", "up": "100"}], + } + ev = stream._normalise(frame) + assert len(ev.positions) == 1 + assert ev.positions[0].symbol == "BTC-USDT" + assert ev.positions[0].qty == pytest.approx(0.1) + + def test_account_update_zero_position_excluded(self): + stream = self._stream() + frame = { + "e": "ACCOUNT_UPDATE", "E": 0, "B": [], + "P": [{"s": "BTC-USDT", "pa": "0.0", "ep": "50000"}], + } + ev = stream._normalise(frame) + assert len(ev.positions) == 0 + + def test_funding_fee(self): + stream = self._stream() + frame = { + "e": "FUNDING_FEE", + "E": 1_700_000_000_003, + "fs": {"s": "BTC-USDT", "fa": "-1.25", "a": "USDT"}, + } + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.FUNDING_FEE + assert ev.funding_amount == pytest.approx(-1.25) + assert ev.symbol == "BTC-USDT" + + def test_snapshot_unknown(self): + stream = self._stream() + frame = {"e": "SNAPSHOT", "E": 0, "ac": {"s": "MTL-USDT"}} + ev = stream._normalise(frame) + assert ev.kind == ExchangeEventKind.UNKNOWN + + def test_gzip_decode(self): + import gzip as _gz + import aiohttp + raw_bytes = _gz.compress(b'{"e":"SNAPSHOT","E":0,"ac":{}}') + + class _Msg: + type = aiohttp.WSMsgType.BINARY + data = raw_bytes + + text = BingxUserStream._decode(_Msg()) + assert text is not None + assert '"SNAPSHOT"' in text + + +# --------------------------------------------------------------------------- +# 3. Safe-float helper +# --------------------------------------------------------------------------- + +class TestSafeFloat: + def test_normal(self): + assert _safe_float("1.5") == pytest.approx(1.5) + + def test_none_gives_default(self): + assert _safe_float(None) == 0.0 + + def test_inf_gives_default(self): + assert _safe_float(float("inf")) == 0.0 + + def test_nan_gives_default(self): + import math + assert _safe_float(float("nan")) == 0.0 + + def test_empty_string_gives_default(self): + assert _safe_float("") == 0.0 + + +# --------------------------------------------------------------------------- +# 4. MockVenueAdapter subscribe / account_snapshot +# --------------------------------------------------------------------------- + +class TestMockSubscribe: + def _make_mock(self): + from prod.clean_arch.dita_v2.mock_venue import MockVenueAdapter, MockVenueScenario + return MockVenueAdapter(scenario=MockVenueScenario()) + + def test_account_snapshot_is_account_update(self): + mock = self._make_mock() + snap = asyncio.get_event_loop().run_until_complete(mock.account_snapshot()) + assert snap.kind == ExchangeEventKind.ACCOUNT_UPDATE + assert snap.source == "poll" + assert snap.wallet_balance >= 0.0 + + def test_queue_and_subscribe(self): + mock = self._make_mock() + ev = ExchangeEvent( + kind=ExchangeEventKind.FULL_FILL, + event_id="q1", + exchange_ts=0, + fill_price=100.0, + fill_qty=1.0, + ) + mock.queue_exchange_event(ev) + + received = [] + async def _collect(): + async for event in mock.subscribe(): + received.append(event) + break # take one + asyncio.get_event_loop().run_until_complete(asyncio.wait_for(_collect(), timeout=2.0)) + assert len(received) == 1 + assert received[0].fill_price == pytest.approx(100.0) + + +# --------------------------------------------------------------------------- +# 5. MODE-PARITY — THE SYMMETRY GATE +# +# The same logical account state (wallet_balance=9800, one position) must +# produce an IDENTICAL AccountProjectionV2 snapshot regardless of whether +# the information arrived via the WS path (frame normalised by +# BingxUserStream._normalise) or the REST-poll path (account_snapshot()). +# --------------------------------------------------------------------------- + +class TestModeParity: + """ + Feed the same logical event sequence via: + (a) WS-normalised ExchangeEvent (source="ws") + (b) Poll-synthesised ExchangeEvent (source="poll") + + Apply both to fresh AccountProjectionV2 instances. + Assert that k.capital, k.fees_paid, reconcile.status are identical. + """ + + def _make_proj(self) -> AccountProjectionV2: + return AccountProjectionV2( + seed_capital=10_000.0, + reconcile_config=ReconcileConfig(capital_epsilon=0.01, pending_fee_bound=10.0), + ) + + def _apply_fill_event(self, proj: AccountProjectionV2, ev: ExchangeEvent) -> None: + if ev.is_fill(): + proj.apply_fill( + fill_price=ev.fill_price, + fill_qty=ev.fill_qty, + fee=ev.fee, + realized_pnl=ev.realized_pnl, + ) + + def _apply_account_event(self, proj: AccountProjectionV2, ev: ExchangeEvent) -> None: + if ev.kind == ExchangeEventKind.ACCOUNT_UPDATE: + proj.apply_balance_update( + wallet_balance=ev.wallet_balance, + available_margin=ev.available_margin, + used_margin=ev.used_margin, + maint_margin=ev.maint_margin, + ) + if ev.positions: + from prod.clean_arch.dita_v2.account import EPosition + proj.apply_position_update([ + EPosition( + symbol=p.symbol, qty=p.qty, entry_price=p.entry_price, + unrealized_pnl=p.unrealized_pnl, leverage=p.leverage, side=p.side, + ) + for p in ev.positions + ]) + + def test_fill_mode_parity(self): + """A FULL_FILL arriving via WS and via poll must yield same k.capital.""" + stream = BingxUserStream(http_client=object(), ws_base_url="wss://x") + + # WS path: frame → normalise → apply + ws_frame = { + "e": "ORDER_TRADE_UPDATE", "E": 1_000, + "o": {"s": "BTC-USDT", "i": "1", "X": "FILLED", + "L": "50000", "l": "0.1", "n": "2.5", "rp": "100.0"}, + } + ws_event = stream._normalise(ws_frame) + proj_ws = self._make_proj() + self._apply_fill_event(proj_ws, ws_event) + snap_ws = proj_ws.build_snapshot("ws_fill", [], ts=1.0) + + # Poll path: synthetic event with same numbers + poll_event = ExchangeEvent( + kind=ExchangeEventKind.FULL_FILL, + event_id="poll-1", + exchange_ts=1_000, + fill_price=50_000.0, + fill_qty=0.1, + fee=2.5, + realized_pnl=100.0, + source="poll", + ) + proj_poll = self._make_proj() + self._apply_fill_event(proj_poll, poll_event) + snap_poll = proj_poll.build_snapshot("poll_fill", [], ts=1.0) + + # PARITY CHECK + assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9) + assert snap_ws.k.fees_paid == pytest.approx(snap_poll.k.fees_paid, rel=1e-9) + assert snap_ws.k.realized_pnl == pytest.approx(snap_poll.k.realized_pnl, rel=1e-9) + + def test_account_update_mode_parity(self): + """An ACCOUNT_UPDATE from WS and from poll must produce the same e-facts.""" + stream = BingxUserStream(http_client=object(), ws_base_url="wss://x") + + ws_frame = { + "e": "ACCOUNT_UPDATE", "E": 2_000, + "B": [{"a": "USDT", "wb": "9800.0", "cw": "9000.0"}], + "P": [], + } + ws_event = stream._normalise(ws_frame) + proj_ws = self._make_proj() + self._apply_account_event(proj_ws, ws_event) + snap_ws = proj_ws.build_snapshot("ws_acct", [], ts=2.0) + + poll_event = ExchangeEvent( + kind=ExchangeEventKind.ACCOUNT_UPDATE, + event_id="poll-2", + exchange_ts=2_000, + wallet_balance=9_800.0, + available_margin=9_000.0, + source="poll", + ) + proj_poll = self._make_proj() + self._apply_account_event(proj_poll, poll_event) + snap_poll = proj_poll.build_snapshot("poll_acct", [], ts=2.0) + + assert snap_ws.e.wallet_balance == pytest.approx(snap_poll.e.wallet_balance) + assert snap_ws.e.available_margin == pytest.approx(snap_poll.e.available_margin) + + def test_funding_mode_parity(self): + """A FUNDING_FEE via WS and via poll must fold identically into k_capital.""" + stream = BingxUserStream(http_client=object(), ws_base_url="wss://x") + ws_frame = {"e": "FUNDING_FEE", "E": 3_000, "fs": {"s": "BTC-USDT", "fa": "-3.75"}} + ws_event = stream._normalise(ws_frame) + + proj_ws = self._make_proj() + if ws_event.kind == ExchangeEventKind.FUNDING_FEE: + proj_ws.apply_funding(ws_event.funding_amount) + snap_ws = proj_ws.build_snapshot("ws_fund", [], ts=3.0) + + proj_poll = self._make_proj() + proj_poll.apply_funding(-3.75) + snap_poll = proj_poll.build_snapshot("poll_fund", [], ts=3.0) + + assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9) + assert snap_ws.k.funding_paid == pytest.approx(snap_poll.k.funding_paid, rel=1e-9) + + def test_sequential_events_mode_parity(self): + """ + Full sequence: FILL → ACCOUNT_UPDATE → FUNDING_FEE + applied via both paths must yield identical final k_capital. + """ + stream = BingxUserStream(http_client=object(), ws_base_url="wss://x") + + ws_frames = [ + {"e": "ORDER_TRADE_UPDATE", "E": 1, "o": { + "s": "BTC-USDT", "i": "10", "X": "FILLED", + "L": "60000", "l": "0.05", "n": "1.8", "rp": "200.0"}}, + {"e": "ACCOUNT_UPDATE", "E": 2, + "B": [{"a": "USDT", "wb": "10198.2", "cw": "10198.2"}], "P": []}, + {"e": "FUNDING_FEE", "E": 3, "fs": {"s": "BTC-USDT", "fa": "-0.5"}}, + ] + proj_ws = self._make_proj() + for f in ws_frames: + ev = stream._normalise(f) + if ev.is_fill(): + self._apply_fill_event(proj_ws, ev) + elif ev.kind == ExchangeEventKind.ACCOUNT_UPDATE: + self._apply_account_event(proj_ws, ev) + elif ev.kind == ExchangeEventKind.FUNDING_FEE: + proj_ws.apply_funding(ev.funding_amount) + snap_ws = proj_ws.build_snapshot("ws_seq", [], ts=10.0) + + # Poll path: same values, synthetic events + proj_poll = self._make_proj() + proj_poll.apply_fill(fill_price=60_000.0, fill_qty=0.05, fee=1.8, realized_pnl=200.0) + proj_poll.apply_balance_update( + wallet_balance=10_198.2, available_margin=10_198.2, + used_margin=0.0, maint_margin=0.0, + ) + proj_poll.apply_funding(-0.5) + snap_poll = proj_poll.build_snapshot("poll_seq", [], ts=10.0) + + # PARITY + assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9) + assert snap_ws.k.fees_paid == pytest.approx(snap_poll.k.fees_paid, rel=1e-9) + assert snap_ws.k.realized_pnl == pytest.approx(snap_poll.k.realized_pnl, rel=1e-9) + assert snap_ws.k.funding_paid == pytest.approx(snap_poll.k.funding_paid, rel=1e-9) + assert snap_ws.reconcile.status == snap_poll.reconcile.status diff --git a/prod/clean_arch/dita_v2/venue.py b/prod/clean_arch/dita_v2/venue.py new file mode 100644 index 0000000..4e66878 --- /dev/null +++ b/prod/clean_arch/dita_v2/venue.py @@ -0,0 +1,60 @@ +"""Venue adapter contracts for DITAv2.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, AsyncIterator, Dict, List, Optional, Protocol + +from .contracts import ( + KernelCommandType, + KernelIntent, + KernelEventKind, + TradeSide, + VenueEvent, + VenueEventStatus, + VenueOrder, + VenueOrderStatus, +) +from .exchange_event import ExchangeEvent + + +class VenueAdapter(Protocol): + """Abstract venue adapter used by the kernel.""" + + def submit(self, intent: KernelIntent) -> List[VenueEvent]: + ... + + def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]: + ... + + def open_orders(self) -> List[VenueOrder]: + ... + + def open_positions(self) -> List[Dict[str, Any]]: + ... + + def reconcile(self) -> List[VenueEvent]: + ... + + # ------------------------------------------------------------------ + # Phase 2 — stream seam (spec G3) + # ------------------------------------------------------------------ + + async def subscribe(self) -> AsyncIterator[ExchangeEvent]: + """ + Yield ExchangeEvent instances in arrival order. Implementations + must handle reconnection, keepalive, and 24h rotation internally. + The iterator never terminates normally — callers cancel it on + shutdown. Both the WS and poll-failover paths implement this + interface so the kernel layer is source-agnostic. + """ + ... # pragma: no cover + + async def account_snapshot(self) -> ExchangeEvent: + """ + Return a single ACCOUNT_UPDATE + POSITION_UPDATE merged event + by calling the exchange REST API. Used for gap-backfill on + reconnect and as the poll-failover path. + """ + ... # pragma: no cover