Files
DOLPHIN/prod/scripts/recompute_s6_coefficients.py

342 lines
13 KiB
Python
Raw Permalink Normal View History

feat(config/scripts): GREEN config wiring + S6 recompute tooling prod/configs/green.yml: - asset_bucket_ban_set: [4] (B4 banned at selector level) - s6_size_table: inline bootstrap multipliers (B0→0.4, B1→0.3, B3→2.0, B5→0.5, B6→1.5) matching CRITICAL_ASSET_PICKING S6 scenario - esof_sizing_table: FAV→1.2, MILD_POS→0.6, UNKNOWN→0.25, MILD_NEG→0.0, UNFAV→0.0 - use_int_leverage: true (1x fixed pending winrate analysis) - s6_table_path: pointer to generated YAML (recompute updates this) BLUE (blue.yml) carries none of these keys → BLUE math unchanged. prod/configs/green_s6_table.yml: bootstrap stub with frontmatter (generated_at, source_branch, n_trades). Regenerated by recompute script. prod/scripts/recompute_s6_coefficients.py: Queries trade_events, maps assets to KMeans buckets, derives per-bucket sizing mults. Variance guard: >20% net-PnL move flags bucket in dolphin.s6_recompute_log for manual review before promote. prod/s6_recompute_flow.py: Prefect flow wrapping the recompute script. Cadence via S6_RECOMPUTE_INTERVAL_DAYS env (default 30). Kill-switch: S6_RECOMPUTE_DISABLED=1. prod/scripts/analyze_leverage_winrate.py: Read-only walk of CH trade_events; bins trades by leverage_raw, emits per-bin WR/net-PnL/avg-MAE. Output informs the int-leverage rounding rule choice (Option 1 round-half-up vs Option 2 banker's round vs stay-at-1x). Does not auto-apply a rule change. Plan refs: Tasks 3, 8, 10. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:08:08 +02:00
#!/usr/bin/env python3
"""Recompute S6 bucket sizing coefficients from fresh `dolphin.trade_events`.
Regenerates:
- prod/configs/green_s6_table.yml (consumed by the GREEN orchestrator single-site)
- prod/docs/S6_COEFFICIENTS_REPORT_<YYYY-MM-DD>.md (human-readable summary)
Variance guard
If any bucket's net-PnL moves more than `--variance-threshold` (default 20%)
relative to the prior pinned table, we STILL write the new YAML (so replay/debug
cycles remain reproducible) but also write a row to `dolphin.s6_recompute_log`
so a human can review before promoting to staging/prod.
Usage
python prod/scripts/recompute_s6_coefficients.py \\
--strategy blue --since 2026-01-01 --min-trades-per-bucket 20
Intended to be driven by `prod/s6_recompute_flow.py` (Prefect) on a 30-day cadence.
"""
from __future__ import annotations
import argparse
import json
import pickle
import sys
import urllib.parse
import urllib.request
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
# ClickHouse HTTP endpoint used by all queries/inserts in this script.
# NOTE(review): credentials are hard-coded here; consider moving to env vars
# before wider promotion.
CH_URL = "http://localhost:8123/"
CH_HEADERS = {
    "X-ClickHouse-User": "dolphin",
    "X-ClickHouse-Key": "dolphin_ch_2026",
}
# Default input: pickled asset -> KMeans-bucket assignments.
DEFAULT_BUCKET_PKL = Path("adaptive_exit/models/bucket_assignments.pkl")
# Default outputs: the pinned GREEN S6 sizing table and the report directory.
DEFAULT_TABLE_YML = Path("prod/configs/green_s6_table.yml")
REPORT_DIR = Path("prod/docs")
def ch_query(sql: str) -> List[Dict[str, Any]]:
    """Run *sql* against ClickHouse over HTTP and return the JSONEachRow rows.

    The query is sent as a GET with the statement in the query string; each
    non-empty response line is parsed as one JSON object.
    """
    params = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"})
    request = urllib.request.Request(f"{CH_URL}?{params}", method="GET")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    with urllib.request.urlopen(request, timeout=60) as resp:
        payload = resp.read().decode("utf-8", errors="replace")
    return [json.loads(stripped)
            for raw in payload.splitlines()
            if (stripped := raw.strip())]
def ch_insert(table: str, row: Dict[str, Any], db: str = "dolphin") -> None:
    """Best-effort insert into `{db}.{table}` (JSONEachRow). Swallows errors."""
    payload = (json.dumps(row) + "\n").encode()
    params = urllib.parse.urlencode({
        "database": db,
        "query": f"INSERT INTO {table} FORMAT JSONEachRow",
    })
    request = urllib.request.Request(f"{CH_URL}?{params}", data=payload, method="POST")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    try:
        urllib.request.urlopen(request, timeout=10)
    except Exception as exc:
        # Logging is intentionally non-fatal: a failed audit insert must not
        # abort the recompute itself.
        print(f"[s6_recompute] CH insert warning: {exc}", file=sys.stderr)
def ensure_recompute_log() -> None:
    """Create `dolphin.s6_recompute_log` if it does not exist (best effort)."""
    columns = ",".join([
        "ts DateTime64(6, 'UTC')",
        "ts_day Date MATERIALIZED toDate(ts)",
        "bucket_id UInt8",
        "n_trades UInt32",
        "wr_pct Float32",
        "net_pnl Float64",
        "prior_net Nullable(Float64)",
        "delta_pct Nullable(Float32)",
        "new_mult Float32",
        "prior_mult Nullable(Float32)",
        "flagged UInt8",
    ])
    ddl = (
        "CREATE TABLE IF NOT EXISTS dolphin.s6_recompute_log ("
        + columns
        + ") ENGINE = MergeTree()"
        " ORDER BY (ts_day, bucket_id, ts)"
        " TTL ts_day + INTERVAL 365 DAY"
    )
    request = urllib.request.Request(CH_URL, data=ddl.encode(), method="POST")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    try:
        urllib.request.urlopen(request, timeout=10)
    except Exception as exc:
        # Non-fatal by design: the recompute still writes its YAML/report.
        print(f"[s6_recompute] could not ensure log table: {exc}", file=sys.stderr)
def load_bucket_assignments(path: Path) -> Dict[str, int]:
    """Load the `{asset -> bucket_id}` mapping from a pickle at *path*.

    The pickle must be a dict carrying an 'assignments' dict; keys are
    coerced to str and values to int. Raises RuntimeError otherwise.
    """
    with open(path, "rb") as fh:
        doc = pickle.load(fh)
    mapping = doc.get("assignments") if isinstance(doc, dict) else None
    if not isinstance(mapping, dict):
        raise RuntimeError(f"{path}: expected dict with 'assignments' key")
    return {str(asset): int(bucket) for asset, bucket in mapping.items()}
def pull_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
    """Fetch raw trade rows for *strategy* in [since, until] from trade_events.

    NOTE(review): values are interpolated straight into the SQL; acceptable
    only because inputs come from our own CLI flags, not untrusted users.
    """
    clauses = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"]
    if until:
        clauses.append(f"date <= toDate('{until}')")
    sql = (
        "SELECT asset, pnl, pnl_pct, exit_reason "
        f"FROM dolphin.trade_events WHERE {' AND '.join(clauses)}"
    )
    return ch_query(sql)
def compute_bucket_stats(
    trades: List[Dict[str, Any]],
    assignments: Dict[str, int],
    buckets: List[int],
    min_trades: int,
) -> Dict[int, Dict[str, float]]:
    """Aggregate per-bucket trade outcomes.

    Parameters
    ----------
    trades: rows from `pull_trades` (uses keys `asset`, `pnl`, `pnl_pct`).
    assignments: asset symbol -> bucket id.
    buckets: bucket ids to report on; trades mapping elsewhere are skipped.
    min_trades: sample-size floor for a bucket to count as `enough`.

    Returns bucket_id -> {"n", "wr", "avg_pnl_pct", "net_pnl", "enough"}.
    Empty buckets get zeroed stats with enough=False.
    """
    acc: Dict[int, Dict[str, float]] = {
        b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets
    }
    for t in trades:
        b = assignments.get(str(t.get("asset", "")))
        if b is None or b not in acc:
            continue  # unassigned asset, or bucket outside the requested set
        s = acc[b]
        s["n"] += 1
        # `or 0.0` guards NULL/None fields coming back from JSONEachRow.
        s["wins"] += 1 if float(t.get("pnl_pct") or 0.0) > 0 else 0
        s["net_pnl"] += float(t.get("pnl") or 0.0)
        s["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0)
    # FIX: dropped a dead statement (`n = s["n"] if s["n"] >= min_trades
    # else s["n"]`) whose branches were identical and whose result was unused.
    out: Dict[int, Dict[str, float]] = {}
    for b, s in acc.items():
        if s["n"] == 0:
            out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False}
        else:
            out[b] = {
                "n": s["n"],
                "wr": s["wins"] / s["n"],
                "avg_pnl_pct": s["net_pnl_pct"] / s["n"],
                "net_pnl": s["net_pnl"],
                "enough": s["n"] >= min_trades,
            }
    return out
def stats_to_multipliers(stats: Dict[int, Dict[str, float]]) -> Dict[int, float]:
    """Translate per-bucket aggregate outcomes into a size multiplier.

    Conservative, deterministic bracket mapping (documented function, not an
    ML output). Brackets match the S6 reference row in
    prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md:

        avg_pnl_pct > 0.015   2.0  (lean in)
        avg_pnl_pct > 0.005   1.5
        avg_pnl_pct > 0.001   1.0  (omitted from YAML — no-op)
        avg_pnl_pct > 0.000   0.5
        avg_pnl_pct > -0.002  0.3
        otherwise             0.0  (ban candidate — see `asset_bucket_ban_set`)

    Buckets without enough samples are skipped entirely (absent = 1.0x).
    """
    # (exclusive lower bound, multiplier); None means "omit" (implicit 1.0x).
    brackets = [
        (0.015, 2.0),
        (0.005, 1.5),
        (0.001, None),
        (0.000, 0.5),
        (-0.002, 0.3),
    ]
    table: Dict[int, float] = {}
    for bucket, agg in stats.items():
        if not agg.get("enough"):
            continue
        apct = agg["avg_pnl_pct"]
        mult: Optional[float] = 0.0  # fallthrough when no bracket matches
        for bound, candidate in brackets:
            if apct > bound:
                mult = candidate
                break
        if mult is not None:
            table[bucket] = mult
    return table
def load_prior_table(path: Path) -> Dict[int, float]:
    """Read the currently pinned table's `buckets:` mapping from *path*.

    Returns {} when the file is missing or unreadable — a fresh run must
    never be blocked by a broken/absent prior table.
    """
    if not path.exists():
        return {}
    try:
        import yaml  # local import: keeps PyYAML optional for this script
        parsed = yaml.safe_load(path.read_text()) or {}
        prior = parsed.get("buckets") or {}
        return {int(bucket): float(mult) for bucket, mult in prior.items()}
    except Exception as exc:
        print(f"[s6_recompute] prior table load warning: {exc}", file=sys.stderr)
        return {}
def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None:
    """Render the generated S6 sizing table as YAML at *path* (parents created)."""
    path.parent.mkdir(parents=True, exist_ok=True)
    header = [
        "# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.",
        "# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py",
        "# at the single-site notional application.",
        "",
        "meta:",
        f' generated_at: "{meta["generated_at"]}"',
        f' source_branch: "{meta.get("source_branch", "")}"',
        f' strategy: "{meta["strategy"]}"',
        f" n_trades: {meta['n_trades']}",
        f" min_trades_per_bucket: {meta['min_trades']}",
        "",
        "buckets:",
    ]
    rows = [f" {bucket}: {buckets[bucket]:.2f}" for bucket in sorted(buckets)]
    path.write_text("\n".join(header + rows) + "\n")
def write_report(path: Path, stats: Dict[int, Dict[str, float]],
                 new_table: Dict[int, float], prior_table: Dict[int, float],
                 meta: Dict[str, Any], flagged: List[int]) -> None:
    """Write the human-readable Markdown recompute report to *path*.

    Emits a per-bucket stats table (one row per bucket in *stats*, multipliers
    defaulting to 1.0 when absent from either table) and, when *flagged* is
    non-empty, a manual-review section.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [
        f"# S6 Coefficient Recompute — {meta['generated_at']}",
        "",
        f"- Strategy: `{meta['strategy']}`",
        f"- Window: `{meta['since']}` → `{meta.get('until') or 'now'}`",
        f"- Total trades: `{meta['n_trades']}`",
        f"- Min trades per bucket: `{meta['min_trades']}`",
        "",
        "## Per-bucket stats",
        "",
        "| Bucket | Trades | WR % | Avg PnL % | Net PnL | New mult | Prior mult | Flagged |",
        "|---:|---:|---:|---:|---:|---:|---:|:---:|",
    ]
    for b in sorted(stats):
        s = stats[b]
        new = new_table.get(b, 1.0)
        prior = prior_table.get(b, 1.0)
        # FIX: both branches used to produce "" (the flag glyph was lost in
        # an encoding round-trip), so the Flagged column was always blank.
        flag = "⚠" if b in flagged else ""
        lines.append(
            f"| {b} | {s['n']} | {100*s['wr']:.2f} | {100*s['avg_pnl_pct']:.3f} | "
            f"{s['net_pnl']:.2f} | {new:.2f} | {prior:.2f} | {flag} |"
        )
    if flagged:
        lines += [
            "",
            "## ⚠ Flagged for manual review",
            "",
            "The following buckets moved more than the variance threshold since the",
            "prior pinned table. The new YAML was written but should be reviewed",
            "before promotion to staging/prod — see `dolphin.s6_recompute_log`.",
            "",
            "| Bucket | Prior net | New net | Δ% |",
            "|---:|---:|---:|---:|",
        ]
        for b in flagged:
            # FIX: removed a dead `prior_table.get(f"_net_{b}")` lookup — the
            # table is keyed by int bucket ids and holds multipliers, so that
            # string key could never hit. Prior net-PnL is not persisted in
            # the YAML, hence the em-dashes for "Prior net" and "Δ%".
            lines.append(f"| {b} | — | {stats[b]['net_pnl']:.2f} | — |")
    path.write_text("\n".join(lines) + "\n")
def main() -> int:
    """CLI entry point: recompute the S6 table, write YAML + report, and log
    variance-flagged buckets to ClickHouse. Returns 0 on success."""
    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)")
    ap.add_argument("--since", default="2026-01-01")
    ap.add_argument("--until", default=None)
    ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20)
    ap.add_argument("--variance-threshold", type=float, default=0.20,
                    help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)")
    ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL),
                    help="path to bucket_assignments.pkl")
    ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML))
    ap.add_argument("--out-report", default=None,
                    help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_<date>.md)")
    ap.add_argument("--source-branch", default="",
                    help="branch slug to stamp into the YAML frontmatter for traceability")
    args = ap.parse_args()
    # Pipeline: load asset->bucket map, pull trades, aggregate, derive mults,
    # then diff against the currently pinned table.
    assignments = load_bucket_assignments(Path(args.bucket_pkl))
    buckets_seen = sorted(set(assignments.values()))
    trades = pull_trades(args.strategy, args.since, args.until)
    stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades)
    new_table = stats_to_multipliers(stats)
    prior_table = load_prior_table(Path(args.out_yml))
    # Variance guard — NOTE(review): despite the flag's documented name, this
    # compares the *sizing multipliers* (prior YAML vs newly derived), not raw
    # net-PnL: prior net-PnL is never persisted in the YAML, so only the
    # multipliers can be diffed here.
    flagged: List[int] = []
    for b, s in stats.items():
        if not s.get("enough"):
            continue  # thin buckets are omitted from the table; never flag them
        prior_mult = prior_table.get(b)
        new_mult = new_table.get(b, 1.0)
        if prior_mult is None:
            continue  # bucket absent from the prior table → nothing to compare against
        denom = max(abs(prior_mult), 1e-6)  # avoid div-by-zero when prior mult is 0.0
        if abs(new_mult - prior_mult) / denom > args.variance_threshold:
            flagged.append(b)
    # NOTE(review): datetime.utcnow() is naive (and deprecated in 3.12+);
    # the "Z" suffix is appended manually to mark UTC.
    generated_at = datetime.utcnow().isoformat(timespec="seconds") + "Z"
    meta = {
        "generated_at": generated_at,
        "source_branch": args.source_branch,
        "strategy": args.strategy,
        "since": args.since,
        "until": args.until,
        "n_trades": len(trades),
        "min_trades": args.min_trades,
    }
    # The new YAML is ALWAYS written, even when buckets are flagged — see the
    # module docstring: flagged buckets gate promotion, not generation.
    write_table_yml(Path(args.out_yml), new_table, meta)
    out_report = Path(args.out_report) if args.out_report else REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md"
    write_report(out_report, stats, new_table, prior_table, meta, flagged)
    ensure_recompute_log()
    for b in flagged:
        # ts is sent as integer microseconds — presumably matching the
        # DateTime64(6) column scale; TODO confirm ClickHouse parses bare
        # integers for DateTime64 via JSONEachRow here.
        ch_insert("s6_recompute_log", {
            "ts": int(datetime.utcnow().timestamp() * 1_000_000),
            "bucket_id": int(b),
            "n_trades": int(stats[b]["n"]),
            "wr_pct": float(100.0 * stats[b]["wr"]),
            "net_pnl": float(stats[b]["net_pnl"]),
            "prior_net": None,  # prior net-PnL is not persisted anywhere to read back
            "delta_pct": None,
            "new_mult": float(new_table.get(b, 1.0)),
            "prior_mult": float(prior_table.get(b, 1.0)),
            "flagged": 1,
        })
    print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)")
    print(f"Wrote {out_report}")
    return 0
if __name__ == "__main__":
    # Script entry point: propagate main()'s return code as the process exit status.
    sys.exit(main())