Files
siloqy/prod/bingx/http.py
Codex 2c9da8f592 PINK Phase 0: FET -$5,990 fix batch — leverage-free PnL, true fill prices, reconcile baseline anchors
Defects fix (FET -$5,990 replay, 2026-06-11):
- realized_pnl() and mark_price(): PnL = qty × Δprice, side-signed; no ×leverage inflation (was 3× every leg).
- BingX MARKET fill events carry true fill price (avgPrice/lastFillPrice), never the order's nominal price (protective bound ±20-25% from mark, poisoned PnL to -$5,990 on a +$164 round-trip).
- Fill routing by ORDER IDENTITY first, FSM state second — late entry-remainder fills during EXIT_WORKING no longer misclassify as exits.
- Entry basis = VWAP across entry fills, not last fill price.
- reconcile_from_slots / restore_state: re-anchor _last_settled_pnl / _slot_was_closed to adopted slot state (cross-restart double-book of carried PnL).
- ACCOUNT_UPDATE with wallet_balance=0 dropped (margin-only frames no longer zero e_available_margin).
- Foreign-fill skip on shared VST account (PRODGREEN collision filter).
- exec_router TTL: entry-requote venue-truth gate (recent own fill + live exchange position probes prevent double-entry).
- bingx_direct: openOrders fetched BEFORE positions (sequential ordering prevents dangerous tear → double-entries).
- Dual-leverage translation via map_internal_conviction_to_exchange_leverage() (strategy conviction → integer at-exchange leverage, bankers rounding).
- BLUE-parity alpha components wired: asset picker (IRP universe ranking) + alpha sizer (cubic-convex dynamic leverage, 0.5-8.0 range).
- ch_writer: date_time_input_format=best_effort on insert URLs; flush error logging at WARNING with counter.
- blue_parity.price_of(): hyphen-tolerant fallback (FET-USDT → FETUSDT).
- Fill test updated to incremental filled_size semantics (BingX WS lastFilledQty).
- Env-override base URLs, supervisord autorestart, per-asset DC histories, single-slot invariant, fill-attribution filter.

Co-authored-by: CommandCodeBot <noreply@commandcode.ai>
2026-06-11 20:53:49 +02:00

