| Overall Statistics |
|
Total Orders 19017 Average Win 0.23% Average Loss -0.43% Compounding Annual Return 33.869% Drawdown 67.000% Expectancy 0.153 Start Equity 100000 End Equity 320900.58 Net Profit 220.901% Sharpe Ratio 0.742 Sortino Ratio 0.816 Probabilistic Sharpe Ratio 21.584% Loss Rate 25% Win Rate 75% Profit-Loss Ratio 0.55 Alpha 0.154 Beta 2.317 Annual Standard Deviation 0.46 Annual Variance 0.212 Information Ratio 0.901 Tracking Error 0.289 Treynor Ratio 0.147 Total Fees $2404.23 Estimated Strategy Capacity $28000000.00 Lowest Capacity Asset SBC R735QTJ8XC9X Portfolio Turnover 3.18% |
#region imports
from AlgorithmImports import *
#endregion
import talib
import numpy as np
weights = {}
weights["TASUKIGAP"] = 1.5
weights["SEPARATINGLINES"] = 1
weights["GAPSIDESIDEWHITE"] = .5
weights["HARAMI"] = 1.5
weights["HIKKAKE"] = 1.5
weights["HOMINGPIGEON"] = 1
weights["HAMMER"] = .5
weights["MARUBOZU"] = .5
weights["DARKCLOUDCOVER"] = -1.5
weights["3LINESTRIKE"] = -1.5
weights["ENGULFING"] = -1
weights["SHOOTINGSTAR"] = -.5
def get_score(rolling_window, size, trend):
O = np.array([rolling_window[i].Open for i in range(size)])
H = np.array([rolling_window[i].High for i in range(size)])
L = np.array([rolling_window[i].Low for i in range(size)])
C = np.array([rolling_window[i].Close for i in range(size)])
continuation_patterns = []
continuation_patterns.append(talib.CDLTASUKIGAP(O, H, L, C))
continuation_patterns.append(talib.CDLSEPARATINGLINES(O, H, L, C))
continuation_patterns.append(talib.CDLGAPSIDESIDEWHITE(O,H,L,C))
reversal_to_bull_patterns = []
reversal_to_bull_patterns.append(talib.CDLHARAMI(O,H,L,C))
reversal_to_bull_patterns.append(talib.CDLHIKKAKE(O,H,L,C))
reversal_to_bull_patterns.append(talib.CDLHOMINGPIGEON(O,H,L,C))
reversal_to_bull_patterns.append(talib.CDLHAMMER(O,H,L,C))
reversal_to_bull_patterns.append(talib.CDLMARUBOZU(O,H,L,C))
reversal_to_bear_patterns = []
reversal_to_bear_patterns.append(talib.CDLDARKCLOUDCOVER(O,H,L,C))
reversal_to_bear_patterns.append(talib.CDL3LINESTRIKE(O,H,L,C))
reversal_to_bear_patterns.append(talib.CDLENGULFING(O,H,L,C))
reversal_to_bear_patterns.append(talib.CDLSHOOTINGSTAR(O,H,L,C))
final_weight = 0
if trend == 1 or trend == -1:
for i in range(len(continuation_patterns)-1):
if continuation_patterns[i].any() > 0:
if i == 0:
# TASUKI GAP
final_weight += weights["TASUKIGAP"] * trend
elif i == 1:
# SEPARATING LINES
final_weight += weights["SEPARATINGLINES"] * trend
elif i == 2:
# GAP SIDE SIDE WHITE
final_weight += weights["GAPSIDESIDEWHITE"] * trend
elif trend == -.5:
for i in range(len(reversal_to_bull_patterns)-1):
if reversal_to_bull_patterns[i].any() > 0:
if i == 0:
# HARAMI
final_weight += weights["HARAMI"]
elif i == 1:
# HIKKAKE
final_weight += weights["HIKKAKE"]
elif i == 2:
# HOMING PIGEON
final_weight += weights["HOMINGPIGEON"]
elif i == 3:
# HAMMER
final_weight += weights["HAMMER"]
elif i == 4:
# MARUBOZU
final_weight += weights["MARUBOZU"]
elif trend == .5:
for i in range(len(reversal_to_bear_patterns)-1):
if reversal_to_bear_patterns[i].any() > 0:
if i == 0:
# DARK CLOUD COVER
final_weight += weights["DARKCLOUDCOVER"]
elif i == 1:
# 3 LINE STRIKE
final_weight += weights["3LINESTRIKE"]
elif i == 2:
# ENGULFING
final_weight += weights["ENGULFING"]
elif i == 3:
# SHOOTING STAR
final_weight += weights["SHOOTINGSTAR"]
return final_weight# region imports
from datetime import datetime
from AlgorithmImports import *
from collections import deque
import numpy as np
from candlestickScore import get_score
from trendCalculator import get_trend
class bollinger_holder:
def __init__(self, lower, middle, upper):
self.lower = lower
self.middle = middle
self.upper = upper
class CompetitionAlgorithm(QCAlgorithm):
class bollinger_holder:
def __init__(self, lower, middle, upper):
self.lower = lower
self.middle = middle
self.upper = upper
class macd_holder:
def __init__(self, fast, slow, signal, macd):
self.fast = fast
self.slow = slow
self.signal = signal
self.macd = macd
def Initialize(self):
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2024, 1, 1)
self.SetCash(100000)
self.SetWarmUp(15)
# Parameters:
self.final_universe_size = 25
self.candles_history_size = 11 # 1.5 days
self.clear_old_scores_days = 1
#self.min_buy_score = float(self.get_parameter("min_buy_score"))
#self.max_sell_score = float(self.get_parameter("max_sell_score"))
self.min_buy_score = .65
self.max_sell_score = .35
self.macd_lookback = 21 # 3 days
self.min_macd = .45
self.Bollinger_window_size = 14 # 2 days
self.SMA_rolling_window_length = 280 # 2 months
self.trend_order = 10
self.K_order = 2
self.RSIS_rolling_window_length = 500 # 2.5 months
# score weights
#self.ccs = float(self.get_parameter("ccs"))
#self.mcd = float(self.get_parameter("mcd"))
#self.bb = float(self.get_parameter("bb"))
#self.rs = float(self.get_parameter("rs"))
#self.ptf = float(self.get_parameter("ptf"))
#self.ccs = 1 - self.mcd - self.bb - self.rs - self.ptf
self.mcd = .15
self.bb = .1
self.rs = .3
self.ptf = .1
self.ccs = .35
self.trend_history_size = 500 # 2.5 month
self.macd_score_info = [-35, 38]
self.candlestick_score_info = [-20, 20]
# and all the weights for the candlestick patterns
# data holders
self.rollingWindows = {}
self.trend_rolling_windows = {}
self.price_movements = {}
self.check_agains = []
self.MACDS = {}
self.MACDS_rolling_windows = {}
self.Bollingers = {}
self.Bollingers_rolling_windows = {}
self.SMAS200 = {}
self.SMAS50 = {}
self.SMAS200_rolling_windows = {}
self.SMAS50_rolling_windows = {}
self.RSIS = {}
self.RSIS_rolling_windows = {}
# portfolio management
self.candleStickScores = {}
self.candlestick_test_dates_to_sell = {}
self.investedScores = {}
# Universe selection
self.rebalanceTime = self.time
self.activeStocks = set()
# Define universe
self.universe_type = self.get_parameter("universe_type")
if self.universe_type == "crypto":
self.add_universe(CryptoUniverse.coinbase(self.crypto_filter))
elif self.universe_type == "equity":
self.AddUniverse(self.CoarseFilter, self.FineFilter)
self.UniverseSettings.Resolution = Resolution.Hour
# Debug
self.macd_scores = []
self.candlestick_scores = []
self.final_scores = []
def crypto_filter(self, crypto):
sortedByVolume = sorted(crypto, key=lambda x: x.Volume, reverse=True)
return [x.Symbol for x in sortedByVolume][:10]
def CoarseFilter(self, coarse):
# Rebalancing monthly
if self.Time <= self.rebalanceTime:
return self.Universe.Unchanged
self.rebalanceTime = self.Time + timedelta(days=10000)
sortedByDollarVolume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
return [x.Symbol for x in sortedByDollarVolume if x.HasFundamentalData][:1000]
def FineFilter(self, fine):
sortedbyVolume = sorted(fine, key=lambda x: x.DollarVolume, reverse=True )
fine_output = [x.Symbol for x in sortedbyVolume if x.MarketCap > 0][:self.final_universe_size]
fine_output_str = ""
for x in fine_output:
fine_output_str += str(x).split(" ")[0] + " "
#self.Log("fine_output_str: " + fine_output_str)
return fine_output
def OnSecuritiesChanged(self, changes):
# close positions in removed securities
for x in changes.RemovedSecurities:
self.Liquidate(x.Symbol)
self.activeStocks.remove(x.Symbol)
# can't open positions here since data might not be added correctly yet
for x in changes.AddedSecurities:
self.activeStocks.add(x.Symbol)
if self.universe_type == "crypto":
self.rollingWindows[x.Symbol] = RollingWindow[QuoteBar](self.candles_history_size)
elif self.universe_type == "equity":
self.rollingWindows[x.Symbol] = RollingWindow[TradeBar](self.candles_history_size)
self.trend_rolling_windows[x.Symbol] = RollingWindow[float](self.trend_history_size)
self.MACDS[x.Symbol] = self.MACD(x.Symbol, 12, 26, 9, MovingAverageType.Simple, Resolution.HOUR)
self.MACDS_rolling_windows[x.Symbol] = deque(maxlen=self.macd_lookback)
self.Bollingers[x.Symbol] = self.BB(x.Symbol, 20, 2, MovingAverageType.Simple, Resolution.HOUR)
self.Bollingers_rolling_windows[x.Symbol] = deque(maxlen=self.Bollinger_window_size)
self.SMAS200[x.Symbol] = self.SMA(x.Symbol, 200, Resolution.HOUR)
self.SMAS50[x.Symbol] = self.SMA(x.Symbol, 50, Resolution.HOUR)
self.SMAS200_rolling_windows[x.Symbol] = RollingWindow[float](self.SMA_rolling_window_length)
self.SMAS50_rolling_windows[x.Symbol] = RollingWindow[float](self.SMA_rolling_window_length)
self.RSIS[x.Symbol] = self.RSI(x.Symbol, 14, MovingAverageType.Simple, Resolution.Hour)
self.RSIS_rolling_windows[x.Symbol] = RollingWindow[float](self.RSIS_rolling_window_length)
if self.universe_type == "crypto":
history = self.History[QuoteBar](x.Symbol, self.trend_history_size, Resolution.Hour)
elif self.universe_type == "equity":
history = self.History[TradeBar](x.Symbol, self.trend_history_size, Resolution.Hour)
for bar in history:
self.rollingWindows[x.Symbol].Add(bar)
self.trend_rolling_windows[x.Symbol].Add(bar.Close)
self.MACDS[x.Symbol].Update(bar.EndTime, bar.Close)
new_macd = self.macd_holder(self.MACDS[x.Symbol].Fast.Current.Value, self.MACDS[x.Symbol].Slow.Current.Value, self.MACDS[x.Symbol].Signal.Current.Value, self.MACDS[x.Symbol].Current.Value)
self.MACDS_rolling_windows[x.Symbol].append(new_macd)
self.Bollingers[x.Symbol].Update(bar.EndTime, bar.Close)
new_bol = bollinger_holder(self.Bollingers[x.Symbol].LowerBand.Current.Value, self.Bollingers[x.Symbol].MiddleBand.Current.Value, self.Bollingers[x.Symbol].UpperBand.Current.Value)
self.Bollingers_rolling_windows[x.Symbol].append(new_bol)
self.SMAS200[x.Symbol].Update(bar.EndTime, bar.Close)
self.SMAS50[x.Symbol].Update(bar.EndTime, bar.Close)
self.RSIS[x.Symbol].Update(bar.EndTime, bar.Close)
self.RSIS_rolling_windows[x.Symbol].Add(self.RSIS[x.Symbol].Current.Value)
def OnData(self, data):
#self.activeStocks = set()
#self.activeStocks.add(self.AddEquity("AAPL").Symbol)
self.finalScores = {}
#self.activeStocks = set()
#self.activeStocks.add(self.AddEquity("SPY", Resolution.Hour).Symbol)
for symbol in self.activeStocks:
# if symbol not in data slice return
if self.universe_type == "crypto":
if not data.QuoteBars.ContainsKey(symbol) or data.QuoteBars[symbol] is None:
return
elif self.universe_type == "equity":
if not data.ContainsKey(symbol) or data[symbol] is None:
return
if not self.rollingWindows[symbol].IsReady:
self.rollingWindows[symbol].Add(data[symbol])
return
if not self.MACDS[symbol].IsReady:
return
if self.universe_type == "crypto":
self.rollingWindows[symbol].Add(data.QuoteBars[symbol])
elif self.universe_type == "equity":
self.rollingWindows[symbol].Add(data[symbol])
self.trend_rolling_windows[symbol].Add(data[symbol].Close)
self.Bollingers_rolling_windows[symbol].append(self.bollinger_holder(self.Bollingers[symbol].LowerBand.Current.Value, self.Bollingers[symbol].MiddleBand.Current.Value, self.Bollingers[symbol].UpperBand.Current.Value))
self.MACDS_rolling_windows[symbol].append(self.macd_holder(self.MACDS[symbol].Fast.Current.Value, self.MACDS[symbol].Slow.Current.Value, self.MACDS[symbol].Signal.Current.Value, self.MACDS[symbol].Current.Value))
self.SMAS200_rolling_windows[symbol].Add(self.SMAS200[symbol].Current.Value)
self.SMAS50_rolling_windows[symbol].Add(self.SMAS50[symbol].Current.Value)
self.RSIS_rolling_windows[symbol].Add(self.RSIS[symbol].Current.Value)
#if self.time.hour == 10 and self.time.minute == 0:
self.plot_indicators(symbol, data)
rolling_data = [x for x in self.trend_rolling_windows[symbol]]
self.Log("rolling_Data: " + str(rolling_data))
price_trend_info = get_trend(self.trend_rolling_windows[symbol], self.trend_order, self.K_order)
price_trend = price_trend_info[0]
price_recent_swing = price_trend_info[1]
price_total_swing = price_trend_info[2]
#self.Plot("trend", "recent_swing", recent_swing)
#self.Plot("trend", "total_swing", total_swing)
#self.Plot("trend", "price", data[symbol].Close)
if price_recent_swing <= 0 and price_total_swing <= 0:
price_trend = -1
price_trend_factor = .4
elif price_recent_swing >= 0 and price_total_swing >= 0:
price_trend = 1
price_trend_factor = .6
elif price_recent_swing <= 0 and price_total_swing >= 0:
price_trend = .5
price_trend_factor = .55
elif price_recent_swing >= 0 and price_total_swing <= 0:
price_trend = -.5
price_trend_factor = .45
self.Log("Trend: " + str(price_trend))
self.Plot("trend", "price_trend", price_trend * 100)
rsi_trend_info = get_trend(self.RSIS_rolling_windows[symbol], self.trend_order, self.K_order)
rsi_trend = rsi_trend_info[0]
rsi_recent_swing = rsi_trend_info[1]
rsi_total_swing = rsi_trend_info[2]
if rsi_recent_swing <= 0 and rsi_total_swing <= 0:
rsi_trend = -1
elif rsi_recent_swing >= 0 and rsi_total_swing >= 0:
rsi_trend = 1
elif rsi_recent_swing <= 0 and rsi_total_swing >= 0:
rsi_trend = .5
elif rsi_recent_swing >= 0 and rsi_total_swing <= 0:
rsi_trend = -.5
self.plot("trend", "rsi_trendd", rsi_trend * 100)
rsi_score = self.get_rsi_score(price_trend, rsi_trend)
self.Plot("rsi score", "score", rsi_score)
# get a candlestick score
candlestick_score = get_score(self.rollingWindows[symbol], self.candles_history_size, price_trend)
if symbol not in self.candleStickScores:
self.candleStickScores[symbol] = []
self.candleStickScores[symbol].append((self.time, candlestick_score))
else:
self.candleStickScores[symbol].append((self.time, candlestick_score))
# clear out old scores
for x in self.candleStickScores[symbol]:
if self.time - x[0] > timedelta(days=self.clear_old_scores_days):
self.candleStickScores[symbol].remove(x)
cumulative_candlestick_score = np.sum(x[1] for x in self.candleStickScores[symbol])
cumulative_candlestick_score = (cumulative_candlestick_score - self.candlestick_score_info[0]) / (self.candlestick_score_info[1] - self.candlestick_score_info[0])
self.Plot("candlestick score", "score", cumulative_candlestick_score)
self.Plot("price", "price", data[symbol].Close)
if cumulative_candlestick_score != 0:
self.candlestick_scores.append(cumulative_candlestick_score)
# get a macd score between 0 and 1
macd_score = self.get_macd_score(symbol, price_trend)
self.Log("macd score after normalization: " + str(macd_score))
if macd_score != 0:
self.macd_scores.append(macd_score)
self.Plot("macd score", "score", macd_score)
bollinger_score = float(self.get_bollinger_score(symbol, price_trend))
self.Plot("bollinger score", "score", bollinger_score)
self.finalScores[symbol] = self.ccs * cumulative_candlestick_score + self.mcd * macd_score + self.bb * bollinger_score + self.rs * rsi_score + self.ptf * price_trend_factor
self.final_scores.append(self.finalScores[symbol])
self.Log("Final score: " + str(self.finalScores[symbol]))
self.Plot("final score", "score", self.finalScores[symbol])
if self.time.hour == 10 and self.time.minute == 0:
self.log_score_details()
self.BuyAndSell()
def BuyAndSell(self):
score_total = 0
for symbol in self.finalScores.keys():
if self.finalScores[symbol] >= self.min_buy_score:
score_total += self.finalScores[symbol]
if self.finalScores[symbol] <= self.max_sell_score:
self.Liquidate(symbol)
self.Log("Selling " + str(symbol) + " with score: " + str(self.finalScores[symbol]))
self.investedScores.pop(symbol)
for symbol in self.finalScores.keys():
if score_total != 0:
if self.finalScores[symbol] >= self.min_buy_score:
self.SetHoldings(symbol, self.finalScores[symbol]/score_total)
self.Log("Buying " + str(symbol) + " with score: " + str(self.finalScores[symbol]))
self.investedScores[symbol] = self.finalScores[symbol]
def get_rsi_score(self, price_trend, rsi_trend):
if price_trend == 1:
if rsi_trend == 1:
return .8
elif rsi_trend == -1:
return .45
elif rsi_trend == .5:
return .66
elif rsi_trend == -.5:
return .5
elif price_trend == -1:
if rsi_trend == 1:
return .55
elif rsi_trend == -1:
return .25
elif rsi_trend == .5:
return .45
elif rsi_trend == -.5:
return .35
elif price_trend == .5:
if rsi_trend == 1:
return .66
elif rsi_trend == -1:
return .35
elif rsi_trend == .5:
return .55
elif rsi_trend == -.5:
return .45
elif price_trend == -.5:
if rsi_trend == 1:
return .6
elif rsi_trend == -1:
return .35
elif rsi_trend == .5:
return .55
elif rsi_trend == -.5:
return .45
return 0
def get_macd_score(self, symbol, trend):
self.Log("macd: " + str(self.MACDS[symbol].Current.Value))
self.Plot("real_macd: ", "macd", self.MACDS[symbol].Current.Value)
macds = [x.macd for x in self.MACDS_rolling_windows[symbol]]
is_above = None
if trend == 1:
# check for a cross downward
for i in range(len(macds) - 1):
if macds[i] >= 0:
if is_above == None:
is_above = True
elif is_above == False:
return .65
elif macds[i] < 0:
if is_above == None:
is_above = False
elif is_above == True:
return .45
elif trend == -1:
for i in range(len(macds) - 1):
if macds[i] >= 0:
if is_above == None:
is_above = True
elif is_above == False:
return .65
elif macds[i] < 0:
if is_above == None:
is_above = False
elif is_above == True:
return .45
elif trend == -.5:
for i in range(len(macds) -1):
if macds[i] >= 0:
if is_above == None:
is_above ==True
elif is_above == False:
return .65
elif macds[i] < 0:
if is_above == None:
is_above = False
elif is_above == True:
return .45
elif trend == .5:
for i in range(len(macds) -1):
if macds[i] >= 0:
if is_above == None:
is_above = True
elif is_above == False:
return .65
elif macds[i] < 0:
if is_above == None:
is_above = False
elif is_above == True:
return .45
return self.MACDS[symbol].Current.Value/5 + .5
def get_bollinger_score(self, symbol, trend):
lowers = [x.lower for x in self.Bollingers_rolling_windows[symbol]]
middles = [x.middle for x in self.Bollingers_rolling_windows[symbol]]
uppers = [x.upper for x in self.Bollingers_rolling_windows[symbol]]
prices = [x for x in self.trend_rolling_windows[symbol]]
self.Log("length of prices: " + str(len(prices)))
prices = prices[:self.Bollinger_window_size]
self.Log("Length of prices after slicing: " + str(len(prices)))
above_upper = 0
middle_upper = 0
lower_middle = 0
below_lower = 0
for i in range(len(lowers)):
low = lowers[i]
middle = middles[i]
high = uppers[i]
price = prices[i]
if price >= high:
above_upper += 1
elif price >= middle:
middle_upper += 1
elif price >= low:
lower_middle += 1
else:
below_lower += 1
self.Log("above_upper: " + str(above_upper))
self.Log("middle_upper: " + str(middle_upper))
self.Log("lower_middle: " + str(lower_middle))
self.Log("below_lower: " + str(below_lower))
if trend == 1:
self.Log("trend is 1, calculating bollinger score")
score = middle_upper + .66 * above_upper + .33 * lower_middle + 0
score = score / len(prices)
elif trend == -1:
self.Log("trend is -1, calculating bollinger score")
score = .33 * below_lower + .66 * middle_upper + above_upper + 0
score = score/ len(prices)
elif trend == .5:
score = 0 + .33 * lower_middle + .5 * middle_upper + .66 * above_upper
score = score / len(prices)
elif trend == -.5:
score = middle_upper * .85 + .55 * middle_upper + .33 * lower_middle + 0
score = score / len(prices)
self.Log("Bollinger score: " + str(score))
return score
def plot_indicators(self, symbol, data):
#self.Plot("macd", "fast", self.MACDS[symbol].Fast.Current.Value)
#self.Plot("macd", "slow", self.MACDS[symbol].Slow.Current.Value)
#self.Plot("macd", "signal", self.MACDS[symbol].Signal.Current.Value)
self.Plot("macd", "macd", self.MACDS[symbol].Current.Value)
self.Plot("smas", "50", self.SMAS50[symbol].Current.Value)
self.Plot("smas", "200", self.SMAS200[symbol].Current.Value)
self.Plot("bollinger", "lower", self.Bollingers[symbol].LowerBand.Current.Value)
self.Plot("bollinger", "middle", self.Bollingers[symbol].MiddleBand.Current.Value)
self.Plot("bollinger", "upper", self.Bollingers[symbol].UpperBand.Current.Value)
self.Plot("bollinger", "price", data[symbol].Close)
def log_score_details(self):
# log macd average, min, max
if len(self.macd_scores) > 0:
self.Log("MACD average: " + str(np.mean(self.macd_scores)))
self.Log("MACD min: " + str(np.min(self.macd_scores)))
self.Log("MACD max: " + str(np.max(self.macd_scores)))
self.Log("MACD median: " + str(np.median(self.macd_scores)))
self.Log("MACD std: " + str(np.std(self.macd_scores)))
# log candlestick average, min, max
if len(self.candlestick_scores) > 0:
self.Log("Candlestick average: " + str(np.mean(self.candlestick_scores)))
self.Log("Candlestick min: " + str(np.min(self.candlestick_scores)))
self.Log("Candlestick max: " + str(np.max(self.candlestick_scores)))
self.Log("Candlestick median: " + str(np.median(self.candlestick_scores)))
self.Log("Candlestick std: " + str(np.std(self.candlestick_scores)))
# log final average, min, max
self.Log("Final average: " + str(np.mean(self.final_scores)))
self.Log("Final min: " + str(np.min(self.final_scores)))
self.Log("Final max: " + str(np.max(self.final_scores)))
self.Log("Final median: " + str(np.median(self.final_scores)))
self.Log("Final std: " + str(np.std(self.final_scores)))
#region imports
from AlgorithmImports import *
#endregion
def get_targets(invested_scores, new_scores, uninvested_capital, invested_capital):
return None
#region imports
from AlgorithmImports import *
#endregion
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.signal import argrelextrema
from collections import deque
from matplotlib.lines import Line2D
from datetime import timedelta
'''
Much of this code is sourced at the following link: https://raposa.trade/blog/higher-highs-lower-lows-and-calculating-price-trends-in-python/
'''
def getHigherLows(data: np.array, order, K):
'''
Finds consecutive higher lows in price pattern.
Must not be exceeded within the number of periods indicated by the width
parameter for the value to be confirmed.
K determines how many consecutive lows need to be higher.
'''
# Get lows
low_idx = argrelextrema(data, np.less, order=order)[0]
lows = data[low_idx]
# Ensure consecutive lows are higher than previous lows
extrema = []
ex_deque = deque(maxlen=K)
for i, idx in enumerate(low_idx):
if i == 0:
ex_deque.append(idx)
continue
if lows[i] < lows[i-1]:
ex_deque.clear()
ex_deque.append(idx)
if len(ex_deque) == K:
extrema.append(ex_deque.copy())
return extrema
def getLowerHighs(data: np.array, order=5, K=2):
'''
Finds consecutive lower highs in price pattern.
Must not be exceeded within the number of periods indicated by the width
parameter for the value to be confirmed.
K determines how many consecutive highs need to be lower.
'''
# Get highs
high_idx = argrelextrema(data, np.greater, order=order)[0]
highs = data[high_idx]
# Ensure consecutive highs are lower than previous highs
extrema = []
ex_deque = deque(maxlen=K)
for i, idx in enumerate(high_idx):
if i == 0:
ex_deque.append(idx)
continue
if highs[i] > highs[i-1]:
ex_deque.clear()
ex_deque.append(idx)
if len(ex_deque) == K:
extrema.append(ex_deque.copy())
return extrema
def getHigherHighs(data: np.array, order, K):
'''
Finds consecutive higher highs in price pattern.
Must not be exceeded within the number of periods indicated by the width
parameter for the value to be confirmed.
K determines how many consecutive highs need to be higher.
'''
# Get highs
high_idx = argrelextrema(data, np.greater, order = order)[0]
highs = data[high_idx]
# Ensure consecutive highs are higher than previous highs
extrema = []
ex_deque = deque(maxlen=K)
for i, idx in enumerate(high_idx):
if i == 0:
ex_deque.append(idx)
continue
if highs[i] < highs[i-1]:
ex_deque.clear()
ex_deque.append(idx)
if len(ex_deque) == K:
extrema.append(ex_deque.copy())
return extrema
def getLowerLows(data: np.array, order, K):
'''
Finds consecutive lower lows in price pattern.
Must not be exceeded within the number of periods indicated by the width
parameter for the value to be confirmed.
K determines how many consecutive lows need to be lower.
'''
# Get lows
low_idx = argrelextrema(data, np.less, order=order)[0]
lows = data[low_idx]
# Ensure consecutive lows are lower than previous lows
extrema = []
ex_deque = deque(maxlen=K)
for i, idx in enumerate(low_idx):
if i == 0:
ex_deque.append(idx)
continue
if lows[i] > lows[i-1]:
ex_deque.clear()
ex_deque.append(idx)
if len(ex_deque) == K:
extrema.append(ex_deque.copy())
return extrema
def get_trend(close_data, order, K):
'''
Get the trend of the stock
'''
close_data = [x for x in close_data]
close_data.reverse()
# data set to dataframe empty
data = pd.DataFrame()
data['Close'] = close_data
close = data['Close'].values
hh = getHigherHighs(close, order, K)
hl = getHigherLows(close, order, K)
ll = getLowerLows(close, order, K)
lh = getLowerHighs(close, order, K)
# format for tuples inside patterns: [type, location first price, location second price, first price, second price]
patterns = []
for pattern in hh:
# append a tuple with date and "hh"
patterns.append(('hh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]]))
for pattern in hl:
patterns.append(('hl', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]]))
for pattern in ll:
patterns.append(('ll', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]]))
for pattern in lh:
patterns.append(('lh', pattern[0], pattern[1], close[pattern[0]], close[pattern[1]]))
# sort by the second date
patterns.sort(key=lambda x: x[2], reverse=True)
trend = 0
recent_movements_length = 3
recent_movements = patterns[:recent_movements_length]
recent_swing_up = 0
recent_swing_down = 0
for x in recent_movements:
if x[0] == 'hh' or x[0] == 'hl':
recent_swing_up += (x[4] - x[3])
print("hh or hl, adding: ", (x[4] - x[3]))
else:
recent_swing_down += (x[4] - x[3])
print("ll or lh, adding: ", (x[4] - x[3]))
recent_swing = recent_swing_up + recent_swing_down
print("recent_swing: ", recent_swing)
total_movements_length = 10
total_movements = patterns[:total_movements_length]
total_swing_up = 0
total_swing_down = 0
for x in total_movements:
if x[0] == 'hh' or x[0] == 'hl':
total_swing_up += (x[4] - x[3])
else:
total_swing_down += (x[4] - x[3])
total_swing = total_swing_up + total_swing_down
print("total_swing: ", total_swing)
return (trend, recent_swing, total_swing)