#!/usr/bin/env python3
"""Recompute S6 bucket sizing coefficients from fresh `dolphin.trade_events`.

Regenerates:
  - prod/configs/green_s6_table.yml (consumed by the GREEN orchestrator single-site)
  - prod/docs/S6_COEFFICIENTS_REPORT_<YYYY-MM-DD>.md (human-readable summary)

Variance guard
──────────────
If any bucket's net-PnL moves more than `--variance-threshold` (default 20%)
relative to the prior pinned table, we STILL write the new YAML (so replay/debug
cycles remain reproducible) but also write a row to `dolphin.s6_recompute_log`
so a human can review before promoting to staging/prod.

NOTE: the pinned YAML only stores size multipliers (not net-PnL), so the guard
is currently evaluated on the relative move of each bucket's multiplier versus
the multiplier in the prior table.

Usage
─────
    python prod/scripts/recompute_s6_coefficients.py \\
        --strategy blue --since 2026-01-01 --min-trades-per-bucket 20

Intended to be driven by `prod/s6_recompute_flow.py` (Prefect) on a 30-day cadence.
"""

from __future__ import annotations

import argparse
import json
import pickle
import sys
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

CH_URL = "http://localhost:8123/"
# NOTE(review): credentials are hard-coded in source; consider moving them to
# environment/secret configuration.
CH_HEADERS = {
    "X-ClickHouse-User": "dolphin",
    "X-ClickHouse-Key": "dolphin_ch_2026",
}

DEFAULT_BUCKET_PKL = Path("adaptive_exit/models/bucket_assignments.pkl")
DEFAULT_TABLE_YML = Path("prod/configs/green_s6_table.yml")
REPORT_DIR = Path("prod/docs")


def _ch_request(url: str, data: Optional[bytes] = None,
                method: str = "GET") -> urllib.request.Request:
    """Build a ClickHouse HTTP request carrying the auth headers."""
    req = urllib.request.Request(url, data=data, method=method)
    for k, v in CH_HEADERS.items():
        req.add_header(k, v)
    return req


