Files
DOLPHIN/prod/exf_persistence.py

452 lines
16 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
EXF PERSISTENCE SERVICE v1.0
============================
Off-hot-path persistence for External Factors indicators.
Design:
- Background thread, non-blocking to hot path
- Writes ALL fetched indicators to NPZ (DOLPHIN-compliant format)
- Separate from Hazelcast push (HZ = instant, Disk = durability)
- Integrity checksums for validation
- Automatic cleanup of old files
DOLPHIN Format:
/mnt/ng6_data/eigenvalues/{YYYY-MM-DD}/extf_snapshot_{timestamp}__Indicators.npz
Author: DOLPHIN ExF System
Date: 2026-03-17
"""
import json
import logging
import threading
import time
import hashlib
import numpy as np
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, field
from collections import deque
logger = logging.getLogger(__name__)
# Configuration
DATA_DIR = Path("/mnt/ng6_data/eigenvalues")
FLUSH_INTERVAL_S = 300 # 5 minutes (off hot path)
MAX_FILE_AGE_DAYS = 7 # Keep 7 days of history
REQUIRED_INDICATORS_MIN = 20 # Minimum indicators for data sufficiency
@dataclass
class PersistenceStats:
    """Statistics for monitoring."""
    files_written: int = 0    # count of successful NPZ snapshot writes
    files_failed: int = 0     # count of failed writes / flush-loop errors
    bytes_written: int = 0    # cumulative bytes of all written NPZ files
    last_write_time: float = 0.0  # epoch seconds (time.time()) of last successful write
    last_write_path: Optional[Path] = None  # path of the most recently written NPZ
    integrity_errors: int = 0  # integrity failure counter; not incremented by code visible in this file
    history: deque = field(default_factory=lambda: deque(maxlen=100))  # bounded ring of recent write records
class ExFPersistenceService:
    """
    Off-hot-path persistence service for External Factors.

    A daemon thread periodically writes the most recent indicator snapshot
    to a compressed NPZ file in the DOLPHIN layout, so the hot path only
    ever swaps an in-memory reference.

    Usage:
        svc = ExFPersistenceService()
        svc.start()
        # In hot path (exf_fetcher_flow):
        svc.update_snapshot(indicators_dict)
        svc.stop()
    """

    def __init__(self, data_dir: Path = DATA_DIR, flush_interval_s: float = FLUSH_INTERVAL_S):
        """
        Args:
            data_dir: Root directory holding dated snapshot subdirectories.
            flush_interval_s: Seconds between background flushes to disk.
        """
        self.data_dir = Path(data_dir)
        self.flush_interval_s = flush_interval_s
        self._stats = PersistenceStats()
        self._latest_snapshot: Optional[Dict[str, Any]] = None
        self._snapshot_lock = threading.Lock()
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        # Ensure data directory exists
        self.data_dir.mkdir(parents=True, exist_ok=True)

    # ----- Public API (thread-safe, non-blocking) -----

    def update_snapshot(self, indicators: Dict[str, Any]) -> None:
        """
        Update the latest snapshot (called from hot path).

        Non-blocking: only replaces the in-memory reference under a lock.
        NOTE: dict() is a *shallow* copy - nested values stay shared with
        the caller, so nested structures must not be mutated in place.
        (Original comment claimed "deep copy"; the code never did one.)
        """
        with self._snapshot_lock:
            # Shallow copy decouples the top-level dict from the caller
            self._latest_snapshot = dict(indicators)

    def get_stats(self) -> Dict[str, Any]:
        """Get persistence statistics as a JSON-friendly dict."""
        # _stats fields are mutated by the flush thread without this lock;
        # single-field reads are safe under CPython's GIL, and the lock
        # guards the snapshot reference used below.
        with self._snapshot_lock:
            return {
                'files_written': self._stats.files_written,
                'files_failed': self._stats.files_failed,
                'bytes_written': self._stats.bytes_written,
                'last_write_time': self._stats.last_write_time,
                'last_write_path': str(self._stats.last_write_path) if self._stats.last_write_path else None,
                'integrity_errors': self._stats.integrity_errors,
                'has_snapshot': self._latest_snapshot is not None,
                'snapshot_keys': len(self._latest_snapshot) if self._latest_snapshot else 0,
            }

    def check_data_sufficiency(self) -> Dict[str, Any]:
        """
        Check if we have sufficient data for ACB and alpha engine.

        Returns:
            Dict with a 'sufficient' flag and a weighted 'score'
            (ACB-critical coverage 50%, overall coverage 30%, freshness 20%),
            plus per-component details.
        """
        with self._snapshot_lock:
            if not self._latest_snapshot:
                return {'sufficient': False, 'score': 0.0, 'reason': 'no_snapshot'}
            snapshot = self._latest_snapshot
            # ACB-critical indicators (must have fresh data)
            acb_critical = [
                'funding_btc', 'funding_eth', 'dvol_btc', 'dvol_eth',
                'fng', 'vix', 'ls_btc', 'taker', 'oi_btc'
            ]
            # Count available and fresh indicators
            acb_available = sum(1 for k in acb_critical if k in snapshot)
            total_available = sum(1 for k in snapshot.keys() if not k.startswith('_'))
            # Check staleness (per-indicator age in seconds, if provided)
            staleness = snapshot.get('_staleness_s', {})
            stale_count = sum(1 for k, v in staleness.items()
                              if isinstance(v, (int, float)) and v > 60)  # >60s = stale
            # Calculate sufficiency score
            acb_score = acb_available / len(acb_critical)
            coverage_score = min(1.0, total_available / REQUIRED_INDICATORS_MIN)
            freshness_score = 1.0 - (stale_count / max(1, len(staleness)))
            overall_score = (acb_score * 0.5) + (coverage_score * 0.3) + (freshness_score * 0.2)
            return {
                'sufficient': overall_score > 0.7,
                'score': round(overall_score, 3),
                'acb_critical': f"{acb_available}/{len(acb_critical)}",
                'total_indicators': total_available,
                'stale_indicators': stale_count,
                'freshness': round(freshness_score, 3),
            }

    # ----- Background Operations -----

    def _compute_checksum(self, data: bytes) -> str:
        """Compute SHA-256 checksum for integrity verification."""
        # FIX: was hashlib.md5, which contradicted the '.npz.sha256'
        # sidecar filename written by _write_npz.
        return hashlib.sha256(data).hexdigest()

    def _write_npz(self, snapshot: Dict[str, Any]) -> Optional[Path]:
        """
        Write snapshot to NPZ file in DOLPHIN-compliant format.

        Layout: {data_dir}/{YYYY-MM-DD}/extf_snapshot_{ts}__Indicators.npz
        Numeric scalars/sequences become float64 arrays; everything else is
        serialized into a JSON '_metadata' entry. A '<name>.npz.sha256'
        sidecar records the file checksum in 'sha256sum' format.

        Returns:
            Path on success, None on failure.
        """
        try:
            now = datetime.now(timezone.utc)
            date_str = now.strftime('%Y-%m-%d')
            timestamp = now.strftime('%Y-%m-%d_%H-%M-%S')
            # Create date directory
            date_dir = self.data_dir / date_str
            date_dir.mkdir(parents=True, exist_ok=True)
            # Prepare filename
            filename = f"extf_snapshot_{timestamp}__Indicators.npz"
            filepath = date_dir / filename
            # Separate numeric data from metadata
            numeric_data = {}
            metadata = {
                '_timestamp_utc': now.isoformat(),
                '_version': '1.0',
                '_service': 'ExFPersistence',
            }
            for key, value in snapshot.items():
                if key.startswith('_'):
                    # Metadata/structural keys
                    if isinstance(value, dict):
                        metadata[key] = json.dumps(value)
                    else:
                        metadata[key] = value
                elif isinstance(value, (int, float)):
                    if value == value:  # NaN != NaN: NaN scalars are silently dropped
                        numeric_data[key] = np.array([value], dtype=np.float64)
                elif isinstance(value, (list, tuple)) and len(value) > 0:
                    try:
                        numeric_data[key] = np.array(value, dtype=np.float64)
                    except (ValueError, TypeError):
                        # Non-numeric sequence: keep it as JSON metadata
                        metadata[key] = json.dumps(value)
                else:
                    metadata[key] = str(value)
            # Write NPZ with compression
            np.savez_compressed(
                filepath,
                _metadata=json.dumps(metadata),
                **numeric_data
            )
            # Checksum the bytes actually on disk
            file_bytes = filepath.read_bytes()
            checksum = self._compute_checksum(file_bytes)
            # Write sidecar in 'sha256sum' format: "<hex>  <filename>".
            # FIX: content previously contained a broken "(unknown)"
            # placeholder instead of the file name.
            checksum_path = filepath.with_suffix('.npz.sha256')
            checksum_path.write_text(f"{checksum}  {filepath.name}\n")
            logger.info(f"Wrote ExF snapshot: {filepath.name} ({len(numeric_data)} indicators, {len(file_bytes)} bytes)")
            return filepath
        except Exception as e:
            logger.error(f"Failed to write ExF snapshot: {e}")
            self._stats.files_failed += 1
            return None

    def _cleanup_old_files(self) -> int:
        """Remove date directories older than MAX_FILE_AGE_DAYS.

        Age is judged by directory mtime, not by parsing the dir name.

        Returns:
            Number of directories removed.
        """
        import shutil  # FIX: hoisted out of the loop (was imported per iteration)
        removed = 0
        cutoff = time.time() - (MAX_FILE_AGE_DAYS * 24 * 3600)
        try:
            for date_dir in self.data_dir.iterdir():
                if not date_dir.is_dir():
                    continue
                try:
                    if date_dir.stat().st_mtime < cutoff:
                        # Remove old directory tree
                        shutil.rmtree(date_dir)
                        removed += 1
                        logger.info(f"Cleaned up old ExF directory: {date_dir.name}")
                except Exception as e:
                    # Best-effort: one bad directory must not stop the sweep
                    logger.warning(f"Failed to cleanup {date_dir}: {e}")
        except Exception as e:
            logger.warning(f"Cleanup error: {e}")
        return removed

    def _flush_loop(self) -> None:
        """Background thread body: periodically flush the latest snapshot to disk."""
        logger.info(f"ExF Persistence loop started (interval={self.flush_interval_s}s)")
        while not self._stop_event.is_set():
            try:
                # Grab the current snapshot under the lock, then release it
                # before doing any disk I/O.
                with self._snapshot_lock:
                    snapshot = self._latest_snapshot.copy() if self._latest_snapshot else None
                if snapshot:
                    # Write to disk
                    filepath = self._write_npz(snapshot)
                    if filepath:
                        file_size = filepath.stat().st_size
                        self._stats.files_written += 1
                        self._stats.bytes_written += file_size
                        self._stats.last_write_time = time.time()
                        self._stats.last_write_path = filepath
                        # Record history entry
                        sufficiency = self.check_data_sufficiency()
                        self._stats.history.append({
                            'timestamp': datetime.now(timezone.utc).isoformat(),
                            'path': str(filepath),
                            'size': file_size,
                            'indicators': len([k for k in snapshot.keys() if not k.startswith('_')]),
                            'sufficient': sufficiency['sufficient'],
                            'score': sufficiency['score'],
                        })
                    # Periodic cleanup (every ~20 flushes = ~100 min).
                    # FIX: guard files_written > 0 - previously 0 % 20 == 0
                    # triggered a cleanup sweep on every cycle while the
                    # first write kept failing.
                    if self._stats.files_written > 0 and self._stats.files_written % 20 == 0:
                        self._cleanup_old_files()
            except Exception as e:
                logger.error(f"Flush loop error: {e}")
                self._stats.files_failed += 1
            # Wait for next flush (wakes early when stop() sets the event)
            self._stop_event.wait(timeout=self.flush_interval_s)
        logger.info("ExF Persistence loop stopped")

    # ----- Lifecycle -----

    def start(self) -> None:
        """Start the background persistence thread (idempotent)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        # Daemon thread: never blocks interpreter shutdown
        self._thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._thread.start()
        logger.info(f"ExFPersistenceService started (data_dir={self.data_dir})")

    def stop(self) -> None:
        """Stop the persistence service and join the thread (idempotent)."""
        if not self._running:
            return
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=10)
        logger.info("ExFPersistenceService stopped")

    def force_flush(self) -> Optional[Path]:
        """Force an immediate synchronous flush (for testing/debugging).

        Returns:
            Path of the written file, or None when there is no snapshot
            or the write failed. Does not update write statistics.
        """
        with self._snapshot_lock:
            snapshot = self._latest_snapshot.copy() if self._latest_snapshot else None
        if snapshot:
            return self._write_npz(snapshot)
        return None
