| Overall Statistics |
|
Total Orders 5636 Average Win 0.18% Average Loss -0.09% Compounding Annual Return 14.656% Drawdown 37.300% Expectancy 1.047 Start Equity 100000 End Equity 1400724.51 Net Profit 1300.725% Sharpe Ratio 0.644 Sortino Ratio 0.701 Probabilistic Sharpe Ratio 10.439% Loss Rate 33% Win Rate 67% Profit-Loss Ratio 2.05 Alpha 0.043 Beta 0.682 Annual Standard Deviation 0.136 Annual Variance 0.019 Information Ratio 0.239 Tracking Error 0.095 Treynor Ratio 0.129 Total Fees $6330.41 Estimated Strategy Capacity $470000000.00 Lowest Capacity Asset TLT SGNKIKYGE9NP Portfolio Turnover 1.76% Drawdown Recovery 890 |
from AlgorithmImports import *
import torch
from models import VolatilityPredictor
from plot import VolatilityPlotter, SignalPlotState
# Volatility-targeting algorithm that rotates between QQQ and the TLT hedge using CNN-LSTM regime signals.
class VolatiltyModel(QCAlgorithm):
    """Volatility-targeting strategy that allocates between QQQ and TLT.

    A VolatilityPredictor is retrained at the start of every month on the QQQ
    session bars, and a walk-forward grid search re-selects the target
    volatility on the same schedule. On every QQQ bar the smoothed forecast is
    turned into a QQQ weight capped at 95%; the remainder is placed in TLT.
    """

    def initialize(self):
        """Configure dates, cash, securities, the model, and the warm-up period."""
        # Fixed seed so network initialisation and batch shuffling are repeatable.
        torch.manual_seed(0)
        self.set_start_date(2007, 1, 1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._qqq = self.add_equity("QQQ", Resolution.DAILY)
        self._hedge = self.add_equity("TLT", Resolution.DAILY)
        # Training and the WFO search both read their bars from this session window.
        self._qqq.session.size = 1300
        self._lookback = 22
        self._future_horizon = 22
        self._model = VolatilityPredictor(self._lookback, self._future_horizon)
        # Blocks trading in on_data until the first fit has completed.
        self._model_is_training = True
        self._target_vol = 0.12
        self._target_vol_sma = SimpleMovingAverage(3)
        self._pred_vol_sma = SimpleMovingAverage(5)
        self._plotter = VolatilityPlotter(self)
        self.set_warm_up(timedelta(365 * 4))

    def on_warmup_finished(self):
        """Schedule the monthly jobs and kick off the first training cycle."""
        time_rule = self.time_rules.before_market_open(self._qqq, 30)
        date_rule = self.date_rules.month_start(self._qqq)
        # Schedule monthly retraining and walk-forward optimization before the opening trade.
        self.train(date_rule, time_rule, self._train_model)
        self.train(date_rule, time_rule, self._do_wfo)
        # Run the first training cycle as soon as warm-up finishes.
        if self.live_mode:
            self._train_model()
            self._do_wfo()
        else:
            self.train(self.date_rules.today, time_rule, self._train_model)
            self.train(self.date_rules.today, time_rule, self._do_wfo)

    def _portfolio_weight(self, security):
        """Return the share of total portfolio value held in ``security``."""
        # The floor on the denominator avoids dividing by zero.
        return security.holdings.holdings_value / max(self.portfolio.total_portfolio_value, 1e-6)

    def _train_model(self):
        """Refit the volatility model on the session bars and plot the diagnostics."""
        self._model_is_training = True
        # Index 0 is skipped and the remainder reversed; presumably index 0 is the
        # still-forming session bar and the reversed list runs oldest to newest
        # (confirm against the LEAN session API).
        bars = list(self._qqq.session)[1:][::-1]
        train_loss, val_loss, rank_corr, pred_error_std = self._model.fit(bars)
        self._plotter.plot_training(train_loss, val_loss, rank_corr, pred_error_std)
        self._model_is_training = False

    def _do_wfo(self):
        """Select the target volatility with the best recent Sharpe ratio.

        Each candidate in the grid is scored on a 65-bar window, where the
        position sized from day t's rolling volatility earns day t+1's return.
        The winner is fed into a 3-sample moving average whose value becomes
        the new target volatility. Nothing is updated when the window holds
        too few bars to score a candidate.
        """
        # Run the WFO grid search on the 65 most recent completed daily bars.
        bars = list(self._qqq.session)[1:66][::-1]
        # Need at least two lagged (position, next-day return) pairs for a Sharpe ratio.
        if len(bars) < self._lookback + 4:
            return
        vol_grid = [0.08, 0.10, 0.12, 0.14, 0.16]
        returns = pd.Series([bar.close for bar in bars]).pct_change().dropna()
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252)
        # Positional arrays: multiplying the two Series directly would re-align them
        # on index labels and pair each position with the SAME day's return, which
        # silently removes the one-day lag and leaks look-ahead information.
        next_returns = returns.iloc[self._lookback + 1:].to_numpy()
        scores = []
        # Evaluate each candidate target volatility over the recent volatility window.
        for vol in vol_grid:
            position_sizes = (vol / rolling_vol.clip(lower=0.01)).clip(0, 1)
            lagged_sizes = position_sizes.iloc[self._lookback:-1].to_numpy()
            strategy_returns = pd.Series(lagged_sizes * next_returns)
            # Annualize the candidate Sharpe ratio.
            scores.append(strategy_returns.mean() / strategy_returns.std() * np.sqrt(252))
        score_arr = np.asarray(scores, dtype=float)
        usable = np.isfinite(score_arr)
        if usable.any():
            # First best finite score wins, matching list.index(max(...)) tie-breaking.
            raw_vol = vol_grid[int(np.argmax(np.where(usable, score_arr, -np.inf)))]
        else:
            # No candidate could be scored; keep the current target.
            raw_vol = self._target_vol
        # Seed the SMA with the initial target vol on the first call.
        if self._target_vol_sma.samples == 0:
            self._target_vol_sma.update(bars[-1].end_time - timedelta(30), self._target_vol)
        self._target_vol_sma.update(bars[-1].end_time, raw_vol)
        self._target_vol = self._target_vol_sma.current.value
        self._plotter.plot_target_vol(raw_vol, self._target_vol)

    def on_data(self, data):
        """Forecast volatility from the new QQQ bar and rebalance QQQ against TLT."""
        if self._model_is_training or self._qqq not in data.bars:
            return
        bar = data.bars[self._qqq]
        # lookback - 1 earlier bars plus the current bar form one lookback-sized window.
        prev_bars = list(self._qqq.session)[1:self._lookback][::-1]
        pred_vol = self._model.predict(prev_bars, bar)
        self._pred_vol_sma.update(bar.end_time, pred_vol)
        pred_vol_smooth = self._pred_vol_sma.current.value
        # Scale the daily forecast to the annual scale of the target volatility.
        ann_pred_vol = pred_vol_smooth * np.sqrt(252)
        target_weight = np.clip(self._target_vol / max(ann_pred_vol, 1e-6), 0.0, 0.95)
        state = SignalPlotState(
            bar.close, pred_vol, pred_vol_smooth,
            target_weight, self._portfolio_weight(self._qqq), self._target_vol
        )
        self._plotter.plot_signal_state(state)
        targets = [PortfolioTarget(self._qqq, target_weight), PortfolioTarget(self._hedge, 1 - target_weight)]
        self.set_holdings(targets)
