Files
DOLPHIN/prod/docs/SCAN_BRIDGE_PREFECT_INTEGRATION_STUDY.md
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

19 KiB
Executable File

Scan Bridge Prefect Integration Study

Date: 2026-03-24
Version: v1.0
Status: Analysis Complete - Recommendation: Hybrid Approach


Executive Summary

The Scan Bridge Service can be integrated into Prefect orchestration, but NOT as a standard flow task. Due to its continuous watchdog nature (file system monitoring), it requires special handling. The recommended approach is a hybrid architecture where the bridge runs as a standalone supervised service with Prefect providing health monitoring and automatic restart capabilities.


1. Current Architecture (Standalone)

┌─────────────────────────────────────────────────────────────────┐
│                  CURRENT: Standalone Service                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐      ┌─────────────────┐                   │
│  │  scan_bridge_   │─────▶│   Hazelcast     │                   │
│  │  service.py     │      │   (SSOT)        │                   │
│  │                 │      │                 │                   │
│  │  • watchdog     │      │  latest_eigen_  │                   │
│  │  • mtime-based  │      │  scan           │                   │
│  │  • continuous   │      │                 │                   │
│  └─────────────────┘      └─────────────────┘                   │
│           ▲                                                     │
│           │ watches                                             │
│  ┌────────┴─────────────────┐                                   │
│  │  /mnt/ng6_data/arrow_    │                                   │
│  │  scans/YYYY-MM-DD/*.     │                                   │
│  │  arrow                   │                                   │
│  └──────────────────────────┘                                   │
│                                                                 │
│  MANAGEMENT: Manual (./scan_bridge_restart.sh)                  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Current Issues

  • No automatic restart on crash
  • No health monitoring
  • No integration with system-wide orchestration
  • Manual log rotation

2. Integration Options Analysis

Option A: Prefect Flow Task (REJECTED)

Concept: Run scan bridge as a Prefect flow task

@flow
def scan_bridge_flow():
    while True:  # ← PROBLEM: Infinite loop in task
        scan_files()
        sleep(1)

Why Rejected:

Issue Explanation
Task Timeout Prefect tasks have default 3600s timeout
Worker Lock Blocks Prefect worker indefinitely
Resource Waste Prefect worker tied up doing file watching
Anti-pattern Prefect is for discrete workflows, not continuous daemons

Verdict: Not suitable


Concept: Use Prefect's infrastructure to manage the bridge as a long-running service

┌─────────────────────────────────────────────────────────────────┐
│              RECOMMENDED: Prefect-Supervised Daemon             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌──────────────────────────────────────────────────────┐      │
│  │         Prefect Server (localhost:4200)              │      │
│  │                                                      │      │
│  │  ┌────────────────┐    ┌─────────────────────────┐  │      │
│  │  │ Health Check   │───▶│  Scan Bridge Deployment │  │      │
│  │  │ Flow (30s)     │    │  (type: daemon)         │  │      │
│  │  └────────────────┘    └─────────────────────────┘  │      │
│  │           │                       │                  │      │
│  │           │ monitors              │ manages          │      │
│  │           ▼                       ▼                  │      │
│  │  ┌─────────────────────────────────────────────┐    │      │
│  │  │     scan_bridge_service.py process          │    │      │
│  │  │     • systemd/Prefect managed               │    │      │
│  │  │     • auto-restart on failure               │    │      │
│  │  │     • stdout/stderr to Prefect logs         │    │      │
│  │  └─────────────────────────────────────────────┘    │      │
│  └──────────────────────────────────────────────────────┘      │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Implementation:

# scan_bridge_prefect_daemon.py
from prefect import flow, task, get_run_logger
from prefect.runner import Runner
import subprocess
import time
import signal
import sys

DAEMON_CMD = [sys.executable, "/mnt/dolphinng5_predict/prod/scan_bridge_service.py"]

class ScanBridgeDaemon:
    def __init__(self):
        self.process = None
        self.logger = get_run_logger()
    
    def start(self):
        """Start the scan bridge daemon."""
        self.logger.info("Starting Scan Bridge daemon...")
        self.process = subprocess.Popen(
            DAEMON_CMD,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True
        )
        # Wait for startup confirmation
        time.sleep(2)
        if self.process.poll() is None:
            self.logger.info(f"✓ Daemon started (PID: {self.process.pid})")
            return True
        else:
            self.logger.error("✗ Daemon failed to start")
            return False
    
    def health_check(self) -> bool:
        """Check if daemon is healthy."""
        if self.process is None:
            return False
        
        # Check process is running
        if self.process.poll() is not None:
            self.logger.error(f"Daemon exited with code {self.process.poll()}")
            return False
        
        # Check Hazelcast for recent data
        from dolphin_hz_utils import check_scan_freshness
        try:
            age_sec = check_scan_freshness()
            if age_sec > 60:  # Data older than 60s
                self.logger.warning(f"Stale data detected (age: {age_sec}s)")
                return False
            return True
        except Exception as e:
            self.logger.error(f"Health check failed: {e}")
            return False
    
    def stop(self):
        """Stop the daemon gracefully."""
        if self.process and self.process.poll() is None:
            self.logger.info("Stopping daemon...")
            self.process.send_signal(signal.SIGTERM)
            self.process.wait(timeout=5)
            self.logger.info("✓ Daemon stopped")

# Global daemon instance
daemon = ScanBridgeDaemon()

@flow(name="scan-bridge-daemon")
def scan_bridge_daemon_flow():
    """
    Long-running Prefect flow that manages the scan bridge daemon.
    This flow runs indefinitely, monitoring and restarting the bridge as needed.
    """
    logger = get_run_logger()
    logger.info("=" * 60)
    logger.info("🐬 Scan Bridge Daemon Manager (Prefect)")
    logger.info("=" * 60)
    
    # Initial start
    if not daemon.start():
        raise RuntimeError("Failed to start daemon")
    
    try:
        while True:
            # Health check every 30 seconds
            time.sleep(30)
            
            if not daemon.health_check():
                logger.warning("Health check failed, restarting daemon...")
                daemon.stop()
                time.sleep(1)
                if daemon.start():
                    logger.info("✓ Daemon restarted")
                else:
                    logger.error("✗ Failed to restart daemon")
                    raise RuntimeError("Daemon restart failed")
            else:
                logger.debug("Health check passed")
                
    except KeyboardInterrupt:
        logger.info("Shutting down...")
    finally:
        daemon.stop()

if __name__ == "__main__":
    # Deploy as long-running daemon
    scan_bridge_daemon_flow()

Pros:

Advantage Description
Auto-restart Prefect manages process lifecycle
Centralized Logs Bridge logs in Prefect UI
Health Monitoring Automatic detection of stale data
Integration Part of overall orchestration

Cons:

Disadvantage Mitigation
Requires Prefect worker Use dedicated worker pool
Flow never completes Mark as "daemon" deployment type

Option C: Systemd Service with Prefect Monitoring (ALTERNATIVE)

Concept: Use systemd for process management, Prefect for health checks

┌─────────────────────────────────────────────────────────────────┐
│           ALTERNATIVE: Systemd + Prefect Monitoring             │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  ┌─────────────────┐      ┌─────────────────┐                   │
│  │     systemd     │      │    Prefect      │                   │
│  │                 │      │    Server       │                   │
│  │  ┌───────────┐  │      │                 │                   │
│  │  │ scan-bridge│◀─┼──────┤  Health Check   │                   │
│  │  │  service  │  │      │  Flow (60s)     │                   │
│  │  │  (auto-   │  │      │                 │                   │
│  │  │  restart) │  │      │  Alerts on:     │                   │
│  │  └───────────┘  │      │  • stale data   │                   │
│  │       │         │      │  • process down │                   │
│  │       ▼         │      │                 │                   │
│  │  ┌───────────┐  │      │                 │                   │
│  │  │  journald │──┼──────┤  Log ingestion  │                   │
│  │  │  (logs)   │  │      │                 │                   │
│  │  └───────────┘  │      │                 │                   │
│  └─────────────────┘      └─────────────────┘                   │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Systemd Service:

# /etc/systemd/system/dolphin-scan-bridge.service
[Unit]
Description=DOLPHIN Scan Bridge Service
After=network.target hazelcast.service
Wants=hazelcast.service

[Service]
Type=simple
User=dolphin
Group=dolphin
WorkingDirectory=/mnt/dolphinng5_predict/prod
Environment="PATH=/home/dolphin/siloqy_env/bin"
ExecStart=/home/dolphin/siloqy_env/bin/python3 \
          /mnt/dolphinng5_predict/prod/scan_bridge_service.py
Restart=always
RestartSec=5
StartLimitInterval=60s
StartLimitBurst=3
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

Prefect Health Check Flow:

@flow(name="scan-bridge-health-check")
def scan_bridge_health_check():
    """Periodic health check for scan bridge (runs every 60s)."""
    logger = get_run_logger()
    
    # Check 1: Process running
    result = subprocess.run(
        ["systemctl", "is-active", "dolphin-scan-bridge"],
        capture_output=True
    )
    if result.returncode != 0:
        logger.error("❌ Scan bridge service DOWN")
        send_alert("Scan bridge service not active")
        return False
    
    # Check 2: Data freshness
    age_sec = check_hz_scan_freshness()
    if age_sec > 60:
        logger.error(f"❌ Stale data detected (age: {age_sec}s)")
        send_alert(f"Scan data stale: {age_sec}s old")
        return False
    
    logger.info(f"✅ Healthy (data age: {age_sec}s)")
    return True

Pros:

  • Industry-standard process management
  • Automatic restart on crash
  • Independent of Prefect availability

Cons:

  • Requires root access for systemd
  • Log aggregation separate from Prefect
  • Two systems to manage

3. Comparative Analysis

Criteria Option A (Flow Task) Option B (Prefect Daemon) Option C (Systemd + Prefect)
Complexity Low Medium Medium
Auto-restart No Yes Yes (systemd)
Centralized Logs Yes Yes ⚠️ Partial (journald)
Prefect Integration Poor Full ⚠️ Monitoring only
Resource Usage High (blocks worker) Efficient Efficient
Restart Speed N/A ~5 seconds ~5 seconds
Root Required No No Yes
Production Ready No Yes Yes

4. Recommendation

Primary: Option B - Prefect Daemon Service

Rationale:

  1. Unified orchestration - Everything in Prefect (flows, logs, alerts)
  2. No root required - Runs as dolphin user
  3. Auto-restart - Prefect manages lifecycle
  4. Health monitoring - Built-in stale data detection

Deployment Plan:

# 1. Create deployment
cd /mnt/dolphinng5_predict/prod
prefect deployment build \
    scan_bridge_prefect_daemon.py:scan_bridge_daemon_flow \
    --name "scan-bridge-daemon" \
    --pool dolphin-daemon-pool \
    --type process

# 2. Configure as long-running
cat >> prefect.yaml << 'EOF'
deployments:
  - name: scan-bridge-daemon
    entrypoint: scan_bridge_prefect_daemon.py:scan_bridge_daemon_flow
    work_pool:
      name: dolphin-daemon-pool
    parameters: {}
    # Long-running daemon settings
    enforce_parameter_schema: false
    schedules: []
    is_schedule_active: true
EOF

# 3. Deploy
prefect deployment apply scan_bridge_daemon-deployment.yaml

# 4. Start daemon worker
prefect worker start --pool dolphin-daemon-pool

Secondary: Option C - Systemd (if Prefect unstable)

If Prefect server experiences downtime, systemd ensures the bridge continues running.


5. Implementation Phases

Phase 1: Immediate (Today)

  • Created scan_bridge_restart.sh wrapper
  • Created dolphin-scan-bridge.service systemd file
  • Use manual script for now

Phase 2: Prefect Integration (Next Sprint)

  • Create scan_bridge_prefect_daemon.py
  • Implement health check flow
  • Set up daemon worker pool
  • Deploy to Prefect
  • Configure alerting

Phase 3: Monitoring Hardening

  • Dashboard for scan bridge metrics
  • Alert on data staleness > 30s
  • Log rotation strategy
  • Performance metrics (lag from file write to Hz push)

6. Health Check Specifications

Metrics to Monitor

Metric Warning Critical Action
Data age > 30s > 60s Alert / Restart
Process CPU > 50% > 80% Investigate
Memory > 100MB > 500MB Restart
Hz connection - Failed Restart
Files processed < 1/min < 1/5min Alert

Alerting Rules

ALERT_RULES = {
    "stale_data": {
        "condition": "hz_data_age > 60",
        "severity": "critical",
        "action": "restart_bridge",
        "notify": ["ops", "trading"]
    },
    "high_lag": {
        "condition": "file_to_hz_lag > 10",
        "severity": "warning",
        "action": "log_only",
        "notify": ["ops"]
    },
    "process_crash": {
        "condition": "process_exit_code != 0",
        "severity": "critical",
        "action": "auto_restart",
        "notify": ["ops"]
    }
}

7. Conclusion

The scan bridge SHOULD be integrated into Prefect orchestration using Option B (Prefect Daemon). This provides:

  1. Automatic management - Start, stop, restart handled by Prefect
  2. Unified observability - Logs, metrics, alerts in one place
  3. Self-healing - Automatic restart on failure
  4. No root required - Runs as dolphin user

Next Steps:

  1. Implement scan_bridge_prefect_daemon.py
  2. Create Prefect deployment
  3. Add to SYSTEM_BIBLE v4.1

Document: SCAN_BRIDGE_PREFECT_INTEGRATION_STUDY.md
Version: 1.0
Author: DOLPHIN System Architecture
Date: 2026-03-24