Overall Statistics
Total Orders
736
Average Win
0.55%
Average Loss
-0.47%
Compounding Annual Return
0.931%
Drawdown
3.900%
Expectancy
0.024
Start Equity
50000
End Equity
51572.98
Net Profit
3.146%
Sharpe Ratio
-1.455
Sortino Ratio
-1.219
Probabilistic Sharpe Ratio
4.373%
Loss Rate
53%
Win Rate
47%
Profit-Loss Ratio
1.17
Alpha
0
Beta
0
Annual Standard Deviation
0.032
Annual Variance
0.001
Information Ratio
0.214
Tracking Error
0.032
Treynor Ratio
0
Total Fees
$1343.36
Estimated Strategy Capacity
$0
Lowest Capacity Asset
PNC R735QTJ8XC9X
Portfolio Turnover
12.76%
Drawdown Recovery
127
#region imports
from AlgorithmImports import *
#endregion

import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts


"""
SCHEDULED PAIRS TRADING STRATEGY

This strategy searches for statistically related equity pairs and trades
mean-reversion in their spread.

The universe is a diversified list of large bank and financial stocks. Once per
month, the algorithm:

1. Downloads daily historical prices.
2. Calculates pairwise return correlations.
3. Keeps pairs with sufficiently high correlation.
4. Runs a cointegration test on the price series.
5. Selects the strongest cointegrated pairs without reusing the same symbol in
   multiple pairs.

For each selected pair, the model estimates a hedge relationship:

    log(price A) = intercept + beta * log(price B) + error

The error term is the spread. The model calculates the spread z-score:

    z = (current spread - historical spread mean) / historical spread std

Trading logic:

- If z-score is high:
      stock A is expensive relative to stock B
      short A, long B

- If z-score is low:
      stock A is cheap relative to stock B
      long A, short B

- If z-score mean-reverts toward zero:
      close the pair

Risk management:

- Maximum number of active pairs
- Maximum gross exposure
- Per-pair stop-loss based on spread z-score
- Re-selection of pairs monthly
- Cash benchmark, because this is a long/short relative-value strategy

This version avoids minute-level loops and avoids dependency on an external
pair.py file.
"""


