342 lines
13 KiB
Python
342 lines
13 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Recompute S6 bucket sizing coefficients from fresh `dolphin.trade_events`.
|
||
|
|
|
||
|
|
Regenerates:
|
||
|
|
- prod/configs/green_s6_table.yml (consumed by the GREEN orchestrator single-site)
|
||
|
|
- prod/docs/S6_COEFFICIENTS_REPORT_<YYYY-MM-DD>.md (human-readable summary)
|
||
|
|
|
||
|
|
Variance guard
|
||
|
|
──────────────
|
||
|
|
If any bucket's net-PnL moves more than `--variance-threshold` (default 20%)
|
||
|
|
relative to the prior pinned table, we STILL write the new YAML (so replay/debug
|
||
|
|
cycles remain reproducible) but also write a row to `dolphin.s6_recompute_log`
|
||
|
|
so a human can review before promoting to staging/prod.
|
||
|
|
|
||
|
|
Usage
|
||
|
|
─────
|
||
|
|
python prod/scripts/recompute_s6_coefficients.py \\
|
||
|
|
--strategy blue --since 2026-01-01 --min-trades-per-bucket 20
|
||
|
|
|
||
|
|
Intended to be driven by `prod/s6_recompute_flow.py` (Prefect) on a 30-day cadence.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import argparse
|
||
|
|
import json
|
||
|
|
import pickle
|
||
|
|
import sys
|
||
|
|
import urllib.parse
|
||
|
|
import urllib.request
|
||
|
|
from datetime import datetime
|
||
|
|
from pathlib import Path
|
||
|
|
from typing import Any, Dict, List, Optional
|
||
|
|
|
||
|
|
# ClickHouse HTTP endpoint and auth headers used by every query/insert below.
# NOTE(review): credentials are hard-coded in source — consider moving them to
# environment variables or a secrets store before wider distribution.
CH_URL = "http://localhost:8123/"
CH_HEADERS = {
    "X-ClickHouse-User": "dolphin",
    "X-ClickHouse-Key": "dolphin_ch_2026",
}

# Default input/output locations (paths are relative to the repo root).
DEFAULT_BUCKET_PKL = Path("adaptive_exit/models/bucket_assignments.pkl")  # asset→bucket map (pickle)
DEFAULT_TABLE_YML = Path("prod/configs/green_s6_table.yml")  # pinned sizing table consumed by GREEN
REPORT_DIR = Path("prod/docs")  # destination for the human-readable recompute reports
||
|
|
|
||
|
|
def ch_query(sql: str) -> List[Dict[str, Any]]:
    """Run *sql* against ClickHouse over HTTP and return the rows as dicts.

    Appends ``FORMAT JSONEachRow`` so the server emits one JSON object per
    line; each non-blank line becomes one dict. Network/HTTP errors propagate
    to the caller (no retry here).
    """
    params = urllib.parse.urlencode({"query": sql + " FORMAT JSONEachRow"})
    request = urllib.request.Request(f"{CH_URL}?{params}", method="GET")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    with urllib.request.urlopen(request, timeout=60) as resp:
        payload = resp.read().decode("utf-8", errors="replace")
    # json.loads tolerates surrounding whitespace, so stripping is only used
    # to skip blank lines.
    return [json.loads(ln) for ln in payload.splitlines() if ln.strip()]
|
|
|
||
|
|
def ch_insert(table: str, row: Dict[str, Any], db: str = "dolphin") -> None:
    """Best-effort insert into `{db}.{table}` (JSONEachRow). Swallows errors."""
    payload = (json.dumps(row) + "\n").encode()
    params = urllib.parse.urlencode(
        {"database": db, "query": f"INSERT INTO {table} FORMAT JSONEachRow"}
    )
    request = urllib.request.Request(f"{CH_URL}?{params}", data=payload, method="POST")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    try:
        urllib.request.urlopen(request, timeout=10)
    except Exception as exc:
        # Deliberately best-effort: audit-log writes must never kill the run.
        print(f"[s6_recompute] CH insert warning: {exc}", file=sys.stderr)
