| Overall Statistics | |
|---|---|
| Total Trades | 9 |
| Average Win | 0.42% |
| Average Loss | -1.71% |
| Compounding Annual Return | -89.585% |
| Drawdown | 8.400% |
| Expectancy | -0.688 |
| Net Profit | -3.650% |
| Sharpe Ratio | -2.834 |
| Probabilistic Sharpe Ratio | 6.694% |
| Loss Rate | 75% |
| Win Rate | 25% |
| Profit-Loss Ratio | 0.25 |
| Alpha | -1.319 |
| Beta | 1.015 |
| Annual Standard Deviation | 0.316 |
| Annual Variance | 0.1 |
| Information Ratio | -4.195 |
| Tracking Error | 0.313 |
| Treynor Ratio | -0.882 |
| Total Fees | $0.00 |
| Estimated Strategy Capacity | $430000.00 |
| Lowest Capacity Asset | BTCUSD E3 |
import clr
clr.AddReference("System")
clr.AddReference("QuantConnect.Algorithm")
clr.AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
import tensorflow as tf
tf.config.threading.set_inter_op_parallelism_threads(2)
tf.config.threading.set_intra_op_parallelism_threads(1)
import json
import numpy as np
import pandas as pd
from io import StringIO
from keras.models import Sequential
from keras.layers import Dense, Activation,LSTM,Dropout,Embedding,Flatten
from keras.optimizers import SGD, Adam
from keras.utils.generic_utils import serialize_keras_object
from sklearn.preprocessing import MinMaxScaler
class SimpleEnergyTroll(QCAlgorithm):
    """Trade Bitfinex BTCUSD from the next-open forecast of a per-symbol LSTM.

    One stacked-LSTM regressor is kept per subscribed symbol. It is fitted on
    sliding windows of minute OHLC bars and used to forecast the open of the
    bar that follows the most recent window. Positions are opened or closed
    when the current open leaves a one-standard-deviation band around that
    forecast.

    Attributes set in ``Initialize``:
        contracts: Empty dict; not read anywhere in this class.
        modelBySymbol: Symbol -> compiled Keras model, or ``None`` until the
            first successful training run.
        scalersBySymbol: Symbol -> ``(x_scaler, y_scaler)`` pair fitted during
            the most recent training run.
        lookback: Minimum number of bars a history request must return.
        timesteps: Number of consecutive bars in one network input window.
    """

    # Bar fields fed to the network, in column order. Column 0 (open) is also
    # the regression target.
    _FEATURES = ("open", "close", "high", "low")

    def Initialize(self):
        """Set the backtest window and cash, subscribe to data, schedule jobs."""
        self.SetStartDate(2021, 10, 20)
        self.SetEndDate(2021, 10, 25)
        self.SetCash(3000)

        self.contracts = {}
        self.modelBySymbol = {}
        self.scalersBySymbol = {}

        # Each history request asks for lookback + timesteps minute bars and is
        # rejected when fewer than `lookback` bars come back.
        self.lookback = 60 * 2
        # Number of consecutive bars that make up one input sample.
        self.timesteps = 60

        for ticker in ["BTCUSD"]:
            symbol = self.AddCrypto(ticker, Resolution.Minute, Market.Bitfinex).Symbol
            # Models always start untrained; restoring a previously saved
            # config from the ObjectStore is not wired up.
            self.modelBySymbol[symbol] = None

        # Retrain every day at 02:00.
        self.Train(
            self.DateRules.EveryDay(),
            self.TimeRules.At(2, 0),
            self.NeuralNetworkTraining)

        # Evaluate the trading rule every 60 minutes, Monday through Friday.
        self.Train(
            self.DateRules.Every(
                DayOfWeek.Monday, DayOfWeek.Tuesday, DayOfWeek.Wednesday,
                DayOfWeek.Thursday, DayOfWeek.Friday),
            self.TimeRules.Every(TimeSpan.FromMinutes(60)),
            Action(self.Trade))

    def OnEndOfAlgorithm(self):
        """Save the serialized form of every trained model in the ObjectStore."""
        for symbol, model in self.modelBySymbol.items():
            if model is None:
                # Training never succeeded for this symbol; do not store a
                # JSON "null" under its key.
                continue
            # NOTE(review): this looks like it persists the model's config
            # only, not its trained weights -- confirm before relying on it.
            modelStr = json.dumps(serialize_keras_object(model))
            self.ObjectStore.Save(f'{symbol}_model', modelStr)
            self.Debug(f'Model for {symbol} sucessfully saved in the ObjectStore')

    def _fetch_ohlc(self, symbol, purpose):
        """Return recent minute bars for ``symbol`` as a 2-D array, or ``None``.

        Columns follow ``_FEATURES``. ``purpose`` ("training" or "testing") is
        only used in the debug message emitted when the data is unusable.
        ``None`` is returned when the request raises, when a required column
        is missing, or when too few bars are returned.
        """
        # Local import: the module-level `from System import *` may rebind the
        # name `Exception`, so reference the builtin explicitly.
        import builtins

        try:
            history = self.History(
                symbol, self.lookback + self.timesteps, Resolution.Minute)
        except builtins.Exception:
            self.Debug("Failed to receive history")
            return None

        # At least one bar beyond a full window is needed to form a sample.
        min_bars = max(self.lookback, self.timesteps + 1)
        has_columns = all(field in history for field in self._FEATURES)
        if not has_columns or len(history) < min_bars:
            self.Debug("Error while collecting the {0} data".format(purpose))
            return None
        return np.column_stack([history[field] for field in self._FEATURES])

    @staticmethod
    def _make_windows(bars, timesteps):
        """Stack each run of ``timesteps`` rows of ``bars`` that has a following row."""
        return np.array(
            [bars[end - timesteps:end] for end in range(timesteps, len(bars))])

    @staticmethod
    def _build_model(timesteps, n_features, cells=5):
        """Create and compile the four-layer stacked LSTM regressor."""
        model = Sequential()
        model.add(LSTM(units=cells, return_sequences=True,
                       input_shape=(timesteps, n_features)))
        model.add(Dropout(0.2))
        model.add(LSTM(units=cells, return_sequences=True))
        model.add(Dropout(0.2))
        model.add(LSTM(units=cells, return_sequences=True))
        model.add(Dropout(0.2))
        model.add(LSTM(units=cells, return_sequences=False))
        model.add(Dropout(0.2))
        model.add(Dense(1, activation='linear'))
        adam = Adam(lr=0.001, clipnorm=1.0)
        # NOTE(review): 'accuracy' is kept because the training log reports it,
        # but it is not a meaningful metric for a regression target.
        model.compile(loss='mean_squared_error', optimizer=adam,
                      metrics=['accuracy'])
        return model

    def NeuralNetworkTraining(self):
        """Fit (or refresh) the model of every tracked symbol on recent bars.

        A new model is trained for 50 epochs; an existing one gets a single
        additional epoch. The scalers fitted on the current data are stored
        alongside the model for use by ``Trade``.
        """
        symbols = list(self.modelBySymbol.keys())
        if not symbols:
            self.Debug("no contracts found")
            return

        for symbol in symbols:
            bars = self._fetch_ohlc(symbol, "training")
            if bars is None:
                continue

            # Sample k is bars[k : k + timesteps]; its target is the open of
            # the bar immediately after that window.
            x_train = self._make_windows(bars, self.timesteps)
            y_train = bars[self.timesteps:, :1]

            if np.any(np.isnan(x_train)):
                self.Debug("Error in Training Data")
                continue
            if np.any(np.isnan(y_train)):
                self.Debug("Error in Validation Data")
                continue

            n_features = x_train.shape[-1]
            x_scaler = MinMaxScaler(feature_range=(0, 1))
            y_scaler = MinMaxScaler(feature_range=(0, 1))
            # The scaler works on 2-D input: flatten the windows, scale each
            # feature column, then restore the 3-D shape.
            x_train = x_scaler.fit_transform(
                x_train.reshape(-1, n_features)).reshape(x_train.shape)
            y_train = y_scaler.fit_transform(y_train)

            model = self.modelBySymbol.get(symbol)
            if model is None:
                model = self._build_model(self.timesteps, n_features)
                epochs = 50
            else:
                epochs = 1

            hist = model.fit(x_train, y_train, epochs=epochs)
            acc = hist.history['accuracy'][-1]

            self.scalersBySymbol[symbol] = (x_scaler, y_scaler)
            self.modelBySymbol[symbol] = model
            self.Debug("End Training for symbol {0} with accuracy {1}".format(symbol, acc))

    def Trade(self):
        """Forecast the next open for each trained symbol and act on it.

        With ``std`` the standard deviation of all fetched OHLC values:
        an invested position is liquidated when the current open is below
        ``prediction - std``; a flat symbol is bought to an equal-weight
        target when the current open is above ``prediction + std``.
        """
        target = 1 / len(self.Securities)

        for symbol, model in self.modelBySymbol.items():
            if model is None:
                continue
            scalers = self.scalersBySymbol.get(symbol)
            if scalers is None:
                # No scalers stored for this symbol, so the forecast could
                # not be mapped back to a price.
                continue
            x_scaler, y_scaler = scalers

            bars = self._fetch_ohlc(symbol, "testing")
            if bars is None:
                continue
            if np.any(np.isnan(bars)):
                self.Debug("Error in Testing Data")
                continue

            # Use only the newest `timesteps` bars: that window yields the
            # forecast for the bar that follows the fetched history, instead
            # of a forecast produced from the oldest window.
            window = bars[-self.timesteps:]
            x_test = x_scaler.transform(window)[np.newaxis, ...]
            predictions = model.predict(x_test)
            prediction = y_scaler.inverse_transform(predictions)[-1][0]

            historyStd = np.std(bars)
            self.Debug("Prediction for symbol {0}: {1}".format(symbol, prediction))

            holding = self.Portfolio[symbol]
            openPrice = self.Securities[symbol].Open
            self.Debug("Open Price for symbol {0}: {1}".format(symbol, openPrice))

            # Follow the trend.
            if holding.Invested:
                if openPrice < prediction - historyStd:
                    self.Liquidate(symbol)
            else:
                self.Debug("if {0} > {1}".format(openPrice, prediction + historyStd))
                if openPrice > prediction + historyStd:
                    self.SetHoldings(symbol, target)