#!/usr/bin/env python3
"""Leverage → WR / net-PnL analysis over CH `dolphin.trade_events`.

Purpose
───────
The target exchanges (Binance futures) require integer leverage. The GREEN
orchestrator single-site (`esf_alpha_orchestrator.py`) therefore rounds
leverage to an integer at entry time; the current default is FIXED 1x with two
candidate rounding rules commented in source, pending this analysis:

  Option 1 (round-half-up, conservative):  int(math.floor(raw + 0.5)), min=1
  Option 2 (banker's round, aggressive):   int(round(raw)),            min=1
  Stay-at-1x (current default):            leverage_int = 1

This script walks historical trades and, for every distinct leverage value
observed (bucketed to a user-chosen step), reports per-bin WR, average PnL %,
net PnL and sample count. The output report informs the rule choice.

Decision rubric (documented in the generated report)
────────────────────────────────────────────────────
- Higher-lev bins show clearly better net-PnL w/ tight sample error → Option 2
- Flat / noisy signal across bins                                   → Option 1
- Higher-lev bins show WORSE net-PnL / mixed                        → stay 1x

This script is read-only; it does not auto-apply a rule change.

Usage
─────
    python prod/scripts/analyze_leverage_winrate.py \\
        --strategy blue \\
        --since 2026-01-01 \\
        --step 0.5 \\
        --out prod/docs/LEVERAGE_WINRATE_REPORT_2026-04-21.md
"""

from __future__ import annotations

import argparse
import json
import math
import sys
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

CH_URL = "http://localhost:8123/"
# NOTE(review): credentials are hard-coded in source; consider moving them to
# the environment or a secrets store.
CH_HEADERS = {
    "X-ClickHouse-User": "dolphin",
    "X-ClickHouse-Key": "dolphin_ch_2026",
}

# Tolerance added to `lev / step` before flooring, so that a leverage sitting
# exactly on a bin edge is not pushed into the bin below by float error
# (e.g. 0.3 / 0.1 == 2.9999999999999996).
_BIN_EPS = 1e-9


