Overall Statistics
Total Orders
474
Average Win
0.52%
Average Loss
-0.43%
Compounding Annual Return
8.377%
Drawdown
24.000%
Expectancy
0.254
Start Equity
10000
End Equity
14183.26
Net Profit
41.833%
Sharpe Ratio
0.154
Sortino Ratio
0.184
Probabilistic Sharpe Ratio
8.252%
Loss Rate
43%
Win Rate
57%
Profit-Loss Ratio
1.21
Alpha
-0.022
Beta
0.995
Annual Standard Deviation
0.149
Annual Variance
0.022
Information Ratio
-0.598
Tracking Error
0.037
Treynor Ratio
0.023
Total Fees
$474.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
IBB S23QOUCNOW9X
Portfolio Turnover
2.53%
Drawdown Recovery
721
#region imports
from AlgorithmImports import *
#endregion

import numpy as np


"""
SCHEDULED CAPM ALPHA/BETA ROTATION STRATEGY

This strategy ranks a universe of U.S. equity ETFs using a simple CAPM-style
regression versus IVV.

The model estimates each ETF's relationship to IVV over a rolling lookback window:

    ETF return = alpha + beta * IVV return + error

The regression produces:

1. Alpha:
   The intercept of the regression. It represents the ETF's return component not
   explained by IVV over the lookback window.

2. Beta:
   The slope of the regression. It measures the ETF's sensitivity to IVV.

The strategy selects two groups:

- Highest-alpha ETFs
- Highest-beta ETFs

The selected ETFs are combined into one basket. Duplicates are removed. The
selected basket receives a configurable allocation, and the remaining allocation
goes to SPY as a market core.

The strategy rebalances every few months, controlled by the parameter
rebalance_frequency_months.

Risk management:

The model tracks portfolio value from the latest rebalance. If the portfolio
falls by the stop-loss threshold or more relative to that value, it liquidates
and waits until the next scheduled rebalance to restart.

Benchmark:

IVV buy-and-hold is plotted as the benchmark.
"""


