Overall Statistics
Total Orders
39
Average Win
2.03%
Average Loss
-1.81%
Compounding Annual Return
13.375%
Drawdown
6.800%
Expectancy
0.940
Start Equity
100000
End Equity
148598.60
Net Profit
48.599%
Sharpe Ratio
0.592
Sortino Ratio
0.562
Probabilistic Sharpe Ratio
77.265%
Loss Rate
8%
Win Rate
92%
Profit-Loss Ratio
1.12
Alpha
0.007
Beta
0.308
Annual Standard Deviation
0.066
Annual Variance
0.004
Information Ratio
-0.623
Tracking Error
0.102
Treynor Ratio
0.126
Total Fees
$40.31
Estimated Strategy Capacity
$150000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
1.31%
Drawdown Recovery
203
# region imports
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import torch
import torch.nn as nn
# endregion
 
 
class HARNet(nn.Module):
    """Hybrid CNN-LSTM network mapping an OHLC window to a volatility forecast.

    Three parallel 1-D convolutions with short / medium / long receptive
    fields extract multi-scale temporal features from the 4 OHLC channels;
    an LSTM summarizes the feature sequence and a small MLP head with a
    Softplus output guarantees a non-negative prediction.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        # NOTE: module registration order is kept stable so seeded weight
        # initialization stays reproducible across retrains.
        self.conv_short = nn.Conv1d(4, 32, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(4, 32, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(4, 32, kernel_size=11, padding=5)
        self.norm = nn.InstanceNorm1d(96, affine=True)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(96, 64, batch_first=True)
        self.layer_norm = nn.LayerNorm(64)
        self.dropout_fc = nn.Dropout(dropout_rate)
        self.fc = nn.Sequential(
            nn.Linear(64, 32), nn.ReLU(),
            nn.Linear(32, 1), nn.Softplus(),
        )

    def forward(self, x):
        """Map x of shape (batch, time, 4) to a (batch,) volatility estimate."""
        channels_first = x.permute(0, 2, 1)  # Conv1d expects (batch, channels, time)
        branches = [
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        merged = self.dropout_conv(self.norm(torch.cat(branches, dim=1)))
        sequence = merged.permute(0, 2, 1)  # back to (batch, time, features) for LSTM
        states, _ = self.lstm(sequence)
        summary = self.dropout_fc(self.layer_norm(states[:, -1, :]))
        return self.fc(summary).squeeze(-1)
 
 
class Regime:
    """String constants naming the algorithm's trading regimes."""

    LONG = 'LONG'        # invested in SPY (sized to the volatility target)
    CASH = 'CASH'        # fully liquidated
    NEUTRAL = 'NEUTRAL'  # no actionable signal; keep the current position
 
 
