""" m6_test_runner_flow.py — Scheduled M6 continuous-test battery. =============================================================== Runs pytest test categories on three non-overlapping cadences and writes results to run_logs/test_results_latest.json, which MHS reads every 10s via _m6_test_integrity() to gate RM_META (6% weight). Cadences (non-overlapping by tier): FAST — every 10 min : data_integrity only (~3-4 min observed) MED — every 20 min : signal_fill + finance_fuzz (~5-10 min est.) FULL — every 1 h : all 5 categories (~15-20 min est.) Non-overlap logic: Top-of-hour → FULL (subsumes FAST+MED at that slot) :20 and :40 → MED (subsumes FAST at that slot) All others → FAST A file lock prevents concurrent runs if a higher-tier run is still going. Results flow: pytest → conftest.pytest_sessionfinish → test_results_latest.json → MHS._m6_test_integrity() (every ~10s) → RM_META Register / schedule: python prod/m6_test_runner_flow.py --register Run manually: python prod/m6_test_runner_flow.py --mode fast python prod/m6_test_runner_flow.py --mode med python prod/m6_test_runner_flow.py --mode full """ from __future__ import annotations import argparse import fcntl import logging import subprocess import sys import time from datetime import datetime, timezone from pathlib import Path from prefect import flow, task, get_run_logger from prefect.schedules import Cron # ── Paths ───────────────────────────────────────────────────────────────────── _ROOT = Path(__file__).parent.parent # /mnt/dolphinng5_predict _TESTS_DIR = Path(__file__).parent / "tests" _RUN_LOGS = _ROOT / "run_logs" _LOCK_FILE = Path("/tmp/dolphin_m6_test.lock") _PYTHON = "/home/dolphin/siloqy_env/bin/python" # ── Category → test file mapping ───────────────────────────────────────────── _CAT_FILES = { "data_integrity": ["test_data_integrity.py"], "finance_fuzz": ["test_finance_fuzz.py", "test_acb_hz_status_integrity.py"], "signal_fill": ["test_signal_to_fill.py"], "degradation": ["test_degradational.py", "test_mhs_v3.py"], "actor": ["test_scan_bridge_prefect_daemon.py"], } # Tier → categories to run _TIER_CATS = { "fast": ["data_integrity"], "med": ["signal_fill", "finance_fuzz"], "full": ["data_integrity", "finance_fuzz", "signal_fill", "degradation", "actor"], } # ── Tasks ───────────────────────────────────────────────────────────────────── @task(name="acquire_lock", retries=0, timeout_seconds=5) def acquire_lock() -> bool: """Non-blocking lock acquisition. Returns False if another run is active.""" log = get_run_logger() try: _LOCK_FILE.touch(exist_ok=True) fd = open(_LOCK_FILE, "w") fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # Keep fd open — store on Path object as attribute (hacky but Prefect-safe) _LOCK_FILE._fd = fd # type: ignore[attr-defined] log.info("Lock acquired") return True except BlockingIOError: log.warning("Another test run is active — skipping this slot") return False @task(name="release_lock", retries=0, timeout_seconds=5) def release_lock(): fd = getattr(_LOCK_FILE, "_fd", None) if fd: try: fcntl.flock(fd, fcntl.LOCK_UN) fd.close() except Exception: pass @task(name="run_category", retries=0, timeout_seconds=120) def run_category(category: str) -> dict: """Run pytest for one category; return {category, status, passed, total, duration_s}.""" log = get_run_logger() files = _CAT_FILES.get(category, []) if not files: log.warning("No files mapped for category %s — skipping", category) return {"category": category, "status": "N/A", "passed": 0, "total": 0, "duration_s": 0} paths = [] for f in files: p = _TESTS_DIR / f if p.exists(): paths.append(str(p)) else: log.warning("Test file not found: %s", p) if not paths: return {"category": category, "status": "N/A", "passed": 0, "total": 0, "duration_s": 0} cmd = [ _PYTHON, "-m", "pytest", *paths, f"--category={category}", "-x", # stop on first failure within category "-q", "--tb=short", "--no-header", f"--rootdir={_ROOT}", ] log.info("Running: %s", " ".join(cmd)) t0 = time.monotonic() result = subprocess.run( cmd, capture_output=True, text=True, timeout=110, cwd=str(_ROOT), ) duration = round(time.monotonic() - t0, 1) # Log output regardless of outcome — fully logged as requested for line in (result.stdout + result.stderr).splitlines(): log.info("[pytest/%s] %s", category, line) passed = result.returncode == 0 status = "PASS" if passed else "FAIL" log.info("Category %s → %s in %.1fs (exit %d)", category, status, duration, result.returncode) return { "category": category, "status": status, "passed": passed, "total": 1, # conftest counts exact; this is run-level bool "duration_s": duration, "exit_code": result.returncode, } # ── Flows ────────────────────────────────────────────────────────────────────── def _make_flow(tier: str): """Factory — builds a named @flow for each tier (Prefect requires unique names).""" @flow( name=f"m6_tests_{tier}", description=f"M6 test battery tier={tier}: {_TIER_CATS[tier]}", timeout_seconds=300, ) def _flow(): log = get_run_logger() now = datetime.now(timezone.utc) log.info("M6 test runner [%s] starting at %s", tier, now.isoformat()) locked = acquire_lock() if not locked: log.warning("Skipped — lock busy") return try: categories = _TIER_CATS[tier] results = [] for cat in categories: r = run_category(cat) results.append(r) fails = [r for r in results if r["status"] == "FAIL"] total_t = sum(r["duration_s"] for r in results) log.info( "M6 [%s] complete: %d/%d categories passed in %.1fs", tier, len(results) - len(fails), len(results), total_t, ) if fails: log.error("FAILING categories: %s", [r["category"] for r in fails]) finally: release_lock() _flow.__name__ = f"m6_tests_{tier}" return _flow m6_tests_fast = _make_flow("fast") m6_tests_med = _make_flow("med") m6_tests_full = _make_flow("full") # ── CLI ──────────────────────────────────────────────────────────────────────── def _register(): """Register all three deployments with Prefect server.""" import os api = os.environ.get("PREFECT_API_URL", "http://localhost:4200/api") deployments = [ (m6_tests_fast, "10min-fast", Cron("10,30,50 * * * *", timezone="UTC")), (m6_tests_med, "20min-med", Cron("20,40 * * * *", timezone="UTC")), (m6_tests_full, "1h-full", Cron("0 * * * *", timezone="UTC")), ] for flow_fn, suffix, schedule in deployments: name = f"m6-{suffix}" dep = flow_fn.to_deployment( name=name, schedule=schedule, tags=["m6", "tests", "observability"], ) dep.apply() print(f"Registered: {name} schedule={schedule}") print("\nNon-overlap logic:") print(" :00 → FULL (1h) runs all 5 categories") print(" :20 :40 → MED (20m) signal_fill + finance_fuzz") print(" :05...:55 → FAST (5m) data_integrity only") print(" (FAST skips :00 :20 :40 because lock is held by FULL/MED)") if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--register", action="store_true", help="Register Prefect deployments") parser.add_argument("--mode", choices=["fast", "med", "full"], help="Run immediately") args = parser.parse_args() if args.register: _register() elif args.mode: {"fast": m6_tests_fast, "med": m6_tests_med, "full": m6_tests_full}[args.mode]() else: parser.print_help()