"""Node-free PINK runtime built on DITAv2 kernel + BingX venue adapter. The kernel owns the single-slot FSM, AccountProjection, and event normalization. This module translates policy-layer Decision/Intent into KernelIntent and reads final state from the kernel's slot + account snapshot. Capital is seeded from exchange balance at startup/recovery then maintained by kernel.account.settle() on close — no balance-poll overwrites during the hot loop. """ from __future__ import annotations import inspect import logging import math from dataclasses import dataclass, replace from datetime import datetime, timezone from types import SimpleNamespace from typing import Any, Callable, Optional from prod.clean_arch.dita import ( Decision, DecisionAction, DecisionConfig, DecisionContext, DecisionEngine, Intent, IntentContext, IntentEngine, TradeSide as LegacyTradeSide, ) from prod.clean_arch.dita_v2.contracts import ( KernelCommandType, KernelDiagnosticCode, KernelIntent, TradeSide as DitaTradeSide, TradeStage, ) from prod.clean_arch.dita_v2.rust_backend import ExecutionKernel from prod.clean_arch.persistence import PinkClickHousePersistence from prod.clean_arch.ports.data_feed import DataFeedPort, MarketSnapshot LOGGER = logging.getLogger(__name__) def _slot_to_position_dict(slot) -> dict[str, Any]: """Convert a DITAv2 TradeSlot into a simple position dict compatible with the persistence layer's expected shape.""" if slot is None: return {} return { "trade_id": slot.trade_id, "asset": slot.asset, "side": slot.side.value, "entry_price": float(slot.entry_price or 0.0), "entry_time": slot.entry_time.isoformat() if hasattr(slot.entry_time, "isoformat") else str(slot.entry_time), "size": float(slot.size or 0.0), "initial_size": float(slot.initial_size or 0.0), "leverage": float(slot.leverage or 0.0), "realized_pnl": float(slot.realized_pnl or 0.0), "unrealized_pnl": float(slot.unrealized_pnl or 0.0), "closed": bool(slot.closed), "close_reason": slot.close_reason or "", "fsm_state": slot.fsm_state.value, "exit_leg_ratios": list(slot.exit_leg_ratios), "active_leg_index": int(slot.active_leg_index or 0), "active_exit_order": dict(slot.active_exit_order.to_dict()) if slot.active_exit_order and hasattr(slot.active_exit_order, "to_dict") else ({"status": slot.active_exit_order.status.value, "venue_order_id": slot.active_exit_order.venue_order_id} if slot.active_exit_order else None), "active_entry_order": dict(slot.active_entry_order.to_dict()) if slot.active_entry_order and hasattr(slot.active_entry_order, "to_dict") else ({"status": slot.active_entry_order.status.value, "venue_order_id": slot.active_entry_order.venue_order_id} if slot.active_entry_order else None), } # Industry-smallest sane quote price. notional (capital × fraction × leverage) # is self-limiting; the only unbounded step is size = notional / price, which # overflows to inf as price -> 0. Any real perp quote is far above this floor, # so a price below it (or non-finite) signals corrupt market data, not a trade. _MIN_SANE_PRICE = 1e-8 def _decision_to_kernel_intent( decision: Decision, intent: Intent, slot_id: int = 0, ) -> KernelIntent: """Translate policy-layer Decision/Intent into a DITAv2 KernelIntent. The action map is: ENTER -> KernelCommandType.ENTER EXIT -> KernelCommandType.EXIT HOLD -> KernelCommandType.MARK_PRICE """ action_map = { DecisionAction.ENTER: KernelCommandType.ENTER, DecisionAction.EXIT: KernelCommandType.EXIT, DecisionAction.HOLD: KernelCommandType.MARK_PRICE, } side = ( DitaTradeSide.SHORT if intent.side == LegacyTradeSide.SHORT else DitaTradeSide.LONG ) return KernelIntent( timestamp=decision.timestamp, intent_id=decision.decision_id, trade_id=intent.trade_id, slot_id=slot_id, asset=intent.asset, side=side, action=action_map.get(decision.action, KernelCommandType.MARK_PRICE), reference_price=float(decision.reference_price or intent.reference_price or 0.0), target_size=float(intent.target_size or 0.0), leverage=float(intent.leverage or 1.0), exit_leg_ratios=tuple(intent.exit_leg_ratios), reason=intent.reason, metadata=dict(intent.metadata or {}), ) def _reconcile_position_slot( kernel: ExecutionKernel, exchange_balance_capital: float, slot_id: int = 0, ) -> None: """Synchronise a single kernel slot from the venue's open positions. This is called at startup/recovery to make the kernel state match the exchange. It also seeds the kernel's AccountProjection.capital from the exchange balance — the single place where an external balance snapshot writes capital. """ venue = kernel.venue try: positions = venue.open_positions() if hasattr(venue, "open_positions") else [] except Exception: positions = [] # Build TradeSlot[] from exchange positions from prod.clean_arch.dita_v2.contracts import TradeSlot, TradeSide reconciled = [] if positions: for row in positions if isinstance(positions, list) else ( list(positions.values()) if isinstance(positions, dict) else []): raw_side = str(row.get("positionSide") or row.get("side") or "").upper() raw_qty = 0.0 for key in ("positionAmt", "positionQty", "positionSize", "quantity", "pa", "qty"): try: raw_qty = float(row.get(key) or 0.0) except Exception: continue if raw_qty != 0.0: break if abs(raw_qty) <= 1e-12: continue qty = abs(raw_qty) entry = 0.0 for key in ("entryPrice", "avgPrice", "avgEntryPrice", "ep", "ap", "price"): try: entry = float(row.get(key) or 0.0) except Exception: continue if entry > 0: break mark = 0.0 for key in ("markPrice", "mark", "price"): try: mark = float(row.get(key) or 0.0) except Exception: continue if mark > 0: break if mark <= 0: mark = entry lev = float(row.get("leverage") or row.get("lev") or 1.0) side = TradeSide.SHORT if raw_side in {"SHORT", "SELL"} or raw_qty < 0 else TradeSide.LONG asset = str(row.get("symbol") or row.get("symbolName") or "") trade_id = asset # use asset as trade ID for exchange-led recovery slot = TradeSlot( slot_id=slot_id, trade_id=trade_id, asset=asset, side=side, entry_price=entry if entry > 0 else mark, size=qty, initial_size=qty, leverage=lev if lev > 0 else 1.0, entry_time=datetime.now(timezone.utc), fsm_state=TradeStage.POSITION_OPEN, metadata={"reconciled_from_exchange": True}, ) reconciled.append(slot) if reconciled: kernel.reconcile_from_slots(reconciled) else: # No open positions — ensure slot is idle kernel.reconcile_from_slots([]) # Seed capital once from exchange balance. if exchange_balance_capital > 0: kernel.account.snapshot.capital = exchange_balance_capital kernel.account.snapshot.peak_capital = max( kernel.account.snapshot.peak_capital, exchange_balance_capital ) kernel.account.snapshot.equity = exchange_balance_capital @dataclass class PinkDirectRuntime: """Drive DITAv2 kernel against BingX exchange and a market data feed. The kernel owns the FSM and account projection. This runtime provides the policy loop: data feed -> decision engine -> intent engine -> kernel intent -> outcome -> persistence. """ data_feed: DataFeedPort kernel: ExecutionKernel decision_engine: DecisionEngine intent_engine: IntentEngine persistence: Optional[PinkClickHousePersistence] = None market_state_runtime: Any = None event_sink: Optional[Callable[[dict[str, Any]], None]] = None logger: Any = LOGGER async def connect(self, initial_capital: float = 25000.0) -> None: """Connect data feed, venue, and seed capital from exchange.""" await self.data_feed.connect() venue = self.kernel.venue # VenueAdapter methods are synchronous (the adapter bridges async # internally via _run). Try connect() if it exists. if hasattr(venue, "connect"): try: result = venue.connect() if inspect.isawaitable(result): await result except Exception as exc: self.logger.warning("Venue connect failed: %s", exc) # Seed capital from env default — the kernel tracks capital via # settle() on close, not from exchange balance polls. _reconcile_position_slot(self.kernel, initial_capital, slot_id=0) async def disconnect(self) -> None: await self.data_feed.disconnect() venue = self.kernel.venue if hasattr(venue, "disconnect"): try: await venue.disconnect() except Exception: pass def _emit(self, phase: str, **fields: Any) -> None: if self.event_sink is not None: payload = {"phase": phase, **fields} self.event_sink(payload) @staticmethod def _scan_payload_prices( scan_payload: dict[str, Any] | None, fallback_symbol: str, fallback_price: float, ) -> dict[str, float]: payload = scan_payload or {} assets = payload.get("assets") or [] prices = payload.get("asset_prices") or [] out: dict[str, float] = {} if isinstance(assets, list) and isinstance(prices, list): for asset, price in zip(assets, prices): try: px = float(price) except Exception: continue if px > 0: out[str(asset).upper()] = px if not out and fallback_symbol and fallback_price > 0: out[str(fallback_symbol).upper()] = float(fallback_price) return out def _update_market_state_runtime( self, snapshot: MarketSnapshot ) -> dict[str, Any]: runtime = self.market_state_runtime scan_payload = ( snapshot.scan_payload if isinstance(snapshot.scan_payload, dict) else {} ) if runtime is None or not scan_payload: return {} try: prices_dict = self._scan_payload_prices( scan_payload, snapshot.symbol, snapshot.price ) bundle = runtime.update_scan_state( scan_payload=scan_payload, prices_dict=prices_dict, scan_number=int( scan_payload.get("scan_number") or snapshot.scan_number or 0 ), vel_div=float( scan_payload.get("vel_div") or snapshot.velocity_divergence or 0.0 ), v50_vel=float(scan_payload.get("w50_velocity") or 0.0), v750_vel=float(scan_payload.get("w750_velocity") or 0.0), vol_ok=bool(scan_payload.get("vol_ok", True)), posture=str(scan_payload.get("posture") or "APEX"), exf_snapshot=scan_payload.get("exf_snapshot") if isinstance(scan_payload.get("exf_snapshot"), dict) else None, esof_payload=scan_payload.get("esof_payload") if isinstance(scan_payload.get("esof_payload"), dict) else None, ) return dict( getattr(runtime, "latest_bundle_dict", {}) or bundle.as_dict() ) except Exception: return {} async def pump_venue_events( self, snapshot: Any | None = None, *, market_state: Any = None ) -> int: """Drain late (async) venue fills into the kernel and persist the result. Resting LIMIT and partial fills arrive *after* the submitting ``process_intent`` returns. This calls ``venue.reconcile()`` and feeds each event to ``kernel.on_venue_event`` so capital settles and the FSM advances; the kernel dedups duplicates via ``seen_event_ids`` / ``_last_settled_pnl`` (no double-settle). Only events the kernel actually applied (accepted, not DUPLICATE_EVENT) are persisted, via the two-phase result-logger. Capital authority stays ``kernel.account``. Returns the number of applied events. """ venue = self.kernel.venue reconcile = getattr(venue, "reconcile", None) if reconcile is None: return 0 try: events = reconcile() if inspect.isawaitable(events): events = await events except Exception as exc: self.logger.warning("Venue reconcile failed: %s", exc) return 0 events = list(events or []) if not events: return 0 applied: list[Any] = [] for event in events: try: outcome = self.kernel.on_venue_event(event) except Exception as exc: self.logger.warning("on_venue_event failed: %s", exc) continue if getattr(outcome, "accepted", False) and getattr( outcome, "diagnostic_code", None ) != KernelDiagnosticCode.DUPLICATE_EVENT: applied.append(event) if applied and self.persistence is not None: slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} persist_snapshot = snapshot if persist_snapshot is None: persist_snapshot = SimpleNamespace( timestamp=datetime.now(timezone.utc), symbol=str(slot_dict.get("asset", "")), ) self.persistence.persist_fill_events( snapshot=persist_snapshot, events=applied, slot_dict=slot_dict, market_state=market_state or {}, ) return len(applied) def _unsafe_entry_reason(self, kernel_intent: KernelIntent, context: Any) -> Optional[str]: """Return why an ENTER's sizing inputs are unsafe, or None if sound. notional = capital × fraction × leverage is self-limiting; the only way size = notional/price goes non-finite is a corrupt raw input. We reject the OPEN (not clamp) because a corrupt sizing input is an untrustworthy signal — better to skip the trade than open on bad math. """ cap = float(getattr(context, "capital", 0.0) or 0.0) price = float(getattr(kernel_intent, "reference_price", 0.0) or 0.0) lev = float(getattr(kernel_intent, "leverage", 0.0) or 0.0) size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) if not math.isfinite(cap) or cap <= 0.0: return f"non-finite/non-positive capital={cap!r}" if not math.isfinite(price) or price < _MIN_SANE_PRICE: return f"price below sane floor or non-finite price={price!r} (floor={_MIN_SANE_PRICE:g})" if not math.isfinite(lev) or lev <= 0.0: return f"non-finite/non-positive leverage={lev!r}" if not math.isfinite(size) or size <= 0.0: return f"non-finite/non-positive size={size!r}" return None def _exit_intent_from_slot(self, kernel_intent: KernelIntent) -> KernelIntent: """Size an EXIT from the kernel's authoritative slot accounting. The close quantity is the real remaining position size (capped to it), never an externally-computed value — so a malformed policy size can neither strand a position (refuse to close) nor overshoot it. A non-finite policy size falls back to the full remaining size. """ try: slot_size = float(self.kernel.slot(int(kernel_intent.slot_id)).size or 0.0) except Exception: slot_size = 0.0 policy_size = float(getattr(kernel_intent, "target_size", 0.0) or 0.0) policy_ok = math.isfinite(policy_size) and policy_size > 0.0 if slot_size > 0.0: # Authoritative remaining size known: cap the close to it (and fall # back to the full remaining if the policy size is malformed). exit_size = min(policy_size, slot_size) if policy_ok else slot_size else: # Kernel reports no/unknown remaining size: trust the policy size # (the kernel rejects NO_OPEN_POSITION if there is genuinely none). exit_size = policy_size if policy_ok else 0.0 return replace(kernel_intent, target_size=exit_size) async def step(self, snapshot: MarketSnapshot) -> Decision: """Single policy + execution cycle. 0. Pump late (async) venue fills into the kernel (LIMIT/partial settle) 1. Update market state 2. Decide (policy layer) 3. Plan (intent layer) 4. Translate to KernelIntent -> kernel.process_intent() 5. Read final slot + account state from kernel 6. Persist """ market_state = self._update_market_state_runtime(snapshot) # Drain any late fills BEFORE the policy reads slot/account state, so a # resting LIMIT that filled since the last cycle is reflected. await self.pump_venue_events(snapshot, market_state=market_state) acc = self.kernel.snapshot()["account"] slot_view = self.kernel.slot(0) if self.kernel.max_slots > 0 else None slot_dict = slot_view.to_dict() if slot_view is not None else {} is_open = slot_dict and slot_dict.get("size", 0) > 0 and not slot_dict.get("closed", False) # Convert the kernel slot dict into a TradePosition for the legacy # decision/intent engines. legacy_position = None if is_open: from prod.clean_arch.dita import TradePosition, TradeSide as LS legacy_position = TradePosition( trade_id=slot_dict.get("trade_id", ""), asset=slot_dict.get("asset", ""), side=LS.SHORT if slot_dict.get("side", "").upper() in ("SHORT", "SELL") else LS.LONG, entry_price=float(slot_dict.get("entry_price", 0.0)), entry_time=datetime.now(timezone.utc), size=float(slot_dict.get("size", 0.0)), leverage=float(slot_dict.get("leverage", 1.0)), entry_velocity_divergence=float(slot_dict.get("entry_velocity_divergence", 0.0)), entry_irp_alignment=float(slot_dict.get("entry_irp_alignment", 0.0)), current_price=float(slot_dict.get("entry_price", 0.0)), initial_size=float(slot_dict.get("initial_size", 0.0)), exit_leg_ratios=tuple(slot_dict.get("exit_leg_ratios", [1.0])), closed=False, ) context = DecisionContext( capital=float(acc.get("capital", 0.0)), open_positions=int(acc.get("open_positions", 0)), trade_seq=int(acc.get("trade_seq", 0)), ) decision = self.decision_engine.decide(snapshot, context, legacy_position) self._emit("decision", decision=decision) intent_context = IntentContext( capital=context.capital, open_positions=context.open_positions, trade_seq=context.trade_seq, ) plan = self.intent_engine.plan(decision, intent_context, legacy_position) intent = plan.intent if decision.action in {DecisionAction.ENTER, DecisionAction.EXIT}: kernel_intent = _decision_to_kernel_intent(decision, intent, slot_id=0) if decision.action == DecisionAction.ENTER: # Source guard: notional (capital×fraction×leverage) is self- # limiting, so a non-finite size can only come from corrupt raw # inputs — a non-finite capital, or a price below the industry # floor that overflows size = notional/price. A corrupt sizing # input is an untrustworthy signal: do NOT open (exits are never # suppressed — they size from slot accounting below). unsafe = self._unsafe_entry_reason(kernel_intent, context) if unsafe is not None: self.logger.error( "ENTER suppressed (%s): price=%r capital=%r size=%r leverage=%r " "floor=%g asset=%s", unsafe, getattr(kernel_intent, "reference_price", None), context.capital, getattr(kernel_intent, "target_size", None), getattr(kernel_intent, "leverage", None), _MIN_SANE_PRICE, intent.asset, ) sp = float(getattr(snapshot, "price", 0.0) or 0.0) if math.isfinite(sp) and sp >= _MIN_SANE_PRICE: self.kernel.mark_price(snapshot.symbol, sp) slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} acc = self.kernel.snapshot()["account"] if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=None, slot_dict=slot_dict, acc_dict=acc, phase="entry_suppressed", market_state=market_state, ) return decision else: # EXIT: size the close from the kernel's authoritative slot # accounting so a malformed policy size can never strand or # overshoot an open position. kernel_intent = self._exit_intent_from_slot(kernel_intent) outcome = self.kernel.process_intent(kernel_intent) # Locate the source of any non-finite intent the kernel rejected: # log the full upstream provenance (snapshot price, account capital, # leverage, sizing) so a numerical error can be traced to its origin # rather than silently rejected. if outcome.diagnostic_code == KernelDiagnosticCode.INVALID_INTENT: self.logger.error( "INVALID_INTENT rejected by kernel: %s | provenance: " "snapshot.price=%r capital=%r open_positions=%r leverage=%r " "target_size=%r reference_price=%r limit_price=%r action=%s asset=%s", dict(outcome.details or {}), getattr(snapshot, "price", None), context.capital, context.open_positions, getattr(kernel_intent, "leverage", None), getattr(kernel_intent, "target_size", None), getattr(kernel_intent, "reference_price", None), getattr(kernel_intent, "limit_price", None), decision.action.value, intent.asset, ) # Read authoritative final state from kernel. final_slot = self.kernel.slot(0) slot_dict = final_slot.to_dict() acc = self.kernel.snapshot()["account"] self._emit( "execution", decision=decision, intent=intent, outcome_code=outcome.diagnostic_code.value, ) if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=outcome, slot_dict=slot_dict, acc_dict=acc, phase="execution", market_state=market_state, ) else: # HOLD / no-op: update mark price in kernel. if snapshot.price and snapshot.price > 0: self.kernel.mark_price(snapshot.symbol, snapshot.price) slot_dict = self.kernel.slot(0).to_dict() if self.kernel.max_slots > 0 else {} acc = self.kernel.snapshot()["account"] if self.persistence is not None: self.persistence.persist_step( snapshot=snapshot, decision=decision, intent=intent, outcome=None, slot_dict=slot_dict, acc_dict=acc, phase="decision", market_state=market_state, ) return decision async def recover( self, snapshot: MarketSnapshot | None = None ) -> dict[str, Any]: """Full recovery — reconcile exchange state into kernel and reseed capital.""" return await self.recover_account( snapshot=snapshot, phase="recovery", event_type="RECOVERY" ) async def recover_account( self, *, snapshot: MarketSnapshot | None = None, phase: str = "recovery", event_type: str = "RECOVERY", ) -> dict[str, Any]: """Reconcile exchange state, reseed capital, and persist recovery row. The kernel's VenueAdapter is sync — all async bridging is handled internally by ``_run()``. We seed capital from the kernel's existing value (which was set at startup) rather than re-polling the exchange. """ capital = float(self.kernel.account.snapshot.capital or 25000.0) _reconcile_position_slot(self.kernel, capital, slot_id=0) acc = self.kernel.snapshot()["account"] if self.persistence is not None: persist_snapshot = snapshot if persist_snapshot is None: persist_snapshot = SimpleNamespace( timestamp=datetime.now(timezone.utc), symbol="" ) market_state = {} if snapshot is not None: market_state = self._update_market_state_runtime(snapshot) self.persistence.persist_recovery_state( snapshot=persist_snapshot, acc_dict=acc, phase=phase, event_type=event_type, market_state=market_state, ) return acc async def reconcile_account( self, snapshot: MarketSnapshot | None = None ) -> dict[str, Any]: """Periodic exchange-led account sync. Tags the recovery path as a scheduled reconciliation. Capital is re-seeded from the exchange balance as a guard against long-running drift, but the primary capital authority remains kernel.settle(). """ return await self.recover_account( snapshot=snapshot, phase="account_reconcile", event_type="ACCOUNT_RECONCILE", )