I've been banging my head up against the wall for a couple of mornings on this now. I've looked at the documentation and many form posts. The error I'm getting is…
Runtime Error: InvalidOperationException : Consolidators can only be used with a single symbol. The previous consolidated SecurityIdentifier ( 0) is not the same as in the current data (SPY R735QTJ8XC9X).at QuantConnect.Data.Consolidators.PeriodCountConsolidatorBase`2.Update(T data) in /LeanCloud/CI.Builder/bin/Debug/src/QuantConnect/Lean/Common/Data/Consolidators/PeriodCountConsolidatorBase.cs:line 211 InvalidOperationException : Consolidators can only be used with a single symbol. The previous consolidated SecurityIdentifier ( 0) is not the same as in the current data (SPY R735QTJ8XC9X).at QuantConnect.Data.Consolidators.PeriodCountConsolidatorBase`2.Update(T data) in /LeanCloud/CI.Builder/bin/Debug/src/QuantConnect/Lean/Common/Data/Consolidators/PeriodCountConsolidatorBase.cs:line 211
I'd greatly appreciate some help on this. I was able to build the whole strategy out without using the Algorithm Framework, but I can't even get past setting up the consolidators within the framework. I must just be missing something simple here…
class MyAlgorithm(QCAlgorithmFramework):
def Initialize(self):
self.SetStartDate(2021, 1, 15)
self.SetEndDate(2021, 1, 16)
self.SetCash(100000)
self.UniverseSettings.Resolution = Resolution.Minute
symbols = [ Symbol.Create("SPY", SecurityType.Equity, Market.USA) ]
self.SetUniverseSelection( ManualUniverseSelectionModel(symbols))
self.SetAlpha(MyCustomAlphaModel())
self.SetPortfolioConstruction(NullPortfolioConstructionModel())
self.SetExecution(NullExecutionModel())
self.SetRiskManagement(NullRiskManagementModel())
class MyCustomAlphaModel(AlphaModel):
'''Uses a combination of fast, medium, and slow EMAs to create insights.'''
def __init__(self, **kwargs):
'''Initializes a new default instance of the MyCustomAlphaModel class.
Args:
resolution: The resolution of historical data
consolidation_period(int): length of the consolidator in minutes
window_size(int): rolling window size in bars
fast_length(int): period of the fast ema in bars
med_length(int): period of the med ema in bars
slow_length(int): period of the slow ema in bars'''
self.resolution = kwargs['resolution'] if 'resolution' in kwargs else Resolution.Minute
self.consolidation_period = kwargs['consolidation_period'] if 'consolidation_period' in kwargs else 5
self.window_size = kwargs['window_size'] if 'window_size' in kwargs else 100
self.fast_length = kwargs['fast_length'] if 'fast_length' in kwargs else 60
self.med_length = kwargs['med_length'] if 'med_length' in kwargs else 200
self.slow_length = kwargs['slow_length'] if 'slow_length' in kwargs else 1200
self.prediction_interval = timedelta(minutes = self.consolidation_period)
self.symbol_data_by_symbol = {}
self.Name = 'MyCustomAlphaModel'
def Update(self, algorithm, data):
'''Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
# do something here
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
algorithm.Debug("OnSecuritiesChanged Fired")
algorithm.Debug("Has added securities: " + str(len(changes.AddedSecurities) > 0))
algorithm.Debug("Has removed securities: " + str(len(changes.RemovedSecurities) > 0))
for x in changes.AddedSecurities:
algorithm.Debug(str(x.Symbol))
# clean up data for removed securities
for removed in changes.RemovedSecurities:
symbol_data = self.symbol_data_by_symbol.pop(removed.Symbol, None)
if symbol_data is not None:
symbol_data.RemoveConsolidators(algorithm)
# initialize data for added securities
added_symbols = [ x.Symbol for x in changes.AddedSecurities ]
for symbol in added_symbols:
algorithm.Debug(str(symbol) + ' added to universe at ' + str(algorithm.Time))
history = algorithm.History(added_symbols, self.slow_length*self.consolidation_period, self.resolution)
if history.empty: return
algorithm.Debug('history not empty, adding new symbols...')
for symbol in added_symbols:
algorithm.Debug('symbol: ' + str(symbol))
symbol_data = self.symbol_data_by_symbol.get(symbol)
if symbol_data is None:
symbol_data = SymbolData(
symbol,
algorithm,
self.consolidation_period,
self.window_size,
self.fast_length,
self.med_length,
self.slow_length
)
self.symbol_data_by_symbol[symbol] = symbol_data
algorithm.Debug('symbol_data_by_symbol:')
algorithm.Debug(str(self.symbol_data_by_symbol))
symbol_data.RegisterIndicators(algorithm)
symbol_data.WarmUpIndicators(history.loc[symbol], algorithm)
class SymbolData:
'''Contains data specific to a symbol required by this model'''
def __init__(self, symbol, algorithm, consolidation_period, window_size, fast_length, med_length, slow_length):
algorithm.Log('Initializing new SymbolData class for ' + str(symbol))
self.symbol = symbol
self.algorithm = algorithm
self.consolidation_period = consolidation_period
# create a rolling window so we can access past trade bars
self.window = RollingWindow[TradeBar](window_size)
self.ema_fast_window = RollingWindow[IndicatorDataPoint](window_size)
self.ema_med_window = RollingWindow[IndicatorDataPoint](window_size)
self.ema_slow_window = RollingWindow[IndicatorDataPoint](window_size)
# setup the EMA indicators
self.ema_fast = ExponentialMovingAverage('{}.EMA_FAST({})'.format(symbol, fast_length), fast_length)
self.ema_med = ExponentialMovingAverage('{}.EMA_MEDIUM({})'.format(symbol, med_length), med_length)
self.ema_slow = ExponentialMovingAverage('{}.EMA_SLOW({})'.format(symbol, slow_length), slow_length)
# define our trade bar consolidator. we can
# access the bar from the DataConsolidated events
self.bar_consolidator = TradeBarConsolidator(timedelta(minutes=consolidation_period))
# attach our event handler. the event handler is a function that will
# be called each time we produce a new consolidated piece of data.
self.bar_consolidator.DataConsolidated += self.ConsolidatedBarHandler
# this call adds our consolidator to
# the manager to receive updates from the engine
algorithm.SubscriptionManager.AddConsolidator(symbol, self.bar_consolidator)
def RegisterIndicators(self, algorithm):
# register the consolidated bar data to automatically
# update the EMA indicators
algorithm.RegisterIndicator(self.symbol, self.ema_fast, self.bar_consolidator)
algorithm.RegisterIndicator(self.symbol, self.ema_med, self.bar_consolidator)
algorithm.RegisterIndicator(self.symbol, self.ema_slow, self.bar_consolidator)
def RemoveConsolidators(self, algorithm):
if self.bar_consolidator is not None:
algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.bar_consolidator)
def WarmUpIndicators(self, history, algorithm):
for time, row in history.iterrows():
tradeBar = TradeBar()
tradeBar.Close = row['close']
tradeBar.Open = row['open']
tradeBar.High = row['high']
tradeBar.Low = row['low']
tradeBar.Volume = row['volume']
tradeBar.Time = time
self.bar_consolidator.Update(tradeBar)
def ConsolidatedBarHandler(self, sender, consolidated):
# add the consolidated data to our rolling windows
self.window.Add(consolidated)
self.ema_fast_window.Add(self.ema_fast.Current)
self.ema_med_window.Add(self.ema_med.Current)
self.ema_slow_window.Add(self.ema_slow.Current)
Varad Kabade
Hi Reggie O'Farrell,
In the above code snippet, the following part is leading to an error.
def WarmUpIndicators(self, history, algorithm):
Here the TradeBar we are creating has no Symbol attribute to resolve this we just need to add the Symbol attribute. Refer to the following code snippet
Refer to the attached backtest.
Best,
Varad Kabade
Reggie O'Farrell
Whoops! Thanks a ton Varad kabade ! That was it. What a simple thing to miss. Sometimes those are the most elusive.
Reggie O'Farrell
The material on this website is provided for informational purposes only and does not constitute an offer to sell, a solicitation to buy, or a recommendation or endorsement for any security or strategy, nor does it constitute an offer to provide investment advisory services by QuantConnect. In addition, the material offers no opinion with respect to the suitability of any security or specific investment. QuantConnect makes no guarantees as to the accuracy or completeness of the views expressed in the website. The views are subject to change, and may have become unreliable for various reasons, including changes in market conditions or economic circumstances. All investments involve risk, including loss of principal. You should consult with an investment professional before making any investment decisions.
To unlock posting to the community forums please complete at least 30% of Boot Camp.
You can continue your Boot Camp training progress from the terminal. We hope to see you in the community soon!