From 55ed6902d846ed0a9e03195e10b61fdb54015f8d Mon Sep 17 00:00:00 2001 From: Codex Date: Sun, 31 May 2026 03:23:44 +0200 Subject: [PATCH] PINK DITAv2 L0-L2: two-phase persistence + async-fill pump + LIMIT wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Execution-infra only (policy stays MARKET; algorithmic integrity untouched). L0 — two-phase (request->result) persistence (pink_clickhouse.py): - Split persist_step into persist_request (policy_events + trade_reconstruction ORDER_REQUESTED) and persist_result (state snapshot + per-fill lifecycle rows). - Lifecycle rows (ENTRY_FILLED/EXIT/trade_events/trade_exit_legs) gated on evidence of an actual fill (FULL/PARTIAL_FILL event, closed slot, or size drop vs _leg_state) -> a resting LIMIT (ACK only) emits no terminal rows. - Add persist_fill_events: synthesizes a minimal decision/intent from slot+event for async fills and routes through persist_result. L1 — async-fill pump (pink_direct.py): - PinkDirectRuntime.pump_venue_events(): venue.reconcile() -> kernel.on_venue_event (capital settles, FSM advances), persists applied fills; kernel dedups duplicates (no double-settle). Called at the start of step(). L2 — LIMIT placement (bingx_direct.py): - submit_intent now honors _order_type/_limit_price from intent metadata (was hardcoded MARKET): LIMIT -> type=LIMIT + price + GTC; MARKET default; invalid limit price falls back to MARKET. Offline: 63 passed (persistence/groundwork/pump/limit-payload/runtime/accounting/ flaws/kernel). MARKET path unchanged; resting LIMIT now correct end-to-end offline. Live VST validation (L3) pending. BLUE untouched. Co-Authored-By: Claude Opus 4.8 --- prod/clean_arch/adapters/bingx_direct.py | 446 ++++++++++++++++++ .../clean_arch/persistence/pink_clickhouse.py | 192 ++++++-- prod/clean_arch/runtime/pink_direct.py | 63 +++ prod/tests/test_bingx_direct_limit_order.py | 73 +++ prod/tests/test_pink_async_fill_pump.py | 182 +++++++ .../tests/test_pink_clickhouse_persistence.py | 97 ++++ 6 files changed, 1025 insertions(+), 28 deletions(-) create mode 100644 prod/clean_arch/adapters/bingx_direct.py create mode 100644 prod/tests/test_bingx_direct_limit_order.py create mode 100644 prod/tests/test_pink_async_fill_pump.py diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py new file mode 100644 index 0000000..b934b62 --- /dev/null +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -0,0 +1,446 @@ +"""Direct BingX execution adapter with no Nautilus Trader node dependency. + +This adapter speaks BingX REST directly and keeps the exchange state +authoritative. It is intended for PINK live execution under the DITA boundary. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import math +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from decimal import Decimal, ROUND_DOWN +from typing import Any, Optional + +from nautilus_trader.model.identifiers import InstrumentId + +from prod.bingx.config import BingxExecClientConfig +from prod.bingx.config import BingxInstrumentProviderConfig +from prod.bingx.enums import BingxEnvironment +from prod.bingx.http import BingxHttpError +from prod.bingx.http import BingxHttpClient +from prod.bingx.instrument_provider import BingxInstrumentProvider +from prod.bingx.leverage import normalize_bingx_leverage_value +from prod.bingx.schemas import BingxOrderAck +from prod.bingx.schemas import unwrap_order_payload +from prod.clean_arch.dita import Intent, TradeSide, DecisionAction +from prod.clean_arch.ports.execution import ExchangeStateSnapshot +from prod.clean_arch.ports.execution import ExecutionReceipt +from prod.clean_arch.ports.execution import ExecutionPort + +LOGGER = logging.getLogger(__name__) + + +def _rows_from_payload(payload: Any, *keys: str) -> list[dict[str, Any]]: + if isinstance(payload, list): + return [row for row in payload if isinstance(row, dict)] + if isinstance(payload, dict): + for key in keys: + rows = payload.get(key) + if isinstance(rows, list): + return [row for row in rows if isinstance(row, dict)] + return [] + + +def _capital_from_balance_rows(rows: Any) -> float: + if not isinstance(rows, list): + return 0.0 + for row in rows: + if not isinstance(row, dict): + continue + capital = 0.0 + for key in ("total", "balance", "equity", "availableMargin", "availableBalance", "walletBalance", "free"): + try: + capital = float(row.get(key, 0.0) or 0.0) + except Exception: + continue + if capital > 0 and math.isfinite(capital): + return capital + if capital > 0 and math.isfinite(capital): + return capital + return 0.0 + + +def _position_notional_from_rows(rows: Any) -> float: + if not isinstance(rows, list): + return 0.0 + total = 0.0 + for row in rows: + if not isinstance(row, dict): + continue + try: + qty = abs( + float( + row.get("positionAmt") + or row.get("positionQty") + or row.get("positionSize") + or row.get("quantity") + or row.get("pa") + or 0.0 + ) + ) + if qty <= 0.0: + continue + notional = row.get("positionValue") or row.get("notional") or row.get("openNotional") + if notional is not None: + total += abs(float(notional or 0.0)) + continue + entry = ( + row.get("entryPrice") + or row.get("avgPrice") + or row.get("markPrice") + or row.get("avgEntryPrice") + or row.get("ep") + or row.get("ap") + or 0.0 + ) + total += qty * abs(float(entry or 0.0)) + except Exception: + continue + return total + + +def _normalize_symbol(symbol: str) -> str: + return str(symbol or "").replace("-", "").replace("_", "").replace("/","").upper() + + +def _venue_symbol_from_asset(asset: str) -> str: + text = _normalize_symbol(asset) + if text.endswith("USDT"): + return f"{text[:-4]}-USDT" + return text + + +def _decimal_text(value: Decimal) -> str: + text = format(value.normalize(), "f") + if "." in text: + text = text.rstrip("0").rstrip(".") + return text or "0" + + +def _is_rate_limited_error(exc: Exception) -> bool: + message = str(exc) + lowered = message.lower() + return "100410" in message or "frequency limit" in lowered or "rate limit" in lowered + + +@dataclass(frozen=True) +class BingxDirectExecutionConfig: + """Execution-specific knobs for the direct adapter.""" + + environment: BingxEnvironment = BingxEnvironment.VST + allow_mainnet: bool = False + default_leverage: int = 1 + exchange_leverage_cap: int = 3 + recv_window_ms: int = 5_000 + prefer_websocket: bool = False + use_reduce_only: bool = True + journal_strategy: str = "pink" + journal_db: str = "dolphin_pink" + instrument_provider: BingxInstrumentProviderConfig = BingxInstrumentProviderConfig(load_all=True) + + +class BingxDirectExecutionAdapter(ExecutionPort): + """Direct BingX execution boundary with exchange-led state snapshots.""" + + def __init__( + self, + config: BingxExecClientConfig | BingxDirectExecutionConfig, + *, + client: BingxHttpClient | None = None, + provider: BingxInstrumentProvider | None = None, + ) -> None: + if isinstance(config, BingxExecClientConfig): + self._config = BingxDirectExecutionConfig( + environment=config.environment, + allow_mainnet=config.allow_mainnet, + default_leverage=int(config.default_leverage), + exchange_leverage_cap=int(config.exchange_leverage_cap), + recv_window_ms=int(config.recv_window_ms), + prefer_websocket=bool(config.prefer_websocket), + use_reduce_only=bool(config.use_reduce_only), + journal_strategy=str(config.journal_strategy or "pink"), + journal_db=str(config.journal_db or "dolphin_pink"), + instrument_provider=config.instrument_provider, + ) + http_config = config + else: + self._config = config + http_config = BingxExecClientConfig( + api_key="", + secret_key="", + environment=config.environment, + allow_mainnet=config.allow_mainnet, + prefer_websocket=config.prefer_websocket, + sizing_mode="testnet", + exchange_leverage_cap=config.exchange_leverage_cap, + use_reduce_only=config.use_reduce_only, + default_leverage=config.default_leverage, + recv_window_ms=config.recv_window_ms, + journal_strategy=config.journal_strategy, + journal_db=config.journal_db, + instrument_provider=config.instrument_provider, + ) + self._client = client or BingxHttpClient(http_config) + self._provider = provider or BingxInstrumentProvider(client=self._client, config=self._config.instrument_provider) + self._log = LOGGER + self._client_order_run_id = uuid.uuid4().hex[:8] + self._entry_client_order_seq = 0 + self._exit_client_order_seq = 0 + self._state: ExchangeStateSnapshot | None = None + self._connected = False + + @property + def state(self) -> ExchangeStateSnapshot | None: + return self._state + + async def connect(self) -> bool: + await self._provider.initialize() + self._connected = True + self._state = await self.refresh_state() + return True + + async def disconnect(self) -> None: + self._connected = False + await self._client.close() + + def _resolve_instrument(self, asset: str): + normalized = _normalize_symbol(asset) + candidates = [ + InstrumentId.from_str(f"{normalized}.BINGX"), + InstrumentId.from_str(f"{_venue_symbol_from_asset(asset)}.BINGX"), + ] + for candidate in candidates: + instrument = self._provider.find(candidate) + if instrument is not None: + return instrument + for instrument in self._provider.list_all(): + if _normalize_symbol(instrument.symbol.value) == normalized: + return instrument + if _normalize_symbol(instrument.raw_symbol.value) == normalized: + return instrument + return None + + def _instrument_venue_symbol(self, asset: str) -> str: + instrument = self._resolve_instrument(asset) + if instrument is not None: + return str(instrument.raw_symbol.value) + return _venue_symbol_from_asset(asset) + + def _instrument_step(self, asset: str) -> Decimal: + instrument = self._resolve_instrument(asset) + if instrument is not None: + try: + return Decimal(str(instrument.size_increment.as_decimal())) + except Exception: + pass + return Decimal("0.001") + + def _format_quantity(self, asset: str, quantity: float) -> str: + step = self._instrument_step(asset) + if step <= 0: + return str(max(0.0, quantity)) + value = Decimal(str(quantity)) + quantized = (value / step).to_integral_value(rounding=ROUND_DOWN) * step + return _decimal_text(max(Decimal("0"), quantized)) + + def _instrument_tick(self, asset: str) -> Decimal: + instrument = self._resolve_instrument(asset) + if instrument is not None: + try: + tick = getattr(instrument, "price_increment", None) + if tick is not None: + return Decimal(str(tick.as_decimal())) + except Exception: + pass + return Decimal("0.01") + + def _format_price(self, asset: str, price: float) -> str: + tick = self._instrument_tick(asset) + if tick <= 0: + return f"{price:.8f}".rstrip("0").rstrip(".") + value = Decimal(str(price)) + quantized = (value / tick).to_integral_value(rounding=ROUND_DOWN) * tick + return _decimal_text(max(Decimal("0"), quantized)) + + async def _safe_get(self, endpoint: str, params: dict | None = None, *, fallback: Any = None) -> Any: + """GET an endpoint, returning *fallback* on rate-limit errors.""" + try: + return await self._client.signed_get(endpoint, params) + except BingxHttpError as exc: + message = str(exc) + if "100410" in message or "frequency limit" in message.lower(): + LOGGER.debug("BingX %s rate-limited; continuing with empty snapshot", endpoint) + return fallback if fallback is not None else [] + raise + + async def _refresh_exchange_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot: + """Fetch exchange state with parallel HTTP calls. + + The three primary calls (balance, positions, openOrders) are + independent and run concurrently via ``asyncio.gather``. Each has + its own rate-limit fallback so a single throttle does not block + the others. Historical calls (allOrders, allFillOrders) are gated + on ``include_history`` and also gathered. + """ + balance_task = self._safe_get("/openApi/swap/v2/user/balance") + positions_task = self._safe_get("/openApi/swap/v2/user/positions") + orders_task = self._safe_get("/openApi/swap/v2/trade/openOrders") + + balance_payload, positions_payload, open_orders_payload = await asyncio.gather( + balance_task, positions_task, orders_task, + ) + + all_orders_payload: Any = [] + all_fills_payload: Any = [] + if include_history and symbol is not None: + venue_symbol = self._instrument_venue_symbol(symbol) + hist_tasks = asyncio.gather( + self._safe_get("/openApi/swap/v2/trade/allOrders", {"symbol": venue_symbol}), + self._safe_get("/openApi/swap/v2/trade/allFillOrders", {"symbol": venue_symbol}), + return_exceptions=True, + ) + results = await hist_tasks + all_orders_payload = results[0] if not isinstance(results[0], Exception) else [] + all_fills_payload = results[1] if not isinstance(results[1], Exception) else [] + + # Parse results (shared logic, same as before) + if isinstance(balance_payload, list): + balances = balance_payload + elif isinstance(balance_payload, dict): + rows_raw = balance_payload.get("balance") or balance_payload.get("balances") or balance_payload.get("data") + if isinstance(rows_raw, dict): + balances = [rows_raw] + elif isinstance(rows_raw, list): + balances = rows_raw + else: + balances = [] + else: + balances = [] + positions_rows = _rows_from_payload(positions_payload, "positions", "data") + positions: dict[str, dict[str, Any]] = {} + for row in positions_rows: + raw_symbol = str(row.get("symbol") or row.get("symbolName") or row.get("venueSymbol") or "") + key = _normalize_symbol(raw_symbol) + if not key: + continue + positions[key] = dict(row) + open_orders = _rows_from_payload(open_orders_payload, "orders", "data") + capital = _capital_from_balance_rows(balances) + open_notional = _position_notional_from_rows(positions_rows) + equity = capital + if open_notional > 0 and positions_rows: + equity = capital + snapshot = ExchangeStateSnapshot( + timestamp=datetime.now(timezone.utc), + capital=capital, + equity=equity, + open_positions=positions, + open_orders=[dict(row) for row in open_orders], + all_orders=[dict(row) for row in _rows_from_payload(all_orders_payload, "orders", "data")], + all_fills=[dict(row) for row in _rows_from_payload(all_fills_payload, "fills", "data")], + account={"balances": balances}, + open_notional=open_notional, + source="bingx", + recovered=bool(include_history), + ) + self._state = snapshot + return snapshot + + async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot: + return await self._refresh_exchange_state(symbol, include_history=include_history) + + async def submit_intent(self, intent: Intent) -> ExecutionReceipt: + symbol = self._instrument_venue_symbol(intent.asset) + if intent.action == DecisionAction.EXIT: + side = "SELL" if intent.side == TradeSide.LONG else "BUY" + else: + side = "BUY" if intent.side == TradeSide.LONG else "SELL" + # Entries must be free to open the slot; only exits are reduce-only. + reduce_only = bool(intent.action == DecisionAction.EXIT) + if reduce_only: + self._exit_client_order_seq += 1 + client_order_id = f"pink:{self._client_order_run_id}:x{self._exit_client_order_seq:02d}" + else: + self._entry_client_order_seq += 1 + client_order_id = f"pink:{self._client_order_run_id}:e{self._entry_client_order_seq:02d}" + leverage = normalize_bingx_leverage_value( + int(round(float(intent.leverage or self._config.default_leverage))), + exchange_max=self._config.exchange_leverage_cap, + ) + try: + await self._client.signed_post( + "/openApi/swap/v2/trade/leverage", + {"symbol": symbol, "side": "BOTH", "leverage": leverage}, + ) + # Honor the order type forwarded by the venue adapter + # (bingx_venue._legacy_intent sets _order_type/_limit_price). MARKET + # is the default; a LIMIT carries a resting price + GTC and will not + # fill synchronously — the async-fill pump settles it later. + order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper() + limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0) + is_limit = order_type == "LIMIT" and limit_price > 0.0 + payload: dict[str, Any] = { + "symbol": symbol, + "side": side, + "positionSide": "BOTH", + "type": "LIMIT" if is_limit else "MARKET", + "quantity": self._format_quantity(intent.asset, intent.target_size), + "clientOrderId": client_order_id, + "recvWindow": str(int(self._config.recv_window_ms)), + } + if is_limit: + payload["price"] = self._format_price(intent.asset, limit_price) + payload["timeInForce"] = "GTC" + if reduce_only: + payload["reduceOnly"] = "true" + ack_payload = await self._client.signed_post("/openApi/swap/v2/trade/order", payload) + ack = BingxOrderAck.from_http(ack_payload if isinstance(ack_payload, dict) else {}) + ack_row = dict(unwrap_order_payload(ack_payload)) if isinstance(ack_payload, dict) else {} + status = str(ack_row.get("status") or ack.status or "ACKED") + fill_price = 0.0 + for key in ("avgPrice", "avgFilledPrice", "price", "lastFillPrice", "tradePrice"): + try: + value = float(ack_row.get(key) or 0.0) + except Exception: + value = 0.0 + if value > 0: + fill_price = value + break + if fill_price <= 0 and self._state is not None: + # Use the last known exchange mark as a fallback for projected accounting. + fill_price = next((float(row.get("markPrice") or row.get("avgPrice") or 0.0) for row in self._state.open_positions.values() if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), 0.0) + except BingxHttpError as exc: + status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED" + ack_row = { + "status": status, + "msg": str(exc), + "symbol": symbol, + "clientOrderId": client_order_id, + } + fill_price = 0.0 + ack = None + receipt = ExecutionReceipt( + timestamp=datetime.now(timezone.utc), + status=status, + symbol=symbol, + side=side, + action=intent.action.value, + quantity=float(intent.target_size or 0.0), + price=fill_price, + client_order_id=client_order_id, + order_id=str((ack.order_id if 'ack' in locals() and ack is not None else '') or ack_row.get("orderId") or ""), + raw_ack=ack_row, + raw_state=dict(self._state.account if self._state is not None else {}), + ) + # Refresh from the venue so the direct runtime can use exchange-led state. + self._state = await self._refresh_exchange_state(intent.asset, include_history=True) + return receipt + + async def reconcile(self, symbol: str | None = None) -> ExchangeStateSnapshot: + # Recovery-only path: ask the venue for authoritative account/position/order state. + return await self._refresh_exchange_state(symbol, include_history=True) diff --git a/prod/clean_arch/persistence/pink_clickhouse.py b/prod/clean_arch/persistence/pink_clickhouse.py index dc26e5b..aac43b2 100644 --- a/prod/clean_arch/persistence/pink_clickhouse.py +++ b/prod/clean_arch/persistence/pink_clickhouse.py @@ -24,7 +24,8 @@ from enum import Enum from typing import Any, Callable, Mapping, Optional from prod.clean_arch.dita import AccountProjection, Decision, DecisionAction, Intent, TradeSide, TradeStage -from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelOutcome +from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventKind, KernelOutcome +from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage Writer = Callable[[str, dict[str, Any]], None] @@ -222,6 +223,71 @@ class PinkClickHousePersistence: phase: str = "step", market_state: Mapping[str, Any] | None = None, ) -> None: + """Two-phase persist: log the REQUEST, then log the RESULT. + + REQUEST (:meth:`persist_request`) — the decision/order that was + submitted (policy_events + a trade_reconstruction ORDER_REQUESTED row). + RESULT (:meth:`persist_result`) — the settled state snapshot plus the + per-fill lifecycle rows, gated on *evidence of an actual fill*. A resting + LIMIT order (ACK only, no fill) therefore emits state snapshots but no + terminal rows; the async-fill pump persists those later via the same + result path. The synchronous-MARKET path is unchanged: its FILL event + (or the slot's filled/closed state) trips the same gate. + """ + self.persist_request( + snapshot=snapshot, decision=decision, intent=intent, + phase=phase, market_state=market_state, + ) + self.persist_result( + snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, + slot_dict=slot_dict, phase=phase, market_state=market_state, + ) + + def persist_request( + self, + *, + snapshot: Any, + decision: Decision, + intent: Intent, + phase: str = "step", + market_state: Mapping[str, Any] | None = None, + ) -> None: + """Phase 1 — log the requested decision/order (no fill data).""" + self._write_policy_event(snapshot, decision, intent, phase=phase) + if decision.action in (DecisionAction.ENTER, DecisionAction.EXIT): + self._write_trade_reconstruction( + snapshot, intent.trade_id, + event_type="ORDER_REQUESTED", + event_id=f"{intent.trade_id}:request:{decision.action.value.lower()}", + payload={ + "decision": _decision_summary(decision), + "intent": _intent_summary(intent), + "market_state": _json_safe(market_state or {}), + }, + market_state=market_state, + ) + + def persist_result( + self, + *, + snapshot: Any, + decision: Decision, + intent: Intent, + outcome: KernelOutcome | None = None, + slot_dict: dict[str, Any] | None = None, + phase: str = "step", + market_state: Mapping[str, Any] | None = None, + ) -> None: + """Phase 2 — log the settled state + per-fill lifecycle rows. + + The state snapshot rows (account_events, position_state, + status_snapshots) always reflect the current slot. The lifecycle rows + (ENTRY_FILLED / PARTIAL_EXIT / EXIT / trade_events / trade_exit_legs) are + emitted only when a fill is *evidenced* — a FULL/PARTIAL_FILL event in + ``outcome.emitted_events``, a closed slot, or a slot whose size dropped + vs the last leg snapshot. A resting LIMIT (ACK only) emits no terminal + rows here. + """ slot = slot_dict or {} stage = ( TradeStage(decision.stage.value) @@ -231,12 +297,10 @@ class PinkClickHousePersistence: ) status = self._state_label(slot, phase) - self._write_policy_event(snapshot, decision, intent, phase=phase) self._write_account_event(snapshot, decision, intent, stage=stage, slot_dict=slot) self._write_position_state(snapshot, decision, intent, slot_dict=slot, stage=stage, status=status, market_state=market_state) self._write_status_snapshot(snapshot, decision, intent, slot_dict=slot, phase=phase) - # Emit anomaly for diagnostic codes (except OK). if outcome is not None and outcome.diagnostic_code != KernelDiagnosticCode.OK: self._write_anomaly( snapshot, decision, intent, @@ -246,38 +310,56 @@ class PinkClickHousePersistence: ) if outcome is None: - # Decision-only step (HOLD, no execution). + # Decision-only step (HOLD): state snapshot already written. return + events = tuple(outcome.emitted_events or ()) + has_fill_evt = any( + e.kind in (KernelEventKind.FULL_FILL, KernelEventKind.PARTIAL_FILL) + for e in events + ) + slot_closed = bool(slot.get("closed", False)) + cur_size = _safe_float(slot.get("size", 0.0), 0.0) + slot_open = (not slot_closed) and cur_size > 0.0 + if decision.action == DecisionAction.ENTER: - # Reset per-trade leg deltas: a fresh position starts with zero - # realized PnL and the full initial size remaining. - self._leg_state[intent.trade_id] = { - "prev_realized": 0.0, - "prev_size": _safe_float( - slot.get("initial_size", slot.get("size", 0.0)), 0.0 - ) or _safe_float(intent.target_size, 0.0), - "prev_leg_id": "", - } - self._write_trade_reconstruction( - snapshot, intent.trade_id, - event_type="ENTRY_FILLED", - event_id=f"{intent.trade_id}:entry", - payload={ - "decision": _decision_summary(decision), - "intent": _intent_summary(intent), - "outcome": _outcome_summary(outcome), - "slot": slot, - "market_state": _json_safe(market_state or {}), - }, - market_state=market_state, - ) + # Emit ENTRY_FILLED only once the entry is actually filled (fill event + # or an open slot). A resting LIMIT entry emits nothing here. + if has_fill_evt or slot_open: + self._leg_state[intent.trade_id] = { + "prev_realized": 0.0, + "prev_size": _safe_float( + slot.get("initial_size", slot.get("size", 0.0)), 0.0 + ) or _safe_float(intent.target_size, 0.0), + "prev_leg_id": "", + } + self._write_trade_reconstruction( + snapshot, intent.trade_id, + event_type="ENTRY_FILLED", + event_id=f"{intent.trade_id}:entry", + payload={ + "decision": _decision_summary(decision), + "intent": _intent_summary(intent), + "outcome": _outcome_summary(outcome), + "slot": slot, + "market_state": _json_safe(market_state or {}), + }, + market_state=market_state, + ) return if decision.action != DecisionAction.EXIT: return - partial = slot.get("closed", False) is False and slot.get("size", 0) > 0 + # An exit leg is evidenced by a fill event, a closed slot, or a drop in + # remaining size vs the previous leg snapshot. A resting LIMIT exit (no + # size change) emits nothing until the async-fill pump observes the fill. + prev_size = _safe_float(self._leg_state.get(intent.trade_id, {}).get("prev_size", 0.0), 0.0) + exit_filled = has_fill_evt or slot_closed or (prev_size - cur_size > 1e-12) + if not exit_filled: + return + + partial = (not slot_closed) and cur_size > 0.0 # One trade_exit_legs row per exit leg (partial or final), BLUE-schema # compatible so PINK multi-exit trades reconcile against the same table. self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome) @@ -295,9 +377,63 @@ class PinkClickHousePersistence: market_state=market_state, ) # Terminal trade event. - if slot.get("closed", False): + if slot_closed: self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state) + def persist_fill_events( + self, + *, + snapshot: Any, + events: Any, + slot_dict: dict[str, Any] | None = None, + market_state: Mapping[str, Any] | None = None, + ) -> None: + """Persist a late (async) venue fill drained by the runtime pump. + + There is no fresh policy decision for an async fill, so we synthesize a + minimal Decision/Intent from the post-fill slot + event and route it + through :meth:`persist_result`. Direction (ENTER vs EXIT) is inferred + from the slot: a closed slot or a drop in remaining size vs the last leg + snapshot is an EXIT; otherwise an opening fill is an ENTER. Capital + authority remains the kernel — this only logs the settled result. + """ + slot = slot_dict or {} + event_list = tuple(events or ()) + trade_id = str(slot.get("trade_id") or "") + asset = str(slot.get("asset") or "") + side = self._slot_side(slot) + closed = bool(slot.get("closed", False)) + cur_size = self._slot_size(slot) + leverage = _safe_float(slot.get("leverage", 1.0), 1.0) + price = next((float(getattr(e, "price", 0.0) or 0.0) for e in event_list if getattr(e, "price", 0.0)), 0.0) or self._slot_entry_price(slot) + prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0) + is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12) + action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER + ts = getattr(snapshot, "timestamp", datetime.now(timezone.utc)) + + decision = Decision( + timestamp=ts, decision_id=trade_id or "async", asset=asset, action=action, + side=side, reason="ASYNC_FILL", confidence=0.0, velocity_divergence=0.0, + irp_alignment=0.0, reference_price=price, target_size=cur_size, + leverage=leverage, stage=TradeStage.POSITION_UPDATED, metadata={}, + ) + intent = Intent( + timestamp=ts, trade_id=trade_id, decision_id=trade_id or "async", asset=asset, + action=action, side=side, reason="ASYNC_FILL", target_size=cur_size, + leverage=leverage, reference_price=price, confidence=0.0, + exit_leg_ratios=tuple(slot.get("exit_leg_ratios", (1.0,)) or (1.0,)), metadata={}, + ) + outcome = KernelOutcome( + accepted=True, slot_id=int(slot.get("slot_id", 0) or 0), trade_id=trade_id, + state=KernelStage.CLOSED if closed else KernelStage.POSITION_OPEN, + diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO, + transitions=(), emitted_events=event_list, details={"origin": "async_fill_pump"}, + ) + self.persist_result( + snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, + slot_dict=slot, phase="async_fill", market_state=market_state, + ) + def persist_recovery_state( self, *, diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 30b2e4a..482ce1d 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -30,6 +30,7 @@ from prod.clean_arch.dita import ( ) from prod.clean_arch.dita_v2.contracts import ( KernelCommandType, + KernelDiagnosticCode, KernelIntent, TradeSide as DitaTradeSide, TradeStage, @@ -306,9 +307,68 @@ class PinkDirectRuntime: except Exception: return {} + async def pump_venue_events( + self, snapshot: Any | None = None, *, market_state: Any = None + ) -> int: + """Drain late (async) venue fills into the kernel and persist the result. + + Resting LIMIT and partial fills arrive *after* the submitting + ``process_intent`` returns. This calls ``venue.reconcile()`` and feeds + each event to ``kernel.on_venue_event`` so capital settles and the FSM + advances; the kernel dedups duplicates via ``seen_event_ids`` / + ``_last_settled_pnl`` (no double-settle). Only events the kernel actually + applied (accepted, not DUPLICATE_EVENT) are persisted, via the two-phase + result-logger. Capital authority stays ``kernel.account``. + + Returns the number of applied events. + """ + venue = self.kernel.venue + reconcile = getattr(venue, "reconcile", None) + if reconcile is None: + return 0 + try: + events = reconcile() + if inspect.isawaitable(events): + events = await events + except Exception as exc: + self.logger.warning("Venue reconcile failed: %s", exc) + return 0 + events = list(events or []) + if not events: + return 0 + + applied: list[Any] = [] + for event in events: + try: + outcome = self.kernel.on_venue_event(event) + except Exception as exc: + self.logger.warning("on_venue_event failed: %s", exc) + continue + if getattr(outcome, "accepted", False) and getattr( + outcome, "diagnostic_code", None + ) != KernelDiagnosticCode.DUPLICATE_EVENT: + applied.append(event) + + if applied and self.persistence is not None: + slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} + persist_snapshot = snapshot + if persist_snapshot is None: + persist_snapshot = SimpleNamespace( + timestamp=datetime.now(timezone.utc), + symbol=str(slot_dict.get("asset", "")), + ) + self.persistence.persist_fill_events( + snapshot=persist_snapshot, + events=applied, + slot_dict=slot_dict, + market_state=market_state or {}, + ) + return len(applied) + async def step(self, snapshot: MarketSnapshot) -> Decision: """Single policy + execution cycle. + 0. Pump late (async) venue fills into the kernel (LIMIT/partial settle) 1. Update market state 2. Decide (policy layer) 3. Plan (intent layer) @@ -317,6 +377,9 @@ class PinkDirectRuntime: 6. Persist """ market_state = self._update_market_state_runtime(snapshot) + # Drain any late fills BEFORE the policy reads slot/account state, so a + # resting LIMIT that filled since the last cycle is reflected. + await self.pump_venue_events(snapshot, market_state=market_state) acc = self.kernel.snapshot()["account"] slot_view = self.kernel.slot(0) if self.kernel.max_slots > 0 else None slot_dict = slot_view.to_dict() if slot_view is not None else {} diff --git a/prod/tests/test_bingx_direct_limit_order.py b/prod/tests/test_bingx_direct_limit_order.py new file mode 100644 index 0000000..75af2ea --- /dev/null +++ b/prod/tests/test_bingx_direct_limit_order.py @@ -0,0 +1,73 @@ +"""L2 — LIMIT order payload wiring in BingxDirectExecutionAdapter.submit_intent. + +The venue adapter forwards _order_type/_limit_price in the intent metadata; the +backend must place a LIMIT order (type=LIMIT + price + GTC) when asked, and keep +MARKET as the default. Offline unit test of payload construction — the signed_post +client is stubbed to capture the order payload; no exchange contact. +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from types import SimpleNamespace + +from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter +from prod.clean_arch.dita import DecisionAction, Intent, TradeSide + + +def _adapter(captured: dict): + a = BingxDirectExecutionAdapter.__new__(BingxDirectExecutionAdapter) + a._config = SimpleNamespace(recv_window_ms=5000, default_leverage=1, exchange_leverage_cap=3) + a._client_order_run_id = "test" + a._entry_client_order_seq = 0 + a._exit_client_order_seq = 0 + a._state = SimpleNamespace(open_positions={}, account={}) + + async def _signed_post(path, params): + if path.endswith("/trade/order"): + captured["order"] = dict(params) + return {"orderId": "1", "status": "NEW"} + + a._client = SimpleNamespace(signed_post=_signed_post) + a._instrument_venue_symbol = lambda asset: "BTC-USDT" + a._format_quantity = lambda asset, q: f"{float(q)}" + a._format_price = lambda asset, p: f"{float(p)}" + + async def _refresh(asset, include_history=True): + return a._state + + a._refresh_exchange_state = _refresh + return a + + +def _intent(metadata: dict) -> Intent: + return Intent( + timestamp=datetime.now(timezone.utc), trade_id="T1", decision_id="D1", + asset="BTCUSDT", action=DecisionAction.ENTER, side=TradeSide.SHORT, + reason="TEST", target_size=0.01, leverage=2.0, reference_price=100.0, + confidence=0.5, exit_leg_ratios=(1.0,), metadata=metadata, + ) + + +def test_limit_intent_places_limit_order(): + captured: dict = {} + asyncio.run(_adapter(captured).submit_intent(_intent({"_order_type": "LIMIT", "_limit_price": 95.0}))) + o = captured["order"] + assert o["type"] == "LIMIT", o + assert "price" in o and float(o["price"]) == 95.0, o + assert o.get("timeInForce") == "GTC", o + + +def test_market_intent_places_market_order(): + captured: dict = {} + asyncio.run(_adapter(captured).submit_intent(_intent({}))) + o = captured["order"] + assert o["type"] == "MARKET", o + assert "price" not in o, o + + +def test_limit_without_valid_price_falls_back_to_market(): + captured: dict = {} + asyncio.run(_adapter(captured).submit_intent(_intent({"_order_type": "LIMIT", "_limit_price": 0.0}))) + assert captured["order"]["type"] == "MARKET", captured["order"] diff --git a/prod/tests/test_pink_async_fill_pump.py b/prod/tests/test_pink_async_fill_pump.py new file mode 100644 index 0000000..bdf0877 --- /dev/null +++ b/prod/tests/test_pink_async_fill_pump.py @@ -0,0 +1,182 @@ +"""L1 — async-fill pump. + +A resting order (LIMIT-style: ACK on submit, no synchronous fill) fills on a +*later* venue reconcile. `PinkDirectRuntime.pump_venue_events()` must drain that +fill into the kernel so capital settles and the FSM advances, persist the result, +and dedup duplicate reconcile events (no double-settle). MockVenue only; no exchange. +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone + +from prod.clean_arch.dita_v2 import ( + ExecutionKernel, + InMemoryControlPlane, + KernelCommandType, + KernelControlSnapshot, + KernelEventKind, + KernelMode, + KernelVerbosity, + MemoryKernelJournal, + MockVenueAdapter, + MockVenueScenario, + TradeSide, + VenueEvent, + VenueEventStatus, +) +from prod.clean_arch.dita_v2.contracts import KernelIntent, TradeStage +from prod.clean_arch.dita import DecisionConfig, DecisionEngine, IntentEngine +from prod.clean_arch.persistence import PinkClickHousePersistence +from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime +from prod.clean_arch.ports.data_feed import DataFeedPort + + +class _Sink: + def __init__(self) -> None: + self.calls: list[tuple[str, dict]] = [] + + def __call__(self, table: str, row: dict) -> None: + self.calls.append((table, dict(row))) + + def tables(self) -> list[str]: + return [t for t, _ in self.calls] + + +class _StubFeed(DataFeedPort): + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + pass + + async def get_latest_snapshot(self, symbol): + return None + + async def subscribe_snapshots(self, callback) -> None: + pass + + async def get_acb_update(self): + return None + + def get_latency_ms(self) -> float: + return 0.0 + + def health_check(self) -> bool: + return True + + +class _DelayedFillVenue(MockVenueAdapter): + """MockVenue whose submit ACKs only; queued fills surface on reconcile().""" + + def __init__(self, scenario=None) -> None: + super().__init__(scenario) + self._pending: list[VenueEvent] = [] + + def queue(self, event: VenueEvent) -> None: + self._pending.append(event) + + def reconcile(self): + out, self._pending = list(self._pending), [] + return out + + +def _mk_runtime(): + # ACK-only: no synchronous fill on submit (resting order). + venue = _DelayedFillVenue( + MockVenueScenario(emit_fill_on_submit=False, partial_fill_ratio=0.0, emit_ack_before_fill=True) + ) + kernel = ExecutionKernel( + control_plane=InMemoryControlPlane( + KernelControlSnapshot(mode=KernelMode.DEBUG, verbosity=KernelVerbosity.TRACE) + ), + venue=venue, + journal=MemoryKernelJournal(), + ) + kernel.account.snapshot.capital = 25_000.0 + kernel.account.snapshot.peak_capital = 25_000.0 + kernel.account.snapshot.equity = 25_000.0 + sink = _Sink() + cfg = DecisionConfig() + persistence = PinkClickHousePersistence(kernel.account, sink=sink, v7_sink=sink) + runtime = PinkDirectRuntime( + data_feed=_StubFeed(), kernel=kernel, + decision_engine=DecisionEngine(cfg), intent_engine=IntentEngine(cfg), + persistence=persistence, market_state_runtime=None, + ) + return runtime, kernel, venue, sink + + +def _intent(action, *, size, price, reason="TEST"): + return KernelIntent( + timestamp=datetime.now(timezone.utc), intent_id=f"i-{reason}", trade_id="T1", + slot_id=0, asset="BTCUSDT", side=TradeSide.SHORT, action=action, + reference_price=price, target_size=size, leverage=2.0, exit_leg_ratios=(1.0,), reason=reason, + ) + + +def _fill_for(order, *, kind, price, filled, remaining, eid): + return VenueEvent( + timestamp=datetime.now(timezone.utc), event_id=eid, trade_id="T1", slot_id=0, + kind=kind, status=VenueEventStatus.FILLED if kind == KernelEventKind.FULL_FILL else VenueEventStatus.PARTIALLY_FILLED, + venue_order_id=order.venue_order_id, venue_client_id=order.venue_client_id, + side=TradeSide.SHORT, asset="BTCUSDT", price=price, size=1.0, + filled_size=filled, remaining_size=remaining, + ) + + +def test_resting_entry_fills_via_pump_and_dedups(): + runtime, kernel, venue, sink = _mk_runtime() + + # ENTER rests (ACK only, nothing filled). + kernel.process_intent(_intent(KernelCommandType.ENTER, size=1.0, price=100.0)) + slot = kernel.slot(0) + assert slot.fsm_state == TradeStage.ENTRY_WORKING + assert abs(slot.size) < 1e-9 + entry_order = slot.active_entry_order + assert entry_order is not None + + # A later reconcile surfaces the fill -> pump settles it. + venue.queue(_fill_for(entry_order, kind=KernelEventKind.FULL_FILL, price=100.0, filled=1.0, remaining=0.0, eid="EVF1")) + applied = asyncio.run(runtime.pump_venue_events()) + assert applied == 1 + assert kernel.slot(0).fsm_state == TradeStage.POSITION_OPEN + assert abs(kernel.slot(0).size - 1.0) < 1e-9 + assert "account_events" in sink.tables() and "position_state" in sink.tables() + assert "ENTRY_FILLED" in [r["event_type"] for t, r in sink.calls if t == "trade_reconstruction"] + + # Duplicate reconcile event -> kernel dedups; pump applies nothing, no double-settle. + cap_before = kernel.account.snapshot.capital + rows_before = len(sink.calls) + venue.queue(_fill_for(entry_order, kind=KernelEventKind.FULL_FILL, price=100.0, filled=1.0, remaining=0.0, eid="EVF1")) + applied2 = asyncio.run(runtime.pump_venue_events()) + assert applied2 == 0, "duplicate fill must be deduped by the kernel" + assert kernel.account.snapshot.capital == cap_before + assert len(sink.calls) == rows_before, "no rows persisted for a deduped event" + + +def test_resting_exit_fills_via_pump_settles_capital(): + runtime, kernel, venue, sink = _mk_runtime() + + # Open a position via the pump (entry rests, then fills). + kernel.process_intent(_intent(KernelCommandType.ENTER, size=1.0, price=100.0)) + venue.queue(_fill_for(kernel.slot(0).active_entry_order, kind=KernelEventKind.FULL_FILL, price=100.0, filled=1.0, remaining=0.0, eid="EVE1")) + asyncio.run(runtime.pump_venue_events()) + assert kernel.slot(0).fsm_state == TradeStage.POSITION_OPEN + cap_after_entry = kernel.account.snapshot.capital # entry does not realize PnL + + # EXIT rests (ACK only), then fills @90 on a later reconcile -> SHORT profit. + kernel.process_intent(_intent(KernelCommandType.EXIT, size=1.0, price=90.0, reason="TP")) + exit_order = kernel.slot(0).active_exit_order + assert exit_order is not None + venue.queue(_fill_for(exit_order, kind=KernelEventKind.FULL_FILL, price=90.0, filled=1.0, remaining=0.0, eid="EVX1")) + applied = asyncio.run(runtime.pump_venue_events()) + assert applied == 1 + assert kernel.slot(0).closed + assert kernel.slot(0).fsm_state == TradeStage.CLOSED + # SHORT 1.0 @100 -> exit @90, leverage 2 => realized profit > 0; capital rose. + assert kernel.account.snapshot.capital > cap_after_entry + tables = sink.tables() + assert "trade_exit_legs" in tables, "async exit must persist a leg row" + assert "trade_events" in tables, "async close must persist a terminal trade_event" diff --git a/prod/tests/test_pink_clickhouse_persistence.py b/prod/tests/test_pink_clickhouse_persistence.py index ce62d8c..6d555cb 100644 --- a/prod/tests/test_pink_clickhouse_persistence.py +++ b/prod/tests/test_pink_clickhouse_persistence.py @@ -19,10 +19,14 @@ from prod.clean_arch.dita import ( ) from prod.clean_arch.dita_v2.contracts import ( KernelDiagnosticCode, + KernelEventKind, KernelOutcome, KernelSeverity, KernelTransition, + VenueEvent, + VenueEventStatus, ) +from prod.clean_arch.dita_v2.contracts import TradeSide as DitaTradeSide from prod.clean_arch.persistence.pink_clickhouse import PinkClickHousePersistence @@ -324,3 +328,96 @@ def test_persistence_writes_account_reconcile_rows() -> None: assert status_row["phase"] == "account_reconcile" assert recon_row["event_type"] == "ACCOUNT_RECONCILE" assert "market_state_bundle_json" in recon_row + + +# --------------------------------------------------------------------------- +# L0 — two-phase (request -> result) persistence +# --------------------------------------------------------------------------- + +def _fill_event(kind: KernelEventKind, *, filled: float, remaining: float, price: float = 100.0) -> VenueEvent: + return VenueEvent( + timestamp=datetime(2026, 5, 25, 12, 0, 0, tzinfo=timezone.utc), + event_id=f"EV-{kind.value}", + trade_id="BTCUSDT-T-000000000001", + slot_id=0, + kind=kind, + status=VenueEventStatus.FILLED if kind == KernelEventKind.FULL_FILL else VenueEventStatus.ACKED, + side=DitaTradeSide.SHORT, + asset="BTCUSDT", + price=price, + size=1.0, + filled_size=filled, + remaining_size=remaining, + ) + + +def _outcome_with_events(*events: VenueEvent) -> KernelOutcome: + return KernelOutcome( + accepted=True, slot_id=0, trade_id="BTCUSDT-T-000000000001", + state=DitaTradeStage.POSITION_OPEN, diagnostic_code=KernelDiagnosticCode.OK, + severity=KernelSeverity.INFO, transitions=(), emitted_events=tuple(events), details={}, + ) + + +def test_request_row_precedes_result_rows_on_filled_entry() -> None: + """ENTER with a FULL_FILL event: ORDER_REQUESTED is logged before ENTRY_FILLED.""" + sink = _Sink() + persistence = PinkClickHousePersistence(_make_account(), sink=sink, v7_sink=sink) + persistence.persist_step( + snapshot=_make_snapshot(), + decision=_make_decision(DecisionAction.ENTER), + intent=_make_intent(DecisionAction.ENTER), + outcome=_outcome_with_events(_fill_event(KernelEventKind.FULL_FILL, filled=1.0, remaining=0.0)), + slot_dict=_make_slot_dict(closed=False, size=1.0), + phase="execution", + ) + recon_types = [r["event_type"] for t, r in sink.calls if t == "trade_reconstruction"] + assert "ORDER_REQUESTED" in recon_types, recon_types + assert "ENTRY_FILLED" in recon_types, recon_types + assert recon_types.index("ORDER_REQUESTED") < recon_types.index("ENTRY_FILLED") + + +def test_resting_limit_entry_logs_request_but_no_fill() -> None: + """ACK-only LIMIT entry (slot still working, size 0) -> request row, NO ENTRY_FILLED.""" + sink = _Sink() + persistence = PinkClickHousePersistence(_make_account(), sink=sink, v7_sink=sink) + # Working entry order: slot not closed, size 0 (nothing filled yet). + working_slot = _make_slot_dict(closed=False, size=0.0) + working_slot["fsm_state"] = "ENTRY_WORKING" + persistence.persist_step( + snapshot=_make_snapshot(), + decision=_make_decision(DecisionAction.ENTER), + intent=_make_intent(DecisionAction.ENTER), + outcome=_outcome_with_events(_fill_event(KernelEventKind.ORDER_ACK, filled=0.0, remaining=1.0)), + slot_dict=working_slot, + phase="execution", + ) + recon_types = [r["event_type"] for t, r in sink.calls if t == "trade_reconstruction"] + assert "ORDER_REQUESTED" in recon_types, recon_types + assert "ENTRY_FILLED" not in recon_types, f"resting LIMIT must not log a fill: {recon_types}" + # State snapshot rows still written (observability). + tables = [t for t, _ in sink.calls] + assert "account_events" in tables and "position_state" in tables + + +def test_resting_limit_exit_emits_no_terminal_rows() -> None: + """An exit intent whose order rests (size unchanged) -> no trade_exit_legs / trade_events.""" + sink = _Sink() + account = _make_account() + persistence = PinkClickHousePersistence(account, sink=sink, v7_sink=sink) + # Seed leg state as if a 1.0 position is open (prev_size = 1.0). + persistence._leg_state["BTCUSDT-T-000000000001"] = {"prev_realized": 0.0, "prev_size": 1.0, "prev_leg_id": ""} + # Exit order resting: slot still open at full size, ACK only, no fill. + resting = _make_slot_dict(closed=False, size=1.0) + persistence.persist_step( + snapshot=_make_snapshot(), + decision=_make_decision(DecisionAction.EXIT), + intent=_make_intent(DecisionAction.EXIT), + outcome=_outcome_with_events(_fill_event(KernelEventKind.ORDER_ACK, filled=0.0, remaining=1.0)), + slot_dict=resting, + phase="execution", + ) + tables = [t for t, _ in sink.calls] + assert "trade_exit_legs" not in tables, "resting exit must not emit a leg row" + assert "trade_events" not in tables + assert "ORDER_REQUESTED" in [r["event_type"] for t, r in sink.calls if t == "trade_reconstruction"]