| Overall Statistics |
|
Total Trades 738 Average Win 6.86% Average Loss -2.47% Compounding Annual Return 138472.576% Drawdown 30.500% Expectancy 0.421 Net Profit 2548.656% Sharpe Ratio 243.101 Probabilistic Sharpe Ratio 99.960% Loss Rate 62% Win Rate 38% Profit-Loss Ratio 2.77 Alpha 296.608 Beta -0.713 Annual Standard Deviation 1.221 Annual Variance 1.491 Information Ratio 235.359 Tracking Error 1.262 Treynor Ratio -416.002 Total Fees $75387.50 Estimated Strategy Capacity $130000000.00 Lowest Capacity Asset ES XZDYPWUWC7I9 |
# region imports
from AlgorithmImports import *
from my_ind import *
#from supportlibrary import *
#from channelLib import *
from enum import Enum
# endregion
class TradeType(Enum):
    """Tag for the setup behind the currently tracked trade.

    The algorithm stores one of these in ``currentTrade``; ``NO_TRADE`` is the
    value it starts with and resets to.
    """

    NO_TRADE = 0

    # pierce setups
    LOW_PIERCE = 1
    HIGH_PIERCE = 2

    # rebound setups
    LOW_REBOUND = 3
    HIGH_REBOUND = 4

    # mid-line setups
    MID_SHORT = 5
    MID_LONG = 6
### <summary>
### Basic Continuous Futures Template Algorithm
### </summary>
class BasicTemplateContinuousFutureAlgorithm(QCAlgorithm):
'''Basic template algorithm simply initializes the date range and cash'''
def Initialize(self):
    """Set up cash, dates, tunables, data feeds, indicators, consolidators and schedules.

    Every tunable is read with ``GetParameter`` and falls back to the default
    coded below when the parameter is not defined. The method finishes by
    replaying minute history through the consolidators (``PrimeData``).
    """
    self.SetCash(100000)
    self.SetStartDate(2022, 1, 1)
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)

    # --- position sizing ---
    self.positionSize = self._ReadParameter("positionSize", 0.75)
    # 0 to disable feature
    self.positionThreshold = self._ReadParameter("positionThreshold", 0)
    # position size modifier if outside RTH
    self.positionPremarket = self._ReadParameter("positionPremarket", 0.9)

    # --- TR based trailing stop (x times tr) ---
    self.trailStop = True
    self.stopTrail = self._ReadParameter("stopTrail", 4.5)
    self.minTrail = self._ReadParameter("minTrail", 8)
    self.green_threshold = self._ReadParameter("green_threshold", 25)

    # --- feature switches ---
    self.allowLong = True
    self.allowShort = True
    self.useStops = True
    self.useTP = True
    self.updateStops = False
    self.RTHOnly = True
    self.OnlyExchangeHours = False
    self.tradeMid = False
    self.exitMid = False
    self.blockMid = True
    self.noChase = True
    self.ladderStop = True
    self.skipMidday = False
    self.useDevStop = False
    self.twapExit = False
    self.skipClose = False

    self.verbose = self.LiveMode  # be more verbose if running live
    if self.verbose:
        self.Debug("Using Verbose Logs")
    else:
        self.Debug("Using Non Verbose Logs")

    # --- strategy tunables ---
    self.strThreshold = self._ReadParameter("strThreshold", 0.5)
    self.trLen = self._ReadParameter("trLen", 10, int)
    self.trMult = self._ReadParameter("trMult", 1)
    self.negDiv = self._ReadParameter("negDiv", 1.6)
    # note: the parameter keys for the EMAs differ from the attribute names
    self.fast_ema = self._ReadParameter("fastEMA", 9, int)
    self.slow_ema = self._ReadParameter("slowEMA", 21, int)
    self.tpMult = self._ReadParameter("tpMult", 2.7)
    self.left = self._ReadParameter("left", 6, int)
    self.right = self._ReadParameter("right", 2, int)
    self.pivot_ema = self._ReadParameter("pivot_ema", 15, int)
    self.pivot_min = self._ReadParameter("pivot_min", 15, int)
    self.trend_min = self._ReadParameter("trend_min", 3, int)
    self.trade_min = self._ReadParameter("trade_min", 8, int)
    self.side_min = self._ReadParameter("side_min", 3, int)
    self.dayLossCap = self._ReadParameter("dayLossCap", 0.09)
    self.dayProfitCap = self._ReadParameter("dayProfitCap", 0.13)
    self.tradeLossCap = self._ReadParameter("tradeLossCap", 0.0025)
    self.tradeProfitCap = self._ReadParameter("tradeProfitCap", 0.012)
    self.adx_period = self._ReadParameter("adx_period", 14, int)
    self.adx_threshold = self._ReadParameter("adx_threshold", 15, int)
    self.epsilon = self._ReadParameter("epsilon", -1)
    self.epsilon2 = self._ReadParameter("epsilon2", -0.5)
    self.pepsilon = self._ReadParameter("pepsilon", 2.5)
    self.tr_threshold = self._ReadParameter("tr_threshold", 0)
    self.relVol_Threshold = self._ReadParameter("relVol_Threshold", 0.3)
    self.stop_div = self._ReadParameter("stop_div", 1.3)
    self.chase_mult = self._ReadParameter("chase_mult", 0.05)
    self.ladder_mult = self._ReadParameter("ladder_mult", 7.5)
    self.bb_min = self._ReadParameter("bb_min", 7, int)
    self.bb_mult = self._ReadParameter("bb_mult", 2)
    self.bb_period = self._ReadParameter("bb_period", 20, int)
    self.ttm_period = self._ReadParameter("ttm_period", 20, int)
    self.rth_hour = self._ReadParameter("rth_hour", 8, int)
    self.eod_offset = self._ReadParameter("eod_offset", 15, int)
    self.midday_start = self._ReadParameter("midday_start", 12, int)
    self.midday_end = self._ReadParameter("midday_end", 13, int)

    # --- data subscriptions ---
    self._continuousContract = self.AddFuture(Futures.Indices.SP500EMini,
                                              resolution = Resolution.Minute,
                                              dataNormalizationMode = DataNormalizationMode.BackwardsRatio,
                                              dataMappingMode = DataMappingMode.LastTradingDay,
                                              contractDepthOffset= 0)
    symbol = self._continuousContract.Symbol
    self._vxx = self.AddEquity("VXX", Resolution.Minute, None, True, 0, False)
    self._vxx.SetDataNormalizationMode(DataNormalizationMode.Raw)
    self._vxx_bar = None
    self.vxx_threshold = self._ReadParameter("vxx_threshold", 50)
    self.SetBenchmark("SPY")

    # --- indicators (all updated manually from the consolidator handlers) ---
    self._bb = BollingerBands(symbol, self.bb_period, self.bb_mult)
    self._fast = ExponentialMovingAverage(self.fast_ema)
    self._slow = ExponentialMovingAverage(self.slow_ema)
    self._vwap = VolumeWeightedAveragePriceIndicator(14)
    self._adx = AverageDirectionalIndex(symbol, self.adx_period)
    self._ttm = TTMSqueeze(symbol, period=self.ttm_period, BBMult = 2, KCMult = 1.5)
    self._twap = TWAP(self, smooth_period=14, daily_reset=True)
    self._relVol = RelativeDailyVolume(period=14)
    self._dev = DevStop(self, length=20)
    # simple daily fib pivots, built by rawFeed_Day_handler
    self._sfp = None
    self._pivots = PivotsHL(self, symbol, self.left, self.right, self.pivot_ema, self.pivot_min, False)
    # NOTE(review): self.Symbol is passed through unchanged from the original code;
    # confirm LadderSuperTrend expects it rather than self._continuousContract.Symbol.
    self.lst = LadderSuperTrend(self.Symbol, self, trPeriod = self.trLen, trMult = self.trMult, doubleConfirm = False, volConfirm = 0, verbose = False, useExtended = True, reversalBars = 0, negDiv = self.negDiv)
    self._ladderStop = LadderSuperTrend(self.Symbol, self, trPeriod = 10, trMult = self.ladder_mult, doubleConfirm = False, volConfirm = 0, verbose = False, useExtended = True, reversalBars = 0, negDiv = self.negDiv)
    self._currentContract = None

    # --- consolidators: one per timeframe, each wired to its handler ---
    self.Con = TradeBarConsolidator(TimeSpan.FromMinutes(self.trade_min))
    self.Con.DataConsolidated += self.HandleBar
    self.TrendCon = TradeBarConsolidator(TimeSpan.FromMinutes(self.trend_min))
    self.TrendCon.DataConsolidated += self.TrendBar
    self.StopCon = TradeBarConsolidator(TimeSpan.FromMinutes(1))
    self.StopCon.DataConsolidated += self.StopBar
    self.BBCon = TradeBarConsolidator(TimeSpan.FromMinutes(self.bb_min))
    self.BBCon.DataConsolidated += self.BBBar
    self.SideCon = TradeBarConsolidator(TimeSpan.FromMinutes(self.side_min))
    self.SideCon.DataConsolidated += self.SideBar
    self.rawFeed_1 = TradeBarConsolidator(TimeSpan.FromMinutes(1))
    self.rawFeed_1.DataConsolidated += self.rawFeed_1_handler
    self.rawFeed_Day = TradeBarConsolidator(TimeSpan.FromDays(1))
    self.rawFeed_Day.DataConsolidated += self.rawFeed_Day_handler
    for consolidator in (self.Con, self.TrendCon, self.StopCon, self.BBCon,
                         self.SideCon, self.rawFeed_1, self.rawFeed_Day):
        self.SubscriptionManager.AddConsolidator(symbol, consolidator)

    # --- per-trade state ---
    self.stopPrice = None
    self.tpPrice = None
    self.currentTrade = TradeType.NO_TRADE
    self._prev_lPrice = None
    self._prev_mPrice = None
    self._prev_hPrice = None

    # --- schedules ---
    # enable trades daily, and rotate the daily trade stats at the same time
    self.noTrades = False
    self.Schedule.On(self.DateRules.EveryDay(symbol),
                     self.TimeRules.AfterMarketOpen(symbol, 0),
                     self.EnableTrades)
    if self.OnlyExchangeHours:
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Friday),
                         self.TimeRules.BeforeMarketClose(symbol, 5),
                         self.LiquidateDisableTrades)
    if self.skipMidday:
        self.Schedule.On(self.DateRules.EveryDay(symbol),
                         self.TimeRules.At(self.midday_start, 0, 0),
                         self.PauseTrades)
        self.Schedule.On(self.DateRules.EveryDay(symbol),
                         self.TimeRules.At(self.midday_end, 0, 0),
                         self.StartTrades)

    # --- daily and per-trade statistics ---
    self.dayTrades = 0
    self.dayLoss = 0
    self.dayWin = 0
    self.dayPL = 0
    self.dayPLPer = 0
    self.dayPeakPL = 0
    self.dayEquityStart = self.Portfolio.TotalPortfolioValue
    self.peakUnrealized = 0
    self.peakUnrealizedPer = 0
    self.minUnrealized = 0
    self.minUnrealizedPer = 0

    self.PrimeData(20000)

