# NOTE(review): the lines below were repository-viewer chrome captured with the
# source (file path / size / view links); preserved as a comment so the module
# stays importable:
#   DOLPHIN/adaptive_exit/adaptive_exit_engine.py — 377 lines, 15 KiB, Python

"""
Adaptive Exit Engine parallel shadow mode for BLUE.
Runs alongside V7 per active trade. Does NOT interfere with real exits.
Logs shadow decisions to ClickHouse table `adaptive_exit_shadow` and
accumulates outcomes for online model updates.
Integration pattern (dolphin_actor.py):
from adaptive_exit.adaptive_exit_engine import AdaptiveExitEngine
_adaptive_exit = AdaptiveExitEngine.load()
# In _on_rt_exit_timer or _on_scan_timer, per active trade:
shadow = _adaptive_exit.evaluate(
trade_id=_tid,
asset=_asset,
direction=_dir, # -1 = SHORT
entry_price=_entry,
current_price=_cur_px,
bars_held=_bars,
max_hold=120,
recent_prices=_price_buf, # list[float], last 20+ bars
exf=self._last_exf,
)
# shadow is dict with: action, p_continuation, exit_reason_shadow, bucket_id
# Log it; never use it to exit.
Decision logic mirrors the spec:
EXIT if:
- mae > mae_threshold(vol) [hard stop]
- giveback: mfe < k * peak_mfe AND p_continuation < p_threshold
- tau > 1.0 [time cap]
"""
import json
import os
import threading
import time
import urllib.request
from typing import Optional
import numpy as np
from adaptive_exit.bucket_engine import build_buckets, get_bucket
from adaptive_exit.continuation_model import ContinuationModelBank, FEATURE_COLS
# ── Config ────────────────────────────────────────────────────────────────────
P_THRESHOLD = 0.40  # P(continuation) below this → consider exit
GIVEBACK_K = 0.50  # MFE giveback fraction: exit-eligible once mfe < K * peak_mfe
MAE_MULT_TIER1 = 3.5  # vol multiplier for tier-1 (hard) stop
MAE_MULT_TIER2 = 7.0  # tier-2 stop multiplier — not referenced in this module; TODO confirm external use
ATR_WINDOW = 20  # bars of log-returns used for the volatility (ATR-proxy) estimate
MIN_ATR = 1e-6  # floor so MAE/MFE normalization never divides by ~0
# ClickHouse HTTP endpoint for shadow logging.
# NOTE(review): credentials are hard-coded here — consider moving to env/config.
_CH_URL = "http://localhost:8123/"
_CH_HEADERS = {"X-ClickHouse-User": "dolphin", "X-ClickHouse-Key": "dolphin_ch_2026"}
# Shadow outcome logging
_SHADOW_TABLE = "adaptive_exit_shadow"  # per-bar shadow decisions + close rows
_SHADOW_DB = "dolphin"
def _ch_insert(row: dict, db: str = _SHADOW_DB) -> None:
    """Best-effort insert of one JSON row into the ClickHouse shadow table.

    Blocks for up to 3 s, so callers run it on a daemon thread for
    fire-and-forget semantics. Every failure is swallowed deliberately:
    shadow logging must never disturb the real trading path.

    Args:
        row: Column -> value mapping matching the shadow table schema.
        db:  Target ClickHouse database (defaults to the shadow DB).
    """
    try:
        body = (json.dumps(row) + "\n").encode()
        url = f"{_CH_URL}?database={db}&query=INSERT+INTO+{_SHADOW_TABLE}+FORMAT+JSONEachRow"
        req = urllib.request.Request(url, data=body, method="POST")
        for k, v in _CH_HEADERS.items():
            req.add_header(k, v)
        # Use the response as a context manager so the HTTP connection is
        # closed; the original leaked the response object.
        with urllib.request.urlopen(req, timeout=3):
            pass
    except Exception:
        pass  # shadow logging is best-effort
