| Overall Statistics | |
|---|---|
| Total Trades | 263 |
| Average Win | 5.83% |
| Average Loss | -1.29% |
| Compounding Annual Return | 38.219% |
| Drawdown | 12.800% |
| Expectancy | 0.689 |
| Net Profit | 214.730% |
| Sharpe Ratio | 1.775 |
| Probabilistic Sharpe Ratio | 87.447% |
| Loss Rate | 69% |
| Win Rate | 31% |
| Profit-Loss Ratio | 4.53 |
| Alpha | 0 |
| Beta | 0 |
| Annual Standard Deviation | 0.184 |
| Annual Variance | 0.034 |
| Information Ratio | 1.775 |
| Tracking Error | 0.184 |
| Treynor Ratio | 0 |
| Total Fees | $419.19 |
| Estimated Strategy Capacity | $95000.00 |
| Lowest Capacity Asset | BSQR RP1305HZ49K5 |
'''
https://www.quantconnect.com/forum/discussion/10630/shorting-bubbles-at-the-top/p1
Detects price bubbles and shorts them.
'''
class Roboto(QCAlgorithm):
# Strategy configuration: every constant below is a class attribute that the methods of this class read as Roboto.<NAME>.
# bubble signal (consumed by SecurityData)
FAST = 3 # period of the fast price EMA; lower values >> higher risk, higher returns
SLOW = 30 # period of the slow price EMA and of the dollar-volume EMA; also the number of daily history bars requested per new symbol
MAGNITUDE = 2.00 # magnitude of the bubble: fast/slow EMA ratio and price/slow EMA must both exceed this value
# position configuration for opening in CheckForEntries
FREE_CASH = 0 # adjust based on risk tolerance for FreePortfolioValuePercentage in Initialize
DYN_POSITION_SIZE = -0.50 # numerator of the dynamic position size: divided by the number of open + new positions for the next short positions
MAX_POSITION_SIZE = -0.15 # maximum individual position size (negative = short). has a big effect on total returns (more negative values >> larger returns)
MIN_BP = 0.01 # liquidate most profitable position if buying power (= MarginRemaining / PortfolioValue) is too low
OF_TOTAL_DV = 0.05 # fraction of a security's average dollar volume that caps a single short order (see Short); also scales the minimum dollar volume in CoarseFilter
MAX_POS = 9 # max number of open positions
USE_BULL = False # when True, CheckForEntries also allocates part of the portfolio to the bull security
# position configuration for liquidation in CheckForExits
CUT_LOSS = -0.10 # -10% = -0.10 !!! (applied while the position is less than one day old)
CUT_LOSS_DAY_TWO = -0.05 # cut loss applied once the position is at least one day old
TCL_GET_EVEN = 0.05 # how fast is TCL trailing until break even (0.0 for none) !!!
TCL_TRAIL = 0.00 # how fast is TCL trailing after break even (0.0 for none, if larger than TCL_GET_EVEN, overrides it) !!!
TAKE_PROFIT = 0.55 # 55% = 0.55 !!!
MAX_POSITION_AGE = 45 # 45 days optimal
TP_TRAIL = 0.5 # decreases TP with age up to 0.55 * (1 - 0.5) at MAX_POSITION_AGE (0.0 for none) !!!
TP_KICK_IN = 0.7 # decreases TP with age kicking in at 70% of MAX_POSITION_AGE (never 1.0, it would make the divisor in CheckForExits zero)
# liquidity configuration (applied in CoarseFilter)
MIN_Price = 5. # min price !!!
MAX_Price = 50. # max price !!!
MIN_VOLUME = 1e6 # min volume !!!
MIN_DOLLAR_VOLUME = 1e5 # min dollar volume (lower bound; CoarseFilter may raise it with the portfolio value)
#MIN_TIME_OF_HISTORY = 0 # only include if there is a min of x days of history data (currently unused)
MIN_TIME_IN_UNIVERSE = SLOW # min amount of time a security must remain in the universe before being removed (drives the speed of the backtest)
# funnel
N_COARSE = MAX_POS # max number of coarse securities returned by CoarseFilter
# portfolio configuration
STARTING_CASH = 10000 # for backtest in Initialize
LEVERAGE = 1 # initial value of self.bp (buying power) in Initialize
# debugging level: a message category is emitted only if its name is listed in MSGS
#MSGS = ['main', 'filter', 'logic', 'order', 'debug', 'error', 'debug_order_event', 'stale', 'squeeze', 'bull', 'shortable']
MSGS = ['logic', 'order', 'error', 'stale', 'shortable']
class SecurityData:
    """Per-symbol indicator state used to detect a price bubble.

    Holds a fast and a slow EMA of the price plus an EMA of the dollar
    volume. Yesterday's close is available as ``self.close`` (access it via
    ``self.universe[Symbol].close``).
    """

    def __init__(self, symbol, history):
        """Create the indicators and warm them up from daily history.

        symbol  -- the security's Symbol
        history -- DataFrame of daily bars (open/close/volume columns,
                   indexed by (symbol, time)) as returned by self.History
        """
        self.symbol = symbol
        self.close = 0
        self.ratio = 0
        self.isBubble = False
        self.fast = ExponentialMovingAverage(Roboto.FAST)
        self.slow = ExponentialMovingAverage(Roboto.SLOW)
        self.vol = ExponentialMovingAverage(Roboto.SLOW)
        # Warm up with all but the last day, because the caller feeds the
        # last day through update() right after creating this object.
        # NOTE: DataFrame.size is rows * columns, so slicing with it never
        # dropped the last row; iloc[:-1] drops exactly one row.
        for bar in history.iloc[:-1].itertuples():
            bar_time = bar.Index[1]
            self.fast.Update(bar_time, bar.close)
            self.slow.Update(bar_time, bar.close)
            # the volume EMA tracks dollar volume, so approximate it from the bar
            self.vol.Update(bar_time, ((bar.open + bar.close) / 2.0) * bar.volume)

    def update(self, time, price, volume, magnitude):
        """Feed one new daily sample and recompute the bubble signal.

        time      -- timestamp of the sample
        price     -- latest (adjusted) price
        volume    -- latest dollar volume
        magnitude -- threshold that both fast/slow and price/slow must exceed
        """
        self.close = price
        self.ratio = 0
        self.isBubble = False
        # Update every indicator unconditionally: chaining the calls with
        # `and` would skip the later indicators while an earlier one is not
        # ready yet, leaving them short of samples.
        fast_ready = self.fast.Update(time, price)
        slow_ready = self.slow.Update(time, price)
        vol_ready = self.vol.Update(time, volume)
        if not (fast_ready and slow_ready and vol_ready):
            return
        slow_value = self.slow.Current.Value
        if slow_value <= 0:
            # no meaningful ratio can be computed against a non-positive average
            return
        self.ratio = self.fast.Current.Value / slow_value
        self.isBubble = (self.ratio > magnitude) and (price / slow_value > magnitude)
