Overall Statistics

| Metric | Value |
| --- | --- |
| Total Trades | 1249 |
| Average Win | 0.03% |
| Average Loss | -0.02% |
| Compounding Annual Return | 0.741% |
| Drawdown | 2.000% |
| Expectancy | 0.271 |
| Net Profit | 3.764% |
| Sharpe Ratio | 0.515 |
| Probabilistic Sharpe Ratio | 8.170% |
| Loss Rate | 44% |
| Win Rate | 56% |
| Profit-Loss Ratio | 1.26 |
| Alpha | 0.005 |
| Beta | 0.002 |
| Annual Standard Deviation | 0.01 |
| Annual Variance | 0 |
| Information Ratio | -0.637 |
| Tracking Error | 0.158 |
| Treynor Ratio | 2.103 |
| Total Fees | $2068.07 |
from Model import Model
import math
class OptimizedMultidimensionalShield(QCAlgorithm):
    '''
    Algorithm that feeds one price per day into a per-symbol Model and turns
    the model's next-price forecast into a weighted price insight.

    Models are created in OnSecuritiesChanged, updated and queried in
    OnClose, and (re)trained at the start of each month by TrainModels.
    '''

    def Initialize(self):
        '''
        Configure the backtest window, cash, portfolio construction, the
        BTCUSD subscription, the warm-up period and the two scheduled events.
        '''
        self.SetStartDate(2015, 9, 15)
        self.SetEndDate(2020, 9, 15)
        self.SetCash(100000)

        # The rebalancing function always returns None.
        # NOTE(review): presumably this disables time-based rebalancing so the
        # portfolio only changes on new insights - confirm against LEAN docs.
        self.SetPortfolioConstruction(
            InsightWeightingPortfolioConstructionModel(lambda time: None))

        # Seed every added security with its last known price.
        self.SetSecurityInitializer(
            lambda x: x.SetMarketPrice(self.GetLastKnownPrice(x)))

        symbol = self.AddCrypto('BTCUSD', Resolution.Minute).Symbol

        # Symbol -> Model, filled in by OnSecuritiesChanged.
        self.models = {}

        # 70 daily bars of warm-up; Model keeps a 70-observation window.
        self.SetWarmup(70, Resolution.Daily)

        self.Schedule.On(self.DateRules.EveryDay(symbol),
                         self.TimeRules.BeforeMarketClose(symbol),
                         self.OnClose)
        self.Schedule.On(self.DateRules.MonthStart(symbol),
                         self.TimeRules.Midnight,
                         self.TrainModels)

        # Not read anywhere in the visible code.
        self.curr_month = -1

    def OnSecuritiesChanged(self, changed):
        '''
        Create a fresh Model for every security added to the universe.
        '''
        for security in changed.AddedSecurities:
            self.models[security.Symbol] = Model()

    def OnClose(self):
        '''
        Scheduled handler: push the current price of every symbol into its
        model, then (outside warm-up) emit one insight per symbol whose model
        is ready and produces a usable forecast.
        '''
        for symbol, model in self.models.items():
            model.Update(self.Securities[symbol].Price)
            # Plots 1 once the model has an ARIMA order, 0 before that.
            self.Plot('Custom', 'Points', int(model.IsReady))

        if self.IsWarmingUp:
            return

        insights = []
        for symbol, model in self.models.items():
            if not model.IsReady:
                continue

            forecast = model.Forecast()
            if forecast is None or math.isnan(forecast):
                # Skip only this symbol. Returning here would discard the
                # insights already collected for other symbols and leave the
                # remaining symbols unprocessed.
                continue

            price = self.Securities[symbol].Price
            # Forecast expressed as a fractional move from the current price.
            weight = (forecast / price) - 1
            insights.append(self.InsightHelper(symbol, weight))

        if insights:
            self.EmitInsights(insights)

    def InsightHelper(self, symbol, percentage):
        '''
        Build a one-day price insight for `symbol` from a signed fractional
        move.

        Moves smaller than 0.001 in absolute value yield a Flat insight;
        otherwise the direction follows the sign of `percentage` and the
        insight weight is its absolute value.
        '''
        if abs(percentage) < 0.001:
            return Insight.Price(symbol, timedelta(1), InsightDirection.Flat)
        elif percentage > 0:
            return Insight.Price(symbol, timedelta(1), InsightDirection.Up,
                                 None, None, None, percentage)
        else:
            return Insight.Price(symbol, timedelta(1), InsightDirection.Down,
                                 None, None, None, abs(percentage))

    def TrainModels(self):
        '''
        Scheduled handler: ask every model to (re)train itself.
        '''
        self.Debug('Training Models')
        for model in self.models.values():
            model.Train()


from statsmodels.tsa.arima_model import ARIMA
from statsmodels.tsa.stattools import adfuller
from collections import deque
import numpy as np
import pandas as pd
from sklearn import metrics
class Model:
    '''
    Rolling-window ARIMA forecaster for a single series of positive values.

    Observations are collected with Update(). Train() grid-searches an ARIMA
    order on the log-differenced window, and Forecast() fits that order on
    the most recent observations and returns the next value on the original
    scale.
    '''

    def __init__(self):
        # (p, d, q) picked by the last grid search; None until one succeeds.
        self.arima_order = None
        # Rolling window holding the 70 most recent observations.
        self.data = deque(maxlen=70)

    def Update(self, data):
        '''
        Append one observation to the rolling window (the oldest one is
        dropped once the window is full).
        '''
        self.data.append(data)

    @property
    def Ready2Train(self):
        '''True once the rolling window is completely filled.'''
        return len(self.data) == self.data.maxlen

    @property
    def IsReady(self):
        '''True once an ARIMA order has been selected by Train().'''
        return self.arima_order is not None

    def __transform_data(self, X):
        '''
        One method of transforming data so it is stationary:
        first difference of the natural log of X.
        The result has one element fewer than X.
        '''
        return np.diff(np.log(np.asarray(X, dtype=float)))

    def __is_stationary(self, X, significance_level=.05):
        '''
        Return true if the stationarity of the time-series is significant
        (according to the given significance level), else false.
        Uses the p-value reported by adfuller.
        '''
        p_value = adfuller(X)[1]
        return p_value < significance_level

    def __evaluate_arima_model(self, X, arima_order, oos_size=.2):
        '''
        Evaluates an ARIMA model using MSE given an ARIMA order (p, d, q),
        returns MSE.

        oos_size - ratio of data used to test the model out-of-sample

        The model is refitted before each out-of-sample point on a window of
        fixed length: after every one-step forecast the true value is pushed
        in and the oldest value falls out.
        '''
        train_size = int(len(X) * (1 - oos_size))
        train_data, oos_data = X[0:train_size], X[train_size:]
        history = deque(train_data, maxlen=len(train_data))
        predictions = []
        for actual in oos_data:
            model_fit = ARIMA(np.array(history), order=arima_order).fit(disp=0)
            predictions.append(model_fit.forecast()[0])
            history.append(actual)
        return metrics.mean_squared_error(oos_data, predictions)

    def Train(self, p_values=range(6), d_values=(1,), q_values=range(6)):
        '''
        Grid searches the given p, d, and q values, then updates the
        model with the new params.

        Returns False (leaving the current order untouched) when the window
        is not full yet or the transformed data is not stationary. Otherwise
        stores the order with the lowest out-of-sample MSE and returns True.
        If no order could be evaluated, the stored order becomes None, so
        IsReady is False even though True is returned.
        '''
        if not self.Ready2Train:
            return False

        data = self.__transform_data(self.data)
        if not self.__is_stationary(data):
            return False

        best_score, best_pdq = float("inf"), None
        for p in p_values:
            for d in d_values:
                for q in q_values:
                    order = (p, d, q)
                    try:
                        mse = self.__evaluate_arima_model(data, order)
                    except Exception:
                        # Best-effort search: an order that cannot be fitted
                        # is simply not a candidate.
                        continue
                    if mse < best_score:
                        best_score, best_pdq = mse, order

        self.arima_order = best_pdq
        return True

    def Forecast(self):
        '''
        Forecast the next value of the series on its original scale.

        Fits the selected ARIMA order on the transformed last 50 observations
        and converts the one-step forecast back. Returns None when no order
        has been selected, when the transformed data is not stationary, or
        when fitting/forecasting fails.
        '''
        if not self.IsReady:
            return None

        og_data = np.array(self.data)[-50:]
        data = self.__transform_data(og_data)
        if not self.__is_stationary(data):
            return None

        model = ARIMA(data, self.arima_order)
        try:
            model_fit = model.fit(disp=0)
            forecast = model_fit.forecast()[0][0]
            return self.__undo_forecast_transform(og_data, forecast)
        except Exception:
            # A failed fit means "no forecast" rather than a crash.
            return None

    def __undo_forecast_transform(self, og_data, forecast):
        '''
        The forecasted value is not of the original data, but of
        the transformed data. This method undoes the transformation of the
        forecasted value.
        Source of computation: https://stackoverflow.com/questions/52590745/what-is-the-inverse-operation-of-np-log-and-np-diff
        '''
        # this method will not work using numpy arrays because np.diff
        # loses the 0th element, while pandas fills it with NaN
        series = pd.Series(og_data)
        transformed = np.log(series).diff()
        # Replace the leading NaN with log(first value) so the cumulative sum
        # rebuilds the log of every original observation.
        transformed.iat[0] = np.log(series.iat[0])
        # pd.concat instead of Series.append, which pandas 2.0 removed.
        transformed = pd.concat([transformed, pd.Series([forecast])])
        undone = np.exp(transformed.cumsum())
        return undone.iat[-1]