"""
MIG6.1 & MIG6.2: ACB Processor Service

Watches for new scan arrivals and atomically computes/writes the ACB boost
to the Hazelcast DOLPHIN_FEATURES map, using a CP Subsystem lock for atomicity.
"""

import sys
import time
import json
import logging
from pathlib import Path
from datetime import datetime, timezone

import hazelcast

HCM_DIR = Path(__file__).parent.parent

# Use platform-independent paths from dolphin_paths.
# NOTE: the sys.path edits must run before the project imports below.
sys.path.insert(0, str(HCM_DIR))
sys.path.insert(0, str(HCM_DIR / 'prod'))
from dolphin_paths import get_eigenvalues_path

SCANS_DIR = get_eigenvalues_path()

sys.path.insert(0, str(HCM_DIR / 'nautilus_dolphin'))
from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s:%(message)s')


class ACBProcessorService:
    """Poll for new scan files and publish ``acb_boost`` to Hazelcast.

    The service keeps a Hazelcast client, a blocking view of the
    ``DOLPHIN_FEATURES`` map, and a CP Subsystem lock named
    ``acb_update_lock`` that guards every write of the ``acb_boost`` key.
    """

    def __init__(self, hz_cluster="dolphin", hz_host="localhost:5701"):
        """Connect to Hazelcast and prepare the circuit breaker.

        Args:
            hz_cluster: Hazelcast cluster name.
            hz_host: ``host:port`` of a cluster member.

        Raises:
            Exception: whatever the Hazelcast client raises while connecting
                or while obtaining the map/lock proxies (logged, then
                re-raised unchanged).
        """
        # Set first so close() is safe even if the constructor below fails.
        self.hz_client = None
        try:
            self.hz_client = hazelcast.HazelcastClient(
                cluster_name=hz_cluster,
                cluster_members=[hz_host]
            )
            self.imap = self.hz_client.get_map("DOLPHIN_FEATURES").blocking()
            # Using CP Subsystem lock as per MIG6.1
            self.lock = self.hz_client.cp_subsystem.get_lock("acb_update_lock").blocking()
        except Exception as e:
            logging.error(f"Failed to connect to Hazelcast: {e}")
            # Do not leak a half-initialised client when map/lock setup fails.
            self.close()
            raise

        self.acb = AdaptiveCircuitBreaker()
        self.acb.config.EIGENVALUES_PATH = SCANS_DIR  # CRITICAL: override Windows default for Linux
        self.acb.preload_w750(self._get_recent_dates(60))
        self.last_scan_count = 0
        self.last_date = None

    def close(self):
        """Shut down the Hazelcast client if one is open (safe to call twice)."""
        client = self.hz_client
        self.hz_client = None
        if client is None:
            return
        try:
            client.shutdown()
        except Exception as e:
            # Shutdown is best-effort; never mask the caller's own error.
            logging.warning(f"Hazelcast client shutdown failed: {e}")

    def _get_recent_dates(self, n=60):
        """Return the names of the last ``n`` date directories under SCANS_DIR.

        Only sub-directories whose name is exactly 10 characters long are
        considered; names are sorted lexicographically. Returns an empty list
        when the directory cannot be listed.
        """
        try:
            dirs = sorted(
                d.name for d in SCANS_DIR.iterdir()
                if d.is_dir() and len(d.name) == 10
            )
            return dirs[-n:]
        except Exception as e:
            # Keep the original best-effort contract, but leave a trace.
            logging.warning(f"Could not list scan directories in {SCANS_DIR}: {e}")
            return []

    def get_today_str(self):
        """Return the current UTC date formatted as ``YYYY-MM-DD``."""
        # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
        return datetime.now(timezone.utc).strftime('%Y-%m-%d')

    def check_new_scans(self, date_str):
        """Report whether new ``scan_*.json`` files appeared for ``date_str``.

        Args:
            date_str: Name of the date directory under SCANS_DIR.

        Returns:
            True when the file count for the day exceeds the last count seen,
            False otherwise (including when the directory does not exist).
        """
        today_dir = SCANS_DIR / date_str
        if not today_dir.exists():
            return False

        count = sum(1 for _ in today_dir.glob("scan_*.json"))

        if self.last_date != date_str:
            self.last_date = date_str
            self.last_scan_count = 0
            # Preload updated dates when day rolls over
            self.acb.preload_w750(self._get_recent_dates(60))

        if count > self.last_scan_count:
            self.last_scan_count = count
            return True

        return False

    def process_and_write(self, date_str):
        """Compute ACB boost and write to HZ acb_boost.

        Preference order:
          1. HZ exf_latest — live, pre-lagged values (preferred, ~0.5 s latency)
          2. NPZ disk scan — fallback when HZ data absent or stale (>12 h)

        Any exception is logged and swallowed so the caller's loop keeps
        running; nothing is returned.
        """
        try:
            boost_info = None

            # ── HZ path (preferred) ────────────────────────────────────────────
            try:
                exf_raw = self.imap.get('exf_latest')
                if exf_raw:
                    exf_snapshot = json.loads(exf_raw)
                    scan_raw = self.imap.get('latest_eigen_scan')
                    w750_live = None
                    if scan_raw:
                        scan_data = json.loads(scan_raw)
                        w750_live = scan_data.get('w750_velocity')
                    boost_info = self.acb.get_dynamic_boost_from_hz(
                        date_str, exf_snapshot, w750_velocity=w750_live
                    )
                    logging.debug(f"ACB computed from HZ: boost={boost_info['boost']:.4f}")
            except ValueError as ve:
                # NOTE: json.JSONDecodeError subclasses ValueError, so malformed
                # JSON in the map is reported through this branch as well.
                logging.warning(f"ACB HZ snapshot stale: {ve} — falling back to NPZ")
                boost_info = None
            except Exception as e:
                logging.warning(f"ACB HZ read failed: {e} — falling back to NPZ")
                boost_info = None

            # ── NPZ fallback ───────────────────────────────────────────────────
            if boost_info is None:
                boost_info = self.acb.get_dynamic_boost_for_date(date_str)
                logging.debug(f"ACB computed from NPZ: boost={boost_info['boost']:.4f}")

            payload = json.dumps(boost_info)

            # Atomic Write via CP Subsystem Lock
            self.lock.lock()
            try:
                self.imap.put("acb_boost", payload)
                logging.info(
                    f"acb_boost updated (src={boost_info.get('source','npz')}): "
                    f"boost={boost_info['boost']:.4f} signals={boost_info['signals']:.1f}"
                )
                try:
                    # Imported lazily so the service still runs without ch_writer.
                    from ch_writer import ch_put, ts_us as _ts
                    ch_put("acb_state", {
                        "ts": _ts(),
                        "boost": float(boost_info.get("boost", 0)),
                        "beta": float(boost_info.get("beta", 0)),
                        "signals": float(boost_info.get("signals", 0)),
                    })
                except Exception as e:
                    # Best-effort side write: never fail the HZ update over it,
                    # but do not hide the reason either.
                    logging.debug(f"ch_writer acb_state write skipped: {e}")
            finally:
                self.lock.unlock()

        except Exception as e:
            logging.error(f"Error processing ACB: {e}")

    def run(self, poll_interval=1.0, hz_refresh_interval=30.0):
        """Main service loop.

        Two update triggers:
          1. New scan files arrive for today → compute from HZ (preferred) or NPZ.
          2. hz_refresh_interval elapsed → re-push acb_boost from live exf_latest
             even when no new scans exist (covers live-only operation days when
             scan files land in a different directory or not at all).

        Args:
            poll_interval: Seconds to sleep between polls.
            hz_refresh_interval: Maximum seconds between two acb_boost writes.

        Returns when interrupted with Ctrl-C inside the loop.
        """
        logging.info("Starting ACB Processor Service (Python CP Subsystem)...")

        today = self.get_today_str()
        # Write immediately on startup so acb_boost is populated from the first second
        logging.info(f"Startup write for {today}")
        self.process_and_write(today)
        last_hz_refresh = time.monotonic()

        # The interrupt handler wraps the whole loop so that Ctrl-C during the
        # error back-off sleep also ends the loop cleanly.
        try:
            while True:
                try:
                    today = self.get_today_str()
                    now = time.monotonic()

                    # Trigger 1: new scan files
                    if self.check_new_scans(today):
                        self.process_and_write(today)
                        last_hz_refresh = now

                    # Trigger 2: periodic HZ refresh (ensures acb_boost stays current
                    # even on days with no new NPZ scan files)
                    elif (now - last_hz_refresh) >= hz_refresh_interval:
                        self.process_and_write(today)
                        last_hz_refresh = now

                    time.sleep(poll_interval)
                except Exception as e:
                    logging.error(f"Loop error: {e}")
                    time.sleep(5.0)
        except KeyboardInterrupt:
            logging.info("ACB Processor Service interrupted; stopping.")


if __name__ == "__main__":
    service = ACBProcessorService()
    try:
        service.run()
    finally:
        # Release the Hazelcast connection however the loop ended.
        service.close()