"""MAE/MFE training data generator.

Simulates SHORT entries on raw price series, records the state of each trade
bar-by-bar along its path (MAE/MFE normalised by an ATR proxy, time in trade,
recent returns, static order-book features) and labels whether the short is
still in profit HORIZON bars later ("continuation").

Entry bars are sampled uniformly over each series ("universal sampling"):
vel_div at entry is recorded as a feature, not applied as a filter.
VEL_DIV_THRESHOLD is the cut used at inference time; it is not applied here.

Price sources:
  1. VBT parquet cache -- 48 assets, 512s cadence, 56-day gold window
  2. NG7 eigenvalue scan JSON (``asset_prices_json``), when the directory exists

Static OBF features (spread_bps, depth_usd, fill_prob) come from ClickHouse
``dolphin.obf_universe`` (542 symbols), one median row per symbol.

ExF features joined by date from NPZ backfill:
  exf_fng, exf_fng_delta, exf_funding_btc, exf_dvol_btc, exf_chg24_btc
Rows without a date (the NG7 JSON source) get NaN ExF values.

Output columns:
  asset, bucket_id, mae_norm, mfe_norm, tau_norm, atr, vel_div_entry,
  vel_div_now, spread_bps, depth_usd, fill_prob, ret_1, ret_3, continuation,
  exf_fng, exf_fng_delta, exf_funding_btc, exf_dvol_btc, exf_chg24_btc
"""
import json
import os
import urllib.request
from typing import Optional

import numpy as np
import pandas as pd

_EXF_NPZ_BASE = "/mnt/dolphin_training/data/eigenvalues"

# NPZ field names and their normalisation divisors
_EXF_FIELDS = {
    "fng": 100.0,
    "fng_prev": 100.0,  # only for delta computation
    "funding_btc": 1.0,
    "dvol_btc": 100.0,
    "chg24_btc": 100.0,
}

# Model-facing ExF columns, in output order.
_EXF_COLUMNS = (
    "exf_fng",
    "exf_fng_delta",
    "exf_funding_btc",
    "exf_dvol_btc",
    "exf_chg24_btc",
)

VEL_DIV_THRESHOLD = -0.02  # inference-time entry cut; not applied by this module
MAX_HOLD = 120             # bars followed after each simulated entry
HORIZON = 8                # extra bars ahead used for the continuation label
ATR_WINDOW = 20            # log-returns in the volatility (ATR proxy) window
MIN_ATR = 1e-6             # floor on the normaliser to avoid dividing by ~0

# Minimum series length accepted by the simulator
# (ATR history + holding period + label horizon).
_MIN_SERIES_LEN = MAX_HOLD + HORIZON + ATR_WINDOW

_CH_URL = "http://localhost:8123/"
# NOTE(review): credentials are hard-coded; consider reading them from the env.
_CH_HEADERS = {"X-ClickHouse-User": "dolphin", "X-ClickHouse-Key": "dolphin_ch_2026"}
_VBT_DIR = "/mnt/dolphinng5_predict/vbt_cache"
_NG7_EIGEN_DIR = "/mnt/ng6_data/eigenvalues"


def _finite_or(value, default: float) -> float:
    """Return ``float(value)`` when it is a finite number, else *default*."""
    if value is None:
        return default
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return number if np.isfinite(number) else default


def _ch_query(sql: str, timeout: int = 60) -> list[dict]:
    """Run *sql* against ClickHouse over HTTP and return the rows as dicts.

    ``FORMAT JSONEachRow`` is appended, so each non-blank response line is
    parsed as one JSON object. Network/HTTP errors from urllib propagate.
    """
    body = (sql + "\nFORMAT JSONEachRow").encode()
    req = urllib.request.Request(_CH_URL, data=body, method="POST")
    for header, value in _CH_HEADERS.items():
        req.add_header(header, value)
    # Context manager guarantees the HTTP response/socket is closed.
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        text = resp.read().decode()
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# ── ExF NPZ index ─────────────────────────────────────────────────────────────

