""" ch_writer.py — Dolphin ClickHouse writer with durable local spool. All inserts are fire-and-forget from the caller's perspective. Rows are first written to a local SQLite spool on durable storage and then replayed to ClickHouse in batches by a background thread. This makes ClickHouse the historical sink instead of a best-effort cache: - live services do not block on CH availability - rows survive process restarts - CH outages no longer drop history silently Usage: from ch_writer import ch_put ch_put("eigen_scans", {"ts": int(time.time() * 1e6), "scan_number": n, ...}) Environment overrides: CH_URL — default: http://localhost:8123 CH_USER — default: dolphin CH_PASS — default: dolphin_ch_2026 CH_DB — default: dolphin CH_SPOOL_DIR — default: /mnt/dolphin_training/ch_spool CH_BATCH_SIZE — default: 250 CH_FLUSH_INTERVAL_S — default: 1.0 CH_MAINTENANCE_INTERVAL_S — default: 300 CH_WAL_TRUNCATE_BYTES — default: 67108864 (64 MiB) CH_VACUUM_MIN_BYTES — default: 536870912 (512 MiB) CH_VACUUM_MIN_FREE_RATIO — default: 1.25 CH_VACUUM_MIN_FREE_BYTES — default: 134217728 (128 MiB) """ from __future__ import annotations import json import logging import os import random import shutil import sqlite3 import struct import threading import time import urllib.parse import urllib.request from collections import defaultdict from pathlib import Path from typing import Any, Dict, Iterable, List, Tuple log = logging.getLogger("ch_writer") CH_URL = os.environ.get("CH_URL", "http://localhost:8123") CH_USER = os.environ.get("CH_USER", "dolphin") CH_PASS = os.environ.get("CH_PASS", "dolphin_ch_2026") CH_DB = os.environ.get("CH_DB", "dolphin") CH_SPOOL_DIR = Path(os.environ.get("CH_SPOOL_DIR", "/mnt/dolphin_training/ch_spool")) CH_BATCH_SIZE = int(os.environ.get("CH_BATCH_SIZE", "250")) CH_FLUSH_INTERVAL_S = float(os.environ.get("CH_FLUSH_INTERVAL_S", "1.0")) CH_MAINTENANCE_INTERVAL_S = float(os.environ.get("CH_MAINTENANCE_INTERVAL_S", "300")) CH_WAL_TRUNCATE_BYTES = int(os.environ.get("CH_WAL_TRUNCATE_BYTES", str(64 * 1024 * 1024))) CH_VACUUM_MIN_BYTES = int(os.environ.get("CH_VACUUM_MIN_BYTES", str(512 * 1024 * 1024))) CH_VACUUM_MIN_FREE_RATIO = float(os.environ.get("CH_VACUUM_MIN_FREE_RATIO", "1.25")) CH_VACUUM_MIN_FREE_BYTES = int(os.environ.get("CH_VACUUM_MIN_FREE_BYTES", str(128 * 1024 * 1024))) # ─── Timestamp helpers ──────────────────────────────────────────────────────── def ts_us() -> int: """Current UTC time as microseconds — for DateTime64(6) fields.""" return int(time.time() * 1_000_000) def ts_ms() -> int: """Current UTC time as milliseconds — for DateTime64(3) fields.""" return int(time.time() * 1_000) # ─── UUIDv7 — time-ordered distributed trace ID ─────────────────────────────── def uuid7() -> str: """ Generate a UUIDv7 — RFC 9562 time-ordered UUID. """ ts_ms_val = int(time.time() * 1_000) rand_a = random.getrandbits(12) rand_b = random.getrandbits(62) hi = (ts_ms_val << 16) | 0x7000 | rand_a lo = (0b10 << 62) | rand_b b = struct.pack(">QQ", hi, lo) return ( f"{b[0:4].hex()}-{b[4:6].hex()}-" f"{b[6:8].hex()}-{b[8:10].hex()}-{b[10:16].hex()}" ) def _ensure_dir(path: Path) -> Path: path.mkdir(parents=True, exist_ok=True) return path def _resolve_spool_dir(db: str) -> Path: """ Choose a durable spool directory. Prefer the configured CH spool root; if it is unavailable, fall back to /tmp. """ roots = [CH_SPOOL_DIR, Path("/tmp") / "dolphin_ch_spool"] for root in roots: try: path = _ensure_dir(root / db) test = path / ".write_test" test.write_text("ok") test.unlink(missing_ok=True) return path except Exception: continue # Last resort: current working directory. This should be rare. return _ensure_dir(Path.cwd() / "ch_spool" / db) class _CHWriter: """ Durable ClickHouse writer. Ingress: - put() synchronously appends a row to a local SQLite spool. - the caller never blocks on network IO. Egress: - a background thread drains unsent rows to ClickHouse. - rows are deleted from the spool only after a successful CH insert. """ def __init__( self, flush_interval_s: float = CH_FLUSH_INTERVAL_S, batch_size: int = CH_BATCH_SIZE, db: str = CH_DB, start_thread: bool = True, maintenance_interval_s: float = CH_MAINTENANCE_INTERVAL_S, wal_truncate_bytes: int = CH_WAL_TRUNCATE_BYTES, vacuum_min_bytes: int = CH_VACUUM_MIN_BYTES, vacuum_min_free_ratio: float = CH_VACUUM_MIN_FREE_RATIO, vacuum_min_free_bytes: int = CH_VACUUM_MIN_FREE_BYTES, ) -> None: self._interval = max(0.1, float(flush_interval_s)) self._batch_size = max(1, int(batch_size)) self._db = db self._spool_dir = _resolve_spool_dir(db) self._db_path = self._spool_dir / "writer.sqlite3" self._lock = threading.RLock() self._wake = threading.Event() self._stop = False self._dropped = 0 self._failed_local_writes = 0 self._flush_errors = 0 self._maintenance_interval_s = max(0.0, float(maintenance_interval_s)) self._wal_truncate_bytes = max(0, int(wal_truncate_bytes)) self._vacuum_min_bytes = max(0, int(vacuum_min_bytes)) self._vacuum_min_free_ratio = max(0.0, float(vacuum_min_free_ratio)) self._vacuum_min_free_bytes = max(0, int(vacuum_min_free_bytes)) self._last_maintenance_ts = 0.0 self._conn = self._open_db() self._t = threading.Thread( target=self._run, daemon=True, name=f"ch-writer-{db}" ) if start_thread: self._t.start() log.info( "ch_writer[%s]: durable spool at %s (batch=%d interval=%.1fs)", db, self._db_path, self._batch_size, self._interval, ) def _open_db(self) -> sqlite3.Connection: conn = sqlite3.connect( self._db_path, timeout=0.05, # 50ms — fail fast so put() never stalls the main scan loop isolation_level=None, check_same_thread=False, ) conn.execute("PRAGMA journal_mode=WAL;") conn.execute("PRAGMA synchronous=NORMAL;") conn.execute("PRAGMA temp_store=MEMORY;") conn.execute( """ CREATE TABLE IF NOT EXISTS queue ( id INTEGER PRIMARY KEY AUTOINCREMENT, table_name TEXT NOT NULL, payload TEXT NOT NULL, created_ts_us INTEGER NOT NULL, attempts INTEGER NOT NULL DEFAULT 0 ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_queue_id ON queue(id)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_queue_table ON queue(table_name, id)" ) return conn _PUT_LOCK_TIMEOUT_S: float = 0.1 # max wait before dropping the row def put(self, table: str, row: dict) -> None: """Persist a row locally and wake the replay loop.""" try: payload = json.dumps(row, separators=(",", ":"), ensure_ascii=False) if not self._lock.acquire(timeout=self._PUT_LOCK_TIMEOUT_S): self._dropped += 1 log.warning( "ch_writer[%s]: spool lock busy, dropping telemetry row " "(total_dropped=%d table=%s) — background thread may be in VACUUM/WAL", self._db, self._dropped, table, ) return try: self._conn.execute( "INSERT INTO queue(table_name, payload, created_ts_us) VALUES (?, ?, ?)", (table, payload, ts_us()), ) finally: self._lock.release() self._wake.set() except Exception as exc: self._failed_local_writes += 1 log.exception( "ch_writer[%s]: local spool write failed (%d): %s", self._db, self._failed_local_writes, exc, ) def _fetch_batch(self) -> List[Tuple[int, str, dict]]: with self._lock: cur = self._conn.execute( "SELECT id, table_name, payload FROM queue ORDER BY id LIMIT ?", (self._batch_size,), ) rows = cur.fetchall() batch: List[Tuple[int, str, dict]] = [] for row_id, table_name, payload in rows: try: batch.append((int(row_id), str(table_name), json.loads(payload))) except Exception as exc: log.exception( "ch_writer[%s]: invalid spool row id=%s skipped: %s", self._db, row_id, exc, ) with self._lock: self._conn.execute("DELETE FROM queue WHERE id=?", (row_id,)) return batch def _delete_ids(self, ids: Iterable[int]) -> None: ids = list(ids) if not ids: return with self._lock: self._conn.executemany( "DELETE FROM queue WHERE id=?", [(int(_id),) for _id in ids], ) def _bump_attempts(self, ids: Iterable[int]) -> None: ids = list(ids) if not ids: return with self._lock: cur = self._conn.execute( "SELECT id, attempts FROM queue WHERE id IN (%s)" % ",".join("?" for _ in ids), [int(_id) for _ in ids], ) high_attempts = [(row[0], int(row[1]) + 1) for row in cur.fetchall() if int(row[1]) >= 1000] self._conn.executemany( "UPDATE queue SET attempts = attempts + 1 WHERE id=?", [(int(_id),) for _id in ids], ) for row_id, attempt in high_attempts: if attempt % 100 == 0: log.warning( "CH flush: row id=%s stuck after %d attempts — " "potential head-of-line blocking (table may be missing or schema changed)", row_id, attempt, ) def _queue_count(self) -> int: with self._lock: row = self._conn.execute("SELECT count(*) FROM queue").fetchone() return int(row[0]) if row else 0 def _file_size(self, path: Path) -> int: try: return path.stat().st_size except FileNotFoundError: return 0 except Exception as exc: log.debug("ch_writer[%s]: size probe failed for %s: %s", self._db, path, exc) return 0 def _checkpoint_wal(self) -> bool: # Uses a dedicated connection so WAL checkpoint does not hold self._lock. try: ckpt_conn = sqlite3.connect( self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False ) try: ckpt_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall() finally: ckpt_conn.close() return True except Exception as exc: log.warning("ch_writer[%s]: WAL checkpoint/truncate failed: %s", self._db, exc) return False def _has_vacuum_free_space(self, db_size: int) -> bool: try: usage = shutil.disk_usage(self._spool_dir) except Exception as exc: log.warning("ch_writer[%s]: vacuum free-space probe failed: %s", self._db, exc) return False required = int(db_size * self._vacuum_min_free_ratio) + self._vacuum_min_free_bytes if usage.free < required: log.warning( "ch_writer[%s]: vacuum skipped; free=%d required=%d db_size=%d", self._db, usage.free, required, db_size, ) return False return True def _vacuum_spool(self) -> bool: db_size = self._file_size(self._db_path) if db_size < self._vacuum_min_bytes: return False if self._queue_count() != 0: return False if not self._has_vacuum_free_space(db_size): return False try: before = db_size + self._file_size(Path(str(self._db_path) + "-wal")) # Open a separate connection so VACUUM does not hold self._lock. # VACUUM in WAL mode is safe to run from a second connection while # the main connection is idle (queue_count==0 already confirmed above). vacuum_conn = sqlite3.connect( self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False ) try: vacuum_conn.execute("VACUUM;") vacuum_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall() finally: vacuum_conn.close() after = self._file_size(self._db_path) + self._file_size(Path(str(self._db_path) + "-wal")) log.info( "ch_writer[%s]: vacuumed empty spool bytes=%d->%d", self._db, before, after, ) return True except Exception as exc: log.warning("ch_writer[%s]: vacuum failed: %s", self._db, exc) return False def _maybe_maintain_spool(self, *, force: bool = False) -> dict: now = time.time() status = {"ran": False, "checkpointed": False, "vacuumed": False} if not force and self._maintenance_interval_s > 0: if now - self._last_maintenance_ts < self._maintenance_interval_s: return status self._last_maintenance_ts = now try: if self._queue_count() != 0: return status status["ran"] = True wal_path = Path(str(self._db_path) + "-wal") if self._file_size(wal_path) >= self._wal_truncate_bytes: status["checkpointed"] = self._checkpoint_wal() status["vacuumed"] = self._vacuum_spool() except Exception as exc: log.warning("ch_writer[%s]: spool maintenance failed: %s", self._db, exc) return status def _post_rows(self, table: str, rows: List[dict]) -> bool: body = "\n".join(json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in rows).encode() wait_for_async_insert = "1" # Phase 2: ALL accounting tables must be durable (loss prevention) url = ( f"{CH_URL}/?database={self._db}" f"&query=INSERT+INTO+{table}+FORMAT+JSONEachRow" f"&async_insert=1&wait_for_async_insert={wait_for_async_insert}" # best_effort: some writers emit timezone-aware isoformat ts # ("…+00:00"); strict parsing rejects those rows forever and # head-of-line-blocks the spool (2026-06-11 disk-fill incident). f"&date_time_input_format=best_effort" ) req = urllib.request.Request(url, data=body, method="POST") req.add_header("X-ClickHouse-User", CH_USER) req.add_header("X-ClickHouse-Key", CH_PASS) req.add_header("Content-Type", "application/octet-stream") try: with urllib.request.urlopen(req, timeout=5) as resp: return resp.status in (200, 201) except Exception as exc: # WARNING, not DEBUG: a permanently failing table head-of-line # blocks the whole spool; at DEBUG that ran silent for 17 days # (2026-05-25 → 06-11) while the WAL grew to 11.7 GB. self._flush_errors += 1 if self._flush_errors <= 10 or self._flush_errors % 100 == 0: log.warning( "CH flush error [%s] (count=%d): %s", table, self._flush_errors, exc ) else: log.debug("CH flush error [%s]: %s", table, exc) return False def _query_lines(self, sql: str, timeout_s: float = 5.0) -> List[str]: encoded = urllib.parse.quote_plus(sql) url = f"{CH_URL}/?database={self._db}&query={encoded}" req = urllib.request.Request(url, method="POST") req.add_header("X-ClickHouse-User", CH_USER) req.add_header("X-ClickHouse-Key", CH_PASS) req.add_header("Content-Type", "text/plain; charset=utf-8") with urllib.request.urlopen(req, timeout=timeout_s) as resp: raw = resp.read().decode("utf-8", errors="replace") return [line for line in raw.splitlines() if line] def _existing_trade_keys(self, rows: List[dict]) -> set[Tuple[str, int]]: trade_ids: List[str] = [] for row in rows: trade_id = row.get("trade_id") if trade_id is None: continue tid = str(trade_id).strip() if tid: trade_ids.append(tid) if not trade_ids: return set() unique = sorted(set(trade_ids)) existing: set[Tuple[str, int]] = set() chunk_size = 200 for i in range(0, len(unique), chunk_size): chunk = unique[i : i + chunk_size] quoted = ",".join("'" + tid.replace("'", "''") + "'" for tid in chunk) sql = ( "SELECT trade_id, toInt64(toUnixTimestamp64Micro(ts)) " f"FROM trade_events WHERE trade_id IN ({quoted}) FORMAT TSV" ) try: lines = self._query_lines(sql) except Exception as exc: log.debug("CH dedupe probe failed [trade_events]: %s", exc) # Fail open to keep trading sink non-blocking. return set() for line in lines: parts = line.split("\t", 1) if len(parts) != 2: continue tid, ts_us_s = parts try: existing.add((tid, int(ts_us_s))) except Exception: continue return existing def flush_once(self) -> int: """ Drain a single batch from the local spool. Returns the number of rows successfully delivered to ClickHouse. """ batch = self._fetch_batch() if not batch: self._maybe_maintain_spool() return 0 grouped: Dict[str, List[Tuple[int, dict]]] = defaultdict(list) for row_id, table_name, payload in batch: grouped[table_name].append((row_id, payload)) delivered = 0 for table, items in grouped.items(): ids = [row_id for row_id, _ in items] rows = [payload for _, payload in items] if table == "trade_events": existing = self._existing_trade_keys(rows) if existing: kept_ids: List[int] = [] kept_rows: List[dict] = [] duplicate_ids: List[int] = [] for row_id, payload in items: tid = str(payload.get("trade_id", "")).strip() try: ts_us_val = int(payload.get("ts")) except Exception: ts_us_val = -1 if tid and ts_us_val >= 0 and (tid, ts_us_val) in existing: duplicate_ids.append(row_id) else: kept_ids.append(row_id) kept_rows.append(payload) if duplicate_ids: self._delete_ids(duplicate_ids) log.warning( "ch_writer[%s]: dropped %d duplicate trade_events rows by trade_id", self._db, len(duplicate_ids), ) ids = kept_ids rows = kept_rows if not rows: continue ok = self._post_rows(table, rows) if ok: delivered += len(rows) self._delete_ids(ids) else: self._bump_attempts(ids) self._maybe_maintain_spool() return delivered def _run(self) -> None: while not self._stop: delivered = 0 try: delivered = self.flush_once() except Exception as exc: log.debug("ch_writer[%s]: flush loop error: %s", self._db, exc) if delivered <= 0: self._wake.wait(timeout=self._interval) self._wake.clear() # ─── Module-level singletons ───────────────────────────────────────────────── _writer = _CHWriter(db="dolphin") _writer_green = _CHWriter(db="dolphin_green") _writer_prodgreen = _CHWriter(db="dolphin_prodgreen") _writer_pink = _CHWriter(db="dolphin_pink") _writer_violet = _CHWriter(db="dolphin_violet") def ch_put(table: str, row: dict) -> None: """ Fire-and-forget insert into dolphin. (BLUE environment). """ _writer.put(table, row) def ch_put_green(table: str, row: dict) -> None: """ Fire-and-forget insert into dolphin_green.
(GREEN / NT TradingNode environment). """ _writer_green.put(table, row) def ch_put_prodgreen(table: str, row: dict) -> None: """ Fire-and-forget insert into dolphin_prodgreen.
(PRODGREEN / Nautilus live). """ _writer_prodgreen.put(table, row) def ch_put_pink(table: str, row: dict) -> None: """ Fire-and-forget insert into dolphin_pink.
(PINK / testnet execution). """ _writer_pink.put(table, row) def ch_put_violet(table: str, row: dict) -> None: """ Fire-and-forget insert into dolphin_violet.
(VIOLET / observe-only rebuild runtime). Namespace isolation is a stage gate: VIOLET must never write to dolphin or dolphin_pink. """ _writer_violet.put(table, row) # ─── V7 decision journal (PINK §10 data volume / §37 routing) ──────────────── PINK_V7_JOURNAL_DB = "dolphin_pink" V7_DECISION_TABLE = "v7_decision_events" _writer_pink_v7 = _CHWriter(db=PINK_V7_JOURNAL_DB) def ch_put_pink_v7(table: str, row: dict) -> None: """Fire-and-forget V7 decision event insert into dolphin_pink.""" _writer_pink_v7.put(table, row) # ─── Data volume budgets (§10.3) ───────────────────────────────────────────── PINK_CH_BUDGET_BYTES_DAY = 50 * 1024 * 1024 # 50 MB/day max PINK_HZ_BUDGET_BYTES_DAY = 500 * 1024 * 1024 # 500 MB/day max