# DOLPHIN/nautilus_dolphin/run_tail_conditional_probs.py
import sys, time, datetime, zoneinfo, json, math
from pathlib import Path
import numpy as np
import pandas as pd
sys.path.insert(0, str(Path(__file__).parent))
from astropy.time import Time
import astropy.coordinates as coord
import astropy.units as u
from astropy.coordinates import solar_system_ephemeris, get_body, EarthLocation
from nautilus_dolphin.nautilus.alpha_orchestrator import NDAlphaEngine
from nautilus_dolphin.nautilus.adaptive_circuit_breaker import AdaptiveCircuitBreaker
from nautilus_dolphin.nautilus.ob_features import OBFeatureEngine
from nautilus_dolphin.nautilus.ob_provider import MockOBProvider
class MarketIndicators:
    """Daily calendar, market-cycle and astronomical ("esoteric") indicators.

    Used by the tail-probability study to attach day-level exogenous factors
    (weekday, halving-cycle position, moon illumination, Mercury retrograde)
    to each trading day.
    """

    def __init__(self):
        # Regional session metadata. NOTE(review): regions/pop/liq_weight are
        # not referenced by any method visible in this file — presumably used
        # elsewhere or kept for future session-weighting; confirm before removal.
        self.regions = [
            {'name': 'Americas', 'tz': 'America/New_York', 'pop': 1000, 'liq_weight': 0.35},
            {'name': 'EMEA', 'tz': 'Europe/London', 'pop': 2200, 'liq_weight': 0.30},
            {'name': 'South_Asia', 'tz': 'Asia/Kolkata', 'pop': 1400, 'liq_weight': 0.05},
            {'name': 'East_Asia', 'tz': 'Asia/Shanghai', 'pop': 1600, 'liq_weight': 0.20},
            {'name': 'Oceania_SEA', 'tz': 'Asia/Singapore', 'pop': 800, 'liq_weight': 0.10}
        ]
        # Approximate BTC halving cycle length in days (~4 years).
        self.cycle_length_days = 1460
        # Anchor date: most recent BTC halving (2024-04-20, UTC).
        self.last_halving = datetime.datetime(2024, 4, 20, tzinfo=datetime.timezone.utc)

    def get_calendar_items(self, now):
        """Return basic calendar fields for the datetime *now* as a dict."""
        return {
            'year': now.year,
            'month': now.month,
            'day_of_month': now.day,
            'hour': now.hour,
            'minute': now.minute,
            'day_of_week': now.weekday(),
            'week_of_year': now.isocalendar().week
        }

    def get_market_cycle_position(self, now_utc):
        """Return position in [0, 1) within the 1460-day halving cycle.

        0.0 = at a halving, values wrap via modulo for dates more than one
        cycle past the anchor.
        """
        days_since_halving = (now_utc - self.last_halving).days
        position = (days_since_halving % self.cycle_length_days) / self.cycle_length_days
        return position

    def get_moon_phase(self, now_utc):
        """Compute moon illumination fraction and a coarse phase name.

        Returns a dict with 'illumination' in [0, 1] and 'phase_name'.
        """
        t = Time(now_utc)
        # Use astropy's builtin ephemeris (no external data files needed).
        with solar_system_ephemeris.set('builtin'):
            moon = get_body('moon', t)
            sun = get_body('sun', t)
        # Standard phase-angle construction from elongation and distances;
        # illumination = (1 + cos(phase_angle)) / 2.
        elongation = sun.separation(moon)
        phase_angle = np.arctan2(sun.distance * np.sin(elongation),
                                 moon.distance - sun.distance * np.cos(elongation))
        illumination = (1 + np.cos(phase_angle)) / 2.0
        phase_name = "WAXING"
        if illumination < 0.03: phase_name = "NEW_MOON"
        elif illumination > 0.97: phase_name = "FULL_MOON"
        # NOTE(review): waxing vs waning is approximated by comparing moon and
        # sun declination — a rough heuristic, not a rigorous elongation-sign
        # test; confirm if phase direction matters downstream.
        elif illumination < 0.5: phase_name = "WAXING_CRESCENT" if moon.dec.deg > sun.dec.deg else "WANING_CRESCENT"
        else: phase_name = "WAXING_GIBBOUS" if moon.dec.deg > sun.dec.deg else "WANING_GIBBOUS"
        return {'illumination': float(illumination), 'phase_name': phase_name}

    def is_mercury_retrograde(self, now_utc):
        """Return True if Mercury's apparent ecliptic longitude is decreasing.

        Compares geocentric (Greenwich-topocentric) ecliptic longitude now vs.
        one day later; a backwards step (diff mod 360 > 180) means retrograde.
        Best-effort: any astropy failure yields False rather than raising.
        """
        t = Time(now_utc)
        is_retro = False
        try:
            with solar_system_ephemeris.set('builtin'):
                loc = EarthLocation.of_site('greenwich')
                merc_now = get_body('mercury', t, loc)
                merc_later = get_body('mercury', t + 1 * u.day, loc)
                lon_now = merc_now.geometrictrueecliptic.lon.deg
                lon_later = merc_later.geometrictrueecliptic.lon.deg
                diff = (lon_later - lon_now) % 360
                is_retro = diff > 180
        except Exception as e:
            # Deliberate swallow: esoteric factor defaults to "not retrograde"
            # if the ephemeris/site lookup fails (e.g. offline).
            pass
        return is_retro

    def get_indicators(self, now_utc):
        """Assemble the daily indicator record for *now_utc* (a UTC datetime)."""
        # We drop hour-specific ones since we evaluate at a fixed time daily
        moon_data = self.get_moon_phase(now_utc)
        calendar = self.get_calendar_items(now_utc)
        return {
            'timestamp': now_utc.isoformat(),
            'day_of_week': calendar['day_of_week'],
            'week_of_year': calendar['week_of_year'],
            'market_cycle_position': round(self.get_market_cycle_position(now_utc), 4),
            'moon_illumination': moon_data['illumination'],
            'moon_phase_name': moon_data['phase_name'],
            'mercury_retrograde': int(self.is_mercury_retrograde(now_utc)),
        }
