| Overall Statistics |
|
Total Orders 85 Average Win 1.57% Average Loss -0.78% Compounding Annual Return 85.302% Drawdown 6.400% Expectancy 0.649 Start Equity 100000 End Equity 122610.92 Net Profit 22.611% Sharpe Ratio 2.869 Sortino Ratio 3.43 Probabilistic Sharpe Ratio 80.913% Loss Rate 45% Win Rate 55% Profit-Loss Ratio 2.01 Alpha 0.582 Beta 0.141 Annual Standard Deviation 0.195 Annual Variance 0.038 Information Ratio 2.755 Tracking Error 0.265 Treynor Ratio 3.962 Total Fees $524.30 Estimated Strategy Capacity $1000000.00 Lowest Capacity Asset SH TJNNZWL5I4IT Portfolio Turnover 70.00% Drawdown Recovery 26 |
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import torch
import torch.nn as nn
class HARNet(nn.Module):
    """Hybrid CNN-LSTM network that predicts realized volatility.

    Three parallel 1-D convolutions with short, medium, and long receptive
    fields extract multi-scale features from OHLC sequences; an LSTM
    summarizes them over time and a small MLP head emits a single
    non-negative volatility estimate per sample.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        # Parallel convolutions over the 4 OHLC input channels at three scales.
        # "same" padding keeps the sequence length unchanged in every branch.
        self.conv_short = nn.Conv1d(4, 32, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(4, 32, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(4, 32, kernel_size=11, padding=5)
        self.batch_norm = nn.BatchNorm1d(96)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(96, 64, batch_first=True)
        self.layer_norm = nn.LayerNorm(64)
        self.dropout_fc = nn.Dropout(dropout_rate)
        # Softplus keeps the predicted volatility strictly positive.
        self.fc = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Softplus()
        )

    def forward(self, x):
        """Map a (batch, seq, 4) OHLC tensor to a (batch,) volatility estimate."""
        # Conv1d expects channels-first input: (batch, 4, seq).
        channels_first = x.permute(0, 2, 1)
        branches = [
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        merged = self.dropout_conv(self.batch_norm(torch.cat(branches, dim=1)))
        # Back to (batch, seq, 96) for the LSTM; keep only the final time step.
        seq_out, _ = self.lstm(merged.permute(0, 2, 1))
        last_step = self.dropout_fc(self.layer_norm(seq_out[:, -1, :]))
        return self.fc(last_step).squeeze(-1)
class Regime:
    """String labels for the algorithm's portfolio regime state machine."""

    LONG, HEDGE, NEUTRAL = 'LONG', 'HEDGE', 'NEUTRAL'
