Overall Statistics
Total Trades
4266
Average Win
0.24%
Average Loss
-0.14%
Compounding Annual Return
135.685%
Drawdown
34.400%
Expectancy
0.181
Net Profit
72.635%
Sharpe Ratio
2.372
Probabilistic Sharpe Ratio
72.445%
Loss Rate
56%
Win Rate
44%
Profit-Loss Ratio
1.66
Alpha
0.894
Beta
0.428
Annual Standard Deviation
0.416
Annual Variance
0.173
Information Ratio
1.847
Tracking Error
0.417
Treynor Ratio
2.301
Total Fees
$0.00
Estimated Strategy Capacity
$140000.00
Lowest Capacity Asset
LTCUSD XJ
import talib as ta
import numpy as np
import pandas as pd


# Moving-average families used by calculate_trends. Each entry is a
# (fast_function_name, slow_function_name) pair of TA-Lib function names;
# every family is compared against itself at two different periods.
trends = [(name, name) for name in ("SMA", "EMA", "DEMA", "TEMA")]
    
    
def calculate_trends(close, fast, slow):
    """Count, per bar, how many moving-average pairs have fast below slow.

    For each ``(fast_name, slow_name)`` entry of the module-level ``trends``
    list, the TA-Lib function ``fast_name`` is evaluated on ``close`` with
    period ``fast`` and ``slow_name`` with period ``slow``; the two results
    are compared element-wise with ``<``.

    Args:
        close: Closing prices passed straight to the TA-Lib functions.
        fast: Period of the fast average.
        slow: Period of the slow average.

    Returns:
        The element-wise sum of the boolean comparisons, one value per bar.
    """
    fast_below_slow = [
        getattr(ta, fast_name)(close, fast) < getattr(ta, slow_name)(close, slow)
        for fast_name, slow_name in trends
    ]
    # Stack as (n_families, n_bars), flip to (n_bars, n_families), sum per bar.
    return np.array(fast_below_slow).T.sum(axis=1)
# GA
import numpy as np
import matplotlib.pyplot as plt

def cal_pop_fitness(equation_inputs, pop):
    """Score each chromosome by its weight-normalised, return-weighted signal.

    NOTE: a second ``cal_pop_fitness`` is defined further down in this file
    and rebinds the name, so this version is not the one ``GA_train`` calls.

    Args:
        equation_inputs: 2-D array; column 0 holds the returns and
            columns 1..6 hold the inputs that the weights multiply.
        pop: 2-D array with one chromosome (weight vector) per row.

    Returns:
        1-D array with one fitness value per row of ``pop``.
    """
    returns = equation_inputs[:, 0]
    # Row i holds chromosome i's weighted signal for every observation.
    weighted_signal = pop @ equation_inputs[:, 1:7].T
    total_return = (weighted_signal * returns).sum(axis=1)
    total_weight = pop.sum(axis=1)
    return total_return / total_weight

def select_mating_pool(pop, fitness, num_parents):
    """Pick the ``num_parents`` fittest chromosomes, best first.

    Args:
        pop: 2-D array with one chromosome per row.
        fitness: 1-D sequence of fitness values, one per row of ``pop``.
            It is not modified.
        num_parents: Number of rows to select.

    Returns:
        Array of shape ``(num_parents, pop.shape[1])`` holding copies of the
        selected rows in descending fitness order. Ties resolve to the lowest
        row index. NaN fitness values are ranked below every real value.
    """
    # Work on a private float copy so the caller's fitness array is untouched.
    remaining = np.array(fitness, dtype=np.float64)
    # NaN never compares equal to a maximum; treat it as the worst score
    # instead of letting the selection fail.
    remaining[np.isnan(remaining)] = -np.inf

    parents = np.empty((num_parents, pop.shape[1]))
    for slot in range(num_parents):
        best = int(np.argmax(remaining))
        parents[slot, :] = pop[best, :]
        # Retire the winner so the next iteration finds the runner-up.
        remaining[best] = -np.inf
    return parents

