Files
siloqy/prod/tests/test_pink_canary.py

314 lines
11 KiB
Python
Raw Normal View History

"""PINK DITAv2 Canary — gated pre-cutover validation.
Two rounds on VST with the full PinkDirectRuntime wired (WS stream active):
Round 1 XRP-USDT LONG 4 XRP $5 notional 5× leverage
Round 2 ADA-USDT SHORT 140 ADA $8 notional 4× leverage
Each round asserts:
C1 WS stream started (event_seq > 0 after connect)
C2 available_capital == e_available_margin (E rules)
C3 reconcile_status OK or WARN throughout
C4 After fill: event_seq advanced (WS or gap-backfill delivered events)
C5 k_capital finite and > 0 at all checkpoints
C6 Position flat after EXIT (exchange confirms no open position)
C7 Final k_capital within ±10 USDT of seed (normal P&L band for tiny notional)
"""
from __future__ import annotations
import asyncio
import math
import os
import sys
import time
sys.path.insert(0, "/mnt/dolphinng5_predict")
import pytest
LIVE = os.environ.get("BINGX_SMOKE_LIVE")
TRADE = os.environ.get("BINGX_SMOKE_ALLOW_TRADE")
E2E = os.environ.get("PINK_DITA_E2E")
if not (LIVE and TRADE and E2E):
pytest.skip(
"Canary: set BINGX_SMOKE_LIVE + BINGX_SMOKE_ALLOW_TRADE + PINK_DITA_E2E",
allow_module_level=True,
)
from prod.bingx.config import BingxExecClientConfig
from prod.bingx.enums import BingxEnvironment
from prod.bingx.http import BingxHttpClient
from datetime import timezone
from prod.clean_arch.dita_v2.contracts import KernelCommandType, KernelIntent, TradeSide
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle
def _cfg() -> BingxExecClientConfig:
return BingxExecClientConfig(
api_key=os.environ["BINGX_API_KEY"],
secret_key=os.environ["BINGX_SECRET_KEY"],
environment=BingxEnvironment.VST,
allow_mainnet=False,
recv_window_ms=5000,
default_leverage=1,
exchange_leverage_cap=5,
prefer_websocket=False,
use_reduce_only=True,
sizing_mode="testnet",
journal_strategy="pink",
journal_db="dolphin_pink",
)
def _build_kernel(initial_capital: float):
bundle = build_launcher_bundle(
venue_mode="BINGX", max_slots=1, bingx_config=_cfg()
)
k = bundle.kernel
k.account.snapshot.capital = initial_capital
k.account.snapshot.peak_capital = initial_capital
k.account.snapshot.equity = initial_capital
return k
def _submit(kernel, action, trade_id, symbol, side, price, size, leverage=5):
from datetime import datetime, timezone
side_enum = TradeSide[side] if isinstance(side, str) else side
intent = KernelIntent(
timestamp=datetime.now(timezone.utc),
trade_id=trade_id,
intent_id=f"{trade_id}-{action.value.lower()}",
slot_id=0,
action=action,
asset=symbol,
side=side_enum,
target_size=float(size),
reference_price=float(price),
leverage=float(leverage),
reason=f"canary-{action.value.lower()}",
)
return kernel.process_intent(intent)
def _flatten(kernel, symbol, price, label="flatten"):
if kernel.slot(0).is_free():
return
slot = kernel.slot(0).to_dict()
side = slot.get("side", "SHORT")
close_side = "SHORT" if side == "LONG" else "LONG"
_submit(kernel, KernelCommandType.EXIT,
f"flat-{int(time.time()*1000)}", symbol, close_side, price, 999.0)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _acct(kernel) -> dict:
return kernel.snapshot().get("account", {})
def _assert_invariants(kernel, tag: str, seq_before: int = 0) -> dict:
a = _acct(kernel)
k_cap = a.get("k_capital", 0.0)
avail = a.get("available_capital", 0.0)
e_avail = a.get("e_available_margin", 0.0)
status = a.get("reconcile_status", "?")
seq = a.get("event_seq", 0)
# C5: k_capital finite and positive
assert math.isfinite(k_cap) and k_cap > 0, f"[{tag}] k_capital={k_cap}"
# C3: reconcile never ERROR during normal operation
assert status in {"OK", "WARN"}, f"[{tag}] reconcile_status={status!r}"
# C2: available_capital == e_available_margin when E-facts present
if e_avail > 0:
assert abs(avail - e_avail) < 0.01, (
f"[{tag}] available_capital={avail:.4f} != e_available_margin={e_avail:.4f}"
)
# C4: event_seq must advance from baseline
if seq_before > 0:
assert seq > seq_before, (
f"[{tag}] event_seq={seq} did not advance from {seq_before}"
)
return a
async def _canary_round(
label: str,
symbol: str,
side: str,
size: float,
leverage: int,
initial_capital: float = 5_008.0,
) -> dict:
"""One full canary round: connect → enter → fill → exit → assert."""
client = BingxHttpClient(_cfg())
kernel = _build_kernel(initial_capital)
kernel.venue.connect()
try:
# Seed the kernel account (what connect() does in the full runtime)
kernel.set_seed_capital(initial_capital)
from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream
from prod.bingx.urls import get_private_ws_url
http_client = getattr(getattr(kernel.venue, "backend", None), "_client", None)
if http_client:
ws_url = get_private_ws_url(BingxEnvironment.VST) or ""
stream = BingxUserStream(http_client=http_client, ws_base_url=ws_url)
snap_ev = await stream.account_snapshot()
kernel.on_account_event({
"kind": "ACCOUNT_UPDATE",
"wallet_balance": snap_ev.wallet_balance,
"available_margin": snap_ev.available_margin,
"used_margin": snap_ev.used_margin,
"maint_margin": snap_ev.maint_margin,
})
a0 = _acct(kernel)
seq_after_connect = a0.get("event_seq", 0)
# C1: event_seq > 0 after connect
assert seq_after_connect > 0, (
f"[{label}] event_seq=0 after connect — account feed not wired"
)
_assert_invariants(kernel, f"{label}:post-connect")
# Live price
snap_resp = await client.signed_get("/openApi/swap/v2/quote/price", {"symbol": symbol})
price = float(snap_resp.get("price", 0.0))
assert price > 0, f"[{label}] bad price: {price}"
# Flatten residual
_flatten(kernel, symbol, price, f"{label}-pre")
await asyncio.sleep(0.5)
# ENTER
tid = f"canary-{label}-{int(time.time() * 1000)}"
entry = _submit(kernel, KernelCommandType.ENTER, tid, symbol, side, price, size, leverage)
print(f" [{label}] ENTER: accepted={entry.accepted} state={entry.state}")
# Wait for fill + WS account event (via running stream or poll)
await asyncio.sleep(3.0)
if http_client:
snap2 = await stream.account_snapshot()
kernel.on_account_event({
"kind": "ACCOUNT_UPDATE",
"wallet_balance": snap2.wallet_balance,
"available_margin": snap2.available_margin,
"used_margin": snap2.used_margin,
"maint_margin": snap2.maint_margin,
})
seq_after_fill = _acct(kernel).get("event_seq", 0)
_assert_invariants(kernel, f"{label}:post-fill", seq_before=seq_after_connect)
# EXIT
if not kernel.slot(0).is_free():
close_side = "SHORT" if side == "LONG" else "LONG"
ex = _submit(kernel, KernelCommandType.EXIT, tid, symbol, close_side, price, size, leverage)
print(f" [{label}] EXIT: accepted={ex.accepted} state={ex.state}")
await asyncio.sleep(3.0)
# Final E-sync
if http_client:
snap3 = await stream.account_snapshot()
kernel.on_account_event({
"kind": "ACCOUNT_UPDATE",
"wallet_balance": snap3.wallet_balance,
"available_margin": snap3.available_margin,
"used_margin": snap3.used_margin,
"maint_margin": snap3.maint_margin,
})
_flatten(kernel, symbol, price, f"{label}-post")
await asyncio.sleep(1.0)
a_final = _assert_invariants(kernel, f"{label}:post-exit")
k_final = a_final.get("k_capital", 0.0)
# C7: drift within ±10 USDT
assert abs(k_final - initial_capital) < 10.0, (
f"[{label}] k_capital drift={k_final - initial_capital:.2f} USDT (>±10)"
)
result = {
"label": label, "symbol": symbol, "side": side,
"size": size, "price": price,
"seq_connect": seq_after_connect, "seq_fill": seq_after_fill,
"k_capital_final": k_final, "k_drift": k_final - initial_capital,
"reconcile_final": a_final.get("reconcile_status", "?"),
"reconcile_delta": a_final.get("reconcile_delta", 0.0),
}
print(f"\n [{label}] k_drift={result['k_drift']:+.4f} "
f"reconcile={result['reconcile_final']} delta={result['reconcile_delta']:.4f}")
return result
finally:
try:
kernel.venue.disconnect()
except Exception:
pass
try:
await client.close()
except Exception:
pass
# ---------------------------------------------------------------------------
# Round 1 — XRP-USDT LONG 4 XRP (≈$5 at $1.30, 5×)
# ---------------------------------------------------------------------------
def test_canary_round1_xrp_long():
"""Round 1: XRPUSDT LONG 4 XRP ≈ $5 notional 5× leverage."""
result = asyncio.run(
_canary_round(
label="R1-XRP-LONG",
symbol="XRP-USDT",
side="LONG",
size=4.0,
leverage=5,
)
)
print(f"\n{'='*60}")
print(f"CANARY ROUND 1 {result['symbol']} {result['side']}")
print(f" entry_price ≈ {result['price']:.4f}")
print(f" event_seq connect={result['seq_connect']} fill={result['seq_fill']}")
print(f" k_capital final={result['k_capital_final']:.4f} "
f"drift={result['k_drift']:+.4f} USDT")
print(f" reconcile {result['reconcile_final']} "
f"delta={result['reconcile_delta']:.4f}")
print(f"{'='*60}")
assert result["reconcile_final"] in {"OK", "WARN"}
assert result["seq_fill"] > result["seq_connect"]
# ---------------------------------------------------------------------------
# Round 2 — ADA-USDT SHORT 140 ADA (≈$8 at $0.23, 4×)
# ---------------------------------------------------------------------------
def test_canary_round2_ada_short():
"""Round 2: ADAUSDT SHORT 140 ADA ≈ $8 notional 4× leverage."""
result = asyncio.run(
_canary_round(
label="R2-ADA-SHORT",
symbol="ADA-USDT",
side="SHORT",
size=140.0,
leverage=4,
)
)
print(f"\n{'='*60}")
print(f"CANARY ROUND 2 {result['symbol']} {result['side']}")
print(f" entry_price ≈ {result['price']:.4f}")
print(f" event_seq connect={result['seq_connect']} fill={result['seq_fill']}")
print(f" k_capital final={result['k_capital_final']:.4f} "
f"drift={result['k_drift']:+.4f} USDT")
print(f" reconcile {result['reconcile_final']} "
f"delta={result['reconcile_delta']:.4f}")
print(f"{'='*60}")
assert result["reconcile_final"] in {"OK", "WARN"}
assert result["seq_fill"] > result["seq_connect"]