Files
siloqy/prod/tests/test_pink_capital_harness.py
Codex 4d15edcc54 PINK: fix multi-leg exit residual — carry kernel leg-index into legacy position
Root cause (harness multi_leg, ~14-TRX residual): pink_direct rebuilds the legacy
TradePosition from the kernel slot every step, but left exit_leg_index=0, so
IntentEngine.next_exit_ratio() consumed ratio[0] (0.5) on EVERY leg and never
advanced to the final leg's 1.0:
  leg1: 0.5×53 ≈ 26 closed -> 27 remain
  leg2: 0.5×27 ≈ 13 closed -> 14 RESIDUAL  (kernel believes flat, exchange isn't)

Fix: propagate the kernel slot's authoritative active_leg_index into the rebuilt
legacy position's exit_leg_index, so the intent engine consumes the correct leg
ratio. The final leg now closes the full remaining -> fully flattens.

Verified: offline 18 green (no regression); live VST harness multi_leg now closes
fully (XPASS) — residual gone, all 6 capital invariants hold. xfail mark removed;
capital-accounting battery is now fully green (7/7) on testnet.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-05-31 18:46:06 +02:00

421 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""PINK capital-accounting harness — automated scenario battery, live BingX VST.
Pre-cutover gate: drives the REAL PINK runtime (MarketSnapshot -> DecisionEngine ->
IntentEngine -> PinkDirectRuntime.step -> kernel -> BingX VST -> AccountProjection ->
PinkClickHousePersistence) through controlled scenarios via crafted snapshots, and
asserts capital-accounting correctness at every step. Controlled: flat account,
single scenarios sequentially, flatten-between, small (~$20) sizes, no autonomous loop.
Capital invariants asserted (the crux):
1. per-fill : Δcapital == realized PnL of that fill (kernel single authority)
2. end-of-run : kernel.capital == start + Σrealized (flat -> unrealized 0)
3. exchange : position flat + zero open orders (signed read)
4. persistence: trade_events.capital_before/after + account_events.capital match kernel
5. sizing : every order notional = size×price ≤ capital × max_leverage (never inf)
6. guards : suppressed/degenerate ENTERs place NO order; exits size from slot.size
Gates: BINGX_SMOKE_LIVE, BINGX_SMOKE_ALLOW_TRADE, PINK_DITA_E2E, PINK_CAPITAL_HARNESS.
Run on a FLAT account, from repo root, PYTHONPATH=/mnt/dolphinng5_predict.
"""
from __future__ import annotations
import asyncio
import os
from datetime import datetime, timezone
import pytest
# Hard opt-in: this battery runs against live BingX VST, so the whole module is
# skipped unless every gate variable is set to a non-empty value. The first
# missing gate (in the order listed) is the one named in the skip reason.
for _gate in ("BINGX_SMOKE_LIVE", "BINGX_SMOKE_ALLOW_TRADE", "PINK_DITA_E2E", "PINK_CAPITAL_HARNESS"):
    if not os.environ.get(_gate):
        # Module-level skip: raised before the project imports below are executed.
        pytest.skip(f"{_gate} not set", allow_module_level=True)
from prod.tests.test_pink_bingx_dita_live_e2e import ( # noqa: E402
_build_config, _pick_sym, _snap, _verify, _check_open_orders, _flatten, _contract_rows,
)
from prod.bingx.http import BingxHttpClient # noqa: E402
from prod.clean_arch.dita import ( # noqa: E402
DecisionAction, DecisionConfig, DecisionEngine, IntentEngine,
)
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle # noqa: E402
from prod.clean_arch.persistence import PinkClickHousePersistence # noqa: E402
from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime # noqa: E402
from prod.clean_arch.ports.data_feed import MarketSnapshot, DataFeedPort # noqa: E402
# Leverage cap handed to DecisionConfig; also the multiplier in the notional
# bound asserted after every entry (invariant 5).
_MAX_LEVERAGE = 3.0
_CAP_FRACTION = 2.5e-4  # ~ $20 notional on 25k seed -> clears exchange min, safe vs margin
# Seed capital used for the venue config, the kernel account snapshot default
# and runtime.connect(initial_capital=...).
_SEED_CAPITAL = 25_000.0
class _CaptureSink:
def __init__(self) -> None:
self.rows: list[tuple[str, dict]] = []
def __call__(self, table: str, row: dict) -> None:
self.rows.append((table, dict(row)))
def of(self, table: str) -> list[dict]:
return [r for t, r in self.rows if t == table]
def tables(self) -> list[str]:
return [t for t, _ in self.rows]
class _StubFeed(DataFeedPort):
    """Inert DataFeedPort: the harness hands snapshots to ``runtime.step`` directly,
    so every feed method is a no-op returning a fixed value."""

    async def connect(self) -> bool:
        """Report a successful connection without doing any I/O."""
        return True

    async def disconnect(self) -> None:
        """Nothing to tear down."""
        return None

    async def get_latest_snapshot(self, symbol):
        """The stub never holds a snapshot for any symbol."""
        return None

    async def subscribe_snapshots(self, callback) -> None:
        """Accept the subscription and never invoke *callback*."""
        return None

    async def get_acb_update(self):
        """No ACB updates are produced by the stub."""
        return None

    def get_latency_ms(self) -> float:
        """Constant zero latency."""
        return 0.0

    def health_check(self) -> bool:
        """The stub always reports healthy."""
        return True
def _config(exit_leg_ratios=(1.0,)) -> DecisionConfig:
    """Build the SHORT-only DecisionConfig used by every harness scenario.

    Only ``exit_leg_ratios`` varies per scenario; it is forwarded unchanged.
    Sizing uses the module-level ``_CAP_FRACTION`` and ``_MAX_LEVERAGE``.
    """
    settings = dict(
        policy_version="pink_capital_harness",
        exit_leg_ratios=exit_leg_ratios,
        # direction gates: short side only
        allow_short=True,
        allow_long=False,
        min_irp_alignment=0.0,
        # sizing
        max_leverage=_MAX_LEVERAGE,
        capital_fraction=_CAP_FRACTION,
        # signal thresholds / exit parameters
        max_hold_bars=250,
        fixed_tp_pct=0.0020,
        vel_div_extreme=-0.05,
        vel_div_threshold=-0.02,
    )
    return DecisionConfig(**settings)
def _build_runtime(sink: _CaptureSink, exit_leg_ratios=(1.0,), capital=_SEED_CAPITAL):
    """Assemble a PinkDirectRuntime over a single-slot BINGX kernel.

    The kernel account snapshot is seeded with *capital* (peak capital falls
    back to ``_SEED_CAPITAL`` when *capital* is NaN), persistence writes go to
    *sink*, and the data feed is the inert ``_StubFeed``.

    Returns:
        ``(runtime, kernel)``.
    """
    cfg = _config(exit_leg_ratios)
    venue_cfg = _build_config(_SEED_CAPITAL)
    kernel = build_launcher_bundle(venue_mode="BINGX", max_slots=1, bingx_config=venue_cfg).kernel

    # Seed the account. ``capital == capital`` is False only for NaN.
    kernel.account.snapshot.capital = capital
    peak = capital
    if not capital == capital:
        peak = _SEED_CAPITAL
    kernel.account.snapshot.peak_capital = peak
    kernel.account.snapshot.equity = capital

    store = PinkClickHousePersistence(kernel.account, sink=sink, v7_sink=sink)
    feed = _StubFeed()
    decisions = DecisionEngine(cfg)
    intents = IntentEngine(cfg)
    runtime = PinkDirectRuntime(
        data_feed=feed,
        kernel=kernel,
        decision_engine=decisions,
        intent_engine=intents,
        persistence=store,
        market_state_runtime=None,
    )
    return runtime, kernel
def _snap_signal(symbol: str, price: float, vel_div: float) -> MarketSnapshot:
    """Craft a synthetic MarketSnapshot for *symbol* at *price*.

    Bid/ask are placed 0.1% either side of *price*; *vel_div* is passed as the
    snapshot's velocity divergence. The timestamp is the current UTC time.
    """
    stamped_at = datetime.now(timezone.utc)
    synthetic_bid = price * 0.999
    synthetic_ask = price * 1.001
    return MarketSnapshot(
        source="harness",
        scan_number=1,
        irp_alignment=0.5,
        velocity_divergence=vel_div,
        eigenvalues=[1.0],
        ask=synthetic_ask,
        bid=synthetic_bid,
        price=price,
        symbol=symbol,
        timestamp=stamped_at,
    )
async def _await(kernel, predicate, *, timeout_s: float = 12.0, step_s: float = 0.5) -> bool:
waited = 0.0
while waited < timeout_s:
if predicate(kernel.slot(0)):
return True
await asyncio.sleep(step_s)
waited += step_s
return predicate(kernel.slot(0))
def _capital(kernel) -> float:
return float(kernel.account.snapshot.capital or 0.0)
def _realized(kernel) -> float:
return sum(float(kernel.slot(i).realized_pnl or 0.0) for i in range(kernel.max_slots))
async def _full_flatten(client, vsym):
    """Best-effort cancel of all open orders on *vsym*.

    Despite the name, this helper only cancels resting orders (and only when
    ``_check_open_orders`` reports any); it does not close positions.

    The call is deliberately best-effort: any failure is logged as a warning
    and swallowed so that cleanup can never mask the scenario's own result.

    Args:
        client: BingX HTTP client exposing ``_request_json``.
        vsym: venue symbol whose open orders should be cancelled.
    """
    import logging  # function-scope: the module has no top-level logging import

    try:
        open_orders = await _check_open_orders(client, vsym)
        if open_orders:
            await client._request_json(
                "DELETE", "/openApi/swap/v2/trade/allOpenOrders", {"symbol": vsym}, signed=True,
            )
    except Exception:
        # Still non-fatal, but no longer silent: leave a trace for the operator.
        logging.getLogger(__name__).warning(
            "open-order cleanup failed for %s", vsym, exc_info=True,
        )
def _pf(row, *keys) -> float:
for k in keys:
try:
v = float(row.get(k) or 0.0)
except Exception:
continue
if v != 0.0:
return v
return 0.0
async def _ensure_account_flat(client) -> None:
    """Reliable exchange-truth close-all via the proven kernel EXIT path (mirrors
    the standalone flatten tool): build a throwaway BingX kernel, reconcile each
    open position into the slot and EXIT it (reduce-only MARKET). Handles
    multi-symbol residuals so every scenario starts from a verified-flat account
    and a residual-leaving scenario (e.g. the known multi_leg one) cannot cascade
    into the rest of the battery.

    Best-effort: exceptions from the EXIT intent and from the order-cancel
    requests are swallowed, and nothing here checks that the account actually
    ended up flat. *client* is used only for the open-order cancellation; the
    positions are read and closed through the throwaway kernel's venue.
    """
    # Local import: these contracts are needed only by this helper.
    from prod.clean_arch.dita_v2.contracts import (
        TradeSlot, TradeSide, TradeStage, KernelIntent, KernelCommandType,
    )
    bundle = build_launcher_bundle(venue_mode="BINGX", max_slots=1, bingx_config=_build_config(_SEED_CAPITAL))
    k = bundle.kernel
    k.venue.connect()
    # Candidate field names for the position quantity; _pf takes the first usable one.
    qty_keys = ("positionAmt", "positionQty", "positionSize", "quantity", "pa", "qty")
    # Keep only rows that carry a non-negligible quantity.
    positions = [p for p in k.venue.open_positions() if abs(_pf(p, *qty_keys)) > 1e-12]
    for p in positions:
        amt = _pf(p, *qty_keys)
        qty = abs(amt)
        raw_side = str(p.get("positionSide") or p.get("side") or "").upper()
        # SHORT when the side label says so or the signed amount is negative; LONG otherwise.
        side = TradeSide.SHORT if raw_side in {"SHORT", "SELL"} or amt < 0 else TradeSide.LONG
        entry = _pf(p, "entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price")
        # Fall back to the entry price when no mark price is present, and to 1.0 for leverage.
        mark = _pf(p, "markPrice", "mark", "price") or entry
        lev = _pf(p, "leverage", "lev") or 1.0
        # Normalise the symbol to the dash-less upper-case form (e.g. "TRX-USDT" -> "TRXUSDT").
        asset = str(p.get("symbol") or p.get("symbolName") or "").replace("-", "").upper()
        if qty <= 0 or not asset:
            continue
        # Load the exchange position into slot 0 as an already-open trade ...
        k.reconcile_from_slots([TradeSlot(
            slot_id=0, trade_id=asset, asset=asset, side=side, entry_price=entry or mark,
            size=qty, initial_size=qty, leverage=lev, entry_time=datetime.now(timezone.utc),
            fsm_state=TradeStage.POSITION_OPEN, metadata={"flatten": True},
        )])
        # ... then ask the kernel to EXIT the full size in a single leg.
        try:
            k.process_intent(KernelIntent(
                timestamp=datetime.now(timezone.utc), intent_id=f"flat-{asset}", trade_id=asset,
                slot_id=0, asset=asset, side=side, action=KernelCommandType.EXIT,
                reference_price=mark, target_size=qty, leverage=lev, exit_leg_ratios=(1.0,),
                reason="FLATTEN", metadata={},
            ))
        except Exception:
            # NOTE(review): failures are silently ignored; a position that could not
            # be closed is only noticed by a later flatness assertion.
            pass
    # Cancel open orders for every symbol returned by _contract_rows, one
    # request per distinct symbol; each request is individually best-effort.
    try:
        rows = await _contract_rows(client)
        for s in {str(r.get("symbol") or "") for r in rows if isinstance(r, dict)}:
            if s:
                try:
                    await client._request_json("DELETE", "/openApi/swap/v2/trade/allOpenOrders", {"symbol": s}, signed=True)
                except Exception:
                    pass
    except Exception:
        pass
    # Short pause before the caller continues — presumably to let the exchange settle.
    await asyncio.sleep(0.8)
# --------------------------------------------------------------------------
# scenario primitives
# --------------------------------------------------------------------------
async def _open(runtime, kernel, symbol: str, price: float) -> float:
    """Drive one ENTER through the runtime and wait for slot 0 to hold a position.

    Asserts that the decision is ENTER, that the position opens, that capital
    did not move on entry, and that the opened notional respects
    ``capital * _MAX_LEVERAGE`` (invariant 5).

    Returns:
        The slot's entry price, or *price* when the slot reports none.
    """
    capital_at_start = _capital(kernel)
    entry_signal = _snap_signal(symbol, price, vel_div=-0.05)
    dec = await runtime.step(entry_signal)
    assert dec.action == DecisionAction.ENTER, f"expected ENTER, got {dec.action}/{dec.reason}"

    def _holds_position(s):
        return s.is_open() and s.size > 0

    opened = await _await(kernel, _holds_position)
    assert opened, (
        f"position never opened (state={kernel.slot(0).fsm_state}, size={kernel.slot(0).size})"
    )

    capital_drift = abs(_capital(kernel) - capital_at_start)
    assert capital_drift < 1e-6, "entry must not realize PnL / move capital"

    open_slot = kernel.slot(0)
    entry = float(open_slot.entry_price or price)
    # invariant 5: notional bound (margin-self-limiting)
    notional = float(open_slot.size) * entry
    assert notional <= _capital(kernel) * _MAX_LEVERAGE + 1e-6, (
        f"notional {notional} exceeds margin bound {_capital(kernel) * _MAX_LEVERAGE}"
    )
    return entry
async def _exit_leg(runtime, kernel, symbol: str, entry_price: float) -> bool:
    """Drive one EXIT leg through the runtime and check per-fill accounting.

    A snapshot priced 1% below *entry_price* is stepped through the runtime and
    the decision must be EXIT. The helper then waits until slot 0 shows
    progress (closed, realized PnL changed, or size reduced) and asserts
    invariant 1: the capital delta equals the realized PnL of this leg.

    The leg's realized PnL is appended to ``runtime._realized_legs`` so the
    caller can reconcile cumulative realized PnL across cycles.

    Returns:
        ``kernel.slot(0).closed`` after the leg.

    Raises:
        AssertionError: wrong decision, no observable progress before the wait
            budget ran out, or a per-fill capital mismatch.
    """
    cap_before = _capital(kernel)
    rp_before = _realized(kernel)
    size_before = float(kernel.slot(0).size or 0.0)
    dec = await runtime.step(_snap_signal(symbol, entry_price * 0.99, vel_div=0.0))
    assert dec.action == DecisionAction.EXIT, f"expected EXIT, got {dec.action}/{dec.reason}"
    progressed = await _await(
        kernel,
        lambda s: s.closed or float(s.realized_pnl or 0.0) != rp_before or float(s.size or 0.0) < size_before - 1e-12,
    )
    # Without this check a leg that never filled yields Δcap == 0 and
    # realized == 0, so invariant 1 would pass vacuously and the returned False
    # would look like a legitimate partial close to the multi-leg scenario.
    assert progressed, (
        f"exit leg made no observable progress (state={kernel.slot(0).fsm_state}, "
        f"size={kernel.slot(0).size}, size_before={size_before})"
    )
    leg_realized = _realized(kernel) - rp_before
    # invariant 1: per-fill Δcapital == realized PnL of that fill
    assert abs((_capital(kernel) - cap_before) - leg_realized) < 1e-6, (
        f"per-fill mismatch: Δcap={_capital(kernel) - cap_before} realized_leg={leg_realized}"
    )
    # Accumulate the cumulative realized across the whole scenario. slot.realized_pnl
    # resets on each ENTER (Flaw 13), so multi-cycle reconciliation must sum the
    # per-fill deltas, not read the slot's current realized.
    runtime.__dict__.setdefault("_realized_legs", []).append(leg_realized)
    return kernel.slot(0).closed
def _assert_end_invariants(kernel, start_cap: float, total_realized: float, sink: _CaptureSink):
    """Assert end-of-scenario capital reconciliation and persistence parity.

    Invariant 2: kernel capital moved from *start_cap* by exactly
    *total_realized*. Invariant 4: the last captured ``trade_events`` and
    ``account_events`` rows agree with kernel capital, and the captured
    ``trade_exit_legs`` PnL sums to *total_realized*. Each persistence check
    runs only when the sink captured rows for that table.
    """
    cap = _capital(kernel)
    realized = total_realized  # cumulative across cycles (slot.realized resets on ENTER)

    # invariant 2: capital moved EXACTLY by the sum of per-fill realized PnL — no
    # phantom capital movement (entries don't move capital; each exit moves by its
    # realized). Flat at end -> no unrealized component.
    assert abs((cap - start_cap) - realized) < 1e-6, (
        f"end reconciliation: Δcap={cap - start_cap} Σrealized={realized} (cap={cap} start={start_cap})"
    )

    # invariant 4: persistence parity
    trade_rows = sink.of("trade_events")
    if trade_rows:
        last_trade = trade_rows[-1]
        assert abs(float(last_trade["capital_after"]) - cap) < 1e-6, "trade_events.capital_after != kernel capital"
        assert abs(
            float(last_trade["capital_after"]) - float(last_trade["capital_before"]) - float(last_trade["pnl"])
        ) < 1e-6, (
            "trade_events: capital_after - capital_before != pnl"
        )

    account_rows = sink.of("account_events")
    if account_rows:
        last_account = account_rows[-1]
        assert abs(float(last_account["capital"]) - cap) < 1e-6, "account_events.capital != kernel capital"

    leg_rows = sink.of("trade_exit_legs")
    if leg_rows:
        leg_sum = sum([float(leg_row["pnl_leg"]) for leg_row in leg_rows])
        assert abs(leg_sum - realized) < 1e-6, f"Σ trade_exit_legs.pnl_leg {leg_sum} != realized {realized}"
# --------------------------------------------------------------------------
# trading scenarios (SHORT path = PINK policy)
# --------------------------------------------------------------------------
async def _sc_round_trip(runtime, kernel, symbol, price):
    """Scenario: one entry followed by a single exit leg that must close the position."""
    entry_px = await _open(runtime, kernel, symbol, price)
    fully_closed = await _exit_leg(runtime, kernel, symbol, entry_px)
    assert fully_closed, "single-leg exit did not close the position"
async def _sc_multi_leg(runtime, kernel, symbol, price):
    """Scenario: one entry closed in two exit legs (partial first, remainder second)."""
    entry_px = await _open(runtime, kernel, symbol, price)

    # leg 1 (0.5): must leave part of the position open
    after_first = await _exit_leg(runtime, kernel, symbol, entry_px)
    assert not after_first, "first multi-leg exit should not fully close"

    # leg 2 (remainder): must flatten
    after_second = await _exit_leg(runtime, kernel, symbol, entry_px)
    assert after_second, "final multi-leg exit must close"
async def _sc_sequential(runtime, kernel, symbol, price):
    """Scenario: two back-to-back open/close cycles, pausing one second after each."""
    for _cycle in range(2):
        entry_px = await _open(runtime, kernel, symbol, price)
        cycle_closed = await _exit_leg(runtime, kernel, symbol, entry_px)
        assert cycle_closed, "sequential cycle did not close"
        await asyncio.sleep(1.0)
async def _sc_exit_then_reentry(runtime, kernel, symbol, price):
    """Scenario: close a position, pause one second, then re-enter and close again."""
    first_entry = await _open(runtime, kernel, symbol, price)
    first_closed = await _exit_leg(runtime, kernel, symbol, first_entry)
    assert first_closed, "first close failed"

    await asyncio.sleep(1.0)

    second_entry = await _open(runtime, kernel, symbol, price)
    second_closed = await _exit_leg(runtime, kernel, symbol, second_entry)
    assert second_closed, "re-entry close failed"
# Scenario registry: name -> (exit_leg_ratios for the runtime config, scenario coroutine).
# The names become the parametrize ids of test_pink_capital, in insertion order.
_TRADING_SCENARIOS = {
    "round_trip": ((1.0,), _sc_round_trip),
    # two legs: half first, then the remainder
    "multi_leg": ((0.5, 1.0), _sc_multi_leg),
    "sequential": ((1.0,), _sc_sequential),
    "exit_then_reentry": ((1.0,), _sc_exit_then_reentry),
}
@pytest.mark.parametrize("name", list(_TRADING_SCENARIOS))
def test_pink_capital(name):
    """Run one trading scenario end to end and assert the capital invariants.

    Sequence: build the runtime with the scenario's exit-leg ratios, pre-clean
    the account, connect, flatten anything reconciled into slot 0, run the
    scenario, reconcile capital/persistence, then flatten again and verify the
    exchange position is flat.
    """
    ratios, scenario = _TRADING_SCENARIOS[name]
    async def _run():
        sink = _CaptureSink()
        runtime, kernel = _build_runtime(sink, exit_leg_ratios=ratios)
        client = BingxHttpClient(_build_config())
        sym = await _pick_sym(kernel, client)
        snap, vsym = await _snap(client, sym)
        # Reference price for the whole scenario: taken once, before any order.
        price = float(snap.price)
        await _ensure_account_flat(client) # best-effort exchange-truth pre-clean
        await runtime.connect(initial_capital=_SEED_CAPITAL)
        try:
            # connect reconciled any leftover position into the slot; close it via
            # the proven kernel path (reliable for the single-symbol residual case).
            for _ in range(4):
                if kernel.slot(0).is_free():
                    break
                _flatten(kernel, kernel.slot(0).asset or sym, price, "harness-pre")
                await _await(kernel, lambda s: s.is_free(), timeout_s=8)
            await _full_flatten(client, vsym)
            assert kernel.slot(0).is_free(), (
                f"slot not free after pre-flatten (state={kernel.slot(0).fsm_state})"
            )
            # Reset the per-leg accumulator that _exit_leg appends to.
            runtime.__dict__["_realized_legs"] = []
            # Baseline is read after the pre-flatten so its PnL is excluded.
            start_cap = _capital(kernel)
            await scenario(runtime, kernel, sym, price)
            total_realized = sum(runtime.__dict__.get("_realized_legs", []))
            _assert_end_invariants(kernel, start_cap, total_realized, sink)
        finally:
            # Always leave the account clean, even when the scenario failed.
            if not kernel.slot(0).is_free():
                _flatten(kernel, sym, price, "harness-post")
                await asyncio.sleep(1.0)
            await _full_flatten(client, vsym)
        # invariant 3: exchange flat + no dangling orders
        # NOTE(review): only positions_flat is asserted here; open orders are
        # cancelled by _full_flatten above but not re-checked.
        vr = await _verify(client, vsym)
        assert vr.positions_flat, f"exchange not flat: {vr.error}"
    asyncio.run(_run())
# --------------------------------------------------------------------------
# guard scenarios (invariant 6) — no live order expected
# --------------------------------------------------------------------------
def test_guard_suppressed_nonfinite_capital():
    """Guard: with infinite kernel capital the policy may decide ENTER, but no
    position may be opened and the exchange account must stay flat."""

    async def _scenario():
        capture = _CaptureSink()
        runtime, kernel = _build_runtime(capture, capital=float("inf"))
        http = BingxHttpClient(_build_config())
        symbol = await _pick_sym(kernel, http)
        live_snap, vsym = await _snap(http, symbol)
        await _ensure_account_flat(http)
        await runtime.connect(initial_capital=_SEED_CAPITAL)

        # re-poison after connect seed
        kernel.account.snapshot.capital = float("inf")

        entry_signal = _snap_signal(symbol, float(live_snap.price), vel_div=-0.05)
        dec = await runtime.step(entry_signal)
        # policy decided enter...
        assert dec.action == DecisionAction.ENTER
        # ...but nothing may have been placed
        assert kernel.slot(0).is_free(), "ENTER must be suppressed on non-finite capital"

        vr = await _verify(http, vsym)
        assert vr.positions_flat, f"account must be untouched: {vr.error}"

    asyncio.run(_scenario())
def test_guard_suppressed_subfloor_price():
    """Guard: an entry signal priced at 1e-12 must not open a position, and the
    exchange account must stay flat."""

    async def _scenario():
        capture = _CaptureSink()
        runtime, kernel = _build_runtime(capture)
        http = BingxHttpClient(_build_config())
        symbol = await _pick_sym(kernel, http)
        _, vsym = await _snap(http, symbol)
        await _ensure_account_flat(http)
        await runtime.connect(initial_capital=_SEED_CAPITAL)

        subfloor_signal = _snap_signal(symbol, 1e-12, vel_div=-0.05)
        await runtime.step(subfloor_signal)
        assert kernel.slot(0).is_free(), "ENTER must be suppressed on sub-floor price"

        vr = await _verify(http, vsym)
        assert vr.positions_flat, f"account must be untouched: {vr.error}"

    asyncio.run(_scenario())
def test_guard_degenerate_snapshot_holds():
    """Guard: a snapshot with NaN velocity divergence must not produce ENTER;
    slot 0 stays free and the exchange account stays flat."""

    async def _scenario():
        capture = _CaptureSink()
        runtime, kernel = _build_runtime(capture)
        http = BingxHttpClient(_build_config())
        symbol = await _pick_sym(kernel, http)
        live_snap, vsym = await _snap(http, symbol)
        await _ensure_account_flat(http)
        await runtime.connect(initial_capital=_SEED_CAPITAL)

        # Degenerate feed (mimics the stddev-NaN data lead): non-finite vel_div.
        degenerate = _snap_signal(symbol, float(live_snap.price), vel_div=float("nan"))
        dec = await runtime.step(degenerate)
        assert dec.action != DecisionAction.ENTER, f"degenerate snapshot must not ENTER (got {dec.reason})"
        assert kernel.slot(0).is_free()

        vr = await _verify(http, vsym)
        assert vr.positions_flat, f"account must be untouched: {vr.error}"

    asyncio.run(_scenario())