"""Pure decision engine."""

from __future__ import annotations

import math
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

from prod.clean_arch.tp_curve import compute_our_leverage, compute_soft_tp_pct
from prod.clean_arch.ports.data_feed import MarketSnapshot

from .contracts import (
    Decision,
    DecisionAction,
    DecisionConfig,
    DecisionContext,
    TradePosition,
    TradeSide,
    TradeStage,
)


@dataclass(frozen=True)
class _SnapshotFields:
    """Numeric fields pulled out of a ``MarketSnapshot`` once per decision."""

    price: float  # float(snapshot.price or 0.0)
    vdiv: float  # float(snapshot.velocity_divergence or 0.0)
    irp: float  # float(snapshot.irp_alignment or 0.0)
    ts: datetime  # snapshot timestamp, or the current UTC time as a fallback


class DecisionEngine:
    """BLUE-compatible decision engine.

    Decision only answers whether the system should enter/hold/exit. It does
    not own exchange state; ENTER decisions carry a proposed ``target_size``
    and ``leverage`` derived from ``DecisionContext.capital`` and the config.
    """

    def __init__(self, config: Optional[DecisionConfig] = None):
        """Store ``config``; a default ``DecisionConfig()`` is used if falsy."""
        self.config = config or DecisionConfig()

    def decide(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        position: Optional[TradePosition] = None,
    ) -> Decision:
        """Return the decision for one snapshot.

        Args:
            snapshot: Market data for the asset being evaluated.
            context: Capital, open-position count and trade sequence number.
            position: The currently tracked position, if any.

        Returns:
            A HOLD/FLAT decision with reason ``INVALID_SNAPSHOT`` when the
            snapshot reports itself invalid, the price is not positive, or
            price / velocity divergence / IRP alignment is NaN or infinite.
            Otherwise an exit evaluation when ``position`` is given and not
            closed, else an entry evaluation.
        """
        fields = self._extract(snapshot)
        if (
            not snapshot.is_valid()
            or fields.price <= 0
            or not self._finite(fields.price)
            or not self._finite(fields.vdiv)
            or not self._finite(fields.irp)
        ):
            return self._hold(snapshot, context, fields, reason="INVALID_SNAPSHOT")

        if position is not None and not position.closed:
            return self._decide_exit(snapshot, position, context, fields)
        return self._decide_entry(snapshot, context, fields)

    def _decide_entry(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        fields: _SnapshotFields,
    ) -> Decision:
        """Evaluate the entry gates in order and emit ENTER/SHORT or HOLD.

        Gates, first match wins: ``CAPACITY_FULL`` (any open position),
        ``SHORT_DISABLED``, ``NO_SIGNAL`` (vdiv not below the threshold or
        IRP alignment below the minimum), ``VOL_GATE``.
        """
        if context.open_positions >= 1:
            return self._hold(snapshot, context, fields, reason="CAPACITY_FULL")
        if not self.config.allow_short:
            return self._hold(snapshot, context, fields, reason="SHORT_DISABLED")

        threshold = self.config.vel_div_threshold
        if fields.vdiv >= threshold or fields.irp < self.config.min_irp_alignment:
            return self._hold(snapshot, context, fields, reason="NO_SIGNAL")

        # vol_ok gate: only an explicit falsy ``vol_ok`` in a non-empty scan
        # payload blocks the entry. A missing payload or a missing key does
        # not block (the lookup defaults to True).
        payload = snapshot.scan_payload
        if payload and not payload.get("vol_ok", True):
            return self._hold(snapshot, context, fields, reason="VOL_GATE")

        confidence = self._entry_confidence(fields.vdiv, threshold)
        max_leverage = self.config.max_leverage
        # Interpolate between 1x and max_leverage by confidence, clamped to
        # [1.0, max_leverage] (the outer min wins if max_leverage < 1).
        leverage = min(max_leverage, max(1.0, 1.0 + confidence * (max_leverage - 1.0)))
        target_exposure = context.capital * self.config.capital_fraction * leverage
        target_size = target_exposure / fields.price if fields.price > 0 else 0.0

        our_leverage = compute_our_leverage(notional=target_exposure, capital=context.capital)
        tp_base_pct = float(self.config.fixed_tp_pct)
        tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage)

        return Decision(
            timestamp=fields.ts,
            decision_id=self._decision_id(snapshot.symbol, context.trade_seq),
            asset=snapshot.symbol,
            action=DecisionAction.ENTER,
            side=TradeSide.SHORT,
            reason="STRUCTURAL_DISLOCATION",
            confidence=confidence,
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=target_size,
            leverage=leverage,
            metadata=self._tp_metadata(tp_base_pct, tp_effective_pct, our_leverage),
        )

    def _decide_exit(
        self,
        snapshot: MarketSnapshot,
        position: TradePosition,
        context: DecisionContext,
        fields: _SnapshotFields,
    ) -> Decision:
        """Emit EXIT (with the triggering reason) or HOLD for an open position.

        The decision reuses the position's ``trade_id`` as ``decision_id`` and
        echoes the position's asset, side, size, leverage and bars held.
        """
        # Mark the position at the current price; the entry-price fallback
        # only applies if a non-positive price reaches this method.
        if fields.price > 0:
            position_notional = position.size * fields.price
        else:
            position_notional = position.size * position.entry_price
        our_leverage = compute_our_leverage(notional=position_notional, capital=context.capital)
        tp_base_pct = float(self.config.fixed_tp_pct)
        tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage)

        exit_reason = self._exit_reason(position, fields, tp_effective_pct)
        if exit_reason is None:
            action, reason = DecisionAction.HOLD, "HOLD"
        else:
            action, reason = DecisionAction.EXIT, exit_reason

        if action == DecisionAction.EXIT:
            stage = TradeStage.EXIT_REQUESTED
        else:
            stage = TradeStage.POSITION_UPDATED

        return Decision(
            timestamp=fields.ts,
            decision_id=position.trade_id,
            asset=position.asset,
            action=action,
            side=position.side,
            reason=reason,
            confidence=max(0.0, min(1.0, position.entry_irp_alignment)),
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=position.size,
            leverage=position.leverage,
            bars_held=position.bars_held,
            stage=stage,
            metadata=self._tp_metadata(tp_base_pct, tp_effective_pct, our_leverage),
        )

    def _exit_reason(
        self,
        position: TradePosition,
        fields: _SnapshotFields,
        tp_effective_pct: float,
    ) -> Optional[str]:
        """Return the first exit rule that fires, or ``None`` to keep holding.

        Rule priority for both sides: take profit, catastrophic loss, maximum
        hold time, mean reversion. The LONG rules mirror the SHORT rules. A
        position whose side is neither SHORT nor LONG never exits here.
        """
        if position.side == TradeSide.SHORT:
            if fields.price <= position.entry_price * (1.0 - tp_effective_pct):
                return "TAKE_PROFIT"
            # The adverse-move limit is divided by leverage (floored at 1x).
            if fields.price >= position.entry_price * (
                1.0 + (self.config.catastrophic_loss_pct / max(position.leverage, 1.0))
            ):
                return "CATASTROPHIC_LOSS"
            if position.bars_held >= self.config.max_hold_bars:
                return "MAX_HOLD"
            if fields.vdiv >= 0.0:
                return "MEAN_REVERSION"
            return None

        if position.side == TradeSide.LONG:
            if fields.price >= position.entry_price * (1.0 + tp_effective_pct):
                return "TAKE_PROFIT"
            if fields.price <= position.entry_price * (
                1.0 - (self.config.catastrophic_loss_pct / max(position.leverage, 1.0))
            ):
                return "CATASTROPHIC_LOSS"
            if position.bars_held >= self.config.max_hold_bars:
                return "MAX_HOLD"
            if fields.vdiv <= 0.0:
                return "MEAN_REVERSION"
            return None

        return None

    def _tp_metadata(
        self,
        tp_base_pct: float,
        tp_effective_pct: float,
        our_leverage: float,
    ) -> dict:
        """Build the take-profit metadata attached to ENTER and exit decisions."""
        return {
            "policy_version": self.config.policy_version,
            "tp_base_pct": tp_base_pct,
            "tp_effective_pct": tp_effective_pct,
            "our_leverage": our_leverage,
            "tp_curve": "soft_leverage_curve_v1",
        }

    def _hold(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        fields: _SnapshotFields,
        reason: str,
    ) -> Decision:
        """Build a flat HOLD decision (zero size, 1x leverage) with ``reason``."""
        return Decision(
            timestamp=fields.ts,
            decision_id=self._decision_id(snapshot.symbol, context.trade_seq),
            asset=snapshot.symbol,
            action=DecisionAction.HOLD,
            side=TradeSide.FLAT,
            reason=reason,
            confidence=0.0,
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=0.0,
            leverage=1.0,
            metadata={"policy_version": self.config.policy_version},
        )

    @staticmethod
    def _entry_confidence(vdiv: float, threshold: float) -> float:
        """Return ``|vdiv / threshold|`` clamped to ``[0.05, 1.0]``.

        A zero threshold would divide by zero. Entry is only reached with
        ``vdiv`` strictly below the threshold, so the ratio is unbounded
        there and the clamped result is 1.0.
        """
        if threshold == 0:
            return 1.0
        return min(1.0, max(0.05, abs(vdiv / threshold)))

    @staticmethod
    def _extract(snapshot: MarketSnapshot) -> _SnapshotFields:
        """Coerce the snapshot's numeric fields to floats.

        Falsy values (``None``, 0) become 0.0. A timestamp that is not a
        ``datetime`` is replaced with the current UTC time as a naive
        ``datetime``.
        """
        if isinstance(snapshot.timestamp, datetime):
            ts = snapshot.timestamp
        else:
            # Same naive-UTC value as the deprecated datetime.utcnow().
            ts = datetime.now(timezone.utc).replace(tzinfo=None)
        return _SnapshotFields(
            price=float(snapshot.price or 0.0),
            vdiv=float(snapshot.velocity_divergence or 0.0),
            irp=float(snapshot.irp_alignment or 0.0),
            ts=ts,
        )

    @staticmethod
    def _decision_id(symbol: str, seq: int) -> str:
        """Format a decision id as ``<symbol>-D-<seq zero-padded to 12 digits>``."""
        return f"{symbol}-D-{seq:012d}"

    @staticmethod
    def _finite(value: float) -> bool:
        """Return True when ``value`` is neither NaN nor +/- infinity."""
        return math.isfinite(value)