| Overall Statistics |
|
Total Orders 1100 Average Win 0.03% Average Loss -0.01% Compounding Annual Return -0.802% Drawdown 2.200% Expectancy -0.353 Start Equity 50000 End Equity 49069.84 Net Profit -1.860% Sharpe Ratio -5.804 Sortino Ratio -2.592 Probabilistic Sharpe Ratio 0.000% Loss Rate 84% Win Rate 16% Profit-Loss Ratio 3.00 Alpha -0.059 Beta -0.002 Annual Standard Deviation 0.01 Annual Variance 0 Information Ratio -1.068 Tracking Error 0.135 Treynor Ratio 24.271 Total Fees $1100.00 Estimated Strategy Capacity $1000000.00 Lowest Capacity Asset USMV V0WRDXSSH205 Portfolio Turnover 12.09% |
#region imports
from AlgorithmImports import *
import statsmodels.formula.api as sm
from statsmodels.tsa.stattools import coint, adfuller
#endregion
class Pairs(object):
    """Two symbols evaluated together as a candidate trading pair.

    ``a`` and ``b`` must expose ``Symbol.Value``, ``Series`` and ``DataFrame``
    (the ``SymbolData`` objects of this project do). The regression state
    (``Model``, ``MeanError``, ``Epsilon``, ``StationaryP``) is only refreshed
    by a successful ``cointegration_test`` call.
    """

    def __init__(self, a, b):
        """Store both legs and reset the regression state.

        Args:
            a: Leg used as the dependent variable of the regression.
            b: Leg used as the explanatory variable of the regression.
        """
        self.a = a
        self.b = b
        self.Name = f'{a.Symbol.Value}:{b.Symbol.Value}'
        self.Model = None
        self.MeanError = 0
        self.StandardDeviation = 0
        self.Epsilon = 0
        # Defined up front so that reading it before a successful test does not
        # raise AttributeError; 1.0 means "no evidence of stationarity".
        self.StationaryP = 1.0

    @property
    def DataFrame(self):
        """Return both legs' frames joined column-wise on their shared index.

        The first index level of each leg's frame is dropped before joining
        (presumably the symbol level of a (symbol, time) index -- confirm),
        rows with a missing value in either leg are removed, and the two
        columns are renamed to the legs' ticker strings.
        """
        legs = [self.a.DataFrame.droplevel([0]), self.b.DataFrame.droplevel([0])]
        df = pd.concat(legs, axis=1).dropna()
        df.columns = [self.a.Symbol.Value, self.b.Symbol.Value]
        return df

    @property
    def Correlation(self):
        """Return the correlation (pandas default method) between the two legs."""
        # Single positional lookup: the chained form ``.iloc[0][1]`` indexes a
        # string-labelled Series with an integer key, which pandas deprecates.
        return self.DataFrame.corr().iloc[0, 1]

    def cointegration_test(self):
        """Test the legs for cointegration and, on success, fit the spread model.

        Returns:
            False when the ``coint`` p-value is >= 0.05; the previously stored
            regression state is left untouched in that case. True otherwise,
            after fitting ``a ~ b`` by OLS and storing the ADF p-value, the
            mean and the (population) standard deviation of the residuals.
        """
        coint_test = coint(self.a.Series.values.flatten(), self.b.Series.values.flatten(), trend="n", maxlag=0)
        # Return if not cointegrated
        if coint_test[1] >= 0.05:
            return False

        self.Model = sm.ols(formula = f'{self.a.Symbol.Value} ~ {self.b.Symbol.Value}', data=self.DataFrame).fit()
        residuals = self.Model.resid
        self.StationaryP = adfuller(residuals, autolag = 'BIC')[1]
        self.MeanError = np.mean(residuals)
        self.Epsilon = np.std(residuals)
        return True