def ch_query(sql: str, params: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
    """Run a SELECT against ClickHouse and return the rows as dicts.

    Args:
        sql: query text WITHOUT a FORMAT clause (JSONEachRow is appended).
            May reference server-side parameters as `{name:Type}`.
        params: values for those parameters; each is sent as the
            `param_<name>` URL argument so it is never spliced into the SQL.

    Returns:
        One dict per non-empty response line.

    Raises:
        urllib.error.URLError / HTTPError on transport or server errors,
        json.JSONDecodeError if a response line is not valid JSON.
    """
    url_args = {"query": sql + " FORMAT JSONEachRow"}
    for name, value in (params or {}).items():
        url_args[f"param_{name}"] = value
    qs = urllib.parse.urlencode(url_args)
    req = _ch_request(f"{CH_URL}?{qs}", method="GET")
    with urllib.request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")
    rows: List[Dict[str, Any]] = []
    for line in body.splitlines():
        line = line.strip()
        if line:
            rows.append(json.loads(line))
    return rows


def ch_insert(table: str, row: Dict[str, Any], db: str = "dolphin") -> None:
    """Best-effort insert into `{db}.{table}` (JSONEachRow). Swallows errors."""
    body = (json.dumps(row) + "\n").encode()
    qs = urllib.parse.urlencode({
        "database": db,
        "query": f"INSERT INTO {table} FORMAT JSONEachRow",
    })
    req = _ch_request(f"{CH_URL}?{qs}", data=body, method="POST")
    try:
        # Context manager so the HTTP response/socket is always released.
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:  # deliberate best-effort: log and carry on
        print(f"[s6_recompute] CH insert warning: {e}", file=sys.stderr)


def ensure_recompute_log() -> None:
    """Create `dolphin.s6_recompute_log` if it does not exist (best-effort)."""
    ddl = (
        "CREATE TABLE IF NOT EXISTS dolphin.s6_recompute_log ("
        "ts DateTime64(6, 'UTC'),"
        "ts_day Date MATERIALIZED toDate(ts),"
        "bucket_id UInt8,"
        "n_trades UInt32,"
        "wr_pct Float32,"
        "net_pnl Float64,"
        "prior_net Nullable(Float64),"
        "delta_pct Nullable(Float32),"
        "new_mult Float32,"
        "prior_mult Nullable(Float32),"
        "flagged UInt8"
        ") ENGINE = MergeTree()"
        " ORDER BY (ts_day, bucket_id, ts)"
        " TTL ts_day + INTERVAL 365 DAY"
    )
    req = _ch_request(CH_URL, data=ddl.encode(), method="POST")
    try:
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:  # deliberate best-effort: log and carry on
        print(f"[s6_recompute] could not ensure log table: {e}", file=sys.stderr)


def load_bucket_assignments(path: Path) -> Dict[str, int]:
    """Load the asset → bucket-id mapping from a pickle file.

    The pickle must contain a dict with an `assignments` dict inside it.

    Raises:
        RuntimeError: if the document does not have that shape.
    """
    # SECURITY NOTE(review): pickle.load executes arbitrary code from the
    # file; only point --bucket-pkl at artifacts produced by a trusted job.
    with open(path, "rb") as f:
        doc = pickle.load(f)
    assignments = doc.get("assignments") if isinstance(doc, dict) else None
    if not isinstance(assignments, dict):
        raise RuntimeError(f"{path}: expected dict with 'assignments' key")
    return {str(k): int(v) for k, v in assignments.items()}


def pull_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
    """Fetch trades for `strategy` with `date` in [since, until] (until optional).

    The filter values are bound as ClickHouse query parameters rather than
    interpolated into the SQL text, so CLI/flow-supplied values cannot alter
    the statement.
    """
    where = ["strategy = {strategy:String}", "date >= toDate({since:String})"]
    params: Dict[str, str] = {"strategy": strategy, "since": since}
    if until:
        where.append("date <= toDate({until:String})")
        params["until"] = until
    where_sql = " AND ".join(where)
    sql = (
        "SELECT asset, pnl, pnl_pct, exit_reason "
        f"FROM dolphin.trade_events WHERE {where_sql}"
    )
    return ch_query(sql, params)


def compute_bucket_stats(
    trades: List[Dict[str, Any]],
    assignments: Dict[str, int],
    buckets: List[int],
    min_trades: int,
) -> Dict[int, Dict[str, float]]:
    """Aggregate trades into per-bucket outcome statistics.

    Trades whose asset has no assignment, or whose bucket is not listed in
    `buckets`, are ignored. Missing/null `pnl` and `pnl_pct` count as 0.0;
    a trade is a win when its `pnl_pct` is strictly positive.

    Returns:
        bucket → {"n", "wr", "avg_pnl_pct", "net_pnl", "enough"}, where
        `enough` is True once the bucket holds at least `min_trades` trades.
        Buckets with no trades get all-zero stats and `enough=False`.
    """
    stats: Dict[int, Dict[str, float]] = {
        b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets
    }
    for t in trades:
        b = assignments.get(str(t.get("asset", "")))
        if b is None or b not in stats:
            continue
        pnl_pct = float(t.get("pnl_pct") or 0.0)
        acc = stats[b]
        acc["n"] += 1
        if pnl_pct > 0:
            acc["wins"] += 1
        acc["net_pnl"] += float(t.get("pnl") or 0.0)
        acc["net_pnl_pct"] += pnl_pct

    out: Dict[int, Dict[str, float]] = {}
    for b, s in stats.items():
        n = s["n"]
        if n == 0:
            out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False}
        else:
            out[b] = {
                "n": n,
                "wr": s["wins"] / n,
                "avg_pnl_pct": s["net_pnl_pct"] / n,
                "net_pnl": s["net_pnl"],
                "enough": n >= min_trades,
            }
    return out