def Initialize(self):
    """Configure the backtest, the universe, the state containers and all scheduled checks."""
    self.Debug("*** Roboto is initializing ***")
    self.SetTimeZone("America/New_York")

    # backtest: brokerage model and date range
    self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
    # alternative: self.SetBrokerageModel(InteractiveBrokersBrokerageModelWithShortable())
    self.SetStartDate(2018, 1, 1)
    self.SetEndDate(2021, 7, 16)
    self.minutes = 5  # interval (minutes) between two CheckForExits runs
    resolution = Resolution.Minute

    # portfolio
    self.SetCash(Roboto.STARTING_CASH)
    self.Settings.FreePortfolioValuePercentage = Roboto.FREE_CASH
    self.min_dollar_vol = Roboto.MIN_DOLLAR_VOLUME

    # universe selection
    self.UniverseSettings.Resolution = resolution
    # min amount of time a security must remain in the universe before being removed
    self.UniverseSettings.MinimumTimeInUniverse = Roboto.MIN_TIME_IN_UNIVERSE
    self.AddUniverse(self.CoarseFilter)
    self.universe = {}  # Symbol -> SecurityData for all tracked securities
    self.open = []      # symbols to open based on the bubble signal

    # further state
    self.positions = {}       # Symbol -> data about the opened position
    self.trail_cut_loss = {}  # Symbol -> trailing max of unrealized profit pct for cut loss
    self.bp = Roboto.LEVERAGE # buying power

    # reference securities, never shorted
    self.market = self.AddEquity("SPY", resolution).Symbol
    self.bull = self.AddEquity("QQQ", resolution).Symbol
    self.excl_smbls = [self.market, self.bull]
    self.magnitude = Roboto.MAGNITUDE

    # (first offset, step, handler), in minutes after the market open, NYSE 9:30 .. 16:00.
    # Registration order: exits, stale orders, short squeeze, entries.
    timetable = (
        (0, self.minutes, self.CheckForExits),   # liquidation of positions
        (0, 60, self.CheckForStaleOrders),       # cancelation of pending orders
        (0, 3, self.CheckForShortSqueeze),       # quick exit of an open position
        (60, 60, self.CheckForEntries),          # shorting and entering bull security
    )
    for first_offset, step, handler in timetable:
        for offset in range(first_offset, 389, step):
            self.Schedule.On(
                self.DateRules.EveryDay(),
                self.TimeRules.AfterMarketOpen(self.bull, offset),
                handler,
            )
