| Overall Statistics |
|
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.91 Tracking Error 0.12 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
from datetime import timedelta
from QuantConnect.DataSource import *
from AlgorithmImports import *
import pandas as pd
from scipy.stats import norm
from AlgorithmImports import *
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
import joblib
import datetime
from datetime import timedelta
class CalendarSpread(QCAlgorithm):
def Initialize(self):
# set start/end date for backtest
self.SetStartDate(2015, 10, 1)
self.SetEndDate(2016, 10, 1)
# set starting balance for backtest
self.SetCash(1000000)
self.spx = self.AddIndex("SPX", Resolution.Minute).Symbol
spxOptions = self.AddIndexOption(self.spx, Resolution.Minute)
spxOptions.SetFilter(lambda x: x.Strikes(-2,2).Expiration(0, 365)) ##Gets only SPX call options that are relativley ATM
self.spx_history = None
def find_calendar_spread_contracts(self, sorted_contracts):
# Define the approximate interval we are looking for (around 2 months)
desired_interval = timedelta(days=60)
# Iterate through the sorted contracts
for i, short_term_contract in enumerate(sorted_contracts):
# Find the expiry date for the long-term contract we are looking for
long_term_expiry = short_term_contract.Expiry + desired_interval
# Search for a long-term contract that matches our criteria
for j in range(i + 1, len(sorted_contracts)):
long_term_contract = sorted_contracts[j]
# Check if this contract's expiry is within an acceptable range of our desired expiry
if abs((long_term_contract.Expiry - long_term_expiry).days) <= 10:
return long_term_contract, short_term_contract
# If no matching pair is found, return None
return None, None
def calculate_implied_volatility(self, option_price, strike_price, time_to_expiration, risk_free_rate, underlying_price):
tol = 1e-5
max_iterations = 100
sigma = 0.2 # Initial guess
for _ in range(max_iterations):
price = self.black_scholes_call(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma)
vega = self.vega(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma)
if vega == 0:
break
increment = (price - option_price) / vega
sigma -= increment
if abs(increment) < tol:
break
return sigma
def black_scholes_call(self, S, K, t, r, sigma):
d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t))
d2 = d1 - sigma * np.sqrt(t)
return S * norm.cdf(d1) - K * np.exp(-r * t) * norm.cdf(d2)
def vega(self, S, K, t, r, sigma):
d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t))
return S * norm.pdf(d1) * np.sqrt(t)
def FetchSPXHistory(self):
start_time = datetime(2005, 1, 1)
end_time = self.Time
all_history_df = self.History(self.spx, start_time, end_time)
all_history_df.reset_index(inplace=True)
all_history_df['time'] = pd.to_datetime(all_history_df['time']).dt.date
all_history_df = all_history_df.drop_duplicates(subset="time", keep="last")
all_history_df = all_history_df.drop(['high', 'low', 'open', 'symbol'], axis=1)
all_history_df.set_index('time', inplace=True)
all_history_df.index = pd.to_datetime(all_history_df.index)
spx_history = all_history_df = all_history_df.resample('M').last()
return spx_history
def predicted_volatility_svr_garch(self, contract):
if self.spx_history is None: return
all_history_df = self.spx_history
new = pd.to_numeric(all_history_df['close'], errors='coerce')
ret = 100 * (new.pct_change()[1:])
realized_vol = ret.rolling(5).std() # Calculate realized volatility (rolling)
realized_vol = realized_vol.dropna().reset_index(drop=True)
returns_svm = ret ** 2
returns_svm = returns_svm.reset_index(drop=True)
X = pd.concat([realized_vol, returns_svm], axis=1, ignore_index=True)
X_clean = X.dropna()
# Train SVR Model
svr_lin = SVR(kernel='linear')
para_grid = {'gamma': sp_rand(), 'C': sp_rand(), 'epsilon': sp_rand()}
clf = RandomizedSearchCV(svr_lin, para_grid)
clf.fit(X_clean.values, realized_vol.values)
# Predict cumulative volatility
cumulative_volatility = 0
contract_expiry = contract.Expiry.date()
days_to_predict = (contract_expiry - self.Time.date()).days
# Loop through each day and prepare data for prediction
for _ in range(days_to_predict):
# Use the last 30 days of data
if len(X_clean) > 30:
rolling_window = X_clean[-30:]
else:
rolling_window = X_clean
feature_value = rolling_window.mean()
predicted_vol = clf.predict([[feature_value]])[0]
# Add the squared predicted volatility to the cumulative total
cumulative_volatility += predicted_vol ** 2
# Update X_clean with the predicted value (simulating rolling window)
next_row = pd.DataFrame([[predicted_vol, predicted_vol ** 2]], columns=X_clean.columns)
X_clean = pd.concat([X_clean, next_row]).iloc[1:]
# Calculate the square root of the cumulative volatility
cumulative_volatility = (cumulative_volatility / days_to_predict) ** 0.5
return cumulative_volatility
def BuyCall(self, chains):
expiry = sorted(chains,key = lambda x: x.Expiry, reverse=True)[0].Expiry
calls = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call]
call_contracts = sorted(calls,key = lambda x: abs(x.Strike - x.UnderlyingLastPrice))
if len(call_contracts) == 0:
return
self.call = call_contracts[0]
self.Buy(self.call.Symbol, 1)
def SellCall(self, chains):
expiry = sorted(chains,key = lambda x: x.Expiry, reverse=True)[0].Expiry
calls = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call]
call_contracts = sorted(calls,key = lambda x: abs(x.Strike - x.UnderlyingLastPrice))
if len(call_contracts) == 0:
return
self.call = call_contracts[0]
self.Sell(self.call.Symbol, -1)
def OnData(self, data:Slice):
if self.spx not in data.Bars:
return
option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option]
if option_invested:
if self.Time + timedelta(2) > option_invested[0].ID.Date:
self.Liquidate(option_invested[0], "Too close to expiration")
return
for chain in data.OptionChains.Values:
sorted_contracts = sorted(chain.Contracts.Values, key = lambda x: x.Expiry)
long_term_contract, short_term_contract = self.find_calendar_spread_contracts(sorted_contracts)
if long_term_contract and short_term_contract:
long_iv = self.calculate_implied_volatility(long_term_contract.LastPrice, long_term_contract.Strike, (long_term_contract.Expiry - self.Time).days / 365, 0.039, data[self.spx].Price)
short_iv = self.calculate_implied_volatility(short_term_contract.LastPrice, short_term_contract.Strike, (short_term_contract.Expiry - self.Time).days / 365, 0.039, data[self.spx].Price)
# Predicted volatility using SVR-GARCH
if self.predicted_volatility_svr_garch(long_term_contract) is None: return
if self.predicted_volatility_svr_garch(short_term_contract) is None: return
predicted_volatility_long = self.predict_volatility_svr_garch(long_term_contract)
predicted_volatility_short = self.predict_volatility_svr_garch(short_term_contract)
if predicted_volatility_long < long_iv:
self.BuyCall(long_term_contract)
if predicted_volatility_short < short_iv:
self.SellCall(short_term_contract)