726 lines
29 KiB
Python
726 lines
29 KiB
Python
|
|
#!/usr/bin/env python3
|
|||
|
|
"""
|
|||
|
|
EsoF Gate Strategy — Counterfactual Simulation + Unit Tests
|
|||
|
|
|
|||
|
|
Runs 6 gating strategies against the real 637-trade CH dataset.
|
|||
|
|
For each strategy: computes what would have happened if the gate
|
|||
|
|
had been active at every entry.
|
|||
|
|
|
|||
|
|
Methodology
|
|||
|
|
───────────
|
|||
|
|
- Pull trades from dolphin.trade_events (ClickHouse)
|
|||
|
|
- For each trade: reconstruct EsoF advisory at entry ts via compute_esof()
|
|||
|
|
- Apply gate strategy → get action (ALLOW/BLOCK/SCALE) + lev_mult
|
|||
|
|
- Strategy A-E: counterfactual_pnl = actual_pnl * lev_mult (or 0 if BLOCK)
|
|||
|
|
PnL scales linearly with leverage: halving leverage halves both win and loss.
|
|||
|
|
This is accurate for FIXED_TP and MAX_HOLD exits (fixed % targets).
|
|||
|
|
- Strategy F (S6_BUCKET): counterfactual_pnl = actual_pnl * s6_mult[bucket_id]
|
|||
|
|
Uses EsoF-modulated per-bucket multipliers. Compared to baseline S6 (uniform S6
|
|||
|
|
regardless of EsoF) to isolate the EsoF contribution.
|
|||
|
|
- Sn coefficient modulation: analytical sensitivity analysis (cannot be tested
|
|||
|
|
against existing data without a full IRP klines replay).
|
|||
|
|
|
|||
|
|
Run standalone:
|
|||
|
|
source /home/dolphin/siloqy_env/bin/activate
|
|||
|
|
cd /mnt/dolphinng5_predict
|
|||
|
|
python prod/tests/test_esof_gate_strategies.py
|
|||
|
|
|
|||
|
|
Run as pytest:
|
|||
|
|
pytest prod/tests/test_esof_gate_strategies.py -v
|
|||
|
|
"""
|
|||
|
|
from __future__ import annotations
|
|||
|
|
|
|||
|
|
import json
|
|||
|
|
import math
|
|||
|
|
import sys
|
|||
|
|
import urllib.request
|
|||
|
|
import base64
|
|||
|
|
from collections import defaultdict
|
|||
|
|
from datetime import datetime, timezone
|
|||
|
|
from pathlib import Path
|
|||
|
|
from typing import Dict, List, Optional, Tuple
|
|||
|
|
|
|||
|
|
import pytest
|
|||
|
|
|
|||
|
|
# ── path setup ────────────────────────────────────────────────────────────────
|
|||
|
|
_ROOT = Path(__file__).parent.parent.parent
|
|||
|
|
sys.path.insert(0, str(_ROOT))
|
|||
|
|
sys.path.insert(0, str(_ROOT / "Observability"))
|
|||
|
|
|
|||
|
|
from esof_advisor import compute_esof, BASELINE_WR
|
|||
|
|
from esof_gate import (
|
|||
|
|
apply_gate, get_s6_mult, get_bucket,
|
|||
|
|
BUCKET_MAP, S6_BASE, S6_MULT, IRP_PARAMS, IRP_GOLD,
|
|||
|
|
GateResult,
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# ── CH config ─────────────────────────────────────────────────────────────────
|
|||
|
|
import os

# ClickHouse connection settings. The defaults target the local development
# instance; each value can be overridden through the environment so the suite
# can point at another server (or use other credentials) without editing code.
CH_URL = os.environ.get("DOLPHIN_CH_URL", "http://localhost:8123")
CH_USER = os.environ.get("DOLPHIN_CH_USER", "dolphin")
CH_PASS = os.environ.get("DOLPHIN_CH_PASS", "dolphin_ch_2026")
CH_DB = os.environ.get("DOLPHIN_CH_DB", "dolphin")
|
|||
|
|
|
|||
|
|
def _ch_query(sql: str) -> List[List[str]]:
    """Execute *sql* on ClickHouse over HTTP and return the rows as string lists.

    The statement is sent as the request body to ``CH_URL`` with HTTP Basic
    credentials built from ``CH_USER``/``CH_PASS``; the response is requested
    in ``TabSeparated`` format and split on newlines and tabs.

    Returns:
        One list of column strings per response line, or ``[]`` when the
        response body is empty.

    Raises:
        Any exception raised by ``urllib.request.urlopen`` (the request uses a
        10 second timeout) propagates to the caller unchanged.
    """
    auth = base64.b64encode(f"{CH_USER}:{CH_PASS}".encode()).decode()
    req = urllib.request.Request(
        f"{CH_URL}/?database={CH_DB}&default_format=TabSeparated",
        data=sql.encode(),
        headers={"Authorization": f"Basic {auth}"},
    )
    with urllib.request.urlopen(req, timeout=10) as r:
        # Trim line terminators only. A bare strip() would also remove leading
        # or trailing tabs, silently dropping an empty first/last column and
        # shifting the field count of that row.
        raw = r.read().decode().strip("\r\n")
    if not raw:
        return []
    return [line.split("\t") for line in raw.split("\n")]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _ch_available() -> bool:
|
|||
|
|
try:
|
|||
|
|
_ch_query("SELECT 1")
|
|||
|
|
return True
|
|||
|
|
except Exception:
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Probed once, at import time. The TestCHIntegration skipif markers and the
# standalone runner both read this flag to decide whether ClickHouse can be used.
CH_UP = _ch_available()
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Trade fetch ───────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def fetch_trades() -> List[dict]:
    """
    Load the 'blue' strategy trades from ClickHouse, ordered by timestamp.

    Rows whose exit_reason is HIBERNATE_HALT (force-exit by MHS posture) or
    SUBDAY_ACB_NORMALIZATION (intraday ACB control-plane forced exit) are
    filtered out in SQL because neither reflects alpha.

    Each returned dict has the keys:
      ts (timezone-aware UTC datetime), asset, side, pnl, exit_reason,
      leverage, bucket_id

    Rows with fewer than six columns, or with a non-numeric timestamp, pnl or
    leverage, are skipped.
    """
    sql = """
        SELECT
            toUnixTimestamp64Milli(ts) AS ts_ms,
            asset,
            side,
            pnl,
            exit_reason,
            leverage
        FROM dolphin.trade_events
        WHERE strategy = 'blue'
          AND exit_reason NOT IN ('HIBERNATE_HALT', 'SUBDAY_ACB_NORMALIZATION')
        ORDER BY ts
    """
    rows = _ch_query(sql)

    # Bucket assignments from the pickle are optional: on any failure the map
    # stays None and get_bucket() is called without an override.
    pkl_map: Optional[Dict[str, int]] = None
    try:
        import pickle
        with open(_ROOT / "adaptive_exit/models/bucket_assignments.pkl", 'rb') as fh:
            pkl_map = pickle.load(fh).get('assignments', {})
    except Exception:
        pass

    trades: List[dict] = []
    for row in rows:
        if len(row) < 6:
            continue
        raw_ts, asset, side, raw_pnl, exit_rsn, raw_lev = row[:6]
        try:
            ts_ms = int(raw_ts)
            pnl = float(raw_pnl)
            leverage = float(raw_lev)
        except (ValueError, IndexError):
            continue

        trades.append({
            "ts": datetime.fromtimestamp(ts_ms / 1000.0, tz=timezone.utc),
            "asset": asset,
            "side": side,
            "pnl": pnl,
            "exit_reason": exit_rsn,
            "leverage": leverage,
            "bucket_id": get_bucket(asset, pkl_map),
        })
    return trades
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Counterfactual engine ──────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def run_strategy(strategy: str, trades: List[dict]) -> dict:
    """
    Replay one gating strategy over *trades* and summarise the counterfactual.

    For every trade the EsoF advisory is recomputed at the entry timestamp
    (``compute_esof``) and passed to ``apply_gate``:

    - Strategy "F": the trade's PnL is multiplied by the per-bucket S6
      multiplier from the gate result (0.4 when the bucket is not listed).
      A multiplier below 1e-6 counts as blocked, below 1.0 as scaled. Blocked
      trades stay in the executed count and in the win-rate denominator.
    - Any other strategy: a blocked trade contributes nothing; otherwise the
      PnL is multiplied by ``lev_mult`` (counted as scaled when below 1.0).
      The counterfactual win rate is taken over the non-blocked trades.

    Args:
        strategy: strategy code understood by ``apply_gate``.
        trades: trade dicts with at least ``ts``, ``pnl`` and ``bucket_id``.

    Returns:
        Summary dict with keys strategy, n_trades, n_exec, n_blocked,
        n_scaled, actual_pnl, cf_pnl, delta_pnl, wr_actual, wr_cf.
        Win rates are percentages; both are 0 for an empty trade list.
    """
    cf_pnl = 0.0
    actual_pnl = 0.0
    n_trades = len(trades)
    n_blocked = 0
    n_scaled = 0
    n_wins_cf = 0
    n_wins_act = 0
    bucket_mode = strategy == "F"

    for t in trades:
        adv = compute_esof(t["ts"])
        result = apply_gate(strategy, adv)
        pnl = t["pnl"]

        actual_pnl += pnl
        if pnl > 0:
            n_wins_act += 1

        if bucket_mode:
            # S6 bucket modulation: apply per-bucket × EsoF multiplier
            mult = result.s6_mult.get(t["bucket_id"], 0.4)
            scaled = pnl * mult
            cf_pnl += scaled
            if scaled > 0:
                n_wins_cf += 1
            if mult < 1e-6:
                n_blocked += 1
            elif mult < 1.0:
                n_scaled += 1
            continue

        mult = result.lev_mult
        if result.is_blocked:
            # Trade is skipped entirely: nothing is added to cf_pnl.
            n_blocked += 1
            continue
        scaled = pnl * mult
        cf_pnl += scaled
        if scaled > 0:
            n_wins_cf += 1
        if mult < 1.0:
            n_scaled += 1

    wr_act = (n_wins_act / n_trades * 100) if n_trades else 0
    if bucket_mode:
        n_exec_cf = n_trades
        # Guarded like wr_act: an empty trade list used to raise
        # ZeroDivisionError here for strategy "F" only.
        wr_cf = (n_wins_cf / n_trades * 100) if n_trades else 0
    else:
        n_exec_cf = n_trades - n_blocked
        wr_cf = n_wins_cf / max(n_exec_cf, 1) * 100

    return {
        "strategy": strategy,
        "n_trades": n_trades,
        "n_exec": n_exec_cf,
        "n_blocked": n_blocked,
        "n_scaled": n_scaled,
        "actual_pnl": round(actual_pnl, 2),
        "cf_pnl": round(cf_pnl, 2),
        "delta_pnl": round(cf_pnl - actual_pnl, 2),
        "wr_actual": round(wr_act, 1),
        "wr_cf": round(wr_cf, 1),
    }
|
|||
|
|
|
|||
|
|
|
|||
|
|
def run_s6_baseline(trades: List[dict]) -> dict:
    """
    Flat S6 reference run: every trade is scaled by its S6_BASE bucket
    multiplier (0.4 when the bucket is not listed), with no EsoF input.

    Strategy F is compared against this result to isolate what the EsoF
    modulation adds on top of plain S6.
    """
    scaled_total = 0.0
    winners = 0
    for trade in trades:
        scaled = trade["pnl"] * S6_BASE.get(trade["bucket_id"], 0.4)
        scaled_total += scaled
        if scaled > 0:
            winners += 1

    raw_total = sum(trade["pnl"] for trade in trades)
    win_rate = winners / len(trades) * 100 if trades else 0

    summary = {"strategy": "F_S6_BASE"}
    summary["cf_pnl"] = round(scaled_total, 2)
    summary["wr_cf"] = round(win_rate, 1)
    summary["delta_pnl"] = round(scaled_total - raw_total, 2)
    return summary
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── IRP Sn coefficient sensitivity analysis ───────────────────────────────────
|
|||
|
|
# The ARS constitutive formula: ARS = S1×log1p(eff) + S2×alignment − S3×noise×1000
|
|||
|
|
# Gold spec: S1=0.50, S2=0.35, S3=0.15
|
|||
|
|
# Cannot be tested against existing CH trade data without a full IRP klines replay.
|
|||
|
|
# Below: mathematical sensitivity analysis — what direction does modulating Sn push things.
|
|||
|
|
|
|||
|
|
# Gold-spec ARS weights.
SN_GOLD = dict(S1=0.50, S2=0.35, S3=0.15)

# Candidate weight sets for the sensitivity table. The gold baseline must stay
# first: the report and the tests take the first entry as the reference row.
SN_CONFIGS: Dict[str, Dict[str, float]] = {
    "GOLD (baseline)": dict(S1=0.50, S2=0.35, S3=0.15),
    "EFF-HEAVY (FAVORABLE)": dict(S1=0.60, S2=0.35, S3=0.10),
    "ALIGN-HEAVY (FAVORABLE)": dict(S1=0.45, S2=0.50, S3=0.10),
    "TIGHT (UNFAVORABLE)": dict(S1=0.45, S2=0.45, S3=0.25),
    "ULTRA-TIGHT (UNFAV)": dict(S1=0.40, S2=0.45, S3=0.30),
}
|
|||
|
|
|
|||
|
|
def simulate_ars_sensitivity(
    configs: Optional[Dict[str, Dict[str, float]]] = None,
) -> Tuple[Dict[str, Dict[str, float]], List[str]]:
    """
    Sn coefficient sensitivity: how much does the ARS of a 'good' vs 'marginal'
    asset change under each coefficient config?

    Evaluates ARS = S1*log1p(eff) + S2*align - S3*noise*1000 for five fixed
    asset profiles, ranging from a STRONG asset (high eff, high align, low
    noise) down to a low-correlation one, under every coefficient config.

    Reading the table: a larger strong-vs-marginal gap means tighter selection
    (fewer assets qualify relative to each other); a smaller gap means wider
    selection (more assets reach near-equal ARS, hence more diversity).

    Args:
        configs: mapping of config name to a dict holding the "S1", "S2" and
            "S3" weights. Defaults to the module-level ``SN_CONFIGS``.

    Returns:
        ``(results, profile_names)``. ``results[config][profile]`` is the ARS
        rounded to 4 decimals; both levels keep insertion order, and
        ``profile_names`` lists the profile labels in that same order.
    """
    if configs is None:
        configs = SN_CONFIGS

    profiles = {
        "B3 STRONG (ADA/DOGE): eff=3.2, align=0.60, noise=0.002":
            dict(eff=3.2, align=0.60, noise=0.002),
        "B6 GOOD (FET/ZRX): eff=2.0, align=0.52, noise=0.003":
            dict(eff=2.0, align=0.52, noise=0.003),
        "B0 MARGINAL (ONT/VET): eff=1.2, align=0.35, noise=0.006":
            dict(eff=1.2, align=0.35, noise=0.006),
        "B4 WORST (LTC/BNB): eff=0.8, align=0.28, noise=0.009":
            dict(eff=0.8, align=0.28, noise=0.009),
        "B1 LOW-CORR (XRP/XLM): eff=0.6, align=0.22, noise=0.012":
            dict(eff=0.6, align=0.22, noise=0.012),
    }

    results: Dict[str, Dict[str, float]] = {}
    for cfg_name, sn in configs.items():
        row = {}
        for asset_name, p in profiles.items():
            ars = sn["S1"] * math.log1p(p["eff"]) + sn["S2"] * p["align"] - sn["S3"] * p["noise"] * 1000
            row[asset_name] = round(ars, 4)
        results[cfg_name] = row
    return results, list(profiles)
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Report printer ─────────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
# ANSI escape sequences used to colour and emphasise the console report.
GREEN = "\033[32m"
RED = "\033[31m"
YELLOW = "\033[33m"
BOLD = "\033[1m"
DIM = "\033[2m"
RST = "\033[0m"
|
|||
|
|
|
|||
|
|
def print_report(all_results: List[dict], s6_base: dict, sn_analysis):
    """
    Print the counterfactual simulation report to stdout.

    Sections, in order: per-strategy summary table, strategy F versus the flat
    S6 baseline, S6 bucket multipliers per EsoF label, the analytical Sn
    coefficient sensitivity table, IRP filter thresholds per EsoF label, and
    the online calibration protocol note.

    Args:
        all_results: summary dicts as returned by ``run_strategy``. The first
            entry supplies the baseline figures for the header, so the list
            must not be empty.
        s6_base: summary dict as returned by ``run_s6_baseline``.
        sn_analysis: ``(table, profile_names)`` as returned by
            ``simulate_ars_sensitivity``. Only the table is used; its first
            row is the reference for colouring the others.
    """
    sn_table, _ = sn_analysis
    actual_net = all_results[0]["actual_pnl"]
    actual_wr = all_results[0]["wr_actual"]
    n = all_results[0]["n_trades"]

    print(f"\n{BOLD}{'═'*72}{RST}")
    print(f"{BOLD} DOLPHIN EsoF Gate Strategy — Counterfactual Simulation{RST}")
    print(f" Dataset: {n} trades (HIBERNATE_HALT excluded) Baseline WR={actual_wr:.1f}% Net={actual_net:+,.2f}")
    print(f"{'═'*72}{RST}")

    header = f" {'Strategy':<20}│{'T_exec':>7}│{'T_blk':>6}│{'CF Net':>10}│{'ΔPnL':>10}│{'WR_cf':>7}│{'WR_Δ':>6}"
    sep = f" {'─'*20}┼{'─'*7}┼{'─'*6}┼{'─'*10}┼{'─'*10}┼{'─'*7}┼{'─'*6}"
    print(f"\n{BOLD}{header}{RST}")
    print(sep)

    STRAT_DESC = {
        "A": "A: LEV_SCALE",
        "B": "B: HARD_BLOCK",
        "C": "C: DOW_BLOCK",
        "D": "D: SESSION_BLOCK",
        "E": "E: COMBINED",
        "F": "F: S6_BUCKET",
    }

    for r in all_results:
        name = STRAT_DESC.get(r["strategy"], r["strategy"])
        dpnl = r["delta_pnl"]
        dwr = r["wr_cf"] - r["wr_actual"]
        col = GREEN if dpnl > 0 else RED
        wrcol = GREEN if dwr > 0 else RED
        print(f" {name:<20}│{r['n_exec']:>7}│{r['n_blocked']:>6}│"
              f"{col}{r['cf_pnl']:>+10,.0f}{RST}│"
              f"{col}{dpnl:>+10,.0f}{RST}│"
              f"{wrcol}{r['wr_cf']:>6.1f}%{RST}│"
              f"{wrcol}{dwr:>+5.1f}pp{RST}")

    # Strategy F vs baseline S6 (to show EsoF contribution). The comparison row
    # is omitted when F was not run instead of failing with StopIteration.
    print(sep)
    f_r = next((r for r in all_results if r["strategy"] == "F"), None)
    if f_r is not None:
        f_delta_vs_s6 = f_r["cf_pnl"] - s6_base["cf_pnl"]
        col = GREEN if f_delta_vs_s6 > 0 else RED
        print(f" {'F vs S6_BASE':<20}│{'':>7}│{'':>6}│{'':>10}│"
              f"{col}{f_delta_vs_s6:>+10,.0f}{RST}│{'':>7}│{'':>6} "
              f"{DIM}(EsoF contribution on top of flat S6){RST}")
    print(f" {'S6_BASE (flat)':<20}│{'':>7}│{'':>6}│{s6_base['cf_pnl']:>+10,.0f}│"
          f"{s6_base['delta_pnl']:>+10,.0f}│{s6_base['wr_cf']:>6.1f}%│{'':>6} "
          f"{DIM}(S6 no EsoF, for reference){RST}")

    # Per-bucket breakdown for strategy F (EsoF-modulated vs flat S6)
    print(f"\n{BOLD} Strategy F: S6 bucket multipliers by EsoF label{RST}")
    bkt_header = f" {'Label':<16} " + " ".join(f"{'B'+str(b):>6}" for b in range(7))
    print(bkt_header)
    print(f" {'─'*16} " + " ".join(f"{'──────':>6}" for _ in range(7)))
    for label, mults in S6_MULT.items():
        if label in ("FAVORABLE", "MILD_POSITIVE"):
            note = "← WIDEN"
        elif label in ("UNFAVORABLE", "MILD_NEGATIVE"):
            note = "← TIGHTEN"
        else:
            note = "← GOLD"
        row = f" {label:<16} " + " ".join(f"{mults.get(b,0.0):>6.2f}" for b in range(7))
        print(f"{row} {DIM}{note}{RST}")

    # Sn coefficient sensitivity
    print(f"\n{BOLD} IRP Sn Coefficient Sensitivity (analytical — not from trades){RST}")
    print(f" {DIM}ARS = S1×log1p(eff) + S2×alignment − S3×noise×1000{RST}")
    print(f" {DIM}Gold: S1=0.50, S2=0.35, S3=0.15 | Effect: how much ARS changes per profile{RST}")
    print()

    # Print as table: rows=configs, cols=asset profiles
    short_names = ["B3-STRONG", "B6-GOOD", "B0-MARG", "B4-WORST", "B1-LOWCR"]
    sn_hdr = f" {'Config':<28} " + " ".join(f"{n:>10}" for n in short_names)
    print(sn_hdr)
    print(f" {'─'*28} " + " ".join(f"{'──────────':>10}" for _ in short_names))

    # The first row of the table is the reference; materialise its values once
    # rather than rebuilding the list for every cell.
    gold_vals = list(next(iter(sn_table.values()), {}).values())
    for cfg_name, row in sn_table.items():
        cells = []
        for v, ref in zip(row.values(), gold_vals):
            delta = v - ref
            if abs(delta) < 1e-4:
                cells.append(f"{v:>10.4f}")
            elif delta > 0:
                cells.append(f"{GREEN}{v:>10.4f}{RST}")
            else:
                cells.append(f"{RED}{v:>10.4f}{RST}")
        print(f" {cfg_name:<28} " + " ".join(cells))

    # IRP threshold table
    print(f"\n{BOLD} IRP Filter Thresholds by EsoF Label (for future IRP replay backtest){RST}")
    print(f" {'Label':<16} {'align_min':>10} {'noise_max':>10} {'latency_max':>12} {'Effect'}")
    print(f" {'─'*16} {'─'*10} {'─'*10} {'─'*12} {'─'*20}")
    for label, p in IRP_PARAMS.items():
        if label in ("FAVORABLE", "MILD_POSITIVE"):
            note, col = "wider IRP", GREEN
        elif label in ("UNFAVORABLE", "MILD_NEGATIVE"):
            note, col = "tighter IRP", RED
        else:
            note, col = "gold spec", YELLOW
        print(f" {label:<16} {p['alignment_min']:>10.2f} {p['noise_max']:>10.0f} "
              f"{p['latency_max']:>12.0f} {col}{note}{RST}")

    # Calibration protocol note
    print(f"\n{DIM} {'─'*68}{RST}")
    print(f" {BOLD}Online calibration protocol (no EsoF feedback loop):{RST}")
    print(f" {DIM}1. BLUE always runs ungated. New trades accumulate in CH unfiltered.{RST}")
    print(f" {DIM}2. EsoF tables are refreshed ONLY from ungated BLUE trades.{RST}")
    print(f" {DIM}3. Gate performance is evaluated on out-of-sample ungated data.{RST}")
    print(f" {DIM}4. Gate is wired in ONLY after ≥500 out-of-sample trades confirm{RST}")
    print(f" {DIM} that the gated periods (Mon, NY_AFT) remain negative out-of-sample.{RST}")
    print(f" {DIM} This prevents the filter→calibration→overfit loop.{RST}")
    print(f"{'═'*72}\n")
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ═════════════════════════════════════════════════════════════════════════════
|
|||
|
|
# UNIT TESTS (pytest)
|
|||
|
|
# ═════════════════════════════════════════════════════════════════════════════
|
|||
|
|
|
|||
|
|
class TestGateLogicPure:
    """Gate-logic unit tests driven by hand-built advisories — no CH, no HZ."""

    def _adv(self, dow=1, session="ASIA_PACIFIC", score=0.0, label="NEUTRAL"):
        """Build the minimal advisory dict the gate functions are fed in these tests."""
        day_names = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]
        advisory = {"dow": dow, "dow_name": day_names[dow]}
        advisory["session"] = session
        advisory["advisory_score"] = score
        advisory["advisory_label"] = label
        advisory["hour_utc"] = 3
        advisory["slot_15m"] = "3:00"
        return advisory

    # ── Strategy C: day-of-week block ────────────────────────────────────────

    def test_strategy_C_blocks_monday(self):
        gate = apply_gate("C", self._adv(dow=0))
        assert gate.is_blocked
        assert gate.lev_mult == 0.0

    def test_strategy_C_allows_tuesday(self):
        gate = apply_gate("C", self._adv(dow=1))
        assert not gate.is_blocked
        assert gate.lev_mult == 1.0

    # ── Strategy D: session block ────────────────────────────────────────────

    def test_strategy_D_blocks_ny_afternoon(self):
        gate = apply_gate("D", self._adv(session="NY_AFTERNOON"))
        assert gate.is_blocked

    def test_strategy_D_allows_london_morning(self):
        gate = apply_gate("D", self._adv(session="LONDON_MORNING"))
        assert not gate.is_blocked

    # ── Strategy E: combined ─────────────────────────────────────────────────

    def test_strategy_E_blocks_monday(self):
        gate = apply_gate("E", self._adv(dow=0, session="ASIA_PACIFIC"))
        assert gate.is_blocked

    def test_strategy_E_blocks_ny_afternoon(self):
        gate = apply_gate("E", self._adv(dow=2, session="NY_AFTERNOON"))
        assert gate.is_blocked

    def test_strategy_E_allows_tue_london(self):
        gate = apply_gate("E", self._adv(dow=1, session="LONDON_MORNING"))
        assert not gate.is_blocked

    # ── Strategy A: leverage scaling ─────────────────────────────────────────

    def test_strategy_A_halves_on_unfavorable(self):
        gate = apply_gate("A", self._adv(score=-0.40, label="UNFAVORABLE"))
        assert gate.lev_mult == 0.50
        assert gate.action == "SCALE"

    def test_strategy_A_no_boost_on_favorable(self):
        # Gold spec: never boost beyond 1.0
        gate = apply_gate("A", self._adv(score=0.40, label="FAVORABLE"))
        assert gate.lev_mult == 1.0

    def test_strategy_A_75pct_on_mild_neg(self):
        gate = apply_gate("A", self._adv(score=-0.15, label="MILD_NEGATIVE"))
        assert gate.lev_mult == 0.75

    # ── Strategy B: hard block ───────────────────────────────────────────────

    def test_strategy_B_blocks_unfav_ny_afternoon(self):
        advisory = self._adv(dow=4, session="NY_AFTERNOON", label="UNFAVORABLE", score=-0.35)
        assert apply_gate("B", advisory).is_blocked

    def test_strategy_B_reduces_monday(self):
        advisory = self._adv(dow=0, session="ASIA_PACIFIC", label="NEUTRAL", score=0.0)
        gate = apply_gate("B", advisory)
        assert gate.lev_mult == 0.60
        assert not gate.is_blocked

    def test_strategy_B_allows_mild_neg_london(self):
        advisory = self._adv(dow=3, session="LONDON_MORNING", label="MILD_NEGATIVE", score=-0.15)
        assert apply_gate("B", advisory).action == "ALLOW"

    # ── Strategy F: S6 bucket multipliers ────────────────────────────────────

    def test_strategy_F_unfav_blocks_b4_b0_b1_b5(self):
        gate = apply_gate("F", self._adv(label="UNFAVORABLE", score=-0.40))
        # UNFAVORABLE: B0=0, B1=0, B4=0, B5=0
        assert gate.s6_mult[4] == 0.0  # B4 blocked
        assert gate.s6_mult[0] == 0.0  # B0 blocked
        assert gate.s6_mult[1] == 0.0  # B1 blocked
        assert gate.s6_mult[5] == 0.0  # B5 blocked

    def test_strategy_F_unfav_keeps_b3_b6(self):
        gate = apply_gate("F", self._adv(label="UNFAVORABLE", score=-0.40))
        assert gate.s6_mult[3] > 0  # B3 still active
        assert gate.s6_mult[6] > 0  # B6 still active

    def test_strategy_F_favorable_allows_b4(self):
        gate = apply_gate("F", self._adv(label="FAVORABLE", score=0.40))
        # FAVORABLE: B4 gets 0.20 (reduced but non-zero)
        assert gate.s6_mult[4] > 0.0

    def test_strategy_F_neutral_is_gold_s6(self):
        from esof_gate import S6_BASE
        gate = apply_gate("F", self._adv(label="NEUTRAL", score=0.02))
        assert gate.s6_mult == S6_BASE

    def test_get_s6_mult_for_bucket(self):
        advisory = self._adv(label="FAVORABLE", score=0.35)
        # B3 always 2.0 regardless of EsoF label
        assert get_s6_mult(advisory, bucket_id=3) == 2.0

    # ── IRP parameter modulation ─────────────────────────────────────────────

    def test_irp_params_widen_on_favorable(self):
        from esof_gate import get_irp_params
        params = get_irp_params(self._adv(label="FAVORABLE"))
        assert params["alignment_min"] < IRP_GOLD["alignment_min"]  # relaxed
        assert params["noise_max"] > IRP_GOLD["noise_max"]          # relaxed
        assert params["latency_max"] > IRP_GOLD["latency_max"]      # relaxed

    def test_irp_params_tighten_on_unfavorable(self):
        from esof_gate import get_irp_params
        params = get_irp_params(self._adv(label="UNFAVORABLE"))
        assert params["alignment_min"] > IRP_GOLD["alignment_min"]  # stricter
        assert params["noise_max"] < IRP_GOLD["noise_max"]          # stricter
        assert params["latency_max"] < IRP_GOLD["latency_max"]      # stricter

    # ── Misc contract checks ─────────────────────────────────────────────────

    def test_unknown_strategy_raises(self):
        advisory = self._adv()
        with pytest.raises(KeyError):
            apply_gate("Z", advisory)

    def test_gate_result_is_blocked_property(self):
        blocked = GateResult("BLOCK", 0.0, "test")
        assert blocked.is_blocked
        scaled = GateResult("SCALE", 0.5, "test")
        assert not scaled.is_blocked

    def test_bucket_map_coverage(self):
        # Known B3 assets must map to 3
        for symbol in ("ADAUSDT", "DOGEUSDT", "ENJUSDT"):
            assert get_bucket(symbol) == 3
        # Known B4 must map to 4
        for symbol in ("LTCUSDT", "BNBUSDT"):
            assert get_bucket(symbol) == 4

    def test_bucket_fallback_unknown(self):
        assert get_bucket("UNKNOWNUSDT") == 0  # B0 fallback

    def test_pkl_overrides_map(self):
        override = {"LTCUSDT": 9}
        assert get_bucket("LTCUSDT", override) == 9
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestEsoFComputeIntegration:
    """Tests compute_esof on known fixtures (no CH required)."""

    def test_monday_dow_is_zero(self):
        # 2026-04-13 is a Monday
        dt = datetime(2026, 4, 13, 10, 0, tzinfo=timezone.utc)
        adv = compute_esof(dt)
        assert adv["dow"] == 0
        assert adv["dow_name"] == "Mon"

    def test_ny_afternoon_session(self):
        dt = datetime(2026, 4, 19, 18, 30, tzinfo=timezone.utc)
        adv = compute_esof(dt)
        assert adv["session"] == "NY_AFTERNOON"

    def test_advisory_score_bounded(self):
        """advisory_score stays within [-1, 1] across sampled days and hours.

        Samples 20 timestamps from the 31 days starting 2026-03-31. The
        previous version clamped the day with ``min(31, 31 + offset)``, which
        is always 31, so only one date was ever exercised; it also wrapped the
        assertion in ``except Exception: pass``, which swallows AssertionError
        and made the test impossible to fail.
        """
        import random
        from datetime import timedelta

        rng = random.Random(20260331)  # fixed seed keeps any failure reproducible
        start = datetime(2026, 3, 31, tzinfo=timezone.utc)
        for _ in range(20):
            dt = start + timedelta(days=rng.randint(0, 30), hours=rng.randint(0, 23))
            adv = compute_esof(dt)
            assert -1.0 <= adv["advisory_score"] <= 1.0, (
                f"advisory_score {adv['advisory_score']} out of range at {dt.isoformat()}"
            )

    def test_strategy_applied_to_real_advisory(self):
        """Strategy C blocks Monday advisory output."""
        dt = datetime(2026, 4, 13, 10, 0, tzinfo=timezone.utc)  # Monday
        adv = compute_esof(dt)
        assert apply_gate("C", adv).is_blocked

    def test_sun_london_morning_is_favorable_or_mild_pos(self):
        """Sun LDN (WR=85%) should score positive."""
        dt = datetime(2026, 4, 19, 10, 0, tzinfo=timezone.utc)  # Sun 10:00
        adv = compute_esof(dt)
        assert adv["dow"] == 6  # Sunday
        assert adv["session"] == "LONDON_MORNING"
        assert adv["advisory_score"] > 0.0  # positive EsoF

    def test_sun_ny_afternoon_is_negative(self):
        """Sun NY_AFT (WR=6%) must score negative."""
        dt = datetime(2026, 4, 19, 18, 0, tzinfo=timezone.utc)  # Sun 18:00
        adv = compute_esof(dt)
        assert adv["session"] == "NY_AFTERNOON"
        # Sun is +3.7 WR on DoW, but NY_AFT is -8.3 WR on session → net negative
        assert adv["advisory_score"] < 0.0
