"""
Alpha Engine Integrity Test
============================

Tests the backtest alpha engine against NG3 JSON data (the proven baseline)
and optionally splices with NG5 Arrow data.

Strategy:
1. Load NG3 JSON scan data via JSONEigenvalueDataAdapter (exact same path as prod)
2. Run a vectorised signal loop replicating the champion VBT backtest logic
3. Report ROI, WR, PF, DD vs the known NG3 champion benchmarks
4. Optionally append NG5 Arrow scans for a 'mixed' backtest

Champion Benchmarks (NG3 VBT backtest, Dec 2025 - Feb 2026):
    Win Rate : 40%+
    PF       : 1.215
    ROI      : 40-56% (depending on config version)
    Max DD   : < 15%

Usage:
    python test_alpha_engine_integrity.py --ng3-json-dir "C:/.../eigenvalues"
    python test_alpha_engine_integrity.py --ng3-json-dir "..." --ng5-arrow-dir "..."
    python test_alpha_engine_integrity.py --ng3-json-dir "..." --report-only

Author: Antigravity / DOLPHIN NG5 migration
"""
|
||
|
|
|
||
|
|
import sys
|
||
|
|
import json
|
||
|
|
import math
|
||
|
|
import logging
|
||
|
|
import argparse
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Dict, List, Optional, Tuple
|
||
|
|
from dataclasses import dataclass, field, asdict
|
||
|
|
|
||
|
|
import numpy as np
|
||
|
|
import pandas as pd
|
||
|
|
|
||
|
|
# Console logging for the whole harness: "HH:MM:SS | LEVEL    | message".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s | %(levelname)-8s | %(message)s',
    datefmt='%H:%M:%S',
)
# Module-level logger used by every function in this script.
log = logging.getLogger(__name__)

