PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging

S1 — Leverage cache (bingx_direct.py):
  _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms
  POST when exchange already has the requested leverage.  Saves ~350ms/trade.
  Cache updated ONLY on success; failed POST leaves cache stale → correct retry.
  Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts.
  connect(): _verify_leverage_drift() detects when another process changed leverage
  at the exchange and updates cache to exchange truth (logs WARNING on drift).
  Multi-runner contract: leverage is account-level on BingX; documented that
  concurrent runners with different leverage desires for same symbol conflict.
  20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update,
  concurrent-same-symbol (lock prevents race), drift-detect, persist/restore,
  multi-runner known-limitation documentation test.

S2 — Background state refresh (bingx_direct.py):
  MARKET fills: asyncio.create_task(_refresh_state_background) — does not block
  submit path.  WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway.
  LIMIT fills: synchronous refresh retained (include_history=False, not True) —
  needed to detect resting order state for next pump cycle.
  Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved.

Gap 1 — VenueEvent friction fields (contracts.py):
  Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps,
  mark_at_submit — all with defaults so existing callers are unaffected.
  Detailed inline docs for sign conventions and provenance codes.

Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py):
  submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate;
  annotates ack_row with _fee_estimated, _fee_source, _is_maker_est.
  persist_fee_settled(): new method writes fee_settled_events row when WS
  ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED".
  pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED.

Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py):
  Captures mark_at_submit before the order POST; computes slippage_bps signed
  by side: positive = adverse (taker overpaid / maker undersold), negative =
  price improvement.  Measured for BOTH taker and maker fills for symmetry.
  Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps.

S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with:
  SHORT/LONG-aware price offset design, OBF integration requirements,
  TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s
  calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO.
  REST/WS split: documented why BingX (and all retail venues) separate these
  and why a unified VenueAdapter protocol is the long-term solution.

151/151 existing tests green + 20 new leverage cache tests = 171 total.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Codex
2026-06-05 12:25:12 +02:00
parent 714913bab6
commit c864e9c550
6 changed files with 1171 additions and 36 deletions

View File

