diff --git a/Observability/esof_advisor.py b/Observability/esof_advisor.py index 716e8d6..8be31c7 100755 --- a/Observability/esof_advisor.py +++ b/Observability/esof_advisor.py @@ -291,7 +291,10 @@ def compute_esof(now: datetime = None) -> dict: if advisory_score > 0.25: advisory_label = "FAVORABLE" elif advisory_score > 0.05: advisory_label = "MILD_POSITIVE" - elif advisory_score > -0.05: advisory_label = "NEUTRAL" + # UNKNOWN (was NEUTRAL): constituent signals in conflict. Empirically the worst + # ROI bucket, not a benign mid-state — naming is load-bearing for consumers + # making "stand aside vs size-down" decisions. + elif advisory_score > -0.05: advisory_label = "UNKNOWN" elif advisory_score > -0.25: advisory_label = "MILD_NEGATIVE" else: advisory_label = "UNFAVORABLE" @@ -394,7 +397,8 @@ CYAN = "\033[36m"; BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m" LABEL_COLOR = { "FAVORABLE": GREEN, "MILD_POSITIVE":"\033[92m", - "NEUTRAL": YELLOW, + "UNKNOWN": YELLOW, # renamed from NEUTRAL — signals-in-conflict + "NEUTRAL": YELLOW, # backward-compat for historical CH rows / stale HZ snapshots "MILD_NEGATIVE":"\033[91m", "UNFAVORABLE": RED, } diff --git a/Observability/esof_gate.py b/Observability/esof_gate.py index 6941163..5d182e2 100755 --- a/Observability/esof_gate.py +++ b/Observability/esof_gate.py @@ -104,13 +104,16 @@ S6_MULT: Dict[str, Dict[int, float]] = { # B0 B1 B2 B3 B4 B5 B6 "FAVORABLE": {0: 0.65, 1: 0.50, 2: 0.0, 3: 2.0, 4: 0.20, 5: 0.75, 6: 1.5}, "MILD_POSITIVE": {0: 0.50, 1: 0.35, 2: 0.0, 3: 2.0, 4: 0.10, 5: 0.60, 6: 1.5}, + # UNKNOWN replaces NEUTRAL (constituent signals in conflict — empirically the + # worst-ROI state). Keep NEUTRAL as alias so historical CH replays still resolve. 
+ "UNKNOWN": {0: 0.40, 1: 0.30, 2: 0.0, 3: 2.0, 4: 0.0, 5: 0.50, 6: 1.5}, "NEUTRAL": {0: 0.40, 1: 0.30, 2: 0.0, 3: 2.0, 4: 0.0, 5: 0.50, 6: 1.5}, "MILD_NEGATIVE": {0: 0.20, 1: 0.20, 2: 0.0, 3: 1.5, 4: 0.0, 5: 0.30, 6: 1.2}, "UNFAVORABLE": {0: 0.0, 1: 0.0, 2: 0.0, 3: 1.5, 4: 0.0, 5: 0.0, 6: 1.2}, } -# Base S6 (NEUTRAL row above) — exposed for quick reference -S6_BASE: Dict[int, float] = S6_MULT["NEUTRAL"] +# Base S6 — UNKNOWN/NEUTRAL rows above are identical (alias) +S6_BASE: Dict[int, float] = S6_MULT["UNKNOWN"] # ── IRP filter threshold tables keyed by advisory_label (Strategy S6_IRP) ───── @@ -121,13 +124,14 @@ S6_BASE: Dict[int, float] = S6_MULT["NEUTRAL"] IRP_PARAMS: Dict[str, Dict[str, float]] = { "FAVORABLE": {"alignment_min": 0.15, "noise_max": 640.0, "latency_max": 24}, "MILD_POSITIVE": {"alignment_min": 0.17, "noise_max": 560.0, "latency_max": 22}, - "NEUTRAL": {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20}, + "UNKNOWN": {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20}, + "NEUTRAL": {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20}, # alias "MILD_NEGATIVE": {"alignment_min": 0.22, "noise_max": 440.0, "latency_max": 18}, "UNFAVORABLE": {"alignment_min": 0.25, "noise_max": 380.0, "latency_max": 15}, } -# Gold-spec thresholds (NEUTRAL row) -IRP_GOLD: Dict[str, float] = IRP_PARAMS["NEUTRAL"] +# Gold-spec thresholds (UNKNOWN/NEUTRAL row) +IRP_GOLD: Dict[str, float] = IRP_PARAMS["UNKNOWN"] # ── GateResult ───────────────────────────────────────────────────────────────── @@ -157,7 +161,8 @@ def strategy_A_lev_scale(adv: dict) -> GateResult: mult_map = { "UNFAVORABLE": 0.50, "MILD_NEGATIVE": 0.75, - "NEUTRAL": 1.00, + "UNKNOWN": 1.00, + "NEUTRAL": 1.00, # alias — historical CH replays "MILD_POSITIVE": 1.00, "FAVORABLE": 1.00, } diff --git a/adaptive_exit/adaptive_exit_engine.py b/adaptive_exit/adaptive_exit_engine.py index b858d4b..ff31cc0 100755 --- a/adaptive_exit/adaptive_exit_engine.py +++ 
b/adaptive_exit/adaptive_exit_engine.py @@ -35,7 +35,7 @@ import os import threading import time import urllib.request -from typing import Optional +from typing import Dict, Optional import numpy as np @@ -45,11 +45,28 @@ from adaptive_exit.continuation_model import ContinuationModelBank, FEATURE_COLS # ── Config ──────────────────────────────────────────────────────────────────── P_THRESHOLD = 0.40 # P(continuation) below this → consider exit GIVEBACK_K = 0.50 # MFE giveback fraction -MAE_MULT_TIER1 = 3.5 # vol multiplier for tier-1 stop +MAE_MULT_TIER1 = 3.5 # fallback multiplier when bucket-specific entry missing MAE_MULT_TIER2 = 7.0 ATR_WINDOW = 20 MIN_ATR = 1e-6 +# Per-bucket MAE multipliers — replaces single MAE_MULT_TIER1 in the stop check. +# Shadow-only this sprint (AEM doesn't drive live exits; V7 does), so this shapes +# what AEM *would have done* for data collection — not actual trade outcomes. +# `None` disables the MAE_STOP gate entirely for that bucket (giveback/time still apply). +# B3 — natural winners; shadow shows 5.0–5.1 MAE peaks before FIXED_TP succeeds +# B4 — gross-negative alpha; cut fast before drawdown compounds +# B6 — extreme-vol assets; wide band or we trip on noise +MAE_MULT_BY_BUCKET: Dict[int, Optional[float]] = { + 0: 3.5, + 1: 3.0, + 2: 3.5, + 3: None, + 4: 2.0, + 5: 4.0, + 6: 6.0, +} + _CH_URL = "http://localhost:8123/" _CH_HEADERS = {"X-ClickHouse-User": "dolphin", "X-ClickHouse-Key": "dolphin_ch_2026"} @@ -72,7 +89,13 @@ def _ch_insert(row: dict, db: str = _SHADOW_DB) -> None: def _ensure_shadow_table() -> None: - """Create shadow table if it doesn't exist.""" + """Create shadow table if it doesn't exist. + + Extended schema (2026-04-21, GREEN S6 sprint) captures the full AEM decision + snapshot plus V7 action at the same instant, so a future AEM-vs-V7 demotion + analysis can replay head-to-head without needing to re-simulate AEM. + New columns are nullable — existing rows (pre-sprint) simply have NULL for them. 
+ """ ddl = ( f"CREATE TABLE IF NOT EXISTS {_SHADOW_DB}.{_SHADOW_TABLE} (" "ts DateTime64(6, 'UTC')," @@ -90,19 +113,43 @@ def _ensure_shadow_table() -> None: "action LowCardinality(String)," "exit_reason LowCardinality(String)," "actual_exit LowCardinality(String)," - "pnl_pct Float32" + "pnl_pct Float32," + # ── AEM decision params (Nullable to stay backward-compat) ── + "mae_mult_applied Nullable(Float32)," + "mae_threshold Nullable(Float32)," + "atr Nullable(Float32)," + "p_threshold Nullable(Float32)," + "giveback_k Nullable(Float32)," + # ── V7 head-to-head (authoritative path) ── + "v7_action LowCardinality(Nullable(String))," + "v7_exit_reason LowCardinality(Nullable(String))," + # ── Naive counterfactual (what the dumb TP/STOP/MAX_HOLD would have done) ── + "naive_would_have LowCardinality(Nullable(String))" ") ENGINE = MergeTree()" " ORDER BY (ts_day, asset, ts)" " TTL ts_day + INTERVAL 90 DAY" ) + # For pre-existing tables, add the new columns idempotently. CH treats + # ADD COLUMN IF NOT EXISTS as a no-op when the column is already present. 
+ alters = [ + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_mult_applied Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_threshold Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS atr Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS p_threshold Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS giveback_k Nullable(Float32)", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_action LowCardinality(Nullable(String))", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_exit_reason LowCardinality(Nullable(String))", + f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS naive_would_have LowCardinality(Nullable(String))", + ] try: - body = ddl.encode() - req = urllib.request.Request(_CH_URL, data=body, method="POST") - for k, v in _CH_HEADERS.items(): - req.add_header(k, v) - urllib.request.urlopen(req, timeout=10) + for stmt in (ddl, *alters): + body = stmt.encode() + req = urllib.request.Request(_CH_URL, data=body, method="POST") + for k, v in _CH_HEADERS.items(): + req.add_header(k, v) + urllib.request.urlopen(req, timeout=10) except Exception as e: - print(f"[AdaptiveExitEngine] Warning: could not create shadow table: {e}") + print(f"[AdaptiveExitEngine] Warning: could not create/alter shadow table: {e}") # ── Per-trade state ─────────────────────────────────────────────────────────── @@ -324,12 +371,14 @@ class AdaptiveExitEngine: bucket_id=st.bucket_id, ) - # Decision logic - mae_threshold = max(0.005, MAE_MULT_TIER1 * atr) + # Decision logic — per-bucket MAE multiplier. `None` entry disables the + # MAE_STOP gate for that bucket (giveback + time checks still apply). 
+ mae_mult = MAE_MULT_BY_BUCKET.get(st.bucket_id, MAE_MULT_TIER1) + mae_threshold = max(0.005, mae_mult * atr) if mae_mult is not None else None action = "HOLD" exit_reason = "" - if st.mae > mae_threshold: + if mae_threshold is not None and st.mae > mae_threshold: action = "EXIT" exit_reason = "AE_MAE_STOP" elif (st.peak_mfe > 0 and st.mfe < GIVEBACK_K * st.peak_mfe @@ -352,10 +401,31 @@ class AdaptiveExitEngine: "bucket_id": st.bucket_id, "vel_div_entry": st.vel_div_entry, "vel_div_now": vel_div_now, + "mae_mult_applied": mae_mult, + "mae_threshold": mae_threshold, + "atr": atr, + "p_threshold": P_THRESHOLD, + "giveback_k": GIVEBACK_K, } - def log_shadow(self, shadow: dict, actual_exit: str = "", pnl_pct: float = 0.0) -> None: - """Async log a shadow decision to ClickHouse.""" + def log_shadow( + self, + shadow: dict, + actual_exit: str = "", + pnl_pct: float = 0.0, + v7_action: Optional[str] = None, + v7_exit_reason: Optional[str] = None, + naive_would_have: Optional[str] = None, + ) -> None: + """Async log a shadow decision to ClickHouse. + + V7 head-to-head + naive counterfactual are optional but should be passed + from dolphin_actor whenever available — they enable the future AEM-vs-V7 + demotion analysis without needing an offline replay. 
+ """ + def _opt(v): + return None if v is None else float(v) + row = { "ts": int(time.time() * 1e6), "trade_id": shadow.get("trade_id", ""), @@ -372,5 +442,15 @@ class AdaptiveExitEngine: "exit_reason": shadow.get("exit_reason_shadow", ""), "actual_exit": actual_exit, "pnl_pct": float(pnl_pct), + # New AEM-decision params (Nullable-capable) + "mae_mult_applied": _opt(shadow.get("mae_mult_applied")), + "mae_threshold": _opt(shadow.get("mae_threshold")), + "atr": _opt(shadow.get("atr")), + "p_threshold": _opt(shadow.get("p_threshold")), + "giveback_k": _opt(shadow.get("giveback_k")), + # Head-to-head + "v7_action": v7_action, + "v7_exit_reason": v7_exit_reason, + "naive_would_have": naive_would_have, } threading.Thread(target=_ch_insert, args=(row,), daemon=True).start() diff --git a/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_asset_selector.py b/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_asset_selector.py index 716ca1f..e5f134c 100755 --- a/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_asset_selector.py +++ b/nautilus_dolphin/nautilus_dolphin/nautilus/alpha_asset_selector.py @@ -10,7 +10,7 @@ import math import numpy as np from numba import njit from dataclasses import dataclass -from typing import Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Set, Tuple # ── Constants (matching dolphin_vbt_real.py lines 104-115) ──────────────────── @@ -195,9 +195,13 @@ class AlphaAssetSelector: self, lookback_horizon: int = IRP_LOOKBACK, ars_weights: Optional[List[float]] = None, + asset_bucket_ban_set: Optional[Set[int]] = None, + asset_bucket_assignments: Optional[Dict[str, int]] = None, ): self.H = lookback_horizon self.weights = ars_weights or ARS_WEIGHTS + self.asset_bucket_ban_set = asset_bucket_ban_set + self.asset_bucket_assignments = asset_bucket_assignments or {} def rank_assets( self, @@ -236,6 +240,15 @@ class AlphaAssetSelector: best_eff = valid[ri, 4] asset = asset_names[asset_idx] + + # GREEN-only bucket-ban filter. 
BLUE passes None → no-op. + # Skips banned-bucket assets so the next-ranked asset takes the slot + # (preserves capital; a sizer 0× would waste the slot). + if self.asset_bucket_ban_set: + bkt = self.asset_bucket_assignments.get(asset) + if bkt is not None and bkt in self.asset_bucket_ban_set: + continue + action = "SHORT" if trade_dir == -1 else "LONG" orientation = "DIRECT" if trade_dir == regime_direction else "INVERSE" diff --git a/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py b/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py index 743e4c4..769d346 100755 --- a/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py +++ b/nautilus_dolphin/nautilus_dolphin/nautilus/dolphin_actor.py @@ -5,6 +5,7 @@ import threading import time from collections import deque, namedtuple from datetime import datetime, timezone +from typing import Optional import numpy as np import pandas as pd from pathlib import Path @@ -136,6 +137,7 @@ class DolphinActor(Strategy): self._v6_decisions: dict = {} # trade_id → latest evaluate() result # EXF macro snapshot — updated from ACB payload, injected into V7 contexts each scan self._last_exf: dict = {} # keys: funding, dvol, fear_greed, taker + self._current_esof_label: Optional[str] = None # cached from HZ esof_advisor_latest # Adaptive exit engine — parallel shadow mode (never executes real exits) self._adaptive_exit = None # Stablecoin symbols ÔÇö kept in eigen for purity, hard-blocked at picker @@ -147,7 +149,58 @@ class DolphinActor(Strategy): self.btc_prices: deque = deque(maxlen=BTC_VOL_WINDOW + 2) self._bucket_assignments: dict = {} self._hibernate_protect_active: str | None = None - + + # ── GREEN S6/EsoF/AEM loaders (BLUE skips these via absent config keys) ── + def _load_s6_size_table(self): + """Resolve GREEN S6 bucket→multiplier table. 
+ + Precedence (first non-None wins): + 1) `s6_table_path` in config → YAML file's `buckets:` mapping + 2) `s6_size_table` inline in config + 3) None (BLUE no-op) + """ + try: + _path = self.dolphin_config.get('s6_table_path') + if _path: + import yaml # local import — BLUE never reaches here + _p = Path(_path) + if not _p.is_absolute(): + _p = Path.cwd() / _p + if _p.exists(): + with open(_p, 'r') as _f: + _doc = yaml.safe_load(_f) or {} + _b = _doc.get('buckets') + if isinstance(_b, dict): + return {int(k): float(v) for k, v in _b.items()} + _inline = self.dolphin_config.get('s6_size_table') + if isinstance(_inline, dict): + return {int(k): float(v) for k, v in _inline.items()} + except Exception as _e: + self.log.warning(f"[S6] s6_size_table load failed: {_e} — feature disabled") + return None + + def _load_asset_bucket_data(self): + """Load KMeans bucket assignments from adaptive_exit/models/bucket_assignments.pkl. + + Returns the authoritative `{"assignments": {symbol: int, ...}, ...}` dict used by + both the orchestrator (for S6 lookup) and the selector (for ban-set filtering). + """ + try: + import pickle + _path = self.dolphin_config.get( + 'asset_bucket_pkl', + 'adaptive_exit/models/bucket_assignments.pkl', + ) + _p = Path(_path) + if not _p.is_absolute(): + _p = Path.cwd() / _p + if _p.exists(): + with open(_p, 'rb') as _f: + return pickle.load(_f) + except Exception as _e: + self.log.warning(f"[S6] bucket_assignments load failed: {_e} — S6 + ban disabled") + return None + def on_start(self): # Read posture from HZ DOLPHIN_SAFETY self.hz_client = self._connect_hz() @@ -206,8 +259,24 @@ class DolphinActor(Strategy): use_alpha_layers=eng_cfg.get('use_alpha_layers', True), use_dynamic_leverage=eng_cfg.get('use_dynamic_leverage', True), seed=eng_cfg.get('seed', 42), + # GREEN S6/EsoF/AEM sprint — top-level config keys, not nested under engine. + # BLUE leaves these unset → orchestrator reads None/False → BLUE no-op. 
+ s6_size_table=self._load_s6_size_table(), + esof_sizing_table=self.dolphin_config.get('esof_sizing_table'), + asset_bucket_data=self._load_asset_bucket_data(), + use_int_leverage=bool(self.dolphin_config.get('use_int_leverage', False)), ) self.engine = create_boost_engine(mode=boost_mode, **_engine_kwargs) + + # Wire asset selector ban-set (shared file, BLUE-invariant when ban_set is None). + _ban = self.dolphin_config.get('asset_bucket_ban_set') + _bucket_data = _engine_kwargs.get('asset_bucket_data') or {} + if _ban: + try: + self.engine.asset_selector.asset_bucket_ban_set = set(int(b) for b in _ban) + self.engine.asset_selector.asset_bucket_assignments = dict(_bucket_data.get('assignments', {})) + except Exception as _e: + self.log.warning(f"[S6] Failed to wire asset_bucket_ban_set: {_e}") self.engine.set_esoteric_hazard_multiplier(0.0) # gold spec: init guard, MUST precede set_mc_forewarner # == MC-Forewarner injection =========================================== @@ -509,10 +578,34 @@ class DolphinActor(Strategy): added_func=self._on_scan_event, updated_func=self._on_scan_event, ) - self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan") + # EsoF advisor listener — feeds orchestrator regime gate at _try_entry. + # Callback is zero-work (JSON parse + dict write); the label is consumed + # just before step_bar on the timer thread. + features.add_entry_listener( + include_value=True, + key='esof_advisor_latest', + added_func=self._on_esof_event, + updated_func=self._on_esof_event, + ) + self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan + esof_advisor_latest") except Exception as e: self.log.error(f"Failed to setup ACB listener: {e}") + def _on_esof_event(self, event): + """Cache EsoF label for the orchestrator regime gate. 
Tolerates stale JSON + schema — on any parse error we fall back to no label (orchestrator treats + None as UNKNOWN, the new renamed conflict-state default).""" + try: + val = event.value + if not val: + return + parsed = json.loads(val) if isinstance(val, str) else val + label = parsed.get('advisory_label') if isinstance(parsed, dict) else None + if isinstance(label, str) and label: + self._current_esof_label = label + except Exception: + self._current_esof_label = None + def _on_scan_event(self, event): """HZ reactor-thread callback -- fires immediately when NG7 writes to HZ. Zero-work: stores raw string + sets edge-trigger flag. No JSON parsing, @@ -683,6 +776,13 @@ class DolphinActor(Strategy): getattr(self.engine, '_mc_gate_open', True), ) + # Feed EsoF label into orchestrator — consumed by regime gate at _try_entry top. + # Engine tolerates None (treated as UNKNOWN under the NEUTRAL→UNKNOWN rename). + try: + self.engine._current_esof_label = self._current_esof_label + except Exception: + pass + _step_start = time.monotonic() try: result = self.engine.step_bar( @@ -1536,7 +1636,41 @@ class DolphinActor(Strategy): exf=self._last_exf, ) _shadow['asset'] = _pend_ae['asset'] - self._adaptive_exit.log_shadow(_shadow) + + # V7 head-to-head: pull the authoritative-path decision for + # the same trade at the same instant (may be None if V7 not + # wired or hasn't decided yet — log_shadow tolerates None). + _v7_dec_ae = self._v6_decisions.get(_tid_ae) or {} + _v7_action = _v7_dec_ae.get('action') + _v7_exit_reason = _v7_dec_ae.get('reason') + + # Naive counterfactual: pure TP/STOP/MAX_HOLD at this bar. 
+ _naive_would_have: Optional[str] = None + try: + _eng_cfg_ae = self.dolphin_config.get('engine', {}) + _tp = float(_eng_cfg_ae.get('fixed_tp_pct', 0.0095)) + _sl = float(_eng_cfg_ae.get('stop_pct', 0.0)) or 0.0 + _mh = int(_eng_cfg_ae.get('max_hold_bars', 120)) + _dir_ae = -1 if _pend_ae['side'] == 'SHORT' else 1 + _ep_ae = float(_pend_ae['entry_price'] or 0.0) + _pnl_naive = (_dir_ae * (_ep_ae - float(_cur_px_ae)) / _ep_ae) if _ep_ae > 0 else 0.0 + if _pnl_naive >= _tp: + _naive_would_have = 'TP' + elif _sl > 0 and _pnl_naive <= -_sl: + _naive_would_have = 'STOP' + elif _bars_ae >= _mh: + _naive_would_have = 'MAX_HOLD' + else: + _naive_would_have = 'HOLD' + except Exception: + _naive_would_have = None + + self._adaptive_exit.log_shadow( + _shadow, + v7_action=_v7_action, + v7_exit_reason=_v7_exit_reason, + naive_would_have=_naive_would_have, + ) except Exception: pass # shadow must never affect live trading diff --git a/nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py b/nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py index 44fa739..de3f988 100755 --- a/nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py +++ b/nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py @@ -14,6 +14,7 @@ Flow per tick: Source: dolphin_vbt_real.py simulate_multi_asset_nb() lines 1715-2250 """ +import math import uuid import numpy as np from numba import njit @@ -57,6 +58,12 @@ class NDPosition: entry_v50_vel: float = 0.0 entry_v750_vel: float = 0.0 current_price: float = 0.0 + # GREEN-only observability fields (BLUE leaves at defaults). 
+ asset_bucket_id: Optional[int] = None + s6_mult: float = 1.0 + esof_mult: float = 1.0 + esof_label: Optional[str] = None + leverage_raw: Optional[float] = None # pre-int-rounding leverage, for CH analysis @property def pnl_pct(self) -> float: @@ -134,6 +141,11 @@ class NDAlphaEngine: use_dynamic_leverage: bool = True, # Absolute leverage ceiling — ACB/MC/EsoF steer within [base, abs_max]; never breach abs_max_leverage: float = 6.0, + # GREEN-only toggles (BLUE no-op when left at None/False). + s6_size_table: Optional[Dict[int, float]] = None, + esof_sizing_table: Optional[Dict[str, float]] = None, + asset_bucket_data: Optional[Dict[str, Any]] = None, + use_int_leverage: bool = False, # Seed seed: int = 42, ): @@ -147,6 +159,18 @@ class NDAlphaEngine: self.leverage_convexity = leverage_convexity self.abs_max_leverage = abs_max_leverage + # GREEN-only single-site mult tables + int-leverage gate. + # BLUE leaves these at None/False → notional/leverage math unchanged. + self.s6_size_table: Optional[Dict[int, float]] = s6_size_table + self.esof_sizing_table: Optional[Dict[str, float]] = esof_sizing_table + self.asset_bucket_data: Optional[Dict[str, Any]] = asset_bucket_data + self.use_int_leverage: bool = use_int_leverage + # EsoF label for current bar — set externally by dolphin_actor from HZ + # DOLPHIN_FEATURES.esof_advisor_latest each tick. None → treated as UNKNOWN. + self._current_esof_label: Optional[str] = None + # Runtime-computed per-entry mults (populated inside _try_entry). + self._esof_size_mult: float = 1.0 + # Fee/slippage model self.use_sp_fees = use_sp_fees self.use_sp_slippage = use_sp_slippage @@ -466,6 +490,18 @@ class NDAlphaEngine: if self.capital <= 0: return None + # GREEN-only EsoF regime gate. Runs before selector/sizer at the highest-visible level. + # BLUE leaves esof_sizing_table=None → skipped entirely (mult stays 1.0, no gate). + # Treat a missing/None label as UNKNOWN (signals-in-conflict) to honour the renamed default. 
+ esof_label: Optional[str] = None + if self.esof_sizing_table: + esof_label = self._current_esof_label or "UNKNOWN" + self._esof_size_mult = float(self.esof_sizing_table.get(esof_label, 1.0)) + if self._esof_size_mult <= 0.0: + return None # regime-wide skip — no selector/sizer work + else: + self._esof_size_mult = 1.0 + trade_direction = self.regime_direction # 1. IRP asset selection (matches dolphin_vbt_real.py lines 2047-2071) @@ -562,7 +598,46 @@ class NDAlphaEngine: leverage = min(raw_leverage, clamped_max_leverage) leverage = max(self.bet_sizer.min_leverage, leverage) - notional = self.capital * size_result["fraction"] * leverage + # ── SINGLE-SITE notional application (S6 + EsoF + int-leverage gate) ── + # All sizing multipliers (S6 per-bucket, EsoF regime) are applied HERE, not + # inside AlphaBetSizer — the sizer stays pure so BLUE parity is trivial and + # we avoid the historical bug where sizer-internal `notional` was silently + # discarded by this very line. When every GREEN toggle is off/None, math + # collapses to the original BLUE form. + asset_bucket_id: Optional[int] = None + if self.asset_bucket_data is not None: + assignments = self.asset_bucket_data.get("assignments", {}) if isinstance(self.asset_bucket_data, dict) else {} + _bkt = assignments.get(trade_asset) + if _bkt is not None: + asset_bucket_id = int(_bkt) + + s6_mult: float = 1.0 + if self.s6_size_table and asset_bucket_id is not None: + s6_mult = float(self.s6_size_table.get(asset_bucket_id, 1.0)) + + esof_mult: float = self._esof_size_mult + + # INTEGER-LEVERAGE GATE — target exchanges (e.g. Binance) require int leverage. + # DEFAULT = 1x pending leverage-vs-winrate analysis in prod/scripts/analyze_leverage_winrate.py. + # The float `leverage` variable computed above is preserved as `leverage_raw` for CH logging + # so the analysis has the data to later flip this rule. 
+ # + # Once analysis completes, replace `leverage_int = 1` with one of: + # Option 1 (round-half-up, conservative): int(math.floor(leverage + 0.5)) + # Option 2 (banker's round, aggressive): int(round(leverage)) + # and keep the min=1 / abs_max clamp below. + leverage_raw: float = leverage + if self.use_int_leverage: + leverage_int = 1 # FIXED PENDING ANALYSIS + leverage_int = max(1, leverage_int) + leverage_int = min(leverage_int, int(self.abs_max_leverage)) + effective_leverage: float = float(leverage_int) + else: + effective_leverage = leverage # BLUE path — unchanged float leverage + + notional = self.capital * size_result["fraction"] * effective_leverage * s6_mult * esof_mult + if notional <= 0: + return None # explicit skip — prevents 0-notional NDPosition creation # 5. Entry price — NO entry slippage (dolphin_vbt_real.py uses raw price for PnL calculation baseline) entry_price = prices.get(trade_asset, 0) @@ -577,12 +652,17 @@ class NDAlphaEngine: entry_price=entry_price, entry_bar=bar_idx, notional=notional, - leverage=leverage, + leverage=effective_leverage, fraction=size_result["fraction"], entry_vel_div=vel_div, bucket_idx=size_result["bucket_idx"], entry_v50_vel=v50_vel, entry_v750_vel=v750_vel, + asset_bucket_id=asset_bucket_id, + s6_mult=s6_mult, + esof_mult=esof_mult, + esof_label=esof_label, + leverage_raw=leverage_raw, ) # Consume pending overrides (set by subclasses before calling super()._try_entry). 
@@ -605,10 +685,15 @@ class NDAlphaEngine: "trade_id": trade_id, "asset": trade_asset, "direction": trade_direction, - "leverage": leverage, + "leverage": effective_leverage, + "leverage_raw": leverage_raw, "notional": notional, "vel_div": vel_div, "entry_price": entry_price, # needed by _exec_submit_entry when prices dict is empty + "asset_bucket_id": asset_bucket_id, + "s6_mult": s6_mult, + "esof_mult": esof_mult, + "esof_label": esof_label, } def get_performance_summary(self) -> Dict[str, Any]: diff --git a/nautilus_dolphin/tests/test_green_only_features.py b/nautilus_dolphin/tests/test_green_only_features.py new file mode 100644 index 0000000..efac07e --- /dev/null +++ b/nautilus_dolphin/tests/test_green_only_features.py @@ -0,0 +1,170 @@ +"""GREEN-only feature tests (exp/green-s6-esof-aem-shadow-2026-04-21). + +Split complement to `test_green_blue_parity.py`. That file guards invariants +that must hold across GREEN's divergence (signal, DC, hibernate, bucket SL, etc.). +This file covers the sprint-specific additions: + + * S6 bucket-ban at AlphaAssetSelector (selector skips banned buckets, + rankings slide up so the slot goes to the next-ranked asset) + * S6 + EsoF + int-leverage at the orchestrator single-site (line 565 region) + * AEM per-bucket MAE_MULT table (B3 disables MAE stop, B4 cuts fast, B6 wide) + * NEUTRAL → UNKNOWN label rename (advisor emits UNKNOWN; alias still resolves) + * Toggles-OFF identity: with every GREEN-only kwarg left at the BLUE default + (None / False), the orchestrator's per-entry math equals the pre-sprint form. + +All tests import pure Python modules — no NautilusKernel, no CH/HZ dependency. 
+""" +from __future__ import annotations + +import math +from typing import Dict, List, Optional + +import pytest + + +# ── Toggles-OFF identity (BLUE invariance) ─────────────────────────────────── + +def test_toggles_off_notional_matches_blue_pre_sprint_formula(): + """With every GREEN toggle left at the BLUE default (None/False), the + single-site notional reduces algebraically to `capital * fraction * leverage` + — the exact form BLUE used before the sprint landed. Guards against silent + regressions that would show up as BLUE PnL drift.""" + from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine + + eng = NDAlphaEngine(initial_capital=25000.0) # all GREEN kwargs default + assert eng.s6_size_table is None + assert eng.esof_sizing_table is None + assert eng.asset_bucket_data is None + assert eng.use_int_leverage is False + # EsoF runtime state initialised to 1.0 (no-op) + assert eng._esof_size_mult == 1.0 + assert eng._current_esof_label is None + + +def test_ndposition_new_fields_default_to_neutral_values(): + """New GREEN-only observability fields on NDPosition must default so that + a BLUE entry produces a record indistinguishable from the pre-sprint shape + in every field that already existed.""" + from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDPosition + + pos = NDPosition( + trade_id="t", asset="BTCUSDT", direction=-1, entry_price=100.0, + entry_bar=0, notional=1000.0, leverage=2.0, fraction=0.1, + entry_vel_div=-0.03, bucket_idx=0, + ) + assert pos.asset_bucket_id is None + assert pos.s6_mult == 1.0 + assert pos.esof_mult == 1.0 + assert pos.esof_label is None + assert pos.leverage_raw is None + + +# ── S6 selector ban ────────────────────────────────────────────────────────── + +def test_selector_bucket_ban_skips_banned_bucket_not_slot(): + """Core caveat from the plan: selector-ban must NOT just zero-size the slot; + it must skip the asset so the next ranking takes the slot. 
Ban ≠ 0× sizer.""" + from nautilus_dolphin.nautilus.alpha_asset_selector import AlphaAssetSelector + + sel = AlphaAssetSelector( + asset_bucket_ban_set={4}, + asset_bucket_assignments={"AAA": 4, "BBB": 0, "CCC": 2}, + ) + # We inspect the ban wiring directly — the kernel is tested in the parity file. + assert sel.asset_bucket_ban_set == {4} + assert sel.asset_bucket_assignments["AAA"] == 4 + + # Simulate the in-loop guard: banned buckets should be filtered. + candidates = ["AAA", "BBB", "CCC"] + survivors = [ + a for a in candidates + if sel.asset_bucket_assignments.get(a) not in (sel.asset_bucket_ban_set or set()) + ] + assert survivors == ["BBB", "CCC"] + assert "AAA" not in survivors + + +def test_selector_default_ban_set_is_none_for_blue_invariance(): + from nautilus_dolphin.nautilus.alpha_asset_selector import AlphaAssetSelector + + sel = AlphaAssetSelector() + assert sel.asset_bucket_ban_set is None + assert sel.asset_bucket_assignments == {} + + +# ── AEM per-bucket MAE_MULT ────────────────────────────────────────────────── + +def test_mae_mult_b3_disables_stop(): + """B3 (natural winners) is configured with None → MAE_STOP is skipped even + at huge MAE. 
def test_esof_advisor_emits_unknown_not_neutral_in_midband():
    """The advisor's mid-band score must produce UNKNOWN, not NEUTRAL. Guards
    against accidental revert of the rename (the semantic shift — 'signals in
    conflict' — is load-bearing for the regime gate)."""
    import Observability.esof_advisor as esof_advisor  # noqa: N813

    # Sanity-scan source for the rename; the computed advisor call requires
    # deep Hz/CH plumbing we don't want to mock here.
    #
    # Read through a context manager with an explicit encoding: the advisor
    # source carries non-ASCII comment text, so a locale-default decode can
    # fail on C/POSIX hosts, and a bare open().read() leaks the handle.
    with open(esof_advisor.__file__, encoding="utf-8") as fh:
        src = fh.read()

    assert 'advisory_label = "UNKNOWN"' in src
    assert 'advisory_label = "NEUTRAL"' not in src
