Overall Statistics
Total Orders
487
Average Win
0.71%
Average Loss
-0.69%
Compounding Annual Return
0.868%
Drawdown
19.700%
Expectancy
0.034
Start Equity
100000
End Equity
104416.36
Net Profit
4.416%
Sharpe Ratio
-0.361
Sortino Ratio
-0.397
Probabilistic Sharpe Ratio
0.848%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
1.03
Alpha
-0.014
Beta
-0.231
Annual Standard Deviation
0.081
Annual Variance
0.006
Information Ratio
-0.512
Tracking Error
0.188
Treynor Ratio
0.126
Total Fees
$1929.01
Estimated Strategy Capacity
$970000.00
Lowest Capacity Asset
SH TJNNZWL5I4IT
Portfolio Turnover
16.06%
Drawdown Recovery
1386
from AlgorithmImports import *
import torch
import torch.nn as nn


class HARNet(nn.Module):
    """Multi-scale convolutional front end feeding an LSTM regressor.

    Three parallel 1-D convolutions (kernel sizes 3, 5 and 11) read the same
    4-channel sequence. Their activations are concatenated, batch-normalised
    and passed through an LSTM; the last time step is layer-normalised and
    mapped to one scalar per sample by a small fully connected head.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        in_channels = 4
        branch_channels = 32
        merged_channels = 3 * branch_channels
        hidden_size = 64
        # Each branch pads by kernel_size // 2, so all three keep the input
        # sequence length and can be concatenated along the channel axis.
        self.conv_short = nn.Conv1d(in_channels, branch_channels, kernel_size=3, padding=1)
        self.conv_medium = nn.Conv1d(in_channels, branch_channels, kernel_size=5, padding=2)
        self.conv_long = nn.Conv1d(in_channels, branch_channels, kernel_size=11, padding=5)
        self.batch_norm = nn.BatchNorm1d(merged_channels)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(merged_channels, hidden_size, batch_first=True)
        self.layer_norm = nn.LayerNorm(hidden_size)
        self.dropout_fc = nn.Dropout(dropout_rate)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, 1),
        )

    def forward(self, x):
        """Map a ``(batch, time, 4)`` tensor to a ``(batch,)`` tensor of predictions."""
        # Conv1d expects channels before time.
        channels_first = x.permute(0, 2, 1)
        branches = [
            self.relu(conv(channels_first))
            for conv in (self.conv_short, self.conv_medium, self.conv_long)
        ]
        merged = self.dropout_conv(self.batch_norm(torch.cat(branches, dim=1)))
        # Back to (batch, time, channels) for the batch-first LSTM.
        sequence_out, _ = self.lstm(merged.permute(0, 2, 1))
        # Only the final time step feeds the regression head.
        last_step = self.layer_norm(sequence_out[:, -1, :])
        return self.fc(self.dropout_fc(last_step)).squeeze(-1)


class VolatiltyModel(QCAlgorithm):
    """Volatility-target algorithm using CNN-LSTM predictions."""

    def initialize(self):
        """Set up the backtest window, subscriptions, model state and training schedule."""
        # Backtest covers the 5 * 365 days leading up to the end date.
        backtest_span = timedelta(5 * 365)
        self.set_start_date(self.end_date - backtest_span)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True

        # Instruments: SPY is the primary asset, SH is used as the hedge.
        self._spy = self.add_equity("SPY")
        self._hedge = self.add_equity("SH")

        # Model state: the network sees `_lookback` bars and is trained to
        # predict realized volatility over the following `_future_horizon` bars.
        self._lookback = 25
        self._future_horizon = 25
        self._window = RollingWindow[TradeBar](self._lookback)
        self._model = HARNet()
        self._model_trained = False
        self._model_is_training = False

        # Trading state: volatility target and a post-trade cooldown counted
        # in consolidated bars.
        self._target_annual_vol = 0.12
        self._cooldown_bars = 20
        self._cooldown_remaining = 0

        # 50-period simple moving average used as the trend filter.
        self._sma_50 = self.sma(self._spy, 50)

        # Aggregate SPY data into 15-minute bars; signals are evaluated on each one.
        quarter_hour_bars = TradeBarConsolidator(timedelta(minutes=15))
        quarter_hour_bars.data_consolidated += self._on_15_min_bar
        self.subscription_manager.add_consolidator(self._spy, quarter_hour_bars)

        # Train once up front, then again at the start of every month so the
        # model can adapt to regime changes.
        self.train(self.train_model)
        retrain_dates = self.date_rules.month_start(self._spy)
        retrain_time = self.time_rules.after_market_open(self._spy, 30)
        self.train(retrain_dates, retrain_time, self.train_model)

    def train_model(self) -> None:
        """Train the network on recent SPY bars and refresh thresholds and target vol.

        Requests the last 120,000 SPY history bars, resamples them to 15-minute
        OHLC bars, and builds samples of (``_lookback`` bars of OHLC ->
        realized volatility over the next ``_future_horizon`` bars). The model
        is fitted full-batch for up to 30 epochs with early stopping on a
        chronological 70/30 train/validation split.

        On success this sets ``_entry_threshold`` / ``_exit_threshold`` to the
        20th / 80th percentiles of the validation predictions, re-optimizes
        ``_target_annual_vol`` and sets ``_model_trained``.

        If no history is returned, or too few samples can be built, the method
        returns without touching the thresholds or the trained flag.
        ``_model_is_training`` is always cleared and the model is always put
        back in eval mode on exit, including on early return or exception, so
        a failed (re)train cannot block the bar handler forever.
        """
        self._model_is_training = True
        try:
            history = self.history(self._spy, 120_000)
            if history.empty:
                self.debug("train_model: no history returned; skipping training.")
                return
            df = history.reset_index()[['time', 'open', 'high', 'low', 'close']]
            df = df.set_index('time')
            # Resample to 15-minute bars ('min' replaces the deprecated 'T' alias);
            # dropna() removes the empty bins outside trading hours.
            df = df.resample('15min').agg({
                'open': 'first',
                'high': 'max',
                'low': 'min',
                'close': 'last'
            }).dropna()
            prices = df[['open', 'high', 'low', 'close']].values
            closes = df['close'].values
            # Build samples: the lookback window ending just before bar i is
            # paired with the realized volatility of the bars starting at i.
            x_data, y_data = [], []
            for i in range(self._lookback, len(df) - self._future_horizon):
                future = closes[i:i + self._future_horizon]
                future_returns = np.diff(future) / future[:-1]
                x_data.append(prices[i - self._lookback:i])
                y_data.append(np.sqrt(np.sum(future_returns ** 2)))
            # Need at least one training and one validation sample.
            if len(x_data) < 2:
                self.debug("train_model: not enough bars to build training samples; skipping training.")
                return
            x_data = np.array(x_data)
            y_data = np.array(y_data)
            # Per-sample, per-column normalization over the time axis; the bar
            # handler applies the same transform at inference time.
            mean = x_data.mean(axis=1, keepdims=True)
            std = x_data.std(axis=1, keepdims=True) + 1e-6
            x_data = (x_data - mean) / std
            # Chronological split: the most recent 30% is held out for validation.
            split = int(0.7 * len(x_data))
            x_train = torch.tensor(x_data[:split], dtype=torch.float32)
            y_train = torch.tensor(y_data[:split], dtype=torch.float32)
            x_val = torch.tensor(x_data[split:], dtype=torch.float32)
            y_val = torch.tensor(y_data[split:], dtype=torch.float32)
            # Full-batch training with early stopping on validation loss.
            optimizer = torch.optim.Adam(self._model.parameters(), lr=1e-3)
            loss_fn = nn.MSELoss()
            best_val = float("inf")
            patience = 4
            patience_left = patience
            for _ in range(30):
                self._model.train()
                optimizer.zero_grad()
                preds = self._model(x_train)
                loss = loss_fn(preds, y_train)
                loss.backward()
                optimizer.step()
                # Evaluate on the validation set with dropout/batch-norm frozen.
                self._model.eval()
                with torch.no_grad():
                    val_preds = self._model(x_val)
                    val_loss = loss_fn(val_preds, y_val).item()
                # Stop once validation loss has failed to improve `patience` times in a row.
                if val_loss < best_val:
                    best_val = val_loss
                    patience_left = patience
                else:
                    patience_left -= 1
                    if patience_left == 0:
                        break
            # Regime thresholds come from the distribution of the final
            # model's validation predictions.
            preds_np = val_preds.numpy()
            self._entry_threshold = np.percentile(preds_np, 20)
            self._exit_threshold = np.percentile(preds_np, 80)
            # Re-tune the target volatility on the same resampled closes.
            self._optimize_target_vol(df['close'])
            self._model_trained = True
        finally:
            # Always leave the model usable for inference and release the
            # training flag, even on early return or error.
            self._model.eval()
            self._model_is_training = False

    def _optimize_target_vol(self, close_prices: pd.Series) -> None:
        """Pick the candidate target volatility with the highest simulated Sharpe ratio.

        Only the last 1500 entries of ``close_prices`` are used. If fewer than
        ``_lookback`` entries are available the current target is left
        unchanged. Otherwise ``_target_annual_vol`` is set to the best-scoring
        candidate (or kept as is when no candidate scores above ``-inf``) and
        the result is written to the debug log.
        """
        recent_closes = close_prices.tail(1500)
        if len(recent_closes) >= self._lookback:
            # Start from the current target so it survives if nothing beats -inf.
            best_sharpe, best_vol = -np.inf, self._target_annual_vol
            for candidate in (0.08, 0.10, 0.12, 0.14, 0.16):
                score = self._calculate_sharpe_for_vol(recent_closes, candidate)
                # Strict comparison: on ties the earlier candidate wins.
                if score > best_sharpe:
                    best_sharpe, best_vol = score, candidate
            self._target_annual_vol = best_vol
            self.debug(f"Optimized Target Vol: {self._target_annual_vol:.4f} (Best Sharpe: {best_sharpe:.3f})")

    def _calculate_sharpe_for_vol(self, close_prices: pd.Series, target_vol: float,
                                  bars_per_day: int = 26) -> float:
        """Return the annualized Sharpe ratio of a vol-targeted long position.

        Simulates holding the asset with weight ``target_vol / rolling_vol``
        (clipped to ``[0, 1]``), where ``rolling_vol`` is the annualized
        standard deviation of the previous ``_lookback`` bar returns.

        Args:
            close_prices: Intraday close prices; the caller passes 15-minute closes.
            target_vol: Candidate annualized volatility target.
            bars_per_day: Number of bars per trading day, used to annualize
                per-bar statistics. The default of 26 assumes 15-minute bars
                over a 6.5-hour session.

        Returns:
            The annualized Sharpe ratio, or ``-inf`` when there are fewer than
            10 usable returns or the strategy returns have no variance.
        """
        returns = close_prices.pct_change().dropna()
        # Per-bar statistics must be scaled by bars per year, not days per year;
        # scaling intraday returns with sqrt(252) understates volatility and
        # pins the position size at its cap for every candidate.
        annualization = np.sqrt(252 * bars_per_day)
        rolling_vol = returns.rolling(self._lookback).std() * annualization
        position_sizes = (target_vol / rolling_vol.clip(lower=0.01)).clip(0, 1)
        # Shift by one bar so the weight applied to a return only uses
        # information available before that bar (no look-ahead).
        strategy_returns = (position_sizes.shift(1) * returns).dropna()
        if len(strategy_returns) < 10:
            return -np.inf
        dispersion = strategy_returns.std()
        if not np.isfinite(dispersion) or dispersion == 0:
            return -np.inf
        return float(strategy_returns.mean() / dispersion * annualization)

    def _on_15_min_bar(self, sender: object, bar: TradeBar) -> None:
        """Handle a consolidated 15-minute SPY bar: predict volatility and trade.

        Args:
            sender: The consolidator that raised the event (unused).
            bar: The newly consolidated bar.

        The bar is always recorded in the rolling window. Nothing else happens
        until the model is trained and idle, the SMA is ready, the window is
        full and the cooldown has elapsed. Then the model's volatility
        forecast selects the regime:

        * forecast below ``_entry_threshold`` with price above the SMA:
          close any hedge, otherwise open SPY if flat in SPY;
        * forecast above ``_exit_threshold``: close any SPY position,
          otherwise open the hedge if flat in the hedge.

        Every order starts a cooldown of ``_cooldown_bars`` bars.
        """
        # Record every bar, even while the model is (re)training, so the
        # window always holds the most recent contiguous bars.
        self._window.add(bar)
        # Guard clauses: exit early if model or indicator is not ready.
        if not self._model_trained or self._model_is_training or not self._sma_50.is_ready:
            return
        if not self._window.is_ready:
            return
        # Skip trading during cooldown period to prevent oversignaling.
        if self._cooldown_remaining > 0:
            self._cooldown_remaining -= 1
            return
        # RollingWindow enumerates newest-first, but the model was trained on
        # oldest-first windows; reverse so the LSTM's last step is the latest bar.
        bars = list(self._window)[::-1]
        window = np.array([[b.open, b.high, b.low, b.close] for b in bars])
        # Same per-column normalization as used when building training samples.
        window_mean = window.mean(axis=0)
        window_std = window.std(axis=0)
        window = (window - window_mean) / (window_std + 1e-6)
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0)
        # Inference only: no_grad must be used as a context manager to take effect.
        with torch.no_grad():
            pred_vol = self._model(x).item()
        # Scale the forecast to an annual basis.
        annual_vol = pred_vol * np.sqrt(252)
        # Compute position size for volatility targeting, capped at 100%.
        position_size = self._target_annual_vol / max(annual_vol, 1e-4)
        position_size = np.clip(position_size, 0, 1.0)
        # Determine if trend is up based on SMA.
        trend_up = bar.close > self._sma_50.current.value
        # Trade logic: long SPY in low volatility with uptrend, hedge in high volatility.
        spy_qty = self._spy.holdings.quantity
        hedge_qty = self._hedge.holdings.quantity
        if pred_vol < self._entry_threshold and trend_up:
            # Exit hedge if open before entering SPY.
            if hedge_qty:
                self.liquidate(self._hedge)
                self._cooldown_remaining = self._cooldown_bars
            elif not spy_qty:
                self.set_holdings(self._spy, position_size)
                self._cooldown_remaining = self._cooldown_bars
        elif pred_vol > self._exit_threshold:
            # Exit SPY if open before entering hedge.
            if spy_qty:
                self.liquidate(self._spy)
                self._cooldown_remaining = self._cooldown_bars
            elif not hedge_qty:
                self.set_holdings(self._hedge, position_size)
                self._cooldown_remaining = self._cooldown_bars