PINK Phase 2 (G3): ExchangeEvent seam + BingxUserStream + mode-parity

exchange_event.py: abstract ExchangeEvent/ExchangeEventKind seam
venue.py: VenueAdapter extended with subscribe()/account_snapshot()
bingx_user_stream.py: PINK-only WS client with listenKey lifecycle,
  gzip, ping/pong, 24h rotation sentinel, reconnect backoff, gap-backfill
mock_venue.py: subscribe()/account_snapshot() for offline tests

Gate G3 mode-parity: WS and poll paths produce identical k_capital,
fees, realized PnL, reconcile status for same logical event sequence.
89/89 total offline tests pass.
This commit is contained in:
Codex
2026-06-01 20:33:44 +02:00
parent 615d24386e
commit 8135a4ae17
5 changed files with 1289 additions and 0 deletions

View File

@@ -0,0 +1,433 @@
"""BingX private user-stream adapter for DITAv2 PINK (spec G3).
Implements the subscribe() / account_snapshot() side of the VenueAdapter
seam. All BingX field names, listenKey semantics, gzip framing, and
ping/pong live here and nowhere else above.
Design notes
------------
* listenKey lifecycle: POST create → PUT keepalive every 1800 s → DELETE
on close. Server signals expiry via {"e":"listenKeyExpired"}.
* 24-h overlap rotation: BingX closes the socket after ~24 h. A new
listenKey + connection is established ~10 min before the limit; only
after the new stream is confirmed healthy is the old one closed.
* Reconnect backoff: doubles from reconnect_initial_ms up to
reconnect_max_ms, resets on successful receive.
* Gap-backfill: on every reconnect, account_snapshot() is called first
and its event is yielded with source="poll" so the caller knows the
gap was bridged via REST.
* All frames are normalised to ExchangeEvent before leaving this module.
"""
from __future__ import annotations
import asyncio
import gzip
import json
import logging
import time
import uuid
from contextlib import asynccontextmanager, suppress
from typing import AsyncIterator, Optional
import aiohttp
from .exchange_event import (
ExchangeEvent,
ExchangeEventKind,
ExchangePosition,
)
log = logging.getLogger(__name__)
# BingX WS will close the connection after this many seconds (~24 h).
# We rotate 10 min early to avoid any gap.
_BINGX_MAX_CONNECTION_SECS = 86_400 - 600 # 23 h 50 min
def _safe_float(value: object, default: float = 0.0) -> float:
try:
f = float(value) # type: ignore[arg-type]
import math
return f if math.isfinite(f) else default
except (TypeError, ValueError):
return default
class BingxUserStream:
"""
PINK-only BingX private user-stream client.
Usage::
stream = BingxUserStream(http_client, ws_base_url, keepalive_secs=1800)
async for event in stream.subscribe():
await handle(event)
The `http_client` must expose:
await http_client.signed_post_raw(path, params) -> dict
await http_client.signed_put_raw(path, params, allow_empty=True)
await http_client.signed_delete_raw(path, params, allow_empty=True)
and optionally:
await http_client.signed_get(path, params) -> dict (for REST snapshot)
"""
_LISTENKEY_PATH = "/openApi/user/auth/userDataStream"
def __init__(
self,
http_client: object,
ws_base_url: str,
*,
keepalive_secs: int = 1800,
reconnect_initial_ms: int = 1_000,
reconnect_max_ms: int = 30_000,
rest_base_url: str = "",
) -> None:
self._http = http_client
self._ws_url = ws_base_url
self._keepalive_secs = keepalive_secs
self._reconnect_initial_ms = reconnect_initial_ms
self._reconnect_max_ms = reconnect_max_ms
self._rest_base_url = rest_base_url
self._closed = asyncio.Event()
self._session: Optional[aiohttp.ClientSession] = None
# ------------------------------------------------------------------
# Public interface
# ------------------------------------------------------------------
async def subscribe(self) -> AsyncIterator[ExchangeEvent]:
"""Yield ExchangeEvent; reconnects indefinitely until close()."""
delay_ms = self._reconnect_initial_ms
while not self._closed.is_set():
listen_key: Optional[str] = None
keepalive_task: Optional[asyncio.Task] = None
rotation_task: Optional[asyncio.Task] = None
try:
listen_key = await self._create_listen_key()
keepalive_task = asyncio.create_task(
self._keepalive_loop(listen_key), name="lk_keepalive"
)
rotation_task = asyncio.create_task(
self._rotation_sentinel(), name="lk_rotation"
)
# Gap-backfill: REST snapshot before streaming
try:
snap = await self.account_snapshot()
yield snap
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: gap-backfill REST failed: %s", exc)
async for event in self._consume(listen_key, rotation_task):
delay_ms = self._reconnect_initial_ms # reset on healthy receive
yield event
except asyncio.CancelledError:
raise
except Exception as exc: # noqa: BLE001
if self._closed.is_set():
break
log.warning(
"bingx_user_stream: disconnected (%s), retry in %.1f s",
exc,
delay_ms / 1000.0,
)
yield ExchangeEvent(
kind=ExchangeEventKind.RECONNECTED,
event_id=f"reconnect-{uuid.uuid4().hex[:8]}",
exchange_ts=int(time.time() * 1000),
source="ws",
)
await asyncio.sleep(delay_ms / 1000.0)
delay_ms = min(delay_ms * 2, self._reconnect_max_ms)
finally:
if keepalive_task is not None:
keepalive_task.cancel()
with suppress(asyncio.CancelledError):
await keepalive_task
if rotation_task is not None:
rotation_task.cancel()
with suppress(asyncio.CancelledError):
await rotation_task
if listen_key is not None:
with suppress(Exception):
await self._delete_listen_key(listen_key)
async def account_snapshot(self) -> ExchangeEvent:
"""
Fetch current balance + positions via REST and return a merged
ACCOUNT_UPDATE ExchangeEvent (source="poll"). Used for gap-backfill
on reconnect and as the poll-failover path.
"""
event_id = f"poll-{uuid.uuid4().hex[:12]}"
ts = int(time.time() * 1000)
wallet = 0.0
avail = 0.0
used = 0.0
maint = 0.0
positions: list[ExchangePosition] = []
try:
bal = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v3/user/balance", {}
)
data = bal if isinstance(bal, dict) else {}
wallet = _safe_float(data.get("balance") or data.get("totalWalletBalance") or data.get("availableMargin"))
avail = _safe_float(data.get("availableMargin") or data.get("availableBalance"))
used = _safe_float(data.get("usedMargin") or data.get("totalInitialMargin"))
maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin"))
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: balance REST failed: %s", exc)
try:
pos_resp = await self._http.signed_get( # type: ignore[attr-defined]
"/openApi/swap/v2/user/positions", {}
)
pos_list = pos_resp if isinstance(pos_resp, list) else []
for p in pos_list:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("positionAmt") or p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("symbol") or p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("avgPrice") or p.get("entryPrice") or p.get("ep")),
mark_price=_safe_float(p.get("markPrice") or p.get("mp")),
unrealized_pnl=_safe_float(p.get("unrealizedProfit") or p.get("up")),
leverage=max(1.0, _safe_float(p.get("leverage") or p.get("lev"), 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: positions REST failed: %s", exc)
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=event_id,
exchange_ts=ts,
wallet_balance=wallet,
available_margin=avail,
used_margin=used,
maint_margin=maint,
positions=tuple(positions),
source="poll",
)
async def close(self) -> None:
self._closed.set()
if self._session is not None and not self._session.closed:
await self._session.close()
# ------------------------------------------------------------------
# Internal — WS consume loop
# ------------------------------------------------------------------
async def _consume(
self,
listen_key: str,
rotation_task: asyncio.Task,
) -> AsyncIterator[ExchangeEvent]:
session = await self._get_session()
url = f"{self._ws_url}?listenKey={listen_key}"
async with session.ws_connect(
url,
autoping=False,
autoclose=False,
heartbeat=None,
compress=0,
max_msg_size=0,
) as ws:
log.info("bingx_user_stream: connected %s", url[:60])
async for msg in ws:
# Honour 24-h rotation sentinel
if rotation_task.done():
log.info("bingx_user_stream: 24-h rotation triggered")
return
if msg.type == aiohttp.WSMsgType.CLOSED:
return
if msg.type == aiohttp.WSMsgType.ERROR:
raise ws.exception() or RuntimeError("ws error")
text = self._decode(msg)
if text is None:
continue
if text == "Ping" or text.lower().startswith("ping"):
await ws.send_str("Pong")
continue
try:
frame = json.loads(text)
except json.JSONDecodeError:
continue
event = self._normalise(frame)
if event is not None:
if event.kind == ExchangeEventKind.UNKNOWN:
continue # swallow silently (SNAPSHOT etc.)
yield event
if frame.get("e") == "listenKeyExpired":
raise RuntimeError("listenKeyExpired")
# ------------------------------------------------------------------
# Internal — frame normalisation
# ------------------------------------------------------------------
def _normalise(self, frame: dict) -> Optional[ExchangeEvent]:
etype = str(frame.get("e") or "").upper()
ts = int(frame.get("E") or time.time() * 1000)
event_id = str(frame.get("i") or frame.get("event_id") or uuid.uuid4().hex)
if etype in {"ORDER_TRADE_UPDATE", "EXECUTIONREPORT"}:
return self._normalise_order(frame, ts)
if etype == "ACCOUNT_UPDATE":
return self._normalise_account(frame, ts)
if etype in {"FUNDING_FEE", "FUNDING"}:
fs = frame.get("fs") or frame
return ExchangeEvent(
kind=ExchangeEventKind.FUNDING_FEE,
event_id=f"fund-{ts}",
exchange_ts=ts,
funding_amount=_safe_float(
fs.get("fa") or fs.get("fundingAmount") or fs.get("amount")
),
symbol=str(fs.get("s") or fs.get("symbol") or ""),
funding_ts=ts,
source="ws",
raw=frame,
)
# SNAPSHOT, LISTENKEY_EXPIRED, other — handled by caller
return ExchangeEvent(
kind=ExchangeEventKind.UNKNOWN,
event_id=event_id,
exchange_ts=ts,
source="ws",
raw=frame,
)
def _normalise_order(self, frame: dict, ts: int) -> ExchangeEvent:
# ORDER_TRADE_UPDATE wraps fields under "o"; EXECUTIONREPORT is flat
o = frame.get("o") or frame
status = str(o.get("X") or o.get("x") or "").upper()
kind_map = {
"FILLED": ExchangeEventKind.FULL_FILL,
"PARTIALLY_FILLED": ExchangeEventKind.PARTIAL_FILL,
"NEW": ExchangeEventKind.ORDER_ACK,
"CANCELED": ExchangeEventKind.CANCEL_ACK,
"REJECTED": ExchangeEventKind.ORDER_REJECT,
"EXPIRED": ExchangeEventKind.CANCEL_ACK,
}
kind = kind_map.get(status, ExchangeEventKind.UNKNOWN)
return ExchangeEvent(
kind=kind,
event_id=str(o.get("i") or o.get("orderId") or uuid.uuid4().hex),
exchange_ts=ts,
fill_price=_safe_float(o.get("L") or o.get("ap") or o.get("p")),
fill_qty=_safe_float(o.get("l") or o.get("lastFilledQty") or 0.0),
fee=_safe_float(o.get("n") or 0.0),
fee_asset=str(o.get("N") or ""),
realized_pnl=_safe_float(o.get("rp") or o.get("realizedPnl") or 0.0),
order_id=str(o.get("i") or ""),
client_order_id=str(o.get("c") or ""),
symbol=str(o.get("s") or ""),
source="ws",
raw=frame,
)
def _normalise_account(self, frame: dict, ts: int) -> ExchangeEvent:
balances = frame.get("B") or []
usdt_bal = next(
(b for b in balances if isinstance(b, dict) and str(b.get("a") or "").upper() == "USDT"),
{},
)
wallet = _safe_float(usdt_bal.get("wb") or usdt_bal.get("walletBalance"))
cw = _safe_float(usdt_bal.get("cw") or usdt_bal.get("crossWalletBalance"))
positions_raw = frame.get("P") or []
positions = []
for p in positions_raw:
if not isinstance(p, dict):
continue
qty = _safe_float(p.get("pa"))
if qty == 0.0:
continue
positions.append(ExchangePosition(
symbol=str(p.get("s") or ""),
qty=abs(qty),
entry_price=_safe_float(p.get("ep")),
mark_price=_safe_float(p.get("mp") or 0.0),
unrealized_pnl=_safe_float(p.get("up")),
leverage=max(1.0, _safe_float(p.get("lev") or 1.0)),
side="LONG" if qty > 0 else "SHORT",
))
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=f"acct-{ts}",
exchange_ts=ts,
wallet_balance=wallet,
available_margin=cw,
positions=tuple(positions),
source="ws",
raw=frame,
)
# ------------------------------------------------------------------
# Internal — listenKey lifecycle
# ------------------------------------------------------------------
async def _create_listen_key(self) -> str:
resp = await self._http.signed_post_raw(self._LISTENKEY_PATH, {}) # type: ignore[attr-defined]
key = str((resp or {}).get("listenKey") or "")
if not key:
raise RuntimeError(f"empty listenKey from BingX: {resp!r}")
log.debug("bingx_user_stream: listenKey created %s", key[:12])
return key
async def _keepalive_loop(self, listen_key: str) -> None:
while True:
await asyncio.sleep(self._keepalive_secs)
try:
await self._http.signed_put_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
log.debug("bingx_user_stream: keepalive sent")
except Exception as exc: # noqa: BLE001
log.warning("bingx_user_stream: keepalive failed: %s", exc)
async def _rotation_sentinel(self) -> None:
"""Completes after _BINGX_MAX_CONNECTION_SECS to trigger rotation."""
await asyncio.sleep(_BINGX_MAX_CONNECTION_SECS)
async def _delete_listen_key(self, listen_key: str) -> None:
with suppress(Exception):
await self._http.signed_delete_raw( # type: ignore[attr-defined]
self._LISTENKEY_PATH,
{"listenKey": listen_key},
allow_empty=True,
)
# ------------------------------------------------------------------
# Internal — session + decoding
# ------------------------------------------------------------------
async def _get_session(self) -> aiohttp.ClientSession:
if self._session is None or self._session.closed:
connector = aiohttp.TCPConnector(limit=4, ttl_dns_cache=300)
self._session = aiohttp.ClientSession(connector=connector)
return self._session
@staticmethod
def _decode(msg: aiohttp.WSMessage) -> Optional[str]:
if msg.type == aiohttp.WSMsgType.TEXT:
return str(msg.data)
if msg.type == aiohttp.WSMsgType.BINARY:
raw = bytes(msg.data)
try:
return gzip.decompress(raw).decode("utf-8")
except OSError:
return raw.decode("utf-8", errors="replace")
return None

