"""Deterministic post-win LONG overlay EFSM. This module does not place orders. It tags future entries after realized BLUE SHORT exhaustion wins so the live/shadow caller can decide whether to flip the next one or more SHORT-engine opportunities to LONG. EFSM means Execution FSM. The EFSM is deliberately slot-based: - a trigger arms N future slots - each future entry consumes exactly one slot - when slots reach zero, state resets to SHORT - flipped LONG trades do not re-arm the overlay - triggers observed while an arm is active are ignored unless explicitly enabled by config That prevents the bug class where a one/two-trade rebound probe becomes a self-extending regime switch. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Any, Mapping, Optional, Sequence def _to_float(value: Any, default: float = 0.0) -> float: try: out = float(value) except (TypeError, ValueError): return default return out if out == out else default def _to_utc(ts: datetime | None) -> datetime | None: if ts is None: return None if ts.tzinfo is None: return ts.replace(tzinfo=timezone.utc) return ts.astimezone(timezone.utc) @dataclass(frozen=True) class PostWinFlipTrigger: """A configurable trigger that arms future LONG flip slots.""" name: str slots: int min_pnl_abs: float = 0.0 max_pnl_abs: Optional[float] = None min_pnl_pct: Optional[float] = None min_leverage: Optional[float] = None strict_min_pnl_abs: bool = True strict_max_pnl_abs: bool = True strict_min_leverage: bool = True def matches(self, *, pnl: float, pnl_pct: float, leverage: float) -> bool: if self.slots <= 0: return False if self.strict_min_pnl_abs: if not pnl > self.min_pnl_abs: return False elif pnl < self.min_pnl_abs: return False if self.max_pnl_abs is not None: if self.strict_max_pnl_abs: if not pnl < self.max_pnl_abs: return False elif pnl > self.max_pnl_abs: return False if self.min_pnl_pct is not None and pnl_pct < self.min_pnl_pct: return False if self.min_leverage is not None: if self.strict_min_leverage: if not leverage > self.min_leverage: return False elif leverage < self.min_leverage: return False return True @dataclass(frozen=True) class PostWinExecutionFSMConfig: """Configuration for the BLUE post-win Execution FSM.""" enabled: bool = True rules: Sequence[PostWinFlipTrigger] = field( default_factory=lambda: ( # Order matters: the high-leverage big-win rule must win before the # generic big-win rule, otherwise it would be capped at one slot. PostWinFlipTrigger( name="big_win_high_lev", slots=2, min_pnl_abs=397.0, min_leverage=8.6, strict_min_pnl_abs=True, strict_min_leverage=True, ), PostWinFlipTrigger( name="big_win", slots=1, min_pnl_abs=397.0, strict_min_pnl_abs=True, ), PostWinFlipTrigger( name="small_dollar_high_return", slots=1, min_pnl_abs=0.0, max_pnl_abs=250.0, min_pnl_pct=0.0075, strict_min_pnl_abs=True, strict_max_pnl_abs=True, ), ) ) max_arm_age_sec: Optional[float] = None allow_rearm_while_armed: bool = False allow_triggers_from_overlay_flips: bool = False @dataclass(frozen=True) class ActiveFlipArm: """Currently armed future LONG flip slots.""" arm_id: int trigger_name: str slots_total: int slots_remaining: int trigger_trade_id: str = "" trigger_asset: str = "" trigger_ts: datetime | None = None trigger_pnl: float = 0.0 trigger_pnl_pct: float = 0.0 trigger_leverage: float = 0.0 def with_remaining(self, slots_remaining: int) -> "ActiveFlipArm": return ActiveFlipArm( arm_id=self.arm_id, trigger_name=self.trigger_name, slots_total=self.slots_total, slots_remaining=max(0, int(slots_remaining)), trigger_trade_id=self.trigger_trade_id, trigger_asset=self.trigger_asset, trigger_ts=self.trigger_ts, trigger_pnl=self.trigger_pnl, trigger_pnl_pct=self.trigger_pnl_pct, trigger_leverage=self.trigger_leverage, ) def to_dict(self) -> dict[str, Any]: return { "arm_id": self.arm_id, "trigger_name": self.trigger_name, "slots_total": self.slots_total, "slots_remaining": self.slots_remaining, "trigger_trade_id": self.trigger_trade_id, "trigger_asset": self.trigger_asset, "trigger_ts": self.trigger_ts.isoformat() if self.trigger_ts else None, "trigger_pnl": self.trigger_pnl, "trigger_pnl_pct": self.trigger_pnl_pct, "trigger_leverage": self.trigger_leverage, } @dataclass(frozen=True) class OverlayDecision: """Result returned by observe/entry tagging calls.""" action: str side: str = "SHORT" reason: str = "" arm: ActiveFlipArm | None = None consumed_slot: int = 0 reset: bool = False def to_dict(self) -> dict[str, Any]: return { "action": self.action, "side": self.side, "reason": self.reason, "arm": self.arm.to_dict() if self.arm else None, "consumed_slot": self.consumed_slot, "reset": self.reset, } class PostWinExecutionFSM: """Multi-slot post-win LONG tag Execution FSM.""" def __init__(self, config: PostWinExecutionFSMConfig | None = None) -> None: self.config = config or PostWinExecutionFSMConfig() self._arm: ActiveFlipArm | None = None self._next_arm_id = 1 self.ignored_rearm_attempts = 0 self.ignored_overlay_flip_triggers = 0 self.expired_arms = 0 self.consumed_arms = 0 @property def active_arm(self) -> ActiveFlipArm | None: return self._arm @property def pending_slots(self) -> int: return int(self._arm.slots_remaining) if self._arm else 0 def reset(self, reason: str = "manual") -> OverlayDecision: old = self._arm self._arm = None return OverlayDecision(action="RESET", reason=reason, arm=old, reset=True) def observe_closed_trade( self, *, trade_id: str = "", asset: str = "", side: str = "SHORT", pnl: float = 0.0, pnl_pct: float = 0.0, leverage: float = 0.0, closed_ts: datetime | None = None, was_overlay_flip: bool = False, metadata: Mapping[str, Any] | None = None, ) -> OverlayDecision: """Observe a completed trade and possibly arm future LONG slots. Parameters are intentionally primitive so this can be called from live code, replay code, or ClickHouse/log readers. """ del metadata # reserved for future feature logging without API churn self._expire_if_needed(_to_utc(closed_ts)) if not self.config.enabled: return OverlayDecision(action="NOOP", reason="disabled", arm=self._arm) side_u = str(side or "SHORT").upper() if was_overlay_flip or side_u == "LONG": self.ignored_overlay_flip_triggers += 1 return OverlayDecision(action="IGNORED", reason="overlay_flip_outcome", arm=self._arm) pnl_f = _to_float(pnl) pnl_pct_f = _to_float(pnl_pct) lev_f = _to_float(leverage) rule = self._match_rule(pnl=pnl_f, pnl_pct=pnl_pct_f, leverage=lev_f) if rule is None: return OverlayDecision(action="NO_TRIGGER", reason="no_rule_match", arm=self._arm) if self._arm is not None and not self.config.allow_rearm_while_armed: self.ignored_rearm_attempts += 1 return OverlayDecision(action="IGNORED", reason="active_arm_no_rearm", arm=self._arm) arm = ActiveFlipArm( arm_id=self._next_arm_id, trigger_name=rule.name, slots_total=int(rule.slots), slots_remaining=int(rule.slots), trigger_trade_id=str(trade_id or ""), trigger_asset=str(asset or ""), trigger_ts=_to_utc(closed_ts), trigger_pnl=pnl_f, trigger_pnl_pct=pnl_pct_f, trigger_leverage=lev_f, ) self._next_arm_id += 1 self._arm = arm return OverlayDecision(action="ARMED", reason=rule.name, arm=arm) def tag_next_entry( self, *, asset: str = "", entry_ts: datetime | None = None, metadata: Mapping[str, Any] | None = None, ) -> OverlayDecision: """Return the side tag for the next engine entry and consume one slot.""" del asset, metadata # reserved for future asset-specific slot routing self._expire_if_needed(_to_utc(entry_ts)) if self._arm is None or self._arm.slots_remaining <= 0: return OverlayDecision(action="PASS", side="SHORT", reason="no_active_arm") arm_before = self._arm consumed_slot = arm_before.slots_total - arm_before.slots_remaining + 1 remaining = arm_before.slots_remaining - 1 if remaining <= 0: self._arm = None self.consumed_arms += 1 return OverlayDecision( action="TAG", side="LONG", reason=arm_before.trigger_name, arm=arm_before.with_remaining(0), consumed_slot=consumed_slot, reset=True, ) self._arm = arm_before.with_remaining(remaining) return OverlayDecision( action="TAG", side="LONG", reason=arm_before.trigger_name, arm=self._arm, consumed_slot=consumed_slot, reset=False, ) def snapshot(self) -> dict[str, Any]: return { "enabled": self.config.enabled, "active_arm": self._arm.to_dict() if self._arm else None, "pending_slots": self.pending_slots, "ignored_rearm_attempts": self.ignored_rearm_attempts, "ignored_overlay_flip_triggers": self.ignored_overlay_flip_triggers, "expired_arms": self.expired_arms, "consumed_arms": self.consumed_arms, } def _match_rule(self, *, pnl: float, pnl_pct: float, leverage: float) -> PostWinFlipTrigger | None: for rule in self.config.rules: if rule.matches(pnl=pnl, pnl_pct=pnl_pct, leverage=leverage): return rule return None def _expire_if_needed(self, now: datetime | None) -> None: if self._arm is None: return if self.config.max_arm_age_sec is None: return if now is None or self._arm.trigger_ts is None: return age = (now - self._arm.trigger_ts).total_seconds() if age > float(self.config.max_arm_age_sec): self._arm = None self.expired_arms += 1 # Compatibility aliases for earlier research scripts and tests. PostWinLongOverlayConfig = PostWinExecutionFSMConfig PostWinLongOverlay = PostWinExecutionFSM __all__ = [ "ActiveFlipArm", "OverlayDecision", "PostWinExecutionFSM", "PostWinExecutionFSMConfig", "PostWinFlipTrigger", "PostWinLongOverlay", "PostWinLongOverlayConfig", ]