Overall Statistics: Total Trades 842; Average Win 0.29%; Average Loss -0.19%; Compounding Annual Return 8.604%; Drawdown 12.600%; Expectancy 1.291; Net Profit 227.421%; Sharpe Ratio 1.235; Probabilistic Sharpe Ratio 78.823%; Loss Rate 9%; Win Rate 91%; Profit-Loss Ratio 1.53; Alpha 0.073; Beta -0.012; Annual Standard Deviation 0.058; Annual Variance 0.003; Information Ratio -0.069; Tracking Error 0.192; Treynor Ratio -6.097; Total Fees $846.44
import matplotlib.pyplot as plt
import matplotlib.colors as colors
import matplotlib.ticker as mticker
import random
import pandas as pd
import numpy as np
from PortfolioOptimizerClass import PortfolioOptimizer

def CalculateRelativeRiskContributions(returnsDf, weights):

    '''
    Calculate the Relative Risk Contributions for each asset given returns and weights

    Args:
        returnsDf: DataFrame of asset returns (one column per asset)
        weights: Portfolio weights, ordered like the columns of returnsDf
    Returns:
        weights * (weights' * Cov) / (weights' * Cov * weights), one value per asset
    '''

    covMatrix = returnsDf.cov()

    # numerator term: weights' * Cov (one entry per asset)
    weightedCovariance = np.dot(weights.T, covMatrix)
    # denominator: weights' * Cov * weights (portfolio variance, a scalar)
    portfolioVariance = np.dot(weights.T, np.dot(covMatrix, weights))

    return weights * weightedCovariance / portfolioVariance

def CalculateDrawdownSeries(timeSeries):

    '''
    Calculate the drawdown series from a time series of cumulative returns

    Args:
        timeSeries: pandas Series of cumulative values
    Returns:
        pandas Series named 'Drawdown' holding (value / running peak) - 1;
        rows that sit at (or set) the running peak are exactly 0
    '''

    # running maximum observed up to and including each row
    runningPeak = timeSeries.cummax()

    # only rows strictly below the running peak get a ratio; rows at the peak are
    # pinned to 1.0 so they never produce 0/0 when the peak itself is zero
    ratioToPeak = (timeSeries / runningPeak).where(timeSeries < runningPeak, 1.0)

    finalDf = (ratioToPeak - 1).rename('Drawdown')

    return finalDf

def CalculateMonthlyReturns(df):

    '''
    Calculate the monthly returns from a dataframe of daily prices

    Args:
        df: DataFrame of prices indexed by a DatetimeIndex
    Returns:
        DataFrame indexed by the first available date of each month, holding the return
        from that date to the first available date of the next month (the last month
        runs to the final row of df)
    '''

    # first available row of every (year, month) bucket; head(1) keeps the original index
    dfStartMonth = df.groupby([df.index.year, df.index.month]).head(1)

    # append the last day so the final month also gets a return
    # (pd.concat replaces DataFrame.append, which was removed in pandas 2.0)
    dfStartMonth = pd.concat([dfStartMonth, df.iloc[[-1]]])

    # calculate pct change, shift series up so each return sits on its start date, and drop nas
    dfFinal = dfStartMonth.pct_change().shift(-1).dropna()

    return dfFinal

def CalculateAnnualizedVolatilityReturn(returnsDf, weights = None, weightsType = 'series', factor = 252):

    '''
    Calculate the pair annualized volatility (std) and annualized return
    given returns, weights and annualization factor

    Args:
        returnsDf: Returns (DataFrame, or a single Series when weights is None)
        weights: Optional portfolio weights; when None the statistics are per column/series
        weightsType: 'series' (weights multiplied into the returns) or 'array' (covariance form)
        factor: Number of periods per year used for annualization
    Returns:
        Tuple (annualizedVolatility, annualizedReturn)
    Raises:
        ValueError: weights were provided and weightsType is neither 'series' nor 'array'
    '''

    scaling = np.sqrt(factor)

    # no weights: annualize the raw returns directly
    if weights is None:
        return returnsDf.std() * scaling, ((1 + returnsDf.mean())**factor) - 1

    # weights applied to the returns: build the portfolio return stream first
    if weightsType == 'series':
        portfolioReturns = returnsDf.mul(weights).sum(axis = 1)
        return portfolioReturns.std() * scaling, ((1 + portfolioReturns.mean())**factor) - 1

    # weights as a plain array: use the covariance matrix and per-asset annualized means
    if weightsType == 'array':
        annualizedCovariance = returnsDf.cov() * factor
        portfolioVolatility = np.sqrt( np.dot(weights.T, np.dot(annualizedCovariance, weights)) )
        portfolioReturn = np.sum( ((1 + returnsDf.mean())**factor - 1) * weights )
        return portfolioVolatility, portfolioReturn

    raise ValueError('weightsType must be one of: series or array')

