| Overall Statistics |
|
Total Trades 4 Average Win 0.06% Average Loss -0.07% Compounding Annual Return -0.017% Drawdown 0.000% Expectancy -0.061 Net Profit -0.009% Sharpe Ratio -1.425 Probabilistic Sharpe Ratio 0.033% Loss Rate 50% Win Rate 50% Profit-Loss Ratio 0.88 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -3.726 Tracking Error 0.112 Treynor Ratio 3.634 Total Fees $16.06 |
from Execution.ImmediateExecutionModel import ImmediateExecutionModel
from datetime import timedelta, datetime
import numpy as np
import pandas as pd
import statsmodels.tsa.stattools as ts
import statsmodels.api
from clr import AddReference
AddReference("QuantConnect.Common")
AddReference("QuantConnect.Algorithm.Framework")
from QuantConnect import Resolution, Extensions
from QuantConnect.Algorithm.Framework.Alphas import *
from QuantConnect.Algorithm.Framework.Portfolio import *
from itertools import groupby
class ParticleNadionRegulators(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 7) # Set Start Date
self.SetEndDate(2019, 7, 7)
self.SetCash(100000) # Set Strategy Cash
#lookback = self.GetParameter("look-back")
symbols = [Symbol.Create("PEP", SecurityType.Equity, Market.USA), Symbol.Create("KO", SecurityType.Equity, Market.USA)]
stock1 = symbols[0]
stock2 = symbols[1]
self.AddUniverseSelection(ManualUniverseSelectionModel(symbols))
self.UniverseSettings.Resolution = Resolution.Hour
self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
self.SetAlpha(SinglePairsTrade()) # works when addalpha
#self.SetAlpha(SinglePairsTrade(self, lookback)) # works when addalpha
self.SetExecution(ImmediateExecutionModel())
self.SetPortfolioConstruction(HedgeWeightingPortfolioConstructionModel())
#self.Settings.RebalancePortfolioOnInsightChanges = False
#self.Settings.RebalancePortfolioOnSecurityChanges = True
self.Settings.FreePortfolioValuePercentage = 0.1
'''
def OnEndOfDay(self):
self.Log("MV of stock 1 is "+str((self.Portfolio["PEP"].Quantity)*(self.Portfolio["PEP"].Price))+", and MV of stock 2 is "+str((self.Portfolio["KO"].Quantity)*(self.Portfolio["KO"].Price)))
'''
class SinglePairsTrade(AlphaModel):
def __init__(self):
#self.lookback1 = int(lookback)
self.lookback1 = 100
self.pair = []
self.spreadMean = SimpleMovingAverage(self.lookback1)
self.spreadStd = StandardDeviation(self.lookback1)
self.period = timedelta(hours=1)
self.w1 = 0
self.w2 = 0
pass
def Update(self,algorithm,data):
#Calculate pearson correlation coefficient
h1 = algorithm.History(self.pair[0].Symbol, self.lookback1, Resolution.Hour)
h2 = algorithm.History(self.pair[1].Symbol, self.lookback1, Resolution.Hour)
h1 = h1['close'].unstack(level=0)
h2 = h2['close'].unstack(level=0)
h3 = pd.merge(h1, h2, on = "time", how="inner")
correlation = h3.corr(method="pearson")
pearsoncoef = correlation.iloc[0,1]
algorithm.Plot("Pearson corr", "Pearsoncoef", pearsoncoef*100)
#Calculate cointegration
header1 = h3.columns[0]
header2 = h3.columns[1]
res = statsmodels.api.OLS(endog=h3[str(header1)], exog=h3[str(header2)]).fit()
#algorithm.Debug("Original parameter output:" +str(res.params))
rsq = res.rsquared
#algorithm.Debug("R squared is: "+str(res.rsquared))
#algorithm.Debug(res.params)
#algorithm.Debug(res.params.loc[str(header2)])
beta_hr = res.params.loc[str(header2)]
self.w1 = 1.0/(1.0+beta_hr)
self.w2 = 1.0-self.w1
h3["res"] = h3[str(header1)] - beta_hr*h3[str(header2)]
cadf = ts.adfuller(h3["res"])
#algorithm.Debug(cadf)
#algorithm.Debug("pvalue is "+str(cadf[1]))
significance = 0
pval = cadf[1]
algorithm.Plot("P value", "Pvalue", cadf[1]*100)
if cadf[1] < 0.05:
significance = 1
algorithm.Plot("Hedge ratio", "Ratio of asset 2", beta_hr*100)
algorithm.Plot("R squared plot", "Rsqaured", rsq*100)
#Calculate spread based on cointegration
spread = (self.w2*self.pair[1].Price) - (self.w1*self.pair[0].Price)
self.spreadMean.Update(algorithm.Time, spread)
self.spreadStd.Update(algorithm.Time, spread)
upperthreshold = self.spreadMean.Current.Value + (2*self.spreadStd.Current.Value)
lowerthreshold = self.spreadMean.Current.Value - (2*self.spreadStd.Current.Value)
algorithm.Plot("Spread plot", "Spread", spread)
#Check if we have any holdings. If we do and spread converged, then liquidate fund and take profit:
if ((algorithm.Portfolio[self.pair[0].Symbol].Quantity != 0) or (algorithm.Portfolio[self.pair[1].Symbol].Quantity != 0)):
if (spread < 0.5*self.spreadStd.Current.Value and spread > -0.5*self.spreadStd.Current.Value and algorithm.Portfolio[self.pair[0].Symbol].Quantity>0):
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Down, None, None, None, 0),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Up, None, None, None, 0)
])
if (spread < 0.5*self.spreadStd.Current.Value and spread > -0.5*self.spreadStd.Current.Value and algorithm.Portfolio[self.pair[0].Symbol].Quantity<0):
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Up, None, None, None, 0),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Down, None, None, None, 0)
])
return[]
#Check if we don't have any holdings. If so, then
if ((algorithm.Portfolio[self.pair[0].Symbol].Quantity == 0) and (algorithm.Portfolio[self.pair[1].Symbol].Quantity == 0)):
if ((rsq>0.75) and (pval<0.05)):
#Generate insights
if spread > upperthreshold:
#algorithm.SetHoldings([PortfolioTarget(self.pair[0].Symbol, self.w1), PortfolioTarget(self.pair[1].Symbol, -1*self.w2)])
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Up,None, None, None, self.w1),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Down, None, None, None, self.w2)
])
if spread < lowerthreshold:
#algorithm.SetHoldings([PortfolioTarget(self.pair[0].Symbol, -1*self.w1), PortfolioTarget(self.pair[1].Symbol, self.w2)])
return Insight.Group(
[
Insight.Price(self.pair[0].Symbol, self.period, InsightDirection.Down, None, None, None, self.w1),
Insight.Price(self.pair[1].Symbol, self.period, InsightDirection.Up, None, None, None, self.w2)
])
return[]
def OnSecuritiesChanged(self, algorithm, changes):
self.pair = [x for x in changes.AddedSecurities]
#1. Call for bars of history data for each symbol in the pair and save to the variable history
history = algorithm.History([x.Symbol for x in self.pair], self.lookback1)
#2. Unstack the Pandas data frame to reduce it to the history close price
history = history.close.unstack(level=0)
#3. Iterate through the history tuple and update the mean and standard deviation with historical data
for tuple in history.itertuples():
self.spreadMean.Update(tuple[0], (self.w2*tuple[2])-(tuple[1]*self.w1))
self.spreadStd.Update(tuple[0], (self.w2*tuple[2])-(tuple[1]*self.w1))
class HedgeWeightingPortfolioConstructionModel(PortfolioConstructionModel):
def __init__(self, rebalance = Resolution.Daily, portfolioBias = PortfolioBias.LongShort):
self.portfolioBias = portfolioBias
# If the argument is an instance of Resolution or Timedelta
# Redefine rebalancingFunc
rebalancingFunc = rebalance
if isinstance(rebalance, int):
rebalance = Extensions.ToTimeSpan(rebalance)
if isinstance(rebalance, timedelta):
rebalancingFunc = lambda dt: dt + rebalance
if rebalancingFunc:
self.SetRebalancingFunc(rebalancingFunc)
def DetermineTargetPercent(self, activeInsights):
result = {}
# give equal weighting to each security
count = sum(x.Direction != InsightDirection.Flat and self.RespectPortfolioBias(x) for x in activeInsights)
for insight in activeInsights:
#self.Log("Insight weights" + str(insight.Weight))
if count == 0:
result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat)*0
else:
result[insight] = (insight.Direction if self.RespectPortfolioBias(insight) else InsightDirection.Flat)*self.GetValue(insight)
return result
def RespectPortfolioBias(self, insight):
return self.portfolioBias == PortfolioBias.LongShort or insight.Direction == self.portfolioBias
def GetValue(self, insight):
return insight.Weight