|
||
|
|
def ensure_recompute_log() -> None:
    """Create `dolphin.s6_recompute_log` if missing (best-effort, idempotent)."""
    columns = ",".join([
        "ts DateTime64(6, 'UTC')",
        "ts_day Date MATERIALIZED toDate(ts)",
        "bucket_id UInt8",
        "n_trades UInt32",
        "wr_pct Float32",
        "net_pnl Float64",
        "prior_net Nullable(Float64)",
        "delta_pct Nullable(Float32)",
        "new_mult Float32",
        "prior_mult Nullable(Float32)",
        "flagged UInt8",
    ])
    ddl = (
        "CREATE TABLE IF NOT EXISTS dolphin.s6_recompute_log ("
        + columns
        + ") ENGINE = MergeTree()"
        " ORDER BY (ts_day, bucket_id, ts)"
        " TTL ts_day + INTERVAL 365 DAY"
    )
    request = urllib.request.Request(CH_URL, data=ddl.encode(), method="POST")
    for header, value in CH_HEADERS.items():
        request.add_header(header, value)
    try:
        urllib.request.urlopen(request, timeout=10)
    except Exception as exc:
        # Best-effort: a missing log table degrades auditability, not the run.
        print(f"[s6_recompute] could not ensure log table: {exc}", file=sys.stderr)
||
|
|
|
||
|
|
def load_bucket_assignments(path: Path) -> Dict[str, int]:
    """Load the asset→bucket-id mapping from the upstream pickle at *path*.

    The pickle must be a dict containing an ``assignments`` key whose value
    maps asset symbol → bucket id; keys/values are coerced to str/int.
    NOTE: pickle is unsafe on untrusted files — this path is project-local.

    Raises RuntimeError when the document shape is unexpected.
    """
    with path.open("rb") as fh:
        doc = pickle.load(fh)
    assignments = doc.get("assignments") if isinstance(doc, dict) else None
    if not isinstance(assignments, dict):
        raise RuntimeError(f"{path}: expected dict with 'assignments' key")
    return {str(asset): int(bucket) for asset, bucket in assignments.items()}
||
|
|
|
||
|
|
def pull_trades(strategy: str, since: str, until: Optional[str]) -> List[Dict[str, Any]]:
    """Fetch per-trade rows for *strategy* from `dolphin.trade_events`.

    Window is inclusive: ``since <= date`` and, when given, ``date <= until``.
    NOTE(review): the SQL is built by string interpolation — acceptable only
    because inputs come from operator-controlled CLI flags, not user input.
    """
    clauses = [f"strategy = '{strategy}'", f"date >= toDate('{since}')"]
    if until:
        clauses.append(f"date <= toDate('{until}')")
    sql = (
        "SELECT asset, pnl, pnl_pct, exit_reason "
        f"FROM dolphin.trade_events WHERE {' AND '.join(clauses)}"
    )
    return ch_query(sql)
