Overall Statistics
Total Trades
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-0.91
Tracking Error
0.12
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
from datetime import timedelta
from QuantConnect.DataSource import *
from AlgorithmImports import *
import pandas as pd
from scipy.stats import norm 
from AlgorithmImports import *
from sklearn.svm import SVR
from sklearn.model_selection import GridSearchCV
import joblib
import datetime
from datetime import timedelta

class CalendarSpread(QCAlgorithm):
    """Trade the legs of an SPX call calendar spread from an SVR volatility forecast.

    On every bar that carries an SPX trade bar the algorithm:

    1. liquidates held option legs that are within ``EXPIRY_BUFFER`` of
       expiration and does nothing else while any option is held;
    2. otherwise refreshes the monthly SPX close history (once per calendar
       month) and looks for two calls with the same strike whose expirations
       are roughly two months apart;
    3. buys the long-dated call and/or sells the short-dated call when the
       forecast volatility is below that contract's implied volatility.

    NOTE(review): the two legs are triggered independently, so a single leg
    can be opened on its own; confirm that this is the intended behaviour.
    """

    # Annualised risk-free rate fed to the Black-Scholes formulas.
    RISK_FREE_RATE = 0.039
    # (year, month, day) of the first bar requested for model training.
    HISTORY_START = (2005, 1, 1)
    # The training history is resampled to month-end closes, so a forecast is
    # a per-month figure; this factor annualises it.
    PERIODS_PER_YEAR = 12
    # Number of most recent feature rows averaged for each forecast step.
    FORECAST_WINDOW = 30
    # Fewest feature rows accepted for training (5-fold CV needs at least 5).
    MIN_TRAINING_ROWS = 10
    # Legs closer than this to expiration are liquidated and never opened.
    EXPIRY_BUFFER = timedelta(days=2)

    def Initialize(self):
        """Configure the backtest window, cash, data subscriptions and state."""
        # Backtest window and starting balance.
        self.SetStartDate(2015, 10, 1)
        self.SetEndDate(2016, 10, 1)
        self.SetCash(1000000)

        self.spx = self.AddIndex("SPX", Resolution.Minute).Symbol
        spxOptions = self.AddIndexOption(self.spx, Resolution.Minute)
        # Keep contracts within two strikes of the money that expire within a
        # year.  The filter does not restrict the option right; puts are
        # discarded in OnData.
        spxOptions.SetFilter(lambda x: x.Strikes(-2, 2).Expiration(0, 365))

        self.spx_history = None      # monthly close frame, see FetchSPXHistory
        self.call = None             # last contract handed to Buy/Sell
        self._history_period = None  # (year, month) of the last history refresh
        self._vol_model = None       # fitted RandomizedSearchCV, trained lazily
        self._vol_features = None    # feature frame the model was trained on
        self._forecast_cache = {}    # forecast horizon (days) -> forecast

    def find_calendar_spread_contracts(self, sorted_contracts,
                                       desired_interval=timedelta(days=60),
                                       tolerance_days=10):
        """Find a short/long pair of contracts suitable for a calendar spread.

        Args:
            sorted_contracts: list of option contracts sorted by ascending
                ``Expiry``.
            desired_interval: target gap between the two expirations.
            tolerance_days: accepted deviation, in whole days, from the target
                long-dated expiration.

        Returns:
            ``(long_term_contract, short_term_contract)`` for the first pair
            that shares strike and right and whose expirations are about
            ``desired_interval`` apart, or ``(None, None)`` when there is none.
        """
        for i, short_term_contract in enumerate(sorted_contracts):
            long_term_expiry = short_term_contract.Expiry + desired_interval

            for long_term_contract in sorted_contracts[i + 1:]:
                # A calendar spread uses the same strike and right on both legs.
                if (long_term_contract.Strike != short_term_contract.Strike
                        or long_term_contract.Right != short_term_contract.Right):
                    continue
                if abs((long_term_contract.Expiry - long_term_expiry).days) <= tolerance_days:
                    return long_term_contract, short_term_contract

        return None, None

    def calculate_implied_volatility(self, option_price, strike_price, time_to_expiration, risk_free_rate, underlying_price):
        """Solve for the Black-Scholes call implied volatility by Newton-Raphson.

        Args:
            option_price: observed call price.
            strike_price: strike of the call.
            time_to_expiration: time to expiration in years.
            risk_free_rate: annualised risk-free rate.
            underlying_price: current price of the underlying.

        Returns:
            The implied volatility as a float, or ``nan`` when an input is not
            positive, vega vanishes, or the iteration does not converge.  NaN
            compares false against everything, so callers that test
            ``forecast < iv`` simply do not trade on a failed solve.
        """
        tol = 1e-5
        max_iterations = 100
        min_sigma = 1e-6   # keeps sigma positive so d1 stays defined
        min_vega = 1e-12   # below this the Newton step is meaningless
        failed = float("nan")

        option_price = float(option_price)
        strike_price = float(strike_price)
        time_to_expiration = float(time_to_expiration)
        underlying_price = float(underlying_price)
        if min(option_price, strike_price, time_to_expiration, underlying_price) <= 0:
            return failed

        sigma = 0.2  # Initial guess
        for _ in range(max_iterations):
            vega = self.vega(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma)
            if not np.isfinite(vega) or vega < min_vega:
                return failed

            price = self.black_scholes_call(underlying_price, strike_price, time_to_expiration, risk_free_rate, sigma)
            increment = (price - option_price) / vega
            sigma = max(sigma - increment, min_sigma)

            if abs(increment) < tol:
                return sigma

        return failed

    @staticmethod
    def _d1(S, K, t, r, sigma):
        """Return the Black-Scholes d1 term; requires ``t > 0`` and ``sigma > 0``."""
        return (np.log(S / K) + (r + 0.5 * sigma ** 2) * t) / (sigma * np.sqrt(t))

    def black_scholes_call(self, S, K, t, r, sigma):
        """Return the Black-Scholes price of a European call.

        Args:
            S: underlying price.
            K: strike price.
            t: time to expiration in years (must be positive).
            r: annualised risk-free rate.
            sigma: annualised volatility (must be positive).
        """
        d1 = self._d1(S, K, t, r, sigma)
        d2 = d1 - sigma * np.sqrt(t)
        return S * norm.cdf(d1) - K * np.exp(-r * t) * norm.cdf(d2)

    def vega(self, S, K, t, r, sigma):
        """Return the Black-Scholes vega (price sensitivity to ``sigma``).

        Takes the same arguments as ``black_scholes_call``.
        """
        return S * norm.pdf(self._d1(S, K, t, r, sigma)) * np.sqrt(t)

    def FetchSPXHistory(self):
        """Load SPX closes from ``HISTORY_START`` to now, sampled at month end.

        Returns:
            A DataFrame indexed by month-end timestamp with a single ``close``
            column, or ``None`` when the history request returns no closes.
        """
        # Imported locally under an unambiguous name: at module level the name
        # `datetime` is bound to the module, not the class.
        from datetime import datetime as datetime_cls

        start_time = datetime_cls(*self.HISTORY_START)
        # Daily bars are enough for month-end sampling and far smaller than
        # the minute-resolution subscription.
        history = self.History(self.spx, start_time, self.Time, Resolution.Daily)
        if history is None or history.empty or "close" not in history.columns:
            return None

        frame = history.reset_index()[["time", "close"]].copy()
        frame["time"] = pd.to_datetime(frame["time"]).dt.normalize()
        frame = frame.drop_duplicates(subset="time", keep="last").set_index("time")
        return frame.resample("M").last()

    def _refresh_history(self):
        """Reload the SPX history once per calendar month and reset the model."""
        period = (self.Time.year, self.Time.month)
        if period == self._history_period:
            return
        # Recorded before fetching so a failed request is not retried on
        # every bar; it is retried when the month changes.
        self._history_period = period
        self.spx_history = self.FetchSPXHistory()
        self._vol_model = None
        self._vol_features = None
        self._forecast_cache = {}

    def _build_features(self):
        """Return a frame of (5-period realised volatility, squared return) rows.

        Returns are percentage changes of ``spx_history['close']``; rows with
        missing values are dropped so both columns stay aligned by date.
        """
        closes = pd.to_numeric(self.spx_history["close"], errors="coerce").dropna()
        returns = 100 * closes.pct_change().dropna()
        features = pd.DataFrame({
            "realized_vol": returns.rolling(5).std(),
            "squared_return": returns ** 2,
        })
        return features.dropna()

    def _fit_volatility_model(self):
        """Fit the SVR on the current history and cache it; return success."""
        # Local imports: these names are not imported at the top of the file.
        from scipy.stats import uniform as sp_rand
        from sklearn.model_selection import RandomizedSearchCV

        features = self._build_features()
        if len(features) < self.MIN_TRAINING_ROWS:
            return False

        # Features observed at period t predict realised volatility at t + 1,
        # so the target is never one of its own inputs.
        X = features.to_numpy(dtype=float)[:-1]
        y = features["realized_vol"].to_numpy(dtype=float)[1:]

        # `gamma` is not searched because a linear kernel ignores it; C is
        # kept strictly positive as SVR requires.
        para_grid = {"C": sp_rand(loc=1e-3, scale=1.0), "epsilon": sp_rand()}
        search = RandomizedSearchCV(SVR(kernel="linear"), para_grid, random_state=0)
        search.fit(X, y)

        self._vol_model = search
        self._vol_features = features
        return True

    def predicted_volatility_svr_garch(self, contract):
        """Forecast volatility over the remaining life of ``contract``.

        The model is stepped once per calendar day to expiration: each step
        predicts from the mean of the latest ``FORECAST_WINDOW`` feature rows
        and feeds the prediction back in as a new row.

        NOTE(review): the model is trained on month-end data yet stepped once
        per day; confirm the intended horizon.

        Args:
            contract: option contract exposing ``Expiry``.

        Returns:
            The root mean square of the stepped predictions, in the units of
            the training target (percent per training period), or ``None``
            when no history is loaded, the contract has no whole day left, or
            there is too little data to train on.
        """
        if self.spx_history is None:
            return None

        days_to_predict = (contract.Expiry.date() - self.Time.date()).days
        if days_to_predict <= 0:
            return None

        # The forecast depends only on the horizon, so reuse it across
        # contracts and bars until the history is refreshed.
        if days_to_predict in self._forecast_cache:
            return self._forecast_cache[days_to_predict]

        if self._vol_model is None and not self._fit_volatility_model():
            return None

        window = self._vol_features.to_numpy(dtype=float)[-self.FORECAST_WINDOW:]
        total_variance = 0.0
        for _ in range(days_to_predict):
            predicted_vol = float(self._vol_model.predict(window.mean(axis=0, keepdims=True))[0])
            total_variance += predicted_vol ** 2
            # Slide the window: drop the oldest row, append the prediction.
            window = np.vstack([window[1:], [predicted_vol, predicted_vol ** 2]])

        cumulative_volatility = (total_variance / days_to_predict) ** 0.5
        self._forecast_cache[days_to_predict] = cumulative_volatility
        return cumulative_volatility

    def _annualized(self, period_volatility):
        """Convert a percent-per-period volatility to an annualised decimal."""
        return period_volatility / 100.0 * self.PERIODS_PER_YEAR ** 0.5

    def _select_call(self, chains):
        """Resolve ``chains`` to a single call contract, or ``None``.

        ``chains`` may be one contract (anything exposing ``Strike``) or an
        iterable of contracts.  For an iterable, the call closest to the money
        among the contracts with the latest expiration is chosen.
        """
        if chains is None:
            return None
        if hasattr(chains, "Strike"):
            return chains if chains.Right == OptionRight.Call else None

        contracts = list(chains)
        if not contracts:
            return None
        expiry = max(contract.Expiry for contract in contracts)
        calls = [contract for contract in contracts
                 if contract.Expiry == expiry and contract.Right == OptionRight.Call]
        if not calls:
            return None
        return min(calls, key=lambda contract: abs(contract.Strike - contract.UnderlyingLastPrice))

    def BuyCall(self, chains):
        """Buy one call chosen from ``chains`` (a contract or an option chain)."""
        contract = self._select_call(chains)
        if contract is None:
            return
        self.call = contract
        self.Buy(self.call.Symbol, 1)

    def SellCall(self, chains):
        """Sell one call chosen from ``chains`` (a contract or an option chain)."""
        contract = self._select_call(chains)
        if contract is None:
            return
        self.call = contract
        # Sell takes the size of the order as a positive quantity.
        self.Sell(self.call.Symbol, 1)

    def _manage_open_positions(self):
        """Liquidate option legs near expiration; return True if any are held."""
        held = [
            holding.Key for holding in self.Portfolio
            if holding.Value.Invested
            and (holding.Value.Type == SecurityType.Option
                 or holding.Value.Type == SecurityType.IndexOption)
        ]
        if not held:
            return False

        for symbol in held:
            if self.Time + self.EXPIRY_BUFFER > symbol.ID.Date:
                self.Liquidate(symbol, "Too close to expiration")
        return True

    def _contract_implied_volatility(self, contract, underlying_price):
        """Return the implied volatility of ``contract`` from its last price."""
        years_to_expiry = (contract.Expiry - self.Time).total_seconds() / (365 * 24 * 3600)
        return self.calculate_implied_volatility(
            contract.LastPrice, contract.Strike, years_to_expiry,
            self.RISK_FREE_RATE, underlying_price)

    def OnData(self, data: Slice):
        """Manage open legs, or look for a new calendar spread to enter."""
        if self.spx not in data.Bars:
            return

        # While any option leg is held, only expiration management runs.
        if self._manage_open_positions():
            return

        self._refresh_history()
        if self.spx_history is None:
            return

        underlying_price = data.Bars[self.spx].Close
        earliest_expiry = self.Time + self.EXPIRY_BUFFER

        for chain in data.OptionChains.Values:
            # Calls only (implied volatility uses the call formula), and never
            # a contract that would be liquidated straight after entry.
            calls = sorted(
                (contract for contract in chain.Contracts.Values
                 if contract.Right == OptionRight.Call and contract.Expiry > earliest_expiry),
                key=lambda contract: contract.Expiry)
            long_term_contract, short_term_contract = self.find_calendar_spread_contracts(calls)
            if long_term_contract is None or short_term_contract is None:
                continue

            # Predicted volatility using SVR-GARCH
            predicted_volatility_long = self.predicted_volatility_svr_garch(long_term_contract)
            predicted_volatility_short = self.predicted_volatility_svr_garch(short_term_contract)
            if predicted_volatility_long is None or predicted_volatility_short is None:
                continue

            long_iv = self._contract_implied_volatility(long_term_contract, underlying_price)
            short_iv = self._contract_implied_volatility(short_term_contract, underlying_price)

            # Forecasts are percent per training period; implied volatility is
            # an annualised decimal, so convert before comparing.  A failed
            # solve yields NaN, which makes the comparison false.
            if self._annualized(predicted_volatility_long) < long_iv:
                self.BuyCall(long_term_contract)
            if self._annualized(predicted_volatility_short) < short_iv:
                self.SellCall(short_term_contract)