#!/usr/bin/env python3
"""PINK runtime LIVE integration test — the real Sprint 1/2 closer.

The kernel-direct e2e suite (`test_pink_bingx_dita_live_e2e.py`) injects
`KernelIntent`s straight into `kernel.process_intent`, so it proves the
execution *substrate* (Rust FSM + BingX venue + AccountProjection) but never
exercises PINK itself. This test drives the FULL PINK stack against BingX VST:

    MarketSnapshot
      -> DecisionEngine (vel_div / fixed-TP policy — algorithmic integrity)
      -> IntentEngine (sizing + trade identity)
      -> _decision_to_kernel_intent
      -> kernel.process_intent (DITAv2 Rust FSM)
      -> BingX VST venue (real reduce-only MARKET orders)
      -> AccountProjection.settle (single capital authority)
      -> PinkClickHousePersistence (dolphin_pink row families — captured here)

It forces a deterministic SHORT entry (vel_div below threshold, irp ok) then a
fixed-TP exit (price below entry), and asserts:

  * the REAL policy produced the intents (reasons STRUCTURAL_DISLOCATION /
    a valid exit reason) — i.e. the policy layer actually ran;
  * `PinkClickHousePersistence` was invoked through the runtime with the
    deterministic row families (policy_events, v7_decision_events,
    account_events, position_state, status_snapshots);
  * capital reconciles to start + Σrealized + Σunrealized within 0.01
    (kernel is the single authority — no balance-poll overwrite);
  * the exchange ends flat according to the harness `_verify` read
    (`positions_flat`).

NOTE on terminal rows: `PinkDirectRuntime.step()` persists immediately after
`process_intent`, before the async venue fill is applied, so the terminal
`trade_events` / `trade_exit_legs` / `trade_reconstruction` rows are
*timing-dependent*. This test records (not hard-asserts) their presence; a miss
is a genuine runtime persistence-timing finding, not a substrate bug.

Sizing: a deliberately tiny `capital_fraction` keeps the live notional ~$20;
testnet `sizing_mode` floors it to the exchange minimum (same regime the
kernel-direct runs traded safely in).

Gates (all required): BINGX_SMOKE_LIVE, BINGX_SMOKE_ALLOW_TRADE,
PINK_DITA_E2E, PINK_RUNTHROUGH.

Run from repo root with PYTHONPATH=/mnt/dolphinng5_predict.
"""
from __future__ import annotations

import asyncio
import os
from datetime import datetime, timezone

import pytest

# ---- env gates (skip cleanly before importing the live harness) ----
for _gate in ("BINGX_SMOKE_LIVE", "BINGX_SMOKE_ALLOW_TRADE", "PINK_DITA_E2E", "PINK_RUNTHROUGH"):
    if not os.environ.get(_gate):
        pytest.skip(f"{_gate} not set", allow_module_level=True)

# Reuse the proven live plumbing from the kernel-direct harness.
from prod.tests.test_pink_bingx_dita_live_e2e import (  # noqa: E402
    _build_config,
    _pick_sym,
    _snap,
    _flatten,
    _verify,
)
from prod.bingx.http import BingxHttpClient  # noqa: E402
from prod.clean_arch.dita import (  # noqa: E402
    DecisionAction,
    DecisionConfig,
    DecisionEngine,
    IntentEngine,
)
from prod.clean_arch.dita_v2.launcher import build_launcher_bundle  # noqa: E402
from prod.clean_arch.persistence import PinkClickHousePersistence  # noqa: E402
from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime  # noqa: E402
from prod.clean_arch.ports.data_feed import MarketSnapshot, DataFeedPort  # noqa: E402


class _CaptureSink:
    """Capturing ClickHouse writer — records (table, row) instead of hitting CH."""

    def __init__(self) -> None:
        # Every write in call order, as (table name, shallow row copy).
        self.rows: list[tuple[str, dict]] = []

    def __call__(self, table: str, row: dict) -> None:
        """Record one write; the row is copied so later mutation is not captured."""
        self.rows.append((table, dict(row)))

    def tables(self) -> list[str]:
        """Return the table name of every captured write (duplicates kept)."""
        return [t for t, _ in self.rows]

    def of(self, table: str) -> list[dict]:
        """Return the captured rows written to ``table``, in write order."""
        return [r for t, r in self.rows if t == table]


