Overall Statistics
Total Orders
85
Average Win
1.57%
Average Loss
-0.78%
Compounding Annual Return
85.302%
Drawdown
6.400%
Expectancy
0.649
Start Equity
100000
End Equity
122610.92
Net Profit
22.611%
Sharpe Ratio
2.869
Sortino Ratio
3.43
Probabilistic Sharpe Ratio
80.913%
Loss Rate
45%
Win Rate
55%
Profit-Loss Ratio
2.01
Alpha
0.582
Beta
0.141
Annual Standard Deviation
0.195
Annual Variance
0.038
Information Ratio
2.755
Tracking Error
0.265
Treynor Ratio
3.962
Total Fees
$524.30
Estimated Strategy Capacity
$1000000.00
Lowest Capacity Asset
SH TJNNZWL5I4IT
Portfolio Turnover
70.00%
Drawdown Recovery
26
from AlgorithmImports import *
from datetime import timedelta
import numpy as np
import torch
import torch.nn as nn


class HARNet(nn.Module):
    """Multi-scale Conv1d front end feeding an LSTM that emits one value per sample.

    Three parallel convolutions (kernel sizes 3, 5 and 11, each padded so the
    sequence length is preserved) read the 4 input channels. Their 32-channel
    outputs are concatenated into 96 channels, batch-normalised and run
    through an LSTM. The last time step is layer-normalised and mapped by a
    small MLP ending in Softplus, so predictions are never negative.
    """

    def __init__(self, dropout_rate=0.3):
        """Create the layers.

        Args:
            dropout_rate: Drop probability used by both dropout layers.
        """
        super().__init__()
        in_channels, branch_channels, hidden_size = 4, 32, 64
        merged_channels = 3 * branch_channels
        # Convolution branches; padding = (kernel_size - 1) // 2 keeps the length.
        self.conv_short = nn.Conv1d(
            in_channels=in_channels, out_channels=branch_channels, kernel_size=3, padding=1
        )
        self.conv_medium = nn.Conv1d(
            in_channels=in_channels, out_channels=branch_channels, kernel_size=5, padding=2
        )
        self.conv_long = nn.Conv1d(
            in_channels=in_channels, out_channels=branch_channels, kernel_size=11, padding=5
        )
        self.batch_norm = nn.BatchNorm1d(num_features=merged_channels)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(p=dropout_rate)
        # Recurrent stage over the concatenated convolution features.
        self.lstm = nn.LSTM(
            input_size=merged_channels, hidden_size=hidden_size, batch_first=True
        )
        self.layer_norm = nn.LayerNorm(normalized_shape=hidden_size)
        self.dropout_fc = nn.Dropout(p=dropout_rate)
        # Regression head; Softplus keeps the output non-negative.
        self.fc = nn.Sequential(
            nn.Linear(in_features=hidden_size, out_features=branch_channels),
            nn.ReLU(),
            nn.Linear(in_features=branch_channels, out_features=1),
            nn.Softplus(),
        )

    def forward(self, x):
        """Map a (batch, time, 4) tensor to a (batch,) tensor of predictions."""
        # Conv1d expects channels before time.
        channels_first = x.transpose(1, 2)
        branches = [
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        features = self.dropout_conv(self.batch_norm(torch.cat(branches, dim=1)))
        # Back to (batch, time, channels) for the batch-first LSTM.
        sequence, _ = self.lstm(features.transpose(1, 2))
        # Only the final time step feeds the regression head.
        summary = self.dropout_fc(self.layer_norm(sequence[:, -1, :]))
        return self.fc(summary).squeeze(-1)


class Regime:
    """String labels for the three portfolio regimes used by the algorithm."""

    # Label returned when SPY is (or should be) held.
    LONG = "LONG"
    # Label returned when the hedge asset is (or should be) held.
    HEDGE = "HEDGE"
    # Label returned when neither condition applies.
    NEUTRAL = "NEUTRAL"


class VolatiltyModel(QCAlgorithm):
    """Volatility-target algorithm using CNN-LSTM predictions."""

    def initialize(self):
        """Configure the backtest, create the model and indicators, and register training jobs."""
        # Backtest window and starting capital.
        self.set_start_date(2022, 2, 1)
        self.set_end_date(2022, 6, 1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        # SPY is the primary asset; SH is held in the hedge regime.
        self._spy = self.add_equity("SPY")
        self._hedge = self.add_equity("SH")
        # Model state: bars per input window, bars per realized-vol target,
        # the live rolling window and the network itself.
        self._lookback = 25
        self._future_horizon = 25
        self._window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        # Signals stay blocked until the first successful training run.
        self._model_is_training = True
        # Trading state.
        self._target_annual_vol = 0.12
        self._cooldown_bars = 20
        self._cooldown_remaining = 0
        # Candidate target volatilities evaluated by the walk-forward optimization.
        candidate_vols = (0.08, 0.10, 0.12, 0.14, 0.16)
        self._parameter_sets = [dict(target_vol=vol) for vol in candidate_vols]
        # 50-period SMA used as the trend filter.
        self._sma_50 = self.sma(self._spy, 50)
        # Aggregate SPY data into 15-minute bars that drive the trading signal.
        bar_aggregator = TradeBarConsolidator(timedelta(minutes=15))
        bar_aggregator.data_consolidated += self._on_15_min_bar
        self.subscription_manager.add_consolidator(self._spy, bar_aggregator)

        def run_wfo():
            # Pick the candidate with the highest Sharpe objective.
            self._do_wfo(self._vol_optimization_func, max, self._sharpe_objective)

        jobs = (self.train_model, run_wfo)
        # Run both jobs once up front on historical data ...
        for job in jobs:
            self.train(job)
        # ... then repeat them at each month start, 30 minutes before the open.
        for job in jobs:
            self.train(
                self.date_rules.month_start(self._spy),
                self.time_rules.before_market_open(self._spy, 30),
                job,
            )

    def train_model(self):
        self._model_is_training = True
        # Fetch 120K minute bars for training data.
        history = self.history(self._spy, 120_000)
        if history.empty:
            return
        df = history.reset_index()
        df = df[['time', 'open', 'high', 'low', 'close']]
        df.set_index('time', inplace=True)
        # Resample to 15-minute bars.
        df = df.resample('15T').agg({
            'open': 'first',
            'high': 'max',
            'low': 'min',
            'close': 'last'
        }).dropna()
        prices = df[['open', 'high', 'low', 'close']].values
        closes = df['close'].values
        # Generate training samples: lookback windows and future realized volatility.
        x_data, y_data = [], []
        for i in range(self._lookback, len(df) - self._future_horizon):
            window = prices[i - self._lookback:i]
            future_closes = closes[i:i + self._future_horizon - 1]
            future_returns = np.diff(closes[i:i + self._future_horizon]) / future_closes
            realized_vol = np.sqrt(np.sum(future_returns ** 2))
            x_data.append(window)
            y_data.append(realized_vol)
        x_arr = np.array(x_data)
        y_arr = np.array(y_data)
        # Per-sample normalization for stable training.
        mean = x_arr.mean(axis=1, keepdims=True)
        std = x_arr.std(axis=1, keepdims=True) + 1e-6
        x_arr = (x_arr - mean) / std
        # Train-validation split.
        split = int(0.7 * len(x_arr))
        x_train, x_val = x_arr[:split], x_arr[split:]
        y_train, y_val = y_arr[:split], y_arr[split:]
        # Convert to tensors.
        x_train = torch.tensor(x_train, dtype=torch.float32)
        y_train = torch.tensor(y_train, dtype=torch.float32)
        x_val = torch.tensor(x_val, dtype=torch.float32)
        y_val = torch.tensor(y_val, dtype=torch.float32)
        # Training loop with early stopping.
        optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3)
        loss_fn = nn.MSELoss()
        best_val = float("inf")
        patience = 4
        patience_left = patience
        # 30 epoch training.
        for _ in range(30):
            self._model.train()
            optimizer.zero_grad()
            preds = self._model(x_train)
            loss = loss_fn(preds, y_train)
            loss.backward()
            optimizer.step()
            # Evaluate on validation set.
            self._model.eval()
            with torch.no_grad():
                val_preds = self._model(x_val)
                val_loss = loss_fn(val_preds, y_val)
            # Early stop if validation loss plateaus.
            if val_loss < best_val:
                best_val = val_loss
                patience_left = patience
            else:
                patience_left -= 1
                if patience_left == 0:
                    break
        # Set regime thresholds based on validation predictions percentiles.
        # Wider percentiles (15/85) give more hysteresis; minimum 1.3x ratio
        # prevents band collapse when prediction distribution is right-skewed.
        preds_np = val_preds.numpy()
        self._entry_threshold = np.percentile(preds_np, 15)
        self._exit_threshold = np.percentile(preds_np, 85)
        self._exit_threshold = max(self._exit_threshold, self._entry_threshold * 1.3)
        # Chart training diagnostics.
        self.plot("Model Training", "Train Loss", float(loss.item()))
        self.plot("Model Training", "Val Loss", float(val_loss.item()))
        self.plot("Regime Thresholds", "Entry Threshold", float(self._entry_threshold))
        self.plot("Regime Thresholds", "Exit Threshold", float(self._exit_threshold))
        self.debug(
            f"Training — train_loss={loss.item():.6f}, val_loss={val_loss.item():.6f} | "
            f"entry={self._entry_threshold:.6f}, exit={self._exit_threshold:.6f} | "
            f"pred_vol range=[{preds_np.min():.6f}, {preds_np.max():.6f}]"
        )
        self._model.eval()
        self._model_is_training = False

    def _sharpe_objective(self, returns):
        if returns.std() == 0 or len(returns) < 10:
            return -np.inf
        return returns.mean() / returns.std() * np.sqrt(252 * 26)

    def _vol_optimization_func(self, data, parameter_set, objective):
        returns = data['close'].pct_change().dropna()
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252 * 26)
        position_sizes = (parameter_set['target_vol'] / rolling_vol.clip(lower=0.01)).clip(0, 1)
        strategy_returns = position_sizes.iloc[self._lookback:] * returns.iloc[self._lookback:]
        return objective(strategy_returns)

    def _do_wfo(self, optimization_func, min_max, objective):
        # Fetch ~3 months of minute data and resample to 15-min bars.
        history = self.history(self._spy, timedelta(days=90))
        if history.empty:
            return
        df = history.reset_index()[['time', 'close']].set_index('time')
        data = df.resample('15T').last().dropna()
        scores = [
            optimization_func(data, param_set, objective)
            for param_set in self._parameter_sets
        ]
        best_idx = scores.index(min_max(scores))
        self._update_vol_target(self._parameter_sets[best_idx])

    def _update_vol_target(self, optimal_parameters):
        self._target_annual_vol = optimal_parameters['target_vol']
        self.debug(f"WFO Optimized Target Vol: {self._target_annual_vol:.4f}")

    def _on_15_min_bar(self, sender, bar):
        """Handle a consolidated 15-minute bar: predict volatility and switch regime if needed.

        Args:
            sender: The object that raised the consolidation event (unused).
            bar: The consolidated bar; its open/high/low/close feed the model
                and its close is compared with the SMA as a trend filter.

        The bar is always appended to the rolling window. Nothing else happens
        while the model is training, the SMA or window is not ready, or a
        cooldown is active. Otherwise a prediction is made and, if it implies
        a regime different from the current holdings, the portfolio is moved
        fully into SPY (LONG) or the hedge asset (HEDGE) and a cooldown starts.

        NOTE(review): positions are always sized at 100%; `_target_annual_vol`
        selected by the walk-forward optimization is not consulted here.
        """
        # Record every bar before any gating so the window the model sees never
        # has gaps from bars that arrived during training or indicator warm-up.
        self._window.add(bar)
        if self._model_is_training or not self._sma_50.is_ready:
            return
        if not self._window.is_ready:
            return
        # Skip trading during cooldown period to prevent oversignaling.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return
        # Build the model input from the rolling window; the reversal presumably
        # restores chronological order (window iterating newest-first — confirm).
        window = np.array([[b.open, b.high, b.low, b.close] for b in self._window])[::-1]
        # Normalize each OHLC column over time, matching the training scheme.
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        # Raw model output, on the same scale as the thresholds derived in training
        # (no annualization is applied).
        with torch.no_grad():
            pred_vol = self._model(x).item()
        # State machine: transition only when regime changes.
        trend_up = bar.close > self._sma_50.current.value
        desired = self._desired_regime(pred_vol, trend_up)
        current = self._current_regime()
        # A NEUTRAL signal never closes an existing position.
        if desired == Regime.NEUTRAL or desired == current:
            return
        # Chart and log regime transition signal.
        self.plot("Volatility Signal", "Pred Vol", pred_vol)
        self.plot("Volatility Signal", "Entry Threshold", self._entry_threshold)
        self.plot("Volatility Signal", "Exit Threshold", self._exit_threshold)
        self.debug(f"Regime: {current} → {desired} | pred_vol={pred_vol:.6f} | trend_up={trend_up}")
        if desired == Regime.LONG:
            if self._hedge.holdings.invested:
                self.liquidate(self._hedge)
            self.set_holdings(self._spy, 1.0)
        else:  # Regime.HEDGE
            if self._spy.holdings.invested:
                self.liquidate(self._spy)
            self.set_holdings(self._hedge, 1.0)
        self._cooldown_remaining = self._cooldown_bars

    def _desired_regime(self, pred_vol: float, trend_up: bool) -> str:
        """Translate a volatility prediction and trend flag into a target regime.

        Returns LONG when the prediction is below the entry threshold and the
        trend is up, HEDGE when the prediction is above the exit threshold,
        and NEUTRAL otherwise.
        """
        below_entry = pred_vol < self._entry_threshold
        if below_entry and trend_up:
            regime = Regime.LONG
        elif pred_vol > self._exit_threshold:
            regime = Regime.HEDGE
        else:
            regime = Regime.NEUTRAL
        return regime

    def _current_regime(self) -> str:
        """Derive the current regime from holdings: SPY is checked first, then the hedge asset."""
        holdings_to_regime = (
            (self._spy, Regime.LONG),
            (self._hedge, Regime.HEDGE),
        )
        for security, regime in holdings_to_regime:
            if security.holdings.invested:
                return regime
        # Neither asset is held.
        return Regime.NEUTRAL