| Overall Statistics |
|
Total Trades 37 Average Win 0.28% Average Loss -1.27% Compounding Annual Return 2525.865% Drawdown 6.800% Expectancy 0.035 Net Profit 12.680% Sharpe Ratio 27.627 Probabilistic Sharpe Ratio 77.542% Loss Rate 15% Win Rate 85% Profit-Loss Ratio 0.22 Alpha 0 Beta 0 Annual Standard Deviation 0.628 Annual Variance 0.394 Information Ratio 27.627 Tracking Error 0.628 Treynor Ratio 0 Total Fees $39.85 |
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
import numpy as np
from scipy.optimize import minimize
class myTrailingStopRiskManagementModel:
'''
Credit goes to: Alex Catarino and many of his friends at QuantConnect
https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Risk/TrailingStopRiskManagementModel.py
Description:
Limits the maximum possible loss measured from the highest unrealized profit
'''
def __init__(self, maximumDrawdownPercent = 0.05):
'''initializes the class
Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown
'''
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
self.trailingHighs = dict()
def SetWTtoZeroIfDDtooHigh(self, algo, targets=None):
'''If drawdown is too high, set wt[symbol] to zero
algo.wt[symbol] = weights which will be set to 0 in case drawdown exceeds the maximum
'''
for kvp in algo.Securities:
symbol = kvp.Key
security = kvp.Value
# Remove from trailingHighs dict if not invested
if not security.Invested:
self.trailingHighs.pop(symbol, None)
continue
# Add newly invested securities to trailingHighs dict
if symbol not in self.trailingHighs:
self.trailingHighs[symbol] = security.Holdings.AveragePrice
continue
# Check for new highs and update trailingHighs dict
if self.trailingHighs[symbol] < security.High:
self.trailingHighs[symbol] = security.High
continue
# Calc the drawdown
securityHigh = self.trailingHighs[symbol]
drawdown = (security.Low / securityHigh) - 1
# If drawdown is too high, set symbol weight to zero
if drawdown < self.maximumDrawdownPercent:
algo.wt[symbol] = 0
return
class myPortfolioOptimizer:
'''
Credit goes to: Emilio Freire / InnoQuantivity
https://innoquantivity.com/blogs/inno-blog/portfolio-optimization-quantconnect-research-algorithm
https://www.quantconnect.com/forum/discussion/8128/portfolio-optimization-research-amp-algorithm-for-better-workflows/p1/comment-22952
Description:
Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function
Details:
Optimization can be:
- Equal Weighting
- Maximize Portfolio Return
- Minimize Portfolio Standard Deviation
- Mean-Variance (minimize Standard Deviation given a target return)
- Maximize Portfolio Sharpe Ratio
- Maximize Portfolio Sortino Ratio
- Risk Parity Portfolio
Constraints:
- Weights must be between some given boundaries
- Weights must sum to 1
'''
def __init__(self,
minWeight = 0,
maxWeight = 1):
'''
Description:
Initialize the CustomPortfolioOptimizer
Args:
minWeight(float): The lower bound on portfolio weights
maxWeight(float): The upper bound on portfolio weights
'''
self.minWeight = minWeight
self.maxWeight = maxWeight
def CalcWeights(self, algo, symbols, objectiveFunction='riskParity', lookback=63, targetReturn=None):
'''
Description:
Calculate weights from daily returns, return a pandas Series
'''
history = algo.History(symbols, lookback, Resolution.Daily)['close'].unstack(level = 0)
returnsDf = history.pct_change().dropna()
returnsDf.columns = map(lambda x : x.Value, symbols)
weights = self.Optimize(objectiveFunction, returnsDf, targetReturn)
return pd.Series(weights, index=returnsDf.columns, name='weights')
def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):
'''
Description:
Perform portfolio optimization given a series of returns
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity)
dailyReturnsDf: DataFrame of historical daily arithmetic returns
Returns:
Array of double with the portfolio weights (size: K x 1)
'''
# initial weights: equally weighted
size = dailyReturnsDf.columns.size # K x 1
self.initWeights = np.array(size * [1. / size])
# get sample covariance matrix
covariance = dailyReturnsDf.cov()
# get the sample covariance matrix of only negative returns for sortino ratio
negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
covarianceNegativeReturns = negativeReturnsDf.cov()
if objFunction == 'equalWeighting':
return self.initWeights
bounds = tuple((self.minWeight, self.maxWeight) for x in range(size))
constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]
if objFunction == 'meanVariance':
# if no target return is provided, use the resulting from equal weighting
if targetReturn is None:
targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)
constraints.append( {'type': 'eq', 'fun': lambda weights:
self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn} )
opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
covariance, covarianceNegativeReturns,
weights),
x0 = self.initWeights,
bounds = bounds,
constraints = constraints,
method = 'SLSQP')
return opt['x']
def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):
'''
Description:
Compute the objective function
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance,
maxSharpe, maxSortino, riskParity)
dailyReturnsDf: DataFrame of historical daily returns
covariance: Sample covariance
covarianceNegativeReturns: Sample covariance matrix of only negative returns
weights: Portfolio weights
'''
if objFunction == 'maxReturn':
f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
return -f # convert to negative to be minimized
elif objFunction == 'minVariance':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction == 'meanVariance':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction == 'maxSharpe':
f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
return -f # convert to negative to be minimized
elif objFunction == 'maxSortino':
f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
return -f # convert to negative to be minimized
elif objFunction == 'riskParity':
f = self.CalculateRiskParityFunction(covariance, weights)
return f
else:
raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
+ ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')
def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):
annualizedPortfolioReturns = np.sum( ((1 + dailyReturnsDf.mean())**252 - 1) * weights )
return annualizedPortfolioReturns
def CalculateAnnualizedPortfolioStd(self, covariance, weights):
annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
if annualizedPortfolioStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')
return annualizedPortfolioStd
def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):
annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) )
if annualizedPortfolioNegativeStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')
return annualizedPortfolioNegativeStd
def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd
return annualizedPortfolioSharpeRatio
def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd
return annualizedPortfolioSortinoRatio
def CalculateRiskParityFunction(self, covariance, weights):
''' Spinu formulation for risk parity portfolio '''
assetsRiskBudget = self.initWeights
portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)
x = weights / portfolioVolatility
riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x))
return riskParity'''
v1.5. Intersection of ROC comparison using OUT_DAY approach by Vladimir
(with dynamic stocks selector by fundamental factors and momentum)
eliminated fee saving part of the code plus daily rebalance
inspired by Peter Guenther, Tentor Testivis, Dan Whitnable, Thomas Chang, Miko M, Leandro Maia
Updates:
- Logic for Trailing Stop Loss from Quant Connect
- Logic for Portfolio Optimization from Emilio Freire
- Option for Weighted Fundamentals from some other nice person
'''
from QuantConnect.Data.UniverseSelection import *
import numpy as np
import pandas as pd
from helpers import myPortfolioOptimizer
from helpers import myTrailingStopRiskManagementModel
# ------------------------------------------------------------------------------------------------------------------------------------------------
BONDS = ['TLT']; VOLA = 126; BASE_RET = 85; STK_MOM = 126; N_COARSE = 100; N_FACTOR = 20; N_MOM = 5; PFO = 0; LEV = 1.50; HEDGE = 0.00; TSL = 0.00
# ------------------------------------------------------------------------------------------------------------------------------------------------
class Fundamental_Factors_Momentum_ROC_Comparison_OUT_DAY(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1)
self.SetEndDate(2021, 1, 13)
self.InitCash = 100000
self.SetCash(self.InitCash)
self.MKT = self.AddEquity("SPY", Resolution.Hour).Symbol
self.mkt = []
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
res = Resolution.Minute # <- changed
self.BONDS = [self.AddEquity(ticker, res).Symbol for ticker in BONDS]
self.INI_WAIT_DAYS = 15
self.wait_days = self.INI_WAIT_DAYS
self.GLD = self.AddEquity('GLD', res).Symbol
self.SLV = self.AddEquity('SLV', res).Symbol
self.XLU = self.AddEquity('XLU', res).Symbol
self.XLI = self.AddEquity('XLI', res).Symbol
self.UUP = self.AddEquity('UUP', res).Symbol
self.DBB = self.AddEquity('DBB', res).Symbol
self.pairs = [self.GLD, self.SLV, self.XLU, self.XLI, self.UUP, self.DBB]
self.bull = 1
self.bull_prior = 0
self.count = 0
self.outday = (-self.INI_WAIT_DAYS+1)
self.SetWarmUp(timedelta(350))
self.UniverseSettings.Resolution = res
self.AddUniverse(self.CoarseFilter, self.FineFilter)
self.data = {}
self.RebalanceFreq = 60
self.UpdateFineFilter = 0
self.symbols = None
self.RebalanceCount = 0
self.wt = {}
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY', 30),
self.daily_check)
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY', 60),
self.trade)
if TSL != 0: # <- added
self.tsl_max = float(self.GetParameter("tsl_max"))
self.tsl = myTrailingStopRiskManagementModel(maximumDrawdownPercent=self.tsl_max) # <- added
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(TimeSpan.FromMinutes(60)),
self.stop_loss) # <- added
if PFO != 0: # <- added
self.pfo = myPortfolioOptimizer(minWeight=0, maxWeight=1) # <- added
self.weights_stks = self.pfo.CalcWeights(self, symbols=self.BONDS) # <- added
self.weights_bnds = self.pfo.CalcWeights(self, symbols=self.BONDS) # <- added
symbols = [self.MKT] + self.pairs
for symbol in symbols:
self.consolidator = TradeBarConsolidator(timedelta(days=1))
self.consolidator.DataConsolidated += self.consolidation_handler
self.SubscriptionManager.AddConsolidator(symbol, self.consolidator)
self.history = self.History(symbols, VOLA, Resolution.Daily)
if self.history.empty or 'close' not in self.history.columns:
return
self.history = self.history['close'].unstack(level=0).dropna()
def consolidation_handler(self, sender, consolidated):
self.history.loc[consolidated.EndTime, consolidated.Symbol] = consolidated.Close
self.history = self.history.iloc[-VOLA:]
def derive_vola_waitdays(self):
sigma = 0.6 * np.log1p(self.history[[self.MKT]].pct_change()).std() * np.sqrt(252)
wait_days = int(sigma * BASE_RET)
period = int((1.0 - sigma) * BASE_RET)
return wait_days, period
def CoarseFilter(self, coarse):
if not (((self.count-self.RebalanceCount) == self.RebalanceFreq) or (self.count == self.outday + self.wait_days - 1)):
self.UpdateFineFilter = 0
return Universe.Unchanged
self.UpdateFineFilter = 1
selected = [x for x in coarse if (x.HasFundamentalData) and (float(x.Price) > 5)]
filtered = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in filtered[:N_COARSE]]
def FineFilter(self, fundamental):
if self.UpdateFineFilter == 0:
return Universe.Unchanged
filtered_fundamental = [x for x in fundamental
if (x.ValuationRatios.EVToEBITDA > 0)
and (x.EarningReports.BasicAverageShares.ThreeMonths > 0)
and float(x.EarningReports.BasicAverageShares.ThreeMonths) * x.Price > 2e9
and x.ValuationRatios.PricetoEBITDA
and x.ValuationRatios.PERatio
and x.SecurityReference.IsPrimaryShare
and x.SecurityReference.SecurityType == "ST00000001"
and x.SecurityReference.IsDepositaryReceipt == 0
and x.CompanyReference.IsLimitedPartnership == 0
]
# https://www.quantconnect.com/docs/data-library/fundamentals#Fundamentals-Reference-Tables
s1 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.EVToEBITDA, reverse=False) # <- added
s2 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PricetoEBITDA, reverse=False) # <- added
s3 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PERatio, reverse=False) # <- added
dict = {}
for i, elem in enumerate(s1): # <- added
i1 = i # <- added
i2 = s2.index(elem) # <- added
i3 = s3.index(elem) # <- added
score = sum([i1 * 1.0, i2 * 0.0, i3 * 0.0]) # <- added
dict[elem] = score # <- added
#top = sorted(filtered_fundamental, key = lambda x: x.ValuationRatios.EVToEBITDA, reverse=True)[:N_FACTOR]
#self.symbols = [x.Symbol for x in top]
top = sorted(dict.items(), key = lambda x: x[1], reverse=True)[:N_FACTOR] # <- changed
self.symbols = [x[0].Symbol for x in top] # <- changed
self.UpdateFineFilter = 0
self.RebalanceCount = self.count
return self.symbols
def OnSecuritiesChanged(self, changes):
addedSymbols = []
for security in changes.AddedSecurities:
addedSymbols.append(security.Symbol)
if security.Symbol not in self.data:
self.data[security.Symbol] = SymbolData(security.Symbol, STK_MOM, self)
if len(addedSymbols) > 0:
history = self.History(addedSymbols, 1 + STK_MOM, Resolution.Daily).loc[addedSymbols]
for symbol in addedSymbols:
try:
self.data[symbol].Warmup(history.loc[symbol])
except:
self.Debug(str(symbol))
continue
def calc_return(self, stocks):
ret = {}
for symbol in stocks:
try:
ret[symbol] = self.data[symbol].Roc.Current.Value
except:
self.Debug(str(symbol))
continue
df_ret = pd.DataFrame.from_dict(ret, orient='index')
df_ret.columns = ['return']
sort_return = df_ret.sort_values(by = ['return'], ascending = False)
return sort_return
def daily_check(self):
self.wait_days, period = self.derive_vola_waitdays()
r = self.history.pct_change(period).iloc[-1]
self.bear = ((r[self.SLV] < r[self.GLD]) and (r[self.XLI] < r[self.XLU]) and (r[self.DBB] < r[self.UUP]))
self.down2x = ((r[self.SLV] < r[self.GLD]) and (r[self.XLI] < r[self.XLU])) or \
((r[self.XLI] < r[self.XLU]) and (r[self.DBB] < r[self.UUP])) or \
((r[self.SLV] < r[self.GLD]) and (r[self.DBB] < r[self.UUP])) # <- changed
self.up2x = ((r[self.SLV] > r[self.GLD]) and (r[self.XLI] > r[self.XLU])) or \
((r[self.XLI] > r[self.XLU]) and (r[self.DBB] > r[self.UUP])) or \
((r[self.SLV] > r[self.GLD]) and (r[self.DBB] > r[self.UUP])) # <- changed
self.up3x = ((r[self.SLV] > r[self.GLD]) and (r[self.XLI] > r[self.XLU]) and (r[self.DBB] > r[self.UUP])) # <- changed
if self.bear:
self.bull = False
self.outday = self.count
if (self.count >= self.outday + self.wait_days):
self.bull = True
self.bull_prior = self.bull
self.count += 1
def trade(self):
if self.symbols is None: return
output = self.calc_return(self.symbols)
stocks = output.iloc[:N_MOM].index
for sec in self.Portfolio.Keys:
if sec not in stocks and sec not in self.BONDS:
self.wt[sec] = 0.
if PFO == 0: # <- added
for sec in stocks: # <- added
self.wt[sec] = LEV*(1.0 - HEDGE)/len(stocks) if self.bull else LEV*HEDGE/len(stocks);
else: # <- added
self.weights_stks = self.pfo.CalcWeights(self, symbols=stocks.values.tolist()) # <- added
for sec in stocks: # <- added
sec_wt = self.weights_stks[self.weights_stks.index==str(sec.Value)][0] # <- added
self.wt[sec] = LEV*(1.0 - HEDGE)*sec_wt if self.bull else LEV*HEDGE*sec_wt # <- added
for sec in self.BONDS:
self.wt[sec] = LEV*HEDGE/len(self.BONDS) if self.bull else LEV*(1.0 - HEDGE)/len(self.BONDS);
for sec, weight in self.wt.items():
if weight == 0. and self.Portfolio[sec].IsLong:
self.Liquidate(sec)
for sec, weight in self.wt.items():
if weight != 0.:
self.SetHoldings(sec, weight)
def stop_loss(self): # <- added
if self.symbols is None: return # <- added
if TSL != 0: # <- added
self.tsl.SetWTtoZeroIfDDtooHigh(self) # <- added
for sec, weight in self.wt.items(): # <- added
if weight == 0. and self.Portfolio[sec].IsLong: # <- added
self.Liquidate(sec) # <- added
def OnEndOfDay(self):
mkt_price = self.Securities[self.MKT].Close
self.mkt.append(mkt_price)
mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
self.Plot('Strategy Equity', self.MKT, mkt_perf)
account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
self.Plot('Holdings', 'leverage', round(account_leverage, 2))
self.Plot('Holdings', 'Target Leverage', LEV)
class SymbolData(object):
def __init__(self, symbol, roc, algorithm):
self.Symbol = symbol
self.Roc = RateOfChange(roc)
self.algorithm = algorithm
self.consolidator = algorithm.ResolveConsolidator(symbol, Resolution.Daily)
algorithm.RegisterIndicator(symbol, self.Roc, self.consolidator)
def Warmup(self, history):
for index, row in history.iterrows():
self.Roc.Update(index, row['close'])