def _ensure_shadow_table() -> None:
    """Create the shadow table in ClickHouse if it doesn't exist.

    Idempotent (CREATE TABLE IF NOT EXISTS). 90-day TTL keeps the table
    bounded. Failure is non-fatal — the engine still runs, only logging
    is lost — so the exception is reported and swallowed.
    """
    ddl = (
        f"CREATE TABLE IF NOT EXISTS {_SHADOW_DB}.{_SHADOW_TABLE} ("
        "ts DateTime64(6, 'UTC'),"
        "ts_day Date MATERIALIZED toDate(ts),"
        "trade_id String,"
        "asset LowCardinality(String),"
        "bucket_id UInt8,"
        "bars_held UInt16,"
        "mae_norm Float32,"
        "mfe_norm Float32,"
        "tau_norm Float32,"
        "p_cont Float32,"
        "vel_div_entry Float32,"
        "vel_div_now Float32,"
        "action LowCardinality(String),"
        "exit_reason LowCardinality(String),"
        "actual_exit LowCardinality(String),"
        "pnl_pct Float32"
        ") ENGINE = MergeTree()"
        " ORDER BY (ts_day, asset, ts)"
        " TTL ts_day + INTERVAL 90 DAY"
    )
    try:
        req = urllib.request.Request(_CH_URL, data=ddl.encode(), method="POST")
        for k, v in _CH_HEADERS.items():
            req.add_header(k, v)
        # Context manager closes the HTTP response (previously leaked).
        with urllib.request.urlopen(req, timeout=10):
            pass
    except Exception as e:
        print(f"[AdaptiveExitEngine] Warning: could not create shadow table: {e}")
