Overall Statistics
Total Trades
632
Average Win
0.63%
Average Loss
-0.66%
Compounding Annual Return
-2.067%
Drawdown
35.500%
Expectancy
-0.032
Net Profit
-11.030%
Sharpe Ratio
-0.097
Probabilistic Sharpe Ratio
0.384%
Loss Rate
51%
Win Rate
49%
Profit-Loss Ratio
0.96
Alpha
0.001
Beta
-0.075
Annual Standard Deviation
0.124
Annual Variance
0.015
Information Ratio
-0.831
Tracking Error
0.228
Treynor Ratio
0.16
Total Fees
$503.00
Estimated Strategy Capacity
$27000000.00
Lowest Capacity Asset
AAPL 31PB4NYHEJJVQ|AAPL R735QTJ8XC9X
import numpy as np
from scipy.stats import norm
from statsmodels.tsa.arima.model import ARIMA

class SignalProcessor:
    ''' Models for volatility and price prediction '''
    
    def __init__(self, algo):
        ''' initiate a new instance of Model class object 
        Args: 
            algo: the algorithm instance'''
        self.algo = algo
        
        # initialize variable holding ARIMAX model
        self.arimax = None
        
    def processAltHistory(self, altHistoryDf):
        ''' process the raw Brain Language Metric of Company Filings historical data dataframe
        Args:
            altHistoryDf: (n x m df) the raw historical data dataframe
        Return:
            managementSentiment: (n x 1 series) series of sentiment level of management analysis off financial and operation 
            reportSentiment: (n x 1 series) series of sentiment level of financial report
            riskSentiment: (n x 1 series) series of sentiment level of risk factor statement'''
        managementSentiment = altHistoryDf.managementdiscussionanalyasisoffinancialconditionandresultsofoperations.unstack("symbol")
        reportSentiment = altHistoryDf.reportsentiment.unstack("symbol")
        riskSentiment = altHistoryDf.riskfactorsstatementsentiment.unstack("symbol")
        
        # we extract the sentiment levels from them
        def getSentiment(value):
            # we are not able to extract the ones without value (nan, which are float object)
            if not isinstance(value, float):
                sentiment = value.Sentiment
                if sentiment >= 0:
                    return sentiment
                else:
                    return abs(sentiment) * 1.5
            # fill nan with 0
            else:
                return 0
        
        # apply the function of extraction to the dataframes
        managementSentiment = managementSentiment.applymap(getSentiment)
        reportSentiment = reportSentiment.applymap(getSentiment)
        riskSentiment = riskSentiment.applymap(getSentiment)
        
        # align index
        managementSentiment.index = managementSentiment.index.date + timedelta(days=1)
        reportSentiment.index = reportSentiment.index.date + timedelta(days=1)
        riskSentiment.index = riskSentiment.index.date + timedelta(days=1)
        
        return managementSentiment, reportSentiment, riskSentiment
    
    def ARIMAXFitting(self, volatilities, altHistoryDf):
        ''' fitting a ARIMAX model for volatility modelling 
        Args:
            volatilities: (n x 1 array) annualized volatility data series
            altHistoryDf: (n x m df) the raw historical data dataframe'''
        # data processing
        managementSentiment, reportSentiment, riskSentiment = self.processAltHistory(altHistoryDf)
        
        # convert volatility series into 2-d
        volatilities = pd.DataFrame(volatilities)
        # align our data
        df = pd.concat([volatilities, managementSentiment, reportSentiment, riskSentiment], axis=1).fillna(0)
        df.columns = ["volatility", "managementSentiment", "reportSentiment", "riskSentiment"]
        
        # ARIMAX, p & d & q from research ACF/PACF
        arima = ARIMA(df["volatility"], df[["managementSentiment", "reportSentiment", "riskSentiment"]],
                      order=(1, 1, 1))
        try:
            self.arimax = arima.fit()
            return True
        except:
            return False
        
    def VolatilityPrediction(self, n):
        ''' volatility prediction from ARIMAX model
        Args:
            n: (int) number of steps of prediction
        Return:
            (n x 3 array) series of prediction of the next step of volatility'''
        # set up zeros matrix (day without sentiment data consider 0, suppose no entry within 1 month)
        exog = np.zeros((n, 3))
        
        return self.arimax.predict(n, exog=exog)
        
    def PricePrediction(self, S_0, mu, volatility, iters=20000):
        ''' underlying stcok price prediction from SDE
        Args:
            S_0: (float) initial stock price at time 0
            mu: (float) expectation of return
            volatility: (n x 1 array) the predicted volatility series
            iters: (int) number of iteration of Monte Carlo simulation
        Return:
            (n size list) list of predicted terminal price'''
        # each time step length
        t = 1/365
        # set variable as terminal price of each step, at time 0 = current price
        S_t = np.array([S_0]*iters)
        
        # we predict the number of steps the volatility series provided
        for i in range(len(volatility)):
            # MonteCarlo simulation
            S_t = S_t * np.exp(t * (mu - volatility.iloc[i]**2/2) + volatility.iloc[i] * np.sqrt(t) * norm.rvs(size=iters)).reshape(-1, 1)
        
        return S_t
