Overall Statistics
Total Orders
1256
Average Win
0.46%
Average Loss
-0.43%
Compounding Annual Return
0.049%
Drawdown
4.200%
Expectancy
0.007
Start Equity
100000
End Equity
100314.46
Net Profit
0.314%
Sharpe Ratio
-1.597
Sortino Ratio
-1.417
Probabilistic Sharpe Ratio
0.158%
Loss Rate
51%
Win Rate
49%
Profit-Loss Ratio
1.07
Alpha
0
Beta
0
Annual Standard Deviation
0.02
Annual Variance
0
Information Ratio
0.027
Tracking Error
0.02
Treynor Ratio
0
Total Fees
$2768.55
Estimated Strategy Capacity
$0
Lowest Capacity Asset
DVY STHD6FIMA3XH
Portfolio Turnover
19.00%
Drawdown Recovery
738
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion

class Pairs(object):

    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'

        self.Model = None
        self.MeanError = 0
        self.StandardDeviation = 0
        self.Epsilon = 0

    @property
    def DataFrame(self):
        df = pd.concat([self.a.DataFrame.droplevel([0]), self.b.DataFrame.droplevel([0])], axis=1).dropna()
        df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
        return df
    
    @property
    def Correlation(self):
        return self.DataFrame.corr().iloc[0][1]

    def cointegration_test(self):
        coint_test = coint(self.a.Series.values.flatten(), self.b.Series.values.flatten(), trend="n", maxlag=0)

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        self.Model = sm.ols(formula = f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}', data=self.DataFrame).fit()
        self.StationaryP = adfuller(self.Model.resid, autolag = 'BIC')[1]
        self.MeanError = np.mean(self.Model.resid)
        self.Epsilon = np.std(self.Model.resid)
        
        return True
#region imports
from AlgorithmImports import *
#endregion

class SymbolData(object):

    def __init__(self, algorithm, symbol, lookback, interval):
        lookback = int(lookback)
        self.Symbol = symbol
        self.Prices = RollingWindow[TradeBar](lookback // interval)
        self.Series = None
        self.DataFrame = None

        self._algorithm = algorithm
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.DataConsolidated += self.OnDataConsolidated

        history = algorithm.History(symbol, lookback, Resolution.Minute)
        for bar in history.itertuples():
            trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
            self.Update(trade_bar)

    @property
    def IsReady(self):
        return self.Prices.IsReady
    
    def Update(self, trade_bar):
        self._consolidator.Update(trade_bar)
    
    def OnDataConsolidated(self, sender, consolidated):
        self.Prices.Add(consolidated)
        if self.IsReady:
            self.Series = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)['close']
            self.DataFrame = self.Series.to_frame()
#region imports
from AlgorithmImports import *
#endregion

class TradingPair(object):

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        self.ticket_a = ticket_a
        self.ticket_b = ticket_b

        self.model_intercept = intercept
        self.model_slope = slope

        self.mean_error = mean_error
        self.epsilon = epsilon
#region imports
from AlgorithmImports import *
#endregion

import numpy as np
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts


"""
SCHEDULED FACTOR ETF PAIRS TRADING STRATEGY

This strategy applies a pairs-trading framework to a small universe of factor ETFs:

    IVV   = broad U.S. equity market
    USMV  = minimum volatility
    QUAL  = quality
    DGRO  = dividend growth
    DVY   = dividend yield
    VLUE  = value
    MTUM  = momentum

The model searches for ETF pairs that are both highly correlated and cointegrated.
The purpose is to identify pairs whose relative price relationship appears
statistically stable enough for a mean-reversion trade.

Once per month, the algorithm:

1. Downloads daily price history.
2. Calculates return correlations for every ETF pair.
3. Keeps pairs above the correlation threshold.
4. Tests those pairs for cointegration.
5. Selects the strongest cointegrated pairs without reusing the same ETF in
   multiple active pair candidates.

For each selected pair, the model estimates:

    log(price A) = intercept + beta * log(price B) + residual

The residual is the spread. The strategy then trades the spread z-score:

    z = (current spread - historical spread mean) / historical spread std

Trading logic:

- If z-score is high:
      ETF A is expensive relative to ETF B.
      Short ETF A and long ETF B.

- If z-score is low:
      ETF A is cheap relative to ETF B.
      Long ETF A and short ETF B.

- If z-score mean-reverts toward zero:
      Close the pair.

Risk controls:

- Maximum number of selected pairs
- Maximum gross exposure
- Per-pair stop loss using z-score
- No symbol is reused across selected pairs
- Cash benchmark, because this is a long/short relative-value strategy
"""


