| Overall Statistics |
|
Total Trades 1270 Average Win 0.01% Average Loss -0.01% Compounding Annual Return 14.112% Drawdown 2.800% Expectancy -0.577 Net Profit 0.253% Sharpe Ratio 0.986 Probabilistic Sharpe Ratio 49.444% Loss Rate 78% Win Rate 22% Profit-Loss Ratio 0.89 Alpha 0.714 Beta 1.678 Annual Standard Deviation 0.204 Annual Variance 0.041 Information Ratio 4.304 Tracking Error 0.118 Treynor Ratio 0.12 Total Fees $1278.46 Estimated Strategy Capacity $1400000.00 Lowest Capacity Asset WWFE RP03K24T1IED |
from Alphas.EmaCrossAlphaModel import EmaCrossAlphaModel
from Alphas.MacdAlphaModel import MacdAlphaModel
from Alphas.RsiAlphaModel import RsiAlphaModel
from Execution.VolumeWeightedAveragePriceExecutionModel import VolumeWeightedAveragePriceExecutionModel
from Portfolio.BlackLittermanOptimizationPortfolioConstructionModel import BlackLittermanOptimizationPortfolioConstructionModel
from Risk.MaximumDrawdownPercentPerSecurity import MaximumDrawdownPercentPerSecurity
#For BT 1 ONLY
from BuyAndHoldAlphaCreation import BuyAndHoldAlphaCreationModel
from LiquidGrowthUniverse import SMIDGrowth
from ManualInputs import ManualInputs
'''
Purpose of this algo is to learn how to consolidate alphas
Thesis: using a combination of EMACross, MACD and RSI to generate alpha
Optimized using the Black-Litterman optimization model
First BT is using a buy and hold model to test the isolated optimizer.
5.18.21: Look into why the algo is not trading; there's no trading logic.
This logic should be located in the optimizer.
There is a disconnect between insights from alpha and the portfolio constructor.
'''
class MeasuredRedAnt(QCAlgorithm):
    '''Framework algorithm wiring a universe, an alpha, execution, portfolio construction and risk models together.'''

    def Initialize(self):
        '''Configure the backtest window, cash, framework models and custom charts.'''
        self.SetStartDate(2020, 11, 17)  # Set Start Date
        self.SetEndDate(2020, 11, 23)  # Set End Date
        self.SetCash(100000)  # Set Strategy Cash
        # self.AddEquity("SPY", Resolution.Minute)

        # Universe: minute resolution, no fill-forward, SMIDGrowth selection model
        self.UniverseSettings.Resolution = Resolution.Minute
        self.UniverseSettings.FillForward = False
        self.AddUniverseSelection(SMIDGrowth())
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw

        # Alpha: only the buy-and-hold model is active; the others are kept for later backtests
        # self.AddAlpha(EmaCrossAlphaModel(50, 200, Resolution.Minute))
        # self.AddAlpha(MacdAlphaModel(12, 26, 9, MovingAverageType.Simple, Resolution.Daily))
        # self.AddAlpha(RsiAlphaModel(60, Resolution.Minute))
        self.AddAlpha(BuyAndHoldAlphaCreationModel())

        self.SetExecution(VolumeWeightedAveragePriceExecutionModel())
        self.SetPortfolioConstruction(BlackLittermanOptimizationPortfolioConstructionModel())
        self.SetRiskManagement(MaximumDrawdownPercentPerSecurity(0.01))
        self.SetWarmup(50)

        ### Charts --------------------------------------------------------------------------------------------------
        # (chart name, ((series name, unit), ...)) in registration order:
        # portfolio exposure %, number of open longs/shorts, drawdown % from the most recent high
        chartLayout = (
            ('Chart Total Portfolio Exposure %', (('Daily Portfolio Exposure %', ''),)),
            ('Chart Number Of Longs/Shorts', (('Daily N Longs', ''), ('Daily N Shorts', ''))),
            ('Chart Drawdown %', (('Drawdown %', '%'),)),
        )
        for chartName, seriesSpecs in chartLayout:
            chart = Chart(chartName)
            for seriesName, unit in seriesSpecs:
                chart.AddSeries(Series(seriesName, SeriesType.Line, unit))
            self.AddChart(chart)

    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
        Arguments:
            data: Slice object keyed by symbol containing the stock data
        '''
        # Intentionally empty: no manual trading logic lives here.
        # if not self.Portfolio.Invested:
        #     self.SetHoldings("SPY", 1)

    def security_initializer(self, security):
        '''
        Description:
            Switch the security to adjusted prices and, for options, seed the market price
            with the last known price.
        Args:
            security: Security which characteristics we want to change
        '''
        # NOTE(review): no call registering this initializer is visible in this class - confirm it is used.
        security.SetDataNormalizationMode(DataNormalizationMode.Adjusted)
        #security.SetMarketPrice = self.GetLastKnownPrice(security)
        isOption = security.Type == SecurityType.Option
        if isOption:
            security.SetMarketPrice(self.GetLastKnownPrice(security))

    def OnOrderEvent(self, OrderEvent):
        '''Debug-log every order event that carries a non-zero fill quantity.'''
        if OrderEvent.FillQuantity != 0:
            fetched = self.Transactions.GetOrderById(OrderEvent.OrderId)
            details = (str(fetched.Type),
                       str(OrderEvent.Symbol),
                       str(OrderEvent.FillQuantity),
                       str(OrderEvent.Direction))
            self.Debug("{} was filled. Symbol: {}. Quantity: {}. Direction: {}".format(*details))


from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import AlphaModel, Insight, InsightType, InsightDirection
class BuyAndHoldAlphaCreationModel(AlphaModel):
    '''
    Description:
        Alpha model that emits an InsightDirection.Up price insight for every tracked security
        on each Update call.
    Details:
        - An Insight is a prediction (Up, Down or Flat) that is valid until it expires
        - The expiry used here is a quarter of the time span of the configured resolution
        - Re-emitting the Up insight on every update is what keeps the position open,
          i.e. the buy-and-hold behaviour
    '''

    def __init__(self, resolution = Resolution.Daily):
        # insight duration: 0.25 x the resolution's time span
        self.insightExpiry = Time.Multiply(Extensions.ToTimeSpan(resolution), 0.25)
        # every insight produced by this model points up
        self.insightDirection = InsightDirection.Up
        # securities currently tracked by the model
        self.securities = []

    def Update(self, algorithm, data):
        '''Return one Up insight per tracked security that has data in the slice or an open position.'''
        insights = []
        for security in self.securities:
            symbol = security.Symbol
            # without fresh data we still refresh the insight when invested, since no new order is needed
            hasDataOrPosition = data.ContainsKey(symbol) or algorithm.Portfolio[symbol].Invested
            if not hasDataOrPosition:
                algorithm.Log('(Alpha) excluding this security due to missing data: ' + str(symbol.Value))
                continue
            insights.append(Insight.Price(symbol, self.insightExpiry, self.insightDirection, .1, None, 'BuyAndHoldAlphaCreationModel', None))
        return insights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''
        Description:
            Keep the tracked-securities list in sync with universe additions and removals
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm
        '''
        # start tracking the additions
        self.securities.extend(changes.AddedSecurities)
        # stop tracking the removals that we know about
        for dropped in changes.RemovedSecurities:
            if dropped in self.securities:
                self.securities.remove(dropped)


