# https://quantpedia.com/strategies/residual-momentum-factor/
#
# The investment universe consists of all domestic, primary stocks listed on the New York (NYSE), American (AMEX), and NASDAQ
# stock markets with a price higher than $1. Closed-end funds, REITs, unit trusts, ADRs, and foreign stocks are removed. The
# 10% largest stocks in terms of market capitalization are then selected for trading.
# The residual momentum strategy is defined as a zero-investment top-minus-bottom decile portfolio based on ranking stocks
# every month on their past 12-month residual returns, excluding the most recent month, standardized by the standard deviation
# of the residual returns over the same period. Residual returns are estimated each month for all stocks over the past 36 months
# using a regression model. The regression model is calculated every month for all eligible stocks using the Fama and French
# three factors as independent variables. The portfolio is equally weighted and rebalanced monthly.
import fk_tools
import numpy as np
from collections import deque
import statsmodels.api as sm
class ResidualMomentumFactor(QCAlgorithm):
    """Long/short residual-momentum strategy rebalanced once a month.

    Month-end prices are stored for every security added to the universe.
    At each monthly selection the size and value long/short factor
    portfolios are rebuilt and the return of the previous month's portfolios
    is recorded. Together with the monthly SPY return these series are the
    regressors of an OLS fit per stock; the fitted intercept is stored as
    that stock's monthly residual return. Stocks of the current fine
    selection are ranked on the sum of their stored residual returns (latest
    one excluded) divided by the standard deviation of the same values. The
    top decile is bought and the bottom decile is sold short, each leg
    equally weighted.
    """

    def Initialize(self):
        """Set up the backtest, the rolling buffers and the month-end schedule."""
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        # Number of stored month-end prices; N prices give N - 1 returns.
        self.period = 37
        self.coarse_count = 1000

        self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol

        # Factors.
        self.size_factor_symbols = []  # Symbol,long_flag tuple.
        self.size_factor_vector = deque(maxlen=self.period - 1)  # Monthly performance.
        self.value_factor_symbols = []
        self.value_factor_vector = deque(maxlen=self.period - 1)

        # Monthly price data.
        self.data = {}

        # Monthly residual returns for each stock.
        self.residual_return = {}
        self.residual_momentum_period = 12

        self.long = []
        self.short = []

        self.selection_flag = False
        self.rebalance_flag = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        self.Schedule.On(
            self.DateRules.MonthEnd(self.symbol),
            self.TimeRules.AfterMarketOpen(self.symbol),
            self.Selection,
        )

    def OnSecuritiesChanged(self, changes):
        """Configure newly added securities and create their rolling buffers."""
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            security.SetLeverage(5)
            security.SetFeeModel(fk_tools.CustomFeeModel(self))

            # Existing buffers are kept so history survives a re-addition.
            if symbol not in self.data:
                self.data[symbol] = deque(maxlen=self.period)
            if symbol not in self.residual_return:
                self.residual_return[symbol] = deque(maxlen=self.residual_momentum_period)

    def CoarseSelectionFunction(self, coarse):
        """Return the `coarse_count` most liquid US stocks priced above $1.

        The universe is left unchanged unless `Selection` has raised
        `selection_flag` since the previous selection.
        """
        if not self.selection_flag:
            return Universe.Unchanged
        self.selection_flag = False

        selected = sorted(
            [x for x in coarse if x.HasFundamentalData and x.Price > 1 and x.Market == 'usa'],
            key=lambda x: x.DollarVolume,
            reverse=True,
        )
        return [x.Symbol for x in selected[:self.coarse_count]]

    def FineSelectionFunction(self, fine):
        """Update the factor series, rank stocks and return the fine universe.

        Side effects: appends to the size/value factor vectors and to the
        per-stock residual-return buffers, overwrites `self.long` and
        `self.short` once all three factor series are full, and raises
        `rebalance_flag`.

        Returns:
            The symbols that passed the fundamental filter.
        """
        fine = [
            x for x in fine
            if x.EarningReports.BasicAverageShares.ThreeMonths > 0
            and x.EarningReports.BasicEPS.TwelveMonths > 0
            and x.ValuationRatios.PERatio > 0
            and x.ValuationRatios.PBRatio != 0
            and x.CompanyReference.IsREIT == 0
        ]
        fine_symbols = [x.Symbol for x in fine]
        # Set for the O(1) membership test inside the per-stock loop below.
        fine_symbol_set = set(fine_symbols)

        # Market factor.
        market_factor = []
        if self.symbol in self.data and len(self.data[self.symbol]) == self.data[self.symbol].maxlen:
            market_factor_prices = np.array(list(self.data[self.symbol]))
            market_factor = (market_factor_prices[1:] - market_factor_prices[:-1]) / market_factor_prices[:-1]

        # Size factor: long the smallest quintile, short the largest one.
        # Market cap is approximated as shares * (EPS * P/E).
        sorted_by_market_cap = sorted(
            fine,
            key=lambda x: (
                x.EarningReports.BasicAverageShares.ThreeMonths
                * (x.EarningReports.BasicEPS.TwelveMonths * x.ValuationRatios.PERatio)
            ),
            reverse=True,
        )
        quintile = int(len(sorted_by_market_cap) / 5)
        size_factor_long = [(i.Symbol, True) for i in self._tail(sorted_by_market_cap, quintile)]
        size_factor_short = [(i.Symbol, False) for i in sorted_by_market_cap[:quintile]]

        # Calculate last month's performance.
        if len(self.size_factor_symbols) != 0:
            monthly_return = self.CalculateFactorPerformance(self.data, self.size_factor_symbols)
            # CalculateFactorPerformance returns 0 when no symbol had data;
            # such a month is not recorded.
            if monthly_return != 0:
                self.size_factor_vector.append(monthly_return)

        # Store new factor symbols.
        self.size_factor_symbols = size_factor_long + size_factor_short

        # Value factor: long the lowest P/B quintile, short the highest one.
        sorted_by_pb = sorted(fine, key=lambda x: x.ValuationRatios.PBRatio, reverse=False)
        quintile = int(len(sorted_by_pb) / 5)
        value_factor_long = [(i.Symbol, True) for i in sorted_by_pb[:quintile]]
        value_factor_short = [(i.Symbol, False) for i in self._tail(sorted_by_pb, quintile)]

        # Calculate last month's performance.
        if len(self.value_factor_symbols) != 0:
            monthly_return = self.CalculateFactorPerformance(self.data, self.value_factor_symbols)
            if monthly_return != 0:
                self.value_factor_vector.append(monthly_return)

        # Store new factor symbols.
        self.value_factor_symbols = value_factor_long + value_factor_short

        # Every factor vector is ready.
        if len(market_factor) == (self.period - 1) and \
                len(self.size_factor_vector) == self.size_factor_vector.maxlen and \
                len(self.value_factor_vector) == self.value_factor_vector.maxlen:
            # Residual return calc.
            factors = [market_factor, list(self.size_factor_vector), list(self.value_factor_vector)]

            standardized_residual_momentum = {}
            # Calculate residual return for every stock in universe. Not just for currently selected fine selection.
            for symbol in self.residual_return:
                if symbol not in self.data or len(self.data[symbol]) != self.data[symbol].maxlen:
                    continue

                monthly_prices = np.array(list(self.data[symbol]))
                monthly_returns = (monthly_prices[1:] - monthly_prices[:-1]) / monthly_prices[:-1]
                regression_model = self.MultipleLinearRegresion(factors, monthly_returns)
                alpha = regression_model.params[0]

                # Residual data for 12 months is ready.
                if len(self.residual_return[symbol]) == self.residual_return[symbol].maxlen:
                    # The most recent stored value is excluded from the score.
                    residual_returns = list(self.residual_return[symbol])[:-1]
                    deviation = np.std(residual_returns)
                    # A zero (or NaN) deviation would put inf/NaN into the
                    # ranking and make the sort order meaningless.
                    if deviation > 0 and symbol in fine_symbol_set:
                        standardized_residual_momentum[symbol] = sum(residual_returns) / deviation

                self.residual_return[symbol].append(alpha)

            sorted_by_resid_momentum = sorted(
                standardized_residual_momentum.items(), key=lambda item: item[1], reverse=True
            )
            decile = int(len(sorted_by_resid_momentum) / 10)
            self.long = [item[0] for item in sorted_by_resid_momentum[:decile]]
            # `_tail` yields an empty short book when decile == 0, whereas a
            # plain `[-decile:]` slice would return every candidate.
            self.short = [item[0] for item in self._tail(sorted_by_resid_momentum, decile)]

        self.rebalance_flag = True

        return fine_symbols

    def OnData(self, data):
        """Execute the pending rebalance, if any, then clear the target lists."""
        if not self.rebalance_flag:
            return
        self.rebalance_flag = False

        # Trade execution.
        targets = set(self.long) | set(self.short)
        if not targets:
            self.Liquidate()
            return

        # Close positions that are no longer part of either leg.
        stocks_invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in stocks_invested:
            if symbol not in targets:
                self.Liquidate(symbol)

        for symbol in self.long:
            if self.Securities[symbol].Price != 0:  # Prevent error message.
                self.SetHoldings(symbol, 1 / len(self.long))

        for symbol in self.short:
            if self.Securities[symbol].Price != 0:  # Prevent error message.
                self.SetHoldings(symbol, -1 / len(self.short))

        self.long.clear()
        self.short.clear()

    def Selection(self):
        """Record one price per tracked symbol and request a new selection."""
        # Store monthly data for universe.
        for symbol in self.data:
            if not self.Securities.ContainsKey(symbol):
                continue

            history = self.data[symbol]
            price = self.Securities[symbol].Price
            if price != 0:
                history.append(price)
            elif len(history) > 0:
                # Append latest price as a next one in case there's 0 as price.
                history.append(history[-1])

        self.selection_flag = True

    def MultipleLinearRegresion(self, x, y):
        """Fit OLS of `y` on the regressors in `x` plus a constant.

        Args:
            x: Sequence of equally long regressor series, one per factor.
            y: Dependent series with the same length as each regressor.

        Returns:
            The fitted statsmodels results object; `params[0]` is the
            intercept because the constant is added as the first column.
        """
        x = np.array(x).T
        x = sm.add_constant(x)
        result = sm.OLS(endog=y, exog=x).fit()
        return result

    def CalculateFactorPerformance(self, data, factor_symbols):
        """Return the latest one-period return of a long/short factor portfolio.

        Args:
            data: Mapping of symbol to its stored price sequence.
            factor_symbols: Iterable of (symbol, long_flag) tuples.

        Returns:
            Sum of the signed last-period returns, each divided by the total
            number of factor symbols. Symbols with fewer than two stored
            prices contribute nothing; the result is 0 when none qualifies.
        """
        monthly_return = 0
        if len(factor_symbols) == 0:
            return monthly_return

        for symbol, long_flag in factor_symbols:
            if symbol in data and len(data[symbol]) >= 2:
                performance = fk_tools.Return(list(data[symbol])[-2:]) / len(factor_symbols)
                if long_flag:
                    monthly_return += performance
                else:
                    monthly_return -= performance

        return monthly_return

    @staticmethod
    def _tail(items, count):
        """Return the last `count` items of `items` (empty when `count` is 0)."""
        return items[len(items) - count:]
