Overall Statistics
Total Orders
20731
Average Win
0.15%
Average Loss
-0.16%
Compounding Annual Return
0.306%
Drawdown
54.300%
Expectancy
0.004
Start Equity
100000
End Equity
107697.33
Net Profit
7.697%
Sharpe Ratio
-0.088
Sortino Ratio
-0.077
Probabilistic Sharpe Ratio
0.000%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
0.93
Alpha
-0.006
Beta
-0.115
Annual Standard Deviation
0.126
Annual Variance
0.016
Information Ratio
-0.244
Tracking Error
0.218
Treynor Ratio
0.097
Total Fees
$1061.53
Estimated Strategy Capacity
$34000000.00
Lowest Capacity Asset
SPLK V5VYQYDIED7P
Portfolio Turnover
2.54%
# https://quantpedia.com/strategies/residual-momentum-factor/
#
# The investment universe consists of all domestic, primary stocks listed on the New York (NYSE), American (AMEX), and NASDAQ 
# stock markets with a price higher than $1. Closed-end funds, REITs, unit trusts, ADRs, and foreign stocks are removed. The 
# 10% largest stocks in terms of market capitalization are then selected for trading.
# The residual momentum strategy is defined as a zero-investment top-minus-bottom decile portfolio based on ranking stocks 
# every month on their past 12-month residual returns, excluding the most recent month, standardized by the standard deviation
# of the residual returns over the same period. Residual returns are estimated each month for all stocks over the past 36 months
# using a regression model. The regression model is calculated every month for all eligible stocks using the Fama and French 
# three factors as independent variables. The portfolio is equally weighted and rebalanced monthly.
#
# QC implementation changes:
#   - Universe consists of 500 most liquid US stock traded on NYSE, AMEX and NASDAQ.

import numpy as np
from AlgorithmImports import *
import statsmodels.api as sm

