Overall Statistics
Total Orders
0
Average Win
0%
Average Loss
0%
Compounding Annual Return
0%
Drawdown
0%
Expectancy
0
Start Equity
100000
End Equity
100000
Net Profit
0%
Sharpe Ratio
0
Sortino Ratio
0
Probabilistic Sharpe Ratio
0%
Loss Rate
0%
Win Rate
0%
Profit-Loss Ratio
0
Alpha
0
Beta
0
Annual Standard Deviation
0
Annual Variance
0
Information Ratio
-2.05
Tracking Error
0.106
Treynor Ratio
0
Total Fees
$0.00
Estimated Strategy Capacity
$0
Lowest Capacity Asset
Portfolio Turnover
0%
from AlgorithmImports import *
import math
import os
import csv
import pandas as pd
from datetime import timedelta
from scipy.optimize import brentq


class VolatilitySurface(QCAlgorithm):
    def initialize(self):
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2025, 1, 1)
        self.set_cash(100_000)
        self.equity = self.add_equity("SPY", Resolution.MINUTE).Symbol
        self.option = self.add_option("SPY", Resolution.MINUTE)
        self.option.set_filter(-10, +10, timedelta(0), timedelta(90))
        self.vol_surface_data = []

    def on_data(self, slice):
        chain = slice.option_chains.get(self.option.Symbol)
        if chain is None:
            return
        for contract in chain:
            expiry = contract.Expiry
            t = (expiry - self.Time).total_seconds() / (365.25 * 24 * 3600)
            if t <= 0:
                continue
            strike = float(contract.strike)
            option_type = contract.right
            bid = contract.bid_price
            ask = contract.ask_price
            if bid > 0 and ask > 0:
                market_price = (bid + ask) / 2.0
            else:
                market_price = contract.last_price
            if market_price <= 0:
                continue
            underlying_price = self.securities[self.equity].price
            try:
                iv = self.calculate_implied_volatility(option_type, underlying_price, strike, t, 0.0, market_price)
            except BaseException as e:
                continue
            self.vol_surface_data.append((strike, t, iv, option_type))

    def calculate_implied_volatility(self, option_type, S, K, T, r, market_price):
        def bs_price(vol):
            d1 = (math.log(S / K) + (r + 0.5 * vol * vol) * T) / (vol * math.sqrt(T))
            d2 = d1 - vol * math.sqrt(T)
            if option_type == OptionRight.CALL:
                price = S * self.normal_cdf(d1) - K * math.exp(-r * T) * self.normal_cdf(d2)
            else:
                price = K * math.exp(-r * T) * self.normal_cdf(-d2) - S * self.normal_cdf(-d1)
            return price

        def objective(vol):
            return bs_price(vol) - market_price

        try:
            iv = brentq(objective, 0.001, 5.0, maxiter=100)
        except BaseException as e:
            raise BaseException("Could not find implied volatility")
        return iv

    def normal_cdf(self, x):
        return 0.5 * (1.0 + math.erf(x / math.sqrt(2)))

    def OnEndOfAlgorithm(self):
        # Create a DataFrame from the collected vol_surface_data.
        # Ensure the column names match your data tuple: Strike, TimeToExpiry, ImpliedVol, OptionType.
        df = pd.DataFrame(self.vol_surface_data, 
                          columns=["Strike", "TimeToExpiry", "ImpliedVol", "OptionType"])
        
        # Get the file path from the Object Store for the key "vol_surface_data"
        file_path = self.ObjectStore.GetFilePath("vol_surface_data.csv")
        
        # Save the DataFrame as a CSV file to the Object Store.
        df.to_csv(file_path, index=False)
        
        self.Debug(f"Volatility surface DataFrame CSV saved to Object Store at: {file_path}")