class ScheduledPairsTrading(QCAlgorithm):

    def Initialize(self):
        """Configure dates, cash, universe, parameters, state and schedules.

        Pair selection is scheduled at each month start, 30 minutes after the
        open of the first universe symbol; trading is scheduled every day,
        60 minutes after that symbol's open.
        """

        # ------------------------------------------------------------
        # Backtest window and starting capital
        # ------------------------------------------------------------
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2026, 5, 5)

        self.initial_cash = 50000
        self.SetCash(self.initial_cash)

        # ------------------------------------------------------------
        # Universe: bank and financial tickers at daily resolution
        # ------------------------------------------------------------
        bank_tickers = (
            "BAC", "JPM", "WFC", "C", "USB", "PNC", "BK", "STT",
            "KEY", "RF", "CFG", "FITB", "TFC", "MTB", "HBAN",
            "TD", "RY", "BMO", "BNS", "CM",
        )

        self.symbols = [
            self.AddEquity(name, Resolution.Daily).Symbol
            for name in bank_tickers
        ]

        # ------------------------------------------------------------
        # Tunable parameters (each falls back to the default shown)
        # ------------------------------------------------------------
        # The lookback is floored at 60 bars.
        self.lookback = max(60, self.GetIntParameter("lookback", 126))
        self.correlation_threshold = self.GetFloatParameter("correlation_threshold", 0.75)
        self.cointegration_pvalue = self.GetFloatParameter("cointegration_pvalue", 0.05)

        # At least one pair must be allowed.
        self.max_pairs = max(1, self.GetIntParameter("max_pairs", 3))
        self.entry_z = self.GetFloatParameter("entry_z", 2.0)
        self.exit_z = self.GetFloatParameter("exit_z", 0.50)
        self.stop_z = self.GetFloatParameter("stop_z", 3.5)

        # Gross exposure is clamped to the range [0, 2].
        requested_exposure = self.GetFloatParameter("target_gross_exposure", 1.00)
        self.target_gross_exposure = max(0.0, min(2.0, requested_exposure))
        self.minimum_trade_change = self.GetFloatParameter("minimum_trade_change", 0.02)

        # ------------------------------------------------------------
        # Mutable strategy state
        # ------------------------------------------------------------
        self.selected_pairs = []   # pair models chosen by SelectPairs
        self.active_pairs = {}     # PairKey -> info about the open position

        self.rebalance_count = 0

        # A flat cash benchmark suits a long/short relative-value strategy.
        self.SetBenchmark(lambda time: self.initial_cash)

        self.SetWarmUp(self.lookback + 5, Resolution.Daily)

        # ------------------------------------------------------------
        # Scheduled events, both anchored on the first universe symbol
        # ------------------------------------------------------------
        anchor = self.symbols[0]

        self.Schedule.On(
            self.DateRules.MonthStart(anchor),
            self.TimeRules.AfterMarketOpen(anchor, 30),
            self.SelectPairs
        )

        self.Schedule.On(
            self.DateRules.EveryDay(anchor),
            self.TimeRules.AfterMarketOpen(anchor, 60),
            self.TradePairs
        )

    def SelectPairs(self):
        """Re-select the tradable pairs from recent daily close history.

        Steps:
          1. Request ``self.lookback`` daily bars for the universe and keep only
             the symbols whose close series has no gaps.
          2. For every symbol pair whose return correlation is at least
             ``self.correlation_threshold``, run a cointegration test on the
             log prices and keep it when the p-value is at most
             ``self.cointegration_pvalue``.
          3. Rank candidates by p-value (ties: higher correlation first) and
             greedily take up to ``self.max_pairs`` pairs without reusing a
             symbol.
          4. Liquidate active pairs that were not re-selected, then publish the
             new selection in ``self.selected_pairs``.

        Does nothing while the algorithm is warming up. Returns early, leaving
        the previous selection untouched, when history is missing or unusable.
        """

        if self.IsWarmingUp:
            return

        history = self.History(
            self.symbols,
            self.lookback,
            Resolution.Daily
        )

        if history.empty:
            self.Debug("No history available for pair selection.")
            return

        try:
            close = history["close"].unstack(level=0)
        except Exception:
            # Best effort: skip this selection round rather than crash the
            # algorithm. ``Exception`` (not a bare except) keeps
            # KeyboardInterrupt/SystemExit propagating.
            self.Debug("Could not unstack close history.")
            return

        # Only symbols with a complete close series take part.
        close = close.dropna(axis=1)

        if close.shape[1] < 2:
            self.Debug("Not enough symbols with complete history.")
            return

        returns = close.pct_change().dropna()
        correlations = returns.corr()

        candidate_pairs = []

        symbols_available = list(close.columns)

        for position, symbol_a in enumerate(symbols_available):

            for symbol_b in symbols_available[position + 1:]:

                correlation = correlations.loc[symbol_a, symbol_b]

                # Written as "not >=" so that a NaN correlation (for example
                # from a constant return series) is rejected rather than
                # passing the filter.
                if not correlation >= self.correlation_threshold:
                    continue

                price_a = close[symbol_a]
                price_b = close[symbol_b]

                try:
                    coint_result = ts.coint(
                        np.log(price_a),
                        np.log(price_b)
                    )
                    pvalue = coint_result[1]
                except Exception:
                    # A failed test simply disqualifies this pair.
                    continue

                # "not <=" also rejects a NaN p-value.
                if not pvalue <= self.cointegration_pvalue:
                    continue

                model = self.EstimateSpreadModel(price_a, price_b)

                if model is None:
                    continue

                candidate_pairs.append(
                    {
                        "a": symbol_a,
                        "b": symbol_b,
                        "correlation": correlation,
                        "pvalue": pvalue,
                        "intercept": model["intercept"],
                        "beta": model["beta"],
                        "spread_mean": model["spread_mean"],
                        "spread_std": model["spread_std"]
                    }
                )

        # Strongest evidence first: lowest p-value, then highest correlation.
        candidate_pairs.sort(key=lambda x: (x["pvalue"], -x["correlation"]))

        selected = []
        used_symbols = set()

        for pair in candidate_pairs:

            # Each symbol may appear in at most one selected pair.
            if pair["a"] in used_symbols or pair["b"] in used_symbols:
                continue

            selected.append(pair)
            used_symbols.add(pair["a"])
            used_symbols.add(pair["b"])

            if len(selected) >= self.max_pairs:
                break

        # Close active pairs that are no longer selected.
        selected_keys = {self.PairKey(x["a"], x["b"]) for x in selected}

        for key in list(self.active_pairs.keys()):

            if key not in selected_keys:

                old_pair = self.active_pairs[key]
                self.Liquidate(old_pair["a"])
                self.Liquidate(old_pair["b"])
                del self.active_pairs[key]

        self.selected_pairs = selected
        self.rebalance_count += 1

        selected_names = [
            pair["a"].Value + "/" + pair["b"].Value
            for pair in selected
        ]

        self.Debug(
            "Pair selection "
            + str(self.Time.date())
            + " | selected="
            + str(selected_names)
        )

        self.Plot("Pair Diagnostics", "Selected Pair Count", len(self.selected_pairs))
        self.Plot("Pair Diagnostics", "Rebalance Count", self.rebalance_count)

    def TradePairs(self):
        """Open and close pair positions from the current spread z-scores.

        For each selected pair the spread is
        ``log(price A) - intercept - beta * log(price B)`` and the z-score is
        that spread standardised by the pair's stored mean and standard
        deviation.

        * Not active, z > entry_z: short A / long B.
        * Not active, z < -entry_z: long A / short B.
        * Entries are skipped while |z| >= stop_z. Without this guard a pair
          that was just stopped out would be re-entered on the next run and
          stopped again, churning orders while the spread stays extreme.
        * Active, |z| <= exit_z (mean reversion) or |z| >= stop_z (stop loss):
          liquidate both legs.

        The gross exposure budget is split evenly across the selected pairs
        and then evenly across the two legs of each pair. Does nothing while
        warming up or when no pairs are selected.
        """

        if self.IsWarmingUp:
            return

        if not self.selected_pairs:
            return

        pair_budget = self.target_gross_exposure / len(self.selected_pairs)
        leg_weight = pair_budget / 2.0

        for pair in self.selected_pairs:

            symbol_a = pair["a"]
            symbol_b = pair["b"]
            key = self.PairKey(symbol_a, symbol_b)

            if not self.Securities[symbol_a].HasData:
                continue

            if not self.Securities[symbol_b].HasData:
                continue

            price_a = self.Securities[symbol_a].Price
            price_b = self.Securities[symbol_b].Price

            # Logs are undefined for non-positive prices.
            if price_a <= 0 or price_b <= 0:
                continue

            spread = (
                np.log(price_a)
                - pair["intercept"]
                - pair["beta"] * np.log(price_b)
            )

            if pair["spread_std"] <= 0:
                continue

            z_score = (spread - pair["spread_mean"]) / pair["spread_std"]

            pair_label = symbol_a.Value + "/" + symbol_b.Value
            z_text = str(round(z_score, 2))

            # --------------------------------------------------------
            # Entry logic
            # --------------------------------------------------------
            if key not in self.active_pairs:

                # direction is the sign of the position in A:
                # -1 = short the spread (A rich), +1 = long the spread (A cheap).
                if z_score > self.entry_z:
                    direction = -1
                elif z_score < -self.entry_z:
                    direction = 1
                else:
                    direction = 0

                # Never open a position that is already in stop-loss territory.
                if direction != 0 and abs(z_score) < self.stop_z:

                    self.SetTargetIfChanged(symbol_a, direction * leg_weight)
                    self.SetTargetIfChanged(symbol_b, -direction * leg_weight)

                    self.active_pairs[key] = {
                        "a": symbol_a,
                        "b": symbol_b,
                        "direction": direction,
                        "entry_z": z_score
                    }

                    side = "SHORT" if direction < 0 else "LONG"

                    self.Debug(
                        "Enter " + side + " spread " + pair_label + " z=" + z_text
                    )

            # --------------------------------------------------------
            # Exit logic
            # --------------------------------------------------------
            else:

                mean_reverted = abs(z_score) <= self.exit_z
                stopped = abs(z_score) >= self.stop_z

                if mean_reverted or stopped:

                    self.Liquidate(symbol_a)
                    self.Liquidate(symbol_b)
                    del self.active_pairs[key]

                    reason = "mean reversion" if mean_reverted else "stop loss"

                    self.Debug(
                        "Exit pair " + pair_label + " reason=" + reason + " z=" + z_text
                    )

            self.Plot("Pair Signal", pair_label, z_score)

        self.Plot("Strategy Equity", "Portfolio Value", self.Portfolio.TotalPortfolioValue)
        self.Plot("Strategy Equity", "Cash Benchmark", self.initial_cash)
        self.Plot("Pair Diagnostics", "Active Pair Count", len(self.active_pairs))

    def EstimateSpreadModel(self, price_a, price_b):
        """Fit ``log(A) = intercept + beta * log(B) + error`` and summarise the error.

        Args:
            price_a: price series of the dependent leg (needs ``.values``
                and ``len``).
            price_b: price series of the explanatory leg, same length.

        Returns:
            A dict with ``intercept``, ``beta``, ``spread_mean`` and
            ``spread_std`` (mean and standard deviation of the in-sample
            residual spread), or ``None`` when the inputs are unusable:
            mismatched lengths, fewer than 30 observations, non-positive or
            non-finite prices, a constant explanatory series, a failed fit, or
            a spread whose standard deviation is not a positive finite number.
        """

        if len(price_a) != len(price_b):
            return None

        if len(price_a) < 30:
            return None

        raw_a = np.asarray(price_a.values, dtype=float)
        raw_b = np.asarray(price_b.values, dtype=float)

        # Logs of zero, negative or NaN prices would put inf/NaN into the fit.
        for raw in (raw_a, raw_b):
            if not np.all(np.isfinite(raw) & (raw > 0)):
                return None

        log_a = np.log(raw_a)
        log_b = np.log(raw_b)

        # With a constant regressor add_constant (has_constant='skip') adds no
        # intercept column, so the fit would have one parameter and no beta.
        if np.ptp(log_b) == 0:
            return None

        x = sm.add_constant(log_b)

        try:
            model = sm.OLS(log_a, x).fit()
        except Exception:
            # A failed regression just disqualifies the pair.
            return None

        intercept = model.params[0]
        beta = model.params[1]

        spread = log_a - intercept - beta * log_b

        spread_mean = np.mean(spread)
        spread_std = np.std(spread)

        # NaN fails "<= 0", so finiteness is tested explicitly.
        if not np.isfinite(spread_std) or spread_std <= 0:
            return None

        return {
            "intercept": intercept,
            "beta": beta,
            "spread_mean": spread_mean,
            "spread_std": spread_std
        }

    def SetTargetIfChanged(self, symbol, target_weight):
        """Call ``SetHoldings`` only when the weight change is large enough.

        The order is skipped when the absolute difference between
        ``target_weight`` and the symbol's current portfolio weight is below
        ``self.minimum_trade_change``.
        """
        weight_gap = abs(target_weight - self.GetCurrentWeight(symbol))

        if not weight_gap >= self.minimum_trade_change:
            return

        self.SetHoldings(symbol, target_weight)

    def GetCurrentWeight(self, symbol):
        """Return the symbol's holdings value as a fraction of portfolio value.

        Returns 0.0 when the total portfolio value is zero or negative.
        """
        portfolio = self.Portfolio

        if portfolio.TotalPortfolioValue <= 0:
            return 0.0

        holdings_value = portfolio[symbol].HoldingsValue
        return holdings_value / portfolio.TotalPortfolioValue

    def PairKey(self, symbol_a, symbol_b):
        """Return the dictionary key for a pair: the two ``Value`` strings joined by ``_``."""
        return "_".join((symbol_a.Value, symbol_b.Value))

    def GetIntParameter(self, name, default_value):
        """Read parameter ``name`` as an int, or return ``default_value``.

        The default is returned unchanged when ``GetParameter`` yields ``None``
        or an empty string; any other value is passed through ``int``.
        """
        raw = self.GetParameter(name)
        is_missing = raw is None or raw == ""
        return default_value if is_missing else int(raw)

    def GetFloatParameter(self, name, default_value):
        """Read parameter ``name`` as a float, or return ``default_value``.

        The default is returned unchanged when ``GetParameter`` yields ``None``
        or an empty string; any other value is passed through ``float``.
        """
        raw = self.GetParameter(name)
        is_missing = raw is None or raw == ""
        return default_value if is_missing else float(raw)
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import datetime as datetime
import statsmodels.formula.api as sm
import statsmodels.tsa.stattools as ts

