Files
DOLPHIN/external_factors/realtime_exf_service_hz_events.py

322 lines
11 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
REAL-TIME EXTERNAL FACTORS SERVICE - EVENT-DRIVEN HZ OPTIMIZATION
=================================================================
Extension to RealTimeExFService that pushes to Hazelcast immediately
when critical indicators change, rather than on a fixed timer.
This achieves near-zero latency (<10ms) for critical indicators:
- basis
- spread
- imbal_btc
- imbal_eth
For slow indicators (funding, etc.), we still batch them.
Author: Kimi, DESTINATION/DOLPHIN Machine dev/prod-Agent
Date: 2026-03-20
"""
import sys
import json
import time
import logging
import threading
from pathlib import Path
from datetime import datetime, timezone
from typing import Dict, Any, Optional, Callable
from dataclasses import dataclass
# Add paths
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent.parent))
from realtime_exf_service import RealTimeExFService, INDICATORS
# Module-level logger for this service.
logger = logging.getLogger("exf_hz_events")
# NOTE(review): these flags are evaluated ONCE at import time — before
# run_event_driven_flow() calls logging.basicConfig() — so with a default
# (WARNING) root configuration they are both False forever, which disables
# every `if _LOG_DEBUG:` / `if _LOG_INFO:` guard below. Prefer calling
# logger.debug()/logger.info() directly (they do their own lazy level check).
_LOG_DEBUG = logger.isEnabledFor(logging.DEBUG)
_LOG_INFO = logger.isEnabledFor(logging.INFO)
# Critical indicators that trigger immediate HZ push
CRITICAL_INDICATORS = frozenset(['basis', 'spread', 'imbal_btc', 'imbal_eth'])
# Min time between HZ pushes (prevent spam)
# NOTE(review): appears unused in this file (EventDrivenConfig.critical_push_interval_s
# is what the code reads) — possibly kept for external importers; confirm before removal.
MIN_PUSH_INTERVAL_S = 0.05 # 50ms = 20 pushes/sec max
@dataclass
class EventDrivenConfig:
    """Tunable knobs for the event-driven Hazelcast push layer.

    Connection settings identify the target cluster/map/key; the push
    strategy fields bound how often we write, and the debounce threshold
    suppresses pushes for insignificant value wiggles.
    """
    # Hazelcast connection target
    hz_cluster: str = "dolphin"
    hz_member: str = "localhost:5701"
    hz_map: str = "DOLPHIN_FEATURES"
    hz_key: str = "exf_latest"
    # Push strategy: floor between critical pushes, period of batch pushes
    critical_push_interval_s: float = 0.05  # 50ms min between critical pushes
    batch_push_interval_s: float = 1.0      # 1s for full batch updates
    # Debouncing: changes smaller than this are ignored
    value_change_threshold: float = 0.0001
class EventDrivenExFService:
    """
    Event-driven wrapper around RealTimeExFService.

    Instead of polling get_indicators() on a fixed interval, this wrapper
    watches the critical indicators and pushes a full snapshot to Hazelcast
    as soon as one of them changes materially, while still emitting a
    periodic batch snapshot as a heartbeat.
    """

    def __init__(self, base_service: RealTimeExFService,
                 config: Optional[EventDrivenConfig] = None):
        """Wrap *base_service*; *config* defaults to EventDrivenConfig()."""
        self.service = base_service
        self.config = config or EventDrivenConfig()
        # Hazelcast client — lazily created on first push, guarded by _hz_lock.
        self._hz_client = None
        self._hz_lock = threading.Lock()
        # Push bookkeeping (time.monotonic() timestamps + counters).
        self._last_critical_push = 0.0
        self._last_full_push = 0.0
        self._push_count = 0
        self._critical_push_count = 0
        # Previous values of critical indicators, for change detection.
        self._prev_values: Dict[str, float] = {}
        # Background thread running _event_loop.
        self._running = False
        self._thread = None

    def _get_hz_client(self):
        """Lazily create (and cache) the Hazelcast client.

        BUG FIX: the original allocated self._hz_lock but never used it, so
        two threads racing through the lazy init could each build a client
        and leak one. Double-checked locking keeps the common path lock-free.
        """
        if self._hz_client is None:
            with self._hz_lock:
                if self._hz_client is None:
                    import hazelcast
                    self._hz_client = hazelcast.HazelcastClient(
                        cluster_name=self.config.hz_cluster,
                        cluster_members=[self.config.hz_member],
                        connection_timeout=5.0,
                    )
        return self._hz_client

    def _push_to_hz(self, indicators: Dict[str, Any], is_critical: bool = False) -> bool:
        """Serialize *indicators* and put them into the Hazelcast map.

        Best-effort: returns True on success, False on any failure (the
        failure is logged, never raised to the caller).
        """
        try:
            client = self._get_hz_client()
            # Build payload with push metadata alongside the indicators.
            payload = {
                **indicators,
                "_pushed_at": datetime.now(timezone.utc).isoformat(),
                "_push_type": "critical" if is_critical else "batch",
                "_push_seq": self._push_count,
            }
            features_map = client.get_map(self.config.hz_map)
            features_map.blocking().put(
                self.config.hz_key,
                json.dumps(payload, default=str)
            )
            self._push_count += 1
            if is_critical:
                self._critical_push_count += 1
            # logger.debug does its own lazy level check; no guard needed.
            logger.debug("HZ push: %s (%d indicators)",
                         "CRITICAL" if is_critical else "batch",
                         len(indicators))
            return True
        except Exception as e:
            # BUG FIX: the original gated this warning on an import-time
            # _LOG_INFO flag, which could silently hide every push failure.
            logger.warning("HZ push failed: %s", e)
            return False

    def _check_critical_changes(self) -> Optional[Dict[str, Any]]:
        """
        Check if critical indicators changed significantly.

        Returns the changed name->value dict, or None when rate-limited or
        when no indicator moved beyond value_change_threshold.
        """
        now = time.monotonic()
        # Rate limiting: never push critical updates closer together than
        # critical_push_interval_s.
        if now - self._last_critical_push < self.config.critical_push_interval_s:
            return None
        # NOTE(review): reaches into the base service's private _lock/state;
        # assumes RealTimeExFService exposes them this way — confirm upstream.
        with self.service._lock:
            changed = {}
            for name in CRITICAL_INDICATORS:
                if name not in self.service.state:
                    continue
                state = self.service.state[name]
                if not state.success:
                    continue
                current_val = state.value
                prev_val = self._prev_values.get(name)
                # First observation always counts as a change; afterwards
                # only moves beyond the debounce threshold do.
                if (prev_val is None
                        or abs(current_val - prev_val) > self.config.value_change_threshold):
                    changed[name] = current_val
                    self._prev_values[name] = current_val
            if changed:
                self._last_critical_push = now
                return changed
        return None

    def _get_full_snapshot(self) -> Dict[str, Any]:
        """Build the full indicator payload (same shape as exf_fetcher_flow)."""
        indicators = self.service.get_indicators(dual_sample=True)
        staleness = indicators.pop("_staleness", {})
        # Keep only numeric indicator values in the flat payload.
        payload = {k: v for k, v in indicators.items() if isinstance(v, (int, float))}
        payload["_staleness_s"] = {k: round(v, 1) for k, v in staleness.items()}
        payload["_acb_ready"] = all(k in payload for k in [
            "funding_btc", "funding_eth", "dvol_btc", "dvol_eth",
            "fng", "vix", "ls_btc", "taker"
        ])
        # NOTE(review): counts only non-NaN floats (v == v filters NaN), so
        # int-valued indicators are excluded — confirm this is intentional.
        payload["_ok_count"] = sum(1 for v in payload.values()
                                   if isinstance(v, float) and v == v)
        return payload

    def _event_loop(self):
        """Main loop: push immediately on critical change, batch periodically."""
        logger.info("Event-driven loop started (critical: %s)",
                    ", ".join(CRITICAL_INDICATORS))
        while self._running:
            t0 = time.monotonic()
            # Critical changes get an immediate full-snapshot push.
            if self._check_critical_changes():
                self._push_to_hz(self._get_full_snapshot(), is_critical=True)
            # Heartbeat: periodic full batch update regardless of changes.
            now = time.monotonic()
            if now - self._last_full_push >= self.config.batch_push_interval_s:
                self._push_to_hz(self._get_full_snapshot(), is_critical=False)
                self._last_full_push = now
                if self._push_count % 10 == 1:
                    logger.info("Batch push #%d (critical: %d)",
                                self._push_count, self._critical_push_count)
            # Sleep 10-50ms: avoid CPU spinning but wake quickly for changes.
            elapsed = time.monotonic() - t0
            time.sleep(max(0.01, 0.05 - elapsed))
        logger.info("Event-driven loop stopped")

    def start(self):
        """Start the base service and the daemon event-loop thread."""
        self.service.start()
        self._running = True
        self._thread = threading.Thread(target=self._event_loop, daemon=True)
        self._thread.start()
        logger.info("Event-driven ExF service started")

    def stop(self):
        """Stop the loop and base service, then shut down the HZ client."""
        self._running = False
        if self._thread:
            self._thread.join(timeout=5.0)
        self.service.stop()
        if self._hz_client:
            try:
                self._hz_client.shutdown()
            except Exception:
                pass  # best-effort shutdown; nothing actionable on failure
            # BUG FIX: drop the dead client so get_status() stops reporting
            # hz_connected=True after shutdown.
            self._hz_client = None
        logger.info("Event-driven ExF service stopped (%d pushes, %d critical)",
                    self._push_count, self._critical_push_count)

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of counters/flags for monitoring."""
        return {
            'running': self._running,
            'push_count': self._push_count,
            'critical_push_count': self._critical_push_count,
            # time.monotonic() basis — not a wall-clock timestamp.
            'last_critical_push': self._last_critical_push,
            'hz_connected': self._hz_client is not None,
        }