class ResidualMomentumFactor(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        # Monthly price data.
        self.data:Dict[Symbol, RollingWindow] = {}
        self.period:int = 37

        # Warmup market monthly data.
        self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.data[self.symbol] = RollingWindow[float](self.period)
        
        history = self.History(self.symbol, self.period * 21, Resolution.Daily)
        if history.empty:
            self.Log(f"Not enough data for {self.symbol} yet.")
        else:    
            closes = history.loc[self.symbol].close
            closes_len = len(closes.keys())
            # Find monthly closes.
            for index, time_close in enumerate(closes.items()):
                # index out of bounds check.
                if index + 1 < closes_len:
                    date_month = time_close[0].date().month
                    next_date_month = closes.keys()[index + 1].month
                
                    # Found last day of month.
                    if date_month != next_date_month:
                        self.data[self.symbol].Add(time_close[1])        
        
        # Factors.
        self.size_factor_symbols:List[Symbol] = []                                   # Symbol,long_flag tuple.
        self.size_factor_vector:RollingWindow = RollingWindow[float](self.period - 1) # Monthly performance.

        self.value_factor_symbols:List[Symbol] = []
        self.value_factor_vector:RollingWindow = RollingWindow[float](self.period - 1)
        
        # Monthly residual returns for each stock.
        self.residual_return:Dict[Symbol, RollingWindow] = {}
        self.residual_momentum_period:int = 12
        
        self.long:List[Symbol] = []
        self.short:List[Symbol] = []

        self.fundamental_count:int = 500
        self.fundamental_sorting_key = lambda x: x.DollarVolume

        self.factor_quantile:int = 5
        self.quantile:int = 10
        self.leverage:int = 3
        
        self.last_month:int = -1
        self.selection_flag:bool = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.FundamentalSelectionFunction)
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0
        
    def OnSecuritiesChanged(self, changes: SecurityChanges) -> None:
        for security in changes.AddedSecurities:
            security.SetLeverage(self.leverage)
            security.SetFeeModel(CustomFeeModel())
    
    def FundamentalSelectionFunction(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self.selection_flag:
            return Universe.Unchanged

        # Update the rolling window every month.
        for stock in fundamental:
            symbol = stock.Symbol
            
            # Store monthly market price.
            if symbol == self.symbol:
                self.data[self.symbol].Add(stock.AdjustedPrice)
            else:
                # Store monthly stock price.
                if symbol in self.data:
                    self.data[symbol].Add(stock.AdjustedPrice)

        selected:List[Fundamental] = [x for x in fundamental if x.HasFundamentalData and x.Market == 'usa' and x.MarketCap != 0 and x.CompanyReference.IsREIT == 0 and \
            ((x.SecurityReference.ExchangeId == "NYS") or (x.SecurityReference.ExchangeId == "NAS") or (x.SecurityReference.ExchangeId == "ASE"))
            ]

        if len(selected) > self.fundamental_count:
            selected = [x for x in sorted(selected, key=self.fundamental_sorting_key, reverse=True)[:self.fundamental_count]]
            
        # Warmup price rolling windows.
        for stock in selected:
            symbol:Symbol = stock.Symbol
            
            if symbol in self.data: continue

            self.data[symbol] = RollingWindow[float](self.period)
            history = self.History(symbol, self.period * 21, Resolution.Daily)
            if history.empty:
                self.Log(f"Not enough data for {symbol} yet.")
                continue
            closes = history.loc[symbol].close
            
            closes_len = len(closes.keys())
            # Find monthly closes.
            for index, time_close in enumerate(closes.items()):
                # index out of bounds check.
                if index + 1 < closes_len:
                    date_month = time_close[0].date().month
                    next_date_month = closes.keys()[index + 1].month
                
                    # Found last day of month.
                    if date_month != next_date_month:
                        self.data[symbol].Add(time_close[1])
            
        selected = [x for x in selected if self.data[x.Symbol].IsReady]

        if len(selected) == 0:
            return Universe.Unchanged

        # Size factor.
        sorted_by_market_cap:List[Fundamental] = sorted(selected, key=lambda x: x.MarketCap, reverse=True)
        quantile:int = int(len(sorted_by_market_cap) / self.factor_quantile)
        size_factor_long:List[Tuple] = [ (i.Symbol, True) for i in sorted_by_market_cap[-quantile:]]
        size_factor_short:List[Tuple] = [(i.Symbol, False) for i in sorted_by_market_cap[:quantile]]
        # Calculate last month's performance.
        if len(self.size_factor_symbols) != 0:
            monthly_return:float = self.CalculateFactorPerformance(self.data, self.size_factor_symbols)
            if monthly_return != 0:
                self.size_factor_vector.Add(monthly_return)
        # Store new factor symbols.
        self.size_factor_symbols = size_factor_long + size_factor_short
                
        # Value factor.
        sorted_by_pb:List[Fundamental] = sorted(selected, key = lambda x:(x.ValuationRatios.PBRatio), reverse=False)
        quantile:int = int(len(sorted_by_pb) / self.factor_quantile)
        value_factor_long:List[Tuple] = [(i.Symbol, True) for i in sorted_by_pb[:quantile]]
        value_factor_short:List[Tuple] = [(i.Symbol, False) for i in sorted_by_pb[-quantile:]]
        # Calculate last month's performance.
        if len(self.value_factor_symbols) != 0:
            monthly_return:float = self.CalculateFactorPerformance(self.data, self.value_factor_symbols)
            if monthly_return != 0:
                self.value_factor_vector.Add(monthly_return)
        # Store new factor symbols.
        self.value_factor_symbols = value_factor_long + value_factor_short
            
        # Every factor vector is ready.
        if self.size_factor_vector.IsReady and self.value_factor_vector.IsReady:
            
            # Market factor.
            if self.symbol in self.data and self.data[self.symbol].IsReady:
                market_factor_prices:np.ndarray = np.array([x for x in self.data[self.symbol]])
                market_factor:np.ndarray = (market_factor_prices[:-1] - market_factor_prices[1:]) / market_factor_prices[1:]
            
                if len(market_factor) == (self.period - 1): 
                    # Residual return calc.
                    x:List[List[float]] = [
                        list(market_factor), 
                        list(self.size_factor_vector), 
                        list(self.value_factor_vector)
                    ]
                    
                    standardized_residual_momentum:Dict[Symbol, float] = {}
                    for stock in sorted_by_market_cap:
                        symbol:Symbol = stock.Symbol
                        monthly_prices:np.ndarray = np.array([x for x in self.data[symbol]])
                        monthly_returns:np.ndarray = (monthly_prices[:-1] - monthly_prices[1:]) / monthly_prices[1:]
                        
                        regression_model = self.MultipleLinearRegression(x, monthly_returns)
                        alpha:float = regression_model.params[0]
                        
                        if symbol not in self.residual_return:
                            self.residual_return[symbol] = RollingWindow[float](self.residual_momentum_period)
                        self.residual_return[symbol].Add(alpha)
                        
                        # Residual data for 12 months is ready.
                        if self.residual_return[symbol].IsReady:
                            residual_returns:List[float] = [x for x in self.residual_return[symbol]]
                            standardized_residual_momentum[symbol] = sum(residual_returns) / np.std(residual_returns)
        
                    sorted_by_resid_momentum:List[Symbol] = sorted(standardized_residual_momentum, key=standardized_residual_momentum.get , reverse=True)
                    quantile:int = int(len(sorted_by_resid_momentum) / self.quantile)
                    self.long = sorted_by_resid_momentum[:quantile]
                    self.short = sorted_by_resid_momentum[-quantile:]
 
        return self.long + self.short
    
    def OnData(self, data: Slice) -> None:
        if self.Time.month != self.last_month:
            self.last_month = self.Time.month
            self.selection_flag = True
            return
        
        if not self.selection_flag:
            return
        self.selection_flag = False
        
        # Trade execution.
        targets:List[PortfolioTarget] = []
        for i, portfolio in enumerate([self.long, self.short]):
            for symbol in portfolio:
                if symbol in data and data[symbol]:
                    targets.append(PortfolioTarget(symbol, ((-1) ** i) / len(portfolio)))
        
        self.SetHoldings(targets, True)

        self.long.clear()
        self.short.clear()

    def CalculateFactorPerformance(self, data, factor_symbols) -> float:
        monthly_return = 0
        if len(factor_symbols) != 0:
            for symbol, long_flag in factor_symbols:
                if symbol in data and data[symbol].Count >= 2:
                    closes = [x for x in data[symbol]]
                    if long_flag:
                        monthly_return += ((closes[0] / closes[1] - 1) / len(factor_symbols))
                    else:
                        monthly_return -= ((closes[0] / closes[1] - 1) / len(factor_symbols))

        return monthly_return

    def MultipleLinearRegression(self, x, y):
        x = np.array(x).T
        x = sm.add_constant(x)
        result = sm.OLS(endog=y, exog=x).fit()
        return result

# Custom fee model.
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))