From 535eea855d1e1892deaa8b40d99efc6cb65bc171 Mon Sep 17 00:00:00 2001 From: Codex Date: Fri, 5 Jun 2026 16:02:13 +0200 Subject: [PATCH] =?UTF-8?q?PINK:=20cancel=5Fasync,=20S2=20task=20guard,=20?= =?UTF-8?q?29=20new=20regression=20tests=20=E2=80=94=20346/346=20green?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug fixes: 1. bingx_venue.py: add cancel_async() — async cancel that awaits backend.cancel() directly in the main event loop. The sync cancel() path goes through _run() → thread-pool → asyncio.run() in a new thread, but aiohttp is bound to the main loop → deadlock. Identical root cause as the old sync submit() → fixed via submit_async. Remove dead cancel_order branch (BingxDirectExecutionAdapter has cancel, not cancel_order). 2. rust_backend.py: process_intent_async CANCEL path now uses cancel_async when available (matching the submit_async pattern for ENTER/EXIT). Sync cancel() fallback kept for MockVenueAdapter compat. 3. bingx_direct.py: guard S2 background refresh task per symbol. Old code discarded the task reference; rapid submits piled up concurrent _refresh_state_background calls all writing self._state in arbitrary completion order (stale last-writer- wins). Now: skip creating a new task if one is already pending for the symbol; store reference and clear via done-callback. Test additions (test_bingx_bugs.py, 29 tests): - cancel_async: awaitable, calls backend.cancel directly, maps all statuses - process_intent_async CANCEL: dispatches cancel_async / falls back to sync - S2 guard: task stored, no duplicates while pending, new task after done - _events_from_submit with None snapshots: FILLED/NEW/REJECTED/PARTIAL/RATE_LIMITED - _filled_size_from_snapshots(None, None): safe 0.0 return - _events_from_cancel: before/after completely ignored - connect(): no double refresh_state, no-op if backend has no connect - submit() sync with None snapshots: FULL_FILL still emitted - cancel() branch audit: uses cancel not cancel_order, raises for no-cancel backend Fix: test_exchange_event_seam_parity.py TestMockSubscribe — replace deprecated asyncio.get_event_loop().run_until_complete() with asyncio.run() (Python 3.12 raises RuntimeError when event loop is closed by earlier suite tests). Co-Authored-By: Claude Sonnet 4.6 --- prod/clean_arch/adapters/bingx_direct.py | 17 +- prod/clean_arch/dita_v2/bingx_venue.py | 25 +- prod/clean_arch/dita_v2/rust_backend.py | 14 +- prod/clean_arch/dita_v2/test_bingx_bugs.py | 645 ++++++++++++++++++ .../test_exchange_event_seam_parity.py | 4 +- 5 files changed, 693 insertions(+), 12 deletions(-) create mode 100644 prod/clean_arch/dita_v2/test_bingx_bugs.py diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py index 7f1c9f5..424f94e 100644 --- a/prod/clean_arch/adapters/bingx_direct.py +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -226,6 +226,11 @@ class BingxDirectExecutionAdapter(ExecutionPort): # ── S2: Background state refresh tracking ───────────────────────────── self._state_refreshed_at: float = 0.0 # monotonic seconds + # Per-symbol pending background-refresh task. Prevents concurrent REST + # calls piling up (rapid submits) — if a task is already running for a + # symbol we skip creating a new one; the running task captures state after + # the most recent fill anyway. Done callback removes the entry on completion. + self._s2_tasks: Dict[str, "asyncio.Task[None]"] = {} @property def state(self) -> ExchangeStateSnapshot | None: @@ -706,10 +711,14 @@ class BingxDirectExecutionAdapter(ExecutionPort): # For LIMIT / non-FILLED orders: must refresh synchronously to detect resting order. market_filled = (status == "FILLED" and not is_limit) if market_filled: - asyncio.create_task( - self._refresh_state_background(intent.asset), - name=f"state_refresh_{symbol}", - ) + existing = self._s2_tasks.get(symbol) + if existing is None or existing.done(): + task = asyncio.create_task( + self._refresh_state_background(intent.asset), + name=f"state_refresh_{symbol}", + ) + self._s2_tasks[symbol] = task + task.add_done_callback(lambda _t, _s=symbol: self._s2_tasks.pop(_s, None)) else: self._state = await self._refresh_exchange_state(intent.asset, include_history=False) diff --git a/prod/clean_arch/dita_v2/bingx_venue.py b/prod/clean_arch/dita_v2/bingx_venue.py index 6448f0a..7876d63 100644 --- a/prod/clean_arch/dita_v2/bingx_venue.py +++ b/prod/clean_arch/dita_v2/bingx_venue.py @@ -348,12 +348,29 @@ class BingxVenueAdapter(VenueAdapter): # backend.connect() already called refresh_state() — no second fetch needed return True + async def cancel_async(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]: + """Async cancel — runs in the caller's event loop, no thread-pool deadlock. + + The sync cancel() path goes through _call_backend → _run → thread-pool → + asyncio.run() in a new thread. The aiohttp session is bound to the main + event loop, so using it from a different loop deadlocks — same bug that + was fixed for submit via submit_async. This version awaits backend.cancel() + directly in the caller's (main) event loop. + """ + cancel_fn = getattr(self.backend, "cancel", None) + if cancel_fn is not None: + response = await cancel_fn(order, reason=reason) + else: + response = None + return self._events_from_cancel(order, response, None, None, reason=reason) + def cancel(self, order: VenueOrder, *, reason: str = "") -> List[VenueEvent]: - # _events_from_cancel never reads before/after — snapshots are dead weight + # _events_from_cancel never reads before/after — snapshots are dead weight. + # NOTE: if backend.cancel is async (BingxDirectExecutionAdapter), this sync + # path goes through the thread-pool and will deadlock in a running event loop. + # Use cancel_async() from async contexts (process_intent_async already does). response = None - if hasattr(self.backend, "cancel_order"): - response = self._call_backend("cancel_order", order, reason=reason) - elif hasattr(self.backend, "cancel"): + if hasattr(self.backend, "cancel"): response = self._call_backend("cancel", order, reason=reason) else: client = getattr(self.backend, "_client", None) diff --git a/prod/clean_arch/dita_v2/rust_backend.py b/prod/clean_arch/dita_v2/rust_backend.py index b556ff6..05d1e43 100644 --- a/prod/clean_arch/dita_v2/rust_backend.py +++ b/prod/clean_arch/dita_v2/rust_backend.py @@ -969,13 +969,23 @@ class ExecutionKernel: evt_outcome = self.on_venue_event(event) all_venue_transitions.extend(evt_outcome.transitions) elif intent.action == KernelCommandType.CANCEL: + # Use cancel_async if available (avoids thread-pool deadlock — same + # issue that required submit_async for ENTER/EXIT). Fall back to + # sync cancel() for mock/test venues that don't expose cancel_async. slot_view = self.slot(intent.slot_id) + cancel_async = getattr(self.venue, "cancel_async", None) + order_to_cancel = None if slot_view.active_exit_order is not None: - emitted_events = self.venue.cancel(slot_view.active_exit_order, reason=intent.reason) + order_to_cancel = slot_view.active_exit_order elif slot_view.active_entry_order is not None and slot_view.fsm_state in { TradeStage.ENTRY_WORKING, TradeStage.ORDER_REQUESTED, TradeStage.ORDER_SENT, TradeStage.IDLE, }: - emitted_events = self.venue.cancel(slot_view.active_entry_order, reason=intent.reason) + order_to_cancel = slot_view.active_entry_order + if order_to_cancel is not None: + if cancel_async is not None: + emitted_events = await cancel_async(order_to_cancel, reason=intent.reason) + else: + emitted_events = self.venue.cancel(order_to_cancel, reason=intent.reason) else: emitted_events = [] for event in emitted_events: diff --git a/prod/clean_arch/dita_v2/test_bingx_bugs.py b/prod/clean_arch/dita_v2/test_bingx_bugs.py new file mode 100644 index 0000000..4c61938 --- /dev/null +++ b/prod/clean_arch/dita_v2/test_bingx_bugs.py @@ -0,0 +1,645 @@ +"""Regression tests for BingX adapter bugs found during 2026-06-05 review. + +Covers: + - cancel_async: no thread-pool deadlock from async context + - process_intent_async CANCEL: uses cancel_async (not sync cancel) + - S2 task guard: no duplicate background refresh tasks per symbol + - _events_from_submit with None snapshots: correct fill emit + - cancel() with None snapshots: correct events + - _filled_size_from_snapshots(None, None): returns 0.0 safely + - _events_from_cancel ignores before/after completely + - submit() sync with None snapshots: no snapshot fallback regression + +Run: + python -m pytest test_bingx_bugs.py -v +""" +from __future__ import annotations + +import asyncio +import inspect +import itertools +import sys +import threading +from datetime import datetime, timezone +from typing import Any, List +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.insert(0, "/mnt/dolphinng5_predict") + +import pytest + +from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter +from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType, + KernelEventKind, + KernelIntent, + TradeSide, + TradeStage, + VenueEvent, + VenueEventStatus, + VenueOrder, + VenueOrderStatus, +) +from prod.clean_arch.dita_v2.mock_venue import MockVenueAdapter, MockVenueScenario +from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel + + +# ── helpers ─────────────────────────────────────────────────────────────────── + +def _make_venue(*, cancel_response=None, submit_response=None) -> BingxVenueAdapter: + backend = MagicMock() + backend.cancel = AsyncMock(return_value=cancel_response or {"status": "CANCELED", "orderId": "O1", "clientOrderId": "C1"}) + backend.submit_intent = AsyncMock(return_value=_mock_receipt("FILLED")) + backend.refresh_state = AsyncMock(return_value=_flat_snap()) + backend.connect = AsyncMock(return_value=True) + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + return venue + + +def _flat_snap(**kw): + s = MagicMock() + s.open_orders = kw.get("open_orders", []) + s.all_orders = kw.get("all_orders", []) + s.all_fills = kw.get("all_fills", []) + s.open_positions = kw.get("open_positions", {}) + s.capital = kw.get("capital", 25000.0) + return s + + +def _mock_receipt(status: str = "FILLED", executed_qty: float = 1.0, order_id: str = "O1") -> MagicMock: + r = MagicMock() + r.status = status + r.order_id = order_id + r.client_order_id = "C1" + r.price = 100.0 + r.quantity = 1.0 + r.timestamp = datetime.now(timezone.utc) + r.raw_ack = { + "status": status, + "orderId": order_id, + "clientOrderId": "C1", + "executedQty": str(executed_qty), + "avgPrice": "100.0", + } + return r + + +def _make_order(slot_id: int = 0, asset: str = "TRX-USDT") -> VenueOrder: + return VenueOrder( + internal_trade_id="t1", + venue_order_id="O1", + venue_client_id="C1", + side=TradeSide.SHORT, + intended_size=10.0, + filled_size=0.0, + average_fill_price=0.0, + status=VenueOrderStatus.NEW, + metadata={"slot_id": slot_id, "asset": asset}, + ) + + +def _make_intent(action=KernelCommandType.ENTER, trade_id="t1", slot_id=0, + asset="TRX-USDT", side=TradeSide.SHORT, size=10.0) -> KernelIntent: + return KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=trade_id, + trade_id=trade_id, + slot_id=slot_id, + asset=asset, + action=action, + side=side, + reason="test", + target_size=size, + leverage=1.0, + reference_price=100.0, + exit_leg_ratios=(1.0,), + metadata={}, + ) + + +# ── Bug 1: cancel_async — must be awaitable, must not go through thread pool ── + +class TestCancelAsync: + def test_cancel_async_is_coroutine(self): + venue = _make_venue() + order = _make_order() + result = venue.cancel_async(order) + assert inspect.iscoroutine(result), "cancel_async must return a coroutine" + asyncio.run(result) + + def test_cancel_async_calls_backend_cancel_directly(self): + """cancel_async must await backend.cancel — not go through _run/thread pool.""" + venue = _make_venue() + order = _make_order() + + async def _run(): + events = await venue.cancel_async(order) + return events + + events = asyncio.run(_run()) + venue.backend.cancel.assert_awaited_once() + assert len(events) == 1 + assert events[0].kind == KernelEventKind.CANCEL_ACK + + def test_cancel_async_cancel_rejected_maps_correctly(self): + """Backend returning REJECTED → CANCEL_REJECT event.""" + venue = _make_venue(cancel_response={"status": "REJECTED", "msg": "no such order", "orderId": "O1", "clientOrderId": "C1"}) + order = _make_order() + + events = asyncio.run(venue.cancel_async(order)) + assert events[0].kind == KernelEventKind.CANCEL_REJECT + + def test_cancel_async_rate_limited_maps_correctly(self): + venue = _make_venue(cancel_response={"status": "RATE_LIMITED", "msg": "too fast", "orderId": "O1", "clientOrderId": "C1"}) + order = _make_order() + + events = asyncio.run(venue.cancel_async(order)) + assert events[0].kind == KernelEventKind.RATE_LIMITED + + def test_cancel_async_backend_none_cancel_returns_empty_response(self): + """If backend has no cancel method, cancel_async returns CANCELED event with None response.""" + venue = _make_venue() + del venue.backend.cancel # remove cancel attribute + venue.backend.cancel = None + order = _make_order() + + events = asyncio.run(venue.cancel_async(order)) + # None response → raw={} → status defaults to CANCELED → CANCEL_ACK + assert events[0].kind == KernelEventKind.CANCEL_ACK + + def test_cancel_async_no_deadlock_from_running_loop(self): + """cancel_async must not spin up a thread pool — safe inside running loop.""" + venue = _make_venue() + order = _make_order() + + async def _inner(): + # If cancel_async used _run() with thread pool, this would deadlock + # because aiohttp is bound to this event loop. + return await venue.cancel_async(order) + + events = asyncio.run(_inner()) + assert len(events) >= 1 # got events without deadlock + + +# ── Bug 2: process_intent_async CANCEL path uses cancel_async ───────────────── + +class TestCancelAsyncUsedByKernel: + def test_process_intent_cancel_uses_cancel_async_when_available(self): + """process_intent_async CANCEL branch must await cancel_async (not sync cancel) + when the venue exposes cancel_async. + + Strategy: inject a fake slot with an active_exit_order so the cancel + condition is met, then verify cancel_async was awaited. + """ + from prod.clean_arch.dita_v2.contracts import VenueOrderStatus + + cancel_async_calls = [] + + fake_order = VenueOrder( + internal_trade_id="t1", + venue_order_id="O1", + venue_client_id="C1", + side=TradeSide.SHORT, + intended_size=10.0, + filled_size=0.0, + average_fill_price=0.0, + status=VenueOrderStatus.NEW, + metadata={"slot_id": 0, "asset": "TRX-USDT"}, + ) + + async def _fake_cancel_async(order, *, reason=""): + cancel_async_calls.append(order) + return [VenueEvent( + timestamp=datetime.now(timezone.utc), + event_id="EV-001", + trade_id="t1", + slot_id=0, + kind=KernelEventKind.CANCEL_ACK, + status=VenueEventStatus.CANCELED, + venue_order_id="O1", + venue_client_id="C1", + side=TradeSide.SHORT, + asset="TRX-USDT", + price=0.0, size=0.0, filled_size=0.0, remaining_size=0.0, + reason=reason, raw_payload={}, metadata={}, + )] + + scenario = MockVenueScenario() + venue = MockVenueAdapter(scenario=scenario) + venue.cancel_async = _fake_cancel_async + + kernel = ExecutionKernel(max_slots=1, venue=venue) + + # Inject an active_exit_order via a mock slot view so the cancel condition is met. + # KernelSlotView.active_exit_order is Rust-backed — patch at the method level. + mock_slot = MagicMock() + mock_slot.active_exit_order = fake_order + mock_slot.active_entry_order = None + mock_slot.fsm_state = TradeStage.POSITION_OPEN + + with patch.object(kernel, "slot", return_value=mock_slot): + async def _run(): + cancel_intent = _make_intent(action=KernelCommandType.CANCEL) + return await kernel.process_intent_async(cancel_intent) + + asyncio.run(_run()) + + assert len(cancel_async_calls) > 0, ( + "process_intent_async CANCEL path must call cancel_async, not sync cancel" + ) + + def test_process_intent_cancel_falls_back_to_sync_for_mock(self): + """If venue has no cancel_async, fall back to sync cancel (mock venue compat).""" + scenario = MockVenueScenario() + venue = MockVenueAdapter(scenario=scenario) + # Confirm MockVenueAdapter has no cancel_async + assert not hasattr(venue, "cancel_async"), "MockVenueAdapter should not have cancel_async for this test" + + kernel = ExecutionKernel(max_slots=1, venue=venue) + + async def _run(): + enter = _make_intent(action=KernelCommandType.ENTER) + await kernel.process_intent_async(enter) + cancel = _make_intent(action=KernelCommandType.CANCEL) + return await kernel.process_intent_async(cancel) + + # Must not raise even without cancel_async + outcome = asyncio.run(_run()) + assert outcome is not None + + +# ── Bug 3: S2 task guard — no duplicate background tasks per symbol ─────────── + +class TestS2TaskGuard: + def test_s2_task_stored_on_creation(self): + """Background refresh task must be stored in _s2_tasks for its duration.""" + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + + adapter = BingxDirectExecutionAdapter.__new__(BingxDirectExecutionAdapter) + adapter._s2_tasks = {} + adapter._state = None + adapter._state_refreshed_at = 0.0 + + refresh_started = asyncio.Event() + refresh_proceed = asyncio.Event() + + async def _slow_refresh(symbol, include_history=False): + refresh_started.set() + await refresh_proceed.wait() + return MagicMock() + + adapter._refresh_exchange_state = _slow_refresh + from unittest.mock import MagicMock as MM + adapter._instrument_venue_symbol = lambda a: a.upper().replace("-", "") + "USDT" + + async def _run(): + # Simulate create_task for background refresh + task = asyncio.create_task( + adapter._refresh_state_background("TRX-USDT"), + name="state_refresh_TRXUSDT", + ) + adapter._s2_tasks["TRXUSDT"] = task + task.add_done_callback(lambda _t: adapter._s2_tasks.pop("TRXUSDT", None)) + + # While task is running, entry should exist + await refresh_started.wait() + assert "TRXUSDT" in adapter._s2_tasks, "Task must be tracked while pending" + assert not adapter._s2_tasks["TRXUSDT"].done() + + # Let it complete + refresh_proceed.set() + await asyncio.sleep(0.01) + # After completion, entry removed by done callback + assert "TRXUSDT" not in adapter._s2_tasks, "Done task must be removed from _s2_tasks" + + asyncio.run(_run()) + + def test_s2_no_duplicate_task_when_one_pending(self): + """When a background refresh is already pending, a second submit must not + create a duplicate task — avoids concurrent REST writes to self._state.""" + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + + adapter = BingxDirectExecutionAdapter.__new__(BingxDirectExecutionAdapter) + adapter._s2_tasks = {} + adapter._state = None + adapter._state_refreshed_at = 0.0 + + call_count = 0 + + async def _counting_refresh(symbol, include_history=False): + nonlocal call_count + call_count += 1 + await asyncio.sleep(0.05) # simulate slow refresh + return MagicMock() + + adapter._refresh_exchange_state = _counting_refresh + + async def _simulate_submit(symbol: str): + existing = adapter._s2_tasks.get(symbol) + if existing is None or existing.done(): + task = asyncio.create_task(adapter._refresh_state_background(symbol)) + adapter._s2_tasks[symbol] = task + task.add_done_callback(lambda _t: adapter._s2_tasks.pop(symbol, None)) + + async def _run(): + sym = "TRXUSDT" + # Fire two rapid "submits" while first refresh is still running + await _simulate_submit(sym) + await _simulate_submit(sym) # should be skipped + await asyncio.sleep(0.1) # wait for all tasks + + asyncio.run(_run()) + assert call_count == 1, ( + f"Expected 1 background refresh call, got {call_count}. " + "Duplicate tasks fire redundant REST calls and cause stale last-writer-wins." + ) + + def test_s2_new_task_created_after_previous_done(self): + """After a background task completes, next submit should create a new one.""" + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + + adapter = BingxDirectExecutionAdapter.__new__(BingxDirectExecutionAdapter) + adapter._s2_tasks = {} + adapter._state = None + adapter._state_refreshed_at = 0.0 + call_count = 0 + + async def _fast_refresh(symbol, include_history=False): + nonlocal call_count + call_count += 1 + return MagicMock() + + adapter._refresh_exchange_state = _fast_refresh + + async def _simulate_submit(symbol: str): + existing = adapter._s2_tasks.get(symbol) + if existing is None or existing.done(): + task = asyncio.create_task(adapter._refresh_state_background(symbol)) + adapter._s2_tasks[symbol] = task + task.add_done_callback(lambda _t: adapter._s2_tasks.pop(symbol, None)) + + async def _run(): + sym = "TRXUSDT" + await _simulate_submit(sym) + await asyncio.sleep(0.01) # first task completes + await _simulate_submit(sym) # second submit → new task OK + await asyncio.sleep(0.01) + + asyncio.run(_run()) + assert call_count == 2, f"Expected 2 refresh calls (one per submit after prior done), got {call_count}" + + +# ── _events_from_submit with None snapshots ─────────────────────────────────── + +class TestEventsFromSubmitNoneSnapshots: + def _venue(self): + return _make_venue() + + def test_filled_market_order_emits_full_fill_no_snapshots(self): + """MARKET FILLED with None snapshots → ORDER_ACK + FULL_FILL via executedQty.""" + venue = self._venue() + intent = _make_intent() + receipt = _mock_receipt("FILLED", executed_qty=10.0) + + events = venue._events_from_submit(intent, receipt, None, None) + kinds = [e.kind for e in events] + assert KernelEventKind.ORDER_ACK in kinds + assert KernelEventKind.FULL_FILL in kinds + fill = next(e for e in events if e.kind == KernelEventKind.FULL_FILL) + assert fill.filled_size == pytest.approx(10.0) + + def test_filled_no_executed_qty_falls_back_to_target_size(self): + """FILLED with no executedQty → fill size falls back to intent.target_size.""" + venue = self._venue() + intent = _make_intent(size=10.0) + receipt = _mock_receipt("FILLED", executed_qty=0.0) + receipt.raw_ack["executedQty"] = "0" + + events = venue._events_from_submit(intent, receipt, None, None) + fill = next((e for e in events if e.kind == KernelEventKind.FULL_FILL), None) + assert fill is not None + assert fill.filled_size == pytest.approx(10.0), "Should fall back to target_size when executedQty=0" + + def test_acked_order_no_fill_event(self): + """NEW/ACKED order with None snapshots → ORDER_ACK only, no FULL_FILL.""" + venue = self._venue() + intent = _make_intent() + receipt = _mock_receipt("NEW", executed_qty=0.0) + receipt.raw_ack["executedQty"] = "0" + receipt.raw_ack["status"] = "NEW" + + events = venue._events_from_submit(intent, receipt, None, None) + kinds = [e.kind for e in events] + assert KernelEventKind.ORDER_ACK in kinds + assert KernelEventKind.FULL_FILL not in kinds + assert KernelEventKind.PARTIAL_FILL not in kinds + + def test_rejected_order_emits_order_reject(self): + """REJECTED → ORDER_REJECT only.""" + venue = self._venue() + intent = _make_intent() + receipt = _mock_receipt("REJECTED", executed_qty=0.0) + receipt.raw_ack["status"] = "REJECTED" + receipt.raw_ack["msg"] = "min qty not met" + + events = venue._events_from_submit(intent, receipt, None, None) + assert len(events) == 1 + assert events[0].kind == KernelEventKind.ORDER_REJECT + assert "min qty" in events[0].reason + + def test_partial_fill_emits_partial_fill_event(self): + """PARTIALLY_FILLED → ORDER_ACK + PARTIAL_FILL.""" + venue = self._venue() + intent = _make_intent(size=10.0) + receipt = _mock_receipt("PARTIALLY_FILLED", executed_qty=5.0) + receipt.raw_ack["status"] = "PARTIALLY_FILLED" + receipt.raw_ack["executedQty"] = "5.0" + + events = venue._events_from_submit(intent, receipt, None, None) + kinds = [e.kind for e in events] + assert KernelEventKind.PARTIAL_FILL in kinds + partial = next(e for e in events if e.kind == KernelEventKind.PARTIAL_FILL) + assert partial.filled_size == pytest.approx(5.0) + assert partial.remaining_size == pytest.approx(5.0) + + def test_rate_limited_returns_rate_limited_event(self): + """RATE_LIMITED → single RATE_LIMITED event, no fills.""" + venue = self._venue() + intent = _make_intent() + receipt = _mock_receipt("RATE_LIMITED", executed_qty=0.0) + receipt.raw_ack = {"status": "RATE_LIMITED", "msg": "too many requests"} + + events = venue._events_from_submit(intent, receipt, None, None) + assert len(events) == 1 + assert events[0].kind == KernelEventKind.RATE_LIMITED + + +# ── _filled_size_from_snapshots with None inputs ────────────────────────────── + +class TestFilledSizeFromNoneSnapshots: + def test_both_none_returns_zero(self): + result = BingxVenueAdapter._filled_size_from_snapshots(None, None, "TRX-USDT") + assert result == 0.0 + + def test_before_none_after_valid(self): + after = MagicMock() + after.open_positions = {"TRX-USDT": {"symbol": "TRX-USDT", "positionAmt": "10.0"}} + result = BingxVenueAdapter._filled_size_from_snapshots(None, after, "TRX-USDT") + assert result == pytest.approx(10.0) + + def test_both_valid_returns_diff(self): + before = MagicMock() + before.open_positions = {"TRX-USDT": {"symbol": "TRX-USDT", "positionAmt": "0.0"}} + after = MagicMock() + after.open_positions = {"TRX-USDT": {"symbol": "TRX-USDT", "positionAmt": "10.0"}} + result = BingxVenueAdapter._filled_size_from_snapshots(before, after, "TRX-USDT") + assert result == pytest.approx(10.0) + + +# ── _events_from_cancel ignores before/after completely ────────────────────── + +class TestEventsFromCancelIgnoresSnapshots: + def _venue(self): + return _make_venue() + + def test_cancel_ack_with_none_snapshots(self): + venue = self._venue() + order = _make_order() + events = venue._events_from_cancel(order, {"status": "CANCELED", "orderId": "O1"}, None, None) + assert len(events) == 1 + assert events[0].kind == KernelEventKind.CANCEL_ACK + + def test_cancel_ack_with_garbage_snapshots_same_result(self): + """Passing arbitrary objects as before/after must produce the same events + as passing None — confirming they're truly ignored.""" + venue = self._venue() + order = _make_order() + + events_null = venue._events_from_cancel(order, {"status": "CANCELED"}, None, None) + events_junk = venue._events_from_cancel(order, {"status": "CANCELED"}, "JUNK", 42) + assert events_null[0].kind == events_junk[0].kind + assert events_null[0].status == events_junk[0].status + + def test_cancel_reject_with_none_snapshots(self): + venue = self._venue() + order = _make_order() + events = venue._events_from_cancel(order, {"status": "REJECTED", "msg": "already filled"}, None, None) + assert events[0].kind == KernelEventKind.CANCEL_REJECT + + def test_cancel_rate_limited_with_none_snapshots(self): + venue = self._venue() + order = _make_order() + events = venue._events_from_cancel(order, {"status": "RATE_LIMITED", "msg": "slow down"}, None, None) + assert events[0].kind == KernelEventKind.RATE_LIMITED + + +# ── connect() no double refresh ─────────────────────────────────────────────── + +class TestConnectNoDoubleRefresh: + def test_connect_calls_backend_connect_once_no_extra_snapshot(self): + """connect() must call backend.connect() exactly once and not call + refresh_state() a second time (the redundant _backend_snapshot was removed).""" + backend = MagicMock() + backend.connect = AsyncMock(return_value=True) + backend.refresh_state = AsyncMock(return_value=_flat_snap()) + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + + result = venue.connect() + assert result is True + # backend.connect called once + backend.connect.assert_called_once() + # refresh_state must NOT have been called from connect() itself + backend.refresh_state.assert_not_called() + + def test_connect_without_backend_connect_is_noop(self): + """If backend has no connect(), connect() returns True without crashing.""" + backend = MagicMock(spec=[]) # empty spec — no attributes + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + + result = venue.connect() + assert result is True + + +# ── submit() sync path with None snapshots ──────────────────────────────────── + +class TestSubmitSyncNoneSnapshots: + def test_submit_sync_filled_emits_full_fill(self): + """Sync submit() with None snapshots must still emit FULL_FILL for FILLED receipt.""" + backend = MagicMock() + backend.submit_intent = MagicMock(return_value=_mock_receipt("FILLED", 10.0)) + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + + intent = _make_intent(size=10.0) + # Patch _call_backend to return the receipt directly + from unittest.mock import patch as _patch + with _patch.object(venue, "_call_backend", return_value=_mock_receipt("FILLED", 10.0)): + with _patch.object(venue, "_legacy_intent", return_value=MagicMock()): + events = venue.submit(intent) + + kinds = [e.kind for e in events] + assert KernelEventKind.FULL_FILL in kinds + + +# ── cancel() dead-branch audit ──────────────────────────────────────────────── + +class TestCancelBranchAudit: + def test_cancel_uses_cancel_not_cancel_order(self): + """BingxDirectExecutionAdapter has cancel(), not cancel_order(). + The cancel_order branch was dead code — confirm cancel() uses the right branch.""" + backend = MagicMock() + # Has cancel but not cancel_order + backend.cancel = AsyncMock(return_value={"status": "CANCELED", "orderId": "O1", "clientOrderId": "C1"}) + del backend.cancel_order # ensure cancel_order doesn't exist + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + + order = _make_order() + # Sync cancel goes through _call_backend → _run + # With asyncio not running, _run calls asyncio.run() directly + events = venue.cancel(order) + assert len(events) >= 1 + assert events[0].kind == KernelEventKind.CANCEL_ACK + + def test_cancel_no_cancel_attribute_falls_to_third_branch(self): + """If backend has neither cancel_order nor cancel, raise RuntimeError for live backend.""" + backend = MagicMock(spec=[]) # no methods + venue = BingxVenueAdapter.__new__(BingxVenueAdapter) + venue.backend = backend + venue._event_seq = itertools.count(1) + venue._snap_lock = threading.Lock() + venue._snapshot_ready = threading.Event() + venue._snapshot_ready.set() + venue._last_snapshot = None + + order = _make_order() + with pytest.raises(RuntimeError, match="cancel surface"): + venue.cancel(order) diff --git a/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py b/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py index 59b7b24..b05fd30 100644 --- a/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py +++ b/prod/clean_arch/dita_v2/test_exchange_event_seam_parity.py @@ -240,7 +240,7 @@ class TestMockSubscribe: def test_account_snapshot_is_account_update(self): mock = self._make_mock() - snap = asyncio.get_event_loop().run_until_complete(mock.account_snapshot()) + snap = asyncio.run(mock.account_snapshot()) assert snap.kind == ExchangeEventKind.ACCOUNT_UPDATE assert snap.source == "poll" assert snap.wallet_balance >= 0.0 @@ -261,7 +261,7 @@ class TestMockSubscribe: async for event in mock.subscribe(): received.append(event) break # take one - asyncio.get_event_loop().run_until_complete(asyncio.wait_for(_collect(), timeout=2.0)) + asyncio.run(asyncio.wait_for(_collect(), timeout=2.0)) assert len(received) == 1 assert received[0].fill_price == pytest.approx(100.0)