from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Indicators")
AddReference("QuantConnect.Algorithm.Framework")
from datetime import timedelta
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from HelperFunctions import GetFundamentalDataDict, MakeCalculations, GetLongShortLists, UpdatePlots
#from RiskManagement import *
from ManualInputs import ManualInputs
import pandas as pd
import numpy as np
# Define the Universe Model Class
# Define the Universe Model Class
class SMIDGrowth(FundamentalUniverseSelectionModel):
    '''Coarse/fine universe selection that screens on market cap and valuation ratios, then ranks by factors.'''

    def __init__(self,
                 benchmark = 'SPY',
                 nStocks = 500,
                 lookback = 252,
                 maxNumberOfPositions = 20,
                 rebalancingFunc = Expiry.EndOfMonth,
                 filterFineData = True,
                 universeSettings = None,
                 securityInitializer = None):
        '''Store the selection parameters and the state read by the plotting helpers.
        Args:
            benchmark: Series name used when plotting the simulated benchmark value
            nStocks: Cap on the fine selection; the coarse selection keeps twice this many
            lookback: Number of bars of history requested for the factor calculations
            maxNumberOfPositions: Total positions; half go to each of the long and short lists
            rebalancingFunc: Stored only; not read elsewhere in this class
            filterFineData, universeSettings, securityInitializer: Forwarded to the base class'''
        self.benchmark = benchmark
        self.nStocks = nStocks
        self.lookback = lookback
        self.maxNumberOfPositions = maxNumberOfPositions
        self.rebalancingFunc = rebalancingFunc
        self.nextRebalance = None
        self.initBenchmarkPrice = 0
        # state for the drawdown plot
        self.portfolioValueHigh = 0
        self.portfolioValueHighInitialized = False

        super().__init__(filterFineData, universeSettings, securityInitializer)

        # Declare Variables
        tickers = ManualInputs.m_tickers
        self.averages = { }
        self.hist = RollingWindow[float](390*22)
        self.contract = None
        #self.SetSecurityInitializer(self.security_initializer)
        self.buys = []
        self.sells = []
        self.contract_by_equity = {}
        # for x in ManualInputs.m_tickers:
        #     self.AddEquity(x, Resolution.Daily)

    def SelectCoarse(self, algorithm, coarse):
        '''Refresh the plots, then return the symbols of the top (nStocks * 2) names by dollar volume
        among those that have fundamental data and a price above 10.'''
        # update plots -----------------------------------------------------------------------------------------------
        UpdatePlots(self, algorithm)

        # Monthly gating is currently disabled:
        # if self.lastMonth == algorithm.Time.month:
        #     return Universe.Unchanged
        # self.lastMonth = algorithm.Time.month

        liquid = [c for c in coarse if c.HasFundamentalData and c.Price > 10]
        liquid.sort(key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in liquid[:(self.nStocks * 2)]]

    def SelectFine(self, algorithm, fine):
        '''Screen the fine data, rank the survivors and return the combined long and short symbol lists.'''

        def passesScreen(f):
            # SMID cap growth criteria; manually listed tickers bypass the screen entirely
            meetsCriteria = (5e6 < f.MarketCap < 1e10
                             and f.ValuationRatios.PERatio > 5
                             and f.ValuationRatios.PBRatio > 5
                             and f.Symbol not in ManualInputs.restrictedList)
            return meetsCriteria or f.Symbol in ManualInputs.m_tickers

        screened = [f for f in fine if passesScreen(f)]
        sortedByRatios = sorted(screened, key=lambda f: f.ValuationRatios.PBRatio, reverse=True)[:self.nStocks]

        # generate dictionary with factors -----------------------------------------------------------------------------
        fundamentalDataBySymbolDict = GetFundamentalDataDict(algorithm, sortedByRatios, 'universe')

        # make calculations to create long/short lists -----------------------------------------------------------------
        fineSymbols = list(fundamentalDataBySymbolDict.keys())
        calculations = MakeCalculations(algorithm, fineSymbols, self.lookback, Resolution.Daily, fundamentalDataBySymbolDict)

        # get long/short lists of symbols
        longs, shorts = GetLongShortLists(self, algorithm, calculations, 'universe')
        return longs + shorts

    def OnSecuritiesChanged(self, changes):
        '''Log universe changes, buy a put for each added security and record removals.'''
        # NOTE(review): self.Log and self.BuyPut's dependencies are not defined in this class;
        # presumably expected from a base class or the algorithm - TODO confirm this handler is ever invoked.
        self.Log(f'New Securities Added: {[security.Symbol.Value for security in changes.AddedSecurities]}')
        self.Log(f'Securities Removed{[security.Symbol.Value for security in changes.RemovedSecurities]}')

        for added in changes.AddedSecurities:
            self.buys.append(added)
            self.contract_by_equity[added.Symbol] = self.BuyPut(added.Symbol)

        for dropped in changes.RemovedSecurities:
            self.sells.append(dropped)

    def BuyPut(self, symbol):
        '''Pick a put on `symbol` expiring within 0-40 days, subscribe to it and market-buy one contract.
        Returns the chosen contract, or None when no suitable contract is listed.'''
        # NOTE(review): OptionChainProvider, Time, Securities, AddOptionContract, MarketOrder and Debug
        # are read from self but not defined in this class - TODO confirm where they come from.
        contracts = self.OptionChainProvider.GetOptionContractList(symbol, self.Time)
        self.Debug(f"BuyPut: {symbol} {len(contracts)}")
        #contracts = self.OptionChainProvider.GetOptionChains(self.Symbol, self.Time.date())
        if len(contracts) == 0:
            return

        min_expiry = 0
        max_expiry = 40
        inWindow = [c for c in contracts
                    if min_expiry <= (c.ID.Date.date() - self.Time.date()).days <= max_expiry]
        puts = [c for c in inWindow if c.ID.OptionRight == 1]
        if len(puts) == 0:
            return

        price = self.Securities[symbol].Price
        # order by distance of strike from the current price first; the stable sort by expiry
        # (latest first) then leaves the closest-to-the-money contract of the latest expiry in front
        byMoneyness = sorted(puts, key = lambda c: abs(price - c.ID.StrikePrice))
        byExpiry = sorted(byMoneyness, key = lambda c: c.ID.Date, reverse=True)
        self.contract = byExpiry[0]

        self.AddOptionContract(self.contract, Resolution.Minute)
        self.MarketOrder(self.contract, 1)
        return self.contract


