791 lines
33 KiB
Plaintext
791 lines
33 KiB
Plaintext
import pandas as pd
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import seaborn as sns
|
|
from typing import Tuple, Dict, List, Optional, Union
|
|
from dataclasses import dataclass
|
|
from scipy import stats
|
|
import warnings
|
|
warnings.filterwarnings('ignore')
|
|
|
|
@dataclass
class DrawdownMetrics:
    """Summary statistics produced by one conditional drawdown analysis run.

    Drawdown values are expressed in the same units as the analysed price
    series (entry price minus the low reached during an episode).
    """

    # Mean drawdown over the analysed episodes.
    avg_drawdown: float
    # Largest drawdown over the analysed episodes.
    max_drawdown: float
    # Number of episodes counted by the analysis.
    recovery_count: int
    # Recovery ratio as computed by the producing analysis method.
    recovery_probability: float
    # Mean episode length, in bars of the price series.
    avg_recovery_time: float
    # (lower, upper) bounds of the confidence interval around avg_drawdown.
    confidence_interval: Tuple[float, float]
|
|
|
|
@dataclass
class OrderBookSnapshot:
    """Point-in-time order book figures (real or simulated) used for liquidity adjustment."""

    # Resting size on the bid side.
    bid_depth: float
    # Resting size on the ask side.
    ask_depth: float
    # Bid/ask spread in price units.
    spread: float
    # Book imbalance; 0.5 is treated as balanced by the analyzer.
    imbalance: float
