| Overall Statistics | Value |
|---|---|
| Total Trades | 46 |
| Average Win | 0.27% |
| Average Loss | -0.18% |
| Compounding Annual Return | 9.619% |
| Drawdown | 0.500% |
| Expectancy | 0.359 |
| Net Profit | 2.289% |
| Sharpe Ratio | 3.729 |
| Probabilistic Sharpe Ratio | 96.048% |
| Loss Rate | 45% |
| Win Rate | 55% |
| Profit-Loss Ratio | 1.47 |
| Alpha | 0.064 |
| Beta | 0.01 |
| Annual Standard Deviation | 0.018 |
| Annual Variance | 0 |
| Information Ratio | -1.223 |
| Tracking Error | 0.14 |
| Treynor Ratio | 6.665 |
| Total Fees | $108.30 |
| Estimated Strategy Capacity | $68000000.00 |
| Lowest Capacity Asset | PM U1EP4KZP5ROL |
| Portfolio Turnover | 3.13% |
#region imports
from AlgorithmImports import *
#endregion
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
import scipy.optimize as so
import scipy.integrate as si
import scipy.stats as ss
from math import log,exp,sqrt
import matplotlib.pyplot as plt
from scipy.integrate import quad
class PairsTradingAlgorithm(QCAlgorithm):
    """Pairs-trading algorithm driven by an Ornstein-Uhlenbeck (OU) fit.

    For every pair in ``pairs_list`` the algorithm, on each ``OnData`` call:

    1. regresses the log close of the first ticker on the log close of the
       second over the last ``lookback`` bars (``stats``);
    2. fits OU parameters to the z-scored regression residual by maximum
       likelihood (``MLE`` / ``likelihood``);
    3. simulates OU paths with those parameters and averages their 5th,
       50th and 95th percentiles (``OU_process_generator``);
    4. enters or exits the pair by comparing the latest z-score with
       those simulated levels.

    When the portfolio holds nothing, it is parked in SPY, long or short
    depending on a 7/20 simple-moving-average comparison.
    """

    def Initialize(self):
        """Configure the backtest, subscribe to data and set parameters."""
        self.pairs_list = [
            ['JNJ', 'ABBV'],
            ['DUK', 'AEP'],
            ['NKE', 'SBUX'],
            ['SPGI', 'MA'],
            ['DLR', 'CCI'],
            ['PM', 'PG'],
            ['TMO', 'UNH'],
            ['COP', 'EOG'],
        ]
        # Candidate pairs that are currently disabled: ADBE/MSFT, SRE/AEP.

        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2021, 1, 1)

        # SPY is the fallback holding; the two SMAs pick its direction.
        self.AddEquity("SPY", Resolution.Daily)
        self.fast = self.SMA("SPY", 7)
        self.slow = self.SMA("SPY", 20)

        self.capital = 1000000
        self.SetCash(self.capital)
        self.SetWarmup(252)

        self.X = None           # last simulated OU paths (set by OU_process_generator)
        self.num_MC = 1000      # number of time steps per simulated path
        self.iteration = 3      # number of simulated paths
        # The next three values are not read anywhere in this class.
        self.enter = 2
        self.risk_level = 2
        self.exit = 0
        self.lookback = 100     # regression / MLE window, in bars
        self.dt = 1 / self.lookback  # time step shared by the MLE and the simulation
        self.wt = 1 / len(self.pairs_list)  # share of the portfolio given to each pair

        self.z = 0
        self.trading_ou = False  # not read anywhere in this class
        self.symbols_list = []
        for ticker1, ticker2 in self.pairs_list:
            u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
            u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
            self.symbols_list.append([u1, u2])

    def likelihood(self, params, *args):
        """Return the negative average OU log-likelihood of a series.

        Args:
            params: ``(theta, mu, sigma)`` where ``theta`` is the long-run
                mean, ``mu`` the mean-reversion speed and ``sigma`` the
                volatility of the OU process.
            *args: ``(X, dt)`` - the observed series and the time step
                between consecutive observations.

        Returns:
            The negated average log-likelihood, suitable for a minimiser.

        Raises:
            ValueError: if ``X`` has fewer than two observations.
        """
        theta, mu, sigma = params
        X, dt = args
        X = np.asarray(X, dtype=float)

        # The likelihood is built from transitions X[i-1] -> X[i], so the
        # normaliser is the number of transitions, not the number of points.
        n = len(X) - 1
        if n < 1:
            raise ValueError("likelihood needs at least two observations")

        decay = exp(-mu * dt)
        sigma_tilde_squared = (sigma ** 2) * (1 - exp(-2 * mu * dt)) / (2 * mu)

        residuals = X[1:] - X[:-1] * decay - theta * (1 - decay)
        squared_sum = float(np.dot(residuals, residuals))

        loglikelihood = (
            -0.5 * log(2 * math.pi)
            - log(sqrt(sigma_tilde_squared))
            - squared_sum / (2 * n * sigma_tilde_squared)
        )
        return -loglikelihood

    def MLE(self, X, dt, tol=1e-10):
        """Fit OU parameters to ``X`` by minimising ``likelihood``.

        Args:
            X: observed series (NumPy array).
            dt: time step between consecutive observations.
            tol: accepted for backward compatibility; it is not forwarded
                to the optimiser.

        Returns:
            ``(theta, mu, sigma)`` - long-run mean, mean-reversion speed
            and volatility, in that order.
        """
        # The mean is unconstrained; speed and volatility must stay positive.
        bounds = ((None, None), (1e-5, None), (1e-5, None))
        initial_guess = (X.mean(), 1, 1)
        result = so.minimize(
            self.likelihood, initial_guess, args=(X, dt), bounds=bounds
        )
        theta, mu, sigma = result.x
        return theta, mu, sigma

    def OU_process_generator(self, mu, theta, sigma, N, iteration):
        """Simulate OU paths and return their averaged percentiles.

        Note the naming differs from ``likelihood``: here ``mu`` is the
        level the process reverts to and ``theta`` is the reversion speed,
        as the update ``theta * (mu - x) * dt`` shows.

        Args:
            mu: long-run mean of the simulated process.
            theta: mean-reversion speed.
            sigma: volatility.
            N: number of points per path (each path starts at 0).
            iteration: number of independent paths.

        Returns:
            ``[p5, p50, p95]`` - the 5th, 50th and 95th percentiles of
            each path, averaged across paths.

        Side effects:
            Stores the simulated paths, shape ``(iteration, N)``, in
            ``self.X``.
        """
        paths = np.zeros((iteration, N))
        if N > 1:
            # One batched draw replaces a scalar draw per step; rows are
            # paths, columns are time steps.
            shocks = ss.norm.rvs(loc=0, scale=1, size=(iteration, N - 1))
            diffusion = sigma * np.sqrt(self.dt)
            for i in range(1, N):
                previous = paths[:, i - 1]
                paths[:, i] = (
                    previous
                    + theta * (mu - previous) * self.dt
                    + diffusion * shocks[:, i - 1]
                )
        self.X = paths

        # Percentiles are taken per path (axis=1) and then averaged.
        p_5, p_50, p_95 = np.percentile(paths, [5, 50, 95], axis=1).mean(axis=1)
        return [p_5, p_50, p_95]

    def stats(self, symbols):
        """Regress the pair's log prices and test the residual.

        Args:
            symbols: two-element list of symbols; the first is the
                dependent variable, the second the regressor.

        Returns:
            ``[adf, zscore, slope, res]`` where ``adf`` is the result of
            ``adfuller`` on the residual, ``zscore`` is the residual
            divided by its standard deviation, ``slope`` is the regression
            coefficient and ``res`` is the raw residual series.
            Returns ``None`` when the history request yields no rows.

        Side effects:
            Stores the raw history in ``self.df`` and the unstacked close
            prices in ``self.dg``.
        """
        self.df = self.History(symbols, self.lookback)
        if self.df.empty:
            return None
        self.dg = self.df["close"].unstack(level=0)

        ticker1 = str(symbols[0])
        ticker2 = str(symbols[1])
        Y = np.log(self.dg[ticker1])
        X = sm.add_constant(np.log(self.dg[ticker2]))

        results = sm.OLS(Y, X).fit()
        sigma = math.sqrt(results.mse_resid)  # standard deviation of the residual
        # params is label-indexed (constant first, then the regressor), so
        # positional access has to go through .iloc.
        slope = results.params.iloc[1]
        res = results.resid
        zscore = res / sigma
        adf = adfuller(res)
        return [adf, zscore, slope, res]

    def OnData(self, data):
        """Re-fit every pair and place entry/exit orders.

        Args:
            data: the current data slice (not read directly; prices come
                from ``History`` inside ``stats``).
        """
        if self.IsWarmingUp:
            return

        for ticker1, ticker2 in self.pairs_list:
            symbol1 = self.Symbol(ticker1)
            symbol2 = self.Symbol(ticker2)

            stats = self.stats([symbol1, symbol2])
            if stats is None:
                # No history for this pair on this bar; try again next time.
                continue

            zscore = stats[1]
            self.beta = stats[2]
            self.z = zscore.iloc[-1]  # most recent z-score

            # MLE returns (mean, speed, sigma); the generator takes the
            # same three values positionally as (mu, theta, sigma).
            params = self.MLE(zscore.values, self.dt)
            threshold = self.OU_process_generator(
                params[0], params[1], params[2], self.num_MC, self.iteration
            )
            lower, middle, upper = threshold

            # Only open positions when the hedge ratio is in a sane range.
            if 0.5 < self.beta < 5:
                if (not self.Portfolio[symbol1].Invested) and self.z > upper:
                    # Spread is rich: short the first leg, long the second.
                    self.Liquidate('SPY')
                    self.SetHoldings(symbol1, -1 * self.wt / (1 + self.beta))
                    self.SetHoldings(symbol2, self.beta * self.wt / (1 + self.beta))
                if (not self.Portfolio[symbol1].Invested) and self.z < lower:
                    # Spread is cheap: long the first leg, short the second.
                    self.Liquidate('SPY')
                    self.SetHoldings(symbol1, 1 * self.wt / (1 + self.beta))
                    self.SetHoldings(symbol2, -self.beta * self.wt / (1 + self.beta))

            # NOTE(review): the original indentation was lost; this exit
            # check is assumed to run for every pair regardless of the
            # hedge-ratio gate above - confirm.
            short_spread_reverted = (
                self.Portfolio[symbol1].IsShort
                and self.z < (upper + middle) / 4
            )
            long_spread_reverted = (
                self.Portfolio[symbol2].IsShort
                and self.z > (middle + lower) / 4
            )
            if short_spread_reverted or long_spread_reverted:
                self.Liquidate(symbol1)
                self.Liquidate(symbol2)

        # NOTE(review): assumed to run once per bar after all pairs have
        # been processed (original nesting was lost) - confirm.
        trend_up = self.fast.Current.Value > self.slow.Current.Value
        if self.Portfolio.TotalHoldingsValue == 0:
            if trend_up:
                self.SetHoldings("SPY", 1)
            else:
                self.SetHoldings("SPY", -1)
