Files
siloqy/prod/tests/test_dita_v2_bingx_adapter.py

392 lines
12 KiB
Python
Raw Normal View History

from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any
import pytest
from prod.clean_arch.dita_v2 import (
BingxVenueAdapter,
ExecutionKernel,
InMemoryControlPlane,
KernelCommandType,
KernelControlSnapshot,
KernelIntent,
KernelMode,
KernelEventKind,
KernelVerbosity,
TradeSide,
TradeStage,
VenueEventStatus,
VenueOrder,
VenueOrderStatus,
)
from prod.clean_arch.ports.execution import ExchangeStateSnapshot, ExecutionReceipt
def _norm_symbol(symbol: str) -> str:
return str(symbol or "").replace("-", "").replace("_", "").upper()
def _snapshot(
*,
capital: float = 25_000.0,
positions: list[dict[str, Any]] | None = None,
open_orders: list[dict[str, Any]] | None = None,
all_orders: list[dict[str, Any]] | None = None,
all_fills: list[dict[str, Any]] | None = None,
source: str = "bingx",
) -> ExchangeStateSnapshot:
position_map = {
_norm_symbol(str(row.get("symbol", ""))): dict(row)
for row in (positions or [])
if _norm_symbol(str(row.get("symbol", "")))
}
return ExchangeStateSnapshot(
timestamp=datetime.now(timezone.utc),
capital=capital,
equity=capital,
open_positions=position_map,
open_orders=[dict(row) for row in (open_orders or [])],
all_orders=[dict(row) for row in (all_orders or [])],
all_fills=[dict(row) for row in (all_fills or [])],
account={"balances": [{"asset": "USDT", "total": capital}]},
open_notional=0.0,
source=source,
recovered=False,
)
class FakeBingxBackend:
def __init__(
self,
*,
snapshots: list[ExchangeStateSnapshot],
receipt: ExecutionReceipt | None = None,
cancel_response: dict[str, Any] | None = None,
) -> None:
self.snapshots = snapshots
self.receipt = receipt
self.cancel_response = cancel_response or {"status": "CANCELED"}
self.calls: list[tuple[str, Any]] = []
self.submitted: list[Any] = []
self.canceled: list[tuple[Any, str]] = []
self._refresh_count = 0
self.connected = False
async def connect(self) -> bool:
self.connected = True
self.calls.append(("connect", None))
return True
async def disconnect(self) -> None:
self.connected = False
self.calls.append(("disconnect", None))
async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
self.calls.append(("refresh_state", symbol, include_history))
index = min(self._refresh_count, len(self.snapshots) - 1)
snapshot = self.snapshots[index]
if self._refresh_count < len(self.snapshots) - 1:
self._refresh_count += 1
return snapshot
async def submit_intent(self, legacy_intent: Any) -> ExecutionReceipt:
self.calls.append(("submit_intent", legacy_intent.trade_id))
self.submitted.append(legacy_intent)
if self.receipt is None:
raise AssertionError("receipt must be configured")
return self.receipt
async def cancel_order(self, order: VenueOrder, *, reason: str = "") -> dict[str, Any]:
self.calls.append(("cancel_order", order.venue_order_id, reason))
self.canceled.append((order, reason))
return dict(self.cancel_response)
def _intent(
*,
action: KernelCommandType = KernelCommandType.ENTER,
trade_id: str = "trade-1",
slot_id: int = 0,
asset: str = "BTCUSDT",
side: TradeSide = TradeSide.SHORT,
target_size: float = 1.0,
leverage: float = 2.0,
reference_price: float = 75_000.0,
reason: str = "TEST",
) -> KernelIntent:
return KernelIntent(
timestamp=datetime.now(timezone.utc),
intent_id=f"{trade_id}:{action.value}",
trade_id=trade_id,
slot_id=slot_id,
asset=asset,
side=side,
action=action,
reference_price=reference_price,
target_size=target_size,
leverage=leverage,
reason=reason,
)
def test_submit_maps_bingx_ack_and_snapshot_fill_to_ditav2_events() -> None:
ack_row = {
"orderId": "1001",
"clientOrderId": "cid-1",
"clientOrderID": "cid-1",
"symbol": "BTC-USDT",
"status": "NEW",
"executedQty": "0",
"cumFilledQty": "0",
}
fill_row = {
"clientOrderId": "cid-1",
"clientOrderID": "cid-1",
"orderId": "1001",
"symbol": "BTC-USDT",
"status": "FILLED",
"executedQty": "1",
"lastFilledQty": "1",
"lastFillPrice": "75000",
}
backend = FakeBingxBackend(
snapshots=[
_snapshot(),
_snapshot(
positions=[
{
"symbol": "BTC-USDT",
"positionSide": "SHORT",
"positionAmt": "-1",
"avgPrice": "75000",
"markPrice": "75010",
"leverage": "2",
}
],
open_orders=[ack_row],
all_orders=[ack_row],
all_fills=[fill_row],
),
],
receipt=ExecutionReceipt(
timestamp=datetime.now(timezone.utc),
status="NEW",
symbol="BTC-USDT",
side="SELL",
action="ENTER",
quantity=1.0,
price=75_000.0,
client_order_id="cid-1",
order_id="1001",
raw_ack=ack_row,
raw_state={},
),
)
adapter = BingxVenueAdapter(backend=backend)
events = adapter.submit(_intent())
assert backend.connected is False
assert backend.submitted
assert [event.kind for event in events] == [event.kind for event in events if event.kind.value]
assert events[0].kind.value == "ORDER_ACK"
assert events[0].status == VenueEventStatus.ACKED
assert events[0].venue_client_id == "cid-1"
assert events[0].venue_order_id == "1001"
assert len(events) == 2
assert events[1].kind.value == "FULL_FILL"
assert events[1].status == VenueEventStatus.FILLED
assert events[1].filled_size == pytest.approx(1.0)
assert events[1].remaining_size == pytest.approx(0.0)
def test_cancel_uses_bingx_cancel_surface_and_maps_cancel_ack() -> None:
cancel_row = {
"orderId": "2001",
"clientOrderId": "cid-2",
"clientOrderID": "cid-2",
"symbol": "BTC-USDT",
"status": "CANCELED",
}
backend = FakeBingxBackend(
snapshots=[
_snapshot(
open_orders=[cancel_row],
all_orders=[cancel_row],
),
_snapshot(),
],
cancel_response=cancel_row,
)
adapter = BingxVenueAdapter(backend=backend)
order = VenueOrder(
internal_trade_id="trade-2",
venue_order_id="2001",
venue_client_id="cid-2",
side=TradeSide.SHORT,
intended_size=1.0,
status=VenueOrderStatus.NEW,
metadata={"slot_id": 0, "asset": "BTCUSDT"},
)
events = adapter.cancel(order, reason="MANUAL_CLOSE")
assert backend.canceled
assert events[0].kind.value == "CANCEL_ACK"
assert events[0].status == VenueEventStatus.CANCELED
assert events[0].venue_order_id == "2001"
assert events[0].reason == "MANUAL_CLOSE"
def test_reconcile_and_open_views_normalize_bingx_rows() -> None:
ack_row = {
"orderId": "3001",
"clientOrderId": "cid-3",
"clientOrderID": "cid-3",
"symbol": "ETH-USDT",
"status": "NEW",
"executedQty": "0",
}
fill_row = {
"clientOrderId": "cid-3",
"clientOrderID": "cid-3",
"orderId": "3001",
"symbol": "ETH-USDT",
"status": "PARTIALLY_FILLED",
"executedQty": "2",
"lastFilledQty": "1",
"lastFillPrice": "2500",
}
position_row = {
"symbol": "ETH-USDT",
"positionSide": "LONG",
"positionAmt": "2",
"avgPrice": "2500",
"markPrice": "2510",
"leverage": "3",
}
backend = FakeBingxBackend(
snapshots=[
_snapshot(
positions=[position_row],
open_orders=[ack_row],
all_orders=[ack_row, fill_row],
all_fills=[fill_row],
)
]
)
adapter = BingxVenueAdapter(backend=backend)
orders = adapter.open_orders()
positions = adapter.open_positions()
events = adapter.reconcile()
assert orders[0].status == VenueOrderStatus.NEW
assert orders[0].venue_client_id == "cid-3"
assert positions[0]["positionAmt"] == "2"
assert any(event.kind.value == "PARTIAL_FILL" for event in events)
assert any(event.kind.value == "ORDER_ACK" for event in events)
def test_kernel_can_drive_through_bingx_venue_shim() -> None:
ack_row = {
"orderId": "4001",
"clientOrderId": "cid-4",
"clientOrderID": "cid-4",
"symbol": "BTC-USDT",
"status": "NEW",
"executedQty": "0",
}
fill_row = {
"clientOrderId": "cid-4",
"clientOrderID": "cid-4",
"orderId": "4001",
"symbol": "BTC-USDT",
"status": "FILLED",
"executedQty": "1",
"lastFilledQty": "1",
"lastFillPrice": "75000",
}
backend = FakeBingxBackend(
snapshots=[
_snapshot(),
_snapshot(
positions=[
{
"symbol": "BTC-USDT",
"positionSide": "SHORT",
"positionAmt": "-1",
"avgPrice": "75000",
"markPrice": "75010",
"leverage": "2",
}
],
open_orders=[ack_row],
all_orders=[ack_row],
all_fills=[fill_row],
),
],
receipt=ExecutionReceipt(
timestamp=datetime.now(timezone.utc),
status="NEW",
symbol="BTC-USDT",
side="SELL",
action="ENTER",
quantity=1.0,
price=75_000.0,
client_order_id="cid-4",
order_id="4001",
raw_ack=ack_row,
raw_state={},
),
)
kernel = ExecutionKernel(
control_plane=InMemoryControlPlane(
KernelControlSnapshot(mode=KernelMode.DEBUG, verbosity=KernelVerbosity.TRACE)
),
venue=BingxVenueAdapter(backend=backend),
)
outcome = kernel.process_intent(_intent(trade_id="trade-4"))
slot = kernel.slot(0)
assert outcome.accepted is True
assert slot.fsm_state == TradeStage.POSITION_OPEN
assert slot.trade_id == "trade-4"
assert backend.submitted
def test_submit_maps_bingx_rate_limit_to_first_class_venue_event() -> None:
backend = FakeBingxBackend(
snapshots=[_snapshot(), _snapshot()],
receipt=ExecutionReceipt(
timestamp=datetime.now(timezone.utc),
status="RATE_LIMITED",
symbol="BTC-USDT",
side="SELL",
action="ENTER",
quantity=1.0,
price=75_000.0,
client_order_id="cid-rate-limit",
order_id="",
raw_ack={
"status": "RATE_LIMITED",
"msg": "code:100410 endpoint is in disabled/frequency-limited period",
"retryAfter": int(datetime.now(timezone.utc).timestamp() * 1000) + 2_500,
},
raw_state={},
),
)
adapter = BingxVenueAdapter(backend=backend)
events = adapter.submit(_intent(trade_id="trade-rate-limit"))
assert len(events) == 1
assert events[0].kind == KernelEventKind.RATE_LIMITED
assert events[0].status == VenueEventStatus.RATE_LIMITED
assert events[0].venue_client_id == "cid-rate-limit"
assert events[0].metadata["retry_after_ms"] >= 0