||
|
|
|
||
|
|
def compute_bucket_stats(
    trades: List[Dict[str, Any]],
    assignments: Dict[str, int],
    buckets: List[int],
    min_trades: int,
) -> Dict[int, Dict[str, float]]:
    """Aggregate per-bucket trade outcomes.

    Args:
        trades: rows from `dolphin.trade_events` with at least
            ``asset``/``pnl``/``pnl_pct`` keys (None values treated as 0).
        assignments: asset symbol → bucket id.
        buckets: the bucket universe; every id gets an output entry even with
            zero trades.
        min_trades: minimum trade count for a bucket to be marked ``enough``.

    Returns:
        bucket id → ``{"n", "wr", "avg_pnl_pct", "net_pnl", "enough"}`` where
        ``wr`` is the win-rate fraction and ``enough`` says whether the bucket
        met *min_trades*.
    """
    stats: Dict[int, Dict[str, float]] = {
        b: {"n": 0, "wins": 0, "net_pnl": 0.0, "net_pnl_pct": 0.0} for b in buckets
    }
    for t in trades:
        asset = str(t.get("asset", ""))
        b = assignments.get(asset)
        if b is None or b not in stats:
            continue  # asset outside the bucket universe — ignore
        pnl_pct = float(t.get("pnl_pct") or 0.0)
        stats[b]["n"] += 1
        stats[b]["wins"] += 1 if pnl_pct > 0 else 0  # a "win" = positive pnl_pct
        stats[b]["net_pnl"] += float(t.get("pnl") or 0.0)
        stats[b]["net_pnl_pct"] += pnl_pct

    # (Fix: removed a dead `n = s["n"] if s["n"] >= min_trades else s["n"]`
    # assignment — both branches yielded the same value and it was never used.)
    out: Dict[int, Dict[str, float]] = {}
    for b, s in stats.items():
        if s["n"] == 0:
            out[b] = {"n": 0, "wr": 0.0, "avg_pnl_pct": 0.0, "net_pnl": 0.0, "enough": False}
        else:
            out[b] = {
                "n": s["n"],
                "wr": s["wins"] / s["n"],
                "avg_pnl_pct": s["net_pnl_pct"] / s["n"],
                "net_pnl": s["net_pnl"],
                "enough": s["n"] >= min_trades,
            }
    return out
||
|
|
|
||
|
|
def stats_to_multipliers(stats: Dict[int, Dict[str, float]]) -> Dict[int, float]:
    """Translate per-bucket aggregate outcomes into a size multiplier.

    Conservative mapping (keeps the file a documented, deterministic function —
    not an ML output). Brackets match the S6 reference row in
    prod/docs/CRITICAL_ASSET_PICKING_BRACKETS_VS._ROI_WR_AT_TRADES.md.

    avg_pnl_pct > 0.015  → 2.0 (lean in)
    avg_pnl_pct > 0.005  → 1.5
    avg_pnl_pct > 0.001  → 1.0 (omitted from the table — implicit no-op)
    avg_pnl_pct > 0.000  → 0.5
    avg_pnl_pct > -0.002 → 0.3
    otherwise            → 0.0 (ban candidate — see `asset_bucket_ban_set`)

    Buckets without enough trades are also omitted (implicit 1.0x).
    """
    table: Dict[int, float] = {}
    for bucket, agg in stats.items():
        if not agg.get("enough"):
            continue  # thin buckets default to the implicit 1.0x no-op
        avg = agg["avg_pnl_pct"]
        if avg > 0.015:
            table[bucket] = 2.0
        elif avg > 0.005:
            table[bucket] = 1.5
        elif avg > 0.001:
            continue  # 1.0x — deliberately absent from the output
        elif avg > 0.000:
            table[bucket] = 0.5
        elif avg > -0.002:
            table[bucket] = 0.3
        else:
            table[bucket] = 0.0
    return table
||
|
|
|
||
|
|
def load_prior_table(path: Path) -> Dict[int, float]:
    """Read the previously pinned bucket→multiplier table from *path*.

    Returns an empty dict when the file is missing or unreadable — the
    recompute proceeds without a prior table and the variance guard simply
    skips buckets that have no prior value.
    """
    if not path.exists():
        return {}
    try:
        import yaml  # local import: PyYAML may be absent in slim environments

        parsed = yaml.safe_load(path.read_text()) or {}
        prior = parsed.get("buckets") or {}
        return {int(bucket): float(mult) for bucket, mult in prior.items()}
    except Exception as exc:
        print(f"[s6_recompute] prior table load warning: {exc}", file=sys.stderr)
        return {}
