#!/usr/bin/env python3
"""
Deploy ExF Fetcher as Prefect flow

Usage:
    python exf_prefect_deploy.py deploy   # Create deployment
    python exf_prefect_deploy.py run      # Run immediately
"""

import sys
import subprocess
from pathlib import Path

# Make sibling modules and the external_factors package importable; the flow
# imports realtime_exf_service / exf_persistence / prod._hz_push lazily.
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent.parent / "external_factors"))

from prefect import flow, task, get_run_logger
from datetime import datetime, timezone

HZ_PUSH_INTERVAL_S = 0.5          # target period of the push loop, in seconds
HZ_KEY = "exf_latest"             # map key the snapshot is written under
HZ_MAP_NAME = "DOLPHIN_FEATURES"  # Hazelcast map that receives the snapshot
STATUS_LOG_INTERVAL_S = 60        # minimum spacing between status log lines
PROD_DIR = "/mnt/dolphinng5_predict/prod"  # working dir for the prefect CLI

# Indicators that must all be present (numeric, non-NaN) for `_acb_ready`.
ACB_KEYS = frozenset([
    "funding_btc", "funding_eth", "dvol_btc", "dvol_eth",
    "fng", "vix", "ls_btc", "taker", "oi_btc",
])


@task(name="push_to_hz")
def push_to_hz(client, key: str, payload: dict) -> bool:
    """Write *payload* as a JSON string into the Hazelcast map under *key*.

    A copy of the payload is stamped with ``_pushed_at`` (UTC ISO-8601) before
    serialisation, so the caller's dict is left untouched.

    Returns:
        True when the put succeeded, False when any exception occurred. The
        failure is logged here because the caller only sees the boolean.
    """
    import json

    try:
        stamped = dict(payload)
        stamped["_pushed_at"] = datetime.now(timezone.utc).isoformat()
        client.get_map(HZ_MAP_NAME).blocking().put(key, json.dumps(stamped))
        return True
    except Exception as e:
        # Best-effort push: report the reason instead of swallowing it.
        get_run_logger().warning(f"Hazelcast put failed for key {key!r}: {e}")
        return False


def _build_payload(indicators: dict) -> dict:
    """Turn a raw indicator dict into the snapshot that is pushed/persisted.

    Removes ``_staleness`` from *indicators* (the dict is mutated), keeps only
    scalar values, and adds the ``_staleness_s``, ``_acb_*``, ``_ok_count`` and
    ``_timestamp`` bookkeeping fields.
    """
    staleness = indicators.pop("_staleness", {}) or {}

    payload = {
        k: v for k, v in indicators.items()
        if isinstance(v, (int, float, str, bool))
    }
    payload["_staleness_s"] = {
        k: round(v, 1) for k, v in staleness.items()
        if isinstance(v, (int, float))
    }

    # `v == v` is False only for NaN, so NaN readings count as missing.
    acb_present = {
        k for k in ACB_KEYS
        if isinstance(payload.get(k), (int, float)) and payload[k] == payload[k]
    }
    payload["_acb_ready"] = len(acb_present) == len(ACB_KEYS)
    payload["_acb_present"] = f"{len(acb_present)}/{len(ACB_KEYS)}"
    # Sorted so the list is stable between snapshots (set order is arbitrary).
    payload["_acb_missing"] = sorted(ACB_KEYS - acb_present)
    payload["_ok_count"] = sum(1 for k in payload if not k.startswith('_'))
    payload["_timestamp"] = datetime.now(timezone.utc).isoformat()
    return payload


def _stop_quietly(log, label: str, stop_fn) -> None:
    """Call *stop_fn*, logging (not raising) any error so later cleanup still runs."""
    try:
        stop_fn()
    except Exception as e:
        log.warning(f"Error stopping {label}: {e}")