def PlotVolWeightsRrcComparison(plotDict, title, colors, plotSize = (20, 5)):

    '''
    Plot a comparison across optimizations showing volatility, weights and rrc

    Args:
        plotDict: Dictionary keyed by row label; each value is indexable at 0, 1 and 2 and
                  yields an object with .index and .values (plotted as the columns
                  'Annualized Volatility', 'Portfolio Weights', 'Relative Risk Contribution')
        title: Figure title
        colors: Bar colors passed to each bar plot
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x, labels):
        x.set_xticklabels(labels, rotation = 45)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    columnTitles = ['Annualized Volatility', 'Portfolio Weights', 'Relative Risk Contribution']
    rowKeys = sorted(plotDict.keys())
    n = len(rowKeys)

    # one row per key and one column per plotted series; the previous n x int(n / 2) grid
    # only matched the 3 * n plots for some values of n
    # squeeze = False keeps axs two-dimensional even when there is a single row
    fig, axs = plt.subplots(n, len(columnTitles), figsize = plotSize, squeeze = False)
    fig.suptitle(title, size = 15)

    for rowIndex, key in enumerate(rowKeys):
        for columnIndex, columnTitle in enumerate(columnTitles):
            ax = axs[rowIndex, columnIndex]
            plot = plotDict[key][columnIndex]
            ax.bar(plot.index, plot.values, color = colors)
            ApplyPlotStyle(ax, plot.index)

            # the first column carries the row label
            if columnIndex == 0:
                ax.set_ylabel(key, rotation = 90, size = 13)

            # the first row carries the column headers
            if rowIndex == 0:
                ax.set_title(columnTitle, size = 13)

def PlotMonthlyWeights(weightsDict, lookbackForTitle, colors, plotSize = (20, 5)):

    '''
    Plot a stackplot of the time series of monthly optimal weights for each objective function

    Args:
        weightsDict: Dictionary keyed by objective function name with a DataFrame of weights
                     (rows on the x axis, one stacked band per column)
        lookbackForTitle: Lookback (in months) shown in the figure title
        colors: Colors passed to each stackplot
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    keysList = list(weightsDict.keys())
    n = len(keysList)

    # two columns and as many rows as needed; this matches the previous 3 x 2 grid for six
    # objective functions and no longer breaks for other counts
    nCols = 2
    nRows = max(1, -(-n // nCols)) # ceiling division

    fig, axs = plt.subplots(nRows, nCols, figsize = plotSize, squeeze = False)
    fig.suptitle('Portfolio Optimization (' + str(lookbackForTitle) + '-Month Lookback) - Monthly Weights', size = 15)

    axesList = list(axs.flat)
    for i, (ax, key) in enumerate(zip(axesList, keysList)):
        plot = weightsDict[key]
        ax.stackplot(plot.index, plot.T, colors = colors)
        ApplyPlotStyle(ax, key)
        # a single legend, anchored above the first panel, serves the whole figure
        if i == 0:
            ax.legend(list(plot.columns), ncol = len(list(plot.columns)), bbox_to_anchor = (1.1, 1.25), loc = 'upper center')

    # hide grid cells that have no objective function to show
    for ax in axesList[n:]:
        ax.set_visible(False)

def GenerateCumRetDf(returnsDf, weightsDict):

    '''
    Generate a dataframe with the time series of cumulative returns
    for each objective function and individual asset

    Args:
        returnsDf: DataFrame of periodic returns (one column per ticker)
        weightsDict: Dictionary keyed by objective function name with the weights to apply
    Returns:
        DataFrame with one column per objective function followed by one column per ticker
    '''

    cumRetList = []
    columnsList = []

    # weighted portfolio for each objective function
    for objFunction, weights in weightsDict.items():
        cumRet = returnsDf.mul(weights).sum(axis = 1).add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(objFunction)

    # each individual asset: compound its own returns
    # (previously the last portfolio series was appended again for every ticker)
    for ticker in returnsDf:
        cumRet = returnsDf[ticker].add(1).cumprod() - 1
        cumRetList.append(cumRet)
        columnsList.append(ticker)

    # keys label the columns directly, so no positional rename is needed afterwards
    cumRetDf = pd.concat(cumRetList, axis = 1, keys = columnsList)

    return cumRetDf

def GenerateDrawdownDf(returnsDf, weightsDict):

    '''
    Generate the time series of drawdown for each objective function and individual asset

    Args:
        returnsDf: DataFrame of periodic returns (one column per ticker)
        weightsDict: Dictionary keyed by objective function name with the weights to apply
    Returns:
        DataFrame with one drawdown column per objective function followed by one per ticker
    '''

    ddList = []
    columnsList = []

    # NOTE(review): the series handed to CalculateDrawdownSeries was never defined in the
    # previous version (NameError). It is rebuilt here as the growth-of-1 index
    # (1 + r).cumprod(), so that value / running peak is a well-defined ratio - confirm
    # this is the intended input.

    for objFunction, weights in weightsDict.items():
        cumRet = returnsDf.mul(weights).sum(axis = 1).add(1).cumprod()
        ddList.append(CalculateDrawdownSeries(cumRet))
        columnsList.append(objFunction)

    for ticker in returnsDf:
        cumRet = returnsDf[ticker].add(1).cumprod()
        ddList.append(CalculateDrawdownSeries(cumRet))
        columnsList.append(ticker)

    # keys label the columns directly, so no positional rename is needed afterwards
    ddDf = pd.concat(ddList, axis = 1, keys = columnsList)

    return ddDf

def PlotCumRetDD(cumRetDf, ddDf, title, colors, plotSize = (12, 8)):

    '''
    Plot the time series of cumulative returns and drawdown

    Args:
        cumRetDf: DataFrame of cumulative returns (one line per column), drawn on the top panel
        ddDf: DataFrame of drawdowns (one line per column), drawn on the bottom panel
        title: Title of the top panel
        colors: Color cycle used for the lines of both panels
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x):
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.yaxis.set_major_formatter(ticks)
        x.yaxis.grid(True)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    fig, (ax1, ax2) = plt.subplots(2, 1, figsize = plotSize)

    # top panel: cumulative returns (its x axis is hidden; the bottom panel shows the dates)
    ax1.set_prop_cycle('color', colors)
    ax1.plot(cumRetDf)
    ax1.set_title(title, fontsize = 15)
    ax1.axes.get_xaxis().set_visible(False)
    ax1.spines['bottom'].set_visible(False)
    ApplyPlotStyle(ax1)

    # bottom panel: drawdown
    ax2.set_prop_cycle('color', colors)
    ax2.plot(ddDf)
    ApplyPlotStyle(ax2)

    # 'left' is not a valid legend location for an Axes; 'best' lets matplotlib place it
    ax1.legend(list(cumRetDf.columns), loc = 'best')
    fig.tight_layout()

def PlotRiskRewardProfile(returnsDf, weightsDict, cumReturnsDf, drawdownDf, title, colors, plotSize = (12, 8)):

    '''
    Plot multiple scatter plots with different risk/reward profiles:
    1) Annualized Return/Annualized Volatility
    2) Annualized Return/Maximum Drawdown
    3) Sharpe Ratio/Maximum Drawdown
    4) Final summary table with results

    Args:
        returnsDf: DataFrame of returns (one column per ticker); annualized with factor 12
        weightsDict: Dictionary keyed by objective function name with the weights to apply
        cumReturnsDf: DataFrame of cumulative returns with a column per objective function and ticker
        drawdownDf: DataFrame of drawdowns with a column per objective function and ticker
        title: Figure title
        colors: Sequence of colors, one per objective function (indexed in weightsDict order)
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x, title, percentYAxis = True):
        x.set_title(title, fontsize = 15)
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        # the Sharpe ratio panel keeps a plain numeric y axis
        if percentYAxis:
            x.yaxis.set_major_formatter(ticks)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(bottom = 0)
        x.set_xlim(left = 0)
        x.xaxis.label.set_size(12)
        x.yaxis.label.set_size(12)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    def ApplyFormat(a, b, c, d, e):
        a = '{:.0%}'.format(a)
        b = '{:.0%}'.format(b)
        c = round(c, 1)
        d = '{:.0%}'.format(d)
        e = '{:.0%}'.format(e)

        return (a, b, c, d, e)

    def AddPoint(ax, xValue, yValue, label, color, markerSize, fontSize):
        # one labelled marker on the given axes
        ax.scatter(xValue, yValue, color = color, s = markerSize)
        ax.text(xValue, yValue, label, verticalalignment = 'top',
                horizontalalignment = 'left', fontsize = fontSize)

    fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize = plotSize)
    fig.suptitle(title, fontsize = 15)

    dataList = []

    # optimized portfolios: large colored markers
    for count, objFunction in enumerate(weightsDict):
        annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf,
                                                                                                     weights = weightsDict[objFunction],
                                                                                                     factor = 12)
        # explicit positional access: plain [-1] on a labelled Series is deprecated in pandas
        cumRet = cumReturnsDf[objFunction].iloc[-1]
        maxDD = drawdownDf[objFunction].abs().max()
        annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioVolatility

        AddPoint(ax1, annualizedPortfolioVolatility, annualizedPortfolioReturn, objFunction, colors[count], 100, 13)
        AddPoint(ax2, maxDD, annualizedPortfolioReturn, objFunction, colors[count], 100, 13)
        AddPoint(ax3, maxDD, annualizedPortfolioSharpeRatio, objFunction, colors[count], 100, 13)

        dataList.append(ApplyFormat(annualizedPortfolioReturn, annualizedPortfolioVolatility,
                                    annualizedPortfolioSharpeRatio, maxDD, cumRet))

    # individual assets: small black markers
    for ticker in returnsDf:
        data = returnsDf[ticker]
        # weights is None here, so the statistics are those of the single return series
        annualizedVolatility, annualizedReturn = CalculateAnnualizedVolatilityReturn(returnsDf = data, weightsType = 'array', factor = 12)

        cumRet = cumReturnsDf[ticker].iloc[-1]
        maxDD = drawdownDf[ticker].abs().max()
        annualizedSharpeRatio = annualizedReturn / annualizedVolatility

        AddPoint(ax1, annualizedVolatility, annualizedReturn, ticker, 'black', 10, 8)
        AddPoint(ax2, maxDD, annualizedReturn, ticker, 'black', 10, 8)
        AddPoint(ax3, maxDD, annualizedSharpeRatio, ticker, 'black', 10, 8)

        dataList.append(ApplyFormat(annualizedReturn, annualizedVolatility, annualizedSharpeRatio, maxDD, cumRet))

    ApplyPlotStyle(ax1, title = 'Return-Volatility')
    ApplyPlotStyle(ax2, title = 'Return-Max Drawdown')
    ApplyPlotStyle(ax3, title = 'Sharpe Ratio-Max Drawdown', percentYAxis = False)
    ax1.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
    ax2.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Return')
    ax3.set(xlabel = 'Maximum Drawdown', ylabel = 'Annualized Sharpe Ratio')

    # summary table drawn below the last panel; rows follow the columns of cumReturnsDf
    table = ax3.table(cellText = dataList,
                      rowLabels = list(cumReturnsDf.columns),
                      colLabels = ['Ann. Return', 'Ann. Volatility', 'Sharpe Ratio', 'Max DD', 'Cum. Return'],
                      loc = 4, cellLoc = 'center', bbox = [0.1, -1, 0.9, 0.8])

    table.auto_set_font_size(False)
    table.set_fontsize(10)

    # negative bottom margin leaves room for the table under the axes
    fig.subplots_adjust(top = 0.8, bottom = -1.2)