import numpy as np
from scipy.optimize import minimize
# Hard-coded list of ticker strings. Judging by the name this is a snapshot of
# S&P 100 constituents -- presumably static and possibly outdated; verify
# before relying on it.
sp100_stocks = ['AAPL','MSFT','AMZN','FB','BRKB','GOOGL','GOOG','JPM','JNJ','V','PG','XOM','UNH','BAC','MA','T','DIS','INTC','HD','VZ','MRK','PFE','CVX','KO','CMCSA','CSCO','PEP','WFC','C','BA','ADBE','WMT','CRM','MCD','MDT','BMY','ABT','NVDA','NFLX','AMGN','PM','PYPL','TMO','COST','ABBV','ACN','HON','NKE','UNP','UTX','NEE','IBM','TXN','AVGO','LLY','ORCL','LIN','SBUX','AMT','LMT','GE','MMM','DHR','QCOM','CVS','MO','LOW','FIS','AXP','BKNG','UPS','GILD','CHTR','CAT','MDLZ','GS','USB','CI','ANTM','BDX','TJX','ADP','TFC','CME','SPGI','COP','INTU','ISRG','CB','SO','D','FISV','PNC','DUK','SYK','ZTS','MS','RTN','AGN','BLK']
def MonthDiff(d1, d2):
    """Return the number of calendar months from d2 to d1.

    Only the `year` and `month` attributes are used; days are ignored.
    The result is negative when d1 falls in an earlier month than d2.
    """
    year_gap = d1.year - d2.year
    month_gap = d1.month - d2.month
    return year_gap * 12 + month_gap