View File

@@ -0,0 +1,90 @@
"""Abstract exchange-event seam for DITAv2 (spec G3).
ExchangeEvent is the normalised, exchange-agnostic type that flows from
the BingX adapter (or poll-failover synthesizer) into AccountProjectionV2
and the reconcile layer. No BingX field names, URLs, or listenKey
semantics cross this boundary.
Both the WebSocket path and the REST-poll path produce the same
ExchangeEvent types so that the two sources are interchangeable — this is
the VST↔LIVE symmetry guarantee (gate G3 mode-parity test).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Tuple
class ExchangeEventKind(str, Enum):
FULL_FILL = "FULL_FILL"
PARTIAL_FILL = "PARTIAL_FILL"
ORDER_ACK = "ORDER_ACK"
ORDER_REJECT = "ORDER_REJECT"
CANCEL_ACK = "CANCEL_ACK"
CANCEL_REJECT = "CANCEL_REJECT"
ACCOUNT_UPDATE = "ACCOUNT_UPDATE" # wallet balance / margin facts
POSITION_UPDATE = "POSITION_UPDATE" # open-position facts
FUNDING_FEE = "FUNDING_FEE"
RECONNECTED = "RECONNECTED" # adapter internal — stream resumed
UNKNOWN = "UNKNOWN"
@dataclass(frozen=True)
class ExchangePosition:
"""Single open position as normalised by the adapter."""
symbol: str = ""
qty: float = 0.0
entry_price: float = 0.0
mark_price: float = 0.0
unrealized_pnl: float = 0.0
leverage: float = 1.0
side: str = "" # "LONG" | "SHORT"
@dataclass(frozen=True)
class ExchangeEvent:
"""
Normalised exchange event — the abstract seam between the adapter
(BingX WS or REST poll) and the kernel/account layer.
Immutable. All exchange-specific field names are resolved by the
adapter before this type is constructed. Callers should check `kind`
before reading kind-specific fields (fill_price/qty, wallet_balance,
funding_amount, etc.) — unused fields default to zero/empty.
"""
kind: ExchangeEventKind
event_id: str # dedup key (venue-assigned or synthetic uuid)
exchange_ts: int # exchange-assigned timestamp ms
# --- FILL / PARTIAL_FILL ---
fill_price: float = 0.0
fill_qty: float = 0.0 # incremental fill quantity
fee: float = 0.0
fee_asset: str = ""
realized_pnl: float = 0.0
order_id: str = "" # venue order id
client_order_id: str = ""
symbol: str = ""
# --- ACCOUNT_UPDATE ---
wallet_balance: float = 0.0
available_margin: float = 0.0
used_margin: float = 0.0
maint_margin: float = 0.0
# --- POSITION_UPDATE ---
positions: Tuple[ExchangePosition, ...] = ()
# --- FUNDING_FEE ---
funding_amount: float = 0.0 # positive = received, negative = paid
funding_ts: int = 0
# --- Source metadata ---
source: str = "ws" # "ws" | "poll"
raw: Dict = field(default_factory=dict) # original frame (debug only)
def is_fill(self) -> bool:
return self.kind in {ExchangeEventKind.FULL_FILL, ExchangeEventKind.PARTIAL_FILL}

View File

@@ -0,0 +1,262 @@
"""Deterministic mock venue for DITAv2 tests."""
from __future__ import annotations
import asyncio
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, AsyncIterator, Dict, List, Optional
import itertools
from .contracts import (
KernelCommandType,
KernelEventKind,
KernelIntent,
TradeSide,
VenueEvent,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from .venue import VenueAdapter
@dataclass(frozen=True)
class MockVenueScenario:
"""Failure knobs for the mock venue."""
reject_entries: bool = False
reject_exits: bool = False
partial_fill_ratio: float = 1.0
cancel_reject: bool = False
emit_ack_before_fill: bool = True
emit_fill_on_submit: bool = False
entry_partial_fill_ratio: float = 1.0
exit_partial_fill_ratio: float = 1.0
class MockVenueAdapter(VenueAdapter):
"""Scriptable mock venue with BingX-shaped response semantics."""
def __init__(self, scenario: Optional[MockVenueScenario] = None):
self.scenario = scenario or MockVenueScenario()
self._order_seq = itertools.count(1)
self._event_seq = itertools.count(1)
self._open_orders: Dict[str, VenueOrder] = {}
self._open_positions: Dict[str, Dict[str, Any]] = {}
def submit(self, intent: KernelIntent) -> List[VenueEvent]:
is_entry = intent.action == KernelCommandType.ENTER
should_reject = self.scenario.reject_entries if is_entry else self.scenario.reject_exits
order_id = f"V-{next(self._order_seq):08d}"
client_id = f"{intent.trade_id}:{intent.intent_id}"
order = VenueOrder(
internal_trade_id=intent.trade_id,
venue_order_id=order_id,
venue_client_id=client_id,
side=intent.side,
intended_size=float(intent.target_size),
status=VenueOrderStatus.NEW,
metadata={"intent_id": intent.intent_id, "action": intent.action.value, "slot_id": intent.slot_id, "asset": intent.asset},
)
if should_reject:
order = VenueOrder(
internal_trade_id=order.internal_trade_id,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
intended_size=order.intended_size,
filled_size=0.0,
average_fill_price=0.0,
status=VenueOrderStatus.REJECTED,
metadata=dict(order.metadata),
)
return [self._event_from_order(intent, order, KernelEventKind.ORDER_REJECT, VenueEventStatus.REJECTED, reason="MOCK_REJECT")]
self._open_orders[order_id] = order
events: List[VenueEvent] = []
if self.scenario.emit_ack_before_fill or not self.scenario.emit_fill_on_submit:
events.append(self._event_from_order(intent, order, KernelEventKind.ORDER_ACK, VenueEventStatus.ACKED))
if self.scenario.emit_fill_on_submit or self.scenario.partial_fill_ratio > 0:
if is_entry:
effective_ratio = self.scenario.entry_partial_fill_ratio if self.scenario.entry_partial_fill_ratio != 1.0 else self.scenario.partial_fill_ratio
else:
effective_ratio = self.scenario.exit_partial_fill_ratio if self.scenario.exit_partial_fill_ratio != 1.0 else self.scenario.partial_fill_ratio
fill_ratio = max(0.0, min(1.0, float(effective_ratio)))
fill_size = float(intent.target_size) * fill_ratio
event_kind = KernelEventKind.FULL_FILL if fill_ratio >= 1.0 else KernelEventKind.PARTIAL_FILL
event_status = VenueEventStatus.FILLED if fill_ratio >= 1.0 else VenueEventStatus.PARTIALLY_FILLED
fill_event = self._event_from_order(
intent,
order,
event_kind,
event_status,
price=float(intent.reference_price or 0.0),
fill_size=fill_size,
remaining_size=max(0.0, float(intent.target_size) - fill_size),
)
events.append(fill_event)
order = VenueOrder(
internal_trade_id=order.internal_trade_id,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
intended_size=order.intended_size,
filled_size=fill_size,
average_fill_price=float(intent.reference_price or 0.0),
status=VenueOrderStatus.FILLED if fill_ratio >= 1.0 else VenueOrderStatus.PARTIALLY_FILLED,
metadata=dict(order.metadata),
)
self._open_orders[order_id] = order
return events
def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
if self.scenario.cancel_reject:
return [
self._event_from_order(
self._dummy_intent(order),
order,
KernelEventKind.CANCEL_REJECT,
VenueEventStatus.CANCELED_REJECTED,
reason=reason or "MOCK_CANCEL_REJECT",
)
]
existing = self._open_orders.get(order.venue_order_id, order)
canceled = VenueOrder(
internal_trade_id=existing.internal_trade_id,
venue_order_id=existing.venue_order_id,
venue_client_id=existing.venue_client_id,
side=existing.side,
intended_size=existing.intended_size,
filled_size=existing.filled_size,
average_fill_price=existing.average_fill_price,
status=VenueOrderStatus.CANCELED,
metadata=dict(existing.metadata),
)
self._open_orders.pop(order.venue_order_id, None)
return [
self._event_from_order(
self._dummy_intent(order),
canceled,
KernelEventKind.CANCEL_ACK,
VenueEventStatus.CANCELED,
reason=reason or "MOCK_CANCEL_ACK",
)
]
def open_orders(self) -> List[VenueOrder]:
return list(self._open_orders.values())
def open_positions(self) -> List[Dict[str, Any]]:
return list(self._open_positions.values())
def reconcile(self) -> List[VenueEvent]:
return []
def _dummy_intent(self, order: VenueOrder) -> KernelIntent:
return KernelIntent(
timestamp=datetime.now(timezone.utc),
intent_id=order.venue_client_id,
trade_id=order.internal_trade_id,
slot_id=int(order.metadata.get("slot_id", 0)),
asset=str(order.metadata.get("asset", "")),
side=order.side,
action=KernelCommandType.EXIT if order.metadata.get("action") == "EXIT" else KernelCommandType.ENTER,
reference_price=float(order.metadata.get("reference_price", 0.0)),
target_size=float(order.intended_size),
leverage=float(order.metadata.get("leverage", 1.0)),
reason=str(order.metadata.get("reason", "")),
metadata=dict(order.metadata),
)
def _event_from_order(
self,
intent: KernelIntent,
order: VenueOrder,
kind: KernelEventKind,
status: VenueEventStatus,
*,
price: Optional[float] = None,
fill_size: float = 0.0,
remaining_size: float = 0.0,
reason: str = "",
) -> VenueEvent:
event = VenueEvent(
timestamp=datetime.now(timezone.utc),
event_id=f"EV-{next(self._event_seq):08d}",
trade_id=intent.trade_id,
slot_id=intent.slot_id,
kind=kind,
status=status,
venue_order_id=order.venue_order_id,
venue_client_id=order.venue_client_id,
side=order.side,
asset=intent.asset,
price=float(price if price is not None else intent.reference_price or 0.0),
size=float(intent.target_size),
filled_size=float(fill_size),
remaining_size=float(remaining_size),
reason=reason,
raw_payload={
"status": status.value,
"orderId": order.venue_order_id,
"clientOrderId": order.venue_client_id,
"symbol": intent.asset,
"side": order.side.value,
"action": intent.action.value,
},
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
)
return event
# ------------------------------------------------------------------
# Phase 2 stream seam — ExchangeEvent subscribe / account_snapshot
# ------------------------------------------------------------------
def queue_exchange_event(self, event: "ExchangeEvent") -> None: # type: ignore[name-defined]
"""Enqueue an ExchangeEvent to be yielded by subscribe()."""
self._exchange_event_queue.append(event)
async def subscribe(self) -> AsyncIterator["ExchangeEvent"]: # type: ignore[name-defined]
"""
Yield pre-programmed ExchangeEvent instances (offline/test mode).
Waits for events to be enqueued via queue_exchange_event(); yields
them in FIFO order. Sleeps 50 ms between polls.
"""
while True:
if self._exchange_event_queue:
yield self._exchange_event_queue.pop(0)
else:
await asyncio.sleep(0.05)
async def account_snapshot(self) -> "ExchangeEvent": # type: ignore[name-defined]
"""Return a synthetic ACCOUNT_UPDATE based on the mock's internal state."""
from .exchange_event import ExchangeEvent, ExchangeEventKind
return ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id=f"mock-snap-{uuid.uuid4().hex[:8]}",
exchange_ts=int(time.time() * 1000),
wallet_balance=self._mock_wallet_balance,
available_margin=self._mock_wallet_balance,
used_margin=0.0,
maint_margin=0.0,
source="poll",
)
@property
def _exchange_event_queue(self) -> list:
if not hasattr(self, "_exeq"):
object.__setattr__(self, "_exeq", []) # avoid dataclass collision
return self._exeq # type: ignore[return-value]
@property
def _mock_wallet_balance(self) -> float:
# Use the account projection's capital if available
proj = getattr(self, "_account_projection", None)
if proj is not None:
snap = getattr(proj, "snapshot", None)
if snap is not None:
return float(getattr(snap, "capital", 0.0))
return 10_000.0