||
|
|
|
||
|
|
def write_table_yml(path: Path, buckets: Dict[int, float], meta: Dict[str, Any]) -> None:
    """Render the bucket→multiplier table as YAML at *path*.

    Creates parent directories as needed; buckets are emitted in sorted order
    so the output is deterministic (diff-friendly for pinning/review).
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    header = [
        "# GREEN S6 bucket sizing table — auto-generated, do not edit by hand.",
        "# Consumed by nautilus_dolphin/nautilus_dolphin/nautilus/esf_alpha_orchestrator.py",
        "# at the single-site notional application.",
        "",
        "meta:",
        f" generated_at: \"{meta['generated_at']}\"",
        f" source_branch: \"{meta.get('source_branch', '')}\"",
        f" strategy: \"{meta['strategy']}\"",
        f" n_trades: {meta['n_trades']}",
        f" min_trades_per_bucket: {meta['min_trades']}",
        "",
        "buckets:",
    ]
    body = [f" {b}: {buckets[b]:.2f}" for b in sorted(buckets)]
    path.write_text("\n".join(header + body) + "\n")
||
|
|
|
||
|
|
def write_report(path: Path, stats: Dict[int, Dict[str, float]],
                 new_table: Dict[int, float], prior_table: Dict[int, float],
                 meta: Dict[str, Any], flagged: List[int]) -> None:
    """Write the human-readable markdown recompute report to *path*.

    Emits a per-bucket stats table, plus a "flagged for manual review" section
    when any bucket tripped the variance guard. Prior net-PnL is not persisted
    in the YAML table, so the flagged table's prior/Δ columns are placeholders.

    (Fix: removed a dead `prior_table.get(f"_net_{b}")` lookup — prior_table
    is keyed by int bucket id, so the string key always returned None, and the
    result was never used.)
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    lines = [
        f"# S6 Coefficient Recompute — {meta['generated_at']}",
        "",
        f"- Strategy: `{meta['strategy']}`",
        f"- Window: `{meta['since']}` → `{meta.get('until') or 'now'}`",
        f"- Total trades: `{meta['n_trades']}`",
        f"- Min trades per bucket: `{meta['min_trades']}`",
        "",
        "## Per-bucket stats",
        "",
        "| Bucket | Trades | WR % | Avg PnL % | Net PnL | New mult | Prior mult | Flagged |",
        "|---:|---:|---:|---:|---:|---:|---:|:---:|",
    ]
    for b in sorted(stats):
        s = stats[b]
        new = new_table.get(b, 1.0)      # absent bucket = implicit 1.0x no-op
        prior = prior_table.get(b, 1.0)
        flag = "⚠" if b in flagged else ""
        lines.append(
            f"| {b} | {s['n']} | {100*s['wr']:.2f} | {100*s['avg_pnl_pct']:.3f} | "
            f"{s['net_pnl']:.2f} | {new:.2f} | {prior:.2f} | {flag} |"
        )
    if flagged:
        lines += [
            "",
            "## ⚠ Flagged for manual review",
            "",
            "The following buckets moved more than the variance threshold since the",
            "prior pinned table. The new YAML was written but should be reviewed",
            "before promotion to staging/prod — see `dolphin.s6_recompute_log`.",
            "",
            "| Bucket | Prior net | New net | Δ% |",
            "|---:|---:|---:|---:|",
        ]
        for b in flagged:
            # Prior net / Δ% are not recoverable from the YAML — placeholders.
            lines.append(f"| {b} | — | {stats[b]['net_pnl']:.2f} | — |")
    path.write_text("\n".join(lines) + "\n")
