Files
DOLPHIN/prod/esof_prefect_flow.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

267 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
DOLPHIN — EsoF (Esoteric Factors) Prefect Flow
================================================
Prefect-governed long-running daemon that wraps EsotericFactorsService
and streams its 5-second state snapshot to Hazelcast.
Architecture:
- Local astropy / math calculations — NO external APIs, NO rate limits
- EsotericFactorsService polls at POLL_INTERVAL_S; <1ms retrieval via get_latest()
- 6-hour TTL cache for expensive astro computations (moon, mercury)
- HZ key: DOLPHIN_FEATURES['esof_latest']
- Local JSON: external_factors/eso_cache/latest_esoteric_factors.json
(atomic tmp→rename; legacy fallback for disk-readers)
Timing rationale:
HZ_PUSH_INTERVAL_S = 5 seconds
• Matches the master DOLPHIN scan frequency (5s). Every DOLPHIN scan
will see a fresh EsoF snapshot without unnecessary CPU churn.
• EsoF indicators are pure local computations; the fastest-changing
meaningful indicator (fibonacci_time) has per-minute resolution;
liquidity_session changes at hourly boundaries; all others are slower.
Pushing at 5s therefore captures every meaningful state transition.
• Zero external APIs → zero rate-limit constraints.
POLL_INTERVAL_S = 5 seconds
• Service internal compute cadence; kept equal to push interval so each
HZ push carries a freshly computed snapshot (no stale reads).
WARMUP_S = 6 seconds
• One full internal poll cycle (5s) + 1s safety margin before first push.
Indicators pushed:
moon_illumination, moon_phase_name, mercury_retrograde,
population_weighted_hour, liquidity_weighted_hour, liquidity_session,
market_cycle_position, fibonacci_time, calendar, regional_times,
timestamp, unix, _pushed_at, _push_seq
Usage:
prefect deployment run 'esof-prefect-flow/esof-daemon'
python prod/esof_prefect_flow.py # direct execution
"""
import sys
import json
import time
from pathlib import Path
from datetime import datetime, timezone
from prefect import flow, task, get_run_logger
from prefect.cache_policies import NO_CACHE
_HERE = Path(__file__).parent
_ROOT = _HERE.parent
sys.path.insert(0, str(_HERE))
sys.path.insert(0, str(_ROOT / "external_factors"))
sys.path.insert(0, str(_ROOT))
try:
from esof_persistence import EsoFPersistenceService
_HAS_PERSISTENCE = True
except ImportError as _e:
_HAS_PERSISTENCE = False
print(f"Warning: EsoF persistence layer not available: {_e}")
# ── Constants ──────────────────────────────────────────────────────────────────
HZ_PUSH_INTERVAL_S = 5 # 5s — matches master DOLPHIN scan frequency
POLL_INTERVAL_S = 5.0 # EsotericFactorsService internal compute cadence
WARMUP_S = 6 # one poll cycle (5s) + 1s safety margin
HZ_KEY = "esof_latest"
HZ_MAP = "DOLPHIN_FEATURES"
LOCAL_CACHE_DIR = _ROOT / "external_factors" / "eso_cache"
LOCAL_CACHE_FILE = LOCAL_CACHE_DIR / "latest_esoteric_factors.json"
LOG_STATUS_EVERY = 12 # status line every ~60s (12 × 5s)
# ── HZ Push Task ──────────────────────────────────────────────────────────────
@task(name="hz_push_esof", retries=3, retry_delay_seconds=2, cache_policy=NO_CACHE)
def hz_push_esof_task(client, payload: dict) -> bool:
"""
Push EsoF snapshot to Hazelcast DOLPHIN_FEATURES[esof_latest].
Delegates to shared _hz_push.hz_push (zero-allocation fast path).
Re-raises on failure so Prefect retry logic fires.
NO_CACHE policy prevents Prefect from attempting to serialise the HZ client.
"""
from _hz_push import hz_push
try:
return hz_push(HZ_KEY, payload, client)
except Exception as e:
raise RuntimeError(f"HZ push failed: {e}") from e
# ── Local cache helper (off hot-path) ─────────────────────────────────────────
def _write_local_cache(data: dict) -> None:
"""Atomic tmp→rename write to local JSON (legacy fallback for disk-readers)."""
LOCAL_CACHE_DIR.mkdir(parents=True, exist_ok=True)
tmp = LOCAL_CACHE_DIR / "latest_esoteric_factors.tmp"
try:
with open(tmp, "w") as fh:
json.dump(data, fh, indent=2)
tmp.replace(LOCAL_CACHE_FILE)
except Exception:
# Non-fatal: HZ is the primary hot path
pass
# ── Prefect Flow ──────────────────────────────────────────────────────────────
@flow(name="esof-prefect-flow", log_prints=True)
def esof_prefect_flow(
warmup_s: int = WARMUP_S,
poll_interval_s: float = POLL_INTERVAL_S,
):
"""
DOLPHIN EsoF Prefect-governed daemon.
Long-running flow — NOT a scheduled job; runs as a continuous process.
Starts EsotericFactorsService, waits warmup_s for the first compute cycle,
then loops at 5-second cadence pushing to Hazelcast and local JSON cache.
"""
from esoteric_factors_service import EsotericFactorsService
from _hz_push import make_hz_client
log = get_run_logger()
# ── Service startup ───────────────────────────────────────────────────────
# Redirect the service's internal disk writer to an isolated tmp dir so it
# does not race with the flow's own _write_local_cache() on the same .tmp
# filename. The flow owns the canonical JSON at LOCAL_CACHE_FILE.
import tempfile as _tf
_svc_scratch = Path(_tf.mkdtemp(prefix="esof_svc_scratch_"))
log.info("Starting EsotericFactorsService (poll_interval=%.1fs) …", poll_interval_s)
svc = EsotericFactorsService(output_dir=str(_svc_scratch), poll_interval_s=poll_interval_s)
svc.start()
log.info("Warmup %ds — waiting for first compute cycle …", warmup_s)
time.sleep(warmup_s)
# ── Hazelcast connection ──────────────────────────────────────────────────
client = None
def _connect() -> bool:
nonlocal client
try:
if client is not None:
try:
client.shutdown()
except Exception:
pass
client = make_hz_client()
log.info("Hazelcast connected — pushing to %s['%s'] every %ds",
HZ_MAP, HZ_KEY, HZ_PUSH_INTERVAL_S)
return True
except Exception as exc:
log.warning("HZ connect failed: %s — will retry in loop", exc)
client = None
return False
_connect()
# ── Persistence service (off hot-path) ────────────────────────────────────
persist = None
if _HAS_PERSISTENCE:
try:
persist = EsoFPersistenceService(flush_interval_s=300)
persist.start()
log.info("EsoFPersistenceService started (5-min flush, 60-pt rolling buffer)")
except Exception as exc:
log.warning("EsoF persistence svc failed to start: %s", exc)
# ── Main push loop ────────────────────────────────────────────────────────
push_count = 0
fail_count = 0
consecutive_fails = 0
log.info("=== ESOF PREFECT LOOP STARTED (interval=%ds) ===", HZ_PUSH_INTERVAL_S)
try:
while True:
t0 = time.monotonic()
data = svc.get_latest() # <1ms, thread-safe, non-blocking
if data:
# Write local JSON cache (atomic, off hot-path)
_write_local_cache(data)
# Ensure HZ connectivity
if client is None:
if not _connect():
consecutive_fails += 1
if consecutive_fails > 10:
log.error(
"CRITICAL: HZ unreachable for %d+ consecutive iterations",
consecutive_fails,
)
time.sleep(1.0)
continue
# Push via Prefect task (built-in 3× retry, NO_CACHE)
try:
success = hz_push_esof_task.submit(client, data).result(timeout=4.0)
except Exception as exc:
success = False
log.warning("HZ push exc (fail#%d): %s", fail_count + 1, exc)
if consecutive_fails >= 3:
_connect()
if success:
push_count += 1
consecutive_fails = 0
# Feed rolling deque (off hot-path, O(1))
if persist is not None:
persist.update_snapshot(data)
if push_count % LOG_STATUS_EVERY == 1:
log.info(
"EsoF push#%d moon=%s retro=%s session=%s cycle=%.4f",
push_count,
data.get("moon_phase_name"),
bool(data.get("mercury_retrograde")),
data.get("liquidity_session"),
data.get("market_cycle_position", 0.0),
)
else:
fail_count += 1
consecutive_fails += 1
if consecutive_fails >= 3:
_connect()
else:
log.debug("EsoF state empty — service still warming up")
# Maintain precise 5s cadence
elapsed = time.monotonic() - t0
sleep_time = HZ_PUSH_INTERVAL_S - elapsed
if sleep_time > 0:
time.sleep(sleep_time)
except KeyboardInterrupt:
log.info("Shutdown requested via KeyboardInterrupt")
finally:
log.info("Stopping EsoF services — pushes=%d fails=%d", push_count, fail_count)
svc.stop()
if persist is not None:
try:
persist.stop()
stats = persist.get_stats()
log.info(
"EsoF persistence stopped — files=%d points=%d bytes=%d",
stats["files_written"], stats["points_written"], stats["bytes_written"],
)
except Exception:
pass
if client is not None:
try:
client.shutdown()
except Exception:
pass
log.info("EsoF Prefect flow exited cleanly")
if __name__ == "__main__":
esof_prefect_flow()