| Overall Statistics |
|
Total Trades 632 Average Win 0.63% Average Loss -0.66% Compounding Annual Return -2.067% Drawdown 35.500% Expectancy -0.032 Net Profit -11.030% Sharpe Ratio -0.097 Probabilistic Sharpe Ratio 0.384% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 0.96 Alpha 0.001 Beta -0.075 Annual Standard Deviation 0.124 Annual Variance 0.015 Information Ratio -0.831 Tracking Error 0.228 Treynor Ratio 0.16 Total Fees $503.00 Estimated Strategy Capacity $27000000.00 Lowest Capacity Asset AAPL 31PB4NYHEJJVQ|AAPL R735QTJ8XC9X |
import numpy as np
from scipy.stats import norm
from statsmodels.tsa.arima.model import ARIMA
class SignalProcessor:
    ''' Models for volatility and price prediction

    Holds an ARIMAX(1, 1, 1) volatility model whose exogenous regressors are
    three filing-sentiment series, plus a Monte Carlo price simulator that is
    driven by a volatility path.'''

    def __init__(self, algo):
        ''' initiate a new instance of the SignalProcessor class
        Args:
            algo: the algorithm instance'''
        self.algo = algo
        # fitted ARIMAX results object; stays None until ARIMAXFitting succeeds
        self.arimax = None

    def processAltHistory(self, altHistoryDf):
        ''' process the raw Brain Language Metric of Company Filings historical data dataframe
        Args:
            altHistoryDf: (n x m df) the raw historical data dataframe
        Return:
            managementSentiment: (df) sentiment level of the management discussion and analysis section
            reportSentiment: (df) sentiment level of the financial report
            riskSentiment: (df) sentiment level of the risk factor statement
            Each frame has one column per symbol and is indexed by date shifted one day forward.'''
        # local import so the module does not depend on the host pre-importing it
        from datetime import timedelta

        def getSentiment(value):
            # cells without a filing are NaN (a float) and count as neutral
            if isinstance(value, float):
                return 0
            sentiment = value.Sentiment
            # a filing object without a score is treated like a missing filing
            if sentiment is None:
                return 0
            # negative sentiment is folded to a positive magnitude and weighted 1.5x
            return sentiment if sentiment >= 0 else abs(sentiment) * 1.5

        # NOTE(review): "analyasis" is spelled this way in the attribute name used here;
        # presumably it mirrors the dataset's column name - confirm before "fixing" it
        rawColumns = (
            altHistoryDf.managementdiscussionanalyasisoffinancialconditionandresultsofoperations,
            altHistoryDf.reportsentiment,
            altHistoryDf.riskfactorsstatementsentiment,
        )

        processed = []
        for column in rawColumns:
            # one column per symbol, each cell reduced to a non-negative score
            frame = column.unstack("symbol").applymap(getSentiment)
            # align index: stamp each observation on the following calendar day
            frame.index = frame.index.date + timedelta(days=1)
            processed.append(frame)

        managementSentiment, reportSentiment, riskSentiment = processed
        return managementSentiment, reportSentiment, riskSentiment

    def ARIMAXFitting(self, volatilities, altHistoryDf):
        ''' fitting a ARIMAX model for volatility modelling
        Args:
            volatilities: (n x 1 array) volatility data series
            altHistoryDf: (n x m df) the raw historical data dataframe
        Return:
            (bool) True if the model was fitted and stored in self.arimax, False otherwise'''
        # local import so the module does not depend on the host pre-importing it
        import pandas as pd

        # data processing
        managementSentiment, reportSentiment, riskSentiment = self.processAltHistory(altHistoryDf)
        # convert volatility series into 2-d
        volatilities = pd.DataFrame(volatilities)
        # align our data on the union of dates; gaps on either side become 0
        # NOTE(review): this also writes 0 volatility on dates that only exist in the
        # sentiment frames - confirm that is intended
        df = pd.concat([volatilities, managementSentiment, reportSentiment, riskSentiment], axis=1).fillna(0)
        df.columns = ["volatility", "managementSentiment", "reportSentiment", "riskSentiment"]
        exogColumns = ["managementSentiment", "reportSentiment", "riskSentiment"]

        # ARIMAX, p & d & q from research ACF/PACF
        try:
            arima = ARIMA(df["volatility"], df[exogColumns], order=(1, 1, 1))
            self.arimax = arima.fit()
        except Exception:
            # construction or optimisation failed; the previous model (if any) is kept
            return False
        return True

    def VolatilityPrediction(self, n):
        ''' volatility prediction from ARIMAX model
        Args:
            n: (int) number of steps of prediction
        Return:
            (n size series) out-of-sample forecast of the next n steps of volatility
        Raises:
            RuntimeError: if no model has been fitted yet'''
        if self.arimax is None:
            raise RuntimeError("ARIMAX model has not been fitted")
        # set up zeros matrix (day without sentiment data consider 0, suppose no entry within 1 month)
        exog = np.zeros((n, 3))
        # forecast() projects n steps beyond the sample; predict(n) would instead
        # return in-sample predictions starting at observation n
        return self.arimax.forecast(steps=n, exog=exog)

    def PricePrediction(self, S_0, mu, volatility, iters=20000):
        ''' underlying stock price prediction from SDE
        Args:
            S_0: (float or 1-element series) initial stock price at time 0
            mu: (float) expectation of return
            volatility: (n x 1 array) the predicted volatility series
            iters: (int) number of iteration of Monte Carlo simulation
        Return:
            (iters x 1 array) simulated terminal prices, one row per path'''
        # each time step length
        t = 1/365
        # accept both a plain number and a one-element Series/array as the start price
        startPrice = float(np.ravel(S_0)[0])
        # terminal price of each path, at time 0 = current price
        S_t = np.full((iters, 1), startPrice)
        # one simulation step per element of the volatility series
        for sigma in np.ravel(volatility):
            shocks = norm.rvs(size=iters).reshape(-1, 1)
            # MonteCarlo simulation, geometric Brownian motion step
            S_t = S_t * np.exp(t * (mu - sigma**2/2) + sigma * np.sqrt(t) * shocks)
        return S_t


