| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from AlgorithmImports import *
from collections import deque
# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class ATRTrailingStop():
    '''
    ATR-based trailing stop indicator.

    The stop trails the close at a distance of ``multiplier * atr_value``:
    it only ratchets up while price stays above it, only ratchets down
    while price stays below it, and flips sides when price crosses it.

    This class deliberately does not inherit from PythonIndicator because
    Update needs a second argument (the ATR value) alongside the bar.
    '''

    def __init__(self, name, multiplier):
        '''
        Args:
            name (str): display name of the indicator
            multiplier (float): factor applied to the ATR value to get the
                trailing distance
        '''
        self.Name = name
        self.Current = IndicatorDataPoint()
        # Single-slot queues holding the previous stop value / previous
        # close. A value of 0 means "nothing seen yet".
        self.queue = deque([0], maxlen=1)
        self.close_queue = deque([0], maxlen=1)
        self.multiplier = multiplier
        self.IsReady = False

    def __repr__(self):
        # Time and Value live on the current data point; the indicator
        # itself never had Time/Value attributes, so reading them raised.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value)

    def _next_stop(self, close, atr_value):
        '''
        Advances the internal state by one close and returns the new stop
        value, or None when there is no previous close to compare with.
        '''
        prev_close = self.close_queue[0]
        stop = None
        if prev_close != 0:
            prev_stop = self.queue[0]
            n_loss = self.multiplier * atr_value
            if close > prev_stop and prev_close > prev_stop:
                # still above the stop: only ever raise it
                stop = max(prev_stop, close - n_loss)
            elif close < prev_stop and prev_close < prev_stop:
                # still below the stop: only ever lower it
                stop = min(prev_stop, close + n_loss)
            elif close > prev_stop:
                # crossed up through the stop: restart below price
                stop = close - n_loss
            else:
                # crossed down through the stop: restart above price
                stop = close + n_loss
            self.queue.appendleft(stop)
        self.close_queue.appendleft(close)
        return stop

    def Update(self, bar, atr_value):
        '''
        Updates the current value of the indicator and determines if it is ready to use
        Args:
            bar (object): TradeBar object (Close and EndTime are read)
            atr_value (object): current ATR value; it is multiplied by the
                multiplier directly.
                NOTE(review): earlier docs described this as an
                IndicatorDataPoint - confirm callers pass a numeric value.
        Returns:
            (bool): is the indicator ready to use?
        '''
        stop = self._next_stop(bar.Close, atr_value)
        if stop is not None:
            self.Current = IndicatorDataPoint(bar.EndTime, stop)
        self.IsReady = self.queue[0] != 0
        return self.IsReady
from AlgorithmImports import *
from collections import deque
# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class CustomEma():
    '''
    Custom EMA Indicator that uses an SMA as the first EMA value
    https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
    '''

    def __init__(self, name, ema_length):
        '''
        Args:
            name (str): display name of the indicator
            ema_length (int): number of samples in the EMA period
        '''
        self.Name = name
        self.Current = IndicatorDataPoint()
        self.IsReady = False
        # closes collected during warm-up, newest first; used for the SMA seed
        self.close_queue = deque(maxlen=ema_length)
        # single slot holding the previous EMA value
        self.ema_queue = deque(maxlen=1)
        self.ema_length = ema_length
        self.ema_multiplier = 2/(ema_length+1)

    def __repr__(self):
        # Time and Value live on the current data point; the indicator
        # itself never had Time/Value attributes, so reading them raised.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value)

    def sma(self, values, divider):
        '''Calculates the simple moving average of a list of values and a divider'''
        return sum(values) / divider

    def _next_ema(self, close):
        '''
        Feeds one close into the indicator and returns the new EMA value,
        or None while there are still fewer than ema_length samples.

        close_queue is capped at ema_length, so its length cannot tell
        "exactly seeded" from "already running". IsReady is used for that
        instead; otherwise the SMA seed would be recomputed on every
        sample and the output would be a plain rolling SMA.
        '''
        self.close_queue.appendleft(close)
        if self.IsReady:
            # https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp
            # https://github.com/QuantConnect/Lean/blob/master/Indicators/ExponentialMovingAverage.cs
            ema = close * self.ema_multiplier + self.ema_queue[0] * (1 - self.ema_multiplier)
        elif len(self.close_queue) < self.ema_length:
            # not enough data for the SMA seed yet; remember the raw close
            self.ema_queue.appendleft(close)
            return None
        else:
            # exactly enough data: the SMA is our starting EMA value
            ema = self.sma(self.close_queue, self.ema_length)
        self.ema_queue.appendleft(ema)
        return ema

    def Update(self, input):
        '''
        Updates the current value of the indicator and determines if it is ready to use
        Args:
            input (object): IBaseData object (Close and EndTime are read)
        Returns:
            (bool): is the indicator ready?
        '''
        ema = self._next_ema(input.Close)
        if ema is not None:
            self.Current = IndicatorDataPoint(input.EndTime, ema)
            self.IsReady = True
        return self.IsReady
