Overall Statistics
Total Trades
8575
Average Win
0.36%
Average Loss
-0.34%
Compounding Annual Return
1.009%
Drawdown
38.200%
Expectancy
0.022
Net Profit
26.195%
Sharpe Ratio
0.124
Probabilistic Sharpe Ratio
0.000%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
1.05
Alpha
0.011
Beta
-0.006
Annual Standard Deviation
0.086
Annual Variance
0.007
Information Ratio
-0.25
Tracking Error
0.184
Treynor Ratio
-1.921
Total Fees
$3590.10
Estimated Strategy Capacity
$340000000.00
Lowest Capacity Asset
EQIXD SKZ55CR4HH0L
# https://quantpedia.com/strategies/technical-indicators-predict-cross-sectional-expected-stock-returns/
# 
# The investment universe consists of all firms from the CRSP database listed on NYSE, AMEX, and NASDAQ. Firstly, exclude all firms with less than 60 monthly return observations.
# Secondly, construct 14 firm-level technical indicators based on three trend-following strategies (moving average, momentum, and volume-based indicators).
# The first strategy is based on the moving average rule, which forms the trading signals by comparing the two moving averages with different lengths.
# The second strategy is based on the momentum trading rule, which generates the trading signals by comparing the current stock price with its level n months ago.
# The third strategy is based on the “on-balance” volume rule, which generates the trading signals by evaluating the changes in stock trading volume.
# For a detailed description of the technical indicators’ construction, see section 2.2. Thirdly, each month t regress the return of each stock i on 14 technical indicators from month t-1,
# using a fixed window of the latest 60 monthly observations to estimate the return over the next month (see equations 5 and 6).
# To mitigate the overfitting problem, take the time-series average of the cross-sectional OLS estimated coefficients applying a 60-month smoothing window (see equations 7a, 7b, and 7c).
# At the end of each month, sort all stocks into value-weighted deciles based on their estimated returns in the next month.
# Buy the top decile (stocks with the highest expected returns) and sell the bottom decile (stocks with the lowest expected returns). 
# The resulting long-short portfolio is value-weighted and rebalanced monthly.
# 
# QC Implementation:
#   - Universe consists of top 500 U.S. stocks by dollar volume listed on NYSE, AMEX, and NASDAQ.

# region imports
from AlgorithmImports import *
import statsmodels.api as sm
# endregion

