Overall Statistics |
Total Trades 62 Average Win 1.96% Average Loss -0.02% Compounding Annual Return 99.044% Drawdown 30.700% Expectancy 75.255 Net Profit 50.556% Sharpe Ratio 1.551 Probabilistic Sharpe Ratio 53.518% Loss Rate 20% Win Rate 80% Profit-Loss Ratio 94.32 Alpha 0 Beta 0 Annual Standard Deviation 0.602 Annual Variance 0.362 Information Ratio 1.551 Tracking Error 0.602 Treynor Ratio 0 Total Fees $393.11 Estimated Strategy Capacity $3000.00 Lowest Capacity Asset EEV TXA0MGR7LUG5 |
import numpy as np from scipy import stats from statsmodels.distributions.empirical_distribution import ECDF from scipy.stats import kendalltau, pearsonr, spearmanr from scipy.optimize import minimize from scipy.integrate import quad import sys from collections import deque from AlgorithmImports import * class CopulaPairsTradingAlgorithm(QCAlgorithm): def Initialize(self): self.SetStartDate(2022, 1, 1) self.SetCash(100000) self.numdays = 1000 # length of formation period which determine the copula we use self.lookbackdays = 50 # length of history data in trading period self.cap_CL = 0.90 # cap confidence level self.floor_CL = 0.1 # floor confidence level self.per_order_proportion = 0.45 self.window = {} self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin) self.day = 0 # keep track of current day for daily rebalance self.month = 0 self.pair = [] self.trailing_stop = 0.01 self.stock1 = self.AddEquity("SPY", Resolution.Daily).Symbol self.UniverseSettings.Resolution = Resolution.Daily self.AddUniverse('PairUniverse', self.PairSelection) self.current_state = 0 self.stopTickets = {} # keep track of current month for monthly recalculation of optimal trading pair self.Schedule.On(self.DateRules.EveryDay(self.stock1), self.TimeRules.AfterMarketOpen(self.stock1,1), self.EveryDayAfterMarketOpen) def EveryDayAfterMarketOpen(self): self.turn_trading_on = True def OnData(self, slice): self.SetSignal(slice) # only executed at first day of each month if self.Time.day == self.day: return long, short = self.pair[0], self.pair[1] self.stopTargets = {short:0,long:0} for kvp in self.Securities: symbol = kvp.Key if symbol in self.pair: price = kvp.Value.Price self.window[symbol].append(price) if len(self.window[long]) < 2 or len(self.window[short]) < 2: return MI_u_v, MI_v_u = self._misprice_index() if MI_u_v < self.floor_CL and MI_v_u > self.cap_CL and self.current_state!=1: self.LiquidateAll() if self.Portfolio.UnsettledCash==0: quantity = self.CalculateOrderQuantity(short,self.per_order_proportion) status = self.MarketOrder(short, quantity*-1) stopPrice = round((1+self.trailing_stop)*self.Securities[short].Price,2) self.stopTickets[short] = self.StopMarketOrder(short, quantity, stopPrice,'Exited using Sell Stop') self.stopTargets[short] = self.Securities[short].Price quantity = self.CalculateOrderQuantity(long,self.per_order_proportion) status = self.MarketOrder(long, quantity*1) stopPrice = round((1-self.trailing_stop)*self.Securities[long].Price,2) self.stopTickets[long] = self.StopMarketOrder(long, quantity*-1, stopPrice,'Exited using Sell Stop') self.stopTargets[long] = self.Securities[long].Price self.update_stop(short,long) self.current_state = 1 else: self.current_state = 0 elif MI_u_v > self.cap_CL and MI_v_u < self.floor_CL and self.current_state!=-1: self.LiquidateAll() if self.Portfolio.UnsettledCash==0: quantity = self.CalculateOrderQuantity(short,self.per_order_proportion) status = self.MarketOrder(short, quantity*1) stopPrice = round((1+self.trailing_stop)*self.Securities[short].Price,2) self.stopTickets[short] = self.StopMarketOrder(short, quantity, stopPrice,'Exited using Sell Stop') self.stopTargets[short] = self.Securities[short].Price quantity = self.CalculateOrderQuantity(long,self.per_order_proportion) status = self.MarketOrder(long, quantity*-1) stopPrice = round((1-self.trailing_stop)*self.Securities[long].Price,2) self.stopTickets[long] = self.StopMarketOrder(long, quantity*-1, stopPrice,'Exited using Sell Stop') self.stopTargets[long] = self.Securities[long].Price self.current_state = -1 self.update_stop(short,long) else: self.current_state = 0 self.day = self.Time.day def SetSignal(self, slice): if self.Time.month == self.month: return logreturns = self._get_historical_returns(self.pair, self.numdays) x, y = logreturns[str(self.pair[0])], logreturns[str(self.pair[1])] # Convert the two returns series to two uniform values u and v using the empirical distribution functions ecdf_x, ecdf_y = ECDF(x), ECDF(y) u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y] # Compute the Akaike Information Criterion (AIC) for different copulas and choose copula with minimum AIC tau = kendalltau(x, y)[0] # estimate Kendall'rank correlation AIC ={} # generate a dict with key being the copula family, value = [theta, AIC] for i in ['clayton', 'frank', 'gumbel']: param = self._parameter(i, tau) lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)] # Replace nan with zero and inf with finite numbers in lpdf list lpdf = np.nan_to_num(lpdf) loglikelihood = sum(lpdf) AIC[i] = [param, -2 * loglikelihood + 2] # Choose the copula with the minimum AIC self.copula = min(AIC.items(), key = lambda x: x[1][1])[0] logreturns = logreturns.tail(self.lookbackdays) x, y = logreturns[str(self.pair[0])], logreturns[str(self.pair[1])] self.theta = self._parameter(self.copula, tau) self.ecdf_x, self.ecdf_y = ECDF(x), ECDF(y) self.month = self.Time.month def PairSelection(self, date): if date.month == self.month: return Universe.Unchanged symbols = [ Symbol.Create(x, SecurityType.Equity, Market.USA) for x in [ "EDZ", "EEV"] ] logreturns = self._get_historical_returns(symbols, self.lookbackdays) for i in range(0, len(symbols), 2): x = logreturns[str(symbols[i])] y = logreturns[str(symbols[i+1])] self.pair = symbols[i:i+2] return [x.Value for x in self.pair] def OnSecuritiesChanged(self, changes): for security in changes.RemovedSecurities: symbol = security.Symbol self.window.pop(symbol) if security.Invested: self.Liquidate(symbol, "Removed from Universe") for security in changes.AddedSecurities: self.window[security.Symbol] = deque(maxlen = 2) history = self.History(list(self.window.keys()), 2, Resolution.Daily) history = history.close.unstack(level=0) for symbol in self.window: self.window[symbol].append(history[str(symbol)][0]) def _get_historical_returns(self, symbols, period): history = self.History(symbols, period, Resolution.Daily) history = history.close.unstack(level=0) return (np.log(history) - np.log(history.shift(1))).dropna() def _parameter(self, family, tau): if family == 'clayton': return 2 * tau / (1 - tau) elif family == 'frank': integrand = lambda t: t / (np.exp(t) - 1) # generate the integrand frank_fun = lambda theta: ((tau - 1) / 4.0 - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2 return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x elif family == 'gumbel': return 1 / (1 - tau) def _lpdf_copula(self, family, theta, u, v): if family == 'clayton': pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1)) elif family == 'frank': num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v))) denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2 pdf = num / denom elif family == 'gumbel': A = (-np.log(u)) ** theta + (-np.log(v)) ** theta c = np.exp(-A ** (1 / theta)) pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta)) return np.log(pdf) def _misprice_index(self): return_x = np.log(self.window[self.pair[0]][-1] / self.window[self.pair[0]][-2]) return_y = np.log(self.window[self.pair[1]][-1] / self.window[self.pair[1]][-2]) u = self.ecdf_x(return_x) v = self.ecdf_y(return_y) if self.copula == 'clayton': MI_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(U<u|V=v) MI_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(V<v|U=u) elif self.copula == 'frank': A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1) B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1) C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1) MI_u_v = B / C MI_v_u = A / C elif self.copula == 'gumbel': A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta C_uv = np.exp(-A ** (1 / self.theta)) # C_uv is gumbel copula function C(u,v) MI_u_v = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v) MI_v_u = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u) return MI_u_v, MI_v_u def update_stop(self, buy_sym, sell_sym): if self.Securities[buy_sym].Price > self.stopTargets[buy_sym] and self.stopTargets[buy_sym]!=0: updateFields = UpdateOrderFields() updateFields.StopPrice = round(self.Securities[buy_sym].Price*(1-self.trailing_stop),2) self.stopTickets[buy_sym].Update(updateFields) self.stopTargets[buy_sym] = self.Securities[buy_sym].Price if self.Securities[sell_sym].Price < self.stopTargets[sell_sym] and self.stopTargets[sell_sym]!=0: updateFields = UpdateOrderFields() updateFields.StopPrice = round(self.Securities[sell_sym].Price*(1+self.trailing_stop),2) self.stopTickets[sell_sym].Update(updateFields) self.stopTargets[sell_sym] = self.Securities[sell_sym].Price #CREAMOS FUNCION DE LIQUIDATEALL() PARA HACER NUESTRA POSICION DE CIERRE def LiquidateAll(self): self.Liquidate() self.Transactions.CancelOpenOrders(self.pair[0]) self.Transactions.CancelOpenOrders(self.pair[1])