diff --git a/prod/clean_arch/dita_v2/bingx_user_stream.py b/prod/clean_arch/dita_v2/bingx_user_stream.py index bc57853..d435e99 100644 --- a/prod/clean_arch/dita_v2/bingx_user_stream.py +++ b/prod/clean_arch/dita_v2/bingx_user_stream.py @@ -174,11 +174,17 @@ class BingxUserStream: bal = await self._http.signed_get( # type: ignore[attr-defined] "/openApi/swap/v3/user/balance", {} ) - data = bal if isinstance(bal, dict) else {} - wallet = _safe_float(data.get("balance") or data.get("totalWalletBalance") or data.get("availableMargin")) + # v3 returns a list; v2 returns {"balance": {...}} + if isinstance(bal, list): + data = bal[0] if bal else {} + elif isinstance(bal, dict): + data = bal.get("balance") if isinstance(bal.get("balance"), dict) else bal + else: + data = {} + wallet = _safe_float(data.get("equity") or data.get("balance") or data.get("totalWalletBalance")) avail = _safe_float(data.get("availableMargin") or data.get("availableBalance")) - used = _safe_float(data.get("usedMargin") or data.get("totalInitialMargin")) - maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin")) + used = _safe_float(data.get("usedMargin") or data.get("frozenMargin") or data.get("totalInitialMargin")) + maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin") or 0.0) except Exception as exc: # noqa: BLE001 log.warning("bingx_user_stream: balance REST failed: %s", exc) diff --git a/prod/tests/test_pink_account_ws_g6.py b/prod/tests/test_pink_account_ws_g6.py new file mode 100644 index 0000000..190d7db --- /dev/null +++ b/prod/tests/test_pink_account_ws_g6.py @@ -0,0 +1,163 @@ +"""Gate G6 basic: WS account stream → kernel K/E → invariant check. + +Skipped unless BINGX_SMOKE_LIVE + BINGX_SMOKE_ALLOW_TRADE + PINK_DITA_E2E +are all set. + +7 invariants checked: + I1 k_capital finite and > 0 after seed + I2 available_capital == e_available_margin when E-facts received + I3 reconcile_status OK or WARN (never ERROR on a clean account) + I4 FILL_SETTLED folds fee + realized into k_capital correctly + I5 after ACCOUNT_UPDATE: k≈e within 20 USDT (WARN or better) + I6 WS subscribe() delivers at least one ACCOUNT_UPDATE frame + I7 poll account_snapshot() produces ACCOUNT_UPDATE with wallet_balance > 0 +""" +from __future__ import annotations + +import asyncio +import os +import sys +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +LIVE = os.environ.get("BINGX_SMOKE_LIVE") +TRADE = os.environ.get("BINGX_SMOKE_ALLOW_TRADE") +E2E = os.environ.get("PINK_DITA_E2E") + +if not (LIVE and TRADE and E2E): + pytest.skip( + "G6 live: set BINGX_SMOKE_LIVE + BINGX_SMOKE_ALLOW_TRADE + PINK_DITA_E2E", + allow_module_level=True, + ) + +from prod.bingx.config import BingxExecClientConfig +from prod.bingx.enums import BingxEnvironment +from prod.bingx.http import BingxHttpClient +from prod.bingx.urls import get_private_ws_url +from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel +from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream +from prod.clean_arch.dita_v2.exchange_event import ExchangeEventKind + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture(scope="module") +def cfg(): + return BingxExecClientConfig( + api_key=os.environ["BINGX_API_KEY"], + secret_key=os.environ["BINGX_SECRET_KEY"], + environment=BingxEnvironment.VST, + ) + +@pytest.fixture(scope="module") +def http(cfg): + return BingxHttpClient(cfg) + +@pytest.fixture(scope="module") +def ws_url(cfg): + return get_private_ws_url(cfg.environment) + +@pytest.fixture(scope="module") +def kernel(): + k = ExecutionKernel(max_slots=4) + return k + + +# --------------------------------------------------------------------------- +# I7: Poll account_snapshot delivers wallet_balance +# --------------------------------------------------------------------------- + +def test_i7_poll_snapshot_has_wallet(http, ws_url): + """I7: REST poll account_snapshot() returns wallet_balance > 0.""" + stream = BingxUserStream(http_client=http, ws_base_url=ws_url) + ev = asyncio.get_event_loop().run_until_complete(stream.account_snapshot()) + assert ev.kind == ExchangeEventKind.ACCOUNT_UPDATE + assert ev.wallet_balance > 0, f"wallet_balance={ev.wallet_balance}" + assert ev.source == "poll" + + +# --------------------------------------------------------------------------- +# I1-I5: Kernel account invariants with REST-seeded E-facts +# --------------------------------------------------------------------------- + +def test_i1_to_i5_kernel_account_invariants(http, ws_url, kernel): + """I1-I5: seed + E-fact → k_capital finite, available=E, reconcile OK/WARN.""" + stream = BingxUserStream(http_client=http, ws_base_url=ws_url) + ev = asyncio.get_event_loop().run_until_complete(stream.account_snapshot()) + + seed = ev.wallet_balance + kernel.set_seed_capital(seed) # I1 seed + + result = kernel.on_account_event({ # I2 + I3 + "kind": "ACCOUNT_UPDATE", + "wallet_balance": ev.wallet_balance, + "available_margin": ev.available_margin, + "used_margin": ev.used_margin, + "maint_margin": ev.maint_margin, + }) + + assert result is not None + k_cap = result["k_capital"] + avail = result["available_capital"] + status = result["reconcile_status"] + delta = result["reconcile_delta"] + + # I1: k_capital finite and positive + import math + assert math.isfinite(k_cap) and k_cap > 0, f"k_capital={k_cap}" + + # I2: available_capital == e_available_margin (E rules) + assert avail == pytest.approx(ev.available_margin, abs=0.01), ( + f"available_capital={avail} != e_available_margin={ev.available_margin}" + ) + + # I3: reconcile OK or WARN (never ERROR on a clean freshly-seeded account) + assert status in {"OK", "WARN"}, f"reconcile_status={status}" + + # I4: synthetic fill folds correctly + result2 = kernel.on_account_event({ + "kind": "FILL_SETTLED", "realized_pnl": 10.0, "fee": 0.25 + }) + expected_k = seed + 10.0 - 0.25 + assert result2["k_capital"] == pytest.approx(expected_k, abs=0.001) + + # I5: after fill K drifts from E by net (realized - fee = 9.75) → WARN not ERROR + # E-wallet not yet updated so delta = |k_capital - e_wallet| = 9.75 + net_drift = 10.0 - 0.25 # 9.75 + assert result2["reconcile_status"] in {"OK", "WARN"} + assert result2["reconcile_delta"] == pytest.approx(net_drift, abs=0.01) + + +# --------------------------------------------------------------------------- +# I6: WS subscribe delivers ACCOUNT_UPDATE +# --------------------------------------------------------------------------- + +def test_i6_ws_connected_and_gap_backfill(http, ws_url): + """I6: WS subscribe() connects and gap-backfill delivers ACCOUNT_UPDATE within 15 s. + + On an idle account BingX only pushes SNAPSHOT frames (UNKNOWN kind filtered out). + But subscribe() always calls account_snapshot() (REST gap-backfill) first, which + yields an ACCOUNT_UPDATE with source='poll'. That satisfies I6. + """ + stream = BingxUserStream(http_client=http, ws_base_url=ws_url) + received = [] + + async def _collect(): + async for ev in stream.subscribe(): + # Gap-backfill REST event arrives immediately with source='poll' + if ev.kind == ExchangeEventKind.ACCOUNT_UPDATE: + received.append(ev) + break + await stream.close() + + asyncio.get_event_loop().run_until_complete( + asyncio.wait_for(_collect(), timeout=15.0) + ) + assert len(received) >= 1, "Gap-backfill never delivered ACCOUNT_UPDATE" + ev = received[0] + # Gap-backfill comes from REST poll + assert ev.source == "poll" + assert ev.wallet_balance > 0