| Overall Statistics |
|
Total Orders 487 Average Win 0.71% Average Loss -0.69% Compounding Annual Return 0.868% Drawdown 19.700% Expectancy 0.034 Start Equity 100000 End Equity 104416.36 Net Profit 4.416% Sharpe Ratio -0.361 Sortino Ratio -0.397 Probabilistic Sharpe Ratio 0.848% Loss Rate 49% Win Rate 51% Profit-Loss Ratio 1.03 Alpha -0.014 Beta -0.231 Annual Standard Deviation 0.081 Annual Variance 0.006 Information Ratio -0.512 Tracking Error 0.188 Treynor Ratio 0.126 Total Fees $1929.01 Estimated Strategy Capacity $970000.00 Lowest Capacity Asset SH TJNNZWL5I4IT Portfolio Turnover 16.06% Drawdown Recovery 1386 |
from AlgorithmImports import *
import torch
import torch.nn as nn
class HARNet(nn.Module):
    """Multi-scale convolutional encoder with an LSTM head for volatility forecasting.

    Three parallel Conv1d branches (kernel sizes 3/5/11) extract short-,
    medium-, and long-range features from 4-channel OHLC sequences; their
    concatenated output feeds an LSTM whose final hidden state is mapped
    to a single scalar volatility estimate per sample.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        # One conv branch per temporal scale; padding preserves sequence length.
        self.conv_short = nn.Conv1d(4, 32, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(4, 32, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(4, 32, kernel_size=11, padding=5)
        self.batch_norm = nn.BatchNorm1d(96)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(96, 64, batch_first=True)
        self.layer_norm = nn.LayerNorm(64)
        self.dropout_fc = nn.Dropout(dropout_rate)
        self.fc = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        """Map a (batch, seq, 4) OHLC tensor to a (batch,) prediction."""
        # Conv1d expects channels first: (batch, 4, seq).
        channels_first = x.permute(0, 2, 1)
        branches = [
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        merged = torch.cat(branches, dim=1)
        merged = self.dropout_conv(self.batch_norm(merged))
        # Back to (batch, seq, 96) for the recurrent layer.
        seq_out, _ = self.lstm(merged.permute(0, 2, 1))
        # Keep only the final timestep's hidden state.
        last = self.dropout_fc(self.layer_norm(seq_out[:, -1, :]))
        return self.fc(last).squeeze(-1)
class VolatiltyModel(QCAlgorithm):
    """Volatility-targeting SPY strategy driven by CNN-LSTM volatility forecasts.

    A HARNet model is trained on resampled 15-minute OHLC bars to predict
    forward realized volatility. Low predicted volatility in an uptrend
    triggers a vol-scaled long SPY position; high predicted volatility
    rotates into the SH inverse-equity hedge.

    NOTE(review): the class name keeps the original "Volatilty" spelling —
    renaming would break the LEAN project configuration that references it.
    """

    def initialize(self):
        # Five-year backtest window ending at the project's end date.
        self.set_start_date(self.end_date - timedelta(5 * 365))
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._spy = self.add_equity("SPY")
        self._hedge = self.add_equity("SH")
        # Model parameters.
        self._lookback = 25          # bars fed to the model per prediction
        self._future_horizon = 25    # bars over which target vol is realized
        self._window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        self._model_trained = False
        self._model_is_training = False
        # Trading parameters.
        self._target_annual_vol = 0.12
        self._cooldown_bars = 20     # 15-minute bars to wait after each trade
        self._cooldown_remaining = 0
        # SMA indicator for trend filtering.
        self._sma_50 = self.sma(self._spy, 50)
        # 15-minute consolidator for bar aggregation and trading signals.
        consolidator = TradeBarConsolidator(timedelta(minutes=15))
        consolidator.data_consolidated += self._on_15_min_bar
        self.subscription_manager.add_consolidator(self._spy, consolidator)
        # Train model on historical data.
        self.train(self.train_model)
        # Retrain model monthly to adapt to market regime changes.
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.after_market_open(self._spy, 30),
            self.train_model
        )

    def train_model(self) -> None:
        """(Re)train the volatility model on recent minute history.

        Fetches ~120K minute bars, resamples to 15-minute OHLC bars, builds
        (lookback window, future realized volatility) samples, trains with
        early stopping, and derives entry/exit regime thresholds from the
        validation-set predictions.
        """
        self._model_is_training = True
        # BUG FIX: the in-training flag must be cleared on every exit path.
        # Previously an early return (empty history) left it True forever,
        # permanently disabling trading in _on_15_min_bar.
        try:
            history = self.history(self._spy, 120_000)
            if history.empty:
                return
            df = history.reset_index()
            df = df[['time', 'open', 'high', 'low', 'close']]
            df.set_index('time', inplace=True)
            # Resample to 15-minute bars ('15min' replaces the deprecated '15T').
            df = df.resample('15min').agg({
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last'
            }).dropna()
            prices = df[['open', 'high', 'low', 'close']].values
            closes = df['close'].values
            # Build samples: each lookback window is paired with the realized
            # volatility of the following `_future_horizon` bars.
            x_data, y_data = [], []
            for i in range(self._lookback, len(df) - self._future_horizon):
                window = prices[i - self._lookback:i]
                # Simple returns (c[t+1] - c[t]) / c[t] over the horizon.
                future_closes = closes[i:i + self._future_horizon - 1]
                future_returns = np.diff(closes[i:i + self._future_horizon]) / future_closes
                realized_vol = np.sqrt(np.sum(future_returns ** 2))
                x_data.append(window)
                y_data.append(realized_vol)
            if not x_data:
                # Not enough resampled history to form a single sample.
                return
            x_data = np.array(x_data)
            y_data = np.array(y_data)
            # Per-sample normalization (over the time axis) for stable training.
            mean = x_data.mean(axis=1, keepdims=True)
            std = x_data.std(axis=1, keepdims=True) + 1e-6
            x_data = (x_data - mean) / std
            # Chronological train-validation split (no shuffling).
            split = int(0.7 * len(x_data))
            x_train = torch.tensor(x_data[:split], dtype=torch.float32)
            y_train = torch.tensor(y_data[:split], dtype=torch.float32)
            x_val = torch.tensor(x_data[split:], dtype=torch.float32)
            y_val = torch.tensor(y_data[split:], dtype=torch.float32)
            # Full-batch training loop with early stopping on validation loss.
            optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3)
            loss_fn = nn.MSELoss()
            best_val = float("inf")
            best_state = None
            patience = 4
            patience_left = patience
            # Up to 30 epochs.
            for _ in range(30):
                self._model.train()
                optimizer.zero_grad()
                loss = loss_fn(self._model(x_train), y_train)
                loss.backward()
                optimizer.step()
                # Evaluate on the validation set.
                self._model.eval()
                with torch.no_grad():
                    val_loss = loss_fn(self._model(x_val), y_val)
                if val_loss < best_val:
                    best_val = val_loss
                    # BUG FIX: snapshot the best weights so early stopping
                    # restores them instead of keeping the last (worse) epoch.
                    best_state = {k: v.detach().clone()
                                  for k, v in self._model.state_dict().items()}
                    patience_left = patience
                else:
                    patience_left -= 1
                    if patience_left == 0:
                        break
            if best_state is not None:
                self._model.load_state_dict(best_state)
            self._model.eval()
            # Regime thresholds from validation-prediction percentiles,
            # recomputed with the restored best weights.
            with torch.no_grad():
                preds_np = self._model(x_val).numpy()
            self._entry_threshold = np.percentile(preds_np, 20)
            self._exit_threshold = np.percentile(preds_np, 80)
            # Optimize target volatility based on realized market volatility.
            self._optimize_target_vol(df['close'])
            self._model_trained = True
        finally:
            self._model_is_training = False

    def _optimize_target_vol(self, close_prices: pd.Series) -> None:
        """Pick the target-vol candidate with the best recent Sharpe ratio."""
        # Use last ~1500 bars (~2 months of 15-minute data).
        recent_closes = close_prices.tail(1500)
        if len(recent_closes) < self._lookback:
            return
        candidates = [0.08, 0.10, 0.12, 0.14, 0.16]
        best_sharpe = -np.inf
        best_vol = self._target_annual_vol
        for target_vol in candidates:
            sharpe = self._calculate_sharpe_for_vol(recent_closes, target_vol)
            if sharpe > best_sharpe:
                best_sharpe = sharpe
                best_vol = target_vol
        self._target_annual_vol = best_vol
        self.debug(f"Optimized Target Vol: {self._target_annual_vol:.4f} (Best Sharpe: {best_sharpe:.3f})")

    def _calculate_sharpe_for_vol(self, close_prices: pd.Series, target_vol: float) -> float:
        """Return the annualized Sharpe ratio of a simulated vol-targeting
        strategy at `target_vol`, or -inf when it cannot be estimated."""
        returns = close_prices.pct_change().dropna()
        # Position size is target vol over rolling realized vol, capped at 1x.
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252)
        position_sizes = (target_vol / rolling_vol.clip(lower=0.01)).clip(0, 1)
        # Strategy returns = position sizing * market returns.
        strategy_returns = position_sizes.iloc[self._lookback:] * returns.iloc[self._lookback:]
        if strategy_returns.std() == 0 or len(strategy_returns) < 10:
            return -np.inf
        sharpe = strategy_returns.mean() / strategy_returns.std() * np.sqrt(252)
        return sharpe

    def _on_15_min_bar(self, sender: object, bar: TradeBar) -> None:
        """Consolidated-bar handler: predict volatility and rebalance SPY/SH."""
        # Guard clauses: model and trend filter must be ready.
        if not self._model_trained or self._model_is_training or not self._sma_50.is_ready:
            return
        # Maintain rolling window of OHLC bars.
        self._window.add(bar)
        if not self._window.is_ready:
            return
        # Skip trading during cooldown period to prevent oversignaling.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return
        # Normalize the window the same way as the training samples.
        window = np.array([[b.open, b.high, b.low, b.close] for b in self._window])
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        # BUG FIX: `torch.no_grad()` was called as a bare statement, which is
        # a no-op — it must be used as a context manager to disable autograd.
        with torch.no_grad():
            pred_vol = self._model(x).item()
        annual_vol = pred_vol * np.sqrt(252)
        # Volatility targeting: size inversely to predicted annual vol.
        position_size = self._target_annual_vol / max(annual_vol, 1e-4)
        position_size = np.clip(position_size, 0, 1.0)
        # Trend filter: only go long SPY above its 50-period SMA.
        trend_up = bar.close > self._sma_50.current.value
        # Trade logic: long SPY in low volatility with uptrend, else hedge with SH.
        spy_qty = self._spy.holdings.quantity
        hedge_qty = self._hedge.holdings.quantity
        if pred_vol < self._entry_threshold and trend_up:
            # Exit hedge if open before entering SPY.
            if hedge_qty:
                self.liquidate(self._hedge)
                self._cooldown_remaining = self._cooldown_bars
            elif not spy_qty:
                self.set_holdings(self._spy, position_size)
                self._cooldown_remaining = self._cooldown_bars
        elif pred_vol > self._exit_threshold:
            # Exit SPY if open before entering hedge.
            if spy_qty:
                self.liquidate(self._spy)
                self._cooldown_remaining = self._cooldown_bars
            elif not hedge_qty:
                self.set_holdings(self._hedge, position_size)
                self._cooldown_remaining = self._cooldown_bars