class Functions:
    
    def __init__(self, algo):
        ''' a new instance of class Functions 
        Args:
            alog: algorithm instance'''
        self.algo = algo
    
    def GetOptionContracts(self, symbol, shortPutPrice, shortCallPrice, longPutPrice, longCallPrice):
        ''' get the contract for trading the Iron Condor
        Args: 
            symbol: the underlying symbol
            shortPutPrice: desire strike for put contract to be short selled
            shortCallPrice: desire strike for call contract to be short selled
            longPutPrice: desire strike for put contract to be long brought
            longCallPrice: desire strike for call contract to be long brought
        Return:
            (option contracts x4) the 4 contracts to trade as Iron Condor'''
        # get list of options
        contracts = self.algo.OptionChainProvider.GetOptionContractList(symbol, self.algo.Time)
        # return None if no option contract available
        if not contracts: return None, None, None, None
        # only within 30 days and within 20 strikes as wider ones are prone to have slippage
        contracts = self.InitialFilter(symbol, contracts, -20, 20, 0, 30)
        
        # get calls
        calls = [contract for contract in contracts if contract.ID.OptionRight == OptionRight.Call]
        if not calls: return None, None, None, None
        # get contract closest to short call price from Percentile% and a close expiration within a month
        shortCallContract = sorted(sorted(calls, key = lambda x: x.ID.Date, reverse=True),
                                             key = lambda x: abs(shortCallPrice - x.ID.StrikePrice))[0]
                                             
        # get contract closest to long call price from Percentile% and same expiration date
        sameExpiryCalls = [contract for contract in calls if contract.ID.Date == shortCallContract.ID.Date]
        longCallContract = sorted(sameExpiryCalls, key = lambda x: abs(longCallPrice - x.ID.StrikePrice))[0]
        
        # get puts with same expiration
        sameExpiryPuts = [contract for contract in contracts if contract.ID.OptionRight == OptionRight.Put \
                                                            and contract.ID.Date == shortCallContract.ID.Date]
        if not sameExpiryPuts: return None, None, None, None
        # get contract closest to short put price from Percentile% and same expiration date
        shortPutContract = sorted(sameExpiryPuts, key = lambda x: abs(shortPutPrice - x.ID.StrikePrice))[0]
                                             
        # get contract closest to long put price from Percentile% and same expiration date
        match = [contract for contract in sameExpiryPuts if contract.ID.StrikePrice == shortPutContract.ID.StrikePrice - \
                                                            longCallContract.ID.StrikePrice + shortCallContract.ID.StrikePrice]
        # discontinue if no contract match Iron Condor requirement
        if not match: return None, None, None, None
        longPutContract = match[0]
        
        # subscribe
        return self.algo.AddOptionContract(shortPutContract, Resolution.Minute), \
               self.algo.AddOptionContract(shortCallContract, Resolution.Minute), \
               self.algo.AddOptionContract(longPutContract, Resolution.Minute), \
               self.algo.AddOptionContract(longCallContract, Resolution.Minute)
               
    def InitialFilter(self, underlyingsymbol, symbol_list, min_strike_rank, max_strike_rank, min_expiry, max_expiry):
        ''' This method is an initial filter of option contracts
            based on the range of strike price and the expiration date 
            https://www.quantconnect.com/tutorials/applied-options/iron-condor'''
        if len(symbol_list) == 0 : return
        # fitler the contracts based on the expiry range
        contract_list = [i for i in symbol_list if min_expiry <= (i.ID.Date.date() - self.algo.Time.date()).days <= max_expiry]
        # find the strike price of ATM option
        atm_strike = sorted(contract_list,
                            key = lambda x: abs(x.ID.StrikePrice - self.algo.Securities[underlyingsymbol].Price))[0].ID.StrikePrice
        strike_list = sorted(set([i.ID.StrikePrice for i in contract_list]))
        # find the index of ATM strike in the sorted strike list
        atm_strike_rank = strike_list.index(atm_strike)
        try: 
            min_strike = strike_list[atm_strike_rank + min_strike_rank + 1]
            max_strike = strike_list[atm_strike_rank + max_strike_rank - 1]

        except:
            min_strike = strike_list[0]
            max_strike = strike_list[-1]
            
        # skip weekly options
        filtered_contracts = [i for i in contract_list if i.ID.StrikePrice >= min_strike \
                                                        and i.ID.StrikePrice <= max_strike \
                                                        and not OptionSymbol.IsWeekly(i)]

        return filtered_contracts
        
    def GetPositionSize(self, shortPut, shortCall, longPut, longCall):
        ''' get the postion size as discrete individual bet by Kelly Criterion
        Args: 
            shortPut: put contract plan to be short selled
            shortCall: call contract plan to be short selled
            longPut: put contract plan to be long brought
            longCall: call contract plan to be long brought
        Return:
            (int) the absolute number of contracts should be buy/short for each contract'''
        # get the spread first
        spread = longCall.StrikePrice - shortCall.StrikePrice
        
        # get all contract's current cost
        shortCallValue = shortCall.BidPrice
        shortPutValue = shortPut.BidPrice
        longCallValue = longCall.AskPrice
        longPutValue = longPut.AskPrice
        
        # total premium, minus commission 0.5*4
        premium = shortCallValue + shortPutValue - longCallValue - longPutValue - 2
        # return None to discontinue if no profitability
        if premium <= 0: return None
        
        # bet size with option contract multiplier 100
        bet = shortCallValue + shortPutValue + longCallValue + longPutValue
        
        # maximum profit and loss %
        maxProfitPercent = premium / bet
        maxLossPercent = (spread - premium) / bet
        
        # breakeven point, we need either one only as it's symmetric
        # if we factor spread as 1, this is the width of upper triangle (profit)
        breakEvenRatio = premium / spread
        
        # triangular area, up side is +, down side is -, 
        # we don't need to divide by 2 as there's 2 tails like this
        triangularArea = maxProfitPercent*breakEvenRatio - maxLossPercent*(1 - breakEvenRatio)
        # multiply by the ratio of all possibilities (difference in Percentile, already represent 2 tails)
        triangularArea *= self.algo.longPercentileLevel - self.algo.shortPercentileLevel
        
        # bet ratio, follows the equation in research notebook
        f = (maxProfitPercent * self.algo.shortPercentileLevel - maxLossPercent * (self.algo.longPercentileLevel - 1)) \
            /(maxProfitPercent * maxLossPercent * (self.algo.shortPercentileLevel - self.algo.longPercentileLevel + 1)) \
            - triangularArea
        # half the size such that half the risk but only 3/4 the expected return (Ed Thorp)
        f *= 0.5
        
        # since the ratio f is directly taking bet as base, we can use that as number of contract
        return max(min(int(f/100), self.algo.maxSize), 1)
