| Overall Statistics | |
|---|---|
| Total Trades | 895 |
| Average Win | 0.37% |
| Average Loss | -0.31% |
| Compounding Annual Return | 47.339% |
| Drawdown | 19.500% |
| Expectancy | 0.257 |
| Net Profit | 40.017% |
| Sharpe Ratio | 1.025 |
| Probabilistic Sharpe Ratio | 51.130% |
| Loss Rate | 42% |
| Win Rate | 58% |
| Profit-Loss Ratio | 1.18 |
| Alpha | 0.328 |
| Beta | 0.253 |
| Annual Standard Deviation | 0.374 |
| Annual Variance | 0.14 |
| Information Ratio | 0.429 |
| Tracking Error | 0.383 |
| Treynor Ratio | 1.517 |
| Total Fees | $4030.38 |
import numpy as np
import pandas as pd
from statsmodels.tsa.stattools import adfuller
def TestStationartiy(returns):
    """Flag which columns of ``returns`` pass an Augmented Dickey-Fuller test.

    The (misspelled) function name is kept as-is because other modules
    import it under this name.

    Args:
        returns: pandas DataFrame of returns, one column per symbol.

    Returns:
        Boolean pandas Series indexed by ``returns.columns``; True where the
        ADF p-value (second element of ``adfuller``'s result) is below 0.05.
        An empty frame yields an empty boolean Series.
    """
    # DataFrame.iteritems() was removed in pandas 2.0; items() is the
    # equivalent that exists on both old and new pandas versions.
    is_stationary = [adfuller(values)[1] < 0.05 for _, values in returns.items()]
    return pd.Series(is_stationary, index=returns.columns, dtype=bool)
def GetZScores(returns):
    """Standardise every column of ``returns`` into z-scores.

    Args:
        returns: pandas DataFrame of returns, one column per symbol.

    Returns:
        DataFrame of the same shape where each value is
        ``(value - column mean) / column standard deviation``, using pandas'
        default sample standard deviation.
    """
    centered = returns.subtract(returns.mean())
    return centered.div(returns.std())
from HMM import *
import numpy as np
from StationarityAndZScores import *
from QuantConnect.Data.UniverseSelection import *
class ModulatedDynamicCircuit(QCAlgorithm):
    """Framework algorithm trading liquid ETFs on z-score signals gated by an HMM.

    Every trading day the hourly returns of all active securities are tested
    for stationarity. For stationary symbols an up insight is emitted when the
    latest return z-score is below -1 and the symbol's Hidden Markov Model
    predicts state 0. A flat insight is emitted for held symbols when the
    z-score exceeds 1 or, failing that, when the model predicts state 1.
    """

    def Initialize(self):
        """Configure dates, cash, framework models, universe and schedules."""
        self.SetStartDate(2019, 1, 1)  # Set Start Date
        self.SetCash(100000)  # Set Strategy Cash
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())
        self.SetExecution(ImmediateExecutionModel())
        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.SetUniverseSelection(LiquidETFUniverse())

        # One fitted Hidden Markov Model per symbol
        self.models = {}

        # GLD is the reference symbol for both scheduled events below
        self.AddEquity('GLD')
        self.Schedule.On(self.DateRules.EveryDay('GLD'), self.TimeRules.AfterMarketOpen('GLD', 5), self.GenerateInsights)
        self.Schedule.On(self.DateRules.MonthStart('GLD'), self.TimeRules.At(19, 0), self.RefitModels)

    def RefitModels(self):
        """Refit every stored model on fresh history (scheduled monthly)."""
        for symbol, model in self.models.items():
            RefitModel(self, symbol, model)

    def GenerateInsights(self):
        """Emit one-day insights for symbols whose hourly returns are stationary."""
        qb = self  # Alias keeps the block below identical to the research notebook
        symbols = [x.Symbol for x in qb.ActiveSecurities.Values]

        # Copy and paste from research notebook
        # -----------------------------------------------------------------------------
        # Fetch history
        history = qb.History(symbols, 500, Resolution.Hour)
        # Convert to returns
        returns = history.unstack(level=1).close.transpose().pct_change().dropna()
        # Test for stationarity
        stationarity = TestStationartiy(returns)
        # Get z-scores
        z_scores = GetZScores(returns)
        # -----------------------------------------------------------------------------

        insights = []
        # Iterate over symbols; Series.iteritems() was removed in pandas 2.0,
        # items() works on old and new versions alike
        for symbol, value in stationarity.items():
            # Only emit Insights for those whose returns exhibit stationary behavior
            if not value:
                continue

            # Get Hidden Markov model
            model = self.CheckForHMM(symbol)
            # Predict current state
            # NOTE(review): hmmlearn does not order its states, so which index
            # is the favourable regime may differ between fits - confirm the
            # 0/1 convention used below.
            state_prediction = PredictState(self, model, symbol)
            # Get most recent z_score
            z_score = z_scores[symbol].tail(1).values[0]

            # Determine if we want to invest or not
            if (z_score < -1) and (state_prediction == 0):
                insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Up))
            elif z_score > 1:
                if self.Portfolio[symbol].Invested:
                    insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))
            elif self.Portfolio[symbol].Invested and (state_prediction == 1):
                insights.append(Insight.Price(symbol, timedelta(1), InsightDirection.Flat))

        self.EmitInsights(insights)

    def CheckForHMM(self, symbol):
        """Return the stored model for ``symbol``, fitting one first if missing."""
        model = self.models.get(symbol)
        if model is None:
            model = CreateHMM(self, symbol)
            self.models[symbol] = model
        return model

    def OnSecuritiesChanged(self, changes):
        """Keep ``self.models`` in step with the universe.

        Models of removed securities are dropped so RefitModels does not keep
        requesting history for symbols that are no longer traded; a model is
        rebuilt lazily by CheckForHMM if such a symbol shows up again.
        """
        for security in changes.RemovedSecurities:
            self.models.pop(security.Symbol, None)

        # Build model for each symbol
        for security in changes.AddedSecurities:
            self.models[security.Symbol] = CreateHMM(self, security.Symbol)


