Files
DOLPHIN/adaptive_exit/bucket_engine.py
hjnormey 01c19662cb initial: import DOLPHIN baseline 2026-04-21 from dolphinng5_predict working tree
Includes core prod + GREEN/BLUE subsystems:
- prod/ (BLUE harness, configs, scripts, docs)
- nautilus_dolphin/ (GREEN Nautilus-native impl + dvae/ preserved)
- adaptive_exit/ (AEM engine + models/bucket_assignments.pkl)
- Observability/ (EsoF advisor, TUI, dashboards)
- external_factors/ (EsoF producer)
- mc_forewarning_qlabs_fork/ (MC regime/envelope)

Excludes runtime caches, logs, backups, and reproducible artifacts per .gitignore.
2026-04-21 16:58:38 +02:00

183 lines
6.6 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Asset bucketing engine.
Clusters assets into N buckets using price-based characteristics computed
from 1m klines historical data (5-year window):
- vol_daily_pct : annualised daily return volatility
- corr_btc : correlation of returns with BTC
- log_price : log of median close price (price tier proxy)
- btc_relevance : corr_btc × log_price (market-significance proxy)
- vov : vol-of-vol (instability of vol regime)
OBF (spread, depth, imbalance) is NOT used here — it covers only ~21 days
and would overfit to a tiny recent window. OBF is overlay-phase only.
"""
import os
import pickle
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
from sklearn.preprocessing import StandardScaler
_BUCKET_PATH = os.path.join(os.path.dirname(__file__), "models", "bucket_assignments.pkl")
_DEFAULT_KLINES_DIR = "/mnt/dolphin_training/data/vbt_cache_klines"
# Sample every Nth file to keep memory manageable (1711 files × 1440 rows = 2.5M rows/asset)
_SAMPLE_STRIDE = 30 # ~57 monthly samples from 5yr history
def _load_klines_features(klines_dir: str) -> pd.DataFrame:
    """
    Load sampled 1m klines parquets and compute per-asset characteristics.

    Parameters
    ----------
    klines_dir : str
        Directory of 1m klines parquet files; every ``_SAMPLE_STRIDE``-th
        file (in sorted filename order) is loaded to bound memory use.

    Returns
    -------
    pd.DataFrame
        Indexed by symbol with columns:
        vol_daily_pct, corr_btc, log_price, btc_relevance, vov

    Raises
    ------
    RuntimeError
        If the directory contains no parquet files, if none of the sampled
        files can be read, or if no BTC price column is present (corr_btc
        would be uncomputable).
    """
    files = sorted(f for f in os.listdir(klines_dir) if f.endswith(".parquet"))
    if not files:
        raise RuntimeError(f"No parquet files in {klines_dir}")
    # Keep only every Nth file — the full 5yr 1m history is too large for RAM.
    sampled = files[::_SAMPLE_STRIDE]
    print(f" Klines: {len(files)} files, sampling every {_SAMPLE_STRIDE}th → {len(sampled)} files")
    dfs = []
    for fn in sampled:
        try:
            df = pd.read_parquet(os.path.join(klines_dir, fn))
            dfs.append(df)
        except Exception:
            # Best-effort load: a corrupt/unreadable file is skipped, not fatal.
            continue
    if not dfs:
        raise RuntimeError("Failed to load any klines parquets")
    combined = pd.concat(dfs, ignore_index=True)
    # Price columns are bare symbol names; exclude known metadata columns
    meta_cols = {"timestamp", "open_time", "close_time", "date", "scan_number",
                 "v50_lambda_max_velocity", "v150_lambda_max_velocity",
                 "v300_lambda_max_velocity", "v750_lambda_max_velocity",
                 "vel_div", "instability_50", "instability_150"}
    price_cols = [c for c in combined.columns if c not in meta_cols]
    # If OHLCV multi-level columns, extract close
    if any("_close" in c.lower() for c in price_cols):
        price_cols = [c for c in price_cols if "_close" in c.lower()]
        # Strip the "_close" suffix so columns become plain upper-case symbols.
        sym_map = {c: c.lower().replace("_close", "").upper() for c in price_cols}
    else:
        sym_map = {c: c for c in price_cols}  # already bare symbol names
    prices = combined[price_cols].rename(columns=sym_map).astype(float)
    # Ensure BTC present for correlation
    # NOTE(review): substring match — would also hit any other column whose
    # name contains "BTC"; confirm the universe carries a single BTC column.
    btc_col = next((c for c in prices.columns if "BTC" in c.upper()), None)
    if btc_col is None:
        raise RuntimeError("BTCUSDT not found in klines — cannot compute corr_btc")
    # fill_method=None: do not forward-fill price gaps before differencing.
    # NOTE(review): frame-level dropna() removes any row with a NaN in ANY
    # asset column; assets with short histories can shrink the shared sample
    # considerably — confirm this is intended.
    rets = prices.pct_change(fill_method=None).dropna()
    btc_rets = rets[btc_col]
    records = []
    for sym in prices.columns:
        r = rets[sym].dropna()
        if len(r) < 100:
            # Too few return observations for stable statistics — skip asset.
            continue
        # Daily vol proxy: std of 1m returns × sqrt(1440) (1440 bars/day) × sqrt(252)
        vol_daily = r.std() * np.sqrt(1440 * 252)
        corr_btc = r.corr(btc_rets)
        log_price = np.log1p(prices[sym].median())
        # Vol-of-vol: rolling 60-bar std, then std of that series
        rolling_vol = r.rolling(60).std().dropna()
        # Normalised by mean vol (coefficient of variation); epsilon avoids /0.
        vov = rolling_vol.std() / (rolling_vol.mean() + 1e-9)
        # NaN correlation (e.g. a zero-variance series) falls back to a
        # neutral 0.5 rather than excluding the asset.
        corr_val = float(corr_btc) if not np.isnan(corr_btc) else 0.5
        records.append({
            "symbol": sym,
            "vol_daily_pct": vol_daily * 100,
            "corr_btc": corr_val,
            "log_price": log_price,
            "btc_relevance": corr_val * log_price,  # market-significance proxy
            "vov": vov,
        })
    df = pd.DataFrame(records).set_index("symbol")
    # Drop assets whose features came out inf/NaN (e.g. degenerate series).
    df = df.replace([np.inf, -np.inf], np.nan).dropna()
    print(f" Computed characteristics for {len(df)} assets")
    return df
def find_optimal_k(X_scaled: np.ndarray, k_min: int = 4, k_max: int = 12) -> int:
    """
    Pick the cluster count in [k_min, k_max] with the best silhouette score.

    Parameters
    ----------
    X_scaled : np.ndarray
        Standardised feature matrix, one row per asset.
    k_min, k_max : int
        Inclusive search bounds for the number of clusters.

    Returns
    -------
    int
        Best k found; if the search range is empty (fewer samples than
        k_min + 1), k_min is returned without any scoring.
    """
    best_k, best_sil = k_min, -1.0
    # k must stay strictly below the sample count or KMeans cannot fit.
    for k in range(k_min, min(k_max + 1, len(X_scaled))):
        km = KMeans(n_clusters=k, random_state=42, n_init=10)
        labels = km.fit_predict(X_scaled)
        # Fix: silhouette subsampling (sample_size=...) was unseeded, so the
        # selected k could differ between runs even though KMeans is seeded.
        # Pin random_state for a reproducible search.
        sil = silhouette_score(
            X_scaled,
            labels,
            sample_size=min(500, len(X_scaled)),
            random_state=42,
        )
        if sil > best_sil:
            best_sil, best_k = sil, k
    return best_k
def build_buckets(
    klines_dir: str = _DEFAULT_KLINES_DIR,
    k_override: Optional[int] = None,
    force_rebuild: bool = False,
) -> dict:
    """
    Build or load bucket assignments from 1m klines price characteristics.

    A cached result at ``_BUCKET_PATH`` is returned as-is unless
    ``force_rebuild`` is set; otherwise characteristics are recomputed,
    clustered with KMeans, and the result is pickled back to disk.

    Parameters
    ----------
    klines_dir : str
        Directory of 1m klines parquets to derive characteristics from.
    k_override : Optional[int]
        Fixed bucket count; when None, k is chosen by silhouette search.
    force_rebuild : bool
        Ignore any cached pickle and recompute from scratch.

    Returns
    -------
    dict
        - 'assignments': {symbol: bucket_id}
        - 'n_buckets': int
        - 'model': fitted KMeans
        - 'scaler': fitted StandardScaler
        - 'features': DataFrame of per-asset characteristics

    Raises
    ------
    RuntimeError
        If fewer than 4 assets yield usable characteristics.
    """
    # Fast path: serve the cached pickle unless a rebuild was requested.
    if not force_rebuild and os.path.exists(_BUCKET_PATH):
        with open(_BUCKET_PATH, "rb") as fh:
            return pickle.load(fh)

    print(f"[BucketEngine] Computing price characteristics from {klines_dir} ...")
    characteristics = _load_klines_features(klines_dir)
    n_assets = len(characteristics)
    if n_assets < 4:
        raise RuntimeError(f"Only {n_assets} assets — need at least 4 for bucketing")

    feature_cols = ["vol_daily_pct", "corr_btc", "log_price", "btc_relevance", "vov"]
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(characteristics[feature_cols].values)

    if k_override is None:
        # Cap the search so each bucket averages at least ~4 members.
        k_max = min(12, n_assets // 4)
        print(f" Searching optimal k in [4, {k_max}] for {n_assets} assets...")
        k = find_optimal_k(X_scaled, k_min=4, k_max=k_max)
    else:
        k = k_override

    print(f" Fitting KMeans k={k}...")
    kmeans = KMeans(n_clusters=k, random_state=42, n_init=20)
    labels = kmeans.fit_predict(X_scaled)
    assignments = dict(zip(characteristics.index, (int(lbl) for lbl in labels)))

    result = {
        "assignments": assignments,
        "n_buckets": k,
        "model": kmeans,
        "scaler": scaler,
        "features": characteristics,
    }
    # Persist for the cached fast path on subsequent calls.
    os.makedirs(os.path.dirname(_BUCKET_PATH), exist_ok=True)
    with open(_BUCKET_PATH, "wb") as fh:
        pickle.dump(result, fh)
    print(f" Saved bucket assignments: {k} buckets, {len(assignments)} assets → {_BUCKET_PATH}")
    return result
def get_bucket(symbol: str, bucket_data: dict, fallback: int = 0) -> int:
    """Look up the bucket ID for *symbol*; unknown symbols map to *fallback*."""
    assignments = bucket_data["assignments"]  # missing key propagates, as before
    try:
        return assignments[symbol]
    except KeyError:
        return fallback