def _load_exf_index(npz_base: str = _EXF_NPZ_BASE) -> dict:
    """Load daily ExF NPZ files into {date_str: {field: normalised_float_or_None}}.

    Each sub-directory of *npz_base* is treated as one day and must contain
    ``scan_000001__Indicators.npz`` with parallel arrays ``api_names``,
    ``api_indicators`` and ``api_success``. A field is ``None`` when the NPZ
    reports failure, lacks the field, or holds a non-finite value. Days whose
    NPZ cannot be read are skipped (counted and reported once).
    """
    index: dict = {}
    if not os.path.isdir(npz_base):
        return index
    skipped = 0
    for date_dir in sorted(os.listdir(npz_base)):
        npz_path = os.path.join(npz_base, date_dir, "scan_000001__Indicators.npz")
        if not os.path.exists(npz_path):
            continue
        try:
            # allow_pickle=True: api_names is presumably stored as an object
            # array. NOTE(review): only safe because these NPZs are produced
            # locally by the backfill; never point this at untrusted files.
            with np.load(npz_path, allow_pickle=True) as z:
                named = {
                    n: (float(v), bool(ok))
                    for n, v, ok in zip(z["api_names"], z["api_indicators"], z["api_success"])
                }
        except Exception:  # best-effort: corrupt/partial day files are skipped
            skipped += 1
            continue
        row = {}
        for field, divisor in _EXF_FIELDS.items():
            value, good = named.get(field, (0.0, False))
            # NaN/inf marked "success" is still missing data -> None, so the
            # median fill can replace it.
            row[field] = (value / divisor) if good and np.isfinite(value) else None
        index[date_dir] = row
    if skipped:
        print(f"[DataPipeline] ExF: skipped {skipped} unreadable NPZ day file(s)")
    return index


def _fill_exf_medians(index: dict) -> dict:
    """Replace None values with the cross-day median of each field (in place).

    A field that is missing on every day is filled with 0.0. Returns *index*.
    """
    from statistics import median

    for field in _EXF_FIELDS:
        present = [row[field] for row in index.values() if row.get(field) is not None]
        fill = median(present) if present else 0.0
        for row in index.values():
            if row.get(field) is None:
                row[field] = fill
    return index


def _exf_features_for_date(date_str: str, index: dict) -> dict:
    """Return the 5 ExF model features for *date_str*.

    Uses the exact day when present, otherwise the latest day <= *date_str*
    (string comparison, so keys are assumed to be ISO ``YYYY-MM-DD``). With
    no usable day every feature is 0.0. ``exf_fng_delta`` is ``fng - fng_prev``
    and is 0.0 only when ``fng_prev`` is unavailable (a genuine 0.0 is kept).
    """
    row = index.get(date_str)
    if row is None:
        prior = max((d for d in index if d <= date_str), default=None)
        row = index[prior] if prior is not None else {}
    fng = _finite_or(row.get("fng"), 0.0)
    fng_prev = _finite_or(row.get("fng_prev"), fng)
    return {
        "exf_fng": fng,
        "exf_fng_delta": fng - fng_prev,
        "exf_funding_btc": _finite_or(row.get("funding_btc"), 0.0),
        "exf_dvol_btc": _finite_or(row.get("dvol_btc"), 0.0),
        "exf_chg24_btc": _finite_or(row.get("chg24_btc"), 0.0),
    }


# ── VBT parquet source ────────────────────────────────────────────────────────

def _load_vbt(vbt_dir: str = _VBT_DIR) -> tuple[pd.DataFrame, list[str]]:
    """Load all VBT parquets, return (df, price_cols).

    Every column not in the known metadata set is treated as an asset price
    column. When a ``timestamp`` column exists it is parsed as UTC and the
    frame is sorted by it. A missing directory or one without parquet files
    yields ``(empty DataFrame, [])`` instead of an exception.
    """
    meta = {
        "timestamp", "scan_number",
        "v50_lambda_max_velocity", "v150_lambda_max_velocity",
        "v300_lambda_max_velocity", "v750_lambda_max_velocity",
        "vel_div", "instability_50", "instability_150",
    }
    if not os.path.isdir(vbt_dir):
        return pd.DataFrame(), []
    files = sorted(f for f in os.listdir(vbt_dir) if f.endswith(".parquet"))
    if not files:
        return pd.DataFrame(), []
    df = pd.concat(
        [pd.read_parquet(os.path.join(vbt_dir, f)) for f in files],
        ignore_index=True,
    )
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"], utc=True)
        df = df.sort_values("timestamp").reset_index(drop=True)
    price_cols = [c for c in df.columns if c not in meta]
    return df, price_cols


# ── Core trade simulation ─────────────────────────────────────────────────────

