| Overall Statistics |
|
Total Trades 152 Average Win 0.08% Average Loss -0.14% Compounding Annual Return -2.442% Drawdown 5.200% Expectancy -0.473 Net Profit -4.823% Sharpe Ratio -1.687 Probabilistic Sharpe Ratio 0.000% Loss Rate 67% Win Rate 33% Profit-Loss Ratio 0.60 Alpha -0.017 Beta 0.002 Annual Standard Deviation 0.01 Annual Variance 0 Information Ratio -0.853 Tracking Error 0.124 Treynor Ratio -9.115 Total Fees $498.73 Estimated Strategy Capacity $2100000.00 Lowest Capacity Asset GOOCV VP83T1ZUHROL Portfolio Turnover 10.38% |
# region imports
from AlgorithmImports import *
from QuantConnect.Data.UniverseSelection import *
from AlgorithmImports import *
from enum import Enum
import statsmodels.api as sm
from statsmodels.tsa.stattools import coint, adfuller
# endregion
'''
The Alpha model implements the ADF (Augmented Dickey-Fuller) test statistic as insights. This alpha model is designed to rank every pair combination
by its ADF test and trade the statistically significant pair with the lowest ADF test statistic. The base model is the "Base Pairs Trading Model",
which calculates a ratio between the two securities by dividing their historical prices over a lookback window.
It then calculates the mean of this ratio by taking the 500-period EMA of the quotient. When the ratio diverges
far enough from the mean ratio, this model generates alternating long ratio/short ratio insights emitted as a
group to capture the reversion of the ratio.
Finally, we use mean-variance optimization for the construction of the portfolio in
combination with a Trailing Stop for the risk management model.
'''
class DeterminedTanKitten(QCAlgorithm):
    '''Framework algorithm that trades the GOOG/MSFT pair.

    It combines a manual two-symbol universe, the ADF pairs-trading alpha model,
    a mean-variance optimization portfolio construction model and a
    trailing-stop risk management model.'''

    # Number of bars handed to the portfolio construction model
    # (original note: "Last trading day").
    # NOTE(review): the alpha model below is created without a lookback and
    # therefore uses its own default - confirm that this is intended.
    lookback = 6*60
    # Minimum absolute z-score passed to the alpha model
    entry_th = 2
    # Resolution shared by the universe, the alpha model and the portfolio model
    my_resolution = Resolution.Minute

    def Initialize(self):
        '''Set the backtest window, cash, brokerage, universe and framework models.'''
        self.SetStartDate(2018, 1, 1)   # Set Start Date
        self.SetEndDate(2020, 1, 1)     # Set End Date
        self.SetCash(100000)            # Set Strategy Cash

        # Broker
        self.SetBrokerageModel(BrokerageName.QuantConnectBrokerage, AccountType.Margin)

        # Init Universe, use only GOOG and MSFT
        self.UniverseSettings.Resolution = self.my_resolution
        universe_symbols = []
        for ticker in ['GOOG', 'MSFT']:
            universe_symbols.append(Symbol.Create(ticker, SecurityType.Equity, Market.USA))
        self.AddUniverseSelection(ManualUniverseSelectionModel(universe_symbols))

        # Alpha model base: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/alpha/supported-models#08-Base-Pairs-Trading-Model
        alpha_model = ADFPairsTradingAlphaModel(resolution=self.my_resolution,
                                                minimumZScore=self.entry_th)
        self.AddAlpha(alpha_model)

        # Portfolio construction model: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/portfolio-construction/supported-models#08-Mean-Variance-Optimization-Model
        construction_model = MeanVarianceOptimizationPortfolioConstructionModel(resolution=self.my_resolution,
                                                                                lookback=self.lookback)
        self.SetPortfolioConstruction(construction_model)

        # Risk Model: https://www.quantconnect.com/docs/v2/writing-algorithms/algorithm-framework/risk-management/supported-models#07-Trailing-Stop-Model
        self.AddRiskManagement(TrailingStopRiskManagementModel())

        # Track of securities in the universe
        self.securityTracker = set()
