| Overall Statistics | |
|---|---|
| Total Trades | 220 |
| Average Win | 0.47% |
| Average Loss | -0.50% |
| Compounding Annual Return | -2.197% |
| Drawdown | 11.000% |
| Expectancy | -0.095 |
| Net Profit | -4.351% |
| Sharpe Ratio | -0.23 |
| Probabilistic Sharpe Ratio | 2.662% |
| Loss Rate | 54% |
| Win Rate | 46% |
| Profit-Loss Ratio | 0.95 |
| Alpha | -0.065 |
| Beta | 0.215 |
| Annual Standard Deviation | 0.069 |
| Annual Variance | 0.005 |
| Information Ratio | -1.321 |
| Tracking Error | 0.186 |
| Treynor Ratio | -0.073 |
| Total Fees | $214.00 |
from datetime import timedelta, datetime
from collections import deque
from QuantConnect.Data.Custom.CBOE import *
from QuantConnect.Securities.Option import OptionStrategies
class MyAlgorithm(QCAlgorithm):
    """Open bull put (credit) spreads on a small pool of up-trending stocks.

    Flow, as implemented below:

    * ``CoarseSelectionFunction`` rebuilds ``CandidateStockPool`` when the
      calendar month changes and feeds each pooled stock's indicators with
      the daily coarse price.
    * ``OnData`` asks ``FindContract`` for a put pair for every pooled stock
      whose ``buy`` flag is set.
    * ``BuyBullPutSpread`` (scheduled 10:01) opens a spread, or liquidates
      everything when VIX is above ``VixThreshold``.
    * ``ClosePosition`` (scheduled 15:31) exits spreads close to expiry.
    """

    def Initialize(self):
        """Configure the backtest window, data feeds, parameters and schedules."""
        self.SetStartDate(2019, 1, 1)    # Set Start Date
        self.SetEndDate(2020, 12, 31)    # Set End Date
        self.SetCash(100000)             # Set Strategy Cash
        self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol

        # Sizes of the two coarse selection stages.
        self.num_coarse_0 = 50   # stocks kept after the dollar-volume pre-filter
        self.num_coarse_1 = 5    # stocks kept in the final candidate pool

        self.CandidateStockPool = {}   # Symbol -> SymbolInd for the chosen equities
        self.Contracts = {}            # Symbol -> [short put, long put], or [] once closed

        self.AddUniverse(self.CoarseSelectionFunction)
        self.UniverseSettings.Resolution = Resolution.Daily
        self.Months = -1               # month of the last pool rebuild (-1 = never)

        # parameters ------------------------------------------------------------
        self.DTE = 15             # target days till expiration
        self.OTM = 0              # target percentage OTM of put (not referenced in this class)
        self.DaysBeforeExp = 2    # number of days before expiry to exit
        self.NumPosition = 0.1    # capital fraction per order (not referenced; orders use quantity 1)
        self.SpreadRatio = .01    # relative distance used to pick the strikes
        self.minPrice = 50        # minimum stock price accepted by the coarse filter
        self.VixThreshold = 35    # VIX level above which every position is liquidated
        # ------------------------------------------------------------------------

        self.AddEquity("SPY")

        # Open credit spreads at 10:01, roughly 30 minutes after the market open.
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.At(10, 1),
            self.BuyBullPutSpread,
        )
        # Close expiring spreads at 15:31, roughly 30 minutes before the close.
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.At(15, 31),
            self.ClosePosition,
        )
        # Plot the number of open spreads and the VIX level at 17:00.
        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.At(17, 0),
            self.Plotting,
        )

    def FindContract(self, data, symbol):
        """Look up and remember a put pair for ``symbol`` if none was recorded yet.

        ``OptionFilter`` is called exactly once because it subscribes to the
        contracts it returns; calling it twice would repeat that work.
        """
        # NOTE(review): a closed spread leaves ``Contracts[symbol] == []`` and
        # therefore keeps the key, so a symbol is never searched again after its
        # first spread. Kept as in the original; confirm this is intended.
        if symbol in self.Contracts:
            return
        contracts = self.OptionFilter(data, symbol)
        if contracts:
            self.Contracts[symbol] = contracts
            self.Log("Found contract: {} / {}".format(contracts[0].ID, contracts[1].ID))

    def ClosePosition(self):
        """Liquidate both legs of every spread within ``DaysBeforeExp`` days of expiry."""
        cutoff = timedelta(self.DaysBeforeExp)
        for symbol, legs in self.Contracts.items():
            if not legs:
                continue
            # Both legs share one expiry, so checking the short leg is enough.
            if legs[0].ID.Date - self.Time <= cutoff:
                self.Log("Closed {}: too close to expiration".format(legs))
                self.Liquidate(legs[0])
                self.Liquidate(legs[1])
                # Re-assigning an existing key does not resize the dict, so it
                # is safe while iterating.
                self.Contracts[symbol] = []

    def OnData(self, data):
        """Search option contracts for every pooled stock flagged as a buy."""
        for candidate in self.CandidateStockPool.values():
            if candidate.buy:
                self.FindContract(data, candidate.symbol)

    def OptionFilter(self, data, symbol):
        """Pick the short and long put of a bull put spread for ``symbol``.

        Puts with a strike at or below ``price * (1 + SpreadRatio)`` are ranked
        by how close their days-to-expiry is to ``DTE``. For the best-ranked
        expiry the highest such strike is sold and the first strike at or below
        ``min(price, short_strike * (1 - SpreadRatio))`` is bought.

        Returns ``[short_contract, long_contract]`` (both already subscribed at
        minute resolution) or ``[]`` when no price or no suitable pair exists.
        """
        try:
            underlying_price = data[symbol].Close
        except Exception:
            # No bar for this symbol in the current slice.
            return []

        chain = self.OptionChainProvider.GetOptionContractList(symbol, data.Time)
        max_strike = underlying_price * (1 + self.SpreadRatio)
        puts = sorted(
            (c for c in chain
             if c.ID.OptionRight == OptionRight.Put and c.ID.StrikePrice <= max_strike),
            key=lambda c: abs((c.ID.Date - self.Time).days - self.DTE),
        )

        for attempt, candidate in enumerate(puts, start=1):
            expiry = candidate.ID.Date
            same_expiry = sorted(
                (c for c in puts if c.ID.Date == expiry),
                key=lambda c: c.ID.StrikePrice,
                reverse=True,
            )
            if len(same_expiry) >= 2:
                short_contract = same_expiry[0]
                long_strike = min(
                    underlying_price,
                    short_contract.ID.StrikePrice * (1 - self.SpreadRatio),
                )
                # ``same_expiry`` is sorted by descending strike, so the first
                # match is the highest strike not above ``long_strike``. The
                # default of None replaces an IndexError when nothing matches.
                long_contract = next(
                    (c for c in same_expiry if c.ID.StrikePrice <= long_strike),
                    None,
                )
                if long_contract is not None:
                    self.Log("Current contract is {} / {}".format(short_contract, long_contract))
                    self.AddOptionContract(short_contract, Resolution.Minute)
                    self.AddOptionContract(long_contract, Resolution.Minute)
                    return [short_contract, long_contract]
            # Give up once the three best-ranked contracts produced no pair.
            if attempt >= 3:
                return []
        return []

    def BuyBullPutSpread(self):
        """Open one new spread, or liquidate everything when VIX is too high."""
        vix_price = self.Securities[self.vix].Price
        if vix_price > self.VixThreshold:
            self.Log("Current VIX is {}, liquidate all positions".format(vix_price))
            self.Liquidate()
            for symbol in self.Contracts:
                self.Contracts[symbol] = []
            return

        for symbol, legs in self.Contracts.items():
            self.Log(str(legs))
            if not legs:
                continue
            sell, buy = legs
            # Only enter when neither leg is already held.
            if not self.Portfolio[sell].Invested and not self.Portfolio[buy].Invested:
                self.Buy(buy, 1)
                self.Sell(sell, 1)
                self.Log('Put Credit Spread {} / {} bought'.format(sell, buy))
                # NOTE(review): stops after the first new spread, i.e. at most
                # one entry per scheduled call. The indentation of this
                # ``return`` was lost in the source; confirm the placement.
                return

    def OnOrderEvent(self, orderEvent):
        """Log every order event."""
        self.Log(str(orderEvent))

    def Plotting(self):
        """Plot the number of currently recorded spreads and the VIX level."""
        # Closed spreads stay in ``Contracts`` as empty lists and must not be
        # counted as open positions.
        open_count = sum(1 for legs in self.Contracts.values() if legs)
        self.Plot('# of Open Positions', 'Positions', open_count)
        self.Plot('VIX', 'VIX', self.Securities[self.vix].Price)

    def CoarseSelectionFunction(self, coarse):
        """Update pooled indicators daily and rebuild the pool once per month.

        Securities are added and removed manually through ``AddEquity`` /
        ``RemoveSecurity``; the method therefore always returns
        ``Universe.Unchanged``.
        """
        # Feed today's price to the indicators of every stock already pooled.
        if self.CandidateStockPool:
            for sec in coarse:
                tracked = self.CandidateStockPool.get(sec.Symbol)
                if tracked is not None:
                    tracked.Update(Input_Class(sec.EndTime, sec.AdjustedPrice))

        # Rebuild the pool only when the calendar month changes.
        if self.Time.month == self.Months:
            return Universe.Unchanged
        self.Months = self.Time.month

        # Pre-filter: the ``num_coarse_0`` most liquid stocks above ``minPrice``.
        liquid = sorted(
            (x for x in coarse if x.DollarVolume > 10000000 and x.Price > self.minPrice),
            key=lambda x: x.DollarVolume,
            reverse=True,
        )[:self.num_coarse_0]

        # Warm up a fresh indicator set per stock from 60 daily bars.
        indicators_by_symbol = {}
        for sec in liquid:
            indicators = SymbolInd(sec.Symbol)
            try:
                history = self.History(sec.Symbol, 60, Resolution.Daily).loc[sec.Symbol]
            except Exception:
                # No usable history for this symbol: leave it out.
                continue
            for row in history.itertuples():
                indicators.Update(row)
            indicators_by_symbol[sec.Symbol] = indicators

        # Keep up-trending stocks with ready indicators, RSI <= 70 and a
        # positive MACD signal, ranked by ``scale`` (see SymbolInd). The
        # readiness checks come first so ``rsi`` / ``macd`` are never None when
        # compared.
        eligible = [
            ind for ind in indicators_by_symbol.values()
            if ind.is_uptrend and ind.RSI.IsReady and ind.MACD.IsReady
            and ind.rsi <= 70 and ind.macd > 0
        ]
        final_stocks = sorted(
            eligible, key=lambda ind: ind.scale, reverse=True
        )[:self.num_coarse_1]
        final_symbols = [ind.symbol for ind in final_stocks]

        # Drop stocks that did not make the new selection.
        for sym in list(self.CandidateStockPool):
            if sym not in final_symbols:
                self.CandidateStockPool.pop(sym)
                self.RemoveSecurity(sym)

        # Subscribe to the newly selected stocks.
        for ind in final_stocks:
            if ind.symbol not in self.CandidateStockPool:
                equity = self.AddEquity(ind.symbol, Resolution.Minute)
                equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
                self.CandidateStockPool[equity.Symbol] = ind

        return Universe.Unchanged