def CoarseFilter(self, coarse):
    """Coarse universe selection: return the symbols currently flagged as bubbles.

    Filters ``coarse`` by liquidity and price, keeps one SecurityData per
    surviving symbol (creating and warming it up on first sight), updates it
    with the latest daily data and returns up to N_COARSE bubble symbols
    ordered by the highest fast/slow ratio.
    For CoarseFundamental see https://www.quantconnect.com/docs/algorithm-reference/universes#
    """
    if 'main' in Roboto.MSGS:
        self.Debug("{} CoarseFilter".format(self.Time))

    # ensure a minimum dollar volume corresponding to orders of 2 x our maximum fill rate
    max_position_value = self.Portfolio.TotalPortfolioValue * (1-Roboto.FREE_CASH) * abs(Roboto.MAX_POSITION_SIZE)
    self.min_dollar_vol = max(Roboto.MIN_DOLLAR_VOLUME, max_position_value / (2. * Roboto.OF_TOTAL_DV))

    # 1st filter for hard and soft criteria
    def is_tradable(x):
        return (x.Market == "usa"
                and x.HasFundamentalData
                and x.Volume > Roboto.MIN_VOLUME
                and x.DollarVolume > self.min_dollar_vol
                and Roboto.MIN_Price <= float(x.Price) <= Roboto.MAX_Price)

    cf_selected = list(filter(is_tradable, coarse))
    if 'filter' in Roboto.MSGS:
        self.Debug("{} CoarseFilter-1 len:{}".format(self.Time, len(cf_selected)))  # approx 500 securities

    # rebuild the universe from the selected securities only (dropped ones are forgotten)
    kept = {}
    for cf in cf_selected:
        symbol = cf.Symbol
        if symbol not in self.universe:
            # new symbol: create its SecurityData and populate the indicators with daily history
            daily_bars = self.History(symbol, Roboto.SLOW, Resolution.Daily)
            self.universe[symbol] = Roboto.SecurityData(symbol, daily_bars)
        data = self.universe[symbol]
        # feed the data of the last trading day into the indicators
        data.update(cf.EndTime, cf.AdjustedPrice, cf.DollarVolume, self.magnitude)
        kept[symbol] = data
    self.universe = kept

    # 2nd filter: keep only the securities that are over their bubble threshold
    bubbles = [data for data in self.universe.values() if data.isBubble]
    if 'filter' in Roboto.MSGS:
        self.Debug("{} CoarseFilter-2 len:{}".format(self.Time, len(bubbles)))

    # 3rd filter: the N_COARSE highest ratios; only Symbol objects are returned
    ranked = sorted(bubbles, key=lambda data: data.ratio, reverse=True)
    symbols = [data.symbol for data in ranked[:Roboto.N_COARSE]]
    if 'filter' in Roboto.MSGS:
        self.Debug("{} CoarseFilter-3 len:{}".format(self.Time, len(symbols)))
    return symbols
def OnSecuritiesChanged(self, changes):
    """Called whenever the universe changes: rebuild the list of symbols to short.

    For every added security all of its open orders are cancelled; securities
    that are not yet invested and are not one of the excluded reference
    symbols are queued in ``self.open`` for CheckForEntries.
    """
    if 'main' in Roboto.MSGS:
        self.Debug("{} Securities changed".format(self.Time))

    # start from an empty queue on every change
    self.open = []
    for security in changes.AddedSecurities:
        symbol = security.Symbol
        self.CancelAllOrders(symbol)
        if security.Invested or symbol in self.excl_smbls:
            continue
        if 'logic' in Roboto.MSGS:
            self.Debug("{} Identified bubble for security {}".format(self.Time, symbol))
        self.open.append(symbol)
