Overall Statistics
Total Trades
62
Average Win
1.96%
Average Loss
-0.02%
Compounding Annual Return
99.044%
Drawdown
30.700%
Expectancy
75.255
Net Profit
50.556%
Sharpe Ratio
1.551
Probabilistic Sharpe Ratio
53.518%
Loss Rate
20%
Win Rate
80%
Profit-Loss Ratio
94.32
Alpha
0
Beta
0
Annual Standard Deviation
0.602
Annual Variance
0.362
Information Ratio
1.551
Tracking Error
0.602
Treynor Ratio
0
Total Fees
$393.11
Estimated Strategy Capacity
$3000.00
Lowest Capacity Asset
EEV TXA0MGR7LUG5
import numpy as np
from scipy import stats
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.stats import kendalltau, pearsonr, spearmanr
from scipy.optimize import minimize
from scipy.integrate import quad
import sys
from collections import deque
from AlgorithmImports import *

class CopulaPairsTradingAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2022, 1, 1)
        self.SetCash(100000)
        self.numdays = 1000       # length of formation period which determine the copula we use
        self.lookbackdays = 50   # length of history data in trading period
        self.cap_CL = 0.90       # cap confidence level
        self.floor_CL = 0.1  # floor confidence level
        self.per_order_proportion = 0.45 
        self.window = {}
        self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
        self.day = 0              # keep track of current day for daily rebalance
        self.month = 0  
        self.pair = []
        self.trailing_stop = 0.01
        
        self.stock1 = self.AddEquity("SPY", Resolution.Daily).Symbol 

        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverse('PairUniverse', self.PairSelection)
        self.current_state = 0
        self.stopTickets = {}          # keep track of current month for monthly recalculation of optimal trading pair
        self.Schedule.On(self.DateRules.EveryDay(self.stock1), self.TimeRules.AfterMarketOpen(self.stock1,1), self.EveryDayAfterMarketOpen)

    def EveryDayAfterMarketOpen(self):
        self.turn_trading_on = True

    def OnData(self, slice):
        self.SetSignal(slice)     # only executed at first day of each month
        if self.Time.day == self.day:
            return
        long, short = self.pair[0], self.pair[1]
        self.stopTargets = {short:0,long:0}
        for kvp in self.Securities:
            symbol = kvp.Key
            if symbol in self.pair:
                price = kvp.Value.Price
                self.window[symbol].append(price)

        if len(self.window[long]) < 2 or len(self.window[short]) < 2:
            return
        
        MI_u_v, MI_v_u = self._misprice_index()
        if MI_u_v < self.floor_CL and MI_v_u > self.cap_CL and self.current_state!=1:
            self.LiquidateAll()
            if self.Portfolio.UnsettledCash==0:
                quantity = self.CalculateOrderQuantity(short,self.per_order_proportion)
                status = self.MarketOrder(short, quantity*-1)
                stopPrice = round((1+self.trailing_stop)*self.Securities[short].Price,2)
                self.stopTickets[short] = self.StopMarketOrder(short, quantity, stopPrice,'Exited using Sell Stop')
                self.stopTargets[short] = self.Securities[short].Price

                quantity = self.CalculateOrderQuantity(long,self.per_order_proportion)
                status = self.MarketOrder(long, quantity*1)
                stopPrice = round((1-self.trailing_stop)*self.Securities[long].Price,2)
                self.stopTickets[long] = self.StopMarketOrder(long, quantity*-1, stopPrice,'Exited using Sell Stop')
                self.stopTargets[long] = self.Securities[long].Price
                self.update_stop(short,long)
                self.current_state = 1
            else:
                self.current_state = 0
        

        elif MI_u_v > self.cap_CL and MI_v_u < self.floor_CL and self.current_state!=-1:
            self.LiquidateAll()
            if self.Portfolio.UnsettledCash==0:
                quantity = self.CalculateOrderQuantity(short,self.per_order_proportion)
                status = self.MarketOrder(short, quantity*1)
                stopPrice = round((1+self.trailing_stop)*self.Securities[short].Price,2)
                self.stopTickets[short] = self.StopMarketOrder(short, quantity, stopPrice,'Exited using Sell Stop')
                self.stopTargets[short] = self.Securities[short].Price

                quantity = self.CalculateOrderQuantity(long,self.per_order_proportion)
                status = self.MarketOrder(long, quantity*-1)
                stopPrice = round((1-self.trailing_stop)*self.Securities[long].Price,2)
                self.stopTickets[long] = self.StopMarketOrder(long, quantity*-1, stopPrice,'Exited using Sell Stop')
                self.stopTargets[long] = self.Securities[long].Price
                self.current_state = -1
                self.update_stop(short,long)
            else:
                self.current_state = 0
        self.day = self.Time.day

    def SetSignal(self, slice):
        if self.Time.month == self.month:
            return
        logreturns = self._get_historical_returns(self.pair, self.numdays)
        x, y = logreturns[str(self.pair[0])], logreturns[str(self.pair[1])]

        # Convert the two returns series to two uniform values u and v using the empirical distribution functions
        ecdf_x, ecdf_y  = ECDF(x), ECDF(y)
        u, v = [ecdf_x(a) for a in x], [ecdf_y(a) for a in y]
        
        # Compute the Akaike Information Criterion (AIC) for different copulas and choose copula with minimum AIC
        tau = kendalltau(x, y)[0]  # estimate Kendall'rank correlation
        AIC ={}  # generate a dict with key being the copula family, value = [theta, AIC]
        
        for i in ['clayton', 'frank', 'gumbel']:
            param = self._parameter(i, tau)
            lpdf = [self._lpdf_copula(i, param, x, y) for (x, y) in zip(u, v)]
            # Replace nan with zero and inf with finite numbers in lpdf list
            lpdf = np.nan_to_num(lpdf) 
            loglikelihood = sum(lpdf)
            AIC[i] = [param, -2 * loglikelihood + 2]
            
        # Choose the copula with the minimum AIC
        self.copula = min(AIC.items(), key = lambda x: x[1][1])[0]
        logreturns = logreturns.tail(self.lookbackdays)
        x, y = logreturns[str(self.pair[0])], logreturns[str(self.pair[1])]
        
        self.theta = self._parameter(self.copula, tau)
        self.ecdf_x, self.ecdf_y  = ECDF(x), ECDF(y) 
        self.month = self.Time.month
        
    def PairSelection(self, date):
        if date.month == self.month:
            return Universe.Unchanged
        symbols = [ Symbol.Create(x, SecurityType.Equity, Market.USA) 
                    for x in [ "EDZ", "EEV"] ]
        logreturns = self._get_historical_returns(symbols, self.lookbackdays)       
    
        for i in range(0, len(symbols), 2): 
            x = logreturns[str(symbols[i])]
            y = logreturns[str(symbols[i+1])]  
            self.pair = symbols[i:i+2]
        return [x.Value for x in self.pair]

    def OnSecuritiesChanged(self, changes):
        for security in changes.RemovedSecurities:
            symbol = security.Symbol
            self.window.pop(symbol)
            if security.Invested:
                self.Liquidate(symbol, "Removed from Universe")
        
        for security in changes.AddedSecurities:
            self.window[security.Symbol] = deque(maxlen = 2)
        
        history = self.History(list(self.window.keys()), 2, Resolution.Daily)
        history = history.close.unstack(level=0)
        for symbol in self.window:
            self.window[symbol].append(history[str(symbol)][0])

        
    def _get_historical_returns(self, symbols, period):
        history = self.History(symbols, period, Resolution.Daily)
        history = history.close.unstack(level=0)
        return (np.log(history) - np.log(history.shift(1))).dropna()
        
        
    def _parameter(self, family, tau):
        if  family == 'clayton':
            return 2 * tau / (1 - tau)        
        elif family == 'frank':
            integrand = lambda t: t / (np.exp(t) - 1)  # generate the integrand
            frank_fun = lambda theta: ((tau - 1) / 4.0  - (quad(integrand, sys.float_info.epsilon, theta)[0] / theta - 1) / theta) ** 2
            return minimize(frank_fun, 4, method='BFGS', tol=1e-5).x 
        elif family == 'gumbel':
            return 1 / (1 - tau)
            
    def _lpdf_copula(self, family, theta, u, v):
        if  family == 'clayton':
            pdf = (theta + 1) * ((u ** (-theta) + v ** (-theta) - 1) ** (-2 - 1 / theta)) * (u ** (-theta - 1) * v ** (-theta - 1))    
        elif family == 'frank':
            num = -theta * (np.exp(-theta) - 1) * (np.exp(-theta * (u + v)))
            denom = ((np.exp(-theta * u) - 1) * (np.exp(-theta * v) - 1) + (np.exp(-theta) - 1)) ** 2
            pdf = num / denom   
        elif family == 'gumbel':
            A = (-np.log(u)) ** theta + (-np.log(v)) ** theta
            c = np.exp(-A ** (1 / theta))
            pdf = c * (u * v) ** (-1) * (A ** (-2 + 2 / theta)) * ((np.log(u) * np.log(v)) ** (theta - 1)) * (1 + (theta - 1) * A ** (-1 / theta))    
        return np.log(pdf)

    def _misprice_index(self):  
        return_x = np.log(self.window[self.pair[0]][-1] / self.window[self.pair[0]][-2])
        return_y = np.log(self.window[self.pair[1]][-1] / self.window[self.pair[1]][-2])
        
        u = self.ecdf_x(return_x)
        v = self.ecdf_y(return_y)
        
        if self.copula == 'clayton':
            MI_u_v = v ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(U<u|V=v)
            MI_v_u = u ** (-self.theta - 1) * (u ** (-self.theta) + v ** (-self.theta) - 1) ** (-1 / self.theta - 1) # P(V<v|U=u)
        elif self.copula == 'frank':
            A = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * v) - 1)
            B = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta * u) - 1)
            C = (np.exp(-self.theta * u) - 1) * (np.exp(-self.theta * v) - 1) + (np.exp(-self.theta) - 1)
            MI_u_v = B / C
            MI_v_u = A / C
        elif self.copula == 'gumbel':
            A = (-np.log(u)) ** self.theta + (-np.log(v)) ** self.theta
            C_uv = np.exp(-A ** (1 / self.theta))   # C_uv is gumbel copula function C(u,v)
            MI_u_v = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(v)) ** (self.theta - 1) * (1.0 / v)
            MI_v_u = C_uv * (A ** ((1 - self.theta) / self.theta)) * (-np.log(u)) ** (self.theta - 1) * (1.0 / u)
            
        return MI_u_v, MI_v_u

    def update_stop(self, buy_sym, sell_sym):
        if self.Securities[buy_sym].Price > self.stopTargets[buy_sym] and self.stopTargets[buy_sym]!=0:
            updateFields = UpdateOrderFields()
            updateFields.StopPrice = round(self.Securities[buy_sym].Price*(1-self.trailing_stop),2)
            self.stopTickets[buy_sym].Update(updateFields)
            self.stopTargets[buy_sym] = self.Securities[buy_sym].Price 
        if self.Securities[sell_sym].Price < self.stopTargets[sell_sym] and self.stopTargets[sell_sym]!=0:
            updateFields = UpdateOrderFields()
            updateFields.StopPrice = round(self.Securities[sell_sym].Price*(1+self.trailing_stop),2)
            self.stopTickets[sell_sym].Update(updateFields)
            self.stopTargets[sell_sym] = self.Securities[sell_sym].Price 

#CREAMOS FUNCION DE LIQUIDATEALL() PARA HACER NUESTRA POSICION DE CIERRE     
    def LiquidateAll(self):
        self.Liquidate()
        self.Transactions.CancelOpenOrders(self.pair[0])
        self.Transactions.CancelOpenOrders(self.pair[1])