from AlgorithmImports import *
from collections import deque
import os
import math
import numpy as np
import pickle
import gzip
class CustomHelpers:
    '''
    A class of custom utility functions.

    Every helper is a static function and is called as
    ``CustomHelpers.Name(...)``. Rolling windows are indexed newest-first:
    index 0 is the current value and index 1 the previous one.
    '''

    @staticmethod
    def _as_values(series, ordinal):
        '''
        Converts a window of IndicatorDataPoint / TradeBar objects into a
        list of plain numbers (Value / Close) and validates the result.
        ordinal ("2nd" / "3rd") is only used in the error message.
        '''
        if isinstance(series[0], IndicatorDataPoint):
            series = [x.Value for x in series]
        if isinstance(series[0], TradeBar):
            series = [x.Close for x in series]
        if not isinstance(series[0], (int, float)):
            raise TypeError("parsed values for iterable {} argument must be of type float or int".format(ordinal))
        return series

    @staticmethod
    def Cross(direction, a, b, algorithm):
        '''
        Determines whether series a crossed series (or static value) b
        between the previous sample (index 1) and the current one (index 0).
        Args:
            direction (str): 'over', 'under' or 'any'
            a (RollingWindow|list|deque): window of IndicatorDataPoint,
                TradeBar or numeric values
            b (RollingWindow|list|deque|int|float): window as for a, or a
                static value
            algorithm (QCAlgorithm): the running algorithm
        Returns:
            bool: True when the requested cross happened. False is returned
            when a is a single value, because a cross needs two samples.
        Raises:
            ValueError: bad direction or algorithm argument
            TypeError: a or b does not resolve to int/float values
        '''
        if direction not in ('under', 'over', 'any'):
            raise ValueError("direction must be 'over', 'under' or 'any'")
        if not isinstance(algorithm, QCAlgorithm):
            raise ValueError("algorithm must be an instance of QCAlgorithm")

        a_is_iterable = isinstance(a, (RollingWindow, list, deque))
        if a_is_iterable:
            a = CustomHelpers._as_values(a, "2nd")
        elif not isinstance(a, (int, float)):
            raise TypeError("simple value for 2nd argument must be of type float or int")

        b_is_iterable = isinstance(b, (RollingWindow, list, deque))
        if b_is_iterable:
            b = CustomHelpers._as_values(b, "3rd")
        elif not isinstance(b, (int, float)):
            raise TypeError("simple value for 3rd argument must be of type float or int")

        # everything passes the sniff test, do the thing...
        if not a_is_iterable:
            return False
        prev_a, cur_a = a[1], a[0]
        # a static value is compared against itself at both samples
        prev_b, cur_b = (b[1], b[0]) if b_is_iterable else (b, b)
        crossed_over = prev_a <= prev_b and cur_a > cur_b
        crossed_under = prev_a >= prev_b and cur_a < cur_b
        # compare strings by value; identity ('is') only works by accident
        if direction == 'over':
            return crossed_over
        if direction == 'under':
            return crossed_under
        return crossed_over or crossed_under

    @staticmethod
    def IndicatorCrossover(window_1, window_2, algorithm):
        '''
        Determines whether one rolling indicator window crossed over another
        Args:
            window_1 (list): RollingWindow of IndicatorDataPoint objects
            window_2 (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return window_1[1].Value <= window_2[1].Value and window_1[0].Value > window_2[0].Value

    @staticmethod
    def IndicatorCrossunder(window_1, window_2, algorithm):
        '''
        Determines whether one rolling indicator window crossed under another
        Args:
            window_1 (list): RollingWindow of IndicatorDataPoint objects
            window_2 (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return window_1[1].Value >= window_2[1].Value and window_1[0].Value < window_2[0].Value

    @staticmethod
    def EquityCrossoverIndicator(equity_window, indicator_window, algorithm):
        '''
        Determines whether a rolling equity window crossed over a rolling indicator window
        Args:
            equity_window (list): RollingWindow of TradeBar objects
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close <= indicator_window[1].Value and equity_window[0].Close > indicator_window[0].Value

    @staticmethod
    def EquityCrossunderIndicator(equity_window, indicator_window, algorithm):
        '''
        Determines whether a rolling equity window crossed under a rolling indicator window
        Args:
            equity_window (list): RollingWindow of TradeBar objects
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close >= indicator_window[1].Value and equity_window[0].Close < indicator_window[0].Value

    @staticmethod
    def EquityCrossoverValue(equity_window, value, algorithm):
        '''
        Determines whether a rolling equity window crossed over a static value
        Args:
            equity_window (list): RollingWindow of TradeBar objects
            value (float): value to check the equity_window against
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close <= value and equity_window[0].Close > value

    @staticmethod
    def EquityCrossunderValue(equity_window, value, algorithm):
        '''
        Determines whether a rolling equity window crossed under a static value
        Args:
            equity_window (list): RollingWindow of TradeBar objects
            value (float): value to check the equity_window against
            algorithm: unused
        Returns: bool
        '''
        return equity_window[1].Close >= value and equity_window[0].Close < value

    @staticmethod
    def IndicatorCrossoverValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed over a static value
        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value <= value and indicator_window[0].Value > value

    @staticmethod
    def IndicatorCrossunderValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed under a static value
        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value >= value and indicator_window[0].Value < value

    @staticmethod
    def IndicatorCrossedValue(indicator_window, value, algorithm):
        '''
        Determines whether a rolling indicator window crossed either over OR under a static value
        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value (float): value to check the indicator window against
            algorithm: passed through to the over/under helpers
        Returns: bool
        '''
        return CustomHelpers.IndicatorCrossoverValue(indicator_window, value, algorithm) \
            or CustomHelpers.IndicatorCrossunderValue(indicator_window, value, algorithm)

    @staticmethod
    def IndicatorCrossoverValueWindow(indicator_window, value_window, algorithm):
        '''
        Determines whether a rolling indicator window crossed over a simple value window
        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value_window (list): RollingWindow of int or float values
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value <= value_window[1] and indicator_window[0].Value > value_window[0]

    @staticmethod
    def IndicatorCrossunderValueWindow(indicator_window, value_window, algorithm):
        '''
        Determines whether a rolling indicator window crossed under a simple value window
        Args:
            indicator_window (list): RollingWindow of IndicatorDataPoint objects
            value_window (list): RollingWindow of int or float values
            algorithm: unused
        Returns: bool
        '''
        return indicator_window[1].Value >= value_window[1] and indicator_window[0].Value < value_window[0]

    @staticmethod
    def NanToZero(input):
        '''
        Checks if the input is a valid number. Returns the number if so, otherwise returns zero
        Args:
            input (int|float): the value to check
        Returns:
            int|float
        '''
        return 0 if math.isnan(input) else input

    @staticmethod
    def ATRTrailingStopValue(equity_window, atr_window, multiplier):
        '''
        Description: Determines the current ATR Stoploss Value
        Note: this was the first ATRTrailingStop attempt. A custom indicator
        replaced it since the value needs to be kept in a rolling window.
        Args:
            equity_window (list): rolling window of price values; the
                elements are compared and added as numbers.
                NOTE(review): originally documented as trade bars - confirm
                callers pass closes.
            atr_window (list): rolling window of AverageTrueRange IndicatorDataPoint objects
            multiplier (float): factor applied to the current ATR value to
                get the trailing distance
        Returns:
            (float): the current ATR Trailing Stop value
        '''
        nLoss = atr_window[0].Value * multiplier
        atr_val = atr_window[1].Value or 0
        if equity_window[0] > atr_val and equity_window[1] > atr_val:
            return max(atr_val, equity_window[0] - nLoss)
        elif equity_window[0] < atr_val and equity_window[1] < atr_val:
            # use the current price, not the whole window
            return min(atr_val, equity_window[0] + nLoss)
        elif equity_window[0] > atr_val:
            return equity_window[0] - nLoss
        else:
            return equity_window[0] + nLoss

    @staticmethod
    def IsMarketHours(algorithm):
        '''
        Description: Determines if algorithm.Time falls between 9:30 and
        16:00 inclusive (normal US market hours, assuming algorithm.Time is
        in exchange time)
        Returns:
            (bool)
        '''
        now = algorithm.Time
        return (now.hour == 9 and now.minute >= 30) \
            or (now.hour > 9 and now.hour < 16) \
            or (now.hour == 16 and now.minute == 0)

    @staticmethod
    def Timedelta64ToMinutes(timedelta64):
        '''
        Description: Converts a numpy timedelta64 object to minutes (float)
        Returns:
            (float)
        '''
        return timedelta64.astype('timedelta64[s]').item().total_seconds() / 60

    @staticmethod
    def hurst(ts):
        """Returns the Hurst Exponent of the time series vector ts.

        Lags 2..99 are used, so ts needs more than 100 samples to give a
        meaningful result.
        """
        # Create the range of lag values
        lags = range(2, 100)
        # Calculate the array of the variances of the lagged differences
        tau = [np.sqrt(np.std(np.subtract(ts[lag:], ts[:-lag]))) for lag in lags]
        # Use a linear fit to estimate the Hurst Exponent
        poly = np.polyfit(np.log(lags), np.log(tau), 1)
        # Return the Hurst exponent from the polyfit output
        return poly[0] * 2.0

    ############################# Variance Ratio Functions #####################################
    # https://github.com/letianzj/QuantResearch/blob/master/notebooks/mean_reversion.py
    @staticmethod
    def normcdf(X):
        '''Polynomial approximation of the standard normal CDF at scalar X.'''
        (a1, a2, a3, a4, a5) = (0.31938153, -0.356563782, 1.781477937, -1.821255978, 1.330274429)
        L = abs(X)
        K = 1.0 / (1.0 + 0.2316419 * L)
        w = 1.0 - 1.0 / np.sqrt(2 * np.pi) * np.exp(-L * L / 2.) * (
            a1 * K + a2 * K * K + a3 * pow(K, 3) + a4 * pow(K, 4) + a5 * pow(K, 5))
        if X < 0:
            # the approximation is for X >= 0; mirror it for negatives
            w = 1.0 - w
        return w

    @staticmethod
    def vratio(a, lag=2, cor='hom'):
        '''
        Variance ratio test of the series a.
        Args:
            a (numpy.ndarray): 1-d series (sliced and subtracted element-wise)
            lag (int): aggregation lag
            cor (str): 'hom' for the homoscedastic variance estimate,
                'het' for the heteroscedasticity-robust one
        Returns:
            (vratio, zscore, pval): the ratio, its z-score and the normal
            CDF of the z-score
        Raises:
            ValueError: cor is neither 'hom' nor 'het'
        '''
        if cor not in ('hom', 'het'):
            raise ValueError("cor must be 'hom' or 'het'")
        n = float(len(a))
        mu = sum(a[1:len(a)] - a[:-1]) / n
        m = (n - lag + 1) * (1 - lag / n)
        b = sum(np.square(a[1:len(a)] - a[:len(a) - 1] - mu)) / (n - 1)
        t = sum(np.square(a[lag:len(a)] - a[:len(a) - lag] - lag * mu)) / m
        vratio = t / (lag * b)
        la = float(lag)
        if cor == 'hom':
            varvrt = 2 * (2 * la - 1) * (la - 1) / (3 * la * n)
        else:
            varvrt = 0
            sum2 = sum(np.square(a[1:len(a)] - a[:len(a) - 1] - mu))
            for j in range(lag - 1):
                sum1a = np.square(a[j + 1:len(a)] - a[j:len(a) - 1] - mu)
                sum1b = np.square(a[1:len(a) - j] - a[0:len(a) - j - 1] - mu)
                sum1 = np.dot(sum1a, sum1b)
                delta = sum1 / (sum2 ** 2)
                varvrt = varvrt + ((2 * (la - j) / la) ** 2) * delta
        zscore = (vratio - 1) / np.sqrt(float(varvrt))
        pval = CustomHelpers.normcdf(zscore)
        return vratio, zscore, pval

    ############################# Object Store Functions #####################################
    # https://www.quantconnect.com/forum/discussion/9000/objectstore-deleting-all-saved-data-in-a-project/p1
    @staticmethod
    def bytesto(bytes, to, bsize=1024):
        '''
        Converts a byte count to a larger unit.
        Args:
            bytes (int|float): number of bytes
            to (str): target unit: 'kb', 'mb', 'gb', 'tb', 'pb' or 'eb'
            bsize (int): size of one unit step
        Returns:
            (float)
        '''
        a = {'kb' : 1, 'mb': 2, 'gb' : 3, 'tb' : 4, 'pb' : 5, 'eb' : 6 }
        r = float(bytes)
        return r / (bsize ** a[to])

    @staticmethod
    def ObjectStoreOverview(algorithm, storage_limit=50, file_limit=1000):
        """
        Prints some basic ObjectStore information (remaining storage and
        file count, then one line per key with its size).
        Args:
            algorithm: object exposing ObjectStore
            storage_limit (float): storage quota in MB
            file_limit (int): maximum number of files
        Returns:
            (list): the keys found in the ObjectStore
        """
        keys = [str(j).split(',')[0][1:] for _, j in enumerate(algorithm.ObjectStore.GetEnumerator())]
        sizes = [os.path.getsize(algorithm.ObjectStore.GetFilePath(key)) for key in keys]
        remaining = storage_limit - sum(sizes)/1e6
        print(f'Remaining storage: {remaining} MB.')
        print(f'Remaining file limit: {file_limit-len(keys)}')
        # For human readability
        converted_sizes = []
        for size in sizes:
            if size < 1024:
                s = f'{size} bytes'
            elif size < 1e6:
                # this branch repeated the "< 1024" test and was unreachable
                s = f'{size/1000} KB'
            else:
                s = f'{size/1e+6} MB'
            converted_sizes.append(s)
        print('--')
        print('Key, Size')
        for file in zip(keys, converted_sizes):
            print(' '.join(file))
        return keys

    @staticmethod
    def clear_ObjectStore(alogrithm):
        '''Deletes every key currently stored in the ObjectStore.'''
        keys = [str(j).split(',')[0][1:] for _, j in enumerate(alogrithm.ObjectStore.GetEnumerator())]
        for key in keys:
            alogrithm.ObjectStore.Delete(key)

    @staticmethod
    def serialize_and_compress(data):
        '''
        Serializes and compresses data using gzip and pickle for storage
        in ObjectStore using SaveBytes
        '''
        return gzip.compress(pickle.dumps(data))

    @staticmethod
    def unserialize_and_decompress_bytes(data):
        '''
        Unserializes and decompresses data that was serialized
        and compressed using our serialize_and_compress function
        and stored using ObjectStore.SaveBytes and then refetched
        using ObjectStore.ReadBytes
        '''
        # SECURITY: pickle.loads can execute arbitrary code; only feed this
        # bytes that this project wrote itself.
        return pickle.loads(gzip.decompress(bytes(data)))
from AlgorithmImports import *
from collections import deque
from math import sqrt
# https://github.com/QuantConnect/Lean/blob/master/Algorithm.Python/CustomIndicatorAlgorithm.py
class EmaSampleStdDev():
    '''
    Custom indicator that calculates the Z-Score of a Sample Standard Deviation of an Exponential
    Weighted Moving Average of prices for a given ema sample period and deviation length
    https://www.quantconnect.com/forum/discussion/3383/custom-indicator-in-python-algorithm/p1
    '''

    def __init__(self, name, ema_length, deviation_length):
        '''
        Args:
            name (str): display name of the indicator
            ema_length (int): number of samples in the EMA period
            deviation_length (int): number of samples in the deviation
                window; the sample deviation divides by
                deviation_length - 1, so this must be at least 2
        '''
        self.Name = name
        self.Current = IndicatorDataPoint()
        self.IsReady = False
        # newest-first closes of the deviation window
        self.close_queue = deque(maxlen=deviation_length)
        # newest-first EMA values (raw closes during EMA warm-up); its
        # length doubles as the warm-up counter for the whole indicator
        self.ema_queue = deque(maxlen=deviation_length+ema_length)
        self.ema_length = ema_length
        self.ema_multiplier = 2/(ema_length+1)
        self.deviation_length = deviation_length

    def __repr__(self):
        # Time and Value live on the current data point; the indicator
        # itself never had Time/Value attributes, so reading them raised.
        return "{0} -> IsReady: {1}. Time: {2}. Value: {3}".format(
            self.Name, self.IsReady, self.Current.EndTime, self.Current.Value)

    def sma(self, values, divider):
        '''Calculates the simple moving average of a list of values and a divider'''
        return sum(values) / divider

    def _next_zscore(self, close):
        '''
        Feeds one close into the indicator and returns the new z-score, or
        None while the indicator is still warming up.

        The EMA warm-up is driven by how many samples ema_queue already
        holds rather than by len(close_queue): close_queue is capped at
        deviation_length, so counting on it only worked when
        deviation_length > ema_length.
        '''
        prior_updates = len(self.ema_queue)
        self.close_queue.appendleft(close)

        if prior_updates < self.ema_length - 1:
            # not enough data for the SMA seed yet; remember the raw close
            self.ema_queue.appendleft(close)
            return None
        if prior_updates == self.ema_length - 1:
            # exactly ema_length closes seen (this one plus the raw closes
            # held in ema_queue): their SMA is the starting EMA value
            ema = self.sma([close, *self.ema_queue], self.ema_length)
        else:
            # https://www.investopedia.com/ask/answers/122314/what-exponential-moving-average-ema-formula-and-how-ema-calculated.asp
            # https://github.com/QuantConnect/Lean/blob/master/Indicators/ExponentialMovingAverage.cs
            ema = close * self.ema_multiplier + self.ema_queue[0] * (1 - self.ema_multiplier)
        self.ema_queue.appendleft(ema)

        if len(self.ema_queue) < self.deviation_length + self.ema_length:
            return None
        # close_queue holds exactly deviation_length closes here; both
        # queues are newest-first so equal indexes line up
        squared_errors = [
            (close_price - self.ema_queue[idx])**2
            for idx, close_price in enumerate(self.close_queue)
        ]
        std_samp_dev = sqrt(self.sma(squared_errors, self.deviation_length - 1))
        if std_samp_dev == 0:
            # every close in the window equals its EMA, so close - ema is
            # 0 as well; report 0 instead of dividing 0 by 0
            return 0.0
        return (close - ema) / std_samp_dev

    def Update(self, input):
        '''
        Updates the current value of the indicator and determines if it is ready to use
        Args:
            input (object): IBaseData object (Close and EndTime are read)
        Returns:
            (bool): is the indicator ready?
        '''
        z_score = self._next_zscore(input.Close)
        if z_score is not None:
            self.Current = IndicatorDataPoint(input.EndTime, z_score)
        self.IsReady = len(self.ema_queue) == self.deviation_length + self.ema_length
        return self.IsReady
# Mean reversion is a concept in finance that explains, in the long term, the price will typically revert back to a mean.
# The Reverend is a compilation of mean reversion strategies that you can select and test on specific instruments to determine the strategy and parameters you may want to execute.
# Each of the strategies you can select involve some variation of a standard deviation Z score applied to the dataset compared to a statistical mean or baseline.
# The baselines coded below include a linear regression baseline, a price average baseline (SMA), an EMA baseline, and a volume weighted average price baseline. Computation options include Sample or Population Standard Deviation formulas.
# In general, the further the Z score is away from the mean/baseline (0 on the Z score scale), the more statistically improbable it is that it will stay there. Under a normal distribution, about 95% of the dataset sits between -2 and +2.
# The Reverend buys an instrument (long) when it is far away from the mean, and sells when the price reverts closer to the mean.
# It operationalizes a buy using a trailing function on the z-score after the z-score has dipped below a preset value, and buys when price reverses over the trailing value (also a % setting).
# It operationalizes a sell using an ATR TSL when the z-score reverts to 0.
# When the z-score reverts to 0, the code checks to see if an ATR TSL would have sold already, and if so, sells (this is the ATR Bail function).
# If the ATR TSL would not have sold already, it "turns on" an ATR TSL when the z-score crosses 0 and follows the rest of the trend until the ATR TSL signals to sell.
from AlgorithmImports import *
from custom.CustomHelpers import CustomHelpers
from custom.EmaSampleStdDev import EmaSampleStdDev
from datetime import datetime,timedelta
from QuantConnect.Data import Custom
from UniverseSelectionData import UniverseSelectionData
from SymbolData import SymbolData
import time
class TheReverendAlgorithm(QCAlgorithm):
def Initialize(self):
# setup back testing parameters
# self.SetStartDate(2015, 7, 1)
# self.SetEndDate(2020, 7, 1)
self.benchmark_symbol = "SPY"
self.AddEquity(self.benchmark_symbol, Resolution.Minute)
self.SetBenchmark(self.benchmark_symbol) # set statistics reference
self.SetStartDate(2021, 7, 12)
self.SetEndDate(2021, 7, 13)
self.SetCash(10000000)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage)
# import algorithm parameters
self.max_drawdown_perc = float(self.GetParameter("max-drawdown-percent"))
self.bar_length_in_minutes = int(self.GetParameter("bar-length-in-minutes"))
self.ema_length = int(self.GetParameter("ema-length"))
self.deviation_length = int(self.GetParameter("deviation-length"))
self.min_dollar_volume = int(self.GetParameter("min-dollar-volume"))
self.min_price = int(self.GetParameter("min-price"))
self.min_market_cap = int(self.GetParameter("min-market-cap"))
self.vix_length = int(self.GetParameter("vix-length"))
self.vix_filter_max = int(self.GetParameter("vix-filter-max"))
self.ztp = float(self.GetParameter("ztp"))
self.short_ztp = float(self.GetParameter("short-ztp"))
self.resolution = Resolution.Minute
self.UniverseSettings.Resolution = self.resolution
self.AddUniverse(self.CoarseSelectionFilter, self.FineSelectionFilter)
# tracks our maximum portfolio value
self.max_portfolio_value = self.Portfolio.TotalPortfolioValue
# tracks the highest vix ema z-score from the last {self.vix_length} bars
self.vix_zscore_max = 0
# stores a UniverseSelectionData instance for each symbol we tracking
# for universe selection keyed by symbol
self.selection_data = {}
# stores a SymbolData instance for each symbol we tracking for buy/sell
# signals keyed by symbol
self.symbol_data = {}
# stores symbols that should no longer be in our universe but we are
# currently still invested in
self.universe_whitelist = []
# determines whether to try to pull initial history from object store
self.is_initial_setup_period = True
self.is_training_selection_data = False
self.is_training_symbol_data = False
self.selection_data_symbols = []
self.symbol_data_symbols = []
self.added_symbols = []
self.removed_symbols = []
# Add CBOE:VIX data
# self.vix = self.AddIndex("VIX", self.resolution).Symbol
# create a rolling windows so we can access past indicator data
# self.vix_zscore_window = RollingWindow[IndicatorDataPoint](self.vix_length)
# setup the vix zscore indicator
# self.vix_zscore = EmaSampleStdDev(
# '{}.EMA_SAMPLE_STD_DEV({}-{})'.format(self.vix, self.ema_length, self.deviation_length),
# self.ema_length,
# self.deviation_length
# )
self.universe_symbols = set()
# define our trade bar consolidator. we can
# access the bar from the DataConsolidated events
# self.bar_consolidator = TradeBarConsolidator(timedelta(minutes=self.bar_length_in_minutes))
# attach our event handler. the event handler is a function that will
# be called each time we produce a new consolidated piece of data.
# self.bar_consolidator.DataConsolidated += self.ConsolidatedBarHandler
# this call adds our consolidator to
# the manager to receive updates from the engine
# self.SubscriptionManager.AddConsolidator(self.vix, self.bar_consolidator)
# register the consolidated bar data to automatically
# update the indicators
# self.RegisterIndicator(self.vix, self.vix_zscore, self.bar_consolidator)
# vix_history = self.History(self.vix, (self.deviation_length*self.bar_length_in_minutes)*2, self.resolution)
# for time, row in vix_history.loc[self.vix].iterrows():
# tradeBar = TradeBar()
# tradeBar.Close = row['close']
# tradeBar.Open = row['open']
# tradeBar.High = row['high']
# tradeBar.Low = row['low']
# # tradeBar.Volume = row['volume']
# tradeBar.Time = time
# # tradeBar.Time = index[1]
# tradeBar.Symbol = self.vix
# self.bar_consolidator.Update(tradeBar)
self.Schedule.On(self.DateRules.EveryDay(self.benchmark_symbol), self.TimeRules.At(5, 0), self.BeforeMarketOpen)
def CoarseSelectionFilter(self, universe):
self.symbol_data_symbols.clear()
self.selection_data_symbols.clear()
universe = sorted(universe, key=lambda c: c.DollarVolume, reverse=True)
selected = [x.Symbol for x in universe if True \
and x.HasFundamentalData
and x.AdjustedPrice > self.min_price
and x.DollarVolume > self.min_dollar_volume]
self.Debug("Coarse universe size: " + str(len(selected)))
return selected
def FineSelectionFilter(self, universe):
# filter out stocks that IPO'd less than 4 years ago and have a market cap
# greater than our defined paremeter
four_years_ago = datetime.now() - timedelta(days=4*365)
selected = [x.Symbol for x in universe if True \
and x.SecurityReference.IPODate < four_years_ago
and x.MarketCap > self.min_market_cap]
for symbol in selected:
self.universe_symbols.add(symbol)
self.Debug("Fine universe size: " + str(len(self.universe_symbols)))
return list(self.universe_symbols)
def OnSecuritiesChanged(self, changes):
'''
Event fired each time the we add/remove securities from the data feed
Args:
changes: The security additions and removals from the algorithm
'''
added_symbols = [security.Symbol for security in changes.AddedSecurities]
self.Debug(f"Number of Symbols: {len(added_symbols)}")
t = time.process_time()
daily_history = self.History(added_symbols, 200, Resolution.Daily)
t2 = time.process_time()
self.Debug(f"Daily History took {str(t2-t)}")
self.Debug(f"Hourly History Started..")
hour_history = self.History(added_symbols, 2200, Resolution.Hour)
t3 = time.process_time()
self.Debug(f"Hour History took {str(t3-t2)}")
for security in changes.AddedSecurities:
symbol = security.Symbol
if symbol not in self.selection_data:
if symbol in hour_history.index.levels[0] and symbol in daily_history.index.levels[0]:
hour_df = hour_history.loc[symbol]
daily_df = daily_history.loc[symbol]
self.selection_data[symbol] = UniverseSelectionData(
self,
symbol,
hour_df,
daily_df,
)
else:
self.Log(f"No data for found {symbol}")
if self.is_initial_setup_period:
self.Train(self.DateRules.Today, self.TimeRules.At(4, 0), self.BeforeMarketOpen)
def BeforeMarketOpen(self):
self.is_initial_setup_period = False
t = time.process_time()
history_request_amount = (self.ema_length*2 + self.deviation_length)*self.bar_length_in_minutes
symbols_needed = [symbol for symbol, sel_data in self.selection_data.items() if sel_data.IsReady() and \
symbol not in self.symbol_data and \
sel_data.CanTrade()]
symbols_selected = symbols_needed
self.Debug(f"{history_request_amount} minute bars on {len(symbols_selected)} symbols out of {len(symbols_needed)}")
minute_history = self.History(symbols_selected, history_request_amount, self.resolution)
self.Debug(f"{history_request_amount} minute bars on {len(symbols_selected)} symbols took {str(time.process_time()-t)}")
for symbol in symbols_needed:
if symbol in minute_history.index.levels[0]:
symbol_data = SymbolData(symbol, self)
symbol_data.RegisterIndicators(self)
symbol_data.WarmUpIndicators(minute_history.loc[symbol], self)
self.symbol_data[symbol] = symbol_data
def OnFinishedTraining(self):
# clean up added/removed lists
self.added_symbols.clear()
self.removed_symbols.clear()
self.is_initial_setup_period = False
self.is_training_symbol_data = False
def OnData(self, data):
return
'''
OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data (object): Slice object keyed by symbol containing the stock data
'''
# if not (self.vix_zscore.IsReady or not self.vix_zscore_window.IsReady):
# return
# if not CustomHelpers.IsMarketHours(self): return
t = time.process_time()
self.UpdateMaxPortfolioValue()
if self.IsInMaxDrawdownRange():
self.Notify.Email(
"reggie.ofarrell@gmail.com",
"Reverend - Max Drawdown Hit",
"Reverend script hit max drawdown at {}".format(self.Time)
)
self.Liquidate()
self.Log("Max drawdown hit, exiting algorithm")
self.Quit()
return
if self.is_training_selection_data or self.is_training_symbol_data:
self.Log("OnData called while still Training at " + str(self.Time))
return
if not data.HasData:
self.Log("slice object empty in OnData at " + str(self.Time))
return
invested = [symbol for symbol, portfolio_item in self.Portfolio.items() if portfolio_item.Invested]
# create a list to hold new symbols we will invest in
new_long_investments = []
new_short_investments = []
log_time = False
# for symbol, symbol_data in self.symbol_data.items():
for symbol in list(self.symbol_data):
symbol_data = self.symbol_data[symbol]
if not (self.Securities.ContainsKey(symbol) and self.ActiveSecurities[symbol].IsTradable):
self.Log(f"{str(symbol.Value)} is missing from self.Securities or is not tradable, skipping in OnData")
continue
if not data.Bars.ContainsKey(symbol):
self.Log("missing bars " + str(symbol) + " at " + str(self.Time))
continue
if symbol_data.CanEmit(data[symbol].Close):
log_time = True
# self.Log(f'symbol {symbol_data.symbol.Value} can emit')
# we are not invested, see if it's time to buy
if not self.Portfolio[symbol].Invested:
# buy long logic
if symbol_data.long_signal['signal'] == 'buy':
new_long_investments.append(symbol)
self.SetHoldings(symbol, 0.005, False, 'Buy')
# buy short logic
if symbol_data.short_signal['signal'] == 'buy':
# NOTE: temporarily disable shorting
# new_short_investments.append(symbol)
# self.SetHoldings(symbol, -0.01)
pass
# we are invested, update data and sell if it's time
elif self.Portfolio[symbol].Invested:
if (
self.Portfolio[symbol].IsLong and symbol_data.long_signal['signal'] == 'sell'
or self.Portfolio[symbol].IsShort and symbol_data.short_signal['signal'] == 'sell'
):
self.ExitPosition(symbol)
invested.remove(symbol)
'''Need to figure out portfolio management next'''
if log_time:
log_time = False
elapsed_time = time.process_time() - t
self.Log(f"OnData - Completed for {str(len(self.symbol_data))} symbols in {str(elapsed_time)} seconds")
def ConsolidatedBarHandler(self, sender, consolidated):
'''
This is our event handler for our consolidated trade bar defined above in Initialize().
So each time the consolidator produces a new bar, this function will be called automatically.
Args:
sender (object): IDataConsolidator that invoked the event
consolidated (object): consolidated TradeBar
'''
pass
# if not CustomHelpers.IsMarketHours(self.algorithm):
# return
# self.vix_zscore.Update(consolidated)
# if self.vix_zscore.IsReady:
# self.vix_zscore_window.Add(self.vix_zscore.Current)
# if self.vix_zscore_window.IsReady:
# self.vix_zscore_max = max([x.Value for x in self.vix_zscore_window])
def CheckVixFilter(self):
'''Determines if our stored vix_zscore_max is in the range set by the vix_filter_max parameter'''
return True
return self.vix_zscore_max < self.vix_filter_max
def ExitPosition(self, symbol):
'''
Liquidates our holdings in a stock, updates last_exit_price tracking
and does data cleanup if necessary
'''
self.Liquidate(symbol, "Sell")
# some cleanup we need to do if this stock was only still in
# our universe because we were invested in it
if symbol in self.universe_whitelist:
symbol_data = self.symbol_data.pop(symbol, None)
if symbol_data is not None:
symbol_data.RemoveConsolidators()
self.selection_data.pop(symbol, None)
self.universe_whitelist.remove(symbol)
def UpdateMaxPortfolioValue(self):
    '''
    Raise the stored max_portfolio_value to the current
    self.Portfolio.TotalPortfolioValue when the latter is higher;
    otherwise the stored value is kept.
    '''
    current_value = self.Portfolio.TotalPortfolioValue
    # max() keeps its first argument unless the second is strictly greater
    self.max_portfolio_value = max(self.max_portfolio_value, current_value)