def _obf_features(obf_row: Optional[dict]) -> tuple[float, float, float]:
    """Return (spread_bps, depth_usd, fill_prob); defaults 0.0, 0.0, 0.9."""
    if not obf_row:
        return 0.0, 0.0, 0.9
    return (
        _finite_or(obf_row.get("spread_bps"), 0.0),
        _finite_or(obf_row.get("depth_usd"), 0.0),
        _finite_or(obf_row.get("fill_prob"), 0.9),
    )


def _simulate_trades_on_series(
    prices: np.ndarray,
    vel_div: Optional[np.ndarray],
    asset: str,
    bucket_id: int,
    obf_row: Optional[dict] = None,
    timestamps: Optional[np.ndarray] = None,
    max_samples: int = 50_000,
    rng: Optional[np.random.Generator] = None,
) -> list[dict]:
    """Simulate SHORT entries on a price series and return training samples.

    Entry bars are pre-selected (ceil(max_samples / MAX_HOLD) bars drawn
    uniformly at random, without replacement, from all valid candidates) so
    that peak memory is bounded to ~max_samples dicts regardless of series
    length. Full k-trajectories (k = 1..MAX_HOLD) are kept for each selected
    entry, preserving excursion-path structure.

    Args:
        prices: price per bar; NaN marks a missing observation.
        vel_div: optional per-bar vel_div aligned with *prices* (0.0 if None).
        asset, bucket_id: copied into every sample.
        obf_row: optional static OBF dict (spread_bps, depth_usd, fill_prob).
        timestamps: parallel array for ExF date lookup (None = no date recorded).
        max_samples: approximate per-series sample budget.
        rng: optional numpy Generator for reproducible entry selection; when
            None the global ``np.random`` state is used.

    Returns:
        One dict per (entry, k). Entries need a finite, positive price; bars
        whose current or label-horizon price is missing produce no sample, so
        labels are never derived from NaN comparisons.
    """
    prices = np.asarray(prices, dtype=float)
    n = len(prices)
    samples: list[dict] = []
    if n < _MIN_SERIES_LEN:
        return samples

    # log_ret[i] = log(p[i+1] / p[i]); NaN wherever either price is missing.
    log_ret = np.diff(np.log(np.maximum(prices, 1e-12)))
    # For the ret_1 / ret_3 features a missing return counts as 0.
    ret_feat = np.where(np.isfinite(log_ret), log_ret, 0.0)

    obf_spread, obf_depth, obf_fill = _obf_features(obf_row)

    # Candidates leave ATR_WINDOW returns of history before entry and
    # MAX_HOLD + HORIZON bars after it, so every index below stays in range.
    candidate_ts = np.arange(ATR_WINDOW, n - MAX_HOLD - HORIZON)
    entry_px = prices[candidate_ts]
    candidate_ts = candidate_ts[np.isfinite(entry_px) & (entry_px > 0.0)]

    # Each entry generates <= MAX_HOLD samples; select enough to reach
    # max_samples. Sorted to preserve temporal order (aids debugging).
    target_entries = max(1, int(np.ceil(max_samples / MAX_HOLD)))
    if len(candidate_ts) > target_entries:
        choose = rng.choice if rng is not None else np.random.choice
        selected_ts = np.sort(choose(candidate_ts, target_entries, replace=False))
    else:
        selected_ts = candidate_ts

    for t in selected_ts:
        t = int(t)
        # Universal sampling: vel_div_entry is a feature, not a filter.
        # BLUE inference always queries with vel_div < -0.02, naturally
        # selecting the well-conditioned region of the learned surface.
        vde = float(vel_div[t]) if vel_div is not None else 0.0
        entry = prices[t]

        # Volatility of the ATR_WINDOW returns ending at the entry bar
        # (causal: uses prices up to and including bar t only).
        window = log_ret[t - ATR_WINDOW:t]
        window = window[np.isfinite(window)]
        atr = max(float(np.std(window)) if window.size else 0.0, MIN_ATR)

        date_str = str(timestamps[t])[:10] if timestamps is not None else None

        mae = 0.0
        mfe = 0.0
        for k in range(1, MAX_HOLD + 1):
            cur = prices[t + k]
            if not np.isfinite(cur):
                continue  # gap in the series: no observation, no sample
            delta = (entry - cur) / entry  # > 0 means the short is in profit
            mae = max(mae, max(0.0, -delta))
            mfe = max(mfe, max(0.0, delta))

            future = prices[t + k + HORIZON]
            if not np.isfinite(future):
                continue  # label would be undefined
            future_delta = (entry - future) / entry
            continuation = 1 if future_delta > 0.0 else 0

            ret_1 = float(ret_feat[t + k - 1])
            ret_3 = float(np.mean(ret_feat[t + k - 3:t + k])) if k >= 3 else ret_1
            if vel_div is not None and t + k < len(vel_div):
                vdn = float(vel_div[t + k])
            else:
                vdn = vde

            samples.append({
                "asset": asset,
                "bucket_id": bucket_id,
                "mae_norm": mae / atr,
                "mfe_norm": mfe / atr,
                "tau_norm": k / MAX_HOLD,
                "atr": atr,
                "vel_div_entry": vde,
                "vel_div_now": vdn,
                "spread_bps": obf_spread,
                "depth_usd": obf_depth,
                "fill_prob": obf_fill,
                "ret_1": ret_1,
                "ret_3": ret_3,
                "continuation": continuation,
                "_date": date_str,
            })
    return samples