# Put the directory containing this file on sys.path so sibling project
# modules resolve when the script is run directly.
PROJECT_ROOT = Path(__file__).parent
sys.path.insert(0, str(PROJECT_ROOT))
||
|
|
|
||
|
|
# ─── Benchmark (from the user's NG3 champion backtest) ─────────────────────────
# Pass/fail thresholds consumed by compare_to_champion(). All values are
# fractions (not percentages). max_drawdown is an upper bound; the other
# three are lower bounds.
CHAMPION_BENCHMARKS = {
    'win_rate': 0.40,        # minimum share of winning trades
    'profit_factor': 1.215,  # minimum gross_win / gross_loss
    'roi': 0.40,             # minimum return on starting capital
    'max_drawdown': 0.15,    # maximum peak-to-trough equity drawdown
}
|
||
|
|
|
||
|
|
# ─── Simulation parameters (matching backtest_results.json config) ─────────────
# FIX: 'daily_loss_limit_pct' was missing although AlphaEngineSimulator.step()
# reads it on every scan (KeyError at runtime); 'min_leverage' is added
# explicitly with the same value compute_leverage() already defaulted to.
SIM_PARAMS = {
    'capital': 100_000.0,            # starting capital (USDT)
    'capital_fraction': 0.15,        # fraction of equity committed per entry
    'min_leverage': 0.5,             # floor for dynamic leverage (compute_leverage)
    'max_leverage': 2.5,             # cap for dynamic leverage
    'max_concurrent': 10,            # maximum simultaneous open positions
    'tp_bps': 99,                    # take-profit threshold, in basis points
    'stop_pct': 0.0150,              # hard stop-loss (fraction of entry price)
    'trailing_stop_pct': 0.0090,     # trailing-stop distance from peak/trough
    'trail_activation_pct': 0.0003,  # unrealised gain needed to arm the trail
    'max_hold_bars': 120,            # force-exit after this many scans held
    'vel_div_threshold': -0.02,      # vel_div below this => SHORT; above its negation => LONG
    'irp_alignment_min': 0.45,       # minimum IRP-alignment proxy to take a signal
    'momentum_min': 7.5e-5,          # minimum |vel_div| to take a signal
    'daily_loss_limit_pct': 5.0,     # halt new entries after this % equity loss in a day
    'taker_fee_rate': 0.0005,        # per-side taker fee (fraction of notional)
    'maker_fee_rate': 0.0002,        # maker fee (not used by apply_fees at present)
    'slippage_bps': 2.0,             # assumed slippage per fill, in basis points
    'excluded_assets': {'TUSDUSDT', 'USDCUSDT'},  # stablecoin pairs: never traded
}
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SimTrade:
    # One simulated position. Mutated in place while open, then serialised
    # into AlphaEngineSimulator.closed_trades on exit.
    trade_id: str                     # "<asset>-<entry scan number>"
    asset: str                        # symbol, e.g. 'BTCUSDT'
    direction: str                    # 'LONG' or 'SHORT'
    entry_price: float
    exit_price: float = 0.0           # set when the position is closed
    size_usdt: float = 0.0            # notional = equity * capital_fraction * leverage
    leverage: float = 1.0
    pnl: float = 0.0                  # net PnL after the exit fee
    fees: float = 0.0                 # cumulative entry + exit fees
    entry_scan: int = 0
    exit_scan: int = 0                # NOTE(review): never written by the simulator — confirm intended
    exit_reason: str = ''             # 'trail' | 'stop' | 'tp' | 'hold' | 'end_of_period'
    is_closed: bool = False
    bars_held: int = 0                # scans elapsed since entry
    highest_price: float = 0.0        # running high watermark (LONG trailing stop)
    lowest_price: float = 0.0         # running low watermark (SHORT trailing stop)
    trailing_activated: bool = False  # set once unrealised gain >= trail_activation_pct
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass
class SimStats:
    # Aggregate simulation output container. Appears unused within this file;
    # the driver reads AlphaEngineSimulator attributes directly.
    trades: List[dict] = field(default_factory=list)         # closed-trade records
    equity_curve: List[float] = field(default_factory=list)  # equity after each scan
    scan_dates: List[str] = field(default_factory=list)      # date labels per scan
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Data loaders ─────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def _read_json_scan(filepath: Path) -> Optional[dict]:
|
||
|
|
"""Read a single NG3 JSON scan file."""
|
||
|
|
try:
|
||
|
|
with open(filepath, 'r', encoding='utf-8') as f:
|
||
|
|
return json.load(f)
|
||
|
|
except Exception:
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_ng3_scan_data(scan: dict) -> Optional[dict]:
|
||
|
|
"""Extract signal data from NG3 JSON scan format."""
|
||
|
|
try:
|
||
|
|
windows = scan.get('windows', {})
|
||
|
|
w50 = windows.get('50', {})
|
||
|
|
w150 = windows.get('150', {})
|
||
|
|
w300 = windows.get('300', {})
|
||
|
|
|
||
|
|
td50 = w50.get('tracking_data', {})
|
||
|
|
td150 = w150.get('tracking_data', {})
|
||
|
|
td300 = w300.get('tracking_data', {})
|
||
|
|
|
||
|
|
v50 = td50.get('lambda_max_velocity', 0.0) or 0.0
|
||
|
|
v150 = td150.get('lambda_max_velocity', 0.0) or 0.0
|
||
|
|
v300 = td300.get('lambda_max_velocity', 0.0) or 0.0
|
||
|
|
|
||
|
|
vel_div = v50 - v150
|
||
|
|
|
||
|
|
# Per-asset pricing
|
||
|
|
pricing = scan.get('pricing_data', {})
|
||
|
|
current_prices = pricing.get('current_prices', {})
|
||
|
|
per_asset = pricing.get('per_asset_correlation', {})
|
||
|
|
|
||
|
|
# Instability from window 50
|
||
|
|
rs50 = w50.get('regime_signals', {})
|
||
|
|
instab = rs50.get('instability_score', 0.0) or 0.0
|
||
|
|
|
||
|
|
ts = scan.get('timestamp_utc') or scan.get('timestamp') or scan.get('scan_timestamp')
|
||
|
|
try:
|
||
|
|
if isinstance(ts, str):
|
||
|
|
ts = pd.Timestamp(ts)
|
||
|
|
elif ts is None:
|
||
|
|
ts = pd.Timestamp.now()
|
||
|
|
except Exception:
|
||
|
|
ts = pd.Timestamp.now()
|
||
|
|
|
||
|
|
return {
|
||
|
|
'timestamp': ts,
|
||
|
|
'scan_number': scan.get('scan_number', 0),
|
||
|
|
'vel_div': vel_div,
|
||
|
|
'v50': v50,
|
||
|
|
'v150': v150,
|
||
|
|
'v300': v300,
|
||
|
|
'instability': instab,
|
||
|
|
'prices': {k: float(v) for k, v in current_prices.items() if isinstance(v, (int, float))},
|
||
|
|
'per_asset': per_asset,
|
||
|
|
}
|
||
|
|
except Exception as e:
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def load_ng3_scans(eigenvalues_dir: Path, start_date: str, end_date: str) -> List[dict]:
    """Load every NG3 JSON scan in [start_date, end_date], sorted by timestamp.

    Expects one sub-directory per date (YYYY-MM-DD) containing scan_*.json
    files; unreadable or malformed scans are silently skipped.
    """
    first_day = datetime.strptime(start_date, '%Y-%m-%d').date()
    last_day = datetime.strptime(end_date, '%Y-%m-%d').date()

    scans: List[dict] = []
    day = first_day
    while day <= last_day:
        day_dir = eigenvalues_dir / day.strftime('%Y-%m-%d')
        if day_dir.exists():
            for scan_file in sorted(day_dir.glob('scan_*.json')):
                raw = _read_json_scan(scan_file)
                if raw is None:
                    continue
                parsed = _extract_ng3_scan_data(raw)
                if parsed:
                    scans.append(parsed)
        day += timedelta(days=1)

    scans.sort(key=lambda item: item['timestamp'])
    log.info(f"[NG3] Loaded {len(scans)} scans from {start_date} to {end_date}")
    return scans