To disable any branch +# on GREEN, comment the corresponding key — single kill-switch per feature. +# ────────────────────────────────────────────────────────────────────────────── + +# Strictly-zero buckets banned at the ranking layer (selector skips them so the +# slot is handed to the next-best asset — does NOT waste capital with 0× sizing). +# Fractional buckets (B0/B1/B5) stay tradeable via s6_size_table below. +asset_bucket_ban_set: [4] + +# Pointer to the generated S6 coefficient table. prod/scripts/recompute_s6_coefficients.py +# regenerates this file on the configured cadence (env S6_RECOMPUTE_INTERVAL_DAYS). +# Bucket → per-entry notional multiplier applied at esf_alpha_orchestrator.py single-site. +s6_table_path: prod/configs/green_s6_table.yml + +# Inline fallback used when s6_table_path is missing (bootstrap / first-run): +# matches the S6 row from prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md. +# B4 is absent (banned above); B2 is absent (= 1.0x, no-op). +s6_size_table: + 0: 0.40 + 1: 0.30 + 3: 2.00 + 5: 0.50 + 6: 1.50 + +# EsoF regime gate lives at the top of _try_entry (orchestrator single-site). +# mult == 0 → regime-wide skip (no selector/sizer work). UNKNOWN replaces NEUTRAL +# (signals-in-conflict is empirically the worst ROI state). +esof_sizing_table: + FAVORABLE: 1.20 + MILD_POSITIVE: 0.60 + UNKNOWN: 0.25 + NEUTRAL: 0.25 # alias — historical CH rows / stale HZ snapshots + MILD_NEGATIVE: 0.00 + UNFAVORABLE: 0.00 + +# Target exchanges (e.g. Binance) require integer leverage. Default 1x pending +# CH-trade-walk in prod/scripts/analyze_leverage_winrate.py to pick the rounding +# rule (round-half-up vs banker's round vs stay-at-1x). leverage_raw is preserved +# in CH trade_events + NDPosition for that analysis. 
+use_int_leverage: true diff --git a/prod/configs/green_s6_table.yml b/prod/configs/green_s6_table.yml new file mode 100644 index 0000000..72fc053 --- /dev/null +++ b/prod/configs/green_s6_table.yml @@ -0,0 +1,34 @@ +# GREEN S6 bucket sizing table +# ──────────────────────────────────────────────────────────────────────────── +# Auto-generated by prod/scripts/recompute_s6_coefficients.py. +# Bucket → per-entry notional multiplier applied at the orchestrator single-site +# (`nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py`). +# +# Semantics: +# value == 1.0 → no-op (same as BLUE) +# value < 1.0 → size down (fractional) +# value > 1.0 → size up (leaned-into bucket) +# absent bucket → 1.0 (no-op) +# value == 0.0 → prefer banning via `asset_bucket_ban_set` in green.yml +# (selector-ban reroutes to next-ranked asset; +# sizer 0× would waste the slot) +# +# Regeneration cadence: env S6_RECOMPUTE_INTERVAL_DAYS (default 30). +# If any bucket's net-PnL moves > ~20% between runs, the recompute flow flags +# the change in CH `dolphin.s6_recompute_log` for manual review before apply. +# ──────────────────────────────────────────────────────────────────────────── + +meta: + generated_at: "2026-04-21T00:00:00Z" # replaced on each recompute + source_branch: "exp/green-s6-esof-aem-shadow-2026-04-21" + n_trades: 0 # populated by recompute script + note: "Bootstrap stub — regenerate with prod/scripts/recompute_s6_coefficients.py before staging" + +# Bucket → multiplier. B4 excluded (banned at selector). B2 omitted (= 1.0 no-op). +# Seeded from the S6 scenario in prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md. +buckets: + 0: 0.40 + 1: 0.30 + 3: 2.00 + 5: 0.50 + 6: 1.50 diff --git a/prod/docs/BRANCHING_CONVENTION.md b/prod/docs/BRANCHING_CONVENTION.md new file mode 100644 index 0000000..bd1edcc --- /dev/null +++ b/prod/docs/BRANCHING_CONVENTION.md @@ -0,0 +1,39 @@ +# DOLPHIN Branching Convention + +Established 2026-04-21. 
The DOLPHIN repo (Gitea: `100.119.158.61:3000/hjnormey/DOLPHIN`) uses a fork-per-experiment pattern to support parallel multi-algo benchmarking on shared infrastructure (ClickHouse, HZ, MHS). + +## Branches + +| Branch | Role | +|----------------|-----------------------------------------------------------------| +| `master` | Production baseline. Only merged-and-validated work lands here. | +| `exp/*` | Experiment branches. Named `exp/FEATURE-YYYY-MM-DD`. | + +Merge to `master` only after the experiment has passed staging validation (integration replay reproduces expected counterfactual, parity tests pass, 24h staging monitor clean). + +## Active / recent experiments + +- `exp/green-s6-esof-aem-shadow-2026-04-21` — S6 asset picker, EsoF regime gate (NEUTRAL→UNKNOWN), AEM shadow completeness, integer-leverage gate. + +## Infrastructure suffixing (avoid collisions on shared CH / HZ / MHS) + +When an experiment writes runtime state to shared infra, **suffix the table or map name with the branch slug** so parallel experiments don't stomp each other. + +Examples: +- ClickHouse tables: `adaptive_exit_shadow_exp_green_s6`, `trade_events_exp_green_s6` +- HZ maps: `DOLPHIN_PNL_GREEN_S6`, `DOLPHIN_FEATURES_EXP_GREEN_S6` +- Supervisord program names: `dolphin-green-exp-s6` + +The suffix slug is the part of the branch name after `exp/`, with `/` → `_` and `-` → `_` as needed. + +## Workflow + +1. Branch from `master`: `git checkout master && git pull && git checkout -b exp/FEATURE-YYYY-MM-DD`. +2. All experiment code, configs, CH DDL, supervisord entries, and docs land on the experiment branch. +3. Staging deploy runs on the experiment branch only. +4. Validation + monitor period. +5. If the experiment is kept, open a merge request to `master`. If it's abandoned, leave the branch for reference — do not delete immediately (post-mortem value). + +## Commit identity + +Commits should use the operator's Gitea-bound identity (`hjnormey@gmail.com`).
Agents making commits should pass identity per-command via `git -c user.email=... -c user.name=...` rather than mutating global config. diff --git a/prod/docs/ESOF_LABEL_MIGRATION.md b/prod/docs/ESOF_LABEL_MIGRATION.md new file mode 100644 index 0000000..fddce34 --- /dev/null +++ b/prod/docs/ESOF_LABEL_MIGRATION.md @@ -0,0 +1,26 @@ +# EsoF Label Migration — `NEUTRAL` → `UNKNOWN` + +Landed in `exp/green-s6-esof-aem-shadow-2026-04-21` on 2026-04-21. + +## What changed + +`Observability/esof_advisor.py` emits `UNKNOWN` where it previously emitted `NEUTRAL` (mid-band, `-0.05 < advisory_score <= 0.05`). All downstream consumers keep a `NEUTRAL` alias so historical CH rows and replays continue to resolve. + +## Why + +Findings from the 637-trade EsoF retrospective (`prod/docs/EsoF_BLUE_IMPLEMENTATION_CURR_AND_RESEARCH.md`) show the mid-band isn't a benign middle — it's the **worst-ROI regime**, corresponding to states where the constituent liq/session/DoW/slot/cell signals are **in conflict**. "NEUTRAL" connoted "no strong read, probably fine"; the data says the opposite. The orchestrator-top EsoF gate uses `UNKNOWN` → `0.25x` sizer mult to encode "stand mostly aside" instead of "trade normally". + +## Touched files + +- `Observability/esof_advisor.py` — emitter renamed; `LABEL_COLOR` carries both keys. +- `Observability/esof_gate.py` — `S6_MULT`, `IRP_PARAMS`, Strategy A mult map all keyed on `UNKNOWN` with a `NEUTRAL` alias entry. +- `nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py` — new GREEN-only `esof_sizing_table` looks up by label; missing/None label defaults to `UNKNOWN`. + +## CH / HZ + +- CH `dolphin.trade_events`, `dolphin.esof_advisory_log`: no DDL change (labels are strings). Historical rows keep `NEUTRAL`; new rows get `UNKNOWN`. Any dashboard/query filtering on the label needs to `IN ('NEUTRAL','UNKNOWN')` or be migrated. +- HZ `DOLPHIN_FEATURES.esof_advisor_latest`: next advisor tick overwrites with `UNKNOWN`. 
Consumers reading stale snapshots across the cutover should treat both as equivalent. + +## Rollback + +Revert the three files above. The `NEUTRAL` alias means a partial rollback (advisor only) is safe without cascading breakage. diff --git a/prod/s6_recompute_flow.py b/prod/s6_recompute_flow.py new file mode 100755 index 0000000..fdb63df --- /dev/null +++ b/prod/s6_recompute_flow.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +"""Prefect flow — periodically recompute S6 bucket sizing coefficients. + +Cadence +─────── +Controlled by env `S6_RECOMPUTE_INTERVAL_DAYS` (default 30). The flow is +idempotent: running it more often than the interval just produces a fresh +YAML and markdown report; nothing is auto-promoted. + +Kill-switch +─────────── +Set env `S6_RECOMPUTE_DISABLED=1` to skip — the flow logs and exits 0. + +Wiring into supervisord/cron +──────────────────────────── +Run this flow on the supervisord host. Recommended: daily timer that guards +internally on the last-run timestamp in `dolphin.s6_recompute_log`, or a +Prefect deployment with a cron schedule `0 3 * * *` and the interval env set. + +This flow shells out to `prod/scripts/recompute_s6_coefficients.py` rather +than importing it so that the script remains independently runnable without +requiring Prefect — honours the plan's "recompute works whether or not +Prefect is installed" requirement. 
@task
def check_interval_elapsed(interval_days: int) -> bool:
    """Return True if the YAML is missing OR older than the interval.

    Reads ``meta.generated_at`` from ``TABLE_YML`` and compares it with the
    current UTC time. Any failure to read or parse the file returns True
    (recompute), matching the original "safer to recompute" policy.

    Args:
        interval_days: minimum age, in days, before a recompute is due.

    Returns:
        True when a recompute should run, False when the table is still fresh.
    """
    # Local import: the file header only pulls in datetime/timedelta.
    from datetime import timezone

    if not TABLE_YML.exists():
        return True
    try:
        import yaml

        # Explicit UTF-8: the table's comment header contains non-ASCII
        # characters, and a locale-default decode error would land in the
        # except below and silently force a recompute on every run.
        doc = yaml.safe_load(TABLE_YML.read_text(encoding="utf-8")) or {}
        stamp = (doc.get("meta") or {}).get("generated_at", "")
        if not stamp:
            return True

        if isinstance(stamp, datetime):
            # An unquoted YAML timestamp may already be parsed by the loader.
            ts = stamp
        else:
            text = str(stamp)
            if text.endswith("Z"):
                # fromisoformat() on older interpreters rejects a literal "Z".
                text = text[:-1] + "+00:00"
            ts = datetime.fromisoformat(text)

        if ts.tzinfo is None:
            # The writer stamps UTC; treat a naive value as UTC.
            ts = ts.replace(tzinfo=timezone.utc)

        # Aware-vs-aware subtraction; datetime.utcnow() is deprecated.
        return datetime.now(timezone.utc) - ts >= timedelta(days=interval_days)
    except Exception:
        # Any parsing trouble → safer to recompute
        return True
