from QuantConnect import *
from QuantConnect.Data import *
from QuantConnect.Algorithm import *
from QuantConnect.Indicators import *
from System.Collections.Generic import List
class EmaCrossUniverseSelectionAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020,5,15) #Set Start Date
self.SetCash(100000) #Set Strategy Cash
self.resolution = Resolution.Daily
self.UniverseSettings.Resolution = Resolution.Daily
self.AddEquity("SPY", Resolution.Daily)
self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.1))
self.momentumpick_portfolio_percentage = 0.16
self.stocks = []
self.universeCount = 5
self.averages = {}
self.SetUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
self.Schedule.On(self.DateRules.EveryDay(),self.TimeRules.AfterMarketOpen("SPY",30),self.trade_stocks)
self.Schedule.On(self.DateRules.EveryDay(),self.TimeRules.AfterMarketOpen("SPY",360),self.trade_stocks)
def CoarseSelectionFunction(self, coarse):
self.exit_stocks = []
hasfundamental = [x for x in coarse if x.HasFundamentalData ]
filtered = [ x for x in hasfundamental if x.Price > 1 and x.Volume > 3000000]
sortedByDollarVolume = sorted(filtered, key=lambda x: x.Volume,reverse=True)[:200]
for security in sortedByDollarVolume:
if security.Symbol not in self.averages:
self.averages[security.Symbol] = SymbolData(security.Symbol, self)
self.history = self.History(security.Symbol, 30, Resolution.Daily)
try:
self.averages[security.Symbol].WarmUpIndicators(self.history)
self.averages[security.Symbol].is_uptrend = self.averages[security.Symbol].bullSqueeze()
except:
self.Debug(str(security.Symbol))
# if not self.history.empty:
# self.averages[security.Symbol].WarmUpIndicators(self.history)
self.averages[security.Symbol].update(security.EndTime, security.Price, security.Volume)
values = list(filter(lambda x: x.is_uptrend, self.averages.values()))
values = sorted(values, key=lambda x: x.scale)
return [ x.symbol for x in values]
def FineSelectionFunction(self,fine):
sortedByROE = sorted(fine, key = lambda x: x.OperationRatios.ROE.OneYear)
if len(sortedByROE)>1:
self.stocks = [x for x in sortedByROE[-self.universeCount:]]
return [x.Symbol for x in sortedByROE[-self.universeCount:]]
return []
def trade_stocks(self):
if self.Portfolio.Invested:
invested = [x.Key for x in self.Portfolio if x.Value.Invested]
for security in invested:
security_in_portfolio = self.Portfolio[security]
profitPercentage = security_in_portfolio.UnrealizedProfitPercent
if profitPercentage > 0.10:
self.Liquidate(security, "---Take Profit: " + str(profitPercentage*100))
break
if profitPercentage < -0.05:
self.Liquidate(security, "---Take Loss: " + str(profitPercentage*100))
break
for security in self.stocks:
if 'SPY' in str(security.Symbol): continue
if self.Portfolio.Cash > 5000:
self.SetHoldings(security.Symbol, self.momentumpick_portfolio_percentage)
if len(self.stocks) > 0:
self.stocks = []
# class used to improve readability of the coarse selection function
class SymbolData:
def __init__(self, symbol, algo):
self.algo = algo
self.symbol = symbol
self.is_uptrend = False
self.scale = 0
self.resolution = Resolution.Daily
self.priceSMA = SimpleMovingAverage(3)
self.volSMA = SimpleMovingAverage(3)
self.priceWin = RollingWindow[float](3)
#direcitonal indicators
self._macdHistogram = RollingWindow[float](12)
self._bbupwindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._bbmidwindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._bblowindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._keltnerupwindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._keltnermidwindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._keltnerlowindow = RollingWindow[Decimal](12) # For other security types, use QuoteBar
self._mom = RollingWindow[float](12)
self.stock = self.algo.AddEquity(self.symbol,self.resolution)
self._macd = self.algo.MACD(self.symbol, 12, 26, 9, MovingAverageType.Exponential, self.resolution)
self._bb = self.algo.BB(self.symbol, 20, 2, MovingAverageType.Simple, self.resolution)
self._keltner = self.algo.KCH(self.symbol, 20, Decimal(1.5), MovingAverageType.Simple, self.resolution)
self._mom1 = self.algo.MOM(self.symbol, 20, self.resolution)
def bullSqueeze(self):
a = False
b = False
c = 3
d = 3
for num in range(0,c):
if self._bbupwindow.Samples > c and self._keltnerupwindow.Samples > c and self._bblowindow.Samples > c and self._keltnerlowindow.Samples > c:
if self._bbupwindow[num] < self._keltnerupwindow[num] and self._bblowindow[num] > self._keltnerlowindow[num]:
a = True
else:
return False
else:
return False
if a == True:
for num in range(1,d):
if self._mom.Samples > d and self._macdHistogram.Samples > d:
if self._macdHistogram[num] < self._macdHistogram[0] and self._mom[num] < self._mom[0]:
b = True
else: return False
else: return False
return b
def update(self, time, price, volume):
if self._macd.IsReady and self._keltner.IsReady and self._bb.IsReady and self.priceSMA.IsReady and self.volSMA.IsReady:
self.priceWin.Add(price)
self.priceSMA.Update(time, price)
self.volSMA.Update(time, volume)
self._macdHistogram.Add(self._macd.Histogram.Current.Value)
self._mom.Add(self._mom1.Current.Value)
self._bbupwindow.Add(self._bb.UpperBand.Current.Value)
self._bbmidwindow.Add(self._bb.MiddleBand.Current.Value)
self._bblowindow.Add(self._bb.LowerBand.Current.Value)
self._keltnerupwindow.Add(self._keltner.UpperBand.Current.Value)
self._keltnermidwindow.Add(self._keltner.MiddleBand.Current.Value)
self._keltnerlowindow.Add(self._keltner.LowerBand.Current.Value)
self.MAprice = self.priceSMA.Current.Value
self.MAvol = self.volSMA.Current.Value
self.is_uptrend = self.bullSqueeze()
if self.is_uptrend:
self.scale = ((volume - self.MAvol) / ((self.MAvol + volume) / 2.0))
def WarmUpIndicators(self, history):
for index, row in history.loc[str(self.symbol)].iterrows():
self.priceWin.Add(row["close"])
self._macd.Update(index, row["close"])
self._bb.Update(index, row["close"])
self.priceSMA.Update(index, row["close"])
self.volSMA.Update(index, row["volume"])
self._mom.Add(self._mom1.Current.Value)
if self._macd.IsReady and self._bb.IsReady:
self._macdHistogram.Add(self._macd.Histogram.Current.Value)
self._bbupwindow.Add(self._bb.UpperBand.Current.Value)
self._bbmidwindow.Add(self._bb.MiddleBand.Current.Value)
self._bblowindow.Add(self._bb.LowerBand.Current.Value)
for bar in history.itertuples():
tradeBar = TradeBar(bar.Index[1], bar.Index[0], bar.open, bar.high, bar.low, bar.close, bar.volume, timedelta(1))
self._keltner.Update(tradeBar)
if self._keltner.IsReady:
self._keltnerupwindow.Add(self._keltner.UpperBand.Current.Value)
self._keltnermidwindow.Add(self._keltner.MiddleBand.Current.Value)
self._keltnerlowindow.Add(self._keltner.LowerBand.Current.Value)