Overall Statistics
Total Orders
190
Average Win
3.95%
Average Loss
-0.82%
Compounding Annual Return
14.701%
Drawdown
34.700%
Expectancy
3.338
Start Equity
100000
End Equity
1411555.23
Net Profit
1311.555%
Sharpe Ratio
0.656
Sortino Ratio
0.715
Probabilistic Sharpe Ratio
11.941%
Loss Rate
26%
Win Rate
74%
Profit-Loss Ratio
4.83
Alpha
0.044
Beta
0.669
Annual Standard Deviation
0.134
Annual Variance
0.018
Information Ratio
0.239
Tracking Error
0.095
Treynor Ratio
0.131
Total Fees
$690.46
Estimated Strategy Capacity
$180000000.00
Lowest Capacity Asset
TLT SGNKIKYGE9NP
Portfolio Turnover
0.49%
Drawdown Recovery
807
from AlgorithmImports import *
import torch
from models import VolatilityPredictor
from plot import VolatilityPlotter, SignalPlotState


# Volatility-targeting algorithm that rotates between QQQ and TLT using CNN-LSTM
# volatility forecasts. (Note: the hedge instrument is TLT, a 20+ year Treasury
# bond ETF — not T-bills.)
class VolatiltyModel(QCAlgorithm):
    """Sizes QQQ exposure as target_vol / predicted_vol and parks the rest in TLT.

    NOTE(review): the class name keeps the original 'Volatilty' misspelling —
    renaming it could break external references to the algorithm class.
    """

    def initialize(self):
        # Fix the torch RNG so network initialization and training are reproducible.
        torch.manual_seed(0)
        self.set_start_date(2007,1,1)
        self.set_cash(100_000)
        self.settings.seed_initial_prices = True
        self.settings.automatic_indicator_warm_up = True
        self._qqq = self.add_equity("QQQ")
        self._hedge = self.add_equity("TLT")
        # Keep ~1300 completed daily session bars (~5 trading years) on the QQQ
        # security; train_model and _do_wfo read this rolling window as history.
        self._qqq.session.size = 1300
        # Model input window length and label horizon, in trading days (~1 month).
        self._lookback = 22
        self._future_horizon = 22
        self._model = VolatilityPredictor(self._lookback, self._future_horizon)
        # While True, _on_daily_bar skips signal generation (no usable model yet).
        self._model_is_training = True
        # Annualized volatility target; re-selected monthly by _do_wfo.
        self._target_vol = 0.12
        # Smooth the WFO target-vol picks (3 samples) and raw predictions (5 bars).
        self._target_vol_sma = SimpleMovingAverage(3)
        self._pred_vol_sma = SimpleMovingAverage(5)
        # Only trade when |actual - target| QQQ weight exceeds this band, to limit churn.
        self._rebalance_weight_tolerance = 0.15
        self._plotter = VolatilityPlotter(self)
        # Register a daily consolidator to drive regime detection on each completed bar.
        daily_consolidator = TradeBarConsolidator(timedelta(1))
        daily_consolidator.data_consolidated += self._on_daily_bar
        self.subscription_manager.add_consolidator(self._qqq, daily_consolidator)
        # Prime the consolidator with recent history so the first live bars
        # already have a lookback window behind them.
        for bar in self.history[TradeBar](self._qqq, self._lookback, Resolution.DAILY):
            daily_consolidator.update(bar)
        # Four years of warm-up to populate indicators and the session window.
        self.set_warm_up(timedelta(365*4))

    def on_warmup_finished(self):
        """Schedule monthly retraining + WFO, and kick off the first cycle."""
        time_rule = self.time_rules.before_market_open(self._qqq, 30)
        date_rule = self.date_rules.month_start(self._qqq)
        # Schedule monthly retraining and walk-forward optimization before the opening trade.
        self.train(date_rule, time_rule, self.train_model)
        self.train(date_rule, time_rule, self._do_wfo)
        # Run the first training cycle as soon as warm-up finishes.
        if self.live_mode:
            self.train_model()
            self._do_wfo()
        else:
            # NOTE(review): in backtests the first run is deferred to today's
            # scheduled slot rather than invoked synchronously — presumably to
            # run inside Train()'s time budget; confirm against LEAN docs.
            self.train(self.date_rules.today, time_rule, self.train_model)
            self.train(self.date_rules.today, time_rule, self._do_wfo)

    def _portfolio_weight(self, security):
        # Current portfolio weight of `security`; the max() guard avoids
        # division by zero before the portfolio has any value.
        return security.holdings.holdings_value / max(self.portfolio.total_portfolio_value, 1e-6)

    def train_model(self):
        """Retrain the volatility model on all completed session bars."""
        self._model_is_training = True
        # session appears to iterate newest-first with the in-progress bar at
        # index 0: drop it, then reverse to chronological order. TODO(review): confirm.
        bars = list(self._qqq.session)[1:][::-1]
        result = self._model.fit(bars)
        if result:
            self._plotter.plot_training(self._model, result)
        self._model_is_training = False

    def _do_wfo(self):
        """Walk-forward optimization: pick the target vol with the best recent Sharpe."""
        # Run the WFO grid search on the 65 most recent completed daily bars.
        bars = list(self._qqq.session)[1:66][::-1]
        vol_grid = [0.08, 0.10, 0.12, 0.14, 0.16]
        returns = pd.Series([bar.close for bar in bars]).pct_change().dropna()
        # Realized annualized vol over the lookback window (first values are NaN).
        rolling_vol = returns.rolling(self._lookback).std() * np.sqrt(252)
        scores = []
        # Evaluate each candidate target volatility over the recent volatility window.
        for vol in vol_grid:
            # Vol-targeting weight capped at fully invested; realized vol floored at 1%.
            position_sizes = (vol / rolling_vol.clip(lower=0.01)).clip(0, 1)
            strategy_returns = position_sizes.iloc[self._lookback:] * returns.iloc[self._lookback:]
            # Annualize the candidate Sharpe ratio.
            scores.append(strategy_returns.mean() / strategy_returns.std() * np.sqrt(252))
        raw_vol = vol_grid[scores.index(max(scores))]
        # Seed the SMA with the initial target vol on the first call.
        if self._target_vol_sma.samples == 0:
            self._target_vol_sma.update(bars[-1].end_time - timedelta(30), self._target_vol)
        self._target_vol_sma.update(bars[-1].end_time, raw_vol)
        self._target_vol = self._target_vol_sma.current.value
        self._plotter.plot_target_vol(raw_vol, self._target_vol)
        
    def _on_daily_bar(self, sender, bar):
        """Consolidator callback: recompute the vol forecast and rebalance if drifted."""
        # Skip signal generation while the model is (re)training.
        if self._model_is_training:
            return
        # Previous lookback-1 completed bars; `bar` itself completes the window.
        prev_bars = list(self._qqq.session)[1:self._lookback][::-1]
        pred_vol = self._model.predict(prev_bars, bar)
        self._pred_vol_sma.update(bar.end_time, pred_vol)
        pred_vol_smooth = self._pred_vol_sma.current.value
        # Model output is daily vol (see VolatilityPredictor); annualize it.
        ann_pred_vol = pred_vol_smooth * np.sqrt(252)
        # Vol-targeting weight, capped at 95% so some hedge exposure always remains.
        target_weight = np.clip(self._target_vol / max(ann_pred_vol, 1e-6), 0.0, 0.95)
        state = SignalPlotState(
            bar.close, pred_vol, pred_vol_smooth,
            target_weight, self._portfolio_weight(self._qqq), self._target_vol
        )
        self._plotter.plot_signal_state(state)
        # Rebalance only when drift from the target exceeds the tolerance band.
        if abs(self._portfolio_weight(self._qqq) - target_weight) >= self._rebalance_weight_tolerance:
            target_weight = np.clip(target_weight, 0, 1)
            targets = [PortfolioTarget(self._qqq, target_weight), PortfolioTarget(self._hedge, 1 - target_weight)]
            self.set_holdings(targets)
