PINK Phases 1-4: E-anchored capital, atomic snapshot, sizer feedback, kernel hardening

Phase 1: account.py anchor_to_exchange, capital_source provenance, settle
includes fees in capital delta.
Phase 2: atomic snapshot swap, CH provenance DDL (08_provenance.sql),
naive-UTC timestamps, ch_writer wait_for_async_insert=1 for all tables,
head-of-line stuck-row logging at WARNING per 100 attempts.
Phase 3: sizer feedback uses slot realized_pnl (not capital delta),
FILL_SETTLED repairs slot-level PnL for price-less exit legs.
Phase 4: resolve_slot returns Option<usize>, UNRESOLVED_SLOT diagnostic.
bars_held clamped to max(0, ...) at row-build time.
This commit is contained in:
Codex
2026-06-11 21:44:24 +02:00
parent 2c9da8f592
commit d280407327
6 changed files with 793 additions and 45 deletions

593
prod/ch_writer.py Normal file
View File

@@ -0,0 +1,593 @@
"""
ch_writer.py — Dolphin ClickHouse writer with durable local spool.
All inserts are fire-and-forget from the caller's perspective.
Rows are first written to a local SQLite spool on durable storage and
then replayed to ClickHouse in batches by a background thread.
This makes ClickHouse the historical sink instead of a best-effort cache:
- live services do not block on CH availability
- rows survive process restarts
- CH outages no longer drop history silently
Usage:
from ch_writer import ch_put
ch_put("eigen_scans", {"ts": int(time.time() * 1e6), "scan_number": n, ...})
Environment overrides:
CH_URL — default: http://localhost:8123
CH_USER — default: dolphin
CH_PASS — default: dolphin_ch_2026
CH_DB — default: dolphin
CH_SPOOL_DIR — default: /mnt/dolphin_training/ch_spool
CH_BATCH_SIZE — default: 250
CH_FLUSH_INTERVAL_S — default: 1.0
CH_MAINTENANCE_INTERVAL_S — default: 300
CH_WAL_TRUNCATE_BYTES — default: 67108864 (64 MiB)
CH_VACUUM_MIN_BYTES — default: 536870912 (512 MiB)
CH_VACUUM_MIN_FREE_RATIO — default: 1.25
CH_VACUUM_MIN_FREE_BYTES — default: 134217728 (128 MiB)
"""
from __future__ import annotations
import json
import logging
import os
import random
import shutil
import sqlite3
import struct
import threading
import time
import urllib.parse
import urllib.request
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, Iterable, List, Tuple
log = logging.getLogger("ch_writer")
CH_URL = os.environ.get("CH_URL", "http://localhost:8123")
CH_USER = os.environ.get("CH_USER", "dolphin")
CH_PASS = os.environ.get("CH_PASS", "dolphin_ch_2026")
CH_DB = os.environ.get("CH_DB", "dolphin")
CH_SPOOL_DIR = Path(os.environ.get("CH_SPOOL_DIR", "/mnt/dolphin_training/ch_spool"))
CH_BATCH_SIZE = int(os.environ.get("CH_BATCH_SIZE", "250"))
CH_FLUSH_INTERVAL_S = float(os.environ.get("CH_FLUSH_INTERVAL_S", "1.0"))
CH_MAINTENANCE_INTERVAL_S = float(os.environ.get("CH_MAINTENANCE_INTERVAL_S", "300"))
CH_WAL_TRUNCATE_BYTES = int(os.environ.get("CH_WAL_TRUNCATE_BYTES", str(64 * 1024 * 1024)))
CH_VACUUM_MIN_BYTES = int(os.environ.get("CH_VACUUM_MIN_BYTES", str(512 * 1024 * 1024)))
CH_VACUUM_MIN_FREE_RATIO = float(os.environ.get("CH_VACUUM_MIN_FREE_RATIO", "1.25"))
CH_VACUUM_MIN_FREE_BYTES = int(os.environ.get("CH_VACUUM_MIN_FREE_BYTES", str(128 * 1024 * 1024)))
# ─── Timestamp helpers ────────────────────────────────────────────────────────
def ts_us() -> int:
"""Current UTC time as microseconds — for DateTime64(6) fields."""
return int(time.time() * 1_000_000)
def ts_ms() -> int:
"""Current UTC time as milliseconds — for DateTime64(3) fields."""
return int(time.time() * 1_000)
# ─── UUIDv7 — time-ordered distributed trace ID ───────────────────────────────
def uuid7() -> str:
"""
Generate a UUIDv7 — RFC 9562 time-ordered UUID.
"""
ts_ms_val = int(time.time() * 1_000)
rand_a = random.getrandbits(12)
rand_b = random.getrandbits(62)
hi = (ts_ms_val << 16) | 0x7000 | rand_a
lo = (0b10 << 62) | rand_b
b = struct.pack(">QQ", hi, lo)
return (
f"{b[0:4].hex()}-{b[4:6].hex()}-"
f"{b[6:8].hex()}-{b[8:10].hex()}-{b[10:16].hex()}"
)
def _ensure_dir(path: Path) -> Path:
path.mkdir(parents=True, exist_ok=True)
return path
def _resolve_spool_dir(db: str) -> Path:
"""
Choose a durable spool directory.
Prefer the configured CH spool root; if it is unavailable, fall back to /tmp.
"""
roots = [CH_SPOOL_DIR, Path("/tmp") / "dolphin_ch_spool"]
for root in roots:
try:
path = _ensure_dir(root / db)
test = path / ".write_test"
test.write_text("ok")
test.unlink(missing_ok=True)
return path
except Exception:
continue
# Last resort: current working directory. This should be rare.
return _ensure_dir(Path.cwd() / "ch_spool" / db)
class _CHWriter:
"""
Durable ClickHouse writer.
Ingress:
- put() synchronously appends a row to a local SQLite spool.
- the caller never blocks on network IO.
Egress:
- a background thread drains unsent rows to ClickHouse.
- rows are deleted from the spool only after a successful CH insert.
"""
def __init__(
self,
flush_interval_s: float = CH_FLUSH_INTERVAL_S,
batch_size: int = CH_BATCH_SIZE,
db: str = CH_DB,
start_thread: bool = True,
maintenance_interval_s: float = CH_MAINTENANCE_INTERVAL_S,
wal_truncate_bytes: int = CH_WAL_TRUNCATE_BYTES,
vacuum_min_bytes: int = CH_VACUUM_MIN_BYTES,
vacuum_min_free_ratio: float = CH_VACUUM_MIN_FREE_RATIO,
vacuum_min_free_bytes: int = CH_VACUUM_MIN_FREE_BYTES,
) -> None:
self._interval = max(0.1, float(flush_interval_s))
self._batch_size = max(1, int(batch_size))
self._db = db
self._spool_dir = _resolve_spool_dir(db)
self._db_path = self._spool_dir / "writer.sqlite3"
self._lock = threading.RLock()
self._wake = threading.Event()
self._stop = False
self._dropped = 0
self._failed_local_writes = 0
self._flush_errors = 0
self._maintenance_interval_s = max(0.0, float(maintenance_interval_s))
self._wal_truncate_bytes = max(0, int(wal_truncate_bytes))
self._vacuum_min_bytes = max(0, int(vacuum_min_bytes))
self._vacuum_min_free_ratio = max(0.0, float(vacuum_min_free_ratio))
self._vacuum_min_free_bytes = max(0, int(vacuum_min_free_bytes))
self._last_maintenance_ts = 0.0
self._conn = self._open_db()
self._t = threading.Thread(
target=self._run, daemon=True, name=f"ch-writer-{db}"
)
if start_thread:
self._t.start()
log.info(
"ch_writer[%s]: durable spool at %s (batch=%d interval=%.1fs)",
db,
self._db_path,
self._batch_size,
self._interval,
)
def _open_db(self) -> sqlite3.Connection:
conn = sqlite3.connect(
self._db_path,
timeout=0.05, # 50ms — fail fast so put() never stalls the main scan loop
isolation_level=None,
check_same_thread=False,
)
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;")
conn.execute("PRAGMA temp_store=MEMORY;")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS queue (
id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
payload TEXT NOT NULL,
created_ts_us INTEGER NOT NULL,
attempts INTEGER NOT NULL DEFAULT 0
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_queue_id ON queue(id)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_queue_table ON queue(table_name, id)"
)
return conn
_PUT_LOCK_TIMEOUT_S: float = 0.1 # max wait before dropping the row
def put(self, table: str, row: dict) -> None:
"""Persist a row locally and wake the replay loop."""
try:
payload = json.dumps(row, separators=(",", ":"), ensure_ascii=False)
if not self._lock.acquire(timeout=self._PUT_LOCK_TIMEOUT_S):
self._dropped += 1
log.warning(
"ch_writer[%s]: spool lock busy, dropping telemetry row "
"(total_dropped=%d table=%s) — background thread may be in VACUUM/WAL",
self._db,
self._dropped,
table,
)
return
try:
self._conn.execute(
"INSERT INTO queue(table_name, payload, created_ts_us) VALUES (?, ?, ?)",
(table, payload, ts_us()),
)
finally:
self._lock.release()
self._wake.set()
except Exception as exc:
self._failed_local_writes += 1
log.exception(
"ch_writer[%s]: local spool write failed (%d): %s",
self._db,
self._failed_local_writes,
exc,
)
def _fetch_batch(self) -> List[Tuple[int, str, dict]]:
with self._lock:
cur = self._conn.execute(
"SELECT id, table_name, payload FROM queue ORDER BY id LIMIT ?",
(self._batch_size,),
)
rows = cur.fetchall()
batch: List[Tuple[int, str, dict]] = []
for row_id, table_name, payload in rows:
try:
batch.append((int(row_id), str(table_name), json.loads(payload)))
except Exception as exc:
log.exception(
"ch_writer[%s]: invalid spool row id=%s skipped: %s",
self._db,
row_id,
exc,
)
with self._lock:
self._conn.execute("DELETE FROM queue WHERE id=?", (row_id,))
return batch
def _delete_ids(self, ids: Iterable[int]) -> None:
ids = list(ids)
if not ids:
return
with self._lock:
self._conn.executemany(
"DELETE FROM queue WHERE id=?",
[(int(_id),) for _id in ids],
)
def _bump_attempts(self, ids: Iterable[int]) -> None:
ids = list(ids)
if not ids:
return
with self._lock:
cur = self._conn.execute(
"SELECT id, attempts FROM queue WHERE id IN (%s)" % ",".join("?" for _ in ids),
[int(_id) for _ in ids],
)
high_attempts = [(row[0], int(row[1]) + 1) for row in cur.fetchall() if int(row[1]) >= 1000]
self._conn.executemany(
"UPDATE queue SET attempts = attempts + 1 WHERE id=?",
[(int(_id),) for _id in ids],
)
for row_id, attempt in high_attempts:
if attempt % 100 == 0:
log.warning(
"CH flush: row id=%s stuck after %d attempts — "
"potential head-of-line blocking (table may be missing or schema changed)",
row_id, attempt,
)
def _queue_count(self) -> int:
with self._lock:
row = self._conn.execute("SELECT count(*) FROM queue").fetchone()
return int(row[0]) if row else 0
def _file_size(self, path: Path) -> int:
try:
return path.stat().st_size
except FileNotFoundError:
return 0
except Exception as exc:
log.debug("ch_writer[%s]: size probe failed for %s: %s", self._db, path, exc)
return 0
def _checkpoint_wal(self) -> bool:
# Uses a dedicated connection so WAL checkpoint does not hold self._lock.
try:
ckpt_conn = sqlite3.connect(
self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
)
try:
ckpt_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
finally:
ckpt_conn.close()
return True
except Exception as exc:
log.warning("ch_writer[%s]: WAL checkpoint/truncate failed: %s", self._db, exc)
return False
def _has_vacuum_free_space(self, db_size: int) -> bool:
try:
usage = shutil.disk_usage(self._spool_dir)
except Exception as exc:
log.warning("ch_writer[%s]: vacuum free-space probe failed: %s", self._db, exc)
return False
required = int(db_size * self._vacuum_min_free_ratio) + self._vacuum_min_free_bytes
if usage.free < required:
log.warning(
"ch_writer[%s]: vacuum skipped; free=%d required=%d db_size=%d",
self._db,
usage.free,
required,
db_size,
)
return False
return True
def _vacuum_spool(self) -> bool:
db_size = self._file_size(self._db_path)
if db_size < self._vacuum_min_bytes:
return False
if self._queue_count() != 0:
return False
if not self._has_vacuum_free_space(db_size):
return False
try:
before = db_size + self._file_size(Path(str(self._db_path) + "-wal"))
# Open a separate connection so VACUUM does not hold self._lock.
# VACUUM in WAL mode is safe to run from a second connection while
# the main connection is idle (queue_count==0 already confirmed above).
vacuum_conn = sqlite3.connect(
self._db_path, timeout=5.0, isolation_level=None, check_same_thread=False
)
try:
vacuum_conn.execute("VACUUM;")
vacuum_conn.execute("PRAGMA wal_checkpoint(TRUNCATE);").fetchall()
finally:
vacuum_conn.close()
after = self._file_size(self._db_path) + self._file_size(Path(str(self._db_path) + "-wal"))
log.info(
"ch_writer[%s]: vacuumed empty spool bytes=%d->%d",
self._db,
before,
after,
)
return True
except Exception as exc:
log.warning("ch_writer[%s]: vacuum failed: %s", self._db, exc)
return False
def _maybe_maintain_spool(self, *, force: bool = False) -> dict:
now = time.time()
status = {"ran": False, "checkpointed": False, "vacuumed": False}
if not force and self._maintenance_interval_s > 0:
if now - self._last_maintenance_ts < self._maintenance_interval_s:
return status
self._last_maintenance_ts = now
try:
if self._queue_count() != 0:
return status
status["ran"] = True
wal_path = Path(str(self._db_path) + "-wal")
if self._file_size(wal_path) >= self._wal_truncate_bytes:
status["checkpointed"] = self._checkpoint_wal()
status["vacuumed"] = self._vacuum_spool()
except Exception as exc:
log.warning("ch_writer[%s]: spool maintenance failed: %s", self._db, exc)
return status
def _post_rows(self, table: str, rows: List[dict]) -> bool:
body = "\n".join(json.dumps(r, separators=(",", ":"), ensure_ascii=False) for r in rows).encode()
wait_for_async_insert = "1" # Phase 2: ALL accounting tables must be durable (loss prevention)
url = (
f"{CH_URL}/?database={self._db}"
f"&query=INSERT+INTO+{table}+FORMAT+JSONEachRow"
f"&async_insert=1&wait_for_async_insert={wait_for_async_insert}"
# best_effort: some writers emit timezone-aware isoformat ts
# ("…+00:00"); strict parsing rejects those rows forever and
# head-of-line-blocks the spool (2026-06-11 disk-fill incident).
f"&date_time_input_format=best_effort"
)
req = urllib.request.Request(url, data=body, method="POST")
req.add_header("X-ClickHouse-User", CH_USER)
req.add_header("X-ClickHouse-Key", CH_PASS)
req.add_header("Content-Type", "application/octet-stream")
try:
with urllib.request.urlopen(req, timeout=5) as resp:
return resp.status in (200, 201)
except Exception as exc:
# WARNING, not DEBUG: a permanently failing table head-of-line
# blocks the whole spool; at DEBUG that ran silent for 17 days
# (2026-05-25 → 06-11) while the WAL grew to 11.7 GB.
self._flush_errors += 1
if self._flush_errors <= 10 or self._flush_errors % 100 == 0:
log.warning(
"CH flush error [%s] (count=%d): %s", table, self._flush_errors, exc
)
else:
log.debug("CH flush error [%s]: %s", table, exc)
return False
def _query_lines(self, sql: str, timeout_s: float = 5.0) -> List[str]:
encoded = urllib.parse.quote_plus(sql)
url = f"{CH_URL}/?database={self._db}&query={encoded}"
req = urllib.request.Request(url, method="POST")
req.add_header("X-ClickHouse-User", CH_USER)
req.add_header("X-ClickHouse-Key", CH_PASS)
req.add_header("Content-Type", "text/plain; charset=utf-8")
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode("utf-8", errors="replace")
return [line for line in raw.splitlines() if line]
def _existing_trade_keys(self, rows: List[dict]) -> set[Tuple[str, int]]:
trade_ids: List[str] = []
for row in rows:
trade_id = row.get("trade_id")
if trade_id is None:
continue
tid = str(trade_id).strip()
if tid:
trade_ids.append(tid)
if not trade_ids:
return set()
unique = sorted(set(trade_ids))
existing: set[Tuple[str, int]] = set()
chunk_size = 200
for i in range(0, len(unique), chunk_size):
chunk = unique[i : i + chunk_size]
quoted = ",".join("'" + tid.replace("'", "''") + "'" for tid in chunk)
sql = (
"SELECT trade_id, toInt64(toUnixTimestamp64Micro(ts)) "
f"FROM trade_events WHERE trade_id IN ({quoted}) FORMAT TSV"
)
try:
lines = self._query_lines(sql)
except Exception as exc:
log.debug("CH dedupe probe failed [trade_events]: %s", exc)
# Fail open to keep trading sink non-blocking.
return set()
for line in lines:
parts = line.split("\t", 1)
if len(parts) != 2:
continue
tid, ts_us_s = parts
try:
existing.add((tid, int(ts_us_s)))
except Exception:
continue
return existing
def flush_once(self) -> int:
"""
Drain a single batch from the local spool.
Returns the number of rows successfully delivered to ClickHouse.
"""
batch = self._fetch_batch()
if not batch:
self._maybe_maintain_spool()
return 0
grouped: Dict[str, List[Tuple[int, dict]]] = defaultdict(list)
for row_id, table_name, payload in batch:
grouped[table_name].append((row_id, payload))
delivered = 0
for table, items in grouped.items():
ids = [row_id for row_id, _ in items]
rows = [payload for _, payload in items]
if table == "trade_events":
existing = self._existing_trade_keys(rows)
if existing:
kept_ids: List[int] = []
kept_rows: List[dict] = []
duplicate_ids: List[int] = []
for row_id, payload in items:
tid = str(payload.get("trade_id", "")).strip()
try:
ts_us_val = int(payload.get("ts"))
except Exception:
ts_us_val = -1
if tid and ts_us_val >= 0 and (tid, ts_us_val) in existing:
duplicate_ids.append(row_id)
else:
kept_ids.append(row_id)
kept_rows.append(payload)
if duplicate_ids:
self._delete_ids(duplicate_ids)
log.warning(
"ch_writer[%s]: dropped %d duplicate trade_events rows by trade_id",
self._db,
len(duplicate_ids),
)
ids = kept_ids
rows = kept_rows
if not rows:
continue
ok = self._post_rows(table, rows)
if ok:
delivered += len(rows)
self._delete_ids(ids)
else:
self._bump_attempts(ids)
self._maybe_maintain_spool()
return delivered
def _run(self) -> None:
while not self._stop:
delivered = 0
try:
delivered = self.flush_once()
except Exception as exc:
log.debug("ch_writer[%s]: flush loop error: %s", self._db, exc)
if delivered <= 0:
self._wake.wait(timeout=self._interval)
self._wake.clear()
# ─── Module-level singletons ─────────────────────────────────────────────────
_writer = _CHWriter(db="dolphin")
_writer_green = _CHWriter(db="dolphin_green")
_writer_prodgreen = _CHWriter(db="dolphin_prodgreen")
_writer_pink = _CHWriter(db="dolphin_pink")
def ch_put(table: str, row: dict) -> None:
"""
Fire-and-forget insert into dolphin.<table> (BLUE environment).
"""
_writer.put(table, row)
def ch_put_green(table: str, row: dict) -> None:
"""
Fire-and-forget insert into dolphin_green.<table> (GREEN / NT TradingNode environment).
"""
_writer_green.put(table, row)
def ch_put_prodgreen(table: str, row: dict) -> None:
"""
Fire-and-forget insert into dolphin_prodgreen.<table> (PRODGREEN / Nautilus live).
"""
_writer_prodgreen.put(table, row)
def ch_put_pink(table: str, row: dict) -> None:
"""
Fire-and-forget insert into dolphin_pink.<table> (PINK / testnet execution).
"""
_writer_pink.put(table, row)
# ─── V7 decision journal (PINK §10 data volume / §37 routing) ────────────────
PINK_V7_JOURNAL_DB = "dolphin_pink"
V7_DECISION_TABLE = "v7_decision_events"
_writer_pink_v7 = _CHWriter(db=PINK_V7_JOURNAL_DB)
def ch_put_pink_v7(table: str, row: dict) -> None:
"""Fire-and-forget V7 decision event insert into dolphin_pink."""
_writer_pink_v7.put(table, row)
# ─── Data volume budgets (§10.3) ─────────────────────────────────────────────
PINK_CH_BUDGET_BYTES_DAY = 50 * 1024 * 1024 # 50 MB/day max
PINK_HZ_BUDGET_BYTES_DAY = 500 * 1024 * 1024 # 500 MB/day max

