Overall Statistics
Total Orders
26
Average Win
0.67%
Average Loss
-0.32%
Compounding Annual Return
27.939%
Drawdown
2.100%
Expectancy
0.547
Start Equity
50000
End Equity
51045.81
Net Profit
2.092%
Sharpe Ratio
2.888
Sortino Ratio
4.977
Probabilistic Sharpe Ratio
74.043%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
2.09
Alpha
0.095
Beta
0.232
Annual Standard Deviation
0.064
Annual Variance
0.004
Information Ratio
-2.445
Tracking Error
0.083
Treynor Ratio
0.796
Total Fees
$104.00
Estimated Strategy Capacity
$560000.00
Lowest Capacity Asset
PBCP RH8GLI8VB8TH
Portfolio Turnover
29.03%
#region imports
from AlgorithmImports import *

from itertools import combinations
import statsmodels.tsa.stattools as ts

from pair import Pairs
from symbol_data import SymbolData
from trading_pair import TradingPair
#endregion


class PairsTrading(QCAlgorithm):
    """Pairs-trading algorithm over a fixed universe of equities on minute data.

    Every 5 days the candidate pairs are re-screened: close-price correlation,
    then a cointegration test plus an ADF test on the OLS residual (see
    ``_generate_pairs``). For each selected pair the spread
    ``error = price_a - (intercept + slope * price_b)`` is compared with the
    residual mean / standard deviation of the fit:

    * entry when the spread is more than ``_open_size`` SDs from its mean
      (long A / short B below the mean, short A / long B above it);
    * exit when it reverts to within ``_close_size`` SDs, moves past
      ``_stop_loss_size`` SDs, or the pair is no longer selected.
    """

    _symbols = [
        'ING', 'TBC', 'BMA', 'PB', 'FBC', 'STL', 'FCF', 'PFS', 'BOH', 'SCNB',
        'BK', 'CMA', 'AF', 'PNC', 'KB', 'SHG', 'BSAC', 'CIB', 'BBD', 'BSBR'
    ]
    # Lookback in minute bars; presumably 390 minutes/session * 21 sessions * 3 months — confirm.
    _num_bar = 390*21*3
    _interval = 10              # minutes per consolidated bar
    _pair_num = 10              # max number of pairs selected / traded at once
    _leverage = 1               # total target gross weight, split evenly across pairs and legs
    _min_corr_threshold = 0.9   # minimum close-price correlation for a candidate pair
    _open_size = 2.32           # entry threshold, in residual SDs
    _close_size = 0.5           # mean-reversion exit threshold, in residual SDs
    _stop_loss_size = 6         # stop-loss threshold, in residual SDs

    def initialize(self):
        """Configure the backtest, warm up per-symbol data and enumerate candidate pairs."""
        self.set_start_date(2013, 9, 1)
        self.set_end_date(2013, 10, 1)
        self.set_cash(50000)

        self._symbol_data = {}
        self._pair_list = []
        self._selected_pair = []
        self._trading_pairs = {}
        # datetime.min forces a pair (re)generation on the first on_data call.
        self._regenerate_time = datetime.min

        for ticker in self._symbols:
            symbol = self.add_equity(ticker, Resolution.MINUTE).symbol
            self._symbol_data[symbol] = SymbolData(self, symbol, self._num_bar, self._interval)

        # Only symbols whose window is already full after the SymbolData history
        # warm-up can form pairs; this candidate list is not rebuilt later.
        for data_a, data_b in combinations(self._symbol_data.values(), 2):
            if data_a.is_ready and data_b.is_ready:
                self._pair_list.append(Pairs(data_a, data_b))

    def _generate_pairs(self):
        """Return up to ``_pair_num`` tradable pairs, best ADF p-value first.

        A pair qualifies when its correlation is at least
        ``_min_corr_threshold``, it passes ``cointegration_test`` and the ADF
        p-value of its residual is below 0.05. Among qualifying pairs the
        ``_pair_num`` most correlated are kept. An empty list is returned
        (and logged) when nothing qualifies.
        """
        candidates = []
        for pair in self._pair_list:
            corr = pair.correlation()
            # Written as `not (corr >= x)` so a NaN correlation (e.g. a flat
            # price series) is rejected; `corr < x` is False for NaN.
            if not (corr >= self._min_corr_threshold):
                continue

            if pair.cointegration_test() and pair.stationary_p < 0.05:
                candidates.append((corr, pair))

        if not candidates:
            self.debug('No selected pair')
            return []

        # Keep the most correlated pairs (correlation computed once above),
        # then rank the survivors by residual stationarity.
        candidates.sort(key=lambda item: item[0], reverse=True)
        selected_pair = [pair for _, pair in candidates[:self._pair_num]]
        selected_pair.sort(key=lambda pair: pair.stationary_p)
        return selected_pair

    def on_data(self, data):
        """Feed new minute bars, refresh the selection when due, then handle exits before entries."""
        for symbol, symbol_data in self._symbol_data.items():
            if data.bars.contains_key(symbol):
                symbol_data.update(data.bars[symbol])

        # generate pairs with correlation and cointegration selection
        if self._regenerate_time < self.time:
            self._selected_pair = self._generate_pairs()
            self._regenerate_time = self.time + timedelta(days=5)

        self._manage_open_pairs()
        self._enter_new_pairs()

    def _manage_open_pairs(self):
        """Close open pairs that were deselected, have mean-reverted, or hit the stop loss."""
        # Iterate over a snapshot because _close_pair removes entries.
        for pair, trading_pair in list(self._trading_pairs.items()):
            if pair not in self._selected_pair:
                # no longer correlated nor cointegrated
                self._close_pair(pair, trading_pair)
                continue

            # Current spread under the model frozen at entry time.
            error = pair.a.prices[0].close - (
                trading_pair.model_intercept + trading_pair.model_slope * pair.b.prices[0].close)
            mean = trading_pair.mean_error
            band = trading_pair.epsilon
            quantity_a = trading_pair.ticket_a.quantity

            if quantity_a > 0:
                # Long A / short B was opened below the mean.
                should_close = (error > mean - self._close_size * band or
                                error < mean - self._stop_loss_size * band)
            elif quantity_a < 0:
                # Short A / long B was opened above the mean.
                should_close = (error < mean + self._close_size * band or
                                error > mean + self._stop_loss_size * band)
            else:
                should_close = False

            if should_close:
                self._close_pair(pair, trading_pair)

    def _enter_new_pairs(self):
        """Open every selected, not-yet-held pair whose spread is beyond ``_open_size`` SDs."""
        for pair in self._selected_pair:
            if pair in self._trading_pairs:
                continue

            # .iloc: the params Series is labelled (intercept, regressor name),
            # so plain [0]/[1] relied on deprecated positional fallback.
            intercept = pair.model.params.iloc[0]
            slope = pair.model.params.iloc[1]
            error = pair.a.prices[0].close - (intercept + slope * pair.b.prices[0].close)

            if error < pair.mean_error - self._open_size * pair.epsilon:
                qty_a, qty_b = self._open_pair(pair, intercept, slope, 1)
                self.debug(f'Long {qty_a} {pair.a.symbol.value} and short {qty_b} {pair.b.symbol.value}')
            elif error > pair.mean_error + self._open_size * pair.epsilon:
                qty_a, qty_b = self._open_pair(pair, intercept, slope, -1)
                self.debug(f'Long {qty_b} {pair.b.symbol.value} and short {qty_a} {pair.a.symbol.value}')

    def _open_pair(self, pair, intercept, slope, direction):
        """Send market orders for both legs of ``pair`` and record the open position.

        ``direction`` is +1 for long A / short B and -1 for short A / long B.
        Returns the ``(qty_a, qty_b)`` order quantities.
        """
        weight = direction * self._leverage / self._pair_num / 2
        # Size each leg off its own symbol (previously a stale loop variable was used).
        qty_a = self.calculate_order_quantity(pair.a.symbol, weight)
        qty_b = self.calculate_order_quantity(pair.b.symbol, -weight)
        ticket_a = self.market_order(pair.a.symbol, qty_a)
        ticket_b = self.market_order(pair.b.symbol, qty_b)

        self._trading_pairs[pair] = TradingPair(
            ticket_a, ticket_b, intercept, slope, pair.mean_error, pair.epsilon)
        return qty_a, qty_b

    def _close_pair(self, pair, trading_pair):
        """Reverse both entry orders of ``pair`` and stop tracking it."""
        self.market_order(pair.a.symbol, -trading_pair.ticket_a.quantity)
        self.market_order(pair.b.symbol, -trading_pair.ticket_b.quantity)
        self._trading_pairs.pop(pair)
        self.debug(f'Close {pair.name}')
        
