Overall Statistics: Total Trades: 26 | Average Win: 0.65% | Average Loss: -0.31% | Compounding Annual Return: 26.924% | Drawdown: 2.100% | Expectancy: 0.544 | Net Profit: 2.023% | Sharpe Ratio: 3.011 | Probabilistic Sharpe Ratio: 74.391% | Loss Rate: 50% | Win Rate: 50% | Profit-Loss Ratio: 2.09 | Alpha: 0.093 | Beta: 0.232 | Annual Standard Deviation: 0.062 | Annual Variance: 0.004 | Information Ratio: -2.585 | Tracking Error: 0.081 | Treynor Ratio: 0.798 | Total Fees: $96.59 | Estimated Strategy Capacity: $600000.00 | Lowest Capacity Asset: PBCP RH8GLI8VB8TH
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
#endregion

class Pairs(object):
    """Candidate pair of two assets together with its fitted spread model.

    ``a`` and ``b`` must expose the attributes read below: ``Symbol.Value``,
    ``Series`` and ``DataFrame``.
    """

    def __init__(self, a, b):
        """Store both legs and reset every model statistic.

        Args:
            a: Data holder for the first leg (dependent variable of the OLS fit).
            b: Data holder for the second leg (regressor of the OLS fit).
        """
        self.a = a
        self.b = b
        self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'

        # Filled in by cointegration_test() when the pair passes the test.
        self.Model = None
        # Initialised here so the attribute exists even before the first
        # successful cointegration_test() call.
        self.StationaryP = None
        self.MeanError = 0
        self.StandardDeviation = 0
        self.Epsilon = 0

    @property
    def DataFrame(self):
        """Return both legs as columns of one frame, keeping only shared rows.

        The first index level of each leg's frame is dropped before joining,
        and the columns are renamed to the two ticker strings.
        """
        frames = [leg.DataFrame.droplevel([0]) for leg in (self.a, self.b)]
        df = pd.concat(frames, axis=1).dropna()
        df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
        return df

    @property
    def Correlation(self):
        """Return the correlation between the two aligned columns."""
        # Single positional lookup; the chained ``.iloc[0][1]`` form relies on
        # integer keys being treated as positions on a label-indexed Series.
        return self.DataFrame.corr().iloc[0, 1]

    def cointegration_test(self):
        """Test the pair for cointegration and, on success, fit the spread model.

        Returns:
            bool: ``False`` when the cointegration p-value is >= 0.05 (model
            attributes are left untouched); ``True`` after ``Model``,
            ``StationaryP``, ``MeanError`` and ``Epsilon`` have been updated.
        """
        # Imported locally because this file's import region does not bring
        # these two names into scope.
        from statsmodels.tsa.stattools import adfuller, coint

        coint_test = coint(
            self.a.Series.values.flatten(),
            self.b.Series.values.flatten(),
            trend="n",
            maxlag=0,
        )

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        formula = f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}'
        self.Model = sm.ols(formula=formula, data=self.DataFrame).fit()

        residuals = self.Model.resid
        self.StationaryP = adfuller(residuals, autolag='BIC')[1]
        self.MeanError = np.mean(residuals)
        self.Epsilon = np.std(residuals)

        return True
#region imports
from AlgorithmImports import *
#endregion

