| Overall Statistics |
|
Total Trades 1267 Average Win 0.08% Average Loss -0.04% Compounding Annual Return 7.764% Drawdown 6.500% Expectancy 0.308 Net Profit 7.126% Sharpe Ratio 0.772 Probabilistic Sharpe Ratio 40.699% Loss Rate 59% Win Rate 41% Profit-Loss Ratio 2.17 Alpha 0.078 Beta -0.058 Annual Standard Deviation 0.084 Annual Variance 0.007 Information Ratio -1.175 Tracking Error 0.146 Treynor Ratio -1.112 Total Fees $1426.42 |
from MyLSTM import MyLSTM
class MultidimensionalHorizontalFlange(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2019, 1, 4) # Set Start Date
self.SetCash(100000) # Set Strategy Cash
self.SetBrokerageModel(AlphaStreamsBrokerageModel())
self.SetExecution(ImmediateExecutionModel())
self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
self.UniverseSettings.Resolution = Resolution.Minute
self.SetUniverseSelection(LiquidETFUniverse())
# Helper dictionaries
self.macro_symbols = {'Bull' : Symbol.Create('SPY', SecurityType.Equity, Market.USA)}
self.models = {'Bull': None, 'Bear': None}
# Use Train() method to avoid runtime error
self.Train(self.TrainMyModel)
self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8,0), self.TrainMyModel)
# Schedule prediction and plotting
self.AddEquity('SPY')
self.Schedule.On(self.DateRules.EveryDay('SPY'), self.TimeRules.AfterMarketOpen('SPY', 5), self.Predict)
self.Schedule.On(self.DateRules.EveryDay('SPY'), self.TimeRules.AfterMarketOpen('SPY', 6), self.PlotMe)
# Create custom charts
prediction = Chart('Prediction Plot')
prediction.AddSeries(Series('Actual Bull', SeriesType.Line, 0))
prediction.AddSeries(Series('Predicted Bull', SeriesType.Line, 0))
prediction.AddSeries(Series('Actual Bear', SeriesType.Line, 1))
prediction.AddSeries(Series('Predicted Bear', SeriesType.Line, 1))
def TrainMyModel(self):
qb = self
# Fetch history
history = qb.History([symbol for key, symbol in self.macro_symbols.items()], 1280, Resolution.Daily)
# Iterate over macro symbols
for key, symbol in self.macro_symbols.items():
# Initialize LSTM class instance
lstm = MyLSTM()
# Prepare data
features_set, labels, training_data, test_data = lstm.ProcessData(history.loc[symbol].close)
# Build model layers
lstm.CreateModel(features_set, labels)
# Fit model
lstm.FitModel(features_set, labels)
# Add LSTM class to dictionary to store later
self.models[key] = lstm
def Predict(self):
delta = {}
qb = self
for key, symbol in self.macro_symbols.items():
# Fetch LSTM class
lstm = self.models[key]
# Fetch history
history = qb.History([symbol for key, symbol in self.macro_symbols.items()], 80, Resolution.Daily)
# Predict
predictions = lstm.PredictFromModel(history.loc[symbol].close)
# Grab latest prediction and calculate if predict symbol to go up or down
delta[key] = ( predictions[-1] / self.Securities[symbol].Price ) - 1
# Plot prediction
self.Plot('Prediction Plot', f'Predicted {key}', predictions[-1])
insights = []
# Iterate over macro symbols
for key, change in delta.items():
if key == 'Bull':
insights += [Insight.Price(symbol, timedelta(1), InsightDirection.Up if change > 0 else InsightDirection.Flat) for symbol in LiquidETFUniverse.SP500Sectors.Long if self.Securities.ContainsKey(symbol)]
insights += [Insight.Price(symbol, timedelta(1), InsightDirection.Up if change > 0 else InsightDirection.Flat) for symbol in LiquidETFUniverse.Treasuries.Inverse if self.Securities.ContainsKey(symbol)]
insights += [Insight.Price(symbol, timedelta(1), InsightDirection.Flat if change > 0 else InsightDirection.Up) for symbol in LiquidETFUniverse.Treasuries.Long if self.Securities.ContainsKey(symbol)]
self.EmitInsights(insights)
def PlotMe(self):
# Plot current price of symbols to match against prediction
for key, symbol in self.macro_symbols.items():
self.Plot('Prediction Plot', f'Actual {key}', self.Securities[symbol].Price)import numpy as np
import pandas as pd
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout
from keras.models import Sequential
from sklearn.preprocessing import MinMaxScaler
class MyLSTM:
def __init__(self):
self.model = None
self.scaler = MinMaxScaler(feature_range = (0, 1))
def ProcessData(self, data, n = 60):
# Split the data
training_data = data[:1260]
test_data = data[1260:]
# Transform data
training_data_array = np.array(training_data).reshape((len(training_data), 1))
training_data_scaled = self.scaler.fit_transform(training_data_array)
# Get features and labels
features_set = []
labels = []
for i in range(60, 1260):
features_set.append(training_data_scaled[i-60:i, 0])
labels.append(training_data_scaled[i, 0])
features_set, labels = np.array(features_set), np.array(labels)
features_set = np.reshape(features_set, (features_set.shape[0], features_set.shape[1], 1))
return features_set, labels, training_data, test_data
def CreateModel(self, features_set, labels):
# Create Model
self.model = Sequential()
self.model.add(LSTM(units = 50, return_sequences=True, input_shape=(features_set.shape[1], 1)))
self.model.add(Dropout(0.2))
self.model.add(LSTM(units=50, return_sequences=True))
self.model.add(Dropout(0.2))
self.model.add(LSTM(units=50, return_sequences=True))
self.model.add(Dropout(0.2))
self.model.add(LSTM(units=50))
self.model.add(Dropout(0.2))
self.model.add(Dense(units = 1))
self.model.compile(optimizer = 'adam', loss = 'mean_squared_error')
def FitModel(self, features_set, labels):
self.model.fit(features_set, labels, epochs = 50, batch_size = 32)
def PredictFromModel(self, test_data):
test_inputs = test_data[-80:].values
test_inputs = test_inputs.reshape(-1,1)
test_inputs = self.scaler.transform(test_inputs)
test_features = []
for i in range(60, 80):
test_features.append(test_inputs[i-60:i, 0])
test_features = np.array(test_features)
test_features = np.reshape(test_features, (test_features.shape[0], test_features.shape[1], 1))
predictions = self.model.predict(test_features)
predictions = self.scaler.inverse_transform(predictions)
return predictions