@@ -10,11 +10,13 @@ import asyncio
import json
import logging
import math
import time
import uuid
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timezone
from decimal import Decimal, ROUND_DOWN
from typing import Any, Optional
from pathlib import Path
from typing import Any, Dict, Optional, Tuple
from nautilus_trader.model.identifiers import InstrumentId
@@ -194,6 +196,37 @@ class BingxDirectExecutionAdapter(ExecutionPort):
self._state: ExchangeStateSnapshot | None = None
self._connected = False
# ── S1: Leverage cache ────────────────────────────────────────────────
# Maps symbol → last successfully set leverage (int).
# Avoids a ~350ms leverage POST before every order when leverage is unchanged.
#
# Cache key is symbol only (not runner_id) because leverage is an
# ACCOUNT-LEVEL setting on BingX — one value per symbol per account.
# IMPORTANT CONTRACT: if multiple runners share this account and request
# DIFFERENT leverages for the same symbol concurrently, the last writer
# wins at the exchange and the other runner's order executes at wrong
# leverage. Callers MUST ensure leverage is uniform across runners for
# any given symbol when sharing an account, or use separate accounts.
# This cache only eliminates the redundant round-trip; it cannot resolve
# the underlying multi-runner semantic conflict.
#
# Per-symbol asyncio.Lock: prevents concurrent submit_intent calls for the
# same symbol from interleaving leverage POST + cache update, which would
# create a window where the cache says "leverage set" but the POST hasn't
# completed yet. Lock scope is deliberately tight (only the POST + cache
# write), not the entire submit_intent, to avoid head-of-line blocking.
self._leverage_cache: Dict[str, int] = {}
self._leverage_locks: Dict[str, asyncio.Lock] = {}
# Persist path: survives process restarts within a session, not across
# reboots (leverage should be re-verified from exchange on cold start).
env_tag = "live" if getattr(self._config, "environment", None) and \
str(getattr(self._config, "environment", "")).upper() == "LIVE" else "vst"
self._leverage_cache_path = Path(f"/tmp/.bingx_leverage_cache_{env_tag}.json")
self._load_leverage_cache()
# ── S2: Background state refresh tracking ─────────────────────────────
self._state_refreshed_at: float = 0.0 # monotonic seconds
@property
def state(self) -> ExchangeStateSnapshot | None:
return self._state
@@ -202,12 +235,122 @@ class BingxDirectExecutionAdapter(ExecutionPort):
await self._provider.initialize()
self._connected = True
self._state = await self.refresh_state()
# S4/S1: on reconnect, verify cached leverage matches exchange truth.
# Drift happens when another process/runner changed leverage on the same account.
await self._verify_leverage_drift()
return True
async def disconnect(self) -> None:
self._connected = False
await self._client.close()
# ── S1: Leverage cache helpers ────────────────────────────────────────────
def _load_leverage_cache(self) -> None:
"""Load persisted leverage cache from JSON sidecar. Ignores errors."""
try:
raw = json.loads(self._leverage_cache_path.read_text())
self._leverage_cache = {
str(k): int(v)
for k, v in raw.items()
if isinstance(v, (int, float)) and math.isfinite(float(v)) and float(v) >= 1
}
LOGGER.debug("leverage cache loaded: %s", self._leverage_cache)
except Exception:
self._leverage_cache = {}
def _persist_leverage_cache(self) -> None:
"""Flush leverage cache to JSON sidecar. Non-fatal on failure."""
try:
self._leverage_cache_path.write_text(
json.dumps(self._leverage_cache, indent=2)
)
except Exception as exc:
LOGGER.warning("leverage cache persist failed: %s", exc)
async def _ensure_leverage(self, symbol: str, leverage: int) -> bool:
"""Set leverage for symbol only if the cached value differs.
Returns True if a POST was made (leverage was changed), False if skipped.
The asyncio.Lock per symbol ensures concurrent submit_intent calls for the
same symbol never interleave leverage POST and cache update — preventing
the heisenbug where two orders both think they set leverage but one runs
at the wrong value because the other's POST arrived last.
Cache is updated ONLY on successful POST. A failed POST leaves the cache
stale, so the next submit retries — correct conservative behaviour.
"""
lock = self._leverage_locks.setdefault(symbol, asyncio.Lock())
async with lock:
cached = self._leverage_cache.get(symbol)
if cached == leverage:
return False # exchange already at requested value — skip POST
try:
await self._client.signed_post(
"/openApi/swap/v2/trade/leverage",
{"symbol": symbol, "side": "BOTH", "leverage": leverage},
)
prev = self._leverage_cache.get(symbol)
self._leverage_cache[symbol] = leverage
self._persist_leverage_cache()
LOGGER.info(
"leverage SET symbol=%s %s%d",
symbol, f"{prev}" if prev is not None else "", leverage,
)
return True
except Exception as exc:
LOGGER.warning(
"leverage POST failed symbol=%s lev=%d: %s"
"cache NOT updated, will retry on next submit",
symbol, leverage, exc,
)
return False # do NOT cache — retry guarantees correctness
async def _verify_leverage_drift(self) -> None:
"""On connect/reconnect, compare cached leverage with exchange reality.
Drift occurs when another process or runner changed leverage on the same
account while this adapter was offline. Log it loudly; update cache to
exchange truth so next submit does not re-set to the wrong value.
"""
for symbol, cached_lev in list(self._leverage_cache.items()):
try:
resp = await self._client.signed_get(
"/openApi/swap/v2/trade/leverage", {"symbol": symbol}
)
exchange_lev = int(
float(resp.get("longLeverage") or resp.get("leverage") or 0)
)
if exchange_lev > 0 and exchange_lev != cached_lev:
LOGGER.warning(
"LEVERAGE DRIFT symbol=%s cached=%d exchange=%d"
"another process may have changed it; cache updated to exchange truth",
symbol, cached_lev, exchange_lev,
)
self._leverage_cache[symbol] = exchange_lev
except Exception as exc:
LOGGER.debug("leverage drift check failed symbol=%s: %s", symbol, exc)
self._persist_leverage_cache()
# ── S2: Background state refresh ─────────────────────────────────────────
async def _refresh_state_background(self, symbol: str) -> None:
"""Background task: refresh internal state after a synchronous MARKET fill.
MARKET fills deliver fill_price / executedQty in the ACK; capital and
position truth arrive via WS FILL_SETTLED + ACCOUNT_UPDATE. This REST
poll is belt-and-suspenders — catches edge cases such as unexpected
concurrent fills or exchange position ledger lag. It does not block the
submit path.
"""
try:
self._state = await self._refresh_exchange_state(symbol, include_history=False)
self._state_refreshed_at = time.monotonic()
LOGGER.debug("background state refresh complete symbol=%s", symbol)
except Exception as exc:
LOGGER.warning("background state refresh failed symbol=%s: %s", symbol, exc)
def _resolve_instrument(self, asset: str):
normalized = _normalize_symbol(asset)
candidates = [
@@ -354,13 +497,65 @@ class BingxDirectExecutionAdapter(ExecutionPort):
async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
return await self._refresh_exchange_state(symbol, include_history=include_history)
# ── S3 — Maker order placement (CRITICAL TODO) ────────────────────────────
# Currently all orders are MARKET (taker, 0.04% fee). For exits especially,
# a LIMIT order at bid+1tick (SHORT close) or ask-1tick (LONG close) with
# reduceOnly=true can fill as maker (0.02% fee) — 50% fee saving per trade.
#
# Design requirements before this is safe to enable:
#
# 1. SHORT/LONG awareness (CRITICAL):
# SHORT close → BUY side → LIMIT at bid+1tick (slightly above bid to rest on book)
# LONG close → SELL side → LIMIT at ask-1tick
# SHORT enter → SELL side → LIMIT at ask-1tick (only if signal allows waiting)
# LONG enter → BUY side → LIMIT at bid+1tick
#
# 2. OBF integration: the OBF subsystem tracks order book depth and emits
# spread guidance. Use OBF.spread_bps and OBF.bid_depth / ask_depth to
# decide whether a maker order is viable (tight spread = fast fill,
# thin book = price impact of own order too large).
#
# 3. TODO_ADD_PARAMSET_VIBRISS: The "calm market" threshold (currently
# hardcoded as spread_bps < 5) MUST be VIBRISS-calibrated. VIBRISS
# governs our own assessment of market microstructure quality. Some
# invariants may remain hardcoded (e.g. "never use maker if spread > 50bps")
# but the normal operating range should be a VIBRISS metaparameter.
# See: prod/vibriss/ for the param-set schema.
#
# 4. Timeout calibration: timeout_s cannot be a fixed constant. It must
# reflect: (a) the strategy's max adverse excursion budget for the position,
# (b) the expected fill velocity from OBF order-flow data, (c) the remaining
# time before the signal's max_hold threshold. A resting limit that misses
# its cancel window creates an orphaned order — a hard risk control failure.
#
# 5. Price impact awareness (future, not yet operational):
# For large notionals, our own LIMIT order changes the book. At some notional
# threshold, the maker price improvement is eaten by the impact of our own
# resting order. The OBF subsystem must estimate this before we switch from
# MARKET to LIMIT. This is the "price impact of our own order" problem.
#
# CRITICAL TODO: Abstraction. The entire order placement logic (MARKET/LIMIT/
# TWAP/VWAP/Iceberg, maker/taker selection, price impact estimation, timeout/
# fallback, cancel-replace) MUST eventually be a dedicated "Smart Order Router"
# (SOR) subsystem. submit_intent() should be a thin dispatcher into the SOR,
# not a monolith. This is a pre-condition for multi-venue support, co-location
# optimisation, and regulatory reporting.
#
# GAP in characterisation: why does BingX separate REST (order placement) from
# WS (fill events)? Almost certainly historical: REST came first for audit-
# trail reasons (HMAC-signed requests → deterministic replay), WS push came
# later for low-latency fills. The FIX protocol unifies these under one
# connection — that is the right long-term model and the reason venue-agnostic
# abstraction via the VenueAdapter protocol is critical. Any future venue
# (Bybit, OKX, Binance) will have the same REST/WS split.
# ─────────────────────────────────────────────────────────────────────────
async def submit_intent(self, intent: Intent) -> ExecutionReceipt:
symbol = self._instrument_venue_symbol(intent.asset)
if intent.action == DecisionAction.EXIT:
side = "SELL" if intent.side == TradeSide.LONG else "BUY"
else:
side = "BUY" if intent.side == TradeSide.LONG else "SELL"
# Entries must be free to open the slot; only exits are reduce-only.
reduce_only = bool(intent.action == DecisionAction.EXIT)
if reduce_only:
self._exit_client_order_seq += 1
@@ -372,25 +567,28 @@ class BingxDirectExecutionAdapter(ExecutionPort):
int(round(float(intent.leverage or self._config.default_leverage))),
exchange_max=self._config.exchange_leverage_cap,
)
try:
await self._client.signed_post(
"/openApi/swap/v2/trade/leverage",
{"symbol": symbol, "side": "BOTH", "leverage": leverage},
)
except Exception as _lev_exc:
# W: leverage POST failed — order will execute at whatever leverage the
# exchange currently has for this symbol. Log prominently; do NOT abort
# the submit because the order may still succeed at the right leverage.
import logging as _logging
_logging.getLogger(__name__).warning(
"BingX leverage set failed (symbol=%s lev=%s): %s — proceeding with submit",
symbol, leverage, _lev_exc,
)
# S1: leverage cache — skip POST when exchange already has the right value.
await self._ensure_leverage(symbol, leverage)
# Capture mark price BEFORE the order POST for slippage measurement (Gap 3).
# This is the most honest reference: the market state at decision time.
mark_at_submit = 0.0
submit_ts_ms = int(time.time() * 1000)
if self._state is not None:
for _sym_key, _pos in self._state.open_positions.items():
_mark = float(_pos.get("markPrice") or _pos.get("avgPrice") or 0.0)
if _mark > 0:
mark_at_submit = _mark
break
if mark_at_submit <= 0:
# No open position for mark — use the last known mark from any symbol
# as a rough reference (e.g. for fresh ENTER with no position yet)
pass # remains 0.0; slippage will be 0
try:
# Honor the order type forwarded by the venue adapter
# (bingx_venue._legacy_intent sets _order_type/_limit_price). MARKET
# is the default; a LIMIT carries a resting price + GTC and will not
# fill synchronously — the async-fill pump settles it later.
# (bingx_venue._legacy_intent encodes _order_type/_limit_price).
order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper()
limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0)
is_limit = order_type == "LIMIT" and limit_price > 0.0
@@ -422,8 +620,12 @@ class BingxDirectExecutionAdapter(ExecutionPort):
fill_price = value
break
if fill_price <= 0 and self._state is not None:
# Use the last known exchange mark as a fallback for projected accounting.
fill_price = next((float(row.get("markPrice") or row.get("avgPrice") or 0.0) for row in self._state.open_positions.values() if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), 0.0)
fill_price = next(
(float(row.get("markPrice") or row.get("avgPrice") or 0.0)
for row in self._state.open_positions.values()
if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0),
0.0,
)
except BingxHttpError as exc:
status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED"
ack_row = {
@@ -434,6 +636,52 @@ class BingxDirectExecutionAdapter(ExecutionPort):
}
fill_price = 0.0
ack = None
is_limit = False
# ── Gap 2: fee estimation (ESTIMATED_TAKER / ESTIMATED_MAKER) ────────
# BingX REST ACK does not include commission. WS FILL_SETTLED will deliver
# the actual fee later and update the fee_source to "WS_SETTLED".
# Until then, log an estimate so CH rows are never blank on this field.
fill_qty = float(ack_row.get("executedQty") or ack_row.get("filledQty") or
getattr(intent, "target_size", 0.0) or 0.0)
if is_limit:
# LIMIT orders *may* rest and fill as maker — optimistic estimate.
fee_rate = 0.0002 # BingX perpetuals maker fee 0.02%
fee_source = "ESTIMATED_MAKER"
is_maker_est = True
else:
fee_rate = 0.0004 # BingX perpetuals taker fee 0.04%
fee_source = "ESTIMATED_TAKER"
is_maker_est = False
estimated_fee = fill_price * fill_qty * fee_rate if fill_price > 0 and fill_qty > 0 else 0.0
# ── Gap 3: slippage (signed, vs mark_at_submit) ───────────────────────
# Positive = worse than mark (taker overpays), negative = better (maker/price improvement).
# Measured for BOTH taker and maker fills so post-trade analytics can compare.
slippage_bps = 0.0
if mark_at_submit > 0 and fill_price > 0:
raw_diff = (fill_price - mark_at_submit) / mark_at_submit * 10_000
# Sign convention: adverse = positive regardless of direction.
# For BUY (LONG enter / SHORT close): higher fill price = worse → positive
# For SELL (SHORT enter / LONG close): lower fill price = worse → positive
if side == "BUY":
slippage_bps = raw_diff # positive if fill > mark (paid up)
else:
slippage_bps = -raw_diff # positive if fill < mark (sold down)
# Exchange-assigned fill time from ACK (field "updateTime" or "time"); 0 if absent.
exchange_ts = int(ack_row.get("updateTime") or ack_row.get("time") or
ack_row.get("transactTime") or 0)
# Annotate ack_row with computed friction so _events_from_submit can read them.
ack_row["_fee_estimated"] = estimated_fee
ack_row["_fee_source"] = fee_source
ack_row["_is_maker_est"] = is_maker_est
ack_row["_mark_at_submit"] = mark_at_submit
ack_row["_slippage_bps"] = slippage_bps
ack_row["_submit_ts_ms"] = submit_ts_ms
ack_row["_exchange_ts"] = exchange_ts
receipt = ExecutionReceipt(
timestamp=datetime.now(timezone.utc),
status=status,
@@ -443,12 +691,28 @@ class BingxDirectExecutionAdapter(ExecutionPort):
quantity=float(intent.target_size or 0.0),
price=fill_price,
client_order_id=client_order_id,
order_id=str((ack.order_id if 'ack' in locals() and ack is not None else '') or ack_row.get("orderId") or ""),
order_id=str(
(ack.order_id if "ack" in locals() and ack is not None else "")
or ack_row.get("orderId") or ""
),
raw_ack=ack_row,
raw_state=dict(self._state.account if self._state is not None else {}),
)
# Refresh from the venue so the direct runtime can use exchange-led state.
self._state = await self._refresh_exchange_state(intent.asset, include_history=True)
# S2: background state refresh — do not block the submit path for MARKET fills.
# For MARKET orders that returned FILLED in the ACK, all fill data is already
# in the receipt. The WS stream (FILL_SETTLED + ACCOUNT_UPDATE) delivers
# capital truth. The background REST poll is belt-and-suspenders.
# For LIMIT / non-FILLED orders: must refresh synchronously to detect resting order.
market_filled = (status == "FILLED" and not is_limit)
if market_filled:
asyncio.create_task(
self._refresh_state_background(intent.asset),
name=f"state_refresh_{symbol}",
)
else:
self._state = await self._refresh_exchange_state(intent.asset, include_history=False)
return receipt
async def cancel(self, order: Any, *, reason: str = "") -> dict[str, Any]:

