Files
DOLPHIN/nautilus_dolphin/test_vd_exit_calibration.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

421 lines
20 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""vel_div Adverse-Turn Exit Calibration
========================================
Runs the champion engine (vd_enabled=False), then post-hoc reconstructs
the vel_div trajectory for every trade and sweeps thresholds.
For each (threshold, debounce_bars) pair:
- % of trades that would trigger early exit
- Sensitivity: % of true losers captured (want HIGH)
- Specificity: % of true winners avoided (want HIGH)
- Separation score: P(loser|triggered) - P(loser|not triggered) (want HIGH)
- avg bars saved for triggered losers (want HIGH)
- avg pnl of triggered vs not-triggered
Output:
run_logs/vd_calib_<ts>.csv — per-trade data
run_logs/vd_sweep_<ts>.csv — full threshold sweep
Printed: top-10 configs by separation score
"""
import sys, time, json, csv  # NOTE(review): json appears unused in this file — confirm before removing
from pathlib import Path
from datetime import datetime
import numpy as np
import pandas as pd
# Make this file's directory importable so the project packages below resolve.
sys.path.insert(0, str(Path(__file__).parent))
# Numba warm-up: import every jitted kernel and call it once with tiny dummy
# arguments so JIT compilation cost is paid (and timed) up front instead of
# mid-backtest. Argument values are throwaway; only dtypes/shapes matter.
print("Compiling numba kernels...")
t0c = time.time()
from nautilus_dolphin.nautilus.alpha_asset_selector import compute_irp_nb, compute_ars_nb, rank_assets_irp_nb
from nautilus_dolphin.nautilus.alpha_bet_sizer import compute_sizing_nb
from nautilus_dolphin.nautilus.alpha_signal_generator import check_dc_nb
from nautilus_dolphin.nautilus.ob_features import (
    OBFeatureEngine, compute_imbalance_nb, compute_depth_1pct_nb,
    compute_depth_quality_nb, compute_fill_probability_nb, compute_spread_proxy_nb,
    compute_depth_asymmetry_nb, compute_imbalance_persistence_nb,
    compute_withdrawal_velocity_nb, compute_market_agreement_nb, compute_cascade_signal_nb,
)
from nautilus_dolphin.nautilus.ob_provider import MockOBProvider
_p = np.array([1.0, 2.0, 3.0], dtype=np.float64)  # dummy price series
compute_irp_nb(_p, -1); compute_ars_nb(1.0, 0.5, 0.01)
rank_assets_irp_nb(np.ones((10, 2), dtype=np.float64), 8, -1, 5, 500.0, 20, 0.20)
compute_sizing_nb(-0.03, -0.02, -0.05, 3.0, 0.5, 5.0, 0.20, True, True, 0.0,
                  np.zeros(4, dtype=np.int64), np.zeros(4, dtype=np.int64),
                  np.zeros(5, dtype=np.float64), 0, -1, 0.01, 0.04)
check_dc_nb(_p, 3, 1, 0.75)
_b = np.array([100.0, 200.0, 300.0, 400.0, 500.0], dtype=np.float64)  # dummy bid-side depths
_a = np.array([110.0, 190.0, 310.0, 390.0, 510.0], dtype=np.float64)  # dummy ask-side depths
compute_imbalance_nb(_b, _a); compute_depth_1pct_nb(_b, _a)
compute_depth_quality_nb(210.0, 200.0); compute_fill_probability_nb(1.0)
compute_spread_proxy_nb(_b, _a); compute_depth_asymmetry_nb(_b, _a)
compute_imbalance_persistence_nb(np.array([0.1, -0.1], dtype=np.float64), 2)
compute_withdrawal_velocity_nb(np.array([100.0, 110.0], dtype=np.float64), 1)
compute_market_agreement_nb(np.array([0.1, -0.05], dtype=np.float64), 2)
compute_cascade_signal_nb(np.array([-0.05, -0.15], dtype=np.float64), 2, -0.10)
print(f" JIT: {time.time() - t0c:.1f}s")
# Non-jitted project imports (engine, circuit breaker, MC forewarner).
from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine
from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker
from mc.mc_ml import DolphinForewarner
# ── Config (identical to main test) ────────────────────────────────────────────
# Local path to the cached per-day parquet snapshots (one file per trading day).
VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache")
# Columns in the parquet files that are NOT asset price series; everything else
# is treated as an asset column by the per-day loaders below.
META_COLS = {'timestamp', 'scan_number', 'v50_lambda_max_velocity', 'v150_lambda_max_velocity',
             'v300_lambda_max_velocity', 'v750_lambda_max_velocity', 'vel_div',
             'instability_50', 'instability_150'}
# Champion engine parameters. Per the module docstring these must stay identical
# to the main test so the trades reconstructed here match the champion run.
ENGINE_KWARGS = dict(
    initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05,
    min_leverage=0.5, max_leverage=5.0, leverage_convexity=3.0,
    fraction=0.20, fixed_tp_pct=0.0099, stop_pct=1.0, max_hold_bars=120,
    use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75,
    dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5,
    use_asset_selection=True, min_irp_alignment=0.45,
    use_sp_fees=True, use_sp_slippage=True,
    sp_maker_entry_rate=0.62, sp_maker_exit_rate=0.50,
    use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40,
    lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42,
)
# MC-Forewarner model directory and its base trial config. The values here
# mirror ENGINE_KWARGS plus MC/ACB-specific knobs — keep the overlapping
# entries in sync with ENGINE_KWARGS above.
MC_MODELS_DIR = str(Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\nautilus_dolphin\mc_results\models"))
MC_BASE_CFG = {
    'trial_id': 0, 'vel_div_threshold': -0.020, 'vel_div_extreme': -0.050,
    'use_direction_confirm': True, 'dc_lookback_bars': 7,
    'dc_min_magnitude_bps': 0.75, 'dc_skip_contradicts': True,
    'dc_leverage_boost': 1.00, 'dc_leverage_reduce': 0.50,
    'vd_trend_lookback': 10, 'min_leverage': 0.50, 'max_leverage': 5.00,
    'leverage_convexity': 3.00, 'fraction': 0.20,
    'use_alpha_layers': True, 'use_dynamic_leverage': True,
    'fixed_tp_pct': 0.0099, 'stop_pct': 1.00, 'max_hold_bars': 120,
    'use_sp_fees': True, 'use_sp_slippage': True,
    'sp_maker_entry_rate': 0.62, 'sp_maker_exit_rate': 0.50,
    'use_ob_edge': True, 'ob_edge_bps': 5.00, 'ob_confirm_rate': 0.40,
    'ob_imbalance_bias': -0.09, 'ob_depth_scale': 1.00,
    'use_asset_selection': True, 'min_irp_alignment': 0.45, 'lookback': 100,
    'acb_beta_high': 0.80, 'acb_beta_low': 0.20, 'acb_w750_threshold_pct': 60,
}
# ── Setup ───────────────────────────────────────────────────────────────────────
print("\nLoading MC-Forewarner...")
forewarner = DolphinForewarner(models_dir=MC_MODELS_DIR)
print(" OK")
# All daily parquet snapshots, excluding nautilus catalog artifacts.
parquet_files = sorted(VBT_DIR.glob("*.parquet"))
parquet_files = [p for p in parquet_files if 'catalog' not in str(p)]
acb = AdaptiveCircuitBreaker()
date_strings = [pf.stem for pf in parquet_files]
acb.preload_w750(date_strings)
# Estimate the 60th percentile of BTC rolling return-volatility from the first
# two days only (cheap proxy; the same 50-bar window is recomputed per-day below).
all_vols = []
for pf in parquet_files[:2]:
    df = pd.read_parquet(pf)
    if 'BTCUSDT' not in df.columns:
        continue
    pr = df['BTCUSDT'].values
    for i in range(60, len(pr)):
        seg = pr[max(0, i - 50):i]
        if len(seg) < 10:
            continue
        v = float(np.std(np.diff(seg) / seg[:-1]))
        if v > 0:
            all_vols.append(v)
# Fail fast with a clear message instead of np.percentile's cryptic error on
# an empty sample (e.g. if the first two files lack a BTCUSDT column).
if not all_vols:
    raise RuntimeError("No BTCUSDT volatility samples in the first two parquet "
                       "files; cannot compute vol_p60.")
vol_p60 = float(np.percentile(all_vols, 60))
# Preload every day once: pq_data[date] = (raw DataFrame, asset columns,
# per-bar BTC rolling volatility array aligned to the rows; NaN where undefined).
pq_data = {}
for pf in parquet_files:
    df = pd.read_parquet(pf)
    # Asset price columns = everything that is not a known meta column.
    ac = [c for c in df.columns if c not in META_COLS]
    bp = df['BTCUSDT'].values if 'BTCUSDT' in df.columns else None
    dv = np.full(len(df), np.nan)
    if bp is not None:
        # Rolling std of 50-bar simple returns; first 50 bars (and windows with
        # <10 points) remain NaN.
        for i in range(50, len(bp)):
            seg = bp[max(0, i-50):i]
            if len(seg) < 10: continue
            dv[i] = float(np.std(np.diff(seg)/seg[:-1]))
    pq_data[pf.stem] = (df, ac, dv)
# Mock order-book provider with per-asset imbalance biases; the engine's OB
# edge layer reads features from this instead of live book data.
OB_ASSETS = ["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"]
_mock_ob = MockOBProvider(
    imbalance_bias=-0.09, depth_scale=1.0, assets=OB_ASSETS,
    imbalance_biases={"BTCUSDT": -0.086, "ETHUSDT": -0.092,
                      "BNBUSDT": +0.05, "SOLUSDT": +0.05},
)
ob_eng = OBFeatureEngine(_mock_ob)
ob_eng.preload_date("mock", OB_ASSETS)
# ── Build global_bar_idx → (date_str, row_idx) mapping ─────────────────────────
# Mirrors the _global_bar_idx increment logic in process_day():
# every row increments the counter, including NaN-vd rows.
print("\nBuilding global bar index map...")
gbar_map = {}  # global_bar_idx → {'date': str, 'row': int, 'vel_div': float}
g = 0
for pf in parquet_files:
    ds = pf.stem
    df, _, _ = pq_data[ds]
    # Read the vel_div column once as a float array instead of per-row
    # df.iloc[ri] (which builds a full Series per row — O(rows × cols)).
    if 'vel_div' in df.columns:
        vd_col = pd.to_numeric(df['vel_div'], errors='coerce').to_numpy(dtype=np.float64)
        # Match the original np.isfinite filter: ±inf also becomes NaN.
        vd_col = np.where(np.isfinite(vd_col), vd_col, np.nan)
    else:
        vd_col = np.full(len(df), np.nan)
    for ri in range(len(df)):
        gbar_map[g] = {'date': ds, 'row': ri, 'vel_div': float(vd_col[ri])}
        g += 1
print(f" Total global bars: {g}")
# ── Engine run (champion exits, vd_enabled=False) ──────────────────────────────
# One full pass over all days with the champion configuration; vel_div exits are
# not active in this run (per the banner/module docstring — the trajectories are
# scored post-hoc instead).
print(f"\n=== Champion run (vd_enabled=False) ===")
t0 = time.time()
engine = NDAlphaEngine(**ENGINE_KWARGS)
engine.set_ob_engine(ob_eng)
engine.set_acb(acb)
engine.set_mc_forewarner(forewarner, MC_BASE_CFG)
engine.set_esoteric_hazard_multiplier(0.0)  # 0.0 neutralizes the esoteric hazard layer for a clean baseline
for pf in parquet_files:
    ds = pf.stem
    df, acols, dvol = pq_data[ds]
    # A bar is "vol regime OK" only where rolling vol is finite AND above the p60 cut.
    vol_ok = np.where(np.isfinite(dvol), dvol > vol_p60, False)
    engine.process_day(ds, df, acols, vol_regime_ok=vol_ok)
tr = engine.trade_history
elapsed = time.time() - t0
print(f" Done: {elapsed:.1f}s | {len(tr)} trades | ROI={(engine.capital-25000)/25000*100:+.2f}%")
# ── Post-hoc: reconstruct vel_div sequence per trade ───────────────────────────
print("\nReconstructing vel_div trajectories per trade...")
trade_records = []
for trade in tr:
    # Gather every finite vel_div observed while the trade was open
    # (entry_bar through exit_bar, inclusive).
    seq = []
    for bar in range(trade.entry_bar, trade.exit_bar + 1):
        bar_info = gbar_map.get(bar)
        if bar_info and not np.isnan(bar_info['vel_div']):
            seq.append(bar_info['vel_div'])
    if not seq:
        continue  # no usable vel_div data for this trade
    arr = np.asarray(seq)
    # entry_vel_div not stored in NDTradeRecord — look it up from gbar_map
    entry_info = gbar_map.get(trade.entry_bar)
    trade_records.append({
        'trade_id': trade.trade_id,
        'asset': trade.asset,
        'exit_reason': trade.exit_reason,
        'bars_held': trade.bars_held,
        'pnl_pct': trade.pnl_pct * 100,
        'pnl_abs': trade.pnl_absolute,
        'leverage': trade.leverage,
        'is_winner': trade.pnl_absolute > 0,
        'entry_vd': entry_info['vel_div'] if entry_info else np.nan,
        'exit_vd': arr[-1],
        'max_vd': float(np.max(arr)),
        'min_vd': float(np.min(arr)),
        'mean_vd': float(np.mean(arr)),
        'pct_bars_positive_vd': float(np.mean(arr > 0) * 100),
        'n_vd_bars': len(arr),
        'vd_seq': seq,
    })
print(f" {len(trade_records)} trades with vel_div data")
# ── Threshold sweep ──────────────────────────────────────────────────────────────
# For each (threshold, debounce): compute separation metrics.
# Sweep grid: threshold = vel_div level that must be exceeded; debounce = number
# of consecutive bars it must stay above that level before a trigger counts.
THRESHOLDS = [0.0, 0.005, 0.01, 0.015, 0.02, 0.025, 0.03, 0.04, 0.05, 0.07, 0.10, 0.15, 0.20]
DEBOUNCE = [1, 2, 3, 5, 10, 20]
print(f"\nRunning threshold sweep ({len(THRESHOLDS)} thresholds × {len(DEBOUNCE)} debounces)...")
def first_sustain(vd_seq, threshold, n_consec):
    """Return the index where vd_seq first sustains n_consec consecutive
    values strictly above threshold (the index of the first bar of that
    run), or -1 if no such run ever completes."""
    streak = 0
    for idx, val in enumerate(vd_seq):
        if val > threshold:
            streak += 1
            if streak >= n_consec:
                # The completed run began n_consec-1 bars before idx.
                return idx - n_consec + 1
        else:
            streak = 0
    return -1
sweep_rows = []
# Loop-invariant totals — these depend only on trade_records, not on
# (threshold, debounce), so compute them once instead of per combo.
n_tot = len(trade_records)
n_losers = sum(1 for r in trade_records if not r['is_winner'])
n_winners = n_tot - n_losers
for thr in THRESHOLDS:
    for deb in DEBOUNCE:
        if n_tot == 0:
            continue  # no trades at all → nothing to score
        triggered = []      # trades where vel_div sustained above thr for deb bars
        not_triggered = []  # trades the early-exit rule would leave alone
        for rec in trade_records:
            tb = first_sustain(rec['vd_seq'], thr, deb)
            if tb >= 0:
                triggered.append({
                    'is_winner': rec['is_winner'],
                    'bars_held': rec['bars_held'],
                    'pnl_pct': rec['pnl_pct'],
                    'trigger_bar': tb,
                    'bars_saved': rec['bars_held'] - tb,
                })
            else:
                not_triggered.append({
                    'is_winner': rec['is_winner'],
                    'pnl_pct': rec['pnl_pct'],
                })
        n_trig = len(triggered)
        n_ntrig = len(not_triggered)
        # Triggered-side stats
        trig_losers = sum(1 for r in triggered if not r['is_winner'])
        trig_winners = n_trig - trig_losers
        wr_trig = trig_winners / n_trig * 100 if n_trig > 0 else 0.0
        avg_pnl_trig = float(np.mean([r['pnl_pct'] for r in triggered])) if triggered else 0.0
        # Guard the FILTERED list, not `triggered`: np.mean([]) returns NaN with
        # a RuntimeWarning when every triggered trade has bars_saved <= 0.
        saved = [r['bars_saved'] for r in triggered if r['bars_saved'] > 0]
        avg_bars_saved = float(np.mean(saved)) if saved else 0.0
        # Not-triggered-side stats (unused ntrig_losers local removed)
        wr_ntrig = sum(1 for r in not_triggered if r['is_winner']) / n_ntrig * 100 if n_ntrig > 0 else 0.0
        avg_pnl_ntrig = float(np.mean([r['pnl_pct'] for r in not_triggered])) if not_triggered else 0.0
        # Sensitivity: % of true losers captured by the trigger (want HIGH)
        sensitivity = trig_losers / n_losers * 100 if n_losers > 0 else 0.0
        # Specificity: % of true winners the trigger leaves alone (want HIGH)
        specificity = (n_winners - trig_winners) / n_winners * 100 if n_winners > 0 else 0.0
        # Precision: % of triggered trades that really are losers
        precision = trig_losers / n_trig * 100 if n_trig > 0 else 0.0
        # Separation: win-rate gap between untouched and triggered sets (want HIGH)
        separation = wr_ntrig - wr_trig
        sweep_rows.append({
            'threshold': thr, 'debounce': deb,
            'pct_triggered': round(n_trig / n_tot * 100, 2),
            'sensitivity': round(sensitivity, 2),
            'specificity': round(specificity, 2),
            'precision': round(precision, 2),
            'separation': round(separation, 2),
            'wr_triggered': round(wr_trig, 2),
            'wr_not_triggered': round(wr_ntrig, 2),
            'avg_pnl_triggered': round(avg_pnl_trig, 4),
            'avg_pnl_not_triggered': round(avg_pnl_ntrig, 4),
            'avg_bars_saved': round(avg_bars_saved, 1),
            'n_triggered': n_trig,
            'n_not_triggered': n_ntrig,
        })
print(f" Sweep complete ({len(sweep_rows)} combos)")
# ── Vel_div distribution summary ────────────────────────────────────────────────
winners = [r for r in trade_records if r['is_winner']]
losers = [r for r in trade_records if not r['is_winner']]
def pct_positive_summary(records, label):
    """Print one summary line for a group of trade records: the distribution
    of %-of-bars-with-positive-vel_div and of the per-trade max vel_div.
    No-op for an empty group."""
    if not records:
        return
    positive_pcts = [rec['pct_bars_positive_vd'] for rec in records]
    peak_vds = [rec['max_vd'] for rec in records]
    print(f" {label:8s} n={len(records):5d} "
          f"pct_bars_vd>0: mean={np.mean(positive_pcts):5.1f}% p25={np.percentile(positive_pcts,25):5.1f}% p75={np.percentile(positive_pcts,75):5.1f}% "
          f"max_vd: mean={np.mean(peak_vds):+.4f} p50={np.percentile(peak_vds,50):+.4f} p90={np.percentile(peak_vds,90):+.4f}")
# Separator glyph restored: the literal arrived as '' (the Unicode rule char was
# stripped in transit), so ''*70 printed nothing. '─' matches the file's section
# comments; confirm against the original source if a different glyph was used.
print(f"\n{'─'*70}")
print(f" VEL_DIV DISTRIBUTION DURING OPEN TRADES")
print(f"{'─'*70}")
pct_positive_summary(winners, "WINNERS")
pct_positive_summary(losers, "LOSERS")
# NOTE(review): these divide by len(trade_records) — would raise ZeroDivisionError
# if the run produced no trades with vel_div data.
print(f" Overall: % trades where vel_div EVER crosses 0: "
      f"{sum(1 for r in trade_records if r['max_vd'] > 0)/len(trade_records)*100:.1f}%")
print(f" Overall: % trades where vel_div > +0.02 ever: "
      f"{sum(1 for r in trade_records if r['max_vd'] > 0.02)/len(trade_records)*100:.1f}%")
print(f" Overall: % trades where vel_div > +0.05 ever: "
      f"{sum(1 for r in trade_records if r['max_vd'] > 0.05)/len(trade_records)*100:.1f}%")
# ASCII histogram of max_vd distribution (winners vs losers)
def ascii_hist(values, lo, hi, bins=16, label=""):
    """Print a fixed-width text histogram of *values* over [lo, hi].

    Bug fix: the bar literal had degraded to '' (the '█' glyph was stripped
    in transit), so ``'' * n`` rendered every bar blank; restored to '█'.

    Args:
        values: sequence of floats to histogram.
        lo, hi: inclusive range covered by the bins.
        bins:   number of equal-width bins.
        label:  heading printed above the histogram.
    """
    edges = np.linspace(lo, hi, bins + 1)
    counts, _ = np.histogram(values, bins=edges)
    total = max(1, len(values))    # avoid /0 for an empty sample (prints n=1)
    bar_max = max(1, max(counts))  # avoid /0 when every bin is empty
    width = 20                     # longest bar, in characters
    print(f"\n {label} (n={total})")
    for i in range(bins):
        bar = '█' * int(counts[i] / bar_max * width)
        pct = counts[i] / total * 100
        print(f" [{edges[i]:+.3f},{edges[i+1]:+.3f}) {bar:<{width}} {pct:4.1f}%")
ascii_hist([r['max_vd'] for r in winners], -0.15, 0.20, bins=14, label="WINNERS max vel_div during trade")
ascii_hist([r['max_vd'] for r in losers], -0.15, 0.20, bins=14, label="LOSERS max vel_div during trade")
# ── Print sweep results: top by separation ──────────────────────────────────────
# Separator glyphs restored ('─'): the original literals were stripped to '' in
# transit, so ''*80 printed empty lines.
print(f"\n{'─'*80}")
print(f" THRESHOLD SWEEP — TOP 20 BY SEPARATION SCORE")
print(f" (separation = WR_not_triggered - WR_triggered; higher = better loser/winner split)")
print(f"{'─'*80}")
print(f" {'thr':>7} {'deb':>4} {'%trig':>6} {'sens%':>6} {'spec%':>6} {'prec%':>6} {'sep':>6} "
      f"{'wr_t%':>6} {'wr_nt%':>7} {'bars_sv':>8} {'avg_pnl_t':>10}")
print(f"{'─'*80}")
top20 = sorted(sweep_rows, key=lambda x: x['separation'], reverse=True)[:20]
for r in top20:
    print(f" {r['threshold']:7.3f} {r['debounce']:4d} {r['pct_triggered']:6.1f} "
          f"{r['sensitivity']:6.1f} {r['specificity']:6.1f} {r['precision']:6.1f} "
          f"{r['separation']:6.2f} {r['wr_triggered']:6.1f} {r['wr_not_triggered']:7.1f} "
          f"{r['avg_bars_saved']:8.1f} {r['avg_pnl_triggered']:10.4f}")
# Also print the Pareto frontier: best separation at each pct_triggered level
print(f"\n{'─'*80}")
print(f" PARETO FRONTIER (best separation per coverage bracket)")
print(f"{'─'*80}")
brackets = [(0, 5), (5, 10), (10, 20), (20, 30), (30, 50), (50, 100)]
for lo, hi in brackets:
    subset = [r for r in sweep_rows if lo <= r['pct_triggered'] < hi]
    if not subset:
        continue
    best = max(subset, key=lambda x: x['separation'])
    print(f" Coverage {lo:3d}-{hi:3d}%: thr={best['threshold']:.3f} deb={best['debounce']} "
          f"sep={best['separation']:.2f} sens={best['sensitivity']:.1f}% spec={best['specificity']:.1f}% "
          f"prec={best['precision']:.1f}% bars_saved={best['avg_bars_saved']:.1f}")
# ── Save logs ───────────────────────────────────────────────────────────────────
LOG_DIR = Path(__file__).parent / "run_logs"
LOG_DIR.mkdir(exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
# Per-trade CSV (without raw vd_seq — too large). Floats are pre-formatted to
# fixed precision to keep the file compact and diff-friendly.
calib_path = LOG_DIR / f"vd_calib_{ts}.csv"
with open(calib_path, 'w', newline='') as f:
    cw = csv.writer(f)
    cw.writerow(['trade_id','asset','exit_reason','bars_held','pnl_pct','pnl_abs',
                 'leverage','is_winner','entry_vd','exit_vd','max_vd','min_vd','mean_vd',
                 'pct_bars_positive_vd','n_vd_bars'])
    for r in trade_records:
        cw.writerow([r['trade_id'], r['asset'], r['exit_reason'], r['bars_held'],
                     f"{r['pnl_pct']:.6f}", f"{r['pnl_abs']:.4f}", f"{r['leverage']:.4f}",
                     int(r['is_winner']), f"{r['entry_vd']:.6f}", f"{r['exit_vd']:.6f}",
                     f"{r['max_vd']:.6f}", f"{r['min_vd']:.6f}", f"{r['mean_vd']:.6f}",
                     f"{r['pct_bars_positive_vd']:.2f}", r['n_vd_bars']])
# Sweep CSV. The sweep rows are flat dicts whose keys equal the header fields,
# so DictWriter keeps the header and the row order definitionally in sync.
sweep_path = LOG_DIR / f"vd_sweep_{ts}.csv"
SWEEP_FIELDS = ['threshold', 'debounce', 'pct_triggered', 'sensitivity', 'specificity',
                'precision', 'separation', 'wr_triggered', 'wr_not_triggered',
                'avg_pnl_triggered', 'avg_pnl_not_triggered', 'avg_bars_saved',
                'n_triggered', 'n_not_triggered']
with open(sweep_path, 'w', newline='') as f:
    dw = csv.DictWriter(f, fieldnames=SWEEP_FIELDS)
    dw.writeheader()
    dw.writerows(sweep_rows)
# Separator glyph restored — the literal was stripped to '' in transit.
print(f"\n{'─'*80}")
print(f" per-trade → {calib_path} ({len(trade_records)} rows)")
print(f" sweep → {sweep_path} ({len(sweep_rows)} rows)")
print(f"{'─'*80}")