||
|
|
|
||
|
|
def main() -> int:
    """CLI entry point: recompute coefficients, write YAML + report, log flags."""
    ap = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    ap.add_argument("--strategy", default="blue",
                    help="trade_events.strategy used for historical grounding (BLUE by default; richest dataset)")
    ap.add_argument("--since", default="2026-01-01")
    ap.add_argument("--until", default=None)
    ap.add_argument("--min-trades-per-bucket", dest="min_trades", type=int, default=20)
    ap.add_argument("--variance-threshold", type=float, default=0.20,
                    help="fraction move in bucket net-PnL that flags a bucket for review (default 0.20 = 20%%)")
    ap.add_argument("--bucket-pkl", default=str(DEFAULT_BUCKET_PKL),
                    help="path to bucket_assignments.pkl")
    ap.add_argument("--out-yml", default=str(DEFAULT_TABLE_YML))
    ap.add_argument("--out-report", default=None,
                    help="report path (default: prod/docs/S6_COEFFICIENTS_REPORT_<date>.md)")
    ap.add_argument("--source-branch", default="",
                    help="branch slug to stamp into the YAML frontmatter for traceability")
    args = ap.parse_args()

    # Load asset→bucket map, pull the trade window, aggregate, and map to
    # multipliers.
    assignments = load_bucket_assignments(Path(args.bucket_pkl))
    buckets_seen = sorted(set(assignments.values()))
    trades = pull_trades(args.strategy, args.since, args.until)
    stats = compute_bucket_stats(trades, assignments, buckets_seen, args.min_trades)
    new_table = stats_to_multipliers(stats)

    # Prior table comes from the same path we are about to overwrite.
    prior_table = load_prior_table(Path(args.out_yml))

    # Variance guard.
    # NOTE(review): the module docstring and --variance-threshold help text
    # describe this as a *net-PnL* movement check, but the code below compares
    # the prior vs new *multipliers* — confirm which is intended. Buckets with
    # no prior entry (implicit 1.0x) are never flagged.
    flagged: List[int] = []
    for b, s in stats.items():
        if not s.get("enough"):
            continue  # thin buckets never received a multiplier — skip
        prior_mult = prior_table.get(b)
        new_mult = new_table.get(b, 1.0)  # absent = implicit 1.0x no-op
        if prior_mult is None:
            continue
        denom = max(abs(prior_mult), 1e-6)  # guard divide-by-zero for 0.0 priors
        if abs(new_mult - prior_mult) / denom > args.variance_threshold:
            flagged.append(b)

    # TODO(review): datetime.utcnow() is deprecated (Python 3.12+) — prefer
    # datetime.now(timezone.utc); note isoformat() would then carry "+00:00",
    # so the manual trailing "Z" here would need adjusting.
    generated_at = datetime.utcnow().isoformat(timespec="seconds") + "Z"
    meta = {
        "generated_at": generated_at,
        "source_branch": args.source_branch,
        "strategy": args.strategy,
        "since": args.since,
        "until": args.until,
        "n_trades": len(trades),
        "min_trades": args.min_trades,
    }

    # Per the module docstring: ALWAYS write the new YAML, even when buckets
    # are flagged, so replay/debug cycles stay reproducible.
    write_table_yml(Path(args.out_yml), new_table, meta)

    out_report = Path(args.out_report) if args.out_report else REPORT_DIR / f"S6_COEFFICIENTS_REPORT_{generated_at[:10]}.md"
    write_report(out_report, stats, new_table, prior_table, meta, flagged)

    # Best-effort audit trail for flagged buckets (inserts swallow errors).
    ensure_recompute_log()
    for b in flagged:
        ch_insert("s6_recompute_log", {
            # epoch microseconds, matching the DateTime64(6) column precision
            "ts": int(datetime.utcnow().timestamp() * 1_000_000),
            "bucket_id": int(b),
            "n_trades": int(stats[b]["n"]),
            "wr_pct": float(100.0 * stats[b]["wr"]),
            "net_pnl": float(stats[b]["net_pnl"]),
            "prior_net": None,   # prior net-PnL is not persisted anywhere yet
            "delta_pct": None,
            "new_mult": float(new_table.get(b, 1.0)),
            "prior_mult": float(prior_table.get(b, 1.0)),
            "flagged": 1,
        })

    print(f"Wrote {args.out_yml} ({len(new_table)} active buckets, {len(flagged)} flagged)")
    print(f"Wrote {out_report}")
    return 0
||
|
|
|
||
|
|
if __name__ == "__main__":
    # Propagate main()'s return value as the process exit code.
    sys.exit(main())
|