# =====================================================================
# INTEGRITY CHECKER
# =====================================================================
class ExFIntegrityChecker:
    """
    Verifies integrity between Hazelcast cache and disk persistence.
    Can be run periodically to detect data divergence.
    """

    def __init__(self, hz_client, persistence_service: ExFPersistenceService):
        """
        Args:
            hz_client: Hazelcast client; must expose get_map(), whose maps
                support an async get() returning a future (.result()).
            persistence_service: Service whose last written file is checked.
        """
        self.hz_client = hz_client
        self.persistence = persistence_service

    def check_integrity(self) -> Dict[str, Any]:
        """
        Compare HZ data with last persisted file.

        Returns:
            Integrity report dict; 'status' is 'ok', 'mismatch' (indicator
            sets differ or timestamps drift > 10 min), or 'error'.
        """
        try:
            # Get HZ data
            features_map = self.hz_client.get_map("DOLPHIN_FEATURES")
            hz_data_raw = features_map.get("exf_latest").result()
            if not hz_data_raw:
                return {'status': 'error', 'reason': 'hz_data_missing'}
            hz_data = json.loads(hz_data_raw) if isinstance(hz_data_raw, str) else hz_data_raw
            # Get last persisted file
            stats = self.persistence.get_stats()
            last_path = stats.get('last_write_path')
            if not last_path:
                return {'status': 'error', 'reason': 'no_persisted_file'}
            # Load persisted data
            with np.load(last_path, allow_pickle=True) as npz:
                metadata = json.loads(str(npz['_metadata']))
                persisted_indicators = set(k for k in npz.keys() if not k.startswith('_'))
            # Compare indicator key sets in both directions
            hz_indicators = set(k for k in hz_data.keys() if not k.startswith('_'))
            missing_in_persist = hz_indicators - persisted_indicators
            missing_in_hz = persisted_indicators - hz_indicators
            # Check timestamp drift
            hz_time = hz_data.get('_pushed_at', '')
            persist_time = metadata.get('_timestamp_utc', '')
            time_drift_s = None
            if hz_time and persist_time:
                try:
                    hz_dt = datetime.fromisoformat(hz_time.replace('Z', '+00:00'))
                    p_dt = datetime.fromisoformat(persist_time.replace('Z', '+00:00'))
                    time_drift_s = abs((hz_dt - p_dt).total_seconds())
                # FIX: was a bare 'except:' which also swallowed
                # KeyboardInterrupt/SystemExit; narrowed to parse failures.
                except (ValueError, TypeError, AttributeError):
                    pass
            integrity_ok = len(missing_in_persist) == 0 and len(missing_in_hz) == 0
            # FIX: explicit 'is not None' - the old truthiness test skipped
            # a legitimate 0.0 drift value (harmless here, but misleading).
            if time_drift_s is not None and time_drift_s > 600:  # >10 min drift
                integrity_ok = False
            return {
                'status': 'ok' if integrity_ok else 'mismatch',
                'hz_indicators': len(hz_indicators),
                'persisted_indicators': len(persisted_indicators),
                'missing_in_persist': list(missing_in_persist)[:5],
                'missing_in_hz': list(missing_in_hz)[:5],
                'time_drift_seconds': time_drift_s,
                'hz_timestamp': hz_time,
                'persist_timestamp': persist_time,
                'last_persist_path': last_path,
            }
        except Exception as e:
            logger.error(f"Integrity check failed: {e}")
            return {'status': 'error', 'reason': str(e)}