# Directory holding the cached per-day vectorbt parquet files.
VBT_DIR = Path(r"C:\Users\Lenovo\Documents\- DOLPHIN NG HD HCM TSF Predict\vbt_cache")
# Pick up every day-file in sorted order, skipping catalog artifacts.
parquet_files = sorted(
    p for p in VBT_DIR.glob("*.parquet") if 'catalog' not in str(p)
)
print("Loading data & extracting daily precursor AND ESOTERIC metrics...")
daily_metrics = []
mi = MarketIndicators()
for pf in parquet_files:
    ds = pf.stem  # file stem is the ISO date 'YYYY-MM-DD'
    df = pd.read_parquet(pf)
    if df.empty:
        # FIX: an empty day-file would crash on vd[0] below; skip it instead.
        continue
    # 1. Structural Precursors
    vd = df['vel_div'].fillna(0).values
    # Bar-over-bar change of velocity divergence ("volatility acceleration").
    vol_accel = np.diff(vd, prepend=vd[0])
    daily_vol_accel_max = np.max(np.abs(vol_accel))
    assets = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT', 'SOLUSDT']
    valid_assets = [a for a in assets if a in df.columns]
    if len(valid_assets) > 1:
        # Upper-triangle pairwise return correlations across the majors.
        rets = df[valid_assets].pct_change().fillna(0)
        corr_matrix = rets.corr().values
        cross_corr = corr_matrix[np.triu_indices_from(corr_matrix, k=1)]
        mean_cross_corr = np.nanmean(cross_corr)
        max_cross_corr = np.nanmax(cross_corr)
    else:
        mean_cross_corr = 0
        max_cross_corr = 0
    # Daily maxima of the instability / lambda-velocity features (0 if absent).
    entropy_max = df['instability_50'].max() if 'instability_50' in df.columns else 0
    v750_max = df['v750_lambda_max_velocity'].max() if 'v750_lambda_max_velocity' in df.columns else 0
    v50_max = df['v50_lambda_max_velocity'].max() if 'v50_lambda_max_velocity' in df.columns else 0
    # 2. Esoteric Factors — evaluated at a fixed 12:00 UTC for each day.
    y, m, d = map(int, ds.split('-'))
    dt_utc = datetime.datetime(y, m, d, 12, 0, 0, tzinfo=datetime.timezone.utc)
    eso = mi.get_indicators(dt_utc)
    # Optionally save to disk as requested (write-once cache per day).
    eso_path = VBT_DIR / f"ESOTERIC_data_{ds}.json"
    if not eso_path.exists():
        with open(eso_path, 'w') as f:
            json.dump(eso, f, indent=2)
    daily_metrics.append({
        'Date': ds,
        'vol_accel_max': daily_vol_accel_max,
        'cross_corr_mean': mean_cross_corr,
        'cross_corr_max': max_cross_corr,
        'entropy_max': entropy_max,
        'v750_max': v750_max,
        'v50_max': v50_max,
        'day_of_week': eso['day_of_week'],
        'week_of_year': eso['week_of_year'],
        'market_cycle_position': eso['market_cycle_position'],
        'moon_illumination': eso['moon_illumination'],
        'mercury_retrograde': eso['mercury_retrograde']
    })
