Files
DOLPHIN/nautilus_dolphin/run_nd_backtest_arrow.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

505 lines
19 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Nautilus-Dolphin Backtest — Arrow NG5 Edition
===============================================
Runs the full Nautilus-Dolphin backtest using DOLPHIN NG5 Arrow IPC
scan files instead of legacy JSON/VBT-cache Parquet files.
Supports two modes:
--mode catalog : Arrow → Nautilus ParquetDataCatalog → BacktestNode
--mode stream : Arrow → streaming ArrowEigenvalueDataAdapter (VBT path)
Usage (catalog mode = production recommended):
python run_nd_backtest_arrow.py
--arrow-scans "C:/.../correlation_arb512/arrow_scans"
--mode catalog
--assets BTCUSDT,ETHUSDT
--start-date 2026-02-25
--end-date 2026-02-25
Usage (stream mode = direct data validation):
python run_nd_backtest_arrow.py
--arrow-scans "C:/.../correlation_arb512/arrow_scans"
--mode stream
--assets BTCUSDT
Author: Antigravity / DOLPHIN NG5 migration
Date: 2026-02-25
"""
import os
import sys
import json
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s | %(levelname)-8s | %(message)s',
datefmt='%H:%M:%S',
)
logger = logging.getLogger(__name__)
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
# ─── Arrow adapter (always available) ─────────────────────────────────────────
from nautilus_dolphin.nautilus.arrow_data_adapter import (
ArrowEigenvalueDataAdapter,
ArrowBacktestDataLoader,
ArrowToParquetBatchConverter,
)
from nautilus_dolphin.nautilus.arrow_parquet_catalog_builder import ArrowNautilusCatalogBuilder
# ─── Nautilus (required for catalog/backtest modes) ────────────────────────────
try:
from nautilus_trader.backtest.node import BacktestNode
from nautilus_trader.backtest.config import (
BacktestRunConfig,
BacktestEngineConfig,
BacktestVenueConfig,
BacktestDataConfig,
)
from nautilus_trader.config import ImportableStrategyConfig
from nautilus_trader.execution.config import ImportableExecAlgorithmConfig
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.model.data import QuoteTick
from nautilus_trader.risk.config import RiskEngineConfig
from nautilus_trader.cache.config import CacheConfig
NAUTILUS_AVAILABLE = True
logger.info("[OK] nautilus_trader imports successful")
except ImportError as e:
logger.warning(f"[WARN] nautilus_trader not available: {e}")
NAUTILUS_AVAILABLE = False
from nautilus_dolphin.nautilus.strategy_config import (
create_tight_3_3_config,
DolphinStrategyConfig,
)
import pandas as pd
# ──────────────────────────────────────────────────────────────────────────────
class NDBacktestArrow:
    """
    Runs Nautilus-Dolphin backtest using DOLPHIN NG5 Arrow IPC output.

    Supports two data paths:

    1. ``catalog`` — Arrow→catalog→BacktestNode (full Nautilus VBT pipeline)
    2. ``stream``  — Arrow→adapter→manual signal loop (validation / debug)
    """

    def __init__(
        self,
        arrow_scans_dir: str,
        output_dir: str = "backtest_results_arrow",
        venue: str = "BINANCE_FUTURES",
    ):
        """
        Args:
            arrow_scans_dir: Directory containing DOLPHIN NG5 Arrow IPC scan files.
            output_dir: Directory for result JSON files and the built catalog
                (created if missing).
            venue: Nautilus venue identifier used to build instrument IDs.
        """
        self.arrow_scans_dir = Path(arrow_scans_dir)
        self.output_dir = Path(output_dir)
        self.venue_str = venue
        self.output_dir.mkdir(parents=True, exist_ok=True)
        logger.info("[OK] NDBacktestArrow initialized")
        logger.info(f" Arrow scans: {self.arrow_scans_dir}")
        logger.info(f" Output: {self.output_dir}")

    # ─── Catalog mode ────────────────────────────────────────────────────────
    def prepare_catalog(
        self,
        assets: List[str],
        start_date: str,
        end_date: str,
    ) -> str:
        """Build a Nautilus ParquetDataCatalog from the Arrow scan files.

        Returns:
            The catalog path reported by ``ArrowNautilusCatalogBuilder.build_catalog``.
        """
        catalog_dir = self.output_dir / "catalog"
        builder = ArrowNautilusCatalogBuilder(
            arrow_scans_dir=str(self.arrow_scans_dir),
            catalog_output_dir=str(catalog_dir),
            venue=self.venue_str,
        )
        return builder.build_catalog(
            assets=assets,
            start_date=start_date,
            end_date=end_date,
        )

    def run_backtest_catalog(
        self,
        catalog_path: str,
        assets: List[str],
        start_date: str,
        end_date: str,
        strategy_config: Optional["DolphinStrategyConfig"] = None,
    ) -> Dict[str, Any]:
        """Full Nautilus BacktestNode run (VBT pipeline preserved).

        Args:
            catalog_path: Path produced by :meth:`prepare_catalog`.
            assets: Symbols to trade, e.g. ``["BTCUSDT"]``.
            start_date: Range start (YYYY-MM-DD), recorded in the report.
            end_date: Range end (YYYY-MM-DD), recorded in the report.
            strategy_config: Optional strategy override; defaults to the
                "tight 3/3" configuration.

        Returns:
            The report dict written by :meth:`_save_and_report`.

        Raises:
            RuntimeError: If nautilus_trader could not be imported.
        """
        if not NAUTILUS_AVAILABLE:
            raise RuntimeError("nautilus_trader is required for catalog mode")
        if strategy_config is None:
            strategy_config = create_tight_3_3_config()

        logger.info("=" * 70)
        logger.info("CONFIGURING NAUTILUS BACKTEST (Arrow→Catalog mode)")
        logger.info("=" * 70)

        max_leverage = getattr(strategy_config, 'max_leverage', 2.5)
        venue_config = BacktestVenueConfig(
            name=self.venue_str,
            oms_type="NETTING",
            account_type="MARGIN",
            base_currency="USDT",
            starting_balances=["100000 USDT"],
            default_leverage=str(max_leverage),
        )

        # Two QuoteTick feeds per asset: the real price instrument and a
        # synthetic "<asset>.SIGNAL" instrument carrying DOLPHIN signal ticks.
        data_configs = []
        for asset in assets:
            instrument_id = f"{asset}.{self.venue_str}"
            data_configs.append(BacktestDataConfig(
                catalog_path=catalog_path,
                data_cls="nautilus_trader.model.data:QuoteTick",
                instrument_id=instrument_id,
            ))
            data_configs.append(BacktestDataConfig(
                catalog_path=catalog_path,
                data_cls="nautilus_trader.model.data:QuoteTick",
                instrument_id=f"{asset}.SIGNAL.{self.venue_str}",
            ))

        nautilus_strategy_config = ImportableStrategyConfig(
            strategy_path="nautilus_dolphin.nautilus.strategy:DolphinExecutionStrategy",
            config_path="nautilus_dolphin.nautilus.strategy_config:DolphinStrategyConfig",
            config=strategy_config.dict(),
        )
        exec_algorithm_config = ImportableExecAlgorithmConfig(
            exec_algorithm_path="nautilus_dolphin.nautilus.smart_exec_algorithm:SmartExecAlgorithm",
            config_path="nautilus_trader.execution.config:ExecAlgorithmConfig",
            config={
                'exec_algorithm_id': "SMART_EXEC",
                'entry_timeout_sec': 25,
                'entry_abort_threshold_bps': 5.0,
                'exit_timeout_sec': 10,
                'maker_fee_rate': 0.0002,
                'taker_fee_rate': 0.0005,
            },
        )
        engine_config = BacktestEngineConfig(
            strategies=[nautilus_strategy_config],
            exec_algorithms=[exec_algorithm_config],
            # NOTE(review): risk checks are bypassed — presumably sizing is
            # handled inside the strategy itself; confirm this is intended.
            risk_engine=RiskEngineConfig(bypass=True),
            cache=CacheConfig(tick_capacity=1_000_000, bar_capacity=100_000),
        )
        run_config = BacktestRunConfig(
            venues=[venue_config],
            data=data_configs,
            engine=engine_config,
            chunk_size=None,
            raise_exception=True,
            # Keep the engine alive after the run so trades can be extracted.
            dispose_on_completion=False,
        )

        logger.info("=" * 70)
        logger.info("RUNNING BACKTEST")
        logger.info("=" * 70)
        try:
            node = BacktestNode(configs=[run_config])
            node.run()
            engine = node.get_engine(run_config.id)
            result = engine.get_result()
            # Prefer the result object's positions; fall back to the engine
            # cache when the result is missing or its extraction fails.
            if result:
                try:
                    trades = self._extract_trades_from_result(result)
                except Exception:
                    trades = self._extract_trades(engine)
            else:
                trades = self._extract_trades(engine)
            metrics = self._compute_metrics(trades)
            self._enrich_from_nautilus_result(result, metrics)
        except Exception as e:
            logger.error(f"[FAIL] Backtest failed: {e}")
            import traceback
            logger.error(traceback.format_exc())
            raise
        return self._save_and_report(trades, metrics, assets, start_date, end_date, "catalog")

    # ─── Stream mode (validation / paper-trade path) ─────────────────────────
    def run_backtest_stream(
        self,
        assets: List[str],
        start_date: str,
        end_date: str,
    ) -> Dict[str, Any]:
        """
        Stream Arrow files through the adapter and collect signal statistics.

        Does NOT use the Nautilus BacktestNode — this is a direct signal-loop
        validation mode useful for confirming data parity vs. JSON.
        """
        logger.info("=" * 70)
        logger.info("RUNNING SIGNAL-LOOP BACKTEST (Arrow stream mode)")
        logger.info("=" * 70)
        adapter = ArrowEigenvalueDataAdapter(
            arrow_scans_dir=str(self.arrow_scans_dir),
            venue=self.venue_str,
            assets=assets,
        )
        start = datetime.strptime(start_date, '%Y-%m-%d')
        end_dt = datetime.strptime(end_date, '%Y-%m-%d')
        adapter.load_date_range(start, end_dt)

        all_signals = []
        total_bars = 0
        # The adapter iterates as (bars, signals) pairs, one per scan.
        for bars, signals in adapter:
            total_bars += len(bars)
            all_signals.extend(signals)

        metrics = self._signal_loop_metrics(all_signals, assets)
        logger.info(f"[Stream] Total scans: {len(adapter._scan_files)}")
        logger.info(f"[Stream] Total bars: {total_bars}")
        logger.info(f"[Stream] Total signals: {len(all_signals)}")
        return self._save_and_report([], metrics, assets, start_date, end_date, "stream",
                                     extra={'total_bars': total_bars,
                                            'total_signals': len(all_signals)})

    # ─── Helpers ──────────────────────────────────────────────────────────────
    @staticmethod
    def _position_to_trade(p) -> dict:
        """Flatten one closed Nautilus position into a JSON-serializable dict.

        Shared by both trade-extraction paths so the record schema cannot drift.
        """
        return {
            "trade_id": str(p.id),
            "instrument_id": str(p.instrument_id),
            "entry_time": str(pd.to_datetime(p.ts_opened, unit='ns', utc=True)),
            "exit_time": str(pd.to_datetime(p.ts_closed, unit='ns', utc=True)),
            "entry_price": float(p.avg_px_open),
            "exit_price": float(p.avg_px_close),
            "direction": str(p.side),
            "quantity": float(p.quantity),
            "pnl": float(p.realized_pnl),
        }

    def _extract_trades(self, engine) -> list:
        """Extract trades from the engine cache (fallback path)."""
        return [self._position_to_trade(p) for p in engine.cache.positions_closed()]

    def _extract_trades_from_result(self, result) -> list:
        """Extract trades from the Nautilus backtest result (preferred path)."""
        return [self._position_to_trade(p) for p in result.positions()]

    def _compute_metrics(self, trades: list) -> Dict[str, Any]:
        """Compute summary P&L metrics from a list of trade dicts.

        Notes:
            * ROI and Sharpe normalize by the 100,000 USDT starting balance.
            * The Sharpe annualization factor is 252 * 17,280 periods/year —
              presumably 5-second bars; TODO confirm the bar cadence.
        """
        if not trades:
            return {"win_rate": 0, "total_pnl": 0, "roi": 0,
                    "winning_trades": 0, "losing_trades": 0,
                    "max_drawdown": 0, "sharpe": 0}
        pnls = [t.get('pnl', 0) for t in trades]
        winners = [p for p in pnls if p > 0]
        cumulative = pd.Series(pnls).cumsum()
        rolling_max = cumulative.cummax()
        drawdown = (cumulative - rolling_max)
        max_dd = float(drawdown.min())
        returns = pd.Series(pnls) / 100_000
        # Series.std() is NaN for a single trade; NaN > 0 is False, so the
        # guard correctly leaves sharpe at 0.0 in that case.
        sharpe = float(returns.mean() / returns.std() * (252 * 17_280) ** 0.5) if returns.std() > 0 else 0.0
        return {
            "win_rate": len(winners) / len(trades),
            "total_pnl": round(sum(pnls), 2),
            "roi": sum(pnls) / 100_000,
            "winning_trades": len(winners),
            "losing_trades": len(trades) - len(winners),
            "max_drawdown": max_dd,
            "sharpe": sharpe,
        }

    def _signal_loop_metrics(self, signals: list, assets: List[str]) -> Dict[str, Any]:
        """Summarize stream-mode signals by velocity-divergence direction."""
        import numpy as np
        if not signals:
            return {"signal_count": 0}
        vel_divs = [s.get('vel_div', 0) for s in signals]
        longs = sum(1 for vd in vel_divs if vd > 0)
        shorts = sum(1 for vd in vel_divs if vd < 0)
        return {
            "signal_count": len(signals),
            "long_signals": longs,
            "short_signals": shorts,
            "mean_vel_div": float(np.mean(vel_divs)),
            "std_vel_div": float(np.std(vel_divs)),
        }

    def _enrich_from_nautilus_result(self, result, metrics: dict):
        """Overwrite computed metrics with Nautilus's own statistics when available.

        Reads ``result.stats_pnls`` (a per-currency stats mapping), preferring
        the USDT entry and falling back to the first currency present.
        Mutates ``metrics`` in place; non-numeric stats are skipped.
        """
        if result is None:
            return
        if hasattr(result, 'stats_pnls'):
            stats = result.stats_pnls.get('USDT', {}) or next(iter(result.stats_pnls.values()), {})
            for nd_key, our_key in [
                ('PnL (total)', 'total_pnl'),
                ('PnL% (total)', 'roi'),
                ('Win Rate', 'win_rate'),
            ]:
                if nd_key in stats:
                    try:
                        metrics[our_key] = float(stats[nd_key])
                    except (TypeError, ValueError):
                        pass  # keep our locally computed value

    def _save_and_report(
        self,
        trades: list,
        metrics: Dict[str, Any],
        assets: List[str],
        start_date: str,
        end_date: str,
        mode: str,
        extra: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """Assemble the report dict, write it to a timestamped JSON file,
        and log a human-readable summary.

        Args:
            trades: Trade dicts (empty in stream mode).
            metrics: Output of ``_compute_metrics`` or ``_signal_loop_metrics``.
            assets / start_date / end_date: Echoed into the report params.
            mode: "catalog" or "stream" (part of the output filename).
            extra: Optional top-level keys merged into the report.

        Returns:
            The report dict that was written to disk.
        """
        result = {
            "timestamp": datetime.now().isoformat(),
            "mode": mode,
            "data_source": "arrow_ng5",
            "backtest_params": {
                "assets": assets,
                "start_date": start_date,
                "end_date": end_date,
                "venue": self.venue_str,
            },
            "trades": trades,
            "metrics": metrics,
            "trade_count": len(trades),
        }
        if extra:
            result.update(extra)
        ts = datetime.now().strftime('%Y%m%d_%H%M%S')
        out_file = self.output_dir / f"nd_arrow_backtest_{mode}_{ts}.json"
        with open(out_file, 'w') as f:
            # default=str stringifies anything json can't encode (e.g. Paths).
            json.dump(result, f, indent=2, default=str)

        logger.info("=" * 70)
        logger.info("BACKTEST RESULTS (Arrow NG5)")
        logger.info("=" * 70)
        logger.info(f"Mode: {mode}")
        logger.info(f"Trades: {len(trades)}")
        logger.info(f"Win Rate: {metrics.get('win_rate', 0):.2%}")
        logger.info(f"Total P&L: ${metrics.get('total_pnl', 0):,.2f}")
        logger.info(f"ROI: {metrics.get('roi', 0):.2%}")
        logger.info(f"Max DD: ${metrics.get('max_drawdown', 0):,.2f}")
        logger.info(f"Sharpe: {metrics.get('sharpe', 0):.3f}")
        logger.info(f"Results: {out_file}")
        logger.info("=" * 70)
        return result
# ──────────────────────────────────────────────────────────────────────────────
async def main():
    """CLI entry point.

    Parses arguments, runs the selected backtest mode (catalog or stream),
    optionally compares the result against a legacy JSON report, and returns
    the result dict.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Nautilus-Dolphin Arrow NG5 Backtest Runner"
    )
    cli.add_argument(
        "--arrow-scans", required=True,
        help="Path to NG5 arrow_scans directory, e.g. .../correlation_arb512/arrow_scans",
    )
    cli.add_argument(
        "--mode", choices=["catalog", "stream"], default="catalog",
        help="catalog=full Nautilus VBT, stream=signal-loop validation",
    )
    cli.add_argument("--assets", default="BTCUSDT", help="Comma-separated assets")
    cli.add_argument("--start-date", default="2026-02-25", help="Start date YYYY-MM-DD")
    cli.add_argument("--end-date", default="2026-02-25", help="End date YYYY-MM-DD")
    cli.add_argument("--output-dir", default="backtest_results_arrow")
    cli.add_argument("--venue", default="BINANCE_FUTURES")
    cli.add_argument(
        "--reference-file", default=None,
        help="Legacy JSON backtest results for comparison (optional)",
    )
    opts = cli.parse_args()

    asset_list = [sym.strip() for sym in opts.assets.split(",")]
    runner = NDBacktestArrow(
        arrow_scans_dir=opts.arrow_scans,
        output_dir=opts.output_dir,
        venue=opts.venue,
    )

    if opts.mode == "stream":
        results = runner.run_backtest_stream(
            assets=asset_list,
            start_date=opts.start_date,
            end_date=opts.end_date,
        )
    else:
        # Catalog mode: build the Nautilus catalog first, then run the node.
        catalog_path = runner.prepare_catalog(
            assets=asset_list,
            start_date=opts.start_date,
            end_date=opts.end_date,
        )
        results = runner.run_backtest_catalog(
            catalog_path=catalog_path,
            assets=asset_list,
            start_date=opts.start_date,
            end_date=opts.end_date,
        )

    # Optional comparison with legacy JSON results
    if opts.reference_file and os.path.exists(opts.reference_file):
        logger.info("=" * 70)
        logger.info("COMPARISON vs. LEGACY JSON BACKTEST")
        logger.info("=" * 70)
        with open(opts.reference_file, 'r') as fh:
            legacy = json.load(fh)

        def dig(tree, dotted):
            # Walk a dotted key path; any missing or non-dict level yields 0.
            node = tree
            for part in dotted.split('.'):
                node = node.get(part, 0) if isinstance(node, dict) else 0
            return node

        for key, label in (
            ('trade_count', 'Trades'),
            ('metrics.win_rate', 'Win Rate'),
            ('metrics.roi', 'ROI'),
            ('metrics.total_pnl', 'Total P&L'),
        ):
            ref_val = dig(legacy, key)
            ng5_val = dig(results, key)
            # Small floats (rates/ROI) print as percentages, the rest as raw.
            if isinstance(ref_val, float) and abs(ref_val) < 10:
                logger.info(f"{label:15s}: Legacy={ref_val:.4%}, NG5={ng5_val:.4%}, Δ={abs(ng5_val - ref_val):.4%}")
            else:
                logger.info(f"{label:15s}: Legacy={ref_val}, NG5={ng5_val}, Δ={abs(ng5_val - ref_val):.2f}")

    return results
if __name__ == "__main__":
asyncio.run(main())