|
|||
|
|
|
|||
|
|
|
|||
|
|
class TestSNSensitivity:
    """Checks on the analytical Sn coefficient sensitivity table (no CH)."""

    def test_b3_always_highest_ars(self):
        table, _names = simulate_ars_sensitivity()
        strong_col, worst_col = 0, 3  # B3 STRONG is the first profile, B4 WORST the fourth
        for cfg, row in table.items():
            scores = list(row.values())
            assert scores[strong_col] > scores[worst_col], f"B3 should beat B4 under config {cfg}"

    def test_tight_config_widens_b3_vs_b4_gap(self):
        """Tighter Sn (higher noise penalty) should increase gap between B3 and B4."""
        table, _ = simulate_ars_sensitivity()
        gold_scores = list(list(table.values())[0].values())
        tight_scores = list(table["TIGHT (UNFAVORABLE)"].values())
        # Gap = B3_STRONG - B4_WORST under each config
        gap_gold = gold_scores[0] - gold_scores[3]
        gap_tight = tight_scores[0] - tight_scores[3]
        assert gap_tight > gap_gold, "Tighter noise penalty should widen B3-vs-B4 gap"

    def test_eff_heavy_widens_selection(self):
        """
        EFF-HEAVY reduces noise penalty (S3 0.15→0.10) as well as boosting efficiency weight.
        Net effect: LIFTS all profiles (B0/B1 become less negative) — WIDENS asset selection.
        B3 remains highest ARS; B0 moves closest to zero (nearly qualifies).
        """
        table, _ = simulate_ars_sensitivity()
        gold_scores = list(list(table.values())[0].values())
        eff_scores = list(table["EFF-HEAVY (FAVORABLE)"].values())
        # All profiles improve under EFF-HEAVY (wider selection)
        for i, score in enumerate(eff_scores):
            assert score > gold_scores[i], f"EFF-HEAVY should improve all profiles (idx={i})"
        # B3 is still the highest ARS
        assert eff_scores[0] == max(eff_scores), "B3-STRONG must remain the top ARS"