metrics_df = pd.DataFrame(daily_metrics).set_index('Date')
# Shift 1 day to ensure each row holds T-1 precursors for day T's return.
precursor_df = metrics_df.shift(1).dropna()
print("Running fast 6.0x trajectory to isolate daily PnL...")
pq_data = {}
all_vols = []
# Engine-feature columns — everything else in a day-file is an asset price.
_FEATURE_COLS = {'timestamp', 'scan_number', 'v50_lambda_max_velocity', 'v150_lambda_max_velocity',
                 'v300_lambda_max_velocity', 'v750_lambda_max_velocity', 'vel_div',
                 'instability_50', 'instability_150'}
for pf in parquet_files:
    df = pd.read_parquet(pf)
    ac = [c for c in df.columns if c not in _FEATURE_COLS]
    dv = df['vel_div'].values if 'vel_div' in df.columns else np.zeros(len(df))
    # Cache (frame, asset columns, vel_div vector) keyed by date for the run loop.
    pq_data[pf.stem] = (df, ac, dv)
    if 'BTCUSDT' in df.columns:
        pr = df['BTCUSDT'].values
        # Rolling 50-bar std of simple returns as a volatility proxy.
        for i in range(60, len(pr)):
            seg = pr[max(0, i - 50):i]
            if len(seg) < 10:
                continue
            v = float(np.std(np.diff(seg) / seg[:-1]))
            if v > 0:
                all_vols.append(v)
# FIX: np.percentile raises on empty input; if no BTCUSDT history was found,
# fall back to 0.0 so the script degrades gracefully.
vol_p60 = float(np.percentile(all_vols, 60)) if all_vols else 0.0
# --- Engine wiring -----------------------------------------------------------
# Circuit breaker: preload per-date w750 state so daily boost/beta lookups work.
acb = AdaptiveCircuitBreaker()
acb.preload_w750([pf.stem for pf in parquet_files])
# Mock order-book provider with a small negative global imbalance bias and
# per-asset overrides for BNB/SOL.
# NOTE(review): parameter semantics inferred from keyword names — confirm
# against MockOBProvider's definition.
mock = MockOBProvider(imbalance_bias=-0.09, depth_scale=1.0,
                      assets=["BTCUSDT", "ETHUSDT", "BNBUSDT", "SOLUSDT"],
                      imbalance_biases={"BNBUSDT": 0.20, "SOLUSDT": 0.20})
