"""DOLPHIN MIG1.3 — MC-Forewarner Flow Runs every 4 hours to evaluate structural market risk via Monte Carlo ML models. Writes to Hazelcast DOLPHIN_FEATURES IMap key 'mc_forewarner_latest'. """ import sys, json, logging, argparse from pathlib import Path from datetime import datetime, timezone from prefect import flow, task, get_run_logger import hazelcast HCM_DIR = Path(__file__).parent.parent logging.basicConfig(level=logging.INFO) # ── Tasks ─────────────────────────────────────────────────────────────────────── @task(name="run_mc_forewarner", retries=1, retry_delay_seconds=30) def evaluate_market_structure() -> dict: log = get_run_logger() models_dir = HCM_DIR / 'nautilus_dolphin' / 'mc_results' / 'models' if not models_dir.exists() or not list(models_dir.glob('*.pkl')): log.warning("MC models not found — using deterministic FALLBACK_NO_DATA") return { 'status': 'ORANGE', 'catastrophic_prob': 0.20, 'envelope_score': 0.80, 'champion_probability': None, 'predicted_roi': None, 'predicted_roi_p10': None, 'predicted_roi_p90': None, 'predicted_max_dd': None, 'parameter_risks': {}, 'warnings': ['MC models not found — using deterministic fallback'], 'source': 'FALLBACK_NO_DATA', } try: sys.path.insert(0, str(HCM_DIR / 'nautilus_dolphin')) from mc.mc_ml import DolphinForewarner from mc.mc_sampler import MCSampler forewarner = DolphinForewarner(models_dir=str(models_dir)) champion_config = MCSampler().generate_champion_trial() # returns MCTrialConfig report = forewarner.assess(champion_config) prob = float(report.catastrophic_probability) env_score = float(report.envelope_score) status = 'GREEN' if prob < 0.10 else ('ORANGE' if prob < 0.30 else 'RED') log.info(f"MC-Forewarner real eval: status={status} prob={prob:.3f}") return { 'status': status, 'catastrophic_prob': prob, 'envelope_score': env_score, 'champion_probability': float(report.champion_probability), 'predicted_roi': float(report.predicted_roi), 'predicted_roi_p10': float(report.predicted_roi_p10), 'predicted_roi_p90': float(report.predicted_roi_p90), 'predicted_max_dd': float(report.predicted_max_dd), 'parameter_risks': report.parameter_risks, 'warnings': report.warnings, 'source': 'REAL_MODEL', } except Exception as e: log.error(f"MC-Forewarner evaluation failed: {e}") # Deterministic conservative fallback — never random return { 'status': 'ORANGE', 'catastrophic_prob': 0.20, 'envelope_score': 0.80, 'champion_probability': None, 'predicted_roi': None, 'predicted_roi_p10': None, 'predicted_roi_p90': None, 'predicted_max_dd': None, 'parameter_risks': {}, 'warnings': [f'MC evaluation failed: {e}'], 'source': 'FALLBACK_ERROR', } @task(name="write_mc_hz", retries=3, retry_delay_seconds=5) def write_mc_to_hz(result: dict): log = get_run_logger() try: client = hazelcast.HazelcastClient(cluster_name="dolphin", cluster_members=["localhost:5701"]) imap = client.get_map('DOLPHIN_FEATURES').blocking() result['timestamp'] = datetime.now(timezone.utc).isoformat() # Include all ForewarningReport fields so TUI can show predicted bounds if 'predicted_roi' not in result: result.setdefault('predicted_roi', None) result.setdefault('predicted_roi_p10', None) result.setdefault('predicted_roi_p90', None) result.setdefault('predicted_max_dd', None) result.setdefault('champion_probability', None) result.setdefault('parameter_risks', {}) result.setdefault('warnings', []) imap.put('mc_forewarner_latest', json.dumps(result)) log.info(f"Successfully wrote MC-Forewarner state to HZ: {result['status']}") client.shutdown() except Exception as e: log.error(f"Failed to write MC-Forewarner to HZ: {e}") # ── Flow ──────────────────────────────────────────────────────────────────────── @flow(name="mc-forewarner", log_prints=True) def mc_forewarner_flow(): log = get_run_logger() log.info(f"=== Starting MC-Forewarner Flow ===") result = evaluate_market_structure() write_mc_to_hz(result) return result if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--register', action='store_true') args = parser.parse_args() if args.register: from prefect.client.schemas.schedules import CronSchedule as CS deployment = mc_forewarner_flow.to_deployment( name="dolphin-mc-forewarner", schedule=CS(cron="0 * * * *", timezone="UTC"), work_pool_name="dolphin", tags=["mc-forewarner", "dolphin"], ) deployment.apply() print("Registered: dolphin-mc-forewarner") else: import os os.environ.setdefault('PREFECT_API_URL', 'http://localhost:4200/api') mc_forewarner_flow()