# Overall Statistics (backtest results reported for this algorithm)
#
#   Total Orders: 22
#   Average Win: 4.43%
#   Average Loss: -13.10%
#   Compounding Annual Return: -65.748%
#   Drawdown: 74.200%
#   Expectancy: -0.554
#   Start Equity: 1000
#   End Equity: 337.53
#   Net Profit: -66.247%
#   Sharpe Ratio: -0.454
#   Sortino Ratio: -0.099
#   Probabilistic Sharpe Ratio: 1.475%
#   Loss Rate: 67%
#   Win Rate: 33%
#   Profit-Loss Ratio: 0.34
#   Alpha: -0.356
#   Beta: 0.192
#   Annual Standard Deviation: 0.676
#   Annual Variance: 0.457
#   Information Ratio: -0.799
#   Tracking Error: 0.708
#   Treynor Ratio: -1.599
#   Total Fees: $23.08
#   Estimated Strategy Capacity: $230000.00
#   Lowest Capacity Asset: SNDL X6N3VNEHX84L
#   Portfolio Turnover: 2.12%
# region imports
from AlgorithmImports import *
import numpy as np
# endregion
class Fallingknives(QCAlgorithm):
    """Short stocks that start falling after a volume-backed price spike.

    Flow, as implemented below:
      1. ``IsShortingCandidate`` flags a symbol once its recent minute volume
         and then its recent price have jumped relative to the preceding bars.
      2. ``RapidDecline`` opens a short on a flagged symbol when the latest
         prices fall far enough below the high of the preceding half hour.
      3. ``IsShortExitCandidate`` (per bar) and ``UpdateCandidateList`` (once
         per weekday) cover the short on a stop, a trailing stop, or once the
         maximum holding period is exceeded.
    """

    def Initialize(self):
        """Set strategy parameters, the universe, per-symbol state and the daily schedule."""
        self.logging = True
        self.min_recent_volume_increase = 8  # min ratio of recent to earlier mean volume
        self.min_recent_volume_decrease = 8
        self.min_price_spike = 1.5  # min ratio of recent to earlier mean price
        self.min_price_decline = 0.2  # short when price ratio drops below 1 - this value
        self.stop_loss_pct = .1  # cover when price rises this fraction above entry
        # NOTE(review): with a value of 2 the take-profit level is
        # entry * (1 - 2) < 0, so that branch can never trigger for a positive
        # price. Confirm whether a fraction below 1 was intended.
        self.take_profit_pct = 2
        self.decline_window = 3 * 60  # number of minute bars kept for price checks
        self.min_hold_time = 1  # daily updates a short must survive before exits apply
        self.max_hold_time = 2  # daily updates after which a short is force-covered
        self.min_decline_window_size = 3 * 60
        self.rebalanceTime = datetime.min
        self.stay_in_cadidate_list = 7  # daily updates a symbol may stay a candidate
        self.split_ratio = .25  # portfolio fraction used for each short
        self.volumeBarsDaysSize = 10 * 60  # number of minute bars kept for volume checks

        self.SetStartDate(2020, 3, 6)
        self.SetEndDate(2021, 3, 10)
        self.SetCash(1000)

        self.final_universe_size = 100
        self.activeStocks = set()
        # Configure the universe resolution before the universe is created.
        self.UniverseSettings.Resolution = Resolution.Minute
        self.AddUniverse(self.CoarseFilter, self.FineFilter)

        self.TickerToAnalyze = "ADIL"  # ticker that gets extra diagnostic logging

        # Per-symbol state, keyed by Symbol.
        self.volumes = {}
        self.rolling_prices = {}
        self.shortCandidates = set()
        self.volumePassed = set()
        self.days_in_candidate_list = {}
        self.trailing_stop_thresholds = {}
        self.hold_times = {}
        self.bollingers = {}

        # (activation, stop) pairs expressed as fractional moves from entry.
        # The first pair is a sentinel that can never trigger a stop.
        self.trailing_stop_percents = [
            (100, 100), (-.15, -.1), (-.25, -.16), (-.35, -.26), (-.45, -.36),
            (-.55, -.46), (-.65, -.56), (-.75, -.66), (-.85, -.76), (-.95, -.86),
        ]

        self.schedule.on(
            self.date_rules.every(
                DayOfWeek.MONDAY, DayOfWeek.TUESDAY, DayOfWeek.WEDNESDAY,
                DayOfWeek.THURSDAY, DayOfWeek.FRIDAY,
            ),
            self.time_rules.at(15, 30),
            self.UpdateCandidateList,
        )

    @staticmethod
    def _window_values(window):
        """Return the rolling window's contents as a float array, in iteration order."""
        return np.array(list(window), dtype=float)

    @staticmethod
    def _nonzero_mean(values):
        """Return the mean of the non-zero entries, or NaN when there are none."""
        nonzero = values[values != 0]
        return np.mean(nonzero) if nonzero.size else np.nan

    def _forget_symbol(self, symbol):
        """Drop all per-symbol tracking state; missing entries are ignored."""
        for store in (self.hold_times, self.bollingers, self.volumes,
                      self.rolling_prices, self.trailing_stop_thresholds):
            store.pop(symbol, None)
        self.activeStocks.discard(symbol)

    def _log_decline_metrics(self, price_decline, earlier_peak, volume_jump):
        """Log the three diagnostics computed by ``RapidDecline``."""
        stamp = " at time/date: " + str(self.Time)
        self.Log("----Price decline: " + str(price_decline) + stamp)
        self.Log("----price averaged over back half of hour: " + str(earlier_peak) + stamp)
        self.Log("----min volume jump: " + str(volume_jump) + stamp)

    def UpdateCandidateList(self):
        """Age candidates and open positions; runs from the weekday 15:30 schedule.

        Candidates older than ``stay_in_cadidate_list`` updates are dropped, and
        positions held for more than ``max_hold_time`` updates are closed and
        their tracking state discarded.
        """
        expired = []
        for symbol in self.shortCandidates:
            days = self.days_in_candidate_list.get(symbol)
            if days is None:
                # First update since the symbol became a candidate.
                self.days_in_candidate_list[symbol] = 1
                continue
            days += 1
            self.days_in_candidate_list[symbol] = days
            if days > self.stay_in_cadidate_list:
                self.Log("Removing stock: " + str(symbol.value) + " from candidate list after " + str(self.stay_in_cadidate_list) + " days.")
                expired.append(symbol)
        # Removal is deferred so the set is not mutated while being iterated.
        for symbol in expired:
            self.shortCandidates.discard(symbol)
            self.days_in_candidate_list.pop(symbol, None)

        for symbol in self.Portfolio.Keys:
            if not self.Portfolio[symbol].Invested:
                continue
            held = self.hold_times.get(symbol, 0) + 1
            self.hold_times[symbol] = held
            if held <= self.max_hold_time:
                continue
            self.Log("Max hold time reached for stock: " + str(symbol.value) + " at time/date: " + str(self.Time))
            self.SetHoldings(symbol, 0)
            # Some of this state may already be gone (e.g. the bands are dropped
            # when an exit fires), so every removal tolerates a missing key.
            self._forget_symbol(symbol)

    def CoarseFilter(self, coarse):
        """Return the 1000 highest dollar-volume symbols, refreshed every 7 days."""
        if self.Time <= self.rebalanceTime:
            return self.Universe.Unchanged
        self.rebalanceTime = self.Time + timedelta(7)
        by_dollar_volume = sorted(coarse, key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in by_dollar_volume[:1000]]

    def FineFilter(self, fine):
        """Return the 200 symbols with the smallest positive market cap."""
        with_market_cap = [f for f in fine if f.MarketCap > 0]
        with_market_cap.sort(key=lambda f: f.MarketCap)
        return [f.Symbol for f in with_market_cap[:200]]

    def OnSecuritiesChanged(self, changes):
        """Stop tracking removed securities and warm up state for added ones."""
        for security in changes.RemovedSecurities:
            if self.TickerToAnalyze in str(security.Symbol):
                self.Log("removed " + str(security.Symbol) + " from universe")
            self.activeStocks.discard(security.Symbol)

        # One history request sized for the larger window fills both: each
        # rolling window only retains its own most recent entries.
        bars_needed = max(self.volumeBarsDaysSize, self.decline_window)
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            volume_window = RollingWindow[float](self.volumeBarsDaysSize)
            price_window = RollingWindow[float](self.decline_window)
            for bar in self.history[TradeBar](symbol, bars_needed, Resolution.Minute):
                volume_window.Add(bar.Volume)
                price_window.Add(bar.Close)
            self.volumes[symbol] = volume_window
            self.rolling_prices[symbol] = price_window
            self.bollingers[symbol] = self.bb(symbol, 20, 2)
            self.activeStocks.add(symbol)

    def OnData(self, data):
        """Update the rolling windows and run the entry/exit checks for each tracked symbol."""
        # Iterate over a snapshot so helper calls can never invalidate the loop.
        for symbol in list(self.activeStocks):
            if not data.ContainsKey(symbol) or data[symbol] is None:
                continue
            bar = data[symbol]
            self.volumes[symbol].Add(bar.Volume)
            self.rolling_prices[symbol].Add(bar.price)

            if symbol in self.shortCandidates:
                self.RapidDecline(data, symbol)
                continue

            self.IsShortingCandidate(data, symbol)
            if symbol in self.shortCandidates:
                self.Log("Stock: " + str(symbol.value) + " was in list, check for rapid decline")
                self.RapidDecline(data, symbol)
            elif self.IsShortExitCandidate(data, symbol):
                self.Log("Covering stock: " + str(symbol.value) + " at price: " + str(bar.price) + " at time/date: " + str(self.Time))
                self.SetHoldings(symbol, 0)
                # Reset per-position state so a later short starts from scratch.
                self.hold_times.pop(symbol, None)
                self.trailing_stop_thresholds.pop(symbol, None)

    def RapidDecline(self, data, symbol):
        """Open a short if the symbol's price is falling fast enough.

        Compares the mean of the 30 most recently stored prices with the
        maximum of the 30 before them (assuming the rolling window yields the
        newest value first) and shorts ``split_ratio`` of the portfolio when
        the ratio is below ``1 - min_price_decline``.

        Returns:
            True if a short was placed, otherwise False.
        """
        if not data.ContainsKey(symbol) or data[symbol] is None:
            return False
        bands = self.bollingers.get(symbol)
        if bands is None or not bands.IsReady:
            return False

        prices = self._window_values(self.rolling_prices[symbol])
        recent_prices = prices[0:30]
        earlier_prices = prices[30:60]
        # Without enough history the reductions below are undefined
        # (np.max raises on an empty array), so there is nothing to compare.
        if recent_prices.size == 0 or earlier_prices.size == 0:
            return False
        earlier_peak = np.max(earlier_prices)
        if not earlier_peak > 0:
            return False
        price_decline = np.mean(recent_prices) / earlier_peak

        # Diagnostic only: ratio of the last hour's mean volume to the hour before.
        volumes = self._window_values(self.volumes[symbol])
        min_volume_jump = self._nonzero_mean(volumes[0:60]) / self._nonzero_mean(volumes[60:120])

        if "ENSV" in str(symbol):
            self._log_decline_metrics(price_decline, earlier_peak, min_volume_jump)

        if price_decline < 1 - self.min_price_decline:
            self._log_decline_metrics(price_decline, earlier_peak, min_volume_jump)
            self.Log("shorting stock: " + str(symbol.value) + " at price: " + str(data[symbol].price) + " at time/date: " + str(self.Time))
            self.SetHoldings(symbol, -self.split_ratio)
            self.shortCandidates.discard(symbol)
            # Forget the candidate age so a future candidacy starts from zero.
            self.days_in_candidate_list.pop(symbol, None)
            self.trailing_stop_thresholds[symbol] = self.trailing_stop_percents[0]
            return True
        return False

    def IsShortingCandidate(self, data, symbol):
        """Add the symbol to ``shortCandidates`` after a volume jump and a price spike.

        The volume check compares the mean non-zero volume of the 60 most
        recently stored bars with that of the 60 before them. Once passed, the
        symbol is remembered in ``volumePassed`` and only the price-spike check
        (20 latest prices against the 100 before them) is repeated.
        """
        if not data.ContainsKey(symbol) or data[symbol] is None:
            if self.TickerToAnalyze in str(symbol.value):
                self.Log("No data for stock: " + str(symbol.value))
            return
        if not self.volumes[symbol].IsReady:
            if self.TickerToAnalyze in str(symbol.value):
                self.Log("Volumes not ready for stock: " + str(symbol.value))
            return

        volumes = self._window_values(self.volumes[symbol])
        earlier_volume = self._nonzero_mean(volumes[60:120])
        if np.isnan(earlier_volume):
            return

        if symbol not in self.volumePassed:
            recent_volume = self._nonzero_mean(volumes[0:60])
            # A NaN ratio compares False against the threshold and would slip
            # through, so an hour with no traded volume is rejected explicitly.
            if np.isnan(recent_volume):
                return
            max_volume_jump = recent_volume / earlier_volume
            if max_volume_jump < self.min_recent_volume_increase:
                return
            self.volumePassed.add(symbol)
            self.Log("*********" + str(symbol.value) + " Passed volume check with jump: " + str(max_volume_jump) + " at time/date: " + str(self.Time) + "*********")

        prices = self._window_values(self.rolling_prices[symbol])
        recent_prices = prices[0:20]
        earlier_prices = prices[20:120]
        if recent_prices.size == 0 or earlier_prices.size == 0:
            return
        price_spike = np.mean(recent_prices) / np.mean(earlier_prices)
        if price_spike < self.min_price_spike:
            self.Log("$$$$$$$$$$$$$Stock: " + str(symbol.value) + " did not pass price spike check with spike: " + str(price_spike) + " at time/date: " + str(self.Time))
            return

        self.volumePassed.discard(symbol)
        self.Log("$$$$$$$$$$$$$$$" + str(symbol.value) + " Passed price spike check, adding to candidate list wiht spike: " + str(price_spike) + " at time/date: " + str(self.Time) + "*********")
        self.shortCandidates.add(self.AddEquity(symbol, Resolution.Minute).Symbol)

    def IsShortExitCandidate(self, data, symbol):
        """Return True when an open short should be covered on this bar.

        Exits (take-profit, stop-loss, trailing stop) only apply once the
        position has been counted for at least ``min_hold_time`` daily updates.
        When no exit fires, the trailing-stop level is ratcheted to the deepest
        activation level the current move has reached.
        """
        if not data.ContainsKey(symbol) or data[symbol] is None or symbol not in self.hold_times:
            return False
        if not self.Portfolio[symbol].Invested:
            return False

        entry_price = self.Portfolio[symbol].AveragePrice
        current_price = data[symbol].close
        trailing = self.trailing_stop_thresholds.get(symbol, self.trailing_stop_percents[0])

        if self.hold_times[symbol] >= self.min_hold_time:
            message = None
            if current_price <= entry_price * (1 - self.take_profit_pct):
                message = "Take profit hit for stock: " + str(symbol.value) + " at price: "
            elif current_price >= entry_price * (1 + self.stop_loss_pct):
                message = "Stop loss hit for stock: " + str(symbol.value) + " at price "
            elif current_price >= entry_price * (1 + trailing[1]):
                message = "Trailing stop hit for stock: " + str(symbol.value) + " at price "
            if message is not None:
                self.Log(message + str(data[symbol].price) + " at time/date: " + str(self.Time))
                # Dropping the bands keeps RapidDecline from re-shorting this symbol.
                self.bollingers.pop(symbol, None)
                return True

        # Ratchet: adopt each deeper activation level the price has reached.
        change = (current_price - entry_price) / entry_price
        for level in self.trailing_stop_percents:
            if level[0] >= change and level[0] < trailing[0]:
                trailing = level
        self.trailing_stop_thresholds[symbol] = trailing
        return False