# ── Public API ────────────────────────────────────────────────────────────────

def _load_obf_static() -> dict[str, dict]:
    """Fetch per-symbol median OBF features from ClickHouse ({} on failure)."""
    try:
        rows = _ch_query("""
            SELECT symbol,
                   median(spread_bps)       AS spread_bps,
                   median(depth_1pct_usd)   AS depth_usd,
                   median(fill_probability) AS fill_prob
            FROM dolphin.obf_universe
            GROUP BY symbol
        """, timeout=60)
        obf_static = {r["symbol"]: r for r in rows}
        print(f"[DataPipeline] OBF static features: {len(obf_static)} assets")
        return obf_static
    except Exception as e:  # best-effort: training proceeds without OBF
        print(f"[DataPipeline] OBF unavailable ({e})")
        return {}


def _attach_exf(df: pd.DataFrame, exf_index: dict) -> pd.DataFrame:
    """Add the five ExF columns by ``_date`` and drop ``_date``.

    With an ExF index, rows whose date is missing get NaN. Without an index,
    every ExF column is 0.0. All five columns are always present.
    """
    if exf_index and "_date" in df.columns:
        print("[DataPipeline] Joining ExF features by date...")
        unique_dates = df["_date"].dropna().unique()
        feats_by_date = {d: _exf_features_for_date(d, exf_index) for d in unique_dates}
        # Per-column dict map: vectorised, and never invents extra columns
        # for rows without a date.
        for col in _EXF_COLUMNS:
            lookup = {d: feats[col] for d, feats in feats_by_date.items()}
            df[col] = df["_date"].map(lookup).astype(float)
        covered = df[list(_EXF_COLUMNS)].notna().all(axis=1).mean()
        print(f" ExF join: {covered:.1%} rows covered")
    else:
        for col in _EXF_COLUMNS:
            df[col] = 0.0
    return df.drop(columns=["_date"], errors="ignore")


def build_training_data(
    bucket_assignments: dict,
    vbt_dir: str = _VBT_DIR,
    use_obf_ch: bool = True,
    max_samples_per_asset: int = 50_000,
    *,
    eigen_dir: str = _NG7_EIGEN_DIR,
    exf_npz_base: str = _EXF_NPZ_BASE,
    seed: Optional[int] = None,
) -> pd.DataFrame:
    """Build full training DataFrame from all available price data.

    Args:
        bucket_assignments: asset -> bucket id; unknown assets get bucket 0.
        vbt_dir: directory of VBT parquet files (price source 1).
        use_obf_ch: fetch static OBF features from ClickHouse when True.
        max_samples_per_asset: approximate per-asset sample budget.
        eigen_dir: NG7 eigenvalue JSON root (price source 2; skipped if absent).
        exf_npz_base: root of the daily ExF NPZ backfill.
        seed: seed for reproducible entry sampling; None uses global np.random.

    Returns:
        One row per simulated (entry, bar) with the columns listed in the
        module docstring.
    """
    rng = np.random.default_rng(seed) if seed is not None else None
    all_samples: list[dict] = []

    # Static OBF features per asset
    obf_static: dict[str, dict] = _load_obf_static() if use_obf_ch else {}

    # ExF NPZ index
    print("[DataPipeline] Loading ExF NPZ index...")
    exf_index = _load_exf_index(exf_npz_base)
    if exf_index:
        exf_index = _fill_exf_medians(exf_index)
        print(f" ExF: {len(exf_index)} days ({min(exf_index)} -> {max(exf_index)})")
    else:
        print(" ExF: unavailable, columns will be zero")

    # SOURCE 1: VBT parquet cache
    print("[DataPipeline] Loading VBT parquet cache...")
    df_vbt, price_cols = _load_vbt(vbt_dir)
    if not price_cols:
        print(" VBT: no parquet price data found")
    vel_div_arr = df_vbt["vel_div"].values if "vel_div" in df_vbt.columns else None
    ts_arr = df_vbt["timestamp"].values if "timestamp" in df_vbt.columns else None
    for asset in price_cols:
        prices = df_vbt[asset].values.astype(float)
        bid = bucket_assignments.get(asset, 0)
        obf = obf_static.get(asset)
        samps = _simulate_trades_on_series(
            prices, vel_div_arr, asset, bid, obf, ts_arr,
            max_samples=max_samples_per_asset, rng=rng,
        )
        all_samples.extend(samps)
        print(f" VBT {asset}: {len(samps)} samples -> bucket {bid}")
    # Release the wide frame before scanning JSON.
    del df_vbt, vel_div_arr, ts_arr

    # SOURCE 2: NG7 eigenvalue JSON price data
    if os.path.isdir(eigen_dir):
        print("[DataPipeline] Scanning NG7 eigenvalue JSON files...")
        samps = _load_from_eigenvalue_json(
            eigen_dir, bucket_assignments, obf_static, max_samples_per_asset, rng=rng,
        )
        all_samples.extend(samps)
        print(f" NG7 eigen: {len(samps)} samples total")

    print(f"[DataPipeline] Total samples: {len(all_samples)}")
    df = pd.DataFrame(all_samples)
    del all_samples

    # Join ExF features by date
    return _attach_exf(df, exf_index)


