| Overall Statistics | |
| --- | --- |
| Total Trades | 11 |
| Average Win | 0% |
| Average Loss | -0.14% |
| Compounding Annual Return | -42.649% |
| Drawdown | 1.000% |
| Expectancy | -1 |
| Net Profit | -0.607% |
| Sharpe Ratio | -6.864 |
| Probabilistic Sharpe Ratio | 0% |
| Loss Rate | 100% |
| Win Rate | 0% |
| Profit-Loss Ratio | 0 |
| Alpha | 0.168 |
| Beta | -13.112 |
| Annual Standard Deviation | 0.067 |
| Annual Variance | 0.004 |
| Information Ratio | -7.042 |
| Tracking Error | 0.072 |
| Treynor Ratio | 0.035 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $260000.00 |
| Lowest Capacity Asset | TSLA UNU3P8Y3WFAD |
# region imports
from AlgorithmImports import *
# endregion
import numpy as np
import pandas as pd
import statsmodels.api as sm
from sklearn.preprocessing import MinMaxScaler
from statsmodels.tsa.stattools import coint, grangercausalitytests
# https://medium.com/@financialnoob/granger-causality-test-in-pairs-trading-bf2fd939e575
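#
# Strategy outline (as implemented below):
#   1. CoarseSelection keeps the most liquid names (price > $50, daily dollar volume > $1B).
#   2. Each day before the close, the last 500 daily closes (plus the latest minute bar) are min-max normalized.
#   3. CreateCointegratedPairs keeps pairs whose Engle-Granger cointegration p-value is below 0.1.
#   4. SelectGrangerPairs keeps only pairs where one leg Granger-causes the other (ssr F-test, maxlag=1, p < 0.1).
#   5. CalculateWeights fits an OLS hedge ratio per pair and takes a long/short position when the
#      spread sits more than one standard deviation away from its historical mean.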
class PairTradingGrangerAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 2, 8)
self.SetEndDate(2019, 2, 11)
self.SetCash(100000)
self.lookback = 500
self.num_equities = 20
self.selected_pairs = None
self.UniverseSettings.Resolution = Resolution.Daily
self.AddUniverse(self.CoarseSelection)
# self.SetSecurityInitializer(lambda x: x.SetDataNormalizationMode(DataNormalizationMode.Raw))
self.AddEquity("SPY")
# self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 20), self.Liquidate)
self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.BeforeMarketClose("SPY", 10), self.EveryDayBeforeMarketClose)
# self.Schedule.On(self.DateRules.On(self.EndDate.year, self.EndDate.month, self.EndDate.day),
# self.TimeRules.At(0, 0),
# self.Liquidate)
self.SetWarmup(500)
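        # Warm up 500 daily bars so the full lookback window is available at the first rebalance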
self.day = 0
def CoarseSelection(self, coarse):
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
symbols = [ x.Symbol for x in sortedByDollarVolume if x.Price > 50 and x.DollarVolume > 1000000000 ]
self.symbols = symbols[:self.num_equities]
        return self.symbols
    # Returns a DataFrame of min-max scaled prices, one column per symbol
def Normalize(self, df_history):
scaler = MinMaxScaler()
scaler.fit(df_history)
history = scaler.transform(df_history)
history = pd.DataFrame(history)
history.columns = df_history.columns.values.tolist()
return history
# returns array with cointegrated pairs
def CreateCointegratedPairs(self, stocks, df_log_prices):
selected_pairs = []
selected_stocks = []
for s1 in stocks:
for s2 in stocks:
if (s1 != s2) and (s1 not in selected_stocks) and (s2 not in selected_stocks):
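                    # Engle-Granger test: coint() returns (t-statistic, p-value, critical values);
                    # a p-value below 0.1 is treated as evidence the two normalized price series are cointegrated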
if (coint(df_log_prices[s1], df_log_prices[s2])[1] < 0.1):
selected_stocks.append(s1)
selected_stocks.append(s2)
selected_pairs.append((s1, s2))
# self.Debug("selected_pairs=" + str(selected_pairs))
return selected_pairs
    # Returns the cointegrated pairs that also pass a Granger causality test (ssr F-test p < limit)
def SelectGrangerPairs(self, selected_pairs, df_log_prices):
maxlag = 1
limit = 0.1
selected_pairs_gc = []
        for s1, s2 in selected_pairs:
if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
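                # statsmodels' grangercausalitytests checks whether the SECOND column Granger-causes
                # the first, so this call tests whether s2 helps predict s1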
gct12 = grangercausalitytests(df_log_prices[[s1, s2]], maxlag=maxlag)
pvals12 = [gct12[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
pvals12 = np.array(pvals12)
if len(pvals12[pvals12 < limit]) > 0:
selected_pairs_gc.append((s1, s2))
else: # switch Granger-leader and Granger-follower
gct21 = grangercausalitytests(df_log_prices[[s2, s1]], maxlag=maxlag)
pvals21 = [gct21[x][0]['ssr_ftest'][1] for x in range(1, maxlag + 1)]
pvals21 = np.array(pvals21)
if len(pvals21[pvals21 < limit]) > 0:
selected_pairs_gc.append((s2, s1))
# self.Debug("selected_pairs_gc=" + str(selected_pairs_gc))
return selected_pairs_gc
def CalculateWeights(self, selected_pairs, df_log_prices):
r = 1 # standard deviation threshold
        positions = pd.Series(dtype=float)
        for s1, s2 in selected_pairs:
self.AddEquity(s1)
self.AddEquity(s2)
if s1 in df_log_prices.columns and s2 in df_log_prices.columns:
model = sm.OLS(df_log_prices[s1], sm.add_constant(df_log_prices[s2]))
res = model.fit()
mu = res.resid.mean() # spread historical mean
sigma = res.resid.std() # spread historical sd
# calculate spread
spread = df_log_prices[s1] - res.predict(sm.add_constant(df_log_prices[s2]))
spread = spread.iloc[-1]
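                # Mean-reversion rule: if the spread is more than r standard deviations above its mean,
                # short s1 and buy s2; if it is more than r standard deviations below, do the opposite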
if spread > mu + r * sigma:
positions[s1] = -1
positions[s2] = 1
elif spread < mu - r * sigma:
positions[s1] = 1
positions[s2] = -1
else:
positions[s1] = 0
positions[s2] = 0
        # Scale positions so that gross exposure totals 95% of portfolio value
if positions.abs().sum() == 0:
weights = positions
else:
weights = positions * (0.95 / positions.abs().sum())
# self.Debug("weights=" + str(weights))
return weights
def EveryDayBeforeMarketClose(self):
        if not self.IsWarmingUp:
self.Trade()
def Trade(self):
df_history = self.History(self.symbols, self.lookback, Resolution.Daily).close.unstack(level=0)
df_history_last = self.History(self.symbols, 1, Resolution.Minute).close.unstack(level=0)
df_history = pd.concat([df_history, df_history_last])
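        # 500 daily closes plus the most recent minute bar, so today's price is included at the rebalance 10 minutes before the close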
        df_history = df_history.dropna(axis='columns')  # drop symbols with incomplete history
log_prices = self.Normalize(df_history)
self.day += 1
        if self.selected_pairs is None or len(self.selected_pairs) == 0 or self.day % 10 == 0:  # re-select pairs when none are held, or every 10 trading days
stocks = df_history.columns
pairs = self.CreateCointegratedPairs(stocks, log_prices)
self.selected_pairs = self.SelectGrangerPairs(pairs, log_prices)
weights = self.CalculateWeights(self.selected_pairs, log_prices)
self.Debug("weights=" + str(weights))
portfolioTargets = []
for symbol, weight in weights.items():
if weight != 0:
self.Securities[symbol].FeeModel = ConstantFeeModel(0)
portfolioTargets.append(PortfolioTarget(symbol, weight))
self.SetHoldings(portfolioTargets, True)
def OnEndOfAlgorithm(self):
self.Liquidate()