def _escape_ch_param(value: str) -> str:
    """Escape a query-parameter value for ClickHouse's "escaped" text format."""
    return (
        value.replace("\\", "\\\\")
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def ch_query(sql: str, params: Optional[Dict[str, str]] = None) -> List[Dict[str, Any]]:
    """Run a CH query returning JSONEachRow; raise if the HTTP call fails.

    Args:
        sql: Query text WITHOUT a FORMAT clause (``FORMAT JSONEachRow`` is
            appended here). May contain ``{name:Type}`` placeholders.
        params: Optional values for the placeholders in ``sql``. Each entry is
            sent as a ``param_<name>`` URL argument and substituted
            server-side, so values are never spliced into the SQL text.

    Returns:
        One dict per result row, in server order.

    Raises:
        urllib.error.URLError: connection failure or non-2xx HTTP status
            (``HTTPError`` is a subclass).
        json.JSONDecodeError: a response line is not valid JSON.
    """
    query_args: Dict[str, str] = {"query": sql + " FORMAT JSONEachRow"}
    for name, value in (params or {}).items():
        query_args[f"param_{name}"] = _escape_ch_param(str(value))
    qs = urllib.parse.urlencode(query_args)
    req = urllib.request.Request(f"{CH_URL}?{qs}", method="GET")
    for k, v in CH_HEADERS.items():
        req.add_header(k, v)
    with urllib.request.urlopen(req, timeout=60) as resp:
        body = resp.read().decode("utf-8", errors="replace")
    # Split on "\n" only: str.splitlines() also breaks on U+0085, U+2028, FF,
    # etc., which may appear inside a JSON string and would cut a record in two.
    return [json.loads(line) for line in body.split("\n") if line.strip()]


def bin_leverage(lev: float, step: float) -> float:
    """Bucket leverage to `step`-wide bins, anchored at 0. Returns bin lower edge.

    Missing, non-finite or non-positive leverage goes to the 0.0 bin.

    Raises:
        ValueError: if ``step`` is not a finite positive number.
    """
    if step is None or not math.isfinite(step) or step <= 0:
        raise ValueError(f"step must be a finite positive number, got {step!r}")
    if lev is None or not math.isfinite(lev) or lev <= 0:
        return 0.0
    return math.floor(lev / step + _BIN_EPS) * step


def load_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
    """Pull trade_events rows.

    Uses leverage_raw if present (post-sprint rows), else falls back to
    leverage (pre-sprint rows on the same trade semantics).

    Args:
        strategy: value matched against the ``strategy`` column.
        since: inclusive start date, passed to ``toDate``.
        until: inclusive end date, or None/empty for no upper bound.

    Returns:
        Rows with keys d, asset, lev_raw, lev_effective, pnl, pnl_pct, is_win,
        exit_reason.
    """
    # Filter values travel as server-side query parameters instead of being
    # interpolated into the SQL text, so quotes in CLI arguments can neither
    # break nor alter the query.
    where = ["strategy = {strategy:String}", "date >= toDate({since:String})"]
    params: Dict[str, str] = {"strategy": strategy, "since": since}
    if until:
        where.append("date <= toDate({until:String})")
        params["until"] = until
    where_sql = " AND ".join(where)
    sql = (
        "SELECT "
        "  toDate(date) AS d, "
        "  asset, "
        "  coalesce(leverage_raw, leverage) AS lev_raw, "
        "  leverage AS lev_effective, "
        "  pnl, pnl_pct, "
        "  (pnl_pct > 0) AS is_win, "
        "  exit_reason "
        "FROM dolphin.trade_events WHERE " + where_sql
    )
    return ch_query(sql, params)


def aggregate(trades: List[Dict[str, Any]], step: float) -> List[Dict[str, Any]]:
    """Group trades into leverage bins and compute per-bin statistics.

    Args:
        trades: rows shaped like the output of `load_trades`; only the keys
            lev_raw, is_win, pnl and pnl_pct are read. Missing or null values
            count as 0 (a null ``is_win`` is a non-win).
        step: leverage bin width (must be > 0).

    Returns:
        One dict per non-empty bin, sorted by bin lower edge, with keys
        bin (label), n, wr_pct, avg_pnl_pct and net_pnl.
    """
    bins: Dict[float, Dict[str, Any]] = {}
    for t in trades:
        lev = float(t.get("lev_raw") or 0.0)
        b = bin_leverage(lev, step)
        s = bins.setdefault(b, {
            "bin_lo": b,
            "bin_hi": b + step,
            "n": 0,
            "wins": 0,
            "net_pnl": 0.0,
            "net_pnl_pct": 0.0,
        })
        s["n"] += 1
        # `or 0` tolerates a JSON null (e.g. NULL pnl_pct makes is_win NULL),
        # which int() would otherwise reject with TypeError.
        if int(t.get("is_win") or 0):
            s["wins"] += 1
        s["net_pnl"] += float(t.get("pnl") or 0.0)
        s["net_pnl_pct"] += float(t.get("pnl_pct") or 0.0)

    out = []
    for _, s in sorted(bins.items()):
        n = max(1, s["n"])
        out.append({
            "bin": f"{s['bin_lo']:.2f}–{s['bin_hi']:.2f}x",
            "n": s["n"],
            "wr_pct": round(100.0 * s["wins"] / n, 2),
            # NOTE(review): the x100 assumes pnl_pct is stored as a fraction
            # (0.01 == 1%) — confirm against the trade_events schema.
            "avg_pnl_pct": round(s["net_pnl_pct"] / n * 100.0, 3),
            "net_pnl": round(s["net_pnl"], 2),
        })
    return out


def render_markdown(rows: List[Dict[str, Any]], strategy: str, since: str,
                    until: Optional[str], step: float, total: int) -> str:
    """Render the per-bin aggregates as a markdown report.

    Args:
        rows: output of `aggregate`.
        strategy, since, until, step: the run parameters, echoed in the header
            (``until`` is shown as "now" when empty).
        total: number of trades in scope.

    Returns:
        The complete markdown document as a string.
    """
    # Timezone-aware replacement for the deprecated datetime.utcnow(); the
    # rendered text keeps the same `YYYY-MM-DDTHH:MM:SSZ` shape.
    generated = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")
    hdr = (
        "# Leverage → Winrate / Net-PnL Analysis\n\n"
        f"- Strategy: `{strategy}`\n"
        f"- Window: `{since}` → `{until or 'now'}`\n"
        f"- Leverage bin step: `{step}x`\n"
        f"- Total trades in scope: `{total}`\n"
        f"- Generated: `{generated}Z`\n\n"
        "## Per-bin aggregates\n\n"
        "| Leverage bin | Trades | WR % | Avg PnL % | Net PnL |\n"
        "|---|---:|---:|---:|---:|\n"
    )
    body = "\n".join(
        f"| {r['bin']} | {r['n']} | {r['wr_pct']} | {r['avg_pnl_pct']} | {r['net_pnl']} |"
        for r in rows
    )
    decision = (
        "\n\n## Decision rubric (copy into the orchestrator comment block)\n\n"
        "- Higher-lev bins show clearly better net-PnL w/ tight sample error → **Option 2** (banker's round, min=1)\n"
        "- Flat / noisy signal across bins → **Option 1** (round-half-up, min=1)\n"
        "- Higher-lev bins show WORSE net-PnL / mixed → **stay at 1x** (current default)\n\n"
        "_Flipping the rule is a follow-up PR — this script is read-only._\n"
    )
    return hdr + body + decision


def main() -> int:
    """CLI entry point: query trades, aggregate by leverage bin, write the report.

    Returns:
        0 on success, 1 when no trades matched the filters. Invalid arguments
        exit with status 2 via argparse.
    """
    ap = argparse.ArgumentParser(description=__doc__,
                                 formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy value to filter on (default: blue — richest historical set)")
    ap.add_argument("--since", default="2026-01-01",
                    help="inclusive start date (YYYY-MM-DD)")
    ap.add_argument("--until", default=None,
                    help="inclusive end date (YYYY-MM-DD); default = now")
    ap.add_argument("--step", type=float, default=1.0,
                    help="leverage bin width (e.g. 0.5 or 1.0)")
    ap.add_argument("--out", default="prod/docs/LEVERAGE_WINRATE_REPORT.md",
                    help="output report path (markdown)")
    args = ap.parse_args()

    # Reject a bad bin width before touching the database; a zero step would
    # otherwise surface as ZeroDivisionError deep inside the aggregation.
    if not math.isfinite(args.step) or args.step <= 0:
        ap.error("--step must be a finite positive number")

    trades = load_trades(args.strategy, args.since, args.until)
    if not trades:
        print(f"No trades matched (strategy={args.strategy}, since={args.since}).",
              file=sys.stderr)
        return 1

    rows = aggregate(trades, args.step)
    md = render_markdown(rows, args.strategy, args.since, args.until, args.step,
                         total=len(trades))

    out_path = Path(args.out)
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # The report contains non-ASCII characters ("→", "–"); pin the encoding so
    # the write does not depend on the platform's locale default.
    out_path.write_text(md, encoding="utf-8")
    print(f"Wrote {out_path} ({len(rows)} bins, {len(trades)} trades)")
    return 0


if __name__ == "__main__":
    sys.exit(main())