| Overall Statistics |
|
Total Trades 4 Average Win 0% Average Loss 0% Compounding Annual Return -2.673% Drawdown 0.400% Expectancy 0 Net Profit -0.222% Sharpe Ratio -2.92 Probabilistic Sharpe Ratio 0.370% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha -0.018 Beta 0.007 Annual Standard Deviation 0.007 Annual Variance 0 Information Ratio 1.219 Tracking Error 0.169 Treynor Ratio -2.787 Total Fees $98.02 Estimated Strategy Capacity $35000.00 Lowest Capacity Asset GBL RI5XSOOLWW9X |
#region imports
from AlgorithmImports import *
#endregion
class ConstantEtfAlphaModel(AlphaModel):
# Constant Alpha Model for ETFs
def __init__(self, etfTickers):
self.Name = 'ETF'
self.etfSymbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in etfTickers]
self.nextUpdateTime = datetime.min
def Update(self, algorithm, data):
insights = []
if algorithm.Time < self.nextUpdateTime:
return insights
expiry = Expiry.EndOfYear(algorithm.Time)
for symbol in self.etfSymbols:
if symbol not in data.Bars:
continue
insights.append(Insight.Price(symbol, expiry, InsightDirection.Up))
if all([algorithm.Portfolio[symbol].TotalSaleVolume > 0 for symbol in self.etfSymbols]):
self.nextUpdateTime = expiry
return insights
def OnSecuritiesChanged(self, algorithm, changes):
pass
#region imports
from AlgorithmImports import *
#endregion
import operator
import numpy as np
from collections import namedtuple
import functools
class MomentumAlphaModel(AlphaModel):
def __init__(self, algorithm, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition,
numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, minMomentumPercent, riskManagementEnabled):
self.Name = "Momentum"
self.symbolDataDict = {}
self.nextUpdateTime = datetime.min
self.numberOfMonthsUntilFullyInvested = numberOfMonthsUntilFullyInvested
self.maxNumberOfStocksToHold = maxNumberOfStocksToHold_Momentum
self.numberOfStocksToAddPerMonth = int(maxNumberOfStocksToHold_Momentum/numberOfMonthsUntilFullyInvested)
insightDuration = namedtuple('InsightDuration', 'Profit, Loss')
self.insightDuration = insightDuration(Profit=int(365/252*numberOfTradingDaysInvestedIfInProfit), Loss=int(365/252*numberOfTradingDaysInvestedIfInLoss))
self.minMomentumPercent = minMomentumPercent
self.IsInitRun = True
self.monthsSinceStart = 0
self.month = None
self.scheduledEvent = None
self.pcm = algorithm.pcm
self.riskModel = None
self.riskManagementEnabled = riskManagementEnabled
self.insightCollection = InsightCollection()
def Update(self, algorithm, data):
insights = []
# check if it's time to emit insights
if algorithm.Time < self.nextUpdateTime:
return insights
# if this is the first run and we're in live mode, we must load the insights from the object store
if self.IsInitRun:
insightsFromObjectStore = self.LoadInsightsFromObjectStore(algorithm)
insights.extend(insightsFromObjectStore)
self.monthsSinceStart += int(len(insightsFromObjectStore)/self.numberOfStocksToAddPerMonth)
self.IsInitRun = False
if self.riskModel is not None and self.riskModel.safeModeExpiryTime > algorithm.Time:
insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm))
return insights
# check if we can add more holdings for this strategy
targetCount = min(self.maxNumberOfStocksToHold, self.monthsSinceStart*self.numberOfStocksToAddPerMonth)
count = len([symbol for symbol, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry >= algorithm.Time])
if count > targetCount - self.numberOfStocksToAddPerMonth:
self.nextUpdateTime = self.GetNextUpdateTime(algorithm)
insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm))
return insights
if self.month != algorithm.Time.month:
# create the ranking (Momentum & Kaufman Efficiency Ratio) and select the best N stocks where we're not invested yet; default is N=2
filteredSymbolDataDict = {symbol : symbolData for symbol, symbolData in self.symbolDataDict.items() if (symbol in data.Bars
and symbolData.IsReady
and symbolData.Momentum > self.minMomentumPercent
and (not symbolData.Holdings.Invested))}
sortedByMomentum = [*map(operator.itemgetter(0), sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].Momentum, reverse=True))]
sortedByKER = [*map(operator.itemgetter(0), sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].KER ))]
totalRank = {symbol : sortedByMomentum.index(symbol) + sortedByKER.index(symbol) for symbol, _ in filteredSymbolDataDict.items()}
sortedByTotalRank = sorted(totalRank.items(), key = lambda x: x[1])
selection = [x[0] for x in sortedByTotalRank if not algorithm.Portfolio[x[0]].Invested][:self.numberOfStocksToAddPerMonth]
if len(selection) >= self.numberOfStocksToAddPerMonth and count >= targetCount:
self.month = algorithm.Time.month
elif len(selection) == 0:
insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm))
return insights
# create insights for the selected stocks
for symbol in selection:
symbolData = self.symbolDataDict[symbol]
if not algorithm.Portfolio[symbol].Invested:
symbolData.InsightExpiry = algorithm.Time + timedelta(days=min(self.insightDuration))
insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up))
count += 1
# adjust the time for nextInsightUpdate to avoid unnecessary computations
if len(insights) >= self.numberOfStocksToAddPerMonth or (count >=targetCount):
self.nextUpdateTime = self.GetNextUpdateTime(algorithm)
insights.extend(self.AdjustInsightExpiryTimesForActiveMomentumHoldings(algorithm))
if algorithm.LiveMode:
self.insightCollection.AddRange(insights)
return insights
def AdjustInsightExpiryTimesForActiveMomentumHoldings(self, algorithm):
# adjust insight expiry times for symbols with active insights if necessary
insights = []
for symbol, symbolData in self.symbolDataDict.items():
if not symbolData.Holdings.Invested or symbolData.InsightExpiry < algorithm.Time:
continue
if symbolData.InsightExpiry > self.nextUpdateTime + timedelta(1):
continue
if (not symbolData.Flag and symbolData.Holdings.UnrealizedProfit > 0):
symbolData.Flag = True
td = timedelta(days = self.insightDuration.Profit - self.insightDuration.Loss)
if td.days > 0:
symbolData.InsightExpiry += td
insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up))
return insights
def GetNextUpdateTime(self, algorithm):
nextInsightExpiry = min([symbolData.InsightExpiry for _, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry > algorithm.Time], default=datetime.max)
return min([Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3), nextInsightExpiry])
def OnSecuritiesChanged(self, algorithm, changes):
if self.riskManagementEnabled and self.riskModel is None and self.pcm.riskModel is not None:
self.riskModel = self.pcm.riskModel
# tracks the changes in our universe of securities
if self.scheduledEvent is None and algorithm.Benchmark is not None:
symbol = algorithm.Benchmark.Security.Symbol
self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.MonthEnd(symbol, 1), algorithm.TimeRules.Midnight, functools.partial(self.PersistInsightCollection, algorithm) )
for security in changes.RemovedSecurities:
symbolData = self.symbolDataDict.pop(security.Symbol, None)
if symbolData is not None:
symbolData.Dispose(algorithm)
for security in changes.AddedSecurities:
if not security.IsTradable or security.Fundamentals is None:
continue
symbol = security.Symbol
if symbol not in self.symbolDataDict:
self.symbolDataDict[symbol] = SymbolData(algorithm, security)
def PersistInsightCollection(self, algorithm):
# we also use this scheduled event handler to adjust self.monthsSinceStart
self.monthsSinceStart += 1
# stores all active insights from this alpha model in an object store
if not algorithm.LiveMode:
return
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
insightDict = {insight.Id : [insight.Symbol, insight.Period, insight.Direction, insight.SourceModel, insight.GeneratedTimeUtc, insight.CloseTimeUtc] for insight in activeInsights}
df = pd.DataFrame(insightDict.values(), index=insightDict.keys(), columns=['Symbol','Period','Direction','SourceModel','GeneratedTimeUtc','CloseTimeUtc'])
algorithm.ObjectStore.Save(self.Name + "InsightCollection", df.to_json(orient='split', default_handler = str))
def LoadInsightsFromObjectStore(self, algorithm):
# loads all insights from the object store if we are in live mode
# ensures we track the insight timings correctly between redeployments
if not algorithm.LiveMode:
return []
insights = []
insightCollection = InsightCollection()
key = self.Name + "InsightCollection"
if not algorithm.ObjectStore.ContainsKey(key):
return insights
storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split')
storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms')
storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True)
storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True)
filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(self.Name)
& storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime)
& storedInsightsDf.Direction.eq(1)
& storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))]
for row in filteredInsightsDf.itertuples():
symbol = SymbolCache.GetSymbol(row.Symbol)
if symbol not in self.symbolDataDict:
if symbol not in algorithm.ActiveSecurities.Keys:
continue
security = algorithm.Securities[symbol]
if not (security.IsTradable and symbol in algorithm.CurrentSlice.Bars):
continue
self.symbolDataDict[symbol] = SymbolData(algorithm, security)
self.symbolDataDict[symbol].InsightExpiry = row.CloseTimeUtc.to_pydatetime()
insight = Insight.Price(symbol, row.CloseTimeUtc.to_pydatetime(), row.Direction)
insights.append(insight)
return insights
class SymbolData:
def __init__(self, algorithm, security):
self.algorithm = algorithm
self.Security = security
self.Symbol = security.Symbol
self.Holdings = security.Holdings
self.InsightExpiry = datetime.min
self.Flag = False
self.SetupIndicators(algorithm)
def SetupIndicators(self, algorithm):
resolution = Resolution.Daily
period = self.Security.Exchange.TradingDaysPerYear
delay_period = int(period/12)
self.momp = MomentumPercent(period)
self.momentum = IndicatorExtensions.Of(Delay(delay_period), self.momp)
algorithm.RegisterIndicator(self.Symbol, self.momp, resolution)
algorithm.WarmUpIndicator(self.Symbol, self.momp, resolution)
self.ker = KaufmanEfficiencyRatio(period)
self.ker = IndicatorExtensions.Of(Delay(delay_period), self.ker)
algorithm.RegisterIndicator(self.Symbol, self.ker, resolution)
algorithm.WarmUpIndicator(self.Symbol, self.ker, resolution)
self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, resolution)
def Dispose(self, algorithm):
algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator)
@property
def Momentum(self):
return self.momentum.Current.Value
@property
def KER(self):
return self.ker.Current.Value
@property
def IsReady(self):
return self.momp.IsReady and self.ker.IsReady
class RiskModel:
def __init__(self, algorithm, security,
momentumBudget, momentumBudgetVolCond,
valueBudget, valueBudgetVolCond,
riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset):
self.algorithm = algorithm
self.Security = security
self.Symbol = security.Symbol
self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue
self.SafeMode = False
self.safeModeExpiryTime = datetime.min
self.momentumBudget = momentumBudget
self.momentumBudgetDefault = momentumBudget
self.momentumBudgetVolCond = momentumBudgetVolCond
self.valueBudget = valueBudget
self.valueBudgetDefault = valueBudget
self.valueBudgetVolCond = valueBudgetVolCond
self.riskManagementEnabled = riskManagementEnabled
self.maxDrawdown = -abs(maxDrawdown)
self.maxVIX = maxVIX
self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset)
self.budgetBySourceModel = defaultdict(float)
self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget})
self.hour = None
algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update)
@property
def BudgetBySourceModel(self):
if not self.riskManagementEnabled:
return self.budgetBySourceModel
self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget})
return self.budgetBySourceModel
def Update(self):
if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime:
self.momentumBudget = self.momentumBudgetDefault
self.valueBudget = self.valueBudgetDefault
self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown
if self.SafeMode:
self.momentumBudget = self.momentumBudgetVolCond
self.valueBudget = self.valueBudgetVolCond
self.algorithm.Debug(f"{self.algorithm.Time} | ACHTUNG !!! Volatility-Condition erfüllt!")
else:
if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown:
self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset
self.SafeMode = False
self.algorithm.Debug(f"{self.algorithm.Time} | Volatility-Condition wieder aufgehoben!")
self.algorithm.Debug(f"Safe-Mode verfällt am {self.safeModeExpiryTime}.")
@property
def CurrentDrawdown(self):
currentTPV = self.algorithm.Portfolio.TotalPortfolioValue
if currentTPV > self.portfolioHigh:
self.portfolioHigh = currentTPV
return currentTPV/self.portfolioHigh - 1
#region imports
from AlgorithmImports import *
#endregion
from collections import namedtuple
import operator
import functools
class ValueAlphaModel(AlphaModel):
def __init__(self, algorithm, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition,
min_ROA, min_PE_Ratio, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, riskManagementEnabled):
self.Name = "Value"
self.symbolDataDict = {}
self.nextUpdateTime = datetime.min
self.numberOfMonthsUntilFullyInvested = numberOfMonthsUntilFullyInvested
self.maxNumberOfStocksToHold = maxNumberOfStocksToHold_Value
self.maxNumberOfStocksToHold_Value_VolatilityCondition = maxNumberOfStocksToHold_Value_VolatilityCondition
self.numberOfStocksToAddPerMonth = int(maxNumberOfStocksToHold_Value/numberOfMonthsUntilFullyInvested)
self.min_ROA = min_ROA
self.min_PE_Ratio = min_PE_Ratio
insightDuration = namedtuple('InsightDuration', 'Profit, Loss')
self.insightDuration = insightDuration(Profit=int(365/252*numberOfTradingDaysInvestedIfInProfit), Loss=int(365/252*numberOfTradingDaysInvestedIfInLoss))
self.monthsSinceStart = 0
self.month = None
self.IsInitRun = True
self.scheduledEvent = None
self.insightCollection = InsightCollection()
self.pcm = algorithm.pcm
self.riskModel = None
self.riskManagementEnabled = riskManagementEnabled
def Update(self, algorithm, data):
# !!!!!!!!!!!!
######## DEBUG
if algorithm.Time > datetime(2020, 3, 1):
BREAK = "BREAK!"
######## END OF DEBUG
# !!!!!!!!!!!!!!!!!
insights = []
# check if it's time to emit insights
if algorithm.Time < self.nextUpdateTime:
return insights
# if this is the first run and we're in live mode, we must load the insights from the object store
if self.IsInitRun:
insightsFromObjectStore = self.LoadInsightsFromObjectStore(algorithm)
insights.extend(insightsFromObjectStore)
self.monthsSinceStart += int(len(insightsFromObjectStore)/self.numberOfStocksToAddPerMonth)
self.IsInitRun = False
targetCount = min(self.maxNumberOfStocksToHold, self.monthsSinceStart*self.numberOfStocksToAddPerMonth)
# adjust the target holdings count if volatility condition is met
if self.riskManagementEnabled and self.riskModel is not None and (self.riskModel.SafeMode or self.riskModel.safeModeExpiryTime > algorithm.Time):
targetCount = self.riskModel.NumberOfStocksBySourceModel[self.Name]
count = len([symbol for symbol, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry >= algorithm.Time])
# check if we can add more holdings for this strategy
if count > targetCount - self.numberOfStocksToAddPerMonth:
self.nextUpdateTime = self.GetNextUpdateTime(algorithm)
insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm))
return insights
if self.month != algorithm.Time.month:
# create ranking for this strategy (based on EVtoEBIT)
filteredSymbolDataDict = {symbol : symbolData for symbol, symbolData in self.symbolDataDict.items() if (symbol in data.Bars
and symbolData.ROA > self.min_ROA
and symbolData.PE_Ratio > self.min_PE_Ratio
and (not symbolData.Holdings.Invested))}
sortedByEVToEBIT = sorted(filteredSymbolDataDict.items(), key = lambda x: x[1].EVtoEBIT)
selection = [*map(operator.itemgetter(0), sortedByEVToEBIT)][:self.numberOfStocksToAddPerMonth]
if len(selection) >= self.numberOfStocksToAddPerMonth and count >= targetCount:
self.month = algorithm.Time.month
elif len(selection) == 0:
insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm))
return insights
# create insights for the selected stocks
for symbol in selection:
symbolData = self.symbolDataDict[symbol]
if not algorithm.Portfolio[symbol].Invested:
symbolData.InsightTime = algorithm.Time
symbolData.InsightExpiry = algorithm.Time + timedelta(days=min(self.insightDuration))
insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up))
count +=1
# adjust the time for nextInsightUpdate to avoid unnecessary computations
if len(insights) >= self.numberOfStocksToAddPerMonth or (count >=targetCount):
self.nextUpdateTime = self.GetNextUpdateTime(algorithm)
insights.extend(self.AdjustInsightExpiryTimesForActiveValueHoldings(algorithm))
if algorithm.LiveMode:
self.insightCollection.AddRange(insights)
return insights
def AdjustInsightExpiryTimesForActiveValueHoldings(self, algorithm):
# adjust insight expiry times for symbols with active insights if necessary
insights = []
for symbol, symbolData in self.symbolDataDict.items():
if not symbolData.Holdings.Invested or symbolData.InsightExpiry < algorithm.Time:
continue
if symbolData.InsightExpiry > self.nextUpdateTime + timedelta(1):
continue
if (not symbolData.Flag and symbolData.Holdings.UnrealizedProfit > 0):
symbolData.Flag = True
td = timedelta(days = self.insightDuration.Profit - self.insightDuration.Loss)
if td.days > 0:
symbolData.InsightExpiry += td
insights.append(Insight.Price(symbol, symbolData.InsightExpiry, InsightDirection.Up))
return insights
def GetNextUpdateTime(self, algorithm):
nextInsightExpiry = min([symbolData.InsightExpiry for _, symbolData in self.symbolDataDict.items() if symbolData.Holdings.Invested and symbolData.InsightExpiry > algorithm.Time], default=datetime.max)
return min([Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3), nextInsightExpiry])
def OnSecuritiesChanged(self, algorithm, changes):
if self.riskManagementEnabled and self.riskModel is None and self.pcm.riskModel is not None:
self.riskModel = self.pcm.riskModel
if self.scheduledEvent is None and algorithm.Benchmark is not None:
symbol = algorithm.Benchmark.Security.Symbol
self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.MonthEnd(symbol, 1), algorithm.TimeRules.Midnight, functools.partial(self.PersistInsightCollection, algorithm) )
for security in changes.RemovedSecurities:
symbolData = self.symbolDataDict.pop(security.Symbol, None)
if symbolData is not None:
symbolData.Dispose(algorithm)
for security in changes.AddedSecurities:
if not security.IsTradable or security.Fundamentals is None:
continue
symbol = security.Symbol
if symbol not in self.symbolDataDict:
self.symbolDataDict[symbol] = SymbolData(algorithm, security)
def PersistInsightCollection(self, algorithm):
# we also use this scheduled event handler to adjust self.monthsSinceStart
self.monthsSinceStart += 1
# stores all active insights from this alpha model in an object store
if not algorithm.LiveMode:
return
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
insightDict = {insight.Id : [insight.Symbol, insight.Period, insight.Direction, insight.SourceModel, insight.GeneratedTimeUtc, insight.CloseTimeUtc] for insight in activeInsights}
df = pd.DataFrame(insightDict.values(), index=insightDict.keys(), columns=['Symbol','Period','Direction','SourceModel','GeneratedTimeUtc','CloseTimeUtc'])
algorithm.ObjectStore.Save(self.Name + "InsightCollection", df.to_json(orient='split', default_handler = str))
def LoadInsightsFromObjectStore(self, algorithm):
if not algorithm.LiveMode:
return []
insights = []
insightCollection = InsightCollection()
key = self.Name + "InsightCollection"
if not algorithm.ObjectStore.ContainsKey(key):
return []
storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split')
storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms')
storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True)
storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True)
filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(self.Name)
& storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime)
& storedInsightsDf.Direction.eq(1)
& storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))]
for row in filteredInsightsDf.itertuples():
symbol = SymbolCache.GetSymbol(row.Symbol)
if symbol not in self.symbolDataDict:
security = algorithm.Securities[symbol]
if not (security.IsTradable and symbol in algorithm.CurrentSlice.Bars):
continue
self.symbolDataDict[symbol] = SymbolData(algorithm, security)
self.symbolDataDict[symbol].InsightExpiry = row.CloseTimeUtc.to_pydatetime()
insight = Insight.Price(symbol, row.CloseTimeUtc.to_pydatetime(), row.Direction)
insights.append(insight)
return insights
class SymbolData:
def __init__(self, algorithm, security):
self.algorithm = algorithm
self.Security = security
self.Symbol = security.Symbol
self.Holdings = security.Holdings
self.InsightExpiry = datetime.min
self.Flag = False
self.InsightTime = datetime.min
self.Consolidator = algorithm.ResolveConsolidator(self.Symbol, Resolution.Daily)
def Dispose(self, algorithm):
algorithm.SubscriptionManager.RemoveConsolidator(self.Symbol, self.Consolidator)
@property
def ROA(self):
return self.Security.Fundamentals.OperationRatios.ROA.OneYear
@property
def PE_Ratio(self):
return self.Security.Fundamentals.ValuationRatios.PERatio
@property
def EVtoEBIT(self):
return self.Security.Fundamentals.ValuationRatios.EVtoEBIT
class RiskModel:
def __init__(self, algorithm, security,
momentumBudget, momentumBudgetVolCond,
valueBudget, valueBudgetVolCond,
riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset):
self.algorithm = algorithm
self.Security = security
self.Symbol = security.Symbol
self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue
self.SafeMode = False
self.safeModeExpiryTime = datetime.min
self.momentumBudget = momentumBudget
self.momentumBudgetDefault = momentumBudget
self.momentumBudgetVolCond = momentumBudgetVolCond
self.valueBudget = valueBudget
self.valueBudgetDefault = valueBudget
self.valueBudgetVolCond = valueBudgetVolCond
self.riskManagementEnabled = riskManagementEnabled
self.maxDrawdown = -abs(maxDrawdown)
self.maxVIX = maxVIX
self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset)
self.budgetBySourceModel = defaultdict(float)
self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget})
self.hour = None
algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update)
@property
def BudgetBySourceModel(self):
if not self.riskManagementEnabled:
return self.budgetBySourceModel
self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget})
return self.budgetBySourceModel
def Update(self):
if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime:
self.momentumBudget = self.momentumBudgetDefault
self.valueBudget = self.valueBudgetDefault
self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown
if self.SafeMode:
self.momentumBudget = self.momentumBudgetVolCond
self.valueBudget = self.valueBudgetVolCond
self.algorithm.Debug(f"{self.algorithm.Time} | ACHTUNG !!! Volatility-Condition erfüllt!")
else:
if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown:
self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset
self.SafeMode = False
self.algorithm.Debug(f"{self.algorithm.Time} | Volatility-Condition wieder aufgehoben!")
self.algorithm.Debug(f"Safe-Mode verfällt am {self.safeModeExpiryTime}.")
@property
def CurrentDrawdown(self):
currentTPV = self.algorithm.Portfolio.TotalPortfolioValue
if currentTPV > self.portfolioHigh:
self.portfolioHigh = currentTPV
return currentTPV/self.portfolioHigh - 1
#region imports
from AlgorithmImports import *
#endregion
############
# def GetLatestLivePortfolioStateFromObjectStore(self, algorithm):
# if not ( self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): #algorithm.LiveMode and ## <--- wieder hinzufügen!!!
# return []
# portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState'))
# if portfolioState is None or portfolioState.empty:
# return []
# portfolioState.loc[:,'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms')
# portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms')
# previousLiveHoldings = portfolioState.loc[portfolioState.Exit.isnull()].index.map(SymbolCache.GetSymbol).tolist()
# self.IsInitRun = False
# return previousLiveHoldings
# self.positionChangesDict = defaultdict(PortfolioSnapshot)
# self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0,0), self.UpdateCharts)
# def UpdateCharts(self):
# numHoldings = len([holding for _, holding in self.Portfolio.items() if holding.Invested])
# self.Plot('Portfolio Holdings', 'Number of Holdings', numHoldings)
# def OnOrderEvent(self, orderEvent):
# # if not self.LiveMode:
# # return
# if orderEvent.Status == OrderStatus.Filled:
# symbol = orderEvent.Symbol
# if not self.Portfolio[symbol].Invested and symbol in self.positionChangesDict:
# # das heißt es handelt sich hierbei um einen Exit
# self.positionChangesDict.update({symbol : self.positionChangesDict[symbol]._replace(Exit = self.Time, Quantity = 0)})
# return
# self.positionChangesDict.update({symbol : self.positionChangesDict[symbol]._replace(Entry = self.Time, Quantity = orderEvent.Quantity)})
# def OnEndOfAlgorithm(self):
# if not self.LiveMode:
# return
# for key in list([x.Key for x in self.ObjectStore.GetEnumerator()]):
# if key.endswith('InsightCollection'):
# self.ObjectStore.Delete(key)
# hier die positionChangedDict im ObjectStore speichern, falls LiveMode
# if not self.LiveMode:
# return
# if self.ObjectStore.ContainsKey('PortfolioState'):
# self.ObjectStore.Delete('PortfolioState')
# self.ObjectStore.Save('PortfolioState', pd.DataFrame(self.positionChangesDict.values(), index=self.positionChangesDict.keys()).to_json(default_handler=str))
#############
# def GetObjectStoreSymbols(self, algorithm): # SymbolsFromObjectStore(self, algorithm):
# if not (self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')):
# return 0
# portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState'))
# if portfolioState is None or portfolioState.empty:
# return 0
# portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms')
# portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms')
# latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \
# .index.map(SymbolCache.GetSymbol).tolist()
# # .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \
# # .rename(lambda x: SymbolCache.GetSymbol(x)) \
# # .to_dict()
# self.IsInitRun = False
# return latestPortfolioState
# return len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested])
#####################
#########
# currentTPV = self.algorithm.Portfolio.TotalPortfolioValue
# if currentTPV > self.portfolioHigh:
# self.portfolioHigh = currentTPV
# drawdown = currentTPV/self.portfolioHigh - 1
# # targets.extend(self.CreateTargetsForLivePortfolioStateRecovery(algorithm))
# if not self.latestLivePortfolioState:
# self.latestLivePortfolioState = self.GetLatestLivePortfolioStateFromObjectStore(algorithm)
# for symbol, portfolioSnapshot in list(self.latestLivePortfolioState.items()):
# if algorithm.Time - portfolioSnapshot.Entry > timedelta(days=265):
# target = PortfolioTarget(symbol, 0)
# targets.append(target)
# del self.latestLivePortfolioState[symbol]
# # self.latestLivePortfolioState.pop(symbol)
# continue
# else:
# if not algorithm.Portfolio[symbol].Invested:
# target = PortfolioTarget(symbol, portfolioSnapshot.Quantity)
# residualBuyingPower -= abs(target.Quantity*algorithm.Securities[symbol].AskPrice)
# targets.append(target)
# # if algorithm.Portfolio[symbol].Invested and (algorithm.Time - portfolioSnapshot.Entry < timedelta(days=265)):
# # continue
# # if not algorithm.Portfolio[symbol].Invested and (algorithm.Time - portfolioSnapshot.Entry < timedelta(days=265)):
# # target = PortfolioTarget(symbol, portfolioSnapshot.Quantity)
# # residualBuyingPower -= abs(target.Quantity*algorithm.Securities[symbol].Price)
# # if target is None:
# # self.symbolsToIgnore.add(symbol)
# # continue
# # targets.append(target)
###
# def GetLatestLivePortfolioStateFromObjectStore(self, algorithm):
# if not ( self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): #algorithm.LiveMode and
# return {}
# portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState'))
# if portfolioState is None or portfolioState.empty:
# return {}
# portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms')
# portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms')
# # previousLiveHoldings = portfolioState.loc[portfolioState.Exit.isnull()].index.map(SymbolCache.GetSymbol).tolist()
# # previousLiveHoldings = portfolioState.Quantity.rename(lambda x: SymbolCache.GetSymbol(x)).to_dict()
# # exit = null means there wasn't an exit yet, i.e. we have an active position for that security
# latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \
# .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \
# .rename(lambda x: SymbolCache.GetSymbol(x)) \
# .to_dict()
# # algorithm.ObjectStore.Delete('PortfolioState')
# self.IsInitRun = False
# return latestPortfolioState
############
# def GetObjectStoreRelatedSymbols(self, algorithm): # SymbolsFromObjectStore(self, algorithm):
# if not (self.IsInitRun and algorithm.ObjectStore.ContainsKey('PortfolioState')): # LIVE_MODE !
# return 0
# portfolioState = pd.read_json(algorithm.ObjectStore.Read('PortfolioState'))
# if portfolioState is None or portfolioState.empty:
# return 0
# portfolioState.loc[:, 'Entry'] = pd.to_datetime(portfolioState.Entry.clip(lower=0).replace(0, np.nan), unit='ms')
# portfolioState.loc[:, 'Exit'] = pd.to_datetime(portfolioState.Exit.clip(lower=0).replace(0, np.nan), unit='ms')
# latestPortfolioState = portfolioState.loc[portfolioState.Exit.isnull() & portfolioState.Quantity.gt(0)] \
# .index.map(SymbolCache.GetSymbol).tolist()
# # .apply(lambda x: PortfolioSnapshot(Entry = x.Entry, Exit = x.Exit, Quantity = x.Quantity), axis=1) \
# # .rename(lambda x: SymbolCache.GetSymbol(x)) \
# # .to_dict()
# self.IsInitRun = False
# return latestPortfolioState
# # CHECK = len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested]) ###DEBUG
# # return len([symbol for symbol in latestPortfolioState if algorithm.Portfolio[symbol].Invested])
#region imports
from AlgorithmImports import *
#endregion
import math
from helper_tools import sign
from collections import defaultdict
class CustomExecutionModel(ExecutionModel):
def __init__(self):
self.targetCollection = PortfolioTargetCollection()
self.tol = .1
def Execute(self, algorithm, targets):
self.targetCollection.AddRange(targets)
if self.targetCollection.Count > 0:
for target in self.targetCollection.OrderByMarginImpact(algorithm):
orderQuantity = OrderSizing.GetUnorderedQuantity(algorithm, target)
if orderQuantity == 0:
continue
symbol = target.Symbol
security = algorithm.Securities[symbol]
holding = algorithm.Portfolio[symbol]
if security.IsTradable:
marginRemaining = algorithm.Portfolio.MarginRemaining
# we only need to check the remaining buying power if the order execution would increase the margin usage
if not holding.Invested and holding.Quantity*orderQuantity >= 0 and marginRemaining < security.Price*abs(orderQuantity):
buyingPower = min(1, security.Leverage)*marginRemaining*(1 - self.tol)
orderQuantity = min(math.floor(buyingPower/security.Price), abs(orderQuantity))*sign(orderQuantity)
if orderQuantity != 0:
algorithm.MarketOrder(symbol, orderQuantity)
self.targetCollection.ClearFulfilled(algorithm)
#region imports
from AlgorithmImports import *
#endregion
from collections import defaultdict, namedtuple
PortfolioSnapshot = namedtuple('PortfolioSnapshot', 'Entry, Exit, Quantity')
PortfolioSnapshot.__new__.__defaults__ = (datetime.min, )*(len(PortfolioSnapshot._fields) - 1) + (0, )
# faster alternative for itertools.groupby
def GroupBy(iterable, key = lambda x: x):
d = defaultdict(list)
for item in iterable:
d[key(item)].append(item)
return d.items()
def sign(x):
return 1 if x > 0 else(-1 if x < 0 else 0)
#region imports
from AlgorithmImports import *
#endregion
from helper_tools import PortfolioSnapshot
from datetime import datetime, timedelta, timezone
from collections import namedtuple, defaultdict
# ----------------------------------------------------------
# Import the Framework Modules for this strategy
from selection import CustomUniverseSelectionModel
from alpha_mom import MomentumAlphaModel
from alpha_value import ValueAlphaModel
from alpha_etf import ConstantEtfAlphaModel
from portfolio import MultiSourcesPortfolioConstructionModel
from execution import CustomExecutionModel
# ----------------------------------------------------------
class MomentumValueStrategy(QCAlgorithm):
def Initialize(self):
########################################################################
# --------------------------------------------------------------------
# Modifiable code block
# --------------------------------------------------------------------
########################################################################
# General Settings
# --------------------
self.SetStartDate(2022, 8, 1)
# self.SetEndDate(2021, 2, 1)
self.SetCash(1_000_000)
numberOfSymbolsCoarse = 2000
numberOfSymbolsFine = 1000
numberOfMonthsUntilFullyInvested = 12
numberOfTradingDaysInvestedIfInProfit = 260
numberOfTradingDaysInvestedIfInLoss = 260
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage)
self.UniverseSettings.Resolution = Resolution.Minute
self.Settings.FreePortfolioValuePercentage = 0.0
self.DebugMode = False
# Momentum Strategy Params
# -------------------------
maxNumberOfStocksToHold_Momentum = 24
maxNumberOfStocksToHold_Momentum_VolatilityCondition = 24
minMomentumPercent = 0.1
maxBudget_Momentum = 0.5
maxBudget_Momentum_VolatilityCondition = 0.5
# Value Strategy Params
# ----------------------
maxNumberOfStocksToHold_Value = 24
maxNumberOfStocksToHold_Value_VolatilityCondition = 48
min_ROA = 0.15
min_PE_Ratio = 5
maxBudget_Value = 0.5
maxBudget_Value_VolatilityCondition = 0.5
# Risk Management / Volatility Condition Params
# ----------------------------------------------
riskManagementEnabled = False
maxDrawdown = 0.2
maxVIX = 35
numberOfTradingDaysBeforeReset = 265
# ETF Investing Params
# ------------------------
etfTickers = ["XXXZ11"]
minPortfolioBufferToEnableEtfInvesting = 0.1
########################################################################
# ----------------------------------------------------------------------
# End of modifiable code block
# ----------------------------------------------------------------------
########################################################################
self.InitCharts()
self.UniverseSettings.Leverage = 1
self.pcm = MultiSourcesPortfolioConstructionModel(maxBudget_Momentum, maxBudget_Momentum_VolatilityCondition, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition,
maxBudget_Value, maxBudget_Value_VolatilityCondition, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition,
numberOfTradingDaysBeforeReset, riskManagementEnabled, maxDrawdown, maxVIX, minPortfolioBufferToEnableEtfInvesting)
self.AddUniverseSelection(CustomUniverseSelectionModel(numberOfSymbolsCoarse, numberOfSymbolsFine, etfTickers))
self.AddAlpha(MomentumAlphaModel(self, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Momentum, maxNumberOfStocksToHold_Momentum_VolatilityCondition,
numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, minMomentumPercent, riskManagementEnabled))
self.AddAlpha(ValueAlphaModel(self, numberOfMonthsUntilFullyInvested, maxNumberOfStocksToHold_Value, maxNumberOfStocksToHold_Value_VolatilityCondition,
min_ROA, min_PE_Ratio, numberOfTradingDaysInvestedIfInProfit, numberOfTradingDaysInvestedIfInLoss, riskManagementEnabled))
self.AddAlpha(ConstantEtfAlphaModel(etfTickers))
self.SetPortfolioConstruction(self.pcm)
self.SetExecution(CustomExecutionModel())
def InitCharts(self):
spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.SetBenchmark(spy)
chart = Chart('Number of Stocks')
chart.AddSeries(Series('Total', SeriesType.Line, 0, ""))
chart.AddSeries(Series('ETF', SeriesType.Line, 0, ""))
chart.AddSeries(Series('Momentum', SeriesType.Line, 0, ""))
chart.AddSeries(Series('Value', SeriesType.Line, 0, ""))
self.AddChart(chart)
#region imports
from AlgorithmImports import *
#endregion
from collections import defaultdict
from datetime import datetime, timedelta, timezone
import math
import functools
from helper_tools import GroupBy, PortfolioSnapshot
class MultiSourcesPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, momentumBudget, momentumBudgetVolCond, momentumNumHoldings, momentumNumHoldingsVolCond,
valueBudget, valueBudgetVolCond, valueNumHoldings, valueNumHoldingsVolCond,
numberOfTradingDaysBeforeReset, riskManagementEnabled, maxDrawdown, maxVIX, minPortfolioBufferToEnableEtfInvesting):
self.insightCollection = InsightCollection()
self.removedSymbols = []
self.nextExpiryTime = datetime.min.replace(tzinfo=timezone.utc)
self.updateFreq = timedelta(1)
self.symbolsToIgnore = set()
self.riskModel = None
self.vix = Symbol.Create("VIX", SecurityType.Index, Market.USA)
self.momentumBudget = momentumBudget
self.momentumBudgetVolCond = momentumBudgetVolCond
self.numberOfStocksMomentum = momentumNumHoldings
self.numberOfStocksMomentumVolCond = momentumNumHoldingsVolCond
self.valueBudget = valueBudget
self.valueBudgetVolCond = valueBudgetVolCond
self.numberOfStocksValue = valueNumHoldings
self.numberOfStocksValueVolCond = valueNumHoldingsVolCond
self.numberOfTradingDaysBeforeReset = numberOfTradingDaysBeforeReset
self.riskManagementEnabled = riskManagementEnabled
self.maxDrawdown = maxDrawdown
self.maxVIX = maxVIX
self.etfInvestingEnabled = True
self.minPortfolioBufferToEnableEtfInvesting = minPortfolioBufferToEnableEtfInvesting
self.scheduledEvent = None
self.IsInitRun = True
self.prevRiskMode = False
def CreateTargets(self, algorithm, insights):
targets = []
if not self.CanUpdateTargets(algorithm, insights):
return targets
self.insightCollection.AddRange(insights)
targets.extend(self.CreateFlatTargetsForRemovedSecurities())
targets.extend(self.CreateFlatTargetsForExpiredInsights(algorithm))
lastActiveInsights = self.GetLastActiveInsights(algorithm)
if self.ShouldUpdateTargets(algorithm, lastActiveInsights):
targets.extend(self.UpdatePortfolioTargets(algorithm, lastActiveInsights))
self.UpdateNextExpiryTime(algorithm)
return targets
def CanUpdateTargets(self, algorithm, insights):
return len(insights) > 0 or algorithm.UtcTime > self.nextExpiryTime
def CreateFlatTargetsForRemovedSecurities(self):
if len(self.removedSymbols) == 0:
return []
flatTargets = [PortfolioTarget(symbol, 0) for symbol in self.removedSymbols]
self.insightCollection.Clear(self.removedSymbols)
self.removedSymbols = []
return flatTargets
def CreateFlatTargetsForExpiredInsights(self, algorithm):
flatTargets = []
expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
if len(expiredInsights) == 0:
return flatTargets
for symbol, _ in GroupBy(expiredInsights, key = lambda insight: insight.Symbol):
if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime):
flatTargets.append(PortfolioTarget(symbol, 0))
continue
return flatTargets
def GetLastActiveInsights(self, algorithm):
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
lastActiveInsights = []
for k,g in GroupBy(activeInsights, key = lambda insight: (insight.Symbol, insight.SourceModel)):
lastActiveInsights.append(sorted(g, key = lambda insight: insight.GeneratedTimeUtc)[-1])
return lastActiveInsights
def ShouldUpdateTargets(self, algorithm, lastActiveInsights):
if algorithm.UtcTime > self.nextExpiryTime:
return True
for insight in lastActiveInsights:
if insight.Symbol in self.symbolsToIgnore:
continue
holding = algorithm.Portfolio[insight.Symbol]
if insight.Direction != InsightDirection.Flat and (not holding.Invested):
return True
if insight.Direction == InsightDirection.Up and (not holding.IsLong):
return True
if insight.Direction == InsightDirection.Down and (not holding.IsShort):
return True
if insight.Direction == InsightDirection.Flat and holding.Invested:
return True
else:
continue
return False
def UpdateNextExpiryTime(self, algorithm):
self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
if self.nextExpiryTime is None:
self.nextExpiryTime = algorithm.UtcTime + self.updateFreq
self.nextExpiryTime = min(self.nextExpiryTime, algorithm.UtcTime + self.updateFreq)
def UpdatePortfolioTargets(self, algorithm, lastActiveInsights):
targets = []
buffer = algorithm.Settings.FreePortfolioValuePercentage
residualBuyingPower = algorithm.Portfolio.MarginRemaining/algorithm.Portfolio.TotalPortfolioValue/algorithm.UniverseSettings.Leverage - buffer
# create PortfolioTargets for Momentum and Value Alpha Model Insights, taking into account Volatility Condition
for insight in lastActiveInsights:
if (insight.SourceModel == 'ETF') or (insight.Symbol in self.symbolsToIgnore):
continue
if algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat:
continue
count = self.riskModel.NumberOfStocksBySourceModel[insight.SourceModel]
budget = self.riskModel.BudgetBySourceModel[insight.SourceModel]
if count*budget == 0:
target = PortfolioTarget(insight.Symbol, 0)
targets.append(target)
continue
weight = (budget * insight.Direction - buffer)/ count
target = PortfolioTarget.Percent(algorithm, insight.Symbol, weight)
if target is None:
self.symbolsToIgnore.add(insight.Symbol)
continue
targets.append(target)
residualBuyingPower -= abs(weight)
self.prevRiskMode = self.riskModel.SafeMode
# create portfolio targets for ETFs, if necessary
if self.etfInvestingEnabled:
count = len([symbol for symbol, holding in algorithm.Portfolio.items() if holding.Invested])
etfInsightSymbols = [insight.Symbol for insight in lastActiveInsights if insight.SourceModel == 'ETF' and insight.Direction != InsightDirection.Flat]
count -= len(etfInsightSymbols)
if count >= self.numberOfStocksValue + self.numberOfStocksMomentum:
etfWeight = 0
self.etfInvestingEnabled = False
if len(etfInsightSymbols) == 0:
return targets
if self.etfInvestingEnabled:
leverage = max(1, algorithm.UniverseSettings.Leverage)
currentEtfAllocation = sum([holding.AbsoluteHoldingsValue for symbol,holding in algorithm.Portfolio.items() if symbol in etfInsightSymbols]) \
/algorithm.Portfolio.TotalPortfolioValue
if (currentEtfAllocation + residualBuyingPower > self.minPortfolioBufferToEnableEtfInvesting):
etfWeight = (currentEtfAllocation/leverage + residualBuyingPower)/len(etfInsightSymbols)
else:
etfWeight = 0
self.etfInvestingEnabled = False
for insight in lastActiveInsights:
if insight.SourceModel == 'ETF' and insight.Symbol not in self.symbolsToIgnore:
target = PortfolioTarget.Percent(algorithm, insight.Symbol, etfWeight*insight.Direction)
if target is None:
self.symbolsToIgnore.add(insight.Symbol)
continue
targets.append(target)
return targets
def OnSecuritiesChanged(self, algorithm, changes):
if self.scheduledEvent is None:
self.scheduledEvent = algorithm.Schedule.On(algorithm.DateRules.EveryDay(), algorithm.TimeRules.Midnight, functools.partial(self.UpdateCharts, algorithm))
for security in changes.RemovedSecurities:
symbol = security.Symbol
if symbol.Equals(self.vix):
continue
self.removedSymbols.append(symbol)
if symbol in self.symbolsToIgnore:
self.symbolsToIgnore.remove(symbol)
for security in changes.AddedSecurities:
if security.Symbol.Equals(self.vix) and self.riskModel is None:
self.riskModel = RiskModel(algorithm, security, self.momentumBudget, self.momentumBudgetVolCond, self.numberOfStocksMomentum, self.numberOfStocksMomentumVolCond,
self.valueBudget, self.valueBudgetVolCond, self.numberOfStocksValue, self.numberOfStocksValueVolCond,
self.riskManagementEnabled, self.maxDrawdown, self.maxVIX, self.numberOfTradingDaysBeforeReset)
def UpdateCharts(self, algorithm):
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
holdingsCountBySourceModel = defaultdict(int)
for k,g in GroupBy(activeInsights, key = lambda insight: insight.SourceModel):
holdingsCountBySourceModel[k] = len({x.Symbol for x in g if x.Direction != InsightDirection.Flat and algorithm.Portfolio[x.Symbol].Invested})
for sourceModel in ['ETF', 'Momentum', 'Value']:
algorithm.Plot('Number of Stocks', sourceModel, int(holdingsCountBySourceModel[sourceModel]))
totalCount = len([holding for _,holding in algorithm.Portfolio.items() if holding.Invested])
algorithm.Plot('Number of Stocks', 'Total', totalCount)
class RiskModel:
def __init__(self, algorithm, security,
momentumBudget, momentumBudgetVolCond, numberOfStocksMomentum, numberOfStocksMomentumVolCond,
valueBudget, valueBudgetVolCond, numberOfStocksValue, numberOfStocksValueVolCond,
riskManagementEnabled, maxDrawdown, maxVIX, numberOfTradingDaysBeforeReset):
self.algorithm = algorithm
self.Security = security
self.Symbol = security.Symbol
self.portfolioHigh = algorithm.Portfolio.TotalPortfolioValue
self.SafeMode = False
self.safeModeExpiryTime = datetime.min
self.momentumBudget = momentumBudget
self.momentumBudgetDefault = momentumBudget
self.momentumBudgetVolCond = momentumBudgetVolCond
self.numberOfStocksMomentum = numberOfStocksMomentum
self.numberOfStocksMomentumDefault = numberOfStocksMomentum
self.numberOfStocksMomentumVolCond = numberOfStocksMomentumVolCond
self.valueBudget = valueBudget
self.valueBudgetDefault = valueBudget
self.valueBudgetVolCond = valueBudgetVolCond
self.numberOfStocksValue = numberOfStocksValue
self.numberOfStocksValueDefault = numberOfStocksValue
self.numberOfStocksValueVolCond = numberOfStocksValueVolCond
self.numberOfStocksBySourceModel = {'Momentum': numberOfStocksMomentum, 'Value': numberOfStocksValue}
self.riskManagementEnabled = riskManagementEnabled
self.maxDrawdown = -abs(maxDrawdown)
self.maxVIX = maxVIX
self.numberOfTradingDaysBeforeReset = timedelta(days=numberOfTradingDaysBeforeReset)
self.budgetBySourceModel = defaultdict(float)
self.budgetBySourceModel.update({'Momentum':momentumBudget, 'Value':valueBudget})
self.hour = None
algorithm.Schedule.On(algorithm.DateRules.EveryDay(self.Symbol), algorithm.TimeRules.Every(timedelta(hours=1)), self.Update)
@property
def BudgetBySourceModel(self):
if not self.riskManagementEnabled:
return self.budgetBySourceModel
self.budgetBySourceModel.update({"Momentum":self.momentumBudget, "Value":self.valueBudget})
return self.budgetBySourceModel
@property
def NumberOfStocksBySourceModel(self):
if not self.riskManagementEnabled:
return self.numberOfStocksBySourceModel
self.numberOfStocksBySourceModel.update({'Momentum':self.numberOfStocksMomentum, 'Value':self.numberOfStocksValue})
return self.numberOfStocksBySourceModel
def Update(self):
if not self.SafeMode and self.algorithm.Time > self.safeModeExpiryTime:
self.momentumBudget = self.momentumBudgetDefault
self.valueBudget = self.valueBudgetDefault
self.numberOfStocksMomentum = self.numberOfStocksMomentumDefault
self.numberOfStocksValue = self.numberOfStocksValueDefault
self.SafeMode = self.Security.Price > self.maxVIX and self.CurrentDrawdown < self.maxDrawdown
if self.SafeMode:
self.momentumBudget = self.momentumBudgetVolCond
self.valueBudget = self.valueBudgetVolCond
self.numberOfStocksMomentum = self.numberOfStocksMomentumVolCond
self.numberOfStocksValue = self.numberOfStocksValueVolCond
self.algorithm.Debug(f"{self.algorithm.Time} | Volatility Condition triggered!\nAdjusting Portfolio Targets and permitted Number of Stocks for each Alpha Model.")
else:
if self.SafeMode and self.Security.Price < self.maxVIX and self.CurrentDrawdown > self.maxDrawdown:
self.safeModeExpiryTime = self.algorithm.Time + self.numberOfTradingDaysBeforeReset
self.SafeMode = False
self.algorithm.Debug(f"{self.algorithm.Time} | Volatility Condition removed!\nKeep Risk Management Settings unchanged until {self.safeModeExpiryTime}.")
@property
def CurrentDrawdown(self):
currentTPV = self.algorithm.Portfolio.TotalPortfolioValue
if currentTPV > self.portfolioHigh:
self.portfolioHigh = currentTPV
return currentTPV/self.portfolioHigh - 1
#region imports
from AlgorithmImports import *
#endregion
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class CustomUniverseSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self, maxNumberOfSymbolsCoarse, maxNumberOfSymbolsFine, etfTickers):
self.nextSelectionUpdate = datetime.min
self.maxNumberOfSymbolsCoarse = maxNumberOfSymbolsCoarse
self.maxNumberOfSymbolsFine = maxNumberOfSymbolsFine
self.etfSymbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in etfTickers]
self.vix = Symbol.Create("VIX", SecurityType.Index, Market.USA)
self.IsInitRun = True
super().__init__(True, None)
def SelectCoarse(self, algorithm, coarse):
if algorithm.Time < self.nextSelectionUpdate:
return Universe.Unchanged
TPV = algorithm.Portfolio.TotalPortfolioValue
coarseFiltered = [c for c in coarse if (c.HasFundamentalData and (c.Price < TPV/100) and (algorithm.Time - c.Symbol.ID.Date).days > 365)]
sortedByDollarVolume = sorted(coarseFiltered, key = lambda c: c.DollarVolume, reverse = False)
return [c.Symbol for c in sortedByDollarVolume][:self.maxNumberOfSymbolsCoarse]
def SelectFine(self, algorithm, fine):
filteredFine = [f for f in fine if (f.CompanyReference.CountryId in {"USA"}
and f.CompanyReference.PrimaryExchangeID in {"NYS", "NAS"}
and (algorithm.Time - f.SecurityReference.IPODate).days > 360
and f.CompanyReference.IndustryTemplateCode not in {"I", "B"}
and 1e8 < f.MarketCap #< 1e10
and (f.FinancialStatements.BalanceSheet.TotalLiabilitiesAsReported.Value - f.FinancialStatements.BalanceSheet.CashAndCashEquivalents.Value
< f.FinancialStatements.BalanceSheet.TangibleBookValue.Value*1.25 ))]
# Note: .Value refers to the default period field which is "TwelveMonths". You can also use "ThreeMonths".
sortedByMarketCap = sorted(filteredFine, key = lambda f: f.MarketCap, reverse=False)
self.nextSelectionUpdate = Expiry.EndOfMonth(algorithm.Time) - timedelta(days=3)
selection = [f.Symbol for f in sortedByMarketCap][:self.maxNumberOfSymbolsFine] + self.etfSymbols + [self.vix] + self.CurrentHoldings(algorithm)
selection.extend(self.LoadSymbolsFromObjectStore(algorithm))
return selection
def LoadSymbolsFromObjectStore(self, algorithm):
# load symbols from the object store which insights haven't expired yet to ensure we have a data feed for those symbols
if not (algorithm.LiveMode and self.IsInitRun):
return []
suffix = 'InsightCollection'
keys = [x.Key for x in algorithm.ObjectStore if x.Key.endswith(suffix)]
dfList = []
symbols = []
for key in keys:
storedInsightsDf = pd.read_json(algorithm.ObjectStore.Read(key), orient='split')
storedInsightsDf.loc[:, 'Period'] = pd.to_timedelta(storedInsightsDf.Period, unit='ms')
storedInsightsDf.loc[:, 'GeneratedTimeUtc'] = pd.to_datetime(storedInsightsDf.GeneratedTimeUtc, unit='ms', utc=True)
storedInsightsDf.loc[:, 'CloseTimeUtc'] = pd.to_datetime(storedInsightsDf.CloseTimeUtc, unit='ms', utc=True)
filteredInsightsDf = storedInsightsDf.loc[(storedInsightsDf.SourceModel.eq(key.split(suffix)[0])
& storedInsightsDf.CloseTimeUtc.gt(algorithm.UtcTime)
& storedInsightsDf.Direction.eq(1)
& storedInsightsDf.GeneratedTimeUtc.lt(algorithm.UtcTime))]
if not filteredInsightsDf.empty:
dfList.append(filteredInsightsDf)
if len(dfList) > 0:
df = pd.concat(dfList)
symbols = df.Symbol.apply(SymbolCache.GetSymbol).unique().tolist()
self.IsInitRun = False
return symbols
def CurrentHoldings(self, algorithm):
return [security.Key for security in algorithm.ActiveSecurities if security.Value.Holdings.Invested]