diff --git a/prod/tests/test_pink_capital_harness.py b/prod/tests/test_pink_capital_harness.py new file mode 100644 index 0000000..44d9ad2 --- /dev/null +++ b/prod/tests/test_pink_capital_harness.py @@ -0,0 +1,429 @@ +#!/usr/bin/env python3 +"""PINK capital-accounting harness — automated scenario battery, live BingX VST. + +Pre-cutover gate: drives the REAL PINK runtime (MarketSnapshot -> DecisionEngine -> +IntentEngine -> PinkDirectRuntime.step -> kernel -> BingX VST -> AccountProjection -> +PinkClickHousePersistence) through controlled scenarios via crafted snapshots, and +asserts capital-accounting correctness at every step. Controlled: flat account, +single scenarios sequentially, flatten-between, small (~$20) sizes, no autonomous loop. + +Capital invariants asserted (the crux): + 1. per-fill : Δcapital == realized PnL of that fill (kernel single authority) + 2. end-of-run : kernel.capital == start + Σrealized (flat -> unrealized 0) + 3. exchange : position flat + zero open orders (signed read) + 4. persistence: trade_events.capital_before/after + account_events.capital match kernel + 5. sizing : every order notional = size×price ≤ capital × max_leverage (never inf) + 6. guards : suppressed/degenerate ENTERs place NO order; exits size from slot.size + +Gates: BINGX_SMOKE_LIVE, BINGX_SMOKE_ALLOW_TRADE, PINK_DITA_E2E, PINK_CAPITAL_HARNESS. +Run on a FLAT account, from repo root, PYTHONPATH=/mnt/dolphinng5_predict. +""" + +from __future__ import annotations + +import asyncio +import os +from datetime import datetime, timezone + +import pytest + +for _gate in ("BINGX_SMOKE_LIVE", "BINGX_SMOKE_ALLOW_TRADE", "PINK_DITA_E2E", "PINK_CAPITAL_HARNESS"): + if not os.environ.get(_gate): + pytest.skip(f"{_gate} not set", allow_module_level=True) + +from prod.tests.test_pink_bingx_dita_live_e2e import ( # noqa: E402 + _build_config, _pick_sym, _snap, _verify, _check_open_orders, _flatten, _contract_rows, +) +from prod.bingx.http import BingxHttpClient # noqa: E402 +from prod.clean_arch.dita import ( # noqa: E402 + DecisionAction, DecisionConfig, DecisionEngine, IntentEngine, +) +from prod.clean_arch.dita_v2.launcher import build_launcher_bundle # noqa: E402 +from prod.clean_arch.persistence import PinkClickHousePersistence # noqa: E402 +from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime # noqa: E402 +from prod.clean_arch.ports.data_feed import MarketSnapshot, DataFeedPort # noqa: E402 + +_MAX_LEVERAGE = 3.0 +_CAP_FRACTION = 2.5e-4 # ~ $20 notional on 25k seed -> clears exchange min, safe vs margin +_SEED_CAPITAL = 25_000.0 + + +class _CaptureSink: + def __init__(self) -> None: + self.rows: list[tuple[str, dict]] = [] + + def __call__(self, table: str, row: dict) -> None: + self.rows.append((table, dict(row))) + + def of(self, table: str) -> list[dict]: + return [r for t, r in self.rows if t == table] + + def tables(self) -> list[str]: + return [t for t, _ in self.rows] + + +class _StubFeed(DataFeedPort): + async def connect(self) -> bool: + return True + + async def disconnect(self) -> None: + pass + + async def get_latest_snapshot(self, symbol): + return None + + async def subscribe_snapshots(self, callback) -> None: + pass + + async def get_acb_update(self): + return None + + def get_latency_ms(self) -> float: + return 0.0 + + def health_check(self) -> bool: + return True + + +def _config(exit_leg_ratios=(1.0,)) -> DecisionConfig: + return DecisionConfig( + vel_div_threshold=-0.02, vel_div_extreme=-0.05, fixed_tp_pct=0.0020, + max_hold_bars=250, capital_fraction=_CAP_FRACTION, max_leverage=_MAX_LEVERAGE, + min_irp_alignment=0.0, allow_long=False, allow_short=True, + exit_leg_ratios=exit_leg_ratios, policy_version="pink_capital_harness", + ) + + +def _build_runtime(sink: _CaptureSink, exit_leg_ratios=(1.0,), capital=_SEED_CAPITAL): + cfg = _config(exit_leg_ratios) + bundle = build_launcher_bundle(venue_mode="BINGX", max_slots=1, bingx_config=_build_config(_SEED_CAPITAL)) + k = bundle.kernel + k.account.snapshot.capital = capital + k.account.snapshot.peak_capital = capital if capital == capital else _SEED_CAPITAL + k.account.snapshot.equity = capital + persistence = PinkClickHousePersistence(k.account, sink=sink, v7_sink=sink) + runtime = PinkDirectRuntime( + data_feed=_StubFeed(), kernel=k, + decision_engine=DecisionEngine(cfg), intent_engine=IntentEngine(cfg), + persistence=persistence, market_state_runtime=None, + ) + return runtime, k + + +def _snap_signal(symbol: str, price: float, vel_div: float) -> MarketSnapshot: + return MarketSnapshot( + timestamp=datetime.now(timezone.utc), symbol=symbol, price=price, + bid=price * 0.999, ask=price * 1.001, eigenvalues=[1.0], + velocity_divergence=vel_div, irp_alignment=0.5, scan_number=1, source="harness", + ) + + +async def _await(kernel, predicate, *, timeout_s: float = 12.0, step_s: float = 0.5) -> bool: + waited = 0.0 + while waited < timeout_s: + if predicate(kernel.slot(0)): + return True + await asyncio.sleep(step_s) + waited += step_s + return predicate(kernel.slot(0)) + + +def _capital(kernel) -> float: + return float(kernel.account.snapshot.capital or 0.0) + + +def _realized(kernel) -> float: + return sum(float(kernel.slot(i).realized_pnl or 0.0) for i in range(kernel.max_slots)) + + +async def _full_flatten(client, vsym): + try: + oos = await _check_open_orders(client, vsym) + if oos: + await client._request_json("DELETE", "/openApi/swap/v2/trade/allOpenOrders", {"symbol": vsym}, signed=True) + except Exception: + pass + + +def _pf(row, *keys) -> float: + for k in keys: + try: + v = float(row.get(k) or 0.0) + except Exception: + continue + if v != 0.0: + return v + return 0.0 + + +async def _ensure_account_flat(client) -> None: + """Reliable exchange-truth close-all via the proven kernel EXIT path (mirrors + the standalone flatten tool): build a throwaway BingX kernel, reconcile each + open position into the slot and EXIT it (reduce-only MARKET). Handles + multi-symbol residuals so every scenario starts from a verified-flat account + and a residual-leaving scenario (e.g. the known multi_leg one) cannot cascade + into the rest of the battery.""" + from prod.clean_arch.dita_v2.contracts import ( + TradeSlot, TradeSide, TradeStage, KernelIntent, KernelCommandType, + ) + bundle = build_launcher_bundle(venue_mode="BINGX", max_slots=1, bingx_config=_build_config(_SEED_CAPITAL)) + k = bundle.kernel + k.venue.connect() + qty_keys = ("positionAmt", "positionQty", "positionSize", "quantity", "pa", "qty") + positions = [p for p in k.venue.open_positions() if abs(_pf(p, *qty_keys)) > 1e-12] + for p in positions: + amt = _pf(p, *qty_keys) + qty = abs(amt) + raw_side = str(p.get("positionSide") or p.get("side") or "").upper() + side = TradeSide.SHORT if raw_side in {"SHORT", "SELL"} or amt < 0 else TradeSide.LONG + entry = _pf(p, "entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price") + mark = _pf(p, "markPrice", "mark", "price") or entry + lev = _pf(p, "leverage", "lev") or 1.0 + asset = str(p.get("symbol") or p.get("symbolName") or "").replace("-", "").upper() + if qty <= 0 or not asset: + continue + k.reconcile_from_slots([TradeSlot( + slot_id=0, trade_id=asset, asset=asset, side=side, entry_price=entry or mark, + size=qty, initial_size=qty, leverage=lev, entry_time=datetime.now(timezone.utc), + fsm_state=TradeStage.POSITION_OPEN, metadata={"flatten": True}, + )]) + try: + k.process_intent(KernelIntent( + timestamp=datetime.now(timezone.utc), intent_id=f"flat-{asset}", trade_id=asset, + slot_id=0, asset=asset, side=side, action=KernelCommandType.EXIT, + reference_price=mark, target_size=qty, leverage=lev, exit_leg_ratios=(1.0,), + reason="FLATTEN", metadata={}, + )) + except Exception: + pass + try: + rows = await _contract_rows(client) + for s in {str(r.get("symbol") or "") for r in rows if isinstance(r, dict)}: + if s: + try: + await client._request_json("DELETE", "/openApi/swap/v2/trade/allOpenOrders", {"symbol": s}, signed=True) + except Exception: + pass + except Exception: + pass + await asyncio.sleep(0.8) + + +# -------------------------------------------------------------------------- +# scenario primitives +# -------------------------------------------------------------------------- + +async def _open(runtime, kernel, symbol: str, price: float) -> float: + cap_before = _capital(kernel) + dec = await runtime.step(_snap_signal(symbol, price, vel_div=-0.05)) + assert dec.action == DecisionAction.ENTER, f"expected ENTER, got {dec.action}/{dec.reason}" + assert await _await(kernel, lambda s: s.is_open() and s.size > 0), ( + f"position never opened (state={kernel.slot(0).fsm_state}, size={kernel.slot(0).size})" + ) + assert abs(_capital(kernel) - cap_before) < 1e-6, "entry must not realize PnL / move capital" + slot = kernel.slot(0) + entry = float(slot.entry_price or price) + # invariant 5: notional bound (margin-self-limiting) + notional = float(slot.size) * entry + assert notional <= _capital(kernel) * _MAX_LEVERAGE + 1e-6, ( + f"notional {notional} exceeds margin bound {_capital(kernel) * _MAX_LEVERAGE}" + ) + return entry + + +async def _exit_leg(runtime, kernel, symbol: str, entry_price: float) -> bool: + cap_before = _capital(kernel) + rp_before = _realized(kernel) + size_before = float(kernel.slot(0).size or 0.0) + dec = await runtime.step(_snap_signal(symbol, entry_price * 0.99, vel_div=0.0)) + assert dec.action == DecisionAction.EXIT, f"expected EXIT, got {dec.action}/{dec.reason}" + await _await( + kernel, + lambda s: s.closed or float(s.realized_pnl or 0.0) != rp_before or float(s.size or 0.0) < size_before - 1e-12, + ) + leg_realized = _realized(kernel) - rp_before + # invariant 1: per-fill Δcapital == realized PnL of that fill + assert abs((_capital(kernel) - cap_before) - leg_realized) < 1e-6, ( + f"per-fill mismatch: Δcap={_capital(kernel) - cap_before} realized_leg={leg_realized}" + ) + # Accumulate the cumulative realized across the whole scenario. slot.realized_pnl + # resets on each ENTER (Flaw 13), so multi-cycle reconciliation must sum the + # per-fill deltas, not read the slot's current realized. + runtime.__dict__.setdefault("_realized_legs", []).append(leg_realized) + return kernel.slot(0).closed + + +def _assert_end_invariants(kernel, start_cap: float, total_realized: float, sink: _CaptureSink): + cap = _capital(kernel) + realized = total_realized # cumulative across cycles (slot.realized resets on ENTER) + # invariant 2: capital moved EXACTLY by the sum of per-fill realized PnL — no + # phantom capital movement (entries don't move capital; each exit moves by its + # realized). Flat at end -> no unrealized component. + assert abs((cap - start_cap) - realized) < 1e-6, ( + f"end reconciliation: Δcap={cap - start_cap} Σrealized={realized} (cap={cap} start={start_cap})" + ) + # invariant 4: persistence parity + tes = sink.of("trade_events") + if tes: + assert abs(float(tes[-1]["capital_after"]) - cap) < 1e-6, "trade_events.capital_after != kernel capital" + assert abs(float(tes[-1]["capital_after"]) - float(tes[-1]["capital_before"]) - float(tes[-1]["pnl"])) < 1e-6, ( + "trade_events: capital_after - capital_before != pnl" + ) + aes = sink.of("account_events") + if aes: + assert abs(float(aes[-1]["capital"]) - cap) < 1e-6, "account_events.capital != kernel capital" + legs = sink.of("trade_exit_legs") + if legs: + leg_sum = sum(float(r["pnl_leg"]) for r in legs) + assert abs(leg_sum - realized) < 1e-6, f"Σ trade_exit_legs.pnl_leg {leg_sum} != realized {realized}" + + +# -------------------------------------------------------------------------- +# trading scenarios (SHORT path = PINK policy) +# -------------------------------------------------------------------------- + +async def _sc_round_trip(runtime, kernel, symbol, price): + e = await _open(runtime, kernel, symbol, price) + closed = await _exit_leg(runtime, kernel, symbol, e) + assert closed, "single-leg exit did not close the position" + + +async def _sc_multi_leg(runtime, kernel, symbol, price): + e = await _open(runtime, kernel, symbol, price) + closed1 = await _exit_leg(runtime, kernel, symbol, e) # leg 1 (0.5) + assert not closed1, "first multi-leg exit should not fully close" + closed2 = await _exit_leg(runtime, kernel, symbol, e) # leg 2 (remainder) + assert closed2, "final multi-leg exit must close" + + +async def _sc_sequential(runtime, kernel, symbol, price): + for _ in range(2): + e = await _open(runtime, kernel, symbol, price) + assert await _exit_leg(runtime, kernel, symbol, e), "sequential cycle did not close" + await asyncio.sleep(1.0) + + +async def _sc_exit_then_reentry(runtime, kernel, symbol, price): + e = await _open(runtime, kernel, symbol, price) + assert await _exit_leg(runtime, kernel, symbol, e), "first close failed" + await asyncio.sleep(1.0) + e2 = await _open(runtime, kernel, symbol, price) + assert await _exit_leg(runtime, kernel, symbol, e2), "re-entry close failed" + + +_TRADING_SCENARIOS = { + "round_trip": ((1.0,), _sc_round_trip), + "multi_leg": ((0.5, 1.0), _sc_multi_leg), + "sequential": ((1.0,), _sc_sequential), + "exit_then_reentry": ((1.0,), _sc_exit_then_reentry), +} + + +@pytest.mark.parametrize("name", [ + "round_trip", + pytest.param("multi_leg", marks=pytest.mark.xfail( + reason="KNOWN capital/sizing finding: multi-leg partial reduce-only exits leave a " + "lot-step rounding residual on the venue (kernel believes flat, exchange has a " + "remainder). To be fixed in the capital/sizing-path rework.", + strict=False)), + "sequential", + "exit_then_reentry", +]) +def test_pink_capital(name): + ratios, scenario = _TRADING_SCENARIOS[name] + + async def _run(): + sink = _CaptureSink() + runtime, kernel = _build_runtime(sink, exit_leg_ratios=ratios) + client = BingxHttpClient(_build_config()) + sym = await _pick_sym(kernel, client) + snap, vsym = await _snap(client, sym) + price = float(snap.price) + await _ensure_account_flat(client) # best-effort exchange-truth pre-clean + await runtime.connect(initial_capital=_SEED_CAPITAL) + try: + # connect reconciled any leftover position into the slot; close it via + # the proven kernel path (reliable for the single-symbol residual case). + for _ in range(4): + if kernel.slot(0).is_free(): + break + _flatten(kernel, kernel.slot(0).asset or sym, price, "harness-pre") + await _await(kernel, lambda s: s.is_free(), timeout_s=8) + await _full_flatten(client, vsym) + assert kernel.slot(0).is_free(), ( + f"slot not free after pre-flatten (state={kernel.slot(0).fsm_state})" + ) + runtime.__dict__["_realized_legs"] = [] + start_cap = _capital(kernel) + await scenario(runtime, kernel, sym, price) + total_realized = sum(runtime.__dict__.get("_realized_legs", [])) + _assert_end_invariants(kernel, start_cap, total_realized, sink) + finally: + if not kernel.slot(0).is_free(): + _flatten(kernel, sym, price, "harness-post") + await asyncio.sleep(1.0) + await _full_flatten(client, vsym) + # invariant 3: exchange flat + no dangling orders + vr = await _verify(client, vsym) + assert vr.positions_flat, f"exchange not flat: {vr.error}" + + asyncio.run(_run()) + + +# -------------------------------------------------------------------------- +# guard scenarios (invariant 6) — no live order expected +# -------------------------------------------------------------------------- + +def test_guard_suppressed_nonfinite_capital(): + async def _run(): + sink = _CaptureSink() + runtime, kernel = _build_runtime(sink, capital=float("inf")) + client = BingxHttpClient(_build_config()) + sym = await _pick_sym(kernel, client) + snap, vsym = await _snap(client, sym) + await _ensure_account_flat(client) + await runtime.connect(initial_capital=_SEED_CAPITAL) + kernel.account.snapshot.capital = float("inf") # re-poison after connect seed + dec = await runtime.step(_snap_signal(sym, float(snap.price), vel_div=-0.05)) + assert dec.action == DecisionAction.ENTER # policy decided enter... + assert kernel.slot(0).is_free(), "ENTER must be suppressed on non-finite capital" + vr = await _verify(client, vsym) + assert vr.positions_flat, f"account must be untouched: {vr.error}" + + asyncio.run(_run()) + + +def test_guard_suppressed_subfloor_price(): + async def _run(): + sink = _CaptureSink() + runtime, kernel = _build_runtime(sink) + client = BingxHttpClient(_build_config()) + sym = await _pick_sym(kernel, client) + _, vsym = await _snap(client, sym) + await _ensure_account_flat(client) + await runtime.connect(initial_capital=_SEED_CAPITAL) + await runtime.step(_snap_signal(sym, 1e-12, vel_div=-0.05)) + assert kernel.slot(0).is_free(), "ENTER must be suppressed on sub-floor price" + vr = await _verify(client, vsym) + assert vr.positions_flat, f"account must be untouched: {vr.error}" + + asyncio.run(_run()) + + +def test_guard_degenerate_snapshot_holds(): + async def _run(): + sink = _CaptureSink() + runtime, kernel = _build_runtime(sink) + client = BingxHttpClient(_build_config()) + sym = await _pick_sym(kernel, client) + snap, vsym = await _snap(client, sym) + await _ensure_account_flat(client) + await runtime.connect(initial_capital=_SEED_CAPITAL) + # Degenerate feed (mimics the stddev-NaN data lead): non-finite vel_div. + dec = await runtime.step(_snap_signal(sym, float(snap.price), vel_div=float("nan"))) + assert dec.action != DecisionAction.ENTER, f"degenerate snapshot must not ENTER (got {dec.reason})" + assert kernel.slot(0).is_free() + vr = await _verify(client, vsym) + assert vr.positions_flat, f"account must be untouched: {vr.error}" + + asyncio.run(_run())