def crossover(parents, offspring_size):
    """Create offspring by single-point crossover of consecutive parents.

    Offspring ``k`` takes the genes before the midpoint from parent
    ``k % n_parents`` and the genes from the midpoint on from parent
    ``(k + 1) % n_parents``.

    Args:
        parents: 2-D array with one parent chromosome per row.
        offspring_size: ``(n_offspring, n_genes)`` shape of the result.

    Returns:
        Array of shape ``offspring_size``.
    """
    n_offspring = offspring_size[0]
    n_parents = parents.shape[0]
    # Plain integer midpoint: a fixed-width uint8 would wrap around once the
    # chromosome has 512 or more genes.
    crossover_point = offspring_size[1] // 2

    offspring = np.empty(offspring_size)
    for k in range(n_offspring):
        first = k % n_parents
        second = (k + 1) % n_parents
        offspring[k, :crossover_point] = parents[first, :crossover_point]
        offspring[k, crossover_point:] = parents[second, crossover_point:]
    return offspring

def mutation(offspring_crossover, num_mutations=1):
    """Add uniform noise in [-1, 1) to evenly spaced genes of every row.

    With ``step = n_genes // num_mutations`` the mutated gene positions are
    ``step - 1, 2 * step - 1, ...`` (``num_mutations`` of them). When
    ``num_mutations`` exceeds the number of genes, every gene is mutated once.

    Args:
        offspring_crossover: 2-D array of chromosomes; modified in place.
        num_mutations: Number of genes to perturb in each chromosome.

    Returns:
        The same ``offspring_crossover`` array, after mutation.
    """
    n_genes = offspring_crossover.shape[1]
    # Python ints instead of np.uint8: the index must not wrap at 256 and
    # must not underflow when the step is zero.
    step = max(1, n_genes // num_mutations)
    gene_positions = range(step - 1, n_genes, step)[:num_mutations]

    for row in range(offspring_crossover.shape[0]):
        for gene in gene_positions:
            # Draw a scalar (not a length-1 array) so the element assignment
            # needs no array-to-scalar conversion.
            offspring_crossover[row, gene] += np.random.uniform(-1.0, 1.0)
    return offspring_crossover

def cal_pop_fitness(equation_inputs, pop, opt = 0):
    """Score each chromosome by mean/std of its returns over its own downside.

    Each chromosome's per-observation position is the dot product of its
    weights with the feature columns; multiplying by the return column gives
    its return series. The fitness is::

        mean(returns) / std(returns) / (-sum(negative returns))

    where every term is computed per chromosome. The downside sum used to be
    pooled over the whole population, which made it one shared scale factor
    that could not distinguish chromosomes.

    Args:
        equation_inputs: 2-D array; column 0 is the return, the remaining
            columns are the features the weights multiply.
        pop: 2-D array with one chromosome (weight vector) per row.
        opt: Accepted for compatibility with existing callers; currently
            unused (the original comment labels 0 as the "GAMSSR model").

    Returns:
        1-D array with one fitness value per row of ``pop``. A chromosome
        with zero return dispersion or no losing observation yields
        ``inf``/``nan`` following NumPy division semantics.
    """
    logr = equation_inputs[:, 0]
    positions = pop @ equation_inputs[:, 1:].T
    port_r = (positions * logr).astype(np.float64)

    mean_over_std = np.mean(port_r, axis=1) / np.std(port_r, axis=1)
    # Magnitude of the summed losses of each chromosome (row-wise, not pooled).
    downside = -np.sum(np.minimum(port_r, 0.0), axis=1)
    return mean_over_std / downside

def GA_train(training_df, optimizing_selection=0, sol_per_pop=16, num_parents_mating=8, num_generations = 300, plot=True):
    """Evolve a weight vector that maximises ``cal_pop_fitness``.

    Each generation keeps the ``num_parents_mating`` fittest chromosomes and
    refills the rest of the population with mutated crossover offspring.

    Args:
        training_df: DataFrame whose first column is the return and whose
            remaining columns are the features to be weighted.
        optimizing_selection: Forwarded to ``cal_pop_fitness`` as ``opt``.
        sol_per_pop: Population size (number of chromosomes).
        num_parents_mating: Mating pool size kept from each generation.
        num_generations: Number of generations to run.
        plot: When true (the default), plot the best fitness of every
            generation with matplotlib. Pass ``False`` to skip plotting.

    Returns:
        Tuple ``(best, fitness)``: ``best`` has shape ``(1, num_weights)``
        and holds the fittest chromosome of the final population;
        ``fitness`` holds the final population's fitness values.
    """
    equation_inputs = training_df.values
    # One weight per feature column; column 0 is the return, not a feature.
    num_weights = training_df.shape[1] - 1
    pop_size = (sol_per_pop, num_weights)

    new_population = np.random.uniform(low=-1.0, high=1.0, size=pop_size)
    best_outputs = []

    for _ in range(num_generations):
        fitness = cal_pop_fitness(equation_inputs, new_population, optimizing_selection)
        best_outputs.append(np.max(fitness))

        parents = select_mating_pool(new_population, fitness, num_parents_mating)
        offspring_crossover = crossover(
            parents,
            offspring_size=(pop_size[0] - parents.shape[0], num_weights),
        )
        offspring_mutation = mutation(offspring_crossover, num_mutations=2)

        # Elitism: parents occupy the first rows, offspring the rest.
        new_population[0:parents.shape[0], :] = parents
        new_population[parents.shape[0]:, :] = offspring_mutation

    fitness = cal_pop_fitness(equation_inputs, new_population, optimizing_selection)
    # Always pick exactly one winner: an equality search against the maximum
    # returns no row when fitness holds NaN and several rows on ties, both of
    # which break callers that expect a single weight vector.
    ranked = np.where(np.isnan(fitness), -np.inf, fitness)
    best_idx = int(np.argmax(ranked))

    if plot:
        plt.plot(best_outputs)
        plt.xlabel("Iteration")
        plt.ylabel('SSR ratio')
        plt.show()
    # Index with a list to keep the historical (1, num_weights) shape.
    return new_population[[best_idx]], fitness
import talib as ta
import numpy as np
import pandas as pd

# Names of the TA-Lib candlestick-pattern functions evaluated by get_signals.
# Every entry is right-padded with spaces to a fixed width of 19 characters
# (the length of the longest name), so consumers strip it before lookup.
patterns = [
    name.ljust(19)
    for name in (
        'CDL2CROWS',
        'CDL3BLACKCROWS',
        'CDL3INSIDE',
        'CDL3LINESTRIKE',
        'CDL3OUTSIDE',
        'CDL3STARSINSOUTH',
        'CDL3WHITESOLDIERS',
        'CDLABANDONEDBABY',
        'CDLADVANCEBLOCK',
        'CDLBELTHOLD',
        'CDLBREAKAWAY',
        'CDLCLOSINGMARUBOZU',
        'CDLCONCEALBABYSWALL',
        'CDLCOUNTERATTACK',
        'CDLDARKCLOUDCOVER',
        'CDLDOJI',
        'CDLDOJISTAR',
        'CDLDRAGONFLYDOJI',
        'CDLENGULFING',
        'CDLEVENINGDOJISTAR',
        'CDLEVENINGSTAR',
        'CDLGAPSIDESIDEWHITE',
        'CDLGRAVESTONEDOJI',
        'CDLHAMMER',
        'CDLHANGINGMAN',
        'CDLHARAMI',
        'CDLHARAMICROSS',
        'CDLHIGHWAVE',
        'CDLHIKKAKE',
        'CDLHIKKAKEMOD',
        'CDLHOMINGPIGEON',
        'CDLIDENTICAL3CROWS',
        'CDLINNECK',
        'CDLINVERTEDHAMMER',
        'CDLKICKING',
        'CDLKICKINGBYLENGTH',
        'CDLLADDERBOTTOM',
        'CDLLONGLEGGEDDOJI',
        'CDLLONGLINE',
        'CDLMARUBOZU',
        'CDLMATCHINGLOW',
        'CDLMATHOLD',
        'CDLMORNINGDOJISTAR',
        'CDLMORNINGSTAR',
        'CDLONNECK',
        'CDLPIERCING',
        'CDLRICKSHAWMAN',
        'CDLRISEFALL3METHODS',
        'CDLSEPARATINGLINES',
        'CDLSHOOTINGSTAR',
        'CDLSHORTLINE',
        'CDLSPINNINGTOP',
        'CDLSTALLEDPATTERN',
        'CDLSTICKSANDWICH',
        'CDLTAKURI',
        'CDLTASUKIGAP',
        'CDLTHRUSTING',
        'CDLTRISTAR',
        'CDLUNIQUE3RIVER',
        'CDLUPSIDEGAP2CROWS',
        'CDLXSIDEGAP3METHODS',
    )
]
    
    
def get_signals(df):
    """Sum the scaled output of every candlestick pattern for each bar.

    Every name in the module-level ``patterns`` list is stripped of its
    padding, looked up on ``talib`` and called with the open/high/low/close
    columns of ``df``. Each result is divided by 100 before summing.

    Args:
        df: Frame exposing ``open``, ``high``, ``low`` and ``close`` columns.

    Returns:
        One summed pattern score per bar.
    """
    opens, highs, lows, closes = df.open, df.high, df.low, df.close
    scaled = [
        getattr(ta, name.strip())(opens, highs, lows, closes) / 100
        for name in patterns
    ]
    # Stack as (n_patterns, n_bars), flip to (n_bars, n_patterns), sum per bar.
    return np.array(scaled).T.sum(axis=1)
import talib as ta
import numpy as np
import scipy.stats as sps
from patterns import *
from trends import *
from momentum import *
from reversion import *
from geneticAlgorithm import *

class decrypted(QCAlgorithm):
    """Hourly long-only crypto strategy driven by GA-weighted signals.

    For every pair in ``self.pairs`` four features are computed (an
    equilibrium z-score ``s``, a candlestick-pattern score, a trend score and
    a momentum score) and combined with a per-pair weight vector trained by
    ``GA_train``. Pairs with a positive combined score are held; all others
    are closed. A portfolio-level trailing cut triggers liquidation and
    retraining.
    """

    def Initialize(self):
        """Configure the backtest, load price history and train the weights."""
        self.SetStartDate(2021, 1, 1)
        self.SetCash(10000)
        self.pairs = ["BTCUSD", "LTCUSD", "ETHUSD"]
        self.data = {}
        # Fraction of the running equity peak below which we liquidate.
        self.cut_loss = .9
        self.n_crash = 0

        self.prev_pos = []

        self.risk = {
            'curr_max': self.Portfolio.TotalPortfolioValue,
            'curr_cut': self.Portfolio.TotalPortfolioValue * self.cut_loss
        }

        self.fast = 5
        self.slow = 20
        # The momentum indicators use the slow period.
        self.m_period = self.slow
        self.retrain = False

        for pair in self.pairs:
            symbol = self.AddCrypto(pair, Resolution.Hour).Symbol
            history = self.History(symbol, 144, Resolution.Hour)[['open','high','low','close','volume']]
            self.data[symbol.Value] = {
                'symbol': symbol,
                'df': history,
                'crashing': False,
            }
            self.Debug(f'Loaded data for {symbol.Value} with {str(history.shape)}')

        for symbol in self.Securities.Keys:
            self.calibrate(symbol)

        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.Every(timedelta(hours=6)), self.checkStandings)

    def OnData(self, data):
        """Append the newest bar of every pair to its frame, then trade.

        When a pair has no trade bar in this slice, its previous row is
        repeated so that all frames keep advancing together.
        """
        columns = ['open', 'high', 'low', 'close', 'volume']
        for pair in self.pairs:
            state = self.data[pair]
            frame = state['df']
            symbol = state['symbol']

            if data.ContainsKey(pair) and data.Bars.ContainsKey(symbol):
                bar = data.Bars[symbol]
                # Volume is recorded too; otherwise the volume-based momentum
                # indicator would keep seeing the last historical volume.
                row = [bar.Open, bar.High, bar.Low, bar.Close, bar.Volume]
            else:
                # Select exactly the five columns so the row always matches
                # the column list passed to the DataFrame constructor.
                row = frame.iloc[-1][columns].tolist()

            new_row = pd.DataFrame([row], columns=columns, index=[self.Time])
            # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
            state['df'] = pd.concat([frame, new_row]).ffill()

        # Do not trade while indicators are still warming up.
        if self.IsWarmingUp:
            return

        self.trade()

    def trade(self):
        """Hold every pair with a positive score and close the others."""
        if self.retrain:
            for symbol in self.Securities.Keys:
                self.calibrate(symbol)
            self.retrain = False

        positions = self.calculate_positions()
        to_buy = [pair for pair in self.pairs if positions[pair] > 0]

        # Liquidate the portfolio if every expected return is non-positive.
        if not to_buy:
            self.Liquidate()
            return

        for pair in self.pairs:
            if pair not in to_buy:
                self.SetHoldings(pair, 0)

        # Open a position in each candidate that is not held yet. This used
        # to be an accidental for/else that only ever bought the last pair.
        for pair in to_buy:
            if not self.Securities[pair].Invested:
                self.SetHoldings(pair, .33 * .98)

    def calculate_positions(self):
        """Return the GA-weighted score of every pair, keyed by pair name.

        A pair that has no trained weights (its calibration failed) gets a
        score of 0, which ``trade`` treats as "do not hold".
        """
        positions = {}
        for pair in self.pairs:
            state = self.data[pair]
            if 'ssr' not in state:
                positions[pair] = 0
                continue

            frame = state['df']
            curr_ret = np.log(frame.close.iloc[-1] / frame.close.iloc[-2])

            state['s'] = calculate_s(curr_ret, state['eq_ret'], state['eq_var'])
            state['signal'] = get_signals(frame)[-1]
            state['trend'] = calculate_trends(frame.close, self.fast, self.slow)[-1]
            state['momentum'] = calculate_momentum(frame, self.m_period)[-1]

            features = np.array([
                state['s'],
                state['signal'],
                state['trend'],
                state['momentum'],
            ])
            positions[pair] = features @ state['ssr'].T

        return positions

    def checkStandings(self):
        """Update the trailing equity cut; liquidate and retrain when hit.

        Returns:
            ``True`` after a new equity peak, ``False`` after the cut
            triggered, ``None`` otherwise.
        """
        curr_val = self.Portfolio.TotalPortfolioValue
        if curr_val > self.risk['curr_max']:
            self.risk['curr_max'] = curr_val
            self.risk['curr_cut'] = curr_val * self.cut_loss
            return True

        if curr_val < self.risk['curr_cut']:
            self.Log('recalibrating')
            self.Liquidate()
            self.risk['curr_max'] = curr_val
            self.risk['curr_cut'] = curr_val * self.cut_loss
            # trade() recalibrates on its next run; calibrating here as well
            # would train every model twice.
            self.retrain = True
            return False

    def calibrate(self, symbol):
        """Refit the equilibrium model and GA weights for ``symbol``.

        Uses the last 144 rows of the pair's frame. Results are stored under
        ``self.data[symbol.Value]``; on a preprocessing error the method
        reports it and leaves previously stored values untouched.
        """
        try:
            self.Log('calibrating coefficients')

            # Price history of the current symbol.
            df = self.data[symbol.Value]['df'].iloc[-144:]
            history = df['close']

            # Closing prices of the peer assets, one column per peer.
            peers = [self.data[x]['df']['close'].values[-144:] for x in self.pairs if x != symbol.Value]
            peers = pd.DataFrame(peers, columns=range(len(peers[0]))).T
            peers = peers.ffill()
        except Exception as err:
            self.Debug('error processing data in calibration')
            self.Log(f'calibration preprocessing failed for {symbol.Value}: {err}')
            return

        state = self.data[symbol.Value]
        self.Log('calculating signals for' + " " + symbol.Value)
        self.calculate_arb(symbol, history, peers)
        state['pattern_sig'] = get_signals(df)
        state['trend_sig'] = calculate_trends(df.close, self.fast, self.slow)
        state['momentum_sig'] = calculate_momentum(df, self.m_period)

        features = [
            df.close,
            state['s'],
            state['pattern_sig'],
            state['trend_sig'],
            state['momentum_sig'],
        ]
        train = self.build_features_df(features)

        state['ssr'], _ = GA_train(train.dropna())
        self.Debug(f'GA completed for {symbol.Value}')

    def build_features_df(self, features):
        """Assemble the GA training frame.

        Args:
            features: ``[close, s, patterns, trends, momentums]``.

        Returns:
            Frame with columns ``ret`` (the next bar's log return) followed
            by ``s``, ``patterns``, ``trends`` and ``momentums``.
        """
        features_list = ['s', 'patterns', 'trends', 'momentums']
        close = features[0]
        train = pd.DataFrame(close, index=close.index)
        # Shift by -1 so each row's features are paired with the next return.
        train['ret'] = np.log(close / close.shift(1)).shift(-1)

        for offset, name in enumerate(features_list, start=1):
            train[name] = features[offset][-train.shape[0]:]

        # Drop the leading price column; only 'ret' and the features remain.
        return train.iloc[:, 1:]

    def calculate_arb(self, symbol, history, peers):
        """Fit the equilibrium model of ``symbol`` against its peers.

        Stores ``eq_ret``, ``eq_var``, ``k`` and the standardised return
        series ``s`` under ``self.data[symbol.Value]``.
        """
        y = np.log(history / history.shift(1)).dropna()
        x = np.log(peers / peers.shift(1)).dropna()
        # build_model's second return value is named eq_std at its definition;
        # it is stored under the historical 'eq_var' key.
        eq_ret, eq_var, k = build_model(y.values, x.values)

        state = self.data[symbol.Value]
        state['eq_ret'] = eq_ret
        state['eq_var'] = eq_var
        state['k'] = k
        state['s'] = (y - eq_ret) / eq_var

        self.Log(f'model creation completed for: {symbol.Value}')
        self.Log(f'eq_ret: {eq_ret}, eq_var: {eq_var}, k: {k}')

        return
            
import numpy as np
import pandas as pd
from statsmodels.regression.linear_model import OLS

def create_OLS(ret, factors):
    """Fit ``OLS(ret, factors)`` and expose its key results.

    Args:
        ret: Dependent variable passed as the first argument to ``OLS``.
        factors: Regressors passed as the second argument to ``OLS``.

    Returns:
        Tuple ``(results, params, resid)`` taken from the fitted model.
    """
    fitted = OLS(ret, factors).fit()
    coefficients, residuals = fitted.params, fitted.resid
    return fitted, coefficients, residuals

try:
    # Current statsmodels location of the ARIMA model.
    from statsmodels.tsa.arima.model import ARIMA
except ImportError:
    # Older statsmodels releases only ship the legacy module.
    from statsmodels.tsa.arima_model import ARIMA


def create_ARIMA(factors, order):
    """Fit an ARIMA model of the given order to ``factors``.

    Args:
        factors: Series the model is fitted to.
        order: ``(p, d, q)`` order passed to ``ARIMA``.

    Returns:
        Tuple ``(results, arparams)`` where ``arparams`` holds the fitted
        autoregressive coefficients.
    """
    model = ARIMA(factors, order = order).fit()
    return model, model.arparams

def build_model(y, x):
    """Estimate equilibrium return, dispersion and AR(1) coefficient size.

    ``y`` is regressed on ``x`` with ``create_OLS``; the residuals are
    reflected around their mean and fitted with an ARIMA(1, 0, 0) model via
    ``create_ARIMA``.

    Args:
        y: Return series of the asset.
        x: Return series of the explanatory assets.

    Returns:
        Tuple ``(eq_ret, eq_std, k)``: the mean of ``y``, the square root of
        ``var(y) / (2 * k)``, and ``k``, the absolute value of the first
        autoregressive coefficient.
    """
    _, _, ols_resid = create_OLS(y, x)
    mirrored_resid = np.mean(ols_resid) - ols_resid
    _, ar_coeffs = create_ARIMA(mirrored_resid, (1, 0, 0))
    k = abs(ar_coeffs[0])

    eq_ret = np.mean(y)
    eq_std = np.sqrt(np.var(y) / (2 * k))
    return eq_ret, eq_std, k

def calculate_s(curr_ret, eq_ret, eq_var):
    """Return the deviation of ``curr_ret`` from ``eq_ret`` scaled by ``eq_var``."""
    deviation = curr_ret - eq_ret
    return deviation / eq_var
import numpy as np
import pandas as pd
import talib as ta

def calculate_momentum(df, p):
    """Combine BOP with thresholded MFI and RSI into one score per bar.

    The TA-Lib ``BOP`` value enters the sum unchanged. ``MFI`` and ``RSI``
    (both with period ``p``) are each mapped to ``1`` below 30, ``-1`` above
    70 and ``0`` otherwise before being added.

    Args:
        df: Frame exposing ``open``, ``high``, ``low``, ``close`` and
            ``volume`` columns.
        p: Look-back period for ``MFI`` and ``RSI``.

    Returns:
        One summed momentum score per bar.
    """
    def _band(level):
        # Low readings score +1, high readings -1, everything else 0.
        if level < 30:
            return 1
        if level > 70:
            return -1
        return 0

    opens, highs, lows, closes, volumes = df.open, df.high, df.low, df.close, df.volume
    balance = ta.BOP(opens, highs, lows, closes)
    money_flow = ta.MFI(highs, lows, closes, volumes, p).apply(_band)
    strength = ta.RSI(closes, p).apply(_band)

    stacked = np.array([balance, money_flow, strength])
    return stacked.T.sum(axis=1)