import math
import numpy as np
from scipy.stats import spearmanr
import torch
import torch.nn as nn
# CNN-LSTM hybrid that extracts multi-scale volatility features via parallel convolutions.
class HARNet(nn.Module):
    """CNN-LSTM network mapping a univariate window to one scalar per sample.

    Three parallel 1-D convolutions with kernel sizes 3, 5 and 11 read the same
    single-channel sequence. Their 32-channel outputs are stacked to 96
    channels, normalised, and fed through an LSTM; the last time step goes
    through a small fully connected head.
    """

    def __init__(self, dropout_rate=0.3):
        """Build the layers; ``dropout_rate`` is shared by both dropout layers."""
        super().__init__()
        # Parallel branches: padding keeps the sequence length unchanged for every kernel.
        self.conv_short = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=11, padding=5)
        self.norm = nn.InstanceNorm1d(num_features=96, affine=True)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(p=dropout_rate)
        self.lstm = nn.LSTM(input_size=96, hidden_size=64, batch_first=True)
        self.layer_norm = nn.LayerNorm(64)
        self.dropout_fc = nn.Dropout(p=dropout_rate)
        self.fc = nn.Sequential(
            nn.Linear(in_features=64, out_features=32),
            nn.ReLU(),
            nn.Linear(in_features=32, out_features=1),
        )

    def forward(self, x):
        """Run the network on ``x`` of shape (batch, time, 1); returns shape (batch,)."""
        # Conv1d wants channels before time: (batch, 1, time).
        channels_first = x.permute(0, 2, 1)
        branch_maps = [
            self.relu(branch(channels_first))
            for branch in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        # Stack the three 32-channel maps into one 96-channel map.
        stacked = torch.cat(branch_maps, dim=1)
        regularised = self.dropout_conv(self.norm(stacked))
        # The LSTM is batch_first, so move time back in front of the features.
        sequence, _ = self.lstm(regularised.permute(0, 2, 1))
        # Only the output at the final time step feeds the head.
        final_step = self.layer_norm(sequence[:, -1, :])
        head_input = self.dropout_fc(final_step)
        return self.fc(head_input).squeeze(-1)
# Wraps HARNet with data preparation, training, and inference.
class VolatilityPredictor:
    """Wraps HARNet with data preparation, training, and inference.

    The model maps a window of ``lookback`` daily Parkinson volatility values
    to the log of the mean Parkinson volatility over the following
    ``future_horizon`` days.
    """

    def __init__(self, lookback=25, future_horizon=25):
        """Store the window sizes and pick the compute device."""
        self.lookback = lookback
        self.future_horizon = future_horizon
        # Use GPU for training and inference if available.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        # Set by fit(); predict() refuses to run while this is None.
        self.net = None

    def _parkinson_series(self, bars):
        """Return one Parkinson volatility estimate per bar as a NumPy array.

        Each bar must expose ``high`` and ``low``. The high/low ratio is
        floored just above 1 so the result is always strictly positive.
        """
        # Compute the Parkinson volatility estimator series.
        return np.array([math.sqrt((1.0 / (4.0 * math.log(2))) * math.log(max(bar.high / bar.low, 1.0 + 1e-9)) ** 2) for bar in bars])

    def _normalize_windows(self, windows):
        """Z-score each window along its time axis.

        A 3-D input is treated as (samples, time, features) and normalised per
        sample; any other input is normalised along axis 0.
        """
        axis = 1 if windows.ndim == 3 else 0
        mean = windows.mean(axis=axis, keepdims=True)
        # The epsilon keeps constant windows from dividing by zero.
        std = windows.std(axis=axis, keepdims=True) + 1e-6
        return (windows - mean) / std

    def fit(self, bars):
        """Train a fresh HARNet on ``bars`` and return validation diagnostics.

        Args:
            bars: Chronologically ordered bars exposing ``high`` and ``low``.

        Returns:
            Tuple ``(train_loss, val_loss, validation_rank_corr,
            prediction_error_std)`` computed with the best checkpoint.

        Raises:
            ValueError: If ``bars`` yields fewer than two training samples.
        """
        park_arr = self._parkinson_series(bars)
        n_samples = len(park_arr) - self.future_horizon - self.lookback
        # Validate before touching self.net so a failed call keeps the previous model.
        if n_samples < 2:
            raise ValueError(
                f"fit needs at least {self.lookback + self.future_horizon + 2} bars, "
                f"got {len(park_arr)}"
            )
        # Start each retrain from fresh random weights.
        self.net = HARNet().to(self.device)
        # Build sliding Parkinson windows; label = mean daily Parkinson vol over the future horizon.
        sample_ends = range(self.lookback, len(park_arr) - self.future_horizon)
        x_data = [park_arr[i - self.lookback:i].reshape(-1, 1) for i in sample_ends]
        y_data = [np.log(np.mean(park_arr[i:i + self.future_horizon])) for i in sample_ends]
        x_arr = self._normalize_windows(np.array(x_data))
        y_arr = np.array(y_data)
        # Reserve the newest samples for validation to avoid time leakage.
        split_idx = int(0.7 * len(x_arr))
        x_train = torch.tensor(x_arr[:split_idx], dtype=torch.float32).to(self.device)
        y_train = torch.tensor(y_arr[:split_idx], dtype=torch.float32).to(self.device)
        x_val = torch.tensor(x_arr[split_idx:], dtype=torch.float32).to(self.device)
        y_val = torch.tensor(y_arr[split_idx:], dtype=torch.float32).to(self.device)
        loss_fn = nn.MSELoss()
        optimizer = torch.optim.Adam(self.net.parameters(), lr=1e-3)
        best_val = float("inf")
        best_state = None
        patience_left = 6
        batch_size = 256
        for _ in range(50):
            self.net.train()
            # Shuffle sample order each epoch to reduce gradient bias across mini-batches.
            indices = torch.randperm(len(x_train))
            for start in range(0, len(x_train), batch_size):
                batch_idx = indices[start:start + batch_size]
                optimizer.zero_grad()
                loss = loss_fn(self.net(x_train[batch_idx]), y_train[batch_idx])
                loss.backward()
                # Clip gradients to a unit norm to prevent exploding gradients.
                torch.nn.utils.clip_grad_norm_(self.net.parameters(), max_norm=1.0)
                optimizer.step()
            self.net.eval()
            with torch.no_grad():
                val_loss = loss_fn(self.net(x_val), y_val).item()
            # Save the best checkpoint and reset patience whenever validation loss improves.
            if val_loss < best_val:
                best_val = val_loss
                best_state = {k: v.detach().clone() for k, v in self.net.state_dict().items()}
                patience_left = 6
            else:
                patience_left -= 1
                if patience_left == 0:
                    break
        # No checkpoint exists if every validation loss was NaN; keep the last weights then.
        if best_state is not None:
            self.net.load_state_dict(best_state)
        self.net.eval()
        with torch.no_grad():
            train_loss = loss_fn(self.net(x_train), y_train).item()
            # One validation pass serves both the loss and the diagnostics.
            val_preds = self.net(x_val)
            val_loss = loss_fn(val_preds, y_val).item()
        preds_np = val_preds.cpu().numpy()
        y_val_np = y_val.cpu().numpy()
        residuals = preds_np - y_val_np
        prediction_error_std = np.std(residuals) + 1e-6
        validation_rank_corr = spearmanr(y_val_np, preds_np).statistic
        return train_loss, val_loss, validation_rank_corr, prediction_error_std

    def predict(self, prev_bars, current_bar):
        """Forecast the mean daily Parkinson volatility following ``current_bar``.

        Args:
            prev_bars: List of the bars preceding ``current_bar``, oldest first.
            current_bar: The most recent bar, appended to ``prev_bars``.

        Returns:
            The exponentiated network output, undoing the log applied to the
            training labels.

        Raises:
            RuntimeError: If called before ``fit``.
        """
        if getattr(self, "net", None) is None:
            raise RuntimeError("predict() called before fit()")
        park_vals = self._parkinson_series(prev_bars + [current_bar])
        window = self._normalize_windows(park_vals.reshape(-1, 1))
        # Add the batch dimension: (time, 1) -> (1, time, 1).
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0).to(self.device)
        self.net.eval()
        with torch.no_grad():
            return np.exp(self.net(x).item())
