Checkpoint BLUE V7 long overlay work

This commit is contained in:
Codex
2026-05-08 19:54:13 +02:00
parent 351ce2044d
commit 83f007caa8
6 changed files with 5850 additions and 0 deletions

View File

@@ -0,0 +1,351 @@
"""Deterministic post-win LONG overlay EFSM.
This module does not place orders. It tags future entries after realized BLUE
SHORT exhaustion wins so the live/shadow caller can decide whether to flip the
next one or more SHORT-engine opportunities to LONG.
EFSM means Execution FSM. The EFSM is deliberately slot-based:
- a trigger arms N future slots
- each future entry consumes exactly one slot
- when slots reach zero, state resets to SHORT
- flipped LONG trades do not re-arm the overlay
- triggers observed while an arm is active are ignored unless explicitly
enabled by config
That prevents the bug class where a one/two-trade rebound probe becomes a
self-extending regime switch.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Mapping, Optional, Sequence
def _to_float(value: Any, default: float = 0.0) -> float:
try:
out = float(value)
except (TypeError, ValueError):
return default
return out if out == out else default
def _to_utc(ts: datetime | None) -> datetime | None:
if ts is None:
return None
if ts.tzinfo is None:
return ts.replace(tzinfo=timezone.utc)
return ts.astimezone(timezone.utc)
@dataclass(frozen=True)
class PostWinFlipTrigger:
"""A configurable trigger that arms future LONG flip slots."""
name: str
slots: int
min_pnl_abs: float = 0.0
max_pnl_abs: Optional[float] = None
min_pnl_pct: Optional[float] = None
min_leverage: Optional[float] = None
strict_min_pnl_abs: bool = True
strict_max_pnl_abs: bool = True
strict_min_leverage: bool = True
def matches(self, *, pnl: float, pnl_pct: float, leverage: float) -> bool:
if self.slots <= 0:
return False
if self.strict_min_pnl_abs:
if not pnl > self.min_pnl_abs:
return False
elif pnl < self.min_pnl_abs:
return False
if self.max_pnl_abs is not None:
if self.strict_max_pnl_abs:
if not pnl < self.max_pnl_abs:
return False
elif pnl > self.max_pnl_abs:
return False
if self.min_pnl_pct is not None and pnl_pct < self.min_pnl_pct:
return False
if self.min_leverage is not None:
if self.strict_min_leverage:
if not leverage > self.min_leverage:
return False
elif leverage < self.min_leverage:
return False
return True
@dataclass(frozen=True)
class PostWinExecutionFSMConfig:
"""Configuration for the BLUE post-win Execution FSM."""
enabled: bool = True
rules: Sequence[PostWinFlipTrigger] = field(
default_factory=lambda: (
# Order matters: the high-leverage big-win rule must win before the
# generic big-win rule, otherwise it would be capped at one slot.
PostWinFlipTrigger(
name="big_win_high_lev",
slots=2,
min_pnl_abs=397.0,
min_leverage=8.6,
strict_min_pnl_abs=True,
strict_min_leverage=True,
),
PostWinFlipTrigger(
name="big_win",
slots=1,
min_pnl_abs=397.0,
strict_min_pnl_abs=True,
),
PostWinFlipTrigger(
name="small_dollar_high_return",
slots=1,
min_pnl_abs=0.0,
max_pnl_abs=250.0,
min_pnl_pct=0.0075,
strict_min_pnl_abs=True,
strict_max_pnl_abs=True,
),
)
)
max_arm_age_sec: Optional[float] = None
allow_rearm_while_armed: bool = False
allow_triggers_from_overlay_flips: bool = False
@dataclass(frozen=True)
class ActiveFlipArm:
"""Currently armed future LONG flip slots."""
arm_id: int
trigger_name: str
slots_total: int
slots_remaining: int
trigger_trade_id: str = ""
trigger_asset: str = ""
trigger_ts: datetime | None = None
trigger_pnl: float = 0.0
trigger_pnl_pct: float = 0.0
trigger_leverage: float = 0.0
def with_remaining(self, slots_remaining: int) -> "ActiveFlipArm":
return ActiveFlipArm(
arm_id=self.arm_id,
trigger_name=self.trigger_name,
slots_total=self.slots_total,
slots_remaining=max(0, int(slots_remaining)),
trigger_trade_id=self.trigger_trade_id,
trigger_asset=self.trigger_asset,
trigger_ts=self.trigger_ts,
trigger_pnl=self.trigger_pnl,
trigger_pnl_pct=self.trigger_pnl_pct,
trigger_leverage=self.trigger_leverage,
)
def to_dict(self) -> dict[str, Any]:
return {
"arm_id": self.arm_id,
"trigger_name": self.trigger_name,
"slots_total": self.slots_total,
"slots_remaining": self.slots_remaining,
"trigger_trade_id": self.trigger_trade_id,
"trigger_asset": self.trigger_asset,
"trigger_ts": self.trigger_ts.isoformat() if self.trigger_ts else None,
"trigger_pnl": self.trigger_pnl,
"trigger_pnl_pct": self.trigger_pnl_pct,
"trigger_leverage": self.trigger_leverage,
}
@dataclass(frozen=True)
class OverlayDecision:
"""Result returned by observe/entry tagging calls."""
action: str
side: str = "SHORT"
reason: str = ""
arm: ActiveFlipArm | None = None
consumed_slot: int = 0
reset: bool = False
def to_dict(self) -> dict[str, Any]:
return {
"action": self.action,
"side": self.side,
"reason": self.reason,
"arm": self.arm.to_dict() if self.arm else None,
"consumed_slot": self.consumed_slot,
"reset": self.reset,
}
class PostWinExecutionFSM:
"""Multi-slot post-win LONG tag Execution FSM."""
def __init__(self, config: PostWinExecutionFSMConfig | None = None) -> None:
self.config = config or PostWinExecutionFSMConfig()
self._arm: ActiveFlipArm | None = None
self._next_arm_id = 1
self.ignored_rearm_attempts = 0
self.ignored_overlay_flip_triggers = 0
self.expired_arms = 0
self.consumed_arms = 0
@property
def active_arm(self) -> ActiveFlipArm | None:
return self._arm
@property
def pending_slots(self) -> int:
return int(self._arm.slots_remaining) if self._arm else 0
def reset(self, reason: str = "manual") -> OverlayDecision:
old = self._arm
self._arm = None
return OverlayDecision(action="RESET", reason=reason, arm=old, reset=True)
def observe_closed_trade(
self,
*,
trade_id: str = "",
asset: str = "",
side: str = "SHORT",
pnl: float = 0.0,
pnl_pct: float = 0.0,
leverage: float = 0.0,
closed_ts: datetime | None = None,
was_overlay_flip: bool = False,
metadata: Mapping[str, Any] | None = None,
) -> OverlayDecision:
"""Observe a completed trade and possibly arm future LONG slots.
Parameters are intentionally primitive so this can be called from live
code, replay code, or ClickHouse/log readers.
"""
del metadata # reserved for future feature logging without API churn
self._expire_if_needed(_to_utc(closed_ts))
if not self.config.enabled:
return OverlayDecision(action="NOOP", reason="disabled", arm=self._arm)
side_u = str(side or "SHORT").upper()
if was_overlay_flip or side_u == "LONG":
self.ignored_overlay_flip_triggers += 1
return OverlayDecision(action="IGNORED", reason="overlay_flip_outcome", arm=self._arm)
pnl_f = _to_float(pnl)
pnl_pct_f = _to_float(pnl_pct)
lev_f = _to_float(leverage)
rule = self._match_rule(pnl=pnl_f, pnl_pct=pnl_pct_f, leverage=lev_f)
if rule is None:
return OverlayDecision(action="NO_TRIGGER", reason="no_rule_match", arm=self._arm)
if self._arm is not None and not self.config.allow_rearm_while_armed:
self.ignored_rearm_attempts += 1
return OverlayDecision(action="IGNORED", reason="active_arm_no_rearm", arm=self._arm)
arm = ActiveFlipArm(
arm_id=self._next_arm_id,
trigger_name=rule.name,
slots_total=int(rule.slots),
slots_remaining=int(rule.slots),
trigger_trade_id=str(trade_id or ""),
trigger_asset=str(asset or ""),
trigger_ts=_to_utc(closed_ts),
trigger_pnl=pnl_f,
trigger_pnl_pct=pnl_pct_f,
trigger_leverage=lev_f,
)
self._next_arm_id += 1
self._arm = arm
return OverlayDecision(action="ARMED", reason=rule.name, arm=arm)
def tag_next_entry(
self,
*,
asset: str = "",
entry_ts: datetime | None = None,
metadata: Mapping[str, Any] | None = None,
) -> OverlayDecision:
"""Return the side tag for the next engine entry and consume one slot."""
del asset, metadata # reserved for future asset-specific slot routing
self._expire_if_needed(_to_utc(entry_ts))
if self._arm is None or self._arm.slots_remaining <= 0:
return OverlayDecision(action="PASS", side="SHORT", reason="no_active_arm")
arm_before = self._arm
consumed_slot = arm_before.slots_total - arm_before.slots_remaining + 1
remaining = arm_before.slots_remaining - 1
if remaining <= 0:
self._arm = None
self.consumed_arms += 1
return OverlayDecision(
action="TAG",
side="LONG",
reason=arm_before.trigger_name,
arm=arm_before.with_remaining(0),
consumed_slot=consumed_slot,
reset=True,
)
self._arm = arm_before.with_remaining(remaining)
return OverlayDecision(
action="TAG",
side="LONG",
reason=arm_before.trigger_name,
arm=self._arm,
consumed_slot=consumed_slot,
reset=False,
)
def snapshot(self) -> dict[str, Any]:
return {
"enabled": self.config.enabled,
"active_arm": self._arm.to_dict() if self._arm else None,
"pending_slots": self.pending_slots,
"ignored_rearm_attempts": self.ignored_rearm_attempts,
"ignored_overlay_flip_triggers": self.ignored_overlay_flip_triggers,
"expired_arms": self.expired_arms,
"consumed_arms": self.consumed_arms,
}
def _match_rule(self, *, pnl: float, pnl_pct: float, leverage: float) -> PostWinFlipTrigger | None:
for rule in self.config.rules:
if rule.matches(pnl=pnl, pnl_pct=pnl_pct, leverage=leverage):
return rule
return None
def _expire_if_needed(self, now: datetime | None) -> None:
if self._arm is None:
return
if self.config.max_arm_age_sec is None:
return
if now is None or self._arm.trigger_ts is None:
return
age = (now - self._arm.trigger_ts).total_seconds()
if age > float(self.config.max_arm_age_sec):
self._arm = None
self.expired_arms += 1
# Compatibility aliases for earlier research scripts and tests.
PostWinLongOverlayConfig = PostWinExecutionFSMConfig
PostWinLongOverlay = PostWinExecutionFSM
__all__ = [
"ActiveFlipArm",
"OverlayDecision",
"PostWinExecutionFSM",
"PostWinExecutionFSMConfig",
"PostWinFlipTrigger",
"PostWinLongOverlay",
"PostWinLongOverlayConfig",
]