# Overall Statistics
#
# | Metric                      | Value            |
# | --------------------------- | ---------------- |
# | Total Orders                | 39               |
# | Average Win                 | 2.03%            |
# | Average Loss                | -1.81%           |
# | Compounding Annual Return   | 13.375%          |
# | Drawdown                    | 6.800%           |
# | Expectancy                  | 0.940            |
# | Start Equity                | 100000           |
# | End Equity                  | 148598.60        |
# | Net Profit                  | 48.599%          |
# | Sharpe Ratio                | 0.592            |
# | Sortino Ratio               | 0.562            |
# | Probabilistic Sharpe Ratio  | 77.265%          |
# | Loss Rate                   | 8%               |
# | Win Rate                    | 92%              |
# | Profit-Loss Ratio           | 1.12             |
# | Alpha                       | 0.007            |
# | Beta                        | 0.308            |
# | Annual Standard Deviation   | 0.066            |
# | Annual Variance             | 0.004            |
# | Information Ratio           | -0.623           |
# | Tracking Error              | 0.102            |
# | Treynor Ratio               | 0.126            |
# | Total Fees                  | $40.31           |
# | Estimated Strategy Capacity | $150000000.00    |
# | Lowest Capacity Asset       | SPY R735QTJ8XC9X |
# | Portfolio Turnover          | 1.31%            |
# | Drawdown Recovery           | 203              |
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import torch
import torch.nn as nn
# endregion
class HARNet(nn.Module):
    """Multi-scale CNN feeding an LSTM that outputs one positive value per sample.

    Three parallel 1-D convolutions (kernel sizes 3, 5 and 11) read the same
    4-feature sequence; their activations are concatenated, instance-normalised
    and passed to an LSTM. The last LSTM step is layer-normalised and mapped to
    a scalar through a small MLP ending in Softplus, so outputs are
    non-negative.
    """

    def __init__(self, dropout_rate=0.3):
        """Build the layers.

        Args:
            dropout_rate: Dropout probability used after the convolutional
                stage and before the fully connected head.
        """
        super().__init__()
        n_features = 4
        branch_width = 32
        merged_width = 3 * branch_width  # three conv branches concatenated
        hidden_width = 64

        # Padding is (kernel_size - 1) / 2 so every branch keeps the sequence length.
        self.conv_short = nn.Conv1d(n_features, branch_width, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(n_features, branch_width, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(n_features, branch_width, kernel_size=11, padding=5)
        self.norm = nn.InstanceNorm1d(merged_width, affine=True)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(merged_width, hidden_width, batch_first=True)
        self.layer_norm = nn.LayerNorm(hidden_width)
        self.dropout_fc = nn.Dropout(dropout_rate)
        self.fc = nn.Sequential(
            nn.Linear(hidden_width, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Softplus(),
        )

    def forward(self, x):
        """Run the network.

        Args:
            x: Tensor laid out as (batch, time, 4 features).

        Returns:
            Tensor with the trailing singleton dimension squeezed away, i.e.
            one value per batch element.
        """
        # Conv1d wants channels before time.
        channels_first = x.permute(0, 2, 1)
        branch_outputs = tuple(
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        )
        merged = self.norm(torch.cat(branch_outputs, dim=1))
        merged = self.dropout_conv(merged)

        # Back to (batch, time, channels) for the batch_first LSTM.
        sequence, _ = self.lstm(merged.permute(0, 2, 1))
        final_step = self.layer_norm(sequence[:, -1, :])
        head_input = self.dropout_fc(final_step)
        return self.fc(head_input).squeeze(-1)
class Regime:
    """String labels for the strategy's regime states."""

    # Hold SPY.
    LONG = "LONG"
    # Hold no SPY.
    CASH = "CASH"
    # No regime signal in either direction.
    NEUTRAL = "NEUTRAL"
class VolatiltyModel(QCAlgorithm):
    """Volatility-target algorithm using CNN-LSTM predictions.

    A HARNet is trained on windows of daily SPY OHLC bars to predict the
    realized volatility of the following ``_future_horizon`` bars. Each daily
    bar the prediction is compared with entry/exit thresholds (percentiles of
    the validation predictions) and with a 50-day SMA trend filter to choose
    between holding SPY and holding cash. Position size is the target annual
    volatility divided by the annualised prediction, capped at 100%.
    The target volatility itself is re-selected from ``_parameter_sets`` by
    ``_do_wfo``.
    """

    def initialize(self):
        """Set backtest dates, parameters, indicators and training schedules."""
        self.set_start_date(2023, 1, 3)
        self.set_end_date(2026, 3, 1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._spy = self.add_equity("SPY")

        # Model parameters.
        self._lookback = 25
        self._future_horizon = 25
        self._training_history_days = 365 * 5
        self._entry_percentile = 15
        self._exit_percentile = 85
        self._min_exit_ratio = 1.3
        self._daily_window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        self._model_is_training = True
        # Thresholds stay None until a training run succeeds; _on_daily_bar
        # refuses to trade while they are unset.
        self._entry_threshold = None
        self._exit_threshold = None

        # Trading parameters.
        self._target_annual_vol = 0.12
        self._cooldown_days = 10
        self._cooldown_remaining = 0
        self._confirm_bars_required = 2
        self._confirm_count = 0
        self._confirm_direction = Regime.NEUTRAL
        self._rebalance_weight_tolerance = 0.10

        # WFO parameter search space for target volatility candidates.
        self._parameter_sets = [{'target_vol': v} for v in [0.08, 0.10, 0.12, 0.14, 0.16]]

        # SMA indicator for trend filtering (daily resolution).
        self._sma_50 = self.sma(self._spy, 50, Resolution.DAILY)

        # Daily consolidator for regime detection.
        daily_consolidator = TradeBarConsolidator(timedelta(days=1))
        daily_consolidator.data_consolidated += self._on_daily_bar
        self.subscription_manager.add_consolidator(self._spy, daily_consolidator)

        # Train model and run initial WFO on historical data.
        self.train(self.train_model)
        self.train(lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective))

        # Retrain model and re-optimize monthly before market open.
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            self.train_model
        )
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective)
        )

    def train_model(self):
        """Train a fresh HARNet and install it only if training succeeds.

        ``_model_is_training`` pauses trading while the run is in progress.
        It is cleared in ``finally`` so neither an early exit nor an exception
        can leave the strategy paused. When training cannot complete, the
        previously installed model and thresholds are left untouched.
        """
        self._model_is_training = True
        try:
            self._train_candidate_model()
        finally:
            self._finish_training()

    def _train_candidate_model(self) -> None:
        """Fit a candidate model and swap it in together with new thresholds."""
        history = self.history(
            self._spy,
            timedelta(days=self._training_history_days),
            Resolution.DAILY
        )
        if history.empty:
            return

        dataset = self._build_training_set(history)
        if dataset is None:
            return
        x_arr, y_arr = dataset

        # Chronological 70/30 split: the validation set is the most recent data.
        split = int(0.7 * len(x_arr))
        if split == 0 or split >= len(x_arr):
            return
        x_train = torch.tensor(x_arr[:split], dtype=torch.float32)
        y_train = torch.tensor(y_arr[:split], dtype=torch.float32)
        x_val = torch.tensor(x_arr[split:], dtype=torch.float32)
        y_val = torch.tensor(y_arr[split:], dtype=torch.float32)

        # Train a local candidate so a failed run never replaces a working
        # model with randomly initialised weights.
        candidate = HARNet()
        fit_result = self._fit(candidate, x_train, y_train, x_val, y_val)
        if fit_result is None:
            return
        best_train_loss, best_val = fit_result

        candidate.eval()
        with torch.no_grad():
            preds_np = candidate(x_val).detach().numpy()
        entry_threshold = float(np.percentile(preds_np, self._entry_percentile))
        # Keep the exit threshold at least _min_exit_ratio times the entry one.
        exit_threshold = float(max(
            np.percentile(preds_np, self._exit_percentile),
            entry_threshold * self._min_exit_ratio
        ))

        # Install model and thresholds together so they always match.
        self._model = candidate
        self._entry_threshold = entry_threshold
        self._exit_threshold = exit_threshold

        self.plot("Model Training", "Train Loss", float(best_train_loss))
        self.plot("Model Training", "Val Loss", float(best_val))
        self.plot("Regime Thresholds", "Entry Threshold", float(entry_threshold))
        self.plot("Regime Thresholds", "Exit Threshold", float(exit_threshold))
        self.debug(
            f"Training - train_loss={best_train_loss:.6f}, val_loss={best_val:.6f} | "
            f"entry={entry_threshold:.6f}, exit={exit_threshold:.6f} | "
            f"pred_vol range=[{preds_np.min():.6f}, {preds_np.max():.6f}]"
        )

    def _build_training_set(self, history):
        """Turn an OHLC history frame into normalised windows and vol labels.

        Returns:
            ``(x, y)`` where ``x`` has shape (samples, lookback, 4) and ``y``
            holds the realized volatility of the ``_future_horizon`` returns
            that start at the bar following each window; ``None`` when the
            history is too short to produce a sample.
        """
        df = history[['open', 'high', 'low', 'close']].copy()
        if getattr(df.index, "nlevels", 1) > 1:
            df = df.droplevel(0)
        df = df.sort_index().dropna()
        prices = df[['open', 'high', 'low', 'close']].values
        closes = df['close'].values

        x_data, y_data = [], []
        for i in range(self._lookback, len(df) - self._future_horizon):
            # The loop bound guarantees horizon + 1 closes are available here.
            forward_closes = closes[i:i + self._future_horizon + 1]
            future_returns = np.diff(forward_closes) / forward_closes[:-1]
            x_data.append(prices[i - self._lookback:i])
            y_data.append(np.sqrt(np.sum(future_returns ** 2)))
        if not x_data:
            return None

        x_arr = np.array(x_data)
        y_arr = np.array(y_data)
        # Z-score each window per feature over its own time axis; the same
        # normalisation is applied to the live window in _on_daily_bar.
        mean = x_arr.mean(axis=1, keepdims=True)
        std = x_arr.std(axis=1, keepdims=True) + 1e-6
        return (x_arr - mean) / std, y_arr

    def _fit(self, model, x_train, y_train, x_val, y_val):
        """Fit ``model`` with Adam/MSE and early stopping on validation loss.

        Returns:
            ``(train_loss, val_loss)`` of the best epoch, with that epoch's
            weights loaded into ``model``; ``None`` if no epoch produced a
            validation loss below infinity (e.g. the loss was NaN).
        """
        optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
        loss_fn = nn.MSELoss()
        best_val = float("inf")
        best_train_loss = None
        best_state = None
        patience = 6
        patience_left = patience
        batch_size = 256

        for _ in range(50):
            model.train()
            indices = torch.randperm(len(x_train))
            epoch_losses = []
            for start in range(0, len(x_train), batch_size):
                batch_idx = indices[start:start + batch_size]
                optimizer.zero_grad()
                preds = model(x_train[batch_idx])
                loss = loss_fn(preds, y_train[batch_idx])
                loss.backward()
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
                optimizer.step()
                epoch_losses.append(loss.item())
            train_loss = float(np.mean(epoch_losses))

            model.eval()
            with torch.no_grad():
                val_loss = float(loss_fn(model(x_val), y_val).item())

            if val_loss < best_val:
                best_val = val_loss
                best_train_loss = train_loss
                # Clone: state_dict tensors alias the live parameters.
                best_state = {
                    key: value.detach().clone()
                    for key, value in model.state_dict().items()
                }
                patience_left = patience
            else:
                patience_left -= 1
                if patience_left == 0:
                    break

        if best_state is None:
            return None
        model.load_state_dict(best_state)
        return best_train_loss, best_val

    def _sharpe_objective(self, returns):
        """Return the annualised Sharpe ratio of ``returns``.

        Returns ``-inf`` when there are fewer than 10 observations or the
        standard deviation is zero or not finite, so such candidates never win
        a maximisation.
        """
        if len(returns) < 10:
            return -np.inf
        std = returns.std()
        if not np.isfinite(std) or std == 0:
            return -np.inf
        return returns.mean() / std * np.sqrt(252)

    def _vol_optimization_func(self, data, parameter_set, objective):
        """Score one target-vol candidate on a volatility-scaled return series.

        Args:
            data: Frame with a ``close`` column in chronological order.
            parameter_set: Dict holding the candidate under ``'target_vol'``.
            objective: Callable mapping the strategy return series to a score.
        """
        returns = data['close'].pct_change().dropna()
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252)
        position_sizes = (parameter_set['target_vol'] / rolling_vol.clip(lower=0.01)).clip(0, 1)
        # Size day t with the estimate available at the close of t-1;
        # without the shift the size would already contain day t's return.
        position_sizes = position_sizes.shift(1)
        strategy_returns = (position_sizes * returns).dropna()
        return objective(strategy_returns)

    def _do_wfo(self, optimization_func, min_max, objective):
        """Pick the best target-vol candidate over the last 90 days of history.

        Args:
            optimization_func: Called as ``(data, parameter_set, objective)``
                and returning a score.
            min_max: ``min`` or ``max``; selects the winning score.
            objective: Passed through to ``optimization_func``.

        The current target is kept when history is empty or when no candidate
        produces a finite score.
        """
        history = self.history(self._spy, timedelta(days=90), Resolution.DAILY)
        if history.empty:
            return
        data = history[['close']].copy()
        if getattr(data.index, "nlevels", 1) > 1:
            data = data.droplevel(0)
        data = data.sort_index().dropna()

        scored = [
            (optimization_func(data, param_set, objective), param_set)
            for param_set in self._parameter_sets
        ]
        # Drop NaN/inf scores: they make min/max order-dependent and carry
        # no information about which candidate is better.
        finite = [(score, params) for score, params in scored if np.isfinite(score)]
        if not finite:
            return
        best_score = min_max(score for score, _ in finite)
        # First match wins, preserving list order on ties.
        best_params = next(params for score, params in finite if score == best_score)
        self._update_vol_target(best_params)

    def _update_vol_target(self, optimal_parameters):
        """Adopt the target volatility from ``optimal_parameters`` and log it."""
        self._target_annual_vol = optimal_parameters['target_vol']
        self.debug(f"WFO Optimized Target Vol: {self._target_annual_vol:.4f}")

    def _on_daily_bar(self, sender, bar):
        """Handle a consolidated daily bar: predict, rebalance, switch regime.

        Args:
            sender: The consolidator raising the event (unused).
            bar: The consolidated daily bar.
        """
        # Record every bar before any guard so the window always holds
        # consecutive sessions, even while training or warming up the SMA.
        self._daily_window.add(bar)
        if self._model_is_training or not self._sma_50.is_ready:
            return
        # No successful training run yet: there is nothing valid to trade on.
        if self._entry_threshold is None or self._exit_threshold is None:
            return
        if not self._daily_window.is_ready:
            return
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return

        # Reverse the window's iteration order (presumably newest-first, per
        # LEAN's RollingWindow convention) to match the training row order.
        window = np.array([[b.open, b.high, b.low, b.close] for b in self._daily_window])[::-1]
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            pred_vol = self._model(x).item()

        trend_up = bar.close > self._sma_50.current.value
        desired = self._desired_regime(pred_vol, trend_up)
        current = self._current_regime()
        target_weight = self._target_weight(pred_vol)

        self.plot("Volatility Signal", "Pred Vol", pred_vol)
        self.plot("Volatility Signal", "Entry Threshold", self._entry_threshold)
        self.plot("Volatility Signal", "Exit Threshold", self._exit_threshold)

        # While comfortably long, keep the position near the vol-target weight.
        if current == Regime.LONG and trend_up and pred_vol < self._exit_threshold:
            self._rebalance_if_needed(pred_vol, target_weight)

        # No switch requested: drop any pending confirmation streak.
        if desired == Regime.NEUTRAL or desired == current:
            if desired != self._confirm_direction:
                self._confirm_count = 0
                self._confirm_direction = Regime.NEUTRAL
            return

        # Require the same signal on consecutive bars before switching.
        if desired == self._confirm_direction:
            self._confirm_count += 1
        else:
            self._confirm_direction = desired
            self._confirm_count = 1
        if self._confirm_count < self._confirm_bars_required:
            return

        self._confirm_count = 0
        self._confirm_direction = Regime.NEUTRAL
        self.debug(
            f"Regime: {current} -> {desired} | pred_vol={pred_vol:.6f} | "
            f"trend_up={trend_up} | weight={target_weight:.2f}"
        )
        if desired == Regime.LONG:
            self.set_holdings(self._spy, target_weight)
        else:
            self.liquidate(self._spy)
        self._cooldown_remaining = self._cooldown_days

    def _desired_regime(self, pred_vol: float, trend_up: bool) -> str:
        """Map a prediction and the trend flag to the regime the signal asks for.

        LONG needs low predicted vol and an up-trend; CASH needs high predicted
        vol and no up-trend; anything else is NEUTRAL.
        """
        if pred_vol < self._entry_threshold and trend_up:
            return Regime.LONG
        if pred_vol > self._exit_threshold and not trend_up:
            return Regime.CASH
        return Regime.NEUTRAL

    def _current_regime(self) -> str:
        """Return LONG when SPY is held, otherwise CASH."""
        if self._spy.holdings.invested:
            return Regime.LONG
        return Regime.CASH

    def _finish_training(self) -> None:
        """Put the installed model in eval mode and resume bar processing."""
        self._model.eval()
        self._model_is_training = False

    def _target_weight(self, pred_vol: float) -> float:
        """Return the portfolio weight in [0, 1] that targets ``_target_annual_vol``.

        ``pred_vol`` covers ``_future_horizon`` bars, so it is scaled by
        sqrt(252 / horizon) to an annual figure (floored at 0.01) first.
        """
        predicted_annual_vol = pred_vol * np.sqrt(252 / self._future_horizon)
        return float(np.clip(self._target_annual_vol / max(predicted_annual_vol, 0.01), 0, 1))

    def _rebalance_if_needed(self, pred_vol: float, target_weight: float) -> None:
        """Resize the SPY position when its weight drifts past the tolerance.

        Does nothing when total portfolio value is not positive or when the
        current weight is within ``_rebalance_weight_tolerance`` of the target.
        """
        if self.portfolio.total_portfolio_value <= 0:
            return
        current_weight = self._spy.holdings.holdings_value / self.portfolio.total_portfolio_value
        if abs(current_weight - target_weight) < self._rebalance_weight_tolerance:
            return
        self.debug(
            f"Rebalance LONG | pred_vol={pred_vol:.6f} | "
            f"target_weight={target_weight:.2f} | current_weight={current_weight:.2f}"
        )
        self.set_holdings(self._spy, target_weight)