class FactorPairsTrading(QCAlgorithm):

    def Initialize(self):

        # ------------------------------------------------------------
        # 1. BACKTEST SETTINGS
        # ------------------------------------------------------------
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2026, 5, 5)

        self.initial_cash = 100000
        self.SetCash(self.initial_cash)

        # ------------------------------------------------------------
        # 2. FACTOR ETF UNIVERSE
        # ------------------------------------------------------------
        tickers = [
            "IVV",
            "USMV",
            "QUAL",
            "DGRO",
            "DVY",
            "VLUE",
            "MTUM"
        ]

        self.symbols = []

        for ticker in tickers:
            symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
            self.symbols.append(symbol)

        # ------------------------------------------------------------
        # 3. PARAMETERS
        # ------------------------------------------------------------
        self.lookback = self.GetIntParameter("lookback", 126)
        self.correlation_threshold = self.GetFloatParameter("correlation_threshold", 0.70)
        self.cointegration_pvalue = self.GetFloatParameter("cointegration_pvalue", 0.10)

        self.max_pairs = self.GetIntParameter("max_pairs", 2)

        self.entry_z = self.GetFloatParameter("entry_z", 1.75)
        self.exit_z = self.GetFloatParameter("exit_z", 0.50)
        self.stop_z = self.GetFloatParameter("stop_z", 3.00)

        self.target_gross_exposure = self.GetFloatParameter("target_gross_exposure", 1.00)
        self.minimum_trade_change = self.GetFloatParameter("minimum_trade_change", 0.02)

        # Safety checks
        self.lookback = max(60, self.lookback)
        self.max_pairs = max(1, self.max_pairs)
        self.target_gross_exposure = max(0.0, min(2.0, self.target_gross_exposure))

        # ------------------------------------------------------------
        # 4. STATE
        # ------------------------------------------------------------
        self.selected_pairs = []
        self.active_pairs = {}
        self.rebalance_count = 0

        # Cash benchmark is appropriate for a long/short relative-value model.
        self.SetBenchmark(lambda time: self.initial_cash)

        self.SetWarmUp(self.lookback + 5, Resolution.Daily)

        # ------------------------------------------------------------
        # 5. SCHEDULES
        # ------------------------------------------------------------
        self.Schedule.On(
            self.DateRules.MonthStart(self.symbols[0]),
            self.TimeRules.AfterMarketOpen(self.symbols[0], 30),
            self.SelectPairs
        )

        self.Schedule.On(
            self.DateRules.EveryDay(self.symbols[0]),
            self.TimeRules.AfterMarketOpen(self.symbols[0], 60),
            self.TradePairs
        )

    def SelectPairs(self):

        if self.IsWarmingUp:
            return

        # ------------------------------------------------------------
        # 1. LOAD HISTORY
        # ------------------------------------------------------------
        history = self.History(
            self.symbols,
            self.lookback,
            Resolution.Daily
        )

        if history.empty:
            self.Debug("No history available for factor pair selection.")
            return

        try:
            close = history["close"].unstack(level=0)
        except:
            self.Debug("Could not unstack close history.")
            return

        close = close.dropna(axis=1)

        if close.shape[1] < 2:
            self.Debug("Not enough factor ETFs with complete history.")
            return

        # ------------------------------------------------------------
        # 2. CORRELATION AND COINTEGRATION SCREEN
        # ------------------------------------------------------------
        returns = close.pct_change().dropna()
        correlations = returns.corr()

        candidate_pairs = []
        symbols_available = list(close.columns)

        for i in range(len(symbols_available)):

            for j in range(i + 1, len(symbols_available)):

                symbol_a = symbols_available[i]
                symbol_b = symbols_available[j]

                correlation = correlations.loc[symbol_a, symbol_b]

                if correlation < self.correlation_threshold:
                    continue

                price_a = close[symbol_a]
                price_b = close[symbol_b]

                try:
                    coint_result = ts.coint(
                        np.log(price_a),
                        np.log(price_b)
                    )

                    pvalue = coint_result[1]

                except:
                    continue

                if pvalue > self.cointegration_pvalue:
                    continue

                model = self.EstimateSpreadModel(price_a, price_b)

                if model is None:
                    continue

                candidate_pairs.append(
                    {
                        "a": symbol_a,
                        "b": symbol_b,
                        "correlation": correlation,
                        "pvalue": pvalue,
                        "intercept": model["intercept"],
                        "beta": model["beta"],
                        "spread_mean": model["spread_mean"],
                        "spread_std": model["spread_std"]
                    }
                )

        candidate_pairs = sorted(
            candidate_pairs,
            key=lambda x: (x["pvalue"], -x["correlation"])
        )

        # ------------------------------------------------------------
        # 3. SELECT NON-OVERLAPPING PAIRS
        # ------------------------------------------------------------
        selected = []
        used_symbols = set()

        for pair in candidate_pairs:

            if pair["a"] in used_symbols or pair["b"] in used_symbols:
                continue

            selected.append(pair)
            used_symbols.add(pair["a"])
            used_symbols.add(pair["b"])

            if len(selected) >= self.max_pairs:
                break

        # ------------------------------------------------------------
        # 4. CLOSE PAIRS NO LONGER SELECTED
        # ------------------------------------------------------------
        selected_keys = set(
            [
                self.PairKey(pair["a"], pair["b"])
                for pair in selected
            ]
        )

        for key in list(self.active_pairs.keys()):

            if key not in selected_keys:

                old_pair = self.active_pairs[key]

                self.Liquidate(old_pair["a"])
                self.Liquidate(old_pair["b"])

                del self.active_pairs[key]

        self.selected_pairs = selected
        self.rebalance_count += 1

        self.Debug(
            "Factor pair selection "
            + str(self.Time.date())
            + " | selected="
            + str([
                pair["a"].Value + "/" + pair["b"].Value
                for pair in selected
            ])
        )

        self.Plot(
            "Pair Diagnostics",
            "Selected Pair Count",
            len(self.selected_pairs)
        )

        self.Plot(
            "Pair Diagnostics",
            "Candidate Pair Count",
            len(candidate_pairs)
        )

        self.Plot(
            "Pair Diagnostics",
            "Rebalance Count",
            self.rebalance_count
        )

    def TradePairs(self):

        if self.IsWarmingUp:
            return

        if len(self.selected_pairs) == 0:
            return

        pair_budget = self.target_gross_exposure / len(self.selected_pairs)
        leg_weight = pair_budget / 2.0

        for pair in self.selected_pairs:

            symbol_a = pair["a"]
            symbol_b = pair["b"]
            key = self.PairKey(symbol_a, symbol_b)

            if not self.Securities[symbol_a].HasData:
                continue

            if not self.Securities[symbol_b].HasData:
                continue

            price_a = self.Securities[symbol_a].Price
            price_b = self.Securities[symbol_b].Price

            if price_a <= 0 or price_b <= 0:
                continue

            spread = (
                np.log(price_a)
                - pair["intercept"]
                - pair["beta"] * np.log(price_b)
            )

            if pair["spread_std"] <= 0:
                continue

            z_score = (
                spread
                - pair["spread_mean"]
            ) / pair["spread_std"]

            is_active = key in self.active_pairs

            # --------------------------------------------------------
            # Entry logic
            # --------------------------------------------------------
            if not is_active:

                if z_score > self.entry_z:

                    # A is expensive relative to B: short A, long B.
                    self.SetTargetIfChanged(symbol_a, -leg_weight)
                    self.SetTargetIfChanged(symbol_b, leg_weight)

                    self.active_pairs[key] = {
                        "a": symbol_a,
                        "b": symbol_b,
                        "direction": -1,
                        "entry_z": z_score
                    }

                    self.Debug(
                        "Enter SHORT spread "
                        + symbol_a.Value
                        + "/"
                        + symbol_b.Value
                        + " z="
                        + str(round(z_score, 2))
                    )

                elif z_score < -self.entry_z:

                    # A is cheap relative to B: long A, short B.
                    self.SetTargetIfChanged(symbol_a, leg_weight)
                    self.SetTargetIfChanged(symbol_b, -leg_weight)

                    self.active_pairs[key] = {
                        "a": symbol_a,
                        "b": symbol_b,
                        "direction": 1,
                        "entry_z": z_score
                    }

                    self.Debug(
                        "Enter LONG spread "
                        + symbol_a.Value
                        + "/"
                        + symbol_b.Value
                        + " z="
                        + str(round(z_score, 2))
                    )

            # --------------------------------------------------------
            # Exit logic
            # --------------------------------------------------------
            else:

                mean_reverted = abs(z_score) <= self.exit_z
                stopped = abs(z_score) >= self.stop_z

                if mean_reverted or stopped:

                    self.Liquidate(symbol_a)
                    self.Liquidate(symbol_b)

                    del self.active_pairs[key]

                    reason = "mean reversion" if mean_reverted else "stop loss"

                    self.Debug(
                        "Exit pair "
                        + symbol_a.Value
                        + "/"
                        + symbol_b.Value
                        + " reason="
                        + reason
                        + " z="
                        + str(round(z_score, 2))
                    )

            self.Plot(
                "Pair Signal",
                symbol_a.Value + "/" + symbol_b.Value,
                z_score
            )

        self.Plot(
            "Strategy Equity",
            "Portfolio Value",
            self.Portfolio.TotalPortfolioValue
        )

        self.Plot(
            "Strategy Equity",
            "Cash Benchmark",
            self.initial_cash
        )

        self.Plot(
            "Pair Diagnostics",
            "Active Pair Count",
            len(self.active_pairs)
        )

    def EstimateSpreadModel(self, price_a, price_b):

        if len(price_a) != len(price_b):
            return None

        log_a = np.log(price_a.values)
        log_b = np.log(price_b.values)

        if len(log_a) < 30:
            return None

        x = sm.add_constant(log_b)

        try:
            model = sm.OLS(log_a, x).fit()
        except:
            return None

        intercept = model.params[0]
        beta = model.params[1]

        spread = log_a - intercept - beta * log_b

        spread_mean = np.mean(spread)
        spread_std = np.std(spread)

        if spread_std <= 0:
            return None

        return {
            "intercept": intercept,
            "beta": beta,
            "spread_mean": spread_mean,
            "spread_std": spread_std
        }

    def SetTargetIfChanged(self, symbol, target_weight):

        current_weight = self.GetCurrentWeight(symbol)

        if abs(target_weight - current_weight) >= self.minimum_trade_change:
            self.SetHoldings(symbol, target_weight)

    def GetCurrentWeight(self, symbol):

        if self.Portfolio.TotalPortfolioValue <= 0:
            return 0.0

        return (
            self.Portfolio[symbol].HoldingsValue
            / self.Portfolio.TotalPortfolioValue
        )

    def PairKey(self, symbol_a, symbol_b):

        return symbol_a.Value + "_" + symbol_b.Value

    def GetIntParameter(self, name, default_value):

        value = self.GetParameter(name)

        if value is None or value == "":
            return default_value

        return int(value)

    def GetFloatParameter(self, name, default_value):

        value = self.GetParameter(name)

        if value is None or value == "":
            return default_value

        return float(value)