Files
DOLPHIN/adaptive_exit/adaptive_exit_engine.py
hjnormey 48dcf3fe13 feat(aem): per-bucket MAE_MULT table + shadow logging completeness
AEM stays shadow-only this sprint (V7 drives live exits); changes affect
what AEM *would have done*, logged to CH for future V7→AEM demotion analysis.

adaptive_exit_engine.py:
- Replace single MAE_MULT_TIER1=3.5 with MAE_MULT_BY_BUCKET dict
  (B3→None disables MAE stop, B4→2.0 strict, B6→6.0 wide band)
- evaluate() return dict extended: mae_mult_applied, mae_threshold,
  atr, p_threshold, giveback_k (all params that drove the decision)
- adaptive_exit_shadow schema extended (new Nullable columns added via
  ALTER TABLE … ADD COLUMN IF NOT EXISTS — backward-compatible with pre-sprint rows)
- log_shadow() signature extended: v7_action, v7_exit_reason,
  naive_would_have (TP/STOP/MAX_HOLD counterfactual at same instant)

dolphin_actor.py:
- AEM shadow call now passes V7 head-to-head decision and naive
  counterfactual so future retrospective requires no offline replay
- EsoF listener registered on DOLPHIN_FEATURES map (esof_advisor_latest
  key); label fed into engine._current_esof_label before each step_bar
- S6/bucket loaders (_load_s6_size_table, _load_asset_bucket_data) and
  constructor wiring for the new GREEN engine kwargs

Plan refs: Tasks 5, 7, 10 — V7 path untouched, AEM return value is
never gated, CH shadow is best-effort (daemon thread).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 06:07:46 +02:00

457 lines
20 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Adaptive Exit Engine — parallel shadow mode for BLUE.
Runs alongside V7 per active trade. Does NOT interfere with real exits.
Logs shadow decisions to ClickHouse table `adaptive_exit_shadow` and
accumulates outcomes for online model updates.
Integration pattern (dolphin_actor.py):

    from adaptive_exit.adaptive_exit_engine import AdaptiveExitEngine
    _adaptive_exit = AdaptiveExitEngine.load()

    # In _on_rt_exit_timer or _on_scan_timer, per active trade:
    shadow = _adaptive_exit.evaluate(
        trade_id=_tid,
        asset=_asset,
        direction=_dir,            # -1 = SHORT, 1 = LONG
        entry_price=_entry,
        current_price=_cur_px,
        bars_held=_bars,
        max_hold=120,
        recent_prices=_price_buf,  # list[float], last 20+ bars
        exf=self._last_exf,
    )
    # shadow is a dict including: action, p_continuation, exit_reason_shadow,
    # bucket_id, plus the parameters that drove the decision (mae_mult_applied,
    # mae_threshold, atr, p_threshold, giveback_k).
    # Log it (e.g. via _adaptive_exit.log_shadow(shadow, ...)); never use it to exit.
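Decision logic mirrors the spec. Rules are checked in this order; the first
match wins:
  EXIT if:
    - mae > max(0.005, MAE_MULT_BY_BUCKET[bucket] * atr)   [hard stop; skipped
      for buckets whose multiplier is None]
    - giveback: mfe < k * peak_mfe AND p_continuation < p_threshold
    - tau > 1.0, where tau = bars_held / max_hold          [time cap]
"""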
import json
import os
import threading
import time
import urllib.request
from typing import Dict, Optional
import numpy as np
from adaptive_exit.bucket_engine import build_buckets, get_bucket
from adaptive_exit.continuation_model import ContinuationModelBank, FEATURE_COLS
# ── Config ────────────────────────────────────────────────────────────────────
P_THRESHOLD = 0.40  # P(continuation) below this → consider exit
GIVEBACK_K = 0.50  # MFE giveback fraction used by the AE_GIVEBACK_LOW_CONT check
MAE_MULT_TIER1 = 3.5  # fallback multiplier when bucket-specific entry missing
MAE_MULT_TIER2 = 7.0  # NOTE(review): not referenced in this module — confirm external use before removing
ATR_WINDOW = 20  # window length for the volatility ("ATR") estimate and return features
MIN_ATR = 1e-6  # floor for the volatility estimate; it is used as a divisor
# Per-bucket MAE multipliers — replace the single MAE_MULT_TIER1 in the stop
# check (MAE_MULT_TIER1 remains the fallback for buckets not listed here).
# Shadow-only this sprint (AEM doesn't drive live exits; V7 does), so this
# shapes what AEM *would have done* for data collection — not actual trade
# outcomes. `None` disables the MAE_STOP gate entirely for that bucket
# (giveback/time checks still apply).
#   B3 — natural winners; shadow shows MAE peaks of ~5.0-5.1 before FIXED_TP
#        succeeds (range reconstructed from a garbled comment — confirm)
#   B4 — gross-negative alpha; cut fast before drawdown compounds
#   B6 — extreme-vol assets; wide band or we trip on noise
MAE_MULT_BY_BUCKET: Dict[int, Optional[float]] = {
    0: 3.5,
    1: 3.0,
    2: 3.5,
    3: None,
    4: 2.0,
    5: 4.0,
    6: 6.0,
}
# ClickHouse HTTP endpoint and credentials. The environment variables override
# the historical hard-coded defaults, so deployments can supply (and rotate)
# credentials without editing source.
_CH_URL = os.environ.get("DOLPHIN_CH_URL", "http://localhost:8123/")
_CH_HEADERS = {
    "X-ClickHouse-User": os.environ.get("DOLPHIN_CH_USER", "dolphin"),
    "X-ClickHouse-Key": os.environ.get("DOLPHIN_CH_PASSWORD", "dolphin_ch_2026"),
}
# Shadow outcome logging
_SHADOW_TABLE = "adaptive_exit_shadow"
_SHADOW_DB = "dolphin"
def _ch_insert(row: dict, db: str = _SHADOW_DB) -> None:
    """Insert one row into the shadow table via the ClickHouse HTTP interface.

    Callers run this on a daemon thread. It blocks for up to 3 s on the HTTP
    round-trip. Every failure (ClickHouse unreachable, rejected row,
    non-serialisable value) is swallowed on purpose: shadow logging is
    best-effort and must never disturb the caller.

    Args:
        row: Column name -> value mapping, sent as one JSONEachRow line.
        db: Target database (defaults to the shadow DB).
    """
    try:
        body = (json.dumps(row) + "\n").encode()
        url = (f"{_CH_URL}?database={db}"
               f"&query=INSERT+INTO+{_SHADOW_TABLE}+FORMAT+JSONEachRow")
        req = urllib.request.Request(url, data=body, method="POST")
        for k, v in _CH_HEADERS.items():
            req.add_header(k, v)
        # Close the response explicitly so the socket is released right away
        # instead of whenever the GC gets to it. This can run on every
        # evaluated bar.
        with urllib.request.urlopen(req, timeout=3) as resp:
            resp.read()
    except Exception:
        pass  # shadow logging is best-effort
def _ensure_shadow_table() -> None:
    """Create the shadow table if it doesn't exist and add any missing columns.

    Extended schema (2026-04-21, GREEN S6 sprint) captures the full AEM decision
    snapshot plus the V7 action at the same instant. A future AEM-vs-V7 demotion
    analysis can then replay head-to-head without re-simulating AEM.

    The extended columns are Nullable, so existing (pre-sprint) rows simply read
    back as NULL for them. For tables created before the sprint, each extended
    column is added with ``ADD COLUMN IF NOT EXISTS``, which ClickHouse treats as
    a no-op when the column already exists.

    Never raises; problems are reported with a printed warning. If the CREATE
    statement fails (e.g. ClickHouse unreachable), the ALTERs are skipped. A
    failing ALTER does not prevent the remaining ALTERs from being sent.
    """
    table = f"{_SHADOW_DB}.{_SHADOW_TABLE}"
    # Columns added in the 2026-04-21 sprint. This is the single source for both
    # the CREATE statement and the idempotent ALTERs, so they cannot drift apart.
    extended_cols = (
        # ── AEM decision params (Nullable to stay backward-compat) ──
        ("mae_mult_applied", "Nullable(Float32)"),
        ("mae_threshold", "Nullable(Float32)"),
        ("atr", "Nullable(Float32)"),
        ("p_threshold", "Nullable(Float32)"),
        ("giveback_k", "Nullable(Float32)"),
        # ── V7 head-to-head (authoritative path) ──
        ("v7_action", "LowCardinality(Nullable(String))"),
        ("v7_exit_reason", "LowCardinality(Nullable(String))"),
        # ── Naive counterfactual (what the dumb TP/STOP/MAX_HOLD would have done) ──
        ("naive_would_have", "LowCardinality(Nullable(String))"),
    )
    ddl = (
        f"CREATE TABLE IF NOT EXISTS {table} ("
        "ts DateTime64(6, 'UTC'),"
        "ts_day Date MATERIALIZED toDate(ts),"
        "trade_id String,"
        "asset LowCardinality(String),"
        "bucket_id UInt8,"
        "bars_held UInt16,"
        "mae_norm Float32,"
        "mfe_norm Float32,"
        "tau_norm Float32,"
        "p_cont Float32,"
        "vel_div_entry Float32,"
        "vel_div_now Float32,"
        "action LowCardinality(String),"
        "exit_reason LowCardinality(String),"
        "actual_exit LowCardinality(String),"
        "pnl_pct Float32,"
        + ",".join(f"{name} {ctype}" for name, ctype in extended_cols)
        + ") ENGINE = MergeTree()"
        " ORDER BY (ts_day, asset, ts)"
        " TTL ts_day + INTERVAL 90 DAY"
    )
    alters = [
        f"ALTER TABLE {table} ADD COLUMN IF NOT EXISTS {name} {ctype}"
        for name, ctype in extended_cols
    ]

    def _post(stmt: str) -> None:
        # POST one statement; the context manager closes the response/socket.
        req = urllib.request.Request(_CH_URL, data=stmt.encode(), method="POST")
        for k, v in _CH_HEADERS.items():
            req.add_header(k, v)
        with urllib.request.urlopen(req, timeout=10) as resp:
            resp.read()

    try:
        _post(ddl)
    except Exception as e:
        print(f"[AdaptiveExitEngine] Warning: could not create/alter shadow table: {e}")
        return  # server unreachable or DDL rejected — ALTERs would fail too
    for stmt in alters:
        try:
            _post(stmt)
        except Exception as e:
            print(f"[AdaptiveExitEngine] Warning: could not create/alter shadow table: {e}")
# ── Per-trade state ───────────────────────────────────────────────────────────
class _TradeState:
    """Mutable bookkeeping for one trade tracked by the shadow engine.

    Holds the entry snapshot plus running excursion statistics that
    ``AdaptiveExitEngine.evaluate`` updates on each call.
    """

    def __init__(self, trade_id: str, asset: str, direction: int,
                 entry_price: float, bucket_id: int, vel_div_entry: float = 0.0):
        # Entry snapshot — fixed for the lifetime of the trade.
        self.trade_id, self.asset = trade_id, asset
        self.direction = direction  # -1 = SHORT, 1 = LONG
        self.entry_price = entry_price
        self.bucket_id = bucket_id
        self.vel_div_entry = vel_div_entry
        # Running adverse/favourable excursions (start flat).
        self.mae, self.mfe, self.peak_mfe = 0.0, 0.0, 0.0
        # Rolling price history feeding the volatility/return features.
        self.price_buf: list[float] = []
# ── Engine ────────────────────────────────────────────────────────────────────
class AdaptiveExitEngine:
    """Parallel shadow-mode exit engine for BLUE.

    Keeps per-trade excursion state, scores P(continuation) with the per-bucket
    continuation models, and reports the exit it *would* take. Decisions are
    returned to the caller and can be logged to ClickHouse via ``log_shadow``.
    Nothing here closes a real position.
    """

    def __init__(self, model_bank: ContinuationModelBank, bucket_data: dict):
        """
        Args:
            model_bank: Continuation models used for P(continuation) and
                online updates.
            bucket_data: Bucket data from ``build_buckets``. ``get_bucket``
                reads it for bucket assignment, and the optional ``features``
                entry (per-asset OBF features) is read when present.
        """
        self._model = model_bank
        self._bucket_data = bucket_data
        self._states: dict[str, _TradeState] = {}
        # Guards insertion/removal of entries in self._states.
        self._lock = threading.Lock()
        self._pending_outcomes: list[dict] = []  # NOTE(review): unused in this module

    @classmethod
    def load(cls) -> "AdaptiveExitEngine":
        """Load pre-trained models. Falls back gracefully if not trained yet.

        Uses an untrained ``ContinuationModelBank`` when no model file exists,
        and empty bucket data when bucket construction fails. Also makes a
        best-effort attempt to create/extend the ClickHouse shadow table.
        """
        try:
            bank = ContinuationModelBank.load()
            print("[AdaptiveExitEngine] Continuation models loaded")
        except FileNotFoundError:
            print("[AdaptiveExitEngine] WARNING: no trained model found — using untrained fallback")
            bank = ContinuationModelBank()
        try:
            bucket_data = build_buckets(force_rebuild=False)
            print(f"[AdaptiveExitEngine] Bucket assignments loaded: "
                  f"{bucket_data['n_buckets']} buckets")
        except Exception as e:
            print(f"[AdaptiveExitEngine] WARNING: bucket data unavailable ({e})")
            bucket_data = {"assignments": {}, "n_buckets": 0}
        _ensure_shadow_table()
        return cls(bank, bucket_data)

    # ── Feature helpers ───────────────────────────────────────────────────────

    @staticmethod
    def _atr(prices: np.ndarray) -> float:
        """Volatility estimate used as "ATR" throughout this module.

        Standard deviation of the last ``ATR_WINDOW`` one-bar log returns,
        floored at ``MIN_ATR``. With fewer than two prices there are no returns,
        so ``MIN_ATR`` is returned directly. ``np.std`` of an empty array would
        give NaN, and ``max(nan, MIN_ATR)`` keeps the NaN.
        """
        if len(prices) < 2:
            return MIN_ATR
        log_rets = np.diff(np.log(np.maximum(prices, 1e-12)))
        return max(float(np.std(log_rets[-ATR_WINDOW:])), MIN_ATR)

    @staticmethod
    def _recent_returns(prices: np.ndarray) -> tuple:
        """Return ``(ret_1, ret_3)``, the 1- and 3-bar log returns ending at the last price.

        ``ret_1`` is 0.0 with fewer than two prices. ``ret_3`` falls back to
        ``ret_1`` with fewer than four.
        """
        tail = prices[-ATR_WINDOW:]
        ret_1 = float(np.log(tail[-1] / tail[-2])) if len(tail) >= 2 else 0.0
        ret_3 = float(np.log(tail[-1] / tail[-4])) if len(tail) >= 4 else ret_1
        return ret_1, ret_3

    def _obf_features(self, asset: str) -> Dict[str, float]:
        """Static OBF features for *asset*, as keyword args for the model.

        Looks the asset up in ``bucket_data["features"]`` when that object is
        DataFrame-like (has ``.loc``). Otherwise, or when the asset is absent,
        the defaults below are used.
        """
        table = self._bucket_data.get("features", {})
        row = {}
        if hasattr(table, "loc") and asset in table.index:
            row = table.loc[asset].to_dict()
        return {
            "spread_bps": float(row.get("spread_bps", 0.0)),
            "depth_usd": float(row.get("depth_usd", 0.0)),
            "fill_prob": float(row.get("fill_prob", 0.9)),
        }

    # ── Trade lifecycle ───────────────────────────────────────────────────────

    def on_entry(self, trade_id: str, asset: str, direction: int,
                 entry_price: float, vel_div_entry: float = 0.0) -> None:
        """Start tracking a trade, replacing any existing state for *trade_id*.

        The bucket is resolved with ``get_bucket(..., fallback=0)``.
        """
        bid = get_bucket(asset, self._bucket_data, fallback=0)
        with self._lock:
            self._states[trade_id] = _TradeState(trade_id, asset, direction,
                                                 entry_price, bid, vel_div_entry)

    def on_exit(self, trade_id: str, actual_exit_reason: str,
                pnl_pct: float) -> None:
        """Called when the real system closes a trade — records outcome for online update.

        The trade's state is dropped. If the trade is unknown, or no price was
        ever observed for it, nothing else happens. Otherwise:
          * label: continuation = 1 if ``pnl_pct > 0`` else 0;
          * the model's prediction on the final state is computed and passed
            to ``online_update`` together with the label and exit reason;
          * one ``action="CLOSED"`` row carrying ``actual_exit`` is logged on a
            daemon thread, so real exits are queryable next to shadow rows.

        Design intent (the filtering itself lives in the model bank, not in
        this module): only natural exits (FIXED_TP, MAX_HOLD, V7/AE stops) feed
        the model. Forced exits (HIBERNATE_HALT, SUBDAY_ACB_NORMALIZATION) are
        meant to be dropped by the bank's natural-exits-only guard, so regime
        artifacts do not bias the continuation distribution.

        Args:
            trade_id: Identifier used with on_entry()/evaluate().
            actual_exit_reason: Exit reason reported by the live system.
            pnl_pct: Realised PnL of the trade; its sign defines the label.
        """
        with self._lock:
            st = self._states.pop(trade_id, None)
        if st is None or not st.price_buf:
            return
        cont = 1 if pnl_pct > 0 else 0
        prices = np.asarray(st.price_buf, dtype=float)
        atr = self._atr(prices)
        mae_norm = st.mae / atr
        mfe_norm = st.mfe / atr
        # Use the last bars_held/max_hold seen by evaluate(). The price buffer
        # is capped, so its length is not a bar count; it is only a fallback
        # for state that evaluate() never touched.
        bars_held = getattr(st, "last_bars_held", None)
        if bars_held is None:
            bars_held = len(st.price_buf)
        max_hold = getattr(st, "last_max_hold", None) or 120
        tau_norm = min(bars_held / max_hold, 1.0)
        ret_1, ret_3 = self._recent_returns(prices)
        # Train on the vel_div the model actually scored in evaluate(), falling
        # back to the entry value when none was recorded.
        vel_div_now = getattr(st, "last_vel_div_now", st.vel_div_entry)
        obf = self._obf_features(st.asset)
        p_pred = self._model.predict(
            mae_norm=mae_norm, mfe_norm=mfe_norm, tau_norm=tau_norm,
            ret_1=ret_1, ret_3=ret_3,
            vel_div_entry=st.vel_div_entry, vel_div_now=vel_div_now,
            bucket_id=st.bucket_id,
            **obf,
        )
        self._model.online_update(
            bucket_id=st.bucket_id,
            mae_norm=mae_norm,
            mfe_norm=mfe_norm,
            tau_norm=tau_norm,
            ret_1=ret_1,
            ret_3=ret_3,
            vel_div_entry=st.vel_div_entry,
            vel_div_now=vel_div_now,
            continuation=cont,
            exit_reason=actual_exit_reason,
            p_pred=p_pred,
            **obf,
        )
        # Log one final shadow row at close so actual_exit is queryable for comparison
        threading.Thread(target=_ch_insert, args=({
            "ts": int(time.time() * 1e6),
            "trade_id": trade_id,
            "asset": st.asset,
            "bucket_id": int(st.bucket_id),
            "bars_held": int(bars_held),
            "mae_norm": float(mae_norm),
            "mfe_norm": float(mfe_norm),
            "tau_norm": float(tau_norm),
            "p_cont": float(p_pred),
            "vel_div_entry": float(st.vel_div_entry),
            "vel_div_now": float(vel_div_now),
            "action": "CLOSED",
            "exit_reason": "",
            "actual_exit": actual_exit_reason,
            "pnl_pct": float(pnl_pct),
        },), daemon=True).start()

    # ── Per-bar evaluation ────────────────────────────────────────────────────

    def evaluate(
        self,
        trade_id: str,
        asset: str,
        direction: int,
        entry_price: float,
        current_price: float,
        bars_held: int,
        max_hold: int = 120,
        recent_prices: Optional[list] = None,
        exf: Optional[dict] = None,
        vel_div_now: float = 0.0,
    ) -> dict:
        """
        Evaluate whether the adaptive engine would exit this trade.
        Returns shadow decision dict (never executed — caller logs only).

        Rules, first match wins:
          * AE_MAE_STOP: MAE > max(0.005, MAE_MULT_BY_BUCKET[bucket] * atr).
            Skipped when the bucket's multiplier is None.
          * AE_GIVEBACK_LOW_CONT: the current favourable excursion has fallen
            below GIVEBACK_K * peak MFE and P(continuation) < P_THRESHOLD.
          * AE_TIME: bars_held / max_hold > 1.

        Args:
            trade_id: Trade identifier. Unknown ids start being tracked here.
            asset: Asset symbol (bucket and OBF feature lookup).
            direction: -1 = SHORT, anything else is treated as LONG.
            entry_price: Entry price. A falsy value yields zero excursion.
            current_price: Latest price.
            bars_held: Bars since entry.
            max_hold: Bar budget for the time cap (values < 1 treated as 1).
            recent_prices: Optional recent price history. When given, it
                replaces the internal buffer; otherwise current_price is
                appended to it.
            exf: Accepted for interface compatibility; currently unused.
            vel_div_now: Current vel_div value passed to the model.

        Returns:
            Dict with the decision (``action``, ``exit_reason_shadow``), the
            model output (``p_continuation``), the features, the parameters
            that drove the decision, and ``bars_held``.
        """
        with self._lock:
            st = self._states.get(trade_id)
            if st is None:
                # No on_entry() for this trade: start tracking it now.
                bid = get_bucket(asset, self._bucket_data, fallback=0)
                st = _TradeState(trade_id, asset, direction,
                                 entry_price, bid, vel_div_now)
                self._states[trade_id] = st

        # Keep only what the volatility/return features consume, so the
        # incremental path cannot grow without bound over a long trade.
        buf_len = ATR_WINDOW + 5
        if recent_prices:
            st.price_buf = list(recent_prices[-buf_len:])
        elif current_price:
            st.price_buf.append(current_price)
            del st.price_buf[:-buf_len]

        # Remembered for on_exit(): the buffer length is not a bar count, and
        # the online update should see the vel_div the model scored here.
        st.last_bars_held = bars_held
        st.last_max_hold = max_hold
        st.last_vel_div_now = vel_div_now

        # Signed excursion: positive = favourable for the trade's direction.
        if not entry_price:
            delta = 0.0  # no usable entry price → no excursion signal
        elif direction == -1:  # SHORT: favourable when price falls
            delta = (entry_price - current_price) / entry_price
        else:  # LONG: favourable when price rises
            delta = (current_price - entry_price) / entry_price
        adverse = max(0.0, -delta)
        favorable = max(0.0, delta)  # current favourable excursion
        st.mae = max(st.mae, adverse)
        st.mfe = max(st.mfe, favorable)
        st.peak_mfe = max(st.peak_mfe, st.mfe)

        prices_arr = (np.array(st.price_buf, dtype=float) if st.price_buf
                      else np.array([current_price]))
        atr = self._atr(prices_arr)
        mae_norm = st.mae / atr
        mfe_norm = st.mfe / atr
        tau_norm = bars_held / max(max_hold, 1)
        ret_1, ret_3 = self._recent_returns(prices_arr)

        # P(continuation)
        p_cont = self._model.predict(
            mae_norm=mae_norm,
            mfe_norm=mfe_norm,
            tau_norm=tau_norm,
            ret_1=ret_1,
            ret_3=ret_3,
            vel_div_entry=st.vel_div_entry,
            vel_div_now=vel_div_now,
            bucket_id=st.bucket_id,
            **self._obf_features(asset),
        )

        # Decision logic — per-bucket MAE multiplier. `None` entry disables the
        # MAE_STOP gate for that bucket (giveback + time checks still apply).
        mae_mult = MAE_MULT_BY_BUCKET.get(st.bucket_id, MAE_MULT_TIER1)
        mae_threshold = max(0.005, mae_mult * atr) if mae_mult is not None else None
        action, exit_reason = "HOLD", ""
        if mae_threshold is not None and st.mae > mae_threshold:
            action, exit_reason = "EXIT", "AE_MAE_STOP"
        elif (st.peak_mfe > 0 and favorable < GIVEBACK_K * st.peak_mfe
              and p_cont < P_THRESHOLD):
            # Compare the *current* favourable excursion with its peak.
            # st.mfe is itself a running max (always == peak_mfe), so it can
            # never fall below GIVEBACK_K * peak_mfe.
            action, exit_reason = "EXIT", "AE_GIVEBACK_LOW_CONT"
        elif tau_norm > 1.0:
            action, exit_reason = "EXIT", "AE_TIME"

        return {
            "trade_id": trade_id,
            "asset": st.asset,
            "action": action,
            "exit_reason_shadow": exit_reason,
            "p_continuation": p_cont,
            "mae_norm": mae_norm,
            "mfe_norm": mfe_norm,
            "tau_norm": tau_norm,
            "bucket_id": st.bucket_id,
            "vel_div_entry": st.vel_div_entry,
            "vel_div_now": vel_div_now,
            "mae_mult_applied": mae_mult,
            "mae_threshold": mae_threshold,
            "atr": atr,
            "p_threshold": P_THRESHOLD,
            "giveback_k": GIVEBACK_K,
            "bars_held": bars_held,
        }

    def log_shadow(
        self,
        shadow: dict,
        actual_exit: str = "",
        pnl_pct: float = 0.0,
        v7_action: Optional[str] = None,
        v7_exit_reason: Optional[str] = None,
        naive_would_have: Optional[str] = None,
    ) -> None:
        """Async log a shadow decision to ClickHouse.

        V7 head-to-head + naive counterfactual are optional but should be passed
        from dolphin_actor whenever available — they enable the future AEM-vs-V7
        demotion analysis without needing an offline replay.

        Args:
            shadow: Dict returned by evaluate(). Missing keys get defaults.
            actual_exit: Real exit reason, if known at this instant.
            pnl_pct: PnL to record alongside the decision.
            v7_action / v7_exit_reason: V7's decision at the same instant.
            naive_would_have: Naive TP/STOP/MAX_HOLD counterfactual.
        """
        def _opt(v):
            return None if v is None else float(v)

        tau_norm = float(shadow.get("tau_norm", 0))
        bars_held = shadow.get("bars_held")
        if bars_held is None:
            # Older shadow dicts lack bars_held: reconstruct assuming
            # max_hold=120. round() so float round-off cannot drop a bar.
            bars_held = round(tau_norm * 120)
        row = {
            "ts": int(time.time() * 1e6),
            "trade_id": shadow.get("trade_id", ""),
            "asset": shadow.get("asset", ""),
            "bucket_id": int(shadow.get("bucket_id", 0)),
            "bars_held": int(bars_held),
            "mae_norm": float(shadow.get("mae_norm", 0)),
            "mfe_norm": float(shadow.get("mfe_norm", 0)),
            "tau_norm": tau_norm,
            "p_cont": float(shadow.get("p_continuation", 0.5)),
            "vel_div_entry": float(shadow.get("vel_div_entry", 0.0)),
            "vel_div_now": float(shadow.get("vel_div_now", 0.0)),
            "action": shadow.get("action", "HOLD"),
            "exit_reason": shadow.get("exit_reason_shadow", ""),
            "actual_exit": actual_exit,
            "pnl_pct": float(pnl_pct),
            # AEM-decision params (Nullable columns; None → NULL)
            "mae_mult_applied": _opt(shadow.get("mae_mult_applied")),
            "mae_threshold": _opt(shadow.get("mae_threshold")),
            "atr": _opt(shadow.get("atr")),
            "p_threshold": _opt(shadow.get("p_threshold")),
            "giveback_k": _opt(shadow.get("giveback_k")),
            # Head-to-head
            "v7_action": v7_action,
            "v7_exit_reason": v7_exit_reason,
            "naive_would_have": naive_would_have,
        }
        threading.Thread(target=_ch_insert, args=(row,), daemon=True).start()