Overall Statistics
Total Orders
855
Average Win
1.60%
Average Loss
-1.49%
Compounding Annual Return
2.915%
Drawdown
42.900%
Expectancy
0.074
Start Equity
100000
End Equity
138575.71
Net Profit
38.576%
Sharpe Ratio
0.066
Sortino Ratio
0.065
Probabilistic Sharpe Ratio
0.066%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
1.08
Alpha
-0.018
Beta
0.382
Annual Standard Deviation
0.151
Annual Variance
0.023
Information Ratio
-0.376
Tracking Error
0.166
Treynor Ratio
0.026
Total Fees
$2778.97
Estimated Strategy Capacity
$0
Lowest Capacity Asset
FAMA_FRENCH_5_SIZE_EQ.FFFactorsEQ 2S
Portfolio Turnover
11.04%
#region imports
from AlgorithmImports import *
from dateutil.relativedelta import relativedelta
from pandas.core.frame import DataFrame

#endregion

class SymbolData():
    """Computes the regime predictors and stores the z-score / factor-return table.

    The class pulls monthly history through the owning algorithm, derives the
    rolling correlation between the two correlation symbols, turns every
    predictor into a change over `year_period` rows, and appends one row per
    update (summed capped z-score plus factor returns) to `_z_score_factor_df`.
    """

    def __init__(
        self, 
        algo: QCAlgorithm,
        year_period: int,
        correlation_period: int,
        z_score_period: int,
        cap: int,
        count: int,
        predictor_symbols: List[Symbol],
        correlation_symbols: List[Symbol],
        custom_data_symbols: List[Symbol]
    ) -> None:
        """Store the configuration.

        Args:
            algo: Algorithm used for history requests and the current time.
            year_period: Row lag used for the predictor change and minimum
                number of stored z-score rows.
            correlation_period: Rolling window of the correlation.
            z_score_period: Minimum number of change rows needed for a z-score.
            cap: Absolute bound applied to every individual z-score.
            count: Expected number of predictor columns.
            predictor_symbols: Symbols of the price-based predictors.
            correlation_symbols: The two symbols whose return correlation is used.
            custom_data_symbols: Symbols of the custom (economic) data series.
        """
        self._algo: QCAlgorithm = algo
        self._year_period: int = year_period
        self._correlation_period: int = correlation_period
        self._z_score_period: int = z_score_period
        self._cap: int = cap
        self._count: int = count
        self._predictor_symbols: List[Symbol] = predictor_symbols
        self._correlation_symbols: List[Symbol] = correlation_symbols
        self._custom_data_symbols: List[Symbol] = custom_data_symbols

        # One row per update: 'z-score' column followed by the factor returns.
        self._z_score_factor_df: DataFrame = pd.DataFrame()

    def _get_predictors_history(self, symbols: List[Symbol]) -> DataFrame:
        """Return history for `symbols`, reduced to the first value of each month.

        The lookback spans correlation_period + z_score_period + year_period months.
        """
        lookback_months: int = self._correlation_period + self._z_score_period + self._year_period
        history: DataFrame = self._algo.history(
            symbols,
            start=self._algo.time - relativedelta(months=lookback_months),
            end=self._algo.time
        ).unstack(level=0).groupby(pd.Grouper(freq='MS')).first()

        return history

    def _is_ready(self, history: DataFrame) -> bool:
        """Tell whether the correlation symbols have enough complete monthly rows."""
        required_rows: int = self._correlation_period + self._z_score_period + self._year_period
        return len(history[self._correlation_symbols].dropna()) >= required_rows

    def _get_correlation(self, history: DataFrame) -> pd.Series:
        """Return the rolling correlation between the two correlation symbols.

        The correlation is computed on percentage changes of `history` over a
        window of `correlation_period` rows; the result is the column of the
        second correlation symbol taken from the first symbol's rows.
        """
        first_symbol, second_symbol = self._correlation_symbols[0], self._correlation_symbols[1]
        correlation: pd.Series = (
            history[self._correlation_symbols]
            .pct_change()
            .dropna()
            .rolling(window=self._correlation_period)
            .corr()
            .dropna()
            .xs(first_symbol, level=1)[second_symbol]
        )

        return correlation

    def _get_change(self, history: DataFrame, history_custom_data: DataFrame) -> DataFrame:
        """Return the `year_period`-row change of every predictor.

        The second correlation symbol's price column is dropped and replaced by
        the rolling correlation series; custom data columns are appended.
        Rows containing any missing value are removed.
        """
        df_combined: DataFrame = pd.concat([
            history.drop([self._correlation_symbols[1]], axis=1), 
            self._get_correlation(history), 
            history_custom_data
        ], axis=1, join='outer')

        df_change: DataFrame = df_combined - df_combined.shift(self._year_period)
        return df_change.dropna()

    def _correlation_is_ready(self, df: DataFrame) -> bool:
        """Tell whether `df` can be used for the z-score calculation.

        Both conditions must hold: every expected predictor column is present
        and there are at least `z_score_period` rows. Accepting either one alone
        would let a frame with too few rows (or a missing predictor) through.
        """
        has_all_predictors: bool = len(df.columns) == self._count
        has_enough_rows: bool = len(df) >= self._z_score_period
        return has_all_predictors and has_enough_rows

    def _update_z_score(self, df: DataFrame, ff_symbols: List[Symbol]) -> None:
        """Append the current summed z-score and the factor returns to the table.

        Args:
            df: Predictor changes; the last row is scored against the whole frame.
            ff_symbols: Factor symbols whose return over the last month is stored.
        """
        # Per-column z-score of the latest row, each clipped to [-cap, cap], then summed.
        latest_z_scores = ((df.iloc[-1] - df.mean()) / df.std()).values
        z_score: float = sum(np.clip(latest_z_scores, -self._cap, self._cap))
        
        # Factor returns between the first and last observation of the past month.
        factor_history: DataFrame = self._algo.history(
            ff_symbols,
            start=self._algo.time - relativedelta(months=1),
            end=self._algo.time
        ).unstack(level=0)
        returns = factor_history.value.iloc[-1] / factor_history.value.iloc[0] - 1

        # Build a one-row frame indexed by today's date: 'z-score' + one column per factor.
        z_score_row: DataFrame = pd.DataFrame(
            {'z-score': z_score}, index=[self._algo.time.date()]
        ).reset_index()
        df_combined = pd.concat(
            [z_score_row, pd.DataFrame([returns])], axis=1
        ).set_index('index')
        self._z_score_factor_df = pd.concat([self._z_score_factor_df, df_combined])

    def _df_is_ready(self) -> bool:
        """Tell whether at least `year_period` rows have been collected."""
        return len(self._z_score_factor_df) >= self._year_period

