| Overall Statistics |
|
Total Trades 6 Average Win 0% Average Loss -0.02% Compounding Annual Return -0.319% Drawdown 0.400% Expectancy -1 Net Profit -0.351% Sharpe Ratio -0.867 Probabilistic Sharpe Ratio 0.869% Loss Rate 100% Win Rate 0% Profit-Loss Ratio 0 Alpha -0.002 Beta 0.001 Annual Standard Deviation 0.003 Annual Variance 0 Information Ratio -2.428 Tracking Error 0.107 Treynor Ratio -3.276 Total Fees $6.00 Estimated Strategy Capacity $17000000.00 Lowest Capacity Asset SHY SGNKIKYGE9NP |
# REF. https://www.quantconnect.com/forum/discussion/2657/a-simple-vix-strategy
from QuantConnect.Python import PythonQuandl # quandl data not CLOSE
from QuantConnect.Python import PythonData # custom data
from QuantConnect.Data import SubscriptionDataSource
from datetime import datetime, timedelta
import decimal
class CboeVix(PythonData):
'''CBOE Vix Download Custom Data Class'''
def GetSource(self, config, date, isLiveMode):
url_vix = "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv"
return SubscriptionDataSource(url_vix,
SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip() and line[0].isdigit()): return None
# New CboeVix object
index = CboeVix();
index.Symbol = config.Symbol
try:
# Example File Format:
# Date VIX Open VIX High VIX Low VIX Close
# 01/02/2004 17.96 18.68 17.54 18.22
#print line
data = line.split(',')
date = data[0].split('/')
index.Time = datetime(int(date[2]), int(date[0]), int(date[1]))
index.Value = decimal.Decimal(data[4])
index["Open"] = float(data[1])
index["High"] = float(data[2])
index["Low"] = float(data[3])
index["Close"] = float(data[4])
except ValueError:
# Do nothing
return None
# except KeyError, e:
# print 'I got a KeyError - reason "%s"' % str(e)
return index
# NB: CboeVxV class == CboeVix class, except for the URL
class CboeVxV(PythonData):
'''CBOE VXV Download Custom Data Class'''
def GetSource(self, config, date, isLiveMode):
url_vxv = "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX3M_History.csv"
return SubscriptionDataSource(url_vxv,
SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip() and line[0].isdigit()): return None
index = CboeVxV();
index.Symbol = config.Symbol
try:
# Example File Format:
# OPEN HIGH LOW CLOSE
# 12/04/2007 24.8 25.01 24.15 24.65
data = line.split(',')
date = data[0].split('/')
index.Time = datetime(int(date[2]), int(date[0]), int(date[1]))
index.Value = decimal.Decimal(data[4])
index["Open"] = float(data[1])
index["High"] = float(data[2])
index["Low"] = float(data[3])
index["Close"] = float(data[4])
except ValueError:
# Do nothing
return None
return index
# for using VIX futures settle in calc. ratios like VIX/VIX1
class QuandlFuture(PythonQuandl):
'''Custom quandl data type for setting customized value column name.
Value column is used for the primary trading calculations and charting.'''
def __init__(self):
# Define ValueColumnName: cannot be None, Empty or non-existant column name
# If ValueColumnName is "Close", do not use PythonQuandl, use Quandl:
# self.AddData[QuandlFuture](self.VIX1, Resolution.Daily)
self.ValueColumnName = "Settle"# REF. https://www.quantconnect.com/forum/discussion/6931/from-research-to-production-long-short-term-memory
from MyLSTM import MyLSTM, LOOKBACK_DAYS, FEATURE_LEN
from my_custom_data import CboeVix, CboeVxV
import numpy as np
import pandas as pd
initial_lookback = LOOKBACK_DAYS*30
training_lookback = LOOKBACK_DAYS*10
predict_lookback = FEATURE_LEN*10 # look back seems like needs to be multiplied by a factor depend on number of symbols?
class MultidimensionalHorizontalFlange(QCAlgorithm):
def Initialize(self):
# November 12, 2007 vxv +4
#self.SetStartDate(2014, 12, 1)
# ^^^ Algo doesn't really trade. Likely bug, need research to debug
# PSR: 59%, win loss rate: 100/0 is just wrong...
self.SetStartDate(2020, 11, 1) # cherry picked date range...for wow factor. # PSR: 84% , win loss rate: 75/25.
self.SetCash(10000) # Set Strategy Cash
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
self.SetExecution(ImmediateExecutionModel())
#self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.UniverseSettings.Resolution = Resolution.Daily
#self.SetUniverseSelection(LiquidETFUniverse())
self.short_volatility = self.AddEquity('SPY', Resolution.Daily).Symbol
self.long_volatility = self.AddEquity('SHY', Resolution.Daily).Symbol
self.spy = self.AddEquity('SPY', Resolution.Daily).Symbol
self.vix = self.AddData(CboeVix, "VIX").Symbol
self.vxv = self.AddData(CboeVxV, "VXV").Symbol
self.SetWarmUp(timedelta(LOOKBACK_DAYS))
self.models = {
self.spy:None,
}
self.macro_symbols = {
'Bull':self.spy,
}
# Use Train() method to avoid runtime error
self.Train(self.TrainMyModel)
self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8,0), self.TrainMyModel)
# Schedule prediction and plotting
self.AddEquity('SPY')
self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.AfterMarketOpen(self.spy, 5), self.Predict)
#self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.BeforeMarketClose(self.spy, 5), self.ClosePosition)
self.Schedule.On(self.DateRules.EveryDay(self.spy), self.TimeRules.BeforeMarketClose(self.spy, 5), self.PlotMe)
# Create custom charts
prediction = Chart('Prediction Plot')
prediction.AddSeries(Series('Actual Bull', SeriesType.Line, 0))
prediction.AddSeries(Series('Predicted Bull', SeriesType.Line, 0))
prediction.AddSeries(Series('Actual Bear', SeriesType.Line, 1))
prediction.AddSeries(Series('Predicted Bear', SeriesType.Line, 1))
self.ready = False
def get_data(self,lookback):
history = self.History([self.spy,self.vix,self.vxv], lookback, Resolution.Daily)
df = pd.DataFrame()
#df['date'] = history.loc[self.spy].index
df['SPY'] = history.loc[self.spy].close
df['VIX'] = history.loc[self.vix].close
df['VXV'] = history.loc[self.vxv].close
return df
def TrainMyModel(self):
qb = self
for key, symbol in self.macro_symbols.items():
if self.models[symbol] is None:
df = self.get_data(initial_lookback)
else:
df = self.get_data(training_lookback)
self.Log('data {}...train'.format(df.shape))
if df.shape[0] < predict_lookback: # dont really have that much training_lookback, nor initial_lookback
self.ready = False
continue
self.Log('shape {}'.format(df.shape))
# Build model layers
if self.models[symbol] is None:
# Initialize LSTM class instance
lstm = MyLSTM()
# Prepare data
features_set, labels, stratify = lstm.ProcessData(df)
# Create model
lstm.CreateModel()
# Fit model
lstm.FitModel(features_set, labels, stratify)
# Add LSTM class to dictionary to store later
self.models[symbol] = lstm
self.ready = True
self.Log('training done')
else:
lstm = self.models[symbol]
# uncomment for real test, might take a while or timeout...
#features_set, labels, stratify = lstm.ProcessData(df)
#lstm.FitModel(features_set, labels)
self.Log('no training done')
# close position at the day.
def ClosePosition(self):
self.Log('closing position')
self.SetHoldings([PortfolioTarget(self.short_volatility, 0.0)])
self.SetHoldings([PortfolioTarget(self.long_volatility, 0.0)])
def Predict(self):
delta = {}
qb = self
for key, symbol in self.macro_symbols.items():
self.Log('predict')
self.Log('ready is {}'.format(self.ready))
if self.ready is False:
continue
self.Log('fetch data.')
# Fetch history
df = self.get_data(predict_lookback)
self.Log('data {}...predict'.format(df.shape))
if df.shape[0] < predict_lookback:
raise ValueError('not enough data {}'.format(df.shape))
continue
# Fetch LSTM class
lstm = self.models[symbol]
# Predict
predictions = lstm.PredictFromModel(df)
# Grab latest prediction and calculate if predict symbol to go up or down
delta[key] = predictions
# Plot prediction
self.Plot('Prediction Plot', f'Predicted {key}', predictions)
#confidence = np.clip(np.abs(predictions-0.5)/0.10,0,1)
#insight = Insight.Price(symbol, timedelta(1), InsightDirection.Up if predictions > 0.5 else InsightDirection.Down, confidence)
confidence = np.clip(np.abs(predictions)/0.10,0,1)
insight = Insight.Price(symbol, timedelta(1), InsightDirection.Up if predictions > 0.0 else InsightDirection.Down, confidence)
self.EmitInsights(insight)
#if predictions > 0.5:
if predictions > 0.0:
self.Log('Long!')
self.SetHoldings([PortfolioTarget(self.short_volatility, 1.0)])
self.SetHoldings([PortfolioTarget(self.long_volatility, 0.0)])
else:
self.Log('Short!')
self.SetHoldings([PortfolioTarget(self.short_volatility, 0.0)])
self.SetHoldings([PortfolioTarget(self.long_volatility, 0.5)])
def PlotMe(self):
# Plot current price of symbols to match against prediction
for key, symbol in self.macro_symbols.items():
up = 1.0 if (self.Securities[symbol].Close-self.Securities[symbol].Open) > 0 else 0.0
self.Plot('Prediction Plot', f'Actual {key}', up)
self.Plot('Prediction Plot', f'Actual {key}', self.Securities[symbol].Price)# https://stackoverflow.com/questions/38714959/understanding-keras-lstms?rq=1
SEED = 42
import os
import random as rn
import numpy as np
from tensorflow import set_random_seed
os.environ['PYTHONHASHSEED']=str(SEED)
np.random.seed(SEED)
set_random_seed(SEED)
rn.seed(SEED)
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from keras.models import load_model
from keras.optimizers import Adam
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout, BatchNormalization
from keras.layers import LeakyReLU
from keras.models import Sequential
from sklearn.model_selection import train_test_split
from keras import backend as K
K.clear_session()
LOOKBACK_DAYS = 1200
FEATURE_LEN = 40 #400
FEATURE_DIM = 2
def get_model():
#input_shape = (features_set.shape[1],features_set.shape[2])
input_shape = (FEATURE_LEN,FEATURE_DIM)
feature_len = input_shape[0]
dropout_rate = 0.2
drop = True
norm = False
model = Sequential()
model.add(LSTM(units=feature_len, return_sequences=True, input_shape=input_shape))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(LSTM(units=feature_len, return_sequences=True))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(LSTM(units=int(feature_len/2), return_sequences=True))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(LSTM(units=int(feature_len/2)))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dense(units=10))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dense(units=10))
if drop:
model.add(Dropout(rate=dropout_rate))
if norm:
model.add(BatchNormalization())
model.add(LeakyReLU())
model.add(Dense(units=1,activation='linear'))
return model
class MyLSTM:
def __init__(self):
self.model = None
self.scaler = MinMaxScaler(feature_range = (0, 1))
self.feature_len = FEATURE_LEN
self.feature_dim = FEATURE_DIM
self.lookback_num = LOOKBACK_DAYS
def _transform(self,df,shift=True):
df['SPY_RET'] = np.log(df['SPY']).diff()
df['HV'] = df['SPY_RET'].rolling(10).std() * np.sqrt(252)
df['HV_MA'] = df['HV'].rolling(5).mean()
df['VRP'] = df['VIX'] - df['HV_MA']*100
df['VV'] = df['VIX']/df['VXV']
if shift:
df['VRP'] = df['VRP'].shift(1)
df['VV'] = df['VV'].shift(1)
df = df.dropna()
#if df.shape[0] < LOOKBACK_DAYS:
# raise ValueError('No Data! df shape {}'.format(df.shape))
return df
def ProcessData(self, df):
df = self._transform(df,shift=True)
data = df[['SPY_RET','VRP','VV']].values
# build dataset
features_set = []
labels = []
stratify = []
for i in range(self.feature_len, data.shape[0]):
tmp_ret = data[i,0]
tmp_c = 1.0 if tmp_ret > 0 else 0.0
#tmp_y = [tmp_c]
tmp_y = [tmp_ret]
tmp_x = data[i-self.feature_len:i, 1:]
features_set.append(tmp_x)
stratify.append(tmp_c)
# wishful thinking for predicting 1-day direction, but still fun.
labels.append(tmp_y)
features_set, labels, stratify = np.array(features_set), np.array(labels), np.array(stratify)
if features_set.shape[1] != self.feature_len:
raise ValueError('Debug {} {} {} {} {}'.format(df.shape,data.shape,features_set.shape, labels.shape, np.unique(labels)))
return features_set, labels, stratify
def CreateModel(self):
lr = 0.001
mom = 0.9
# Create Model
self.model = get_model()
self.opt = Adam(lr,mom)
#self.model.compile(optimizer=self.opt,loss='binary_crossentropy',metrics=['acc'])
self.model.compile(optimizer=self.opt,loss='mse',metrics=['acc'])
def FitModel(self, features_set, labels, stratify):
epochs = 3
batch_size = 32
x, y= features_set, labels
X_train, X_test, y_train, y_test = train_test_split(x, y, test_size=0.33, random_state=42,stratify=stratify)
self.model.fit(X_train, y_train, epochs=epochs, batch_size=batch_size,validation_data=(X_test,y_test))
def PredictFromModel(self, df):
df = self._transform(df,shift=True) # shift set to True to avoid look ahead??
df = df[['VRP','VV']].values
test_inputs = df[-1*self.feature_len:,:]
test_inputs = np.expand_dims(test_inputs,axis=0)
if test_inputs.shape[1] != self.feature_len:
raise ValueError('Prdict... Debug {}'.format(df.shape))
predictions = self.model.predict(test_inputs)
return predictions[0][0]