Overall Statistics
Total Trades
11
Average Win
0%
Average Loss
-0.14%
Compounding Annual Return
-42.649%
Drawdown
1.000%
Expectancy
-1
Net Profit
-0.607%
Sharpe Ratio
-6.864
Probabilistic Sharpe Ratio
0%
Loss Rate
100%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0.168
Beta
-13.112
Annual Standard Deviation
0.067
Annual Variance
0.004
Information Ratio
-7.042
Tracking Error
0.072
Treynor Ratio
0.035
Total Fees
$0.00
Estimated Strategy Capacity
$260000.00
Lowest Capacity Asset
TSLA UNU3P8Y3WFAD
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.decomposition import PCA
from sklearn.preprocessing import *
from statsmodels.tsa.stattools import adfuller, coint, grangercausalitytests
import statsmodels.api as sm
import datetime

# https://medium.com/@financialnoob/granger-causality-test-in-pairs-trading-bf2fd939e575
class PairTradingGrangerAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2019, 2, 8)     
        self.SetEndDate(2019, 2, 11)     
        self.SetCash(100000) 

        self.lookback = 500
        self.num_equities = 20
        
        self.selected_pairs = None

        self.UniverseSettings.Resolution = Resolution.Daily  
        self.AddUniverse(self.CoarseSelection)        
#        self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))

        self.AddEquity("SPY")

#        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 20), self.Liquidate)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 10), self.EveryDayBeforeMarketClose)
#        self.Schedule.On(self.DateRules.On(self.EndDate.year, self.EndDate.month, self.EndDate.day),  
#                                self.TimeRules.At(0, 0),  
#                                self.Liquidate)
        self.SetWarmup(500)                 
                 
        self.day = 0

    def CoarseSelection(self, coarse):
        sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        symbols = [ x.Symbol for x in sortedByDollarVolume if x.Price > 50 and x.DollarVolume > 1000000000 ]
        self.symbols = symbols[:self.num_equities]
         
        return [x for x in self.symbols]

    # returns Numpy array with normalized prices
    def Normalize(self, df_history):
        scaler = MinMaxScaler()
        scaler.fit(df_history)
        history = scaler.transform(df_history)

        history = pd.DataFrame(history)
        history.columns = df_history.columns.values.tolist()

        return history

    # returns array with cointegrated pairs
    def CreateCointegratedPairs(self, stocks, df_log_prices):
        selected_pairs = []
        selected_stocks = []

        for s1 in stocks:
            for s2 in stocks:
                if (s1 != s2) and (s1 not in selected_stocks) and (s2 not in selected_stocks):
                    if (coint(df_log_prices[s1], df_log_prices[s2])[1] < 0.1):
                        selected_stocks.append(s1)
                        selected_stocks.append(s2)
                        
                        selected_pairs.append((s1, s2))
                        
#        self.Debug("selected_pairs=" + str(selected_pairs))
                        
        return selected_pairs

    # returns array with cointegrated pairs
    def SelectGrangerPairs(self, selected_pairs, df_log_prices):
        maxlag = 1
        limit = 0.1
        
        selected_pairs_gc = []
        
        for index, pair in enumerate(selected_pairs):
            s1 = pair[0]
            s2 = pair[1]
            
            if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
                gct12 = grangercausalitytests(df_log_prices[[s1, s2]], maxlag=maxlag)
                pvals12 = [gct12[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
                pvals12 = np.array(pvals12)

                if len(pvals12[pvals12 < limit]) > 0:
                    selected_pairs_gc.append((s1, s2))
                else:  # switch Granger-leader and Granger-follower
                    gct21 = grangercausalitytests(df_log_prices[[s2, s1]], maxlag=maxlag)
                    pvals21 = [gct21[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
                    pvals21 = np.array(pvals21)
                
                    if len(pvals21[pvals21 < limit]) > 0:
                        selected_pairs_gc.append((s2, s1))

#        self.Debug("selected_pairs_gc=" + str(selected_pairs_gc))

        return selected_pairs_gc
    
    def CalculateWeights(self, selected_pairs, df_log_prices):
        r = 1  # standard deviation threshold
        
        positions = pd.Series()

        for index, pair in enumerate(selected_pairs):
            s1 = pair[0]
            s2 = pair[1]
            
            self.AddEquity(s1)
            self.AddEquity(s2)

            if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
                model = sm.OLS(df_log_prices[s1], sm.add_constant(df_log_prices[s2]))
                res = model.fit()
                mu = res.resid.mean()  # spread historical mean
                sigma = res.resid.std()  # spread historical sd
                # calculate spread
                spread = df_log_prices[s1] - res.predict(sm.add_constant(df_log_prices[s2]))
                spread = spread.iloc[-1]
                
                if spread > mu + r * sigma:
                    positions[s1] = -1
                    positions[s2] = 1
                elif spread < mu - r * sigma:
                    positions[s1] = 1
                    positions[s2] = -1
                else:
                    positions[s1] = 0
                    positions[s2] = 0

#       # Return the weights for each selected stock
        if positions.abs().sum() == 0:
            weights = positions
        else:
            weights = positions * (0.95 / positions.abs().sum())

#        self.Debug("weights=" + str(weights))

        return weights
    
    def EveryDayBeforeMarketClose(self):
        if not self.IsWarmingUp and self.Time.date():
            self.Trade()

    def Trade(self):
        df_history = self.History(self.symbols, self.lookback, Resolution.Daily).close.unstack(level=0)
        df_history_last = self.History(self.symbols, 1, Resolution.Minute).close.unstack(level=0)

        df_history = pd.concat([df_history, df_history_last]) 

        df_history = df_history.dropna('columns')

        log_prices = self.Normalize(df_history)

        self.day += 1
        if self.selected_pairs is None or len(self.selected_pairs) or self.day % 10 == 0:
            stocks = df_history.columns
            pairs = self.CreateCointegratedPairs(stocks, log_prices)
            self.selected_pairs = self.SelectGrangerPairs(pairs, log_prices)

        weights = self.CalculateWeights(self.selected_pairs, log_prices)

        self.Debug("weights=" + str(weights))

        portfolioTargets = []
        for symbol, weight in weights.items():
            if weight != 0:
                self.Securities[symbol].FeeModel = ConstantFeeModel(0)
                portfolioTargets.append(PortfolioTarget(symbol, weight))

        self.SetHoldings(portfolioTargets, True)

    def OnEndOfAlgorithm(self):
        self.Liquidate()