Overall Statistics
Total Trades
119
Average Win
0.28%
Average Loss
-9.71%
Compounding Annual Return
15.465%
Drawdown
43.600%
Expectancy
0.011
Net Profit
34.784%
Sharpe Ratio
0.642
Loss Rate
2%
Win Rate
98%
Profit-Loss Ratio
0.03
Alpha
0.328
Beta
-7.512
Annual Standard Deviation
0.282
Annual Variance
0.079
Information Ratio
0.573
Tracking Error
0.282
Treynor Ratio
-0.024
Total Fees
$165.34
# limitations under the License.
import numpy as np
from datetime import timedelta

class CoveredCallOptionsAlgorithm(QCAlgorithm):

    def Initialize(self):
        """Set up the backtest: period, cash, equities, state containers and daily log.

        Every ticker in ``self.tickers`` is subscribed at minute resolution with
        raw (unadjusted) price normalization. ``GetLogs`` is scheduled to run
        every day at 14:45.
        """
        ## Backtest period and starting capital
        self.SetStartDate(2017, 1, 1)
        self.SetEndDate(2019, 1, 28)
        self.SetCash(1000000)

        ## Underlying equities, subscribed with raw prices
        self.tickers = ["AAPL","IBM","CAT","BA","INTC","NVDA"]
        for symbol_name in self.tickers:
            subscribed = self.AddEquity(symbol_name, Resolution.Minute)
            subscribed.SetDataNormalizationMode(DataNormalizationMode.Raw)

        ## Put contract currently being tracked; an empty string means "none selected"
        self.put = ""
        ## Per-symbol rolling windows of implied volatility and the latest IV Rank
        self.rollingWindows = dict()
        self.ivRank = dict()

        ## Daily IV Rank logging at 14:45
        every_day = self.DateRules.EveryDay()
        at_14_45 = self.TimeRules.At(14, 45)
        self.Schedule.On(every_day, at_14_45, self.GetLogs)

    def GetLogs(self):
        """Log the stored IV Rank of every symbol in ``self.ivRank``, one line each."""
        template = "Date: {} -- Symbol: {} -- IVR: {}"
        for symbol, rank in self.ivRank.items():
            today_text = str(self.Time.date())
            self.Log(template.format(today_text, str(symbol), str(rank)))
            
    def OnData(self,slice):
        """Refresh IV Ranks from the slice's option chains, then manage the short put.

        Args:
            slice: the data slice for the current time step. Its
                ``OptionChains`` are used to update ``self.ivRank``; the slice
                is also forwarded to ``AddContract``.

        For each ticker in ``self.tickers`` the method selects a put through
        ``AddContract`` when the tracked one (``self.put``) is not in
        ``self.Securities``, and sells 10 units of it when it is not yet held.
        """
        ## Update the IV Rank (and rolling window) of every contract present in
        ## this slice. Each contract has its own rolling window, so the order
        ## in which contracts are visited does not matter and no sort is needed.
        for chain in slice.OptionChains.Values:
            for contract in chain:
                self.ivRank[contract.Symbol] = self.CalculateIVRank(contract)

        for underlying in self.tickers:
            ## AddContract / InitialFilter read the active ticker from self.underlying
            self.underlying = underlying

            ## NOTE(review): self.put is shared by all tickers. While it refers to a
            ## contract that is in Securities and already invested, neither branch
            ## below runs for any ticker -- confirm this is the intended behaviour.
            if not self.Securities.ContainsKey(self.put):
                ## Select the put contract for this ticker (subscribes its data)
                self.put = self.AddContract(slice)
                self.Log(str(self.put))

            if self.Securities.ContainsKey(self.put) and not self.Portfolio[self.put].Invested:
                self.Sell(self.put, 10) # short the put option
                ## Clear the tracker so the next ticker selects its own contract
                self.put = str()

          

    def AddContract(self,slice):
        """Pick and subscribe the put for ``self.underlying``; return its symbol.

        Candidates come from ``InitialFilter(-3, 3, 0, 30)``. Among the puts,
        the one with the latest expiry is chosen, ties broken by the strike
        closest to the underlying's current price. Returns an empty string
        when no candidate exists. ``slice`` is accepted but not used.
        """
        candidates = self.InitialFilter(-3, 3, 0, 30)
        if len(candidates) == 0:
            return str()

        puts = [c for c in candidates if c.ID.OptionRight == OptionRight.Put]
        if len(puts) == 0:
            return str()

        # First order by distance to the money, then stably re-order by expiry
        # (latest first) so the head is the nearest-ATM contract of the latest expiry.
        puts.sort(key = lambda c: abs(self.Securities[self.underlying].Price- c.ID.StrikePrice))
        puts.sort(key = lambda c: c.ID.Date, reverse=True)

        chosen = puts[0]
        self.AddOptionContract(chosen, Resolution.Minute)
        return chosen
                
    def CalculateIVRank(self, contract):
        """Compute the IV Rank of ``contract`` and record its IV in the rolling window.

        Args:
            contract: an option contract exposing ``Symbol`` and
                ``ImpliedVolatility``. Its symbol must already have an entry
                in ``self.rollingWindows``.

        Returns:
            * ``0.0`` when the current IV is 0.
            * ``1.0`` for the first sample, or when all stored history is 0.
            * ``currentIV / previousIV`` for the second sample.
            * ``currentIV / high`` when the stored history is flat and non-zero.
            * ``(currentIV - low) / (high - low)`` otherwise, where ``low`` is
              the minimum of the current IV and the stored history and
              ``high`` is the maximum of the stored history only.

        The current IV is appended to the rolling window exactly once, after
        the rank has been computed from the previous samples.

        Raises:
            KeyError: if no rolling window exists for ``contract.Symbol``.
        """
        ## Retrieve Rolling Window
        rw = self.rollingWindows[contract.Symbol]
        currentIV = contract.ImpliedVolatility
        samples = rw.Count

        if currentIV == 0:
            ## IV Rank is 0% if currentIV is 0
            rank = 0.0
        elif samples < 1:
            ## IV Rank is 100% if it is the first sample
            rank = 1.0
        elif samples == 1:
            ## Second sample: IV Rank is current IV / 1st IV. The only stored
            ## sample can be 0 (a zero IV is still recorded above), so guard the
            ## division and treat an all-zero history as 100%, matching the
            ## zero-history rule used for longer windows below.
            previousIV = rw[0]
            rank = currentIV / previousIV if previousIV != 0 else 1.0
        else:
            history = list(rw)
            low = min(float(currentIV), min(history))
            ## NOTE(review): high ignores the current IV, so the rank can exceed
            ## 1.0 when the current IV is above every stored sample -- confirm intended.
            high = max(history)

            if high == low:
                ## Flat history: avoid dividing by (high - low) == 0
                rank = 1.0 if low == 0 else currentIV / high
            else:
                rank = (currentIV - low) / (high - low)

        ## Record the new sample only after ranking it against the older ones
        rw.Add(currentIV)
        return rank
            
 
    def InitialFilter(self, min_strike_rank, max_strike_rank, min_expiry, max_expiry):
        """Filter the listed contracts of ``self.underlying`` by expiry and strike range.

        Args:
            min_strike_rank: offset, relative to the ATM strike's position in
                the sorted list of distinct strikes, of the lowest strike kept
                (inclusive). Usually negative.
            max_strike_rank: offset of the upper end of the strike window
                (exclusive).
            min_expiry: contracts must expire strictly more than this many
                days from the current algorithm date.
            max_expiry: contracts must expire strictly fewer than this many
                days from the current algorithm date.

        Returns:
            A list of contract symbols inside both windows; an empty list when
            the provider returns nothing or no contract passes the expiry filter.
        """
        today = self.Time.date()
        contracts = self.OptionChainProvider.GetOptionContractList(self.underlying, today)
        if len(contracts) == 0 : return []

        # filter the contracts based on the expiry range
        contract_list = [i for i in contracts
                         if min_expiry < (i.ID.Date.date() - today).days < max_expiry]
        # Nothing expires inside the window: there is no ATM strike to look up
        if not contract_list:
            return []

        # find the strike price of the ATM option (read the underlying price once)
        underlying_price = self.Securities[self.underlying].Price
        atm_strike = min(contract_list,
                         key = lambda x: abs(x.ID.StrikePrice - underlying_price)).ID.StrikePrice
        strike_list = sorted({i.ID.StrikePrice for i in contract_list})
        # find the index of ATM strike in the sorted strike list
        atm_strike_rank = strike_list.index(atm_strike)

        # Clamp the window at the bottom of the list: a negative slice bound would
        # otherwise be interpreted as counting from the end and select the wrong strikes.
        # An upper bound past the end of the list is already truncated by slicing.
        lower = max(0, atm_strike_rank + min_strike_rank)
        upper = max(0, atm_strike_rank + max_strike_rank)
        strikes = set(strike_list[lower:upper])

        filtered_contracts = [i for i in contract_list if i.ID.StrikePrice in strikes]

        return filtered_contracts
    
    def OnOrderEvent(self, orderEvent):
        """Write the string form of every received order event to the log."""
        event_text = str(orderEvent)
        self.Log(event_text)
        
    def OnSecuritiesChanged(self, changes):
        """Create IV tracking state for each newly added security.

        Every symbol in ``changes.AddedSecurities`` that has no rolling window
        yet receives an empty 100-sample window and an initial IV Rank of 1.0.
        No filtering by security type is applied, and removed securities are
        not handled here.
        """
        for security in changes.AddedSecurities:
            symbol = security.Symbol
            ## Keep the existing window if this symbol is already tracked
            if symbol in self.rollingWindows:
                continue
            self.rollingWindows[symbol] = RollingWindow[Decimal](100)
            self.ivRank[symbol] = 1.0