Overall Statistics
Total Orders
40589
Average Win
0.13%
Average Loss
-0.15%
Compounding Annual Return
-1.403%
Drawdown
74.300%
Expectancy
0.000
Start Equity
100000
End Equity
69878.61
Net Profit
-30.121%
Sharpe Ratio
-0.084
Sortino Ratio
-0.085
Probabilistic Sharpe Ratio
0.000%
Loss Rate
47%
Win Rate
53%
Profit-Loss Ratio
0.87
Alpha
0.001
Beta
-0.396
Annual Standard Deviation
0.185
Annual Variance
0.034
Information Ratio
-0.199
Tracking Error
0.283
Treynor Ratio
0.039
Total Fees
$3104.62
Estimated Strategy Capacity
$470000.00
Lowest Capacity Asset
SENEB R735QTJ8XC9X
Portfolio Turnover
7.17%
# region imports
from AlgorithmImports import *
from collections import deque
from dateutil.relativedelta import relativedelta
# endregion

class RegressionData():
    """Rolling store of monthly regressors: a CPI level plus summed Fama-French factors.

    Daily factor observations are buffered until the month is closed with
    ``update_monthly_data``; at that point they are summed, prefixed with the
    CPI level and pushed into a fixed-length window of ``period`` months.
    """

    def __init__(self, period: int) -> None:
        # Factor triples collected since the last monthly roll-over.
        self._daily_ff_data: List[Tuple[float]] = []
        # One row per month: [cpi, value, size, market]; oldest rows fall out.
        self._data: Deque = deque(maxlen=period)

    def update_ff(self, ff_data) -> None:
        """Buffer one factor observation as a (value, size, market) triple."""
        factors = (ff_data.value, ff_data.size, ff_data.market)
        self._daily_ff_data.append(factors)

    def update_monthly_data(self, cpi: float) -> None:
        """Close the month: sum the buffered factors, prepend ``cpi``, store the row."""
        buffered = np.array(self._daily_ff_data)
        monthly_factors = buffered.sum(axis=0)
        monthly_row = np.insert(monthly_factors, 0, cpi)
        self._data.append(monthly_row)
        self._daily_ff_data.clear()

    def is_ready(self) -> bool:
        """True when the monthly window is full and the daily buffer is non-empty."""
        window_full = len(self._data) == self._data.maxlen
        return window_full and len(self._daily_ff_data) != 0

    def get_regression_data(self) -> np.ndarray:
        """Return the monthly rows with the CPI column converted to first differences.

        The first row has no predecessor to difference against, so it is
        dropped; the result has one row fewer than the stored window.
        """
        table: np.ndarray = np.array(self._data)
        cpi_levels = table[:, 0]
        # Inflation enters the regression as a change, not as a level.
        table[:, 0] = np.diff(cpi_levels, prepend=0)
        return table[1:]

class SymbolData():
    """Per-symbol price history: a rolling window of monthly closes and a daily buffer.

    ``closes`` holds the last ``period`` monthly closes (newest first when
    iterated); ``_daily_closes`` holds every daily close appended since the
    last ``reset``.
    """

    def __init__(self, period: int) -> None:
        self.closes: RollingWindow = RollingWindow[float](period)
        self._daily_closes: List[float] = []

    def update_daily_closes(self, close: float) -> None:
        """Append one daily close to the buffer."""
        self._daily_closes.append(close)

    def update_monthly_closes(self, close: float) -> None:
        """Push one monthly close into the rolling window."""
        self.closes.add(close)

    def get_max_return(self) -> float:
        """Return the largest daily percentage change in the buffered closes.

        Raises:
            ValueError: if fewer than two usable closes are buffered, so no
                return can be computed.
        """
        returns: pd.Series = pd.Series(self._daily_closes).pct_change().dropna()
        # Hand back a plain float rather than a NumPy scalar so the value
        # matches the annotation.
        return float(max(returns))

    def get_monthly_returns(self) -> np.ndarray:
        """Return monthly percentage changes, oldest first (one fewer than the closes)."""
        # The window iterates newest-first; reverse it into chronological order.
        chronological: List[float] = list(self.closes)[::-1]
        returns: np.ndarray = pd.Series(chronological).pct_change().dropna().values
        return returns

    def is_ready(self) -> bool:
        """True once the monthly window is full and at least 10 daily closes are buffered."""
        return self.closes.is_ready and len(self._daily_closes) >= 10

    def reset(self) -> None:
        """Discard the buffered daily closes (the monthly window is kept)."""
        self._daily_closes.clear()

