| Overall Statistics |
|
Total Orders 26 Average Win 0.67% Average Loss -0.32% Compounding Annual Return 27.939% Drawdown 2.100% Expectancy 0.547 Start Equity 50000 End Equity 51045.81 Net Profit 2.092% Sharpe Ratio 2.888 Sortino Ratio 4.977 Probabilistic Sharpe Ratio 74.043% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 2.09 Alpha 0.095 Beta 0.232 Annual Standard Deviation 0.064 Annual Variance 0.004 Information Ratio -2.445 Tracking Error 0.083 Treynor Ratio 0.796 Total Fees $104.00 Estimated Strategy Capacity $560000.00 Lowest Capacity Asset PBCP RH8GLI8VB8TH Portfolio Turnover 29.03% |
#region imports
from AlgorithmImports import *
from itertools import combinations
import statsmodels.tsa.stattools as ts
from pair import Pairs
from symbol_data import SymbolData
from trading_pair import TradingPair
#endregion
class PairsTrading(QCAlgorithm):
_symbols = [
'ING', 'TBC', 'BMA', 'PB', 'FBC', 'STL', 'FCF', 'PFS', 'BOH', 'SCNB',
'BK', 'CMA', 'AF', 'PNC', 'KB', 'SHG', 'BSAC', 'CIB', 'BBD', 'BSBR'
]
_num_bar = 390*21*3
_interval = 10
_pair_num = 10
_leverage = 1
_min_corr_threshold = 0.9
_open_size = 2.32
_close_size = 0.5
_stop_loss_size = 6
def initialize(self):
self.set_start_date(2013, 9, 1)
self.set_end_date(2013, 10, 1)
self.set_cash(50000)
self._symbol_data = {}
self._pair_list = []
self._selected_pair = []
self._trading_pairs = {}
self._regenerate_time = datetime.min
for ticker in self._symbols:
symbol = self.add_equity(ticker, Resolution.MINUTE).symbol
self._symbol_data[symbol] = SymbolData(self, symbol, self._num_bar, self._interval)
for pair in combinations(self._symbol_data.items(), 2):
if pair[0][1].is_ready and pair[1][1].is_ready:
self._pair_list.append(Pairs(pair[0][1], pair[1][1]))
def _generate_pairs(self):
selected_pair = []
for pair in self._pair_list:
# correlation selection
if pair.correlation() < self._min_corr_threshold:
continue
# cointegration selection
coint = pair.cointegration_test()
if coint and pair.stationary_p < 0.05:
selected_pair.append(pair)
if len(selected_pair) == 0:
self.debug('No selected pair')
return []
selected_pair.sort(key = lambda x: x.correlation(), reverse = True)
if len(selected_pair) > self._pair_num:
selected_pair = selected_pair[:self._pair_num]
selected_pair.sort(key = lambda x: x.stationary_p)
return selected_pair
def on_data(self, data):
for symbol, symbolData in self._symbol_data.items():
if data.bars.contains_key(symbol):
symbolData.update(data.bars[symbol])
# generate pairs with correlation and cointegration selection
if self._regenerate_time < self.time:
self._selected_pair = self._generate_pairs()
self._regenerate_time = self.time + timedelta(days=5)
# closing existing position
for pair, trading_pair in self._trading_pairs.copy().items():
# close: if not correlated nor cointegrated anymore
if pair not in self._selected_pair:
self.market_order(pair.a.symbol, -trading_pair.ticket_a.quantity)
self.market_order(pair.b.symbol, -trading_pair.ticket_b.quantity)
self._trading_pairs.pop(pair)
self.debug(f'Close {pair.name}')
continue
# get current cointegrated series deviation from mean
error = pair.a.prices[0].close - (trading_pair.model_intercept + trading_pair.model_slope * pair.b.prices[0].close)
# close: when the cointegrated series is deviated less than 0.5 SD from its mean
if (trading_pair.ticket_a.quantity > 0 and
(error > trading_pair.mean_error - self._close_size * trading_pair.epsilon or
error < trading_pair.mean_error - self._stop_loss_size * trading_pair.epsilon)):
self.market_order(pair.a.symbol, -trading_pair.ticket_a.quantity)
self.market_order(pair.b.symbol, -trading_pair.ticket_b.quantity)
self._trading_pairs.pop(pair)
self.debug(f'Close {pair.name}')
elif (trading_pair.ticket_a.quantity < 0 and
(error < trading_pair.mean_error + self._close_size * trading_pair.epsilon or
error > trading_pair.mean_error + self._stop_loss_size * trading_pair.epsilon)):
self.market_order(pair.a.symbol, -trading_pair.ticket_a.quantity)
self.market_order(pair.b.symbol, -trading_pair.ticket_b.quantity)
self._trading_pairs.pop(pair)
self.debug(f'Close {pair.name}')
# entry: when the cointegrated series is deviated by more than 2.32 SD from its mean
for pair in self._selected_pair:
# get current cointegrated series deviation from mean
price_a = pair.a.prices[0].close
price_b = pair.b.prices[0].close
error = price_a - (pair.model.params[0] + pair.model.params[1] * price_b)
if pair not in self._trading_pairs:
if error < pair.mean_error - self._open_size * pair.epsilon:
qty_a = self.calculate_order_quantity(symbol, self._leverage/self._pair_num / 2)
qty_b = self.calculate_order_quantity(symbol, -self._leverage/self._pair_num / 2)
ticket_a = self.market_order(pair.a.symbol, qty_a)
ticket_b = self.market_order(pair.b.symbol, qty_b)
self._trading_pairs[pair] = TradingPair(ticket_a, ticket_b, pair.model.params[0], pair.model.params[1], pair.mean_error, pair.epsilon)
self.debug(f'Long {qty_a} {pair.a.symbol.value} and short {qty_b} {pair.b.symbol.value}')
elif error > pair.mean_error + self._open_size * pair.epsilon:
qty_a = self.calculate_order_quantity(symbol, -self._leverage/self._pair_num / 2)
qty_b = self.calculate_order_quantity(symbol, self._leverage/self._pair_num / 2)
ticket_a = self.market_order(pair.a.symbol, qty_a)
ticket_b = self.market_order(pair.b.symbol, qty_b)
self._trading_pairs[pair] = TradingPair(ticket_a, ticket_b, pair.model.params[0], pair.model.params[1], pair.mean_error, pair.epsilon)
self.debug(f'Long {qty_b} {pair.b.symbol.value} and short {qty_a} {pair.a.symbol.value}')
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion
class Pairs(object):
def __init__(self, a, b):
self.a = a
self.b = b
self.name = f'{a.symbol.value}:{b.symbol.value}'
self.model = None
self.mean_error = 0
self.epsilon = 0
def _data_frame(self):
df = pd.concat([self.a.data_frame.droplevel([0]), self.b.data_frame.droplevel([0])], axis=1).dropna()
df.columns = [self.a.symbol.value, self.b.symbol.value]
return df
def correlation(self):
return self._data_frame().corr().iloc[0][1]
def cointegration_test(self):
coint_test = coint(self.a.series.values.flatten(), self.b.series.values.flatten(), trend="n", maxlag=0)
# Return if not cointegrated
if coint_test[1] >= 0.05:
return False
self.model = sm.ols(formula = f'{self.a.symbol.value} ~ {self.b.symbol.value}', data=self._data_frame()).fit()
self.stationary_p = adfuller(self.model.resid, autolag = 'BIC')[1]
self.mean_error = np.mean(self.model.resid)
self.epsilon = np.std(self.model.resid)
return True
#region imports
from AlgorithmImports import *
#endregion
class SymbolData(object):
def __init__(self, algorithm, symbol, lookback, interval):
lookback = int(lookback)
self.symbol = symbol
self.prices = RollingWindow[TradeBar](lookback // interval)
self.series = None
self.data_frame = None
self._algorithm = algorithm
self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
self._consolidator.data_consolidated += self._on_data_consolidated
history = algorithm.history(symbol, lookback, Resolution.Minute)
for bar in history.itertuples():
if np.isnan(bar.volume):
continue
trade_bar = TradeBar(bar.Index[1], symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
self.update(trade_bar)
@property
def is_ready(self):
return self.prices.is_ready
def update(self, trade_bar):
self._consolidator.update(trade_bar)
def _on_data_consolidated(self, sender, consolidated):
self.prices.add(consolidated)
if self.is_ready:
self.series = self._algorithm.pandas_converter.get_data_frame[TradeBar](self.prices)['close']
self.data_frame = self.series.to_frame()
#region imports
from AlgorithmImports import *
#endregion
class TradingPair(object):
def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
self.ticket_a = ticket_a
self.ticket_b = ticket_b
self.model_intercept = intercept
self.model_slope = slope
self.mean_error = mean_error
self.epsilon = epsilon