class VolatiltyModel(QCAlgorithm):
    """Volatility-target SPY strategy driven by CNN-LSTM volatility forecasts.

    Monthly, a HARNet model is retrained on daily OHLC history to predict
    next-horizon realized volatility, and a small walk-forward optimization
    re-selects the annual volatility target from a fixed candidate list.
    Daily, the latest prediction plus a 50-day SMA trend filter choose a
    regime (LONG / CASH / NEUTRAL); regime flips require multi-bar
    confirmation and start a trading cooldown.

    NOTE(review): the class name is missing an 'i' ("Volatilty") but LEAN
    loads the algorithm by this name, so renaming would be a breaking change.
    """

    def initialize(self):
        """Configure data, parameters, indicators, and training schedules."""
        self.set_start_date(2023, 1, 3)
        self.set_end_date(2026, 3, 1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self._spy = self.add_equity("SPY")
        # Model parameters.
        self._lookback = 25  # days of OHLC per model input window
        self._future_horizon = 25  # days over which the label vol is realized
        self._training_history_days = 365 * 5
        self._entry_percentile = 15  # go LONG below this pred-vol percentile
        self._exit_percentile = 85  # go CASH above this percentile
        self._min_exit_ratio = 1.3  # keep the exit threshold well above entry
        self._daily_window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        self._model_is_training = True
        # Thresholds are produced by train_model; keep them None until a
        # training run succeeds so _on_daily_bar can skip trading instead of
        # raising AttributeError when training aborts early (e.g. empty
        # history, too few samples, or no improving epoch).
        self._entry_threshold = None
        self._exit_threshold = None
        # Trading parameters.
        self._target_annual_vol = 0.12
        self._cooldown_days = 10
        self._cooldown_remaining = 0
        self._confirm_bars_required = 2
        self._confirm_count = 0
        self._confirm_direction = Regime.NEUTRAL
        self._rebalance_weight_tolerance = 0.10
        # WFO parameter search space for target volatility candidates.
        self._parameter_sets = [{'target_vol': v} for v in [0.08, 0.10, 0.12, 0.14, 0.16]]
        # SMA indicator for trend filtering (daily resolution).
        self._sma_50 = self.sma(self._spy, 50, Resolution.DAILY)
        # Daily consolidator for regime detection.
        daily_consolidator = TradeBarConsolidator(timedelta(days=1))
        daily_consolidator.data_consolidated += self._on_daily_bar
        self.subscription_manager.add_consolidator(self._spy, daily_consolidator)
        # Train model and run initial WFO on historical data.
        self.train(self.train_model)
        self.train(lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective))
        # Retrain model and re-optimize monthly before market open.
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            self.train_model
        )
        self.train(
            self.date_rules.month_start(self._spy),
            self.time_rules.before_market_open(self._spy, 30),
            lambda: self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective)
        )

    def train_model(self):
        """Retrain HARNet from scratch on daily OHLC history.

        Builds (lookback-day OHLC window -> realized future vol) samples,
        z-scores each window over its time axis, trains with early stopping
        on a chronological 70/30 split, then derives the entry/exit
        thresholds from percentiles of the validation-set predictions.
        Any early return leaves the thresholds untouched (possibly None),
        which disables trading until the next successful retrain.
        """
        self._model_is_training = True
        self._model = HARNet()  # fresh weights each retrain; no warm start
        history = self.history(
            self._spy,
            timedelta(days=self._training_history_days),
            Resolution.DAILY
        )
        if history.empty:
            self._finish_training()
            return
        df = history[['open', 'high', 'low', 'close']].copy()
        if getattr(df.index, "nlevels", 1) > 1:
            df = df.droplevel(0)  # drop the symbol level of the MultiIndex
        df = df.sort_index().dropna()
        prices = df[['open', 'high', 'low', 'close']].values
        closes = df['close'].values
        x_data, y_data = [], []
        for i in range(self._lookback, len(df) - self._future_horizon):
            window = prices[i - self._lookback:i]
            future_closes = closes[i:i + self._future_horizon]
            # Simple daily returns over the horizon: diff of horizon+1
            # closes divided by each day's starting close.
            future_returns = np.diff(closes[i:i + self._future_horizon + 1]) / future_closes
            # Label: realized volatility of the horizon (not annualized).
            realized_vol = np.sqrt(np.sum(future_returns ** 2))
            x_data.append(window)
            y_data.append(realized_vol)
        if not x_data:
            self._finish_training()
            return
        x_arr = np.array(x_data)
        y_arr = np.array(y_data)
        # Per-window z-score over the time axis (matches inference scaling
        # in _on_daily_bar).
        mean = x_arr.mean(axis=1, keepdims=True)
        std = x_arr.std(axis=1, keepdims=True) + 1e-6
        x_arr = (x_arr - mean) / std
        split = int(0.7 * len(x_arr))  # chronological split: no look-ahead
        if split == 0 or split >= len(x_arr):
            self._finish_training()
            return
        x_train, x_val = x_arr[:split], x_arr[split:]
        y_train, y_val = y_arr[:split], y_arr[split:]
        x_train = torch.tensor(x_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32)
        x_val = torch.tensor(x_val, dtype=torch.float32)
        y_val = torch.tensor(y_val, dtype=torch.float32)
        optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3)
        loss_fn = nn.MSELoss()
        best_val = float("inf")
        best_train_loss = None
        best_state = None
        patience = 6  # epochs without val improvement before stopping
        patience_left = patience
        batch_size = 256
        for _ in range(50):
            self._model.train()
            indices = torch.randperm(len(x_train))  # shuffle each epoch
            epoch_losses = []
            for start in range(0, len(x_train), batch_size):
                batch_idx = indices[start:start + batch_size]
                optimizer.zero_grad()
                preds = self._model(x_train[batch_idx])
                loss = loss_fn(preds, y_train[batch_idx])
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self._model.parameters(), max_norm=1.0)
                optimizer.step()
                epoch_losses.append(loss.item())
            train_loss = float(np.mean(epoch_losses))
            self._model.eval()
            with torch.no_grad():
                val_preds = self._model(x_val)
                val_loss = float(loss_fn(val_preds, y_val).item())
            if val_loss < best_val:
                # Snapshot the best weights so we can restore them after
                # early stopping.
                best_val = val_loss
                best_train_loss = train_loss
                best_state = {
                    key: value.detach().clone()
                    for key, value in self._model.state_dict().items()
                }
                patience_left = patience
            else:
                patience_left -= 1
                if patience_left == 0:
                    break
        if best_state is None:
            self._finish_training()
            return
        self._model.load_state_dict(best_state)
        self._model.eval()
        with torch.no_grad():
            preds_np = self._model(x_val).detach().numpy()
        # Thresholds come from the distribution of validation predictions,
        # with the exit kept at least _min_exit_ratio above the entry.
        self._entry_threshold = np.percentile(preds_np, self._entry_percentile)
        self._exit_threshold = np.percentile(preds_np, self._exit_percentile)
        self._exit_threshold = max(
            self._exit_threshold,
            self._entry_threshold * self._min_exit_ratio
        )
        self.plot("Model Training", "Train Loss", float(best_train_loss))
        self.plot("Model Training", "Val Loss", float(best_val))
        self.plot("Regime Thresholds", "Entry Threshold", float(self._entry_threshold))
        self.plot("Regime Thresholds", "Exit Threshold", float(self._exit_threshold))
        self.debug(
            f"Training - train_loss={best_train_loss:.6f}, val_loss={best_val:.6f} | "
            f"entry={self._entry_threshold:.6f}, exit={self._exit_threshold:.6f} | "
            f"pred_vol range=[{preds_np.min():.6f}, {preds_np.max():.6f}]"
        )
        self._finish_training()

    def _sharpe_objective(self, returns):
        """Annualized Sharpe ratio (rf=0); -inf for degenerate series."""
        if returns.std() == 0 or len(returns) < 10:
            return -np.inf
        return returns.mean() / returns.std() * np.sqrt(252)

    def _vol_optimization_func(self, data, parameter_set, objective):
        """Score one target-vol candidate by simulating inverse-vol sizing.

        Position size = target_vol / rolling realized vol (floored at 1%),
        clipped to [0, 1]; the objective is applied to the resulting
        daily strategy returns.
        """
        returns = data['close'].pct_change().dropna()
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252)
        position_sizes = (parameter_set['target_vol'] / rolling_vol.clip(lower=0.01)).clip(0, 1)
        strategy_returns = position_sizes.iloc[self._lookback:] * returns.iloc[self._lookback:]
        return objective(strategy_returns)

    def _do_wfo(self, optimization_func, min_max, objective):
        """Walk-forward step: score each parameter set on trailing 90 days
        of closes and adopt the best (per min_max) target volatility."""
        history = self.history(self._spy, timedelta(days=90), Resolution.DAILY)
        if history.empty:
            return
        data = history[['close']].copy()
        if getattr(data.index, "nlevels", 1) > 1:
            data = data.droplevel(0)
        data = data.sort_index().dropna()
        scores = [
            optimization_func(data, param_set, objective)
            for param_set in self._parameter_sets
        ]
        best_idx = scores.index(min_max(scores))
        self._update_vol_target(self._parameter_sets[best_idx])

    def _update_vol_target(self, optimal_parameters):
        """Adopt the WFO-selected annual volatility target."""
        self._target_annual_vol = optimal_parameters['target_vol']
        self.debug(f"WFO Optimized Target Vol: {self._target_annual_vol:.4f}")

    def _on_daily_bar(self, sender, bar):
        """Daily regime evaluation on each consolidated bar.

        Skips while the model trains, the SMA warms up, no thresholds exist
        yet, the rolling window is incomplete, or a cooldown is active.
        Otherwise predicts next-horizon vol, maintains the LONG position's
        vol-target weight, and flips regime only after confirmation.
        """
        if self._model_is_training or not self._sma_50.is_ready:
            return
        self._daily_window.add(bar)
        if not self._daily_window.is_ready:
            return
        # No successful training run yet: thresholds are undefined, so the
        # signal below cannot be evaluated. (Fixes an AttributeError when
        # train_model bails out early.)
        if self._entry_threshold is None or self._exit_threshold is None:
            return
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return
        # RollingWindow iterates newest-first; reverse to chronological
        # order and z-score exactly as in training.
        window = np.array([[b.open, b.high, b.low, b.close] for b in self._daily_window])[::-1]
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        with torch.no_grad():
            pred_vol = self._model(x).item()
        trend_up = bar.close > self._sma_50.current.value
        desired = self._desired_regime(pred_vol, trend_up)
        current = self._current_regime()
        target_weight = self._target_weight(pred_vol)
        self.plot("Volatility Signal", "Pred Vol", pred_vol)
        self.plot("Volatility Signal", "Entry Threshold", self._entry_threshold)
        self.plot("Volatility Signal", "Exit Threshold", self._exit_threshold)
        # While LONG in an uptrend with vol below the exit level, keep the
        # position sized to the volatility target.
        if current == Regime.LONG and trend_up and pred_vol < self._exit_threshold:
            self._rebalance_if_needed(pred_vol, target_weight)
        if desired == Regime.NEUTRAL or desired == current:
            # No actionable change; reset a stale confirmation streak.
            if desired != self._confirm_direction:
                self._confirm_count = 0
                self._confirm_direction = Regime.NEUTRAL
            return
        if desired == self._confirm_direction:
            self._confirm_count += 1
        else:
            self._confirm_direction = desired
            self._confirm_count = 1
        if self._confirm_count < self._confirm_bars_required:
            return
        self._confirm_count = 0
        self._confirm_direction = Regime.NEUTRAL
        self.debug(
            f"Regime: {current} -> {desired} | pred_vol={pred_vol:.6f} | "
            f"trend_up={trend_up} | weight={target_weight:.2f}"
        )
        if desired == Regime.LONG:
            self.set_holdings(self._spy, target_weight)
        else:
            self.liquidate(self._spy)
        self._cooldown_remaining = self._cooldown_days

    def _desired_regime(self, pred_vol: float, trend_up: bool) -> str:
        """Map (predicted vol, trend) to a regime; NEUTRAL when ambiguous."""
        if pred_vol < self._entry_threshold and trend_up:
            return Regime.LONG
        if pred_vol > self._exit_threshold and not trend_up:
            return Regime.CASH
        return Regime.NEUTRAL

    def _current_regime(self) -> str:
        """Current regime inferred from holdings: invested == LONG."""
        if self._spy.holdings.invested:
            return Regime.LONG
        return Regime.CASH

    def _finish_training(self) -> None:
        """Put the model in eval mode and clear the training flag."""
        self._model.eval()
        self._model_is_training = False

    def _target_weight(self, pred_vol: float) -> float:
        """Vol-target weight in [0, 1]: annualize the horizon forecast, then
        size as target_vol / predicted_vol with the prediction floored at 1%."""
        predicted_annual_vol = pred_vol * np.sqrt(252 / self._future_horizon)
        return float(np.clip(self._target_annual_vol / max(predicted_annual_vol, 0.01), 0, 1))

    def _rebalance_if_needed(self, pred_vol: float, target_weight: float) -> None:
        """Re-size the LONG position when its weight drifts beyond tolerance."""
        if self.portfolio.total_portfolio_value <= 0:
            return
        current_weight = self._spy.holdings.holdings_value / self.portfolio.total_portfolio_value
        if abs(current_weight - target_weight) < self._rebalance_weight_tolerance:
            return
        self.debug(
            f"Rebalance LONG | pred_vol={pred_vol:.6f} | "
            f"target_weight={target_weight:.2f} | current_weight={current_weight:.2f}"
        )
        self.set_holdings(self._spy, target_weight)