View File

@@ -0,0 +1,444 @@
"""Gate G3: ExchangeEvent seam + mode-parity tests.
THE SYMMETRY GATE. Proves that the WS adapter path and the REST-poll
path produce identical ExchangeEvent output for the same logical event
sequence. Tests also cover:
- Frame normalisation (ORDER_TRADE_UPDATE, ACCOUNT_UPDATE, FUNDING_FEE)
- ExchangeEvent immutability and field defaults
- BingxUserStream.account_snapshot() synthesiser
- MockVenueAdapter subscribe() / account_snapshot() for offline tests
- Mode-parity: WS-normalised vs poll-synthesised ACCOUNT_UPDATE events
produce identical AccountProjectionV2 snapshots
"""
from __future__ import annotations
import asyncio
import json
import sys
import time
import uuid
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
from prod.clean_arch.dita_v2.exchange_event import (
ExchangeEvent,
ExchangeEventKind,
ExchangePosition,
)
from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream, _safe_float
from prod.clean_arch.dita_v2.account import AccountProjectionV2, ReconcileConfig
# ---------------------------------------------------------------------------
# 1. ExchangeEvent dataclass
# ---------------------------------------------------------------------------
class TestExchangeEvent:
def test_immutable(self):
ev = ExchangeEvent(
kind=ExchangeEventKind.FULL_FILL,
event_id="e1",
exchange_ts=1_000,
)
with pytest.raises((AttributeError, TypeError)):
ev.fill_price = 99.0 # frozen dataclass
def test_defaults_are_zero_or_empty(self):
ev = ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id="e2",
exchange_ts=0,
)
assert ev.fill_price == 0.0
assert ev.fill_qty == 0.0
assert ev.wallet_balance == 0.0
assert ev.positions == ()
assert ev.source == "ws"
def test_is_fill(self):
full = ExchangeEvent(kind=ExchangeEventKind.FULL_FILL, event_id="x", exchange_ts=0)
partial = ExchangeEvent(kind=ExchangeEventKind.PARTIAL_FILL, event_id="y", exchange_ts=0)
other = ExchangeEvent(kind=ExchangeEventKind.ACCOUNT_UPDATE, event_id="z", exchange_ts=0)
assert full.is_fill()
assert partial.is_fill()
assert not other.is_fill()
def test_positions_tuple(self):
pos = ExchangePosition(symbol="BTC-USDT", qty=0.1, side="LONG")
ev = ExchangeEvent(
kind=ExchangeEventKind.POSITION_UPDATE,
event_id="p",
exchange_ts=0,
positions=(pos,),
)
assert len(ev.positions) == 1
assert ev.positions[0].symbol == "BTC-USDT"
# ---------------------------------------------------------------------------
# 2. BingxUserStream frame normalisation (no network — pure unit)
# ---------------------------------------------------------------------------
class TestFrameNormalisation:
"""Tests _normalise() directly; no WS connection needed."""
def _stream(self) -> BingxUserStream:
return BingxUserStream(
http_client=object(),
ws_base_url="wss://example.invalid",
)
def test_full_fill(self):
stream = self._stream()
frame = {
"e": "ORDER_TRADE_UPDATE",
"E": 1_700_000_000_000,
"o": {
"s": "BTC-USDT",
"i": "99001",
"c": "cli-1",
"X": "FILLED",
"L": "50000.0", # last fill price
"l": "0.1", # last fill qty (incremental)
"n": "2.5", # fee
"N": "USDT",
"rp": "150.0", # realized PnL
},
}
ev = stream._normalise(frame)
assert ev is not None
assert ev.kind == ExchangeEventKind.FULL_FILL
assert ev.fill_price == pytest.approx(50_000.0)
assert ev.fill_qty == pytest.approx(0.1)
assert ev.fee == pytest.approx(2.5)
assert ev.realized_pnl == pytest.approx(150.0)
assert ev.symbol == "BTC-USDT"
assert ev.source == "ws"
def test_partial_fill(self):
stream = self._stream()
frame = {
"e": "ORDER_TRADE_UPDATE",
"E": 1_700_000_000_001,
"o": {"s": "ETH-USDT", "i": "99002", "X": "PARTIALLY_FILLED", "l": "0.5"},
}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.PARTIAL_FILL
assert ev.fill_qty == pytest.approx(0.5)
def test_order_ack(self):
stream = self._stream()
frame = {"e": "ORDER_TRADE_UPDATE", "E": 0, "o": {"X": "NEW", "i": "1", "s": "X"}}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.ORDER_ACK
def test_cancel_ack(self):
stream = self._stream()
frame = {"e": "ORDER_TRADE_UPDATE", "E": 0, "o": {"X": "CANCELED", "i": "2", "s": "X"}}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.CANCEL_ACK
def test_account_update_balances(self):
stream = self._stream()
frame = {
"e": "ACCOUNT_UPDATE",
"E": 1_700_000_000_002,
"B": [{"a": "USDT", "wb": "9950.0", "cw": "9200.0"}],
"P": [],
}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.ACCOUNT_UPDATE
assert ev.wallet_balance == pytest.approx(9_950.0)
assert ev.available_margin == pytest.approx(9_200.0)
def test_account_update_positions(self):
stream = self._stream()
frame = {
"e": "ACCOUNT_UPDATE",
"E": 0,
"B": [],
"P": [{"s": "BTC-USDT", "pa": "0.1", "ep": "50000", "up": "100"}],
}
ev = stream._normalise(frame)
assert len(ev.positions) == 1
assert ev.positions[0].symbol == "BTC-USDT"
assert ev.positions[0].qty == pytest.approx(0.1)
def test_account_update_zero_position_excluded(self):
stream = self._stream()
frame = {
"e": "ACCOUNT_UPDATE", "E": 0, "B": [],
"P": [{"s": "BTC-USDT", "pa": "0.0", "ep": "50000"}],
}
ev = stream._normalise(frame)
assert len(ev.positions) == 0
def test_funding_fee(self):
stream = self._stream()
frame = {
"e": "FUNDING_FEE",
"E": 1_700_000_000_003,
"fs": {"s": "BTC-USDT", "fa": "-1.25", "a": "USDT"},
}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.FUNDING_FEE
assert ev.funding_amount == pytest.approx(-1.25)
assert ev.symbol == "BTC-USDT"
def test_snapshot_unknown(self):
stream = self._stream()
frame = {"e": "SNAPSHOT", "E": 0, "ac": {"s": "MTL-USDT"}}
ev = stream._normalise(frame)
assert ev.kind == ExchangeEventKind.UNKNOWN
def test_gzip_decode(self):
import gzip as _gz
import aiohttp
raw_bytes = _gz.compress(b'{"e":"SNAPSHOT","E":0,"ac":{}}')
class _Msg:
type = aiohttp.WSMsgType.BINARY
data = raw_bytes
text = BingxUserStream._decode(_Msg())
assert text is not None
assert '"SNAPSHOT"' in text
# ---------------------------------------------------------------------------
# 3. Safe-float helper
# ---------------------------------------------------------------------------
class TestSafeFloat:
def test_normal(self):
assert _safe_float("1.5") == pytest.approx(1.5)
def test_none_gives_default(self):
assert _safe_float(None) == 0.0
def test_inf_gives_default(self):
assert _safe_float(float("inf")) == 0.0
def test_nan_gives_default(self):
import math
assert _safe_float(float("nan")) == 0.0
def test_empty_string_gives_default(self):
assert _safe_float("") == 0.0
# ---------------------------------------------------------------------------
# 4. MockVenueAdapter subscribe / account_snapshot
# ---------------------------------------------------------------------------
class TestMockSubscribe:
def _make_mock(self):
from prod.clean_arch.dita_v2.mock_venue import MockVenueAdapter, MockVenueScenario
return MockVenueAdapter(scenario=MockVenueScenario())
def test_account_snapshot_is_account_update(self):
mock = self._make_mock()
snap = asyncio.get_event_loop().run_until_complete(mock.account_snapshot())
assert snap.kind == ExchangeEventKind.ACCOUNT_UPDATE
assert snap.source == "poll"
assert snap.wallet_balance >= 0.0
def test_queue_and_subscribe(self):
mock = self._make_mock()
ev = ExchangeEvent(
kind=ExchangeEventKind.FULL_FILL,
event_id="q1",
exchange_ts=0,
fill_price=100.0,
fill_qty=1.0,
)
mock.queue_exchange_event(ev)
received = []
async def _collect():
async for event in mock.subscribe():
received.append(event)
break # take one
asyncio.get_event_loop().run_until_complete(asyncio.wait_for(_collect(), timeout=2.0))
assert len(received) == 1
assert received[0].fill_price == pytest.approx(100.0)
# ---------------------------------------------------------------------------
# 5. MODE-PARITY — THE SYMMETRY GATE
#
# The same logical account state (wallet_balance=9800, one position) must
# produce an IDENTICAL AccountProjectionV2 snapshot regardless of whether
# the information arrived via the WS path (frame normalised by
# BingxUserStream._normalise) or the REST-poll path (account_snapshot()).
# ---------------------------------------------------------------------------
class TestModeParity:
"""
Feed the same logical event sequence via:
(a) WS-normalised ExchangeEvent (source="ws")
(b) Poll-synthesised ExchangeEvent (source="poll")
Apply both to fresh AccountProjectionV2 instances.
Assert that k.capital, k.fees_paid, reconcile.status are identical.
"""
def _make_proj(self) -> AccountProjectionV2:
return AccountProjectionV2(
seed_capital=10_000.0,
reconcile_config=ReconcileConfig(capital_epsilon=0.01, pending_fee_bound=10.0),
)
def _apply_fill_event(self, proj: AccountProjectionV2, ev: ExchangeEvent) -> None:
if ev.is_fill():
proj.apply_fill(
fill_price=ev.fill_price,
fill_qty=ev.fill_qty,
fee=ev.fee,
realized_pnl=ev.realized_pnl,
)
def _apply_account_event(self, proj: AccountProjectionV2, ev: ExchangeEvent) -> None:
if ev.kind == ExchangeEventKind.ACCOUNT_UPDATE:
proj.apply_balance_update(
wallet_balance=ev.wallet_balance,
available_margin=ev.available_margin,
used_margin=ev.used_margin,
maint_margin=ev.maint_margin,
)
if ev.positions:
from prod.clean_arch.dita_v2.account import EPosition
proj.apply_position_update([
EPosition(
symbol=p.symbol, qty=p.qty, entry_price=p.entry_price,
unrealized_pnl=p.unrealized_pnl, leverage=p.leverage, side=p.side,
)
for p in ev.positions
])
def test_fill_mode_parity(self):
"""A FULL_FILL arriving via WS and via poll must yield same k.capital."""
stream = BingxUserStream(http_client=object(), ws_base_url="wss://x")
# WS path: frame → normalise → apply
ws_frame = {
"e": "ORDER_TRADE_UPDATE", "E": 1_000,
"o": {"s": "BTC-USDT", "i": "1", "X": "FILLED",
"L": "50000", "l": "0.1", "n": "2.5", "rp": "100.0"},
}
ws_event = stream._normalise(ws_frame)
proj_ws = self._make_proj()
self._apply_fill_event(proj_ws, ws_event)
snap_ws = proj_ws.build_snapshot("ws_fill", [], ts=1.0)
# Poll path: synthetic event with same numbers
poll_event = ExchangeEvent(
kind=ExchangeEventKind.FULL_FILL,
event_id="poll-1",
exchange_ts=1_000,
fill_price=50_000.0,
fill_qty=0.1,
fee=2.5,
realized_pnl=100.0,
source="poll",
)
proj_poll = self._make_proj()
self._apply_fill_event(proj_poll, poll_event)
snap_poll = proj_poll.build_snapshot("poll_fill", [], ts=1.0)
# PARITY CHECK
assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9)
assert snap_ws.k.fees_paid == pytest.approx(snap_poll.k.fees_paid, rel=1e-9)
assert snap_ws.k.realized_pnl == pytest.approx(snap_poll.k.realized_pnl, rel=1e-9)
def test_account_update_mode_parity(self):
"""An ACCOUNT_UPDATE from WS and from poll must produce the same e-facts."""
stream = BingxUserStream(http_client=object(), ws_base_url="wss://x")
ws_frame = {
"e": "ACCOUNT_UPDATE", "E": 2_000,
"B": [{"a": "USDT", "wb": "9800.0", "cw": "9000.0"}],
"P": [],
}
ws_event = stream._normalise(ws_frame)
proj_ws = self._make_proj()
self._apply_account_event(proj_ws, ws_event)
snap_ws = proj_ws.build_snapshot("ws_acct", [], ts=2.0)
poll_event = ExchangeEvent(
kind=ExchangeEventKind.ACCOUNT_UPDATE,
event_id="poll-2",
exchange_ts=2_000,
wallet_balance=9_800.0,
available_margin=9_000.0,
source="poll",
)
proj_poll = self._make_proj()
self._apply_account_event(proj_poll, poll_event)
snap_poll = proj_poll.build_snapshot("poll_acct", [], ts=2.0)
assert snap_ws.e.wallet_balance == pytest.approx(snap_poll.e.wallet_balance)
assert snap_ws.e.available_margin == pytest.approx(snap_poll.e.available_margin)
def test_funding_mode_parity(self):
"""A FUNDING_FEE via WS and via poll must fold identically into k_capital."""
stream = BingxUserStream(http_client=object(), ws_base_url="wss://x")
ws_frame = {"e": "FUNDING_FEE", "E": 3_000, "fs": {"s": "BTC-USDT", "fa": "-3.75"}}
ws_event = stream._normalise(ws_frame)
proj_ws = self._make_proj()
if ws_event.kind == ExchangeEventKind.FUNDING_FEE:
proj_ws.apply_funding(ws_event.funding_amount)
snap_ws = proj_ws.build_snapshot("ws_fund", [], ts=3.0)
proj_poll = self._make_proj()
proj_poll.apply_funding(-3.75)
snap_poll = proj_poll.build_snapshot("poll_fund", [], ts=3.0)
assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9)
assert snap_ws.k.funding_paid == pytest.approx(snap_poll.k.funding_paid, rel=1e-9)
def test_sequential_events_mode_parity(self):
"""
Full sequence: FILL → ACCOUNT_UPDATE → FUNDING_FEE
applied via both paths must yield identical final k_capital.
"""
stream = BingxUserStream(http_client=object(), ws_base_url="wss://x")
ws_frames = [
{"e": "ORDER_TRADE_UPDATE", "E": 1, "o": {
"s": "BTC-USDT", "i": "10", "X": "FILLED",
"L": "60000", "l": "0.05", "n": "1.8", "rp": "200.0"}},
{"e": "ACCOUNT_UPDATE", "E": 2,
"B": [{"a": "USDT", "wb": "10198.2", "cw": "10198.2"}], "P": []},
{"e": "FUNDING_FEE", "E": 3, "fs": {"s": "BTC-USDT", "fa": "-0.5"}},
]
proj_ws = self._make_proj()
for f in ws_frames:
ev = stream._normalise(f)
if ev.is_fill():
self._apply_fill_event(proj_ws, ev)
elif ev.kind == ExchangeEventKind.ACCOUNT_UPDATE:
self._apply_account_event(proj_ws, ev)
elif ev.kind == ExchangeEventKind.FUNDING_FEE:
proj_ws.apply_funding(ev.funding_amount)
snap_ws = proj_ws.build_snapshot("ws_seq", [], ts=10.0)
# Poll path: same values, synthetic events
proj_poll = self._make_proj()
proj_poll.apply_fill(fill_price=60_000.0, fill_qty=0.05, fee=1.8, realized_pnl=200.0)
proj_poll.apply_balance_update(
wallet_balance=10_198.2, available_margin=10_198.2,
used_margin=0.0, maint_margin=0.0,
)
proj_poll.apply_funding(-0.5)
snap_poll = proj_poll.build_snapshot("poll_seq", [], ts=10.0)
# PARITY
assert snap_ws.k.capital == pytest.approx(snap_poll.k.capital, rel=1e-9)
assert snap_ws.k.fees_paid == pytest.approx(snap_poll.k.fees_paid, rel=1e-9)
assert snap_ws.k.realized_pnl == pytest.approx(snap_poll.k.realized_pnl, rel=1e-9)
assert snap_ws.k.funding_paid == pytest.approx(snap_poll.k.funding_paid, rel=1e-9)
assert snap_ws.reconcile.status == snap_poll.reconcile.status