class pairs(object):
    """Rolling joint price frame for two instruments, with pair statistics.

    ``a`` and ``b`` must each expose a ``.df`` DataFrame, and their ``str()``
    forms are used as column names and in the regression formula.
    NOTE(review): presumably each ``.df`` holds a single price column named
    ``str(a)`` / ``str(b)`` — confirm against the caller.
    """

    def __init__(self, a, b):
        """Join the two frames on their index and compute the initial correlation."""
        self.a = a
        self.b = b
        self.name = str(a) + ':' + str(b)
        # Keep only rows where both instruments have data.
        self.df = pd.concat([a.df, b.df], axis=1).dropna()
        # The window length is fixed at the initial number of joint rows.
        self.num_bar = self.df.shape[0]
        self.cor_update()
        self.error = 0
        self.last_error = 0
        # Buffers filled by price_record and flushed by df_update.
        self.a_price = []
        self.a_date = []
        self.b_price = []
        self.b_date = []

    def cor_update(self):
        """Recompute ``self.cor``, the correlation between the first two columns."""
        # Purely positional lookup; chained ``.iloc[0][1]`` relied on the
        # deprecated positional fallback of Series ``[]``.
        self.cor = self.df.corr().iloc[0, 1]

    def cointegration_test(self):
        """Regress ``a`` on ``b`` and run an ADF test on the residuals.

        Sets ``self.model`` (the fitted OLS result), ``self.adf`` (first
        element of the ``adfuller`` result), and ``self.mean_error`` /
        ``self.sd`` (mean and standard deviation of the residuals).
        """
        self.model = sm.ols(formula='%s ~ %s' % (str(self.a), str(self.b)), data=self.df).fit()
        self.adf = ts.adfuller(self.model.resid, autolag='BIC')[0]
        self.mean_error = np.mean(self.model.resid)
        self.sd = np.std(self.model.resid)

    def price_record(self, data_a, data_b):
        """Buffer one bar per leg: its ``Close`` as a float and its ``EndTime``."""
        self.a_price.append(float(data_a.Close))
        self.a_date.append(data_a.EndTime)
        self.b_price.append(float(data_b.Close))
        self.b_date.append(data_b.EndTime)

    def df_update(self):
        """Append the buffered bars to ``self.df``, trim the window, reset buffers.

        The frame is trimmed to the last ``self.num_bar`` rows. Rows are
        indexed by the recorded ``a`` timestamps. Does nothing when no bars
        have been recorded since the previous update.
        """
        if not self.a_price:
            return

        # A flat index: wrapping the dates in another list would build a
        # one-level MultiIndex instead.
        new_df = pd.DataFrame(
            {str(self.a): self.a_price, str(self.b): self.b_price},
            index=self.a_date,
        ).dropna()
        self.df = pd.concat([self.df, new_df]).tail(self.num_bar)

        # Rebind the attributes so the same bars are not appended again.
        self.a_price, self.a_date = [], []
        self.b_price, self.b_date = [], []