| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio 0 Tracking Error 0 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset |
from clr import AddReference
AddReference("QuantConnect.Research")
#clr.AddReference('QuantConnect.Research')
from QuantConnect.Research import QuantBook
import statistics
import pandas as pd
import numpy as np
import math
import time as tm
from sklearn.linear_model import LinearRegression
import statsmodels.formula.api as smf
import statsmodels.api as sm
import copy
from scipy.stats import zscore
import funcs
class TachyonMultidimensionalChamber(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2021, 10, 19) # Set Start Date
self.SetEndDate(2021, 10, 19) # Set End Date
self.symbol = "" #set this to "" and it will do all symbols
self.SetCash(400000) # Set Strategy Cash
self.AddUniverse(self.CoarseSelectionFunction)
self.SetSecurityInitializer(self.SecurityInitializer)
self.UniverseSettings.ExtendedMarketHours = True
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.UniverseSettings.Leverage = 4
self.UniverseSettings.Resolution = Resolution.Second #can comment/change this out
#self.SetWarmUp(5)
for s in self.Securities:
self.Debug(self.Securities[s])
###variables to keep track of
self.sd = {} #all symbol data
self.initVars()
def initVars(self):
self.pastNoPre = {}
self.moves = {} #this has both PM and RH moves
self.ranges = {}
self.curr_ranges = {} #ranges for a certain day, for updateRangesMultiple()
self.pastSixty = {}
self.models = {}
#pm
self.pmHigh = {}
self.pmVol = {}
self.pmNonZeroVol = {}
self.everBelowOneDollar = set()
self.PM_atSelection = {}
#
self.prevVWMA = {}
self.prevVWAP = {}
self.end = tm.time()
self.start = tm.time()
self.dailyLevels = {}
self.hod = {}
self.lod = {}
self.hod_level = {} #hod and lod for calculating daily range
self.lod_level = {}
self.open = {}
self.minutes = {} #n minutes of data for each symbol
self.everPosition = {}
self.buffer = 0 #only enter trades after N buffer after open (prob just 1 minute)
#error reports
self.max_arg_empty_seq = set()
self.calc_EMA_error = set()
self.VWMA_vol_zero = set()
self.EMAwPMerror = set()
self.EMAerrorLong = set()
self.EMAerrorShort = set()
self.EODmodelErr = set()
self.avgdevoob = set()
self.notInPMHigh = set()
#prevPrices, for EMA calc -- if they are the same, don't need to recalculate EMA
self.prevPrices = {}
self.prevPrices["ab"] = {}
self.prevPrices["ab_wPM"] = {}
self.prevPrices["bc"] = {}
self.prevPrices["cd"] = {}
self.prevPrices["postbreak"] = {}
self.prevPrices["ab_inpos"] = {}
self.prevStartPeriod = {}
self.prevStartPeriod["ab"] = {}
self.prevStartPeriod["ab_wPM"] = {}
self.prevStartPeriod["bc"] = {}
self.prevStartPeriod["cd"] = {}
self.prevStartPeriod["postbreak"] = {}
self.prevStartPeriod["ab_inpos"] = {}
#EOD model things
self.EODmodels = {}
self.EODmodels_entry = {}
#times (just regular hours), debugging
self.times = {}
self.s_times = {}
#end day early
self.stop_trading = 0
self.stop_entering = 0
#check these on shorter tf than 1 minute
self.staging = {}
def SecurityInitializer(self, security):
security.SetLeverage(4)
def CoarseSelectionFunction(self, universe):
selected = []
blacklist = set()
#blacklist = set(["IBM", "WMT", "IST", "MNDL", "FCAU", "TEVIY", "P", "HSGX", "CLNP", "PSTH"])
#PSTH takes so long for some reason
for coarse in universe:
#if coarse.Volume > 5000000 and coarse.Value > 1 and coarse.HasFundamentalData:
if self.symbol != "":
if coarse.Symbol.Value == self.symbol:
symbol = coarse.Symbol
selected.append(symbol)
elif coarse.Volume > 20000000 and coarse.Value > .90 and coarse.HasFundamentalData:
if coarse.Symbol.Value not in blacklist:
symbol = coarse.Symbol
selected.append(symbol)
return selected #list of objects of type Symbol
def OnSecuritiesChanged(self, changed):
for security in changed.AddedSecurities:
symbol = security.Symbol
if symbol not in self.sd:
self.sd[symbol] = SymbolData(self, symbol)
for security in changed.RemovedSecurities:
symbol = security.Symbol
self.sd.pop(symbol, None)
###reset these every day
def OnEndOfDay(self):
for s in self.sd:
self.sd[s].vwap.Reset()
self.initVars()
################
# functions #
################
#green or doji --> green
def red_or_green(o, h, l, c):
if c >= o:
return "g"
elif c < o:
return "r"
#add to "candle_halves", every half of candle
def updateCandleHalves(self, s, mins, opened, high, low, close, vol, typeOfCandle, t):
#new point every 20 seconds
#change between earlier high or low depending on if candle red or green
#init new move
if "candle_halves" not in self.moves[s][t] or typeOfCandle == "low":
self.moves[s][t]["candle_halves"] = {}
self.moves[s][t]["candle_halves"]["Price"] = []
self.moves[s][t]["candle_halves"]["Volume"] = []
self.moves[s][t]["candle_halves"]["Time"] = []
self.moves[s][t]["candle_halves"]["Time_wall"] = []
#add first candle, start at low
self.moves[s][t]["candle_halves"]["Price"].append(low)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 3.0)
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
if close >= opened:
self.moves[s][t]["candle_halves"]["Price"].append(high)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 3.0)
self.moves[s][t]["candle_halves"]["Time"].append(mins - .667)
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
self.moves[s][t]["candle_halves"]["Time"].append(mins - .333)
#always add the close
self.moves[s][t]["candle_halves"]["Price"].append(close)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 6.0)
self.moves[s][t]["candle_halves"]["Time"].append(mins)
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
else:
#add new 1/3 candles
#add 1/6 of volume to old close
#1/3 to low, 1/3 to high
#1/6 to current close
self.moves[s][t]["candle_halves"]["Price"][-1] = (self.moves[s][t]["candle_halves"]["Price"][-1] + opened) / 2.0
self.moves[s][t]["candle_halves"]["Volume"][-1] += vol / 6.0
if close >= opened:
self.moves[s][t]["candle_halves"]["Price"].append(low)
self.moves[s][t]["candle_halves"]["Price"].append(high)
elif close < opened:
self.moves[s][t]["candle_halves"]["Price"].append(high)
self.moves[s][t]["candle_halves"]["Price"].append(low)
#these are always the same
self.moves[s][t]["candle_halves"]["Price"].append(close)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 3.0)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 3.0)
self.moves[s][t]["candle_halves"]["Volume"].append(vol / 6.0)
self.moves[s][t]["candle_halves"]["Time"].append(mins - .667)
self.moves[s][t]["candle_halves"]["Time"].append(mins - .333)
self.moves[s][t]["candle_halves"]["Time"].append(mins)
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
self.moves[s][t]["candle_halves"]["Time_wall"].append(str(self.Time).split()[1])
return 0
#get high and low of move, volume
#and convert candles to 1/4 candles
def updateMoves(self, s, opened, high, low, close, vol, t):
#t = time = "PM" or "RH"
#init for either t
if s not in self.moves:
self.moves[s] = {}
##############################
# ignition and consolidation #
##############################
mins = self.timeInMin(str(self.Time).split()[1])
#new low, reset everything
if (t in self.moves[s] and low < self.moves[s][t]["low"]) or t not in self.moves[s]:
#update HOD
#it's the highest high of all previous moves
##so, can only exist if there is a new move
if t == "RH" and t in self.moves[s]:
#if s not in self.hod_level or self.moves[s]["RH"]["high"] > self.hod_level[s]:
if s not in self.hod_level or self.hod[s] > self.hod_level[s]:
#self.Debug("update hod_level")
#self.Debug(self.Time)
#self.Debug(self.moves[s]["RH"]["high"])
#self.hod_level[s] = self.moves[s]["RH"]["high"]
self.hod_level[s] = self.hod[s]
#update move
self.moves[s][t] = {}
self.moves[s][t]["low"] = low
self.moves[s][t]["consol_low"] = "NA"
#self.moves[s][t]["ign_vol"] = vol
#self.moves[s][t]["consol_vol"] = 0
self.moves[s][t]["candles"] = {} #reset all candles
#half candles
self.updateCandleHalves(s, mins, opened, high, low, close, vol, "low", t)
#if first candle is red and is max of move, then high won't exist in candle_halves
self.moves[s][t]["high"] = max(self.moves[s][t]["candle_halves"]["Price"])
#if self.Time.hour == 9 and 30 < self.Time.minute < 40:
# self.Debug(self.Time)
# self.Debug(self.moves[s][t]["candle_halves"]["Price"])
#new high (post new low)
elif high > self.moves[s][t]["high"]:
self.moves[s][t]["high"] = high
#self.moves[s][t]["ign_vol"] += vol + self.moves[s][t]["consol_vol"]
self.moves[s][t]["consol_low"] = "NA"
#self.moves[s][t]["consol_vol"] = 0
#half candles
self.updateCandleHalves(s, mins, opened, high, low, close, vol, "high", t)
#consolidation (no new high)
else:
#self.moves[s][t]["consol_vol"] += vol
#new low of consolidation
if self.moves[s][t]["consol_low"] == "NA" or low < self.moves[s][t]["consol_low"]:
self.moves[s][t]["consol_low"] = low
self.updateCandleHalves(s, mins, opened, high, low, close, vol, "consol_low", t)
#always add new candle
if "o" not in self.moves[s][t]["candles"]:
self.moves[s][t]["candles"]["o"] = []
self.moves[s][t]["candles"]["h"] = []
self.moves[s][t]["candles"]["l"] = []
self.moves[s][t]["candles"]["c"] = []
self.moves[s][t]["candles"]["v"] = []
self.moves[s][t]["candles"]["dv"] = []
self.moves[s][t]["candles"]["o"].append(opened)
self.moves[s][t]["candles"]["h"].append(high)
self.moves[s][t]["candles"]["l"].append(low)
self.moves[s][t]["candles"]["c"].append(close)
self.moves[s][t]["candles"]["v"].append(vol)
self.moves[s][t]["candles"]["dv"].append(vol*close)
return 0
def enter(self, s, close):
##############
# above VWAP #
##############
vwap = self.sd[s].vwap.Current.Value
if not (close > vwap): return #above VWAP
#doesn't need to break VWAP, just be above it
######################
# resolution too low #
######################
if not (len(self.moves[s]["RH"]["candles"]["o"]) >= 4): return
###################################
# enough volume, or dollar volume #
###################################
if not (statistics.median(self.moves[s]["RH"]["candles"]["v"]) > 10000 or statistics.median(self.moves[s]["RH"]["candles"]["dv"]) > 1000000): return
##################
# not just cents #
##################
if not ((self.moves[s]["RH"]["high"] - self.moves[s]["RH"]["low"]) >= .10): return
#########
# print #
#########
self.Debug("LONG\t\t" + str(self.Time).split()[1] + "\t" + str(s.Value))
return
#time is premarket or not
#time of format: HH:MM:SS
def isPM(self, clock):
hour, minute, sec = clock.split(":")
if int(hour) < 9:
return 1
if int(hour) == 9 and int(minute) < 31:
return 1
return 0
def isAH(self, clock):
hour, minute, sec = clock.split(":")
if int(hour) > 16 or (int(hour) == 16 and int(minute) > 0):
return 1
return 0
#time in minutes (since midnight)
def timeInMin(self, clock):
hour, minute, sec = clock.split(":")
return (int(hour)*60) + int(minute)
###########
# on data #
###########
def OnData(self, data):
'''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.
Arguments:
data: Slice object keyed by symbol containing the stock data
'''
#stop trading at whatever time e.g., noon
#then, after market closes, allow trading again for the next day
if self.stop_trading != 0:
return
if self.Time.hour == 4 and self.Time.minute == 1 and self.Time.second == 1:
self.initVars()
self.Debug("Number of symbols: ")
self.Debug(len(self.sd))
#every minute:
#check if below 1, if so remove
#only do this in PM (?)
#reset list of symbols to stage
if self.Time.second == 1:
for s in self.everBelowOneDollar:
if s in self.ranges:
del self.ranges[s]
if s in self.sd:
del self.sd[s]
self.staging = {}
if (self.Time.hour >= 10) or (self.Time.hour == 9 and self.Time.minute > 30):
self.Debug("Direction\tTime\t\tSymbol\tPropPredMove\tPredPrice\tMed_1min_vol\tGRP\tPredRange")
#remove stocks without premarket action
if self.Time.hour == 9 and self.Time.minute == 11 and self.Time.second == 1:
self.Debug("stocks before pm removal: " + str(len(self.sd)))
toremove = set()
for s in self.sd:
if s in self.pmVol and s in self.pmNonZeroVol:
if self.pmVol[s] < 5000 or self.pmNonZeroVol[s] < 25: #dollar volume > $100000?
toremove.add(s)
elif s not in self.pmHigh:
toremove.add(s)
for s in toremove:
if s in self.sd:
del self.sd[s]
self.Debug("stocks after pm removal: " + str(len(self.sd)))
for s in self.sd:
if s in self.pmVol and s in self.pmNonZeroVol:
#self.Debug(str(s))
#self.Debug(self.pmVol[s])
#self.Debug(self.pmNonZeroVol[s])
self.PM_atSelection[s] = {}
self.PM_atSelection[s]["pmvol"] = self.pmVol[s]
self.PM_atSelection[s]["pm_nonzero"] = self.pmNonZeroVol[s]
for s in self.sd:
start = tm.time()
if data.ContainsKey(s) and data.Bars.ContainsKey(s) and self.Time.hour < 16:
second = data.Bars
if s not in self.pastSixty:
self.pastSixty[s] = {}
self.pastSixty[s]["o"] = second[s].Open
self.pastSixty[s]["h"] = second[s].High
self.pastSixty[s]["l"] = second[s].Low
self.pastSixty[s]["v"] = second[s].Volume
else:
#update
if second[s].Open > self.pastSixty[s]["h"]:
self.pastSixty[s]["h"] = second[s].High
if second[s].Low < self.pastSixty[s]["l"]:
self.pastSixty[s]["l"] = second[s].Low
self.pastSixty[s]["v"] += second[s].Volume
if self.Time.second == 1: #should this be 1 actually?
#add 1 more minute for this symbol
if s not in self.minutes:
self.minutes[s] = 1
else:
self.minutes[s] += 1
#if s in self.positions:
# self.Debug(self.Portfolio[s].Quantity)
############################################################
# store ohlcv for past self.keepPast candles, with premarket #
############################################################
###consolidate past 60 seconds
opened = self.pastSixty[s]["o"]
high = self.pastSixty[s]["h"]
low = self.pastSixty[s]["l"]
close = second[s].Close
vol = self.pastSixty[s]["v"]
currRange = self.pastSixty[s]["h"] - self.pastSixty[s]["l"]
#re-init with current second
self.pastSixty[s]["o"] = second[s].Open
self.pastSixty[s]["h"] = second[s].High
self.pastSixty[s]["l"] = second[s].Low
self.pastSixty[s]["v"] = second[s].Volume
#save pmHigh
#put this in a function
if not self.IsMarketOpen(s):
if s not in self.pmHigh:
self.pmHigh[s] = high
self.pmVol[s] = vol
self.pmNonZeroVol[s] = 0
if high > self.pmHigh[s]:
self.pmHigh[s] = high
self.pmVol[s] += vol
if vol != 0:
self.pmNonZeroVol[s] += 1
if low < 1:
self.everBelowOneDollar.add(s)
#update moves
self.updateMoves(s, opened, high, low, close, vol, "PM")
#market is open now
if self.IsMarketOpen(s) and self.buffer == 1 and self.stop_entering == 0:
#if self.Time.hour < 10 and self.Time.minute < 40:
# self.Debug(self.Time)
#9:31:00 is first
if s not in self.open:
self.open[s] = opened
##########################################################
# save ohlcv of intraday candles (save range separately) #
##########################################################
if s not in self.pastNoPre:
self.pastNoPre[s] = {}
self.pastNoPre[s]["o"] = []
self.pastNoPre[s]["h"] = []
self.pastNoPre[s]["l"] = []
self.pastNoPre[s]["c"] = []
self.pastNoPre[s]["v"] = []
self.pastNoPre[s]["o"].append(opened)
self.pastNoPre[s]["h"].append(high)
self.pastNoPre[s]["l"].append(low)
self.pastNoPre[s]["c"].append(close)
self.pastNoPre[s]["v"].append(vol)
#if s in self.hod_level:
# self.Debug(self.Time)
# self.Debug(self.hod_level[s])
################################
# price, volume, time of moves #
# ranges #
# levels #
# EMAs (trend) #
################################
#if self.Time.hour == 9 and 29 < self.Time.minute < 40:
# self.Debug(self.Time)
# self.Debug(low)
self.updateMoves(s, opened, high, low, close, vol, "RH")
#######################
# HOD and LOD, for range calc #
# these are not necessarily levels
# literally just HOD and LOD
#######################
if s not in self.hod or (s in self.hod and high > self.hod[s]):
self.hod[s] = high
if s not in self.lod or (s in self.lod and low < self.lod[s]):
self.lod[s] = low
#########
# entry #
#########
self.enter(s, close)
#if self.Time.hour == 10 and self.Time.minute == 0:
# funcs.minuteHistory(self, s)
###################
# previous values #
###################
self.prevVWAP[s] = self.sd[s].vwap.Current.Value
##############################
# debugging #
# print things out for test #
##############################
#if self.Time.hour == 10 and self.Time.minute == 0:
# self.printCandleHalves(s)
#if self.Time.hour == 11 and self.Time.minute == 19:
# if s in self.positions:
# self.printCandleHalvesCurr(s)
#count total time for each ticker
end = tm.time()
if s not in self.times:
self.times[s] = 0
self.times[s] += (end - start)
if ((self.Time.hour == 10 and self.Time.minute >= 0) or self.Time.hour >= 11) and self.stop_entering != 1:
#if self.Time.hour >= 12 and self.stop_entering != 1:
self.stop_entering = 1
if ((self.Time.hour == 10 and self.Time.minute >= 0) or self.Time.hour >= 11) and self.stop_trading == 0:
#if self.Time.hour >= 12 and self.stop_trading == 0 and not self.positions:
self.Debug("Stopping, 10:00 stop")
self.Debug("max() arg is empty sequence:")
for s in self.max_arg_empty_seq:
self.Debug(s.Value)
self.Debug("Volume in VWMA is 0:")
for s in self.VWMA_vol_zero:
self.Debug(s.Value)
self.Debug("calc EMA error:")
for s in self.calc_EMA_error:
self.Debug(s.Value)
self.Debug("calc EMA wPM error:")
for s in self.EMAwPMerror:
self.Debug(s.Value)
self.Debug("calc EMA long error:")
for s in self.EMAerrorLong:
self.Debug(s.Value)
self.Debug("EOD model:")
for s in self.EODmodelErr:
self.Debug(str(s.Value))
self.Debug("avg dev ign ind oob:")
for s in self.avgdevoob:
self.Debug(str(s.Value))
profit_s = {}
for s in self.everPosition:
profit_s[self.Portfolio[s].NetProfit] = s
for profit in sorted(profit_s):
self.Debug("net profit " + profit_s[profit].Value + ": " + str(profit))
#end the day
self.initVars()
self.stop_trading = 1
#close all at 3:55 if any positions are open
#should change this to time to market close
#use BeforeMarketClose()
if self.Time.hour >= 15 and self.Time.minute >= 55:
self.Debug("Stopping, 3:55 stop")
self.Liquidate()
#first minute has passed
if self.sd and self.IsMarketOpen(s) and self.buffer == 0 and self.Time.minute == 31:
#if self.Time.hour == 9 and self.Time.minute == 31 and self.buffer == 0:
self.buffer = 1
class SymbolData:
def __init__(self, algorithm, symbol):
self.vwap = algorithm.VWAP(symbol, 2000, Resolution.Minute) #60*24 = 1440 minutes in a day
prehist = algorithm.History(symbol, 10, Resolution.Minute)
if not prehist.empty:
hist = prehist.loc[symbol]
if 'volume' not in prehist.columns:
algorithm.Log(f"No volume: {symbol}")
return
for idx, bar in hist.iterrows():
tradeBar = TradeBar(idx, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume, timedelta(minutes=1))
self.vwap.Update(tradeBar)
import statistics
from sklearn.linear_model import LinearRegression
from scipy.stats import norm
import time as tm
import statsmodels.formula.api as smf
import talib
import pandas
def green_red_prop_any(self, opened, high, low, close):
green = 0
red = 0
#doji
if opened == close:
return .50
#green
elif close > opened:
green += abs(close - opened)
#wicks
green += abs(high - close) + abs(low - opened)
red += abs(high - close) + abs(low - opened)
#red
elif close < opened:
red += abs(close - opened)
#wicks
green += abs(high - opened) + abs(low - close)
red += abs(high - opened) + abs(low - close)
if (green + red) != 0:
return green / (green + red)
else:
return "NA"
#for the candle that is high of ign, exclude/include wicks depending on if red or green candle
def red_volume_high_of_ign(self, opened, high, low, close, volume):
green = 0
red = 0
#doji
if opened == close:
return .50*volume
#green
elif close > opened:
green += abs(close - opened)
#wicks
green += abs(high - close)
red += abs(high - close) + abs(low - opened)
#red volume
return (abs(close - high) / (green + red))*volume
#red
elif close < opened:
red += abs(close - opened)
#wicks
green += abs(high - opened) + abs(low - close)
red += abs(high - opened) + abs(low - close)
#red volume
return ( (green + red - abs(high - opened)) / (green + red))*volume
else:
return "NA"
#for the candle that is high of ign, exclude/include wicks depending on if red or green candle
def green_volume_low_of_ign(self, opened, high, low, close, volume):
green = 0
red = 0
#doji
if opened == close:
return .50*volume
#green
elif close > opened:
green += abs(close - opened)
#wicks
green += abs(high - close)
red += abs(high - close) + abs(low - opened)
#green volume
return ( (green + red - abs(high - opened)) / (green + red))*volume
#red
elif close < opened:
red += abs(close - opened)
#wicks
green += abs(high - opened) + abs(low - close)
red += abs(high - opened) + abs(low - close)
#green volume
return (abs(close - low) / (green + red))*volume
else:
return "NA"
def angles(self, s, close):
ab = (self.moves[s]["high"] - self.moves[s]["low"]) / self.moves[s]["ign_time"]
bd = (self.moves[s]["high"] - close) / (self.moves[s]["ign_time"] + self.moves[s]["consol_time"])
if abs(ab) > abs(bd):
return 1
return 0
def get_currhigh_ind(s, prices):
ind = "NA"
max_price = max(prices)
for i in range(0, len(prices)):
if prices[i] < max_price:
return i
def calc_z(self, x, mu, sd):
return (x - mu) / sd
def x_from_z(self, z, mu, sd):
return (z*sd) + mu
#index of consol low
def get_consol_low_ind(self, prices, high, curr_consol_low):
consol_low_ind = "NA"
hit_high = 0
for i in range(0, len(prices)):
if prices[i] == high:
hit_high = 1
if hit_high == 1 and prices[i] == curr_consol_low:
consol_low_ind = i
break
return consol_low_ind
#index of ignition high
def ign_high_ind(self, prices, high):
for i in range(0, len(prices)):
if prices[i] == high:
return i
#Returns a value rounded down to a specific number of decimal places.
#https://kodify.net/python/math/round-decimals/
def round_decimals_down(self, number:float, decimals:int=2):
if not isinstance(decimals, int):
raise TypeError("decimal places must be an integer")
elif decimals < 0:
raise ValueError("decimal places has to be 0 or more")
elif decimals == 0:
return math.floor(number)
factor = 10 ** decimals
return math.floor(number * factor) / factor
#running simple moving average
def calcSMA(self, vals):
count = 1.0
running_sum = 0.0
SMAs = []
for v in vals:
running_sum += v
SMAs.append(running_sum / count)
count += 1
return SMAs
#thinkorswim also includes 4 lookback periods
#the earliest are first
def calcEMA(self, s, closes, period):
currEMA = 0
EMAs = []
alpha = 2.0 / (float(period)+1.0)
for i in range(0, len(closes)):
if i == 0:
currEMA = closes[0]
else:
try:
currEMA = alpha*closes[i] + (1.0-alpha)*currEMA
except:
self.calc_EMA_error.add(s)
return 0, 0
EMAs.append(currEMA)
return currEMA, EMAs
#print out table of ranges
def print_ranges(self, s):
#self.Debug("Period\tOpen\tHigh\tLow\tClose\tVolume\tDistance\tMins_since_first\tTime\tDate")
self.Debug("Period\tClose_minus_open\tVolume\tMins_since_first") #start is Mins_since_first - Period
for date in self.ranges[s]:
for period in sorted(self.ranges[s][date]):
for i in range (0, len(self.ranges[s][date][period]["high"])):
#self.Debug(str(period) + "\t" + str(round(self.ranges[s][period]["open"][i], 2)) + "\t" + str(round(self.ranges[s][period]["high"][i], 2)) + "\t" + str(round(self.ranges[s][period]["low"][i], 2)) + "\t" + str(round(self.ranges[s][period]["close"][i], 2)) + "\t" + str(self.ranges[s][period]["vol"][i]) + "\t" + str(self.ranges[s][period]["time"][i]))
#self.Debug(str(period) + "\t" + str(round(self.ranges[s][date][period]["open"][i], 2)) + "\t" + str(round(self.ranges[s][date][period]["high"][i], 2)) + "\t" + str(round(self.ranges[s][date][period]["low"][i], 2)) + "\t" + str(round(self.ranges[s][date][period]["close"][i], 2)) + "\t" + str(self.ranges[s][date][period]["vol"][i]) + "\t" + str(round(self.ranges[s][date][period]["dist"][i], 2)) + "\t" + str(self.ranges[s][date][period]["minute"][i]) + "\t" + str(self.ranges[s][date][period]["time"][i]) + "\t" + date)
self.Debug(str(period) + "\t" + str(round(self.ranges[s][date][period]["close"][i] - self.ranges[s][date][period]["open"][i], 2)) + "\t" + str(self.ranges[s][date][period]["vol"][i]) + "\t" + str(self.ranges[s][date][period]["minute"][i]))
'''
#low of consolidation to current price, postbreak
#so, use movesCurr
def calc_consolEMA(self, s):
#get consol low to current price
consol_low_prices = []
hit_high = 0
consol_low = 2000000000
for p in self.moves[s]["RH"]["candle_halves"]["Price"]:
#consolidating, get the low
if hit_high == 1:
#if new low, reset
if p < consol_low:
consol_low = p
consol_low_prices = []
consol_low_prices.append(p)
#past high of the move
elif p == self.moves[s]["RH"]["high"]:
hit_high = 1
if len(consol_low_prices) == 0:
return "NAN"
#midpoint, and low of consolidation to high of consolidation post-low
midpoint_consol = ((max(consol_low_prices) + min(consol_low_prices)) / 2.0)
consol_low_to_high = []
for p in consol_low_prices:
consol_low_to_high.append(p)
if p == max(consol_low_prices):
break
#only need to recalc if it's going to be different
start_consol = "N"
EMA_period_consol = "A"
if s not in self.prevStartPeriod["cd"] or consol_low_to_high != self.prevPrices["cd"][s]:
start_consol, EMA_period_consol = EMA_midpoint(self, s, consol_low_to_high, midpoint_consol)
self.prevStartPeriod["cd"][s] = str(start_consol) + " " + str(EMA_period_consol)
self.prevPrices["cd"][s] = consol_low_to_high
else:
start_consol, EMA_period_consol = self.prevStartPeriod["cd"][s].split()
if not (start_consol != "N" and EMA_period_consol != "A"):
self.Debug("is NA")
return
adj_closes_EMA_consol = []
try:
adj_closes_EMA_consol = consol_low_prices[int(start_consol):]
except:
self.Debug("adj_closes_EMA_consol error")
self.Debug(s)
adj_closes_EMA_consol[0] = consol_low_prices[0]
currEMA_consol, EMAs_consol = self.calcEMA(s, adj_closes_EMA_consol, EMA_period_consol)
return currEMA_consol, EMAs_consol, start_consol
'''