| Overall Statistics | |
| --- | --- |
| Total Trades | 81 |
| Average Win | 1.37% |
| Average Loss | -0.90% |
| Compounding Annual Return | -2.366% |
| Drawdown | 15.800% |
| Expectancy | 0.060 |
| Net Profit | -2.750% |
| Sharpe Ratio | -0.041 |
| Probabilistic Sharpe Ratio | 8.858% |
| Loss Rate | 58% |
| Win Rate | 42% |
| Profit-Loss Ratio | 1.52 |
| Alpha | 0.003 |
| Beta | 0.687 |
| Annual Standard Deviation | 0.157 |
| Annual Variance | 0.025 |
| Information Ratio | 0.06 |
| Tracking Error | 0.12 |
| Treynor Ratio | -0.009 |
| Total Fees | $134.91 |
| Estimated Strategy Capacity | $760000000.00 |
| Lowest Capacity Asset | SPY R735QTJ8XC9X |
# region imports
from AlgorithmImports import *
import numpy as np
import tensorflow.compat.v1 as tf
from google.protobuf import json_format
import json5
tf.disable_v2_behavior()
# endregion
class TensorflowExampleAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2021, 7, 1)
        self.SetCash(100000)
        self.symbol = self.AddEquity("SPY", Resolution.Daily).Symbol

        # Fill the rolling window with two years of daily closing prices.
        training_length = 252*2
        self.training_data = RollingWindow[float](training_length)
        history = self.History[TradeBar](self.symbol, training_length, Resolution.Daily)
        for trade_bar in history:
            self.training_data.Add(trade_bar.Close)
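
        # RollingWindow stores the newest value at index 0; get_features_and_labels
        # below reverses the list to put the closes back into chronological order.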
        # If the Object Store holds a saved model, restore it; otherwise build a new one.
        if self.ObjectStore.ContainsKey('graph') and self.ObjectStore.ContainsKey('weights'):
            json_graph = self.ObjectStore.Read('graph')
            json_weights = self.ObjectStore.Read('weights')

            # Restore the TensorFlow graph from the JSON object.
            tf.reset_default_graph()
            graph_definition = json_format.Parse(json_graph, tf.MetaGraphDef())
            self.model = tf.Session()
            tf.train.import_meta_graph(graph_definition)

            # Select the input and output tensors and the optimizer operations.
            self.X = tf.get_default_graph().get_tensor_by_name('X:0')
            self.Y = tf.get_default_graph().get_tensor_by_name('Y:0')
            self.output = tf.get_default_graph().get_tensor_by_name('outer:0')
            self.optimizer = tf.get_default_graph().get_collection('Variable/Adam')

            # Restore the model weights from the JSON object.
            weights = [np.asarray(x) for x in json5.loads(json_weights)]
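
            # Copy the values in through placeholders: feeding them at run time avoids
            # baking the weight arrays into the graph definition as constants.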
            assign_ops = []
            feed_dict = {}
            vs = tf.trainable_variables()
            zipped_values = zip(vs, weights)
            for var, value in zipped_values:
                value = np.asarray(value)
                assign_placeholder = tf.placeholder(var.dtype, shape=value.shape)
                assign_op = var.assign(assign_placeholder)
                assign_ops.append(assign_op)
                feed_dict[assign_placeholder] = value
            self.model.run(assign_ops, feed_dict=feed_dict)
        else:
            self.model, self.X, self.Y, self.output, self.optimizer = self.BuildModel()
            self.model.run(tf.global_variables_initializer())

        # Train the model once now, then every Sunday at 8:00 AM.
        self.Train(self.my_training_method)
        self.Train(self.DateRules.Every(DayOfWeek.Sunday), self.TimeRules.At(8, 0), self.my_training_method)
    def BuildModel(self):
        # Instantiate a TensorFlow session.
        sess = tf.Session()

        # Declare the number of factors and create placeholders for the input and output layers.
        num_factors = 5
        X = tf.placeholder(dtype=tf.float32, shape=[None, num_factors], name='X')
        Y = tf.placeholder(dtype=tf.float32, shape=[None], name='Y')

        # Set up the weight and bias initializers for each layer.
        weight_initializer = tf.variance_scaling_initializer(mode="fan_avg", distribution="uniform", scale=1)
        bias_initializer = tf.zeros_initializer()

        # Create hidden layers that use the ReLU activation function.
        num_neurons_1 = 32
        num_neurons_2 = 16
        num_neurons_3 = 8

        W_hidden_1 = tf.Variable(weight_initializer([num_factors, num_neurons_1]))
        bias_hidden_1 = tf.Variable(bias_initializer([num_neurons_1]))
        hidden_1 = tf.nn.relu(tf.add(tf.matmul(X, W_hidden_1), bias_hidden_1))

        W_hidden_2 = tf.Variable(weight_initializer([num_neurons_1, num_neurons_2]))
        bias_hidden_2 = tf.Variable(bias_initializer([num_neurons_2]))
        hidden_2 = tf.nn.relu(tf.add(tf.matmul(hidden_1, W_hidden_2), bias_hidden_2))

        W_hidden_3 = tf.Variable(weight_initializer([num_neurons_2, num_neurons_3]))
        bias_hidden_3 = tf.Variable(bias_initializer([num_neurons_3]))
        hidden_3 = tf.nn.relu(tf.add(tf.matmul(hidden_2, W_hidden_3), bias_hidden_3))

        # Create the output layer and name it so it is accessible after saving and loading the model.
        W_out = tf.Variable(weight_initializer([num_neurons_3, 1]))
        bias_out = tf.Variable(bias_initializer([1]))
        output = tf.transpose(tf.add(tf.matmul(hidden_3, W_out), bias_out), name='outer')

        # Set up the loss function and the optimizer for gradient descent and backpropagation.
        # This example uses mean squared error as the loss function because the close price is
        # continuous data, and Adam as the optimizer because of its adaptive step size.
        loss = tf.reduce_mean(tf.squared_difference(output, Y))
        optimizer = tf.train.AdamOptimizer().minimize(loss)

        return sess, X, Y, output, optimizer
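
    # Shape sketch (for a hypothetical batch of B samples): X is [B, 5], the hidden
    # layers map it to [B, 32] -> [B, 16] -> [B, 8], and the transposed output is
    # [1, B], which broadcasts against the [B] labels fed into Y in the loss.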
    def get_features_and_labels(self, n_steps=5):
        close_prices = list(self.training_data)[::-1]

        features = []
        labels = []
        for i in range(len(close_prices)-n_steps):
            features.append(close_prices[i:i+n_steps])
            labels.append(close_prices[i+n_steps])
        features = np.array(features)
        labels = np.array(labels)
        return features, labels
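
    # Windowing example: given closes [p0, p1, ..., p9] and n_steps=5, the first
    # sample has features [p0, p1, p2, p3, p4] and label p5, the next has
    # [p1, ..., p5] with label p6, and so on.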
    def my_training_method(self):
        # Run one optimization step over the full feature/label set from the rolling window.
        features, labels = self.get_features_and_labels()
        self.model.run(self.optimizer, feed_dict={self.X: features, self.Y: labels})
    def OnData(self, slice: Slice) -> None:
        if self.symbol in slice.Bars:
            self.training_data.Add(slice.Bars[self.symbol].Close)

            # Predict the next close from the most recent window of closes.
            new_features, __ = self.get_features_and_labels()
            prediction = self.model.run(self.output, feed_dict={self.X: new_features[-1].reshape(1, -1)})
            prediction = float(prediction.flatten()[-1])

            # Go fully long if the model predicts a higher close than the current price; otherwise go fully short.
            if prediction > slice[self.symbol].Price:
                self.SetHoldings(self.symbol, 1)
            else:
                self.SetHoldings(self.symbol, -1)
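
    # Persist the trained model when the algorithm stops: serialize the graph and
    # weights to JSON and save them in the Object Store so a later run can restore
    # them in Initialize instead of building and training a new model from scratch.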
    def OnEndOfAlgorithm(self):
        graph_definition = tf.train.export_meta_graph()
        json_graph = json_format.MessageToJson(graph_definition)

        weights = self.model.run(tf.trainable_variables())
        weights = [w.tolist() for w in weights]
        json_weights = json5.dumps(weights)

        self.ObjectStore.Save('graph', json_graph)
        self.ObjectStore.Save('weights', json_weights)
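
To inspect what the backtest saved, the same restore steps can run in the Research environment, which shares the Object Store with the algorithm. The following is a minimal sketch, assuming the backtest above has already run so that the `graph` and `weights` keys exist; it mirrors the restore path in `Initialize` and then queries the model once.

```python
from AlgorithmImports import *
import numpy as np
import tensorflow.compat.v1 as tf
from google.protobuf import json_format
import json5
tf.disable_v2_behavior()

qb = QuantBook()

# Read the serialized graph and weights saved by OnEndOfAlgorithm.
json_graph = qb.ObjectStore.Read('graph')
json_weights = qb.ObjectStore.Read('weights')

# Rebuild the graph and attach a session, as in Initialize above.
tf.reset_default_graph()
graph_definition = json_format.Parse(json_graph, tf.MetaGraphDef())
sess = tf.Session()
tf.train.import_meta_graph(graph_definition)
X = tf.get_default_graph().get_tensor_by_name('X:0')
output = tf.get_default_graph().get_tensor_by_name('outer:0')

# Load the saved weights back into the trainable variables via placeholders.
weights = [np.asarray(w) for w in json5.loads(json_weights)]
assign_ops, feed_dict = [], {}
for var, value in zip(tf.trainable_variables(), weights):
    placeholder = tf.placeholder(var.dtype, shape=value.shape)
    assign_ops.append(var.assign(placeholder))
    feed_dict[placeholder] = value
sess.run(assign_ops, feed_dict=feed_dict)

# Predict from the last five daily closes of SPY.
symbol = qb.AddEquity("SPY", Resolution.Daily).Symbol
history = qb.History(symbol, 5, Resolution.Daily)
closes = history['close'].values.reshape(1, -1)
print(sess.run(output, feed_dict={X: closes}))
```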