|
|
|
|
class ConditionalDrawdownAnalyzer:
    """
    Complete framework for conditional drawdown analysis with all enhancements:
    - Recursive self-consistent drawdowns
    - Volatility adjustment
    - Order book integration
    - Backtesting capabilities
    - Regime detection
    - Entry optimization

    A "drawdown episode" starts on the first bar whose price is below the
    entry price and ends on the first later bar whose price reaches the
    recovery threshold. Drawdowns are measured as ``entry_price - low`` in
    price units.
    """

    # Reference depth shared by the order book simulation and the depth factor.
    _BASE_DEPTH = 10000

    # Column layout of the drawdown surface (kept even when there are no rows).
    _SURFACE_COLUMNS = [
        'entry_price', 'avg_drawdown', 'max_drawdown', 'recovery_count',
        'recovery_probability', 'avg_recovery_time', 'ci_lower', 'ci_upper',
    ]

    # Columns returned by optimize_entries on every code path.
    _ENTRY_COLUMNS = [
        'entry_price', 'avg_drawdown', 'recovery_probability',
        'risk_reward_ratio', 'score',
    ]

    def __init__(self, confidence_level: float = 0.95):
        """Store the confidence level used for all drawdown confidence intervals."""
        self.confidence_level = confidence_level
        # Kept for backward compatibility; no method of this class uses it.
        self.results_cache = {}

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _as_floats(price) -> List[float]:
        """Convert a price series to a plain list of floats (one conversion
        instead of a pandas scalar lookup per bar)."""
        return np.asarray(price, dtype=float).tolist()

    @staticmethod
    def _empty_metrics() -> "DrawdownMetrics":
        """Return the all-zero result used when no episode qualifies."""
        return DrawdownMetrics(0, 0, 0, 0, 0, (0, 0))

    @staticmethod
    def _scan_episodes(values, entry_price: float, recovery_threshold: float):
        """Walk the series once and collect drawdown episodes.

        Args:
            values: Indexable sequence of prices.
            entry_price: An episode starts on a bar priced below this level.
            recovery_threshold: An episode ends on the first bar priced at or
                above this level.

        Returns:
            ``(lows, durations, trailing, total_episodes)`` where ``lows`` and
            ``durations`` describe recovered episodes only, ``trailing`` is
            ``(low, duration)`` for an episode still open at the end of the
            series (else ``None``), and ``total_episodes`` counts every
            episode that started, recovered or not.
        """
        lows: List[float] = []
        durations: List[int] = []
        trailing = None
        total_episodes = 0
        n = len(values)
        i = 0

        while i < n:
            # Written as a positive "<" test so NaN bars never start an episode.
            if not (values[i] < entry_price):
                i += 1
                continue

            total_episodes += 1
            start = i
            low = values[i]
            while i < n and values[i] < recovery_threshold:
                low = min(low, values[i])
                i += 1

            if i < n and values[i] >= recovery_threshold:
                lows.append(low)
                durations.append(i - start)
            elif i == n:
                trailing = (low, i - start)
            # Otherwise a NaN bar interrupted the episode: it is counted in
            # total_episodes but contributes no statistics.

        return lows, durations, trailing, total_episodes

    def _mean_confidence_interval(self, samples, center: float) -> Tuple[float, float]:
        """Student-t confidence interval for the mean of ``samples``.

        Collapses to ``(center, center)`` when fewer than two samples exist
        or when the standard error is zero/non-finite; passing ``scale=0`` to
        ``stats.t.interval`` would otherwise yield NaN bounds.
        """
        if len(samples) < 2:
            return (center, center)
        sem = stats.sem(samples)
        if not np.isfinite(sem) or sem == 0:
            return (center, center)
        lower, upper = stats.t.interval(self.confidence_level, len(samples) - 1,
                                        loc=center, scale=sem)
        return (float(lower), float(upper))

    def _build_metrics(self, drawdowns, recovery_times, total_episodes: int) -> "DrawdownMetrics":
        """Assemble a DrawdownMetrics from episode drawdowns and recovery data."""
        avg_dd = float(np.mean(drawdowns))
        recovered = len(recovery_times)
        return DrawdownMetrics(
            avg_drawdown=avg_dd,
            max_drawdown=float(np.max(drawdowns)),
            recovery_count=recovered,
            # Share of started episodes that actually recovered.
            recovery_probability=recovered / max(1, total_episodes),
            avg_recovery_time=float(np.mean(recovery_times)) if recovery_times else 0,
            confidence_interval=self._mean_confidence_interval(drawdowns, avg_dd),
        )

    # ------------------------------------------------------------------
    # Drawdown estimators
    # ------------------------------------------------------------------

    def basic_conditional_drawdown(self,
                                   price: pd.Series,
                                   entry_price: float) -> "DrawdownMetrics":
        """
        Basic conditional drawdown: track lows below entry until recovery above entry

        Args:
            price: Price series, in chronological order.
            entry_price: Hypothetical entry level.

        Returns:
            DrawdownMetrics. All-zero metrics are returned when no episode
            recovered. A drawdown still open at the end of the series adds
            its low to the drawdown statistics when at least one earlier
            episode recovered, but it is never counted as a recovery.
        """
        values = self._as_floats(price)
        lows, recovery_times, trailing, total_episodes = self._scan_episodes(
            values, entry_price, entry_price)

        if not lows:
            return self._empty_metrics()

        stat_lows = list(lows)
        if trailing is not None:
            stat_lows.append(trailing[0])

        drawdowns = [entry_price - low for low in stat_lows]
        return self._build_metrics(drawdowns, recovery_times, total_episodes)

    def recursive_conditional_drawdown(self,
                                       price: pd.Series,
                                       entry_price: float,
                                       tol: float = 1e-5,
                                       max_iter: int = 50) -> "DrawdownMetrics":
        """
        Recursive conditional drawdown where recovery must exceed entry + avg_drawdown

        The average drawdown is iterated as a fixed point: each pass rescans
        the series with ``entry_price + avg_drawdown`` as recovery threshold.

        Args:
            price: Price series, in chronological order.
            entry_price: Hypothetical entry level.
            tol: Stop once the average drawdown changes by less than this.
            max_iter: Maximum number of passes.

        Returns:
            DrawdownMetrics for the final pass. All-zero metrics are returned
            as soon as a pass finds no recovered episode; continuing would
            reset the threshold and make the result depend on ``max_iter``.
        """
        values = self._as_floats(price)
        avg_dd = 0.0
        lows: List[float] = []
        recovery_times: List[int] = []
        total_episodes = 0

        for _ in range(max_iter):
            lows, recovery_times, _, total_episodes = self._scan_episodes(
                values, entry_price, entry_price + avg_dd)
            if not lows:
                break

            new_avg_dd = float(np.mean([entry_price - low for low in lows]))
            converged = abs(new_avg_dd - avg_dd) < tol
            # Always keep the value matching the lows of this pass so the
            # reported mean, maximum and interval describe the same episodes.
            avg_dd = new_avg_dd
            if converged:
                break

        if not lows:
            return self._empty_metrics()

        drawdowns = [entry_price - low for low in lows]
        return self._build_metrics(drawdowns, recovery_times, total_episodes)

    # ------------------------------------------------------------------
    # Volatility
    # ------------------------------------------------------------------

    def calculate_volatility(self,
                             price: pd.Series,
                             method: str = 'atr',
                             window: int = 14) -> pd.Series:
        """
        Calculate volatility using various methods

        Args:
            price: Price series.
            method: ``'atr'`` (rolling mean of absolute returns, an ATR
                approximation from closes only), ``'std'`` (rolling standard
                deviation of returns) or ``'ewm'`` (exponentially weighted
                standard deviation of returns).
            window: Rolling window length / EWM span.

        Returns:
            Volatility in price units (relative volatility times price).

        Raises:
            ValueError: If ``method`` is not one of the supported names.
        """
        if method not in ('atr', 'std', 'ewm'):
            raise ValueError(f"Unknown volatility method: {method}")

        returns = price.pct_change()
        if method == 'atr':
            relative = returns.abs().rolling(window).mean()
        elif method == 'std':
            relative = returns.rolling(window).std()
        else:
            relative = returns.ewm(span=window).std()
        return relative * price

    def volatility_adjusted_drawdown(self,
                                     price: pd.Series,
                                     entry_price: float,
                                     vol_method: str = 'atr',
                                     vol_window: int = 14,
                                     recursive: bool = True) -> Dict:
        """
        Calculate volatility-adjusted conditional drawdowns

        The base drawdown figures are divided by the ratio of the volatility
        at the bar closest to ``entry_price`` to the average volatility.

        Returns:
            Dict with keys ``base_metrics``, ``adjusted_metrics``,
            ``volatility_coefficient``, ``entry_volatility`` and
            ``average_volatility``.
        """
        if recursive:
            base_metrics = self.recursive_conditional_drawdown(price, entry_price)
        else:
            base_metrics = self.basic_conditional_drawdown(price, entry_price)

        volatility = self.calculate_volatility(price, vol_method, vol_window)
        avg_vol = volatility.mean()

        # Locate the bar closest to the entry price by position, so duplicate
        # index labels cannot turn the lookup into a Series.
        distance = np.abs(np.asarray(price, dtype=float) - entry_price)
        entry_vol = np.nan
        if distance.size and not np.all(np.isnan(distance)):
            entry_vol = volatility.iloc[int(np.nanargmin(distance))]

        # Rolling estimators are NaN during warm-up; use the average instead
        # of letting NaN flow into every adjusted figure.
        if not np.isfinite(entry_vol):
            entry_vol = avg_vol

        usable = (np.isfinite(entry_vol) and np.isfinite(avg_vol)
                  and entry_vol > 0 and avg_vol > 0)
        # A zero coefficient would make the divisions below blow up.
        vol_coeff = float(entry_vol / avg_vol) if usable else 1.0

        ci_low, ci_high = base_metrics.confidence_interval
        adjusted_metrics = DrawdownMetrics(
            avg_drawdown=base_metrics.avg_drawdown / vol_coeff,
            max_drawdown=base_metrics.max_drawdown / vol_coeff,
            recovery_count=base_metrics.recovery_count,
            recovery_probability=base_metrics.recovery_probability,
            avg_recovery_time=base_metrics.avg_recovery_time,
            confidence_interval=(ci_low / vol_coeff, ci_high / vol_coeff),
        )

        return {
            'base_metrics': base_metrics,
            'adjusted_metrics': adjusted_metrics,
            'volatility_coefficient': vol_coeff,
            'entry_volatility': entry_vol,
            'average_volatility': avg_vol,
        }

    # ------------------------------------------------------------------
    # Order book / liquidity
    # ------------------------------------------------------------------

    def simulate_order_book(self,
                            price: pd.Series,
                            entry_price: float) -> "OrderBookSnapshot":
        """
        Simulate order book metrics based on price action

        Depth shrinks and spread widens with the standard deviation of the
        last 20 returns. The imbalance is drawn from ``np.random.normal`` and
        is therefore only reproducible if the caller seeds NumPy's global
        random state.
        """
        recent_vol = price.pct_change().tail(20).std()
        # Fewer than two returns give NaN; treat that as "no volatility"
        # instead of returning a NaN spread.
        if not np.isfinite(recent_vol):
            recent_vol = 0.0

        # Higher volatility -> thinner book (same estimate on both sides).
        depth = self._BASE_DEPTH * (1 - recent_vol * 10)

        # Higher volatility -> wider spread.
        spread = entry_price * (0.001 + recent_vol * 5)

        # Random imbalance around a balanced book.
        imbalance = 0.5 + np.random.normal(0, 0.1)

        return OrderBookSnapshot(
            bid_depth=max(1000, depth),
            ask_depth=max(1000, depth),
            spread=spread,
            imbalance=max(0.1, min(0.9, imbalance)),
        )

    def order_book_adjusted_drawdown(self,
                                     price: pd.Series,
                                     entry_price: float,
                                     order_book: Optional["OrderBookSnapshot"] = None) -> Dict:
        """
        Adjust drawdown based on order book liquidity

        Args:
            price: Price series.
            entry_price: Hypothetical entry level.
            order_book: Snapshot to use; simulated from ``price`` when omitted.

        Returns:
            The dict from ``volatility_adjusted_drawdown`` extended with
            ``liquidity_adjusted_metrics``, ``order_book`` and
            ``liquidity_adjustment_factor``.
        """
        base_result = self.volatility_adjusted_drawdown(price, entry_price)

        if order_book is None:
            order_book = self.simulate_order_book(price, entry_price)

        min_depth = min(order_book.bid_depth, order_book.ask_depth)
        # Thin books raise the factor above 1.
        depth_factor = 1.0 + (self._BASE_DEPTH - min_depth) / 50000
        # Spread penalty, relative to the entry price.
        spread_factor = 1.0 + order_book.spread / entry_price * 100
        # Penalty for distance from a balanced (0.5) book.
        imbalance_factor = 1.0 + abs(0.5 - order_book.imbalance) * 0.5

        liquidity_adjustment = depth_factor * spread_factor * imbalance_factor

        adjusted = base_result['adjusted_metrics']
        ci_low, ci_high = adjusted.confidence_interval
        liquidity_adjusted = DrawdownMetrics(
            avg_drawdown=adjusted.avg_drawdown * liquidity_adjustment,
            max_drawdown=adjusted.max_drawdown * liquidity_adjustment,
            recovery_count=adjusted.recovery_count,
            recovery_probability=adjusted.recovery_probability / liquidity_adjustment,
            avg_recovery_time=adjusted.avg_recovery_time * liquidity_adjustment,
            confidence_interval=(ci_low * liquidity_adjustment,
                                 ci_high * liquidity_adjustment),
        )

        return {
            **base_result,
            'liquidity_adjusted_metrics': liquidity_adjusted,
            'order_book': order_book,
            'liquidity_adjustment_factor': liquidity_adjustment,
        }

    # ------------------------------------------------------------------
    # Surface and entry optimisation
    # ------------------------------------------------------------------

    def create_drawdown_surface(self,
                                price: pd.Series,
                                price_range: Optional[Tuple[float, float]] = None,
                                resolution: float = 0.005,  # 0.5% steps
                                recursive: bool = True,
                                max_points: int = 200) -> pd.DataFrame:
        """
        Create drawdown surface across all potential entry prices

        Args:
            price: Price series.
            price_range: ``(low, high)`` entry range; defaults to the series
                range widened by 5% on each side.
            resolution: Grid step as a fraction of the lowest entry price.
            recursive: Use the recursive estimator instead of the basic one.
            max_points: Upper bound on the number of grid points
                (performance limit).

        Returns:
            DataFrame with one row per entry price. The columns are present
            even when no row could be computed.

        Raises:
            ValueError: If ``resolution`` is not positive or the range is
                inverted.
        """
        if resolution <= 0:
            raise ValueError("resolution must be positive")

        empty_surface = pd.DataFrame(columns=self._SURFACE_COLUMNS)

        if price_range is None:
            if len(price) == 0:
                return empty_surface
            price_min, price_max = price.min() * 0.95, price.max() * 1.05
        else:
            price_min, price_max = price_range

        if not (np.isfinite(price_min) and np.isfinite(price_max)):
            return empty_surface
        if price_max < price_min:
            raise ValueError("price_range must satisfy low <= high")

        span = price_max - price_min
        if span == 0:
            num_points = 1
        else:
            # The step is relative to the lowest price; that is undefined for
            # non-positive prices, where the point cap is used instead.
            raw_points = (int(span / (price_min * resolution))
                          if price_min > 0 else max_points)
            # Always evaluate at least both ends of the range.
            num_points = max(2, min(raw_points, max_points))

        estimator = (self.recursive_conditional_drawdown if recursive
                     else self.basic_conditional_drawdown)

        rows = []
        for entry_price in np.linspace(price_min, price_max, num_points):
            try:
                metrics = estimator(price, entry_price)
            except (ValueError, ArithmeticError) as exc:
                # Best effort: skip the entry price, but say so.
                warnings.warn(f"Skipping entry price {entry_price}: {exc}")
                continue

            rows.append({
                'entry_price': entry_price,
                'avg_drawdown': metrics.avg_drawdown,
                'max_drawdown': metrics.max_drawdown,
                'recovery_count': metrics.recovery_count,
                'recovery_probability': metrics.recovery_probability,
                'avg_recovery_time': metrics.avg_recovery_time,
                'ci_lower': metrics.confidence_interval[0],
                'ci_upper': metrics.confidence_interval[1],
            })

        return pd.DataFrame(rows, columns=self._SURFACE_COLUMNS)

    def detect_abnormal_regime(self,
                               price: pd.Series,
                               entry_price: float,
                               current_drawdown: float,
                               threshold_multiplier: float = 2.0) -> Dict:
        """
        Detect if current drawdown indicates abnormal market regime

        Args:
            price: Historical price series.
            entry_price: Entry level of the open position.
            current_drawdown: Current drawdown in price units.
            threshold_multiplier: Multiple of the expected drawdown above
                which the regime is flagged abnormal.

        Returns:
            Dict with the flags, severity score, reference values and a
            textual recommendation.
        """
        metrics = self.recursive_conditional_drawdown(price, entry_price)

        expected_dd = metrics.avg_drawdown
        ci_upper = metrics.confidence_interval[1]

        is_abnormal = bool(current_drawdown > expected_dd * threshold_multiplier)
        # NOTE(review): ci_upper bounds the *mean* drawdown, so this flag can
        # fire below the "abnormal" threshold; with no history both reference
        # values are 0 and any positive drawdown is flagged.
        is_extreme = bool(current_drawdown > ci_upper)

        # The floor avoids division by zero when there is no history.
        severity_score = current_drawdown / max(expected_dd, 0.001)

        return {
            'is_abnormal_regime': is_abnormal,
            'is_extreme_drawdown': is_extreme,
            'severity_score': severity_score,
            'expected_drawdown': expected_dd,
            'current_drawdown': current_drawdown,
            'confidence_upper': ci_upper,
            'recommendation': self._get_regime_recommendation(
                severity_score, is_abnormal, is_extreme),
        }

    def _get_regime_recommendation(self, severity_score: float, is_abnormal: bool, is_extreme: bool) -> str:
        """Generate trading recommendations based on regime analysis"""
        if is_extreme:
            return "EXTREME: Consider immediate exit or significant position reduction"
        if is_abnormal:
            return "ABNORMAL: Tighten stops, reduce position size, or hedge"
        if severity_score > 1.5:
            return "ELEVATED: Monitor closely, consider partial profit-taking"
        return "NORMAL: Continue with current strategy"

    def optimize_entries(self,
                         price: pd.Series,
                         reward_target: float = 0.02,  # 2% target
                         max_entries: int = 10) -> pd.DataFrame:
        """
        Find optimal entry points based on risk-reward ratio

        Args:
            price: Price series.
            reward_target: Target profit as a fraction of the entry price.
            max_entries: Maximum number of rows returned.

        Returns:
            The best-scoring entry prices with columns ``entry_price``,
            ``avg_drawdown``, ``recovery_probability``, ``risk_reward_ratio``
            and ``score`` (rounded to 4 decimals). Entries with a recovery
            probability of at most 0.3 are dropped unless that leaves nothing.
        """
        surface = self.create_drawdown_surface(price)

        if surface.empty:
            return pd.DataFrame(columns=self._ENTRY_COLUMNS)

        # Drawdowns are in price units, so the fractional target is converted
        # to price units before forming the ratio.
        surface['potential_reward'] = reward_target * surface['entry_price']
        # A zero drawdown means "no data" here; the inf denominator scores it 0.
        surface['risk_reward_ratio'] = (surface['potential_reward']
                                        / surface['avg_drawdown'].replace(0, np.inf))
        surface['score'] = (surface['risk_reward_ratio']
                            * surface['recovery_probability']
                            * (1 / (surface['avg_recovery_time'] + 1)))

        candidates = surface[surface['recovery_probability'] > 0.3]
        if candidates.empty:
            candidates = surface

        best = candidates.nlargest(max_entries, 'score')
        return best[self._ENTRY_COLUMNS].round(4)

    # ------------------------------------------------------------------
    # Backtesting
    # ------------------------------------------------------------------

    def backtest_strategy(self,
                          price: pd.Series,
                          entry_signals: pd.Series,
                          exit_method: str = 'adaptive_stop',
                          initial_capital: float = 100000) -> Dict:
        """
        Backtest trading strategy using conditional drawdown insights

        Args:
            price: Price series; must contain every label of ``entry_signals``.
            entry_signals: Positive values open a long position when flat,
                negative values close an open position.
            exit_method: ``'adaptive_stop'`` exits when the drawdown exceeds
                1.5x the historical average; ``'regime_detection'`` exits on
                an abnormal regime. Any other value disables both stops.
            initial_capital: Starting capital.

        Returns:
            Dict with the trade log (``trades``) and summary statistics. The
            same keys are present whether or not any trade was closed. A
            position opened on the final bar is never closed and does not
            appear in the log.
        """
        trades = []
        capital = initial_capital
        position = 0
        entry_price = 0
        entry_time = None
        last_index = len(entry_signals) - 1

        for i, (timestamp, signal) in enumerate(entry_signals.items()):
            current_price = price.loc[timestamp]

            if signal > 0 and position == 0:
                # Size the position so that an average drawdown costs 2% of capital.
                metrics = self.recursive_conditional_drawdown(
                    price.loc[:timestamp], current_price)
                risk_amount = capital * 0.02

                if metrics.avg_drawdown > 0:
                    position_size = risk_amount / metrics.avg_drawdown
                    # Can't buy more than we have.
                    position = min(position_size, capital / current_price)
                    entry_price = current_price
                    entry_time = timestamp

            elif position > 0:
                current_drawdown = max(0, entry_price - current_price)
                exit_reason = ""

                if exit_method == 'adaptive_stop':
                    metrics = self.recursive_conditional_drawdown(
                        price.loc[:timestamp], entry_price)
                    if current_drawdown > metrics.avg_drawdown * 1.5:
                        exit_reason = "Adaptive stop loss"
                elif exit_method == 'regime_detection':
                    regime_info = self.detect_abnormal_regime(
                        price.loc[:timestamp], entry_price, current_drawdown)
                    if regime_info['is_abnormal_regime']:
                        exit_reason = "Abnormal regime detected"

                if not exit_reason:
                    if signal < 0:
                        exit_reason = "Signal exit"
                    elif i == last_index:
                        exit_reason = "End of data"

                if exit_reason:
                    pnl = position * (current_price - entry_price)
                    capital += pnl

                    trades.append({
                        'entry_time': entry_time,
                        'entry_price': entry_price,
                        'exit_time': timestamp,
                        'exit_price': current_price,
                        'position_size': position,
                        'pnl': pnl,
                        'return_pct': pnl / (position * entry_price),
                        'exit_reason': exit_reason,
                    })

                    position = 0
                    entry_price = 0
                    entry_time = None

        trades_df = pd.DataFrame(trades)

        if trades_df.empty:
            return {
                'trades': trades_df,
                'total_return': 0,
                'win_rate': 0,
                'sharpe_ratio': 0,
                'final_capital': capital,
                'num_trades': 0,
                'avg_return_per_trade': 0,
                'max_loss': 0,
                'max_gain': 0,
            }

        total_return = (capital - initial_capital) / initial_capital
        win_rate = (trades_df['pnl'] > 0).mean()
        returns = trades_df['return_pct']
        # std() is NaN for a single trade; the comparison is then False.
        return_std = returns.std()
        sharpe_ratio = returns.mean() / return_std if return_std > 0 else 0

        return {
            'trades': trades_df,
            'total_return': total_return,
            'win_rate': win_rate,
            'sharpe_ratio': sharpe_ratio,
            'final_capital': capital,
            'num_trades': len(trades_df),
            'avg_return_per_trade': returns.mean(),
            'max_loss': trades_df['pnl'].min(),
            'max_gain': trades_df['pnl'].max(),
        }
