PINK DITAv2 L0-L2: two-phase persistence + async-fill pump + LIMIT wiring
Execution-infra only (policy stays MARKET; algorithmic integrity untouched). L0 — two-phase (request->result) persistence (pink_clickhouse.py): - Split persist_step into persist_request (policy_events + trade_reconstruction ORDER_REQUESTED) and persist_result (state snapshot + per-fill lifecycle rows). - Lifecycle rows (ENTRY_FILLED/EXIT/trade_events/trade_exit_legs) gated on evidence of an actual fill (FULL/PARTIAL_FILL event, closed slot, or size drop vs _leg_state) -> a resting LIMIT (ACK only) emits no terminal rows. - Add persist_fill_events: synthesizes a minimal decision/intent from slot+event for async fills and routes through persist_result. L1 — async-fill pump (pink_direct.py): - PinkDirectRuntime.pump_venue_events(): venue.reconcile() -> kernel.on_venue_event (capital settles, FSM advances), persists applied fills; kernel dedups duplicates (no double-settle). Called at the start of step(). L2 — LIMIT placement (bingx_direct.py): - submit_intent now honors _order_type/_limit_price from intent metadata (was hardcoded MARKET): LIMIT -> type=LIMIT + price + GTC; MARKET default; invalid limit price falls back to MARKET. Offline: 63 passed (persistence/groundwork/pump/limit-payload/runtime/accounting/ flaws/kernel). MARKET path unchanged; resting LIMIT now correct end-to-end offline. Live VST validation (L3) pending. BLUE untouched. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
446
prod/clean_arch/adapters/bingx_direct.py
Normal file
446
prod/clean_arch/adapters/bingx_direct.py
Normal file
@@ -0,0 +1,446 @@
|
||||
"""Direct BingX execution adapter with no Nautilus Trader node dependency.
|
||||
|
||||
This adapter speaks BingX REST directly and keeps the exchange state
|
||||
authoritative. It is intended for PINK live execution under the DITA boundary.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import math
|
||||
import uuid
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timezone
|
||||
from decimal import Decimal, ROUND_DOWN
|
||||
from typing import Any, Optional
|
||||
|
||||
from nautilus_trader.model.identifiers import InstrumentId
|
||||
|
||||
from prod.bingx.config import BingxExecClientConfig
|
||||
from prod.bingx.config import BingxInstrumentProviderConfig
|
||||
from prod.bingx.enums import BingxEnvironment
|
||||
from prod.bingx.http import BingxHttpError
|
||||
from prod.bingx.http import BingxHttpClient
|
||||
from prod.bingx.instrument_provider import BingxInstrumentProvider
|
||||
from prod.bingx.leverage import normalize_bingx_leverage_value
|
||||
from prod.bingx.schemas import BingxOrderAck
|
||||
from prod.bingx.schemas import unwrap_order_payload
|
||||
from prod.clean_arch.dita import Intent, TradeSide, DecisionAction
|
||||
from prod.clean_arch.ports.execution import ExchangeStateSnapshot
|
||||
from prod.clean_arch.ports.execution import ExecutionReceipt
|
||||
from prod.clean_arch.ports.execution import ExecutionPort
|
||||
|
||||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _rows_from_payload(payload: Any, *keys: str) -> list[dict[str, Any]]:
|
||||
if isinstance(payload, list):
|
||||
return [row for row in payload if isinstance(row, dict)]
|
||||
if isinstance(payload, dict):
|
||||
for key in keys:
|
||||
rows = payload.get(key)
|
||||
if isinstance(rows, list):
|
||||
return [row for row in rows if isinstance(row, dict)]
|
||||
return []
|
||||
|
||||
|
||||
def _capital_from_balance_rows(rows: Any) -> float:
|
||||
if not isinstance(rows, list):
|
||||
return 0.0
|
||||
for row in rows:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
capital = 0.0
|
||||
for key in ("total", "balance", "equity", "availableMargin", "availableBalance", "walletBalance", "free"):
|
||||
try:
|
||||
capital = float(row.get(key, 0.0) or 0.0)
|
||||
except Exception:
|
||||
continue
|
||||
if capital > 0 and math.isfinite(capital):
|
||||
return capital
|
||||
if capital > 0 and math.isfinite(capital):
|
||||
return capital
|
||||
return 0.0
|
||||
|
||||
|
||||
def _position_notional_from_rows(rows: Any) -> float:
|
||||
if not isinstance(rows, list):
|
||||
return 0.0
|
||||
total = 0.0
|
||||
for row in rows:
|
||||
if not isinstance(row, dict):
|
||||
continue
|
||||
try:
|
||||
qty = abs(
|
||||
float(
|
||||
row.get("positionAmt")
|
||||
or row.get("positionQty")
|
||||
or row.get("positionSize")
|
||||
or row.get("quantity")
|
||||
or row.get("pa")
|
||||
or 0.0
|
||||
)
|
||||
)
|
||||
if qty <= 0.0:
|
||||
continue
|
||||
notional = row.get("positionValue") or row.get("notional") or row.get("openNotional")
|
||||
if notional is not None:
|
||||
total += abs(float(notional or 0.0))
|
||||
continue
|
||||
entry = (
|
||||
row.get("entryPrice")
|
||||
or row.get("avgPrice")
|
||||
or row.get("markPrice")
|
||||
or row.get("avgEntryPrice")
|
||||
or row.get("ep")
|
||||
or row.get("ap")
|
||||
or 0.0
|
||||
)
|
||||
total += qty * abs(float(entry or 0.0))
|
||||
except Exception:
|
||||
continue
|
||||
return total
|
||||
|
||||
|
||||
def _normalize_symbol(symbol: str) -> str:
|
||||
return str(symbol or "").replace("-", "").replace("_", "").replace("/","").upper()
|
||||
|
||||
|
||||
def _venue_symbol_from_asset(asset: str) -> str:
|
||||
text = _normalize_symbol(asset)
|
||||
if text.endswith("USDT"):
|
||||
return f"{text[:-4]}-USDT"
|
||||
return text
|
||||
|
||||
|
||||
def _decimal_text(value: Decimal) -> str:
|
||||
text = format(value.normalize(), "f")
|
||||
if "." in text:
|
||||
text = text.rstrip("0").rstrip(".")
|
||||
return text or "0"
|
||||
|
||||
|
||||
def _is_rate_limited_error(exc: Exception) -> bool:
|
||||
message = str(exc)
|
||||
lowered = message.lower()
|
||||
return "100410" in message or "frequency limit" in lowered or "rate limit" in lowered
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class BingxDirectExecutionConfig:
|
||||
"""Execution-specific knobs for the direct adapter."""
|
||||
|
||||
environment: BingxEnvironment = BingxEnvironment.VST
|
||||
allow_mainnet: bool = False
|
||||
default_leverage: int = 1
|
||||
exchange_leverage_cap: int = 3
|
||||
recv_window_ms: int = 5_000
|
||||
prefer_websocket: bool = False
|
||||
use_reduce_only: bool = True
|
||||
journal_strategy: str = "pink"
|
||||
journal_db: str = "dolphin_pink"
|
||||
instrument_provider: BingxInstrumentProviderConfig = BingxInstrumentProviderConfig(load_all=True)
|
||||
|
||||
|
||||
class BingxDirectExecutionAdapter(ExecutionPort):
|
||||
"""Direct BingX execution boundary with exchange-led state snapshots."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
config: BingxExecClientConfig | BingxDirectExecutionConfig,
|
||||
*,
|
||||
client: BingxHttpClient | None = None,
|
||||
provider: BingxInstrumentProvider | None = None,
|
||||
) -> None:
|
||||
if isinstance(config, BingxExecClientConfig):
|
||||
self._config = BingxDirectExecutionConfig(
|
||||
environment=config.environment,
|
||||
allow_mainnet=config.allow_mainnet,
|
||||
default_leverage=int(config.default_leverage),
|
||||
exchange_leverage_cap=int(config.exchange_leverage_cap),
|
||||
recv_window_ms=int(config.recv_window_ms),
|
||||
prefer_websocket=bool(config.prefer_websocket),
|
||||
use_reduce_only=bool(config.use_reduce_only),
|
||||
journal_strategy=str(config.journal_strategy or "pink"),
|
||||
journal_db=str(config.journal_db or "dolphin_pink"),
|
||||
instrument_provider=config.instrument_provider,
|
||||
)
|
||||
http_config = config
|
||||
else:
|
||||
self._config = config
|
||||
http_config = BingxExecClientConfig(
|
||||
api_key="",
|
||||
secret_key="",
|
||||
environment=config.environment,
|
||||
allow_mainnet=config.allow_mainnet,
|
||||
prefer_websocket=config.prefer_websocket,
|
||||
sizing_mode="testnet",
|
||||
exchange_leverage_cap=config.exchange_leverage_cap,
|
||||
use_reduce_only=config.use_reduce_only,
|
||||
default_leverage=config.default_leverage,
|
||||
recv_window_ms=config.recv_window_ms,
|
||||
journal_strategy=config.journal_strategy,
|
||||
journal_db=config.journal_db,
|
||||
instrument_provider=config.instrument_provider,
|
||||
)
|
||||
self._client = client or BingxHttpClient(http_config)
|
||||
self._provider = provider or BingxInstrumentProvider(client=self._client, config=self._config.instrument_provider)
|
||||
self._log = LOGGER
|
||||
self._client_order_run_id = uuid.uuid4().hex[:8]
|
||||
self._entry_client_order_seq = 0
|
||||
self._exit_client_order_seq = 0
|
||||
self._state: ExchangeStateSnapshot | None = None
|
||||
self._connected = False
|
||||
|
||||
@property
|
||||
def state(self) -> ExchangeStateSnapshot | None:
|
||||
return self._state
|
||||
|
||||
async def connect(self) -> bool:
|
||||
await self._provider.initialize()
|
||||
self._connected = True
|
||||
self._state = await self.refresh_state()
|
||||
return True
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
self._connected = False
|
||||
await self._client.close()
|
||||
|
||||
def _resolve_instrument(self, asset: str):
|
||||
normalized = _normalize_symbol(asset)
|
||||
candidates = [
|
||||
InstrumentId.from_str(f"{normalized}.BINGX"),
|
||||
InstrumentId.from_str(f"{_venue_symbol_from_asset(asset)}.BINGX"),
|
||||
]
|
||||
for candidate in candidates:
|
||||
instrument = self._provider.find(candidate)
|
||||
if instrument is not None:
|
||||
return instrument
|
||||
for instrument in self._provider.list_all():
|
||||
if _normalize_symbol(instrument.symbol.value) == normalized:
|
||||
return instrument
|
||||
if _normalize_symbol(instrument.raw_symbol.value) == normalized:
|
||||
return instrument
|
||||
return None
|
||||
|
||||
def _instrument_venue_symbol(self, asset: str) -> str:
|
||||
instrument = self._resolve_instrument(asset)
|
||||
if instrument is not None:
|
||||
return str(instrument.raw_symbol.value)
|
||||
return _venue_symbol_from_asset(asset)
|
||||
|
||||
def _instrument_step(self, asset: str) -> Decimal:
|
||||
instrument = self._resolve_instrument(asset)
|
||||
if instrument is not None:
|
||||
try:
|
||||
return Decimal(str(instrument.size_increment.as_decimal()))
|
||||
except Exception:
|
||||
pass
|
||||
return Decimal("0.001")
|
||||
|
||||
def _format_quantity(self, asset: str, quantity: float) -> str:
|
||||
step = self._instrument_step(asset)
|
||||
if step <= 0:
|
||||
return str(max(0.0, quantity))
|
||||
value = Decimal(str(quantity))
|
||||
quantized = (value / step).to_integral_value(rounding=ROUND_DOWN) * step
|
||||
return _decimal_text(max(Decimal("0"), quantized))
|
||||
|
||||
def _instrument_tick(self, asset: str) -> Decimal:
|
||||
instrument = self._resolve_instrument(asset)
|
||||
if instrument is not None:
|
||||
try:
|
||||
tick = getattr(instrument, "price_increment", None)
|
||||
if tick is not None:
|
||||
return Decimal(str(tick.as_decimal()))
|
||||
except Exception:
|
||||
pass
|
||||
return Decimal("0.01")
|
||||
|
||||
def _format_price(self, asset: str, price: float) -> str:
|
||||
tick = self._instrument_tick(asset)
|
||||
if tick <= 0:
|
||||
return f"{price:.8f}".rstrip("0").rstrip(".")
|
||||
value = Decimal(str(price))
|
||||
quantized = (value / tick).to_integral_value(rounding=ROUND_DOWN) * tick
|
||||
return _decimal_text(max(Decimal("0"), quantized))
|
||||
|
||||
async def _safe_get(self, endpoint: str, params: dict | None = None, *, fallback: Any = None) -> Any:
|
||||
"""GET an endpoint, returning *fallback* on rate-limit errors."""
|
||||
try:
|
||||
return await self._client.signed_get(endpoint, params)
|
||||
except BingxHttpError as exc:
|
||||
message = str(exc)
|
||||
if "100410" in message or "frequency limit" in message.lower():
|
||||
LOGGER.debug("BingX %s rate-limited; continuing with empty snapshot", endpoint)
|
||||
return fallback if fallback is not None else []
|
||||
raise
|
||||
|
||||
async def _refresh_exchange_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
|
||||
"""Fetch exchange state with parallel HTTP calls.
|
||||
|
||||
The three primary calls (balance, positions, openOrders) are
|
||||
independent and run concurrently via ``asyncio.gather``. Each has
|
||||
its own rate-limit fallback so a single throttle does not block
|
||||
the others. Historical calls (allOrders, allFillOrders) are gated
|
||||
on ``include_history`` and also gathered.
|
||||
"""
|
||||
balance_task = self._safe_get("/openApi/swap/v2/user/balance")
|
||||
positions_task = self._safe_get("/openApi/swap/v2/user/positions")
|
||||
orders_task = self._safe_get("/openApi/swap/v2/trade/openOrders")
|
||||
|
||||
balance_payload, positions_payload, open_orders_payload = await asyncio.gather(
|
||||
balance_task, positions_task, orders_task,
|
||||
)
|
||||
|
||||
all_orders_payload: Any = []
|
||||
all_fills_payload: Any = []
|
||||
if include_history and symbol is not None:
|
||||
venue_symbol = self._instrument_venue_symbol(symbol)
|
||||
hist_tasks = asyncio.gather(
|
||||
self._safe_get("/openApi/swap/v2/trade/allOrders", {"symbol": venue_symbol}),
|
||||
self._safe_get("/openApi/swap/v2/trade/allFillOrders", {"symbol": venue_symbol}),
|
||||
return_exceptions=True,
|
||||
)
|
||||
results = await hist_tasks
|
||||
all_orders_payload = results[0] if not isinstance(results[0], Exception) else []
|
||||
all_fills_payload = results[1] if not isinstance(results[1], Exception) else []
|
||||
|
||||
# Parse results (shared logic, same as before)
|
||||
if isinstance(balance_payload, list):
|
||||
balances = balance_payload
|
||||
elif isinstance(balance_payload, dict):
|
||||
rows_raw = balance_payload.get("balance") or balance_payload.get("balances") or balance_payload.get("data")
|
||||
if isinstance(rows_raw, dict):
|
||||
balances = [rows_raw]
|
||||
elif isinstance(rows_raw, list):
|
||||
balances = rows_raw
|
||||
else:
|
||||
balances = []
|
||||
else:
|
||||
balances = []
|
||||
positions_rows = _rows_from_payload(positions_payload, "positions", "data")
|
||||
positions: dict[str, dict[str, Any]] = {}
|
||||
for row in positions_rows:
|
||||
raw_symbol = str(row.get("symbol") or row.get("symbolName") or row.get("venueSymbol") or "")
|
||||
key = _normalize_symbol(raw_symbol)
|
||||
if not key:
|
||||
continue
|
||||
positions[key] = dict(row)
|
||||
open_orders = _rows_from_payload(open_orders_payload, "orders", "data")
|
||||
capital = _capital_from_balance_rows(balances)
|
||||
open_notional = _position_notional_from_rows(positions_rows)
|
||||
equity = capital
|
||||
if open_notional > 0 and positions_rows:
|
||||
equity = capital
|
||||
snapshot = ExchangeStateSnapshot(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
capital=capital,
|
||||
equity=equity,
|
||||
open_positions=positions,
|
||||
open_orders=[dict(row) for row in open_orders],
|
||||
all_orders=[dict(row) for row in _rows_from_payload(all_orders_payload, "orders", "data")],
|
||||
all_fills=[dict(row) for row in _rows_from_payload(all_fills_payload, "fills", "data")],
|
||||
account={"balances": balances},
|
||||
open_notional=open_notional,
|
||||
source="bingx",
|
||||
recovered=bool(include_history),
|
||||
)
|
||||
self._state = snapshot
|
||||
return snapshot
|
||||
|
||||
async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot:
|
||||
return await self._refresh_exchange_state(symbol, include_history=include_history)
|
||||
|
||||
async def submit_intent(self, intent: Intent) -> ExecutionReceipt:
|
||||
symbol = self._instrument_venue_symbol(intent.asset)
|
||||
if intent.action == DecisionAction.EXIT:
|
||||
side = "SELL" if intent.side == TradeSide.LONG else "BUY"
|
||||
else:
|
||||
side = "BUY" if intent.side == TradeSide.LONG else "SELL"
|
||||
# Entries must be free to open the slot; only exits are reduce-only.
|
||||
reduce_only = bool(intent.action == DecisionAction.EXIT)
|
||||
if reduce_only:
|
||||
self._exit_client_order_seq += 1
|
||||
client_order_id = f"pink:{self._client_order_run_id}:x{self._exit_client_order_seq:02d}"
|
||||
else:
|
||||
self._entry_client_order_seq += 1
|
||||
client_order_id = f"pink:{self._client_order_run_id}:e{self._entry_client_order_seq:02d}"
|
||||
leverage = normalize_bingx_leverage_value(
|
||||
int(round(float(intent.leverage or self._config.default_leverage))),
|
||||
exchange_max=self._config.exchange_leverage_cap,
|
||||
)
|
||||
try:
|
||||
await self._client.signed_post(
|
||||
"/openApi/swap/v2/trade/leverage",
|
||||
{"symbol": symbol, "side": "BOTH", "leverage": leverage},
|
||||
)
|
||||
# Honor the order type forwarded by the venue adapter
|
||||
# (bingx_venue._legacy_intent sets _order_type/_limit_price). MARKET
|
||||
# is the default; a LIMIT carries a resting price + GTC and will not
|
||||
# fill synchronously — the async-fill pump settles it later.
|
||||
order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper()
|
||||
limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0)
|
||||
is_limit = order_type == "LIMIT" and limit_price > 0.0
|
||||
payload: dict[str, Any] = {
|
||||
"symbol": symbol,
|
||||
"side": side,
|
||||
"positionSide": "BOTH",
|
||||
"type": "LIMIT" if is_limit else "MARKET",
|
||||
"quantity": self._format_quantity(intent.asset, intent.target_size),
|
||||
"clientOrderId": client_order_id,
|
||||
"recvWindow": str(int(self._config.recv_window_ms)),
|
||||
}
|
||||
if is_limit:
|
||||
payload["price"] = self._format_price(intent.asset, limit_price)
|
||||
payload["timeInForce"] = "GTC"
|
||||
if reduce_only:
|
||||
payload["reduceOnly"] = "true"
|
||||
ack_payload = await self._client.signed_post("/openApi/swap/v2/trade/order", payload)
|
||||
ack = BingxOrderAck.from_http(ack_payload if isinstance(ack_payload, dict) else {})
|
||||
ack_row = dict(unwrap_order_payload(ack_payload)) if isinstance(ack_payload, dict) else {}
|
||||
status = str(ack_row.get("status") or ack.status or "ACKED")
|
||||
fill_price = 0.0
|
||||
for key in ("avgPrice", "avgFilledPrice", "price", "lastFillPrice", "tradePrice"):
|
||||
try:
|
||||
value = float(ack_row.get(key) or 0.0)
|
||||
except Exception:
|
||||
value = 0.0
|
||||
if value > 0:
|
||||
fill_price = value
|
||||
break
|
||||
if fill_price <= 0 and self._state is not None:
|
||||
# Use the last known exchange mark as a fallback for projected accounting.
|
||||
fill_price = next((float(row.get("markPrice") or row.get("avgPrice") or 0.0) for row in self._state.open_positions.values() if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), 0.0)
|
||||
except BingxHttpError as exc:
|
||||
status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED"
|
||||
ack_row = {
|
||||
"status": status,
|
||||
"msg": str(exc),
|
||||
"symbol": symbol,
|
||||
"clientOrderId": client_order_id,
|
||||
}
|
||||
fill_price = 0.0
|
||||
ack = None
|
||||
receipt = ExecutionReceipt(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
status=status,
|
||||
symbol=symbol,
|
||||
side=side,
|
||||
action=intent.action.value,
|
||||
quantity=float(intent.target_size or 0.0),
|
||||
price=fill_price,
|
||||
client_order_id=client_order_id,
|
||||
order_id=str((ack.order_id if 'ack' in locals() and ack is not None else '') or ack_row.get("orderId") or ""),
|
||||
raw_ack=ack_row,
|
||||
raw_state=dict(self._state.account if self._state is not None else {}),
|
||||
)
|
||||
# Refresh from the venue so the direct runtime can use exchange-led state.
|
||||
self._state = await self._refresh_exchange_state(intent.asset, include_history=True)
|
||||
return receipt
|
||||
|
||||
async def reconcile(self, symbol: str | None = None) -> ExchangeStateSnapshot:
|
||||
# Recovery-only path: ask the venue for authoritative account/position/order state.
|
||||
return await self._refresh_exchange_state(symbol, include_history=True)
|
||||
@@ -24,7 +24,8 @@ from enum import Enum
|
||||
from typing import Any, Callable, Mapping, Optional
|
||||
|
||||
from prod.clean_arch.dita import AccountProjection, Decision, DecisionAction, Intent, TradeSide, TradeStage
|
||||
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelOutcome
|
||||
from prod.clean_arch.dita_v2.contracts import KernelDiagnosticCode, KernelEventKind, KernelOutcome
|
||||
from prod.clean_arch.dita_v2.contracts import KernelSeverity, TradeStage as KernelStage
|
||||
|
||||
Writer = Callable[[str, dict[str, Any]], None]
|
||||
|
||||
@@ -222,6 +223,71 @@ class PinkClickHousePersistence:
|
||||
phase: str = "step",
|
||||
market_state: Mapping[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Two-phase persist: log the REQUEST, then log the RESULT.
|
||||
|
||||
REQUEST (:meth:`persist_request`) — the decision/order that was
|
||||
submitted (policy_events + a trade_reconstruction ORDER_REQUESTED row).
|
||||
RESULT (:meth:`persist_result`) — the settled state snapshot plus the
|
||||
per-fill lifecycle rows, gated on *evidence of an actual fill*. A resting
|
||||
LIMIT order (ACK only, no fill) therefore emits state snapshots but no
|
||||
terminal rows; the async-fill pump persists those later via the same
|
||||
result path. The synchronous-MARKET path is unchanged: its FILL event
|
||||
(or the slot's filled/closed state) trips the same gate.
|
||||
"""
|
||||
self.persist_request(
|
||||
snapshot=snapshot, decision=decision, intent=intent,
|
||||
phase=phase, market_state=market_state,
|
||||
)
|
||||
self.persist_result(
|
||||
snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
|
||||
slot_dict=slot_dict, phase=phase, market_state=market_state,
|
||||
)
|
||||
|
||||
def persist_request(
|
||||
self,
|
||||
*,
|
||||
snapshot: Any,
|
||||
decision: Decision,
|
||||
intent: Intent,
|
||||
phase: str = "step",
|
||||
market_state: Mapping[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Phase 1 — log the requested decision/order (no fill data)."""
|
||||
self._write_policy_event(snapshot, decision, intent, phase=phase)
|
||||
if decision.action in (DecisionAction.ENTER, DecisionAction.EXIT):
|
||||
self._write_trade_reconstruction(
|
||||
snapshot, intent.trade_id,
|
||||
event_type="ORDER_REQUESTED",
|
||||
event_id=f"{intent.trade_id}:request:{decision.action.value.lower()}",
|
||||
payload={
|
||||
"decision": _decision_summary(decision),
|
||||
"intent": _intent_summary(intent),
|
||||
"market_state": _json_safe(market_state or {}),
|
||||
},
|
||||
market_state=market_state,
|
||||
)
|
||||
|
||||
def persist_result(
|
||||
self,
|
||||
*,
|
||||
snapshot: Any,
|
||||
decision: Decision,
|
||||
intent: Intent,
|
||||
outcome: KernelOutcome | None = None,
|
||||
slot_dict: dict[str, Any] | None = None,
|
||||
phase: str = "step",
|
||||
market_state: Mapping[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Phase 2 — log the settled state + per-fill lifecycle rows.
|
||||
|
||||
The state snapshot rows (account_events, position_state,
|
||||
status_snapshots) always reflect the current slot. The lifecycle rows
|
||||
(ENTRY_FILLED / PARTIAL_EXIT / EXIT / trade_events / trade_exit_legs) are
|
||||
emitted only when a fill is *evidenced* — a FULL/PARTIAL_FILL event in
|
||||
``outcome.emitted_events``, a closed slot, or a slot whose size dropped
|
||||
vs the last leg snapshot. A resting LIMIT (ACK only) emits no terminal
|
||||
rows here.
|
||||
"""
|
||||
slot = slot_dict or {}
|
||||
stage = (
|
||||
TradeStage(decision.stage.value)
|
||||
@@ -231,12 +297,10 @@ class PinkClickHousePersistence:
|
||||
)
|
||||
status = self._state_label(slot, phase)
|
||||
|
||||
self._write_policy_event(snapshot, decision, intent, phase=phase)
|
||||
self._write_account_event(snapshot, decision, intent, stage=stage, slot_dict=slot)
|
||||
self._write_position_state(snapshot, decision, intent, slot_dict=slot, stage=stage, status=status, market_state=market_state)
|
||||
self._write_status_snapshot(snapshot, decision, intent, slot_dict=slot, phase=phase)
|
||||
|
||||
# Emit anomaly for diagnostic codes (except OK).
|
||||
if outcome is not None and outcome.diagnostic_code != KernelDiagnosticCode.OK:
|
||||
self._write_anomaly(
|
||||
snapshot, decision, intent,
|
||||
@@ -246,38 +310,56 @@ class PinkClickHousePersistence:
|
||||
)
|
||||
|
||||
if outcome is None:
|
||||
# Decision-only step (HOLD, no execution).
|
||||
# Decision-only step (HOLD): state snapshot already written.
|
||||
return
|
||||
|
||||
events = tuple(outcome.emitted_events or ())
|
||||
has_fill_evt = any(
|
||||
e.kind in (KernelEventKind.FULL_FILL, KernelEventKind.PARTIAL_FILL)
|
||||
for e in events
|
||||
)
|
||||
slot_closed = bool(slot.get("closed", False))
|
||||
cur_size = _safe_float(slot.get("size", 0.0), 0.0)
|
||||
slot_open = (not slot_closed) and cur_size > 0.0
|
||||
|
||||
if decision.action == DecisionAction.ENTER:
|
||||
# Reset per-trade leg deltas: a fresh position starts with zero
|
||||
# realized PnL and the full initial size remaining.
|
||||
self._leg_state[intent.trade_id] = {
|
||||
"prev_realized": 0.0,
|
||||
"prev_size": _safe_float(
|
||||
slot.get("initial_size", slot.get("size", 0.0)), 0.0
|
||||
) or _safe_float(intent.target_size, 0.0),
|
||||
"prev_leg_id": "",
|
||||
}
|
||||
self._write_trade_reconstruction(
|
||||
snapshot, intent.trade_id,
|
||||
event_type="ENTRY_FILLED",
|
||||
event_id=f"{intent.trade_id}:entry",
|
||||
payload={
|
||||
"decision": _decision_summary(decision),
|
||||
"intent": _intent_summary(intent),
|
||||
"outcome": _outcome_summary(outcome),
|
||||
"slot": slot,
|
||||
"market_state": _json_safe(market_state or {}),
|
||||
},
|
||||
market_state=market_state,
|
||||
)
|
||||
# Emit ENTRY_FILLED only once the entry is actually filled (fill event
|
||||
# or an open slot). A resting LIMIT entry emits nothing here.
|
||||
if has_fill_evt or slot_open:
|
||||
self._leg_state[intent.trade_id] = {
|
||||
"prev_realized": 0.0,
|
||||
"prev_size": _safe_float(
|
||||
slot.get("initial_size", slot.get("size", 0.0)), 0.0
|
||||
) or _safe_float(intent.target_size, 0.0),
|
||||
"prev_leg_id": "",
|
||||
}
|
||||
self._write_trade_reconstruction(
|
||||
snapshot, intent.trade_id,
|
||||
event_type="ENTRY_FILLED",
|
||||
event_id=f"{intent.trade_id}:entry",
|
||||
payload={
|
||||
"decision": _decision_summary(decision),
|
||||
"intent": _intent_summary(intent),
|
||||
"outcome": _outcome_summary(outcome),
|
||||
"slot": slot,
|
||||
"market_state": _json_safe(market_state or {}),
|
||||
},
|
||||
market_state=market_state,
|
||||
)
|
||||
return
|
||||
|
||||
if decision.action != DecisionAction.EXIT:
|
||||
return
|
||||
|
||||
partial = slot.get("closed", False) is False and slot.get("size", 0) > 0
|
||||
# An exit leg is evidenced by a fill event, a closed slot, or a drop in
|
||||
# remaining size vs the previous leg snapshot. A resting LIMIT exit (no
|
||||
# size change) emits nothing until the async-fill pump observes the fill.
|
||||
prev_size = _safe_float(self._leg_state.get(intent.trade_id, {}).get("prev_size", 0.0), 0.0)
|
||||
exit_filled = has_fill_evt or slot_closed or (prev_size - cur_size > 1e-12)
|
||||
if not exit_filled:
|
||||
return
|
||||
|
||||
partial = (not slot_closed) and cur_size > 0.0
|
||||
# One trade_exit_legs row per exit leg (partial or final), BLUE-schema
|
||||
# compatible so PINK multi-exit trades reconcile against the same table.
|
||||
self._write_trade_exit_leg(snapshot, decision, intent, slot, outcome)
|
||||
@@ -295,9 +377,63 @@ class PinkClickHousePersistence:
|
||||
market_state=market_state,
|
||||
)
|
||||
# Terminal trade event.
|
||||
if slot.get("closed", False):
|
||||
if slot_closed:
|
||||
self._write_trade_event(snapshot, decision, intent, slot, outcome, market_state=market_state)
|
||||
|
||||
def persist_fill_events(
|
||||
self,
|
||||
*,
|
||||
snapshot: Any,
|
||||
events: Any,
|
||||
slot_dict: dict[str, Any] | None = None,
|
||||
market_state: Mapping[str, Any] | None = None,
|
||||
) -> None:
|
||||
"""Persist a late (async) venue fill drained by the runtime pump.
|
||||
|
||||
There is no fresh policy decision for an async fill, so we synthesize a
|
||||
minimal Decision/Intent from the post-fill slot + event and route it
|
||||
through :meth:`persist_result`. Direction (ENTER vs EXIT) is inferred
|
||||
from the slot: a closed slot or a drop in remaining size vs the last leg
|
||||
snapshot is an EXIT; otherwise an opening fill is an ENTER. Capital
|
||||
authority remains the kernel — this only logs the settled result.
|
||||
"""
|
||||
slot = slot_dict or {}
|
||||
event_list = tuple(events or ())
|
||||
trade_id = str(slot.get("trade_id") or "")
|
||||
asset = str(slot.get("asset") or "")
|
||||
side = self._slot_side(slot)
|
||||
closed = bool(slot.get("closed", False))
|
||||
cur_size = self._slot_size(slot)
|
||||
leverage = _safe_float(slot.get("leverage", 1.0), 1.0)
|
||||
price = next((float(getattr(e, "price", 0.0) or 0.0) for e in event_list if getattr(e, "price", 0.0)), 0.0) or self._slot_entry_price(slot)
|
||||
prev_size = _safe_float(self._leg_state.get(trade_id, {}).get("prev_size", 0.0), 0.0)
|
||||
is_exit = closed or (prev_size > 0.0 and cur_size < prev_size - 1e-12)
|
||||
action = DecisionAction.EXIT if is_exit else DecisionAction.ENTER
|
||||
ts = getattr(snapshot, "timestamp", datetime.now(timezone.utc))
|
||||
|
||||
decision = Decision(
|
||||
timestamp=ts, decision_id=trade_id or "async", asset=asset, action=action,
|
||||
side=side, reason="ASYNC_FILL", confidence=0.0, velocity_divergence=0.0,
|
||||
irp_alignment=0.0, reference_price=price, target_size=cur_size,
|
||||
leverage=leverage, stage=TradeStage.POSITION_UPDATED, metadata={},
|
||||
)
|
||||
intent = Intent(
|
||||
timestamp=ts, trade_id=trade_id, decision_id=trade_id or "async", asset=asset,
|
||||
action=action, side=side, reason="ASYNC_FILL", target_size=cur_size,
|
||||
leverage=leverage, reference_price=price, confidence=0.0,
|
||||
exit_leg_ratios=tuple(slot.get("exit_leg_ratios", (1.0,)) or (1.0,)), metadata={},
|
||||
)
|
||||
outcome = KernelOutcome(
|
||||
accepted=True, slot_id=int(slot.get("slot_id", 0) or 0), trade_id=trade_id,
|
||||
state=KernelStage.CLOSED if closed else KernelStage.POSITION_OPEN,
|
||||
diagnostic_code=KernelDiagnosticCode.OK, severity=KernelSeverity.INFO,
|
||||
transitions=(), emitted_events=event_list, details={"origin": "async_fill_pump"},
|
||||
)
|
||||
self.persist_result(
|
||||
snapshot=snapshot, decision=decision, intent=intent, outcome=outcome,
|
||||
slot_dict=slot, phase="async_fill", market_state=market_state,
|
||||
)
|
||||
|
||||
def persist_recovery_state(
|
||||
self,
|
||||
*,
|
||||
|
||||
@@ -30,6 +30,7 @@ from prod.clean_arch.dita import (
|
||||
)
|
||||
from prod.clean_arch.dita_v2.contracts import (
|
||||
KernelCommandType,
|
||||
KernelDiagnosticCode,
|
||||
KernelIntent,
|
||||
TradeSide as DitaTradeSide,
|
||||
TradeStage,
|
||||
@@ -306,9 +307,68 @@ class PinkDirectRuntime:
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
async def pump_venue_events(
|
||||
self, snapshot: Any | None = None, *, market_state: Any = None
|
||||
) -> int:
|
||||
"""Drain late (async) venue fills into the kernel and persist the result.
|
||||
|
||||
Resting LIMIT and partial fills arrive *after* the submitting
|
||||
``process_intent`` returns. This calls ``venue.reconcile()`` and feeds
|
||||
each event to ``kernel.on_venue_event`` so capital settles and the FSM
|
||||
advances; the kernel dedups duplicates via ``seen_event_ids`` /
|
||||
``_last_settled_pnl`` (no double-settle). Only events the kernel actually
|
||||
applied (accepted, not DUPLICATE_EVENT) are persisted, via the two-phase
|
||||
result-logger. Capital authority stays ``kernel.account``.
|
||||
|
||||
Returns the number of applied events.
|
||||
"""
|
||||
venue = self.kernel.venue
|
||||
reconcile = getattr(venue, "reconcile", None)
|
||||
if reconcile is None:
|
||||
return 0
|
||||
try:
|
||||
events = reconcile()
|
||||
if inspect.isawaitable(events):
|
||||
events = await events
|
||||
except Exception as exc:
|
||||
self.logger.warning("Venue reconcile failed: %s", exc)
|
||||
return 0
|
||||
events = list(events or [])
|
||||
if not events:
|
||||
return 0
|
||||
|
||||
applied: list[Any] = []
|
||||
for event in events:
|
||||
try:
|
||||
outcome = self.kernel.on_venue_event(event)
|
||||
except Exception as exc:
|
||||
self.logger.warning("on_venue_event failed: %s", exc)
|
||||
continue
|
||||
if getattr(outcome, "accepted", False) and getattr(
|
||||
outcome, "diagnostic_code", None
|
||||
) != KernelDiagnosticCode.DUPLICATE_EVENT:
|
||||
applied.append(event)
|
||||
|
||||
if applied and self.persistence is not None:
|
||||
slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {}
|
||||
persist_snapshot = snapshot
|
||||
if persist_snapshot is None:
|
||||
persist_snapshot = SimpleNamespace(
|
||||
timestamp=datetime.now(timezone.utc),
|
||||
symbol=str(slot_dict.get("asset", "")),
|
||||
)
|
||||
self.persistence.persist_fill_events(
|
||||
snapshot=persist_snapshot,
|
||||
events=applied,
|
||||
slot_dict=slot_dict,
|
||||
market_state=market_state or {},
|
||||
)
|
||||
return len(applied)
|
||||
|
||||
async def step(self, snapshot: MarketSnapshot) -> Decision:
|
||||
"""Single policy + execution cycle.
|
||||
|
||||
0. Pump late (async) venue fills into the kernel (LIMIT/partial settle)
|
||||
1. Update market state
|
||||
2. Decide (policy layer)
|
||||
3. Plan (intent layer)
|
||||
@@ -317,6 +377,9 @@ class PinkDirectRuntime:
|
||||
6. Persist
|
||||
"""
|
||||
market_state = self._update_market_state_runtime(snapshot)
|
||||
# Drain any late fills BEFORE the policy reads slot/account state, so a
|
||||
# resting LIMIT that filled since the last cycle is reflected.
|
||||
await self.pump_venue_events(snapshot, market_state=market_state)
|
||||
acc = self.kernel.snapshot()["account"]
|
||||
slot_view = self.kernel.slot(0) if self.kernel.max_slots > 0 else None
|
||||
slot_dict = slot_view.to_dict() if slot_view is not None else {}
|
||||
|
||||
Reference in New Issue
Block a user