Overall Statistics
Total Trades
20458
Average Win
0.12%
Average Loss
-0.12%
Compounding Annual Return
1.622%
Drawdown
34.800%
Expectancy
0.034
Net Profit
45.030%
Sharpe Ratio
0.171
Probabilistic Sharpe Ratio
0.000%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
0.98
Alpha
0.018
Beta
-0.048
Annual Standard Deviation
0.089
Annual Variance
0.008
Information Ratio
-0.222
Tracking Error
0.192
Treynor Ratio
-0.315
Total Fees
$1123.82
Estimated Strategy Capacity
$110000.00
Lowest Capacity Asset
BBDO V4VJ7HU9KG9X
# https://quantpedia.com/strategies/residual-momentum-factor/
#
# The investment universe consists of all domestic, primary stocks listed on the New York (NYSE), American (AMEX), and NASDAQ 
# stock markets with a price higher than $1. Closed-end funds, REITs, unit trusts, ADRs, and foreign stocks are removed. The 
# 10% largest stocks in terms of market capitalization are then selected for trading.
# The residual momentum strategy is defined as a zero-investment top-minus-bottom decile portfolio based on ranking stocks 
# every month on their past 12-month residual returns, excluding the most recent month, standardized by the standard deviation
# of the residual returns over the same period. Residual returns are estimated each month for all stocks over the past 36 months
# using a regression model. The regression model is calculated every month for all eligible stocks using the Fama and French 
# three factors as independent variables. The portfolio is equally weighted and rebalanced monthly.
#
# QC implementation changes:
#   - Universe consists of top 3000 US stock by market cap from NYSE, AMEX and NASDAQ.

import numpy as np
from AlgorithmImports import *
import statsmodels.api as sm