class Functions:
    ''' Option selection and position sizing utilities for the short Iron Condor '''

    def __init__(self, algo):
        ''' a new instance of class Functions
        Args:
            algo: algorithm instance'''
        self.algo = algo

    def GetOptionContracts(self, symbol, shortPutPrice, shortCallPrice, longPutPrice, longCallPrice):
        ''' get the contract for trading the Iron Condor
        Args:
            symbol: the underlying symbol
            shortPutPrice: desire strike for put contract to be short selled
            shortCallPrice: desire strike for call contract to be short selled
            longPutPrice: desire strike for put contract to be long brought
                (not used: the long put strike is derived so both wings have equal width)
            longCallPrice: desire strike for call contract to be long brought
        Return:
            (option contracts x4) short put, short call, long put, long call as returned by
            AddOptionContract, or four None when no valid Iron Condor can be assembled'''
        noContracts = (None, None, None, None)

        # get list of options
        contracts = self.algo.OptionChainProvider.GetOptionContractList(symbol, self.algo.Time)
        # return None if no option contract available
        if not contracts:
            return noContracts

        # only within 30 days and within 20 strikes as wider ones are prone to have slippage
        contracts = self.InitialFilter(symbol, contracts, -20, 20, 0, 30)
        if not contracts:
            return noContracts

        # get calls
        calls = [contract for contract in contracts if contract.ID.OptionRight == OptionRight.Call]
        if not calls:
            return noContracts

        # closest strike to the short call price; among equal distances the latest expiry wins
        # because min() keeps the first candidate of the expiry-descending list
        latestFirst = sorted(calls, key=lambda x: x.ID.Date, reverse=True)
        shortCallContract = min(latestFirst, key=lambda x: abs(shortCallPrice - x.ID.StrikePrice))

        # get contract closest to long call price with the same expiration date
        sameExpiryCalls = [contract for contract in calls if contract.ID.Date == shortCallContract.ID.Date]
        longCallContract = min(sameExpiryCalls, key=lambda x: abs(longCallPrice - x.ID.StrikePrice))
        # both legs on the same strike would give a zero-width wing, which is not an Iron Condor
        if longCallContract.ID.StrikePrice <= shortCallContract.ID.StrikePrice:
            return noContracts

        # get puts with same expiration
        sameExpiryPuts = [contract for contract in contracts
                          if contract.ID.OptionRight == OptionRight.Put
                          and contract.ID.Date == shortCallContract.ID.Date]
        if not sameExpiryPuts:
            return noContracts

        # get contract closest to short put price with the same expiration date
        shortPutContract = min(sameExpiryPuts, key=lambda x: abs(shortPutPrice - x.ID.StrikePrice))

        # the long put must sit exactly one call-wing width below the short put
        longPutStrike = shortPutContract.ID.StrikePrice - \
            longCallContract.ID.StrikePrice + shortCallContract.ID.StrikePrice
        match = [contract for contract in sameExpiryPuts if contract.ID.StrikePrice == longPutStrike]
        # discontinue if no contract match Iron Condor requirement
        if not match:
            return noContracts
        longPutContract = match[0]

        # subscribe
        return self.algo.AddOptionContract(shortPutContract, Resolution.Minute), \
            self.algo.AddOptionContract(shortCallContract, Resolution.Minute), \
            self.algo.AddOptionContract(longPutContract, Resolution.Minute), \
            self.algo.AddOptionContract(longCallContract, Resolution.Minute)

    def InitialFilter(self, underlyingsymbol, symbol_list, min_strike_rank, max_strike_rank, min_expiry, max_expiry):
        ''' This method is an initial filter of option contracts
        based on the range of strike price and the expiration date
        https://www.quantconnect.com/tutorials/applied-options/iron-condor
        Args:
            underlyingsymbol: the underlying symbol, used to look up the current price
            symbol_list: option contract symbols to filter
            min_strike_rank: (int) lower strike bound as an offset from the ATM strike rank (exclusive)
            max_strike_rank: (int) upper strike bound as an offset from the ATM strike rank (exclusive)
            min_expiry: (int) minimum days to expiry, inclusive
            max_expiry: (int) maximum days to expiry, inclusive
        Return:
            (list) contracts inside the expiry and strike ranges, weekly options excluded;
            an empty list when nothing qualifies'''
        if not symbol_list:
            return []

        # filter the contracts based on the expiry range
        today = self.algo.Time.date()
        contract_list = [i for i in symbol_list
                         if min_expiry <= (i.ID.Date.date() - today).days <= max_expiry]
        if not contract_list:
            return []

        # find the strike price of ATM option
        underlying_price = self.algo.Securities[underlyingsymbol].Price
        atm_strike = min(contract_list, key=lambda x: abs(x.ID.StrikePrice - underlying_price)).ID.StrikePrice
        strike_list = sorted({i.ID.StrikePrice for i in contract_list})
        # find the index of ATM strike in the sorted strike list
        atm_strike_rank = strike_list.index(atm_strike)

        # clamp to the available strikes; an unclamped negative index would silently
        # wrap around to the top of the list and could exclude the ATM strike
        low_rank = max(atm_strike_rank + min_strike_rank + 1, 0)
        high_rank = min(atm_strike_rank + max_strike_rank - 1, len(strike_list) - 1)
        min_strike = strike_list[low_rank]
        max_strike = strike_list[high_rank]

        # skip weekly options
        return [i for i in contract_list
                if min_strike <= i.ID.StrikePrice <= max_strike
                and not OptionSymbol.IsWeekly(i)]

    def GetPositionSize(self, shortPut, shortCall, longPut, longCall):
        ''' get the postion size as discrete individual bet by Kelly Criterion
        Args:
            shortPut: put contract plan to be short selled
            shortCall: call contract plan to be short selled
            longPut: put contract plan to be long brought
            longCall: call contract plan to be long brought
        Return:
            (int) the absolute number of contracts should be buy/short for each contract,
            between 1 and algo.maxSize; None when the trade is not profitable or the
            sizing formula is undefined (zero-width spread, zero denominator)'''
        # get the spread first
        spread = longCall.StrikePrice - shortCall.StrikePrice
        # a zero-width spread has no defined break-even ratio
        if spread <= 0:
            return None

        # get all contract's current cost
        shortCallValue = shortCall.BidPrice
        shortPutValue = shortPut.BidPrice
        longCallValue = longCall.AskPrice
        longPutValue = longPut.AskPrice

        # total premium, minus commission 0.5*4
        # NOTE(review): quotes look per-share while the commission is a flat amount - confirm units
        premium = shortCallValue + shortPutValue - longCallValue - longPutValue - 2
        # return None to discontinue if no profitability
        if premium <= 0:
            return None

        # bet size: sum of all four legs' quotes
        bet = shortCallValue + shortPutValue + longCallValue + longPutValue

        # maximum profit and loss %
        maxProfitPercent = premium / bet
        maxLossPercent = (spread - premium) / bet

        # breakeven point, we need either one only as it's symmetric
        # if we factor spread as 1, this is the width of upper triangle (profit)
        breakEvenRatio = premium / spread

        # triangular area, up side is +, down side is -,
        # we don't need to divide by 2 as there's 2 tails like this
        triangularArea = maxProfitPercent*breakEvenRatio - maxLossPercent*(1 - breakEvenRatio)
        # multiply by the ratio of all possibilities (difference in Percentile, already represent 2 tails)
        shortLevel = self.algo.shortPercentileLevel
        longLevel = self.algo.longPercentileLevel
        triangularArea *= longLevel - shortLevel

        # bet ratio, follows the equation in research notebook
        denominator = maxProfitPercent * maxLossPercent * (shortLevel - longLevel + 1)
        # e.g. premium equal to the spread makes the maximum loss, and so the denominator, zero
        if denominator == 0:
            return None
        f = (maxProfitPercent * shortLevel - maxLossPercent * (longLevel - 1)) / denominator \
            - triangularArea

        # half the size such that half the risk but only 3/4 the expected return (Ed Thorp)
        f *= 0.5

        # since the ratio f is directly taking bet as base, we can use that as number of contract
        # NOTE(review): int(f/100) is 0 for any f below 100, so the result is usually 1 - confirm scaling
        return max(min(int(f/100), self.algo.maxSize), 1)
