352 lines
12 KiB
Python
352 lines
12 KiB
Python
|
|
"""Deterministic post-win LONG overlay EFSM.

This module does not place orders. It tags future entries after realized BLUE
SHORT exhaustion wins so the live/shadow caller can decide whether to flip the
next one or more SHORT-engine opportunities to LONG.

EFSM means Execution FSM. The EFSM is deliberately slot-based:

- a trigger arms N future slots
- each future entry consumes exactly one slot
- when slots reach zero, state resets to SHORT
- flipped LONG trades do not re-arm the overlay
- triggers observed while an arm is active are ignored unless explicitly
  enabled by config

That prevents the bug class where a one/two-trade rebound probe becomes a
self-extending regime switch.
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
from dataclasses import dataclass, field
|
||
|
|
from datetime import datetime, timezone
|
||
|
|
from typing import Any, Mapping, Optional, Sequence
|
||
|
|
|
||
|
|
|
||
|
|
def _to_float(value: Any, default: float = 0.0) -> float:
|
||
|
|
try:
|
||
|
|
out = float(value)
|
||
|
|
except (TypeError, ValueError):
|
||
|
|
return default
|
||
|
|
return out if out == out else default
|
||
|
|
|
||
|
|
|
||
|
|
def _to_utc(ts: datetime | None) -> datetime | None:
|
||
|
|
if ts is None:
|
||
|
|
return None
|
||
|
|
if ts.tzinfo is None:
|
||
|
|
return ts.replace(tzinfo=timezone.utc)
|
||
|
|
return ts.astimezone(timezone.utc)
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class PostWinFlipTrigger:
    """One configurable rule that arms future LONG flip slots.

    A rule matches a closed trade when every bound that is set is satisfied.
    Each ``strict_*`` flag selects an exclusive comparison (``>`` / ``<``)
    when true and an inclusive one when false. The ``min_pnl_pct`` bound is
    always inclusive.
    """

    # Label reported as the reason when this rule arms the overlay.
    name: str
    # Number of future entries to arm; a value <= 0 means the rule never matches.
    slots: int
    # Lower bound on absolute PnL (always applied).
    min_pnl_abs: float = 0.0
    # Optional upper bound on absolute PnL.
    max_pnl_abs: Optional[float] = None
    # Optional inclusive lower bound on fractional PnL.
    min_pnl_pct: Optional[float] = None
    # Optional lower bound on leverage.
    min_leverage: Optional[float] = None
    strict_min_pnl_abs: bool = True
    strict_max_pnl_abs: bool = True
    strict_min_leverage: bool = True

    def matches(self, *, pnl: float, pnl_pct: float, leverage: float) -> bool:
        """Return True when the trade figures satisfy every bound of this rule."""
        if self.slots <= 0:
            return False

        # Absolute PnL floor.
        floor = self.min_pnl_abs
        above_floor = pnl > floor if self.strict_min_pnl_abs else not pnl < floor
        if not above_floor:
            return False

        # Absolute PnL ceiling (optional).
        ceiling = self.max_pnl_abs
        if ceiling is not None:
            below_ceiling = pnl < ceiling if self.strict_max_pnl_abs else not pnl > ceiling
            if not below_ceiling:
                return False

        # Fractional PnL floor (optional, inclusive).
        pct_floor = self.min_pnl_pct
        if pct_floor is not None and pnl_pct < pct_floor:
            return False

        # Leverage floor (optional).
        lev_floor = self.min_leverage
        if lev_floor is not None:
            enough_leverage = (
                leverage > lev_floor if self.strict_min_leverage else not leverage < lev_floor
            )
            if not enough_leverage:
                return False

        return True
|
||
|
|
|
||
|
|
|
||
|
|
def _default_post_win_rules() -> Sequence[PostWinFlipTrigger]:
    """Build the default trigger rules, in evaluation order."""
    # Order matters: the high-leverage big-win rule must win before the
    # generic big-win rule, otherwise it would be capped at one slot.
    big_win_high_lev = PostWinFlipTrigger(
        name="big_win_high_lev",
        slots=2,
        min_pnl_abs=397.0,
        min_leverage=8.6,
        strict_min_pnl_abs=True,
        strict_min_leverage=True,
    )
    big_win = PostWinFlipTrigger(
        name="big_win",
        slots=1,
        min_pnl_abs=397.0,
        strict_min_pnl_abs=True,
    )
    small_dollar_high_return = PostWinFlipTrigger(
        name="small_dollar_high_return",
        slots=1,
        min_pnl_abs=0.0,
        max_pnl_abs=250.0,
        min_pnl_pct=0.0075,
        strict_min_pnl_abs=True,
        strict_max_pnl_abs=True,
    )
    return (big_win_high_lev, big_win, small_dollar_high_return)


@dataclass(frozen=True)
class PostWinExecutionFSMConfig:
    """Settings for the BLUE post-win Execution FSM."""

    # Master switch for the overlay.
    enabled: bool = True
    # Trigger rules, listed in the order they should be evaluated.
    rules: Sequence[PostWinFlipTrigger] = field(default_factory=_default_post_win_rules)
    # Optional maximum arm age in seconds; None means no age limit.
    max_arm_age_sec: Optional[float] = None
    # Whether a new trigger may arm while an arm is already active.
    allow_rearm_while_armed: bool = False
    # Opt-in switch for treating outcomes of overlay-flipped trades as triggers.
    allow_triggers_from_overlay_flips: bool = False
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class ActiveFlipArm:
    """Immutable record of the currently armed future LONG flip slots."""

    arm_id: int
    trigger_name: str
    slots_total: int
    slots_remaining: int
    # Details of the closed trade that armed these slots.
    trigger_trade_id: str = ""
    trigger_asset: str = ""
    trigger_ts: datetime | None = None
    trigger_pnl: float = 0.0
    trigger_pnl_pct: float = 0.0
    trigger_leverage: float = 0.0

    def with_remaining(self, slots_remaining: int) -> "ActiveFlipArm":
        """Return a copy with ``slots_remaining`` replaced (never below zero)."""
        clamped = max(0, int(slots_remaining))
        return ActiveFlipArm(
            arm_id=self.arm_id,
            trigger_name=self.trigger_name,
            slots_total=self.slots_total,
            slots_remaining=clamped,
            trigger_trade_id=self.trigger_trade_id,
            trigger_asset=self.trigger_asset,
            trigger_ts=self.trigger_ts,
            trigger_pnl=self.trigger_pnl,
            trigger_pnl_pct=self.trigger_pnl_pct,
            trigger_leverage=self.trigger_leverage,
        )

    def to_dict(self) -> dict[str, Any]:
        """Return the arm as a plain dict, with the timestamp as ISO-8601 text."""
        ts_text = self.trigger_ts.isoformat() if self.trigger_ts else None
        payload: dict[str, Any] = {
            "arm_id": self.arm_id,
            "trigger_name": self.trigger_name,
            "slots_total": self.slots_total,
            "slots_remaining": self.slots_remaining,
            "trigger_trade_id": self.trigger_trade_id,
            "trigger_asset": self.trigger_asset,
            "trigger_ts": ts_text,
            "trigger_pnl": self.trigger_pnl,
            "trigger_pnl_pct": self.trigger_pnl_pct,
            "trigger_leverage": self.trigger_leverage,
        }
        return payload
|
||
|
|
|
||
|
|
|
||
|
|
@dataclass(frozen=True)
class OverlayDecision:
    """Outcome of an observe or entry-tagging call on the overlay FSM."""

    # Short action label describing what the call did.
    action: str
    # Side tag for the entry; defaults to "SHORT".
    side: str = "SHORT"
    # Free-form explanation for the action.
    reason: str = ""
    # Arm associated with this decision, if any.
    arm: ActiveFlipArm | None = None
    # Slot number consumed by this decision; 0 when no slot was consumed.
    consumed_slot: int = 0
    # True when the decision cleared the active arm.
    reset: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return the decision as a plain dict, expanding the arm if present."""
        arm_payload = self.arm.to_dict() if self.arm else None
        payload: dict[str, Any] = {
            "action": self.action,
            "side": self.side,
            "reason": self.reason,
            "arm": arm_payload,
            "consumed_slot": self.consumed_slot,
            "reset": self.reset,
        }
        return payload