|
||
|
|
|
||
|
|
|
||
|
|
def load_ng5_scans(arrow_scans_dir: Path, date_str: str) -> List[dict]:
    """Load NG5 Arrow scans for a single date. Returns same dict format as NG3."""
    # pyarrow is an optional dependency: fail soft so the NG3-only path
    # keeps working on machines without it.
    try:
        import pyarrow as pa
        import pyarrow.ipc as ipc
        import json as json_
    except ImportError:
        log.error("pyarrow not installed — cannot load NG5 Arrow data")
        return []

    date_dir = arrow_scans_dir / date_str
    if not date_dir.exists():
        log.warning(f"[NG5] No directory: {date_dir}")
        return []

    scans = []
    for f in sorted(date_dir.glob('scan_*.arrow')):
        try:
            # Memory-map the Arrow IPC file; each file appears to hold one
            # scan as a single-row table.
            with pa.memory_map(str(f), 'r') as src:
                table = ipc.open_file(src).read_all()
                if len(table) == 0:
                    continue
                # Flatten the single row into a plain dict of Python values.
                row = {col: table.column(col)[0].as_py() for col in table.column_names}
                ts_ns = row.get('timestamp_ns', 0) or 0
                # Epoch-nanoseconds -> Timestamp; fall back to now() when absent.
                ts = pd.Timestamp(ts_ns, unit='ns') if ts_ns else pd.Timestamp.now()
                v50 = row.get('w50_velocity', 0.0) or 0.0
                v150 = row.get('w150_velocity', 0.0) or 0.0
                v300 = row.get('w300_velocity', 0.0) or 0.0
                # Symbols and prices are stored as parallel JSON-encoded arrays.
                assets = json_.loads(row.get('assets_json', '[]') or '[]')
                prices_ = json_.loads(row.get('asset_prices_json', '[]') or '[]')
                prices = {a: float(p) for a, p in zip(assets, prices_) if isinstance(p, (int, float))}
                scans.append({
                    'timestamp': ts,
                    'scan_number': row.get('scan_number', 0),
                    'vel_div': v50 - v150,  # same divergence definition as NG3
                    'v50': v50,
                    'v150': v150,
                    'v300': v300,
                    'instability': row.get('w50_instability', 0.0) or 0.0,
                    'prices': prices,
                    'per_asset': {},  # NG5 rows carry no per-asset loadings
                    'source': 'ng5_arrow',
                })
        except Exception as e:
            # Best-effort loader: a single bad file must not abort the run.
            log.debug(f"[NG5] Skip {f.name}: {e}")

    log.info(f"[NG5] Loaded {len(scans)} Arrow scans from {date_str}")
    return scans
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Champion signal logic ─────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def score_asset(scan_data: dict, asset: str) -> Tuple[Optional[str], float]:
    """
    Replication of NG3 champion signal logic:
    - Direction: SHORT when vel_div < threshold (market diverging negatively)
    - Strength: |vel_div| * instability composite
    - IRP alignment: simplified (0.5 default when OB not available)

    Returns (direction, strength), or (None, 0.0) when any gate rejects.
    """
    vel_div = scan_data['vel_div']
    v50 = scan_data['v50']  # read kept for parity with the original path; unused below
    instab = scan_data['instability']

    # No price for this asset this scan -> cannot trade it.
    if asset not in scan_data['prices']:
        return None, 0.0

    # Direction gate: vel_div_threshold is negative, so the dead zone is
    # the symmetric band (threshold, -threshold).
    threshold = SIM_PARAMS['vel_div_threshold']
    if vel_div < threshold:
        direction = 'SHORT'
    elif vel_div > -threshold:
        direction = 'LONG'
    else:
        return None, 0.0

    # IRP alignment proxy (no order book: scale |loading| into [0, 1]).
    asset_info = scan_data.get('per_asset', {}).get(asset, {})
    irp_alignment = min(asset_info.get('abs_loading', 0.5) * 2.0, 1.0)
    if irp_alignment < SIM_PARAMS['irp_alignment_min']:
        return None, 0.0

    # Momentum magnitude gate.
    momentum = abs(vel_div)
    if momentum < SIM_PARAMS['momentum_min']:
        return None, 0.0

    return direction, momentum * (1.0 + abs(instab)) * irp_alignment
