| Overall Statistics |
|
Total Orders 1256 Average Win 0.46% Average Loss -0.43% Compounding Annual Return 0.049% Drawdown 4.200% Expectancy 0.007 Start Equity 100000 End Equity 100314.46 Net Profit 0.314% Sharpe Ratio -1.597 Sortino Ratio -1.417 Probabilistic Sharpe Ratio 0.158% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 1.07 Alpha 0 Beta 0 Annual Standard Deviation 0.02 Annual Variance 0 Information Ratio 0.027 Tracking Error 0.02 Treynor Ratio 0 Total Fees $2768.55 Estimated Strategy Capacity $0 Lowest Capacity Asset DVY STHD6FIMA3XH Portfolio Turnover 19.00% Drawdown Recovery 738 |
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion
class Pairs(object):
def __init__(self, a, b):
self.a = a
self.b = b
self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'
self.Model = None
self.MeanError = 0
self.StandardDeviation = 0
self.Epsilon = 0
@property
def DataFrame(self):
df = pd.concat([self.a.DataFrame.droplevel([0]), self.b.DataFrame.droplevel([0])], axis=1).dropna()
df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
return df
@property
def Correlation(self):
return self.DataFrame.corr().iloc[0][1]
def cointegration_test(self):
coint_test = coint(self.a.Series.values.flatten(), self.b.Series.values.flatten(), trend="n", maxlag=0)
# Return if not cointegrated
if coint_test[1] >= 0.05:
return False
self.Model = sm.ols(formula = f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}', data=self.DataFrame).fit()
self.StationaryP = adfuller(self.Model.resid, autolag = 'BIC')[1]
self.MeanError = np.mean(self.Model.resid)
self.Epsilon = np.std(self.Model.resid)
return True#region imports
from AlgorithmImports import *
#endregion
class SymbolData(object):
def __init__(self, algorithm, symbol, lookback, interval):
lookback = int(lookback)
self.Symbol = symbol
self.Prices = RollingWindow[TradeBar](lookback // interval)
self.Series = None
self.DataFrame = None
self._algorithm = algorithm
self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
self._consolidator.DataConsolidated += self.OnDataConsolidated
history = algorithm.History(symbol, lookback, Resolution.Minute)
for bar in history.itertuples():
trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
self.Update(trade_bar)
@property
def IsReady(self):
return self.Prices.IsReady
def Update(self, trade_bar):
self._consolidator.Update(trade_bar)
def OnDataConsolidated(self, sender, consolidated):
self.Prices.Add(consolidated)
if self.IsReady:
self.Series = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)['close']
self.DataFrame = self.Series.to_frame()#region imports
from AlgorithmImports import *
#endregion
class TradingPair(object):
def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
self.ticket_a = ticket_a
self.ticket_b = ticket_b
self.model_intercept = intercept
self.model_slope = slope
self.mean_error = mean_error
self.epsilon = epsilon#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
"""
SCHEDULED FACTOR ETF PAIRS TRADING STRATEGY
This strategy applies a pairs-trading framework to a small universe of factor ETFs:
IVV = broad U.S. equity market
USMV = minimum volatility
QUAL = quality
DGRO = dividend growth
DVY = dividend yield
VLUE = value
MTUM = momentum
The model searches for ETF pairs that are both highly correlated and cointegrated.
The purpose is to identify pairs whose relative price relationship appears
statistically stable enough for a mean-reversion trade.
Once per month, the algorithm:
1. Downloads daily price history.
2. Calculates return correlations for every ETF pair.
3. Keeps pairs above the correlation threshold.
4. Tests those pairs for cointegration.
5. Selects the strongest cointegrated pairs without reusing the same ETF in
multiple active pair candidates.
For each selected pair, the model estimates:
log(price A) = intercept + beta * log(price B) + residual
The residual is the spread. The strategy then trades the spread z-score:
z = (current spread - historical spread mean) / historical spread std
Trading logic:
- If z-score is high:
ETF A is expensive relative to ETF B.
Short ETF A and long ETF B.
- If z-score is low:
ETF A is cheap relative to ETF B.
Long ETF A and short ETF B.
- If z-score mean-reverts toward zero:
Close the pair.
Risk controls:
- Maximum number of selected pairs
- Maximum gross exposure
- Per-pair stop loss using z-score
- No symbol is reused across selected pairs
- Cash benchmark, because this is a long/short relative-value strategy
"""
class FactorPairsTrading(QCAlgorithm):
def Initialize(self):
# ------------------------------------------------------------
# 1. BACKTEST SETTINGS
# ------------------------------------------------------------
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2026, 5, 5)
self.initial_cash = 100000
self.SetCash(self.initial_cash)
# ------------------------------------------------------------
# 2. FACTOR ETF UNIVERSE
# ------------------------------------------------------------
tickers = [
"IVV",
"USMV",
"QUAL",
"DGRO",
"DVY",
"VLUE",
"MTUM"
]
self.symbols = []
for ticker in tickers:
symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
self.symbols.append(symbol)
# ------------------------------------------------------------
# 3. PARAMETERS
# ------------------------------------------------------------
self.lookback = self.GetIntParameter("lookback", 126)
self.correlation_threshold = self.GetFloatParameter("correlation_threshold", 0.70)
self.cointegration_pvalue = self.GetFloatParameter("cointegration_pvalue", 0.10)
self.max_pairs = self.GetIntParameter("max_pairs", 2)
self.entry_z = self.GetFloatParameter("entry_z", 1.75)
self.exit_z = self.GetFloatParameter("exit_z", 0.50)
self.stop_z = self.GetFloatParameter("stop_z", 3.00)
self.target_gross_exposure = self.GetFloatParameter("target_gross_exposure", 1.00)
self.minimum_trade_change = self.GetFloatParameter("minimum_trade_change", 0.02)
# Safety checks
self.lookback = max(60, self.lookback)
self.max_pairs = max(1, self.max_pairs)
self.target_gross_exposure = max(0.0, min(2.0, self.target_gross_exposure))
# ------------------------------------------------------------
# 4. STATE
# ------------------------------------------------------------
self.selected_pairs = []
self.active_pairs = {}
self.rebalance_count = 0
# Cash benchmark is appropriate for a long/short relative-value model.
self.SetBenchmark(lambda time: self.initial_cash)
self.SetWarmUp(self.lookback + 5, Resolution.Daily)
# ------------------------------------------------------------
# 5. SCHEDULES
# ------------------------------------------------------------
self.Schedule.On(
self.DateRules.MonthStart(self.symbols[0]),
self.TimeRules.AfterMarketOpen(self.symbols[0], 30),
self.SelectPairs
)
self.Schedule.On(
self.DateRules.EveryDay(self.symbols[0]),
self.TimeRules.AfterMarketOpen(self.symbols[0], 60),
self.TradePairs
)
def SelectPairs(self):
if self.IsWarmingUp:
return
# ------------------------------------------------------------
# 1. LOAD HISTORY
# ------------------------------------------------------------
history = self.History(
self.symbols,
self.lookback,
Resolution.Daily
)
if history.empty:
self.Debug("No history available for factor pair selection.")
return
try:
close = history["close"].unstack(level=0)
except:
self.Debug("Could not unstack close history.")
return
close = close.dropna(axis=1)
if close.shape[1] < 2:
self.Debug("Not enough factor ETFs with complete history.")
return
# ------------------------------------------------------------
# 2. CORRELATION AND COINTEGRATION SCREEN
# ------------------------------------------------------------
returns = close.pct_change().dropna()
correlations = returns.corr()
candidate_pairs = []
symbols_available = list(close.columns)
for i in range(len(symbols_available)):
for j in range(i + 1, len(symbols_available)):
symbol_a = symbols_available[i]
symbol_b = symbols_available[j]
correlation = correlations.loc[symbol_a, symbol_b]
if correlation < self.correlation_threshold:
continue
price_a = close[symbol_a]
price_b = close[symbol_b]
try:
coint_result = ts.coint(
np.log(price_a),
np.log(price_b)
)
pvalue = coint_result[1]
except:
continue
if pvalue > self.cointegration_pvalue:
continue
model = self.EstimateSpreadModel(price_a, price_b)
if model is None:
continue
candidate_pairs.append(
{
"a": symbol_a,
"b": symbol_b,
"correlation": correlation,
"pvalue": pvalue,
"intercept": model["intercept"],
"beta": model["beta"],
"spread_mean": model["spread_mean"],
"spread_std": model["spread_std"]
}
)
candidate_pairs = sorted(
candidate_pairs,
key=lambda x: (x["pvalue"], -x["correlation"])
)
# ------------------------------------------------------------
# 3. SELECT NON-OVERLAPPING PAIRS
# ------------------------------------------------------------
selected = []
used_symbols = set()
for pair in candidate_pairs:
if pair["a"] in used_symbols or pair["b"] in used_symbols:
continue
selected.append(pair)
used_symbols.add(pair["a"])
used_symbols.add(pair["b"])
if len(selected) >= self.max_pairs:
break
# ------------------------------------------------------------
# 4. CLOSE PAIRS NO LONGER SELECTED
# ------------------------------------------------------------
selected_keys = set(
[
self.PairKey(pair["a"], pair["b"])
for pair in selected
]
)
for key in list(self.active_pairs.keys()):
if key not in selected_keys:
old_pair = self.active_pairs[key]
self.Liquidate(old_pair["a"])
self.Liquidate(old_pair["b"])
del self.active_pairs[key]
self.selected_pairs = selected
self.rebalance_count += 1
self.Debug(
"Factor pair selection "
+ str(self.Time.date())
+ " | selected="
+ str([
pair["a"].Value + "/" + pair["b"].Value
for pair in selected
])
)
self.Plot(
"Pair Diagnostics",
"Selected Pair Count",
len(self.selected_pairs)
)
self.Plot(
"Pair Diagnostics",
"Candidate Pair Count",
len(candidate_pairs)
)
self.Plot(
"Pair Diagnostics",
"Rebalance Count",
self.rebalance_count
)
def TradePairs(self):
if self.IsWarmingUp:
return
if len(self.selected_pairs) == 0:
return
pair_budget = self.target_gross_exposure / len(self.selected_pairs)
leg_weight = pair_budget / 2.0
for pair in self.selected_pairs:
symbol_a = pair["a"]
symbol_b = pair["b"]
key = self.PairKey(symbol_a, symbol_b)
if not self.Securities[symbol_a].HasData:
continue
if not self.Securities[symbol_b].HasData:
continue
price_a = self.Securities[symbol_a].Price
price_b = self.Securities[symbol_b].Price
if price_a <= 0 or price_b <= 0:
continue
spread = (
np.log(price_a)
- pair["intercept"]
- pair["beta"] * np.log(price_b)
)
if pair["spread_std"] <= 0:
continue
z_score = (
spread
- pair["spread_mean"]
) / pair["spread_std"]
is_active = key in self.active_pairs
# --------------------------------------------------------
# Entry logic
# --------------------------------------------------------
if not is_active:
if z_score > self.entry_z:
# A is expensive relative to B: short A, long B.
self.SetTargetIfChanged(symbol_a, -leg_weight)
self.SetTargetIfChanged(symbol_b, leg_weight)
self.active_pairs[key] = {
"a": symbol_a,
"b": symbol_b,
"direction": -1,
"entry_z": z_score
}
self.Debug(
"Enter SHORT spread "
+ symbol_a.Value
+ "/"
+ symbol_b.Value
+ " z="
+ str(round(z_score, 2))
)
elif z_score < -self.entry_z:
# A is cheap relative to B: long A, short B.
self.SetTargetIfChanged(symbol_a, leg_weight)
self.SetTargetIfChanged(symbol_b, -leg_weight)
self.active_pairs[key] = {
"a": symbol_a,
"b": symbol_b,
"direction": 1,
"entry_z": z_score
}
self.Debug(
"Enter LONG spread "
+ symbol_a.Value
+ "/"
+ symbol_b.Value
+ " z="
+ str(round(z_score, 2))
)
# --------------------------------------------------------
# Exit logic
# --------------------------------------------------------
else:
mean_reverted = abs(z_score) <= self.exit_z
stopped = abs(z_score) >= self.stop_z
if mean_reverted or stopped:
self.Liquidate(symbol_a)
self.Liquidate(symbol_b)
del self.active_pairs[key]
reason = "mean reversion" if mean_reverted else "stop loss"
self.Debug(
"Exit pair "
+ symbol_a.Value
+ "/"
+ symbol_b.Value
+ " reason="
+ reason
+ " z="
+ str(round(z_score, 2))
)
self.Plot(
"Pair Signal",
symbol_a.Value + "/" + symbol_b.Value,
z_score
)
self.Plot(
"Strategy Equity",
"Portfolio Value",
self.Portfolio.TotalPortfolioValue
)
self.Plot(
"Strategy Equity",
"Cash Benchmark",
self.initial_cash
)
self.Plot(
"Pair Diagnostics",
"Active Pair Count",
len(self.active_pairs)
)
def EstimateSpreadModel(self, price_a, price_b):
if len(price_a) != len(price_b):
return None
log_a = np.log(price_a.values)
log_b = np.log(price_b.values)
if len(log_a) < 30:
return None
x = sm.add_constant(log_b)
try:
model = sm.OLS(log_a, x).fit()
except:
return None
intercept = model.params[0]
beta = model.params[1]
spread = log_a - intercept - beta * log_b
spread_mean = np.mean(spread)
spread_std = np.std(spread)
if spread_std <= 0:
return None
return {
"intercept": intercept,
"beta": beta,
"spread_mean": spread_mean,
"spread_std": spread_std
}
def SetTargetIfChanged(self, symbol, target_weight):
current_weight = self.GetCurrentWeight(symbol)
if abs(target_weight - current_weight) >= self.minimum_trade_change:
self.SetHoldings(symbol, target_weight)
def GetCurrentWeight(self, symbol):
if self.Portfolio.TotalPortfolioValue <= 0:
return 0.0
return (
self.Portfolio[symbol].HoldingsValue
/ self.Portfolio.TotalPortfolioValue
)
def PairKey(self, symbol_a, symbol_b):
return symbol_a.Value + "_" + symbol_b.Value
def GetIntParameter(self, name, default_value):
value = self.GetParameter(name)
if value is None or value == "":
return default_value
return int(value)
def GetFloatParameter(self, name, default_value):
value = self.GetParameter(name)
if value is None or value == "":
return default_value
return float(value)