class Input_Class(object):
    """Minimal bar-like record exposing ``EndTime`` and ``Close``.

    ``SymbolInd.Update`` reads these two attributes, so an instance can stand
    in for a price bar when only a timestamp and a closing price are known.
    """

    def __init__(self, time, close):
        self.EndTime = time   # timestamp of the observation
        self.Close = close    # price of the observation

    def __repr__(self):
        return "Input_Class(EndTime={!r}, Close={!r})".format(self.EndTime, self.Close)
class SymbolInd:
    """Indicator bundle and derived trading flags for a single security.

    ``Update`` is fed one close at a time and maintains:

    * ``is_uptrend`` / ``difference`` - newest close minus the oldest close in
      the 30-sample rolling window;
    * ``scale`` - ``difference`` divided by the 14-sample standard deviation,
      used to rank up-trending stocks;
    * ``buy`` - True when the 10-sample moving average is above the close;
    * ``macd`` / ``rsi`` - latest MACD signal-line and RSI values.
    """

    def __init__(self, symbol):
        # Initialize all indicators and parameters
        self.symbol = symbol
        self.MA_10 = SimpleMovingAverage(10)
        self.MA_20 = SimpleMovingAverage(20)
        self.Volatility = StandardDeviation(14)
        self.MACD = MovingAverageConvergenceDivergence(symbol, 12, 26, 9)
        self.RSI = RelativeStrengthIndex(symbol, 14)
        # The period of price improvement
        self.window = RollingWindow[float](30)
        self.queue = deque(maxlen=60)
        self.buy = False
        self.is_uptrend = False
        self.scale = 0            # Quantify the uptrend
        self.difference = None    # newest minus oldest close in ``window``
        self.vol = None           # latest standard deviation, None until ready
        self.macd = None
        self.rsi = None
        self.open_date = None     # Record opening date
        self.open_price = None    # Record open price
        self.IsReady = False

    def Update(self, input):
        """Feed one observation to every indicator and refresh the flags.

        ``input`` is either an object with ``Close`` / ``EndTime`` attributes
        or a row exposing ``close`` / ``Index`` (the form produced by
        ``DataFrame.itertuples()`` in the caller).
        """
        try:
            close = input.Close
            time = input.EndTime
        except AttributeError:
            close = input.close
            time = input.Index

        self.queue.appendleft(close)
        self.IsReady = len(self.queue) == self.queue.maxlen

        # Update all indicators using current data
        self.window.Add(close)
        if self.window.IsReady:
            # Price improvement over the whole rolling window.
            self.difference = self.window[0] - self.window[self.window.Count - 1]
            self.is_uptrend = self.difference >= 0

        if self.Volatility.Update(time, close):
            self.vol = self.Volatility.Current.Value
            if self.is_uptrend:
                # A zero deviation cannot scale the move; use 0 rather than
                # raising ZeroDivisionError.
                self.scale = self.difference / self.vol if self.vol else 0.0

        # Update both averages unconditionally: chaining the calls with ``and``
        # would stop feeding MA_20 while MA_10 is not ready.
        ma_10_ready = self.MA_10.Update(time, close)
        ma_20_ready = self.MA_20.Update(time, close)
        if ma_10_ready and ma_20_ready:
            # Check if it is a pull back as the entry point
            self.buy = self.MA_10.Current.Value > close

        if self.MACD.Update(time, close):
            self.macd = self.MACD.Signal.Current.Value
        if self.RSI.Update(time, close):
            self.rsi = self.RSI.Current.Value