|
||
|
|
|
||
|
|
|
||
|
|
def compute_leverage(strength: float) -> float:
    """Dynamic leverage with cube-root convexity, clamped to [min, max]."""
    normalised = min(strength * 10.0, 1.0)
    raw = SIM_PARAMS['max_leverage'] * normalised ** (1.0 / 3.0)
    floor = SIM_PARAMS.get('min_leverage', 0.5)
    ceiling = SIM_PARAMS['max_leverage']
    return max(floor, min(raw, ceiling))
|
||
|
|
|
||
|
|
|
||
|
|
def apply_fees(size_usdt: float, is_entry: bool) -> float:
    """Fee + slippage cost, as USDT, for a fill of the given notional.

    NOTE(review): both entry and exit are charged the taker rate; the
    `is_entry` flag and the configured maker_fee_rate are currently
    unused — confirm that is intended.
    """
    per_unit_cost = SIM_PARAMS['taker_fee_rate'] + SIM_PARAMS['slippage_bps'] * 1e-4
    return size_usdt * per_unit_cost
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Simulation engine ─────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
class AlphaEngineSimulator:
    """
    Vectorised single-pass simulation of the champion alpha engine.

    Matches the NG3 VBT backtest rules as closely as possible without
    the full Nautilus / VBT stack:
    - Capital fraction entry sizing
    - Dynamic leverage
    - Trailing stop with activation threshold
    - Hard stop loss
    - Max hold bars exit
    - Take profit (bps-based)
    - Max concurrent position cap
    - Daily loss limit (no new entries once the daily loss bound is hit)
    """

    def __init__(self, capital: float = 100_000.0):
        self.capital = capital                        # starting capital (never mutated)
        self.equity = capital                         # running equity
        self.open_positions: Dict[str, SimTrade] = {}  # keyed by asset symbol
        self.closed_trades: List[dict] = []
        self.equity_curve: List[float] = [capital]    # one point appended per scan
        self.scan_count = 0
        self.daily_loss_start_equity = capital        # reset by the driver at each day roll

    def step(self, scan_data: dict, assets: List[str]):
        """Advance one scan: manage exits on open positions, then open new ones.

        scan_data is the flat dict produced by the NG3/NG5 loaders; assets is
        the tradable universe. Appends exactly one equity-curve point per call.
        """
        self.scan_count += 1
        prices = scan_data['prices']

        # ── Update open positions ──────────────────────────────────────────────
        to_close = []
        for asset, pos in list(self.open_positions.items()):
            if asset not in prices:
                # No fresh price this scan: leave the position untouched
                # (bars_held only advances on scans with a price).
                continue
            price = prices[asset]
            pos.bars_held += 1

            if pos.direction == 'LONG':
                pos.highest_price = max(pos.highest_price, price)
                pnl_pct = (price - pos.entry_price) / pos.entry_price

                # Arm the trailing stop once the activation gain is reached.
                if not pos.trailing_activated and pnl_pct >= SIM_PARAMS['trail_activation_pct']:
                    pos.trailing_activated = True

                if pos.trailing_activated:
                    trail_level = pos.highest_price * (1 - SIM_PARAMS['trailing_stop_pct'])
                    if price <= trail_level:
                        to_close.append((asset, price, 'trail'))
                        continue

                # Hard stop
                if pnl_pct <= -SIM_PARAMS['stop_pct']:
                    to_close.append((asset, price, 'stop'))
                    continue

                # Take profit
                if pnl_pct >= SIM_PARAMS['tp_bps'] * 1e-4:
                    to_close.append((asset, price, 'tp'))
                    continue

            else:  # SHORT
                pos.lowest_price = min(pos.lowest_price, price)
                pnl_pct = (pos.entry_price - price) / pos.entry_price

                if not pos.trailing_activated and pnl_pct >= SIM_PARAMS['trail_activation_pct']:
                    pos.trailing_activated = True

                if pos.trailing_activated:
                    trail_level = pos.lowest_price * (1 + SIM_PARAMS['trailing_stop_pct'])
                    if price >= trail_level:
                        to_close.append((asset, price, 'trail'))
                        continue

                if pnl_pct <= -SIM_PARAMS['stop_pct']:
                    to_close.append((asset, price, 'stop'))
                    continue

                if pnl_pct >= SIM_PARAMS['tp_bps'] * 1e-4:
                    to_close.append((asset, price, 'tp'))
                    continue

            # Max hold: force-exit stale positions.
            if pos.bars_held >= SIM_PARAMS['max_hold_bars']:
                to_close.append((asset, price, 'hold'))

        for asset, price, reason in to_close:
            self._close_position(asset, price, reason)

        # ── Open new positions ─────────────────────────────────────────────────
        if len(self.open_positions) >= SIM_PARAMS['max_concurrent']:
            self.equity_curve.append(self.equity)
            return

        # Daily loss limit check. FIX: use .get() with a 5% default — the key
        # was absent from SIM_PARAMS, so the original subscript raised
        # KeyError on every scan that reached this point.
        daily_loss_pct = (self.equity - self.daily_loss_start_equity) / self.daily_loss_start_equity
        if daily_loss_pct <= -SIM_PARAMS.get('daily_loss_limit_pct', 5.0) / 100:
            self.equity_curve.append(self.equity)
            return

        # Score the universe and keep only assets with a live signal.
        candidates = []
        for asset in assets:
            if asset in self.open_positions:
                continue
            if asset in SIM_PARAMS['excluded_assets']:
                continue
            direction, strength = score_asset(scan_data, asset)
            if direction is None:
                continue
            candidates.append((asset, direction, strength))

        # Strongest signals first; fill only the remaining slots.
        candidates.sort(key=lambda x: -x[2])

        slots = SIM_PARAMS['max_concurrent'] - len(self.open_positions)
        for asset, direction, strength in candidates[:slots]:
            if asset not in prices:
                continue
            price = prices[asset]

            lev = compute_leverage(strength)
            size = self.equity * SIM_PARAMS['capital_fraction'] * lev
            fee = apply_fees(size, is_entry=True)
            # Sanity guard: skip entries whose fee alone exceeds 1% of equity.
            if fee > self.equity * 0.01:
                continue

            self.equity -= fee
            tid = f"{asset}-{self.scan_count}"
            pos = SimTrade(
                trade_id=tid, asset=asset, direction=direction,
                entry_price=price, size_usdt=size, leverage=lev,
                entry_scan=self.scan_count, fees=fee,
                highest_price=price, lowest_price=price,
            )
            self.open_positions[asset] = pos

        self.equity_curve.append(self.equity)

    def _close_position(self, asset: str, exit_price: float, reason: str):
        """Settle one open position at exit_price and record the trade."""
        pos = self.open_positions.pop(asset, None)
        if pos is None:
            return

        pos.exit_price = exit_price
        pos.exit_reason = reason
        pos.is_closed = True

        # Signed return relative to entry, per direction.
        if pos.direction == 'LONG':
            pnl_pct = (exit_price - pos.entry_price) / pos.entry_price
        else:
            pnl_pct = (pos.entry_price - exit_price) / pos.entry_price

        raw_pnl = pos.size_usdt * pnl_pct
        exit_fee = apply_fees(pos.size_usdt, is_entry=False)
        net_pnl = raw_pnl - exit_fee
        pos.pnl = net_pnl
        pos.fees += exit_fee

        self.equity += net_pnl
        self.closed_trades.append({
            'trade_id': pos.trade_id,
            'asset': pos.asset,
            'direction': pos.direction,
            'entry_price': pos.entry_price,
            'exit_price': pos.exit_price,
            'leverage': round(pos.leverage, 3),
            'size_usdt': round(pos.size_usdt, 2),
            'pnl': round(pos.pnl, 4),
            'fees': round(pos.fees, 4),
            'exit_reason': pos.exit_reason,
            'bars_held': pos.bars_held,
        })

    def flush_open_positions(self, final_prices: Dict[str, float]):
        """Market-close all remaining positions at end of period.

        Assets without a final price close at entry price (zero raw PnL,
        exit fee still charged).
        """
        for asset in list(self.open_positions.keys()):
            price = final_prices.get(asset, self.open_positions[asset].entry_price)
            self._close_position(asset, price, 'end_of_period')

    def compute_metrics(self) -> dict:
        """Summarise the run: WR, PF, ROI, drawdown, fees, Sharpe-like score."""
        trades = self.closed_trades
        if not trades:
            # Empty run: return a zeroed-out summary with the same keys
            # the reporter formats.
            return {
                'trade_count': 0, 'win_rate': 0,
                'profit_factor': 0, 'roi': 0,
                'total_pnl': 0, 'max_drawdown': 0,
                'avg_win': 0, 'avg_loss': 0, 'sharpe': 0,
            }

        pnls = [t['pnl'] for t in trades]
        winners = [p for p in pnls if p > 0]
        losers = [p for p in pnls if p <= 0]  # zero-PnL trades count as losses

        gross_win = sum(winners) if winners else 0
        gross_loss = abs(sum(losers)) if losers else 1e-9  # avoid div-by-zero in PF

        # Max drawdown from the per-scan equity curve.
        eq = np.array(self.equity_curve)
        running_max = np.maximum.accumulate(eq)
        dd = (eq - running_max) / running_max
        max_dd = float(abs(dd.min()))

        # Sharpe-like score over per-trade PnL. NOTE(review): the sqrt term
        # scales by trade count capped at scans_per_year, not by time —
        # this is not an annualised Sharpe; confirm intended.
        pnl_series = pd.Series(pnls)
        mean_r = pnl_series.mean()
        std_r = pnl_series.std()
        scans_per_year = 365 * 17_280  # 5s scans per year
        sharpe = (mean_r / std_r * math.sqrt(min(len(trades), scans_per_year))) if std_r > 0 else 0.0

        return {
            'trade_count': len(trades),
            'winning_trades': len(winners),
            'losing_trades': len(losers),
            'win_rate': len(winners) / len(trades),
            'profit_factor': gross_win / gross_loss,
            'roi': (self.equity - self.capital) / self.capital,
            'total_pnl': round(self.equity - self.capital, 2),
            'starting_capital': self.capital,
            'ending_equity': round(self.equity, 2),
            'max_drawdown': max_dd,
            'avg_win': round(sum(winners) / len(winners), 4) if winners else 0,
            'avg_loss': round(sum(losers) / len(losers), 4) if losers else 0,
            'sharpe': round(sharpe, 3),
            'gross_win': round(gross_win, 2),
            'gross_loss': round(gross_loss, 2),
            'total_fees': round(sum(t['fees'] for t in trades), 2),
        }
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Reporting ─────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def compare_to_champion(metrics: dict, source_label: str) -> dict:
    """Build a {metric: {benchmark, actual, passed}} table vs the champion run."""
    results = {}
    for name, target in CHAMPION_BENCHMARKS.items():
        observed = metrics.get(name, 0)
        # Drawdown is a ceiling; every other benchmark is a floor.
        if name == 'max_drawdown':
            ok = observed <= target
        else:
            ok = observed >= target
        results[name] = {
            'benchmark': target,
            'actual': round(observed, 4),
            'passed': ok,
        }
    return results