def PlotReturnVolatilityMultipleLookback(startMonth, returnsDf, weightsDict, title, colors, plotSize = (12, 8)):

    '''
    Plot scatter plot with Return/Volatility for multiple combinations of objective functions and lookback periods

    Args:
        startMonth: Index label of the first row of returnsDf to include (None keeps every row)
        returnsDf: DataFrame of returns (one column per ticker); annualized with factor 12
        weightsDict: Dictionary keyed by lookback (used as the point label) whose values are
                     dictionaries keyed by objective function name with the weights to apply
        title: Plot title
        colors: Sequence of colors, indexed by the position of the objective function
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.set_ylim(bottom = 0)
        x.set_xlim(left = 0)

    # NOTE(review): the previous version read an undefined name (adjustedMonthlyReturns) and
    # never used startMonth. It is assumed here that the returns were meant to be restricted
    # to rows from startMonth onward - confirm against the calling notebook.
    adjustedMonthlyReturns = returnsDf.loc[startMonth:]

    plt.figure(figsize = plotSize)

    keysList = []
    for month, objFunctions in weightsDict.items():

        for count, (objFunction, weights) in enumerate(objFunctions.items()):
            # equal weighting is plotted only once across all lookbacks
            if objFunction == 'equalWeighting' and 'equalWeighting' in keysList:
                continue

            annualizedPortfolioVolatility, annualizedPortfolioReturn = CalculateAnnualizedVolatilityReturn(returnsDf = adjustedMonthlyReturns,
                                                                                                         weights = weights, factor = 12)
            plt.scatter(annualizedPortfolioVolatility, annualizedPortfolioReturn, color = colors[count], s = 100)
            plt.text(annualizedPortfolioVolatility, annualizedPortfolioReturn, month, verticalalignment = 'top',
                     horizontalalignment = 'left', fontsize = 13)
            keysList.append(objFunction)

    ApplyPlotStyle(plt.gca(), title = title)
    # unique objective function names in order of first appearance
    uniqueKeys = sorted(set(keysList), key = keysList.index)
    # 'left' is not a valid legend location for an Axes; 'best' lets matplotlib place it
    plt.legend(uniqueKeys, loc = 'best')
    plt.show()

def GetCoordinatesEfficientFrontier(targetReturnsArray, returnsDf):

    '''
    Return the pairs of volatility/return coordinates for the efficient frontier

    Args:
        targetReturnsArray: Iterable of target returns, one mean-variance optimization each
        returnsDf: DataFrame of returns handed to the optimizer (annualized with factor 252)
    Returns:
        List [volatilities, returns] with one entry per target return
    '''

    # long-only optimizer, weights between 0 and 1
    optimizer = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

    # one (volatility, return) pair per target return, in input order
    frontierPoints = [
        CalculateAnnualizedVolatilityReturn(returnsDf,
                                            optimizer.Optimize('meanVariance', returnsDf, targetReturn),
                                            weightsType = 'array',
                                            factor = 252)
        for targetReturn in targetReturnsArray
    ]

    volatilities = [point[0] for point in frontierPoints]
    returns = [point[1] for point in frontierPoints]

    return [volatilities, returns]

def PlotMeanVarianceSpace(returnsDf, objFunctionList, simulations = 1000, plotSize = (20, 5)):

    '''
    Plot the mean-variance space (and efficient frontier)
    with simulations of portfolios, individual assets and optimal portfolios

    Args:
        returnsDf: DataFrame of returns (one column per ticker); annualized with factor 252
        objFunctionList: Objective functions to optimize and plot; 'minVariance' is looked up
                         to split the frontier, so it has to be included
        simulations: Number of random portfolios to draw
        plotSize: Figure size
    '''

    def ApplyPlotStyle(x, title):
        x.set_title(title, fontsize = 15)
        x.set(xlabel = 'Annualized Volatility (Standard Deviation)', ylabel = 'Annualized Return')
        ticks = mticker.FuncFormatter(lambda y, _: '{:.0%}'.format(y))
        x.xaxis.set_major_formatter(ticks)
        x.yaxis.set_major_formatter(ticks)
        x.xaxis.label.set_size(13)
        x.yaxis.label.set_size(13)
        x.spines['right'].set_visible(False)
        x.spines['top'].set_visible(False)
        x.xaxis.set_ticks_position('none')
        x.yaxis.set_ticks_position('none')

    # long-only optimizer, weights between 0 and 1
    optimizer = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

    # label -> [volatility, return]; optimal portfolios first, then the individual tickers
    volatilityReturnDict = {}

    for objfunction in objFunctionList:
        optimalWeights = optimizer.Optimize(objfunction, returnsDf)
        volatility, annualReturn = CalculateAnnualizedVolatilityReturn(returnsDf,
                                                                       optimalWeights,
                                                                       weightsType = 'array',
                                                                       factor = 252)
        volatilityReturnDict[objfunction] = [volatility, annualReturn]

    for ticker in returnsDf:
        volatility, annualReturn = CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf[ticker], weightsType = 'array',
                                                                       factor = 252)
        volatilityReturnDict[ticker] = [volatility, annualReturn]

    # the frontier is traced in two legs that meet at the minimum-variance return:
    # from there up to the highest return, and from the lowest return up to there
    allReturns = [pair[1] for pair in volatilityReturnDict.values()]
    minVarianceReturn = volatilityReturnDict['minVariance'][1]

    upperTargets = np.linspace(minVarianceReturn, max(allReturns), 100)
    efficientFrontierList = GetCoordinatesEfficientFrontier(upperTargets, returnsDf)

    lowerTargets = np.linspace(min(allReturns), minVarianceReturn, 100)
    beforeEfficientFrontierList = GetCoordinatesEfficientFrontier(lowerTargets, returnsDf)

    # random portfolios: weights drawn from a flat Dirichlet distribution
    assetCount = returnsDf.columns.size
    simulatedPoints = []
    for _ in range(simulations):
        randomWeights = np.random.dirichlet(np.ones(assetCount), size = 1)[0]
        simulatedPoints.append(CalculateAnnualizedVolatilityReturn(returnsDf = returnsDf, weights = randomWeights,
                                                                   weightsType = 'array', factor = 252))

    simulatedVolatilities = np.array([point[0] for point in simulatedPoints])
    simulatedReturns = np.array([point[1] for point in simulatedPoints])

    # plotting
    plt.figure(figsize = plotSize)

    # simulated portfolios, colored by return / volatility
    plt.scatter(simulatedVolatilities, simulatedReturns,
                c = simulatedReturns / simulatedVolatilities,
                marker = 'o', alpha = 0.1)

    # both legs of the frontier
    plt.plot(efficientFrontierList[0], efficientFrontierList[1], '-', c = 'red')
    plt.plot(beforeEfficientFrontierList[0], beforeEfficientFrontierList[1], '--', c = 'black')

    # optimal portfolios and individual tickers
    for objFunction, volatilityReturn in volatilityReturnDict.items():
        # the maxReturn label is nudged down slightly
        offsetText = -0.005 if objFunction == 'maxReturn' else 0
        plt.plot(volatilityReturn[0], volatilityReturn[1], '-o', c = 'black')
        plt.text(volatilityReturn[0], volatilityReturn[1] + offsetText, objFunction, verticalalignment = 'top',
                 horizontalalignment = 'left', fontsize = 12, c = 'black')

    ApplyPlotStyle(plt.gca(), title = 'Mean-Variance')
    plt.colorbar(label = 'Sharpe Ratio', pad = 0.1)
    plt.show()

from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection

import pandas as pd
from PortfolioOptimizerClass import PortfolioOptimizer

class OptimalWeightsAlphaCreationModel(AlphaModel):

    '''
    Description:
        Alpha model that, at each rebalancing time, runs a portfolio optimization over the
        tracked securities and emits one upward price insight per security carrying its optimal weight
    '''

    def __init__(self, rebalancingPeriod = Expiry.EndOfMonth, objectiveFunction = 'riskParity', lookbackOptimization = 63):

        '''
        Args:
            rebalancingPeriod: Callable taking the current time and returning the next rebalancing time
            objectiveFunction: The objective function handed to the PortfolioOptimizer
            lookbackOptimization: The number of daily bars of history requested for the optimization
        '''

        self.rebalancingPeriod = rebalancingPeriod
        self.objectiveFunction = objectiveFunction
        self.lookbackOptimization = lookbackOptimization

        self.securities = [] # list to store securities to consider
        self.portfolioValueHigh = 0 # initialize portfolioValueHigh for drawdown calculation
        self.portfolioValueHighInitialized = False # initialize portfolioValueHighInitialized for drawdown calculation

        # initialize PortfolioOptimizer class
        self.portOpt = PortfolioOptimizer(minWeight = 0, maxWeight = 1)

        self.nextRebalancingTime = None

    def Update(self, algorithm, data):

        '''
        Description:
            Plot exposure/drawdown and create the weighted insights when a rebalancing time is reached
        Args:
            algorithm: The algorithm instance
            data: The new data available (not read by this model)
        Returns:
            List of insights; empty between rebalancing times
        '''

        # initialize nextRebalancingTime
        if self.nextRebalancingTime is None:
            self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)

        insights = [] # list to store the new insights to be created

        # make sure we only send insights at rebalancing
        if algorithm.Time < self.nextRebalancingTime:
            return insights

        ### plotting ----------------------------------------------------------------------------------------
        currentTotalPortfolioValue = algorithm.Portfolio.TotalPortfolioValue # get current portfolio value

        # plot the total portfolio exposure %
        totalPortfolioExposure = (algorithm.Portfolio.TotalHoldingsValue / currentTotalPortfolioValue) * 100
        algorithm.Plot('Chart Total Portfolio Exposure %', 'Daily Portfolio Exposure %', totalPortfolioExposure)

        # plot the drawdown % from the most recent high
        if not self.portfolioValueHighInitialized:
            # this used to assign a misspelled attribute (portfolioHigh), leaving the high at 0
            self.portfolioValueHigh = currentTotalPortfolioValue # set initial portfolio value
            self.portfolioValueHighInitialized = True

        # update trailing high value of the portfolio
        if self.portfolioValueHigh < currentTotalPortfolioValue:
            self.portfolioValueHigh = currentTotalPortfolioValue

        currentDrawdownPercent = ((float(currentTotalPortfolioValue) / float(self.portfolioValueHigh)) - 1.0) * 100
        algorithm.Plot('Chart Drawdown %', 'Drawdown %', currentDrawdownPercent)

        ### generate insights ------------------------------------------------------------------------------

        # nothing to optimize yet: leave nextRebalancingTime untouched so the next Update tries again
        if not self.securities:
            return insights

        # calculate optimal weights
        symbols = [security.Symbol for security in self.securities]
        weights = self.CalculateOptimalWeights(algorithm, symbols, self.objectiveFunction, self.lookbackOptimization)

        # insight expiry time and direction
        insightExpiry = Expiry.EndOfDay(algorithm.Time)
        insightDirection = InsightDirection.Up # insight direction

        # loop through securities and generate insights
        for security in self.securities:
            weight = weights[str(security.Symbol.Value)]

            # append the insights list with the prediction and weights for each symbol
            insights.append(Insight.Price(security.Symbol, insightExpiry, insightDirection,
                                          None, None, None, weight))

            algorithm.Plot('Chart Optimal Weights %', security.Symbol.Value, float(weight))

        self.nextRebalancingTime = self.rebalancingPeriod(algorithm.Time)

        return insights

    def CalculateOptimalWeights(self, algorithm, symbols, objectiveFunction, lookbackOptimization):

        '''
        Description:
            Calculate the optimal weights for each symbol provided
        Args:
            algorithm: The algorithm instance used to request history and to log
            symbols: The symbols in current universe
            objectiveFunction: The objective function for optimization
            lookbackOptimization: The number of days to look back to calculate optimal weights
        Returns:
            pandas Series named 'weights', indexed by ticker
        '''

        # get historical close prices
        historyClosePrices = algorithm.History(symbols, lookbackOptimization, Resolution.Daily)['close'].unstack(level = 0)

        # calculate daily returns
        returnsDf = historyClosePrices.pct_change().dropna()
        # rename the columns in the dataframe in order to have tickers and not symbol strings
        returnsDf.rename(columns = {column: algorithm.ActiveSecurities[column].Symbol.Value for column in returnsDf.columns},
                         inplace = True)

        # calculate optimal weights
        weights = self.portOpt.Optimize(objectiveFunction, returnsDf)
        # convert the weights to a pandas Series
        weights = pd.Series(weights, index = returnsDf.columns, name = 'weights')

        algorithm.Log(weights)

        return weights

    def OnSecuritiesChanged(self, algorithm, changes):

        '''
        Description:
            Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''

        # NOTE(review): only removals were handled before, so self.securities could never be
        # populated and no insight was ever emitted; additions are tracked here - confirm
        # against the intended universe handling
        for added in changes.AddedSecurities:
            if added not in self.securities:
                self.securities.append(added)

        # remove securities
        for removed in changes.RemovedSecurities:
            if removed in self.securities:
                self.securities.remove(removed)
