Hello,

I this is my first algo on quantconnect. I tried to implement an pairs trading algo with automatic pairs selection.

I use predefined models except for my alpha model -> See in first code Snippet.

import statsmodels.api as sm from statsmodels.tsa.stattools import adfuller import numpy as np import pandas as pd from QuantConnect.Algorithm.Framework.Alphas import * class PairsAlpha(AlphaModel): def __init__(self,algorithm): try: self.securities =[] self.look_back_window = 30 self.pairs=dict() self.coint_p_val = 0.05 self.max_trade_pairs = 15 algorithm.Debug("Init Alpha") self.asset1priceHist=[] self.asset2priceHist=[] except: algorithm.Debug("Init Alpha exep") def setPairs(self,pairs): self.pairs = pairs def OnSecuritiesChanged(self, algorithm, changes): algorithm.Debug("Onchange Alpha") for security in changes.AddedSecurities: if not security in self.securities: self.securities.append(security) for security in changes.RemovedSecurities: if security in self.securities: self.securities.remove(security) def Update(self, algorithm, data): algorithm.Debug("Update Alpha") pairs = self.pairs if(len(pairs)==0): self.find_pairs(algorithm) pairs = self.pairs if(len(pairs)==0): return pairs_w_weights = [] for pair in pairs: pairs_w_weights.append([pair[0],pair[1],self.get_weights(self,algorithm,pair,1.25,3.0)]) algorithm.Debug(pairs_w_weights[:5]) Insights=[] for pairWeight in pairs_w_weights[:15]: if pairWeight[2]>0: InsightA=Insight.Price(pairWeight[0], timedelta(minutes = 20), InsightDirection.Up, pairWeight[2], 0.95,None,None,None) InsightB=Insight.Price(pairWeight[1], timedelta(minutes = 20), InsightDirection.Down, pairWeight[2], 0.95,None,None,None) Insights.append([InsightA,InsightB]) elif pairWeight[2]<0: InsightA=Insight.Price(pairWeight[0], timedelta(minutes = 20), InsightDirection.Down, pairWeight[2], 0.95,None,None,None) InsightB=Insight.Price(pairWeight[1], timedelta(minutes = 20), InsightDirection.Up, pairWeight[2], 0.95,None,None,None) Insights.append([InsightA,InsightB]) return Insight.Group(Insights) # Skipping magnitude, confidence and source model and assigning 25% to weighting. # insight = Insight.Price("IBM", timedelta(minutes = 20), InsightDirection.Up, None, None, None, 0.25) #higher price stock (stock A) long -> the difference between the pair will grow if get_weights > 0 def get_weights(self,algorithm,pair,realizeProfitThreshold,stopLossThreshold): algorithm.Log("getWeights regular") try: asset1PriceHist=pair.get_asset1hist asset2PriceHist=pair.get_asset2hist pricedif=asset1PriceHist.sub(asset2PriceHist) hist_mean_pricedif=pricedif.mean() hist_std_pricedif=pricedif.std() asset1_curPrice= algorithm.History(pair[0], 3, Resolution.Minute).mean() asset2_curPrice= algorithm.History(pair[1], 3, Resolution.Minute).mean() curPriceDif=asset1_curPrice-asset2_curPrice curDeviation=abs(curPriceDif-hist_mean_pricedif)/hist_std_pricedif asset1_curPrice=asset1_curPrice.mean() asset2_curPrice=asset2_curPrice.mean() maxprice=max(asset1_curPrice,asset2_curPrice) weight=0 if(hist_std_pricedif/maxprice > 0.1): return 0 if(curDeviation>realizeProfitThreshold and curDeviation<stopLossThreshold): if curPriceDif > hist_mean_pricedif: weight=(-1.0*curDeviation) else: weight=(1.0*curDeviation) return weight except: algorithm.Log("getWeight") def find_pairs(self,algorithm): algorithm.Debug("find_pairs") try: algorithm.Log("before assign Symbols") symbols = list() for kvp in algorithm.ActiveSecurities: security = kvp.Value symbol = security.Symbol symbols.append(security) lengthofSymbols = len(symbols) lengthofSymbols = len(symbols) if(len(symbols)==0): return for i in range(0, len(symbols)): algorithm.Log("symbol loop start") asset_i = symbols[i] algorithm.Log("assign asset symbol") for j in range(1 + i, len(symbols)): asset_j = symbols[j] pair_symbol = (asset_i, asset_j) invert = (asset_j, asset_i) if len(pair_symbol)==0: return PairPassTest = self.HasPassedTest(algorithm=algorithm,asset1=asset_i,asset2=asset_j,significance_level= self.coint_p_val) self if not PairPassTest: #jedes mal neu prüfen ob noch cointegrated, wenn nicht mehr -> dann raus aus der Liste if pair_symbol in self.pairs: self.pairs.pop(pair_symbol) elif invert in self.pairs: self.pairs.pop(invert) continue if(len(self.pairs)>=self.max_trade_pairs): continue pair = Pair(asset_i,asset_j) pair.set_asset1hist(hist=self.asset1PriceHist) pair.set_asset2hist(hist=self.asset2PriceHist) self.pairs[pair_symbol] = pair #eql to pair hist except Exception as e: algorithm.Log(str(e)) algorithm.Log("UpdatePairs Exception") def HasPassedTest(self,algorithm, asset1,asset2,significance_level): self.asset1PriceHist=algorithm.History(asset1.Symbol, self.look_back_window, Resolution.Daily) self.asset2PriceHist=algorithm.History(asset2.Symbol, self.look_back_window, Resolution.Daily) return self.is_cointegrated(algorithm=algorithm,data_x=self.asset1PriceHist,data_y=self.asset2PriceHist,cutoff=significance_level) '''Check whether the assets pass a pairs trading test Args: self: The self instance that experienced the change in securities asset1: The first asset's symbol in the pair asset2: The second asset's symbol in the pair Returns: True if the statistical test for the pair is successful''' def is_stationary(self,algorithm,data_x,cutoff): #Augmented Dickey-Fuller unit root test p_value = adfuller(data_x)[1] if p_value < cutoff: return True else: return False def is_cointegrated(self,algorithm,data_x,data_y,cutoff): #Check linear reg algorithm.Debug(cutoff) X=pd.DataFrame(data_x) Y=pd.DataFrame(data_y) if(Y.empty or X.empty): return False X=sm.add_constant(X['open']) Y=pd.Series(Y['open']) index=0 YPrice=[x for x in range(0,self.look_back_window)] for data in Y: YPrice[index]=data index+=1 XonlyVals=[x for x in range(0,self.look_back_window)] index=0 for xdata in X['open']: XonlyVals[index]=xdata index+=1 xdata X=sm.add_constant(XonlyVals) Y=YPrice Xtype=type(X) Ytype=type(Y) Ylen=len(Y) #Xlen=len(X) #algorithm.Debug(len(Y)) #X.reindex(Y.index) #Ylen=len(Y) #Xlen=len(X) linear_comb=sm.OLS(Y,X,'drop',True) fitted=linear_comb.fit() const,beta=fitted.params residual=fitted.resid if (beta < 0): return False else: return self.is_stationary(algorithm=algorithm,data_x=residual,cutoff=cutoff) class Pair(): def __init__(self,asset1Symbol,asset2Symbol): self.assetList = [asset1Symbol,asset2Symbol] self.asset1hist=[] self.asset2hist=[] def get_asset1hist(self): return self.asset1hist def get_asset2hist(self): return self.asset2hist def set_asset1hist(self,hist): self.asset1hist=hist def set_asset2hist(self,hist): self.asset2hist=hist def __getitem__(self, key): return self.assetList[key]

For your Interest my algo:
 

from Execution.ImmediateExecutionModel import ImmediateExecutionModel import time from PairAlpha import PairsAlpha from Selection.QC500UniverseSelectionModel import QC500UniverseSelectionModel class TransdimensionalUncoupledShield(QCAlgorithm): def Initialize(self): self.SetStartDate(2019, 1, 5) # Set Start Date self.SetEndDate(2019,6,25) self.SetCash(100000) # Set Strategy Cash self.look_back_window=30 self.coint_p_val=0.05 self.SetUniverseSelection(QC500UniverseSelectionModel()) self.UniverseSettings.Resolution = Resolution.Daily self.max_trade_pairs = 15 self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel()) self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.02)) self.SetExecution(ImmediateExecutionModel()) self.SetWarmup(timedelta(7)) self.Debug("init algoparts done") self.RemoveSecurity("MSG") self.SetAlpha(PairsAlpha(algorithm=self)) def OnData(self, data): self.Debug("onData") data '''OnData event is the primary entry point for your self. Each new data point will be pumped in here. Arguments: data: Slice object keyed by symbol containing the stock data ''' # if not self.Portfolio.Invested: # self.SetHoldings("SPY", 1) # sort the data by daily dollar volume and take the top '5' def CoarseSelectionFunction(self, coarse): # sort descending by daily dollar volume sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True) MinPrice = [ x.Symbol for x in sortedByDollarVolume if x.Price > 20 ] # return the symbol objects of the top entries from our sorted collection return MinPrice[:100]

and the error message/Stacktrace:
System.TimeoutException: Algorithm took longer than 10 minutes on a single time loop. CurrentTimeStepElapsed: 10.0 minutes
at QuantConnect.Isolator.MonitorTask (System.Threading.Tasks.Task task, System.TimeSpan timeSpan, System.Func`1[TResult] withinCustomLimits, System.Int64 memoryCap, System.Int32 sleepIntervalMillis) [0x002d3] in <78602b4cadea4f3a881b10b839873497>:0
at QuantConnect.Isolator.ExecuteWithTimeLimit (System.TimeSpan timeSpan, System.Func`1[TResult] withinCustomLimits, System.Action codeBlock, System.Int64 memoryCap, System.Int32 sleepIntervalMillis, QuantConnect.Util.WorkerThread workerThread) [0x00092] in <78602b4cadea4f3a881b10b839873497>:0
at QuantConnect.Lean.Engine.Engine.Run (QuantConnect.Packets.AlgorithmNodePacket job, QuantConnect.Lean.Engine.AlgorithmManager manager, System.String assemblyPath, QuantConnect.Util.WorkerThread workerThread) [0x009f0] in Lean.Engine.AlgorithmManager manager, System.String assemblyPath, QuantConnect.Util.WorkerThread workerThread) [0x009f0] in <7aff72eb1cda49b9aff04ff68484be84>:0

Author