"""MacroPostureSwitcher — Live Stack Test (5y Klines, READ-ONLY) ================================================================ Injects MacroPostureSwitcher as a per-day gate into the EXACT same full stack used by test_pf_5y_klines.py. Changes vs baseline: 1. Pass 0: compute prev-day rvol + btc_ret (lag-1, no lookahead) 2. MacroPostureSwitcher.decide() per day from ACB ExF factors + prev rvol 3. NONE days: skip engine.process_day() entirely 4. SHORT/LONG days: run normally (engine is SHORT-only, LONG treated as SHORT) 5. Results saved separately — original test_pf_5y_klines.py is NOT modified Purpose: quantify how much the posture gate improves the 5y full-stack result. """ import sys, time, math, json, csv, gc sys.stdout.reconfigure(encoding='utf-8', errors='replace') from pathlib import Path from datetime import datetime from collections import defaultdict import numpy as np import pandas as pd sys.path.insert(0, str(Path(__file__).parent)) print("Compiling numba kernels...") t0c = time.time() from nautilus_dolphin.nautilus.alpha_asset_selector import compute_irp_nb, compute_ars_nb, rank_assets_irp_nb from nautilus_dolphin.nautilus.alpha_bet_sizer import compute_sizing_nb from nautilus_dolphin.nautilus.alpha_signal_generator import check_dc_nb from nautilus_dolphin.nautilus.ob_features import ( OBFeatureEngine, compute_imbalance_nb, compute_depth_1pct_nb, compute_depth_quality_nb, compute_fill_probability_nb, compute_spread_proxy_nb, compute_depth_asymmetry_nb, compute_imbalance_persistence_nb, compute_withdrawal_velocity_nb, compute_market_agreement_nb, compute_cascade_signal_nb, ) from nautilus_dolphin.nautilus.ob_provider import MockOBProvider _p = np.array([1.0, 2.0, 3.0], dtype=np.float64) compute_irp_nb(_p, -1); compute_ars_nb(1.0, 0.5, 0.01) rank_assets_irp_nb(np.ones((10, 2), dtype=np.float64), 8, -1, 5, 500.0, 20, 0.20) compute_sizing_nb(-0.03, -0.02, -0.05, 3.0, 0.5, 5.0, 0.20, True, True, 0.0, np.zeros(4, dtype=np.int64), np.zeros(4, dtype=np.int64), np.zeros(5, dtype=np.float64), 0, -1, 0.01, 0.04) check_dc_nb(_p, 3, 1, 0.75) _b = np.array([100.0, 200.0, 300.0, 400.0, 500.0], dtype=np.float64) _a = np.array([110.0, 190.0, 310.0, 390.0, 510.0], dtype=np.float64) compute_imbalance_nb(_b, _a); compute_depth_1pct_nb(_b, _a) compute_depth_quality_nb(210.0, 200.0); compute_fill_probability_nb(1.0) compute_spread_proxy_nb(_b, _a); compute_depth_asymmetry_nb(_b, _a) compute_imbalance_persistence_nb(np.array([0.1, -0.1], dtype=np.float64), 2) compute_withdrawal_velocity_nb(np.array([100.0, 110.0], dtype=np.float64), 1) compute_market_agreement_nb(np.array([0.1, -0.05], dtype=np.float64), 2) compute_cascade_signal_nb(np.array([-0.05, -0.15], dtype=np.float64), 2, -0.10) print(f" JIT: {time.time() - t0c:.1f}s") from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker from nautilus_dolphin.nautilus.macro_posture_switcher import MacroPostureSwitcher, Posture from mc.mc_ml import DolphinForewarner VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache_klines") LOG_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\nautilus_dolphin\run_logs") META_COLS = {'timestamp', 'scan_number', 'v50_lambda_max_velocity', 'v150_lambda_max_velocity', 'v300_lambda_max_velocity', 'v750_lambda_max_velocity', 'vel_div', 'instability_50', 'instability_150'} ENGINE_KWARGS = dict( initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05, min_leverage=0.5, max_leverage=5.0, leverage_convexity=3.0, fraction=0.20, fixed_tp_pct=0.0095, stop_pct=1.0, max_hold_bars=120, use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75, dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5, use_asset_selection=True, min_irp_alignment=0.45, use_sp_fees=True, use_sp_slippage=True, sp_maker_entry_rate=0.62, sp_maker_exit_rate=0.50, use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40, lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42, ) MC_MODELS_DIR = str(Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\nautilus_dolphin\mc_results\models")) MC_BASE_CFG = { 'trial_id': 0, 'vel_div_threshold': -0.020, 'vel_div_extreme': -0.050, 'use_direction_confirm': True, 'dc_lookback_bars': 7, 'dc_min_magnitude_bps': 0.75, 'dc_skip_contradicts': True, 'dc_leverage_boost': 1.00, 'dc_leverage_reduce': 0.50, 'vd_trend_lookback': 10, 'min_leverage': 0.50, 'max_leverage': 5.00, 'leverage_convexity': 3.00, 'fraction': 0.20, 'use_alpha_layers': True, 'use_dynamic_leverage': True, 'fixed_tp_pct': 0.0095, 'stop_pct': 1.00, 'max_hold_bars': 120, 'use_sp_fees': True, 'use_sp_slippage': True, 'sp_maker_entry_rate': 0.62, 'sp_maker_exit_rate': 0.50, 'use_ob_edge': True, 'ob_edge_bps': 5.00, 'ob_confirm_rate': 0.40, 'ob_imbalance_bias': -0.09, 'ob_depth_scale': 1.00, 'use_asset_selection': True, 'min_irp_alignment': 0.45, 'lookback': 100, 'acb_beta_high': 0.80, 'acb_beta_low': 0.20, 'acb_w750_threshold_pct': 60, } # ── MacroPostureSwitcher ─────────────────────────────────────────────────────── # 1m-calibrated thresholds (default dvol_none_below=47.5 already validated) switcher = MacroPostureSwitcher( enable_long_posture=True, # Use 1m rvol thresholds (klines data) rvol_pause_thresh=0.000455, # 1m Q1 rvol_strong_thresh=0.000815, # 1m Q3 dvol_none_below=47.5, # validated — removes dvol Q1 losing days ) print(f"\nMacroPostureSwitcher: dvol_none_below={switcher.dvol_none_below} " f"rvol_pause={switcher.rvol_pause_thresh}") print("\nLoading MC-Forewarner trained models...") try: forewarner = DolphinForewarner(models_dir=MC_MODELS_DIR) use_mc = True print(" MC-Forewarner ready") except Exception as e: print(f" WARNING: MC-Forewarner failed: {e} — disabled") forewarner = None use_mc = False parquet_files = sorted(VBT_DIR.glob("*.parquet")) parquet_files = [p for p in parquet_files if 'catalog' not in str(p)] total_files = len(parquet_files) date_strings = [pf.stem for pf in parquet_files] print(f"\nLoaded {total_files} parquet files from vbt_cache_klines/") # ── Pass 0: prev-day rvol and btc_ret (lag-1, no lookahead) ────────────────── print("\nPass 0: computing prev-day rvol + btc_ret...") t0p = time.time() day_rvol = {} day_btcret = {} for pf in parquet_files: ds = pf.stem try: df = pd.read_parquet(pf, columns=['BTCUSDT']) except Exception: continue btc = df['BTCUSDT'].values.astype(np.float64) btc = btc[np.isfinite(btc) & (btc > 0)] if len(btc) < 2: continue log_r = np.diff(np.log(btc)) day_rvol[ds] = float(np.std(log_r)) day_btcret[ds] = float((btc[-1] - btc[0]) / btc[0]) dates_sorted = sorted(day_rvol.keys()) prev_rvol = {d: day_rvol.get(dates_sorted[i-1]) if i > 0 else None for i, d in enumerate(dates_sorted)} prev_btcret = {d: day_btcret.get(dates_sorted[i-1]) if i > 0 else None for i, d in enumerate(dates_sorted)} print(f" Pass 0 done: {time.time()-t0p:.1f}s ({len(day_rvol)} dates with rvol)") # ── Setup (identical to test_pf_5y_klines.py) ──────────────────────────────── print(f"\nScanning {min(100, total_files)} parquet files for asset universe...") all_assets = set() for pf in parquet_files[:100]: df_cols = pd.read_parquet(pf, columns=[]) all_assets.update([c for c in df_cols.columns if c not in META_COLS]) print(f" Total assets: {len(all_assets)}") print("\nInitializing ACB v6 + pre-populating w750...") acb = AdaptiveCircuitBreaker() t_w750 = time.time() w750_loaded = 0 for pf in parquet_files: ds = pf.stem try: df_w = pd.read_parquet(pf, columns=['v750_lambda_max_velocity']) vals = df_w['v750_lambda_max_velocity'].dropna().values if len(vals) > 0: acb._w750_vel_cache[ds] = float(np.median(vals)) w750_loaded += 1 else: acb._w750_vel_cache[ds] = 0.0 except Exception: acb._w750_vel_cache[ds] = 0.0 acb.preload_w750(date_strings) print(f" w750: {w750_loaded}/{total_files} dates threshold={acb._w750_threshold:.6f} ({time.time()-t_w750:.1f}s)") print("\nComputing vol_p60...") all_vols = [] for pf in parquet_files[:5]: df = pd.read_parquet(pf) if 'BTCUSDT' not in df.columns: continue pr = df['BTCUSDT'].values for j in range(50, len(pr)): seg = pr[max(0, j-50):j] if len(seg) < 10: continue v = float(np.std(np.diff(seg) / seg[:-1])) if v > 0: all_vols.append(v) vol_p60 = float(np.percentile(all_vols, 60)) print(f" vol_p60: {vol_p60:.6f}") OB_ASSETS = sorted(list(all_assets)) _mock_ob = MockOBProvider( imbalance_bias=-0.09, depth_scale=1.0, assets=OB_ASSETS, imbalance_biases={"BTCUSDT": -0.086, "ETHUSDT": -0.092, "BNBUSDT": +0.05, "SOLUSDT": +0.05}, ) ob_eng = OBFeatureEngine(_mock_ob) ob_eng.preload_date("mock", OB_ASSETS) # ── Main run ────────────────────────────────────────────────────────────────── print(f"\n=== MacroPostureSwitcher + Full Stack — 5y Klines ===") print(f"=== dvol_none_below=47.5 rvol_pause=0.000455 ===") t0 = time.time() engine = NDAlphaEngine(**ENGINE_KWARGS) engine.set_ob_engine(ob_eng) engine.set_acb(acb) if use_mc: engine.set_mc_forewarner(forewarner, MC_BASE_CFG) engine.set_esoteric_hazard_multiplier(0.0) engine._bar_log_enabled = False dstats = [] year_trades = defaultdict(list) posture_log = [] # per-day posture decisions paused_days = 0 active_days = 0 for i, pf in enumerate(parquet_files): ds = pf.stem year = ds[:4] # ── MacroPostureSwitcher gate ── pr = prev_rvol.get(ds) pb = prev_btcret.get(ds) exf = acb._load_external_factors(ds) # dvol_btc, fng, funding_btc from NPZ decision = switcher.decide( dvol_btc = exf.get('dvol_btc', 50.0), fng = exf.get('fng', 50.0), funding_btc = exf.get('funding_btc', 0.0), realized_vol = pr, btc_day_return = pb, ) posture_log.append({ 'date': ds, 'year': year, 'posture': decision.posture.value, 'fear': round(decision.fear_score, 3), 'dvol': round(exf.get('dvol_btc', 50.0), 1), 'fng': round(exf.get('fng', 50.0), 1), 'size_mult': round(decision.size_mult, 3), 'prev_rvol': round(pr, 7) if pr else None, }) if decision.posture == Posture.NONE: paused_days += 1 continue active_days += 1 df = pd.read_parquet(pf) acols = [c for c in df.columns if c not in META_COLS] bp = df['BTCUSDT'].values if 'BTCUSDT' in df.columns else None dvol = np.full(len(df), np.nan) if bp is not None: for j in range(50, len(bp)): seg = bp[max(0, j-50):j] if len(seg) < 10: continue dvol[j] = float(np.std(np.diff(seg) / seg[:-1])) vol_ok = np.where(np.isfinite(dvol), dvol > vol_p60, False) n_before = len(engine.trade_history) stats = engine.process_day(ds, df, acols, vol_regime_ok=vol_ok) dstats.append({**stats, 'cap': engine.capital, 'date': ds, 'year': year, 'posture': decision.posture.value, 'fear': decision.fear_score}) year_trades[year].extend(engine.trade_history[n_before:]) del df, bp, dvol, vol_ok if (i + 1) % 100 == 0: gc.collect() if (i + 1) % 100 == 0: elapsed = time.time() - t0 rate = active_days / elapsed if elapsed > 0 else 1 print(f" [{i+1}/{total_files}] {ds} active={active_days} paused={paused_days} " f"cap=${engine.capital:,.0f} {elapsed/60:.1f}m") elapsed = time.time() - t0 # ── Metrics ─────────────────────────────────────────────────────────────────── tr = engine.trade_history w = [t for t in tr if t.pnl_absolute > 0] l = [t for t in tr if t.pnl_absolute <= 0] gw = sum(t.pnl_absolute for t in w) if w else 0 gl = abs(sum(t.pnl_absolute for t in l)) if l else 0 roi = (engine.capital - 25000) / 25000 * 100 pf_all = gw / gl if gl > 0 else 999 wr_all = len(w) / len(tr) * 100 if tr else 0.0 dr = [s['pnl'] / 25000 * 100 for s in dstats] sharpe = np.mean(dr) / np.std(dr) * np.sqrt(365) if np.std(dr) > 0 else 0 peak_cap = 25000.0 max_dd = 0.0 for s in dstats: peak_cap = max(peak_cap, s['cap']) dd = (peak_cap - s['cap']) / peak_cap * 100 max_dd = max(max_dd, dd) print(f"\n{'='*80}") print(f" MacroPostureSwitcher + Full Stack — 5y Klines") print(f" dvol_none_below=47.5 rvol_pause=0.000455 Runtime: {elapsed:.0f}s") print(f"{'='*80}") print(f" ROI: {roi:>+8.2f}%") print(f" Max DD: {max_dd:>8.2f}%") print(f" Sharpe: {sharpe:>8.3f}") print(f" PF: {pf_all:>8.4f}") print(f" WR: {wr_all:>8.2f}%") print(f" N trades: {len(tr):>8,}") print(f" Active: {active_days} Paused: {paused_days} / {total_files}") print(f" Capital: ${engine.capital:,.2f}") print(f"\n Per-year:") print(f" {'Year':<6} {'N':>6} {'WR%':>6} {'PF':>7} {'ROI%':>8}") print(f" {'-'*40}") for yr in sorted(year_trades.keys()): yt = year_trades[yr] if not yt: continue yw = [t for t in yt if t.pnl_absolute > 0] yl = [t for t in yt if t.pnl_absolute <= 0] ygw = sum(t.pnl_absolute for t in yw) ygl = abs(sum(t.pnl_absolute for t in yl)) ypf = ygw / ygl if ygl > 0 else 999.0 ywr = len(yw) / len(yt) * 100 yroi = sum(t.pnl_absolute for t in yt) / 25000 * 100 print(f" {yr:<6} {len(yt):>6,} {ywr:>6.2f}% {ypf:>7.4f} {yroi:>+8.2f}%") # ── Save ────────────────────────────────────────────────────────────────────── LOG_DIR.mkdir(exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S") posture_csv = LOG_DIR / f"posture_stack_postures_{ts}.csv" if posture_log: with open(posture_csv, 'w', newline='') as f: w_ = csv.DictWriter(f, fieldnames=posture_log[0].keys()) w_.writeheader(); w_.writerows(posture_log) print(f"\n → {posture_csv}") summary = { 'mode': 'posture_5y_stack_test', 'ts': ts, 'runtime_s': round(elapsed, 1), 'dvol_none_below': 47.5, 'rvol_pause_thresh': 0.000455, 'roi_pct': round(roi, 4), 'max_dd_pct': round(max_dd, 4), 'sharpe': round(sharpe, 4), 'pf': round(pf_all, 4), 'wr_pct': round(wr_all, 3), 'n_trades': int(len(tr)), 'active_days': int(active_days), 'paused_days': int(paused_days), 'capital_final': round(engine.capital, 2), 'per_year': {yr: { 'n': len(year_trades[yr]), 'wr': round(len([t for t in year_trades[yr] if t.pnl_absolute > 0]) / max(1, len(year_trades[yr])) * 100, 3), 'pf': round( sum(t.pnl_absolute for t in year_trades[yr] if t.pnl_absolute > 0) / max(1e-9, abs(sum(t.pnl_absolute for t in year_trades[yr] if t.pnl_absolute <= 0))), 4), } for yr in sorted(year_trades.keys()) if year_trades[yr]}, } sum_json = LOG_DIR / f"posture_stack_summary_{ts}.json" with open(sum_json, 'w') as f: json.dump(summary, f, indent=2) print(f" → {sum_json}") print(f"\n Runtime: {elapsed:.0f}s") print(f"\n BASELINE (no posture gate): ROI=-99.9% PF=0.902 WR=60.8% N=7597") print(f" THIS RUN (posture gated): ROI={roi:+.1f}% PF={pf_all:.3f} WR={wr_all:.1f}% N={len(tr):,}")