diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py index b934b62..7ab0752 100644 --- a/prod/clean_arch/adapters/bingx_direct.py +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -441,6 +441,63 @@ class BingxDirectExecutionAdapter(ExecutionPort): self._state = await self._refresh_exchange_state(intent.asset, include_history=True) return receipt + async def cancel(self, order: Any, *, reason: str = "") -> dict[str, Any]: + """Cancel a working order on the venue (resting LIMIT support). + + Signs the DELETE with the same client used for order placement, keyed by + the venue orderId (propagated onto the slot order by the kernel on ACK) + with a clientOrderId fallback. Returns the raw BingX response for the + venue adapter to map into a CANCEL_ACK / CANCEL_REJECT event. + """ + asset = str((getattr(order, "metadata", None) or {}).get("asset") or "") + symbol = self._instrument_venue_symbol(asset) if asset else "" + params: dict[str, Any] = { + "symbol": symbol, + "recvWindow": str(int(self._config.recv_window_ms)), + } + venue_order_id = str(getattr(order, "venue_order_id", "") or "") + venue_client_id = str(getattr(order, "venue_client_id", "") or "") + if venue_order_id: + params["orderId"] = venue_order_id + elif venue_client_id: + params["clientOrderId"] = venue_client_id + else: + return {"status": "REJECTED", "msg": "no order id to cancel", + "orderId": venue_order_id, "clientOrderId": venue_client_id} + delete_resp: dict[str, Any] = {} + try: + resp = await self._client.signed_delete("/openApi/swap/v2/trade/order", params) + delete_resp = resp if isinstance(resp, dict) else {"status": "CANCELED"} + except BingxHttpError as exc: + delete_resp = {"status": "RATE_LIMITED" if _is_rate_limited_error(exc) else "ERROR", "msg": str(exc)} + + # Truth-based confirmation: the cancel succeeded iff the order is no + # longer open on the venue. BingX can return transient errors (e.g. + # "order not exist", "same order number ... within 1 second" from an + # internal retry) even when the order was actually removed — so we trust + # exchange state, not the DELETE response. + still_open: bool | None = None + try: + oo = await self._client.signed_get("/openApi/swap/v2/trade/openOrders", {"symbol": symbol}) + rows = oo if isinstance(oo, list) else (oo.get("data") or oo.get("orders") or []) + if isinstance(rows, dict): + rows = rows.get("orders") or [] + ids = {str(r.get("orderId")) for r in rows if isinstance(r, dict)} + cids = {str(r.get("clientOrderId") or r.get("clientOrderID")) for r in rows if isinstance(r, dict)} + still_open = (venue_order_id in ids) if venue_order_id else (venue_client_id in cids) + except Exception: + still_open = None + + if still_open is False: + return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id} + if str(delete_resp.get("status", "")).upper() in {"CANCELED", "CANCELLED", "SUCCESS", "OK"}: + return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id} + return { + "status": delete_resp.get("status", "REJECTED"), + "msg": delete_resp.get("msg", "cancel not confirmed"), + "orderId": venue_order_id, "clientOrderId": venue_client_id, + } + async def reconcile(self, symbol: str | None = None) -> ExchangeStateSnapshot: # Recovery-only path: ask the venue for authoritative account/position/order state. return await self._refresh_exchange_state(symbol, include_history=True) diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs index 25cbc83..b6bb35d 100644 --- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -1111,6 +1111,29 @@ impl KernelCore { let mut accepted = true; let mut diagnostic_code = KernelDiagnosticCode::OK; + // Propagate the venue's order id onto the working order whenever the + // exchange provides one — for ALL order types and event kinds (ACK, + // partial/full fill). Orders are created at submit time with an empty + // venue_order_id; recording the assigned id lets a later cancel + // reference the real exchange order (essential for resting LIMIT cancel; + // harmless for MARKET, which fills synchronously). Only fills empty ids + // (never overwrites) and targets the currently-active order. + if !event.venue_order_id.is_empty() { + let target = if slot.active_entry_order.is_some() { + slot.active_entry_order.as_mut() + } else { + slot.active_exit_order.as_mut() + }; + if let Some(order) = target { + if order.venue_order_id.is_empty() { + order.venue_order_id = event.venue_order_id.clone(); + } + if !event.venue_client_id.is_empty() && order.venue_client_id.is_empty() { + order.venue_client_id = event.venue_client_id.clone(); + } + } + } + match event.kind { KernelEventKind::ORDER_ACK => { if slot.active_entry_order.is_some() diff --git a/prod/tests/test_bingx_direct_limit_order.py b/prod/tests/test_bingx_direct_limit_order.py index 75af2ea..42f7245 100644 --- a/prod/tests/test_bingx_direct_limit_order.py +++ b/prod/tests/test_bingx_direct_limit_order.py @@ -71,3 +71,37 @@ def test_limit_without_valid_price_falls_back_to_market(): captured: dict = {} asyncio.run(_adapter(captured).submit_intent(_intent({"_order_type": "LIMIT", "_limit_price": 0.0}))) assert captured["order"]["type"] == "MARKET", captured["order"] + + +# --- cancel: truth-based confirmation (trust exchange state over the response) --- + +def _cancel_adapter(*, open_after: list): + a = BingxDirectExecutionAdapter.__new__(BingxDirectExecutionAdapter) + a._config = SimpleNamespace(recv_window_ms=5000) + a._instrument_venue_symbol = lambda asset: "TRX-USDT" + + async def _signed_delete(path, params): + # Simulate BingX returning a transient error even when the order is removed. + return {"status": "REJECTED", "msg": "order not exist"} + + async def _signed_get(path, params): + return {"data": {"orders": open_after}} + + a._client = SimpleNamespace(signed_delete=_signed_delete, signed_get=_signed_get) + return a + + +def _order(oid="2060963645141028864"): + return SimpleNamespace(venue_order_id=oid, venue_client_id="T:i", metadata={"asset": "TRXUSDT"}) + + +def test_cancel_succeeds_when_order_gone_despite_error_response(): + a = _cancel_adapter(open_after=[]) # order no longer open + resp = asyncio.run(a.cancel(_order())) + assert resp["status"] == "CANCELED", resp + + +def test_cancel_rejected_when_order_still_open(): + a = _cancel_adapter(open_after=[{"orderId": "2060963645141028864", "status": "PENDING"}]) + resp = asyncio.run(a.cancel(_order())) + assert resp["status"] != "CANCELED", resp diff --git a/prod/tests/test_pink_limit_live.py b/prod/tests/test_pink_limit_live.py new file mode 100644 index 0000000..dbcdaba --- /dev/null +++ b/prod/tests/test_pink_limit_live.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +"""L3 — live VST validation of the LIMIT execution path. + +The policy is MARKET-only (execution-infra scope), so LIMIT is validated by +injecting a LIMIT KernelIntent into the live kernel. This places a *non-marketable* +resting SHORT LIMIT (5% above market, so it will not fill), confirms the exchange +holds it as an open LIMIT order (i.e. the L2 wiring actually placed a LIMIT, not a +MARKET that would have filled), then cancels it and confirms the account is flat +with no dangling orders. + +The LIMIT fill -> async-fill-pump -> settle/persist path is deterministically +covered offline (test_pink_async_fill_pump.py); live we validate placement + +resting + cancel against the real venue. + +Gates: BINGX_SMOKE_LIVE, BINGX_SMOKE_ALLOW_TRADE, PINK_DITA_E2E, PINK_RUNTHROUGH. +Run on a FLAT account, from repo root with PYTHONPATH=/mnt/dolphinng5_predict. +""" + +from __future__ import annotations + +import asyncio +import os +from datetime import datetime, timezone + +import pytest + +for _gate in ("BINGX_SMOKE_LIVE", "BINGX_SMOKE_ALLOW_TRADE", "PINK_DITA_E2E", "PINK_RUNTHROUGH"): + if not os.environ.get(_gate): + pytest.skip(f"{_gate} not set", allow_module_level=True) + +from prod.tests.test_pink_bingx_dita_live_e2e import ( # noqa: E402 + _build_config, _pick_sym, _snap, _check_open_orders, _verify, _flatten, +) +from prod.bingx.http import BingxHttpClient # noqa: E402 +from prod.clean_arch.dita_v2.launcher import build_launcher_bundle # noqa: E402 +from prod.clean_arch.dita_v2.contracts import ( # noqa: E402 + KernelCommandType, KernelIntent, TradeSide, TradeStage, +) + + +def _intent(action, *, asset, price, size, order_type="MARKET", limit_price=0.0, reason="LIMIT_TEST"): + return KernelIntent( + timestamp=datetime.now(timezone.utc), intent_id=f"{reason}-{int(datetime.now().timestamp()*1000)}", + trade_id=f"{reason}-T", slot_id=0, asset=asset, side=TradeSide.SHORT, action=action, + reference_price=price, target_size=size, leverage=1.0, exit_leg_ratios=(1.0,), + reason=reason, order_type=order_type, limit_price=limit_price, + ) + + +async def _run() -> dict: + bundle = build_launcher_bundle(venue_mode="BINGX", max_slots=1, bingx_config=_build_config()) + k = bundle.kernel + k.account.snapshot.capital = 25_000.0 + k.account.snapshot.peak_capital = 25_000.0 + k.account.snapshot.equity = 25_000.0 + client = BingxHttpClient(_build_config()) + + sym = await _pick_sym(k, client) + snap, vsym = await _snap(client, sym) + p = float(snap.price) + assert p > 0, f"no live price for {sym}" + k.venue.connect() + try: + assert k.slot(0).is_free(), f"slot not free (state={k.slot(0).fsm_state}); flatten the account first" + + # Non-marketable resting SHORT LIMIT: sell 5% above market -> will not fill. + # Size to clear the exchange minimum order amount (~$25 notional); + # _format_quantity only quantizes to step, it does NOT floor to the min. + limit_px = round(p * 1.05, 6) + size = round(25.0 / p, 3) + out = k.process_intent(_intent( + KernelCommandType.ENTER, asset=sym, price=limit_px, size=size, + order_type="LIMIT", limit_price=limit_px, + )) + assert out.accepted, f"LIMIT entry rejected: {out.diagnostic_code} {out.details}" + await asyncio.sleep(1.5) + slot = k.slot(0) + # A real resting LIMIT must NOT fill synchronously (a MARKET would have). + assert slot.fsm_state == TradeStage.ENTRY_WORKING, f"expected ENTRY_WORKING, got {slot.fsm_state}" + assert abs(slot.size) < 1e-9, f"resting LIMIT should not be filled, size={slot.size}" + + # Exchange truth: a resting LIMIT order exists for the symbol. + oos = await _check_open_orders(client, vsym) + types = [str(o.get("type", "")).upper() for o in oos] + assert oos, "no open order on the exchange — LIMIT was not placed (or filled as MARKET)" + assert any(t == "LIMIT" for t in types), f"open order is not a LIMIT: {types}" + + # Cancel the working LIMIT -> back to IDLE, account flat, no dangling order. + k.process_intent(_intent(KernelCommandType.CANCEL, asset=sym, price=limit_px, size=0.001, reason="LIMIT_CANCEL")) + await asyncio.sleep(1.5) + assert k.slot(0).is_free(), f"slot not free after cancel: {k.slot(0).fsm_state}" + result = {"symbol": sym, "limit_px": limit_px, "open_order_types": types} + finally: + # Safety net: cancel/flatten anything left. + try: + if not k.slot(0).is_free(): + _flatten(k, sym, p, "limit-post") + except Exception: + pass + try: + k.venue.disconnect() + except Exception: + pass + + vr = await _verify(client, vsym) + assert vr.positions_flat, f"exchange not flat after cancel: {vr.error}" + return result + + +def test_pink_limit_order_rests_and_cancels() -> None: + result = asyncio.run(_run()) + print(f"[PINK limit live] {result}")