class ResidualMomentumFactor(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        self.coarse_count = 500

        # Monthly price data.
        self.data = {}
        self.period = 37

        # Warmup market monthly data.
        self.symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.data[self.symbol] = RollingWindow[float](self.period)
        
        history = self.History(self.symbol, self.period * 21, Resolution.Daily)
        if history.empty:
            self.Log(f"Not enough data for {self.symbol} yet.")
        else:    
            closes = history.loc[self.symbol].close
            closes_len = len(closes.keys())
            # Find monthly closes.
            for index, time_close in enumerate(closes.iteritems()):
                # index out of bounds check.
                if index + 1 < closes_len:
                    date_month = time_close[0].date().month
                    next_date_month = closes.keys()[index + 1].month
                
                    # Found last day of month.
                    if date_month != next_date_month:
                        self.data[self.symbol].Add(time_close[1])        
        
        # Factors.
        self.size_factor_symbols = []                                   # Symbol,long_flag tuple.
        self.size_factor_vector = RollingWindow[float](self.period - 1) # Monthly performance.

        self.value_factor_symbols = []
        self.value_factor_vector = RollingWindow[float](self.period - 1)
        
        # Monthly residual returns for each stock.
        self.residual_return = {}
        self.residual_momentum_period = 12
        
        self.long = []
        self.short = []
        
        self.last_month = -1
        self.selection_flag = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        
    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            security.SetLeverage(10)
            security.SetFeeModel(CustomFeeModel())
    
    def CoarseSelectionFunction(self, coarse):
        if not self.selection_flag:
            return Universe.Unchanged

        # Update the rolling window every month.
        for stock in coarse:
            symbol = stock.Symbol
            
            # Store monthly market price.
            if symbol == self.symbol:
                self.data[self.symbol].Add(stock.AdjustedPrice)
            else:
                # Store monthly stock price.
                if symbol in self.data:
                    self.data[symbol].Add(stock.AdjustedPrice)

        selected = [x.Symbol for x in coarse if x.HasFundamentalData and x.Market == 'usa']
        # selected = [x.Symbol
        #     for x in sorted([x for x in coarse if x.HasFundamentalData and x.Market == 'usa'],
        #         key = lambda x: x.DollarVolume, reverse = True)[:self.coarse_count]]
        
        # Warmup price rolling windows.
        for symbol in selected:
            if symbol in self.data:
                continue
            
            self.data[symbol] = RollingWindow[float](self.period)
            history = self.History(symbol, self.period * 21, Resolution.Daily)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet.")
                continue
            closes = history.loc[symbol].close
            
            closes_len = len(closes.keys())
            # Find monthly closes.
            for index, time_close in enumerate(closes.iteritems()):
                # index out of bounds check.
                if index + 1 < closes_len:
                    date_month = time_close[0].date().month
                    next_date_month = closes.keys()[index + 1].month
                
                    # Found last day of month.
                    if date_month != next_date_month:
                        self.data[symbol].Add(time_close[1])
            
        return [x for x in selected if self.data[x].IsReady]        

    def FineSelectionFunction(self, fine):
        fine = [x for x in fine if x.MarketCap != 0 and x.CompanyReference.IsREIT == 0 and      \
                ((x.SecurityReference.ExchangeId == "NYS") or (x.SecurityReference.ExchangeId == "NAS") or (x.SecurityReference.ExchangeId == "ASE"))]
                
        if len(fine) > self.coarse_count:
            sorted_by_market_cap = sorted(fine, key = lambda x: x.MarketCap, reverse=True)
            top_by_market_cap = sorted_by_market_cap[:self.coarse_count]
        else:
            top_by_market_cap = fine

        # Size factor.
        # sorted_by_market_cap = sorted(top_by_market_cap, key = lambda x:(x.MarketCap), reverse=False)
        quintile = int(len(top_by_market_cap) / 5)
        size_factor_long = [ (i.Symbol,True) for i in top_by_market_cap[-quintile:]]
        size_factor_short = [(i.Symbol,False) for i in top_by_market_cap[:quintile]]
        # Calculate last month's performance.
        if len(self.size_factor_symbols) != 0:
            monthly_return = self.CalculateFactorPerformance(self.data, self.size_factor_symbols)
            if monthly_return != 0:
                self.size_factor_vector.Add(monthly_return)
        # Store new factor symbols.
        self.size_factor_symbols = size_factor_long + size_factor_short
                
        # Value factor.
        sorted_by_pb = sorted(top_by_market_cap, key = lambda x:(x.ValuationRatios.PBRatio), reverse=False)
        quintile = int(len(sorted_by_pb) / 5)
        value_factor_long = [(i.Symbol,True) for i in sorted_by_pb[:quintile]]
        value_factor_short = [(i.Symbol,False) for i in sorted_by_pb[-quintile:]]
        # Calculate last month's performance.
        if len(self.value_factor_symbols) != 0:
            monthly_return = self.CalculateFactorPerformance(self.data, self.value_factor_symbols)
            if monthly_return != 0:
                self.value_factor_vector.Add(monthly_return)
        # Store new factor symbols.
        self.value_factor_symbols = value_factor_long + value_factor_short
            
        # Every factor vector is ready.
        if self.size_factor_vector.IsReady and self.value_factor_vector.IsReady:
            
            # Market factor.
            market_factor = []
            if self.symbol in self.data and self.data[self.symbol].IsReady:
                market_factor_prices = np.array([x for x in self.data[self.symbol]])
                market_factor = (market_factor_prices[:-1] - market_factor_prices[1:]) / market_factor_prices[1:]
            
                if len(market_factor) == (self.period - 1): 
                    # Residual return calc.
                    x = [
                        [x for x in market_factor], 
                        [x for x in self.size_factor_vector], 
                        [x for x in self.value_factor_vector]
                    ]
                    
                    standardized_residual_momentum = {}
                    for stock in top_by_market_cap:
                        symbol = stock.Symbol
                        if symbol in self.data and self.data[symbol].IsReady:
                            monthly_prices = np.array([x for x in self.data[symbol]])
                            monthly_returns = (monthly_prices[:-1] - monthly_prices[1:]) / monthly_prices[1:]
                            
                            regression_model = self.MultipleLinearRegression(x, monthly_returns)
                            alpha = regression_model.params[0]
                            
                            if symbol not in self.residual_return:
                                self.residual_return[symbol] = RollingWindow[float](self.residual_momentum_period)
                            self.residual_return[symbol].Add(alpha)
                            
                            # Residual data for 12 months is ready.
                            if self.residual_return[symbol].IsReady:
                                residual_returns = [x for x in self.residual_return[symbol]]
                                standardized_residual_momentum[symbol] = sum(residual_returns) / np.std(residual_returns)
        
                    sorted_by_resid_momentum = sorted(standardized_residual_momentum.items(), key = lambda x: x[1], reverse=True)
                    decile = int(len(sorted_by_resid_momentum) / 10)
                    self.long = [x[0] for x in sorted_by_resid_momentum[:decile]]
                    self.short = [x[0] for x in sorted_by_resid_momentum[-decile:]]
 
        return self.long + self.short
    
    def OnData(self, data):
        if self.Time.month != self.last_month:
            self.last_month = self.Time.month
            self.selection_flag = True
            return
        
        if not self.selection_flag:
            return
        self.selection_flag = False
        
        # Trade execution.
        stocks_invested = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in stocks_invested:
            if symbol not in self.long + self.short:
                self.Liquidate(symbol)

        for symbol in self.long:
            self.SetHoldings(symbol, 1 / len(self.long))
        for symbol in self.short:
            self.SetHoldings(symbol, -1 / len(self.short))
        
        self.long.clear()
        self.short.clear()

    def CalculateFactorPerformance(self, data, factor_symbols):
        monthly_return = 0
        if len(factor_symbols) != 0:
            for symbol, long_flag in factor_symbols:
                if symbol in data and data[symbol].Count >= 2:
                    closes = [x for x in data[symbol]]
                    if long_flag:
                        monthly_return += ((closes[0] / closes[1] - 1) / len(factor_symbols))
                    else:
                        monthly_return -= ((closes[0] / closes[1] - 1) / len(factor_symbols))

        return monthly_return

    def MultipleLinearRegression(self, x, y):
        x = np.array(x).T
        x = sm.add_constant(x)
        result = sm.OLS(endog=y, exog=x).fit()
        return result

# Custom fee model.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))