""" Monte Carlo Configuration Validator =================================== Internal consistency validation for all constraint groups V1-V4. Validation Pipeline: V1: Range check - each param within declared [lo, hi] V2: Constraint groups - CG-VD, CG-LEV, CG-EXIT, CG-RISK, CG-ACB, etc. V3: Cross-group check - inter-subsystem coherence V4: Degenerate check - would produce 0 trades or infinite leverage Reference: MONTE_CARLO_SYSTEM_ENVELOPE_SPEC.md Section 4 """ from typing import Dict, List, Optional, Tuple, Any from dataclasses import dataclass from enum import Enum import numpy as np from .mc_sampler import MCTrialConfig, MCSampler class ValidationStatus(Enum): """Validation result status.""" VALID = "VALID" REJECTED_V1 = "REJECTED_V1" # Range check failed REJECTED_V2 = "REJECTED_V2" # Constraint group failed REJECTED_V3 = "REJECTED_V3" # Cross-group check failed REJECTED_V4 = "REJECTED_V4" # Degenerate configuration @dataclass class ValidationResult: """Result of validation.""" status: ValidationStatus trial_id: int reject_reason: Optional[str] = None warnings: List[str] = None def __post_init__(self): if self.warnings is None: self.warnings = [] def is_valid(self) -> bool: """Check if configuration is valid.""" return self.status == ValidationStatus.VALID def to_dict(self) -> Dict[str, Any]: """Convert to dictionary.""" return { 'status': self.status.value, 'trial_id': self.trial_id, 'reject_reason': self.reject_reason, 'warnings': self.warnings, } class MCValidator: """ Monte Carlo Configuration Validator. Implements the full V1-V4 validation pipeline. """ def __init__(self, verbose: bool = False): """ Initialize validator. Parameters ---------- verbose : bool Print detailed validation messages """ self.verbose = verbose self.sampler = MCSampler() def validate(self, config: MCTrialConfig) -> ValidationResult: """ Run full validation pipeline on a configuration. Parameters ---------- config : MCTrialConfig Configuration to validate Returns ------- ValidationResult Validation result with status and details """ warnings = [] # V1: Range checks v1_passed, v1_reason = self._validate_v1_ranges(config) if not v1_passed: return ValidationResult( status=ValidationStatus.REJECTED_V1, trial_id=config.trial_id, reject_reason=v1_reason, warnings=warnings ) # V2: Constraint group rules v2_passed, v2_reason = self._validate_v2_constraint_groups(config) if not v2_passed: return ValidationResult( status=ValidationStatus.REJECTED_V2, trial_id=config.trial_id, reject_reason=v2_reason, warnings=warnings ) # V3: Cross-group checks v3_passed, v3_reason, v3_warnings = self._validate_v3_cross_group(config) warnings.extend(v3_warnings) if not v3_passed: return ValidationResult( status=ValidationStatus.REJECTED_V3, trial_id=config.trial_id, reject_reason=v3_reason, warnings=warnings ) # V4: Degenerate check (lightweight - no actual backtest) v4_passed, v4_reason = self._validate_v4_degenerate(config) if not v4_passed: return ValidationResult( status=ValidationStatus.REJECTED_V4, trial_id=config.trial_id, reject_reason=v4_reason, warnings=warnings ) return ValidationResult( status=ValidationStatus.VALID, trial_id=config.trial_id, reject_reason=None, warnings=warnings ) def _validate_v1_ranges(self, config: MCTrialConfig) -> Tuple[bool, Optional[str]]: """ V1: Range checks - each param within declared [lo, hi]. """ params = config._asdict() for name, pdef in self.sampler.PARAMS.items(): if pdef.param_type.value in ('derived', 'fixed'): continue value = params.get(name) if value is None: return False, f"Missing parameter: {name}" # Check lower bound if pdef.lo is not None and value < pdef.lo: return False, f"{name}={value} below minimum {pdef.lo}" # Check upper bound (handle dependent bounds) hi = pdef.hi if hi is None and name == 'vel_div_extreme': hi = params.get('vel_div_threshold', -0.02) * 1.5 if hi is not None and value > hi: return False, f"{name}={value} above maximum {hi}" return True, None def _validate_v2_constraint_groups(self, config: MCTrialConfig) -> Tuple[bool, Optional[str]]: """ V2: Constraint group rules. """ # CG-VD: Velocity Divergence thresholds if not self._check_cg_vd(config): return False, "CG-VD: Velocity divergence constraints violated" # CG-LEV: Leverage bounds if not self._check_cg_lev(config): return False, "CG-LEV: Leverage constraints violated" # CG-EXIT: Exit management if not self._check_cg_exit(config): return False, "CG-EXIT: Exit constraints violated" # CG-RISK: Combined risk if not self._check_cg_risk(config): return False, "CG-RISK: Risk cap exceeded" # CG-DC-LEV: DC leverage adjustments if not self._check_cg_dc_lev(config): return False, "CG-DC-LEV: DC leverage adjustment constraints violated" # CG-ACB: ACB beta bounds if not self._check_cg_acb(config): return False, "CG-ACB: ACB beta constraints violated" # CG-SP: SmartPlacer rates if not self._check_cg_sp(config): return False, "CG-SP: SmartPlacer rate constraints violated" # CG-OB-SIG: OB signal constraints if not self._check_cg_ob_sig(config): return False, "CG-OB-SIG: OB signal constraints violated" return True, None def _check_cg_vd(self, config: MCTrialConfig) -> bool: """CG-VD: Velocity Divergence constraints.""" # extreme < threshold (both negative; extreme is more negative) if config.vel_div_extreme >= config.vel_div_threshold: if self.verbose: print(f" CG-VD fail: extreme={config.vel_div_extreme} >= threshold={config.vel_div_threshold}") return False # extreme >= -0.15 (below this, no bars fire at all) if config.vel_div_extreme < -0.15: if self.verbose: print(f" CG-VD fail: extreme={config.vel_div_extreme} < -0.15") return False # threshold <= -0.005 (above this, too many spurious entries) if config.vel_div_threshold > -0.005: if self.verbose: print(f" CG-VD fail: threshold={config.vel_div_threshold} > -0.005") return False # abs(extreme / threshold) >= 1.5 (meaningful separation) separation = abs(config.vel_div_extreme / config.vel_div_threshold) if separation < 1.5: if self.verbose: print(f" CG-VD fail: separation={separation:.2f} < 1.5") return False return True def _check_cg_lev(self, config: MCTrialConfig) -> bool: """CG-LEV: Leverage bounds.""" # min_leverage < max_leverage if config.min_leverage >= config.max_leverage: if self.verbose: print(f" CG-LEV fail: min={config.min_leverage} >= max={config.max_leverage}") return False # max_leverage - min_leverage >= 1.0 (meaningful range) if config.max_leverage - config.min_leverage < 1.0: if self.verbose: print(f" CG-LEV fail: range={config.max_leverage - config.min_leverage:.2f} < 1.0") return False # max_leverage * fraction <= 2.0 (notional-capital safety cap) notional_cap = config.max_leverage * config.fraction if notional_cap > 2.0: if self.verbose: print(f" CG-LEV fail: notional_cap={notional_cap:.2f} > 2.0") return False return True def _check_cg_exit(self, config: MCTrialConfig) -> bool: """CG-EXIT: Exit management constraints.""" tp_decimal = config.fixed_tp_pct sl_decimal = config.stop_pct / 100.0 # Convert from percentage to decimal # TP must be achievable before SL if tp_decimal > sl_decimal * 5.0: if self.verbose: print(f" CG-EXIT fail: TP={tp_decimal:.4f} > SL*5={sl_decimal*5:.4f}") return False # minimum 30 bps TP if tp_decimal < 0.0030: if self.verbose: print(f" CG-EXIT fail: TP={tp_decimal:.4f} < 0.0030") return False # minimum 20 bps SL width if sl_decimal < 0.0020: if self.verbose: print(f" CG-EXIT fail: SL={sl_decimal:.4f} < 0.0020") return False # minimum meaningful hold period if config.max_hold_bars < 20: if self.verbose: print(f" CG-EXIT fail: max_hold={config.max_hold_bars} < 20") return False # TP:SL ratio >= 0.10x if sl_decimal > 0 and tp_decimal / sl_decimal < 0.10: if self.verbose: print(f" CG-EXIT fail: TP/SL ratio={tp_decimal/sl_decimal:.2f} < 0.10") return False return True def _check_cg_risk(self, config: MCTrialConfig) -> bool: """CG-RISK: Combined risk constraints.""" # fraction * max_leverage <= 2.0 (mirrors CG-LEV) max_notional_fraction = config.fraction * config.max_leverage if max_notional_fraction > 2.0: if self.verbose: print(f" CG-RISK fail: max_notional={max_notional_fraction:.2f} > 2.0") return False # minimum meaningful position if max_notional_fraction < 0.10: if self.verbose: print(f" CG-RISK fail: max_notional={max_notional_fraction:.2f} < 0.10") return False return True def _check_cg_dc_lev(self, config: MCTrialConfig) -> bool: """CG-DC-LEV: DC leverage adjustment constraints.""" if not config.use_direction_confirm: # DC not used - constraints don't apply return True # dc_leverage_boost >= 1.0 (must boost, not reduce) if config.dc_leverage_boost < 1.0: if self.verbose: print(f" CG-DC-LEV fail: boost={config.dc_leverage_boost:.2f} < 1.0") return False # dc_leverage_reduce < 1.0 (must reduce, not boost) if config.dc_leverage_reduce >= 1.0: if self.verbose: print(f" CG-DC-LEV fail: reduce={config.dc_leverage_reduce:.2f} >= 1.0") return False # DC swing bounded: boost * (1/reduce) <= 4.0 dc_swing = config.dc_leverage_boost * (1.0 / config.dc_leverage_reduce) if dc_swing > 4.0: if self.verbose: print(f" CG-DC-LEV fail: dc_swing={dc_swing:.2f} > 4.0") return False return True def _check_cg_acb(self, config: MCTrialConfig) -> bool: """CG-ACB: ACB beta bounds.""" # acb_beta_low < acb_beta_high if config.acb_beta_low >= config.acb_beta_high: if self.verbose: print(f" CG-ACB fail: low={config.acb_beta_low:.2f} >= high={config.acb_beta_high:.2f}") return False # acb_beta_high - acb_beta_low >= 0.20 (meaningful dynamic range) if config.acb_beta_high - config.acb_beta_low < 0.20: if self.verbose: print(f" CG-ACB fail: range={config.acb_beta_high - config.acb_beta_low:.2f} < 0.20") return False # acb_beta_high <= 1.50 (cap at 150%) if config.acb_beta_high > 1.50: if self.verbose: print(f" CG-ACB fail: high={config.acb_beta_high:.2f} > 1.50") return False return True def _check_cg_sp(self, config: MCTrialConfig) -> bool: """CG-SP: SmartPlacer rate constraints.""" if not config.use_sp_slippage: # Slippage disabled - rates don't matter return True # Rates must be in [0, 1] if not (0.0 <= config.sp_maker_entry_rate <= 1.0): if self.verbose: print(f" CG-SP fail: entry_rate={config.sp_maker_entry_rate:.2f} not in [0,1]") return False if not (0.0 <= config.sp_maker_exit_rate <= 1.0): if self.verbose: print(f" CG-SP fail: exit_rate={config.sp_maker_exit_rate:.2f} not in [0,1]") return False return True def _check_cg_ob_sig(self, config: MCTrialConfig) -> bool: """CG-OB-SIG: OB signal constraints.""" # ob_imbalance_bias in [-1.0, 1.0] if not (-1.0 <= config.ob_imbalance_bias <= 1.0): if self.verbose: print(f" CG-OB-SIG fail: bias={config.ob_imbalance_bias:.2f} not in [-1,1]") return False # ob_depth_scale > 0 if config.ob_depth_scale <= 0: if self.verbose: print(f" CG-OB-SIG fail: depth_scale={config.ob_depth_scale:.2f} <= 0") return False return True def _validate_v3_cross_group( self, config: MCTrialConfig ) -> Tuple[bool, Optional[str], List[str]]: """ V3: Cross-group coherence checks. Returns (passed, reason, warnings). """ warnings = [] # Signal threshold vs exit: TP must be achievable before max_hold_bars expires # Approximate: at typical vol, price moves ~0.03% per 5s bar expected_tp_bars = config.fixed_tp_pct / 0.0003 if expected_tp_bars > config.max_hold_bars * 3: warnings.append( f"TP_TIME_RISK: expected_tp_bars={expected_tp_bars:.0f} > max_hold*3={config.max_hold_bars*3}" ) # Leverage convexity vs range: extreme convexity with wide leverage range # produces near-binary leverage if config.leverage_convexity > 5.0 and (config.max_leverage - config.min_leverage) > 5.0: warnings.append( f"HIGH_CONVEXITY_WIDE_RANGE: near-binary leverage behaviour likely" ) # OB skip + DC skip double-filtering: very few trades may fire if config.dc_skip_contradicts and config.ob_imbalance_bias > 0.15: warnings.append( f"DOUBLE_FILTER_RISK: DC skip + strong OB contradiction may starve trades" ) # Reject only on critical cross-group violations # (none currently defined - all are warnings) return True, None, warnings def _validate_v4_degenerate(self, config: MCTrialConfig) -> Tuple[bool, Optional[str]]: """ V4: Degenerate configuration check (lightweight heuristics). Full pre-flight with 500 bars is done in mc_executor during actual trial. This is just a quick sanity check. """ # Check for numerical extremes that would cause issues # Fraction too small - would produce micro-positions if config.fraction < 0.02: return False, f"FRACTION_TOO_SMALL: fraction={config.fraction} < 0.02" # Leverage range too narrow for convexity to matter leverage_range = config.max_leverage - config.min_leverage if leverage_range < 0.5 and config.leverage_convexity > 2.0: return False, f"NARROW_RANGE_HIGH_CONVEXITY: range={leverage_range:.2f}, convexity={config.leverage_convexity:.2f}" # Max hold too short for vol filter to stabilize if config.max_hold_bars < config.vd_trend_lookback + 10: return False, f"HOLD_TOO_SHORT: max_hold={config.max_hold_bars} < trend_lookback+10={config.vd_trend_lookback+10}" # IRP lookback too short for meaningful alignment if config.lookback < 50: return False, f"LOOKBACK_TOO_SHORT: lookback={config.lookback} < 50" return True, None def validate_batch( self, configs: List[MCTrialConfig] ) -> List[ValidationResult]: """ Validate a batch of configurations. Parameters ---------- configs : List[MCTrialConfig] Configurations to validate Returns ------- List[ValidationResult] Validation results (same order as input) """ results = [] for config in configs: result = self.validate(config) results.append(result) return results def get_validity_stats(self, results: List[ValidationResult]) -> Dict[str, Any]: """ Get statistics about validation results. """ total = len(results) if total == 0: return {'total': 0} by_status = {} for status in ValidationStatus: by_status[status.value] = sum(1 for r in results if r.status == status) rejection_reasons = {} for r in results: if r.reject_reason: reason = r.reject_reason.split(':')[0] if ':' in r.reject_reason else r.reject_reason rejection_reasons[reason] = rejection_reasons.get(reason, 0) + 1 return { 'total': total, 'valid': by_status.get(ValidationStatus.VALID.value, 0), 'rejected_v1': by_status.get(ValidationStatus.REJECTED_V1.value, 0), 'rejected_v2': by_status.get(ValidationStatus.REJECTED_V2.value, 0), 'rejected_v3': by_status.get(ValidationStatus.REJECTED_V3.value, 0), 'rejected_v4': by_status.get(ValidationStatus.REJECTED_V4.value, 0), 'validity_rate': by_status.get(ValidationStatus.VALID.value, 0) / total, 'rejection_reasons': rejection_reasons, } def test_validator(): """Quick test of the validator.""" validator = MCValidator(verbose=True) sampler = MCSampler(base_seed=42) # Generate some test configurations trials = sampler.generate_trials(n_samples_per_switch=10, max_trials=100) # Validate results = validator.validate_batch(trials) # Stats stats = validator.get_validity_stats(results) print(f"\nValidation Stats:") print(f" Total: {stats['total']}") print(f" Valid: {stats['valid']} ({stats['validity_rate']*100:.1f}%)") print(f" Rejected V1: {stats['rejected_v1']}") print(f" Rejected V2: {stats['rejected_v2']}") print(f" Rejected V3: {stats['rejected_v3']}") print(f" Rejected V4: {stats['rejected_v4']}") # Show some rejections print("\nSample Rejections:") for r in results: if not r.is_valid(): print(f" Trial {r.trial_id}: {r.status.value} - {r.reject_reason}") if len([x for x in results if not x.is_valid()]) > 5: break return results if __name__ == "__main__": test_validator()