| Overall Statistics |
|
Total Trades 1080 Average Win 0.37% Average Loss -0.33% Compounding Annual Return 9.693% Drawdown 6.100% Expectancy 0.117 Net Profit 22.005% Sharpe Ratio 1.149 Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.12 Alpha 0.15 Beta -2.68 Annual Standard Deviation 0.084 Annual Variance 0.007 Information Ratio 0.91 Tracking Error 0.084 Treynor Ratio -0.036 Total Fees $2525.73 |
# Derek M Tishler - 2018 - dmtishler@gmail.com
# DEAP Genetic Programming Example for Symbolic Regression Classification on Quant Connect
#DEAP Source: https://github.com/DEAP/deap
#DEAP Docs: https://deap.readthedocs.io/en/master/
from clr import AddReference
AddReference("System")
AddReference("QuantConnect.Algorithm")
AddReference("QuantConnect.Common")
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Brokerages import BrokerageName
import random
from scipy import stats
import numpy as np
from scipy import stats
from scipy import stats as sstats
import pandas as pd
import operator
import math
import time
from evo import *
# using random math on random inputs can lead to many warnings(ex try in protected div, undefined math, etc). This cleans the logs for reading evo table.
# Remove when adjusting/testing pset ops
import warnings
warnings.filterwarnings('ignore')
class BasicTemplateAlgorithm(QCAlgorithm):
def Initialize(self):
self.evo_time = 0.
self.evo = Evolution(self)
self.SetStartDate(2016,1,1) #Set Start Date
#self.SetEndDate(2018,1,1) #Set End Date
self.SetCash(100000) #Set Strategy Cash
self.symbol = "SPY"
self.evo.symbol = self.symbol
self.granularity = Resolution.Daily
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.symbol_Symbol = self.AddEquity(self.symbol, Resolution.Minute, extendedMarketHours=False).Symbol
sPlot = Chart('Strategy Equity')
sPlot.AddSeries(Series('Signal', SeriesType.Scatter, 2))
self.AddChart(sPlot)
fit1Plot = Chart('Strategy Evolution')
fit1Plot.AddSeries(Series('Mean_Loss', SeriesType.Line, 0))
fit1Plot.AddSeries(Series('Max_Loss', SeriesType.Line, 1))
fit1Plot.AddSeries(Series('Consistency', SeriesType.Line, 2))
fit1Plot.AddSeries(Series('Size', SeriesType.Line, 3))
fit1Plot.AddSeries(Series('Size_Max', SeriesType.Line, 3))
fit1Plot.AddSeries(Series('Size_Min', SeriesType.Line, 3))
fit1Plot.AddSeries(Series('Size_Mean', SeriesType.Line, 3))
fit1Plot.AddSeries(Series('Label Dist', SeriesType.Line, 4))
fit1Plot.AddSeries(Series('N_Long', SeriesType.Line, 4))
fit1Plot.AddSeries(Series('N_Short', SeriesType.Line, 4))
self.AddChart(fit1Plot)
sPlot2 = Chart('Strategy Info')
sPlot2.AddSeries(Series('Leverage', SeriesType.Line, 0))
sPlot2.AddSeries(Series('RAM', SeriesType.Line, 1))
sPlot2.AddSeries(Series('Evo Time', SeriesType.Line, 2)) #Label
self.AddChart(sPlot2)
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.AfterMarketOpen(self.symbol, 2),
Action(self.Evolve))
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.AfterMarketOpen(self.symbol, 30),
Action(self.Checkpoint))
self.Schedule.On(self.DateRules.EveryDay(self.symbol),
self.TimeRules.BeforeMarketClose(self.symbol, 2),
Action(self.Liquidate))
# in case you want to add a relative stop, needs to uncomment in OnData
self.max_loss_frac = 0.03
self.asset_best_price = {}
# trigger large history download one time
self.do_once = True
# weight used for SetHoldings
self.signal = 0.0
def Evolve(self):
# update data in smaller batches for speed
self.evo.current_price = float(self.Securities[self.symbol].Price)
if not self.do_once:
new_hist = self.History([self.symbol], 1, self.granularity, extendedMarketHours=False).astype(np.float32)
self.evo.hist_data = self.evo.hist_data.append(new_hist).iloc[1:] #append and pop stack
# large download, one time only
else:
self.evo.hist_data = self.History([self.symbol], self.evo.warmup_count, self.granularity, extendedMarketHours=False).astype(np.float32)
self.do_once = False
# perform evolution and get trading signal
self.signal = self.evo.OnEvolve()
# handle trading signals
self.SetHoldings(self.symbol, self.signal)#, liquidateExistingHoldings=True)
def Checkpoint(self):
self.Plot("Strategy Equity", 'Signal', self.signal)
self.Plot("Strategy Evolution",'Mean_Loss', float(self.evo.logbook.chapters["fitness"].select("min")[-1][0]))
self.Plot("Strategy Evolution",'Max_Loss', float(self.evo.logbook.chapters["fitness"].select("min")[-1][1]))
self.Plot("Strategy Evolution",'Consistency', float(-self.evo.logbook.chapters["fitness"].select("min")[-1][2]))
self.Plot("Strategy Evolution", 'Size_Max', float(self.evo.logbook.chapters["size"].select("max")[-1]))
self.Plot("Strategy Evolution", 'Size_Min', float(self.evo.logbook.chapters["size"].select("min")[-1]))
self.Plot("Strategy Evolution", 'Size_Mean', float(self.evo.logbook.chapters["size"].select("avg")[-1]))
t = float(self.evo.n_long_labels) + float(self.evo.n_short_labels)
self.Plot("Strategy Evolution", 'N_Long', float(self.evo.n_long_labels)/t)
self.Plot("Strategy Evolution", 'N_Short', float(self.evo.n_short_labels)/t)
self.account_leverage = self.Portfolio.TotalHoldingsValue / self.Portfolio.TotalPortfolioValue
self.Plot('Strategy Info','Leverage', float(self.account_leverage))
self.Plot('Strategy Info','RAM', float(OS.ApplicationMemoryUsed/1024.))
self.Plot('Strategy Info','Evo Time', float(self.evo_time))
def OnData(self, data):
# risk managment to limit per position loss to n%
#map(self.RiskManagement, [self.symbol_Symbol])
pass
def RiskManagement(self, symbol):
# https://github.com/QuantConnect/Lean/blob/24fcd239a702c391c26854601a99c514136eba7c/Common/Securities/SecurityHolding.cs#L79https://github.com/QuantConnect/Lean/blob/24fcd239a702c391c26854601a99c514136eba7c/Common/Securities/SecurityHolding.cs#L79
if self.Portfolio[symbol].Quantity != 0:
# init the avg price as our current best price for the asset
if symbol not in self.asset_best_price:
self.asset_best_price[symbol] = float(self.Portfolio[symbol].AveragePrice)
# have we exceded the target?
if self.Portfolio[symbol].Quantity > 0:
self.asset_best_price[symbol] = np.maximum(self.asset_best_price[symbol], float(self.Securities[symbol].Price))
if (float(self.Securities[symbol].Price)-self.asset_best_price[symbol])/self.asset_best_price[symbol] < -self.max_loss_frac:
self.Log("RM Exit of Long pos: %s"%symbol)
self.Liquidate(symbol, tag="RM")
del self.asset_best_price[symbol]
elif self.Portfolio[symbol].Quantity < 0:
self.asset_best_price[symbol] = np.minimum(self.asset_best_price[symbol], float(self.Securities[symbol].Price))
if (float(self.Securities[symbol].Price)-self.asset_best_price[symbol])/self.asset_best_price[symbol] > self.max_loss_frac:
self.Log("RM Exit of Short pos: %s"%symbol)
self.Liquidate(symbol, tag="RM")
del self.asset_best_price[symbol]# Derek M Tishler - 2018 - dmtishler@gmail.com
# DEAP Genetic Programming Example for Symbolic Regression Classification on Quant Connect
#DEAP Source: https://github.com/DEAP/deap
#DEAP Docs: https://deap.readthedocs.io/en/master/
from System import *
from QuantConnect import *
from QuantConnect.Algorithm import *
from QuantConnect.Brokerages import BrokerageName
import random
from scipy import stats
import numpy as np
from scipy import stats
#from scipy import stats as sstats
import pandas as pd
import operator
import math
import time
import os
from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp
from functools import partial
from sklearn.metrics import log_loss
from sklearn.model_selection import train_test_split
# using random math on random inputs can lead to many warnings(ex try in protected div, undefined math, etc). This cleans the logs for reading evo table.
# Remove when adjusting/testing pset ops
import warnings
warnings.filterwarnings('ignore')
# how many individuals in our populations
n_pop = 100
seed = 8675309
random.seed(seed)
np.random.seed(seed)
# simple logic template
def if_then_else(input, output1, output2):
if input: return output1
else: return output2
# avoid errors in evaluations
def protectedDiv(left, right):
if right == 0:
return 0.
else:
return left/right
# we create a fake bool class, this is done to avoid a int-bool confusion in deap operators as they mix up the two easily.
class BOOL:
pass
class Evolution(object):
def __init__(self, context):
# this is here for when you need to log for debugging
self.context = context
self.n_long_labels = 0
self.n_short_labels = 0
# len of hitory
self.n_features = 10#
self.n_samples = 100
self.warmup_count = self.n_features + self.n_samples + 1
# persist the evolution, warning though you have to track when its run and on what data
try_load_saved_pop = False
# The primitive set defines what is possible in the program
self.pset = gp.PrimitiveSetTyped("MAIN", [float]*(self.n_features), float)
self.pset.addPrimitive(operator.add, [float, float], float)
self.pset.addPrimitive(operator.sub, [float, float], float)
self.pset.addPrimitive(operator.mul, [float, float], float)
self.pset.addPrimitive(protectedDiv, [float, float], float)
self.pset.addPrimitive(operator.neg, [float], float)
self.pset.addPrimitive(operator.abs, [float], float)
self.pset.addPrimitive(np.hypot, [float, float], float)
self.pset.addPrimitive(np.absolute, [float], float)
self.pset.addPrimitive(np.fmax, [float, float], float)
self.pset.addPrimitive(np.fmin, [float, float], float)
self.pset.addPrimitive(np.sign, [float], float)
self.pset.addPrimitive(np.square, [float], float)
self.pset.addPrimitive(math.cos, [float], float)
self.pset.addPrimitive(math.sin, [float], float)
self.pset.addPrimitive(operator.and_, [BOOL, BOOL], BOOL)
self.pset.addPrimitive(operator.or_, [BOOL, BOOL], BOOL)
self.pset.addPrimitive(operator.not_, [BOOL], BOOL)
self.pset.addPrimitive(operator.lt, [float, float], BOOL)
self.pset.addPrimitive(operator.le, [float, float], BOOL)
self.pset.addPrimitive(operator.eq, [float, float], BOOL)
self.pset.addPrimitive(operator.ne, [float, float], BOOL)
self.pset.addPrimitive(operator.ge, [float, float], BOOL)
self.pset.addPrimitive(operator.gt, [float, float], BOOL)
self.pset.addPrimitive(if_then_else, [BOOL, float, float], float, 'ite_float')
self.pset.addPrimitive(if_then_else, [BOOL, BOOL, BOOL], BOOL, 'ite_bool')
self.pset.addEphemeralConstant("rand1", lambda: random.random(), float)
self.pset.addEphemeralConstant("rand-1", lambda: -random.random(), float)
self.pset.addTerminal(-0.5, float)
self.pset.addTerminal(-1.0, float)
self.pset.addTerminal(0.0, float)
self.pset.addTerminal(0.5, float)
self.pset.addTerminal(1.0, float)
self.pset.addTerminal(False, BOOL)
self.pset.addTerminal(True, BOOL)
creator.create("FitnessMin", base.Fitness, weights=(-1.0,-1.0, -1.0))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMin)
self.toolbox = base.Toolbox()
self.toolbox.register("expr", gp.genHalfAndHalf, pset=self.pset, min_=1, max_=3)
self.toolbox.register("individual", tools.initIterate, creator.Individual, self.toolbox.expr)
self.toolbox.register("population", tools.initRepeat, list, self.toolbox.individual)
self.toolbox.register("compile", gp.compile, pset=self.pset)
self.toolbox.register("evaluate", self.evalSymbReg)
self.toolbox.register("select", tools.selNSGA2)
self.toolbox.register("mate", gp.cxOnePoint)
self.toolbox.register("expr_mut", gp.genFull, min_=0, max_=2)
self.toolbox.register("mutate", gp.mutUniform, expr=self.toolbox.expr_mut, pset=self.pset)
self.toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17)) #bloat control
self.toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=17)) #bloat control
self.stats_fit = tools.Statistics(lambda ind: ind.fitness.values)
self.stats_size = tools.Statistics(len)
self.stats = tools.MultiStatistics(fitness=self.stats_fit, size=self.stats_size)
self.stats.register("avg", np.mean, axis=0)
self.stats.register("std", np.std, axis=0)
self.stats.register("min", np.min, axis=0)
self.stats.register("max", np.max, axis=0)
# persist the evolution, warning though you have to track when its run and on what data
checkpoint = 'checkpoint.pkl'
self.gen = 0
self.halloffame = tools.ParetoFront()
self.logbook = tools.Logbook()
self.logbook.header = ['gen', 'nevals'] + (self.stats.fields if self.stats else [])
self.population = self.toolbox.population(n=n_pop)
self.selected_individuals = None
def process_batch(self, individual, i):
# prepare the input features of each sample
current_step_input = list(100.*self.hist_data.iloc[i-self.n_features-1:i].open.pct_change().dropna().values.flatten().astype(np.float32))
# run sample through program & get probability. clip used to prevent nan/inf issues
#probability = np.clip(np.nan_to_num(individual(*current_step_input)), 0.001, 0.999)
probability = np.nan_to_num(individual(*current_step_input))
# label for each sample
dp = 100.*(self.hist_data.close.values[i]-self.hist_data.open.values[i])/self.hist_data.open.values[i]
if dp >= 0.0:
label = 1
else:
label = 0
return label, probability
def evalSymbReg(self, individual):
# Transform the tree expression in a callable function
f = self.toolbox.compile(expr=individual)
# loop over and: create each sample, evaluate it, and compare against the actual result(label)
idx_steps_to_eval = np.arange(self.n_features+1, len(self.hist_data.index))
results = map(self.process_batch, [f]*len(idx_steps_to_eval), idx_steps_to_eval)
labels, pred_probs = zip(*results) #unpack
labels = np.array(labels)
# count number of positive/negative class
self.n_long_labels = len(np.where(labels == 1)[0])
self.n_short_labels = len(np.where(labels == 0)[0])
# evaluate in batches as way to reduce overfit to older items in rolling history-inputs
n_eval_groups = 3
batch_labels = np.array_split(labels, n_eval_groups)
batch_pred_probs = np.array_split(pred_probs, n_eval_groups)
batch_losses = []
consistency_score = []
for i in np.arange(len(batch_labels)):
loss_n = log_loss(batch_labels[i], batch_pred_probs[i], labels=[0,1])
if not np.isfinite(loss_n):
loss_n = 25.
batch_losses.append(loss_n)
if loss_n < 0.68: # let be more strict than -ln(0.5)
consistency_score.append(1.)
else:
consistency_score.append(0.)
# forced negative so every fitness is minimized, easier to read in print logs(commented out below)
consistency_score = -np.mean(consistency_score)
# easily influenced by overfit/lucky regions. Have to balance n_samples, batch size, pop size, world peace. Easy job.
avg_loss = np.mean(batch_losses)
# what is our worst batch? I bet its the recent one...lets improve on that(super difficult metric often flat till endgame)
max_loss = np.max(batch_losses)
# you HAVE to return a tuple to DEAP when evaluating
return avg_loss, max_loss, consistency_score
def evalLive(self, individual):
# most recent sample
current_step_input = list(100.*self.hist_data.iloc[-self.n_features-1:].open.pct_change().dropna().values.flatten().astype(np.float32))
# Transform the tree expression in a callable function
compiled_indv = self.toolbox.compile(expr=individual)
pred_prob = np.clip(np.nan_to_num(compiled_indv(*current_step_input)), 0.001, 0.999)
if pred_prob >= 0.5:
signal = 1.
else:
signal = -1.
return signal
# NOTE, so this looks scary...but it is just a copied eaMuPlusLambda algo from:
# https://github.com/DEAP/deap/blob/master/deap/algorithms.py
# explained: http://deap.readthedocs.io/en/master/api/algo.html
# Since we are using the dead-evolutionary-algo in a weird way, we need to manually set up the evolution which gives us full access.
# In the DEAP tutorials they just call eaMuPlusLambda or eaSimple and make it looks very clean.
def OnEvolve(self, cxpb=0.6, mutpb=0.2, lambda_=n_pop*2, verbose=__debug__):
if self.gen == 0:
start_time = time.time()
invalid_ind = [ind for ind in self.population if not ind.fitness.valid]
fitnesses = self.toolbox.map(self.toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
if self.halloffame is not None:
self.halloffame.update(self.population)
record = self.stats.compile(self.population) if self.stats else {}
self.logbook.record(gen=0, nevals=len(invalid_ind), **record)
if verbose:
elapsed_time = time.time() - start_time
#print self.logbook.stream + "\t\t%0.2f sec"%(elapsed_time)
#self.Log('\n'+self.logbook.stream)
self.context.evo_time = elapsed_time
self.gen += 1
self.selected_individuals = self.halloffame[:1]
# save to file
#self.Checkpoint()
else:
start_time = time.time()
offspring = algorithms.varOr(self.population, self.toolbox, lambda_, cxpb, mutpb)
invalid_ind = [ind for ind in offspring]# if not ind.fitness.valid] # force eval of every indv, as history is a moving widnow to eval on
fitnesses = self.toolbox.map(self.toolbox.evaluate, invalid_ind)
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit
# Update the hall of fame with the generated individuals
if self.halloffame is not None:
self.halloffame.clear() # force eval of every indv, as history is a moving widnow to eval on
self.halloffame.update(offspring)
self.population[:] = self.toolbox.select(self.population + offspring, n_pop)
# Append the current generation statistics to the logbook
record = self.stats.compile(self.population) if self.stats else {}
self.logbook.record(gen=self.gen, nevals=len(invalid_ind), **record)
if verbose:
elapsed_time = time.time() - start_time
#print self.logbook.stream + "\t\t%0.2f sec"%(elapsed_time)
#self.Log('\n'+self.logbook.stream)
self.context.evo_time = elapsed_time
self.gen += 1
self.selected_individuals = self.halloffame[:1]
# save to file
#self.Checkpoint()
# using the selected best item
#signal = self.evalLive(self.halloffame[0])
# but with pareto front we have ANY number of non dominated individuals each gen, just use them all as an ensemble model
signal = stats.mode([self.evalLive(indv) for indv in self.halloffame]).mode[0]
self.context.Log(str(self.gen) + ' : ' + str(self.halloffame[0]))
return signal