#region imports
from AlgorithmImports import *
#endregion
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
import scipy.optimize as so
import scipy.integrate as si
import scipy.stats as ss
from math import log,exp,sqrt
import matplotlib.pyplot as plt
from scipy.integrate import quad
class PairsTradingAlgorithm(QCAlgorithm):
    """Pairs-trading algorithm driven by an Ornstein-Uhlenbeck (OU) fit.

    For every pair in ``pairs_list`` the algorithm, on each ``OnData`` call:

    1. regresses the log close of the first ticker on the log close of the
       second over the last ``lookback`` bars (``stats``);
    2. fits OU parameters to the z-scored regression residual by maximum
       likelihood (``MLE`` / ``likelihood``);
    3. simulates OU paths with those parameters and averages their 5th,
       50th and 95th percentiles (``OU_process_generator``);
    4. enters or exits the pair by comparing the latest z-score with
       those simulated levels.

    This variant runs over the first quarter of 2023 and does not park
    idle capital in SPY.
    """

    def Initialize(self):
        """Configure the backtest, subscribe to data and set parameters."""
        self.pairs_list = [
            ['JNJ', 'ABBV'],
            ['DUK', 'AEP'],
            ['NKE', 'SBUX'],
            ['SPGI', 'MA'],
            ['DLR', 'CCI'],
            ['PM', 'PG'],
            ['TMO', 'UNH'],
            ['COP', 'EOG'],
        ]
        # Candidate pairs that are currently disabled: ADBE/MSFT, SRE/AEP.

        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2023, 4, 1)

        # SPY and its two SMAs are still registered, although OnData in
        # this variant never buys SPY.
        self.AddEquity("SPY", Resolution.Daily)
        self.fast = self.SMA("SPY", 7)
        self.slow = self.SMA("SPY", 20)

        self.capital = 1000000
        self.SetCash(self.capital)
        self.SetWarmup(252)

        self.X = None           # last simulated OU paths (set by OU_process_generator)
        self.num_MC = 1000      # number of time steps per simulated path
        self.iteration = 3      # number of simulated paths
        # The next three values are not read anywhere in this class.
        self.enter = 2
        self.risk_level = 2
        self.exit = 0
        self.lookback = 100     # regression / MLE window, in bars
        self.dt = 1 / self.lookback  # time step shared by the MLE and the simulation
        self.wt = 1 / len(self.pairs_list)  # share of the portfolio given to each pair

        self.z = 0
        self.trading_ou = False  # not read anywhere in this class
        self.symbols_list = []
        for ticker1, ticker2 in self.pairs_list:
            u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
            u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
            self.symbols_list.append([u1, u2])

    def likelihood(self, params, *args):
        """Return the negative average OU log-likelihood of a series.

        Args:
            params: ``(theta, mu, sigma)`` where ``theta`` is the long-run
                mean, ``mu`` the mean-reversion speed and ``sigma`` the
                volatility of the OU process.
            *args: ``(X, dt)`` - the observed series and the time step
                between consecutive observations.

        Returns:
            The negated average log-likelihood, suitable for a minimiser.

        Raises:
            ValueError: if ``X`` has fewer than two observations.
        """
        theta, mu, sigma = params
        X, dt = args
        X = np.asarray(X, dtype=float)

        # The likelihood is built from transitions X[i-1] -> X[i], so the
        # normaliser is the number of transitions, not the number of points.
        n = len(X) - 1
        if n < 1:
            raise ValueError("likelihood needs at least two observations")

        decay = exp(-mu * dt)
        sigma_tilde_squared = (sigma ** 2) * (1 - exp(-2 * mu * dt)) / (2 * mu)

        residuals = X[1:] - X[:-1] * decay - theta * (1 - decay)
        squared_sum = float(np.dot(residuals, residuals))

        loglikelihood = (
            -0.5 * log(2 * math.pi)
            - log(sqrt(sigma_tilde_squared))
            - squared_sum / (2 * n * sigma_tilde_squared)
        )
        return -loglikelihood

    def MLE(self, X, dt, tol=1e-10):
        """Fit OU parameters to ``X`` by minimising ``likelihood``.

        Args:
            X: observed series (NumPy array).
            dt: time step between consecutive observations.
            tol: accepted for backward compatibility; it is not forwarded
                to the optimiser.

        Returns:
            ``(theta, mu, sigma)`` - long-run mean, mean-reversion speed
            and volatility, in that order.
        """
        # The mean is unconstrained; speed and volatility must stay positive.
        bounds = ((None, None), (1e-5, None), (1e-5, None))
        initial_guess = (X.mean(), 1, 1)
        result = so.minimize(
            self.likelihood, initial_guess, args=(X, dt), bounds=bounds
        )
        theta, mu, sigma = result.x
        return theta, mu, sigma

    def OU_process_generator(self, mu, theta, sigma, N, iteration):
        """Simulate OU paths and return their averaged percentiles.

        Note the naming differs from ``likelihood``: here ``mu`` is the
        level the process reverts to and ``theta`` is the reversion speed,
        as the update ``theta * (mu - x) * dt`` shows.

        Args:
            mu: long-run mean of the simulated process.
            theta: mean-reversion speed.
            sigma: volatility.
            N: number of points per path (each path starts at 0).
            iteration: number of independent paths.

        Returns:
            ``[p5, p50, p95]`` - the 5th, 50th and 95th percentiles of
            each path, averaged across paths.

        Side effects:
            Stores the simulated paths, shape ``(iteration, N)``, in
            ``self.X``.
        """
        paths = np.zeros((iteration, N))
        if N > 1:
            # One batched draw replaces a scalar draw per step; rows are
            # paths, columns are time steps.
            shocks = ss.norm.rvs(loc=0, scale=1, size=(iteration, N - 1))
            diffusion = sigma * np.sqrt(self.dt)
            for i in range(1, N):
                previous = paths[:, i - 1]
                paths[:, i] = (
                    previous
                    + theta * (mu - previous) * self.dt
                    + diffusion * shocks[:, i - 1]
                )
        self.X = paths

        # Percentiles are taken per path (axis=1) and then averaged.
        p_5, p_50, p_95 = np.percentile(paths, [5, 50, 95], axis=1).mean(axis=1)
        return [p_5, p_50, p_95]

    def stats(self, symbols):
        """Regress the pair's log prices and test the residual.

        Args:
            symbols: two-element list of symbols; the first is the
                dependent variable, the second the regressor.

        Returns:
            ``[adf, zscore, slope, res]`` where ``adf`` is the result of
            ``adfuller`` on the residual, ``zscore`` is the residual
            divided by its standard deviation, ``slope`` is the regression
            coefficient and ``res`` is the raw residual series.
            Returns ``None`` when the history request yields no rows.

        Side effects:
            Stores the raw history in ``self.df`` and the unstacked close
            prices in ``self.dg``.
        """
        self.df = self.History(symbols, self.lookback)
        if self.df.empty:
            return None
        self.dg = self.df["close"].unstack(level=0)

        ticker1 = str(symbols[0])
        ticker2 = str(symbols[1])
        Y = np.log(self.dg[ticker1])
        X = sm.add_constant(np.log(self.dg[ticker2]))

        results = sm.OLS(Y, X).fit()
        sigma = math.sqrt(results.mse_resid)  # standard deviation of the residual
        # params is label-indexed (constant first, then the regressor), so
        # positional access has to go through .iloc.
        slope = results.params.iloc[1]
        res = results.resid
        zscore = res / sigma
        adf = adfuller(res)
        return [adf, zscore, slope, res]

    def OnData(self, data):
        """Re-fit every pair and place entry/exit orders.

        Args:
            data: the current data slice (not read directly; prices come
                from ``History`` inside ``stats``).
        """
        if self.IsWarmingUp:
            return

        for ticker1, ticker2 in self.pairs_list:
            symbol1 = self.Symbol(ticker1)
            symbol2 = self.Symbol(ticker2)

            stats = self.stats([symbol1, symbol2])
            if stats is None:
                # No history for this pair on this bar; try again next time.
                continue

            zscore = stats[1]
            self.beta = stats[2]
            self.z = zscore.iloc[-1]  # most recent z-score

            # MLE returns (mean, speed, sigma); the generator takes the
            # same three values positionally as (mu, theta, sigma).
            params = self.MLE(zscore.values, self.dt)
            threshold = self.OU_process_generator(
                params[0], params[1], params[2], self.num_MC, self.iteration
            )
            lower, middle, upper = threshold

            # Only open positions when the hedge ratio is in a sane range.
            if 0.5 < self.beta < 5:
                if (not self.Portfolio[symbol1].Invested) and self.z > upper:
                    # Spread is rich: short the first leg, long the second.
                    self.Liquidate('SPY')
                    self.SetHoldings(symbol1, -1 * self.wt / (1 + self.beta))
                    self.SetHoldings(symbol2, self.beta * self.wt / (1 + self.beta))
                if (not self.Portfolio[symbol1].Invested) and self.z < lower:
                    # Spread is cheap: long the first leg, short the second.
                    self.Liquidate('SPY')
                    self.SetHoldings(symbol1, 1 * self.wt / (1 + self.beta))
                    self.SetHoldings(symbol2, -self.beta * self.wt / (1 + self.beta))

            # NOTE(review): the original indentation was lost; this exit
            # check is assumed to run for every pair regardless of the
            # hedge-ratio gate above - confirm.
            short_spread_reverted = (
                self.Portfolio[symbol1].IsShort
                and self.z < (upper + middle) / 4
            )
            long_spread_reverted = (
                self.Portfolio[symbol2].IsShort
                and self.z > (middle + lower) / 4
            )
            if short_spread_reverted or long_spread_reverted:
                self.Liquidate(symbol1)
                self.Liquidate(symbol2)

        # The SPY trend overlay (hold SPY long when the fast SMA is above
        # the slow SMA, short otherwise, whenever the portfolio is flat)
        # is intentionally disabled in this variant.