| Overall Statistics |
|
Total Trades 4266 Average Win 0.24% Average Loss -0.14% Compounding Annual Return 135.685% Drawdown 34.400% Expectancy 0.181 Net Profit 72.635% Sharpe Ratio 2.372 Probabilistic Sharpe Ratio 72.445% Loss Rate 56% Win Rate 44% Profit-Loss Ratio 1.66 Alpha 0.894 Beta 0.428 Annual Standard Deviation 0.416 Annual Variance 0.173 Information Ratio 1.847 Tracking Error 0.417 Treynor Ratio 2.301 Total Fees $0.00 Estimated Strategy Capacity $140000.00 Lowest Capacity Asset LTCUSD XJ |
import talib as ta
import numpy as np
import pandas as pd
trends = [
("SMA" , "SMA"),
("EMA" , "EMA"),
("DEMA" , "DEMA"),
("TEMA" , "TEMA"),
]
def calculate_trends(close, fast, slow):
signals = []
for trend in trends:
signals.append(getattr(ta, trend[0])(close,fast) < getattr(ta, trend[1])(close,slow))
signals = np.array(signals).T.sum(axis=1)
return signals# GA
import numpy as np
import matplotlib.pyplot as plt
def cal_pop_fitness(equation_inputs, pop):
# Calculating the fitness value of each solution in the current population.
# The fitness function calulates the sum of products between each input and its corresponding weight.
# fitness = np.sum(pop*equation_inputs, axis=1)
# pop size (8,6)
# fitness size 8x1
logr = equation_inputs[:,0]
temp = pop@equation_inputs[:,1:7].T
fitness = np.sum(temp*logr, axis = 1)/np.sum(pop, axis = 1)
return fitness
def select_mating_pool(pop, fitness, num_parents):
# Selecting the best individuals in the current generation as parents for producing the offspring of the next generation.
parents = np.empty((num_parents, pop.shape[1]))
for parent_num in range(num_parents):
max_fitness_idx = np.where(fitness == np.max(fitness))
max_fitness_idx = max_fitness_idx[0][0]
parents[parent_num, :] = pop[max_fitness_idx, :]
fitness[max_fitness_idx] = -99999999999
return parents
def crossover(parents, offspring_size):
offspring = np.empty(offspring_size)
# The point at which crossover takes place between two parents. Usually, it is at the center.
crossover_point = np.uint8(offspring_size[1]/2)
for k in range(offspring_size[0]):
# Index of the first parent to mate.
parent1_idx = k%parents.shape[0]
# Index of the second parent to mate.
parent2_idx = (k+1)%parents.shape[0]
# The new offspring will have its first half of its genes taken from the first parent.
offspring[k, 0:crossover_point] = parents[parent1_idx, 0:crossover_point]
# The new offspring will have its second half of its genes taken from the second parent.
offspring[k, crossover_point:] = parents[parent2_idx, crossover_point:]
return offspring
def mutation(offspring_crossover, num_mutations=1):
mutations_counter = np.uint8(offspring_crossover.shape[1] / num_mutations)
# Mutation changes a number of genes as defined by the num_mutations argument. The changes are random.
for idx in range(offspring_crossover.shape[0]):
gene_idx = mutations_counter - 1
for _ in range(num_mutations):
# The random value to be added to the gene.
random_value = np.random.uniform(-1.0, 1.0, 1)
offspring_crossover[idx, gene_idx] = offspring_crossover[idx, gene_idx] + random_value
gene_idx = gene_idx + mutations_counter
return offspring_crossover
def cal_pop_fitness(equation_inputs, pop, opt = 0):
# Calculating the fitness value of each solution in the current population.
# Opt = 0 is the GAMSSR model
logr = equation_inputs[:,0] # n,
positions = pop@equation_inputs[:,1:].T
port_r = (positions*logr).astype(np.float64)
SSR = np.mean(port_r, axis = 1)/np.std(port_r, axis = 1)/(-np.sum(port_r[port_r<0]))
#SSR = np.mean(port_r, axis = 1)/np.std(port_r[port_r<0], axis = 1)#/(-np.sum(port_r[port_r<0]))
return SSR
def GA_train(training_df, optimizing_selection=0, sol_per_pop=16, num_parents_mating=8, num_generations = 300):
"""
Genetic algorithm parameters:
Mating pool size
Population size
"""
#Inputs of the equation.
equation_inputs = training_df.values
# Number of the weights we are looking to optimize.
num_weights = training_df.shape[1]-1
# Defining the population size.
pop_size = (sol_per_pop,num_weights)
# The population will have sol_per_pop chromosome
# where each chromosome has num_weights genes.
# Creating the initial population.
new_population = np.random.uniform(low=-1.0, high=1.0, size=pop_size)
# print(new_population)
best_outputs = []
for generation in range(num_generations):
# print("Generation : ", generation)
# Measuring the fitness of each chromosome in the population.
fitness = cal_pop_fitness(equation_inputs, new_population, optimizing_selection)
best_outputs.append(np.max(fitness))
# Selecting the best parents in the population for mating.
parents = select_mating_pool(new_population, fitness,
num_parents_mating)
# Generating next generation using crossover.
offspring_crossover = crossover(parents,
offspring_size=(pop_size[0]-parents.shape[0], num_weights))
# Adding some variations to the offspring using mutation.
offspring_mutation = mutation(offspring_crossover, num_mutations=2)
# Creating the new population based on the parents and offspring.
new_population[0:parents.shape[0], :] = parents
new_population[parents.shape[0]:, :] = offspring_mutation
# Getting the best solution after iterating finishing all generations.
# At first, the fitness is calculated for each solution in the final generation.
fitness = cal_pop_fitness(equation_inputs, new_population, optimizing_selection)
# Then return the index of that solution corresponding to the best fitness.
best_match_idx = np.where(fitness == np.max(fitness))
plt.plot(best_outputs)
plt.xlabel("Iteration")
plt.ylabel('SSR ratio')
plt.show()
return new_population[best_match_idx], fitnessimport talib as ta
import numpy as np
import pandas as pd
patterns = [
'CDL2CROWS ' ,
'CDL3BLACKCROWS ' ,
'CDL3INSIDE ' ,
'CDL3LINESTRIKE ' ,
'CDL3OUTSIDE ' ,
'CDL3STARSINSOUTH ' ,
'CDL3WHITESOLDIERS ' ,
'CDLABANDONEDBABY ' ,
'CDLADVANCEBLOCK ' ,
'CDLBELTHOLD ' ,
'CDLBREAKAWAY ' ,
'CDLCLOSINGMARUBOZU ' ,
'CDLCONCEALBABYSWALL' ,
'CDLCOUNTERATTACK ' ,
'CDLDARKCLOUDCOVER ' ,
'CDLDOJI ' ,
'CDLDOJISTAR ' ,
'CDLDRAGONFLYDOJI ' ,
'CDLENGULFING ' ,
'CDLEVENINGDOJISTAR ' ,
'CDLEVENINGSTAR ' ,
'CDLGAPSIDESIDEWHITE' ,
'CDLGRAVESTONEDOJI ' ,
'CDLHAMMER ' ,
'CDLHANGINGMAN ' ,
'CDLHARAMI ' ,
'CDLHARAMICROSS ' ,
'CDLHIGHWAVE ' ,
'CDLHIKKAKE ' ,
'CDLHIKKAKEMOD ' ,
'CDLHOMINGPIGEON ' ,
'CDLIDENTICAL3CROWS ' ,
'CDLINNECK ' ,
'CDLINVERTEDHAMMER ' ,
'CDLKICKING ' ,
'CDLKICKINGBYLENGTH ' ,
'CDLLADDERBOTTOM ' ,
'CDLLONGLEGGEDDOJI ' ,
'CDLLONGLINE ' ,
'CDLMARUBOZU ' ,
'CDLMATCHINGLOW ' ,
'CDLMATHOLD ' ,
'CDLMORNINGDOJISTAR ' ,
'CDLMORNINGSTAR ' ,
'CDLONNECK ' ,
'CDLPIERCING ' ,
'CDLRICKSHAWMAN ' ,
'CDLRISEFALL3METHODS' ,
'CDLSEPARATINGLINES ' ,
'CDLSHOOTINGSTAR ' ,
'CDLSHORTLINE ' ,
'CDLSPINNINGTOP ' ,
'CDLSTALLEDPATTERN ' ,
'CDLSTICKSANDWICH ' ,
'CDLTAKURI ' ,
'CDLTASUKIGAP ' ,
'CDLTHRUSTING ' ,
'CDLTRISTAR ' ,
'CDLUNIQUE3RIVER ' ,
'CDLUPSIDEGAP2CROWS ' ,
'CDLXSIDEGAP3METHODS'
]
def get_signals(df):
o,h,l,c = df.open,df.high,df.low,df.close
pattern_signals = []
for pattern in patterns:
signals = getattr(ta, pattern.strip())(o,h,l,c)
pattern_signals.append(signals/100)
pattern_signals = np.array(pattern_signals).T.sum(axis=1)
return pattern_signalsimport talib as ta
import numpy as np
import scipy.stats as sps
from patterns import *
from trends import *
from momentum import *
from reversion import *
from geneticAlgorithm import *
class decrypted(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 1, 1) # Set Start Date
#self.SetEndDate(2021,1,5)
#self.SetEndDate(2021, 1, 1)
self.SetCash(10000) # Set Strategy Cash
#self.SetWarmup(timedelta(days=30))
self.pairs = ["BTCUSD", "LTCUSD", "ETHUSD"]
self.data = {}
self.cut_loss = .9
self.n_crash = 0
self.prev_pos = []
#self.leverage = 10
self.risk = {
'curr_max': self.Portfolio.TotalPortfolioValue,
'curr_cut': self.Portfolio.TotalPortfolioValue * self.cut_loss
}
fib = [1,2,3,5,8,13,21,34,55,89,144]
#fast = self.GetParameter("fast")
#slow = self.GetParameter("slow")
self.fast = 5 #if fast is None else fast
self.slow = 20 #if slow is None else slow
self.m_period = self.slow #round((self.slow+self.fast)/2)
self.retrain = False
for pair in self.pairs:
symbol = self.AddCrypto(pair, Resolution.Hour).Symbol
self.data[symbol.Value] = {}
self.data[symbol.Value]['symbol'] = symbol
self.data[symbol.Value]['df'] = self.History(symbol, 144, Resolution.Hour)[['open','high','low','close','volume']]
self.data[symbol.Value]['crashing'] = False
self.Debug(f'Loaded data for {symbol.Value} with {str(self.data[symbol.Value]["df"].shape)}')
[self.calibrate(x) for x in self.Securities.Keys]
self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(hours=6)), self.checkStandings)
#self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0,0), self.trade)
#self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(minutes=180)), self.trade)
#self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.At(0,0), self.trade)
def OnData(self, data):
'''update data'''
for pair in self.pairs:
if data.ContainsKey(pair):
bar = data.Bars[self.data[pair]['symbol']]
ohlc = [bar.Open, bar.High, bar.Low, bar.Close]
else:
ohlc = self.data[pair]['df'].iloc[-1].to_numpy()
self.data[pair]['df'] = self.data[pair]['df'].append(pd.DataFrame([ohlc], columns = ['open','high','low','close'], index = [self.Time]))
self.data[pair]['df'].fillna(method='ffill',inplace=True)
'''check if indicators is warming up '''
if self.IsWarmingUp:
return
self.trade()
def trade(self):
if self.retrain == True:
[self.calibrate(x) for x in self.Securities.Keys]
self.retrain = False
positions = self.calculate_positions()
to_buy = []
for pair in self.pairs:
if positions[pair] > 0:
to_buy.append(pair)
''' liquidate portfolio if all exp. return is negative '''
if len(to_buy) == 0:
self.Liquidate()
return
to_sell = [x for x in self.pairs if x not in to_buy]
[self.SetHoldings(x, 0) for x in to_sell]
for pair in to_buy:
if self.Securities[pair].Invested:
continue
else:
self.SetHoldings(pair, .33 * .98)
def calculate_positions(self):
positions = {}
''' generate positions for each pair '''
for pair in self.pairs:
curr_ret = np.log(self.data[pair]['df'].close.iloc[-1]/self.data[pair]['df'].close.iloc[-2])
self.data[pair]['s'] = calculate_s(curr_ret, self.data[pair]['eq_ret'], self.data[pair]['eq_var'])
self.data[pair]['signal'] = get_signals(self.data[pair]['df'])[-1]
self.data[pair]['trend'] = calculate_trends(self.data[pair]['df'].close, self.fast, self.slow)[-1]
self.data[pair]['momentum'] = calculate_momentum(self.data[pair]['df'], self.m_period)[-1]
pos = np.array([
self.data[pair]['s'],
self.data[pair]['signal'],
self.data[pair]['trend'],
self.data[pair]['momentum']
])
pos = pos @ self.data[pair]['ssr'].T
positions[pair] = pos
return positions
def checkStandings(self):
curr_val = self.Portfolio.TotalPortfolioValue
if curr_val > self.risk['curr_max']:
self.risk['curr_max'] = curr_val
self.risk['curr_cut'] = curr_val * self.cut_loss
return True
if curr_val < self.risk['curr_cut']:
self.Log('recalibrating')
self.Liquidate()
self.risk['curr_max'] = curr_val
self.risk['curr_cut'] = curr_val * self.cut_loss
self.retrain = True
[self.calibrate(x) for x in self.Securities.Keys]
return False
def calibrate(self, symbol):
'''preprocess data'''
try:
self.Log('calibrating coefficients')
''' get historical for current symbol '''
df = self.data[symbol.Value]['df'].iloc[-144:]
history = df['close']
'''get historical for peer assets '''
peers = ([self.data[x]['df']['close'].values[-144:] for x in self.pairs if x != symbol.Value])
peers = pd.DataFrame(peers,columns = range(len(peers[0]))).T
peers.fillna(method = 'ffill', inplace=True)
except:
self.Debug('error processing data in calibration')
return
# try:
self.Log('calculating signals for' +" "+ symbol.Value )
self.calculate_arb(symbol, history, peers)
self.data[symbol.Value]['pattern_sig'] = get_signals(df)
self.data[symbol.Value]['trend_sig'] = calculate_trends(df.close, self.fast, self.slow)
self.data[symbol.Value]['momentum_sig'] = calculate_momentum(df, self.m_period)
features = [
df.close,
self.data[symbol.Value]['s'],
self.data[symbol.Value]['pattern_sig'],
self.data[symbol.Value]['trend_sig'],
self.data[symbol.Value]['momentum_sig']
]
train = self.build_features_df(features)
# except:
# self.Debug(f'error calculating signals for {symbol.Value}')
# return
self.data[symbol.Value]['ssr'], fitness = GA_train(train.dropna())
self.Debug(f'GA completed for {symbol.Value}')
def build_features_df(self, features):
features_list = ['s','patterns','trends','momentums']
train = pd.DataFrame(features[0], index = features[0].index)
train['ret'] = np.log(features[0]/features[0].shift(1)).shift(-1)
for idx in range(len(features_list)):
train[features_list[idx]] = features[idx+1][-train.shape[0]:]
train = train.iloc[:,1:]
return train
def calculate_arb(self, symbol, history, peers):
y = np.log(history/history.shift(1)).dropna()
x = np.log(peers/peers.shift(1)).dropna()
eq_ret, eq_var, k = build_model(y.values,x.values)
self.data[symbol.Value]['eq_ret'] = eq_ret
self.data[symbol.Value]['eq_var'] = eq_var
self.data[symbol.Value]['k'] = k
self.data[symbol.Value]['s'] = (y - eq_ret) / eq_var
self.Log(f'model creation completed for: {symbol.Value}')
self.Log(f'eq_ret: {eq_ret}, eq_var: {eq_var}, k: {k}')
return
import numpy as np
import pandas as pd
from statsmodels.regression.linear_model import OLS
def create_OLS(ret, factors):
model = OLS(ret, factors).fit()
return model, model.params, model.resid
from statsmodels.tsa.arima_model import ARIMA
def create_ARIMA(factors, order):
model = ARIMA(factors, order = order).fit()
return model, model.arparams
def build_model(y, x):
ols_model, ols_params, ols_resid = create_OLS(y,x)
residuals = np.mean(ols_resid) - ols_resid
ou_model, ou_params = create_ARIMA(residuals, (1,0,0))
direction = 1 if ou_params[0] > 0 else -1
k = abs(ou_params[0])
eq_ret = np.mean(y)
eq_var = np.var(y)/(2*k)
eq_std = np.sqrt(eq_var)
return eq_ret, eq_std, k
def calculate_s(curr_ret, eq_ret, eq_var):
return (curr_ret - eq_ret)/eq_varimport numpy as np
import pandas as pd
import talib as ta
def calculate_momentum(df, p):
o,h,l,c,v = [df.open,df.high,df.low,df.close,df.volume]
bop = ta.BOP(o,h,l,c)
mfi = ta.MFI(h,l,c,v,p).apply(lambda x: 1 if x < 30 else -1 if x > 70 else 0)
rsi = ta.RSI(c, p).apply(lambda x: 1 if x < 30 else -1 if x > 70 else 0)
signals = np.array([bop, mfi, rsi]).T.sum(axis=1)
return signals