Files
siloqy/prod/clean_arch/adapters/bingx_direct.py

778 lines
37 KiB
Python
Raw Normal View History

"""Direct BingX execution adapter with no Nautilus Trader node dependency.
This adapter speaks BingX REST directly and keeps the exchange state
authoritative. It is intended for PINK live execution under the DITA boundary.
"""
from __future__ import annotations
import asyncio
import json
import logging
import math
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
import time
import uuid
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal, ROUND_DOWN
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from nautilus_trader.model.identifiers import InstrumentId
from prod.bingx.config import BingxExecClientConfig
from prod.bingx.config import BingxInstrumentProviderConfig
from prod.bingx.enums import BingxEnvironment
from prod.bingx.http import BingxHttpError
from prod.bingx.http import BingxHttpClient
from prod.bingx.instrument_provider import BingxInstrumentProvider
from prod.bingx.leverage import normalize_bingx_leverage_value
from prod.bingx.schemas import BingxOrderAck
from prod.bingx.schemas import unwrap_order_payload
from prod.clean_arch.dita import Intent, TradeSide, DecisionAction
from prod.clean_arch.ports.execution import ExchangeStateSnapshot
from prod.clean_arch.ports.execution import ExecutionReceipt
from prod.clean_arch.ports.execution import ExecutionPort
LOGGER = logging.getLogger(__name__)
def _rows_from_payload(payload: Any, *keys: str) -> list[dict[str, Any]]:
if isinstance(payload, list):
return [row for row in payload if isinstance(row, dict)]
if isinstance(payload, dict):
for key in keys:
rows = payload.get(key)
if isinstance(rows, list):
return [row for row in rows if isinstance(row, dict)]
return []
def _capital_from_balance_rows(rows: Any) -> float:
if not isinstance(rows, list):
return 0.0
for row in rows:
if not isinstance(row, dict):
continue
capital = 0.0
for key in ("total", "balance", "equity", "availableMargin", "availableBalance", "walletBalance", "free"):
try:
capital = float(row.get(key, 0.0) or 0.0)
except Exception:
continue
if capital > 0 and math.isfinite(capital):
return capital
if capital > 0 and math.isfinite(capital):
return capital
return 0.0
def _position_notional_from_rows(rows: Any) -> float:
if not isinstance(rows, list):
return 0.0
total = 0.0
for row in rows:
if not isinstance(row, dict):
continue
try:
qty = abs(
float(
row.get("positionAmt")
or row.get("positionQty")
or row.get("positionSize")
or row.get("quantity")
or row.get("pa")
or 0.0
)
)
if qty <= 0.0:
continue
notional = row.get("positionValue") or row.get("notional") or row.get("openNotional")
if notional is not None:
total += abs(float(notional or 0.0))
continue
entry = (
row.get("entryPrice")
or row.get("avgPrice")
or row.get("markPrice")
or row.get("avgEntryPrice")
or row.get("ep")
or row.get("ap")
or 0.0
)
total += qty * abs(float(entry or 0.0))
except Exception:
continue
return total
def _normalize_symbol(symbol: str) -> str:
return str(symbol or "").replace("-", "").replace("_", "").replace("/","").upper()
def _venue_symbol_from_asset(asset: str) -> str:
text = _normalize_symbol(asset)
if text.endswith("USDT"):
return f"{text[:-4]}-USDT"
return text
def _decimal_text(value: Decimal) -> str:
text = format(value.normalize(), "f")
if "." in text:
text = text.rstrip("0").rstrip(".")
return text or "0"
def _is_rate_limited_error(exc: Exception) -> bool:
message = str(exc)
lowered = message.lower()
return "100410" in message or "frequency limit" in lowered or "rate limit" in lowered
@dataclass(frozen=True)
class BingxDirectExecutionConfig:
"""Execution-specific knobs for the direct adapter."""
environment: BingxEnvironment = BingxEnvironment.VST
allow_mainnet: bool = False
default_leverage: int = 1
exchange_leverage_cap: int = 3
recv_window_ms: int = 5_000
prefer_websocket: bool = False
use_reduce_only: bool = True
journal_strategy: str = "pink"
journal_db: str = "dolphin_pink"
instrument_provider: BingxInstrumentProviderConfig = BingxInstrumentProviderConfig(load_all=True)
class BingxDirectExecutionAdapter(ExecutionPort):
"""Direct BingX execution boundary with exchange-led state snapshots."""
def __init__(
self,
config: BingxExecClientConfig | BingxDirectExecutionConfig,
*,
client: BingxHttpClient | None = None,
provider: BingxInstrumentProvider | None = None,
) -> None:
if isinstance(config, BingxExecClientConfig):
self._config = BingxDirectExecutionConfig(
environment=config.environment,
allow_mainnet=config.allow_mainnet,
default_leverage=int(config.default_leverage),
exchange_leverage_cap=int(config.exchange_leverage_cap),
recv_window_ms=int(config.recv_window_ms),
prefer_websocket=bool(config.prefer_websocket),
use_reduce_only=bool(config.use_reduce_only),
journal_strategy=str(config.journal_strategy or "pink"),
journal_db=str(config.journal_db or "dolphin_pink"),
instrument_provider=config.instrument_provider,
)
http_config = config
else:
self._config = config
http_config = BingxExecClientConfig(
api_key="",
secret_key="",
environment=config.environment,
allow_mainnet=config.allow_mainnet,
prefer_websocket=config.prefer_websocket,
sizing_mode="testnet",
exchange_leverage_cap=config.exchange_leverage_cap,
use_reduce_only=config.use_reduce_only,
default_leverage=config.default_leverage,
recv_window_ms=config.recv_window_ms,
journal_strategy=config.journal_strategy,
journal_db=config.journal_db,
instrument_provider=config.instrument_provider,
)
self._client = client or BingxHttpClient(http_config)
self._provider = provider or BingxInstrumentProvider(client=self._client, config=self._config.instrument_provider)
self._log = LOGGER
self._client_order_run_id = uuid.uuid4().hex[:8]
self._entry_client_order_seq = 0
self._exit_client_order_seq = 0
self._state: ExchangeStateSnapshot | None = None
self._connected = False
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# ── S1: Leverage cache ────────────────────────────────────────────────
# Maps symbol → last successfully set leverage (int).
# Avoids a ~350ms leverage POST before every order when leverage is unchanged.
#
# Cache key is symbol only (not runner_id) because leverage is an
# ACCOUNT-LEVEL setting on BingX — one value per symbol per account.
# IMPORTANT CONTRACT: if multiple runners share this account and request
# DIFFERENT leverages for the same symbol concurrently, the last writer
# wins at the exchange and the other runner's order executes at wrong
# leverage. Callers MUST ensure leverage is uniform across runners for
# any given symbol when sharing an account, or use separate accounts.
# This cache only eliminates the redundant round-trip; it cannot resolve
# the underlying multi-runner semantic conflict.
#
# Per-symbol asyncio.Lock: prevents concurrent submit_intent calls for the
# same symbol from interleaving leverage POST + cache update, which would
# create a window where the cache says "leverage set" but the POST hasn't
# completed yet. Lock scope is deliberately tight (only the POST + cache
# write), not the entire submit_intent, to avoid head-of-line blocking.
self._leverage_cache: Dict[str, int] = {}
self._leverage_locks: Dict[str, asyncio.Lock] = {}
# Persist path: survives process restarts within a session, not across
# reboots (leverage should be re-verified from exchange on cold start).
env_tag = "live" if getattr(self._config, "environment", None) and \
str(getattr(self._config, "environment", "")).upper() == "LIVE" else "vst"
self._leverage_cache_path = Path(f"/tmp/.bingx_leverage_cache_{env_tag}.json")
self._load_leverage_cache()
# ── S2: Background state refresh tracking ─────────────────────────────
self._state_refreshed_at: float = 0.0 # monotonic seconds
@property
def state(self) -> ExchangeStateSnapshot | None:
return self._state
async def connect(self) -> bool:
await self._provider.initialize()
self._connected = True
self._state = await self.refresh_state()
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# S4/S1: on reconnect, verify cached leverage matches exchange truth.
# Drift happens when another process/runner changed leverage on the same account.
await self._verify_leverage_drift()
return True
async def disconnect(self) -> None:
self._connected = False
await self._client.close()
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# ── S1: Leverage cache helpers ────────────────────────────────────────────
def _load_leverage_cache(self) -> None:
"""Load persisted leverage cache from JSON sidecar. Ignores errors."""
try:
raw = json.loads(self._leverage_cache_path.read_text())
self._leverage_cache = {
str(k): int(v)
for k, v in raw.items()
if isinstance(v, (int, float)) and math.isfinite(float(v)) and float(v) >= 1
}
LOGGER.debug("leverage cache loaded: %s", self._leverage_cache)
except Exception:
self._leverage_cache = {}
def _persist_leverage_cache(self) -> None:
"""Flush leverage cache to JSON sidecar. Non-fatal on failure."""
try:
self._leverage_cache_path.write_text(
json.dumps(self._leverage_cache, indent=2)
)
except Exception as exc:
LOGGER.warning("leverage cache persist failed: %s", exc)
async def _ensure_leverage(self, symbol: str, leverage: int) -> bool:
"""Set leverage for symbol only if the cached value differs.
Returns True if a POST was made (leverage was changed), False if skipped.
The asyncio.Lock per symbol ensures concurrent submit_intent calls for the
same symbol never interleave leverage POST and cache update preventing
the heisenbug where two orders both think they set leverage but one runs
at the wrong value because the other's POST arrived last.
Cache is updated ONLY on successful POST. A failed POST leaves the cache
stale, so the next submit retries correct conservative behaviour.
"""
lock = self._leverage_locks.setdefault(symbol, asyncio.Lock())
async with lock:
cached = self._leverage_cache.get(symbol)
if cached == leverage:
return False # exchange already at requested value — skip POST
try:
await self._client.signed_post(
"/openApi/swap/v2/trade/leverage",
{"symbol": symbol, "side": "BOTH", "leverage": leverage},
)
prev = self._leverage_cache.get(symbol)
self._leverage_cache[symbol] = leverage
self._persist_leverage_cache()
LOGGER.info(
"leverage SET symbol=%s %s%d",
symbol, f"{prev}" if prev is not None else "", leverage,
)
return True
except Exception as exc:
LOGGER.warning(
"leverage POST failed symbol=%s lev=%d: %s"
"cache NOT updated, will retry on next submit",
symbol, leverage, exc,
)
return False # do NOT cache — retry guarantees correctness
async def _verify_leverage_drift(self) -> None:
"""On connect/reconnect, compare cached leverage with exchange reality.
Drift occurs when another process or runner changed leverage on the same
account while this adapter was offline. Log it loudly; update cache to
exchange truth so next submit does not re-set to the wrong value.
"""
for symbol, cached_lev in list(self._leverage_cache.items()):
try:
resp = await self._client.signed_get(
"/openApi/swap/v2/trade/leverage", {"symbol": symbol}
)
exchange_lev = int(
float(resp.get("longLeverage") or resp.get("leverage") or 0)
)
if exchange_lev > 0 and exchange_lev != cached_lev:
LOGGER.warning(
"LEVERAGE DRIFT symbol=%s cached=%d exchange=%d"
"another process may have changed it; cache updated to exchange truth",
symbol, cached_lev, exchange_lev,
)
self._leverage_cache[symbol] = exchange_lev
except Exception as exc:
LOGGER.debug("leverage drift check failed symbol=%s: %s", symbol, exc)
self._persist_leverage_cache()
# ── S2: Background state refresh ─────────────────────────────────────────
async def _refresh_state_background(self, symbol: str) -> None:
"""Background task: refresh internal state after a synchronous MARKET fill.
MARKET fills deliver fill_price / executedQty in the ACK; capital and
position truth arrive via WS FILL_SETTLED + ACCOUNT_UPDATE. This REST
poll is belt-and-suspenders catches edge cases such as unexpected
concurrent fills or exchange position ledger lag. It does not block the
submit path.
"""
try:
self._state = await self._refresh_exchange_state(symbol, include_history=False)
self._state_refreshed_at = time.monotonic()
LOGGER.debug("background state refresh complete symbol=%s", symbol)
except Exception as exc:
LOGGER.warning("background state refresh failed symbol=%s: %s", symbol, exc)
def _resolve_instrument(self, asset: str):
normalized = _normalize_symbol(asset)
candidates = [
InstrumentId.from_str(f"{normalized}.BINGX"),
InstrumentId.from_str(f"{_venue_symbol_from_asset(asset)}.BINGX"),
]
for candidate in candidates:
instrument = self._provider.find(candidate)
if instrument is not None:
return instrument
for instrument in self._provider.list_all():
if _normalize_symbol(instrument.symbol.value) == normalized:
return instrument
if _normalize_symbol(instrument.raw_symbol.value) == normalized:
return instrument
return None
def _instrument_venue_symbol(self, asset: str) -> str:
instrument = self._resolve_instrument(asset)
if instrument is not None:
return str(instrument.raw_symbol.value)
return _venue_symbol_from_asset(asset)
def _instrument_step(self, asset: str) -> Decimal:
instrument = self._resolve_instrument(asset)
if instrument is not None:
try:
return Decimal(str(instrument.size_increment.as_decimal()))
except Exception:
pass
return Decimal("0.001")
def _format_quantity(self, asset: str, quantity: float) -> str:
step = self._instrument_step(asset)
if step <= 0:
return str(max(0.0, quantity))
value = Decimal(str(quantity))
quantized = (value / step).to_integral_value(rounding=ROUND_DOWN) * step
return _decimal_text(max(Decimal("0"), quantized))
def _instrument_tick(self, asset: str) -> Decimal:
instrument = self._resolve_instrument(asset)
if instrument is not None:
try:
tick = getattr(instrument, "price_increment", None)
if tick is not None:
return Decimal(str(tick.as_decimal()))
except Exception:
pass
return Decimal("0.01")
def _format_price(self, asset: str, price: float) -> str:
tick = self._instrument_tick(asset)
if tick <= 0:
return f"{price:.8f}".rstrip("0").rstrip(".")
value = Decimal(str(price))
quantized = (value / tick).to_integral_value(rounding=ROUND_DOWN) * tick
return _decimal_text(max(Decimal("0"), quantized))
async def _safe_get(self, endpoint: str, params: dict | None = None, *, fallback: Any = None) -> Any:
"""GET an endpoint, returning *fallback* on rate-limit errors."""
try:
return await self._client.signed_get(endpoint, params)
except BingxHttpError as exc:
message = str(exc)
if "100410" in message or "frequency limit" in message.lower():
LOGGER.debug("BingX %s rate-limited; continuing with empty snapshot", endpoint)
return fallback if fallback is not None else []
raise
async def _refresh_exchange_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
"""Fetch exchange state with parallel HTTP calls.
The three primary calls (balance, positions, openOrders) are
independent and run concurrently via ``asyncio.gather``. Each has
its own rate-limit fallback so a single throttle does not block
the others. Historical calls (allOrders, allFillOrders) are gated
on ``include_history`` and also gathered.
"""
balance_task = self._safe_get("/openApi/swap/v2/user/balance")
positions_task = self._safe_get("/openApi/swap/v2/user/positions")
orders_task = self._safe_get("/openApi/swap/v2/trade/openOrders")
balance_payload, positions_payload, open_orders_payload = await asyncio.gather(
balance_task, positions_task, orders_task,
)
all_orders_payload: Any = []
all_fills_payload: Any = []
if include_history and symbol is not None:
venue_symbol = self._instrument_venue_symbol(symbol)
hist_tasks = asyncio.gather(
self._safe_get("/openApi/swap/v2/trade/allOrders", {"symbol": venue_symbol}),
self._safe_get("/openApi/swap/v2/trade/allFillOrders", {"symbol": venue_symbol}),
return_exceptions=True,
)
results = await hist_tasks
all_orders_payload = results[0] if not isinstance(results[0], Exception) else []
all_fills_payload = results[1] if not isinstance(results[1], Exception) else []
# Parse results (shared logic, same as before)
if isinstance(balance_payload, list):
balances = balance_payload
elif isinstance(balance_payload, dict):
rows_raw = balance_payload.get("balance") or balance_payload.get("balances") or balance_payload.get("data")
if isinstance(rows_raw, dict):
balances = [rows_raw]
elif isinstance(rows_raw, list):
balances = rows_raw
else:
balances = []
else:
balances = []
positions_rows = _rows_from_payload(positions_payload, "positions", "data")
positions: dict[str, dict[str, Any]] = {}
for row in positions_rows:
raw_symbol = str(row.get("symbol") or row.get("symbolName") or row.get("venueSymbol") or "")
key = _normalize_symbol(raw_symbol)
if not key:
continue
positions[key] = dict(row)
open_orders = _rows_from_payload(open_orders_payload, "orders", "data")
capital = _capital_from_balance_rows(balances)
open_notional = _position_notional_from_rows(positions_rows)
equity = capital
if open_notional > 0 and positions_rows:
equity = capital
snapshot = ExchangeStateSnapshot(
timestamp=datetime.now(timezone.utc),
capital=capital,
equity=equity,
open_positions=positions,
open_orders=[dict(row) for row in open_orders],
all_orders=[dict(row) for row in _rows_from_payload(all_orders_payload, "orders", "data")],
all_fills=[dict(row) for row in _rows_from_payload(all_fills_payload, "fills", "data")],
account={"balances": balances},
open_notional=open_notional,
source="bingx",
recovered=bool(include_history),
)
self._state = snapshot
return snapshot
async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
return await self._refresh_exchange_state(symbol, include_history=include_history)
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# ── S3 — Maker order placement (CRITICAL TODO) ────────────────────────────
# Currently all orders are MARKET (taker, 0.04% fee). For exits especially,
# a LIMIT order at bid+1tick (SHORT close) or ask-1tick (LONG close) with
# reduceOnly=true can fill as maker (0.02% fee) — 50% fee saving per trade.
#
# Design requirements before this is safe to enable:
#
# 1. SHORT/LONG awareness (CRITICAL):
# SHORT close → BUY side → LIMIT at bid+1tick (slightly above bid to rest on book)
# LONG close → SELL side → LIMIT at ask-1tick
# SHORT enter → SELL side → LIMIT at ask-1tick (only if signal allows waiting)
# LONG enter → BUY side → LIMIT at bid+1tick
#
# 2. OBF integration: the OBF subsystem tracks order book depth and emits
# spread guidance. Use OBF.spread_bps and OBF.bid_depth / ask_depth to
# decide whether a maker order is viable (tight spread = fast fill,
# thin book = price impact of own order too large).
#
# 3. TODO_ADD_PARAMSET_VIBRISS: The "calm market" threshold (currently
# hardcoded as spread_bps < 5) MUST be VIBRISS-calibrated. VIBRISS
# governs our own assessment of market microstructure quality. Some
# invariants may remain hardcoded (e.g. "never use maker if spread > 50bps")
# but the normal operating range should be a VIBRISS metaparameter.
# See: prod/vibriss/ for the param-set schema.
#
# 4. Timeout calibration: timeout_s cannot be a fixed constant. It must
# reflect: (a) the strategy's max adverse excursion budget for the position,
# (b) the expected fill velocity from OBF order-flow data, (c) the remaining
# time before the signal's max_hold threshold. A resting limit that misses
# its cancel window creates an orphaned order — a hard risk control failure.
#
# 5. Price impact awareness (future, not yet operational):
# For large notionals, our own LIMIT order changes the book. At some notional
# threshold, the maker price improvement is eaten by the impact of our own
# resting order. The OBF subsystem must estimate this before we switch from
# MARKET to LIMIT. This is the "price impact of our own order" problem.
#
# CRITICAL TODO: Abstraction. The entire order placement logic (MARKET/LIMIT/
# TWAP/VWAP/Iceberg, maker/taker selection, price impact estimation, timeout/
# fallback, cancel-replace) MUST eventually be a dedicated "Smart Order Router"
# (SOR) subsystem. submit_intent() should be a thin dispatcher into the SOR,
# not a monolith. This is a pre-condition for multi-venue support, co-location
# optimisation, and regulatory reporting.
#
# GAP in characterisation: why does BingX separate REST (order placement) from
# WS (fill events)? Almost certainly historical: REST came first for audit-
# trail reasons (HMAC-signed requests → deterministic replay), WS push came
# later for low-latency fills. The FIX protocol unifies these under one
# connection — that is the right long-term model and the reason venue-agnostic
# abstraction via the VenueAdapter protocol is critical. Any future venue
# (Bybit, OKX, Binance) will have the same REST/WS split.
# ─────────────────────────────────────────────────────────────────────────
async def submit_intent(self, intent: Intent) -> ExecutionReceipt:
symbol = self._instrument_venue_symbol(intent.asset)
if intent.action == DecisionAction.EXIT:
side = "SELL" if intent.side == TradeSide.LONG else "BUY"
else:
side = "BUY" if intent.side == TradeSide.LONG else "SELL"
reduce_only = bool(intent.action == DecisionAction.EXIT)
if reduce_only:
self._exit_client_order_seq += 1
client_order_id = f"pink:{self._client_order_run_id}:x{self._exit_client_order_seq:02d}"
else:
self._entry_client_order_seq += 1
client_order_id = f"pink:{self._client_order_run_id}:e{self._entry_client_order_seq:02d}"
leverage = normalize_bingx_leverage_value(
int(round(float(intent.leverage or self._config.default_leverage))),
exchange_max=self._config.exchange_leverage_cap,
)
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# S1: leverage cache — skip POST when exchange already has the right value.
await self._ensure_leverage(symbol, leverage)
# Capture mark price BEFORE the order POST for slippage measurement (Gap 3).
# This is the most honest reference: the market state at decision time.
mark_at_submit = 0.0
submit_ts_ms = int(time.time() * 1000)
if self._state is not None:
for _sym_key, _pos in self._state.open_positions.items():
_mark = float(_pos.get("markPrice") or _pos.get("avgPrice") or 0.0)
if _mark > 0:
mark_at_submit = _mark
break
if mark_at_submit <= 0:
# No open position for mark — use the last known mark from any symbol
# as a rough reference (e.g. for fresh ENTER with no position yet)
pass # remains 0.0; slippage will be 0
try:
# Honor the order type forwarded by the venue adapter
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# (bingx_venue._legacy_intent encodes _order_type/_limit_price).
order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper()
limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0)
is_limit = order_type == "LIMIT" and limit_price > 0.0
payload: dict[str, Any] = {
"symbol": symbol,
"side": side,
"positionSide": "BOTH",
"type": "LIMIT" if is_limit else "MARKET",
"quantity": self._format_quantity(intent.asset, intent.target_size),
"clientOrderId": client_order_id,
"recvWindow": str(int(self._config.recv_window_ms)),
}
if is_limit:
payload["price"] = self._format_price(intent.asset, limit_price)
payload["timeInForce"] = "GTC"
if reduce_only:
payload["reduceOnly"] = "true"
ack_payload = await self._client.signed_post("/openApi/swap/v2/trade/order", payload)
ack = BingxOrderAck.from_http(ack_payload if isinstance(ack_payload, dict) else {})
ack_row = dict(unwrap_order_payload(ack_payload)) if isinstance(ack_payload, dict) else {}
status = str(ack_row.get("status") or ack.status or "ACKED")
fill_price = 0.0
for key in ("avgPrice", "avgFilledPrice", "price", "lastFillPrice", "tradePrice"):
try:
value = float(ack_row.get(key) or 0.0)
except Exception:
value = 0.0
if value > 0:
fill_price = value
break
if fill_price <= 0 and self._state is not None:
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fill_price = next(
(float(row.get("markPrice") or row.get("avgPrice") or 0.0)
for row in self._state.open_positions.values()
if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0),
0.0,
)
except BingxHttpError as exc:
status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED"
ack_row = {
"status": status,
"msg": str(exc),
"symbol": symbol,
"clientOrderId": client_order_id,
}
fill_price = 0.0
ack = None
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
is_limit = False
# ── Gap 2: fee estimation (ESTIMATED_TAKER / ESTIMATED_MAKER) ────────
# BingX REST ACK does not include commission. WS FILL_SETTLED will deliver
# the actual fee later and update the fee_source to "WS_SETTLED".
# Until then, log an estimate so CH rows are never blank on this field.
fill_qty = float(ack_row.get("executedQty") or ack_row.get("filledQty") or
getattr(intent, "target_size", 0.0) or 0.0)
if is_limit:
# LIMIT orders *may* rest and fill as maker — optimistic estimate.
fee_rate = 0.0002 # BingX perpetuals maker fee 0.02%
fee_source = "ESTIMATED_MAKER"
is_maker_est = True
else:
fee_rate = 0.0004 # BingX perpetuals taker fee 0.04%
fee_source = "ESTIMATED_TAKER"
is_maker_est = False
estimated_fee = fill_price * fill_qty * fee_rate if fill_price > 0 and fill_qty > 0 else 0.0
# ── Gap 3: slippage (signed, vs mark_at_submit) ───────────────────────
# Positive = worse than mark (taker overpays), negative = better (maker/price improvement).
# Measured for BOTH taker and maker fills so post-trade analytics can compare.
slippage_bps = 0.0
if mark_at_submit > 0 and fill_price > 0:
raw_diff = (fill_price - mark_at_submit) / mark_at_submit * 10_000
# Sign convention: adverse = positive regardless of direction.
# For BUY (LONG enter / SHORT close): higher fill price = worse → positive
# For SELL (SHORT enter / LONG close): lower fill price = worse → positive
if side == "BUY":
slippage_bps = raw_diff # positive if fill > mark (paid up)
else:
slippage_bps = -raw_diff # positive if fill < mark (sold down)
# Exchange-assigned fill time from ACK (field "updateTime" or "time"); 0 if absent.
exchange_ts = int(ack_row.get("updateTime") or ack_row.get("time") or
ack_row.get("transactTime") or 0)
# Annotate ack_row with computed friction so _events_from_submit can read them.
ack_row["_fee_estimated"] = estimated_fee
ack_row["_fee_source"] = fee_source
ack_row["_is_maker_est"] = is_maker_est
ack_row["_mark_at_submit"] = mark_at_submit
ack_row["_slippage_bps"] = slippage_bps
ack_row["_submit_ts_ms"] = submit_ts_ms
ack_row["_exchange_ts"] = exchange_ts
receipt = ExecutionReceipt(
timestamp=datetime.now(timezone.utc),
status=status,
symbol=symbol,
side=side,
action=intent.action.value,
quantity=float(intent.target_size or 0.0),
price=fill_price,
client_order_id=client_order_id,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
order_id=str(
(ack.order_id if "ack" in locals() and ack is not None else "")
or ack_row.get("orderId") or ""
),
raw_ack=ack_row,
raw_state=dict(self._state.account if self._state is not None else {}),
)
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# S2: background state refresh — do not block the submit path for MARKET fills.
# For MARKET orders that returned FILLED in the ACK, all fill data is already
# in the receipt. The WS stream (FILL_SETTLED + ACCOUNT_UPDATE) delivers
# capital truth. The background REST poll is belt-and-suspenders.
# For LIMIT / non-FILLED orders: must refresh synchronously to detect resting order.
market_filled = (status == "FILLED" and not is_limit)
if market_filled:
asyncio.create_task(
self._refresh_state_background(intent.asset),
name=f"state_refresh_{symbol}",
)
else:
self._state = await self._refresh_exchange_state(intent.asset, include_history=False)
return receipt
async def cancel(self, order: Any, *, reason: str = "") -> dict[str, Any]:
"""Cancel a working order on the venue (resting LIMIT support).
Signs the DELETE with the same client used for order placement, keyed by
the venue orderId (propagated onto the slot order by the kernel on ACK)
with a clientOrderId fallback. Returns the raw BingX response for the
venue adapter to map into a CANCEL_ACK / CANCEL_REJECT event.
"""
asset = str((getattr(order, "metadata", None) or {}).get("asset") or "")
symbol = self._instrument_venue_symbol(asset) if asset else ""
params: dict[str, Any] = {
"symbol": symbol,
"recvWindow": str(int(self._config.recv_window_ms)),
}
venue_order_id = str(getattr(order, "venue_order_id", "") or "")
venue_client_id = str(getattr(order, "venue_client_id", "") or "")
if venue_order_id:
params["orderId"] = venue_order_id
elif venue_client_id:
params["clientOrderId"] = venue_client_id
else:
return {"status": "REJECTED", "msg": "no order id to cancel",
"orderId": venue_order_id, "clientOrderId": venue_client_id}
delete_resp: dict[str, Any] = {}
try:
resp = await self._client.signed_delete("/openApi/swap/v2/trade/order", params)
delete_resp = resp if isinstance(resp, dict) else {"status": "CANCELED"}
except BingxHttpError as exc:
delete_resp = {"status": "RATE_LIMITED" if _is_rate_limited_error(exc) else "ERROR", "msg": str(exc)}
# Truth-based confirmation: the cancel succeeded iff the order is no
# longer open on the venue. BingX can return transient errors (e.g.
# "order not exist", "same order number ... within 1 second" from an
# internal retry) even when the order was actually removed — so we trust
# exchange state, not the DELETE response.
still_open: bool | None = None
try:
oo = await self._client.signed_get("/openApi/swap/v2/trade/openOrders", {"symbol": symbol})
rows = oo if isinstance(oo, list) else (oo.get("data") or oo.get("orders") or [])
if isinstance(rows, dict):
rows = rows.get("orders") or []
ids = {str(r.get("orderId")) for r in rows if isinstance(r, dict)}
cids = {str(r.get("clientOrderId") or r.get("clientOrderID")) for r in rows if isinstance(r, dict)}
still_open = (venue_order_id in ids) if venue_order_id else (venue_client_id in cids)
except Exception:
still_open = None
if still_open is False:
return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id}
if str(delete_resp.get("status", "")).upper() in {"CANCELED", "CANCELLED", "SUCCESS", "OK"}:
return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id}
return {
"status": delete_resp.get("status", "REJECTED"),
"msg": delete_resp.get("msg", "cancel not confirmed"),
"orderId": venue_order_id, "clientOrderId": venue_client_id,
}
async def reconcile(self, symbol: str | None = None) -> ExchangeStateSnapshot:
# Recovery-only path: ask the venue for authoritative account/position/order state.
return await self._refresh_exchange_state(symbol, include_history=True)