import pandas as pd
import numpy as np
from scipy.optimize import minimize

class PortfolioOptimizer:

    '''
    Description:
        Custom optimizer that finds the weight of each asset optimizing a chosen objective function
    Details:
        Available objectives:
            - Equal Weighting
            - Maximize Portfolio Return
            - Minimize Portfolio Standard Deviation
            - Mean-Variance (minimize Standard Deviation given a target return)
            - Maximize Portfolio Sharpe Ratio
            - Maximize Portfolio Sortino Ratio
            - Risk Parity Portfolio
        Constraints applied:
            - Every weight stays between the given boundaries
            - Weights sum to 1
    '''

    def __init__(self,
                 minWeight = 0,
                 maxWeight = 1):

        '''
        Description:
            Store the weight boundaries used by every optimization
        Args:
            minWeight(float): The lower bound on portfolio weights
            maxWeight(float): The upper bound on portfolio weights
        '''

        self.minWeight = minWeight
        self.maxWeight = maxWeight

    def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):

        '''
        Description:
            Run the portfolio optimization on a set of daily returns
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily arithmetic returns
            targetReturn: Annualized return the meanVariance portfolio has to match
                          (defaults to the return of the equally weighted portfolio)
        Returns:
            Array of double with the portfolio weights (size: K x 1)
        '''

        # starting point: every asset gets the same weight
        assetCount = dailyReturnsDf.columns.size # K x 1
        self.initWeights = np.array(assetCount * [1. / assetCount])

        # sample covariance of all returns, and of the negative returns only (for sortino ratio)
        covariance = dailyReturnsDf.cov()
        covarianceNegativeReturns = dailyReturnsDf[dailyReturnsDf < 0].cov()

        if objFunction == 'equalWeighting':
            return self.initWeights

        # one (lower, upper) pair per asset, and the fully-invested constraint
        bounds = tuple((self.minWeight, self.maxWeight) for _ in range(assetCount))
        constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]

        if objFunction == 'meanVariance':
            # without a target return, aim for the return of the equally weighted portfolio
            if targetReturn is None:
                targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)

            def ReturnGap(weights):
                return self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn

            constraints.append({'type': 'eq', 'fun': ReturnGap})

        def Objective(weights):
            return self.ObjectiveFunction(objFunction, dailyReturnsDf,
                                          covariance, covarianceNegativeReturns,
                                          weights)

        opt = minimize(Objective,
                       x0 = self.initWeights,
                       bounds = bounds,
                       constraints = constraints,
                       method = 'SLSQP')

        return opt['x']

    def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):

        '''
        Description:
            Evaluate the chosen objective function for a set of weights
            (quantities to maximize are returned with a negative sign, since the solver minimizes)
        Args:
            objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance,
                         maxSharpe, maxSortino, riskParity)
            dailyReturnsDf: DataFrame of historical daily returns
            covariance: Sample covariance
            covarianceNegativeReturns: Sample covariance matrix of only negative returns
            weights: Portfolio weights
        '''

        if objFunction == 'maxReturn':
            return -self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)

        # meanVariance minimizes the same quantity; its target return is enforced as a constraint
        if objFunction == 'minVariance' or objFunction == 'meanVariance':
            return self.CalculateAnnualizedPortfolioStd(covariance, weights)

        if objFunction == 'maxSharpe':
            return -self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)

        if objFunction == 'maxSortino':
            return -self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)

        if objFunction == 'riskParity':
            return self.CalculateRiskParityFunction(covariance, weights)

        raise ValueError('PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
                         + ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')

    def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):

        ''' Weighted sum of each asset's mean daily return compounded over 252 days '''

        annualizedAssetReturns = (1 + dailyReturnsDf.mean())**252 - 1

        return np.sum( annualizedAssetReturns * weights )

    def CalculateAnnualizedPortfolioStd(self, covariance, weights):

        ''' Portfolio standard deviation from the covariance scaled by 252; raises ValueError when it is zero '''

        annualizedCovariance = covariance * 252
        annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(annualizedCovariance, weights)) )

        if annualizedPortfolioStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')

        return annualizedPortfolioStd

    def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):

        ''' Same as the portfolio standard deviation, using the covariance of negative returns only '''

        annualizedCovariance = covarianceNegativeReturns * 252
        annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(annualizedCovariance, weights)) )

        if annualizedPortfolioNegativeStd == 0:
            raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')

        return annualizedPortfolioNegativeStd

    def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):

        ''' Annualized portfolio return divided by annualized portfolio standard deviation '''

        numerator = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        denominator = self.CalculateAnnualizedPortfolioStd(covariance, weights)

        return numerator / denominator

    def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):

        ''' Annualized portfolio return divided by the annualized standard deviation of negative returns '''

        numerator = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
        denominator = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)

        return numerator / denominator

    def CalculateRiskParityFunction(self, covariance, weights):

        ''' Spinu formulation for risk parity portfolio '''

        # the risk budget is the equal-weight vector stored by Optimize
        assetsRiskBudget = self.initWeights

        # weights rescaled by the annualized portfolio volatility
        x = weights / self.CalculateAnnualizedPortfolioStd(covariance, weights)

        quadraticTerm = np.dot(x.T, np.dot(covariance, x)) / 2
        logBarrierTerm = np.dot(assetsRiskBudget.T, np.log(x))

        return quadraticTerm - logBarrierTerm