import pandas as pd
from scipy.stats import zscore
from classSymbolData import SymbolData
def MakeCalculations(algorithm, symbols, lookback, resolution, fundamentalDataBySymbolDict):
    '''
    Description:
        Make required calculations using historical data for each symbol
    Args:
        algorithm: Algorithm instance used to request history and to log skipped symbols
        symbols: The symbols to make calculations for
        lookback: Lookback period for historical data
        resolution: Resolution for historical data
        fundamentalDataBySymbolDict: Dictionary of symbols containing factors and the direction of the factor (for sorting)
    Return:
        calculations: Dictionary containing the calculations per symbol (empty when no symbols are given)
    '''
    # store calculations
    calculations = {}
    if len(symbols) == 0:
        return calculations

    # get historical prices for new symbols
    history = GetHistory(algorithm, symbols,
                         lookbackPeriod = lookback,
                         resolution = resolution)

    for symbol in symbols:
        # Fetch the close series defensively: .get() yields None when the column is missing,
        # so the presence check has to come before any length/NaN inspection.
        closes = history.loc[symbol].get('close') if symbol in history.index else None

        # skip symbols with missing, too short or incomplete history
        if closes is None or len(closes) < lookback or closes.isna().any():
            algorithm.Log('no history found for: ' + str(symbol.Value))
            continue

        symbolData = SymbolData(symbol)
        try:
            symbolData.CalculateFactors(history, fundamentalDataBySymbolDict)
        except Exception as e:
            # a symbol whose factors cannot be computed is left out of the result
            algorithm.Log('removing from calculations due to ' + str(e))
            continue

        # only register the symbol once its factors were computed successfully
        calculations[symbol] = symbolData

    return calculations
