| Overall Statistics |
|
Total Trades 344 Average Win 0.39% Average Loss -0.36% Compounding Annual Return 6.275% Drawdown 11.400% Expectancy 0.278 Net Profit 36.337% Sharpe Ratio 0.852 Probabilistic Sharpe Ratio 33.684% Loss Rate 39% Win Rate 61% Profit-Loss Ratio 1.09 Alpha 0.054 Beta -0.024 Annual Standard Deviation 0.062 Annual Variance 0.004 Information Ratio -0.021 Tracking Error 0.179 Treynor Ratio -2.168 Total Fees $469.49 |
from datetime import timedelta
import numpy as np
import pandas as pd
import traceback
# https://www.cboe.com/micro/vix/vixwhite.pdf
# look here for some pandas inspiration.
# https://github.com/khrapovs/vix/blob/master/vix/reproduce_vix.py
# https://github.com/michaelchu/optopsy
def compute_iv(chain,interest1mnth=0.0003,interest2mnth=0.00031):
if len([x for x in chain]) < 5:
#raise ValueError("not enought data?")
return np.nan
try:
near_term_expiry = sorted([x for x in chain if (x.Expiry-x.Time).days <= 30], key=lambda x: x.Expiry)[-1].Expiry
next_term_expiry = sorted([x for x in chain if (x.Expiry-x.Time).days > 30], key=lambda x: x.Expiry)[0].Expiry
except:
#raise ValueError("near and next term option not found?")
return np.nan
data = {}
# filter the call and put options from the contracts
for name,expiry,interest_rate in [('near',near_term_expiry,interest1mnth),('next',next_term_expiry,interest2mnth)]:
calls = {x.Strike:x for x in chain if x.Expiry == expiry and x.Right == 0}
puts = {x.Strike:x for x in chain if x.Expiry == expiry and x.Right == 1}
if len(calls) < 5 or len(puts) < 5:
#raise ValueError("bad data??")
return np.nan
# transform
mydata = []
strikes = sorted([x for x in calls.keys()])
price = list(calls.values())[0].UnderlyingLastPrice
timenow = list(calls.values())[0].Time
nt = (expiry-timenow).days*24*60
t = nt / 525600
r = interest_rate
p_ind = np.argmin([np.abs(x-price) for x in strikes])
p_strike = strikes[p_ind]
call_df = pd.DataFrame([ \
dict(strike=x.Strike,call_bid=x.BidPrice,call_ask=x.AskPrice) \
for x in chain if x.Expiry == expiry and x.Right == 0]).sort_values("strike")
put_df = pd.DataFrame([ \
dict(strike=x.Strike,put_bid=x.BidPrice,put_ask=x.AskPrice) \
for x in chain if x.Expiry == expiry and x.Right == 1]).sort_values("strike")
merged = call_df.merge(put_df,on='strike')
merged['call_mid']=(merged.call_bid+merged.call_ask)/2
merged['put_mid']=(merged.put_bid+merged.put_ask)/2
# merged columns:
# call_bid,call_mid,call_ask,strike,put_bid,put_mid,put_ask
atm_ind = np.argmin(np.abs((merged.call_mid-merged.put_mid).values))
y = interest_rate
c = merged.call_mid.values[atm_ind]
p = merged.put_mid.values[atm_ind]
s = merged.strike.values[atm_ind]
f = s+np.exp(y*t)*(c-p)
k0 = np.max([x for x in strikes if x<f])
merged = merged.sort_values('strike',ascending=True)
merged['call_bid_zero_cumsum']=(merged['call_bid']==0).cumsum()
merged = merged.sort_values('strike',ascending=False)
merged['put_bid_zero_cumsum']=(merged['put_bid']==0).cumsum()
merged = merged.sort_values('strike',ascending=True)
# filter out super otm contracts, i.e. all(cumsum<=1,bid!=0,strike>price)
merged['use_put'] = np.logical_and((merged.put_bid!=0).values,np.logical_and((merged.put_bid_zero_cumsum <= 1).values,(merged.strike < k0).values))
merged['use_put_and_call'] = merged.strike == k0
merged['use_call'] = np.logical_and((merged.call_bid!=0).values,np.logical_and((merged.call_bid_zero_cumsum <= 1).values,(merged.strike > k0).values))
merged['strike_p1'] = merged.strike.shift(-1)
merged['strike_m1'] = merged.strike.shift(1)
def compute_contrib(x):
if np.isnan(x.strike_m1) or np.isnan(x.strike_p1):
return np.nan
if x.use_put_and_call:
qk = np.mean([x.put_bid,x.put_ask,x.call_bid,x.call_ask])
elif x.use_call:
qk = np.mean([x.call_bid,x.call_ask])
elif x.use_put:
qk = np.mean([x.put_bid,x.put_ask])
else:
return np.nan
dk = (x.strike_p1-x.strike_m1)/2
k2 = x.strike**2
contrib = (dk/k2)*np.exp(r*t)*qk
return contrib
merged['contrib'] = merged.apply(lambda x: compute_contrib(x), axis=1)
# compute voaltility at x-term
sigsqr = (2/t)*np.nansum(merged.contrib) - (1/t)*((f/k0)-1)**2
data[name] = dict(
t=t,
nt=nt,
sigsqr=sigsqr,
)
if len(data) != 2:
return np.nan
n365 = 525600
n30 = 43200
t1 = data['near']['t']
t2 = data['next']['t']
nt1 = data['near']['nt']
nt2 = data['next']['nt']
sigsqr1 = data['near']['sigsqr']
sigsqr2 = data['next']['sigsqr']
iv = 100 *np.sqrt(
(t1*sigsqr1*((nt2-n30)/(nt2-nt1))+t2*sigsqr2*((n30-nt1)/(nt2-nt1)))*(n365/n30)
)
return iv
from QuantConnect.Data.Custom import Quandl
from QuantConnect.Python import PythonData
from QuantConnect.Data import SubscriptionDataSource
from datetime import datetime, timedelta
import decimal
class MyYield(PythonData):
def GetSource(self, config, date, isLiveMode):
url = "na"
return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)
def Reader(self, config, line, date, isLiveMode):
if not (line.strip() and line[0].isdigit()): return None
inst = MyYield()
inst.Symbol = config.Symbol
try:
data = line.split(',')
# # Make sure we only get this data AFTER trading day - don't want forward bias.
inst.Time = datetime.strptime(data[0], '%Y-%m-%d')+timedelta(hours=20)
inst.Value = decimal.Decimal(data[1])
except:
return None
return inst
class MyYield1mnth(MyYield):
def GetSource(self, config, date, isLiveMode):
url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=TB4WK"
return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)
class MyYield3mnth(MyYield):
def GetSource(self, config, date, isLiveMode):
url = "https://fred.stlouisfed.org/graph/fredgraph.csv?id=TB3MS"
return SubscriptionDataSource(url,SubscriptionTransportMedium.RemoteFile)
from collections import deque
from scipy import stats
class OptionAlgorithm(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 3, 1)
self.SetEndDate(2020, 4, 1)
self.SetCash(100000)
resolution = Resolution.Minute
self.SetBenchmark("SPY")
self.spy = self.AddEquity("SPY", resolution).Symbol
option = self.AddOption(self.spy, resolution)
option.SetFilter(self.UniverseFunc)
self.vix = self.AddData(Quandl,"CHRIS/CBOE_VX1", Resolution.Daily).Symbol
self.ivr = None
self.slice = None
self.period = 125
self.queue = deque(maxlen=self.period)
self.interest1mnth = np.nan
self.interest2mnth = np.nan
self.interest3mnth = np.nan
self.yield_1mnth = self.AddData(MyYield1mnth, "MYYIELD").Symbol
self.yield_3mnth = self.AddData(MyYield3mnth, "MYYIELD").Symbol
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.AfterMarketOpen(self.spy, 60),
Action(self.MyCompute)
)
self.Schedule.On(
self.DateRules.EveryDay(),
self.TimeRules.AfterMarketOpen(self.spy, 95),
Action(self.MyTrade)
)
myplot = Chart('myplot')
# plot actual, expected, rank
myplot.AddSeries(Series('iv', SeriesType.Line, 0))
myplot.AddSeries(Series('vix', SeriesType.Line, 0))
myplot.AddSeries(Series('ivr', SeriesType.Line, 0))
def UniverseFunc(self, universe):
# obtain 30 DTE options +/- 2*sd from last price.
#return universe.IncludeWeeklys().Expiration(timedelta(20), timedelta(40)).Strikes(-5,5)
data = self.History(self.spy,timedelta(days=360*2),Resolution.Daily)
sd = np.std(data['close'].pct_change(30).dropna().values)
price = data['close'].values[-1]
l_lim,u_lim = int(-2*sd*price),int(2*sd*price)
return universe.IncludeWeeklys().Expiration(timedelta(20), timedelta(40)).Strikes(l_lim,u_lim)
def OnData(self,slice):
self.slice = slice
if slice.ContainsKey(self.yield_1mnth):
self.interest1mnth = slice[self.yield_1mnth].Value
if slice.ContainsKey(self.yield_3mnth):
self.interest3mnth = slice[self.yield_3mnth].Value
if not np.isnan(self.interest1mnth) and not np.isnan(self.interest3mnth):
self.interest2mnth = np.mean([self.interest1mnth,self.interest3mnth])
def MyTrade(self):
if self.ivr is None:
return
if self.ivr > 70:
self.SetHoldings(self.spy,-0.2)
else:
# risk on.
self.SetHoldings(self.spy, 0.8)
def MyCompute(self):
slice = self.slice
if slice is None:
return
if slice.OptionChains.Count == 0:
return
for sliceitem in slice.OptionChains:
chain = sliceitem.Value
if len([x for x in chain]) < 1:
continue
iv = compute_iv(chain,interest1mnth=self.interest1mnth*0.01,interest2mnth=self.interest2mnth*0.01)
if np.isnan(iv):
continue
self.queue.append(iv)
self.Plot('myplot', 'iv', iv)
# try:
# pass
# except:
# self.Log(traceback.format_exc())
data = self.History(self.vix,timedelta(days=10),Resolution.Daily)
if len(data)>0:
values = data['close'].values
self.Plot('myplot', 'vix', values[-1])
if len(self.queue) > 10: # == self.period:
self.ivr = stats.percentileofscore(self.queue, iv)
self.Plot('myplot', 'ivr', self.ivr)