siloqy/prod/clean_arch/persistence/pink_clickhouse.py
Codex c864e9c550 PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging
S1 — Leverage cache (bingx_direct.py):
  _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips the
  ~350ms POST when the exchange already has the requested leverage.  Saves ~350ms/trade.
  Cache updated ONLY on success; a failed POST leaves the cache unchanged, so the
  next call retries the POST.
  Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts.
  connect(): _verify_leverage_drift() detects when another process changed leverage
  at the exchange and updates cache to exchange truth (logs WARNING on drift).
  Multi-runner contract: leverage is account-level on BingX; documented that
  concurrent runners with different leverage desires for same symbol conflict.
  20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update,
  concurrent-same-symbol (lock prevents race), drift-detect, persist/restore,
  multi-runner known-limitation documentation test.
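
A minimal sketch of the S1 cache-and-lock pattern described above, under stated assumptions: `LeverageCache`, `ensure`, and the injected `post_leverage` coroutine are hypothetical names for illustration and are not the `_ensure_leverage()` code in bingx_direct.py. It shows only three of the listed properties: one lock per symbol, skip when the cached value matches, and cache update on success only.

```python
import asyncio
from collections import defaultdict
from typing import Awaitable, Callable, Dict


class LeverageCache:
    """Hypothetical per-symbol leverage cache (illustration only)."""

    def __init__(self, post_leverage: Callable[[str, int], Awaitable[bool]]) -> None:
        # Assumed coroutine: sends the leverage POST and returns True on success.
        self._post = post_leverage
        self._cache: Dict[str, int] = {}
        # Locks are created lazily, one per symbol, on first use.
        self._locks = defaultdict(asyncio.Lock)

    async def ensure(self, symbol: str, leverage: int) -> bool:
        """Return True if a POST was attempted, False if the cache matched."""
        async with self._locks[symbol]:
            if self._cache.get(symbol) == leverage:
                return False  # exchange already has this leverage: skip the POST
            ok = await self._post(symbol, leverage)
            if ok:
                # Update only on success; a failed POST leaves the cache as-is,
                # so the next call with the same target retries.
                self._cache[symbol] = leverage
            return True
```

With two concurrent `ensure("BTC-USDT", 3)` calls, the lock serialises them, so the second call finds the cached value and skips the POST.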

S2 — Background state refresh (bingx_direct.py):
  MARKET fills: asyncio.create_task(_refresh_state_background) — does not block
  submit path.  WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway.
  LIMIT fills: synchronous refresh retained (include_history=False, not True) —
  needed to detect resting order state for next pump cycle.
  Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved.

Gap 1 — VenueEvent friction fields (contracts.py):
  Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps,
  mark_at_submit — all with defaults so existing callers are unaffected.
  Detailed inline docs for sign conventions and provenance codes.

Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py):
  submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate;
  annotates ack_row with _fee_estimated, _fee_source, _is_maker_est.
  persist_fee_settled(): new method writes fee_settled_events row when WS
  ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED".
  pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED.
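
The Gap 2 estimate (fill_price × fill_qty × rate) can be sketched as below. `estimate_fee` is a hypothetical helper, and the maker/taker rates are passed in by the caller because the actual fee schedule is not stated here; only the "ESTIMATED_TAKER"/"ESTIMATED_MAKER" provenance codes are taken from the source.

```python
from typing import Tuple


def estimate_fee(
    fill_price: float,
    fill_qty: float,
    *,
    is_maker: bool,
    maker_rate: float,
    taker_rate: float,
) -> Tuple[float, str]:
    """Return (estimated_fee, fee_source) for a single fill.

    Rates are fractions of notional (e.g. 0.0005 for 5 bps) and are
    assumptions supplied by the caller, not exchange constants.
    """
    rate = maker_rate if is_maker else taker_rate
    fee = abs(fill_price) * abs(fill_qty) * rate
    source = "ESTIMATED_MAKER" if is_maker else "ESTIMATED_TAKER"
    return fee, source
```

The estimate is a placeholder until the WS frame delivers the actual commission, at which point the settled value is logged with fee_source="WS_SETTLED".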

Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py):
  Captures mark_at_submit before the order POST; computes slippage_bps signed
  by side: positive = adverse (taker overpaid / maker undersold), negative =
  price improvement.  Measured for BOTH taker and maker fills for symmetry.
  Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps.
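
One way to express the Gap 3 sign convention, as a sketch under assumptions: `slippage_bps` here is a hypothetical free function keyed on the order side ("BUY"/"SELL"), and returning 0.0 for a missing mark is an illustrative choice, not necessarily what bingx_direct.py does.

```python
def slippage_bps(order_side: str, mark_at_submit: float, fill_price: float) -> float:
    """Signed slippage in basis points: positive = adverse, negative = improvement.

    A BUY is adverse when it fills above the mark; a SELL is adverse when it
    fills below the mark.
    """
    if mark_at_submit <= 0:
        return 0.0  # no usable reference price (assumed fallback)
    direction = 1.0 if order_side.upper() == "BUY" else -1.0
    return direction * (fill_price - mark_at_submit) / mark_at_submit * 1e4
```

For a mark of 100.0, a BUY filled at 100.1 and a SELL filled at 99.9 both come out near +10 bps (adverse), while a SELL filled at 100.1 comes out near -10 bps (price improvement).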

S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with:
  SHORT/LONG-aware price offset design, OBF integration requirements,
  TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s
  calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO.
  REST/WS split: documented why BingX (and all retail venues) separate these
  and why a unified VenueAdapter protocol is the long-term solution.

151/151 existing tests green + 20 new leverage cache tests = 171 total.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00


"""PINK ClickHouse persistence: DITAv2-backed, reads capital from kernel.

Row families preserved (same schema, no new columns):
- policy_events / v7_decision_events
- position_state
- account_events
- status_snapshots
- trade_events
- trade_reconstruction
- trade_exit_legs
- anomaly_events

Supplementary tables also written by this module: reconcile_events and
fee_settled_events.

Capital/peak_capital/trade_seq are read from the kernel's AccountProjection
(single authority). No duplicate tracking in this module.
"""
from __future__ import annotations
import json
import logging
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Mapping, Optional
from prod.clean_arch.dita import AccountProjection, Decision, DecisionAction, Intent, TradeSide, TradeStage
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventKind, KernelOutcome
from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage
Writer = Callable[[str, dict[str, Any]], None]
_log = logging.getLogger(__name__)


def _json_safe(value: Any) -> Any:
    if isinstance(value, Enum):
        return value.value
    if isinstance(value, dict):
        return {str(key): _json_safe(val) for key, val in value.items()}
    if isinstance(value, (list, tuple)):
        return [_json_safe(item) for item in value]
    if hasattr(value, "isoformat"):
        try:
            return value.isoformat()
        except Exception:
            pass
    if hasattr(value, "__dict__"):
        try:
            return _json_safe(dict(vars(value)))
        except Exception:
            pass
    return value


def _json_text(value: Any) -> str:
    return json.dumps(_json_safe(value), separators=(",", ":"), ensure_ascii=False, default=str)


def _direction(side: TradeSide) -> int:
    return -1 if side == TradeSide.SHORT else 1


def _direction_from_str(side: str) -> int:
    return -1 if side.upper() in ("SHORT", "SELL") else 1


def _notional(size: float, price: float) -> float:
    if not math.isfinite(size) or not math.isfinite(price):
        return 0.0
    return abs(size) * abs(price)


def _safe_float(value: Any, default: float = 0.0) -> float:
    try:
        out = float(value)
    except Exception:
        return default
    if not math.isfinite(out):
        return default
    return out


def _checked_float(
    value: Any,
    default: float = 0.0,
    *,
    field: str = "?",
    trade_id: str = "",
    sink: Writer | None = None,
) -> float:
    """_safe_float with anomaly tracing.

    Any NaN/inf/non-numeric value is a bug indicator, not a normal condition.
    Sanitise to ``default`` but log a WARNING and optionally write an
    ``anomaly_events`` spool row so the trace is queryable in ClickHouse.
    """
    try:
        out = float(value)
    except Exception:
        out = float("nan")
    if not math.isfinite(out):
        _log.warning(
            "NaN/inf in financial field field=%s trade_id=%s raw=%r → replacing with %s",
            field, trade_id or "?", value, default,
        )
        if sink is not None:
            try:
                sink("anomaly_events", {
                    "ts": datetime.now(timezone.utc).isoformat(),
                    "decision_id": "",
                    "trade_id": trade_id,
                    "symbol": "",
                    "anomaly": f"NaN_FINANCIAL_FIELD:{field}",
                    "origin": "persistence_nan_guard",
                    "sensor": field,
                    "detail": f"raw={value!r} replaced_with={default}",
                    "rm_meta": 0.0,
                })
            except Exception:
                # Anomaly tracing is best-effort; never let it break the caller.
                pass
        return default
    return out


def _decision_summary(decision: Decision | None) -> dict[str, Any]:
    if decision is None:
        return {}
    return {
        "timestamp": decision.timestamp.isoformat() if hasattr(decision.timestamp, "isoformat") else str(decision.timestamp),
        "decision_id": decision.decision_id,
        "asset": decision.asset,
        "action": decision.action.value,
        "side": decision.side.value,
        "reason": decision.reason,
        "confidence": float(decision.confidence or 0.0),
        "velocity_divergence": float(decision.velocity_divergence or 0.0),
        "irp_alignment": float(decision.irp_alignment or 0.0),
        "reference_price": float(decision.reference_price or 0.0),
        "target_size": float(decision.target_size or 0.0),
        "leverage": float(decision.leverage or 0.0),
        "bars_held": int(decision.bars_held or 0),
        "stage": decision.stage.value,
        "metadata": _json_safe(decision.metadata),
    }


def _intent_summary(intent: Intent | None) -> dict[str, Any]:
    if intent is None:
        return {}
    return {
        "timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
        "trade_id": intent.trade_id,
        "decision_id": intent.decision_id,
        "asset": intent.asset,
        "action": intent.action.value,
        "side": intent.side.value,
        "reason": intent.reason,
        "target_size": float(intent.target_size or 0.0),
        "leverage": float(intent.leverage or 0.0),
        "reference_price": float(intent.reference_price or 0.0),
        "confidence": float(intent.confidence or 0.0),
        "bars_held": int(intent.bars_held or 0),
        "stage": intent.stage.value,
        "exit_leg_ratios": [float(r) for r in intent.exit_leg_ratios],
        "metadata": _json_safe(intent.metadata),
    }


def _outcome_summary(outcome: KernelOutcome | None) -> dict[str, Any]:
    if outcome is None:
        return {}
    return {
        "accepted": bool(outcome.accepted),
        "slot_id": int(outcome.slot_id),
        "trade_id": outcome.trade_id,
        "state": outcome.state.value,
        "diagnostic_code": outcome.diagnostic_code.value,
        "severity": outcome.severity.value,
        "details": _json_safe(outcome.details),
    }


@dataclass(frozen=True)
class PinkClickHousePersistenceConfig:
    """Row-shape knobs for the PINK ClickHouse mirror."""

    strategy: str = "pink"
    runtime_namespace: str = "pink"
    strategy_namespace: str = "pink"
    event_namespace: str = "pink"
    actor_name: str = "PinkDirectRuntime"
    exec_venue: str = "bingx"
    data_venue: str = "binance"
    ledger_authority: str = "exchange"
    initial_capital: float = 25_000.0
    max_account_leverage: float = 3.0
    exchange_leverage_mode: str = ""
    leverage_mapping_rule: str = "round_half_even_linear_0.5_to_9.0_to_1_to_exchange_cap"


class PinkClickHousePersistence:
    """Durable PINK ClickHouse sink: capital reads from kernel AccountProjection."""

    def __init__(
        self,
        account: AccountProjection,
        *,
        config: PinkClickHousePersistenceConfig | None = None,
        sink: Writer | None = None,
        v7_sink: Writer | None = None,
        kernel: Optional[Any] = None,  # ExecutionKernel (optional); enables event_seq/reconcile writes
    ) -> None:
        self.account = account
        self._kernel = kernel  # set post-construction via set_kernel() if needed
        self.config = config or PinkClickHousePersistenceConfig(
            runtime_namespace=account.runtime_namespace,
            strategy_namespace=account.strategy_namespace,
            event_namespace=account.event_namespace,
            actor_name=account.actor_name,
            exec_venue=account.exec_venue,
            data_venue=account.data_venue,
            ledger_authority=account.ledger_authority,
            initial_capital=float(account.snapshot.capital or 25_000.0),
        )
        self._sink = sink or self._resolve_sink("pink")
        self._v7_sink = v7_sink or self._resolve_v7_sink("pink")
        self._leg_state: dict[str, dict[str, Any]] = {}

    def set_kernel(self, kernel: Any) -> None:
        """Wire the ExecutionKernel after construction (e.g. from connect())."""
        self._kernel = kernel

    @staticmethod
    def _resolve_sink(strategy: str) -> Writer:
        from prod.ch_writer import ch_put_pink
        return ch_put_pink

    @staticmethod
    def _resolve_v7_sink(strategy: str) -> Writer:
        from prod.ch_writer import ch_put_pink_v7
        return ch_put_pink_v7

    def _account_event_seq(self) -> int:
        """event_seq from kernel's atomic account state; 0 when kernel not wired."""
        if self._kernel is None:
            return 0
        try:
            return int(self._kernel.snapshot().get("account", {}).get("event_seq", 0) or 0)
        except Exception:
            return 0

    def _kernel_account(self) -> dict:
        """Full kernel account snapshot dict; empty dict when kernel not wired."""
        if self._kernel is None:
            return {}
        try:
            return self._kernel.snapshot().get("account", {}) or {}
        except Exception:
            return {}

    def write_reconcile_event(
        self,
        *,
        event_seq: int,
        ts: Optional[Any] = None,
        k_capital: float = 0.0,
        e_wallet_balance: float = 0.0,
        delta: float = 0.0,
        status: str = "OK",
        explanation: str = "",
    ) -> None:
        """Persist one reconcile record to the reconcile_events table.

        Safe to repeat: if the same event_seq is written twice, CH dedup (via
        ReplacingMergeTree on event_seq) keeps only the latest row once the
        parts have been merged. Until then both rows are visible, so readers
        that need exact results should query with FINAL.
        """
        # datetime/timezone come from the module-level import.
        ts_val = ts or datetime.now(timezone.utc).isoformat()
        self._sink("reconcile_events", {
            "timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(),
            "runtime_namespace": self.config.runtime_namespace,
            "strategy_namespace": self.config.strategy_namespace,
            "event_seq": int(event_seq),
            "k_capital": float(k_capital),
            "e_wallet_balance": float(e_wallet_balance),
            "delta": float(delta),
            "reconcile_status": str(status),
            "explanation": str(explanation),
        })

    def _capital(self) -> float:
        return float(self.account.snapshot.capital or 0.0)

    def _peak_capital(self) -> float:
        return float(getattr(self.account.snapshot, "peak_capital", self._capital()) or self._capital())

    def _trade_seq(self) -> int:
        return int(getattr(self.account.snapshot, "trade_seq", 0) or 0)

    def _equity(self) -> float:
        return float(self.account.snapshot.equity or self._capital())

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def persist_step(
        self,
        *,
        snapshot: Any,
        decision: Decision,
        intent: Intent,
        outcome: KernelOutcome | None = None,
        slot_dict: dict[str, Any] | None = None,
        acc_dict: dict[str, Any] | None = None,
        phase: str = "step",
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        """Two-phase persist: log the REQUEST, then log the RESULT.

        REQUEST (:meth:`persist_request`): the decision/order that was
        submitted (policy_events + a trade_reconstruction ORDER_REQUESTED row).

        RESULT (:meth:`persist_result`): the settled state snapshot plus the
        per-fill lifecycle rows, gated on *evidence of an actual fill*. A resting
        LIMIT order (ACK only, no fill) therefore emits state snapshots but no
        terminal rows; the async-fill pump persists those later via the same
        result path. The synchronous-MARKET path is unchanged: its FILL event
        (or the slot's filled/closed state) trips the same gate.
        """
        self.persist_request(
            snapshot=snapshot, decision=decision, intent=intent,
            phase=phase, market_state=market_state,
        )
        self.persist_result(
            snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
            slot_dict=slot_dict, phase=phase, market_state=market_state,
        )

    def persist_request(
        self,
        *,
        snapshot: Any,
        decision: Decision,
        intent: Intent,
        phase: str = "step",
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        """Phase 1: log the requested decision/order (no fill data)."""
        self._write_policy_event(snapshot, decision, intent, phase=phase)
        if decision.action in (DecisionAction.ENTER, DecisionAction.EXIT):
            self._write_trade_reconstruction(
                snapshot, intent.trade_id,
                event_type="ORDER_REQUESTED",
                event_id=f"{intent.trade_id}:request:{decision.action.value.lower()}",
                payload={
                    "decision": _decision_summary(decision),
                    "intent": _intent_summary(intent),
                    "market_state": _json_safe(market_state or {}),
                },
                market_state=market_state,
            )

    def persist_result(
        self,
        *,
        snapshot: Any,
        decision: Decision,
        intent: Intent,
        outcome: KernelOutcome | None = None,
        slot_dict: dict[str, Any] | None = None,
        phase: str = "step",
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        """Phase 2: log the settled state + per-fill lifecycle rows.

        The state snapshot rows (account_events, position_state,
        status_snapshots) always reflect the current slot. The lifecycle rows
        (ENTRY_FILLED / PARTIAL_EXIT / EXIT / trade_events / trade_exit_legs) are
        emitted only when a fill is *evidenced*: a FULL/PARTIAL_FILL event in
        ``outcome.emitted_events``, a closed slot, or a slot whose size dropped
        vs the last leg snapshot. A resting LIMIT (ACK only) emits no terminal
        rows here.
        """
        slot = slot_dict or {}
        stage = (
            TradeStage(decision.stage.value)
            if hasattr(decision.stage, "value")
            else TradeStage(decision.stage) if isinstance(decision.stage, str)
            else TradeStage.ORDER_REQUESTED
        )
        status = self._state_label(slot, phase)
        self._write_account_event(snapshot, decision, intent, stage=stage, slot_dict=slot)
        self._write_position_state(snapshot, decision, intent, slot_dict=slot, stage=stage, status=status, market_state=market_state)
        self._write_status_snapshot(snapshot, decision, intent, slot_dict=slot, phase=phase)
        if outcome is not None and outcome.diagnostic_code != KernelDiagnosticCode.OK:
            self._write_anomaly(
                snapshot, decision, intent,
                anomaly=outcome.diagnostic_code.value,
                origin="ditav2_kernel",
                detail=outcome.details,
            )
        if outcome is None:
            # Decision-only step (HOLD): state snapshot already written.
            return
        events = tuple(outcome.emitted_events or ())
        has_fill_evt = any(
            e.kind in (KernelEventKind.FULL_FILL, KernelEventKind.PARTIAL_FILL)
            for e in events
        )
        slot_closed = bool(slot.get("closed", False))
        cur_size = _safe_float(slot.get("size", 0.0), 0.0)
        slot_open = (not slot_closed) and cur_size > 0.0
        if decision.action == DecisionAction.ENTER:
            # Emit ENTRY_FILLED only once the entry is actually filled (fill event
            # or an open slot). A resting LIMIT entry emits nothing here.
            if has_fill_evt or slot_open:
                self._leg_state[intent.trade_id] = {
                    "prev_realized": 0.0,
                    "prev_size": _safe_float(
                        slot.get("initial_size", slot.get("size", 0.0)), 0.0
                    ) or _safe_float(intent.target_size, 0.0),
                    "prev_leg_id": "",
                }
                self._write_trade_reconstruction(
                    snapshot, intent.trade_id,
                    event_type="ENTRY_FILLED",
                    event_id=f"{intent.trade_id}:entry",
                    payload={
                        "decision": _decision_summary(decision),
                        "intent": _intent_summary(intent),
                        "outcome": _outcome_summary(outcome),
                        "slot": slot,
                        "market_state": _json_safe(market_state or {}),
                    },
                    market_state=market_state,
                )
            return
        if decision.action != DecisionAction.EXIT:
            return
        # An exit leg is evidenced by a fill event, a closed slot, or a drop in
        # remaining size vs the previous leg snapshot. A resting LIMIT exit (no
        # size change) emits nothing until the async-fill pump observes the fill.
        prev_size = _safe_float(self._leg_state.get(intent.trade_id, {}).get("prev_size", 0.0), 0.0)
        exit_filled = has_fill_evt or slot_closed or (prev_size - cur_size > 1e-12)
        if not exit_filled:
            return
        partial = (not slot_closed) and cur_size > 0.0
        # Extract fill price AND friction fields from emitted venue events.
        # These are first-class fields on VenueEvent (Gap 1/2/3).
        # When several events carry a field, the last non-empty value wins.
        fill_price_hint = 0.0
        fill_fee = 0.0
        fill_fee_source = ""
        fill_is_maker = False
        fill_slippage_bps = 0.0
        fill_mark_at_submit = 0.0
        fill_exchange_ts = 0
        for ev in events:
            p_val = getattr(ev, "price", 0.0)
            if p_val and math.isfinite(float(p_val)) and float(p_val) > 0:
                fill_price_hint = float(p_val)
            if getattr(ev, "fee", 0.0):
                fill_fee = float(ev.fee)
            if getattr(ev, "fee_source", ""):
                fill_fee_source = str(ev.fee_source)
            if getattr(ev, "is_maker", False):
                fill_is_maker = bool(ev.is_maker)
            if getattr(ev, "slippage_bps", 0.0):
                fill_slippage_bps = float(ev.slippage_bps)
            if getattr(ev, "mark_at_submit", 0.0):
                fill_mark_at_submit = float(ev.mark_at_submit)
            if getattr(ev, "exchange_ts", 0):
                fill_exchange_ts = int(ev.exchange_ts)
        self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome,
                                   fill_price_hint=fill_price_hint,
                                   fill_fee=fill_fee, fill_fee_source=fill_fee_source,
                                   fill_is_maker=fill_is_maker, fill_slippage_bps=fill_slippage_bps)
        self._write_trade_reconstruction(
            snapshot, intent.trade_id,
            event_type="PARTIAL_EXIT" if partial else "EXIT",
            event_id=f"{intent.trade_id}:{'partial' if partial else 'close'}",
            payload={
                "decision": _decision_summary(decision),
                "intent": _intent_summary(intent),
                "outcome": _outcome_summary(outcome),
                "slot": slot,
                "market_state": _json_safe(market_state or {}),
            },
            market_state=market_state,
        )
        if slot_closed:
            self._write_trade_event(snapshot, decision, intent, slot, outcome,
                                    market_state=market_state,
                                    exit_price_hint=fill_price_hint,
                                    fill_fee=fill_fee, fill_fee_source=fill_fee_source,
                                    fill_is_maker=fill_is_maker,
                                    fill_slippage_bps=fill_slippage_bps,
                                    fill_mark_at_submit=fill_mark_at_submit,
                                    fill_exchange_ts=fill_exchange_ts)

    def persist_fill_events(
        self,
        *,
        snapshot: Any,
        events: Any,
        slot_dict: dict[str, Any] | None = None,
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        """Persist a late (async) venue fill drained by the runtime pump.

        There is no fresh policy decision for an async fill, so we synthesize a
        minimal Decision/Intent from the post-fill slot + event and route it
        through :meth:`persist_result`. Direction (ENTER vs EXIT) is inferred
        from the slot: a closed slot or a drop in remaining size vs the last leg
        snapshot is an EXIT; otherwise an opening fill is an ENTER. Capital
        authority remains the kernel; this only logs the settled result.
        """
        slot = slot_dict or {}
        event_list = tuple(events or ())
        trade_id = str(slot.get("trade_id") or "")
        asset = str(slot.get("asset") or "")
        side = self._slot_side(slot)
        closed = bool(slot.get("closed", False))
        cur_size = self._slot_size(slot)
        leverage = _safe_float(slot.get("leverage", 1.0), 1.0)
        # Extract fill price from venue events (used as exit_price_hint for G21 fix).
        # Falls back to the slot's entry price when no event carries a finite price.
        price = next(
            (float(getattr(e, "price", 0.0)) for e in event_list
             if getattr(e, "price", 0.0) and math.isfinite(float(getattr(e, "price", 0.0)))),
            0.0,
        ) or self._slot_entry_price(slot)
        prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0)
        is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12)
        action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER
        ts = getattr(snapshot, "timestamp", datetime.now(timezone.utc))
        decision = Decision(
            timestamp=ts, decision_id=trade_id or "async", asset=asset, action=action,
            side=side, reason="ASYNC_FILL", confidence=0.0, velocity_divergence=0.0,
            irp_alignment=0.0, reference_price=price, target_size=cur_size,
            leverage=leverage, stage=TradeStage.POSITION_UPDATED, metadata={},
        )
        intent = Intent(
            timestamp=ts, trade_id=trade_id, decision_id=trade_id or "async", asset=asset,
            action=action, side=side, reason="ASYNC_FILL", target_size=cur_size,
            leverage=leverage, reference_price=price, confidence=0.0,
            exit_leg_ratios=tuple(slot.get("exit_leg_ratios", (1.0,)) or (1.0,)), metadata={},
        )
        outcome = KernelOutcome(
            accepted=True, slot_id=int(slot.get("slot_id", 0) or 0), trade_id=trade_id,
            state=KernelStage.CLOSED if closed else KernelStage.POSITION_OPEN,
            diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO,
            transitions=(), emitted_events=event_list, details={"origin": "async_fill_pump"},
        )
        self.persist_result(
            snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
            slot_dict=slot, phase="async_fill", market_state=market_state,
        )

    def persist_recovery_state(
        self,
        *,
        snapshot: Any,
        acc_dict: dict[str, Any] | None = None,
        phase: str = "recovery",
        event_type: str = "RECOVERY",
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        """Persist recovery-only state after kernel reconcile.

        A15 fix: acc_dict is the kernel account snapshot (capital/equity/pnl),
        not a slot dict. Read the actual slot from kernel.slot(0) so that
        trade_id, asset, size, and entry_price are correctly populated in the
        recovery rows.
        """
        # A15: read slot from kernel instead of misusing acc_dict as slot dict.
        slot_dict: dict[str, Any] = {}
        if self._kernel is not None:
            try:
                slot_view = self._kernel.slot(0)
                raw = slot_view.to_dict() if hasattr(slot_view, "to_dict") else {}
                slot_dict = dict(raw) if raw else {}
            except Exception:
                pass
        trade_id = (
            str(slot_dict.get("trade_id") or "")
            or (str(acc_dict.get("trade_id", "")) if acc_dict else "")
        )
        self._write_status_snapshot(
            snapshot, decision=None, intent=None, slot_dict=slot_dict, phase=phase,
        )
        self._write_account_event(
            snapshot, decision=None, intent=None,
            stage=TradeStage.TRADE_TERMINAL_WRITTEN,
            slot_dict=slot_dict, event_type=event_type,
        )
        self._write_position_state(
            snapshot, decision=None, intent=None,
            slot_dict=slot_dict, stage=TradeStage.TRADE_TERMINAL_WRITTEN,
            status=self._state_label(slot_dict, phase), market_state=market_state,
        )
        self._write_trade_reconstruction(
            snapshot,
            trade_id=trade_id,
            event_type=event_type,
            event_id=f"recovery:{phase}",
            payload={
                "acc_dict": _json_safe(acc_dict or {}),
                "slot": _json_safe(slot_dict),
                "phase": phase,
                "market_state": _json_safe(market_state or {}),
            },
            market_state=market_state,
        )

    def persist_fee_settled(
        self,
        *,
        trade_id: str,
        fee: float,
        fee_asset: str = "USDT",
        is_maker: bool = False,
        exchange_ts: int = 0,
        realized_pnl_delta: float = 0.0,
        ts: Optional[Any] = None,
    ) -> None:
        """Record the WS FILL_SETTLED fee arriving after the REST submit.

        Gap 2: the REST ACK path writes fee_source="ESTIMATED_TAKER/MAKER".
        When the WS ORDER_TRADE_UPDATE frame arrives with field "n" (actual
        commission), call this method to log the settled truth.

        The CH spool stores both the original estimated row AND this settled row.
        Downstream queries can reconcile using:

            SELECT trade_id, MAX(fee) FILTER(WHERE fee_source='WS_SETTLED') AS settled_fee,
                   MAX(fee) FILTER(WHERE fee_source LIKE 'ESTIMATED%') AS estimated_fee
            FROM trade_events GROUP BY trade_id

        This method writes to ``fee_settled_events`` (a lightweight supplementary
        table, not trade_events) so the original row is never mutated.
        """
        ts_val = ts or datetime.now(timezone.utc)
        self._sink("fee_settled_events", {
            "ts": ts_val.isoformat() if hasattr(ts_val, "isoformat") else str(ts_val),
            "trade_id": trade_id,
            "fee": float(fee),
            "fee_asset": fee_asset,
            "fee_source": "WS_SETTLED",
            "is_maker": bool(is_maker),
            "exchange_ts": int(exchange_ts),
            "realized_pnl_delta": float(realized_pnl_delta),
            "runtime_namespace": self.config.runtime_namespace,
            "strategy": self.config.strategy,
        })

    def record_anomaly(
        self,
        *,
        snapshot: Any,
        decision: Any,
        intent: Any,
        anomaly: str,
        origin: str = "emergent",
        sensor: str = "",
        detail: Any = "",
        rm_meta: float = 0.0,
    ) -> None:
        """Persist a DITA anomaly row with legacy-compatible shape."""
        self._sink(
            "anomaly_events",
            {
                "ts": snapshot.timestamp.isoformat(),
                "decision_id": decision.decision_id,
                "trade_id": intent.trade_id,
                "symbol": intent.asset,
                "anomaly": anomaly,
                "origin": origin,
                "sensor": sensor,
                "detail": _json_text(detail) if not isinstance(detail, str) else detail,
                "rm_meta": float(rm_meta),
            },
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _state_label(slot_dict: dict[str, Any], phase: str) -> str:
        if slot_dict.get("closed", False):
            return "CLOSED"
        # _safe_float tolerates a None or non-numeric size instead of raising
        # on the comparison, matching how persist_result reads the slot size.
        if _safe_float(slot_dict.get("size", 0.0), 0.0) > 0:
            if phase.lower().startswith("recovery"):
                return "RECOVERED_OPEN"
            return "OPEN"
        return "FLAT"

    def _posture(self, slot_dict: dict[str, Any]) -> str:
        if slot_dict.get("closed", False) or not slot_dict.get("size", 0):
            return "FLAT"
        return str(slot_dict.get("side", "FLAT"))

    def _slot_entry_price(self, slot_dict: dict[str, Any]) -> float:
        return _safe_float(slot_dict.get("entry_price", 0.0), 0.0)

    def _slot_size(self, slot_dict: dict[str, Any]) -> float:
        return _safe_float(slot_dict.get("size", 0.0), 0.0)

    def _slot_side(self, slot_dict: dict[str, Any]) -> TradeSide:
        raw = str(slot_dict.get("side", "FLAT")).upper()
        if raw == "SHORT":
            return TradeSide.SHORT
        if raw == "LONG":
            return TradeSide.LONG
        return TradeSide.FLAT

    def _slot_trade_id(self, slot_dict: dict[str, Any]) -> str:
        return str(slot_dict.get("trade_id", ""))

    def _slot_asset(self, slot_dict: dict[str, Any]) -> str:
        return str(slot_dict.get("asset", ""))

    # ------------------------------------------------------------------
    # Row writers
    # ------------------------------------------------------------------
    def _write_anomaly(
        self, snapshot: Any, decision: Decision, intent: Intent,
        *, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "",
    ) -> None:
        self._sink("anomaly_events", {
            "ts": snapshot.timestamp.isoformat(),
            "decision_id": decision.decision_id,
            "trade_id": intent.trade_id,
            "symbol": intent.asset,
            "anomaly": anomaly,
            "origin": origin,
            "sensor": "",
            "detail": _json_text(detail) if not isinstance(detail, str) else detail,
            "rm_meta": 0.0,
        })

    def _write_policy_event(
        self, snapshot: Any, decision: Decision, intent: Intent, *, phase: str,
    ) -> None:
        price = _safe_float(decision.reference_price, 0.0)
        quantity = _safe_float(intent.target_size, 0.0)
        row = {
            "ts": snapshot.timestamp.isoformat(),
            "strategy": self.config.strategy,
            "runtime_namespace": self.config.runtime_namespace,
            "strategy_namespace": self.config.strategy_namespace,
            "event_namespace": self.config.event_namespace,
            "actor_name": self.config.actor_name,
            "exec_venue": self.config.exec_venue,
            "data_venue": self.config.data_venue,
            "source": "ditav2",
            "trade_id": intent.trade_id,
            "asset": decision.asset,
            "side": decision.side.value,
            "entry_price": price,
            "current_price": price,
            "quantity": quantity,
            "notional": _notional(quantity, price),
            "leverage": _safe_float(intent.leverage, 1.0),
            "bar_idx": 0,
            "decision_seq": self._trade_seq(),
            "bars_held": int(intent.bars_held or 0),
            "action": decision.action.value,
            "reason": decision.reason,
            "pnl_pct": 0.0,
            "mfe": 0.0,
            "mae": 0.0,
            "mfe_risk": 0.0,
            "mae_risk": 0.0,
            "exit_pressure": 0.0,
            "rv_comp": 0.0,
            "mae_thresh1": 0.0,
            "bounce_score": 0.0,
            "bounce_risk": 0.0,
            "ob_imbalance": 0.0,
            "vel_div_entry": float(decision.velocity_divergence or 0.0),
            "vel_div_now": float(decision.velocity_divergence or 0.0),
            "v50_vel": 0.0,
            "v750_vel": 0.0,
            "exf_funding": 0.0,
            "exf_dvol": 0.0,
            "exf_fear_greed": 0.0,
            "exf_taker": 0.0,
            "posture": decision.side.value,
        }
        # The same row is mirrored to both sinks.
        self._sink("policy_events", row)
        self._v7_sink("v7_decision_events", row)

    def _write_account_event(
        self, snapshot: Any, decision: Decision | None, intent: Intent | None,
        *, stage: TradeStage, slot_dict: dict[str, Any], event_type: str | None = None,
    ) -> None:
        capital = self._capital()
        peak_cap = self._peak_capital()
        # _slot_size() sanitises None/non-numeric sizes instead of raising on ">".
        is_open = not slot_dict.get("closed", False) and self._slot_size(slot_dict) > 0
        open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
        drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
        # Read the kernel account snapshot once so the account_events row and
        # the reconcile_events record below are built from the same values.
        ka = self._kernel_account()
        row = {
            "ts": snapshot.timestamp.isoformat(),
            "event_type": event_type or stage.value,
            "strategy": self.config.strategy,
            "posture": self._posture(slot_dict),
            "capital": capital,
            "peak_capital": peak_cap,
            "drawdown_pct": drawdown_pct,
            "pnl_today": float(self.account.snapshot.realized_pnl or 0.0),
            "trades_today": self._trade_seq(),
            "open_positions": 1 if is_open else 0,
            "boost": 1.0,
            "beta": 0.0,
            "current_open_notional": open_notional,
            "current_account_leverage": 0.0 if capital <= 0 else open_notional / capital,
            "exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))),
            "exchange_leverage_mode": self.config.exchange_leverage_mode,
            "leverage_mapping_rule": self.config.leverage_mapping_rule,
            "runtime_namespace": self.config.runtime_namespace,
            "strategy_namespace": self.config.strategy_namespace,
            "event_namespace": self.config.event_namespace,
            "actor_name": self.config.actor_name,
            "exec_venue": self.config.exec_venue,
            "data_venue": self.config.data_venue,
            "notes": _json_text({
                "decision_id": None if decision is None else decision.decision_id,
                "trade_id": None if intent is None else intent.trade_id,
                "reason": None if intent is None else intent.reason,
                "stage": stage.value,
            }),
            # Phase 4: kernel atomic account versioning
            "account_event_seq": self._account_event_seq(),
            "reconcile_status": ka.get("reconcile_status", "OK"),
        }
        self._sink("account_events", row)
        # Write a reconcile_events record whenever E-facts are present
        if _safe_float(ka.get("e_wallet_balance", 0.0), 0.0) > 0:
            self.write_reconcile_event(
                event_seq=ka.get("event_seq", 0),
                ts=snapshot.timestamp,
                k_capital=ka.get("k_capital", 0.0),
                e_wallet_balance=ka.get("e_wallet_balance", 0.0),
                delta=ka.get("reconcile_delta", 0.0),
                status=ka.get("reconcile_status", "OK"),
                explanation=ka.get("reconcile_explanation", ""),
            )

    def _write_position_state(
        self, snapshot: Any, decision: Decision | None, intent: Intent | None,
        *, slot_dict: dict[str, Any], stage: TradeStage, status: str,
        market_state: Mapping[str, Any] | None = None,
    ) -> None:
        side = self._slot_side(slot_dict)
        trade_id = self._slot_trade_id(slot_dict)
        asset = self._slot_asset(slot_dict)
        if not trade_id and intent is not None:
            trade_id = intent.trade_id
            asset = intent.asset
            side = intent.side
        row = {
            "ts": snapshot.timestamp.isoformat(),
            "trade_id": trade_id,
            "asset": asset,
            "direction": _direction(side),
            "entry_price": self._slot_entry_price(slot_dict),
            "quantity": self._slot_size(slot_dict),
            "notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)),
            "leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0),
            "bucket_id": -1,
            # A14 fix: active_leg_index is the exit-leg counter, not the bar count.
            # Use intent.bars_held when available; fall back to 0. The "or 0" sits
            # inside int() so a None bars_held does not raise.
            "entry_bar": int((intent.bars_held if intent is not None else 0) or 0),
            "status": status,
            "exit_reason": slot_dict.get("close_reason", ""),
            "pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0),
            "bars_held": 0,
            "market_state_bundle_json": _json_text(market_state or {}),
            "tp_base_pct": 0.0,
            "tp_effective_pct": 0.0,
            "our_leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0),
        }
        self._sink("position_state", row)
def _write_status_snapshot(
self, snapshot: Any, decision: Decision | None, intent: Intent | None,
*, slot_dict: dict[str, Any], phase: str,
) -> None:
capital = self._capital()
peak_cap = self._peak_capital()
is_open = not slot_dict.get("closed", False) and _safe_float(slot_dict.get("size", 0.0), 0.0) > 0
open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
leverage = 0.0 if capital <= 0 else open_notional / capital
drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
"ts": snapshot.timestamp.isoformat(timespec="milliseconds"),
"capital": capital,
"roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0,
"dd_pct": drawdown * 100.0,
"trades_executed": self._trade_seq(),
"posture": self._posture(slot_dict),
# Guard against None confidence / velocity_divergence, as _write_trade_event does.
"rm": 1.0 if decision is None else max(0.0, min(1.0, _safe_float(decision.confidence, 0.0))),
"vel_div": 0.0 if decision is None else float(decision.velocity_divergence or 0.0),
"vol_ok": 1,
"phase": phase,
"mhs_status": "GREEN",
"boost": 1.0,
"cat5": 0.0,
"conviction_multiplier": 0.0 if intent is None else float(intent.confidence or 0.0),
"exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))),
"exchange_leverage_mode": self.config.exchange_leverage_mode,
"leverage_mapping_rule": self.config.leverage_mapping_rule,
"account_capital": capital,
"portfolio_capital": capital,
"current_open_notional": open_notional,
"current_account_leverage": leverage,
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"ledger_authority": self.config.ledger_authority,
}
self._sink("status_snapshots", row)
def _write_trade_exit_leg(
self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
fill_price_hint: float = 0.0,
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
) -> None:
"""Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg.
The DITAv2 kernel uses a single slot with sequential exit legs rather
than BLUE's chained per-leg trade_ids, so the chain_* columns describe
the leg sequence within this one trade (root = trade_id). Per-leg deltas
(exit_qty, pnl_leg) are computed against the previous leg's snapshot held
in ``self._leg_state`` so each row is isolated, not cumulative.
"""
trade_id = intent.trade_id
prev = self._leg_state.get(trade_id) or {
"prev_realized": 0.0,
"prev_size": _safe_float(slot_dict.get("initial_size", 0.0), 0.0),
"prev_leg_id": "",
}
entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0)
# G21 fix: exit price priority is the fill price hint (actual exchange fill),
# then the intent reference price, then the decision reference price.
exit_price = (
fill_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
)
side = self._slot_side(slot_dict)
if side == TradeSide.FLAT:
side = intent.side
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
cur_size = self._slot_size(slot_dict)
cur_realized = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=trade_id, sink=self._sink)
prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0)
prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0)
# active_leg_index is post-fill (already advanced); the leg that just
# filled is therefore one behind. Clamp to a valid ratio index.
ratios = slot_dict.get("exit_leg_ratios", []) or []
leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1)
fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0
# External-position fix: if leg_state was never seeded (position detected via
# reconcile/pump rather than our own ENTER), prev_size=0 would make exit_qty=0.
# Fall back to initial_size or intent.target_size so the leg row is meaningful.
if prev_size <= 1e-12:
# No prior leg tracking — use the slot's initial_size or intent size.
initial = _safe_float(slot_dict.get("initial_size", 0.0), 0.0) or _safe_float(intent.target_size, 0.0)
exit_qty = initial - cur_size if initial > cur_size else initial
else:
exit_qty = max(0.0, prev_size - cur_size)
pnl_leg = cur_realized - prev_realized
capital_after = self._capital()
capital_before = capital_after - pnl_leg
exit_notional = _notional(exit_qty, exit_price or entry_price)
remaining_notional = _notional(cur_size, entry_price)
denom = abs(exit_qty * entry_price * max(leverage_val, 1e-9))
pnl_pct_leg = pnl_leg / denom if denom > 0 else 0.0
exit_leg_id = f"{trade_id}:leg{leg_index}"
self._sink("trade_exit_legs", {
"ts": snapshot.timestamp.isoformat(),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": trade_id,
"chain_root_trade_id": trade_id,
"chain_head_leg_id": f"{trade_id}:leg0",
"chain_prev_leg_id": str(prev.get("prev_leg_id", "") or ""),
"chain_seq": leg_index,
"chain_token": trade_id,
"chain_mode": "LIVE",
"exit_leg_id": exit_leg_id,
"exit_seq": leg_index,
"command_id": decision.decision_id,
"source": "ditav2",
"reason": intent.reason,
"asset": intent.asset,
"side": side.value,
"entry_price": entry_price,
"exit_price": exit_price,
"exit_qty": exit_qty,
"fraction": fraction,
"capital_before": capital_before,
"capital_after": capital_after,
"exit_notional": exit_notional,
"remaining_notional": remaining_notional,
"remaining_qty": cur_size,
"pnl_pct_leg": pnl_pct_leg,
"pnl_leg": pnl_leg,
"pnl_realized_total": cur_realized,
"bars_held": int(intent.bars_held or 0),
# Gap 1/2/3: per-leg friction
"fee_leg": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
})
# Advance the per-trade leg snapshot for the next leg's delta.
self._leg_state[trade_id] = {
"prev_realized": cur_realized,
"prev_size": cur_size,
"prev_leg_id": exit_leg_id,
}
def _write_trade_event(
self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
*, market_state: Mapping[str, Any] | None = None,
exit_price_hint: float = 0.0,
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
fill_mark_at_submit: float = 0.0,
fill_exchange_ts: int = 0,
) -> None:
entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0)
quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0)
# G21 fix: exit_price is the fill/order price, NOT the entry price.
# Priority: explicit hint (fill event price) > intent reference price > decision price.
# Fall back to entry_price only as absolute last resort (avoids the G21 bug where
# every trade_events row had exit_price == entry_price and PnL reconstruction was zero).
exit_price = (
exit_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
or _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
)
# intent is required here (it is dereferenced unconditionally below), so no None guard.
tid = intent.trade_id
pnl = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=tid, sink=self._sink)
pnl_pct = 0.0
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
denom = abs(quantity * entry_price * max(leverage_val, 1e-9))
if denom > 0:
pnl_pct = pnl / denom
capital_after = self._capital()
capital_before = capital_after - pnl
open_notional = _notional(quantity, exit_price or entry_price)
conviction = float(intent.confidence or decision.confidence or 0.0)
# Fall back to an empty mapping so the metadata.get() lookups below cannot fail on None.
metadata = intent.metadata or {}
row = {
"ts": snapshot.timestamp.isoformat(),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": intent.trade_id,
"asset": intent.asset,
"side": intent.side.value,
"entry_price": entry_price,
"exit_price": exit_price,
"quantity": quantity,
"pnl": pnl,
"pnl_pct": pnl_pct,
"exit_reason": intent.reason,
"vel_div_entry": float(decision.velocity_divergence or 0.0),
"boost_at_entry": 1.0,
"beta_at_entry": 0.0,
"posture": intent.side.value,
"leverage": leverage_val,
"conviction_multiplier": conviction,
"exchange_leverage": int(round(leverage_val)),
"exchange_leverage_mode": self.config.exchange_leverage_mode,
"leverage_mapping_rule": self.config.leverage_mapping_rule,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
"event_namespace": self.config.event_namespace,
"actor_name": self.config.actor_name,
"exec_venue": self.config.exec_venue,
"data_venue": self.config.data_venue,
"account_capital": capital_after,
"portfolio_capital": capital_after,
"current_open_notional": open_notional,
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital_after - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"margin_required": 0.0 if leverage_val <= 0 else open_notional / leverage_val,
"ledger_authority": self.config.ledger_authority,
"regime_signal": 0,
"capital_before": capital_before,
"capital_after": capital_after,
"peak_capital": self._peak_capital(),
"drawdown_at_entry": 0.0 if self._peak_capital() <= 0 else max(0.0, (self._peak_capital() - capital_before) / self._peak_capital()),
"open_positions_count": 0,
"scan_uuid": decision.decision_id,
"bars_held": int(intent.bars_held or 0),
"entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}),
"exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}),
"execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}),
# Gap 1/2/3: fee, maker/taker, slippage, exchange timing.
# fee_source provenance: "ESTIMATED_TAKER" | "ESTIMATED_MAKER" | "WS_SETTLED" | "REST_SETTLED"
"fee": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit,
"exchange_ts": fill_exchange_ts,
"friction_payload_json": _json_text({
"fee": fill_fee, "fee_source": fill_fee_source,
"is_maker": fill_is_maker, "slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit, "exchange_ts": fill_exchange_ts,
}),
"event_payload_json": _json_text({"phase": "terminal_close", "trade_id": intent.trade_id}),
"market_state_bundle_json": _json_text(market_state or {}),
"tp_base_pct": _safe_float(metadata.get("tp_base_pct", 0.0), 0.0),
"tp_effective_pct": _safe_float(metadata.get("tp_effective_pct", 0.0), 0.0),
"our_leverage": _safe_float(metadata.get("our_leverage", 0.0), 0.0),
}
self._sink("trade_events", row)
def _write_trade_reconstruction(
self, snapshot: Any, trade_id: str, *,
event_type: str, event_id: str, payload: Any,
market_state: Mapping[str, Any] | None = None,
) -> None:
self._sink("trade_reconstruction", {
"ts": snapshot.timestamp.isoformat(),
"trade_id": trade_id,
"event_type": event_type,
"event_id": event_id,
"payload_json": _json_text(payload),
"market_state_bundle_json": _json_text(market_state or {}),
})