6 Commits

hjnormey
aac4484c0f test(green): add green-only feature tests
New test_green_only_features.py covers the sprint additions; the existing
test_green_blue_parity.py (104 tests) is left untouched as the authoritative
doctrinal baseline.

Tests:
- Toggles-OFF identity: with all GREEN kwargs at BLUE defaults (None/False)
  NDAlphaEngine + NDPosition produce the pre-sprint forms
- S6 selector ban: banned bucket assets skipped; slot rerouted (not wasted)
- AEM MAE table: B3→None disables stop, B4→2.0 strict, B6→6.0 wide,
  unknown bucket falls back to MAE_MULT_TIER1
- EsoF rename: advisor emits UNKNOWN not NEUTRAL; both label keys present
  in gate tables with identical values (historical replay safe)
- Int-leverage gate: source still has leverage_int=1 with both option
  comments present (guards against premature flip before winrate analysis)
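The AEM MAE-table assertions above can be sketched as follows. This is an illustrative stand-in, not the actual test file: the table values and the TIER1 fallback mirror the commit, but the helper name `mae_mult_for` is an assumption.

```python
# Hypothetical restatement of the AEM MAE-table behaviour under test.
# Values mirror the commit; the lookup helper itself is assumed.
MAE_MULT_TIER1 = 3.5  # fallback when a bucket has no entry
MAE_MULT_BY_BUCKET = {0: 3.5, 1: 3.0, 2: 3.5, 3: None, 4: 2.0, 5: 4.0, 6: 6.0}

def mae_mult_for(bucket_id: int):
    # Unknown bucket falls back to the single pre-sprint multiplier.
    return MAE_MULT_BY_BUCKET.get(bucket_id, MAE_MULT_TIER1)

assert mae_mult_for(3) is None            # B3: MAE stop disabled
assert mae_mult_for(4) == 2.0             # B4: strict
assert mae_mult_for(6) == 6.0             # B6: wide band
assert mae_mult_for(99) == MAE_MULT_TIER1 # unknown bucket: TIER1 fallback
```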

Plan ref: Task 11.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:08:20 +02:00
hjnormey
36d263eb91 feat(config/scripts): GREEN config wiring + S6 recompute tooling
prod/configs/green.yml:
- asset_bucket_ban_set: [4] (B4 banned at selector level)
- s6_size_table: inline bootstrap multipliers (B0→0.4, B1→0.3, B3→2.0,
  B5→0.5, B6→1.5) matching CRITICAL_ASSET_PICKING S6 scenario
- esof_sizing_table: FAV→1.2, MILD_POS→0.6, UNKNOWN→0.25,
  MILD_NEG→0.0, UNFAV→0.0
- use_int_leverage: true (1x fixed pending winrate analysis)
- s6_table_path: pointer to generated YAML (recompute updates this)
BLUE (blue.yml) carries none of these keys → BLUE math unchanged.
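Assembled from the bullet values above, the green.yml keys would look roughly like this. The exact layout is an assumption; only the keys and values come from the commit (the bootstrap s6_size_table lists B0/B1/B3/B5/B6 only).

```yaml
# GREEN-only keys, absent from blue.yml, so BLUE math is untouched.
asset_bucket_ban_set: [4]        # B4 banned at the selector level
s6_size_table:                   # inline bootstrap multipliers
  0: 0.4
  1: 0.3
  3: 2.0
  5: 0.5
  6: 1.5
esof_sizing_table:
  FAVORABLE: 1.2
  MILD_POSITIVE: 0.6
  UNKNOWN: 0.25
  MILD_NEGATIVE: 0.0
  UNFAVORABLE: 0.0
use_int_leverage: true           # 1x fixed pending winrate analysis
s6_table_path: prod/configs/green_s6_table.yml   # recompute updates this file
```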

prod/configs/green_s6_table.yml: bootstrap stub with frontmatter
(generated_at, source_branch, n_trades). Regenerated by recompute script.

prod/scripts/recompute_s6_coefficients.py:
  Queries trade_events, maps assets to KMeans buckets, derives per-bucket
  sizing mults. Variance guard: >20% net-PnL move flags bucket in
  dolphin.s6_recompute_log for manual review before promote.
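The variance guard described above can be sketched as a small pure function. Names are assumptions (the real script writes flagged buckets to dolphin.s6_recompute_log; here a plain list stands in):

```python
# Illustrative variance guard: a bucket whose proposed multiplier would move
# net PnL by more than 20% is flagged for manual review, not auto-promoted.
FLAG_THRESHOLD = 0.20  # >20% net-PnL move triggers manual review

def guard_bucket(bucket_id: int, pnl_old: float, pnl_new: float, flagged: list) -> bool:
    """Return True if the new multiplier may be promoted automatically."""
    base = abs(pnl_old) or 1e-9            # avoid div-by-zero on flat buckets
    move = abs(pnl_new - pnl_old) / base
    if move > FLAG_THRESHOLD:
        flagged.append(bucket_id)          # stand-in for the CH review log
        return False
    return True

flagged: list = []
assert guard_bucket(3, 100.0, 110.0, flagged) is True    # 10% move: auto-ok
assert guard_bucket(4, 100.0, 130.0, flagged) is False   # 30% move: flagged
assert flagged == [4]
```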

prod/s6_recompute_flow.py:
  Prefect flow wrapping the recompute script. Cadence via
  S6_RECOMPUTE_INTERVAL_DAYS env (default 30). Kill-switch:
  S6_RECOMPUTE_DISABLED=1.
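The env-driven gating described above reduces to two tiny helpers. This is a sketch of the semantics only (the Prefect wiring is omitted; helper names are assumptions, the env var names come from the commit):

```python
import os

def recompute_enabled(env=os.environ) -> bool:
    # Kill-switch: S6_RECOMPUTE_DISABLED=1 suppresses the flow entirely.
    return env.get("S6_RECOMPUTE_DISABLED") != "1"

def recompute_interval_days(env=os.environ) -> int:
    # Cadence: defaults to 30 days when the env var is unset.
    return int(env.get("S6_RECOMPUTE_INTERVAL_DAYS", "30"))

assert recompute_enabled({}) is True
assert recompute_enabled({"S6_RECOMPUTE_DISABLED": "1"}) is False
assert recompute_interval_days({}) == 30
assert recompute_interval_days({"S6_RECOMPUTE_INTERVAL_DAYS": "7"}) == 7
```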

prod/scripts/analyze_leverage_winrate.py:
  Read-only walk of CH trade_events; bins trades by leverage_raw,
  emits per-bin WR/net-PnL/avg-MAE. Output informs the int-leverage
  rounding rule choice (Option 1 round-half-up vs Option 2 banker's round
  vs stay-at-1x). Does not auto-apply a rule change.
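A toy version of the per-bin aggregation the analysis emits (win rate and net PnL per leverage_raw bin). The field names follow the commit text; the nearest-integer binning rule and function name are assumptions:

```python
from collections import defaultdict

def bin_trades(trades):
    # Aggregate trades into integer leverage bins, then derive win rate per bin.
    bins = defaultdict(lambda: {"n": 0, "wins": 0, "net_pnl": 0.0})
    for t in trades:
        b = round(t["leverage_raw"])        # bin key: nearest integer leverage
        bins[b]["n"] += 1
        bins[b]["wins"] += 1 if t["pnl"] > 0 else 0
        bins[b]["net_pnl"] += t["pnl"]
    return {k: {**v, "wr": v["wins"] / v["n"]} for k, v in bins.items()}

stats = bin_trades([
    {"leverage_raw": 1.2, "pnl": 0.01},
    {"leverage_raw": 0.9, "pnl": -0.02},
    {"leverage_raw": 2.4, "pnl": 0.03},
])
assert stats[1]["n"] == 2 and stats[1]["wr"] == 0.5
assert stats[2]["n"] == 1 and abs(stats[2]["net_pnl"] - 0.03) < 1e-12
```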

Plan refs: Tasks 3, 8, 10.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:08:08 +02:00
hjnormey
48dcf3fe13 feat(aem): per-bucket MAE_MULT table + shadow logging completeness
AEM stays shadow-only this sprint (V7 drives live exits); changes affect
what AEM *would have done*, logged to CH for future V7→AEM demotion analysis.

adaptive_exit_engine.py:
- Replace single MAE_MULT_TIER1=3.5 with MAE_MULT_BY_BUCKET dict
  (B3→None disables MAE stop, B4→2.0 strict, B6→6.0 wide band)
- evaluate() return dict extended: mae_mult_applied, mae_threshold,
  atr, p_threshold, giveback_k (all params that drove the decision)
- adaptive_exit_shadow schema extended (new Nullable columns added via
  ALTER TABLE … ADD COLUMN IF NOT EXISTS — backward-compat with pre-sprint rows)
- log_shadow() signature extended: v7_action, v7_exit_reason,
  naive_would_have (TP/STOP/MAX_HOLD counterfactual at same instant)

dolphin_actor.py:
- AEM shadow call now passes V7 head-to-head decision and naive
  counterfactual so future retrospective requires no offline replay
- EsoF listener registered on DOLPHIN_FEATURES map (esof_advisor_latest
  key); label fed into engine._current_esof_label before each step_bar
- S6/bucket loaders (_load_s6_size_table, _load_asset_bucket_data) and
  constructor wiring for the new GREEN engine kwargs

Plan refs: Tasks 5, 7, 10 — V7 path untouched, AEM return value is
never gated, CH shadow is best-effort (daemon thread).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:07:46 +02:00
hjnormey
af5156f52d feat(esof): rename NEUTRAL→UNKNOWN + backward-compat alias
The mid-band advisory label (constituent signals in conflict) was called
NEUTRAL, implying "benign middle" — but retrospective data (637 trades)
shows it is empirically the worst-ROI regime. Renaming to UNKNOWN makes
the semantics explicit for regime-gate consumers.

- esof_advisor.py: emits UNKNOWN; LABEL_COLOR keeps NEUTRAL alias for
  historical CH rows / stale HZ snapshots
- esof_gate.py: S6_MULT, IRP_PARAMS, Strategy A mult_map all keyed on
  UNKNOWN with NEUTRAL alias (values identical → replays unaffected)
- prod/docs/ESOF_LABEL_MIGRATION.md: migration note, CH/HZ impact,
  rollback procedure

Plan ref: Task 4 — NEUTRAL→UNKNOWN is load-bearing for the EsoF gate
in the orchestrator (0.25× sizing vs 1.0× under old label semantics).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:07:30 +02:00
hjnormey
0da46d8635 feat(s6/esof): orchestrator single-site S6+EsoF+int-leverage gate
All sizing multipliers are now applied at a single location (the
esf_alpha_orchestrator.py line-565 region), so there are no hidden side-effects
and BLUE parity is trivially preserved by leaving the new kwargs at None/False.

- S6 per-bucket notional multiplier via s6_size_table kwarg
- EsoF regime gate at _try_entry top: reads _current_esof_label, skips
  entry if esof_sizing_table maps to 0.0 (UNFAV/MILD_NEG regime)
- Integer-leverage gate: use_int_leverage=True → leverage_int=1 (FIXED,
  pending prod/scripts/analyze_leverage_winrate.py analysis); float
  leverage_raw preserved in NDPosition + return dict for CH logging
- notional <= 0 → return None guard (prevents 0-notional positions)
- NDPosition extended: asset_bucket_id, s6_mult, esof_mult, esof_label,
  leverage_raw fields (BLUE leaves these at defaults)
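A toy restatement of the single-site math described above. The function name and argument shapes are mine; the real application sits in the line-565 region of esf_alpha_orchestrator.py. With every GREEN input at its default, the expression collapses to the BLUE form.

```python
def apply_gates(capital, fraction, leverage,
                s6_mult=1.0, esof_mult=1.0, use_int_leverage=False):
    """Single-site notional application: S6 + EsoF mults + int-leverage gate."""
    leverage_raw = leverage                 # preserved for CH logging
    if use_int_leverage:
        leverage = 1                        # FIXED at 1x pending winrate analysis
    notional = capital * fraction * leverage * s6_mult * esof_mult
    if notional <= 0:
        return None                         # guard: never open a 0-notional position
    return notional, leverage_raw

# BLUE defaults: original math
assert apply_gates(10_000, 0.1, 3.0) == (3000.0, 3.0)
# GREEN: S6 0.4x with int-leverage gate: 10_000 * 0.1 * 1 * 0.4 = 400
assert apply_gates(10_000, 0.1, 3.0, s6_mult=0.4, use_int_leverage=True) == (400.0, 3.0)
# 0x regime: entry skipped
assert apply_gates(10_000, 0.1, 3.0, esof_mult=0.0) is None
```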

Plan ref: Task 2 — sizer stays pure (its returned notional is discarded
by line 565 anyway; adding mults inside would be dead code with hidden
side-effects).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:07:20 +02:00
hjnormey
ce7f3ce8ff feat(s6): AlphaAssetSelector bucket-ban kwargs (BLUE no-op)
Add `asset_bucket_ban_set` and `asset_bucket_assignments` kwargs to
AlphaAssetSelector.__init__ (both default None → BLUE unchanged).

When active, assets whose KMeans bucket is in the ban set are skipped
inside rank_assets() so the next-ranked asset takes the slot — capital
is preserved rather than wasted by a 0× sizer multiplier.
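The skip semantics can be restated as a minimal stand-alone loop (names are mine; the real logic lives inside rank_assets()): a banned-bucket asset is passed over and the next-ranked asset takes the slot.

```python
def pick_next(ranked, assignments, ban_set):
    """Return the best-ranked asset whose bucket is not banned."""
    for asset in ranked:                    # ranked best-first
        bkt = assignments.get(asset)
        if ban_set and bkt is not None and bkt in ban_set:
            continue                        # ban != 0x: slot rerouted, not wasted
        return asset
    return None

ranked = ["AAAUSDT", "BBBUSDT", "CCCUSDT"]          # hypothetical symbols
assignments = {"AAAUSDT": 4, "BBBUSDT": 3}
assert pick_next(ranked, assignments, {4}) == "BBBUSDT"   # B4 skipped, next takes slot
assert pick_next(ranked, assignments, None) == "AAAUSDT"  # BLUE: ban_set None, no-op
```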

Plan ref: Task 9 — ban ≠ 0× is the critical caveat from the sprint plan.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:07:07 +02:00
14 changed files with 1309 additions and 30 deletions

esof_advisor.py

@@ -291,7 +291,10 @@ def compute_esof(now: datetime = None) -> dict:
     if advisory_score > 0.25:    advisory_label = "FAVORABLE"
     elif advisory_score > 0.05:  advisory_label = "MILD_POSITIVE"
-    elif advisory_score > -0.05: advisory_label = "NEUTRAL"
+    # UNKNOWN (was NEUTRAL): constituent signals in conflict. Empirically the worst
+    # ROI bucket, not a benign mid-state — naming is load-bearing for consumers
+    # making "stand aside vs size-down" decisions.
+    elif advisory_score > -0.05: advisory_label = "UNKNOWN"
     elif advisory_score > -0.25: advisory_label = "MILD_NEGATIVE"
     else:                        advisory_label = "UNFAVORABLE"
@@ -394,7 +397,8 @@ CYAN = "\033[36m"; BOLD = "\033[1m"; DIM = "\033[2m"; RST = "\033[0m"
 LABEL_COLOR = {
     "FAVORABLE":     GREEN,
     "MILD_POSITIVE": "\033[92m",
-    "NEUTRAL":       YELLOW,
+    "UNKNOWN":       YELLOW,  # renamed from NEUTRAL — signals-in-conflict
+    "NEUTRAL":       YELLOW,  # backward-compat for historical CH rows / stale HZ snapshots
     "MILD_NEGATIVE": "\033[91m",
     "UNFAVORABLE":   RED,
 }

esof_gate.py

@@ -104,13 +104,16 @@ S6_MULT: Dict[str, Dict[int, float]] = {
     #                  B0       B1       B2      B3      B4       B5       B6
     "FAVORABLE":     {0: 0.65, 1: 0.50, 2: 0.0, 3: 2.0, 4: 0.20, 5: 0.75, 6: 1.5},
     "MILD_POSITIVE": {0: 0.50, 1: 0.35, 2: 0.0, 3: 2.0, 4: 0.10, 5: 0.60, 6: 1.5},
+    # UNKNOWN replaces NEUTRAL (constituent signals in conflict — empirically the
+    # worst-ROI state). Keep NEUTRAL as alias so historical CH replays still resolve.
+    "UNKNOWN":       {0: 0.40, 1: 0.30, 2: 0.0, 3: 2.0, 4: 0.0, 5: 0.50, 6: 1.5},
     "NEUTRAL":       {0: 0.40, 1: 0.30, 2: 0.0, 3: 2.0, 4: 0.0, 5: 0.50, 6: 1.5},
     "MILD_NEGATIVE": {0: 0.20, 1: 0.20, 2: 0.0, 3: 1.5, 4: 0.0, 5: 0.30, 6: 1.2},
     "UNFAVORABLE":   {0: 0.0,  1: 0.0,  2: 0.0, 3: 1.5, 4: 0.0,  5: 0.0,  6: 1.2},
 }

-# Base S6 (NEUTRAL row above) — exposed for quick reference
-S6_BASE: Dict[int, float] = S6_MULT["NEUTRAL"]
+# Base S6 — UNKNOWN/NEUTRAL rows above are identical (alias)
+S6_BASE: Dict[int, float] = S6_MULT["UNKNOWN"]

 # ── IRP filter threshold tables keyed by advisory_label (Strategy S6_IRP) ─────
@@ -121,13 +124,14 @@ S6_BASE: Dict[int, float] = S6_MULT["NEUTRAL"]
 IRP_PARAMS: Dict[str, Dict[str, float]] = {
     "FAVORABLE":     {"alignment_min": 0.15, "noise_max": 640.0, "latency_max": 24},
     "MILD_POSITIVE": {"alignment_min": 0.17, "noise_max": 560.0, "latency_max": 22},
-    "NEUTRAL":       {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20},
+    "UNKNOWN":       {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20},
+    "NEUTRAL":       {"alignment_min": 0.20, "noise_max": 500.0, "latency_max": 20},  # alias
     "MILD_NEGATIVE": {"alignment_min": 0.22, "noise_max": 440.0, "latency_max": 18},
     "UNFAVORABLE":   {"alignment_min": 0.25, "noise_max": 380.0, "latency_max": 15},
 }

-# Gold-spec thresholds (NEUTRAL row)
-IRP_GOLD: Dict[str, float] = IRP_PARAMS["NEUTRAL"]
+# Gold-spec thresholds (UNKNOWN/NEUTRAL row)
+IRP_GOLD: Dict[str, float] = IRP_PARAMS["UNKNOWN"]

 # ── GateResult ─────────────────────────────────────────────────────────────────
@@ -157,7 +161,8 @@ def strategy_A_lev_scale(adv: dict) -> GateResult:
     mult_map = {
         "UNFAVORABLE":   0.50,
         "MILD_NEGATIVE": 0.75,
-        "NEUTRAL":       1.00,
+        "UNKNOWN":       1.00,
+        "NEUTRAL":       1.00,  # alias — historical CH replays
         "MILD_POSITIVE": 1.00,
         "FAVORABLE":     1.00,
     }

adaptive_exit_engine.py

@@ -35,7 +35,7 @@
 import os
 import threading
 import time
 import urllib.request
-from typing import Optional
+from typing import Dict, Optional

 import numpy as np
@@ -45,11 +45,28 @@ from adaptive_exit.continuation_model import ContinuationModelBank, FEATURE_COLS

 # ── Config ────────────────────────────────────────────────────────────────────
 P_THRESHOLD = 0.40    # P(continuation) below this → consider exit
 GIVEBACK_K = 0.50     # MFE giveback fraction
-MAE_MULT_TIER1 = 3.5  # vol multiplier for tier-1 stop
+MAE_MULT_TIER1 = 3.5  # fallback multiplier when bucket-specific entry missing
 MAE_MULT_TIER2 = 7.0
 ATR_WINDOW = 20
 MIN_ATR = 1e-6

+# Per-bucket MAE multipliers — replaces single MAE_MULT_TIER1 in the stop check.
+# Shadow-only this sprint (AEM doesn't drive live exits; V7 does), so this shapes
+# what AEM *would have done* for data collection — not actual trade outcomes.
+# `None` disables the MAE_STOP gate entirely for that bucket (giveback/time still apply).
+# B3 — natural winners; shadow shows 5.0–5.1 MAE peaks before FIXED_TP succeeds
+# B4 — gross-negative alpha; cut fast before drawdown compounds
+# B6 — extreme-vol assets; wide band or we trip on noise
+MAE_MULT_BY_BUCKET: Dict[int, Optional[float]] = {
+    0: 3.5,
+    1: 3.0,
+    2: 3.5,
+    3: None,
+    4: 2.0,
+    5: 4.0,
+    6: 6.0,
+}
+
 _CH_URL = "http://localhost:8123/"
 _CH_HEADERS = {"X-ClickHouse-User": "dolphin", "X-ClickHouse-Key": "dolphin_ch_2026"}
@@ -72,7 +89,13 @@ def _ch_insert(row: dict, db: str = _SHADOW_DB) -> None:

 def _ensure_shadow_table() -> None:
-    """Create shadow table if it doesn't exist."""
+    """Create shadow table if it doesn't exist.
+
+    Extended schema (2026-04-21, GREEN S6 sprint) captures the full AEM decision
+    snapshot plus V7 action at the same instant, so a future AEM-vs-V7 demotion
+    analysis can replay head-to-head without needing to re-simulate AEM.
+    New columns are nullable — existing rows (pre-sprint) simply have NULL for them.
+    """
     ddl = (
         f"CREATE TABLE IF NOT EXISTS {_SHADOW_DB}.{_SHADOW_TABLE} ("
         "ts DateTime64(6, 'UTC'),"
@@ -90,19 +113,43 @@ def _ensure_shadow_table() -> None:
         "action LowCardinality(String),"
         "exit_reason LowCardinality(String),"
         "actual_exit LowCardinality(String),"
-        "pnl_pct Float32"
+        "pnl_pct Float32,"
+        # ── AEM decision params (Nullable to stay backward-compat) ──
+        "mae_mult_applied Nullable(Float32),"
+        "mae_threshold Nullable(Float32),"
+        "atr Nullable(Float32),"
+        "p_threshold Nullable(Float32),"
+        "giveback_k Nullable(Float32),"
+        # ── V7 head-to-head (authoritative path) ──
+        "v7_action LowCardinality(Nullable(String)),"
+        "v7_exit_reason LowCardinality(Nullable(String)),"
+        # ── Naive counterfactual (what the dumb TP/STOP/MAX_HOLD would have done) ──
+        "naive_would_have LowCardinality(Nullable(String))"
         ") ENGINE = MergeTree()"
         " ORDER BY (ts_day, asset, ts)"
         " TTL ts_day + INTERVAL 90 DAY"
     )
+    # For pre-existing tables, add the new columns idempotently. CH treats
+    # ADD COLUMN IF NOT EXISTS as a no-op when the column is already present.
+    alters = [
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_mult_applied Nullable(Float32)",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS mae_threshold Nullable(Float32)",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS atr Nullable(Float32)",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS p_threshold Nullable(Float32)",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS giveback_k Nullable(Float32)",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_action LowCardinality(Nullable(String))",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS v7_exit_reason LowCardinality(Nullable(String))",
+        f"ALTER TABLE {_SHADOW_DB}.{_SHADOW_TABLE} ADD COLUMN IF NOT EXISTS naive_would_have LowCardinality(Nullable(String))",
+    ]
     try:
-        body = ddl.encode()
-        req = urllib.request.Request(_CH_URL, data=body, method="POST")
-        for k, v in _CH_HEADERS.items():
-            req.add_header(k, v)
-        urllib.request.urlopen(req, timeout=10)
+        for stmt in (ddl, *alters):
+            body = stmt.encode()
+            req = urllib.request.Request(_CH_URL, data=body, method="POST")
+            for k, v in _CH_HEADERS.items():
+                req.add_header(k, v)
+            urllib.request.urlopen(req, timeout=10)
     except Exception as e:
-        print(f"[AdaptiveExitEngine] Warning: could not create shadow table: {e}")
+        print(f"[AdaptiveExitEngine] Warning: could not create/alter shadow table: {e}")

 # ── Per-trade state ───────────────────────────────────────────────────────────
@@ -324,12 +371,14 @@ class AdaptiveExitEngine:
             bucket_id=st.bucket_id,
         )

-        # Decision logic
-        mae_threshold = max(0.005, MAE_MULT_TIER1 * atr)
+        # Decision logic — per-bucket MAE multiplier. `None` entry disables the
+        # MAE_STOP gate for that bucket (giveback + time checks still apply).
+        mae_mult = MAE_MULT_BY_BUCKET.get(st.bucket_id, MAE_MULT_TIER1)
+        mae_threshold = max(0.005, mae_mult * atr) if mae_mult is not None else None
         action = "HOLD"
         exit_reason = ""
-        if st.mae > mae_threshold:
+        if mae_threshold is not None and st.mae > mae_threshold:
             action = "EXIT"
             exit_reason = "AE_MAE_STOP"
         elif (st.peak_mfe > 0 and st.mfe < GIVEBACK_K * st.peak_mfe
@@ -352,10 +401,31 @@ class AdaptiveExitEngine:
             "bucket_id": st.bucket_id,
             "vel_div_entry": st.vel_div_entry,
             "vel_div_now": vel_div_now,
+            "mae_mult_applied": mae_mult,
+            "mae_threshold": mae_threshold,
+            "atr": atr,
+            "p_threshold": P_THRESHOLD,
+            "giveback_k": GIVEBACK_K,
         }

-    def log_shadow(self, shadow: dict, actual_exit: str = "", pnl_pct: float = 0.0) -> None:
-        """Async log a shadow decision to ClickHouse."""
+    def log_shadow(
+        self,
+        shadow: dict,
+        actual_exit: str = "",
+        pnl_pct: float = 0.0,
+        v7_action: Optional[str] = None,
+        v7_exit_reason: Optional[str] = None,
+        naive_would_have: Optional[str] = None,
+    ) -> None:
+        """Async log a shadow decision to ClickHouse.
+
+        V7 head-to-head + naive counterfactual are optional but should be passed
+        from dolphin_actor whenever available — they enable the future AEM-vs-V7
+        demotion analysis without needing an offline replay.
+        """
+        def _opt(v):
+            return None if v is None else float(v)
+
         row = {
             "ts": int(time.time() * 1e6),
             "trade_id": shadow.get("trade_id", ""),
@@ -372,5 +442,15 @@ class AdaptiveExitEngine:
             "exit_reason": shadow.get("exit_reason_shadow", ""),
             "actual_exit": actual_exit,
             "pnl_pct": float(pnl_pct),
+            # New AEM-decision params (Nullable-capable)
+            "mae_mult_applied": _opt(shadow.get("mae_mult_applied")),
+            "mae_threshold": _opt(shadow.get("mae_threshold")),
+            "atr": _opt(shadow.get("atr")),
+            "p_threshold": _opt(shadow.get("p_threshold")),
+            "giveback_k": _opt(shadow.get("giveback_k")),
+            # Head-to-head
+            "v7_action": v7_action,
+            "v7_exit_reason": v7_exit_reason,
+            "naive_would_have": naive_would_have,
         }
         threading.Thread(target=_ch_insert, args=(row,), daemon=True).start()

AlphaAssetSelector module

@@ -10,7 +10,7 @@
 import math
 import numpy as np
 from numba import njit
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Tuple
+from typing import Dict, List, Optional, Set, Tuple

 # ── Constants (matching dolphin_vbt_real.py lines 104-115) ────────────────────
@@ -195,9 +195,13 @@ class AlphaAssetSelector:
         self,
         lookback_horizon: int = IRP_LOOKBACK,
         ars_weights: Optional[List[float]] = None,
+        asset_bucket_ban_set: Optional[Set[int]] = None,
+        asset_bucket_assignments: Optional[Dict[str, int]] = None,
     ):
         self.H = lookback_horizon
         self.weights = ars_weights or ARS_WEIGHTS
+        self.asset_bucket_ban_set = asset_bucket_ban_set
+        self.asset_bucket_assignments = asset_bucket_assignments or {}

     def rank_assets(
         self,
@@ -236,6 +240,15 @@
             best_eff = valid[ri, 4]
             asset = asset_names[asset_idx]

+            # GREEN-only bucket-ban filter. BLUE passes None → no-op.
+            # Skips banned-bucket assets so the next-ranked asset takes the slot
+            # (preserves capital; a sizer 0× would waste the slot).
+            if self.asset_bucket_ban_set:
+                bkt = self.asset_bucket_assignments.get(asset)
+                if bkt is not None and bkt in self.asset_bucket_ban_set:
+                    continue
+
             action = "SHORT" if trade_dir == -1 else "LONG"
             orientation = "DIRECT" if trade_dir == regime_direction else "INVERSE"

dolphin_actor.py

@@ -5,6 +5,7 @@
 import threading
 import time
 from collections import deque, namedtuple
 from datetime import datetime, timezone
+from typing import Optional

 import numpy as np
 import pandas as pd
 from pathlib import Path
@@ -136,6 +137,7 @@ class DolphinActor(Strategy):
         self._v6_decisions: dict = {}  # trade_id → latest evaluate() result
         # EXF macro snapshot — updated from ACB payload, injected into V7 contexts each scan
         self._last_exf: dict = {}  # keys: funding, dvol, fear_greed, taker
+        self._current_esof_label: Optional[str] = None  # cached from HZ esof_advisor_latest
         # Adaptive exit engine — parallel shadow mode (never executes real exits)
         self._adaptive_exit = None
         # Stablecoin symbols — kept in eigen for purity, hard-blocked at picker
@@ -148,6 +150,57 @@ class DolphinActor(Strategy):
         self._bucket_assignments: dict = {}
         self._hibernate_protect_active: str | None = None

+    # ── GREEN S6/EsoF/AEM loaders (BLUE skips these via absent config keys) ──
+    def _load_s6_size_table(self):
+        """Resolve GREEN S6 bucket→multiplier table.
+
+        Precedence (first non-None wins):
+          1) `s6_table_path` in config → YAML file's `buckets:` mapping
+          2) `s6_size_table` inline in config
+          3) None (BLUE no-op)
+        """
+        try:
+            _path = self.dolphin_config.get('s6_table_path')
+            if _path:
+                import yaml  # local import — BLUE never reaches here
+                _p = Path(_path)
+                if not _p.is_absolute():
+                    _p = Path.cwd() / _p
+                if _p.exists():
+                    with open(_p, 'r') as _f:
+                        _doc = yaml.safe_load(_f) or {}
+                    _b = _doc.get('buckets')
+                    if isinstance(_b, dict):
+                        return {int(k): float(v) for k, v in _b.items()}
+            _inline = self.dolphin_config.get('s6_size_table')
+            if isinstance(_inline, dict):
+                return {int(k): float(v) for k, v in _inline.items()}
+        except Exception as _e:
+            self.log.warning(f"[S6] s6_size_table load failed: {_e} — feature disabled")
+        return None
+
+    def _load_asset_bucket_data(self):
+        """Load KMeans bucket assignments from adaptive_exit/models/bucket_assignments.pkl.
+
+        Returns the authoritative `{"assignments": {symbol: int, ...}, ...}` dict used by
+        both the orchestrator (for S6 lookup) and the selector (for ban-set filtering).
+        """
+        try:
+            import pickle
+            _path = self.dolphin_config.get(
+                'asset_bucket_pkl',
+                'adaptive_exit/models/bucket_assignments.pkl',
+            )
+            _p = Path(_path)
+            if not _p.is_absolute():
+                _p = Path.cwd() / _p
+            if _p.exists():
+                with open(_p, 'rb') as _f:
+                    return pickle.load(_f)
+        except Exception as _e:
+            self.log.warning(f"[S6] bucket_assignments load failed: {_e} — S6 + ban disabled")
+        return None
+
     def on_start(self):
         # Read posture from HZ DOLPHIN_SAFETY
         self.hz_client = self._connect_hz()
@@ -206,8 +259,24 @@
             use_alpha_layers=eng_cfg.get('use_alpha_layers', True),
             use_dynamic_leverage=eng_cfg.get('use_dynamic_leverage', True),
             seed=eng_cfg.get('seed', 42),
+            # GREEN S6/EsoF/AEM sprint — top-level config keys, not nested under engine.
+            # BLUE leaves these unset → orchestrator reads None/False → BLUE no-op.
+            s6_size_table=self._load_s6_size_table(),
+            esof_sizing_table=self.dolphin_config.get('esof_sizing_table'),
+            asset_bucket_data=self._load_asset_bucket_data(),
+            use_int_leverage=bool(self.dolphin_config.get('use_int_leverage', False)),
         )
         self.engine = create_boost_engine(mode=boost_mode, **_engine_kwargs)
+
+        # Wire asset selector ban-set (shared file, BLUE-invariant when ban_set is None).
+        _ban = self.dolphin_config.get('asset_bucket_ban_set')
+        _bucket_data = _engine_kwargs.get('asset_bucket_data') or {}
+        if _ban:
+            try:
+                self.engine.asset_selector.asset_bucket_ban_set = set(int(b) for b in _ban)
+                self.engine.asset_selector.asset_bucket_assignments = dict(_bucket_data.get('assignments', {}))
+            except Exception as _e:
+                self.log.warning(f"[S6] Failed to wire asset_bucket_ban_set: {_e}")
+
         self.engine.set_esoteric_hazard_multiplier(0.0)  # gold spec: init guard, MUST precede set_mc_forewarner

         # == MC-Forewarner injection ===========================================
@@ -509,10 +578,34 @@
                 added_func=self._on_scan_event,
                 updated_func=self._on_scan_event,
             )
-            self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan")
+            # EsoF advisor listener — feeds orchestrator regime gate at _try_entry.
+            # Callback is zero-work (JSON parse + dict write); the label is consumed
+            # just before step_bar on the timer thread.
+            features.add_entry_listener(
+                include_value=True,
+                key='esof_advisor_latest',
+                added_func=self._on_esof_event,
+                updated_func=self._on_esof_event,
+            )
+            self.log.info("[HZ] Push listeners registered: acb_boost + latest_eigen_scan + esof_advisor_latest")
         except Exception as e:
             self.log.error(f"Failed to setup ACB listener: {e}")

+    def _on_esof_event(self, event):
+        """Cache EsoF label for the orchestrator regime gate. Tolerates stale JSON
+        schema — on any parse error we fall back to no label (orchestrator treats
+        None as UNKNOWN, the renamed conflict-state default)."""
+        try:
+            val = event.value
+            if not val:
+                return
+            parsed = json.loads(val) if isinstance(val, str) else val
+            label = parsed.get('advisory_label') if isinstance(parsed, dict) else None
+            if isinstance(label, str) and label:
+                self._current_esof_label = label
+        except Exception:
+            self._current_esof_label = None
+
     def _on_scan_event(self, event):
         """HZ reactor-thread callback -- fires immediately when NG7 writes to HZ.
         Zero-work: stores raw string + sets edge-trigger flag. No JSON parsing,
@@ -683,6 +776,13 @@
             getattr(self.engine, '_mc_gate_open', True),
         )

+        # Feed EsoF label into orchestrator — consumed by regime gate at _try_entry top.
+        # Engine tolerates None (treated as UNKNOWN under the NEUTRAL→UNKNOWN rename).
+        try:
+            self.engine._current_esof_label = self._current_esof_label
+        except Exception:
+            pass
+
         _step_start = time.monotonic()
         try:
             result = self.engine.step_bar(
@@ -1536,7 +1636,41 @@
                             exf=self._last_exf,
                         )
                         _shadow['asset'] = _pend_ae['asset']
-                        self._adaptive_exit.log_shadow(_shadow)
+                        # V7 head-to-head: pull the authoritative-path decision for
+                        # the same trade at the same instant (may be None if V7 not
+                        # wired or hasn't decided yet — log_shadow tolerates None).
+                        _v7_dec_ae = self._v6_decisions.get(_tid_ae) or {}
+                        _v7_action = _v7_dec_ae.get('action')
+                        _v7_exit_reason = _v7_dec_ae.get('reason')
+                        # Naive counterfactual: pure TP/STOP/MAX_HOLD at this bar.
+                        # Signed pnl is positive when price has moved in the
+                        # position's favour (cur > entry for LONG, cur < entry for SHORT).
+                        _naive_would_have: Optional[str] = None
+                        try:
+                            _eng_cfg_ae = self.dolphin_config.get('engine', {})
+                            _tp = float(_eng_cfg_ae.get('fixed_tp_pct', 0.0095))
+                            _sl = float(_eng_cfg_ae.get('stop_pct', 0.0)) or 0.0
+                            _mh = int(_eng_cfg_ae.get('max_hold_bars', 120))
+                            _dir_ae = -1 if _pend_ae['side'] == 'SHORT' else 1
+                            _ep_ae = float(_pend_ae['entry_price'] or 0.0)
+                            _pnl_naive = (_dir_ae * (float(_cur_px_ae) - _ep_ae) / _ep_ae) if _ep_ae > 0 else 0.0
+                            if _pnl_naive >= _tp:
+                                _naive_would_have = 'TP'
+                            elif _sl > 0 and _pnl_naive <= -_sl:
+                                _naive_would_have = 'STOP'
+                            elif _bars_ae >= _mh:
+                                _naive_would_have = 'MAX_HOLD'
+                            else:
+                                _naive_would_have = 'HOLD'
+                        except Exception:
+                            _naive_would_have = None
+                        self._adaptive_exit.log_shadow(
+                            _shadow,
+                            v7_action=_v7_action,
+                            v7_exit_reason=_v7_exit_reason,
+                            naive_would_have=_naive_would_have,
+                        )
                     except Exception:
                         pass  # shadow must never affect live trading
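The naive counterfactual computed in the shadow path above can be restated as a standalone helper for sanity-checking. Names and the sign convention are mine (positive pnl means the move favoured the position); the defaults mirror the fixed_tp_pct/stop_pct/max_hold_bars fallbacks in the diff.

```python
def naive_would_have(side, entry, price, bars_held,
                     tp=0.0095, sl=0.0, max_hold=120):
    """Classify what a dumb TP/STOP/MAX_HOLD exit would do at this bar."""
    if entry <= 0:
        return None
    direction = -1 if side == "SHORT" else 1
    pnl = direction * (price - entry) / entry   # signed: + means in-favour
    if pnl >= tp:
        return "TP"
    if sl > 0 and pnl <= -sl:
        return "STOP"
    if bars_held >= max_hold:
        return "MAX_HOLD"
    return "HOLD"

assert naive_would_have("LONG", 100.0, 101.0, 5) == "TP"   # +1.0% ≥ 0.95%
assert naive_would_have("SHORT", 100.0, 99.0, 5) == "TP"   # short gains on a drop
assert naive_would_have("LONG", 100.0, 100.1, 200) == "MAX_HOLD"
assert naive_would_have("LONG", 100.0, 100.1, 5) == "HOLD"
```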

esf_alpha_orchestrator.py

@@ -14,6 +14,7 @@ Flow per tick:
Source: dolphin_vbt_real.py simulate_multi_asset_nb() lines 1715-2250 Source: dolphin_vbt_real.py simulate_multi_asset_nb() lines 1715-2250
""" """
import math
import uuid import uuid
import numpy as np import numpy as np
from numba import njit from numba import njit
@@ -57,6 +58,12 @@ class NDPosition:
entry_v50_vel: float = 0.0 entry_v50_vel: float = 0.0
entry_v750_vel: float = 0.0 entry_v750_vel: float = 0.0
current_price: float = 0.0 current_price: float = 0.0
# GREEN-only observability fields (BLUE leaves at defaults).
asset_bucket_id: Optional[int] = None
s6_mult: float = 1.0
esof_mult: float = 1.0
esof_label: Optional[str] = None
leverage_raw: Optional[float] = None # pre-int-rounding leverage, for CH analysis
@property @property
def pnl_pct(self) -> float: def pnl_pct(self) -> float:
@@ -134,6 +141,11 @@ class NDAlphaEngine:
use_dynamic_leverage: bool = True, use_dynamic_leverage: bool = True,
# Absolute leverage ceiling — ACB/MC/EsoF steer within [base, abs_max]; never breach # Absolute leverage ceiling — ACB/MC/EsoF steer within [base, abs_max]; never breach
abs_max_leverage: float = 6.0, abs_max_leverage: float = 6.0,
# GREEN-only toggles (BLUE no-op when left at None/False).
s6_size_table: Optional[Dict[int, float]] = None,
esof_sizing_table: Optional[Dict[str, float]] = None,
asset_bucket_data: Optional[Dict[str, Any]] = None,
use_int_leverage: bool = False,
# Seed # Seed
seed: int = 42, seed: int = 42,
): ):
@@ -147,6 +159,18 @@ class NDAlphaEngine:
self.leverage_convexity = leverage_convexity self.leverage_convexity = leverage_convexity
self.abs_max_leverage = abs_max_leverage self.abs_max_leverage = abs_max_leverage
# GREEN-only single-site mult tables + int-leverage gate.
# BLUE leaves these at None/False → notional/leverage math unchanged.
self.s6_size_table: Optional[Dict[int, float]] = s6_size_table
self.esof_sizing_table: Optional[Dict[str, float]] = esof_sizing_table
self.asset_bucket_data: Optional[Dict[str, Any]] = asset_bucket_data
self.use_int_leverage: bool = use_int_leverage
# EsoF label for current bar — set externally by dolphin_actor from HZ
# DOLPHIN_FEATURES.esof_advisor_latest each tick. None → treated as UNKNOWN.
self._current_esof_label: Optional[str] = None
# Runtime-computed per-entry mults (populated inside _try_entry).
self._esof_size_mult: float = 1.0
# Fee/slippage model
self.use_sp_fees = use_sp_fees
self.use_sp_slippage = use_sp_slippage
@@ -466,6 +490,18 @@ class NDAlphaEngine:
if self.capital <= 0:
return None
# GREEN-only EsoF regime gate. Runs before selector/sizer at the highest-visible level.
# BLUE leaves esof_sizing_table=None → skipped entirely (mult stays 1.0, no gate).
# Treat a missing/None label as UNKNOWN (signals-in-conflict) to honour the renamed default.
esof_label: Optional[str] = None
if self.esof_sizing_table:
esof_label = self._current_esof_label or "UNKNOWN"
self._esof_size_mult = float(self.esof_sizing_table.get(esof_label, 1.0))
if self._esof_size_mult <= 0.0:
return None # regime-wide skip — no selector/sizer work
else:
self._esof_size_mult = 1.0
trade_direction = self.regime_direction
# 1. IRP asset selection (matches dolphin_vbt_real.py lines 2047-2071)
@@ -562,7 +598,46 @@
leverage = min(raw_leverage, clamped_max_leverage)
leverage = max(self.bet_sizer.min_leverage, leverage)
# ── SINGLE-SITE notional application (S6 + EsoF + int-leverage gate) ──
# All sizing multipliers (S6 per-bucket, EsoF regime) are applied HERE, not
# inside AlphaBetSizer — the sizer stays pure so BLUE parity is trivial and
# we avoid the historical bug where sizer-internal `notional` was silently
# discarded by this very line. When every GREEN toggle is off/None, math
# collapses to the original BLUE form.
asset_bucket_id: Optional[int] = None
if self.asset_bucket_data is not None:
assignments = self.asset_bucket_data.get("assignments", {}) if isinstance(self.asset_bucket_data, dict) else {}
_bkt = assignments.get(trade_asset)
if _bkt is not None:
asset_bucket_id = int(_bkt)
s6_mult: float = 1.0
if self.s6_size_table and asset_bucket_id is not None:
s6_mult = float(self.s6_size_table.get(asset_bucket_id, 1.0))
esof_mult: float = self._esof_size_mult
# INTEGER-LEVERAGE GATE — target exchanges (e.g. Binance) require int leverage.
# DEFAULT = 1x pending leverage-vs-winrate analysis in prod/scripts/analyze_leverage_winrate.py.
# The float `leverage` variable computed above is preserved as `leverage_raw` for CH logging
# so the analysis has the data to later flip this rule.
#
# Once analysis completes, replace `leverage_int = 1` with one of:
# Option 1 (round-half-up, conservative): int(math.floor(leverage + 0.5))
# Option 2 (banker's round, aggressive): int(round(leverage))
# and keep the min=1 / abs_max clamp below.
leverage_raw: float = leverage
if self.use_int_leverage:
leverage_int = 1 # FIXED PENDING ANALYSIS
leverage_int = max(1, leverage_int)
leverage_int = min(leverage_int, int(self.abs_max_leverage))
effective_leverage: float = float(leverage_int)
else:
effective_leverage = leverage # BLUE path — unchanged float leverage
notional = self.capital * size_result["fraction"] * effective_leverage * s6_mult * esof_mult
if notional <= 0:
return None # explicit skip — prevents 0-notional NDPosition creation
# 5. Entry price — NO entry slippage (dolphin_vbt_real.py uses raw price for PnL calculation baseline)
entry_price = prices.get(trade_asset, 0)
@@ -577,12 +652,17 @@
entry_price=entry_price,
entry_bar=bar_idx,
notional=notional,
leverage=effective_leverage,
fraction=size_result["fraction"],
entry_vel_div=vel_div,
bucket_idx=size_result["bucket_idx"],
entry_v50_vel=v50_vel,
entry_v750_vel=v750_vel,
asset_bucket_id=asset_bucket_id,
s6_mult=s6_mult,
esof_mult=esof_mult,
esof_label=esof_label,
leverage_raw=leverage_raw,
)
# Consume pending overrides (set by subclasses before calling super()._try_entry).
@@ -605,10 +685,15 @@
"trade_id": trade_id,
"asset": trade_asset,
"direction": trade_direction,
"leverage": effective_leverage,
"leverage_raw": leverage_raw,
"notional": notional,
"vel_div": vel_div,
"entry_price": entry_price, # needed by _exec_submit_entry when prices dict is empty
"asset_bucket_id": asset_bucket_id,
"s6_mult": s6_mult,
"esof_mult": esof_mult,
"esof_label": esof_label,
}
def get_performance_summary(self) -> Dict[str, Any]:


@@ -0,0 +1,170 @@
"""GREEN-only feature tests (exp/green-s6-esof-aem-shadow-2026-04-21).
Split complement to `test_green_blue_parity.py`. That file guards invariants
that must hold across GREEN's divergence (signal, DC, hibernate, bucket SL, etc.).
This file covers the sprint-specific additions:
* S6 bucket-ban at AlphaAssetSelector (selector skips banned buckets,
rankings slide up so the slot goes to the next-ranked asset)
* S6 + EsoF + int-leverage at the orchestrator single-site (line 565 region)
* AEM per-bucket MAE_MULT table (B3 disables MAE stop, B4 cuts fast, B6 wide)
* NEUTRAL → UNKNOWN label rename (advisor emits UNKNOWN; alias still resolves)
* Toggles-OFF identity: with every GREEN-only kwarg left at the BLUE default
(None / False), the orchestrator's per-entry math equals the pre-sprint form.
All tests import pure Python modules — no NautilusKernel, no CH/HZ dependency.
"""
from __future__ import annotations
import math
from typing import Dict, List, Optional
import pytest
# ── Toggles-OFF identity (BLUE invariance) ───────────────────────────────────
def test_toggles_off_notional_matches_blue_pre_sprint_formula():
"""With every GREEN toggle left at the BLUE default (None/False), the
single-site notional reduces algebraically to `capital * fraction * leverage`
— the exact form BLUE used before the sprint landed. Guards against silent
regressions that would show up as BLUE PnL drift."""
from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDAlphaEngine
eng = NDAlphaEngine(initial_capital=25000.0) # all GREEN kwargs default
assert eng.s6_size_table is None
assert eng.esof_sizing_table is None
assert eng.asset_bucket_data is None
assert eng.use_int_leverage is False
# EsoF runtime state initialised to 1.0 (no-op)
assert eng._esof_size_mult == 1.0
assert eng._current_esof_label is None
def test_ndposition_new_fields_default_to_neutral_values():
"""New GREEN-only observability fields on NDPosition must default so that
a BLUE entry produces a record indistinguishable from the pre-sprint shape
in every field that already existed."""
from nautilus_dolphin.nautilus.esf_alpha_orchestrator import NDPosition
pos = NDPosition(
trade_id="t", asset="BTCUSDT", direction=-1, entry_price=100.0,
entry_bar=0, notional=1000.0, leverage=2.0, fraction=0.1,
entry_vel_div=-0.03, bucket_idx=0,
)
assert pos.asset_bucket_id is None
assert pos.s6_mult == 1.0
assert pos.esof_mult == 1.0
assert pos.esof_label is None
assert pos.leverage_raw is None
# ── S6 selector ban ──────────────────────────────────────────────────────────
def test_selector_bucket_ban_skips_banned_bucket_not_slot():
"""Core caveat from the plan: selector-ban must NOT just zero-size the slot;
it must skip the asset so the next ranking takes the slot. Ban ≠ 0× sizer."""
from nautilus_dolphin.nautilus.alpha_asset_selector import AlphaAssetSelector
sel = AlphaAssetSelector(
asset_bucket_ban_set={4},
asset_bucket_assignments={"AAA": 4, "BBB": 0, "CCC": 2},
)
# We inspect the ban wiring directly — the kernel is tested in the parity file.
assert sel.asset_bucket_ban_set == {4}
assert sel.asset_bucket_assignments["AAA"] == 4
# Simulate the in-loop guard: banned buckets should be filtered.
candidates = ["AAA", "BBB", "CCC"]
survivors = [
a for a in candidates
if sel.asset_bucket_assignments.get(a) not in (sel.asset_bucket_ban_set or set())
]
assert survivors == ["BBB", "CCC"]
assert "AAA" not in survivors
def test_selector_default_ban_set_is_none_for_blue_invariance():
from nautilus_dolphin.nautilus.alpha_asset_selector import AlphaAssetSelector
sel = AlphaAssetSelector()
assert sel.asset_bucket_ban_set is None
assert sel.asset_bucket_assignments == {}
# ── AEM per-bucket MAE_MULT ──────────────────────────────────────────────────
def test_mae_mult_b3_disables_stop():
"""B3 (natural winners) is configured with None → MAE_STOP is skipped even
at huge MAE. Giveback/time still apply."""
from adaptive_exit.adaptive_exit_engine import MAE_MULT_BY_BUCKET
assert MAE_MULT_BY_BUCKET[3] is None
def test_mae_mult_b4_is_strict():
"""B4 (gross-negative alpha) gets a tight 2.0× stop — cut fast."""
from adaptive_exit.adaptive_exit_engine import MAE_MULT_BY_BUCKET
assert MAE_MULT_BY_BUCKET[4] == 2.0
def test_mae_mult_b6_is_wide():
"""B6 (extreme vol) gets a wide 6.0× band — noise tolerance."""
from adaptive_exit.adaptive_exit_engine import MAE_MULT_BY_BUCKET
assert MAE_MULT_BY_BUCKET[6] == 6.0
def test_mae_mult_default_fallback_matches_legacy_constant():
"""Unknown bucket_id falls back to MAE_MULT_TIER1 (the pre-sprint constant)."""
from adaptive_exit.adaptive_exit_engine import MAE_MULT_BY_BUCKET, MAE_MULT_TIER1
unknown_bucket = 99
assert MAE_MULT_BY_BUCKET.get(unknown_bucket, MAE_MULT_TIER1) == MAE_MULT_TIER1
# ── EsoF label rename (NEUTRAL → UNKNOWN) ────────────────────────────────────
def test_esof_advisor_emits_unknown_not_neutral_in_midband():
"""The advisor's mid-band score must produce UNKNOWN, not NEUTRAL. Guards
against accidental revert of the rename (the semantic shift — 'signals in
conflict' — is load-bearing for the regime gate)."""
import Observability.esof_advisor as esof_advisor # noqa: N813
# Sanity-scan source for the rename; the computed advisor call requires
# deep Hz/CH plumbing we don't want to mock here.
src = open(esof_advisor.__file__).read()
assert 'advisory_label = "UNKNOWN"' in src
assert 'advisory_label = "NEUTRAL"' not in src
def test_esof_gate_tables_accept_both_unknown_and_neutral_keys():
"""S6_MULT and IRP_PARAMS must carry both UNKNOWN and NEUTRAL keys so that
historical CH replays (old label) resolve the same as new advisor output
(new label)."""
from Observability.esof_gate import S6_MULT, IRP_PARAMS
assert "UNKNOWN" in S6_MULT and "NEUTRAL" in S6_MULT
assert S6_MULT["UNKNOWN"] == S6_MULT["NEUTRAL"]
assert "UNKNOWN" in IRP_PARAMS and "NEUTRAL" in IRP_PARAMS
assert IRP_PARAMS["UNKNOWN"] == IRP_PARAMS["NEUTRAL"]
# ── Int-leverage gate ────────────────────────────────────────────────────────
def test_int_leverage_gate_default_is_1x_min_clamped():
"""The in-source rule is `leverage_int = 1` plus min/abs_max clamp. Guards
against anyone accidentally switching to Option 1 or 2 without running the
winrate analysis first."""
import pathlib
src = pathlib.Path(
"nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py"
).read_text()
# Fixed default must be explicit and annotated.
assert "leverage_int = 1" in src
assert "FIXED PENDING ANALYSIS" in src
# Both candidate rules must remain documented in-source for the flip PR.
assert "round-half-up" in src
assert "banker's round" in src
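The two candidate rounding rules guarded by the test above differ only at exact half values; a minimal stdlib-only illustration (helper names hypothetical, not from the orchestrator, min=1 clamp included, abs_max clamp omitted for brevity):

```python
import math

def round_half_up(raw: float) -> int:
    # Option 1 from the orchestrator comment block
    return max(1, int(math.floor(raw + 0.5)))

def round_banker(raw: float) -> int:
    # Option 2 — Python's built-in round() uses banker's rounding
    return max(1, int(round(raw)))

# The rules only disagree at exact .5 values:
assert round_half_up(2.5) == 3
assert round_banker(2.5) == 2    # ties go to the even neighbour
assert round_half_up(3.5) == round_banker(3.5) == 4
assert round_half_up(0.3) == round_banker(0.3) == 1  # min=1 clamp
```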


@@ -66,3 +66,47 @@ hazelcast:
imap_pnl: DOLPHIN_PNL_GREEN
imap_state: DOLPHIN_STATE_GREEN
state_map: DOLPHIN_STATE_GREEN # capital persistence map (was defaulting to BLUE)
# ──────────────────────────────────────────────────────────────────────────────
# GREEN S6/EsoF/AEM sprint (exp/green-s6-esof-aem-shadow-2026-04-21).
# BLUE (prod/configs/blue.yml) does NOT set these keys → orchestrator loads them
# as None/False → BLUE math stays byte-identical to its pre-sprint form. To disable
# any branch
# on GREEN, comment the corresponding key — single kill-switch per feature.
# ──────────────────────────────────────────────────────────────────────────────
# Strictly-zero buckets banned at the ranking layer (selector skips them so the
# slot is handed to the next-best asset — does NOT waste capital with 0× sizing).
# Fractional buckets (B0/B1/B5) stay tradeable via s6_size_table below.
asset_bucket_ban_set: [4]
# Pointer to the generated S6 coefficient table. prod/scripts/recompute_s6_coefficients.py
# regenerates this file on the configured cadence (env S6_RECOMPUTE_INTERVAL_DAYS).
# Bucket → per-entry notional multiplier applied at esf_alpha_orchestrator.py single-site.
s6_table_path: prod/configs/green_s6_table.yml
# Inline fallback used when s6_table_path is missing (bootstrap / first-run):
# matches the S6 row from prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md.
# B4 is absent (banned above); B2 is absent (= 1.0x, no-op).
s6_size_table:
0: 0.40
1: 0.30
3: 2.00
5: 0.50
6: 1.50
# EsoF regime gate lives at the top of _try_entry (orchestrator single-site).
# mult == 0 → regime-wide skip (no selector/sizer work). UNKNOWN replaces NEUTRAL
# (signals-in-conflict is empirically the worst ROI state).
esof_sizing_table:
FAVORABLE: 1.20
MILD_POSITIVE: 0.60
UNKNOWN: 0.25
NEUTRAL: 0.25 # alias — historical CH rows / stale HZ snapshots
MILD_NEGATIVE: 0.00
UNFAVORABLE: 0.00
# Target exchanges (e.g. Binance) require integer leverage. Default 1x pending
# CH-trade-walk in prod/scripts/analyze_leverage_winrate.py to pick the rounding
# rule (round-half-up vs banker's round vs stay-at-1x). leverage_raw is preserved
# in CH trade_events + NDPosition for that analysis.
use_int_leverage: true
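Read together, these keys feed a single multiplication site in the orchestrator; a minimal sketch (function name hypothetical, tables mirroring the values above) of how the math collapses to the BLUE form when the keys are absent:

```python
import math

def entry_notional(capital, fraction, leverage,
                   s6_table=None, bucket_id=None,
                   esof_table=None, esof_label=None):
    # EsoF regime gate: a missing/None label is treated as UNKNOWN.
    esof_mult = 1.0
    if esof_table:
        esof_mult = float(esof_table.get(esof_label or "UNKNOWN", 1.0))
        if esof_mult <= 0.0:
            return None  # regime-wide skip — no selector/sizer work
    # S6 per-bucket mult: absent bucket → 1.0 no-op.
    s6_mult = 1.0
    if s6_table and bucket_id is not None:
        s6_mult = float(s6_table.get(bucket_id, 1.0))
    return capital * fraction * leverage * s6_mult * esof_mult

S6 = {0: 0.40, 1: 0.30, 3: 2.00, 5: 0.50, 6: 1.50}
ESOF = {"FAVORABLE": 1.20, "MILD_POSITIVE": 0.60, "UNKNOWN": 0.25,
        "NEUTRAL": 0.25, "MILD_NEGATIVE": 0.0, "UNFAVORABLE": 0.0}

assert entry_notional(25000, 0.10, 2.0) == 5000.0  # toggles off → BLUE form
assert math.isclose(entry_notional(25000, 0.10, 2.0, S6, 3, ESOF, "FAVORABLE"),
                    12000.0)  # 5000 * 2.0 (B3) * 1.2 (FAVORABLE)
assert entry_notional(25000, 0.10, 2.0,
                      esof_table=ESOF, esof_label="UNFAVORABLE") is None
```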


@@ -0,0 +1,34 @@
# GREEN S6 bucket sizing table
# ────────────────────────────────────────────────────────────────────────────
# Auto-generated by prod/scripts/recompute_s6_coefficients.py.
# Bucket → per-entry notional multiplier applied at the orchestrator single-site
# (`nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py`).
#
# Semantics:
# value == 1.0 → no-op (same as BLUE)
# value < 1.0 → size down (fractional)
# value > 1.0 → size up (leaned-into bucket)
# absent bucket → 1.0 (no-op)
# value == 0.0 → prefer banning via `asset_bucket_ban_set` in green.yml
# (selector-ban reroutes to next-ranked asset;
# sizer 0× would waste the slot)
#
# Regeneration cadence: env S6_RECOMPUTE_INTERVAL_DAYS (default 30).
# If any bucket's net-PnL moves > ~20% between runs, the recompute flow flags
# the change in CH `dolphin.s6_recompute_log` for manual review before apply.
# ────────────────────────────────────────────────────────────────────────────
meta:
generated_at: "2026-04-21T00:00:00Z" # replaced on each recompute
source_branch: "exp/green-s6-esof-aem-shadow-2026-04-21"
n_trades: 0 # populated by recompute script
note: "Bootstrap stub — regenerate with prod/scripts/recompute_s6_coefficients.py before staging"
# Bucket → multiplier. B4 excluded (banned at selector). B2 omitted (= 1.0 no-op).
# Seeded from the S6 scenario in prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md.
buckets:
0: 0.40
1: 0.30
3: 2.00
5: 0.50
6: 1.50
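The variance guard described in the header can be sketched as follows (helper name and the ~20% threshold parameterisation are hypothetical, matching the behaviour noted above):

```python
from typing import Optional

def flag_bucket(prior_net: Optional[float], new_net: float,
                threshold_pct: float = 20.0) -> bool:
    """Return True when a bucket's net-PnL moved enough to warrant manual review."""
    if prior_net is None or prior_net == 0.0:
        return False  # no prior baseline → nothing to compare against
    delta_pct = abs(new_net - prior_net) / abs(prior_net) * 100.0
    return delta_pct > threshold_pct

assert flag_bucket(1000.0, 1150.0) is False   # +15% — within band
assert flag_bucket(1000.0, 1300.0) is True    # +30% — flagged for review
assert flag_bucket(None, 500.0) is False      # bootstrap run, no prior table
```

The new YAML is still written either way; the flag only gates promotion, per the header comment.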


@@ -0,0 +1,39 @@
# DOLPHIN Branching Convention
Established 2026-04-21. The DOLPHIN repo (Gitea: `100.119.158.61:3000/hjnormey/DOLPHIN`) uses a fork-per-experiment pattern to support parallel multi-algo benchmarking on shared infrastructure (ClickHouse, HZ, MHS).
## Branches
| Branch | Role |
|----------------|-----------------------------------------------------------------|
| `master` | Production baseline. Only merged-and-validated work lands here. |
| `exp/<slug>` | Experiment branches. Named `exp/<feature>-<YYYY-MM-DD>`. |
Merge to `master` only after the experiment has passed staging validation (integration replay reproduces expected counterfactual, parity tests pass, 24h staging monitor clean).
## Active / recent experiments
- `exp/green-s6-esof-aem-shadow-2026-04-21` — S6 asset picker, EsoF regime gate (NEUTRAL→UNKNOWN), AEM shadow completeness, integer-leverage gate.
## Infrastructure suffixing (avoid collisions on shared CH / HZ / MHS)
When an experiment writes runtime state to shared infra, **suffix the table or map name with the branch slug** so parallel experiments don't stomp each other.
Examples:
- ClickHouse tables: `adaptive_exit_shadow_exp_green_s6`, `trade_events_exp_green_s6`
- HZ maps: `DOLPHIN_PNL_GREEN_S6`, `DOLPHIN_FEATURES_EXP_GREEN_S6`
- Supervisord program names: `dolphin-green-exp-s6`
The suffix slug is the part of the branch name after `exp/`, with `/``_` and `-``_` as needed.
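The mapping can be sketched as a small helper (name hypothetical; note the infra examples above additionally abbreviate long slugs by hand):

```python
def infra_suffix(branch: str) -> str:
    """Derive the shared-infra suffix slug from an experiment branch name."""
    if not branch.startswith("exp/"):
        raise ValueError(f"not an experiment branch: {branch}")
    slug = branch[len("exp/"):]
    return slug.replace("/", "_").replace("-", "_")

assert infra_suffix("exp/green-s6-esof-aem-shadow-2026-04-21") == \
    "green_s6_esof_aem_shadow_2026_04_21"
```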
## Workflow
1. Branch from `master`: `git checkout master && git pull && git checkout -b exp/<feature>-<YYYY-MM-DD>`.
2. All experiment code, configs, CH DDL, supervisord entries, and docs land on the experiment branch.
3. Staging deploy runs on the experiment branch only.
4. Validation + monitor period.
5. If the experiment is kept, open a merge request to `master`. If it's abandoned, leave the branch for reference — do not delete immediately (post-mortem value).
## Commit identity
Commits should use the operator's Gitea-bound identity (`hjnormey@gmail.com`). Agents making commits should pass identity per-command via `git -c user.email=... -c user.name=...` rather than mutating global config.


@@ -0,0 +1,26 @@
# EsoF Label Migration — `NEUTRAL` → `UNKNOWN`
Landed in `exp/green-s6-esof-aem-shadow-2026-04-21` on 2026-04-21.
## What changed
`Observability/esof_advisor.py` emits `UNKNOWN` where it previously emitted `NEUTRAL` (mid-band, `-0.05 < advisory_score <= 0.05`). All downstream consumers keep a `NEUTRAL` alias so historical CH rows and replays continue to resolve.
## Why
Findings from the 637-trade EsoF retrospective (`prod/docs/EsoF_BLUE_IMPLEMENTATION_CURR_AND_RESEARCH.md`) show the mid-band isn't a benign middle — it's the **worst-ROI regime**, corresponding to states where the constituent liq/session/DoW/slot/cell signals are **in conflict**. "NEUTRAL" connoted "no strong read, probably fine"; the data says the opposite. The orchestrator-top EsoF gate maps `UNKNOWN` → a `0.25x` sizer mult to encode "stand mostly aside" instead of "trade normally".
## Touched files
- `Observability/esof_advisor.py` — emitter renamed; `LABEL_COLOR` carries both keys.
- `Observability/esof_gate.py` — `S6_MULT`, `IRP_PARAMS`, Strategy A mult map all keyed on `UNKNOWN` with a `NEUTRAL` alias entry.
- `nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py` — new GREEN-only `esof_sizing_table` looks up by label; missing/None label defaults to `UNKNOWN`.
## CH / HZ
- CH `dolphin.trade_events`, `dolphin.esof_advisory_log`: no DDL change (labels are strings). Historical rows keep `NEUTRAL`; new rows get `UNKNOWN`. Any dashboard/query filtering on the label needs to `IN ('NEUTRAL','UNKNOWN')` or be migrated.
- HZ `DOLPHIN_FEATURES.esof_advisor_latest`: next advisor tick overwrites with `UNKNOWN`. Consumers reading stale snapshots across the cutover should treat both as equivalent.
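A consumer-side alias shim consistent with the above might look like this (names hypothetical — the real gate tables simply carry both keys):

```python
LABEL_ALIASES = {"NEUTRAL": "UNKNOWN"}  # historical CH rows / stale HZ snapshots

def canonical_esof_label(label):
    """Collapse old and missing labels onto the renamed default."""
    # A missing label is also treated as UNKNOWN (signals-in-conflict default).
    if not label:
        return "UNKNOWN"
    return LABEL_ALIASES.get(label, label)

assert canonical_esof_label("NEUTRAL") == "UNKNOWN"
assert canonical_esof_label(None) == "UNKNOWN"
assert canonical_esof_label("FAVORABLE") == "FAVORABLE"
```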
## Rollback
Revert the three files above. The `NEUTRAL` alias means a partial rollback (advisor only) is safe without cascading breakage.

prod/s6_recompute_flow.py Executable file

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""Prefect flow — periodically recompute S6 bucket sizing coefficients.
Cadence
───────
Controlled by env `S6_RECOMPUTE_INTERVAL_DAYS` (default 30). The flow is
idempotent: running it more often than the interval just produces a fresh
YAML and markdown report; nothing is auto-promoted.
Kill-switch
───────────
Set env `S6_RECOMPUTE_DISABLED=1` to skip — the flow logs and exits 0.
Wiring into supervisord/cron
────────────────────────────
Run this flow on the supervisord host. Recommended: daily timer that guards
internally on the last-run timestamp in `dolphin.s6_recompute_log`, or a
Prefect deployment with a cron schedule `0 3 * * *` and the interval env set.
This flow shells out to `prod/scripts/recompute_s6_coefficients.py` rather
than importing it so that the script remains independently runnable without
requiring Prefect — honours the plan's "recompute works whether or not
Prefect is installed" requirement.
"""
from __future__ import annotations
import os
import subprocess
import sys
from datetime import datetime, timedelta
from pathlib import Path
try:
from prefect import flow, task, get_run_logger
except ImportError: # pragma: no cover — allow import-only smoke tests
def flow(fn=None, **_kw):
if fn is None:
return lambda f: f
return fn
def task(fn=None, **_kw):
if fn is None:
return lambda f: f
return fn
def get_run_logger(): # pragma: no cover
import logging
return logging.getLogger("s6_recompute_flow")
REPO_ROOT = Path(__file__).resolve().parents[1]
RECOMPUTE_PY = REPO_ROOT / "prod" / "scripts" / "recompute_s6_coefficients.py"
TABLE_YML = REPO_ROOT / "prod" / "configs" / "green_s6_table.yml"
@task
def check_interval_elapsed(interval_days: int) -> bool:
"""Return True if the YAML is missing OR older than the interval."""
if not TABLE_YML.exists():
return True
try:
import yaml
doc = yaml.safe_load(TABLE_YML.read_text()) or {}
ts_str = (doc.get("meta") or {}).get("generated_at", "")
if not ts_str:
return True
ts = datetime.fromisoformat(ts_str.rstrip("Z"))
return datetime.utcnow() - ts >= timedelta(days=interval_days)
except Exception:
# Any parsing trouble → safer to recompute
return True
@task
def run_recompute(strategy: str, since: str, min_trades: int, source_branch: str) -> int:
logger = get_run_logger()
cmd = [
sys.executable, str(RECOMPUTE_PY),
"--strategy", strategy,
"--since", since,
"--min-trades-per-bucket", str(min_trades),
"--source-branch", source_branch,
]
logger.info(f"[s6_recompute_flow] exec: {' '.join(cmd)}")
rc = subprocess.call(cmd, cwd=str(REPO_ROOT))
logger.info(f"[s6_recompute_flow] exit code: {rc}")
return rc
@flow(name="s6-recompute")
def s6_recompute_flow(
strategy: str = "blue",
since: str = "2026-01-01",
min_trades: int = 20,
source_branch: str = "exp/green-s6-esof-aem-shadow-2026-04-21",
):
logger = get_run_logger()
if os.environ.get("S6_RECOMPUTE_DISABLED") == "1":
logger.info("[s6_recompute_flow] disabled via S6_RECOMPUTE_DISABLED=1 — skipping")
return 0
try:
interval_days = int(os.environ.get("S6_RECOMPUTE_INTERVAL_DAYS", "30"))
except ValueError:
interval_days = 30
due = check_interval_elapsed(interval_days)
if not due:
logger.info(f"[s6_recompute_flow] interval not elapsed ({interval_days}d) — skipping")
return 0
return run_recompute(strategy, since, min_trades, source_branch)
if __name__ == "__main__":
sys.exit(int(s6_recompute_flow() or 0))


@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""Leverage → WR / net-PnL analysis over CH `dolphin.trade_events`.
Purpose
───────
The target exchanges (Binance futures) require integer leverage. The GREEN
orchestrator single-site (`esf_alpha_orchestrator.py`) therefore rounds
leverage to an integer at entry time; the current default is FIXED 1x with
two candidate rounding rules commented in source, pending this analysis:
Option 1 (round-half-up, conservative): int(math.floor(raw + 0.5)), min=1
Option 2 (banker's round, aggressive): int(round(raw)), min=1
Stay-at-1x (current default): leverage_int = 1
This script walks historical trades and, for every distinct leverage value
observed (bucketed to a user-chosen step), reports per-bin WR, net-PnL,
avg-MAE and sample count. The output report informs the rule choice.
Decision rubric (documented in the generated report)
────────────────────────────────────────────────────
- Higher-lev bins show clearly better net-PnL w/ tight sample error → Option 2
- Flat / noisy signal across bins → Option 1
- Higher-lev bins show WORSE net-PnL / mixed → stay 1x
This script is read-only; it does not auto-apply a rule change.
Usage
─────
python prod/scripts/analyze_leverage_winrate.py \\
--strategy blue \\
--since 2026-01-01 \\
--step 0.5 \\
--out prod/docs/LEVERAGE_WINRATE_REPORT_2026-04-21.md
"""
from __future__ import annotations
import argparse
import json
import math
import sys
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
CH_URL = "http://localhost:8123/"
CH_HEADERS = {
"X-ClickHouse-User": "dolphin",
"X-ClickHouse-Key": "dolphin_ch_2026",
}
def ch_query(sql: str) -> List[Dict[str, Any]]:
"""Run a CH query returning JSONEachRow; raise if the HTTP call fails."""
qs = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"})
req = urllib.request.Request(f"{CH_URL}?{qs}", method="GET")
for k, v in CH_HEADERS.items():
req.add_header(k, v)
with urllib.request.urlopen(req, timeout=60) as resp:
body = resp.read().decode("utf-8", errors="replace")
rows: List[Dict[str, Any]] = []
for line in body.splitlines():
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def bin_leverage(lev: float, step: float) -> float:
"""Bucket leverage to `step`-wide bins, anchored at 0. Returns bin lower edge."""
if lev is None or not math.isfinite(lev) or lev <= 0:
return 0.0
return math.floor(lev / step) * step
def load_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
"""Pull trade_events rows. Uses leverage_raw if present (post-sprint rows),
else falls back to leverage (pre-sprint rows on the same trade semantics).
"""
where = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"]
if until:
where.append(f"date <= toDate('{until}')")
where_sql = " AND ".join(where)
sql = (
"SELECT "
" toDate(date) AS d, "
" asset, "
" coalesce(leverage_raw, leverage) AS lev_raw, "
" leverage AS lev_effective, "
" pnl, pnl_pct, "
" (pnl_pct > 0) AS is_win, "
" exit_reason "
f"FROM dolphin.trade_events WHERE {where_sql}"
)
return ch_query(sql)
def aggregate(trades: List[Dict[str, Any]], step: float) -> List[Dict[str, Any]]:
bins: Dict[float, Dict[str, Any]] = {}
for t in trades:
lev = float(t.get("lev_raw") or 0.0)
b = bin_leverage(lev, step)
s = bins.setdefault(b, {
"bin_lo": b,
"bin_hi": b + step,
"n": 0,
"wins": 0,
"net_pnl": 0.0,
"net_pnl_pct": 0.0,
})
s["n"] += 1
if int(t.get("is_win", 0)):
s["wins"] += 1
s["net_pnl"] += float(t.get("pnl") or 0.0)
s["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0)
out = []
for b, s in sorted(bins.items()):
n = max(1, s["n"])
out.append({
"bin": f"{s['bin_lo']:.2f}{s['bin_hi']:.2f}x",
"n": s["n"],
"wr_pct": round(100.0 * s["wins"] / n, 2),
"avg_pnl_pct": round(s["net_pnl_pct"] / n * 100.0, 3),
"net_pnl": round(s["net_pnl"], 2),
})
return out
def render_markdown(rows: List[Dict[str, Any]],
strategy: str, since: str, until: Optional[str],
step: float, total: int) -> str:
hdr = (
"# Leverage → Winrate / Net-PnL Analysis\n\n"
f"- Strategy: `{strategy}`\n"
f"- Window: `{since}` → `{until or 'now'}`\n"
f"- Leverage bin step: `{step}x`\n"
f"- Total trades in scope: `{total}`\n"
f"- Generated: `{datetime.utcnow().isoformat(timespec='seconds')}Z`\n\n"
"## Per-bin aggregates\n\n"
"| Leverage bin | Trades | WR % | Avg PnL % | Net PnL |\n"
"|---|---:|---:|---:|---:|\n"
)
body = "\n".join(
f"| {r['bin']} | {r['n']} | {r['wr_pct']} | {r['avg_pnl_pct']} | {r['net_pnl']} |"
for r in rows
)
decision = (
"\n\n## Decision rubric (copy into the orchestrator comment block)\n\n"
"- Higher-lev bins show clearly better net-PnL w/ tight sample error → **Option 2** (banker's round, min=1)\n"
"- Flat / noisy signal across bins → **Option 1** (round-half-up, min=1)\n"
"- Higher-lev bins show WORSE net-PnL / mixed → **stay at 1x** (current default)\n\n"
"_Flipping the rule is a follow-up PR — this script is read-only._\n"
)
return hdr + body + decision
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--strategy", default="blue",
help="trade_events.strategy value to filter on (default: blue — richest historical set)")
ap.add_argument("--since", default="2026-01-01", help="inclusive start date (YYYY-MM-DD)")
ap.add_argument("--until", default=None, help="inclusive end date (YYYY-MM-DD); default = now")
ap.add_argument("--step", type=float, default=1.0, help="leverage bin width (e.g. 0.5 or 1.0)")
ap.add_argument("--out", default="prod/docs/LEVERAGE_WINRATE_REPORT.md",
help="output report path (markdown)")
args = ap.parse_args()
trades = load_trades(args.strategy, args.since, args.until)
if not trades:
print(f"No trades matched (strategy={args.strategy}, since={args.since}).", file=sys.stderr)
return 1
rows = aggregate(trades, args.step)
md = render_markdown(rows, args.strategy, args.since, args.until, args.step, total=len(trades))
out_path = Path(args.out)
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(md)
print(f"Wrote {out_path} ({len(rows)} bins, {len(trades)} trades)")
return 0
if __name__ == "__main__":
sys.exit(main())


@@ -0,0 +1,341 @@
#!/usr/bin/env python3
"""Recompute S6 bucket sizing coefficients from fresh `dolphin.trade_events`.
Regenerates:
- prod/configs/green_s6_table.yml (consumed by the GREEN orchestrator single-site)
- prod/docs/S6_COEFFICIENTS_REPORT_<YYYY-MM-DD>.md (human-readable summary)
Variance guard
──────────────
If any bucket's net-PnL moves more than `--variance-threshold` (default 20%)
relative to the prior pinned table, we STILL write the new YAML (so replay/debug
cycles remain reproducible) but also write a row to `dolphin.s6_recompute_log`
so a human can review before promoting to staging/prod.
Usage
─────
python prod/scripts/recompute_s6_coefficients.py \\
--strategy blue --since 2026-01-01 --min-trades-per-bucket 20
Intended to be driven by `prod/s6_recompute_flow.py` (Prefect) on a 30-day cadence.
"""
from __future__ import annotations
import argparse
import json
import pickle
import sys
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
CH_URL = "http://localhost:8123/"
CH_HEADERS = {
"X-ClickHouse-User": "dolphin",
"X-ClickHouse-Key": "dolphin_ch_2026",
}
DEFAULT_BUCKET_PKL = Path("adaptive_exit/models/bucket_assignments.pkl")
DEFAULT_TABLE_YML = Path("prod/configs/green_s6_table.yml")
REPORT_DIR = Path("prod/docs")
def ch_query(sql: str) -> List[Dict[str, Any]]:
qs = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"})
req = urllib.request.Request(f"{CH_URL}?{qs}", method="GET")
for k, v in CH_HEADERS.items():
req.add_header(k, v)
with urllib.request.urlopen(req, timeout=60) as resp:
body = resp.read().decode("utf-8", errors="replace")
rows: List[Dict[str, Any]] = []
for line in body.splitlines():
line = line.strip()
if line:
rows.append(json.loads(line))
return rows
def ch_insert(table: str, row: Dict[str, Any], db: str = "dolphin") -> None:
"""Best-effort insert into `{db}.{table}` (JSONEachRow). Swallows errors."""
body = (json.dumps(row) + "\n").encode()
qs = urllib.parse.urlencode({
"database": db,
"query": f"INSERT INTO {table} FORMAT JSONEachRow",
})
req = urllib.request.Request(f"{CH_URL}?{qs}", data=body, method="POST")
for k, v in CH_HEADERS.items():
req.add_header(k, v)
try:
urllib.request.urlopen(req, timeout=10)
except Exception as e:
print(f"[s6_recompute] CH insert warning: {e}", file=sys.stderr)
def ensure_recompute_log() -> None:
ddl = (
"CREATE TABLE IF NOT EXISTS dolphin.s6_recompute_log ("
"ts DateTime64(6, 'UTC'),"
"ts_day Date MATERIALIZED toDate(ts),"
"bucket_id UInt8,"
"n_trades UInt32,"
"wr_pct Float32,"
"net_pnl Float64,"
"prior_net Nullable(Float64),"
"delta_pct Nullable(Float32),"
"new_mult Float32,"
"prior_mult Nullable(Float32),"
"flagged UInt8"
") ENGINE = MergeTree()"
" ORDER BY (ts_day, bucket_id, ts)"
" TTL ts_day + INTERVAL 365 DAY"
)
req = urllib.request.Request(CH_URL, data=ddl.encode(), method="POST")
for k, v in CH_HEADERS.items():
req.add_header(k, v)
    try:
        with urllib.request.urlopen(req, timeout=10):
            pass  # DDL response body is empty; close the socket promptly
    except Exception as e:
        print(f"[s6_recompute] could not ensure log table: {e}", file=sys.stderr)
def load_bucket_assignments(path: Path) -> Dict[str, int]:
    # The .pkl is a repo-owned KMeans artifact, so pickle's trusted-input
    # requirement is satisfied here.
    with open(path, "rb") as f:
        doc = pickle.load(f)
assignments = doc.get("assignments") if isinstance(doc, dict) else None
if not isinstance(assignments, dict):
raise RuntimeError(f"{path}: expected dict with 'assignments' key")
return {str(k): int(v) for k, v in assignments.items()}
def pull_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
    # Values are interpolated directly into SQL; this is an operator-run CLI
    # with trusted args, but quote-bearing values would break the query.
    where = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"]
if until:
where.append(f"date <= toDate('{until}')")
where_sql = " AND ".join(where)
sql = (
"SELECT asset, pnl, pnl_pct, exit_reason "
f"FROM dolphin.trade_events WHERE {where_sql}"
)
return ch_query(sql)
def compute_bucket_stats(
trades: List[Dict[str, Any]],
assignments: Dict[str, int],
buckets: List[int],
min_trades: int,
) -> Dict[int, Dict[str, float]]:
    stats: Dict[int, Dict[str, float]] = {
        b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets
    }
for t in trades:
asset = str(t.get("asset", ""))
b = assignments.get(asset)
if b is None or b not in stats:
continue
stats[b]["n"] += 1
stats[b]["wins"] += 1 if float(t.get("pnl_pct") or 0.0) > 0 else 0
stats[b]["net_pnl"] += float(t.get("pnl") or 0.0)
stats[b]["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0)
    out: Dict[int, Dict[str, float]] = {}
    for b, s in stats.items():
if s["n"] == 0:
out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False}
else:
out[b] = {
"n": s["n"],
"wr": s["wins"] / s["n"],
"avg_pnl_pct": s["net_pnl_pct"] / s["n"],
"net_pnl": s["net_pnl"],
"enough": s["n"] >= min_trades,
}
return out
def stats_to_multipliers(stats: Dict[int, Dict[str, float]]) -> Dict[int, float]:
"""Translate per-bucket aggregate outcomes into a size multiplier.
Conservative mapping (keeps the file a documented, deterministic function —
not an ML output). Brackets match the S6 reference row in
prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md.
avg_pnl_pct > 0.015 → 2.0 (lean in)
avg_pnl_pct > 0.005 → 1.5
avg_pnl_pct > 0.001 → 1.0 (omit from YAML — no-op)
avg_pnl_pct > 0.000 → 0.5
avg_pnl_pct > -0.002 → 0.3
avg_pnl_pct > -0.005 → 0.0 (ban candidate — see `asset_bucket_ban_set`)
otherwise → 0.0
"""
out: Dict[int, float] = {}
for b, s in stats.items():
if not s.get("enough"):
continue # skip thin buckets — default to no-op (absent = 1.0x)
apct = s["avg_pnl_pct"]
if apct > 0.015: out[b] = 2.0
elif apct > 0.005: out[b] = 1.5
elif apct > 0.001: pass # no-op (1.0x)
elif apct > 0.000: out[b] = 0.5
elif apct > -0.002: out[b] = 0.3
else: out[b] = 0.0
return out
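# Worked example of the brackets above (hypothetical numbers): a bucket with
# avg_pnl_pct = 0.006 lands in the "> 0.005" row and maps to 1.5x; one at
# 0.0008 falls through to "> 0.000" and maps to 0.5x; a thin bucket
# (enough=False) is skipped entirely, i.e. trades at the implicit 1.0x.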
def load_prior_table(path: Path) -> Dict[int, float]:
if not path.exists():
return {}
try:
import yaml
doc = yaml.safe_load(path.read_text()) or {}
buckets = doc.get("buckets") or {}
return {int(k): float(v) for k, v in buckets.items()}
except Exception as e:
print(f"[s6_recompute] prior table load warning: {e}", file=sys.stderr)
return {}
def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = [
"# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.",
"# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py",
"# at the single-site notional application.",
"",
"meta:",
f" generated_at: \"{meta['generated_at']}\"",
f" source_branch: \"{meta.get('source_branch', '')}\"",
f" strategy: \"{meta['strategy']}\"",
f" n_trades: {meta['n_trades']}",
f" min_trades_per_bucket: {meta['min_trades']}",
"",
"buckets:",
]
for b in sorted(buckets):
lines.append(f" {b}: {buckets[b]:.2f}")
path.write_text("\n".join(lines) + "\n")
def write_report(path: Path, stats: Dict[int, Dict[str, float]],
new_table: Dict[int, float], prior_table: Dict[int, float],
meta: Dict[str, Any], flagged: List[int]) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
lines = [
f"# S6 Coefficient Recompute — {meta['generated_at']}",
"",
f"- Strategy: `{meta['strategy']}`",
f"- Window: `{meta['since']}` → `{meta.get('until') or 'now'}`",
f"- Total trades: `{meta['n_trades']}`",
f"- Min trades per bucket: `{meta['min_trades']}`",
"",
"## Per-bucket stats",
"",
"| Bucket | Trades | WR % | Avg PnL % | Net PnL | New mult | Prior mult | Flagged |",
"|---:|---:|---:|---:|---:|---:|---:|:---:|",
]
for b in sorted(stats):
s = stats[b]
new = new_table.get(b, 1.0)
prior = prior_table.get(b, 1.0)
        flag = "⚠" if b in flagged else ""
lines.append(
f"| {b} | {s['n']} | {100*s['wr']:.2f} | {100*s['avg_pnl_pct']:.3f} | "
f"{s['net_pnl']:.2f} | {new:.2f} | {prior:.2f} | {flag} |"
)
if flagged:
lines += [
"",
"## ⚠ Flagged for manual review",
"",
            "The following buckets' sizing multipliers moved more than the variance",
            "threshold vs the prior pinned table. The new YAML was written but should",
            "be reviewed before promotion to staging/prod; see `dolphin.s6_recompute_log`.",
"",
"| Bucket | Prior net | New net | Δ% |",
"|---:|---:|---:|---:|",
]
        for b in flagged:
            # Prior net PnL is not persisted in the pinned YAML, so the prior
            # and delta columns stay as placeholder dashes; flagging itself is
            # based on multiplier movement (see the variance guard in main()).
            lines.append(f"| {b} | — | {stats[b]['net_pnl']:.2f} | — |")
path.write_text("\n".join(lines) + "\n")
def main() -> int:
ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
ap.add_argument("--strategy", default="blue",
help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)")
ap.add_argument("--since", default="2026-01-01")
ap.add_argument("--until", default=None)
ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20)
ap.add_argument("--variance-threshold", type=float, default=0.20,
help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)")
ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL),
help="path to bucket_assignments.pkl")
ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML))
ap.add_argument("--out-report", default=None,
help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_<date>.md)")
ap.add_argument("--source-branch", default="",
help="branch slug to stamp into the YAML frontmatter for traceability")
args = ap.parse_args()
assignments = load_bucket_assignments(Path(args.bucket_pkl))
buckets_seen = sorted(set(assignments.values()))
trades = pull_trades(args.strategy, args.since, args.until)
stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades)
new_table = stats_to_multipliers(stats)
prior_table = load_prior_table(Path(args.out_yml))
    # Variance guard: flag buckets whose sizing multiplier moved by more than
    # the threshold relative to the prior pinned table.
flagged: List[int] = []
for b, s in stats.items():
if not s.get("enough"):
continue
prior_mult = prior_table.get(b)
new_mult = new_table.get(b, 1.0)
if prior_mult is None:
continue
denom = max(abs(prior_mult), 1e-6)
if abs(new_mult - prior_mult) / denom > args.variance_threshold:
flagged.append(b)
generated_at = datetime.utcnow().isoformat(timespec="seconds") + "Z"
meta = {
"generated_at": generated_at,
"source_branch": args.source_branch,
"strategy": args.strategy,
"since": args.since,
"until": args.until,
"n_trades": len(trades),
"min_trades": args.min_trades,
}
write_table_yml(Path(args.out_yml), new_table, meta)
    out_report = (
        Path(args.out_report) if args.out_report
        else REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md"
    )
write_report(out_report, stats, new_table, prior_table, meta, flagged)
ensure_recompute_log()
for b in flagged:
ch_insert("s6_recompute_log", {
            # NB: utcnow().timestamp() would be shifted by the local UTC offset
            # (naive UTC wall time read as local); naive now() round-trips
            # correctly to epoch microseconds for DateTime64(6).
            "ts": int(datetime.now().timestamp() * 1_000_000),
"bucket_id": int(b),
"n_trades": int(stats[b]["n"]),
"wr_pct": float(100.0 * stats[b]["wr"]),
"net_pnl": float(stats[b]["net_pnl"]),
"prior_net": None,
"delta_pct": None,
"new_mult": float(new_table.get(b, 1.0)),
"prior_mult": float(prior_table.get(b, 1.0)),
"flagged": 1,
})
print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)")
print(f"Wrote {out_report}")
return 0
if __name__ == "__main__":
sys.exit(main())