Files
siloqy/prod/clean_arch/persistence/pink_clickhouse.py

1166 lines
50 KiB
Python
Raw Normal View History

"""PINK ClickHouse persistence — DITAv2-backed, reads capital from kernel.
Row families preserved (same schema, no new columns):
- policy_events / v7_decision_events
- position_state
- account_events
- status_snapshots
- trade_events
- trade_reconstruction
- trade_exit_legs
- anomaly_events
Capital/peak_capital/trade_seq are read from the kernel's AccountProjection
(single authority). No duplicate tracking in this module.
"""
from __future__ import annotations
import json
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
import logging
import math
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Mapping, Optional
from prod.clean_arch.dita import AccountProjection, Decision, DecisionAction, Intent, TradeSide, TradeStage
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventKind, KernelOutcome
from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage
Writer = Callable[[str, dict[str, Any]], None]
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
_log = logging.getLogger(__name__)
def _json_safe(value: Any) -> Any:
if isinstance(value, Enum):
return value.value
if isinstance(value, dict):
return {str(key): _json_safe(val) for key, val in value.items()}
if isinstance(value, (list, tuple)):
return [_json_safe(item) for item in value]
if hasattr(value, "isoformat"):
try:
return value.isoformat()
except Exception:
pass
if hasattr(value, "__dict__"):
try:
return _json_safe(dict(vars(value)))
except Exception:
pass
return value
def _json_text(value: Any) -> str:
return json.dumps(_json_safe(value), separators=(",", ":"), ensure_ascii=False, default=str)
def _direction(side: TradeSide) -> int:
return -1 if side == TradeSide.SHORT else 1
def _direction_from_str(side: str) -> int:
return -1 if side.upper() in ("SHORT", "SELL") else 1
def _notional(size: float, price: float) -> float:
if not math.isfinite(size) or not math.isfinite(price):
return 0.0
return abs(size) * abs(price)
def _safe_float(value: Any, default: float = 0.0) -> float:
try:
out = float(value)
except Exception:
return default
if not math.isfinite(out):
return default
return out
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
def _checked_float(
value: Any,
default: float = 0.0,
*,
field: str = "?",
trade_id: str = "",
sink: Writer | None = None,
) -> float:
"""_safe_float with anomaly tracing.
Any NaN/inf/non-numeric value is a bug indicator, not a normal condition.
Sanitise to ``default`` but log a WARNING and optionally write an
``anomaly_events`` spool row so the trace is queryable in ClickHouse.
"""
try:
out = float(value)
except Exception:
out = float("nan")
if not math.isfinite(out):
_log.warning(
"NaN/inf in financial field field=%s trade_id=%s raw=%r → replacing with %s",
field, trade_id or "?", value, default,
)
if sink is not None:
try:
sink("anomaly_events", {
"ts": datetime.now(timezone.utc).isoformat(),
"decision_id": "",
"trade_id": trade_id,
"symbol": "",
"anomaly": f"NaN_FINANCIAL_FIELD:{field}",
"origin": "persistence_nan_guard",
"sensor": field,
"detail": f"raw={value!r} replaced_with={default}",
"rm_meta": 0.0,
})
except Exception:
pass
return default
return out
def _decision_summary(decision: Decision | None) -> dict[str, Any]:
if decision is None:
return {}
return {
"timestamp": decision.timestamp.isoformat() if hasattr(decision.timestamp, "isoformat") else str(decision.timestamp),
"decision_id": decision.decision_id,
"asset": decision.asset,
"action": decision.action.value,
"side": decision.side.value,
"reason": decision.reason,
"confidence": float(decision.confidence or 0.0),
"velocity_divergence": float(decision.velocity_divergence or 0.0),
"irp_alignment": float(decision.irp_alignment or 0.0),
"reference_price": float(decision.reference_price or 0.0),
"target_size": float(decision.target_size or 0.0),
"leverage": float(decision.leverage or 0.0),
"bars_held": int(decision.bars_held or 0),
"stage": decision.stage.value,
"metadata": _json_safe(decision.metadata),
}
def _intent_summary(intent: Intent | None) -> dict[str, Any]:
if intent is None:
return {}
return {
"timestamp": intent.timestamp.isoformat() if hasattr(intent.timestamp, "isoformat") else str(intent.timestamp),
"trade_id": intent.trade_id,
"decision_id": intent.decision_id,
"asset": intent.asset,
"action": intent.action.value,
"side": intent.side.value,
"reason": intent.reason,
"target_size": float(intent.target_size or 0.0),
"leverage": float(intent.leverage or 0.0),
"reference_price": float(intent.reference_price or 0.0),
"confidence": float(intent.confidence or 0.0),
"bars_held": int(intent.bars_held or 0),
"stage": intent.stage.value,
"exit_leg_ratios": [float(r) for r in intent.exit_leg_ratios],
"metadata": _json_safe(intent.metadata),
}
def _outcome_summary(outcome: KernelOutcome | None) -> dict[str, Any]:
if outcome is None:
return {}
return {
"accepted": bool(outcome.accepted),
"slot_id": int(outcome.slot_id),
"trade_id": outcome.trade_id,
"state": outcome.state.value,
"diagnostic_code": outcome.diagnostic_code.value,
"severity": outcome.severity.value,
"details": _json_safe(outcome.details),
}
@dataclass(frozen=True)
class PinkClickHousePersistenceConfig:
"""Row-shape knobs for the PINK ClickHouse mirror."""
strategy: str = "pink"
runtime_namespace: str = "pink"
strategy_namespace: str = "pink"
event_namespace: str = "pink"
actor_name: str = "PinkDirectRuntime"
exec_venue: str = "bingx"
data_venue: str = "binance"
ledger_authority: str = "exchange"
initial_capital: float = 25_000.0
max_account_leverage: float = 3.0
exchange_leverage_mode: str = ""
leverage_mapping_rule: str = "round_half_even_linear_0.5_to_9.0_to_1_to_exchange_cap"
class PinkClickHousePersistence:
"""Durable PINK ClickHouse sink — capital reads from kernel AccountProjection."""
def __init__(
self,
account: AccountProjection,
*,
config: PinkClickHousePersistenceConfig | None = None,
sink: Writer | None = None,
v7_sink: Writer | None = None,
kernel: Optional[Any] = None, # ExecutionKernel — optional; enables event_seq/reconcile writes
) -> None:
self.account = account
self._kernel = kernel # set post-construction via set_kernel() if needed
self.config = config or PinkClickHousePersistenceConfig(
runtime_namespace=account.runtime_namespace,
strategy_namespace=account.strategy_namespace,
event_namespace=account.event_namespace,
actor_name=account.actor_name,
exec_venue=account.exec_venue,
data_venue=account.data_venue,
ledger_authority=account.ledger_authority,
initial_capital=float(account.snapshot.capital or 25_000.0),
)
self._sink = sink or self._resolve_sink("pink")
self._v7_sink = v7_sink or self._resolve_v7_sink("pink")
self._leg_state: dict[str, dict[str, Any]] = {}
def set_kernel(self, kernel: Any) -> None:
"""Wire the ExecutionKernel after construction (e.g. from connect())."""
self._kernel = kernel
@staticmethod
def _resolve_sink(strategy: str) -> Writer:
from prod.ch_writer import ch_put_pink
return ch_put_pink
@staticmethod
def _resolve_v7_sink(strategy: str) -> Writer:
from prod.ch_writer import ch_put_pink_v7
return ch_put_pink_v7
def _account_event_seq(self) -> int:
"""event_seq from kernel's atomic account state; 0 when kernel not wired."""
if self._kernel is None:
return 0
try:
return int(self._kernel.snapshot().get("account", {}).get("event_seq", 0) or 0)
except Exception:
return 0
def _kernel_account(self) -> dict:
"""Full kernel account snapshot dict; empty dict when kernel not wired."""
if self._kernel is None:
return {}
try:
return self._kernel.snapshot().get("account", {}) or {}
except Exception:
return {}
def write_reconcile_event(
self,
*,
event_seq: int,
ts: Optional[Any] = None,
k_capital: float = 0.0,
e_wallet_balance: float = 0.0,
delta: float = 0.0,
status: str = "OK",
explanation: str = "",
) -> None:
"""
Persist one reconcile record to the reconcile_events table.
Idempotent: if the same event_seq is written twice, CH dedup (via
ReplacingMergeTree on event_seq) keeps the latest row only.
"""
from datetime import datetime, timezone
ts_val = ts or datetime.now(timezone.utc).isoformat()
self._sink("reconcile_events", {
"timestamp": ts_val if isinstance(ts_val, str) else ts_val.isoformat(),
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
"event_seq": int(event_seq),
"k_capital": float(k_capital),
"e_wallet_balance": float(e_wallet_balance),
"delta": float(delta),
"reconcile_status": str(status),
"explanation": str(explanation),
})
def _capital(self) -> float:
return float(self.account.snapshot.capital or 0.0)
def _peak_capital(self) -> float:
return float(getattr(self.account.snapshot, "peak_capital", self._capital()) or self._capital())
def _trade_seq(self) -> int:
return int(getattr(self.account.snapshot, "trade_seq", 0) or 0)
def _equity(self) -> float:
return float(self.account.snapshot.equity or self._capital())
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def persist_step(
self,
*,
snapshot: Any,
decision: Decision,
intent: Intent,
outcome: KernelOutcome | None = None,
slot_dict: dict[str, Any] | None = None,
acc_dict: dict[str, Any] | None = None,
phase: str = "step",
market_state: Mapping[str, Any] | None = None,
) -> None:
"""Two-phase persist: log the REQUEST, then log the RESULT.
REQUEST (:meth:`persist_request`) the decision/order that was
submitted (policy_events + a trade_reconstruction ORDER_REQUESTED row).
RESULT (:meth:`persist_result`) the settled state snapshot plus the
per-fill lifecycle rows, gated on *evidence of an actual fill*. A resting
LIMIT order (ACK only, no fill) therefore emits state snapshots but no
terminal rows; the async-fill pump persists those later via the same
result path. The synchronous-MARKET path is unchanged: its FILL event
(or the slot's filled/closed state) trips the same gate.
"""
self.persist_request(
snapshot=snapshot, decision=decision, intent=intent,
phase=phase, market_state=market_state,
)
self.persist_result(
snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
slot_dict=slot_dict, phase=phase, market_state=market_state,
)
def persist_request(
self,
*,
snapshot: Any,
decision: Decision,
intent: Intent,
phase: str = "step",
market_state: Mapping[str, Any] | None = None,
) -> None:
"""Phase 1 — log the requested decision/order (no fill data)."""
self._write_policy_event(snapshot, decision, intent, phase=phase)
if decision.action in (DecisionAction.ENTER, DecisionAction.EXIT):
self._write_trade_reconstruction(
snapshot, intent.trade_id,
event_type="ORDER_REQUESTED",
event_id=f"{intent.trade_id}:request:{decision.action.value.lower()}",
payload={
"decision": _decision_summary(decision),
"intent": _intent_summary(intent),
"market_state": _json_safe(market_state or {}),
},
market_state=market_state,
)
def persist_result(
self,
*,
snapshot: Any,
decision: Decision,
intent: Intent,
outcome: KernelOutcome | None = None,
slot_dict: dict[str, Any] | None = None,
phase: str = "step",
market_state: Mapping[str, Any] | None = None,
) -> None:
"""Phase 2 — log the settled state + per-fill lifecycle rows.
The state snapshot rows (account_events, position_state,
status_snapshots) always reflect the current slot. The lifecycle rows
(ENTRY_FILLED / PARTIAL_EXIT / EXIT / trade_events / trade_exit_legs) are
emitted only when a fill is *evidenced* a FULL/PARTIAL_FILL event in
``outcome.emitted_events``, a closed slot, or a slot whose size dropped
vs the last leg snapshot. A resting LIMIT (ACK only) emits no terminal
rows here.
"""
slot = slot_dict or {}
stage = (
TradeStage(decision.stage.value)
if hasattr(decision.stage, "value")
else TradeStage(decision.stage) if isinstance(decision.stage, str)
else TradeStage.ORDER_REQUESTED
)
status = self._state_label(slot, phase)
self._write_account_event(snapshot, decision, intent, stage=stage, slot_dict=slot)
self._write_position_state(snapshot, decision, intent, slot_dict=slot, stage=stage, status=status, market_state=market_state)
self._write_status_snapshot(snapshot, decision, intent, slot_dict=slot, phase=phase)
if outcome is not None and outcome.diagnostic_code != KernelDiagnosticCode.OK:
self._write_anomaly(
snapshot, decision, intent,
anomaly=outcome.diagnostic_code.value,
origin="ditav2_kernel",
detail=outcome.details,
)
if outcome is None:
# Decision-only step (HOLD): state snapshot already written.
return
events = tuple(outcome.emitted_events or ())
has_fill_evt = any(
e.kind in (KernelEventKind.FULL_FILL, KernelEventKind.PARTIAL_FILL)
for e in events
)
slot_closed = bool(slot.get("closed", False))
cur_size = _safe_float(slot.get("size", 0.0), 0.0)
slot_open = (not slot_closed) and cur_size > 0.0
if decision.action == DecisionAction.ENTER:
# Emit ENTRY_FILLED only once the entry is actually filled (fill event
# or an open slot). A resting LIMIT entry emits nothing here.
if has_fill_evt or slot_open:
self._leg_state[intent.trade_id] = {
"prev_realized": 0.0,
"prev_size": _safe_float(
slot.get("initial_size", slot.get("size", 0.0)), 0.0
) or _safe_float(intent.target_size, 0.0),
"prev_leg_id": "",
}
self._write_trade_reconstruction(
snapshot, intent.trade_id,
event_type="ENTRY_FILLED",
event_id=f"{intent.trade_id}:entry",
payload={
"decision": _decision_summary(decision),
"intent": _intent_summary(intent),
"outcome": _outcome_summary(outcome),
"slot": slot,
"market_state": _json_safe(market_state or {}),
},
market_state=market_state,
)
return
if decision.action != DecisionAction.EXIT:
return
# An exit leg is evidenced by a fill event, a closed slot, or a drop in
# remaining size vs the previous leg snapshot. A resting LIMIT exit (no
# size change) emits nothing until the async-fill pump observes the fill.
prev_size = _safe_float(self._leg_state.get(intent.trade_id, {}).get("prev_size", 0.0), 0.0)
exit_filled = has_fill_evt or slot_closed or (prev_size - cur_size > 1e-12)
if not exit_filled:
return
partial = (not slot_closed) and cur_size > 0.0
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# Extract fill price AND friction fields from emitted venue events.
# These are first-class fields on VenueEvent (Gap 1/2/3).
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
fill_price_hint = 0.0
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fill_fee = 0.0
fill_fee_source = ""
fill_is_maker = False
fill_slippage_bps = 0.0
fill_mark_at_submit = 0.0
fill_exchange_ts = 0
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
for ev in events:
p_val = getattr(ev, "price", 0.0)
if p_val and math.isfinite(float(p_val)) and float(p_val) > 0:
fill_price_hint = float(p_val)
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
if getattr(ev, "fee", 0.0):
fill_fee = float(ev.fee)
if getattr(ev, "fee_source", ""):
fill_fee_source = str(ev.fee_source)
if getattr(ev, "is_maker", False):
fill_is_maker = bool(ev.is_maker)
if getattr(ev, "slippage_bps", 0.0):
fill_slippage_bps = float(ev.slippage_bps)
if getattr(ev, "mark_at_submit", 0.0):
fill_mark_at_submit = float(ev.mark_at_submit)
if getattr(ev, "exchange_ts", 0):
fill_exchange_ts = int(ev.exchange_ts)
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fill_price_hint=fill_price_hint,
fill_fee=fill_fee, fill_fee_source=fill_fee_source,
fill_is_maker=fill_is_maker, fill_slippage_bps=fill_slippage_bps)
self._write_trade_reconstruction(
snapshot, intent.trade_id,
event_type="PARTIAL_EXIT" if partial else "EXIT",
event_id=f"{intent.trade_id}:{'partial' if partial else 'close'}",
payload={
"decision": _decision_summary(decision),
"intent": _intent_summary(intent),
"outcome": _outcome_summary(outcome),
"slot": slot,
"market_state": _json_safe(market_state or {}),
},
market_state=market_state,
)
if slot_closed:
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
self._write_trade_event(snapshot, decision, intent, slot, outcome,
market_state=market_state,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
exit_price_hint=fill_price_hint,
fill_fee=fill_fee, fill_fee_source=fill_fee_source,
fill_is_maker=fill_is_maker,
fill_slippage_bps=fill_slippage_bps,
fill_mark_at_submit=fill_mark_at_submit,
fill_exchange_ts=fill_exchange_ts)
def persist_fill_events(
self,
*,
snapshot: Any,
events: Any,
slot_dict: dict[str, Any] | None = None,
market_state: Mapping[str, Any] | None = None,
) -> None:
"""Persist a late (async) venue fill drained by the runtime pump.
There is no fresh policy decision for an async fill, so we synthesize a
minimal Decision/Intent from the post-fill slot + event and route it
through :meth:`persist_result`. Direction (ENTER vs EXIT) is inferred
from the slot: a closed slot or a drop in remaining size vs the last leg
snapshot is an EXIT; otherwise an opening fill is an ENTER. Capital
authority remains the kernel this only logs the settled result.
"""
slot = slot_dict or {}
event_list = tuple(events or ())
trade_id = str(slot.get("trade_id") or "")
asset = str(slot.get("asset") or "")
side = self._slot_side(slot)
closed = bool(slot.get("closed", False))
cur_size = self._slot_size(slot)
leverage = _safe_float(slot.get("leverage", 1.0), 1.0)
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
# Extract fill price from venue events (used as exit_price_hint for G21 fix).
price = next(
(float(getattr(e, "price", 0.0)) for e in event_list
if getattr(e, "price", 0.0) and math.isfinite(float(getattr(e, "price", 0.0)))),
0.0,
) or self._slot_entry_price(slot)
prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0)
is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12)
action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER
ts = getattr(snapshot, "timestamp", datetime.now(timezone.utc))
decision = Decision(
timestamp=ts, decision_id=trade_id or "async", asset=asset, action=action,
side=side, reason="ASYNC_FILL", confidence=0.0, velocity_divergence=0.0,
irp_alignment=0.0, reference_price=price, target_size=cur_size,
leverage=leverage, stage=TradeStage.POSITION_UPDATED, metadata={},
)
intent = Intent(
timestamp=ts, trade_id=trade_id, decision_id=trade_id or "async", asset=asset,
action=action, side=side, reason="ASYNC_FILL", target_size=cur_size,
leverage=leverage, reference_price=price, confidence=0.0,
exit_leg_ratios=tuple(slot.get("exit_leg_ratios", (1.0,)) or (1.0,)), metadata={},
)
outcome = KernelOutcome(
accepted=True, slot_id=int(slot.get("slot_id", 0) or 0), trade_id=trade_id,
state=KernelStage.CLOSED if closed else KernelStage.POSITION_OPEN,
diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO,
transitions=(), emitted_events=event_list, details={"origin": "async_fill_pump"},
)
self.persist_result(
snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
slot_dict=slot, phase="async_fill", market_state=market_state,
)
def persist_recovery_state(
self,
*,
snapshot: Any,
acc_dict: dict[str, Any] | None = None,
phase: str = "recovery",
event_type: str = "RECOVERY",
market_state: Mapping[str, Any] | None = None,
) -> None:
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
"""Persist recovery-only state after kernel reconcile.
A15 fix: acc_dict is the kernel account snapshot (capital/equity/pnl),
not a slot dict. Read the actual slot from kernel.slot(0) so that
trade_id, asset, size, and entry_price are correctly populated in the
recovery rows.
"""
# A15: read slot from kernel instead of misusing acc_dict as slot dict.
slot_dict: dict[str, Any] = {}
if self._kernel is not None:
try:
slot_view = self._kernel.slot(0)
raw = slot_view.to_dict() if hasattr(slot_view, "to_dict") else {}
slot_dict = dict(raw) if raw else {}
except Exception:
pass
trade_id = (
str(slot_dict.get("trade_id") or "")
or (str(acc_dict.get("trade_id", "")) if acc_dict else "")
)
self._write_status_snapshot(
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
snapshot, decision=None, intent=None, slot_dict=slot_dict, phase=phase,
)
self._write_account_event(
snapshot, decision=None, intent=None,
stage=TradeStage.TRADE_TERMINAL_WRITTEN,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
slot_dict=slot_dict, event_type=event_type,
)
self._write_position_state(
snapshot, decision=None, intent=None,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
slot_dict=slot_dict, stage=TradeStage.TRADE_TERMINAL_WRITTEN,
status=self._state_label(slot_dict, phase), market_state=market_state,
)
self._write_trade_reconstruction(
snapshot,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
trade_id=trade_id,
event_type=event_type,
event_id=f"recovery:{phase}",
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
payload={"acc_dict": _json_safe(acc_dict or {}), "slot": _json_safe(slot_dict), "phase": phase, "market_state": _json_safe(market_state or {})},
market_state=market_state,
)
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
def persist_fee_settled(
self,
*,
trade_id: str,
venue_order_id: str = "",
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fee: float,
fee_asset: str = "USDT",
is_maker: bool = False,
exchange_ts: int = 0,
realized_pnl_delta: float = 0.0,
ts: Optional[Any] = None,
) -> None:
"""Record the WS FILL_SETTLED fee arriving after the REST submit.
Gap 2: the REST ACK path writes fee_source="ESTIMATED_TAKER/MAKER".
When the WS ORDER_TRADE_UPDATE frame arrives with field "n" (actual
commission), call this method to log the settled truth.
trade_id should be our BTCUSDT-T-N format (from kernel slot).
venue_order_id is BingX's own orderId for bidirectional lookup.
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
The CH spool stores both the original estimated row AND this settled row.
Downstream queries can reconcile using:
SELECT trade_id, MAX(fee) FILTER(WHERE fee_source='WS_SETTLED') AS settled_fee,
MAX(fee) FILTER(WHERE fee_source LIKE 'ESTIMATED%') AS estimated_fee
FROM trade_events GROUP BY trade_id
This method writes to ``fee_settled_events`` (a lightweight supplementary
table, not trade_events) so the original row is never mutated.
"""
ts_val = ts or datetime.now(timezone.utc)
self._sink("fee_settled_events", {
"ts": ts_val.isoformat() if hasattr(ts_val, "isoformat") else str(ts_val),
"trade_id": trade_id,
"venue_order_id": venue_order_id,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
"fee": float(fee),
"fee_asset": fee_asset,
"fee_source": "WS_SETTLED",
"is_maker": bool(is_maker),
"exchange_ts": int(exchange_ts),
"realized_pnl_delta": float(realized_pnl_delta),
"runtime_namespace": self.config.runtime_namespace,
"strategy": self.config.strategy,
})
def record_anomaly(
self,
*,
snapshot: Any,
decision: Any,
intent: Any,
anomaly: str,
origin: str = "emergent",
sensor: str = "",
detail: Any = "",
rm_meta: float = 0.0,
) -> None:
"""Persist a DITA anomaly row with legacy-compatible shape."""
self._sink(
"anomaly_events",
{
"ts": snapshot.timestamp.isoformat(),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
"anomaly": anomaly,
"origin": origin,
"sensor": sensor,
"detail": _json_text(detail) if not isinstance(detail, str) else detail,
"rm_meta": float(rm_meta),
},
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
def _state_label(slot_dict: dict[str, Any], phase: str) -> str:
if slot_dict.get("closed", False):
return "CLOSED"
if slot_dict.get("size", 0) > 0:
if phase.lower().startswith("recovery"):
return "RECOVERED_OPEN"
return "OPEN"
return "FLAT"
def _posture(self, slot_dict: dict[str, Any]) -> str:
if slot_dict.get("closed", False) or not slot_dict.get("size", 0):
return "FLAT"
return str(slot_dict.get("side", "FLAT"))
def _slot_entry_price(self, slot_dict: dict[str, Any]) -> float:
return _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
def _slot_size(self, slot_dict: dict[str, Any]) -> float:
return _safe_float(slot_dict.get("size", 0.0), 0.0)
def _slot_side(self, slot_dict: dict[str, Any]) -> TradeSide:
raw = str(slot_dict.get("side", "FLAT")).upper()
if raw == "SHORT":
return TradeSide.SHORT
if raw == "LONG":
return TradeSide.LONG
return TradeSide.FLAT
def _slot_trade_id(self, slot_dict: dict[str, Any]) -> str:
return str(slot_dict.get("trade_id", ""))
def _slot_asset(self, slot_dict: dict[str, Any]) -> str:
return str(slot_dict.get("asset", ""))
# ------------------------------------------------------------------
# Row writers
# ------------------------------------------------------------------
def _write_anomaly(
self, snapshot: Any, decision: Decision, intent: Intent,
*, anomaly: str, origin: str = "ditav2_kernel", detail: Any = "",
) -> None:
self._sink("anomaly_events", {
"ts": snapshot.timestamp.isoformat(),
"decision_id": decision.decision_id,
"trade_id": intent.trade_id,
"symbol": intent.asset,
"anomaly": anomaly,
"origin": origin,
"sensor": "",
"detail": _json_text(detail) if not isinstance(detail, str) else detail,
"rm_meta": 0.0,
})
def _write_policy_event(
self, snapshot: Any, decision: Decision, intent: Intent, *, phase: str,
) -> None:
price = _safe_float(decision.reference_price, 0.0)
quantity = _safe_float(intent.target_size, 0.0)
row = {
"ts": snapshot.timestamp.isoformat(),
"strategy": self.config.strategy,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
"event_namespace": self.config.event_namespace,
"actor_name": self.config.actor_name,
"exec_venue": self.config.exec_venue,
"data_venue": self.config.data_venue,
"source": "ditav2",
"trade_id": intent.trade_id,
"asset": decision.asset,
"side": decision.side.value,
"entry_price": price,
"current_price": price,
"quantity": quantity,
"notional": _notional(quantity, price),
"leverage": _safe_float(intent.leverage, 1.0),
"bar_idx": 0,
"decision_seq": self._trade_seq(),
"bars_held": int(intent.bars_held or 0),
"action": decision.action.value,
"reason": decision.reason,
"pnl_pct": 0.0,
"mfe": 0.0,
"mae": 0.0,
"mfe_risk": 0.0,
"mae_risk": 0.0,
"exit_pressure": 0.0,
"rv_comp": 0.0,
"mae_thresh1": 0.0,
"bounce_score": 0.0,
"bounce_risk": 0.0,
"ob_imbalance": 0.0,
"vel_div_entry": float(decision.velocity_divergence or 0.0),
"vel_div_now": float(decision.velocity_divergence or 0.0),
"v50_vel": 0.0,
"v750_vel": 0.0,
"exf_funding": 0.0,
"exf_dvol": 0.0,
"exf_fear_greed": 0.0,
"exf_taker": 0.0,
"posture": decision.side.value,
}
self._sink("policy_events", row)
self._v7_sink("v7_decision_events", row)
def _write_account_event(
self, snapshot: Any, decision: Decision | None, intent: Intent | None,
*, stage: TradeStage, slot_dict: dict[str, Any], event_type: str | None = None,
) -> None:
capital = self._capital()
peak_cap = self._peak_capital()
is_open = not slot_dict.get("closed", False) and slot_dict.get("size", 0) > 0
open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
drawdown_pct = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
"ts": snapshot.timestamp.isoformat(),
"event_type": event_type or stage.value,
"strategy": self.config.strategy,
"posture": self._posture(slot_dict),
"capital": capital,
"peak_capital": peak_cap,
"drawdown_pct": drawdown_pct,
"pnl_today": float(self.account.snapshot.realized_pnl or 0.0),
"trades_today": self._trade_seq(),
"open_positions": 1 if is_open else 0,
"boost": 1.0,
"beta": 0.0,
"current_open_notional": open_notional,
"current_account_leverage": 0.0 if capital <= 0 else open_notional / capital,
"exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))),
"exchange_leverage_mode": self.config.exchange_leverage_mode,
"leverage_mapping_rule": self.config.leverage_mapping_rule,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
"event_namespace": self.config.event_namespace,
"actor_name": self.config.actor_name,
"exec_venue": self.config.exec_venue,
"data_venue": self.config.data_venue,
"notes": _json_text({
"decision_id": None if decision is None else decision.decision_id,
"trade_id": None if intent is None else intent.trade_id,
"reason": None if intent is None else intent.reason,
"stage": stage.value,
}),
# Phase 4: kernel atomic account versioning
"account_event_seq": self._account_event_seq(),
"reconcile_status": self._kernel_account().get("reconcile_status", "OK"),
}
self._sink("account_events", row)
# Write a reconcile_events record whenever E-facts are present
ka = self._kernel_account()
if ka.get("e_wallet_balance", 0.0) > 0:
self.write_reconcile_event(
event_seq=ka.get("event_seq", 0),
ts=snapshot.timestamp,
k_capital=ka.get("k_capital", 0.0),
e_wallet_balance=ka.get("e_wallet_balance", 0.0),
delta=ka.get("reconcile_delta", 0.0),
status=ka.get("reconcile_status", "OK"),
explanation=ka.get("reconcile_explanation", ""),
)
def _write_position_state(
self, snapshot: Any, decision: Decision | None, intent: Intent | None,
*, slot_dict: dict[str, Any], stage: TradeStage, status: str,
market_state: Mapping[str, Any] | None = None,
) -> None:
side = self._slot_side(slot_dict)
trade_id = self._slot_trade_id(slot_dict)
asset = self._slot_asset(slot_dict)
if not trade_id and intent is not None:
trade_id = intent.trade_id
asset = intent.asset
side = intent.side
row = {
"ts": snapshot.timestamp.isoformat(),
"trade_id": trade_id,
"asset": asset,
"direction": _direction(side),
"entry_price": self._slot_entry_price(slot_dict),
"quantity": self._slot_size(slot_dict),
"notional": _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)),
"leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0),
"bucket_id": -1,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
# A14 fix: active_leg_index is the exit-leg counter, not the bar count.
# Use intent.bars_held when available; fall back to 0.
"entry_bar": int(intent.bars_held if intent is not None else 0) or 0,
"status": status,
"exit_reason": slot_dict.get("close_reason", ""),
"pnl": _safe_float(slot_dict.get("realized_pnl", 0.0), 0.0),
"bars_held": 0,
"market_state_bundle_json": _json_text(market_state or {}),
"tp_base_pct": 0.0,
"tp_effective_pct": 0.0,
"our_leverage": _safe_float(slot_dict.get("leverage", 0.0), 0.0),
}
self._sink("position_state", row)
def _write_status_snapshot(
self, snapshot: Any, decision: Decision | None, intent: Intent | None,
*, slot_dict: dict[str, Any], phase: str,
) -> None:
capital = self._capital()
peak_cap = self._peak_capital()
is_open = not slot_dict.get("closed", False) and slot_dict.get("size", 0) > 0
open_notional = _notional(self._slot_size(slot_dict), self._slot_entry_price(slot_dict)) if is_open else 0.0
leverage = 0.0 if capital <= 0 else open_notional / capital
drawdown = 0.0 if peak_cap <= 0 else max(0.0, (peak_cap - capital) / peak_cap)
row = {
"ts": snapshot.timestamp.isoformat(timespec="milliseconds"),
"capital": capital,
"roi_pct": 0.0 if self.config.initial_capital <= 0 else ((capital / self.config.initial_capital) - 1.0) * 100.0,
"dd_pct": drawdown * 100.0,
"trades_executed": self._trade_seq(),
"posture": self._posture(slot_dict),
"rm": 1.0 if decision is None else max(0.0, min(1.0, decision.confidence)),
"vel_div": 0.0 if decision is None else float(decision.velocity_divergence),
"vol_ok": 1,
"phase": phase,
"mhs_status": "GREEN",
"boost": 1.0,
"cat5": 0.0,
"conviction_multiplier": 0.0 if intent is None else float(intent.confidence or 0.0),
"exchange_leverage": int(round(_safe_float(slot_dict.get("leverage", 0.0), 0.0))),
"exchange_leverage_mode": self.config.exchange_leverage_mode,
"leverage_mapping_rule": self.config.leverage_mapping_rule,
"account_capital": capital,
"portfolio_capital": capital,
"current_open_notional": open_notional,
"current_account_leverage": leverage,
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"ledger_authority": self.config.ledger_authority,
}
self._sink("status_snapshots", row)
def _write_trade_exit_leg(
self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
fill_price_hint: float = 0.0,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
) -> None:
"""Emit one BLUE-schema-compatible ``trade_exit_legs`` row per exit leg.
The DITAv2 kernel uses a single slot with sequential exit legs rather
than BLUE's chained per-leg trade_ids, so the chain_* columns describe
the leg sequence within this one trade (root = trade_id). Per-leg deltas
(exit_qty, pnl_leg) are computed against the previous leg's snapshot held
in ``self._leg_state`` so each row is isolated, not cumulative.
"""
trade_id = intent.trade_id
prev = self._leg_state.get(trade_id) or {
"prev_realized": 0.0,
"prev_size": _safe_float(slot_dict.get("initial_size", 0.0), 0.0),
"prev_leg_id": "",
}
entry_price = self._slot_entry_price(slot_dict) or _safe_float(intent.reference_price, 0.0)
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
# G21 fix: use fill price hint (actual exchange fill) > intent ref > decision ref.
exit_price = (
fill_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
)
side = self._slot_side(slot_dict)
if side == TradeSide.FLAT:
side = intent.side
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
cur_size = self._slot_size(slot_dict)
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
cur_realized = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=trade_id, sink=self._sink)
prev_size = _safe_float(prev.get("prev_size", 0.0), 0.0)
prev_realized = _safe_float(prev.get("prev_realized", 0.0), 0.0)
# active_leg_index is post-fill (already advanced); the leg that just
# filled is therefore one behind. Clamp to a valid ratio index.
ratios = slot_dict.get("exit_leg_ratios", []) or []
leg_index = max(0, int(slot_dict.get("active_leg_index", 0) or 0) - 1)
fraction = _safe_float(ratios[leg_index], 0.0) if 0 <= leg_index < len(ratios) else 0.0
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
# External-position fix: if leg_state was never seeded (position detected via
# reconcile/pump rather than our own ENTER), prev_size=0 would make exit_qty=0.
# Fall back to initial_size or intent.target_size so the leg row is meaningful.
if prev_size <= 1e-12:
# No prior leg tracking — use the slot's initial_size or intent size.
initial = _safe_float(slot_dict.get("initial_size", 0.0), 0.0) or _safe_float(intent.target_size, 0.0)
exit_qty = initial - cur_size if initial > cur_size else initial
else:
exit_qty = max(0.0, prev_size - cur_size)
pnl_leg = cur_realized - prev_realized
capital_after = self._capital()
capital_before = capital_after - pnl_leg
exit_notional = _notional(exit_qty, exit_price or entry_price)
remaining_notional = _notional(cur_size, entry_price)
denom = abs(exit_qty * entry_price * max(leverage_val, 1e-9))
pnl_pct_leg = pnl_leg / denom if denom > 0 else 0.0
exit_leg_id = f"{trade_id}:leg{leg_index}"
self._sink("trade_exit_legs", {
"ts": snapshot.timestamp.isoformat(),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": trade_id,
"chain_root_trade_id": trade_id,
"chain_head_leg_id": f"{trade_id}:leg0",
"chain_prev_leg_id": str(prev.get("prev_leg_id", "") or ""),
"chain_seq": leg_index,
"chain_token": trade_id,
"chain_mode": "LIVE",
"exit_leg_id": exit_leg_id,
"exit_seq": leg_index,
"command_id": decision.decision_id,
"source": "ditav2",
"reason": intent.reason,
"asset": intent.asset,
"side": side.value,
"entry_price": entry_price,
"exit_price": exit_price,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
"exit_qty": exit_qty,
"fraction": fraction,
"capital_before": capital_before,
"capital_after": capital_after,
"exit_notional": exit_notional,
"remaining_notional": remaining_notional,
"remaining_qty": cur_size,
"pnl_pct_leg": pnl_pct_leg,
"pnl_leg": pnl_leg,
"pnl_realized_total": cur_realized,
"bars_held": int(intent.bars_held or 0),
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# Gap 1/2/3: per-leg friction
"fee_leg": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
})
# Advance the per-trade leg snapshot for the next leg's delta.
self._leg_state[trade_id] = {
"prev_realized": cur_realized,
"prev_size": cur_size,
"prev_leg_id": exit_leg_id,
}
def _write_trade_event(
self, snapshot: Any, decision: Decision, intent: Intent,
slot_dict: dict[str, Any], outcome: KernelOutcome | None,
*, market_state: Mapping[str, Any] | None = None,
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
exit_price_hint: float = 0.0,
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
fill_fee: float = 0.0,
fill_fee_source: str = "",
fill_is_maker: bool = False,
fill_slippage_bps: float = 0.0,
fill_mark_at_submit: float = 0.0,
fill_exchange_ts: int = 0,
) -> None:
entry_price = _safe_float(slot_dict.get("entry_price", 0.0), 0.0) or _safe_float(intent.reference_price, 0.0)
quantity = _safe_float(slot_dict.get("initial_size", slot_dict.get("size", 0.0)), 0.0) or _safe_float(intent.target_size, 0.0)
PINK: fix persistence layer — exit_price, entry_bar, recovery, external exits, NaN tracing G21/E23/A13 — exit_price used entry_price (every trade had exit_price==entry_price): _write_trade_event: exit_price = fill_price_hint > intent.reference_price > decision.reference_price _write_trade_exit_leg: same priority chain via fill_price_hint parameter persist_result: extracts fill_price_hint from FULL_FILL/PARTIAL_FILL events in outcome persist_fill_events: intent.reference_price = actual fill price → propagates correctly A14 — entry_bar was active_leg_index (exit leg counter, not bar count): _write_position_state: entry_bar = intent.bars_held (0 when intent is None) A15 — persist_recovery_state used acc_dict as slot_dict (trade_id always ""): Now reads kernel.slot(0).to_dict() when kernel is wired; trade_id from real slot External-position exit_qty=0 fix: _write_trade_exit_leg: when prev_size<=0 (no prior ENTER tracked), falls back to initial_size or intent.target_size so exit legs for reconcile-detected positions are meaningful exit_qty field added to trade_exit_legs rows (was computed but not emitted) NaN tracing (_checked_float): Introduces _checked_float() wrapper that logs WARNING + writes anomaly_events spool row on NaN/inf in financial fields; applied to realized_pnl in exit paths 29 new persistence unit tests (mocked) + chaos/fuzz suite: exit_price correctness, capital ordering, pnl_leg incremental, entry_bar, recovery trade_id, external position exits, multi-leg, restart-mid-trade, NaN/None fields 164/164 total (97 flaws + 25 kernel reliability + 29 persistence + 13 phase4) green FLAWS doc: pass 6 — G21/E23/A13/A14/A15 closed; 26 total fixed Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 09:30:30 +02:00
# G21 fix: exit_price is the fill/order price, NOT the entry price.
# Priority: explicit hint (fill event price) > intent reference price > decision price.
# Fall back to entry_price only as absolute last resort (avoids the G21 bug where
# every trade_events row had exit_price == entry_price and PnL reconstruction was zero).
exit_price = (
exit_price_hint
or _safe_float(intent.reference_price, 0.0)
or _safe_float(decision.reference_price, 0.0)
or _safe_float(slot_dict.get("entry_price", 0.0), 0.0)
)
tid = intent.trade_id if intent is not None else ""
pnl = _checked_float(slot_dict.get("realized_pnl", 0.0), 0.0,
field="realized_pnl", trade_id=tid, sink=self._sink)
pnl_pct = 0.0
leverage_val = _safe_float(slot_dict.get("leverage", intent.leverage), 1.0)
denom = abs(quantity * entry_price * max(leverage_val, 1e-9))
if denom > 0:
pnl_pct = pnl / denom
capital_after = self._capital()
capital_before = capital_after - pnl
open_notional = _notional(quantity, exit_price or entry_price)
conviction = float(intent.confidence or decision.confidence or 0.0)
metadata = intent.metadata if intent is not None else (decision.metadata if decision is not None else {})
row = {
"ts": snapshot.timestamp.isoformat(),
"date": snapshot.timestamp.date().isoformat(),
"strategy": self.config.strategy,
"trade_id": intent.trade_id,
"asset": intent.asset,
"side": intent.side.value,
"entry_price": entry_price,
"exit_price": exit_price,
"quantity": quantity,
"pnl": pnl,
"pnl_pct": pnl_pct,
"exit_reason": intent.reason,
"vel_div_entry": float(decision.velocity_divergence or 0.0),
"boost_at_entry": 1.0,
"beta_at_entry": 0.0,
"posture": intent.side.value,
"leverage": leverage_val,
"conviction_multiplier": conviction,
"exchange_leverage": int(round(leverage_val)),
"exchange_leverage_mode": self.config.exchange_leverage_mode,
"leverage_mapping_rule": self.config.leverage_mapping_rule,
"runtime_namespace": self.config.runtime_namespace,
"strategy_namespace": self.config.strategy_namespace,
"event_namespace": self.config.event_namespace,
"actor_name": self.config.actor_name,
"exec_venue": self.config.exec_venue,
"data_venue": self.config.data_venue,
"account_capital": capital_after,
"portfolio_capital": capital_after,
"current_open_notional": open_notional,
"remaining_notional_capacity": max(0.0, self.config.max_account_leverage * capital_after - open_notional),
"max_account_leverage": self.config.max_account_leverage,
"margin_required": 0.0 if leverage_val <= 0 else open_notional / leverage_val,
"ledger_authority": self.config.ledger_authority,
"regime_signal": 0,
"capital_before": capital_before,
"capital_after": capital_after,
"peak_capital": self._peak_capital(),
"drawdown_at_entry": 0.0 if self._peak_capital() <= 0 else max(0.0, (self._peak_capital() - capital_before) / self._peak_capital()),
"open_positions_count": 0,
"scan_uuid": decision.decision_id,
"bars_held": int(intent.bars_held or 0),
"entry_payload_json": _json_text({"decision": _decision_summary(decision), "intent": _intent_summary(intent)}),
"exit_payload_json": _json_text({"outcome": _outcome_summary(outcome), "slot": _json_safe(slot_dict)}),
"execution_payload_json": _json_text({"outcome": _outcome_summary(outcome)}),
PINK: S1 leverage cache, S2 background refresh, Gap 1/2/3 fee+slippage logging S1 — Leverage cache (bingx_direct.py): _ensure_leverage(): per-symbol asyncio.Lock + cached value check; skips ~350ms POST when exchange already has the requested leverage. Saves ~350ms/trade. Cache updated ONLY on success; failed POST leaves cache stale → correct retry. Persist: JSON sidecar /tmp/.bingx_leverage_cache_{env}.json; survives restarts. connect(): _verify_leverage_drift() detects when another process changed leverage at the exchange and updates cache to exchange truth (logs WARNING on drift). Multi-runner contract: leverage is account-level on BingX; documented that concurrent runners with different leverage desires for same symbol conflict. 20 mock tests: same-lev skip, change-triggers-POST, failure-no-cache-update, concurrent-same-symbol (lock prevents race), drift-detect, persist/restore, multi-runner known-limitation documentation test. S2 — Background state refresh (bingx_direct.py): MARKET fills: asyncio.create_task(_refresh_state_background) — does not block submit path. WS FILL_SETTLED + ACCOUNT_UPDATE deliver capital truth anyway. LIMIT fills: synchronous refresh retained (include_history=False, not True) — needed to detect resting order state for next pump cycle. Saves ~600–900ms/trade on MARKET exits. ENTER similarly improved. Gap 1 — VenueEvent friction fields (contracts.py): Added: fee, fee_asset, fee_source, is_maker, exchange_ts, slippage_bps, mark_at_submit — all with defaults so existing callers are unaffected. Detailed inline docs for sign conventions and provenance codes. Gap 2 — Fee estimation + WS_SETTLED provenance (bingx_direct.py, pink_clickhouse.py): submit_intent: estimates fee from fill_price × fill_qty × taker/maker rate; annotates ack_row with _fee_estimated, _fee_source, _is_maker_est. persist_fee_settled(): new method writes fee_settled_events row when WS ORDER_TRADE_UPDATE delivers actual commission ("n" field); fee_source="WS_SETTLED". pink_direct._run_account_stream: calls persist_fee_settled on FILL_SETTLED. Gap 3 — Slippage measurement (bingx_direct.py, bingx_venue.py, pink_clickhouse.py): Captures mark_at_submit before the order POST; computes slippage_bps signed by side: positive = adverse (taker overpaid / maker undersold), negative = price improvement. Measured for BOTH taker and maker fills for symmetry. Flows through VenueEvent → trade_events.slippage_bps + trade_exit_legs.slippage_bps. S3 / SOR — Maker order placement: comprehensive TODO block in submit_intent with: SHORT/LONG-aware price offset design, OBF integration requirements, TODO_ADD_PARAMSET_VIBRISS for spread_bps threshold, intelligent timeout_s calibration requirements, price-impact awareness gap, SOR abstraction CRITICAL TODO. REST/WS split: documented why BingX (and all retail venues) separate these and why a unified VenueAdapter protocol is the long-term solution. 151/151 existing tests green + 20 new leverage cache tests = 171 total. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 12:25:12 +02:00
# Gap 1/2/3: fee, maker/taker, slippage, exchange timing.
# fee_source provenance: "ESTIMATED_TAKER" | "ESTIMATED_MAKER" | "WS_SETTLED" | "REST_SETTLED"
"fee": fill_fee,
"fee_source": fill_fee_source,
"is_maker": fill_is_maker,
"slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit,
"exchange_ts": fill_exchange_ts,
"friction_payload_json": _json_text({
"fee": fill_fee, "fee_source": fill_fee_source,
"is_maker": fill_is_maker, "slippage_bps": fill_slippage_bps,
"mark_at_submit": fill_mark_at_submit, "exchange_ts": fill_exchange_ts,
}),
"event_payload_json": _json_text({"phase": "terminal_close", "trade_id": intent.trade_id}),
"market_state_bundle_json": _json_text(market_state or {}),
"tp_base_pct": _safe_float(metadata.get("tp_base_pct", 0.0), 0.0),
"tp_effective_pct": _safe_float(metadata.get("tp_effective_pct", 0.0), 0.0),
"our_leverage": _safe_float(metadata.get("our_leverage", 0.0), 0.0),
}
self._sink("trade_events", row)
def _write_trade_reconstruction(
self, snapshot: Any, trade_id: str, *,
event_type: str, event_id: str, payload: Any,
market_state: Mapping[str, Any] | None = None,
) -> None:
self._sink("trade_reconstruction", {
"ts": snapshot.timestamp.isoformat(),
"trade_id": trade_id,
"event_type": event_type,
"event_id": event_id,
"payload_json": _json_text(payload),
"market_state_bundle_json": _json_text(market_state or {}),
})