#region imports
from AlgorithmImports import *
#endregion
class SymbolData(object):
    """Rolling store of consolidated bars for one symbol.

    Minute bars are pushed in through ``Update``; a consolidator groups them
    into ``interval``-minute bars, which are kept in the ``Prices`` window.
    Once the window is full, ``Series`` and ``DataFrame`` hold its close prices.
    """

    def __init__(self, algorithm, symbol, lookback, interval):
        """Build the window and consolidator, then replay history through them.

        Args:
            algorithm: Algorithm instance used for the history request and for
                its ``PandasConverter``.
            symbol: Symbol whose bars are tracked.
            lookback: Number of minute bars of history to request (cast to int).
            interval: Minutes per consolidated bar; the window holds
                ``lookback // interval`` consolidated bars.
        """
        minute_count = int(lookback)

        self.Symbol = symbol
        self.Prices = RollingWindow[TradeBar](minute_count // interval)
        self.Series = None
        self.DataFrame = None
        self._algorithm = algorithm

        # The consolidator is driven manually via Update(), both for the
        # historical replay below and for live bars supplied by the caller.
        self._consolidator = TradeBarConsolidator(timedelta(minutes=interval))
        self._consolidator.DataConsolidated += self.OnDataConsolidated

        past_bars = algorithm.History(symbol, minute_count, Resolution.Minute)
        for row in past_bars.itertuples():
            # row.Index is the frame's index tuple; element 1 is used as the bar time.
            self.Update(
                TradeBar(row.Index[1], symbol, row.open, row.high, row.low, row.close, row.volume)
            )

    @property
    def IsReady(self):
        """Whether the rolling window of consolidated bars reports itself ready."""
        return self.Prices.IsReady

    def Update(self, trade_bar):
        """Feed one bar into the consolidator."""
        self._consolidator.Update(trade_bar)

    def OnDataConsolidated(self, sender, consolidated):
        """Consolidator callback: store the new bar and refresh the close series.

        ``Series`` and ``DataFrame`` are only rebuilt once the window is ready.
        """
        self.Prices.Add(consolidated)
        if not self.IsReady:
            return

        window_frame = self._algorithm.PandasConverter.GetDataFrame[TradeBar](self.Prices)
        self.Series = window_frame['close']
        self.DataFrame = self.Series.to_frame()
#region imports
from AlgorithmImports import *
#endregion
class TradingPair(object):
    """Record of one open pair position and the model values captured with it."""

    def __init__(self, ticket_a, ticket_b, intercept, slope, mean_error, epsilon):
        """Store the two order tickets and the regression values given.

        Args:
            ticket_a: Order ticket of the first leg.
            ticket_b: Order ticket of the second leg.
            intercept: Stored as ``model_intercept``.
            slope: Stored as ``model_slope``.
            mean_error: Stored as ``mean_error``.
            epsilon: Stored as ``epsilon``.
        """
        self.ticket_a, self.ticket_b = ticket_a, ticket_b
        self.model_intercept, self.model_slope = intercept, slope
        self.mean_error, self.epsilon = mean_error, epsilon
#region imports
from AlgorithmImports import *
from Pair import *
from SymbolData import *
from TradingPair import *
from itertools import combinations
import statsmodels.tsa.stattools as ts
#endregion
class PairsTrading(QCAlgorithm):
    """Pairs-trading algorithm over a fixed basket of equity tickers.

    Every combination of two tickers is a candidate pair. Every five days the
    candidates are filtered by correlation and cointegration; selected pairs
    are entered when the regression residual moves far enough from its mean
    and exited when it reverts, blows through the stop, or the pair is no
    longer selected.
    """

    symbols = ['IVV', 'USMV', 'QUAL', 'DGRO', 'DVY', 'VLUE', 'MTUM']
    # Number of minute bars of history requested for every symbol.
    num_bar = 390*21*3
    # Minutes per consolidated bar.
    interval = 10
    # Maximum number of selected pairs; also the divisor for position sizing.
    pair_num = 10
    # Total target weight spread over all pairs (half per leg of each pair).
    leverage = 0.5
    # Pairs with a lower correlation are rejected before the cointegration test.
    min_corr_threshold = 0.9
    # Entry, exit and stop-loss bands, in multiples of the residual std (Epsilon).
    open_size = 1.32
    close_size = 0.5
    stop_loss_size = 6

    def Initialize(self):
        """Configure the backtest, subscribe to the symbols and build the pair list."""
        self.SetStartDate(2023, 1, 1)
        self.SetEndDate(2025, 5, 1)
        self.SetCash(50000)

        self.symbol_data = {}
        self.pair_list = []
        self.selected_pair = []
        self.trading_pairs = {}
        # datetime.min forces a pair selection on the first OnData call.
        self.regenerate_time = datetime.min

        for ticker in self.symbols:
            symbol = self.AddEquity(ticker, Resolution.Minute).Symbol
            self.symbol_data[symbol] = SymbolData(self, symbol, self.num_bar, self.interval)

        # Only symbols whose window was filled by the historical replay can form pairs.
        for data_a, data_b in combinations(self.symbol_data.values(), 2):
            if data_a.IsReady and data_b.IsReady:
                self.pair_list.append(Pairs(data_a, data_b))

    def GeneratePairs(self):
        """Return the tradable pairs, at most ``pair_num`` of them.

        A pair qualifies when its correlation reaches ``min_corr_threshold``,
        its cointegration test passes and its residual ADF p-value is below
        0.05. The most correlated ``pair_num`` qualifiers are kept and returned
        ordered by ascending ADF p-value. Returns an empty list (after a debug
        message) when nothing qualifies.
        """
        candidates = []
        for pair in self.pair_list:
            # Correlation rebuilds a joined DataFrame on every access, so read
            # it once and reuse the value for the ranking below.
            correlation = pair.Correlation
            # correlation selection
            if correlation < self.min_corr_threshold:
                continue
            # cointegration selection
            if pair.cointegration_test() and pair.StationaryP < 0.05:
                candidates.append((correlation, pair))

        if not candidates:
            self.Debug('No selected pair')
            return []

        candidates.sort(key=lambda item: item[0], reverse=True)
        selected_pair = [pair for _, pair in candidates[:self.pair_num]]
        selected_pair.sort(key=lambda x: x.StationaryP)
        return selected_pair

    def OnData(self, data):
        """Feed new bars, refresh the pair selection, then manage exits and entries."""
        for symbol, symbolData in self.symbol_data.items():
            if data.Bars.ContainsKey(symbol):
                symbolData.Update(data.Bars[symbol])

        # generate pairs with correlation and cointegration selection
        if self.regenerate_time < self.Time:
            self.selected_pair = self.GeneratePairs()
            self.regenerate_time = self.Time + timedelta(days=5)

        # closing existing position (iterate a copy: _close_pair mutates the dict)
        for pair, trading_pair in self.trading_pairs.copy().items():
            # close: if not correlated nor cointegrated anymore
            if pair not in self.selected_pair:
                self._close_pair(pair, trading_pair)
                continue

            # Residual measured with the model captured when the position was opened.
            error = pair.a.Prices[0].Close - (trading_pair.model_intercept + trading_pair.model_slope * pair.b.Prices[0].Close)
            if self._should_exit(trading_pair, error):
                self._close_pair(pair, trading_pair)

        # entry: when the residual deviates by more than open_size SD from its mean
        # NOTE(review): a pair stopped out above is re-entered right here when
        # its model is unchanged, because the stop-loss band lies beyond the
        # entry band -- confirm whether that is intended.
        for pair in self.selected_pair:
            if pair in self.trading_pairs:
                continue

            # params is a label-indexed Series; use positional access explicitly.
            intercept = pair.Model.params.iloc[0]
            slope = pair.Model.params.iloc[1]
            price_a = pair.a.Prices[0].Close
            price_b = pair.b.Prices[0].Close
            error = price_a - (intercept + slope * price_b)

            weight = self.leverage/self.pair_num / 2
            if error < pair.MeanError - self.open_size * pair.Epsilon:
                qty_a, qty_b = self._open_pair(pair, weight, intercept, slope)
                self.Debug(f'Long {qty_a} {pair.a.Symbol.Value} and short {qty_b} {pair.b.Symbol.Value}')
            elif error > pair.MeanError + self.open_size * pair.Epsilon:
                qty_a, qty_b = self._open_pair(pair, -weight, intercept, slope)
                self.Debug(f'Long {qty_b} {pair.b.Symbol.Value} and short {qty_a} {pair.a.Symbol.Value}')

    def _should_exit(self, trading_pair, error):
        """Return True when an open pair hit its take-profit or stop-loss band."""
        mean = trading_pair.mean_error
        sd = trading_pair.epsilon
        quantity_a = trading_pair.ticket_a.Quantity
        if quantity_a > 0:
            # Long a / short b: opened below the mean, so exit on reversion
            # upward or on a further drop past the stop.
            return (error > mean - self.close_size * sd
                    or error < mean - self.stop_loss_size * sd)
        if quantity_a < 0:
            # Short a / long b: mirror image of the case above.
            return (error < mean + self.close_size * sd
                    or error > mean + self.stop_loss_size * sd)
        return False

    def _close_pair(self, pair, trading_pair):
        """Send offsetting market orders for both legs and drop the pair."""
        self.MarketOrder(pair.a.Symbol, -trading_pair.ticket_a.Quantity)
        self.MarketOrder(pair.b.Symbol, -trading_pair.ticket_b.Quantity)
        self.trading_pairs.pop(pair)
        self.Debug(f'Close {pair.Name}')

    def _open_pair(self, pair, weight_a, intercept, slope):
        """Open both legs with opposite target weights and record the position.

        Each quantity is sized from its own leg's symbol. Returns
        ``(qty_a, qty_b)``.
        """
        qty_a = self.CalculateOrderQuantity(pair.a.Symbol, weight_a)
        qty_b = self.CalculateOrderQuantity(pair.b.Symbol, -weight_a)
        ticket_a = self.MarketOrder(pair.a.Symbol, qty_a)
        ticket_b = self.MarketOrder(pair.b.Symbol, qty_b)
        self.trading_pairs[pair] = TradingPair(ticket_a, ticket_b, intercept, slope, pair.MeanError, pair.Epsilon)
        return qty_a, qty_b