diff --git a/prod/configs/green.yml b/prod/configs/green.yml index d75471f..196f363 100755 --- a/prod/configs/green.yml +++ b/prod/configs/green.yml @@ -66,3 +66,47 @@ hazelcast: imap_pnl: DOLPHIN_PNL_GREEN imap_state: DOLPHIN_STATE_GREEN state_map: DOLPHIN_STATE_GREEN # capital persistence map (was defaulting to BLUE) + +# ────────────────────────────────────────────────────────────────────────────── +# GREEN S6/EsoF/AEM sprint (exp/green-s6-esof-aem-shadow-2026-04-21). +# BLUE (prod/configs/blue.yml) does NOT set these keys → orchestrator loads them +# as None/False → BLUE math is byte-identical pre-sprint. To disable any branch +# on GREEN, comment the corresponding key — single kill-switch per feature. +# ────────────────────────────────────────────────────────────────────────────── + +# Strictly-zero buckets banned at the ranking layer (selector skips them so the +# slot is handed to the next-best asset — does NOT waste capital with 0× sizing). +# Fractional buckets (B0/B1/B5) stay tradeable via s6_size_table below. +asset_bucket_ban_set: [4] + +# Pointer to the generated S6 coefficient table. prod/scripts/recompute_s6_coefficients.py +# regenerates this file on the configured cadence (env S6_RECOMPUTE_INTERVAL_DAYS). +# Bucket → per-entry notional multiplier applied at esf_alpha_orchestrator.py single-site. +s6_table_path: prod/configs/green_s6_table.yml + +# Inline fallback used when s6_table_path is missing (bootstrap / first-run): +# matches the S6 row from prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md. +# B4 is absent (banned above); B2 is absent (= 1.0x, no-op). +s6_size_table: + 0: 0.40 + 1: 0.30 + 3: 2.00 + 5: 0.50 + 6: 1.50 + +# EsoF regime gate lives at the top of _try_entry (orchestrator single-site). +# mult == 0 → regime-wide skip (no selector/sizer work). UNKNOWN replaces NEUTRAL +# (signals-in-conflict is empirically the worst ROI state). 
+esof_sizing_table: + FAVORABLE: 1.20 + MILD_POSITIVE: 0.60 + UNKNOWN: 0.25 + NEUTRAL: 0.25 # alias — historical CH rows / stale HZ snapshots + MILD_NEGATIVE: 0.00 + UNFAVORABLE: 0.00 + +# Target exchanges (e.g. Binance) require integer leverage. Default 1x pending +# CH-trade-walk in prod/scripts/analyze_leverage_winrate.py to pick the rounding +# rule (round-half-up vs banker's round vs stay-at-1x). leverage_raw is preserved +# in CH trade_events + NDPosition for that analysis. +use_int_leverage: true diff --git a/prod/configs/green_s6_table.yml b/prod/configs/green_s6_table.yml new file mode 100644 index 0000000..72fc053 --- /dev/null +++ b/prod/configs/green_s6_table.yml @@ -0,0 +1,34 @@ +# GREEN S6 bucket sizing table +# ──────────────────────────────────────────────────────────────────────────── +# Auto-generated by prod/scripts/recompute_s6_coefficients.py. +# Bucket → per-entry notional multiplier applied at the orchestrator single-site +# (`nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py`). +# +# Semantics: +# value == 1.0 → no-op (same as BLUE) +# value < 1.0 → size down (fractional) +# value > 1.0 → size up (leaned-into bucket) +# absent bucket → 1.0 (no-op) +# value == 0.0 → prefer banning via `asset_bucket_ban_set` in green.yml +# (selector-ban reroutes to next-ranked asset; +# sizer 0× would waste the slot) +# +# Regeneration cadence: env S6_RECOMPUTE_INTERVAL_DAYS (default 30). +# If any bucket's net-PnL moves > ~20% between runs, the recompute flow flags +# the change in CH `dolphin.s6_recompute_log` for manual review before apply. +# ──────────────────────────────────────────────────────────────────────────── + +meta: + generated_at: "2026-04-21T00:00:00Z" # replaced on each recompute + source_branch: "exp/green-s6-esof-aem-shadow-2026-04-21" + n_trades: 0 # populated by recompute script + note: "Bootstrap stub — regenerate with prod/scripts/recompute_s6_coefficients.py before staging" + +# Bucket → multiplier. 
B4 excluded (banned at selector). B2 omitted (= 1.0 no-op). +# Seeded from the S6 scenario in prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md. +buckets: + 0: 0.40 + 1: 0.30 + 3: 2.00 + 5: 0.50 + 6: 1.50 diff --git a/prod/docs/BRANCHING_CONVENTION.md b/prod/docs/BRANCHING_CONVENTION.md new file mode 100644 index 0000000..bd1edcc --- /dev/null +++ b/prod/docs/BRANCHING_CONVENTION.md @@ -0,0 +1,39 @@ +# DOLPHIN Branching Convention + +Established 2026-04-21. The DOLPHIN repo (Gitea: `100.119.158.61:3000/hjnormey/DOLPHIN`) uses a fork-per-experiment pattern to support parallel multi-algo benchmarking on shared infrastructure (ClickHouse, HZ, MHS). + +## Branches + +| Branch | Role | |----------------|-----------------------------------------------------------------| | `master` | Production baseline. Only merged-and-validated work lands here. | | `exp/` | Experiment branches. Named `exp/<slug>-<date>`. | + +Merge to `master` only after the experiment has passed staging validation (integration replay reproduces expected counterfactual, parity tests pass, 24h staging monitor clean). + +## Active / recent experiments + +- `exp/green-s6-esof-aem-shadow-2026-04-21` — S6 asset picker, EsoF regime gate (NEUTRAL→UNKNOWN), AEM shadow completeness, integer-leverage gate. + +## Infrastructure suffixing (avoid collisions on shared CH / HZ / MHS) + +When an experiment writes runtime state to shared infra, **suffix the table or map name with the branch slug** so parallel experiments don't stomp each other. + +Examples: +- ClickHouse tables: `adaptive_exit_shadow_exp_green_s6`, `trade_events_exp_green_s6` +- HZ maps: `DOLPHIN_PNL_GREEN_S6`, `DOLPHIN_FEATURES_EXP_GREEN_S6` +- Supervisord program names: `dolphin-green-exp-s6` + +The suffix slug is the part of the branch name after `exp/`, with `/` → `_` and `-` → `_` as needed. + +## Workflow + +1. Branch from `master`: `git checkout master && git pull && git checkout -b exp/<slug>-<date>`. +2. 
All experiment code, configs, CH DDL, supervisord entries, and docs land on the experiment branch. +3. Staging deploy runs on the experiment branch only. +4. Validation + monitor period. +5. If the experiment is kept, open a merge request to `master`. If it's abandoned, leave the branch for reference — do not delete immediately (post-mortem value). + +## Commit identity + +Commits should use the operator's Gitea-bound identity (`hjnormey@gmail.com`). Agents making commits should pass identity per-command via `git -c user.email=... -c user.name=...` rather than mutating global config. diff --git a/prod/s6_recompute_flow.py b/prod/s6_recompute_flow.py new file mode 100755 index 0000000..fdb63df --- /dev/null +++ b/prod/s6_recompute_flow.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +"""Prefect flow — periodically recompute S6 bucket sizing coefficients. + +Cadence +─────── +Controlled by env `S6_RECOMPUTE_INTERVAL_DAYS` (default 30). The flow is +idempotent: running it more often than the interval just produces a fresh +YAML and markdown report; nothing is auto-promoted. + +Kill-switch +─────────── +Set env `S6_RECOMPUTE_DISABLED=1` to skip — the flow logs and exits 0. + +Wiring into supervisord/cron +──────────────────────────── +Run this flow on the supervisord host. Recommended: daily timer that guards +internally on the last-run timestamp in `dolphin.s6_recompute_log`, or a +Prefect deployment with a cron schedule `0 3 * * *` and the interval env set. + +This flow shells out to `prod/scripts/recompute_s6_coefficients.py` rather +than importing it so that the script remains independently runnable without +requiring Prefect — honours the plan's "recompute works whether or not +Prefect is installed" requirement. 
+""" + +from __future__ import annotations + +import os +import subprocess +import sys +from datetime import datetime, timedelta +from pathlib import Path + +try: + from prefect import flow, task, get_run_logger +except ImportError: # pragma: no cover — allow import-only smoke tests + def flow(fn=None, **_kw): + if fn is None: + return lambda f: f + return fn + def task(fn=None, **_kw): + if fn is None: + return lambda f: f + return fn + def get_run_logger(): # pragma: no cover + import logging + return logging.getLogger("s6_recompute_flow") + + +REPO_ROOT = Path(__file__).resolve().parents[1] +RECOMPUTE_PY = REPO_ROOT / "prod" / "scripts" / "recompute_s6_coefficients.py" +TABLE_YML = REPO_ROOT / "prod" / "configs" / "green_s6_table.yml" + + +@task +def check_interval_elapsed(interval_days: int) -> bool: + """Return True if the YAML is missing OR older than the interval.""" + if not TABLE_YML.exists(): + return True + try: + import yaml + doc = yaml.safe_load(TABLE_YML.read_text()) or {} + ts_str = (doc.get("meta") or {}).get("generated_at", "") + if not ts_str: + return True + ts = datetime.fromisoformat(ts_str.rstrip("Z")) + return datetime.utcnow() - ts >= timedelta(days=interval_days) + except Exception: + # Any parsing trouble → safer to recompute + return True + + +@task +def run_recompute(strategy: str, since: str, min_trades: int, source_branch: str) -> int: + logger = get_run_logger() + cmd = [ + sys.executable, str(RECOMPUTE_PY), + "--strategy", strategy, + "--since", since, + "--min-trades-per-bucket", str(min_trades), + "--source-branch", source_branch, + ] + logger.info(f"[s6_recompute_flow] exec: {' '.join(cmd)}") + rc = subprocess.call(cmd, cwd=str(REPO_ROOT)) + logger.info(f"[s6_recompute_flow] exit code: {rc}") + return rc + + +@flow(name="s6-recompute") +def s6_recompute_flow( + strategy: str = "blue", + since: str = "2026-01-01", + min_trades: int = 20, + source_branch: str = "exp/green-s6-esof-aem-shadow-2026-04-21", +): + logger = 
get_run_logger() + + if os.environ.get("S6_RECOMPUTE_DISABLED") == "1": + logger.info("[s6_recompute_flow] disabled via S6_RECOMPUTE_DISABLED=1 — skipping") + return 0 + + try: + interval_days = int(os.environ.get("S6_RECOMPUTE_INTERVAL_DAYS", "30")) + except ValueError: + interval_days = 30 + + due = check_interval_elapsed(interval_days) + if not due: + logger.info(f"[s6_recompute_flow] interval not elapsed ({interval_days}d) — skipping") + return 0 + + return run_recompute(strategy, since, min_trades, source_branch) + + +if __name__ == "__main__": + sys.exit(int(s6_recompute_flow() or 0)) diff --git a/prod/scripts/analyze_leverage_winrate.py b/prod/scripts/analyze_leverage_winrate.py new file mode 100755 index 0000000..e6c7156 --- /dev/null +++ b/prod/scripts/analyze_leverage_winrate.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""Leverage → WR / net-PnL analysis over CH `dolphin.trade_events`. + +Purpose +─────── +The target exchanges (Binance futures) require integer leverage. The GREEN +orchestrator single-site (`esf_alpha_orchestrator.py`) therefore rounds +leverage to an integer at entry time; the current default is FIXED 1x with +two candidate rounding rules commented in source, pending this analysis: + + Option 1 (round-half-up, conservative): int(math.floor(raw + 0.5)), min=1 + Option 2 (banker's round, aggressive): int(round(raw)), min=1 + Stay-at-1x (current default): leverage_int = 1 + +This script walks historical trades and, for every distinct leverage value +observed (bucketed to a user-chosen step), reports per-bin WR, net-PnL, +avg-MAE and sample count. The output report informs the rule choice. 
+ +Decision rubric (documented in the generated report) +──────────────────────────────────────────────────── + - Higher-lev bins show clearly better net-PnL w/ tight sample error → Option 2 + - Flat / noisy signal across bins → Option 1 + - Higher-lev bins show WORSE net-PnL / mixed → stay 1x + +This script is read-only; it does not auto-apply a rule change. + +Usage +───── + python prod/scripts/analyze_leverage_winrate.py \\ + --strategy blue \\ + --since 2026-01-01 \\ + --step 0.5 \\ + --out prod/docs/LEVERAGE_WINRATE_REPORT_2026-04-21.md +""" + +from __future__ import annotations + +import argparse +import json +import math +import sys +import urllib.parse +import urllib.request +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +CH_URL = "http://localhost:8123/" +CH_HEADERS = { + "X-ClickHouse-User": "dolphin", + "X-ClickHouse-Key": "dolphin_ch_2026", +} + + +def ch_query(sql: str) -> List[Dict[str, Any]]: + """Run a CH query returning JSONEachRow; raise if the HTTP call fails.""" + qs = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"}) + req = urllib.request.Request(f"{CH_URL}?{qs}", method="GET") + for k, v in CH_HEADERS.items(): + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=60) as resp: + body = resp.read().decode("utf-8", errors="replace") + rows: List[Dict[str, Any]] = [] + for line in body.splitlines(): + line = line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def bin_leverage(lev: float, step: float) -> float: + """Bucket leverage to `step`-wide bins, anchored at 0. Returns bin lower edge.""" + if lev is None or not math.isfinite(lev) or lev <= 0: + return 0.0 + return math.floor(lev / step) * step + + +def load_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]: + """Pull trade_events rows. 
Uses leverage_raw if present (post-sprint rows), + else falls back to leverage (pre-sprint rows on the same trade semantics). + """ + where = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"] + if until: + where.append(f"date <= toDate('{until}')") + where_sql = " AND ".join(where) + + sql = ( + "SELECT " + " toDate(date) AS d, " + " asset, " + " coalesce(leverage_raw, leverage) AS lev_raw, " + " leverage AS lev_effective, " + " pnl, pnl_pct, " + " (pnl_pct > 0) AS is_win, " + " exit_reason " + f"FROM dolphin.trade_events WHERE {where_sql}" + ) + return ch_query(sql) + + +def aggregate(trades: List[Dict[str, Any]], step: float) -> List[Dict[str, Any]]: + bins: Dict[float, Dict[str, Any]] = {} + for t in trades: + lev = float(t.get("lev_raw") or 0.0) + b = bin_leverage(lev, step) + s = bins.setdefault(b, { + "bin_lo": b, + "bin_hi": b + step, + "n": 0, + "wins": 0, + "net_pnl": 0.0, + "net_pnl_pct": 0.0, + }) + s["n"] += 1 + if int(t.get("is_win", 0)): + s["wins"] += 1 + s["net_pnl"] += float(t.get("pnl") or 0.0) + s["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0) + + out = [] + for b, s in sorted(bins.items()): + n = max(1, s["n"]) + out.append({ + "bin": f"{s['bin_lo']:.2f}–{s['bin_hi']:.2f}x", + "n": s["n"], + "wr_pct": round(100.0 * s["wins"] / n, 2), + "avg_pnl_pct": round(s["net_pnl_pct"] / n * 100.0, 3), + "net_pnl": round(s["net_pnl"], 2), + }) + return out + + +def render_markdown(rows: List[Dict[str, Any]], + strategy: str, since: str, until: Optional[str], + step: float, total: int) -> str: + hdr = ( + "# Leverage → Winrate / Net-PnL Analysis\n\n" + f"- Strategy: `{strategy}`\n" + f"- Window: `{since}` → `{until or 'now'}`\n" + f"- Leverage bin step: `{step}x`\n" + f"- Total trades in scope: `{total}`\n" + f"- Generated: `{datetime.utcnow().isoformat(timespec='seconds')}Z`\n\n" + "## Per-bin aggregates\n\n" + "| Leverage bin | Trades | WR % | Avg PnL % | Net PnL |\n" + "|---|---:|---:|---:|---:|\n" + ) + body = "\n".join( + f"| {r['bin']} | 
{r['n']} | {r['wr_pct']} | {r['avg_pnl_pct']} | {r['net_pnl']} |" + for r in rows + ) + decision = ( + "\n\n## Decision rubric (copy into the orchestrator comment block)\n\n" + "- Higher-lev bins show clearly better net-PnL w/ tight sample error → **Option 2** (banker's round, min=1)\n" + "- Flat / noisy signal across bins → **Option 1** (round-half-up, min=1)\n" + "- Higher-lev bins show WORSE net-PnL / mixed → **stay at 1x** (current default)\n\n" + "_Flipping the rule is a follow-up PR — this script is read-only._\n" + ) + return hdr + body + decision + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--strategy", default="blue", + help="trade_events.strategy value to filter on (default: blue — richest historical set)") + ap.add_argument("--since", default="2026-01-01", help="inclusive start date (YYYY-MM-DD)") + ap.add_argument("--until", default=None, help="inclusive end date (YYYY-MM-DD); default = now") + ap.add_argument("--step", type=float, default=1.0, help="leverage bin width (e.g. 
0.5 or 1.0)") + ap.add_argument("--out", default="prod/docs/LEVERAGE_WINRATE_REPORT.md", + help="output report path (markdown)") + args = ap.parse_args() + + trades = load_trades(args.strategy, args.since, args.until) + if not trades: + print(f"No trades matched (strategy={args.strategy}, since={args.since}).", file=sys.stderr) + return 1 + + rows = aggregate(trades, args.step) + md = render_markdown(rows, args.strategy, args.since, args.until, args.step, total=len(trades)) + + out_path = Path(args.out) + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_text(md) + print(f"Wrote {out_path} ({len(rows)} bins, {len(trades)} trades)") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/prod/scripts/recompute_s6_coefficients.py b/prod/scripts/recompute_s6_coefficients.py new file mode 100755 index 0000000..753cd87 --- /dev/null +++ b/prod/scripts/recompute_s6_coefficients.py @@ -0,0 +1,341 @@ +#!/usr/bin/env python3 +"""Recompute S6 bucket sizing coefficients from fresh `dolphin.trade_events`. + +Regenerates: + - prod/configs/green_s6_table.yml (consumed by the GREEN orchestrator single-site) + - prod/docs/S6_COEFFICIENTS_REPORT_.md (human-readable summary) + +Variance guard +────────────── +If any bucket's net-PnL moves more than `--variance-threshold` (default 20%) +relative to the prior pinned table, we STILL write the new YAML (so replay/debug +cycles remain reproducible) but also write a row to `dolphin.s6_recompute_log` +so a human can review before promoting to staging/prod. + +Usage +───── + python prod/scripts/recompute_s6_coefficients.py \\ + --strategy blue --since 2026-01-01 --min-trades-per-bucket 20 + +Intended to be driven by `prod/s6_recompute_flow.py` (Prefect) on a 30-day cadence. 
+""" + +from __future__ import annotations + +import argparse +import json +import pickle +import sys +import urllib.parse +import urllib.request +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional + +CH_URL = "http://localhost:8123/" +CH_HEADERS = { + "X-ClickHouse-User": "dolphin", + "X-ClickHouse-Key": "dolphin_ch_2026", +} + +DEFAULT_BUCKET_PKL = Path("adaptive_exit/models/bucket_assignments.pkl") +DEFAULT_TABLE_YML = Path("prod/configs/green_s6_table.yml") +REPORT_DIR = Path("prod/docs") + + +def ch_query(sql: str) -> List[Dict[str, Any]]: + qs = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"}) + req = urllib.request.Request(f"{CH_URL}?{qs}", method="GET") + for k, v in CH_HEADERS.items(): + req.add_header(k, v) + with urllib.request.urlopen(req, timeout=60) as resp: + body = resp.read().decode("utf-8", errors="replace") + rows: List[Dict[str, Any]] = [] + for line in body.splitlines(): + line = line.strip() + if line: + rows.append(json.loads(line)) + return rows + + +def ch_insert(table: str, row: Dict[str, Any], db: str = "dolphin") -> None: + """Best-effort insert into `{db}.{table}` (JSONEachRow). 
Swallows errors.""" + body = (json.dumps(row) + "\n").encode() + qs = urllib.parse.urlencode({ + "database": db, + "query": f"INSERT INTO {table} FORMAT JSONEachRow", + }) + req = urllib.request.Request(f"{CH_URL}?{qs}", data=body, method="POST") + for k, v in CH_HEADERS.items(): + req.add_header(k, v) + try: + urllib.request.urlopen(req, timeout=10) + except Exception as e: + print(f"[s6_recompute] CH insert warning: {e}", file=sys.stderr) + + +def ensure_recompute_log() -> None: + ddl = ( + "CREATE TABLE IF NOT EXISTS dolphin.s6_recompute_log (" + "ts DateTime64(6, 'UTC')," + "ts_day Date MATERIALIZED toDate(ts)," + "bucket_id UInt8," + "n_trades UInt32," + "wr_pct Float32," + "net_pnl Float64," + "prior_net Nullable(Float64)," + "delta_pct Nullable(Float32)," + "new_mult Float32," + "prior_mult Nullable(Float32)," + "flagged UInt8" + ") ENGINE = MergeTree()" + " ORDER BY (ts_day, bucket_id, ts)" + " TTL ts_day + INTERVAL 365 DAY" + ) + req = urllib.request.Request(CH_URL, data=ddl.encode(), method="POST") + for k, v in CH_HEADERS.items(): + req.add_header(k, v) + try: + urllib.request.urlopen(req, timeout=10) + except Exception as e: + print(f"[s6_recompute] could not ensure log table: {e}", file=sys.stderr) + + +def load_bucket_assignments(path: Path) -> Dict[str, int]: + with open(path, "rb") as f: + doc = pickle.load(f) + assignments = doc.get("assignments") if isinstance(doc, dict) else None + if not isinstance(assignments, dict): + raise RuntimeError(f"{path}: expected dict with 'assignments' key") + return {str(k): int(v) for k, v in assignments.items()} + + +def pull_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]: + where = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"] + if until: + where.append(f"date <= toDate('{until}')") + where_sql = " AND ".join(where) + sql = ( + "SELECT asset, pnl, pnl_pct, exit_reason " + f"FROM dolphin.trade_events WHERE {where_sql}" + ) + return ch_query(sql) + + +def 
compute_bucket_stats( + trades: List[Dict[str, Any]], + assignments: Dict[str, int], + buckets: List[int], + min_trades: int, +) -> Dict[int, Dict[str, float]]: + stats: Dict[int, Dict[str, float]] = {b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets} + for t in trades: + asset = str(t.get("asset", "")) + b = assignments.get(asset) + if b is None or b not in stats: + continue + stats[b]["n"] += 1 + stats[b]["wins"] += 1 if float(t.get("pnl_pct") or 0.0) > 0 else 0 + stats[b]["net_pnl"] += float(t.get("pnl") or 0.0) + stats[b]["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0) + + out = {} + for b, s in stats.items(): + n = s["n"] if s["n"] >= min_trades else s["n"] + if s["n"] == 0: + out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False} + else: + out[b] = { + "n": s["n"], + "wr": s["wins"] / s["n"], + "avg_pnl_pct": s["net_pnl_pct"] / s["n"], + "net_pnl": s["net_pnl"], + "enough": s["n"] >= min_trades, + } + return out + + +def stats_to_multipliers(stats: Dict[int, Dict[str, float]]) -> Dict[int, float]: + """Translate per-bucket aggregate outcomes into a size multiplier. + + Conservative mapping (keeps the file a documented, deterministic function — + not an ML output). Brackets match the S6 reference row in + prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md. 
+ + avg_pnl_pct > 0.015 → 2.0 (lean in) + avg_pnl_pct > 0.005 → 1.5 + avg_pnl_pct > 0.001 → 1.0 (omit from YAML — no-op) + avg_pnl_pct > 0.000 → 0.5 + avg_pnl_pct > -0.002 → 0.3 + avg_pnl_pct > -0.005 → 0.0 (ban candidate — see `asset_bucket_ban_set`) + otherwise → 0.0 + """ + out: Dict[int, float] = {} + for b, s in stats.items(): + if not s.get("enough"): + continue # skip thin buckets — default to no-op (absent = 1.0x) + apct = s["avg_pnl_pct"] + if apct > 0.015: out[b] = 2.0 + elif apct > 0.005: out[b] = 1.5 + elif apct > 0.001: pass # no-op (1.0x) + elif apct > 0.000: out[b] = 0.5 + elif apct > -0.002: out[b] = 0.3 + else: out[b] = 0.0 + return out + + +def load_prior_table(path: Path) -> Dict[int, float]: + if not path.exists(): + return {} + try: + import yaml + doc = yaml.safe_load(path.read_text()) or {} + buckets = doc.get("buckets") or {} + return {int(k): float(v) for k, v in buckets.items()} + except Exception as e: + print(f"[s6_recompute] prior table load warning: {e}", file=sys.stderr) + return {} + + +def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + lines = [ + "# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.", + "# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py", + "# at the single-site notional application.", + "", + "meta:", + f" generated_at: \"{meta['generated_at']}\"", + f" source_branch: \"{meta.get('source_branch', '')}\"", + f" strategy: \"{meta['strategy']}\"", + f" n_trades: {meta['n_trades']}", + f" min_trades_per_bucket: {meta['min_trades']}", + "", + "buckets:", + ] + for b in sorted(buckets): + lines.append(f" {b}: {buckets[b]:.2f}") + path.write_text("\n".join(lines) + "\n") + + +def write_report(path: Path, stats: Dict[int, Dict[str, float]], + new_table: Dict[int, float], prior_table: Dict[int, float], + meta: Dict[str, Any], flagged: List[int]) -> None: + 
path.parent.mkdir(parents=True, exist_ok=True) + lines = [ + f"# S6 Coefficient Recompute — {meta['generated_at']}", + "", + f"- Strategy: `{meta['strategy']}`", + f"- Window: `{meta['since']}` → `{meta.get('until') or 'now'}`", + f"- Total trades: `{meta['n_trades']}`", + f"- Min trades per bucket: `{meta['min_trades']}`", + "", + "## Per-bucket stats", + "", + "| Bucket | Trades | WR % | Avg PnL % | Net PnL | New mult | Prior mult | Flagged |", + "|---:|---:|---:|---:|---:|---:|---:|:---:|", + ] + for b in sorted(stats): + s = stats[b] + new = new_table.get(b, 1.0) + prior = prior_table.get(b, 1.0) + flag = "⚠" if b in flagged else "" + lines.append( + f"| {b} | {s['n']} | {100*s['wr']:.2f} | {100*s['avg_pnl_pct']:.3f} | " + f"{s['net_pnl']:.2f} | {new:.2f} | {prior:.2f} | {flag} |" + ) + if flagged: + lines += [ + "", + "## ⚠ Flagged for manual review", + "", + "The following buckets moved more than the variance threshold since the", + "prior pinned table. The new YAML was written but should be reviewed", + "before promotion to staging/prod — see `dolphin.s6_recompute_log`.", + "", + "| Bucket | Prior net | New net | Δ% |", + "|---:|---:|---:|---:|", + ] + for b in flagged: + prior_net = prior_table.get(f"_net_{b}") # may be absent + lines.append(f"| {b} | — | {stats[b]['net_pnl']:.2f} | — |") + path.write_text("\n".join(lines) + "\n") + + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + ap.add_argument("--strategy", default="blue", + help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)") + ap.add_argument("--since", default="2026-01-01") + ap.add_argument("--until", default=None) + ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20) + ap.add_argument("--variance-threshold", type=float, default=0.20, + help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)") + 
ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL), + help="path to bucket_assignments.pkl") + ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML)) + ap.add_argument("--out-report", default=None, + help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_.md)") + ap.add_argument("--source-branch", default="", + help="branch slug to stamp into the YAML frontmatter for traceability") + args = ap.parse_args() + + assignments = load_bucket_assignments(Path(args.bucket_pkl)) + buckets_seen = sorted(set(assignments.values())) + trades = pull_trades(args.strategy, args.since, args.until) + stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades) + new_table = stats_to_multipliers(stats) + + prior_table = load_prior_table(Path(args.out_yml)) + + # Variance guard — flag buckets whose net-PnL moved more than the threshold. + flagged: List[int] = [] + for b, s in stats.items(): + if not s.get("enough"): + continue + prior_mult = prior_table.get(b) + new_mult = new_table.get(b, 1.0) + if prior_mult is None: + continue + denom = max(abs(prior_mult), 1e-6) + if abs(new_mult - prior_mult) / denom > args.variance_threshold: + flagged.append(b) + + generated_at = datetime.utcnow().isoformat(timespec="seconds") + "Z" + meta = { + "generated_at": generated_at, + "source_branch": args.source_branch, + "strategy": args.strategy, + "since": args.since, + "until": args.until, + "n_trades": len(trades), + "min_trades": args.min_trades, + } + + write_table_yml(Path(args.out_yml), new_table, meta) + + out_report = Path(args.out_report) if args.out_report else REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md" + write_report(out_report, stats, new_table, prior_table, meta, flagged) + + ensure_recompute_log() + for b in flagged: + ch_insert("s6_recompute_log", { + "ts": int(datetime.utcnow().timestamp() * 1_000_000), + "bucket_id": int(b), + "n_trades": int(stats[b]["n"]), + "wr_pct": float(100.0 * stats[b]["wr"]), + "net_pnl": 
float(stats[b]["net_pnl"]), + "prior_net": None, + "delta_pct": None, + "new_mult": float(new_table.get(b, 1.0)), + "prior_mult": float(prior_table.get(b, 1.0)), + "flagged": 1, + }) + + print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)") + print(f"Wrote {out_report}") + return 0 + + +if __name__ == "__main__": + sys.exit(main())