def stats_to_multipliers(stats: Dict[int, Dict[str, float]]) -> Dict[int, float]:
    """Translate per-bucket aggregate outcomes into a size multiplier.

    Conservative mapping (keeps the file a documented, deterministic function —
    not an ML output). Brackets match the S6 reference row in
    prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md.

        avg_pnl_pct > 0.015  → 2.0 (lean in)
        avg_pnl_pct > 0.005  → 1.5
        avg_pnl_pct > 0.001  → 1.0 (omit from YAML — no-op)
        avg_pnl_pct > 0.000  → 0.5
        avg_pnl_pct > -0.002 → 0.3
        avg_pnl_pct > -0.005 → 0.0 (ban candidate — see `asset_bucket_ban_set`)
        otherwise            → 0.0

    Buckets without `enough` trades are omitted (absent = 1.0x).
    """
    out: Dict[int, float] = {}
    for b, s in stats.items():
        if not s.get("enough"):
            continue  # skip thin buckets — default to no-op (absent = 1.0x)
        apct = s["avg_pnl_pct"]
        if apct > 0.015:
            out[b] = 2.0
        elif apct > 0.005:
            out[b] = 1.5
        elif apct > 0.001:
            pass  # no-op (1.0x)
        elif apct > 0.000:
            out[b] = 0.5
        elif apct > -0.002:
            out[b] = 0.3
        else:
            out[b] = 0.0
    return out


def load_prior_table(path: Path) -> Dict[int, float]:
    """Read the `buckets:` mapping of a previously written table.

    Returns an empty dict when the file is missing or cannot be loaded
    (including when PyYAML is unavailable); a warning goes to stderr in the
    latter case. An empty result disables the variance guard for this run.
    """
    if not path.exists():
        return {}
    try:
        import yaml
        # Read as UTF-8 to match write_table_yml (header holds non-ASCII text).
        doc = yaml.safe_load(path.read_text(encoding="utf-8")) or {}
        buckets = doc.get("buckets") or {}
        return {int(k): float(v) for k, v in buckets.items()}
    except Exception as e:
        print(f"[s6_recompute] prior table load warning: {e}", file=sys.stderr)
        return {}


def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None:
    """Write the bucket → multiplier table (plus run metadata) as YAML.

    `meta` must provide `generated_at`, `strategy`, `n_trades` and
    `min_trades`; `source_branch` is optional. Parent directories are created.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [
        "# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.",
        "# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py",
        "# at the single-site notional application.",
        "",
        "meta:",
        f" generated_at: \"{meta['generated_at']}\"",
        f" source_branch: \"{meta.get('source_branch', '')}\"",
        f" strategy: \"{meta['strategy']}\"",
        f" n_trades: {meta['n_trades']}",
        f" min_trades_per_bucket: {meta['min_trades']}",
        "",
        "buckets:",
    ]
    for b in sorted(buckets):
        lines.append(f" {b}: {buckets[b]:.2f}")
    # Explicit UTF-8: the header contains "—", which a non-UTF-8 locale
    # default encoding may fail to encode.
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def write_report(path: Path, stats: Dict[int, Dict[str, float]],
                 new_table: Dict[int, float], prior_table: Dict[int, float],
                 meta: Dict[str, Any], flagged: List[int]) -> None:
    """Write the human-readable Markdown summary of a recompute run.

    Lists every bucket's stats with its new and prior multiplier (absent
    entries shown as 1.00) and, when `flagged` is non-empty, a section naming
    the buckets that need manual review.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [
        f"# S6 Coefficient Recompute — {meta['generated_at']}",
        "",
        f"- Strategy: `{meta['strategy']}`",
        f"- Window: `{meta['since']}` → `{meta.get('until') or 'now'}`",
        f"- Total trades: `{meta['n_trades']}`",
        f"- Min trades per bucket: `{meta['min_trades']}`",
        "",
        "## Per-bucket stats",
        "",
        "| Bucket | Trades | WR % | Avg PnL % | Net PnL | New mult | Prior mult | Flagged |",
        "|---:|---:|---:|---:|---:|---:|---:|:---:|",
    ]
    for b in sorted(stats):
        s = stats[b]
        new = new_table.get(b, 1.0)
        prior = prior_table.get(b, 1.0)
        flag = "⚠" if b in flagged else ""
        lines.append(
            f"| {b} | {s['n']} | {100*s['wr']:.2f} | {100*s['avg_pnl_pct']:.3f} | "
            f"{s['net_pnl']:.2f} | {new:.2f} | {prior:.2f} | {flag} |"
        )
    if flagged:
        lines += [
            "",
            "## ⚠ Flagged for manual review",
            "",
            "The following buckets moved more than the variance threshold since the",
            "prior pinned table. The new YAML was written but should be reviewed",
            "before promotion to staging/prod — see `dolphin.s6_recompute_log`.",
            "",
            "| Bucket | Prior net | New net | Δ% |",
            "|---:|---:|---:|---:|",
        ]
        for b in flagged:
            # The prior table holds multipliers only, so prior net-PnL and the
            # delta are not available; they are rendered as "—".
            lines.append(f"| {b} | — | {stats[b]['net_pnl']:.2f} | — |")
    # Explicit UTF-8: the report contains "—", "→", "⚠" and "Δ".
    path.write_text("\n".join(lines) + "\n", encoding="utf-8")