|
||
|
|
|
||
|
|
|
||
|
|
def print_integrity_report(
    metrics: dict,
    checks: dict,
    source_label: str,
    scan_count: int,
    date_range: str,
):
    """Pretty-print the metrics table and the champion benchmark pass/fail list.

    metrics: output of AlphaEngineSimulator.compute_metrics().
    checks:  output of compare_to_champion().
    """
    sep = "=" * 72

    # Header banner
    print(f"\n{sep}")
    print(f" ALPHA ENGINE INTEGRITY REPORT — {source_label}")
    print(f" Period : {date_range} | Scans: {scan_count}")
    print(sep)

    # Metrics table: choose a formatter per metric family by key name.
    print(f"\n{'Metric':<22} {'Value':>14}")
    print("-" * 38)
    for k, v in metrics.items():
        if isinstance(v, float):
            if 'rate' in k or 'roi' in k or 'drawdown' in k:
                # Ratios shown as percentages
                print(f" {k:<20} {v:>13.2%}")
            elif 'factor' in k or 'sharpe' in k:
                print(f" {k:<20} {v:>13.4f}")
            else:
                print(f" {k:<20} {v:>13.2f}")
        else:
            print(f" {k:<20} {str(v):>14}")

    # Benchmark pass/fail section
    print(f"\n{'CHAMPION BENCHMARK CHECKS':^72}")
    print("-" * 72)
    all_pass = True
    for metric, chk in checks.items():
        icon = "✅ PASS" if chk['passed'] else "❌ FAIL"
        bench = chk['benchmark']
        actual = chk['actual']
        # Drawdown is an upper bound, everything else a lower bound.
        op = "<=" if metric == 'max_drawdown' else ">="
        # Small fractional benchmarks get 4 decimals, larger values 2.
        if isinstance(bench, float) and (bench < 1.5):
            print(f" {icon} {metric:<20} actual={actual:.4f} {op} benchmark={bench:.4f}")
        else:
            print(f" {icon} {metric:<20} actual={actual:.2f} {op} benchmark={bench:.2f}")
        if not chk['passed']:
            all_pass = False

    print()
    if all_pass:
        print(" OVERALL: ✅ ALL BENCHMARKS PASSED — Alpha engine integrity confirmed")
    else:
        failed = [k for k, c in checks.items() if not c['passed']]
        print(f" OVERALL: ⚠️ {len(failed)} benchmark(s) missed: {', '.join(failed)}")
        print(" Note: NG5 Arrow data is only 1 day old; longer runs will improve on these.")

    print(sep + "\n")
