#!/usr/bin/env python3
"""
DOLPHIN Meta Health Service v3
===============================

Watchdog-of-watchdogs: monitors all Dolphin subsystems, emits Rm_meta, and
performs supervised recovery via supervisorctl (NOT systemctl).

CRITICAL fixes vs v1/v2:
  FIX-1  Product formula replaced with weighted sum — one absent optional
         service no longer collapses rm_meta to 0 and triggers restarts.
  FIX-2  Recovery uses supervisorctl exclusively — systemctl calls are gone.
         systemctl was the "weird stopping bug": it killed supervisord-managed
         processes, causing supervisord to fight against MHS.
  FIX-3  Process patterns updated to current supervisord program names.
  FIX-4  HZ keys updated to current system (exf_latest, acb_boost, etc.).
  FIX-5  Trader services (nautilus_trader, scan_bridge) are NEVER
         auto-restarted — they may be intentionally stopped. Only
         dolphin_data infra is recovered.
  FIX-6  Recovery is rate-limited per service. (NOTE(review): this changelog
         entry says "per 10 min", but the current constants below are 10 s for
         critical data infra and 300 s otherwise — see RECOVERY_COOLDOWN_*.)
  FIX-7  EsoF removed (never deployed on this system).
  FIX-8  obf_universe added as a monitored data service.

Outputs:
  - /mnt/dolphinng5_predict/run_logs/meta_health.json
  - Hazelcast DOLPHIN_META_HEALTH["latest"]
  - stdout/file log

Services monitored (supervisord program names):
  dolphin_data:exf_fetcher   — ExF live indicators → HZ exf_latest
  dolphin_data:acb_processor — ACB boost writer    → HZ acb_boost
  dolphin_data:obf_universe  — L2 universe health  → HZ obf_universe_latest
  dolphin:nautilus_trader    — Execution engine (informational only)
  dolphin:scan_bridge        — Arrow→HZ bridge (informational only)
"""

import json
import logging
import os
import subprocess
import sys
import threading
import time
import urllib.request
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Optional, Tuple

# Optional deps — each guarded so the service degrades gracefully when absent.
try:
    import psutil
    PSUTIL_AVAILABLE = True
except ImportError:
    PSUTIL_AVAILABLE = False

try:
    from hazelcast import HazelcastClient
    HZ_CLIENT_AVAILABLE = True
except ImportError:
    HZ_CLIENT_AVAILABLE = False

try:
    # survival_stack lives in a sibling project dir; extend sys.path to find it.
    sys.path.insert(0, str(Path(__file__).parent.parent / 'nautilus_dolphin'))
    from nautilus_dolphin.nautilus.survival_stack import SurvivalStack
    SURVIVAL_STACK_AVAILABLE = True
except ImportError:
    SURVIVAL_STACK_AVAILABLE = False

from dolphin_exit_handler import install_exit_handler

# Register exit handler immediately at import time so shutdown is tracked even
# if later imports or startup fail.
install_exit_handler("meta_health")

from hz_warmup import hz_warmup, read_lifecycle_state

# ── Configuration ──────────────────────────────────────────────────────────────
PROJECT_ROOT = Path("/mnt/dolphinng5_predict")
LOG_DIR = PROJECT_ROOT / "run_logs"
LOG_FILE = LOG_DIR / "meta_health.log"
STATUS_JSON = LOG_DIR / "meta_health.json"
SUPERVISORD_CONF = PROJECT_ROOT / "prod" / "supervisor" / "dolphin-supervisord.conf"

CHECK_INTERVAL_S = 10.0   # main loop cadence
DATA_STALE_S = 30.0       # warn if HZ key older than this
DATA_DEAD_S = 120.0       # score=0 if HZ key older than this
RECOVERY_COOLDOWN_CRITICAL_S = 10.0   # critical data infra: retry every 10s
RECOVERY_COOLDOWN_DEFAULT_S = 300.0   # non-critical / informational services
# ── Container watchdog (fast-tick background thread) ──────────────────────────
# Probes HTTP health endpoints every 0.5s; triggers docker restart after N
# consecutive failures. Runs independently of the 10s main loop.
CONTAINER_WATCHDOGS = [
    # NOTE: dolphin-hazelcast REMOVED from watchdog (2026-04-07).
    # HZ is RAM-only volatile — restarting it wipes ALL state and triggers
    # cascading failures (Cat1=0 → HIBERNATE → trade halt). Docker autoheal
    # handles container health. MHS must NEVER restart HZ.
    # See project_hz_volatile_state.md.
    {
        "name": "dolphin-prefect",
        "url": "http://127.0.0.1:4200/api/health",
        # Prefect's health endpoint returns the literal JSON body "true".
        "check": lambda body: body.strip() == b"true",
        "threshold": 4,    # 4 × 0.5s = ~2s detection → ~37s worst-case total
        "cooldown": 60,
        "restart_timeout": 45,
    },
    {
        "name": "dolphin-hazelcast-mc",
        "url": "http://127.0.0.1:8080/",
        # Management Center: any non-empty response counts as healthy.
        "check": lambda body: len(body) > 0,
        "threshold": 20,   # 20 × 0.5s = 10s — MC is non-critical
        "cooldown": 120,
        "restart_timeout": 60,
    },
]
CONTAINER_POLL_S = 0.5  # fast-tick interval

