# https://quantpedia.com/strategies/betting-against-beta-factor-in-country-equity-indexes/
#
# The investment universe consists of all stocks from the CRSP database. The beta for each stock is calculated with respect to the MSCI US Equity
# Index using a 1-year rolling window. Stocks are then ranked in ascending order on the basis of their estimated beta. The ranked stocks are assigned
# to one of two portfolios: low beta and high beta. Securities are weighted by the ranked betas, and portfolios are rebalanced every calendar month.
# Both portfolios are rescaled to have a beta of one at portfolio formation. The “Betting-Against-Beta” is the zero-cost zero-beta portfolio that is
# long on the low-beta portfolio and short on the high-beta portfolio. There are a lot of simple modifications (like going long on the bottom beta
# decile and short on the top beta decile), which could probably improve the strategy’s performance.
#
# QC implementation changes:
# - Top 1000 stocks by market cap are selected from QC stock universe.
from scipy import stats
import numpy as np
class BettingAgainstBetaFactorinInternationalEquities(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2000, 1, 1)
self.SetCash(100000)
# Daily price data.
self.data = {}
self.period = 12 * 21
self.leverage_cap = 5
self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
self.weight = {}
self.coarse_count = 1000
self.selection_flag = False
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.AfterMarketOpen(self.symbol), self.Selection)
def OnSecuritiesChanged(self, changes):
for security in changes.AddedSecurities:
symbol = security.Symbol
security.SetFeeModel(CustomFeeModel(self))
security.SetLeverage(10)
if symbol == self.symbol and symbol not in self.data:
self.data[symbol] = RollingWindow[float](self.period)
hist = self.History(symbol, self.period, Resolution.Daily)
if hist.empty:
continue
hist = hist.loc[symbol][:-1]
# Skip last price to prevent adding it twice - in OnData function.
for index, row in hist.iterrows():
self.data[symbol].Add(row.close)
def CoarseSelectionFunction(self, coarse):
# Update the rolling window every day.
for stock in coarse:
symbol = stock.Symbol
if symbol in self.data:
# Store daily price.
self.data[symbol].Add(stock.AdjustedPrice)
# Selection once a month.
if not self.selection_flag:
return Universe.Unchanged
selected = [x.Symbol for x in coarse if x.HasFundamentalData and x.Market == 'usa']
# Warmup price rolling windows.
for symbol in selected:
if symbol in self.data:
continue
self.data[symbol] = RollingWindow[float](self.period)
history = self.History(symbol, self.period, Resolution.Daily)
if history.empty:
self.Log(f"Not enough data for {symbol} yet")
continue
closes = history.loc[symbol].close
for time, close in closes.iteritems():
self.data[symbol].Add(close)
return [x for x in selected if self.data[x].IsReady]
def FineSelectionFunction(self, fine):
fine = [x for x in fine if x.MarketCap != 0]
if len(fine) > self.coarse_count:
sorted_by_market_cap = sorted(fine, key = lambda x: x.MarketCap, reverse=True)
top_by_market_cap = [x.Symbol for x in sorted_by_market_cap[:self.coarse_count]]
else:
top_by_market_cap = [x.Symbol for x in fine]
beta = {}
for symbol in top_by_market_cap:
market_closes = np.array([x for x in self.data[self.symbol]])
stock_closes = np.array([x for x in self.data[symbol]])
market_returns = (market_closes[:-1] - market_closes[1:]) / market_closes[1:]
stock_returns = (stock_closes[:-1] - stock_closes[1:]) / stock_closes[1:]
cov = np.cov(stock_returns, market_returns)[0][1]
market_variance = np.var(market_returns)
beta[symbol] = cov / market_variance
# Doesn't work for original strategy
# beta_, intercept, r_value, p_value, std_err = stats.linregress(market_returns, stock_returns)
# beta[symbol] = beta_
if len(beta) != 0:
# Beta diff calc.
beta_median = np.median([x[1] for x in beta.items()])
long_diff = [(x[0], abs(beta_median - x[1])) for x in beta.items() if x[1] < beta_median]
short_diff = [(x[0], abs(beta_median - x[1])) for x in beta.items() if x[1] > beta_median]
# Beta rescale.
long_portfolio_beta = np.mean([beta[x[0]] for x in long_diff])
# long_leverage = 1 / long_portfolio_beta
short_portfolio_beta = np.mean([beta[x[0]] for x in short_diff])
# short_leverage = 1 / short_portfolio_beta
# Those leverages cause MarginCall
# Cap long and short leverage.
# long_leverage = min(self.leverage_cap, long_leverage)
# long_leverage = max(-self.leverage_cap, long_leverage)
# short_leverage = min(self.leverage_cap, short_leverage)
# short_leverage = max(-self.leverage_cap, short_leverage)
total_long_diff = sum([x[1] for x in long_diff])
total_short_diff = sum([x[1] for x in short_diff])
# Beta diff weighting.
for symbol, diff in long_diff:
self.weight[symbol] = (diff / total_long_diff) # * long_leverage
for symbol, diff in short_diff:
self.weight[symbol] = - (diff / total_short_diff) # * short_leverage
return [x[0] for x in self.weight.items()]
def OnData(self, data):
# Update daily market data.
symbol_obj = self.Symbol(self.symbol)
if symbol_obj in data.Keys:
if data[symbol_obj]:
market_price = data[symbol_obj].Value
if market_price != 0:
self.data[self.symbol].Add(market_price)
if not self.selection_flag:
return
self.selection_flag = False
# Trade execution.
stocks_invested = [x.Key for x in self.Portfolio if x.Value.Invested]
for symbol in stocks_invested:
if symbol not in self.weight:
self.Liquidate(symbol)
for symbol, w in self.weight.items():
if self.Securities[symbol].IsTradable and self.Securities[symbol].Price != 0:
self.SetHoldings(symbol, w)
self.weight.clear()
def Selection(self):
self.selection_flag = True
# Custom fee model.
class CustomFeeModel(FeeModel):
def GetOrderFee(self, parameters):
fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
return OrderFee(CashAmount(fee, "USD"))