def CheckForEntries(self):
    """Open short positions for the symbols queued in ``self.open``.

    A queued symbol is shorted (and removed from the queue) while the number
    of open plus newly attempted positions stays below MAX_POS and its price
    is at least MIN_Price; symbols that do not qualify stay queued for the
    next run. Optionally allocates the bull security, then refreshes the
    buying power and emits the plots.
    """
    if 'main' in Roboto.MSGS: self.Debug("{} CheckForEntries".format(self.Time))
    # number of invested securities, including the bullish stock
    num_pos = sum(1 for f in self.ActiveSecurities if f.Value.Invested)

    # open new positions based on self.open which is populated in OnSecuritiesChanged
    new_pos = 0
    # Iterate over a snapshot: removing from the list being iterated would
    # silently skip the element following each removed one.
    for symb in list(self.open):
        has_room = (num_pos + new_pos) < Roboto.MAX_POS
        if not (has_room and float(self.Securities[symb].Price) >= Roboto.MIN_Price):
            continue
        new_pos += 1
        dynamic = Roboto.DYN_POSITION_SIZE / (num_pos + new_pos)  # negative
        target = max(Roboto.MAX_POSITION_SIZE, dynamic)  # max of negative = min of positive
        tag = "New pos. target allocation {}".format(round(target, 4))
        self.Short(symb, target, tag)
        self.open.remove(symb)

    # set some portion of portfolio to hold bullish index
    if Roboto.USE_BULL:
        remaining_allocation = max(1.0 - Roboto.MIN_BP - (num_pos * (-1 * Roboto.MAX_POSITION_SIZE)), Roboto.MAX_POSITION_SIZE)
        if 'order' in Roboto.MSGS: self.Debug("{} *** Entering: bull security with {}".format(self.Time, remaining_allocation))
        self.SetHoldings([PortfolioTarget(self.bull, remaining_allocation)])
        self.Plot("Buying Power", "Bull", remaining_allocation)

    # buying power = remaining margin as a fraction of the portfolio value
    self.bp = (self.Portfolio.MarginRemaining / self.Portfolio.TotalPortfolioValue)
    self.Plot("Buying Power", "BP", self.bp)
    self.Plot("# Positions", "pos", num_pos)
def Short(self, symbol, target, tag = "No Tag Provided"):
    """Place a short limit order for ``symbol`` at the last bar's close.

    symbol -- Symbol to short; must be tracked in ``self.universe``
    target -- target portfolio fraction, negative for a short
    tag    -- order tag

    The order quantity is the smaller (in absolute terms) of the quantity
    implied by ``target`` and OF_TOTAL_DV of the security's average dollar
    volume. Nothing is ordered when the price is not positive or the
    resulting quantity is not negative.
    """
    if symbol not in self.universe:
        if 'error' in Roboto.MSGS: self.Debug("Not in universe: {}, {}".format(symbol, self.Time))
        return

    # yesterday's close (logged only) and the close of the last bar according to the resolution
    close_yesterday = self.universe[symbol].close
    price = float(self.Securities[symbol].Close)
    if 'logic' in Roboto.MSGS: self.Debug("{} Short check {} @ yest:{}, price:{}".format(self.Time, symbol, close_yesterday, price))

    # a valid (positive) price is required for sizing and for the limit price
    if price <= 0:
        if 'logic' in Roboto.MSGS: self.Debug("{} Shorting skipped for {} @ {}".format(self.Time, symbol, price))
        return

    # target order quantity from target percent (calculated on the current price and
    # adjusted for the fee model attached to that security)
    q_target = self.CalculateOrderQuantity(symbol, target)
    # maximum order quantity based on the allowed share of the security's dollar volume,
    # must be negative for shorting
    q_max = - float(Roboto.OF_TOTAL_DV * self.universe[symbol].vol.Current.Value) / price
    q = int(max(q_target, q_max))  # max of negative = min of positive

    # NOTE: a shortable-shares check (self.Shortable(symbol, q)) is deliberately not performed here.
    if q < 0:
        # one placeholder per argument: quantity, symbol and price
        if 'order' in Roboto.MSGS: self.Debug("{} *** Entering: short for {} {} @ {}".format(self.Time, q, symbol, price))
        self.EmitInsights(Insight.Price(symbol, timedelta(days = Roboto.MAX_POSITION_AGE), InsightDirection.Down, None, None, None, target))
        self.LimitOrder(symbol, q, price, tag)
    elif q > 0:
        if 'error' in Roboto.MSGS: self.Error("{} Received positive quantity for short order: {} {} @ {} (Target: {})".format(self.Time, q, symbol, price, target))
    else:
        if 'logic' in Roboto.MSGS: self.Debug("{} Shorting skipped for {} @ {}".format(self.Time, symbol, price))