import math 
import numpy as np
from scipy.stats import spearmanr
import torch
import torch.nn as nn


# CNN-LSTM hybrid that extracts multi-scale volatility features via parallel convolutions.
class HARNet(nn.Module):
    """Maps a (batch, time, 1) volatility window to one scalar per sample."""

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        # Parallel Conv1d branches with widening receptive fields (3/5/11 bars)
        # pick up short-, medium-, and long-horizon volatility patterns.
        # Same-padding keeps the time dimension unchanged in every branch.
        for suffix, width in (("short", 3), ("medium", 5), ("long", 11)):
            setattr(self, f"conv_{suffix}", nn.Conv1d(1, 32, kernel_size=width, padding=width // 2))
        self.norm = nn.InstanceNorm1d(96, affine=True)
        self.relu = nn.ReLU()
        self.dropout_conv = nn.Dropout(dropout_rate)
        self.lstm = nn.LSTM(96, 64, batch_first=True)
        self.layer_norm = nn.LayerNorm(64)
        self.dropout_fc = nn.Dropout(dropout_rate)
        # Two-layer head collapsing the LSTM state to a single regression output.
        self.fc = nn.Sequential(
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
        )

    def forward(self, x):
        # Conv1d expects (batch, channels, time).
        x = x.transpose(1, 2)
        branches = [self.relu(conv(x)) for conv in (self.conv_short, self.conv_medium, self.conv_long)]
        # Stack the three feature maps channel-wise, normalize, and regularize.
        feats = self.dropout_conv(self.norm(torch.cat(branches, dim=1)))
        # LSTM expects (batch, time, features); keep only the final time step.
        seq_out, _ = self.lstm(feats.transpose(1, 2))
        last = self.dropout_fc(self.layer_norm(seq_out[:, -1, :]))
        return self.fc(last).squeeze(-1)


# Wraps HARNet with data preparation, training, and inference.
class VolatilityPredictor:
    """Trains a HARNet on Parkinson-volatility windows and predicts the mean
    daily Parkinson volatility over the next ``future_horizon`` bars.

    ``fit`` also refreshes validation diagnostics consumed by the plotter:
    ``prediction_error_std`` (std of validation residuals, in log space) and
    ``validation_rank_corr`` (Spearman correlation of predictions vs labels).
    """

    def __init__(self, lookback=25, future_horizon=25):
        # Window length fed to the network / horizon averaged into the label.
        self.lookback = lookback
        self.future_horizon = future_horizon
        # Diagnostics; overwritten by every successful fit().
        self.prediction_error_std = 1e-6
        self.validation_rank_corr = 0.0
        # Use GPU for training and inference if available.
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.net = HARNet().to(self.device)

    def _parkinson_series(self, bars):
        """Return the Parkinson range-based volatility estimate for each bar.

        The high/low ratio is floored just above 1 so log() stays positive
        even on flat or malformed bars.
        """
        scale = 1.0 / (4.0 * math.log(2))
        return np.array([
            math.sqrt(scale * math.log(max(bar.high / bar.low, 1.0 + 1e-9)) ** 2)
            for bar in bars
        ])

    def _normalize_windows(self, windows):
        """Z-score along the time axis: per-window for a 3-D batch, over the
        whole array for a single 2-D window."""
        axis = 1 if windows.ndim == 3 else 0
        mean = windows.mean(axis=axis, keepdims=True)
        std = windows.std(axis=axis, keepdims=True) + 1e-6
        return (windows - mean) / std

    def fit(self, bars):
        """Train from scratch on ``bars``.

        Returns (train_loss, val_loss, min_pred, max_pred) on success, or None
        — leaving any previously trained network untouched — when there is not
        enough history to populate both the training and validation splits.
        """
        park_arr = self._parkinson_series(bars)
        # Build sliding Parkinson windows; label = log of the mean daily
        # Parkinson vol over the future horizon.
        x_data, y_data = [], []
        for i in range(self.lookback, len(park_arr) - self.future_horizon):
            x_data.append(park_arr[i - self.lookback:i].reshape(-1, 1))
            y_data.append(np.log(np.mean(park_arr[i:i + self.future_horizon])))
        # Reserve the newest samples for validation to avoid time leakage.
        split_idx = int(0.7 * len(x_data))
        # Guard: with an empty split, training below would crash. Bail out
        # BEFORE discarding the current weights so the old model keeps serving.
        if split_idx == 0 or split_idx == len(x_data):
            return None
        # Start each retrain from fresh random weights.
        self.net = HARNet().to(self.device)
        x_arr = self._normalize_windows(np.array(x_data))
        y_arr = np.array(y_data)
        x_train = torch.tensor(x_arr[:split_idx], dtype=torch.float32).to(self.device)
        y_train = torch.tensor(y_arr[:split_idx], dtype=torch.float32).to(self.device)
        x_val = torch.tensor(x_arr[split_idx:], dtype=torch.float32).to(self.device)
        y_val = torch.tensor(y_arr[split_idx:], dtype=torch.float32).to(self.device)
        loss_fn = nn.MSELoss()
        optimizer = torch.optim.Adam(self.net.parameters(), lr=1e-3)
        best_val = float("inf")
        best_state = None
        patience_left = 6
        batch_size = 256
        for _ in range(50):
            self.net.train()
            # Shuffle sample order each epoch to reduce gradient bias across mini-batches.
            indices = torch.randperm(len(x_train))
            for start in range(0, len(x_train), batch_size):
                batch_idx = indices[start:start + batch_size]
                optimizer.zero_grad()
                loss = loss_fn(self.net(x_train[batch_idx]), y_train[batch_idx])
                loss.backward()
                # Clip gradients to a unit norm to prevent exploding gradients.
                torch.nn.utils.clip_grad_norm_(self.net.parameters(), max_norm=1.0)
                optimizer.step()
            self.net.eval()
            with torch.no_grad():
                val_loss = loss_fn(self.net(x_val), y_val).item()
            # Save the best checkpoint and reset patience whenever validation loss improves.
            if val_loss < best_val:
                best_val = val_loss
                best_state = {k: v.detach().clone() for k, v in self.net.state_dict().items()}
                patience_left = 6
            else:
                patience_left -= 1
                if patience_left == 0:
                    break
        # Guard: best_state stays None only if every epoch produced a NaN
        # validation loss (NaN < inf is False); keep the last weights then.
        if best_state is not None:
            self.net.load_state_dict(best_state)
        self.net.eval()
        with torch.no_grad():
            train_loss = loss_fn(self.net(x_train), y_train).item()
            val_loss = loss_fn(self.net(x_val), y_val).item()
            preds_np = self.net(x_val).cpu().numpy()
        y_val_np = y_val.cpu().numpy()
        residuals = preds_np - y_val_np
        self.prediction_error_std = np.std(residuals) + 1e-6
        self.validation_rank_corr = spearmanr(y_val_np, preds_np).statistic
        return train_loss, val_loss, np.min(preds_np), np.max(preds_np)

    def predict(self, prev_bars, current_bar):
        """Return the volatility forecast for the window ending at ``current_bar``.

        ``prev_bars`` may be any iterable of bars; ``current_bar`` completes
        the input window.
        """
        park_vals = self._parkinson_series(list(prev_bars) + [current_bar])
        window = self._normalize_windows(park_vals.reshape(-1, 1))
        x = torch.tensor(window, dtype=torch.float32).unsqueeze(0).to(self.device)
        self.net.eval()
        with torch.no_grad():
            # The network predicts log-vol; exponentiate back to vol units.
            return np.exp(self.net(x).item())
from AlgorithmImports import *


class SignalPlotState:
    """Immutable-by-convention snapshot of one bar's signal values for charting."""

    def __init__(self, bar_close, pred_vol, pred_vol_smooth, target_weight, actual_weight, target_vol):
        # Price and model outputs for the completed bar.
        self.bar_close, self.pred_vol, self.pred_vol_smooth = bar_close, pred_vol, pred_vol_smooth
        # Desired vs. current QQQ allocation.
        self.target_weight, self.actual_weight = target_weight, actual_weight
        # Annualized volatility target in force when the signal fired.
        self.target_vol = target_vol


class VolatilityPlotter:
    """Pushes model, target-vol, and allocation series onto the algorithm's charts."""

    def __init__(self, algorithm):
        # The algorithm supplies the plot() sink for every chart point.
        self._algorithm = algorithm

    def _plot_series(self, chart_name, series):
        # Emit each (series name, value) pair onto the named chart.
        emit = self._algorithm.plot
        for series_name, value in series:
            emit(chart_name, series_name, value)

    def plot_training(self, model, report):
        # Loss points exist only when the trainer produced a report.
        if report:
            self._plot_series(
                "Model Training",
                [("Train Loss", report[0]), ("Val Loss", report[1])],
            )
        self._plot_series(
            "Validation Diagnostics",
            [("Rank Corr", model.validation_rank_corr), ("Pred Error Std", model.prediction_error_std)],
        )

    def plot_target_vol(self, raw_vol, smooth_vol):
        # Plot the walk-forward target volatility selected by the WFO grid search.
        self._plot_series("Target Vol", [("Raw", raw_vol), ("Smooth", smooth_vol)])

    def plot_signal_state(self, state):
        # Price, volatility forecast, and allocation views for one bar.
        self._plot_series("QQQ", [("Price", state.bar_close)])
        self._plot_series(
            "Volatility Signal",
            [("Pred Vol", state.pred_vol), ("Pred Vol Smooth", state.pred_vol_smooth)],
        )
        self._plot_series(
            "Allocation",
            [
                ("QQQTargetWeight", state.target_weight),
                ("QQQActualWeight", state.actual_weight),
                ("TargetVol", state.target_vol),
            ],
        )