| Overall Statistics |
|
Total Trades 11162 Average Win 0.26% Average Loss -0.14% Compounding Annual Return 4753.504% Drawdown 4.200% Expectancy 0.266 Net Profit 4753.504% Sharpe Ratio 13.252 Probabilistic Sharpe Ratio 0% Loss Rate 54% Win Rate 46% Profit-Loss Ratio 1.77 Alpha 3.244 Beta -0.161 Annual Standard Deviation 0.244 Annual Variance 0.06 Information Ratio 10.796 Tracking Error 0.295 Treynor Ratio -20.16 Total Fees $317156.05 |
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
import pprint
from datetime import timedelta
from scipy.stats import spearmanr
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from statsmodels.distributions.empirical_distribution import ECDF
from statsmodels.tsa.vector_ar.vecm import coint_johansen
GlobalPairsList = []
class PairsUniverseSelection(FundamentalUniverseSelectionModel):
def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
'''Initializes a new default instance of the TechnologyUniverseModule'''
super().__init__(filterFineData, universeSettings, securityInitializer)
self.numberOfSymbolsCoarse = 3000
self.numberOfSymbolsFine = 1000
self.dollarVolumeBySymbol = {}
self.lastMonth = -1
self.pairsList = list()
self.resolution = Resolution.Daily
self.minCorr = 0.75
self.maxAdfP = 1e-10 #1e-15
self.maxCointP = 1e-10 #1e-15
self.lookback = 252 * 3
self.max_pairs = 6
def PairsTest(self, algorithm, history):
df = self.get_returns_dataframe(history, algorithm)
pairs = list()
symbols = list()
corr = df.corr('spearman').unstack()
corr = corr.sort_values(kind='quicksort', ascending=False,)[corr < 1][corr>=self.minCorr]
for i in range(0, len(corr)):
pair = corr.index[i]
if corr.index[i] in pairs or (corr.index[i][1], corr.index[i][0]) in pairs:
continue
if coint( df.loc[:,pair[0]], df.loc[:,pair[1]] )[1] <= self.maxCointP:
pairs.append(pair)
for symbol in pair:
if not symbol in symbols:
symbols.append(symbol)
if len(pairs) >= self.max_pairs:
return pairs, symbols
return pairs, symbols
def SelectCoarse(self, algorithm, coarse):
'''
Performs a coarse selection:
-The stock must have fundamental data
-The stock must have positive previous-day close price
-The stock must have positive volume on the previous trading day
'''
if algorithm.Time.month == self.lastMonth:
return Universe.Unchanged
filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0]
sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]
symbols = []
self.dollarVolumeBySymbol.clear()
for x in sortedByDollarVolume:
symbols.append(x.Symbol)
self.dollarVolumeBySymbol[x.Symbol] = x.DollarVolume
return symbols
def SelectFine(self, algorithm, fine):
'''
Performs a fine selection:
-The company's headquarter must in the U.S.
-The stock must be traded on the NASDAQ stock exchange
-At least half a year since its initial public offering
-The stock must be in the Industry Template Code catagory N
'''
if algorithm.Time.month == self.lastMonth:
return Universe.Unchanged
self.lastMonth = algorithm.Time.month
# Filter stocks
filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA"
and ( (x.CompanyReference.PrimaryExchangeID == "NAS")
or (x.CompanyReference.PrimaryExchangeID == "NYSE") )
and (algorithm.Time - x.SecurityReference.IPODate).days > 180
and x.CompanyReference.IndustryTemplateCode == "N"
]
sortedByDollarVolume = []
# Sort stocks on dollar volume
sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
#symbol_strings = list()
#for symbol in symbols:
# symbol_strings.append(str(symbol))
symbolsInPairs = list()
GlobalPairsList.clear()
history = algorithm.History(symbols, self.lookback, self.resolution)
if history.empty:
return symbolsInPairs
Test = self.PairsTest(algorithm, history)
GlobalPairsList.extend(Test[0]) ##################
symbolsInPairs.extend(Test[1])
algorithm.Debug(GlobalPairsList)
algorithm.Log(GlobalPairsList)
algorithm.Debug(str(len(symbolsInPairs)) + " symbols in universe after")
algorithm.Log(str(len(symbolsInPairs)) + " symbols in universe after on " + str(algorithm.UtcTime))
algorithm.Debug(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime))
algorithm.Log(str(len(GlobalPairsList)) + " Global Pairs List " + str(algorithm.UtcTime))
return symbolsInPairs
def get_returns_dataframe(self, history, algorithm):
df = np.log(history.unstack(level = 0).close)
df = 1 + pd.DataFrame(df).pct_change().dropna(axis=1, thresh = int(len(df.index) * .85)).dropna()
return df
from collections import OrderedDict
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel
class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel):
''' This alpha model is designed to rank every pair combination by its pearson corrDictelation
and trade the pair with the hightest corrDictelation
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self,
lookback = 1080,
resolution = Resolution.Daily,
threshold = .1,
minimumcorrelation = .99):
'''Initializes a new instance of the PearsoncorrDictelationPairsTradingAlphaModel class
Args:
lookback: lookback period of the analysis
resolution: analysis resolution
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
minimumcorrDictelation: The minimum corrDictelation to consider a tradable pair'''
super().__init__(lookback, resolution, threshold)
self.lookback = lookback
self.resolution = resolution
self.minimumcorrelation = .75
self.max_p_value = 1 - minimumcorrelation
self.pairsList = list()
self.DataIsConsolidated = False
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
self.pairsList = GlobalPairsList
if len(self.pairsList) == 0:
algorithm.Debug("No Pairs; Date: " + str(algorithm.UtcTime))
algorithm.Log("No Pairs; Date: " + str(algorithm.UtcTime))
else:
algorithm.Debug("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime))
algorithm.Log("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime))
super().OnSecuritiesChanged(algorithm, changes)
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the statistical test for the pair is successful'''
pair = (str(asset1), str(asset2))
reverse = (str(asset2), str(asset1))
if pair in self.pairsList \
or reverse in self.pairsList:
return True
return False
## QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
import numpy as np
from collections import deque
class BasePairsTradingAlphaModel(AlphaModel):
'''This alpha model is designed to accept every possible pair combination
from securities selected by the universe selection model
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self, lookback = 252,
resolution = Resolution.Daily,
threshold = 1,
predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 120)):
''' Initializes a new instance of the PairsTradingAlphaModel class
Args:
lookback: Lookback self.period of the analysis
resolution: Analysis resolution
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.lookback = lookback
self.resolution = resolution
self.threshold = threshold
self.predictionInterval = predictionInterval
self.consolidated = False
self.pairs = dict()
self.Securities = list()
resolutionString = Extensions.GetEnumString(resolution, Resolution)
self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
for key, pair in self.pairs.items():
insights.extend(pair.GetInsightGroup(algorithm))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
self.UpdatePairs(algorithm)
def UpdatePairs(self, algorithm):
symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values], key=lambda x: str(x.ID))
self.pairs = {}
for i in range(0, len(symbols)):
asset_i = symbols[i]
for j in range(1 + i, len(symbols)):
asset_j = symbols[j]
pair_symbol = (asset_i, asset_j)
invert = (asset_j, asset_i)
if pair_symbol in self.pairs or invert in self.pairs:
continue
if not self.HasPassedTest(algorithm, asset_i, asset_j):
continue
pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
self.pairs[pair_symbol] = pair
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the statistical test for the pair is successful'''
return True
class Pair:
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
'''Create a new pair
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
predictionInterval: self.period over which this insight is expected to come to fruition
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.state = self.State.FlatRatio
self.period = 500
self.barsize = 1
consolidatorLength = timedelta(minutes=self.barsize)
self.pair = (asset1, asset2)
tradeBarConsolidator1 = TradeBarConsolidator(consolidatorLength)
tradeBarConsolidator2 = TradeBarConsolidator(consolidatorLength)
algorithm.SubscriptionManager.AddConsolidator(asset1, tradeBarConsolidator1)
algorithm.SubscriptionManager.AddConsolidator(asset2, tradeBarConsolidator2)
self.asset1 = asset1
self.asset2 = asset2
self.spreadDeque = deque(maxlen=self.period)
self.asset1Price = SimpleMovingAverage(1)
self.asset2Price = SimpleMovingAverage(1)
algorithm.RegisterIndicator(asset1, self.asset1Price, tradeBarConsolidator1)
algorithm.RegisterIndicator(asset2, self.asset2Price, tradeBarConsolidator2)
tradeBarConsolidator1.DataConsolidated += self.asset1Consolidated
tradeBarConsolidator2.DataConsolidated += self.asset2Consolidated
self.upperThreshold = 1 + (threshold / 100)
self.lowerThreshold = 1 - (threshold / 100)
self.count = 0
self.predictionInterval = predictionInterval
self.Data1IsConsolidated = False
self.Data2IsConsolidated = False
hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute)
hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute)
if not 'close' in hist1:
return
if not 'close' in hist2:
return
###bar1 = np.log(np.array(hist1['close'][0::self.barsize]))
bar1 = np.array(hist1['close'][0::self.barsize])
###bar2 = np.log(np.array(hist2['close'][0::self.barsize]))
bar2 = np.array(hist2['close'][0::self.barsize])
minhist = min(len(bar1), len(bar2))
self.spread = np.divide(bar1[:minhist], bar2[:minhist])
for i in range (0, len(self.spread)):
self.spreadDeque.appendleft(self.spread[i])
self.mean = np.mean(self.spreadDeque)
self.std = np.std(self.spreadDeque)
self.ratio = None
def asset1Consolidated(self, sender, consolidated):
self.Data1IsConsolidated = True
def asset2Consolidated(self, sender, consolidated):
self.Data2IsConsolidated = True
def GetInsightGroup(self, algorithm):
'''Gets the insights group for the pair
Returns:
Insights grouped by an unique group id'''
# *** Add History Warmup
if not (self.Data1IsConsolidated & self.Data2IsConsolidated):
return []
###self.ratio = np.log(self.asset1Price.Current.Value) / np.log(self.asset2Price.Current.Value)
self.ratio = self.asset1Price.Current.Value / self.asset2Price.Current.Value
self.spreadDeque.appendleft(self.ratio)
self.mean = np.mean(self.spreadDeque)
self.std = np.std(self.spreadDeque)
self.zscore = (self.ratio - self.mean) / self.std
self.Data1IsConsolidated = False
self.Data2IsConsolidated = False
if len(self.spreadDeque) != self.period:
return []
if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold):
self.state = self.State.LongRatio
# asset1/asset2 is more than 2 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)
#algorithm.SetHoldings(self.asset1, -weight)
#algorithm.SetHoldings(self.asset2, weight)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)
# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold):
self.state = self.State.ShortRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)
#algorithm.SetHoldings(self.asset1, weight)
#algorithm.SetHoldings(self.asset2, -weight)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)
# don't re-emit the same direction
if self.state is self.State.ShortRatio and self.ratio > self.mean:
self.state = self.State.FlatRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
neutralasset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat)
neutralasset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat)
#algorithm.SetHoldings(self.asset1, 0)
#algorithm.SetHoldings(self.asset2, 0)
# creates a group id and set the GroupId property on each insight object
return []#Insight.Group(neutralasset1, neutralasset2)
# don't re-emit the same direction
if self.state is self.State.LongRatio and self.ratio < self.mean:
self.state = self.State.FlatRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
neutralasset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat)
neutralasset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat)
#algorithm.SetHoldings(self.asset1, 0)
#algorithm.SetHoldings(self.asset2, 0)
# creates a group id and set the GroupId property on each insight object
return []#Insight.Group(neutralasset1, neutralasset2)
return []
### QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Alphas import *
from datetime import timedelta
from enum import Enum
import numpy as np
from collections import deque
class BasePairsTradingAlphaModel(AlphaModel):
'''This alpha model is designed to accept every possible pair combination
from securities selected by the universe selection model
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self, lookback = 252,
resolution = Resolution.Daily,
threshold = .4,
predictionInterval = Time.Multiply(Extensions.ToTimeSpan(Resolution.Minute), 1000)):
''' Initializes a new instance of the PairsTradingAlphaModel class
Args:
lookback: Lookback self.period of the analysis
resolution: Analysis resolution
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.lookback = lookback
self.resolution = resolution
self.threshold = threshold
self.predictionInterval = predictionInterval
self.consolidated = False
self.pairs = dict()
self.Securities = list()
resolutionString = Extensions.GetEnumString(resolution, Resolution)
self.Name = f'{self.__class__.__name__}({self.lookback},{resolutionString},{Extensions.NormalizeToStr(threshold)})'
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
for key, pair in self.pairs.items():
insights.extend(pair.GetInsightGroup(algorithm))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
self.UpdatePairs(algorithm, changes)
def UpdatePairs(self, algorithm, changes):
symbols = sorted([x.Symbol for x in algorithm.ActiveSecurities.Values], key=lambda x: str(x.ID))
for symbol in changes.RemovedSecurities:
if symbol in symbols:
symbols.remove(symbol)
# Remove pair from dict and its consolidators from the algorithm
pair = self.pairs.pop(symbol, None)
if pair is not None:
algorithm.Log(f"Removing Consolidators {algorithm.UtcTime}")
algorithm.Debug(f"Removing Consolidators {algorithm.UtcTime}")
pair.RemoveConsolidators(algorithm)
for i in range(0, len(symbols)):
asset_i = symbols[i]
for j in range(1 + i, len(symbols)):
asset_j = symbols[j]
pair_symbol = (asset_i, asset_j)
invert = (asset_j, asset_i)
if pair_symbol in self.pairs or invert in self.pairs:
continue
if not self.HasPassedTest(algorithm, asset_i, asset_j):
continue
pair = self.Pair(algorithm, asset_i, asset_j, self.predictionInterval, self.threshold)
self.pairs[pair_symbol] = pair
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the statistical test for the pair is successful'''
return True
class Pair:
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, algorithm, asset1, asset2, predictionInterval, threshold):
'''Create a new pair
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
predictionInterval: self.period over which this insight is expected to come to fruition
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight'''
self.state = self.State.FlatRatio
self.period = 500
self.barsize = 1
consolidatorLength = timedelta(minutes=self.barsize)
self.pair = (asset1, asset2)
self.consolidator1 = TradeBarConsolidator(consolidatorLength)
self.consolidator2 = TradeBarConsolidator(consolidatorLength)
#algorithm.SubscriptionManager.AddConsolidator(asset1, tradeBarConsolidator1)
#algorithm.SubscriptionManager.AddConsolidator(asset2, tradeBarConsolidator2)
self.asset1 = asset1
self.asset2 = asset2
self.security1 = algorithm.Securities[asset1]
self.security2 = algorithm.Securities[asset2]
self.spreadDeque = deque(maxlen=self.period)
self.asset1Price = Identity(asset1)
self.asset2Price = Identity(asset2)
algorithm.RegisterIndicator(self.asset1, self.asset1Price, self.consolidator1)
algorithm.RegisterIndicator(self.asset2, self.asset2Price, self.consolidator2)
#tradeBarConsolidator1.DataConsolidated += self.asset1Consolidated
#tradeBarConsolidator2.DataConsolidated += self.asset2Consolidated
self.upperThreshold = 1 + (threshold / 100)
self.lowerThreshold = 1 - (threshold / 100)
self.count = 0
self.predictionInterval = predictionInterval
#self.Data1IsConsolidated = False
#self.Data2IsConsolidated = False
hist1 = algorithm.History(asset1, self.period * self.barsize, Resolution.Minute)
hist2 = algorithm.History(asset2, self.period * self.barsize, Resolution.Minute)
if not 'close' in hist1:
return
if not 'close' in hist2:
return
###bar1 = np.log(np.array(hist1['close'][0::self.barsize]))
bar1 = np.array(hist1['close'])
###bar2 = np.log(np.array(hist2['close'][0::self.barsize]))
bar2 = np.array(hist2['close'])
minhist = min(len(bar1), len(bar2))
self.spread = np.divide(bar1[:minhist], bar2[:minhist])
for i in range (0, len(self.spread)):
self.spreadDeque.appendleft(self.spread[i])
self.mean = np.mean(self.spreadDeque)
#self.std = np.std(self.spreadDeque)
self.ratio = None
#def asset1Consolidated(self, sender, consolidated):
#self.Data1IsConsolidated = True
#def asset2Consolidated(self, sender, consolidated):
#self.Data2IsConsolidated = True
def RemoveConsolidators(self, algorithm):
algorithm.SubscriptionManager.RemoveConsolidator(self.asset1, self.consolidator1)
algorithm.SubscriptionManager.RemoveConsolidator(self.asset2, self.consolidator2)
def GetInsightGroup(self, algorithm):
'''Gets the insights group for the pair
Returns:
Insights grouped by an unique group id'''
# *** Add History Warmup
if not (self.security1.IsTradable and self.security2.IsTradable):
return []
###self.ratio = np.log(self.asset1Price.Current.Value) / np.log(self.asset2Price.Current.Value)
self.ratio = np.divide(self.asset1Price.Current.Value, self.asset2Price.Current.Value)
self.spreadDeque.appendleft(self.ratio)
self.mean = np.mean(self.spreadDeque)
#self.std = np.std(self.spreadDeque)
#self.zscore = (self.ratio - self.mean) / self.std
#self.Data1IsConsolidated = False
#self.Data2IsConsolidated = False
if len(self.spreadDeque) != self.period:
return []
if self.state is not self.State.LongRatio and self.ratio > (self.mean * self.upperThreshold):
self.state = self.State.LongRatio
# asset1/asset2 is more than 2 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)
#algorithm.SetHoldings(self.asset1, -weight)
#algorithm.SetHoldings(self.asset2, weight)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)
# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.ratio < (self.mean * self.lowerThreshold):
self.state = self.State.ShortRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)
#algorithm.SetHoldings(self.asset1, weight)
#algorithm.SetHoldings(self.asset2, -weight)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)
return []
### QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import OrderedDict
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
AddReference("QuantConnect.Indicators")
from QuantConnect import *
from QuantConnect.Indicators import *
from QuantConnect.Algorithm.Framework.Alphas import *
from BasePairsFrameworkAlpha import BasePairsTradingAlphaModel
from datetime import timedelta
from scipy.stats import pearsonr
from statsmodels.tsa.stattools import coint, adfuller
import numpy as np
import pandas as pd
class CorrelationAndCointegrationPairsTrading(BasePairsTradingAlphaModel):
''' This alpha model is designed to rank every pair combination by its pearson corrDictelation
and trade the pair with the hightest corrDictelation
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self,
lookback = 1080,
resolution = Resolution.Daily,
threshold = .1,
minimumcorrelation = .99):
'''Initializes a new instance of the PearsoncorrDictelationPairsTradingAlphaModel class
Args:
lookback: lookback period of the analysis
resolution: analysis resolution
threshold: The percent [0, 100] deviation of the ratio from the mean before emitting an insight
minimumcorrDictelation: The minimum corrDictelation to consider a tradable pair'''
super().__init__(lookback, resolution, threshold)
self.lookback = lookback
self.resolution = resolution
self.minimumcorrelation = .85
self.max_p_value = 1 - minimumcorrelation
self.pairsList = list()
self.DataIsConsolidated = False
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
self.pairsList = []
symbols = [ x.Symbol for x in algorithm.ActiveSecurities.Values ]
symbols.pop()
pairsDict = dict()
history = algorithm.History(symbols, self.lookback, self.resolution).close.unstack(level=0)
if not history.empty:
df = self.get_price_dataframe(history, algorithm)
for i in range(0, len(df.columns)):
for j in range(i+1, len(df.columns)):
pair = (i, j)
array1 = df.iloc[:,i]
array2 = df.iloc[:,j]
minhist = min(len(array1), len(array2))
spreadArray = np.divide(array1[:minhist], array2[:minhist])
IsFinite = np.isfinite(spreadArray)
if not np.all(IsFinite):
continue
pearsonr_value = pearsonr(array1, array2)[0]
if pearsonr_value == 1:
continue
if (pearsonr_value >= self.minimumcorrelation):
p_value = coint(array1, array2)[1]
if p_value == 0:
continue
if (p_value <= self.max_p_value):
adfP = adfuller(spreadArray)[1]
if adfP == 0:
continue
if adfP < 0.95:
algorithm.Debug(str(p_value) + " = Coint P Value, " \
+ str(adfP) + " = Adfuller P Value, " \
+ str(pearsonr_value) + " = Pearson Value, " \
+ str(df.columns[i]) + ", " \
+ str(df.columns[j]))
algorithm.Log(str(p_value) + " = Coint P Value, " \
+ str(adfP) + " = Adfuller P Value, " \
+ str(pearsonr_value) + " = Pearson Value, " \
+ str(df.columns[i]) + ", " \
+ str(df.columns[j]))
self.pairsList.append( (df.columns[i], df.columns[j]) ) ##################
if len(self.pairsList) == 0:
algorithm.Debug("No Pairs; Date: " + str(algorithm.UtcTime))
algorithm.Log("No Pairs; Date: " + str(algorithm.UtcTime))
else:
algorithm.Debug("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime))
algorithm.Log("Number of pairs: " + str(len(self.pairsList)) + ", Date: " + str(algorithm.UtcTime))
super().OnSecuritiesChanged(algorithm, changes)
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the statistical test for the pair is successful'''
pair = (str(asset1), str(asset2))
reverse = (str(asset2), str(asset1))
if pair in self.pairsList \
or reverse in self.pairsList:
return True
return False
def get_price_dataframe(self, df, algorithm):
df = np.log(df)
return (df - df.shift(1)).dropna()# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)
class PairsTradingPortfolioConstruction(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.Up, long targets are returned and
for insights of direction InsightDirection.Down, short targets are returned.'''
def __init__(self, rebalancingParam = Resolution.Daily):
'''Initialize a new instance of EqualWeightingPortfolioConstructionModel
Args:
rebalancingParam: Rebalancing parameter. If it is a timedelta or Resolution, it will be converted into a function.
The function returns the next expected rebalance time for a given algorithm UTC DateTime'''
self.insightCollection = InsightCollection()
self.removedSymbols = []
self.nextExpiryTime = UTCMIN
self.rebalancingTime = UTCMIN
self.rebalancingFunc = rebalancingParam
# If the argument is an instance if Resolution or Timedelta
# Redefine self.rebalancingFunc
if isinstance(rebalancingParam, int):
rebalancingParam = Extensions.ToTimeSpan(rebalancingParam)
if isinstance(rebalancingParam, timedelta):
self.rebalancingFunc = lambda dt: dt + rebalancingParam
def ShouldCreateTargetForInsight(self, insight):
'''Method that will determine if the portfolio construction model should create a
target for this insight
Args:
insight: The insight to create a target for'''
return True
def DetermineTargetPercent(self, activeInsights):
'''Will determine the target percent for each insight
Args:
activeInsights: The active insights to generate a target for'''
result = {}
# give equal weighting to each security
count = sum(x.Direction != InsightDirection.Flat for x in activeInsights)
percent = 0 if count == 0 else 1.0 / count
for insight in activeInsights:
result[insight] = insight.Direction * percent
return result
def CreateTargets(self, algorithm, insights):
'''Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portfolio targets from
Returns:
An enumerable of portfolio targets to be sent to the execution model'''
targets = []
if (algorithm.UtcTime <= self.nextExpiryTime and
algorithm.UtcTime <= self.rebalancingTime and
len(insights) == 0 and
self.removedSymbols is None):
return targets
for insight in insights:
if self.ShouldCreateTargetForInsight(insight):
self.insightCollection.Add(insight)
# Create flatten target for each security that was removed from the universe
if self.removedSymbols is not None:
universeDeselectionTargets = [ PortfolioTarget(symbol, 0) for symbol in self.removedSymbols ]
targets.extend(universeDeselectionTargets)
self.removedSymbols = None
# Get insight that haven't expired of each symbol that is still in the universe
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
# Get the last generated active insight for each symbol
lastActiveInsights = []
for symbol, g in groupby(activeInsights, lambda x: x.Symbol):
lastActiveInsights.append(sorted(g, key = lambda x: x.GeneratedTimeUtc)[-1])
# Determine target percent for the given insights
percents = self.DetermineTargetPercent(lastActiveInsights)
errorSymbols = {}
for insight in lastActiveInsights:
target = PortfolioTarget.Percent(algorithm, insight.Symbol, percents[insight])
if not target is None:
targets.append(target)
else:
errorSymbols[insight.Symbol] = insight.Symbol
# Get expired insights and create flatten targets for each symbol
expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
expiredTargets = []
for symbol, f in groupby(expiredInsights, lambda x: x.Symbol):
if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime) and not symbol in errorSymbols:
expiredTargets.append(PortfolioTarget(symbol, 0))
continue
targets.extend(expiredTargets)
self.nextExpiryTime = self.insightCollection.GetNextExpiryTime()
if self.nextExpiryTime is None:
self.nextExpiryTime = UTCMIN
self.rebalancingTime = self.rebalancingFunc(algorithm.UtcTime)
return targets
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# Get removed symbol and invalidate them in the insight collection
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
self.insightCollection.Clear(self.removedSymbols)# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Algorithm.Framework")
import numpy as np
from System import *
from QuantConnect import *
from QuantConnect.Orders import *
from QuantConnect.Algorithm import *
from QuantConnect.Algorithm.Framework import *
from QuantConnect.Algorithm.Framework.Execution import *
from QuantConnect.Algorithm.Framework.Portfolio import *
class PairsTradingOrderExecution(ExecutionModel):
'''Provides an implementation of IExecutionModel that immediately submits market orders to achieve the desired portfolio targets'''
def __init__(self):
'''Initializes a new instance of the ImmediateExecutionModel class'''
self.targetsCollection = PortfolioTargetCollection()
def Execute(self, algorithm, targets):
'''Immediately submits orders for the specified portfolio targets.
Args:
algorithm: The algorithm instance
targets: The portfolio targets to be ordered'''
# for performance we check count value, OrderByMarginImpact and ClearFulfilled are expensive to call
self.targetsCollection.AddRange(targets)
if self.targetsCollection.Count > 0:
for target in self.targetsCollection.OrderByMarginImpact(algorithm):
open_quantity = sum([x.Quantity - x.QuantityFilled for x in algorithm.Transactions.GetOpenOrderTickets(target.Symbol)])
existing = algorithm.Securities[target.Symbol].Holdings.Quantity + open_quantity
quantity = int(target.Quantity - existing)
if np.absolute(quantity) > 1:
try: algorithm.MarketOrder(target.Symbol, quantity)
except: pass
self.targetsCollection.ClearFulfilled(algorithm)
#from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
class TechnologyUniverseModule(FundamentalUniverseSelectionModel):
'''
This module selects the most liquid stocks listed on the Nasdaq Stock Exchange.
'''
def __init__(self, filterFineData = True, universeSettings = None, securityInitializer = None):
'''Initializes a new default instance of the TechnologyUniverseModule'''
super().__init__(filterFineData, universeSettings, securityInitializer)
self.numberOfSymbolsCoarse = 1000
self.numberOfSymbolsFine = 500
self.dollarVolumeBySymbol = {}
self.symbols = []
self.lastMonth = -1
def SelectCoarse(self, algorithm, coarse):
'''
Performs a coarse selection:
-The stock must have fundamental data
-The stock must have positive previous-day close price
-The stock must have positive volume on the previous trading day
'''
if algorithm.Time.month == self.lastMonth:
return symbols
filtered = [x for x in coarse if x.HasFundamentalData and x.Volume > 0 and x.Price > 0]
sortedByDollarVolume = sorted(filtered, key = lambda x: x.DollarVolume, reverse=True)[:self.numberOfSymbolsCoarse]
self.dollarVolumeBySymbol.clear()
for x in sortedByDollarVolume:
symbols.append(x.Symbol)
self.dollarVolumeBySymbol[x.Symbol] = x.DollarVolume
return symbols
def SelectFine(self, algorithm, fine):
'''
Performs a fine selection:
-The company's headquarter must in the U.S.
-The stock must be traded on the NASDAQ stock exchange
-At least half a year since its initial public offering
-The stock must be in the Industry Template Code catagory N
'''
if algorithm.Time.month == self.lastMonth:
return self.symbols
self.lastMonth = algorithm.Time.month
# Filter stocks
filteredFine = [x for x in fine if x.CompanyReference.CountryId == "USA"
#and (x.CompanyReference.PrimaryExchangeID == "NAS")
and (algorithm.Time - x.SecurityReference.IPODate).days > 180
and x.CompanyReference.IndustryTemplateCode == "N"]
sortedByDollarVolume = []
# Sort stocks on dollar volume
sortedByDollarVolume = sorted(filteredFine, key = lambda x: self.dollarVolumeBySymbol[x.Symbol], reverse=True)
self.symbols = [x.Symbol for x in sortedByDollarVolume[:self.numberOfSymbolsFine]]
return self.symbolsfrom Execution.ImmediateExecutionModel import ImmediateExecutionModel
from Portfolio.EqualWeightingPortfolioConstructionModel import EqualWeightingPortfolioConstructionModel
#from CorrelatedPairsTrading import CorrelationAndCointegrationPairsTrading
from PairsUniverseSelection import PairsUniverseSelection, CorrelationAndCointegrationPairsTrading
from PairsTradingPortfolioConstruction import PairsTradingPortfolioConstruction
from PairsTradingOrderExecution import PairsTradingOrderExecution
from Selection.UncorrelatedUniverseSelectionModel import UncorrelatedUniverseSelectionModel
class NadionParticleGearbox(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2007, 1, 1) # Set Start Date
self.SetEndDate(2007, 12, 31) # Set Start Date
self.SetCash(2e4) # Set Strategy Cash ** min cash for alpha streams is 1M/1e6
'''
Universes:
LiquidETFUniverse
TechnologyUniverseModule
VolatilityETFUniverse
QC500UniverseSelectionModel
UncorrelatedUniverseSelectionModel
'''
self.SetUniverseSelection(PairsUniverseSelection())
self.UniverseSettings.Resolution = Resolution.Minute # Universe Settings
benchmark = self.AddEquity("SPY").Symbol # Benchmark Symbol
self.SetBenchmark(benchmark) # Setting Benchmark
self.AddAlpha(CorrelationAndCointegrationPairsTrading()) # Alpha Model
self.SetPortfolioConstruction(PairsTradingPortfolioConstruction()) # Portfolio Construction Model
self.SetExecution(PairsTradingOrderExecution())
#self.SetRiskManagement(TrailingStopRiskManagementModel(0.02)) # Add Risk Managment
# AlphaStreamsBrokerageModel
# InteractiveBrokersBrokerageModel
self.SetBrokerageModel(InteractiveBrokersBrokerageModel ()) # Brokerage Model (Alpha Streams is for Prime Brokerage)
#