Overall Statistics
Total Trades
6
Average Win
0%
Average Loss
0%
Compounding Annual Return
-4.808%
Drawdown
0.300%
Expectancy
0
Net Profit
-0.054%
Sharpe Ratio
-9.007
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0.001
Beta
-0.755
Annual Standard Deviation
0.004
Annual Variance
0
Information Ratio
-9.22
Tracking Error
0.009
Treynor Ratio
0.046
Total Fees
$6.00
Estimated Strategy Capacity
$4300000.00
Lowest Capacity Asset
CMG TFNWQ96IBZ39
# Pair Trading


from AlgorithmImports import *
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.decomposition import PCA
from sklearn.preprocessing import *
from statsmodels.tsa.stattools import adfuller, coint, grangercausalitytests
import statsmodels.api as sm
import datetime

# https://medium.com/@financialnoob/granger-causality-test-in-pairs-trading-bf2fd939e575
class PairTradingGrangerAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2019, 2, 8)     
        self.SetEndDate(2019, 2, 11)    
        self.SetCash(100000) 

        self.lookback = 500
        self.num_equities = 20
        
        self.selected_pairs = None
        self.UniverseSettings.Resolution = Resolution.Minute
        self.AddUniverse(self.CoarseSelection) 
        self.AddEquity("SPY")
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 10), 
            self.EveryDayBeforeMarketClose)    
        self.day = 0


    def CoarseSelection(self, coarse):
        sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        symbols = [ x.Symbol for x in sortedByDollarVolume if x.Price > 50 and x.DollarVolume > 1000000000 ]
        self.symbols = symbols[:self.num_equities]
         
        return [x for x in self.symbols]


    def Normalize(self, df_history):
        scaler = MinMaxScaler()
        scaler.fit(df_history)
        history = scaler.transform(df_history)
        history = pd.DataFrame(history)
        history.columns = df_history.columns.values.tolist()

        return history

    # returns array with cointegrated pairs
    def CreateCointegratedPairs(self, stocks, df_log_prices):
        selected_pairs = []
        selected_stocks = []

        for s1 in stocks:
            for s2 in stocks:
                if (s1 != s2) and (s1 not in selected_stocks) and (s2 not in selected_stocks):
                    if (coint(df_log_prices[s1], df_log_prices[s2])[1] < 0.1):
                        selected_stocks.append(s1)
                        selected_stocks.append(s2)
                        
                        selected_pairs.append((s1, s2))
                        
        return selected_pairs

    # returns array with cointegrated pairs
    def SelectGrangerPairs(self, selected_pairs, df_log_prices):
        maxlag = 1
        limit = 0.1
        
        selected_pairs_gc = []
        
        for index, pair in enumerate(selected_pairs):
            s1 = pair[0]
            s2 = pair[1]
            
            if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
                gct12 = grangercausalitytests(df_log_prices[[s1, s2]], maxlag=maxlag)
                pvals12 = [gct12[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
                pvals12 = np.array(pvals12)

                if len(pvals12[pvals12 < limit]) > 0:
                    selected_pairs_gc.append((s1, s2))
                else:  # switch Granger-leader and Granger-follower
                    gct21 = grangercausalitytests(df_log_prices[[s2, s1]], maxlag=maxlag)
                    pvals21 = [gct21[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
                    pvals21 = np.array(pvals21)
                
                    if len(pvals21[pvals21 < limit]) > 0:
                        selected_pairs_gc.append((s2, s1))

        return selected_pairs_gc

    
    def CalculateWeights(self, selected_pairs, df_log_prices):
        r = 1  
        
        positions = pd.Series()

        for index, pair in enumerate(selected_pairs):
            s1 = pair[0]
            s2 = pair[1]
            
            self.AddEquity(s1)
            self.AddEquity(s2)

            if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
                model = sm.OLS(df_log_prices[s1], sm.add_constant(df_log_prices[s2]))
                res = model.fit()
                mu = res.resid.mean()  # spread historical mean
                sigma = res.resid.std()  # spread historical sd
                # calculate spread
                spread = df_log_prices[s1] - res.predict(sm.add_constant(df_log_prices[s2]))
                spread = spread.iloc[-1]
                
                if spread > mu + r * sigma:
                    positions[s1] = -1
                    positions[s2] = 1
                elif spread < mu - r * sigma:
                    positions[s1] = 1
                    positions[s2] = -1
                    
                else:
                    positions[s1] = 0
                    positions[s2] = 0
                  

        if positions.abs().sum() == 0:
            weights = positions
        else:
            weights = positions * (0.475 / positions.abs().sum())

        return weights

    
    def EveryDayBeforeMarketClose(self):
        if not self.IsWarmingUp and self.Time.date():
            self.Trade()


    def Trade(self):
        df_history = self.History(self.symbols, self.lookback, Resolution.Daily).close.unstack(level=0)
        df_history_last = self.History(self.symbols, 1, Resolution.Minute).close.unstack(level=0)
        df_history = pd.concat([df_history, df_history_last]) 
        df_history = df_history.dropna('columns')
        log_prices = self.Normalize(df_history)

        self.day += 1
        if self.selected_pairs is None or len(self.selected_pairs) or self.day % 10 == 0:
            stocks = df_history.columns
            pairs = self.CreateCointegratedPairs(stocks, log_prices)
            self.selected_pairs = self.SelectGrangerPairs(pairs, log_prices)

        weights = self.CalculateWeights(self.selected_pairs, log_prices)

        # self.Debug("weights=" + str(weights))

        portfolioTargets = []
        for symbol, weight in weights.items():
            self.SetHoldings(symbol, weight)