class SymbolData(object):
    """Per-symbol store of consolidated bars and their pandas views.

    NOTE(review): several statements of this class were missing. The parts
    marked "reconstructed" below were rebuilt from how the class is used
    elsewhere in this file (``Update(bar)``, ``Prices[0].Close``,
    ``Series.values`` and ``DataFrame.droplevel([0])``) -- confirm against
    the original implementation.
    """

    def __init__(self, algorithm, symbol, lookback, interval):
        """Create the consolidator and warm it up from history.

        Args:
            algorithm: Algorithm instance used for the history request.
            symbol: Symbol this object tracks.
            lookback: Number of minute-resolution history bars to request.
            interval: Consolidation period; assumed to be in minutes because
                the history feeding it is minute resolution -- TODO confirm.
        """
        lookback = int(lookback)
        self.Symbol = symbol
        # Reconstructed: newest-first window of consolidated bars. Sized in
        # consolidated bars so the warm-up history can fill it.
        self.Prices = RollingWindow[TradeBar](max(1, lookback // int(interval)))
        self.Series = None
        self.DataFrame = None

        self._algorithm = algorithm
        # Reconstructed: the consolidator was subscribed to without ever
        # being created.
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.DataConsolidated += self.OnDataConsolidated

        history = algorithm.History(symbol, lookback, Resolution.Minute)
        for bar in history.itertuples():
            # Reconstructed loop body: replay each history row through the
            # consolidator. Assumes a (symbol, time) MultiIndex and lowercase
            # OHLCV columns on the history frame -- TODO confirm.
            trade_bar = TradeBar(
                bar.Index[1], symbol,
                bar.open, bar.high, bar.low, bar.close, bar.volume,
                timedelta(minutes=1),
            )
            self.Update(trade_bar)

    @property
    def IsReady(self):
        """Return whether the rolling window has been completely filled.

        Reconstructed: a ``@property`` decorator was present but its target
        was missing; the name is an assumption.
        """
        return self.Prices.IsReady

    def Update(self, trade_bar):
        """Feed one raw bar into the consolidator (reconstructed; called from OnData)."""
        self._consolidator.Update(trade_bar)

    def OnDataConsolidated(self, sender, consolidated):
        """Consolidator event handler: store the bar and refresh the pandas views.

        Args:
            sender: Event source (unused).
            consolidated: The newly consolidated bar.
        """
        self.Prices.Add(consolidated)
        # Reconstructed: ``Series`` has to be rebuilt before ``to_frame()``;
        # it was otherwise still ``None`` at this point.
        frame = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)
        self.Series = frame['close']
        self.DataFrame = self.Series.to_frame()
#region imports
from AlgorithmImports import *
#endregion

def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
    """Store the two tickets and the model snapshot passed in by the caller.

    Args:
        ticket_a: Ticket for the first leg.
        ticket_b: Ticket for the second leg.
        intercept: Stored as ``model_intercept``.
        slope: Stored as ``model_slope``.
        mean_error: Stored as ``mean_error``.
        epsilon: Stored as ``epsilon``.
    """
    self.ticket_a = ticket_a
    self.ticket_b = ticket_b

    self.model_intercept = intercept
    self.model_slope = slope

    self.mean_error = mean_error
    self.epsilon = epsilon
#region imports
from AlgorithmImports import *
from Pair import *
from SymbolData import *

from itertools import combinations
import statsmodels.tsa.stattools as ts
#endregion

# Tickers iterated by Initialize to build one SymbolData per entry.
symbols = [
    'ING', 'TBC', 'BMA', 'PB', 'FBC',
    'STL', 'FCF', 'PFS', 'BOH', 'SCNB',
    'BK', 'CMA', 'AF', 'PNC', 'KB',
    'SHG', 'BSAC', 'CIB', 'BBD', 'BSBR',
]

# Passed to SymbolData as `lookback`.
# Presumably 390 minutes/session x 21 sessions x 3 months -- confirm.
num_bar = 390 * 21 * 3

# Passed to SymbolData as `interval`; unit not visible here (presumably minutes).
interval = 10

# Cap on the number of selected pairs; also divides the per-leg position size.
pair_num = 10

# Numerator of the per-leg position size.
leverage = 1

# Pairs whose correlation is below this value are skipped in GeneratePairs.
min_corr_threshold = 0.9

# Entry threshold: multiple of the residual standard deviation (Epsilon).
open_size = 2.32

# Presumably the exit threshold in standard deviations; its use is not visible here.
close_size = 0.5

# Presumably the stop-loss threshold in standard deviations; its use is not visible here.
stop_loss_size = 6
def Initialize(self):
    """Configure the backtest, subscribe every ticker and build all candidate pairs."""
    self.SetStartDate(2013, 9, 1)
    self.SetEndDate(2013, 10, 1)
    self.SetCash(50000)

    self.symbol_data = {}
    self.pair_list = []
    self.selected_pair = []
    # datetime.min makes the first OnData call trigger pair generation.
    self.regenerate_time = datetime.min

    for ticker in self.symbols:
        # Subscribe to the ticker to obtain the Symbol used as dictionary key;
        # the loop previously referenced `symbol` without defining it.
        symbol = self.AddEquity(ticker, Resolution.Minute).Symbol
        self.symbol_data[symbol] = SymbolData(self, symbol, self.num_bar, self.interval)

    # Every unordered combination of two tracked symbols is a candidate pair.
    self.pair_list.extend(
        Pairs(first, second)
        for first, second in combinations(self.symbol_data.values(), 2)
    )

def GeneratePairs(self):
    """Select the pairs that pass the correlation and cointegration filters.

    Returns:
        list: At most ``self.pair_num`` pairs -- those with the highest
        correlation among the pairs that pass both filters -- ordered by
        ascending ``StationaryP``. An empty list (after logging
        'No selected pair') when nothing qualifies.
    """
    candidates = []
    for pair in self.pair_list:
        # correlation selection; the property rebuilds a DataFrame on every
        # access, so read it once and keep the value for ranking below
        correlation = pair.Correlation
        if correlation < self.min_corr_threshold:
            continue

        # cointegration selection
        is_cointegrated = pair.cointegration_test()
        if is_cointegrated and pair.StationaryP < 0.05:
            candidates.append((correlation, pair))

    if not candidates:
        self.Debug('No selected pair')
        return []

    # Keep the most correlated pairs (stable for ties), then order the
    # survivors by the stationarity p-value of their residuals.
    candidates.sort(key=lambda item: item[0], reverse=True)
    selected_pair = [pair for _, pair in candidates[:self.pair_num]]
    selected_pair.sort(key=lambda pair: pair.StationaryP)

    return selected_pair

def OnData(self, data):
# Data handler: forwards new bars to each SymbolData, re-selects pairs once
# `regenerate_time` has passed, and opens positions on the selected pairs.
# NOTE(review): this block has lost its indentation and several statements
# appear to be missing (see the notes below); it may also continue beyond the
# last line shown here.
for symbol, symbolData in self.symbol_data.items():
if data.Bars.ContainsKey(symbol):
symbolData.Update(data.Bars[symbol])

# generate pairs with correlation and cointegration selection
# (the next regeneration is scheduled five days after the current one)
if self.regenerate_time < self.Time:
self.selected_pair = self.GeneratePairs()
self.regenerate_time = self.Time + timedelta(days=5)

# closing existing position
# NOTE(review): `pair` is read and `continue` is used below without any
# visible enclosing loop, and no order is placed next to the three 'Close'
# messages -- the loop header, the deviation calculation and the closing
# orders are not visible here; confirm against the original.
# close: if not correlated nor cointegrated anymore
if pair not in self.selected_pair:
self.Debug(f'Close {pair.Name}')
continue

# get current cointegrated series deviation from mean

# close: when the cointegrated series is deviated less than 0.5 SD from its mean
self.Debug(f'Close {pair.Name}')

self.Debug(f'Close {pair.Name}')

# entry: when the cointegrated series is deviated by more than 2.32 SD from its mean
for pair in self.selected_pair:
# get current cointegrated series deviation from mean
# error = price_a - (params[0] + params[1] * price_b), using element 0 of each
# leg's Prices (presumably the most recent bar -- confirm)
price_a = pair.a.Prices[0].Close
price_b = pair.b.Prices[0].Close
error = price_a - (pair.Model.params[0] + pair.Model.params[1] * price_b)

# enter when the error is more than open_size * Epsilon below MeanError
if error < pair.MeanError - self.open_size * pair.Epsilon:
# NOTE(review): `symbol` is the variable left over from the first loop in this
# method, not pair.a.Symbol / pair.b.Symbol, so both quantities look like they
# are sized against an unrelated symbol -- confirm.
qty_a = self.CalculateOrderQuantity(symbol, self.leverage/self.pair_num / 2)
qty_b = self.CalculateOrderQuantity(symbol, -self.leverage/self.pair_num / 2)
ticket_a = self.MarketOrder(pair.a.Symbol, qty_a)
ticket_b = self.MarketOrder(pair.b.Symbol, qty_b)

# NOTE(review): qty_a uses a positive target and qty_b a negative one, while
# the message reports "Long ... b and short ... a" -- the wording looks
# inverted; confirm.
self.Debug(f'Long {qty_b} {pair.b.Symbol.Value} and short {qty_a} {pair.a.Symbol.Value}')