def Return(values):
    """Return the simple return between the first and last element of `values`.

    Computed as (last - first) / first; intermediate elements are ignored.
    """
    first = values[0]
    last = values[-1]
    return (last - first) / first
def Volatility(values):
    """Return the standard deviation of the period-over-period simple returns.

    `values` is converted to a NumPy array; each return is
    (value[i + 1] - value[i]) / value[i] and `np.std` is applied to them.
    """
    prices = np.array(values)
    previous = prices[:-1]
    current = prices[1:]
    return np.std((current - previous) / previous)
# Custom fee model
class CustomFeeModel(FeeModel):
    """Fee model charging 0.00005 of the traded value (price * quantity) in USD."""

    def GetOrderFee(self, parameters):
        """Return the order fee computed from the security price and order size."""
        traded_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(traded_value * 0.00005, "USD"))
# Quandl free data
class QuandlFutures(PythonQuandl):
    """Quandl futures data type whose value column is named "settle"."""

    def __init__(self):
        # Column name handed to the PythonQuandl base via ValueColumnName.
        self.ValueColumnName = "settle"
# Quandl short interest data.
class QuandlFINRA_ShortVolume(PythonQuandl):
    """Quandl FINRA short-interest data type using the 'SHORTVOLUME' column."""

    def __init__(self):
        # 'TOTALVOLUME' is also accessible.
        self.ValueColumnName = 'SHORTVOLUME'