# ── Service registry ───────────────────────────────────────────────────────────
# Maps supervisord program name → list of cmdline patterns to detect process.
#   "critical_data": True  → auto-restart via supervisorctl on failure.
#   "critical_data": False → informational only, never auto-restarted.
SERVICES = {
    "dolphin_data:exf_fetcher": {
        "patterns": ["exf_fetcher_flow"],
        "critical_data": True,
    },
    "dolphin_data:acb_processor": {
        "patterns": ["acb_processor_service"],
        "critical_data": True,
    },
    "dolphin_data:obf_universe": {
        "patterns": ["obf_universe_service"],
        "critical_data": True,
    },
    "dolphin:nautilus_trader": {
        "patterns": ["nautilus_event_trader"],
        "critical_data": False,  # may be intentionally stopped
    },
    "dolphin:scan_bridge": {
        "patterns": ["scan_bridge_service"],
        "critical_data": False,  # informational
    },
}

# ── HZ data keys to check freshness ───────────────────────────────────────────
# (map_name, key, ts_field_or_None)
#   ts_field=None → key presence only, no freshness score
HZ_DATA_SOURCES = {
    "exf_latest": ("DOLPHIN_FEATURES", "exf_latest", "_pushed_at"),
    "acb_boost": ("DOLPHIN_FEATURES", "acb_boost", None),
    "latest_eigen_scan": ("DOLPHIN_FEATURES", "latest_eigen_scan", "timestamp"),
    "obf_universe": ("DOLPHIN_FEATURES", "obf_universe_latest", "_snapshot_utc"),
}

# Local ports probed by M4 (control plane). Order matters: the HZ entry is
# weighted more heavily than Prefect in _m4_control_plane.
HZ_PORTS = {"hazelcast": 5701, "prefect_api": 4200}

# ── Sensor weights (sum = 1.0) ─────────────────────────────────────────────────
# FIX-1: weighted sum replaces product. No single sensor can zero-out rm_meta.
SENSOR_WEIGHTS = {
    "m4_control_plane": 0.33,   # HZ must be up — highest weight
    "m1_data_infra": 0.33,      # data pipeline processes must run
    "m3_data_freshness": 0.19,  # HZ data must be fresh
    "m5_coherence": 0.09,       # coherence / sanity checks
    "m6_test_integrity": 0.06,  # continuous test suite gate (NEW)
    # m1_trader and m2_heartbeat are emitted but NOT included in rm_meta
    # because they may be intentionally down during non-trading hours
}

# M6 config
_TEST_RESULTS_PATH = Path("/mnt/dolphinng5_predict/run_logs/test_results_latest.json")
M6_STALE_SECONDS = 900   # 15 min — if last run older than this, penalise
M6_DEAD_SECONDS = 3600   # 60 min — if no run in 1h, treat as 0

# ── Logging ────────────────────────────────────────────────────────────────────
os.makedirs(LOG_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s — %(message)s",
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout),
    ],
)
logger = logging.getLogger("MHSv3")


@dataclass
class HealthReport:
    """One snapshot of overall system health, serialized to JSON / HZ."""
    rm_meta: float
    status: str               # GREEN / DEGRADED / CRITICAL / DEAD
    m4_control_plane: float
    m1_data_infra: float      # critical data services
    m1_trader: float          # informational — not in rm_meta
    m2_heartbeat: float       # informational — not in rm_meta
    m3_data_freshness: float
    m5_coherence: float
    m6_test_integrity: float  # continuous test gate (NEW)
    service_status: dict      # per-supervisord-program state
    hz_key_status: dict       # per-HZ-key freshness
    timestamp: str


