| Overall Statistics |
|
Total Trades 102 Average Win 1.35% Average Loss -1.03% Compounding Annual Return 13.148% Drawdown 13.400% Expectancy 0.311 Net Profit 16.723% Sharpe Ratio 0.922 Probabilistic Sharpe Ratio 43.467% Loss Rate 43% Win Rate 57% Profit-Loss Ratio 1.31 Alpha 0.095 Beta -0.004 Annual Standard Deviation 0.103 Annual Variance 0.011 Information Ratio -0.309 Tracking Error 0.276 Treynor Ratio -22.752 Total Fees $78.89 Estimated Strategy Capacity $690000.00 |
import datetime, pytz
class VirtualBlackDinosaur(QCAlgorithm):
    """Long-only BTCUSD strategy driven by multi-timeframe RSI and EMA values.

    Entry requires, all at once: the 15-minute EMA(100) above the EMA(200),
    the 15-minute RSI(14) below 30, the 5-minute RSI(7) below 20, the weekly
    EMA(9) above the weekly EMA(20), and no open position ticket.

    Exit happens when the 15-minute RSI(2) exceeds 90, the 5-minute RSI(7)
    reaches 70, or -- once more than four hours have passed since the entry
    order -- the close drops below the lowest low recorded during those
    first four hours.
    """

    def Initialize(self):
        """Configure the back-test window, data, indicators and state."""
        self.SetStartDate(2020, 1, 1)  # Set Start Date
        self.SetEndDate(2021, 4, 1)
        self.SetCash(1000)  # Set Strategy Cash
        self.ticker = "BTCUSD"
        self.BTC = self.AddCrypto(self.ticker, Resolution.Minute).Symbol

        # Commission
        self.Securities[self.ticker].SetFeeModel(CustomFeeModel())

        # Indicators
        self.rsiFourteen = RelativeStrengthIndex(14)
        self.rsiSeven = RelativeStrengthIndex(7)
        self.rsiTwo = RelativeStrengthIndex(2)
        self.emaCinquanta = ExponentialMovingAverage(50)
        self.emaCento = ExponentialMovingAverage(100)
        self.emaDuecento = ExponentialMovingAverage(200)
        self.wEmaFast = ExponentialMovingAverage(9)
        self.wEmaSlow = ExponentialMovingAverage(20)

        # Consolidators (10080 minutes = 7 days).  `datetime` is the module
        # imported at the top of the file, so timedelta is always reached
        # through it.
        consolidatorWeek = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=10080), self.WeeklyBarHandler)
        consolidatorFifteen = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=15), self.FifteenMinuteBarHandler)
        consolidatorFive = self.Consolidate(
            self.BTC, datetime.timedelta(minutes=5), self.FiveMinuteBarHandler)

        # Register each indicator on the consolidator that feeds it
        self.RegisterIndicator(self.ticker, self.rsiFourteen, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.rsiTwo, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.emaCento, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.emaDuecento, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.emaCinquanta, consolidatorFifteen)
        self.RegisterIndicator(self.ticker, self.rsiSeven, consolidatorFive)
        self.RegisterIndicator(self.ticker, self.wEmaFast, consolidatorWeek)
        self.RegisterIndicator(self.ticker, self.wEmaSlow, consolidatorWeek)

        # Rolling windows holding the latest indicator values.
        # Note: rsiFourWindow stores the RSI(14) values.
        self.rsiSevenWindow = RollingWindow[float](2)
        self.rsiFourWindow = RollingWindow[float](2)
        self.rsiTwoWindow = RollingWindow[float](2)
        self.emaCinquantaWindow = RollingWindow[float](2)
        self.emaCentoWindow = RollingWindow[float](2)
        self.emaDuecentoWindow = RollingWindow[float](2)
        self.fastWindow = RollingWindow[float](2)
        self.slowWindow = RollingWindow[float](2)
        # Single-slot window: the lowest low seen since the current entry
        self.loWindow = RollingWindow[float](1)

        # Warm up
        self.SetWarmUp(46000)

        # State
        self.buyMarketTicket = None  # ticket of the open entry order, if any
        self.newLow = False          # True while a position's low is tracked
        self.now = None              # timezone-aware UTC time of last OnData
        self.BuyTime = None

    def _UtcNow(self):
        """Return the current algorithm time as a timezone-aware UTC datetime.

        The value is compared against ``OrderTicket.Time``, which is UTC, so
        it must come from ``UtcTime``.  ``Time`` is expressed in the
        algorithm time zone and no ``SetTimeZone`` call is made here, so
        tagging it as UTC would shift every comparison by the zone offset.
        """
        utc_now = self.UtcTime
        if utc_now.tzinfo is None:
            utc_now = pytz.utc.localize(utc_now)
        return utc_now

    def FifteenMinuteBarHandler(self, consolidated):
        """Store the current 15-minute indicator values in their windows."""
        self.rsiFourWindow.Add(self.rsiFourteen.Current.Value)
        self.rsiTwoWindow.Add(self.rsiTwo.Current.Value)
        self.emaCentoWindow.Add(self.emaCento.Current.Value)
        self.emaDuecentoWindow.Add(self.emaDuecento.Current.Value)
        self.emaCinquantaWindow.Add(self.emaCinquanta.Current.Value)

    def FiveMinuteBarHandler(self, consolidated):
        """Store the 5-minute RSI(7) and track new lows after an entry.

        During the first four hours after the entry order, any 5-minute low
        below the stored low replaces it and is logged.
        """
        self.rsiSevenWindow.Add(self.rsiSeven.Current.Value)
        if not self.newLow or self.buyMarketTicket is None:
            return
        entryTime = self.buyMarketTicket.Time
        insideWindow = entryTime < self.now < entryTime + datetime.timedelta(hours=4)
        if consolidated.Low < self.loWindow[0] and insideWindow:
            self.loWindow.Add(consolidated.Low)
            self.Log(consolidated.Low)
            self.Log(self.now)
            self.Log(self.buyMarketTicket.Time)

    def WeeklyBarHandler(self, consolidated):
        """Store the current weekly EMA values in their windows."""
        self.fastWindow.Add(self.wEmaFast.Current.Value)
        self.slowWindow.Add(self.wEmaSlow.Current.Value)

    def OnData(self, data):
        """Evaluate the entry and exit rules on every incoming slice."""
        requiredWindows = (
            self.rsiSevenWindow,
            self.rsiFourWindow,
            self.rsiTwoWindow,
            self.emaCentoWindow,
            self.emaDuecentoWindow,
            self.fastWindow,
            self.slowWindow,
        )
        if not all(window.IsReady for window in requiredWindows):
            return
        if self.IsWarmingUp:
            return

        self.now = self._UtcNow()
        # A slice is not guaranteed to carry a bar for the symbol; without
        # this guard the .Low/.Close reads below would fail on such slices.
        bar = data[self.BTC] if data.ContainsKey(self.BTC) else None

        wantsEntry = (
            self.emaCentoWindow[0] > self.emaDuecentoWindow[0]
            and self.rsiFourWindow[0] < 30
            and self.rsiSevenWindow[0] < 20
            and self.buyMarketTicket is None
            and self.fastWindow[0] > self.slowWindow[0]
        )
        # The entry needs the bar's low to seed the low tracker.
        if wantsEntry and bar is not None:
            dollarShare = self.CalculateOrderQuantity(self.BTC, 1)
            self.buyMarketTicket = self.MarketOrder(self.BTC, dollarShare)
            # Add new Low
            self.loWindow.Add(bar.Low)
            # Set Variable
            self.newLow = True

        # Kept as a separate check (not elif) so an exit can still fire in
        # the same call as the entry, as in the original flow.
        if self.buyMarketTicket is not None:
            stopDeadline = self.buyMarketTicket.Time + datetime.timedelta(hours=4)
            brokeLow = (
                self.loWindow.IsReady
                and bar is not None
                and self.now > stopDeadline
                and bar.Close < self.loWindow[0]
            )
            if self.rsiTwoWindow[0] > 90 or self.rsiSevenWindow[0] >= 70 or brokeLow:
                Held = self.Portfolio[self.BTC].Quantity
                self.MarketOrder(self.BTC, -Held)
                self.buyMarketTicket = None
                self.newLow = False
class CustomFeeModel:
    """Fee model that charges 0.075% of an order's notional value in USD."""

    def GetOrderFee(self, parameters):
        """Return the fee for the order described by ``parameters``.

        The fee is the security price times the absolute order quantity,
        multiplied by 0.075 and divided by 100, wrapped in an ``OrderFee``
        denominated in 'USD'.
        """
        notional = parameters.Security.Price * parameters.Order.AbsoluteQuantity
        fee = notional * 0.075 / 100
        return OrderFee(CashAmount(fee, 'USD'))


'''
Supervised classifier to handle IN/OUT predictions
'''
from scipy.signal import savgol_filter as sg
from sklearn.tree import DecisionTreeRegressor as dtr
from sklearn import preprocessing
from sklearn.decomposition import FastICA
from sklearn.model_selection import cross_val_score
import pandas as pd
import numpy as np
import scipy.stats as stats
from sklearn.model_selection import RandomizedSearchCV as opt
from sklearn.cluster import KMeans as kmean
from sklearn.ensemble import RandomForestClassifier as clf
class IOalphaModel:
    """Supervised IN/OUT model built on Savitzky-Golay derivative features.

    Every input series is turned into its smoothed first, second and third
    derivatives; together with the first derivative of the 'SPY' series they
    form the feature matrix.  A random-forest classifier tuned by randomized
    search is trained to predict the discretized 'SPY' derivative one step
    ahead.
    """

    def __init__(self, algo, signalPeriod=1, bounceThreshold=0.01, resolution=Resolution.Daily):
        """Create the estimators and reset the training state.

        Args:
            algo: Algorithm instance; its ``IsWarmingUp`` flag and ``Debug``
                method are used by ``TrainModel``.
            signalPeriod: Accepted for interface compatibility; not used.
            bounceThreshold: Accepted for interface compatibility; not used.
            resolution: Accepted for interface compatibility; not used.
        """
        # NOTE(review): self.ica and self.cluster are constructed but never
        # fitted or applied by any method of this class.
        self.ica = FastICA(
            n_components=None,
            algorithm='parallel',
            whiten=True,
            fun='logcosh',
            fun_args=None,
            max_iter=200,
            tol=0.0001,
            w_init=None,
            random_state=0
        )
        # Search space sampled by the randomized hyper-parameter search
        self.params = {
            'n_estimators': stats.randint(2, 300),
            'ccp_alpha': stats.expon(scale=0.01),
            'bootstrap': [True, False]
        }
        learner = clf()
        self.model = opt(
            learner,
            self.params,
            n_iter=10,
            scoring=None,
            n_jobs=None,
            refit=True,
            cv=None,
            verbose=0,
            pre_dispatch='2*n_jobs',
            random_state=1,
            return_train_score=False)
        self.cluster = kmean(
            n_clusters=8,
            init='k-means++',
            n_init=10,
            max_iter=300,
            tol=0.0001,
            verbose=0,
            random_state=1,
            copy_x=True,
            algorithm='auto'
        )
        self.algo = algo
        self.isTrained = False
        # Location/scale used by InverseZ; -1 until set by training/update
        self.mu = -1
        self.sigma = -1

    def _BuildFeatureMatrix(self, X_1, X_2, X_3, y_1):
        """Stack the derivative rows, discretize each, return (samples, features).

        Shared by TrainModel and Update so both feed the model identically
        prepared features.  Each row is replaced by its discretized version;
        the earlier in-place loops only rebound their loop variable, so the
        discretization never reached the matrix.
        """
        stacked = np.vstack([X_1, X_2, X_3, y_1])
        binned = np.array([self.Discretize(row) for row in stacked])
        return np.transpose(binned)

    def TrainModel(self, data, window_length=7, polyorder=5):
        """Fit the classifier on ``data`` and log a cross-validated score.

        Does nothing while the algorithm is warming up.

        Args:
            data: Mapping of series name to values; must contain 'SPY'.
                The mapping itself is not modified.
            window_length: Savitzky-Golay window length.
            polyorder: Savitzky-Golay polynomial order.

        Raises:
            KeyError: If ``data`` has no 'SPY' entry.
        """
        if self.algo.IsWarmingUp:
            return
        features = data.copy()
        # preprocess inputs
        target = features.pop('SPY')
        X_1, X_2, X_3, y_1 = self.Preprocess(features, target, window_length, polyorder)
        # Shift y left one timestep so y(n+1) is predicted from X(n); the
        # last sample of X is dropped below to keep the lengths equal.
        y_transformed = self.Discretize(y_1[1:])
        self.mu = np.mean(y_transformed)
        self.sigma = np.std(y_transformed)
        X_transformed = self._BuildFeatureMatrix(X_1, X_2, X_3, y_1)[:-1]
        self.model.fit(X_transformed, y_transformed)
        scores = cross_val_score(self.model, X_transformed, y_transformed, cv=3)
        # moment of truth
        self.algo.Debug(f"accuracy:{scores.mean()}, sigma:{scores.std()}")
        self.isTrained = True

    def Preprocess(self, X, y=None, window_length=7, polyorder=5):
        """Return Savitzky-Golay derivatives of the inputs.

        Args:
            X: Mapping of series name to values.
            y: Optional target series.  ``None`` is treated as an empty
                list (a ``None`` sentinel replaces a mutable default).
            window_length: Savitzky-Golay window length.
            polyorder: Savitzky-Golay polynomial order.

        Returns:
            ``(X_1, X_2, X_3, y_1)``: lists holding the first, second and
            third derivative of every series in ``X``, plus the first
            derivative of ``y``.  An empty ``y`` is returned unchanged.
        """
        if y is None:
            y = []

        def derivative(series, order):
            return sg(series, window_length, polyorder, deriv=order,
                      delta=1.0, axis=-1, mode='interp', cval=0.0)

        columns = list(X.values())
        X_1 = [derivative(column, 1) for column in columns]
        X_2 = [derivative(column, 2) for column in columns]
        X_3 = [derivative(column, 3) for column in columns]
        if len(y) > 0:
            return X_1, X_2, X_3, derivative(y, 1)
        return X_1, X_2, X_3, y

    def Update(self, X):
        """Train on first use, then predict from ``X``.

        Args:
            X: Mapping of series name to values; must contain 'SPY'.
                The mapping itself is not modified.

        Returns:
            List with one value per sample: the model's predictions passed
            through ``InverseZ``.  An empty list when the model could not be
            trained yet (``TrainModel`` skips training during warm-up).

        Raises:
            KeyError: If ``X`` has no 'SPY' entry.
        """
        Xcopy = X.copy()
        if not self.isTrained:
            self.TrainModel(Xcopy)
        if not self.isTrained:
            # Training was skipped; predicting with an unfitted model
            # would raise, so report "no predictions" instead.
            return []
        if 'SPY' not in Xcopy:
            raise KeyError("Update requires an 'SPY' series in X")
        y = Xcopy.pop("SPY")
        X1, X2, X3, y1 = self.Preprocess(Xcopy, y)
        # NOTE(review): TrainModel derives mu/sigma from the discretized
        # labels, while here they come from the continuous derivative, and
        # InverseZ is then applied to predicted class labels -- confirm
        # this is the intended scaling.
        self.mu = np.mean(y1)
        self.sigma = np.std(y1)
        X_transformed = self._BuildFeatureMatrix(X1, X2, X3, y1)
        return self.InverseZ(self.model.predict(X_transformed))

    def Discretize(self, x):
        """Standardize ``x`` and map each value to a bin index.

        Bin edges are the coefficients -3, -2, -1, -1/2, 0, 1/2, 1, 2, 3,
        each multiplied by the standard deviation of ``x``.  A series with
        zero spread is mapped entirely to the bin of a value at the mean,
        which avoids dividing by zero.

        Returns:
            Integer array of bin indices as produced by ``np.digitize``.
        """
        values = np.asarray(x, dtype=float)
        mu = np.mean(values)
        std = np.std(values)
        coefs = [-3, -2, -1, -1/2, 0, 1/2, 1, 2, 3]
        if std == 0:
            return np.digitize(np.zeros_like(values), coefs, right=False)
        z = (values - mu) / std
        # NOTE(review): z is already in units of std, yet the edges are
        # scaled by std again -- confirm whether the edges were meant to be
        # the raw coefficients.  Left unchanged so the label scale stays put.
        bins = [c * std for c in coefs]
        return np.digitize(z, bins, right=False)

    def InverseZ(self, Z_array):
        """Map each ``z`` in ``Z_array`` to ``self.sigma * z + self.mu``."""
        return [self.sigma * z + self.mu for z in Z_array]