from QuantConnect.DataSource import BrainCompanyFilingLanguageMetricsAll
import pandas as pd
from scipy.stats import norm
from Functions import Functions
from SignalProcessor import SignalProcessor

class CrawlingTanJellyfish(QCAlgorithm):

    def Initialize(self):
        # Parameter: determining the strike zone of iron condor
        # shortPercentileLevel must be smaller than longPercentileLevel
        self.shortPercentileLevel = 0.85
        self.longPercentileLevel = 0.95
        # parameter: maximum order size
        self.maxSize = 5
        
        self.SetStartDate(2016, 1, 1)
        self.SetCash(100000) 
        
        self.SetSecurityInitializer(self.CustomSecurityInitializer)
        
        tickers = ["FB", "AAPL", "AMZN", "NFLX", "GOOG"]
        self.symbols = [self.AddEquity(symbol, Resolution.Minute).Symbol for symbol in tickers]
        
         # dict contain corresponding alt data symbol for each symbol
        self.altSymbols = {}
        # dict contain corresponding ARIMAX model for each symbkl
        self.model = {}
        # dict contain corresponding entry condition check indicator
        self.checkOptions = {symbol: False for symbol in self.symbols}
        # dict contain corresponding historical trade bar data, we use 5+1 year (1 year offset annual volatility)
        self.history = {symbol: RollingWindow[TradeBar](252*6) for symbol in self.symbols}
        # dict contain order tickets
        self.tickets = {symbol: [] for symbol in self.symbols}
        
        # warm up rolling windows
        data = self.History(self.symbols, 252*6, Resolution.Daily)
        for symbol in self.symbols:
            for time, bar in data.loc[symbol].iterrows():
                tradeBar = TradeBar(time, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                self.history[symbol].Add(tradeBar)
            # set up consolidator for future auto-update
            self.Consolidate(symbol, Resolution.Daily, self.DailyBarHandler)
        
        # schedule daily check for entry position if signal active
        self.Schedule.On(self.DateRules.EveryDay("AAPL"), self.TimeRules.At(10, 0), self.Entry)
        
        # class Functions for utilities
        self.fcn = Functions(self)
        
    def CustomSecurityInitializer(self, security):
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        security.SetMarketPrice(self.GetLastKnownPrice(security))
    
    def DailyBarHandler(self, bar):
        self.history[bar.Symbol].Add(bar)
        
    def OnData(self, data):
        # If any symbol is going to have a split, we close our position
        for symbol in self.symbols:
            if data.Splits.ContainsKey(symbol):
                split = data.Splits[symbol]
                # will have a split on next day
                if split.Type == 0:
                    [self.SetHoldings(order, 0) for order in self.tickets[symbol]]
                    self.SetHoldings(symbol, 0)
                    self.tickets[symbol] = []
    
    def OnSecuritiesChanged(self, changes):
        for security in changes.AddedSecurities:
            if security.Type == SecurityType.Equity:
                # store the dataset symbol
                self.altSymbols[security.Symbol] = self.AddData(BrainCompanyFilingLanguageMetricsAll, security.Symbol).Symbol
                
                # create new ARIMAX model for symbol
                self.CreateModel(security.Symbol)
                
    def OnOrderEvent(self, orderEvent):
        # liquidate if we have options being exercised at expiration
        if orderEvent.IsAssignment:
            order = self.Transactions.GetOrderById(orderEvent.OrderId)
            if order.Type == OrderType.OptionExercise:
                for symbol, orders in self.tickets.items():
                    if orderEvent.Symbol in orders:
                        [self.SetHoldings(order, 0) for order in orders]
                        self.SetHoldings(symbol, 0)
                        orders = []
                
    def CreateModel(self, symbol):
        ''' creation of ARIMAX model
        Args:
            the underlying symbol'''
        # initiate new model
        self.model[symbol] = SignalProcessor(self)
        
        # set up the model with 5 year data warm up
        altHistory = self.History(self.altSymbols[symbol], timedelta(days=365*5), Resolution.Daily)
        # get annualized volatility
        data = pd.DataFrame(self.history[symbol])[::-1]
        history = data.applymap(lambda bar: bar.Close)
        history.index = data.applymap(lambda bar: bar.EndTime.date()).values.flatten().tolist()
        volatility = history.pct_change().rolling(252).std().dropna()
        
        # fit ARIMAX model
        self.model[symbol].ARIMAXFitting(volatility, altHistory)
    
    def Entry(self):
        ''' check if entry conditions satisfied 
            if yes, enter short Iron Condor position'''
        for symbol, symbolData in self.altSymbols.items():
            altHistory = self.History(symbolData, timedelta(days=365*5), Resolution.Daily)
            # if no newly updated data, return
            if altHistory.empty or self.Time - altHistory.unstack(0).index[-1] > timedelta(days=1): continue
        
            # liquidate previous orders if we have updated data
            [self.SetHoldings(order, 0) for order in self.tickets[symbol]]
            self.SetHoldings(symbol, 0)
            self.tickets[symbol] = []
        
            # get annualized volatility
            data = pd.DataFrame(self.history[symbol])[::-1]
            history = data.applymap(lambda bar: bar.Close)
            history.index = data.applymap(lambda bar: bar.EndTime.date()).values.flatten().tolist()
            volatility = history.pct_change().rolling(252).std().dropna()
            
            # update the ARIMAX model
            success = self.model[symbol].ARIMAXFitting(volatility, altHistory)
            if not success: continue
            
            # get next 21 day prediction on volatility
            predictions = self.model[symbol].VolatilityPrediction(21)
            
            # we continue only if the predicted volatility will drop
            if volatility.values[-1] > predictions.iloc[-1]:
                # get terminal price predicion of n paths
                terminalPrices = self.model[symbol].PricePrediction(history.iloc[-1], np.mean(history.pct_change()[1:].values), predictions)
                
                # fit for normal distribution, since we're using N(0, sigma^2) for Monte Carlo, we can assume normal
                loc, scale = norm.fit(terminalPrices)
                # get preset level desired price range
                shortPutPrice = norm.ppf((1-self.shortPercentileLevel)/2, loc=loc, scale=scale)
                shortCallPrice = norm.ppf((1+self.shortPercentileLevel)/2, loc=loc, scale=scale)
                longPutPrice = norm.ppf((1-self.longPercentileLevel)/2, loc=loc, scale=scale)
                longCallPrice = norm.ppf((1+self.longPercentileLevel)/2, loc=loc, scale=scale)
                
                # activate checking in data slice later
                self.checkOptions[symbol] = [shortPutPrice, shortCallPrice, longPutPrice, longCallPrice]
        
        # check option for activated symbols
        for symbol, check in self.checkOptions.items():
            if not check: continue
        
            # get option contracts
            shortPut, shortCall, longPut, longCall = self.fcn.GetOptionContracts(symbol, check[0], check[1], check[2], check[3])
            # if no contract return, discontinue
            if any([contract is None for contract in [shortCall, shortPut, longCall, longPut]]): continue
            
            # get postion size for trading the options
            quantity = self.fcn.GetPositionSize(shortPut, shortCall, longPut, longCall)
            # if no profitability expected, don't trade
            if quantity is None: continue
        
            # deactivate check
            self.checkOptions[symbol] = False
            
            # trade short Iron Condor, should be same quantity
            shortPutOrder = self.Sell(shortPut.Symbol, quantity)
            shortCallOrder = self.Sell(shortCall.Symbol, quantity)
            longPutOrder = self.Buy(longPut.Symbol, quantity)
            longCallOrder = self.Buy(longCall.Symbol, quantity)
            [self.tickets[symbol].append(order.Symbol) for order in [shortPutOrder, shortCallOrder, longPutOrder, longCallOrder]]