def _ReadParameter(self, name, default, cast=float):
    """Return project parameter ``name`` converted with ``cast``, or ``default`` when unset.

    Args:
        name: key passed to ``GetParameter``.
        default: value returned as-is (not passed through ``cast``) when the
            parameter is not defined.
        cast: callable applied to the raw parameter value; ``float`` or ``int``.
    """
    raw = self.GetParameter(name)
    return default if raw is None else cast(raw)
def PrimeData(self, minutes = 1000):
    """Replay recent minute history through the consolidators to warm up the indicators.

    Args:
        minutes: number of history bars requested from ``self.History`` for the
            continuous contract.

    Bars whose close is 0 are skipped. Bars with a NaN volume are replayed with
    volume 0 (previously a misspelt variable caused them to be dropped).
    Only the consolidators listed below are fed; ``self.Con`` and
    ``self.StopCon`` are not replayed here.
    """
    symbol = self._continuousContract.Symbol
    self.Debug("Requesting Minute Resolution History")
    history_raw = self.History([symbol], minutes)
    self.Debug("Parsing Minute Resolution History")
    if history_raw.empty:
        self.Error(f'Empty data frame for {symbol} on {self.Time}')
        return

    history = history_raw.droplevel(0).loc[symbol]
    hislen = len(history)
    self.Debug(f"Warming up {hislen} History {symbol}")
    if hislen > 0:
        self.Debug(f"Minute: {history.index}")

    targets = (self._pivots.Con, self.TrendCon, self.BBCon, self.SideCon,
               self.rawFeed_1, self.rawFeed_Day)
    oneMinute = TimeSpan.FromMinutes(1)
    for idx, bar in history.iterrows():
        # the index is shifted back one minute to get the bar's start time
        barStart = idx - timedelta(minutes=1)
        volume = bar.volume
        if math.isnan(volume):
            self.Debug(f"Bad volume: {bar}")
            volume = 0
        tradeBar = TradeBar(barStart, symbol, bar.open, bar.high, bar.low, bar.close, volume, oneMinute)
        if bar.close == 0:  # bad data from history?
            self.Debug(f"Invalid tradebar {tradeBar}")
            continue
        for consolidator in targets:
            consolidator.Update(tradeBar)

    self.Debug("Primed Indicators")
    return
def ResetTradeStats(self):
    """Clear all per-trade tracking: unrealized P/L extremes, trade type, TP and stop."""
    # best / worst unrealized P/L seen during the trade
    self.peakUnrealized = self.peakUnrealizedPer = 0
    self.minUnrealized = self.minUnrealizedPer = 0
    # no active setup, hence no exit levels
    self.currentTrade = TradeType.NO_TRADE
    self.tpPrice = self.stopPrice = None
def PauseTrades(self):
    """Raise the ``noTrades`` flag (scheduled at ``midday_start`` when ``skipMidday`` is on)."""
    setattr(self, "noTrades", True)
def StartTrades(self):
    """Clear the ``noTrades`` flag (scheduled at ``midday_end`` when ``skipMidday`` is on)."""
    setattr(self, "noTrades", False)
# called once a day at open
def EnableTrades(self):
    """Re-enable trading, log the finished day's statistics and start a fresh day.

    Scheduled in ``Initialize`` for every day at market open of the continuous
    contract.
    """
    self.noTrades = False

    # report the day that just ended before its counters are wiped
    self.Debug(f"Trade Stats: Day {self.dayTrades} Win {self.dayWin} Lose {self.dayLoss} PL {round(self.dayPL, 2)} PL% {self.dayPLPer} PeakPL {round(self.dayPeakPL, 2)} Start EQ {self.dayEquityStart} End EQ {self.Portfolio.TotalPortfolioValue}")

    # zero every daily counter in one go
    self.dayTrades = self.dayWin = self.dayLoss = 0
    self.dayPL = self.dayPLPer = self.dayPeakPL = 0

    # today's P/L percentages are measured against this equity
    self.dayEquityStart = self.Portfolio.TotalPortfolioValue
