Files
DOLPHIN/prod/exf_integrity_monitor.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

423 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
EXF INTEGRITY MONITOR v1.0
==========================
Continuous monitoring of ExF data integrity across:
- Hazelcast cache (hot path)
- Disk persistence (durability)
- Source APIs (freshness)
Alerts on:
- Data divergence between HZ and disk
- Stale indicators (> threshold)
- Missing critical indicators (ACB keys)
- Fetch failures
Integration:
- Runs as background thread in exf_fetcher_flow
- Logs to structured JSON for monitoring
- Can trigger alerts (future: webhook/PagerDuty)
Author: DOLPHIN ExF System
Date: 2026-03-17
"""
import json
import logging
import threading
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Any, Optional, Callable, List
from dataclasses import dataclass, field
from collections import deque, defaultdict
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------
# Alerting / scheduling thresholds
# ---------------------------------------------------------------------
STALENESS_ALERT_S = 120  # 2 minutes
CRITICAL_MISSING_ALERT = True
INTEGRITY_CHECK_INTERVAL_S = 60  # Check every minute

# ACB-critical indicators (must be fresh for alpha engine)
ACB_CRITICAL_INDICATORS = [
    'funding_btc',
    'funding_eth',
    'dvol_btc',
    'dvol_eth',
    'fng',
    'vix',
    'ls_btc',
    'taker',
    'oi_btc',
]

# All indicators we expect (from realtime_exf_service)
EXPECTED_INDICATORS = [
    # Binance
    'funding_btc',
    'funding_eth',
    'oi_btc',
    'oi_eth',
    'ls_btc',
    'ls_eth',
    'ls_top',
    'taker',
    'basis',
    'imbal_btc',
    'imbal_eth',
    'spread',
    # Deribit
    'dvol_btc',
    'dvol_eth',
    'fund_dbt_btc',
    'fund_dbt_eth',
    # Macro
    'vix',
    'dxy',
    'us10y',
    'sp500',
    'fedfunds',
    # Sentiment
    'fng',
    # On-chain
    'hashrate',
    # DeFi
    'tvl',
    # Liquidations
    'liq_vol_24h',
    'liq_long_ratio',
    'liq_z_score',
    'liq_percentile',
]
@dataclass
class Alert:
    """Single alert entry.

    Append-only record stored in the monitor's bounded alert history;
    instances are never mutated after creation.
    """
    timestamp: str            # ISO-8601 UTC time the alert was raised
    severity: str             # info, warning, critical
    category: str             # staleness, missing, integrity, fetch_fail
    indicator: Optional[str]  # affected indicator name, or None for system-wide alerts
    message: str              # human-readable alert description
    details: Dict[str, Any] = field(default_factory=dict)  # extra structured context for logging/alerting
@dataclass
class HealthStatus:
    """Overall health status.

    Snapshot produced by one integrity check; kept in a bounded history
    and exposed via ``get_health_status()``.
    """
    timestamp: str            # ISO-8601 UTC time of the check
    overall: str              # healthy, degraded, critical
    hz_connected: bool        # Hazelcast fetch succeeded this check
    persist_connected: bool   # persistence service returned stats this check
    indicators_present: int   # count of numeric, non-meta indicators seen
    indicators_expected: int  # size of EXPECTED_INDICATORS
    acb_ready: bool           # source's _acb_ready flag AND no missing critical indicators
    stale_count: int          # indicators older than the staleness threshold
    alerts_active: int        # alerts raised during this check
    details: Dict[str, Any] = field(default_factory=dict)  # missing_critical / stale_indicators / max_staleness
class ExFIntegrityMonitor:
    """
    Continuous integrity monitoring for the ExF system.

    Periodically cross-checks three data surfaces:
      - Hazelcast cache (hot path) via ``hz_client``
      - Disk persistence via ``persistence_service.get_stats()``
      - The live indicator feed via ``indicator_source()``
    and records structured ``Alert`` entries plus a rolling ``HealthStatus``.

    Usage:
        monitor = ExFIntegrityMonitor(hz_client, persistence_svc)
        monitor.start()
        # Get current status
        status = monitor.get_health_status()
        monitor.stop()
    """

    def __init__(
        self,
        hz_client=None,
        persistence_service=None,
        indicator_source: Optional[Callable[[], Dict[str, Any]]] = None,
        check_interval_s: float = INTEGRITY_CHECK_INTERVAL_S
    ):
        """
        Args:
            hz_client: Hazelcast client exposing ``get_map``. None disables
                the HZ check (``hz_connected`` will report False).
            persistence_service: Object exposing ``get_stats()``. None
                disables the persistence/divergence checks.
            indicator_source: Zero-arg callback returning the current
                indicator dict; preferred over HZ data when provided.
            check_interval_s: Seconds between automatic checks.
        """
        self.hz_client = hz_client
        self.persistence = persistence_service
        self.indicator_source = indicator_source  # Callback to get current indicators
        self.check_interval_s = check_interval_s
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # State (all guarded by _status_lock)
        self._alerts: deque = deque(maxlen=1000)         # bounded alert history
        self._health_history: deque = deque(maxlen=100)  # bounded status history
        self._current_status: Optional[HealthStatus] = None
        self._status_lock = threading.Lock()
        # Counters (guarded by _status_lock)
        self._checks_performed = 0
        self._alerts_triggered = 0

    # ----- Public API -----
    def start(self) -> None:
        """Start the monitoring loop in a daemon thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._thread.start()
        logger.info(f"ExFIntegrityMonitor started (interval={self.check_interval_s}s)")

    def stop(self) -> None:
        """Stop the monitoring loop and wait (up to 5s) for the thread to exit."""
        if not self._running:
            return
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("ExFIntegrityMonitor stopped")

    def get_health_status(self) -> Optional[HealthStatus]:
        """Return the most recent health status (None before the first check)."""
        with self._status_lock:
            return self._current_status

    def get_recent_alerts(self, severity: Optional[str] = None, n: int = 10) -> List[Alert]:
        """Return the last ``n`` alerts, optionally filtered by severity."""
        # Snapshot under the lock, then filter outside it to keep the
        # critical section short.
        with self._status_lock:
            alerts = list(self._alerts)
        if severity:
            alerts = [a for a in alerts if a.severity == severity]
        return alerts[-n:]

    def get_stats(self) -> Dict[str, Any]:
        """Return monitoring counters for observability dashboards."""
        with self._status_lock:
            return {
                'checks_performed': self._checks_performed,
                'alerts_triggered': self._alerts_triggered,
                'alerts_active': len(self._alerts),
                'health_history_entries': len(self._health_history),
            }

    def force_check(self) -> HealthStatus:
        """Force an immediate integrity check (works even if the loop is stopped)."""
        return self._perform_check()

    # ----- Internal -----
    def _add_alert(self, severity: str, category: str, message: str,
                   indicator: Optional[str] = None,
                   details: Optional[Dict[str, Any]] = None) -> None:
        """Record an alert and log it at a level matching its severity."""
        alert = Alert(
            timestamp=datetime.now(timezone.utc).isoformat(),
            severity=severity,
            category=category,
            indicator=indicator,
            message=message,
            details=details or {}
        )
        with self._status_lock:
            self._alerts.append(alert)
            self._alerts_triggered += 1
        # Log based on severity
        log_msg = f"[{severity.upper()}] {category}: {message}"
        if severity == 'critical':
            logger.error(log_msg)
        elif severity == 'warning':
            logger.warning(log_msg)
        else:
            logger.info(log_msg)

    def _get_hz_data(self) -> Optional[Dict[str, Any]]:
        """Fetch the latest ExF payload from Hazelcast.

        Returns the decoded dict, or None when the client is absent or the
        fetch fails. NOTE(review): ``.result()`` implies an async Hazelcast
        client whose map operations return futures — confirm against the
        caller's client configuration.
        """
        if not self.hz_client:
            return None
        try:
            features_map = self.hz_client.get_map("DOLPHIN_FEATURES")
            data = features_map.get("exf_latest").result()
            if isinstance(data, str):
                # Payload may be stored as a JSON string rather than a map.
                return json.loads(data)
            return data
        except Exception as e:
            # Best-effort read; connectivity is alerted on by _perform_check.
            logger.debug(f"HZ fetch error: {e}")
            return None

    def _perform_check(self) -> HealthStatus:
        """Perform a single integrity check.

        Runs seven checks (HZ connectivity, persistence connectivity,
        indicator presence, ACB-critical indicators, staleness, HZ/disk
        divergence, ACB readiness), updates the shared status/history,
        and returns the new HealthStatus.
        """
        now = datetime.now(timezone.utc)
        now_str = now.isoformat()
        # Collect data from all sources
        hz_data = self._get_hz_data()
        source_data = self.indicator_source() if self.indicator_source else None
        persist_stats = self.persistence.get_stats() if self.persistence else None
        # Initialize status components
        alerts_this_check = []
        stale_indicators = []
        missing_critical = []
        # Determine data source (prefer source, fallback to HZ)
        current_data = source_data or hz_data or {}

        # 1. Check Hazelcast connectivity
        hz_connected = hz_data is not None
        if not hz_connected:
            self._add_alert('critical', 'connectivity',
                            'Hazelcast connection failed', details={'source': 'hz'})
            alerts_this_check.append('hz_connectivity')

        # 2. Check persistence connectivity (only alert if a service was configured)
        persist_connected = persist_stats is not None
        if not persist_connected and self.persistence:
            self._add_alert('warning', 'connectivity',
                            'Persistence service not available', details={'source': 'persist'})
            alerts_this_check.append('persist_connectivity')

        # 3. Check indicator presence (numeric, non-meta keys only)
        present_indicators = set(k for k in current_data.keys()
                                 if not k.startswith('_') and isinstance(current_data[k], (int, float)))
        expected_set = set(EXPECTED_INDICATORS)
        missing_indicators = expected_set - present_indicators
        if missing_indicators:
            for ind in list(missing_indicators)[:5]:  # Log first 5 to avoid alert spam
                self._add_alert('warning', 'missing',
                                'Indicator not present', indicator=ind)
            alerts_this_check.append(f'missing_{len(missing_indicators)}')

        # 4. Check critical indicators (ACB) — missing or NaN both count as failed
        for crit in ACB_CRITICAL_INDICATORS:
            # x != x is True only when x is NaN
            if crit not in present_indicators or current_data.get(crit) != current_data.get(crit):
                missing_critical.append(crit)
                self._add_alert('critical', 'missing_critical',
                                'ACB-critical indicator missing/failed', indicator=crit)
                alerts_this_check.append(f'critical_{crit}')

        # 5. Check staleness (per-indicator ages published under '_staleness_s')
        staleness = current_data.get('_staleness_s', {})
        if isinstance(staleness, dict):
            for ind, age_s in staleness.items():
                if isinstance(age_s, (int, float)) and age_s > STALENESS_ALERT_S:
                    stale_indicators.append((ind, age_s))
                    self._add_alert('warning', 'staleness',
                                    f'Indicator stale ({age_s:.0f}s)', indicator=ind,
                                    details={'staleness_seconds': age_s})
                    alerts_this_check.append(f'stale_{ind}')

        # 6. Check HZ vs persistence divergence (if both available)
        if hz_data and persist_stats and persist_stats.get('last_write_path'):
            try:
                import numpy as np  # local import: numpy only needed for this check
                last_path = Path(persist_stats['last_write_path'])
                if last_path.exists():
                    with np.load(last_path, allow_pickle=True) as npz:
                        persisted_keys = set(k for k in npz.keys() if not k.startswith('_'))
                    hz_keys = set(k for k in hz_data.keys() if not k.startswith('_'))
                    divergence = persisted_keys.symmetric_difference(hz_keys)
                    if len(divergence) > 3:  # Allow small differences
                        self._add_alert('warning', 'integrity',
                                        f'HZ/disk divergence: {len(divergence)} indicators',
                                        details={'hz_only': list(hz_keys - persisted_keys)[:3],
                                                 'disk_only': list(persisted_keys - hz_keys)[:3]})
                        alerts_this_check.append('divergence')
            except Exception as e:
                logger.debug(f"Divergence check error: {e}")

        # 7. Check ACB readiness flag published by the fetcher flow
        acb_ready = current_data.get('_acb_ready', False)
        if not acb_ready and len(missing_critical) == 0:
            # ACB not ready but we have all critical indicators - investigate
            pass  # This is handled by the fetcher flow

        # Determine overall health
        if len(missing_critical) > 0 or not hz_connected:
            overall = 'critical'
        elif len(stale_indicators) > 3 or len(missing_indicators) > 10:
            overall = 'degraded'
        else:
            overall = 'healthy'

        # Build status
        status = HealthStatus(
            timestamp=now_str,
            overall=overall,
            hz_connected=hz_connected,
            persist_connected=persist_connected,
            indicators_present=len(present_indicators),
            indicators_expected=len(expected_set),
            acb_ready=acb_ready and len(missing_critical) == 0,
            stale_count=len(stale_indicators),
            alerts_active=len(alerts_this_check),
            details={
                'missing_critical': missing_critical,
                'stale_indicators': [i[0] for i in stale_indicators],
                'max_staleness': max((i[1] for i in stale_indicators), default=0),
            }
        )
        with self._status_lock:
            self._current_status = status
            self._health_history.append(status)
            self._checks_performed += 1

        # Structured log: warn when unhealthy, debug otherwise
        if overall != 'healthy':
            logger.warning(f"Health check: {overall} | "
                           f"indicators={len(present_indicators)}/{len(expected_set)} | "
                           f"acb_ready={status.acb_ready} | "
                           f"alerts={len(alerts_this_check)}")
        else:
            logger.debug(f"Health check: {overall} | "
                         f"indicators={len(present_indicators)}/{len(expected_set)}")
        return status

    def _monitor_loop(self) -> None:
        """Main monitoring loop; runs until the stop event is set."""
        logger.info("Integrity monitor loop started")
        while not self._stop_event.is_set():
            try:
                self._perform_check()
            except Exception as e:
                # Keep the loop alive; logger.exception captures the traceback.
                logger.exception(f"Monitor check error: {e}")
            self._stop_event.wait(timeout=self.check_interval_s)
        logger.info("Integrity monitor loop stopped")
