Files
DOLPHIN/prod/deploy_services_prefect.py

140 lines
4.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
DOLPHIN Services Prefect Deployment
====================================
Deploy all individual services to Prefect.
Usage:
python deploy_services_prefect.py [deploy|status|start|stop]
"""
import subprocess
import sys
import argparse
SERVICES = [
('scan-bridge', 'prefect_services/scan_bridge_prefect.py:scan_bridge_daemon_flow'),
('acb-service', 'prefect_services/acb_service_prefect.py:acb_service_flow'),
('watchdog-service', 'prefect_services/watchdog_service_prefect.py:watchdog_service_flow'),
]
POOL_NAME = "dolphin-services"
def run_cmd(cmd, check=True):
"""Run a command."""
print(f"$ {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout:
print(result.stdout)
if result.stderr and check:
print(result.stderr, file=sys.stderr)
return result
def deploy():
"""Deploy all services."""
print("🚀 Deploying DOLPHIN Services to Prefect")
print("=" * 60)
# Create pool if needed
result = run_cmd(['prefect', 'work-pool', 'ls'], check=False)
if POOL_NAME not in result.stdout:
print(f"\n📦 Creating work pool: {POOL_NAME}")
run_cmd(['prefect', 'work-pool', 'create', POOL_NAME, '--type', 'process'])
# Deploy each service
for name, entrypoint in SERVICES:
print(f"\n🔨 Deploying {name}...")
yaml_file = f"{name.replace('-', '_')}_deployment.yaml"
# Build
run_cmd([
'prefect', 'deployment', 'build',
entrypoint,
'--name', name,
'--pool', POOL_NAME,
'--output', yaml_file
])
# Apply
run_cmd(['prefect', 'deployment', 'apply', yaml_file])
print("\n" + "=" * 60)
print("✅ All services deployed!")
print(f"\nStart worker with:")
print(f" prefect worker start --pool {POOL_NAME}")
def status():
"""Check service status."""
print("📊 DOLPHIN Services Status")
print("=" * 60)
# Check deployments
print("\nPrefect Deployments:")
run_cmd(['prefect', 'deployment', 'ls'])
# Check running processes
print("\nRunning Processes:")
result = subprocess.run(
['pgrep', '-a', '-f', 'scan_bridge|acb_processor|system_watchdog|exf_prefect|obf_prefect'],
capture_output=True, text=True
)
if result.stdout:
for line in result.stdout.strip().split('\n'):
if 'grep' not in line:
print(f" {line}")
else:
print(" No services running")
# Check Hz data
print("\nHazelcast Data:")
try:
import hazelcast
client = hazelcast.HazelcastClient(
cluster_name="dolphin",
cluster_members=["127.0.0.1:5701"],
)
features = client.get_map('DOLPHIN_FEATURES').blocking()
safety = client.get_map('DOLPHIN_SAFETY').blocking()
checks = [
('latest_eigen_scan', features.get('latest_eigen_scan') is not None),
('acb_boost', features.get('acb_boost') is not None),
('exf_latest', features.get('exf_latest') is not None),
('safety_latest', safety.get('latest') is not None),
]
for name, present in checks:
icon = "" if present else ""
print(f" {icon} {name}")
client.shutdown()
except Exception as e:
print(f" Error: {e}")
def main():
parser = argparse.ArgumentParser(description="Manage DOLPHIN services in Prefect")
parser.add_argument('action', choices=['deploy', 'status', 'start', 'stop'],
default='status', nargs='?')
args = parser.parse_args()
if args.action == 'deploy':
deploy()
elif args.action == 'status':
status()
elif args.action == 'start':
print("Start the Prefect worker:")
print(f" prefect worker start --pool {POOL_NAME}")
elif args.action == 'stop':
print("To stop services:")
print(" pkill -f 'scan_bridge|acb_processor|system_watchdog'")
print("Or stop the Prefect worker with Ctrl+C")
if __name__ == "__main__":
main()