Overall Statistics
Total Trades
59
Average Win
0.83%
Average Loss
-3.77%
Compounding Annual Return
7.427%
Drawdown
50.300%
Expectancy
0.031
Net Profit
465.515%
Sharpe Ratio
0.281
Sortino Ratio
0.279
Probabilistic Sharpe Ratio
0.029%
Loss Rate
16%
Win Rate
84%
Profit-Loss Ratio
0.22
Alpha
0.007
Beta
0.808
Annual Standard Deviation
0.144
Annual Variance
0.021
Information Ratio
-0.018
Tracking Error
0.07
Treynor Ratio
0.05
Total Fees
$149.30
Estimated Strategy Capacity
$1500000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
0.19%
#region imports
from AlgorithmImports import *
from dateutil.relativedelta import relativedelta
#endregion

class LastDateHandler():
    _last_update_date:Dict[Symbol, datetime.date] = {}
    
    @staticmethod
    def get_last_update_date() -> Dict[Symbol, datetime.date]:
       return LastDateHandler._last_update_date

# Quantpedia monthly custom data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaMonthlyData(PythonData):
    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        return SubscriptionDataSource(f'data.quantpedia.com/backtesting_data/economic/{config.Symbol.Value}.csv', SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        data = QuantpediaMonthlyData()
        data.Symbol = config.Symbol
        
        if not line[0].isdigit(): return None
        split: str = line.split(';')
        
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + relativedelta(months=1)
        data.Value = float(split[1])

        if config.Symbol not in LastDateHandler._last_update_date:
            LastDateHandler._last_update_date[config.Symbol] = datetime(1,1,1).date()
        if data.Time.date() > LastDateHandler._last_update_date[config.Symbol]:
            LastDateHandler._last_update_date[config.Symbol] = data.Time.date()

        return data

# Quantpedia bond yield data.
# NOTE: IMPORTANT: Data order must be ascending (datewise)
class QuantpediaBondYield(PythonData):
    def GetSource(self, config: SubscriptionDataConfig, date: datetime, isLiveMode: bool) -> SubscriptionDataSource:
        return SubscriptionDataSource("data.quantpedia.com/backtesting_data/bond_yield/{0}.csv".format(config.Symbol.Value), SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, isLiveMode: bool) -> BaseData:
        data = QuantpediaBondYield()
        data.Symbol = config.Symbol
        
        if not line[0].isdigit(): return None
        split = line.split(',')
        
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + timedelta(days=1)
        data['yield'] = float(split[1])
        data.Value = float(split[1])

        # store last update date
        if config.Symbol not in LastDateHandler._last_update_date:
            LastDateHandler._last_update_date[config.Symbol] = datetime(1,1,1).date()
        if data.Time.date() > LastDateHandler._last_update_date[config.Symbol]:
            LastDateHandler._last_update_date[config.Symbol] = data.Time.date()

        return data

class InterestRate3M(PythonData):
    def GetSource(self, config:SubscriptionDataConfig, date:datetime, isLiveMode:bool) -> SubscriptionDataSource:
        return SubscriptionDataSource("data.quantpedia.com/backtesting_data/interbank_rate/{0}.csv".format(config.Symbol.Value), SubscriptionTransportMedium.RemoteFile, FileFormat.Csv)

    def Reader(self, config:SubscriptionDataConfig, line:str, date:datetime, isLiveMode:bool) -> BaseData:
        data = InterestRate3M()
        data.Symbol = config.Symbol
        
        if not line[0].isdigit(): return None
        split = line.split(';')
        
        data.Time = datetime.strptime(split[0], "%Y-%m-%d") + relativedelta(months=1)
        data['value'] = float(split[1])
        data.Value = float(split[1])

        # store last update date
        if config.Symbol not in LastDateHandler._last_update_date:
            LastDateHandler._last_update_date[config.Symbol] = datetime(1,1,1).date()
        if data.Time.date() > LastDateHandler._last_update_date[config.Symbol]:
            LastDateHandler._last_update_date[config.Symbol] = data.Time.date()

        return data
# https://quantpedia.com/strategies/fed-model/
#
# Each month, the investor conducts a one-month predictive regression (using all available data up to that date) predicting excess stock market
# returns using the yield gap as an independent variable. The “Yield gap” is calculated as YG = EY − y, with earnings yield EY ≡ ln (1 ++ E/P)
# and y = ln (1 ++ Y) is the log 10 year Treasury bond yield. Then, the strategy allocates 100% in the risky asset if the forecasted excess
# returns are positive, and otherwise, it invests 100% in the risk-free rate.

from collections import deque
from AlgorithmImports import *
from typing import List, Tuple, Deque
import numpy as np
import data_tools
from scipy import stats

class FEDModel(QCAlgorithm):

    def Initialize(self) -> None:
        self.SetStartDate(2000, 1, 1)
        self.SetCash(100000)
        
        self.period: int = 12 * 21
        self.SetWarmUp(self.period)

        self.market: Symbol = self.AddEquity('SPY', Resolution.Daily).Symbol
        self.market_data: Deque[Tuple[float,float]] = deque()
        
        self.cash: Symbol = self.AddEquity('SHY', Resolution.Daily).Symbol
        
        # risk free rate
        self.risk_free_rate: Symbol = self.AddData(data_tools.InterestRate3M, 'IR3TIB01USM156N', Resolution.Daily).Symbol
        
        # 10Y bond yield symbol
        self.bond_yield: Symbol = self.AddData(data_tools.QuantpediaBondYield, 'US10YT', Resolution.Daily).Symbol
        
        # SP500 earnings yield data
        self.sp_earnings_yield: Symbol = self.AddData(data_tools.QuantpediaMonthlyData, 'SP500_EARNINGS_YIELD_MONTH', Resolution.Daily).Symbol

        self.yield_gap: Deque[float] = deque()
        
        self.Settings.MinimumOrderMarginPortfolioPercentage = 0.
        self.recent_month: int = -1
    
    def OnData(self, data: Slice) -> None:
        custom_data_last_update_date: Dict[Symbol, datetime.date] = data_tools.LastDateHandler.get_last_update_date()
        rebalance_flag: bool = False
        
        if self.sp_earnings_yield in data and data[self.sp_earnings_yield]:
            if self.Time.month != self.recent_month:
                self.recent_month = self.Time.month
                rebalance_flag = True

        if not rebalance_flag:
            # earnings yield data is no longer comming in
            if self.Securities[self.sp_earnings_yield].GetLastData():
                if self.Time.date() > custom_data_last_update_date[self.sp_earnings_yield]:
                    self.Liquidate()
            return

        # update market price data
        if self.market in data and self.bond_yield in data and data[self.market] and data[self.bond_yield]:
            if self.Securities[self.risk_free_rate].GetLastData() and self.Securities[self.bond_yield].GetLastData() and \
                self.Time.date() <= custom_data_last_update_date[self.risk_free_rate] and self.Time.date() <= custom_data_last_update_date[self.bond_yield]: 
                market_price: float = data[self.market].Value
                rf_rate: float = self.Securities[self.risk_free_rate].Price / 100
                bond_yield: float = data[self.bond_yield].Value / 100
                sp_ey: float = data[self.sp_earnings_yield].Value / 100
                
                if market_price != 0 and rf_rate != 0 and bond_yield != 0 and sp_ey != 0:
                    self.market_data.append((market_price, rf_rate))

                    yield_gap: float = np.log(sp_ey) - np.log(bond_yield)
                    self.yield_gap.append(yield_gap)
                    rebalance_flag = True
        
        # ensure minimum data points to calculate regression
        min_count: int = 6
        if len(self.market_data) >= min_count:
            market_closes: np.ndarray = np.array([x[0] for x in self.market_data])
            market_returns: np.ndarray = (market_closes[1:] - market_closes[:-1]) / market_closes[:-1]
            rf_rates: np.ndarray = np.array([x[1] for x in self.market_data][1:])
            excess_returns: np.ndarray = market_returns - rf_rates
            
            yield_gaps: List[float] = list(self.yield_gap)

            # linear regression
            # Y = α + (β ∗ X)
            # intercept = alpha
            # slope = beta
            beta, alpha, r_value, p_value, std_err = stats.linregress(yield_gaps[1:-1], market_returns[1:])
            
            # predicted market return
            Y: float = alpha + (beta * yield_gaps[-1])
            
            # trade execution / rebalance
            if Y > 0:
                if self.Portfolio[self.cash].Invested:
                    self.Liquidate(self.cash)
                self.SetHoldings(self.market, 1)
            else:
                if self.Portfolio[self.market].Invested:
                    self.Liquidate(self.market)
                self.SetHoldings(self.cash, 1)