View File

@@ -493,6 +493,14 @@ class BingxVenueAdapter(VenueAdapter):
**{**base_event.__dict__, "event_id": _event_id(self._event_seq), "kind": KernelEventKind.ORDER_REJECT, "status": VenueEventStatus.REJECTED, "reason": _row_text(ack_row, "msg", "message", default="BINGX_ORDER_REJECTED")},
)
]
# Extract friction fields annotated by submit_intent (Gap 1/2/3).
fee_estimated = float(ack_row.get("_fee_estimated") or 0.0)
fee_source = str(ack_row.get("_fee_source") or "")
is_maker_est = bool(ack_row.get("_is_maker_est", False))
mark_at_submit = float(ack_row.get("_mark_at_submit") or 0.0)
slippage_bps = float(ack_row.get("_slippage_bps") or 0.0)
exchange_ts = int(ack_row.get("_exchange_ts") or 0)
events = [base_event]
fill_status = _venue_event_status_from_row(status)
filled_size = _row_float(ack_row, "executedQty", "cumFilledQty", "filledQty", "lastFilledQty", default=0.0)
@@ -524,6 +532,15 @@ class BingxVenueAdapter(VenueAdapter):
reason="",
raw_payload=ack_row or json_safe(receipt),
metadata={"intent_id": intent.intent_id, "action": intent.action.value},
# Gap 1/2/3: fee + friction fields populated from submit_intent annotations.
# fee_source="ESTIMATED_*" until WS FILL_SETTLED updates it to "WS_SETTLED".
fee=fee_estimated,
fee_asset="USDT",
fee_source=fee_source,
is_maker=is_maker_est,
exchange_ts=exchange_ts,
slippage_bps=slippage_bps,
mark_at_submit=mark_at_submit,
)
)
return events

View File

