diff --git a/prod/ch_writer.py b/prod/ch_writer.py
new file mode 100644
index 0000000..d02493c
--- /dev/null
+++ b/prod/ch_writer.py
@@ -0,0 +1,593 @@
+"""
+ch_writer.py — Dolphin ClickHouse writer with durable local spool.
+
+All inserts are fire-and-forget from the caller's perspective.
+Rows are first written to a local SQLite spool on durable storage and
+then replayed to ClickHouse in batches by a background thread.
+
+This makes ClickHouse the historical sink instead of a best-effort cache:
+ - live services do not block on CH availability
+ - rows survive process restarts
+ - CH outages no longer drop history silently
+
+Usage:
+ from ch_writer import ch_put
+ ch_put("eigen_scans", {"ts": int(time.time() * 1e6), "scan_number": n, ...})
+
+Environment overrides:
+ CH_URL — default: http://localhost:8123
+ CH_USER — default: dolphin
+ CH_PASS — default: dolphin_ch_2026
+ CH_DB — default: dolphin
+ CH_SPOOL_DIR — default: /mnt/dolphin_training/ch_spool
+ CH_BATCH_SIZE — default: 250
+ CH_FLUSH_INTERVAL_S — default: 1.0
+ CH_MAINTENANCE_INTERVAL_S — default: 300
+ CH_WAL_TRUNCATE_BYTES — default: 67108864 (64 MiB)
+ CH_VACUUM_MIN_BYTES — default: 536870912 (512 MiB)
+ CH_VACUUM_MIN_FREE_RATIO — default: 1.25
+ CH_VACUUM_MIN_FREE_BYTES — default: 134217728 (128 MiB)
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import random
+import shutil
+import sqlite3
+import struct
+import threading
+import time
+import urllib.parse
+import urllib.request
+from collections import defaultdict
+from pathlib import Path
+from typing import Any, Dict, Iterable, List, Tuple
+
+log = logging.getLogger("ch_writer")
+
+CH_URL = os.environ.get("CH_URL", "http://localhost:8123")
+CH_USER = os.environ.get("CH_USER", "dolphin")
+CH_PASS = os.environ.get("CH_PASS", "dolphin_ch_2026")
+CH_DB = os.environ.get("CH_DB", "dolphin")
+CH_SPOOL_DIR = Path(os.environ.get("CH_SPOOL_DIR", "/mnt/dolphin_training/ch_spool"))
+CH_BATCH_SIZE = int(os.environ.get("CH_BATCH_SIZE", "250"))
+CH_FLUSH_INTERVAL_S = float(os.environ.get("CH_FLUSH_INTERVAL_S", "1.0"))
+CH_MAINTENANCE_INTERVAL_S = float(os.environ.get("CH_MAINTENANCE_INTERVAL_S", "300"))
+CH_WAL_TRUNCATE_BYTES = int(os.environ.get("CH_WAL_TRUNCATE_BYTES", str(64 * 1024 * 1024)))
+CH_VACUUM_MIN_BYTES = int(os.environ.get("CH_VACUUM_MIN_BYTES", str(512 * 1024 * 1024)))
+CH_VACUUM_MIN_FREE_RATIO = float(os.environ.get("CH_VACUUM_MIN_FREE_RATIO", "1.25"))
+CH_VACUUM_MIN_FREE_BYTES = int(os.environ.get("CH_VACUUM_MIN_FREE_BYTES", str(128 * 1024 * 1024)))
+
+
+# ─── Timestamp helpers ────────────────────────────────────────────────────────
+
+def ts_us() -> int:
+ """Current UTC time as microseconds — for DateTime64(6) fields."""
+ return int(time.time() * 1_000_000)
+
+
+def ts_ms() -> int:
+ """Current UTC time as milliseconds — for DateTime64(3) fields."""
+ return int(time.time() * 1_000)
+
+
+# ─── UUIDv7 — time-ordered distributed trace ID ───────────────────────────────
+
+def uuid7() -> str:
+ """
+ Generate a UUIDv7 — RFC 9562 time-ordered UUID.
+ """
+ ts_ms_val = int(time.time() * 1_000)
+ rand_a = random.getrandbits(12)
+ rand_b = random.getrandbits(62)
+
+ hi = (ts_ms_val << 16) | 0x7000 | rand_a
+ lo = (0b10 << 62) | rand_b
+
+ b = struct.pack(">QQ", hi, lo)
+ return (
+ f"{b[0:4].hex()}-{b[4:6].hex()}-"
+ f"{b[6:8].hex()}-{b[8:10].hex()}-{b[10:16].hex()}"
+ )
+
+
+def _ensure_dir(path: Path) -> Path:
+ path.mkdir(parents=True, exist_ok=True)
+ return path
+
+
+def _resolve_spool_dir(db: str) -> Path:
+ """
+ Choose a durable spool directory.
+
+ Prefer the configured CH spool root; if it is unavailable, fall back to /tmp.
+ """
+ roots = [CH_SPOOL_DIR, Path("/tmp") / "dolphin_ch_spool"]
+ for root in roots:
+ try:
+ path = _ensure_dir(root / db)
+ test = path / ".write_test"
+ test.write_text("ok")
+ test.unlink(missing_ok=True)
+ return path
+ except Exception:
+ continue
+ # Last resort: current working directory. This should be rare.
+ return _ensure_dir(Path.cwd() / "ch_spool" / db)
+
+
+class _CHWriter:
+ """
+ Durable ClickHouse writer.
+
+ Ingress:
+ - put() synchronously appends a row to a local SQLite spool.
+ - the caller never blocks on network IO.
+
+ Egress:
+ - a background thread drains unsent rows to ClickHouse.
+ - rows are deleted from the spool only after a successful CH insert.
+ """
+
+ def __init__(
+ self,
+ flush_interval_s: float = CH_FLUSH_INTERVAL_S,
+ batch_size: int = CH_BATCH_SIZE,
+ db: str = CH_DB,
+ start_thread: bool = True,
+ maintenance_interval_s: float = CH_MAINTENANCE_INTERVAL_S,
+ wal_truncate_bytes: int = CH_WAL_TRUNCATE_BYTES,
+ vacuum_min_bytes: int = CH_VACUUM_MIN_BYTES,
+ vacuum_min_free_ratio: float = CH_VACUUM_MIN_FREE_RATIO,
+ vacuum_min_free_bytes: int = CH_VACUUM_MIN_FREE_BYTES,
+ ) -> None:
+ self._interval = max(0.1, float(flush_interval_s))
+ self._batch_size = max(1, int(batch_size))
+ self._db = db
+ self._spool_dir = _resolve_spool_dir(db)
+ self._db_path = self._spool_dir / "writer.sqlite3"
+ self._lock = threading.RLock()
+ self._wake = threading.Event()
+ self._stop = False
+ self._dropped = 0
+ self._failed_local_writes = 0
+ self._flush_errors = 0
+ self._maintenance_interval_s = max(0.0, float(maintenance_interval_s))
+ self._wal_truncate_bytes = max(0, int(wal_truncate_bytes))
+ self._vacuum_min_bytes = max(0, int(vacuum_min_bytes))
+ self._vacuum_min_free_ratio = max(0.0, float(vacuum_min_free_ratio))
+ self._vacuum_min_free_bytes = max(0, int(vacuum_min_free_bytes))
+ self._last_maintenance_ts = 0.0
+ self._conn = self._open_db()
+ self._t = threading.Thread(
+ target=self._run, daemon=True, name=f"ch-writer-{db}"
+ )
+ if start_thread:
+ self._t.start()
+ log.info(
+ "ch_writer[%s]: durable spool at %s (batch=%d interval=%.1fs)",
+ db,
+ self._db_path,
+ self._batch_size,
+ self._interval,
+ )
+
+ def _open_db(self) -> sqlite3.Connection:
+ conn = sqlite3.connect(
+ self._db_path,
+ timeout=0.05, # 50ms — fail fast so put() never stalls the main scan loop
+ isolation_level=None,
+ check_same_thread=False,
+ )
+ conn.execute("PRAGMA journal_mode=WAL;")
+ conn.execute("PRAGMA synchronous=NORMAL;")
+ conn.execute("PRAGMA temp_store=MEMORY;")
+ conn.execute(
+ """
+ CREATE TABLE IF NOT EXISTS queue (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ table_name TEXT NOT NULL,
+ payload TEXT NOT NULL,
+ created_ts_us INTEGER NOT NULL,
+ attempts INTEGER NOT NULL DEFAULT 0
+ )
+ """
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_queue_id ON queue(id)"
+ )
+ conn.execute(
+ "CREATE INDEX IF NOT EXISTS idx_queue_table ON queue(table_name, id)"
+ )
+ return conn
+
+ _PUT_LOCK_TIMEOUT_S: float = 0.1 # max wait before dropping the row
+
+ def put(self, table: str, row: dict) -> None:
+ """Persist a row locally and wake the replay loop."""
+ try:
+ payload = json.dumps(row, separators=(",", ":"), ensure_ascii=False)
+ if not self._lock.acquire(timeout=self._PUT_LOCK_TIMEOUT_S):
+ self._dropped += 1
+ log.warning(
+ "ch_writer[%s]: spool lock busy, dropping telemetry row "
+ "(total_dropped=%d table=%s) — background thread may be in VACUUM/WAL",
+ self._db,
+ self._dropped,
+ table,
+ )
+ return
+ try:
+ self._conn.execute(
+ "INSERT INTO queue(table_name, payload, created_ts_us) VALUES (?, ?, ?)",
+ (table, payload, ts_us()),
+ )
+ finally:
+ self._lock.release()
+ self._wake.set()
+ except Exception as exc:
+ self._failed_local_writes += 1
+ log.exception(
+ "ch_writer[%s]: local spool write failed (%d): %s",
+ self._db,
+ self._failed_local_writes,
+ exc,
+ )
+
+ def _fetch_batch(self) -> List[Tuple[int, str, dict]]:
+ with self._lock:
+ cur = self._conn.execute(
+ "SELECT id, table_name, payload FROM queue ORDER BY id LIMIT ?",
+ (self._batch_size,),
+ )
+ rows = cur.fetchall()
+ batch: List[Tuple[int, str, dict]] = []
+ for row_id, table_name, payload in rows:
+ try:
+ batch.append((int(row_id), str(table_name), json.loads(payload)))
+ except Exception as exc:
+ log.exception(
+ "ch_writer[%s]: invalid spool row id=%s skipped: %s",
+ self._db,
+ row_id,
+ exc,
+ )
+ with self._lock:
+ self._conn.execute("DELETE FROM queue WHERE id=?", (row_id,))
+ return batch
+
+ def _delete_ids(self, ids: Iterable[int]) -> None:
+ ids = list(ids)
+ if not ids:
+ return
+ with self._lock:
+ self._conn.executemany(
+ "DELETE FROM queue WHERE id=?",
+ [(int(_id),) for _id in ids],
+ )
+
+ def _bump_attempts(self, ids: Iterable[int]) -> None:
+ ids = list(ids)
+ if not ids:
+ return
+ with self._lock:
+ cur = self._conn.execute(
+ "SELECT id, attempts FROM queue WHERE id IN (%s)" % ",".join("?" for _ in ids),
+ [int(_id) for _ in ids],
+ )
+ high_attempts = [(row[0], int(row[1]) + 1) for row in cur.fetchall() if int(row[1]) >= 1000]
+ self._conn.executemany(
+ "UPDATE queue SET attempts = attempts + 1 WHERE id=?",
+ [(int(_id),) for _id in ids],
+ )
+ for row_id, attempt in high_attempts:
+ if attempt % 100 == 0:
+ log.warning(
+ "CH flush: row id=%s stuck after %d attempts — "
+ "potential head-of-line blocking (table may be missing or schema changed)",
+ row_id, attempt,
+ )
+
+ def _queue_count(self) -> int:
+ with self._lock:
+ row = self._conn.execute("SELECT count(*) FROM queue").fetchone()
+ return int(row[0]) if row else 0
+
+ def _file_size(self, path: Path) -> int:
+ try:
+ return path.stat().st_size
+ except FileNotFoundError:
+ return 0
+ except Exception as exc:
+ log.debug("ch_writer[%s]: size probe failed for %s: %s", self._db, path, exc)
+ return 0
+
+ def _checkpoint_wal(self) -> bool:
+ # Uses a dedicated connection so WAL checkpoint does not hold self._lock.
+ try:
+ ckpt_conn = sqlite3.connect(
+ self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
+ )
+ try:
+ ckpt_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
+ finally:
+ ckpt_conn.close()
+ return True
+ except Exception as exc:
+ log.warning("ch_writer[%s]: WAL checkpoint/truncate failed: %s", self._db, exc)
+ return False
+
+ def _has_vacuum_free_space(self, db_size: int) -> bool:
+ try:
+ usage = shutil.disk_usage(self._spool_dir)
+ except Exception as exc:
+ log.warning("ch_writer[%s]: vacuum free-space probe failed: %s", self._db, exc)
+ return False
+ required = int(db_size * self._vacuum_min_free_ratio) + self._vacuum_min_free_bytes
+ if usage.free < required:
+ log.warning(
+ "ch_writer[%s]: vacuum skipped; free=%d required=%d db_size=%d",
+ self._db,
+ usage.free,
+ required,
+ db_size,
+ )
+ return False
+ return True
+
+ def _vacuum_spool(self) -> bool:
+ db_size = self._file_size(self._db_path)
+ if db_size < self._vacuum_min_bytes:
+ return False
+ if self._queue_count() != 0:
+ return False
+ if not self._has_vacuum_free_space(db_size):
+ return False
+ try:
+ before = db_size + self._file_size(Path(str(self._db_path) + "-wal"))
+ # Open a separate connection so VACUUM does not hold self._lock.
+ # VACUUM in WAL mode is safe to run from a second connection while
+ # the main connection is idle (queue_count==0 already confirmed above).
+ vacuum_conn = sqlite3.connect(
+ self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
+ )
+ try:
+ vacuum_conn.execute("VACUUM;")
+ vacuum_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
+ finally:
+ vacuum_conn.close()
+ after = self._file_size(self._db_path) + self._file_size(Path(str(self._db_path) + "-wal"))
+ log.info(
+ "ch_writer[%s]: vacuumed empty spool bytes=%d->%d",
+ self._db,
+ before,
+ after,
+ )
+ return True
+ except Exception as exc:
+ log.warning("ch_writer[%s]: vacuum failed: %s", self._db, exc)
+ return False
+
+ def _maybe_maintain_spool(self, *, force: bool = False) -> dict:
+ now = time.time()
+ status = {"ran": False, "checkpointed": False, "vacuumed": False}
+ if not force and self._maintenance_interval_s > 0:
+ if now - self._last_maintenance_ts < self._maintenance_interval_s:
+ return status
+ self._last_maintenance_ts = now
+ try:
+ if self._queue_count() != 0:
+ return status
+ status["ran"] = True
+ wal_path = Path(str(self._db_path) + "-wal")
+ if self._file_size(wal_path) >= self._wal_truncate_bytes:
+ status["checkpointed"] = self._checkpoint_wal()
+ status["vacuumed"] = self._vacuum_spool()
+ except Exception as exc:
+ log.warning("ch_writer[%s]: spool maintenance failed: %s", self._db, exc)
+ return status
+
+ def _post_rows(self, table: str, rows: List[dict]) -> bool:
+ body = "\n".join(json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in rows).encode()
+ wait_for_async_insert = "1" # Phase 2: ALL accounting tables must be durable (loss prevention)
+ url = (
+ f"{CH_URL}/?database={self._db}"
+ f"&query=INSERT+INTO+{table}+FORMAT+JSONEachRow"
+ f"&async_insert=1&wait_for_async_insert={wait_for_async_insert}"
+ # best_effort: some writers emit timezone-aware isoformat ts
+ # ("…+00:00"); strict parsing rejects those rows forever and
+ # head-of-line-blocks the spool (2026-06-11 disk-fill incident).
+ f"&date_time_input_format=best_effort"
+ )
+ req = urllib.request.Request(url, data=body, method="POST")
+ req.add_header("X-ClickHouse-User", CH_USER)
+ req.add_header("X-ClickHouse-Key", CH_PASS)
+ req.add_header("Content-Type", "application/octet-stream")
+ try:
+ with urllib.request.urlopen(req, timeout=5) as resp:
+ return resp.status in (200, 201)
+ except Exception as exc:
+ # WARNING, not DEBUG: a permanently failing table head-of-line
+ # blocks the whole spool; at DEBUG that ran silent for 17 days
+ # (2026-05-25 → 06-11) while the WAL grew to 11.7 GB.
+ self._flush_errors += 1
+ if self._flush_errors <= 10 or self._flush_errors % 100 == 0:
+ log.warning(
+ "CH flush error [%s] (count=%d): %s", table, self._flush_errors, exc
+ )
+ else:
+ log.debug("CH flush error [%s]: %s", table, exc)
+ return False
+
+ def _query_lines(self, sql: str, timeout_s: float = 5.0) -> List[str]:
+ encoded = urllib.parse.quote_plus(sql)
+ url = f"{CH_URL}/?database={self._db}&query={encoded}"
+ req = urllib.request.Request(url, method="POST")
+ req.add_header("X-ClickHouse-User", CH_USER)
+ req.add_header("X-ClickHouse-Key", CH_PASS)
+ req.add_header("Content-Type", "text/plain; charset=utf-8")
+ with urllib.request.urlopen(req, timeout=timeout_s) as resp:
+ raw = resp.read().decode("utf-8", errors="replace")
+ return [line for line in raw.splitlines() if line]
+
+ def _existing_trade_keys(self, rows: List[dict]) -> set[Tuple[str, int]]:
+ trade_ids: List[str] = []
+ for row in rows:
+ trade_id = row.get("trade_id")
+ if trade_id is None:
+ continue
+ tid = str(trade_id).strip()
+ if tid:
+ trade_ids.append(tid)
+
+ if not trade_ids:
+ return set()
+
+ unique = sorted(set(trade_ids))
+ existing: set[Tuple[str, int]] = set()
+ chunk_size = 200
+ for i in range(0, len(unique), chunk_size):
+ chunk = unique[i : i + chunk_size]
+ quoted = ",".join("'" + tid.replace("'", "''") + "'" for tid in chunk)
+ sql = (
+ "SELECT trade_id, toInt64(toUnixTimestamp64Micro(ts)) "
+ f"FROM trade_events WHERE trade_id IN ({quoted}) FORMAT TSV"
+ )
+ try:
+ lines = self._query_lines(sql)
+ except Exception as exc:
+ log.debug("CH dedupe probe failed [trade_events]: %s", exc)
+ # Fail open to keep trading sink non-blocking.
+ return set()
+ for line in lines:
+ parts = line.split("\t", 1)
+ if len(parts) != 2:
+ continue
+ tid, ts_us_s = parts
+ try:
+ existing.add((tid, int(ts_us_s)))
+ except Exception:
+ continue
+ return existing
+
+ def flush_once(self) -> int:
+ """
+ Drain a single batch from the local spool.
+
+ Returns the number of rows successfully delivered to ClickHouse.
+ """
+ batch = self._fetch_batch()
+ if not batch:
+ self._maybe_maintain_spool()
+ return 0
+
+ grouped: Dict[str, List[Tuple[int, dict]]] = defaultdict(list)
+ for row_id, table_name, payload in batch:
+ grouped[table_name].append((row_id, payload))
+
+ delivered = 0
+ for table, items in grouped.items():
+ ids = [row_id for row_id, _ in items]
+ rows = [payload for _, payload in items]
+ if table == "trade_events":
+ existing = self._existing_trade_keys(rows)
+ if existing:
+ kept_ids: List[int] = []
+ kept_rows: List[dict] = []
+ duplicate_ids: List[int] = []
+ for row_id, payload in items:
+ tid = str(payload.get("trade_id", "")).strip()
+ try:
+ ts_us_val = int(payload.get("ts"))
+ except Exception:
+ ts_us_val = -1
+ if tid and ts_us_val >= 0 and (tid, ts_us_val) in existing:
+ duplicate_ids.append(row_id)
+ else:
+ kept_ids.append(row_id)
+ kept_rows.append(payload)
+ if duplicate_ids:
+ self._delete_ids(duplicate_ids)
+ log.warning(
+ "ch_writer[%s]: dropped %d duplicate trade_events rows by trade_id",
+ self._db,
+ len(duplicate_ids),
+ )
+ ids = kept_ids
+ rows = kept_rows
+ if not rows:
+ continue
+ ok = self._post_rows(table, rows)
+ if ok:
+ delivered += len(rows)
+ self._delete_ids(ids)
+ else:
+ self._bump_attempts(ids)
+ self._maybe_maintain_spool()
+ return delivered
+
+ def _run(self) -> None:
+ while not self._stop:
+ delivered = 0
+ try:
+ delivered = self.flush_once()
+ except Exception as exc:
+ log.debug("ch_writer[%s]: flush loop error: %s", self._db, exc)
+ if delivered <= 0:
+ self._wake.wait(timeout=self._interval)
+ self._wake.clear()
+
+
+# ─── Module-level singletons ─────────────────────────────────────────────────
+
+_writer = _CHWriter(db="dolphin")
+_writer_green = _CHWriter(db="dolphin_green")
+_writer_prodgreen = _CHWriter(db="dolphin_prodgreen")
+_writer_pink = _CHWriter(db="dolphin_pink")
+
+
+def ch_put(table: str, row: dict) -> None:
+ """
+ Fire-and-forget insert into dolphin.
(BLUE environment).
+ """
+ _writer.put(table, row)
+
+
+def ch_put_green(table: str, row: dict) -> None:
+ """
+ Fire-and-forget insert into dolphin_green. (GREEN / NT TradingNode environment).
+ """
+ _writer_green.put(table, row)
+
+
+def ch_put_prodgreen(table: str, row: dict) -> None:
+ """
+ Fire-and-forget insert into dolphin_prodgreen. (PRODGREEN / Nautilus live).
+ """
+ _writer_prodgreen.put(table, row)
+
+
+def ch_put_pink(table: str, row: dict) -> None:
+ """
+ Fire-and-forget insert into dolphin_pink. (PINK / testnet execution).
+ """
+ _writer_pink.put(table, row)
+
+
+# ─── V7 decision journal (PINK §10 data volume / §37 routing) ────────────────
+PINK_V7_JOURNAL_DB = "dolphin_pink"
+V7_DECISION_TABLE = "v7_decision_events"
+_writer_pink_v7 = _CHWriter(db=PINK_V7_JOURNAL_DB)
+
+
+def ch_put_pink_v7(table: str, row: dict) -> None:
+ """Fire-and-forget V7 decision event insert into dolphin_pink."""
+ _writer_pink_v7.put(table, row)
+
+
+# ─── Data volume budgets (§10.3) ─────────────────────────────────────────────
+PINK_CH_BUDGET_BYTES_DAY = 50 * 1024 * 1024 # 50 MB/day max
+PINK_HZ_BUDGET_BYTES_DAY = 500 * 1024 * 1024 # 500 MB/day max
diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs
index 33ebedf..8798b55 100644
--- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs
+++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs
@@ -1029,6 +1029,26 @@ impl KernelCore {
let realized = parsed.get("realized_pnl").and_then(|v| v.as_f64()).unwrap_or(0.0);
let fee = parsed.get("fee").and_then(|v| v.as_f64()).unwrap_or(0.0);
let is_maker = parsed.get("is_maker").and_then(|v| v.as_bool()).unwrap_or(false);
+ // Phase 3.2: Slot-level PnL repair. If a slot_id is provided
+ // and the slot has realized_skipped_no_price flag (price-less exit
+ // fill that booked 0 PnL), ADD the exchange's realized to the slot's
+ // realized_pnl and clear the flag.
+ if let Some(slot_id) = parsed.get("slot_id").and_then(|v| v.as_u64()) {
+ let sid = slot_id as usize;
+ if sid < self.slots.len() && !self.slots[sid].closed {
+ let was_skipped = self.slots[sid].metadata
+ .get("realized_skipped_no_price")
+ .and_then(|v| v.as_bool())
+ .unwrap_or(false);
+ if was_skipped && realized.is_finite() && realized != 0.0 {
+ self.slots[sid].realized_pnl += realized;
+ self.slots[sid].metadata.insert(
+ "realized_skipped_no_price".to_string(),
+ Value::Bool(false),
+ );
+ }
+ }
+ }
self.account.apply_fill_settled(realized, fee, is_maker);
}
"ACCOUNT_UPDATE" => {
@@ -1107,24 +1127,24 @@ impl KernelCore {
}
}
- fn resolve_slot(&self, event: &VenueEvent) -> usize {
+ fn resolve_slot(&self, event: &VenueEvent) -> Option {
let slot_id = event.slot_id;
if slot_id >= 0 {
let slot_id = slot_id as usize;
if slot_id < self.slots.len() {
- return slot_id;
+ return Some(slot_id);
}
}
if let Some(slot_id) = self.active_trade_index.get(&event.trade_id) {
- return *slot_id;
+ return Some(*slot_id);
}
if let Some(slot_id) = self.venue_order_index.get(&event.venue_order_id) {
- return *slot_id;
+ return Some(*slot_id);
}
if let Some(slot_id) = self.client_order_index.get(&event.venue_client_id) {
- return *slot_id;
+ return Some(*slot_id);
}
- self.slots.first().map(|slot| slot.slot_id).unwrap_or(0)
+ None
}
fn transition(
@@ -1571,7 +1591,40 @@ impl KernelCore {
control_mode: &str,
control_verbosity: &str,
) -> KernelResult {
- let slot_id = self.resolve_slot(&event);
+ let slot_id = match self.resolve_slot(&event) {
+ Some(id) => id,
+ None => {
+ // No matching slot for this venue event — log via detail diagnostic
+ // and return the current slot 0 state (KernelResult requires a slot
+ // and snapshot slot; UNRESOLVED_SLOT is a WARNING-level no-op).
+ let fallback_slot = if self.slots.is_empty() {
+ TradeSlot::default()
+ } else {
+ self.slots[0].clone()
+ };
+ let snap = self.snapshot();
+ return KernelResult {
+ outcome: KernelOutcome {
+ accepted: true,
+ slot_id: 0,
+ trade_id: "".to_string(),
+ state: fallback_slot.fsm_state.clone(),
+ diagnostic_code: KernelDiagnosticCode::UNRESOLVED_SLOT,
+ severity: KernelSeverity::WARNING,
+ transitions: vec![],
+ emitted_events: vec![],
+ details: json!({
+ "event_kind": event.kind,
+ "reason": "UNRESOLVED_SLOT",
+ "trade_id": event.trade_id,
+ "venue_order_id": event.venue_order_id,
+ }).as_object().cloned().unwrap_or_default(),
+ },
+ slot: fallback_slot,
+ snapshot: snap,
+ };
+ }
+ };
let mut slot = self.slots[slot_id].clone();
if !event.event_id.is_empty() && slot.seen_event_ids.iter().any(|seen| seen == &event.event_id) {
diff --git a/prod/clean_arch/dita_v2/account.py b/prod/clean_arch/dita_v2/account.py
index c25564d..19ae271 100644
--- a/prod/clean_arch/dita_v2/account.py
+++ b/prod/clean_arch/dita_v2/account.py
@@ -26,6 +26,10 @@ class AccountSnapshot:
fees_paid: float = 0.0
trade_seq: int = 0
peak_capital: float = 0.0
+ # E-anchored provenance (Phase 1): "seed" | "e_anchored" | "k_bridged"
+ capital_source: str = "seed"
+ e_wallet_balance: float = 0.0
+ event_seq: int = 0
@property
def leverage(self) -> float:
@@ -49,6 +53,28 @@ class AccountProjection:
max_capital: Optional[float] = None
snapshot: AccountSnapshot = field(default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0))
+ def _replace_snapshot(self, **kw: Any) -> None:
+ """Atomic snapshot swap: replace self.snapshot with a new frozen AccountSnapshot.
+
+ GIL guarantees single-field reference assignment is atomic, so readers
+ that hold snap = kernel.account.snapshot before use see a consistent view.
+ """
+ cur = self.snapshot
+ self.snapshot = AccountSnapshot(
+ capital=kw.get("capital", cur.capital),
+ equity=kw.get("equity", cur.equity),
+ realized_pnl=kw.get("realized_pnl", cur.realized_pnl),
+ unrealized_pnl=kw.get("unrealized_pnl", cur.unrealized_pnl),
+ open_positions=kw.get("open_positions", cur.open_positions),
+ open_notional=kw.get("open_notional", cur.open_notional),
+ fees_paid=kw.get("fees_paid", cur.fees_paid),
+ trade_seq=kw.get("trade_seq", cur.trade_seq),
+ peak_capital=kw.get("peak_capital", cur.peak_capital),
+ capital_source=kw.get("capital_source", cur.capital_source),
+ e_wallet_balance=kw.get("e_wallet_balance", cur.e_wallet_balance),
+ event_seq=kw.get("event_seq", cur.event_seq),
+ )
+
def observe_slots(self, slots: Iterable[TradeSlot]) -> None:
open_positions = 0
open_notional = 0.0
@@ -62,27 +88,57 @@ class AccountProjection:
mark = safe_float(slot.metadata.get("mark_price"), mark)
open_notional += abs(slot.size) * abs(mark)
unrealized_pnl += float(slot.unrealized_pnl or 0.0)
- self.snapshot.open_positions = open_positions
- self.snapshot.open_notional = open_notional
- self.snapshot.unrealized_pnl = unrealized_pnl
- self.snapshot.equity = self.snapshot.capital + unrealized_pnl
+ self._replace_snapshot(
+ open_positions=open_positions,
+ open_notional=open_notional,
+ unrealized_pnl=unrealized_pnl,
+ equity=self.snapshot.capital + unrealized_pnl if math.isfinite(self.snapshot.capital + unrealized_pnl) else self.snapshot.capital,
+ peak_capital=max(self.snapshot.peak_capital, self.snapshot.capital) if open_notional > 0 and self.snapshot.capital > 0 else self.snapshot.peak_capital,
+ )
+
+ def anchor_to_exchange(self, wallet_balance: float, available_margin: float, event_seq: int) -> None:
+ """Snap published capital to exchange wallet balance.
+
+ The exchange is the ledger of record (E-anchored). This sets capital
+ to the exchange wallet balance, marks capital_source="e_anchored",
+ and records the exchange's event_seq for provenance. Between anchors
+ settle() bridges using capital_source="k_bridged".
+ Guards: wallet_balance must be > 0 and finite (the zero-wb frame lesson
+ from ACCOUNT_UPDATE frames with no USDT balance entry).
+ """
+ wb = safe_float(wallet_balance, 0.0)
+ if wb <= 0.0 or not math.isfinite(wb):
+ return
+ self.snapshot.capital = wb
+ self.snapshot.e_wallet_balance = wb
+ self.snapshot.capital_source = "e_anchored"
+ self.snapshot.event_seq = int(event_seq)
+ self.snapshot.equity = wb + self.snapshot.unrealized_pnl
if not math.isfinite(self.snapshot.equity):
- self.snapshot.equity = self.snapshot.capital
- if open_notional > 0 and self.snapshot.capital > 0:
- self.snapshot.peak_capital = max(self.snapshot.peak_capital, self.snapshot.capital)
+ self.snapshot.equity = wb
+ self.snapshot.peak_capital = max(self.snapshot.peak_capital, wb)
def settle(self, realized_pnl: float, fees: float = 0.0) -> None:
- realized_pnl = safe_float(realized_pnl, 0.0)
- new_capital = safe_float(self.snapshot.capital + realized_pnl, self.snapshot.capital)
+ rp = safe_float(realized_pnl, 0.0)
+ # Include fees in capital delta (today fees only accumulate in
+ # fees_paid while published capital ignores them between reseeds).
+ net = rp - safe_float(fees, 0.0)
+ new_capital = safe_float(self.snapshot.capital + net, self.snapshot.capital)
if self.max_capital is not None:
new_capital = min(new_capital, self.max_capital)
new_capital = max(self.min_capital, new_capital)
- self.snapshot.capital = new_capital
- self.snapshot.realized_pnl += realized_pnl
- self.snapshot.fees_paid += safe_float(fees, 0.0)
- self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl
- if not math.isfinite(self.snapshot.equity):
- self.snapshot.equity = self.snapshot.capital
+ new_source = self.snapshot.capital_source
+ if new_source == "e_anchored" and abs(net) > 1e-12:
+ new_source = "k_bridged"
+ new_fees = self.snapshot.fees_paid + safe_float(fees, 0.0)
+ new_equity = new_capital + self.snapshot.unrealized_pnl
+ if not math.isfinite(new_equity):
+ new_equity = new_capital
+ self._replace_snapshot(
+ capital=new_capital, capital_source=new_source,
+ realized_pnl=self.snapshot.realized_pnl + rp,
+ fees_paid=new_fees, equity=new_equity,
+ )
def to_account_event(
self,
diff --git a/prod/clean_arch/persistence/pink_clickhouse.py b/prod/clean_arch/persistence/pink_clickhouse.py
index 33ea2da..67a2853 100644
--- a/prod/clean_arch/persistence/pink_clickhouse.py
+++ b/prod/clean_arch/persistence/pink_clickhouse.py
@@ -32,6 +32,18 @@ Writer = Callable[[str, dict[str, Any]], None]
_log = logging.getLogger(__name__)
+def _naive_utc_ts(ts: Any) -> str:
+ """Emit naive-UTC microsecond ISO timestamp (no +00:00 suffix)."""
+ if hasattr(ts, "isoformat"):
+ raw = ts.isoformat(timespec="microseconds")
+ if raw.endswith("+00:00"):
+ raw = raw[:-6]
+ elif raw.endswith("Z"):
+ raw = raw[:-1]
+ return raw
+ return str(ts).replace("+00:00", "").replace("Z", "")
+
+
def _json_safe(value: Any) -> Any:
if isinstance(value, Enum):
return value.value
@@ -277,7 +289,7 @@ class PinkClickHousePersistence:
ReplacingMergeTree on event_seq) keeps the latest row only.
"""
from datetime import datetime, timezone
- ts_val = ts or datetime.now(timezone.utc).isoformat()
+ ts_val = _naive_utc_ts(ts) if ts is not None else str(datetime.now(timezone.utc).isoformat()).replace("+00:00", "")
self._sink("reconcile_events", {
"timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(),
"runtime_namespace": self.config.runtime_namespace,
@@ -290,6 +302,10 @@ class PinkClickHousePersistence:
"explanation": str(explanation),
})
+ def _capital_source(self) -> str:
+ snap = self.account.snapshot
+ return str(getattr(snap, "capital_source", "") or "")
+
def _capital(self) -> float:
return float(self.account.snapshot.capital or 0.0)
@@ -681,7 +697,7 @@ class PinkClickHousePersistence:
self._sink(
"anomaly_events",
{
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
@@ -741,7 +757,7 @@ class PinkClickHousePersistence:
*, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "",
) -> None:
self._sink("anomaly_events", {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
@@ -758,7 +774,7 @@ class PinkClickHousePersistence:
price = _safe_float(decision.reference_price, 0.0)
quantity = _safe_float(intent.target_size, 0.0)
row = {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"strategy": self.config.strategy,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
@@ -777,7 +793,7 @@ class PinkClickHousePersistence:
"leverage": _safe_float(intent.leverage, 1.0),
"bar_idx": 0,
"decision_seq": self._trade_seq(),
- "bars_held": int(intent.bars_held or 0),
+ "bars_held": max(0, int(intent.bars_held or 0)),
"action": decision.action.value,
"reason": decision.reason,
"pnl_pct": 0.0,
@@ -814,7 +830,7 @@ class PinkClickHousePersistence:
open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"event_type": event_type or stage.value,
"strategy": self.config.strategy,
"posture": self._posture(slot_dict),
@@ -843,6 +859,7 @@ class PinkClickHousePersistence:
"reason": None if intent is None else intent.reason,
"stage": stage.value,
}),
+ "capital_source": self._capital_source(),
# Phase 4: kernel atomic account versioning
"account_event_seq": self._account_event_seq(),
"reconcile_status": self._kernel_account().get("reconcile_status", "OK"),
@@ -875,7 +892,7 @@ class PinkClickHousePersistence:
asset = intent.asset
side = intent.side
row = {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"trade_id": trade_id,
"asset": asset,
"direction": _direction(side),
@@ -909,7 +926,7 @@ class PinkClickHousePersistence:
leverage = 0.0 if capital <= 0 else open_notional / capital
drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
- "ts": snapshot.timestamp.isoformat(timespec="milliseconds"),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"capital": capital,
"roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0,
"dd_pct": drawdown * 100.0,
@@ -933,6 +950,8 @@ class PinkClickHousePersistence:
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"ledger_authority": self.config.ledger_authority,
+ "capital_source": self._capital_source(),
+ "account_event_seq": self._account_event_seq(),
}
self._sink("status_snapshots", row)
@@ -1002,7 +1021,7 @@ class PinkClickHousePersistence:
exit_leg_id = f"{trade_id}:leg{leg_index}"
self._sink("trade_exit_legs", {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": trade_id,
@@ -1031,7 +1050,8 @@ class PinkClickHousePersistence:
"pnl_pct_leg": pnl_pct_leg,
"pnl_leg": pnl_leg,
"pnl_realized_total": cur_realized,
- "bars_held": int(intent.bars_held or 0),
+ "pnl_source": "", # updated by FILL_SETTLED override (Phase 3)
+ "bars_held": max(0, int(intent.bars_held or 0)),
# Gap 1/2/3: per-leg friction
"fee_leg": fill_fee,
"fee_source": fill_fee_source,
@@ -1084,7 +1104,7 @@ class PinkClickHousePersistence:
conviction = float(intent.confidence or decision.confidence or 0.0)
metadata = intent.metadata if intent is not None else (decision.metadata if decision is not None else {})
row = {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": intent.trade_id,
@@ -1095,6 +1115,7 @@ class PinkClickHousePersistence:
"quantity": quantity,
"pnl": pnl,
"pnl_pct": pnl_pct,
+ "pnl_source": "", # updated by FILL_SETTLED override (Phase 3)
"exit_reason": intent.reason,
"vel_div_entry": float(decision.velocity_divergence or 0.0),
"boost_at_entry": 1.0,
@@ -1125,7 +1146,7 @@ class PinkClickHousePersistence:
"drawdown_at_entry": 0.0 if self._peak_capital() <= 0 else max(0.0, (self._peak_capital() - capital_before) / self._peak_capital()),
"open_positions_count": 0,
"scan_uuid": decision.decision_id,
- "bars_held": int(intent.bars_held or 0),
+ "bars_held": max(0, int(intent.bars_held or 0)),
"entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}),
"exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}),
"execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}),
@@ -1156,7 +1177,7 @@ class PinkClickHousePersistence:
market_state: Mapping[str, Any] | None = None,
) -> None:
self._sink("trade_reconstruction", {
- "ts": snapshot.timestamp.isoformat(),
+ "ts": _naive_utc_ts(snapshot.timestamp),
"trade_id": trade_id,
"event_type": event_type,
"event_id": event_id,
diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py
index 5708094..312ade3 100644
--- a/prod/clean_arch/runtime/pink_direct.py
+++ b/prod/clean_arch/runtime/pink_direct.py
@@ -259,13 +259,13 @@ def _reconcile_position_slot(
# No open positions — ensure slot is idle
kernel.reconcile_from_slots([])
- # Seed capital once from exchange balance.
+ # Seed capital once from exchange balance — E-anchored.
if exchange_balance_capital > 0:
- kernel.account.snapshot.capital = exchange_balance_capital
- kernel.account.snapshot.peak_capital = max(
- kernel.account.snapshot.peak_capital, exchange_balance_capital
+ kernel.account.anchor_to_exchange(
+ wallet_balance=exchange_balance_capital,
+ available_margin=exchange_balance_capital,
+ event_seq=0,
)
- kernel.account.snapshot.equity = exchange_balance_capital
@dataclass
@@ -1453,9 +1453,11 @@ class PinkDirectRuntime:
def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None:
"""Close-out detection → feed realized PnL into the alpha layers.
- Capital-delta PnL (net of fees) — the kernel's capital is the
- authoritative ledger, and bucket/streak multipliers only need the
- sign and rough magnitude.
+ PnL is sourced from the closing slot's realized_pnl (kernel estimate,
+ overridden by exchange FILL_SETTLED when available) — NOT the capital
+ delta, which absorbs funding, fees of other activity, and foreign fills
+ from the shared VST account (PRODGREEN collision class).
+ Bucket/streak multipliers only need sign and rough magnitude.
"""
if self.alpha_sizer is None or not self._sizer_open_tid:
return
@@ -1467,11 +1469,17 @@ class PinkDirectRuntime:
)
if still_open:
return
- pnl = float(acc.get("capital") or 0.0) - self._sizer_entry_capital
+ # Phase 3: slot.realized_pnl is the trade's own PnL (no capital-delta
+ # contamination from funding, foreign fills, or other-activity fees).
+ pnl = float(slot_dict.get("realized_pnl") or 0.0)
+ # Subtract accumulated fees when available (fees_paid on slot metadata)
+ fees = float(slot_dict.get("fees_paid", 0.0) or slot_dict.get("metadata", {}).get("fees_paid", 0.0) or 0.0)
+ pnl = pnl - fees
self._sizer_open_tid = ""
try:
self.alpha_sizer.record_close(pnl)
- self.logger.info("alpha sizer feedback: trade closed pnl=%.4f", pnl)
+ self.logger.info("alpha sizer feedback: trade closed pnl=%.4f (rp=%.4f fees=%.4f)", pnl,
+ float(slot_dict.get("realized_pnl") or 0.0), fees)
except Exception:
pass
diff --git a/prod/clickhouse/pink/08_provenance.sql b/prod/clickhouse/pink/08_provenance.sql
new file mode 100644
index 0000000..2094ce4
--- /dev/null
+++ b/prod/clickhouse/pink/08_provenance.sql
@@ -0,0 +1,17 @@
+-- Phase 2: provenance columns (E-anchored capital, PnL source)
+-- Apply BEFORE deploying code that emits these fields.
+-- ALTER TABLE ... ADD COLUMN IF NOT EXISTS is idempotent.
+
+ALTER TABLE dolphin_pink.account_events
+ ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '',
+ ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0;
+
+ALTER TABLE dolphin_pink.status_snapshots
+ ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '',
+ ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0;
+
+ALTER TABLE dolphin_pink.trade_events
+ ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT '';
+
+ALTER TABLE dolphin_pink.trade_exit_legs
+ ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT '';