# Scan Bridge Prefect Integration Study

**Date:** 2026-03-24
**Version:** v1.0
**Status:** Analysis Complete - Recommendation: Hybrid Approach

---

## Executive Summary

The Scan Bridge Service can be integrated into Prefect orchestration, but **NOT as a standard flow task**. Because it is a continuous watchdog (file system monitoring), it requires special handling. The recommended approach is a **hybrid architecture** in which the bridge runs as a standalone supervised process, with Prefect providing health monitoring and automatic restart.

---

## 1. Current Architecture (Standalone)

```
┌─────────────────────────────────────────────────────────────────┐
│                   CURRENT: Standalone Service                   │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐      ┌─────────────────┐                   │
│  │  scan_bridge_   │─────▶│   Hazelcast     │                   │
│  │  service.py     │      │   (SSOT)        │                   │
│  │                 │      │                 │                   │
│  │  • watchdog     │      │  latest_eigen_  │                   │
│  │  • mtime-based  │      │  scan           │                   │
│  │  • continuous   │      │                 │                   │
│  └─────────────────┘      └─────────────────┘                   │
│           ▲                                                     │
│           │ watches                                             │
│  ┌────────┴─────────────────┐                                   │
│  │  /mnt/ng6_data/arrow_    │                                   │
│  │  scans/YYYY-MM-DD/*.     │                                   │
│  │  arrow                   │                                   │
│  └──────────────────────────┘                                   │
│                                                                 │
│  MANAGEMENT: Manual (./scan_bridge_restart.sh)                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

### Current Issues

- ❌ No automatic restart on crash
- ❌ No health monitoring
- ❌ No integration with system-wide orchestration
- ❌ Manual log rotation

---

## 2. Integration Options Analysis

### Option A: Prefect Flow Task (REJECTED)

**Concept:** Run scan bridge as a Prefect flow task

```python
from time import sleep

from prefect import flow


@flow
def scan_bridge_flow():
    while True:       # ← PROBLEM: infinite loop, the run never completes
        scan_files()  # stands in for the bridge's scan step (not defined here)
        sleep(1)
```

**Why Rejected:**

| Issue | Explanation |
|-------|-------------|
| **Task Timeout** | Prefect sets no timeout by default, so the run never reaches a terminal state; any configured `timeout_seconds` would kill the loop |
| **Worker Lock** | Ties up a Prefect worker slot indefinitely |
| **Resource Waste** | Prefect worker tied up doing file watching |
| **Anti-pattern** | Prefect is for discrete workflows, not continuous daemons |

**Verdict:** ❌ Not suitable

---

### Option B: Prefect Daemon Service (RECOMMENDED)

**Concept:** Use Prefect's infrastructure to manage the bridge as a long-running service

```
┌─────────────────────────────────────────────────────────────────┐
│             RECOMMENDED: Prefect-Supervised Daemon              │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐       │
│  │           Prefect Server (localhost:4200)            │       │
│  │                                                      │       │
│  │  ┌────────────────┐    ┌─────────────────────────┐   │       │
│  │  │ Health Check   │───▶│ Scan Bridge Deployment  │   │       │
│  │  │ Flow (30s)     │    │ (long-running)          │   │       │
│  │  └────────────────┘    └─────────────────────────┘   │       │
│  │          │                          │                │       │
│  │          │ monitors                 │ manages        │       │
│  │          ▼                          ▼                │       │
│  │  ┌─────────────────────────────────────────────┐     │       │
│  │  │  scan_bridge_service.py process             │     │       │
│  │  │  • systemd/Prefect managed                  │     │       │
│  │  │  • auto-restart on failure                  │     │       │
│  │  │  • stdout/stderr to Prefect logs            │     │       │
│  │  └─────────────────────────────────────────────┘     │       │
│  └──────────────────────────────────────────────────────┘       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Implementation:**

```python
# scan_bridge_prefect_daemon.py
from prefect import flow, get_run_logger
import subprocess
import time
import signal
import sys

DAEMON_CMD = [sys.executable, "/mnt/dolphinng5_predict/prod/scan_bridge_service.py"]


class ScanBridgeDaemon:
    def __init__(self):
        self.process = None

    @property
    def logger(self):
        # Resolved lazily: get_run_logger() raises outside a flow/task run,
        # and this class is instantiated at import time (see `daemon` below).
        return get_run_logger()

    def start(self):
        """Start the scan bridge daemon."""
        self.logger.info("Starting Scan Bridge daemon...")
        # stdout/stderr are inherited from the flow-run process. An undrained
        # subprocess.PIPE would fill up and eventually block the bridge;
        # forwarding output into Prefect logs would need a reader thread.
        self.process = subprocess.Popen(DAEMON_CMD)

        # Wait for startup confirmation
        time.sleep(2)
        if self.process.poll() is None:
            self.logger.info(f"✓ Daemon started (PID: {self.process.pid})")
            return True
        else:
            self.logger.error("✗ Daemon failed to start")
            return False

    def health_check(self) -> bool:
        """Check if daemon is healthy."""
        if self.process is None:
            return False

        # Check process is running
        if self.process.poll() is not None:
            self.logger.error(f"Daemon exited with code {self.process.poll()}")
            return False

        # Check Hazelcast for recent data
        from dolphin_hz_utils import check_scan_freshness
        try:
            age_sec = check_scan_freshness()
            if age_sec > 60:  # Data older than 60s
                self.logger.warning(f"Stale data detected (age: {age_sec}s)")
                return False
            return True
        except Exception as e:
            self.logger.error(f"Health check failed: {e}")
            return False

    def stop(self):
        """Stop the daemon gracefully."""
        if self.process and self.process.poll() is None:
            self.logger.info("Stopping daemon...")
            self.process.send_signal(signal.SIGTERM)
            try:
                self.process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                # SIGTERM was ignored; force-kill so a restart cannot overlap
                self.process.kill()
                self.process.wait()
            self.logger.info("✓ Daemon stopped")


# Global daemon instance
daemon = ScanBridgeDaemon()


@flow(name="scan-bridge-daemon")
def scan_bridge_daemon_flow():
    """
    Long-running Prefect flow that manages the scan bridge daemon.

    This flow runs indefinitely, monitoring and restarting the bridge as needed.
    """
    logger = get_run_logger()
    logger.info("=" * 60)
    logger.info("🐬 Scan Bridge Daemon Manager (Prefect)")
    logger.info("=" * 60)

    # Initial start
    if not daemon.start():
        raise RuntimeError("Failed to start daemon")

    try:
        while True:
            # Health check every 30 seconds
            time.sleep(30)

            if not daemon.health_check():
                logger.warning("Health check failed, restarting daemon...")
                daemon.stop()
                time.sleep(1)

                if daemon.start():
                    logger.info("✓ Daemon restarted")
                else:
                    logger.error("✗ Failed to restart daemon")
                    raise RuntimeError("Daemon restart failed")
            else:
                logger.debug("Health check passed")

    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        daemon.stop()


if __name__ == "__main__":
    # Run the manager flow directly (local run; see Section 4 for deployment)
    scan_bridge_daemon_flow()
```

**Pros:**

| Advantage | Description |
|-----------|-------------|
| Auto-restart | The manager flow restarts the bridge when a health check fails |
| Centralized Logs | Manager flow logs appear in the Prefect UI; bridge output reaches it only if forwarded to the run logger |
| Health Monitoring | Automatic detection of stale data |
| Integration | Part of overall orchestration |

**Cons:**

| Disadvantage | Mitigation |
|--------------|------------|
| Requires Prefect worker | Use dedicated worker pool |
| Flow never completes | Prefect has no dedicated daemon deployment type; leave the deployment unscheduled, trigger a single run, and treat a permanently `Running` state as normal |

---

### Option C: Systemd Service with Prefect Monitoring (ALTERNATIVE)

**Concept:** Use systemd for process management, Prefect for health checks

```
┌─────────────────────────────────────────────────────────────────┐
│            ALTERNATIVE: Systemd + Prefect Monitoring            │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐      ┌─────────────────┐                   │
│  │   systemd       │      │   Prefect       │                   │
│  │                 │      │   Server        │                   │
│  │  ┌───────────┐  │      │                 │                   │
│  │  │scan-bridge│◀─┼──────┤  Health Check   │                   │
│  │  │ service   │  │      │  Flow (60s)     │                   │
│  │  │ (auto-    │  │      │                 │                   │
│  │  │  restart) │  │      │  Alerts on:     │                   │
│  │  └───────────┘  │      │  • stale data   │                   │
│  │       │         │      │  • process down │                   │
│  │       ▼         │      │                 │                   │
│  │  ┌───────────┐  │      │                 │                   │
│  │  │ journald  │──┼──────┤  Log ingestion  │                   │
│  │  │ (logs)    │  │      │                 │                   │
│  │  └───────────┘  │      │                 │                   │
│  └─────────────────┘      └─────────────────┘                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
```

**Systemd Service:**

```ini
# /etc/systemd/system/dolphin-scan-bridge.service
[Unit]
Description=DOLPHIN Scan Bridge Service
After=network.target hazelcast.service
Wants=hazelcast.service
# Start rate limiting lives in [Unit] on current systemd versions:
# stop retrying after 3 start attempts within 60 s
StartLimitIntervalSec=60
StartLimitBurst=3

[Service]
Type=simple
User=dolphin
Group=dolphin
WorkingDirectory=/mnt/dolphinng5_predict/prod
Environment="PATH=/home/dolphin/siloqy_env/bin"
ExecStart=/home/dolphin/siloqy_env/bin/python3 \
    /mnt/dolphinng5_predict/prod/scan_bridge_service.py
Restart=always
RestartSec=5
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
```

**Prefect Health Check Flow:**

```python
import subprocess

from prefect import flow, get_run_logger

from dolphin_hz_utils import check_scan_freshness  # same helper as in Option B


def send_alert(message: str) -> None:
    """Placeholder: the real alerting channel is not defined in this study."""
    get_run_logger().critical(f"ALERT: {message}")


@flow(name="scan-bridge-health-check")
def scan_bridge_health_check():
    """Periodic health check for scan bridge (runs every 60s)."""
    logger = get_run_logger()

    # Check 1: Process running
    result = subprocess.run(
        ["systemctl", "is-active", "dolphin-scan-bridge"],
        capture_output=True
    )
    if result.returncode != 0:
        logger.error("❌ Scan bridge service DOWN")
        send_alert("Scan bridge service not active")
        return False

    # Check 2: Data freshness
    age_sec = check_scan_freshness()
    if age_sec > 60:
        logger.error(f"❌ Stale data detected (age: {age_sec}s)")
        send_alert(f"Scan data stale: {age_sec}s old")
        return False

    logger.info(f"✅ Healthy (data age: {age_sec}s)")
    return True
```

**Pros:**
- Industry-standard process management
- Automatic restart on crash
- Independent of Prefect availability

**Cons:**
- Requires root access to install the systemd unit
- Log aggregation separate from Prefect
- Two systems to manage

---

## 3. Comparative Analysis

| Criteria | Option A (Flow Task) | Option B (Prefect Daemon) | Option C (Systemd + Prefect) |
|----------|---------------------|---------------------------|------------------------------|
| **Complexity** | Low | Medium | Medium |
| **Auto-restart** | ❌ No | ✅ Yes | ✅ Yes (systemd) |
| **Centralized Logs** | ✅ Yes | ✅ Yes | ⚠️ Partial (journald) |
| **Prefect Integration** | ❌ Poor | ✅ Full | ⚠️ Monitoring only |
| **Resource Usage** | ❌ High (blocks worker) | ✅ Efficient | ✅ Efficient |
| **Restart Speed** | N/A | Up to ~30 s to detect (poll interval) + ~3 s restart | ~5 seconds (`RestartSec=5`) |
| **Root Required** | No | No | Yes |
| **Production Ready** | ❌ No | ✅ Yes | ✅ Yes |

---

## 4. Recommendation

### Primary: Option B - Prefect Daemon Service

**Rationale:**
1. **Unified orchestration** - Everything in Prefect (flows, logs, alerts)
2. **No root required** - Runs as dolphin user
3. **Auto-restart** - The manager flow restarts the bridge on failed health checks
4. **Health monitoring** - Built-in stale data detection

**Deployment Plan:**

```bash
# Assumes Prefect 2.10+ / 3.x, where `prefect deploy` with prefect.yaml replaces
# the older `prefect deployment build` / `prefect deployment apply` pair.

# 1. Create a process-type work pool for the daemon
prefect work-pool create dolphin-daemon-pool --type process

# 2. Configure as long-running (assumes prefect.yaml has no deployments: section yet)
cd /mnt/dolphinng5_predict/prod
cat >> prefect.yaml << 'EOF'
deployments:
  - name: scan-bridge-daemon
    entrypoint: scan_bridge_prefect_daemon.py:scan_bridge_daemon_flow
    work_pool:
      name: dolphin-daemon-pool
    parameters: {}
    # Long-running daemon settings: no schedule, the run is triggered once
    enforce_parameter_schema: false
    schedules: []
EOF

# 3. Deploy
prefect deploy --name scan-bridge-daemon

# 4. Start daemon worker
prefect worker start --pool dolphin-daemon-pool

# 5. Trigger the single long-running run (from another shell)
prefect deployment run 'scan-bridge-daemon/scan-bridge-daemon'
```

### Secondary: Option C - Systemd (if Prefect unstable)

If Prefect server experiences downtime, systemd ensures the bridge continues running.

---

## 5. Implementation Phases

### Phase 1: Immediate (Today)
- ✅ Created `scan_bridge_restart.sh` wrapper
- ✅ Created `dolphin-scan-bridge.service` systemd file
- Use manual script for now

### Phase 2: Prefect Integration (Next Sprint)
- [ ] Create `scan_bridge_prefect_daemon.py`
- [ ] Implement health check flow
- [ ] Set up daemon worker pool
- [ ] Deploy to Prefect
- [ ] Configure alerting

### Phase 3: Monitoring Hardening
- [ ] Dashboard for scan bridge metrics
- [ ] Alert on data staleness > 30s
- [ ] Log rotation strategy
- [ ] Performance metrics (lag from file write to Hz push)

---

## 6. Health Check Specifications

### Metrics to Monitor

| Metric | Warning | Critical | Action |
|--------|---------|----------|--------|
| Data age | > 30s | > 60s | Alert / Restart |
| Process CPU | > 50% | > 80% | Investigate |
| Memory | > 100MB | > 500MB | Restart |
| Hz connection | - | Failed | Restart |
| Files processed | < 1/min | < 1/5min | Alert |

### Alerting Rules

```python
ALERT_RULES = {
    "stale_data": {
        "condition": "hz_data_age > 60",
        "severity": "critical",
        "action": "restart_bridge",
        "notify": ["ops", "trading"]
    },
    "high_lag": {
        "condition": "file_to_hz_lag > 10",
        "severity": "warning",
        "action": "log_only",
        "notify": ["ops"]
    },
    "process_crash": {
        "condition": "process_exit_code != 0",
        "severity": "critical",
        "action": "auto_restart",
        "notify": ["ops"]
    }
}
```

---

## 7. Conclusion

The scan bridge **SHOULD** be integrated into Prefect orchestration using **Option B (Prefect Daemon)**. This provides:

1. **Automatic management** - Start, stop, restart handled by the Prefect manager flow
2. **Unified observability** - Logs, metrics, alerts in one place
3. **Self-healing** - Automatic restart on failure
4. **No root required** - Runs as dolphin user

**Next Steps:**
1. Implement `scan_bridge_prefect_daemon.py`
2. Create Prefect deployment
3. Add to SYSTEM_BIBLE v4.1

---

**Document:** SCAN_BRIDGE_PREFECT_INTEGRATION_STUDY.md
**Version:** 1.0
**Author:** DOLPHIN System Architecture
**Date:** 2026-03-24
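
The numeric thresholds in the Section 6 metrics table could be evaluated with a small lookup like the one sketched here. This is a minimal illustration under stated assumptions, not part of the existing codebase: the `Threshold` class, the metric keys (`data_age_s`, `cpu_pct`, `memory_mb`, `files_per_min`) and the `classify` function are hypothetical names. The Hz connection row is left out because it is a pass/fail check rather than a numeric threshold, and "< 1/5min" is expressed as 0.2 files per minute.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Threshold:
    """Warning/critical limits for one metric (hypothetical helper)."""
    warning: float
    critical: float
    higher_is_worse: bool = True  # False for rates where low values are bad


# Values copied from the "Metrics to Monitor" table; key names are assumptions.
THRESHOLDS = {
    "data_age_s": Threshold(warning=30, critical=60),
    "cpu_pct": Threshold(warning=50, critical=80),
    "memory_mb": Threshold(warning=100, critical=500),
    "files_per_min": Threshold(warning=1.0, critical=0.2, higher_is_worse=False),
}


def classify(metric: str, value: float) -> str:
    """Return 'ok', 'warning' or 'critical' for a single metric reading."""
    t = THRESHOLDS[metric]

    def breached(limit: float) -> bool:
        # Strict comparison, matching the table's "> 30s" / "< 1/min" wording
        return value > limit if t.higher_is_worse else value < limit

    if breached(t.critical):
        return "critical"
    if breached(t.warning):
        return "warning"
    return "ok"


if __name__ == "__main__":
    for name, reading in [("data_age_s", 45), ("memory_mb", 50), ("files_per_min", 0.1)]:
        print(f"{name}={reading}: {classify(name, reading)}")
```

Keeping the limits in one table-shaped structure means the health check flow and the alerting rules can share the same numbers instead of repeating literals such as `60` in several places.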