Files
DOLPHIN/prod/mc_forewarner_flow.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

139 lines
5.6 KiB
Python
Executable File

"""DOLPHIN MIG1.3 — MC-Forewarner Flow
Runs every 4 hours to evaluate structural market risk via Monte Carlo ML models.
Writes to Hazelcast DOLPHIN_FEATURES IMap key 'mc_forewarner_latest'.
"""
import sys, json, logging, argparse
from pathlib import Path
from datetime import datetime, timezone
from prefect import flow, task, get_run_logger
import hazelcast
# Directory two levels above this file's location (parent of the directory that
# contains this script); used below as the base for project-relative paths such
# as the 'nautilus_dolphin' package directory.
HCM_DIR = Path(__file__).parent.parent
# Configure root logging at INFO level as an import-time side effect
# (basicConfig is a no-op if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
# ── Tasks ───────────────────────────────────────────────────────────────────────
def _mc_fallback_report(source: str, warning: str) -> dict:
    """Build the deterministic, conservative report used when no real evaluation exists.

    Args:
        source: Value stored under the 'source' key (e.g. 'FALLBACK_NO_DATA').
        warning: Single message stored as the only entry of the 'warnings' list.

    Returns:
        A fresh dict with the same keys as a real evaluation; all prediction
        fields are None and the status is fixed to 'ORANGE'.
    """
    return {
        'status': 'ORANGE',
        'catastrophic_prob': 0.20,
        'envelope_score': 0.80,
        'champion_probability': None,
        'predicted_roi': None,
        'predicted_roi_p10': None,
        'predicted_roi_p90': None,
        'predicted_max_dd': None,
        'parameter_risks': {},
        'warnings': [warning],
        'source': source,
    }


@task(name="run_mc_forewarner", retries=1, retry_delay_seconds=30)
def evaluate_market_structure() -> dict:
    """Assess the champion configuration with the MC-Forewarner models.

    Looks for '*.pkl' model files under
    HCM_DIR/nautilus_dolphin/mc_results/models. When they exist, a
    DolphinForewarner assesses the trial produced by
    MCSampler().generate_champion_trial() and the report fields are copied
    into a plain dict.

    Returns:
        A dict with keys 'status', 'catastrophic_prob', 'envelope_score',
        'champion_probability', 'predicted_roi', 'predicted_roi_p10',
        'predicted_roi_p90', 'predicted_max_dd', 'parameter_risks',
        'warnings' and 'source'. 'source' is 'REAL_MODEL' on success,
        'FALLBACK_NO_DATA' when no model files are found, and
        'FALLBACK_ERROR' when the evaluation raised.

    This function never raises for evaluation problems: every exception is
    logged and converted into the conservative fallback report.
    """
    log = get_run_logger()
    models_dir = HCM_DIR / 'nautilus_dolphin' / 'mc_results' / 'models'
    # any() stops at the first match instead of materialising every path.
    if not models_dir.exists() or not any(models_dir.glob('*.pkl')):
        log.warning("MC models not found — using deterministic FALLBACK_NO_DATA")
        return _mc_fallback_report(
            'FALLBACK_NO_DATA',
            'MC models not found — using deterministic fallback',
        )
    try:
        # Keep the package directory at the front of sys.path so the 'mc'
        # imports below resolve from it, but do not stack a duplicate entry
        # on every call when this runs repeatedly in one process.
        mc_root = str(HCM_DIR / 'nautilus_dolphin')
        if not sys.path or sys.path[0] != mc_root:
            sys.path.insert(0, mc_root)
        from mc.mc_ml import DolphinForewarner
        from mc.mc_sampler import MCSampler
        forewarner = DolphinForewarner(models_dir=str(models_dir))
        champion_config = MCSampler().generate_champion_trial()  # returns MCTrialConfig
        report = forewarner.assess(champion_config)
        prob = float(report.catastrophic_probability)
        env_score = float(report.envelope_score)
        # Traffic-light mapping: < 0.10 GREEN, < 0.30 ORANGE, otherwise RED.
        status = 'GREEN' if prob < 0.10 else ('ORANGE' if prob < 0.30 else 'RED')
        log.info(f"MC-Forewarner real eval: status={status} prob={prob:.3f}")
        return {
            'status': status,
            'catastrophic_prob': prob,
            'envelope_score': env_score,
            'champion_probability': float(report.champion_probability),
            'predicted_roi': float(report.predicted_roi),
            'predicted_roi_p10': float(report.predicted_roi_p10),
            'predicted_roi_p90': float(report.predicted_roi_p90),
            'predicted_max_dd': float(report.predicted_max_dd),
            'parameter_risks': report.parameter_risks,
            'warnings': report.warnings,
            'source': 'REAL_MODEL',
        }
    except Exception as e:
        # exc_info keeps the traceback in the run log; the message is unchanged.
        log.error(f"MC-Forewarner evaluation failed: {e}", exc_info=True)
        # Deterministic conservative fallback — never random
        return _mc_fallback_report('FALLBACK_ERROR', f'MC evaluation failed: {e}')