646 lines
28 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
import os
import socket
import subprocess
from contextlib import asynccontextmanager
from collections.abc import Mapping
from dataclasses import dataclass
from decimal import Decimal
from typing import Any
from urllib.parse import urlsplit
from urllib.parse import urlunsplit
import httpx
from .config import BingxExecClientConfig
from .config import require_mainnet_opt_in
from .rate_limits import BingxCircuitBreaker
from .signing import build_signed_params
from .signing import canonical_query
from .rate_limits import BingxRateLimitTracker
from .dns_cache import BingxDnsFallbackCache
from .dns_cache import is_bingx_hostname
from .urls import get_rest_base_urls
class BingxHttpError(RuntimeError):
pass
@dataclass(frozen=True)
class BingxHttpResponse:
code: int
msg: str
data: Any
def _recv_window_ms_or_default(value: Any, default: int = 5_000) -> int:
try:
parsed = int(value)
except Exception:
return default
return parsed if parsed > 0 else default
def _positive_int_or_default(value: Any, default: int) -> int:
try:
parsed = int(value)
except Exception:
return default
return parsed if parsed > 0 else default
class BingxHttpClient:
def __init__(self, config: BingxExecClientConfig) -> None:
self._logger = logging.getLogger(__name__)
self._config = config
require_mainnet_opt_in(config.environment, getattr(config, "allow_mainnet", False), context="BingX HTTP client")
# Env overrides (2026-06-10): the *.bingx.pro backup mirrors serve an
# incomplete TLS chain (missing Cloudflare intermediate) and can never
# verify on this host, so every failover retry was burning time on a
# dead endpoint. Operators can point the backup at a reachable host
# (e.g. the primary itself) without touching code.
_env_primary = os.environ.get("DOLPHIN_BINGX_BASE_URL", "").strip()
_env_backup = os.environ.get("DOLPHIN_BINGX_BASE_URL_BACKUP", "").strip()
self._base_urls = (
_env_primary or config.base_url_http or get_rest_base_urls(config.environment)[0],
_env_backup or config.base_url_http_backup or get_rest_base_urls(config.environment)[1],
)
self._base_hosts = tuple(urlsplit(url).hostname for url in self._base_urls)
self._api_key = config.api_key
self._secret_key = config.secret_key
self._timeout_secs = _positive_int_or_default(config.http_timeout_secs, 10)
self._source_key = "WEB"
self._rate_limits = BingxRateLimitTracker()
self._dns_cache = BingxDnsFallbackCache()
for host in self._base_hosts:
if host is not None and is_bingx_hostname(host):
self._dns_cache.record_static(host)
self._circuit_breaker = BingxCircuitBreaker(
failure_threshold=_positive_int_or_default(config.max_retries, 3),
base_backoff_ms=_positive_int_or_default(config.retry_delay_initial_ms, 250),
max_backoff_ms=_positive_int_or_default(config.retry_delay_max_ms, 2_000),
)
self._session: httpx.AsyncClient | None = None
self._dns_patch_lock: asyncio.Lock | None = None
async def public_get(self, path: str, params: Mapping[str, object] | None = None) -> Any:
return await self._request_json("GET", path, params or {}, signed=False)
async def signed_get(self, path: str, params: Mapping[str, object] | None = None) -> Any:
return await self._request_json("GET", path, params or {}, signed=True)
async def signed_post(
self,
path: str,
params: Mapping[str, object] | None = None,
*,
idempotent: bool = True,
) -> Any:
# idempotent=False: skip retry loop and backup-URL fallback.
# Use this for MARKET orders — retrying a non-idempotent order to the
# same exchange (both bingx.com and bingx.pro hit the same account) will
# open duplicate positions with no clientOrderId for deduplication.
return await self._request_json(
"POST", path, params or {}, signed=True,
max_retries_override=0 if not idempotent else None,
)
async def signed_put(self, path: str, params: Mapping[str, object] | None = None) -> Any:
return await self._request_json("PUT", path, params or {}, signed=True)
async def signed_delete(self, path: str, params: Mapping[str, object] | None = None) -> Any:
return await self._request_json("DELETE", path, params or {}, signed=True)
async def signed_post_raw(self, path: str, params: Mapping[str, object] | None = None) -> Any:
return await self._request_json(
"POST",
path,
params or {},
signed=True,
expect_envelope=False,
)
async def signed_put_raw(
self,
path: str,
params: Mapping[str, object] | None = None,
*,
allow_empty: bool = False,
) -> Any:
return await self._request_json(
"PUT",
path,
params or {},
signed=True,
expect_envelope=False,
allow_empty=allow_empty,
)
async def signed_delete_raw(
self,
path: str,
params: Mapping[str, object] | None = None,
*,
allow_empty: bool = False,
) -> Any:
return await self._request_json(
"DELETE",
path,
params or {},
signed=True,
expect_envelope=False,
allow_empty=allow_empty,
)
async def close(self) -> None:
if self._session is not None:
if hasattr(self._session, "aclose"):
await self._session.aclose()
else: # pragma: no cover - compatibility fallback
await self._session.close()
self._session = None
@property
def rate_limits(self) -> BingxRateLimitTracker:
return self._rate_limits
def rate_limit_snapshot(self):
return self._rate_limits.snapshot()
@property
def circuit_breaker(self) -> BingxCircuitBreaker:
return self._circuit_breaker
async def _get_session(self) -> httpx.AsyncClient:
if self._session is None:
limits = httpx.Limits(max_connections=64, max_keepalive_connections=32, keepalive_expiry=300.0)
timeout = httpx.Timeout(self._timeout_secs)
self._session = httpx.AsyncClient(
http2=True,
limits=limits,
timeout=timeout,
headers={"Accept-Encoding": "gzip, deflate"},
trust_env=False,
)
return self._session
def _get_dns_patch_lock(self) -> asyncio.Lock:
if self._dns_patch_lock is None:
self._dns_patch_lock = asyncio.Lock()
return self._dns_patch_lock
async def _request_via_fallback_session(
self,
*,
session: Any | None,
hostname: str | None,
ips: tuple[str, ...] | None,
method: str,
url: str,
headers: dict[str, str],
body: str | None,
) -> httpx.Response:
if session is not None and not isinstance(session, httpx.AsyncClient):
return await session.request(
method=method,
url=url,
headers=headers,
data=body,
)
if hostname and ips:
parsed = urlsplit(url)
last_error: Exception | None = None
for ip in ips:
target_host = f"[{ip}]" if ":" in ip else ip
fallback_url = urlunsplit((parsed.scheme, target_host, parsed.path, parsed.query, parsed.fragment))
fallback_headers = dict(headers)
fallback_headers["Host"] = hostname
limits = httpx.Limits(max_connections=8, max_keepalive_connections=4, keepalive_expiry=30.0)
timeout = httpx.Timeout(self._timeout_secs)
async with httpx.AsyncClient(
http2=True,
limits=limits,
timeout=timeout,
headers={"Accept-Encoding": "gzip, deflate"},
trust_env=False,
) as fallback_session:
try:
return await fallback_session.request(
method=method,
url=fallback_url,
headers=fallback_headers,
data=body,
extensions={"sni_hostname": hostname},
)
except Exception as exc:
last_error = exc
continue
if last_error is not None:
curl_response = await self._request_via_curl_fallback(
hostname=hostname,
ips=ips,
method=method,
url=url,
headers=headers,
body=body,
)
if curl_response is not None:
return curl_response
raise last_error
limits = httpx.Limits(max_connections=8, max_keepalive_connections=4, keepalive_expiry=30.0)
timeout = httpx.Timeout(self._timeout_secs)
async with httpx.AsyncClient(
http2=True,
limits=limits,
timeout=timeout,
headers={"Accept-Encoding": "gzip, deflate"},
trust_env=False,
) as fallback_session:
return await fallback_session.request(
method=method,
url=url,
headers=headers,
data=body,
)
async def _request_via_curl_fallback(
self,
*,
hostname: str | None,
ips: tuple[str, ...] | None,
method: str,
url: str,
headers: dict[str, str],
body: str | None,
) -> httpx.Response | None:
if not hostname or not ips:
return None
header_args = []
for key, value in headers.items():
if key.lower() == "host":
continue
header_args.extend(["-H", f"{key}: {value}"])
for ip in ips:
proc = await asyncio.create_subprocess_exec(
"curl",
"--silent",
"--show-error",
"--fail",
"--http2",
"--resolve",
f"{hostname}:443:{ip}",
"-X",
method,
*header_args,
*(["--data-binary", "@-"] if body is not None else []),
url,
stdin=asyncio.subprocess.PIPE if body is not None else None,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate(body.encode("utf-8") if body is not None else None)
if proc.returncode == 0:
text = stdout.decode("utf-8", errors="replace")
header_block, _, body_text = text.partition("\r\n\r\n")
if not body_text:
header_block, _, body_text = text.partition("\n\n")
status_code = 200
reason = "OK"
response_headers: dict[str, str] = {}
for line in header_block.splitlines():
if not line:
continue
if line.startswith("HTTP/"):
parts = line.split(" ", 2)
if len(parts) >= 2:
try:
status_code = int(parts[1])
except Exception:
status_code = 200
reason = parts[2] if len(parts) >= 3 else reason
continue
if ":" in line:
key, value = line.split(":", 1)
response_headers[key.strip()] = value.strip()
return httpx.Response(
status_code=status_code,
headers=response_headers,
content=(body_text.strip() if body_text else "").encode("utf-8"),
request=httpx.Request(method, url),
)
last_err = stderr.decode("utf-8", errors="replace").strip() or "curl fallback failed"
self._logger.warning("BingX curl DNS fallback failed: host=%s ip=%s error=%s", hostname, ip, last_err)
return None
@asynccontextmanager
async def _dns_fallback_context(self, hostname: str, ips: tuple[str, ...]):
lock = self._get_dns_patch_lock()
async with lock:
original_getaddrinfo = socket.getaddrinfo
def patched_getaddrinfo(*args, **kwargs):
if args and args[0] == hostname:
port = args[1] if len(args) > 1 else None
rest_args = args[2:] if len(args) > 2 else ()
resolved: list[tuple[Any, ...]] = []
for ip in ips:
resolved.extend(original_getaddrinfo(ip, port, *rest_args, **kwargs))
return resolved
return original_getaddrinfo(*args, **kwargs)
socket.getaddrinfo = patched_getaddrinfo # type: ignore[assignment]
try:
yield
finally:
socket.getaddrinfo = original_getaddrinfo # type: ignore[assignment]
async def _maybe_refresh_dns_cache(self, hostname: str) -> None:
if not is_bingx_hostname(hostname):
return
try:
ips = await asyncio.to_thread(self._dns_cache.maybe_refresh_from_dns, hostname)
if ips:
self._logger.debug("BingX DNS cache refreshed: host=%s ips=%s", hostname, ",".join(ips))
except Exception as exc: # pragma: no cover - best-effort observability
self._logger.debug("BingX DNS cache refresh skipped: host=%s error=%s", hostname, exc)
async def _request_json(
self,
method: str,
path: str,
params: Mapping[str, object],
*,
signed: bool,
expect_envelope: bool = True,
allow_empty: bool = False,
max_retries_override: int | None = None,
) -> Any:
session = await self._get_session()
last_error: Exception | None = None
max_retries = (
max_retries_override
if max_retries_override is not None
else _positive_int_or_default(self._config.max_retries, 3)
)
# When max_retries=0, also restrict to primary URL only — no backup
# fallback either, because backup hits the same exchange account.
urls_to_try = self._base_urls[:1] if max_retries == 0 else self._base_urls
for attempt in range(max_retries + 1):
await self._circuit_breaker.wait_if_open()
for base_index, base_url in enumerate(urls_to_try):
hostname = self._base_hosts[base_index]
try:
url = f"{base_url}{path}"
body: str | None = None
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip, deflate",
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": (
"Mozilla/5.0 (X11; Linux x86_64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/135.0.0.0 Safari/537.36"
),
"X-SOURCE-KEY": self._source_key,
}
if signed and self._api_key:
headers["X-BX-APIKEY"] = self._api_key
payload = dict(params)
if signed:
if not self._api_key or not self._secret_key:
raise BingxHttpError("BingX API credentials are required for signed requests")
payload = build_signed_params(
payload,
self._secret_key,
recv_window_ms=_recv_window_ms_or_default(self._config.recv_window_ms),
)
canonical = canonical_query(
{k: v for k, v in payload.items() if k != "signature"}
)
if method in {"GET", "DELETE"}:
query = canonical
if signed:
query = f"{canonical}&signature={payload['signature']}"
if query:
url = f"{url}?{query}"
else:
if signed:
body = f"{canonical}&signature={payload['signature']}"
else:
body = canonical
headers["Content-Type"] = "application/x-www-form-urlencoded"
if method == "POST" and body:
self._logger.debug(
"HTTP POST [attempt=%d url_idx=%d]: %s", attempt, base_index, body[:400]
)
response = await session.request(
method=method,
url=url,
headers=headers,
data=body,
)
text = response.text
self._rate_limits.update_rest_headers({k.lower(): v for k, v in response.headers.items()})
if response.status_code >= 400:
retryable = self._is_retryable_status(response.status_code)
if retryable:
delay = self._circuit_breaker.record_failure(
rate_limited=response.status_code == 429,
retry_after_ms=self._rate_limits.snapshot().rest_reset_ms,
)
last_error = BingxHttpError(f"HTTP {response.status_code}: {text or response.reason_phrase}")
if base_index < len(self._base_urls) - 1:
continue
if attempt < max_retries:
await asyncio.sleep(delay)
break
raise BingxHttpError(f"HTTP {response.status_code}: {text or response.reason_phrase}")
self._circuit_breaker.record_success()
if hostname is not None:
await self._maybe_refresh_dns_cache(hostname)
if not text.strip():
if allow_empty:
return None
data = {}
else:
data = json.loads(text)
if expect_envelope:
return self._unwrap_response(data)
return data
except BingxHttpError as exc:
retry_after_ms = self._retry_after_ms_from_error(str(exc))
if retry_after_ms is not None:
last_error = exc
delay = self._circuit_breaker.record_failure(
rate_limited=True,
retry_after_ms=retry_after_ms,
)
if attempt < max_retries:
await asyncio.sleep(delay)
break
raise
except httpx.HTTPError as exc:
self._logger.warning("HTTP error [att=%d url=%d %s]: %s%s",
attempt, base_index, method, type(exc).__name__, exc)
last_error = BingxHttpError(f"{method} {path} failed: {exc}")
if self._is_dns_resolution_error(exc):
if hostname is not None:
cached_ips = self._dns_cache.resolve(hostname)
if cached_ips:
self._logger.warning(
"BingX DNS fallback engaged: host=%s ips=%s error=%s",
hostname,
",".join(cached_ips),
exc,
)
try:
response = await self._request_via_fallback_session(
session=session,
hostname=hostname,
ips=cached_ips,
method=method,
url=url,
headers=headers,
body=body,
)
text = response.text
self._rate_limits.update_rest_headers(
{k.lower(): v for k, v in response.headers.items()}
)
if response.status_code >= 400:
raise BingxHttpError(
f"HTTP {response.status_code}: {text or response.reason_phrase}"
)
self._circuit_breaker.record_success()
await self._maybe_refresh_dns_cache(hostname)
if not text.strip():
if allow_empty:
return None
data = {}
else:
data = json.loads(text)
if expect_envelope:
return self._unwrap_response(data)
return data
except Exception as fallback_exc:
last_error = BingxHttpError(f"{method} {path} failed: {fallback_exc}")
if base_index < len(self._base_urls) - 1:
continue
if last_error is not None:
raise last_error
raise BingxHttpError(f"{method} {path} failed: {exc}")
delay = self._circuit_breaker.record_failure()
if base_index < len(self._base_urls) - 1:
continue
if attempt < max_retries:
await asyncio.sleep(delay)
break
except Exception as exc: # pragma: no cover - transport fallback
self._logger.warning("Transport error [att=%d url=%d %s]: %s%s",
attempt, base_index, method, type(exc).__name__, exc)
last_error = exc
if self._is_dns_resolution_error(exc):
if hostname is not None:
cached_ips = self._dns_cache.resolve(hostname)
if cached_ips:
self._logger.warning(
"BingX DNS fallback engaged: host=%s ips=%s error=%s",
hostname,
",".join(cached_ips),
exc,
)
try:
response = await self._request_via_fallback_session(
session=session,
hostname=hostname,
ips=cached_ips,
method=method,
url=url,
headers=headers,
body=body,
)
text = response.text
self._rate_limits.update_rest_headers(
{k.lower(): v for k, v in response.headers.items()}
)
if response.status_code >= 400:
raise BingxHttpError(
f"HTTP {response.status_code}: {text or response.reason_phrase}"
)
self._circuit_breaker.record_success()
await self._maybe_refresh_dns_cache(hostname)
if not text.strip():
if allow_empty:
return None
data = {}
else:
data = json.loads(text)
if expect_envelope:
return self._unwrap_response(data)
return data
except Exception as fallback_exc:
last_error = BingxHttpError(f"{method} {path} failed: {fallback_exc}")
if base_index < len(self._base_urls) - 1:
continue
if last_error is not None:
raise last_error
raise BingxHttpError(f"{method} {path} failed: {exc}")
delay = self._circuit_breaker.record_failure()
if base_index < len(self._base_urls) - 1:
continue
if attempt < max_retries:
await asyncio.sleep(delay)
break
if last_error is None:
raise BingxHttpError(f"{method} {path} failed without an error")
raise BingxHttpError(str(last_error))
@staticmethod
def _is_retryable_status(status_code: int) -> bool:
return status_code == 429 or status_code >= 500
@staticmethod
def _is_dns_resolution_error(exc: BaseException) -> bool:
text = str(exc).lower()
if isinstance(exc, socket.gaierror):
return True
return any(
marker in text
for marker in (
"name or service not known",
"temporary failure in name resolution",
"gaierror",
"nodename nor servname provided",
"getaddrinfo failed",
)
)
@staticmethod
def _unwrap_response(payload: dict[str, Any]) -> Any:
code = int(payload.get("code", -1))
if code != 0:
raise BingxHttpError(payload.get("msg", f"BingX error code {code}"))
return payload.get("data")
@staticmethod
def _retry_after_ms_from_error(message: str) -> int | None:
lower = message.lower()
if "109400" in message or "requests within 480000 ms" in lower:
return 480_000
if "rate limit" in lower or "too many requests" in lower:
return 60_000
return None
@staticmethod
def extract_available_balance(balance_rows: list[dict[str, object]], currency: str) -> Decimal:
for row in balance_rows:
if str(row.get("asset")) == currency:
return Decimal(str(row.get("availableBalance") or row.get("balance") or "0"))
return Decimal("0")