| Overall Statistics |
|
Total Trades 14353 Average Win 0.15% Average Loss -0.09% Compounding Annual Return 39.243% Drawdown 14.300% Expectancy 0.539 Net Profit 2993.856% Sharpe Ratio 1.846 Probabilistic Sharpe Ratio 99.618% Loss Rate 41% Win Rate 59% Profit-Loss Ratio 1.62 Alpha 0.236 Beta 0.331 Annual Standard Deviation 0.146 Annual Variance 0.021 Information Ratio 0.999 Tracking Error 0.167 Treynor Ratio 0.815 Total Fees $614549.05 Estimated Strategy Capacity $2600000.00 Lowest Capacity Asset VIXM UT076X30D0MD |
#region imports
from AlgorithmImports import *
#endregion
def GetAlphaParameters(self, alphaName=None, parameter=None):
stocksTicker = 'TQQQ'
spyTicker = 'SPXL'
bondsTicker = 'TMF'
cashTicker = 'IEF'
inflTicker = 'TIP'
goldTicker = 'DGP'
volatilityTicker = 'UVXY'
volTicker = 'VIXM'
mmTicker = 'UUP'
oilTicker = 'UCO'
#volatilityFactor = 1
minutesAfterStart = 90
alphaParametersDict = {'RotationalOptimizer': {'activate': True,
'riskyAssetsParameters': {
'SPY': {'addTicker': [True, spyTicker],
'sma': [200, (-0.10, 0.10), 0.0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'QQQ': {'addTicker': [True, stocksTicker],
'sma': [200, (-0.20, 0.20), 0.0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'TLT': {'addTicker': [True, bondsTicker],
'sma': [300, (-0.15, 0.15), 0.0],
'macd': [(50, 150, 30), 0, 0.0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'GLD': {'addTicker': [True, goldTicker],
'sma': [100, (-0.10, 0.10), 0.0],
'macd': [(50, 150, 30), 0, 0.0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'UUP':{'addTicker': [False, 'UUP'],
'sma': [200, (-0.20, 0.20), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [False, 0.20],
'optimizationBounds': (0, 0.2)},
},
'lookbackOptimization': 63,
'objFunction': 'minVariance',
'tickerCash': cashTicker,
'rebalancingFrequency': (self.DateRules.WeekStart, minutesAfterStart),
'optimizationBounds': (0.01, 1.00)},
#'optimizationBounds': (0.40, 1.00)},
'RotationalOptimizer2x': {'activate': False,
'riskyAssetsParameters': {
'SPY': {'addTicker': [True, spyTicker],
'sma': [200, (-0.10, 0.10), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'QQQ': {'addTicker': [True, stocksTicker],
'sma': [200, (-0.20, 0.20), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'TLT': {'addTicker': [True, bondsTicker],
'sma': [300, (-0.15, 0.15), 0],
'macd': [(50, 150, 30), 0, 0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'GLD': {'addTicker': [True, goldTicker],
'sma': [100, (-0.10, 0.10), 0],
'macd': [(50, 150, 30), 0, 0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'UUP':{'addTicker': [False, mmTicker],
'sma': [200, (-0.20, 0.20), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [False, 0.20],
'optimizationBounds': (0, 1)},
},
'lookbackOptimization': 42,
'objFunction': 'minVariance',
'tickerCash': cashTicker,
'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart),
'optimizationBounds': (0.01, 1.00)},
'RotationalOptimizer1x': {'activate': False,
'riskyAssetsParameters': {
'SPY': {'addTicker': [True, spyTicker],
'sma': [200, (-0.10, 0.10), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'QQQ': {'addTicker': [True, stocksTicker],
'sma': [200, (-0.20, 0.20), 0],
'macd': [(231, 567, 168), 0, 0.3],
'yield': [True, 0.20],
'optimizationBounds': (0, 1)},
'TLT': {'addTicker': [True, bondsTicker],
'sma': [300, (-0.15, 0.15), 0],
'macd': [(50, 150, 30), 0, 0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'GLD': {'addTicker': [True, goldTicker],
'sma': [100, (-0.10, 0.10), 0],
'macd': [(50, 150, 30), 0, 0],
'yield': [False, 0],
'optimizationBounds': (0, 1)},
'UUP':{'addTicker': [True, mmTicker],
'sma': [200, (-0.20, 0.20), 1],
'macd': [(231, 567, 168), 0, 1],
'yield': [False, 0.20],
'optimizationBounds': (0, 1)},
},
'lookbackOptimization': 21,
'objFunction': 'minVariance',
'tickerCash': cashTicker,
'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart),
'optimizationBounds': (0.01, 0.02)},
'BuyAndHoldSVOL': {'activate': False,
'tickersWeights': {'TIP': 0.80, 'IEF': 0.00, mmTicker : 0.10, stocksTicker: 0.10, bondsTicker: 0.00, goldTicker: 0.00, cashTicker: 0, spyTicker: 0},
'rebalancingFrequency': (self.DateRules.MonthStart, minutesAfterStart),
'optimizationBounds': (0.01, 0.01)},
'VixTermStructure': {'activate': True,
'tickersWeights': { volatilityTicker: [0.08, 0.00, 0.08, 0.08, 0.16, 0.80],
stocksTicker: [0.65, 0.40, 0.40, 0.32, 0.32, 0.00],
bondsTicker: [0.00, 0.12, 0.08, 0.08, 0.08, 0.00],
volTicker: [0.08, 0.00, 0.08, 0.08, 0.08, 0.08],
cashTicker: [0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
goldTicker: [0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
inflTicker: [0.00, 0.00, 0.00, 0.00, 0.00, 0.00],
mmTicker: [0.19, 0.48, 0.36, 0.44, 0.36, 0.12]
},
'canaryTicker': 'SPY',
'periodShortSMA': 3,
'periodLongSMA': 13,
'contangoThresholds': [1.25, 0.923],
'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart),
'optimizationBounds': (0.01, 0.02)},
#'optimizationBounds': (0.25, 1.00)},
'MovingAverageSPY': {'activate': False,
'tickersWeights': {
stocksTicker : [0.20, 0.20, 0.00],
bondsTicker: [0.00, 0.00, 0.00],
goldTicker: [0.00, 0.00, 0.00],
cashTicker: [0.00, 0.00, 0.00],
inflTicker: [0.60, 0.60, 0.50],
mmTicker: [0.20, 0.20, 0.50],
},
'canaryTicker': 'SPY',
'periodShortSMA': 1,
'periodLongSMA': 168,
'movingAverageThresholds': [1.1, 1.0],
'rebalancingFrequency': (self.DateRules.WeekStart, minutesAfterStart),
'optimizationBounds': (0.01, 0.20)},
'InAndOut': {'activate': True,
'assetsWeights': {'stocksProxy': [stocksTicker, 0.6],
'bondsProxy': [bondsTicker, 0.4],
'cashProxy': [cashTicker, 0],
'inflProxy': [inflTicker, 0],
'goldProxy': [goldTicker, 0],
'volatilityProxy': [volatilityTicker, 0],
'volProxy': [volTicker, 0],
'mmProxy': [mmTicker, 0],
'GSY': ['GSY', 0]
},
'rebalancingFrequency': (self.DateRules.EveryDay, minutesAfterStart),
'optimizationBounds': (0.50, 1.00)},
#'optimizationBounds': (0.99, 1.00)},
}
# if no filtering elements provided, return the whole dictionary
if alphaName is None and parameter is None:
return alphaParametersDict
return alphaParametersDict[alphaName][parameter]
#region imports
from AlgorithmImports import *
#endregion
import pandas as pd
import general_utils as utils
import json
class BuyAndHoldAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.rebalancingFrequency = rebalancingFrequency
self.weightsDict = {}
self.timeToEmitInsights = False
self.weightsRetrieved = False
self.monthStartIsHere = False
# retrieve information from object store in live mode
if self.algo.LiveMode:
dictFromOS = utils.ReadFromObjectStore(self.algo, 'BuyAndHoldSVOL')
if dictFromOS:
self.algo.Log('BuyAndHoldSVOL; retrieving last optimal weights from object store: ' + str(dictFromOS))
self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()}
self.weightsRetrieved = True
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def GenerateSignals(self):
""" Create new signals """
if not self.activateAlpha:
return
if self.monthStartIsHere or self.weightsRetrieved:
self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value]
for security in self.algo.ActiveSecurities.Values
if security.Symbol.Value in self.parametersDict}
self.weightsRetrieved = False
self.timeToEmitInsights = True
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()}
utils.SaveToObjectStore(self.algo, 'BuyAndHoldSVOL', dictToOS)
self.monthStartIsHere = False
def MonthStart(self):
self.monthStartIsHere = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol),
algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]),
self.MonthStart)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol),
algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]),
self.GenerateSignals)
#region imports
from AlgorithmImports import *
#endregion
from itertools import groupby
from OptimizerClass import PortfolioOptimizer
import numpy as np
import pandas as pd
import general_utils as utils
class CompositePortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, algorithm, alphaModels, optimizationBounds, alphaPortfolioOptLookback=21,
alphaPortfolioOptObjFunction='minVariance'):
self.algo = algorithm
self.alphasInsightReturns = {alphaModel: {} for alphaModel in alphaModels} # nested dict for insight returns
self.alphasEqualWeighting = 1 / len(self.alphasInsightReturns.keys())
self.alphasDailyReturns = {alphaModel: {} for alphaModel in alphaModels} # nested dict for daily returns
self.alphasCumReturns = {alphaModel: 1 for alphaModel in alphaModels} # store alphas cumulative returns
self.alphasLastPeak = {alphaModel: 1 for alphaModel in alphaModels} # store alphas last peak for DD calc
self.algo.alphasDD = {alphaModel: 0 for alphaModel in alphaModels} # store alphas DD
self.algo.leverageFactors = {alphaModel: 0 for alphaModel in alphaModels} # store alphas DD
self.alphasOptWeights = {alphaModel: 0 for alphaModel in alphaModels} # store alphas optimal allocations
self.alphasAccuracy = {alphaModel: 0 for alphaModel in alphaModels} # store alphas accuracy
self.entryDirection = {alphaModel: {} for alphaModel in alphaModels} # store alphas insights entry direction
self.previousPrice = {alphaModel: {} for alphaModel in alphaModels} # store alphas insights previous price
# variables for alpha weight optimization
self.alphaPortfolioOptLookback = alphaPortfolioOptLookback # number of days for rolling window of returns
self.alphaPortfolioOptObjFunction = alphaPortfolioOptObjFunction
self.allowAlphaWeightOptimization = True
self.optAlphaAllocationReady = False
self.optimizer = PortfolioOptimizer()
self.optimizationBounds = optimizationBounds
self.insightCollection = InsightCollection()
self.lastActiveInsights = []
self.removedSymbols = []
self.portfolioLastValue = 0
self.portfolioCumReturns = 1
self.portfolioLastPeak = 1
self.DDDecay = 1
self.lastDD = 0
self.symbolsWeights = {}
# retrieve information from object store in live mode
if self.algo.LiveMode:
alphasOptWeightsOS = utils.ReadFromObjectStore(self.algo, 'AlphasOptWeights')
if alphasOptWeightsOS:
self.alphasOptWeights = alphasOptWeightsOS
self.optAlphaAllocationReady = True
self.algo.Log('Portfolio; retrieving alphasOptWeights from object store: ' + str(self.alphasOptWeights))
alphasDdOS = utils.ReadFromObjectStore(self.algo, 'AlphasDD')
if alphasDdOS:
self.algo.alphasDD = alphasDdOS
self.algo.Log('Portfolio; retrieving alphasDD from object store: ' + str(self.algo.alphasDD))
alphasDDDecay = utils.ReadFromObjectStore(self.algo, 'DD-Decay')
if alphasDDDecay:
self.DDDecay = alphasDDDecay
self.algo.Log('Portfolio; retrieving Decay Factor from object store: ' + str(self.DDDecay))
alphasCumReturns = utils.ReadFromObjectStore(self.algo, 'Cum-Returns')
alphasLastPeak = utils.ReadFromObjectStore(self.algo, 'Last-Peak')
if alphasCumReturns and alphasLastPeak:
self.portfolioCumReturns = alphasCumReturns
self.portfolioLastPeak = alphasLastPeak
self.algo.Log('Portfolio; retrieving Cumulative Returns from object store: ' + str(self.portfolioCumReturns))
self.algo.Log('Portfolio; retrieving Last Peak from object store: ' + str(self.portfolioLastPeak))
algorithm.Schedule.On(algorithm.DateRules.EveryDay(), algorithm.TimeRules.At(0, 1), self.GetTodayDate)
self.GetTodayDate()
def CreateTargets(self, algorithm, insights):
"""
Description:
Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portfolio targets from
Returns:
An enumerable of portfolio targets to be sent to the execution model
"""
targets = []
# skip if no action to be taken
if not self.ShouldCreateTargets(algorithm, insights) and self.removedSymbols is None:
return targets
# create flatten target for each security that was removed from the universe
if self.removedSymbols is not None:
universeDeselectionTargets = [PortfolioTarget(symbol, 0) for symbol in self.removedSymbols]
algorithm.Log('liquidate any open positions for removed symbols: '
+ str([x.Value for x in self.removedSymbols]))
targets.extend(universeDeselectionTargets)
self.removedSymbols = None
# add new insights to insight collection
self.insightCollection.AddRange(insights)
# more than one alpha model can generate insights for the same symbol
# we group by alpha model and symbol, and keep the most recent insight
self.lastActiveInsights = []
for sourceModel, f in groupby(sorted(self.insightCollection, key=lambda ff: ff.SourceModel),
lambda fff: fff.SourceModel):
for symbol, g in groupby(sorted(list(f), key=lambda gg: gg.Symbol), lambda ggg: ggg.Symbol):
self.lastActiveInsights.append(sorted(g, key=lambda x: x.GeneratedTimeUtc)[-1])
# calculate targets ----------------------------------------------------
symbolsNewWeights = {}
for insight in self.lastActiveInsights:
if not algorithm.ActiveSecurities.ContainsKey(insight.Symbol):
continue
# check if we can do optimal alpha allocation and if we have enough data for it
if self.optAlphaAllocationReady:
# insight weight * the optimal allocation for the model
weight = insight.Direction * insight.Weight * self.alphasOptWeights[insight.SourceModel]
else:
weight = insight.Direction * insight.Weight * self.alphasEqualWeighting
# apply leverage
weight *= self.algo.accountLeverage
# update returns if we close the position with flat insight
if (insight.Direction == InsightDirection.Flat
and self.entryDirection[insight.SourceModel].get(insight.Symbol) is not None
and self.entryDirection[insight.SourceModel].get(insight.Symbol) != 0):
self.UpdateReturnsDictionaries(algorithm, insight)
self.entryDirection[insight.SourceModel].pop(insight.Symbol)
self.previousPrice[insight.SourceModel].pop(insight.Symbol)
elif insight.Direction != InsightDirection.Flat:
if (self.entryDirection[insight.SourceModel].get(insight.Symbol) is None
and self.previousPrice[insight.SourceModel].get(insight.Symbol) is None):
self.entryDirection[insight.SourceModel][insight.Symbol] = insight.Direction
# simulate entry price
self.previousPrice[insight.SourceModel][insight.Symbol] = algorithm.ActiveSecurities[insight.Symbol].Price
# add weight to weightBySymbol
if insight.Symbol not in symbolsNewWeights:
symbolsNewWeights[insight.Symbol] = weight
else:
symbolsNewWeights[insight.Symbol] += weight
algorithm.Log(str(insight.SourceModel) + '; ' + str(insight.Symbol.Value) + '; '
+ str(insight.Direction) + '; ' + str(round(weight, 2)))
countWeightChangesAbovethreshold = self.CountWeightChangesAboveThreshold(symbolsNewWeights)
if countWeightChangesAbovethreshold > 0 or not self.symbolsWeights or self.algo.stopLossTriggered == True:
self.symbolsWeights = symbolsNewWeights
self.algo.stopLossTriggered == False
else:
return targets
if self.algo.allAssetsBelowMA and self.algo.uup == 1:
usdallocation = 1
else:
usdallocation = 0.1
# we add together all the weights for each symbol
for symbol, finalWeight in self.symbolsWeights.items():
if symbol.Value == 'UUP':
target = PortfolioTarget.Percent(algorithm, symbol, finalWeight*self.DDDecay + (1-self.DDDecay)*usdallocation)
elif symbol.Value == 'GSY':
target = PortfolioTarget.Percent(algorithm, symbol, finalWeight*self.DDDecay + (1-self.DDDecay)*(1-usdallocation))
else:
target = PortfolioTarget.Percent(algorithm, symbol, finalWeight*self.DDDecay)
targets.append(target)
algorithm.Log('final weights; ' + str(symbol.Value) + '; ' + str(round(finalWeight, 2)))
#self.algo.Plot('Alphas Ticker Allocation', symbol.Value, float(round(finalWeight, 2)) * 100)
return targets
def EveryDayAtOpen(self):
""" Update dictionaries every day at the open """
# update and plot portfolio cumulative returns
self.PlotPortfolioInfo()
# update daily returns -------------------------------------------------
for insight in self.lastActiveInsights:
if (not self.algo.ActiveSecurities.ContainsKey(insight.Symbol)
or not self.algo.ActiveSecurities[insight.Symbol].Invested
or insight.Direction == InsightDirection.Flat):
continue
self.UpdateReturnsDictionaries(self.algo, insight)
# update previous price
self.previousPrice[insight.SourceModel][insight.Symbol] = self.algo.ActiveSecurities[insight.Symbol].Price
# calculate the optimal weights ----------------------------------------
# create a dataframe of returns of last n days from the dictionary alphasDailyReturns
dailyReturnsDf = pd.DataFrame(self.alphasDailyReturns)[-self.alphaPortfolioOptLookback:]
dailyReturnsDf = dailyReturnsDf.dropna()
# run optimization if needed, otherwise keep using the alphasOptWeights from last optimization
if self.allowAlphaWeightOptimization:
# wait until we have at least a week of returns
if len(dailyReturnsDf) >= 5:
try:
bounds = [self.optimizationBounds[x] for x in dailyReturnsDf.columns]
newAlphasOptWeights = self.optimizer.Optimize(objFunction=self.alphaPortfolioOptObjFunction,
dailyReturnsDf=dailyReturnsDf, bounds=bounds)
self.alphasOptWeights = pd.Series(newAlphasOptWeights, index=dailyReturnsDf.columns).to_dict()
# Add left-over allocation to default alpha
leftoverallocation = max(0,1 - sum(self.alphasOptWeights.values()))
if leftoverallocation !=0:
self.alphasOptWeights['BuyAndHoldSVOL'] = self.alphasOptWeights['BuyAndHoldSVOL'] + leftoverallocation
self.algo.Debug('LO Allc ' + str(leftoverallocation) + '/n ' + 'BH Allc ' + str(self.alphasOptWeights['BuyAndHoldSVOL']))
self.allowAlphaWeightOptimization = False
self.algo.Log('optimal weights for alphas: ' + '\n' + str(newAlphasOptWeights))
except BaseException as e:
self.algo.Log('Optimize failed due to ' + str(e))
# check if all alphas have been optimized and have positive weights
self.optAlphaAllocationReady = False
if all(x > 0 for x in self.alphasOptWeights.values()):
self.optAlphaAllocationReady = True
# for each alpha model -------------------------------------------------
for key in self.alphasInsightReturns:
# update the cumulative returns for each alpha model and plot it
filteredDailyReturns = pd.DataFrame.from_dict({key: value
for key, value in self.alphasDailyReturns[key].items()
if not np.isnan(value)}, orient='index')
self.alphasCumReturns[key] = (filteredDailyReturns.add(1).cumprod().iloc[-1]
if not filteredDailyReturns.empty else pd.Series(1))[0]
#self.algo.Plot('Alphas Cumulative Returns (%)', key, float((self.alphasCumReturns[key] - 1) * 100))
# update DD for each alpha and plot it
if self.alphasCumReturns[key] > self.alphasLastPeak[key]:
self.alphasLastPeak[key] = self.alphasCumReturns[key]
self.algo.alphasDD[key] = (self.alphasCumReturns[key] / self.alphasLastPeak[key]) - 1
#self.algo.Plot('Alphas DD (%)', key, float(self.alphasDD[key]) * 100)
# get the optimal allocation per alpha and plot it
#if self.optAlphaAllocationReady:
# self.algo.Plot('Alphas Allocation (%)', key, float(self.alphasOptWeights[key] * 100))
#else:
# self.algo.Plot('Alphas Allocation (%)', key, float(self.alphasEqualWeighting * 100))
# calculate the accuracy per model for last n insights and plot it
# these are the individual returns of last n insights
# (we're using lookback times X, assuming X insights per day)
#flatReturns = [val for sublist in self.alphasInsightReturns[key].values()
# for val in sublist][-(self.alphaPortfolioOptLookback * 5):]
#if len(flatReturns) > 0:
# self.alphasAccuracy[key] = sum(x > 0 for x in flatReturns) / len(flatReturns)
# self.algo.Plot('Alphas Accuracy (% of Positive Days)', key, float(self.alphasAccuracy[key] * 100))
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
utils.SaveToObjectStore(self.algo, 'AlphasOptWeights', self.alphasOptWeights)
utils.SaveToObjectStore(self.algo, 'AlphasDD', self.algo.alphasDD)
utils.SaveToObjectStore(self.algo, 'DD-Decay', self.DDDecay)
utils.SaveToObjectStore(self.algo, 'Cum-Returns', self.portfolioCumReturns)
utils.SaveToObjectStore(self.algo, 'Last-Peak', self.portfolioLastPeak)
def TriggerAlphaWeightOptimization(self):
""" Allow alpha weight optimization to happen """
self.allowAlphaWeightOptimization = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
# get removed symbol and invalidate them in the insight collection
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
self.insightCollection.Clear(self.removedSymbols)
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol),
algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes+10),
self.EveryDayAtOpen)
algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol),
algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes ),
self.TriggerAlphaWeightOptimization)
def ShouldCreateTargets(self, algorithm, insights):
"""
Description:
Determine whether we should rebalance the portfolio when:
- We want to include some new security in the portfolio
- We want to modify the direction of some existing security
Args:
insights: The last set of insights sent
"""
for insight in insights:
if not algorithm.Portfolio[insight.Symbol].Invested and insight.Direction != InsightDirection.Flat:
return True
elif algorithm.Portfolio[insight.Symbol].Invested:
return True
return False
def CalculateInsightReturn(self, algorithm, insight):
""" Calculate the returns from the insights of each model as if 100% was invested in the model,
with daily rebalancing """
insightReturn = 0
if (algorithm.ActiveSecurities[insight.Symbol].Price != 0
and self.entryDirection[insight.SourceModel].get(insight.Symbol) is not None
and self.previousPrice[insight.SourceModel].get(insight.Symbol) is not None):
previousPrice = self.previousPrice[insight.SourceModel][insight.Symbol]
currentPrice = algorithm.ActiveSecurities[insight.Symbol].Price
entryDirection = self.entryDirection[insight.SourceModel][insight.Symbol]
weight = insight.Weight * self.algo.accountLeverage
# return the insight return (weight-adjusted)
insightReturn = entryDirection * (currentPrice / previousPrice - 1) * weight
return insightReturn
def UpdateReturnsDictionaries(self, algorithm, insight):
""" Update alphasInsightReturns and alphasDailyReturns """
# calculate the insight return
insightReturn = self.CalculateInsightReturn(algorithm, insight)
# append the dictionary of insight returns by model
self.alphasInsightReturns[insight.SourceModel][self.todayDate].append(insightReturn)
# update the dictionary of daily returns by model
self.alphasDailyReturns[insight.SourceModel][self.todayDate] = sum(self.alphasInsightReturns[insight.SourceModel][self.todayDate])
def CountWeightChangesAboveThreshold(self, symbolsNewWeights, threshold=0.025):
""" Check how many new targets deviate enough from current targets """
# calculate the change in new weights vs current weights
weightChanges = [abs(symbolsNewWeights[symbol] - weight) for symbol, weight in self.symbolsWeights.items()]
# count how many are above a certain threshold
countWeightChangesAbovethreshold = sum(x for x in weightChanges if x > threshold)
return countWeightChangesAbovethreshold
def PlotPortfolioInfo(self):
""" Update and plot the portfolio cumulative returns and DD """
if self.portfolioLastValue == 0:
self.portfolioLastValue = self.algo.Portfolio.TotalPortfolioValue
# update and plot portfolio cumulative returns
currentPortfolioValue = self.algo.Portfolio.TotalPortfolioValue
portfolioDailyReturn = (currentPortfolioValue / self.portfolioLastValue - 1)/self.DDDecay/self.algo.accountLeverage
self.portfolioCumReturns *= (1 + portfolioDailyReturn)
#self.algo.Plot('Alphas Cumulative Returns (%)', 'Portfolio', float((self.portfolioCumReturns - 1) * 100))
# update and plot portfolio DD
if self.portfolioCumReturns > self.portfolioLastPeak:
self.portfolioLastPeak = self.portfolioCumReturns
currentPortfolioDD = (self.portfolioCumReturns / self.portfolioLastPeak) - 1
self.algo.Plot('Alphas DD (%)', 'Portfolio', float(currentPortfolioDD) * 100)
if currentPortfolioDD > self.lastDD:
DDreducing = False
else:
DDreducing = True
self.lastDD = currentPortfolioDD
sp = 0.75
maxDDLevel = 1
#ddLevels = [0.001, 0.05, 0.10, 0.15]
#decayLevels = [0.750, 0.50, 0.25, 0.10]
ddLevels = [0.001, 0.05, 0.10, 0.15]
decayLevels = [0.750, 0.75, 0.25, 0.10]
if currentPortfolioDD < ddLevels[3]*-1 :
if DDreducing: self.DDDecay = min(maxDDLevel,max(sp*decayLevels[3], self.DDDecay*1.1))
else: self.DDDecay = max(sp*decayLevels[3], min(sp*decayLevels[2],self.DDDecay*0.9))
if currentPortfolioDD < ddLevels[2]*-1 :
if DDreducing: self.DDDecay = min(maxDDLevel,max(sp*decayLevels[2], self.DDDecay*1.1))
else: self.DDDecay = max(sp*decayLevels[2], min(sp*decayLevels[1],self.DDDecay*0.9))
elif currentPortfolioDD < ddLevels[1]*-1:
if DDreducing: self.DDDecay = min(maxDDLevel,max(sp*decayLevels[1], self.DDDecay*1.1))
else: self.DDDecay = max(sp*decayLevels[1], self.DDDecay*0.9)
else:
if DDreducing: self.DDDecay = min(maxDDLevel,max(sp*decayLevels[0], self.DDDecay*1.1))
else: self.DDDecay = self.DDDecay #max(sp*decayLevels[0], self.DDDecay*0.95)
self.algo.Plot('DD Decay', 'DD Decay', self.DDDecay)
self.algo.Log('DD Decay ' + str(self.DDDecay))
self.portfolioLastValue = self.algo.Portfolio.TotalPortfolioValue
def GetTodayDate(self):
""" Get today date and update relevant dictionaries """
# get today date for reference
self.todayDate = datetime(self.algo.Time.year, self.algo.Time.month, self.algo.Time.day).date()
# initialize today empty list for alphas insight returns and daily returns
for key in self.alphasInsightReturns:
self.alphasInsightReturns[key][self.todayDate] = []
self.alphasDailyReturns[key][self.todayDate] = np.nan#region imports
from AlgorithmImports import *
#endregion
import pickle
import numpy as np
import general_utils as utils
class InAndOutAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, assetsWeights, rebalancingFrequency):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.assetsWeights = assetsWeights
self.rebalancingFrequency = rebalancingFrequency
self.weightsDict = {}
self.timeToEmitInsights = False
self.WeekEndIsHere = False
self.weightsRetrieved = False
self.previousVolatilityAllocation = 0
self.stocksWeight = 0.8
self.MMRatio = 0.5
self.permInsurance = 0.10
# market and list of signals based on ETFs
self.signalInstruments = {'PRDC': ['XLI', None], 'METL': ['DBB', None], 'NRES': ['IGE', None],
'DEBT': ['SHY', None], 'USDX': ['UUP', None], 'GOLD': ['GLD', None],
'SLVA': ['SLV', None], 'UTIL': ['XLU', None], 'SHCU': ['FXF', None],
'RICU': ['FXA', None], 'INDU': ['XLI', None], 'MRKT': ['QQQ', None], 'MM': ['UUP', None],
'TLT': ['TLT', None], 'QQQ': ['QQQ', None], 'SPY': ['SPY', None], 'IEF': ['IEF', None], 'TIP': ['TIP', None]}
for signal, tickerSymbol in self.signalInstruments.items():
self.signalInstruments[signal][1] = self.algo.AddEquity(tickerSymbol[0], Resolution.Hour).Symbol
self.smaSPY = self.algo.SMA(self.signalInstruments['SPY'][1], 231, Resolution.Daily)
self.smaTLT = self.algo.SMA(self.signalInstruments['IEF'][1], 21, Resolution.Daily)
self.smaGLD = self.algo.SMA(self.signalInstruments['TIP'][1], 21, Resolution.Daily)
self.smaMM = self.algo.SMA(self.signalInstruments['MM'][1], 21, Resolution.Daily)
self.signalSymbols = [self.signalInstruments['PRDC'][1], self.signalInstruments['METL'][1],
self.signalInstruments['NRES'][1], self.signalInstruments['USDX'][1],
self.signalInstruments['DEBT'][1], self.signalInstruments['MRKT'][1]]
self.forPairs = [self.signalInstruments['GOLD'][1], self.signalInstruments['SLVA'][1],
self.signalInstruments['UTIL'][1], self.signalInstruments['INDU'][1]]
self.pairList = ['G_S', 'U_I']
# initialize constants and variables
# [out for 3 trading weeks, set period for returns sample,
# 'in'/'out' indicator, count of total days since start,
# daysCount when self.beIn=0, portfolio value]
self.initWaitDays, self.lookback, self.beIn, self.daysCount, self.outDay = [15, 252 * 5, True, 0, 0]
# create symbols list
self.symbols = list(set(self.signalSymbols + [self.signalInstruments['MRKT'][1]]
+ self.forPairs + [self.signalInstruments['QQQ'][1]]))
# retrieve information from object store in live mode
if self.algo.LiveMode or self.algo.BackTestWarmUp:
dictFromOS = utils.ReadFromObjectStore(self.algo, self.Name)
if 'daysCount' in dictFromOS and 'outDay' in dictFromOS and 'stocksweight' in dictFromOS:
self.algo.Log('IO; retrieving variable states from object store; daysCount: '
+ str(dictFromOS['daysCount']) + '; outDay: ' + str(dictFromOS['outDay']) + '; stocksweight: ' + str(dictFromOS['stocksweight']))
self.daysCount, self.outDay, self.stocksWeight = [dictFromOS['daysCount'], dictFromOS['outDay'], dictFromOS['stocksweight']]
self.weightsRetrieved = True
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def GenerateSignals(self):
self.timeToEmitInsights = True
def EvaluateSignals(self):
""" Create new signals """
if not self.activateAlpha:
return
self.algo.Log(str(self.Name) + " Parameters ------------->")
# get historical data
latestHistory, latestHistoryShifted = self.GetHistoricalData()
# returns sample to detect extreme observations
returnsSample = (latestHistory / latestHistoryShifted) - 1
# Reverse code USDX: sort largest changes to bottom
returnsSample[self.signalInstruments['USDX'][1]] = returnsSample[self.signalInstruments['USDX'][1]] * -1
# for pairs, take returns differential, reverse coded
returnsSample['G_S'] = -(returnsSample[self.signalInstruments['GOLD'][1]] - returnsSample[self.signalInstruments['SLVA'][1]])
returnsSample['U_I'] = -(returnsSample[self.signalInstruments['UTIL'][1]] - returnsSample[self.signalInstruments['INDU'][1]])
# extreme observations; statistical significance = 5%
extremeB = returnsSample.iloc[-1] < np.nanpercentile(returnsSample, 5, axis=0)
# re-assess/disambiguate double-edged signals
aboveMedian = returnsSample.iloc[-1] > np.nanmedian(returnsSample, axis=0)
# interest rate expectations (cost of debt) may increase because the economic outlook improves
# (showing in rising input prices) = actually not a negative signal
extremeB.loc[self.signalInstruments['DEBT'][1]] = np.where((extremeB.loc[self.signalInstruments['DEBT'][1]].any())
& (aboveMedian[[self.signalInstruments['METL'][1],
self.signalInstruments['NRES'][1]]].any()),
False, extremeB.loc[self.signalInstruments['DEBT'][1]])
# determine whether 'in' or 'out' of the market
if (extremeB[self.signalSymbols + self.pairList]).any():
self.beIn = False
self.outDay = self.daysCount
if self.daysCount >= self.outDay + self.initWaitDays:
self.beIn = True
tip = 0
ief = 0
uup = 0
if self.algo.Securities[self.signalInstruments['TIP'][1]].Price > self.smaGLD.Current.Value: tip = 1
if self.algo.Securities[self.signalInstruments['IEF'][1]].Price > self.smaTLT.Current.Value: ief = 1
if self.algo.Securities[self.signalInstruments['MM'][1]].Price > self.smaMM.Current.Value: self.algo.uup = 1
if self.algo.Securities[self.signalInstruments['TIP'][1]].Price != 0 and self.smaGLD.Current.Value != 0:
self.algo.TLTorGLD = (self.algo.Securities[self.signalInstruments['IEF'][1]].Price / self.algo.Securities[self.signalInstruments['TIP'][1]].Price) \
> (self.smaTLT.Current.Value/ self.smaGLD.Current.Value)
else:
self.algo.TLTorGLD = True
if self.algo.allAssetsBelowMA and self.algo.uup == 1:
uupallocation = 1.0
else: uupallocation = 0.5
if self.algo.TLTorGLD:
if ief == 1:
tmfallocation = 1-uupallocation
tipallocation = 0
else:
tmfallocation = 0.0
tipallocation = 1 - uupallocation
iefallocation = 0
else:
tmfallocation = 0
iefallocation = 0
tipallocation = 1- uupallocation
self.algo.Log("IO - Is IEF > TIP ? :" + str(self.algo.TLTorGLD))
volatilityMax = 0.80
volatilityAllocation = 0
if self.algo.risklevel in [5]:
volatilityAllocation = volatilityMax
insurancePercentage = 0.90
elif self.algo.risklevel in [4]:
volatilityAllocation = volatilityMax*0.20
insurancePercentage = 0.95
elif self.algo.risklevel in [2,3]:
volatilityAllocation = volatilityMax*0.00
insurancePercentage = 1
else:
volatilityAllocation = 0
insurancePercentage = 1.00
if self.algo.risklevel in [4,5]:
self.stocksWeight = self.stocksWeight * insurancePercentage
volatilityAllocation = volatilityAllocation/insurancePercentage
else:
#self.permInsurance = 0.10
if self.algo.reversalRisk:
self.stocksWeight = 0.3
#self.beIn = False
else:
self.stocksWeight = 0.8
if not self.beIn:
self.stocksWeight = 0.3
self.assetsWeights['stocksProxy'][1] = self.stocksWeight
self.assetsWeights['volatilityProxy'][1] = 0.0
self.assetsWeights['volProxy'][1] = 0.0
self.assetsWeights['bondsProxy'][1] = (1 - self.stocksWeight)*tmfallocation
self.assetsWeights['cashProxy'][1] = (1 - self.stocksWeight)*iefallocation
self.assetsWeights['inflProxy'][1] = (1 - self.stocksWeight)*tipallocation
self.assetsWeights['goldProxy'][1] = (1 - self.stocksWeight)*0
self.assetsWeights['mmProxy'][1] = (1 - self.stocksWeight)*uupallocation
if self.WeekEndIsHere or self.weightsRetrieved == True or self.stocksWeight > 0.3:
self.WeekEndIsHere = False
self.weightsRetrieved = False
if self.beIn:
self.assetsWeights['stocksProxy'][1] = self.stocksWeight
self.assetsWeights['volatilityProxy'][1] = 0.0
self.assetsWeights['volProxy'][1] = 0.0
self.assetsWeights['bondsProxy'][1] = (1 - self.stocksWeight)*tmfallocation
self.assetsWeights['cashProxy'][1] = (1 - self.stocksWeight)*iefallocation
self.assetsWeights['inflProxy'][1] = (1 - self.stocksWeight)*tipallocation
self.assetsWeights['goldProxy'][1] = (1 - self.stocksWeight)*0
self.assetsWeights['mmProxy'][1] = (1 - self.stocksWeight)*uupallocation
if self.previousVolatilityAllocation != volatilityAllocation or self.algo.stopLossTriggered:
volatilityAllocationChanged = True
else:
volatilityAllocationChanged = False
self.previousVolatilityAllocation = volatilityAllocation
if volatilityAllocationChanged:
self.assetsWeights['volatilityProxy'][1] = volatilityAllocation*(1-self.permInsurance)
self.assetsWeights['volProxy'][1] = volatilityAllocation*self.permInsurance
self.assetsWeights['bondsProxy'][1] = self.assetsWeights['bondsProxy'][1]*(1- volatilityAllocation)
self.assetsWeights['cashProxy'][1] = self.assetsWeights['cashProxy'][1]*(1- volatilityAllocation)
self.assetsWeights['stocksProxy'][1] = self.assetsWeights['stocksProxy'][1]*(1- volatilityAllocation )
self.assetsWeights['inflProxy'][1] = self.assetsWeights['inflProxy'][1]*(1- volatilityAllocation)
self.assetsWeights['goldProxy'][1] = self.assetsWeights['goldProxy'][1]*(1- volatilityAllocation )
self.assetsWeights['mmProxy'][1] = self.assetsWeights['mmProxy'][1]*(1- volatilityAllocation)
# totalweight = self.assetsWeights['volatilityProxy'][1] + self.assetsWeights['volProxy'][1] + self.assetsWeights['bondsProxy'][1] + \
# self.assetsWeights['cashProxy'][1] + self.assetsWeights['stocksProxy'][1] + self.assetsWeights['inflProxy'][1] + \
# self.assetsWeights['goldProxy'][1] + self.assetsWeights['mmProxy'][1]
# self.algo.Debug("Total Weight :" + str(totalweight))
# update the weightsDict
for proxy, symbolWeight in self.assetsWeights.items():
self.weightsDict[symbolWeight[0]] = symbolWeight[1]
self.daysCount += 1
#self.timeToEmitInsights = True
#self.CalculateModelLeverageFactors()
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
dictToOS = {'daysCount': self.daysCount, 'outDay': self.outDay, 'stocksweight': self.stocksWeight}
utils.SaveToObjectStore(self.algo, self.Name, dictToOS)
self.algo.Log('InOut : ' + str(self.beIn))
self.algo.Log('InOut Weights - SBC : ' + str(round(self.assetsWeights['stocksProxy'][1], 2))
+ "|" + str(round(self.assetsWeights['bondsProxy'][1], 2))
+ "|" + str(round(self.assetsWeights['cashProxy'][1], 2))
+ "|" + str(round(self.assetsWeights['volatilityProxy'][1], 2)))
def WeekEnd(self):
self.WeekEndIsHere = True
def CalculateModelLeverageFactors(self):
for key in self.algo.alphasDD:
if key == self.Name:
if self.algo.alphasDD[key] < -0.20 :
self.algo.leverageFactors[key] = self.algo.leverageFactors[key]*0.98
elif self.algo.alphasDD[key] < -0.10:
self.algo.leverageFactors[key] = self.algo.leverageFactors[key]*0.99
else:
self.algo.leverageFactors[key] = 1
for key in self.weightsDict:
self.weightsDict[key] = self.weightsDict[key]* float(self.algo.leverageFactors[self.Name])
# Adds the non-leveraged portion to the "cash" portion of the algo
for symbol in self.weightsDict.keys():
if symbol.Value == 'TIP':
self.weightsDict[symbol] = self.weightsDict[symbol] + max(0,1-float(self.algo.leverageFactors[self.Name]))
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes + 1), self.EvaluateSignals)
algorithm.Schedule.On(algorithm.DateRules.WeekEnd(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.WeekEnd)
def GetHistoricalData(self):
""" Get two dataframes with historical data """
latestHistory = self.algo.History(self.symbols, self.lookback, Resolution.Daily)
latestHistory = latestHistory['close'].unstack(level=0).dropna()
latestHistoryShifted = latestHistory.rolling(11, center=True).mean().shift(60)
return latestHistory, latestHistoryShifted#region imports
from AlgorithmImports import *
#endregion
class MovingAverageAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, canarySymbol,
periodShortSMA, periodLongSMA, movingAverageThresholds, rebalancingFrequency):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.movingAverageThresholds = movingAverageThresholds
self.rebalancingFrequency = rebalancingFrequency
self.weightsDict = {}
self.timeToEmitInsights = False
self.smaRatio = None
if self.activateAlpha:
shortSMA = algorithm.SMA(canarySymbol, periodShortSMA, Resolution.Daily)
longSMA = algorithm.SMA(canarySymbol, periodLongSMA, Resolution.Daily)
self.smaRatio = IndicatorExtensions.Over(shortSMA, longSMA)
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def GenerateSignals(self):
""" Create new signals """
if not self.activateAlpha or not self.smaRatio.IsReady:
return
if self.smaRatio.Current.Value >= self.movingAverageThresholds[0]:
self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][0]
for security in self.algo.ActiveSecurities.Values
if security.Symbol.Value in self.parametersDict}
elif self.smaRatio.Current.Value >= self.movingAverageThresholds[1]:
self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][1]
for security in self.algo.ActiveSecurities.Values
if security.Symbol.Value in self.parametersDict}
else:
self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][2]
for security in self.algo.ActiveSecurities.Values
if security.Symbol.Value in self.parametersDict}
self.timeToEmitInsights = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol),
algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]),
self.GenerateSignals)
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
from scipy.optimize import minimize
class PortfolioOptimizer:
"""
Description:
Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function
Details:
Optimization can be:
- Equal Weighting
- Maximize Portfolio Return
- Minimize Portfolio Standard Deviation
- Maximize Portfolio Sharpe Ratio
- Maximize Portfolio Sortino Ratio
- Risk Parity Portfolio
- Target Return (minimize Standard Deviation given a min target return)
- Target Variance (maximize Return given a max target variance)
- Target Variance Negative Returns (maximize Return given a max target variance during negative returns)
Constraints:
- Weights must be between some given boundaries
- Weights must sum to 1
"""
def __init__(self):
""" Initialize the CustomPortfolioOptimizer """
self.targetReturn = 0.1
self.targetVariance = 0.20
def Optimize(self, objFunction, dailyReturnsDf, bounds):
"""
Description:
Perform portfolio optimization given a series of returns
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance,
maxSharpe, maxSortino, riskParity, targetReturn,
targetVariance, targetVarianceNegativeReturns)
dailyReturnsDf: DataFrame of historical daily arithmetic returns
bounds: List of tuples with the min and max weights for each element
Returns:
Array of double with the portfolio weights (size: K x 1)
"""
# initial weights: equally weighted
size = dailyReturnsDf.columns.size # K x 1
self.initWeights = np.array(size * [1. / size])
# get sample covariance matrix
covariance = dailyReturnsDf.cov()
# get the sample covariance matrix of only negative returns for sortino ratio
negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
covarianceNegativeReturns = negativeReturnsDf.cov()
if objFunction == 'equalWeighting':
return self.initWeights
# create constraints
constraints = {'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}
if objFunction == 'targetReturn':
constraints.update({'type': 'eq', 'fun': lambda weights: self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf,
weights) - self.targetReturn}, {'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0})
if objFunction == 'targetVariance':
constraints = ({'type': 'ineq', 'fun': lambda weights: self.targetVariance - self.CalculateAnnualizedPortfolioStd(covariance, weights)},
{'type': 'ineq', 'fun': lambda x: 1.0 - np.sum(x)} )
if objFunction == 'targetVarianceNegativeReturns':
constraints = ({'type': 'ineq', 'fun': lambda weights: self.targetVariance - self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)},
{'type': 'ineq', 'fun': lambda x: 1.0 - np.sum(x)} )
opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
covariance, covarianceNegativeReturns,
weights),
x0=self.initWeights,
bounds=bounds,
constraints=constraints,
method='SLSQP')
return opt['x']
def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):
"""
Description:
Compute the objective function
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance,
maxSharpe, maxSortino, riskParity)
dailyReturnsDf: DataFrame of historical daily returns
covariance: Sample covariance
covarianceNegativeReturns: Sample covariance matrix of only negative returns
weights: Portfolio weights
"""
if objFunction == 'maxReturn':
f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
return -f # convert to negative to be maximized
elif objFunction == 'minVariance':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction == 'maxSharpe':
f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
return -f # convert to negative to be maximized
elif objFunction == 'maxSortino':
f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
return -f # convert to negative to be maximized
elif objFunction == 'riskParity':
f = self.CalculateRiskParityFunction(covariance, weights)
return f
elif objFunction == 'targetReturn':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction in ['targetVariance', 'targetVarianceNegativeReturns']:
f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
return -f # convert to negative to be maximized
else:
raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
+ ' maxReturn, minVariance, maxSharpe, maxSortino, riskParity, targetReturn,'
+ ' targetVariance', 'targetVarianceNegativeReturns')
def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):
annualizedPortfolioReturns = np.sum(((1 + dailyReturnsDf.mean())**252 - 1) * weights)
return annualizedPortfolioReturns
def CalculateAnnualizedPortfolioStd(self, covariance, weights):
annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
if annualizedPortfolioStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')
return annualizedPortfolioStd
def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):
annualizedPortfolioNegativeStd = np.sqrt(np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)))
if annualizedPortfolioNegativeStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')
return annualizedPortfolioNegativeStd
def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd
return annualizedPortfolioSharpeRatio
def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd
return annualizedPortfolioSortinoRatio
def CalculateRiskParityFunction(self, covariance, weights):
""" Spinu formulation for risk parity portfolio """
assetsRiskBudget = self.initWeights
portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)
x = weights / portfolioVolatility
riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x))
return riskParity
#region imports
from AlgorithmImports import *
#endregion
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class PriceVolumeUniverseSelectionModel(FundamentalUniverseSelectionModel):
def __init__(self,
minPrice=20,
nStocks=500,
filterFineData=False,
universeSettings=None):
super().__init__(filterFineData, universeSettings)
self.minPrice = minPrice # min price for stock selection
self.nStocks = nStocks # number of stocks to select in universe
def SelectCoarse(self, algorithm, coarse):
""" Perform coarse selection based on price and dollar volume """
# filter universe to select only stocks with price above
preFiltered = [x for x in coarse if x.HasFundamentalData and x.Price > self.minPrice]
# sort the tickers by criteria and take the top candidates by dollar volume
topDollarVolume = sorted(preFiltered, key=lambda x: x.DollarVolume, reverse=True)[:self.nStocks]
topDollarVolumeSymbols = [x.Symbol for x in topDollarVolume]
return topDollarVolumeSymbols
#region imports
from AlgorithmImports import *
#endregion
from OptimizerClass import PortfolioOptimizer
import pandas as pd
import general_utils as utils
class RotationalOptimizer1xAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency,
lookbackOptimization, objFunction, tickerCash, treasuryRatesSymbol):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.rebalancingFrequency = rebalancingFrequency
self.lookbackOptimization = lookbackOptimization
self.objFunction = objFunction
self.tickerCash = tickerCash
self.treasuryRatesSymbol = treasuryRatesSymbol
self.insuranceMonitor = False
self.triggerSignals = False
self.tickerVolatility = self.algo.volatilityTicker
self.tickerVol = self.algo.volTicker
self.permInsurance = 0.5
# for yield signal crisis
self.lookbackNegativeYield = 147 # number of days to lookback for negative values
self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition
# initialize the optimizer
self.optimizer = PortfolioOptimizer()
# get all the parameters for the indicators
valuesList = []
self.alphaTickers = []
for ticker in parametersDict.keys():
if parametersDict[ticker]['addTicker'][0]:
valuesList.append(parametersDict[ticker]['sma'][0])
valuesList.append(sum(parametersDict[ticker]['macd'][0]))
self.alphaTickers.append(ticker)
tradableTicker = parametersDict[ticker]['addTicker'][1]
if tradableTicker != ticker:
self.alphaTickers.append(tradableTicker)
#self.alphaTickers.append(self.tickerCash)
self.alphaTickers.append(self.tickerVolatility)
self.alphaTickers.append(self.tickerVol)
self.alphaTickers.append("TIP")
self.alphaTickers.append("IEF")
self.alphaTickers.append("GLD")
self.smaIEF = self.algo.SMA("IEF", 21, Resolution.Daily)
self.smaTIP = self.algo.SMA("TIP", 21, Resolution.Daily)
self.smaSPY = self.algo.SMA("SPY", 21, Resolution.Daily)
self.smaGLD = self.algo.SMA("GLD", 21, Resolution.Daily)
#self.allAssetsBelowMA = False
# keep the highest parameter provided to call history
self.lookbackHistory = max(lookbackOptimization, max(valuesList))
self.weightsDict = {}
self.timeToEmitInsights = False
self.atrStocksCrossover = None
self.atrBondsCrossover = None
self.weightsRetrieved = False
self.weekStart = False
# Sudhir changes to code to optimize on start if nothing found in object store.
if self.algo.LiveMode:
dictFromOS = utils.ReadFromObjectStore(self.algo, self.Name)
if dictFromOS:
self.algo.Log(str(self.Name) + 'Retrieving last optimal weights from object store: ' + str(dictFromOS))
self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()}
self.weightsRetrieved = True
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def CalculateSignals(self):
""" Create new signals """
if not self.activateAlpha:
return
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
# determine target percent for the given insights (check function DetermineTargetPercent for details)
self.weightsDict = self.DetermineTargetPercent(self.algo, symbols)
#self.timeToEmitInsights = True
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()}
utils.SaveToObjectStore(self.algo, self.Name, dictToOS)
def GenerateSignals(self):
self.timeToEmitInsights = True
def CheckForSignals(self):
if not self.activateAlpha:
return
self.algo.Log(str(self.Name) + " Parameters ------------->")
self.algo.allAssetsBelowMA = self.algo.Securities["SPY"].Price < self.smaSPY.Current.Value and self.algo.Securities["IEF"].Price < self.smaIEF.Current.Value and self.algo.Securities["GLD"].Price < self.smaGLD.Current.Value
if self.weightsRetrieved == True:
self.weightsRetrieved = False
#self.timeToEmitInsights = True
if self.triggerSignals:
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals based on schedule')
self.CalculateSignals()
self.triggerSignals = False
else:
if (self.atrStocksCrossover is not None and self.atrBondsCrossover is not None
and (self.atrStocksCrossover.Current.Value > 1.5 or self.atrBondsCrossover.Current.Value > 1.5) # ):
and (self.weekStart)):
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals due to ATR Crossover')
self.CalculateSignals()
if self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk: # or self.algo.reversalRisk:
self.insuranceMonitor = True
self.algo.Log(str(self.Name) + 'Triggering Weight Adjustment due to High Risk flag')
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
self.AdjustWeightsForInsurance()
if self.insuranceMonitor == True and not (self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk or self.algo.reversalRisk):
self.CalculateSignals()
self.insuranceMonitor = False
self.weekStart = False
def AdjustWeightsForInsurance(self):
if self.algo.isRiskHighest:
insurancePercentage = 0.80
elif self.algo.isRisk or self.algo.isRiskHigh or self.algo.reversalRisk:
insurancePercentage = 0.90
else:
insurancePercentage = 1.00
if self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk or self.algo.reversalRisk:
stocksWeight = 0
for symbol, weight in self.weightsDict.items():
if symbol.Value == "SPXL" or symbol.Value == 'TQQQ':
stocksWeight = stocksWeight + self.weightsDict[symbol]
self.weightsDict[symbol] = self.weightsDict[symbol]*insurancePercentage
stocksWeight = stocksWeight*(1- insurancePercentage)
for symbol, weight in self.weightsDict.items():
if symbol.Value == 'TIP':
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*0
if symbol.Value == 'IEF':
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*0
if symbol.Value == self.tickerVolatility:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*(1-self.permInsurance)
if symbol.Value == self.tickerVol:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*self.permInsurance
#self.timeToEmitInsights = True
def OnWeekStart(self):
self.weekStart = True
def TriggerScheduledSignals(self):
self.triggerSignals = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.TriggerScheduledSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes + 1), self.CheckForSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.OnWeekStart)
# initialize atr indicators
if self.activateAlpha and security.Symbol.Value in ['SPY', 'TLT']:
shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour)
longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour)
if security.Symbol.Value == 'SPY':
self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR)
elif security.Symbol.Value == 'TLT':
self.atrBondsCrossover = IndicatorExtensions.Over(shortATR, longATR)
def DetermineTargetPercent(self, algorithm, symbols):
"""
Description:
Determine the target percent for each insight
Args:
algorithm: The algorithm instance
symbols: The symbols to generate an insight for
"""
# empty dictionary to store portfolio targets by symbol
finalWeights = {}
# get symbols for calculations
calculationSymbols = [x for x in symbols if x.Value in self.parametersDict.keys()]
# get historical data for calculationSymbols for the last n trading days
history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily)
# empty dictionary for calculations
calculations = {}
# iterate over all symbols and perform calculations
for symbol in calculationSymbols:
# check if we have enough historical data, otherwise just skip this security
if not utils.CheckHistory(symbol, history):
algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data')
continue
else:
# add symbol to calculations
calculations[symbol] = SymbolData(symbol, parametersDict=self.parametersDict)
try:
# get series of daily returns
calculations[symbol].CalculateDailyReturnSeries(history, self.lookbackOptimization)
# update technical indicators
calculations[symbol].UpdateIndicators(history)
except BaseException as e:
algorithm.Log('returning empty weights for now for: ' + str(symbol.Value)
+ 'due to ' + str(e) + '; we will try again at the next iteration')
return finalWeights
# calculate optimal weights
optWeights = self.CalculateOptimalWeights(algorithm, calculations)
algorithm.Log('optimal weights for the period: ' + str(optWeights))
if optWeights is None:
algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration')
return finalWeights
# modify optimal weights using specific criteria
finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights)
algorithm.Log(str(self.Name) + 'Filtered optimal weights for the period: ' + str({symbol.Value: weight
for symbol, weight in finalWeights.items()}))
# check if we can trade the tradable symbol, otherwise we trade the original ticker
replaceSymbol = {}
for symbol in finalWeights:
tradableTicker = self.parametersDict[symbol.Value]['addTicker'][1]
if symbol.Value != tradableTicker:
tradableSymbol = algorithm.Symbol(tradableTicker)
# check if the price of tradableSymbol is not zero
if algorithm.Securities[tradableSymbol].Price != 0:
# we trade the original ticker
replaceSymbol[tradableSymbol] = symbol
for newSymbol, replacedSymbol in replaceSymbol.items():
finalWeights[newSymbol] = finalWeights[replacedSymbol]
finalWeights.pop(replacedSymbol)
# check how much we have allocated so far to risky assets
totalWeight = sum(finalWeights.values())
if totalWeight >= 1:
totalWeight = 1
algorithm.Log('total allocation after weight filtering: ' + str(totalWeight))
if self.algo.Securities["TIP"].Price != 0 and self.smaTIP.Current.Value != 0:
IEForTIP = (self.algo.Securities["IEF"].Price / self.algo.Securities["TIP"].Price) > (self.smaIEF.Current.Value/ self.smaTIP.Current.Value)
else:
IEForTIP = True
# allocate remaining cash to tickerCash
#cashWeight = 1 - totalWeight
#for symbol in symbols:
# if symbol.Value == 'SPXL': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'TMF': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'DGP': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'TIP': finalWeights[symbol] = cashWeight*2/3
for symbol in symbols:
if IEForTIP == True:
if symbol.Value == 'IEF':
finalWeights[symbol] = (1 - totalWeight)*0
algorithm.Log(str(self.tickerCash) + '; final allocation for tickerCash: ' + str(finalWeights[symbol]))
if symbol.Value == 'TIP':
finalWeights[symbol] = (1- totalWeight)*2/3
if symbol.Value == 'UUP':
finalWeights[symbol] = finalWeights[symbol] + (1- totalWeight)*1/3
else:
if symbol.Value == 'TIP':
finalWeights[symbol] = (1 - totalWeight)*2/3
algorithm.Log(str('TIP') + '; final allocation for tickerCash: ' + str(finalWeights[symbol]))
if symbol.Value == 'IEF':
finalWeights[symbol] = 0
if symbol.Value == 'UUP':
finalWeights[symbol] = finalWeights[symbol] + (1 - totalWeight)*1/3
for symbol in symbols:
if symbol.Value == self.tickerVolatility:
finalWeights[symbol] = 0
# avoid very small numbers and make them 0
for symbol, weight in finalWeights.items():
if weight <= 1e-10:
finalWeights[symbol] = 0
return finalWeights
def CalculateOptimalWeights(self, algorithm, calculations):
"""
Description:
Calculate the individual weights for each symbol that optimize some given objective function
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
"""
# create a dictionary keyed by the symbols in calculations with a pandas.Series as value
# to create a dataframe of returns
dailyReturnsDict = {symbol.Value: symbolData.dailyReturnsSeries for symbol, symbolData in calculations.items()}
dailyReturnsDf = pd.DataFrame(dailyReturnsDict)
listTickers = list(dailyReturnsDf.columns)
try:
# portfolio optimizer finds the optimal weights for the given data
bounds = [self.parametersDict[x]['optimizationBounds'] for x in dailyReturnsDf.columns]
listOptWeights = self.optimizer.Optimize(objFunction=self.objFunction, dailyReturnsDf=dailyReturnsDf,
bounds=bounds)
except BaseException as e:
algorithm.Log('Optimize failed due to ' + str(e))
return None
# create dictionary with the optimal weights by symbol
weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))}
# avoid very small numbers and make them 0
for ticker, weight in weights.items():
if weight <= 1e-10:
weights[ticker] = 0
return weights
def CheckYieldSignalCrisis(self):
""" Check the Yield condition """
yieldSignalCrisis = False
# get the last six months of historical USTREASURY/YIELD values
histYield = self.algo.History([self.treasuryRatesSymbol], self.lookbackNegativeYield + 1,
Resolution.Daily).loc[self.treasuryRatesSymbol]
tenYr = histYield['10 yr'] # get the 10-year yield
threeMo = histYield['3 mo'] # get the 3-month yield
tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two
indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index
# check if there was actually some negative yield values
if len(indexNegative) > 0:
cutOff = indexNegative[0]
# filter the series for days after that day with negative value
afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff]
# check if at some point it reached our startCrisisYieldValue
if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue:
yieldSignalCrisis = True
return yieldSignalCrisis
def FilterOptimalWeights(self, algorithm, calculations, optWeights):
"""
Description:
Filter and modify the optimal weights using a combination of technical indicators
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
optWeights: Dictionary with the optimal weights by symbol
"""
# check for yield signal crisis
yieldSignalCrisis = self.CheckYieldSignalCrisis()
# empty dictionary to store weights
weights = {}
# loop through calculations and check conditions for weight filtering ------------------------
for symbol, symbolData in calculations.items():
if symbolData.SMA.IsReady and symbolData.MACD.IsReady:
currentPrice = algorithm.ActiveSecurities[symbol].Price
# check if sma condition is met and act accordingly ----------------------------------
smaLowerBoundCondition = self.parametersDict[symbol.Value]['sma'][1][0]
smaUpperBoundCondition = self.parametersDict[symbol.Value]['sma'][1][1]
smaConditionWeight = self.parametersDict[symbol.Value]['sma'][2]
algorithm.Log(str(symbol.Value)
+ '; current price: ' + str(round(currentPrice, 2))
+ '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2))
+ '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2)))
if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition)
or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)):
weights[symbol] = min(optWeights[symbol.Value], smaConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to sma filtering from '
+ str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = optWeights[symbol.Value]
smaModifiedWeight = weights[symbol]
# check if macd condition is met and act accordingly ----------------------------------
macdCondition = self.parametersDict[symbol.Value]['macd'][1]
macdConditionWeight = self.parametersDict[symbol.Value]['macd'][2]
# calculate our macd vs signal score between -1 and 1
macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value
macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal))
algorithm.Log(str(symbol.Value)
+ '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2))
+ '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2))
+ '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2)))
if macdVsSignalScore <= macdCondition:
weights[symbol] = min(smaModifiedWeight, macdConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to macd filtering from '
+ str(smaModifiedWeight) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = smaModifiedWeight
macdModifiedWeight = weights[symbol]
# check if yield condition is met and act accordingly ----------------------------------
activateYield = self.parametersDict[symbol.Value]['yield'][0]
yieldConditionWeight = self.parametersDict[symbol.Value]['yield'][1]
if yieldSignalCrisis and activateYield:
weights[symbol] = min(macdModifiedWeight, yieldConditionWeight)
else:
weights[symbol] = macdModifiedWeight
else:
weights[symbol] = optWeights[symbol.Value]
algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign optimal weight')
return weights
class SymbolData:
def __init__(self, symbol, parametersDict):
self.Symbol = symbol
self.dailyReturnsSeries = None
smaPeriod = parametersDict[symbol.Value]['sma'][0]
self.SMA = SimpleMovingAverage(smaPeriod)
macdFastPeriod = parametersDict[self.Symbol.Value]['macd'][0][0]
macdSlowPeriod = parametersDict[self.Symbol.Value]['macd'][0][1]
macdSignalPeriod = parametersDict[self.Symbol.Value]['macd'][0][2]
self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod,
MovingAverageType.Exponential)
def CalculateDailyReturnSeries(self, history, lookbackOptimization):
""" Calculate the daily returns series for each security """
tempReturnsSeries = history.loc[str(self.Symbol)]['close'].pct_change(periods=2).dropna() # 2-day returns
self.dailyReturnsSeries = tempReturnsSeries[-lookbackOptimization:]
def UpdateIndicators(self, history):
""" Update the indicators with historical data """
for index, row in history.loc[str(self.Symbol)].iterrows():
self.SMA.Update(index, row['close'])
self.MACD.Update(index, row['close'])
#region imports
from AlgorithmImports import *
#endregion
from OptimizerClass import PortfolioOptimizer
import pandas as pd
import general_utils as utils
class RotationalOptimizer2xAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency,
lookbackOptimization, objFunction, tickerCash, treasuryRatesSymbol):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.rebalancingFrequency = rebalancingFrequency
self.lookbackOptimization = lookbackOptimization
self.objFunction = objFunction
self.tickerCash = tickerCash
self.treasuryRatesSymbol = treasuryRatesSymbol
self.insuranceMonitor = False
self.triggerSignals = False
self.tickerVolatility = self.algo.volatilityTicker
self.tickerVol = self.algo.volTicker
self.permInsurance = 0.5
# for yield signal crisis
self.lookbackNegativeYield = 147 # number of days to lookback for negative values
self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition
# initialize the optimizer
self.optimizer = PortfolioOptimizer()
# get all the parameters for the indicators
valuesList = []
self.alphaTickers = []
for ticker in parametersDict.keys():
if parametersDict[ticker]['addTicker'][0]:
valuesList.append(parametersDict[ticker]['sma'][0])
valuesList.append(sum(parametersDict[ticker]['macd'][0]))
self.alphaTickers.append(ticker)
tradableTicker = parametersDict[ticker]['addTicker'][1]
if tradableTicker != ticker:
self.alphaTickers.append(tradableTicker)
#self.alphaTickers.append(self.tickerCash)
self.alphaTickers.append(self.tickerVolatility)
self.alphaTickers.append(self.tickerVol)
self.alphaTickers.append("TIP")
self.alphaTickers.append("IEF")
self.alphaTickers.append("GLD")
self.smaIEF = self.algo.SMA("IEF", 34, Resolution.Daily)
self.smaTIP = self.algo.SMA("TIP", 34, Resolution.Daily)
self.smaSPY = self.algo.SMA("SPY", 34, Resolution.Daily)
self.smaGLD = self.algo.SMA("GLD", 34, Resolution.Daily)
#self.allAssetsBelowMA = False
# keep the highest parameter provided to call history
self.lookbackHistory = max(lookbackOptimization, max(valuesList))
self.weightsDict = {}
self.timeToEmitInsights = False
self.atrStocksCrossover = None
self.atrBondsCrossover = None
self.weightsRetrieved = False
self.weekStart = False
# Sudhir changes to code to optimize on start if nothing found in object store.
if self.algo.LiveMode:
dictFromOS = utils.ReadFromObjectStore(self.algo, self.Name)
if dictFromOS:
self.algo.Log(str(self.Name) + 'Retrieving last optimal weights from object store: ' + str(dictFromOS))
self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()}
self.weightsRetrieved = True
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def CalculateSignals(self):
""" Create new signals """
if not self.activateAlpha:
return
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
# determine target percent for the given insights (check function DetermineTargetPercent for details)
self.weightsDict = self.DetermineTargetPercent(self.algo, symbols)
#self.timeToEmitInsights = True
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()}
utils.SaveToObjectStore(self.algo, self.Name, dictToOS)
def GenerateSignals(self):
self.timeToEmitInsights = True
def CheckForSignals(self):
if not self.activateAlpha:
return
self.algo.Log(str(self.Name) + " Parameters ------------->")
self.algo.allAssetsBelowMA = self.algo.Securities["SPY"].Price < self.smaSPY.Current.Value and self.algo.Securities["IEF"].Price < self.smaIEF.Current.Value and self.algo.Securities["GLD"].Price < self.smaGLD.Current.Value
if self.weightsRetrieved == True:
self.weightsRetrieved = False
#self.timeToEmitInsights = True
if self.triggerSignals:
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals based on schedule')
self.CalculateSignals()
self.triggerSignals = False
else:
if (self.atrStocksCrossover is not None and self.atrBondsCrossover is not None
and (self.atrStocksCrossover.Current.Value > 1.5 or self.atrBondsCrossover.Current.Value > 1.5) # ):
and (self.weekStart)):
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals due to ATR Crossover')
self.CalculateSignals()
if self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk: # or self.algo.reversalRisk:
self.insuranceMonitor = True
self.algo.Log(str(self.Name) + 'Triggering Weight Adjustment due to High Risk flag')
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
self.AdjustWeightsForInsurance()
if self.insuranceMonitor == True and not (self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk or self.algo.reversalRisk):
self.CalculateSignals()
self.insuranceMonitor = False
self.weekStart = False
def AdjustWeightsForInsurance(self):
if self.algo.isRiskHighest:
insurancePercentage = 0.80
elif self.algo.isRisk or self.algo.isRiskHigh or self.algo.reversalRisk:
insurancePercentage = 0.90
else:
insurancePercentage = 1.00
if self.algo.isRiskHighest or self.algo.isRiskHigh or self.algo.isRisk or self.algo.reversalRisk:
stocksWeight = 0
for symbol, weight in self.weightsDict.items():
if symbol.Value == "SPXL" or symbol.Value == 'TQQQ':
stocksWeight = stocksWeight + self.weightsDict[symbol]
self.weightsDict[symbol] = self.weightsDict[symbol]*insurancePercentage
stocksWeight = stocksWeight*(1- insurancePercentage)
for symbol, weight in self.weightsDict.items():
if symbol.Value == 'TIP':
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*0
if symbol.Value == 'IEF':
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*0
if symbol.Value == self.tickerVolatility:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*(1-self.permInsurance)
if symbol.Value == self.tickerVol:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*self.permInsurance
#self.timeToEmitInsights = True
def OnWeekStart(self):
self.weekStart = True
def TriggerScheduledSignals(self):
self.triggerSignals = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.TriggerScheduledSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes + 1), self.CheckForSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.OnWeekStart)
# initialize atr indicators
if self.activateAlpha and security.Symbol.Value in ['SPY', 'TLT']:
shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour)
longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour)
if security.Symbol.Value == 'SPY':
self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR)
elif security.Symbol.Value == 'TLT':
self.atrBondsCrossover = IndicatorExtensions.Over(shortATR, longATR)
def DetermineTargetPercent(self, algorithm, symbols):
"""
Description:
Determine the target percent for each insight
Args:
algorithm: The algorithm instance
symbols: The symbols to generate an insight for
"""
# empty dictionary to store portfolio targets by symbol
finalWeights = {}
# get symbols for calculations
calculationSymbols = [x for x in symbols if x.Value in self.parametersDict.keys()]
# get historical data for calculationSymbols for the last n trading days
history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily)
# empty dictionary for calculations
calculations = {}
# iterate over all symbols and perform calculations
for symbol in calculationSymbols:
# check if we have enough historical data, otherwise just skip this security
if not utils.CheckHistory(symbol, history):
algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data')
continue
else:
# add symbol to calculations
calculations[symbol] = SymbolData(symbol, parametersDict=self.parametersDict)
try:
# get series of daily returns
calculations[symbol].CalculateDailyReturnSeries(history, self.lookbackOptimization)
# update technical indicators
calculations[symbol].UpdateIndicators(history)
except BaseException as e:
algorithm.Log('returning empty weights for now for: ' + str(symbol.Value)
+ 'due to ' + str(e) + '; we will try again at the next iteration')
return finalWeights
# calculate optimal weights
optWeights = self.CalculateOptimalWeights(algorithm, calculations)
algorithm.Log('optimal weights for the period: ' + str(optWeights))
if optWeights is None:
algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration')
return finalWeights
# modify optimal weights using specific criteria
finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights)
algorithm.Log(str(self.Name) + 'Filtered optimal weights for the period: ' + str({symbol.Value: weight
for symbol, weight in finalWeights.items()}))
# check if we can trade the tradable symbol, otherwise we trade the original ticker
replaceSymbol = {}
for symbol in finalWeights:
tradableTicker = self.parametersDict[symbol.Value]['addTicker'][1]
if symbol.Value != tradableTicker:
tradableSymbol = algorithm.Symbol(tradableTicker)
# check if the price of tradableSymbol is not zero
if algorithm.Securities[tradableSymbol].Price != 0:
# we trade the original ticker
replaceSymbol[tradableSymbol] = symbol
for newSymbol, replacedSymbol in replaceSymbol.items():
finalWeights[newSymbol] = finalWeights[replacedSymbol]
finalWeights.pop(replacedSymbol)
# check how much we have allocated so far to risky assets
totalWeight = sum(finalWeights.values())
if totalWeight >= 1:
totalWeight = 1
algorithm.Log('total allocation after weight filtering: ' + str(totalWeight))
if self.algo.Securities["TIP"].Price != 0 and self.smaTIP.Current.Value != 0:
IEForTIP = (self.algo.Securities["IEF"].Price / self.algo.Securities["TIP"].Price) > (self.smaIEF.Current.Value/ self.smaTIP.Current.Value)
else:
IEForTIP = True
# allocate remaining cash to tickerCash
#cashWeight = 1 - totalWeight
#for symbol in symbols:
# if symbol.Value == 'SPXL': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'TMF': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'DGP': finalWeights[symbol] = finalWeights[symbol] + cashWeight/9
# if symbol.Value == 'TIP': finalWeights[symbol] = cashWeight*2/3
for symbol in symbols:
if IEForTIP == True:
if symbol.Value == 'IEF':
finalWeights[symbol] = (1 - totalWeight)*0
algorithm.Log(str(self.tickerCash) + '; final allocation for tickerCash: ' + str(finalWeights[symbol]))
if symbol.Value == 'TIP':
finalWeights[symbol] = (1- totalWeight)*2/3
if symbol.Value == 'UUP':
finalWeights[symbol] = finalWeights[symbol] + (1- totalWeight)*1/3
else:
if symbol.Value == 'TIP':
finalWeights[symbol] = (1 - totalWeight)*2/3
algorithm.Log(str('TIP') + '; final allocation for tickerCash: ' + str(finalWeights[symbol]))
if symbol.Value == 'IEF':
finalWeights[symbol] = 0
if symbol.Value == 'UUP':
finalWeights[symbol] = finalWeights[symbol] + (1 - totalWeight)*1/3
for symbol in symbols:
if symbol.Value == self.tickerVolatility:
finalWeights[symbol] = 0
# avoid very small numbers and make them 0
for symbol, weight in finalWeights.items():
if weight <= 1e-10:
finalWeights[symbol] = 0
return finalWeights
def CalculateOptimalWeights(self, algorithm, calculations):
"""
Description:
Calculate the individual weights for each symbol that optimize some given objective function
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
"""
# create a dictionary keyed by the symbols in calculations with a pandas.Series as value
# to create a dataframe of returns
dailyReturnsDict = {symbol.Value: symbolData.dailyReturnsSeries for symbol, symbolData in calculations.items()}
dailyReturnsDf = pd.DataFrame(dailyReturnsDict)
listTickers = list(dailyReturnsDf.columns)
try:
# portfolio optimizer finds the optimal weights for the given data
bounds = [self.parametersDict[x]['optimizationBounds'] for x in dailyReturnsDf.columns]
listOptWeights = self.optimizer.Optimize(objFunction=self.objFunction, dailyReturnsDf=dailyReturnsDf,
bounds=bounds)
except BaseException as e:
algorithm.Log('Optimize failed due to ' + str(e))
return None
# create dictionary with the optimal weights by symbol
weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))}
# avoid very small numbers and make them 0
for ticker, weight in weights.items():
if weight <= 1e-10:
weights[ticker] = 0
return weights
def CheckYieldSignalCrisis(self):
""" Check the Yield condition """
yieldSignalCrisis = False
# get the last six months of historical USTREASURY/YIELD values
histYield = self.algo.History([self.treasuryRatesSymbol], self.lookbackNegativeYield + 1,
Resolution.Daily).loc[self.treasuryRatesSymbol]
tenYr = histYield['10 yr'] # get the 10-year yield
threeMo = histYield['3 mo'] # get the 3-month yield
tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two
indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index
# check if there was actually some negative yield values
if len(indexNegative) > 0:
cutOff = indexNegative[0]
# filter the series for days after that day with negative value
afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff]
# check if at some point it reached our startCrisisYieldValue
if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue:
yieldSignalCrisis = True
return yieldSignalCrisis
def FilterOptimalWeights(self, algorithm, calculations, optWeights):
"""
Description:
Filter and modify the optimal weights using a combination of technical indicators
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
optWeights: Dictionary with the optimal weights by symbol
"""
# check for yield signal crisis
yieldSignalCrisis = self.CheckYieldSignalCrisis()
# empty dictionary to store weights
weights = {}
# loop through calculations and check conditions for weight filtering ------------------------
for symbol, symbolData in calculations.items():
if symbolData.SMA.IsReady and symbolData.MACD.IsReady:
currentPrice = algorithm.ActiveSecurities[symbol].Price
# check if sma condition is met and act accordingly ----------------------------------
smaLowerBoundCondition = self.parametersDict[symbol.Value]['sma'][1][0]
smaUpperBoundCondition = self.parametersDict[symbol.Value]['sma'][1][1]
smaConditionWeight = self.parametersDict[symbol.Value]['sma'][2]
algorithm.Log(str(symbol.Value)
+ '; current price: ' + str(round(currentPrice, 2))
+ '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2))
+ '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2)))
if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition)
or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)):
weights[symbol] = min(optWeights[symbol.Value], smaConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to sma filtering from '
+ str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = optWeights[symbol.Value]
smaModifiedWeight = weights[symbol]
# check if macd condition is met and act accordingly ----------------------------------
macdCondition = self.parametersDict[symbol.Value]['macd'][1]
macdConditionWeight = self.parametersDict[symbol.Value]['macd'][2]
# calculate our macd vs signal score between -1 and 1
macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value
macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal))
algorithm.Log(str(symbol.Value)
+ '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2))
+ '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2))
+ '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2)))
if macdVsSignalScore <= macdCondition:
weights[symbol] = min(smaModifiedWeight, macdConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to macd filtering from '
+ str(smaModifiedWeight) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = smaModifiedWeight
macdModifiedWeight = weights[symbol]
# check if yield condition is met and act accordingly ----------------------------------
activateYield = self.parametersDict[symbol.Value]['yield'][0]
yieldConditionWeight = self.parametersDict[symbol.Value]['yield'][1]
if yieldSignalCrisis and activateYield:
weights[symbol] = min(macdModifiedWeight, yieldConditionWeight)
else:
weights[symbol] = macdModifiedWeight
else:
weights[symbol] = optWeights[symbol.Value]
algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign optimal weight')
return weights
class SymbolData:
def __init__(self, symbol, parametersDict):
self.Symbol = symbol
self.dailyReturnsSeries = None
smaPeriod = parametersDict[symbol.Value]['sma'][0]
self.SMA = SimpleMovingAverage(smaPeriod)
macdFastPeriod = parametersDict[self.Symbol.Value]['macd'][0][0]
macdSlowPeriod = parametersDict[self.Symbol.Value]['macd'][0][1]
macdSignalPeriod = parametersDict[self.Symbol.Value]['macd'][0][2]
self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod,
MovingAverageType.Exponential)
def CalculateDailyReturnSeries(self, history, lookbackOptimization):
""" Calculate the daily returns series for each security """
tempReturnsSeries = history.loc[str(self.Symbol)]['close'].pct_change(periods=2).dropna() # 2-day returns
self.dailyReturnsSeries = tempReturnsSeries[-lookbackOptimization:]
def UpdateIndicators(self, history):
""" Update the indicators with historical data """
for index, row in history.loc[str(self.Symbol)].iterrows():
self.SMA.Update(index, row['close'])
self.MACD.Update(index, row['close'])
#region imports
from AlgorithmImports import *
#endregion
from OptimizerClass import PortfolioOptimizer
import pandas as pd
import general_utils as utils
class RotationalOptimizerAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, rebalancingFrequency,
lookbackOptimization, objFunction, tickerCash, treasuryRatesSymbol):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.rebalancingFrequency = rebalancingFrequency
self.lookbackOptimization = lookbackOptimization
self.objFunction = objFunction
self.tickerCash = tickerCash
self.treasuryRatesSymbol = treasuryRatesSymbol
self.insuranceMonitor = False
self.triggerSignals = False
self.tickerVolatility = self.algo.volatilityTicker
self.tickerVol = self.algo.volTicker
self.permInsurance = 0.3
# for yield signal crisis
self.lookbackNegativeYield = 147 # number of days to lookback for negative values
self.startCrisisYieldValue = 0.3 # the yield value above which we apply the yield weight condition
# initialize the optimizer
self.optimizer = PortfolioOptimizer()
# get all the parameters for the indicators
valuesList = []
self.alphaTickers = []
for ticker in parametersDict.keys():
if parametersDict[ticker]['addTicker'][0]:
valuesList.append(parametersDict[ticker]['sma'][0])
valuesList.append(sum(parametersDict[ticker]['macd'][0]))
self.alphaTickers.append(ticker)
tradableTicker = parametersDict[ticker]['addTicker'][1]
if tradableTicker != ticker:
self.alphaTickers.append(tradableTicker)
#self.alphaTickers.append(self.tickerCash)
self.alphaTickers.append(self.tickerVolatility)
self.alphaTickers.append(self.tickerVol)
self.alphaTickers.append("TIP")
self.alphaTickers.append("IEF")
self.alphaTickers.append("GLD")
self.alphaTickers.append("UUP")
self.alphaTickers.append("GSY")
self.smaIEF = self.algo.SMA("IEF", 100, Resolution.Daily)
self.smaTIP = self.algo.SMA("TIP", 100, Resolution.Daily)
self.smaSPY = self.algo.SMA("SPY", 100, Resolution.Daily)
self.smaGLD = self.algo.SMA("GLD", 100, Resolution.Daily)
self.smaUUP = self.algo.SMA("UUP", 100, Resolution.Daily)
#self.allAssetsBelowMA = False
# keep the highest parameter provided to call history
self.lookbackHistory = max(lookbackOptimization, max(valuesList))
self.weightsDict = {}
self.timeToEmitInsights = False
self.atrStocksCrossover = None
self.atrBondsCrossover = None
self.weightsRetrieved = False
self.weekStart = False
# Sudhir changes to code to optimize on start if nothing found in object store.
if self.algo.LiveMode:
dictFromOS = utils.ReadFromObjectStore(self.algo, self.Name)
if dictFromOS:
self.algo.Log(str(self.Name) + 'Retrieving last optimal weights from object store: ' + str(dictFromOS))
self.weightsDict = {self.algo.Symbol(ticker): weight for ticker, weight in dictFromOS.items()}
self.weightsRetrieved = True
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def CalculateSignals(self):
""" Create new signals """
if not self.activateAlpha:
return
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
# determine target percent for the given insights (check function DetermineTargetPercent for details)
self.weightsDict = self.DetermineTargetPercent(self.algo, symbols)
#self.timeToEmitInsights = True
if self.algo.LiveMode or self.algo.BackTestWarmUp == True:
dictToOS = {symbol.Value: weight for symbol, weight in self.weightsDict.items()}
utils.SaveToObjectStore(self.algo, self.Name, dictToOS)
def GenerateSignals(self):
self.timeToEmitInsights = True
def CheckForSignals(self):
if not self.activateAlpha:
return
self.algo.Log(str(self.Name) + " Parameters ------------->")
self.algo.allAssetsBelowMA = self.algo.Securities["SPY"].Price < self.smaSPY.Current.Value and self.algo.Securities["IEF"].Price < self.smaIEF.Current.Value and self.algo.Securities["GLD"].Price < self.smaGLD.Current.Value
if self.weightsRetrieved == True:
self.weightsRetrieved = False
#self.timeToEmitInsights = True
if self.triggerSignals:
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals based on schedule')
self.CalculateSignals()
self.triggerSignals = False
else:
if (self.atrStocksCrossover is not None and self.atrBondsCrossover is not None
and (self.atrStocksCrossover.Current.Value > 1.5 or self.atrBondsCrossover.Current.Value > 1.5) # ):
and (self.weekStart)):
self.algo.Log(str(self.Name) + 'Triggering GenerateSignals due to ATR Crossover')
self.CalculateSignals()
if self.algo.risklevel in [4,5]: # or self.algo.reversalRisk:
self.insuranceMonitor = True
self.algo.Log(str(self.Name) + 'Triggering Weight Adjustment due to High Risk flag')
# get active symbols for this alpha
symbols = [x.Symbol for x in self.algo.ActiveSecurities.Values if x.Symbol.Value in self.alphaTickers]
self.AdjustWeightsForInsurance()
if self.insuranceMonitor == True and not (self.algo.risklevel in [4,5] or self.algo.reversalRisk):
self.CalculateSignals()
self.insuranceMonitor = False
self.weekStart = False
def AdjustWeightsForInsurance(self):
if self.algo.risklevel in [5]:
insurancePercentage = 0.80
elif self.algo.risklevel in [4] or self.algo.reversalRisk:
insurancePercentage = 0.90
else:
insurancePercentage = 1.00
if self.algo.risklevel in [4,5] or self.algo.reversalRisk:
stocksWeight = 0
for symbol, weight in self.weightsDict.items():
if symbol.Value == "SPXL" or symbol.Value == 'TQQQ':
stocksWeight = stocksWeight + self.weightsDict[symbol]
self.weightsDict[symbol] = self.weightsDict[symbol]*insurancePercentage
stocksWeight = stocksWeight*(1- insurancePercentage)
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.tickerVolatility:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*(1-self.permInsurance)
if symbol.Value == self.tickerVol:
self.weightsDict[symbol] = self.weightsDict[symbol] + stocksWeight*self.permInsurance
#self.timeToEmitInsights = True
def OnWeekStart(self):
self.weekStart = True
def TriggerScheduledSignals(self):
self.triggerSignals = True
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.TriggerScheduledSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes + 1), self.CheckForSignals)
algorithm.Schedule.On(algorithm.DateRules.EveryDay(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
algorithm.Schedule.On(algorithm.DateRules.WeekStart(symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.OnWeekStart)
# initialize atr indicators
if self.activateAlpha and security.Symbol.Value in ['SPY', 'TLT']:
shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour)
longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour)
if security.Symbol.Value == 'SPY':
self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR)
elif security.Symbol.Value == 'TLT':
self.atrBondsCrossover = IndicatorExtensions.Over(shortATR, longATR)
def DetermineTargetPercent(self, algorithm, symbols):
"""
Description:
Determine the target percent for each insight
Args:
algorithm: The algorithm instance
symbols: The symbols to generate an insight for
"""
# empty dictionary to store portfolio targets by symbol
finalWeights = {}
# get symbols for calculations
calculationSymbols = [x for x in symbols if x.Value in self.parametersDict.keys()]
# get historical data for calculationSymbols for the last n trading days
history = algorithm.History(calculationSymbols, self.lookbackHistory, Resolution.Daily)
# empty dictionary for calculations
calculations = {}
# iterate over all symbols and perform calculations
for symbol in calculationSymbols:
# check if we have enough historical data, otherwise just skip this security
if not utils.CheckHistory(symbol, history):
algorithm.Log(str(symbol.Value) + ': skipping security due to no/not enough historical data')
continue
else:
# add symbol to calculations
calculations[symbol] = SymbolData(symbol, parametersDict=self.parametersDict)
try:
# get series of daily returns
calculations[symbol].CalculateDailyReturnSeries(history, self.lookbackOptimization)
# update technical indicators
calculations[symbol].UpdateIndicators(history)
except BaseException as e:
algorithm.Log('returning empty weights for now for: ' + str(symbol.Value)
+ 'due to ' + str(e) + '; we will try again at the next iteration')
return finalWeights
# calculate optimal weights
optWeights = self.CalculateOptimalWeights(algorithm, calculations)
algorithm.Log('optimal weights for the period: ' + str(optWeights))
if optWeights is None:
algorithm.Log('returning empty weights due to optWeights being None; we will try again at the next iteration')
return finalWeights
# modify optimal weights using specific criteria
finalWeights = self.FilterOptimalWeights(algorithm, calculations, optWeights)
algorithm.Log(str(self.Name) + 'Filtered optimal weights for the period: ' + str({symbol.Value: weight
for symbol, weight in finalWeights.items()}))
# check if we can trade the tradable symbol, otherwise we trade the original ticker
replaceSymbol = {}
for symbol in finalWeights:
tradableTicker = self.parametersDict[symbol.Value]['addTicker'][1]
if symbol.Value != tradableTicker:
tradableSymbol = algorithm.Symbol(tradableTicker)
# check if the price of tradableSymbol is not zero
if algorithm.Securities[tradableSymbol].Price != 0:
# we trade the original ticker
replaceSymbol[tradableSymbol] = symbol
for newSymbol, replacedSymbol in replaceSymbol.items():
finalWeights[newSymbol] = finalWeights[replacedSymbol]
finalWeights.pop(replacedSymbol)
# check how much we have allocated so far to risky assets
totalWeight = sum(finalWeights.values())
if totalWeight >= 1:
totalWeight = 1
algorithm.Log('total allocation after weight filtering: ' + str(totalWeight))
if self.algo.Securities["TIP"].Price != 0 and self.smaTIP.Current.Value != 0:
IEForTIP = (self.algo.Securities["IEF"].Price / self.algo.Securities["TIP"].Price) > (self.smaIEF.Current.Value/ self.smaTIP.Current.Value)
else:
IEForTIP = True
if self.algo.Securities["UUP"].Price != 0 and self.smaUUP.Current.Value != 0:
TIPorUUP = (self.algo.Securities["TIP"].Price / self.algo.Securities["UUP"].Price) > (self.smaTIP.Current.Value/ self.smaUUP.Current.Value)
else:
TIPorUUP = True
if (not TIPorUUP or self.algo.allAssetsBelowMA) and self.algo.uup == 1 :
iefallocation = 0.0
tipallocation = 0.0
uupallocation = 1.0
else:
iefallocation = 0.0
tipallocation = 1.0
uupallocation = 0.0
volallocation = 0.0
for symbol in symbols:
if symbol.Value == self.tickerVolatility or symbol.Value == self.tickerVol or symbol.Value in ['GSY', 'IEF', 'TIP']:
finalWeights[symbol] = 0
for symbol in symbols:
if symbol.Value == 'IEF':
finalWeights[symbol] = finalWeights[symbol] + (1 - totalWeight)*iefallocation
if symbol.Value == 'TIP':
finalWeights[symbol] = finalWeights[symbol] + (1- totalWeight)*tipallocation
if symbol.Value == 'UUP':
finalWeights[symbol] = finalWeights[symbol] + (1- totalWeight)*uupallocation
if symbol.Value == self.tickerVol:
finalWeights[symbol] = finalWeights[symbol] + (1- totalWeight)*volallocation
# avoid very small numbers and make them 0
for symbol, weight in finalWeights.items():
if weight <= 1e-10:
finalWeights[symbol] = 0
return finalWeights
def CalculateOptimalWeights(self, algorithm, calculations):
"""
Description:
Calculate the individual weights for each symbol that optimize some given objective function
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
"""
# create a dictionary keyed by the symbols in calculations with a pandas.Series as value
# to create a dataframe of returns
dailyReturnsDict = {symbol.Value: symbolData.dailyReturnsSeries for symbol, symbolData in calculations.items()}
dailyReturnsDf = pd.DataFrame(dailyReturnsDict)
listTickers = list(dailyReturnsDf.columns)
try:
# portfolio optimizer finds the optimal weights for the given data
bounds = [self.parametersDict[x]['optimizationBounds'] for x in dailyReturnsDf.columns]
listOptWeights = self.optimizer.Optimize(objFunction=self.objFunction, dailyReturnsDf=dailyReturnsDf,
bounds=bounds)
except BaseException as e:
algorithm.Log('Optimize failed due to ' + str(e))
return None
# create dictionary with the optimal weights by symbol
weights = {listTickers[i]: listOptWeights[i] for i in range(len(listTickers))}
# avoid very small numbers and make them 0
for ticker, weight in weights.items():
if weight <= 1e-10:
weights[ticker] = 0
return weights
def CheckYieldSignalCrisis(self):
""" Check the Yield condition """
yieldSignalCrisis = False
# get the last six months of historical USTREASURY/YIELD values
histYield = self.algo.History([self.treasuryRatesSymbol], self.lookbackNegativeYield + 1,
Resolution.Daily).loc[self.treasuryRatesSymbol]
tenYr = histYield['10 yr'] # get the 10-year yield
threeMo = histYield['3 mo'] # get the 3-month yield
tenYrMinusThreeMo = tenYr - threeMo # calculate the difference between the two
indexNegative = tenYrMinusThreeMo[tenYrMinusThreeMo < 0].head(1).index
# check if there was actually some negative yield values
if len(indexNegative) > 0:
cutOff = indexNegative[0]
# filter the series for days after that day with negative value
afterNegative = tenYrMinusThreeMo[tenYrMinusThreeMo.index > cutOff]
# check if at some point it reached our startCrisisYieldValue
if len(afterNegative) > 0 and max(afterNegative) > self.startCrisisYieldValue:
yieldSignalCrisis = True
return yieldSignalCrisis
def FilterOptimalWeights(self, algorithm, calculations, optWeights):
"""
Description:
Filter and modify the optimal weights using a combination of technical indicators
Args:
algorithm: The algorithm instance
calculations: Dictionary containing calculations for each symbol
optWeights: Dictionary with the optimal weights by symbol
"""
# check for yield signal crisis
yieldSignalCrisis = self.CheckYieldSignalCrisis()
# empty dictionary to store weights
weights = {}
# loop through calculations and check conditions for weight filtering ------------------------
for symbol, symbolData in calculations.items():
if symbolData.SMA.IsReady and symbolData.MACD.IsReady:
currentPrice = algorithm.ActiveSecurities[symbol].Price
# check if sma condition is met and act accordingly ----------------------------------
smaLowerBoundCondition = self.parametersDict[symbol.Value]['sma'][1][0]
smaUpperBoundCondition = self.parametersDict[symbol.Value]['sma'][1][1]
smaConditionWeight = self.parametersDict[symbol.Value]['sma'][2]
algorithm.Log(str(symbol.Value)
+ '; current price: ' + str(round(currentPrice, 2))
+ '; SMA: ' + str(round(symbolData.SMA.Current.Value, 2))
+ '; Price vs SMA: ' + str(round((currentPrice / symbolData.SMA.Current.Value) - 1, 2)))
if (currentPrice <= symbolData.SMA.Current.Value * (1 + smaLowerBoundCondition)
or currentPrice >= symbolData.SMA.Current.Value * (1 + smaUpperBoundCondition)):
weights[symbol] = min(optWeights[symbol.Value], smaConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to sma filtering from '
+ str(optWeights[symbol.Value]) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = optWeights[symbol.Value]
smaModifiedWeight = weights[symbol]
# check if macd condition is met and act accordingly ----------------------------------
macdCondition = self.parametersDict[symbol.Value]['macd'][1]
macdConditionWeight = self.parametersDict[symbol.Value]['macd'][2]
# calculate our macd vs signal score between -1 and 1
macdMinusSignal = symbolData.MACD.Current.Value - symbolData.MACD.Signal.Current.Value
macdVsSignalScore = macdMinusSignal / (1 + abs(macdMinusSignal))
algorithm.Log(str(symbol.Value)
+ '; MACD: ' + str(round(symbolData.MACD.Current.Value, 2))
+ '; MACD Signal: ' + str(round(symbolData.MACD.Signal.Current.Value, 2))
+ '; MACD vs Signal Score: ' + str(round(macdVsSignalScore, 2)))
if macdVsSignalScore <= macdCondition:
weights[symbol] = min(smaModifiedWeight, macdConditionWeight)
algorithm.Log(str(symbol.Value)
+ '; modifying weight due to macd filtering from '
+ str(smaModifiedWeight) + ' to ' + str(weights[symbol]))
else:
weights[symbol] = smaModifiedWeight
macdModifiedWeight = weights[symbol]
# check if yield condition is met and act accordingly ----------------------------------
activateYield = self.parametersDict[symbol.Value]['yield'][0]
yieldConditionWeight = self.parametersDict[symbol.Value]['yield'][1]
if yieldSignalCrisis and activateYield:
weights[symbol] = min(macdModifiedWeight, yieldConditionWeight)
else:
weights[symbol] = macdModifiedWeight
else:
weights[symbol] = optWeights[symbol.Value]
algorithm.Log(str(symbol.Value) + '; indicators are not ready so assign optimal weight')
return weights
class SymbolData:
def __init__(self, symbol, parametersDict):
self.Symbol = symbol
self.dailyReturnsSeries = None
smaPeriod = parametersDict[symbol.Value]['sma'][0]
self.SMA = SimpleMovingAverage(smaPeriod)
macdFastPeriod = parametersDict[self.Symbol.Value]['macd'][0][0]
macdSlowPeriod = parametersDict[self.Symbol.Value]['macd'][0][1]
macdSignalPeriod = parametersDict[self.Symbol.Value]['macd'][0][2]
self.MACD = MovingAverageConvergenceDivergence(macdFastPeriod, macdSlowPeriod, macdSignalPeriod,
MovingAverageType.Exponential)
def CalculateDailyReturnSeries(self, history, lookbackOptimization):
""" Calculate the daily returns series for each security """
tempReturnsSeries = history.loc[str(self.Symbol)]['close'].pct_change(periods=2).dropna() # 2-day returns
self.dailyReturnsSeries = tempReturnsSeries[-lookbackOptimization:]
def UpdateIndicators(self, history):
""" Update the indicators with historical data """
for index, row in history.loc[str(self.Symbol)].iterrows():
self.SMA.Update(index, row['close'])
self.MACD.Update(index, row['close'])from AlgorithmImports import *
import numpy as np
class VixTermStructureAlphaCreationModel(AlphaModel):
def __init__(self, algorithm, name, activateAlpha, parametersDict, vtsSymbols, canarySymbol,
periodShortSMA, periodLongSMA, contangoThresholds, rebalancingFrequency):
self.algo = algorithm
self.Name = name
self.activateAlpha = activateAlpha
self.parametersDict = parametersDict
self.canarySymbol = canarySymbol
self.contangoThresholds = contangoThresholds
self.rebalancingFrequency = rebalancingFrequency
self.weightsDict = {}
self.timeToEmitInsights = False
self.isCanaryGreaterThanMA = False
self.isCanaryLessThanMA = False
self.isCanaryAboveMA = True
self.atrStocksCrossover = None
self.vtsRatioDiffMA = SimpleMovingAverage(3)
self.waterMark = -0.10
self.priceWaterMark = -0.10
self.waterMarkStocks = -0.03
self.VolatilityStopped = False
self.counterdays = 0
self.currentPrice = 0
self.entryPrice = 0
self.isAwesomeOscillatorNegative = False
self.isTermStructureNegative = False
self.tickerVolatility = self.algo.volatilityTicker
self.tickerVol = self.algo.volTicker
self.decayFactor = 1
self.canaryDecay = 1
self.canaryLastPeak = 0
self.oldRisk = 0
self.noRiskCounter = 0
self.IEFSymbol = self.algo.AddEquity('IEF', Resolution.Hour).Symbol
self.TIPSymbol = self.algo.AddEquity('TIP', Resolution.Hour).Symbol
self.VIXSymbol = vtsSymbols[0]
self.VXVSymbol = vtsSymbols[1]
self.VIXStopLossLevel = 0.0175
self.periodLongSMA = periodLongSMA
self.periodShortSMA = periodShortSMA
if self.activateAlpha:
self.vixLongSMA = algorithm.SMA(self.VIXSymbol, periodLongSMA, Resolution.Daily)
self.vxvLongSMA = algorithm.SMA(self.VXVSymbol , periodLongSMA, Resolution.Daily)
self.vixShortSMA = algorithm.SMA(self.VIXSymbol, periodShortSMA, Resolution.Daily)
self.vxvShortSMA = algorithm.SMA(self.VXVSymbol , periodShortSMA, Resolution.Daily)
self.canarySymbolSMA = algorithm.SMA(self.canarySymbol, 231, Resolution.Daily)
self.canarySymbolSMA34 = algorithm.SMA(self.canarySymbol, 21, Resolution.Daily)
self.canarySymbolSMA05 = algorithm.SMA(self.canarySymbol, 3, Resolution.Daily)
self.smaIEF = self.algo.SMA(self.IEFSymbol, 21, Resolution.Daily)
self.smaTIP = self.algo.SMA(self.TIPSymbol, 21, Resolution.Daily)
def Update(self, algorithm, data):
insights = []
if not self.timeToEmitInsights:
return insights
insightExpiry = Expiry.EndOfDay(algorithm.Time)
# loop through insights and send portfolio targets
for symbol, weight in self.weightsDict.items():
if symbol.Value.split('_')[0] != 'CUSTOM' and not data.ContainsKey(symbol):
continue
if weight > 0:
insightDirection = InsightDirection.Up
elif weight < 0:
insightDirection = InsightDirection.Down
else:
insightDirection = InsightDirection.Flat
insights.append(Insight.Price(symbol, insightExpiry, insightDirection, None, None, None, abs(weight)))
self.timeToEmitInsights = False
return insights
def EvaluateRisk(self):
self.algo.Log(str(self.Name) + " Parameters ------------->")
if (not self.activateAlpha or not self.vixLongSMA.IsReady
or not self.vxvLongSMA.IsReady or not self.canarySymbolSMA):
return
calculationSymbols = [self.IEFSymbol, self.TIPSymbol, self.canarySymbol]
history = self.algo.History(calculationSymbols, 100, Resolution.Daily)
df = self.algo.History(self.VIXSymbol, self.periodLongSMA*2, Resolution.Daily).close.unstack(0)
vixShortSMAValue = np.mean(df.tail(self.periodShortSMA).values)
vixLongSMAValue = np.mean(df.tail(self.periodLongSMA).values)
self.algo.Log("VIX Values: " + str(df))
#self.algo.Debug(str(df.tail(self.periodShortSMA).values))
#self.algo.Debug(str(df))
#self.algo.Debug('Short SMA ' + str(vixShortSMAValue))
#self.algo.Debug(str(df.tail(self.periodLongSMA).values))
#self.algo.Debug('Long SMA ' + str(vixLongSMAValue))
df = self.algo.History(self.VXVSymbol,self.periodLongSMA*2).close.unstack(0)
vxvShortSMAValue = np.mean(df.tail(self.periodShortSMA).values)
vxvLongSMAValue = np.mean(df.tail(self.periodLongSMA).values)
self.algo.Log("VIX 3M Values: " + str(df))
# made it a dynamic calculation to avoid problems with the Indicator Extension
#vtsRatioShort = self.vxvShortSMA.Current.Value / self.vixShortSMA.Current.Value
#vtsRatioLong = self.vxvLongSMA.Current.Value / self.vixLongSMA.Current.Value
vtsRatioShort = vxvShortSMAValue / vixShortSMAValue
vtsRatioLong = vxvLongSMAValue / vixLongSMAValue
diff = vtsRatioShort - vtsRatioLong
self.vtsRatioDiffMA.Update(self.algo.Time, diff)
isvtsRatioDiffMANegative = self.vtsRatioDiffMA.Current.Value < 0
canaryPrice = self.algo.Securities[self.canarySymbol].Price
canaryMA = self.canarySymbolSMA.Current.Value
canaryMA34 = self.canarySymbolSMA34.Current.Value
canaryMA05 = self.canarySymbolSMA05.Current.Value
self.IsCanaryGreaterThanMA = False #canaryPrice > canaryMA * 1.1
self.isCanaryLessThanMA = canaryPrice < canaryMA
self.isTermStructureNegative = vtsRatioShort < vtsRatioLong
#isTermStructureNegative = isvtsRatioDiffMANegative
isTermStructureLower = vtsRatioShort < 0.923
isTermStructureGreater = vtsRatioShort > 1.25
isVixGreater = vixShortSMAValue > 24
isATRCrossOverSPY = self.atrStocksCrossover.Current.Value > 1.5
self.isAwesomeOscillatorNegative = canaryMA05 < canaryMA34
# isVixTrendingHigher = self.vixShortSMA.Current.Value > self.vixLongSMA.Current.Value
isVixTrendingHigher = vixShortSMAValue > vixLongSMAValue
#self.algo.isRiskHighest = isTermStructureNegative and isTermStructureLower and isVixGreater
#self.algo.isRiskHigh = isTermStructureNegative and isVixGreater
self.algo.isRiskHighest = False
self.algo.isRiskHigh = False
self.algo.isRisk = False
if isTermStructureGreater : #or (self.isTermStructureNegative and vtsRatioShort > 1.20):
self.algo.reversalRisk = True
else:
self.algo.reversalRisk = False
self.algo.Log("VIX Latest Value :" + str(self.algo.Securities[self.VIXSymbol].Price))
self.algo.Log("Term Structure Negative ? :" + str(self.isTermStructureNegative) + "VTSRatioShort:" + str(vtsRatioShort) + "VTSRatioLong:" + str(vtsRatioLong))
self.algo.Log("Is Awesome Oscillator Negative? :" + str(self.isAwesomeOscillatorNegative) + "Short MA " + str(canaryMA05) + "Long MA " + str(canaryMA34))
self.algo.Log("VIX Trending Higher ? :" + str(isVixTrendingHigher) + "VIXShort :" + str(vixShortSMAValue) + "VIXLong :" + str(vixLongSMAValue))
self.algo.Log("Term Structure Lower ? :" + str(isTermStructureLower))
self.algo.Log("Is VIX Greater ? :" + str(isVixGreater))
self.algo.Log("Has ATR CrossOvered ? :" + str(isATRCrossOverSPY) + str(self.atrStocksCrossover.Current.Value))
self.algo.Log("Is SPY < MA ? :" + str(self.isCanaryLessThanMA) + "Current Price " + str(canaryPrice) + "MA " + str(canaryMA))
isRiskCondition5 = self.isTermStructureNegative and isVixTrendingHigher and self.isAwesomeOscillatorNegative and isVixGreater
isRiskCondition4 = (self.isTermStructureNegative and isVixTrendingHigher) or (self.isAwesomeOscillatorNegative and isVixTrendingHigher) or (self.isAwesomeOscillatorNegative and self.isCanaryLessThanMA)
#isRiskCondition4 = True
if (isATRCrossOverSPY or self.isCanaryLessThanMA) or isRiskCondition5:
if self.isTermStructureNegative and isTermStructureLower:
self.algo.risklevel = 5
elif isRiskCondition4:
self.algo.risklevel = 4
else:
if self.isTermStructureNegative:
self.algo.risklevel = 4
elif vixShortSMAValue > 22:
self.algo.risklevel = 3
elif self.algo.reversalRisk or self.isCanaryLessThanMA:
self.algo.risklevel = 2
else:
self.algo.risklevel = 1
else:
if (self.isTermStructureNegative and isTermStructureLower and isVixGreater) or isRiskCondition5:
self.algo.risklevel = 5
elif isRiskCondition4:
self.algo.risklevel = 4
else:
if self.isTermStructureNegative and self.isAwesomeOscillatorNegative: #isVixGreater:
self.algo.risklevel = 4
elif vixShortSMAValue > 22 :
self.algo.risklevel = 3
elif self.algo.reversalRisk or self.isCanaryLessThanMA:
self.algo.risklevel = 2
else:
self.algo.risklevel = 1
if (vtsRatioShort < 0.85 and self.isTermStructureNegative == False):
self.algo.risklevel = 0
elif (vtsRatioShort > 0.923 and vixShortSMAValue > 27 and self.isTermStructureNegative == False and self.isAwesomeOscillatorNegative == False ):
self.algo.risklevel = 1
self.algo.isRiskHighest = False
self.algo.isRiskHigh = False
self.algo.isRisk = False
self.algo.Risk = False
if self.algo.risklevel == 5:
self.algo.isRiskHighest = True
elif self.algo.risklevel == 4:
self.algo.isRiskHigh = True
self.algo.isRisk = True
elif self.algo.risklevel == 3:
self.algo.Risk = True
self.weightsDict = {security.Symbol: self.parametersDict[security.Symbol.Value][self.algo.risklevel]
for security in self.algo.ActiveSecurities.Values
if security.Symbol.Value in self.parametersDict}
if self.algo.Securities[self.TIPSymbol].Price != 0 and self.smaTIP.Current.Value != 0:
IEForTIP = (self.algo.Securities[self.IEFSymbol].Price / self.algo.Securities[self.TIPSymbol].Price) > (self.smaIEF.Current.Value/ self.smaTIP.Current.Value)
else:
IEForTIP = True
self.algo.Log("Is IEF > TIP ? :" + str(IEForTIP))
self.algo.Log("is Risk Highest :" + str(self.algo.isRiskHighest))
self.algo.Log("is Risk High :" + str(self.algo.isRiskHigh))
self.algo.Log("is Risk :" + str(self.algo.isRisk))
self.algo.Log("Stop Loss Level :" + str(self.VIXStopLossLevel))
self.algo.Plot('Risk Level', 'Risk Level', self.algo.risklevel)
TMFweight = 0
#IEForTIP = True
if not IEForTIP:
for symbol, weight in self.weightsDict.items():
if symbol.Value == "TMF":
TMFweight = self.weightsDict[symbol]
self.weightsDict[symbol] = 0
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.TIPSymbol.Value:
self.weightsDict[symbol] = self.weightsDict[symbol] + TMFweight
if self.algo.risklevel in [5]:
self.VIXStopLossLevel = 0.02
elif self.algo.risklevel in [2,3,4]:
self.VIXStopLossLevel = 0.02
else:
self.VIXStopLossLevel = 0.02
if self.algo.risklevel in [2,3,4,5]:
isRisk = True
self.decayFactor = self.decayFactor * 1
elif self.algo.reversalRisk:
isRisk = True
self.decayFactor = self.decayFactor * 1
else:
isRisk = False
self.decayFactor = 1
if self.oldRisk in [2,3,4,5] and self.algo.risklevel in [0,1]:
self.noRiskCounter = 1
elif self.algo.risklevel in [0,1]:
self.noRiskCounter +=1
self.oldRisk = self.algo.risklevel
if canaryPrice > self.canaryLastPeak:
self.canaryLastPeak = canaryPrice
canaryDD = (canaryPrice/ self.canaryLastPeak)-1
if self.algo.risklevel in [0,1] and self.noRiskCounter > 3:
self.canaryDecay = 1 #min(1,self.canaryDecay*1.1)
self.noRiskCounter = 0
elif canaryDD >= -0.01:
self.canaryDecay = 1
elif canaryDD < -0.05 :
self.canaryDecay = max(0.5, self.canaryDecay*0.95)
elif canaryDD < -0.01:
self.canaryDecay = max(0.5, self.canaryDecay*0.98)
#self.algo.Plot('Canary Decay', 'Canary Decay', self.canaryDecay)
for symbol, weight in self.weightsDict.items():
if symbol.Value == "TQQQ" or symbol.Value == 'SPXL':
stocksWeight = self.weightsDict[symbol]
self.weightsDict[symbol] = stocksWeight*self.canaryDecay*self.decayFactor
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.tickerVolatility :
currentWeight = self.weightsDict[symbol]
self.weightsDict[symbol] = currentWeight + stocksWeight*(1 - self.canaryDecay*self.decayFactor)*0.8
if symbol.Value == self.tickerVol :
currentWeight = self.weightsDict[symbol]
self.weightsDict[symbol] = currentWeight + stocksWeight*(1 - self.canaryDecay*self.decayFactor)*0.1
if symbol.Value == 'UUP' :
currentWeight = self.weightsDict[symbol]
self.weightsDict[symbol] = currentWeight + stocksWeight*(1 - self.canaryDecay*self.decayFactor)*0.1
def GenerateSignals(self):
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.tickerVolatility and weight > 0 and self.algo.stopLossTriggered == True:
self.entryPrice = self.algo.Securities[self.VIXSymbol].Price
else:
self.entryPrice = 0
self.timeToEmitInsights = True
def VolatilityStopLoss(self):
if (self.algo.Time.hour >= 12 and self.algo.Time.hour <= 15) :
stopLossLevel = self.VIXStopLossLevel
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.tickerVolatility and weight > 0 : #and not self.algo.stopLossTriggered:
vxxHoldingsPercentage = (self.algo.Securities[symbol.Value].Holdings.Quantity * self.algo.Securities[symbol.Value].Holdings.Price)/ self.algo.Portfolio.TotalPortfolioValue
#vxxHoldingsPercentage = (self.algo.Securities[symbol.Value].Holdings.Quantity * self.entryPrice)/ self.algo.Portfolio.TotalPortfolioValue
pnlVolatility = self.algo.Securities[symbol.Value].Holdings.UnrealizedProfitPercent
if (pnlVolatility > self.waterMark + stopLossLevel): #and pnlVolatility != 0:
self.waterMark = pnlVolatility - stopLossLevel
self.algo.Log("Current Level " + str(pnlVolatility) + " Water Mark Level " + str(self.waterMark) + " Stop Loss Level " + str(stopLossLevel))
if pnlVolatility < self.waterMark: #changed to a different stop loss mechanism, but returned back to 0.
self.weightsDict[symbol] = 0
#if self.algo.stopLossTriggered == False:
if self.algo.isRiskHighest:
self.algo.SetHoldings(symbol, min(0.00, vxxHoldingsPercentage))
elif self.algo.isRiskHigh:
self.algo.SetHoldings(symbol, min(0.00, vxxHoldingsPercentage))
elif self.algo.isRisk:
self.algo.SetHoldings(symbol, min(0.00, vxxHoldingsPercentage))
else:
self.algo.SetHoldings(symbol, 0.00)
self.algo.stopLossTriggered = True
self.waterMark = -0.03
def VolatilityStopLossNew(self):
if (self.algo.Time.hour >= 12 and self.algo.Time.hour <= 15) :
stopLossLevel = self.VIXStopLossLevel
for symbol, weight in self.weightsDict.items():
if symbol.Value == self.tickerVolatility and weight > 0.01 : #and not self.algo.stopLossTriggered:
self.currentPrice = self.algo.Securities[self.VIXSymbol].Price
if self.entryPrice != 0:
pnl = self.currentPrice/self.entryPrice - 1
else:
pnl = 0
if (pnl > self.priceWaterMark + stopLossLevel):
self.priceWaterMark = pnl - stopLossLevel
self.algo.Log("Entry Price " + str(self.entryPrice) + " Current Price " + str(self.currentPrice))
self.algo.Log("Current Level " + str(pnl) + " Water Mark Level " + str(self.priceWaterMark) + " Stop Loss Level " + str(stopLossLevel))
if pnl < self.priceWaterMark: #changed to a different stop loss mechanism, but returned back to 0.
self.weightsDict[symbol] = 0
#if self.algo.stopLossTriggered == False:
if self.algo.isRiskHighest:
self.algo.SetHoldings(symbol, 0)
elif self.algo.isRiskHigh:
self.algo.SetHoldings(symbol, 0)
elif self.algo.isRisk:
self.algo.SetHoldings(symbol, 0)
else:
self.algo.SetHoldings(symbol, 0.00)
self.algo.stopLossTriggered = True
self.priceWaterMark = -0.10
def CalculateModelLeverageFactors(self):
# Calculates the scaling percentage based on a straight-line relationship between DD and Scaling
# Minimum leveragefactor of 50% that then scales up to a max of 100% when DD increases to 15%
for key in self.algo.alphasDD:
if key == self.Name:
if self.algo.alphasDD[key] < -0.10 :
self.algo.leverageFactors[key] = self.algo.leverageFactors[key]*0.99
elif self.algo.alphasDD[key] < -0.05:
self.algo.leverageFactors[key] = 1
else:
self.algo.leverageFactors[key] = self.algo.leverageFactors[key]*0.9955
for key in self.weightsDict:
self.weightsDict[key] = self.weightsDict[key]* float(self.algo.leverageFactors[self.Name])
# Adds the non-leveraged portion to the "cash" portion of the algo
for symbol in self.weightsDict.keys():
if symbol.Value == 'TIP':
self.weightsDict[symbol] = self.weightsDict[symbol] + max(0,1-float(self.algo.leverageFactors[self.Name]))
def OnSecuritiesChanged(self, algorithm, changes):
""" Actions to take every time the universe changes """
for security in changes.AddedSecurities:
if security.Symbol.Value == 'SPY':
symbol = security.Symbol
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.algo.startMinutes), self.EvaluateRisk)
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.AfterMarketOpen(symbol, self.rebalancingFrequency[1]), self.GenerateSignals)
algorithm.Schedule.On(self.rebalancingFrequency[0](symbol), algorithm.TimeRules.Every(TimeSpan.FromMinutes(60)) , self.VolatilityStopLoss)
if self.activateAlpha and security.Symbol.Value in ['SPY']:
shortATR = algorithm.ATR(security.Symbol, 10, Resolution.Hour)
longATR = algorithm.ATR(security.Symbol, 63, Resolution.Hour)
longerATR = algorithm.ATR(security.Symbol, 210, Resolution.Hour)
self.atrStocksCrossover = IndicatorExtensions.Over(shortATR, longATR)
self.atrStocksCrossoverLongTerm = IndicatorExtensions.Over(longATR, longerATR)
class Fred(PythonQuandl):
def __init__(self):
self.ValueColumnName = 'value'#region imports
from AlgorithmImports import *
#endregion
class CustomData(PythonData):
""" Add custom ticker """
def __init__(self):
# dictionary with custom tickers and urls
self.urls = {'CUSTOM_SPD': 'https://www.dropbox.com/s/fgx97uwvcdnb3ib/spd_index.csv?dl=1',
'CUSTOM_SVOL': 'https://www.dropbox.com/s/v4aq9l49572gyvr/svol_bsv_3_index.csv?dl=1'}
def GetSource(self, config, date, isLive):
# retrieve the relevant url
source = self.urls.get(config.Symbol.Value, '')
return SubscriptionDataSource(source, SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLive):
# if first character is not digit, pass
if not (line.strip() and line[0].isdigit()):
return None
data = line.split(',')
customTicker = CustomData()
customTicker.Symbol = config.Symbol
customTicker.Time = datetime.strptime(data[0], '%d/%m/%Y')
customTicker.Value = float(data[1])
return customTicker#region imports
from AlgorithmImports import *
#endregion
import json
def CheckHistory(symbol, history):
""" Check if the history dataframe is valid """
if (str(symbol) not in history.index
or history.loc[str(symbol)].get('open') is None
or history.loc[str(symbol)].get('open').isna().any()
or history.loc[str(symbol)].get('high') is None
or history.loc[str(symbol)].get('high').isna().any()
or history.loc[str(symbol)].get('low') is None
or history.loc[str(symbol)].get('low').isna().any()
or history.loc[str(symbol)].get('close') is None
or history.loc[str(symbol)].get('close').isna().any()):
return False
else:
return True
def ShouldEmitInsight(self, algorithm, symbol):
""" Check if we should emit new insights for a symbol """
tradingDay = algorithm.Time.day
generatedInsight = self.generatedInsightBySymbol.get(symbol)
if generatedInsight is not None:
if self.generatedInsightBySymbol[symbol] == tradingDay:
return False
else:
self.generatedInsightBySymbol[symbol] = tradingDay
return True
else:
self.generatedInsightBySymbol[symbol] = tradingDay
return True
def SaveToObjectStore(algorithm, fileName, dictToSave):
""" Save information into the Object Store """
jsonFile = json.dumps(dictToSave)
algorithm.ObjectStore.Save(fileName, jsonFile)
def ReadFromObjectStore(algorithm, fileName):
if not algorithm.ObjectStore.ContainsKey(fileName):
return {}
jsonObj = algorithm.ObjectStore.Read(fileName)
dictObj = json.loads(jsonObj)
return dictObj
from RotationalOptimizerAlphaCreation import RotationalOptimizerAlphaCreationModel
from RotationalOptimizer1xAlphaCreation import RotationalOptimizer1xAlphaCreationModel
from RotationalOptimizer2xAlphaCreation import RotationalOptimizer2xAlphaCreationModel
from BuyAndHoldAlphaCreation import BuyAndHoldAlphaCreationModel
from VixTermStructureAlphaCreation import VixTermStructureAlphaCreationModel
from MovingAverageAlphaCreation import MovingAverageAlphaCreationModel
from InAndOutAlphaCreation import InAndOutAlphaCreationModel
from CompositePortfolioConstruction import CompositePortfolioConstructionModel
from AlphaParameters import GetAlphaParameters
from QuantConnect.DataSource import *
from AlgorithmImports import *
from custom_data import CustomData
class MultiAlphaFrameworkAlgorithm(QCAlgorithmFramework):
def Initialize(self):
"""
# delete all data in OS
keys = [str(j).split(',')[0][1:] for _, j in enumerate(self.ObjectStore.GetEnumerator())]
for key in keys:
self.ObjectStore.Delete(key)
"""
### user-defined inputs ---------------------------------------------------------------------------
self.SetStartDate(2012, 1, 1) # set start date
#self.SetEndDate(2022, 4, 3) # set end date
self.SetCash(1000000) # set strategy cash
# set the account leverage
self.accountLeverage = 1.0
self.BackTestWarmUp = False #REMEMBER TO SWITCH THIS BACK
self.stopLossTriggered = False
self.isRiskHighest = False
self.isRiskHigh = False
self.isRisk = False
self.Risk = False
self.reversalRisk = False
self.TLTflag = False
self.TLTorGLD = False
self.allAssetsBelowMA = False
self.startMinutes = 1
self.volatilityTicker = "UVXY"
self.volTicker = "VIXM"
self.alphasDD = {} # store alphas DD
self.leverageFactors= {}
self.risklevel = 1
self.uup = 1
# PORTFOLIO ------------------------------------------------------------
# number of days for rolling window of returns
alphaPortfolioOptLookback = 147
# options are: maxReturn, minVariance, maxSharpe, maxSortino, riskParity, targetReturn,
# targetVariance, targetVarianceNegativeReturns
alphaPortfolioOptObjFunction = 'minVariance'
### ------------------------------------------------------------------------------------------------
# apply CustomSecurityInitializer
self.SetSecurityInitializer(lambda x: CustomSecurityInitializer(self, x))
# add universe ---------------------------------------------------------
tickersToAdd = []
# RO universe
parametersDictRO = GetAlphaParameters(self, 'RotationalOptimizer', 'riskyAssetsParameters')
tickerCashRO = GetAlphaParameters(self, 'RotationalOptimizer', 'tickerCash')
treasuryRatesSymbol = None
if GetAlphaParameters(self, 'RotationalOptimizer', 'activate') or 2 > 1:
for ticker in parametersDictRO.keys():
if parametersDictRO[ticker]['addTicker'][0]:
tickersToAdd.append(ticker)
tradableTicker = parametersDictRO[ticker]['addTicker'][1]
if tradableTicker not in tickersToAdd:
tickersToAdd.append(tradableTicker)
if tickerCashRO is not None and tickerCashRO not in tickersToAdd:
tickersToAdd.append(tickerCashRO)
tickersToAdd.append("TIP")
treasuryRatesSymbol = self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily).Symbol
# RO universe - 1x
parametersDictRO1x = GetAlphaParameters(self, 'RotationalOptimizer1x', 'riskyAssetsParameters')
tickerCashRO1x = GetAlphaParameters(self, 'RotationalOptimizer1x', 'tickerCash')
treasuryRatesSymbol1x = None
if GetAlphaParameters(self, 'RotationalOptimizer1x', 'activate'):
for ticker in parametersDictRO1x.keys():
if parametersDictRO1x[ticker]['addTicker'][0]:
tickersToAdd.append(ticker)
tradableTicker = parametersDictRO1x[ticker]['addTicker'][1]
if tradableTicker not in tickersToAdd:
tickersToAdd.append(tradableTicker)
if tickerCashRO1x is not None and tickerCashRO1x not in tickersToAdd:
tickersToAdd.append(tickerCashRO1x)
treasuryRatesSymbol1x = self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily).Symbol
# RO universe - 2x
parametersDictRO2x = GetAlphaParameters(self, 'RotationalOptimizer2x', 'riskyAssetsParameters')
tickerCashRO2x = GetAlphaParameters(self, 'RotationalOptimizer2x', 'tickerCash')
treasuryRatesSymbol2x = None
if GetAlphaParameters(self, 'RotationalOptimizer2x', 'activate'):
for ticker in parametersDictRO2x.keys():
if parametersDictRO2x[ticker]['addTicker'][0]:
tickersToAdd.append(ticker)
tradableTicker = parametersDictRO2x[ticker]['addTicker'][1]
if tradableTicker not in tickersToAdd:
tickersToAdd.append(tradableTicker)
if tickerCashRO2x is not None and tickerCashRO2x not in tickersToAdd:
tickersToAdd.append(tickerCashRO2x)
treasuryRatesSymbol2x = self.AddData(QuandlTreasuryRates, 'USTREASURY/YIELD', Resolution.Daily).Symbol
# BH universe - SVOL
parametersDictBuyHoldSVOL = GetAlphaParameters(self, 'BuyAndHoldSVOL', 'tickersWeights')
if GetAlphaParameters(self, 'BuyAndHoldSVOL', 'activate'):
for ticker in parametersDictBuyHoldSVOL.keys():
if ticker not in tickersToAdd:
tickersToAdd.append(ticker)
# VTS universe
parametersDictVTS = GetAlphaParameters(self, 'VixTermStructure', 'tickersWeights')
canarySymbolVTS = None
vixSymbol, vxvSymbol = None, None
if GetAlphaParameters(self, 'VixTermStructure', 'activate'):
for ticker in parametersDictVTS.keys():
if ticker not in tickersToAdd:
tickersToAdd.append(ticker)
canaryTickerVTS = GetAlphaParameters(self, 'MovingAverageSPY', 'canaryTicker')
canarySymbolVTS = self.AddEquity(canaryTickerVTS, Resolution.Minute).Symbol
vixSymbol = self.AddData(CBOE, 'VIX', Resolution.Daily).Symbol
vxvSymbol = self.AddData(CBOE, 'VIX3M', Resolution.Daily).Symbol
# MA universe - SPY
parametersDictMovingAverageSPY = GetAlphaParameters(self, 'MovingAverageSPY', 'tickersWeights')
canarySymbolMovingAverageSPY = None
if GetAlphaParameters(self, 'MovingAverageSPY', 'activate'):
for ticker in parametersDictMovingAverageSPY.keys():
if ticker not in tickersToAdd:
tickersToAdd.append(ticker)
canaryTickerMovingAverageSPY = GetAlphaParameters(self, 'MovingAverageSPY', 'canaryTicker')
canarySymbolMovingAverageSPY = self.AddEquity(canaryTickerMovingAverageSPY, Resolution.Minute).Symbol
# IO universe
assetsWeights = GetAlphaParameters(self, 'InAndOut', 'assetsWeights')
if GetAlphaParameters(self, 'InAndOut', 'activate'):
for proxy, tickerWeight in assetsWeights.items():
# replace ticker with symbol
ticker = tickerWeight[0]
assetsWeights[proxy][0] = self.AddEquity(ticker, Resolution.Minute).Symbol
if ticker not in tickersToAdd:
tickersToAdd.append(ticker)
# add all universes
for ticker in tickersToAdd:
if ticker.split('_')[0] == 'CUSTOM':
customSecurity = self.AddData(CustomData, ticker, Resolution.Daily)
customSecurity.SetFeeModel(ConstantFeeModel(0))
else:
self.AddEquity(ticker, Resolution.Minute)
# make sure we add SPY for scheduling
if 'SPY' not in tickersToAdd:
self.AddEquity('SPY', Resolution.Minute)
# get all active alpha models
alphaModels = [key for key, value in GetAlphaParameters(self).items() if value['activate']]
# add alpha models -----------------------------------------------------
self.SetAlpha(
CompositeAlphaModel(
RotationalOptimizerAlphaCreationModel(self, name='RotationalOptimizer',
activateAlpha=GetAlphaParameters(self, 'RotationalOptimizer', 'activate'),
parametersDict=parametersDictRO,
rebalancingFrequency=GetAlphaParameters(self, 'RotationalOptimizer', 'rebalancingFrequency'),
lookbackOptimization=GetAlphaParameters(self, 'RotationalOptimizer', 'lookbackOptimization'),
objFunction=GetAlphaParameters(self, 'RotationalOptimizer', 'objFunction'),
tickerCash=tickerCashRO, treasuryRatesSymbol=treasuryRatesSymbol),
RotationalOptimizer1xAlphaCreationModel(self, name='RotationalOptimizer1x',
activateAlpha=GetAlphaParameters(self, 'RotationalOptimizer1x', 'activate'),
parametersDict=parametersDictRO1x,
rebalancingFrequency=GetAlphaParameters(self, 'RotationalOptimizer1x', 'rebalancingFrequency'),
lookbackOptimization=GetAlphaParameters(self, 'RotationalOptimizer1x', 'lookbackOptimization'),
objFunction=GetAlphaParameters(self, 'RotationalOptimizer1x', 'objFunction'),
tickerCash=tickerCashRO1x, treasuryRatesSymbol=treasuryRatesSymbol1x),
RotationalOptimizer2xAlphaCreationModel(self, name='RotationalOptimizer2x',
activateAlpha=GetAlphaParameters(self, 'RotationalOptimizer2x', 'activate'),
parametersDict=parametersDictRO2x,
rebalancingFrequency=GetAlphaParameters(self, 'RotationalOptimizer2x', 'rebalancingFrequency'),
lookbackOptimization=GetAlphaParameters(self, 'RotationalOptimizer2x', 'lookbackOptimization'),
objFunction=GetAlphaParameters(self, 'RotationalOptimizer2x', 'objFunction'),
tickerCash=tickerCashRO2x, treasuryRatesSymbol=treasuryRatesSymbol2x),
BuyAndHoldAlphaCreationModel(self, name='BuyAndHoldSVOL',
activateAlpha=GetAlphaParameters(self, 'BuyAndHoldSVOL', 'activate'),
parametersDict=parametersDictBuyHoldSVOL,
rebalancingFrequency=GetAlphaParameters(self, 'BuyAndHoldSVOL', 'rebalancingFrequency')),
VixTermStructureAlphaCreationModel(self, name='VixTermStructure',
activateAlpha=GetAlphaParameters(self, 'VixTermStructure', 'activate'),
parametersDict=parametersDictVTS,
vtsSymbols=[vixSymbol, vxvSymbol],
canarySymbol=canarySymbolVTS,
periodShortSMA=GetAlphaParameters(self, 'VixTermStructure', 'periodShortSMA'),
periodLongSMA=GetAlphaParameters(self, 'VixTermStructure', 'periodLongSMA'),
contangoThresholds=GetAlphaParameters(self, 'VixTermStructure', 'contangoThresholds'),
rebalancingFrequency=GetAlphaParameters(self, 'VixTermStructure', 'rebalancingFrequency')),
MovingAverageAlphaCreationModel(self, name='MovingAverageSPY',
activateAlpha=GetAlphaParameters(self, 'MovingAverageSPY', 'activate'),
parametersDict=parametersDictMovingAverageSPY,
canarySymbol=canarySymbolMovingAverageSPY,
periodShortSMA=GetAlphaParameters(self, 'MovingAverageSPY', 'periodShortSMA'),
periodLongSMA=GetAlphaParameters(self, 'MovingAverageSPY', 'periodLongSMA'),
movingAverageThresholds=GetAlphaParameters(self, 'MovingAverageSPY', 'movingAverageThresholds'),
rebalancingFrequency=GetAlphaParameters(self, 'MovingAverageSPY', 'rebalancingFrequency')),
InAndOutAlphaCreationModel(self, name='InAndOut',
activateAlpha=GetAlphaParameters(self, 'InAndOut', 'activate'),
assetsWeights=assetsWeights,
rebalancingFrequency=GetAlphaParameters(self, 'InAndOut',
'rebalancingFrequency'))
)
)
# add portfolio model --------------------------------------------------
optimizationBounds = {key: value['optimizationBounds']
for key, value in GetAlphaParameters(self).items() if value['activate']}
self.SetPortfolioConstruction(
CompositePortfolioConstructionModel(self, alphaModels=alphaModels,
optimizationBounds=optimizationBounds,
alphaPortfolioOptLookback=alphaPortfolioOptLookback,
alphaPortfolioOptObjFunction=alphaPortfolioOptObjFunction))
# add execution model --------------------------------------------------
self.SetExecution(ImmediateExecutionModel())
# add risk model -------------------------------------------------------
self.SetRiskManagement(NullRiskManagementModel())
# warm up algorithm
self.SetWarmup(200, Resolution.Daily)
def CustomSecurityInitializer(self, security):
""" Initialize the security """
security.SetLeverage(self.accountLeverage + 1)
security.SetMarketPrice(self.GetLastKnownPrice(security))
class QuandlTreasuryRates(PythonQuandl):
def __init__(self):
super().__init__()
self.ValueColumnName = 'value'