Overall Statistics
Total Orders
3560
Average Win
0.60%
Average Loss
-0.64%
Compounding Annual Return
-1.721%
Drawdown
67.800%
Expectancy
-0.018
Start Equity
100000
End Equity
64379.21
Net Profit
-35.621%
Sharpe Ratio
-0.231
Sortino Ratio
-0.263
Probabilistic Sharpe Ratio
0.000%
Loss Rate
49%
Win Rate
51%
Profit-Loss Ratio
0.94
Alpha
-0.027
Beta
-0.02
Annual Standard Deviation
0.12
Annual Variance
0.014
Information Ratio
-0.337
Tracking Error
0.202
Treynor Ratio
1.35
Total Fees
$447.97
Estimated Strategy Capacity
$250000000.00
Lowest Capacity Asset
COTY VHDII80EJ8MD
Portfolio Turnover
1.47%
# region imports
from AlgorithmImports import *
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
# endregion

class SymbolData():
    def __init__(self, period: int) -> None:
        self._closes: RollingWindow = RollingWindow[float](period)

    def update_data(self, price: int) -> None:
        self._closes.add(price)

    def get_returns(self) -> np.ndarray:
        returns: np.ndarray = pd.Series(list(self._closes)[::-1]).pct_change().dropna().values
        return returns
        
    def get_last_return(self) -> np.ndarray:
        performance: np.ndarray = pd.Series(list(self._closes)[::-1][2:]).pct_change().dropna().values[0]
        return performance

    def is_ready(self) -> bool:
        return self._closes.is_ready

def multiple_linear_regression(x: np.ndarray, y: np.ndarray):
    x: np.ndarray = np.array(x).T
    x = sm.add_constant(x)
    result = sm.OLS(endog=y, exog=x).fit()
    return result

# Import headquarters data.
# Source: https://en.wikipedia.org/wiki/List_of_S%26P_500_companies
def headquarters_tickers(algo: QCAlgorithm) -> Dict[str, List[str]]:
    headquarters_tickers: Dict[str, List[str]] = {}
    tickers: Set[str] = set()

    csv_string_file: str = algo.Download('data.quantpedia.com/backtesting_data/economic/sp_headquarters.csv')
    lines: List[str] = csv_string_file.split('\r\n')
    for line in lines[1:]:
        line_split: List[str] = line.split(';')
        location: str = line_split[1]
        ticker: str = line_split[0]
        
        if location not in headquarters_tickers:
            headquarters_tickers[location] = []
        headquarters_tickers[location].append(ticker)
        
        tickers.add(ticker)

    return headquarters_tickers, tickers
# https://quantpedia.com/strategies/downside-risk-premium-in-us-stocks/
# 
# The investment universe for this strategy consists of common stocks listed on NYSE, Amex, and Nasdaq, as specified in the research paper. (Stocks are further 
# filtered to exclude those with prices below five dollars, market capitalization below the 20th percentile of the NYSE breakpoint, and financial firms with SIC 
# codes from 6000 to 6999.)
# (Stock returns are from the Center for Research in Security Prices (CRSP). Accounting
# variables are from Compustat. Data on the firm's location is based on its headquarters, specifically the zip code of the firm's headquarters from the Compustat 
# database. Institutional holdings data and the Owner Information data from the Thomson Reuters 13F database.)
# Data Preparation: The selection criteria focus on stocks headquartered in Metropolitan Statistical Areas (MSAs) with at least 15 stocks to ensure sufficient data 
# for local market index construction. The primary selection criterion is the local downside beta (Local B-), where stocks with high local downside beta are 
# preferred, indicating higher sensitivity to adverse local market movements. The local market index is constructed by calculating the equal-weighted average returns 
# of stocks within the same MSA.
# Calculation: Compute the regression (using past 60-month returns against the local market index) pg. 11 with Reti,t, the monthly return of local market j, as the 
# dependent variable, and independent variables, Local Bi,t- is the local downside beta of stock i, estimated as the return sensitivity to the local market index 
# when its return is negative, and Local Bi,t+ is the local upside beta, calculated as the return sensitivity to the local market index if its return is positive.
# Sorting: At the end of each month, stocks are sorted into 10 (decile) portfolios based on their relative local B -, defined as local B - minus local B +.
# Strategy Execution: A long-short portfolio is formed by buying the highest decile and shorting the lowest decile portfolio.
# Weighting & Rebalancing: All portfolios are value-weighted. The portfolios are then rebalanced quarterly.
# 
# QC implementation changes:
#   - Instead of all stocks, we select current s&p 500 stocks (https://en.wikipedia.org/wiki/List_of_S%26P_500_companies) so headquarters data is available.

# region imports
from AlgorithmImports import *
import data_tools
from typing import List, Dict
from pandas.core.frame import DataFrame
from numpy import isnan
# endregion