def GetFundamentalDataDict(algorithm, securitiesData, module = 'universe'):
    '''
    Build {symbol: {factor value: direction}} ready for sorting.
    Args:
        algorithm: Only read in 'alpha' mode, to look fundamentals up in ActiveSecurities
        securitiesData: Iterable of objects exposing .Symbol (and, in 'universe' mode, .ValuationRatios)
        module: 'universe' reads the factors from each item, 'alpha' from the active security
    Raises:
        ValueError: if module is neither 'universe' nor 'alpha'
    '''
    # NOTE(review): the inner dict is keyed by the factor VALUES, so two factors that happen
    # to share a value collapse into one entry - confirm this is acceptable for consumers.
    factorsBySymbol = {}

    for item in securitiesData:
        if module == 'universe':
            source = item
        elif module == 'alpha':
            if not item.Symbol in algorithm.ActiveSecurities.Keys:
                continue
            source = algorithm.ActiveSecurities[item.Symbol].Fundamentals
        else:
            raise ValueError('module argument must be either universe or alpha')

        # direction: 1 for sorting descending and -1 for sorting ascending
        # (other candidate factors - book value per share, total equity, operation margin,
        #  ROE, total assets growth, normalized PE, debt/equity, market cap - are currently disabled)
        factors = {
            source.ValuationRatios.PBRatio: -1,
            source.ValuationRatios.FCFRatio: -1,
            source.ValuationRatios.PEGRatio: -1,
        }

        # check validity of data: a missing factor disqualifies the symbol
        if None in list(factors.keys()):
            factorsBySymbol.pop(item.Symbol, None)
        else:
            factorsBySymbol[item.Symbol] = factors

    return factorsBySymbol
def GetLongShortLists(self, algorithm, calculations, module = 'universe'):
    '''
    Create lists of long/short stocks from the z-scored factors.
    Args:
        self: Object exposing maxNumberOfPositions (half of it is used for each side)
        algorithm: Algorithm instance; only LiveMode and Log are used, and only when module is 'alpha'
        calculations: Dictionary of symbol -> object exposing factorsList (None entries are ignored)
        module: 'alpha' enables logging of the selection in live mode
    Return:
        (longs, shorts): two lists of symbols; both empty when there is nothing to rank
        or when fewer than two positions are allowed
    '''
    # get factors (read the factorsList property once per symbol)
    factorsDict = {}
    for symbol, symbolData in calculations.items():
        factors = symbolData.factorsList
        if factors is not None:
            factorsDict[symbol] = factors

    positionsEachSide = int(self.maxNumberOfPositions / 2)

    # Nothing to rank, or no room on either side. Without this guard an empty frame fails on the
    # 'Factor_1' lookup below and a zero-sized side makes the [-0:] slice select every row.
    if not factorsDict or positionsEachSide <= 0:
        return [], []

    factorsDf = pd.DataFrame.from_dict(factorsDict, orient = 'index')

    # normalize factor
    normFactorsDf = factorsDf.apply(zscore)
    normFactorsDf.columns = ['Factor_' + str(x + 1) for x in normFactorsDf.columns]

    # combine the first two factors using equal weighting
    #normFactorsDf['combinedFactor'] = normFactorsDf.sum(axis = 1)
    normFactorsDf['combinedFactor'] = normFactorsDf['Factor_1'] * 1 + normFactorsDf['Factor_2'] * 1

    # sort descending
    sortedNormFactorsDf = normFactorsDf.sort_values(by = 'combinedFactor', ascending = False)

    # create long/short lists; a symbol never appears on both sides
    longs = list(sortedNormFactorsDf[:positionsEachSide].index)
    shorts = [x for x in sortedNormFactorsDf[-positionsEachSide:].index if x not in longs]

    if module == 'alpha' and algorithm.LiveMode:
        algorithm.Log({'longs': {x.Value: factorsDict[x] for x in longs}, 'shorts': {x.Value: factorsDict[x] for x in shorts}})

    return longs, shorts
def GetHistory(algorithm, symbols, lookbackPeriod, resolution, batchsize = 50):
    '''
    Pull historical data in batches.
    Args:
        algorithm: Object exposing History(symbols, lookbackPeriod, resolution)
        symbols: List of symbols to request history for
        lookbackPeriod: Number of bars to request
        resolution: Resolution of the requested bars
        batchsize: Maximum number of symbols per History call (default 50)
    Return:
        The History result itself when a single call suffices, otherwise the
        batch results concatenated in request order
    Raises:
        ValueError: if batchsize is smaller than 1
    '''
    if batchsize < 1:
        raise ValueError('batchsize must be at least 1')

    total = len(symbols)
    if total <= batchsize:
        return algorithm.History(symbols, lookbackPeriod, resolution)

    # Stop at `total` (exclusive) so that no empty trailing batch is requested when
    # the number of symbols is an exact multiple of the batch size.
    frames = [algorithm.History(symbols[start:start + batchsize], lookbackPeriod, resolution)
              for start in range(0, total, batchsize)]

    # concatenate once instead of growing the frame batch by batch
    return pd.concat(frames)