def IsInMaxDrawdownRange(self):
    '''
    Tell whether the portfolio value has fallen below the max drawdown
    threshold derived from max_drawdown_perc and max_portfolio_value.
    Returns:
        (bool): True when the current total portfolio value is strictly
        below (1 - max_drawdown_perc) * max_portfolio_value
    '''
    drawdown_floor = (1 - self.max_drawdown_perc) * self.max_portfolio_value
    return self.Portfolio.TotalPortfolioValue < drawdown_floor
from AlgorithmImports import *
from custom.CustomHelpers import CustomHelpers
from custom.EmaSampleStdDev import EmaSampleStdDev
from custom.ATRTrailingStop import ATRTrailingStop
from CustomEma import CustomEma
from datetime import timedelta
from collections import deque
class SymbolData:
    '''
    Per-symbol state required by the EMMA algorithm.
    Holds the strategy parameters, the indicators and rolling windows that
    are fed by this symbol's consolidated bars, and the latest long/short
    trade signals derived from them.
    '''

    def __init__(self, symbol, algorithm):
        '''
        Read the strategy parameters, create the indicators and rolling
        windows, and subscribe a trade bar consolidator for `symbol`.
        Args:
            symbol (object): symbol this instance tracks
            algorithm (object): owning algorithm (parameters, portfolio,
                logging and the subscription manager are read from it)
        '''
        self.symbol = symbol
        self.algorithm = algorithm

        self.consolidation_period = int(algorithm.GetParameter("bar-length-in-minutes"))
        self.ema_length = int(algorithm.GetParameter("ema-length"))
        self.deviation_length = int(algorithm.GetParameter("deviation-length"))
        self.atr_length = int(algorithm.GetParameter("atr-length"))
        self.atr_multiplier = float(algorithm.GetParameter("atr-multiplier"))
        self.ztp = float(algorithm.GetParameter("ztp"))
        self.short_ztp = float(algorithm.GetParameter("short-ztp"))
        self.bottom_zx = float(algorithm.GetParameter("bottom-zx"))
        self.top_zx = float(algorithm.GetParameter("top-zx"))
        self.crash_line = float(algorithm.GetParameter("crash-line"))
        self.z_trail_perc = float(algorithm.GetParameter("z-trail-percent"))
        self.short_z_trail_perc = float(algorithm.GetParameter("short-z-trail-percent"))

        # most recent consolidated bar; None until the consolidator fires
        self.bar = None

        # crossing counters used by the sell rules and the crash filter
        self.zcrossedzero = 0.0
        self.zcrossatbuy = 0.0
        self.dippedbelow4 = 0.0
        self.szcrossedSZTP = 0.0
        self.szcrossatbuy = 0.0

        # index 0 is the current bar's trail value, index 1 the previous bar's
        self.z_trail = deque([9999, 9999], maxlen=2)
        self.short_z_trail = deque([-9999, -9999], maxlen=2)

        # Options for long_signal and short_signal: None, buy, sell
        self.long_signal = { "signal": None, "zscore": 0 }
        self.short_signal = { "signal": None, "zscore": 0 }

        # rolling windows so we can access the previous bar / indicator values
        self.window = RollingWindow[TradeBar](2)
        self.ema_window = RollingWindow[IndicatorDataPoint](2)
        self.ema_std_dev_window = RollingWindow[IndicatorDataPoint](2)
        self.atr_tsl_window = RollingWindow[IndicatorDataPoint](2)
        self.position_window = RollingWindow[IndicatorDataPoint](2)

        # setup the indicators
        self.ema_std_dev = EmaSampleStdDev(
            '{}.EMA_SAMPLE_STD_DEV({}-{})'.format(symbol, self.ema_length, self.deviation_length),
            self.ema_length,
            self.deviation_length
        )
        self.ema = CustomEma(
            '{}.EMA({})'.format(symbol, self.ema_length),
            self.ema_length,
        )
        self.atr = AverageTrueRange('{}.ATR({})'.format(symbol, self.atr_length), self.atr_length, MovingAverageType.Wilders)
        self.atr_trailing_stop = ATRTrailingStop('{}.ATRTrailingStop'.format(symbol),self.atr_multiplier)

        # define our trade bar consolidator and attach our event handler; the
        # handler is called each time a new consolidated bar is produced
        self.bar_consolidator = TradeBarConsolidator(timedelta(minutes=self.consolidation_period))
        self.bar_consolidator.DataConsolidated += self.ConsolidatedBarHandler
        # add our consolidator to the manager to receive updates from the engine
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.bar_consolidator)

    def RegisterIndicators(self, algorithm):
        '''
        Register the ATR indicator so it is updated automatically from the
        consolidated bars.
        Args:
            algorithm (object): algorithm used to register the indicator
        '''
        algorithm.RegisterIndicator(self.symbol, self.atr, self.bar_consolidator)
        # the custom EMA, z-score and ATR trailing stop indicators are not
        # registered here because they are updated manually in the bar handler

    def RemoveConsolidators(self):
        '''Detach this symbol's bar consolidator from the subscription manager'''
        if self.bar_consolidator is not None:
            self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.bar_consolidator)

    def WarmUpIndicators(self, history, algorithm):
        '''
        Replay historical rows through the consolidator so the indicators and
        windows get populated.
        Args:
            history (object): dataframe with open/high/low/close/volume
                columns, iterated with iterrows()
            algorithm (object): unused; kept for interface compatibility
        '''
        for time, row in history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Open = row['open']
            tradeBar.High = row['high']
            tradeBar.Low = row['low']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self.bar_consolidator.Update(tradeBar)

    def ConsolidatedBarHandler(self, sender, consolidated):
        '''
        Event handler for this symbol's consolidated trade bars. Updates the
        indicators and windows and, once everything is ready, recomputes
        long_signal / short_signal.
        Args:
            sender (object): IDataConsolidator that invoked the event (unused)
            consolidated (object): consolidated TradeBar
        '''
        self.bar = consolidated

        # manually update our custom z-score and ema indicators
        self.ema.Update(self.bar)
        self.ema_std_dev.Update(self.bar)
        # add the consolidated bar data to our TradeBar window
        self.window.Add(consolidated)

        if self.atr.IsReady:
            # the ATRTrailingStop indicator is fed the AverageTrueRange
            # output, so it can only be updated once the ATR is ready
            self.atr_trailing_stop.Update(consolidated, self.atr.Current.Value)

        if not (
            self.ema.IsReady
            and self.ema_std_dev.IsReady
            and self.atr.IsReady
            and self.atr_trailing_stop.IsReady
        ):
            # don't start populating indicator windows (or doing anything else) until the indicators are ready
            return

        holding = self.algorithm.Portfolio[self.symbol]
        is_invested = holding.Invested
        is_long = is_invested and holding.IsLong
        is_short = is_invested and holding.IsShort
        current_position = 1 if is_long else -1 if is_short else 0

        # add consolidated indicator data to our indicator windows
        self.ema_window.Add(self.ema.Current)
        self.ema_std_dev_window.Add(self.ema_std_dev.Current)
        self.atr_tsl_window.Add(self.atr_trailing_stop.Current)
        self.position_window.Add(IndicatorDataPoint(self.algorithm.Time, current_position))

        if not (self.ema_std_dev_window.IsReady and self.atr_tsl_window.IsReady and self.position_window.IsReady):
            # don't do anything else until the rolling windows are ready
            return
        if self.algorithm.is_training_symbol_data:
            return
        # prevent signals from being generated outside of market hours
        if not CustomHelpers.IsMarketHours(self.algorithm):
            return

        self._update_cross_counters()
        self._update_z_trails(is_invested)

        buy_long_signal, buy_short_signal = self._buy_signals(is_invested)
        sell_long_signal = self._long_sell_signal(is_invested)
        sell_short_signal = self._short_sell_signal(is_invested)

        # decide what to do when several signals fire on the same bar
        long_signal = self._resolve_signal(
            is_invested, buy_long_signal, sell_long_signal, self.long_signal["signal"]
        )
        if long_signal != self.long_signal["signal"]:
            self.long_signal = { "signal": long_signal, "zscore": self.ema_std_dev.Current.Value }

        short_signal = self._resolve_signal(
            is_invested, buy_short_signal, sell_short_signal, self.short_signal["signal"]
        )
        if short_signal != self.short_signal["signal"]:
            self.short_signal = { "signal": short_signal, "zscore": self.ema_std_dev.Current.Value }

    def _update_cross_counters(self):
        '''Advance the z-score crossing counters and snapshot them on entries / crash dips'''
        if CustomHelpers.IndicatorCrossedValue(self.ema_std_dev_window, self.ztp, self.algorithm):
            self.zcrossedzero = self.zcrossedzero + 1
        if CustomHelpers.IndicatorCrossoverValue(self.position_window, 0.5, self.algorithm):
            self.zcrossatbuy = self.zcrossedzero
        if CustomHelpers.IndicatorCrossunderValue(self.ema_std_dev_window, self.crash_line, self.algorithm):
            self.dippedbelow4 = self.zcrossedzero
        if CustomHelpers.IndicatorCrossedValue(self.ema_std_dev_window, self.short_ztp, self.algorithm):
            self.szcrossedSZTP = self.szcrossedSZTP + 1
        if CustomHelpers.IndicatorCrossunderValue(self.position_window, -0.5, self.algorithm):
            self.szcrossatbuy = self.szcrossedSZTP

    def _update_z_trails(self, is_invested):
        '''Push the current bar's long and short z-score trail values onto their deques'''
        previous_z = self.ema_std_dev_window[1].Value
        current_z = self.ema_std_dev_window[0].Value

        # Before appendleft, index 0 still holds the PREVIOUS bar's trail
        # value, so that is what the running min/max has to be taken against
        # (index 1 would be the value from two bars ago).
        if previous_z < self.bottom_zx and not is_invested:
            trail_value = current_z * (1 - self.z_trail_perc)
            self.z_trail.appendleft(min(trail_value, self.z_trail[0]))
        else:
            self.z_trail.appendleft(99999)

        if previous_z > self.top_zx and not is_invested:
            short_trail_value = current_z * (1 - self.short_z_trail_perc)
            self.short_z_trail.appendleft(max(short_trail_value, self.short_z_trail[0]))
        else:
            self.short_z_trail.appendleft(-99999)

    def _buy_signals(self, is_invested):
        '''Return (buy_long, buy_short) for the current bar, logging entries when flat'''
        buy_long_signal = False
        buy_short_signal = False
        if (
            CustomHelpers.IndicatorCrossoverValueWindow(self.ema_std_dev_window, self.z_trail, self.algorithm)
            and self.CheckCrashFilter()
            and self.algorithm.CheckVixFilter()
        ):
            buy_long_signal = True
            if not is_invested:
                self.algorithm.Log(f"Buy Signal - {self.symbol.Value} - Long")
        if CustomHelpers.IndicatorCrossunderValueWindow(self.ema_std_dev_window, self.short_z_trail, self.algorithm):
            buy_short_signal = True
            if not is_invested:
                self.algorithm.Log(f"Buy Signal - {self.symbol.Value} - Short")
        return buy_long_signal, buy_short_signal

    def _long_sell_signal(self, is_invested):
        '''Return True when any long exit rule (ATR trailing stop, ATR bail, mean bail) fires'''
        long_atr_ts = False
        long_atr_bail = False
        long_mean_bail = False

        # Long ATR Trailing Stop
        if (
            CustomHelpers.EquityCrossunderIndicator(self.window, self.atr_tsl_window, self.algorithm)
            and self.zcrossedzero > self.zcrossatbuy
        ):
            long_atr_ts = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long ATR Trailing Stop")

        # Long ATR Bail
        # NOTE(review): the second condition compares a z-score with a close
        # price; the ATR trailing stop value may have been intended -- confirm
        if (
            CustomHelpers.IndicatorCrossoverValue(self.ema_std_dev_window, self.ztp, self.algorithm)
            and self.ema_std_dev_window[0].Value > self.window[0].Close
        ):
            long_atr_bail = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long ATR Bail")

        # Long Mean Bail (Ruh Roh)
        if is_invested and CustomHelpers.IndicatorCrossunderValue(
            self.ema_window,
            self.algorithm.Portfolio[self.symbol].AveragePrice,
            self.algorithm
        ):
            long_mean_bail = True
            self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Long Mean Bail")

        return long_atr_ts or long_atr_bail or long_mean_bail

    def _short_sell_signal(self, is_invested):
        '''Return True when any short exit rule (ATR trailing stop, ATR bail, mean bail) fires'''
        short_atr_ts = False
        short_atr_bail = False
        short_mean_bail = False

        # Short ATR Trailing Stop
        if (
            CustomHelpers.EquityCrossoverIndicator(self.window, self.atr_tsl_window, self.algorithm)
            and self.szcrossedSZTP > self.szcrossatbuy
        ):
            short_atr_ts = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short ATR Trailing Stop")

        # Short ATR Bail -- mirrors the long rule: it is the z-score window
        # that is tested against the z-score threshold short_ztp (the ATR
        # trailing stop window holds prices, not z-scores)
        # NOTE(review): as in the long rule, the z-score vs. close price
        # comparison below looks unintended -- confirm
        if (
            CustomHelpers.IndicatorCrossunderValue(self.ema_std_dev_window, self.short_ztp, self.algorithm)
            and self.ema_std_dev_window[0].Value < self.window[0].Close
        ):
            short_atr_bail = True
            if is_invested:
                self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short ATR Bail")

        # Short Mean Bail (Ruh Roh)
        if is_invested and CustomHelpers.IndicatorCrossoverValue(
            self.ema_window,
            self.algorithm.Portfolio[self.symbol].AveragePrice,
            self.algorithm
        ):
            short_mean_bail = True
            self.algorithm.Log(f"Sell Signal - {self.symbol.Value} - Short Mean Bail")

        return short_atr_ts or short_atr_bail or short_mean_bail

    @staticmethod
    def _resolve_signal(is_invested, buy, sell, previous):
        '''
        Combine a bar's buy/sell flags into a signal value.
        Returns:
            None when both fire while flat, 'sell' when both fire while
            invested, 'buy' / 'sell' when only one fires, and `previous`
            when neither fires
        '''
        if buy and sell:
            return 'sell' if is_invested else None
        if buy:
            return 'buy'
        if sell:
            return 'sell'
        return previous

    def CanEmit(self, update_close):
        '''
        Determines if this symbol can be traded on the current bar in OnData
        Args:
            update_close (float): current close price, typically passed from OnData
        Returns:
            bool: False until a consolidated bar has been received; otherwise
            whether the stored bar's close equals update_close and every
            indicator and rolling window is ready
        '''
        if self.bar is None:
            # no consolidated bar yet, so there is nothing to compare against
            return False
        return self.bar.Close == update_close \
            and self.ema.IsReady \
            and self.ema_std_dev.IsReady \
            and self.atr.IsReady \
            and self.atr_trailing_stop.IsReady \
            and self.window.IsReady \
            and self.ema_window.IsReady \
            and self.ema_std_dev_window.IsReady \
            and self.atr_tsl_window.IsReady \
            and self.position_window.IsReady

    def CheckCrashFilter(self):
        '''Returns True if we are NOT in a severe z-score dip'''
        return self.dippedbelow4 != self.zcrossedzero
# def ResetPriceTracking(self):
# self.highest_price_since_last_investment = 0
# self.trailing_stop_loss_price = 0
# # self.hard_stop_market_ticket = None
# def UpdateHighestPrice(self):
# if (self.bar.Close > self.highest_price_since_last_investment):
# self.highest_price_since_last_investment = self.bar.Close
# def IsInTrailingStopLossRange(self):
# return self.bar.Close < (1 - self.trailing_stop_loss_perc) * self.highest_price_since_last_investment
# def IsInHardStopLossRange(self):
# return self.bar.Close < (1 - self.stop_loss_perc) * self.algorithm.Portfolio[self.symbol].AveragePrice
# def IsInTakeProfitRange(self):
# return CustomHelpers.EquityCrossoverValue(
# self.window,
# self.algorithm.Portfolio[self.symbol].AveragePrice * (1 + self.ema_length),
# self.algorithm
# )
# def CheckHighFilter(self, returnSingleBuffer = False):
# # get the highest closing price between 25 bars back and the end of
# # the rolling window
# high_period = max(list(map(lambda bar: bar.Close, self.window))[25:])
# buffer_1 = high_period * self.high_buffer
# buffer_2 = high_period * self.low_buffer
# buffers = {
# 'buffer_1': buffer_1,
# 'buffer_2': buffer_2
# }
# return buffers[returnSingleBuffer] \
# if returnSingleBuffer != False \
# else self.bar.Close < buffer_1 and self.bar.Close > buffer_2
# # if returnSingleBuffer != False:
# # return buffers[returnSingleBuffer]
# # elif (returnSingleBuffer == False):
# # return self.bar.lose < buffer_1 and self.bar.lose > buffer_2
# def HasMovedEnough(self):
# if not self.algorithm.Portfolio[self.symbol].Invested:
# self.algorithm.Debug("HasMovedEnough called when not invested")
# return False
# avg_price = self.algorithm.Portfolio[self.symbol].AveragePrice
# mindown = avg_price * self.moved_enough_min_down
# minup = avg_price * self.moved_enough_min_up
# return mindown >= self.bar.Close or minup <= self.bar.Close
# def CheckLastExitFilter(self):
# exitbuffer1 = self.last_exit_price * self.last_exit_high_buffer
# exitbuffer2 = self.last_exit_price * self.last_exit_low_buffer
# return self.bar.Close < exitbuffer1 and self.bar.Close > exitbuffer2
from AlgorithmImports import *
from custom.CustomEma import CustomEma
from CustomHelpers import CustomHelpers
from QuantConnect.Data import Custom
from statsmodels.tsa.stattools import adfuller, kpss
import numpy as np
from datetime import timedelta
class UniverseSelectionData():
    '''
    Per-symbol data used by the fine universe selection of The Reverend
    algorithm: rolling windows of daily and hourly bars, an hourly EMA, and
    the filter parameters evaluated by CanTrade().
    '''

    def __init__(
        self,
        algorithm,
        symbol,
        hour_history,
        daily_history
    ):
        '''
        Read the selection parameters, create the windows and consolidators
        and seed the windows from history.
        Args:
            algorithm (object): owning algorithm (parameters and the
                subscription manager are read from it)
            symbol (object): symbol this instance tracks
            hour_history (object): dataframe of hourly bars with
                open/high/low/close/volume columns
            daily_history (object): dataframe of daily bars with
                close/volume columns
        '''
        self.ema_length = int(algorithm.GetParameter("ema-length"))
        self.bar_length_in_minutes = int(algorithm.GetParameter("bar-length-in-minutes"))
        self.min_avg_daily_dollar_volume = int(algorithm.GetParameter("min-avg-daily-dollar-volume"))
        self.min_avg_daily_stock_price = int(algorithm.GetParameter("min-avg-daily-stock-price"))
        self.universe_selection_sma_length = int(algorithm.GetParameter("selection-sma-length"))
        self.universe_selection_hour_bar_window_size = int(algorithm.GetParameter("selection-hour-bar-window-size"))
        self.adf_p_value = float(algorithm.GetParameter("adf-p-value"))
        self.adf_use_ema_diffs = int(algorithm.GetParameter("enable-adf-use-ema-diffs"))
        self.kpss_p_value = float(algorithm.GetParameter("kpss-p-value"))
        self.variance_ratio_p_value = float(algorithm.GetParameter("variance-ratio-p-value"))
        self.hurst_value = float(algorithm.GetParameter("hurst-value"))
        self.enable_sma_filter = int(algorithm.GetParameter("enable-sma-filter"))
        self.enable_volume_liquidity_filter = int(algorithm.GetParameter("enable-volume-liquidity-filter"))
        self.enable_price_qualifier_filter = int(algorithm.GetParameter("enable-price-qualifier-filter"))
        self.enable_adf_filter = int(algorithm.GetParameter("enable-adf-filter"))
        self.enable_kpss_filter = int(algorithm.GetParameter("enable-kpss-filter"))
        self.enable_hurst_variance_filter = int(algorithm.GetParameter("enable-hurst-variance-filter"))
        self.symbol = symbol
        self.algorithm = algorithm

        # the hourly EMA uses the trading EMA length divided by 12
        hour_ema_length = round(self.ema_length/12)
        self.ema = CustomEma(
            '{}.UniverseEMA({})'.format(symbol, hour_ema_length),
            hour_ema_length,
        )

        self.daily_bar_window = RollingWindow[TradeBar](self.universe_selection_sma_length)
        self.hour_ema_window = RollingWindow[IndicatorDataPoint](self.universe_selection_hour_bar_window_size)
        self.hour_bar_window = RollingWindow[TradeBar](self.universe_selection_hour_bar_window_size)

        # consolidators that keep feeding the windows after the history seed
        self.daily_consolidator = TradeBarConsolidator(timedelta(days=1))
        self.daily_consolidator.DataConsolidated += self.DailyBarHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.daily_consolidator)
        self.hours_consolidator = TradeBarConsolidator(timedelta(hours=1))
        self.hours_consolidator.DataConsolidated += self.HourlyBarHandler
        algorithm.SubscriptionManager.AddConsolidator(symbol, self.hours_consolidator)

        # seed the daily window (only close and volume are used for it)
        for time, row in daily_history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self.daily_bar_window.Add(tradeBar)

        # seed the hourly bar window, the EMA and the EMA window
        for time, row in hour_history.iterrows():
            tradeBar = TradeBar()
            tradeBar.Close = row['close']
            tradeBar.Open = row['open']
            tradeBar.High = row['high']
            tradeBar.Low = row['low']
            tradeBar.Volume = row['volume']
            tradeBar.Time = time
            tradeBar.Symbol = self.symbol
            self._add_hour_bar(tradeBar)

    def _add_hour_bar(self, bar):
        '''Store an hourly bar, update the EMA with it and record the EMA once it is ready'''
        self.hour_bar_window.Add(bar)
        self.ema.Update(bar)
        if self.ema.IsReady:
            self.hour_ema_window.Add(self.ema.Current)

    def IsReady(self):
        '''Returns True once the EMA and all three rolling windows are ready'''
        is_ready = self.daily_bar_window.IsReady \
            and self.ema.IsReady \
            and self.hour_ema_window.IsReady \
            and self.hour_bar_window.IsReady
        return is_ready

    def CanTrade(self):
        '''
        Determines if this symbol meets our trading criteria. Each filter is
        only evaluated (and its statistics only computed) when the matching
        enable-* parameter is set.
        Returns:
            (bool): False as soon as an enabled filter rejects the symbol,
            True otherwise
        '''
        trade_days_in_a_month = 21
        daily_close_prices = [x.Close for x in self.daily_bar_window]
        daily_trade_volumes = [x.Volume for x in self.daily_bar_window]

        #################################
        # Above Moving Average Criteria #
        #################################
        if self.enable_sma_filter:
            sma = sum(daily_close_prices[0:self.universe_selection_sma_length])/self.universe_selection_sma_length
            is_above_sma = daily_close_prices[0] > sma
            if not is_above_sma: return False

        # average close over the past month (21 trade days); shared by the
        # liquidity and price qualifiers
        if self.enable_volume_liquidity_filter or self.enable_price_qualifier_filter:
            average_price = sum(daily_close_prices[0:trade_days_in_a_month])/trade_days_in_a_month

        ######################################
        # Trade Volume & Liquidity Qualifier #
        ######################################
        # Share volume is converted to dollars to make it comparable between
        # symbols: month's total share volume times the month's average
        # price, divided by the number of days in the month.
        if self.enable_volume_liquidity_filter:
            combined_daily_share_volume = sum(daily_trade_volumes[0:trade_days_in_a_month])
            average_daily_dollar_volume = (combined_daily_share_volume * average_price) / trade_days_in_a_month
            has_enough_share_volume = average_daily_dollar_volume > self.min_avg_daily_dollar_volume
            if not has_enough_share_volume: return False

        ###################
        # Price Qualifier #
        ###################
        # Only trade things with an average price over X in the past month.
        if self.enable_price_qualifier_filter:
            has_high_enough_avg_price = average_price > self.min_avg_daily_stock_price
            if not has_high_enough_avg_price: return False

        if not (
            self.enable_adf_filter
            or self.enable_kpss_filter
            or self.enable_hurst_variance_filter
        ):
            # no statistical filter enabled: nothing else can reject the symbol
            return True

        close_ar = np.array([x.Close for x in self.hour_bar_window])

        ############
        # ADF Test #
        ############
        if self.enable_adf_filter:
            if self.adf_use_ema_diffs:
                # difference between the EMA and close price for every bar in the rolling window
                adf_input = np.array([self.hour_ema_window[idx].Value - val.Close for idx, val in enumerate(self.hour_bar_window)])
            else:
                adf_input = close_ar
            adf_result = adfuller(
                adf_input,
                None,
                'ctt'
            )
            adf_p = adf_result[1]
            if adf_p >= self.adf_p_value: return False

        #############
        # KPSS Test #
        #############
        if self.enable_kpss_filter:
            kpss_result = kpss(close_ar, 'ct')
            kpss_p = kpss_result[1]
            if kpss_p <= self.kpss_p_value: return False

        ###################################
        # Hurst Exponent & Variance Ratio #
        ###################################
        if self.enable_hurst_variance_filter:
            hurst = CustomHelpers.hurst(close_ar)
            vratio = CustomHelpers.vratio(close_ar)
            vratio_p = vratio[2]
            if hurst > self.hurst_value and vratio_p > self.variance_ratio_p_value: return False

        ##############
        # Good to go #
        ##############
        return True

    def DailyBarHandler(self, daily_bar, consolidated=None):
        '''
        Add a new daily trade bar to self.daily_bar_window.
        Works both as a DataConsolidated event handler, which is invoked
        with (sender, consolidated), and as a direct call with just the bar.
        Args:
            daily_bar (object): the daily TradeBar, or the event sender when
                `consolidated` is supplied
            consolidated (object): consolidated daily TradeBar passed by the
                consolidator event; None for direct calls
        '''
        bar = daily_bar if consolidated is None else consolidated
        self.daily_bar_window.Add(bar)

    def HourlyBarHandler(self, hourly_bar, consolidated=None):
        '''
        Add a new hourly trade bar to the hourly window and update the EMA.
        Works both as a DataConsolidated event handler, which is invoked
        with (sender, consolidated), and as a direct call with just the bar.
        Args:
            hourly_bar (object): the hourly TradeBar, or the event sender
                when `consolidated` is supplied
            consolidated (object): consolidated hourly TradeBar passed by the
                consolidator event; None for direct calls
        '''
        bar = hourly_bar if consolidated is None else consolidated
        self._add_hour_bar(bar)
# def RemoveConsolidators(self):
# if self.five_min_consolidator is not None:
# self.algorithm.SubscriptionManager.RemoveConsolidator(self.symbol, self.five_min_consolidator)
# def WarmUpIndicators(self, history):
# for time, row in history.iterrows():
# tradeBar = TradeBar()
# tradeBar.Close = row['close']
# tradeBar.Open = row['open']
# tradeBar.High = row['high']
# tradeBar.Low = row['low']
# tradeBar.Volume = row['volume']
# tradeBar.Time = time
# # tradeBar.Time = index[1]
# tradeBar.Symbol = self.symbol
# self.five_min_consolidator.Update(tradeBar)
# def FiveMinuteBarConsolidator(self, sender, consolidated):
# # self.algorithm.Log("FiveMinuteBarConsolidator Time :" + str(consolidated.Time))
# self.five_min_bar_window.Add(consolidated)
# self.ema.Update(consolidated)
# if self.ema.IsReady:
# self.five_min_ema_window.Add(self.ema.Current)