Files
DOLPHIN/prod/exf_simple_production.py

94 lines
3.1 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
DOLPHIN ExF Production - Prefect Managed
==========================================
No Hazelcast client serialization - direct calls only.
"""
from prefect import flow, get_run_logger
from datetime import datetime, timezone
import sys
from pathlib import Path
_HERE = Path(__file__).parent
sys.path.insert(0, str(_HERE))
sys.path.insert(0, str(_HERE.parent / "external_factors"))
@flow(name="exf-production-v2", log_prints=True)
def exf_flow(warmup_s: int = 25):
from realtime_exf_service import RealTimeExFService
from exf_persistence import ExFPersistenceService
from _hz_push import make_hz_client
import json
import time
log = get_run_logger()
# Init services
svc = RealTimeExFService()
svc.start()
time.sleep(warmup_s)
persist = ExFPersistenceService()
persist.start()
# Connect HZ once
client = make_hz_client()
ACB_KEYS = ["funding_btc","funding_eth","dvol_btc","dvol_eth","fng","vix","ls_btc","taker","oi_btc"]
pushes = 0
last_log = 0
log.info("=== EXF PRODUCTION STARTED ===")
try:
while True:
t0 = time.monotonic()
# Fetch
indicators = svc.get_indicators(dual_sample=True)
stale = indicators.pop("_staleness", {})
# Build payload
payload = {k: v for k, v in indicators.items() if isinstance(v, (int, float, str, bool))}
payload["_staleness_s"] = {k: round(v, 1) for k, v in stale.items() if isinstance(v, (int, float))}
# ACB check
acb_ok = [k for k in ACB_KEYS if payload.get(k) is not None and payload[k] == payload[k]]
payload["_acb_ready"] = len(acb_ok) == len(ACB_KEYS)
payload["_acb_present"] = f"{len(acb_ok)}/{len(ACB_KEYS)}"
payload["_acb_missing"] = [k for k in ACB_KEYS if k not in acb_ok]
payload["_timestamp"] = datetime.now(timezone.utc).isoformat()
payload["_pushed_at"] = datetime.now(timezone.utc).isoformat()
# Push to HZ - DIRECT CALL (not as Prefect task)
try:
client.get_map("DOLPHIN_FEATURES").blocking().put("exf_latest", json.dumps(payload))
pushes += 1
except Exception as e:
log.warning(f"HZ push failed: {e}")
# Persist
persist.update_snapshot(payload)
# Log every 60s
if time.time() - last_log > 60:
stats = persist.get_stats()
log.info(f"Pushes: {pushes}, ACB: {payload['_acb_present']}, Files: {stats.get('files_written', 0)}")
last_log = time.time()
# 0.5s interval
elapsed = time.monotonic() - t0
time.sleep(max(0.0, 0.5 - elapsed))
except KeyboardInterrupt:
log.info("Shutdown")
finally:
svc.stop()
persist.stop()
client.shutdown()
log.info(f"Total: {pushes}")
if __name__ == "__main__":
exf_flow()