# ── Per-trade state ───────────────────────────────────────────────────────────
class _TradeState:
def __init__(self, trade_id: str, asset: str, direction: int,
entry_price: float, bucket_id: int, vel_div_entry: float = 0.0):
self.trade_id = trade_id
self.asset = asset
self.direction = direction # -1 = SHORT, 1 = LONG
self.entry_price = entry_price
self.bucket_id = bucket_id
self.vel_div_entry = vel_div_entry
self.mae = 0.0
self.mfe = 0.0
self.peak_mfe = 0.0
self.price_buf: list[float] = [] # rolling price history
# ── Engine ────────────────────────────────────────────────────────────────────
class AdaptiveExitEngine:
    """Shadow-mode adaptive exit evaluator.

    Runs in parallel with the real (V7) exit logic, producing per-bar
    HOLD/EXIT shadow decisions that are logged to ClickHouse but never
    executed. Closed-trade outcomes feed online updates of the
    continuation model bank.

    Thread-safety: per-trade state lives in ``self._states`` and is only
    mutated under ``self._lock``.
    """

    def __init__(self, model_bank: ContinuationModelBank, bucket_data: dict):
        self._model = model_bank
        self._bucket_data = bucket_data
        # trade_id -> _TradeState for trades currently being shadowed
        self._states: dict[str, _TradeState] = {}
        self._lock = threading.Lock()
        # Reserved for batched outcome flushing; currently never appended to.
        self._pending_outcomes: list[dict] = []

    @classmethod
    def load(cls) -> "AdaptiveExitEngine":
        """Load pre-trained models and bucket assignments.

        Falls back gracefully (untrained model bank, empty bucket map) so
        the shadow engine never blocks startup of the real system.
        """
        try:
            bank = ContinuationModelBank.load()
            print("[AdaptiveExitEngine] Continuation models loaded")
        except FileNotFoundError:
            print("[AdaptiveExitEngine] WARNING: no trained model found — using untrained fallback")
            bank = ContinuationModelBank()
        try:
            bucket_data = build_buckets(force_rebuild=False)
            print(f"[AdaptiveExitEngine] Bucket assignments loaded: "
                  f"{bucket_data['n_buckets']} buckets")
        except Exception as e:
            print(f"[AdaptiveExitEngine] WARNING: bucket data unavailable ({e})")
            bucket_data = {"assignments": {}, "n_buckets": 0}
        _ensure_shadow_table()
        return cls(bank, bucket_data)

    # ── Trade lifecycle ───────────────────────────────────────────────────────
    def on_entry(self, trade_id: str, asset: str, direction: int,
                 entry_price: float, vel_div_entry: float = 0.0) -> None:
        """Register a newly opened trade for shadow tracking."""
        bid = get_bucket(asset, self._bucket_data, fallback=0)
        with self._lock:
            self._states[trade_id] = _TradeState(trade_id, asset, direction,
                                                 entry_price, bid, vel_div_entry)

    def on_exit(self, trade_id: str, actual_exit_reason: str,
                pnl_pct: float) -> None:
        """Record the real system's close of a trade for online learning.

        Only natural exits feed the model (FIXED_TP, MAX_HOLD, V7/AE stops).
        Forced exits (HIBERNATE_HALT, SUBDAY_ACB_NORMALIZATION) are filtered by
        the model bank's natural-exits-only guard, preventing regime artifacts
        from biasing the continuation distribution.

        Args:
            trade_id: Trade being closed; silently ignored if unknown.
            actual_exit_reason: Exit reason string from the real system.
            pnl_pct: Realized PnL percent; its sign is the binary
                continuation label (profit == continuation).
        """
        with self._lock:
            st = self._states.pop(trade_id, None)
        if st is None:
            return
        if not st.price_buf:
            # Never saw a price bar — nothing to learn from or to log.
            return
        # Binary continuation label: profitable close counts as continuation.
        cont = 1 if pnl_pct > 0 else 0
        prices = np.array(st.price_buf[-ATR_WINDOW:])
        # Volatility proxy: std of log returns over the tail window.
        atr = max(np.std(np.diff(np.log(np.maximum(prices, 1e-12)))), MIN_ATR)
        mae_norm = st.mae / atr
        mfe_norm = st.mfe / atr
        tau_norm = min(len(st.price_buf) / 120.0, 1.0)
        ret_1 = float(np.log(prices[-1] / prices[-2])) if len(prices) >= 2 else 0.0
        ret_3 = float(np.log(prices[-1] / prices[-4])) if len(prices) >= 4 else ret_1
        # OBF static features for this asset (pandas DataFrame when built,
        # empty dict when buckets are unavailable).
        obf = self._bucket_data.get("features", {})
        obf_row = {}
        if hasattr(obf, "loc") and st.asset in obf.index:
            obf_row = obf.loc[st.asset].to_dict()
        # No live velocity-divergence reading is available at close, so the
        # entry value is reused for vel_div_now (known limitation). The old
        # "placeholder" that assigned prices[-1] — a *price*, not a velocity —
        # was dead code and has been removed.
        p_pred = self._model.predict(
            mae_norm=mae_norm, mfe_norm=mfe_norm, tau_norm=tau_norm,
            ret_1=ret_1, ret_3=ret_3,
            vel_div_entry=st.vel_div_entry, vel_div_now=st.vel_div_entry,
            spread_bps=float(obf_row.get("spread_bps", 0.0)),
            depth_usd=float(obf_row.get("depth_usd", 0.0)),
            fill_prob=float(obf_row.get("fill_prob", 0.9)),
            bucket_id=st.bucket_id,
        )
        self._model.online_update(
            bucket_id=st.bucket_id,
            mae_norm=mae_norm,
            mfe_norm=mfe_norm,
            tau_norm=tau_norm,
            ret_1=ret_1,
            ret_3=ret_3,
            vel_div_entry=st.vel_div_entry,
            vel_div_now=st.vel_div_entry,
            spread_bps=float(obf_row.get("spread_bps", 0.0)),
            depth_usd=float(obf_row.get("depth_usd", 0.0)),
            fill_prob=float(obf_row.get("fill_prob", 0.9)),
            continuation=cont,
            exit_reason=actual_exit_reason,
            p_pred=p_pred,
        )
        # Log one final shadow row at close so actual_exit is queryable for
        # comparison against the per-bar shadow decisions.
        threading.Thread(target=_ch_insert, args=({
            "ts": int(time.time() * 1e6),
            "trade_id": trade_id,
            "asset": st.asset,
            "bucket_id": int(st.bucket_id),
            "bars_held": int(tau_norm * 120),
            "mae_norm": float(mae_norm),
            "mfe_norm": float(mfe_norm),
            "tau_norm": float(tau_norm),
            "p_cont": float(p_pred),
            "vel_div_entry": float(st.vel_div_entry),
            "vel_div_now": float(st.vel_div_entry),
            "action": "CLOSED",
            "exit_reason": "",
            "actual_exit": actual_exit_reason,
            "pnl_pct": float(pnl_pct),
        },), daemon=True).start()

    # ── Per-bar evaluation ────────────────────────────────────────────────────
    def evaluate(
        self,
        trade_id: str,
        asset: str,
        direction: int,
        entry_price: float,
        current_price: float,
        bars_held: int,
        max_hold: int = 120,
        recent_prices: Optional[list] = None,
        exf: Optional[dict] = None,
        vel_div_now: float = 0.0,
    ) -> dict:
        """Evaluate whether the adaptive engine would exit this trade.

        Shadow only: the caller logs the returned decision and must never
        act on it. ``exf`` is accepted for interface compatibility but is
        currently unused.

        Returns:
            dict with action ("HOLD"/"EXIT"), exit_reason_shadow,
            p_continuation, normalized MAE/MFE/tau and bucket metadata.
        """
        with self._lock:
            if trade_id not in self._states:
                # Trade opened before the engine attached (or on_entry was
                # skipped) — register it lazily with the current vel_div as
                # the best available "entry" value.
                bid = get_bucket(asset, self._bucket_data, fallback=0)
                self._states[trade_id] = _TradeState(trade_id, asset, direction,
                                                     entry_price, bid, vel_div_now)
            st = self._states[trade_id]
            # Update price buffer, preferring the caller's authoritative window.
            if recent_prices:
                st.price_buf = list(recent_prices[-ATR_WINDOW - 5:])
            elif current_price:
                st.price_buf.append(current_price)
            # Signed excursion, positive when the trade is in profit:
            # LONG profits when price rises, SHORT (-1) when it drops.
            # (The old code computed a direction-multiplied delta that was
            # wrong for shorts and immediately overwrote it — removed.)
            delta = (current_price - entry_price) / entry_price
            if direction == -1:
                delta = -delta
            st.mae = max(st.mae, max(0.0, -delta))
            st.mfe = max(st.mfe, max(0.0, delta))
            st.peak_mfe = max(st.peak_mfe, st.mfe)
            # ATR proxy: std of log returns over the buffer tail.
            prices_arr = (np.array(st.price_buf, dtype=float)
                          if st.price_buf else np.array([current_price]))
            if len(prices_arr) >= 2:
                log_rets = np.diff(np.log(np.maximum(prices_arr, 1e-12)))
                atr = max(float(np.std(log_rets[-ATR_WINDOW:])), MIN_ATR)
            else:
                atr = MIN_ATR
            mae_norm = st.mae / atr
            mfe_norm = st.mfe / atr
            # Guard max_hold <= 0, which previously raised ZeroDivisionError;
            # treat it as "time cap already reached".
            tau_norm = bars_held / max_hold if max_hold > 0 else 1.0
            prices_f = prices_arr[-ATR_WINDOW:]
            ret_1 = float(np.log(prices_f[-1] / prices_f[-2])) if len(prices_f) >= 2 else 0.0
            ret_3 = float(np.log(prices_f[-1] / prices_f[-4])) if len(prices_f) >= 4 else ret_1
            # OBF static features for this asset.
            obf_feats = self._bucket_data.get("features", {})
            obf_row = {}
            if hasattr(obf_feats, "loc") and asset in obf_feats.index:
                obf_row = obf_feats.loc[asset].to_dict()
            # P(continuation)
            p_cont = self._model.predict(
                mae_norm=mae_norm,
                mfe_norm=mfe_norm,
                tau_norm=tau_norm,
                ret_1=ret_1,
                ret_3=ret_3,
                vel_div_entry=st.vel_div_entry,
                vel_div_now=vel_div_now,
                spread_bps=float(obf_row.get("spread_bps", 0.0)),
                depth_usd=float(obf_row.get("depth_usd", 0.0)),
                fill_prob=float(obf_row.get("fill_prob", 0.9)),
                bucket_id=st.bucket_id,
            )
            # Decision logic per spec: hard MAE stop, then giveback with low
            # continuation probability, then time cap.
            mae_threshold = max(0.005, MAE_MULT_TIER1 * atr)
            action = "HOLD"
            exit_reason = ""
            if st.mae > mae_threshold:
                action = "EXIT"
                exit_reason = "AE_MAE_STOP"
            elif (st.peak_mfe > 0 and st.mfe < GIVEBACK_K * st.peak_mfe
                  and p_cont < P_THRESHOLD):
                action = "EXIT"
                exit_reason = "AE_GIVEBACK_LOW_CONT"
            elif tau_norm > 1.0:
                action = "EXIT"
                exit_reason = "AE_TIME"
            return {
                "trade_id": trade_id,
                "asset": st.asset,
                "action": action,
                "exit_reason_shadow": exit_reason,
                "p_continuation": p_cont,
                "mae_norm": mae_norm,
                "mfe_norm": mfe_norm,
                "tau_norm": tau_norm,
                "bucket_id": st.bucket_id,
                "vel_div_entry": st.vel_div_entry,
                "vel_div_now": vel_div_now,
            }

    def log_shadow(self, shadow: dict, actual_exit: str = "", pnl_pct: float = 0.0) -> None:
        """Asynchronously log a shadow decision (from evaluate()) to ClickHouse.

        Fire-and-forget on a daemon thread; _ch_insert swallows all errors.
        """
        row = {
            "ts": int(time.time() * 1e6),
            "trade_id": shadow.get("trade_id", ""),
            "asset": shadow.get("asset", ""),
            "bucket_id": int(shadow.get("bucket_id", 0)),
            "bars_held": int(shadow.get("tau_norm", 0) * 120),
            "mae_norm": float(shadow.get("mae_norm", 0)),
            "mfe_norm": float(shadow.get("mfe_norm", 0)),
            "tau_norm": float(shadow.get("tau_norm", 0)),
            "p_cont": float(shadow.get("p_continuation", 0.5)),
            "vel_div_entry": float(shadow.get("vel_div_entry", 0.0)),
            "vel_div_now": float(shadow.get("vel_div_now", 0.0)),
            "action": shadow.get("action", "HOLD"),
            "exit_reason": shadow.get("exit_reason_shadow", ""),
            "actual_exit": actual_exit,
            "pnl_pct": float(pnl_pct),
        }
        threading.Thread(target=_ch_insert, args=(row,), daemon=True).start()