|
|
|
|
class DrawdownVisualizer:
    """Visualization tools for conditional drawdown analysis"""

    def __init__(self, analyzer: ConditionalDrawdownAnalyzer):
        """Keep a reference to the analyzer and switch matplotlib to the
        'seaborn-v0_8' style (a process-wide setting)."""
        self.analyzer = analyzer
        plt.style.use('seaborn-v0_8')

    def plot_price_with_drawdowns(self,
                                  price: pd.Series,
                                  entry_price: float,
                                  recursive: bool = True,
                                  figsize: Tuple[int, int] = (12, 8)):
        """Plot price series with drawdown analysis overlay

        The top panel shows the price, the entry level, the expected
        drawdown level and the confidence band of the average drawdown. The
        bottom panel shows the drawdown below entry over time.

        Returns:
            The matplotlib figure.
        """
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize, height_ratios=[3, 1])

        # Main price plot
        ax1.plot(price.index, price.values, 'b-', linewidth=1, alpha=0.7, label='Price')
        ax1.axhline(y=entry_price, color='red', linestyle='--', alpha=0.8,
                    label=f'Entry: ${entry_price:.2f}')

        if recursive:
            metrics = self.analyzer.recursive_conditional_drawdown(price, entry_price)
        else:
            metrics = self.analyzer.basic_conditional_drawdown(price, entry_price)

        ci_lower, ci_upper = metrics.confidence_interval

        # The line labelled with the average drawdown is drawn at the average
        # drawdown level (not at the lower confidence bound).
        ax1.axhline(y=entry_price - metrics.avg_drawdown, color='orange',
                    linestyle=':', alpha=0.6,
                    label=f'Expected DD: ${metrics.avg_drawdown:.2f}')
        ax1.axhline(y=entry_price - ci_upper, color='red',
                    linestyle=':', alpha=0.6,
                    label=f'Max Expected: ${ci_upper:.2f}')

        # Confidence band of the average drawdown, as price levels.
        ax1.fill_between(price.index,
                         entry_price - ci_lower,
                         entry_price - ci_upper,
                         alpha=0.2, color='red', label='Risk Zone')

        ax1.set_title(f'Price Analysis with Conditional Drawdown\n'
                      f'Avg DD: ${metrics.avg_drawdown:.2f} | Max DD: ${metrics.max_drawdown:.2f} | '
                      f'Recovery Rate: {metrics.recovery_probability:.1%}')
        ax1.set_ylabel('Price ($)')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Drawdown visualization
        drawdown_series = price.apply(lambda x: max(0, entry_price - x))
        ax2.fill_between(price.index, 0, drawdown_series,
                         where=(drawdown_series > 0), alpha=0.7, color='red',
                         label='Current Drawdown')
        ax2.axhline(y=metrics.avg_drawdown, color='orange', linestyle='-',
                    label=f'Average DD: ${metrics.avg_drawdown:.2f}')
        ax2.set_ylabel('Drawdown ($)')
        ax2.set_xlabel('Time')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig

    def plot_drawdown_surface(self,
                              price: pd.Series,
                              surface: Optional[pd.DataFrame] = None,
                              figsize: Tuple[int, int] = (14, 10)):
        """Create comprehensive drawdown surface visualization

        Args:
            price: Price series, used to compute the surface when none is given.
            surface: Precomputed drawdown surface.
            figsize: Figure size.

        Returns:
            The matplotlib figure (heatmap, risk/recovery scatter, histogram).
        """
        if surface is None:
            surface = self.analyzer.create_drawdown_surface(price)

        fig = plt.figure(figsize=figsize)
        gs = fig.add_gridspec(2, 2, height_ratios=[1, 1], width_ratios=[3, 1])

        # Main heatmap (only drawn with more than 10 entry prices)
        ax1 = fig.add_subplot(gs[:, 0])
        if len(surface) > 10:
            # Row 0: average drawdown, row 1: recovery probability.
            pivot_data = surface.set_index('entry_price')[['avg_drawdown', 'recovery_probability']].T

            # origin='lower' puts row 0 at the bottom, so the tick at y=0.5
            # really labels the average-drawdown row; the default
            # origin='upper' would draw the rows in the opposite order.
            im = ax1.imshow(pivot_data.values, aspect='auto', cmap='RdYlBu_r',
                            origin='lower',
                            extent=[surface['entry_price'].min(),
                                    surface['entry_price'].max(), 0, 2])

            ax1.set_xlabel('Entry Price ($)')
            ax1.set_yticks([0.5, 1.5])
            ax1.set_yticklabels(['Avg Drawdown', 'Recovery Prob'])
            ax1.set_title('Drawdown Risk Surface')

            colorbar = plt.colorbar(im, ax=ax1)
            colorbar.set_label('Risk Level')

        # Risk-reward scatter, coloured by entry price
        ax2 = fig.add_subplot(gs[0, 1])
        ax2.scatter(surface['avg_drawdown'], surface['recovery_probability'],
                    c=surface['entry_price'], s=30, alpha=0.7, cmap='viridis')
        ax2.set_xlabel('Avg Drawdown')
        ax2.set_ylabel('Recovery Probability')
        ax2.set_title('Risk vs Recovery')
        ax2.grid(True, alpha=0.3)

        # Distribution of average drawdowns across entry prices
        ax3 = fig.add_subplot(gs[1, 1])
        ax3.hist(surface['avg_drawdown'], bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        ax3.set_xlabel('Average Drawdown')
        ax3.set_ylabel('Frequency')
        ax3.set_title('Drawdown Distribution')
        ax3.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig

    def plot_regime_analysis(self,
                             price: pd.Series,
                             entry_price: float,
                             current_time: Optional[pd.Timestamp] = None,
                             figsize: Tuple[int, int] = (12, 6)):
        """Plot regime detection analysis

        For every bar after a warm-up window, the regime is evaluated using
        only the prices before that bar.

        Args:
            price: Price series.
            entry_price: Entry level of the position being monitored.
            current_time: Defaults to the last index label.
                NOTE(review): the value is not used by the plot itself.
            figsize: Figure size.

        Returns:
            The matplotlib figure.
        """
        if current_time is None:
            current_time = price.index[-1]

        # Calculate regime metrics over time
        regime_data = []
        lookback_window = min(50, len(price) // 4)

        for i in range(lookback_window, len(price)):
            history = price.iloc[:i]
            current_dd = max(0, entry_price - price.iloc[i])

            regime_info = self.analyzer.detect_abnormal_regime(history, entry_price, current_dd)
            regime_data.append({
                'timestamp': price.index[i],
                'severity_score': regime_info['severity_score'],
                'is_abnormal': regime_info['is_abnormal_regime'],
                'current_drawdown': current_dd,
                'expected_drawdown': regime_info['expected_drawdown'],
            })

        regime_df = pd.DataFrame(regime_data)

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=figsize, sharex=True)

        # Price and regime zones
        ax1.plot(price.index, price.values, 'b-', linewidth=1, label='Price')
        ax1.axhline(y=entry_price, color='red', linestyle='--', label='Entry Price')

        # One faint vertical line per bar flagged abnormal
        for flagged_time in regime_df.loc[regime_df['is_abnormal'], 'timestamp']:
            ax1.axvline(x=flagged_time, color='red', alpha=0.3)

        ax1.set_ylabel('Price ($)')
        ax1.set_title('Price Movement with Regime Detection')
        ax1.legend()
        ax1.grid(True, alpha=0.3)

        # Severity score over time
        ax2.plot(regime_df['timestamp'], regime_df['severity_score'],
                 'orange', linewidth=2, label='Severity Score')
        ax2.axhline(y=1.0, color='green', linestyle='--', alpha=0.7, label='Normal')
        ax2.axhline(y=2.0, color='red', linestyle='--', alpha=0.7, label='Abnormal Threshold')

        # Shade regions whose severity exceeds the 2.0 reference line
        abnormal_mask = regime_df['severity_score'] > 2.0
        ax2.fill_between(regime_df['timestamp'], 0, regime_df['severity_score'],
                         where=abnormal_mask, alpha=0.3, color='red', label='Abnormal Regime')

        ax2.set_ylabel('Severity Score')
        ax2.set_xlabel('Time')
        ax2.set_title('Regime Abnormality Detection')
        ax2.legend()
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        return fig
|
|
|
|
# Example usage and demonstration
|
|
def generate_sample_data(n_points: int = 1000, trend: float = 0.0005, volatility: float = 0.02) -> pd.Series:
    """Generate sample price data for testing

    Builds an hourly price path starting at 2023-01-01 from normally
    distributed log returns, with a volatile stretch over bars 100-149 and a
    calm stretch over bars 500-549 (clipped to the series length).

    Randomness comes from NumPy's global random state; seed it with
    ``np.random.seed`` for reproducible output.

    Args:
        n_points: Number of hourly bars.
        trend: Mean log return per bar.
        volatility: Standard deviation of the log return per bar.

    Returns:
        Price series indexed by hourly timestamps.
    """
    # A Timedelta frequency avoids the 'H' string alias, which newer pandas
    # versions deprecate.
    dates = pd.date_range(start='2023-01-01', periods=n_points, freq=pd.Timedelta(hours=1))

    returns = np.random.normal(trend, volatility, n_points)

    # (start, stop, mean, std) of the special regimes. Each draw is sized to
    # the slice that exists, so short series no longer raise a shape mismatch.
    regimes = (
        (100, 150, -0.003, 0.04),  # volatile period
        (500, 550, 0.002, 0.01),   # calm period
    )
    for start, stop, regime_mean, regime_std in regimes:
        segment = returns[start:stop]
        segment[:] = np.random.normal(regime_mean, regime_std, segment.size)

    prices = 100 * np.exp(np.cumsum(returns))
    return pd.Series(prices, index=dates)
|
|
|
|
def comprehensive_demo():
    """Walk through every feature of the framework on generated sample data.

    Prints a report for each of the eight analysis steps and returns a dict
    with the generated prices, the entry price, the analyzer, the visualizer,
    the drawdown surface and the backtest result.
    """
    print("🚀 Conditional Drawdown Framework - Comprehensive Demo\n")

    # Sample data; the entry is taken after 100 bars of history.
    print("📊 Generating sample price data...")
    prices = generate_sample_data(n_points=1000)
    entry = prices.iloc[100]

    analyzer = ConditionalDrawdownAnalyzer(confidence_level=0.95)
    visualizer = DrawdownVisualizer(analyzer)

    print(f"Entry Price: ${entry:.2f}")
    print(f"Price Range: ${prices.min():.2f} - ${prices.max():.2f}\n")

    # 1. Basic analysis
    print("🔍 1. Basic Conditional Drawdown Analysis")
    basic = analyzer.basic_conditional_drawdown(prices, entry)
    print(f" Average Drawdown: ${basic.avg_drawdown:.2f}")
    print(f" Maximum Drawdown: ${basic.max_drawdown:.2f}")
    print(f" Recovery Probability: {basic.recovery_probability:.1%}")
    print(f" Average Recovery Time: {basic.avg_recovery_time:.1f} periods\n")

    # 2. Recursive analysis
    print("🔄 2. Recursive Conditional Drawdown Analysis")
    recursive = analyzer.recursive_conditional_drawdown(prices, entry)
    print(f" Average Drawdown: ${recursive.avg_drawdown:.2f}")
    print(f" Maximum Drawdown: ${recursive.max_drawdown:.2f}")
    print(f" Recovery Probability: {recursive.recovery_probability:.1%}\n")

    # 3. Volatility adjustment
    print("📈 3. Volatility-Adjusted Analysis")
    vol_report = analyzer.volatility_adjusted_drawdown(prices, entry)
    base_dd = vol_report['base_metrics'].avg_drawdown
    adjusted_dd = vol_report['adjusted_metrics'].avg_drawdown
    print(f" Volatility Coefficient: {vol_report['volatility_coefficient']:.3f}")
    print(f" Adjusted Avg Drawdown: ${adjusted_dd:.2f}")
    print(f" Base vs Adjusted: ${base_dd:.2f} → "
          f"${adjusted_dd:.2f}\n")

    # 4. Order book integration
    print("📋 4. Order Book Integration")
    book_report = analyzer.order_book_adjusted_drawdown(prices, entry)
    book = book_report['order_book']
    print(f" Liquidity Adjustment Factor: {book_report['liquidity_adjustment_factor']:.3f}")
    print(f" Final Adjusted Drawdown: ${book_report['liquidity_adjusted_metrics'].avg_drawdown:.2f}")
    print(f" Order Book - Bid Depth: {book.bid_depth:.0f}, "
          f"Spread: ${book.spread:.4f}\n")

    # 5. Drawdown surface
    print("🗺️  5. Creating Drawdown Surface...")
    surface = analyzer.create_drawdown_surface(prices, resolution=0.01)
    surface_dd = surface['avg_drawdown']
    print(f" Surface calculated for {len(surface)} entry points")
    print(f" Min Risk Entry: ${surface.loc[surface_dd.idxmin(), 'entry_price']:.2f} "
          f"(DD: ${surface_dd.min():.2f})")
    print(f" Max Risk Entry: ${surface.loc[surface_dd.idxmax(), 'entry_price']:.2f} "
          f"(DD: ${surface_dd.max():.2f})\n")

    # 6. Entry optimization
    print("🎯 6. Entry Optimization")
    best_entries = analyzer.optimize_entries(prices, reward_target=0.03, max_entries=5)
    if not best_entries.empty:
        print(" Top 5 Optimal Entry Points:")
        for _, candidate in best_entries.iterrows():
            print(f" ${candidate['entry_price']:.2f} - Risk/Reward: {candidate['risk_reward_ratio']:.2f}, "
                  f"Score: {candidate['score']:.3f}")
    print()

    # 7. Regime detection against the latest price
    print("🚨 7. Regime Detection")
    open_drawdown = max(0, entry - prices.iloc[-1])
    regime = analyzer.detect_abnormal_regime(prices, entry, open_drawdown)
    print(f" Current Drawdown: ${open_drawdown:.2f}")
    print(f" Expected Drawdown: ${regime['expected_drawdown']:.2f}")
    print(f" Severity Score: {regime['severity_score']:.2f}")
    print(f" Abnormal Regime: {regime['is_abnormal_regime']}")
    print(f" Recommendation: {regime['recommendation']}\n")

    # 8. Backtesting with random demo signals (seeded for repeatable signals)
    print("📊 8. Strategy Backtesting")
    np.random.seed(42)
    signal_values = np.random.choice([0, 1, -1], size=len(prices), p=[0.8, 0.1, 0.1])
    signals = pd.Series(signal_values, index=prices.index)

    backtest = analyzer.backtest_strategy(prices, signals, exit_method='adaptive_stop')
    print(f" Total Return: {backtest['total_return']:.1%}")
    print(f" Win Rate: {backtest['win_rate']:.1%}")
    print(f" Sharpe Ratio: {backtest['sharpe_ratio']:.2f}")
    print(f" Number of Trades: {backtest['num_trades']}")
    print(f" Average Return per Trade: {backtest['avg_return_per_trade']:.1%}\n")

    print("✅ Framework demonstration complete!")
    print("📈 Use the visualizer to create plots:")
    print(" - visualizer.plot_price_with_drawdowns(price_data, entry_price)")
    print(" - visualizer.plot_drawdown_surface(price_data)")
    print(" - visualizer.plot_regime_analysis(price_data, entry_price)")

    return {
        'price_data': prices,
        'entry_price': entry,
        'analyzer': analyzer,
        'visualizer': visualizer,
        'surface': surface,
        'backtest_result': backtest,
    }
|
|
|
|
if __name__ == "__main__":
    # Script entry point: run the full demo and keep its results at module
    # level for interactive inspection.
    demo_results = comprehensive_demo()
|