from OptimalWeightsAlphaCreation import OptimalWeightsAlphaCreationModel

class PortfolioOptimizationSystem(QCAlgorithmFramework):

    '''
    Description:
        Framework algorithm that holds a fixed list of tickers and rebalances them to the
        weights produced by OptimalWeightsAlphaCreationModel
    '''

    def Initialize(self):

        ''' Configure dates, cash, charts and the framework models (universe, alpha, portfolio, execution, risk) '''

        ### user-defined inputs ---------------------------------------------------------------------------------------------------

        self.SetStartDate(2006, 1, 1)   # Set Start Date
        #self.SetEndDate(2020, 5, 1)    # Set End Date
        self.SetCash(100000)            # Set Strategy Cash

        # UNIVERSE -------------------------------------------------------------

        # select tickers
        tickers = ['IEF', 'TLT', 'GLD', 'SPY', 'QQQ']

        # ALPHA ----------------------------------------------------------------

        # select the objective function to optimize the portfolio weights
        # options are: equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity
        objectiveFunction = 'riskParity'

        # select number of lookback days for optimization
        lookbackOptimization = 63

        # select the logic for rebalancing period
        # date rules (for the first trading day of period): Expiry.EndOfDay, Expiry.EndOfWeek, Expiry.EndOfMonth,
        #                                                   Expiry.EndOfQuarter, Expiry.EndOfYear
        rebalancingPeriod = Expiry.EndOfMonth

        ### plotting ----------------------------------------------------------------------------------------------------------------

        # chart and series names match the ones the alpha model plots to

        # let's plot the series of total portfolio exposure %
        portfolioExposurePlot = Chart('Chart Total Portfolio Exposure %')
        portfolioExposurePlot.AddSeries(Series('Daily Portfolio Exposure %', SeriesType.Line, ''))
        self.AddChart(portfolioExposurePlot)

        # let's plot the series of drawdown % from the most recent high
        drawdownPlot = Chart('Chart Drawdown %')
        drawdownPlot.AddSeries(Series('Drawdown %', SeriesType.Line, ''))
        self.AddChart(drawdownPlot)

        # let's plot the series of optimal weights (one series per ticker)
        # NOTE(review): this loop had no body and the charts were never registered;
        # the series/AddChart calls are restored by analogy with the exposure chart - confirm
        optWeightsPlot = Chart('Chart Optimal Weights %')
        for ticker in tickers:
            optWeightsPlot.AddSeries(Series(ticker, SeriesType.Line, ''))
        self.AddChart(optWeightsPlot)

        ### select modules ------------------------------------------------------------------------------------------------------------

        # set the brokerage model for slippage and fees
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())

        # set requested data resolution
        self.UniverseSettings.Resolution = Resolution.Daily

        # Universe -------------------------------------------------------------
        # create one US equity symbol per ticker for the manual universe
        symbols = [Symbol.Create(ticker, SecurityType.Equity, Market.USA) for ticker in tickers]
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))

        # Alpha ---------------------------------------------------------------
        self.SetAlpha(OptimalWeightsAlphaCreationModel(rebalancingPeriod = rebalancingPeriod,
                                                       objectiveFunction = objectiveFunction,
                                                       lookbackOptimization = lookbackOptimization))

        # Portfolio ------------------------------------------------------------
        pcm = InsightWeightingPortfolioConstructionModel(lambda time: rebalancingPeriod(time))
        pcm.RebalanceOnInsightChanges = False # disable rebalancing on insights changes (new/expired insights)
        pcm.RebalanceOnSecurityChanges = False # disable rebalancing on universe changes
        self.SetPortfolioConstruction(pcm)

        # Execution ------------------------------------------------------------
        self.SetExecution(ImmediateExecutionModel())

        # Risk -----------------------------------------------------------------
        self.SetRiskManagement(NullRiskManagementModel())