|
||
|
|
|
||
|
|
|
||
|
|
# ─── Main ──────────────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
def run_integrity_test(
    ng3_json_dir: str,
    start_date: str,
    end_date: str,
    assets: List[str],
    ng5_arrow_dir: Optional[str] = None,
    ng5_date: Optional[str] = None,
    report_only: bool = False,
):
    """Load scans, run the alpha-engine simulation, report and persist results.

    Returns the result dict that is also written to
    backtest_results/integrity_test_<timestamp>.json, or {} when no scan
    data could be loaded.
    """
    eigen_dir = Path(ng3_json_dir)
    assert eigen_dir.exists(), f"NG3 eigenvalues dir not found: {eigen_dir}"

    # ── 1. Load NG3 scans ─────────────────────────────────────────────────
    log.info(f"Loading NG3 JSON scans from {start_date} to {end_date}...")
    ng3_scans = load_ng3_scans(eigen_dir, start_date, end_date)

    # ── 2. Optionally load NG5 Arrow scans and splice ─────────────────────
    if ng5_arrow_dir and ng5_date:
        log.info(f"Loading NG5 Arrow scans for {ng5_date}...")
        ng5_scans = load_ng5_scans(Path(ng5_arrow_dir), ng5_date)
        # Merge the two sources into one chronological stream.
        combined_scans = ng3_scans + ng5_scans
        combined_scans.sort(key=lambda x: x['timestamp'])
        source_label = f"MIXED NG3+NG5 ARROW ({len(ng3_scans)} NG3 + {len(ng5_scans)} NG5)"
    else:
        combined_scans = ng3_scans
        source_label = f"NG3 JSON ONLY"

    if not combined_scans:
        log.error("No scan data loaded — check paths and date range")
        return {}

    if report_only:
        log.info("--report-only: skipping simulation, reading latest results...")
        # Find latest results file (reverse lexicographic = newest timestamp first)
        result_files = sorted(
            (Path(PROJECT_ROOT) / 'backtest_results').glob('integrity_test_*.json'),
            reverse=True
        )
        if result_files:
            with open(result_files[0]) as f:
                result = json.load(f)
            metrics = result.get('metrics', {})
            checks = result.get('benchmark_checks', {})
            print_integrity_report(metrics, checks, source_label,
                                   result.get('scan_count', 0),
                                   f"{start_date} to {end_date}")
            return result
        else:
            # Deliberate fall-through: with no prior results to show,
            # --report-only degrades to a full simulation run.
            log.info("No existing integrity results — running simulation...")

    # ── 3. Run simulation ─────────────────────────────────────────────────
    log.info(f"Running alpha engine simulation over {len(combined_scans)} scans...")

    sim = AlphaEngineSimulator(capital=SIM_PARAMS['capital'])
    last_prices: Dict[str, float] = {}  # most recent price seen per asset

    for i, scan in enumerate(combined_scans):
        if i % 5000 == 0:
            # Periodic progress heartbeat
            log.info(f" Scan {i}/{len(combined_scans)} equity={sim.equity:,.0f} "
                     f"open={len(sim.open_positions)} closed={len(sim.closed_trades)}")

        sim.step(scan, assets)
        last_prices.update(scan.get('prices', {}))

        # Daily equity reset for loss limit tracking: on a calendar-day roll,
        # re-base the simulator's daily-loss reference to current equity.
        ts = scan.get('timestamp')
        if i > 0 and ts:
            prev_ts = combined_scans[i-1].get('timestamp')
            if prev_ts and (ts.date() != prev_ts.date()):
                sim.daily_loss_start_equity = sim.equity

    # Close whatever is still open at the last known prices.
    sim.flush_open_positions(last_prices)

    # ── 4. Metrics and report ─────────────────────────────────────────────
    metrics = sim.compute_metrics()
    checks = compare_to_champion(metrics, source_label)

    date_range = f"{start_date} → {end_date}"
    if ng5_arrow_dir and ng5_date:
        date_range += f" + {ng5_date} (NG5)"

    print_integrity_report(metrics, checks, source_label, len(combined_scans), date_range)

    # ── 5. Save results ───────────────────────────────────────────────────
    output_dir = Path(PROJECT_ROOT) / 'backtest_results'
    output_dir.mkdir(exist_ok=True)
    ts_str = datetime.now().strftime('%Y%m%d_%H%M%S')
    out_file = output_dir / f"integrity_test_{ts_str}.json"

    result = {
        'timestamp': datetime.now().isoformat(),
        'source_label': source_label,
        'date_range': date_range,
        'scan_count': len(combined_scans),
        'assets': assets,
        # Sets (excluded_assets) are not JSON-serialisable; drop them here.
        'sim_params': {k: v for k, v in SIM_PARAMS.items() if not isinstance(v, set)},
        'metrics': metrics,
        'benchmark_checks': checks,
        'champion_benchmarks': CHAMPION_BENCHMARKS,
        'trades_sample': sim.closed_trades[:20],  # first 20 trades for spot checks
    }
    # default=str stringifies anything json can't encode (e.g. Timestamps)
    with open(out_file, 'w') as f:
        json.dump(result, f, indent=2, default=str)

    log.info(f"Results saved: {out_file}")
    return result