# =====================================================================
# STANDALONE TEST
# =====================================================================
if __name__ == "__main__":
    # Self-test: exercise the persistence service end-to-end.
    # FIX: run against a temporary directory instead of the hard-coded
    # production mount (/mnt/ng6_data/...), so the smoke test is hermetic
    # and needs no special filesystem layout or permissions.
    import tempfile

    logging.basicConfig(level=logging.INFO)
    # Create test data
    test_snapshot = {
        'funding_btc': 0.0001,
        'funding_eth': -0.0002,
        'basis': 0.01,
        'spread': 0.001,
        'vix': 20.0,
        '_pushed_at': datetime.now(timezone.utc).isoformat(),
        '_staleness_s': {'funding_btc': 10.0, 'basis': 0.5},
    }
    with tempfile.TemporaryDirectory(prefix="exf_persist_test_") as tmp_dir:
        # Start service with a short flush interval so the test is quick
        svc = ExFPersistenceService(data_dir=Path(tmp_dir), flush_interval_s=5)
        svc.start()
        # Update snapshot
        svc.update_snapshot(test_snapshot)
        # Wait for flush
        print("Waiting for flush...")
        time.sleep(6)
        # Check stats
        stats = svc.get_stats()
        print(f"Stats: {json.dumps(stats, indent=2, default=str)}")
        # Check sufficiency
        suff = svc.check_data_sufficiency()
        print(f"Sufficiency: {json.dumps(suff, indent=2)}")
        # Stop
        svc.stop()
    print("Test complete")