def _flag_multiplier_moves(stats: Dict[int, Dict[str, float]],
                           new_table: Dict[int, float],
                           prior_table: Dict[int, float],
                           threshold: float) -> List[int]:
    """Return buckets whose multiplier moved by more than `threshold` (relative).

    Only buckets with enough trades AND an explicit entry in the prior table
    are considered; a new multiplier absent from `new_table` counts as 1.0.
    """
    flagged: List[int] = []
    for b, s in stats.items():
        if not s.get("enough"):
            continue
        prior_mult = prior_table.get(b)
        if prior_mult is None:
            continue
        new_mult = new_table.get(b, 1.0)
        # Floor the denominator so a prior multiplier of 0.0 does not divide by zero.
        denom = max(abs(prior_mult), 1e-6)
        if abs(new_mult - prior_mult) / denom > threshold:
            flagged.append(b)
    return flagged


def main() -> int:
    """CLI entry point: pull trades, rebuild the table, report, log flags."""
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)")
    ap.add_argument("--since", default="2026-01-01")
    ap.add_argument("--until", default=None)
    ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20)
    ap.add_argument("--variance-threshold", type=float, default=0.20,
                    help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)")
    ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL),
                    help="path to bucket_assignments.pkl")
    ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML))
    ap.add_argument("--out-report", default=None,
                    help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_<YYYY-MM-DD>.md)")
    ap.add_argument("--source-branch", default="",
                    help="branch slug to stamp into the YAML frontmatter for traceability")
    args = ap.parse_args()

    assignments = load_bucket_assignments(Path(args.bucket_pkl))
    buckets_seen = sorted(set(assignments.values()))
    trades = pull_trades(args.strategy, args.since, args.until)
    if not trades:
        # Still proceed (behaviour unchanged), but make the consequence loud:
        # every bucket will be thin, so the written table will be empty.
        print("[s6_recompute] warning: no trades matched the filter; "
              "the written table will contain no bucket overrides",
              file=sys.stderr)
    stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades)
    new_table = stats_to_multipliers(stats)
    # Load the prior table BEFORE overwriting it below.
    prior_table = load_prior_table(Path(args.out_yml))

    # Variance guard — flag buckets whose multiplier moved more than the threshold.
    flagged = _flag_multiplier_moves(stats, new_table, prior_table, args.variance_threshold)

    # One timezone-aware instant for the whole run. A naive utcnow() would be
    # interpreted as local time by .timestamp(), skewing `ts` on non-UTC hosts.
    now = datetime.now(timezone.utc)
    generated_at = now.strftime("%Y-%m-%dT%H:%M:%SZ")
    meta = {
        "generated_at": generated_at,
        "source_branch": args.source_branch,
        "strategy": args.strategy,
        "since": args.since,
        "until": args.until,
        "n_trades": len(trades),
        "min_trades": args.min_trades,
    }
    write_table_yml(Path(args.out_yml), new_table, meta)

    if args.out_report:
        out_report = Path(args.out_report)
    else:
        out_report = REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md"
    write_report(out_report, stats, new_table, prior_table, meta, flagged)

    ensure_recompute_log()
    ts_micros = int(now.timestamp() * 1_000_000)
    for b in flagged:
        ch_insert("s6_recompute_log", {
            "ts": ts_micros,
            "bucket_id": int(b),
            "n_trades": int(stats[b]["n"]),
            "wr_pct": float(100.0 * stats[b]["wr"]),
            "net_pnl": float(stats[b]["net_pnl"]),
            "prior_net": None,
            "delta_pct": None,
            "new_mult": float(new_table.get(b, 1.0)),
            "prior_mult": float(prior_table.get(b, 1.0)),
            "flagged": 1,
        })

    print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)")
    print(f"Wrote {out_report}")
    return 0


if __name__ == "__main__":
    sys.exit(main())