From 8d85d75ded045b02321e11340de812cdff15e1de Mon Sep 17 00:00:00 2001 From: Codex Date: Wed, 3 Jun 2026 13:26:36 +0200 Subject: [PATCH] PINK DITAv2: Hz writes + vol_ok gate + leverage logging + 8 new tests (94/94 green) --- prod/clean_arch/adapters/bingx_direct.py | 513 ++++++++++++++++++ prod/clean_arch/dita/decision.py | 209 +++++++ .../dita_v2/hazelcast_projection.py | 176 ++++++ prod/clean_arch/dita_v2/test_flaws.py | 140 +++++ prod/clean_arch/runtime/pink_direct.py | 121 ++++- prod/launch_dolphin_pink.py | 417 ++++++++++++++ 6 files changed, 1570 insertions(+), 6 deletions(-) create mode 100644 prod/clean_arch/adapters/bingx_direct.py create mode 100644 prod/clean_arch/dita/decision.py create mode 100644 prod/clean_arch/dita_v2/hazelcast_projection.py create mode 100644 prod/launch_dolphin_pink.py diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py new file mode 100644 index 0000000..715a72a --- /dev/null +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -0,0 +1,513 @@ +"""Direct BingX execution adapter with no Nautilus Trader node dependency. + +This adapter speaks BingX REST directly and keeps the exchange state +authoritative. It is intended for PINK live execution under the DITA boundary. +""" + +from __future__ import annotations + +import asyncio +import json +import logging +import math +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from decimal import Decimal, ROUND_DOWN +from typing import Any, Optional + +from nautilus_trader.model.identifiers import InstrumentId + +from prod.bingx.config import BingxExecClientConfig +from prod.bingx.config import BingxInstrumentProviderConfig +from prod.bingx.enums import BingxEnvironment +from prod.bingx.http import BingxHttpError +from prod.bingx.http import BingxHttpClient +from prod.bingx.instrument_provider import BingxInstrumentProvider +from prod.bingx.leverage import normalize_bingx_leverage_value +from prod.bingx.schemas import BingxOrderAck +from prod.bingx.schemas import unwrap_order_payload +from prod.clean_arch.dita import Intent, TradeSide, DecisionAction +from prod.clean_arch.ports.execution import ExchangeStateSnapshot +from prod.clean_arch.ports.execution import ExecutionReceipt +from prod.clean_arch.ports.execution import ExecutionPort + +LOGGER = logging.getLogger(__name__) + + +def _rows_from_payload(payload: Any, *keys: str) -> list[dict[str, Any]]: + if isinstance(payload, list): + return [row for row in payload if isinstance(row, dict)] + if isinstance(payload, dict): + for key in keys: + rows = payload.get(key) + if isinstance(rows, list): + return [row for row in rows if isinstance(row, dict)] + return [] + + +def _capital_from_balance_rows(rows: Any) -> float: + if not isinstance(rows, list): + return 0.0 + for row in rows: + if not isinstance(row, dict): + continue + capital = 0.0 + for key in ("total", "balance", "equity", "availableMargin", "availableBalance", "walletBalance", "free"): + try: + capital = float(row.get(key, 0.0) or 0.0) + except Exception: + continue + if capital > 0 and math.isfinite(capital): + return capital + if capital > 0 and math.isfinite(capital): + return capital + return 0.0 + + +def _position_notional_from_rows(rows: Any) -> float: + if not isinstance(rows, list): + return 0.0 + total = 0.0 + for row in rows: + if not isinstance(row, dict): + continue + try: + qty = abs( + float( + row.get("positionAmt") + or row.get("positionQty") + or row.get("positionSize") + or row.get("quantity") + or row.get("pa") + or 0.0 + ) + ) + if qty <= 0.0: + continue + notional = row.get("positionValue") or row.get("notional") or row.get("openNotional") + if notional is not None: + total += abs(float(notional or 0.0)) + continue + entry = ( + row.get("entryPrice") + or row.get("avgPrice") + or row.get("markPrice") + or row.get("avgEntryPrice") + or row.get("ep") + or row.get("ap") + or 0.0 + ) + total += qty * abs(float(entry or 0.0)) + except Exception: + continue + return total + + +def _normalize_symbol(symbol: str) -> str: + return str(symbol or "").replace("-", "").replace("_", "").replace("/","").upper() + + +def _venue_symbol_from_asset(asset: str) -> str: + text = _normalize_symbol(asset) + if text.endswith("USDT"): + return f"{text[:-4]}-USDT" + return text + + +def _decimal_text(value: Decimal) -> str: + text = format(value.normalize(), "f") + if "." in text: + text = text.rstrip("0").rstrip(".") + return text or "0" + + +def _is_rate_limited_error(exc: Exception) -> bool: + message = str(exc) + lowered = message.lower() + return "100410" in message or "frequency limit" in lowered or "rate limit" in lowered + + +@dataclass(frozen=True) +class BingxDirectExecutionConfig: + """Execution-specific knobs for the direct adapter.""" + + environment: BingxEnvironment = BingxEnvironment.VST + allow_mainnet: bool = False + default_leverage: int = 1 + exchange_leverage_cap: int = 3 + recv_window_ms: int = 5_000 + prefer_websocket: bool = False + use_reduce_only: bool = True + journal_strategy: str = "pink" + journal_db: str = "dolphin_pink" + instrument_provider: BingxInstrumentProviderConfig = BingxInstrumentProviderConfig(load_all=True) + + +class BingxDirectExecutionAdapter(ExecutionPort): + """Direct BingX execution boundary with exchange-led state snapshots.""" + + def __init__( + self, + config: BingxExecClientConfig | BingxDirectExecutionConfig, + *, + client: BingxHttpClient | None = None, + provider: BingxInstrumentProvider | None = None, + ) -> None: + if isinstance(config, BingxExecClientConfig): + self._config = BingxDirectExecutionConfig( + environment=config.environment, + allow_mainnet=config.allow_mainnet, + default_leverage=int(config.default_leverage), + exchange_leverage_cap=int(config.exchange_leverage_cap), + recv_window_ms=int(config.recv_window_ms), + prefer_websocket=bool(config.prefer_websocket), + use_reduce_only=bool(config.use_reduce_only), + journal_strategy=str(config.journal_strategy or "pink"), + journal_db=str(config.journal_db or "dolphin_pink"), + instrument_provider=config.instrument_provider, + ) + http_config = config + else: + self._config = config + http_config = BingxExecClientConfig( + api_key="", + secret_key="", + environment=config.environment, + allow_mainnet=config.allow_mainnet, + prefer_websocket=config.prefer_websocket, + sizing_mode="testnet", + exchange_leverage_cap=config.exchange_leverage_cap, + use_reduce_only=config.use_reduce_only, + default_leverage=config.default_leverage, + recv_window_ms=config.recv_window_ms, + journal_strategy=config.journal_strategy, + journal_db=config.journal_db, + instrument_provider=config.instrument_provider, + ) + self._client = client or BingxHttpClient(http_config) + self._provider = provider or BingxInstrumentProvider(client=self._client, config=self._config.instrument_provider) + self._log = LOGGER + self._client_order_run_id = uuid.uuid4().hex[:8] + self._entry_client_order_seq = 0 + self._exit_client_order_seq = 0 + self._state: ExchangeStateSnapshot | None = None + self._connected = False + + @property + def state(self) -> ExchangeStateSnapshot | None: + return self._state + + async def connect(self) -> bool: + await self._provider.initialize() + self._connected = True + self._state = await self.refresh_state() + return True + + async def disconnect(self) -> None: + self._connected = False + await self._client.close() + + def _resolve_instrument(self, asset: str): + normalized = _normalize_symbol(asset) + candidates = [ + InstrumentId.from_str(f"{normalized}.BINGX"), + InstrumentId.from_str(f"{_venue_symbol_from_asset(asset)}.BINGX"), + ] + for candidate in candidates: + instrument = self._provider.find(candidate) + if instrument is not None: + return instrument + for instrument in self._provider.list_all(): + if _normalize_symbol(instrument.symbol.value) == normalized: + return instrument + if _normalize_symbol(instrument.raw_symbol.value) == normalized: + return instrument + return None + + def _instrument_venue_symbol(self, asset: str) -> str: + instrument = self._resolve_instrument(asset) + if instrument is not None: + return str(instrument.raw_symbol.value) + return _venue_symbol_from_asset(asset) + + def _instrument_step(self, asset: str) -> Decimal: + instrument = self._resolve_instrument(asset) + if instrument is not None: + try: + return Decimal(str(instrument.size_increment.as_decimal())) + except Exception: + pass + return Decimal("0.001") + + def _format_quantity(self, asset: str, quantity: float) -> str: + step = self._instrument_step(asset) + if step <= 0: + return str(max(0.0, quantity)) + value = Decimal(str(quantity)) + quantized = (value / step).to_integral_value(rounding=ROUND_DOWN) * step + return _decimal_text(max(Decimal("0"), quantized)) + + def _instrument_tick(self, asset: str) -> Decimal: + instrument = self._resolve_instrument(asset) + if instrument is not None: + try: + tick = getattr(instrument, "price_increment", None) + if tick is not None: + return Decimal(str(tick.as_decimal())) + except Exception: + pass + return Decimal("0.01") + + def _format_price(self, asset: str, price: float) -> str: + tick = self._instrument_tick(asset) + if tick <= 0: + return f"{price:.8f}".rstrip("0").rstrip(".") + value = Decimal(str(price)) + quantized = (value / tick).to_integral_value(rounding=ROUND_DOWN) * tick + return _decimal_text(max(Decimal("0"), quantized)) + + async def _safe_get(self, endpoint: str, params: dict | None = None, *, fallback: Any = None) -> Any: + """GET an endpoint, returning *fallback* on rate-limit errors.""" + try: + return await self._client.signed_get(endpoint, params) + except BingxHttpError as exc: + message = str(exc) + if "100410" in message or "frequency limit" in message.lower(): + LOGGER.debug("BingX %s rate-limited; continuing with empty snapshot", endpoint) + return fallback if fallback is not None else [] + raise + + async def _refresh_exchange_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot: + """Fetch exchange state with parallel HTTP calls. + + The three primary calls (balance, positions, openOrders) are + independent and run concurrently via ``asyncio.gather``. Each has + its own rate-limit fallback so a single throttle does not block + the others. Historical calls (allOrders, allFillOrders) are gated + on ``include_history`` and also gathered. + """ + balance_task = self._safe_get("/openApi/swap/v2/user/balance") + positions_task = self._safe_get("/openApi/swap/v2/user/positions") + orders_task = self._safe_get("/openApi/swap/v2/trade/openOrders") + + balance_payload, positions_payload, open_orders_payload = await asyncio.gather( + balance_task, positions_task, orders_task, + ) + + all_orders_payload: Any = [] + all_fills_payload: Any = [] + if include_history and symbol is not None: + venue_symbol = self._instrument_venue_symbol(symbol) + hist_tasks = asyncio.gather( + self._safe_get("/openApi/swap/v2/trade/allOrders", {"symbol": venue_symbol}), + self._safe_get("/openApi/swap/v2/trade/allFillOrders", {"symbol": venue_symbol}), + return_exceptions=True, + ) + results = await hist_tasks + all_orders_payload = results[0] if not isinstance(results[0], Exception) else [] + all_fills_payload = results[1] if not isinstance(results[1], Exception) else [] + + # Parse results (shared logic, same as before) + if isinstance(balance_payload, list): + balances = balance_payload + elif isinstance(balance_payload, dict): + rows_raw = balance_payload.get("balance") or balance_payload.get("balances") or balance_payload.get("data") + if isinstance(rows_raw, dict): + balances = [rows_raw] + elif isinstance(rows_raw, list): + balances = rows_raw + else: + balances = [] + else: + balances = [] + positions_rows = _rows_from_payload(positions_payload, "positions", "data") + positions: dict[str, dict[str, Any]] = {} + for row in positions_rows: + raw_symbol = str(row.get("symbol") or row.get("symbolName") or row.get("venueSymbol") or "") + key = _normalize_symbol(raw_symbol) + if not key: + continue + positions[key] = dict(row) + open_orders = _rows_from_payload(open_orders_payload, "orders", "data") + capital = _capital_from_balance_rows(balances) + open_notional = _position_notional_from_rows(positions_rows) + equity = capital + if open_notional > 0 and positions_rows: + equity = capital + snapshot = ExchangeStateSnapshot( + timestamp=datetime.now(timezone.utc), + capital=capital, + equity=equity, + open_positions=positions, + open_orders=[dict(row) for row in open_orders], + all_orders=[dict(row) for row in _rows_from_payload(all_orders_payload, "orders", "data")], + all_fills=[dict(row) for row in _rows_from_payload(all_fills_payload, "fills", "data")], + account={"balances": balances}, + open_notional=open_notional, + source="bingx", + recovered=bool(include_history), + ) + self._state = snapshot + return snapshot + + async def refresh_state(self, symbol: str | None = None, *, include_history: bool = False) -> ExchangeStateSnapshot: + return await self._refresh_exchange_state(symbol, include_history=include_history) + + async def submit_intent(self, intent: Intent) -> ExecutionReceipt: + symbol = self._instrument_venue_symbol(intent.asset) + if intent.action == DecisionAction.EXIT: + side = "SELL" if intent.side == TradeSide.LONG else "BUY" + else: + side = "BUY" if intent.side == TradeSide.LONG else "SELL" + # Entries must be free to open the slot; only exits are reduce-only. + reduce_only = bool(intent.action == DecisionAction.EXIT) + if reduce_only: + self._exit_client_order_seq += 1 + client_order_id = f"pink:{self._client_order_run_id}:x{self._exit_client_order_seq:02d}" + else: + self._entry_client_order_seq += 1 + client_order_id = f"pink:{self._client_order_run_id}:e{self._entry_client_order_seq:02d}" + leverage = normalize_bingx_leverage_value( + int(round(float(intent.leverage or self._config.default_leverage))), + exchange_max=self._config.exchange_leverage_cap, + ) + try: + await self._client.signed_post( + "/openApi/swap/v2/trade/leverage", + {"symbol": symbol, "side": "BOTH", "leverage": leverage}, + ) + except Exception as _lev_exc: + # W: leverage POST failed — order will execute at whatever leverage the + # exchange currently has for this symbol. Log prominently; do NOT abort + # the submit because the order may still succeed at the right leverage. + import logging as _logging + _logging.getLogger(__name__).warning( + "BingX leverage set failed (symbol=%s lev=%s): %s — proceeding with submit", + symbol, leverage, _lev_exc, + ) + try: + # Honor the order type forwarded by the venue adapter + # (bingx_venue._legacy_intent sets _order_type/_limit_price). MARKET + # is the default; a LIMIT carries a resting price + GTC and will not + # fill synchronously — the async-fill pump settles it later. + order_type = str((intent.metadata or {}).get("_order_type", "MARKET") or "MARKET").upper() + limit_price = float((intent.metadata or {}).get("_limit_price", 0.0) or 0.0) + is_limit = order_type == "LIMIT" and limit_price > 0.0 + payload: dict[str, Any] = { + "symbol": symbol, + "side": side, + "positionSide": "BOTH", + "type": "LIMIT" if is_limit else "MARKET", + "quantity": self._format_quantity(intent.asset, intent.target_size), + "clientOrderId": client_order_id, + "recvWindow": str(int(self._config.recv_window_ms)), + } + if is_limit: + payload["price"] = self._format_price(intent.asset, limit_price) + payload["timeInForce"] = "GTC" + if reduce_only: + payload["reduceOnly"] = "true" + ack_payload = await self._client.signed_post("/openApi/swap/v2/trade/order", payload) + ack = BingxOrderAck.from_http(ack_payload if isinstance(ack_payload, dict) else {}) + ack_row = dict(unwrap_order_payload(ack_payload)) if isinstance(ack_payload, dict) else {} + status = str(ack_row.get("status") or ack.status or "ACKED") + fill_price = 0.0 + for key in ("avgPrice", "avgFilledPrice", "price", "lastFillPrice", "tradePrice"): + try: + value = float(ack_row.get(key) or 0.0) + except Exception: + value = 0.0 + if value > 0: + fill_price = value + break + if fill_price <= 0 and self._state is not None: + # Use the last known exchange mark as a fallback for projected accounting. + fill_price = next((float(row.get("markPrice") or row.get("avgPrice") or 0.0) for row in self._state.open_positions.values() if float(row.get("markPrice") or row.get("avgPrice") or 0.0) > 0), 0.0) + except BingxHttpError as exc: + status = "RATE_LIMITED" if _is_rate_limited_error(exc) else "REJECTED" + ack_row = { + "status": status, + "msg": str(exc), + "symbol": symbol, + "clientOrderId": client_order_id, + } + fill_price = 0.0 + ack = None + receipt = ExecutionReceipt( + timestamp=datetime.now(timezone.utc), + status=status, + symbol=symbol, + side=side, + action=intent.action.value, + quantity=float(intent.target_size or 0.0), + price=fill_price, + client_order_id=client_order_id, + order_id=str((ack.order_id if 'ack' in locals() and ack is not None else '') or ack_row.get("orderId") or ""), + raw_ack=ack_row, + raw_state=dict(self._state.account if self._state is not None else {}), + ) + # Refresh from the venue so the direct runtime can use exchange-led state. + self._state = await self._refresh_exchange_state(intent.asset, include_history=True) + return receipt + + async def cancel(self, order: Any, *, reason: str = "") -> dict[str, Any]: + """Cancel a working order on the venue (resting LIMIT support). + + Signs the DELETE with the same client used for order placement, keyed by + the venue orderId (propagated onto the slot order by the kernel on ACK) + with a clientOrderId fallback. Returns the raw BingX response for the + venue adapter to map into a CANCEL_ACK / CANCEL_REJECT event. + """ + asset = str((getattr(order, "metadata", None) or {}).get("asset") or "") + symbol = self._instrument_venue_symbol(asset) if asset else "" + params: dict[str, Any] = { + "symbol": symbol, + "recvWindow": str(int(self._config.recv_window_ms)), + } + venue_order_id = str(getattr(order, "venue_order_id", "") or "") + venue_client_id = str(getattr(order, "venue_client_id", "") or "") + if venue_order_id: + params["orderId"] = venue_order_id + elif venue_client_id: + params["clientOrderId"] = venue_client_id + else: + return {"status": "REJECTED", "msg": "no order id to cancel", + "orderId": venue_order_id, "clientOrderId": venue_client_id} + delete_resp: dict[str, Any] = {} + try: + resp = await self._client.signed_delete("/openApi/swap/v2/trade/order", params) + delete_resp = resp if isinstance(resp, dict) else {"status": "CANCELED"} + except BingxHttpError as exc: + delete_resp = {"status": "RATE_LIMITED" if _is_rate_limited_error(exc) else "ERROR", "msg": str(exc)} + + # Truth-based confirmation: the cancel succeeded iff the order is no + # longer open on the venue. BingX can return transient errors (e.g. + # "order not exist", "same order number ... within 1 second" from an + # internal retry) even when the order was actually removed — so we trust + # exchange state, not the DELETE response. + still_open: bool | None = None + try: + oo = await self._client.signed_get("/openApi/swap/v2/trade/openOrders", {"symbol": symbol}) + rows = oo if isinstance(oo, list) else (oo.get("data") or oo.get("orders") or []) + if isinstance(rows, dict): + rows = rows.get("orders") or [] + ids = {str(r.get("orderId")) for r in rows if isinstance(r, dict)} + cids = {str(r.get("clientOrderId") or r.get("clientOrderID")) for r in rows if isinstance(r, dict)} + still_open = (venue_order_id in ids) if venue_order_id else (venue_client_id in cids) + except Exception: + still_open = None + + if still_open is False: + return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id} + if str(delete_resp.get("status", "")).upper() in {"CANCELED", "CANCELLED", "SUCCESS", "OK"}: + return {"status": "CANCELED", "orderId": venue_order_id, "clientOrderId": venue_client_id} + return { + "status": delete_resp.get("status", "REJECTED"), + "msg": delete_resp.get("msg", "cancel not confirmed"), + "orderId": venue_order_id, "clientOrderId": venue_client_id, + } + + async def reconcile(self, symbol: str | None = None) -> ExchangeStateSnapshot: + # Recovery-only path: ask the venue for authoritative account/position/order state. + return await self._refresh_exchange_state(symbol, include_history=True) diff --git a/prod/clean_arch/dita/decision.py b/prod/clean_arch/dita/decision.py new file mode 100644 index 0000000..3d2178e --- /dev/null +++ b/prod/clean_arch/dita/decision.py @@ -0,0 +1,209 @@ +"""Pure decision engine.""" + +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime +from typing import Optional + +from prod.clean_arch.tp_curve import compute_our_leverage, compute_soft_tp_pct +from prod.clean_arch.ports.data_feed import MarketSnapshot + +from .contracts import Decision, DecisionAction, DecisionConfig, DecisionContext, TradePosition, TradeSide, TradeStage + + +@dataclass(frozen=True) +class _SnapshotFields: + price: float + vdiv: float + irp: float + ts: datetime + + +class DecisionEngine: + """BLUE-compatible decision engine. + + Decision only answers whether the system should enter/hold/exit. + It does not size orders or own exchange state. + """ + + def __init__(self, config: Optional[DecisionConfig] = None): + self.config = config or DecisionConfig() + + def decide( + self, + snapshot: MarketSnapshot, + context: DecisionContext, + position: Optional[TradePosition] = None, + ) -> Decision: + fields = self._extract(snapshot) + + if ( + not snapshot.is_valid() + or fields.price <= 0 + or not self._finite(fields.price) + or not self._finite(fields.vdiv) + or not self._finite(fields.irp) + ): + return Decision( + timestamp=fields.ts, + decision_id=self._decision_id(snapshot.symbol, context.trade_seq), + asset=snapshot.symbol, + action=DecisionAction.HOLD, + side=TradeSide.FLAT, + reason="INVALID_SNAPSHOT", + confidence=0.0, + velocity_divergence=fields.vdiv, + irp_alignment=fields.irp, + reference_price=fields.price, + target_size=0.0, + leverage=1.0, + metadata={"policy_version": self.config.policy_version}, + ) + + if position is not None and not position.closed: + return self._decide_exit(snapshot, position, context, fields) + + return self._decide_entry(snapshot, context, fields) + + def _decide_entry(self, snapshot: MarketSnapshot, context: DecisionContext, fields: _SnapshotFields) -> Decision: + if context.open_positions >= 1: + return self._hold(snapshot, context, fields, reason="CAPACITY_FULL") + if not self.config.allow_short: + return self._hold(snapshot, context, fields, reason="SHORT_DISABLED") + if fields.vdiv >= self.config.vel_div_threshold or fields.irp < self.config.min_irp_alignment: + return self._hold(snapshot, context, fields, reason="NO_SIGNAL") + # vol_ok gate — scan bridge marks low-volume periods; block ENTERs when absent + if snapshot.scan_payload and not snapshot.scan_payload.get("vol_ok", True): + return self._hold(snapshot, context, fields, reason="VOL_GATE") + confidence = min(1.0, max(0.05, abs(fields.vdiv / self.config.vel_div_threshold))) + leverage = min(self.config.max_leverage, max(1.0, 1.0 + confidence * (self.config.max_leverage - 1.0))) + target_exposure = context.capital * self.config.capital_fraction * leverage + target_size = target_exposure / fields.price if fields.price > 0 else 0.0 + our_leverage = compute_our_leverage(notional=target_exposure, capital=context.capital) + tp_base_pct = float(self.config.fixed_tp_pct) + tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage) + return Decision( + timestamp=fields.ts, + decision_id=self._decision_id(snapshot.symbol, context.trade_seq), + asset=snapshot.symbol, + action=DecisionAction.ENTER, + side=TradeSide.SHORT, + reason="STRUCTURAL_DISLOCATION", + confidence=confidence, + velocity_divergence=fields.vdiv, + irp_alignment=fields.irp, + reference_price=fields.price, + target_size=target_size, + leverage=leverage, + metadata={ + "policy_version": self.config.policy_version, + "tp_base_pct": tp_base_pct, + "tp_effective_pct": tp_effective_pct, + "our_leverage": our_leverage, + "tp_curve": "soft_leverage_curve_v1", + }, + ) + + def _decide_exit( + self, + snapshot: MarketSnapshot, + position: TradePosition, + context: DecisionContext, + fields: _SnapshotFields, + ) -> Decision: + action = DecisionAction.HOLD + reason = "HOLD" + position_notional = position.size * fields.price if fields.price > 0 else position.size * position.entry_price + our_leverage = compute_our_leverage(notional=position_notional, capital=context.capital) + tp_base_pct = float(self.config.fixed_tp_pct) + tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage) + + if position.side == TradeSide.SHORT: + tp_price = position.entry_price * (1.0 - tp_effective_pct) + if fields.price <= tp_price: + action = DecisionAction.EXIT + reason = "TAKE_PROFIT" + elif fields.price >= position.entry_price * (1.0 + (self.config.catastrophic_loss_pct / max(position.leverage, 1.0))): + action = DecisionAction.EXIT + reason = "CATASTROPHIC_LOSS" + elif position.bars_held >= self.config.max_hold_bars: + action = DecisionAction.EXIT + reason = "MAX_HOLD" + elif fields.vdiv >= 0.0: + action = DecisionAction.EXIT + reason = "MEAN_REVERSION" + + if position.side == TradeSide.LONG: + tp_price = position.entry_price * (1.0 + tp_effective_pct) + if fields.price >= tp_price: + action = DecisionAction.EXIT + reason = "TAKE_PROFIT" + elif fields.price <= position.entry_price * (1.0 - (self.config.catastrophic_loss_pct / max(position.leverage, 1.0))): + action = DecisionAction.EXIT + reason = "CATASTROPHIC_LOSS" + elif position.bars_held >= self.config.max_hold_bars: + action = DecisionAction.EXIT + reason = "MAX_HOLD" + elif fields.vdiv <= 0.0: + action = DecisionAction.EXIT + reason = "MEAN_REVERSION" + + return Decision( + timestamp=fields.ts, + decision_id=position.trade_id, + asset=position.asset, + action=action, + side=position.side, + reason=reason, + confidence=max(0.0, min(1.0, position.entry_irp_alignment)), + velocity_divergence=fields.vdiv, + irp_alignment=fields.irp, + reference_price=fields.price, + target_size=position.size, + leverage=position.leverage, + bars_held=position.bars_held, + stage=TradeStage.EXIT_REQUESTED if action == DecisionAction.EXIT else TradeStage.POSITION_UPDATED, + metadata={ + "policy_version": self.config.policy_version, + "tp_base_pct": tp_base_pct, + "tp_effective_pct": tp_effective_pct, + "our_leverage": our_leverage, + "tp_curve": "soft_leverage_curve_v1", + }, + ) + + def _hold(self, snapshot: MarketSnapshot, context: DecisionContext, fields: _SnapshotFields, reason: str) -> Decision: + return Decision( + timestamp=fields.ts, + decision_id=self._decision_id(snapshot.symbol, context.trade_seq), + asset=snapshot.symbol, + action=DecisionAction.HOLD, + side=TradeSide.FLAT, + reason=reason, + confidence=0.0, + velocity_divergence=fields.vdiv, + irp_alignment=fields.irp, + reference_price=fields.price, + target_size=0.0, + leverage=1.0, + metadata={"policy_version": self.config.policy_version}, + ) + + @staticmethod + def _extract(snapshot: MarketSnapshot) -> _SnapshotFields: + ts = snapshot.timestamp if isinstance(snapshot.timestamp, datetime) else datetime.utcnow() + return _SnapshotFields( + price=float(snapshot.price or 0.0), + vdiv=float(snapshot.velocity_divergence or 0.0), + irp=float(snapshot.irp_alignment or 0.0), + ts=ts, + ) + + @staticmethod + def _decision_id(symbol: str, seq: int) -> str: + return f"{symbol}-D-{seq:012d}" + + @staticmethod + def _finite(value: float) -> bool: + return value == value and value not in (float("inf"), float("-inf")) diff --git a/prod/clean_arch/dita_v2/hazelcast_projection.py b/prod/clean_arch/dita_v2/hazelcast_projection.py new file mode 100644 index 0000000..3948ed6 --- /dev/null +++ b/prod/clean_arch/dita_v2/hazelcast_projection.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import json +from datetime import datetime, timezone +from typing import Any, Optional, Protocol + +from .contracts import KernelTransition, TradeSlot +from .control import KernelControlSnapshot +from .journal import _transition_row +from .projection import build_position_state_row +from .utils import json_safe + + +# ── Fire-and-forget Hz write helpers ───────────────────────────────────────── + +def _hz_write_no_wait(hz_map: Any, key: str, value: str) -> None: + """Submit Hz write to the client's internal thread pool. Never blocks. + + .put() without .blocking() returns a hazelcast Future immediately. + The Future is intentionally discarded — the network write is already + queued in the Hz client's thread pool and is not cancelled by GC. + Hz writes are observability-only; any failure must never affect trading. + """ + try: + hz_map.put(key, value) + except Exception: + pass + + +def _json_encode(payload: dict) -> str: + return json.dumps(payload, separators=(",", ":"), ensure_ascii=False, default=str) + + +def _utcnow_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _today_iso() -> str: + return datetime.now(timezone.utc).date().isoformat() + + +class HazelcastClientLike(Protocol): + def get_map(self, name: str): ... + def get_topic(self, name: str): ... + + +class HazelcastProjector: + """Durable BLUE/PINK-compatible projection mirror.""" + + def __init__( + self, + client: HazelcastClientLike | None = None, + *, + active_slots_map: str = "dita_active_slots", + events_topic: str = "dita_trade_events", + ) -> None: + self.client = client + self.active_slots_map = active_slots_map + self.events_topic = events_topic + + def publish_slot(self, slot: TradeSlot) -> None: + if self.client is None: + return + self.client.get_map(self.active_slots_map).put(slot.trade_id, build_position_state_row(slot)) + + def publish_event(self, event_type: str, payload: dict[str, Any]) -> None: + if self.client is None: + return + topic = self.client.get_topic(self.events_topic) + topic.publish( + json.dumps( + {"event_type": event_type, "payload": json_safe(payload)}, + ensure_ascii=False, + sort_keys=True, + default=str, + ) + ) + + +class HazelcastRowWriter: + """Callback bridge for ``HazelcastProjection`` writer hooks.""" + + def __init__(self, client: HazelcastClientLike) -> None: + self.client = client + + def __call__(self, name: str, row: dict[str, Any]) -> None: + if name.endswith("trade_events"): + self.client.get_topic(name).publish( + json.dumps(row, ensure_ascii=False, sort_keys=True, default=str) + ) + return + if name.endswith("control"): + key = "control" + else: + key = str(row.get("trade_id", row.get("slot_id", row.get("event_id", "")))) + self.client.get_map(name).put(key, json_safe(row)) + + +# ── PINK DITAv2 non-blocking Hz state writer ────────────────────────────────── + +class PinkHzStateWriter: + """Non-blocking Hz writer for PINK DITAv2 kernel state. + + Dedicated Hz client (separate from the data-feed read client). + All writes are fire-and-forget: .put() returns a Future that is intentionally + discarded. A failed write = missed TUI update only — never affects trading. + + BLUE-compatible schema (same shape as DOLPHIN_STATE_BLUE) written to + DOLPHIN_STATE_PINK / DOLPHIN_PNL_PINK — no overlap with BLUE maps. + """ + + def __init__( + self, + cluster: str, + host: str, + state_map_name: str, + pnl_map_name: str, + ) -> None: + import hazelcast + self._client = hazelcast.HazelcastClient( + cluster_name=cluster, + cluster_members=[host], + ) + # Non-blocking proxies (.put() returns Future, does NOT block) + self._state_map = self._client.get_map(state_map_name) + self._pnl_map = self._client.get_map(pnl_map_name) + + def write_engine_snapshot( + self, + slot_dict: dict, + acc_dict: dict, + posture: str = "APEX", + our_leverage: float = 0.0, + ) -> None: + """Write full engine state. Called after every kernel mutation (non-blocking).""" + payload: dict[str, Any] = { + "strategy": "pink", + "capital": acc_dict.get("capital", 0.0), + "equity": acc_dict.get("equity", 0.0), + "available_capital": acc_dict.get("available_capital", 0.0), + "pnl": acc_dict.get("realized_pnl_total", 0.0), + "fee_total": acc_dict.get("fee_total", 0.0), + "open_positions": int(acc_dict.get("open_positions", 0)), + "trade_seq": int(acc_dict.get("trade_seq", 0)), + "posture": posture, + "capital_frozen": bool(acc_dict.get("capital_frozen", False)), + "our_leverage": our_leverage, + "slot": slot_dict, + "updated_at": _utcnow_iso(), + } + _hz_write_no_wait(self._state_map, "engine_snapshot", _json_encode(payload)) + # Compact "latest" key — same shape as BLUE's DOLPHIN_STATE_BLUE["latest"] + _hz_write_no_wait(self._state_map, "latest", _json_encode({ + "strategy": "pink", + "capital": payload["capital"], + "date": _today_iso(), + "pnl": payload["pnl"], + "trades": payload["trade_seq"], + "posture": posture, + "updated_at": payload["updated_at"], + })) + + def write_daily_pnl(self, acc_dict: dict, posture: str = "APEX") -> None: + """Write per-date PnL row. Called on trade close only.""" + _hz_write_no_wait(self._pnl_map, _today_iso(), _json_encode({ + "pnl": acc_dict.get("realized_pnl_total", 0.0), + "capital": acc_dict.get("capital", 0.0), + "trades": int(acc_dict.get("trade_seq", 0)), + "posture": posture, + })) + + def close(self) -> None: + try: + self._client.shutdown() + except Exception: + pass diff --git a/prod/clean_arch/dita_v2/test_flaws.py b/prod/clean_arch/dita_v2/test_flaws.py index 8135134..284cada 100644 --- a/prod/clean_arch/dita_v2/test_flaws.py +++ b/prod/clean_arch/dita_v2/test_flaws.py @@ -1252,3 +1252,143 @@ class TestW10HttpErrorMapping: def test_dns_error_is_rate_limited(self): assert self._status("Name or service not known") == "RATE_LIMITED" + + +# ============================================================ +# PinkHzStateWriter: non-blocking Hz write correctness +# ============================================================ + +class TestPinkHzStateWriter: + """PinkHzStateWriter: payload shape, vol_ok gate, and non-blocking guarantees.""" + + def _make_writer_no_hz(self): + """Build a PinkHzStateWriter with a mock client that captures writes.""" + from prod.clean_arch.dita_v2.hazelcast_projection import PinkHzStateWriter + import unittest.mock as mock + + w = object.__new__(PinkHzStateWriter) + w._writes = {} # {(map_attr, key): value} + + # Build fake non-blocking IMap proxy + def _make_map(name): + m = mock.MagicMock(name=f"map:{name}") + def _put(key, value): + w._writes[(name, key)] = value + m.put.side_effect = _put + return m + + w._state_map = _make_map("DOLPHIN_STATE_PINK") + w._pnl_map = _make_map("DOLPHIN_PNL_PINK") + w._client = mock.MagicMock() + return w + + def test_engine_snapshot_writes_two_keys(self): + w = self._make_writer_no_hz() + w.write_engine_snapshot( + {"slot_id": 0, "fsm_state": "IDLE"}, + {"capital": 25000.0, "trade_seq": 42}, + posture="APEX", + ) + assert ("DOLPHIN_STATE_PINK", "engine_snapshot") in w._writes, ( + "PinkHzStateWriter must write engine_snapshot key" + ) + assert ("DOLPHIN_STATE_PINK", "latest") in w._writes, ( + "PinkHzStateWriter must write latest key (BLUE-compatible)" + ) + + def test_engine_snapshot_has_strategy_pink(self): + import json + w = self._make_writer_no_hz() + w.write_engine_snapshot({"slot_id": 0}, {"capital": 10000.0}) + snap = json.loads(w._writes[("DOLPHIN_STATE_PINK", "engine_snapshot")]) + assert snap["strategy"] == "pink", "engine_snapshot must identify as pink" + + def test_latest_key_has_blue_compatible_fields(self): + import json + w = self._make_writer_no_hz() + w.write_engine_snapshot({"slot_id": 0}, {"capital": 5000.0, "realized_pnl_total": 123.4, "trade_seq": 7}) + latest = json.loads(w._writes[("DOLPHIN_STATE_PINK", "latest")]) + for field in ("strategy", "capital", "date", "pnl", "trades", "posture", "updated_at"): + assert field in latest, f"BLUE-compatible 'latest' key missing field: {field}" + + def test_our_leverage_in_snapshot(self): + import json + w = self._make_writer_no_hz() + w.write_engine_snapshot( + {"slot_id": 0, "size": 0.5, "entry_price": 50000.0}, + {"capital": 25000.0}, + our_leverage=1.0, + ) + snap = json.loads(w._writes[("DOLPHIN_STATE_PINK", "engine_snapshot")]) + assert "our_leverage" in snap, "our_leverage (dual-leverage: system layer) must be in Hz snapshot" + + def test_daily_pnl_write(self): + import json + w = self._make_writer_no_hz() + w.write_daily_pnl({"realized_pnl_total": 45.6, "capital": 25000.0, "trade_seq": 3}) + key = next((k for k in w._writes if k[0] == "DOLPHIN_PNL_PINK"), None) + assert key is not None, "write_daily_pnl must write to DOLPHIN_PNL_PINK" + row = json.loads(w._writes[key]) + assert row["pnl"] == 45.6 + + def test_write_survives_exception(self): + """Hz write failure must never propagate — observability must not affect trading.""" + from prod.clean_arch.dita_v2.hazelcast_projection import _hz_write_no_wait + import unittest.mock as mock + bad_map = mock.MagicMock() + bad_map.put.side_effect = RuntimeError("Hz down") + _hz_write_no_wait(bad_map, "key", "value") # must not raise + + +# ============================================================ +# vol_ok gate in DecisionEngine +# ============================================================ + +class TestVolOkGate: + """DecisionEngine must block ENTERs when vol_ok=False in scan_payload.""" + + def _make_snapshot(self, vol_ok: bool, vdiv: float = -0.03, irp: float = 0.60): + from prod.clean_arch.ports.data_feed import MarketSnapshot + from datetime import datetime, timezone + return MarketSnapshot( + timestamp=datetime.now(timezone.utc), + symbol="BTCUSDT", + price=50000.0, + velocity_divergence=vdiv, + irp_alignment=irp, + scan_payload={"vol_ok": vol_ok, "posture": "APEX"}, + ) + + def _engine(self): + from prod.clean_arch.dita.decision import DecisionEngine, DecisionConfig + cfg = DecisionConfig( + vel_div_threshold=-0.02, + vel_div_extreme=-0.05, + fixed_tp_pct=0.0020, + max_hold_bars=250, + capital_fraction=0.20, + max_leverage=3.0, + allow_short=True, + allow_long=False, + ) + return DecisionEngine(cfg) + + def _ctx(self, open_positions: int = 0, capital: float = 25000.0): + from prod.clean_arch.dita.contracts import DecisionContext + return DecisionContext(capital=capital, open_positions=open_positions) + + def test_vol_ok_false_blocks_enter(self): + eng = self._engine() + snap = self._make_snapshot(vol_ok=False) + decision = eng.decide(snap, self._ctx()) + assert decision.action.value in ("HOLD", "NO_ACTION", "SKIP", "VOL_GATE"), ( + f"vol_ok=False must block ENTER, got action={decision.action.value!r} reason={getattr(decision, 'reason', '?')!r}" + ) + + def test_vol_ok_true_allows_enter(self): + eng = self._engine() + snap = self._make_snapshot(vol_ok=True) + decision = eng.decide(snap, self._ctx()) + assert decision.action.value not in ("VOL_GATE",), ( + "vol_ok=True must not block on vol_ok gate" + ) diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index 4f033b8..8850d30 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -12,10 +12,12 @@ from __future__ import annotations import asyncio import inspect +import json import logging import math from dataclasses import dataclass, field, replace from datetime import datetime, timezone +from pathlib import Path from types import SimpleNamespace from typing import Any, Callable, Optional @@ -75,6 +77,8 @@ def _slot_to_position_dict(slot) -> dict[str, Any]: # overflows to inf as price -> 0. Any real perp quote is far above this floor, # so a price below it (or non-finite) signals corrupt market data, not a trade. _MIN_SANE_PRICE = 1e-8 +# Path for kernel state persistence (crash recovery + session continuity). +_KERNEL_STATE_PATH = Path("/tmp/.pink_kernel_state.json") def _decision_to_kernel_intent( @@ -116,6 +120,46 @@ def _decision_to_kernel_intent( ) +def _persist_kernel_snapshot(kernel, log: logging.Logger) -> None: + """Write full kernel state to disk after each settled fill (G5 snapshot-on-fill).""" + try: + state_json = kernel.save_state() + _KERNEL_STATE_PATH.write_text(state_json, encoding="utf-8") + except Exception as exc: + log.warning("kernel snapshot persist failed (non-fatal): %s", exc) + + +def _restore_kernel_snapshot(kernel, log: logging.Logger) -> bool: + """On startup, restore kernel state from disk if account is flat (no open positions). + + Returns True if a snapshot was found and successfully restored. + """ + if not _KERNEL_STATE_PATH.exists(): + return False + try: + state_json = _KERNEL_STATE_PATH.read_text(encoding="utf-8") + meta = json.loads(state_json) + # Sanity check: only restore if the saved snapshot had no open trades. + saved_slots = meta.get("slots", []) + open_at_save = [s for s in saved_slots if s.get("fsm_state") not in (None, "", "IDLE", "CLOSED")] + if open_at_save: + log.warning( + "kernel snapshot has %d open slot(s) at save time — " + "skipping restore (must be flat for safe handoff)", + len(open_at_save), + ) + return False + ok = kernel.restore_state(state_json) + if ok: + log.info("kernel state restored from %s (fee_calibration + account preserved)", _KERNEL_STATE_PATH) + else: + log.warning("kernel restore_state rejected snapshot (version or slot mismatch)") + return ok + except Exception as exc: + log.warning("kernel snapshot restore failed (non-fatal): %s", exc) + return False + + def _reconcile_position_slot( kernel: ExecutionKernel, exchange_balance_capital: float, @@ -221,11 +265,15 @@ class PinkDirectRuntime: market_state_runtime: Any = None event_sink: Optional[Callable[[dict[str, Any]], None]] = None logger: Any = LOGGER + # Non-blocking Hz state writer (None = Hz unavailable; PINK trades regardless) + hz_state_writer: Any = field(default=None, repr=False, compare=False) # Account stream state — managed by connect/disconnect, not init args _account_stream_task: Optional[asyncio.Task] = field( default=None, init=False, repr=False, compare=False ) _enter_frozen: bool = field(default=False, init=False, repr=False, compare=False) + # Last known posture — carried into Hz writes for TUI/algo monitoring + _last_posture: str = field(default="APEX", init=False, repr=False, compare=False) async def connect(self, initial_capital: float = 25000.0) -> None: """Connect data feed, venue, seed capital from exchange, start WS stream.""" @@ -246,6 +294,11 @@ class PinkDirectRuntime: self.kernel.set_seed_capital(initial_capital) await self._seed_account_from_exchange() + # Restore fee calibration + account state from the previous session if the + # kernel was flat at save time. Must be AFTER set_seed_capital and reconcile + # so the snapshot can override our fresh seed with the last-known calibration. + _restore_kernel_snapshot(self.kernel, self.logger) + # Start WS account stream (primary); poll failover handled inside stream. self._account_stream_task = asyncio.create_task( self._run_account_stream(), name="pink_account_stream" @@ -269,13 +322,13 @@ class PinkDirectRuntime: # BingX VST/LIVE taker fee schedule. These are the current published rates. # Override via set_exchange_config() if the exchange adjusts them. - _BINGX_FEE_CONFIG: dict = { + _BINGX_FEE_CONFIG: dict = field(default_factory=lambda: { "taker_rate": 0.0005, # 0.05% market orders "maker_rate": 0.0002, # 0.02% limit resting "lot_step": 0.001, "tick_size": 0.0001, "funding_interval_secs": 28_800, # 8 h BingX perps - } + }) async def _seed_account_from_exchange(self) -> None: """ @@ -347,7 +400,9 @@ class PinkDirectRuntime: if fill_price <= 0 or fill_qty <= 0 or actual_fee <= 0: self.logger.info("Fee calibration: fill row missing price/qty/fee — skipping") return - report = self.kernel.calibrate_fee(fill_price, fill_qty, actual_fee) + order_type = str(row.get("orderType") or row.get("type") or "MARKET").upper() + is_maker = order_type == "LIMIT" + report = self.kernel.calibrate_fee(fill_price, fill_qty, actual_fee, is_maker=is_maker) status = report.get("calibration_status", "?") log = self.logger.error if status == "ERROR" else self.logger.info log( @@ -395,14 +450,20 @@ class PinkDirectRuntime: "fill_price": event.fill_price, "fill_qty": event.fill_qty, "realized_pnl": event.realized_pnl, + "is_maker": event.is_maker, }) - # Also fold actual fee if WS delivered it - if event.fee > 0: + # Fold actual fee if WS delivered it (replaces prediction) + if event.fee != 0: self.kernel.on_account_event({ "kind": "FILL_SETTLED", + "event_id": event.event_id, "realized_pnl": 0.0, # already folded above - "fee": event.fee, + "fee": event.fee, # negative = rebate + "is_maker": event.is_maker, }) + # Persist full kernel state after every settled fill for + # crash recovery + session-to-session calibration continuity. + _persist_kernel_snapshot(self.kernel, self.logger) elif event.kind == ExchangeEventKind.ACCOUNT_UPDATE: result = self.kernel.on_account_event({ "kind": "ACCOUNT_UPDATE", @@ -421,12 +482,20 @@ class PinkDirectRuntime: result.get("reconcile_explanation", ""), ) self._enter_frozen = True + # Hz write: capital_frozen state changed + _slot = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} + _acc = self.kernel.snapshot().get("account") or {} + self._hz_publish(_slot, _acc) else: if self._enter_frozen: self.logger.info( "Account reconcile %s — unfreezing ENTERs.", status ) self._enter_frozen = False + # Hz write: unfreeze is also a state change + _slot = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} + _acc = self.kernel.snapshot().get("account") or {} + self._hz_publish(_slot, _acc) elif event.kind == ExchangeEventKind.FUNDING_FEE: self.kernel.on_account_event({ "kind": "FUNDING_FEE", @@ -524,12 +593,35 @@ class PinkDirectRuntime: if isinstance(scan_payload.get("esof_payload"), dict) else None, ) + # Track posture for Hz writes + self._last_posture = str(scan_payload.get("posture") or "APEX") return dict( getattr(runtime, "latest_bundle_dict", {}) or bundle.as_dict() ) except Exception: return {} + def _hz_publish(self, slot_dict: dict, acc: dict) -> None: + """Fire-and-forget Hz write after any kernel state change. + + Computes system leverage (our_leverage = notional/capital) for the Hz + snapshot — this is the PINK/BLUE dual-leverage invariant: system leverage + reflects real margin utilisation; exchange leverage (1-3x cap) is set at + the BingX API level and never touches this path. + """ + if self.hz_state_writer is None: + return + try: + size = float(slot_dict.get("size") or 0.0) + ep = float(slot_dict.get("entry_price") or 0.0) + capital = float(acc.get("capital") or 0.0) + our_leverage = (size * ep / capital) if capital > 1e-10 else 0.0 + self.hz_state_writer.write_engine_snapshot( + slot_dict, acc, posture=self._last_posture, our_leverage=our_leverage + ) + except Exception: + pass + async def pump_venue_events( self, snapshot: Any | None = None, *, market_state: Any = None ) -> int: @@ -586,6 +678,9 @@ class PinkDirectRuntime: slot_dict=slot_dict, market_state=market_state or {}, ) + # Hz write after fills settle — slot FSM and capital may have changed + acc = self.kernel.snapshot().get("account") or {} + self._hz_publish(slot_dict, acc) return len(applied) def _unsafe_entry_reason(self, kernel_intent: KernelIntent, context: Any) -> Optional[str]: @@ -785,6 +880,20 @@ class PinkDirectRuntime: phase="execution", market_state=market_state, ) + + # Hz write: ENTER/EXIT changed slot FSM — publish updated state + self._hz_publish(slot_dict, acc) + + # On trade close, write daily PnL row + if ( + self.hz_state_writer is not None + and slot_dict.get("closed") + ): + try: + self.hz_state_writer.write_daily_pnl(acc, posture=self._last_posture) + except Exception: + pass + else: # HOLD / no-op: update mark price in kernel. if snapshot.price and snapshot.price > 0: diff --git a/prod/launch_dolphin_pink.py b/prod/launch_dolphin_pink.py new file mode 100644 index 0000000..2a16a03 --- /dev/null +++ b/prod/launch_dolphin_pink.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 +"""PINK live launcher — DITAv2-backed execution. + +Wires PINK decision/intent logic through the DITAv2 kernel + BingX venue +adapter. The kernel owns the single-slot FSM, AccountProjection (capital +settled from fills, not balance-poll overwritten), Zinc shared-memory mirror, +and Hazelcast slot projection. +""" + +from __future__ import annotations + +import asyncio +from copy import deepcopy +import contextlib +import os +import sys +from pathlib import Path +from enum import Enum +from typing import Any +from datetime import datetime + +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT / "prod")) +sys.path.insert(0, str(PROJECT_ROOT / "prod" / "clean_arch")) +sys.path.insert(0, str(PROJECT_ROOT)) + +from dotenv import load_dotenv + +load_dotenv(PROJECT_ROOT / ".env") + +from prod.bingx.config import BingxExecClientConfig +from prod.bingx.config import BingxInstrumentProviderConfig +from prod.bingx.enums import BingxEnvironment +from prod.clean_arch.adapters.hazelcast_feed import HazelcastDataFeed +from prod.clean_arch.dita import DecisionConfig +from prod.clean_arch.dita import DecisionEngine +from prod.clean_arch.dita import IntentEngine +from prod.clean_arch.dita_v2.launcher import build_launcher_bundle +from prod.clean_arch.persistence import PinkClickHousePersistence +from adaptive_exit.market_state_runtime import MarketStateRuntime +from prod.clean_arch.runtime.pink_direct import PinkDirectRuntime +from prod.clean_arch.runtime.runner_heartbeat import ( + build_runner_heartbeat_payload, + write_runner_heartbeat, +) + +PINK_DEFAULTS = { + "strategy_name": "pink", + "state_map": "DOLPHIN_STATE_PINK", + "pnl_map": "DOLPHIN_PNL_PINK", + "trader_id": "DOLPHIN-PINK-001", + "journal_strategy": "pink", + "journal_db": "dolphin_pink", + "fixed_tp_pct": 0.0020, + "vol_p60_threshold": -1000000000.0, +} + + +class PinkPhase(str, Enum): + """Feature-gate phases for the standalone PINK launcher.""" + + BOOTSTRAP = "bootstrap" + SINGLE_LEG = "single_leg" + MULTI_EXIT = "multi_exit" + + +def _env_bool(name: str, default: bool = False) -> bool: + raw = os.environ.get(name) + if raw is None: + return default + return str(raw).strip().lower() in {"1", "true", "yes", "on"} + + +def _env_upper(name: str, default: str = "") -> str: + return str(os.environ.get(name, default)).strip().upper() + + +def _resolve_bingx_environment() -> BingxEnvironment: + name = str(os.environ.get("DOLPHIN_BINGX_ENV", "VST")).strip().upper() + return BingxEnvironment.LIVE if name == "LIVE" else BingxEnvironment.VST + + +def _resolve_bingx_allow_mainnet() -> bool: + return _env_bool("DOLPHIN_BINGX_ALLOW_MAINNET", False) + + +def _resolve_bingx_recv_window_ms() -> int: + raw = str(os.environ.get("DOLPHIN_BINGX_RECV_WINDOW_MS", "5000")).strip() + try: + parsed = int(raw) + except Exception: + return 5000 + return parsed if parsed > 0 else 5000 + + +def _resolve_bingx_exchange_leverage_cap() -> int: + raw = str(os.environ.get("DOLPHIN_BINGX_EXCHANGE_LEVERAGE_CAP", "3")).strip() + try: + parsed = int(raw) + except Exception: + return 3 + return parsed if parsed > 0 else 3 + + +def _resolve_pink_vol_p60_threshold() -> float: + raw = str(os.environ.get("DOLPHIN_PINK_VOL_P60_THRESHOLD", PINK_DEFAULTS["vol_p60_threshold"])).strip() + try: + return float(raw) + except Exception: + return float(PINK_DEFAULTS["vol_p60_threshold"]) + + +def _resolve_pink_phase() -> PinkPhase: + raw = str(os.environ.get("DOLPHIN_PINK_PHASE", PinkPhase.SINGLE_LEG.value)).strip().lower() + for phase in PinkPhase: + if raw == phase.value: + return phase + return PinkPhase.SINGLE_LEG + + +def _resolve_pink_account_sync_interval_sec() -> float: + """Account sync is now advisory — kernel tracks capital via settle() + on close. Periodic reconcile re-seeds capital from exchange balance, + mainly as a safety net for long-running sessions.""" + raw = str(os.environ.get("DOLPHIN_PINK_ACCOUNT_SYNC_INTERVAL_SEC", "300")).strip() + try: + parsed = float(raw) + except Exception: + return 300.0 + return parsed if parsed > 0 else 300.0 + + +def _resolve_pink_exit_leg_ratios(phase: PinkPhase) -> tuple[float, ...]: + if phase is PinkPhase.MULTI_EXIT: + raw = str(os.environ.get("DOLPHIN_PINK_EXIT_LEG_RATIOS", "0.5,1.0")).strip() + ratios: list[float] = [] + for chunk in raw.split(","): + try: + value = float(chunk.strip()) + except Exception: + continue + if 0.0 < value <= 1.0: + ratios.append(value) + if ratios: + return tuple(ratios) + return (0.5, 1.0) + return (1.0,) + + +def _set_ditav2_env_defaults() -> None: + os.environ.setdefault("DITA_V2_VENUE", "BINGX") + os.environ.setdefault("DITA_V2_HAZELCAST", "REAL") + os.environ.setdefault("DITA_V2_MODE", "DEBUG") + os.environ.setdefault("DITA_V2_VERBOSITY", "TRACE") + os.environ.setdefault("DITA_V2_PREFIX", "pink") + os.environ.setdefault("DOLPHIN_BINGX_ENV", "VST") + os.environ.setdefault("DOLPHIN_BINGX_ALLOW_MAINNET", "0") + + +def _apply_pink_namespace_env() -> None: + os.environ["DOLPHIN_STRATEGY_NAME"] = PINK_DEFAULTS["strategy_name"] + os.environ["DOLPHIN_STATE_MAP"] = PINK_DEFAULTS["state_map"] + os.environ["DOLPHIN_PNL_MAP"] = PINK_DEFAULTS["pnl_map"] + os.environ["DOLPHIN_JOURNAL_STRATEGY"] = PINK_DEFAULTS["journal_strategy"] + os.environ["DOLPHIN_JOURNAL_DB"] = PINK_DEFAULTS["journal_db"] + os.environ["DOLPHIN_FIXED_TP_PCT"] = f'{PINK_DEFAULTS["fixed_tp_pct"]:.4f}' + os.environ["DOLPHIN_BINGX_ENV"] = "VST" + os.environ["DOLPHIN_BINGX_ALLOW_MAINNET"] = "0" + + +def _apply_pink_env() -> None: + _set_ditav2_env_defaults() + _apply_pink_namespace_env() + + +def _apply_pink_actor_overrides(actor_cfg: dict[str, Any]) -> dict[str, Any]: + cfg: dict[str, Any] = deepcopy(actor_cfg) if actor_cfg else {} + cfg["strategy_name"] = PINK_DEFAULTS["strategy_name"] + hz = cfg.setdefault("hazelcast", {}) + hz["state_map"] = PINK_DEFAULTS["state_map"] + hz["imap_pnl"] = PINK_DEFAULTS["pnl_map"] + hz["state_map_aliases"] = [] + hz["imap_pnl_aliases"] = [] + + adaptive_exit = cfg.setdefault("adaptive_exit", {}) + adaptive_exit["shadow_db"] = PINK_DEFAULTS["journal_db"] + cfg["v7_journal_db"] = PINK_DEFAULTS["journal_db"] + cfg["sync_bar_idx_from_blue"] = False + + vol_p60_threshold = _resolve_pink_vol_p60_threshold() + cfg["vol_p60_threshold"] = vol_p60_threshold + cfg.setdefault("paper_trade", {})["vol_p60"] = vol_p60_threshold + cfg.setdefault("engine", {})["fixed_tp_pct"] = float(PINK_DEFAULTS["fixed_tp_pct"]) + return cfg + + +class BinanceDataClientConfig: # pragma: no cover - compatibility shim + """Local placeholder so legacy tests can patch the symbol without Nautilus imports.""" + + +class TradingNode: # pragma: no cover - compatibility shim + """Local placeholder so legacy tests can patch the symbol without Nautilus imports.""" + + +def build_actor_config( + *, + data_venue: str | None = None, + exec_venue: str | None = None, +) -> dict[str, Any]: + """Build the minimal actor config needed by the direct PINK launcher.""" + return _apply_pink_actor_overrides( + { + "strategy_name": PINK_DEFAULTS["strategy_name"], + "hazelcast": { + "state_map": PINK_DEFAULTS["state_map"], + "imap_pnl": PINK_DEFAULTS["pnl_map"], + "state_map_aliases": [], + "imap_pnl_aliases": [], + }, + "adaptive_exit": {"shadow_db": PINK_DEFAULTS["journal_db"]}, + "paper_trade": {"vol_p60": _resolve_pink_vol_p60_threshold()}, + "engine": {"fixed_tp_pct": PINK_DEFAULTS["fixed_tp_pct"]}, + "data_venue": (data_venue or "BINANCE").upper(), + "exec_venue": (exec_venue or "BINGX").upper(), + "v7_journal_db": PINK_DEFAULTS["journal_db"], + "sync_bar_idx_from_blue": False, + } + ) + + +def build_bingx_exec_client_config(**_: Any) -> BingxExecClientConfig: + """Return the direct BingX client config shared by the DITAv2 bundle.""" + return BingxExecClientConfig( + api_key=os.environ.get("BINGX_API_KEY"), + secret_key=os.environ.get("BINGX_SECRET_KEY"), + environment=_resolve_bingx_environment(), + allow_mainnet=_resolve_bingx_allow_mainnet(), + recv_window_ms=_resolve_bingx_recv_window_ms(), + default_leverage=int(os.environ.get("DOLPHIN_BINGX_DEFAULT_LEVERAGE", "1")), + exchange_leverage_cap=_resolve_bingx_exchange_leverage_cap(), + prefer_websocket=False, + sizing_mode=os.environ.get("DOLPHIN_BINGX_SIZING_MODE", "testnet"), + journal_strategy="pink", + journal_db="dolphin_pink", + instrument_provider=BingxInstrumentProviderConfig(load_all=True), + ) + + +def build_pink_node( + *, + data_venue: str | None = None, + exec_venue: str | None = None, + trader_id: str | None = None, +) -> dict[str, Any]: + """Compatibility shim for legacy tests/tools expecting a node-style builder.""" + resolved_bingx_env = _resolve_bingx_environment() + resolved_bingx_allow_mainnet = _resolve_bingx_allow_mainnet() + if resolved_bingx_env is BingxEnvironment.LIVE and not resolved_bingx_allow_mainnet: + raise RuntimeError("BingX LIVE requested but DOLPHIN_BINGX_ALLOW_MAINNET is not enabled") + + actor_cfg = build_actor_config( + data_venue=(data_venue or "BINANCE"), + exec_venue=(exec_venue or "BINGX"), + ) + actor_cfg = _apply_pink_actor_overrides(actor_cfg) + actor_cfg["trader_id"] = trader_id or PINK_DEFAULTS["trader_id"] + actor_cfg["bingx_environment"] = str(resolved_bingx_env.value) + + return {"actor_cfg": actor_cfg} + + +def _build_data_feed() -> HazelcastDataFeed: + return HazelcastDataFeed( + { + "hazelcast": { + "cluster": os.environ.get("HZ_CLUSTER", "dolphin"), + "host": os.environ.get("HZ_HOST", "localhost:5701"), + } + } + ) + + +def _build_runtime(*, phase: PinkPhase) -> PinkDirectRuntime: + data_feed = _build_data_feed() + market_state_runtime = MarketStateRuntime() + + # Decision and intent policy — unchanged from BLUE semantics. + cfg = DecisionConfig( + vel_div_threshold=-0.02, + vel_div_extreme=-0.05, + fixed_tp_pct=float(os.environ.get("DOLPHIN_FIXED_TP_PCT", "0.0020")), + max_hold_bars=int(os.environ.get("DOLPHIN_MAX_HOLD_BARS", "250")), + capital_fraction=0.20, + max_leverage=3.0, + allow_short=True, + allow_long=False, + policy_version="pink_ditav2_v1", + exit_leg_ratios=_resolve_pink_exit_leg_ratios(phase), + ) + decision = DecisionEngine(cfg) + intent = IntentEngine(cfg) + + # DITAv2 execution bundle: kernel + venue + control + Zinc + projection. + bundle = build_launcher_bundle( + venue_mode="BINGX", + max_slots=1, + bingx_config=build_bingx_exec_client_config(), + ) + kernel = bundle.kernel + + # Persistence reads from the kernel's AccountProjection (single authority). + persistence = PinkClickHousePersistence(kernel.account) + + # Non-blocking Hz state writer — writes DOLPHIN_STATE_PINK / DOLPHIN_PNL_PINK. + # Separate client from data-feed (which is read-only). Lazy: any Hz failure + # during instantiation is caught and silenced so PINK still trades without Hz. + hz_state_writer = None + try: + from prod.clean_arch.dita_v2.hazelcast_projection import PinkHzStateWriter + hz_state_writer = PinkHzStateWriter( + cluster=os.environ.get("HZ_CLUSTER", "dolphin"), + host=os.environ.get("HZ_HOST", "localhost:5701"), + state_map_name=PINK_DEFAULTS["state_map"], # "DOLPHIN_STATE_PINK" + pnl_map_name=PINK_DEFAULTS["pnl_map"], # "DOLPHIN_PNL_PINK" + ) + except Exception: + pass # Hz down at startup → PINK still trades; TUI shows kernel snapshot fallback + + return PinkDirectRuntime( + data_feed=data_feed, + kernel=kernel, + decision_engine=decision, + intent_engine=intent, + persistence=persistence, + market_state_runtime=market_state_runtime, + hz_state_writer=hz_state_writer, + ) + + +async def run() -> None: + _apply_pink_env() + phase = _resolve_pink_phase() + os.environ["DOLPHIN_PINK_PHASE"] = phase.value + runtime = _build_runtime(phase=phase) + symbol = str(os.environ.get("DOLPHIN_PINK_SNAPSHOT_SYMBOL", "BTCUSDT")).strip().upper() + poll_interval = float(os.environ.get("DOLPHIN_PINK_POLL_INTERVAL_SEC", "1.0")) + one_shot = _env_bool("DOLPHIN_PINK_ONE_SHOT", False) + account_sync_interval = _resolve_pink_account_sync_interval_sec() + initial_capital = float(os.environ.get("DOLPHIN_INITIAL_CAPITAL", "25000.0")) + + await runtime.connect(initial_capital=initial_capital) + heartbeat_client = None + heartbeat_map = None + heartbeat_stop = asyncio.Event() + heartbeat_task = None + try: + import hazelcast + heartbeat_client = hazelcast.HazelcastClient( + cluster_name=os.environ.get("HZ_CLUSTER", "dolphin"), + cluster_members=[os.environ.get("HZ_HOST", "localhost:5701")], + ) + heartbeat_map = heartbeat_client.get_map("DOLPHIN_HEARTBEAT").blocking() + + async def _heartbeat_loop() -> None: + while not heartbeat_stop.is_set(): + try: + write_runner_heartbeat( + heartbeat_map, + build_runner_heartbeat_payload( + flow="pink_ditav2_runtime", + phase=phase.value, + run_date=str(datetime.utcnow().date()), + runner="pink", + ), + ) + except Exception: + pass + try: + await asyncio.wait_for(heartbeat_stop.wait(), timeout=10.0) + except asyncio.TimeoutError: + continue + + heartbeat_task = asyncio.create_task(_heartbeat_loop()) + + initial_snapshot = await runtime.data_feed.get_latest_snapshot(symbol) + await runtime.recover_account( + snapshot=initial_snapshot, + phase="startup_reconcile", + event_type="ACCOUNT_RECONCILE", + ) + last_account_sync = asyncio.get_running_loop().time() + while True: + snapshot = await runtime.data_feed.get_latest_snapshot(symbol) + loop_now = asyncio.get_running_loop().time() + if account_sync_interval > 0 and loop_now - last_account_sync >= account_sync_interval: + await runtime.reconcile_account(snapshot) + last_account_sync = loop_now + if phase is not PinkPhase.BOOTSTRAP and snapshot is not None: + await runtime.step(snapshot) + if one_shot: + break + await asyncio.sleep(poll_interval) + finally: + heartbeat_stop.set() + if heartbeat_task is not None: + heartbeat_task.cancel() + with contextlib.suppress(BaseException): + await heartbeat_task + if heartbeat_client is not None: + heartbeat_client.shutdown() + if runtime.hz_state_writer is not None: + runtime.hz_state_writer.close() + await runtime.disconnect() + + +if __name__ == "__main__": + asyncio.run(run())