import numpy as np
class CrossSectionalMomentum(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(1000000)
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelection)
self.SetAlpha(ReversalFakeoutAlpha())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetRiskManagement(TrailingStopRiskManagementModel(0.02))
self.SetExecution(ImmediateExecutionModel())
def CoarseSelection(self, coarse):
sortedCoarse = sorted(coarse, key=lambda c:c.DollarVolume, reverse=True)
return [c.Symbol for c in sortedCoarse][:1000]
class ReversalFakeoutAlpha(AlphaModel):
def __init__(self):
self.symbols = {}
self.lastWeek = -1
def Update(self, algorithm, data):
insights = []
# Update daily rolling windows with daily data
for symbol, symbolData in self.symbols.items():
if data.ContainsKey(symbol) and data[symbol] != None:
symbolData.dailyWindow.Add(data[symbol])
# Retrieve week number from datetime
thisWeek = algorithm.Time.isocalendar()[1]
# Only rebalance once a week
if self.lastWeek == thisWeek:
return insights
self.lastWeek = thisWeek
# Retrieve all symbols that are ready
symbols = [symbol for symbol, symbolData in self.symbols.items() if symbolData.IsReady]
# Sort by annualized returns in descending order
sortedByReturns = sorted(symbols, key=lambda s: self.symbols[s].AnnualizedReturn, reverse=True)
winningSymbols = sortedByReturns[:100]
losingSymbols = sortedByReturns[-100:]
# Sort symbols by continuity and filter for symbols with recent reversals
longContinuity = [symbol for symbol in sorted(winningSymbols, key=lambda s: self.symbols[s].Continuity, reverse=False) if self.symbols[symbol].RecentDownTrend]
shortContinuity = [symbol for symbol in sorted(losingSymbols, key=lambda s: self.symbols[s].Continuity, reverse=True) if self.symbols[symbol].RecentUpTrend]
# Create insights for the top 5 most continous symbols which are in a recent reversal
insights += [Insight.Price(symbol, timedelta(days = 5), InsightDirection.Up) for symbol in longContinuity[:5]]
insights += [Insight.Price(symbol, timedelta(days = 5), InsightDirection.Down) for symbol in shortContinuity[:5]]
return insights
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
if symbol not in self.symbols:
self.symbols[symbol] = SymbolData(algorithm, symbol)
for security in changes.RemovedSecurities:
symbol = security.Symbol
if symbol in self.symbols:
# Remove consolidators from algorithm and remove symbol from dictionary
algorithm.SubscriptionManager.RemoveConsolidator(symbol, self.symbols[symbol].monthlyConsolidator)
algorithm.SubscriptionManager.RemoveConsolidator(symbol, self.symbols[symbol].dailyConsolidator)
self.symbols.pop(symbol, None)
class SymbolData:
def __init__(self, algorithm, symbol):
self.algorithm = algorithm
self.symbol = symbol
# Define daily and monthly rolling windows
self.monthlyWindow = RollingWindow[TradeBar](13)
self.dailyWindow = RollingWindow[TradeBar](280)
# Define daily and monthly consolidators
self.monthlyConsolidator = algorithm.Consolidate(symbol, Calendar.Monthly, self.OnMonthlyData)
self.dailyConsolidator = TradeBarConsolidator(timedelta(days = 1))
#ZO CHANGES
#MIGHT need to use MINUTE (to consolidate UP to 15M?)
self.fifteenminConsolidator = TradeBarConsolidator(timedelta(minutes = 15))
algorithm.SubscriptionManager.AddConsolidator(symbol, self.fifteenminConsolidator)
self.BBD = BollingerBands(20,1,MovingAverageType.Simple)
self.BB15 = BollingerBands(20,1,MovingAverageType.Simple)
algorithm.RegisterIndicator(symbol, self.BBD, self.dailyConsolidator)
algorithm.RegisterIndicator(symbol, self.BB15, self.fifteenminConsolidator)
#ZO
# Register daily consolistor to algorithm
algorithm.SubscriptionManager.AddConsolidator(symbol, self.dailyConsolidator)
# Define and register ADX indicator
self.adxThreshold = 25
self.adx = AverageDirectionalIndex(20)
algorithm.RegisterIndicator(symbol, self.adx, self.dailyConsolidator)
# Use historical data to warmup rolling windows, consolidators, and indicators
history = algorithm.History(symbol, 280, Resolution.Daily)
for bar in history.itertuples():
tbar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
self.dailyWindow.Add(tbar)
self.monthlyConsolidator.Update(tbar)
self.fifteenminConsolidator.Update(tbar)
self.adx.Update(tbar)
def OnMonthlyData(self, bar):
"""Adds monthly bars to monthly rolling window"""
self.monthlyWindow.Add(bar)
@property
def Continuity(self):
"""Returns the difference between losing days and winning days as a percentage of trading days"""
positives = 0
negatives = 0
for bar in self.dailyWindow:
dreturn = (bar.Close-bar.Open/bar.Open)
if dreturn > 0:
positives += 1
else:
negatives += 1
return (negatives - positives)/(negatives + positives)
@property
def AnnualizedReturn(self):
"""Returns the 12 month compounded monthly return over a 13 month lookback period
skipping the latest month."""
returns = []
for bar in self.monthlyWindow:
monthlyReturn = (bar.Close/bar.Open)
returns.append(monthlyReturn)
returns.pop(0)
return np.prod(returns) - 1
@property
def RecentDownTrend(self):
"""Returns true if the ADX is above a given threshold and DX+ is lower than DX-"""
return self.adx.Current.Value > self.adxThreshold and \
self.adx.PositiveDirectionalIndex.Current.Value < self.adx.NegativeDirectionalIndex.Current.Value
@property
def RecentUpTrend(self):
"""Returns true if the ADX is above a given threshold and DX+ is higher than DX-"""
return self.adx.Current.Value > self.adxThreshold and \
self.adx.PositiveDirectionalIndex.Current.Value > self.adx.NegativeDirectionalIndex.Current.Value
@property
def IsReady(self):
"""Returns true if all the rolling windows and indicators are ready: have been updated
with enough inputs to yield valid values"""
return self.monthlyWindow.IsReady and self.dailyWindow.IsReady and self.adx.IsReady
import numpy as np
class CrossSectionalMomentum(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 1)
self.SetEndDate(2019, 1, 2)
self.SetCash(1000000)
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelection, self.FineSelection)
self.SetAlpha(CounterTrendAlpha())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.SetRiskManagement(TrailingStopRiskManagementModel(0.02))
self.SetExecution(ImmediateExecutionModel())
def CoarseSelection(self, coarse):
sbVol = sorted(coarse, key = lambda c: c.DollarVolume, reverse=True)[:100]
return [s.Symbol for s in sbVol]
def FineSelection(self, fine):
#sbFCFG = sorted(fine, key = lambda x: x.ValuationRatios.EVToEBITDA, reverse=True)
posEVTEB = [fine for fine in fine if fine.ValuationRatios.EVToEBITDA > 0]
posFCFG = [fine for fine in posEVTEB if fine.OperationRatios.FCFGrowth.OneYear > 0]
#SPY_ONLY = [fine for fine in fine if fine.Symbol[:3] == 'SPY']
return [f.Symbol for f in posFCFG]
class CounterTrendAlpha(AlphaModel):
def __init__(self):
self.symbols = {}
self.lastWeek = -1
def Update(self, algorithm, data):
insights = []
# Update daily rolling windows with daily data -- Thanks !
for symbol, symbolData in self.symbols.items():
if data.ContainsKey(symbol) and data[symbol] != None:
symbolData.dailyWindow.Add(data[symbol])
#algorithm.Debug(f'{symbol}, {symbolData}')
syms = [sym for sym, data in self.symbols.items() if data.IsReady]
#syms = [symbol for symbol, symbolData in self.symbols.items()] #if symbolData.IsReady]
#self.longs = [sym for sym in syms if sym.]
self.longs = [symbol for symbol, SymbolData in self.symbols.items() if SymbolData.long_cross]
self.shorts = [symbol for symbol, SymbolData in self.symbols.items() if SymbolData.short_cross]
for sym in self.longs:
self.Debug(f'{self.Securities[sym].Price}')
insights += [Insight.Price(symbol, timedelta(days=99), InsightDirection.Up) for symbol in self.longs]
insights += [Insight.Price(symbol, timedelta(days=99), InsightDirection.Down) for symbol in self.shorts]
#Alpha Models DONT EMIT -- simply RETURN insights.. PCM EMITS?
return insights
def OnSecuritiesChanged(self, algorithm, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
if symbol not in self.symbols:
self.symbols[symbol] = SymbolData(algorithm, symbol) #INITIALIZE Indics etc
algorithm.Debug(f'{self.symbols[symbol]}')
for security in changes.RemovedSecurities:
symbol = security.Symbol
if symbol in self.symbols:
# Remove consolidators from algorithm and remove symbol from dictionary -- GC Equiv.
algorithm.SubscriptionManager.RemoveConsolidator(symbol, self.symbols[symbol].fmCons)
algorithm.SubscriptionManager.RemoveConsolidator(symbol, self.symbols[symbol].dailyCons)
self.symbols.pop(symbol, None)
class SymbolData:
'''Get required indicators, windows, etc'''
#DONT NEED HISTORY dont think -- can CALL INSIDE and DECOUPLE FURTHER
def __init__(self, algorithm, symbol):
self.algo = algorithm
self.symbol = symbol
self.dailyWindow = RollingWindow[TradeBar](10)
self.fmWindow = RollingWindow[TradeBar](10)
self.minWindow = RollingWindow[TradeBar](10)
self.uppers = RollingWindow[float](5)
self.lowers = RollingWindow[float](5)
#Consolidators...
self.dailyCons = TradeBarConsolidator(timedelta(days = 1))
self.fmCons = TradeBarConsolidator(timedelta(minutes = 15))
self.minCons = TradeBarConsolidator(timedelta(minutes = 1))
algorithm.SubscriptionManager.AddConsolidator(symbol, self.fmCons)
algorithm.SubscriptionManager.AddConsolidator(symbol, self.dailyCons)
algorithm.SubscriptionManager.AddConsolidator(symbol, self.minCons)
#Indicators + Subscriptions
self.BBD = BollingerBands(20,1,MovingAverageType.Simple)
self.BB15 = BollingerBands(20,1, MovingAverageType.Simple)
self.MA = SimpleMovingAverage(12)
algorithm.RegisterIndicator(symbol, self.BBD, self.dailyCons)
algorithm.RegisterIndicator(symbol, self.BB15, self.fmCons)
algorithm.RegisterIndicator(symbol, self.MA, self.dailyCons)
#This might need to be minute for 15M consolidator
history = algorithm.History(symbol, 35, Resolution.Daily) #MIGHT need to be minute
for bar in history.itertuples():
tb = TradeBar(bar.Index[1], bar.open, bar.high, bar.low, bar.close, bar.volume)
#Consolidators
self.dailyCons.Update(tb)
self.fmCons.Update(tb)
self.minCons.Update(tb)
#Windows
self.dailyWindow.Add(tb)
self.fmWindow.Add(tb)
self.minWindow.Add(tb)
#Indicators
self.BBD.Update(tb.EndTime, tb.Close)
self.BB15.Update(tb.EndTime, tb.Close)
self.MA.Update(tb.EndTime, tb.Close)
#Indic Windows
self.uppers.Add(self.BBD.UpperBand.Current.Value)
self.lowers.Add(self.BBD.LowerBand.Current.Value)
def res_and_sup(self):
#Helper function -- for Resistance and Support levels.
if self.IsReady:
center = (self.dailyWindow[0].Open + self.dailyWindow[1].Close) / 2
R2 = center + .5 * self.MA.Current.Value
S2 = center - .5 * self.MA.Current.Value
return R2, S2
@property
def long_cross(self):
#hist = self.History(self.symbol, 1,Resolution.Minute)
cond1, cond2 = False, False
if self.IsReady:
R2, _ = self.res_and_sup()
cond1 = self.BB15.LowerBand.Current.Value > self.uppers[3]
cond2 = self.fmWindow[0].Close > R2
return cond1 and cond2
@property
def short_cross(self):
#hist = self.History(self.symbol, 1,Resolution.Minute)
cond1, cond2 = False, False
if self.IsReady:
_, S2 = self.res_and_sup()
cond1 = self.BB15.UpperBand.Current.Value < self.lowers[3]
cond2 = self.fmWindow[0].Close < S2
return cond1 and cond2
#Very fucking cool ! Love this implementation style -- Very C#.
@property
def pivlo(self):
for i in range(1,8):
if self.fmWindow[i].Close > self.fmWindow[i + 1].Close:
return False
if self.fmWindow[0].Close < self.fmWindow[1].Close:
return False
return True
@property
def pivhi(self):
for i in range(1,8):
if self.fmWindow[i].Close < self.fmWindow[i + 1].Close:
return False
if self.fmWindow[0].Close > self.fmWindow[1].Close:
return False
return True
@property
def IsReady(self):
return self.BBD.IsReady and self.BB15.IsReady and self.MA.IsReady and self.uppers.IsReady and self.lowers.IsReady
'''
Changes --
Changed logic of cond2's - why does self.Securities.Price not work?
'''