from datetime import timedelta

import numpy as np
import pandas as pd
from QuantConnect.DataSource import BrainCompanyFilingLanguageMetricsAll
from scipy.stats import norm

from Functions import Functions
from SignalProcessor import SignalProcessor
class CrawlingTanJellyfish(QCAlgorithm):
    ''' Short Iron Condor algorithm

    For each underlying, an ARIMAX volatility model is refitted whenever new
    filing-sentiment data arrives. If the model output indicates falling
    volatility, terminal prices are simulated and the condor strikes are placed
    at preset percentiles of a normal distribution fitted to them.'''

    def Initialize(self):
        ''' set parameters, subscribe the underlyings, warm up price history and schedule the daily entry check '''
        # Parameter: determining the strike zone of iron condor
        # shortPercentileLevel must be smaller than longPercentileLevel
        self.shortPercentileLevel = 0.85
        self.longPercentileLevel = 0.95
        # parameter: maximum order size
        self.maxSize = 5

        self.SetStartDate(2016, 1, 1)
        self.SetCash(100000)
        self.SetSecurityInitializer(self.CustomSecurityInitializer)

        tickers = ["FB", "AAPL", "AMZN", "NFLX", "GOOG"]
        self.symbols = [self.AddEquity(ticker, Resolution.Minute).Symbol for ticker in tickers]

        # dict contain corresponding alt data symbol for each symbol
        self.altSymbols = {}
        # dict contain corresponding ARIMAX model for each symbol
        self.model = {}
        # dict contain corresponding entry condition check indicator:
        # False when idle, otherwise [shortPut, shortCall, longPut, longCall] target prices
        self.checkOptions = {symbol: False for symbol in self.symbols}
        # dict contain corresponding historical trade bar data, we use 5+1 year (1 year offset annual volatility)
        self.history = {symbol: RollingWindow[TradeBar](252*6) for symbol in self.symbols}
        # dict contain the option symbols of the open condor legs for each underlying
        self.tickets = {symbol: [] for symbol in self.symbols}

        # warm up rolling windows
        data = self.History(self.symbols, 252*6, Resolution.Daily)
        for symbol in self.symbols:
            for time, bar in data.loc[symbol].iterrows():
                tradeBar = TradeBar(time, symbol, bar.open, bar.high, bar.low, bar.close, bar.volume)
                self.history[symbol].Add(tradeBar)
            # set up consolidator for future auto-update
            self.Consolidate(symbol, Resolution.Daily, self.DailyBarHandler)

        # schedule daily check for entry position if signal active
        self.Schedule.On(self.DateRules.EveryDay("AAPL"), self.TimeRules.At(10, 0), self.Entry)

        # class Functions for utilities
        self.fcn = Functions(self)

    def CustomSecurityInitializer(self, security):
        ''' use raw prices and seed every new security with its last known price
        Args:
            security: the security being added'''
        security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        security.SetMarketPrice(self.GetLastKnownPrice(security))

    def DailyBarHandler(self, bar):
        ''' append each consolidated daily bar to its symbol's rolling window
        Args:
            bar: the consolidated daily trade bar'''
        self.history[bar.Symbol].Add(bar)

    def OnData(self, data):
        ''' close the whole position of an underlying that is about to split
        Args:
            data: the current data slice'''
        # If any symbol is going to have a split, we close our position
        for symbol in self.symbols:
            if not data.Splits.ContainsKey(symbol):
                continue
            split = data.Splits[symbol]
            # the warning event means the split takes effect on the next day
            if split.Type == SplitType.Warning:
                self._ClosePositions(symbol)

    def OnSecuritiesChanged(self, changes):
        ''' attach the sentiment dataset and a fresh model to every added equity
        Args:
            changes: the security changes object'''
        for security in changes.AddedSecurities:
            if security.Type == SecurityType.Equity:
                # store the dataset symbol
                self.altSymbols[security.Symbol] = self.AddData(BrainCompanyFilingLanguageMetricsAll, security.Symbol).Symbol
                # create new ARIMAX model for symbol
                self.CreateModel(security.Symbol)

    def OnOrderEvent(self, orderEvent):
        ''' liquidate the whole condor when one of its legs is exercised or assigned
        Args:
            orderEvent: the order event'''
        # liquidate if we have options being exercised at expiration
        if not orderEvent.IsAssignment:
            return
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order.Type != OrderType.OptionExercise:
            return
        for symbol, optionSymbols in self.tickets.items():
            if orderEvent.Symbol in optionSymbols:
                # the helper resets self.tickets[symbol] itself, so the stored leg list
                # is really cleared (rebinding the loop variable would not do that)
                self._ClosePositions(symbol)

    def CreateModel(self, symbol):
        ''' creation of ARIMAX model
        Args:
            symbol: the underlying symbol'''
        # initiate new model
        self.model[symbol] = SignalProcessor(self)
        # set up the model with 5 year data warm up
        altHistory = self.History(self.altSymbols[symbol], timedelta(days=365*5), Resolution.Daily)
        # nothing to fit on yet; Entry refits once sentiment data is available
        if altHistory.empty:
            return
        _, volatility = self._GetVolatility(symbol)
        # fit ARIMAX model
        self.model[symbol].ARIMAXFitting(volatility, altHistory)

    def Entry(self):
        ''' check if entry conditions satisfied
        if yes, enter short Iron Condor position'''
        for symbol, symbolData in self.altSymbols.items():
            altHistory = self.History(symbolData, timedelta(days=365*5), Resolution.Daily)
            # if no newly updated data, skip this symbol
            if altHistory.empty or self.Time - altHistory.unstack(0).index[-1] > timedelta(days=1):
                continue

            # liquidate previous orders if we have updated data
            self._ClosePositions(symbol)

            history, volatility = self._GetVolatility(symbol)

            # update the ARIMAX model
            success = self.model[symbol].ARIMAXFitting(volatility, altHistory)
            if not success:
                continue

            # get next 21 day prediction on volatility
            predictions = self.model[symbol].VolatilityPrediction(21)

            # we continue only if the predicted volatility will drop
            if volatility.values[-1] > predictions.iloc[-1]:
                # get terminal price prediction of n paths
                meanReturn = np.mean(history.pct_change()[1:].values)
                terminalPrices = self.model[symbol].PricePrediction(history.iloc[-1], meanReturn, predictions)

                # fit for normal distribution, since we're using N(0, sigma^2) for Monte Carlo, we can assume normal
                loc, scale = norm.fit(terminalPrices)

                # get preset level desired price range, symmetric around the median
                shortPutPrice = norm.ppf((1-self.shortPercentileLevel)/2, loc=loc, scale=scale)
                shortCallPrice = norm.ppf((1+self.shortPercentileLevel)/2, loc=loc, scale=scale)
                longPutPrice = norm.ppf((1-self.longPercentileLevel)/2, loc=loc, scale=scale)
                longCallPrice = norm.ppf((1+self.longPercentileLevel)/2, loc=loc, scale=scale)

                # activate checking for the option contracts below
                self.checkOptions[symbol] = [shortPutPrice, shortCallPrice, longPutPrice, longCallPrice]

        # check option for activated symbols
        for symbol, check in self.checkOptions.items():
            if not check:
                continue

            # get option contracts
            shortPut, shortCall, longPut, longCall = self.fcn.GetOptionContracts(symbol, check[0], check[1], check[2], check[3])
            # if no contract return, discontinue
            if any(contract is None for contract in (shortCall, shortPut, longCall, longPut)):
                continue

            # get postion size for trading the options
            quantity = self.fcn.GetPositionSize(shortPut, shortCall, longPut, longCall)
            # if no profitability expected, don't trade
            if quantity is None:
                continue

            # deactivate check
            self.checkOptions[symbol] = False

            # trade short Iron Condor, should be same quantity
            shortPutOrder = self.Sell(shortPut.Symbol, quantity)
            shortCallOrder = self.Sell(shortCall.Symbol, quantity)
            longPutOrder = self.Buy(longPut.Symbol, quantity)
            longCallOrder = self.Buy(longCall.Symbol, quantity)

            # remember the legs so they can be closed together later
            self.tickets[symbol].extend(
                order.Symbol for order in (shortPutOrder, shortCallOrder, longPutOrder, longCallOrder))

    def _ClosePositions(self, symbol):
        ''' flatten every recorded option leg and the underlying, then forget the legs
        Args:
            symbol: the underlying symbol'''
        for optionSymbol in self.tickets[symbol]:
            self.SetHoldings(optionSymbol, 0)
        self.SetHoldings(symbol, 0)
        self.tickets[symbol] = []

    def _GetVolatility(self, symbol):
        ''' build the close history and its rolling volatility from the rolling window
        Args:
            symbol: the underlying symbol
        Return:
            history: (df) daily closes in chronological order, indexed by bar end date
            volatility: (df) 252-bar rolling standard deviation of daily returns'''
        # the rolling window yields newest first, so reverse into chronological order
        data = pd.DataFrame(self.history[symbol])[::-1]
        history = data.applymap(lambda bar: bar.Close)
        history.index = data.applymap(lambda bar: bar.EndTime.date()).values.flatten().tolist()
        volatility = history.pct_change().rolling(252).std().dropna()
        return history, volatility