| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 |
from scipy.optimize import leastsq
from statsmodels.tsa import stattools, ar_model
import pandas as pd
from datetime import timedelta
from enum import Enum
import logging
log = logging.getLogger()
class Weights:
def __init__(self, names):
self.values = {n: 1/len(names) for n in names}
def set(self, **kwargs):
for n, v in kwargs.items():
if n in self.values:
self.values[n] = v
# scale weights to one
s = np.sum(self.values.values())
self.values = {n: v/s for n, v in self.values.items()}
def get(self, n):
if n in self.values:
return self.values[n]
def estimate(self, df):
pass
def calculate_cointegration_weights(arr, p_thres=0.05):
"""calculate and tests cointegration of two symbols
Args:
arr (np.ndarray): of size (T, 2)
Returns:
weights (float): normed weights for cointegration portfolio
"""
if arr.shape[1] != 2:
raise Exception("Second dimension must be of size 2")
loss = lambda b: arr[:, 0] - b*arr[:, 1]
beta, _ = leastsq(loss, 0)
adres = stattools.adfuller(resi, regresults=True, maxlag=1)
if adres[1] > p_thres:
raise ValueError("p-value %s ad fuller is above threshold of (%s)"%(adres[1], p_thres))
return np.array([1, -beta])/(1+beta)
class CointegrationAlphaModel(AlphaModel):
"""use cointegration estimated from historical prices"""
class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1
def __init__(self, asset1, asset2, lookback, weights, threshold=1):
"""Initializes a new default instance of the CointegrationAlphaModel class.
Args:
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
lookback(int): Historical return lookback period
"""
self.asset1 = asset1
self.asset2 = asset2
self.asset1Price = RollingWindow[IndicatorDataPoint](lookback)
self.asset2Price = RollingWindow[IndicatorDataPoint](lookback)
self.weights = weights
self.state = self.State.FlatRatio
self.ratio = None
self.mean = None
self.threshold = threshold
self.upperThreshold = None;
self.lowerThreshold = None;
def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
if self.mean is None or not self.mean.IsReady:
return []
# don't re-emit the same direction
if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
self.state = self.State.LongRatio
# asset1/asset2 is more than 2 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Up)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)
# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
self.state = self.State.ShortRatio
# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Down)
# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)
return []
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
for added in changes.AddedSecurities:
# this model is limitted to looking at a single pair of assets
if added.Symbol != self.asset1 and added.Symbol != self.asset2:
continue
if added.Symbol == self.asset1:
self.asset1Price.Add(algorithm.Identity(added.Symbol))
else:
self.asset2Price.Add(algorithm.Identity(added.Symbol))
if self.ratio is None:
# initialize indicators dependent on both assets
if self.asset1Price.IsReady() and self.asset2Price.IsReady():
self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)
upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + self.threshold / 100)
self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)
lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - self.threshold / 100)
self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)import numpy as np
class WeightedPortfolioConstructionModel(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.Up, long targets are returned and
for insights of direction InsightDirection.Down, short targets are returned.'''
def __init__(self, weights):
self.weights = weights
self.insightCollection = InsightCollection()
self.removedSymbols = []
def CreateTargets(self, algorithm, insights):
'''Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portoflio targets from
Returns:
An enumerable of portoflio targets to be sent to the execution model'''
self.insightCollection.AddRange(insights)
targets = []
if self.removedSymbols is not None:
# zero out securities removes from the universe
for symbol in self.removedSymbols:
targets.append(PortfolioTarget(symbol, 0))
self.removedSymbols = None
if len(insights) == 0:
return targets
# Get symbols that have emit insights and still in the universe
symbols = list(set([x.Symbol for x in self.insightCollection if x.CloseTimeUtc > algorithm.UtcTime]))
for symbol in symbols:
activeInsights = [ x for x in self.insightCollection if x.Symbol == symbol ]
w = self.weights.get(symbol)
if w is None:
return []
direction = activeInsights[-1].Direction
targets.append(PortfolioTarget.Percent(algorithm, symbol, direction * w))
return targets
def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# save securities removed so we can zero out our holdings
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]
# remove the insights of the removed symbol from the collection
for removedSymbol in self.removedSymbols:
if self.insightCollection.ContainsKey(removedSymbol):
for insight in self.insightCollection[removedSymbol]:
self.insightCollection.Remove(insight)import numpy as np
import pandas as pd
def calculate_cointegration_weights(arr, p_thres=0.05):
"""calculate and tests cointegration of two symbols
Args:
arr (np.ndarray): of size (T, 2)
Returns:
weights (float): normed weights for cointegration portfolio
"""
if arr.shape[1] != 2:
raise Exception("Second dimension must be of size 2")
loss = lambda b: arr[:, 0] - b*arr[:, 1]
beta, _ = leastsq(loss, 0)
adres = stattools.adfuller(resi, regresults=True, maxlag=1)
if adres[1] > p_thres:
raise ValueError("p-value %s ad fuller is above threshold of (%s)"%(adres[1], p_thres))
return np.array([1, -beta])/(1+beta)
class LongShortCointegrationPairs(QCAlgorithm):
def Initialize(self):
self.asset1 = 'SPY'
self.asset2 = 'IWM'
self.periods = 10
self.SetStartDate(2002, 1, 1) #Set Start Date
self.SetEndDate(2005, 12, 31) #Set End Date
self.SetCash(10000) #Set Strategy Cash
for sym in [self.asset1, self.asset2]:
self.AddEquity(sym, Resolution.Daily)
self.p1 = RollingWindow[float](self.periods)
self.p2 = RollingWindow[float](self.periods)
self.weights = None
def onData(self, data):
# check if prices for both asset are present
if self.asset1 in data and self.asset2 in data:
self.p1.Add(data[self.asset1].Close)
self.p2.Add(data[self.asset2].Close)
else:
return None
if self.p1.IsReady() and self.p2.IsReady() and self.weights is None:
arr = np.matrix([self.p1[:], self.p2[:]]).T
self.weights = calculate_cointegration_weights(arr)
self.SetHoldings("SPY", 1)