import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
def CreateHMM(algorithm, symbol):
    """Fit a new two-state Gaussian HMM on a symbol's daily returns.

    Args:
        algorithm: algorithm instance used to request history.
        symbol: symbol whose last 900 daily bars are requested.

    Returns:
        The fitted ``GaussianHMM`` (two components, full covariance,
        up to 1000 iterations).
    """
    bars = algorithm.History([symbol], 900, Resolution.Daily)
    daily_returns = bars.loc[symbol].close.pct_change().dropna()

    # Single-feature column vector: one row per daily return
    observations = np.array(daily_returns).reshape(-1, 1)

    # Initialize Gaussian Hidden Markov Model
    hmm = GaussianHMM(n_components=2, covariance_type="full", n_iter=1000)
    return hmm.fit(observations)
def PredictState(algorithm, model, symbol):
    """Predict the hidden state for the symbol's latest observation.

    The models in this module are fitted on daily percentage returns, so the
    observation handed to ``model.predict`` must be a return too. A raw price
    lies far outside the fitted return distributions and would make the
    predicted state effectively constant, so the latest price is converted
    into a return relative to the most recent daily close first.

    Args:
        algorithm: algorithm instance providing ``CurrentSlice``,
            ``Securities`` and ``History``.
        model: fitted model exposing ``predict``.
        symbol: symbol to evaluate.

    Returns:
        The state index predicted for the latest return.
    """
    # Latest price: prefer the current slice, else the security's last price
    if algorithm.CurrentSlice.ContainsKey(symbol):
        price = algorithm.CurrentSlice[symbol].Close
    else:
        price = algorithm.Securities[symbol].Price

    # Same history request style as the fitting code.
    # NOTE(review): assumes at least one daily bar is returned for the symbol
    # (fitting the model already required daily history) - confirm.
    history = algorithm.History([symbol], 1, Resolution.Daily)
    previous_close = history.loc[symbol].close.iloc[-1]
    latest_return = price / previous_close - 1

    observation = np.array(latest_return).reshape((1, 1))
    return model.predict(observation)[0]
def RefitModel(algorithm, symbol, model):
    """Fit an existing model again on the symbol's latest daily returns.

    Args:
        algorithm: algorithm instance used to request history.
        symbol: symbol whose last 900 daily bars are requested.
        model: model to refit; must expose ``fit``.

    Returns:
        Whatever ``model.fit`` returns for the new observations.
    """
    bars = algorithm.History([symbol], 900, Resolution.Daily)
    daily_returns = bars.loc[symbol].close.pct_change().dropna()

    # Single-feature column vector: one row per daily return
    observations = np.array(daily_returns).reshape(-1, 1)
    return model.fit(observations)