#region imports
from AlgorithmImports import *

import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion


class Pairs(object):
    """Candidate pair of two symbol-data legs plus the spread model fitted on them.

    ``a`` and ``b`` must expose ``symbol`` (with ``.value``) and
    ``data_frame``. ``data_frame`` is presumably a (symbol, time)-indexed
    close frame — ``_data_frame`` drops index level 0 to align on time.
    """

    # Aligned samples shorter than this are reported as "not cointegrated"
    # instead of being passed to the statsmodels tests.
    _min_observations = 20

    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.name = f'{a.symbol.value}:{b.symbol.value}'

        # Filled in by cointegration_test() once the pair passes the coint test.
        self.model = None
        self.mean_error = 0
        self.epsilon = 0
        self.stationary_p = None

    def _data_frame(self):
        """Return both legs' closes side by side, aligned on time, rows with a missing leg dropped."""
        df = pd.concat([self.a.data_frame.droplevel([0]), self.b.data_frame.droplevel([0])], axis=1).dropna()
        df.columns = [self.a.symbol.value, self.b.symbol.value]
        return df

    def correlation(self):
        """Pearson correlation of the two legs' time-aligned closes (NaN if undefined)."""
        return self._data_frame().corr().iloc[0, 1]

    def cointegration_test(self):
        """Run the cointegration screen and, if it passes, fit the spread model.

        Returns False when the aligned sample has fewer than
        ``_min_observations`` rows or the ``coint`` p-value is >= 0.05.
        Otherwise fits OLS ``a ~ b`` on the aligned closes and stores:

        * ``model``: the fitted OLS result;
        * ``stationary_p``: the ADF p-value of its residuals;
        * ``mean_error`` / ``epsilon``: the residual mean and standard deviation.

        It then returns True.
        """
        df = self._data_frame()
        if len(df) < self._min_observations:
            return False

        # Test the same time-aligned sample the OLS model is fitted on; the raw
        # rolling windows may cover different timestamps (e.g. if one leg has
        # no bar for some interval), which would pair up unrelated prices.
        coint_test = coint(df.iloc[:, 0].values, df.iloc[:, 1].values, trend="n", maxlag=0)

        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        self.model = sm.ols(formula=f'{self.a.symbol.value} ~ {self.b.symbol.value}', data=df).fit()
        self.stationary_p = adfuller(self.model.resid, autolag='BIC')[1]
        self.mean_error = np.mean(self.model.resid)
        self.epsilon = np.std(self.model.resid)

        return True
        