class TechnicalIndicatorsPredictCrossSectionalExpectedStockReturns(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)

        self.quantile:int = 10
        self.month_period:int = 21    
        self.regression_period:int = 60
        self.period:int = self.month_period * 12               
        self.long_periods:list[int] = [9 * self.month_period, 12 * self.month_period]
        self.short_periods:list[int] = [1* self.month_period, 2 * self.month_period, 3 * self.month_period]

        self.last_fine:list = []

        self.data:dict = {}
        self.weights:dict = {}

        self.symbol:Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        
        self.coarse_count:int = 500
        self.selection_flag:bool = False
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)
        
        self.Schedule.On(self.DateRules.MonthStart(self.symbol), self.TimeRules.BeforeMarketClose(self.symbol, 0), self.Selection)

    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            security.SetFeeModel(CustomFeeModel())
            security.SetLeverage(5)

    def CoarseSelectionFunction(self, coarse):
        # update stocks data on daily basis
        for stock in coarse:
            symbol:Symbol = stock.Symbol
            
            if symbol in self.data:
                self.data[symbol].update(stock.AdjustedPrice, stock.Volume)

        if not self.selection_flag:
            return Universe.Unchanged
        
        selected:list = sorted([x for x in coarse if x.HasFundamentalData and x.Market == 'usa'],
                key=lambda x: x.DollarVolume, reverse=True)[:self.coarse_count]

        # warm up stock's data
        for stock in selected:
            symbol:Symbol = stock.Symbol

            if symbol not in self.data:
                self.data[symbol] = SymbolData(symbol, self.short_periods, self.long_periods, self.period)
                history = self.History(symbol, self.period, Resolution.Daily)
                if history.empty:
                    continue
                
                closes = history.loc[symbol].close
                volumes = history.loc[symbol].volume

                for (_, close), (_, volume) in zip(closes.iteritems(), volumes.iteritems()):
                    self.data[symbol].update(close, volume)

        return [x.Symbol for x in selected if self.data[x.Symbol].is_ready()]

    def FineSelectionFunction(self, fine):
        fine = [x for x in fine if x.MarketCap != 0 and ((x.SecurityReference.ExchangeId == "NYS") or (x.SecurityReference.ExchangeId == "NAS") or (x.SecurityReference.ExchangeId == "ASE"))]
        
        pred_returns:dict = {}

        for stock in fine:
            symbol:Symbol = stock.Symbol
            symbol_obj = self.data[symbol]

            # make sure data are consecutive
            if symbol not in self.last_fine:
                symbol_obj.clear_regression_data()

            # make sure regression data are ready
            if symbol_obj.is_regression_data_ready(self.regression_period):
                regression_x, regression_y = symbol_obj.get_regression_data(self.regression_period)
                x_transpose:np.array = np.array(regression_x).T

                # skip x series with the same value throughout the whole series since there's not clear decision to make for which zeroed series should be intercept
                x_variable_skip_indices:list[int] = self.GetIndicesOfSameValues(x_transpose=x_transpose)

                # use adjusted x variable for model building and for prediction
                adjusted_x_variable:list = [x for i, x in enumerate(x_transpose) if i not in x_variable_skip_indices]
                regression_x:np.array = np.array(adjusted_x_variable).T
                regression_model = sm.OLS(endog=regression_y, exog=regression_x).fit()
                regression_params:list[float] = list(regression_model.params)

                # update this month regression data
                symbol_obj.update_returns(self.month_period)
                symbol_obj.update_technical_indicators(self.long_periods)

                if symbol_obj.is_smoothing_window_ready(self.regression_period):
                    pred_params:list = symbol_obj.get_prediction_params(self.regression_period)
                    pred_x:list = symbol_obj.get_prediction_x()

                    # predict price based on previous technical indicators
                    stock_pred_return:float = self.CalcStockPrediction(pred_params, pred_x)

                    pred_returns[stock] = stock_pred_return

                # update smoothing window
                smoothing_window_entry:list[float] = []
                for i, x_series in enumerate(x_transpose):
                    if i in x_variable_skip_indices:
                        smoothing_window_entry.append(0)
                    else:
                        smoothing_window_entry.append(regression_params.pop(0))

                symbol_obj.update_smoothing_window(smoothing_window_entry)

            else:
                # update this month regression data
                symbol_obj.update_returns(self.month_period)
                symbol_obj.update_technical_indicators(self.long_periods)

        # last_fine helps to secure data consecution
        self.last_fine = [x.Symbol for x in fine]

        # make sure there are enough stock for selection
        if len(pred_returns) < self.quantile:
            return Universe.Unchanged

        quantile = int(len(pred_returns) / self.quantile)
        sorted_by_pred_returns = [x[0] for x in sorted(pred_returns.items(), key=lambda item: item[1])]

        # buy stocks with the highest expected return
        long_part = sorted_by_pred_returns[-quantile:]

        # sell stocks with the lowest expected return
        short_part = sorted_by_pred_returns[:quantile]

        total_long_cap = sum([x.MarketCap for x in long_part])
        for stock in long_part:
            self.weights[stock.Symbol] = stock.MarketCap / total_long_cap

        total_short_cap = sum([x.MarketCap for x in short_part])
        for stock in short_part:
            self.weights[stock.Symbol] = -stock.MarketCap / total_short_cap

        return [x for x in self.weights]
        
    def OnData(self, data):
        # rebalance monthly
        if not self.selection_flag:
            return
        self.selection_flag = False
        
        # trade execution
        invested:list = [x.Key for x in self.Portfolio if x.Value.Invested]
        for symbol in invested:
            if symbol not in self.weights:
                self.Liquidate(symbol)
                
        for symbol, w in self.weights.items():
            if self.Securities[symbol].Price != 0 and self.Securities[symbol].IsTradable:
                self.SetHoldings(symbol, w)
                
        self.weights.clear()

    def GetIndicesOfSameValues(self, x_transpose:np.array) -> list:
        x_variable_skip_indices:list= []

        for i, x_series in enumerate(x_transpose):
            # don't skip intercept
            if i != 0 and all(x_series[0] == x for x in x_series):
                x_variable_skip_indices.append(i)

        return x_variable_skip_indices

    def CalcStockPrediction(self, pred_params:list, pred_x:list) -> float:
        pred_value:float = 0

        for param, x_value in zip(pred_params, pred_x):
            pred_value += param * x_value

        return pred_value

    def Selection(self):
        self.selection_flag = True


