| Overall Statistics |
|
Total Trades 20 Average Win 2.17% Average Loss -2.19% Compounding Annual Return 0.935% Drawdown 16.000% Expectancy 0.193 Net Profit 3.910% Sharpe Ratio 0.182 Probabilistic Sharpe Ratio 5.130% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 0.99 Alpha -0.023 Beta 0.232 Annual Standard Deviation 0.062 Annual Variance 0.004 Information Ratio -1.232 Tracking Error 0.112 Treynor Ratio 0.048 Total Fees $20.00 |
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller
def TestStationartiy(returns):
    """Flag which return series pass the Augmented Dickey-Fuller test.

    Args:
        returns: pandas DataFrame holding one column of returns per symbol.

    Returns:
        pandas Series of booleans indexed by ``returns.columns``; an entry is
        True when the ADF p-value for that column is below 0.05.
    """
    # DataFrame.items() is used instead of iteritems(), which was removed in
    # pandas 2.0. Element [1] of adfuller's result is the p-value.
    return pd.Series(
        [adfuller(values)[1] < 0.05 for _, values in returns.items()],
        index=returns.columns,
    )
def GetZScores(returns):
    """Standardise every column of ``returns`` into z-scores.

    Args:
        returns: pandas DataFrame holding one column of returns per symbol.

    Returns:
        pandas DataFrame with the same index and columns, where each value has
        had its column mean subtracted and been divided by its column's
        standard deviation (pandas' default sample standard deviation).
    """
    centered = returns.subtract(returns.mean())
    return centered.div(returns.std())
from HMM import *
import numpy as np
from StationarityAndZScores import *
from QuantConnect.Data.UniverseSelection import *
class ModulatedDynamicCircuit(QCAlgorithm):
    """SPY strategy that gates daily price insights on a Hidden Markov Model state.

    Every trading day, shortly after the open, hourly returns are tested for
    stationarity and the per-symbol HMM is asked for the current state; Up or
    Flat insights are emitted from that. Cached models are refitted at the
    start of each month.
    """

    def Initialize(self):
        """Set dates, cash and framework models, add SPY and register schedules."""
        self.SetStartDate(2016, 1, 1)  # Set Start Date
        self.SetCash(10000)  # Set Strategy Cash
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.UniverseSettings.Resolution = Resolution.Minute
        #self.SetUniverseSelection(LiquidETFUniverse())
        #self.AddUniverse(self.CoarseSelectionFunction, self.FineSelectionFunction)

        # Cache of fitted HMMs, keyed by symbol.
        self.models = {}

        self.AddEquity('SPY')

        # Generate insights 5 minutes after the open on every SPY trading day.
        self.Schedule.On(
            self.DateRules.EveryDay('SPY'),
            self.TimeRules.AfterMarketOpen('SPY', 5),
            self.GenerateInsights,
        )
        # Refit every cached model at 19:00 on the first trading day of the month.
        self.Schedule.On(
            self.DateRules.MonthStart('SPY'),
            self.TimeRules.At(19, 0),
            self.RefitModels,
        )

    def RefitModels(self):
        """Refit every cached model on fresh history."""
        for symbol, model in self.models.items():
            RefitModel(self, symbol, model)

    def GenerateInsights(self):
        """Emit one-day Up/Flat insights for symbols with stationary returns."""
        # ---------------------------------------------------------------------------
        # Insert fundamental selection criteria here. Then pass that to symbols
        # variable below.
        # Use regime filter on SPY, not individual stocks.
        # ---------------------------------------------------------------------------
        symbols = ["SPY"]  # <---update this to take input from above, not activesecurities

        # Copy and paste from research notebook
        # -----------------------------------------------------------------------------
        # Fetch history
        history = self.History(symbols, 500, Resolution.Hour)
        if history.empty:
            # Nothing to analyse; accessing `.close` on an empty frame would fail.
            return

        # Convert to returns (rows: time, columns: symbol)
        returns = history.unstack(level=1).close.transpose().pct_change().dropna()

        # Test for stationarity
        stationarity = TestStationartiy(returns)

        # Get z-scores
        z_scores = GetZScores(returns)
        # -----------------------------------------------------------------------------

        insights = []

        # Series.items() replaces iteritems(), which was removed in pandas 2.0.
        for symbol, is_stationary in stationarity.items():
            # Only emit Insights for those whose returns exhibit stationary behavior
            if not is_stationary:
                continue

            # Get Hidden Markov model
            model = self.CheckForHMM(symbol)

            # Predict current state
            state_prediction = PredictState(self, model, symbol)

            # Get most recent z_score
            z_score = z_scores[symbol].tail(1).values[0]

            # Determine if we want to invest or not.
            # NOTE(review): state 0 is treated as the investable regime; HMM state
            # labels may not stay stable across refits - confirm.
            #if (z_score < -1) and (state_prediction == 0):
            if state_prediction == 0:  # and not self.Portfolio[symbol].Invested:
                insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Up))
            elif z_score > 1:
                if self.Portfolio[symbol].Invested:
                    insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))
            elif self.Portfolio[symbol].Invested and (state_prediction == 1):
                insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))

            self.Log("State = " + str(state_prediction))

        self.EmitInsights(insights)

    def CheckForHMM(self, symbol):
        """Return the cached model for ``symbol``, creating it on first use."""
        if self.models.get(symbol) is None:
            self.models[symbol] = CreateHMM(self, symbol)
        return self.models[symbol]

    def OnSecuritiesChanged(self, changes):
        """Build a model for each traded symbol when the security set changes."""
        symbols = ["SPY"]

        # Build model for each symbol
        for symbol in symbols:
            self.models[symbol] = CreateHMM(self, symbol)