def UpdateBenchmarkValue(self, algorithm):
    '''
    Simulate buying and holding the benchmark with the starting cash.
    The first call (initBenchmarkPrice still 0) records the starting cash and benchmark price;
    later calls scale the starting cash by the benchmark's price change and store it in benchmarkValue.
    '''
    if self.initBenchmarkPrice != 0:
        priceNow = algorithm.Benchmark.Evaluate(algorithm.Time)
        growth = priceNow / self.initBenchmarkPrice
        self.benchmarkValue = growth * self.initBenchmarkCash
        return

    # first observation: remember the reference cash and price
    startingCash = algorithm.Portfolio.Cash
    self.initBenchmarkCash = startingCash
    self.initBenchmarkPrice = algorithm.Benchmark.Evaluate(algorithm.Time)
    self.benchmarkValue = startingCash
def UpdatePlots(self, algorithm):
    '''
    Update the benchmark, portfolio exposure, long/short count and drawdown plots.
    Args:
        self: Object carrying the plotting state (benchmark, portfolioValueHigh,
              portfolioValueHighInitialized and the benchmark fields)
        algorithm: Algorithm instance providing Portfolio and Plot
    '''
    # simulate buy and hold the benchmark and plot its daily value --------------
    UpdateBenchmarkValue(self, algorithm)
    algorithm.Plot('Strategy Equity', self.benchmark, self.benchmarkValue)

    # get current portfolio value and a snapshot of the holdings
    currentTotalPortfolioValue = algorithm.Portfolio.TotalPortfolioValue
    holdings = list(algorithm.Portfolio.Values)

    # plot the daily total portfolio exposure % --------------------------------
    longHoldings = sum(x.HoldingsValue for x in holdings if x.IsLong)
    shortHoldings = sum(x.HoldingsValue for x in holdings if x.IsShort)
    totalHoldings = longHoldings + shortHoldings
    # a zero portfolio value would otherwise raise ZeroDivisionError
    if currentTotalPortfolioValue != 0:
        totalPortfolioExposure = (totalHoldings / currentTotalPortfolioValue) * 100
    else:
        totalPortfolioExposure = 0
    algorithm.Plot('Chart Total Portfolio Exposure %', 'Daily Portfolio Exposure %', totalPortfolioExposure)

    # plot the daily number of longs and shorts --------------------------------
    nLongs = sum(x.IsLong for x in holdings)
    nShorts = sum(x.IsShort for x in holdings)
    algorithm.Plot('Chart Number Of Longs/Shorts', 'Daily N Longs', nLongs)
    algorithm.Plot('Chart Number Of Longs/Shorts', 'Daily N Shorts', nShorts)

    # plot the drawdown % from the most recent high ---------------------------
    if not self.portfolioValueHighInitialized:
        # seed the trailing high with the first observed portfolio value
        # (this used to be written to a misspelled attribute, leaving the high at its initial 0)
        self.portfolioValueHigh = currentTotalPortfolioValue
        self.portfolioValueHighInitialized = True

    # update trailing high value of the portfolio
    if self.portfolioValueHigh < currentTotalPortfolioValue:
        self.portfolioValueHigh = currentTotalPortfolioValue

    if self.portfolioValueHigh != 0:
        currentDrawdownPercent = ((float(currentTotalPortfolioValue) / float(self.portfolioValueHigh)) - 1.0) * 100
    else:
        currentDrawdownPercent = 0.0
    algorithm.Plot('Chart Drawdown %', 'Drawdown %', currentDrawdownPercent)

    # Disabled draft of an 'Optimal Allocation' plot, kept for reference:
    # symbols = []
    # # loop through the tickers list and create symbols for the universe
    # for i in range(len(algorithm.Portfolio.Values)):
    #     symbols.append(Symbol.Create(tickers[i], SecurityType.Equity, Market.USA))
    #     allocationPlot.AddSeries(Series(tickers[i], SeriesType.Line, ''))
    # #algorithm.Plot('Optimal Allocation', )