#region imports
from AlgorithmImports import *
#endregion


class SymbolData(object):
    """Rolling window of ``interval``-minute consolidated bars for one symbol.

    Minute bars arrive through ``update`` and go to a ``TradeBarConsolidator``.
    Each consolidated bar is pushed into ``prices``, a window of
    ``lookback // interval`` bars. Once that window is full, ``series``
    (close prices) and ``data_frame`` are rebuilt from it on every new
    consolidated bar.
    """

    def __init__(self, algorithm, symbol, lookback, interval):
        """Create the window and warm it up from ``lookback`` minute bars of history.

        Args:
            algorithm: owning algorithm, used for the history request and pandas conversion.
            symbol: the security's Symbol.
            lookback: number of minute bars to warm up with (coerced to int).
            interval: consolidation period, in minutes.
        """
        lookback = int(lookback)
        self.symbol = symbol
        self.prices = RollingWindow[TradeBar](lookback // interval)
        self.series = None
        self.data_frame = None

        self._algorithm = algorithm
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.data_consolidated += self._on_data_consolidated

        # Request TradeBar objects directly rather than rebuilding them from a
        # history DataFrame. Rebuilding passed the frame's index timestamp
        # (documented by LEAN as the bar end time) as the bar start time, and
        # had to skip quote-only rows with NaN volume by hand.
        for trade_bar in algorithm.history[TradeBar](symbol, lookback, Resolution.MINUTE):
            self.update(trade_bar)

    @property
    def is_ready(self):
        """True once the consolidated-bar window is full."""
        return self.prices.is_ready

    def update(self, trade_bar):
        """Feed one minute bar into the consolidator."""
        self._consolidator.update(trade_bar)

    def _on_data_consolidated(self, sender, consolidated):
        """Store a consolidated bar and, once the window is full, refresh the close series/frame."""
        self.prices.add(consolidated)
        if self.is_ready:
            self.series = self._algorithm.pandas_converter.get_data_frame[TradeBar](self.prices)['close']
            self.data_frame = self.series.to_frame()
        
#region imports
from AlgorithmImports import *
#endregion


class TradingPair(object):
    """Record of one open pair position.

    Holds the two entry order tickets and the spread model that was in force
    when the position was opened: intercept, slope, and the residual
    mean / standard deviation used for the exit thresholds.
    """

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        # entry order tickets for leg A and leg B
        self.ticket_a, self.ticket_b = ticket_a, ticket_b
        # linear spread model frozen at entry: price_a ~ intercept + slope * price_b
        self.model_intercept, self.model_slope = intercept, slope
        # residual mean and standard deviation of that model
        self.mean_error, self.epsilon = mean_error, epsilon