# checks fast price acceleration
def CheckForShortSqueeze(self):
    """Exit young short positions quickly when the price jumps against them.

    For every invested security (excluding the reference symbols) the price
    stored at the previous check is compared with the current price. A rise
    of more than 3% since then, within the first hour after the fill and
    with the stored price above the fill price, triggers a market exit;
    otherwise the stored price is refreshed.
    """
    invested = [
        f.Key for f in self.ActiveSecurities
        if (f.Value.Invested and (f.Value.Symbol not in self.excl_smbls))
    ]
    for symb in invested:
        holding = self.Portfolio[symb]
        position = self.positions[symb]
        if 'squeeze' in Roboto.MSGS:
            self.Debug("Position open time: {}, symbol: {}, executed price: {}, curr price: {}".format(
                position.orderFillTime, position.symbol, position.price, self.Securities[symb].Price))

        age = self.Time - position.orderFillTime
        current_price = self.Securities[symb].Price
        # price has risen by more than 3% since the previously stored price
        jumped = (position.price - current_price) < (position.price * - 0.03)
        # position was filled less than one day and less than 60 minutes ago
        is_young = (age.days < 1) and ((age.seconds / 60) < 60)
        above_fill = position.price > position.fillPrice

        if not (jumped and is_young and above_fill):
            # remember the current price as reference for the next check
            position.price = self.Securities[symb].Price
            continue

        tag = "Short squeeze, age {} minutes, result {}%".format(age.seconds / 60, round(holding.UnrealizedProfitPercent * 100, 4) )
        if 'squeeze' in Roboto.MSGS:
            self.Debug(tag)
        self.RapidExit(holding.Symbol, tag)
