Includes core prod + GREEN/BLUE subsystems: - prod/ (BLUE harness, configs, scripts, docs) - nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved) - adaptive_exit/ (AEM engine + models/bucket_assignments.pkl) - Observability/ (EsoF advisor, TUI, dashboards) - external_factors/ (EsoF producer) - mc_forewarning_qlabs_fork/ (MC regime/envelope) Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
506 lines
22 KiB
Python
Executable File
506 lines
22 KiB
Python
Executable File
"""
|
|
Exp 4 — proxy_B Coupling & Orthogonality Sweep
|
|
|
|
Research questions:
|
|
1. Is proxy_B orthogonal to the entry signal (vel_div) and other system state?
|
|
2. Can proxy_B be *coupled* to existing system parameters to reduce DD without
|
|
reducing ROI? (position scale, hold limit, stop gate, rising-proxy exit)
|
|
3. Does proxy_B predict trades that will hit large adverse excursions (MAE)?
|
|
|
|
Method: retroactive shadow analysis on the full 2155-trade baseline.
|
|
- One full AE run with extended logging (per-bar proxy_B + vel_div + prices)
|
|
- All coupling tests applied post-hoc: O(N_trades) per config → < 1s for 150+ configs
|
|
- Focus metric: DD reduction with ROI >= gold * 0.95
|
|
|
|
Note on stop_pct=1.0 in gold config:
|
|
The engine has stop_pct=1.0 (100% — effectively no stop). Trades exit via:
|
|
fixed_tp (0.95%), max_hold_bars (120), or direction-reversal signal.
|
|
This means MAE can be large before trades recover → proxy-gated stop is meaningful.
|
|
|
|
Coupling modes:
|
|
A. scale_suppress: scale down position when proxy_B high at entry
|
|
B. scale_boost: scale up position when proxy_B low at entry
|
|
C. hold_limit: exit at fraction of natural hold when proxy_B_max exceeds threshold
|
|
D. rising_exit: exit early when proxy_B trajectory during hold is strongly rising
|
|
E. pure_stop: retroactive stop simulation (benchmark, no proxy coupling)
|
|
F. gated_stop: stop applies ONLY when proxy_B at entry exceeds threshold
|
|
|
|
Statistical tests:
|
|
- Pearson + Spearman: proxy_B vs vel_div, pnl, MAE
|
|
- Mann-Whitney U: worst-10% trades vs rest on proxy_B_entry
|
|
"""
|
|
import sys, time, json, math
|
|
sys.stdout.reconfigure(encoding='utf-8', errors='replace')
|
|
from pathlib import Path
|
|
import numpy as np
|
|
from collections import defaultdict
|
|
|
|
_HERE = Path(__file__).resolve().parent
|
|
sys.path.insert(0, str(_HERE.parent))
|
|
|
|
from exp_shared import (
|
|
ensure_jit, ENGINE_KWARGS, GOLD, MC_BASE_CFG,
|
|
load_data, load_forewarner, log_results
|
|
)
|
|
from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine
|
|
from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker
|
|
|
|
|
|
# ── Extended shadow engine ────────────────────────────────────────────────────
|
|
|
|
class CouplingEngine(NDAlphaEngine):
|
|
"""Runs baseline + captures per-bar: proxy_B, vel_div, asset prices."""
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.day_proxy = {} # date_str → {ri: proxy_B}
|
|
self.day_veldiv = {} # date_str → {ri: vel_div}
|
|
self.day_prices = {} # date_str → {ri: {asset: price}}
|
|
self._n_before = 0
|
|
self.trade_dates = [] # parallel to trade_history
|
|
|
|
def process_day(self, date_str, df, asset_columns,
|
|
vol_regime_ok=None, direction=None, posture='APEX'):
|
|
self.day_proxy[date_str] = {}
|
|
self.day_veldiv[date_str] = {}
|
|
self.day_prices[date_str] = {}
|
|
self._n_before = len(self.trade_history)
|
|
|
|
self.begin_day(date_str, posture=posture, direction=direction)
|
|
bid = 0
|
|
for ri in range(len(df)):
|
|
row = df.iloc[ri]
|
|
vd = row.get('vel_div')
|
|
if vd is None or not np.isfinite(float(vd)):
|
|
self._global_bar_idx += 1; bid += 1; continue
|
|
|
|
def gf(col):
|
|
v = row.get(col)
|
|
if v is None: return 0.0
|
|
try: return float(v) if np.isfinite(float(v)) else 0.0
|
|
except: return 0.0
|
|
|
|
v50 = gf('v50_lambda_max_velocity')
|
|
v750 = gf('v750_lambda_max_velocity')
|
|
inst = gf('instability_50')
|
|
pb = inst - v750
|
|
|
|
self.day_proxy[date_str][ri] = pb
|
|
self.day_veldiv[date_str][ri] = float(vd)
|
|
|
|
prices = {}
|
|
for ac in asset_columns:
|
|
p = row.get(ac)
|
|
if p is not None and p > 0 and np.isfinite(float(p)):
|
|
prices[ac] = float(p)
|
|
self.day_prices[date_str][ri] = prices
|
|
|
|
if not prices:
|
|
self._global_bar_idx += 1; bid += 1; continue
|
|
|
|
vrok = bool(vol_regime_ok[ri]) if vol_regime_ok is not None else (bid >= 100)
|
|
self.step_bar(bar_idx=ri, vel_div=float(vd), prices=prices,
|
|
vol_regime_ok=vrok, v50_vel=v50, v750_vel=v750)
|
|
bid += 1
|
|
|
|
self.end_day()
|
|
for _ in self.trade_history[self._n_before:]:
|
|
self.trade_dates.append(date_str)
|
|
|
|
|
|
# ── Build shadow data ─────────────────────────────────────────────────────────
|
|
|
|
def build_shadow(d, fw):
|
|
kw = ENGINE_KWARGS.copy()
|
|
acb = AdaptiveCircuitBreaker()
|
|
acb.preload_w750(d['date_strings'])
|
|
eng = CouplingEngine(**kw)
|
|
eng.set_ob_engine(d['ob_eng'])
|
|
eng.set_acb(acb)
|
|
if fw: eng.set_mc_forewarner(fw, MC_BASE_CFG)
|
|
eng.set_esoteric_hazard_multiplier(0.0)
|
|
|
|
for pf in d['parquet_files']:
|
|
ds = pf.stem
|
|
df, acols, dvol = d['pq_data'][ds]
|
|
vol_ok = np.where(np.isfinite(dvol), dvol > d['vol_p60'], False)
|
|
eng.process_day(ds, df, acols, vol_regime_ok=vol_ok)
|
|
|
|
tr = eng.trade_history
|
|
roi = (eng.capital - 25000) / 25000 * 100
|
|
print(f" Shadow run: ROI={roi:.2f}% Trades={len(tr)}"
|
|
f" Tagged={len(eng.trade_dates)}")
|
|
return eng, tr
|
|
|
|
|
|
# ── Feature extraction ────────────────────────────────────────────────────────
|
|
|
|
def extract_features(eng, tr):
|
|
"""Per-trade features for coupling analysis."""
|
|
feats = []
|
|
for t, date in zip(tr, eng.trade_dates):
|
|
if date is None:
|
|
continue
|
|
entry_bar = int(t.entry_bar)
|
|
exit_bar = int(getattr(t, 'exit_bar', entry_bar))
|
|
direction = int(t.direction)
|
|
asset = t.asset
|
|
pnl_frac = float(t.pnl_pct) # fraction (not %)
|
|
pnl_abs = float(t.pnl_absolute) if hasattr(t, 'pnl_absolute') else pnl_frac * 250.
|
|
entry_price = float(getattr(t, 'entry_price', 0) or 0)
|
|
|
|
pb_entry = eng.day_proxy.get(date, {}).get(entry_bar, np.nan)
|
|
vd_entry = eng.day_veldiv.get(date, {}).get(entry_bar, np.nan)
|
|
|
|
# Hold bars (in-trade, exclusive of entry)
|
|
hold_bars = sorted(ri for ri in eng.day_proxy.get(date, {})
|
|
if entry_bar < ri <= exit_bar)
|
|
pb_hold = [eng.day_proxy[date][ri] for ri in hold_bars]
|
|
pb_max = max(pb_hold) if pb_hold else (pb_entry if np.isfinite(pb_entry) else 0.0)
|
|
pb_traj = (pb_hold[-1] - pb_hold[0]) if len(pb_hold) > 1 else 0.0
|
|
|
|
# Max adverse excursion (MAE) — negative = loss
|
|
mae = 0.0
|
|
if entry_price > 0:
|
|
for ri in hold_bars:
|
|
p = eng.day_prices.get(date, {}).get(ri, {}).get(asset, 0.0)
|
|
if p > 0:
|
|
exc = direction * (p - entry_price) / entry_price
|
|
if exc < mae:
|
|
mae = exc
|
|
|
|
# Early exit prices at hold fraction 0.25, 0.50, 0.75
|
|
early = {}
|
|
for frac in (0.25, 0.50, 0.75):
|
|
target = entry_bar + max(1, int(frac * (exit_bar - entry_bar)))
|
|
avail = [ri for ri in hold_bars if ri >= target]
|
|
if avail and entry_price > 0:
|
|
p = eng.day_prices.get(date, {}).get(avail[0], {}).get(asset, 0.0)
|
|
if p > 0:
|
|
early[frac] = direction * (p - entry_price) / entry_price
|
|
continue
|
|
early[frac] = pnl_frac # fallback: no change
|
|
|
|
feats.append(dict(
|
|
date=date,
|
|
hold_bars=exit_bar - entry_bar,
|
|
direction=direction,
|
|
pnl_frac=pnl_frac,
|
|
pnl_abs=pnl_abs,
|
|
pb_entry=pb_entry,
|
|
vd_entry=vd_entry,
|
|
pb_max=pb_max,
|
|
pb_traj=pb_traj,
|
|
mae=mae,
|
|
e25=early[0.25],
|
|
e50=early[0.50],
|
|
e75=early[0.75],
|
|
))
|
|
return feats
|
|
|
|
|
|
# ── Orthogonality analysis ────────────────────────────────────────────────────
|
|
|
|
def orthogonality_analysis(feats):
|
|
from scipy.stats import pearsonr, spearmanr, mannwhitneyu
|
|
|
|
valid = [f for f in feats if np.isfinite(f['pb_entry']) and np.isfinite(f['vd_entry'])]
|
|
pb_e = np.array([f['pb_entry'] for f in valid])
|
|
vd_e = np.array([f['vd_entry'] for f in valid])
|
|
pnl = np.array([f['pnl_frac'] for f in valid])
|
|
mae = np.array([f['mae'] for f in valid])
|
|
pb_mx = np.array([f['pb_max'] for f in valid])
|
|
hold = np.array([f['hold_bars'] for f in valid])
|
|
|
|
print(f"\n N valid (finite pb_entry + vd_entry): {len(valid)}/{len(feats)}")
|
|
print(f" proxy_B stats: mean={pb_e.mean():.4f} std={pb_e.std():.4f} "
|
|
f"p10={np.percentile(pb_e,10):.4f} p90={np.percentile(pb_e,90):.4f}")
|
|
print(f" vel_div stats: mean={vd_e.mean():.4f} std={vd_e.std():.4f}")
|
|
print()
|
|
|
|
pairs = [
|
|
('pb_entry', pb_e, 'vel_div_entry', vd_e),
|
|
('pb_entry', pb_e, 'pnl_frac', pnl),
|
|
('pb_entry', pb_e, 'mae', mae),
|
|
('pb_entry', pb_e, 'hold_bars', hold),
|
|
('pb_max', pb_mx, 'pnl_frac', pnl),
|
|
('pb_max', pb_mx, 'mae', mae),
|
|
]
|
|
corr_res = {}
|
|
for na, a, nb, b in pairs:
|
|
pr, pp = pearsonr(a, b)
|
|
sr, sp = spearmanr(a, b)
|
|
sig = '***' if pp < 0.001 else '**' if pp < 0.01 else '*' if pp < 0.05 else 'ns'
|
|
print(f" corr({na}, {nb}): Pearson r={pr:+.4f} p={pp:.4f} {sig:3s}"
|
|
f" Spearman rho={sr:+.4f}")
|
|
corr_res[f'{na}_vs_{nb}'] = dict(pearson=float(pr), p=float(pp),
|
|
spearman=float(sr), sig=sig)
|
|
|
|
# Mann-Whitney: is proxy_B different for worst-10% trades vs rest?
|
|
print()
|
|
for label, metric in [('worst_pnl_10pct', pnl), ('worst_mae_10pct', mae)]:
|
|
cut = np.percentile(metric, 10)
|
|
mask_w = metric <= cut
|
|
pb_w = pb_e[mask_w]
|
|
pb_r = pb_e[~mask_w]
|
|
stat, p = mannwhitneyu(pb_w, pb_r, alternative='two-sided')
|
|
sig = '***' if p < 0.001 else '**' if p < 0.01 else '*' if p < 0.05 else 'ns'
|
|
print(f" MW {label}: pb_entry worst={pb_w.mean():.4f} rest={pb_r.mean():.4f} "
|
|
f"p={p:.4f} {sig}")
|
|
corr_res[f'mw_{label}'] = dict(stat=float(stat), p=float(p),
|
|
mean_worst=float(pb_w.mean()),
|
|
mean_rest=float(pb_r.mean()), sig=sig)
|
|
return corr_res
|
|
|
|
|
|
# ── Coupling sweep ────────────────────────────────────────────────────────────
|
|
|
|
def _dd_roi(new_pnl_abs, date_order, date_to_trades):
|
|
"""Retroactive DD and ROI from modified per-trade PnL array."""
|
|
cap, peak, max_dd = 25000.0, 25000.0, 0.0
|
|
total = 0.0
|
|
for d in date_order:
|
|
for i in date_to_trades[d]:
|
|
cap += new_pnl_abs[i]
|
|
total += new_pnl_abs[i]
|
|
if cap > peak: peak = cap
|
|
dd = (peak - cap) / peak * 100.0
|
|
if dd > max_dd: max_dd = dd
|
|
return total / 25000. * 100., max_dd
|
|
|
|
|
|
def coupling_sweep(feats, n_max=None):
|
|
N = len(feats)
|
|
if n_max: feats = feats[:n_max]
|
|
|
|
# ---- Arrays ----
|
|
pnl_abs = np.array([f['pnl_abs'] for f in feats])
|
|
pnl_frac = np.array([f['pnl_frac'] for f in feats])
|
|
pb_entry = np.array([f['pb_entry'] for f in feats])
|
|
pb_max = np.array([f['pb_max'] for f in feats])
|
|
pb_traj = np.array([f['pb_traj'] for f in feats])
|
|
mae = np.array([f['mae'] for f in feats])
|
|
e25 = np.array([f['e25'] for f in feats])
|
|
e50 = np.array([f['e50'] for f in feats])
|
|
e75 = np.array([f['e75'] for f in feats])
|
|
|
|
# Replace NaN pb with median
|
|
pb_med = float(np.nanmedian(pb_entry))
|
|
pb_entry_c = np.where(np.isfinite(pb_entry), pb_entry, pb_med)
|
|
pb_max_c = np.where(np.isfinite(pb_max), pb_max, pb_med)
|
|
|
|
# Percentile ranks (0=low, 1=high)
|
|
def prank(x):
|
|
r = np.argsort(np.argsort(x)).astype(float)
|
|
return r / max(len(r) - 1, 1)
|
|
|
|
rk_e = prank(pb_entry_c)
|
|
rk_mx = prank(pb_max_c)
|
|
rk_tr = prank(pb_traj)
|
|
|
|
# Date ordering for DD computation
|
|
dates_list = [f['date'] for f in feats]
|
|
date_order = sorted(set(dates_list))
|
|
date_to_trades = defaultdict(list)
|
|
for i, d in enumerate(dates_list):
|
|
date_to_trades[d].append(i)
|
|
|
|
base_roi, base_dd = _dd_roi(pnl_abs, date_order, date_to_trades)
|
|
|
|
# Helper: new_pnl_abs from early exit fraction
|
|
def _early_abs(early_frac_arr):
|
|
ratio = np.where(np.abs(pnl_frac) > 1e-9,
|
|
early_frac_arr / pnl_frac, 1.0)
|
|
ratio = np.clip(ratio, -5.0, 5.0)
|
|
return pnl_abs * ratio
|
|
|
|
configs = []
|
|
|
|
def add(name, new_pnl, **meta):
|
|
roi, dd = _dd_roi(new_pnl, date_order, date_to_trades)
|
|
configs.append(dict(name=name, roi=roi, dd=dd,
|
|
roi_delta=roi - base_roi,
|
|
dd_delta=dd - base_dd,
|
|
**meta))
|
|
|
|
# ─── Mode A: scale_suppress — scale down when proxy_B high ───────────────
|
|
for sig_name, rk in [('pb_entry', rk_e), ('pb_max', rk_mx), ('pb_traj', rk_tr)]:
|
|
for thr in [0.50, 0.65, 0.75, 0.85]:
|
|
for alpha in [0.5, 1.0, 2.0]:
|
|
for s_min in [0.0, 0.25, 0.5]:
|
|
scales = np.maximum(s_min, 1.0 - alpha * np.maximum(0, rk - thr))
|
|
add(f'A/{sig_name}/thr{thr}/a{alpha}/min{s_min}',
|
|
pnl_abs * scales,
|
|
mode='scale_suppress', signal=sig_name,
|
|
thr=thr, alpha=alpha, s_min=s_min,
|
|
scale_mean=float(scales.mean()))
|
|
|
|
# ─── Mode B: scale_boost — scale up when proxy_B low ─────────────────────
|
|
for sig_name, rk in [('pb_entry', rk_e)]:
|
|
for thr in [0.25, 0.35, 0.50]:
|
|
for alpha in [0.5, 1.0]:
|
|
scales = 1.0 + alpha * np.maximum(0, thr - rk)
|
|
add(f'B/{sig_name}/thr{thr}/a{alpha}',
|
|
pnl_abs * scales,
|
|
mode='scale_boost', signal=sig_name,
|
|
thr=thr, alpha=alpha,
|
|
scale_mean=float(scales.mean()))
|
|
|
|
# ─── Mode C: hold_limit — exit early when pb_max high ────────────────────
|
|
for frac, early_arr in [(0.25, e25), (0.50, e50), (0.75, e75)]:
|
|
for thr_pct in [0.65, 0.75, 0.85, 0.90]:
|
|
thr_abs = np.percentile(pb_max_c, thr_pct * 100)
|
|
trigger = pb_max_c > thr_abs
|
|
new_pnl_f = np.where(trigger, early_arr, pnl_frac)
|
|
n_trig = int(trigger.sum())
|
|
add(f'C/frac{frac}/pbmax_p{thr_pct}',
|
|
_early_abs(new_pnl_f),
|
|
mode='hold_limit', frac=frac, thr_pct=thr_pct, n_triggered=n_trig)
|
|
|
|
# ─── Mode D: rising_exit — exit early when pb trajectory strongly up ──────
|
|
for frac, early_arr in [(0.25, e25), (0.50, e50)]:
|
|
for thr_pct in [0.70, 0.80, 0.90]:
|
|
thr_abs = np.percentile(pb_traj, thr_pct * 100)
|
|
trigger = pb_traj > thr_abs
|
|
new_pnl_f = np.where(trigger, early_arr, pnl_frac)
|
|
n_trig = int(trigger.sum())
|
|
add(f'D/frac{frac}/traj_p{thr_pct}',
|
|
_early_abs(new_pnl_f),
|
|
mode='rising_exit', frac=frac, thr_pct=thr_pct, n_triggered=n_trig)
|
|
|
|
# ─── Mode E: pure_stop — retroactive stop (no proxy, benchmark) ──────────
|
|
for stop_p in [0.003, 0.005, 0.008, 0.010, 0.015, 0.020, 0.030]:
|
|
# mae < -stop_p → exit was stopped; clamp pnl_frac to -stop_p
|
|
stopped = mae < -stop_p
|
|
new_pnl_f = np.where(stopped, -stop_p, pnl_frac)
|
|
n_trig = int(stopped.sum())
|
|
add(f'E/stop_{stop_p:.3f}',
|
|
_early_abs(new_pnl_f),
|
|
mode='pure_stop', stop_pct=stop_p, n_triggered=n_trig)
|
|
|
|
# ─── Mode F: gated_stop — stop applies only when pb_entry high ───────────
|
|
for stop_p in [0.005, 0.008, 0.010, 0.015]:
|
|
for gate_pct in [0.50, 0.60, 0.75, 0.85]:
|
|
gate_thr = np.percentile(pb_entry_c, gate_pct * 100)
|
|
gated = pb_entry_c > gate_thr
|
|
stopped = gated & (mae < -stop_p)
|
|
new_pnl_f = np.where(stopped, -stop_p, pnl_frac)
|
|
n_trig = int(stopped.sum())
|
|
add(f'F/stop_{stop_p:.3f}/gate_p{gate_pct}',
|
|
_early_abs(new_pnl_f),
|
|
mode='gated_stop', stop_pct=stop_p, gate_pct=gate_pct,
|
|
n_triggered=n_trig)
|
|
|
|
return base_roi, base_dd, configs
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
|
|
def main():
|
|
ensure_jit()
|
|
print("\nLoading data & forewarner...")
|
|
d = load_data()
|
|
fw = load_forewarner()
|
|
|
|
print("\nBuilding shadow data (one full AE run)...")
|
|
t0 = time.time()
|
|
eng, tr = build_shadow(d, fw)
|
|
print(f" Built in {time.time()-t0:.0f}s")
|
|
|
|
print("\nExtracting per-trade features...")
|
|
feats = extract_features(eng, tr)
|
|
print(f" {len(feats)} trades with valid features")
|
|
|
|
# ── Orthogonality ─────────────────────────────────────────────────────────
|
|
print("\n" + "="*60)
|
|
print("ORTHOGONALITY ANALYSIS")
|
|
print("="*60)
|
|
corr_res = orthogonality_analysis(feats)
|
|
|
|
# ── Coupling sweep ────────────────────────────────────────────────────────
|
|
print("\n" + "="*60)
|
|
print(f"COUPLING SWEEP (N={len(feats)} trades)")
|
|
print("="*60)
|
|
t1 = time.time()
|
|
base_roi, base_dd, configs = coupling_sweep(feats)
|
|
print(f" Tested {len(configs)} configs in {time.time()-t1:.2f}s")
|
|
print(f" Baseline: ROI={base_roi:.2f}% DD={base_dd:.2f}%")
|
|
|
|
# ── Find DD-reduction candidates ──────────────────────────────────────────
|
|
GOLD_ROI = GOLD['roi']
|
|
GOLD_DD = GOLD['dd']
|
|
ROI_FLOOR = GOLD_ROI * 0.95 # allow at most -5% ROI cost
|
|
|
|
candidates = [c for c in configs
|
|
if c['dd'] < GOLD_DD and c['roi'] >= ROI_FLOOR]
|
|
candidates.sort(key=lambda c: (c['dd_delta'], -c['roi_delta']))
|
|
|
|
print(f"\n Configs with DD < {GOLD_DD:.2f}% AND ROI >= {ROI_FLOOR:.1f}%: "
|
|
f"{len(candidates)}")
|
|
|
|
# Also find absolute best DD reduction regardless of ROI
|
|
by_dd = sorted(configs, key=lambda c: c['dd'])[:10]
|
|
|
|
# Print tables
|
|
def hdr():
|
|
print(f"\n {'Config':<45} {'ROI%':>7} {'DD%':>6} {'ΔROI':>7} {'ΔDD':>7}"
|
|
f" {'mode':<14}")
|
|
print(' ' + '-'*90)
|
|
|
|
def row(c):
|
|
extra = ''
|
|
if 'n_triggered' in c: extra = f" trig={c['n_triggered']}"
|
|
if 'scale_mean' in c: extra = f" smean={c['scale_mean']:.3f}"
|
|
print(f" {c['name']:<45} {c['roi']:>7.2f} {c['dd']:>6.2f} "
|
|
f"{c['roi_delta']:>+7.2f} {c['dd_delta']:>+7.2f} "
|
|
f"{c.get('mode',''):<14}{extra}")
|
|
|
|
print(f"\n *** GOLD ***: ROI={GOLD_ROI:.2f}% DD={GOLD_DD:.2f}%")
|
|
|
|
if candidates:
|
|
print("\n ── DD < gold AND ROI >= 95% gold ──")
|
|
hdr()
|
|
for c in candidates[:20]:
|
|
row(c)
|
|
else:
|
|
print("\n (no configs meet both criteria)")
|
|
|
|
print("\n ── Top 10 by lowest DD (regardless of ROI) ──")
|
|
hdr()
|
|
for c in by_dd:
|
|
row(c)
|
|
|
|
# ── Summary by mode ───────────────────────────────────────────────────────
|
|
from itertools import groupby
|
|
print("\n ── Best config per mode (by DD delta, ROI >= floor) ──")
|
|
hdr()
|
|
by_mode = defaultdict(list)
|
|
for c in configs:
|
|
by_mode[c.get('mode', 'other')].append(c)
|
|
for mode, cs in sorted(by_mode.items()):
|
|
best = min(cs, key=lambda c: c['dd'])
|
|
row(best)
|
|
|
|
# ── Log results ───────────────────────────────────────────────────────────
|
|
out = _HERE / 'exp4_proxy_coupling_results.json'
|
|
payload = {
|
|
'gold': GOLD,
|
|
'baseline': dict(roi=base_roi, dd=base_dd),
|
|
'orthogonality': corr_res,
|
|
'n_configs_tested': len(configs),
|
|
'dd_reduction_candidates': candidates[:20],
|
|
'top10_by_dd': by_dd,
|
|
'best_per_mode': {
|
|
mode: min(cs, key=lambda c: c['dd'])
|
|
for mode, cs in by_mode.items()
|
|
},
|
|
'all_configs': configs,
|
|
}
|
|
out.parent.mkdir(parents=True, exist_ok=True)
|
|
with open(out, 'w', encoding='utf-8') as f:
|
|
json.dump(payload, f, indent=2)
|
|
print(f"\n Logged → {out}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|