|
||
|
|
|
||
|
|
|
||
|
|
class PostWinExecutionFSM:
    """Multi-slot post-win LONG tag Execution FSM.

    The machine holds at most one :class:`ActiveFlipArm` at a time.
    ``observe_closed_trade`` may arm it when a configured rule matches a closed
    trade, and ``tag_next_entry`` consumes one slot per call, tagging that
    entry LONG. Once the last slot is consumed, or the arm is older than
    ``config.max_arm_age_sec``, the arm is dropped and entries are tagged
    SHORT again. The class only returns decisions; it places no orders.
    """

    def __init__(self, config: PostWinExecutionFSMConfig | None = None) -> None:
        """Create an unarmed FSM using *config* (or the default config)."""
        self.config = config or PostWinExecutionFSMConfig()
        self._arm: ActiveFlipArm | None = None
        self._next_arm_id = 1
        # Diagnostic counters, exposed through snapshot().
        self.ignored_rearm_attempts = 0
        self.ignored_overlay_flip_triggers = 0
        self.expired_arms = 0
        self.consumed_arms = 0

    @property
    def active_arm(self) -> ActiveFlipArm | None:
        """The currently active arm, or None when unarmed."""
        return self._arm

    @property
    def pending_slots(self) -> int:
        """Number of LONG slots still waiting to be consumed (0 when unarmed)."""
        return int(self._arm.slots_remaining) if self._arm else 0

    def reset(self, reason: str = "manual") -> OverlayDecision:
        """Drop any active arm and report it in a ``RESET`` decision."""
        old = self._arm
        self._arm = None
        return OverlayDecision(action="RESET", reason=reason, arm=old, reset=True)

    def observe_closed_trade(
        self,
        *,
        trade_id: str = "",
        asset: str = "",
        side: str = "SHORT",
        pnl: float = 0.0,
        pnl_pct: float = 0.0,
        leverage: float = 0.0,
        closed_ts: datetime | None = None,
        was_overlay_flip: bool = False,
        metadata: Mapping[str, Any] | None = None,
    ) -> OverlayDecision:
        """Observe a completed trade and possibly arm future LONG slots.

        Parameters are intentionally primitive so this can be called from live
        code, replay code, or ClickHouse/log readers.

        Args:
            trade_id: Identifier of the closed trade, recorded on the arm.
            asset: Asset of the closed trade, recorded on the arm.
            side: Side of the closed trade; empty/None is treated as "SHORT".
            pnl: Absolute PnL of the trade.
            pnl_pct: Fractional PnL of the trade.
            leverage: Leverage of the trade.
            closed_ts: Close time; used for arm expiry and stored (as UTC) on
                a newly created arm.
            was_overlay_flip: True when the trade was itself an overlay flip.
            metadata: Accepted but currently unused.

        Returns:
            An ``OverlayDecision`` whose ``action`` is one of ``NOOP``
            (overlay disabled), ``IGNORED`` (overlay-flip/LONG outcome, or a
            trigger while already armed with re-arming disabled),
            ``NO_TRIGGER`` (no rule matched) or ``ARMED``.
        """
        del metadata  # reserved for future feature logging without API churn
        closed_utc = _to_utc(closed_ts)
        self._expire_if_needed(closed_utc)

        if not self.config.enabled:
            return OverlayDecision(action="NOOP", reason="disabled", arm=self._arm)

        side_u = str(side or "SHORT").upper()
        is_flip_outcome = was_overlay_flip or side_u == "LONG"
        # Flip outcomes are ignored by default so a short rebound probe cannot
        # extend itself; config.allow_triggers_from_overlay_flips opts out of
        # that protection.
        if is_flip_outcome and not self.config.allow_triggers_from_overlay_flips:
            self.ignored_overlay_flip_triggers += 1
            return OverlayDecision(action="IGNORED", reason="overlay_flip_outcome", arm=self._arm)

        pnl_f = _to_float(pnl)
        pnl_pct_f = _to_float(pnl_pct)
        lev_f = _to_float(leverage)
        rule = self._match_rule(pnl=pnl_f, pnl_pct=pnl_pct_f, leverage=lev_f)
        if rule is None:
            return OverlayDecision(action="NO_TRIGGER", reason="no_rule_match", arm=self._arm)

        if self._arm is not None and not self.config.allow_rearm_while_armed:
            self.ignored_rearm_attempts += 1
            return OverlayDecision(action="IGNORED", reason="active_arm_no_rearm", arm=self._arm)

        # Any existing arm (only possible when re-arming is allowed) is replaced.
        arm = ActiveFlipArm(
            arm_id=self._next_arm_id,
            trigger_name=rule.name,
            slots_total=int(rule.slots),
            slots_remaining=int(rule.slots),
            trigger_trade_id=str(trade_id or ""),
            trigger_asset=str(asset or ""),
            trigger_ts=closed_utc,
            trigger_pnl=pnl_f,
            trigger_pnl_pct=pnl_pct_f,
            trigger_leverage=lev_f,
        )
        self._next_arm_id += 1
        self._arm = arm
        return OverlayDecision(action="ARMED", reason=rule.name, arm=arm)

    def tag_next_entry(
        self,
        *,
        asset: str = "",
        entry_ts: datetime | None = None,
        metadata: Mapping[str, Any] | None = None,
    ) -> OverlayDecision:
        """Return the side tag for the next engine entry and consume one slot.

        Args:
            asset: Accepted but currently unused.
            entry_ts: Entry time, used only to expire an over-age arm first.
            metadata: Accepted but currently unused.

        Returns:
            A ``PASS``/``SHORT`` decision when nothing is armed; otherwise a
            ``TAG``/``LONG`` decision carrying the 1-based ``consumed_slot``.
            ``reset`` is True on the decision that consumes the final slot.
        """
        del asset, metadata  # reserved for future asset-specific slot routing
        self._expire_if_needed(_to_utc(entry_ts))
        if self._arm is None or self._arm.slots_remaining <= 0:
            return OverlayDecision(action="PASS", side="SHORT", reason="no_active_arm")

        arm_before = self._arm
        # 1-based index of the slot being consumed by this call.
        consumed_slot = arm_before.slots_total - arm_before.slots_remaining + 1
        remaining = arm_before.slots_remaining - 1
        if remaining <= 0:
            # Last slot: clear the arm so later entries fall back to SHORT.
            self._arm = None
            self.consumed_arms += 1
            return OverlayDecision(
                action="TAG",
                side="LONG",
                reason=arm_before.trigger_name,
                arm=arm_before.with_remaining(0),
                consumed_slot=consumed_slot,
                reset=True,
            )

        self._arm = arm_before.with_remaining(remaining)
        return OverlayDecision(
            action="TAG",
            side="LONG",
            reason=arm_before.trigger_name,
            arm=self._arm,
            consumed_slot=consumed_slot,
            reset=False,
        )

    def snapshot(self) -> dict[str, Any]:
        """Return the current state and diagnostic counters as a plain dict."""
        return {
            "enabled": self.config.enabled,
            "active_arm": self._arm.to_dict() if self._arm else None,
            "pending_slots": self.pending_slots,
            "ignored_rearm_attempts": self.ignored_rearm_attempts,
            "ignored_overlay_flip_triggers": self.ignored_overlay_flip_triggers,
            "expired_arms": self.expired_arms,
            "consumed_arms": self.consumed_arms,
        }

    def _match_rule(self, *, pnl: float, pnl_pct: float, leverage: float) -> PostWinFlipTrigger | None:
        """Return the first configured rule that matches, or None."""
        for rule in self.config.rules:
            if rule.matches(pnl=pnl, pnl_pct=pnl_pct, leverage=leverage):
                return rule
        return None

    def _expire_if_needed(self, now: datetime | None) -> None:
        """Drop the active arm when it is older than ``config.max_arm_age_sec``.

        Does nothing when there is no arm, no age limit, or either timestamp
        is missing.
        """
        if self._arm is None:
            return
        if self.config.max_arm_age_sec is None:
            return
        if now is None or self._arm.trigger_ts is None:
            return
        age = (now - self._arm.trigger_ts).total_seconds()
        if age > float(self.config.max_arm_age_sec):
            self._arm = None
            self.expired_arms += 1
|
||
|
|
|
||
|
|
|
||
|
|
# Older research scripts and tests use the "overlay" names; keep them
# importable as aliases of the current classes.
PostWinLongOverlay = PostWinExecutionFSM
PostWinLongOverlayConfig = PostWinExecutionFSMConfig


# Names exported by ``from <module> import *``.
__all__ = [
    "ActiveFlipArm",
    "OverlayDecision",
    "PostWinExecutionFSM",
    "PostWinExecutionFSMConfig",
    "PostWinFlipTrigger",
    "PostWinLongOverlay",
    "PostWinLongOverlayConfig",
]
|