| Overall Statistics |
|
Total Orders 736 Average Win 0.55% Average Loss -0.47% Compounding Annual Return 0.931% Drawdown 3.900% Expectancy 0.024 Start Equity 50000 End Equity 51572.98 Net Profit 3.146% Sharpe Ratio -1.455 Sortino Ratio -1.219 Probabilistic Sharpe Ratio 4.373% Loss Rate 53% Win Rate 47% Profit-Loss Ratio 1.17 Alpha 0 Beta 0 Annual Standard Deviation 0.032 Annual Variance 0.001 Information Ratio 0.214 Tracking Error 0.032 Treynor Ratio 0 Total Fees $1343.36 Estimated Strategy Capacity $0 Lowest Capacity Asset PNC R735QTJ8XC9X Portfolio Turnover 12.76% Drawdown Recovery 127 |
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
"""
SCHEDULED PAIRS TRADING STRATEGY
This strategy searches for statistically related equity pairs and trades
mean-reversion in their spread.
The universe is a diversified list of large bank and financial stocks. Once per
month, the algorithm:
1. Downloads daily historical prices.
2. Calculates pairwise return correlations.
3. Keeps pairs with sufficiently high correlation.
4. Runs a cointegration test on the price series.
5. Selects the strongest cointegrated pairs without reusing the same symbol in
multiple pairs.
For each selected pair, the model estimates a hedge relationship:
log(price A) = intercept + beta * log(price B) + error
The error term is the spread. The model calculates the spread z-score:
z = (current spread - historical spread mean) / historical spread std
Trading logic:
- If z-score is high:
stock A is expensive relative to stock B
short A, long B
- If z-score is low:
stock A is cheap relative to stock B
long A, short B
- If z-score mean-reverts toward zero:
close the pair
Risk management:
- Maximum number of active pairs
- Maximum gross exposure
- Per-pair stop-loss based on spread z-score
- Re-selection of pairs monthly
- Cash benchmark, because this is a long/short relative-value strategy
This version avoids minute-level loops and avoids dependency on an external
pair.py file.
"""
class ScheduledPairsTrading(QCAlgorithm):
    """Scheduled pairs-trading algorithm over a fixed list of bank equities.

    ``SelectPairs`` runs on a month-start schedule and re-estimates which
    pairs pass the correlation and cointegration filters. ``TradePairs`` runs
    on a daily schedule and opens or closes positions from each selected
    pair's spread z-score.
    """

    def Initialize(self):
        """Configure dates, cash, universe, parameters, state and schedules."""
        # ------------------------------------------------------------
        # 1. BACKTEST SETTINGS
        # ------------------------------------------------------------
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2026, 5, 5)
        self.initial_cash = 50000
        self.SetCash(self.initial_cash)

        # ------------------------------------------------------------
        # 2. UNIVERSE
        # ------------------------------------------------------------
        tickers = [
            "BAC", "JPM", "WFC", "C", "USB", "PNC", "BK", "STT",
            "KEY", "RF", "CFG", "FITB", "TFC", "MTB", "HBAN",
            "TD", "RY", "BMO", "BNS", "CM",
        ]
        self.symbols = [
            self.AddEquity(ticker, Resolution.Daily).Symbol
            for ticker in tickers
        ]

        # ------------------------------------------------------------
        # 3. PARAMETERS
        # ------------------------------------------------------------
        self.lookback = self.GetIntParameter("lookback", 126)
        self.correlation_threshold = self.GetFloatParameter("correlation_threshold", 0.75)
        self.cointegration_pvalue = self.GetFloatParameter("cointegration_pvalue", 0.05)
        self.max_pairs = self.GetIntParameter("max_pairs", 3)
        self.entry_z = self.GetFloatParameter("entry_z", 2.0)
        self.exit_z = self.GetFloatParameter("exit_z", 0.50)
        self.stop_z = self.GetFloatParameter("stop_z", 3.5)
        self.target_gross_exposure = self.GetFloatParameter("target_gross_exposure", 1.00)
        self.minimum_trade_change = self.GetFloatParameter("minimum_trade_change", 0.02)

        # Safety checks: clamp user-supplied parameters to workable ranges.
        self.lookback = max(60, self.lookback)
        self.max_pairs = max(1, self.max_pairs)
        self.target_gross_exposure = max(0.0, min(2.0, self.target_gross_exposure))

        # ------------------------------------------------------------
        # 4. STATE
        # ------------------------------------------------------------
        # Pair models chosen by the most recent SelectPairs run.
        self.selected_pairs = []
        # Pairs that currently hold a position, keyed by PairKey(a, b).
        self.active_pairs = {}
        self.rebalance_count = 0

        # Cash benchmark is appropriate for long/short relative value.
        self.SetBenchmark(lambda time: self.initial_cash)
        self.SetWarmUp(self.lookback + 5, Resolution.Daily)

        # ------------------------------------------------------------
        # 5. SCHEDULES
        # ------------------------------------------------------------
        self.Schedule.On(
            self.DateRules.MonthStart(self.symbols[0]),
            self.TimeRules.AfterMarketOpen(self.symbols[0], 30),
            self.SelectPairs,
        )
        self.Schedule.On(
            self.DateRules.EveryDay(self.symbols[0]),
            self.TimeRules.AfterMarketOpen(self.symbols[0], 60),
            self.TradePairs,
        )

    def SelectPairs(self):
        """Re-select the tradable pairs from the last ``lookback`` daily bars.

        Pairs must pass the return-correlation threshold and the
        cointegration p-value cut-off. Candidates are ranked by p-value
        (then by higher correlation) and picked greedily so that no symbol
        appears in more than one pair, up to ``max_pairs``. Active pairs that
        are no longer selected are liquidated.
        """
        if self.IsWarmingUp:
            return

        history = self.History(self.symbols, self.lookback, Resolution.Daily)
        if history.empty:
            self.Debug("No history available for pair selection.")
            return

        try:
            close = history["close"].unstack(level=0)
        except Exception:
            self.Debug("Could not unstack close history.")
            return

        # Keep only symbols with a complete price history over the window.
        close = close.dropna(axis=1)
        if close.shape[1] < 2:
            self.Debug("Not enough symbols with complete history.")
            return

        correlations = close.pct_change().dropna().corr()

        candidate_pairs = []
        symbols_available = list(close.columns)
        for i, symbol_a in enumerate(symbols_available):
            for symbol_b in symbols_available[i + 1:]:
                correlation = correlations.loc[symbol_a, symbol_b]
                # Written as "not >=" so a NaN correlation is rejected too.
                if not correlation >= self.correlation_threshold:
                    continue

                price_a = close[symbol_a]
                price_b = close[symbol_b]
                try:
                    pvalue = ts.coint(np.log(price_a), np.log(price_b))[1]
                except Exception:
                    # Best effort: a pair whose test fails is simply skipped.
                    continue
                # Written as "not <=" so a NaN p-value is rejected too.
                if not pvalue <= self.cointegration_pvalue:
                    continue

                model = self.EstimateSpreadModel(price_a, price_b)
                if model is None:
                    continue

                candidate_pairs.append(
                    {
                        "a": symbol_a,
                        "b": symbol_b,
                        "correlation": correlation,
                        "pvalue": pvalue,
                        "intercept": model["intercept"],
                        "beta": model["beta"],
                        "spread_mean": model["spread_mean"],
                        "spread_std": model["spread_std"],
                    }
                )

        # Strongest cointegration first; ties broken by higher correlation.
        candidate_pairs.sort(key=lambda x: (x["pvalue"], -x["correlation"]))

        selected = []
        used_symbols = set()
        for pair in candidate_pairs:
            if pair["a"] in used_symbols or pair["b"] in used_symbols:
                continue
            selected.append(pair)
            used_symbols.add(pair["a"])
            used_symbols.add(pair["b"])
            if len(selected) >= self.max_pairs:
                break

        # Close active pairs that are no longer selected.
        selected_keys = {self.PairKey(x["a"], x["b"]) for x in selected}
        for key in list(self.active_pairs):
            if key not in selected_keys:
                old_pair = self.active_pairs.pop(key)
                self.Liquidate(old_pair["a"])
                self.Liquidate(old_pair["b"])

        self.selected_pairs = selected
        self.rebalance_count += 1

        selected_names = [
            pair["a"].Value + "/" + pair["b"].Value for pair in selected
        ]
        self.Debug(
            f"Pair selection {self.Time.date()} | selected={selected_names}"
        )
        self.Plot("Pair Diagnostics", "Selected Pair Count", len(self.selected_pairs))
        self.Plot("Pair Diagnostics", "Rebalance Count", self.rebalance_count)

    def TradePairs(self):
        """Evaluate every selected pair and enter or exit on its z-score.

        Gross exposure is split equally across the selected pairs and then
        equally between the two legs of each pair.
        """
        if self.IsWarmingUp:
            return
        if not self.selected_pairs:
            return

        pair_budget = self.target_gross_exposure / len(self.selected_pairs)
        leg_weight = pair_budget / 2.0

        for pair in self.selected_pairs:
            symbol_a = pair["a"]
            symbol_b = pair["b"]
            key = self.PairKey(symbol_a, symbol_b)

            if not self.Securities[symbol_a].HasData:
                continue
            if not self.Securities[symbol_b].HasData:
                continue

            price_a = self.Securities[symbol_a].Price
            price_b = self.Securities[symbol_b].Price
            if price_a <= 0 or price_b <= 0:
                continue
            if pair["spread_std"] <= 0:
                continue

            spread = (
                np.log(price_a)
                - pair["intercept"]
                - pair["beta"] * np.log(price_b)
            )
            z_score = (spread - pair["spread_mean"]) / pair["spread_std"]

            if key not in self.active_pairs:
                # ----------------------------------------------------
                # Entry logic
                # ----------------------------------------------------
                # Never open a position that is already beyond the stop
                # level: without this guard a pair that was just stopped out
                # would be re-entered on the next run while |z| is still
                # above entry_z, so the stop-loss would only churn.
                if abs(z_score) < self.stop_z:
                    if z_score > self.entry_z:
                        # A is rich relative to B: short A, long B.
                        self._enter_pair(key, symbol_a, symbol_b, -1, leg_weight, z_score)
                    elif z_score < -self.entry_z:
                        # A is cheap relative to B: long A, short B.
                        self._enter_pair(key, symbol_a, symbol_b, 1, leg_weight, z_score)
            else:
                # ----------------------------------------------------
                # Exit logic
                # ----------------------------------------------------
                mean_reverted = abs(z_score) <= self.exit_z
                stopped = abs(z_score) >= self.stop_z
                if mean_reverted or stopped:
                    self._exit_pair(key, symbol_a, symbol_b, mean_reverted, z_score)

            self.Plot("Pair Signal", symbol_a.Value + "/" + symbol_b.Value, z_score)

        self.Plot("Strategy Equity", "Portfolio Value", self.Portfolio.TotalPortfolioValue)
        self.Plot("Strategy Equity", "Cash Benchmark", self.initial_cash)
        self.Plot("Pair Diagnostics", "Active Pair Count", len(self.active_pairs))

    def _enter_pair(self, key, symbol_a, symbol_b, direction, leg_weight, z_score):
        """Open a spread position and record the pair as active.

        ``direction`` is +1 for long A / short B and -1 for short A / long B.
        """
        self.SetTargetIfChanged(symbol_a, direction * leg_weight)
        self.SetTargetIfChanged(symbol_b, -direction * leg_weight)
        self.active_pairs[key] = {
            "a": symbol_a,
            "b": symbol_b,
            "direction": direction,
            "entry_z": z_score,
        }
        side = "LONG" if direction > 0 else "SHORT"
        self.Debug(
            f"Enter {side} spread {symbol_a.Value}/{symbol_b.Value}"
            f" z={round(z_score, 2)}"
        )

    def _exit_pair(self, key, symbol_a, symbol_b, mean_reverted, z_score):
        """Liquidate both legs of an active pair and log the exit reason."""
        self.Liquidate(symbol_a)
        self.Liquidate(symbol_b)
        del self.active_pairs[key]
        reason = "mean reversion" if mean_reverted else "stop loss"
        self.Debug(
            f"Exit pair {symbol_a.Value}/{symbol_b.Value}"
            f" reason={reason} z={round(z_score, 2)}"
        )

    def EstimateSpreadModel(self, price_a, price_b):
        """Fit ``log(A) = intercept + beta * log(B) + error`` by OLS.

        Args:
            price_a: price series of leg A (read through ``.values``).
            price_b: price series of leg B, same length as ``price_a``.

        Returns:
            A dict with ``intercept``, ``beta``, ``spread_mean`` and
            ``spread_std``, or ``None`` when the inputs differ in length,
            hold fewer than 30 observations, the fit fails, or the spread
            standard deviation is not a positive finite number.
        """
        # Imported locally so this method does not depend on the module-level
        # ``sm`` alias, which the file also binds to statsmodels.formula.api
        # (that module offers neither add_constant nor OLS).
        import statsmodels.api as sm

        if len(price_a) != len(price_b):
            return None

        log_a = np.log(price_a.values)
        log_b = np.log(price_b.values)
        if len(log_a) < 30:
            return None

        try:
            model = sm.OLS(log_a, sm.add_constant(log_b)).fit()
        except Exception:
            return None

        intercept = model.params[0]
        beta = model.params[1]
        spread = log_a - intercept - beta * log_b
        spread_mean = np.mean(spread)
        spread_std = np.std(spread)

        # A NaN std (e.g. from non-positive prices) would pass "<= 0", so
        # finiteness is checked explicitly.
        if not np.isfinite(spread_std) or spread_std <= 0:
            return None

        return {
            "intercept": intercept,
            "beta": beta,
            "spread_mean": spread_mean,
            "spread_std": spread_std,
        }

    def SetTargetIfChanged(self, symbol, target_weight):
        """Call SetHoldings only when the weight change is large enough.

        The change must be at least ``minimum_trade_change``; smaller
        adjustments are skipped.
        """
        current_weight = self.GetCurrentWeight(symbol)
        if abs(target_weight - current_weight) >= self.minimum_trade_change:
            self.SetHoldings(symbol, target_weight)

    def GetCurrentWeight(self, symbol):
        """Return the holding's value as a fraction of total portfolio value.

        Returns 0.0 when the total portfolio value is not positive.
        """
        if self.Portfolio.TotalPortfolioValue <= 0:
            return 0.0
        return (
            self.Portfolio[symbol].HoldingsValue
            / self.Portfolio.TotalPortfolioValue
        )

    def PairKey(self, symbol_a, symbol_b):
        """Return the dictionary key ``"<A>_<B>"`` for a pair of symbols."""
        return symbol_a.Value + "_" + symbol_b.Value

    def GetIntParameter(self, name, default_value):
        """Read an integer parameter, or ``default_value`` if unset or empty.

        Integral values written in float notation (e.g. ``"126.0"``) are
        accepted; any other non-integer text raises ``ValueError``.
        """
        value = self.GetParameter(name)
        if value is None or value == "":
            return default_value
        try:
            return int(value)
        except ValueError:
            number = float(value)
            if number.is_integer():
                return int(number)
            raise

    def GetFloatParameter(self, name, default_value):
        """Read a float parameter, or ``default_value`` if unset or empty."""
        value = self.GetParameter(name)
        if value is None or value == "":
            return default_value
        return float(value)


