initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems: - prod/ (BLUE harness, configs, scripts, docs) - nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved) - adaptive_exit/ (AEM engine + models/bucket_assignments.pkl) - Observability/ (EsoF advisor, TUI, dashboards) - external_factors/ (EsoF producer) - mc_forewarning_qlabs_fork/ (MC regime/envelope) Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
This commit is contained in:
223
prod/scan_bridge_prefect_flow.py
Executable file
223
prod/scan_bridge_prefect_flow.py
Executable file
@@ -0,0 +1,223 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
DOLPHIN Scan Bridge - Prefect Managed Service
|
||||
==============================================
|
||||
Long-running flow that continuously watches for Arrow scan files
|
||||
and pushes them to Hazelcast. Self-healing via Prefect.
|
||||
|
||||
Usage:
|
||||
prefect deploy scan_bridge_prefect_flow.py:scan_bridge_flow \
|
||||
--name scan-bridge --pool dolphin-services
|
||||
|
||||
prefect worker start --pool dolphin-services
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import json
|
||||
import time
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timezone
|
||||
from typing import Optional
|
||||
|
||||
sys.path.insert(0, '/mnt/dolphinng5_predict')
|
||||
sys.path.insert(0, '/mnt/dolphinng5_predict/prod')
|
||||
|
||||
from prefect import flow, task, get_run_logger
|
||||
from prefect.runtime import flow_run
|
||||
|
||||
import pyarrow as pa
|
||||
import pyarrow.ipc as ipc
|
||||
import hazelcast
|
||||
|
||||
|
||||
# Configuration
# NOTE(review): ARROW_DIR captures today's date once at import time, so it goes
# stale after midnight; get_latest_arrow_file() recomputes the dated directory
# on every call, and this constant is only used for the startup log banner.
ARROW_DIR = Path('/mnt/ng6_data/arrow_scans') / datetime.now().strftime('%Y-%m-%d')
HZ_CLUSTER = "dolphin"      # Hazelcast cluster name
HZ_HOST = "127.0.0.1:5701"  # Hazelcast member address (host:port)
POLL_INTERVAL = 5.0  # seconds when idle
HEALTH_LOG_INTERVAL = 60  # log status every 60 iterations (~5 min)
|
||||
|
||||
|
||||
class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that converts numpy values to native Python types.

    ``json.JSONEncoder`` already serializes lists/tuples element-by-element,
    so only objects it cannot handle reach ``default``. The previous
    ``isinstance(obj, (list, tuple))`` branch was therefore unreachable — and
    incorrect if reached, since ``self.default(x)`` on an already-serializable
    element raises TypeError — so it has been removed.
    """

    def default(self, obj):
        # Arrays first: ndarray.item() only works for single-element arrays,
        # while .tolist() converts any array (or numpy scalar) recursively.
        if hasattr(obj, 'tolist'):
            return obj.tolist()
        if hasattr(obj, 'item'):  # numpy scalar
            return obj.item()
        return super().default(obj)
|
||||
|
||||
|
||||
def load_scan_file(filepath: Path) -> dict:
    """Read a single-row Arrow IPC file and return its first row as a dict.

    Columns whose name ends in ``_json`` hold JSON-encoded strings; those are
    decoded and stored under the name with the ``_json`` suffix stripped.
    All other columns are passed through as-is.
    """
    with pa.memory_map(str(filepath), 'r') as mm:
        scan_table = ipc.open_file(mm).read_all()

    parsed: dict = {}
    for name in scan_table.column_names:
        cell = scan_table.column(name)[0].as_py()
        if name.endswith('_json') and cell:
            # Decode embedded JSON payloads (skipped for empty/None cells).
            parsed[name.replace('_json', '')] = json.loads(cell)
        else:
            parsed[name] = cell

    return parsed
|
||||
|
||||
|
||||
def get_latest_arrow_file() -> Optional[Path]:
    """Return the most recently modified ``.arrow`` file for today, or None.

    Scans /mnt/ng6_data/arrow_scans/<YYYY-MM-DD>/ — the date is computed per
    call so the watcher follows day rollover. Returns None when the dated
    directory does not exist (yet) or contains no ``.arrow`` files.

    Fix: a file deleted between the directory listing and ``entry.stat()``
    used to raise FileNotFoundError into the outer handler, which returned
    None and discarded any valid file already found. Such races now skip
    only the affected entry.
    """
    arrow_dir = Path('/mnt/ng6_data/arrow_scans') / datetime.now().strftime('%Y-%m-%d')

    if not arrow_dir.exists():
        return None

    latest_file: Optional[Path] = None
    latest_mtime = 0.0

    try:
        with os.scandir(arrow_dir) as it:
            for entry in it:
                if not entry.name.endswith('.arrow'):
                    continue
                try:
                    # A writer may rotate/delete the file mid-scan; skip
                    # just this entry rather than aborting the whole scan.
                    if not entry.is_file():
                        continue
                    mtime = entry.stat().st_mtime
                except OSError:
                    continue
                if mtime > latest_mtime:
                    latest_mtime = mtime
                    latest_file = Path(entry.path)
    except FileNotFoundError:
        # Directory removed between exists() and scandir().
        return None

    return latest_file
|
||||
|
||||
|
||||
@task(name="push-to-hazelcast", retries=3, retry_delay_seconds=5)
def push_scan_to_hz(scan_data: dict, filepath: Path) -> bool:
    """Publish one parsed scan to the Hazelcast DOLPHIN_FEATURES map.

    Stamps bridge metadata (UTC timestamp, source tag, source-file mtime)
    onto the payload, then stores it JSON-encoded under the
    ``latest_eigen_scan`` key. A short-lived client is opened per call and
    always shut down, even on failure.
    """
    hz_client = hazelcast.HazelcastClient(
        cluster_name=HZ_CLUSTER,
        cluster_members=[HZ_HOST],
    )

    try:
        dolphin_features = hz_client.get_map('DOLPHIN_FEATURES').blocking()

        # Provenance metadata for downstream consumers.
        scan_data['bridge_ts'] = datetime.now(timezone.utc).isoformat()
        scan_data['bridge_source'] = 'scan_bridge_prefect'
        scan_data['file_mtime'] = filepath.stat().st_mtime

        payload = json.dumps(scan_data, cls=NumpyEncoder)
        dolphin_features.put("latest_eigen_scan", payload)

        return True
    finally:
        hz_client.shutdown()
|
||||
|
||||
|
||||
@task(name="health-check")
def check_hz_connection() -> bool:
    """Return True when a Hazelcast client can connect to the cluster.

    Best-effort probe: any exception during connect or shutdown is treated
    as "not reachable" rather than propagated.
    """
    try:
        probe = hazelcast.HazelcastClient(
            cluster_name=HZ_CLUSTER,
            cluster_members=[HZ_HOST],
        )
        probe.shutdown()
    except Exception:
        return False
    return True
|
||||
|
||||
|
||||
@flow(
    name="scan-bridge-flow",
    description="Continuously watch Arrow files and push to Hazelcast",
    log_prints=True,
    task_runner=None,  # Use default sequential runner
)
def scan_bridge_flow():
    """
    Main scan bridge flow - runs indefinitely.

    - Watches /mnt/ng6_data/arrow_scans/<date>/ for new .arrow files
    - Pushes parsed data to Hazelcast DOLPHIN_FEATURES
    - Handles idle periods (no new scans) by sleeping POLL_INTERVAL
    - Self-healing via Prefect retries/restarts

    Fixes: removed the unused ``last_scan_number`` state variable (new-file
    detection relies solely on mtime), and a file that disappears between
    discovery and stat() no longer escapes to the fatal handler and kills
    the flow — it is skipped and the loop retries.
    """
    logger = get_run_logger()

    logger.info("=" * 70)
    logger.info("🐬 DOLPHIN Scan Bridge - Prefect Managed")
    logger.info("=" * 70)
    logger.info(f"Arrow directory: {ARROW_DIR}")
    logger.info(f"Hazelcast: {HZ_HOST} (cluster: {HZ_CLUSTER})")
    logger.info(f"Poll interval: {POLL_INTERVAL}s")
    logger.info("=" * 70)

    # Fail fast if Hazelcast is unreachable; Prefect restarts the flow.
    if not check_hz_connection():
        logger.error("❌ Cannot connect to Hazelcast - exiting")
        raise RuntimeError("Hazelcast connection failed")

    logger.info("✅ Connected to Hazelcast")

    last_file_mtime = 0  # mtime watermark of the last successfully pushed file
    iterations = 0
    scans_pushed = 0

    try:
        while True:
            iterations += 1

            # Find latest file
            latest_file = get_latest_arrow_file()

            if not latest_file:
                if iterations % HEALTH_LOG_INTERVAL == 0:
                    logger.info(f"⏳ No arrow files yet (iteration {iterations})")
                time.sleep(POLL_INTERVAL)
                continue

            # Check if file is new. Guard the stat: the file may have been
            # rotated away between discovery and here; skip and re-poll
            # instead of letting FileNotFoundError hit the fatal handler.
            try:
                mtime = latest_file.stat().st_mtime
            except FileNotFoundError:
                time.sleep(POLL_INTERVAL)
                continue

            if mtime <= last_file_mtime:
                if iterations % HEALTH_LOG_INTERVAL == 0:
                    logger.info(f"⏳ Idle - waiting for new scans (pushed: {scans_pushed})")
                time.sleep(POLL_INTERVAL)
                continue

            # New file found - process it
            try:
                scan_data = load_scan_file(latest_file)
                scan_number = scan_data.get('scan_number', 0)

                # Push to Hz
                push_scan_to_hz(scan_data, latest_file)

                # Advance the watermark only after a successful push so a
                # failed file is retried on the next iteration.
                last_file_mtime = mtime
                scans_pushed += 1

                # Log every 10 scans
                if scans_pushed % 10 == 0:
                    logger.info(f"📊 Pushed {scans_pushed} scans (latest: #{scan_number})")
                else:
                    logger.debug(f"Pushed scan #{scan_number}")

            except Exception as e:
                logger.error(f"Error processing {latest_file.name}: {e}")
                time.sleep(1.0)  # Brief delay on error
                continue

            time.sleep(POLL_INTERVAL)

    except KeyboardInterrupt:
        logger.info("🛑 Interrupted by user")
    except Exception as e:
        logger.error(f"❌ Fatal error: {e}")
        raise
    finally:
        logger.info(f"✅ Scan bridge stopped. Total scans pushed: {scans_pushed}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Allow running the flow directly (outside a Prefect deployment),
    # e.g. for local testing.
    scan_bridge_flow()
|
||||
Reference in New Issue
Block a user