Files
siloqy/prod/ch_writer.py
Codex d280407327 PINK Phases 1-4: E-anchored capital, atomic snapshot, sizer feedback, kernel hardening
Phase 1: account.py anchor_to_exchange, capital_source provenance, settle
includes fees in capital delta.
Phase 2: atomic snapshot swap, CH provenance DDL (08_provenance.sql),
naive-UTC timestamps, ch_writer wait_for_async_insert=1 for all tables,
head-of-line stuck-row logging at WARNING per 100 attempts.
Phase 3: sizer feedback uses slot realized_pnl (not capital delta),
FILL_SETTLED repairs slot-level PnL for price-less exit legs.
Phase 4: resolve_slot returns Option<usize>, UNRESOLVED_SLOT diagnostic.
bars_held clamped to max(0, ...) at row-build time.
2026-06-11 21:44:24 +02:00

594 lines
22 KiB
Python

"""
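ch_writer.py — Dolphin ClickHouse writer with durable local spool.
All inserts are fire-and-forget from the caller's perspective.
Rows are first written to a local SQLite spool (CH_SPOOL_DIR, or a fallback
under /tmp or the working directory if that is not writable) and then
replayed to ClickHouse in batches by a background thread.
This makes ClickHouse the historical sink instead of a best-effort cache:
- live services do not block on CH availability
- rows survive process restarts (as long as the spool is on durable storage)
- CH outages no longer drop history silently
Rows can still be lost locally: put() drops a row if the spool lock stays
busy for more than 0.1 s or the SQLite insert fails; both cases are logged.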
Usage:
from ch_writer import ch_put
ch_put("eigen_scans", {"ts": int(time.time() * 1e6), "scan_number": n, ...})
Environment overrides:
CH_URL — default: http://localhost:8123
CH_USER — default: dolphin
CH_PASS — default: dolphin_ch_2026
CH_DB — default: dolphin
CH_SPOOL_DIR — default: /mnt/dolphin_training/ch_spool
CH_BATCH_SIZE — default: 250
CH_FLUSH_INTERVAL_S — default: 1.0
CH_MAINTENANCE_INTERVAL_S — default: 300
CH_WAL_TRUNCATE_BYTES — default: 67108864 (64 MiB)
CH_VACUUM_MIN_BYTES — default: 536870912 (512 MiB)
CH_VACUUM_MIN_FREE_RATIO — default: 1.25
CH_VACUUM_MIN_FREE_BYTES — default: 134217728 (128 MiB)
"""
from __future__ import annotations

import json
import logging
import os
import random
import shutil
import sqlite3
import struct
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple
log = logging.getLogger("ch_writer")


def _env_str(name: str, default: str) -> str:
    """Return environment variable *name*, or *default* when it is not set."""
    return os.environ.get(name, default)


def _env_int(name: str, default: int) -> int:
    """Return environment variable *name* parsed with int(); *default* when unset."""
    return int(_env_str(name, str(default)))


def _env_float(name: str, default: str) -> float:
    """Return environment variable *name* parsed with float(); *default* when unset."""
    return float(_env_str(name, default))


# ClickHouse HTTP endpoint and credentials.
CH_URL = _env_str("CH_URL", "http://localhost:8123")
CH_USER = _env_str("CH_USER", "dolphin")
# NOTE(review): a default credential is baked into the source; consider
# requiring CH_PASS to come from the environment.
CH_PASS = _env_str("CH_PASS", "dolphin_ch_2026")
CH_DB = _env_str("CH_DB", "dolphin")

# Local spool location and replay pacing.
CH_SPOOL_DIR = Path(_env_str("CH_SPOOL_DIR", "/mnt/dolphin_training/ch_spool"))
CH_BATCH_SIZE = _env_int("CH_BATCH_SIZE", 250)
CH_FLUSH_INTERVAL_S = _env_float("CH_FLUSH_INTERVAL_S", "1.0")

# Spool maintenance (WAL truncation / VACUUM) thresholds.
CH_MAINTENANCE_INTERVAL_S = _env_float("CH_MAINTENANCE_INTERVAL_S", "300")
CH_WAL_TRUNCATE_BYTES = _env_int("CH_WAL_TRUNCATE_BYTES", 64 * 1024 * 1024)
CH_VACUUM_MIN_BYTES = _env_int("CH_VACUUM_MIN_BYTES", 512 * 1024 * 1024)
CH_VACUUM_MIN_FREE_RATIO = _env_float("CH_VACUUM_MIN_FREE_RATIO", "1.25")
CH_VACUUM_MIN_FREE_BYTES = _env_int("CH_VACUUM_MIN_FREE_BYTES", 128 * 1024 * 1024)
# ─── Timestamp helpers ────────────────────────────────────────────────────────
def ts_us() -> int:
    """
    Current UTC time as integer microseconds since the Unix epoch.

    Intended for DateTime64(6) fields. Uses integer arithmetic on
    time.time_ns() so the value is not subject to the float rounding of
    ``int(time.time() * 1_000_000)``.
    """
    return time.time_ns() // 1_000
def ts_ms() -> int:
    """
    Current UTC time as integer milliseconds since the Unix epoch.

    Intended for DateTime64(3) fields. Uses integer arithmetic on
    time.time_ns() so the value is not subject to float rounding.
    """
    return time.time_ns() // 1_000_000
# ─── UUIDv7 — time-ordered distributed trace ID ───────────────────────────────
def uuid7() -> str:
    """
    Return a new UUIDv7 (RFC 9562 time-ordered UUID) in canonical 8-4-4-4-12 form.

    Bit layout, most significant first: 48-bit Unix time in milliseconds,
    4-bit version (7), 12 random bits, 2-bit variant (0b10), 62 random bits.
    Random bits come from the ``random`` module.
    """
    millis = int(time.time() * 1_000)
    rand_a = random.getrandbits(12)
    rand_b = random.getrandbits(62)

    version_bits = 0x7 << 12  # version nibble sits just above rand_a
    variant_bits = 0b10 << 62  # RFC variant in the top two bits of the low word
    high_word = (millis << 16) | version_bits | rand_a
    low_word = variant_bits | rand_b

    digits = struct.pack(">QQ", high_word, low_word).hex()
    groups = (digits[0:8], digits[8:12], digits[12:16], digits[16:20], digits[20:32])
    return "-".join(groups)
def _ensure_dir(path: Path) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path
def _resolve_spool_dir(db: str) -> Path:
    """
    Choose, create and write-probe the spool directory for *db*.

    Candidates, in order:
      1. ``CH_SPOOL_DIR / db`` — the configured spool root.
      2. ``/tmp/dolphin_ch_spool / db`` — /tmp may be volatile (tmpfs or
         cleaned on reboot), so rows spooled there may not survive a restart.
      3. ``<cwd>/ch_spool / db`` — last resort; created without a write probe,
         and any error from creating it propagates.

    Every fallback is logged at WARNING, because silently spooling to a
    non-durable location defeats the purpose of the spool.

    Returns:
        The first candidate directory that could be created and written to.
    """
    roots = [CH_SPOOL_DIR, Path("/tmp") / "dolphin_ch_spool"]
    for root in roots:
        try:
            path = _ensure_dir(root / db)
            test = path / ".write_test"
            test.write_text("ok")
            test.unlink(missing_ok=True)
        except Exception as exc:
            log.warning(
                "ch_writer[%s]: spool root %s unusable (%s); trying next fallback",
                db,
                root,
                exc,
            )
            continue
        if root != CH_SPOOL_DIR:
            log.warning(
                "ch_writer[%s]: using fallback spool %s — may not be durable across reboots",
                db,
                path,
            )
        return path
    # Last resort: current working directory. This should be rare.
    fallback = _ensure_dir(Path.cwd() / "ch_spool" / db)
    log.warning(
        "ch_writer[%s]: no spool root writable; falling back to %s",
        db,
        fallback,
    )
    return fallback
class _CHWriter:
    """
    Durable ClickHouse writer for one database.

    Ingress:
      - put() synchronously appends a row to a local SQLite spool.
      - the caller never blocks on network IO; it waits at most
        _PUT_LOCK_TIMEOUT_S for the spool lock and otherwise drops the row
        (counted in ``_dropped`` and logged).
    Egress:
      - a background thread drains unsent rows to ClickHouse.
      - rows are deleted from the spool only after a successful CH insert.

    After construction, every use of ``self._conn`` happens under
    ``self._lock``; WAL checkpoints and VACUUM use their own short-lived
    connections instead.
    """

    _PUT_LOCK_TIMEOUT_S: float = 0.1  # max wait before dropping the row
    # Max "?" placeholders per SQLite statement. This stays below
    # SQLITE_MAX_VARIABLE_NUMBER (999 on SQLite builds older than 3.32.0)
    # whatever CH_BATCH_SIZE is configured to.
    _SQLITE_MAX_PARAMS: int = 500

    def __init__(
        self,
        flush_interval_s: float = CH_FLUSH_INTERVAL_S,
        batch_size: int = CH_BATCH_SIZE,
        db: str = CH_DB,
        start_thread: bool = True,
        maintenance_interval_s: float = CH_MAINTENANCE_INTERVAL_S,
        wal_truncate_bytes: int = CH_WAL_TRUNCATE_BYTES,
        vacuum_min_bytes: int = CH_VACUUM_MIN_BYTES,
        vacuum_min_free_ratio: float = CH_VACUUM_MIN_FREE_RATIO,
        vacuum_min_free_bytes: int = CH_VACUUM_MIN_FREE_BYTES,
    ) -> None:
        """
        Resolve and open the spool for *db*, and optionally start the replay thread.

        Args:
            flush_interval_s: idle wait between replay passes (floored at 0.1 s).
            batch_size: max rows fetched from the spool per pass (at least 1).
            db: ClickHouse database; also names the spool sub-directory.
            start_thread: start the daemon replay thread immediately.
            maintenance_interval_s: min seconds between spool maintenance
                passes (0 disables the rate limit).
            wal_truncate_bytes: WAL size at which a TRUNCATE checkpoint runs.
            vacuum_min_bytes: spool file size below which VACUUM is skipped.
            vacuum_min_free_ratio, vacuum_min_free_bytes: VACUUM requires
                ``db_size * ratio + bytes`` of free disk space.
        """
        self._interval = max(0.1, float(flush_interval_s))
        self._batch_size = max(1, int(batch_size))
        self._db = db
        self._spool_dir = _resolve_spool_dir(db)
        self._db_path = self._spool_dir / "writer.sqlite3"
        self._lock = threading.RLock()
        self._wake = threading.Event()
        self._stop = False
        self._dropped = 0
        self._failed_local_writes = 0
        self._flush_errors = 0
        self._loop_errors = 0
        self._maintenance_interval_s = max(0.0, float(maintenance_interval_s))
        self._wal_truncate_bytes = max(0, int(wal_truncate_bytes))
        self._vacuum_min_bytes = max(0, int(vacuum_min_bytes))
        self._vacuum_min_free_ratio = max(0.0, float(vacuum_min_free_ratio))
        self._vacuum_min_free_bytes = max(0, int(vacuum_min_free_bytes))
        self._last_maintenance_ts = 0.0
        self._conn = self._open_db()
        self._t = threading.Thread(
            target=self._run, daemon=True, name=f"ch-writer-{db}"
        )
        if start_thread:
            self._t.start()
        log.info(
            "ch_writer[%s]: durable spool at %s (batch=%d interval=%.1fs)",
            db,
            self._db_path,
            self._batch_size,
            self._interval,
        )

    # ─── Local spool (SQLite) ────────────────────────────────────────────────

    def _open_db(self) -> sqlite3.Connection:
        """Open the spool database (creating it if needed) and ensure its schema."""
        conn = sqlite3.connect(
            self._db_path,
            timeout=0.05,  # 50ms — fail fast so put() never stalls the main scan loop
            isolation_level=None,  # autocommit: each statement is its own transaction
            check_same_thread=False,  # shared by put() callers and the replay thread
        )
        conn.execute("PRAGMA journal_mode=WAL;")
        conn.execute("PRAGMA synchronous=NORMAL;")
        conn.execute("PRAGMA temp_store=MEMORY;")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS queue (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                table_name TEXT NOT NULL,
                payload TEXT NOT NULL,
                created_ts_us INTEGER NOT NULL,
                attempts INTEGER NOT NULL DEFAULT 0
            )
            """
        )
        # NOTE(review): redundant with the INTEGER PRIMARY KEY (rowid) index;
        # kept so existing spools and this DDL stay in sync.
        conn.execute("CREATE INDEX IF NOT EXISTS idx_queue_id ON queue(id)")
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_queue_table ON queue(table_name, id)"
        )
        return conn

    def put(self, table: str, row: dict) -> None:
        """
        Persist a row locally and wake the replay loop.

        Never raises. If the spool lock cannot be acquired within
        _PUT_LOCK_TIMEOUT_S, the row is dropped and ``_dropped`` is incremented.
        Serialisation or SQLite failures increment ``_failed_local_writes``.
        Both cases are logged.
        """
        try:
            payload = json.dumps(row, separators=(",", ":"), ensure_ascii=False)
            if not self._lock.acquire(timeout=self._PUT_LOCK_TIMEOUT_S):
                self._dropped += 1
                log.warning(
                    "ch_writer[%s]: spool lock busy, dropping telemetry row "
                    "(total_dropped=%d table=%s) — background thread may be in VACUUM/WAL",
                    self._db,
                    self._dropped,
                    table,
                )
                return
            try:
                self._conn.execute(
                    "INSERT INTO queue(table_name, payload, created_ts_us) VALUES (?, ?, ?)",
                    (table, payload, ts_us()),
                )
            finally:
                self._lock.release()
            self._wake.set()
        except Exception as exc:
            self._failed_local_writes += 1
            log.exception(
                "ch_writer[%s]: local spool write failed (%d): %s",
                self._db,
                self._failed_local_writes,
                exc,
            )

    def _fetch_batch(self) -> List[Tuple[int, str, dict]]:
        """
        Return up to batch_size spooled rows, oldest first, with decoded payloads.

        Rows whose payload is not valid JSON can never be delivered, so they
        are logged and removed from the spool instead of being returned.
        """
        with self._lock:
            rows = self._conn.execute(
                "SELECT id, table_name, payload FROM queue ORDER BY id LIMIT ?",
                (self._batch_size,),
            ).fetchall()
        batch: List[Tuple[int, str, dict]] = []
        invalid_ids: List[int] = []
        for row_id, table_name, payload in rows:
            try:
                batch.append((int(row_id), str(table_name), json.loads(payload)))
            except Exception as exc:
                log.exception(
                    "ch_writer[%s]: invalid spool row id=%s skipped: %s",
                    self._db,
                    row_id,
                    exc,
                )
                invalid_ids.append(row_id)
        self._delete_ids(invalid_ids)
        return batch

    def _id_chunks(self, ids: Iterable[int]) -> List[List[int]]:
        """Convert *ids* to ints and split them into chunks of at most _SQLITE_MAX_PARAMS."""
        id_list = [int(_id) for _id in ids]
        step = self._SQLITE_MAX_PARAMS
        return [id_list[i : i + step] for i in range(0, len(id_list), step)]

    def _delete_ids(self, ids: Iterable[int]) -> None:
        """Remove the given rows from the spool (delivered, duplicate or undecodable)."""
        chunks = self._id_chunks(ids)
        if not chunks:
            return
        with self._lock:
            # One IN (...) statement per chunk. In autocommit mode every
            # executed statement is its own transaction, so this is one commit
            # per chunk instead of one per row.
            for chunk in chunks:
                marks = ",".join("?" for _ in chunk)
                self._conn.execute("DELETE FROM queue WHERE id IN (%s)" % marks, chunk)

    def _bump_attempts(self, ids: Iterable[int]) -> None:
        """
        Increment the attempt counter of rows whose delivery just failed.

        For rows that already had 1000+ attempts, a WARNING is logged each
        time the new count is a multiple of 100. A row that can never be
        inserted blocks every row queued behind it, because batches are
        always taken oldest-first.
        """
        chunks = self._id_chunks(ids)
        if not chunks:
            return
        stuck: List[Tuple[int, int]] = []
        with self._lock:
            for chunk in chunks:
                marks = ",".join("?" for _ in chunk)
                cur = self._conn.execute(
                    "SELECT id, attempts FROM queue WHERE id IN (%s)" % marks,
                    chunk,
                )
                stuck.extend(
                    (row_id, int(attempts) + 1)
                    for row_id, attempts in cur.fetchall()
                    if int(attempts) >= 1000
                )
                self._conn.execute(
                    "UPDATE queue SET attempts = attempts + 1 WHERE id IN (%s)" % marks,
                    chunk,
                )
        for row_id, attempt in stuck:
            if attempt % 100 == 0:
                log.warning(
                    "CH flush: row id=%s stuck after %d attempts — "
                    "potential head-of-line blocking (table may be missing or schema changed)",
                    row_id,
                    attempt,
                )

    def _queue_count(self) -> int:
        """Return the number of rows currently waiting in the spool."""
        with self._lock:
            row = self._conn.execute("SELECT count(*) FROM queue").fetchone()
        return int(row[0]) if row else 0

    # ─── Spool maintenance ───────────────────────────────────────────────────

    def _file_size(self, path: Path) -> int:
        """Return the size of *path* in bytes, or 0 if it is missing or unreadable."""
        try:
            return path.stat().st_size
        except FileNotFoundError:
            return 0
        except Exception as exc:
            log.debug("ch_writer[%s]: size probe failed for %s: %s", self._db, path, exc)
            return 0

    def _checkpoint_wal(self) -> bool:
        """Run ``PRAGMA wal_checkpoint(TRUNCATE)``; return True on success."""
        # Uses a dedicated connection so WAL checkpoint does not hold self._lock.
        try:
            ckpt_conn = sqlite3.connect(
                self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
            )
            try:
                ckpt_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
            finally:
                ckpt_conn.close()
            return True
        except Exception as exc:
            log.warning("ch_writer[%s]: WAL checkpoint/truncate failed: %s", self._db, exc)
            return False

    def _has_vacuum_free_space(self, db_size: int) -> bool:
        """Return True if the spool disk has room for a VACUUM of a *db_size*-byte file."""
        try:
            usage = shutil.disk_usage(self._spool_dir)
        except Exception as exc:
            log.warning("ch_writer[%s]: vacuum free-space probe failed: %s", self._db, exc)
            return False
        required = int(db_size * self._vacuum_min_free_ratio) + self._vacuum_min_free_bytes
        if usage.free < required:
            log.warning(
                "ch_writer[%s]: vacuum skipped; free=%d required=%d db_size=%d",
                self._db,
                usage.free,
                required,
                db_size,
            )
            return False
        return True

    def _vacuum_spool(self) -> bool:
        """VACUUM the spool if it is large, empty and there is disk room; True if it ran."""
        db_size = self._file_size(self._db_path)
        if db_size < self._vacuum_min_bytes:
            return False
        if self._queue_count() != 0:
            return False
        if not self._has_vacuum_free_space(db_size):
            return False
        try:
            before = db_size + self._file_size(Path(str(self._db_path) + "-wal"))
            # Open a separate connection so VACUUM does not hold self._lock.
            # VACUUM in WAL mode is safe to run from a second connection while
            # the main connection is idle (queue_count==0 already confirmed above).
            vacuum_conn = sqlite3.connect(
                self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
            )
            try:
                vacuum_conn.execute("VACUUM;")
                vacuum_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
            finally:
                vacuum_conn.close()
            after = self._file_size(self._db_path) + self._file_size(
                Path(str(self._db_path) + "-wal")
            )
            log.info(
                "ch_writer[%s]: vacuumed empty spool bytes=%d->%d",
                self._db,
                before,
                after,
            )
            return True
        except Exception as exc:
            log.warning("ch_writer[%s]: vacuum failed: %s", self._db, exc)
            return False

    def _maybe_maintain_spool(self, *, force: bool = False) -> dict:
        """
        Truncate the WAL and/or VACUUM, but only while the spool is empty.

        Rate-limited to once per maintenance interval unless *force* is set.
        Returns a status dict with the keys ``ran``, ``checkpointed`` and
        ``vacuumed``.
        """
        now = time.time()
        status = {"ran": False, "checkpointed": False, "vacuumed": False}
        if not force and self._maintenance_interval_s > 0:
            if now - self._last_maintenance_ts < self._maintenance_interval_s:
                return status
        self._last_maintenance_ts = now
        try:
            if self._queue_count() != 0:
                return status
            status["ran"] = True
            wal_path = Path(str(self._db_path) + "-wal")
            if self._file_size(wal_path) >= self._wal_truncate_bytes:
                status["checkpointed"] = self._checkpoint_wal()
            status["vacuumed"] = self._vacuum_spool()
        except Exception as exc:
            log.warning("ch_writer[%s]: spool maintenance failed: %s", self._db, exc)
        return status

    # ─── ClickHouse HTTP ─────────────────────────────────────────────────────

    def _note_flush_error(self, table: str, exc: object) -> None:
        """Count a failed insert and log it: first 10 and every 100th at WARNING."""
        # WARNING, not DEBUG: a permanently failing table head-of-line
        # blocks the whole spool; at DEBUG that ran silent for 17 days
        # (2026-05-25 → 06-11) while the WAL grew to 11.7 GB.
        self._flush_errors += 1
        if self._flush_errors <= 10 or self._flush_errors % 100 == 0:
            log.warning(
                "CH flush error [%s] (count=%d): %s", table, self._flush_errors, exc
            )
        else:
            log.debug("CH flush error [%s]: %s", table, exc)

    @staticmethod
    def _http_error_body(exc: urllib.error.HTTPError, limit: int = 500) -> str:
        """Return up to *limit* bytes of an HTTP error response body, decoded for logging."""
        try:
            raw = exc.read() or b""
        except Exception:
            return "<no body>"
        return raw[:limit].decode("utf-8", errors="replace").strip()

    def _post_rows(self, table: str, rows: List[dict]) -> bool:
        """
        Insert *rows* into ``<db>.<table>`` as JSONEachRow over HTTP.

        Returns True on HTTP 200/201. Never raises: failures are counted and
        logged, and reported as False so the caller keeps the rows spooled.
        """
        body = "\n".join(
            json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in rows
        ).encode()
        wait_for_async_insert = "1"  # Phase 2: ALL accounting tables must be durable (loss prevention)
        params = urllib.parse.urlencode(
            {
                "database": self._db,
                "query": f"INSERT INTO {table} FORMAT JSONEachRow",
                "async_insert": "1",
                "wait_for_async_insert": wait_for_async_insert,
                # best_effort: some writers emit timezone-aware isoformat ts
                # ("…+00:00"); strict parsing rejects those rows forever and
                # head-of-line-blocks the spool (2026-06-11 disk-fill incident).
                "date_time_input_format": "best_effort",
            }
        )
        req = urllib.request.Request(f"{CH_URL}/?{params}", data=body, method="POST")
        req.add_header("X-ClickHouse-User", CH_USER)
        req.add_header("X-ClickHouse-Key", CH_PASS)
        req.add_header("Content-Type", "application/octet-stream")
        try:
            with urllib.request.urlopen(req, timeout=5) as resp:
                return resp.status in (200, 201)
        except urllib.error.HTTPError as exc:
            # ClickHouse reports the actual reason (unknown table, parse
            # error, ...) in the response body, not in the status line.
            self._note_flush_error(table, f"{exc}: {self._http_error_body(exc)}")
        except Exception as exc:
            self._note_flush_error(table, exc)
        return False

    def _query_lines(self, sql: str, timeout_s: float = 5.0) -> List[str]:
        """Run *sql* against ClickHouse and return the non-empty response lines; raises on failure."""
        encoded = urllib.parse.quote_plus(sql)
        url = f"{CH_URL}/?database={self._db}&query={encoded}"
        req = urllib.request.Request(url, method="POST")
        req.add_header("X-ClickHouse-User", CH_USER)
        req.add_header("X-ClickHouse-Key", CH_PASS)
        req.add_header("Content-Type", "text/plain; charset=utf-8")
        with urllib.request.urlopen(req, timeout=timeout_s) as resp:
            raw = resp.read().decode("utf-8", errors="replace")
        return [line for line in raw.splitlines() if line]

    @staticmethod
    def _ch_quote(value: str) -> str:
        """Render *value* as a ClickHouse single-quoted string literal."""
        # ClickHouse treats backslash as an escape character inside string
        # literals, so it must be escaped before the quote is.
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

    def _existing_trade_keys(self, rows: List[dict]) -> set[Tuple[str, int]]:
        """
        Find which (trade_id, ts) pairs of *rows* already exist in trade_events.

        Returns ``(trade_id, ts_us)`` tuples, where ts_us is the stored ``ts``
        as integer microseconds. Fails open: if a probe query fails, the
        pairs found so far are returned and the remaining rows are simply not
        de-duplicated, so the trading sink never blocks on this check.
        """
        trade_ids = {
            str(row["trade_id"]).strip()
            for row in rows
            if row.get("trade_id") is not None
        }
        trade_ids.discard("")
        if not trade_ids:
            return set()
        unique = sorted(trade_ids)
        existing: set[Tuple[str, int]] = set()
        chunk_size = 200
        for start in range(0, len(unique), chunk_size):
            chunk = unique[start : start + chunk_size]
            quoted = ",".join(self._ch_quote(tid) for tid in chunk)
            # JSONEachRow rather than TSV: TSV output escapes backslashes,
            # tabs and newlines, so a parsed trade_id would not match the raw one.
            sql = (
                "SELECT trade_id, toInt64(toUnixTimestamp64Micro(ts)) AS probe_ts_us "
                f"FROM trade_events WHERE trade_id IN ({quoted}) FORMAT JSONEachRow"
            )
            try:
                lines = self._query_lines(sql)
            except Exception as exc:
                log.debug("CH dedupe probe failed [trade_events]: %s", exc)
                # Fail open to keep trading sink non-blocking.
                return existing
            for line in lines:
                try:
                    record = json.loads(line)
                    existing.add((str(record["trade_id"]), int(record["probe_ts_us"])))
                except (ValueError, KeyError, TypeError):
                    continue
        return existing

    # ─── Replay loop ─────────────────────────────────────────────────────────

    def _drop_known_trade_events(
        self, items: List[Tuple[int, dict]]
    ) -> List[Tuple[int, dict]]:
        """Delete spooled trade_events rows already in ClickHouse; return the rest."""
        existing = self._existing_trade_keys([payload for _, payload in items])
        if not existing:
            return items
        kept: List[Tuple[int, dict]] = []
        duplicate_ids: List[int] = []
        for row_id, payload in items:
            tid = str(payload.get("trade_id", "")).strip()
            try:
                ts_us_val = int(payload.get("ts"))
            except Exception:
                ts_us_val = -1
            if tid and ts_us_val >= 0 and (tid, ts_us_val) in existing:
                duplicate_ids.append(row_id)
            else:
                kept.append((row_id, payload))
        if duplicate_ids:
            self._delete_ids(duplicate_ids)
            log.warning(
                "ch_writer[%s]: dropped %d duplicate trade_events rows by trade_id",
                self._db,
                len(duplicate_ids),
            )
        return kept

    def _flush_table(self, table: str, items: List[Tuple[int, dict]]) -> int:
        """Deliver one table's share of a batch; return the number of rows delivered."""
        if table == "trade_events":
            items = self._drop_known_trade_events(items)
        if not items:
            return 0
        ids = [row_id for row_id, _ in items]
        rows = [payload for _, payload in items]
        if self._post_rows(table, rows):
            self._delete_ids(ids)
            return len(rows)
        self._bump_attempts(ids)
        return 0

    def _note_loop_error(self, what: str, exc: BaseException) -> None:
        """Count a replay-path failure; log the first 10 and every 100th at WARNING."""
        # DEBUG alone hides persistent bugs in the replay path while the
        # spool keeps growing, so failures are surfaced (rate-limited).
        self._loop_errors += 1
        if self._loop_errors <= 10 or self._loop_errors % 100 == 0:
            log.warning(
                "ch_writer[%s]: %s (count=%d): %s",
                self._db,
                what,
                self._loop_errors,
                exc,
                exc_info=exc,
            )
        else:
            log.debug("ch_writer[%s]: %s: %s", self._db, what, exc)

    def flush_once(self) -> int:
        """
        Drain a single batch from the local spool.

        Rows are grouped by table and each table is handled independently, so
        a local failure on one table does not stop the others in the batch.
        Returns the number of rows successfully delivered to ClickHouse.
        """
        batch = self._fetch_batch()
        if not batch:
            self._maybe_maintain_spool()
            return 0
        grouped: Dict[str, List[Tuple[int, dict]]] = defaultdict(list)
        for row_id, table_name, payload in batch:
            grouped[table_name].append((row_id, payload))
        delivered = 0
        for table, items in grouped.items():
            try:
                delivered += self._flush_table(table, items)
            except Exception as exc:
                self._note_loop_error(f"flush of table {table} failed", exc)
        self._maybe_maintain_spool()
        return delivered

    def _run(self) -> None:
        """Replay thread body: flush repeatedly, sleeping when nothing was delivered."""
        while not self._stop:
            delivered = 0
            try:
                delivered = self.flush_once()
            except Exception as exc:
                self._note_loop_error("flush loop error", exc)
            if delivered <= 0:
                # Sleep until put() wakes us or the flush interval elapses.
                self._wake.wait(timeout=self._interval)
                self._wake.clear()
# ─── Module-level singletons ─────────────────────────────────────────────────
# One durable writer per ClickHouse database. Each instance is built at import
# time: it resolves/creates its spool directory, opens <spool>/<db>/writer.sqlite3
# and (by default) starts a daemon replay thread named ch-writer-<db>.
# NOTE(review): do not create a second _CHWriter for any of these databases —
# it would open the same spool file and drain the same queue rows concurrently.
_writer = _CHWriter(db="dolphin")  # BLUE
_writer_green = _CHWriter(db="dolphin_green")  # GREEN / NT TradingNode
_writer_prodgreen = _CHWriter(db="dolphin_prodgreen")  # PRODGREEN / Nautilus live
_writer_pink = _CHWriter(db="dolphin_pink")  # PINK / testnet execution
def ch_put(table: str, row: dict) -> None:
    """
    Fire-and-forget: spool *row* for ``dolphin.<table>`` (BLUE environment).

    Only the local spool write happens on the caller's thread; delivery to
    ClickHouse is done by the writer's background thread.
    """
    _writer.put(table=table, row=row)
def ch_put_green(table: str, row: dict) -> None:
    """
    Fire-and-forget: spool *row* for ``dolphin_green.<table>``
    (GREEN / NT TradingNode environment).
    """
    _writer_green.put(table=table, row=row)
def ch_put_prodgreen(table: str, row: dict) -> None:
    """
    Fire-and-forget: spool *row* for ``dolphin_prodgreen.<table>``
    (PRODGREEN / Nautilus live).
    """
    _writer_prodgreen.put(table=table, row=row)
def ch_put_pink(table: str, row: dict) -> None:
    """
    Fire-and-forget: spool *row* for ``dolphin_pink.<table>``
    (PINK / testnet execution).
    """
    _writer_pink.put(table=table, row=row)
# ─── V7 decision journal (PINK §10 data volume / §37 routing) ────────────────
PINK_V7_JOURNAL_DB = "dolphin_pink"
V7_DECISION_TABLE = "v7_decision_events"
# Reuse the PINK writer when the journal targets the same database. A second
# _CHWriter for the same db would open the same spool file
# (<spool>/<db>/writer.sqlite3) and start a second replay thread. Both threads
# select the oldest queue rows without claiming them, so the same rows could be
# POSTed to ClickHouse twice.
_writer_pink_v7 = (
    _writer_pink
    if PINK_V7_JOURNAL_DB == _writer_pink._db
    else _CHWriter(db=PINK_V7_JOURNAL_DB)
)
def ch_put_pink_v7(table: str, row: dict) -> None:
    """
    Fire-and-forget: spool a V7 decision event *row* for
    ``<PINK_V7_JOURNAL_DB>.<table>`` (dolphin_pink).
    """
    _writer_pink_v7.put(table=table, row=row)
# ─── Data volume budgets (§10.3) ─────────────────────────────────────────────
PINK_CH_BUDGET_BYTES_DAY = 50 * 2**20  # ClickHouse budget: 50 MiB/day max
PINK_HZ_BUDGET_BYTES_DAY = 500 * 2**20  # HZ budget: 500 MiB/day max