#!/usr/bin/env python3 """ REAL-TIME EXTERNAL FACTORS SERVICE - EVENT-DRIVEN HZ OPTIMIZATION ================================================================= Extension to RealTimeExFService that pushes to Hazelcast immediately when critical indicators change, rather than on a fixed timer. This achieves near-zero latency (<10ms) for critical indicators: - basis - spread - imbal_btc - imbal_eth For slow indicators (funding, etc.), we still batch them. Author: Kimi, DESTINATION/DOLPHIN Machine dev/prod-Agent Date: 2026-03-20 """ import sys import json import time import logging import threading from pathlib import Path from datetime import datetime, timezone from typing import Dict, Any, Optional, Callable from dataclasses import dataclass # Add paths sys.path.insert(0, str(Path(__file__).parent)) sys.path.insert(0, str(Path(__file__).parent.parent)) from realtime_exf_service import RealTimeExFService, INDICATORS logger = logging.getLogger("exf_hz_events") _LOG_DEBUG = logger.isEnabledFor(logging.DEBUG) _LOG_INFO = logger.isEnabledFor(logging.INFO) # Critical indicators that trigger immediate HZ push CRITICAL_INDICATORS = frozenset(['basis', 'spread', 'imbal_btc', 'imbal_eth']) # Min time between HZ pushes (prevent spam) MIN_PUSH_INTERVAL_S = 0.05 # 50ms = 20 pushes/sec max @dataclass class EventDrivenConfig: """Configuration for event-driven HZ optimization.""" hz_cluster: str = "dolphin" hz_member: str = "localhost:5701" hz_map: str = "DOLPHIN_FEATURES" hz_key: str = "exf_latest" # Push strategy critical_push_interval_s: float = 0.05 # 50ms min between critical pushes batch_push_interval_s: float = 1.0 # 1s for full batch updates # Debouncing value_change_threshold: float = 0.0001 # Ignore tiny changes class EventDrivenExFService: """ Event-driven wrapper around RealTimeExFService. Instead of polling get_indicators() on a fixed interval, we push to Hazelcast immediately when critical indicators change. """ def __init__(self, base_service: RealTimeExFService, config: EventDrivenConfig = None): self.service = base_service self.config = config or EventDrivenConfig() # Hazelcast client (initialized on first use) self._hz_client = None self._hz_lock = threading.Lock() # Push tracking self._last_critical_push = 0.0 self._last_full_push = 0.0 self._push_count = 0 self._critical_push_count = 0 # Previous values for change detection self._prev_values: Dict[str, float] = {} # Background thread for full batch updates self._running = False self._thread = None def _get_hz_client(self): """Lazy initialization of Hazelcast client.""" if self._hz_client is None: import hazelcast self._hz_client = hazelcast.HazelcastClient( cluster_name=self.config.hz_cluster, cluster_members=[self.config.hz_member], connection_timeout=5.0, ) return self._hz_client def _push_to_hz(self, indicators: Dict[str, Any], is_critical: bool = False): """Push indicators to Hazelcast.""" try: client = self._get_hz_client() # Build payload payload = { **indicators, "_pushed_at": datetime.now(timezone.utc).isoformat(), "_push_type": "critical" if is_critical else "batch", "_push_seq": self._push_count, } # Push features_map = client.get_map(self.config.hz_map) features_map.blocking().put( self.config.hz_key, json.dumps(payload, default=str) ) self._push_count += 1 if is_critical: self._critical_push_count += 1 if _LOG_DEBUG: logger.debug("HZ push: %s (%d indicators)", "CRITICAL" if is_critical else "batch", len(indicators)) return True except Exception as e: if _LOG_INFO: logger.warning("HZ push failed: %s", e) return False def _check_critical_changes(self) -> Optional[Dict[str, Any]]: """ Check if critical indicators changed significantly. Returns changed indicators dict or None if no significant change. """ now = time.monotonic() # Rate limiting if now - self._last_critical_push < self.config.critical_push_interval_s: return None with self.service._lock: changed = {} for name in CRITICAL_INDICATORS: if name not in self.service.state: continue state = self.service.state[name] if not state.success: continue current_val = state.value prev_val = self._prev_values.get(name) # First time or significant change if prev_val is None: changed[name] = current_val self._prev_values[name] = current_val elif abs(current_val - prev_val) > self.config.value_change_threshold: changed[name] = current_val self._prev_values[name] = current_val if changed: self._last_critical_push = now return changed return None def _get_full_snapshot(self) -> Dict[str, Any]: """Get full indicator snapshot.""" indicators = self.service.get_indicators(dual_sample=True) staleness = indicators.pop("_staleness", {}) # Build payload like exf_fetcher_flow payload = {k: v for k, v in indicators.items() if isinstance(v, (int, float))} payload["_staleness_s"] = {k: round(v, 1) for k, v in staleness.items()} payload["_acb_ready"] = all(k in payload for k in [ "funding_btc", "funding_eth", "dvol_btc", "dvol_eth", "fng", "vix", "ls_btc", "taker" ]) payload["_ok_count"] = sum(1 for v in payload.values() if isinstance(v, float) and v == v) return payload def _event_loop(self): """Main event loop - checks for changes and pushes to HZ.""" if _LOG_INFO: logger.info("Event-driven loop started (critical: %s)", ", ".join(CRITICAL_INDICATORS)) while self._running: t0 = time.monotonic() # Check for critical changes first critical_changes = self._check_critical_changes() if critical_changes: # Get full snapshot but prioritize critical changes full_data = self._get_full_snapshot() self._push_to_hz(full_data, is_critical=True) # Periodic full batch update now = time.monotonic() if now - self._last_full_push >= self.config.batch_push_interval_s: full_data = self._get_full_snapshot() self._push_to_hz(full_data, is_critical=False) self._last_full_push = now if _LOG_INFO and self._push_count % 10 == 1: logger.info("Batch push #%d (critical: %d)", self._push_count, self._critical_push_count) # Sleep briefly to prevent CPU spinning # But wake up quickly to catch changes elapsed = time.monotonic() - t0 sleep_time = max(0.01, 0.05 - elapsed) # 10-50ms sleep time.sleep(sleep_time) if _LOG_INFO: logger.info("Event-driven loop stopped") def start(self): """Start the event-driven service.""" self.service.start() self._running = True self._thread = threading.Thread(target=self._event_loop, daemon=True) self._thread.start() if _LOG_INFO: logger.info("Event-driven ExF service started") def stop(self): """Stop the service.""" self._running = False if self._thread: self._thread.join(timeout=5.0) self.service.stop() if self._hz_client: try: self._hz_client.shutdown() except Exception: pass if _LOG_INFO: logger.info("Event-driven ExF service stopped (%d pushes, %d critical)", self._push_count, self._critical_push_count) def get_status(self) -> Dict[str, Any]: """Get service status.""" return { 'running': self._running, 'push_count': self._push_count, 'critical_push_count': self._critical_push_count, 'last_critical_push': self._last_critical_push, 'hz_connected': self._hz_client is not None, } # ============================================================================= # DROP-IN REPLACEMENT FLOW # ============================================================================= def run_event_driven_flow(warmup_s: int = 10): """ Run event-driven ExF flow (drop-in replacement for exf_fetcher_flow). This pushes to Hazelcast immediately when critical indicators change, rather than on a fixed 0.5s interval. """ logging.basicConfig(level=logging.INFO) print("=" * 60) print("Event-Driven ExF Flow") print("=" * 60) # Create base service base_service = RealTimeExFService() # Wrap with event-driven layer event_service = EventDrivenExFService(base_service) # Start event_service.start() print(f"Service started — warmup {warmup_s}s...") time.sleep(warmup_s) print(f"Running event-driven (critical: {CRITICAL_INDICATORS})") print("Push on change vs fixed interval") try: while True: time.sleep(10) status = event_service.get_status() print(f"Pushes: {status['push_count']} ({status['critical_push_count']} critical)") except KeyboardInterrupt: print("\nStopping...") finally: event_service.stop() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--warmup", type=int, default=10) args = parser.parse_args() run_event_driven_flow(warmup_s=args.warmup)