class LastDateHandler():
    """Shared registry of the latest data date seen for each custom-data symbol."""

    # Class-level mapping, shared by every reader: symbol -> latest parsed date.
    _last_update_date: Dict[Symbol, datetime.date] = {}

    @staticmethod
    def get_last_update_date() -> Dict[Symbol, datetime.date]:
        """Return the shared mapping itself (not a copy)."""
        registry = LastDateHandler._last_update_date
        return registry

# Source: https://mba.tuck.dartmouth.edu/pages/faculty/ken.french/data_library.html
class FFFactorsEQ(PythonData):
    """Custom data type reading a Fama-French factor series from a remote CSV.

    Rows are expected as `dd.mm.YYYY;value`. Every parsed row also updates the
    per-symbol last-date registry kept in `LastDateHandler`.
    """

    def GetSource(self, config, date, isLiveMode):
        """Return the remote CSV source; the file name is the lower-cased ticker."""
        return SubscriptionDataSource(f'data.quantpedia.com/backtesting_data/equity/fama_french/{config.Symbol.Value.lower()}.csv', SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    # NOTE: not written anywhere in this class; Reader records dates in LastDateHandler.
    _last_update_date:datetime.date = datetime(1,1,1).date()

    @staticmethod
    def get_last_update_date() -> datetime.date:
        """Return the class-level `_last_update_date` value."""
        return FFFactorsEQ._last_update_date

    def Reader(self, config, line, date, isLiveMode):
        """Parse one CSV line into a data point, or return None to skip it.

        Skipped lines: empty lines, lines not starting with a digit (e.g. a
        header) and lines without a value column.
        """
        if not line or not line[0].isdigit():
            return None
        split = line.split(';')
        if len(split) < 2:
            return None

        data = FFFactorsEQ()
        data.Symbol = config.Symbol

        # Parse the CSV file's columns into the custom data class.
        # The timestamp is the file date shifted forward by one day.
        data.Time = datetime.strptime(split[0], "%d.%m.%Y") + timedelta(days=1)
        data.Value = float(split[1])

        # Remember the latest date seen for this symbol.
        last_seen = LastDateHandler._last_update_date
        current_date = data.Time.date()
        if current_date > last_seen.get(config.Symbol, datetime(1,1,1).date()):
            last_seen[config.Symbol] = current_date
        
        return data

# Source: https://fred.stlouisfed.org/series/T10Y3M
class DailyCustomData(PythonData):
    """Custom data type reading a daily economic series from a remote CSV.

    Rows are expected as `YYYY-mm-dd;value`. Every parsed row also updates the
    per-symbol last-date registry kept in `LastDateHandler`.
    """

    # NOTE: not written anywhere in this class; Reader records dates in LastDateHandler.
    _last_update_date:Dict[Symbol, datetime.date] = {}

    @staticmethod
    def get_last_update_date() -> Dict[Symbol, datetime.date]:
        """Return the class-level `_last_update_date` mapping."""
        return DailyCustomData._last_update_date

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        """Return the remote CSV source; the file name is the ticker."""
        return SubscriptionDataSource(f'data.quantpedia.com/backtesting_data/economic/{config.Symbol.Value}.csv', SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        """Parse one CSV line into a data point, or return None to skip it.

        Skipped lines: empty lines, lines not starting with a digit (e.g. a
        header) and lines without a value column.
        """
        if not line or not line[0].isdigit():
            return None
        split = line.split(';')
        if len(split) < 2:
            return None

        data = DailyCustomData()
        data.Symbol = config.Symbol

        # Parse the CSV file's columns into the custom data class.
        # The timestamp is the file date shifted forward by one day.
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + timedelta(days=1)
        # NOTE(review): a blank value leaves Value unset but the point is still
        # returned — confirm downstream code is fine with such points.
        if split[1] != '':
            data.Value = float(split[1])

        # Remember the latest date seen for this symbol.
        last_seen = LastDateHandler._last_update_date
        current_date = data.Time.date()
        if current_date > last_seen.get(config.Symbol, datetime(1,1,1).date()):
            last_seen[config.Symbol] = current_date

        return data

# Source: https://fred.stlouisfed.org/series/GS3M
class MonthlyCustomData(PythonData):
    """Custom data type reading a monthly economic series from a remote CSV.

    Rows are expected as `YYYY-mm-dd;value`. Every parsed row also updates the
    per-symbol last-date registry kept in `LastDateHandler`.
    """

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        """Return the remote CSV source; the file name is the ticker."""
        return SubscriptionDataSource(f'data.quantpedia.com/backtesting_data/economic/{config.Symbol.Value}.csv', SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        """Parse one CSV line into a data point, or return None to skip it.

        Skipped lines: empty lines, lines not starting with a digit (e.g. a
        header) and lines without a value column.
        """
        if not line or not line[0].isdigit():
            return None
        split: List[str] = line.split(';')
        if len(split) < 2:
            return None

        data = MonthlyCustomData()
        data.Symbol = config.Symbol
        
        # The timestamp is the file date shifted forward by one month.
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + relativedelta(months=1)
        # NOTE(review): a blank value leaves Value unset but the point is still
        # returned — confirm downstream code is fine with such points.
        if split[1] != '':
            data.Value = float(split[1])

        # Remember the latest date seen for this symbol.
        last_seen = LastDateHandler._last_update_date
        current_date = data.Time.date()
        if current_date > last_seen.get(config.Symbol, datetime(1,1,1).date()):
            last_seen[config.Symbol] = current_date

        return data

# Custom fee model.
class CustomFeeModel(FeeModel):
    """Fee model charging a fixed fraction (0.00005) of the order's value in USD."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order described by `parameters`."""
        fee_rate = 0.00005
        # Order value: security price times the absolute order quantity.
        order_value = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        return OrderFee(CashAmount(order_value * fee_rate, "USD"))
# https://quantpedia.com/strategies/switching-regimes-factor-strategy/
# 
# The investment universe for this strategy consists of six long-short stock factors, using the Fama-French five research factors (Market, Size, Value, Profitability, 
# and Investment) plus the 12-month Momentum factor. (As outlined in the research paper, these factors are selected based on their historical performance and relevance 
# in different economic regimes. The instruments within these factors are selected based on their alignment with the identified economic state variables and their 
# historical performance in similar regimes.)
# (Complete data sources are described in Exhibit 2. Economic state variables and sources.)
# Recapitulation: The strategy employs a systematic approach using economic state variables to identify current economic regimes. The tools and indicators used include 
# z-scores of selected economic variables such as the S&P500 index level, yield curve slope, crude oil, copper prices, treasury bill yield, VIX, and stock-bond correlation. 
# The methodology involves calculating the similarity score by measuring the squared distance of today’s z-score to historical observations.
# Trading Instructions:
# Investor forms 2 portfolios – quintile 1 and 5.
# For the quintile 1 portfolio, we are using the 20% most similar historical dates. We are long a factor if the average of returns subsequent to the most similar dates 
# was positive, and short if it was negative.
# For the quintile 5 portfolio, we are using the 20% most dissimilar historical dates. We are long a factor if the average of returns subsequent to the most dissimilar 
# dates was positive, and short if it was negative.
# Create a less correlated z-score difference portfolio by going long the quintile 1 portfolio and short the quintile 5 portfolio (as illustrated in the right-hand panel of 
# Exhibit 10).
# Rebalancing & Weighting: The strategy involves monthly rebalancing to adjust positions based on updated similarity scores. Choose equal weights for all portfolio 
# positions.

# region imports
from AlgorithmImports import *
from dateutil.relativedelta import relativedelta
import data_tools
from typing import List
from pandas.core.frame import DataFrame
# endregion

class SwitchingRegimesFactorStrategy(QCAlgorithm):
    """Monthly factor rotation based on the similarity of a summed predictor z-score.

    Once a month the algorithm computes the current z-score, compares it with
    the stored history of z-scores, and trades the factors with the highest and
    lowest mean stored return among the most similar and the least similar dates.
    """

    def initialize(self) -> None:
        """Set start date and cash, subscribe to all data and schedule the monthly flag."""
        self.set_start_date(2014, 1, 1)
        self.set_cash(100_000)

        leverage: int = 5
        self._quantile: int = 5
        self._year_period: int = 12
        self._correlation_period: int = 3 * self._year_period
        self._z_score_period: int = 3 * self._year_period
        self._cap: int = 3
        self._predictors_count: int = 7

        ff_factors: List[str] = [
            'fama_french_5_market_eq',
            'fama_french_5_investment_eq',
            'fama_french_5_profitability_eq',
            'fama_french_5_size_eq',
            'fama_french_5_value_eq',
            'fama_french_5_momentum_eq'
        ]
        
        self._ff_factors: List[Symbol] = []

        # Traded instruments: the factor series, added as custom data.
        for ticker in ff_factors:
            security: Security = self.add_data(data_tools.FFFactorsEQ, ticker, Resolution.DAILY)
            security.set_leverage(leverage)
            security.set_fee_model(data_tools.CustomFeeModel())
            self._ff_factors.append(security.symbol)

        predictor_tickers: List[str] = ['USO', 'CPER', 'VIX']
        correlation_tickers: List[str] = ['SPY', 'IEF']
        self._predictor_symbols: List[Symbol] = []

        # VIX comes from the CBOE data type; the other predictors are equities.
        for ticker in predictor_tickers:
            if ticker == 'VIX':
                self._predictor_symbols.append(self.add_data(CBOE, ticker, Resolution.DAILY).symbol)
            else:
                self._predictor_symbols.append(self.add_equity(ticker, Resolution.DAILY).symbol)

        self._correlation_symbols: List[Symbol] = [
            self.add_equity(ticker, Resolution.DAILY).symbol for ticker in correlation_tickers
        ]

        self._market_yield: Symbol = self.add_data(data_tools.MonthlyCustomData, 'GS3M', Resolution.DAILY).symbol
        self._yield_curve: Symbol = self.add_data(data_tools.DailyCustomData, 'T10Y3M', Resolution.DAILY).symbol
        self._custom_data_symbols: List[Symbol] = [self._market_yield, self._yield_curve]

        self._data: data_tools.SymbolData = data_tools.SymbolData(
            self,
            self._year_period,
            self._correlation_period,
            self._z_score_period,
            self._cap,
            self._predictors_count,
            self._predictor_symbols,
            self._correlation_symbols,
            self._custom_data_symbols
        )

        # The scheduled event only raises a flag; the rebalance itself runs in on_data.
        self._selection_flag: bool = False
        self.schedule.on(
            self.date_rules.month_start(self._correlation_symbols[0]),
            self.time_rules.after_market_open(self._correlation_symbols[0]),
            self._selection)

        self.settings.daily_precise_end_time = False
        self.settings.minimum_order_margin_portfolio_percentage = 0.

    def on_data(self, slice: Slice) -> None:
        """Stop trading when custom data ends; otherwise rebalance when the monthly flag is set."""
        custom_data_last_update_date: Dict[Symbol, datetime.date] = data_tools.LastDateHandler.get_last_update_date()

        # Check that custom data is still coming in: once the current date reaches
        # the last date recorded for any custom symbol, liquidate and do nothing else.
        for symbol in self._ff_factors + [self._yield_curve, self._market_yield]:
            if self.securities[symbol].get_last_data() and self.time.date() >= custom_data_last_update_date[symbol]:
                self.liquidate()
                self.log(f'Data stopped coming for symbol: {symbol.value}. Terminating backtest.')
                return

        if not self._selection_flag:
            return
        self._selection_flag = False

        # Monthly closes of the price-based predictors and monthly values of the custom series.
        history: DataFrame = self._data._get_predictors_history(self._predictor_symbols + self._correlation_symbols).close
        history_custom_data: DataFrame = self._data._get_predictors_history(self._custom_data_symbols).value

        if not self._data._is_ready(history):
            self.log('Not enough data for calculating correlation.')
            return

        # 12-month predictor variables change.
        df_change: DataFrame = self._data._get_change(history, history_custom_data)

        if not self._data._correlation_is_ready(df_change):
            self.log('Not enough data for z-score calculation.')
            return

        self._data._update_z_score(df_change, self._ff_factors)

        if not self._data._df_is_ready():
            return

        # The row appended just above holds the current z-score.
        last_value: float = self._data._z_score_factor_df.iloc[-1]['z-score']

        df_copy: DataFrame = self._data._z_score_factor_df.copy()
        # Compute the absolute z-score difference.
        df_copy['z-score difference'] = abs(df_copy['z-score'] - last_value)

        # Sort by similarity (ascending order of z-score difference); the current row is excluded.
        df_sorted: DataFrame = df_copy.iloc[:-1].sort_values(by='z-score difference')

        # Number of dates in one quantile group.
        quantile: int = len(df_sorted) // self._quantile
        # Mean factor return over the most / least similar dates; iloc[:, 1:-1] drops
        # the first ('z-score') and last ('z-score difference') columns.
        top_similar_dates: pd.Series = df_sorted.iloc[:quantile].iloc[:, 1:-1].mean()
        bottom_similar_dates: pd.Series = df_sorted.iloc[-quantile:].iloc[:, 1:-1].mean()
        # A single factor per leg: highest mean return is bought, lowest is sold.
        top_long: Symbol = top_similar_dates.sort_values().index[-1]
        top_short: Symbol = top_similar_dates.sort_values().index[0]
        bottom_long: Symbol = bottom_similar_dates.sort_values().index[-1]
        bottom_short: Symbol = bottom_similar_dates.sort_values().index[0]

        # NOTE(review): the file header describes going long the quintile 1 portfolio and
        # short the quintile 5 portfolio; here both "long" picks are bought and both
        # "short" picks are sold — confirm this is the intended construction.
        self.liquidate()
        for i, portfolio in enumerate([[top_long, bottom_long], [top_short, bottom_short]]):
            # i == 0 -> buy (sign +1), i == 1 -> sell (sign -1).
            for symbol in portfolio:
                if slice.contains_key(symbol) and slice[symbol]:
                    # Each position is sized to total portfolio value divided by the group size.
                    quantity: float = self.portfolio.total_portfolio_value / len(portfolio) // slice[symbol].price
                    self.market_order(symbol, ((-1) ** i) * quantity)

    def _selection(self) -> None:
        """Scheduled callback: request a rebalance on the next on_data call."""
        self._selection_flag = True