@@ -0,0 +1,362 @@
"""Canonical v2 contracts for the DITAv2 execution kernel."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Mapping, Optional, Sequence, Tuple
class TradeSide(str, Enum):
"""Trade side."""
LONG = "LONG"
SHORT = "SHORT"
FLAT = "FLAT"
class TradeStage(str, Enum):
"""Execution stage for a trade slot."""
IDLE = "IDLE"
DECISION_CREATED = "DECISION_CREATED"
INTENT_CREATED = "INTENT_CREATED"
ORDER_REQUESTED = "ORDER_REQUESTED"
ORDER_SENT = "ORDER_SENT"
ORDER_ACKED = "ORDER_ACKED"
ORDER_REJECTED = "ORDER_REJECTED"
ENTRY_WORKING = "ENTRY_WORKING"
PARTIAL_FILL = "PARTIAL_FILL"
POSITION_OPENED = "POSITION_OPENED"
POSITION_OPEN = "POSITION_OPEN"
EXIT_REQUESTED = "EXIT_REQUESTED"
EXIT_SENT = "EXIT_SENT"
EXIT_ACKED = "EXIT_ACKED"
EXIT_REJECTED = "EXIT_REJECTED"
EXIT_WORKING = "EXIT_WORKING"
POSITION_PARTIALLY_CLOSED = "POSITION_PARTIALLY_CLOSED"
POSITION_CLOSED = "POSITION_CLOSED"
CLOSED = "CLOSED"
TRADE_TERMINAL_WRITTEN = "TRADE_TERMINAL_WRITTEN"
STALE_STATE_RECONCILING = "STALE_STATE_RECONCILING"
class KernelCommandType(str, Enum):
"""Kernel command types."""
ENTER = "ENTER"
EXIT = "EXIT"
MARK_PRICE = "MARK_PRICE"
RECONCILE = "RECONCILE"
CONTROL = "CONTROL"
CANCEL = "CANCEL"
class KernelEventKind(str, Enum):
"""Normalized venue event kinds."""
ORDER_ACK = "ORDER_ACK"
ORDER_REJECT = "ORDER_REJECT"
RATE_LIMITED = "RATE_LIMITED"
PARTIAL_FILL = "PARTIAL_FILL"
FULL_FILL = "FULL_FILL"
CANCEL_ACK = "CANCEL_ACK"
CANCEL_REJECT = "CANCEL_REJECT"
MARK_PRICE = "MARK_PRICE"
RECONCILE = "RECONCILE"
CONTROL = "CONTROL"
class KernelDiagnosticCode(str, Enum):
"""Structured diagnostic codes emitted by the kernel."""
OK = "OK"
RATE_LIMITED = "RATE_LIMITED"
INVALID_SLOT_ID = "INVALID_SLOT_ID"
INVALID_INTENT = "INVALID_INTENT"
UNSUPPORTED_INTENT = "UNSUPPORTED_INTENT"
SLOT_BUSY = "SLOT_BUSY"
NO_OPEN_POSITION = "NO_OPEN_POSITION"
NO_ACTIVE_EXIT_ORDER = "NO_ACTIVE_EXIT_ORDER"
UNKNOWN_EVENT_KIND = "UNKNOWN_EVENT_KIND"
ORDER_REJECTED = "ORDER_REJECTED"
ENTRY_ORDER_REJECTED = "ENTRY_ORDER_REJECTED"
EXIT_ORDER_REJECTED = "EXIT_ORDER_REJECTED"
CANCEL_REJECTED = "CANCEL_REJECTED"
STALE_STATE_RECONCILE = "STALE_STATE_RECONCILE"
RECONCILED = "RECONCILED"
DUPLICATE_EVENT = "DUPLICATE_EVENT"
UNRESOLVED_SLOT = "UNRESOLVED_SLOT"
INVALID_TRANSITION = "INVALID_TRANSITION"
TERMINAL_STATE = "TERMINAL_STATE"
CAPITAL_FROZEN = "CAPITAL_FROZEN"
class KernelSeverity(str, Enum):
"""Severity classification for kernel outcomes."""
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
class VenueOrderStatus(str, Enum):
"""Order status surface mirrored from venue truth."""
NEW = "NEW"
ACKED = "ACKED"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
CANCELED = "CANCELED"
REJECTED = "REJECTED"
class VenueEventStatus(str, Enum):
"""Status alias for normalized venue events."""
ACKED = "ACKED"
REJECTED = "REJECTED"
RATE_LIMITED = "RATE_LIMITED"
PARTIALLY_FILLED = "PARTIALLY_FILLED"
FILLED = "FILLED"
CANCELED = "CANCELED"
CANCELED_REJECTED = "CANCEL_REJECTED"
@dataclass(frozen=True)
class VenueOrder:
"""Venue-specific order identity and fill state."""
internal_trade_id: str
venue_order_id: str
venue_client_id: str
side: TradeSide
intended_size: float
filled_size: float = 0.0
average_fill_price: float = 0.0
status: VenueOrderStatus = VenueOrderStatus.NEW
metadata: Dict[str, Any] = field(default_factory=dict)
@property
def remaining_size(self) -> float:
return max(0.0, float(self.intended_size) - float(self.filled_size))
@dataclass
class TradeSlot:
"""A single execution slot managed by the v2 kernel."""
slot_id: int
trade_id: str = ""
asset: str = ""
side: TradeSide = TradeSide.FLAT
entry_price: float = 0.0
size: float = 0.0
initial_size: float = 0.0
leverage: float = 0.0
entry_time: Optional[datetime] = None
unrealized_pnl: float = 0.0
realized_pnl: float = 0.0
closed: bool = False
exit_leg_ratios: Tuple[float, ...] = (1.0,)
active_leg_index: int = 0
active_exit_order: Optional[VenueOrder] = None
active_entry_order: Optional[VenueOrder] = None
fsm_state: TradeStage = TradeStage.IDLE
close_reason: str = ""
last_event_time: Optional[datetime] = None
seen_event_ids: Tuple[str, ...] = ()
metadata: Dict[str, Any] = field(default_factory=dict)
def is_free(self) -> bool:
return self.fsm_state in {TradeStage.IDLE, TradeStage.CLOSED} and float(self.size or 0.0) <= 0.0 and not self.active_entry_order and not self.active_exit_order
def is_open(self) -> bool:
return self.fsm_state in {
TradeStage.ENTRY_WORKING,
TradeStage.POSITION_OPENED,
TradeStage.POSITION_OPEN,
TradeStage.EXIT_WORKING,
} and not self.closed
def mark_price(self, price: float) -> None:
if price is None or price != price or price <= 0:
return
self.entry_price = self.entry_price or price
if self.entry_price <= 0 or self.size <= 0:
self.unrealized_pnl = 0.0
return
delta = (price - self.entry_price) / self.entry_price
if self.side == TradeSide.SHORT:
delta = -delta
self.unrealized_pnl = delta * self.size * self.entry_price * self.leverage
def next_exit_ratio(self) -> float:
if self.active_leg_index < len(self.exit_leg_ratios):
ratio = float(self.exit_leg_ratios[self.active_leg_index])
return max(0.0, min(1.0, ratio))
return 1.0
def consume_exit_leg(self) -> float:
ratio = self.next_exit_ratio()
self.active_leg_index = min(self.active_leg_index + 1, max(len(self.exit_leg_ratios), 1))
return ratio
def remaining_size(self) -> float:
return max(0.0, float(self.size))
def attach_entry_order(self, order: VenueOrder) -> None:
self.active_entry_order = order
def attach_exit_order(self, order: VenueOrder) -> None:
self.active_exit_order = order
def to_dict(self) -> Dict[str, Any]:
def _order_dict(order: Optional[VenueOrder]) -> Optional[Dict[str, Any]]:
if order is None:
return None
return {
"internal_trade_id": order.internal_trade_id,
"venue_order_id": order.venue_order_id,
"venue_client_id": order.venue_client_id,
"side": order.side.value,
"intended_size": float(order.intended_size or 0.0),
"filled_size": float(order.filled_size or 0.0),
"average_fill_price": float(order.average_fill_price or 0.0),
"status": order.status.value,
"metadata": dict(order.metadata),
}
return {
"slot_id": self.slot_id,
"trade_id": self.trade_id,
"asset": self.asset,
"side": self.side.value,
"entry_price": float(self.entry_price or 0.0),
"size": float(self.size or 0.0),
"initial_size": float(self.initial_size or 0.0),
"leverage": float(self.leverage or 0.0),
"entry_time": self.entry_time.isoformat() if hasattr(self.entry_time, "isoformat") else None,
"unrealized_pnl": float(self.unrealized_pnl or 0.0),
"realized_pnl": float(self.realized_pnl or 0.0),
"closed": bool(self.closed),
"exit_leg_ratios": [float(r) for r in self.exit_leg_ratios],
"active_leg_index": int(self.active_leg_index or 0),
"active_exit_order": _order_dict(self.active_exit_order),
"active_entry_order": _order_dict(self.active_entry_order),
"fsm_state": self.fsm_state.value,
"close_reason": self.close_reason,
"last_event_time": self.last_event_time.isoformat() if hasattr(self.last_event_time, "isoformat") else None,
"seen_event_ids": list(self.seen_event_ids),
"metadata": dict(self.metadata),
}
@dataclass(frozen=True)
class KernelIntent:
"""Command emitted by the algo and written to the hot-path intent region."""
timestamp: datetime
intent_id: str
trade_id: str
slot_id: int
asset: str
side: TradeSide
action: KernelCommandType
reference_price: float
target_size: float
leverage: float
exit_leg_ratios: Tuple[float, ...] = (1.0,)
reason: str = ""
metadata: Dict[str, Any] = field(default_factory=dict)
stage: TradeStage = TradeStage.INTENT_CREATED
order_type: str = "MARKET"
limit_price: float = 0.0
@dataclass(frozen=True)
class VenueEvent:
"""Normalized venue truth mapped into DITAv2 semantics."""
timestamp: datetime
event_id: str
trade_id: str
slot_id: int
kind: KernelEventKind
status: VenueEventStatus
venue_order_id: str = ""
venue_client_id: str = ""
side: TradeSide = TradeSide.FLAT
asset: str = ""
price: float = 0.0 # avg fill price
size: float = 0.0
filled_size: float = 0.0
remaining_size: float = 0.0
reason: str = ""
raw_payload: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
# ── Fee / friction fields ──────────────────────────────────────────────
# fee: exchange commission for this fill event.
# Positive = cost; negative = rebate (maker on some venues).
# Starts as ESTIMATED_TAKER from the REST ACK path (BingX ACK does not
# include commission — we estimate from fill_size × fill_price × rate).
# Updated to WS_SETTLED when the FILL_SETTLED event arrives from the
# account stream with the actual commission field "n".
fee: float = 0.0
fee_asset: str = "" # e.g. "USDT"
# fee_source provenance codes:
# "" — fee unknown (e.g. CANCEL_ACK, ORDER_ACK events)
# "ESTIMATED_TAKER" — REST path, MARKET order; estimated at taker rate
# "ESTIMATED_MAKER" — REST path, LIMIT order that may rest; estimated at maker rate
# "WS_SETTLED" — actual fee from WS ORDER_TRADE_UPDATE field "n"
# "REST_SETTLED" — actual fee from REST fill history (allFillOrders)
fee_source: str = ""
is_maker: bool = False # True when LIMIT order rested and filled as maker
# exchange_ts: exchange-assigned fill timestamp (ms epoch).
# 0 when not available (REST ACK path — use VenueEvent.timestamp as fallback).
# Non-zero from WS ORDER_TRADE_UPDATE (field "T" or "t").
exchange_ts: int = 0
# slippage_bps: signed fill-quality metric.
# For taker fills: (fill_price - mark_at_submit) / mark_at_submit × 10_000
# positive = worse than mark (usual for taker on moving markets)
# negative = better than mark (rare — mark moved in our favour between submit and fill)
# For maker fills: fill_price is better than mid by design; slippage is typically
# negative (price improvement vs what a taker would have paid).
# 0.0 when mark_at_submit is unavailable.
slippage_bps: float = 0.0
mark_at_submit: float = 0.0 # mark/mid price captured just before submit_intent POST
@dataclass(frozen=True)
class KernelTransition:
"""Durable kernel transition used for debug journaling."""
timestamp: datetime
trade_id: str
slot_id: int
prev_state: TradeStage
next_state: TradeStage
trigger: str
intent_id: str = ""
event_id: str = ""
control_mode: str = ""
control_verbosity: str = ""
details: Dict[str, Any] = field(default_factory=dict)
@dataclass(frozen=True)
class KernelOutcome:
"""Result of applying a command or venue event."""
accepted: bool
slot_id: int
trade_id: str
state: TradeStage
diagnostic_code: KernelDiagnosticCode = KernelDiagnosticCode.OK
severity: KernelSeverity = KernelSeverity.INFO
transitions: Tuple[KernelTransition, ...] = ()
emitted_events: Tuple[VenueEvent, ...] = ()
details: Dict[str, Any] = field(default_factory=dict)

View File

@@ -0,0 +1,390 @@
"""S1: Leverage cache — comprehensive mock tests.
Covers:
- Same-leverage skip (no POST)
- Change triggers POST
- POST failure → cache NOT updated → retry on next call
- Concurrent same-symbol same-leverage: only one POST (lock)
- Concurrent same-symbol different-leverage: serialised, both POST
- Connect-time drift detection and cache correction
- Persist/restore across "restarts" (file-based)
- Multi-runner conflict: both runners see the same account-level leverage
- Leverage after BingX HTTP error: partial state handled correctly
Bad-leverage-at-trade is one of the worst possible outcomes — these tests
guard every code path that could produce it.
"""
from __future__ import annotations
import asyncio
import json
import sys
import tempfile
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch, call
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
# ---------------------------------------------------------------------------
# Helpers — minimal adapter stub wired to a fake HTTP client
# ---------------------------------------------------------------------------
def _make_adapter(tmp_path: Path, env_tag: str = "vst"):
"""Build a BingxDirectExecutionAdapter with a mocked HTTP client."""
from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter, BingxDirectExecutionConfig
from prod.bingx.enums import BingxEnvironment
cfg = BingxDirectExecutionConfig(
environment=BingxEnvironment.VST,
allow_mainnet=False,
default_leverage=1,
exchange_leverage_cap=3,
)
mock_client = AsyncMock()
mock_client.signed_post = AsyncMock(return_value={"leverage": 1})
mock_client.signed_get = AsyncMock(return_value={"leverage": 1, "longLeverage": 1})
mock_provider = MagicMock()
mock_provider.initialize = AsyncMock()
mock_provider.find = MagicMock(return_value=None)
mock_provider.list_all = MagicMock(return_value=[])
adapter = BingxDirectExecutionAdapter(cfg, client=mock_client, provider=mock_provider)
# Override persist path to tmp THEN reload — __init__ already loaded from the
# default /tmp path. Reloading after override ensures tests are path-isolated.
adapter._leverage_cache_path = tmp_path / f".bingx_leverage_cache_{env_tag}.json"
adapter._leverage_cache = {}
adapter._load_leverage_cache() # load from test-specific tmp path (may be empty)
return adapter, mock_client
# ---------------------------------------------------------------------------
# 1. Basic skip — same leverage, no POST
# ---------------------------------------------------------------------------
class TestLeverageCacheBasic:
@pytest.mark.asyncio
async def test_skip_when_same_leverage(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
# First call: cache miss → POST
await adapter._ensure_leverage("TRX-USDT", 1)
assert client.signed_post.call_count == 1
# Second call: cache hit → NO POST
await adapter._ensure_leverage("TRX-USDT", 1)
assert client.signed_post.call_count == 1, "Should not POST when leverage unchanged"
@pytest.mark.asyncio
async def test_post_on_change(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
await adapter._ensure_leverage("TRX-USDT", 1)
assert client.signed_post.call_count == 1
# Change leverage → must POST again
await adapter._ensure_leverage("TRX-USDT", 2)
assert client.signed_post.call_count == 2, "Should POST when leverage changes"
assert adapter._leverage_cache["TRX-USDT"] == 2
@pytest.mark.asyncio
async def test_different_symbols_independent(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
await adapter._ensure_leverage("TRX-USDT", 1)
await adapter._ensure_leverage("XRP-USDT", 1)
assert client.signed_post.call_count == 2, "Each symbol needs its own POST"
# Skip both on repeat
await adapter._ensure_leverage("TRX-USDT", 1)
await adapter._ensure_leverage("XRP-USDT", 1)
assert client.signed_post.call_count == 2, "No extra POSTs for cached symbols"
# ---------------------------------------------------------------------------
# 2. Failure handling — cache NOT updated on POST failure
# ---------------------------------------------------------------------------
class TestLeverageCacheFailure:
@pytest.mark.asyncio
async def test_cache_not_updated_on_post_failure(self, tmp_path):
from prod.bingx.http import BingxHttpError
adapter, client = _make_adapter(tmp_path)
client.signed_post.side_effect = BingxHttpError("HTTP 429 rate limit")
result = await adapter._ensure_leverage("TRX-USDT", 2)
assert result is False, "Should return False on failure"
assert "TRX-USDT" not in adapter._leverage_cache, (
"Cache must NOT be updated when POST fails — "
"next submit must retry, not use wrong leverage"
)
@pytest.mark.asyncio
async def test_retry_on_next_call_after_failure(self, tmp_path):
from prod.bingx.http import BingxHttpError
adapter, client = _make_adapter(tmp_path)
# First attempt fails
client.signed_post.side_effect = BingxHttpError("rate limit")
await adapter._ensure_leverage("TRX-USDT", 2)
assert client.signed_post.call_count == 1
# Second attempt succeeds
client.signed_post.side_effect = None
client.signed_post.return_value = {"leverage": 2}
await adapter._ensure_leverage("TRX-USDT", 2)
assert client.signed_post.call_count == 2, "Must retry after failure"
assert adapter._leverage_cache.get("TRX-USDT") == 2
@pytest.mark.asyncio
async def test_first_call_returns_true_on_success(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
result = await adapter._ensure_leverage("TRX-USDT", 1)
assert result is True, "Should return True when POST is made"
@pytest.mark.asyncio
async def test_skip_returns_false(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
await adapter._ensure_leverage("TRX-USDT", 1)
result = await adapter._ensure_leverage("TRX-USDT", 1)
assert result is False, "Should return False when POST is skipped"
# ---------------------------------------------------------------------------
# 3. Concurrency — asyncio.Lock prevents interleaving
# ---------------------------------------------------------------------------
class TestLeverageCacheConcurrency:
@pytest.mark.asyncio
async def test_concurrent_same_symbol_same_leverage_one_post(self, tmp_path):
"""Two concurrent submits for same symbol+leverage → exactly one POST."""
adapter, client = _make_adapter(tmp_path)
# Introduce a small delay so both calls enter _ensure_leverage before either completes
call_count = 0
async def slow_post(*args, **kwargs):
nonlocal call_count
call_count += 1
await asyncio.sleep(0.01)
return {"leverage": 1}
client.signed_post.side_effect = slow_post
results = await asyncio.gather(
adapter._ensure_leverage("TRX-USDT", 1),
adapter._ensure_leverage("TRX-USDT", 1),
)
# Only ONE should have actually POSTed (the one that won the lock)
assert call_count == 1, (
f"Expected exactly 1 leverage POST for same symbol+leverage, got {call_count}. "
"This is the heisenbug: if the lock isn't protecting the cache check+update "
"atomically, both calls see an empty cache and both POST."
)
# One True (did POST), one False (saw cache hit after lock)
assert sorted(results) == [False, True], f"Expected [False, True], got {results}"
@pytest.mark.asyncio
async def test_concurrent_same_symbol_different_leverage_both_post(self, tmp_path):
"""Two calls with different leverages for same symbol → both POST, serialised."""
adapter, client = _make_adapter(tmp_path)
posted_leverages = []
async def recording_post(path, params):
await asyncio.sleep(0.005)
posted_leverages.append(params.get("leverage"))
return {"leverage": params.get("leverage")}
client.signed_post.side_effect = recording_post
await asyncio.gather(
adapter._ensure_leverage("TRX-USDT", 1),
adapter._ensure_leverage("TRX-USDT", 2),
)
assert len(posted_leverages) == 2, "Both different-leverage calls must POST"
assert set(posted_leverages) == {1, 2}
@pytest.mark.asyncio
async def test_independent_symbols_concurrent_no_interference(self, tmp_path):
"""Different symbols are fully independent — no cross-symbol blocking."""
adapter, client = _make_adapter(tmp_path)
call_order = []
async def recording_post(path, params):
call_order.append(params.get("symbol", ""))
return {"leverage": 1}
client.signed_post.side_effect = recording_post
await asyncio.gather(
adapter._ensure_leverage("TRX-USDT", 1),
adapter._ensure_leverage("XRP-USDT", 1),
adapter._ensure_leverage("BTC-USDT", 1),
)
assert len(call_order) == 3, "All three symbols should POST independently"
# ---------------------------------------------------------------------------
# 4. Persistence — JSON sidecar survives "restarts"
# ---------------------------------------------------------------------------
class TestLeverageCachePersistence:
@pytest.mark.asyncio
async def test_cache_persisted_on_set(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
await adapter._ensure_leverage("TRX-USDT", 2)
persisted = json.loads(adapter._leverage_cache_path.read_text())
assert persisted.get("TRX-USDT") == 2
@pytest.mark.asyncio
async def test_cache_restored_on_init(self, tmp_path):
"""Second adapter instance loads cache from file → skips POST for cached symbol."""
adapter1, client1 = _make_adapter(tmp_path)
await adapter1._ensure_leverage("TRX-USDT", 2)
assert client1.signed_post.call_count == 1
# Second adapter reads from same file
adapter2, client2 = _make_adapter(tmp_path)
assert adapter2._leverage_cache.get("TRX-USDT") == 2
await adapter2._ensure_leverage("TRX-USDT", 2)
assert client2.signed_post.call_count == 0, (
"After restart, cached leverage should not trigger another POST"
)
@pytest.mark.asyncio
async def test_corrupt_cache_file_handled_gracefully(self, tmp_path):
adapter, _ = _make_adapter(tmp_path)
adapter._leverage_cache_path.write_text("{not valid json}")
# Re-load should not crash; cache resets to empty
adapter._load_leverage_cache()
assert adapter._leverage_cache == {}
@pytest.mark.asyncio
async def test_invalid_leverage_values_filtered(self, tmp_path):
# Write the file first, then create adapter pointing at it
cache_file = tmp_path / ".bingx_leverage_cache_vst.json"
cache_file.write_text(
json.dumps({"TRX-USDT": 2, "XRP-USDT": -1, "BTC-USDT": "bad", "ETH-USDT": 0})
)
adapter, _ = _make_adapter(tmp_path) # _make_adapter calls _load_leverage_cache after path set
# Only valid (>= 1) entries survive
assert adapter._leverage_cache.get("TRX-USDT") == 2
assert "XRP-USDT" not in adapter._leverage_cache, "Negative leverage filtered"
assert "BTC-USDT" not in adapter._leverage_cache, "Non-numeric leverage filtered"
assert "ETH-USDT" not in adapter._leverage_cache, "Zero leverage filtered"
# ---------------------------------------------------------------------------
# 5. Connect-time drift detection
# ---------------------------------------------------------------------------
class TestLeverageDriftDetection:
@pytest.mark.asyncio
async def test_drift_detected_and_cache_corrected(self, tmp_path):
"""Exchange has lev=2 but cache says 1 → cache updated to exchange truth."""
adapter, client = _make_adapter(tmp_path)
adapter._leverage_cache["TRX-USDT"] = 1 # stale cache
# Exchange returns 2 (another runner changed it)
client.signed_get.return_value = {"leverage": 2, "longLeverage": 2}
await adapter._verify_leverage_drift()
assert adapter._leverage_cache.get("TRX-USDT") == 2, (
"Cache must be updated to exchange truth after drift detected"
)
@pytest.mark.asyncio
async def test_no_drift_no_update(self, tmp_path):
adapter, client = _make_adapter(tmp_path)
adapter._leverage_cache["TRX-USDT"] = 1
client.signed_get.return_value = {"leverage": 1, "longLeverage": 1}
await adapter._verify_leverage_drift()
# No change — cache is already correct
assert adapter._leverage_cache.get("TRX-USDT") == 1
@pytest.mark.asyncio
async def test_drift_check_failure_is_non_fatal(self, tmp_path):
"""If drift check itself fails (network error), adapter must not crash."""
from prod.bingx.http import BingxHttpError
adapter, client = _make_adapter(tmp_path)
adapter._leverage_cache["TRX-USDT"] = 1
client.signed_get.side_effect = BingxHttpError("rate limit")
await adapter._verify_leverage_drift() # must not raise
assert adapter._leverage_cache.get("TRX-USDT") == 1 # unchanged
@pytest.mark.asyncio
async def test_connect_calls_drift_verification(self, tmp_path):
"""connect() must call _verify_leverage_drift after refresh_state."""
adapter, client = _make_adapter(tmp_path)
adapter._leverage_cache["TRX-USDT"] = 99 # obviously wrong
from prod.clean_arch.ports.execution import ExchangeStateSnapshot
from datetime import datetime, timezone
snap = ExchangeStateSnapshot(
timestamp=datetime.now(timezone.utc),
capital=25000.0, equity=25000.0,
open_positions={}, open_orders=[],
all_orders=[], all_fills=[],
account={}, open_notional=0.0, source="test",
)
adapter.refresh_state = AsyncMock(return_value=snap)
client.signed_get.return_value = {"leverage": 1, "longLeverage": 1}
await adapter.connect()
client.signed_get.assert_called() # drift check happened
# ---------------------------------------------------------------------------
# 6. Multi-runner contract documentation test
# ---------------------------------------------------------------------------
class TestMultiRunnerContract:
@pytest.mark.asyncio
async def test_account_level_leverage_last_writer_wins(self, tmp_path):
"""
CRITICAL: BingX has ONE leverage setting per symbol per account.
Two runners requesting different leverages for the same symbol
CANNOT be safely arbitrated by the cache alone — the exchange
will reflect whichever runner's POST arrived last.
This test documents the known limitation: runner-A sets lev=1,
runner-B sets lev=2, runner-A's order may execute at lev=2.
Detection requires cross-process coordination (Zinc arbiter) which
is not yet implemented. For now, ensure leverage is uniform across
all runners for a shared account.
"""
# Simulate runner A
adapter_a, client_a = _make_adapter(tmp_path)
client_a.signed_post.return_value = {"leverage": 1}
await adapter_a._ensure_leverage("TRX-USDT", 1)
assert adapter_a._leverage_cache["TRX-USDT"] == 1
# Simulate runner B (different adapter, same account)
adapter_b, client_b = _make_adapter(tmp_path)
client_b.signed_post.return_value = {"leverage": 2}
await adapter_b._ensure_leverage("TRX-USDT", 2)
assert adapter_b._leverage_cache["TRX-USDT"] == 2
# Runner A's cache is now STALE — exchange has lev=2 from runner B
# Runner A believes lev=1 but the exchange has lev=2.
# This is the known multi-runner conflict with no current mitigation.
assert adapter_a._leverage_cache["TRX-USDT"] == 1, (
"Runner A's cache is stale after runner B changed leverage. "
"This is expected — document the known limitation. "
"Fix: cross-process Zinc leverage arbiter (future work)."
)
@pytest.mark.asyncio
async def test_single_runner_consistent_after_multiple_symbols(self, tmp_path):
"""Within one runner: leverage is always correct after successful POST."""
adapter, client = _make_adapter(tmp_path)
for sym, lev in [("TRX-USDT", 1), ("XRP-USDT", 2), ("BTC-USDT", 1)]:
await adapter._ensure_leverage(sym, lev)
assert adapter._leverage_cache[sym] == lev, f"Cache wrong for {sym}"

View File

@@ -455,21 +455,36 @@ class PinkClickHousePersistence:
partial = (not slot_closed) and cur_size > 0.0
# Extract the fill price from emitted venue events (G21 fix): the actual
# exchange fill price lives in the FULL_FILL/PARTIAL_FILL event, not in
# the slot dict. Pass it explicitly so _write_trade_event does not fall
# back to entry_price.
# Extract fill price AND friction fields from emitted venue events.
# These are first-class fields on VenueEvent (Gap 1/2/3).
fill_price_hint = 0.0
fill_fee = 0.0
fill_fee_source = ""
fill_is_maker = False
fill_slippage_bps = 0.0
fill_mark_at_submit = 0.0
fill_exchange_ts = 0
for ev in events:
p_val = getattr(ev, "price", 0.0)
if p_val and math.isfinite(float(p_val)) and float(p_val) > 0:
fill_price_hint = float(p_val)
break
if getattr(ev, "fee", 0.0):
fill_fee = float(ev.fee)
if getattr(ev, "fee_source", ""):
fill_fee_source = str(ev.fee_source)
if getattr(ev, "is_maker", False):
fill_is_maker = bool(ev.is_maker)
if getattr(ev, "slippage_bps", 0.0):
fill_slippage_bps = float(ev.slippage_bps)
if getattr(ev, "mark_at_submit", 0.0):
fill_mark_at_submit = float(ev.mark_at_submit)
if getattr(ev, "exchange_ts", 0):
fill_exchange_ts = int(ev.exchange_ts)
# One trade_exit_legs row per exit leg (partial or final), BLUE-schema
# compatible so PINK multi-exit trades reconcile against the same table.
self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome,
fill_price_hint=fill_price_hint)
fill_price_hint=fill_price_hint,
fill_fee=fill_fee, fill_fee_source=fill_fee_source,
fill_is_maker=fill_is_maker, fill_slippage_bps=fill_slippage_bps)
self._write_trade_reconstruction(
snapshot, intent.trade_id,
event_type="PARTIAL_EXIT" if partial else "EXIT",
@@ -483,11 +498,15 @@ class PinkClickHousePersistence:
},
market_state=market_state,
)
# Terminal trade event.
if slot_closed:
self._write_trade_event(snapshot, decision, intent, slot, outcome,
market_state=market_state,
exit_price_hint=fill_price_hint)
exit_price_hint=fill_price_hint,
fill_fee=fill_fee, fill_fee_source=fill_fee_source,
fill_is_maker=fill_is_maker,
fill_slippage_bps=fill_slippage_bps,
fill_mark_at_submit=fill_mark_at_submit,
fill_exchange_ts=fill_exchange_ts)
def persist_fill_events(
self,
@@ -601,6 +620,46 @@ class PinkClickHousePersistence:
market_state=market_state,
)
def persist_fee_settled(
self,
*,
trade_id: str,
fee: float,
fee_asset: str = "USDT",
is_maker: bool = False,
exchange_ts: int = 0,
realized_pnl_delta: float = 0.0,
ts: Optional[Any] = None,
) -> None:
"""Record the WS FILL_SETTLED fee arriving after the REST submit.
Gap 2: the REST ACK path writes fee_source="ESTIMATED_TAKER/MAKER".
When the WS ORDER_TRADE_UPDATE frame arrives with field "n" (actual
commission), call this method to log the settled truth.
The CH spool stores both the original estimated row AND this settled row.
Downstream queries can reconcile using:
SELECT trade_id, MAX(fee) FILTER(WHERE fee_source='WS_SETTLED') AS settled_fee,
MAX(fee) FILTER(WHERE fee_source LIKE 'ESTIMATED%') AS estimated_fee
FROM trade_events GROUP BY trade_id
This method writes to ``fee_settled_events`` (a lightweight supplementary
table, not trade_events) so the original row is never mutated.
"""
ts_val = ts or datetime.now(timezone.utc)
self._sink("fee_settled_events", {
"ts": ts_val.isoformat() if hasattr(ts_val, "isoformat") else str(ts_val),
"trade_id": trade_id,
"fee": float(fee),
"fee_asset": fee_asset,
"fee_source": "WS_SETTLED",
"is_maker": bool(is_maker),
"exchange_ts": int(exchange_ts),
"realized_pnl_delta": float(realized_pnl_delta),
"runtime_namespace": self.config.runtime_namespace,
"strategy": self.config.strategy,
})
def record_anomaly(
self,
*,
@@ -876,6 +935,10 @@ class PinkClickHousePersistence:
self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
fill_price_hint: float = 0.0,
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
) -> None:
"""Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg.
@@ -964,6 +1027,11 @@ class PinkClickHousePersistence:
"pnl_leg": pnl_leg,
"pnl_realized_total": cur_realized,
"bars_held": int(intent.bars_held or 0),
# Gap 1/2/3: per-leg friction
"fee_leg": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
})
# Advance the per-trade leg snapshot for the next leg's delta.
@@ -978,6 +1046,12 @@ class PinkClickHousePersistence:
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
*, market_state: Mapping[str, Any] | None = None,
exit_price_hint: float = 0.0,
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
fill_mark_at_submit: float = 0.0,
fill_exchange_ts: int = 0,
) -> None:
entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0)
quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0)
@@ -1050,7 +1124,19 @@ class PinkClickHousePersistence:
"entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}),
"exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}),
"execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}),
"friction_payload_json": _json_text({"fees": 0.0}),
# Gap 1/2/3: fee, maker/taker, slippage, exchange timing.
# fee_source provenance: "ESTIMATED_TAKER" | "ESTIMATED_MAKER" | "WS_SETTLED" | "REST_SETTLED"
"fee": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit,
"exchange_ts": fill_exchange_ts,
"friction_payload_json": _json_text({
"fee": fill_fee, "fee_source": fill_fee_source,
"is_maker": fill_is_maker, "slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit, "exchange_ts": fill_exchange_ts,
}),
"event_payload_json": _json_text({"phase": "terminal_close", "trade_id": intent.trade_id}),
"market_state_bundle_json": _json_text(market_state or {}),
"tp_base_pct": _safe_float(metadata.get("tp_base_pct", 0.0), 0.0),

View File

@@ -514,6 +514,22 @@ class PinkDirectRuntime:
"fee": event.fee, # negative = rebate
"is_maker": event.is_maker,
})
# Gap 2: log settled fee with WS_SETTLED provenance so
# downstream can reconcile against the ESTIMATED_TAKER row.
if self.persistence is not None:
try:
# trade_id: best-effort from the client_order_id field ("c")
# or order_id ("i") — WS may not carry our trade_id directly.
ws_trade_id = str(event.client_order_id or event.order_id or "")
self.persistence.persist_fee_settled(
trade_id=ws_trade_id,
fee=event.fee,
fee_asset=event.fee_asset or "USDT",
is_maker=event.is_maker,
exchange_ts=event.exchange_ts,
)
except Exception as _fee_exc:
self.logger.debug("persist_fee_settled failed: %s", _fee_exc)
# Persist full kernel state after every settled fill for
# crash recovery + session-to-session calibration continuity.
_persist_kernel_snapshot(self.kernel, self.logger)