@task(name="write_mc_hz", retries=3, retry_delay_seconds=5)
def write_mc_to_hz(result: dict):
    """Publish the forewarner result to the Hazelcast 'DOLPHIN_FEATURES' map.

    The result is serialised with json.dumps and stored under the key
    'mc_forewarner_latest'.

    Args:
        result: Evaluation dict. It is mutated in place: a UTC ISO-8601
            'timestamp' is added, and any missing report fields are filled
            with None / empty defaults so readers always see every key.

    Returns:
        None. Failures are logged, not raised.

    NOTE(review): because every exception is caught here, the task never
    fails and the ``retries=3`` setting on the decorator cannot trigger.
    Confirm whether best-effort is intended or the error should propagate.
    """
    log = get_run_logger()
    try:
        client = hazelcast.HazelcastClient(cluster_name="dolphin", cluster_members=["localhost:5701"])
        try:
            imap = client.get_map('DOLPHIN_FEATURES').blocking()
            result['timestamp'] = datetime.now(timezone.utc).isoformat()
            # Include all ForewarningReport fields so TUI can show predicted bounds.
            # setdefault only fills absent keys, so it is applied to every field
            # independently rather than only when 'predicted_roi' is missing.
            report_defaults = (
                ('predicted_roi', None),
                ('predicted_roi_p10', None),
                ('predicted_roi_p90', None),
                ('predicted_max_dd', None),
                ('champion_probability', None),
                ('parameter_risks', {}),
                ('warnings', []),
            )
            for key, default in report_defaults:
                result.setdefault(key, default)
            imap.put('mc_forewarner_latest', json.dumps(result))
            log.info(f"Successfully wrote MC-Forewarner state to HZ: {result['status']}")
        finally:
            # Always release the client connection, including when
            # serialisation or the put fails part-way through.
            client.shutdown()
    except Exception as e:
        log.error(f"Failed to write MC-Forewarner to HZ: {e}")
# ── Flow ────────────────────────────────────────────────────────────────────────
@flow(name="mc-forewarner", log_prints=True)
def mc_forewarner_flow():
    """Run one MC-Forewarner cycle: evaluate, publish to Hazelcast, return the result.

    Returns:
        The dict produced by evaluate_market_structure(), after it has been
        handed to write_mc_to_hz().
    """
    get_run_logger().info("=== Starting MC-Forewarner Flow ===")
    assessment = evaluate_market_structure()
    write_mc_to_hz(assessment)
    return assessment
if __name__ == '__main__':
    # CLI: `--register` creates the Prefect deployment; without it the flow
    # is executed once, immediately, in this process.
    cli = argparse.ArgumentParser()
    cli.add_argument('--register', action='store_true')
    if not cli.parse_args().register:
        import os
        # Point at a local Prefect API unless the environment already chose one.
        os.environ.setdefault('PREFECT_API_URL', 'http://localhost:4200/api')
        mc_forewarner_flow()
    else:
        from prefect.client.schemas.schedules import CronSchedule as CS
        # NOTE(review): this cron expression fires at minute 0 of every hour,
        # while the module docstring says the flow runs every 4 hours —
        # confirm which cadence is intended.
        cron_schedule = CS(cron="0 * * * *", timezone="UTC")
        deployment = mc_forewarner_flow.to_deployment(
            name="dolphin-mc-forewarner",
            schedule=cron_schedule,
            work_pool_name="dolphin",
            tags=["mc-forewarner", "dolphin"],
        )
        deployment.apply()
        print("Registered: dolphin-mc-forewarner")