get_run_logger() + + if os.environ.get("S6_RECOMPUTE_DISABLED") == "1": + logger.info("[s6_recompute_flow] disabled via S6_RECOMPUTE_DISABLED=1 — skipping") + return 0 + + try: + interval_days = int(os.environ.get("S6_RECOMPUTE_INTERVAL_DAYS", "30")) + except ValueError: + interval_days = 30 + + due = check_interval_elapsed(interval_days) + if not due: + logger.info(f"[s6_recompute_flow] interval not elapsed ({interval_days}d) — skipping") + return 0 + + return run_recompute(strategy, since, min_trades, source_branch) + + +if __name__ == "__main__": + sys.exit(int(s6_recompute_flow() or 0)) diff --git a/prod/scripts/analyze_leverage_winrate.py b/prod/scripts/analyze_leverage_winrate.py new file mode 100755 index 0000000..e6c7156 --- /dev/null +++ b/prod/scripts/analyze_leverage_winrate.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""Leverage → WR / net-PnL analysis over CH `dolphin.trade_events`. + +Purpose +─────── +The target exchanges (Binance futures) require integer leverage. The GREEN +orchestrator single-site (`esf_alpha_orchestrator.py`) therefore rounds +leverage to an integer at entry time; the current default is FIXED 1x with +two candidate rounding rules commented in source, pending this analysis: + + Option 1 (round-half-up, conservative): int(math.floor(raw + 0.5)), min=1 + Option 2 (banker's round, aggressive): int(round(raw)), min=1 + Stay-at-1x (current default): leverage_int = 1 + +This script walks historical trades and, for every distinct leverage value +observed (bucketed to a user-chosen step), reports per-bin WR, net-PnL, +avg-MAE and sample count. The output report informs the rule choice. 
def bin_leverage(lev: Optional[float], step: float) -> float:
    """Bucket leverage to `step`-wide bins, anchored at 0. Returns bin lower edge.

    Args:
        lev: observed leverage; ``None``, non-finite or non-positive values
            all map to the ``0.0`` bin.
        step: bin width; must be a positive finite number.

    Returns:
        The lower edge of the bin containing ``lev``.

    Raises:
        ValueError: if ``step`` is not a positive finite number.
    """
    if lev is None or not math.isfinite(lev) or lev <= 0:
        return 0.0
    if not math.isfinite(step) or step <= 0:
        # Previously step == 0 raised ZeroDivisionError and a negative step
        # produced meaningless edges; fail with a clear message instead.
        raise ValueError(f"step must be a positive finite number, got {step!r}")
    # The small tolerance keeps values sitting exactly on a bin edge in their
    # own bin: 0.3 / 0.1 evaluates to 2.9999999999999996 and would floor to 2.
    return math.floor(lev / step + 1e-9) * step
def aggregate(trades: List[Dict[str, Any]], step: float) -> List[Dict[str, Any]]:
    """Group trades into leverage bins and summarise each bin.

    Args:
        trades: rows carrying ``lev_raw``, ``is_win``, ``pnl`` and ``pnl_pct``;
            missing or null values count as zero.
        step: leverage bin width handed to ``bin_leverage``.

    Returns:
        One dict per non-empty bin, ordered by bin lower edge, with keys
        ``bin`` (label), ``n``, ``wr_pct``, ``avg_pnl_pct`` and ``net_pnl``.
    """
    bins: Dict[float, Dict[str, Any]] = {}
    for t in trades:
        lev = float(t.get("lev_raw") or 0.0)
        b = bin_leverage(lev, step)
        s = bins.setdefault(b, {
            "bin_lo": b,
            "bin_hi": b + step,
            "n": 0,
            "wins": 0,
            "net_pnl": 0.0,
            "net_pnl_pct": 0.0,
        })
        s["n"] += 1
        # `or 0` rather than a .get() default: a key that is present but null
        # would otherwise reach int(None). Same guard as pnl / pnl_pct below.
        if int(t.get("is_win") or 0):
            s["wins"] += 1
        s["net_pnl"] += float(t.get("pnl") or 0.0)
        s["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0)

    out = []
    for key in sorted(bins):
        s = bins[key]
        n = s["n"]  # a bin only exists once a trade landed in it, so n >= 1
        out.append({
            "bin": f"{s['bin_lo']:.2f}–{s['bin_hi']:.2f}x",
            "n": n,
            "wr_pct": round(100.0 * s["wins"] / n, 2),
            # NOTE(review): the x100 assumes pnl_pct is stored as a fraction
            # (0.01 == 1%) — confirm against the trade_events schema.
            "avg_pnl_pct": round(s["net_pnl_pct"] / n * 100.0, 3),
            "net_pnl": round(s["net_pnl"], 2),
        })
    return out
def main() -> int:
    """CLI entry point: pull trades, bin them by leverage, write the report.

    Returns:
        0 after the markdown report is written, 1 when no trades matched.
        Invalid arguments (including a non-positive ``--step``) exit through
        argparse with its usual usage error.
    """
    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy value to filter on (default: blue — richest historical set)")
    ap.add_argument("--since", default="2026-01-01", help="inclusive start date (YYYY-MM-DD)")
    ap.add_argument("--until", default=None, help="inclusive end date (YYYY-MM-DD); default = now")
    ap.add_argument("--step", type=float, default=1.0, help="leverage bin width (e.g. 0.5 or 1.0)")
    ap.add_argument("--out", default="prod/docs/LEVERAGE_WINRATE_REPORT.md",
                    help="output report path (markdown)")
    args = ap.parse_args()

    # Reject a zero/negative/NaN bin width up front; it would otherwise surface
    # as a ZeroDivisionError (or meaningless bins) deep inside the aggregation.
    if not math.isfinite(args.step) or args.step <= 0:
        ap.error(f"--step must be a positive number, got {args.step}")

    trades = load_trades(args.strategy, args.since, args.until)
    if not trades:
        print(f"No trades matched (strategy={args.strategy}, since={args.since}).", file=sys.stderr)
        return 1

    rows = aggregate(trades, args.step)
    md = render_markdown(rows, args.strategy, args.since, args.until, args.step, total=len(trades))

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # The report contains non-ASCII characters (arrows, en dashes in the bin
    # labels); pin UTF-8 so the write does not depend on the host locale.
    out_path.write_text(md, encoding="utf-8")
    print(f"Wrote {out_path} ({len(rows)} bins, {len(trades)} trades)")
    return 0
def ch_query(sql: str) -> List[Dict[str, Any]]:
    """Run `sql` against ClickHouse over HTTP and return the decoded rows.

    ``FORMAT JSONEachRow`` is appended to the statement, the request is sent
    as a GET to ``CH_URL`` with the ``CH_HEADERS`` credentials, and every
    non-blank line of the response is parsed as one JSON object. HTTP or
    JSON errors propagate to the caller.
    """
    encoded = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"})
    request = urllib.request.Request(f"{CH_URL}?{encoded}", method="GET")
    for header_name, header_value in CH_HEADERS.items():
        request.add_header(header_name, header_value)

    with urllib.request.urlopen(request, timeout=60) as response:
        payload = response.read().decode("utf-8", errors="replace")

    # One JSON document per line; blank lines are skipped.
    stripped_lines = (raw.strip() for raw in payload.splitlines())
    return [json.loads(entry) for entry in stripped_lines if entry]
def compute_bucket_stats(
    trades: List[Dict[str, Any]],
    assignments: Dict[str, int],
    buckets: List[int],
    min_trades: int,
) -> Dict[int, Dict[str, float]]:
    """Aggregate per-bucket trade outcomes.

    Args:
        trades: rows carrying ``asset``, ``pnl`` and ``pnl_pct``; null or
            missing numeric values count as zero.
        assignments: asset symbol → bucket id.
        buckets: bucket ids to report; trades whose asset is unassigned or
            maps to a bucket outside this list are ignored.
        min_trades: sample size at which a bucket is marked ``enough``.

    Returns:
        bucket id → ``{"n", "wr", "avg_pnl_pct", "net_pnl", "enough"}``.
        ``wr`` and ``avg_pnl_pct`` are always computed over the bucket's real
        trade count; ``min_trades`` only drives the ``enough`` flag. Buckets
        with no trades report zeros and ``enough=False``.
    """
    stats: Dict[int, Dict[str, float]] = {
        b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets
    }
    for t in trades:
        b = assignments.get(str(t.get("asset", "")))
        if b is None or b not in stats:
            continue
        pnl_pct = float(t.get("pnl_pct") or 0.0)
        acc = stats[b]
        acc["n"] += 1
        acc["wins"] += 1 if pnl_pct > 0 else 0
        acc["net_pnl"] += float(t.get("pnl") or 0.0)
        acc["net_pnl_pct"] += pnl_pct

    out = {}
    for b, s in stats.items():
        # The former local `n = s["n"] if s["n"] >= min_trades else s["n"]`
        # was unused and had identical branches; it is dropped.
        count = s["n"]
        if count == 0:
            out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False}
        else:
            out[b] = {
                "n": count,
                "wr": s["wins"] / count,
                "avg_pnl_pct": s["net_pnl_pct"] / count,
                "net_pnl": s["net_pnl"],
                "enough": count >= min_trades,
            }
    return out
def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None:
    """Write the bucket → multiplier table as a small hand-formatted YAML file.

    Args:
        path: destination file; missing parent directories are created.
        buckets: bucket id → multiplier, emitted in ascending bucket order
            with two decimals.
        meta: must contain ``generated_at``, ``strategy``, ``n_trades`` and
            ``min_trades``; ``source_branch`` is optional (empty if absent).

    Raises:
        KeyError: if a required ``meta`` key is missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [
        "# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.",
        "# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py",
        "# at the single-site notional application.",
        "",
        "meta:",
        f"  generated_at: \"{meta['generated_at']}\"",
        f"  source_branch: \"{meta.get('source_branch', '')}\"",
        f"  strategy: \"{meta['strategy']}\"",
        f"  n_trades: {meta['n_trades']}",
        f"  min_trades_per_bucket: {meta['min_trades']}",
        "",
        "buckets:",
    ]
    lines.extend(f"  {b}: {buckets[b]:.2f}" for b in sorted(buckets))
    # The header comment contains a non-ASCII dash; pin UTF-8 so the write
    # cannot fail (or pick another encoding) under a C/POSIX locale.
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")
def main() -> int:
    """CLI entry point: recompute the S6 table, write YAML + report, log flags.

    Steps, in order: load bucket assignments, pull trades, compute per-bucket
    stats and multipliers, compare against the table currently at
    ``--out-yml``, write the new YAML and markdown report, then insert one
    ``s6_recompute_log`` row per flagged bucket.

    Returns:
        0 on completion. Errors from loading assignments or querying trades
        propagate to the caller.
    """
    # Local import: the file header only pulls in `datetime`.
    from datetime import timezone

    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)")
    ap.add_argument("--since", default="2026-01-01")
    ap.add_argument("--until", default=None)
    ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20)
    ap.add_argument("--variance-threshold", type=float, default=0.20,
                    help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)")
    ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL),
                    help="path to bucket_assignments.pkl")
    ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML))
    ap.add_argument("--out-report", default=None,
                    help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_YYYY-MM-DD.md)")
    ap.add_argument("--source-branch", default="",
                    help="branch slug to stamp into the YAML frontmatter for traceability")
    args = ap.parse_args()

    assignments = load_bucket_assignments(Path(args.bucket_pkl))
    buckets_seen = sorted(set(assignments.values()))
    trades = pull_trades(args.strategy, args.since, args.until)
    stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades)
    new_table = stats_to_multipliers(stats)

    # Must be read before the new YAML is written below (same path).
    prior_table = load_prior_table(Path(args.out_yml))

    # Variance guard. What is actually compared is the relative move of the
    # *multiplier* versus the prior table — prior net-PnL is not persisted in
    # the YAML, so the multiplier stands in for it. Buckets without enough
    # trades, or absent from the prior table, are never flagged.
    flagged: List[int] = []
    for b, s in stats.items():
        if not s.get("enough"):
            continue
        prior_mult = prior_table.get(b)
        if prior_mult is None:
            continue
        new_mult = new_table.get(b, 1.0)
        denom = max(abs(prior_mult), 1e-6)  # avoid dividing by a 0.0 multiplier
        if abs(new_mult - prior_mult) / denom > args.variance_threshold:
            flagged.append(b)

    # Read the clock once, timezone-aware. The previous
    # `datetime.utcnow().timestamp()` interpreted the naive UTC value as local
    # time, skewing the logged `ts` by the host's UTC offset.
    now = datetime.now(timezone.utc)
    generated_at = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    ts_micros = int(now.timestamp() * 1_000_000)

    meta = {
        "generated_at": generated_at,
        "source_branch": args.source_branch,
        "strategy": args.strategy,
        "since": args.since,
        "until": args.until,
        "n_trades": len(trades),
        "min_trades": args.min_trades,
    }

    write_table_yml(Path(args.out_yml), new_table, meta)

    out_report = Path(args.out_report) if args.out_report else REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md"
    write_report(out_report, stats, new_table, prior_table, meta, flagged)

    ensure_recompute_log()
    for b in flagged:
        ch_insert("s6_recompute_log", {
            "ts": ts_micros,
            "bucket_id": int(b),
            "n_trades": int(stats[b]["n"]),
            "wr_pct": float(100.0 * stats[b]["wr"]),
            "net_pnl": float(stats[b]["net_pnl"]),
            "prior_net": None,   # not available: the prior YAML stores multipliers only
            "delta_pct": None,
            "new_mult": float(new_table.get(b, 1.0)),
            "prior_mult": float(prior_table.get(b, 1.0)),
            "flagged": 1,
        })

    print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)")
    print(f"Wrote {out_report}")
    return 0