# =====================================================================
# STANDALONE TEST
# =====================================================================
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    def fake_feed():
        """Canned indicator snapshot standing in for the live fetcher."""
        return {
            'funding_btc': 0.0001,
            'funding_eth': -0.0002,
            'basis': 0.01,
            'vix': 20.0,
            '_staleness_s': {'funding_btc': 10.0, 'basis': 150.0},  # basis is stale
            '_acb_ready': True,
        }

    monitor = ExFIntegrityMonitor(indicator_source=fake_feed)
    monitor.start()

    # Let the background loop perform at least one check.
    print("Monitor running for 5 seconds...")
    time.sleep(5)

    # Dump the latest health snapshot, if one was produced.
    current = monitor.get_health_status()
    if current:
        print(f"\nHealth Status:")
        print(f" Overall: {current.overall}")
        print(f" Indicators: {current.indicators_present}/{current.indicators_expected}")
        print(f" ACB Ready: {current.acb_ready}")
        print(f" Stale count: {current.stale_count}")
        print(f" Details: {current.details}")

    recent = monitor.get_recent_alerts(n=5)
    print(f"\nRecent Alerts ({len(recent)}):")
    for entry in recent:
        print(f" [{entry.severity}] {entry.category}: {entry.message}")

    monitor.stop()
    print("\nTest complete")