class LastDateHandler():
    """Shared registry of the newest date parsed so far for each custom-data symbol."""

    # Class-level on purpose: the custom-data readers in this module write
    # into this one dictionary, and the algorithm reads it back.
    _last_update_date: Dict[Symbol, datetime.date] = {}

    @staticmethod
    def get_last_update_date() -> Dict[Symbol, datetime.date]:
        """Return the registry itself (not a copy), keyed by symbol."""
        registry = LastDateHandler._last_update_date
        return registry

class CPI(PythonData):
    """Custom data feed for a semicolon-separated economic series (``YYYY-MM-DD;value``).

    Every observation is time-stamped one month after the date in the file
    (presumably to model the publication lag - confirm against the data
    provider), and the newest time-stamp seen is recorded in
    ``LastDateHandler`` so the algorithm can tell when the file has run out.
    """

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        """Return the remote CSV location; the file is named after the subscribed ticker."""
        return SubscriptionDataSource(f'data.quantpedia.com/backtesting_data/economic/{config.Symbol.Value}.csv', SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        """Parse one CSV line into a ``CPI`` data point.

        Returns ``None`` for lines that carry no observation: blank lines,
        the header (anything not starting with a digit), rows without a value
        column, and rows whose value is the ``'.'`` placeholder.
        """
        # Guard the index access: a blank line would otherwise raise IndexError.
        if not line or not line[0].isdigit():
            return None

        split = line.split(';')

        # '.' presumably marks a missing observation; strip so a trailing
        # carriage return does not defeat the comparison.
        if len(split) < 2 or split[1].strip() == '.':
            return None

        # Only build the data point once the row is known to be usable.
        data = CPI()
        data.Symbol = config.Symbol
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + relativedelta(months=1)
        data.Value = float(split[1])

        # Track the newest time-stamp produced for this symbol.
        last_dates = LastDateHandler._last_update_date
        previous = last_dates.setdefault(config.Symbol, datetime(1,1,1).date())
        observed = data.Time.date()
        if observed > previous:
            last_dates[config.Symbol] = observed

        return data

# Quantpedia data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaFamaFrench(PythonData):
    """Custom data feed for the Fama-French three-factor file (``DD.MM.YYYY;market;size;value``).

    Every row is time-stamped one month after the date in the file, and the
    newest time-stamp seen is recorded in ``LastDateHandler`` so the algorithm
    can tell when the file has run out.
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV location (a fixed file, independent of the ticker)."""
        # The URL has no placeholder, so no string formatting is needed.
        return SubscriptionDataSource("data.quantpedia.com/backtesting_data/equity/fama_french/fama_french_3_factor.csv", SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config, line, date, isLiveMode):
        """Parse one CSV line into a data point with ``market``, ``size`` and ``value`` fields.

        Returns ``None`` for blank lines, the header (anything not starting
        with a digit) and rows with fewer than four columns.
        """
        # Guard the index access: a blank line would otherwise raise IndexError.
        if not line or not line[0].isdigit():
            return None

        split = line.split(';')
        if len(split) < 4:
            return None

        data = QuantpediaFamaFrench()
        data.Symbol = config.Symbol
        data.Time = datetime.strptime(split[0], "%d.%m.%Y") + relativedelta(months=1)
        data['market'] = float(split[1])
        data['size'] = float(split[2])
        data['value'] = float(split[3])

        # Track the newest time-stamp produced for this symbol.
        last_dates = LastDateHandler._last_update_date
        previous = last_dates.setdefault(config.Symbol, datetime(1,1,1).date())
        observed = data.Time.date()
        if observed > previous:
            last_dates[config.Symbol] = observed

        return data
        
# https://quantpedia.com/strategies/inflation-gamble-stocks/
# 
# The investment universe for this strategy consists of common stocks listed in the CRSP database, excluding those in the financial sector and firms with negative 
# book equity. Stocks must have an end-of-month share price of at least $1.
# (Stock returns data from the Center for Research in Security Prices (CRSP). Firm-level financial information is from Compustat. Obtain data on Internet search 
# intensity for four gambling-related terms using Google Trends: search intensities for “gambling,” “lottery,” “Powerball,” and “sports betting.” State-level lottery 
# revenues are from the U.S. Census Bureau’s Annual State and Local Government Finances Survey. Professor Kenneth French’s data library contains factor returns and 
# risk-free rates. The financial data with data on consumer prices from Federal Reserve Economic Data (FRED).)
# Rationale Recap: The strategy identifies lottery-type stocks, specifically those in the top quintile of idiosyncratic volatility (IVOL) and maximum daily return 
# (MAXRET). Additionally, it targets stocks with high inflation sensitivity, measured by iBeta, calculated by regressing excess stock returns on monthly inflation 
# innovations and the three Fama-French factors over the past 60 months.
# Variables Briefing: The strategy uses several indicators and tools derived from the research paper: iBeta is used to identify stocks with high inflation sensitivity, 
# while IVOL and MAXRET classify lottery-type stocks. The methodology involves shorting stocks with high iBeta, IVOL, and MAXRET, particularly those with high retail 
# trading intensity (RTI) and a high Catholic-to-Protestant ratio (CPRATIO), in the firm’s headquarters location.
# As described in detail in Section 2.3 (Defining Lottery-Type Stocks), the selected MAXRET measure is computed as the maximum daily return over the previous month.
# Formation: Double-sorted portfolios: The portfolios are formed by independently sorting stocks into five groups (quintiles) at the end of every month, first on inflation 
# sensitivity (iBeta) and second on one of the measures of lottery-stock characteristics; we select MAXRET (maximum daily return over the previous month).
# Trading Strategy Execution: The strategy constructs a long-short Reverse High−Low portfolio, so, within the High Inflation Beta quintile:
# Long (buy) stocks with Low MAXRET, and short (sell) stocks with High MAXRET.
# Rebalancing & Weighting: The strategy is rebalanced monthly to adjust for changes in the underlying indicators and capture new opportunities as they arise. 
# Portfolios are value-weighted.
# 
# QC Implementation changes:
#   - Universe consists of 3000 largest stocks from NYSE, AMEX and NASDAQ.

# region imports
from AlgorithmImports import *
import data_tools
from dateutil.relativedelta import relativedelta
import statsmodels.api as sm
# endregion

class InflationGambleStocks(QCAlgorithm):
    """Monthly long-short MAXRET portfolio inside the highest inflation-beta group.

    Once a month every eligible stock's monthly returns are regressed on the
    monthly CPI change and the three Fama-French factors; the CPI coefficient
    is the stock's inflation beta. Within the top inflation-beta group the
    algorithm buys the lowest-MAXRET group and shorts the highest-MAXRET
    group, weighting each leg by market capitalisation.
    """

    _quantile: int = 5          # number of groups used by both sorts
    _min_share_price: int = 1   # stocks must trade above this price
    _period: int = 60           # months of history used by the regression
    _daily_period: int = 21     # daily bars requested per month of warm-up

    def initialize(self) -> None:
        """Configure dates, cash, data subscriptions, the universe and the monthly schedule."""
        self.set_start_date(2000, 1, 1)
        self.set_cash(100_000)

        self._tickers_to_ignore: List[str] = ['SGA', 'KELYB']
        self._exchange_codes: List[str] = ['NYS', 'NAS', 'ASE']

        leverage: int = 5

        # Price history per tracked symbol, and the targets for the next rebalance.
        self._data: Dict[Symbol, data_tools.SymbolData] = {}
        self._weight: Dict[Symbol, float] = {}

        self._regression_data: data_tools.RegressionData = data_tools.RegressionData(self._period)
        self.fama_french_data: Dict[Symbol, float] = {}

        market: Symbol = self.add_equity('SPY', Resolution.DAILY).symbol
        self._cpi: Symbol = self.add_data(data_tools.CPI, 'CPIAUCSL', Resolution.DAILY).symbol
        self._fama_french: Symbol = self.add_data(data_tools.QuantpediaFamaFrench, 'fama_french_3_factor', Resolution.DAILY).symbol

        self._fundamental_count: int = 3_000
        self._fundamental_sorting_key = lambda x:x.market_cap

        # Raised by the monthly schedule, consumed by universe selection and on_data.
        self._selection_flag: bool = False
        self.universe_settings.leverage = leverage
        self.universe_settings.resolution = Resolution.DAILY
        self.add_universe(self.fundamental_selection_function)
        self.settings.daily_precise_end_time = False
        self.settings.minimum_order_margin_portfolio_percentage = 0.
        self.set_security_initializer(lambda security: security.set_fee_model(CustomFeeModel()))

        self.schedule.on(self.date_rules.month_start(market),
                        self.time_rules.after_market_open(market),
                        self.selection)

    def fundamental_selection_function(self, fundamental: List[Fundamental]) -> List[Symbol]:
        """Update price buffers daily and, on rebalance days, build the long/short targets.

        Returns the symbols to hold, or ``Universe.UNCHANGED`` when it is not a
        rebalance day, a custom-data feed has run out, or too few stocks could
        be scored.
        """
        # Feed every tracked symbol daily; on rebalance days the same price
        # also closes the month.
        for stock in fundamental:
            symbol_data = self._data.get(stock.symbol)
            if symbol_data is None:
                continue
            symbol_data.update_daily_closes(stock.adjusted_price)
            if self._selection_flag:
                symbol_data.update_monthly_closes(stock.adjusted_price)

        custom_data_last_update_date: Dict[Symbol, datetime.date] = data_tools.LastDateHandler.get_last_update_date()

        # Stop selecting once either custom feed has moved past its last row.
        if any(
            self.securities[x].get_last_data() and self.time.date() > custom_data_last_update_date[x]
            for x in [self._fama_french, self._cpi]
        ):
            self.log('Custom data stopped coming.')
            return Universe.UNCHANGED

        if not self._selection_flag:
            return Universe.UNCHANGED

        selected: List[Fundamental] = [
            x for x in fundamental
            if x.has_fundamental_data
            and x.price > self._min_share_price
            and x.security_reference.exchange_id in self._exchange_codes
            and x.asset_classification.morningstar_sector_code != MorningstarSectorCode.FINANCIAL_SERVICES
            and x.symbol.value not in self._tickers_to_ignore
        ]

        if len(selected) > self._fundamental_count:
            selected = sorted(selected, key=self._fundamental_sorting_key, reverse=True)[:self._fundamental_count]

        # The regressors are the same for every stock, so build them once.
        regression_ready: bool = self._regression_data.is_ready()
        factors = self._regression_data.get_regression_data() if regression_ready else None

        beta: Dict[Fundamental, float] = {}
        maxret: Dict[Fundamental, float] = {}

        for stock in selected:
            symbol: Symbol = stock.symbol

            if symbol not in self._data:
                self._warm_up(symbol)

            symbol_data = self._data[symbol]
            if not (regression_ready and symbol_data.is_ready()):
                continue

            monthly_returns: np.ndarray = symbol_data.get_monthly_returns()
            # Gaps in the price history shorten the return series; OLS needs
            # one return per regressor row.
            if len(monthly_returns) != len(factors):
                continue

            model = self.multiple_linear_regression(factors, monthly_returns)
            # params[0] is the intercept; params[1] belongs to the first
            # regressor column, the CPI change.
            beta[stock] = model.params[1]
            maxret[stock] = symbol_data.get_max_return()

        # Start a fresh daily window for every tracked symbol, scored or not,
        # so MAXRET always covers only the days since the previous rebalance
        # and the buffers cannot grow without bound.
        for symbol_data in self._data.values():
            symbol_data.reset()

        if len(beta) < self._quantile ** 2:
            self.log('Not enough data for further calculation.')
            return Universe.UNCHANGED

        # First sort: keep the group with the highest inflation beta.
        sorted_beta: List[Fundamental] = sorted(beta, key=beta.get, reverse=True)
        quantile: int = len(sorted_beta) // self._quantile
        high_beta = set(sorted_beta[:quantile])

        # Second sort, inside that group: ascending MAXRET.
        sorted_maxret: List[Fundamental] = sorted(
            (stock for stock in maxret if stock in high_beta), key=maxret.get
        )
        quantile = len(sorted_maxret) // self._quantile
        long: List[Fundamental] = sorted_maxret[:quantile]
        short: List[Fundamental] = sorted_maxret[-quantile:]

        # Value-weight each leg; the short leg gets negative weights.
        for sign, portfolio in ((1, long), (-1, short)):
            mc_sum: float = sum(stock.market_cap for stock in portfolio)
            if mc_sum == 0:
                continue
            for stock in portfolio:
                self._weight[stock.symbol] = sign * stock.market_cap / mc_sum

        return list(self._weight.keys())

    def _warm_up(self, symbol: Symbol) -> None:
        """Create the price store for ``symbol`` and seed it from a history request."""
        self._data[symbol] = data_tools.SymbolData(self._period)
        history: DataFrame = self.history(symbol, self._period * self._daily_period, Resolution.DAILY)
        if history.empty:
            self.log(f"Not enough data for {symbol} yet.")
            return

        data: DataFrame = history.loc[symbol]
        # Daily closes from the last month seed the MAXRET window.
        last_month_data: DataFrame = data[(data.index >= self.time - relativedelta(months=1)) & (data.index <= self.time - timedelta(days=1))]
        # The first bar of each calendar month seeds the monthly window.
        monthly_data: DataFrame = data.groupby(pd.Grouper(freq='MS')).first()
        for _, row in last_month_data.iterrows():
            self._data[symbol].update_daily_closes(row.close)
        for _, row in monthly_data.iterrows():
            self._data[symbol].update_monthly_closes(row.close)

    def on_data(self, slice: Slice) -> None:
        """Accumulate the regressors and, on rebalance days, submit the portfolio targets."""
        ff_data: PythonData = self.securities[self._fama_french].get_last_data()
        cpi_data: PythonData = self.securities[self._cpi].get_last_data()

        # NOTE(review): this uses the last known observation on every call,
        # whether or not a new one arrived in this slice - confirm intended.
        if cpi_data and ff_data:
            self._regression_data.update_ff(ff_data)
            if self._selection_flag:
                self._regression_data.update_monthly_data(cpi_data.value)

        if not self._selection_flag:
            return
        self._selection_flag = False

        # Only trade symbols that have data in this slice.
        portfolio: List[PortfolioTarget] = [
            PortfolioTarget(symbol, w)
            for symbol, w in self._weight.items()
            if slice.contains_key(symbol) and slice[symbol]
        ]
        self.set_holdings(portfolio, True)
        self._weight.clear()

    def selection(self) -> None:
        """Scheduled monthly: flag the next universe selection as a rebalance."""
        self._selection_flag = True

    def multiple_linear_regression(self, x: np.ndarray, y: np.ndarray):
        """Fit OLS of ``y`` on ``x`` plus an intercept and return the fitted results."""
        x = sm.add_constant(x)
        result = sm.OLS(endog=y, exog=x).fit()
        return result

class CustomFeeModel(FeeModel):
    """Fee model charging a flat 0.005% of the traded value, in USD."""

    def GetOrderFee(self, parameters: OrderFeeParameters) -> OrderFee:
        """Return the fee for an order: price times absolute quantity times 0.00005."""
        traded_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(traded_value * 0.00005, "USD"))