Overall Statistics |
Total Trades 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -0.91 Tracking Error 0.12 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% |
from datetime import timedelta from QuantConnect.DataSource import * from AlgorithmImports import * import pandas as pd from scipy.stats import norm from AlgorithmImports import * from sklearn.svm import SVR from sklearn.model_selection import GridSearchCV import joblib import datetime from datetime import timedelta class CalendarSpread(QCAlgorithm): def Initialize(self): # set start/end date for backtest self.SetStartDate(2015, 10, 1) self.SetEndDate(2016, 10, 1) # set starting balance for backtest self.SetCash(1000000) self.spx = self.AddIndex("SPX", Resolution.Minute).Symbol spxOptions = self.AddIndexOption(self.spx, Resolution.Minute) spxOptions.SetFilter(lambda x: x.Strikes(-2,2).Expiration(0, 365)) ##Gets only SPX call options that are relativley ATM self.spx_history = None def find_calendar_spread_contracts(self, sorted_contracts): # Define the approximate interval we are looking for (around 2 months) desired_interval = timedelta(days=60) # Iterate through the sorted contracts for i, short_term_contract in enumerate(sorted_contracts): # Find the expiry date for the long-term contract we are looking for long_term_expiry = short_term_contract.Expiry + desired_interval # Search for a long-term contract that matches our criteria for j in range(i + 1, len(sorted_contracts)): long_term_contract = sorted_contracts[j] # Check if this contract's expiry is within an acceptable range of our desired expiry if abs((long_term_contract.Expiry - long_term_expiry).days) <= 10: return long_term_contract, short_term_contract # If no matching pair is found, return None return None, None def calculate_implied_volatility(self, option_price, strike_price, time_to_expiration, risk_free_rate, underlying_price): tol = 1e-5 max_iterations = 100 sigma = 0.2 # Initial guess for _ in range(max_iterations): price = self.black_scholes_call(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma) vega = self.vega(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma) if vega == 0: break increment = (price - option_price) / vega sigma -= increment if abs(increment) < tol: break return sigma def black_scholes_call(self, S, K, t, r, sigma): d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t)) d2 = d1 - sigma * np.sqrt(t) return S * norm.cdf(d1) - K * np.exp(-r * t) * norm.cdf(d2) def vega(self, S, K, t, r, sigma): d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t)) return S * norm.pdf(d1) * np.sqrt(t) def FetchSPXHistory(self): start_time = datetime(2005, 1, 1) end_time = self.Time all_history_df = self.History(self.spx, start_time, end_time) all_history_df.reset_index(inplace=True) all_history_df['time'] = pd.to_datetime(all_history_df['time']).dt.date all_history_df = all_history_df.drop_duplicates(subset="time", keep="last") all_history_df = all_history_df.drop(['high', 'low', 'open', 'symbol'], axis=1) all_history_df.set_index('time', inplace=True) all_history_df.index = pd.to_datetime(all_history_df.index) spx_history = all_history_df = all_history_df.resample('M').last() return spx_history def predicted_volatility_svr_garch(self, contract): if self.spx_history is None: return all_history_df = self.spx_history new = pd.to_numeric(all_history_df['close'], errors='coerce') ret = 100 * (new.pct_change()[1:]) realized_vol = ret.rolling(5).std() # Calculate realized volatility (rolling) realized_vol = realized_vol.dropna().reset_index(drop=True) returns_svm = ret ** 2 returns_svm = returns_svm.reset_index(drop=True) X = pd.concat([realized_vol, returns_svm], axis=1, ignore_index=True) X_clean = X.dropna() # Train SVR Model svr_lin = SVR(kernel='linear') para_grid = {'gamma': sp_rand(), 'C': sp_rand(), 'epsilon': sp_rand()} clf = RandomizedSearchCV(svr_lin, para_grid) clf.fit(X_clean.values, realized_vol.values) # Predict cumulative volatility cumulative_volatility = 0 contract_expiry = contract.Expiry.date() days_to_predict = (contract_expiry - self.Time.date()).days # Loop through each day and prepare data for prediction for _ in range(days_to_predict): # Use the last 30 days of data if len(X_clean) > 30: rolling_window = X_clean[-30:] else: rolling_window = X_clean feature_value = rolling_window.mean() predicted_vol = clf.predict([[feature_value]])[0] # Add the squared predicted volatility to the cumulative total cumulative_volatility += predicted_vol ** 2 # Update X_clean with the predicted value (simulating rolling window) next_row = pd.DataFrame([[predicted_vol, predicted_vol ** 2]], columns=X_clean.columns) X_clean = pd.concat([X_clean, next_row]).iloc[1:] # Calculate the square root of the cumulative volatility cumulative_volatility = (cumulative_volatility / days_to_predict) ** 0.5 return cumulative_volatility def BuyCall(self, chains): expiry = sorted(chains,key = lambda x: x.Expiry, reverse=True)[0].Expiry calls = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call] call_contracts = sorted(calls,key = lambda x: abs(x.Strike - x.UnderlyingLastPrice)) if len(call_contracts) == 0: return self.call = call_contracts[0] self.Buy(self.call.Symbol, 1) def SellCall(self, chains): expiry = sorted(chains,key = lambda x: x.Expiry, reverse=True)[0].Expiry calls = [i for i in chains if i.Expiry == expiry and i.Right == OptionRight.Call] call_contracts = sorted(calls,key = lambda x: abs(x.Strike - x.UnderlyingLastPrice)) if len(call_contracts) == 0: return self.call = call_contracts[0] self.Sell(self.call.Symbol, -1) def OnData(self, data:Slice): if self.spx not in data.Bars: return option_invested = [x.Key for x in self.Portfolio if x.Value.Invested and x.Value.Type==SecurityType.Option] if option_invested: if self.Time + timedelta(2) > option_invested[0].ID.Date: self.Liquidate(option_invested[0], "Too close to expiration") return for chain in data.OptionChains.Values: sorted_contracts = sorted(chain.Contracts.Values, key = lambda x: x.Expiry) long_term_contract, short_term_contract = self.find_calendar_spread_contracts(sorted_contracts) if long_term_contract and short_term_contract: long_iv = self.calculate_implied_volatility(long_term_contract.LastPrice, long_term_contract.Strike, (long_term_contract.Expiry - self.Time).days / 365, 0.039, data[self.spx].Price) short_iv = self.calculate_implied_volatility(short_term_contract.LastPrice, short_term_contract.Strike, (short_term_contract.Expiry - self.Time).days / 365, 0.039, data[self.spx].Price) # Predicted volatility using SVR-GARCH if self.predicted_volatility_svr_garch(long_term_contract) is None: return if self.predicted_volatility_svr_garch(short_term_contract) is None: return predicted_volatility_long = self.predict_volatility_svr_garch(long_term_contract) predicted_volatility_short = self.predict_volatility_svr_garch(short_term_contract) if predicted_volatility_long < long_iv: self.BuyCall(long_term_contract) if predicted_volatility_short < short_iv: self.SellCall(short_term_contract)