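# Overall Statistics
#
# | Metric                     | Value    |
# |----------------------------|----------|
# | Total Trades               | 4824     |
# | Average Win                | 0.02%    |
# | Average Loss               | -0.01%   |
# | Compounding Annual Return  | -12.729% |
# | Drawdown                   | 12.700%  |
# | Expectancy                 | -0.601   |
# | Net Profit                 | -12.718% |
# | Sharpe Ratio               | -10.248  |
# | Loss Rate                  | 87%      |
# | Win Rate                   | 13%      |
# | Profit-Loss Ratio          | 2.08     |
# | Alpha                      | -0.144   |
# | Beta                       | 0.429    |
# | Annual Standard Deviation  | 0.013    |
# | Annual Variance            | 0        |
# | Information Ratio          | -11.748  |
# | Tracking Error             | 0.013    |
# | Treynor Ratio              | -0.316   |
# | Total Fees                 | $6352.60 |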
import numpy as np
np.warnings.filterwarnings("ignore")
import pandas as pd
import datetime as datetime
import statsmodels.formula.api as sm
import statsmodels.tsa.stattools as ts
np.warnings.resetwarnings()
# Generates all possible pairs from the symbol list.
# Recalculates correlation and cointegration for every pair on each data update.
# On each data update, tries to enter trades for pairs that have good correlation and cointegration and whose deviation has become too large or too small.
# On each data update, tries to exit trades for pairs whose deviation has become too small (reverted) or too large (stop loss).
class PairsTrading(QCAlgorithm):
    """Pairs-trading algorithm over a fixed list of tickers.

    On every ``OnData`` call the algorithm:

    1. feeds the new data to every generated ``Pair`` so it can refresh its
       rolling statistics;
    2. filters the pairs by correlation and then by the ADF statistic of the
       regression residuals;
    3. arms / opens positions for filtered pairs whose spread crossed the
       ``open_size`` band;
    4. closes open pairs whose spread crossed the ``close_size`` band or
       breached the ``stop_loss`` band.

    Step 4 runs on every call, including calls where no pair passes the
    filters, so that open positions are never left unmanaged.
    """

    def __init__(self):
        # Tickers to trade; replaced by the subscribed Symbol objects in Initialize().
        self.symbols = ['CORN', 'SOYB', 'PALL', 'BAC', 'BXS', 'T', 'F', 'GM', 'MSFT', 'AAPL', 'IBM']
        self.maximalNumberOfPairs = 100000
        # length of the rolling price window kept by every pair
        self.numberOfBars = 40
        # correlation selection criteria
        self.minimalCorrelation = 0.9
        # co-integration selection criteria (upper bound for the pair's ADF statistic)
        self.maximalBIC = -3.5
        # number of standard deviations to open
        self.open_size = 2
        # number of standard deviations to close
        self.close_size = 1
        # number of standard deviations at which a position is stopped out
        self.stop_loss = 6
        # pairs that currently have an open position
        self.trading_pairs = []
        # filled in Initialize(); defined here so OnData never sees a missing attribute
        self.generatedPairs = []

    def Initialize(self):
        """Configure the backtest window and cash, subscribe to data, build all pairs."""
        self.SetStartDate(2014, 1, 1)
        self.SetEndDate(2015, 1, 1)
        self.SetCash(50000)
        # Swap every ticker string for the Symbol returned by its subscription.
        self.symbols = [self.AddEquity(ticker, Resolution.Minute).Symbol for ticker in self.symbols]
        self.generatedPairs = self.generatePairs(self.symbols, self.numberOfBars)

    def OnData(self, data):
        """Refresh pair statistics, then run the entry pass and the exit pass.

        data: the data object handed over by the engine; it is forwarded
        unchanged to ``Pair.update``.
        """
        # update indicators in pairs
        for pair in self.generatedPairs:
            pair.update(data)

        selectedPairs = self._selectTradablePairs()
        if selectedPairs:
            self.Log('selectPairsByCointegration pairs= %s' % str(len(selectedPairs)))
            # select top maximalNumberOfPairs pairs
            if len(selectedPairs) > self.maximalNumberOfPairs:
                selectedPairs = selectedPairs[:self.maximalNumberOfPairs]
            self._enterTrades(selectedPairs)

        # Exits must not depend on the entry filter: a position has to be
        # closed even on bars where no pair qualifies for a new entry.
        self._exitTrades()

    def _selectTradablePairs(self):
        """Return the pairs passing the correlation and then the cointegration filter."""
        if not self.generatedPairs:
            return []
        candidates = self.selectPairsByCorrelation(
            self.generatedPairs, self.minimalCorrelation, self.maximalNumberOfPairs)
        if not candidates:
            return []
        return self.selectPairsByCointegration(
            candidates, self.maximalBIC, self.maximalNumberOfPairs)

    def _enterTrades(self, selectedPairs):
        """Arm pairs whose spread left the open band; open them once it re-enters.

        ``pair.touch`` is a small state machine: 0 = idle, -1 = spread dropped
        below the lower band, 1 = spread rose above the upper band. A position
        is opened only when an armed pair crosses back inside the same band.
        """
        # every entry gets the same share of a 20% budget
        weight = 0.2 / len(selectedPairs)
        for pair in selectedPairs:
            band = self.open_size * pair.standardDeviation
            lower = pair.mean_error - band
            upper = pair.mean_error + band
            if pair.touch == 0:
                if pair.error < lower and pair.last_error > lower:
                    pair.touch += -1
                elif pair.error > upper and pair.last_error < upper:
                    pair.touch += 1
            elif pair.touch == -1:
                # spread came back up through the lower band: a is cheap relative to b
                if pair.error > lower and pair.last_error < lower:
                    self._openPosition(pair, pair.a, pair.b, weight)
            elif pair.touch == 1:
                # spread came back down through the upper band: b is cheap relative to a
                if pair.error < upper and pair.last_error > upper:
                    self._openPosition(pair, pair.b, pair.a, weight)

    def _openPosition(self, pair, longSymbol, shortSymbol, weight):
        """Snapshot the pair's current statistics and request the long/short holdings."""
        self.Log('long %s and short %s' % (str(longSymbol), str(shortSymbol)))
        # The exit bands are evaluated against the statistics at entry time.
        pair.record_model = pair.model
        pair.record_mean_error = pair.mean_error
        pair.record_sd = pair.standardDeviation
        # A pair that re-enters while still tracked must not be listed twice.
        if pair not in self.trading_pairs:
            self.trading_pairs.append(pair)
        self.SetHoldings(longSymbol, weight)
        self.SetHoldings(shortSymbol, -weight)
        pair.touch = 0

    def _exitTrades(self):
        """Liquidate open pairs that reverted inside the close band or hit the stop loss."""
        # Build the surviving list instead of removing from the list being
        # iterated, which would skip the element following every removal.
        stillOpen = []
        for pair in self.trading_pairs:
            closeBand = self.close_size * pair.record_sd
            upperClose = pair.record_mean_error + closeBand
            lowerClose = pair.record_mean_error - closeBand
            revertedFromAbove = pair.error < upperClose and pair.last_error > upperClose
            revertedFromBelow = pair.error > lowerClose and pair.last_error < lowerClose

            stopBand = self.stop_loss * pair.record_sd
            stoppedOut = (pair.error < pair.record_mean_error - stopBand
                          or pair.error > pair.record_mean_error + stopBand)

            if revertedFromAbove or revertedFromBelow:
                self.Log('close %s' % str(pair.name))
            elif stoppedOut:
                self.Log('close %s to stop loss' % str(pair.name))
            else:
                stillOpen.append(pair)
                continue
            self.Liquidate(pair.a)
            self.Liquidate(pair.b)
        self.trading_pairs = stillOpen

    def generatePairs(self, equities, numberOfBars):
        """Return a ``Pair`` for every unordered combination of two entries of ``equities``.

        equities: list of symbols; numberOfBars: window length passed to each Pair.
        """
        generatedPairs = []
        for position, first in enumerate(equities):
            for second in equities[position + 1:]:
                generatedPairs.append(Pair(self, first, second, numberOfBars))
        self.Log('generated pairs= %s' % str(len(generatedPairs)))
        return generatedPairs

    def selectPairsByCorrelation(self, pairs, minimalCorrelation, maximalNumberOfPairs):
        """Return ready pairs with correlation > minimalCorrelation, highest first.

        ``maximalNumberOfPairs`` is accepted for interface compatibility but not used here.
        """
        selectedPairs = [pair for pair in pairs if pair.isReady() and pair.correlation > minimalCorrelation]
        # sort pairs by correlation, descending
        selectedPairs.sort(key=lambda pair: pair.correlation, reverse=True)
        return selectedPairs

    def selectPairsByCointegration(self, pairs, maximalBIC, maximalNumberOfPairs):
        """Return ready pairs whose ADF statistic is < maximalBIC, most negative first.

        ``maximalNumberOfPairs`` is accepted for interface compatibility but not used here.
        """
        selectedPairs = [pair for pair in pairs if pair.isReady() and pair.adf < maximalBIC]
        # sort pairs by co-integration ADF, ascending
        selectedPairs.sort(key=lambda pair: pair.adf)
        return selectedPairs
class Pair(object):
    """Rolling price window and spread statistics for two symbols ``a`` and ``b``.

    The pair keeps the last ``numberOfBars`` closing prices of both symbols in
    ``self.df``. Once the window is full, every update recomputes the
    correlation, regresses ``a`` on ``b`` and derives the ADF statistic, mean
    and standard deviation of the residuals, plus the current residual
    (``error``) and the previous one (``last_error``).
    """

    def __init__(self, algorithm, a, b, numberOfBars):
        # pair: stock a, stock b
        self.algorithm = algorithm
        self.a = a
        self.b = b
        # keep numberOfBars data points
        self.numberOfBars = numberOfBars
        # name of pair
        self.name = str(a) + ':' + str(b)
        # current and previous regression residual
        self.error = 0
        self.last_error = 0
        # DataFrame with one column of prices per symbol, or None while empty/reset
        self.df = None
        # entry state: 0 idle, -1 / 1 armed below / above the open band
        self.touch = 0
        # statistics; populated by update() once the window is full
        self.correlation = None
        self.model = None
        self.adf = None
        self.mean_error = None
        self.standardDeviation = None

    def calculateCorrelation(self):
        """Store the correlation between the two price columns in ``self.correlation``."""
        # .iloc replaces the removed DataFrame.ix accessor; [0, 1] is the
        # off-diagonal entry of the 2x2 correlation matrix.
        self.correlation = self.df.corr().iloc[0, 1]

    def calculateCointegration(self):
        """Regress a on b and store the model, ADF statistic, residual mean and std."""
        # Regress on fixed column aliases so the formula stays valid whatever
        # the textual form of the symbols is.
        frame = pd.DataFrame({
            'y': self.df[str(self.a)].values,
            'x': self.df[str(self.b)].values,
        })
        self.model = sm.ols(formula='y ~ x', data=frame).fit()
        residuals = self.model.resid
        self.adf = ts.adfuller(residuals, autolag='BIC')[0]
        self.mean_error = np.mean(residuals)
        self.standardDeviation = np.std(residuals)

    def isReady(self):
        """Return True when the window holds exactly ``numberOfBars`` rows."""
        return self.df is not None and len(self.df) == self.numberOfBars

    def update(self, data):
        """Append the latest closes of both symbols and refresh the statistics.

        data: object offering ``ContainsKey(symbol)`` and ``data[symbol]``,
        whose items expose ``Close`` and ``EndTime``.

        If either symbol is missing, or a close cannot be converted to float,
        the whole window is discarded and the pair has to refill from scratch.
        """
        if not data.ContainsKey(self.a) or not data.ContainsKey(self.b):
            self.df = None
            return
        data_a = data[self.a]
        data_b = data[self.b]
        try:
            priceOfStockA = float(data_a.Close)
            priceOfStockB = float(data_b.Close)
        except Exception:
            # Best effort: any unusable bar resets the window instead of failing the algorithm.
            self.df = None
            return

        # One-row frame for this bar; dropna() discards it when a price is NaN.
        new_df = pd.DataFrame(
            {str(self.a): [priceOfStockA], str(self.b): [priceOfStockB]},
            index=[data_a.EndTime],
        ).dropna()
        if self.df is None:
            self.df = new_df
        else:
            # concatenate existing DataFrame with new data
            self.df = pd.concat([self.df, new_df])
        # keep numberOfBars of data points
        self.df = self.df.tail(self.numberOfBars)

        if self.isReady():
            self.calculateCorrelation()
            self.calculateCointegration()
            self.last_error = self.error
            # params are [intercept, slope]; read them by position
            params = self.model.params
            self.error = priceOfStockA - (params.iloc[0] + params.iloc[1] * priceOfStockB)