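# Overall Statistics
#
# | Metric                      | Value    |
# | --------------------------- | -------- |
# | Total Trades                | 523      |
# | Average Win                 | 0.04%    |
# | Average Loss                | -0.03%   |
# | Compounding Annual Return   | -17.442% |
# | Drawdown                    | 3.300%   |
# | Expectancy                  | -0.345   |
# | Net Profit                  | -3.017%  |
# | Sharpe Ratio                | -8.591   |
# | Probabilistic Sharpe Ratio  | 0.003%   |
# | Loss Rate                   | 69%      |
# | Win Rate                    | 31%      |
# | Profit-Loss Ratio           | 1.11     |
# | Alpha                       | -0.159   |
# | Beta                        | 0.008    |
# | Annual Standard Deviation   | 0.018    |
# | Annual Variance             | 0        |
# | Information Ratio           | -5.547   |
# | Tracking Error              | 0.129    |
# | Treynor Ratio               | -20.044  |
# | Total Fees                  | $0.81    |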
from collections import deque
from MyLSTM import MyLSTM
class MultidimensionalHorizontalFlange(QCAlgorithm):
    """Forex algorithm that trains one LSTM per currency pair and emits price insights.

    A rolling window of the most recent minute closes is kept for every
    subscribed pair. ``TrainMyModel`` fits a separate ``MyLSTM`` on each window
    and ``Predict`` turns each model's latest prediction into an up/down
    insight for that same pair.
    """

    # Number of closes kept per symbol; also the size of the warm-up history request.
    HISTORY_LENGTH = 1280
    # Number of most recent closes handed to a model when predicting.
    PREDICTION_WINDOW = 80

    def Initialize(self):
        """Configure the backtest, subscribe to the pairs and schedule training/prediction."""
        self.SetStartDate(2019, 1, 1)  # Set Start Date
        self.SetCash(10000)  # Set Strategy Cash
        self.SetBrokerageModel(AlphaStreamsBrokerageModel())

        # symbol -> rolling window of closes, and symbol -> trained MyLSTM.
        self.history = dict()
        self.models = dict()

        tickers = [
            "EURUSD", "GBPUSD", "USDJPY", "AUDUSD", "NZDUSD",
            "USDCAD", "USDCHF", "USDNOK", "USDSEK",
        ]
        for ticker in tickers:
            symbol = self.AddForex(ticker, Resolution.Minute, Market.Oanda).Symbol
            self.history[symbol] = deque(maxlen=self.HISTORY_LENGTH)

        # Fetch history and seed each rolling window with historical closes.
        self.symbols = list(self.history.keys())
        history = self.History(self.symbols, self.HISTORY_LENGTH).close.unstack(0)
        for symbol in self.symbols:
            # unstack() fills timestamps a symbol has no bar for with NaN;
            # drop those so they never reach the scaler or the network.
            self.history[symbol].extend(history[symbol].dropna())

        self.SetPortfolioConstruction(EqualWeightingPortfolioConstructionModel())
        self.SetExecution(ImmediateExecutionModel())

        # Use Train() method to avoid runtime error
        self.Train(self.TrainMyModel)
        self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(8, 0), self.TrainMyModel)

        # Schedule the daily prediction 5 minutes after the EURUSD market opens.
        self.Schedule.On(
            self.DateRules.EveryDay("EURUSD"),
            self.TimeRules.AfterMarketOpen("EURUSD", 5),
            self.Predict,
        )

    def OnData(self, slice):
        '''Update the history dict with the current data'''
        for symbol in slice.Keys:
            # Ignore anything we do not keep a window for instead of raising KeyError.
            if symbol in self.history:
                self.history[symbol].append(slice[symbol].Close)

    def TrainMyModel(self):
        """Fit a fresh ``MyLSTM`` for every symbol and store it under that symbol."""
        for symbol, prices in self.history.items():
            # The model is trained on a full window; skip symbols still filling up.
            if len(prices) < self.HISTORY_LENGTH:
                self.Debug(f"Skipping training for {symbol}: only {len(prices)} closes available")
                continue

            # Initialize LSTM class instance
            lstm = MyLSTM()
            # Prepare data
            features_set, labels, _training_data, _test_data = lstm.ProcessData(prices)
            # Build model layers
            lstm.CreateModel(features_set, labels)
            # Fit model
            lstm.FitModel(features_set, labels)
            # One model per symbol: storing a single shared attribute would keep
            # only the model of whichever symbol happened to be trained last.
            self.models[symbol] = lstm

    def Predict(self):
        """Emit one daily price insight per symbol from that symbol's own model."""
        insights = []
        for symbol, prices in self.history.items():
            # Fetch the LSTM trained for this symbol (absent until training has run).
            lstm = self.models.get(symbol)
            if lstm is None or len(prices) < self.PREDICTION_WINDOW:
                continue

            price = self.Securities[symbol].Price
            if price <= 0:
                # No usable price yet; avoid dividing by zero below.
                continue

            # Use the most recent closes, not the oldest ones in the window.
            data = list(prices)[-self.PREDICTION_WINDOW:]
            predictions = lstm.PredictFromModel(data)

            # Grab latest prediction and calculate if predict symbol to go up or down
            change = float(predictions[-1][0]) / price - 1
            direction = InsightDirection.Up if change > 0 else InsightDirection.Down
            insights.append(Insight.Price(symbol, timedelta(1), direction))

        if insights:
            self.EmitInsights(insights)
# def trail_stop(self):
# hist = self.History(self.symbols, 3, Resolution.Daily)
# for symbol in self.symbols:
# mean_price = (hist.loc[symbol.Value]['close']).mean()
# # Stop loss percentage is the return over the lookback period
# stoploss = abs(symbol.close * self.lookback / 252.0) + 1 # percent change per period
# if symbol.close > 0 and symbol.stopprice is not None:
# if symbol.stopprice is not None and symbol.stopprice < 0:
# symbol.stopprice = mean_price / stoploss
# else:
# symbol.stopprice = max(mean_price / stoploss, symbol.stopprice)
# if mean_price < symbol.stopprice:
# symbol.weight = 0
# self.Liquidate(symbol)
# elif symbol.weight < 0 and symbol.stopprice is not None:
# if symbol.stopprice is not None and symbol.stopprice < 0:
# symbol.stopprice = mean_price * stoploss
# else:
# symbol.stopprice = min(mean_price * stoploss, symbol.stopprice)
# if mean_price > symbol.stopprice:
# symbol.weight = 0
# self.Liquidate(symbol)
# else:
# symbol.stopprice = None
# # def PlotMe(self):
# # # Plot current price of symbols to match against prediction
# for key, symbol in self.macro_symbols.items():
# self.Plot('Prediction Plot', f'Actual {key}', self.Securities[symbol].Price)
import numpy as np
import pandas as pd
from keras.layers import LSTM
from keras.layers import Dense
from keras.layers import Dropout
from keras.models import Sequential
from sklearn.preprocessing import MinMaxScaler
class MyLSTM:
    """Stacked Keras LSTM regressor bundled with the MinMax scaler fitted for it."""

    def __init__(self):
        # Keras model; built by CreateModel().
        self.model = None
        # Scaler fitted in ProcessData() and reused in PredictFromModel().
        self.scaler = MinMaxScaler(feature_range=(0, 1))

    def ProcessData(self, queue, n=60, train_size=1260):
        """Split a price sequence and build sliding-window training samples.

        Args:
            queue: Iterable of prices, oldest first.
            n: Number of consecutive prices in each input window.
            train_size: Maximum number of leading prices used for training;
                everything after that is returned as test data.

        Returns:
            ``(features_set, labels, training_data, test_data)`` where
            ``features_set`` has shape ``(samples, n, 1)``, ``labels`` has shape
            ``(samples,)`` (the scaled price following each window) and the last
            two items are the unscaled training/test price lists.

        Raises:
            ValueError: If fewer than ``n + 1`` training prices are available.
        """
        # Convert the queue into a list
        data = list(queue)

        # Split the data
        training_data = data[:train_size]
        test_data = data[train_size:]
        if len(training_data) <= n:
            raise ValueError(
                f"need more than {n} training prices, got {len(training_data)}"
            )

        # Transform data (this also fits the scaler used later for prediction)
        training_data_array = np.array(training_data).reshape((len(training_data), 1))
        training_data_scaled = self.scaler.fit_transform(training_data_array)

        # Get features and labels. The upper bound follows the data actually
        # available, so a history shorter than train_size no longer overruns.
        features_set = np.array(
            [training_data_scaled[i - n:i, 0] for i in range(n, len(training_data))]
        )
        labels = np.array(training_data_scaled[n:, 0])
        features_set = np.reshape(
            features_set, (features_set.shape[0], features_set.shape[1], 1)
        )
        return features_set, labels, training_data, test_data

    def CreateModel(self, features_set, labels):
        """Build and compile the network; the input length is taken from ``features_set``.

        ``labels`` is accepted for interface symmetry and is not used here.
        """
        self.model = Sequential()
        self.model.add(LSTM(units=5, return_sequences=True, input_shape=(features_set.shape[1], 1)))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(units=5, return_sequences=True))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(units=5, return_sequences=True))
        self.model.add(Dropout(0.2))
        self.model.add(LSTM(units=5))
        self.model.add(Dropout(0.2))
        # NOTE(review): relu clamps the scaled output at 0, so prices below the
        # fitted minimum cannot be predicted - confirm this is intended.
        self.model.add(Dense(units=1, activation='relu'))
        self.model.compile(optimizer='adam', loss='mean_squared_error')

    def FitModel(self, features_set, labels):
        """Train the compiled model for 10 epochs with a batch size of 10."""
        self.model.fit(features_set, labels, epochs=10, batch_size=10)

    def PredictFromModel(self, test_data, n=60, window=80):
        """Predict prices from the trailing ``window`` entries of ``test_data``.

        Args:
            test_data: Sliceable sequence of unscaled prices, oldest first.
            n: Window length; must match the length the model was built with.
            window: Number of trailing prices to use; yields ``window - n``
                predictions.

        Returns:
            Array of shape ``(window - n, 1)`` with predictions in price units.

        Raises:
            ValueError: If ``window <= n`` or fewer than ``window`` prices are given.
        """
        if window <= n:
            raise ValueError(f"window ({window}) must be greater than n ({n})")
        if len(test_data) < window:
            raise ValueError(f"need at least {window} prices, got {len(test_data)}")

        test_inputs = np.array(test_data[-window:]).reshape(-1, 1)
        test_inputs = self.scaler.transform(test_inputs)

        # NOTE(review): the last window ends one step before the newest input,
        # so predictions[-1] estimates the newest supplied price rather than
        # the next unseen one - confirm this alignment is what callers expect.
        test_features = np.array(
            [test_inputs[i - n:i, 0] for i in range(n, window)]
        )
        test_features = np.reshape(
            test_features, (test_features.shape[0], test_features.shape[1], 1)
        )

        predictions = self.model.predict(test_features)
        predictions = self.scaler.inverse_transform(predictions)
        return predictions

# TODO: add an EMA to the history; emit a down insight when price is under the
# EMA and an up insight when price is above it.