From 10a44d86b14b6a37b229ef519b6aefcc868cdccd Mon Sep 17 00:00:00 2001 From: Codex Date: Sat, 6 Jun 2026 11:10:49 +0200 Subject: [PATCH] PINK: wire CH persistence to monitor + add missing friction DDL - migrate_pink_sink_schema.sql: ALTER TABLE adds fee/fee_source/is_maker/ slippage_bps/mark_at_submit/exchange_ts to trade_events and trade_exit_legs; CREATE TABLE fee_settled_events (was missing entirely). DDL already applied. - monitor_pink.py: wire real PinkClickHousePersistence so monitor roundtrips write to dolphin_pink CH tables. Adds _make_decision_intent() helper; calls persist_step() after ENTER and EXIT. Persistence failure is non-fatal (warns and continues). 42 persistence tests green. Co-Authored-By: Claude Sonnet 4.6 --- .../dita_v2/migrate_pink_sink_schema.sql | 38 ++ prod/clean_arch/dita_v2/monitor_pink.py | 332 ++++++++++++++++++ 2 files changed, 370 insertions(+) create mode 100644 prod/clean_arch/dita_v2/migrate_pink_sink_schema.sql create mode 100644 prod/clean_arch/dita_v2/monitor_pink.py diff --git a/prod/clean_arch/dita_v2/migrate_pink_sink_schema.sql b/prod/clean_arch/dita_v2/migrate_pink_sink_schema.sql new file mode 100644 index 0000000..886d641 --- /dev/null +++ b/prod/clean_arch/dita_v2/migrate_pink_sink_schema.sql @@ -0,0 +1,38 @@ +-- DITAv2 PINK sink schema migration +-- Adds Gap 1/2/3 friction columns to trade_events + trade_exit_legs, +-- and creates the missing fee_settled_events table. +-- Safe to re-run: ADD COLUMN IF NOT EXISTS + CREATE TABLE IF NOT EXISTS. + +-- ── trade_events ───────────────────────────────────────────────────────────── +ALTER TABLE dolphin_pink.trade_events + ADD COLUMN IF NOT EXISTS fee Float64 DEFAULT 0, + ADD COLUMN IF NOT EXISTS fee_source LowCardinality(String) DEFAULT '', + ADD COLUMN IF NOT EXISTS is_maker UInt8 DEFAULT 0, + ADD COLUMN IF NOT EXISTS slippage_bps Float32 DEFAULT 0, + ADD COLUMN IF NOT EXISTS mark_at_submit Float64 DEFAULT 0, + ADD COLUMN IF NOT EXISTS exchange_ts Int64 DEFAULT 0; + +-- ── trade_exit_legs ────────────────────────────────────────────────────────── +ALTER TABLE dolphin_pink.trade_exit_legs + ADD COLUMN IF NOT EXISTS fee_leg Float64 DEFAULT 0, + ADD COLUMN IF NOT EXISTS fee_source LowCardinality(String) DEFAULT '', + ADD COLUMN IF NOT EXISTS is_maker UInt8 DEFAULT 0, + ADD COLUMN IF NOT EXISTS slippage_bps Float32 DEFAULT 0; + +-- ── fee_settled_events (new table) ────────────────────────────────────────── +CREATE TABLE IF NOT EXISTS dolphin_pink.fee_settled_events +( + ts DateTime64(6, 'UTC'), + trade_id String DEFAULT '', + fee Float64 DEFAULT 0, + fee_asset LowCardinality(String) DEFAULT 'USDT', + fee_source LowCardinality(String) DEFAULT 'WS_SETTLED', + is_maker UInt8 DEFAULT 0, + exchange_ts Int64 DEFAULT 0, + realized_pnl_delta Float64 DEFAULT 0, + runtime_namespace LowCardinality(String) DEFAULT '', + strategy LowCardinality(String) DEFAULT '' +) +ENGINE = MergeTree() +PARTITION BY toYYYYMM(ts) +ORDER BY (trade_id, ts); diff --git a/prod/clean_arch/dita_v2/monitor_pink.py b/prod/clean_arch/dita_v2/monitor_pink.py new file mode 100644 index 0000000..c19bdc6 --- /dev/null +++ b/prod/clean_arch/dita_v2/monitor_pink.py @@ -0,0 +1,332 @@ +#!/usr/bin/env python3 +"""PINK VST live monitoring — continuous ENTER/EXIT rounds with full trade logging. + +Runs N kernel roundtrips on BingX VST and streams each trade to the console. +Use Ctrl+C to stop at any time; the monitor will issue a final flatten before exit. + +Usage: + PYTHONPATH=/mnt/dolphinng5_predict python monitor_pink.py [--rounds N] [--flatten] +""" +from __future__ import annotations + +import argparse +import asyncio +import logging +import os +import signal +import sys +import time +from datetime import datetime, timezone +from pathlib import Path +from types import SimpleNamespace + +ROOT = Path(__file__).resolve().parents[3] +sys.path.insert(0, str(ROOT)) +from dotenv import load_dotenv +load_dotenv(ROOT / ".env") + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)-7s %(name)s — %(message)s", + datefmt="%H:%M:%S", +) +# Quiet httpx noise +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("ch_writer").setLevel(logging.WARNING) +log = logging.getLogger("pink_monitor") + +_stop = False + +def _handle_signal(sig, frame): + global _stop + log.warning("Caught signal %s — stopping after current round", sig) + _stop = True + +signal.signal(signal.SIGINT, _handle_signal) +signal.signal(signal.SIGTERM, _handle_signal) + + +async def _flatten_account(adapter): + """Close all open positions on BingX VST.""" + try: + state = await adapter.refresh_state(None, include_history=False) + positions = state.open_positions + if not positions: + log.info(" FLATTEN: account already flat") + return + client = adapter._client + for sym_key, pos in positions.items(): + amt = float(pos.get("positionAmt") or pos.get("positionQty") or 0) + if abs(amt) < 1e-9: + continue + venue_sym = str(pos.get("symbol") or sym_key) + pos_side = str(pos.get("positionSide") or "BOTH").upper() + close_side = "BUY" if (amt < 0 or pos_side == "SHORT") else "SELL" + log.info(" FLATTEN: closing %s qty=%.4f via %s", venue_sym, abs(amt), close_side) + try: + await client.signed_post("/openApi/swap/v2/trade/order", { + "symbol": venue_sym, + "side": close_side, + "positionSide": "BOTH", + "type": "MARKET", + "quantity": str(abs(amt)), + "reduceOnly": "true", + }) + except Exception as e: + log.warning(" FLATTEN failed for %s: %s", venue_sym, e) + await asyncio.sleep(1.5) + state2 = await adapter.refresh_state(None, include_history=False) + remaining = {s: r for s, r in state2.open_positions.items() + if abs(float(r.get("positionAmt") or r.get("positionQty") or 0)) > 1e-8} + if remaining: + log.warning(" FLATTEN: still open: %s", list(remaining.keys())) + else: + log.info(" FLATTEN: account flat ✓") + except Exception as e: + log.error(" FLATTEN error: %s", e) + + +def _make_decision_intent(ts, tid, asset, action, price, size): + """Build minimal Decision+Intent pair for persistence from raw monitor round data.""" + from prod.clean_arch.dita import Decision, Intent, TradeSide, TradeStage + decision = Decision( + timestamp=ts, decision_id=tid, asset=asset, + action=action, side=TradeSide.SHORT, + reason=f"monitor_{action.value.lower()}", confidence=0.0, + velocity_divergence=0.0, irp_alignment=0.0, + reference_price=price, target_size=size, leverage=1.0, + bars_held=0, stage=TradeStage.ORDER_REQUESTED, metadata={}, + ) + intent = Intent( + timestamp=ts, trade_id=tid, decision_id=tid, asset=asset, + action=action, side=TradeSide.SHORT, + reason=f"monitor_{action.value.lower()}", target_size=size, leverage=1.0, + reference_price=price, confidence=0.0, exit_leg_ratios=(1.0,), + bars_held=0, stage=TradeStage.ORDER_REQUESTED, metadata={}, + ) + return decision, intent + + +async def _run_one_round(adapter, k, bundle, round_num: int, asset: str, price: float, size: float, + pers=None): + """Execute one ENTER → EXIT roundtrip and return (enter_ms, exit_ms, pnl, success).""" + from prod.clean_arch.dita_v2.contracts import ( + KernelCommandType as KC, KernelIntent as KI, + TradeSide as TS, TradeStage, + ) + from prod.clean_arch.dita import DecisionAction + + tid = f"mon-{round_num}-{int(time.time()*1000)}" + ts = datetime.now(timezone.utc) + + # ── ENTER ────────────────────────────────────────────────────────────────── + log.info("Round %d | ENTER SHORT %s @ %.6f size=%.1f", round_num, asset, price, size) + t0 = time.perf_counter() + enter_outcome = await k.process_intent_async(KI( + timestamp=ts, + intent_id=tid, trade_id=tid, slot_id=0, + asset=asset, side=TS.SHORT, action=KC.ENTER, + reference_price=price, target_size=size, leverage=1.0, + exit_leg_ratios=(1.0,), reason="monitor_enter", metadata={}, + )) + dt_enter = (time.perf_counter() - t0) * 1000 + + # accepted=True + state=IDLE = order processed but rejected by exchange (e.g. "Event loop + # is closed" or BingX business reject). accepted=True + state=POSITION_OPEN = real success. + if not enter_outcome.accepted or str(enter_outcome.state) in ("TradeStage.IDLE", "IDLE"): + log.error(" ENTER FAILED state=%s diag=%s — skipping EXIT", + enter_outcome.state, enter_outcome.diagnostic_code) + return None, None, None, False + + log.info(" ENTER OK state=%-20s latency=%.0fms", enter_outcome.state, dt_enter) + + if pers is not None: + try: + snap = SimpleNamespace(timestamp=ts, price=price, symbol=asset) + dec, intent = _make_decision_intent(ts, tid, asset, DecisionAction.ENTER, price, size) + pers.persist_step( + snapshot=snap, decision=dec, intent=intent, outcome=enter_outcome, + slot_dict=k.slot(0).to_dict(), acc_dict=k.snapshot()["account"], + phase="execution", + ) + except Exception as _pe: + log.warning("persist ENTER failed (non-fatal): %s", _pe) + + await asyncio.sleep(1.5) # let S2 refresh settle + + # refresh price for exit + try: + client = adapter._client + ticker = await client.public_get("/openApi/swap/v2/quote/price", {"symbol": asset}) + if isinstance(ticker, dict): + live_price = float(ticker.get("price") or price) + else: + live_price = price + except Exception: + live_price = price * 0.998 + + # ── EXIT ─────────────────────────────────────────────────────────────────── + ts_exit = datetime.now(timezone.utc) + log.info(" EXIT SHORT @ %.6f", live_price) + t1 = time.perf_counter() + exit_outcome = await k.process_intent_async(KI( + timestamp=ts_exit, + intent_id=tid, trade_id=tid, slot_id=0, + asset=asset, side=TS.SHORT, action=KC.EXIT, + reference_price=live_price, target_size=size, leverage=1.0, + exit_leg_ratios=(1.0,), reason="monitor_exit", metadata={}, + )) + dt_exit = (time.perf_counter() - t1) * 1000 + + if not exit_outcome.accepted: + log.error(" EXIT REJECTED (diag=%s)", exit_outcome.diagnostic_code) + return dt_enter, None, None, False + + slot = k.slot(0).to_dict() + pnl = float(slot.get("realized_pnl") or 0.0) + log.info( + " EXIT OK state=%-20s latency=%.0fms pnl=%.6f", + exit_outcome.state, dt_exit, pnl, + ) + + if pers is not None: + try: + snap_x = SimpleNamespace(timestamp=ts_exit, price=live_price, symbol=asset) + dec_x, intent_x = _make_decision_intent( + ts_exit, tid, asset, DecisionAction.EXIT, live_price, size, + ) + pers.persist_step( + snapshot=snap_x, decision=dec_x, intent=intent_x, outcome=exit_outcome, + slot_dict=slot, acc_dict=k.snapshot()["account"], + phase="execution", + ) + except Exception as _pe: + log.warning("persist EXIT failed (non-fatal): %s", _pe) + + return dt_enter, dt_exit, pnl, True + + +async def main(rounds: int, do_flatten: bool) -> None: + from prod.clean_arch.adapters.bingx_direct import BingxDirectExecutionAdapter + from prod.clean_arch.dita_v2.launcher import ( + build_launcher_bundle, LauncherVenueMode, build_bingx_exec_client_config, + ) + from prod.bingx.config import BingxEnvironment + + cfg = build_bingx_exec_client_config() + adapter = BingxDirectExecutionAdapter(cfg) + bundle = build_launcher_bundle( + max_slots=1, + venue_mode=LauncherVenueMode.BINGX, + bingx_backend=adapter, + ) + k = bundle.kernel + + # Connect using the async path (avoids event-loop hygiene bug) + try: + await adapter.connect() + log.info("Venue connected (async path)") + except Exception as e: + log.warning("Venue connect warning: %s", e) + + # Wire real CH persistence so trades fire to data sinks. + pers = None + try: + from prod.clean_arch.dita_v2.account import AccountProjection + from prod.clean_arch.persistence import ( + PinkClickHousePersistence, PinkClickHousePersistenceConfig, + ) + initial_capital = float( + k.snapshot().get("account", {}).get("capital", 10000.0) or 10000.0 + ) + acc_proj = AccountProjection() + acc_proj.snapshot.capital = initial_capital + acc_proj.snapshot.peak_capital = initial_capital + pers = PinkClickHousePersistence( + account=acc_proj, + config=PinkClickHousePersistenceConfig( + strategy="pink", + runtime_namespace="dita_v2", + strategy_namespace="dita_v2", + event_namespace="dita_v2", + initial_capital=initial_capital, + ), + kernel=k, + ) + log.info("Persistence wired — trades will write to dolphin_pink CH tables") + except Exception as _pe: + log.warning("Persistence init failed (non-fatal, runs without CH writes): %s", _pe) + + if do_flatten: + log.info("── PRE-FLATTEN ──────────────────────────────────────────────") + await _flatten_account(adapter) + + # Fetch live price — use 10 units (confirmed minimum that works on BingX VST) + client = adapter._client + asset = "TRX-USDT" + price = 0.32 + size = 10.0 # 10 units matches flat_and_start_pink.py minimum known-good size + try: + ticker = await client.public_get("/openApi/swap/v2/quote/price", {"symbol": asset}) + if isinstance(ticker, dict): + price = float(ticker.get("price") or price) + except Exception as e: + log.warning("Price fetch failed: %s — using defaults", e) + + log.info("Symbol: %s price=%.6f size=%.1f", asset, price, size) + log.info("Starting %d rounds. Ctrl+C to stop early.\n", rounds) + + results = [] + for i in range(1, rounds + 1): + if _stop: + log.info("Stopping at user request before round %d", i) + break + + log.info("─── Round %d / %d ───────────────────────────────────────────", i, rounds) + # Refresh price each round + try: + ticker = await client.public_get("/openApi/swap/v2/quote/price", {"symbol": asset}) + if isinstance(ticker, dict): + price = float(ticker.get("price") or price) + except Exception: + pass + + dt_enter, dt_exit, pnl, success = await _run_one_round( + adapter, k, bundle, i, asset, price, size, pers=pers, + ) + results.append((i, dt_enter, dt_exit, pnl, success)) + + if not success: + log.warning("Round %d failed — pausing 3s then retrying with fresh state", i) + await asyncio.sleep(3) + else: + await asyncio.sleep(2) # brief pause between rounds + + # ── Summary ──────────────────────────────────────────────────────────────── + log.info("\n══ SUMMARY (%d rounds) ══════════════════════════════════════════", len(results)) + ok = [r for r in results if r[4]] + fail = [r for r in results if not r[4]] + if ok: + avg_enter = sum(r[1] for r in ok if r[1]) / len(ok) + avg_exit = sum(r[2] for r in ok if r[2]) / len(ok) + total_pnl = sum(r[3] for r in ok if r[3] is not None) + log.info( + " Completed: %d/%d avg_enter=%.0fms avg_exit=%.0fms total_pnl=%.6f", + len(ok), len(results), avg_enter, avg_exit, total_pnl, + ) + if fail: + log.warning(" Failed rounds: %s", [r[0] for r in fail]) + + if do_flatten or _stop: + log.info("── FINAL FLATTEN ────────────────────────────────────────────") + await _flatten_account(adapter) + + bundle.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--rounds", type=int, default=5, help="Number of ENTER/EXIT roundtrips") + parser.add_argument("--flatten", action="store_true", help="Flatten positions before start") + args = parser.parse_args() + asyncio.run(main(args.rounds, args.flatten))