| Overall Statistics |
|
Total Trades 673 Average Win 0.38% Average Loss -0.09% Compounding Annual Return -26.704% Drawdown 22.900% Expectancy -0.681 Net Profit -22.861% Sharpe Ratio -1.864 Probabilistic Sharpe Ratio 0.042% Loss Rate 94% Win Rate 6% Profit-Loss Ratio 4.06 Alpha -0.206 Beta -0.12 Annual Standard Deviation 0.102 Annual Variance 0.01 Information Ratio -0.22 Tracking Error 0.247 Treynor Ratio 1.584 Total Fees $5179.44 Estimated Strategy Capacity $35000.00 Lowest Capacity Asset LLY Y5BMQ79D2EUE|LLY R735QTJ8XC9X Portfolio Turnover 107.94% |
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import pandas as pd
from datetime import timedelta, datetime
import math
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
import statistics
from collections import defaultdict
from nltk.sentiment import SentimentIntensityAnalyzer
'''
Much of the code in this file was copied or based upon code provided by teaching staff on Sakai.
Credit to the authors.
'''
class PairsTradingAlgorithm(QCAlgorithm):
    """Pairs-trading algorithm with option hedges and a news-sentiment exit.

    For every configured pair the algorithm regresses the log close of the
    first leg on the log close of the second, and trades on the z-score of the
    latest regression residual: it enters beyond ``self.enter``, exits when the
    z-score crosses ``self.exit`` and stops out beyond ``self.risk_level``.
    On entry it tries to buy a put on the long leg and a call on the short
    leg, with strikes chosen from a bootstrap simulation of recent returns.
    Every seventh ``OnData`` call it scores incoming Tiingo news and closes
    a pair whose score falls below that symbol's threshold.
    """

    def Initialize(self):
        """Set the backtest window, cash, thresholds and all data subscriptions."""
        # Selector for one of the predefined backtest windows below.
        period = 2
        backtest_windows = {
            1: ((2017, 1, 1), (2021, 1, 1)),
            2: ((2022, 1, 1), (2022, 11, 1)),
            3: ((2016, 1, 1), (2017, 1, 1)),
            4: ((2010, 1, 1), (2011, 1, 1)),
        }
        if period not in backtest_windows:
            raise ValueError(f"Unknown backtest period selector: {period!r}")
        start, end = backtest_windows[period]
        self.SetStartDate(*start)
        self.SetEndDate(*end)

        self.capital = 1000000
        self.SetCash(self.capital)

        self.enter = 2       # |z-score| beyond which a spread position is opened
        # NOTE(review): risk_level equals enter, so the stop-loss in OnData can
        # fire on the bar right after entry -- confirm the intended value.
        self.risk_level = 2  # |z-score| at which an open position is stopped out
        self.exit = 0        # z-score level at which an open position is closed
        self.lookback = 100  # number of bars requested by stats() for the regression

        # Switch between a single test pair and the full pair universe.
        use_single_pair = True
        if use_single_pair:
            self.pairs_list = [["LLY", "DHR"]]
        else:
            self.pairs_list = [
                ['CTVA', 'NEM'], ['APD', 'LIN'], ['HD', 'MCD'], ['NKE', 'SBUX'],
                ['SPGI', 'MA'], ['WFC', 'BAC'], ['DLR', 'CCI'], ['AMT', 'PSA'],
                ['KO', 'MDLZ'], ['PM', 'PG'], ['TMO', 'UNH'], ['JNJ', 'ABBV'],
                ['DUK', 'AEP'], ['SO', 'XEL'], ['DIS', 'ATVI'], ['VZ', 'T'],
                ['COP', 'EOG'], ['SLB', 'BP'], ['ADBE', 'MSFT'], ['TSM', 'ORCL'],
                ['CAT', 'BA'],
            ]

        # Naive equal split of the portfolio across all pairs.
        self.wt_factor = 1 / len(self.pairs_list)

        self.symbols_list = []              # [[Symbol, Symbol], ...] one entry per pair
        self.current_options_holdings = {}  # ticker -> option contract Symbol bought as hedge, or None
        self.options = {}                   # ticker -> option chain Symbol
        self.sym_options = {}               # equity Symbol -> option chain Symbol

        for ticker1, ticker2 in self.pairs_list:
            u1 = self.AddEquity(ticker1, Resolution.Daily).Symbol
            u2 = self.AddEquity(ticker2, Resolution.Daily).Symbol
            self.symbols_list.append([self.Symbol(ticker1), self.Symbol(ticker2)])

            o1 = self.AddOption(u1, Resolution.Daily)
            o2 = self.AddOption(u2, Resolution.Daily)
            # presumably strikes within +/-100 steps and expiries 75-90 days out -- confirm against LEAN docs
            o1.SetFilter(-100, 100, 75, 90)
            o2.SetFilter(-100, 100, 75, 90)

            self.options[ticker1] = o1.Symbol
            self.options[ticker2] = o2.Symbol
            self.current_options_holdings[ticker1] = None
            self.current_options_holdings[ticker2] = None
            self.sym_options[u1] = o1.Symbol
            self.sym_options[u2] = o2.Symbol

        self.day1 = True

        self.OPT_LOOKBEHIND = 60  # bars of history used to build the return sample
        self.OPT_LOOKAHEAD = 45   # simulated steps, and minimum days to option expiry
        self.MC_N_SIMS = 50       # number of simulated price paths
        self.OPT_QUANTILE = 0.1   # tail quantile of simulated prices used as strike target

        self.sia = SentimentIntensityAnalyzer()
        self.tiingo_dict = {}  # equity Symbol -> TiingoNews Symbol
        self.pairs_dict = {}   # equity Symbol -> the other leg of its pair
        for sym1, sym2 in self.symbols_list:
            self.tiingo_dict[sym1] = self.AddData(TiingoNews, sym1, Resolution.Daily).Symbol
            self.tiingo_dict[sym2] = self.AddData(TiingoNews, sym2, Resolution.Daily).Symbol
            self.pairs_dict[sym1] = sym2
            self.pairs_dict[sym2] = sym1

        self.thresholds = self.get_sent_thresholds()
        self.time = 0  # count of OnData calls, used to throttle the sentiment check

    def get_sent_thresholds(self):
        """Return a per-symbol sentiment threshold of mean - 3 * stdev.

        Scores are the VADER ``compound`` values of the news titles in a
        14-bar history request. A symbol with fewer than two scored titles
        gets ``-inf`` so the sentiment exit can never trigger for it
        (previously this case raised inside ``statistics``).

        Returns:
            defaultdict mapping each key of ``self.tiingo_dict`` to a float.
        """
        thresholds = defaultdict(list)
        for sym, news_symbol in self.tiingo_dict.items():
            history = self.History(news_symbol, 14, Resolution.Daily)
            self.Debug(f"We got {len(history)} items from our history request")

            titles = [] if history.empty else history.title
            scores = [self.sia.polarity_scores(title)['compound'] for title in titles]
            if len(scores) < 2:
                # stdev needs two samples; without them there is no usable threshold.
                thresholds[sym] = float("-inf")
                continue

            self.average = statistics.mean(scores)
            self.std = statistics.stdev(scores)
            thresholds[sym] = self.average - 3 * self.std
        return thresholds

    def stats(self, symbols):
        """Regress log(close) of ``symbols[0]`` on log(close) of ``symbols[1]``.

        Args:
            symbols: two-element sequence of the pair's symbols.

        Returns:
            ``[adf, zscore, slope]`` where ``adf`` is the ``adfuller`` result
            on the residuals, ``zscore`` is the residual series divided by the
            residual standard deviation, and ``slope`` is the regression
            coefficient on the second symbol.
        """
        self.df = self.History(symbols, self.lookback)
        self.dg = self.df["close"].unstack(level=0)

        ticker1 = str(symbols[0])
        ticker2 = str(symbols[1])

        # Keep only bars where both legs have a close so the regression never sees NaN.
        log_px = np.log(self.dg[[ticker1, ticker2]].dropna())

        self.Debug(f"Now regressing {ticker1} {ticker2}")
        results = sm.OLS(log_px[ticker1], sm.add_constant(log_px[ticker2])).fit()

        sigma = math.sqrt(results.mse_resid)  # standard deviation of the residual
        slope = results.params.iloc[1]        # params are [const, slope]
        res = results.resid                   # residual mean is 0 by construction
        zscore = res / sigma
        adf = adfuller(res)
        return [adf, zscore, slope]

    def OnData(self, slice: Slice):
        """Manage every pair on each data slice, then run the sentiment check."""
        for sym_p, sym_q in self.symbols_list:
            _, zscores, beta = self.stats([sym_p, sym_q])
            zscore = zscores.iloc[-1]

            holding_p = self.Portfolio[sym_p]
            holding_q = self.Portfolio[sym_q]
            is_invested = holding_p.Invested or holding_q.Invested
            # The spread is P - beta*Q, so its direction is the direction of the P leg.
            short_spread = holding_p.IsShort
            long_spread = holding_p.IsLong

            wt_p = 1 / (1 + beta) * self.wt_factor
            wt_q = beta / (1 + beta) * self.wt_factor

            if is_invested:
                reverted = (short_spread and zscore <= self.exit) or \
                           (long_spread and zscore >= self.exit)
                stopped_out = abs(zscore) >= self.risk_level
                if reverted or stopped_out:
                    self.Liquidate(sym_p)
                    self.Liquidate(sym_q)
                    # Close the protective options together with the legs they protect.
                    self._close_hedge(sym_p)
                    self._close_hedge(sym_q)
            elif zscore > self.enter:
                # Short the spread: short P, long Q.
                self.SetHoldings(sym_p, -wt_p)
                self.SetHoldings(sym_q, wt_q)
                self._buy_hedges(slice, sym_q, wt_q, sym_p, wt_p)
            elif zscore < -self.enter:
                # Long the spread: long P, short Q.
                self.SetHoldings(sym_p, wt_p)
                self.SetHoldings(sym_q, -wt_q)
                self._buy_hedges(slice, sym_p, wt_p, sym_q, wt_q)

        if self.time % 7 == 0:
            for sym, news_symbol in self.tiingo_dict.items():
                if not slice.ContainsKey(news_symbol):
                    continue
                # NOTE(review): thresholds are built from titles but the live
                # score uses the description -- confirm which is intended.
                title_words = slice[news_symbol].Description.lower()
                score = self.sia.polarity_scores(title_words)['compound']
                if score < self.thresholds[sym]:
                    partner = self.pairs_dict[sym]
                    self.Liquidate(sym)
                    self.Liquidate(partner)
                    self._close_hedge(sym)
                    self._close_hedge(partner)
        self.time += 1

    def _buy_hedges(self, slice, long_sym, long_wt, short_sym, short_wt):
        """Buy a put on the long leg and a call on the short leg, in whole contracts.

        Nothing is bought unless ``insurance`` finds both contracts. Each
        order is sized as weight * ``self.capital`` / price / 100, truncated
        to an integer; a leg without a positive price or with a size below
        one contract is skipped.
        """
        put, call = self.insurance(slice, str(long_sym), str(short_sym))
        if put is None or call is None:
            return
        for contract, sym, weight in ((put, long_sym, long_wt), (call, short_sym, short_wt)):
            price = self.Portfolio[sym].Price
            if price <= 0:
                continue
            contracts = int(weight * self.capital / price / 100)
            if contracts <= 0:
                continue
            self.MarketOrder(contract.Symbol, contracts)
            self.current_options_holdings[str(sym)] = contract.Symbol

    def _close_hedge(self, sym):
        """Liquidate the option contract recorded for ``sym``, if there is one."""
        ticker = str(sym)
        contract_symbol = self.current_options_holdings.get(ticker)
        # Guard against None: Liquidate() without a symbol closes the whole portfolio.
        if contract_symbol is not None:
            self.Liquidate(contract_symbol)
            self.current_options_holdings[ticker] = None

    def insurance(self, slice, l_sym, s_sym):
        """Pick a protective put for the long leg and a protective call for the short leg.

        Args:
            slice: current data slice, used for its option chains.
            l_sym: Ticker (not symbol) of the security we are trying to long.
            s_sym: Ticker (not symbol) of the security we are trying to short.

        Returns:
            ``(put_contract, call_contract)``; either element is ``None`` when
            no suitable contract (or no return history) is available.
        """
        expiry = self.Time + timedelta(days=self.OPT_LOOKAHEAD)
        l_cn = self._pick_contract(slice, l_sym, expiry, call=False)
        s_cn = self._pick_contract(slice, s_sym, expiry, call=True)
        return l_cn, s_cn

    def _pick_contract(self, slice, ticker, expiry, call):
        """Simulate ``ticker`` forward and return the contract struck nearest the tail price.

        A put targets the ``OPT_QUANTILE`` quantile of the simulated prices,
        a call the ``1 - OPT_QUANTILE`` quantile.
        """
        returns = self.get_returns(self.Symbol(ticker), self.OPT_LOOKBEHIND)
        if len(returns) == 0:
            # Nothing to resample from, so no strike target can be simulated.
            return None

        price = self.Securities[ticker].Price
        outcomes = monte_carlo(price, returns, self.OPT_LOOKAHEAD, self.MC_N_SIMS)
        if not outcomes:
            return None

        quantile = 1 - self.OPT_QUANTILE if call else self.OPT_QUANTILE
        # Clamp so a quantile of 1.0 still addresses the last simulated price.
        index = min(int(np.floor(len(outcomes) * quantile)), len(outcomes) - 1)
        target = outcomes[index]

        if call:
            self.Debug(f"Current Px = {price}; Worst (short) case px is {target}")
        else:
            self.Debug(f"Current Px = {price}; Worst case px is {target}")

        contract = self.search_contract(slice, self.options[ticker], target, expiry, call=call)
        if contract is not None:
            # The right is known from the request, so no enum-to-index lookup is needed.
            right = "Call" if call else "Put"
            self.Debug(f"Best {right} Option is # {contract}: Strike {contract.Strike}; Right {right}; Expiry {contract.Expiry}")
        return contract

    def search_contract(self, slice, optsym, strike, expiry, call=True):
        """Return the contract whose strike is nearest ``strike``.

        Only contracts of the requested right that expire on or after
        ``expiry`` are considered.

        Args:
            slice: current data slice.
            optsym: option chain symbol to look up in ``slice.OptionChains``.
            strike: target strike price.
            expiry: earliest acceptable expiry.
            call: ``True`` to search calls, ``False`` to search puts.

        Returns:
            The best matching contract, or ``None`` when the slice has no
            chain for ``optsym`` or no contract passes the filters.
        """
        chain = slice.OptionChains.get(optsym)
        if not chain:
            self.Debug("No Chain")
            return None

        self.Debug("In Chain")
        right = OptionRight.Call if call else OptionRight.Put
        candidates = [
            contract for contract in chain
            if contract.Right == right and contract.Expiry >= expiry
        ]
        if not candidates:
            return None

        # min() keeps the first of equally-near strikes, matching a strict "<" scan.
        best_option = min(candidates, key=lambda contract: abs(contract.Strike - strike))
        self.Debug(f"There are {len(candidates)} options that match!")
        self.Debug(f"Best option is {best_option}")
        return best_option

    def get_returns(self, symbol, lookback):
        """Return the gross bar-over-bar returns (close[t] / close[t-1]) for ``symbol``.

        Args:
            symbol: security to request history for.
            lookback: number of daily bars to request.

        Returns:
            1-D numpy array with one element fewer than the bars received;
            empty when the history request returns no rows.
        """
        df = self.History(symbol, lookback, Resolution.Daily)
        if df.empty:
            return np.array([])
        closes = np.array(df.close.tolist())
        return closes[1:] / closes[:-1]