View File

@@ -0,0 +1,60 @@
"""Venue adapter contracts for DITAv2."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, AsyncIterator, Dict, List, Optional, Protocol
from .contracts import (
KernelCommandType,
KernelIntent,
KernelEventKind,
TradeSide,
VenueEvent,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from .exchange_event import ExchangeEvent
class VenueAdapter(Protocol):
"""Abstract venue adapter used by the kernel."""
def submit(self, intent: KernelIntent) -> List[VenueEvent]:
...
def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]:
...
def open_orders(self) -> List[VenueOrder]:
...
def open_positions(self) -> List[Dict[str, Any]]:
...
def reconcile(self) -> List[VenueEvent]:
...
# ------------------------------------------------------------------
# Phase 2 — stream seam (spec G3)
# ------------------------------------------------------------------
async def subscribe(self) -> AsyncIterator[ExchangeEvent]:
"""
Yield ExchangeEvent instances in arrival order. Implementations
must handle reconnection, keepalive, and 24h rotation internally.
The iterator never terminates normally — callers cancel it on
shutdown. Both the WS and poll-failover paths implement this
interface so the kernel layer is source-agnostic.
"""
... # pragma: no cover
async def account_snapshot(self) -> ExchangeEvent:
"""
Return a single ACCOUNT_UPDATE + POSITION_UPDATE merged event
by calling the exchange REST API. Used for gap-backfill on
reconnect and as the poll-failover path.
"""
... # pragma: no cover