|
||
|
|
|
||
|
|
|
||
|
|
def main():
    """CLI entry point: parse arguments and launch the integrity test."""
    parser = argparse.ArgumentParser(description="DOLPHIN Alpha Engine Integrity Test")
    parser.add_argument(
        '--ng3-json-dir',
        default=r'c:\Users\Lenovo\Documents\- Dolphin NG HD (NG3)\correlation_arb512\eigenvalues',
        help='Path to NG3 eigenvalues dir (date sub-dirs with scan_*.json)'
    )
    parser.add_argument('--start-date', default='2026-01-01')
    parser.add_argument('--end-date', default='2026-02-24')
    parser.add_argument(
        '--assets',
        default='BTCUSDT,ETHUSDT,ADAUSDT,SOLUSDT,DOTUSDT,AVAXUSDT,LINKUSDT,UNIUSDT,ATOMUSDT',
        help='Comma-separated assets to trade'
    )
    parser.add_argument(
        '--ng5-arrow-dir',
        default=r'c:\Users\Lenovo\Documents\- Dolphin NG5\correlation_arb512\arrow_scans',
        help='NG5 arrow_scans dir (optional, for mixed test)'
    )
    parser.add_argument('--ng5-date', default='2026-02-25', help='NG5 date to splice')
    parser.add_argument('--no-ng5', action='store_true', help='NG3-only test')
    parser.add_argument('--report-only', action='store_true', help='Print latest results only')
    args = parser.parse_args()

    asset_list = [symbol.strip() for symbol in args.assets.split(',')]
    # --no-ng5 suppresses the NG5 splice entirely.
    use_ng5 = not args.no_ng5

    run_integrity_test(
        ng3_json_dir=args.ng3_json_dir,
        start_date=args.start_date,
        end_date=args.end_date,
        assets=asset_list,
        ng5_arrow_dir=args.ng5_arrow_dir if use_ng5 else None,
        ng5_date=args.ng5_date if use_ng5 else None,
        report_only=args.report_only,
    )
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == '__main__':
|
||
|
|
main()
|