#!/usr/bin/env python3 """Apply the dolphin_violet DDL set — ONE statement per HTTP POST. ClickHouse's HTTP interface rejects multi-statement posts (proven on 2026-06-12 when prod/clickhouse/pink/08_provenance.sql failed as one body), so this applier splits each .sql file into single statements and posts them individually. Idempotent by construction: every statement in the set is CREATE ... IF NOT EXISTS. Usage: python3 prod/clickhouse/violet/apply_violet_ddl.py # apply + verify python3 prod/clickhouse/violet/apply_violet_ddl.py --dry-run # print statements python3 prod/clickhouse/violet/apply_violet_ddl.py --verify # verify only Exit codes: 0 = applied + verified; 1 = a statement failed or verify found missing tables. """ from __future__ import annotations import argparse import os import sys import urllib.parse import urllib.request from pathlib import Path from typing import Iterator, List, Tuple SQL_DIR = Path(__file__).resolve().parent EXPECTED_TABLES = { "policy_events", "trade_reconstruction", "trade_exit_legs", "position_state", "anomaly_events", "account_events", "status_snapshots", "trade_events", "v7_decision_events", "adaptive_exit_shadow", "fee_settled_events", "sc_bucket_gauge_shadow", "sc_threshold_advisor_shadow", "violet_feed_divergence", } def _strip_comments(sql: str) -> str: lines = [] for line in sql.splitlines(): if line.strip().startswith("--"): continue lines.append(line) return "\n".join(lines) def iter_statements(sql_dir: Path = SQL_DIR) -> Iterator[Tuple[str, str]]: """Yield (filename, statement) pairs in sorted file order.""" for path in sorted(sql_dir.glob("*.sql")): body = _strip_comments(path.read_text()) for stmt in body.split(";"): stmt = stmt.strip() if stmt: yield (path.name, stmt) def apply_statement(stmt: str, *, url: str, user: str, password: str) -> None: req = urllib.request.Request(url + "/", data=stmt.encode(), method="POST") req.add_header("X-ClickHouse-User", user) req.add_header("X-ClickHouse-Key", password) with urllib.request.urlopen(req, timeout=30) as resp: resp.read() def verify(*, url: str, user: str, password: str) -> List[str]: """Return the list of expected tables missing from dolphin_violet.""" q = urllib.parse.quote_plus("SHOW TABLES FROM dolphin_violet") req = urllib.request.Request(f"{url}/?query={q}", method="POST") req.add_header("X-ClickHouse-User", user) req.add_header("X-ClickHouse-Key", password) try: with urllib.request.urlopen(req, timeout=15) as resp: present = set(resp.read().decode().split()) except Exception as exc: # noqa: BLE001 — db may not exist yet print(f"verify: SHOW TABLES failed ({exc})") return sorted(EXPECTED_TABLES) return sorted(EXPECTED_TABLES - present) def main(argv: List[str] | None = None) -> int: ap = argparse.ArgumentParser(description="Apply dolphin_violet DDLs") ap.add_argument("--ch-url", default=os.environ.get("CH_URL", "http://localhost:8123")) ap.add_argument("--user", default=os.environ.get("CH_USER", "dolphin")) ap.add_argument("--password", default=os.environ.get("CH_PASS", "dolphin_ch_2026")) ap.add_argument("--dry-run", action="store_true") ap.add_argument("--verify", action="store_true", help="verify only, no apply") args = ap.parse_args(argv) if not args.verify: for fname, stmt in iter_statements(): head = stmt.splitlines()[0][:90] if args.dry_run: print(f"[dry-run] {fname}: {head}") continue try: apply_statement(stmt, url=args.ch_url, user=args.user, password=args.password) print(f"OK {fname}: {head}") except Exception as exc: # noqa: BLE001 print(f"FAIL {fname}: {head}\n {exc}") return 1 if args.dry_run: return 0 missing = verify(url=args.ch_url, user=args.user, password=args.password) if missing: print(f"verify: MISSING tables in dolphin_violet: {missing}") return 1 print(f"verify: all {len(EXPECTED_TABLES)} expected tables present") return 0 if __name__ == "__main__": raise SystemExit(main())