diff --git a/prod/clean_arch/dita_v2/bingx_venue.py b/prod/clean_arch/dita_v2/bingx_venue.py
index b5b1c50..63229ba 100644
--- a/prod/clean_arch/dita_v2/bingx_venue.py
+++ b/prod/clean_arch/dita_v2/bingx_venue.py
@@ -393,8 +393,43 @@ class BingxVenueAdapter(VenueAdapter):
         snapshot = self._backend_snapshot(include_history=False)
         return [dict(row) for row in (snapshot.open_positions or {}).values()]
 
-    def reconcile(self) -> List[VenueEvent]:
-        snapshot = self._backend_snapshot(include_history=True)
+    async def reconcile(self) -> List[VenueEvent]:  # type: ignore[override]
+        """Fetch open-order state from BingX and return any pending VenueEvents.
+
+        WHY ASYNC: the old sync version called _backend_snapshot() → _call_backend()
+        → _run() → pool.submit(asyncio.run, coro).result(timeout=30s). That spawned
+        a *new* event loop in a thread-pool thread. The BingxHttpClient (aiohttp
+        session) is bound to the *main* event loop — using it from a different loop
+        silently deadlocks. BingX VST responds in ~500ms; the deadlock made every
+        reconcile call block the main event loop for the full 30s timeout.
+
+        FIX: declare async, call backend.refresh_state() directly with await so it
+        runs in the *caller's* (main) event loop where the session lives.
+        pump_venue_events() already has `if inspect.isawaitable(events): await events`
+        — zero caller changes required.
+
+        include_history=False: all_orders/all_fills require a symbol (symbol=None
+        skips them anyway), so include_history=True was fetching nothing extra.
+
+        Returns:
+            The events _events_from_snapshot() builds from the refreshed
+            snapshot, or [] if refresh_state() raises (the failure is logged
+            at WARNING level, not propagated).
+        """
+        # NOTE(review): this no longer goes through _backend_snapshot(). If that
+        # helper also caches the snapshot on the adapter, confirm nothing relies
+        # on reconcile() refreshing that cache.
+        try:
+            snapshot = await self.backend.refresh_state(None, include_history=False)
+        except Exception as exc:
+            # Deliberate best-effort: a failed refresh yields no events rather
+            # than raising. exc_info keeps the traceback, since str(exc) is
+            # empty for exceptions raised without a message.
+            import logging as _log
+            _log.getLogger(__name__).warning(
+                "reconcile: refresh_state failed: %s", exc, exc_info=True
+            )
+            return []
         return self._events_from_snapshot(snapshot)
 
     def submit(self, intent: KernelIntent) -> List[VenueEvent]:
diff --git a/prod/clean_arch/dita_v2/test_venue_reconcile.py b/prod/clean_arch/dita_v2/test_venue_reconcile.py
new file mode 100644
index 0000000..c80700c
--- /dev/null
+++ b/prod/clean_arch/dita_v2/test_venue_reconcile.py
@@ -0,0 +1,274 @@
+"""Tests for BingxVenueAdapter.reconcile() — the async non-blocking path.
+
+Regression suite for the 30s event-loop deadlock caused by the old sync
+reconcile calling asyncio.run() in a thread pool while the aiohttp session
+was bound to the main event loop.
+
+Run with:
+    python -m pytest prod/clean_arch/dita_v2/test_venue_reconcile.py -v
+"""
+from __future__ import annotations
+
+import asyncio
+import inspect
+import sys
+import threading
+import time
+from pathlib import Path
+from typing import Any, List
+from unittest.mock import AsyncMock, MagicMock
+
+# Make the repository root importable without hard-coding one machine's mount
+# point: this file lives in <root>/prod/clean_arch/dita_v2/, so parents[3] of
+# its resolved path is <root>.
+sys.path.insert(0, str(Path(__file__).resolve().parents[3]))
+
+import pytest
+
+from prod.clean_arch.dita_v2.bingx_venue import BingxVenueAdapter
+from prod.clean_arch.dita_v2.contracts import (
+    KernelCommandType,
+    KernelEventKind,
+    VenueEvent,
+    VenueEventStatus,
+)
+
+
+# ── helpers ──────────────────────────────────────────────────────────────────
+
+def _flat_snapshot(**kwargs) -> MagicMock:
+    """Build a minimal ExchangeStateSnapshot-like mock.
+
+    Recognised keyword overrides: open_orders, all_orders, all_fills,
+    open_positions, capital; anything omitted gets an empty/flat default.
+    """
+    snap = MagicMock()
+    snap.open_orders = kwargs.get("open_orders", [])
+    snap.all_orders = kwargs.get("all_orders", [])
+    snap.all_fills = kwargs.get("all_fills", [])
+    snap.open_positions = kwargs.get("open_positions", {})
+    snap.capital = kwargs.get("capital", 25000.0)
+    return snap
+
+
+def _make_venue(snapshot=None, raises=None) -> BingxVenueAdapter:
+    """Return a BingxVenueAdapter whose backend.refresh_state is an AsyncMock.
+
+    Args:
+        snapshot: value refresh_state resolves to; a flat snapshot when None.
+        raises: if not None, refresh_state raises this instead of returning.
+    """
+    backend = MagicMock()
+    if raises is not None:
+        backend.refresh_state = AsyncMock(side_effect=raises)
+    else:
+        resolved = _flat_snapshot() if snapshot is None else snapshot
+        backend.refresh_state = AsyncMock(return_value=resolved)
+    # Bypass __init__; only the attributes assigned below exist on the instance.
+    venue = BingxVenueAdapter.__new__(BingxVenueAdapter)
+    venue.backend = backend
+    venue._event_seq = iter(range(10_000))
+    venue._snap_lock = threading.Lock()
+    venue._snapshot_ready = threading.Event()
+    venue._snapshot_ready.set()
+    venue._last_snapshot = None
+    return venue
+
+
+# ── 1. Contract: reconcile() must be async ───────────────────────────────────
+
+class TestReconcileIsAsync:
+    def test_reconcile_returns_coroutine(self):
+        """reconcile() must return a coroutine, not a plain list."""
+        venue = _make_venue()
+        result = venue.reconcile()
+        assert inspect.isawaitable(result), (
+            "reconcile() must be async so pump_venue_events can await it "
+            "without blocking the event loop"
+        )
+        asyncio.run(result)  # clean up
+
+    def test_pump_venue_events_isawaitable_path(self):
+        """Mirror the inspect.isawaitable() check pump_venue_events is described
+        as applying to reconcile()'s return value (the pump itself is not run)."""
+        venue = _make_venue()
+        coro = venue.reconcile()
+        assert inspect.isawaitable(coro)
+        asyncio.run(coro)
+
+
+# ── 2. Correctness: returns events from snapshot ─────────────────────────────
+
+class TestReconcileEvents:
+    def test_flat_exchange_returns_empty(self):
+        """No open orders → no events."""
+        venue = _make_venue(_flat_snapshot())
+        events = asyncio.run(venue.reconcile())
+        assert events == []
+
+    def test_open_order_produces_event(self):
+        """A single open (NEW) BingX order must produce an ORDER_ACK event."""
+        order_row = {
+            "orderId": "V-001",
+            "clientOrderId": "t1:i1",
+            "symbol": "BTC-USDT",
+            "side": "BUY",
+            "status": "NEW",
+            "origQty": "1.0",
+            "executedQty": "0.0",
+            "avgPrice": "0.0",
+        }
+        snap = _flat_snapshot(open_orders=[order_row])
+        venue = _make_venue(snap)
+        events = asyncio.run(venue.reconcile())
+        assert len(events) == 1
+        assert events[0].kind == KernelEventKind.ORDER_ACK
+
+    def test_filled_order_produces_fill_event(self):
+        """A FILLED order must produce a FULL_FILL event."""
+        order_row = {
+            "orderId": "V-002",
+            "clientOrderId": "t1:i1",
+            "symbol": "BTC-USDT",
+            "side": "SELL",
+            "status": "FILLED",
+            "origQty": "1.0",
+            "executedQty": "1.0",
+            "avgPrice": "50000.0",
+        }
+        snap = _flat_snapshot(all_orders=[order_row])
+        venue = _make_venue(snap)
+        events = asyncio.run(venue.reconcile())
+        assert any(e.kind == KernelEventKind.FULL_FILL for e in events)
+
+    def test_deduplication_across_open_and_all_orders(self):
+        """Same order in both open_orders and all_orders must appear once."""
+        row = {
+            "orderId": "V-003",
+            "clientOrderId": "t1:i1",
+            "symbol": "BTC-USDT",
+            "side": "BUY",
+            "status": "NEW",
+            "origQty": "1.0",
+            "executedQty": "0.0",
+            "avgPrice": "0.0",
+        }
+        snap = _flat_snapshot(open_orders=[row], all_orders=[row])
+        venue = _make_venue(snap)
+        events = asyncio.run(venue.reconcile())
+        assert len(events) == 1, "Duplicate entry must be deduped"
+
+    def test_backend_called_with_include_history_false(self):
+        """reconcile() must call refresh_state with include_history=False.
+
+        reconcile() passes symbol=None, and its docstring states the history
+        fetches are skipped without a symbol, so include_history=True would add
+        nothing; False states the intent explicitly.
+        """
+        venue = _make_venue()
+        asyncio.run(venue.reconcile())
+        venue.backend.refresh_state.assert_called_once_with(None, include_history=False)
+
+
+# ── 3. Fault tolerance: exception → empty list, not crash ───────────────────
+
+class TestReconcileFaultTolerance:
+    def test_backend_exception_returns_empty_list(self):
+        """If refresh_state raises, reconcile must return [] not propagate."""
+        venue = _make_venue(raises=RuntimeError("BingX 503"))
+        events = asyncio.run(venue.reconcile())
+        assert events == [], "Exception must be swallowed and return empty list"
+
+    def test_timeout_error_returns_empty_list(self):
+        """A TimeoutError from refresh_state must yield [] as well."""
+        venue = _make_venue(raises=TimeoutError("VST timeout"))
+        events = asyncio.run(venue.reconcile())
+        assert events == []
+
+    def test_connection_error_returns_empty_list(self):
+        """A ConnectionError from refresh_state must yield [] as well."""
+        venue = _make_venue(raises=ConnectionError("VST unreachable"))
+        events = asyncio.run(venue.reconcile())
+        assert events == []
+
+    def test_n_consecutive_failures_never_raise(self):
+        """10 consecutive backend failures must never propagate an exception."""
+        venue = _make_venue(raises=RuntimeError("persistent failure"))
+        for _ in range(10):
+            events = asyncio.run(venue.reconcile())
+            assert events == []
+
+
+# ── 4. Non-blocking: reconcile must complete in << 30s ───────────────────────
+
+class TestReconcileNonBlocking:
+    def test_completes_well_under_timeout(self):
+        """The async reconcile must complete in under 2s (backend is mocked as
+        instant). The old sync path blocked for 30s minimum on the deadlock.
+        With the backend mocked, this only guards reconcile()'s own body
+        against reintroducing a blocking call."""
+        venue = _make_venue()
+        t0 = time.perf_counter()
+        asyncio.run(venue.reconcile())
+        elapsed = time.perf_counter() - t0
+        assert elapsed < 2.0, (
+            f"reconcile() took {elapsed:.2f}s — blocking regression detected. "
+            f"Must complete in < 2s (mocked backend = instant)."
+        )
+
+    def test_exception_path_completes_quickly(self):
+        """Even when backend raises, reconcile must return quickly."""
+        venue = _make_venue(raises=TimeoutError("30s deadlock simulation"))
+        t0 = time.perf_counter()
+        asyncio.run(venue.reconcile())
+        elapsed = time.perf_counter() - t0
+        assert elapsed < 2.0
+
+    def test_awaitable_in_asyncio_gather(self):
+        """reconcile() must compose cleanly with asyncio.gather — critical for
+        future parallelisation of reconcile + other venue calls."""
+        venues = [_make_venue() for _ in range(3)]
+        async def _run():
+            results = await asyncio.gather(*[v.reconcile() for v in venues])
+            return results
+        results = asyncio.run(_run())
+        assert len(results) == 3
+        assert all(isinstance(r, list) for r in results)
+
+    def test_concurrent_reconciles_do_not_deadlock(self):
+        """10 concurrent reconcile coroutines must all complete."""
+        async def _run():
+            venues = [_make_venue() for _ in range(10)]
+            return await asyncio.gather(*[v.reconcile() for v in venues])
+        results = asyncio.run(_run())
+        assert len(results) == 10
+
+
+# ── 5. pump_venue_events contract (reconcile() side only) ────────────────────
+
+class TestPumpIntegration:
+    """Check reconcile()'s side of the pump_venue_events contract.
+
+    pump_venue_events itself is not called here; these tests only assert the
+    properties of reconcile()'s result that the pump is described as needing.
+    """
+
+    def test_pump_venue_events_awaits_async_reconcile(self):
+        """reconcile() must return an awaitable so that the inspect.isawaitable()
+        branch described for pump_venue_events has something to await."""
+        venue = _make_venue()
+        coro = venue.reconcile()
+        assert inspect.isawaitable(coro), (
+            "pump_venue_events checks inspect.isawaitable(events); "
+            "reconcile() must return a coroutine for the await path to fire"
+        )
+        asyncio.run(coro)
+
+    def test_reconcile_result_is_list_of_venue_events(self):
+        """The awaited result must be a list (possibly empty) — the type
+        pump_venue_events iterates over."""
+        venue = _make_venue()
+        result = asyncio.run(venue.reconcile())
+        assert isinstance(result, list)
+        for item in result:
+            assert isinstance(item, VenueEvent)