ob_engine_inst = OBFeatureEngine(mock)
ob_engine_inst.preload_date("mock", mock.get_assets())
# Alpha-engine configuration for the "fast 6.0x" trajectory: entry thresholds
# on vel_div, leverage band 0.5x–6.0x with convexity 3, ~1% fixed take-profit,
# direction confirmation, asset selection, fees/slippage and OB-edge enabled.
ENGINE_KWARGS = dict(
    initial_capital=25000.0, vel_div_threshold=-0.02, vel_div_extreme=-0.05,
    min_leverage=0.5, max_leverage=6.0, leverage_convexity=3.0,
    fraction=0.20, fixed_tp_pct=0.0099, stop_pct=1.0, max_hold_bars=120,
    use_direction_confirm=True, dc_lookback_bars=7, dc_min_magnitude_bps=0.75,
    dc_skip_contradicts=True, dc_leverage_boost=1.0, dc_leverage_reduce=0.5,
    use_asset_selection=True, min_irp_alignment=0.45,
    use_sp_fees=True, use_sp_slippage=True,
    use_ob_edge=True, ob_edge_bps=5.0, ob_confirm_rate=0.40,
    lookback=100, use_alpha_layers=True, use_dynamic_leverage=True, seed=42,
)
engine = NDAlphaEngine(**ENGINE_KWARGS)
engine.set_ob_engine(ob_engine_inst)
# --- Main simulation loop ----------------------------------------------------
# Replays every day bar-by-bar through the engine and records the per-day
# return on capital, keyed by date string.
daily_returns = {}
bar_idx = 0                 # global bar counter across all days
all_vols_engine = []        # expanding BTC volatility history for the regime gate
for pf in parquet_files:
    ds = pf.stem
    cs = engine.capital     # capital at start of day, for the daily return calc
    # Per-date sizing boost and convexity beta from the circuit breaker.
    acb_info = acb.get_dynamic_boost_for_date(ds, ob_engine=ob_engine_inst)
    base_boost = acb_info['boost']
    beta = acb_info['beta']
    df, acols, dvol_raw = pq_data[ds]
    ph = {}                 # rolling per-asset price histories fed to the engine
    for ri in range(len(df)):
        row = df.iloc[ri]
        vd = dvol_raw[ri]
        # Skip bars with no usable vel_div, but keep the global bar index moving.
        if not np.isfinite(vd): bar_idx+=1; continue
        prices = {}
        for ac in acols:
            p = row[ac]
            if p and p > 0 and np.isfinite(p):
                prices[ac] = float(p)
                if ac not in ph: ph[ac] = []
                ph[ac].append(float(p))
                # Cap history memory: once past 500 points, keep the last 200.
                if len(ph[ac]) > 500: ph[ac] = ph[ac][-200:]
        if not prices: bar_idx+=1; continue
        # Volatility-regime gate: BTC 50-bar return std vs. its running 60th
        # percentile, active only after 100+ observations.
        btc_hist = ph.get("BTCUSDT", [])
        engine_vrok = False
        if len(btc_hist) >= 50:
            seg = btc_hist[-50:]
            vd_eng = float(np.std(np.diff(seg)/np.array(seg[:-1])))
            all_vols_engine.append(vd_eng)
            if len(all_vols_engine) > 100:
                engine_vrok = vd_eng > np.percentile(all_vols_engine, 60)
        if beta > 0:
            # Convex scaling: ramp ss from 0 at vel_div=-0.02 to 1 at -0.05,
            # then cube it; size multiplier grows with signal extremity.
            ss = 0.0
            if vd < -0.02:
                raw = (-0.02 - float(vd)) / (-0.02 - -0.05)
                ss = min(1.0, max(0.0, raw)) ** 3.0
            engine.regime_size_mult = base_boost * (1.0 + beta * ss)
        else:
            engine.regime_size_mult = base_boost
        engine.process_bar(bar_idx=bar_idx, vel_div=float(vd), prices=prices, vol_regime_ok=engine_vrok, price_histories=ph)
        bar_idx += 1
    # Daily return on starting capital (0 if capital was wiped out/non-positive).
    daily_returns[ds] = (engine.capital - cs) / cs if cs > 0 else 0
