From d28040732771462277f13f612f1b1b853370cb3b Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 11 Jun 2026 21:44:24 +0200 Subject: [PATCH] PINK Phases 1-4: E-anchored capital, atomic snapshot, sizer feedback, kernel hardening Phase 1: account.py anchor_to_exchange, capital_source provenance, settle includes fees in capital delta. Phase 2: atomic snapshot swap, CH provenance DDL (08_provenance.sql), naive-UTC timestamps, ch_writer wait_for_async_insert=1 for all tables, head-of-line stuck-row logging at WARNING per 100 attempts. Phase 3: sizer feedback uses slot realized_pnl (not capital delta), FILL_SETTLED repairs slot-level PnL for price-less exit legs. Phase 4: resolve_slot returns Option, UNRESOLVED_SLOT diagnostic. bars_held clamped to max(0, ...) at row-build time. --- prod/ch_writer.py | 593 ++++++++++++++++++ .../dita_v2/_rust_kernel/src/lib.rs | 67 +- prod/clean_arch/dita_v2/account.py | 86 ++- .../clean_arch/persistence/pink_clickhouse.py | 47 +- prod/clean_arch/runtime/pink_direct.py | 28 +- prod/clickhouse/pink/08_provenance.sql | 17 + 6 files changed, 793 insertions(+), 45 deletions(-) create mode 100644 prod/ch_writer.py create mode 100644 prod/clickhouse/pink/08_provenance.sql diff --git a/prod/ch_writer.py b/prod/ch_writer.py new file mode 100644 index 0000000..d02493c --- /dev/null +++ b/prod/ch_writer.py @@ -0,0 +1,593 @@ +""" +ch_writer.py — Dolphin ClickHouse writer with durable local spool. + +All inserts are fire-and-forget from the caller's perspective. +Rows are first written to a local SQLite spool on durable storage and +then replayed to ClickHouse in batches by a background thread. + +This makes ClickHouse the historical sink instead of a best-effort cache: + - live services do not block on CH availability + - rows survive process restarts + - CH outages no longer drop history silently + +Usage: + from ch_writer import ch_put + ch_put("eigen_scans", {"ts": int(time.time() * 1e6), "scan_number": n, ...}) + +Environment overrides: + CH_URL — default: http://localhost:8123 + CH_USER — default: dolphin + CH_PASS — default: dolphin_ch_2026 + CH_DB — default: dolphin + CH_SPOOL_DIR — default: /mnt/dolphin_training/ch_spool + CH_BATCH_SIZE — default: 250 + CH_FLUSH_INTERVAL_S — default: 1.0 + CH_MAINTENANCE_INTERVAL_S — default: 300 + CH_WAL_TRUNCATE_BYTES — default: 67108864 (64 MiB) + CH_VACUUM_MIN_BYTES — default: 536870912 (512 MiB) + CH_VACUUM_MIN_FREE_RATIO — default: 1.25 + CH_VACUUM_MIN_FREE_BYTES — default: 134217728 (128 MiB) +""" + +from __future__ import annotations + +import json +import logging +import os +import random +import shutil +import sqlite3 +import struct +import threading +import time +import urllib.parse +import urllib.request +from collections import defaultdict +from pathlib import Path +from typing import Any, Dict, Iterable, List, Tuple + +log = logging.getLogger("ch_writer") + +CH_URL = os.environ.get("CH_URL", "http://localhost:8123") +CH_USER = os.environ.get("CH_USER", "dolphin") +CH_PASS = os.environ.get("CH_PASS", "dolphin_ch_2026") +CH_DB = os.environ.get("CH_DB", "dolphin") +CH_SPOOL_DIR = Path(os.environ.get("CH_SPOOL_DIR", "/mnt/dolphin_training/ch_spool")) +CH_BATCH_SIZE = int(os.environ.get("CH_BATCH_SIZE", "250")) +CH_FLUSH_INTERVAL_S = float(os.environ.get("CH_FLUSH_INTERVAL_S", "1.0")) +CH_MAINTENANCE_INTERVAL_S = float(os.environ.get("CH_MAINTENANCE_INTERVAL_S", "300")) +CH_WAL_TRUNCATE_BYTES = int(os.environ.get("CH_WAL_TRUNCATE_BYTES", str(64 * 1024 * 1024))) +CH_VACUUM_MIN_BYTES = int(os.environ.get("CH_VACUUM_MIN_BYTES", str(512 * 1024 * 1024))) +CH_VACUUM_MIN_FREE_RATIO = float(os.environ.get("CH_VACUUM_MIN_FREE_RATIO", "1.25")) +CH_VACUUM_MIN_FREE_BYTES = int(os.environ.get("CH_VACUUM_MIN_FREE_BYTES", str(128 * 1024 * 1024))) + + +# ─── Timestamp helpers ──────────────────────────────────────────────────────── + +def ts_us() -> int: + """Current UTC time as microseconds — for DateTime64(6) fields.""" + return int(time.time() * 1_000_000) + + +def ts_ms() -> int: + """Current UTC time as milliseconds — for DateTime64(3) fields.""" + return int(time.time() * 1_000) + + +# ─── UUIDv7 — time-ordered distributed trace ID ─────────────────────────────── + +def uuid7() -> str: + """ + Generate a UUIDv7 — RFC 9562 time-ordered UUID. + """ + ts_ms_val = int(time.time() * 1_000) + rand_a = random.getrandbits(12) + rand_b = random.getrandbits(62) + + hi = (ts_ms_val << 16) | 0x7000 | rand_a + lo = (0b10 << 62) | rand_b + + b = struct.pack(">QQ", hi, lo) + return ( + f"{b[0:4].hex()}-{b[4:6].hex()}-" + f"{b[6:8].hex()}-{b[8:10].hex()}-{b[10:16].hex()}" + ) + + +def _ensure_dir(path: Path) -> Path: + path.mkdir(parents=True, exist_ok=True) + return path + + +def _resolve_spool_dir(db: str) -> Path: + """ + Choose a durable spool directory. + + Prefer the configured CH spool root; if it is unavailable, fall back to /tmp. + """ + roots = [CH_SPOOL_DIR, Path("/tmp") / "dolphin_ch_spool"] + for root in roots: + try: + path = _ensure_dir(root / db) + test = path / ".write_test" + test.write_text("ok") + test.unlink(missing_ok=True) + return path + except Exception: + continue + # Last resort: current working directory. This should be rare. + return _ensure_dir(Path.cwd() / "ch_spool" / db) + + +class _CHWriter: + """ + Durable ClickHouse writer. + + Ingress: + - put() synchronously appends a row to a local SQLite spool. + - the caller never blocks on network IO. + + Egress: + - a background thread drains unsent rows to ClickHouse. + - rows are deleted from the spool only after a successful CH insert. + """ + + def __init__( + self, + flush_interval_s: float = CH_FLUSH_INTERVAL_S, + batch_size: int = CH_BATCH_SIZE, + db: str = CH_DB, + start_thread: bool = True, + maintenance_interval_s: float = CH_MAINTENANCE_INTERVAL_S, + wal_truncate_bytes: int = CH_WAL_TRUNCATE_BYTES, + vacuum_min_bytes: int = CH_VACUUM_MIN_BYTES, + vacuum_min_free_ratio: float = CH_VACUUM_MIN_FREE_RATIO, + vacuum_min_free_bytes: int = CH_VACUUM_MIN_FREE_BYTES, + ) -> None: + self._interval = max(0.1, float(flush_interval_s)) + self._batch_size = max(1, int(batch_size)) + self._db = db + self._spool_dir = _resolve_spool_dir(db) + self._db_path = self._spool_dir / "writer.sqlite3" + self._lock = threading.RLock() + self._wake = threading.Event() + self._stop = False + self._dropped = 0 + self._failed_local_writes = 0 + self._flush_errors = 0 + self._maintenance_interval_s = max(0.0, float(maintenance_interval_s)) + self._wal_truncate_bytes = max(0, int(wal_truncate_bytes)) + self._vacuum_min_bytes = max(0, int(vacuum_min_bytes)) + self._vacuum_min_free_ratio = max(0.0, float(vacuum_min_free_ratio)) + self._vacuum_min_free_bytes = max(0, int(vacuum_min_free_bytes)) + self._last_maintenance_ts = 0.0 + self._conn = self._open_db() + self._t = threading.Thread( + target=self._run, daemon=True, name=f"ch-writer-{db}" + ) + if start_thread: + self._t.start() + log.info( + "ch_writer[%s]: durable spool at %s (batch=%d interval=%.1fs)", + db, + self._db_path, + self._batch_size, + self._interval, + ) + + def _open_db(self) -> sqlite3.Connection: + conn = sqlite3.connect( + self._db_path, + timeout=0.05, # 50ms — fail fast so put() never stalls the main scan loop + isolation_level=None, + check_same_thread=False, + ) + conn.execute("PRAGMA journal_mode=WAL;") + conn.execute("PRAGMA synchronous=NORMAL;") + conn.execute("PRAGMA temp_store=MEMORY;") + conn.execute( + """ + CREATE TABLE IF NOT EXISTS queue ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + payload TEXT NOT NULL, + created_ts_us INTEGER NOT NULL, + attempts INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_queue_id ON queue(id)" + ) + conn.execute( + "CREATE INDEX IF NOT EXISTS idx_queue_table ON queue(table_name, id)" + ) + return conn + + _PUT_LOCK_TIMEOUT_S: float = 0.1 # max wait before dropping the row + + def put(self, table: str, row: dict) -> None: + """Persist a row locally and wake the replay loop.""" + try: + payload = json.dumps(row, separators=(",", ":"), ensure_ascii=False) + if not self._lock.acquire(timeout=self._PUT_LOCK_TIMEOUT_S): + self._dropped += 1 + log.warning( + "ch_writer[%s]: spool lock busy, dropping telemetry row " + "(total_dropped=%d table=%s) — background thread may be in VACUUM/WAL", + self._db, + self._dropped, + table, + ) + return + try: + self._conn.execute( + "INSERT INTO queue(table_name, payload, created_ts_us) VALUES (?, ?, ?)", + (table, payload, ts_us()), + ) + finally: + self._lock.release() + self._wake.set() + except Exception as exc: + self._failed_local_writes += 1 + log.exception( + "ch_writer[%s]: local spool write failed (%d): %s", + self._db, + self._failed_local_writes, + exc, + ) + + def _fetch_batch(self) -> List[Tuple[int, str, dict]]: + with self._lock: + cur = self._conn.execute( + "SELECT id, table_name, payload FROM queue ORDER BY id LIMIT ?", + (self._batch_size,), + ) + rows = cur.fetchall() + batch: List[Tuple[int, str, dict]] = [] + for row_id, table_name, payload in rows: + try: + batch.append((int(row_id), str(table_name), json.loads(payload))) + except Exception as exc: + log.exception( + "ch_writer[%s]: invalid spool row id=%s skipped: %s", + self._db, + row_id, + exc, + ) + with self._lock: + self._conn.execute("DELETE FROM queue WHERE id=?", (row_id,)) + return batch + + def _delete_ids(self, ids: Iterable[int]) -> None: + ids = list(ids) + if not ids: + return + with self._lock: + self._conn.executemany( + "DELETE FROM queue WHERE id=?", + [(int(_id),) for _id in ids], + ) + + def _bump_attempts(self, ids: Iterable[int]) -> None: + ids = list(ids) + if not ids: + return + with self._lock: + cur = self._conn.execute( + "SELECT id, attempts FROM queue WHERE id IN (%s)" % ",".join("?" for _ in ids), + [int(_id) for _ in ids], + ) + high_attempts = [(row[0], int(row[1]) + 1) for row in cur.fetchall() if int(row[1]) >= 1000] + self._conn.executemany( + "UPDATE queue SET attempts = attempts + 1 WHERE id=?", + [(int(_id),) for _id in ids], + ) + for row_id, attempt in high_attempts: + if attempt % 100 == 0: + log.warning( + "CH flush: row id=%s stuck after %d attempts — " + "potential head-of-line blocking (table may be missing or schema changed)", + row_id, attempt, + ) + + def _queue_count(self) -> int: + with self._lock: + row = self._conn.execute("SELECT count(*) FROM queue").fetchone() + return int(row[0]) if row else 0 + + def _file_size(self, path: Path) -> int: + try: + return path.stat().st_size + except FileNotFoundError: + return 0 + except Exception as exc: + log.debug("ch_writer[%s]: size probe failed for %s: %s", self._db, path, exc) + return 0 + + def _checkpoint_wal(self) -> bool: + # Uses a dedicated connection so WAL checkpoint does not hold self._lock. + try: + ckpt_conn = sqlite3.connect( + self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False + ) + try: + ckpt_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall() + finally: + ckpt_conn.close() + return True + except Exception as exc: + log.warning("ch_writer[%s]: WAL checkpoint/truncate failed: %s", self._db, exc) + return False + + def _has_vacuum_free_space(self, db_size: int) -> bool: + try: + usage = shutil.disk_usage(self._spool_dir) + except Exception as exc: + log.warning("ch_writer[%s]: vacuum free-space probe failed: %s", self._db, exc) + return False + required = int(db_size * self._vacuum_min_free_ratio) + self._vacuum_min_free_bytes + if usage.free < required: + log.warning( + "ch_writer[%s]: vacuum skipped; free=%d required=%d db_size=%d", + self._db, + usage.free, + required, + db_size, + ) + return False + return True + + def _vacuum_spool(self) -> bool: + db_size = self._file_size(self._db_path) + if db_size < self._vacuum_min_bytes: + return False + if self._queue_count() != 0: + return False + if not self._has_vacuum_free_space(db_size): + return False + try: + before = db_size + self._file_size(Path(str(self._db_path) + "-wal")) + # Open a separate connection so VACUUM does not hold self._lock. + # VACUUM in WAL mode is safe to run from a second connection while + # the main connection is idle (queue_count==0 already confirmed above). + vacuum_conn = sqlite3.connect( + self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False + ) + try: + vacuum_conn.execute("VACUUM;") + vacuum_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall() + finally: + vacuum_conn.close() + after = self._file_size(self._db_path) + self._file_size(Path(str(self._db_path) + "-wal")) + log.info( + "ch_writer[%s]: vacuumed empty spool bytes=%d->%d", + self._db, + before, + after, + ) + return True + except Exception as exc: + log.warning("ch_writer[%s]: vacuum failed: %s", self._db, exc) + return False + + def _maybe_maintain_spool(self, *, force: bool = False) -> dict: + now = time.time() + status = {"ran": False, "checkpointed": False, "vacuumed": False} + if not force and self._maintenance_interval_s > 0: + if now - self._last_maintenance_ts < self._maintenance_interval_s: + return status + self._last_maintenance_ts = now + try: + if self._queue_count() != 0: + return status + status["ran"] = True + wal_path = Path(str(self._db_path) + "-wal") + if self._file_size(wal_path) >= self._wal_truncate_bytes: + status["checkpointed"] = self._checkpoint_wal() + status["vacuumed"] = self._vacuum_spool() + except Exception as exc: + log.warning("ch_writer[%s]: spool maintenance failed: %s", self._db, exc) + return status + + def _post_rows(self, table: str, rows: List[dict]) -> bool: + body = "\n".join(json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in rows).encode() + wait_for_async_insert = "1" # Phase 2: ALL accounting tables must be durable (loss prevention) + url = ( + f"{CH_URL}/?database={self._db}" + f"&query=INSERT+INTO+{table}+FORMAT+JSONEachRow" + f"&async_insert=1&wait_for_async_insert={wait_for_async_insert}" + # best_effort: some writers emit timezone-aware isoformat ts + # ("…+00:00"); strict parsing rejects those rows forever and + # head-of-line-blocks the spool (2026-06-11 disk-fill incident). + f"&date_time_input_format=best_effort" + ) + req = urllib.request.Request(url, data=body, method="POST") + req.add_header("X-ClickHouse-User", CH_USER) + req.add_header("X-ClickHouse-Key", CH_PASS) + req.add_header("Content-Type", "application/octet-stream") + try: + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status in (200, 201) + except Exception as exc: + # WARNING, not DEBUG: a permanently failing table head-of-line + # blocks the whole spool; at DEBUG that ran silent for 17 days + # (2026-05-25 → 06-11) while the WAL grew to 11.7 GB. + self._flush_errors += 1 + if self._flush_errors <= 10 or self._flush_errors % 100 == 0: + log.warning( + "CH flush error [%s] (count=%d): %s", table, self._flush_errors, exc + ) + else: + log.debug("CH flush error [%s]: %s", table, exc) + return False + + def _query_lines(self, sql: str, timeout_s: float = 5.0) -> List[str]: + encoded = urllib.parse.quote_plus(sql) + url = f"{CH_URL}/?database={self._db}&query={encoded}" + req = urllib.request.Request(url, method="POST") + req.add_header("X-ClickHouse-User", CH_USER) + req.add_header("X-ClickHouse-Key", CH_PASS) + req.add_header("Content-Type", "text/plain; charset=utf-8") + with urllib.request.urlopen(req, timeout=timeout_s) as resp: + raw = resp.read().decode("utf-8", errors="replace") + return [line for line in raw.splitlines() if line] + + def _existing_trade_keys(self, rows: List[dict]) -> set[Tuple[str, int]]: + trade_ids: List[str] = [] + for row in rows: + trade_id = row.get("trade_id") + if trade_id is None: + continue + tid = str(trade_id).strip() + if tid: + trade_ids.append(tid) + + if not trade_ids: + return set() + + unique = sorted(set(trade_ids)) + existing: set[Tuple[str, int]] = set() + chunk_size = 200 + for i in range(0, len(unique), chunk_size): + chunk = unique[i : i + chunk_size] + quoted = ",".join("'" + tid.replace("'", "''") + "'" for tid in chunk) + sql = ( + "SELECT trade_id, toInt64(toUnixTimestamp64Micro(ts)) " + f"FROM trade_events WHERE trade_id IN ({quoted}) FORMAT TSV" + ) + try: + lines = self._query_lines(sql) + except Exception as exc: + log.debug("CH dedupe probe failed [trade_events]: %s", exc) + # Fail open to keep trading sink non-blocking. + return set() + for line in lines: + parts = line.split("\t", 1) + if len(parts) != 2: + continue + tid, ts_us_s = parts + try: + existing.add((tid, int(ts_us_s))) + except Exception: + continue + return existing + + def flush_once(self) -> int: + """ + Drain a single batch from the local spool. + + Returns the number of rows successfully delivered to ClickHouse. + """ + batch = self._fetch_batch() + if not batch: + self._maybe_maintain_spool() + return 0 + + grouped: Dict[str, List[Tuple[int, dict]]] = defaultdict(list) + for row_id, table_name, payload in batch: + grouped[table_name].append((row_id, payload)) + + delivered = 0 + for table, items in grouped.items(): + ids = [row_id for row_id, _ in items] + rows = [payload for _, payload in items] + if table == "trade_events": + existing = self._existing_trade_keys(rows) + if existing: + kept_ids: List[int] = [] + kept_rows: List[dict] = [] + duplicate_ids: List[int] = [] + for row_id, payload in items: + tid = str(payload.get("trade_id", "")).strip() + try: + ts_us_val = int(payload.get("ts")) + except Exception: + ts_us_val = -1 + if tid and ts_us_val >= 0 and (tid, ts_us_val) in existing: + duplicate_ids.append(row_id) + else: + kept_ids.append(row_id) + kept_rows.append(payload) + if duplicate_ids: + self._delete_ids(duplicate_ids) + log.warning( + "ch_writer[%s]: dropped %d duplicate trade_events rows by trade_id", + self._db, + len(duplicate_ids), + ) + ids = kept_ids + rows = kept_rows + if not rows: + continue + ok = self._post_rows(table, rows) + if ok: + delivered += len(rows) + self._delete_ids(ids) + else: + self._bump_attempts(ids) + self._maybe_maintain_spool() + return delivered + + def _run(self) -> None: + while not self._stop: + delivered = 0 + try: + delivered = self.flush_once() + except Exception as exc: + log.debug("ch_writer[%s]: flush loop error: %s", self._db, exc) + if delivered <= 0: + self._wake.wait(timeout=self._interval) + self._wake.clear() + + +# ─── Module-level singletons ───────────────────────────────────────────────── + +_writer = _CHWriter(db="dolphin") +_writer_green = _CHWriter(db="dolphin_green") +_writer_prodgreen = _CHWriter(db="dolphin_prodgreen") +_writer_pink = _CHWriter(db="dolphin_pink") + + +def ch_put(table: str, row: dict) -> None: + """ + Fire-and-forget insert into dolphin. (BLUE environment). + """ + _writer.put(table, row) + + +def ch_put_green(table: str, row: dict) -> None: + """ + Fire-and-forget insert into dolphin_green.
(GREEN / NT TradingNode environment). + """ + _writer_green.put(table, row) + + +def ch_put_prodgreen(table: str, row: dict) -> None: + """ + Fire-and-forget insert into dolphin_prodgreen.
(PRODGREEN / Nautilus live). + """ + _writer_prodgreen.put(table, row) + + +def ch_put_pink(table: str, row: dict) -> None: + """ + Fire-and-forget insert into dolphin_pink.
(PINK / testnet execution). + """ + _writer_pink.put(table, row) + + +# ─── V7 decision journal (PINK §10 data volume / §37 routing) ──────────────── +PINK_V7_JOURNAL_DB = "dolphin_pink" +V7_DECISION_TABLE = "v7_decision_events" +_writer_pink_v7 = _CHWriter(db=PINK_V7_JOURNAL_DB) + + +def ch_put_pink_v7(table: str, row: dict) -> None: + """Fire-and-forget V7 decision event insert into dolphin_pink.""" + _writer_pink_v7.put(table, row) + + +# ─── Data volume budgets (§10.3) ───────────────────────────────────────────── +PINK_CH_BUDGET_BYTES_DAY = 50 * 1024 * 1024 # 50 MB/day max +PINK_HZ_BUDGET_BYTES_DAY = 500 * 1024 * 1024 # 500 MB/day max diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs index 33ebedf..8798b55 100644 --- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -1029,6 +1029,26 @@ impl KernelCore { let realized = parsed.get("realized_pnl").and_then(|v| v.as_f64()).unwrap_or(0.0); let fee = parsed.get("fee").and_then(|v| v.as_f64()).unwrap_or(0.0); let is_maker = parsed.get("is_maker").and_then(|v| v.as_bool()).unwrap_or(false); + // Phase 3.2: Slot-level PnL repair. If a slot_id is provided + // and the slot has realized_skipped_no_price flag (price-less exit + // fill that booked 0 PnL), ADD the exchange's realized to the slot's + // realized_pnl and clear the flag. + if let Some(slot_id) = parsed.get("slot_id").and_then(|v| v.as_u64()) { + let sid = slot_id as usize; + if sid < self.slots.len() && !self.slots[sid].closed { + let was_skipped = self.slots[sid].metadata + .get("realized_skipped_no_price") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + if was_skipped && realized.is_finite() && realized != 0.0 { + self.slots[sid].realized_pnl += realized; + self.slots[sid].metadata.insert( + "realized_skipped_no_price".to_string(), + Value::Bool(false), + ); + } + } + } self.account.apply_fill_settled(realized, fee, is_maker); } "ACCOUNT_UPDATE" => { @@ -1107,24 +1127,24 @@ impl KernelCore { } } - fn resolve_slot(&self, event: &VenueEvent) -> usize { + fn resolve_slot(&self, event: &VenueEvent) -> Option { let slot_id = event.slot_id; if slot_id >= 0 { let slot_id = slot_id as usize; if slot_id < self.slots.len() { - return slot_id; + return Some(slot_id); } } if let Some(slot_id) = self.active_trade_index.get(&event.trade_id) { - return *slot_id; + return Some(*slot_id); } if let Some(slot_id) = self.venue_order_index.get(&event.venue_order_id) { - return *slot_id; + return Some(*slot_id); } if let Some(slot_id) = self.client_order_index.get(&event.venue_client_id) { - return *slot_id; + return Some(*slot_id); } - self.slots.first().map(|slot| slot.slot_id).unwrap_or(0) + None } fn transition( @@ -1571,7 +1591,40 @@ impl KernelCore { control_mode: &str, control_verbosity: &str, ) -> KernelResult { - let slot_id = self.resolve_slot(&event); + let slot_id = match self.resolve_slot(&event) { + Some(id) => id, + None => { + // No matching slot for this venue event — log via detail diagnostic + // and return the current slot 0 state (KernelResult requires a slot + // and snapshot slot; UNRESOLVED_SLOT is a WARNING-level no-op). + let fallback_slot = if self.slots.is_empty() { + TradeSlot::default() + } else { + self.slots[0].clone() + }; + let snap = self.snapshot(); + return KernelResult { + outcome: KernelOutcome { + accepted: true, + slot_id: 0, + trade_id: "".to_string(), + state: fallback_slot.fsm_state.clone(), + diagnostic_code: KernelDiagnosticCode::UNRESOLVED_SLOT, + severity: KernelSeverity::WARNING, + transitions: vec![], + emitted_events: vec![], + details: json!({ + "event_kind": event.kind, + "reason": "UNRESOLVED_SLOT", + "trade_id": event.trade_id, + "venue_order_id": event.venue_order_id, + }).as_object().cloned().unwrap_or_default(), + }, + slot: fallback_slot, + snapshot: snap, + }; + } + }; let mut slot = self.slots[slot_id].clone(); if !event.event_id.is_empty() && slot.seen_event_ids.iter().any(|seen| seen == &event.event_id) { diff --git a/prod/clean_arch/dita_v2/account.py b/prod/clean_arch/dita_v2/account.py index c25564d..19ae271 100644 --- a/prod/clean_arch/dita_v2/account.py +++ b/prod/clean_arch/dita_v2/account.py @@ -26,6 +26,10 @@ class AccountSnapshot: fees_paid: float = 0.0 trade_seq: int = 0 peak_capital: float = 0.0 + # E-anchored provenance (Phase 1): "seed" | "e_anchored" | "k_bridged" + capital_source: str = "seed" + e_wallet_balance: float = 0.0 + event_seq: int = 0 @property def leverage(self) -> float: @@ -49,6 +53,28 @@ class AccountProjection: max_capital: Optional[float] = None snapshot: AccountSnapshot = field(default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0)) + def _replace_snapshot(self, **kw: Any) -> None: + """Atomic snapshot swap: replace self.snapshot with a new frozen AccountSnapshot. + + GIL guarantees single-field reference assignment is atomic, so readers + that hold snap = kernel.account.snapshot before use see a consistent view. + """ + cur = self.snapshot + self.snapshot = AccountSnapshot( + capital=kw.get("capital", cur.capital), + equity=kw.get("equity", cur.equity), + realized_pnl=kw.get("realized_pnl", cur.realized_pnl), + unrealized_pnl=kw.get("unrealized_pnl", cur.unrealized_pnl), + open_positions=kw.get("open_positions", cur.open_positions), + open_notional=kw.get("open_notional", cur.open_notional), + fees_paid=kw.get("fees_paid", cur.fees_paid), + trade_seq=kw.get("trade_seq", cur.trade_seq), + peak_capital=kw.get("peak_capital", cur.peak_capital), + capital_source=kw.get("capital_source", cur.capital_source), + e_wallet_balance=kw.get("e_wallet_balance", cur.e_wallet_balance), + event_seq=kw.get("event_seq", cur.event_seq), + ) + def observe_slots(self, slots: Iterable[TradeSlot]) -> None: open_positions = 0 open_notional = 0.0 @@ -62,27 +88,57 @@ class AccountProjection: mark = safe_float(slot.metadata.get("mark_price"), mark) open_notional += abs(slot.size) * abs(mark) unrealized_pnl += float(slot.unrealized_pnl or 0.0) - self.snapshot.open_positions = open_positions - self.snapshot.open_notional = open_notional - self.snapshot.unrealized_pnl = unrealized_pnl - self.snapshot.equity = self.snapshot.capital + unrealized_pnl + self._replace_snapshot( + open_positions=open_positions, + open_notional=open_notional, + unrealized_pnl=unrealized_pnl, + equity=self.snapshot.capital + unrealized_pnl if math.isfinite(self.snapshot.capital + unrealized_pnl) else self.snapshot.capital, + peak_capital=max(self.snapshot.peak_capital, self.snapshot.capital) if open_notional > 0 and self.snapshot.capital > 0 else self.snapshot.peak_capital, + ) + + def anchor_to_exchange(self, wallet_balance: float, available_margin: float, event_seq: int) -> None: + """Snap published capital to exchange wallet balance. + + The exchange is the ledger of record (E-anchored). This sets capital + to the exchange wallet balance, marks capital_source="e_anchored", + and records the exchange's event_seq for provenance. Between anchors + settle() bridges using capital_source="k_bridged". + Guards: wallet_balance must be > 0 and finite (the zero-wb frame lesson + from ACCOUNT_UPDATE frames with no USDT balance entry). + """ + wb = safe_float(wallet_balance, 0.0) + if wb <= 0.0 or not math.isfinite(wb): + return + self.snapshot.capital = wb + self.snapshot.e_wallet_balance = wb + self.snapshot.capital_source = "e_anchored" + self.snapshot.event_seq = int(event_seq) + self.snapshot.equity = wb + self.snapshot.unrealized_pnl if not math.isfinite(self.snapshot.equity): - self.snapshot.equity = self.snapshot.capital - if open_notional > 0 and self.snapshot.capital > 0: - self.snapshot.peak_capital = max(self.snapshot.peak_capital, self.snapshot.capital) + self.snapshot.equity = wb + self.snapshot.peak_capital = max(self.snapshot.peak_capital, wb) def settle(self, realized_pnl: float, fees: float = 0.0) -> None: - realized_pnl = safe_float(realized_pnl, 0.0) - new_capital = safe_float(self.snapshot.capital + realized_pnl, self.snapshot.capital) + rp = safe_float(realized_pnl, 0.0) + # Include fees in capital delta (today fees only accumulate in + # fees_paid while published capital ignores them between reseeds). + net = rp - safe_float(fees, 0.0) + new_capital = safe_float(self.snapshot.capital + net, self.snapshot.capital) if self.max_capital is not None: new_capital = min(new_capital, self.max_capital) new_capital = max(self.min_capital, new_capital) - self.snapshot.capital = new_capital - self.snapshot.realized_pnl += realized_pnl - self.snapshot.fees_paid += safe_float(fees, 0.0) - self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl - if not math.isfinite(self.snapshot.equity): - self.snapshot.equity = self.snapshot.capital + new_source = self.snapshot.capital_source + if new_source == "e_anchored" and abs(net) > 1e-12: + new_source = "k_bridged" + new_fees = self.snapshot.fees_paid + safe_float(fees, 0.0) + new_equity = new_capital + self.snapshot.unrealized_pnl + if not math.isfinite(new_equity): + new_equity = new_capital + self._replace_snapshot( + capital=new_capital, capital_source=new_source, + realized_pnl=self.snapshot.realized_pnl + rp, + fees_paid=new_fees, equity=new_equity, + ) def to_account_event( self, diff --git a/prod/clean_arch/persistence/pink_clickhouse.py b/prod/clean_arch/persistence/pink_clickhouse.py index 33ea2da..67a2853 100644 --- a/prod/clean_arch/persistence/pink_clickhouse.py +++ b/prod/clean_arch/persistence/pink_clickhouse.py @@ -32,6 +32,18 @@ Writer = Callable[[str, dict[str, Any]], None] _log = logging.getLogger(__name__) +def _naive_utc_ts(ts: Any) -> str: + """Emit naive-UTC microsecond ISO timestamp (no +00:00 suffix).""" + if hasattr(ts, "isoformat"): + raw = ts.isoformat(timespec="microseconds") + if raw.endswith("+00:00"): + raw = raw[:-6] + elif raw.endswith("Z"): + raw = raw[:-1] + return raw + return str(ts).replace("+00:00", "").replace("Z", "") + + def _json_safe(value: Any) -> Any: if isinstance(value, Enum): return value.value @@ -277,7 +289,7 @@ class PinkClickHousePersistence: ReplacingMergeTree on event_seq) keeps the latest row only. """ from datetime import datetime, timezone - ts_val = ts or datetime.now(timezone.utc).isoformat() + ts_val = _naive_utc_ts(ts) if ts is not None else str(datetime.now(timezone.utc).isoformat()).replace("+00:00", "") self._sink("reconcile_events", { "timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(), "runtime_namespace": self.config.runtime_namespace, @@ -290,6 +302,10 @@ class PinkClickHousePersistence: "explanation": str(explanation), }) + def _capital_source(self) -> str: + snap = self.account.snapshot + return str(getattr(snap, "capital_source", "") or "") + def _capital(self) -> float: return float(self.account.snapshot.capital or 0.0) @@ -681,7 +697,7 @@ class PinkClickHousePersistence: self._sink( "anomaly_events", { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "decision_id": decision.decision_id, "trade_id": intent.trade_id, "symbol": intent.asset, @@ -741,7 +757,7 @@ class PinkClickHousePersistence: *, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "", ) -> None: self._sink("anomaly_events", { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "decision_id": decision.decision_id, "trade_id": intent.trade_id, "symbol": intent.asset, @@ -758,7 +774,7 @@ class PinkClickHousePersistence: price = _safe_float(decision.reference_price, 0.0) quantity = _safe_float(intent.target_size, 0.0) row = { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "strategy": self.config.strategy, "runtime_namespace": self.config.runtime_namespace, "strategy_namespace": self.config.strategy_namespace, @@ -777,7 +793,7 @@ class PinkClickHousePersistence: "leverage": _safe_float(intent.leverage, 1.0), "bar_idx": 0, "decision_seq": self._trade_seq(), - "bars_held": int(intent.bars_held or 0), + "bars_held": max(0, int(intent.bars_held or 0)), "action": decision.action.value, "reason": decision.reason, "pnl_pct": 0.0, @@ -814,7 +830,7 @@ class PinkClickHousePersistence: open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0 drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap) row = { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "event_type": event_type or stage.value, "strategy": self.config.strategy, "posture": self._posture(slot_dict), @@ -843,6 +859,7 @@ class PinkClickHousePersistence: "reason": None if intent is None else intent.reason, "stage": stage.value, }), + "capital_source": self._capital_source(), # Phase 4: kernel atomic account versioning "account_event_seq": self._account_event_seq(), "reconcile_status": self._kernel_account().get("reconcile_status", "OK"), @@ -875,7 +892,7 @@ class PinkClickHousePersistence: asset = intent.asset side = intent.side row = { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "trade_id": trade_id, "asset": asset, "direction": _direction(side), @@ -909,7 +926,7 @@ class PinkClickHousePersistence: leverage = 0.0 if capital <= 0 else open_notional / capital drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap) row = { - "ts": snapshot.timestamp.isoformat(timespec="milliseconds"), + "ts": _naive_utc_ts(snapshot.timestamp), "capital": capital, "roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0, "dd_pct": drawdown * 100.0, @@ -933,6 +950,8 @@ class PinkClickHousePersistence: "remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional), "max_account_leverage": self.config.max_account_leverage, "ledger_authority": self.config.ledger_authority, + "capital_source": self._capital_source(), + "account_event_seq": self._account_event_seq(), } self._sink("status_snapshots", row) @@ -1002,7 +1021,7 @@ class PinkClickHousePersistence: exit_leg_id = f"{trade_id}:leg{leg_index}" self._sink("trade_exit_legs", { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "date": snapshot.timestamp.date().isoformat(), "strategy": self.config.strategy, "trade_id": trade_id, @@ -1031,7 +1050,8 @@ class PinkClickHousePersistence: "pnl_pct_leg": pnl_pct_leg, "pnl_leg": pnl_leg, "pnl_realized_total": cur_realized, - "bars_held": int(intent.bars_held or 0), + "pnl_source": "", # updated by FILL_SETTLED override (Phase 3) + "bars_held": max(0, int(intent.bars_held or 0)), # Gap 1/2/3: per-leg friction "fee_leg": fill_fee, "fee_source": fill_fee_source, @@ -1084,7 +1104,7 @@ class PinkClickHousePersistence: conviction = float(intent.confidence or decision.confidence or 0.0) metadata = intent.metadata if intent is not None else (decision.metadata if decision is not None else {}) row = { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "date": snapshot.timestamp.date().isoformat(), "strategy": self.config.strategy, "trade_id": intent.trade_id, @@ -1095,6 +1115,7 @@ class PinkClickHousePersistence: "quantity": quantity, "pnl": pnl, "pnl_pct": pnl_pct, + "pnl_source": "", # updated by FILL_SETTLED override (Phase 3) "exit_reason": intent.reason, "vel_div_entry": float(decision.velocity_divergence or 0.0), "boost_at_entry": 1.0, @@ -1125,7 +1146,7 @@ class PinkClickHousePersistence: "drawdown_at_entry": 0.0 if self._peak_capital() <= 0 else max(0.0, (self._peak_capital() - capital_before) / self._peak_capital()), "open_positions_count": 0, "scan_uuid": decision.decision_id, - "bars_held": int(intent.bars_held or 0), + "bars_held": max(0, int(intent.bars_held or 0)), "entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}), "exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}), "execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}), @@ -1156,7 +1177,7 @@ class PinkClickHousePersistence: market_state: Mapping[str, Any] | None = None, ) -> None: self._sink("trade_reconstruction", { - "ts": snapshot.timestamp.isoformat(), + "ts": _naive_utc_ts(snapshot.timestamp), "trade_id": trade_id, "event_type": event_type, "event_id": event_id, diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 5708094..312ade3 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -259,13 +259,13 @@ def _reconcile_position_slot( # No open positions — ensure slot is idle kernel.reconcile_from_slots([]) - # Seed capital once from exchange balance. + # Seed capital once from exchange balance — E-anchored. if exchange_balance_capital > 0: - kernel.account.snapshot.capital = exchange_balance_capital - kernel.account.snapshot.peak_capital = max( - kernel.account.snapshot.peak_capital, exchange_balance_capital + kernel.account.anchor_to_exchange( + wallet_balance=exchange_balance_capital, + available_margin=exchange_balance_capital, + event_seq=0, ) - kernel.account.snapshot.equity = exchange_balance_capital @dataclass @@ -1453,9 +1453,11 @@ class PinkDirectRuntime: def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None: """Close-out detection → feed realized PnL into the alpha layers. - Capital-delta PnL (net of fees) — the kernel's capital is the - authoritative ledger, and bucket/streak multipliers only need the - sign and rough magnitude. + PnL is sourced from the closing slot's realized_pnl (kernel estimate, + overridden by exchange FILL_SETTLED when available) — NOT the capital + delta, which absorbs funding, fees of other activity, and foreign fills + from the shared VST account (PRODGREEN collision class). + Bucket/streak multipliers only need sign and rough magnitude. """ if self.alpha_sizer is None or not self._sizer_open_tid: return @@ -1467,11 +1469,17 @@ class PinkDirectRuntime: ) if still_open: return - pnl = float(acc.get("capital") or 0.0) - self._sizer_entry_capital + # Phase 3: slot.realized_pnl is the trade's own PnL (no capital-delta + # contamination from funding, foreign fills, or other-activity fees). + pnl = float(slot_dict.get("realized_pnl") or 0.0) + # Subtract accumulated fees when available (fees_paid on slot metadata) + fees = float(slot_dict.get("fees_paid", 0.0) or slot_dict.get("metadata", {}).get("fees_paid", 0.0) or 0.0) + pnl = pnl - fees self._sizer_open_tid = "" try: self.alpha_sizer.record_close(pnl) - self.logger.info("alpha sizer feedback: trade closed pnl=%.4f", pnl) + self.logger.info("alpha sizer feedback: trade closed pnl=%.4f (rp=%.4f fees=%.4f)", pnl, + float(slot_dict.get("realized_pnl") or 0.0), fees) except Exception: pass diff --git a/prod/clickhouse/pink/08_provenance.sql b/prod/clickhouse/pink/08_provenance.sql new file mode 100644 index 0000000..2094ce4 --- /dev/null +++ b/prod/clickhouse/pink/08_provenance.sql @@ -0,0 +1,17 @@ +-- Phase 2: provenance columns (E-anchored capital, PnL source) +-- Apply BEFORE deploying code that emits these fields. +-- ALTER TABLE ... ADD COLUMN IF NOT EXISTS is idempotent. + +ALTER TABLE dolphin_pink.account_events + ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '', + ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0; + +ALTER TABLE dolphin_pink.status_snapshots + ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '', + ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0; + +ALTER TABLE dolphin_pink.trade_events + ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT ''; + +ALTER TABLE dolphin_pink.trade_exit_legs + ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT '';