| Overall Statistics |
|
Total Trades 1311 Average Win 0.03% Average Loss -0.03% Compounding Annual Return 0.477% Drawdown 2.900% Expectancy 0.123 Net Profit 2.409% Sharpe Ratio 0.262 Probabilistic Sharpe Ratio 2.186% Loss Rate 40% Win Rate 60% Profit-Loss Ratio 0.88 Alpha 0.003 Beta 0.004 Annual Standard Deviation 0.013 Annual Variance 0 Information Ratio -0.654 Tracking Error 0.156 Treynor Ratio 0.92 Total Fees $1854.40 |
# QUANTCONNECT.COM - Democratizing Finance, Empowering Individuals.
# Lean Algorithmic Trading Engine v2.0. Copyright 2020 QuantConnect Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from Model import Model
import math
class OptimizedMultidimensionalShield(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 9, 15)
self.SetEndDate(2020, 9, 15)
self.SetCash(100000)
self.SetPortfolioConstruction(InsightWeightingPortfolioConstructionModel(lambda time: None))
self.SetSecurityInitializer(lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))
symbol = self.AddCrypto('BTCUSD', Resolution.Minute).Symbol
self.models = {}
self.SetWarmup(70, Resolution.Daily)
self.Schedule.On(self.DateRules.EveryDay(symbol), self.TimeRules.BeforeMarketClose(symbol), self.OnClose)
self.Schedule.On(self.DateRules.MonthStart(symbol), self.TimeRules.Midnight, self.TrainModels)
self.curr_month = -1
def OnSecuritiesChanged(self, changed):
for security in changed.AddedSecurities:
self.models[security.Symbol] = Model()
def OnClose(self):
for symbol, model in self.models.items():
model.Update(self.Securities[symbol].Price)
self.Plot('Custom', 'Points', int(model.IsReady))
if self.IsWarmingUp:
return
insights = []
for symbol, model in self.models.items():
if not model.IsReady:
continue
forecast = model.Forecast()
if forecast is None or math.isnan(forecast):
return
price = self.Securities[symbol].Price
weight = (forecast / price) - 1
insights.append(self.InsightHelper(symbol, weight))
if insights:
self.EmitInsights(insights)
def InsightHelper(self, symbol, percentage):
if abs(percentage) < 0.001:
return Insight.Price(symbol, timedelta(1), InsightDirection.Flat)
elif percentage > 0:
return Insight.Price(symbol, timedelta(1), InsightDirection.Up, None, None, None, percentage)
else:
return Insight.Price(symbol, timedelta(1), InsightDirection.Down, None, None, None, abs(percentage))
def TrainModels(self):
self.Debug('Training Models')
for model in self.models.values():
model.Train()from statsmodels.tsa.arima_model import ARIMA
from statsmodels.tsa.stattools import adfuller
from collections import deque
import numpy as np
import pandas as pd
from sklearn import metrics
class Model:
def __init__(self):
self.arima_order = None
self.data = deque(maxlen=70)
def Update(self, data):
'''
Updates our model with one point of data
'''
self.data.append(data)
@property
def Ready2Train(self):
return len(self.data) == self.data.maxlen
@property
def IsReady(self):
return self.arima_order != None
def __transform_data(self, X):
'''
A method of transforming non-stationary time-series into a stationary time-series
'''
X = np.diff(np.log(X))
return X
def __is_stationary(self, X, significance_level=.05):
'''
Return true if the stationarity of the time-series is significant
(according to the given significance level), else false
'''
result = adfuller(X)
p_value = result[1]
return p_value < significance_level
def __evaluate_arima_model(self, X, arima_order, oos_size=20):
'''
evaluates an ARIMA model using MSE given an ARIMA order (p, d, q), returns MSE
oos_size - ratio of data used to test the model out-of-sample
'''
train_data, oos_data = X[:-oos_size], X[-oos_size:]
history = deque([x for x in train_data], maxlen=len(train_data))
predictions = []
for i in range(len(oos_data)):
model = ARIMA(np.array(history), order=arima_order)
model_fit = model.fit(disp=0)
y_hat = model_fit.forecast()[0]
predictions.append(y_hat)
history.append(oos_data[i])
return metrics.mean_squared_error(oos_data, predictions)
def Train(self, p_values=range(6), d_values=[1], q_values=range(6)):
'''
grid searches the the given p, d, and q values, then updates the
model with the new params
'''
if not self.Ready2Train:
return False
data = self.__transform_data(self.data)
if not self.__is_stationary(data):
return False
best_score, best_pdq = float("inf"), None
for p in p_values:
for d in d_values:
for q in q_values:
order = (p,d,q)
try:
mse = self.__evaluate_arima_model(data, order)
if mse < best_score:
best_score, best_pdq = mse, order
except:
continue
self.arima_order = best_pdq
return True
def Forecast(self):
'''
Forecast one data point into the future
'''
if not self.IsReady:
return None
data = np.array(self.data)[-50:]
data = self.__transform_data(data)
# we want to avoid using an ARIMA model on a non-stationary time-series
if not self.__is_stationary(data):
return None
model = ARIMA(data, self.arima_order)
try:
model_fit = model.fit(disp=0)
forecast = model_fit.forecast()[0][0]
# we need to undo the log+differencing transform
# to get the actual price forecast
price_forecast = np.exp(forecast+np.log(self.data[-1]))
return price_forecast
except:
return None