#!/usr/bin/env python3 """ DOLPHIN Scan Bridge Prefect Deployment Script ============================================== Deploy the scan bridge daemon to Prefect. Usage: python scan_bridge_deploy.py [create|start|stop|status] """ import sys import subprocess from pathlib import Path PROJECT_ROOT = Path(__file__).parent.parent sys.path.insert(0, str(PROJECT_ROOT)) sys.path.insert(0, str(PROJECT_ROOT / 'nautilus_dolphin')) DAEMON_POOL = "dolphin-daemon-pool" DAEMON_NAME = "scan-bridge-daemon" DAEMON_FILE = "prod/scan_bridge_prefect_daemon.py" def run_cmd(cmd: list, check: bool = True) -> subprocess.CompletedProcess: """Run a command and return result.""" print(f"$ {' '.join(cmd)}") return subprocess.run(cmd, check=check, capture_output=True, text=True) def create_deployment(): """Create the Prefect deployment.""" print("=" * 60) print("Creating Scan Bridge Prefect Deployment") print("=" * 60) # Check if pool exists result = run_cmd(["prefect", "work-pool", "ls"], check=False) if DAEMON_POOL not in result.stdout: print(f"\nšŸ“¦ Creating work pool: {DAEMON_POOL}") run_cmd(["prefect", "work-pool", "create", DAEMON_POOL, "--type", "process"]) else: print(f"\nāœ… Work pool exists: {DAEMON_POOL}") # Build deployment print(f"\nšŸ”Ø Building deployment...") cmd = [ "prefect", "deployment", "build", f"{DAEMON_FILE}:scan_bridge_daemon_flow", "--name", DAEMON_NAME, "--pool", DAEMON_POOL, "--output", "scan-bridge-daemon-deployment.yaml" ] run_cmd(cmd) # Apply deployment print(f"\nšŸš€ Applying deployment...") run_cmd(["prefect", "deployment", "apply", "scan-bridge-daemon-deployment.yaml"]) print("\n" + "=" * 60) print("āœ… Deployment created!") print("=" * 60) print(f"\nTo start the daemon:") print(f" python scan_bridge_deploy.py start") print(f"\nOr manually:") print(f" prefect deployment run 'scan-bridge-daemon/scan-bridge-daemon'") print(f" prefect worker start --pool {DAEMON_POOL}") def start_worker(): """Start the Prefect worker for the daemon pool.""" print("=" * 60) print("Starting Prefect Worker") print("=" * 60) print(f"\nPool: {DAEMON_POOL}") print("Press Ctrl+C to stop\n") try: subprocess.run(["prefect", "worker", "start", "--pool", DAEMON_POOL]) except KeyboardInterrupt: print("\n\nšŸ‘‹ Worker stopped") def check_status(): """Check deployment status.""" print("=" * 60) print("Scan Bridge Prefect Status") print("=" * 60) # List deployments print("\nšŸ“‹ Deployments:") result = run_cmd(["prefect", "deployment", "ls"], check=False) if DAEMON_NAME in result.stdout: for line in result.stdout.split('\n'): if DAEMON_NAME in line: print(f" {line}") else: print(" āŒ No deployment found") # List work pools print("\nšŸŠ Work Pools:") result = run_cmd(["prefect", "work-pool", "ls"], check=False) if DAEMON_POOL in result.stdout: for line in result.stdout.split('\n'): if DAEMON_POOL in line: print(f" {line}") else: print(f" āŒ Pool '{DAEMON_POOL}' not found") # Check flow runs print("\nšŸ”„ Recent Flow Runs:") result = run_cmd(["prefect", "flow-run", "ls", "--limit", "5"], check=False) if result.stdout: print(result.stdout) def quick_health_check(): """Run standalone health check.""" print("=" * 60) print("Scan Bridge Health Check") print("=" * 60) import sys sys.path.insert(0, '/mnt/dolphinng5_predict/prod') from scan_bridge_prefect_daemon import quick_health_check as check result = check() print("\n" + "=" * 60) status = result.get("status", "unknown") age = result.get("age_sec", 0) if status == "healthy": print(f"āœ… HEALTHY: Data age {age:.0f}s") elif status == "warning": print(f"āš ļø WARNING: Data age {age:.0f}s") elif status == "stale": print(f"āŒ STALE: Data age {age:.0f}s") else: print(f"āŒ ERROR: {status}") def main(): import argparse parser = argparse.ArgumentParser( description="Deploy and manage Scan Bridge Prefect daemon" ) parser.add_argument( "action", choices=["create", "start", "stop", "status", "health"], default="status", nargs="?", help="Action to perform" ) args = parser.parse_args() if args.action == "create": create_deployment() elif args.action == "start": start_worker() elif args.action == "status": check_status() elif args.action == "health": quick_health_check() elif args.action == "stop": print("To stop: Press Ctrl+C in the worker terminal") print("Or: pkill -f 'prefect worker'") else: parser.print_help() if __name__ == "__main__": main()