# Repository file-viewer header (not part of the module):
# siloqy/prod/clean_arch/dita/decision.py -- 210 lines, 8.5 KiB, Python

"""Pure decision engine."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from prod.clean_arch.tp_curve import compute_our_leverage, compute_soft_tp_pct
from prod.clean_arch.ports.data_feed import MarketSnapshot
from .contracts import Decision, DecisionAction, DecisionConfig, DecisionContext, TradePosition, TradeSide, TradeStage
@dataclass(frozen=True)
class _SnapshotFields:
    """Immutable bundle of the snapshot values the decision engine reads.

    Instances are built by ``DecisionEngine._extract`` and passed to the
    entry/exit/hold helpers so each of them works from the same
    already-coerced numbers.
    """

    # Snapshot price coerced to float (0.0 when the snapshot value is falsy).
    price: float
    # Snapshot velocity divergence coerced to float (0.0 when falsy).
    vdiv: float
    # Snapshot IRP alignment coerced to float (0.0 when falsy).
    irp: float
    # Timestamp stamped onto every Decision produced from this snapshot.
    ts: datetime
class DecisionEngine:
    """BLUE-compatible decision engine.

    Decision only answers whether the system should enter/hold/exit.
    It does not size orders or own exchange state.
    """

    def __init__(self, config: Optional[DecisionConfig] = None):
        """Store *config*, falling back to a default ``DecisionConfig()``."""
        self.config = config or DecisionConfig()

    def decide(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        position: Optional[TradePosition] = None,
    ) -> Decision:
        """Return the decision for one market snapshot.

        Args:
            snapshot: Current market snapshot.
            context: Caller-supplied context (capital, open position count,
                trade sequence number).
            position: The currently tracked position, if any.

        Returns:
            A HOLD with reason ``INVALID_SNAPSHOT`` when the snapshot fails
            validation or carries a non-positive / non-finite value; an
            exit-side decision when *position* is present and not closed;
            otherwise an entry-side decision.
        """
        fields = self._extract(snapshot)
        if (
            not snapshot.is_valid()
            or fields.price <= 0
            or not self._finite(fields.price)
            or not self._finite(fields.vdiv)
            or not self._finite(fields.irp)
        ):
            # Same payload as every other flat HOLD, so reuse the helper.
            return self._hold(snapshot, context, fields, reason="INVALID_SNAPSHOT")
        if position is not None and not position.closed:
            return self._decide_exit(snapshot, position, context, fields)
        return self._decide_entry(snapshot, context, fields)

    def _decide_entry(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        fields: _SnapshotFields,
    ) -> Decision:
        """Evaluate the entry gates and build an ENTER/SHORT decision if all pass.

        Gates are checked in order; the first failing gate yields a HOLD
        whose reason names it (``CAPACITY_FULL``, ``SHORT_DISABLED``,
        ``NO_SIGNAL``, ``VOL_GATE``).
        """
        cfg = self.config
        if context.open_positions >= 1:
            return self._hold(snapshot, context, fields, reason="CAPACITY_FULL")
        if not cfg.allow_short:
            return self._hold(snapshot, context, fields, reason="SHORT_DISABLED")
        if fields.vdiv >= cfg.vel_div_threshold or fields.irp < cfg.min_irp_alignment:
            return self._hold(snapshot, context, fields, reason="NO_SIGNAL")
        # vol_ok gate: block ENTERs only when the scan payload explicitly
        # reports a falsy "vol_ok". A missing payload or a missing key
        # defaults to allowed.
        if snapshot.scan_payload and not snapshot.scan_payload.get("vol_ok", True):
            return self._hold(snapshot, context, fields, reason="VOL_GATE")

        confidence = self._entry_confidence(fields.vdiv, cfg.vel_div_threshold)
        # Leverage is interpolated between 1.0 and max_leverage by confidence,
        # then clamped to that range.
        leverage = min(
            cfg.max_leverage,
            max(1.0, 1.0 + confidence * (cfg.max_leverage - 1.0)),
        )
        target_exposure = context.capital * cfg.capital_fraction * leverage
        target_size = target_exposure / fields.price if fields.price > 0 else 0.0
        our_leverage = compute_our_leverage(notional=target_exposure, capital=context.capital)
        tp_base_pct = float(cfg.fixed_tp_pct)
        tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage)
        return Decision(
            timestamp=fields.ts,
            decision_id=self._decision_id(snapshot.symbol, context.trade_seq),
            asset=snapshot.symbol,
            action=DecisionAction.ENTER,
            side=TradeSide.SHORT,
            reason="STRUCTURAL_DISLOCATION",
            confidence=confidence,
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=target_size,
            leverage=leverage,
            metadata=self._tp_metadata(tp_base_pct, tp_effective_pct, our_leverage),
        )

    def _decide_exit(
        self,
        snapshot: MarketSnapshot,
        position: TradePosition,
        context: DecisionContext,
        fields: _SnapshotFields,
    ) -> Decision:
        """Decide whether an open *position* should be exited on this snapshot.

        Returns an EXIT decision carrying the first matching exit reason, or
        a HOLD (reason ``"HOLD"``) when no exit rule fires. The decision
        reuses the position's trade id, asset, side, size and leverage.
        """
        # Mark the position at the current price; fall back to the entry
        # price if the current price is not positive.
        if fields.price > 0:
            position_notional = position.size * fields.price
        else:
            position_notional = position.size * position.entry_price
        our_leverage = compute_our_leverage(notional=position_notional, capital=context.capital)
        tp_base_pct = float(self.config.fixed_tp_pct)
        tp_effective_pct = compute_soft_tp_pct(tp_base_pct, our_leverage)

        exit_reason = self._exit_reason(position, fields, tp_effective_pct)
        if exit_reason is None:
            action = DecisionAction.HOLD
            reason = "HOLD"
            stage = TradeStage.POSITION_UPDATED
        else:
            action = DecisionAction.EXIT
            reason = exit_reason
            stage = TradeStage.EXIT_REQUESTED

        return Decision(
            timestamp=fields.ts,
            decision_id=position.trade_id,
            asset=position.asset,
            action=action,
            side=position.side,
            reason=reason,
            confidence=max(0.0, min(1.0, position.entry_irp_alignment)),
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=position.size,
            leverage=position.leverage,
            bars_held=position.bars_held,
            stage=stage,
            metadata=self._tp_metadata(tp_base_pct, tp_effective_pct, our_leverage),
        )

    def _exit_reason(
        self,
        position: TradePosition,
        fields: _SnapshotFields,
        tp_effective_pct: float,
    ) -> Optional[str]:
        """Return the first exit reason that applies to *position*, else None.

        Rules are checked in priority order: take-profit, catastrophic loss
        (loss percentage divided by the position leverage, floored at 1.0),
        maximum hold time, then velocity divergence crossing zero against
        the position side. A position that is neither SHORT nor LONG never
        triggers an exit here.
        """
        cfg = self.config
        entry = position.entry_price
        if position.side == TradeSide.SHORT:
            if fields.price <= entry * (1.0 - tp_effective_pct):
                return "TAKE_PROFIT"
            if fields.price >= entry * (1.0 + (cfg.catastrophic_loss_pct / max(position.leverage, 1.0))):
                return "CATASTROPHIC_LOSS"
            if position.bars_held >= cfg.max_hold_bars:
                return "MAX_HOLD"
            if fields.vdiv >= 0.0:
                return "MEAN_REVERSION"
        elif position.side == TradeSide.LONG:
            if fields.price >= entry * (1.0 + tp_effective_pct):
                return "TAKE_PROFIT"
            if fields.price <= entry * (1.0 - (cfg.catastrophic_loss_pct / max(position.leverage, 1.0))):
                return "CATASTROPHIC_LOSS"
            if position.bars_held >= cfg.max_hold_bars:
                return "MAX_HOLD"
            if fields.vdiv <= 0.0:
                return "MEAN_REVERSION"
        return None

    def _hold(
        self,
        snapshot: MarketSnapshot,
        context: DecisionContext,
        fields: _SnapshotFields,
        reason: str,
    ) -> Decision:
        """Build a flat HOLD decision (zero size, leverage 1.0) with *reason*."""
        return Decision(
            timestamp=fields.ts,
            decision_id=self._decision_id(snapshot.symbol, context.trade_seq),
            asset=snapshot.symbol,
            action=DecisionAction.HOLD,
            side=TradeSide.FLAT,
            reason=reason,
            confidence=0.0,
            velocity_divergence=fields.vdiv,
            irp_alignment=fields.irp,
            reference_price=fields.price,
            target_size=0.0,
            leverage=1.0,
            metadata={"policy_version": self.config.policy_version},
        )

    def _tp_metadata(self, tp_base_pct: float, tp_effective_pct: float, our_leverage: float) -> dict:
        """Return the take-profit metadata attached to entry and exit decisions."""
        return {
            "policy_version": self.config.policy_version,
            "tp_base_pct": tp_base_pct,
            "tp_effective_pct": tp_effective_pct,
            "our_leverage": our_leverage,
            "tp_curve": "soft_leverage_curve_v1",
        }

    @staticmethod
    def _entry_confidence(vdiv: float, threshold: float) -> float:
        """Return ``abs(vdiv / threshold)`` clamped to ``[0.05, 1.0]``.

        A zero *threshold* would make the ratio undefined (and previously
        raised ``ZeroDivisionError``). Any non-zero divergence is then
        treated as maximally beyond the threshold (1.0), and a zero
        divergence gets the floor value (0.05).

        NOTE(review): on the entry path ``vdiv < threshold`` always holds, so
        with a negative threshold this ratio exceeds 1 and the result looks
        like it is always 1.0 -- confirm that saturation is intended.
        """
        if threshold == 0:
            return 1.0 if vdiv != 0 else 0.05
        return min(1.0, max(0.05, abs(vdiv / threshold)))

    @staticmethod
    def _extract(snapshot: MarketSnapshot) -> _SnapshotFields:
        """Coerce the snapshot values used by the engine into ``_SnapshotFields``.

        Falsy price / velocity divergence / IRP alignment values become 0.0.
        A timestamp that is not a ``datetime`` is replaced by the current
        UTC time.
        """
        # NOTE(review): datetime.utcnow() returns a naive datetime and is
        # deprecated since Python 3.12. Switching to an aware timestamp would
        # change what downstream consumers receive, so it is left as-is here.
        ts = snapshot.timestamp if isinstance(snapshot.timestamp, datetime) else datetime.utcnow()
        return _SnapshotFields(
            price=float(snapshot.price or 0.0),
            vdiv=float(snapshot.velocity_divergence or 0.0),
            irp=float(snapshot.irp_alignment or 0.0),
            ts=ts,
        )

    @staticmethod
    def _decision_id(symbol: str, seq: int) -> str:
        """Format a decision id as ``<symbol>-D-<seq zero-padded to 12 digits>``."""
        return f"{symbol}-D-{seq:012d}"

    @staticmethod
    def _finite(value: float) -> bool:
        """Return True when *value* is neither NaN nor +/- infinity."""
        # NaN is the only float that is not equal to itself.
        return value == value and value not in (float("inf"), float("-inf"))