# DOLPHIN/nautilus_dolphin/run_hazard_validation.py

import sys, time
from pathlib import Path
import numpy as np
import pandas as pd
from scipy.stats import norm
sys.path.insert(0, str(Path(__file__).parent))
from nautilus_dolphin.nautilus.alpha_orchestrator import NDAlphaEngine
from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
from nautilus_dolphin.nautilus.ob_provider import MockOBProvider
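# Overview: this script replays the cached per-day parquet data through the 6.0x
# NDAlphaEngine configuration, aligns each day's realized return with the previous
# day's v50 / cross-correlation precursors, and then checks (sections 2-7 below)
# whether v50 spikes concentrate tail-loss days and whether tapering leverage to
# 5.0x on spike days improves the bootstrapped risk profile.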
def wilson_ci(k, n, confidence=0.95):
    """Wilson score confidence interval for a binomial proportion (k successes in n trials)."""
    if n == 0:
        return 0.0, 0.0
    z = norm.ppf(1 - (1 - confidence) / 2)
    p = k / n
    denominator = 1 + z**2 / n
    centre_adjusted_num = p + z**2 / (2 * n)
    adjusted_sd = np.sqrt((p * (1 - p) / n) + (z**2 / (4 * n**2)))
    lower_bound = (centre_adjusted_num - z * adjusted_sd) / denominator
    upper_bound = (centre_adjusted_num + z * adjusted_sd) / denominator
    return float(lower_bound), float(upper_bound)
VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache")
parquet_files = sorted(VBT_DIR.glob("*.parquet"))
parquet_files = [p for p in parquet_files if 'catalog' not in str(p)]
print("Loading data & extracting daily metrics...")
daily_metrics = []
for pf in parquet_files:
    ds = pf.stem
    df = pd.read_parquet(pf)
    v50_max = df['v50_lambda_max_velocity'].max() if 'v50_lambda_max_velocity' in df.columns else np.nan
    assets = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT']
    valid_assets = [a for a in assets if a in df.columns]
    if len(valid_assets) > 1:
        rets = df[valid_assets].pct_change().fillna(0)
        corr_matrix = rets.corr().values
        cross_corr = corr_matrix[np.triu_indices_from(corr_matrix, k=1)]
        max_cross_corr = np.nanmax(cross_corr)
    else:
        max_cross_corr = 0
    daily_metrics.append({
        'Date': ds,
        'v50_max': v50_max,
        'cross_corr_max': max_cross_corr,
    })
metrics_df = pd.DataFrame(daily_metrics).set_index('Date')
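# Shift the metrics forward by one day so the row indexed by date T carries the
# precursors observed on T-1; the hazard tables below therefore condition day-T
# returns on the prior day's v50 / cross-correlation readings.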
precursor_df = metrics_df.shift(1).dropna()
print("Running fast 6.0x trajectory...")
pq_data = {}
all_vols = []
for pf in parquet_files:
    df = pd.read_parquet(pf)
    ac = [c for c in df.columns if c not in {'timestamp', 'scan_number', 'v50_lambda_max_velocity',
                                             'v150_lambda_max_velocity', 'v300_lambda_max_velocity',
                                             'v750_lambda_max_velocity', 'vel_div',
                                             'instability_50', 'instability_150'}]
    dv = df['vel_div'].values if 'vel_div' in df.columns else np.zeros(len(df))
    pq_data[pf.stem] = (df, ac, dv)
acb = AdaptiveCircuitBreaker()
acb.preload_w750([pf.stem for pf in parquet_files])
mock = MockOBProvider(imbalance_bias=-0.09, depth_scale=1.0,
                      assets=["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"],
                      imbalance_biases={"BNBUSDT": 0.20, "SOLUSDT": 0.20})
ob_engine_inst = OBFeatureEngine(mock)
ob_engine_inst.preload_date("mock", mock.get_assets())
ENGINE_KWARGS = dict(
    initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05,
    min_leverage=0.5, max_leverage=6.0, leverage_convexity=3.0,
    fraction=0.20, fixed_tp_pct=0.0099, stop_pct=1.0, max_hold_bars=120,
    use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75,
    dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5,
    use_asset_selection=True, min_irp_alignment=0.45,
    use_sp_fees=True, use_sp_slippage=True,
    use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40,
    lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42,
)
engine = NDAlphaEngine(**ENGINE_KWARGS)
engine.set_ob_engine(ob_engine_inst)
daily_returns = {}
bar_idx = 0
all_vols_engine = []
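# Per-day replay: snapshot capital at the start of each day, apply the circuit-breaker
# boost for that date, feed every bar through the engine, and record the day's return
# as the relative change in engine capital.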
for pf in parquet_files:
    ds = pf.stem
    cs = engine.capital  # capital at the start of the day
    acb_info = acb.get_dynamic_boost_for_date(ds, ob_engine=ob_engine_inst)
    base_boost = acb_info['boost']
    beta = acb_info['beta']
    df, acols, dvol_raw = pq_data[ds]
    ph = {}  # rolling per-asset price histories for this day
    for ri in range(len(df)):
        row = df.iloc[ri]
        vd = dvol_raw[ri]
        if not np.isfinite(vd):
            bar_idx += 1
            continue
        prices = {}
        for ac in acols:
            p = row[ac]
            if p and p > 0 and np.isfinite(p):
                prices[ac] = float(p)
                if ac not in ph:
                    ph[ac] = []
                ph[ac].append(float(p))
                if len(ph[ac]) > 500:
                    ph[ac] = ph[ac][-200:]
        if not prices:
            bar_idx += 1
            continue
        # Volatility-regime gate: realized vol of the last 50 BTC bars vs its own running history.
        btc_hist = ph.get("BTCUSDT", [])
        engine_vrok = False
        if len(btc_hist) >= 50:
            seg = btc_hist[-50:]
            vd_eng = float(np.std(np.diff(seg) / np.array(seg[:-1])))
            all_vols_engine.append(vd_eng)
            if len(all_vols_engine) > 100:
                engine_vrok = vd_eng > np.percentile(all_vols_engine, 60)
        if beta > 0:
            # Convex sizing ramp between vel_div of -0.02 and -0.05 (cubed).
            ss = 0.0
            if vd < -0.02:
                raw = (-0.02 - float(vd)) / (-0.02 - (-0.05))
                ss = min(1.0, max(0.0, raw)) ** 3.0
            engine.regime_size_mult = base_boost * (1.0 + beta * ss)
        else:
            engine.regime_size_mult = base_boost
        engine.process_bar(bar_idx=bar_idx, vel_div=float(vd), prices=prices,
                           vol_regime_ok=engine_vrok, price_histories=ph)
        bar_idx += 1
    daily_returns[ds] = (engine.capital - cs) / cs if cs > 0 else 0
returns_df = pd.DataFrame.from_dict(daily_returns, orient='index', columns=['Return'])
merged = precursor_df.join(returns_df, how='inner')
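# Tail days are the worst decile of daily returns; everything below conditions
# P(tail) on the prior day's precursor readings in `merged`.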
threshold_pnl = merged['Return'].quantile(0.10)
merged['Is_Tail'] = merged['Return'] <= threshold_pnl
base_p_tail = merged['Is_Tail'].mean()
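# Bootstrap Monte Carlo: resample the observed daily returns with replacement into
# 1000 synthetic 252-day years, then summarize the annual return distribution
# (median, 5th percentile) and the max-drawdown distribution (P(DD >= 40%), median DD).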
def run_mc_sim(baseline_returns, periods=252, n_simulations=1000):
    np.random.seed(42)
    daily_returns = np.array(baseline_returns)
    simulated_returns = np.random.choice(daily_returns, size=(n_simulations, periods), replace=True)
    equity_curves = np.cumprod(1.0 + simulated_returns, axis=1)
    cagrs = (equity_curves[:, -1] - 1.0) * 100
    median_cagr = np.median(cagrs)
    p05_cagr = np.percentile(cagrs, 5)
    max_dds = np.zeros(n_simulations)
    for i in range(n_simulations):
        curve = equity_curves[i]
        peaks = np.maximum.accumulate(curve)
        drawdowns = (peaks - curve) / peaks
        max_dds[i] = np.max(drawdowns)
    prob_40dd = np.mean(max_dds >= 0.40) * 100
    med_max_dd = np.median(max_dds) * 100
    return median_cagr, p05_cagr, prob_40dd, med_max_dd
print("\n==========================================================================================")
print(" 2. CORE CONDITIONAL HAZARD CURVE (v50)")
print("==========================================================================================")
percentiles = [75, 85, 90, 95, 97.5, 99]
thresholds = [(p, merged['v50_max'].quantile(p/100.0)) for p in percentiles]
print(f"Baseline P(Tail) = {base_p_tail:.1%} (N={len(merged)}, Tail={merged['Is_Tail'].sum()})")
print(f"\n{'Threshold':<15} | {'N (>Thresh)':<12} | {'Tail Days':<10} | {'P(Tail|v50>X)':<15} | {'95% CI'}")
print("-" * 85)
for p, val in thresholds:
    subset = merged[merged['v50_max'] > val]
    n_days = len(subset)
    k_tails = subset['Is_Tail'].sum()
    p_tail = k_tails / n_days if n_days > 0 else 0
    ci_low, ci_high = wilson_ci(k_tails, n_days)
    print(f"v50 > {p:>4.1f}th | {n_days:<12} | {k_tails:<10} | {p_tail:>13.1%} | [{ci_low:.1%}, {ci_high:.1%}]")
print("\n==========================================================================================")
print(" 3. ECONOMIC VIABILITY TEST (Tapering 5.0x / 0.83x multiplier)")
print("==========================================================================================")
taper_mult = 5.0 / 6.0
print(f"{'Condition':<15} | {'Spike rMean':<12} | {'NonSpk rMean':<12} | {'StDev':<10} | {'Med CAGR':<15} | {'5th CAGR':<15} | {'P(>40% DD)':<12} | {'Med DD':<10}")
print("-" * 115)
base_mc = run_mc_sim(merged['Return'])
print(f"{'Baseline(6.0x)':<15} | {'-':<12} | {'-':<12} | {merged['Return'].std():<10.4f} | {base_mc[0]:>14.1f}% | {base_mc[1]:>14.1f}% | {base_mc[2]:>11.1f}% | {base_mc[3]:>8.1f}%")
for p, val in thresholds:
    is_spike = merged['v50_max'] > val
    spike_returns = merged[is_spike]['Return']
    nonspike_returns = merged[~is_spike]['Return']
    tapered_returns = merged['Return'].copy()
    tapered_returns[is_spike] = tapered_returns[is_spike] * taper_mult
    spike_mean = spike_returns.mean() if len(spike_returns) > 0 else 0
    non_mean = nonspike_returns.mean() if len(nonspike_returns) > 0 else 0
    std_taper = tapered_returns.std()
    mc = run_mc_sim(tapered_returns)
    print(f"v50 > {p:>4.1f}th | {spike_mean:>12.4f} | {non_mean:>12.4f} | {std_taper:<10.4f} | {mc[0]:>14.1f}% | {mc[1]:>14.1f}% | {mc[2]:>11.1f}% | {mc[3]:>8.1f}%")
print("\n==========================================================================================")
print(" 4. STABILITY / OVERFIT CHECK (Split Sample)")
print("==========================================================================================")
half = len(merged) // 2
df_h1 = merged.iloc[:half]
df_h2 = merged.iloc[half:]
def calc_curve(df_, name):
    print(f"\n{name} (N={len(df_)}):")
    b_tail = df_['Is_Tail'].mean()
    print(f"Baseline P(Tail): {b_tail:.1%}")
    for p in [75, 90, 95]:
        val = df_['v50_max'].quantile(p / 100.0)
        subset = df_[df_['v50_max'] > val]
        k = subset['Is_Tail'].sum()
        n = len(subset)
        if n > 0:
            print(f" v50 > {p}th : {k/n:.1%} (N={n})")
calc_curve(df_h1, "First 50%")
calc_curve(df_h2, "Second 50%")
print("\n==========================================================================================")
print(" 5. INTERACTION TEST (v50 & Cross-Corr)")
print("==========================================================================================")
v50_90 = merged['v50_max'].quantile(0.90)
cc_90 = merged['cross_corr_max'].quantile(0.90)
joint_cond = (merged['v50_max'] > v50_90) & (merged['cross_corr_max'] > cc_90)
j_subset = merged[joint_cond]
j_n = len(j_subset)
j_k = j_subset['Is_Tail'].sum()
j_p = j_k / j_n if j_n > 0 else 0
print(f"P(Tail | v50 > 90th AND cross_corr > 90th) = {j_p:.1%} (N={j_n}, Tails={j_k})")
print("\n==========================================================================================")
print(" 6. RANDOMIZATION SANITY CHECK")
print("==========================================================================================")
np.random.seed(42)
shuffled_merged = merged.copy()
shuffled_merged['Is_Tail'] = np.random.permutation(shuffled_merged['Is_Tail'].values)
print(f"{'Threshold':<15} | {'N (>Thresh)':<12} | {'Tail Days':<10} | {'P(Tail|v50>X) (SHUFFLED)':<25}")
for p, val in thresholds:
    subset = shuffled_merged[shuffled_merged['v50_max'] > val]
    n_days = len(subset)
    k_tails = subset['Is_Tail'].sum()
    p_tail = k_tails / n_days if n_days > 0 else 0
    print(f"v50 > {p:>4.1f}th | {n_days:<12} | {k_tails:<10} | {p_tail:>13.1%}")
print("\n==========================================================================================")
print(" 7. DECISION CRITERIA")
print("==========================================================================================")
p95_val = merged['v50_max'].quantile(0.95)
ss95 = merged[merged['v50_max'] > p95_val]
p_tail_95 = ss95['Is_Tail'].mean() if len(ss95)>0 else 0
freq_95 = len(ss95) / len(merged)
mc_95_taper = run_mc_sim(np.where(merged['v50_max'] > p95_val, merged['Return']*taper_mult, merged['Return']))
c1 = p_tail_95 >= 2.5 * base_p_tail
c2 = freq_95 <= 0.08
c3 = mc_95_taper[1] > base_mc[1]
print(f"1. P(Tail | >95th) >= 2.5x baseline? {c1} ({p_tail_95:.1%} vs {2.5*base_p_tail:.1%})")
print(f"2. Convex acceleration visible? Check manual.")
print(f"3. Spike freq <= 8%? {c2} ({freq_95:.1%} - Using 95th pctile is inherently <= 5%)")
print(f"4. Taper improves 5th pctile CAGR? {c3} ({mc_95_taper[1]:.1f}% vs {base_mc[1]:.1f}%)")
print("\nFINAL VERDICT:")
if c1 and c3:
    print("-> True convex hazard. Viable for taper layer.")
else:
    print("-> Failed criteria. Weak clustering or non-viability.")