| Overall Statistics |
|
Total Trades 470 Average Win 0.37% Average Loss -0.36% Compounding Annual Return 4.283% Drawdown 3.100% Expectancy 0.191 Net Profit 18.235% Sharpe Ratio 1.226 Probabilistic Sharpe Ratio 68.852% Loss Rate 41% Win Rate 59% Profit-Loss Ratio 1.03 Alpha 0.029 Beta 0.009 Annual Standard Deviation 0.024 Annual Variance 0.001 Information Ratio -0.553 Tracking Error 0.167 Treynor Ratio 3.448 Total Fees $1824.02 Estimated Strategy Capacity $130000000.00 Lowest Capacity Asset PM U1EP4KZP5ROL Portfolio Turnover 2.02% |
#region imports
from AlgorithmImports import *
#endregion
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
import scipy.optimize as so
import scipy.integrate as si
import scipy.stats as ss
from math import log,exp,sqrt
import matplotlib.pyplot as plt
from scipy.integrate import quad
class PairsTradingAlgorithm(QCAlgorithm):
def Initialize(self):
#if True:
self.pairs_list = [
['JNJ', 'ABBV'],
['DUK', 'AEP'],
['NKE', 'SBUX'],
['SPGI', 'MA'],
['DLR', 'CCI'],
['PM','PG'],
['TMO', 'UNH'],
['COP', 'EOG']]
#['ADBE', 'MSFT'],
#['SRE','AEP']]
self.SetStartDate(2017, 1, 1)
self.SetEndDate(2021,1,1)
self.AddEquity("SPY", Resolution.Daily)
self.fast = self.SMA("SPY", 7)
#self.med = self.SMA("SPY", 14)
self.slow = self.SMA("SPY",20)
self.capital = 1000000
self.SetCash(self.capital)
self.SetWarmup(252)
self.X = None
self.num_MC = 1000
self.iteration = 3
self.enter = 2 # Set the enter threshold
self.risk_level = 2
self.exit = 0 # Set the exit threshold
self.lookback = 100 # Set the loockback period 90 days
self.dt = 1/self.lookback
self.wt = 1/len(self.pairs_list)
#new code below for list of pairs
self.z = 0
self.trading_ou = False
self.symbols_list =[]
for ticker1, ticker2 in self.pairs_list:
u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
self.symbols_list.append([self.Symbol(ticker1),self.Symbol(ticker2)])
def likelihood(self,params,*args):
theta, mu, sigma = params
X,dt = args
n= len(X)
sigma_tilde_squared = (sigma ** 2) * (1- exp(-2 * mu * dt))/(2 * mu)
Sum = 0
for i in range(1,len(X)):
Sum = Sum + (X[i] - X[i -1] * exp(-mu * dt) - theta*(1-exp(-mu * dt)))**2
Sum = -Sum / (2*n*sigma_tilde_squared)
loglikelihood = -0.5 * log(2 * math.pi) - log(sqrt(sigma_tilde_squared)) + Sum
return -loglikelihood
def MLE(self,X,dt,tol = 1e-10):
bounds = ((None,None),(1e-5,None),(1e-5,None)) # bondary for OU parameters
theta_init = X.mean()
initial_guess = (theta_init,1,1)
result = so.minimize(self.likelihood,initial_guess,args = (X,dt),bounds = bounds)
theta,mu,sigma = result.x
return theta,mu,sigma
def OU_process_generator(self,mu,theta,sigma,N,iteration):
self.X = np.zeros((iteration,N))
p_5 = 0
p_50 = 0
p_95 = 0
for j in range(iteration):
for i in range(1,N):
W = ss.norm.rvs( loc=0, scale=1, size = 1)
self.X[j,i] = self.X[j,i-1] + theta*(mu - self.X[j,i-1]) * self.dt + sigma * np.sqrt(self.dt) * W
for i in range(iteration):
p_5 = p_5 + np.percentile(self.X[i],5)
p_50 = p_50 + np.percentile(self.X[i],50)
p_95 = p_95 + np.percentile(self.X[i],95)
return [p_5/iteration,p_50/iteration,p_95/iteration]
def stats(self, symbols):
#Use Statsmodels package to compute linear regression and ADF statistics
self.df = self.History(symbols, self.lookback)
self.dg = self.df["close"].unstack(level=0)
#self.Debug(self.dg)
ticker1= str(symbols[0])
ticker2= str(symbols[1])
Y = self.dg[ticker1].apply(lambda x: math.log(x))
X = self.dg[ticker2].apply(lambda x: math.log(x))
#self.Debug(f"Now regressing {ticker1} {ticker2}")
X = sm.add_constant(X)
model = sm.OLS(Y,X)
results = model.fit()
sigma = math.sqrt(results.mse_resid) # standard deviation of the residual
slope = results.params[1]
intercept = results.params[0]
res = results.resid #regression residual mean of res =0 by definition
zscore = res/sigma
adf = adfuller (res)
return [adf, zscore, slope, res]
def OnData(self, data):
if self.IsWarmingUp:
return
for pairs in self.pairs_list:
stats = self.stats([self.Symbol(pairs[0]),self.Symbol(pairs[1])])
self.beta = stats[2]
self.z= stats[1][-1]
res = stats[3]
#self.Debug(stats[1].values)
params = self.MLE(stats[1].values,self.dt)
#self.Debug(params)
threshold = self.OU_process_generator(params[0],params[1],params[2],self.num_MC,self.iteration)
#self.Debug(threshold)
#self.Debug(self.wt)
#self.Debug( 1 * self.wt/(1+self.beta))
#self.Debug( 1 * -self.beta * self.wt/(1+self.beta))
#self.Debug(self.beta)
#self.Debug(stats[0])
#self.Debug(self.Portfolio[self.Symbol(pairs[0])].HoldingsValue)
#self.Debug(self.Portfolio[self.Symbol(pairs[1])].HoldingsValue)
#self.Debug('z-score: '+ str(self.z))
if 0.5 <self.beta < 5:
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z > threshold[2]:
self.Liquidate('SPY')
self.SetHoldings(pairs[0], - 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], self.beta * self.wt/(1+self.beta))
if (not self.Portfolio[self.Symbol(pairs[0])].Invested) and self.z < threshold[0]:
self.Liquidate('SPY')
self.SetHoldings(pairs[0], 1 * self.wt/(1+self.beta))
self.SetHoldings(pairs[1], -self.beta * self.wt/(1+self.beta))
if (self.Portfolio[self.Symbol(pairs[0])].IsShort and self.z < (threshold[2]+threshold[1])/4) or (self.Portfolio[self.Symbol(pairs[1])].IsShort and self.z > (threshold[1]+threshold[0])/4) :
self.Liquidate(pairs[0])
self.Liquidate(pairs[1])
l = self.fast.Current.Value > self.slow.Current.Value
#if self.Portfolio.TotalHoldingsValue == 0:
#if l:
#self.SetHoldings("SPY", 1)
#else:
# self.SetHoldings("SPY", -1)