def _read_price_snapshot(path: str) -> dict[str, float]:
    """Return {symbol: price} from one NG7 scan JSON, or {} if unusable.

    Prices live under ``asset_prices_json`` at top level or under ``result``,
    either as a dict or a JSON-encoded string. Unparseable prices become NaN.
    """
    try:
        with open(path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):  # unreadable or malformed file: skip it
        return {}
    if not isinstance(data, dict):
        return {}
    prices_json = data.get("asset_prices_json")
    if not prices_json:
        result = data.get("result")
        prices_json = result.get("asset_prices_json") if isinstance(result, dict) else None
    if isinstance(prices_json, str):
        try:
            prices_json = json.loads(prices_json)
        except ValueError:
            return {}
    if not isinstance(prices_json, dict):
        return {}
    snapshot: dict[str, float] = {}
    for sym, px in prices_json.items():
        try:
            snapshot[sym] = float(px)
        except (TypeError, ValueError):
            snapshot[sym] = np.nan  # present but unparseable: mark missing
    return snapshot


def _load_from_eigenvalue_json(
    eigen_dir: str,
    bucket_assignments: dict,
    obf_static: dict,
    max_per_asset: int,
    *,
    rng: Optional[np.random.Generator] = None,
) -> list[dict]:
    """Extract price series from NG7 eigenvalue JSON files and simulate trades.

    Every third ``scan_*.json`` per day directory is used. Each usable scan is
    one time step shared by all symbols. A symbol absent from a scan gets NaN
    at that step, so its series stays time-aligned instead of being silently
    compacted. Leading/trailing gaps are trimmed, and series with fewer than
    the simulator's minimum number of observed prices are skipped.
    """
    import glob

    asset_prices: dict[str, list[float]] = {}
    n_scans = 0
    for date_dir in sorted(os.listdir(eigen_dir)):
        day_path = os.path.join(eigen_dir, date_dir)
        if not os.path.isdir(day_path):
            continue
        for jf in sorted(glob.glob(os.path.join(day_path, "scan_*.json")))[::3]:
            snapshot = _read_price_snapshot(jf)
            if not snapshot:
                continue
            for sym, px in snapshot.items():
                series = asset_prices.setdefault(sym, [])
                if len(series) < n_scans:  # symbol skipped earlier scans
                    series.extend([np.nan] * (n_scans - len(series)))
                series.append(px)
            n_scans += 1

    samples: list[dict] = []
    for asset, series in asset_prices.items():
        arr = np.asarray(series, dtype=float)
        observed = np.flatnonzero(np.isfinite(arr))
        if observed.size < _MIN_SERIES_LEN:
            continue
        arr = arr[observed[0]:observed[-1] + 1]
        bid = bucket_assignments.get(asset, 0)
        obf = obf_static.get(asset)
        samples.extend(_simulate_trades_on_series(
            arr, None, asset, bid, obf, max_samples=max_per_asset, rng=rng,
        ))
    return samples