def CheckForExits(self):
    """Liquidate positions based on loss, profit, age and buying power.

    For every invested security (excluding the reference symbols):
      * update its trailing cut loss and exit with a market order when the
        unrealized profit falls below it,
      * otherwise exit with a limit order when the (age-adjusted) take
        profit is exceeded,
      * otherwise exit with a limit order when older than MAX_POSITION_AGE.
    Afterwards, if buying power is below MIN_BP, the most profitable
    position that is not already being closed is covered as well.
    """
    if 'main' in Roboto.MSGS: self.Debug("{} CheckForExits".format(self.Time))
    closing = set()  # symbols for which an exit was issued in this run
    invested = [f.Key for f in self.ActiveSecurities if (f.Value.Invested and (f.Value.Symbol not in self.excl_smbls))]
    # age (days) at which the take profit starts to decrease
    kick_in_age = Roboto.MAX_POSITION_AGE * Roboto.TP_KICK_IN

    for symb in invested:
        holding = self.Portfolio[symb]
        position = self.positions[symb]
        age_days = (self.Time - position.orderFillTime).days
        profit = holding.UnrealizedProfitPercent
        result_pct = round(profit, 4) * 100

        # dynamically set the cut loss based on position age
        trade_cut_loss = Roboto.CUT_LOSS if age_days < 1 else Roboto.CUT_LOSS_DAY_TWO
        # update cut loss, limited to UnrealizedProfitPercent = 0 (break even)
        trail = min(-trade_cut_loss, max(self.trail_cut_loss[symb], profit * Roboto.TCL_GET_EVEN))
        # update cut loss, not limited
        trail = max(trail, profit * Roboto.TCL_TRAIL)
        self.trail_cut_loss[symb] = trail

        # take profit decreases with age, kicking in at TP_KICK_IN of
        # MAX_POSITION_AGE and decreasing by up to TP_TRAIL
        overdue = ((1 + age_days) - kick_in_age) / (Roboto.MAX_POSITION_AGE - kick_in_age)
        tp_decrease = 1 - max(0, overdue) * Roboto.TP_TRAIL
        take_profit = Roboto.TAKE_PROFIT * tp_decrease

        if profit < (trail + trade_cut_loss):
            # exit positions with a large loss quickly with a market order
            if 'order' in Roboto.MSGS: self.Debug("{} *** Liquidating: Market Order for Cutting Losses on {} at {} days, {}%".format(self.Time, holding.Symbol, age_days, result_pct))
            self.CancelAllOrders(holding.Symbol)
            tag = "Cutting loss, age {} days, result {}%".format(age_days, result_pct)
            self.RapidExit(holding.Symbol, tag)
            closing.add(holding.Symbol)
        elif profit > take_profit:
            # exit positions that have a large profit with a limit order
            if 'order' in Roboto.MSGS: self.Debug("{} *** Liquidating: Limit Order for Taking Profit on {} at {} days, {}%".format(self.Time, holding.Symbol, age_days, result_pct))
            self.CancelAllOrders(holding.Symbol)
            tag = "Taking profit, age {} days, result {}%".format(age_days, result_pct)
            self.Cover(holding.Symbol, tag)
            closing.add(holding.Symbol)
        elif age_days > Roboto.MAX_POSITION_AGE:
            # exit old positions with a limit order
            if 'order' in Roboto.MSGS: self.Debug("{} *** Liquidating: Limit Order for Expired {} at {} days, {}%".format(self.Time, holding.Symbol, age_days, result_pct))
            self.CancelAllOrders(holding.Symbol)
            tag = "Expired, age {} days, result {}%".format(age_days, result_pct)
            self.Cover(holding.Symbol, tag)
            closing.add(holding.Symbol)

    # liquidate most profitable position if buying power is too low
    self.bp = self.Portfolio.MarginRemaining / self.Portfolio.TotalPortfolioValue
    if self.bp >= Roboto.MIN_BP:
        return
    if 'logic' in Roboto.MSGS: self.Debug("{} Buying Power too low: {}".format(self.Time, self.bp))

    # Candidates are compared by symbol so that positions already being
    # closed above (loss, profit or age) are really excluded.
    candidates = [
        holding
        for holding in (self.Portfolio[symb] for symb in invested)
        if holding.Symbol not in closing
    ]
    if not candidates:
        if 'error' in Roboto.MSGS: self.Error("{} Unable to liquidate: {} {}".format(self.Time, len(candidates), len(closing)))
        return

    best = max(candidates, key=lambda holding: holding.UnrealizedProfitPercent)
    best_profit = best.UnrealizedProfitPercent
    if 'order' in Roboto.MSGS: self.Debug("{} *** Liquidating: Limit Order for Buying Power {} @ {}".format(self.Time, best.Symbol, best_profit))
    self.CancelAllOrders(best.Symbol)
    # the tag describes the position that is actually liquidated
    best_age_days = (self.Time - self.positions[best.Symbol].orderFillTime).days
    tag = "Liquidating, age {} days, result {}%".format(best_age_days, round(best_profit, 4) * 100)
    self.Cover(best.Symbol, tag)
def RapidExit(self, symbol, tag = "No Tag Provided"):
    """Close the short position in ``symbol`` immediately with a market order.

    The buy quantity is the negated holding quantity. A flat holding is
    ignored; a long holding (negative buy quantity) is reported as an error.
    """
    q = -1 * int(self.Portfolio[symbol].Quantity)
    if q == 0:
        return
    if q < 0:
        if 'error' in Roboto.MSGS:
            self.Error("{} Received negative quantity for rapid exit order: {} {}".format(self.Time, q, symbol))
        return

    if 'debug' in Roboto.MSGS:
        self.Debug("{} Rapid Exit {} {}".format(self.Time, q, symbol))
    insight = Insight.Price(symbol, timedelta(days = Roboto.MAX_POSITION_AGE), InsightDirection.Up, None, None, None, 0.00)
    self.EmitInsights(insight)
    self.MarketOrder(symbol, q, False, tag)
def Cover(self, symbol, tag = "No Tag Provided"):
    """Close the short position in ``symbol`` with a limit order at the last close.

    The buy quantity is the negated holding quantity. A flat holding is
    ignored; a long holding (negative buy quantity) is reported as an error.
    """
    q = -1 * int(self.Portfolio[symbol].Quantity)
    price = self.Securities[symbol].Close
    if q == 0:
        return
    if q < 0:
        if 'error' in Roboto.MSGS:
            self.Error("{} Received negative quantity for cover order: {} {} @ {}".format(self.Time, q, symbol, price))
        return

    if 'debug' in Roboto.MSGS:
        self.Debug("{} Cover {} {} @ {}".format(self.Time, q, symbol, price))
    insight = Insight.Price(symbol, timedelta(days = Roboto.MAX_POSITION_AGE), InsightDirection.Flat, None, None, None, 0.00)
    self.EmitInsights(insight)
    self.LimitOrder(symbol, q, price, tag)
