| Overall Statistics |
|
Total Trades 6382 Average Win 0.31% Average Loss -0.39% Compounding Annual Return 0% Drawdown 113.700% Expectancy -0.231 Net Profit -113.789% Sharpe Ratio -0.548 Probabilistic Sharpe Ratio 0.000% Loss Rate 57% Win Rate 43% Profit-Loss Ratio 0.80 Alpha 0 Beta 0 Annual Standard Deviation 0.62 Annual Variance 0.384 Information Ratio -0.548 Tracking Error 0.62 Treynor Ratio 0 Total Fees $77871.66 Estimated Strategy Capacity $9400000.00 Lowest Capacity Asset NEM R735QTJ8XC9X Portfolio Turnover 142.06% |
# region imports
from AlgorithmImports import *
from universe import SectorETFUniverseSelectionModel
from portfolio import CointegratedVectorPortfolioConstructionModel
# endregion
class ETFPairsTrading(QCAlgorithm):
    """Pairs-trading algorithm built from the framework models wired up in Initialize."""

    def Initialize(self):
        """Set the backtest start and cash, then plug in universe, alpha and portfolio models."""
        self.SetStartDate(2019, 1, 1)
        self.SetCash(1000000)

        # Tunable inputs; the second argument is the fallback used when the
        # parameter is not supplied.
        # window: lookback window for correlation & cointegration.
        window = self.GetParameter("lookback", 100)
        # min_margin: per the original author's note, at least a 2% expected
        # profit margin is wanted to cover fees.
        min_margin = self.GetParameter("threshold", 2)

        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

        self.UniverseSettings.Resolution = Resolution.Minute
        universe_model = SectorETFUniverseSelectionModel(self.UniverseSettings)
        self.SetUniverseSelection(universe_model)

        # The alpha model picks the most correlated pair and emits signals when
        # the pair shows a mispricing expected to persist for a predicted period.
        # https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#09-Pearson-Correlation-Pairs-Trading-Model
        alpha_model = PearsonCorrelationPairsTradingAlphaModel(
            window, Resolution.Daily, threshold=min_margin)
        self.AddAlpha(alpha_model)

        # The cointegrating vector decides the relative movement magnitude of
        # the paired assets, i.e. how the budget is split between them.
        construction_model = CointegratedVectorPortfolioConstructionModel(
            self, window, Resolution.Daily)
        construction_model.RebalancePortfolioOnSecurityChanges = False
        self.SetPortfolioConstruction(construction_model)

        self.SetWarmUp(timedelta(90))
#region imports
from AlgorithmImports import *
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
from arch.unitroot.cointegration import engle_granger
#endregion
class CointegratedVectorPortfolioConstructionModel(EqualWeightingPortfolioConstructionModel):
    """Portfolio construction model that weights signaled assets by a cointegrating vector.

    For every security in the universe a ``SymbolData`` instance keeps a rolling
    window of log returns. When targets are requested, an Engle-Granger test is
    run on the returns of the assets with active insights; if the result is
    significant, each asset's weight is the absolute value of its entry in the
    cointegrating vector, normalized so the absolute weights sum to one and
    signed by the insight direction.
    """

    def __init__(self, algorithm, lookback = 252, resolution = Resolution.Minute,
                 rebalance = Expiry.EndOfWeek, portfolioBias = PortfolioBias.LongShort) -> None:
        """Store configuration and create the (initially empty) per-symbol state.

        Args:
            algorithm: The algorithm instance, used for history requests,
                portfolio look-ups and logging.
            lookback: Number of bars kept per symbol for the regression.
            resolution: Resolution of the history request used for warm-up.
            rebalance: Rebalancing rule forwarded to the base class.
            portfolioBias: Portfolio bias forwarded to the base class.
        """
        super().__init__(rebalance, portfolioBias)
        self.algorithm = algorithm
        self.lookback = lookback
        self.resolution = resolution
        # Symbol -> SymbolData, maintained by OnSecuritiesChanged.
        self.symbol_data = {}

    def ShouldCreateTargetForInsight(self, insight: Insight) -> bool:
        """Return False when the asset already holds a position in the insight's direction."""
        return self.symbol_data[insight.Symbol].ShouldCreateNewTarget(insight.Direction)

    def DetermineTargetPercent(self, activeInsights: List[Insight]) -> Dict[Insight, float]:
        """Compute the signed target weight for every active insight.

        Every insight in ``activeInsights`` receives an entry. The weight is 0
        when there are fewer than two usable return series, when the
        Engle-Granger test is not significant at the 5% level, or when the
        insight's asset has no usable return data.

        Args:
            activeInsights: Insights that are currently active.

        Returns:
            Mapping of insight to signed portfolio fraction.
        """
        self._rewarm_after_corporate_actions()

        # If less than 2 active insights, no valid pair trading can be resulted
        if len(activeInsights) < 2:
            self.LiveLog(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets')
            return {insight: 0 for insight in activeInsights}

        # Collect the log-return series of the signaled assets. The column
        # order follows self.symbol_data, so column_insights records which
        # insight belongs to which column position.
        insight_by_symbol = {insight.Symbol: insight for insight in activeInsights}
        series_by_symbol = {}
        column_insights = []
        for symbol, data in self.symbol_data.items():
            insight = insight_by_symbol.get(symbol)
            if insight is None:
                continue
            series_by_symbol[symbol] = data.Return
            column_insights.append(insight)
        logr = pd.DataFrame(series_by_symbol)

        # fill nans with mean, if the whole column is nan, drop it
        filled = logr.fillna(logr.mean())
        usable = filled.notna().all(axis=0).tolist()
        logr = filled.dropna(axis=1)
        # Keep the insight list in step with the surviving columns.
        column_insights = [insight for insight, keep in zip(column_insights, usable) if keep]

        # make sure we have at least 2 columns
        if logr.shape[1] < 2:
            self.LiveLog(self.algorithm, f'PortfolioContructionModel: Less then 2 insights. Create zero-quantity targets.')
            return {insight: 0 for insight in activeInsights}

        # Obtain the cointegrating vector of all signaled assets for statistical arbitrage
        model = engle_granger(logr.iloc[:, 0], logr.iloc[:, 1:], trend='n', max_lags=1)

        # If result not significant, return
        if model.pvalue > 0.05:
            return {insight: 0 for insight in activeInsights}

        # Normalization for budget constraint
        coint_vector = model.cointegrating_vector
        total_weight = sum(abs(coint_vector))

        # Start from zero targets so insights without a usable series still
        # get an explicit entry, then fill in the weights column by column.
        result = {insight: 0 for insight in activeInsights}
        for insight, weight in zip(column_insights, coint_vector):
            # we can assume any paired assets' 2 dimensions in coint_vector are in opposite sign
            result[insight] = abs(weight) / total_weight * insight.Direction
        return result

    def _rewarm_after_corporate_actions(self) -> None:
        """Reset and re-warm the return history of symbols with a split or dividend in the current slice."""
        current_slice = self.algorithm.CurrentSlice
        for events in (current_slice.Splits, current_slice.Dividends):
            for symbol in events.keys():
                if symbol in self.symbol_data:
                    self.symbol_data[symbol].Reset()
                    self.symbol_data[symbol].WarmUpIndicator()

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges) -> None:
        """Create state for added securities and dispose the state of removed ones."""
        self.LiveLog(algorithm, f'PortfolioContructionModel.OnSecuritiesChanged: Changes: {changes}')
        super().OnSecuritiesChanged(algorithm, changes)

        for removed in changes.RemovedSecurities:
            symbolData = self.symbol_data.pop(removed.Symbol, None)
            if symbolData:
                symbolData.Dispose()

        for added in changes.AddedSecurities:
            symbol = added.Symbol
            if symbol not in self.symbol_data:
                symbolData = self.SymbolData(algorithm, symbol, self.lookback, self.resolution)
                self.symbol_data[symbol] = symbolData

    def LiveLog(self, algorithm, message):
        """Log ``message`` only when the algorithm runs in live mode."""
        if algorithm.LiveMode:
            algorithm.Log(message)

    class SymbolData:
        """Per-symbol rolling window of log returns, fed by a consolidator."""

        def __init__(self, algorithm, symbol, lookback, resolution):
            """Create the indicator, subscribe it to data and warm it up from history.

            Args:
                algorithm: The algorithm instance.
                symbol: The symbol whose returns are tracked.
                lookback: Capacity of the rolling window and length of the
                    warm-up history request.
                resolution: Resolution of the warm-up history request.
            """
            self.algorithm = algorithm
            self.symbol = symbol
            self.lookback = lookback
            self.resolution = resolution

            # To store the historical daily log return
            self.windows = RollingWindow[IndicatorDataPoint](lookback)

            # Use daily log return to predict cointegrating vector
            self.logr = LogReturn(1)
            self.logr.Updated += self.OnUpdate
            self.consolidator = TradeBarConsolidator(timedelta(1))

            # Subscribe the consolidator and indicator to data for automatic update
            algorithm.RegisterIndicator(symbol, self.logr, self.consolidator)
            algorithm.SubscriptionManager.AddConsolidator(symbol, self.consolidator)

            self.WarmUpIndicator()

        def WarmUpIndicator(self):
            """Feed historical closes into the log-return indicator."""
            history = self.algorithm.History[TradeBar](self.symbol, self.lookback, self.resolution)
            # The most recent bar is deliberately left out.
            # NOTE(review): presumably so it is not counted twice once the
            # consolidator delivers it -- confirm.
            for bar in list(history)[:-1]:
                self.logr.Update(bar.EndTime, bar.Close)

        def OnUpdate(self, sender, updated):
            """Append each new indicator value to the rolling window."""
            self.windows.Add(IndicatorDataPoint(updated.EndTime, updated.Value))

        def Dispose(self):
            """Detach the update handler, clear state and remove the consolidator."""
            self.logr.Updated -= self.OnUpdate
            self.Reset()
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.consolidator)

        def Reset(self):
            """Clear the indicator and the rolling window."""
            self.logr.Reset()
            self.windows.Reset()

        def ShouldCreateNewTarget(self, direction):
            """Return True when there is no position or it is not in ``direction``."""
            quantity = self.algorithm.Portfolio[self.symbol].Quantity
            return quantity == 0 or direction != np.sign(quantity)

        @property
        def Return(self):
            """Windowed log returns as a Series indexed by date, in reverse of the window's iteration order."""
            return pd.Series(
                data = [x.Value for x in self.windows],
                index = [x.EndTime.date() for x in self.windows])[::-1]
#region imports
from AlgorithmImports import *
#endregion
class SectorETFUniverseSelectionModel(ETFConstituentsUniverseSelectionModel):
    """Universe made of the heaviest constituents of a single sector ETF."""

    def __init__(self, universe_settings: UniverseSettings = None) -> None:
        """Build the universe on the IYM ETF with the constituent filter below.

        Args:
            universe_settings: Universe settings forwarded to the base class.
        """
        # Constituents of one sector ETF are used to get correlated assets.
        # NOTE(review): the original comment called this the "tech sector" ETF,
        # but IYM appears to be a materials-sector fund -- confirm the ticker.
        etf_symbol = Symbol.Create("IYM", SecurityType.Equity, Market.USA)
        super().__init__(etf_symbol, universe_settings, self.ETFConstituentsFilter)

    def ETFConstituentsFilter(self, constituents: List[ETFConstituentData]) -> List[Symbol]:
        """Return the symbols of the 10 constituents with the largest weight.

        Constituents with a missing or zero weight are ignored. The original
        author limits the universe to 10 names to reduce slippage and keep the
        algorithm fast.
        """
        weighted = [c for c in constituents if c.Weight]
        weighted.sort(key=lambda c: c.Weight, reverse=True)
        top_ten = weighted[:10]
        return [c.Symbol for c in top_ten]