class _StubFeed(DataFeedPort):
    """Minimal DataFeedPort — snapshots are supplied directly to step()."""

    async def connect(self) -> bool:
        return True

    async def disconnect(self) -> None:
        pass

    async def get_latest_snapshot(self, symbol):
        return None

    async def subscribe_snapshots(self, callback) -> None:
        pass

    async def get_acb_update(self):
        return None

    def get_latency_ms(self) -> float:
        return 0.0

    def health_check(self) -> bool:
        return True


def _pink_config() -> DecisionConfig:
    """Build the DecisionConfig shared by the decision and intent engines."""
    # PINK semantics (short-only, fixed-TP) but with a tiny capital_fraction so
    # the live notional stays ~$20 -> floored to exchange min by testnet sizing.
    return DecisionConfig(
        vel_div_threshold=-0.02,
        vel_div_extreme=-0.05,
        fixed_tp_pct=0.0020,
        max_hold_bars=250,
        capital_fraction=2.5e-4,
        max_leverage=3.0,
        min_irp_alignment=0.0,
        allow_long=False,
        allow_short=True,
        exit_leg_ratios=(1.0,),
        policy_version="pink_ditav2_runthrough",
    )


def _build_pink_runtime(capture: _CaptureSink, ic: float = 25000.0):
    """Wire a PinkDirectRuntime around a BINGX launcher bundle.

    Args:
        capture: Sink passed to PinkClickHousePersistence as both ``sink`` and
            ``v7_sink`` so every persisted row is recorded in memory.
        ic: Initial capital; seeds the kernel account snapshot's capital,
            peak_capital and equity, and is forwarded to ``_build_config``.

    Returns:
        ``(runtime, kernel, persistence)``.
    """
    cfg = _pink_config()
    bundle = build_launcher_bundle(
        venue_mode="BINGX", max_slots=1, bingx_config=_build_config(ic)
    )
    k = bundle.kernel
    k.account.snapshot.capital = ic
    k.account.snapshot.peak_capital = ic
    k.account.snapshot.equity = ic
    persistence = PinkClickHousePersistence(k.account, sink=capture, v7_sink=capture)
    runtime = PinkDirectRuntime(
        data_feed=_StubFeed(),
        kernel=k,
        decision_engine=DecisionEngine(cfg),
        intent_engine=IntentEngine(cfg),
        persistence=persistence,
        market_state_runtime=None,
    )
    return runtime, k, persistence


def _snapshot(symbol: str, price: float, *, vel_div: float) -> MarketSnapshot:
    """Build a synthetic MarketSnapshot at ``price`` with the given vel_div.

    Bid/ask are placed 0.1% either side of ``price``; the timestamp is the
    current UTC time.
    """
    return MarketSnapshot(
        timestamp=datetime.now(timezone.utc),
        symbol=symbol,
        price=price,
        bid=price * 0.999,
        ask=price * 1.001,
        eigenvalues=[1.0],  # required by MarketSnapshot.is_valid()
        velocity_divergence=vel_div,
        irp_alignment=0.5,
        scan_number=1,
        source="pink_runthrough_test",
    )


async def _await_state(k, predicate, *, timeout_s: float = 12.0, step_s: float = 0.5) -> bool:
    """Poll the slot until predicate(slot) is true (lets async venue fills land).

    The budget counts accumulated sleep time only (``step_s`` per iteration),
    and the predicate is evaluated one final time once the budget is spent.

    Returns:
        True as soon as the predicate holds, otherwise the result of the
        final post-timeout check.
    """
    waited = 0.0
    while waited < timeout_s:
        if predicate(k.slot(0)):
            return True
        await asyncio.sleep(step_s)
        waited += step_s
    return predicate(k.slot(0))


