| Overall Statistics |
|
Total Trades 841 Average Win 0.53% Average Loss -0.30% Compounding Annual Return 0.891% Drawdown 5.900% Expectancy 0.078 Net Profit 9.281% Sharpe Ratio 0.231 Probabilistic Sharpe Ratio 0.057% Loss Rate 61% Win Rate 39% Profit-Loss Ratio 1.74 Alpha 0 Beta 0 Annual Standard Deviation 0.028 Annual Variance 0.001 Information Ratio 0.231 Tracking Error 0.028 Treynor Ratio 0 Total Fees $902.85 Estimated Strategy Capacity $3800000.00 Lowest Capacity Asset JNJ R735QTJ8XC9X |
#region imports
from AlgorithmImports import *
from statsmodels.tsa.stattools import coint
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
#endregion
def GetCointegrationFromHistory(symbols, history, pvaluefunc):
pvalue_list = []
if not history.empty:
close_history = history.close.unstack(level=0)
close_history = close_history.dropna(axis=1, how="any")
n = close_history.shape[1]
keys = close_history.columns
for i in range(n):
for ii in range(i+1, n):
stock1 = close_history[keys[i]]
stock2 = close_history[keys[ii]]
#Get the name of the stock 1 and 2
asset1 = keys[i]
asset2 = keys[ii]
#If there is nans in the frames, we continue (broken data)
if stock1.hasnans or stock2.hasnans:
self.Debug(f'WARNING! {asset1} and {asset2} has Nans. Did not perform coint')
continue
# pvalue = cointPValue(stock1, stock2)
pvalue = pvaluefunc(stock1, stock2)
sym1 = [item for item in symbols if (str(item.ID) == keys[i])][0]
sym2 = [item for item in symbols if (str(item.ID) == keys[ii])][0]
pvalue_list.append((sym1, sym2, pvalue))
pvalue_list = sorted(pvalue_list, key=lambda x: x[2])
return pvalue_list
def cointPValue(stock1, stock2):
result = coint(stock1, stock2)
return result[1]
def cadfPValue(stock1, stock2):
res = sm.OLS(endog=stock2, exog=stock1).fit()
beta_hr=res.params[0]
residuals = stock2 - beta_hr*stock1
cadf = ts.adfuller(residuals)
return cadf[1]#region imports
from AlgorithmImports import *
#endregion
import numpy as np
from math import floor
class KalmanFilter:
def __init__(self):
self.delta = 1e-4
self.wt = self.delta / (1 - self.delta) * np.eye(2)
self.vt = 1e-3
self.theta = np.zeros(2)
self.P = np.zeros((2, 2))
self.R = None
self.qty = 2000
def update(self, price_one, price_two):
# Create the observation matrix of the latest prices
# of TLT and the intercept value (1.0)
F = np.asarray([price_one, 1.0]).reshape((1, 2))
y = price_two
# The prior value of the states \theta_t is
# distributed as a multivariate Gaussian with
# mean a_t and variance-covariance R_t
if self.R is not None:
self.R = self.C + self.wt
else:
self.R = np.zeros((2, 2))
# Calculate the Kalman Filter update
# ----------------------------------
# Calculate prediction of new observation
# as well as forecast error of that prediction
yhat = F.dot(self.theta)
et = y - yhat
# Q_t is the variance of the prediction of
# observations and hence \sqrt{Q_t} is the
# standard deviation of the predictions
Qt = F.dot(self.R).dot(F.T) + self.vt
sqrt_Qt = np.sqrt(Qt)
# The posterior value of the states \theta_t is
# distributed as a multivariate Gaussian with mean
# m_t and variance-covariance C_t
At = self.R.dot(F.T) / Qt
self.theta = self.theta + At.flatten() * et
self.C = self.R - At * F.dot(self.R)
hedge_quantity = int(floor(self.qty*self.theta[0]))
return et, sqrt_Qt, hedge_quantity
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from Alphas.BasePairsTradingAlphaModel import BasePairsTradingAlphaModel
from scipy.stats import pearsonr
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
import numpy as np
from math import floor
from KalmanFilter import KalmanFilter
from Cointegration import GetCointegrationFromHistory, cointPValue, cadfPValue
class KalmanMeanReversionTrading(BasePairsTradingAlphaModel):
''' This alpha model is designed to rank every pair combination by its cointegration tested via CADF
and trade the pairs with the hightest cointegration
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self, cointegrationLookback = 8,
resolution = Resolution.Daily,
maximumPValue = 0.01,
tradeableUniverseSize = 1,
pvaluefunc = cadfPValue):
'''Initializes a new instance of the KalmanMeanReversionTrading class
Args:
resolution: analysis resolution
maximumPValue: The minimum cointegration to consider a tradable pair
tradeableUniverseSize: the nuber of pairs to be considered for creating insights
'''
super().__init__(resolution = resolution)
self.refreshCointegrationTime = datetime.min
self.candidatesSelectionLookback = cointegrationLookback
self.resolution = resolution
self.maximumPValue = maximumPValue
self.best_pair = ()
self.tradeableUniverseSize = tradeableUniverseSize
self.pvaluefunc = pvaluefunc
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
if not algorithm.IsWarmingUp:
# goes through the list of pairs and creates insights for those
for key, pair in self.pairs.items():
if data.Bars.ContainsKey(str(pair.asset1.ID)) and data.Bars.ContainsKey(str(pair.asset2.ID)):
if data[str(pair.asset1.ID)].Price and data[str(pair.asset2.ID)].Price:
insights.extend(pair.GetKalmanInsightGroup())
activePositions = [i.Symbol for i in algorithm.Portfolio.Values if i.Security.Holdings.Invested]
if activePositions:
algorithm.Debug(str(algorithm.Time)+" Active Positions "+str([i.Value for i in activePositions]))
candidates = list(dict.fromkeys(list(sum(list(list(zip(*self.candidates))[:2]), ()))))
outdatedPositions = [i for i in activePositions if i not in candidates]
if outdatedPositions:
algorithm.Debug(str(algorithm.Time)+" Outdated Assets - Positions will be closed: "+str([i.Value for i in outdatedPositions]))
for i in outdatedPositions:
if data.ContainsKey(str(i.ID)) and data[str(i.ID)]:
if data[str(i.ID)].Price:
insights.extend(Insight.Group(Insight.Price(i, timedelta(1), InsightDirection.Flat)))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
self.candidates = []
for security in changes.AddedSecurities:
self.Securities.append(security)
for security in changes.RemovedSecurities:
if security in self.Securities:
self.Securities.remove(security)
symbols = [ x.Symbol for x in self.Securities ]
history = algorithm.History(symbols, self.candidatesSelectionLookback, self.resolution)
pvalue_list = GetCointegrationFromHistory(symbols, history, self.pvaluefunc)
self.candidates = [x for x in pvalue_list if x[2]<self.maximumPValue][:self.tradeableUniverseSize]
# remove "stealth" duplicates, i.e. symbols that resolve to the same ticker but with different IDs
duplicates = [item for item in self.candidates if item[0].Value == item[1].Value]
for x in duplicates:
self.candidates.remove(x)
# remove multile pairs containing the same symbol. This would otherwise confuse the trading logic, when insights
# are being generated for an asset with an existing position. Keep only the pair with the lowest p-Value
for item in self.candidates:
l = [x for x in self.candidates if x[0]==item[0] or x[1]==item[0]]
l.extend([x for x in self.candidates if x[0]==item[1] or x[1]==item[1]])
multiples = sorted([*set(l)], key=lambda x: x[2])[1:]
for x in multiples:
self.candidates.remove(x)
universestring = ""
for i in self.candidates:
universestring = universestring + i[0].Value + "/" + i[1].Value + " "
algorithm.Debug (str(algorithm.Time)+" Pairs found with p-Value below "+str(self.maximumPValue)+": "+str(len(self.candidates))+": "+universestring)
self.UpdatePairs(algorithm)
for security in changes.RemovedSecurities:
keys = [k for k in self.pairs.keys() if security.Symbol in k]
for key in keys:
self.pairs.pop(key)
def UpdatePairs(self, algorithm):
# adds only the candidate pairs to the list of pairs (the ones that return HasPassedTest as True)
symbols = sorted([x.Symbol for x in self.Securities], key=lambda x: str(x.ID))
for i in range(0, len(symbols)):
asset_i = symbols[i]
for j in range(1 + i, len(symbols)):
asset_j = symbols[j]
pair_symbol = (asset_i, asset_j)
invert = (asset_j, asset_i)
# if the pair is not in our list of candidates to trade (that's what HasPassedTest checks),
# then remove the pair for the overall pairs list and skip the rest of this loop
if not self.HasPassedTest(algorithm, asset_i, asset_j):
# check if the pair is currently in our pairs list (i.e. it was a candidate pair before, but not anymore)
# then remove the pair from our list, but only if we don't have a position open
# (we want the algo to be able to create flat insights on it to close the pair!)
if pair_symbol in self.pairs:
if self.pairs[pair_symbol].state == 0:
self.pairs.pop(pair_symbol)
elif invert in self.pairs:
if self.pairs[invert].state == 0:
self.pairs.pop(invert)
continue
# to get here the pair must have already passed the test, i.e. is in our candidates list.
# if it is already in our list of pairs, don't do anything further.
if pair_symbol in self.pairs or invert in self.pairs:
continue
# this part is only reached for new pairs that have passed the candidates test.
# so we create a new pair instance and add it to our list
# using timedelta(1) as prediction interval seems to work well. This used to be the interval determeined in the base class BasePairsTradingAlphaModel
pair = self.Pair(algorithm, asset_i, asset_j, timedelta(1), self.resolution)
self.pairs[pair_symbol] = pair
def HasPassedTest(self, algorithm, asset1, asset2):
'''Check whether the assets pass a pairs trading test
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
Returns:
True if the pair is in the candidates list'''
# all elements of self.candidates pass the test
pairToTest = (asset1.Value, asset2.Value)
inversePairToTest = (asset2.Value, asset1.Value)
for candidate in self.candidates:
if (candidate[0].Value, candidate[1].Value) == pairToTest or (candidate[0].Value, candidate[1].Value) == inversePairToTest:
return True
return False
class Pair:
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, algorithm, asset1, asset2, predictionInterval, resolution):
'''Create a new pair
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
predictionInterval: Period over which this insight is expected to come to fruition
resolution: analysis resolution
'''
self.kf = KalmanFilter()
self.algo = algorithm
self.resolution = resolution
self.state = self.State.FlatRatio
self.asset1 = asset1
self.asset2 = asset2
self.asset1Price = algorithm.Identity(asset1)
self.asset2Price = algorithm.Identity(asset2)
self.predictionInterval = predictionInterval
def GetKalmanInsightGroup(self):
'''Gets the insights group for the pair
Returns:
Insights grouped by an unique group id'''
forecast_error, prediction_std_dev, hedge_quantity = self.kf.update(self.asset1Price.Current.Price, self.asset2Price.Current.Price)
# Close existing Long/Short positions if the Kalman Filter predicts reversion to the mean
if (self.state is self.State.LongRatio and (forecast_error >= -prediction_std_dev)) \
or (self.state is self.State.ShortRatio and (forecast_error >= -prediction_std_dev)):
self.state = self.State.FlatRatio
flatAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat)
flatAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat)
return Insight.Group(flatAsset1, flatAsset2)
if self.state is self.State.FlatRatio:
# Open Long positions if the Kalman Filter predicts breakout from the mean to the downside
if forecast_error < -prediction_std_dev:
self.state = self.State.LongRatio
shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)
return Insight.Group(shortAsset1, longAsset2)
# Open Short positions if the Kalman Filter predicts breakout from the mean to the upside
elif forecast_error > prediction_std_dev:
self.state = self.State.ShortRatio
longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)
return Insight.Group(longAsset1, shortAsset2)
return []#region imports
from AlgorithmImports import *
#endregion
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
from datetime import datetime, timedelta
from pytz import utc
UTCMIN = datetime.min.replace(tzinfo=utc)
#endregion
class EqualWeightedPairsTradingPortfolio(PortfolioConstructionModel):
def __init__(self, percentage = 1.0):
self.insightCollection = InsightCollection()
self.removedSymbols = []
self.percentage = percentage
def CreateTargets(self, algorithm, insights):
targets = []
#Get expiredInsights
expiredInsights = self.insightCollection.RemoveExpiredInsights(algorithm.UtcTime)
#rebalance logic
if self.ShouldCreateTargets(insights, expiredInsights) == False:
return targets
# here we get the new insights and add them to our insight collection
for insight in insights:
self.insightCollection.Add(insight)
# create flatten target for each security that was removed from the universe
if len(self.removedSymbols) > 0:
#check if the tickers is in invested, otherwise, do nothing
universeDeselectionTargets = [PortfolioTarget(symbol, 0) for symbol in self.removedSymbols if algorithm.Portfolio[symbol].Invested]
self.removedSymbols = []
pop_insights = []
#If we have something in the universeDeselectionTargets
if universeDeselectionTargets:
#loop over the targets
for target in universeDeselectionTargets:
#If the symbol is in our insightCollection then we have to remove that insight
if self.insightCollection.ContainsKey(target.Symbol):
#Get a list of the insights (there maybe more than 1)
insights_list = self.insightCollection[target.Symbol]
#loop over the insights
for insight in insights_list:
#loop over the insights in insightcollection
for insightCollection in self.insightCollection:
#if the insights have been send together (.GroupId), we liquidate both stocks and send the insights to a list, to remove those insights from the collection
if insight.GroupId == insightCollection.GroupId:
targets.extend([PortfolioTarget(insight.Symbol, 0)] + [PortfolioTarget(insightCollection.Symbol, 0)])
pop_insights.extend([insight] + [insightCollection])
for insight in pop_insights:
self.insightCollection.Remove(insight)
#loop over the insights. If the symbol does NOT have an active insight, we can liquidate this stock
for symbol, f in groupby(expiredInsights, lambda x: x.Symbol):
if not self.insightCollection.HasActiveInsights(symbol, algorithm.UtcTime):
targets.append(PortfolioTarget(symbol, 0))
# get insight that have not expired of each symbol that is still in the universe
activeInsights = self.insightCollection.GetActiveInsights(algorithm.UtcTime)
#sort by the most recent insight generated, so it is only the first insights being generated that is being used
lastActiveInsights = sorted(activeInsights, key= lambda x: x.GeneratedTimeUtc, reverse=True)
#get the len of the active insights, and loop over the insights
pairs = {}
for i in range(len(lastActiveInsights)):
for ii in range(i+1, len(lastActiveInsights)):
#get the insights
insight_i = lastActiveInsights[i]
insight_ii = lastActiveInsights[ii]
#get the pairs
pairs_symbol = (insight_i.Symbol, insight_ii.Symbol)
invert = (insight_ii.Symbol, insight_i.Symbol)
#if the stocks is already in the pairs, continue
if pairs_symbol in pairs or invert in pairs:
continue
#If the insights is of the same groupId, we know that these belong together, so we append to the pairs
if insight_i.GroupId == insight_ii.GroupId:
pairs[(pairs_symbol)] = [insight_i.Direction, insight_ii.Direction]
#Here, we calculated the score of the insights
calculatedTargets = {}
for key, value in pairs.items():
for insight, direction in zip(key, value):
if insight not in calculatedTargets:
calculatedTargets[insight] = direction
else:
calculatedTargets[insight] += direction
# determine target percent for the given insights
# weightFactor = self.percentage
weightFactor = 1.0
weightSums = sum(abs(direction) for symbol, direction in calculatedTargets.items())
if weightSums > 1:
# CHECK THIS:
weightFactor = 1 / weightSums
#Send the portfolio targets out, with the correct allocation percent, and append to the targets
for symbol, weight in calculatedTargets.items():
allocationPercent = weight * weightFactor * self.percentage
target = PortfolioTarget.Percent(algorithm, symbol, allocationPercent)
targets.append(target)
# targetsymbols = [i.Symbol.Value for i in targets]
# algorithm.Debug(str(algorithm.Time)+" Targets are "+str(targetsymbols))
# try:
return targets
# except:
# algorithm.Debug(str(algorithm.Time)+" EXCEPTION WHEN RETURNING TARGETS")
# return []
def OnSecuritiesChanged(self, algorithm, changes):
#Get the removed symbols
newRemovedSymbols = [x.Symbol for x in changes.RemovedSecurities if x.Symbol not in self.removedSymbols]
# get removed symbol and invalidate them in the insight collection
self.removedSymbols.extend(newRemovedSymbols)
#remove insights that have not been invested in anymore
not_invested_symbols = [symbol for symbol in self.removedSymbols if not algorithm.Portfolio[symbol].Invested]
self.insightCollection.Clear(not_invested_symbols)
def ShouldCreateTargets(self, insights, expiredInsights):
if len(insights) == 0 and len(self.removedSymbols) == 0 and len(expiredInsights) == 0:
return False
else:
return True
class EqualWeightingPortfolioConstructionModelPercentage(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.Up, long targets are returned and
for insights of direction InsightDirection.Down, short targets are returned.'''
def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort, percentage = 1.0):
'''Initialize a new instance of EqualWeightingPortfolioConstructionModel
Args:
rebalance: Rebalancing parameter. If it is a timedelta, date rules or Resolution, it will be converted into a function.
If None will be ignored.
The function returns the next expected rebalance time for a given algorithm UTC DateTime.
The function returns null if unknown, in which case the function will be called again in the
next loop. Returning current time will trigger rebalance.
portfolioBias: Specifies the bias of the portfolio (Short, Long/Short, Long)'''
super().__init__()
self.portfolioBias = portfolioBias
self.percentage = percentage
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancingFunc
rebalancingFunc = rebalance
if isinstance(rebalance, int):
rebalance = Extensions.ToTimeSpan(rebalance)
if isinstance(rebalance, timedelta):
rebalancingFunc = lambda dt: dt + rebalance
if rebalancingFunc:
self.SetRebalancingFunc(rebalancingFunc)
def DetermineTargetPercent(self, activeInsights):
'''Will determine the target percent for each insight
Args:
activeInsights: The active insights to generate a target for'''
result = {}
# give equal weighting to each security
count = sum(x.Direction != InsightDirection.Flat and self.RespectPortfolioBias(x) for x in activeInsights)
percent = 0 if count == 0 else self.percentage / count
for insight in activeInsights:
result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat) * percent
return result
def RespectPortfolioBias(self, insight):
'''Method that will determine if a given insight respects the portfolio bias
Args:
insight: The insight to create a target for
'''
return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias#region imports
from AlgorithmImports import *
#endregion
from QuantConnect.Algorithm.Framework.Risk import RiskManagementModel
# bracket risk model class
class BracketRiskModel(RiskManagementModel):
'''Creates a trailing stop loss for the maximumDrawdownPercent value and a profit taker for the maximumUnrealizedProfitPercent value'''
def __init__(self, maximumDrawdownPercent = 0.05, maximumUnrealizedProfitPercent = 0.05):
self.maximumDrawdownPercent = -abs(maximumDrawdownPercent)
self.trailingHighs = dict()
self.maximumUnrealizedProfitPercent = abs(maximumUnrealizedProfitPercent)
def ManageRisk(self, algorithm, targets):
riskAdjustedTargets = list()
for kvp in algorithm.Securities:
symbol = kvp.Key
security = kvp.Value
# Remove if not invested
if not security.Invested:
self.trailingHighs.pop(symbol, None)
continue
pnl = security.Holdings.UnrealizedProfitPercent
if pnl > self.maximumUnrealizedProfitPercent:
# liquidate
algorithm.Debug(f"Profit Taken: {security.Symbol}")
algorithm.Log(f"Profit Taken: {security.Symbol}")
riskAdjustedTargets.append(PortfolioTarget(security.Symbol, 0))
return riskAdjustedTargets
# Add newly invested securities
if symbol not in self.trailingHighs:
self.trailingHighs[symbol] = security.Holdings.AveragePrice # Set to average holding cost
continue
# Check for new highs and update - set to tradebar high
if self.trailingHighs[symbol] < security.High:
self.trailingHighs[symbol] = security.High
continue
# Check for securities past the drawdown limit
securityHigh = self.trailingHighs[symbol]
drawdown = (security.Low / securityHigh) - 1
if drawdown < self.maximumDrawdownPercent:
# liquidate
algorithm.Debug(f"Losses Taken: {security.Symbol}")
algorithm.Log(f"Losses Taken: {security.Symbol}")
riskAdjustedTargets.append(PortfolioTarget(symbol, 0))
return riskAdjustedTargets#region imports
from AlgorithmImports import *
from QuantConnect.Data.UniverseSelection import *
from Selection.FundamentalUniverseSelectionModel import FundamentalUniverseSelectionModel
from statsmodels.tsa.stattools import coint
import statistics
import operator
from Cointegration import GetCointegrationFromHistory, cointPValue, cadfPValue
class StaticCategorySelection(FundamentalUniverseSelectionModel):
class Category:
Sectors = {
"FinancialServices": MorningstarSectorCode.FinancialServices,
"RealEstate": MorningstarSectorCode.RealEstate,
"Healthcare": MorningstarSectorCode.Healthcare,
"Utilities": MorningstarSectorCode.Utilities,
"Technology": MorningstarSectorCode.Technology,
"BasicMaterials": MorningstarSectorCode.BasicMaterials,
"ConsumerCyclical": MorningstarSectorCode.ConsumerCyclical,
"ConsumerDefensive": MorningstarSectorCode.ConsumerDefensive,
"CommunicationServices": MorningstarSectorCode.CommunicationServices,
"Energy": MorningstarSectorCode.Energy,
"Industrials": MorningstarSectorCode.Industrials
}
Industries = {
"Agriculture": MorningstarIndustryGroupCode.Agriculture,
"BuildingMaterials": MorningstarIndustryGroupCode.BuildingMaterials,
"Chemicals": MorningstarIndustryGroupCode.Chemicals,
"ForestProducts": MorningstarIndustryGroupCode.ForestProducts,
"MetalsAndMining": MorningstarIndustryGroupCode.MetalsAndMining,
"Steel": MorningstarIndustryGroupCode.Steel,
"VehiclesAndParts": MorningstarIndustryGroupCode.VehiclesAndParts,
"Furnishings": MorningstarIndustryGroupCode.Furnishings,
"FixturesAndAppliances": MorningstarIndustryGroupCode.FixturesAndAppliances,
"HomebuildingAndConstruction": MorningstarIndustryGroupCode.HomebuildingAndConstruction,
"ManufacturingApparelAndAccessories": MorningstarIndustryGroupCode.ManufacturingApparelAndAccessories,
"PackagingAndContainers": MorningstarIndustryGroupCode.PackagingAndContainers,
"PersonalServices": MorningstarIndustryGroupCode.PersonalServices,
"Restaurants": MorningstarIndustryGroupCode.Restaurants,
"RetailCyclical": MorningstarIndustryGroupCode.RetailCyclical,
"TravelAndLeisure": MorningstarIndustryGroupCode.TravelAndLeisure,
"AssetManagement": MorningstarIndustryGroupCode.AssetManagement,
"Banks": MorningstarIndustryGroupCode.Banks,
"CapitalMarkets": MorningstarIndustryGroupCode.CapitalMarkets,
"Insurance": MorningstarIndustryGroupCode.Insurance,
"DiversifiedFinancialServices": MorningstarIndustryGroupCode.DiversifiedFinancialServices,
"CreditServices": MorningstarIndustryGroupCode.CreditServices,
"RealEstateIndustry": MorningstarIndustryGroupCode.RealEstate,
"REITs": MorningstarIndustryGroupCode.REITs,
"BeveragesAlcoholic": MorningstarIndustryGroupCode.BeveragesAlcoholic,
"BeveragesNonAlcoholic": MorningstarIndustryGroupCode.BeveragesNonAlcoholic,
"ConsumerPackagedGoods": MorningstarIndustryGroupCode.ConsumerPackagedGoods,
"Education": MorningstarIndustryGroupCode.Education,
"RetailDefensive": MorningstarIndustryGroupCode.RetailDefensive,
"TobaccoProducts": MorningstarIndustryGroupCode.TobaccoProducts,
"Biotechnology": MorningstarIndustryGroupCode.Biotechnology,
"DrugManufacturers": MorningstarIndustryGroupCode.DrugManufacturers,
"HealthcarePlans": MorningstarIndustryGroupCode.HealthcarePlans,
"HealthcareProvidersAndServices": MorningstarIndustryGroupCode.HealthcareProvidersAndServices,
"MedicalDevicesAndInstruments": MorningstarIndustryGroupCode.MedicalDevicesAndInstruments,
"MedicalDiagnosticsAndResearch": MorningstarIndustryGroupCode.MedicalDiagnosticsAndResearch,
"MedicalDistribution": MorningstarIndustryGroupCode.MedicalDistribution,
"UtilitiesIndependentPowerProducers": MorningstarIndustryGroupCode.UtilitiesIndependentPowerProducers,
"UtilitiesRegulated": MorningstarIndustryGroupCode.UtilitiesRegulated,
"TelecommunicationServices": MorningstarIndustryGroupCode.TelecommunicationServices,
"MediaDiversified": MorningstarIndustryGroupCode.MediaDiversified,
"InteractiveMedia": MorningstarIndustryGroupCode.InteractiveMedia,
"OilAndGas": MorningstarIndustryGroupCode.OilAndGas,
"OtherEnergySources": MorningstarIndustryGroupCode.OtherEnergySources,
"AerospaceAndDefense": MorningstarIndustryGroupCode.AerospaceAndDefense,
"BusinessServices": MorningstarIndustryGroupCode.BusinessServices,
"Conglomerates": MorningstarIndustryGroupCode.Conglomerates,
"Construction": MorningstarIndustryGroupCode.Construction,
"FarmAndHeavyConstructionMachinery": MorningstarIndustryGroupCode.FarmAndHeavyConstructionMachinery,
"IndustrialDistribution": MorningstarIndustryGroupCode.IndustrialDistribution,
"IndustrialProducts": MorningstarIndustryGroupCode.IndustrialProducts,
"Transportation": MorningstarIndustryGroupCode.Transportation,
"WasteManagement": MorningstarIndustryGroupCode.WasteManagement,
"Software": MorningstarIndustryGroupCode.Software,
"Hardware": MorningstarIndustryGroupCode.Hardware,
"Semiconductors": MorningstarIndustryGroupCode.Semiconductors
}
def __init__(self, rebalanceFunc, numCoarse, minmarketcap, minPrice, minVolume, category):
super().__init__(filterFineData = True, universeSettings = None)
self.rebalanceFunc = rebalanceFunc
self.numCoarse = numCoarse
self.minmarketcap = minmarketcap
self.minPrice = minPrice
self.minVolume = minVolume
self.category = category
self.nextRebalance = None
def SelectCoarse(self, algorithm, coarse):
#rebalance function
if self.nextRebalance is not None and algorithm.Time < self.nextRebalance:
return Universe.Unchanged
self.nextRebalance = self.rebalanceFunc(algorithm.Time)
selected = sorted([x for x in coarse if x.HasFundamentalData and x.Price > self.minPrice and x.DollarVolume > self.minVolume], key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in selected[:self.numCoarse]]
def SelectFine(self, algorithm, fine):
sorted_market_cap = [x for x in fine if x.MarketCap > self.minmarketcap]
if self.category != "all":
# self.category is a string with a key for either self.sectors or self.industries
# the keys in the two dicts are non overlapping (after renaming 'RealEstate' to 'RealEstateIndustry' in the latter)
# so that we can just switch between these two classifications by checking which dict self.category belongs to
if self.category in self.Category.Sectors:
selectedSymbols = [x.Symbol for x in sorted_market_cap if x.AssetClassification.MorningstarSectorCode == self.Category.Sectors[self.category]]
elif self.category in self.Category.Industries:
selectedSymbols = [x.Symbol for x in sorted_market_cap if x.AssetClassification.MorningstarIndustryGroupCode == self.Category.Industries[self.category]]
else:
selectedSymbols = [x.Symbol for x in sorted_market_cap]
# if for whaetver reason the universe only contains 1 symbol, return none as we can't form any pairs
if len(selectedSymbols) < 2:
selectedSymbols = []
algorithm.Debug(str(algorithm.Time)+" Rebalance Done | Static Category: "+self.category+" | Number of symbols: "+str(len(selectedSymbols))+" | Next Rebalance at "+str(self.nextRebalance.date()))
return selectedSymbols
class DynamicCategorySelection(StaticCategorySelection):
def __init__(self, rebalanceFunc, numCoarse, lookback, resolution, minmarketcap, minPrice, minVolume, category, categories, pvaluefunc, percentile):
super().__init__(rebalanceFunc, numCoarse, minmarketcap, minPrice, minVolume, category)
self.lookback = lookback
self.resolution = resolution
self.categories = categories
self.pvaluefunc = pvaluefunc
self.pecentile = percentile
self.nextRebalance = None
def SelectFine(self, algorithm, fine):
sorted_market_cap = [x for x in fine if x.MarketCap > self.minmarketcap]
industry_pvalue = {}
if not self.categories:
return [x.Symbol for x in sorted_market_cap]
for industry in list(self.categories.values()):
if self.categories == self.Category.Sectors:
industry_list_symbol = [x.Symbol for x in sorted_market_cap if x.AssetClassification.MorningstarSectorCode == industry]
else:
industry_list_symbol = [x.Symbol for x in sorted_market_cap if x.AssetClassification.MorningstarIndustryGroupCode == industry]
history = algorithm.History(industry_list_symbol, self.lookback, self.resolution)
pvalue_list = [x[2] for x in GetCointegrationFromHistory(industry_list_symbol, history, self.pvaluefunc)]
if pvalue_list:
mean = statistics.fmean(pvalue_list[:math.ceil(len(pvalue_list)*self.pecentile/100)])
industry_pvalue[industry] = mean
if not industry_pvalue:
return []
top = min(industry_pvalue, key=industry_pvalue.get)
selectedSymbols = [x.Symbol for x in fine if x.AssetClassification.MorningstarIndustryGroupCode == top]
# if for whaetver reason the universe only contains 1 symbol, return none as we can't form any pairs
if len(selectedSymbols) < 2:
selectedSymbols = []
industryname = str(list(self.categories.keys())[list(self.categories.values()).index(top)])
algorithm.Debug(str(algorithm.Time)+" Rebalance Done | Best Category: "+industryname+" | Industry p-Value: "+str(industry_pvalue[top])+" | Number of symbols: "+str(len(selectedSymbols))+" | Next Rebalance at "+str(self.nextRebalance.date()))
return selectedSymbols# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from Alphas.BasePairsTradingAlphaModel import BasePairsTradingAlphaModel
from scipy.stats import pearsonr
import statsmodels.api as sm
import statsmodels.tsa.stattools as ts
from KalmanMeanReversionAlphaModel import KalmanMeanReversionTrading
import numpy as np
from math import floor
from KalmanFilter import KalmanFilter
from Cointegration import GetCointegrationFromHistory, cointPValue, cadfPValue
class ZScoreMeanReversionTrading(KalmanMeanReversionTrading):
''' This alpha model is designed to rank every pair combination by its cointegration tested via CADF
and trade the pairs with the hightest cointegration
This model generates alternating long ratio/short ratio insights emitted as a group'''
def __init__(self, cointegrationLookback = 8,
resolution = Resolution.Daily,
maximumPValue = 0.01,
tradeableUniverseSize = 1,
pvaluefunc = cadfPValue,
lookback = 475,
threshold = 1,
closingThreshold = 0.0,
invalidationThreshold = 10):
'''Initializes a new instance of the ZScoreMeanReversionTrading class
Args:
resolution: analysis resolution
maximumPValue: The minimum cointegration to consider a tradable pair
tradeableUniverseSize: the nuber of pairs to be considered for creating insights
'''
super().__init__(resolution = resolution)
self.refreshCointegrationTime = datetime.min
self.candidatesSelectionLookback = cointegrationLookback
self.resolution = resolution
self.maximumPValue = maximumPValue
self.best_pair = ()
self.tradeableUniverseSize = tradeableUniverseSize
self.pvaluefunc = pvaluefunc
self.lookback = lookback
self.threshold = threshold
self.closingThreshold = closingThreshold
self.invalidationThreshold = invalidationThreshold
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
insights = []
if not algorithm.IsWarmingUp:
# goes through the list of pairs and creates insights for those
for key, pair in self.pairs.items():
if data.Bars.ContainsKey(str(pair.asset1.ID)) and data.Bars.ContainsKey(str(pair.asset2.ID)):
if data[str(pair.asset1.ID)].Price and data[str(pair.asset2.ID)].Price:
insights.extend(pair.GetZScoreInsightGroup())
activePositions = [i.Symbol for i in algorithm.Portfolio.Values if i.Security.Holdings.Invested]
if activePositions:
algorithm.Debug(str(algorithm.Time)+" Active Positions "+str([i.Value for i in activePositions]))
candidates = list(dict.fromkeys(list(sum(list(list(zip(*self.candidates))[:2]), ()))))
outdatedPositions = [i for i in activePositions if i not in candidates]
if outdatedPositions:
algorithm.Debug(str(algorithm.Time)+" Outdated Assets - Positions will be closed: "+str([i.Value for i in outdatedPositions]))
for i in outdatedPositions:
if data.ContainsKey(str(i.ID)) and data[str(i.ID)]:
if data[str(i.ID)].Price:
insights.extend(Insight.Group(Insight.Price(i, timedelta(1), InsightDirection.Flat)))
return insights
def OnSecuritiesChanged(self, algorithm, changes):
super().OnSecuritiesChanged(algorithm, changes)
def UpdatePairs(self, algorithm):
# adds only the candidate pairs to the list of pairs (the ones that return HasPassedTest as True)
symbols = sorted([x.Symbol for x in self.Securities], key=lambda x: str(x.ID))
for i in range(0, len(symbols)):
asset_i = symbols[i]
for j in range(1 + i, len(symbols)):
asset_j = symbols[j]
pair_symbol = (asset_i, asset_j)
invert = (asset_j, asset_i)
# if the pair is not in our list of candidates to trade (that's what HasPassedTest checks),
# then remove the pair for the overall pairs list and skip the rest of this loop
if not self.HasPassedTest(algorithm, asset_i, asset_j):
# check if the pair is currently in our pairs list (i.e. it was a candidate pair before, but not anymore)
# then remove the pair from opur list, but only if we don't have a position open
# (we want the algo to be able to create flat insights on it to close the pair!)
if pair_symbol in self.pairs:
if self.pairs[pair_symbol].state == 0:
self.pairs.pop(pair_symbol)
elif invert in self.pairs:
if self.pairs[invert].state == 0:
self.pairs.pop(invert)
continue
# to get here the pair must have already passed the test, i.e. is in our candidates list.
# if it is already in our list of pairs, don't do anything further.
if pair_symbol in self.pairs or invert in self.pairs:
continue
# this part is only reached for new pairs that have passed the candidates test.
# so we create a new pair instance and add it to our list
# using timedelta(1) as prediction interval seems to work well. This used to be the interval determeined in the base class BasePairsTradingAlphaModel
pair = self.Pair(algorithm, asset_i, asset_j, timedelta(1), self.resolution, self.lookback, self.threshold, self.closingThreshold, self.invalidationThreshold)
self.pairs[pair_symbol] = pair
class Pair:
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, algorithm, asset1, asset2, predictionInterval, resolution, lookback, threshold, closingThreshold, invalidationThreshold):
'''Create a new pair
Args:
algorithm: The algorithm instance that experienced the change in securities
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
predictionInterval: Period over which this insight is expected to come to fruition
resolution: analysis resolution
'''
self.algo = algorithm
self.resolution = resolution
self.lookback = lookback
self.threshold = threshold
self.closingThreshold = closingThreshold
self.invalidationThreshold = invalidationThreshold
self.state = self.State.FlatRatio
self.asset1 = asset1
self.asset2 = asset2
self.asset1Price = algorithm.Identity(asset1)
self.asset2Price = algorithm.Identity(asset2)
# Indicators needed for z-Score:
self.ratio = IndicatorExtensions.Over(self.asset2Price, self.asset1Price)
self.mean = IndicatorExtensions.Of(SimpleMovingAverage(self.lookback), self.ratio)
self.std = IndicatorExtensions.Of(StandardDeviation(self.lookback), self.ratio)
self.residual = IndicatorExtensions.Minus(self.ratio, self.mean)
self.zscore = IndicatorExtensions.Over(self.residual,self.std)
self.predictionInterval = predictionInterval
def GetZScoreInsightGroup(self):
'''Gets the insights group for the pair
Returns:
Insights grouped by an unique group id'''
# close positions if ratio reverts to mean or exceeds safe area
if (self.state is self.State.LongRatio and (self.zscore.Current.Value <= self.closingThreshold or self.zscore.Current.Value >= self.invalidationThreshold)) \
or (self.state is self.State.ShortRatio and (self.zscore.Current.Value >= -self.closingThreshold or self.zscore.Current.Value <= -self.invalidationThreshold)):
self.state = self.State.FlatRatio
flatAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Flat)
flatAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Flat)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(flatAsset1, flatAsset2)
# don't re-emit the same direction
if self.state is not self.State.LongRatio and self.zscore.Current.Value > self.threshold:
self.state = self.State.LongRatio
# asset1/asset2 is more than 1 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Up)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)
# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.zscore.Current.Value < -self.threshold:
self.state = self.State.ShortRatio
# asset1/asset2 is less than 1 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, self.predictionInterval, InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, self.predictionInterval, InsightDirection.Down)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)
return []# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2014 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from AlgorithmImports import *
from KalmanMeanReversionAlphaModel import KalmanMeanReversionTrading
from ZScoreMeanReversionAlphaModel import ZScoreMeanReversionTrading
from PortfolioConstruction import EqualWeightedPairsTradingPortfolio
from Cointegration import GetCointegrationFromHistory, cointPValue, cadfPValue
from UniverseSelection import StaticCategorySelection, DynamicCategorySelection
from statsmodels.tsa.stattools import coint
import statistics
import operator
import numpy as np
import math
### <summary>
### Framework algorithm that uses MeanReversionTrading.
### This model extendes BasePairsTradingAlphaModel and uses Cointegration
### to rank the pairs trading candidates and use the n best candidates to trade.
### </summary>
class CointegrationPairsTradingAlphaModelFrameworkAlgorithm(QCAlgorithm):
class Timeframe(Enum):
Minute = 0
Hour = 1
Day = 2
Week = 3
Month = 4
Quarter = 5
class Categorisation(Enum):
Sectors = 0
Industries = 1
class AlgoType:
Pearson = 0
ZScore = 1
Kalman = 2
class UniverseSelectionType:
Static = 0
Dynamic = 1
def Initialize(self):
self.candidates = []
self.SetStartDate(2010, 1, 1)
self.SetEndDate(2020, 1, 1)
self.SetCash(10000)
# used to iterate through all possible sectors or industry groups in an optimiser
# self.categoryIndex = self.GetParameter("categoryIndex", 10)
# self.category = list(self.sectors)[self.categoryIndex]
# self.category = list(self.industries)[self.categoryIndex]
# all other parameters as external parameters - uncomment selectively
# (and comment the corresponding declaration further below) for optimiser runs
# Overall parameters:
# self.resolution = self.GetResolution(self.GetParameter("resolution", self.GetResolution(self.Timeframe.Hour)))
# self.universeResolution = self.GetResolution(self.GetParameter("universeResolution", self.GetResolution(self.Timeframe.Hour)))
# self.refreshCycle = self.GetParameter("refreshCycle", self.Timeframe.Week)
# self.algoType = self.GetParameter("algoType", self.AlgoType.ZScore)
# self.selectionType = self.GetParameter("selectionType", self.UniverseSelectionType.Dynamic)
# self.capitalAllocation = self.GetParameter("capitalAllocation", 0.95)
# Fundamental Data selection parameters:
# self.minmarketcap = self.GetParameter("minmarketcap", 500000000)
# self.numCoarse = self.GetParameter("numCoarse", 100)
# self.minPrice = self.GetParameter("minPrice", 5)
# self.minVolume = self.GetParameter("minVolume", 0)
# used for Kalman and z-Score AlphaModule AND in Dynamic Universe Selection:
# pvaluefunc = self.GetParameter("pvaluefunc") or "cointPValue"
# if pvaluefunc == "cadfPValue":
# self.pvaluefunc = cadfPValue
# else:
# self.pvaluefunc = cointPValue
# used only for Static Universe Selection
# self.category = self.GetParameter("category") or "MedicalDiagnosticsAndResearch"
# used only for Dynamic Universe Selection
# self.categorisation = self.GetParameter("categorisation", self.Categorisation.Industries)
# self.categorySelectionLookback = self.GetParameter("categorySelectionLookback", 500)
# self.percentile = self.GetParameter("percentile", 5)
# Pearson Correlation only:
# self.minimumCorrelation = self.GetParameter("minimumCorrelation", 0.5)
# z-Score trading logic only:
# self.closingThreshold = self.GetParameter("closingThreshold", 0.0)
# self.invalidationThreshold = self.GetParameter("invalidationThreshold", 10)
# Pearson and z-Score trading logic:
# self.threshold = self.GetParameter("threshold", 1.2)
# self.TradingLogicLookback = self.GetParameter("TradingLogicLookback", 520)
# Kalman and z-Score trading logic:
# self.candidatesSelectionLookback = self.GetParameter("candidatesSelectionLookback", 1200)
# self.tradeableUniverseSize = self.GetParameter("tradeableUniverseSize", 1)
# self.maximumPValue = self.GetParameter("maximumPValue", 0.01)
# BEGIN ---------------------- all parameters here (comment here and uncomment external parameter declarations for optimisers)
# Overall parameters:
self.resolution = self.GetResolution(self.Timeframe.Hour) # resolution of bars for trading and lookbacks
self.universeResolution = self.GetResolution(self.Timeframe.Hour) # Universe resolution can be set differently from the resolution used in the AlphaModule
self.refreshCycle = self.Timeframe.Week # the cycle for refreshing the universe and determining which industry/sector to trade in
self.algoType = self.AlgoType.ZScore # AlphaModule to use: Pearson, ZScore or Kalman
self.selectionType = self.UniverseSelectionType.Dynamic # Universe Selection Static or Dynamic (refreshed periodically)
self.capitalAllocation = 0.95 # percentage of available cash to be invested
# Fundamental Data selection parameters:
self.minmarketcap = 500000000 # the minimum market cap for a symbol to be included in the universe
self.numCoarse = 100 # the top x symbols by USD Vollum to include in the universe
self.minPrice = 5 # minimim stock price (to eliminate penny stocks)
self.minVolume = 0 # minimum trading volume to eliminate illiquid markets
# used for Kalman and z-Score AlphaModule AND in Dynamic Universe Selection:
self.pvaluefunc = cadfPValue # the calculation method to determine the degree of cointegrated-ness cointPValue or cadfPValue
# used only for Static Universe Selection
self.category = "MedicalDiagnosticsAndResearch" # Morning Star Category (either a sector or an indusyty group), only applicable for static universe selection
# used only for Dynamic Universe Selection
self.categorisation = self.Categorisation.Industries # use Morning Star Sectors or Industry Groups as categorisation for the universe
self.categorySelectionLookback = 500 # lookback (in bars) used in the cointegration test to determine the industry/sector
self.percentile = 5 # the top x% of pairs ordered by p-Value to take the mean of in determining how cointegrated an industry/sector is
# Pearson Correlation only:
self.minimumCorrelation = 0.5 # the minimum correaltion for the Pearson Correlation algo to consider a pair for trading
# z-Score trading logic only:
self.closingThreshold = 0.0 # threshold for closing an open poition (+/- around zero)
self.invalidationThreshold = 10 # how many standard deviations away from the mean until we have to assume that one stock just runs away
# Pearson and z-Score trading logic:
self.threshold = 1.2 # threshold to trigger a long/short position (standard deviations breakout from the ratio's mean)
self.TradingLogicLookback = 1200 # lookback for ratio within the trading logic
# Kalman and z-Score trading logic:
self.candidatesSelectionLookback = 1200 # lookback (in bars) used in the cointegration test to determine the actual pairs to consider for trading
self.tradeableUniverseSize = 1 # maximum number of pairs to consider for trading at any one time
self.maximumPValue = 0.01 # maximi=um p-Value in the cointegration test for a pair to be considered a candidate for trading
# END ---------------------- all parameters
# BEGIN Framework options ----------------------------------------------------
# Universe selection methods: Dynamic or Static
if self.selectionType == self.UniverseSelectionType.Dynamic:
if self.categorisation == self.Categorisation.Sectors:
categories = DynamicCategorySelection.Category.Sectors
elif self.categorisation == self.Categorisation.Industries:
categories = DynamicCategorySelection.Category.Industries
self.AddUniverseSelection(DynamicCategorySelection(self.GetRebalanceFunc(self.refreshCycle),
self.numCoarse,
self.categorySelectionLookback,
self.resolution,
self.minmarketcap,
self.minPrice,
self.minVolume,
self.category,
categories,
self.pvaluefunc,
self.percentile))
else:
self.AddUniverseSelection(StaticCategorySelection(self.GetRebalanceFunc(self.refreshCycle),
self.numCoarse,
self.minmarketcap,
self.minPrice,
self.minVolume,
self.category))
# ----------------------------------------------------------------------------
# Alpha Models: Pearson, Kalman Filter or Z-Score based trading logic
if self.algoType == self.AlgoType.Kalman:
self.SetAlpha(KalmanMeanReversionTrading(self.candidatesSelectionLookback,
self.resolution,
self.maximumPValue,
self.tradeableUniverseSize,
self.pvaluefunc))
elif self.algoType == self.AlgoType.ZScore:
self.SetAlpha(ZScoreMeanReversionTrading(self.candidatesSelectionLookback,
self.resolution,
self.maximumPValue,
self.tradeableUniverseSize,
self.pvaluefunc,
self.TradingLogicLookback,
self.threshold,
self.closingThreshold,
self.invalidationThreshold))
else:
self.SetAlpha(PearsonCorrelationPairsTradingAlphaModel(self.TradingLogicLookback,
self.resolution,
self.threshold,
self.minimumCorrelation))
# ----------------------------------------------------------------------------
# PCM
self.SetPortfolioConstruction(EqualWeightedPairsTradingPortfolio(self.capitalAllocation/self.tradeableUniverseSize))
# ----------------------------------------------------------------------------
# Risk Model
self.SetRiskManagement(NullRiskManagementModel())
# END Framework options ----------------------------------------------------
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.UniverseSettings.Resolution = self.universeResolution
self.refreshUniverseTime = datetime.min
self.SetExecution(ImmediateExecutionModel())
self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))
def GetResolution(self, resolution):
if resolution == self.Timeframe.Minute:
return Resolution.Minute
elif resolution == self.Timeframe.Hour:
return Resolution.Hour
else:
return Resolution.Daily
def GetRebalanceFunc(self, refreshCycle):
if refreshCycle == self.Timeframe.Week:
return Expiry.EndOfWeek
elif refreshCycle == self.Timeframe.Month:
return Expiry.EndOfMonth
elif refreshCycle == self.Timeframe.Quarter:
return Expiry.EndOfQuarter
else:
return Expiry.EndOfDay