diff --git a/adaptive_exit/adaptive_exit_engine.py b/adaptive_exit/adaptive_exit_engine.py index b858d4b..ff31cc0 100755 --- a/adaptive_exit/adaptive_exit_engine.py +++ b/adaptive_exit/adaptive_exit_engine.py @@ -35,7 +35,7 @@ import os import threading import time import urllib.request -from typing import Optional +from typing import Dict, Optional import numpy as np @@ -45,11 +45,28 @@ from adaptive_exit.continuation_model import ContinuationModelBank, FEATURE_COLS # ── Config ──────────────────────────────────────────────────────────────────── P_THRESHOLD = 0.40 # P(continuation) below this → consider exit GIVEBACK_K = 0.50 # MFE giveback fraction -MAE_MULT_TIER1 = 3.5 # vol multiplier for tier-1 stop +MAE_MULT_TIER1 = 3.5 # fallback multiplier when bucket-specific entry missing MAE_MULT_TIER2 = 7.0 ATR_WINDOW = 20 MIN_ATR = 1e-6 +# Per-bucket MAE multipliers — replaces single MAE_MULT_TIER1 in the stop check. +# Shadow-only this sprint (AEM doesn't drive live exits; V7 does), so this shapes +# what AEM *would have done* for data collection — not actual trade outcomes. +# `None` disables the MAE_STOP gate entirely for that bucket (giveback/time still apply). +# B3 — natural winners; shadow shows 5.0–5.1 MAE peaks before FIXED_TP succeeds +# B4 — gross-negative alpha; cut fast before drawdown compounds +# B6 — extreme-vol assets; wide band or we trip on noise +MAE_MULT_BY_BUCKET: Dict[int, Optional[float]] = { + 0: 3.5, + 1: 3.0, + 2: 3.5, + 3: None, + 4: 2.0, + 5: 4.0, + 6: 6.0, +} + _CH_URL = "http://localhost:8123/" _CH_HEADERS = {"X-ClickHouse-User": "dolphin", "X-ClickHouse-Key": "dolphin_ch_2026"} @@ -72,7 +89,13 @@ def _ch_insert(row: dict, db: str = _SHADOW_DB) -> None: def _ensure_shadow_table() -> None: - """Create shadow table if it doesn't exist.""" + """Create shadow table if it doesn't exist. + + Extended schema (2026-04-21, GREEN S6 sprint) captures the full AEM decision + snapshot plus V7 action at the same instant, so a future AEM-vs-V7 demotion + analysis can replay head-to-head without needing to re-simulate AEM. + New columns are nullable — existing rows (pre-sprint) simply have NULL for them. + """ ddl = ( f"CREATE TABLE IF NOT EXISTS {_SHADOW_DB}.{_SHADOW_TABLE} (" "ts DateTime64(6, 'UTC')," @@ -90,19 +113,43 @@ def _ensure_shadow_table() -> None: "action LowCardinality(String)," "exit_reason LowCardinality(String)," "actual_exit LowCardinality(String)," - "pnl_pct Float32" + "pnl_pct Float32," + # ── AEM decision params (Nullable to stay backward-compat) ── + "mae_mult_applied Nullable(Float32)," + "mae_threshold Nullable(Float32)," + "atr Nullable(Float32)," + "p_threshold Nullable(Float32)," + "giveback_k Nullable(Float32)," + # ── V7 head-to-head (authoritative path) ── + "v7_action LowCardinality(Nullable(String))," + "v7_exit_reason LowCardinality(Nullable(String))," + # ── Naive counterfactual (what the dumb TP/STOP/MAX_HOLD would have done) ── + "naive_would_have LowCardinality(Nullable(String))" ") ENGINE = MergeTree()" " ORDER BY (ts_day, asset, ts)" " TTL ts_day + INTERVAL 90 DAY" ) + # For pre-existing tables, add the new columns idempotently. CH treats + # ADD COLUMN IF NOT EXISTS as a no-op when the column is already present. + alters = [ + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_mult_applied Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_threshold Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS atr Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS p_threshold Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS giveback_k Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_action LowCardinality(Nullable(String))", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_exit_reason LowCardinality(Nullable(String))", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS naive_would_have LowCardinality(Nullable(String))", + ] try: - body = ddl.encode() - req = urllib.request.Request(_CH_URL, data=body, method="POST") - for k, v in _CH_HEADERS.items(): - req.add_header(k, v) - urllib.request.urlopen(req, timeout=10) + for stmt in (ddl, *alters): + body = stmt.encode() + req = urllib.request.Request(_CH_URL, data=body, method="POST") + for k, v in _CH_HEADERS.items(): + req.add_header(k, v) + urllib.request.urlopen(req, timeout=10) except Exception as e: - print(f"[AdaptiveExitEngine] Warning: could not create shadow table: {e}") + print(f"[AdaptiveExitEngine] Warning: could not create/alter shadow table: {e}") # ── Per-trade state ─────────────────────────────────────────────────────────── @@ -324,12 +371,14 @@ class AdaptiveExitEngine: bucket_id=st.bucket_id, ) - # Decision logic - mae_threshold = max(0.005, MAE_MULT_TIER1 * atr) + # Decision logic — per-bucket MAE multiplier. `None` entry disables the + # MAE_STOP gate for that bucket (giveback + time checks still apply). + mae_mult = MAE_MULT_BY_BUCKET.get(st.bucket_id, MAE_MULT_TIER1) + mae_threshold = max(0.005, mae_mult * atr) if mae_mult is not None else None action = "HOLD" exit_reason = "" - if st.mae > mae_threshold: + if mae_threshold is not None and st.mae > mae_threshold: action = "EXIT" exit_reason = "AE_MAE_STOP" elif (st.peak_mfe > 0 and st.mfe < GIVEBACK_K * st.peak_mfe @@ -352,10 +401,31 @@ class AdaptiveExitEngine: "bucket_id": st.bucket_id, "vel_div_entry": st.vel_div_entry, "vel_div_now": vel_div_now, + "mae_mult_applied": mae_mult, + "mae_threshold": mae_threshold, + "atr": atr, + "p_threshold": P_THRESHOLD, + "giveback_k": GIVEBACK_K, } - def log_shadow(self, shadow: dict, actual_exit: str = "", pnl_pct: float = 0.0) -> None: - """Async log a shadow decision to ClickHouse.""" + def log_shadow( + self, + shadow: dict, + actual_exit: str = "", + pnl_pct: float = 0.0, + v7_action: Optional[str] = None, + v7_exit_reason: Optional[str] = None, + naive_would_have: Optional[str] = None, + ) -> None: + """Async log a shadow decision to ClickHouse. + + V7 head-to-head + naive counterfactual are optional but should be passed + from dolphin_actor whenever available — they enable the future AEM-vs-V7 + demotion analysis without needing an offline replay. + """ + def _opt(v): + return None if v is None else float(v) + row = { "ts": int(time.time() * 1e6), "trade_id": shadow.get("trade_id", ""), @@ -372,5 +442,15 @@ class AdaptiveExitEngine: "exit_reason": shadow.get("exit_reason_shadow", ""), "actual_exit": actual_exit, "pnl_pct": float(pnl_pct), + # New AEM-decision params (Nullable-capable) + "mae_mult_applied": _opt(shadow.get("mae_mult_applied")), + "mae_threshold": _opt(shadow.get("mae_threshold")), + "atr": _opt(shadow.get("atr")), + "p_threshold": _opt(shadow.get("p_threshold")), + "giveback_k": _opt(shadow.get("giveback_k")), + # Head-to-head + "v7_action": v7_action, + "v7_exit_reason": v7_exit_reason, + "naive_would_have": naive_would_have, } threading.Thread(target=_ch_insert, args=(row,), daemon=True).start() diff --git a/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py b/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py index 743e4c4..769d346 100755 --- a/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py +++ b/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py @@ -5,6 +5,7 @@ import threading import time from collections import deque, namedtuple from datetime import datetime, timezone +from typing import Optional import numpy as np import pandas as pd from pathlib import Path @@ -136,6 +137,7 @@ class DolphinActor(Strategy): self._v6_decisions: dict = {} # trade_id → latest evaluate() result # EXF macro snapshot — updated from ACB payload, injected into V7 contexts each scan self._last_exf: dict = {} # keys: funding, dvol, fear_greed, taker + self._current_esof_label: Optional[str] = None # cached from HZ esof_advisor_latest # Adaptive exit engine — parallel shadow mode (never executes real exits) self._adaptive_exit = None # Stablecoin symbols ÔÇö kept in eigen for purity, hard-blocked at picker @@ -147,7 +149,58 @@ class DolphinActor(Strategy): self.btc_prices: deque = deque(maxlen=BTC_VOL_WINDOW + 2) self._bucket_assignments: dict = {} self._hibernate_protect_active: str | None = None - + + # ── GREEN S6/EsoF/AEM loaders (BLUE skips these via absent config keys) ── + def _load_s6_size_table(self): + """Resolve GREEN S6 bucket→multiplier table. + + Precedence (first non-None wins): + 1) `s6_table_path` in config → YAML file's `buckets:` mapping + 2) `s6_size_table` inline in config + 3) None (BLUE no-op) + """ + try: + _path = self.dolphin_config.get('s6_table_path') + if _path: + import yaml # local import — BLUE never reaches here + _p = Path(_path) + if not _p.is_absolute(): + _p = Path.cwd() / _p + if _p.exists(): + with open(_p, 'r') as _f: + _doc = yaml.safe_load(_f) or {} + _b = _doc.get('buckets') + if isinstance(_b, dict): + return {int(k): float(v) for k, v in _b.items()} + _inline = self.dolphin_config.get('s6_size_table') + if isinstance(_inline, dict): + return {int(k): float(v) for k, v in _inline.items()} + except Exception as _e: + self.log.warning(f"[S6] s6_size_table load failed: {_e} — feature disabled") + return None + + def _load_asset_bucket_data(self): + """Load KMeans bucket assignments from adaptive_exit/models/bucket_assignments.pkl. + + Returns the authoritative `{"assignments": {symbol: int, ...}, ...}` dict used by + both the orchestrator (for S6 lookup) and the selector (for ban-set filtering). + """ + try: + import pickle + _path = self.dolphin_config.get( + 'asset_bucket_pkl', + 'adaptive_exit/models/bucket_assignments.pkl', + ) + _p = Path(_path) + if not _p.is_absolute(): + _p = Path.cwd() / _p + if _p.exists(): + with open(_p, 'rb') as _f: + return pickle.load(_f) + except Exception as _e: + self.log.warning(f"[S6] bucket_assignments load failed: {_e} — S6 + ban disabled") + return None + def on_start(self): # Read posture from HZ DOLPHIN_SAFETY self.hz_client = self._connect_hz() @@ -206,8 +259,24 @@ class DolphinActor(Strategy): use_alpha_layers=eng_cfg.get('use_alpha_layers', True), use_dynamic_leverage=eng_cfg.get('use_dynamic_leverage', True), seed=eng_cfg.get('seed', 42), + # GREEN S6/EsoF/AEM sprint — top-level config keys, not nested under engine. + # BLUE leaves these unset → orchestrator reads None/False → BLUE no-op. + s6_size_table=self._load_s6_size_table(), + esof_sizing_table=self.dolphin_config.get('esof_sizing_table'), + asset_bucket_data=self._load_asset_bucket_data(), + use_int_leverage=bool(self.dolphin_config.get('use_int_leverage', False)), ) self.engine = create_boost_engine(mode=boost_mode, **_engine_kwargs) + + # Wire asset selector ban-set (shared file, BLUE-invariant when ban_set is None). + _ban = self.dolphin_config.get('asset_bucket_ban_set') + _bucket_data = _engine_kwargs.get('asset_bucket_data') or {} + if _ban: + try: + self.engine.asset_selector.asset_bucket_ban_set = set(int(b) for b in _ban) + self.engine.asset_selector.asset_bucket_assignments = dict(_bucket_data.get('assignments', {})) + except Exception as _e: + self.log.warning(f"[S6] Failed to wire asset_bucket_ban_set: {_e}") self.engine.set_esoteric_hazard_multiplier(0.0) # gold spec: init guard, MUST precede set_mc_forewarner # == MC-Forewarner injection =========================================== @@ -509,10 +578,34 @@ class DolphinActor(Strategy): added_func=self._on_scan_event, updated_func=self._on_scan_event, ) - self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan") + # EsoF advisor listener — feeds orchestrator regime gate at _try_entry. + # Callback is zero-work (JSON parse + dict write); the label is consumed + # just before step_bar on the timer thread. + features.add_entry_listener( + include_value=True, + key='esof_advisor_latest', + added_func=self._on_esof_event, + updated_func=self._on_esof_event, + ) + self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan + esof_advisor_latest") except Exception as e: self.log.error(f"Failed to setup ACB listener: {e}") + def _on_esof_event(self, event): + """Cache EsoF label for the orchestrator regime gate. Tolerates stale JSON + schema — on any parse error we fall back to no label (orchestrator treats + None as UNKNOWN, the new renamed conflict-state default).""" + try: + val = event.value + if not val: + return + parsed = json.loads(val) if isinstance(val, str) else val + label = parsed.get('advisory_label') if isinstance(parsed, dict) else None + if isinstance(label, str) and label: + self._current_esof_label = label + except Exception: + self._current_esof_label = None + def _on_scan_event(self, event): """HZ reactor-thread callback -- fires immediately when NG7 writes to HZ. Zero-work: stores raw string + sets edge-trigger flag. No JSON parsing, @@ -683,6 +776,13 @@ class DolphinActor(Strategy): getattr(self.engine, '_mc_gate_open', True), ) + # Feed EsoF label into orchestrator — consumed by regime gate at _try_entry top. + # Engine tolerates None (treated as UNKNOWN under the NEUTRAL→UNKNOWN rename). + try: + self.engine._current_esof_label = self._current_esof_label + except Exception: + pass + _step_start = time.monotonic() try: result = self.engine.step_bar( @@ -1536,7 +1636,41 @@ class DolphinActor(Strategy): exf=self._last_exf, ) _shadow['asset'] = _pend_ae['asset'] - self._adaptive_exit.log_shadow(_shadow) + + # V7 head-to-head: pull the authoritative-path decision for + # the same trade at the same instant (may be None if V7 not + # wired or hasn't decided yet — log_shadow tolerates None). + _v7_dec_ae = self._v6_decisions.get(_tid_ae) or {} + _v7_action = _v7_dec_ae.get('action') + _v7_exit_reason = _v7_dec_ae.get('reason') + + # Naive counterfactual: pure TP/STOP/MAX_HOLD at this bar. + _naive_would_have: Optional[str] = None + try: + _eng_cfg_ae = self.dolphin_config.get('engine', {}) + _tp = float(_eng_cfg_ae.get('fixed_tp_pct', 0.0095)) + _sl = float(_eng_cfg_ae.get('stop_pct', 0.0)) or 0.0 + _mh = int(_eng_cfg_ae.get('max_hold_bars', 120)) + _dir_ae = -1 if _pend_ae['side'] == 'SHORT' else 1 + _ep_ae = float(_pend_ae['entry_price'] or 0.0) + _pnl_naive = (_dir_ae * (_ep_ae - float(_cur_px_ae)) / _ep_ae) if _ep_ae > 0 else 0.0 + if _pnl_naive >= _tp: + _naive_would_have = 'TP' + elif _sl > 0 and _pnl_naive <= -_sl: + _naive_would_have = 'STOP' + elif _bars_ae >= _mh: + _naive_would_have = 'MAX_HOLD' + else: + _naive_would_have = 'HOLD' + except Exception: + _naive_would_have = None + + self._adaptive_exit.log_shadow( + _shadow, + v7_action=_v7_action, + v7_exit_reason=_v7_exit_reason, + naive_would_have=_naive_would_have, + ) except Exception: pass # shadow must never affect live trading