Overall Statistics
Total Orders
547
Average Win
0.29%
Average Loss
-0.21%
Compounding Annual Return
17.792%
Drawdown
10.600%
Expectancy
0.723
Start Equity
1000000
End Equity
1826447.97
Net Profit
82.645%
Sharpe Ratio
0.779
Sortino Ratio
0.871
Probabilistic Sharpe Ratio
72.746%
Loss Rate
27%
Win Rate
73%
Profit-Loss Ratio
1.36
Alpha
0.02
Beta
0.595
Annual Standard Deviation
0.093
Annual Variance
0.009
Information Ratio
-0.226
Tracking Error
0.072
Treynor Ratio
0.121
Total Fees
$3326.04
Estimated Strategy Capacity
$0
Lowest Capacity Asset
AGG SSC0EI5J2F6T
Portfolio Turnover
2.29%
Drawdown Recovery
130
#region imports
from AlgorithmImports import *
#endregion

import numpy as np


class ScheduledCAPM(QCAlgorithm):

    def Initialize(self):
        """Configure dates, cash, universe, parameters, state and schedules.

        Subscribes the ETF universe plus SPY and IVV at daily resolution,
        reads the tunable parameters (each clamped to a safe range), seeds the
        bookkeeping used by ``Rebalance`` and ``RiskCheck``, and registers the
        two Monday scheduled events that drive the strategy.
        """
        self.SetStartDate(2022, 9, 1)
        self.SetEndDate(2026, 5, 5)

        self._cash = 1000000
        self.SetCash(self._cash)

        # ------------------------------------------------------------
        # Universe
        # ------------------------------------------------------------
        self._tickers = [
            "AGG", "IWM", "IAU", "COMT", "USMV", "DGRO",
            "QUAL", "DVY", "MTUM", "VLUE", "EFAV", "EEMV",
            "IDV", "IQLT", "IYW", "IGF", "IYH"
        ]

        self.symbols = [
            self.AddEquity(ticker, Resolution.Daily).Symbol
            for ticker in self._tickers
        ]

        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        # IVV is subscribed but not referenced by any other method in this class.
        self.ivv = self.AddEquity("IVV", Resolution.Daily).Symbol

        # Regression benchmark
        self._bench = self.spy
        self.SetBenchmark(self.spy)

        # ------------------------------------------------------------
        # Parameters (read, then clamped to a safe range)
        # ------------------------------------------------------------
        # Number of daily bars used by the regression; never fewer than 30.
        self.lookback = max(30, self.GetIntParameter("lookback", 60))
        self.selection_count_alpha = max(
            0, self.GetIntParameter("selection_count_alpha", 5)
        )
        self.selection_count_beta = max(
            0, self.GetIntParameter("selection_count_beta", 5)
        )

        # Total weight assigned to selected alpha/beta ETF basket.
        # The remainder goes to SPY.
        self.satellite_weight = max(
            0.0, min(1.0, self.GetFloatParameter("satellite_weight", 0.50))
        )

        # Run the model every third Monday by default.
        self.rebalance_every_n_mondays = max(
            1, self.GetIntParameter("rebalance_every_n_mondays", 3)
        )

        # Stop loss from latest rebalance value.
        self.stop_loss = max(0.0, self.GetFloatParameter("stop_loss", 0.05))

        # ------------------------------------------------------------
        # State
        # ------------------------------------------------------------
        self.counter = 0
        self.stop_triggered = False
        self.risk_reference_value = self._cash

        # Last requested weight per tradable symbol; used for turnover.
        self.last_targets = dict.fromkeys(self.symbols + [self.spy], 0.0)

        history = self.History(self.spy, 10, Resolution.Daily)

        # Normalise the plotted SPY benchmark to the most recent close before
        # the start, so it begins at the same level as the starting cash.
        # Ten bars are requested only as a cushion against missing data; the
        # oldest of them (iloc[0]) would skew the whole benchmark curve.
        if not history.empty:
            self._initialValue = history["close"].iloc[-1]
        else:
            self._initialValue = None

        self.SetWarmUp(self.lookback + 5, Resolution.Daily)

        # ------------------------------------------------------------
        # Scheduled events only. No OnData loop.
        # ------------------------------------------------------------
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.AfterMarketOpen(self.spy, 10),
            self.Rebalance
        )

        # The risk check is registered 20 minutes after the rebalance slot.
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.AfterMarketOpen(self.spy, 30),
            self.RiskCheck
        )

    def Rebalance(self):
        """Run the alpha/beta selection and rebalance on every Nth scheduled call.

        Each call increments ``self.counter``; the model only runs when the
        algorithm is not warming up and the counter is a multiple of
        ``rebalance_every_n_mondays``. A previously triggered stop-loss is
        cleared here, so trading resumes only at a scheduled rebalance.
        The method logs and returns early when history is unavailable or
        cannot be reshaped.
        """
        # Counted before the warm-up check, exactly as the cadence was defined.
        self.counter += 1

        if self.IsWarmingUp:
            return

        if self.counter % self.rebalance_every_n_mondays != 0:
            return

        today = str(self.Time.date())

        # Restart only at scheduled rebalance after stop loss.
        if self.stop_triggered:
            self.stop_triggered = False
            self.risk_reference_value = self.Portfolio.TotalPortfolioValue
            self.Debug("Restarting after stop-loss on " + today)

        history = self.History(
            self.symbols + [self._bench],
            self.lookback + 1,
            Resolution.Daily
        )

        if history.empty:
            self.Debug("History empty on " + today)
            return

        # Pivot to one close-price column per symbol.
        try:
            close = history["close"].unstack(level=0)
        except Exception as err:
            # Stay best-effort, but report the cause and do not swallow
            # SystemExit / KeyboardInterrupt as a bare except would.
            self.Debug(f"Could not unstack history on {today}: {err!r}")
            return

        if self._bench not in close.columns:
            self.Debug("Benchmark missing from history on " + today)
            return

        alpha_table = self.CalculateAlphaTable(close)
        beta_table = self.CalculateBetaTable(close)

        selected_alpha = [
            symbol for symbol, _ in alpha_table[:self.selection_count_alpha]
        ]
        selected_beta = [
            symbol for symbol, _ in beta_table[:self.selection_count_beta]
        ]

        # Union of both lists, first occurrence wins, order preserved.
        selected_symbols = list(dict.fromkeys(selected_alpha + selected_beta))

        target_weights = self.BuildTargetWeights(selected_symbols)
        turnover = self.ApplyTargets(target_weights)

        # The stop-loss is measured from the portfolio value at this rebalance.
        self.risk_reference_value = self.Portfolio.TotalPortfolioValue

        self.Debug(
            f"Rebalance {today}"
            f" | Alpha: {[x.Value for x in selected_alpha]}"
            f" | Low Beta: {[x.Value for x in selected_beta]}"
            f" | Final: {[x.Value for x in selected_symbols]}"
            f" | Turnover={round(turnover, 4)}"
        )

        self.Plot(
            "Relative Performance",
            "Total Portfolio Value",
            self.Portfolio.TotalPortfolioValue
        )

        spy_security = self.Securities[self.spy]
        if self._initialValue is not None and spy_security.Price > 0:
            # Value of the starting cash had it tracked SPY.
            benchmark_value = self._cash * spy_security.Close / self._initialValue
            self.Plot("Relative Performance", "SPY Benchmark", benchmark_value)

        self.Plot("CAPM Diagnostics", "Selected Count", len(selected_symbols))
        self.Plot("CAPM Diagnostics", "Turnover", turnover)

        if alpha_table:
            self.Plot("CAPM Diagnostics", "Top Alpha", alpha_table[0][1])

        if beta_table:
            self.Plot("CAPM Diagnostics", "Lowest Beta", beta_table[0][1])

    def RiskCheck(self):

        if self.IsWarmingUp:
            return

        if self.stop_triggered:
            return

        if self.risk_reference_value <= 0:
            return

        drawdown_from_rebalance = (
            self.Portfolio.TotalPortfolioValue
            / self.risk_reference_value
            - 1.0
        )

        if drawdown_from_rebalance <= -self.stop_loss:

            self.Liquidate()
            self.stop_triggered = True

            for symbol in self.last_targets:
                self.last_targets[symbol] = 0.0

            self.Debug(
                "Stop-loss triggered on "
                + str(self.Time.date())
                + " drawdown="
                + str(round(drawdown_from_rebalance, 4))
            )

    def BuildTargetWeights(self, selected_symbols):

        target_weights = {}

        for symbol in self.symbols:
            target_weights[symbol] = 0.0

        target_weights[self.spy] = 0.0

        if len(selected_symbols) == 0:
            target_weights[self.spy] = 1.0
            return target_weights

        selected_weight = self.satellite_weight / len(selected_symbols)

        for symbol in selected_symbols:
            target_weights[symbol] = selected_weight

        # This prevents the portfolio from going above 100%.
        target_weights[self.spy] = 1.0 - self.satellite_weight

        return target_weights

    def ApplyTargets(self, target_weights):

        turnover = 0.0

        for symbol, target_weight in target_weights.items():
            previous_weight = self.last_targets.get(symbol, 0.0)
            turnover += abs(target_weight - previous_weight)

        turnover = turnover / 2.0

        for symbol, target_weight in target_weights.items():
            self.SetHoldings(symbol, target_weight)
            self.last_targets[symbol] = target_weight

        return turnover

    def CalculateAlphaTable(self, close):

        alphas = {}

        bench_returns = close[self._bench].pct_change().dropna()

        for symbol in self.symbols:

            if symbol not in close.columns:
                continue

            returns = close[symbol].pct_change().dropna()

            asset_returns, aligned_bench_returns = returns.align(
                bench_returns,
                join="inner"
            )

            if len(asset_returns) < 10:
                continue

            x = np.vstack([
                aligned_bench_returns.values,
                np.ones(len(aligned_bench_returns.values))
            ]).T

            y = asset_returns.values

            try:
                result = np.linalg.lstsq(x, y, rcond=None)
                alphas[symbol] = result[0][1]
            except:
                continue

        return sorted(
            alphas.items(),
            key=lambda x: x[1],
            reverse=True
        )

    def CalculateBetaTable(self, close):

        betas = {}

        bench_returns = close[self._bench].pct_change().dropna()

        for symbol in self.symbols:

            if symbol not in close.columns:
                continue

            returns = close[symbol].pct_change().dropna()

            asset_returns, aligned_bench_returns = returns.align(
                bench_returns,
                join="inner"
            )

            if len(asset_returns) < 10:
                continue

            x = np.vstack([
                aligned_bench_returns.values,
                np.ones(len(aligned_bench_returns.values))
            ]).T

            y = asset_returns.values

            try:
                result = np.linalg.lstsq(x, y, rcond=None)
                betas[symbol] = result[0][0]
            except:
                continue

        return sorted(
            betas.items(),
            key=lambda x: x[1],
            reverse=False
        )

    def GetIntParameter(self, name, default_value):

        value = self.GetParameter(name)

        if value is None or value == "":
            return default_value

        return int(value)

    def GetFloatParameter(self, name, default_value):

        value = self.GetParameter(name)

        if value is None or value == "":
            return default_value

        return float(value)