Hello,
The code below uses a symbol universe and multiple timeframes with quote consolidators.
I am having some trouble pumping in historical data into the consolidator Update method. It's not clear what format this data should be in. I get the following error: TypeError : No method matches given arguments for Update
Also any guidnace or feedback on the overall code structure would also be greatly appretiated.
class Sample(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1)
self.SetCash(100000)
self.AddUniverse(self.selection_coarse)
self.strategies = {}
def selection_coarse(self, coarse):
'''Coarse symbol universe filter'''
selected = [ x for x in coarse if x.HasFundamentalData and (x.Price >= 20) ]
dollar_volume = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
top = dollar_volume[:5]
symbols = [x.Symbol for x in top]
return symbols
def OnSecuritiesChanged(self, changes):
'''Manage Strategy object for each symbol in filtered universe'''
for security in changes.AddedSecurities:
symbol = security.Symbol
if symbol not in self.strategies:
self.strategies[symbol] = Strategy(self, symbol)
for security in changes.RemovedSecurities:
symbol = security.Symbol
if symbol in self.strategies:
strategy = self.strategies.pop(symbol, None)
self.SubscriptionManager.RemoveConsolidator(symbol, strategy.consolidator_intraday)
def OnData(self, data):
pass
class Strategy:
def __init__(self, qc, symbol):
self.qc = qc
self.symbol = symbol
# Create consolidator
self.consolidator_intraday = TradeBarConsolidator(timedelta(5))
# Attach event handler to consolidator
self.consolidator_intraday.DataConsolidated += self.handler_intraday
# Subscribe consolidator to data
self.qc.SubscriptionManager.AddConsolidator(self.symbol, self.consolidator_intraday)
# Create indicaotr
self._ema_fast_intraday = ExponentialMovingAverage(8)
# Create rolling window
self.ema_fast_intraday = RollingWindow[ExponentialMovingAverage](5)
# Pump historical data into consolidator
self.bars_required = 200
self.history = self.qc.History(self.symbol, self.bars_required)
for bar in self.history.itertuples():
self.consolidator_intraday.Update(bar)
def handler_intraday(self, sender, bar):
'''Event hander for intraday timeframe'''
# Update indicator
self._ema_fast_intraday.Update(bar.EndTime, bar.Close)
# Update rolling window
self.ema_fast_intraday.Add(self._ema_fast_intraday)