diff --git a/prod/clean_arch/dita_v2/bingx_venue.py b/prod/clean_arch/dita_v2/bingx_venue.py index 7876d63..58a2674 100644 --- a/prod/clean_arch/dita_v2/bingx_venue.py +++ b/prod/clean_arch/dita_v2/bingx_venue.py @@ -341,10 +341,21 @@ class BingxVenueAdapter(VenueAdapter): metadata=metadata, ) - def connect(self) -> bool: - result = getattr(self.backend, "connect", None) - if result is not None: - self._run(result()) + async def connect(self) -> bool: + """Async connect — awaits backend.connect() in the caller's event loop. + + The old sync path called self._run(backend.connect()) which spawns a + thread-pool asyncio.run(), creating the httpx AsyncClient in a temporary + loop that immediately closes. Every subsequent request then raises + "Event loop is closed" or "bound to a different event loop". + Awaiting directly here fixes that: the client is always created in the + main running loop. + """ + conn_fn = getattr(self.backend, "connect", None) + if conn_fn is not None: + result = conn_fn() + if inspect.isawaitable(result): + await result # backend.connect() already called refresh_state() — no second fetch needed return True @@ -402,10 +413,23 @@ class BingxVenueAdapter(VenueAdapter): return self._events_from_cancel(order, response, None, None, reason=reason) def open_orders(self) -> List[VenueOrder]: + # Use backend._state (populated by await backend.connect()) rather than + # _backend_snapshot() → _call_backend() → _run() → pool.submit(asyncio.run) + # which spawns a temporary event loop, creates the httpx AsyncClient inside + # it, then closes that loop — every subsequent HTTP call then raises + # "Event loop is closed" or "asyncio.locks.Event bound to a different loop". + backend_state = getattr(self.backend, "_state", None) + if backend_state is not None: + return [_venue_order_from_row(row) for row in (backend_state.open_orders or [])] snapshot = self._backend_snapshot(include_history=False) return [_venue_order_from_row(row) for row in (snapshot.open_orders or [])] def open_positions(self) -> List[dict[str, Any]]: + # Same rationale as open_orders(): prefer cached backend._state to avoid + # the thread-pool asyncio.run() path that corrupts the httpx session. + backend_state = getattr(self.backend, "_state", None) + if backend_state is not None: + return [dict(row) for row in (backend_state.open_positions or {}).values()] snapshot = self._backend_snapshot(include_history=False) return [dict(row) for row in (snapshot.open_positions or {}).values()] diff --git a/prod/clean_arch/dita_v2/test_bingx_bugs.py b/prod/clean_arch/dita_v2/test_bingx_bugs.py index df122d2..91b6520 100644 --- a/prod/clean_arch/dita_v2/test_bingx_bugs.py +++ b/prod/clean_arch/dita_v2/test_bingx_bugs.py @@ -544,7 +544,9 @@ class TestEventsFromCancelIgnoresSnapshots: class TestConnectNoDoubleRefresh: def test_connect_calls_backend_connect_once_no_extra_snapshot(self): """connect() must call backend.connect() exactly once and not call - refresh_state() a second time (the redundant _backend_snapshot was removed).""" + refresh_state() a second time (the redundant _backend_snapshot was removed). + connect() is async — must be awaited.""" + import asyncio backend = MagicMock() backend.connect = AsyncMock(return_value=True) backend.refresh_state = AsyncMock(return_value=_flat_snap()) @@ -556,7 +558,7 @@ class TestConnectNoDoubleRefresh: venue._snapshot_ready.set() venue._last_snapshot = None - result = venue.connect() + result = asyncio.run(venue.connect()) assert result is True # backend.connect called once backend.connect.assert_called_once() @@ -564,7 +566,9 @@ class TestConnectNoDoubleRefresh: backend.refresh_state.assert_not_called() def test_connect_without_backend_connect_is_noop(self): - """If backend has no connect(), connect() returns True without crashing.""" + """If backend has no connect(), connect() returns True without crashing. + connect() is async — must be awaited.""" + import asyncio backend = MagicMock(spec=[]) # empty spec — no attributes venue = BingxVenueAdapter.__new__(BingxVenueAdapter) venue.backend = backend @@ -574,7 +578,7 @@ class TestConnectNoDoubleRefresh: venue._snapshot_ready.set() venue._last_snapshot = None - result = venue.connect() + result = asyncio.run(venue.connect()) assert result is True diff --git a/prod/clean_arch/dita_v2/test_orphan_prevention.py b/prod/clean_arch/dita_v2/test_orphan_prevention.py new file mode 100644 index 0000000..0dc356a --- /dev/null +++ b/prod/clean_arch/dita_v2/test_orphan_prevention.py @@ -0,0 +1,484 @@ +"""Orphan-trade prevention regression tests. + +Covers every mechanism that can create orphan positions in the single-slot +DITAv2/PINK kernel: + + A. Event-loop corruption — open_positions() / open_orders() must NOT call + _backend_snapshot() from an async context; they must read backend._state. + B. connect() is awaitable — PinkDirectRuntime.connect() must await + venue.connect() in the main loop, never in a thread-pool asyncio.run(). + C. Reconcile multi-position guard — >1 BingX position on startup logs ERROR + and takes only the largest; never silently orphans the rest. + D. clientOrderId on every order — every ENTER/EXIT carries a unique + "p-{action}-{ts36}-{rand4}" id so BingX deduplicates retries. + E. EXIT sizing from slot — _exit_intent_from_slot() sizes the close from + kernel accounting, never from a policy size that could undershoot. +""" +from __future__ import annotations + +import asyncio +import inspect +import types +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any, List +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +ROOT_PATH = "/mnt/dolphinng5_predict" +import sys +sys.path.insert(0, ROOT_PATH) + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +def _make_fake_state(positions: dict | None = None, orders: list | None = None): + """Minimal ExchangeStateSnapshot stand-in.""" + s = types.SimpleNamespace() + s.open_positions = positions or {} + s.open_orders = orders or [] + s.account = {} + s.all_orders = [] + s.all_fills = [] + return s + + +def _make_backend(*, state=None): + b = MagicMock() + b._state = state + b.connect = AsyncMock(return_value=True) + b.refresh_state = AsyncMock(return_value=state or _make_fake_state()) + b.submit_intent = AsyncMock() + return b + + +# ── A. Event-loop corruption ────────────────────────────────────────────────── + +class TestOpenPositionsUsesBackendState: + """open_positions() / open_orders() must read backend._state, never call + _backend_snapshot() which would spawn a thread-pool asyncio.run().""" + + def test_open_positions_reads_backend_state_when_present(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + fake_state = _make_fake_state(positions={ + "TRX-USDT": {"symbol": "TRX-USDT", "positionAmt": "10", "positionSide": "SHORT"}, + }) + backend = _make_backend(state=fake_state) + venue = BingxVenueAdapter(backend=backend) + result = venue.open_positions() + assert len(result) == 1 + assert result[0]["symbol"] == "TRX-USDT" + + def test_open_positions_does_not_call_backend_snapshot_when_state_set(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state(positions={ + "TRX-USDT": {"symbol": "TRX-USDT", "positionAmt": "10"}, + })) + venue = BingxVenueAdapter(backend=backend) + with patch.object(venue, "_backend_snapshot", side_effect=AssertionError( + "_backend_snapshot must NOT be called when backend._state is set" + )): + venue.open_positions() # must not raise + + def test_open_orders_reads_backend_state_when_present(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + fake_state = _make_fake_state(orders=[ + {"orderId": "123", "clientOrderId": "abc", "status": "NEW", + "side": "SELL", "origQty": "10", "executedQty": "0"}, + ]) + backend = _make_backend(state=fake_state) + venue = BingxVenueAdapter(backend=backend) + result = venue.open_orders() + assert len(result) == 1 + + def test_open_orders_does_not_call_backend_snapshot_when_state_set(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state(orders=[ + {"orderId": "1", "clientOrderId": "x", "status": "NEW", + "side": "SELL", "origQty": "5", "executedQty": "0"}, + ])) + venue = BingxVenueAdapter(backend=backend) + with patch.object(venue, "_backend_snapshot", side_effect=AssertionError( + "_backend_snapshot must NOT be called when backend._state is set" + )): + venue.open_orders() # must not raise + + def test_open_positions_falls_back_to_snapshot_when_state_none(self): + """Without a connected backend, open_positions() falls back to _backend_snapshot().""" + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=None) + venue = BingxVenueAdapter(backend=backend) + fallback_state = _make_fake_state(positions={ + "LTC-USDT": {"symbol": "LTC-USDT", "positionAmt": "5"}, + }) + with patch.object(venue, "_backend_snapshot", return_value=fallback_state): + result = venue.open_positions() + assert len(result) == 1 + + def test_open_positions_empty_when_no_positions(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state(positions={})) + venue = BingxVenueAdapter(backend=backend) + assert venue.open_positions() == [] + + def test_open_positions_thread_pool_never_called_from_async_context(self): + """Simulate the bug: _run() inside async context must not be reached by + open_positions() when backend._state is set.""" + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state(positions={ + "BNB-USDT": {"symbol": "BNB-USDT", "positionAmt": "2"}, + })) + venue = BingxVenueAdapter(backend=backend) + # Poison _run() to fail if reached + with patch.object(venue, "_run", side_effect=RuntimeError( + "_run() must NOT be called by open_positions() when backend._state is set" + )): + result = venue.open_positions() + assert len(result) == 1 + + +# ── B. connect() is awaitable ───────────────────────────────────────────────── + +class TestConnectIsAwaitable: + """venue.connect() must return a coroutine so PinkDirectRuntime.connect() + properly awaits it in the main event loop.""" + + def test_bingx_venue_connect_returns_coroutine(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state()) + venue = BingxVenueAdapter(backend=backend) + result = venue.connect() + assert inspect.isawaitable(result), ( + "venue.connect() must return a coroutine so PinkDirectRuntime.connect() " + "awaits it in the main loop — not a bool that bypasses await" + ) + asyncio.run(result) + + def test_connect_awaits_backend_connect_in_main_loop(self): + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state()) + connect_calls = [] + + async def fake_connect(): + connect_calls.append(asyncio.get_running_loop()) + return True + + backend.connect = fake_connect + venue = BingxVenueAdapter(backend=backend) + + async def run(): + main_loop = asyncio.get_running_loop() + await venue.connect() + assert len(connect_calls) == 1 + assert connect_calls[0] is main_loop, ( + "backend.connect() must run in the main event loop, " + "not a thread-pool temporary loop" + ) + + asyncio.run(run()) + + def test_pink_direct_runtime_connect_awaits_venue_connect(self): + """PinkDirectRuntime.connect() must detect venue.connect() is awaitable + and await it — the inspect.isawaitable() guard must fire.""" + from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter + backend = _make_backend(state=_make_fake_state()) + venue = BingxVenueAdapter(backend=backend) + # The check: calling connect() without await must give a coroutine + coro = venue.connect() + assert inspect.isawaitable(coro) + # Clean up the coroutine to avoid ResourceWarning + coro.close() + + +# ── C. Reconcile multi-position guard ──────────────────────────────────────── + +class TestReconcileMultiPositionGuard: + """With N > 1 BingX positions on startup, _reconcile_position_slot must: + - log an ERROR naming the orphans + - pass only the largest position to reconcile_from_slots + - never silently discard positions without a log entry + """ + + def _make_kernel(self): + k = MagicMock() + k.max_slots = 1 + k.venue = MagicMock() + k.venue.open_positions = MagicMock(return_value=[]) + k.reconcile_from_slots = MagicMock() + k.set_seed_capital = MagicMock() + # _reconcile_position_slot accesses kernel.account.snapshot.{capital,peak_capital} + snap = MagicMock() + snap.capital = 0.0 + snap.peak_capital = 0.0 + k.account = MagicMock() + k.account.snapshot = snap + return k + + def test_single_position_reconciles_cleanly(self): + from prod.clean_arch.runtime.pink_direct import _reconcile_position_slot + kernel = self._make_kernel() + kernel.venue.open_positions.return_value = [ + {"symbol": "TRX-USDT", "positionAmt": "10", "positionSide": "SHORT", + "avgPrice": "0.32", "leverage": "1"}, + ] + import logging + with patch.object(logging.getLogger("prod.clean_arch.runtime.pink_direct"), + "error") as mock_err: + _reconcile_position_slot(kernel, 100_000.0) + mock_err.assert_not_called() + kernel.reconcile_from_slots.assert_called_once() + slots = kernel.reconcile_from_slots.call_args[0][0] + assert len(slots) == 1 + + def test_two_positions_logs_error_and_takes_largest(self): + from prod.clean_arch.runtime.pink_direct import _reconcile_position_slot + kernel = self._make_kernel() + kernel.venue.open_positions.return_value = [ + {"symbol": "LTC-USDT", "positionAmt": "40", "positionSide": "SHORT", + "avgPrice": "42.0", "leverage": "1"}, + {"symbol": "TRX-USDT", "positionAmt": "5871", "positionSide": "SHORT", + "avgPrice": "0.32", "leverage": "1"}, + ] + import logging + with patch.object(logging.getLogger("prod.clean_arch.runtime.pink_direct"), + "error") as mock_err: + _reconcile_position_slot(kernel, 100_000.0) + # Error logged naming the orphan + mock_err.assert_called_once() + err_msg = str(mock_err.call_args) + assert "LTC" in err_msg or "TRX" in err_msg + # Only the largest (TRX 5871) passed to kernel + slots = kernel.reconcile_from_slots.call_args[0][0] + assert len(slots) == 1 + assert slots[0].asset in ("TRX-USDT", "TRXUSDT") + + def test_seven_positions_logs_six_orphans(self): + from prod.clean_arch.runtime.pink_direct import _reconcile_position_slot + kernel = self._make_kernel() + symbols = ["NEO-USDT", "TRX-USDT", "BNB-USDT", "ADA-USDT", "LTC-USDT", "QTUM-USDT", "ATOM-USDT"] + qtys = [4882, 16560, 9, 62523, 608, 8122, 990] + kernel.venue.open_positions.return_value = [ + {"symbol": s, "positionAmt": str(q), "positionSide": "SHORT", + "avgPrice": "1.0", "leverage": "1"} + for s, q in zip(symbols, qtys) + ] + import logging + with patch.object(logging.getLogger("prod.clean_arch.runtime.pink_direct"), + "error") as mock_err: + _reconcile_position_slot(kernel, 100_000.0) + mock_err.assert_called_once() + slots = kernel.reconcile_from_slots.call_args[0][0] + assert len(slots) == 1 + # Largest by qty is ADA (62523) + assert slots[0].asset in ("ADA-USDT", "ADAUSDT") + + def test_flat_account_calls_reconcile_with_empty_list(self): + from prod.clean_arch.runtime.pink_direct import _reconcile_position_slot + kernel = self._make_kernel() + kernel.venue.open_positions.return_value = [] + _reconcile_position_slot(kernel, 100_000.0) + kernel.reconcile_from_slots.assert_called_once_with([]) + + +# ── D. clientOrderId on every order ────────────────────────────────────────── + +class TestClientOrderId: + """Every order submission must carry a unique hyphen-format clientOrderId + so BingX deduplicates retries and no duplicate positions are created.""" + + def test_clientorderid_present_in_enter_payload(self): + import re + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + from prod.clean_arch.dita_v2.contracts import KernelCommandType, KernelIntent, TradeSide + from prod.bingx.config import BingxExecClientConfig, BingxInstrumentProviderConfig + from prod.bingx.enums import BingxEnvironment + + cfg = BingxExecClientConfig( + api_key="test", secret_key="test", + environment=BingxEnvironment.VST, allow_mainnet=False, + instrument_provider=BingxInstrumentProviderConfig(load_all=False), + ) + adapter = BingxDirectExecutionAdapter(cfg) + + posted_payloads: list[dict] = [] + async def fake_post(path, params): + posted_payloads.append(dict(params)) + return {"order": {"orderId": "99", "clientOrderId": params.get("clientOrderId", ""), + "status": "FILLED", "executedQty": "10", "avgPrice": "0.32", + "side": "SELL", "symbol": "TRX-USDT"}} + + async def run(): + from prod.clean_arch.dita import Intent, TradeSide as LTS, DecisionAction + intent = Intent( + timestamp=datetime.now(timezone.utc), + trade_id="t1", decision_id="d1", asset="TRX-USDT", + action=DecisionAction.ENTER, side=LTS.SHORT, + reason="test", target_size=10.0, leverage=1.0, + reference_price=0.32, confidence=1.0, + bars_held=0, exit_leg_ratios=(1.0,), metadata={}, + ) + adapter._client.signed_post = fake_post + adapter._provider = MagicMock() + adapter._provider.find = MagicMock(return_value=None) + adapter._provider.list_all = MagicMock(return_value=[]) + adapter._leverage_cache = {"TRX-USDT": 1} + adapter._state = _make_fake_state() + await adapter.submit_intent(intent) + + asyncio.run(run()) + assert posted_payloads, "No POST was made" + payload = posted_payloads[0] + cid = payload.get("clientOrderId", "") + assert cid, "clientOrderId must be non-empty" + # Must match hyphen format p-{e|x}-{base36}-{rand4} + assert re.match(r"^p-[ex]-[0-9a-z]+-[0-9a-f]{4}$", cid), ( + f"clientOrderId '{cid}' does not match expected 'p-e-{{base36}}-{{rand4}}' format" + ) + + def test_clientorderid_unique_across_calls(self): + """Two submit_intent calls must produce different clientOrderIds.""" + import re + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + from prod.bingx.config import BingxExecClientConfig, BingxInstrumentProviderConfig + from prod.bingx.enums import BingxEnvironment + + cfg = BingxExecClientConfig( + api_key="test", secret_key="test", + environment=BingxEnvironment.VST, allow_mainnet=False, + instrument_provider=BingxInstrumentProviderConfig(load_all=False), + ) + adapter = BingxDirectExecutionAdapter(cfg) + ids: list[str] = [] + + async def fake_post(path, params): + ids.append(params.get("clientOrderId", "")) + return {"order": {"orderId": str(len(ids)), "clientOrderId": ids[-1], + "status": "FILLED", "executedQty": "10", "avgPrice": "0.32", + "side": "SELL", "symbol": "TRX-USDT"}} + + async def run(): + from prod.clean_arch.dita import Intent, TradeSide as LTS, DecisionAction + adapter._client.signed_post = fake_post + adapter._provider = MagicMock() + adapter._provider.find = MagicMock(return_value=None) + adapter._provider.list_all = MagicMock(return_value=[]) + adapter._leverage_cache = {"TRX-USDT": 1} + adapter._state = _make_fake_state() + for _ in range(3): + intent = Intent( + timestamp=datetime.now(timezone.utc), + trade_id="t1", decision_id="d1", asset="TRX-USDT", + action=DecisionAction.ENTER, side=LTS.SHORT, + reason="test", target_size=10.0, leverage=1.0, + reference_price=0.32, confidence=1.0, + bars_held=0, exit_leg_ratios=(1.0,), metadata={}, + ) + await adapter.submit_intent(intent) + + asyncio.run(run()) + assert len(ids) == 3 + assert len(set(ids)) == 3, f"clientOrderIds not unique: {ids}" + + +# ── E. EXIT sizing from slot ────────────────────────────────────────────────── + +class TestExitSizingFromSlot: + """_exit_intent_from_slot() must always cap exit size to the kernel's + tracked slot size, never overshoot (double-close) or undershoot.""" + + def _make_runtime(self, slot_size: float): + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + from prod.clean_arch.dita_v2.launcher import build_launcher_bundle + bundle = build_launcher_bundle(max_slots=1) + rt = PinkDirectRuntime.__new__(PinkDirectRuntime) + rt.kernel = bundle.kernel + rt.kernel._slots = [MagicMock()] + rt.kernel._slots[0].size = slot_size + rt.kernel.slot = lambda i: rt.kernel._slots[i] + return rt + + def _make_intent(self, policy_size: float): + from prod.clean_arch.dita_v2.contracts import KernelCommandType, KernelIntent, TradeSide + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id="i1", trade_id="t1", slot_id=0, + asset="TRX-USDT", side=TradeSide.SHORT, action=KernelCommandType.EXIT, + reference_price=0.32, target_size=policy_size, leverage=1.0, + exit_leg_ratios=(1.0,), reason="test", metadata={}, + ) + + def test_exit_capped_to_slot_size_when_policy_overshoots(self): + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + rt = self._make_runtime(slot_size=10.0) + intent = self._make_intent(policy_size=999.0) + result = rt._exit_intent_from_slot(intent) + assert result.target_size == 10.0, ( + "EXIT must be capped to slot_size=10.0, not policy_size=999.0" + ) + + def test_exit_uses_slot_size_when_policy_is_nan(self): + import math + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + rt = self._make_runtime(slot_size=10.0) + intent = self._make_intent(policy_size=float("nan")) + result = rt._exit_intent_from_slot(intent) + assert result.target_size == 10.0 + + def test_exit_uses_min_of_policy_and_slot(self): + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + rt = self._make_runtime(slot_size=10.0) + intent = self._make_intent(policy_size=7.0) + result = rt._exit_intent_from_slot(intent) + assert result.target_size == 7.0 + + def test_exit_uses_slot_size_when_slot_size_is_zero(self): + """When kernel reports no remaining size, trust the policy size.""" + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + rt = self._make_runtime(slot_size=0.0) + intent = self._make_intent(policy_size=5.0) + result = rt._exit_intent_from_slot(intent) + assert result.target_size == 5.0 + + def test_double_position_would_be_undersized_exit(self): + """Demonstrates why clientOrderId fix is critical: if BingX holds 2x + the slot size (from a retry duplicate), EXIT for slot_size only closes + half — leaving an orphan. This test documents the invariant: + slot_size must equal BingX actual position size for a clean close.""" + from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime + slot_size = 10.0 + bingx_actual = 20.0 # pre-fix: retry doubled the position + rt = self._make_runtime(slot_size=slot_size) + intent = self._make_intent(policy_size=slot_size) + result = rt._exit_intent_from_slot(intent) + # EXIT sends slot_size (10), but BingX has 20 → 10 remain = orphan + exit_qty = result.target_size + orphan = bingx_actual - exit_qty + assert orphan == 10.0, "Documents pre-fix orphan mechanism (clientOrderId fix prevents this)" + # Post-fix: BingX actual == slot_size, no orphan + bingx_post_fix = slot_size + orphan_post_fix = bingx_post_fix - exit_qty + assert orphan_post_fix == 0.0 + + +# ── F. Signature deduplication guard ───────────────────────────────────────── + +class TestSignatureNoDuplication: + """The HMAC signing path must append signature= exactly once.""" + + def test_signature_appears_exactly_once_in_body(self): + from prod.bingx.signing import build_signed_params, canonical_query + params = {"symbol": "TRX-USDT", "side": "SELL", "quantity": "10"} + signed = build_signed_params(params, "secret123") + # canonical_query excludes signature — signature only appended once + canonical = canonical_query({k: v for k, v in signed.items() if k != "signature"}) + body = f"{canonical}&signature={signed['signature']}" + assert body.count("signature=") == 1, ( + f"signature= appeared {body.count('signature=')} times in body — must be exactly 1" + ) + + def test_signature_not_in_canonical_query(self): + from prod.bingx.signing import build_signed_params, canonical_query + signed = build_signed_params({"x": "1"}, "secret") + canonical = canonical_query({k: v for k, v in signed.items() if k != "signature"}) + assert "signature" not in canonical