def monte_carlo(current_px, returns, lookforward, num_sims):
    """Simulate ``num_sims`` terminal prices and return them sorted ascending.

    Args:
        current_px: starting price handed to every simulation.
        returns: sample of historical gross returns to resample from.
        lookforward: number of steps in each simulated path.
        num_sims: number of independent simulations to run.

    Returns:
        List of ``num_sims`` simulated prices in ascending order (empty when
        ``num_sims`` is zero or negative).
    """
    return sorted(sim(current_px, returns, lookforward) for _ in range(num_sims))
def sim(current_px, returns, lookforward):
    """Simulate one terminal price by resampling historical returns.

    Draws ``lookforward`` returns uniformly at random, with replacement, from
    ``returns`` and compounds them onto ``current_px``.

    Args:
        current_px: starting price.
        returns: non-empty sequence of gross returns (price ratios).
        lookforward: number of returns to draw; 0 leaves the price unchanged.

    Returns:
        ``current_px`` multiplied by the product of the drawn returns.

    Raises:
        ValueError: if ``returns`` is empty.
    """
    sample = np.asarray(returns)
    if len(sample) == 0:
        raise ValueError("sim() needs at least one historical return to resample")
    draws = np.random.randint(0, high=len(sample), size=lookforward)
    # Fancy indexing selects all drawn returns at once instead of copying one by one.
    return current_px * np.prod(sample[draws])