# Overall Statistics
#
# Total Trades: 119
# Average Win: 0.28%
# Average Loss: -9.71%
# Compounding Annual Return: 15.465%
# Drawdown: 43.600%
# Expectancy: 0.011
# Net Profit: 34.784%
# Sharpe Ratio: 0.642
# Loss Rate: 2%
# Win Rate: 98%
# Profit-Loss Ratio: 0.03
# Alpha: 0.328
# Beta: -7.512
# Annual Standard Deviation: 0.282
# Annual Variance: 0.079
# Information Ratio: 0.573
# Tracking Error: 0.282
# Treynor Ratio: -0.024
# Total Fees: $165.34
# limitations under the License.
import numpy as np
from datetime import timedelta
class CoveredCallOptionsAlgorithm(QCAlgorithm):
    """Sell near-the-money puts on a fixed equity basket and log IV rank.

    Despite the class name, the contracts selected and sold here are puts
    (see ``AddContract``); the underlying shares are never bought.
    An implied-volatility rank is maintained per contract from a rolling
    window of IV samples and written to the log once a day.
    """

    # Number of implied-volatility samples kept per contract.
    IV_WINDOW_SIZE = 100

    def Initialize(self):
        """Configure the backtest period, cash, universe and daily logging."""
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2019, 1, 28)
        self.SetCash(1000000)

        self.tickers = ["AAPL", "IBM", "CAT", "BA", "INTC", "NVDA"]
        for ticker in self.tickers:
            equity = self.AddEquity(ticker, Resolution.Minute)
            equity.SetDataNormalizationMode(DataNormalizationMode.Raw)

        # Currently selected put contract; an empty string means "none".
        self.put = str()

        # Per-contract rolling IV windows and the latest IV rank, by symbol.
        self.rollingWindows = {}
        self.ivRank = {}

        self.Schedule.On(
            self.DateRules.EveryDay(),
            self.TimeRules.At(14, 45),
            self.GetLogs)

    def GetLogs(self):
        """Log the most recent IV rank of every tracked symbol."""
        today = str(self.Time.date())
        for symbol, rank in self.ivRank.items():
            self.Log("Date: {} -- Symbol: {} -- IVR: {}".format(
                today, str(symbol), str(rank)))

    def OnData(self, slice):
        """Refresh IV ranks from the option chains, then sell puts.

        Args:
            slice: Data slice for the current time step.
        """
        # Update the IV rank (and rolling window) of every contract that
        # appears in this slice's option chains.
        for chain in slice.OptionChains.Values:
            spot = chain.Underlying.Price
            by_strike = sorted(chain, key=lambda c: abs(spot - c.Strike))
            by_expiry = sorted(by_strike, key=lambda c: c.Expiry, reverse=True)
            for contract in by_expiry:
                self.ivRank[contract.Symbol] = self.CalculateIVRank(contract)

        # NOTE(review): a single ``self.put`` slot is shared by all tickers.
        # When the selected contract is already held it is not reset, so the
        # following tickers skip contract selection in this pass. Confirm
        # whether that is intended before changing it.
        for underlying in self.tickers:
            # AddContract / InitialFilter read the ticker from self.underlying.
            self.underlying = underlying

            # The underlying stock itself is not bought (no covered leg).
            if not self.Securities.ContainsKey(self.put):
                # Pick a put contract and subscribe to its data.
                self.put = self.AddContract(slice)
                self.Log(str(self.put))

            if self.Securities.ContainsKey(self.put) and not self.Portfolio[self.put].Invested:
                self.Sell(self.put, 10)  # short the put option
                self.put = str()

    def AddContract(self, slice):
        """Subscribe to the latest-expiry, closest-to-the-money put.

        Candidates come from ``InitialFilter(-3, 3, 0, 30)`` for the ticker
        stored in ``self.underlying``.

        Args:
            slice: Unused; kept for interface compatibility.

        Returns:
            The chosen contract symbol, or an empty string when no put
            contract passes the filter.
        """
        candidates = self.InitialFilter(-3, 3, 0, 30)
        puts = [s for s in candidates if s.ID.OptionRight == OptionRight.Put]
        if not puts:
            return str()

        spot = self.Securities[self.underlying].Price
        # Stable double sort: latest expiry first, and within one expiry the
        # strike nearest to the underlying price first.
        nearest_first = sorted(puts, key=lambda s: abs(spot - s.ID.StrikePrice))
        ranked = sorted(nearest_first, key=lambda s: s.ID.Date, reverse=True)

        chosen = ranked[0]
        self.AddOptionContract(chosen, Resolution.Minute)
        return chosen

    def CalculateIVRank(self, contract):
        """Return the IV rank of ``contract`` and record its current IV.

        The current implied volatility is always appended to the contract's
        rolling window. The rank is computed against the samples that were
        in the window before this call:

        * current IV is 0            -> 0.0
        * no earlier sample          -> 1.0
        * exactly one earlier sample -> current / earlier
          (1.0 when the earlier sample is 0, to avoid dividing by zero)
        * otherwise                  -> (current - low) / (high - low), where
          ``low`` includes the current IV and ``high`` does not; when
          ``high == low`` the result is 1.0 for a zero baseline, else
          current / high.

        Args:
            contract: Option contract exposing ``Symbol`` and
                ``ImpliedVolatility``.

        Returns:
            The IV rank as a number. Because ``high`` excludes the current
            sample, the value can exceed 1.0.
        """
        window = self.rollingWindows.get(contract.Symbol)
        if window is None:
            # Contract seen in a chain before a window was registered for it.
            window = RollingWindow[Decimal](self.IV_WINDOW_SIZE)
            self.rollingWindows[contract.Symbol] = window

        current_iv = contract.ImpliedVolatility
        # Snapshot the earlier samples, then record the new one; every path
        # below stores the current IV exactly once.
        previous = list(window)
        window.Add(current_iv)

        if current_iv == 0:
            return 0.0
        if not previous:
            return 1.0
        if len(previous) == 1:
            first_iv = previous[0]
            # A zero first sample is possible because zero IVs are stored too.
            if first_iv == 0:
                return 1.0
            return current_iv / first_iv

        low = min(float(current_iv), min(previous))
        high = max(previous)
        if high == low:
            # Flat history: avoid dividing by (high - low) == 0.
            if low == 0:
                return 1.0
            return current_iv / high
        return (current_iv - low) / (high - low)

    def InitialFilter(self, min_strike_rank, max_strike_rank, min_expiry, max_expiry):
        """Pre-filter the option contract list by expiry and strike band.

        Works on the ticker stored in ``self.underlying``.

        Args:
            min_strike_rank: Offset (usually negative) from the ATM strike's
                position in the sorted strike list; inclusive lower bound.
            max_strike_rank: Offset from the ATM strike's position;
                exclusive upper bound.
            min_expiry: Exclusive lower bound on days to expiry.
            max_expiry: Exclusive upper bound on days to expiry.

        Returns:
            A list of contract symbols inside both ranges; empty when the
            provider returns nothing or no contract is in the expiry window.
        """
        today = self.Time.date()
        contracts = self.OptionChainProvider.GetOptionContractList(self.underlying, today)
        if len(contracts) == 0:
            return []

        # Keep only contracts whose days-to-expiry lie strictly in range.
        in_window = [c for c in contracts
                     if min_expiry < (c.ID.Date.date() - today).days < max_expiry]
        if not in_window:
            return []

        # Strike closest to the current underlying price.
        spot = self.Securities[self.underlying].Price
        atm_strike = min(in_window,
                         key=lambda c: abs(c.ID.StrikePrice - spot)).ID.StrikePrice

        strike_list = sorted(set(c.ID.StrikePrice for c in in_window))
        atm_rank = strike_list.index(atm_strike)

        # Clamp at 0: a negative slice bound would wrap around to the end of
        # the list instead of stopping at the lowest strike.
        lower = max(0, atm_rank + min_strike_rank)
        upper = max(0, atm_rank + max_strike_rank)
        strikes = set(strike_list[lower:upper])

        return [c for c in in_window if c.ID.StrikePrice in strikes]

    def OnOrderEvent(self, orderEvent):
        """Log every order event."""
        self.Log(str(orderEvent))

    def OnSecuritiesChanged(self, changes):
        """Create a rolling IV window and a default IV rank for new securities.

        Args:
            changes: Security changes; only ``AddedSecurities`` is read.
        """
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            if symbol not in self.rollingWindows:
                self.rollingWindows[symbol] = RollingWindow[Decimal](self.IV_WINDOW_SIZE)
                self.ivRank[symbol] = 1.0