#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
import datetime as datetime
import statsmodels.formula.api as sm
import statsmodels.tsa.stattools as ts
class pairs(object):
    """Rolling price history and regression statistics for two instruments.

    ``a`` and ``b`` must each expose a ``df`` attribute (a pandas object of
    prices) and a ``str()`` form that is used as the column name and in the
    regression formula.
    """

    def __init__(self, a, b):
        """Join both price histories and compute their initial correlation."""
        self.a = a
        self.b = b
        self.name = str(a) + ':' + str(b)
        # Keep only the rows where both instruments have a value.
        self.df = pd.concat([a.df, b.df], axis=1).dropna()
        # Window length that df_update() keeps the history trimmed to.
        self.num_bar = self.df.shape[0]
        # iloc[0, 1] selects purely by position; the chained form iloc[0][1]
        # depended on Series.__getitem__ treating 1 as a position.
        self.cor = self.df.corr().iloc[0, 1]
        self.error = 0
        self.last_error = 0
        # Buffers filled by price_record() and consumed by df_update().
        self.a_price = []
        self.a_date = []
        self.b_price = []
        self.b_date = []

    def cor_update(self):
        """Recompute the correlation between the two columns of ``df``."""
        self.cor = self.df.corr().iloc[0, 1]

    def cointegration_test(self):
        """Regress A on B and store ADF statistics of the residuals.

        Sets ``model`` (fitted OLS), ``adf`` (first element of the
        ``adfuller`` result), ``mean_error`` and ``sd`` (mean and standard
        deviation of the residuals).
        """
        self.model = sm.ols(
            formula='%s ~ %s' % (str(self.a), str(self.b)),
            data=self.df,
        ).fit()
        self.adf = ts.adfuller(self.model.resid, autolag='BIC')[0]
        self.mean_error = np.mean(self.model.resid)
        self.sd = np.std(self.model.resid)

    def price_record(self, data_a, data_b):
        """Buffer the close price and end time of one bar per instrument."""
        self.a_price.append(float(data_a.Close))
        self.a_date.append(data_a.EndTime)
        self.b_price.append(float(data_b.Close))
        self.b_date.append(data_b.EndTime)

    def df_update(self):
        """Append the buffered bars to ``df``, trim it, and reset the buffers.

        The history is trimmed to the last ``num_bar`` rows.
        """
        # The index is the flat list of timestamps; wrapping it in another
        # list would build a one-level MultiIndex that does not line up with
        # the existing history.
        new_df = pd.DataFrame(
            {str(self.a): self.a_price, str(self.b): self.b_price},
            index=self.a_date,
        ).dropna()
        self.df = pd.concat([self.df, new_df])
        self.df = self.df.tail(self.num_bar)
        # Clear in place: rebinding a loop variable to [] leaves the
        # attributes untouched, so old bars would be appended again on the
        # next update.
        for buffer in (self.a_price, self.a_date, self.b_price, self.b_date):
            buffer.clear()