View File

@@ -1029,6 +1029,26 @@ impl KernelCore {
let realized = parsed.get("realized_pnl").and_then(|v| v.as_f64()).unwrap_or(0.0);
let fee = parsed.get("fee").and_then(|v| v.as_f64()).unwrap_or(0.0);
let is_maker = parsed.get("is_maker").and_then(|v| v.as_bool()).unwrap_or(false);
// Phase 3.2: Slot-level PnL repair. If a slot_id is provided
// and the slot has realized_skipped_no_price flag (price-less exit
// fill that booked 0 PnL), ADD the exchange's realized to the slot's
// realized_pnl and clear the flag.
if let Some(slot_id) = parsed.get("slot_id").and_then(|v| v.as_u64()) {
let sid = slot_id as usize;
if sid < self.slots.len() && !self.slots[sid].closed {
let was_skipped = self.slots[sid].metadata
.get("realized_skipped_no_price")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if was_skipped && realized.is_finite() && realized != 0.0 {
self.slots[sid].realized_pnl += realized;
self.slots[sid].metadata.insert(
"realized_skipped_no_price".to_string(),
Value::Bool(false),
);
}
}
}
self.account.apply_fill_settled(realized, fee, is_maker);
}
"ACCOUNT_UPDATE" => {
@@ -1107,24 +1127,24 @@ impl KernelCore {
}
}
fn resolve_slot(&self, event: &VenueEvent) -> usize {
fn resolve_slot(&self, event: &VenueEvent) -> Option<usize> {
let slot_id = event.slot_id;
if slot_id >= 0 {
let slot_id = slot_id as usize;
if slot_id < self.slots.len() {
return slot_id;
return Some(slot_id);
}
}
if let Some(slot_id) = self.active_trade_index.get(&event.trade_id) {
return *slot_id;
return Some(*slot_id);
}
if let Some(slot_id) = self.venue_order_index.get(&event.venue_order_id) {
return *slot_id;
return Some(*slot_id);
}
if let Some(slot_id) = self.client_order_index.get(&event.venue_client_id) {
return *slot_id;
return Some(*slot_id);
}
self.slots.first().map(|slot| slot.slot_id).unwrap_or(0)
None
}
fn transition(
@@ -1571,7 +1591,40 @@ impl KernelCore {
control_mode: &str,
control_verbosity: &str,
) -> KernelResult {
let slot_id = self.resolve_slot(&event);
let slot_id = match self.resolve_slot(&event) {
Some(id) => id,
None => {
// No matching slot for this venue event — log via detail diagnostic
// and return the current slot 0 state (KernelResult requires a slot
// and snapshot slot; UNRESOLVED_SLOT is a WARNING-level no-op).
let fallback_slot = if self.slots.is_empty() {
TradeSlot::default()
} else {
self.slots[0].clone()
};
let snap = self.snapshot();
return KernelResult {
outcome: KernelOutcome {
accepted: true,
slot_id: 0,
trade_id: "".to_string(),
state: fallback_slot.fsm_state.clone(),
diagnostic_code: KernelDiagnosticCode::UNRESOLVED_SLOT,
severity: KernelSeverity::WARNING,
transitions: vec![],
emitted_events: vec![],
details: json!({
"event_kind": event.kind,
"reason": "UNRESOLVED_SLOT",
"trade_id": event.trade_id,
"venue_order_id": event.venue_order_id,
}).as_object().cloned().unwrap_or_default(),
},
slot: fallback_slot,
snapshot: snap,
};
}
};
let mut slot = self.slots[slot_id].clone();
if !event.event_id.is_empty() && slot.seen_event_ids.iter().any(|seen| seen == &event.event_id) {

View File

@@ -26,6 +26,10 @@ class AccountSnapshot:
fees_paid: float = 0.0
trade_seq: int = 0
peak_capital: float = 0.0
# E-anchored provenance (Phase 1): "seed" | "e_anchored" | "k_bridged"
capital_source: str = "seed"
e_wallet_balance: float = 0.0
event_seq: int = 0
@property
def leverage(self) -> float:
@@ -49,6 +53,28 @@ class AccountProjection:
max_capital: Optional[float] = None
snapshot: AccountSnapshot = field(default_factory=lambda: AccountSnapshot(capital=25_000.0, equity=25_000.0))
def _replace_snapshot(self, **kw: Any) -> None:
"""Atomic snapshot swap: replace self.snapshot with a new frozen AccountSnapshot.
GIL guarantees single-field reference assignment is atomic, so readers
that hold snap = kernel.account.snapshot before use see a consistent view.
"""
cur = self.snapshot
self.snapshot = AccountSnapshot(
capital=kw.get("capital", cur.capital),
equity=kw.get("equity", cur.equity),
realized_pnl=kw.get("realized_pnl", cur.realized_pnl),
unrealized_pnl=kw.get("unrealized_pnl", cur.unrealized_pnl),
open_positions=kw.get("open_positions", cur.open_positions),
open_notional=kw.get("open_notional", cur.open_notional),
fees_paid=kw.get("fees_paid", cur.fees_paid),
trade_seq=kw.get("trade_seq", cur.trade_seq),
peak_capital=kw.get("peak_capital", cur.peak_capital),
capital_source=kw.get("capital_source", cur.capital_source),
e_wallet_balance=kw.get("e_wallet_balance", cur.e_wallet_balance),
event_seq=kw.get("event_seq", cur.event_seq),
)
def observe_slots(self, slots: Iterable[TradeSlot]) -> None:
open_positions = 0
open_notional = 0.0
@@ -62,27 +88,57 @@ class AccountProjection:
mark = safe_float(slot.metadata.get("mark_price"), mark)
open_notional += abs(slot.size) * abs(mark)
unrealized_pnl += float(slot.unrealized_pnl or 0.0)
self.snapshot.open_positions = open_positions
self.snapshot.open_notional = open_notional
self.snapshot.unrealized_pnl = unrealized_pnl
self.snapshot.equity = self.snapshot.capital + unrealized_pnl
self._replace_snapshot(
open_positions=open_positions,
open_notional=open_notional,
unrealized_pnl=unrealized_pnl,
equity=self.snapshot.capital + unrealized_pnl if math.isfinite(self.snapshot.capital + unrealized_pnl) else self.snapshot.capital,
peak_capital=max(self.snapshot.peak_capital, self.snapshot.capital) if open_notional > 0 and self.snapshot.capital > 0 else self.snapshot.peak_capital,
)
def anchor_to_exchange(self, wallet_balance: float, available_margin: float, event_seq: int) -> None:
"""Snap published capital to exchange wallet balance.
The exchange is the ledger of record (E-anchored). This sets capital
to the exchange wallet balance, marks capital_source="e_anchored",
and records the exchange's event_seq for provenance. Between anchors
settle() bridges using capital_source="k_bridged".
Guards: wallet_balance must be > 0 and finite (the zero-wb frame lesson
from ACCOUNT_UPDATE frames with no USDT balance entry).
"""
wb = safe_float(wallet_balance, 0.0)
if wb <= 0.0 or not math.isfinite(wb):
return
self.snapshot.capital = wb
self.snapshot.e_wallet_balance = wb
self.snapshot.capital_source = "e_anchored"
self.snapshot.event_seq = int(event_seq)
self.snapshot.equity = wb + self.snapshot.unrealized_pnl
if not math.isfinite(self.snapshot.equity):
self.snapshot.equity = self.snapshot.capital
if open_notional > 0 and self.snapshot.capital > 0:
self.snapshot.peak_capital = max(self.snapshot.peak_capital, self.snapshot.capital)
self.snapshot.equity = wb
self.snapshot.peak_capital = max(self.snapshot.peak_capital, wb)
def settle(self, realized_pnl: float, fees: float = 0.0) -> None:
realized_pnl = safe_float(realized_pnl, 0.0)
new_capital = safe_float(self.snapshot.capital + realized_pnl, self.snapshot.capital)
rp = safe_float(realized_pnl, 0.0)
# Include fees in capital delta (today fees only accumulate in
# fees_paid while published capital ignores them between reseeds).
net = rp - safe_float(fees, 0.0)
new_capital = safe_float(self.snapshot.capital + net, self.snapshot.capital)
if self.max_capital is not None:
new_capital = min(new_capital, self.max_capital)
new_capital = max(self.min_capital, new_capital)
self.snapshot.capital = new_capital
self.snapshot.realized_pnl += realized_pnl
self.snapshot.fees_paid += safe_float(fees, 0.0)
self.snapshot.equity = self.snapshot.capital + self.snapshot.unrealized_pnl
if not math.isfinite(self.snapshot.equity):
self.snapshot.equity = self.snapshot.capital
new_source = self.snapshot.capital_source
if new_source == "e_anchored" and abs(net) > 1e-12:
new_source = "k_bridged"
new_fees = self.snapshot.fees_paid + safe_float(fees, 0.0)
new_equity = new_capital + self.snapshot.unrealized_pnl
if not math.isfinite(new_equity):
new_equity = new_capital
self._replace_snapshot(
capital=new_capital, capital_source=new_source,
realized_pnl=self.snapshot.realized_pnl + rp,
fees_paid=new_fees, equity=new_equity,
)
def to_account_event(
self,

View File

@@ -32,6 +32,18 @@ Writer = Callable[[str, dict[str, Any]], None]
_log = logging.getLogger(__name__)
def _naive_utc_ts(ts: Any) -> str:
"""Emit naive-UTC microsecond ISO timestamp (no +00:00 suffix)."""
if hasattr(ts, "isoformat"):
raw = ts.isoformat(timespec="microseconds")
if raw.endswith("+00:00"):
raw = raw[:-6]
elif raw.endswith("Z"):
raw = raw[:-1]
return raw
return str(ts).replace("+00:00", "").replace("Z", "")
def _json_safe(value: Any) -> Any:
if isinstance(value, Enum):
return value.value
@@ -277,7 +289,7 @@ class PinkClickHousePersistence:
ReplacingMergeTree on event_seq) keeps the latest row only.
"""
from datetime import datetime, timezone
ts_val = ts or datetime.now(timezone.utc).isoformat()
ts_val = _naive_utc_ts(ts) if ts is not None else str(datetime.now(timezone.utc).isoformat()).replace("+00:00", "")
self._sink("reconcile_events", {
"timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(),
"runtime_namespace": self.config.runtime_namespace,
@@ -290,6 +302,10 @@ class PinkClickHousePersistence:
"explanation": str(explanation),
})
def _capital_source(self) -> str:
snap = self.account.snapshot
return str(getattr(snap, "capital_source", "") or "")
def _capital(self) -> float:
return float(self.account.snapshot.capital or 0.0)
@@ -681,7 +697,7 @@ class PinkClickHousePersistence:
self._sink(
"anomaly_events",
{
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
@@ -741,7 +757,7 @@ class PinkClickHousePersistence:
*, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "",
) -> None:
self._sink("anomaly_events", {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
@@ -758,7 +774,7 @@ class PinkClickHousePersistence:
price = _safe_float(decision.reference_price, 0.0)
quantity = _safe_float(intent.target_size, 0.0)
row = {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"strategy": self.config.strategy,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
@@ -777,7 +793,7 @@ class PinkClickHousePersistence:
"leverage": _safe_float(intent.leverage, 1.0),
"bar_idx": 0,
"decision_seq": self._trade_seq(),
"bars_held": int(intent.bars_held or 0),
"bars_held": max(0, int(intent.bars_held or 0)),
"action": decision.action.value,
"reason": decision.reason,
"pnl_pct": 0.0,
@@ -814,7 +830,7 @@ class PinkClickHousePersistence:
open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"event_type": event_type or stage.value,
"strategy": self.config.strategy,
"posture": self._posture(slot_dict),
@@ -843,6 +859,7 @@ class PinkClickHousePersistence:
"reason": None if intent is None else intent.reason,
"stage": stage.value,
}),
"capital_source": self._capital_source(),
# Phase 4: kernel atomic account versioning
"account_event_seq": self._account_event_seq(),
"reconcile_status": self._kernel_account().get("reconcile_status", "OK"),
@@ -875,7 +892,7 @@ class PinkClickHousePersistence:
asset = intent.asset
side = intent.side
row = {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"trade_id": trade_id,
"asset": asset,
"direction": _direction(side),
@@ -909,7 +926,7 @@ class PinkClickHousePersistence:
leverage = 0.0 if capital <= 0 else open_notional / capital
drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
"ts": snapshot.timestamp.isoformat(timespec="milliseconds"),
"ts": _naive_utc_ts(snapshot.timestamp),
"capital": capital,
"roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0,
"dd_pct": drawdown * 100.0,
@@ -933,6 +950,8 @@ class PinkClickHousePersistence:
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"ledger_authority": self.config.ledger_authority,
"capital_source": self._capital_source(),
"account_event_seq": self._account_event_seq(),
}
self._sink("status_snapshots", row)
@@ -1002,7 +1021,7 @@ class PinkClickHousePersistence:
exit_leg_id = f"{trade_id}:leg{leg_index}"
self._sink("trade_exit_legs", {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": trade_id,
@@ -1031,7 +1050,8 @@ class PinkClickHousePersistence:
"pnl_pct_leg": pnl_pct_leg,
"pnl_leg": pnl_leg,
"pnl_realized_total": cur_realized,
"bars_held": int(intent.bars_held or 0),
"pnl_source": "", # updated by FILL_SETTLED override (Phase 3)
"bars_held": max(0, int(intent.bars_held or 0)),
# Gap 1/2/3: per-leg friction
"fee_leg": fill_fee,
"fee_source": fill_fee_source,
@@ -1084,7 +1104,7 @@ class PinkClickHousePersistence:
conviction = float(intent.confidence or decision.confidence or 0.0)
metadata = intent.metadata if intent is not None else (decision.metadata if decision is not None else {})
row = {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": intent.trade_id,
@@ -1095,6 +1115,7 @@ class PinkClickHousePersistence:
"quantity": quantity,
"pnl": pnl,
"pnl_pct": pnl_pct,
"pnl_source": "", # updated by FILL_SETTLED override (Phase 3)
"exit_reason": intent.reason,
"vel_div_entry": float(decision.velocity_divergence or 0.0),
"boost_at_entry": 1.0,
@@ -1125,7 +1146,7 @@ class PinkClickHousePersistence:
"drawdown_at_entry": 0.0 if self._peak_capital() <= 0 else max(0.0, (self._peak_capital() - capital_before) / self._peak_capital()),
"open_positions_count": 0,
"scan_uuid": decision.decision_id,
"bars_held": int(intent.bars_held or 0),
"bars_held": max(0, int(intent.bars_held or 0)),
"entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}),
"exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}),
"execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}),
@@ -1156,7 +1177,7 @@ class PinkClickHousePersistence:
market_state: Mapping[str, Any] | None = None,
) -> None:
self._sink("trade_reconstruction", {
"ts": snapshot.timestamp.isoformat(),
"ts": _naive_utc_ts(snapshot.timestamp),
"trade_id": trade_id,
"event_type": event_type,
"event_id": event_id,

View File

@@ -259,13 +259,13 @@ def _reconcile_position_slot(
# No open positions — ensure slot is idle
kernel.reconcile_from_slots([])
# Seed capital once from exchange balance.
# Seed capital once from exchange balance — E-anchored.
if exchange_balance_capital > 0:
kernel.account.snapshot.capital = exchange_balance_capital
kernel.account.snapshot.peak_capital = max(
kernel.account.snapshot.peak_capital, exchange_balance_capital
kernel.account.anchor_to_exchange(
wallet_balance=exchange_balance_capital,
available_margin=exchange_balance_capital,
event_seq=0,
)
kernel.account.snapshot.equity = exchange_balance_capital
@dataclass
@@ -1453,9 +1453,11 @@ class PinkDirectRuntime:
def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None:
"""Close-out detection → feed realized PnL into the alpha layers.
Capital-delta PnL (net of fees) — the kernel's capital is the
authoritative ledger, and bucket/streak multipliers only need the
sign and rough magnitude.
PnL is sourced from the closing slot's realized_pnl (kernel estimate,
overridden by exchange FILL_SETTLED when available) — NOT the capital
delta, which absorbs funding, fees of other activity, and foreign fills
from the shared VST account (PRODGREEN collision class).
Bucket/streak multipliers only need sign and rough magnitude.
"""
if self.alpha_sizer is None or not self._sizer_open_tid:
return
@@ -1467,11 +1469,17 @@ class PinkDirectRuntime:
)
if still_open:
return
pnl = float(acc.get("capital") or 0.0) - self._sizer_entry_capital
# Phase 3: slot.realized_pnl is the trade's own PnL (no capital-delta
# contamination from funding, foreign fills, or other-activity fees).
pnl = float(slot_dict.get("realized_pnl") or 0.0)
# Subtract accumulated fees when available (fees_paid on slot metadata)
fees = float(slot_dict.get("fees_paid", 0.0) or slot_dict.get("metadata", {}).get("fees_paid", 0.0) or 0.0)
pnl = pnl - fees
self._sizer_open_tid = ""
try:
self.alpha_sizer.record_close(pnl)
self.logger.info("alpha sizer feedback: trade closed pnl=%.4f", pnl)
self.logger.info("alpha sizer feedback: trade closed pnl=%.4f (rp=%.4f fees=%.4f)", pnl,
float(slot_dict.get("realized_pnl") or 0.0), fees)
except Exception:
pass

View File

@@ -0,0 +1,17 @@
-- Phase 2: provenance columns (E-anchored capital, PnL source)
-- Apply BEFORE deploying code that emits these fields.
-- ALTER TABLE ... ADD COLUMN IF NOT EXISTS is idempotent.
ALTER TABLE dolphin_pink.account_events
ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '',
ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0;
ALTER TABLE dolphin_pink.status_snapshots
ADD COLUMN IF NOT EXISTS `capital_source` LowCardinality(String) DEFAULT '',
ADD COLUMN IF NOT EXISTS `account_event_seq` UInt64 DEFAULT 0;
ALTER TABLE dolphin_pink.trade_events
ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT '';
ALTER TABLE dolphin_pink.trade_exit_legs
ADD COLUMN IF NOT EXISTS `pnl_source` LowCardinality(String) DEFAULT '';