I'm trying to implement a QC version of QTradableStocksUS that simply simulates it's universe selection model, but I'm keep getting this error during the backtesting:

 Runtime Error: Algorithm took longer than 10 minutes on a single time loop. 

The algorithm screens about 1200 stocks every month, and then it took 200-day price data from History api for each stock, I figured out it's the part that caused slowness.

Is there a performance issue for QC/Lean? does it possbile to fix?

Please find the code below:

import numpy as np import pandas as pd ### <summary> ### Basic algorithm simply simulates quantopian's QTradableStocksUS universe selection model ### </summary> class QTradableStocksUSUniverseAlgorithm(QCAlgorithm): def Initialize(self): self.assets = [] self.universe_changes = [] self.rebalancing = True self.max_positions = 20 self.total_shares = {} self.primary_share_class_ids = {} self.rolling_data = RollingData() self.SetStartDate(2013,10, 7) #Set Start Date self.SetEndDate(2017,10,11) #Set End Date self.SetCash(100000) #Set Strategy Cash # Find more symbols here: http://quantconnect.com/data self.AddEquity("SPY", Resolution.Daily) self.UniverseSettings.Resolution = Resolution.Daily self.AddUniverse(self.coarse_selection, self.fine_selection) self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY", 10), Action(self.start_rebalance)) def OnData(self, data): if self.universe_changes is None or self.universe_changes.Count < 2: self.Debug("{}: No universe changed, skip rebalancing...".format(self.Time)) return if not self.rebalancing: return if not self.assets: return self.rebalancing = False self.rebalance() #self.Debug("{}: Data changed.".format(self.Time)) def OnSecuritiesChanged(self, changes): #self.Debug("{}: OnSecuritiesChanged fired: {}".format(self.Time, changes)) self.universe_changes = changes def start_rebalance(self): self.universe_changes = None self.rebalancing = True #self.Debug("{}: starting rebalancing...".format(self.Time)) def rebalance(self): assets = self.assets#[:20] self.Debug("{}: start rebalancing: assets = {}".format(self.Time, assets)) for i in self.Portfolio.Values: if (i.Invested) and (i.Symbol not in assets): self.Liquidate(i.Symbol) for symbol in assets: self.SetHoldings(symbol, 1.0/len(assets)) def coarse_selection(self, coarse): if not self.rebalancing: return [] #self.Debug("{}: No. of coarse symbols: {}".format(self.Time, len(coarse))) filtered = [x for x in coarse if x.HasFundamentalData #and x.Volume > 0.0 #and x.Price > 0.0] and x.DollarVolume > 2.5e6] filtered = sorted(filtered, key=lambda x: x.DollarVolume, reverse=True) self.rolling_data.update(self, filtered) return [ x.Symbol for x in filtered ] def fine_selection(self, fine): if not self.rebalancing: return [] filtered = [x for x in fine \ if True #(x.CompanyReference.CountryId == "USA") and (not x.CompanyReference.IsLimitedPartnership) # not limited partnership and (not x.CompanyReference.IsREIT) # not REITs and (not x.SecurityReference.IsDepositaryReceipt) # not ADR and (x.CompanyReference.PrimaryShareClassID) and (x.SecurityReference.ShareClassStatus == "A") # active trading and (x.SecurityReference.CurrencyId == "USD") and (x.SecurityReference.SecurityType == "ST00000001") # common stock and (x.SecurityReference.ExchangeId != "OTC") # not OTC markets and ('_WI' not in x.Symbol.Value) #and x.EarningReports.BasicAverageShares.ThreeMonths * (x.EarningReports.BasicEPS.TwelveMonths * x.ValuationRatios.PERatio) > 5e8 ] self.total_shares = {} self.primary_share_class_ids = {} for x in filtered: self.total_shares[x.Symbol] = x.EarningReports.BasicAverageShares.ThreeMonths self.primary_share_class_ids[x.Symbol] = x.CompanyReference.PrimaryShareClassID symbols = [ x.Symbol for x in filtered ] assets = self.indicator_selection(symbols) #assets = filtered[:self.max_positions] self.Debug("{}: No. of fine symbols: {}".format(self.Time, len(assets))) self.assets = assets return assets def indicator_selection(self, final): symbols = [] data = [] count_new_ipo = 0 count_suspension = 0 count_low_volume = 0 count_micro_size = 0 for symbol in final: try: #algo.Debug("Breakpoint 3: %s" % symbol) prices = self.rolling_data.closes.loc[str(symbol)] #algo.Debug("Breakpoint 4: %d" % len(prices)) if prices['close'].count() < 180: # new IPOs, trading halts count_new_ipo += 1 continue if prices['close'].tail(20).isnull().sum() > 0: # stock suspension count_suspension += 1 continue market_cap = float(self.total_shares[symbol]) * prices['close'].tail(20).mean() if market_cap < 350000000.0: # market cap < 350 Million count_micro_size += 1 continue dollar_volumes = prices['close'] * prices['volume'] dollar_volume_median = dollar_volumes.median() if dollar_volume_median < 2500000.0: # day's volume < 2.5 Million count_low_volume += 1 #algo.Debug("Symbol %s is low volume at %.2f" % (symbol.Value, dollar_volume_median)) continue symbols.append(symbol) data.append([ dollar_volume_median, self.primary_share_class_ids[symbol] ]) except KeyError: self.Debug("Symbol %s is not exists in history data." % symbol.Value) # remove duplicate share classes by the same company, select most liquidity share class merged = pd.DataFrame(data, columns=['dollar_volume', 'primary_share_class_id'], index=symbols)\ .sort_values('dollar_volume', ascending=False)\ .drop_duplicates('primary_share_class_id', keep='first')\ .index.tolist() self.Debug("Removed %d of duplicated companies." % (len(symbols) - len(merged))) self.Debug("Removed %d of new IPO companies." % count_new_ipo) self.Debug("Removed %d of suspended trading companies." % count_suspension) self.Debug("Removed %d of low volume companies." % count_low_volume) self.Debug("Removed %d of micro size companies." % count_micro_size) self.Debug("No. of dynamic symbols: final=%d, merged=%d" % (len(final), len(merged))) return merged[:self.max_positions] class RollingData: window_length = 200 resolution = Resolution.Daily def __init__(self): self.closes = pd.DataFrame(data=[], columns=['symbol', 'time', 'close'], index=[]).set_index(['symbol', 'time']) def update(self, algo, corase): for item in corase: if str(item.Symbol) in self.closes.index.levels[0]: self.closes.loc[(str(item.Symbol), item.EndTime),] = item.Price else: history = algo.History([item.Symbol], self.window_length, self.resolution) prices = history.loc[str(item.Symbol)] for time, price in prices.iterrows(): self.closes.loc[(str(item.Symbol), time),] = price['close'] self.closes.sort_values('time', ascending=True) algo.Debug("{}: No. of records in rolling data: {}".format(algo.Time, len(self.closes)))

Author