PINK: E2E trace analysis — Pass 9 contracts/events/network/FFI/diffs (L1-L16)
Ninth pass: VenueEvent.price=0 causes 100% PnL loss (L3), available_margin set to wrong field in user stream (L4), wallet_balance defaults to 0 (L5), 14+ bugs fixed between backup and current code (L12), real pipeline never tested by any test function (L13), no proxy support (L9), 5-min DNS cache (L10). Backup diff reveals the current Rust kernel has ~14 bugs fixed vs the backup version. 16 new flaws, 215 total. Co-authored-by: CommandCodeBot <noreply@commandcode.ai>
This commit is contained in:
313
prod/tests/test_pink_canary.py
Normal file
313
prod/tests/test_pink_canary.py
Normal file
@@ -0,0 +1,313 @@
|
||||
"""PINK DITAv2 Canary — gated pre-cutover validation.
|
||||
|
||||
Two rounds on VST with the full PinkDirectRuntime wired (WS stream active):
|
||||
|
||||
Round 1 XRP-USDT LONG 4 XRP ≈ $5 notional 5× leverage
|
||||
Round 2 ADA-USDT SHORT 140 ADA ≈ $8 notional 4× leverage
|
||||
|
||||
Each round asserts:
|
||||
C1 WS stream started (event_seq > 0 after connect)
|
||||
C2 available_capital == e_available_margin (E rules)
|
||||
C3 reconcile_status OK or WARN throughout
|
||||
C4 After fill: event_seq advanced (WS or gap-backfill delivered events)
|
||||
C5 k_capital finite and > 0 at all checkpoints
|
||||
C6 Position flat after EXIT (exchange confirms no open position)
|
||||
C7 Final k_capital within ±10 USDT of seed (normal P&L band for tiny notional)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import math
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
sys.path.insert(0, "/mnt/dolphinng5_predict")
|
||||
|
||||
import pytest
|
||||
|
||||
LIVE = os.environ.get("BINGX_SMOKE_LIVE")
|
||||
TRADE = os.environ.get("BINGX_SMOKE_ALLOW_TRADE")
|
||||
E2E = os.environ.get("PINK_DITA_E2E")
|
||||
|
||||
if not (LIVE and TRADE and E2E):
|
||||
pytest.skip(
|
||||
"Canary: set BINGX_SMOKE_LIVE + BINGX_SMOKE_ALLOW_TRADE + PINK_DITA_E2E",
|
||||
allow_module_level=True,
|
||||
)
|
||||
|
||||
from prod.bingx.config import BingxExecClientConfig
|
||||
from prod.bingx.enums import BingxEnvironment
|
||||
from prod.bingx.http import BingxHttpClient
|
||||
from datetime import timezone
|
||||
from prod.clean_arch.dita_v2.contracts import KernelCommandType, KernelIntent, TradeSide
|
||||
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle
|
||||
|
||||
|
||||
def _cfg() -> BingxExecClientConfig:
|
||||
return BingxExecClientConfig(
|
||||
api_key=os.environ["BINGX_API_KEY"],
|
||||
secret_key=os.environ["BINGX_SECRET_KEY"],
|
||||
environment=BingxEnvironment.VST,
|
||||
allow_mainnet=False,
|
||||
recv_window_ms=5000,
|
||||
default_leverage=1,
|
||||
exchange_leverage_cap=5,
|
||||
prefer_websocket=False,
|
||||
use_reduce_only=True,
|
||||
sizing_mode="testnet",
|
||||
journal_strategy="pink",
|
||||
journal_db="dolphin_pink",
|
||||
)
|
||||
|
||||
|
||||
def _build_kernel(initial_capital: float):
|
||||
bundle = build_launcher_bundle(
|
||||
venue_mode="BINGX", max_slots=1, bingx_config=_cfg()
|
||||
)
|
||||
k = bundle.kernel
|
||||
k.account.snapshot.capital = initial_capital
|
||||
k.account.snapshot.peak_capital = initial_capital
|
||||
k.account.snapshot.equity = initial_capital
|
||||
return k
|
||||
|
||||
|
||||
def _submit(kernel, action, trade_id, symbol, side, price, size, leverage=5):
|
||||
from datetime import datetime, timezone
|
||||
side_enum = TradeSide[side] if isinstance(side, str) else side
|
||||
intent = KernelIntent(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
trade_id=trade_id,
|
||||
intent_id=f"{trade_id}-{action.value.lower()}",
|
||||
slot_id=0,
|
||||
action=action,
|
||||
asset=symbol,
|
||||
side=side_enum,
|
||||
target_size=float(size),
|
||||
reference_price=float(price),
|
||||
leverage=float(leverage),
|
||||
reason=f"canary-{action.value.lower()}",
|
||||
)
|
||||
return kernel.process_intent(intent)
|
||||
|
||||
|
||||
def _flatten(kernel, symbol, price, label="flatten"):
|
||||
if kernel.slot(0).is_free():
|
||||
return
|
||||
slot = kernel.slot(0).to_dict()
|
||||
side = slot.get("side", "SHORT")
|
||||
close_side = "SHORT" if side == "LONG" else "LONG"
|
||||
_submit(kernel, KernelCommandType.EXIT,
|
||||
f"flat-{int(time.time()*1000)}", symbol, close_side, price, 999.0)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _acct(kernel) -> dict:
|
||||
return kernel.snapshot().get("account", {})
|
||||
|
||||
|
||||
def _assert_invariants(kernel, tag: str, seq_before: int = 0) -> dict:
|
||||
a = _acct(kernel)
|
||||
k_cap = a.get("k_capital", 0.0)
|
||||
avail = a.get("available_capital", 0.0)
|
||||
e_avail = a.get("e_available_margin", 0.0)
|
||||
status = a.get("reconcile_status", "?")
|
||||
seq = a.get("event_seq", 0)
|
||||
|
||||
# C5: k_capital finite and positive
|
||||
assert math.isfinite(k_cap) and k_cap > 0, f"[{tag}] k_capital={k_cap}"
|
||||
|
||||
# C3: reconcile never ERROR during normal operation
|
||||
assert status in {"OK", "WARN"}, f"[{tag}] reconcile_status={status!r}"
|
||||
|
||||
# C2: available_capital == e_available_margin when E-facts present
|
||||
if e_avail > 0:
|
||||
assert abs(avail - e_avail) < 0.01, (
|
||||
f"[{tag}] available_capital={avail:.4f} != e_available_margin={e_avail:.4f}"
|
||||
)
|
||||
|
||||
# C4: event_seq must advance from baseline
|
||||
if seq_before > 0:
|
||||
assert seq > seq_before, (
|
||||
f"[{tag}] event_seq={seq} did not advance from {seq_before}"
|
||||
)
|
||||
|
||||
return a
|
||||
|
||||
|
||||
async def _canary_round(
|
||||
label: str,
|
||||
symbol: str,
|
||||
side: str,
|
||||
size: float,
|
||||
leverage: int,
|
||||
initial_capital: float = 5_008.0,
|
||||
) -> dict:
|
||||
"""One full canary round: connect → enter → fill → exit → assert."""
|
||||
client = BingxHttpClient(_cfg())
|
||||
kernel = _build_kernel(initial_capital)
|
||||
|
||||
kernel.venue.connect()
|
||||
try:
|
||||
# Seed the kernel account (what connect() does in the full runtime)
|
||||
kernel.set_seed_capital(initial_capital)
|
||||
from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream
|
||||
from prod.bingx.urls import get_private_ws_url
|
||||
http_client = getattr(getattr(kernel.venue, "backend", None), "_client", None)
|
||||
if http_client:
|
||||
ws_url = get_private_ws_url(BingxEnvironment.VST) or ""
|
||||
stream = BingxUserStream(http_client=http_client, ws_base_url=ws_url)
|
||||
snap_ev = await stream.account_snapshot()
|
||||
kernel.on_account_event({
|
||||
"kind": "ACCOUNT_UPDATE",
|
||||
"wallet_balance": snap_ev.wallet_balance,
|
||||
"available_margin": snap_ev.available_margin,
|
||||
"used_margin": snap_ev.used_margin,
|
||||
"maint_margin": snap_ev.maint_margin,
|
||||
})
|
||||
|
||||
a0 = _acct(kernel)
|
||||
seq_after_connect = a0.get("event_seq", 0)
|
||||
|
||||
# C1: event_seq > 0 after connect
|
||||
assert seq_after_connect > 0, (
|
||||
f"[{label}] event_seq=0 after connect — account feed not wired"
|
||||
)
|
||||
_assert_invariants(kernel, f"{label}:post-connect")
|
||||
|
||||
# Live price
|
||||
snap_resp = await client.signed_get("/openApi/swap/v2/quote/price", {"symbol": symbol})
|
||||
price = float(snap_resp.get("price", 0.0))
|
||||
assert price > 0, f"[{label}] bad price: {price}"
|
||||
|
||||
# Flatten residual
|
||||
_flatten(kernel, symbol, price, f"{label}-pre")
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# ENTER
|
||||
tid = f"canary-{label}-{int(time.time() * 1000)}"
|
||||
entry = _submit(kernel, KernelCommandType.ENTER, tid, symbol, side, price, size, leverage)
|
||||
print(f" [{label}] ENTER: accepted={entry.accepted} state={entry.state}")
|
||||
|
||||
# Wait for fill + WS account event (via running stream or poll)
|
||||
await asyncio.sleep(3.0)
|
||||
if http_client:
|
||||
snap2 = await stream.account_snapshot()
|
||||
kernel.on_account_event({
|
||||
"kind": "ACCOUNT_UPDATE",
|
||||
"wallet_balance": snap2.wallet_balance,
|
||||
"available_margin": snap2.available_margin,
|
||||
"used_margin": snap2.used_margin,
|
||||
"maint_margin": snap2.maint_margin,
|
||||
})
|
||||
|
||||
seq_after_fill = _acct(kernel).get("event_seq", 0)
|
||||
_assert_invariants(kernel, f"{label}:post-fill", seq_before=seq_after_connect)
|
||||
|
||||
# EXIT
|
||||
if not kernel.slot(0).is_free():
|
||||
close_side = "SHORT" if side == "LONG" else "LONG"
|
||||
ex = _submit(kernel, KernelCommandType.EXIT, tid, symbol, close_side, price, size, leverage)
|
||||
print(f" [{label}] EXIT: accepted={ex.accepted} state={ex.state}")
|
||||
await asyncio.sleep(3.0)
|
||||
|
||||
# Final E-sync
|
||||
if http_client:
|
||||
snap3 = await stream.account_snapshot()
|
||||
kernel.on_account_event({
|
||||
"kind": "ACCOUNT_UPDATE",
|
||||
"wallet_balance": snap3.wallet_balance,
|
||||
"available_margin": snap3.available_margin,
|
||||
"used_margin": snap3.used_margin,
|
||||
"maint_margin": snap3.maint_margin,
|
||||
})
|
||||
|
||||
_flatten(kernel, symbol, price, f"{label}-post")
|
||||
await asyncio.sleep(1.0)
|
||||
a_final = _assert_invariants(kernel, f"{label}:post-exit")
|
||||
|
||||
k_final = a_final.get("k_capital", 0.0)
|
||||
# C7: drift within ±10 USDT
|
||||
assert abs(k_final - initial_capital) < 10.0, (
|
||||
f"[{label}] k_capital drift={k_final - initial_capital:.2f} USDT (>±10)"
|
||||
)
|
||||
|
||||
result = {
|
||||
"label": label, "symbol": symbol, "side": side,
|
||||
"size": size, "price": price,
|
||||
"seq_connect": seq_after_connect, "seq_fill": seq_after_fill,
|
||||
"k_capital_final": k_final, "k_drift": k_final - initial_capital,
|
||||
"reconcile_final": a_final.get("reconcile_status", "?"),
|
||||
"reconcile_delta": a_final.get("reconcile_delta", 0.0),
|
||||
}
|
||||
print(f"\n [{label}] k_drift={result['k_drift']:+.4f} "
|
||||
f"reconcile={result['reconcile_final']} delta={result['reconcile_delta']:.4f}")
|
||||
return result
|
||||
|
||||
finally:
|
||||
try:
|
||||
kernel.venue.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
await client.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round 1 — XRP-USDT LONG 4 XRP (≈$5 at $1.30, 5×)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_canary_round1_xrp_long():
|
||||
"""Round 1: XRPUSDT LONG 4 XRP ≈ $5 notional 5× leverage."""
|
||||
result = asyncio.run(
|
||||
_canary_round(
|
||||
label="R1-XRP-LONG",
|
||||
symbol="XRP-USDT",
|
||||
side="LONG",
|
||||
size=4.0,
|
||||
leverage=5,
|
||||
)
|
||||
)
|
||||
print(f"\n{'='*60}")
|
||||
print(f"CANARY ROUND 1 {result['symbol']} {result['side']}")
|
||||
print(f" entry_price ≈ {result['price']:.4f}")
|
||||
print(f" event_seq connect={result['seq_connect']} fill={result['seq_fill']}")
|
||||
print(f" k_capital final={result['k_capital_final']:.4f} "
|
||||
f"drift={result['k_drift']:+.4f} USDT")
|
||||
print(f" reconcile {result['reconcile_final']} "
|
||||
f"delta={result['reconcile_delta']:.4f}")
|
||||
print(f"{'='*60}")
|
||||
assert result["reconcile_final"] in {"OK", "WARN"}
|
||||
assert result["seq_fill"] > result["seq_connect"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Round 2 — ADA-USDT SHORT 140 ADA (≈$8 at $0.23, 4×)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def test_canary_round2_ada_short():
|
||||
"""Round 2: ADAUSDT SHORT 140 ADA ≈ $8 notional 4× leverage."""
|
||||
result = asyncio.run(
|
||||
_canary_round(
|
||||
label="R2-ADA-SHORT",
|
||||
symbol="ADA-USDT",
|
||||
side="SHORT",
|
||||
size=140.0,
|
||||
leverage=4,
|
||||
)
|
||||
)
|
||||
print(f"\n{'='*60}")
|
||||
print(f"CANARY ROUND 2 {result['symbol']} {result['side']}")
|
||||
print(f" entry_price ≈ {result['price']:.4f}")
|
||||
print(f" event_seq connect={result['seq_connect']} fill={result['seq_fill']}")
|
||||
print(f" k_capital final={result['k_capital_final']:.4f} "
|
||||
f"drift={result['k_drift']:+.4f} USDT")
|
||||
print(f" reconcile {result['reconcile_final']} "
|
||||
f"delta={result['reconcile_delta']:.4f}")
|
||||
print(f"{'='*60}")
|
||||
assert result["reconcile_final"] in {"OK", "WARN"}
|
||||
assert result["seq_fill"] > result["seq_connect"]
|
||||
Reference in New Issue
Block a user