Overall Statistics Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees \$0.00
```import numpy as np
import pandas as pd

def calculate_cointegration_weights(arr, p_thres=0.05):
"""calculate and tests cointegration of two symbols

Args:
arr (np.ndarray): of size (T, 2)
Returns:
weights (float): normed weights for cointegration portfolio
"""

if arr.shape != 2:
raise Exception("Second dimension must be of size 2")

loss = lambda b: arr[:, 0] - b*arr[:, 1]
beta, _ = leastsq(loss, 0)

return np.array([1, -beta])/(1+beta)

class LongShortCointegrationPairs(QCAlgorithm):
def Initialize(self):
self.asset1 = 'SPY'
self.asset2 = 'IWM'
self.periods = 10

self.SetStartDate(2002, 1, 1)  #Set Start Date
self.SetEndDate(2005, 12, 31)    #Set End Date
self.SetCash(10000)           #Set Strategy Cash

for sym in [self.asset1, self.asset2]:

self.p1 = RollingWindow[float](self.periods)
self.p2 = RollingWindow[float](self.periods)
self.weights = None

def onData(self, data):
# check if prices for both asset are present
if self.asset1 in data and self.asset2 in data:
else:
return None

arr = np.matrix([self.p1[:], self.p2[:]]).T
self.weights = calculate_cointegration_weights(arr)
self.SetHoldings("SPY", 1)```
```from scipy.optimize import leastsq
from statsmodels.tsa import stattools, ar_model
import pandas as pd
from datetime import timedelta
from enum import Enum
import logging

log = logging.getLogger()

class Weights:
def __init__(self, names):
self.values = {n: 1/len(names) for n in names}

def set(self, **kwargs):
for n, v in kwargs.items():
if n in self.values:
self.values[n] = v

# scale weights to one
s = np.sum(self.values.values())
self.values = {n: v/s for n, v in self.values.items()}

def get(self, n):
if n in self.values:
return self.values[n]

def estimate(self, df):
pass

def calculate_cointegration_weights(arr, p_thres=0.05):
"""calculate and tests cointegration of two symbols

Args:
arr (np.ndarray): of size (T, 2)
Returns:
weights (float): normed weights for cointegration portfolio
"""

if arr.shape != 2:
raise Exception("Second dimension must be of size 2")

loss = lambda b: arr[:, 0] - b*arr[:, 1]
beta, _ = leastsq(loss, 0)

return np.array([1, -beta])/(1+beta)

class CointegrationAlphaModel(AlphaModel):
"""use cointegration estimated from historical prices"""

class State(Enum):
ShortRatio = -1
FlatRatio = 0
LongRatio = 1

def __init__(self, asset1, asset2, lookback, weights, threshold=1):
"""Initializes a new default instance of the CointegrationAlphaModel class.
Args:
asset1: The first asset's symbol in the pair
asset2: The second asset's symbol in the pair
lookback(int): Historical return lookback period
"""

self.asset1 = asset1
self.asset2 = asset2
self.asset1Price = RollingWindow[IndicatorDataPoint](lookback)
self.asset2Price = RollingWindow[IndicatorDataPoint](lookback)
self.weights = weights
self.state = self.State.FlatRatio
self.ratio = None
self.mean = None
self.threshold = threshold
self.upperThreshold = None;
self.lowerThreshold = None;

def Update(self, algorithm, data):
''' Updates this alpha model with the latest data from the algorithm.
This is called each time the algorithm receives data for subscribed securities
Args:
algorithm: The algorithm instance
data: The new data available
Returns:
The new insights generated'''
if self.mean is None or not self.mean.IsReady:
return []

# don't re-emit the same direction
if self.state is not self.State.LongRatio and self.ratio > self.upperThreshold:
self.state = self.State.LongRatio

# asset1/asset2 is more than 2 std away from mean, short asset1, long asset2
shortAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Down)
longAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Up)

# creates a group id and set the GroupId property on each insight object
return Insight.Group(shortAsset1, longAsset2)

# don't re-emit the same direction
if self.state is not self.State.ShortRatio and self.ratio < self.lowerThreshold:
self.state = self.State.ShortRatio

# asset1/asset2 is less than 2 std away from mean, long asset1, short asset2
longAsset1 = Insight.Price(self.asset1, timedelta(minutes = 15), InsightDirection.Up)
shortAsset2 = Insight.Price(self.asset2, timedelta(minutes = 15), InsightDirection.Down)

# creates a group id and set the GroupId property on each insight object
return Insight.Group(longAsset1, shortAsset2)

return []

def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed.
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''
# this model is limitted to looking at a single pair of assets
continue

else:

if self.ratio is None:
# initialize indicators dependent on both assets

self.ratio = IndicatorExtensions.Over(self.asset1Price, self.asset2Price)
self.mean = IndicatorExtensions.Of(ExponentialMovingAverage(500), self.ratio)

upper = ConstantIndicator[IndicatorDataPoint]("ct", 1 + self.threshold / 100)
self.upperThreshold = IndicatorExtensions.Times(self.mean, upper)

lower = ConstantIndicator[IndicatorDataPoint]("ct", 1 - self.threshold / 100)
self.lowerThreshold = IndicatorExtensions.Times(self.mean, lower)```
```import numpy as np

class WeightedPortfolioConstructionModel(PortfolioConstructionModel):
'''Provides an implementation of IPortfolioConstructionModel that gives equal weighting to all securities.
The target percent holdings of each security is 1/N where N is the number of securities.
For insights of direction InsightDirection.Up, long targets are returned and
for insights of direction InsightDirection.Down, short targets are returned.'''
def __init__(self, weights):
self.weights = weights
self.insightCollection = InsightCollection()
self.removedSymbols = []

def CreateTargets(self, algorithm, insights):
'''Create portfolio targets from the specified insights
Args:
algorithm: The algorithm instance
insights: The insights to create portoflio targets from
Returns:
An enumerable of portoflio targets to be sent to the execution model'''

targets = []

if self.removedSymbols is not None:
# zero out securities removes from the universe
for symbol in self.removedSymbols:
targets.append(PortfolioTarget(symbol, 0))
self.removedSymbols = None

if len(insights) == 0:
return targets

# Get symbols that have emit insights and still in the universe
symbols = list(set([x.Symbol for x in self.insightCollection if x.CloseTimeUtc > algorithm.UtcTime]))

for symbol in symbols:
activeInsights = [ x for x in self.insightCollection if x.Symbol == symbol ]
w = self.weights.get(symbol)
if w is None:
return []
direction = activeInsights[-1].Direction
targets.append(PortfolioTarget.Percent(algorithm, symbol, direction * w))

return targets

def OnSecuritiesChanged(self, algorithm, changes):
'''Event fired each time the we add/remove securities from the data feed
Args:
algorithm: The algorithm instance that experienced the change in securities
changes: The security additions and removals from the algorithm'''

# save securities removed so we can zero out our holdings
self.removedSymbols = [x.Symbol for x in changes.RemovedSecurities]

# remove the insights of the removed symbol from the collection
for removedSymbol in self.removedSymbols:
if self.insightCollection.ContainsKey(removedSymbol):
for insight in self.insightCollection[removedSymbol]:
self.insightCollection.Remove(insight)```