import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
def CreateHMM(algorithm, symbol):
    """Fit a new two-state Gaussian HMM to the daily returns of ``symbol``.

    Args:
        algorithm: object exposing ``History`` (the running algorithm).
        symbol: key used both for the history request and to select the
            symbol's rows from the returned frame.

    Returns:
        The fitted ``GaussianHMM`` instance.
    """
    bars = algorithm.History([symbol], 900, Resolution.Daily)
    daily_returns = bars.loc[symbol].close.pct_change().dropna()

    # Reshape into a single-feature column: one row per observation.
    observations = np.array(daily_returns).reshape((len(daily_returns), 1))

    # Initialize Gaussian Hidden Markov Model, then fit it.
    hmm = GaussianHMM(n_components=2, covariance_type="full", n_iter=10000)
    return hmm.fit(observations)
def PredictState(algorithm, model, symbol):
    """Predict the model's current hidden state for ``symbol``.

    The models in this file are fitted on daily percentage returns, so the
    observation handed to ``model.predict`` has to be a return too. The latest
    price is therefore converted into a return against the most recent daily
    close instead of being passed to the model as a raw price level.

    Args:
        algorithm: object exposing ``CurrentSlice``, ``Securities`` and
            ``History`` (the running algorithm).
        model: fitted model exposing ``predict``.
        symbol: key used for the slice, securities and history lookups.

    Returns:
        The predicted state for the single latest observation.

    Raises:
        ValueError: if no positive previous daily close is available to
            compute the return from.
    """
    # Latest price: prefer the bar in the current slice, otherwise fall back
    # to the security's last known price.
    if algorithm.CurrentSlice.ContainsKey(symbol):
        price = algorithm.CurrentSlice[symbol].Close
    else:
        price = algorithm.Securities[symbol].Price

    history = algorithm.History([symbol], 1, Resolution.Daily)
    if history.empty:
        raise ValueError("No daily history available for " + str(symbol))

    previous_close = float(history.loc[symbol].close.iloc[-1])
    if not previous_close > 0:
        raise ValueError("Invalid previous close for " + str(symbol))

    # Same feature the model was trained on: a simple percentage return.
    latest_return = float(price) / previous_close - 1.0
    observation = np.array(latest_return).reshape((1, 1))
    return model.predict(observation)[0]
def RefitModel(algorithm, symbol, model):
    """Refit an existing model on the latest daily returns of ``symbol``.

    Args:
        algorithm: object exposing ``History`` (the running algorithm).
        symbol: key used both for the history request and to select the
            symbol's rows from the returned frame.
        model: model exposing ``fit``.

    Returns:
        Whatever ``model.fit`` returns for the new observations.
    """
    bars = algorithm.History([symbol], 900, Resolution.Daily)
    daily_returns = bars.loc[symbol].close.pct_change().dropna()

    # Reshape into a single-feature column: one row per observation.
    observations = np.array(daily_returns).reshape((len(daily_returns), 1))
    return model.fit(observations)