import pandas as pd
import numpy as np
from scipy.stats import skew, kurtosis
class SymbolData:
    ''' Perform the per-symbol factor calculations on a history frame indexed by (symbol, time). '''

    def __init__(self, symbol):
        self.Symbol = symbol
        # {factor value: direction}; filled by CalculateFactors
        self.fundamentalDataDict = {}
        self.momentum = None
        self.volatility = None
        self.skewness = None
        self.kurt = None
        # NOTE(review): the disabled assignment in CalculateFactors targets 'distanceVsHL', not this name
        self.positionVsHL = None
        self.meanOvernightReturns = None

    def CalculateFactors(self, history, fundamentalDataBySymbolDict):
        ''' Store this symbol's fundamental factors and compute the active technical factors.
        Raises KeyError if the symbol is missing from fundamentalDataBySymbolDict. '''
        self.fundamentalDataDict = fundamentalDataBySymbolDict[self.Symbol]
        self.momentum = self.CalculateMomentum(history)
        self.volatility = self.CalculateVolatility(history)
        #self.skewness = self.CalculateSkewness(history)
        #self.kurt = self.CalculateKurtosis(history)
        #self.distanceVsHL = self.CalculateDistanceVsHL(history)
        #self.meanOvernightReturns = self.CalculateMeanOvernightReturns(history)

    def _closes(self, history):
        ''' Close prices of this symbol as a Series. '''
        return history.loc[self.Symbol]['close']

    def _returns(self, history):
        ''' Bar-over-bar percentage changes of the close, without the leading NaN. '''
        return self._closes(history).pct_change().dropna()

    def CalculateMomentum(self, history, lookback = 252):
        ''' Return of the latest close versus the close `lookback` bars from the end.
        Raises IndexError when fewer than `lookback` closes are available. '''
        closePrices = self._closes(history)
        # .iloc makes the positional access explicit; plain [-1] is label-based on integer indexes
        return (closePrices.iloc[-1] / closePrices.iloc[-lookback]) - 1

    def CalculateVolatility(self, history):
        ''' Standard deviation of the close-to-close returns (NaNs ignored). '''
        return np.nanstd(self._returns(history), axis = 0)

    def CalculateSkewness(self, history):
        ''' Skewness of the close-to-close returns. '''
        return skew(self._returns(history))

    def CalculateKurtosis(self, history):
        ''' Kurtosis of the close-to-close returns, as computed by scipy.stats.kurtosis. '''
        return kurtosis(self._returns(history))

    def CalculateDistanceVsHL(self, history):
        ''' Position of the latest close inside the [low, high] range of all closes in the history. '''
        closePrices = self._closes(history)
        highestClose = max(closePrices)
        lowestClose = min(closePrices)
        return (closePrices.iloc[-1] - lowestClose) / (highestClose - lowestClose)

    def CalculateMeanOvernightReturns(self, history):
        ''' Mean of open / previous close - 1 (NaNs ignored). '''
        bars = history.loc[self.Symbol]
        overnightReturns = (bars['open'] / bars['close'].shift(1)) - 1
        return np.nanmean(overnightReturns, axis = 0)

    @property
    def factorsList(self):
        ''' [momentum, volatility, *signed fundamental factors], or None until both
        technical factors have been computed. '''
        technicalFactors = [self.momentum, self.volatility]
        # each fundamental factor value is multiplied by its direction (+1 / -1)
        fundamentalFactors = [float(key) * value for key, value in self.fundamentalDataDict.items()]
        if all(v is not None for v in technicalFactors):
            return technicalFactors + fundamentalFactors
        return None


class ManualInputs:
    ''' Hand-maintained ticker lists read by the universe selection. '''
    #m_tickers = ['AAPL', 'AMZN', 'NFLX', 'GOOG','FB']
    m_tickers = ['AMZN', 'ARKK']
    #m_tickers = []
    restrictedList = ["GME"]


# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Logging")
AddReference("QuantConnect.Indicators")
from System import *
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Logging import Log
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import InsightCollection, InsightDirection
from QuantConnect.Algorithm.Framework.Portfolio import PortfolioConstructionModel, PortfolioTarget, PortfolioBias
from Portfolio.MaximumSharpeRatioPortfolioOptimizer import MaximumSharpeRatioPortfolioOptimizer
from datetime import datetime, timedelta
from itertools import groupby
import pandas as pd
import numpy as np
from numpy import dot, transpose
from numpy.linalg import inv
### <summary>
### Provides an implementation of Black-Litterman portfolio optimization. The model adjusts equilibrium market
### returns by incorporating views from multiple alpha models and therefore to get the optimal risky portfolio
### reflecting those views. If insights of all alpha models have None magnitude or there are linearly dependent
### vectors in link matrix of views, the expected return would be the implied excess equilibrium return.
### The interval of weights in optimization method can be changed based on the long-short algorithm.
### The default model uses 0.05 as the weight-on-views scalar parameter tau and
### MaximumSharpeRatioPortfolioOptimizer that accepts a 63-row matrix of 1-day returns.
### </summary>
class BlackLittermanOptimizationPortfolioConstructionModel(PortfolioConstructionModel):
    '''Portfolio construction model that turns active insights into Black-Litterman views,
    blends them with equilibrium returns and asks an optimizer for the target weights.'''

    def __init__(self,
                 rebalance = Resolution.Daily,
                 portfolioBias = PortfolioBias.LongShort,
                 lookback = 1,
                 period = 63,
                 resolution = Resolution.Daily,
                 risk_free_rate = 0,
                 delta = 2.5,
                 tau = 0.05,
                 optimizer = None):
        """Initialize the model
        Args:
            rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
                              If None will be ignored.
                              The function returns the next expected rebalance time for a given algorithm UTC DateTime.
                              The function returns null if unknown, in which case the function will be called again in the
                              next loop. Returning current time will trigger rebalance.
            portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)
            lookback(int): Historical return lookback period
            period(int): The time interval of history price to calculate the weight
            resolution: The resolution of the history price
            risk_free_rate(float): The risk free rate
            delta(float): The risk aversion coefficient of the market portfolio
            tau(float): The model parameter indicating the uncertainty of the CAPM prior
            optimizer: Object exposing Optimize(returns, Pi, Sigma); defaults to a
                       MaximumSharpeRatioPortfolioOptimizer bounded according to portfolioBias"""
        self.lookback = lookback
        self.period = period
        self.resolution = resolution
        self.risk_free_rate = risk_free_rate
        self.delta = delta
        self.tau = tau
        self.portfolioBias = portfolioBias

        # weight bounds implied by the bias: long-only -> [0, 1], short-only -> [-1, 0], else [-1, 1]
        lower = 0 if portfolioBias == PortfolioBias.Long else -1
        upper = 0 if portfolioBias == PortfolioBias.Short else 1
        self.optimizer = MaximumSharpeRatioPortfolioOptimizer(lower, upper, risk_free_rate) if optimizer is None else optimizer

        self.sign = lambda x: -1 if x < 0 else (1 if x > 0 else 0)
        self.symbolDataBySymbol = {}

        # If the argument is an instance of Resolution or Timedelta
        # Redefine rebalancingFunc
        rebalancingFunc = rebalance
        if isinstance(rebalance, int):
            rebalance = Extensions.ToTimeSpan(rebalance)
        if isinstance(rebalance, timedelta):
            rebalancingFunc = lambda dt: dt + rebalance
        if rebalancingFunc:
            self.SetRebalancingFunc(rebalancingFunc)

    def _get_or_create_symbol_data(self, symbol):
        '''Return the stored symbol data, building a fresh (unstored) one only when none exists.
        Avoids constructing a throwaway indicator and event subscription on every lookup.'''
        symbolData = self.symbolDataBySymbol.get(symbol)
        if symbolData is None:
            symbolData = self.BlackLittermanSymbolData(symbol, self.lookback, self.period)
        return symbolData

    def ShouldCreateTargetForInsight(self, insight):
        '''An insight gets a target only if it survives the invalid-magnitude filter.'''
        return len(PortfolioConstructionModel.FilterInvalidInsightMagnitude(self.Algorithm, [ insight ])) != 0

    def DetermineTargetPercent(self, lastActiveInsights):
        '''Map each insight to its optimized portfolio weight.
        Args:
            lastActiveInsights: The insights to build views and targets from
        Returns:
            Dictionary of insight -> weight; empty when no views could be formed
            or when an insight has no magnitude'''
        targets = {}

        # Get view vectors
        P, Q = self.get_views(lastActiveInsights)
        if P is not None:
            returns = dict()
            # Updates the BlackLittermanSymbolData with insights
            # Create a dictionary keyed by the symbols in the insights with an pandas.Series as value to create a data frame
            for insight in lastActiveInsights:
                symbol = insight.Symbol
                symbolData = self._get_or_create_symbol_data(symbol)
                if insight.Magnitude is None:
                    self.Algorithm.SetRunTimeError(ArgumentNullException('BlackLittermanOptimizationPortfolioConstructionModel does not accept \'None\' as Insight.Magnitude. Please make sure your Alpha Model is generating Insights with the Magnitude property set.'))
                    return targets
                symbolData.Add(insight.GeneratedTimeUtc, insight.Magnitude)
                returns[symbol] = symbolData.Return

            returns = pd.DataFrame(returns)

            # Calculate prior estimate of the mean and covariance
            Pi, Sigma = self.get_equilibrium_return(returns)

            # Calculate posterior estimate of the mean and covariance
            Pi, Sigma = self.apply_blacklitterman_master_formula(Pi, Sigma, P, Q)

            # Create portfolio targets from the specified insights
            weights = self.optimizer.Optimize(returns, Pi, Sigma)
            weights = pd.Series(weights, index = Sigma.columns)

            for symbol, weight in weights.items():
                for insight in lastActiveInsights:
                    if str(insight.Symbol) == str(symbol):
                        # don't trust the optimizer
                        if self.portfolioBias != PortfolioBias.LongShort and self.sign(weight) != self.portfolioBias:
                            weight = 0
                        targets[insight] = weight
                        break

        return targets

    def GetTargetInsights(self):
        '''Return the most recently generated active insight per (source model, symbol) pair.'''
        # Get insight that haven't expired of each symbol that is still in the universe
        activeInsights = self.InsightCollection.GetActiveInsights(self.Algorithm.UtcTime)

        # Get the last generated active insight for each symbol
        lastActiveInsights = []
        for sourceModel, f in groupby(sorted(activeInsights, key = lambda ff: ff.SourceModel), lambda fff: fff.SourceModel):
            for symbol, g in groupby(sorted(list(f), key = lambda gg: gg.Symbol), lambda ggg: ggg.Symbol):
                lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1])
        return lastActiveInsights

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time the we add/remove securities from the data feed
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''

        # Get removed symbol and invalidate them in the insight collection
        super().OnSecuritiesChanged(algorithm, changes)

        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            symbolData = self.symbolDataBySymbol.pop(symbol, None)
            if symbolData is not None:
                symbolData.Reset()

        # initialize data for added securities
        addedSymbols = { x.Symbol: x.Exchange.TimeZone for x in changes.AddedSecurities }
        if not addedSymbols:
            # nothing was added: skip the history request altogether
            return

        history = algorithm.History(list(addedSymbols.keys()), self.lookback * self.period, self.resolution)
        if history.empty:
            return

        # one column of closes per symbol
        history = history.close.unstack(0)
        symbols = history.columns

        for symbol, timezone in addedSymbols.items():
            if str(symbol) not in symbols:
                continue

            symbolData = self._get_or_create_symbol_data(symbol)
            for time, close in history[symbol].items():
                utcTime = Extensions.ConvertToUtc(time, timezone)
                symbolData.Update(utcTime, close)

            self.symbolDataBySymbol[symbol] = symbolData

    def apply_blacklitterman_master_formula(self, Pi, Sigma, P, Q):
        '''Apply Black-Litterman master formula
        http://www.blacklitterman.org/cookbook.html
        Args:
            Pi: Prior/Posterior mean array
            Sigma: Prior/Posterior covariance matrix
            P: A matrix that identifies the assets involved in the views (size: K x N)
            Q: A view vector (size: K x 1)
        Returns:
            The posterior (Pi, Sigma), or the inputs unchanged when omega is singular'''
        ts = self.tau * Sigma

        # Create the diagonal Sigma matrix of error terms from the expressed views
        omega = np.dot(np.dot(P, ts), P.T) * np.eye(Q.shape[0])
        if np.linalg.det(omega) == 0:
            return Pi, Sigma

        A = np.dot(np.dot(ts, P.T), inv(np.dot(np.dot(P, ts), P.T) + omega))

        Pi = np.squeeze(np.asarray((
            np.expand_dims(Pi, axis=0).T +
            np.dot(A, (Q - np.expand_dims(np.dot(P, Pi.T), axis=1))))
            ))

        M = ts - np.dot(np.dot(A, P), ts)
        Sigma = (Sigma + M) * self.delta

        return Pi, Sigma

    def get_equilibrium_return(self, returns):
        '''Calculate equilibrium returns and covariance
        Args:
            returns: Matrix of returns where each column represents a security and each row returns for the given date/time (size: K x N)
        Returns:
            equilibrium_return: Array of double of equilibrium returns
            cov: Multi-dimensional array of double with the portfolio covariance of returns (size: K x K)'''
        size = len(returns.columns)
        # equal weighting scheme
        W = np.array([1/size]*size)
        # the covariance matrix of excess returns (N x N matrix)
        cov = returns.cov()*252
        # annualized return
        annual_return = np.sum(((1 + returns.mean())**252 -1) * W)
        # annualized variance of return
        annual_variance = dot(W.T, dot(cov, W))
        # the risk aversion coefficient
        risk_aversion = (annual_return - self.risk_free_rate ) / annual_variance
        # the implied excess equilibrium return Vector (N x 1 column vector)
        equilibrium_return = dot(dot(risk_aversion, cov), W)

        return equilibrium_return, cov

    def get_views(self, insights):
        '''Generate views from multiple alpha models
        Args
            insights: Array of insight that represent the investors' views
        Returns
            P: A matrix that identifies the assets involved in the views (size: K x N)
            Q: A view vector (size: K x 1)
            (None, None) when no view could be built'''
        try:
            P = {}
            Q = {}
            for model, group in groupby(insights, lambda x: x.SourceModel):
                group = list(group)

                up_insights_sum = 0.0
                dn_insights_sum = 0.0
                for insight in group:
                    if insight.Direction == InsightDirection.Up:
                        up_insights_sum = up_insights_sum + np.abs(insight.Magnitude)
                    if insight.Direction == InsightDirection.Down:
                        dn_insights_sum = dn_insights_sum + np.abs(insight.Magnitude)

                q = up_insights_sum if up_insights_sum > dn_insights_sum else dn_insights_sum
                if q == 0:
                    continue

                Q[model] = q

                # generate the link matrix of views: P
                P[model] = dict()
                for insight in group:
                    value = insight.Direction * np.abs(insight.Magnitude)
                    P[model][insight.Symbol] = value / q
                # Add zero for other symbols that are listed but active insight
                for symbol in self.symbolDataBySymbol.keys():
                    if symbol not in P[model]:
                        P[model][symbol] = 0

            Q = np.array([[x] for x in Q.values()])
            if len(Q) > 0:
                P = np.array([list(x.values()) for x in P.values()])
                return P, Q
        except Exception as error:
            # Best effort: a failure while building views means "no views", but it is
            # logged rather than swallowed, and interpreter-exit signals are not caught.
            Log.Error(f'BlackLittermanOptimizationPortfolioConstructionModel.get_views(): {error}')

        return None, None

    class BlackLittermanSymbolData:
        '''Contains data specific to a symbol required by this model'''

        def __init__(self, symbol, lookback, period):
            self.symbol = symbol
            self.roc = RateOfChange(f'{symbol}.ROC({lookback})', lookback)
            self.roc.Updated += self.OnRateOfChangeUpdated
            self.window = RollingWindow[IndicatorDataPoint](period)

        def Reset(self):
            '''Unsubscribe from the indicator and clear both the indicator and the window.'''
            self.roc.Updated -= self.OnRateOfChangeUpdated
            self.roc.Reset()
            self.window.Reset()

        def Update(self, utcTime, close):
            '''Feed a new close into the rate-of-change indicator.'''
            self.roc.Update(utcTime, close)

        def OnRateOfChangeUpdated(self, roc, value):
            '''Store the indicator value in the window once the indicator is ready.'''
            if roc.IsReady:
                self.window.Add(value)

        def Add(self, time, value):
            '''Append a data point unless the newest window entry already has the same end time.'''
            if self.window.Samples > 0 and self.window[0].EndTime == time:
                return

            item = IndicatorDataPoint(self.symbol, time, value)
            self.window.Add(item)

        @property
        def Return(self):
            '''Window contents as a pandas Series of values indexed by end time.'''
            return pd.Series(
                data = [x.Value for x in self.window],
                index = [x.EndTime for x in self.window])

        @property
        def IsReady(self):
            return self.window.IsReady

        def __str__(self, **kwargs):
            return f'{self.roc.Name}: {(1 + self.window[0])**252 - 1:.2%}'