# removes unfilled limit orders after 24 hours
def CheckForStaleOrders(self):
    """Cancel the orders of every symbol that has an open order from an earlier calendar day."""
    for order in self.Transactions.GetOpenOrders():
        if order.Status == OrderStatus.Filled:
            continue
        placed_on = order.Time.date()
        # stale = placed on a date before today's date
        if not (self.Time.date() - placed_on).days > 0:
            continue
        if 'stale' in Roboto.MSGS:
            self.Debug("Stale order for {}, open for {} days".format(order.Symbol, (self.Time.date() - placed_on).days))
        self.CancelAllOrders(order.Symbol)
def CancelAllOrders(self, symbol):
    """Cancel all open orders for ``symbol`` and report cancellations that fail.

    Every ticket returned by CancelOpenOrders that is not in CancelPending
    status is cancelled explicitly; an unsuccessful response is logged.
    """
    if 'debug' in Roboto.MSGS:
        self.Debug("{} Cancelling all orders for {}".format(self.Time, symbol))

    for ticket in self.Transactions.CancelOpenOrders(symbol):
        if ticket.Status == OrderStatus.CancelPending:
            continue
        response = ticket.Cancel()
        if response.IsSuccess:
            continue
        if 'error' in Roboto.MSGS:
            self.Error("{} Failed to cancel open order {} of {} for reason: {}, {}".format(self.Time, ticket.Quantity, ticket.Symbol, response.ErrorMessage, response.ErrorCode))
def OnOrderEvent(self, orderEvent):
    """Maintain per-position tracking data when an order is completely filled.

    When the fill leaves the symbol flat, its entries in ``self.positions``
    and ``self.trail_cut_loss`` are dropped, so that a later re-entry starts
    with a fresh fill time, prices and trailing cut loss instead of
    inheriting the data of the previous trade. When the fill leaves the
    symbol invested, tracking is started; an entry that already exists is
    reported as an error and left untouched.
    """
    if orderEvent.Status != OrderStatus.Filled:
        return

    order = self.Transactions.GetOrderById(orderEvent.OrderId)
    symbol = order.Symbol
    if 'debug_order_event' in Roboto.MSGS: self.Debug("{} Filled {} of {} at {}".format(self.Time, order.Quantity, symbol, order.Price))

    # position completely liquidated: stop tracking it
    if not self.Portfolio[symbol].Invested:
        # pop() with a default cannot fail, so no exception handling is needed
        self.positions.pop(symbol, None)
        self.trail_cut_loss.pop(symbol, None)
        if 'debug_order_event' in Roboto.MSGS: self.Debug("{} No longer tracking {}".format(self.Time, symbol))
        return

    # position is new: start tracking its age and prices
    if symbol not in self.positions:
        class SymbolData:
            """Fill time, last observed price and fill price of one position."""
            def __init__(self, order, currentTime, securityInfo, orderEvent):
                self.orderFillTime = currentTime
                self.price = securityInfo.Price  # refreshed by CheckForShortSqueeze
                self.symbol = order.Symbol
                self.fillPrice = orderEvent.FillPrice
        self.positions[symbol] = SymbolData(order, self.Time, self.Securities[symbol], orderEvent)
        if 'debug_order_event' in Roboto.MSGS: self.Debug("Symbol: {}, time: {}".format(symbol, self.Time))
    else:
        if 'error' in Roboto.MSGS: self.Error("{} Key already existed for {}".format(self.Time, symbol))

    # the trailing cut loss of a new position starts at 0
    if symbol not in self.trail_cut_loss:
        self.trail_cut_loss[symbol] = 0
    else:
        if 'error' in Roboto.MSGS: self.Error("{} Key already existed for {}".format(self.Time, symbol))
class InteractiveBrokersBrokerageModelWithShortable(InteractiveBrokersBrokerageModel):
    """Interactive Brokers brokerage model whose ShortableProvider is an AtreyuShortableProvider."""

    def __init__(self):
        super(InteractiveBrokersBrokerageModelWithShortable, self).__init__()
        # shortable provider for US equities
        provider = AtreyuShortableProvider(SecurityType.Equity, Market.USA)
        self.ShortableProvider = provider