117 lines
3.8 KiB
Python
117 lines
3.8 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Prefect flow — periodically recompute S6 bucket sizing coefficients.
|
||
|
|
|
||
|
|
Cadence
|
||
|
|
───────
|
||
|
|
Controlled by env `S6_RECOMPUTE_INTERVAL_DAYS` (default 30). The flow is
|
||
|
|
idempotent: running it more often than the interval just produces a fresh
|
||
|
|
YAML and markdown report; nothing is auto-promoted.
|
||
|
|
|
||
|
|
Kill-switch
|
||
|
|
───────────
|
||
|
|
Set env `S6_RECOMPUTE_DISABLED=1` to skip — the flow logs and exits 0.
|
||
|
|
|
||
|
|
Wiring into supervisord/cron
|
||
|
|
────────────────────────────
|
||
|
|
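Run this flow on the supervisord host. Recommended: daily timer that guards
internally on the last-run timestamp in `dolphin.s6_recompute_log`, or a
Prefect deployment with a cron schedule `0 3 * * *` and the interval env set.
Note: the interval guard implemented in this module reads `meta.generated_at`
from `prod/configs/green_s6_table.yml`; it does not query
`dolphin.s6_recompute_log` itself.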
|
||
|
|
|
||
|
|
This flow shells out to `prod/scripts/recompute_s6_coefficients.py` rather
|
||
|
|
than importing it so that the script remains independently runnable without
|
||
|
|
requiring Prefect — honours the plan's "recompute works whether or not
|
||
|
|
Prefect is installed" requirement.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import os
|
||
|
|
import subprocess
|
||
|
|
import sys
|
||
|
|
from datetime import datetime, timedelta
|
||
|
|
from pathlib import Path
|
||
|
|
|
||
|
|
try:
    from prefect import flow, task, get_run_logger
except ImportError:  # pragma: no cover — allow import-only smoke tests
    # Prefect is not importable: provide inert stand-ins so this module can
    # still be imported and its decorated functions run as plain callables.

    def flow(fn=None, **_kw):
        """Stand-in for ``prefect.flow``: works bare or with keyword options."""
        # Bare ``@flow`` passes the function directly; ``@flow(...)`` passes
        # only keywords and expects a decorator back.
        return fn if fn is not None else (lambda f: f)

    def task(fn=None, **_kw):
        """Stand-in for ``prefect.task``: works bare or with keyword options."""
        return fn if fn is not None else (lambda f: f)

    def get_run_logger():  # pragma: no cover
        """Stand-in for ``prefect.get_run_logger``: a named stdlib logger."""
        import logging

        return logging.getLogger("s6_recompute_flow")
|
||
|
|
|
||
|
|
|
||
|
|
# Directory two levels above this file's resolved location; every path below
# is anchored here. NOTE(review): presumably the repository root — confirm
# against where this module lives in the tree.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Standalone recompute script that the flow shells out to.
RECOMPUTE_PY = REPO_ROOT.joinpath("prod", "scripts", "recompute_s6_coefficients.py")
# YAML table whose `meta.generated_at` drives the interval guard.
TABLE_YML = REPO_ROOT.joinpath("prod", "configs", "green_s6_table.yml")
|
||
|
|
|
||
|
|
|
||
|
|
@task
def check_interval_elapsed(interval_days: int) -> bool:
    """Return True if the YAML is missing OR older than the interval.

    The age is taken from ``meta.generated_at`` in ``TABLE_YML``. That value
    may be an ISO-8601 string (optionally ending in ``Z`` or carrying a UTC
    offset) or an already-parsed ``datetime``/``date`` object, which is what
    the YAML loader can return for an unquoted timestamp scalar. Timestamps
    without timezone information are interpreted as UTC.

    Args:
        interval_days: Minimum age in days before a recompute is due.

    Returns:
        True when the table file does not exist, has no ``generated_at``
        value, cannot be read or parsed, or is at least ``interval_days``
        old; False otherwise.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    if not TABLE_YML.exists():
        return True
    try:
        import yaml

        doc = yaml.safe_load(TABLE_YML.read_text()) or {}
        generated_at = (doc.get("meta") or {}).get("generated_at", "")
        if not generated_at:
            return True

        if isinstance(generated_at, datetime):
            # The loader already produced a datetime; use it directly.
            ts = generated_at
        else:
            # Strings (and date objects, via str()) go through fromisoformat.
            # A trailing "Z" is rewritten to an explicit offset because older
            # Python versions reject "Z" in fromisoformat.
            text = str(generated_at).strip()
            if text.endswith(("Z", "z")):
                text = text[:-1] + "+00:00"
            ts = datetime.fromisoformat(text)

        # Normalise to an aware UTC value so that aware and naive inputs can
        # both be compared without raising TypeError.
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) - ts >= timedelta(days=interval_days)
    except Exception:
        # Any parsing trouble → safer to recompute
        return True
|
||
|
|
|
||
|
|
|
||
|
|
@task
def run_recompute(strategy: str, since: str, min_trades: int, source_branch: str) -> int:
    """Run the recompute script as a child process and return its exit code.

    The script at ``RECOMPUTE_PY`` is launched with the current interpreter
    and ``REPO_ROOT`` as working directory. The command line and the exit
    code are both written to the run logger.

    Args:
        strategy: Value passed as ``--strategy``.
        since: Value passed as ``--since``.
        min_trades: Value passed (stringified) as ``--min-trades-per-bucket``.
        source_branch: Value passed as ``--source-branch``.

    Returns:
        The child process exit code as reported by ``subprocess.call``.
    """
    log = get_run_logger()

    argv = [sys.executable, str(RECOMPUTE_PY)]
    argv += ["--strategy", strategy]
    argv += ["--since", since]
    argv += ["--min-trades-per-bucket", str(min_trades)]
    argv += ["--source-branch", source_branch]

    log.info(f"[s6_recompute_flow] exec: {' '.join(argv)}")
    exit_code = subprocess.call(argv, cwd=str(REPO_ROOT))
    log.info(f"[s6_recompute_flow] exit code: {exit_code}")
    return exit_code
|
||
|
|
|
||
|
|
|
||
|
|
@flow(name="s6-recompute")
def s6_recompute_flow(
    strategy: str = "blue",
    since: str = "2026-01-01",
    min_trades: int = 20,
    source_branch: str = "exp/green-s6-esof-aem-shadow-2026-04-21",
):
    """Recompute the S6 coefficients when enabled and when the interval is due.

    Environment:
        S6_RECOMPUTE_DISABLED: when exactly ``"1"``, the flow logs and
            returns 0 without doing anything else.
        S6_RECOMPUTE_INTERVAL_DAYS: integer number of days handed to
            ``check_interval_elapsed``; defaults to 30. A value that is not
            an integer is reported with a warning and replaced by 30.

    Args:
        strategy: Forwarded to ``run_recompute``.
        since: Forwarded to ``run_recompute``.
        min_trades: Forwarded to ``run_recompute``.
        source_branch: Forwarded to ``run_recompute``.

    Returns:
        0 when the run is skipped (disabled or interval not elapsed),
        otherwise whatever ``run_recompute`` returns.
    """
    logger = get_run_logger()

    # Kill-switch takes precedence over everything else.
    if os.environ.get("S6_RECOMPUTE_DISABLED") == "1":
        logger.info("[s6_recompute_flow] disabled via S6_RECOMPUTE_DISABLED=1 — skipping")
        return 0

    raw_interval = os.environ.get("S6_RECOMPUTE_INTERVAL_DAYS", "30")
    try:
        interval_days = int(raw_interval)
    except ValueError:
        # Keep the previous fallback, but make the misconfiguration visible
        # instead of silently ignoring the operator's setting.
        logger.warning(
            f"[s6_recompute_flow] invalid S6_RECOMPUTE_INTERVAL_DAYS={raw_interval!r} — using default 30"
        )
        interval_days = 30

    due = check_interval_elapsed(interval_days)
    if not due:
        logger.info(f"[s6_recompute_flow] interval not elapsed ({interval_days}d) — skipping")
        return 0

    return run_recompute(strategy, since, min_trades, source_branch)
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Script entry point: run the flow with its default arguments and use the
    # result as the process exit status; a falsy result is reported as 0.
    exit_status = s6_recompute_flow() or 0
    sys.exit(int(exit_status))
|