class DownsideRiskPremiumInUSStocks(QCAlgorithm):

    _quantile: int = 10
    _period: int = 61
    _daily_period: int = 21
    _min_count: int = 3

    def initialize(self) -> None:
        self.set_start_date(2000, 1, 1)
        self.set_cash(100_000)

        leverage: int = 5

        self._data: Dict[Symbol, float] = {}
        self._local_area_index: Dict[str, RollingWindow] = {}
        self._weight: Dict[Symbol, float] = {}

        self._market: Symbol = self.add_equity('SPY', Resolution.DAILY).symbol

        self._headquarters_tickers, self._tickers = data_tools.headquarters_tickers(self)

        self._selection_flag: bool = False
        self._rebalance_flag: bool = False
        self.universe_settings.leverage = leverage
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self.fundamental_selection_function)
        self.settings.daily_precise_end_time = False
        self.settings.minimum_order_margin_portfolio_percentage = 0.
        self.set_security_initializer(lambda security: security.set_fee_model(CustomFeeModel()))

        self.schedule.on(self.date_rules.month_end(self._market),
                        self.time_rules.after_market_open(self._market),
                        self._selection)

    def fundamental_selection_function(self, fundamental: List[Fundamental]) -> List[Symbol]:
        if not self._selection_flag:
            return Universe.UNCHANGED
        self._selection_flag = False

        # Update the data every month.
        for stock in fundamental:
            symbol: Symbol = stock.symbol

            if symbol in self._data:
                self._data[symbol].update_data(stock.adjusted_price)

        selected: List[Fundamental] = [
            x for x in fundamental 
            if x.has_fundamental_data
            and x.symbol.value in self._tickers
        ]
        
        # price warmup
        for stock in [self._market] + selected:
            symbol: Symbol = stock.symbol if stock != self._market else stock

            if symbol not in self._data:
                self._data[symbol] = data_tools.SymbolData(self._period)
                history: DataFrame = self.history(symbol, self._period * self._daily_period, Resolution.DAILY)
                if history.empty:
                    self.log(f"Not enough data for {symbol} yet.")
                    continue
                data: DataFrame = history.loc[symbol]
                monthly_data: DataFrame = data.groupby(pd.Grouper(freq='MS')).first()
                for time, row in monthly_data.iterrows():
                    self._data[symbol].update_data(row.close)

        relative_local_betas: Dict[Symbol, float] = {}

        for local_area, tickers in self._headquarters_tickers.items():
            local_symbols: List[Symbol] = [
                stock for stock in selected 
                if stock.symbol.value in tickers
                and self._data[stock.symbol].is_ready()
            ]
            if len(local_symbols) < self._min_count:
                continue

            current_local_area_index: float = np.mean([self._data[stock.symbol].get_last_return() for stock in local_symbols])
            if local_area not in self._local_area_index:
                self._local_area_index[local_area] = RollingWindow[float](self._period - 1)
                local_area_index_returns: np.ndarray = np.mean([self._data[stock.symbol].get_returns() for stock in local_symbols], axis=0)
                for index in local_area_index_returns:
                    self._local_area_index[local_area].add(index)
            else:
                if not isnan(current_local_area_index):
                    self._local_area_index[local_area].add(current_local_area_index)

            if not self._local_area_index[local_area].is_ready or not self._data[self._market].is_ready():
                continue

            for stock in local_symbols:
                symbol: Symbol = stock.symbol
                local_area_index: Series = pd.Series(list(self._local_area_index[local_area])[::-1])
                y: np.ndarray = self._data[symbol].get_returns()
                x: np.ndarray = np.vstack((
                    local_area_index.where(local_area_index < 0, 0), 
                    local_area_index.where(local_area_index > 0, 0), 
                    self._data[self._market].get_returns()
                ))

                model = data_tools.multiple_linear_regression(x, y)

                relative_local_betas[stock] = model.params[1] - model.params[2]

        if len(relative_local_betas) < self._quantile:
            self.log('Not enough data for further calculation.')
            return Universe.UNCHANGED

        # Sort by betas.
        sorted_relative_local_betas: List[Fundamental] = sorted(relative_local_betas, key=relative_local_betas.get, reverse=True)
        quantile: int = len(sorted_relative_local_betas) // self._quantile
        long: List[Fundamental] = sorted_relative_local_betas[:quantile]
        short: List[Fundamental] = sorted_relative_local_betas[-quantile:]

        if self._rebalance_flag:
            # Weighting portfolio.
            for i, portfolio in enumerate([long, short]):
                mc_sum: float = sum(map(lambda x: x.market_cap, portfolio))
                if mc_sum == 0:
                    continue
                for stock in portfolio:
                    self._weight[stock.symbol] = ((-1)**i) * stock.market_cap / mc_sum

        return list(self._weight.keys())

    def on_data(self, slice: Slice) -> None:
        # Quarterly rebalance.
        if not self._rebalance_flag:
            return
        self._rebalance_flag = False

        # Order execution.
        portfolio: List[PortfolioTarget] = [PortfolioTarget(symbol, w) for symbol, w in self._weight.items() if slice.contains_key(symbol) and slice[symbol]]
        self.set_holdings(portfolio, True)
        self._weight.clear()

    def _selection(self) -> None:
        self._selection_flag = True
        if self.time.month % 3 == 0: 
            self._rebalance_flag = True

class CustomFeeModel(FeeModel):
    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        fee: float = parameters.Security.Price * parameters.Order.AbsoluteQuantity * 0.00005
        return OrderFee(CashAmount(fee, "USD"))