@flow(name="exf-fetcher-v2", log_prints=True)
def exf_fetcher_flow(warmup_s: int = 30):
    """Run the ExF fetch service and push its indicators to Hazelcast in a loop.

    Starts the real-time service, waits *warmup_s* seconds, starts the
    persistence service, connects to Hazelcast, then every
    ``HZ_PUSH_INTERVAL_S`` seconds builds a snapshot, pushes it, and hands it
    to the persistence service. The loop runs until interrupted or an
    unhandled exception occurs; everything that was created is stopped on exit.

    Args:
        warmup_s: Seconds to sleep after starting the service before the
            first snapshot is taken.
    """
    from realtime_exf_service import RealTimeExFService
    from exf_persistence import ExFPersistenceService
    from prod._hz_push import make_hz_client
    import time

    log = get_run_logger()

    svc = None
    persist = None
    client = None
    push_count = 0
    last_status_at = None  # monotonic time of the last status line

    # Startup lives inside the try so a failure part-way through (or Ctrl-C
    # during warmup) still stops whatever has already been created.
    try:
        svc = RealTimeExFService()
        svc.start()
        log.info(f"Service started — warmup {warmup_s}s")
        time.sleep(warmup_s)

        persist = ExFPersistenceService(flush_interval_s=300)
        persist.start()
        log.info("Persistence started")

        client = make_hz_client()
        log.info("Hazelcast connected")

        log.info(f"Loop started — pushing every {HZ_PUSH_INTERVAL_S}s")

        while True:
            t0 = time.monotonic()

            payload = _build_payload(svc.get_indicators(dual_sample=True))

            try:
                pushed = push_to_hz.submit(client, HZ_KEY, payload).result(timeout=2.0)
            except Exception as e:
                log.warning(f"Push failed: {e}")
            else:
                # push_to_hz reports its own failures by returning False.
                if pushed:
                    push_count += 1

            persist.update_snapshot(payload)

            now = time.monotonic()
            if last_status_at is None or now - last_status_at > STATUS_LOG_INTERVAL_S:
                log.info(f"Status | pushes={push_count} | acb={payload['_acb_present']} ready={payload['_acb_ready']}")
                last_status_at = now

            # Sleep only for what is left of the interval after this iteration.
            elapsed = time.monotonic() - t0
            time.sleep(max(0.0, HZ_PUSH_INTERVAL_S - elapsed))

    except KeyboardInterrupt:
        log.info("Interrupted")
    finally:
        # Stop each component independently, in the original order.
        if svc is not None:
            _stop_quietly(log, "service", svc.stop)
        if persist is not None:
            _stop_quietly(log, "persistence", persist.stop)
        if client is not None:
            _stop_quietly(log, "Hazelcast client", client.shutdown)
        log.info(f"Stopped. Pushes: {push_count}")


def deploy():
    """Create Prefect deployment.

    Shells out to the ``prefect`` CLI twice from ``PROD_DIR``: first to build
    the deployment YAML for ``exf_fetcher_flow``, then to apply it. Raises
    ``subprocess.CalledProcessError`` if either command exits non-zero.
    """
    # NOTE(review): the flow loops forever, and the cron schedule below is
    # meant to restart it "if not already running" — confirm overlapping runs
    # are actually prevented (e.g. by a concurrency limit on the pool).
    subprocess.run([
        "prefect", "deployment", "build",
        "exf_prefect_deploy.py:exf_fetcher_flow",
        "--name", "exf-live",
        "--pool", "dolphin",
        "--cron", "*/5 * * * *",  # Run every 5 min if not already running
    ], cwd=PROD_DIR, check=True)

    subprocess.run([
        "prefect", "deployment", "apply",
        "exf_prefect_deploy-deployment.yaml"
    ], cwd=PROD_DIR, check=True)

    print("✓ Deployment created")


def run():
    """Run flow immediately."""
    exf_fetcher_flow(warmup_s=20)


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument("command", choices=["deploy", "run"])
    args = parser.parse_args()

    if args.command == "deploy":
        deploy()
    else:
        run()