async def _drive() -> dict:
    """Run one ENTER -> EXIT cycle through the full PINK runtime on BingX VST.

    Returns:
        Run summary with keys ``symbol``, ``entry_price``, ``start_cap``,
        ``end_cap``, ``realized``, ``row_families`` and ``terminal_rows``.

    Raises:
        AssertionError: if any integration check fails.
    """
    capture = _CaptureSink()
    runtime, k, _ = _build_pink_runtime(capture)
    client = BingxHttpClient(_build_config())

    sym = await _pick_sym(k, client)
    snap, vsym = await _snap(client, sym)
    price = float(snap.price)
    assert price > 0, f"no live price for {sym}"

    await runtime.connect(initial_capital=k.account.snapshot.capital)
    try:
        # connect() reconciles exchange positions into the slot; with a flat
        # account it must be free. If not, the account wasn't flattened.
        assert k.slot(0).is_free(), (
            f"slot not free after connect (state={k.slot(0).fsm_state}); "
            f"flatten the VST account before running this test"
        )
        start_cap = k.account.snapshot.capital

        # --- ENTER through the real policy -------------------------------
        enter_decision = await runtime.step(_snapshot(sym, price, vel_div=-0.05))
        assert enter_decision.action == DecisionAction.ENTER, (
            f"policy did not ENTER: {enter_decision.action} ({enter_decision.reason})"
        )
        assert enter_decision.reason == "STRUCTURAL_DISLOCATION", enter_decision.reason

        opened = await _await_state(k, lambda s: s.is_open() and s.size > 0)
        assert opened, f"position never opened (state={k.slot(0).fsm_state}, size={k.slot(0).size})"
        entry_price = float(k.slot(0).entry_price) or price

        # --- EXIT through the real policy (price below SHORT fixed-TP) ----
        exit_decision = await runtime.step(_snapshot(sym, entry_price * 0.99, vel_div=0.0))
        assert exit_decision.action == DecisionAction.EXIT, (
            f"policy did not EXIT: {exit_decision.action} ({exit_decision.reason})"
        )
        assert exit_decision.reason in ("TAKE_PROFIT", "MEAN_REVERSION", "CATASTROPHIC_LOSS"), exit_decision.reason

        closed = await _await_state(k, lambda s: s.is_free() or s.closed)
        assert closed, f"position never closed (state={k.slot(0).fsm_state}, size={k.slot(0).size})"

        # --- assertions on the integrated path ---------------------------
        tables = capture.tables()
        # Deterministic row families (written regardless of fill timing):
        for fam in ("policy_events", "v7_decision_events", "account_events", "position_state", "status_snapshots"):
            assert fam in tables, f"missing dolphin_pink row family {fam}; got {sorted(set(tables))}"

        # Policy actually flowed through persistence:
        pe = capture.of("policy_events")
        assert any(r.get("action") == "ENTER" for r in pe), "no ENTER policy_event persisted"
        assert any(r.get("action") == "EXIT" for r in pe), "no EXIT policy_event persisted"

        # Capital reconciliation (0.01 tolerance) — kernel is the single authority.
        rp = sum(k.slot(i).realized_pnl for i in range(k.max_slots))
        up = sum(k.slot(i).unrealized_pnl for i in range(k.max_slots))
        cap = k.account.snapshot.capital
        assert abs(cap - (start_cap + rp + up)) < 0.01, (
            f"capital reconciliation: cap={cap} start={start_cap} rp={rp} up={up} "
            f"diff={abs(cap - (start_cap + rp + up))}"
        )

        # Exchange flat (independent signed read).
        # NOTE(review): only positions_flat is asserted here; confirm whether
        # _verify also covers dangling open orders.
        vr = await _verify(client, vsym)
        assert vr.positions_flat, f"exchange not flat: {vr.error}"

        # Terminal rows are fill-timing dependent — record, don't hard-fail.
        terminal = {
            "trade_events": len(capture.of("trade_events")),
            "trade_exit_legs": len(capture.of("trade_exit_legs")),
            "trade_reconstruction": len(capture.of("trade_reconstruction")),
        }
        return {
            "symbol": sym,
            "entry_price": entry_price,
            "start_cap": start_cap,
            "end_cap": cap,
            "realized": rp,
            "row_families": sorted(set(tables)),
            "terminal_rows": terminal,
        }
    finally:
        # Best-effort flatten of any position left open by a failed run. The
        # nested finally guarantees the runtime is disconnected even when the
        # cleanup itself raises, so a flatten error cannot leak the connection.
        try:
            if not k.slot(0).is_free():
                _flatten(k, sym, price * 0.99, "pink-rt-post")
                await asyncio.sleep(1.0)
        finally:
            await runtime.disconnect()


def test_pink_runtime_live_integration() -> None:
    """Drive the live run-through and surface its summary in the test output."""
    result = asyncio.run(_drive())
    # Surface the run summary (incl. terminal-row capture) in test output.
    print(f"[PINK runthrough] {result}")
    # Terminal-row capture is informational; flag if the runtime missed them.
    if result["terminal_rows"]["trade_events"] == 0:
        print(
            "[PINK runthrough] NOTE: no terminal trade_events captured — "
            "PinkDirectRuntime persisted the EXIT before the close fill applied "
            "(runtime persist-vs-fill timing gap to address)."
        )