class SymbolData():
    def __init__(self, symbol:Symbol, short_periods:list, long_periods:list, period:float) -> None:
        self.short_SMA:list = []
        self.long_SMA:list = []

        self.long_volumes:list = []
        self.short_volumes:list = []

        self.technical_indicators:list = []
        self.returns:list = []

        self.smoothing_window:list = []

        self.prices:RollingWindow = RollingWindow[float](period)

        for period in short_periods:
            self.short_SMA.append(RollingWindow[float](period))
            self.short_volumes.append(RollingWindow[float](period))

        for period in long_periods:
            self.long_SMA.append(RollingWindow[float](period))
            self.long_volumes.append(RollingWindow[float](period))

    def update(self, stock_price:float, stock_volume:float) -> None:
        for short_SMA_roll_win, short_volume in zip(self.short_SMA, self.short_volumes):
            short_SMA_roll_win.Add(stock_price)
            short_volume.Add(stock_volume)

        for long_SMA_roll_win, long_volume in zip(self.long_SMA, self.long_volumes):
            long_SMA_roll_win.Add(stock_price)
            long_volume.Add(stock_volume)

        self.prices.Add(stock_price)

    def is_ready(self) -> bool:
        for short_SMA_roll_win, short_volume in zip(self.short_SMA, self.short_volumes):
            if not short_SMA_roll_win.IsReady or not short_volume.IsReady:
                return False

        for long_SMA_roll_win, long_volume in zip(self.long_SMA, self.long_volumes):
            if not long_SMA_roll_win.IsReady or not long_volume.IsReady:
                return False

        return self.prices.IsReady

    def is_regression_data_ready(self, regression_period:int) -> bool:
        return len(self.technical_indicators) >= regression_period and len(self.returns) >= regression_period

    def is_smoothing_window_ready(self, regression_period:int) -> bool:
        return len(self.smoothing_window) >= regression_period

    def clear_regression_data(self):
        self.technical_indicators.clear()
        self.smoothing_window.clear()
        self.returns.clear()

    def update_returns(self, period:int):
        # make sure between regression x and y is right shift
        if len(self.technical_indicators) > 0:
            prices:list = [x for x in self.prices][:period]
            return_value:float = (prices[0] - prices[-1]) / prices[-1]

            self.returns.append(return_value)

    def update_technical_indicators(self, periods:list) -> list:
        technical_indicators_values:list = []

        # MA and OBV technical indicators
        for long_SMA_roll_win, long_volume in zip(self.long_SMA, self.long_volumes):
            mean_long_volume:float = np.mean([x for x in long_volume])
            long_SMA_value:float = self.calc_simple_moving_average([x for x in long_SMA_roll_win])

            for short_SMA_roll_win, short_volume in zip(self.short_SMA, self.short_volumes):
                mean_short_volume:float = np.mean([x for x in short_volume])
                short_SMA_value:float = self.calc_simple_moving_average([x for x in short_SMA_roll_win])

                if long_SMA_value > short_SMA_value:
                    technical_indicators_values.append(0)
                else:
                    technical_indicators_values.append(1)

                if mean_long_volume > mean_short_volume:
                    technical_indicators_values.append(0)
                else:
                    technical_indicators_values.append(1)

        prices:list = [x for x in self.prices]
        curr_price:float = prices[0]

        # MOM technical indicators
        for period in periods:
            if curr_price >= prices[period - 1]:
                technical_indicators_values.append(1)
            else:
                technical_indicators_values.append(0)

        self.technical_indicators.append(technical_indicators_values)

    def update_smoothing_window(self, smoothing_window_entry:list):
        self.smoothing_window.append(smoothing_window_entry)

    def calc_simple_moving_average(self, prices:list) -> float:
        return sum(prices) / len(prices)

    def get_regression_data(self, regression_period:int) -> list:
        x = self.technical_indicators[-regression_period:]
        # add constant
        x = [[1] + tech_indi for tech_indi in x]
        y = self.returns[-regression_period:]

        return x, y

    def get_prediction_params(self, regression_period:int) -> list:
        window_transpose:np.array = np.array(self.smoothing_window[-regression_period:]).T
        params:list = [np.mean(params_list) for params_list in window_transpose]

        return params

    def get_prediction_x(self) -> list:
        last_indicators:list = self.technical_indicators[-1]
        return [1] + last_indicators

# Custom fee model
class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters):
        fee = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))