# toggle off trades until the next day and close any active positions
def LiquidateDisableTrades(self):
    """Close any open position in the current contract and block new trades.

    Called from ``OnData`` (VXX spike, end of RTH) and, when
    ``OnlyExchangeHours`` is set, from a scheduled event. ``noTrades`` stays
    set until ``EnableTrades`` or ``StartTrades`` clears it.
    """
    contract = self._currentContract
    # A scheduled call can arrive before the mapped contract has been resolved,
    # so guard the contract itself and not only its Symbol.
    if contract is not None and contract.Symbol is not None:
        holding = self.Portfolio[contract.Symbol]
        if holding.Invested:
            # record the trade while the unrealized P/L is still available
            self.LogTrade()
            self.Liquidate(tag=f"EOD {self.Time} PL: %{holding.UnrealizedProfitPercent} ${holding.UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
    self.noTrades = True
    self.ResetTradeStats()
def LogTrade(self):
    """Fold the open position's unrealized P/L into the daily statistics.

    Must run before the position is liquidated, because it reads the current
    contract's ``UnrealizedProfit``. Sets ``noTrades`` when the day's P/L
    fraction breaches ``dayLossCap`` or ``dayProfitCap``.
    """
    self.dayTrades += 1
    tradePnl = self.Portfolio[self._currentContract.Symbol].UnrealizedProfit

    # anything that is not strictly positive counts as a loss
    if tradePnl > 0:
        self.dayWin += 1
    else:
        self.dayLoss += 1

    self.dayPL += tradePnl
    self.dayPLPer = self.dayPL / self.dayEquityStart
    self.dayPeakPL += self.peakUnrealized

    self.SetRuntimeStatistic("Day PNL", round(self.dayPL, 2))
    self.SetRuntimeStatistic("Stop", "None")

    lossLimit = -1 * self.dayLossCap
    if self.dayPLPer < lossLimit:
        # lost too much today, shut down for the day
        self.Debug(f"Shutting down trades for loss cap: %{self.dayPLPer} ${self.dayPL}")
        self.noTrades = True
    elif self.dayPLPer > self.dayProfitCap:
        # we made enough money, call it a day
        self.Debug(f"Shutting down trades for profit cap: %{self.dayPLPer} ${self.dayPL}")
        self.noTrades = True
# TODO: revise to use limit orders and to chase ask if needed
def GoLong(self, size, tag):
    """Send a market buy for the current contract sized from target ``size``.

    Args:
        size: target passed to ``CalculateOrderQuantity``.
        tag: order tag.

    Returns:
        Whatever ``MarketOrder`` returns.
    """
    symbol = self._currentContract.Symbol
    computed = self.CalculateOrderQuantity(symbol, size)
    # never send less than one contract, even if the computed size is too small
    contracts = 1 if computed < 1 else computed
    return self.MarketOrder(symbol, contracts, tag=tag)
def GoShort(self, size, tag):
    """Send a market sell for the current contract sized from target ``size``.

    Args:
        size: target passed to ``CalculateOrderQuantity``.
        tag: order tag.

    Returns:
        Whatever ``MarketOrder`` returns.
    """
    symbol = self._currentContract.Symbol
    computed = self.CalculateOrderQuantity(symbol, size)
    # always sell at least one contract: anything above -1 is forced to -1
    contracts = -1 if computed > -1 else computed
    return self.MarketOrder(symbol, contracts, tag=tag)
def OnData(self, data):
    '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.

    Tracks unrealized P/L extremes for the open position, applies the VXX and
    end-of-RTH shutdowns, and rolls the position when the continuous contract
    maps to a new underlying contract. Entries and stop handling live in the
    consolidator handlers, not here.

    Arguments:
        data: Slice object keyed by symbol containing the stock data
    '''
    for changedEvent in data.SymbolChangedEvents.Values:
        if changedEvent.Symbol == self._continuousContract.Symbol:
            self.Log(f"SymbolChanged event: {changedEvent}")

    bar = None
    if data.Bars.ContainsKey(self._continuousContract.Symbol):
        bar = data.Bars[self._continuousContract.Symbol]

    self._vxx_bar = None
    if data.Bars.ContainsKey(self._vxx.Symbol):
        self._vxx_bar = data.Bars[self._vxx.Symbol]

    if bar is None:
        self.Error(f"None bar at {self.Time}")
        return
    if self.IsWarmingUp:
        return
    if not self.lst.IsReady or not self._bb.IsReady or not self._ladderStop.IsReady or not self._ttm.IsReady:
        return

    if self._currentContract is None:
        self._currentContract = self.Securities[self._continuousContract.Mapped]

    holding = self.Portfolio[self._currentContract.Symbol]
    if holding.Invested:
        pl = holding.UnrealizedProfit
        plPer = holding.UnrealizedProfitPercent
        # best / worst excursion of the open trade, reported in the exit tags
        self.peakUnrealized = max(self.peakUnrealized, pl)
        self.peakUnrealizedPer = max(self.peakUnrealizedPer, plPer)
        self.minUnrealized = min(self.minUnrealized, pl)
        self.minUnrealizedPer = min(self.minUnrealizedPer, plPer)
        curPNL = self.dayPL + pl
        self.SetRuntimeStatistic("Day PNL", round(curPNL, 2))
        self.SetRuntimeStatistic("Stop", round(self.stopPrice, 2) if self.stopPrice is not None else "None")
        self.SetRuntimeStatistic("TP", round(self.tpPrice, 2) if self.tpPrice is not None else "None")

    # stop trading if vxx spikes too high
    if self.vxx_threshold != 0 and self._vxx_bar is not None:
        if self._vxx_bar.Close > self.vxx_threshold:
            # shut down trades, too volatile
            if self.verbose:
                self.Debug(f"Disabling Trading due to VXX: {self._vxx_bar.Close}")
            self.LiquidateDisableTrades()

    # skip before RTH + early offset
    if self.RTHOnly and (time(self.rth_hour, 30) > bar.EndTime.time()):
        return
    # flatten on the 15:59 bar when restricted to RTH
    if self.RTHOnly and bar.Time.hour == 15 and bar.Time.minute == 59 and holding.Invested:
        self.LiquidateDisableTrades()
    # and after RTH
    if self.RTHOnly and (bar.Time.hour > 15):
        return

    # skip trading if flagged noTrades
    if self.noTrades:
        return
    if self.skipClose and (time(15, 50) < bar.EndTime.time()):
        self.noTrades = True
        return

    # Roll to the newly mapped contract. Because of the early returns above this
    # only runs inside RTH while trading is enabled.
    mapped = self._continuousContract.Mapped
    if self._currentContract is not None and self._currentContract.Symbol != mapped:
        self.Log(f"{self.Time} - rolling position from {self._currentContract.Symbol} to {mapped}")
        currentPositionSize = self._currentContract.Holdings.Quantity
        if currentPositionSize != 0:
            self.LogTrade()
            self.Liquidate(self._currentContract.Symbol)
            # ResetTradeStats wipes currentTrade, so carry it across the roll.
            # NOTE(review): stopPrice/tpPrice are also wiped and are not restored.
            tradeType = self.currentTrade
            self.ResetTradeStats()
            # MarketOrder keeps the sign of the quantity; Buy() would turn a
            # rolled short position into a long one.
            self.MarketOrder(mapped, currentPositionSize)
            self.currentTrade = tradeType
        self._currentContract = self.Securities[mapped]
def StopBar(self, sender, bar):
    """Manage exits for the open position on every one-minute consolidated bar.

    Checks run in this order: per-trade loss/profit caps, stop handling
    (break-even, trailing, ladder, mid-line, deviation stop), take-profit, and
    the optional ``updateStops`` ratchet.

    Args:
        sender: consolidator that produced the bar (unused).
        bar: the consolidated bar.

    Returns:
        False when the bar is ignored (no bar, warming up, or no contract yet);
        otherwise None.
    """
    if bar is None or self.IsWarmingUp or self._currentContract is None:
        return False

    holding = self.Portfolio[self._currentContract.Symbol]
    lPrice = self._pivots.emaLow.Current.Value
    hPrice = self._pivots.emaHigh.Current.Value
    mPrice = self._slow.Current.Value
    tr = self.lst.tr.Current.Value
    # half the pivot channel, but never smaller than the current TR value
    stop = max((hPrice - lPrice) / 2, tr)
    trailAmount = max(self.minTrail, (self.stopTrail * tr))

    # 1) hard per-trade caps on unrealized P/L percent
    if holding.Invested:
        plPercent = holding.UnrealizedProfitPercent
        if plPercent < (-1 * self.tradeLossCap):
            self._ExitPosition(f"stopLoss {bar.Close}")
        elif plPercent > self.tradeProfitCap:
            self._ExitPosition(f"stopProfit {bar.Close}")

    # 2) stop management; skipped once an exit above has cleared stopPrice
    if holding.Invested and self.stopPrice is not None and self.useStops:
        if holding.IsShort:
            profitLevel = holding.AveragePrice - bar.Close
            breakEven = holding.AveragePrice - 1
            # far enough in profit: pull the stop to just past the entry price
            if (self.green_threshold > 0) and (profitLevel > self.green_threshold) and (self.stopPrice > breakEven):
                self.stopPrice = breakEven
            if self.trailStop and ((bar.Close + trailAmount) < self.stopPrice):
                self.stopPrice = bar.Close + trailAmount
            if self.ladderStop:
                ladder = self._ladderStop.TDown[0]
                # check for None before comparing, not after
                if ladder is not None and bar.Close < ladder < self.stopPrice and self._ladderStop.Trend[0] == -1:
                    self.stopPrice = ladder
            if self.exitMid and (bar.Close > mPrice) and (bar.Open < mPrice):
                self._ExitPosition(f"midPrice {mPrice} vs {bar.Close}")
            elif bar.Close > self.stopPrice:
                self._ExitPosition(f"stopPrice {self.stopPrice} vs {bar.Close}")
            elif self.useDevStop:
                self.stopPrice = self._dev.B3
                if bar.Close > self.stopPrice:
                    self._ExitPosition(f"devStop {self.stopPrice} vs {bar.Close}")
        elif holding.IsLong:
            profitLevel = bar.Close - holding.AveragePrice
            breakEven = holding.AveragePrice + 1
            if (self.green_threshold > 0) and (profitLevel > self.green_threshold) and (self.stopPrice < breakEven):
                self.stopPrice = breakEven
            if self.trailStop and ((bar.Close - trailAmount) > self.stopPrice):
                self.stopPrice = bar.Close - trailAmount
            if self.ladderStop:
                ladder = self._ladderStop.TUp[0]
                if ladder is not None and bar.Close > ladder > self.stopPrice and self._ladderStop.Trend[0] == 1:
                    self.stopPrice = ladder
            if self.exitMid and (bar.Close < mPrice) and (bar.Open > mPrice):
                self._ExitPosition(f"midPrice {mPrice} vs {bar.Close}")
            elif bar.Close < self.stopPrice:
                self._ExitPosition(f"stopPrice {self.stopPrice} vs {bar.Close}")
            elif self.useDevStop:
                self.stopPrice = self._dev.A3
                if bar.Close < self.stopPrice:
                    self._ExitPosition(f"devStop {self.stopPrice} vs {bar.Close}")

    # 3) take profit on an intrabar touch
    if holding.Invested and self.tpPrice is not None and self.useTP:
        if holding.IsShort:
            if bar.Low < self.tpPrice:
                self._ExitPosition(f"tpPrice {self.tpPrice} vs {bar.Close}")
        elif holding.IsLong:
            if bar.High > self.tpPrice:
                # the tag reports the take-profit level (it used to print stopPrice)
                self._ExitPosition(f"tpPrice {self.tpPrice} vs {bar.Close}")

    # 4) optional ratchet of the stop towards the pivot levels;
    #    needs an existing stop to compare against
    if self.updateStops and self.stopPrice is not None:
        if holding.IsShort:
            if (bar.Close < (lPrice - stop)) and self.trailStop:  # start trailing stop behind price
                if self.stopPrice > (bar.Close + stop):
                    self.stopPrice = bar.Close + stop
            elif bar.Close < lPrice - stop:
                if self.stopPrice > lPrice:
                    self.stopPrice = lPrice
            elif bar.Close < hPrice - stop:
                if self.stopPrice > hPrice:
                    self.stopPrice = hPrice
        if holding.IsLong:
            if (bar.Close > hPrice + stop) and self.trailStop:
                if self.stopPrice < (bar.Close - stop):
                    self.stopPrice = bar.Close - stop
            elif bar.Close > hPrice + stop:
                if self.stopPrice < hPrice:
                    self.stopPrice = hPrice
            elif bar.Close > lPrice + stop:
                if self.stopPrice < lPrice:
                    self.stopPrice = lPrice

def _ExitPosition(self, reason):
    """Log the trade, liquidate with a descriptive tag and reset per-trade state.

    Args:
        reason: text placed at the front of the order tag; the unrealized P/L
            and peak figures are appended here.
    """
    # LogTrade reads the unrealized P/L, so it must run before Liquidate
    self.LogTrade()
    holding = self.Portfolio[self._currentContract.Symbol]
    self.Liquidate(tag=f"{reason} PL: %{holding.UnrealizedProfitPercent} ${holding.UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
    self.ResetTradeStats()
def TrendBar(self, sender, bar):
    """Feed a ``trend_min``-minute bar to the ``lst`` indicator.

    Returns False when no bar is supplied, otherwise None.
    """
    if bar is not None:
        self.lst.Update(bar)
        return None
    return False
def BBBar(self, sender, bar):
    """Update the Bollinger bands, both EMAs and relative volume from a ``bb_min``-minute bar.

    The band values from before the update are kept in ``_prev_lPrice``,
    ``_prev_mPrice`` and ``_prev_hPrice``. Returns False when no bar is
    supplied, otherwise None.
    """
    if bar is None:
        return False

    # snapshot the bands before this bar moves them
    bands = self._bb
    self._prev_lPrice = bands.LowerBand.Current.Value
    self._prev_mPrice = bands.MiddleBand.Current.Value
    self._prev_hPrice = bands.UpperBand.Current.Value

    # the price-based indicators all take the bar's end time and close
    for priceIndicator in (bands, self._slow, self._fast):
        priceIndicator.Update(bar.EndTime, bar.Close)
    self._relVol.Update(bar)
def SideBar(self, sender, bar):
    """Feed a ``side_min``-minute bar to the ADX indicator.

    Returns False when no bar is supplied, otherwise None.
    """
    if bar is not None:
        self._adx.Update(bar)
        return None
    return False
def rawFeed_1_handler(self, sender, bar):
    """Push each one-minute bar into the indicators that run on the raw feed.

    Returns False when no bar is supplied, otherwise None.
    """
    if bar is None:
        return False
    # same update order as before: vwap, ladder stop, ttm squeeze, dev stop
    for minuteIndicator in (self._vwap, self._ladderStop, self._ttm, self._dev):
        minuteIndicator.Update(bar)
def rawFeed_Day_handler(self, sender, bar):
    """Rebuild the daily pivots object (``_sfp``) from the latest daily bar.

    Returns False when no bar is supplied, otherwise None.
    """
    if bar is not None:
        self._sfp = Pivots(bar)
        return None
    return False
# all entry logic here
def HandleBar(self, sender, bar):
if bar is None:
return False
if self.IsWarmingUp:
return
if self._currentContract is None:
self._currentContract = self.Securities[self._continuousContract.Mapped]
if not self.lst.IsReady or not self._bb.IsReady or not self._ladderStop.IsReady or not self._ttm.IsReady or self._sfp is None:
if self.verbose:
self.Debug(f"Ready State: ladderstop {self._ladderStop.IsReady} lst {self.lst.IsReady} bb {self._bb.IsReady} adx {self._adx.IsReady} ttm {self._ttm.IsReady} sfp {self._sfp is None}")
return
# skip trading if flagged noTrades
if self.noTrades:
return
if self.RTHOnly and (time(self.rth_hour,30) > bar.EndTime.time()):
return
# and after RTH
if self.RTHOnly and (bar.EndTime.hour > 15):
return
barStr = self.BarStrength(bar)
#lPrice = self._pivots.lPrice
lPrice = self._pivots.emaLow.Current.Value
#lPrice = min(self._fast.Current.Value, self._slow.Current.Value)
#lPrice = self._bb.LowerBand.Current.Value
#lPrice = self._prev_lPrice
#lPrice = self._sfp.FibBelow(bar)
#if lPrice is None:
# lPrice = self._bb.LowerBand.Current.Value
#lPrice = min(self._bb.LowerBand.Current.Value, self._pivots.emaLow.Current.Value)
#hPrice = self._pivots.hPrice
hPrice = self._pivots.emaHigh.Current.Value
#hPrice = max(self._fast.Current.Value, self._slow.Current.Value)
#hPrice = self._bb.UpperBand.Current.Value
#hPrice = self._prev_hPrice
#hPrice = self._sfp.FibAbove(bar)
#if hPrice is None:
# hPrice = self._bb.UpperBand.Current.Value
#hPrice = max(self._bb.LowerBand.Current.Value, self._pivots.emaHigh.Current.Value)
stop = (hPrice - lPrice) / 2
mPrice = self._bb.MiddleBand.Current.Value
#mPrice = self._prev_mPrice
#mPrice = (hPrice + lPrice) / 2
# cap stops based on ATR in case of large pivot/reversal divergence
tr = self.lst.tr.Current.Value
stop = max(stop, tr)
#stop = stop / self.stop_div
#self.stopTrail = stop
positionSize = self.positionSize
# simple logic to adjust position size if huge stops needed
if (self.positionThreshold > 0) and (stop > self.positionThreshold):
positionSize = positionSize / 2
# adjust position if in premarket (and different margin needs)
if (time(9,30) > bar.EndTime.time()) or (time(16,0) < bar.EndTime.time()):
positionSize = positionSize * self.positionPremarket
if self.verbose:
self.Debug(f"Levels: {bar} {round(barStr, 2)} {round(lPrice, 2)} {round(mPrice, 2)} {round(hPrice, 2)} {round(stop, 2)} {round(self._vwap.Current.Value, 2)} {round(self.lst.Trend[0], 2)} {round(self.lst.TUp[0], 2)} {round(self.lst.TDown[0], 2)} {round(self._adx.Current.Value, 2)} {self._ttm.Squeeze} {round(tr, 2)}")
#if self._adx.Current.Value < self.adx_threshold:
# if self.verbose:
# self.Debug(f"Skipping trade for sideways price action: {bar} {self._adx.Current.Value}")
# return
pierce = True
rebound = True
# block trades if no vol right now
if (self.tr_threshold != 0) and (tr > self.tr_threshold):
if self.verbose:
self.Debug("Low TR Trade Block")
pierce = False
rebound = False
trendUp = self.lst.Trend[0] == 1
#trendUp = bar.Close > self._sfp.Base
#trendUp = self._adx.PositiveDirectionalIndex > self._adx.NegativeDirectionalIndex
#trendUp = self._fast.Current.Value > self._slow.Current.Value
#trendUp = self._vwap.Current.Value > bar.Close
#trendUp = True
trendDown = self.lst.Trend[0] == -1
#trendDown = bar.Close < self._sfp.Base
#trendDown = self._adx.PositiveDirectionalIndex < self._adx.NegativeDirectionalIndex
#trendDown = self._fast.Current.Value < self._slow.Current.Value
#trendDown = self._vwap.Current.Value < bar.Close
#trendDown = True
pivotAbove = self._sfp.FibAbove(bar.Close)
tpAbove = self._sfp.FibAbove(bar.Close + stop)
pivotBelow = self._sfp.FibBelow(bar.Close)
tpBelow = self._sfp.FibBelow(bar.Close - stop)
pepsilon = self.pepsilon
# if we may be rebounding from a pivot, allow rebound plays
if (pivotAbove is not None):
if ((bar.Open + pepsilon) > pivotAbove):
trendUp = False
trendDown = True
#pierce = False
if self.verbose:
self.Debug("Pivot Rebound Down")
if (pivotBelow is not None):
if ((bar.Open - pepsilon) < pivotBelow):
trendUp = True
trendDown = False
#pierce = False
if self.verbose:
self.Debug("Pivot Rebound Up")
vwap = self._vwap.Current.Value
#if (bar.Open > vwap) and (bar.Close < vwap):
#trendUp = False
#trendDown = True
#if (bar.Open < vwap) and (bar.Close > vwap):
#trendUp = True
#trendDown = False
skipMid = False
# if skip trades, until direction confirms
if self._adx.Current.Value < self.adx_threshold:
trendUp = True
trendDown = True
skipMid = True
#pierce = False
rebound = False
if self.verbose:
self.Debug("ADX Rebound Block")
# if we are squeezing, rebound play until it breaks
if self._ttm.Squeeze:
trendUp = True
trendDown = True
#pierce = False
rebound = False
if self.verbose:
self.Debug("Squeeze Rebound Block")
# else:
# trendUp = False
# trendDown = False
if barStr > self.strThreshold:
trendUp = bar.Open < bar.Close
trendDown = bar.Open > bar.Close
if self.verbose:
self.Debug(f"Bar STR Trend Override: {trendUp} {trendDown}")
# fudge factor to define zones rather than just prices
#epsilon = self.epsilon * tr
epsilon = self.epsilon
if self.currentTrade == TradeType.NO_TRADE:
epsilon = self.epsilon2
if self.twapExit and self.Portfolio[self._currentContract.Symbol].Invested:
self.stopPrice = self._twap.Value
# block trades if super low relative volume
if self._relVol.IsReady and self._relVol.Current.Value < self.relVol_Threshold:
pierce = False
rebound = False
if self.verbose:
self.Debug(f"Blocked for low vol: {self._relVol.Current.Value}")
# rebound entry
if (((bar.Low - epsilon) < lPrice) and ((bar.Close + epsilon) > lPrice) and (not self.blockMid or (self.blockMid and ((bar.Close + epsilon) < mPrice)))): # self.currentTrade != TradeType.LOW_PIERCE
if self.Portfolio[self._currentContract.Symbol].IsShort:
self.LogTrade()
self.Liquidate(tag=f"Level Close PL: %{self.Portfolio[self._currentContract.Symbol].UnrealizedProfitPercent} ${self.Portfolio[self._currentContract.Symbol].UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
self.ResetTradeStats()
if (self.allowLong and not self.Portfolio[self._currentContract.Symbol].Invested and rebound and trendUp and not self.noTrades):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.currentTrade = TradeType.LOW_REBOUND
self.stopPrice = lPrice - stop
if self.useTP:
self.tpPrice = hPrice + tr
self.GoLong(positionSize, tag=f"Lower Rebound Entry Long stop: {self.stopPrice} tp: {self.tpPrice}")
return
elif (((bar.High - epsilon) > hPrice) and ((bar.Close + epsilon) > hPrice) and (not self.noChase or (self.noChase and (bar.Close + epsilon) > (hPrice + (tr * self.chase_mult)) ))):
if self.Portfolio[self._currentContract.Symbol].IsShort:
self.LogTrade()
self.Liquidate(tag=f"Level Close PL: %{self.Portfolio[self._currentContract.Symbol].UnrealizedProfitPercent} ${self.Portfolio[self._currentContract.Symbol].UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
self.ResetTradeStats()
if (self.allowLong and not self.Portfolio[self._currentContract.Symbol].Invested and pierce and trendUp and not self.noTrades):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.currentTrade = TradeType.HIGH_PIERCE
self.stopPrice = hPrice - (stop / self.stop_div)
#if self.useTP:
# self.tpPrice = tpAbove
self.GoLong(positionSize, tag=f"Upper Pierce Entry Long stop: {self.stopPrice} tp: {self.tpPrice}")
return
elif (((bar.High + epsilon) > hPrice) and ((bar.Close - epsilon) < hPrice) and (not self.blockMid or (self.blockMid and ((bar.Close - epsilon) > mPrice)))): # self.currentTrade != TradeType.HIGH_PIERCE
if self.Portfolio[self._currentContract.Symbol].IsLong:
self.LogTrade()
self.Liquidate(tag=f"Level Close PL: %{self.Portfolio[self._currentContract.Symbol].UnrealizedProfitPercent} ${self.Portfolio[self._currentContract.Symbol].UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
self.ResetTradeStats()
if (self.allowShort and not self.Portfolio[self._currentContract.Symbol].Invested and rebound and trendDown and not self.noTrades):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.currentTrade = TradeType.HIGH_REBOUND
self.stopPrice = hPrice + stop
if self.useTP:
self.tpPrice = lPrice - tr
self.GoShort(-1 * positionSize, tag=f"Upper Rebound Entry Short stop: {self.stopPrice} tp: {self.tpPrice}")
return
elif (((bar.High + epsilon) > lPrice) and ((bar.Close - epsilon) < lPrice) and (not self.noChase or (self.noChase and (bar.Close - epsilon) < (lPrice - (tr * self.chase_mult)) ))):
if self.Portfolio[self._currentContract.Symbol].IsLong:
self.LogTrade()
self.Liquidate(tag=f"Level Close PL: %{self.Portfolio[self._currentContract.Symbol].UnrealizedProfitPercent} ${self.Portfolio[self._currentContract.Symbol].UnrealizedProfit} peak: {self.peakUnrealized} peak%: {self.peakUnrealizedPer}")
self.ResetTradeStats()
if (self.allowShort and not self.Portfolio[self._currentContract.Symbol].Invested and pierce and trendDown and not self.noTrades):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.currentTrade = TradeType.LOW_PIERCE
self.stopPrice = lPrice + (stop / self.stop_div)
#if self.useTP:
# self.tpPrice = tpBelow
self.GoShort(-1 * positionSize, tag=f"Lower Pierce Entry Short stop: {self.stopPrice} tp: {self.tpPrice}")
return
#mPrice = (hPrice + lPrice) / 2
# if allowing reentry at midpoint touch
if (self.tradeMid and not skipMid and not self.Portfolio[self._currentContract.Symbol].Invested and not self.noTrades):
if (((bar.Open - epsilon) < mPrice) and ((bar.Close + epsilon) > mPrice) and trendUp):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.stopPrice = mPrice - stop
self.GoLong(self.positionSize, tag=f"Mid Entry Long stop: {self.stopPrice}")
return
elif (((bar.Open + epsilon) > mPrice) and ((bar.Close - epsilon) < mPrice) and trendDown):
self._currentContract = self.Securities[self._continuousContract.Mapped]
self.stopPrice = mPrice + stop
self.GoShort(-1 * self.positionSize, tag=f"Mid Entry Short stop: {self.stopPrice}")
return
return
def OnOrderEvent(self, orderEvent):
    """Write one debug line naming the symbol of each order event received."""
    # NOTE(review): the event itself is not inspected, so the text says
    # "Purchased" for every event that arrives here.
    message = "Purchased Stock: {0}".format(orderEvent.Symbol)
    self.Debug(message)
def OnSecuritiesChanged(self, changes):
    """Log the securities change and pass it on to the pivots indicator."""
    self.Debug(f"{self.Time}-{changes}")
    pivots = self._pivots
    # NOTE(review): the whole `changes` object is stored in `Symbol` - confirm
    # that this is what the pivots indicator expects.
    pivots.Symbol = changes
    #self._pivots.PrimeData()
def BarStrength(self, bar):
    """Return the share of the bar's high-low range that is made up of wicks.

    A bar whose high equals its low is reported through ``self.Error`` and
    scored as 1.
    """
    span = bar.High - bar.Low
    if span == 0:
        self.Error(f"Bad bar: {bar}")
        return 1
    body_top = max(bar.Open, bar.Close)
    body_bottom = min(bar.Open, bar.Close)
    upper_wick = bar.High - body_top
    lower_wick = body_bottom - bar.Low
    return (upper_wick + lower_wick) / span
#region imports
from AlgorithmImports import *
#endregion
# Your New Python File
def hl2(bar):
    """Return the average of the bar's high and low."""
    high, low = bar.High, bar.Low
    return (high + low) / 2
def hlc3(bar):
    """Return the average of the bar's high, low and close."""
    total = bar.High + bar.Low + bar.Close
    return total / 3
def ohlc4(bar):
    """Return the average of the bar's high, low, open and close."""
    total = bar.High + bar.Low + bar.Open + bar.Close
    return total / 4
# conceptually, splits the ATR into a directional variance so that it rides fast trends with a tighter stop
class LadderSuperTrend():
    """SuperTrend-style trailing bands built from a direction-split true range.

    Each bar's true range is fed to one of two Wilder averages: ``negTR`` when
    the bar closes below its open, ``posTR`` otherwise.  The lower band
    (``TUp``) sits ``trMult * posTR`` below the bar's hlc3 price and the upper
    band (``TDown``) sits ``trMult * negTR`` above it.  ``Trend`` holds 1 or -1
    and flips when the close crosses the previous opposite band, subject to
    the optional double-confirmation and volume filters.

    Constructor arguments:
        symbol:        accepted but not used by this class.
        algorithm:     object whose ``Time`` and ``Debug`` are used for logging.
        trPeriod:      period of the ATR and of both directional averages.
        trMult:        multiplier applied to the directional averages.
        doubleConfirm: when True a flip also needs the mean of the last two
                       closes beyond the previous band, or the quick-reversal
                       condition (see ``reversalBars``).
        volConfirm:    when > 0, period of a volume SMA; a flip then requires
                       the bar's volume to exceed that SMA.
        verbose:       enables debug output for bars ending within the last
                       60 minutes of algorithm time.
        useExtended:   when False, bars ending before 09:30 or after 16:00 are
                       ignored.
        reversalBars:  number of bars after a flip during which a close back
                       through ``lastLevel`` satisfies double confirmation.
        negDiv:        divisor applied to the offset of the band opposite to
                       the current trend.
    """

    def __init__(self, symbol, algorithm, trPeriod = 10, trMult = 2.25, doubleConfirm = False, volConfirm = 0, verbose = False, useExtended = False, reversalBars = 0, negDiv = 1):
        self.algorithm = algorithm
        self.useExtended = useExtended
        # store the bars for windowed PP calcs
        self.bars = RollingWindow[TradeBar](3)
        self.TUp = RollingWindow[float](3)
        self.TDown = RollingWindow[float](3)
        self.Trend = RollingWindow[int](3)
        self.negTR = WilderMovingAverage(trPeriod)
        self.posTR = WilderMovingAverage(trPeriod)
        self.reversalBars = reversalBars # number of bars to consider accelerated reversals
        self.negDiv = negDiv # amount the opposite direction ATR is divided by
        self.lastChange = 0
        self.levelMoves = 0 # number of times the reversal level has moved vs been stable
        self.lastLevel = 0
        #set up ATR calculation
        self.trPeriod = trPeriod
        self.trMult = trMult
        self.tr = AverageTrueRange(self.trPeriod, MovingAverageType.Wilders)
        self.atrs = RollingWindow[float](3)
        self.doubleConfirm = doubleConfirm
        self.volConfirm = volConfirm
        self.verbose = verbose
        self.volMA = None
        if self.volConfirm > 0:
            self.volMA = SimpleMovingAverage(self.volConfirm)
        self.trueR = TrueRange()
        self.Value = None
        self.Center = None
        self.TrendChanged = False

    def Update(self, bar):
        """Feed one bar and recompute bands, trend and ``Value``.

        Returns False when the bar is None, is filtered out by the market
        hours check, or the underlying indicators are not ready yet; returns
        True once the bands and trend have been updated.
        """
        if bar is None:
            return False
        # check if we are in main market hours:
        if not self.useExtended and ((time(9,30) > bar.EndTime.time()) or (time(16,0) < bar.EndTime.time())):
            return False
        # only log bars that end within the last hour of algorithm time
        verbose = self.verbose and ((self.algorithm.Time - TimeSpan.FromMinutes(60)) < bar.EndTime)

        self.bars.Add(bar)
        self.tr.Update(bar)
        self.trueR.Update(bar)
        # down bars feed the negative average, everything else the positive one
        if bar.Open > bar.Close:
            self.negTR.Update(self.trueR.Current)
        else:
            self.posTR.Update(self.trueR.Current)
        if self.volMA is not None:
            self.volMA.Update(bar.EndTime, bar.Volume)

        # once we have a full window and atr start calculating
        if not (self.bars.IsReady and self.tr.IsReady):
            return False # not complete with bars and atr
        self.atrs.Add(self.tr.Current.Value)
        if not (self.atrs.IsReady and self.negTR.IsReady and self.posTR.IsReady):
            return False # cant compute yet

        # raw bands around the hlc3 price of the newest bar
        src = hlc3(self.bars[0])
        pos_offset = self.trMult * self.posTR.Current.Value
        neg_offset = self.trMult * self.negTR.Current.Value
        up = src - pos_offset
        down = src + neg_offset
        if self.Trend.Count > 0:
            # the band opposite to the current trend uses the divided offset
            if self.Trend[0] == 1:
                down = src + (neg_offset / self.negDiv)
            else:
                up = src - (pos_offset / self.negDiv)

        up_prev = self.TUp[0] if self.TUp.Count > 0 else up
        down_prev = self.TDown[0] if self.TDown.Count > 0 else down
        close = self.bars[0].Close
        prev_close = self.bars[1].Close
        avg_close = (close + prev_close) / 2

        # ratchet each band only while price has stayed on its side of it
        TUp = max(up, up_prev) if (prev_close > up_prev) or (self.doubleConfirm and avg_close > up_prev) else up
        TDown = min(down, down_prev) if (prev_close < down_prev) or (self.doubleConfirm and avg_close < down_prev) else down
        self.TUp.Add(TUp)
        self.TDown.Add(TDown)
        self.Center = (TUp + TDown) / 2

        if self.Trend.Count > 0:
            trend_prev = self.Trend[0]
        else:
            trend_prev = 1
            # NOTE(review): lastLevel is seeded here only on the first computed
            # bar; the flattened source does not show the nesting - confirm.
            self.lastLevel = TUp

        volMA = self.volMA.Current.Value if self.volMA is not None else 0
        volume_ok = self.bars[0].Volume > volMA
        quick = self.lastChange < self.reversalBars
        confirmed_up = (not self.doubleConfirm) or (quick and close > self.lastLevel) or (avg_close > down_prev)
        confirmed_down = (not self.doubleConfirm) or (quick and close < self.lastLevel) or (avg_close < up_prev)

        self.TrendChanged = False
        if (trend_prev == -1) and (close > down_prev) and confirmed_up and volume_ok:
            self.Trend.Add(1)
            self.Value = self.TUp[0]
            self.TrendChanged = True
            self.lastChange = 0
            self.levelMoves = 0
            self.lastLevel = down_prev
            if verbose:
                self.algorithm.Debug(f"LadderST Changed Trend UP {self.bars[0].Symbol}: {self.bars[0].Close} and {self.bars[1].Close} vs {down_prev} vol: {self.bars[0].Volume} vs volMA: {volMA}")
        elif (trend_prev == 1) and (close < up_prev) and confirmed_down and volume_ok:
            self.Trend.Add(-1)
            # in a down trend the active level is the upper band (was TUp[0])
            self.Value = self.TDown[0]
            self.TrendChanged = True
            self.lastChange = 0
            self.levelMoves = 0
            self.lastLevel = up_prev
            if verbose:
                self.algorithm.Debug(f"LadderST Changed Trend DOWN {self.bars[0].Symbol}: {self.bars[0].Close} and {self.bars[1].Close} vs {up_prev} vol: {self.bars[0].Volume} vs volMA: {volMA}")
        else:
            self.Trend.Add(trend_prev)
            # NOTE(review): Value is taken before the band correction below,
            # as in the original statement order.
            self.Value = self.TUp[0] if self.Trend[0] == 1 else self.TDown[0]
            # correct for keeping the current stops if it was the volume filter that kept the trade away
            self.lastChange += 1
            if self.Trend[0] == 1:
                TUp = max(TUp, up_prev)
                self.TUp[0] = TUp
                if TUp > up_prev:
                    # moved up, record it
                    self.levelMoves += 1
            elif self.Trend[0] == -1:
                TDown = min(TDown, down_prev)
                self.TDown[0] = TDown
                if TDown < down_prev:
                    # moved down, record it
                    self.levelMoves += 1
            if verbose:
                self.algorithm.Debug(f"LadderST Trend Continue {self.bars[0].Symbol}: {self.Trend[0]} {self.bars[0].Close} and and {self.bars[1].Close} vs {self.TDown[1] if self.TDown.Count > 1 else down_prev} or {self.TUp[1] if self.TUp.Count > 1 else up_prev} vol: {self.bars[0].Volume} vs volMA: {volMA}")

        if verbose:
            self.algorithm.Debug(f"LadderST: up: {up} down: {down} TUp: {TUp} TDown: {TDown} tr: {self.trueR.Current.Value} atr: {self.atrs[0]} center: {self.Center} Trend: {self.Trend[0]} volMA: {volMA} bar: {bar}")
        return True

    @property
    def IsReady(self):
        """True once the bar window, the ATR and the trend window are all ready."""
        return self.bars.IsReady and self.tr.IsReady and self.Trend.IsReady

    def __repr__(self):
        # guard every window lookup so repr also works before the first update
        close = self.bars[0].Close if self.bars.Count > 0 else 0
        t_up = self.TUp[0] if self.TUp.Count > 0 else None
        t_down = self.TDown[0] if self.TDown.Count > 0 else None
        trend = self.Trend[0] if self.Trend.Count > 0 else None
        return f"LadderST: {self.IsReady} Close: {close} Center: {self.Center} TUp: {t_up} TDown: {t_down} Trend: {trend} TR: {self.tr.Current.Value} Changed: {self.TrendChanged}"
class DevStop():
    """Deviation-based trailing stop levels derived from the two-bar range.

    On every update the two-bar range (highest high minus lowest low of the
    last two bars) is fed to an SMA and a standard deviation of ``length``
    samples.  Four stop levels are then tracked on each side, using
    multipliers 3.6, 2.2, 1 and 0 of the standard deviation:

    * ``A1..A4``: highest value in the window of ``High - mean - k * std``
    * ``B1..B4``: lowest value in the window of ``Low + mean + k * std``

    ``algorithm`` is stored but not otherwise used by this class.
    """

    def __init__(self, algorithm, length=20):
        self.algorithm = algorithm
        self.length = length
        self.Bars = RollingWindow[TradeBar](self.length)
        self.std = StandardDeviation(self.length)
        self.sma = SimpleMovingAverage(self.length)
        self.A1 = 0
        self.A1s = RollingWindow[float](self.length)
        self.A2 = 0
        self.A2s = RollingWindow[float](self.length)
        self.A3 = 0
        self.A3s = RollingWindow[float](self.length)
        self.A4 = 0
        self.A4s = RollingWindow[float](self.length)
        self.B1 = 0
        self.B1s = RollingWindow[float](self.length)
        self.B2 = 0
        self.B2s = RollingWindow[float](self.length)
        self.B3 = 0
        self.B3s = RollingWindow[float](self.length)
        self.B4 = 0
        self.B4s = RollingWindow[float](self.length)

    @staticmethod
    def _trail(window, value, pick):
        """Push ``value`` into ``window`` and return ``pick`` over its contents."""
        window.Add(value)
        return pick(list(window))

    def Update(self, bar):
        """Feed one bar; return True when the stop levels were recomputed."""
        if bar is None:
            return False
        self.Bars.Add(bar)
        if self.Bars.Count < 2:
            return False  # the two-bar range needs a previous bar
        newest, previous = self.Bars[0], self.Bars[1]
        TRD = max(newest.High, previous.High) - min(newest.Low, previous.Low)
        self.std.Update(newest.EndTime, TRD)
        self.sma.Update(newest.EndTime, TRD)
        if not (self.std.IsReady and self.sma.IsReady):
            return False

        mean = self.sma.Current.Value
        dev = self.std.Current.Value
        below = newest.High - mean
        above = newest.Low + mean

        # stops below price trail upward: keep the highest recent value
        self.A1 = self._trail(self.A1s, below - (3.6 * dev), max)
        self.A2 = self._trail(self.A2s, below - (2.2 * dev), max)
        self.A3 = self._trail(self.A3s, below - dev, max)
        self.A4 = self._trail(self.A4s, below, max)
        # stops above price trail downward: keep the lowest recent value
        # (previously these used max as well, which kept the loosest stop)
        self.B1 = self._trail(self.B1s, above + (3.6 * dev), min)
        self.B2 = self._trail(self.B2s, above + (2.2 * dev), min)
        self.B3 = self._trail(self.B3s, above + dev, min)
        self.B4 = self._trail(self.B4s, above, min)
        return True

    @property
    def IsReady(self):
        """True when both statistics are ready and at least two bars were seen."""
        return bool(self.std.IsReady and self.sma.IsReady and self.Bars.Count >= 2)

    def __repr__(self):
        return f"DevStop: B4: {self.B4} B3: {self.B3} B2: {self.B2} B1 {self.B1} A4: {self.A4} A3: {self.A3} A2: {self.A2} A1: {self.A1}"
class Pivots():
    """Traditional and Fibonacci pivot levels computed from a single bar.

    ``Base`` is the bar's (High + Low + Close) / 3.  ``S1..S3`` / ``R1..R3``
    are the traditional supports and resistances; ``FS1..FS5`` / ``FR1..FR5``
    are offsets of 0.382, 0.618, 1, 1.414 and 2 bar ranges below / above
    ``Base``.
    """
    Base = None
    S1 = None
    S2 = None
    S3 = None
    R1 = None
    R2 = None
    R3 = None
    Bar = None
    IsWarmingUp = True

    def __init__(self, bar):
        # same computation as a later refresh, so share it
        self.Update(bar)

    def Update(self, bar):
        """Recompute every level from ``bar`` and return ``IsReady`` (True)."""
        self.Bar = bar
        span = bar.High - bar.Low
        self.Base = (bar.High + bar.Low + bar.Close) / 3
        # traditional pivots
        self.S1 = (self.Base * 2) - bar.High
        self.S2 = self.Base - span
        self.S3 = bar.Low - (2*(bar.High - self.Base))
        self.R1 = (self.Base * 2) - bar.Low
        self.R2 = self.Base + span
        self.R3 = bar.High + (2*(self.Base - bar.Low))
        # Fib pivots
        self.FS1 = self.Base - (0.382 * span)
        self.FS2 = self.Base - (0.618 * span)
        self.FS3 = self.S2
        self.FS4 = self.Base - (1.414 * span)
        self.FS5 = self.Base - (2 * span)
        self.FR1 = self.Base + (0.382 * span)
        self.FR2 = self.Base + (0.618 * span)
        self.FR3 = self.R2
        self.FR4 = self.Base + (1.414 * span)
        self.FR5 = self.Base + (2 * span)
        self.IsWarmingUp = False
        self.IsReady = True
        return self.IsReady

    def _fib_ladder(self):
        """Return the Fibonacci levels ordered from FS5 up to FR5."""
        return (
            self.FS5, self.FS4, self.FS3, self.FS2, self.FS1,
            self.Base,
            self.FR1, self.FR2, self.FR3, self.FR4, self.FR5,
        )

    def FibBelow(self, price):
        """Return the nearest Fibonacci level strictly below ``price``, or None.

        The previous chain tested FR5, FR4 and FR3 with independent ``if``
        statements, so any price above FR4 was answered with FR3.
        """
        for level in reversed(self._fib_ladder()):
            if level < price:
                return level
        return None

    def FibAbove(self, price):
        """Return the nearest Fibonacci level strictly above ``price``, or None.

        The previous chain tested FS5, FS4 and FS3 with independent ``if``
        statements, so any price below FS4 was answered with FS3.
        """
        for level in self._fib_ladder():
            if level > price:
                return level
        return None

    def __repr__(self):
        return f"Pivots: {self.Bar.Symbol} {self.Bar.EndTime} S3: {self.S3} S2: {self.S2} S1: {self.S1} Base: {self.Base} R1: {self.R1} R2: {self.R2} R3: {self.R3}"
class TTMSqueeze:
    """Squeeze flag and momentum histogram built from BB, Keltner and Donchian.

    ``Squeeze`` is True while both Bollinger bands lie inside the Keltner
    channel.  ``Value`` is a least-squares moving average of the distance
    between the close and the mean of the Donchian and SMA values.
    """

    def __init__(self, symbol, period = 20, BBMult = 2.0, KCMult = 1.5):
        self.Symbol = symbol
        self.Name = f"TTMSqueeze - {self.Symbol} {period}"
        # state exposed to callers
        self.Time = datetime.min
        self.Squeeze = False
        self.IsReady = False
        self.Value = 0
        # history of the squeeze flag and of the momentum points
        self.Squeezes = RollingWindow[bool](period)
        self.Histogram = RollingWindow[IndicatorDataPoint](period)
        # underlying indicators
        self.bb = BollingerBands(self.Symbol, period, BBMult, MovingAverageType.Simple)
        self.kch = KeltnerChannels(self.Symbol, period, KCMult, MovingAverageType.Simple)
        self.dch = DonchianChannel(self.Symbol, period, period)
        self.sma = SimpleMovingAverage(period)
        self.lsma = LeastSquaresMovingAverage(period)

    def Update(self, bar):
        """Feed one bar; return True once every underlying indicator is ready."""
        stamp = bar.EndTime
        price = bar.Close
        self.Time = stamp
        self.bb.Update(stamp, price)
        self.kch.Update(bar)
        self.dch.Update(bar)
        self.sma.Update(stamp, price)

        inputs = (self.bb, self.kch, self.dch, self.sma)
        if not all(indicator.IsReady for indicator in inputs):
            return False

        # squeeze: both Bollinger bands sit inside the Keltner channel
        lower_inside = self.bb.LowerBand.Current.Value > self.kch.LowerBand.Current.Value
        upper_inside = self.bb.UpperBand.Current.Value < self.kch.UpperBand.Current.Value
        self.Squeeze = lower_inside and upper_inside
        self.Squeezes.Add(self.Squeeze)

        # momentum: close minus the mean of the Donchian and SMA values,
        # smoothed by the least-squares moving average
        midpoint = (self.dch.Current.Value + self.sma.Current.Value) / 2
        self.lsma.Update(stamp, price - midpoint)
        self.Value = self.lsma.Current.Value
        self.Histogram.Add(self.lsma.Current)

        self.IsReady = all(indicator.IsReady for indicator in inputs) and self.lsma.IsReady
        return self.IsReady

    def __repr__(self):
        return f"{self.Name} -> {self.IsReady}, Time: {self.Time}, Squeeze: {self.Squeeze} Histogram: {self.Value}"
# trades reversals off recent pivot points (determined from rolling window of left + right bars)
class PivotsHL():
    """Tracks the most recent pivot high and pivot low on consolidated bars.

    A bar is a pivot high (low) when no bar among the ``left`` bars before it
    and the ``right`` bars after it has a higher high (lower low).  The latest
    pivot prices are kept in ``hPrice`` / ``lPrice`` and fed to ``emaHigh`` /
    ``emaLow`` on every consolidated bar.

    Constructor arguments:
        algorithm: used for ``Debug``, ``History`` and consolidator registration.
        symbol:    symbol the consolidator is registered for.
        left:      number of bars required before the pivot bar.
        right:     number of bars required after the pivot bar.
        ema_len:   period of the two pivot-price EMAs.
        timeframe: consolidated bar length in minutes.
        verbose:   when True, each new pivot is written to the debug log.
    """

    def __init__(self, algorithm, symbol, left = 4, right = 2, ema_len = 10, timeframe = 1, verbose = False):
        self.algorithm = algorithm
        self.Symbol = symbol
        self.verbose = verbose
        self.timeframe = timeframe
        self.pivotWindow = 10 # how many recent pivots to store, note, only most recent is used for trades presently
        self.ema_len = ema_len
        self.hPrice = None
        self.hLast = 0
        self.lPrice = None
        self.lLast = 0
        self.stopPrice = 0.0
        self.newHigh = False
        self.newLow = False
        self.Last = 0
        # TODO: find correct api call
        #self.minTick = symbol.SymbolProperties.MinimumPriceVariation
        self.pRange = 1 + left + right
        self.left = left
        self.right = right
        self.bars = RollingWindow[TradeBar](self.pRange)
        self.pHighs = RollingWindow[TradeBar](self.pivotWindow)
        self.pLows = RollingWindow[TradeBar](self.pivotWindow)
        self.emaHigh = ExponentialMovingAverage(ema_len)
        self.emaLow = ExponentialMovingAverage(ema_len)
        self.Con = TradeBarConsolidator(TimeSpan.FromMinutes(timeframe))
        self.Con.DataConsolidated += self.handleBar
        self.algorithm.SubscriptionManager.AddConsolidator(self.Symbol, self.Con)

    def handleBar(self, sender, bar):
        """Consolidator callback: look for a new pivot and update the EMAs.

        Returns False for a None bar or while the bar window is still
        filling, True otherwise.
        """
        if bar is None:
            return False
        self.bars.Add(bar)
        # once we have a full window, start finding PP
        if not self.bars.IsReady:
            return False # not complete yet

        # the candidate sits `right` bars back; compare it with every other
        # bar in the window (right side confirms, left side precedes)
        candidate = self.bars[self.right]
        others = [self.bars[i] for i in range(self.pRange) if i != self.right]
        newHigh = all(other.High <= candidate.High for other in others)
        newLow = all(other.Low >= candidate.Low for other in others)

        self.newHigh = False
        self.newLow = False
        if newHigh:
            self.pHighs.Add(candidate)
            self.hPrice = candidate.High
            self.newHigh = True
            self.Last = self.hPrice
            if self.verbose:
                # f-prefix was missing, which logged the placeholders literally
                self.algorithm.Debug(f"New High Pivot: {candidate} {candidate.EndTime}")
        if newLow:
            self.pLows.Add(candidate)
            self.lPrice = candidate.Low
            self.newLow = True
            self.Last = self.lPrice
            if self.verbose:
                self.algorithm.Debug(f"New Low Pivot: {candidate} {candidate.EndTime}")

        #update emas
        if self.lPrice is not None:
            self.emaLow.Update(bar.EndTime, self.lPrice)
        if self.hPrice is not None:
            self.emaHigh.Update(bar.EndTime, self.hPrice)
        return True

    @property
    def IsReady(self):
        """True when the window is full and both a pivot high and low exist."""
        return self.bars.IsReady and (self.hPrice is not None) and (self.lPrice is not None)

    def PrimeData(self, minutes = 600):
        """Replay ``minutes`` of history through the consolidator.

        Rows with a NaN volume are replayed with volume 0; rows whose close is
        0 are skipped.
        """
        self.algorithm.Debug(f"Requesting Minute Resolution History")
        history_raw = self.algorithm.History([self.Symbol], minutes)
        self.algorithm.Debug(f"Parsing Minute Resolution History")
        history = history_raw.droplevel(0).loc[self.Symbol]
        hislen = len(history)
        self.algorithm.Debug(f"Warming up {hislen} History {self.Symbol} val {history}")
        if hislen > 0:
            self.algorithm.Debug(f"Minute: {history.index}")
        for idx, bar in history.iterrows():
            # the bar start is taken as one minute before the history index
            # (named bar_start so the imported `time` is not shadowed)
            bar_start = idx - timedelta(minutes=1)
            volume = bar.volume
            if math.isnan(volume):
                # previously assigned to a misspelled name, so these bars were
                # always rejected as invalid below
                volume = 0
                self.algorithm.Debug(f"Bad volume: {bar}")
            tradeBar = TradeBar(bar_start, self.Symbol, bar.open, bar.high, bar.low, bar.close, volume, TimeSpan.FromMinutes(1))
            if bar.close == 0: # bad data from history?
                self.algorithm.Debug(f"Invalid tradebar {tradeBar}")
                continue
            self.Con.Update(tradeBar)
        self.algorithm.Debug("Primed Pivots")
        return
class TWAP():
    """Running time-weighted average price of each bar's OHLC mean.

    ``Value`` is the mean of (High + Low + Close + Open) / 4 over all bars
    seen since the last reset; every new ``Value`` is also fed to an SMA of
    ``smooth_period`` samples.  With ``daily_reset`` the accumulation restarts
    when a bar ends on a different calendar date than the previous one.
    ``algorithm`` is stored but not otherwise used by this class.
    """

    def __init__(self, algorithm, smooth_period = 14, daily_reset = True):
        self.smooth_period = smooth_period
        self.algorithm = algorithm
        self.daily_reset = daily_reset
        self.sma = SimpleMovingAverage(smooth_period)
        self.lastBar = None
        self.bar_count = 0
        self.sum = 0
        self.Value = 0

    def Update(self, bar):
        """Add one bar to the average; return False only for a None bar."""
        if bar is None:
            return False
        if self.lastBar is None:
            self.lastBar = bar
        # compare full dates: the day-of-month alone matches across months
        # (e.g. Jan 5 vs Feb 5) and would skip the reset
        if self.daily_reset and (bar.EndTime.date() != self.lastBar.EndTime.date()):
            self.Reset()
        val = (bar.High + bar.Low + bar.Close + bar.Open) / 4
        self.sum += val
        self.bar_count += 1
        self.Value = self.sum / self.bar_count
        self.sma.Update(bar.EndTime, self.Value)
        self.lastBar = bar
        return True

    def Reset(self):
        """Clear the accumulated average, the last bar and the smoothing SMA."""
        self.bar_count = 0
        self.sum = 0
        self.Value = 0
        self.lastBar = None
        self.sma.Reset()

    def __repr__(self):
        return f"TWAP: {self.Value} SMA: {self.sma.Current.Value}"