# Attach each day's realized engine return to the prior day's precursors,
# then flag "tail" days as the worst decile of returns.
ret_df = pd.DataFrame.from_dict(daily_returns, orient='index', columns=['Return'])
merged = precursor_df.join(ret_df, how='inner')
tail_cutoff = merged['Return'].quantile(0.10)
merged['Is_Tail'] = merged['Return'] <= tail_cutoff
base_p_tail = float(merged['Is_Tail'].mean())
print("\n==================================================================================================")
print(" CONDITIONAL PROBABILITY ANALYSIS: P(Tail | Precursor Spike)")
print(f" Baseline P(Tail) = {base_p_tail:.1%}")
print("==================================================================================================")
hdr = ['Feature', 'P(Tail|X > 75th)', 'P(Tail|X > 90th)', 'P(Tail|X > 95th)']
print(f"{hdr[0]:<25} | {hdr[1]:<20} | {hdr[2]:<20} | {hdr[3]:<20}")
print("-" * 98)
features = ['vol_accel_max', 'v750_max', 'v50_max', 'entropy_max', 'cross_corr_max']

def _p_tail_above(frame, col, q):
    """P(Is_Tail) among rows where *col* exceeds its q-th quantile (0 if none).

    FIX: the original computed each quantile-filtered subset twice — once for
    the emptiness check and once for the mean; this does it once.
    """
    subset = frame[frame[col] > frame[col].quantile(q)]
    return subset['Is_Tail'].mean() if len(subset) > 0 else 0

# Conditional tail probability at the 75th/90th/95th percentile of each precursor.
for f in features:
    cond_75 = _p_tail_above(merged, f, 0.75)
    cond_90 = _p_tail_above(merged, f, 0.90)
    cond_95 = _p_tail_above(merged, f, 0.95)
    print(f"{f:<25} | {cond_75:>19.1%} | {cond_90:>19.1%} | {cond_95:>19.1%}")
print("\n==================================================================================================")
print(" ESOTERIC FACTORS CORRELATION WITH EXTREME LEFT TAILS")
print("==================================================================================================")
esoteric_features = ['day_of_week', 'moon_illumination', 'market_cycle_position', 'mercury_retrograde']
# Report how each esoteric factor relates to tail-day frequency.
for feat in esoteric_features:
    if feat == 'day_of_week':
        # Categorical: tail rate per weekday (Mon=0 .. Sun=6).
        print("\nDay of Week P(Tail):")
        for day in range(7):
            rows = merged[merged[feat] == day]
            if len(rows) > 0:
                print(f" Day {day}: {rows['Is_Tail'].mean():.1%} (N={len(rows)})")
    elif feat == 'mercury_retrograde':
        # Binary: tail rate with and without retrograde.
        print("\nMercury Retrograde P(Tail):")
        for flag in (0, 1):
            rows = merged[merged[feat] == flag]
            if len(rows) > 0:
                print(f" Retrograde={flag}: {rows['Is_Tail'].mean():.1%} (N={len(rows)})")
    else:
        # Continuous: compare tail rate in the top vs. bottom quartile.
        hi_rows = merged[merged[feat] > merged[feat].quantile(0.75)]
        cond_75 = hi_rows['Is_Tail'].mean() if len(hi_rows) > 0 else 0
        lo_rows = merged[merged[feat] < merged[feat].quantile(0.25)]
        cond_25 = lo_rows['Is_Tail'].mean() if len(lo_rows) > 0 else 0
        print(f"\n{feat}:")
        print(f" P(Tail | Top 25% {feat}): {cond_75:.1%}")
        print(f" P(Tail | Bot 25% {feat}): {cond_25:.1%}")