| Overall Statistics |
|
Total Trades 826 Average Win 2.18% Average Loss -1.14% Compounding Annual Return 40.793% Drawdown 22.300% Expectancy 0.896 Net Profit 8581.545% Sharpe Ratio 1.702 Probabilistic Sharpe Ratio 96.538% Loss Rate 35% Win Rate 65% Profit-Loss Ratio 1.92 Alpha 0 Beta 0 Annual Standard Deviation 0.208 Annual Variance 0.043 Information Ratio 1.702 Tracking Error 0.208 Treynor Ratio 0 Total Fees $15340.67 |
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pandas as pd
import numpy as np
from scipy.optimize import minimize
class myTrailingStopRiskManagementModel:
'''
Credit goes to: Alex Catarino and many of his friends at QuantConnect
https://github.com/QuantConnect/Lean/blob/master/Algorithm.Framework/Risk/TrailingStopRiskManagementModel.py
Description:
Limits the maximum possible loss measured from the highest unrealized profit
'''
def __init__(self, maximumDrawdownPercent = 0.08):
'''initializes the class
Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown
'''
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
self.trailingHighs = dict()
def setDD(self, maximumDrawdownPercent = 0.08):
'''allows to change the drawdown
Args: maximumDrawdownPercent: The maximum percentage drawdown allowed for algorithm portfolio compared with the highest unrealized profit, defaults to 5% drawdown
'''
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
def setWTtoZeroIfDDtooHigh(self, algo, targets=None):
'''If drawdown is too high, set wt[symbol] to zero
algo.wt[symbol] = weights which will be set to 0 in case drawdown exceeds the maximum
'''
for kvp in algo.Securities:
symbol = kvp.Key
security = kvp.Value
# Remove from trailingHighs dict if not invested
if not security.Invested:
self.trailingHighs.pop(symbol, None)
continue
# Add newly invested securities to trailingHighs dict
if symbol not in self.trailingHighs:
self.trailingHighs[symbol] = security.Holdings.AveragePrice
continue
# Check for new highs and update trailingHighs dict
if self.trailingHighs[symbol] < security.High:
self.trailingHighs[symbol] = security.High
continue
# Calc the drawdown
securityHigh = self.trailingHighs[symbol]
drawdown = (security.Low / securityHigh) - 1
# If drawdown is too high, set symbol weight to zero
if drawdown < self.maximumDrawdownPercent:
algo.wt[symbol] = 0
return
class myPortfolioOptimizer:
'''
Credit goes to: Emilio Freire / InnoQuantivity
https://innoquantivity.com/blogs/inno-blog/portfolio-optimization-quantconnect-research-algorithm
https://www.quantconnect.com/forum/discussion/8128/portfolio-optimization-research-amp-algorithm-for-better-workflows/p1/comment-22952
Description:
Implementation of a custom optimizer that calculates the weights for each asset to optimize a given objective function
Details:
Optimization can be:
- Equal Weighting
- Maximize Portfolio Return
- Minimize Portfolio Standard Deviation
- Mean-Variance (minimize Standard Deviation given a target return)
- Maximize Portfolio Sharpe Ratio
- Maximize Portfolio Sortino Ratio
- Risk Parity Portfolio
Constraints:
- Weights must be between some given boundaries
- Weights must sum to 1
'''
def __init__(self,
minWeight = 0,
maxWeight = 1):
'''
Description:
Initialize the CustomPortfolioOptimizer
Args:
minWeight(float): The lower bound on portfolio weights
maxWeight(float): The upper bound on portfolio weights
'''
self.minWeight = minWeight
self.maxWeight = maxWeight
def CalcWeights(self, algo, symbols, objectiveFunction='riskParity', lookback=63, targetReturn=None):
'''
Description:
Calculate weights from daily returns, return a pandas Series
'''
history = algo.History(symbols, lookback, Resolution.Daily)['close'].unstack(level = 0)
returnsDf = history.pct_change().dropna()
returnsDf.columns = [algo.AddEquity(i).Symbol.Value for i in list(returnsDf.columns)]
weights = self.Optimize(objectiveFunction, returnsDf, targetReturn)
return pd.Series(weights, index=returnsDf.columns, name='weights')
def Optimize(self, objFunction, dailyReturnsDf, targetReturn = None):
'''
Description:
Perform portfolio optimization given a series of returns
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance, maxSharpe, maxSortino, riskParity)
dailyReturnsDf: DataFrame of historical daily arithmetic returns
Returns:
Array of double with the portfolio weights (size: K x 1)
'''
# initial weights: equally weighted
size = dailyReturnsDf.columns.size # K x 1
self.initWeights = np.array(size * [1. / size])
# get sample covariance matrix
covariance = dailyReturnsDf.cov()
# get the sample covariance matrix of only negative returns for sortino ratio
negativeReturnsDf = dailyReturnsDf[dailyReturnsDf < 0]
covarianceNegativeReturns = negativeReturnsDf.cov()
if objFunction == 'equalWeighting':
return self.initWeights
bounds = tuple((self.minWeight, self.maxWeight) for x in range(size))
constraints = [{'type': 'eq', 'fun': lambda x: np.sum(x) - 1.0}]
if objFunction == 'meanVariance':
# if no target return is provided, use the resulting from equal weighting
if targetReturn is None:
targetReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, self.initWeights)
constraints.append( {'type': 'eq', 'fun': lambda weights:
self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights) - targetReturn} )
opt = minimize(lambda weights: self.ObjectiveFunction(objFunction, dailyReturnsDf,
covariance, covarianceNegativeReturns,
weights),
x0 = self.initWeights,
bounds = bounds,
constraints = constraints,
method = 'SLSQP')
return opt['x']
def ObjectiveFunction(self, objFunction, dailyReturnsDf, covariance, covarianceNegativeReturns, weights):
'''
Description:
Compute the objective function
Args:
objFunction: The objective function to optimize (equalWeighting, maxReturn, minVariance, meanVariance,
maxSharpe, maxSortino, riskParity)
dailyReturnsDf: DataFrame of historical daily returns
covariance: Sample covariance
covarianceNegativeReturns: Sample covariance matrix of only negative returns
weights: Portfolio weights
'''
if objFunction == 'maxReturn':
f = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
return -f # convert to negative to be minimized
elif objFunction == 'minVariance':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction == 'meanVariance':
f = self.CalculateAnnualizedPortfolioStd(covariance, weights)
return f
elif objFunction == 'maxSharpe':
f = self.CalculateAnnualizedPortfolioSharpeRatio(dailyReturnsDf, covariance, weights)
return -f # convert to negative to be minimized
elif objFunction == 'maxSortino':
f = self.CalculateAnnualizedPortfolioSortinoRatio(dailyReturnsDf, covarianceNegativeReturns, weights)
return -f # convert to negative to be minimized
elif objFunction == 'riskParity':
f = self.CalculateRiskParityFunction(covariance, weights)
return f
else:
raise ValueError(f'PortfolioOptimizer.ObjectiveFunction: objFunction input has to be one of equalWeighting,'
+ ' maxReturn, minVariance, meanVariance, maxSharpe, maxSortino or riskParity')
def CalculateAnnualizedPortfolioReturn(self, dailyReturnsDf, weights):
annualizedPortfolioReturns = np.sum( ((1 + dailyReturnsDf.mean())**252 - 1) * weights )
return annualizedPortfolioReturns
def CalculateAnnualizedPortfolioStd(self, covariance, weights):
annualizedPortfolioStd = np.sqrt( np.dot(weights.T, np.dot(covariance * 252, weights)) )
if annualizedPortfolioStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioStd: annualizedPortfolioStd cannot be zero. Weights: {weights}')
return annualizedPortfolioStd
def CalculateAnnualizedPortfolioNegativeStd(self, covarianceNegativeReturns, weights):
annualizedPortfolioNegativeStd = np.sqrt( np.dot(weights.T, np.dot(covarianceNegativeReturns * 252, weights)) )
if annualizedPortfolioNegativeStd == 0:
raise ValueError(f'PortfolioOptimizer.CalculateAnnualizedPortfolioNegativeStd: annualizedPortfolioNegativeStd cannot be zero. Weights: {weights}')
return annualizedPortfolioNegativeStd
def CalculateAnnualizedPortfolioSharpeRatio(self, dailyReturnsDf, covariance, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioStd = self.CalculateAnnualizedPortfolioStd(covariance, weights)
annualizedPortfolioSharpeRatio = annualizedPortfolioReturn / annualizedPortfolioStd
return annualizedPortfolioSharpeRatio
def CalculateAnnualizedPortfolioSortinoRatio(self, dailyReturnsDf, covarianceNegativeReturns, weights):
annualizedPortfolioReturn = self.CalculateAnnualizedPortfolioReturn(dailyReturnsDf, weights)
annualizedPortfolioNegativeStd = self.CalculateAnnualizedPortfolioNegativeStd(covarianceNegativeReturns, weights)
annualizedPortfolioSortinoRatio = annualizedPortfolioReturn / annualizedPortfolioNegativeStd
return annualizedPortfolioSortinoRatio
def CalculateRiskParityFunction(self, covariance, weights):
''' Spinu formulation for risk parity portfolio '''
assetsRiskBudget = self.initWeights
portfolioVolatility = self.CalculateAnnualizedPortfolioStd(covariance, weights)
x = weights / portfolioVolatility
riskParity = (np.dot(x.T, np.dot(covariance, x)) / 2) - np.dot(assetsRiskBudget.T, np.log(x))
return riskParity'''
Intersection of ROC comparison using OUT_DAY approach by Vladimir v1.3
(with dynamic selector for fundamental factors and momentum)
inspired by Peter Guenther, Tentor Testivis, Dan Whitnable, Thomas Chang, Miko M, Leandro Maia
Updates by Frank Schikarski:
- Trailing Stop Loss based on logic from Quant Connect
- Adaptiion of the level of Trailing Stop Loss following Vlad's regime logic
- Portfolio Optimization adapted based on logic from Emilio Freire
- Weighted Fundamentals logic from some other nice person
'''
import numpy as np
import pandas as pd
from QuantConnect.Data.UniverseSelection import *
from helpers import myPortfolioOptimizer
from helpers import myTrailingStopRiskManagementModel
# ---------------------------------------------------------------------------------------------------------------------------------------------
BONDS = ['TLT']; VOLA = 126; BASE_RET = 85; STK_MOM = 126; N_COARSE = 100; N_FACTOR = 20; N_MOM = 5; LEV = 1.00; REBA = 84; PFO = 0; TSL = 0.11
# ---------------------------------------------------------------------------------------------------------------------------------------------
class Fundamental_Factors_Momentum_ROC_Comparison_OUT_DAY(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2008, 1, 1)
self.SetEndDate(2021, 1, 13)
self.InitCash = 100000
self.SetCash(self.InitCash)
self.MKT = self.AddEquity("SPY", Resolution.Hour).Symbol
self.mkt = []
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
res = Resolution.Hour
self.BONDS = [self.AddEquity(ticker, res).Symbol for ticker in BONDS]
self.INI_WAIT_DAYS = 15
self.wait_days = self.INI_WAIT_DAYS
self.GLD = self.AddEquity('GLD', res).Symbol
self.SLV = self.AddEquity('SLV', res).Symbol
self.XLU = self.AddEquity('XLU', res).Symbol
self.XLI = self.AddEquity('XLI', res).Symbol
self.UUP = self.AddEquity('UUP', res).Symbol
self.DBB = self.AddEquity('DBB', res).Symbol
self.pairs = [self.GLD, self.SLV, self.XLU, self.XLI, self.UUP, self.DBB]
self.bull = 1
self.bull_prior = 0
self.count = 0
self.outday = (-self.INI_WAIT_DAYS+1)
self.SetWarmUp(timedelta(max(VOLA, BASE_RET, STK_MOM, REBA))) # <- changed
self.UniverseSettings.Resolution = res
self.AddUniverse(self.CoarseFilter, self.FineFilter)
self.data = {}
self.UpdateFineFilter = 0
self.symbols = None
self.RebalanceCount = 0
self.wt = {}
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.AfterMarketOpen('SPY', 30),
self.daily_check)
if TSL != 0: # <- added
self.tsl_1 = 0.15 # <- added
self.tsl_2 = TSL # <- added
self.tsl_3 = 0.20 # <- added
self.tsl_4 = 0.20 # <- added
self.tsl = myTrailingStopRiskManagementModel(maximumDrawdownPercent=self.tsl_2) # <- added
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(TimeSpan.FromMinutes(60)), self.stop_loss) # <- added
if PFO != 0: # <- added
self.pfo = myPortfolioOptimizer(minWeight=0, maxWeight=1) # <- added
self.weights_stks = self.pfo.CalcWeights(self, symbols=self.BONDS) # <- added
self.weights_bnds = self.pfo.CalcWeights(self, symbols=self.BONDS) # <- added
symbols = [self.MKT] + self.pairs
for symbol in symbols:
self.consolidator = TradeBarConsolidator(timedelta(days=1))
self.consolidator.DataConsolidated += self.consolidation_handler
self.SubscriptionManager.AddConsolidator(symbol, self.consolidator)
self.history = self.History(symbols, VOLA, Resolution.Daily)
if self.history.empty or 'close' not in self.history.columns:
return
self.history = self.history['close'].unstack(level=0).dropna()
def consolidation_handler(self, sender, consolidated):
self.history.loc[consolidated.EndTime, consolidated.Symbol] = consolidated.Close
self.history = self.history.iloc[-VOLA:]
def derive_vola_waitdays(self):
sigma = 0.6 * np.log1p(self.history[[self.MKT]].pct_change()).std() * np.sqrt(252)
wait_days = int(sigma * BASE_RET)
period = int((1.0 - sigma) * BASE_RET)
return wait_days, period
def CoarseFilter(self, coarse):
if not (((self.count - self.RebalanceCount) == REBA) or (self.count == self.outday + self.wait_days - 1)):
self.UpdateFineFilter = 0
return Universe.Unchanged
self.UpdateFineFilter = 1
selected = [x for x in coarse
if x.HasFundamentalData
and float(x.Price) > 5.
]
filtered = sorted(selected, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in filtered[:N_COARSE]]
def FineFilter(self, fundamental):
if self.UpdateFineFilter == 0:
return Universe.Unchanged
filtered_fundamental = [x for x in fundamental
if x.SecurityReference.IsPrimaryShare
and x.SecurityReference.SecurityType == "ST00000001"
and x.SecurityReference.IsDepositaryReceipt == 0
and x.CompanyReference.IsLimitedPartnership == 0
and x.EarningReports.BasicAverageShares.ThreeMonths > 0
and float(x.EarningReports.BasicAverageShares.ThreeMonths) * x.Price > 2e9
and x.ValuationRatios.EVToEBITDA
and x.ValuationRatios.PricetoEBITDA
and x.ValuationRatios.PERatio
#and x.EarningReports.TotalDividendPerShare.ThreeMonths
]
# https://www.quantconnect.com/docs/data-library/fundamentals#Fundamentals-Reference-Tables
# sorting: reverse=False means "longing highest", reverse=True means "longing lowest"
s1 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.EVToEBITDA, reverse=False) # <- added
s2 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PricetoEBITDA, reverse=False) # <- added
s3 = sorted(filtered_fundamental, key=lambda x: x.ValuationRatios.PERatio, reverse=False) # <- added
dict = {}
for i, elem in enumerate(s1): # <- added
i1 = i # <- added
i2 = s2.index(elem) # <- added
i3 = s3.index(elem) # <- added
score = sum([i1 * 0.85, i2 * 0.10, i3 * 0.05]) # <- added
dict[elem] = score # <- added
top = sorted(dict.items(), key = lambda x: x[1], reverse=True)[:N_FACTOR] # <- changed
self.symbols = [x[0].Symbol for x in top] # <- changed
self.UpdateFineFilter = 0
self.RebalanceCount = self.count
return self.symbols
def OnSecuritiesChanged(self, changes):
addedSymbols = []
for security in changes.AddedSecurities:
addedSymbols.append(security.Symbol)
if security.Symbol not in self.data:
self.data[security.Symbol] = SymbolData(security.Symbol, STK_MOM, self)
if len(addedSymbols) > 0:
history = self.History(addedSymbols, 1 + STK_MOM, Resolution.Daily).loc[addedSymbols]
for symbol in addedSymbols:
try:
self.data[symbol].Warmup(history.loc[symbol])
except:
self.Debug(str(symbol))
continue
def daily_check(self):
self.wait_days, period = self.derive_vola_waitdays()
r = self.history.pct_change(period).iloc[-1]
bear = ((r[self.SLV] < r[self.GLD]) and (r[self.XLI] < r[self.XLU]) and (r[self.DBB] < r[self.UUP]))
down2x = ((r[self.SLV] < r[self.GLD]) and (r[self.XLI] < r[self.XLU])) or \
((r[self.XLI] < r[self.XLU]) and (r[self.DBB] < r[self.UUP])) or \
((r[self.SLV] < r[self.GLD]) and (r[self.DBB] < r[self.UUP])) # <- changed
up2x = ((r[self.SLV] > r[self.GLD]) and (r[self.XLI] > r[self.XLU])) or \
((r[self.XLI] > r[self.XLU]) and (r[self.DBB] > r[self.UUP])) or \
((r[self.SLV] > r[self.GLD]) and (r[self.DBB] > r[self.UUP])) # <- changed
up3x = ((r[self.SLV] > r[self.GLD]) and (r[self.XLI] > r[self.XLU]) and (r[self.DBB] > r[self.UUP])) # <- changed
if bear:
self.bull = False
self.outday = self.count
if (self.count >= self.outday + self.wait_days):
self.bull = True
self.wt_stk = LEV if self.bull else 0
self.wt_bnd = 0 if self.bull else LEV
if bear:
self.trade_out()
if (self.bull and not self.bull_prior) or (self.bull and (self.count==self.RebalanceCount)):
self.trade_in()
if TSL != 0: # <- added
if bear: # <- added
self.tsl.setDD(maximumDrawdownPercent = self.tsl_1) # <- added
if down2x: # <- added
self.tsl.setDD(maximumDrawdownPercent = self.tsl_2) # <- added
elif up2x: # <- added
self.tsl.setDD(maximumDrawdownPercent = self.tsl_3) # <- added
elif up3x: # <- added
self.tsl.setDD(maximumDrawdownPercent = self.tsl_4) # <- added
self.bull_prior = self.bull
self.count += 1
def trade_out(self):
for sec in self.Portfolio.Keys:
if sec not in self.BONDS:
self.wt[sec] = 0
for sec in self.BONDS:
self.wt[sec] = self.wt_bnd/len(self.BONDS)
for sec, weight in self.wt.items():
if weight == 0 and self.Portfolio[sec].IsLong:
self.Liquidate(sec)
for sec, weight in self.wt.items():
if weight != 0:
self.SetHoldings(sec, weight)
def trade_in(self):
if self.symbols is None: return
stocks = self.calc_return(self.symbols).iloc[:N_MOM].index
for sec in self.Portfolio.Keys:
if sec not in stocks:
self.wt[sec] = 0
if PFO == 0: # <- added
for sec in stocks:
self.wt[sec] = self.wt_stk/N_MOM
else: # <- added
self.weights_stks = self.pfo.CalcWeights(self, symbols=stocks.values.tolist()) # <- added
for sec in stocks: # <- added
if (self.weights_stks.index==sec.Value).any(): # <- added
self.wt[sec] = self.wt_stk * self.weights_stks.loc[self.weights_stks.index==sec.Value][0] # <- added
else: # <- added
self.wt[sec] = 0 # <- added
for sec, weight in self.wt.items():
self.SetHoldings(sec, weight)
def calc_return(self, stocks):
ret = {}
for symbol in stocks:
try:
ret[symbol] = self.data[symbol].Roc.Current.Value
except:
self.Debug(str(symbol))
continue
df_ret = pd.DataFrame.from_dict(ret, orient='index')
df_ret.columns = ['return']
sort_return = df_ret.sort_values(by = ['return'], ascending = False)
return sort_return
def stop_loss(self): # <- added
if self.symbols is None: return # <- added
if TSL != 0: # <- added
self.tsl.setWTtoZeroIfDDtooHigh(self) # <- added
for sec, weight in self.wt.items(): # <- added
if weight == 0 and self.Portfolio[sec].IsLong: # <- added
self.Liquidate(sec) # <- added
self.RebalanceCount = self.count + 2 # <- added
def OnEndOfDay(self):
mkt_price = self.Securities[self.MKT].Close
self.mkt.append(mkt_price)
mkt_perf = self.InitCash * self.mkt[-1] / self.mkt[0]
self.Plot('Strategy Equity', self.MKT, mkt_perf)
account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
self.Plot('Holdings', 'leverage', round(account_leverage, 2))
self.Plot('Holdings', 'Target Leverage', LEV)
class SymbolData(object):
def __init__(self, symbol, roc, algorithm):
self.Symbol = symbol
self.Roc = RateOfChange(roc)
self.algorithm = algorithm
self.consolidator = algorithm.ResolveConsolidator(symbol, Resolution.Daily)
algorithm.RegisterIndicator(symbol, self.Roc, self.consolidator)
def Warmup(self, history):
for index, row in history.iterrows():
self.Roc.Update(index, row['close'])