| Overall Statistics |
|
Total Trades 1143 Average Win 0.64% Average Loss -0.71% Compounding Annual Return -69.106% Drawdown 85.000% Expectancy -0.460 Net Profit -84.876% Sharpe Ratio -2.315 Probabilistic Sharpe Ratio 0% Loss Rate 72% Win Rate 28% Profit-Loss Ratio 0.90 Alpha -0.672 Beta 0.284 Annual Standard Deviation 0.262 Annual Variance 0.069 Information Ratio -2.701 Tracking Error 0.31 Treynor Ratio -2.138 Total Fees $1431.68 Estimated Strategy Capacity $1600000.00 Lowest Capacity Asset DNJR WSZRSZV9QR1H |
from time import time
from pandas import DataFrame
from datetime import timedelta
import datetime as datetime
import statistics as statistics
class PensiveTanSalamander(QCAlgorithm):
def Initialize(self):
#Start date
self.SetStartDate(2020, 1, 1)
# Set Strategy Cash
self.SetCash(100000)
#Universe selection
self.AddUniverse(self.Coarse, self.Fine)
#Set data normalization mode to raw
self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Raw
#Resolution.Minute for all data
self.UniverseSettings.Resolution = Resolution.Minute
#Adding spy for scheduled events
self.spy = self.AddEquity("SPY", Resolution.Minute)
#Extended hours enabled
self.UniverseSettings.ExtendedMarketHours = True
#Turn fillforward to false
self.UniverseSettings.FillForward = False
#Five minute count
self.five_minute_count = False
#Long limit
self.long_limit = False
#Short limit
self.short_limit = False
#Self count for consolidating 5-minute data
self.count = 1
#Multiplier
self.multiplier = 3
#Short trailing prices
self.trailing_prices_short = {}
#Value
self.Value = {}
#Dictionary to keep track of volatility
self.volatility = {}
#HeikinAshi Close Window
self.heikinashi_close_window = {}
#HeikinAshi High Window
self.heikinashi_high_window = {}
#HeikinAshi Low Window
self.heikinashi_low_window = {}
#HeikinAshi Open Window
self.heikinashi_open_window = {}
#ATR Window
self.stocks_atrWindow = {}
#ATR Down Window
self.stocks_atrDown = {}
#ATR Up Window
self.stocks_atrUp = {}
#Accumulator to store five-minute volume data
self.ticker_volume_accumulator = {}
#Accumulator to store five minute highs
self.five_minute_high_accumulator = {}
#Accumulator to store minute low of stocks
self.five_minute_low_accumulator = {}
#Accumulator to store minute close of stocks
self.five_minute_close_accumulator = {}
#Dictionary to store trailing prices
self.trailing_prices_long = {}
#Storing five minute opening prices
self.five_minute_open = {}
#Storing five minute high prices
self.five_minute_high = {}
#Storing five minute low prices
self.five_minute_low = {}
#Storing five minute closing prices
self.five_minute_close = {}
#Storing five minute volume
self.five_minute_volume = {}
#Storing long buy tickets
self.long_buy_tickets = {}
#Storing long sell tickets
self.long_sell_tickets = {}
#Storing short sell tickets
self.short_sell_tickets = {}
#Storing short cover tickets
self.short_cover_tickets = {}
#tickers list to store all tickers in universe
self.tickers = []
#List to store filtered stocks based on earnings reports
self.filtered = []
#List to store securities that have collected sufficient data
self.ticker_tradable = []
#List to store selling stocks
self.selling_stocks = []
#List to store top ten stocks
self.top_ten_sorted = []
#List to store stocks that pass criteria
self.underscore = []
#Clear lists and dictionaries that need to be cleared
self.Schedule.On(self.DateRules.EveryDay(),
self.TimeRules.At(0, 0),
self.Reset)
#Start collecting data at 4.00 am in the morning
self.Schedule.On(self.DateRules.EveryDay("SPY"),
self.TimeRules.AfterMarketOpen("SPY", -330),
self.is_market_hours)
#Last collection of data happens 1 minute after 8.00 pm, hence -241 before market close
self.Schedule.On(self.DateRules.EveryDay("SPY"),
self.TimeRules.BeforeMarketClose("SPY", -241),
self.not_market_hours)
def Coarse(self, coarse):
filtered = [x for x in coarse if x.HasFundamentalData and x.Price > 10 and x.DollarVolume > 20000000] #and x.Price < 10
sortedStocks = sorted(filtered, key = lambda x: x.DollarVolume, reverse = True)
return [x.Symbol for x in sortedStocks]
def Fine(self, fine):
return [x.Symbol for x in fine if self.Time > x.EarningReports.FileDate + timedelta(days=14)
and x.EarningReports.FileDate != datetime.time()][:50]
def OnSecuritiesChanged(self, changes):
for stock in changes.RemovedSecurities:
symbol = stock.Symbol
self.Liquidate(symbol)
#If symbol is filtered out of universe
if symbol in self.trailing_prices_long:
self.trailing_prices_long.pop(symbol)
if symbol in self.trailing_prices_short:
self.trailing_prices_short.pop(symbol)
if symbol in self.tickers:
#Remove candlebar data
self.five_minute_close.pop(symbol, None)
self.five_minute_open.pop(symbol, None)
self.five_minute_high.pop(symbol, None)
self.five_minute_low.pop(symbol, None)
self.five_minute_volume.pop(symbol, None)
#Remove HA data
self.heikinashi_close_window.pop(symbol, None)
self.heikinashi_high_window.pop(symbol, None)
self.heikinashi_low_window.pop(symbol, None)
self.heikinashi_open_window.pop(symbol, None)
#Remove ATR data
self.stocks_atrWindow.pop(symbol, None)
self.stocks_atrDown.pop(symbol, None)
self.stocks_atrUp.pop(symbol, None)
#Remove accumulator data
self.ticker_volume_accumulator.pop(symbol, None)
self.five_minute_high_accumulator.pop(symbol, None)
self.five_minute_low_accumulator.pop(symbol, None)
self.five_minute_close_accumulator.pop(symbol, None)
#Remove ticker from tickers list
self.tickers.remove(symbol)
#Value
self.Value.pop(symbol, None)
history = self.History([stock.Symbol for stock in changes.AddedSecurities], 300, Resolution.Minute)
for stock in changes.AddedSecurities:
symbol = stock.Symbol
if symbol in history.index:
if symbol not in self.tickers:
if symbol == self.spy.Symbol:
continue
#Add to candlebar data
self.five_minute_close[symbol] = []
self.five_minute_low[symbol] = []
self.five_minute_high[symbol] = []
self.five_minute_open[symbol] = []
self.five_minute_volume[symbol] = []
#Add to HA data
self.heikinashi_close_window[symbol] = []
self.heikinashi_high_window[symbol] = []
self.heikinashi_low_window[symbol] = []
self.heikinashi_open_window[symbol] = []
#Add to ATR data
self.stocks_atrWindow[symbol] = []
self.stocks_atrDown[symbol] = []
self.stocks_atrUp[symbol] = []
#Add to accumulator data
self.ticker_volume_accumulator[symbol] = 0
self.five_minute_low_accumulator[symbol] = []
self.five_minute_high_accumulator[symbol] = []
self.five_minute_close_accumulator[symbol] = []
#Add symbol to tickers list
self.tickers.append(symbol)
#Value
self.Value[symbol] = 0
def OnData(self, data):
#Start logic
if self.market_hours == True:
### Check if there has been a long order submitted
if len(self.long_buy_tickets) > 0:
### Loop through long ticket storage
for i in self.long_buy_tickets:
#Check if order for ticket has been filled
if self.long_buy_tickets[i].Status == OrderStatus.Filled:
pass
#Check if partially filled, if so, cancel order
elif self.long_buy_tickets[i].Status == OrderStatus.PartiallyFilled:
#Cancel order
self.long_buy_tickets[i].Cancel("Cancelled order")
else:
0
self.long_buy_tickets.clear()
### Check if long sell order submitted
if len(self.long_sell_tickets) > 0:
### Loop through long ticket storage
for i in self.long_sell_tickets:
#Check if order for ticket has been filled
if self.long_sell_tickets[i].Status == OrderStatus.Filled:
pass
#Check if partially filled, if so, cancel order
elif self.long_sell_tickets[i].Status == OrderStatus.PartiallyFilled:
#Cancel order
self.long_sell_tickets[i].Cancel("Cancelled order")
else:
0
self.long_sell_tickets.clear()
### Check if short sell order submitted
if len(self.short_sell_tickets) > 0:
### Loop through long ticket storage
for i in self.short_sell_tickets:
#Check if order for ticket has been filled
if self.short_sell_tickets[i].Status == OrderStatus.Filled:
pass
#Check if partially filled, if so, cancel order
elif self.short_sell_tickets[i].Status == OrderStatus.PartiallyFilled:
#Cancel order
self.short_sell_tickets[i].Cancel("Cancelled order")
else:
0
self.short_sell_tickets.clear()
### Check if short cover order submitted
if len(self.short_cover_tickets) > 0:
### Loop through long ticket storage
for i in self.short_cover_tickets:
#Check if order for ticket has been filled
if self.short_cover_tickets[i].Status == OrderStatus.Filled:
pass
#Check if partially filled, if so, cancel order
elif self.short_cover_tickets[i].Status == OrderStatus.PartiallyFilled:
#Cancel order
self.short_cover_tickets[i].Cancel("Cancelled order")
else:
0
self.short_cover_tickets.clear()
#Add count by 1 everytime data is received
self.count += 1
#Trigger 5 minute count if count is divisable by 5
if self.count % 5 == 0:
self.five_minute_count = True
#Store data received in a global variable
self.data = data
#Collect tradebars
self.tradebars = data.Bars
#Store data from self.data into smaller pieces for processing
#Loop through self.tickers
for i in self.tickers:
#Check if round of data contains data for ticker[i]
if not self.data.ContainsKey(i):
continue
#Check if item contains trade bar data
if not self.data.Bars.ContainsKey(i):
continue
#Create temporary variable to store ticker volume
ticker_volume = self.tradebars[i].Volume
#Update five minute volume accumulator for ticker
self.ticker_volume_accumulator[i] += ticker_volume
#Update minute high of stocks
self.five_minute_high_accumulator[i].append(self.tradebars[i].High)
#Update minute low of stocks
self.five_minute_low_accumulator[i].append(self.tradebars[i].Low)
#Update minute close of stocks
self.five_minute_close_accumulator[i].append(self.tradebars[i].Close)
#Check if it is the first minute that this stock has a trade
if len(self.five_minute_high_accumulator[i]) == 1:
#Save opening prices
self.five_minute_open[i].append(self.tradebars[i].Open)
#If five minute count is true
if self.five_minute_count == True:
#Loop through securities
for i in self.tickers:
#If there has been a trade
if self.ticker_volume_accumulator[i] > 0:
#Get highest point from five minute high accumulator
x = max(self.five_minute_high_accumulator[i])
#Update five minute high list
self.five_minute_high[i].append(x)
#Reset accumulator
self.five_minute_high_accumulator[i] = []
del x
#Get lowest point from five minute low accumulator
y = min(self.five_minute_low_accumulator[i])
#Update five minute low list
self.five_minute_low[i].append(y)
#Reset accumulator
self.five_minute_low_accumulator[i] = []
del y
#Update five minute volume tracker
self.five_minute_volume[i].append(self.ticker_volume_accumulator[i])
#Reset accumulator
self.ticker_volume_accumulator[i] = 0
#Update last close price
last_closing_price = self.five_minute_close_accumulator[i][-1]
self.five_minute_close[i].append(last_closing_price)
#Reset accumulator
self.five_minute_close_accumulator[i] = []
#Check if length of ticker[i] price tracker in dictionary is greater than 30
if len(self.five_minute_close[i]) > 30:
self.five_minute_close[i] = self.five_minute_close[i][-30:]
else:
continue
#Check if length of ticker[i] open tracker in dictionary is greater than 30
if len(self.five_minute_open[i]) > 30:
self.five_minute_open[i] = self.five_minute_open[i][-30:]
else:
continue
#Check if length of ticker[i] high tracker in dictionary is greater than 30
if len(self.five_minute_high[i]) > 30:
self.five_minute_high[i] = self.five_minute_high[i][-30:]
else:
continue
#Check if length of ticker[i] low tracker in dictionary is greater than 30
if len(self.five_minute_low[i]) > 30:
self.five_minute_low[i] = self.five_minute_low[i][-30:]
else:
continue
#Check if length of ticker[i] volume tracker in dictionary is greater than 30
if len(self.five_minute_volume[i]) > 30:
self.five_minute_volume[i] = self.five_minute_volume[i][-30:]
else:
continue
### Heikin Ashi bars
HA_open = (self.five_minute_open[i][-2] + self.five_minute_close[i][-2]) / 2
HA_close = (self.five_minute_open[i][-1] + self.five_minute_close[i][-1] + self.five_minute_high[i][-1] + self.five_minute_low[i][-1]) / 4
HA_high = self.five_minute_high[i][-1]
HA_low = self.five_minute_low[i][-1]
self.heikinashi_close_window[i].append(HA_close)
self.heikinashi_high_window[i].append(HA_high)
self.heikinashi_low_window[i].append(HA_low)
self.heikinashi_open_window[i].append(HA_open)
#Check for length of HA close, high, low, open window
if len(self.heikinashi_close_window[i]) > 10:
self.heikinashi_close_window[i] = self.heikinashi_close_window[i][-10:]
else:
continue
if len(self.heikinashi_high_window[i]) > 10:
self.heikinashi_high_window[i] = self.heikinashi_high_window[i][-10:]
else:
continue
if len(self.heikinashi_low_window[i]) > 10:
self.heikinashi_low_window[i] = self.heikinashi_low_window[i][-10:]
else:
continue
if len(self.heikinashi_open_window[i]) > 10:
self.heikinashi_open_window[i] = self.heikinashi_open_window[i][-10:]
else:
continue
#Create create ATR and update
bars = {}
bars["open"] = self.five_minute_open[i]
bars["close"] = self.five_minute_close[i]
bars["high"] = self.five_minute_high[i]
bars["low"] = self.five_minute_low[i]
ATR = DataFrame (bars, columns = ["open","close","high","low"])
ATR["high_low"] = abs(ATR['high'] - ATR['low'])
ATR["high_close"] = abs(ATR['high'] - ATR['close'].shift(1))
ATR["low_close"] = abs(ATR['low'] - ATR['close'].shift(1))
ATR["True_Range"] = ATR[["high_low","high_close","low_close"]].max(axis=1,skipna=False)
ATR["ATR"] = ATR["True_Range"].rolling(2).mean()
self.stocks_atrWindow[i] = ATR["ATR"].to_list()[-10:]
#Condition for atrDown
hl = (self.heikinashi_high_window[i][-1] + self.heikinashi_low_window[i][-1]) / 2
hltwo = (self.heikinashi_high_window[i][-1] + self.heikinashi_low_window[i][-1]) / 2
hltwoPrev = (self.heikinashi_high_window[i][-2] + self.heikinashi_low_window[i][-2]) / 2
downNow = hltwo - (self.multiplier * self.stocks_atrWindow[i][-1])
downPrev = hltwoPrev - (self.multiplier * self.stocks_atrWindow[i][-2])
if self.heikinashi_close_window[i][-2] > downPrev:
atrDown = max(downPrev, downNow)
else:
atrDown = downNow
self.stocks_atrDown[i].append(atrDown)
#Condition for atrUp
upNow = hltwo + (self.multiplier * self.stocks_atrWindow[i][-1])
upPrev = hltwoPrev + (self.multiplier * self.stocks_atrWindow[i][-2])
if self.heikinashi_close_window[i][-2] < upPrev:
atrUp = min(upNow, upPrev)
else:
atrUp = upNow
self.stocks_atrUp[i].append(atrUp)
#Check atrUp and atrDown length
if len(self.stocks_atrUp[i]) > 10:
self.stocks_atrUp[i] = self.stocks_atrUp[i][-10:]
else:
continue
if len(self.stocks_atrDown[i]) > 10:
self.stocks_atrDown[i] = self.stocks_atrDown[i][-10:]
else:
continue
#Value
if self.heikinashi_close_window[i][-1] > self.stocks_atrUp[i][-2]:
self.Value[i] = self.stocks_atrDown[i][-1]
if self.heikinashi_close_window[i][-1] < self.stocks_atrDown[i][-2]:
self.Value[i] = self.stocks_atrUp[i][-1]
#Check value
if self.Value[i] == 0:
continue
#Only trade if there are at least 5 atrDown or atrUp items
self.ticker_tradable.append(i)
#Check how many stocks we are short in portfolio, we're doing 2 as maximum limit
count = 0
for kvp in self.Portfolio:
security_holding = kvp.Value
if security_holding.Invested:
quantity = security_holding.Quantity
if quantity < 0:
count += 1
if count >= 2:
self.short_limit = True
else:
self.short_limit = False
### Sell signal long
if self.five_minute_count == True:
#Start looping through owned securities
for kvp in self.Portfolio:
security_holding = kvp.Value
if security_holding.Invested:
symbol = security_holding.Symbol
quantity = security_holding.Quantity
price = security_holding.AveragePrice
#If symbol is long
if quantity > 0:
## Order submitted for ticker previously but not filled, try again at much lower price
if symbol not in self.trailing_prices_long and symbol in self.tickers:
self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.80)
### Trailing stop loss 3% long
elif symbol in self.trailing_prices_long:
if self.five_minute_close[symbol][-1] > self.trailing_prices_long[symbol]:
self.trailing_prices_long[symbol] = self.five_minute_close[symbol][-1]
elif self.five_minute_close[symbol][-1] < self.trailing_prices_long[symbol]:
percent_loss = self.five_minute_close[symbol][-1] / self.trailing_prices_long[symbol]
if percent_loss < 0.97:
#Exit long position at 5% discount
self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.95)
self.trailing_prices_long.pop(symbol)
# Submit short order at 3% discount if short limit is not matched
if self.short_limit == False:
self.short_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity * 0.75, self.five_minute_close[symbol][-1] * 0.97)
self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1]
else:
0
### Profit target 10%
elif symbol in self.trailing_prices_long and symbol not in self.long_sell_tickets:
if (self.five_minute_close[symbol][-1] / price) > 1.1:
#Exit long position at 5% discount
self.long_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 0.95)
self.trailing_prices_long.pop(symbol)
#Submit short order at 3% discount
if self.short_limit == False:
self.short_sell_tickets[symbol] = self.LimitOrder( str(symbol), -quantity * 0.75, self.five_minute_close[symbol][-1] * 0.97)
self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1]
else:
0
### Buy to cover signal short
if self.five_minute_count == True:
for kvp in self.Portfolio:
security_holding = kvp.Value
if security_holding.Invested:
symbol = security_holding.Symbol
quantity = security_holding.Quantity
price = security_holding.AveragePrice
#If symbol is short
if quantity < 0:
### Order submitted for ticker previously but not filled, try again at much higher price
if symbol not in self.trailing_prices_short and symbol in self.tickers:
self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.2)
### Trailing stop loss 3% short
elif symbol in self.trailing_prices_short:
if self.five_minute_close[symbol][-1] < self.trailing_prices_short[symbol]:
self.trailing_prices_short[symbol] = self.five_minute_close[symbol][-1]
elif self.five_minute_close[symbol][-1] > self.trailing_prices_short[symbol]:
percent_loss = self.five_minute_close[symbol][-1] / self.trailing_prices_short[symbol]
if percent_loss > 1.03:
#Exit short position at 5% premium
self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.05)
self.trailing_prices_short.pop(symbol)
else:
0
### Profit target 10%
elif symbol in self.trailing_prices_short and symbol not in self.short_cover_tickets:
if (self.five_minute_close[symbol][-1] / price) < 0.9:
#Exit short position at 5% discount
self.short_cover_tickets[symbol] = self.LimitOrder( str(symbol), -quantity, self.five_minute_close[symbol][-1] * 1.05)
self.trailing_prices_short.pop(symbol)
else:
0
### Buy signal
if self.five_minute_count == True and len(self.ticker_tradable) > 0 and self.long_limit == False:
### Before running buy signal, check how many stocks we are long in portfolio, we're doing 2 long as maximum limit
count = 0
already_owned_stocks = []
for kvp in self.Portfolio:
security_holding = kvp.Value
if security_holding.Invested:
symbol = security_holding.Symbol
already_owned_stocks.append(symbol)
quantity = security_holding.Quantity
if count >= 2:
self.long_limit = True
else:
self.long_limit = False
### Loop through tradable tickers
for i in self.ticker_tradable:
#Check for momentum - if mean of last 3 close prices less than 3% of mean of last 47 prices, pass
if statistics.mean(self.five_minute_close[i][-3:]) < (statistics.mean(self.five_minute_close[i][:-3]) * 1.03):
continue
#Check for volume growth - if mean of last 3 volume amounts less than mean of last 47 volume amounts, pass
if statistics.mean(self.five_minute_volume[i][-3:]) < (statistics.mean(self.five_minute_volume[i][:-3]) * 1.03):
continue
#Check for volatility - volatility measured through a comparison of standard deviation over all stocks witthin universe
self.volatility[i] = statistics.stdev(self.five_minute_close[i])
#Sort volatility in descending order in top ten list
for i in self.volatility:
if len(self.top_ten_sorted) == 0:
self.top_ten_sorted.append(i)
elif self.volatility[i] > self.volatility[self.top_ten_sorted[0]]:
self.top_ten_sorted.insert(0,i)
else:
0
#Get top ten ticker symbols in terms of volatility
if len(self.top_ten_sorted) > 10:
self.top_ten_sorted = self.top_ten_sorted[:10]
#Loop through top ten list to run buy signal given by Ajmal
for i in self.top_ten_sorted[:10]:
if self.Securities[i].Price < self.Value[i] and i not in already_owned_stocks:
self.long_buy_tickets[i] = self.LimitOrder( str(i), self.CalculateOrderQuantity(i , 0.2), self.five_minute_close[i][-1] * 1.03)
self.Debug("bought")
self.trailing_prices_long[i] = self.five_minute_close[i][-1]
#If count divisable by 5, clear data
if self.five_minute_count == True:
#Set 5 minute count to false
self.five_minute_count = False
#Clear tradable list
self.ticker_tradable.clear()
#Clear underscore
self.underscore.clear()
#Clear top ten
self.top_ten_sorted.clear()
#Clear volatility
self.volatility.clear()
def Reset(self):
self.count = 0
self.ticker_tradable.clear()
self.underscore.clear()
self.five_minute_count = False
def is_market_hours(self):
self.market_hours = True
def not_market_hours(self):
self.market_hours = False