Files
DOLPHIN/prod/exf_prefect_deploy.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

143 lines
4.6 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Deploy ExF Fetcher as Prefect flow
Usage:
    python exf_prefect_deploy.py deploy  # Create deployment
    python exf_prefect_deploy.py run     # Run immediately
"""
import sys
import subprocess
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent))
sys.path.insert(0, str(Path(__file__).parent.parent / "external_factors"))
from prefect import flow, task, get_run_logger
from datetime import datetime, timezone
# Target period, in seconds, of one iteration of the fetch/push loop.
HZ_PUSH_INTERVAL_S = 0.5

# Key under which the latest snapshot is stored in the Hazelcast map.
HZ_KEY = "exf_latest"

# Name of the Hazelcast map that receives the snapshot.
HZ_MAP_NAME = "DOLPHIN_FEATURES"

# Indicators that must all carry a real numeric value before a payload is
# flagged with ``_acb_ready = True``.
ACB_KEYS = frozenset(
    {
        "funding_btc",
        "funding_eth",
        "dvol_btc",
        "dvol_eth",
        "fng",
        "vix",
        "ls_btc",
        "taker",
        "oi_btc",
    }
)
@task(name="push_to_hz")
def push_to_hz(client, key: str, payload: dict) -> bool:
    """Write one JSON-encoded snapshot into the Hazelcast map (best effort).

    A shallow copy of ``payload`` is stamped with ``_pushed_at`` (current UTC
    time, ISO-8601), serialised with ``json.dumps`` and stored under ``key``
    in the map named ``HZ_MAP_NAME`` using a blocking ``put``.

    Args:
        client: Hazelcast client exposing ``get_map(name).blocking().put``.
        key: Map key to write.
        payload: Snapshot to publish; the caller's dict is not modified.

    Returns:
        True when the put completed, False when any exception occurred.
        Failures are logged as warnings and never raised.
    """
    import json
    import logging

    try:
        stamped = dict(payload)  # copy so the caller's dict stays untouched
        stamped["_pushed_at"] = datetime.now(timezone.utc).isoformat()
        client.get_map(HZ_MAP_NAME).blocking().put(key, json.dumps(stamped))
        return True
    except Exception as exc:
        # Still best-effort, but report the reason instead of dropping it.
        # get_run_logger() raises when there is no active Prefect run context
        # (e.g. the function is called directly), so fall back to stdlib
        # logging in that case.
        try:
            log = get_run_logger()
        except Exception:
            log = logging.getLogger(__name__)
        log.warning("Hazelcast push failed for key %r: %s", key, exc)
        return False
def _build_exf_payload(indicators: dict) -> dict:
    """Turn one indicator snapshot into the flat payload that gets published.

    Only ``int``/``float``/``str``/``bool`` indicator values are kept. The
    ``_staleness`` entry is not copied as-is; its numeric values are rounded
    to one decimal and stored under ``_staleness_s``. The ``_acb_*`` fields
    summarise which ``ACB_KEYS`` carry a real (non-NaN) number. The input
    dict is not modified.
    """
    staleness = indicators.get("_staleness")
    if not isinstance(staleness, dict):
        # Missing or malformed staleness info must not stop the loop.
        staleness = {}

    payload = {
        name: value
        for name, value in indicators.items()
        if name != "_staleness" and isinstance(value, (int, float, str, bool))
    }
    payload["_staleness_s"] = {
        name: round(age, 1)
        for name, age in staleness.items()
        if isinstance(age, (int, float))
    }

    # value == value is False only for NaN, so NaN readings count as missing.
    acb_present = {
        name
        for name in ACB_KEYS
        if isinstance(payload.get(name), (int, float))
        and payload[name] == payload[name]
    }
    payload["_acb_ready"] = len(acb_present) == len(ACB_KEYS)
    payload["_acb_present"] = f"{len(acb_present)}/{len(ACB_KEYS)}"
    # Sorted so the published list is deterministic across iterations.
    payload["_acb_missing"] = sorted(ACB_KEYS - acb_present)
    # Every key added above starts with "_", so this counts indicators only.
    payload["_ok_count"] = sum(1 for name in payload if not name.startswith("_"))
    payload["_timestamp"] = datetime.now(timezone.utc).isoformat()
    return payload


@flow(name="exf-fetcher-v2", log_prints=True)
def exf_fetcher_flow(warmup_s: int = 30):
    """Run the ExF fetch -> Hazelcast push -> persistence loop until stopped.

    Starts ``RealTimeExFService``, waits ``warmup_s`` seconds, starts
    ``ExFPersistenceService`` and connects to Hazelcast. Then, roughly every
    ``HZ_PUSH_INTERVAL_S`` seconds, it reads the indicators, builds the
    payload, pushes it under ``HZ_KEY`` and hands it to the persistence
    service. A status line is logged at most once per minute.

    The loop only ends on ``KeyboardInterrupt`` or an unhandled exception.
    In both cases every component that was successfully started is stopped,
    including when startup itself fails part-way.

    Args:
        warmup_s: Seconds to wait after starting the service before the
            first read. Negative values are treated as 0.
    """
    from realtime_exf_service import RealTimeExFService
    from exf_persistence import ExFPersistenceService
    from prod._hz_push import make_hz_client
    import time

    log = get_run_logger()
    status_every_s = 60
    push_timeout_s = 2.0
    cleanups = []  # (label, stop_callable) in start order
    push_count = 0

    try:
        svc = RealTimeExFService()
        svc.start()
        cleanups.append(("ExF service", svc.stop))
        log.info(f"Service started — warmup {warmup_s}s")
        time.sleep(max(0, warmup_s))

        persist = ExFPersistenceService(flush_interval_s=300)
        persist.start()
        cleanups.append(("persistence", persist.stop))
        log.info("Persistence started")

        client = make_hz_client()
        cleanups.append(("Hazelcast client", client.shutdown))
        log.info("Hazelcast connected")

        # -inf makes the first iteration emit a status line immediately.
        last_status = float("-inf")
        log.info(f"Loop started — pushing every {HZ_PUSH_INTERVAL_S}s")

        while True:
            t0 = time.monotonic()
            payload = _build_exf_payload(svc.get_indicators(dual_sample=True))

            try:
                pushed = push_to_hz.submit(client, HZ_KEY, payload).result(
                    timeout=push_timeout_s
                )
            except Exception as e:
                log.warning(f"Push failed: {e}")
            else:
                # push_to_hz reports its own errors by returning False, so
                # only a truthy result counts as a completed push.
                if pushed:
                    push_count += 1
                else:
                    log.warning("Push failed: push_to_hz reported failure")

            persist.update_snapshot(payload)

            # Monotonic clock: wall-clock jumps must not affect the throttle.
            now = time.monotonic()
            if now - last_status > status_every_s:
                log.info(f"Status | pushes={push_count} | acb={payload['_acb_present']} ready={payload['_acb_ready']}")
                last_status = now

            # Sleep only for what is left of the interval.
            elapsed = time.monotonic() - t0
            time.sleep(max(0.0, HZ_PUSH_INTERVAL_S - elapsed))
    except KeyboardInterrupt:
        log.info("Interrupted")
    finally:
        # Stop each started component independently so that one failing
        # stop call cannot prevent the others from being released.
        for label, stop in cleanups:
            try:
                stop()
            except Exception as e:
                log.warning(f"Failed to stop {label}: {e}")
        log.info(f"Stopped. Pushes: {push_count}")
def deploy():
    """Build and then apply the Prefect deployment for ``exf_fetcher_flow``.

    Runs ``prefect deployment build`` followed by ``prefect deployment apply``
    as subprocesses inside the prod directory. Either command exiting
    non-zero raises ``subprocess.CalledProcessError`` (``check=True``).
    """
    prod_dir = "/mnt/dolphinng5_predict/prod"

    # Original intent of the schedule: "run every 5 min if not already
    # running". NOTE(review): nothing in this file prevents overlapping runs
    # of the never-ending flow; confirm a concurrency limit exists elsewhere.
    build_cmd = [
        "prefect", "deployment", "build",
        "exf_prefect_deploy.py:exf_fetcher_flow",
        "--name", "exf-live",
        "--pool", "dolphin",
        "--cron", "*/5 * * * *",
    ]
    apply_cmd = [
        "prefect", "deployment", "apply",
        "exf_prefect_deploy-deployment.yaml",
    ]

    for command in (build_cmd, apply_cmd):
        subprocess.run(command, cwd=prod_dir, check=True)

    print("✓ Deployment created")
def run():
    """Execute the fetcher flow right away, using a 20-second warmup."""
    warmup_seconds = 20
    exf_fetcher_flow(warmup_s=warmup_seconds)
if __name__ == "__main__":
    import argparse

    # One handler per CLI sub-command; the keys double as the allowed choices.
    handlers = {"deploy": deploy, "run": run}

    cli = argparse.ArgumentParser()
    cli.add_argument("command", choices=list(handlers))
    handlers[cli.parse_args().command]()