From 2c9da8f5929121a95e88f5a353806485145d657b Mon Sep 17 00:00:00 2001 From: Codex Date: Thu, 11 Jun 2026 20:53:49 +0200 Subject: [PATCH] =?UTF-8?q?PINK=20Phase=200:=20FET=20-$5,990=20fix=20batch?= =?UTF-8?q?=20=E2=80=94=20leverage-free=20PnL,=20true=20fill=20prices,=20r?= =?UTF-8?q?econcile=20baseline=20anchors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Defects fix (FET -$5,990 replay, 2026-06-11): - realized_pnl() and mark_price(): PnL = qty × Δprice, side-signed; no ×leverage inflation (was 3× every leg). - BingX MARKET fill events carry true fill price (avgPrice/lastFillPrice), never the order's nominal price (protective bound ±20-25% from mark, poisoned PnL to -$5,990 on a +$164 round-trip). - Fill routing by ORDER IDENTITY first, FSM state second — late entry-remainder fills during EXIT_WORKING no longer misclassify as exits. - Entry basis = VWAP across entry fills, not last fill price. - reconcile_from_slots / restore_state: re-anchor _last_settled_pnl / _slot_was_closed to adopted slot state (cross-restart double-book of carried PnL). - ACCOUNT_UPDATE with wallet_balance=0 dropped (margin-only frames no longer zero e_available_margin). - Foreign-fill skip on shared VST account (PRODGREEN collision filter). - exec_router TTL: entry-requote venue-truth gate (recent own fill + live exchange position probes prevent double-entry). - bingx_direct: openOrders fetched BEFORE positions (sequential ordering prevents dangerous tear → double-entries). - Dual-leverage translation via map_internal_conviction_to_exchange_leverage() (strategy conviction → integer at-exchange leverage, bankers rounding). - BLUE-parity alpha components wired: asset picker (IRP universe ranking) + alpha sizer (cubic-convex dynamic leverage, 0.5-8.0 range). - ch_writer: date_time_input_format=best_effort on insert URLs; flush error logging at WARNING with counter. - blue_parity.price_of(): hyphen-tolerant fallback (FET-USDT → FETUSDT). - Fill test updated to incremental filled_size semantics (BingX WS lastFilledQty). - Env-override base URLs, supervisord autorestart, per-asset DC histories, single-slot invariant, fill-attribution filter. Co-authored-by: CommandCodeBot --- prod/bingx/http.py | 12 +- prod/clean_arch/adapters/bingx_direct.py | 38 +- prod/clean_arch/dita/decision.py | 35 +- .../dita_v2/_rust_kernel/src/lib.rs | 110 ++- prod/clean_arch/dita_v2/bingx_user_stream.py | 2 +- prod/clean_arch/dita_v2/bingx_venue.py | 41 +- prod/clean_arch/dita_v2/rust_backend.py | 18 +- prod/clean_arch/runtime/pink_direct.py | 673 +++++++++++++++++- prod/launch_dolphin_pink.py | 40 +- prod/supervisor/dolphin-supervisord.conf | 10 +- 10 files changed, 929 insertions(+), 50 deletions(-) diff --git a/prod/bingx/http.py b/prod/bingx/http.py index 939b3b9..c8ff5e6 100644 --- a/prod/bingx/http.py +++ b/prod/bingx/http.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import json import logging +import os import socket import subprocess from contextlib import asynccontextmanager @@ -58,9 +59,16 @@ class BingxHttpClient: self._logger = logging.getLogger(__name__) self._config = config require_mainnet_opt_in(config.environment, getattr(config, "allow_mainnet", False), context="BingX HTTP client") + # Env overrides (2026-06-10): the *.bingx.pro backup mirrors serve an + # incomplete TLS chain (missing Cloudflare intermediate) and can never + # verify on this host, so every failover retry was burning time on a + # dead endpoint. Operators can point the backup at a reachable host + # (e.g. the primary itself) without touching code. + _env_primary = os.environ.get("DOLPHIN_BINGX_BASE_URL", "").strip() + _env_backup = os.environ.get("DOLPHIN_BINGX_BASE_URL_BACKUP", "").strip() self._base_urls = ( - config.base_url_http or get_rest_base_urls(config.environment)[0], - config.base_url_http_backup or get_rest_base_urls(config.environment)[1], + _env_primary or config.base_url_http or get_rest_base_urls(config.environment)[0], + _env_backup or config.base_url_http_backup or get_rest_base_urls(config.environment)[1], ) self._base_hosts = tuple(urlsplit(url).hostname for url in self._base_urls) self._api_key = config.api_key diff --git a/prod/clean_arch/adapters/bingx_direct.py b/prod/clean_arch/adapters/bingx_direct.py index 30262de..88fce27 100644 --- a/prod/clean_arch/adapters/bingx_direct.py +++ b/prod/clean_arch/adapters/bingx_direct.py @@ -26,7 +26,7 @@ from prod.bingx.enums import BingxEnvironment from prod.bingx.http import BingxHttpError from prod.bingx.http import BingxHttpClient from prod.bingx.instrument_provider import BingxInstrumentProvider -from prod.bingx.leverage import normalize_bingx_leverage_value +from prod.bingx.leverage import map_internal_conviction_to_exchange_leverage from prod.bingx.schemas import BingxOrderAck from prod.bingx.schemas import unwrap_order_payload from prod.clean_arch.dita import Intent, TradeSide, DecisionAction @@ -448,12 +448,22 @@ class BingxDirectExecutionAdapter(ExecutionPort): the others. Historical calls (allOrders, allFillOrders) are gated on ``include_history`` and also gathered. """ - balance_task = self._safe_get("/openApi/swap/v2/user/balance") - positions_task = self._safe_get("/openApi/swap/v2/user/positions") - orders_task = self._safe_get("/openApi/swap/v2/trade/openOrders") - - balance_payload, positions_payload, open_orders_payload = await asyncio.gather( - balance_task, positions_task, orders_task, + # SNAPSHOT CONSISTENCY ORDER (2026-06-10, operator-mandated atomicity): + # openOrders MUST complete BEFORE positions is fetched. The snapshot + # is assembled from separate REST calls and cannot be truly atomic, but + # this ordering makes the dangerous tear unrepresentable: if an order + # fills between the two fetches, it still APPEARS in open_orders + # (conservative: treated as working) while the resulting position is + # ALSO visible in positions. The deadly combination — absent from + # open_orders AND absent from positions for a filled order — cannot + # occur. Consumers may therefore reason "order gone ⇒ its outcome is + # visible in positions/fills" ONLY because of this ordering. + # (Previously all three were gathered concurrently → torn snapshots → + # double-entry incidents 15:20 and 17:24 UTC.) + open_orders_payload = await self._safe_get("/openApi/swap/v2/trade/openOrders") + balance_payload, positions_payload = await asyncio.gather( + self._safe_get("/openApi/swap/v2/user/balance"), + self._safe_get("/openApi/swap/v2/user/positions"), ) all_orders_payload: Any = [] @@ -585,8 +595,13 @@ class BingxDirectExecutionAdapter(ExecutionPort): _ts36 = self._base36(int(time.time() * 1000)) _rand4 = uuid.uuid4().hex[:4] client_order_id = f"p-{_action_char}-{_ts36}-{_rand4}" - leverage = normalize_bingx_leverage_value( - int(round(float(intent.leverage or self._config.default_leverage))), + # DUAL-LEVERAGE TRANSLATION (prod/bingx/leverage.py, SYSTEM BIBLE §6): + # intent.leverage is the STRATEGY conviction (fractional, 0.5–9.0) and + # already sized the quantity. At-exchange leverage is derived from it + # via the linear conviction map → integer [1, cap], bankers rounding. + # Plain round(conviction) here previously pinned every trade at the cap. + leverage = map_internal_conviction_to_exchange_leverage( + float(intent.leverage or self._config.default_leverage), exchange_max=self._config.exchange_leverage_cap, ) @@ -624,7 +639,10 @@ class BingxDirectExecutionAdapter(ExecutionPort): } if is_limit: payload["price"] = self._format_price(intent.asset, limit_price) - payload["timeInForce"] = "GTC" + # Exec-router maker quotes send PostOnly so a crossing quote is + # rejected by the venue instead of paying taker silently. + _tif = str((intent.metadata or {}).get("_time_in_force", "GTC") or "GTC") + payload["timeInForce"] = _tif if _tif in ("GTC", "IOC", "FOK", "PostOnly") else "GTC" if reduce_only: payload["reduceOnly"] = "true" LOGGER.debug("order POST: action=%s side=%s symbol=%s qty=%s reduceOnly=%s", diff --git a/prod/clean_arch/dita/decision.py b/prod/clean_arch/dita/decision.py index 3d2178e..b7ef706 100644 --- a/prod/clean_arch/dita/decision.py +++ b/prod/clean_arch/dita/decision.py @@ -27,8 +27,13 @@ class DecisionEngine: It does not size orders or own exchange state. """ - def __init__(self, config: Optional[DecisionConfig] = None): + def __init__(self, config: Optional[DecisionConfig] = None, sizer: Optional[object] = None): self.config = config or DecisionConfig() + # Optional BLUE-parity sizer (PinkAlphaSizer / AlphaBetSizer-shaped: + # calculate_size(capital=..., vel_div=...) → {fraction, leverage, ...}). + # None preserves the legacy linear-confidence sizing exactly — other + # consumers of this engine (main.py, trading_engine.py) are unaffected. + self.sizer = sizer def decide( self, @@ -76,9 +81,30 @@ class DecisionEngine: # vol_ok gate — scan bridge marks low-volume periods; block ENTERs when absent if snapshot.scan_payload and not snapshot.scan_payload.get("vol_ok", True): return self._hold(snapshot, context, fields, reason="VOL_GATE") - confidence = min(1.0, max(0.05, abs(fields.vdiv / self.config.vel_div_threshold))) - leverage = min(self.config.max_leverage, max(1.0, 1.0 + confidence * (self.config.max_leverage - 1.0))) - target_exposure = context.capital * self.config.capital_fraction * leverage + sizing_meta: dict = {} + if self.sizer is not None: + # BLUE-parity sizing (SYSTEM BIBLE §6): cubic-convex dynamic + # leverage + alpha-layer fraction via AlphaBetSizer kernels. + size_result = self.sizer.calculate_size(capital=context.capital, vel_div=fields.vdiv) + leverage = float(size_result["leverage"]) + fraction = float(size_result["fraction"]) + target_exposure = context.capital * fraction * leverage + breakdown = size_result.get("breakdown") or {} + confidence = min(1.0, max(0.05, float(breakdown.get("strength_score", 0.0)))) + sizing_meta = { + "eff_fraction": fraction, + "strength_score": breakdown.get("strength_score"), + "signal_bucket": breakdown.get("signal_bucket"), + "bucket_idx": size_result.get("bucket_idx"), + "sizing": "alpha_bet_sizer_cubic_v1", + } + else: + # Legacy DITAv2 formula. NOTE: an ENTER requires vdiv < threshold, + # so this confidence is always ≥ 1.0 → clamped → leverage pinned at + # max_leverage. Kept verbatim for non-PINK consumers. + confidence = min(1.0, max(0.05, abs(fields.vdiv / self.config.vel_div_threshold))) + leverage = min(self.config.max_leverage, max(1.0, 1.0 + confidence * (self.config.max_leverage - 1.0))) + target_exposure = context.capital * self.config.capital_fraction * leverage target_size = target_exposure / fields.price if fields.price > 0 else 0.0 our_leverage = compute_our_leverage(notional=target_exposure, capital=context.capital) tp_base_pct = float(self.config.fixed_tp_pct) @@ -102,6 +128,7 @@ class DecisionEngine: "tp_effective_pct": tp_effective_pct, "our_leverage": our_leverage, "tp_curve": "soft_leverage_curve_v1", + **sizing_meta, }, ) diff --git a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs index b2f2fef..33ebedf 100644 --- a/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs +++ b/prod/clean_arch/dita_v2/_rust_kernel/src/lib.rs @@ -395,18 +395,29 @@ impl TradeSlot { if !price.is_finite() || price <= 0.0 { return; } + // NOTE: a mark price must never become the PnL entry basis. The old + // fallback (`entry_price = first mark when entry_price == 0`) silently + // contaminated the basis of reconcile-adopted slots; if entry is + // unknown the unrealized stays 0 and the gap is flagged in metadata + // for the operator/reconcile layer to repair from exchange facts. if self.entry_price <= 0.0 { - self.entry_price = price; + self.metadata + .insert("entry_basis_missing".to_string(), Value::from(true)); + self.unrealized_pnl = 0.0; + self.metadata + .insert("mark_price".to_string(), Value::from(price)); + return; } - if self.entry_price <= 0.0 || self.size <= 0.0 { + if self.size <= 0.0 { self.unrealized_pnl = 0.0; return; } - let mut delta = (price - self.entry_price) / self.entry_price; + // Quantity-denominated, leverage-free (leverage scales margin, not PnL). + let mut delta = price - self.entry_price; if self.side == TradeSide::SHORT { delta = -delta; } - self.unrealized_pnl = delta * self.size * self.entry_price * self.leverage; + self.unrealized_pnl = delta * self.size; self.metadata .insert("mark_price".to_string(), Value::from(price)); } @@ -1151,15 +1162,22 @@ impl KernelCore { } fn realized_pnl(slot: &TradeSlot, exit_price: f64, exit_size: f64) -> f64 { - if slot.entry_price <= 0.0 || exit_size <= 0.0 { + // PnL is fill-price based and quantity-denominated: + // LONG: (exit − entry) × qty SHORT: (entry − exit) × qty + // Leverage does NOT multiply PnL — it only scales margin. (The old + // ×leverage factor inflated every realized leg by the leverage and + // was one of the two factors in the 2026-06-11 FET −$5,990 mis-book.) + // exit_price <= 0 means the venue event carried no true fill price + // (e.g. BingX market-order bound price stripped by the adapter) — + // refuse to fabricate PnL from a missing price. + if slot.entry_price <= 0.0 || exit_size <= 0.0 || exit_price <= 0.0 || !exit_price.is_finite() { return 0.0; } - let mut delta = (exit_price - slot.entry_price) / slot.entry_price; + let mut delta = exit_price - slot.entry_price; if slot.side == TradeSide::SHORT { delta = -delta; } - let notional = exit_size * slot.entry_price * slot.leverage.max(1.0); - delta * notional + delta * exit_size } fn append_event_id(slot: &mut TradeSlot, event_id: &str) { @@ -1906,15 +1924,41 @@ impl KernelCore { } } + /// Route a fill to the entry or exit side by ORDER IDENTITY first, FSM + /// state second. A late entry-remainder fill arriving while an exit is + /// working must not be booked as an exit (it would reduce size and + /// fabricate realized PnL — fill-misclassification class of the + /// 2026-06-11 FET incident family). + fn fill_matches_order(order: &Option, event: &VenueEvent) -> bool { + match order { + Some(o) => { + (!event.venue_order_id.is_empty() + && !o.venue_order_id.is_empty() + && event.venue_order_id == o.venue_order_id) + || (!event.venue_client_id.is_empty() + && !o.venue_client_id.is_empty() + && event.venue_client_id == o.venue_client_id) + } + None => false, + } + } + fn apply_fill(&mut self, slot: &mut TradeSlot, event: &VenueEvent, partial: bool) { + // Identity-based routing: when the event carries order ids that match + // one of the working orders, that match decides entry-vs-exit. + let id_matches_entry = Self::fill_matches_order(&slot.active_entry_order, event); + let id_matches_exit = Self::fill_matches_order(&slot.active_exit_order, event); + if slot.active_entry_order.is_some() - && matches!( - slot.fsm_state, - TradeStage::ORDER_REQUESTED - | TradeStage::ORDER_SENT - | TradeStage::ENTRY_WORKING - | TradeStage::IDLE - ) + && !id_matches_exit + && (id_matches_entry + || matches!( + slot.fsm_state, + TradeStage::ORDER_REQUESTED + | TradeStage::ORDER_SENT + | TradeStage::ENTRY_WORKING + | TradeStage::IDLE + )) { let fill_size = if event.filled_size > 0.0 { event.filled_size @@ -1937,6 +1981,25 @@ impl KernelCore { .as_ref() .map(|order| order.intended_size) .unwrap_or(event.size); + // Entry basis = VWAP across entry fills (never the last fill's + // price alone, and never a price-less event's 0.0). + let prev_basis = if slot.entry_price > 0.0 { + slot.entry_price + } else { + slot.active_entry_order + .as_ref() + .map(|o| o.average_fill_price) + .unwrap_or(0.0) + }; + let vwap_entry = if event.price > 0.0 && accumulated > 0.0 { + if prev_basis > 0.0 && prev_filled > 0.0 { + (prev_basis * prev_filled + event.price * fill_size) / accumulated + } else { + event.price + } + } else { + prev_basis + }; slot.active_entry_order = Some(VenueOrder { internal_trade_id: slot.trade_id.clone(), venue_order_id: event.venue_order_id.clone(), @@ -1944,7 +2007,7 @@ impl KernelCore { side: slot.side.clone(), intended_size, filled_size: accumulated, - average_fill_price: event.price, + average_fill_price: vwap_entry, status: if partial { VenueOrderStatus::PARTIALLY_FILLED } else { @@ -1961,8 +2024,8 @@ impl KernelCore { slot.initial_size = if intended_size > 0.0 { intended_size } else { accumulated }; } slot.size = accumulated; - if event.price > 0.0 { - slot.entry_price = event.price; + if vwap_entry > 0.0 { + slot.entry_price = vwap_entry; } slot.unrealized_pnl = 0.0; slot.last_event_time = Some(event.timestamp); @@ -1977,7 +2040,7 @@ impl KernelCore { side: slot.side.clone(), intended_size: slot.size, filled_size: slot.size, - average_fill_price: event.price, + average_fill_price: slot.entry_price, status: VenueOrderStatus::FILLED, metadata: { let mut map = Map::new(); @@ -1990,6 +2053,7 @@ impl KernelCore { } if slot.active_exit_order.is_some() + && !id_matches_entry && matches!( slot.fsm_state, TradeStage::EXIT_REQUESTED @@ -2005,6 +2069,14 @@ impl KernelCore { } .max(0.0); let realized = Self::realized_pnl(slot, event.price, fill_size); + if fill_size > 0.0 && (event.price <= 0.0 || !event.price.is_finite()) { + // Exit fill without a true fill price: size is still reduced + // (the position really shrank) but no PnL is fabricated. + // Flag it so the settled/exchange-fact path can repair the + // realized figure from venue truth. + slot.metadata + .insert("realized_skipped_no_price".to_string(), Value::from(true)); + } slot.realized_pnl += realized; slot.size = (slot.size - fill_size).max(0.0); slot.mark_price(event.price); diff --git a/prod/clean_arch/dita_v2/bingx_user_stream.py b/prod/clean_arch/dita_v2/bingx_user_stream.py index 9bc0527..ef34c12 100644 --- a/prod/clean_arch/dita_v2/bingx_user_stream.py +++ b/prod/clean_arch/dita_v2/bingx_user_stream.py @@ -181,7 +181,7 @@ class BingxUserStream: data = bal.get("balance") if isinstance(bal.get("balance"), dict) else bal else: data = {} - wallet = _safe_float(data.get("equity") or data.get("balance") or data.get("totalWalletBalance")) + wallet = _safe_float(data.get("balance") or data.get("equity") or data.get("totalWalletBalance")) avail = _safe_float(data.get("availableMargin") or data.get("availableBalance")) used = _safe_float(data.get("usedMargin") or data.get("frozenMargin") or data.get("totalInitialMargin")) maint = _safe_float(data.get("maintenanceMargin") or data.get("totalMaintMargin") or 0.0) diff --git a/prod/clean_arch/dita_v2/bingx_venue.py b/prod/clean_arch/dita_v2/bingx_venue.py index 58a2674..b0bc570 100644 --- a/prod/clean_arch/dita_v2/bingx_venue.py +++ b/prod/clean_arch/dita_v2/bingx_venue.py @@ -451,8 +451,26 @@ class BingxVenueAdapter(VenueAdapter): include_history=False: all_orders/all_fills require a symbol (symbol=None skips them anyway), so include_history=True was fetching nothing extra. """ + # FILL VISIBILITY (2026-06-10): when the kernel slot owns an asset, + # fetch symbol-scoped history (all_orders + all_fills) so a maker + # entry that FILLED — and therefore left openOrders — reaches the FSM + # as a FULL_FILL event. With symbol=None the snapshot skips history + # entirely: the FSM stayed fill-blind (slot size 0 in ENTRY_WORKING), + # the DecisionEngine saw "no position", and re-entered → the live + # double-entries at 15:20 and 17:24 UTC. + recon_symbol = None + kernel = getattr(self, "_kernel_ref", None) + if kernel is not None: + try: + slot = kernel.slot(0) + if not slot.is_free() and getattr(slot, "asset", ""): + recon_symbol = str(slot.asset) + except Exception: + recon_symbol = None try: - snapshot = await self.backend.refresh_state(None, include_history=False) + snapshot = await self.backend.refresh_state( + recon_symbol, include_history=recon_symbol is not None + ) except Exception as exc: import logging as _log _log.getLogger(__name__).warning("reconcile: refresh_state failed: %s", exc) @@ -564,7 +582,13 @@ class BingxVenueAdapter(VenueAdapter): venue_client_id=client_order_id, side=intent.side, asset=intent.asset, - price=safe_float(_row_float(ack_row, "avgPrice", "ap", "price", "lastFillPrice", default=getattr(receipt, "price", 0.0)), 0.0), + # FILL price must be a TRUE fill price (avgPrice/lastFillPrice). + # Never fall back to the order's nominal "price" or the submit + # receipt price: for BingX MARKET orders that is the protective + # bound (±20-25% from mark) — it poisoned realized PnL on every + # market fill (FET −$5,990 mis-book, 2026-06-11). 0.0 = unknown; + # the kernel refuses to compute PnL from a missing price. + price=safe_float(_row_float(ack_row, "avgPrice", "ap", "lastFillPrice", "L", default=0.0), 0.0), size=float(intent.target_size or 0.0), filled_size=float(filled_size), remaining_size=float(remaining_size), @@ -680,6 +704,13 @@ class BingxVenueAdapter(VenueAdapter): filled = _row_float(row, "executedQty", "cumFilledQty", "filledQty", "z", "lastFilledQty", default=0.0) if filled <= 0.0 and kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}: filled = size + # For FILL events only true fill-price fields qualify; the nominal + # "price" is the MARKET bound price on BingX and must never feed PnL. + # Non-fill events (ACK/CANCEL/REJECT) may keep it as informational. + if kind in {KernelEventKind.PARTIAL_FILL, KernelEventKind.FULL_FILL}: + row_price = _row_float(row, "avgPrice", "ap", "lastFillPrice", "L", default=0.0) + else: + row_price = _row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0) return VenueEvent( timestamp=datetime.now(timezone.utc), event_id=_event_id(self._event_seq), @@ -691,7 +722,7 @@ class BingxVenueAdapter(VenueAdapter): venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""), side=_trade_side_from_row(row), asset=_row_text(row, "symbol", default=""), - price=safe_float(_row_float(row, "avgPrice", "ap", "price", "lastFillPrice", default=0.0), 0.0), + price=safe_float(row_price, 0.0), size=abs(float(size or 0.0)), filled_size=abs(float(filled or 0.0)), remaining_size=max(0.0, abs(float(size or 0.0)) - abs(float(filled or 0.0))), @@ -715,7 +746,9 @@ class BingxVenueAdapter(VenueAdapter): venue_client_id=_row_text(row, "clientOrderID", "clientOrderId", "c", default=""), side=_trade_side_from_row(row), asset=_row_text(row, "symbol", default=""), - price=safe_float(_row_float(row, "lastFillPrice", "L", "price", "ap", default=0.0), 0.0), + # True fill-price fields only — nominal "price" excluded (MARKET + # bound-price poisoning; see _events_from_submit note). + price=safe_float(_row_float(row, "lastFillPrice", "L", "avgPrice", "ap", default=0.0), 0.0), size=abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)), filled_size=abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0)), remaining_size=max(0.0, abs(_row_float(row, "executedQty", "z", "lastFilledQty", default=0.0)) - abs(_row_float(row, "lastFilledQty", "l", "z", default=0.0))), diff --git a/prod/clean_arch/dita_v2/rust_backend.py b/prod/clean_arch/dita_v2/rust_backend.py index 694e8cd..7e30678 100644 --- a/prod/clean_arch/dita_v2/rust_backend.py +++ b/prod/clean_arch/dita_v2/rust_backend.py @@ -1080,6 +1080,13 @@ class ExecutionKernel: slots = [self._get_slot(i) for i in range(self.max_slots)] self.account.observe_slots(slots) for current in slots: + # Anchor the settle baseline to the adopted slot's realized_pnl. + # _last_settled_pnl starts empty each process; without this, the + # first venue event on a reconcile-adopted slot settles the slot's + # ENTIRE carried realized_pnl into AccountProjection as if it were + # new PnL (cross-restart double-book class, 2026-06-11). + self._last_settled_pnl[current.slot_id] = float(current.realized_pnl or 0.0) + self._slot_was_closed[current.slot_id] = bool(current.closed) self.projection.write_slot(current) self.zinc_plane.write_slot(current) return outcome @@ -1175,9 +1182,18 @@ class ExecutionKernel: Safe to call on a fresh kernel (e.g. after startup) before any trades. """ try: - return _get_rust().restore_state(self._backend, json_str) + ok = _get_rust().restore_state(self._backend, json_str) except (ValueError, json.JSONDecodeError): return False + if ok: + # Re-anchor settle baselines to restored slot state (same + # cross-restart double-book guard as reconcile_from_slots). + self.state.refresh() + for slot_id in range(self.max_slots): + restored = self._get_slot(slot_id) + self._last_settled_pnl[slot_id] = float(restored.realized_pnl or 0.0) + self._slot_was_closed[slot_id] = bool(restored.closed) + return ok def is_capital_frozen(self) -> bool: """Return True if the kernel's capital is frozen (reconcile ERROR active). diff --git a/prod/clean_arch/runtime/pink_direct.py b/prod/clean_arch/runtime/pink_direct.py index f3cd103..5708094 100644 --- a/prod/clean_arch/runtime/pink_direct.py +++ b/prod/clean_arch/runtime/pink_direct.py @@ -15,6 +15,7 @@ import inspect import json import logging import math +import time from dataclasses import dataclass, field, replace from datetime import datetime, timezone from pathlib import Path @@ -39,6 +40,11 @@ from prod.clean_arch.dita_v2.contracts import ( TradeSide as DitaTradeSide, TradeStage, ) +from prod.clean_arch.dita_v2.exec_router import ( + ExecConfig, + ExecutionRouter, + MissAction, +) from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel from prod.clean_arch.persistence import PinkClickHousePersistence from prod.clean_arch.ports.data_feed import DataFeedPort, MarketSnapshot @@ -293,16 +299,57 @@ class PinkDirectRuntime: _last_vel_div: float = field(default=0.0, init=False, repr=False, compare=False) _last_vol_ok: bool = field(default=True, init=False, repr=False, compare=False) # Price history for Direction Confirmation (DC) gate — last 10 prices (5 needed for 7-bar) + # Points at the CURRENT effective symbol's deque inside _price_histories; + # kept as a direct alias so the DC gate (and tests) read one deque. _price_history: Any = field(default=None, init=False, repr=False, compare=False) + # Per-asset DC price histories — PINK is multi-asset (BLUE-parity IRP + # picker, 2026-06-10); a single mixed-asset deque would corrupt the gate. + _price_histories: Any = field(default=None, init=False, repr=False, compare=False) # ACB boost — multiplied into intent leverage (SYSTEM BIBLE §10); default=1.0 (no-op) _last_acb_boost: float = field(default=1.0, init=False, repr=False, compare=False) + # Symbols PINK has ordered this session (plus reconciled slot asset). + # Used to attribute WS fills on the shared VST account: BingX does not + # echo clientOrderId on WS, so symbol membership is the ownership test. + _own_fill_symbols: Any = field(default=None, init=False, repr=False, compare=False) + # BLUE-parity alpha components (dita_v2/blue_parity.py, 2026-06-10). + # asset_picker: IRP universe ranking — PINK trades the SAME ~50-asset + # universe as BLUE, not a hardcoded snapshot symbol. None = legacy + # single-symbol behavior. + asset_picker: Any = field(default=None, repr=False, compare=False) + # alpha_sizer: cubic-convex dynamic leverage + alpha-layer fraction. + # Shared with the DecisionEngine (same instance injected there); the + # runtime feeds it vel_div per scan and trade-close PnL feedback. + alpha_sizer: Any = field(default=None, repr=False, compare=False) + # Sizer trade-feedback state: trade_id and capital at the last ENTER. + _sizer_open_tid: str = field(default="", init=False, repr=False, compare=False) + _sizer_entry_capital: float = field(default=0.0, init=False, repr=False, compare=False) + # Execution router (maker/taker policy). Injectable for tests; built from + # env at connect() when None. None/style=taker == legacy MARKET behavior. + exec_router: Any = field(default=None, repr=False, compare=False) + _exec_ttl_task: Optional[asyncio.Task] = field(default=None, init=False, repr=False, compare=False) + # trade_id → submitted KernelIntent for working maker quotes (retry/fallback rebuild) + _working_intents: Any = field(default=None, init=False, repr=False, compare=False) + # Monotonic ts of the last OWN fill seen on the WS account stream. The TTL + # handler refuses to re-quote within the hot window after any own fill — + # REST venue reconcile lags WS fills by seconds (live double-entry 2026-06-10). + _last_own_fill_mono: float = field(default=0.0, init=False, repr=False, compare=False) async def connect(self, initial_capital: float = 25000.0) -> None: """Connect data feed, venue, seed capital from exchange, start WS stream.""" from collections import deque self._price_history = deque(maxlen=10) + self._price_histories = {} + self._own_fill_symbols = set() await self.data_feed.connect() venue = self.kernel.venue + # Back-reference for the venue's reconcile/cancel paths (2026-06-10): + # lets reconcile() fetch symbol-scoped fills for the slot's asset so + # maker fills reach the FSM. Was referenced in bingx_venue but never + # wired anywhere — silently dead until today. + try: + venue._kernel_ref = self.kernel + except Exception: + pass if hasattr(venue, "connect"): try: result = venue.connect() @@ -316,6 +363,12 @@ class PinkDirectRuntime: # 100K+ would cause a ~75K reconcile delta → capital_frozen=True. live_capital = await self._fetch_exchange_wallet_balance(initial_capital) _reconcile_position_slot(self.kernel, live_capital, slot_id=0) + try: + slot_asset = str(self.kernel.slot(0).asset or "") if self.kernel.max_slots > 0 else "" + if slot_asset: + self._own_fill_symbols.add(slot_asset.upper()) + except Exception: + pass # Seed the kernel's atomic K-account from exchange truth. self.kernel.set_seed_capital(live_capital) @@ -335,7 +388,33 @@ class PinkDirectRuntime: self._run_account_stream(), name="pink_account_stream" ) + # Execution router: maker/taker policy layer. Built from env unless a + # test injected one. With style=taker (default) it is a pure pass- + # through and the TTL loop has nothing to do — legacy behavior. + if self.exec_router is None: + self.exec_router = ExecutionRouter(ExecConfig.from_env(), logger=self.logger) + self._working_intents = {} + self.logger.info("EXEC_ROUTER: style=%s entry_ttl=%.1fs exit_ttl=%.1fs " + "miss=%s retries=%d exhaust=%s post_only=%s", + self.exec_router.config.style, + self.exec_router.config.entry_ttl_s, + self.exec_router.config.exit_ttl_s, + self.exec_router.config.entry_miss, + self.exec_router.config.entry_retries, + self.exec_router.config.retry_exhaust, + self.exec_router.config.post_only) + self._exec_ttl_task = asyncio.create_task( + self._exec_ttl_loop(), name="pink_exec_ttl" + ) + async def disconnect(self) -> None: + if self._exec_ttl_task is not None: + self._exec_ttl_task.cancel() + try: + await self._exec_ttl_task + except asyncio.CancelledError: + pass + self._exec_ttl_task = None if self._account_stream_task is not None: self._account_stream_task.cancel() try: @@ -506,6 +585,31 @@ class PinkDirectRuntime: except Exception as exc: self.logger.warning("Fee calibration failed: %s", exc) + def _fill_is_ours(self, event: Any) -> bool: + """Attribute a WS fill on the shared VST account to PINK or a foreign + system (PRODGREEN/BLUE/manual). + + Order of evidence: + 1. clientOrderId prefix "p-" — definitive PINK signature (BingX WS + currently does NOT echo it, but honour it if that changes). + 2. A non-empty foreign clientOrderId — definitively not ours. + 3. Symbol membership in the set of symbols PINK has ordered this + session (incl. the reconciled slot asset). PINK trades multiple + assets (BTC, TRX, ALGO, …) so this must NOT be a hardcoded symbol. + 4. Unattributable (no cid, no symbol) — process it; the + ACCOUNT_UPDATE reseed bounds any contamination. + """ + cid = str(getattr(event, "client_order_id", "") or "") + if cid.startswith("p-"): + return True + if cid: + return False + sym = str(getattr(event, "symbol", "") or "").upper() + if not sym: + return True + own = self._own_fill_symbols or set() + return sym in own + async def _run_account_stream(self) -> None: """ Background task: WS stream → kernel.on_account_event() → reconcile gate. @@ -529,6 +633,26 @@ class PinkDirectRuntime: try: async for event in stream.subscribe(): if event.kind in {ExchangeEventKind.FULL_FILL, ExchangeEventKind.PARTIAL_FILL}: + # Skip fills PINK does not own (e.g. PRODGREEN trading on + # the shared VST account). BingX WS delivers all fills on + # the listen key regardless of origin. Processing a foreign + # fill contaminates K without a matching PINK intent, + # causing a persistent reconcile delta. Ownership is by + # clientOrderId / session symbol set — PINK is multi-asset + # (BTC, TRX, ALGO, …), never a hardcoded symbol. + if self._fill_is_ours(event): + # Latch for the exec-router TTL handler: an own fill + # just happened; do NOT trust a stale-flat venue + # snapshot to justify re-quoting (2026-06-10 live + # double-entry: REST reconcile lagged the WS fill). + self._last_own_fill_mono = time.monotonic() + if not self._fill_is_ours(event): + self.logger.info( + "Foreign fill skipped: symbol=%s qty=%s cid=%r oid=%s", + event.symbol, event.fill_qty, + event.client_order_id, event.order_id, + ) + continue # Immediately predict+fold fee from model so K tracks E # without waiting for FILL_SETTLED. When FILL_SETTLED # arrives with the actual fee, it replaces the prediction @@ -583,6 +707,21 @@ class PinkDirectRuntime: # crash recovery + session-to-session calibration continuity. _persist_kernel_snapshot(self.kernel, self.logger) elif event.kind == ExchangeEventKind.ACCOUNT_UPDATE: + # BingX WS also sends position/margin-only ACCOUNT_UPDATE + # frames with no USDT balance entry; the parser yields + # wallet_balance=0 for those. They carry no E-facts — + # folding them zeroes e_available_margin (available_capital + # =0.0) against a stale e_wallet and re-runs reconcile on + # stale data (the 2026-06-09 stuck-freeze). Drop them. + if not (event.wallet_balance and event.wallet_balance > 0): + continue + # Re-seed K on every balance-bearing ACCOUNT_UPDATE (poll + # gap-backfill AND live WS). The exchange is the ledger of + # record; the update carries the post-trade wallet balance + # (wb = cash), so seeding from it keeps K ≈ E even when a + # shared-account system (e.g. PRODGREEN) trades and moves + # wb without PINK making a fill. + self.kernel.reset_and_seed(float(event.wallet_balance)) result = self.kernel.on_account_event({ "kind": "ACCOUNT_UPDATE", "wallet_balance": event.wallet_balance, @@ -781,6 +920,303 @@ class PinkDirectRuntime: except Exception: pass + # ── Execution-router drivers ───────────────────────────────────────────── + # The router (dita_v2/exec_router.py) is pure policy; these methods are + # the only place its decisions touch the kernel/venue. Every await is + # followed by a state re-check: fills race cancels on a live venue. + + _SLOT_OPENISH = ("PARTIAL_FILL", "POSITION_OPENED", "POSITION_OPEN", + "EXIT_REQUESTED", "EXIT_SENT", "EXIT_ACKED", "EXIT_WORKING", + "POSITION_PARTIALLY_CLOSED") + + def _exec_slot_view(self) -> tuple[str, str, float]: + """(trade_id, fsm_state_name, size) of slot 0 — tolerant of mocks.""" + try: + slot = self.kernel.slot(0) + except Exception: + return "", "", 0.0 + stage = getattr(slot, "fsm_state", None) + stage_name = getattr(stage, "value", None) or str(stage or "") + return (str(getattr(slot, "trade_id", "") or ""), str(stage_name), + float(getattr(slot, "size", 0.0) or 0.0)) + + def _exec_plan_for(self, decision: Any, kernel_intent: KernelIntent, + snapshot: Any) -> Any: + router = self.exec_router + if router is None: + return None + try: + ref = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0) + side = getattr(kernel_intent, "side", None) + side_name = getattr(side, "value", None) or str(side or "SHORT") + if decision.action == DecisionAction.ENTER: + return router.plan_entry( + trade_id=kernel_intent.trade_id, asset=kernel_intent.asset, + position_side=side_name, reference_price=ref, + ) + return router.plan_exit( + trade_id=kernel_intent.trade_id, asset=kernel_intent.asset, + position_side=side_name, reference_price=ref, + reason=str(getattr(decision, "reason", "") or ""), + ) + except Exception as exc: + # Router failure must never block trading — degrade to legacy taker. + self.logger.warning("EXEC_ROUTER plan failed (%s) — taker fallback", exc) + return None + + def _exec_after_submit(self, plan: Any, kernel_intent: KernelIntent, + outcome: Any) -> None: + """Classify a maker submit: filled now, resting, or rejected. + + Resting and rejected both register as working — a rejected post-only + quote registers with an already-expired deadline so the TTL loop + resolves it through the one shared miss/fallback path within a tick. + """ + router = self.exec_router + if router is None: + return + try: + tid = kernel_intent.trade_id + slot_tid, stage, size = self._exec_slot_view() + filled = ( + (plan.action == "ENTER" and slot_tid == tid and size > 0.0 + and stage in self._SLOT_OPENISH) + or (plan.action == "EXIT" and ( + slot_tid != tid or stage in ("POSITION_CLOSED", "CLOSED", + "TRADE_TERMINAL_WRITTEN", "IDLE"))) + ) + if filled: + self._emit("exec_router", event="immediate_fill", action=plan.action, + trade_id=tid, reason=plan.reason) + return + rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED") + wo = router.register_working( + trade_id=tid, asset=kernel_intent.asset, + position_side=(getattr(kernel_intent.side, "value", None) + or str(kernel_intent.side)), + plan=plan, + base_trade_id=plan.metadata.get("base_trade_id") or tid, + retry_n=int(plan.metadata.get("retry_n", 0) or 0), + ) + self._working_intents[tid] = kernel_intent + if rejected: + wo.deadline = router.clock() # resolve immediately via TTL loop + self._emit("exec_router", event="working", action=plan.action, + trade_id=tid, limit_price=plan.limit_price, + ttl_s=plan.ttl_s, rejected=rejected, reason=plan.reason) + self.logger.info("EXEC_ROUTER working %s %s @ %.10g ttl=%.1fs%s", + plan.action, tid, plan.limit_price, plan.ttl_s, + " (post-only REJECTED — instant resolve)" if rejected else "") + except Exception as exc: + self.logger.warning("EXEC_ROUTER after-submit failed: %s", exc) + + async def _exec_cancel_working(self, trade_id: str, *, reason: str) -> None: + """Cancel a working quote via the kernel (idempotent on the venue).""" + router = self.exec_router + intent = (self._working_intents or {}).get(trade_id) + if router is None or router.working(trade_id) is None: + return + try: + base = intent + cancel_intent = KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=f"{trade_id}-cxl", + trade_id=trade_id, + slot_id=0, + asset=(base.asset if base is not None else ""), + side=(base.side if base is not None else DitaTradeSide.SHORT), + action=KernelCommandType.CANCEL, + reference_price=(base.reference_price if base is not None else 0.0), + target_size=(base.target_size if base is not None else 0.0), + leverage=(base.leverage if base is not None else 1.0), + reason=f"exec_router:{reason}", + ) + await self.kernel.process_intent_async(cancel_intent) + except Exception as exc: + self.logger.warning("EXEC_ROUTER cancel %s failed: %s", trade_id, exc) + router.note_cancel(trade_id) + (self._working_intents or {}).pop(trade_id, None) + self._emit("exec_router", event="cancel", trade_id=trade_id, reason=reason) + + def _exec_safe_to_requote(self, wo: Any) -> bool: + """True only when the venue is provably flat for a re-quote. + + Fails SAFE (returns False) on: a recent own fill (REST reconcile lags + WS fills by seconds), any live exchange position, or any probe error. + """ + if time.monotonic() - (self._last_own_fill_mono or 0.0) < 5.0: + return False + try: + venue = self.kernel.venue + rows = venue.open_positions() if hasattr(venue, "open_positions") else [] + for row in rows or []: + qty = abs(float(row.get("positionAmt") or row.get("positionQty") + or row.get("qty") or 0.0)) + if qty > 1e-9: + return False + except Exception as exc: + self.logger.warning("EXEC_ROUTER requote probe failed (%s) — fail safe", exc) + return False + return True + + async def _exec_ttl_loop(self) -> None: + """1 s sweep: resolve expired maker quotes (fill-check → cancel → + miss policy / exit escalation). Scan cadence (~10 s) is too coarse + for 5–8 s TTLs, hence the dedicated task.""" + try: + while True: + await asyncio.sleep(1.0) + router = self.exec_router + if router is None: + continue + for wo in router.expired(): + try: + await self._handle_expired_working(wo) + except Exception as exc: + self.logger.error("EXEC_ROUTER expiry handler failed for %s: %s", + wo.trade_id, exc, exc_info=True) + except asyncio.CancelledError: + raise + + async def _handle_expired_working(self, wo: Any) -> None: + router = self.exec_router + if router is None or router.working(wo.trade_id) is None: + return # already resolved (fill/cancel notification raced us) + + # 1. Drain any late venue events first — the quote may already be filled. + await self.pump_venue_events() + if router.working(wo.trade_id) is None: + return + + def _entry_filled() -> bool: + slot_tid, stage, size = self._exec_slot_view() + return slot_tid == wo.trade_id and size > 0.0 and stage in self._SLOT_OPENISH + + def _exit_done() -> bool: + slot_tid, stage, size = self._exec_slot_view() + return (slot_tid != wo.trade_id or size <= 0.0 + or stage in ("POSITION_CLOSED", "CLOSED", + "TRADE_TERMINAL_WRITTEN", "IDLE")) + + # 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is + # harmless). For a partially-filled entry this cancels the remainder. + intent = (self._working_intents or {}).get(wo.trade_id) + try: + cancel_intent = KernelIntent( + timestamp=datetime.now(timezone.utc), + intent_id=f"{wo.trade_id}-ttlcxl", + trade_id=wo.trade_id, + slot_id=0, + asset=wo.asset, + side=(intent.side if intent is not None else DitaTradeSide.SHORT), + action=KernelCommandType.CANCEL, + reference_price=(intent.reference_price if intent is not None else 0.0), + target_size=(intent.target_size if intent is not None else 0.0), + leverage=(intent.leverage if intent is not None else 1.0), + reason="exec_router:ttl_expired", + ) + await self.kernel.process_intent_async(cancel_intent) + except Exception as exc: + self.logger.warning("EXEC_ROUTER ttl-cancel %s failed: %s", wo.trade_id, exc) + await self.pump_venue_events() + + # 3. Re-classify after the cancel round-trip (fill may have raced it). + if wo.action == "ENTER" and _entry_filled(): + router.note_fill(wo.trade_id) + (self._working_intents or {}).pop(wo.trade_id, None) + self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id) + return + if wo.action == "EXIT" and _exit_done(): + router.note_fill(wo.trade_id) + (self._working_intents or {}).pop(wo.trade_id, None) + self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id) + return + + router.note_cancel(wo.trade_id) + base_intent = (self._working_intents or {}).pop(wo.trade_id, None) + + # 4. EXIT: never strand a position — escalate to MARKET, same trade_id. + if wo.action == "EXIT": + _tid, plan = router.market_fallback_plan(wo) + if base_intent is not None and not _exit_done(): + market_exit = replace( + base_intent, + intent_id=f"{wo.trade_id}-mkt", + order_type="MARKET", limit_price=0.0, + timestamp=datetime.now(timezone.utc), + metadata={**(base_intent.metadata or {}), + "_time_in_force": "GTC", + "_exec_reason": plan.reason}, + ) + market_exit = self._exit_intent_from_slot(market_exit) + self.logger.warning("EXEC_ROUTER exit TTL → MARKET fallback %s", wo.trade_id) + self._emit("exec_router", event="exit_market_fallback", trade_id=wo.trade_id) + await self.kernel.process_intent_async(market_exit) + await self.pump_venue_events() + return + + # 5. ENTER miss policy: skip | retry (bounded) | market. + action = router.entry_miss_action(wo) + self._emit("exec_router", event="entry_miss", trade_id=wo.trade_id, + action=action, retry_n=wo.retry_n) + if action == MissAction.SKIP or base_intent is None: + self.logger.info("EXEC_ROUTER entry miss %s → skip", wo.trade_id) + return + slot_tid, stage, size = self._exec_slot_view() + if size > 0.0 or stage in self._SLOT_OPENISH: + # Slot occupied (raced fill of remainder / another trade) — never + # double-enter. + self.logger.warning("EXEC_ROUTER entry miss %s: slot busy (%s) — skip", + wo.trade_id, stage) + return + # Venue-truth gate (live double-entry fix, 2026-06-10): the kernel slot + # can show flat while the venue holds a position the REST reconcile has + # not surfaced yet. Re-quote ONLY when the venue is provably flat AND + # no own fill landed in the hot window. Ambiguity → skip; a skipped + # entry is always safe, a doubled position is not. + if not self._exec_safe_to_requote(wo): + self.logger.warning( + "EXEC_ROUTER entry miss %s: venue not provably flat " + "(recent own fill or live position) — skip requote", wo.trade_id) + self._emit("exec_router", event="requote_blocked", trade_id=wo.trade_id) + return + ref = 0.0 + try: + # Multi-asset: prefer the working order's OWN asset history; the + # alias deque may track a different effective symbol. + hist = None + if self._price_histories is not None: + hist = self._price_histories.get(str(wo.asset or "").upper()) + if not hist: + hist = self._price_history + if hist: + ref = float(hist[-1]) + except Exception: + ref = 0.0 + if ref <= 0.0: + ref = float(wo.plan.limit_price or 0.0) + if action == MissAction.RETRY: + new_tid, plan = router.retry_plan(wo, reference_price=ref) + else: # MARKET + new_tid, plan = router.market_fallback_plan(wo) + new_intent = replace( + base_intent, + trade_id=new_tid, intent_id=new_tid, + timestamp=datetime.now(timezone.utc), + reference_price=(ref if ref > 0.0 else base_intent.reference_price), + order_type=plan.order_type, + limit_price=float(plan.limit_price or 0.0), + metadata={**(base_intent.metadata or {}), + "_time_in_force": ("PostOnly" if plan.post_only else "GTC"), + "_exec_reason": plan.reason}, + ) + self.logger.info("EXEC_ROUTER entry %s → %s as %s", wo.trade_id, + action, new_tid) + outcome = await self.kernel.process_intent_async(new_intent) + if plan.is_maker: + self._exec_after_submit(plan, new_intent, outcome) + await self.pump_venue_events() + async def pump_venue_events( self, snapshot: Any | None = None, *, market_state: Any = None ) -> int: @@ -823,6 +1259,30 @@ class PinkDirectRuntime: ) != KernelDiagnosticCode.DUPLICATE_EVENT: applied.append(event) + # Resolve working maker quotes against applied venue truth so the + # TTL loop never cancels an order the venue already terminalised. + if self.exec_router is not None: + for event in applied: + tid = str(getattr(event, "trade_id", "") or "") + if not tid or self.exec_router.working(tid) is None: + continue + kind = getattr(getattr(event, "kind", None), "value", "") or str( + getattr(event, "kind", "") or "") + status = getattr(getattr(event, "status", None), "value", "") or str( + getattr(event, "status", "") or "") + if "FULL_FILL" in kind or status == "FILLED": + self.exec_router.note_fill(tid) + (self._working_intents or {}).pop(tid, None) + elif "CANCEL_ACK" in kind or status in ("CANCELED", "CANCELLED"): + # Venue-side cancel (incl. post-only reject surfacing via + # reconcile): do NOT drop the working order here — pull its + # deadline to now so the TTL loop resolves it through the + # one shared miss/escalation path (retry/market/skip per + # config; exits always escalate to MARKET). + wo = self.exec_router.working(tid) + if wo is not None: + wo.deadline = self.exec_router.clock() + if applied and self.persistence is not None: slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} persist_snapshot = snapshot @@ -846,6 +1306,26 @@ class PinkDirectRuntime: # Exits are never frozen — only new ENTERs are blocked on reconcile ERROR. if getattr(self, "_enter_frozen", False): return "account reconcile ERROR — new ENTERs frozen until K≈E restored" + # SINGLE-SLOT INVARIANT (2026-06-10, after two live double-entries): + # never ENTER while the exchange shows ANY open position. A filled + # maker entry vanishes from openOrders; the symbol-less reconcile + # fetches no fills, reads the disappearance as a cancel, frees the + # slot, and the next scan re-enters → 2x position. The exchange + # position list is the truth that survives that misclassification. + if time.monotonic() - (getattr(self, "_last_own_fill_mono", 0.0) or 0.0) < 5.0: + return "own fill within hot window — entry deferred until state settles" + try: + venue = self.kernel.venue + rows = venue.open_positions() if hasattr(venue, "open_positions") else [] + for row in rows or []: + qty = abs(float(row.get("positionAmt") or row.get("positionQty") + or row.get("qty") or 0.0)) + if qty > 1e-9: + return (f"exchange holds open position " + f"({row.get('symbol')} qty={qty}) — single-slot ENTER blocked") + except Exception as exc: + self.logger.warning("entry venue-position probe failed (%s) — blocking ENTER", exc) + return "venue position probe failed — fail safe, no ENTER without proof" """Return why an ENTER's sizing inputs are unsafe, or None if sound. notional = capital × fraction × leverage is self-limiting; the only way @@ -891,6 +1371,110 @@ class PinkDirectRuntime: exit_size = policy_size if policy_ok else 0.0 return replace(kernel_intent, target_size=exit_size) + def _effective_snapshot(self, snapshot: MarketSnapshot) -> tuple[MarketSnapshot, str]: + """Retarget the regime snapshot onto the TRADE asset (BLUE parity). + + The data feed delivers the BTC-anchored eigen scan (regime signal: + vel_div, irp). The TRADE asset is a separate concern — BLUE selects + it per signal from the scan's ~50-asset universe via IRP (SYSTEM + BIBLE §5). Rules, in priority order: + + 1. Slot occupied → the slot's asset, priced from the universe + payload. Exits MUST evaluate the position's own price, never + the regime anchor's. + 2. Flat + picker warm → IRP top candidate for the SHORT regime. + 3. Flat + picker cold / no candidate → entries suppressed + (returned block reason; BLUE has no BTC fallback when IRP is + enabled). + + Returns (snapshot, enter_block_reason). Empty reason = entries OK. + Also feeds the picker/sizer one observation per NEW scan (deduped on + scan_number inside the components). + """ + import dataclasses as _dc + + payload = snapshot.scan_payload if isinstance(snapshot.scan_payload, dict) else {} + scan_number = int(payload.get("scan_number") or snapshot.scan_number or 0) + vel_div = payload.get("vel_div") + if vel_div is None: + vel_div = snapshot.velocity_divergence + if self.alpha_sizer is not None and vel_div is not None: + try: + self.alpha_sizer.observe(vel_div, scan_number) + except Exception: + pass + if self.asset_picker is None: + return snapshot, "" + try: + self.asset_picker.observe(payload, scan_number) + except Exception as exc: + self.logger.warning("asset picker observe failed: %s", exc) + + # 1. Open/working slot → follow its asset. + slot_asset = "" + try: + if self.kernel.max_slots > 0: + slot = self.kernel.slot(0) + if not slot.is_free(): + slot_asset = str(getattr(slot, "asset", "") or "").upper() + except Exception: + slot_asset = "" + if slot_asset: + if slot_asset == str(snapshot.symbol).upper(): + return snapshot, "" + px = self.asset_picker.price_of(slot_asset) + if px is None or px <= 0: + # No universe price for the slot asset (e.g. adopted stray). + # Exits must not be evaluated at the anchor's price — a BTC + # price against a stray's entry price would fire a bogus SL. + self.logger.error( + "no universe price for open slot asset %s — policy step " + "degraded to HOLD (exit eval needs the asset's own price)", + slot_asset) + return snapshot, f"all:no price for open slot asset {slot_asset}" + return _dc.replace(snapshot, symbol=slot_asset, price=float(px)), "" + + # 2./3. Flat → IRP pick. + if not self.asset_picker.warm: + return snapshot, "IRP picker warming up — universe history incomplete" + choice = None + try: + choice = self.asset_picker.pick(direction=-1) + except Exception as exc: + self.logger.warning("asset picker rank failed: %s", exc) + if choice is None: + return snapshot, "no IRP candidate passed gates (BLUE: no fallback asset)" + asset, px, ars = choice + asset = str(asset).upper() + if asset == str(snapshot.symbol).upper(): + return snapshot, "" + return _dc.replace(snapshot, symbol=asset, price=float(px)), "" + + def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None: + """Close-out detection → feed realized PnL into the alpha layers. + + Capital-delta PnL (net of fees) — the kernel's capital is the + authoritative ledger, and bucket/streak multipliers only need the + sign and rough magnitude. + """ + if self.alpha_sizer is None or not self._sizer_open_tid: + return + open_tid = str(slot_dict.get("trade_id") or "") if slot_dict else "" + still_open = ( + open_tid == self._sizer_open_tid + and float(slot_dict.get("size") or 0.0) > 0 + and not slot_dict.get("closed", False) + ) + if still_open: + return + pnl = float(acc.get("capital") or 0.0) - self._sizer_entry_capital + self._sizer_open_tid = "" + try: + self.alpha_sizer.record_close(pnl) + self.logger.info("alpha sizer feedback: trade closed pnl=%.4f", pnl) + except Exception: + pass + async def step(self, snapshot: MarketSnapshot) -> Decision: """Single policy + execution cycle. @@ -903,6 +1487,11 @@ class PinkDirectRuntime: 6. Persist """ market_state = self._update_market_state_runtime(snapshot) + # BLUE-parity retarget (2026-06-10): regime signal stays BTC-anchored, + # the TRADE asset comes from the slot (exits) or the IRP picker + # (entries). Must run BEFORE the fill pump / policy so every consumer + # below sees the trade asset's own price. + snapshot, _enter_block = self._effective_snapshot(snapshot) # Drain any late fills BEFORE the policy reads slot/account state, so a # resting LIMIT that filled since the last cycle is reflected. await self.pump_venue_events(snapshot, market_state=market_state) @@ -939,7 +1528,16 @@ class PinkDirectRuntime: closed=False, ) - # Price history for DC gate — update before decide() so current tick is included + # Sizer feedback: detect a trade that closed since the last cycle. + self._sizer_trade_feedback(acc, slot_dict) + + # Price history for DC gate — per effective asset (multi-asset since + # 2026-06-10). _price_history aliases the current asset's deque so the + # DC gate below reads a single-asset series. + if self._price_histories is not None: + from collections import deque as _deque + _sym = str(snapshot.symbol or "").upper() + self._price_history = self._price_histories.setdefault(_sym, _deque(maxlen=10)) if self._price_history is not None and snapshot.price and snapshot.price > 0: self._price_history.append(float(snapshot.price)) @@ -955,6 +1553,15 @@ class PinkDirectRuntime: # dc_skip_contradicts = True → rising price during short window = HOLD. dc_blocked = self._dc_contradicts() decision = self.decision_engine.decide(snapshot, context, legacy_position) + if _enter_block: + _block_all = _enter_block.startswith("all:") + if decision.action == DecisionAction.ENTER or ( + _block_all and decision.action == DecisionAction.EXIT + ): + import dataclasses + decision = dataclasses.replace(decision, action=DecisionAction.HOLD, reason="ASSET_PICKER_BLOCK") + self.logger.info("action blocked by asset picker: %s (vel_div=%.4f scan=%d)", + _enter_block, self._last_vel_div, self._last_scan_number) if dc_blocked and decision.action == DecisionAction.ENTER: import dataclasses decision = dataclasses.replace(decision, action=DecisionAction.HOLD_DC_CONTRADICTED, reason="DC_CONTRADICT") @@ -971,10 +1578,15 @@ class PinkDirectRuntime: intent = plan.intent # ACB boost (SYSTEM BIBLE §10): multiply intent leverage by the current boost - # factor from acb_processor_service. Capped at exchange_leverage_cap (3x). + # factor from acb_processor_service. Capped at the STRATEGY max leverage + # (decision config) — intent.leverage is fractional conviction; the + # integer exchange cap is applied separately at the venue boundary + # (map_internal_conviction_to_exchange_leverage). The old hardcoded + # min(3.0, …) silently clamped BLUE-parity conviction. if self._last_acb_boost != 1.0 and intent is not None: import dataclasses as _dc - boosted_lev = min(3.0, max(1.0, float(intent.leverage or 1.0) * self._last_acb_boost)) + _lev_cap = float(getattr(self.decision_engine.config, "max_leverage", 3.0) or 3.0) + boosted_lev = min(_lev_cap, max(0.5, float(intent.leverage or 1.0) * self._last_acb_boost)) intent = _dc.replace(intent, leverage=boosted_lev) if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}: @@ -1014,8 +1626,63 @@ class PinkDirectRuntime: # overshoot an open position. kernel_intent = self._exit_intent_from_slot(kernel_intent) + # ── Execution router: decide HOW this intent reaches the venue ── + # (taker MARKET vs post-only maker LIMIT). Policy only — sizing, + # signal and TP logic above are untouched. plan=None or + # style=taker leaves kernel_intent exactly as built (legacy path). + exec_plan = self._exec_plan_for(decision, kernel_intent, snapshot) + if exec_plan is not None and exec_plan.suppress: + # Duplicate guard: a working maker quote already represents + # this action. Do not double-submit. + self.logger.info("EXEC_ROUTER suppress %s %s: %s", + decision.action.value, kernel_intent.trade_id, + exec_plan.reason) + self._emit("exec_router", event="suppress", + action=decision.action.value, + trade_id=kernel_intent.trade_id, reason=exec_plan.reason) + return decision + if exec_plan is not None and exec_plan.metadata.get("preempt_working"): + # Urgent exit preempting a resting maker exit: cancel the + # quote first so the MARKET close cannot double-fill. + await self._exec_cancel_working(kernel_intent.trade_id, + reason="urgent_exit_preempt") + if exec_plan is not None and exec_plan.order_type == "LIMIT": + kernel_intent = replace( + kernel_intent, + order_type="LIMIT", + limit_price=float(exec_plan.limit_price), + metadata={ + **(kernel_intent.metadata or {}), + "_time_in_force": "PostOnly" if exec_plan.post_only else "GTC", + "_exec_reason": exec_plan.reason, + }, + ) + + # Register the symbol BEFORE the order can fill so the account + # stream attributes the resulting WS fill to PINK. + try: + if intent.asset and self._own_fill_symbols is not None: + self._own_fill_symbols.add(str(intent.asset).upper()) + except Exception: + pass + + # Alpha-layer feedback: remember which bucket sized this ENTER and + # the capital baseline, so the close can be credited back. + if decision.action == DecisionAction.ENTER and self.alpha_sizer is not None: + try: + self.alpha_sizer.note_entry() + self._sizer_open_tid = str(kernel_intent.trade_id or "") + self._sizer_entry_capital = float(context.capital) + except Exception: + pass + outcome = await self.kernel.process_intent_async(kernel_intent) + # Maker quotes that did not terminally fill register as working + # so the TTL loop can cancel/retry/fall back. + if exec_plan is not None and exec_plan.is_maker: + self._exec_after_submit(exec_plan, kernel_intent, outcome) + # Locate the source of any non-finite intent the kernel rejected: # log the full upstream provenance (snapshot price, account capital, # leverage, sizing) so a numerical error can be traced to its origin diff --git a/prod/launch_dolphin_pink.py b/prod/launch_dolphin_pink.py index 9cbc92a..315404f 100644 --- a/prod/launch_dolphin_pink.py +++ b/prod/launch_dolphin_pink.py @@ -288,19 +288,51 @@ def _build_runtime(*, phase: PinkPhase) -> PinkDirectRuntime: # DOLPHIN_PINK_VEL_DIV_THRESHOLD: relax for on-exchange debugging (e.g. -0.005). # Default -0.02 matches BLUE production. BLUE is unaffected. _vel_div_threshold = float(os.environ.get("DOLPHIN_PINK_VEL_DIV_THRESHOLD", "-0.02")) + # BLUE-parity sizing (SYSTEM BIBLE §6, restored 2026-06-10): cubic-convex + # dynamic leverage over [min, max] strategy conviction. BLUE production + # runs 0.5–8.0 convexity 3. The integer at-exchange leverage is derived + # separately at the venue boundary (prod/bingx/leverage.py conviction map, + # security-capped). Previously PINK pinned every entry at 3.0x flat. + _min_leverage = float(os.environ.get("DOLPHIN_PINK_MIN_LEVERAGE", "0.5")) + _max_leverage = float(os.environ.get("DOLPHIN_PINK_MAX_LEVERAGE", "8.0")) + _convexity = float(os.environ.get("DOLPHIN_PINK_LEVERAGE_CONVEXITY", "3.0")) + _vel_div_extreme = min(_vel_div_threshold * 2.5, -0.001) cfg = DecisionConfig( vel_div_threshold=_vel_div_threshold, - vel_div_extreme=min(_vel_div_threshold * 2.5, -0.001), + vel_div_extreme=_vel_div_extreme, fixed_tp_pct=float(os.environ.get("DOLPHIN_FIXED_TP_PCT", "0.0020")), max_hold_bars=int(os.environ.get("DOLPHIN_MAX_HOLD_BARS", "250")), capital_fraction=0.20, - max_leverage=3.0, + max_leverage=_max_leverage, allow_short=True, allow_long=False, policy_version="pink_ditav2_v1", exit_leg_ratios=_resolve_pink_exit_leg_ratios(phase), ) - decision = DecisionEngine(cfg) + # BLUE-parity alpha components. Kill switches: DOLPHIN_PINK_ALPHA_SIZER=0 + # restores the legacy flat-leverage formula; DOLPHIN_PINK_ASSET_SELECTION=0 + # restores single-symbol (snapshot anchor) trading. + alpha_sizer = None + asset_picker = None + if _env_bool("DOLPHIN_PINK_ALPHA_SIZER", True): + from prod.clean_arch.dita_v2.blue_parity import PinkAlphaSizer + alpha_sizer = PinkAlphaSizer( + base_fraction=0.20, + min_leverage=_min_leverage, + max_leverage=_max_leverage, + leverage_convexity=_convexity, + vel_div_threshold=_vel_div_threshold, + vel_div_extreme=_vel_div_extreme, + use_dynamic_leverage=_env_bool("DOLPHIN_PINK_DYNAMIC_LEVERAGE", True), + use_alpha_layers=_env_bool("DOLPHIN_PINK_ALPHA_LAYERS", True), + ) + if _env_bool("DOLPHIN_PINK_ASSET_SELECTION", True): + from prod.clean_arch.dita_v2.blue_parity import PinkAssetPicker + asset_picker = PinkAssetPicker( + lookback=int(os.environ.get("DOLPHIN_PINK_IRP_LOOKBACK", "0") or 0), + min_irp_alignment=float(os.environ.get("DOLPHIN_PINK_MIN_IRP_ALIGNMENT", "0.0")), + ) + decision = DecisionEngine(cfg, sizer=alpha_sizer) intent = IntentEngine(cfg) # DITAv2 execution bundle: kernel + venue + control + Zinc + projection. @@ -337,6 +369,8 @@ def _build_runtime(*, phase: PinkPhase) -> PinkDirectRuntime: persistence=persistence, market_state_runtime=market_state_runtime, hz_state_writer=hz_state_writer, + asset_picker=asset_picker, + alpha_sizer=alpha_sizer, ) diff --git a/prod/supervisor/dolphin-supervisord.conf b/prod/supervisor/dolphin-supervisord.conf index f3432eb..5bac60f 100644 --- a/prod/supervisor/dolphin-supervisord.conf +++ b/prod/supervisor/dolphin-supervisord.conf @@ -51,10 +51,14 @@ environment=PYTHONPATH="/mnt/dolphinng5_predict/prod:/mnt/dolphinng5_predict/pro [program:nautilus_trader] ; BLUE live-mainnet Nautilus trader. Canonical path is /mnt/dolphinng5_predict/prod/. ; DO NOT point this at /tmp/blue_runtime_mirror/ — /tmp is volatile and wiped on reboot. +; autorestart=true (2026-06-10): pairs with the in-process scan-flow watchdog, +; which exits 86 when the scan path stalls (3 silent deaf-trader incidents on +; 2026-06-09). Operator stop via supervisorctl still sticks — autorestart only +; applies to unexpected process death, not STOPPED state. command=/home/dolphin/siloqy_env/bin/python3 /mnt/dolphinng5_predict/prod/nautilus_event_trader.py directory=/mnt/dolphinng5_predict/prod autostart=false -autorestart=false +autorestart=true startsecs=10 startretries=3 stopwaitsecs=30 @@ -66,7 +70,7 @@ stdout_logfile_backups=10 stderr_logfile=%(ENV_DOLPHIN_LOG_ROOT)s/supervisor/nautilus_trader-error.log stderr_logfile_maxbytes=50MB stderr_logfile_backups=10 -environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin:/mnt/dolphinng5_predict/prod",DOLPHIN_LOCAL_RUNTIME_ROOT="/mnt/dolphinng5_predict",PREFECT_API_URL="http://localhost:4200/api",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="LIVE",DOLPHIN_BINGX_ALLOW_MAINNET="1",DOLPHIN_TRADER_ID="DOLPHIN-BINGX-001",DOLPHIN_BINGX_DEFAULT_LEVERAGE="1",DOLPHIN_BINGX_PREFER_WEBSOCKET="1",DOLPHIN_VOL_P60_THRESHOLD="0.00026414",DOLPHIN_ENABLE_ADVANCED_SL_LIVE="1",DOLPHIN_CATASTROPHIC_FLOOR_PCT="0.0120",DOLPHIN_OVERLAY_CATASTROPHIC_FLOOR_PCT="0.0050",DOLPHIN_OVERLAY_CATASTROPHIC_MAX_LOSS_USD="500",DOLPHIN_OVERLAY_ADVSL_LIVE="1",DOLPHIN_OVERLAY_ADVSL_MIN_BARS="6",DOLPHIN_OVERLAY_ADVSL_MFE_MAX_PCT="0.0020",DOLPHIN_OVERLAY_ADVSL_PRESSURE_MIN="1.85",DOLPHIN_OVERLAY_ADVSL_MAE_RISK_MIN="0.50",DOLPHIN_TRADE_LOG_DIR="/tmp/dolphin_logs/trader",DOLPHIN_MC_MODELS_DIR="" +environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin:/mnt/dolphinng5_predict/prod",DOLPHIN_LOCAL_RUNTIME_ROOT="/mnt/dolphinng5_predict",PREFECT_API_URL="http://localhost:4200/api",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="LIVE",DOLPHIN_BINGX_ALLOW_MAINNET="1",DOLPHIN_TRADER_ID="DOLPHIN-BINGX-001",DOLPHIN_BINGX_DEFAULT_LEVERAGE="1",DOLPHIN_BINGX_PREFER_WEBSOCKET="1",DOLPHIN_VOL_P60_THRESHOLD="0.00026414",DOLPHIN_ENABLE_ADVANCED_SL_LIVE="1",DOLPHIN_CATASTROPHIC_FLOOR_PCT="0.0120",DOLPHIN_OVERLAY_CATASTROPHIC_FLOOR_PCT="0.0050",DOLPHIN_OVERLAY_CATASTROPHIC_MAX_LOSS_USD="500",DOLPHIN_OVERLAY_ADVSL_LIVE="1",DOLPHIN_OVERLAY_ADVSL_MIN_BARS="6",DOLPHIN_OVERLAY_ADVSL_MFE_MAX_PCT="0.0020",DOLPHIN_OVERLAY_ADVSL_PRESSURE_MIN="1.85",DOLPHIN_OVERLAY_ADVSL_MAE_RISK_MIN="0.50",DOLPHIN_TRADE_LOG_DIR="/tmp/dolphin_logs/trader",DOLPHIN_MC_MODELS_DIR="",DOLPHIN_BINGX_BASE_URL_BACKUP="https://open-api.bingx.com" rlimit_as=2GB rlimit_nofile=1024 @@ -100,7 +104,7 @@ stdout_logfile=%(ENV_DOLPHIN_LOG_ROOT)s/supervisor/dolphin_live_pink.log stdout_logfile_maxbytes=50MB stdout_logfile_backups=10 redirect_stderr=true -environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin",PREFECT_API_URL="http://localhost:4200/api",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="VST",DOLPHIN_BINGX_ALLOW_MAINNET="0",DOLPHIN_TRADER_ID="DOLPHIN-PINK-001",DOLPHIN_BINGX_DEFAULT_LEVERAGE="1",DOLPHIN_BINGX_PREFER_WEBSOCKET="1",DOLPHIN_BINGX_RECV_WINDOW_MS="60000",DOLPHIN_PINK_PHASE="single_leg",DOLPHIN_PINK_VOL_P60_THRESHOLD="0.00008000" +environment=PYTHONPATH="/mnt/dolphinng5_predict:/mnt/dolphinng5_predict/nautilus_dolphin",PREFECT_API_URL="http://localhost:4200/api",PYTHONUNBUFFERED="1",DOLPHIN_DATA_VENUE="BINANCE",DOLPHIN_EXEC_VENUE="BINGX",DOLPHIN_BINGX_ENV="VST",DOLPHIN_BINGX_ALLOW_MAINNET="0",DOLPHIN_TRADER_ID="DOLPHIN-PINK-001",DOLPHIN_BINGX_DEFAULT_LEVERAGE="1",DOLPHIN_BINGX_PREFER_WEBSOCKET="1",DOLPHIN_BINGX_RECV_WINDOW_MS="60000",DOLPHIN_PINK_PHASE="single_leg",DOLPHIN_PINK_VOL_P60_THRESHOLD="0.00008000",DOLPHIN_BINGX_BASE_URL_BACKUP="https://open-api-vst.bingx.com",DOLPHIN_PINK_EXEC_STYLE="maker_both",DOLPHIN_PINK_MAKER_ENTRY_TTL_S="8",DOLPHIN_PINK_MAKER_EXIT_TTL_S="5",DOLPHIN_PINK_MAKER_ENTRY_MISS="retry",DOLPHIN_PINK_MAKER_ENTRY_RETRIES="1",DOLPHIN_PINK_MAKER_RETRY_EXHAUST="skip",DOLPHIN_PINK_MAKER_OFFSET_TICKS="1",DOLPHIN_PINK_POST_ONLY="1" ; DITAv2 — supervised kernel, launched separately from the legacy PINK/BLUE stack. [program:dita_v2]