Files
DOLPHIN/nautilus_dolphin/exf_regime_characterizer.py

310 lines
13 KiB
Python
Raw Permalink Normal View History

"""ExF-Driven Regime Characterizer
==================================
Reads the backfilled ExF NPZ files for all klines dates and builds a
daily regime score from: funding_btc, dvol_btc, fng, taker.
Then cross-tabulates:
1. ExF composite score vs vel_div SHORT/LONG edge
2. Each ExF indicator individually vs edge
3. Can ExF predict which DAYS/QUARTERS will have edge?
Also attempts to characterize "overall posture" (BEAR/NEUTRAL/BULL) from ExF
and validate against the year-level edge pattern we observed:
2021/22 (bear/vol) = high edge, 2023 (bull/low-vol) = near-zero edge.
Outputs:
run_logs/exf_regime_YYYYMMDD.csv per-date ExF values + edge
run_logs/exf_regime_YYYYMMDD.json summary correlations + regime bins
Console: correlation table + regime breakdown
"""
import sys, time, json, csv, gc
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
from pathlib import Path
from datetime import datetime, date
from collections import defaultdict
import numpy as np
import pandas as pd
VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache_klines")
EIGEN_PATH = Path(r"C:\Users\Lenovo\Documents\- Dolphin NG HD (NG3)\correlation_arb512\eigenvalues")
LOG_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\nautilus_dolphin\run_logs")
TP_BPS = 95
MAX_HOLD = 120
tp_pct = TP_BPS / 10000.0
SHORT_T = -0.020
LONG_T = +0.020
# ExF indicator names as stored in NPZ files
EXF_NAMES = ['funding_btc', 'dvol_btc', 'fng', 'taker']
def load_exf_for_date(date_str):
"""Load ExF indicators for a date. Returns dict of name->value or None."""
date_path = EIGEN_PATH / date_str
if not date_path.exists():
return None
npz_files = sorted(date_path.glob('scan_*__Indicators.npz'))
if not npz_files:
return None
try:
data = np.load(str(npz_files[0]), allow_pickle=True)
if 'api_names' not in data or 'api_indicators' not in data:
return None
names = list(data['api_names'])
vals = data['api_indicators']
result = {}
for name in EXF_NAMES:
if name in names:
idx = names.index(name)
v = float(vals[idx])
result[name] = v if np.isfinite(v) else None
else:
result[name] = None
return result
except Exception:
return None
parquet_files = sorted(VBT_DIR.glob("*.parquet"))
parquet_files = [p for p in parquet_files if 'catalog' not in str(p)]
total = len(parquet_files)
print(f"Files: {total} Loading ExF + computing raw edge per date...")
print(f"SHORT_T={SHORT_T} LONG_T={LONG_T} TP={TP_BPS}bps MAX_HOLD={MAX_HOLD}\n")
# Control
ctrl_dn_count = ctrl_up_count = ctrl_n = 0
rows = []
t0 = time.time()
for i, pf in enumerate(parquet_files):
ds = pf.stem
year = ds[:4]
month = int(ds[5:7])
quarter = f"Q{(month-1)//3+1}"
# Load ExF
exf = load_exf_for_date(ds)
funding = exf.get('funding_btc') if exf else None
dvol = exf.get('dvol_btc') if exf else None
fng = exf.get('fng') if exf else None
taker = exf.get('taker') if exf else None
try:
df = pd.read_parquet(pf)
except Exception:
continue
if 'vel_div' not in df.columns or 'BTCUSDT' not in df.columns:
continue
vd = df['vel_div'].values.astype(np.float64)
btc = df['BTCUSDT'].values.astype(np.float64)
vd = np.where(np.isfinite(vd), vd, 0.0)
btc = np.where(np.isfinite(btc) & (btc > 0), btc, np.nan)
n = len(btc)
if n < MAX_HOLD + 5:
del df, vd, btc
continue
# BTC day metrics
btc_clean = btc[np.isfinite(btc)]
btc_return = (btc_clean[-1] - btc_clean[0]) / btc_clean[0] if len(btc_clean) >= 2 else 0.0
log_r = np.diff(np.log(btc_clean)) if len(btc_clean) >= 2 else np.array([])
realized_vol = float(np.std(log_r)) if len(log_r) > 1 else 0.0
# vel_div distribution
vd_finite = vd[np.isfinite(vd)]
vd_p10 = float(np.percentile(vd_finite, 10)) if len(vd_finite) > 5 else 0.0
vd_p50 = float(np.median(vd_finite)) if len(vd_finite) > 0 else 0.0
vd_p90 = float(np.percentile(vd_finite, 90)) if len(vd_finite) > 5 else 0.0
# Control
for j in range(0, n - MAX_HOLD, 60):
ep = btc[j]; ex = btc[j + MAX_HOLD]
if np.isfinite(ep) and np.isfinite(ex) and ep > 0:
r = (ex - ep) / ep
ctrl_dn_count += int(r <= -tp_pct)
ctrl_up_count += int(r >= tp_pct)
ctrl_n += 1
# Raw edge per direction
n_usable = n - MAX_HOLD
from numpy.lib.stride_tricks import sliding_window_view
windows = sliding_window_view(btc, MAX_HOLD + 1)[:n_usable]
ep_arr = windows[:, 0]
fut_min = np.nanmin(windows[:, 1:], axis=1)
fut_max = np.nanmax(windows[:, 1:], axis=1)
valid = np.isfinite(ep_arr) & (ep_arr > 0)
for direction, threshold in [('S', SHORT_T), ('L', LONG_T)]:
active = (vd[:n_usable] <= threshold) if direction == 'S' else (vd[:n_usable] >= threshold)
sig_idx = np.where(active & valid)[0]
n_sig = len(sig_idx)
if n_sig == 0:
rows.append({
'date': ds, 'year': year, 'quarter': quarter,
'direction': direction,
'n_sig': 0, 'wr': None, 'n_trades': 0,
'funding_btc': funding, 'dvol_btc': dvol, 'fng': fng, 'taker': taker,
'btc_return': round(btc_return, 6),
'realized_vol': round(realized_vol, 8),
'vd_p10': round(vd_p10, 6), 'vd_p50': round(vd_p50, 6), 'vd_p90': round(vd_p90, 6),
})
continue
ep_s = ep_arr[sig_idx]
if direction == 'S':
hit = fut_min[sig_idx] <= ep_s * (1.0 - tp_pct)
else:
hit = fut_max[sig_idx] >= ep_s * (1.0 + tp_pct)
wins = int(np.sum(hit))
n_tr = len(sig_idx)
wr = wins / n_tr * 100 if n_tr > 0 else None
rows.append({
'date': ds, 'year': year, 'quarter': quarter,
'direction': direction,
'n_sig': n_sig, 'wr': round(wr, 3) if wr is not None else None, 'n_trades': n_tr,
'funding_btc': funding, 'dvol_btc': dvol, 'fng': fng, 'taker': taker,
'btc_return': round(btc_return, 6),
'realized_vol': round(realized_vol, 8),
'vd_p10': round(vd_p10, 6), 'vd_p50': round(vd_p50, 6), 'vd_p90': round(vd_p90, 6),
})
del df, vd, btc, windows, ep_arr, fut_min, fut_max, valid
if (i + 1) % 100 == 0:
gc.collect()
elapsed = time.time() - t0
print(f" [{i+1}/{total}] {ds} {elapsed/60:.1f}m")
elapsed = time.time() - t0
print(f"\nPass complete: {elapsed:.0f}s")
ctrl_dn_pct = ctrl_dn_count / ctrl_n * 100 if ctrl_n else 7.1
ctrl_up_pct = ctrl_up_count / ctrl_n * 100 if ctrl_n else 7.1
print(f"Control: DOWN={ctrl_dn_pct:.2f}% UP={ctrl_up_pct:.2f}% n={ctrl_n:,}")
# Analysis
df_rows = pd.DataFrame(rows)
# Add edge column
df_rows['baseline'] = df_rows['direction'].map({'S': ctrl_dn_pct, 'L': ctrl_up_pct})
df_rows['edge'] = df_rows['wr'] - df_rows['baseline']
print(f"\n{'='*70}")
print(f" ExF AVAILABILITY")
print(f"{'='*70}")
for col in ['funding_btc', 'dvol_btc', 'fng', 'taker']:
n_avail = df_rows[df_rows['direction']=='S'][col].notna().sum()
print(f" {col:<15}: {n_avail}/{total} dates available ({n_avail/total*100:.0f}%)")
# Correlations (SHORT only, dates with trades)
df_s = df_rows[(df_rows['direction'] == 'S') & (df_rows['n_trades'] > 0)].copy()
print(f"\n{'='*70}")
print(f" CORRELATIONS WITH SHORT EDGE (n={len(df_s)} days with trades)")
print(f"{'='*70}")
for col in ['funding_btc', 'dvol_btc', 'fng', 'taker', 'btc_return', 'realized_vol', 'vd_p10', 'vd_p50']:
sub = df_s[[col, 'edge']].dropna()
if len(sub) < 10:
print(f" {col:<18}: n={len(sub)} (insufficient)")
continue
corr = sub[col].corr(sub['edge'])
print(f" {col:<18}: r={corr:>+.4f} (n={len(sub)})")
# ExF bin analysis
print(f"\n{'='*70}")
print(f" EDGE BY ExF QUARTILE (SHORT)")
print(f"{'='*70}")
for col in ['funding_btc', 'dvol_btc', 'fng', 'taker']:
sub = df_s[[col, 'edge']].dropna()
if len(sub) < 20:
continue
try:
sub['q'] = pd.qcut(sub[col], 4, labels=False, duplicates='drop')
sub['q_label'] = sub['q'].map(lambda x: ['Q1(low)', 'Q2', 'Q3', 'Q4(high)'][int(x)] if pd.notna(x) else 'NA')
except Exception:
sub['q'] = pd.cut(sub[col], 4, labels=False)
sub['q_label'] = sub['q'].map(lambda x: f'B{int(x)}' if pd.notna(x) else 'NA')
print(f"\n {col}:")
for q_label, grp in sub.groupby('q_label', observed=True):
print(f" {q_label:<12}: mean_edge={grp['edge'].mean():>+.2f}pp n={len(grp)}")
# Realized vol quartile
print(f"\n{'='*70}")
print(f" EDGE BY REALIZED_VOL QUARTILE (SHORT)")
print(f"{'='*70}")
sub = df_s[['realized_vol', 'edge']].dropna()
sub['q'] = pd.qcut(sub['realized_vol'], 4, labels=['Q1(calm)', 'Q2', 'Q3', 'Q4(volatile)'])
for q_label, grp in sub.groupby('q', observed=True):
print(f" {q_label:<15}: mean_edge={grp['edge'].mean():>+.2f}pp median={grp['edge'].median():>+.2f}pp n={len(grp)}")
# BTC return (up vs down days)
print(f"\n{'='*70}")
print(f" EDGE BY BTC DAILY RETURN (SHORT)")
print(f"{'='*70}")
df_s['btc_up'] = df_s['btc_return'] > 0
for up, grp in df_s.groupby('btc_up'):
label = 'BTC UP day' if up else 'BTC DN day'
e_vals = grp['edge'].dropna()
print(f" {label:<15}: mean_edge={e_vals.mean():>+.2f}pp n={len(e_vals)}")
# Year breakdown with ExF summary
print(f"\n{'='*70}")
print(f" PER-YEAR: ExF MEDIANS vs EDGE")
print(f"{'='*70}")
print(f" {'Year':<6} {'edge_S':>8} {'edge_L':>8} {'funding':>10} {'dvol':>8} {'fng':>6} {'taker':>8} {'rvol':>10}")
print(f" {'-'*68}")
for yr in ['2021','2022','2023','2024','2025','2026']:
yr_s = df_rows[(df_rows['year']==yr) & (df_rows['direction']=='S') & (df_rows['n_trades']>0)]
yr_l = df_rows[(df_rows['year']==yr) & (df_rows['direction']=='L') & (df_rows['n_trades']>0)]
es = yr_s['edge'].mean() if len(yr_s) > 0 else float('nan')
el = yr_l['edge'].mean() if len(yr_l) > 0 else float('nan')
fund = yr_s['funding_btc'].median() if yr_s['funding_btc'].notna().any() else float('nan')
dv = yr_s['dvol_btc'].median() if yr_s['dvol_btc'].notna().any() else float('nan')
fg = yr_s['fng'].median() if yr_s['fng'].notna().any() else float('nan')
tk = yr_s['taker'].median() if yr_s['taker'].notna().any() else float('nan')
rv = yr_s['realized_vol'].median()
print(f" {yr:<6} {es:>+7.2f}pp {el:>+7.2f}pp {fund:>10.4f} {dv:>8.1f} {fg:>6.1f} {tk:>8.4f} {rv:>10.6f}")
# Same for LONG
df_l = df_rows[(df_rows['direction'] == 'L') & (df_rows['n_trades'] > 0)].copy()
print(f"\n{'='*70}")
print(f" CORRELATIONS WITH LONG EDGE (n={len(df_l)} days with trades)")
print(f"{'='*70}")
for col in ['funding_btc', 'dvol_btc', 'fng', 'taker', 'btc_return', 'realized_vol', 'vd_p50']:
sub = df_l[[col, 'edge']].dropna()
if len(sub) < 10:
print(f" {col:<18}: n={len(sub)} (insufficient)")
continue
corr = sub[col].corr(sub['edge'])
print(f" {col:<18}: r={corr:>+.4f} (n={len(sub)})")
# Save
LOG_DIR.mkdir(exist_ok=True)
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
regime_path = LOG_DIR / f"exf_regime_{ts}.csv"
df_rows.to_csv(regime_path, index=False)
# Summary JSON
summary = {
'timestamp': ts,
'n_dates': total,
'ctrl_dn_pct': round(ctrl_dn_pct, 4),
'ctrl_up_pct': round(ctrl_up_pct, 4),
'correlations_short': {},
'correlations_long': {},
'exf_availability': {},
}
for col in ['funding_btc', 'dvol_btc', 'fng', 'taker', 'btc_return', 'realized_vol']:
sub_s = df_s[[col, 'edge']].dropna()
sub_l = df_l[[col, 'edge']].dropna()
summary['correlations_short'][col] = round(sub_s[col].corr(sub_s['edge']), 4) if len(sub_s) >= 10 else None
summary['correlations_long'][col] = round(sub_l[col].corr(sub_l['edge']), 4) if len(sub_l) >= 10 else None
n_avail = df_rows[df_rows['direction']=='S'][col].notna().sum()
summary['exf_availability'][col] = int(n_avail)
json_path = LOG_DIR / f"exf_regime_{ts}.json"
with open(json_path, 'w') as f:
json.dump(summary, f, indent=2)
print(f"\n regime CSV → {regime_path}")
print(f" summary → {json_path}")
print(f" Runtime: {elapsed:.0f}s Rows: {len(df_rows):,}")