# =============================================================================
# DROP-IN REPLACEMENT FLOW
# =============================================================================
def run_event_driven_flow(warmup_s: int = 10):
    """
    Run event-driven ExF flow (drop-in replacement for exf_fetcher_flow).

    Pushes to Hazelcast immediately when critical indicators change,
    rather than on a fixed 0.5s interval. Blocks until Ctrl-C, printing a
    push-count summary every 10 seconds.
    """
    logging.basicConfig(level=logging.INFO)
    banner = "=" * 60
    print(banner)
    print("Event-Driven ExF Flow")
    print(banner)
    # Base poller wrapped in the event-driven push layer.
    flow = EventDrivenExFService(RealTimeExFService())
    flow.start()
    print(f"Service started — warmup {warmup_s}s...")
    time.sleep(warmup_s)
    print(f"Running event-driven (critical: {CRITICAL_INDICATORS})")
    print("Push on change vs fixed interval")
    try:
        # Report progress every 10s until interrupted.
        while True:
            time.sleep(10)
            stats = flow.get_status()
            print(f"Pushes: {stats['push_count']} ({stats['critical_push_count']} critical)")
    except KeyboardInterrupt:
        print("\nStopping...")
    finally:
        flow.stop()
if __name__ == "__main__":
    import argparse

    # CLI: only knob is the warmup duration before the report loop starts.
    cli = argparse.ArgumentParser()
    cli.add_argument("--warmup", type=int, default=10)
    opts = cli.parse_args()
    run_event_driven_flow(warmup_s=opts.warmup)