class MetaHealthServiceV3:
    """Watchdog-of-watchdogs: scores subsystem health and recovers data infra."""

    def __init__(self):
        self._hz_client: Optional[object] = None
        self._recovery_timestamps: Dict[str, float] = {}  # FIX-6 rate limit
        # Per-container fail streaks and last-restart timestamps for watchdog thread
        self._cw_fail_streaks: Dict[str, int] = {c["name"]: 0 for c in CONTAINER_WATCHDOGS}
        self._cw_last_restart: Dict[str, float] = {c["name"]: 0.0 for c in CONTAINER_WATCHDOGS}
        # SurvivalStack — computes posture and writes to DOLPHIN_SAFETY
        _fast_recovery = os.environ.get('DOLPHIN_FAST_RECOVERY', '').lower() in ('1', 'true', 'yes')
        self._survival_stack = SurvivalStack(fast_recovery=_fast_recovery) if SURVIVAL_STACK_AVAILABLE else None
        if _fast_recovery:
            logger.warning("POSTURE_DEBUG: fast_recovery=True — bounded recovery BYPASSED")
        logger.info("MHSv3 starting. PID=%d psutil=%s hz=%s survival_stack=%s",
                    os.getpid(), PSUTIL_AVAILABLE, HZ_CLIENT_AVAILABLE, SURVIVAL_STACK_AVAILABLE)
        # Start fast-tick container watchdog as daemon thread
        t = threading.Thread(target=self._container_watchdog, daemon=True,
                             name="container-watchdog")
        t.start()
        logger.info("Container watchdog thread started (poll=%.1fs)", CONTAINER_POLL_S)

    # ── Container watchdog (fast-tick background thread) ──────────────────────
    def _container_watchdog(self):
        """Runs forever in a daemon thread. Polls HTTP health endpoints every
        CONTAINER_POLL_S and triggers docker restart after N consecutive
        failures."""
        # Grace period on startup — let containers stabilize before watchdog activates
        # HZ restart takes ~15s and MHS itself needs ~10s to connect. 120s is safe.
        time.sleep(120)
        logger.info("Container watchdog armed after 120s grace period")
        while True:
            # Don't heal anything while HZ is in STARTING state
            try:
                hz = self._get_hz()
                if hz and read_lifecycle_state(hz) == "STARTING":
                    time.sleep(CONTAINER_POLL_S)
                    continue
            except Exception:
                pass
            t0 = time.monotonic()
            for cfg in CONTAINER_WATCHDOGS:
                name = cfg["name"]
                try:
                    with urllib.request.urlopen(cfg["url"], timeout=0.3) as resp:
                        body = resp.read()
                    healthy = cfg["check"](body)
                except Exception:
                    healthy = False
                if healthy:
                    self._cw_fail_streaks[name] = 0
                    # Reset last_restart when healthy: next failure heals without
                    # cooldown wait. Safe because confirmation of health proves
                    # the prior restart completed.
                    self._cw_last_restart[name] = 0.0
                else:
                    self._cw_fail_streaks[name] += 1
                    streak = self._cw_fail_streaks[name]
                    if streak >= cfg["threshold"]:
                        self._docker_heal(name, cfg)
            elapsed = time.monotonic() - t0
            time.sleep(max(0.01, CONTAINER_POLL_S - elapsed))

    def _docker_heal(self, name: str, cfg: dict):
        """Issue docker restart for a container if cooldown allows."""
        now = time.time()
        last = self._cw_last_restart.get(name, 0.0)
        if now - last < cfg["cooldown"]:
            return
        # BUGFIX: capture the streak BEFORE zeroing it — the previous version
        # reset the counter first and therefore always logged "streak=0".
        streak = self._cw_fail_streaks.get(name, 0)
        self._cw_last_restart[name] = now
        self._cw_fail_streaks[name] = 0
        logger.critical("CONTAINER-HEAL: restarting %s (streak=%d)", name, streak)
        try:
            subprocess.run(
                ["docker", "restart", name],
                timeout=cfg["restart_timeout"],
                check=True,
                capture_output=True,
            )
            logger.info("CONTAINER-HEAL: %s restarted successfully", name)
        except Exception as e:
            logger.error("CONTAINER-HEAL: docker restart %s failed: %s", name, e)

    # ── HZ connection ─────────────────────────────────────────────────────────
    def _get_hz(self):
        """Return a live HazelcastClient, reconnecting lazily; None on failure."""
        if not HZ_CLIENT_AVAILABLE:
            return None
        if self._hz_client:
            try:
                if self._hz_client.lifecycle_service.is_running():
                    return self._hz_client
            except Exception:
                pass
        try:
            self._hz_client = HazelcastClient(
                cluster_name="dolphin",
                cluster_members=["127.0.0.1:5701"],
                connection_timeout=2.0,
            )
            # Warm-up from CH if HZ maps are empty (fresh restart)
            try:
                lifecycle = read_lifecycle_state(self._hz_client)
                if lifecycle in ("UNKNOWN", "STARTING"):
                    logger.info("HZ lifecycle=%s — running CH warm-up", lifecycle)
                    hz_warmup(self._hz_client)
            except Exception as e:
                logger.warning("HZ warm-up failed (non-fatal): %s", e)
            return self._hz_client
        except Exception as e:
            logger.debug("HZ connect failed: %s", e)
            self._hz_client = None
            return None

    # ── M4: Control Plane ─────────────────────────────────────────────────────
    def _m4_control_plane(self) -> float:
        """Check HZ and Prefect ports are listening."""
        import socket
        scores = []
        for name, port in HZ_PORTS.items():
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                    s.settimeout(1.0)
                    up = s.connect_ex(("127.0.0.1", port)) == 0
            except Exception:
                up = False
            scores.append(1.0 if up else 0.0)
            if not up:
                logger.debug("M4: %s port %d DOWN", name, port)
        # HZ (index 0) is more important than Prefect (index 1)
        hz_score = scores[0]
        prefect_score = scores[1] if len(scores) > 1 else 0.0
        return hz_score * 0.8 + prefect_score * 0.2

    # ── M1: Process / supervisord integrity ───────────────────────────────────
    def _check_supervisord_status(self) -> Dict[str, str]:
        """Ask supervisorctl for the status of all managed programs.

        Returns dict: program_name → 'RUNNING' | 'STOPPED' | 'UNKNOWN'
        """
        try:
            result = subprocess.run(
                ["supervisorctl", "-c", str(SUPERVISORD_CONF), "status"],
                capture_output=True, text=True, timeout=5,
            )
            statuses = {}
            for line in result.stdout.splitlines():
                parts = line.split()
                if len(parts) >= 2:
                    statuses[parts[0]] = parts[1]
            return statuses
        except Exception as e:
            logger.debug("supervisorctl status failed: %s", e)
            return {}

    def _m1_process_integrity(self) -> Tuple[float, float, dict]:
        """
        Returns (m1_data_infra, m1_trader, per_service_status_dict).
        Uses supervisorctl status first; falls back to psutil cmdline scan.
        """
        sv_status = self._check_supervisord_status()
        service_results = {}
        for prog, cfg in SERVICES.items():
            # Try supervisorctl result first
            if prog in sv_status:
                running = sv_status[prog] == "RUNNING"
            elif PSUTIL_AVAILABLE:
                # Fallback: cmdline scan
                running = False
                for proc in psutil.process_iter(["name", "cmdline"]):
                    try:
                        cmdline = " ".join(proc.info["cmdline"] or [])
                        if any(p in cmdline for p in cfg["patterns"]):
                            running = True
                            break
                    except (psutil.NoSuchProcess, psutil.AccessDenied):
                        continue
            else:
                running = True  # Cannot check, assume OK
            service_results[prog] = "RUNNING" if running else "STOPPED"
            if not running:
                logger.warning("M1: %s is STOPPED", prog)
        # Score data infra (critical_data=True) separately from trader services
        data_progs = [p for p, c in SERVICES.items() if c["critical_data"]]
        trader_progs = [p for p, c in SERVICES.items() if not c["critical_data"]]

        def score(progs):
            if not progs:
                return 1.0
            n_ok = sum(1 for p in progs if service_results.get(p) == "RUNNING")
            return n_ok / len(progs)

        return score(data_progs), score(trader_progs), service_results

    # ── M2: Heartbeat freshness ───────────────────────────────────────────────
    def _m2_heartbeat_freshness(self) -> float:
        """Check for a live nautilus heartbeat in HZ (informational)."""
        hz = self._get_hz()
        if not hz:
            return 0.0
        try:
            hb_map = hz.get_map("DOLPHIN_HEARTBEAT").blocking()
            raw = hb_map.get("nautilus_flow_heartbeat")
            if not raw:
                return 0.5  # HZ up but no heartbeat — trader may be stopped
            data = json.loads(raw) if isinstance(raw, str) else raw
            age = time.time() - data.get("ts", 0)
            if age > 60:
                return 0.0
            if age > 30:
                return 0.5
            return 1.0
        except Exception:
            return 0.5

    # ── M3: Data freshness ────────────────────────────────────────────────────
    def _m3_data_freshness(self) -> Tuple[float, dict]:
        """Check all critical HZ keys are present and fresh."""
        hz = self._get_hz()
        if not hz:
            return 0.0, {}
        results = {}
        scores = []
        try:
            features = hz.get_map("DOLPHIN_FEATURES").blocking()
        except Exception as e:
            logger.debug("M3: DOLPHIN_FEATURES map error: %s", e)
            return 0.0, {}
        for name, (map_name, key, ts_field) in HZ_DATA_SOURCES.items():
            try:
                if map_name == "DOLPHIN_FEATURES":
                    raw = features.get(key)
                else:
                    raw = hz.get_map(map_name).blocking().get(key)
                if raw is None:
                    results[name] = {"status": "missing", "score": 0.0}
                    scores.append(0.0)
                    logger.warning("M3: %s missing from HZ", name)
                    continue
                if ts_field is None:
                    # Presence-only check
                    results[name] = {"status": "present", "score": 1.0}
                    scores.append(1.0)
                    continue
                data = json.loads(raw) if isinstance(raw, str) else raw
                ts_val = data.get(ts_field) if isinstance(data, dict) else None
                if ts_val is None:
                    results[name] = {"status": "no_ts", "score": 0.7}
                    scores.append(0.7)
                    continue
                # Parse timestamp — either epoch seconds or ISO-8601 string
                if isinstance(ts_val, (int, float)):
                    ts = float(ts_val)
                else:
                    ts = datetime.fromisoformat(
                        str(ts_val).replace("Z", "+00:00")
                    ).timestamp()
                age = time.time() - ts
                if age > DATA_DEAD_S:
                    score = 0.0
                    status = f"dead ({age:.0f}s)"
                    logger.warning("M3: %s DEAD — %s ago", name, f"{age:.0f}s")
                elif age > DATA_STALE_S:
                    score = 0.5
                    status = f"stale ({age:.0f}s)"
                else:
                    score = 1.0
                    status = f"fresh ({age:.1f}s)"
                results[name] = {"status": status, "age_s": round(age, 1), "score": score}
                scores.append(score)
            except Exception as e:
                logger.debug("M3 error for %s: %s", name, e)
                results[name] = {"status": "error", "score": 0.0}
                scores.append(0.0)
        avg = sum(scores) / len(scores) if scores else 0.0
        return avg, results

    # ── M5: Coherence ─────────────────────────────────────────────────────────
    def _m5_coherence(self) -> float:
        """Sanity checks on HZ data integrity."""
        hz = self._get_hz()
        if not hz:
            return 0.0
        checks = []
        try:
            features = hz.get_map("DOLPHIN_FEATURES").blocking()
            # exf_latest: _acb_ready must be True and ok_count > 20
            exf_raw = features.get("exf_latest")
            if exf_raw:
                exf = json.loads(exf_raw)
                acb_ready = exf.get("_acb_ready", False)
                ok_count = exf.get("_ok_count", 0)
                checks.append(1.0 if (acb_ready and ok_count >= 20) else 0.5)
            else:
                checks.append(0.0)
            # acb_boost: boost must be in [1.0, 2.5]
            acb_raw = features.get("acb_boost")
            if acb_raw:
                acb = json.loads(acb_raw)
                boost = acb.get("boost", 0)
                checks.append(1.0 if 1.0 <= boost <= 2.5 else 0.0)
            else:
                checks.append(0.0)
            # obf_universe: must have >= 200 assets
            uni_raw = features.get("obf_universe_latest")
            if uni_raw:
                uni = json.loads(uni_raw)
                n = uni.get("_n_assets", 0)
                checks.append(1.0 if n >= 200 else 0.5)
            else:
                checks.append(0.5)  # optional — not fatal
        except Exception as e:
            logger.debug("M5 error: %s", e)
            return 0.0
        return sum(checks) / len(checks) if checks else 0.0

    # ── M6: Continuous test integrity ─────────────────────────────────────────
    def _m6_test_integrity(self) -> float:
        """
        Reads run_logs/test_results_latest.json and scores the test suite health.

        Scoring:
          - Each category in {data_integrity, finance_fuzz, signal_fill,
            degradation, actor} contributes equally.
          - PASS → 1.0 per category
          - FAIL → 0.0 per category
          - N/A  → 0.8 (not yet automated — not penalised heavily)
          - Missing → 0.5 (unknown state)

        Age penalty:
          - Last run > M6_STALE_SECONDS (15 min): score × 0.7
          - Last run > M6_DEAD_SECONDS (60 min): score = 0.0
        """
        try:
            if not _TEST_RESULTS_PATH.exists():
                return 0.5  # file never written — unknown
            raw = json.loads(_TEST_RESULTS_PATH.read_text())
            # Age check
            run_at_str = raw.get("_run_at")
            if run_at_str:
                try:
                    run_at = datetime.fromisoformat(run_at_str.replace("Z", "+00:00"))
                    age_s = (datetime.now(timezone.utc) - run_at).total_seconds()
                    if age_s > M6_DEAD_SECONDS:
                        logger.warning("M6: test results stale %.0f min > dead threshold",
                                       age_s / 60)
                        return 0.0
                    age_penalty = 0.7 if age_s > M6_STALE_SECONDS else 1.0
                except Exception:
                    age_penalty = 1.0
            else:
                age_penalty = 1.0
            cats = ["data_integrity", "finance_fuzz", "signal_fill", "degradation", "actor"]
            cat_scores = []
            for cat in cats:
                info = raw.get(cat, {})
                status = (info.get("status") or "MISSING") if info else "MISSING"
                if status == "PASS":
                    cat_scores.append(1.0)
                elif status == "FAIL":
                    cat_scores.append(0.0)
                elif status == "N/A":
                    cat_scores.append(0.8)
                else:
                    cat_scores.append(0.5)
            score = sum(cat_scores) / len(cat_scores) * age_penalty
            logger.debug(
                "M6: cats=%s age_penalty=%.2f score=%.3f",
                [f"{c}={s:.1f}" for c, s in zip(cats, cat_scores)],
                age_penalty, score,
            )
            return round(score, 3)
        except Exception as e:
            logger.debug("M6 error: %s", e)
            return 0.5  # unknown — don't penalise if file unreadable

    # ── Rm_meta ───────────────────────────────────────────────────────────────
    def _compute_rm_meta(self, m4, m1_data, m3, m5, m6) -> Tuple[float, str]:
        """
        FIX-1: Weighted sum — no single sensor can zero rm_meta.
        Trader heartbeat (m2) and trader process (m1_trader) are excluded
        because they may be intentionally stopped.
        m6 = continuous test suite integrity gate (6% weight).
        """
        w = SENSOR_WEIGHTS
        tot = sum(w.values())
        rm = (
            w["m4_control_plane"] * m4
            + w["m1_data_infra"] * m1_data
            + w["m3_data_freshness"] * m3
            + w["m5_coherence"] * m5
            + w["m6_test_integrity"] * m6
        ) / tot
        if rm > 0.85:
            status = "GREEN"
        elif rm > 0.6:
            status = "DEGRADED"
        elif rm > 0.3:
            status = "CRITICAL"
        else:
            status = "DEAD"
        return round(rm, 3), status

    # ── Recovery ──────────────────────────────────────────────────────────────
    def _restart_via_supervisorctl(self, program: str):
        """FIX-2: Restart via supervisorctl, never systemctl.

        Runs in a daemon thread so a slow/hung restart never blocks the main
        check loop — other services keep being monitored.
        """
        now = time.time()
        last = self._recovery_timestamps.get(program, 0.0)
        # Choose cooldown: critical data infra retries every 10s, others 300s
        # (BUGFIX: previous comment claimed 60s — the constant is 10.0).
        cfg = SERVICES.get(program, {})
        cooldown = (RECOVERY_COOLDOWN_CRITICAL_S if cfg.get("critical_data")
                    else RECOVERY_COOLDOWN_DEFAULT_S)
        if now - last < cooldown:
            logger.debug("RECOVERY: %s cooldown active (%.0fs remaining)",
                         program, cooldown - (now - last))
            return
        # Mark timestamp NOW (before thread runs) so concurrent calls don't
        # also trigger a restart while the thread is in flight.
        self._recovery_timestamps[program] = now
        logger.warning("RECOVERY: restarting %s via supervisorctl (async)", program)

        def _do_restart():
            try:
                result = subprocess.run(
                    ["supervisorctl", "-c", str(SUPERVISORD_CONF), "restart", program],
                    capture_output=True, text=True, timeout=30,
                )
                logger.info("RECOVERY: %s → %s", program, result.stdout.strip() or "ok")
            except Exception as e:
                logger.error("RECOVERY: failed to restart %s: %s", program, e)

        # threading is imported at module level — no local import needed.
        threading.Thread(target=_do_restart, daemon=True,
                         name=f"recovery-{program.replace(':', '-')}").start()

    def _attempt_recovery(self, report: HealthReport):
        """
        FIX-5: Only auto-restart critical data infra services. Trader services
        are NEVER auto-restarted (may be intentionally stopped).
        FIX-6: Each service has its own restart cooldown (see RECOVERY_COOLDOWN_*).
        """
        # Only recover data infra, and only when CRITICAL or DEAD.
        # (A separate GREEN early-return was redundant — GREEN is already
        # excluded by this check.)
        if report.status not in ("CRITICAL", "DEAD"):
            return
        for prog, state in report.service_status.items():
            cfg = SERVICES.get(prog, {})
            if cfg.get("critical_data") and state == "STOPPED":
                self._restart_via_supervisorctl(prog)

    # ── SurvivalStack (posture control) ───────────────────────────────────────
    def _process_survival_stack(self, rm_meta: float, m1_data: float):
        """Compute posture via SurvivalStack and write to DOLPHIN_SAFETY.

        Integrated from system_watchdog_service.py (MIG3) — no duplicate
        process needed. (NOTE(review): rm_meta and m1_data are currently
        unused here; kept for interface stability.)
        """
        if not self._survival_stack:
            return
        hz = self._get_hz()
        if not hz:
            return
        try:
            hz_nodes = len(hz.cluster_service.get_members())
            # Heartbeat age — read from DOLPHIN_HEARTBEAT
            try:
                hb_raw = hz.get_map("DOLPHIN_HEARTBEAT").blocking().get("nautilus_flow_heartbeat")
                hb_data = json.loads(hb_raw) if hb_raw else {}
                heartbeat_age_s = (datetime.now(timezone.utc).timestamp()
                                   - hb_data.get("ts", 0))
            except Exception:
                heartbeat_age_s = 999.0
            now_ts = datetime.now(timezone.utc).timestamp()
            features_map = hz.get_map("DOLPHIN_FEATURES").blocking()
            # MC-Forewarner — absent = not deployed on this instance, treat as neutral
            try:
                mc_raw = features_map.get("mc_forewarner_latest")
                if mc_raw is None:
                    mc_status, mc_age_h = "GREEN", 0.0
                else:
                    mc_state = json.loads(mc_raw)
                    mc_status = mc_state.get("status", "GREEN")
                    mc_ts = mc_state.get("timestamp")
                    mc_age_h = ((now_ts - datetime.fromisoformat(mc_ts).timestamp()) / 3600.0
                                if mc_ts else 0.0)
            except Exception:
                mc_status, mc_age_h = "GREEN", 0.0
            # OBF (order-book features) — obf_universe IS deployed;
            # absence/staleness = real failure
            try:
                ob_raw = features_map.get("asset_BTCUSDT_ob")
                if ob_raw is None:
                    # obf_universe should be writing — absent means subsystem failed
                    ob_stale, ob_depth, ob_fill = True, 0.5, 0.5
                else:
                    ob_state = json.loads(ob_raw)
                    ob_ts = ob_state.get("timestamp", 0)
                    ob_age_s = now_ts - ob_ts if ob_ts else 999.0
                    if ob_age_s > 60.0:
                        ob_stale, ob_depth, ob_fill = True, 0.5, 0.5
                    else:
                        # Data is live — measure DEPTH (liquidity), NOT balance
                        # (balance is a signal, not a risk). Deep book with skew
                        # is fillable; thin book is risky regardless of balance.
                        bid_top = (ob_state.get("bid_notional") or [0])[0]
                        ask_top = (ob_state.get("ask_notional") or [0])[0]
                        total = bid_top + ask_top
                        # Normalize total top-of-book notional: $200k = 1.0
                        # (well above typical min order size)
                        depth = min(1.0, total / 200_000.0) if total > 0 else 0.0
                        ob_stale = False
                        ob_depth = max(0.3, depth)
                        ob_fill = ob_depth
                        logger.debug("OBF live: age=%.1fs total_notional=%.0f depth=%.3f",
                                     ob_age_s, total, ob_depth)
            except Exception as ob_exc:
                logger.debug("OBF parse error: %s", ob_exc)
                ob_stale, ob_depth, ob_fill = True, 0.5, 0.5
            # ExYF (external factors / dvol) — exf_fetcher IS deployed;
            # absence/staleness = real failure.
            # Cat4 models dvol spike risk: spike → 0.3, no spike (t=999) → ≈1.0
            DVOL_SPIKE_THRESHOLD = 70.0  # BTC DVOL > 70 = elevated vol regime
            EXF_STALE_S = 120.0          # exf_latest older than 2 min = stale
            DVOL_STALE_S = 300.0         # dvol_btc staleness > 5 min = unknown
            try:
                exf_raw = features_map.get("exf_latest")
                if exf_raw is None:
                    dvol_spike, t_since_spike_min = True, 0.0
                else:
                    exf = json.loads(exf_raw)
                    pushed_at = exf.get("_pushed_at")
                    exf_age_s = (now_ts - datetime.fromisoformat(pushed_at).timestamp()
                                 if pushed_at else 999.0)
                    if exf_age_s > EXF_STALE_S:
                        # ExF feed stale — treat as unknown risk
                        dvol_spike, t_since_spike_min = True, 0.0
                    else:
                        dvol_btc = exf.get("dvol_btc")
                        dvol_staleness = (exf.get("_staleness_s") or {}).get("dvol_btc", 999.0)
                        if dvol_btc is None or dvol_staleness > DVOL_STALE_S:
                            # dvol specifically not fresh
                            dvol_spike, t_since_spike_min = True, 0.0
                        else:
                            dvol_spike = float(dvol_btc) > DVOL_SPIKE_THRESHOLD
                            t_since_spike_min = 0.0 if dvol_spike else 999.0
            except Exception:
                dvol_spike, t_since_spike_min = True, 0.0
            # Drawdown from engine snapshot
            try:
                snap_raw = hz.get_map("DOLPHIN_STATE_BLUE").blocking().get("engine_snapshot")
                snap = json.loads(snap_raw) if snap_raw else {}
                drawdown = snap.get("drawdown", 0.0)
            except Exception:
                drawdown = 0.0
            rm, breakdown = self._survival_stack.compute_rm(
                hz_nodes=hz_nodes,
                heartbeat_age_s=heartbeat_age_s,
                mc_status=mc_status,
                mc_staleness_hours=mc_age_h,
                ob_depth_quality=ob_depth,
                ob_fill_prob=ob_fill,
                ob_stale=ob_stale,
                dvol_spike=dvol_spike,
                t_since_spike_min=t_since_spike_min,
                drawdown=drawdown,
            )
            self._survival_stack.update_posture(rm)
            self._survival_stack.write_to_hz(rm, breakdown, hz)
            # Emit a posture-change event to ClickHouse (best-effort)
            try:
                from ch_writer import ch_put, ts_us as _ts
                curr_posture = (getattr(self._survival_stack, 'posture', None)
                                or getattr(self._survival_stack, 'current_posture', None)
                                or '')
                if curr_posture:
                    prev = getattr(self, '_ch_prev_posture', '')
                    if curr_posture != prev:
                        ch_put("posture_events", {
                            "ts": _ts(),
                            "posture": curr_posture,
                            "rm": float(rm),
                            "prev_posture": prev,
                            "trigger": str(breakdown)[:200],
                            "scan_uuid": "",
                        })
                    self._ch_prev_posture = curr_posture
            except Exception:
                pass
        except Exception as e:
            logger.debug("SurvivalStack processing error: %s", e)

    # ── Emit ──────────────────────────────────────────────────────────────────
    def _emit(self, report: HealthReport):
        """Write the report to local JSON, HZ, ClickHouse (best-effort) and log."""
        d = asdict(report)
        # Local JSON
        try:
            STATUS_JSON.write_text(json.dumps(d, indent=2))
        except Exception as e:
            logger.error("Failed to write status JSON: %s", e)
        # HZ push
        hz = self._get_hz()
        if hz:
            try:
                hz.get_map("DOLPHIN_META_HEALTH").blocking().put("latest", json.dumps(d))
            except Exception:
                pass
        try:
            from ch_writer import ch_put, ts_us as _ts
            ch_put("meta_health", {
                "ts": _ts(),
                "status": report.status,
                "rm_meta": report.rm_meta,
                "m1_data_infra": report.m1_data_infra,
                "m1_trader": report.m1_trader,
                "m2_heartbeat": report.m2_heartbeat,
                "m3_data_freshness": report.m3_data_freshness,
                "m4_control_plane": report.m4_control_plane,
                "m5_coherence": report.m5_coherence,
                "m6_test_integrity": report.m6_test_integrity,
            })
        except Exception:
            pass
        logger.info(
            "RM_META=%.3f [%s] M4=%.2f M1_data=%.2f M3=%.2f M5=%.2f M6=%.2f "
            "M1_trader=%.2f M2_hb=%.2f",
            report.rm_meta, report.status, report.m4_control_plane,
            report.m1_data_infra, report.m3_data_freshness, report.m5_coherence,
            report.m6_test_integrity, report.m1_trader, report.m2_heartbeat,
        )

    # ── Main loop ─────────────────────────────────────────────────────────────
    def run(self):
        """Main check loop: score → emit → recover → posture, every CHECK_INTERVAL_S."""
        logger.info("MHSv3 running (interval=%.0fs)", CHECK_INTERVAL_S)
        while True:
            t0 = time.monotonic()
            try:
                m4 = self._m4_control_plane()
                m1_data, m1_trader, svc_s = self._m1_process_integrity()
                m2 = self._m2_heartbeat_freshness()
                m3, hz_keys = self._m3_data_freshness()
                m5 = self._m5_coherence()
                m6 = self._m6_test_integrity()
                rm, status = self._compute_rm_meta(m4, m1_data, m3, m5, m6)
                report = HealthReport(
                    rm_meta=rm,
                    status=status,
                    m4_control_plane=round(m4, 3),
                    m1_data_infra=round(m1_data, 3),
                    m1_trader=round(m1_trader, 3),
                    m2_heartbeat=round(m2, 3),
                    m3_data_freshness=round(m3, 3),
                    m5_coherence=round(m5, 3),
                    m6_test_integrity=round(m6, 3),
                    service_status=svc_s,
                    hz_key_status=hz_keys,
                    timestamp=datetime.now(timezone.utc).isoformat(),
                )
                self._emit(report)
                self._attempt_recovery(report)
                self._process_survival_stack(rm, m1_data)
            except Exception as e:
                logger.error("Main loop error: %s", e)
            elapsed = time.monotonic() - t0
            sleep_s = max(1.0, CHECK_INTERVAL_S - elapsed)
            time.sleep(sleep_s)


if __name__ == "__main__":
    import signal
    svc = MetaHealthServiceV3()

    def _sig(signum, frame):
        logger.info("MHSv3 received signal %d — shutting down", signum)
        sys.exit(0)

    signal.signal(signal.SIGTERM, _sig)
    try:
        svc.run()
    except KeyboardInterrupt:
        logger.info("MHSv3 stopped by user")
        sys.exit(0)