class VolatiltyModel(QCAlgorithm):
    """Volatility-target algorithm using CNN-LSTM predictions.

    Holds SPY when the model predicts low volatility during an uptrend and
    rotates into SH (inverse exposure) when predicted volatility is high.
    Regime entry/exit thresholds are set from validation-prediction
    percentiles, and the annual volatility target is re-selected monthly by
    a walk-forward optimization (WFO) over a fixed candidate list.

    NOTE(review): the class name is missing an 'i' ("Volatilty") — it is the
    algorithm entry point, so renaming it would change the external
    interface; left as-is.
    """

    def initialize(self):
        """Configure the backtest window, securities, model, indicators, and
        the scheduled training/optimization jobs."""
        self.set_start_date(2022, 2, 1)
        self.set_end_date(2022, 6, 1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._spy = self.add_equity("SPY")
        self._hedge = self.add_equity("SH")
        # Model parameters: length of the OHLC input window and of the
        # future horizon used to compute the realized-volatility label.
        self._lookback = 25
        self._future_horizon = 25
        self._window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        # Gate that blocks trading until the first training pass completes.
        self._model_is_training = True
        # Trading parameters.
        self._target_annual_vol = 0.12
        # After a regime switch, skip this many 15-minute bars to avoid
        # rapid-fire re-entries (oversignaling).
        self._cooldown_bars = 20
        self._cooldown_remaining = 0
        # WFO parameter search space for target volatility candidates.
        self._parameter_sets = [{'target_vol': v} for v in [0.08, 0.10, 0.12, 0.14, 0.16]]
        # SMA indicator for trend filtering.
        self._sma_50 = self.sma(self._spy, 50)
        # 15-minute consolidator for bar aggregation and trading signals.
        consolidator = TradeBarConsolidator(timedelta(minutes=15))
        consolidator.data_consolidated += self._on_15_min_bar
        self.subscription_manager.add_consolidator(self._spy, consolidator)
        # Train model and run initial WFO on historical data.
        self.train(self.train_model)
        self.train(lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective))
        # Retrain model and re-optimize monthly before market open.
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            self.train_model
        )
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective)
        )

    def train_model(self):
        """Fit the CNN-LSTM on recent 15-minute history and refresh the
        regime entry/exit thresholds from validation predictions.

        Trading is disabled (via ``_model_is_training``) for the duration of
        the training pass and re-enabled at the end.
        """
        self._model_is_training = True
        # Fetch 120K minute bars for training data.
        history = self.history(self._spy, 120_000)
        if history.empty:
            # No data (e.g. warm-up edge case): leave trading disabled until
            # a later scheduled retrain succeeds.
            return
        df = history.reset_index()
        df = df[['time', 'open', 'high', 'low', 'close']]
        df.set_index('time', inplace=True)
        # Resample to 15-minute bars.
        # NOTE(review): the '15T' offset alias is deprecated in recent
        # pandas in favor of '15min' — confirm against the runtime's
        # pandas version.
        df = df.resample('15T').agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last'
        }).dropna()
        prices = df[['open', 'high', 'low', 'close']].values
        closes = df['close'].values
        # Generate training samples: lookback windows and future realized volatility.
        x_data, y_data = [], []
        for i in range(self._lookback, len(df) - self._future_horizon):
            window = prices[i - self._lookback:i]
            # Denominators c[i]..c[i+H-2] pair with the H-1 forward diffs
            # below, giving simple returns (c[t+1] - c[t]) / c[t].
            future_closes = closes[i:i + self._future_horizon - 1]
            future_returns = np.diff(closes[i:i + self._future_horizon]) / future_closes
            # Realized volatility over the horizon: root of the sum of
            # squared returns (not annualized).
            realized_vol = np.sqrt(np.sum(future_returns ** 2))
            x_data.append(window)
            y_data.append(realized_vol)
        x_arr = np.array(x_data)
        y_arr = np.array(y_data)
        # Per-sample normalization for stable training.
        mean = x_arr.mean(axis=1, keepdims=True)
        std = x_arr.std(axis=1, keepdims=True) + 1e-6
        x_arr = (x_arr - mean) / std
        # Train-validation split (chronological 70/30 — no shuffling, so the
        # validation set is strictly later than the training set).
        split = int(0.7 * len(x_arr))
        x_train, x_val = x_arr[:split], x_arr[split:]
        y_train, y_val = y_arr[:split], y_arr[split:]
        # Convert to tensors.
        x_train = torch.tensor(x_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32)
        x_val = torch.tensor(x_val, dtype=torch.float32)
        y_val = torch.tensor(y_val, dtype=torch.float32)
        # Training loop with early stopping.
        optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3)
        loss_fn = nn.MSELoss()
        best_val = float("inf")
        patience = 4
        patience_left = patience
        # 30 epoch training (full-batch: one optimizer step per epoch).
        for _ in range(30):
            self._model.train()
            optimizer.zero_grad()
            preds = self._model(x_train)
            loss = loss_fn(preds, y_train)
            loss.backward()
            optimizer.step()
            # Evaluate on validation set.
            self._model.eval()
            with torch.no_grad():
                val_preds = self._model(x_val)
                val_loss = loss_fn(val_preds, y_val)
            # Early stop if validation loss plateaus.
            # NOTE(review): stopping does not restore the best-epoch
            # weights — the final-epoch weights are kept.
            if val_loss < best_val:
                best_val = val_loss
                patience_left = patience
            else:
                patience_left -= 1
                if patience_left == 0:
                    break
        # Set regime thresholds based on validation predictions percentiles.
        # Wider percentiles (15/85) give more hysteresis; minimum 1.3x ratio
        # prevents band collapse when prediction distribution is right-skewed.
        preds_np = val_preds.numpy()
        self._entry_threshold = np.percentile(preds_np, 15)
        self._exit_threshold = np.percentile(preds_np, 85)
        self._exit_threshold = max(self._exit_threshold, self._entry_threshold * 1.3)
        # Chart training diagnostics.
        self.plot("Model Training", "Train Loss", float(loss.item()))
        self.plot("Model Training", "Val Loss", float(val_loss.item()))
        self.plot("Regime Thresholds", "Entry Threshold", float(self._entry_threshold))
        self.plot("Regime Thresholds", "Exit Threshold", float(self._exit_threshold))
        self.debug(
            f"Training — train_loss={loss.item():.6f}, val_loss={val_loss.item():.6f} | "
            f"entry={self._entry_threshold:.6f}, exit={self._exit_threshold:.6f} | "
            f"pred_vol range=[{preds_np.min():.6f}, {preds_np.max():.6f}]"
        )
        # Leave the model in eval mode and re-enable trading.
        self._model.eval()
        self._model_is_training = False

    def _sharpe_objective(self, returns):
        """Annualized Sharpe-like score for a series of 15-minute returns.

        Annualization uses 252 trading days x 26 fifteen-minute bars per
        day. Returns -inf for degenerate series (zero variance or fewer
        than 10 observations) so they never win the WFO comparison.
        """
        if returns.std() == 0 or len(returns) < 10:
            return -np.inf
        return returns.mean() / returns.std() * np.sqrt(252 * 26)

    def _vol_optimization_func(self, data, parameter_set, objective):
        """Score one candidate target vol by simulating a vol-scaled strategy.

        Position size = target_vol / rolling realized vol, clipped to [0, 1];
        the rolling vol is floored at 0.01 to avoid division blow-ups. The
        first ``_lookback`` bars are dropped because the rolling window is
        not yet populated there.
        """
        returns = data['close'].pct_change().dropna()
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252 * 26)
        position_sizes = (parameter_set['target_vol'] / rolling_vol.clip(lower=0.01)).clip(0, 1)
        strategy_returns = position_sizes.iloc[self._lookback:] * returns.iloc[self._lookback:]
        return objective(strategy_returns)

    def _do_wfo(self, optimization_func, min_max, objective):
        """Walk-forward optimization: score every candidate parameter set on
        recent history and adopt the best one (per ``min_max``, here max)."""
        # Fetch ~3 months of minute data and resample to 15-min bars.
        history = self.history(self._spy, timedelta(days=90))
        if history.empty:
            return
        df = history.reset_index()[['time', 'close']].set_index('time')
        data = df.resample('15T').last().dropna()
        scores = [
            optimization_func(data, param_set, objective)
            for param_set in self._parameter_sets
        ]
        best_idx = scores.index(min_max(scores))
        self._update_vol_target(self._parameter_sets[best_idx])

    def _update_vol_target(self, optimal_parameters):
        """Adopt the WFO-selected target volatility and log it."""
        self._target_annual_vol = optimal_parameters['target_vol']
        self.debug(f"WFO Optimized Target Vol: {self._target_annual_vol:.4f}")

    def _on_15_min_bar(self, sender, bar):
        """Consolidated-bar handler: predict volatility and run the regime
        state machine, switching between SPY and SH on regime changes."""
        # Don't trade mid-training or before the trend filter is warm.
        if self._model_is_training or not self._sma_50.is_ready:
            return
        # Maintain rolling window of OHLC bars.
        self._window.add(bar)
        if not self._window.is_ready:
            return
        # Skip trading during cooldown period to prevent oversignaling.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return
        # Predict volatility from the rolling window.
        # NOTE(review): assumes RollingWindow iterates newest-first (the QC
        # convention), so [::-1] restores chronological order to match the
        # oldest-to-newest windows used in training — confirm.
        window = np.array([[b.open, b.high, b.low, b.close] for b in self._window])[::-1]
        # Normalize the live window the same way as the training samples.
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        # Generate volatility prediction and scale to annual basis.
        with torch.no_grad():
            pred_vol = self._model(x).item()
        # State machine: transition only when regime changes.
        trend_up = bar.close > self._sma_50.current.value
        desired = self._desired_regime(pred_vol, trend_up)
        current = self._current_regime()
        if desired == Regime.NEUTRAL or desired == current:
            return
        # Chart and log regime transition signal.
        self.plot("Volatility Signal", "Pred Vol", pred_vol)
        self.plot("Volatility Signal", "Entry Threshold", self._entry_threshold)
        self.plot("Volatility Signal", "Exit Threshold", self._exit_threshold)
        self.debug(f"Regime: {current} → {desired} | pred_vol={pred_vol:.6f} | trend_up={trend_up}")
        if desired == Regime.LONG:
            # Exit the hedge before going fully long SPY.
            if self._hedge.holdings.invested:
                self.liquidate(self._hedge)
            self.set_holdings(self._spy, 1.0)
        else:  # Regime.HEDGE
            # Exit SPY before rotating fully into the inverse ETF.
            if self._spy.holdings.invested:
                self.liquidate(self._spy)
            self.set_holdings(self._hedge, 1.0)
        # Start the cooldown after any regime switch.
        self._cooldown_remaining = self._cooldown_bars

    def _desired_regime(self, pred_vol: float, trend_up: bool) -> str:
        """Map a volatility prediction and trend flag to a target regime.

        LONG requires both low predicted vol and an uptrend; HEDGE only
        requires high predicted vol. Predictions between the two thresholds
        (the hysteresis band) yield NEUTRAL, i.e. no transition.
        """
        if pred_vol < self._entry_threshold and trend_up:
            return Regime.LONG
        if pred_vol > self._exit_threshold:
            return Regime.HEDGE
        return Regime.NEUTRAL

    def _current_regime(self) -> str:
        """Infer the current regime from actual holdings (SPY → LONG,
        SH → HEDGE, flat → NEUTRAL)."""
        if self._spy.holdings.invested:
            return Regime.LONG
        if self._hedge.holdings.invested:
            return Regime.HEDGE
        return Regime.NEUTRAL