from AlgorithmImports import *
class SignalPlotState:
    """Bundle of the per-bar values that the signal charts are drawn from."""

    def __init__(self, bar_close, pred_vol, pred_vol_smooth, target_weight, actual_weight, target_vol):
        """Store every argument unchanged under an attribute of the same name."""
        # Price and the raw / smoothed volatility forecast.
        self.bar_close, self.pred_vol, self.pred_vol_smooth = bar_close, pred_vol, pred_vol_smooth
        # Allocation values.
        self.target_weight, self.actual_weight, self.target_vol = target_weight, actual_weight, target_vol
class VolatilityPlotter:
    """Sends the strategy's diagnostics to named charts on the algorithm."""

    def __init__(self, algorithm):
        """Keep a reference to the object whose ``plot`` method is called."""
        self._algorithm = algorithm

    def _plot_series(self, chart_name, series):
        """Plot every ``(series_name, value)`` pair of ``series`` on ``chart_name``."""
        for series_name, point in series:
            self._algorithm.plot(chart_name, series_name, point)

    def plot_training(self, train_loss, val_loss, validation_rank_corr, prediction_error_std):
        """Plot the losses and the validation diagnostics of one training run."""
        charts = {
            "Model Training": (
                ("Train Loss", train_loss),
                ("Val Loss", val_loss),
            ),
            "Validation Diagnostics": (
                ("Rank Corr", validation_rank_corr),
                ("Pred Error Std", prediction_error_std),
            ),
        }
        for chart_name, series in charts.items():
            self._plot_series(chart_name, series)

    def plot_target_vol(self, raw_vol, smooth_vol):
        """Plot the raw and the smoothed target volatility."""
        # Plot the walk-forward target volatility selected by the WFO grid search.
        target_series = (("Raw", raw_vol), ("Smooth", smooth_vol))
        self._plot_series("Target Vol", target_series)

    def plot_signal_state(self, state):
        """Plot price, volatility forecast, and allocation from ``state``."""
        charts = {
            "QQQ": (
                ("Price", state.bar_close),
            ),
            "Volatility Signal": (
                ("Pred Vol", state.pred_vol),
                ("Pred Vol Smooth", state.pred_vol_smooth),
            ),
            "Allocation": (
                ("QQQTargetWeight", state.target_weight),
                ("QQQActualWeight", state.actual_weight),
                ("TargetVol", state.target_vol),
            ),
        }
        for chart_name, series in charts.items():
            self._plot_series(chart_name, series)