class ScheduledCAPM(QCAlgorithm):
    """Scheduled ETF rotation ranked by CAPM alpha and beta versus IVV.

    On a month-start schedule the algorithm regresses each universe ETF's
    daily returns on IVV's daily returns over ``lookback`` bars, picks the
    ``num_alpha`` highest-alpha and ``num_beta`` highest-beta ETFs, gives the
    de-duplicated basket ``selected_basket_weight`` of the portfolio (equally
    split) and puts the remainder in SPY. If nothing can be ranked, SPY
    receives 100%.

    A portfolio-level stop-loss, measured from the portfolio value recorded at
    the latest rebalance, liquidates everything and keeps the algorithm flat
    until the next scheduled rebalance.
    """

    # Smallest lookback window (in daily bars) accepted from the parameters.
    MIN_LOOKBACK = 30

    # Minimum number of aligned daily returns needed to regress one ETF.
    MIN_REGRESSION_OBSERVATIONS = 10

    def Initialize(self):
        """Set dates and cash, read parameters, add securities and schedule rebalancing."""

        # ------------------------------------------------------------
        # 1. BACKTEST SETTINGS
        # ------------------------------------------------------------
        self.SetStartDate(2022, 1, 1)
        self.SetEndDate(2026, 5, 5)

        self.initial_cash = 10000
        self.SetCash(self.initial_cash)

        # ------------------------------------------------------------
        # 2. PARAMETERS (read, then clamped to sane ranges)
        # ------------------------------------------------------------
        self.lookback = max(
            self.MIN_LOOKBACK,
            self.GetIntParameter("lookback", 60)
        )
        self.rebalance_frequency_months = max(
            1,
            self.GetIntParameter("rebalance_frequency_months", 3)
        )

        self.num_alpha = max(0, self.GetIntParameter("number_of_stocks", 3))
        self.num_beta = max(0, self.GetIntParameter("number_of_stocks_beta", 3))

        self.selected_basket_weight = max(
            0.0,
            min(1.0, self.GetFloatParameter("selected_basket_weight", 0.80))
        )
        self.stop_loss = self.GetFloatParameter("stop_loss", 0.15)

        # ------------------------------------------------------------
        # 3. ETF UNIVERSE
        # ------------------------------------------------------------
        self.tickers = [
            "IJR", "IWM", "IWF", "IJH", "IWD", "ITOT", "IVW",
            "IWR", "IWB", "IVE", "IWN", "IWP", "IWS", "IWO",
            "IWV", "IUSG", "IBB", "IUSV", "IHI", "IJS", "IJJ",
            "IJK", "IYW", "OEF"
        ]

        self.symbols = [
            self.AddEquity(ticker, Resolution.Daily).Symbol
            for ticker in self.tickers
        ]

        # SPY is the market core that receives the non-basket allocation.
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol

        # IMPORTANT:
        # Do not use self.benchmark. That conflicts with QCAlgorithm internals.
        self._benchmark = self.AddEquity("IVV", Resolution.Daily).Symbol

        self.SetBenchmark(self._benchmark)

        # ------------------------------------------------------------
        # 4. BENCHMARK PLOTTING STATE
        # ------------------------------------------------------------
        # First positive IVV price seen in OnData; anchors the buy-and-hold line.
        self.initial_benchmark_price = None

        # ------------------------------------------------------------
        # 5. REBALANCE STATE
        # ------------------------------------------------------------
        # Number of post-warm-up month-start events seen so far.
        self.rebalance_counter = 0
        self.last_selected_symbols = []

        # Target weights from the previous rebalance, used for turnover.
        self.last_targets = {
            symbol: 0.0 for symbol in self.symbols + [self.spy]
        }

        # ------------------------------------------------------------
        # 6. RISK STATE
        # ------------------------------------------------------------
        self.risk_reference_value = self.initial_cash
        self.stop_triggered = False

        # ------------------------------------------------------------
        # 7. WARMUP
        # ------------------------------------------------------------
        self.SetWarmUp(self.lookback + 5, Resolution.Daily)

        # ------------------------------------------------------------
        # 8. SCHEDULED REBALANCE
        # ------------------------------------------------------------
        self.Schedule.On(
            self.DateRules.MonthStart(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 10),
            self.Rebalance
        )

    def Rebalance(self):
        """Month-start handler: re-rank the universe and reset target weights.

        Trades only on every ``rebalance_frequency_months``-th month-start
        event after warm-up, beginning with the first one. Also clears a
        pending stop-loss flag and resets the stop-loss reference value.
        """

        if self.IsWarmingUp:
            return

        # Test the counter BEFORE incrementing it so that event 0 trades.
        # Incrementing first would leave the portfolio in cash for the first
        # (rebalance_frequency_months - 1) months of the run.
        is_rebalance_month = (
            self.rebalance_counter % self.rebalance_frequency_months == 0
        )
        self.rebalance_counter += 1

        if not is_rebalance_month:
            return

        if self.stop_triggered:
            self.stop_triggered = False
            self.risk_reference_value = self.Portfolio.TotalPortfolioValue

            self.Debug(
                f"Restarting after stop-loss at rebalance on {self.Time.date()}"
            )

        # ------------------------------------------------------------
        # 1. LOAD HISTORY
        # ------------------------------------------------------------
        close = self._LoadClosePrices()

        if close is None:
            return

        # ------------------------------------------------------------
        # 2. SELECT SYMBOLS
        # ------------------------------------------------------------
        # One regression pass feeds both rankings.
        estimates = self._EstimateCapm(close)

        alpha_table, selected_alpha = self.SelectSymbolsByAlpha(close, estimates)
        beta_table, selected_beta = self.SelectSymbolsByBeta(close, estimates)

        # Merge the two picks, dropping duplicates but keeping first-seen order.
        selected_symbols = []

        for symbol in selected_alpha + selected_beta:
            if symbol not in selected_symbols:
                selected_symbols.append(symbol)

        self.last_selected_symbols = selected_symbols

        # ------------------------------------------------------------
        # 3. BUILD TARGET WEIGHTS
        # ------------------------------------------------------------
        target_weights = {symbol: 0.0 for symbol in self.symbols}

        if selected_symbols:
            per_symbol_weight = self.selected_basket_weight / len(selected_symbols)

            for symbol in selected_symbols:
                target_weights[symbol] = per_symbol_weight

            target_weights[self.spy] = 1.0 - self.selected_basket_weight
        else:
            # Nothing could be ranked: hold the market core only.
            target_weights[self.spy] = 1.0

        # ------------------------------------------------------------
        # 4. APPLY TARGETS
        # ------------------------------------------------------------
        # Submit all targets in one call so LEAN can order them by margin
        # impact (reductions before increases). Per-symbol calls in dict order
        # can try to buy before the offsetting sells have freed buying power.
        self.SetHoldings(
            [
                PortfolioTarget(symbol, weight)
                for symbol, weight in target_weights.items()
            ]
        )

        self.risk_reference_value = self.Portfolio.TotalPortfolioValue

        # ------------------------------------------------------------
        # 5. TURNOVER
        # ------------------------------------------------------------
        # Half the sum of absolute target-weight changes since last rebalance.
        turnover = 0.0

        for symbol, weight in target_weights.items():
            turnover += abs(weight - self.last_targets.get(symbol, 0.0))
            self.last_targets[symbol] = weight

        turnover /= 2.0

        # ------------------------------------------------------------
        # 6. LOGGING AND PLOTS
        # ------------------------------------------------------------
        self.Debug(
            f"Rebalance {self.Time.date()}"
            f" | Selected Alpha: {[x.Value for x in selected_alpha]}"
            f" | Selected Beta: {[x.Value for x in selected_beta]}"
            f" | Final Basket: {[x.Value for x in selected_symbols]}"
            f" | Turnover={round(turnover, 4)}"
        )

        self.Plot("CAPM Diagnostics", "Selected Count", len(selected_symbols))
        self.Plot("CAPM Diagnostics", "Turnover", turnover)

        if alpha_table:
            self.Plot("CAPM Diagnostics", "Top Alpha", alpha_table[0][1])

        if beta_table:
            self.Plot("CAPM Diagnostics", "Top Beta", beta_table[0][1])

    def OnData(self, data):
        """Run the stop-loss check and update the equity/benchmark plots.

        Args:
            data: The data slice delivered by the engine for this time step.
        """

        if self.IsWarmingUp:
            return

        # ------------------------------------------------------------
        # 1. STOP LOSS CHECK
        # ------------------------------------------------------------
        if not self.stop_triggered and self.risk_reference_value > 0:

            drawdown_from_risk_reference = (
                self.Portfolio.TotalPortfolioValue
                / self.risk_reference_value
                - 1.0
            )

            if drawdown_from_risk_reference <= -self.stop_loss:

                self.Liquidate()
                self.stop_triggered = True

                # The book is flat now; zero the remembered targets so the
                # turnover reported at the restart rebalance is measured
                # from cash rather than from the pre-stop weights.
                for symbol in self.last_targets:
                    self.last_targets[symbol] = 0.0

                self.Debug(
                    f"Stop-loss triggered on {self.Time.date()}"
                    f" drawdown={round(drawdown_from_risk_reference, 4)}"
                )

        # ------------------------------------------------------------
        # 2. BENCHMARK PLOT
        # ------------------------------------------------------------
        if self._benchmark in data and data[self._benchmark] is not None:

            benchmark_price = self.Securities[self._benchmark].Price

            if benchmark_price > 0:

                if self.initial_benchmark_price is None:
                    self.initial_benchmark_price = benchmark_price

                # Value of initial_cash held in IVV since the first plotted bar.
                benchmark_value = (
                    self.initial_cash
                    * benchmark_price
                    / self.initial_benchmark_price
                )

                self.Plot("Strategy Equity", "IVV Benchmark", benchmark_value)

        self.Plot(
            "Strategy Equity",
            "Portfolio Value",
            self.Portfolio.TotalPortfolioValue
        )

        self.Plot(
            "Risk Diagnostics",
            "Stop Triggered",
            1 if self.stop_triggered else 0
        )

    def SelectSymbolsByAlpha(self, close, estimates=None):
        """Rank universe ETFs by regression intercept (alpha), highest first.

        Args:
            close: Close-price frame with one column per symbol, including
                the benchmark column.
            estimates: Optional precomputed ``{symbol: (alpha, beta)}`` mapping
                as produced by ``_EstimateCapm``; computed from ``close`` when
                omitted.

        Returns:
            A tuple ``(table, selected)`` where ``table`` is the full list of
            ``(symbol, alpha)`` pairs sorted descending and ``selected`` is
            the first ``num_alpha`` symbols of that list.
        """

        if estimates is None:
            estimates = self._EstimateCapm(close)

        ranked = sorted(
            ((symbol, alpha) for symbol, (alpha, _) in estimates.items()),
            key=lambda pair: pair[1],
            reverse=True
        )

        return ranked, [symbol for symbol, _ in ranked[:self.num_alpha]]

    def SelectSymbolsByBeta(self, close, estimates=None):
        """Rank universe ETFs by regression slope (beta), highest first.

        Args:
            close: Close-price frame with one column per symbol, including
                the benchmark column.
            estimates: Optional precomputed ``{symbol: (alpha, beta)}`` mapping
                as produced by ``_EstimateCapm``; computed from ``close`` when
                omitted.

        Returns:
            A tuple ``(table, selected)`` where ``table`` is the full list of
            ``(symbol, beta)`` pairs sorted descending and ``selected`` is
            the first ``num_beta`` symbols of that list.
        """

        if estimates is None:
            estimates = self._EstimateCapm(close)

        ranked = sorted(
            ((symbol, beta) for symbol, (_, beta) in estimates.items()),
            key=lambda pair: pair[1],
            reverse=True
        )

        return ranked, [symbol for symbol, _ in ranked[:self.num_beta]]

    def GetIntParameter(self, name, default_value):
        """Return project parameter ``name`` as an int, or ``default_value`` if unset/empty."""

        return self._ReadParameter(name, default_value, int)

    def GetFloatParameter(self, name, default_value):
        """Return project parameter ``name`` as a float, or ``default_value`` if unset/empty."""

        return self._ReadParameter(name, default_value, float)

    def _ReadParameter(self, name, default_value, convert):
        """Fetch parameter ``name`` and convert it, falling back to the default when missing.

        A value that is present but not convertible still raises from
        ``convert`` so that a misconfigured parameter is not silently ignored.
        """

        value = self.GetParameter(name)

        if value is None or value == "":
            return default_value

        return convert(value)

    def _LoadClosePrices(self):
        """Request ``lookback + 1`` daily bars and pivot closes to one column per symbol.

        Returns:
            The close-price frame, or ``None`` (after logging the reason) when
            history is empty, cannot be pivoted, or lacks the benchmark.
        """

        history = self.History(
            self.symbols + [self._benchmark],
            self.lookback + 1,
            Resolution.Daily
        )

        if history.empty:
            self.Debug(f"History is empty on {self.Time.date()}")
            return None

        try:
            close = history["close"].unstack(level=0)
        except Exception as error:
            # Best effort: skip this rebalance, but record why instead of
            # swallowing the failure silently.
            self.Debug(
                f"Could not unstack history on {self.Time.date()}: {error}"
            )
            return None

        if self._benchmark not in close.columns:
            self.Debug(f"Benchmark missing from history on {self.Time.date()}")
            return None

        return close

    def _EstimateCapm(self, close):
        """Fit ``ETF return = alpha + beta * benchmark return`` for each universe ETF.

        Args:
            close: Close-price frame with one column per symbol; must contain
                the benchmark column.

        Returns:
            Dict mapping each regressed symbol to ``(alpha, beta)``, in
            universe order. Symbols with no column, with fewer than
            ``MIN_REGRESSION_OBSERVATIONS`` aligned returns, or with a failed
            or non-finite fit are omitted.
        """

        benchmark_returns = close[self._benchmark].pct_change().dropna()
        estimates = {}

        for symbol in self.symbols:

            if symbol not in close.columns:
                continue

            returns = close[symbol].pct_change().dropna()

            # Keep only the dates on which both series have a return.
            asset_returns, bench_returns = returns.align(
                benchmark_returns,
                join="inner"
            )

            if len(asset_returns) < self.MIN_REGRESSION_OBSERVATIONS:
                continue

            # Design matrix columns: [benchmark return, 1] -> [beta, alpha].
            design = np.column_stack(
                [
                    bench_returns.values,
                    np.ones(len(bench_returns))
                ]
            )

            try:
                coefficients = np.linalg.lstsq(
                    design,
                    asset_returns.values,
                    rcond=None
                )[0]
            except (np.linalg.LinAlgError, ValueError):
                continue

            beta = float(coefficients[0])
            alpha = float(coefficients[1])

            # NaN/inf estimates (e.g. from corrupt prices) would make the
            # descending sort order arbitrary, so they are dropped here.
            if not (np.isfinite(alpha) and np.isfinite(beta)):
                continue

            estimates[symbol] = (alpha, beta)

        return estimates