"""Node-free PINK runtime built on DITAv2 kernel + BingX venue adapter. The kernel owns the single-slot FSM, AccountProjection, and event normalization. This module translates policy-layer Decision/Intent into KernelIntent and reads final state from the kernel's slot + account snapshot. Capital is seeded from exchange balance at startup/recovery then maintained by kernel.account.settle() on close — no balance-poll overwrites during the hot loop. """ from __future__ import annotations import asyncio import inspect import json import logging import math import time from dataclasses import dataclass, field, replace from datetime import datetime, timezone from pathlib import Path from types import SimpleNamespace from typing import Any, Callable, Optional from prod.clean_arch.dita import ( Decision, DecisionAction, DecisionConfig, DecisionContext, DecisionEngine, Intent, IntentContext, IntentEngine, TradeSide as LegacyTradeSide, ) from prod.clean_arch.dita_v2.contracts import ( KernelCommandType, KernelDiagnosticCode, KernelIntent, TradeSide as DitaTradeSide, TradeStage, ) from prod.clean_arch.dita_v2.exec_router import ( ExecConfig, ExecutionRouter, MissAction, ) from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel from prod.clean_arch.persistence import PinkClickHousePersistence from prod.clean_arch.ports.data_feed import DataFeedPort, MarketSnapshot LOGGER = logging.getLogger(__name__) def _slot_to_position_dict(slot) -> dict[str, Any]: """Convert a DITAv2 TradeSlot into a simple position dict compatible with the persistence layer's expected shape.""" if slot is None: return {} return { "trade_id": slot.trade_id, "asset": slot.asset, "side": slot.side.value, "entry_price": float(slot.entry_price or 0.0), "entry_time": slot.entry_time.isoformat() if hasattr(slot.entry_time, "isoformat") else str(slot.entry_time), "size": float(slot.size or 0.0), "initial_size": float(slot.initial_size or 0.0), "leverage": float(slot.leverage or 0.0), "realized_pnl": float(slot.realized_pnl or 0.0), "unrealized_pnl": float(slot.unrealized_pnl or 0.0), "closed": bool(slot.closed), "close_reason": slot.close_reason or "", "fsm_state": slot.fsm_state.value, "exit_leg_ratios": list(slot.exit_leg_ratios), "active_leg_index": int(slot.active_leg_index or 0), "active_exit_order": dict(slot.active_exit_order.to_dict()) if slot.active_exit_order and hasattr(slot.active_exit_order, "to_dict") else ({"status": slot.active_exit_order.status.value, "venue_order_id": slot.active_exit_order.venue_order_id} if slot.active_exit_order else None), "active_entry_order": dict(slot.active_entry_order.to_dict()) if slot.active_entry_order and hasattr(slot.active_entry_order, "to_dict") else ({"status": slot.active_entry_order.status.value, "venue_order_id": slot.active_entry_order.venue_order_id} if slot.active_entry_order else None), } # Industry-smallest sane quote price. notional (capital × fraction × leverage) # is self-limiting; the only unbounded step is size = notional / price, which # overflows to inf as price -> 0. Any real perp quote is far above this floor, # so a price below it (or non-finite) signals corrupt market data, not a trade. _MIN_SANE_PRICE = 1e-8 # Path for kernel state persistence (crash recovery + session continuity). _KERNEL_STATE_PATH = Path("/tmp/.pink_kernel_state.json") def _decision_to_kernel_intent( decision: Decision, intent: Intent, slot_id: int = 0, ) -> KernelIntent: """Translate policy-layer Decision/Intent into a DITAv2 KernelIntent. The action map is: ENTER -> KernelCommandType.ENTER EXIT -> KernelCommandType.EXIT HOLD -> KernelCommandType.MARK_PRICE """ action_map = { DecisionAction.ENTER: KernelCommandType.ENTER, DecisionAction.EXIT: KernelCommandType.EXIT, DecisionAction.HOLD: KernelCommandType.MARK_PRICE, } side = ( DitaTradeSide.SHORT if intent.side == LegacyTradeSide.SHORT else DitaTradeSide.LONG ) return KernelIntent( timestamp=decision.timestamp, intent_id=decision.decision_id, trade_id=intent.trade_id, slot_id=slot_id, asset=intent.asset, side=side, action=action_map.get(decision.action, KernelCommandType.MARK_PRICE), reference_price=float(decision.reference_price or intent.reference_price or 0.0), target_size=float(intent.target_size or 0.0), leverage=float(intent.leverage or 1.0), exit_leg_ratios=tuple(intent.exit_leg_ratios), reason=intent.reason, metadata=dict(intent.metadata or {}), ) def _persist_kernel_snapshot(kernel, log: logging.Logger) -> None: """Write full kernel state to disk after each settled fill (G5 snapshot-on-fill).""" try: state_json = kernel.save_state() _KERNEL_STATE_PATH.write_text(state_json, encoding="utf-8") except Exception as exc: log.warning("kernel snapshot persist failed (non-fatal): %s", exc) def _restore_kernel_snapshot(kernel, log: logging.Logger) -> bool: """On startup, restore kernel state from disk if account is flat (no open positions). Returns True if a snapshot was found and successfully restored. """ if not _KERNEL_STATE_PATH.exists(): return False try: state_json = _KERNEL_STATE_PATH.read_text(encoding="utf-8") meta = json.loads(state_json) # Sanity check: only restore if the saved snapshot had no open trades. saved_slots = meta.get("slots", []) open_at_save = [s for s in saved_slots if s.get("fsm_state") not in (None, "", "IDLE", "CLOSED")] if open_at_save: log.warning( "kernel snapshot has %d open slot(s) at save time — " "skipping restore (must be flat for safe handoff)", len(open_at_save), ) return False ok = kernel.restore_state(state_json) if ok: log.info("kernel state restored from %s (fee_calibration + account preserved)", _KERNEL_STATE_PATH) else: log.warning("kernel restore_state rejected snapshot (version or slot mismatch)") return ok except Exception as exc: log.warning("kernel snapshot restore failed (non-fatal): %s", exc) return False def _reconcile_position_slot( kernel: ExecutionKernel, exchange_balance_capital: float, slot_id: int = 0, ) -> None: """Synchronise a single kernel slot from the venue's open positions. This is called at startup/recovery to make the kernel state match the exchange. It also seeds the kernel's AccountProjection.capital from the exchange balance — the single place where an external balance snapshot writes capital. """ venue = kernel.venue try: positions = venue.open_positions() if hasattr(venue, "open_positions") else [] except Exception: positions = [] # Build TradeSlot[] from exchange positions from prod.clean_arch.dita_v2.contracts import TradeSlot, TradeSide _log = logging.getLogger(__name__) reconciled = [] if positions: for row in positions if isinstance(positions, list) else ( list(positions.values()) if isinstance(positions, dict) else []): raw_side = str(row.get("positionSide") or row.get("side") or "").upper() raw_qty = 0.0 for key in ("positionAmt", "positionQty", "positionSize", "quantity", "pa", "qty"): try: raw_qty = float(row.get(key) or 0.0) except Exception: continue if raw_qty != 0.0: break if abs(raw_qty) <= 1e-12: continue qty = abs(raw_qty) entry = 0.0 for key in ("entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price"): try: entry = float(row.get(key) or 0.0) except Exception: continue if entry > 0: break mark = 0.0 for key in ("markPrice", "mark", "price"): try: mark = float(row.get(key) or 0.0) except Exception: continue if mark > 0: break if mark <= 0: mark = entry lev = float(row.get("leverage") or row.get("lev") or 1.0) side = TradeSide.SHORT if raw_side in {"SHORT", "SELL"} or raw_qty < 0 else TradeSide.LONG asset = str(row.get("symbol") or row.get("symbolName") or "") trade_id = asset # use asset as trade ID for exchange-led recovery slot = TradeSlot( slot_id=slot_id, trade_id=trade_id, asset=asset, side=side, entry_price=entry if entry > 0 else mark, size=qty, initial_size=qty, leverage=lev if lev > 0 else 1.0, entry_time=datetime.now(timezone.utc), fsm_state=TradeStage.POSITION_OPEN, metadata={"reconciled_from_exchange": True}, ) reconciled.append(slot) if reconciled: if len(reconciled) > 1: # Single-slot kernel: multiple open positions = orphan contamination from # prior retry-duplicate bug. Take the largest by size so the kernel can # exit it; the rest must be flattened manually before restart. reconciled.sort(key=lambda s: float(s.size or 0), reverse=True) orphan_syms = [s.asset for s in reconciled[1:]] _log.error( "RECONCILE WARNING: %d BingX positions found for single slot_id=%d. " "Taking largest (%s size=%.4f). ORPHANS IGNORED (must flatten manually): %s", len(reconciled), slot_id, reconciled[0].asset, float(reconciled[0].size or 0), orphan_syms, ) reconciled = reconciled[:1] kernel.reconcile_from_slots(reconciled) else: # No open positions — ensure slot is idle kernel.reconcile_from_slots([]) # Seed capital once from exchange balance — E-anchored. if exchange_balance_capital > 0: kernel.account.anchor_to_exchange( wallet_balance=exchange_balance_capital, available_margin=exchange_balance_capital, event_seq=0, ) @dataclass class PinkDirectRuntime: """Drive DITAv2 kernel against BingX exchange and a market data feed. The kernel owns the FSM and account projection. This runtime provides the policy loop: data feed -> decision engine -> intent engine -> kernel intent -> outcome -> persistence. """ data_feed: DataFeedPort kernel: ExecutionKernel decision_engine: DecisionEngine intent_engine: IntentEngine persistence: Optional[PinkClickHousePersistence] = None market_state_runtime: Any = None event_sink: Optional[Callable[[dict[str, Any]], None]] = None logger: Any = LOGGER # Non-blocking Hz state writer (None = Hz unavailable; PINK trades regardless) hz_state_writer: Any = field(default=None, repr=False, compare=False) # Account stream state — managed by connect/disconnect, not init args _account_stream_task: Optional[asyncio.Task] = field( default=None, init=False, repr=False, compare=False ) _enter_frozen: bool = field(default=False, init=False, repr=False, compare=False) # Last known posture — carried into Hz writes for TUI/algo monitoring _last_posture: str = field(default="APEX", init=False, repr=False, compare=False) # Scan-derived fields for Hz writes and DC gate _last_scan_number: int = field(default=0, init=False, repr=False, compare=False) _last_vel_div: float = field(default=0.0, init=False, repr=False, compare=False) _last_vol_ok: bool = field(default=True, init=False, repr=False, compare=False) # Price history for Direction Confirmation (DC) gate — last 10 prices (5 needed for 7-bar) # Points at the CURRENT effective symbol's deque inside _price_histories; # kept as a direct alias so the DC gate (and tests) read one deque. _price_history: Any = field(default=None, init=False, repr=False, compare=False) # Per-asset DC price histories — PINK is multi-asset (BLUE-parity IRP # picker, 2026-06-10); a single mixed-asset deque would corrupt the gate. _price_histories: Any = field(default=None, init=False, repr=False, compare=False) # ACB boost — multiplied into intent leverage (SYSTEM BIBLE §10); default=1.0 (no-op) _last_acb_boost: float = field(default=1.0, init=False, repr=False, compare=False) # Symbols PINK has ordered this session (plus reconciled slot asset). # Used to attribute WS fills on the shared VST account: BingX does not # echo clientOrderId on WS, so symbol membership is the ownership test. _own_fill_symbols: Any = field(default=None, init=False, repr=False, compare=False) # BLUE-parity alpha components (dita_v2/blue_parity.py, 2026-06-10). # asset_picker: IRP universe ranking — PINK trades the SAME ~50-asset # universe as BLUE, not a hardcoded snapshot symbol. None = legacy # single-symbol behavior. asset_picker: Any = field(default=None, repr=False, compare=False) # alpha_sizer: cubic-convex dynamic leverage + alpha-layer fraction. # Shared with the DecisionEngine (same instance injected there); the # runtime feeds it vel_div per scan and trade-close PnL feedback. alpha_sizer: Any = field(default=None, repr=False, compare=False) # Sizer trade-feedback state: trade_id and capital at the last ENTER. _sizer_open_tid: str = field(default="", init=False, repr=False, compare=False) _sizer_entry_capital: float = field(default=0.0, init=False, repr=False, compare=False) # Execution router (maker/taker policy). Injectable for tests; built from # env at connect() when None. None/style=taker == legacy MARKET behavior. exec_router: Any = field(default=None, repr=False, compare=False) _exec_ttl_task: Optional[asyncio.Task] = field(default=None, init=False, repr=False, compare=False) # trade_id → submitted KernelIntent for working maker quotes (retry/fallback rebuild) _working_intents: Any = field(default=None, init=False, repr=False, compare=False) # Monotonic ts of the last OWN fill seen on the WS account stream. The TTL # handler refuses to re-quote within the hot window after any own fill — # REST venue reconcile lags WS fills by seconds (live double-entry 2026-06-10). _last_own_fill_mono: float = field(default=0.0, init=False, repr=False, compare=False) async def connect(self, initial_capital: float = 25000.0) -> None: """Connect data feed, venue, seed capital from exchange, start WS stream.""" from collections import deque self._price_history = deque(maxlen=10) self._price_histories = {} self._own_fill_symbols = set() await self.data_feed.connect() venue = self.kernel.venue # Back-reference for the venue's reconcile/cancel paths (2026-06-10): # lets reconcile() fetch symbol-scoped fills for the slot's asset so # maker fills reach the FSM. Was referenced in bingx_venue but never # wired anywhere — silently dead until today. try: venue._kernel_ref = self.kernel except Exception: pass if hasattr(venue, "connect"): try: result = venue.connect() if inspect.isawaitable(result): await result except Exception as exc: self.logger.warning("Venue connect failed: %s", exc) # BingX is the ledger of record. Fetch wallet balance BEFORE seeding # the kernel so set_seed_capital() and reconcile agree with the exchange. # Using a hardcoded fallback (e.g. 25000) while the VST account holds # 100K+ would cause a ~75K reconcile delta → capital_frozen=True. live_capital = await self._fetch_exchange_wallet_balance(initial_capital) _reconcile_position_slot(self.kernel, live_capital, slot_id=0) try: slot_asset = str(self.kernel.slot(0).asset or "") if self.kernel.max_slots > 0 else "" if slot_asset: self._own_fill_symbols.add(slot_asset.upper()) except Exception: pass # Seed the kernel's atomic K-account from exchange truth. self.kernel.set_seed_capital(live_capital) await self._seed_account_from_exchange() # Restore fee calibration from the previous session if the kernel was flat # at save time. Must be AFTER set_seed_capital so the snapshot can carry # forward fee model parameters. Re-apply live_capital immediately after to # ensure BingX is the ledger of record for capital — the snapshot's capital # is stale (it reflects the exchange balance at the PREVIOUS session's last # fill), whereas live_capital was just fetched from BingX right now. _restore_kernel_snapshot(self.kernel, self.logger) self.kernel.reset_and_seed(live_capital) # zeros stale accumulators; K=E=live_capital # Start WS account stream (primary); poll failover handled inside stream. self._account_stream_task = asyncio.create_task( self._run_account_stream(), name="pink_account_stream" ) # Execution router: maker/taker policy layer. Built from env unless a # test injected one. With style=taker (default) it is a pure pass- # through and the TTL loop has nothing to do — legacy behavior. if self.exec_router is None: self.exec_router = ExecutionRouter(ExecConfig.from_env(), logger=self.logger) self._working_intents = {} self.logger.info("EXEC_ROUTER: style=%s entry_ttl=%.1fs exit_ttl=%.1fs " "miss=%s retries=%d exhaust=%s post_only=%s", self.exec_router.config.style, self.exec_router.config.entry_ttl_s, self.exec_router.config.exit_ttl_s, self.exec_router.config.entry_miss, self.exec_router.config.entry_retries, self.exec_router.config.retry_exhaust, self.exec_router.config.post_only) self._exec_ttl_task = asyncio.create_task( self._exec_ttl_loop(), name="pink_exec_ttl" ) async def disconnect(self) -> None: if self._exec_ttl_task is not None: self._exec_ttl_task.cancel() try: await self._exec_ttl_task except asyncio.CancelledError: pass self._exec_ttl_task = None if self._account_stream_task is not None: self._account_stream_task.cancel() try: await self._account_stream_task except asyncio.CancelledError: pass self._account_stream_task = None await self.data_feed.disconnect() venue = self.kernel.venue if hasattr(venue, "disconnect"): try: await venue.disconnect() except Exception: pass # BingX VST/LIVE taker fee schedule. These are the current published rates. # Override via set_exchange_config() if the exchange adjusts them. _BINGX_FEE_CONFIG: dict = field(default_factory=lambda: { "taker_rate": 0.0005, # 0.05% market orders "maker_rate": 0.0002, # 0.02% limit resting "lot_step": 0.001, "tick_size": 0.0001, "funding_interval_secs": 28_800, # 8 h BingX perps }) async def _fetch_exchange_wallet_balance(self, fallback: float) -> float: """Query BingX for the current wallet balance to use as the capital seed. BingX VST (and live) is the ledger of record. Seeding the kernel from a hardcoded constant while the exchange holds a different balance causes a large reconcile delta → capital_frozen=True on every startup. Falls back to *fallback* if the HTTP client is unavailable or the request fails, logging a WARNING so operators know sizing is approximate. """ http_client = self._venue_http_client() if http_client is None: self.logger.warning( "No HTTP client — capital seeded from fallback=%.2f (BingX unreachable)", fallback, ) return fallback try: from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream stream = BingxUserStream(http_client=http_client, ws_base_url="") ev = await stream.account_snapshot() balance = float(ev.wallet_balance or 0.0) if balance <= 0: self.logger.warning( "BingX returned wallet_balance=%.2f ≤ 0 — using fallback=%.2f", balance, fallback, ) return fallback self.logger.info( "Capital seeded from BingX ledger: wallet=%.2f (fallback was %.2f)", balance, fallback, ) return balance except Exception as exc: self.logger.warning( "BingX wallet fetch failed (%s) — capital seeded from fallback=%.2f", exc, fallback, ) return fallback async def _seed_account_from_exchange(self) -> None: """ Startup/crash-recovery: 1. Load fee schedule into kernel (enables immediate fee prediction at fills). 2. Fetch recent fill history — run calibration loop to confirm K's fee maths matches exchange actuals before the first ENTER is permitted. 3. REST balance snapshot → E-facts → reconcile. """ http_client = self._venue_http_client() # Step 1: fee schedule — always load regardless of HTTP client self.kernel.set_exchange_config(self._BINGX_FEE_CONFIG) self.logger.info( "Fee model loaded: taker=%.4f%% maker=%.4f%%", self._BINGX_FEE_CONFIG["taker_rate"] * 100, self._BINGX_FEE_CONFIG["maker_rate"] * 100, ) if http_client is None: return try: from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream stream = BingxUserStream(http_client=http_client, ws_base_url="") # Step 2: calibration loop — fetch recent fills and validate fee model await self._calibrate_fee_model(http_client) # Step 3: balance/margin E-facts ev = await stream.account_snapshot() result = self.kernel.on_account_event({ "kind": "ACCOUNT_UPDATE", "wallet_balance": ev.wallet_balance, "available_margin": ev.available_margin, "used_margin": ev.used_margin, "maint_margin": ev.maint_margin, }) self.logger.info( "Startup account seeded: wallet=%.2f avail=%.2f " "reconcile=%s delta=%.4f", ev.wallet_balance, ev.available_margin, (result or {}).get("reconcile_status", "?"), (result or {}).get("reconcile_delta", 0.0), ) except Exception as exc: self.logger.warning("Startup exchange snapshot failed: %s", exc) async def _calibrate_fee_model(self, http_client: object) -> None: """ Fetch the most recent closed fill from the exchange and run one calibration pass to confirm K's fee maths vs exchange actuals. Logs the result; does NOT block startup on WARNING — only ERROR triggers a log at ERROR level so operators are alerted. """ try: fills = await http_client.signed_get( # type: ignore[attr-defined] "/openApi/swap/v2/trade/fillHistory", {"limit": 5, "pageIndex": 1}, ) items = fills if isinstance(fills, list) else (fills or {}).get("list") or [] if not items: self.logger.info("Fee calibration: no fill history — skipping") return row = items[0] if isinstance(items[0], dict) else {} fill_price = float(row.get("price") or row.get("tradePrice") or 0.0) fill_qty = float(row.get("qty") or row.get("executedQty") or row.get("volume") or 0.0) actual_fee = abs(float(row.get("commission") or row.get("fee") or 0.0)) if fill_price <= 0 or fill_qty <= 0 or actual_fee <= 0: self.logger.info("Fee calibration: fill row missing price/qty/fee — skipping") return order_type = str(row.get("orderType") or row.get("type") or "MARKET").upper() is_maker = order_type == "LIMIT" # Guard: check raw deviation (ignoring any stale calibration_ratio) before # mutating kernel state. A REST fill with >15% deviation from published rate # poisons calibration_ratio and causes ESTIMATED fees to drift from actuals. raw_rate = ( self._BINGX_FEE_CONFIG.get("maker_rate", 0.0002) if is_maker else self._BINGX_FEE_CONFIG.get("taker_rate", 0.0005) ) raw_expected = fill_price * fill_qty * raw_rate raw_deviation_pct = abs(actual_fee - raw_expected) / raw_expected * 100 if raw_expected > 0 else 0.0 if raw_deviation_pct > 15.0: self.logger.warning( "Fee calibration SKIPPED: REST fill shows %.2f%% deviation from " "published %.4f%% rate (expected=%.6f actual=%.6f). " "Holding calibration_ratio=1.0 to avoid poisoning kernel fee model.", raw_deviation_pct, raw_rate * 100, raw_expected, actual_fee, ) return report = self.kernel.calibrate_fee(fill_price, fill_qty, actual_fee, is_maker=is_maker) status = report.get("calibration_status", "?") log = self.logger.error if status == "ERROR" else self.logger.info log( "Fee calibration: price=%.4f qty=%.4f expected=%.6f actual=%.6f " "ratio=%.4f deviation=%.2f%% status=%s", fill_price, fill_qty, report.get("expected_fee", 0.0), actual_fee, report.get("ratio", 0.0), report.get("deviation_pct", 0.0), status, ) except Exception as exc: self.logger.warning("Fee calibration failed: %s", exc) def _fill_is_ours(self, event: Any) -> bool: """Attribute a WS fill on the shared VST account to PINK or a foreign system (PRODGREEN/BLUE/manual). Order of evidence: 1. clientOrderId prefix "p-" — definitive PINK signature (BingX WS currently does NOT echo it, but honour it if that changes). 2. A non-empty foreign clientOrderId — definitively not ours. 3. Symbol membership in the set of symbols PINK has ordered this session (incl. the reconciled slot asset). PINK trades multiple assets (BTC, TRX, ALGO, …) so this must NOT be a hardcoded symbol. 4. Unattributable (no cid, no symbol) — process it; the ACCOUNT_UPDATE reseed bounds any contamination. """ cid = str(getattr(event, "client_order_id", "") or "") if cid.startswith("p-"): return True if cid: return False sym = str(getattr(event, "symbol", "") or "").upper() if not sym: return True own = self._own_fill_symbols or set() return sym in own async def _run_account_stream(self) -> None: """ Background task: WS stream → kernel.on_account_event() → reconcile gate. Fills fold K-values (realized PnL + fee). ACCOUNT_UPDATE stores E-facts and triggers reconcile; if status==ERROR new ENTERs are frozen until K≈E is restored. Exits never frozen. Funding folds into K-funding_net. """ from prod.clean_arch.dita_v2.bingx_user_stream import BingxUserStream from prod.clean_arch.dita_v2.exchange_event import ExchangeEventKind http_client = self._venue_http_client() ws_url = self._venue_ws_url() if http_client is None: self.logger.warning( "pink_account_stream: no HTTP client on venue — stream disabled" ) return stream = BingxUserStream(http_client=http_client, ws_base_url=ws_url) try: async for event in stream.subscribe(): if event.kind in {ExchangeEventKind.FULL_FILL, ExchangeEventKind.PARTIAL_FILL}: # Skip fills PINK does not own (e.g. PRODGREEN trading on # the shared VST account). BingX WS delivers all fills on # the listen key regardless of origin. Processing a foreign # fill contaminates K without a matching PINK intent, # causing a persistent reconcile delta. Ownership is by # clientOrderId / session symbol set — PINK is multi-asset # (BTC, TRX, ALGO, …), never a hardcoded symbol. if self._fill_is_ours(event): # Latch for the exec-router TTL handler: an own fill # just happened; do NOT trust a stale-flat venue # snapshot to justify re-quoting (2026-06-10 live # double-entry: REST reconcile lagged the WS fill). self._last_own_fill_mono = time.monotonic() if not self._fill_is_ours(event): self.logger.info( "Foreign fill skipped: symbol=%s qty=%s cid=%r oid=%s", event.symbol, event.fill_qty, event.client_order_id, event.order_id, ) continue # Immediately predict+fold fee from model so K tracks E # without waiting for FILL_SETTLED. When FILL_SETTLED # arrives with the actual fee, it replaces the prediction # and recalibrates the fee model. self.kernel.on_account_event({ "kind": "PREDICTED_FILL", "fill_price": event.fill_price, "fill_qty": event.fill_qty, "realized_pnl": event.realized_pnl, "is_maker": event.is_maker, }) # Fold actual fee if WS delivered it (replaces prediction) if event.fee != 0: self.kernel.on_account_event({ "kind": "FILL_SETTLED", "event_id": event.event_id, "realized_pnl": 0.0, # already folded above "fee": event.fee, # negative = rebate "is_maker": event.is_maker, # Slot-level PnL repair plumbing (Phase 3.2): the # kernel repairs a price-less exit leg # (realized_skipped_no_price) from the exchange's # realized figure. Separate field so the K-fold is # never double-counted; slot 0 = single-slot kernel # and the fill ownership filter already passed. "slot_id": 0, "repair_realized_pnl": float(event.realized_pnl or 0.0), }) # Gap 2: log settled fee with WS_SETTLED provenance so # downstream can reconcile against the ESTIMATED_TAKER row. if self.persistence is not None: try: # BingX WS does not echo back our clientOrderId ("c" field # is empty). Read trade_id from the kernel slot instead — # the slot retains its trade_id until the next ENTER. # Store BingX's own orderId as venue_order_id for # bidirectional reconciliation. _venue_order_id = str(event.order_id or "") try: _slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} _our_trade_id = str(_slot_dict.get("trade_id") or "") except Exception: _our_trade_id = "" # Fall back: if clientOrderId was echoed (future BingX change) # parse our trade_id prefix from "BTCUSDT-T-N:intent_id" if not _our_trade_id: _c = str(event.client_order_id or "") _our_trade_id = _c.split(":")[0] if ":" in _c else (_c or _venue_order_id) self.persistence.persist_fee_settled( trade_id=_our_trade_id, venue_order_id=_venue_order_id, fee=event.fee, fee_asset=event.fee_asset or "USDT", is_maker=event.is_maker, exchange_ts=event.exchange_ts, ) except Exception as _fee_exc: self.logger.debug("persist_fee_settled failed: %s", _fee_exc) # Persist full kernel state after every settled fill for # crash recovery + session-to-session calibration continuity. _persist_kernel_snapshot(self.kernel, self.logger) elif event.kind == ExchangeEventKind.ACCOUNT_UPDATE: # BingX WS also sends position/margin-only ACCOUNT_UPDATE # frames with no USDT balance entry; the parser yields # wallet_balance=0 for those. They carry no E-facts — # folding them zeroes e_available_margin (available_capital # =0.0) against a stale e_wallet and re-runs reconcile on # stale data (the 2026-06-09 stuck-freeze). Drop them. if not (event.wallet_balance and event.wallet_balance > 0): continue # Re-seed K on every balance-bearing ACCOUNT_UPDATE (poll # gap-backfill AND live WS). The exchange is the ledger of # record; the update carries the post-trade wallet balance # (wb = cash), so seeding from it keeps K ≈ E even when a # shared-account system (e.g. PRODGREEN) trades and moves # wb without PINK making a fill. self.kernel.reset_and_seed(float(event.wallet_balance)) result = self.kernel.on_account_event({ "kind": "ACCOUNT_UPDATE", "wallet_balance": event.wallet_balance, "available_margin": event.available_margin, "used_margin": event.used_margin, "maint_margin": event.maint_margin, }) or {} status = result.get("reconcile_status", "OK") if status == "ERROR": if not self._enter_frozen: self.logger.error( "Account reconcile ERROR — freezing new ENTERs. " "delta=%.4f %s", result.get("reconcile_delta", 0.0), result.get("reconcile_explanation", ""), ) self._enter_frozen = True # Hz write: capital_frozen state changed _slot = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} _acc = self.kernel.snapshot().get("account") or {} self._hz_publish(_slot, _acc) else: if self._enter_frozen: self.logger.info( "Account reconcile %s — unfreezing ENTERs.", status ) self._enter_frozen = False # Hz write: unfreeze is also a state change _slot = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} _acc = self.kernel.snapshot().get("account") or {} self._hz_publish(_slot, _acc) elif event.kind == ExchangeEventKind.FUNDING_FEE: self.kernel.on_account_event({ "kind": "FUNDING_FEE", "funding_amount": event.funding_amount, }) except asyncio.CancelledError: pass except Exception as exc: self.logger.error("pink_account_stream crashed: %s", exc, exc_info=True) finally: await stream.close() def _venue_http_client(self) -> Optional[object]: """Extract the BingxHttpClient from the venue adapter, if available.""" venue = self.kernel.venue backend = getattr(venue, "backend", None) return getattr(backend, "_client", None) def _venue_ws_url(self) -> str: """Return the private WS URL for the configured environment.""" venue = self.kernel.venue backend = getattr(venue, "backend", None) config = getattr(backend, "_config", None) if config is None: return "wss://vst-open-api-ws.bingx.com/swap-market" explicit = getattr(config, "base_url_ws_private", None) if explicit: return str(explicit) try: from prod.bingx.urls import get_private_ws_url url = get_private_ws_url(config.environment) return str(url) if url else "wss://vst-open-api-ws.bingx.com/swap-market" except Exception: return "wss://vst-open-api-ws.bingx.com/swap-market" def _emit(self, phase: str, **fields: Any) -> None: if self.event_sink is not None: payload = {"phase": phase, **fields} self.event_sink(payload) @staticmethod def _scan_payload_prices( scan_payload: dict[str, Any] | None, fallback_symbol: str, fallback_price: float, ) -> dict[str, float]: payload = scan_payload or {} assets = payload.get("assets") or [] prices = payload.get("asset_prices") or [] out: dict[str, float] = {} if isinstance(assets, list) and isinstance(prices, list): for asset, price in zip(assets, prices): try: px = float(price) except Exception: continue if px > 0: out[str(asset).upper()] = px if not out and fallback_symbol and fallback_price > 0: out[str(fallback_symbol).upper()] = float(fallback_price) return out def _update_market_state_runtime( self, snapshot: MarketSnapshot ) -> dict[str, Any]: runtime = self.market_state_runtime scan_payload = ( snapshot.scan_payload if isinstance(snapshot.scan_payload, dict) else {} ) if runtime is None or not scan_payload: return {} try: prices_dict = self._scan_payload_prices( scan_payload, snapshot.symbol, snapshot.price ) bundle = runtime.update_scan_state( scan_payload=scan_payload, prices_dict=prices_dict, scan_number=int( scan_payload.get("scan_number") or snapshot.scan_number or 0 ), vel_div=float( scan_payload.get("vel_div") or snapshot.velocity_divergence or 0.0 ), v50_vel=float(scan_payload.get("w50_velocity") or 0.0), v750_vel=float(scan_payload.get("w750_velocity") or 0.0), vol_ok=bool(scan_payload.get("vol_ok", True)), posture=str(scan_payload.get("posture") or "APEX"), exf_snapshot=scan_payload.get("exf_snapshot") if isinstance(scan_payload.get("exf_snapshot"), dict) else None, esof_payload=scan_payload.get("esof_payload") if isinstance(scan_payload.get("esof_payload"), dict) else None, ) # Track scan-derived fields for Hz writes and DC gate self._last_posture = str(scan_payload.get("posture") or "APEX") self._last_vel_div = float(scan_payload.get("vel_div") or scan_payload.get("velocity_divergence") or 0.0) self._last_vol_ok = bool(scan_payload.get("vol_ok", True)) self._last_scan_number = int(scan_payload.get("scan_number") or snapshot.scan_number or 0) # ACB boost — read from scan_payload (scan bridge may embed it) or Hz direct acb_data = scan_payload.get("acb_boost") or {} if isinstance(acb_data, dict) and "boost" in acb_data: self._last_acb_boost = max(0.1, float(acb_data.get("boost", 1.0))) else: # Fall back to Hz direct read (non-blocking — features_map is blocking proxy) try: feed = getattr(self.data_feed, "features_map", None) if feed is not None: raw = feed.get("acb_boost") if raw: import json as _json d = _json.loads(raw) self._last_acb_boost = max(0.1, float(d.get("boost", 1.0))) except Exception: pass # Hz read failure must never affect trading return dict( getattr(runtime, "latest_bundle_dict", {}) or bundle.as_dict() ) except Exception: return {} def _dc_contradicts(self, lookback: int = 7, min_bps: float = 0.75) -> bool: """Direction Confirmation gate (SYSTEM BIBLE §4.2, champion config). Returns True if the price over the last `lookback` ticks ROSE by ≥ min_bps bps (0.75 bps). A rising price contradicts a SHORT signal → block ENTER. Champion: dc_skip_contradicts=True, dc_leverage_boost=1.0 (no boost). """ hist = self._price_history if hist is None or len(hist) < lookback + 1: return False # not enough history → NEUTRAL, allow entry p0 = hist[-lookback - 1] p1 = hist[-1] if p0 <= 0: return False chg_bps = (p1 - p0) / p0 * 10_000.0 return chg_bps > min_bps # rising price → CONTRADICT → skip def _hz_publish(self, slot_dict: dict, acc: dict) -> None: """Fire-and-forget Hz write after any kernel state change. Computes system leverage (our_leverage = notional/capital) for the Hz snapshot — PINK/BLUE dual-leverage invariant: system leverage reflects real margin utilisation; exchange leverage (1-3x cap) is set at BingX API level. """ if self.hz_state_writer is None: return try: size = float(slot_dict.get("size") or 0.0) ep = float(slot_dict.get("entry_price") or 0.0) capital = float(acc.get("capital") or 0.0) our_leverage = (size * ep / capital) if capital > 1e-10 else 0.0 self.hz_state_writer.write_engine_snapshot( slot_dict, acc, posture=self._last_posture, our_leverage=our_leverage, scan_number=self._last_scan_number, vel_div=self._last_vel_div, vol_ok=self._last_vol_ok, ) except Exception: pass # ── Execution-router drivers ───────────────────────────────────────────── # The router (dita_v2/exec_router.py) is pure policy; these methods are # the only place its decisions touch the kernel/venue. Every await is # followed by a state re-check: fills race cancels on a live venue. _SLOT_OPENISH = ("PARTIAL_FILL", "POSITION_OPENED", "POSITION_OPEN", "EXIT_REQUESTED", "EXIT_SENT", "EXIT_ACKED", "EXIT_WORKING", "POSITION_PARTIALLY_CLOSED") def _exec_slot_view(self) -> tuple[str, str, float]: """(trade_id, fsm_state_name, size) of slot 0 — tolerant of mocks.""" try: slot = self.kernel.slot(0) except Exception: return "", "", 0.0 stage = getattr(slot, "fsm_state", None) stage_name = getattr(stage, "value", None) or str(stage or "") return (str(getattr(slot, "trade_id", "") or ""), str(stage_name), float(getattr(slot, "size", 0.0) or 0.0)) def _exec_plan_for(self, decision: Any, kernel_intent: KernelIntent, snapshot: Any) -> Any: router = self.exec_router if router is None: return None try: ref = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0) side = getattr(kernel_intent, "side", None) side_name = getattr(side, "value", None) or str(side or "SHORT") if decision.action == DecisionAction.ENTER: return router.plan_entry( trade_id=kernel_intent.trade_id, asset=kernel_intent.asset, position_side=side_name, reference_price=ref, ) return router.plan_exit( trade_id=kernel_intent.trade_id, asset=kernel_intent.asset, position_side=side_name, reference_price=ref, reason=str(getattr(decision, "reason", "") or ""), ) except Exception as exc: # Router failure must never block trading — degrade to legacy taker. self.logger.warning("EXEC_ROUTER plan failed (%s) — taker fallback", exc) return None def _exec_after_submit(self, plan: Any, kernel_intent: KernelIntent, outcome: Any) -> None: """Classify a maker submit: filled now, resting, or rejected. Resting and rejected both register as working — a rejected post-only quote registers with an already-expired deadline so the TTL loop resolves it through the one shared miss/fallback path within a tick. """ router = self.exec_router if router is None: return try: tid = kernel_intent.trade_id slot_tid, stage, size = self._exec_slot_view() filled = ( (plan.action == "ENTER" and slot_tid == tid and size > 0.0 and stage in self._SLOT_OPENISH) or (plan.action == "EXIT" and ( slot_tid != tid or stage in ("POSITION_CLOSED", "CLOSED", "TRADE_TERMINAL_WRITTEN", "IDLE"))) ) if filled: self._emit("exec_router", event="immediate_fill", action=plan.action, trade_id=tid, reason=plan.reason) return rejected = stage in ("ORDER_REJECTED", "EXIT_REJECTED") wo = router.register_working( trade_id=tid, asset=kernel_intent.asset, position_side=(getattr(kernel_intent.side, "value", None) or str(kernel_intent.side)), plan=plan, base_trade_id=plan.metadata.get("base_trade_id") or tid, retry_n=int(plan.metadata.get("retry_n", 0) or 0), ) self._working_intents[tid] = kernel_intent if rejected: wo.deadline = router.clock() # resolve immediately via TTL loop self._emit("exec_router", event="working", action=plan.action, trade_id=tid, limit_price=plan.limit_price, ttl_s=plan.ttl_s, rejected=rejected, reason=plan.reason) self.logger.info("EXEC_ROUTER working %s %s @ %.10g ttl=%.1fs%s", plan.action, tid, plan.limit_price, plan.ttl_s, " (post-only REJECTED — instant resolve)" if rejected else "") except Exception as exc: self.logger.warning("EXEC_ROUTER after-submit failed: %s", exc) async def _exec_cancel_working(self, trade_id: str, *, reason: str) -> None: """Cancel a working quote via the kernel (idempotent on the venue).""" router = self.exec_router intent = (self._working_intents or {}).get(trade_id) if router is None or router.working(trade_id) is None: return try: base = intent cancel_intent = KernelIntent( timestamp=datetime.now(timezone.utc), intent_id=f"{trade_id}-cxl", trade_id=trade_id, slot_id=0, asset=(base.asset if base is not None else ""), side=(base.side if base is not None else DitaTradeSide.SHORT), action=KernelCommandType.CANCEL, reference_price=(base.reference_price if base is not None else 0.0), target_size=(base.target_size if base is not None else 0.0), leverage=(base.leverage if base is not None else 1.0), reason=f"exec_router:{reason}", ) await self.kernel.process_intent_async(cancel_intent) except Exception as exc: self.logger.warning("EXEC_ROUTER cancel %s failed: %s", trade_id, exc) router.note_cancel(trade_id) (self._working_intents or {}).pop(trade_id, None) self._emit("exec_router", event="cancel", trade_id=trade_id, reason=reason) def _exec_safe_to_requote(self, wo: Any) -> bool: """True only when the venue is provably flat for a re-quote. Fails SAFE (returns False) on: a recent own fill (REST reconcile lags WS fills by seconds), any live exchange position, or any probe error. """ if time.monotonic() - (self._last_own_fill_mono or 0.0) < 5.0: return False try: venue = self.kernel.venue rows = venue.open_positions() if hasattr(venue, "open_positions") else [] for row in rows or []: qty = abs(float(row.get("positionAmt") or row.get("positionQty") or row.get("qty") or 0.0)) if qty > 1e-9: return False except Exception as exc: self.logger.warning("EXEC_ROUTER requote probe failed (%s) — fail safe", exc) return False return True async def _exec_ttl_loop(self) -> None: """1 s sweep: resolve expired maker quotes (fill-check → cancel → miss policy / exit escalation). Scan cadence (~10 s) is too coarse for 5–8 s TTLs, hence the dedicated task.""" try: while True: await asyncio.sleep(1.0) router = self.exec_router if router is None: continue for wo in router.expired(): try: await self._handle_expired_working(wo) except Exception as exc: self.logger.error("EXEC_ROUTER expiry handler failed for %s: %s", wo.trade_id, exc, exc_info=True) except asyncio.CancelledError: raise async def _handle_expired_working(self, wo: Any) -> None: router = self.exec_router if router is None or router.working(wo.trade_id) is None: return # already resolved (fill/cancel notification raced us) # 1. Drain any late venue events first — the quote may already be filled. await self.pump_venue_events() if router.working(wo.trade_id) is None: return def _entry_filled() -> bool: slot_tid, stage, size = self._exec_slot_view() return slot_tid == wo.trade_id and size > 0.0 and stage in self._SLOT_OPENISH def _exit_done() -> bool: slot_tid, stage, size = self._exec_slot_view() return (slot_tid != wo.trade_id or size <= 0.0 or stage in ("POSITION_CLOSED", "CLOSED", "TRADE_TERMINAL_WRITTEN", "IDLE")) # 2. Cancel the quote (idempotent; CANCEL_REJECT on a filled order is # harmless). For a partially-filled entry this cancels the remainder. intent = (self._working_intents or {}).get(wo.trade_id) try: cancel_intent = KernelIntent( timestamp=datetime.now(timezone.utc), intent_id=f"{wo.trade_id}-ttlcxl", trade_id=wo.trade_id, slot_id=0, asset=wo.asset, side=(intent.side if intent is not None else DitaTradeSide.SHORT), action=KernelCommandType.CANCEL, reference_price=(intent.reference_price if intent is not None else 0.0), target_size=(intent.target_size if intent is not None else 0.0), leverage=(intent.leverage if intent is not None else 1.0), reason="exec_router:ttl_expired", ) await self.kernel.process_intent_async(cancel_intent) except Exception as exc: self.logger.warning("EXEC_ROUTER ttl-cancel %s failed: %s", wo.trade_id, exc) await self.pump_venue_events() # 3. Re-classify after the cancel round-trip (fill may have raced it). if wo.action == "ENTER" and _entry_filled(): router.note_fill(wo.trade_id) (self._working_intents or {}).pop(wo.trade_id, None) self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id) return if wo.action == "EXIT" and _exit_done(): router.note_fill(wo.trade_id) (self._working_intents or {}).pop(wo.trade_id, None) self._emit("exec_router", event="fill_after_ttl", trade_id=wo.trade_id) return router.note_cancel(wo.trade_id) base_intent = (self._working_intents or {}).pop(wo.trade_id, None) # 4. EXIT: never strand a position — escalate to MARKET, same trade_id. if wo.action == "EXIT": _tid, plan = router.market_fallback_plan(wo) if base_intent is not None and not _exit_done(): market_exit = replace( base_intent, intent_id=f"{wo.trade_id}-mkt", order_type="MARKET", limit_price=0.0, timestamp=datetime.now(timezone.utc), metadata={**(base_intent.metadata or {}), "_time_in_force": "GTC", "_exec_reason": plan.reason}, ) market_exit = self._exit_intent_from_slot(market_exit) self.logger.warning("EXEC_ROUTER exit TTL → MARKET fallback %s", wo.trade_id) self._emit("exec_router", event="exit_market_fallback", trade_id=wo.trade_id) await self.kernel.process_intent_async(market_exit) await self.pump_venue_events() return # 5. ENTER miss policy: skip | retry (bounded) | market. action = router.entry_miss_action(wo) self._emit("exec_router", event="entry_miss", trade_id=wo.trade_id, action=action, retry_n=wo.retry_n) if action == MissAction.SKIP or base_intent is None: self.logger.info("EXEC_ROUTER entry miss %s → skip", wo.trade_id) return slot_tid, stage, size = self._exec_slot_view() if size > 0.0 or stage in self._SLOT_OPENISH: # Slot occupied (raced fill of remainder / another trade) — never # double-enter. self.logger.warning("EXEC_ROUTER entry miss %s: slot busy (%s) — skip", wo.trade_id, stage) return # Venue-truth gate (live double-entry fix, 2026-06-10): the kernel slot # can show flat while the venue holds a position the REST reconcile has # not surfaced yet. Re-quote ONLY when the venue is provably flat AND # no own fill landed in the hot window. Ambiguity → skip; a skipped # entry is always safe, a doubled position is not. if not self._exec_safe_to_requote(wo): self.logger.warning( "EXEC_ROUTER entry miss %s: venue not provably flat " "(recent own fill or live position) — skip requote", wo.trade_id) self._emit("exec_router", event="requote_blocked", trade_id=wo.trade_id) return ref = 0.0 try: # Multi-asset: prefer the working order's OWN asset history; the # alias deque may track a different effective symbol. hist = None if self._price_histories is not None: hist = self._price_histories.get(str(wo.asset or "").upper()) if not hist: hist = self._price_history if hist: ref = float(hist[-1]) except Exception: ref = 0.0 if ref <= 0.0: ref = float(wo.plan.limit_price or 0.0) if action == MissAction.RETRY: new_tid, plan = router.retry_plan(wo, reference_price=ref) else: # MARKET new_tid, plan = router.market_fallback_plan(wo) new_intent = replace( base_intent, trade_id=new_tid, intent_id=new_tid, timestamp=datetime.now(timezone.utc), reference_price=(ref if ref > 0.0 else base_intent.reference_price), order_type=plan.order_type, limit_price=float(plan.limit_price or 0.0), metadata={**(base_intent.metadata or {}), "_time_in_force": ("PostOnly" if plan.post_only else "GTC"), "_exec_reason": plan.reason}, ) self.logger.info("EXEC_ROUTER entry %s → %s as %s", wo.trade_id, action, new_tid) outcome = await self.kernel.process_intent_async(new_intent) if plan.is_maker: self._exec_after_submit(plan, new_intent, outcome) await self.pump_venue_events() async def pump_venue_events( self, snapshot: Any | None = None, *, market_state: Any = None ) -> int: """Drain late (async) venue fills into the kernel and persist the result. Resting LIMIT and partial fills arrive *after* the submitting ``process_intent`` returns. This calls ``venue.reconcile()`` and feeds each event to ``kernel.on_venue_event`` so capital settles and the FSM advances; the kernel dedups duplicates via ``seen_event_ids`` / ``_last_settled_pnl`` (no double-settle). Only events the kernel actually applied (accepted, not DUPLICATE_EVENT) are persisted, via the two-phase result-logger. Capital authority stays ``kernel.account``. Returns the number of applied events. """ venue = self.kernel.venue reconcile = getattr(venue, "reconcile", None) if reconcile is None: return 0 try: events = reconcile() if inspect.isawaitable(events): events = await events except Exception as exc: self.logger.warning("Venue reconcile failed: %s", exc) return 0 events = list(events or []) if not events: return 0 applied: list[Any] = [] for event in events: try: outcome = self.kernel.on_venue_event(event) except Exception as exc: self.logger.warning("on_venue_event failed: %s", exc) continue if getattr(outcome, "accepted", False) and getattr( outcome, "diagnostic_code", None ) != KernelDiagnosticCode.DUPLICATE_EVENT: applied.append(event) # Resolve working maker quotes against applied venue truth so the # TTL loop never cancels an order the venue already terminalised. if self.exec_router is not None: for event in applied: tid = str(getattr(event, "trade_id", "") or "") if not tid or self.exec_router.working(tid) is None: continue kind = getattr(getattr(event, "kind", None), "value", "") or str( getattr(event, "kind", "") or "") status = getattr(getattr(event, "status", None), "value", "") or str( getattr(event, "status", "") or "") if "FULL_FILL" in kind or status == "FILLED": self.exec_router.note_fill(tid) (self._working_intents or {}).pop(tid, None) elif "CANCEL_ACK" in kind or status in ("CANCELED", "CANCELLED"): # Venue-side cancel (incl. post-only reject surfacing via # reconcile): do NOT drop the working order here — pull its # deadline to now so the TTL loop resolves it through the # one shared miss/escalation path (retry/market/skip per # config; exits always escalate to MARKET). wo = self.exec_router.working(tid) if wo is not None: wo.deadline = self.exec_router.clock() if applied and self.persistence is not None: slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} persist_snapshot = snapshot if persist_snapshot is None: persist_snapshot = SimpleNamespace( timestamp=datetime.now(timezone.utc), symbol=str(slot_dict.get("asset", "")), ) self.persistence.persist_fill_events( snapshot=persist_snapshot, events=applied, slot_dict=slot_dict, market_state=market_state or {}, ) # Hz write after fills settle — slot FSM and capital may have changed acc = self.kernel.snapshot().get("account") or {} self._hz_publish(slot_dict, acc) return len(applied) def _unsafe_entry_reason(self, kernel_intent: KernelIntent, context: Any) -> Optional[str]: # Exits are never frozen — only new ENTERs are blocked on reconcile ERROR. if getattr(self, "_enter_frozen", False): return "account reconcile ERROR — new ENTERs frozen until K≈E restored" # SINGLE-SLOT INVARIANT (2026-06-10, after two live double-entries): # never ENTER while the exchange shows ANY open position. A filled # maker entry vanishes from openOrders; the symbol-less reconcile # fetches no fills, reads the disappearance as a cancel, frees the # slot, and the next scan re-enters → 2x position. The exchange # position list is the truth that survives that misclassification. if time.monotonic() - (getattr(self, "_last_own_fill_mono", 0.0) or 0.0) < 5.0: return "own fill within hot window — entry deferred until state settles" try: venue = self.kernel.venue rows = venue.open_positions() if hasattr(venue, "open_positions") else [] for row in rows or []: qty = abs(float(row.get("positionAmt") or row.get("positionQty") or row.get("qty") or 0.0)) if qty > 1e-9: return (f"exchange holds open position " f"({row.get('symbol')} qty={qty}) — single-slot ENTER blocked") except Exception as exc: self.logger.warning("entry venue-position probe failed (%s) — blocking ENTER", exc) return "venue position probe failed — fail safe, no ENTER without proof" """Return why an ENTER's sizing inputs are unsafe, or None if sound. notional = capital × fraction × leverage is self-limiting; the only way size = notional/price goes non-finite is a corrupt raw input. We reject the OPEN (not clamp) because a corrupt sizing input is an untrustworthy signal — better to skip the trade than open on bad math. """ cap = float(getattr(context, "capital", 0.0) or 0.0) price = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0) lev = float(getattr(kernel_intent, "leverage", 0.0) or 0.0) size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) if not math.isfinite(cap) or cap <= 0.0: return f"non-finite/non-positive capital={cap!r}" if not math.isfinite(price) or price < _MIN_SANE_PRICE: return f"price below sane floor or non-finite price={price!r} (floor={_MIN_SANE_PRICE:g})" if not math.isfinite(lev) or lev <= 0.0: return f"non-finite/non-positive leverage={lev!r}" if not math.isfinite(size) or size <= 0.0: return f"non-finite/non-positive size={size!r}" return None def _exit_intent_from_slot(self, kernel_intent: KernelIntent) -> KernelIntent: """Size an EXIT from the kernel's authoritative slot accounting. The close quantity is the real remaining position size (capped to it), never an externally-computed value — so a malformed policy size can neither strand a position (refuse to close) nor overshoot it. A non-finite policy size falls back to the full remaining size. """ try: slot_size = float(self.kernel.slot(int(kernel_intent.slot_id)).size or 0.0) except Exception: slot_size = 0.0 policy_size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) policy_ok = math.isfinite(policy_size) and policy_size > 0.0 if slot_size > 0.0: # Authoritative remaining size known: cap the close to it (and fall # back to the full remaining if the policy size is malformed). exit_size = min(policy_size, slot_size) if policy_ok else slot_size else: # Kernel reports no/unknown remaining size: trust the policy size # (the kernel rejects NO_OPEN_POSITION if there is genuinely none). exit_size = policy_size if policy_ok else 0.0 return replace(kernel_intent, target_size=exit_size) def _effective_snapshot(self, snapshot: MarketSnapshot) -> tuple[MarketSnapshot, str]: """Retarget the regime snapshot onto the TRADE asset (BLUE parity). The data feed delivers the BTC-anchored eigen scan (regime signal: vel_div, irp). The TRADE asset is a separate concern — BLUE selects it per signal from the scan's ~50-asset universe via IRP (SYSTEM BIBLE §5). Rules, in priority order: 1. Slot occupied → the slot's asset, priced from the universe payload. Exits MUST evaluate the position's own price, never the regime anchor's. 2. Flat + picker warm → IRP top candidate for the SHORT regime. 3. Flat + picker cold / no candidate → entries suppressed (returned block reason; BLUE has no BTC fallback when IRP is enabled). Returns (snapshot, enter_block_reason). Empty reason = entries OK. Also feeds the picker/sizer one observation per NEW scan (deduped on scan_number inside the components). """ import dataclasses as _dc payload = snapshot.scan_payload if isinstance(snapshot.scan_payload, dict) else {} scan_number = int(payload.get("scan_number") or snapshot.scan_number or 0) vel_div = payload.get("vel_div") if vel_div is None: vel_div = snapshot.velocity_divergence if self.alpha_sizer is not None and vel_div is not None: try: self.alpha_sizer.observe(vel_div, scan_number) except Exception: pass if self.asset_picker is None: return snapshot, "" try: self.asset_picker.observe(payload, scan_number) except Exception as exc: self.logger.warning("asset picker observe failed: %s", exc) # 1. Open/working slot → follow its asset. slot_asset = "" try: if self.kernel.max_slots > 0: slot = self.kernel.slot(0) if not slot.is_free(): slot_asset = str(getattr(slot, "asset", "") or "").upper() except Exception: slot_asset = "" if slot_asset: if slot_asset == str(snapshot.symbol).upper(): return snapshot, "" px = self.asset_picker.price_of(slot_asset) if px is None or px <= 0: # No universe price for the slot asset (e.g. adopted stray). # Exits must not be evaluated at the anchor's price — a BTC # price against a stray's entry price would fire a bogus SL. self.logger.error( "no universe price for open slot asset %s — policy step " "degraded to HOLD (exit eval needs the asset's own price)", slot_asset) return snapshot, f"all:no price for open slot asset {slot_asset}" return _dc.replace(snapshot, symbol=slot_asset, price=float(px)), "" # 2./3. Flat → IRP pick. if not self.asset_picker.warm: return snapshot, "IRP picker warming up — universe history incomplete" choice = None try: choice = self.asset_picker.pick(direction=-1) except Exception as exc: self.logger.warning("asset picker rank failed: %s", exc) if choice is None: return snapshot, "no IRP candidate passed gates (BLUE: no fallback asset)" asset, px, ars = choice asset = str(asset).upper() if asset == str(snapshot.symbol).upper(): return snapshot, "" return _dc.replace(snapshot, symbol=asset, price=float(px)), "" def _sizer_trade_feedback(self, acc: dict, slot_dict: dict) -> None: """Close-out detection → feed realized PnL into the alpha layers. PnL is sourced from the closing slot's realized_pnl (kernel estimate, overridden by exchange FILL_SETTLED when available) — NOT the capital delta, which absorbs funding, fees of other activity, and foreign fills from the shared VST account (PRODGREEN collision class). Bucket/streak multipliers only need sign and rough magnitude. """ if self.alpha_sizer is None or not self._sizer_open_tid: return open_tid = str(slot_dict.get("trade_id") or "") if slot_dict else "" still_open = ( open_tid == self._sizer_open_tid and float(slot_dict.get("size") or 0.0) > 0 and not slot_dict.get("closed", False) ) if still_open: return # Phase 3: slot.realized_pnl is the trade's own PnL (no capital-delta # contamination from funding, foreign fills, or other-activity fees). pnl = float(slot_dict.get("realized_pnl") or 0.0) # Subtract accumulated fees when available (fees_paid on slot metadata) fees = float(slot_dict.get("fees_paid", 0.0) or slot_dict.get("metadata", {}).get("fees_paid", 0.0) or 0.0) pnl = pnl - fees self._sizer_open_tid = "" try: self.alpha_sizer.record_close(pnl) self.logger.info("alpha sizer feedback: trade closed pnl=%.4f (rp=%.4f fees=%.4f)", pnl, float(slot_dict.get("realized_pnl") or 0.0), fees) except Exception: pass async def step(self, snapshot: MarketSnapshot) -> Decision: """Single policy + execution cycle. 0. Pump late (async) venue fills into the kernel (LIMIT/partial settle) 1. Update market state 2. Decide (policy layer) 3. Plan (intent layer) 4. Translate to KernelIntent -> kernel.process_intent_async() 5. Read final slot + account state from kernel 6. Persist """ market_state = self._update_market_state_runtime(snapshot) # BLUE-parity retarget (2026-06-10): regime signal stays BTC-anchored, # the TRADE asset comes from the slot (exits) or the IRP picker # (entries). Must run BEFORE the fill pump / policy so every consumer # below sees the trade asset's own price. snapshot, _enter_block = self._effective_snapshot(snapshot) # Drain any late fills BEFORE the policy reads slot/account state, so a # resting LIMIT that filled since the last cycle is reflected. await self.pump_venue_events(snapshot, market_state=market_state) acc = self.kernel.snapshot()["account"] slot_view = self.kernel.slot(0) if self.kernel.max_slots > 0 else None slot_dict = slot_view.to_dict() if slot_view is not None else {} is_open = slot_dict and slot_dict.get("size", 0) > 0 and not slot_dict.get("closed", False) # Convert the kernel slot dict into a TradePosition for the legacy # decision/intent engines. legacy_position = None if is_open: from prod.clean_arch.dita import TradePosition, TradeSide as LS legacy_position = TradePosition( trade_id=slot_dict.get("trade_id", ""), asset=slot_dict.get("asset", ""), side=LS.SHORT if slot_dict.get("side", "").upper() in ("SHORT", "SELL") else LS.LONG, entry_price=float(slot_dict.get("entry_price", 0.0)), entry_time=datetime.now(timezone.utc), size=float(slot_dict.get("size", 0.0)), leverage=float(slot_dict.get("leverage", 1.0)), entry_velocity_divergence=float(slot_dict.get("entry_velocity_divergence", 0.0)), entry_irp_alignment=float(slot_dict.get("entry_irp_alignment", 0.0)), current_price=float(slot_dict.get("entry_price", 0.0)), initial_size=float(slot_dict.get("initial_size", 0.0)), exit_leg_ratios=tuple(slot_dict.get("exit_leg_ratios", [1.0])), # Carry the kernel's authoritative leg progression so the intent # engine consumes the CORRECT exit-leg ratio. The legacy position # is rebuilt every step; without this exit_leg_index resets to 0 # and every leg uses ratio[0] — under-closing each leg and leaving # a residual (kernel believes flat, exchange does not). exit_leg_index=int(slot_dict.get("active_leg_index", 0) or 0), closed=False, ) # Sizer feedback: detect a trade that closed since the last cycle. self._sizer_trade_feedback(acc, slot_dict) # Price history for DC gate — per effective asset (multi-asset since # 2026-06-10). _price_history aliases the current asset's deque so the # DC gate below reads a single-asset series. if self._price_histories is not None: from collections import deque as _deque _sym = str(snapshot.symbol or "").upper() self._price_history = self._price_histories.setdefault(_sym, _deque(maxlen=10)) if self._price_history is not None and snapshot.price and snapshot.price > 0: self._price_history.append(float(snapshot.price)) context = DecisionContext( # E-provided available_capital when present (E rules); K-fallback otherwise. capital=float(acc.get("available_capital") or acc.get("capital", 0.0)), open_positions=int(acc.get("open_positions", 0)), trade_seq=int(acc.get("trade_seq", 0)), ) # DC gate (Direction Confirmation, SYSTEM BIBLE §4.2): # Check BEFORE DecisionEngine so a CONTRADICT voids the ENTER without # touching the kernel. Champion params: 7-tick lookback, 0.75 bps threshold. # dc_skip_contradicts = True → rising price during short window = HOLD. dc_blocked = self._dc_contradicts() decision = self.decision_engine.decide(snapshot, context, legacy_position) if _enter_block: _block_all = _enter_block.startswith("all:") if decision.action == DecisionAction.ENTER or ( _block_all and decision.action == DecisionAction.EXIT ): import dataclasses decision = dataclasses.replace(decision, action=DecisionAction.HOLD, reason="ASSET_PICKER_BLOCK") self.logger.info("action blocked by asset picker: %s (vel_div=%.4f scan=%d)", _enter_block, self._last_vel_div, self._last_scan_number) if dc_blocked and decision.action == DecisionAction.ENTER: import dataclasses decision = dataclasses.replace(decision, action=DecisionAction.HOLD_DC_CONTRADICTED, reason="DC_CONTRADICT") self.logger.info("DC CONTRADICT: ENTER blocked (vel_div=%.4f scan=%d symbol=%s)", self._last_vel_div, self._last_scan_number, getattr(snapshot, "symbol", "?")) self._emit("decision", decision=decision) intent_context = IntentContext( capital=context.capital, open_positions=context.open_positions, trade_seq=context.trade_seq, ) plan = self.intent_engine.plan(decision, intent_context, legacy_position) intent = plan.intent # ACB boost (SYSTEM BIBLE §10): multiply intent leverage by the current boost # factor from acb_processor_service. Capped at the STRATEGY max leverage # (decision config) — intent.leverage is fractional conviction; the # integer exchange cap is applied separately at the venue boundary # (map_internal_conviction_to_exchange_leverage). The old hardcoded # min(3.0, …) silently clamped BLUE-parity conviction. if self._last_acb_boost != 1.0 and intent is not None: import dataclasses as _dc _lev_cap = float(getattr(self.decision_engine.config, "max_leverage", 3.0) or 3.0) boosted_lev = min(_lev_cap, max(0.5, float(intent.leverage or 1.0) * self._last_acb_boost)) intent = _dc.replace(intent, leverage=boosted_lev) if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}: kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0) if decision.action == DecisionAction.ENTER: # Source guard: notional (capital×fraction×leverage) is self- # limiting, so a non-finite size can only come from corrupt raw # inputs — a non-finite capital, or a price below the industry # floor that overflows size = notional/price. A corrupt sizing # input is an untrustworthy signal: do NOT open (exits are never # suppressed — they size from slot accounting below). unsafe = self._unsafe_entry_reason(kernel_intent, context) if unsafe is not None: self.logger.error( "ENTER suppressed (%s): price=%r capital=%r size=%r leverage=%r " "floor=%g asset=%s", unsafe, getattr(kernel_intent, "reference_price", None), context.capital, getattr(kernel_intent, "target_size", None), getattr(kernel_intent, "leverage", None), _MIN_SANE_PRICE, intent.asset, ) sp = float(getattr(snapshot, "price", 0.0) or 0.0) if math.isfinite(sp) and sp >= _MIN_SANE_PRICE: self.kernel.mark_price(snapshot.symbol, sp) slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} acc = self.kernel.snapshot()["account"] if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=None, slot_dict=slot_dict, acc_dict=acc, phase="entry_suppressed", market_state=market_state, ) return decision else: # EXIT: size the close from the kernel's authoritative slot # accounting so a malformed policy size can never strand or # overshoot an open position. kernel_intent = self._exit_intent_from_slot(kernel_intent) # ── Execution router: decide HOW this intent reaches the venue ── # (taker MARKET vs post-only maker LIMIT). Policy only — sizing, # signal and TP logic above are untouched. plan=None or # style=taker leaves kernel_intent exactly as built (legacy path). exec_plan = self._exec_plan_for(decision, kernel_intent, snapshot) if exec_plan is not None and exec_plan.suppress: # Duplicate guard: a working maker quote already represents # this action. Do not double-submit. self.logger.info("EXEC_ROUTER suppress %s %s: %s", decision.action.value, kernel_intent.trade_id, exec_plan.reason) self._emit("exec_router", event="suppress", action=decision.action.value, trade_id=kernel_intent.trade_id, reason=exec_plan.reason) return decision if exec_plan is not None and exec_plan.metadata.get("preempt_working"): # Urgent exit preempting a resting maker exit: cancel the # quote first so the MARKET close cannot double-fill. await self._exec_cancel_working(kernel_intent.trade_id, reason="urgent_exit_preempt") if exec_plan is not None and exec_plan.order_type == "LIMIT": kernel_intent = replace( kernel_intent, order_type="LIMIT", limit_price=float(exec_plan.limit_price), metadata={ **(kernel_intent.metadata or {}), "_time_in_force": "PostOnly" if exec_plan.post_only else "GTC", "_exec_reason": exec_plan.reason, }, ) # Register the symbol BEFORE the order can fill so the account # stream attributes the resulting WS fill to PINK. try: if intent.asset and self._own_fill_symbols is not None: self._own_fill_symbols.add(str(intent.asset).upper()) except Exception: pass # Alpha-layer feedback: remember which bucket sized this ENTER and # the capital baseline, so the close can be credited back. if decision.action == DecisionAction.ENTER and self.alpha_sizer is not None: try: self.alpha_sizer.note_entry() self._sizer_open_tid = str(kernel_intent.trade_id or "") self._sizer_entry_capital = float(context.capital) except Exception: pass outcome = await self.kernel.process_intent_async(kernel_intent) # Maker quotes that did not terminally fill register as working # so the TTL loop can cancel/retry/fall back. if exec_plan is not None and exec_plan.is_maker: self._exec_after_submit(exec_plan, kernel_intent, outcome) # Locate the source of any non-finite intent the kernel rejected: # log the full upstream provenance (snapshot price, account capital, # leverage, sizing) so a numerical error can be traced to its origin # rather than silently rejected. if outcome.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT: self.logger.error( "INVALID_INTENT rejected by kernel: %s | provenance: " "snapshot.price=%r capital=%r open_positions=%r leverage=%r " "target_size=%r reference_price=%r limit_price=%r action=%s asset=%s", dict(outcome.details or {}), getattr(snapshot, "price", None), context.capital, context.open_positions, getattr(kernel_intent, "leverage", None), getattr(kernel_intent, "target_size", None), getattr(kernel_intent, "reference_price", None), getattr(kernel_intent, "limit_price", None), decision.action.value, intent.asset, ) # Read authoritative final state from kernel. final_slot = self.kernel.slot(0) slot_dict = final_slot.to_dict() acc = self.kernel.snapshot()["account"] self._emit( "execution", decision=decision, intent=intent, outcome_code=outcome.diagnostic_code.value, ) if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, slot_dict=slot_dict, acc_dict=acc, phase="execution", market_state=market_state, ) # Hz write: ENTER/EXIT changed slot FSM — publish updated state self._hz_publish(slot_dict, acc) # On trade close, write daily PnL row if ( self.hz_state_writer is not None and slot_dict.get("closed") ): try: self.hz_state_writer.write_daily_pnl(acc, posture=self._last_posture) except Exception: pass else: # HOLD / no-op: update mark price in kernel. if snapshot.price and snapshot.price > 0: self.kernel.mark_price(snapshot.symbol, snapshot.price) slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} acc = self.kernel.snapshot()["account"] if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=None, slot_dict=slot_dict, acc_dict=acc, phase="decision", market_state=market_state, ) return decision async def recover( self, snapshot: MarketSnapshot | None = None ) -> dict[str, Any]: """Full recovery — reconcile exchange state into kernel and reseed capital.""" return await self.recover_account( snapshot=snapshot, phase="recovery", event_type="RECOVERY" ) async def recover_account( self, *, snapshot: MarketSnapshot | None = None, phase: str = "recovery", event_type: str = "RECOVERY", ) -> dict[str, Any]: """Reconcile exchange state, reseed capital, and persist recovery row. The kernel's VenueAdapter is sync — all async bridging is handled internally by ``_run()``. We seed capital from the kernel's existing value (which was set at startup) rather than re-polling the exchange. """ capital = float(self.kernel.account.snapshot.capital or 25000.0) _reconcile_position_slot(self.kernel, capital, slot_id=0) acc = self.kernel.snapshot()["account"] if self.persistence is not None: persist_snapshot = snapshot if persist_snapshot is None: persist_snapshot = SimpleNamespace( timestamp=datetime.now(timezone.utc), symbol="" ) market_state = {} if snapshot is not None: market_state = self._update_market_state_runtime(snapshot) self.persistence.persist_recovery_state( snapshot=persist_snapshot, acc_dict=acc, phase=phase, event_type=event_type, market_state=market_state, ) return acc async def reconcile_account( self, snapshot: MarketSnapshot | None = None ) -> dict[str, Any]: """Periodic exchange-led account sync. Tags the recovery path as a scheduled reconciliation. Capital is re-seeded from the exchange balance as a guard against long-running drift, but the primary capital authority remains kernel.settle(). """ return await self.recover_account( snapshot=snapshot, phase="account_reconcile", event_type="ACCOUNT_RECONCILE", )