# Quantpedia data
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaFutures(PythonData):
    """Custom data type reading ';'-separated Quantpedia futures CSV files.

    Each data row is expected to start with a date in `dd.mm.YYYY` format
    followed by the settle price.
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source built from `config.Symbol.Value`."""
        return SubscriptionDataSource(
            "data.quantpedia.com/backtesting_data/futures/{0}.csv".format(config.Symbol.Value),
            SubscriptionTransportMedium.RemoteFile,
            FileFormat.Csv,
        )

    def Reader(self, config, line, date, isLiveMode):
        """Parse one CSV line into a data point.

        Returns:
            A `QuantpediaFutures` instance, or None for an empty line or a
            line that does not start with a digit (e.g. a header row).
        """
        # An empty line has no first character; test for it before indexing
        # so a blank row is skipped instead of raising IndexError.
        if not line or not line[0].isdigit():
            return None

        data = QuantpediaFutures()
        data.Symbol = config.Symbol

        split = line.split(';')

        # The timestamp is set to the day after the date found in the file.
        data.Time = datetime.strptime(split[0], "%d.%m.%Y") + timedelta(days=1)
        settle = float(split[1])
        data['settle'] = settle
        data.Value = settle

        return data
# NOTE: Manager for new trades. It is represented by a fixed number of equally weighted brackets (slots) for long and short positions.
# If a bracket is free, the new trade is opened and managed for the length of the holding period.
class TradeManager():
    """Bookkeeper for a capped number of equally weighted long and short trades.

    Every accepted trade receives the weight 1 / (long_size + short_size)
    and is tracked together with a countdown that `TryLiquidate` decrements.
    """

    def __init__(self, algorithm, long_size, short_size, holding_period):
        """Store the capacities and derive the per-trade weight.

        Args:
            algorithm: Object used to place orders and write log messages.
            long_size: Maximum number of simultaneous long trades.
            short_size: Maximum number of simultaneous short trades.
            holding_period: Initial countdown assigned to each new trade.
        """
        self.algorithm = algorithm
        self.long_size = long_size
        self.short_size = short_size
        self.holding_period = holding_period

        self.weight = 1 / (self.long_size + self.short_size)

        # Number of currently open trades on each side.
        self.long_len = 0
        self.short_len = 0

        # Currently managed trades (ManagedSymbol objects).
        self.symbols = []

    def Add(self, symbol, long_flag):
        """Open a trade for `symbol` if its side still has a free slot.

        A full side only produces a log message; nothing is traded.
        """
        managed_symbol = ManagedSymbol(symbol, self.holding_period, long_flag)

        if long_flag:
            has_room = self.long_len < self.long_size
        else:
            has_room = self.short_len < self.short_size

        if not has_room:
            self.algorithm.Log("There's not place for additional trade.")
            return

        self.symbols.append(managed_symbol)
        if long_flag:
            self.algorithm.SetHoldings(symbol, self.weight)
            self.long_len += 1
        else:
            self.algorithm.SetHoldings(symbol, - self.weight)
            self.short_len += 1

    def TryLiquidate(self):
        """Decrement every countdown and liquidate the trades that reach zero."""
        expired = []
        for entry in self.symbols:
            entry.days_to_liquidate -= 1
            if entry.days_to_liquidate != 0:
                continue

            expired.append(entry)
            self.algorithm.Liquidate(entry.symbol)
            self._release_slot(entry.long_flag)

        # Removal happens after the loop so the list is not mutated while
        # it is being iterated.
        for entry in expired:
            self.symbols.remove(entry)

    def LiquidateTicker(self, ticker):
        """Liquidate the first managed trade whose symbol value equals `ticker`.

        Writes a debug message when no managed trade matches.
        """
        entry = next((m for m in self.symbols if m.symbol.Value == ticker), None)
        if entry is None:
            self.algorithm.Debug("Ticker is not held in portfolio!")
            return

        self.algorithm.Liquidate(entry.symbol)
        self._release_slot(entry.long_flag)
        self.symbols.remove(entry)

    def _release_slot(self, long_flag):
        """Decrease the open-trade counter of the given side by one."""
        if long_flag:
            self.long_len -= 1
        else:
            self.short_len -= 1