|
|||
|
|
|
|||
|
|
|
|||
|
|
@pytest.mark.skipif(not CH_UP, reason="ClickHouse not available")
class TestCHIntegration:
    """CH-dependent tests — every test here is skipped if CH is unavailable."""

    def test_can_fetch_trades(self):
        assert len(fetch_trades()) >= 100, "Expected at least 100 trades in CH"

    def test_all_strategies_improve_pnl(self):
        """Sanity: strategies C, D, E should all improve net PnL (well-established signals)."""
        trades = fetch_trades()
        for code in ("C", "D", "E"):
            summary = run_strategy(code, trades)
            assert summary["cf_pnl"] > summary["actual_pnl"], (
                f"Strategy {code} should improve PnL: cf={summary['cf_pnl']:.2f} <= actual={summary['actual_pnl']:.2f}"
            )

    def test_strategy_C_reduces_trade_count(self):
        summary = run_strategy("C", fetch_trades())
        assert summary["n_blocked"] > 0
        assert summary["n_exec"] < summary["n_trades"]

    def test_s6_base_beats_raw_baseline(self):
        """Base S6 (no EsoF) should beat raw baseline — established by CRITICAL_ASSET_PICKING."""
        trades = fetch_trades()
        flat = run_s6_baseline(trades)
        raw_net = sum(t["pnl"] for t in trades)
        assert flat["cf_pnl"] > raw_net, "Base S6 should outperform raw baseline"

    def test_strategy_F_esof_beats_s6_base(self):
        """EsoF-modulated S6 should beat flat S6 (otherwise EsoF modulation adds no value)."""
        trades = fetch_trades()
        modulated = run_strategy("F", trades)
        flat = run_s6_baseline(trades)
        # Even a small improvement is acceptable — EsoF is noise-limited at 637 trades
        assert modulated["cf_pnl"] >= flat["cf_pnl"] - 200, (
            f"EsoF-S6 ({modulated['cf_pnl']:.0f}) should be within $200 of S6_BASE ({flat['cf_pnl']:.0f})"
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ═════════════════════════════════════════════════════════════════════════════
|
|||
|
|
# STANDALONE SIMULATION
|
|||
|
|
# ═════════════════════════════════════════════════════════════════════════════
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
    # The standalone run needs live data: stop early when ClickHouse is down.
    if not CH_UP:
        print(f"{RED}ERROR: ClickHouse not reachable at {CH_URL}{RST}")
        print("Start ClickHouse then re-run.")
        sys.exit(1)

    print("Fetching trades from ClickHouse...")
    trades = fetch_trades()
    print(f" {len(trades)} trades loaded.")
    if len(trades) < 50:
        print(f"{RED}Too few trades — check dolphin.trade_events.{RST}")
        sys.exit(1)

    print("Running strategies...")
    results = []
    for code in "ABCDEF":
        results.append(run_strategy(code, trades))
        print(f" {code} done.")

    # Flat S6 reference first, then the analytical Sn table, then the report.
    s6_base = run_s6_baseline(trades)
    sn_analysis = simulate_ars_sensitivity()
    print_report(results, s6_base, sn_analysis)
|