Files
DOLPHIN/nautilus_dolphin/regime_exit_sweep_5y.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

534 lines
26 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Regime-Based Exit Sweep — 5y Klines
=======================================
Tests EXHAUSTION + INVALIDATION exit logic ported from non_naive_exit_manager.py.
For SHORT entry (vel_div <= -ENTRY_T):
NORMALIZATION: vel_div still below EXHST_T → hold (move ongoing)
EXHAUSTION: vel_div > EXHST_T → exit (edge used up, divergence unwound)
INVALIDATION: vel_div > +INV_T → exit (signal fully flipped, wrong-way)
TP: price dropped tp_bps from entry → take profit
STOP: price rose stop_bps from entry → cut loss
MAX_HOLD: safety fallback after max_hold bars
For LONG entry (vel_div <= -ENTRY_T, mean-reversion thesis):
NORMALIZATION: vel_div still below EXHST_T → hold
EXHAUSTION: vel_div > EXHST_T → exit (reversion complete)
INVALIDATION: vel_div < -INV_T → exit (divergence deepened, wrong-way)
TP: price rose tp_bps → take profit
STOP: price dropped stop_bps → cut loss
MAX_HOLD: safety fallback
KEY CALIBRATION NOTE (from distribution analysis 2026-03-09):
vel_div on 1m klines: range [-14.5, +11.3], std=0.505, median=+0.013
41% of bars have vel_div < -0.020 (NOT a rare signal!)
After SHORT entry: at lag+1, only 42% still below -0.020; mean snaps to -0.038
Exhaustion thresholds relative to 1m scale must be calibrated here.
SWEEP PARAMETERS:
Entry threshold (ENTRY_T): 0.020 (fixed)
Exhaustion (EXHST_T): [-0.020, -0.010, 0.000, +0.010, +0.020]
"exit when vel_div > EXHST_T" (returned from extreme toward center/positive)
Invalidation (INV_T): [0.020, 0.050, 0.100, 0.200] (abs, direction-aware)
Take profit (TP_BPS): [None, 60, 95] (None = only micro exits)
Hard stop (STOP_BPS): [None, 50, 100, 200] (None = no hard stop)
Max hold (MAX_HOLD): [10, 20, 60] (safety fallback bars)
Logs (ALL painstakingly written):
run_logs/regime_exit_summary_YYYYMMDD_HHMMSS.csv — one row per combo
run_logs/regime_exit_byyear_YYYYMMDD_HHMMSS.csv — per combo × year
run_logs/regime_exit_byreason_YYYYMMDD_HHMMSS.csv — per combo × exit reason
run_logs/regime_exit_top50_YYYYMMDD_HHMMSS.txt — human-readable top-50 table
Console: top-30 and representative tables
"""
import sys, time, csv, gc
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
from pathlib import Path
from datetime import datetime
from collections import defaultdict
import numpy as np
import pandas as pd
from itertools import product
VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache_klines")
LOG_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\nautilus_dolphin\run_logs")
LOG_DIR.mkdir(exist_ok=True)
ENTRY_T = 0.020 # vel_div threshold for both SHORT and LONG entries
# Sweep grid
EXHST_T_LIST = [-0.020, -0.010, 0.000, +0.010, +0.020]
INV_T_LIST = [0.020, 0.050, 0.100, 0.200]
TP_BPS_LIST = [None, 60, 95]
STOP_BPS_LIST = [None, 50, 100, 200]
MAX_HOLD_LIST = [10, 20, 60]
DIRECTIONS = ['S', 'L']
MAX_HOLD_MAX = max(MAX_HOLD_LIST) # precompute windows up to this
# ─────────────────────────────────────────────────────────────────────
# Accumulators — keyed by (direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold, year)
# Each value: dict with per-exit-reason counts/pnl, plus totals
REASON_NAMES = ['TP', 'STOP', 'EXHST', 'INV', 'MAX_HOLD']
# stats[(key)] = {reason: {wins, losses, gw, gl, n_trades, hold_sum}, ...}
# We'll use a flat dict of dicts per key
def make_reason_dict():
return {r: {'n': 0, 'wins': 0, 'gw': 0.0, 'gl': 0.0, 'hold_sum': 0} for r in REASON_NAMES}
stats = defaultdict(make_reason_dict)
ctrl = defaultdict(lambda: {'dn': 0, 'up': 0, 'n': 0}) # (year,)
parquet_files = sorted(VBT_DIR.glob("*.parquet"))
parquet_files = [p for p in parquet_files if 'catalog' not in str(p)]
total = len(parquet_files)
print(f"Files: {total} ENTRY_T=±{ENTRY_T}")
print(f"EXHST_T: {EXHST_T_LIST}")
print(f"INV_T: {INV_T_LIST}")
print(f"TP_BPS: {TP_BPS_LIST}")
print(f"STOP_BPS: {STOP_BPS_LIST}")
print(f"MAX_HOLD: {MAX_HOLD_LIST}")
n_combos = (len(EXHST_T_LIST) * len(INV_T_LIST) * len(TP_BPS_LIST) *
len(STOP_BPS_LIST) * len(MAX_HOLD_LIST) * len(DIRECTIONS))
print(f"Total combos: {n_combos}")
print()
t0 = time.time()
for i_file, pf in enumerate(parquet_files):
ds = pf.stem # YYYY-MM-DD
year = ds[:4]
try:
df = pd.read_parquet(pf)
except Exception:
continue
if 'vel_div' not in df.columns or 'BTCUSDT' not in df.columns:
continue
vd = df['vel_div'].values.astype(np.float64)
btc = df['BTCUSDT'].values.astype(np.float64)
del df
vd = np.where(np.isfinite(vd), vd, 0.0)
btc = np.where(np.isfinite(btc) & (btc > 0), btc, np.nan)
n = len(btc)
if n < MAX_HOLD_MAX + 5:
del vd, btc
continue
n_usable = n - MAX_HOLD_MAX
# Control baseline
ck = (year,)
tp95 = 0.0095
for j in range(0, n_usable, 30):
ep = btc[j]
if not np.isfinite(ep) or ep <= 0:
continue
future = btc[j+1:j+1+60]
fmin = np.nanmin(future) if len(future) else np.nan
fmax = np.nanmax(future) if len(future) else np.nan
if np.isfinite(fmin) and np.isfinite(fmax):
ctrl[ck]['dn'] += int((ep - fmin) / ep >= tp95)
ctrl[ck]['up'] += int((fmax - ep) / ep >= tp95)
ctrl[ck]['n'] += 1
# ── Extract trajectories for both directions ──────────────────────────
# For each potential entry bar, store: entry_vd, entry_btc,
# future_vd[MAX_HOLD_MAX], future_btc[MAX_HOLD_MAX]
valid_entry = np.isfinite(btc[:n_usable]) & (btc[:n_usable] > 0)
for direction in DIRECTIONS:
if direction == 'S':
entry_mask = (vd[:n_usable] <= -ENTRY_T) & valid_entry
else: # LONG
entry_mask = (vd[:n_usable] <= -ENTRY_T) & valid_entry # same entry, different exit logic
sig_idx = np.where(entry_mask)[0]
if len(sig_idx) == 0:
continue
N_sig = len(sig_idx)
entry_vd = vd[sig_idx] # (N_sig,)
entry_btc = btc[sig_idx] # (N_sig,)
# Build future arrays: (N_sig, MAX_HOLD_MAX)
from numpy.lib.stride_tricks import sliding_window_view
# future vel_div: bars [j+1 .. j+MAX_HOLD_MAX]
vd_windows = np.lib.stride_tricks.sliding_window_view(vd, MAX_HOLD_MAX+1)[:n_usable]
btc_windows = np.lib.stride_tricks.sliding_window_view(btc, MAX_HOLD_MAX+1)[:n_usable]
fut_vd = vd_windows[sig_idx, 1:] # (N_sig, MAX_HOLD_MAX)
fut_btc = btc_windows[sig_idx, 1:] # (N_sig, MAX_HOLD_MAX)
del vd_windows, btc_windows
# Price returns from entry (positive = favourable for direction)
ep_col = entry_btc[:, None] # (N_sig, 1)
if direction == 'S':
price_ret = (ep_col - fut_btc) / ep_col # positive = price went DOWN = SHORT win
else:
price_ret = (fut_btc - ep_col) / ep_col # positive = price went UP = LONG win
# Replace NaN btc with 0 price_ret (no movement signal)
price_ret = np.where(np.isfinite(fut_btc), price_ret, 0.0)
# ── Sweep all parameter combos ────────────────────────────────────
for exhst_t, inv_t, tp_bps, stop_bps, max_hold in product(
EXHST_T_LIST, INV_T_LIST, TP_BPS_LIST, STOP_BPS_LIST, MAX_HOLD_LIST):
H = max_hold # actual hold limit
tp_pct = (tp_bps / 10_000.0) if tp_bps is not None else None
stop_pct = (stop_bps / 10_000.0) if stop_bps is not None else None
fvd = fut_vd[:, :H] # (N_sig, H)
fpr = price_ret[:, :H] # (N_sig, H), positive = profit direction
fbt = fut_btc[:, :H] # (N_sig, H) for exit price
BIG = H + 1 # sentinel "never fires"
# ── EXHAUSTION: vel_div > EXHST_T (returned from -ENTRY_T toward center/positive)
# For SHORT: exit when vel_div > exhst_t (no longer extremely negative)
# For LONG: same (reversion complete when vel_div returns above exhst_t)
exhst_mask = fvd > exhst_t
exhst_bar = np.where(exhst_mask.any(1), np.argmax(exhst_mask, 1), BIG)
# ── INVALIDATION
if direction == 'S':
# SHORT: invalidation if vel_div goes strongly positive (flipped)
inv_mask = fvd > +inv_t
else:
# LONG: invalidation if vel_div deepens more negative
inv_mask = fvd < -inv_t
inv_bar = np.where(inv_mask.any(1), np.argmax(inv_mask, 1), BIG)
# ── TP
if tp_pct is not None:
tp_mask = fpr >= tp_pct
tp_bar = np.where(tp_mask.any(1), np.argmax(tp_mask, 1), BIG)
else:
tp_bar = np.full(N_sig, BIG, dtype=np.int32)
# ── STOP
if stop_pct is not None:
stop_mask = fpr <= -stop_pct
st_bar = np.where(stop_mask.any(1), np.argmax(stop_mask, 1), BIG)
else:
st_bar = np.full(N_sig, BIG, dtype=np.int32)
# ── MAX_HOLD fallback: always fires at bar H-1
mh_bar = np.full(N_sig, H - 1, dtype=np.int32)
# ── Find first-firing exit (priority: TP > STOP > EXHST > INV > MAX_HOLD)
all_bars = np.column_stack([tp_bar, st_bar, exhst_bar, inv_bar, mh_bar])
first_reason_idx = np.argmin(all_bars, axis=1) # 0=TP,1=STOP,2=EXHST,3=INV,4=MAX_HOLD
exit_bar = all_bars[np.arange(N_sig), first_reason_idx]
# Clip to valid range
exit_bar = np.clip(exit_bar, 0, H - 1)
# Exit price return for each trade
exit_pnl = fpr[np.arange(N_sig), exit_bar] # positive = profit
# Win = exit_pnl > 0 (for STOP, it's always a loss; for TP, always a win)
won = exit_pnl > 0
key = (direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold, year)
s = stats[key]
for ri, rname in enumerate(REASON_NAMES):
rmask = (first_reason_idx == ri)
if not rmask.any():
continue
n_r = int(rmask.sum())
wins = int(won[rmask].sum())
gw = float(exit_pnl[rmask & won].sum()) if (rmask & won).any() else 0.0
gl = float((-exit_pnl[rmask & ~won]).sum()) if (rmask & ~won).any() else 0.0
hold_sum = int(exit_bar[rmask].sum())
s[rname]['n'] += n_r
s[rname]['wins'] += wins
s[rname]['gw'] += gw
s[rname]['gl'] += gl
s[rname]['hold_sum'] += hold_sum
del fvd, fpr, fbt, exhst_mask, exhst_bar, inv_mask, inv_bar
del tp_bar, st_bar, mh_bar, all_bars, exit_bar, exit_pnl, won
del fut_vd, fut_btc, price_ret, entry_vd, entry_btc, sig_idx
del vd, btc
if (i_file + 1) % 100 == 0:
gc.collect()
elapsed = time.time() - t0
print(f" [{i_file+1}/{total}] {ds} {elapsed:.0f}s eta={elapsed/(i_file+1)*(total-i_file-1):.0f}s")
elapsed = time.time() - t0
print(f"\nPass complete: {elapsed:.0f}s")
# ═══════════════════════════════════════════════════════════════════════════
# AGGREGATE RESULTS
# ═══════════════════════════════════════════════════════════════════════════
YEARS = ['2021', '2022', '2023', '2024', '2025', '2026']
def aggregate_key(direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold, years=None):
"""Sum stats across years (or a subset) for a given param combo."""
total = {r: {'n': 0, 'wins': 0, 'gw': 0.0, 'gl': 0.0, 'hold_sum': 0} for r in REASON_NAMES}
for yr in (years or YEARS):
k = (direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold, yr)
s = stats.get(k)
if s is None:
continue
for r in REASON_NAMES:
for field in ['n', 'wins', 'gw', 'gl', 'hold_sum']:
total[r][field] += s[r][field]
return total
def summary_metrics(agg):
"""Compute overall PF, WR, n_trades, avg_hold from aggregated reason dict."""
n_tot = sum(agg[r]['n'] for r in REASON_NAMES)
wins = sum(agg[r]['wins'] for r in REASON_NAMES)
gw = sum(agg[r]['gw'] for r in REASON_NAMES)
gl = sum(agg[r]['gl'] for r in REASON_NAMES)
h_sum = sum(agg[r]['hold_sum'] for r in REASON_NAMES)
wr = wins / n_tot * 100 if n_tot else float('nan')
pf = gw / gl if gl > 0 else (999.0 if gw > 0 else float('nan'))
avg_h = h_sum / n_tot if n_tot else float('nan')
return {'n': n_tot, 'wins': wins, 'wr': wr, 'pf': pf, 'gw': gw, 'gl': gl, 'avg_hold': avg_h}
# ─────────────────────────────────────────────────────────────────────────
# Build summary rows (one per param combo × direction)
# ─────────────────────────────────────────────────────────────────────────
print("\nBuilding summary rows...")
summary_rows = []
for direction in DIRECTIONS:
for exhst_t, inv_t, tp_bps, stop_bps, max_hold in product(
EXHST_T_LIST, INV_T_LIST, TP_BPS_LIST, STOP_BPS_LIST, MAX_HOLD_LIST):
agg = aggregate_key(direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold)
m = summary_metrics(agg)
# Per-reason breakdown
reason_stats = {}
for r in REASON_NAMES:
rn = agg[r]['n']
reason_stats[f'n_{r}'] = rn
reason_stats[f'pct_{r}'] = round(rn / m['n'] * 100, 1) if m['n'] else float('nan')
row = {
'direction': direction,
'exhst_t': exhst_t,
'inv_t': inv_t,
'tp_bps': tp_bps if tp_bps is not None else 'none',
'stop_bps': stop_bps if stop_bps is not None else 'none',
'max_hold': max_hold,
'n_trades': m['n'],
'wr': round(m['wr'], 3),
'pf': round(m['pf'], 4),
'gw': round(m['gw'], 2),
'gl': round(m['gl'], 2),
'avg_hold': round(m['avg_hold'], 2),
**reason_stats,
}
summary_rows.append(row)
# Sort by PF descending
summary_rows.sort(key=lambda r: -r['pf'])
# ─────────────────────────────────────────────────────────────────────────
# Build per-year rows
# ─────────────────────────────────────────────────────────────────────────
print("Building per-year rows...")
year_rows = []
for direction in DIRECTIONS:
for exhst_t, inv_t, tp_bps, stop_bps, max_hold in product(
EXHST_T_LIST, INV_T_LIST, TP_BPS_LIST, STOP_BPS_LIST, MAX_HOLD_LIST):
for yr in YEARS:
agg = aggregate_key(direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold, [yr])
m = summary_metrics(agg)
# Control baseline
ck = (yr,)
c = ctrl.get(ck, {'dn': 0, 'up': 0, 'n': 1})
bl = (c['dn'] / c['n'] * 100) if direction == 'S' else (c['up'] / c['n'] * 100)
edge = m['wr'] - bl if not np.isnan(m['wr']) else float('nan')
year_rows.append({
'direction': direction,
'exhst_t': exhst_t,
'inv_t': inv_t,
'tp_bps': tp_bps if tp_bps is not None else 'none',
'stop_bps': stop_bps if stop_bps is not None else 'none',
'max_hold': max_hold,
'year': yr,
'n_trades': m['n'],
'wr': round(m['wr'], 3),
'pf': round(m['pf'], 4),
'edge_pp': round(edge, 3),
'avg_hold': round(m['avg_hold'], 2),
'ctrl_bl': round(bl, 3),
})
# ─────────────────────────────────────────────────────────────────────────
# Build per-reason rows
# ─────────────────────────────────────────────────────────────────────────
print("Building per-reason rows...")
reason_rows = []
for direction in DIRECTIONS:
for exhst_t, inv_t, tp_bps, stop_bps, max_hold in product(
EXHST_T_LIST, INV_T_LIST, TP_BPS_LIST, STOP_BPS_LIST, MAX_HOLD_LIST):
agg = aggregate_key(direction, exhst_t, inv_t, tp_bps, stop_bps, max_hold)
n_tot = sum(agg[r]['n'] for r in REASON_NAMES)
for rname in REASON_NAMES:
rv = agg[rname]
rn = rv['n']
if rn == 0:
continue
rwr = rv['wins'] / rn * 100
rpf = rv['gw'] / rv['gl'] if rv['gl'] > 0 else (999.0 if rv['gw'] > 0 else float('nan'))
reason_rows.append({
'direction': direction,
'exhst_t': exhst_t,
'inv_t': inv_t,
'tp_bps': tp_bps if tp_bps is not None else 'none',
'stop_bps': stop_bps if stop_bps is not None else 'none',
'max_hold': max_hold,
'exit_reason': rname,
'n': rn,
'pct_of_total': round(rn / n_tot * 100, 1) if n_tot else float('nan'),
'wins': rv['wins'],
'wr': round(rwr, 3),
'pf': round(rpf, 4),
'avg_hold': round(rv['hold_sum'] / rn, 2) if rn > 0 else float('nan'),
})
# ═══════════════════════════════════════════════════════════════════════════
# SAVE CSVs
# ═══════════════════════════════════════════════════════════════════════════
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
def save_csv(rows, name):
if not rows:
return
path = LOG_DIR / f"regime_exit_{name}_{ts}.csv"
with open(path, 'w', newline='', encoding='utf-8') as f:
w = csv.DictWriter(f, fieldnames=rows[0].keys())
w.writeheader()
w.writerows(rows)
print(f"{path} ({len(rows)} rows)")
return path
print("\nSaving CSVs...")
save_csv(summary_rows, 'summary')
save_csv(year_rows, 'byyear')
save_csv(reason_rows, 'byreason')
# ═══════════════════════════════════════════════════════════════════════════
# CONSOLE OUTPUT — TOP RESULTS
# ═══════════════════════════════════════════════════════════════════════════
def fmt_pf(pf):
if np.isnan(pf):
return ' nan'
elif pf >= 999:
return ' inf '
else:
marker = ' ***' if pf > 1.0 else (' **' if pf > 0.8 else (' *' if pf > 0.6 else ''))
return f'{pf:6.3f}{marker}'
print(f"\n{'='*110}")
print(f" TOP 50 RESULTS BY PROFIT FACTOR")
print(f" (*** = PF>1.0 = RAW PROFITABLE | ** = PF>0.8 | * = PF>0.6)")
print(f"{'='*110}")
hdr = (f" {'Dir':3} {'ExhT':>7} {'InvT':>6} {'TP':>6} {'Stop':>5} {'MH':>4} | "
f"{'N':>8} {'WR%':>6} {'PF':>10} {'AvgHold':>8} | "
f"{'%TP':>5} {'%ST':>5} {'%EX':>5} {'%IN':>5} {'%MH':>5}")
print(hdr)
print(f" {'-'*108}")
for row in summary_rows[:50]:
print(f" {row['direction']:3} {row['exhst_t']:>+7.3f} {row['inv_t']:>6.3f} "
f"{str(row['tp_bps']):>6} {str(row['stop_bps']):>5} {row['max_hold']:>4} | "
f"{row['n_trades']:>8,} {row['wr']:>6.1f}% {fmt_pf(row['pf']):>10} {row['avg_hold']:>8.1f} | "
f"{row.get('pct_TP',0):>5.1f} {row.get('pct_STOP',0):>5.1f} "
f"{row.get('pct_EXHST',0):>5.1f} {row.get('pct_INV',0):>5.1f} {row.get('pct_MAX_HOLD',0):>5.1f}")
# Per-direction best 10
for direction in DIRECTIONS:
d_rows = [r for r in summary_rows if r['direction'] == direction][:10]
print(f"\n{'='*90}")
print(f" BEST 10 — DIRECTION={direction}")
print(f"{'='*90}")
print(hdr)
print(f" {'-'*88}")
for row in d_rows:
print(f" {row['direction']:3} {row['exhst_t']:>+7.3f} {row['inv_t']:>6.3f} "
f"{str(row['tp_bps']):>6} {str(row['stop_bps']):>5} {row['max_hold']:>4} | "
f"{row['n_trades']:>8,} {row['wr']:>6.1f}% {fmt_pf(row['pf']):>10} {row['avg_hold']:>8.1f} | "
f"{row.get('pct_TP',0):>5.1f} {row.get('pct_STOP',0):>5.1f} "
f"{row.get('pct_EXHST',0):>5.1f} {row.get('pct_INV',0):>5.1f} {row.get('pct_MAX_HOLD',0):>5.1f}")
# Per-year breakdown of top-5 overall
print(f"\n{'='*90}")
print(f" PER-YEAR BREAKDOWN OF TOP-10 COMBOS")
print(f"{'='*90}")
top10_keys = [(r['direction'], r['exhst_t'], r['inv_t'], r['tp_bps'], r['stop_bps'], r['max_hold'])
for r in summary_rows[:10]]
for dk in top10_keys:
d, e, inv, tp, st, mh = dk
print(f"\n {d} exhst={e:+.3f} inv={inv:.3f} tp={tp} stop={st} mh={mh}")
print(f" {'Year':<6} {'N':>7} {'WR%':>7} {'PF':>8} {'Edge':>8} {'AvgHold':>8}")
print(f" {'-'*55}")
for yr in YEARS:
yr_rows = [r for r in year_rows
if r['direction']==d and r['exhst_t']==e and r['inv_t']==inv
and str(r['tp_bps'])==str(tp) and str(r['stop_bps'])==str(st)
and r['max_hold']==mh and r['year']==yr]
if yr_rows:
yr_r = yr_rows[0]
print(f" {yr:<6} {yr_r['n_trades']:>7,} {yr_r['wr']:>7.1f}% {yr_r['pf']:>8.3f} "
f"{yr_r['edge_pp']:>+8.2f}pp {yr_r['avg_hold']:>8.1f}b")
# Comparison table: regime exit vs dumb MAX_HOLD
print(f"\n{'='*90}")
print(f" REGIME EXIT vs DUMB TIMER — KEY COMPARISON")
print(f" (tp=95bps, stop=None, entry=±{ENTRY_T})")
print(f"{'='*90}")
print(f" {'Config':<35} {'N':>8} {'WR%':>7} {'PF':>8} {'AvgHold':>8}")
print(f" {'-'*70}")
for direction in DIRECTIONS:
for max_hold in [10, 20, 60]:
# Dumb timer (exhst disabled = exhst_t very high, inv disabled = inv_t very high)
dumb_key = (direction, +0.020, 0.200, 95, None, max_hold) # exhaustion fires after TP only
dumb_rows = [r for r in summary_rows if
r['direction']==direction and r['exhst_t']==0.020 and r['inv_t']==0.200
and str(r['tp_bps'])=='95' and str(r['stop_bps'])=='none' and r['max_hold']==max_hold]
# Best regime exit with tp=95
best_regime = [r for r in summary_rows if
r['direction']==direction and str(r['tp_bps'])=='95'
and str(r['stop_bps'])=='none' and r['max_hold']==max_hold]
if dumb_rows:
dr = dumb_rows[0]
print(f" {direction} DUMB mh={max_hold:>3}b tp=95 stop=none{'':<5} "
f"{dr['n_trades']:>8,} {dr['wr']:>7.1f}% {fmt_pf(dr['pf']):>8} {dr['avg_hold']:>8.1f}b")
if best_regime:
br = best_regime[0]
label = f"exhst={br['exhst_t']:+.3f} inv={br['inv_t']:.3f}"
print(f" {direction} BEST mh={max_hold:>3}b tp=95 stop=none {label:<15} "
f"{br['n_trades']:>8,} {br['wr']:>7.1f}% {fmt_pf(br['pf']):>8} {br['avg_hold']:>8.1f}b")
print()
# Save top-50 human-readable text file
top50_path = LOG_DIR / f"regime_exit_top50_{ts}.txt"
with open(top50_path, 'w', encoding='utf-8') as f:
f.write(f"REGIME EXIT SWEEP — TOP 50\n")
f.write(f"Generated: {ts}\n")
f.write(f"Entry threshold: ±{ENTRY_T} (both directions: SHORT and LONG at vel_div<=-{ENTRY_T})\n\n")
f.write(hdr + "\n")
f.write(f" {'-'*108}\n")
for row in summary_rows[:50]:
f.write(f" {row['direction']:3} {row['exhst_t']:>+7.3f} {row['inv_t']:>6.3f} "
f"{str(row['tp_bps']):>6} {str(row['stop_bps']):>5} {row['max_hold']:>4} | "
f"{row['n_trades']:>8,} {row['wr']:>6.1f}% {fmt_pf(row['pf']):>10} {row['avg_hold']:>8.1f} | "
f"{row.get('pct_TP',0):>5.1f} {row.get('pct_STOP',0):>5.1f} "
f"{row.get('pct_EXHST',0):>5.1f} {row.get('pct_INV',0):>5.1f} {row.get('pct_MAX_HOLD',0):>5.1f}\n")
f.write(f"\nRuntime: {elapsed:.0f}s Files: {total}\n")
print(f"\n{top50_path}")
print(f"\nTotal runtime: {elapsed:.0f}s")
print(f"\nKEY: Look for PF > 1.0 — that's the profitable regime-exit configuration.")
print(f" Compare avg_hold vs MAX_HOLD to see how much early the regime exits fire.")