class ManagedSymbol():
    """Record of one managed trade: symbol, remaining countdown and side."""

    def __init__(self, symbol, days_to_liquidate, long_flag):
        """Store the three constructor arguments as same-named attributes."""
        self.symbol = symbol
        self.long_flag = long_flag
        self.days_to_liquidate = days_to_liquidate
class PortfolioOptimization(object):
    """Weight optimizer that maximizes annualized return over annualized volatility.

    `df_return` must provide `.mean()` and `.cov()` (as a pandas DataFrame of
    per-asset returns does); both are annualized with a factor of 252.
    """

    def __init__(self, df_return, risk_free_rate, num_assets):
        """Store the inputs; `risk_free_rate` and `target_vol` are kept but unused here."""
        self.daily_return = df_return
        self.risk_free_rate = risk_free_rate
        self.n = num_assets  # number of risky assets in the portfolio
        self.target_vol = 0.05

    def annual_port_return(self, weights):
        """Return the annualized portfolio return for `weights`."""
        weighted_means = self.daily_return.mean() * weights
        return np.sum(weighted_means) * 252

    def annual_port_vol(self, weights):
        """Return the annualized portfolio volatility for `weights`."""
        annual_cov = self.daily_return.cov() * 252
        variance = np.dot(weights.T, np.dot(annual_cov, weights))
        return np.sqrt(variance)

    def min_func(self, weights):
        """Objective for the minimizer: negative return-to-volatility ratio."""
        # method 1: maximize sharpe ratio
        ratio = self.annual_port_return(weights) / self.annual_port_vol(weights)
        return - ratio
        # method 2: maximize the return with target volatility
        #return - self.annual_port_return(weights) / self.target_vol

    def opt_portfolio(self):
        """Run SLSQP from equal weights and return the resulting weight vector.

        Weights must sum to 1. The first two assets are bounded to [0, 1],
        every further asset to [0, 0.25].
        """
        sum_to_one = {'type': 'eq', 'fun': lambda x: np.sum(x) - 1}
        bounds = ((0, 1),) * 2 + ((0, 0.25),) * (self.n - 2)
        start = np.array(self.n * [1. / self.n])

        result = minimize(
            self.min_func,           # objective function
            start,                   # initial value
            method='SLSQP',          # optimization method
            bounds=bounds,           # bounds for variables
            constraints=sum_to_one,  # constraint conditions
        )
        return result.x