## ------------------------------------- ALPHA MODEL ------------------------------------- ##
class MyBasePairsTradingAlphaModel(AlphaModel):
    '''Alpha model that considers every pair combination of the securities
    selected by the universe selection model.

    Every tracked pair emits alternating long ratio/short ratio insights as a group.'''

    def __init__(self, lookback = 1,
            resolution = Resolution.Daily,
            threshold = 1):
        '''Initializes a new instance of the pairs trading alpha model
        Args:
            lookback: Lookback period of the analysis
            resolution: Analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
        self.lookback = lookback
        self.resolution = resolution
        self.threshold = threshold

        # Insights are expected to play out over `lookback` bars of `resolution`
        bar_span = Extensions.ToTimeSpan(self.resolution)
        self.predictionInterval = Time.Multiply(bar_span, self.lookback)

        # Tracked pairs keyed by (symbol, symbol) tuples, and the securities currently in the universe
        self.pairs = {}
        self.Securities = []

        resolutionString = Extensions.GetEnumString(resolution, Resolution)
        self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'

    def Update(self, algorithm, data):
        '''Collects the insight groups of every tracked pair.
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated'''
        return [insight
                for pair in self.pairs.values()
                for insight in pair.GetInsightGroup()]

    def OnSecuritiesChanged(self, algorithm, changes):
        '''Event fired each time securities are added to/removed from the data feed.
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            changes: The security additions and removals from the algorithm'''
        for added in changes.AddedSecurities:
            self.Securities.append(added)

        for removed in changes.RemovedSecurities:
            if removed in self.Securities:
                self.Securities.remove(removed)

        self.UpdatePairs(algorithm)

        # Drop (and dispose of) every pair that involves a removed security
        for removed in changes.RemovedSecurities:
            stale_keys = [key for key in self.pairs if removed.Symbol in key]
            for stale_key in stale_keys:
                self.pairs.pop(stale_key).dispose()

    def UpdatePairs(self, algorithm):
        '''Creates a Pair for every untracked symbol combination that passes HasPassedTest.
        Args:
            algorithm: The algorithm instance'''
        ordered = sorted((security.Symbol for security in self.Securities),
                         key=lambda symbol: str(symbol.ID))

        for position, first in enumerate(ordered):
            for second in ordered[position + 1:]:
                forward_key = (first, second)
                already_tracked = forward_key in self.pairs or (second, first) in self.pairs
                if already_tracked or not self.HasPassedTest(algorithm, first, second):
                    continue
                self.pairs[forward_key] = self.Pair(algorithm, first, second, self.predictionInterval, self.threshold)

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets pass a pairs trading test
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if the statistical test for the pair is successful (this base version accepts every pair)'''
        return True

    class Pair:
        '''Tracks the price ratio of two assets against percentage bands around its EMA.'''

        class State(Enum):
            ShortRatio = -1
            FlatRatio = 0
            LongRatio = 1

        def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
            '''Create a new pair
            Args:
                algorithm: The algorithm instance that experienced the change in securities
                asset1: The first asset's symbol in the pair
                asset2: The second asset's symbol in the pair
                predictionInterval: Period over which this insight is expected to come to fruition
                threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
            self.state = self.State.FlatRatio
            self.algorithm = algorithm
            self.asset1 = asset1
            self.asset2 = asset2
            self.predictionInterval = predictionInterval

            def register_close_identity(symbol: Symbol):
                # Builds an Identity indicator for the symbol and returns it together
                # with the consolidator it is registered to; the consolidator reference
                # is kept so that dispose() can remove it from the SubscriptionManager.
                configs = algorithm.SubscriptionManager.SubscriptionDataConfigService.GetSubscriptionDataConfigs(symbol)
                finest = min(config.Resolution for config in configs)
                indicator = Identity(algorithm.CreateIndicatorName(symbol, "close", finest))
                feed = algorithm.ResolveConsolidator(symbol, finest)
                algorithm.RegisterIndicator(symbol, indicator, feed)
                return indicator, feed

            self.asset1Price, self.identityConsolidator1 = register_close_identity(asset1)
            self.asset2Price, self.identityConsolidator2 = register_close_identity(asset2)

            # ratio = asset1 / asset2, mean = 500-period EMA of that ratio
            self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
            self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)

            # Bands sit `threshold` percent above and below the mean
            upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + threshold / 100)
            self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)
            lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - threshold / 100)
            self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)

        def dispose(self):
            '''
            On disposal, remove the consolidators from the subscription manager
            '''
            manager = self.algorithm.SubscriptionManager
            manager.RemoveConsolidator(self.asset1, self.identityConsolidator1)
            manager.RemoveConsolidator(self.asset2, self.identityConsolidator2)

        def _grouped_insights(self, direction1, direction2):
            # Builds the asset1 insight, then the asset2 insight, and groups them;
            # both carry the current ratio value as magnitude.
            magnitude = self.ratio.Current.Value
            first = Insight.Price(self.asset1, self.predictionInterval, direction=direction1, magnitude=magnitude)
            second = Insight.Price(self.asset2, self.predictionInterval, direction=direction2, magnitude=magnitude)
            # creates a group id and sets the GroupId property on each insight object
            return Insight.Group(first, second)

        def GetInsightGroup(self):
            '''Gets the insights group for the pair
            Returns:
                Insights grouped by a unique group id, or an empty list when nothing new is emitted'''
            if not self.mean.IsReady:
                return []

            # Ratio above the upper band: short asset1, long asset2 (never re-emit the same direction)
            if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
                self.state = self.State.LongRatio
                return self._grouped_insights(InsightDirection.Down, InsightDirection.Up)

            # Ratio below the lower band: long asset1, short asset2 (never re-emit the same direction)
            if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
                self.state = self.State.ShortRatio
                return self._grouped_insights(InsightDirection.Up, InsightDirection.Down)

            return []
class ADFPairsTradingAlphaModel(MyBasePairsTradingAlphaModel):
    '''
    Adaptation of the "PearsonCorrelationPairsTradingAlphaModel" implemented by QuantConnect
    that selects the pair to trade with an Augmented Dickey-Fuller (ADF) test.
    Every pair combination is regressed (OLS), the regression residual is ADF-tested,
    and the statistically significant pair with the lowest ADF test statistic is traded,
    provided the latest residual z-score is large enough.
    This model generates alternating long ratio/short ratio insights emitted as a group.
    '''
    def __init__(self, lookback:int = 15,
            resolution = Resolution.Minute,
            threshold = 1,
            minimumZScore = 1,
            min_statistical_significance = 0.05):
        '''Initializes a new instance of the ADFPairsTradingAlphaModel class
        Args:
            lookback: lookback period of the analysis (number of bars requested from history)
            resolution: analysis resolution
            threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
            minimumZScore: The minimum absolute z-score of the latest regression residual to select a pair
            min_statistical_significance: ADF p-values must be below this level for a pair to be considered'''
        # The base class stores lookback, resolution and threshold
        super().__init__(lookback, resolution, threshold)
        self.min_statistical_significance = min_statistical_significance
        self.minimumZScore = minimumZScore
        # Symbols of the currently selected pair; empty until a pair is selected
        self.best_pair = ()

    def Update(self, algorithm: QCAlgorithm, data: Slice) -> List[Insight]:
        '''Re-ranks the pairs once per hour and returns the insights of the tracked pairs.
        Args:
            algorithm: The algorithm instance
            data: The new data available
        Returns:
            The new insights generated; an empty list outside the top of the hour, when any
            exchange is closed, or when no pair is statistically significant'''
        # Update every hour
        if int(algorithm.Time.minute) != 0:
            return []

        symbols = []
        for security in self.Securities:
            symbols.append(security.Symbol)
            if not security.Exchange.Hours.IsOpen(algorithm.Time, extendedMarket=False):
                return []

        history = algorithm.History(symbols, self.lookback, self.resolution)
        # Check for emptiness BEFORE touching `.close`: an empty history frame has no columns
        if not history.empty:
            closes = history.close.unstack(level=0)
            df = self.get_price_dataframe(closes)
            column_count = len(df.columns)

            # ADF result ([p-value, test statistic, last z-score]) per column-index pair
            adf = {}
            for i in range(column_count):
                for j in range(i + 1, column_count):
                    adf[(i, j)] = self.ComputeADF(df.iloc[:, i], df.iloc[:, j])

            # Keep only the statistically significant (stationary residual) pairs
            significant = [kv for kv in adf.items() if kv[1][0] < self.min_statistical_significance]
            if not significant:
                return []

            # Sorted by descending ADF statistic, so the last entry has the lowest one
            ranked = sorted(significant, key=lambda kv: kv[1][1], reverse=True)
            (i, j), (p_value, adf_stat, zscore) = ranked[-1]

            # ZScore filter
            if abs(zscore) >= self.minimumZScore:
                # NOTE(review): assumes the history columns come back in the same order
                # as `symbols` - confirm against the History DataFrame layout.
                algorithm.Log('The best pair was ({} - {}) with a p-value of {}, ADF-stat of {} and Z-Score of {}'.format(
                    symbols[i], symbols[j], p_value, adf_stat, zscore))
                self.best_pair = (symbols[i], symbols[j])
                self.UpdatePairs(algorithm)

        return super().Update(algorithm, data)

    def ComputeADF(self, Y, X):
        '''Regresses Y on X (with an intercept) and ADF-tests the regression residual.
        Args:
            Y: The dependent series
            X: The explanatory series
        Returns:
            [ADF p-value, ADF test statistic, z-score of the last residual]'''
        # Regression model with intercept
        results = sm.OLS(Y, sm.add_constant(X)).fit()
        # Standard deviation of the residual
        sigma = np.sqrt(results.mse_resid)
        # Regression residual has mean = 0 by definition, so the z-score is residual / sigma
        res = results.resid
        zscore = res / sigma
        # adfuller returns (test statistic, p-value, ...)
        test_statistic, p_value = adfuller(res)[:2]
        return [p_value, test_statistic, zscore.values[-1]]

    def HasPassedTest(self, algorithm, asset1, asset2):
        '''Check whether the assets are the currently selected best pair
        Args:
            algorithm: The algorithm instance that experienced the change in securities
            asset1: The first asset's symbol in the pair
            asset2: The second asset's symbol in the pair
        Returns:
            True if (asset1, asset2) is the selected pair, in either order'''
        selected = self.best_pair
        if not selected:
            return False
        # UpdatePairs proposes pairs in its own sort order, which may be the reverse
        # of the order in which best_pair was stored, so accept both orientations.
        return selected == (asset1, asset2) or selected == (asset2, asset1)

    def get_price_dataframe(self, df):
        '''Converts a frame of prices into a frame of log-price differences.
        Args:
            df: Price frame with one column per security
        Returns:
            The first difference of the log prices with incomplete rows dropped; when the
            securities trade in different time zones the index is converted to UTC first'''
        timezones = { x.Symbol.Value: x.Exchange.TimeZone for x in self.Securities }

        # Use log prices
        df = np.log(df)

        if len(set(timezones.values())) != 1:
            series_dict = dict()

            for column in df:
                # Change the dataframe index from data time to UTC time
                # (date only for daily resolution); the converter is used
                # immediately, within the same loop iteration.
                if self.resolution == Resolution.Daily:
                    to_utc = lambda x: Extensions.ConvertToUtc(x, timezones[column]).date()
                else:
                    to_utc = lambda x: Extensions.ConvertToUtc(x, timezones[column])

                data = df[[column]]
                data.index = data.index.map(to_utc)
                series_dict[column] = data[column]

            df = pd.DataFrame(series_dict).dropna()

        return (df - df.shift(1)).dropna()