""" Monte Carlo Metrics Extractor ============================= Extract 48 metrics and 10 classification labels from trial results. Metric Categories: M01-M15: Primary Performance Metrics M16-M32: Risk / Stability Metrics M33-M38: Signal Quality Metrics M39-M43: Capital Path Metrics M44-M48: Regime Metrics L01-L10: Derived Classification Labels Reference: MONTE_CARLO_SYSTEM_ENVELOPE_SPEC.md Section 6 """ from typing import Dict, List, Optional, NamedTuple, Any, Tuple from dataclasses import dataclass, field from datetime import datetime import numpy as np from .mc_sampler import MCTrialConfig @dataclass class MCTrialResult: """Complete result from a Monte Carlo trial.""" trial_id: int config: MCTrialConfig # Primary Performance Metrics (M01-M15) roi_pct: float = 0.0 profit_factor: float = 0.0 win_rate: float = 0.0 n_trades: int = 0 max_drawdown_pct: float = 0.0 sharpe_ratio: float = 0.0 sortino_ratio: float = 0.0 calmar_ratio: float = 0.0 avg_win_pct: float = 0.0 avg_loss_pct: float = 0.0 win_loss_ratio: float = 0.0 expectancy_pct: float = 0.0 h1_roi_pct: float = 0.0 h2_roi_pct: float = 0.0 h2_h1_ratio: float = 0.0 # Risk / Stability Metrics (M16-M32) n_consecutive_losses_max: int = 0 n_stop_exits: int = 0 n_tp_exits: int = 0 n_hold_exits: int = 0 stop_rate: float = 0.0 tp_rate: float = 0.0 hold_rate: float = 0.0 avg_hold_bars: float = 0.0 vol_of_daily_pnl: float = 0.0 skew_daily_pnl: float = 0.0 kurtosis_daily_pnl: float = 0.0 worst_day_pct: float = 0.0 best_day_pct: float = 0.0 n_days_profitable: int = 0 n_days_loss: int = 0 profitable_day_rate: float = 0.0 max_daily_drawdown_pct: float = 0.0 # Signal Quality Metrics (M33-M38) dc_skip_rate: float = 0.0 ob_skip_rate: float = 0.0 dc_confirm_rate: float = 0.0 irp_match_rate: float = 0.0 entry_attempt_rate: float = 0.0 signal_to_trade_rate: float = 0.0 # Capital Path Metrics (M39-M43) equity_curve_slope: float = 0.0 equity_curve_r2: float = 0.0 equity_curve_autocorr: float = 0.0 max_underwater_days: int = 0 recovery_factor: float = 0.0 # Regime Metrics (M44-M48) date_pnl_std: float = 0.0 date_pnl_range: float = 0.0 q10_date_pnl: float = 0.0 q90_date_pnl: float = 0.0 tail_ratio: float = 0.0 # Classification Labels (L01-L10) profitable: bool = False strongly_profitable: bool = False drawdown_ok: bool = False sharpe_ok: bool = False pf_ok: bool = False wr_ok: bool = False champion_region: bool = False catastrophic: bool = False inert: bool = False h2_degradation: bool = False # Metadata timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) execution_time_sec: float = 0.0 status: str = "pending" error_message: Optional[str] = None def compute_labels(self): """Compute classification labels from metrics.""" # L01: profitable self.profitable = self.roi_pct > 0 # L02: strongly_profitable self.strongly_profitable = self.roi_pct > 30 # L03: drawdown_ok self.drawdown_ok = self.max_drawdown_pct < 20 # L04: sharpe_ok self.sharpe_ok = self.sharpe_ratio > 1.5 # L05: pf_ok self.pf_ok = self.profit_factor > 1.10 # L06: wr_ok self.wr_ok = self.win_rate > 0.45 # L07: champion_region self.champion_region = ( self.strongly_profitable and self.drawdown_ok and self.sharpe_ok and self.pf_ok and self.wr_ok ) # L08: catastrophic self.catastrophic = ( self.roi_pct < -30 or self.max_drawdown_pct > 40 ) # L09: inert self.inert = self.n_trades < 50 # L10: h2_degradation self.h2_degradation = self.h2_h1_ratio < 0.50 def to_dict(self) -> Dict[str, Any]: """Convert to dictionary (flat structure for DataFrame).""" result = { # IDs 'trial_id': self.trial_id, 'timestamp': self.timestamp, 'execution_time_sec': self.execution_time_sec, 'status': self.status, 'error_message': self.error_message, } # Add all config parameters with P_ prefix config_dict = self.config.to_dict() for k, v in config_dict.items(): result[f'P_{k}'] = v # Add metrics with M_ prefix result.update({ 'M_roi_pct': self.roi_pct, 'M_profit_factor': self.profit_factor, 'M_win_rate': self.win_rate, 'M_n_trades': self.n_trades, 'M_max_drawdown_pct': self.max_drawdown_pct, 'M_sharpe_ratio': self.sharpe_ratio, 'M_sortino_ratio': self.sortino_ratio, 'M_calmar_ratio': self.calmar_ratio, 'M_avg_win_pct': self.avg_win_pct, 'M_avg_loss_pct': self.avg_loss_pct, 'M_win_loss_ratio': self.win_loss_ratio, 'M_expectancy_pct': self.expectancy_pct, 'M_h1_roi_pct': self.h1_roi_pct, 'M_h2_roi_pct': self.h2_roi_pct, 'M_h2_h1_ratio': self.h2_h1_ratio, 'M_n_consecutive_losses_max': self.n_consecutive_losses_max, 'M_n_stop_exits': self.n_stop_exits, 'M_n_tp_exits': self.n_tp_exits, 'M_n_hold_exits': self.n_hold_exits, 'M_stop_rate': self.stop_rate, 'M_tp_rate': self.tp_rate, 'M_hold_rate': self.hold_rate, 'M_avg_hold_bars': self.avg_hold_bars, 'M_vol_of_daily_pnl': self.vol_of_daily_pnl, 'M_skew_daily_pnl': self.skew_daily_pnl, 'M_kurtosis_daily_pnl': self.kurtosis_daily_pnl, 'M_worst_day_pct': self.worst_day_pct, 'M_best_day_pct': self.best_day_pct, 'M_n_days_profitable': self.n_days_profitable, 'M_n_days_loss': self.n_days_loss, 'M_profitable_day_rate': self.profitable_day_rate, 'M_max_daily_drawdown_pct': self.max_daily_drawdown_pct, 'M_dc_skip_rate': self.dc_skip_rate, 'M_ob_skip_rate': self.ob_skip_rate, 'M_dc_confirm_rate': self.dc_confirm_rate, 'M_irp_match_rate': self.irp_match_rate, 'M_entry_attempt_rate': self.entry_attempt_rate, 'M_signal_to_trade_rate': self.signal_to_trade_rate, 'M_equity_curve_slope': self.equity_curve_slope, 'M_equity_curve_r2': self.equity_curve_r2, 'M_equity_curve_autocorr': self.equity_curve_autocorr, 'M_max_underwater_days': self.max_underwater_days, 'M_recovery_factor': self.recovery_factor, 'M_date_pnl_std': self.date_pnl_std, 'M_date_pnl_range': self.date_pnl_range, 'M_q10_date_pnl': self.q10_date_pnl, 'M_q90_date_pnl': self.q90_date_pnl, 'M_tail_ratio': self.tail_ratio, }) # Add labels with L_ prefix result.update({ 'L_profitable': self.profitable, 'L_strongly_profitable': self.strongly_profitable, 'L_drawdown_ok': self.drawdown_ok, 'L_sharpe_ok': self.sharpe_ok, 'L_pf_ok': self.pf_ok, 'L_wr_ok': self.wr_ok, 'L_champion_region': self.champion_region, 'L_catastrophic': self.catastrophic, 'L_inert': self.inert, 'L_h2_degradation': self.h2_degradation, }) return result @classmethod def from_dict(cls, d: Dict[str, Any]) -> 'MCTrialResult': """Create from dictionary.""" # Extract config config_dict = {k[2:]: v for k, v in d.items() if k.startswith('P_') and k != 'P_trial_id'} config = MCTrialConfig.from_dict(config_dict) # Create result result = cls(trial_id=d.get('trial_id', 0), config=config) # Set metrics for k, v in d.items(): if k.startswith('M_'): attr_name = k[2:] if hasattr(result, attr_name): setattr(result, attr_name, v) elif k.startswith('L_'): attr_name = k[2:] if hasattr(result, attr_name): setattr(result, attr_name, v) # Set metadata result.timestamp = d.get('timestamp', datetime.now().isoformat()) result.execution_time_sec = d.get('execution_time_sec', 0.0) result.status = d.get('status', 'completed') result.error_message = d.get('error_message') return result class MCMetrics: """ Monte Carlo Metrics Extractor. Computes all 48 metrics and 10 classification labels from backtest results. """ def __init__(self, initial_capital: float = 25000.0): """ Initialize metrics extractor. Parameters ---------- initial_capital : float Initial capital for ROI calculation """ self.initial_capital = initial_capital def compute( self, config: MCTrialConfig, trades: List[Dict], daily_pnls: List[float], date_stats: List[Dict], signal_stats: Dict[str, Any], execution_time_sec: float = 0.0 ) -> MCTrialResult: """ Compute all metrics from backtest results. Parameters ---------- config : MCTrialConfig Trial configuration trades : List[Dict] Trade records with keys: pnl, pnl_pct, exit_type, bars_held, etc. daily_pnls : List[float] Daily P&L values date_stats : List[Dict] Per-date statistics signal_stats : Dict[str, Any] Signal processing statistics execution_time_sec : float Trial execution time Returns ------- MCTrialResult Complete trial result with all metrics """ result = MCTrialResult(trial_id=config.trial_id, config=config) result.execution_time_sec = execution_time_sec # Compute metrics self._compute_performance_metrics(result, trades, daily_pnls, date_stats) self._compute_risk_metrics(result, trades, daily_pnls) self._compute_signal_metrics(result, signal_stats) self._compute_capital_metrics(result, daily_pnls) self._compute_regime_metrics(result, daily_pnls) # Compute labels result.compute_labels() result.status = "completed" return result def _compute_performance_metrics( self, result: MCTrialResult, trades: List[Dict], daily_pnls: List[float], date_stats: List[Dict] ): """Compute M01-M15: Primary Performance Metrics.""" n_trades = len(trades) result.n_trades = n_trades if n_trades == 0: # No trades - all metrics stay at defaults return # Win/loss separation winning_trades = [t for t in trades if t.get('pnl', 0) > 0] losing_trades = [t for t in trades if t.get('pnl', 0) <= 0] n_wins = len(winning_trades) n_losses = len(losing_trades) # M01: roi_pct final_capital = self.initial_capital + sum(daily_pnls) if daily_pnls else self.initial_capital result.roi_pct = (final_capital - self.initial_capital) / self.initial_capital * 100 # M02: profit_factor gross_wins = sum(t.get('pnl', 0) for t in winning_trades) gross_losses = abs(sum(t.get('pnl', 0) for t in losing_trades)) result.profit_factor = gross_wins / gross_losses if gross_losses > 0 else float('inf') # M03: win_rate result.win_rate = n_wins / n_trades if n_trades > 0 else 0 # M05: max_drawdown_pct result.max_drawdown_pct = self._compute_max_drawdown_pct(daily_pnls) # M06: sharpe_ratio (annualized) result.sharpe_ratio = self._compute_sharpe_ratio(daily_pnls) # M07: sortino_ratio result.sortino_ratio = self._compute_sortino_ratio(daily_pnls) # M08: calmar_ratio result.calmar_ratio = result.roi_pct / result.max_drawdown_pct if result.max_drawdown_pct > 0 else float('inf') # M09: avg_win_pct win_pnls_pct = [t.get('pnl_pct', 0) * 100 for t in winning_trades] result.avg_win_pct = np.mean(win_pnls_pct) if win_pnls_pct else 0 # M10: avg_loss_pct loss_pnls_pct = [t.get('pnl_pct', 0) * 100 for t in losing_trades] result.avg_loss_pct = np.mean(loss_pnls_pct) if loss_pnls_pct else 0 # M11: win_loss_ratio result.win_loss_ratio = abs(result.avg_win_pct / result.avg_loss_pct) if result.avg_loss_pct != 0 else float('inf') # M12: expectancy_pct wr = result.win_rate result.expectancy_pct = wr * result.avg_win_pct + (1 - wr) * result.avg_loss_pct # M13-M15: H1/H2 metrics if len(date_stats) >= 2: mid = len(date_stats) // 2 h1_pnl = sum(d.get('pnl', 0) for d in date_stats[:mid]) h2_pnl = sum(d.get('pnl', 0) for d in date_stats[mid:]) h1_capital = self.initial_capital + h1_pnl result.h1_roi_pct = h1_pnl / self.initial_capital * 100 result.h2_roi_pct = h2_pnl / self.initial_capital * 100 result.h2_h1_ratio = h2_pnl / h1_pnl if h1_pnl != 0 else 0 def _compute_risk_metrics( self, result: MCTrialResult, trades: List[Dict], daily_pnls: List[float] ): """Compute M16-M32: Risk / Stability Metrics.""" # M16: n_consecutive_losses_max result.n_consecutive_losses_max = self._compute_max_consecutive_losses(trades) # M17-M19: Exit type counts result.n_stop_exits = sum(1 for t in trades if t.get('exit_type') == 'stop') result.n_tp_exits = sum(1 for t in trades if t.get('exit_type') == 'tp') result.n_hold_exits = sum(1 for t in trades if t.get('exit_type') == 'hold') # M20-M22: Exit rates n_trades = len(trades) if n_trades > 0: result.stop_rate = result.n_stop_exits / n_trades result.tp_rate = result.n_tp_exits / n_trades result.hold_rate = result.n_hold_exits / n_trades # M23: avg_hold_bars hold_bars = [t.get('bars_held', 0) for t in trades] result.avg_hold_bars = np.mean(hold_bars) if hold_bars else 0 # M24-M26: Daily P&L distribution stats if len(daily_pnls) >= 2: result.vol_of_daily_pnl = np.std(daily_pnls, ddof=1) result.skew_daily_pnl = self._compute_skewness(daily_pnls) result.kurtosis_daily_pnl = self._compute_kurtosis(daily_pnls) # M27-M28: Best/worst day if daily_pnls: result.worst_day_pct = min(daily_pnls) / self.initial_capital * 100 result.best_day_pct = max(daily_pnls) / self.initial_capital * 100 # M29-M31: Profitable days result.n_days_profitable = sum(1 for pnl in daily_pnls if pnl > 0) result.n_days_loss = sum(1 for pnl in daily_pnls if pnl <= 0) if daily_pnls: result.profitable_day_rate = result.n_days_profitable / len(daily_pnls) # M32: max_daily_drawdown_pct result.max_daily_drawdown_pct = self._compute_max_daily_drawdown_pct(daily_pnls) def _compute_signal_metrics( self, result: MCTrialResult, signal_stats: Dict[str, Any] ): """Compute M33-M38: Signal Quality Metrics.""" result.dc_skip_rate = signal_stats.get('dc_skip_rate', 0) result.ob_skip_rate = signal_stats.get('ob_skip_rate', 0) result.dc_confirm_rate = signal_stats.get('dc_confirm_rate', 0) result.irp_match_rate = signal_stats.get('irp_match_rate', 0) result.entry_attempt_rate = signal_stats.get('entry_attempt_rate', 0) result.signal_to_trade_rate = signal_stats.get('signal_to_trade_rate', 0) def _compute_capital_metrics( self, result: MCTrialResult, daily_pnls: List[float] ): """Compute M39-M43: Capital Path Metrics.""" if len(daily_pnls) < 2: return # Compute equity curve equity = [self.initial_capital] for pnl in daily_pnls: equity.append(equity[-1] + pnl) # M39: equity_curve_slope (linear regression) days = np.arange(len(equity)) result.equity_curve_slope, result.equity_curve_r2 = self._linear_regression(days, equity) # M41: equity_curve_autocorr returns = np.diff(equity) / equity[:-1] if len(returns) > 1: result.equity_curve_autocorr = np.corrcoef(returns[:-1], returns[1:])[0, 1] if len(returns) > 2 else 0 # M42: max_underwater_days result.max_underwater_days = self._compute_max_underwater_days(equity) # M43: recovery_factor total_return = sum(daily_pnls) max_dd = self._compute_max_drawdown_value(daily_pnls) result.recovery_factor = total_return / max_dd if max_dd > 0 else float('inf') def _compute_regime_metrics( self, result: MCTrialResult, daily_pnls: List[float] ): """Compute M44-M48: Regime Metrics.""" if len(daily_pnls) < 2: return # M44: date_pnl_std result.date_pnl_std = np.std(daily_pnls, ddof=1) # M45: date_pnl_range result.date_pnl_range = max(daily_pnls) - min(daily_pnls) # M46-M47: Quantiles result.q10_date_pnl = np.percentile(daily_pnls, 10) result.q90_date_pnl = np.percentile(daily_pnls, 90) # M48: tail_ratio if result.q90_date_pnl != 0: result.tail_ratio = abs(result.q10_date_pnl) / abs(result.q90_date_pnl) # --- Helper Methods --- def _compute_max_drawdown_pct(self, daily_pnls: List[float]) -> float: """Compute maximum drawdown as percentage.""" if not daily_pnls: return 0 equity = [self.initial_capital] for pnl in daily_pnls: equity.append(equity[-1] + pnl) peak = equity[0] max_dd = 0 for e in equity: if e > peak: peak = e dd = (peak - e) / peak max_dd = max(max_dd, dd) return max_dd * 100 def _compute_max_drawdown_value(self, daily_pnls: List[float]) -> float: """Compute maximum drawdown as value.""" if not daily_pnls: return 0 equity = [self.initial_capital] for pnl in daily_pnls: equity.append(equity[-1] + pnl) peak = equity[0] max_dd = 0 for e in equity: if e > peak: peak = e dd = peak - e max_dd = max(max_dd, dd) return max_dd def _compute_sharpe_ratio(self, daily_pnls: List[float]) -> float: """Compute annualized Sharpe ratio.""" if len(daily_pnls) < 2: return 0 returns = [p / self.initial_capital for p in daily_pnls] mean_ret = np.mean(returns) std_ret = np.std(returns, ddof=1) if std_ret == 0: return 0 # Annualize (assuming 365 trading days) return (mean_ret / std_ret) * np.sqrt(365) def _compute_sortino_ratio(self, daily_pnls: List[float]) -> float: """Compute annualized Sortino ratio.""" if len(daily_pnls) < 2: return 0 returns = [p / self.initial_capital for p in daily_pnls] mean_ret = np.mean(returns) # Downside deviation (only negative returns) downside_returns = [r for r in returns if r < 0] if not downside_returns: return float('inf') downside_std = np.std(downside_returns, ddof=1) if downside_std == 0: return float('inf') return (mean_ret / downside_std) * np.sqrt(365) def _compute_max_consecutive_losses(self, trades: List[Dict]) -> int: """Compute maximum consecutive losing trades.""" max_consec = 0 current_consec = 0 for trade in trades: if trade.get('pnl', 0) <= 0: current_consec += 1 max_consec = max(max_consec, current_consec) else: current_consec = 0 return max_consec def _compute_skewness(self, data: List[float]) -> float: """Compute skewness.""" if len(data) < 3: return 0 n = len(data) mean = np.mean(data) std = np.std(data, ddof=1) if std == 0: return 0 skew = sum(((x - mean) / std) ** 3 for x in data) * n / ((n - 1) * (n - 2)) return skew def _compute_kurtosis(self, data: List[float]) -> float: """Compute excess kurtosis.""" if len(data) < 4: return 0 n = len(data) mean = np.mean(data) std = np.std(data, ddof=1) if std == 0: return 0 kurt = sum(((x - mean) / std) ** 4 for x in data) * n * (n + 1) / ((n - 1) * (n - 2) * (n - 3)) kurt -= 3 * (n - 1) ** 2 / ((n - 2) * (n - 3)) return kurt def _linear_regression(self, x: np.ndarray, y: List[float]) -> Tuple[float, float]: """Simple linear regression. Returns (slope, r_squared).""" if len(x) < 2: return 0, 0 x_mean = np.mean(x) y_mean = np.mean(y) numerator = sum((xi - x_mean) * (yi - y_mean) for xi, yi in zip(x, y)) denom_x = sum((xi - x_mean) ** 2 for xi in x) denom_y = sum((yi - y_mean) ** 2 for yi in y) if denom_x == 0: return 0, 0 slope = numerator / denom_x if denom_y == 0: r_squared = 0 else: r_squared = (numerator ** 2) / (denom_x * denom_y) return slope, r_squared def _compute_max_underwater_days(self, equity: List[float]) -> int: """Compute maximum consecutive days in drawdown.""" max_underwater = 0 current_underwater = 0 peak = equity[0] for e in equity: if e >= peak: peak = e current_underwater = 0 else: current_underwater += 1 max_underwater = max(max_underwater, current_underwater) return max_underwater def _compute_max_daily_drawdown_pct(self, daily_pnls: List[float]) -> float: """Compute worst single-day drawdown percentage.""" if not daily_pnls: return 0 equity = [self.initial_capital] for pnl in daily_pnls: equity.append(equity[-1] + pnl) max_dd_pct = 0 for i in range(1, len(equity)): prev_equity = equity[i-1] if prev_equity > 0: dd_pct = min(0, daily_pnls[i-1]) / prev_equity * 100 max_dd_pct = min(max_dd_pct, dd_pct) return max_dd_pct def test_metrics(): """Quick test of metrics computation.""" from .mc_sampler import MCSampler sampler = MCSampler() config = sampler.generate_champion_trial() # Create dummy data trades = [ {'pnl': 100, 'pnl_pct': 0.004, 'exit_type': 'tp', 'bars_held': 50}, {'pnl': -50, 'pnl_pct': -0.002, 'exit_type': 'stop', 'bars_held': 20}, {'pnl': 150, 'pnl_pct': 0.006, 'exit_type': 'tp', 'bars_held': 80}, ] * 20 # 60 trades daily_pnls = [50, -20, 80, -10, 100, -30, 60, 40, -15, 90] * 5 # 50 days date_stats = [{'date': f'2026-01-{i+1:02d}', 'pnl': daily_pnls[i]} for i in range(len(daily_pnls))] signal_stats = { 'dc_skip_rate': 0.1, 'ob_skip_rate': 0.05, 'dc_confirm_rate': 0.7, 'irp_match_rate': 0.6, 'entry_attempt_rate': 0.3, 'signal_to_trade_rate': 0.15, } metrics = MCMetrics() result = metrics.compute(config, trades, daily_pnls, date_stats, signal_stats) print("Test Metrics Result:") print(f" ROI: {result.roi_pct:.2f}%") print(f" Profit Factor: {result.profit_factor:.2f}") print(f" Win Rate: {result.win_rate:.2%}") print(f" Sharpe: {result.sharpe_ratio:.2f}") print(f" Max DD: {result.max_drawdown_pct:.2f}%") print(f" Champion Region: {result.champion_region}") return result if __name__ == "__main__": test_metrics()