Overall Statistics
Total Orders
116
Average Win
0.08%
Average Loss
-0.05%
Compounding Annual Return
16.122%
Drawdown
6.200%
Expectancy
1.219
Start Equity
100000
End Equity
117605.58
Net Profit
17.606%
Sharpe Ratio
0.695
Sortino Ratio
0.779
Probabilistic Sharpe Ratio
63.656%
Loss Rate
14%
Win Rate
86%
Profit-Loss Ratio
1.59
Alpha
-0.019
Beta
0.604
Annual Standard Deviation
0.083
Annual Variance
0.007
Information Ratio
-1.025
Tracking Error
0.068
Treynor Ratio
0.096
Total Fees
$117.64
Estimated Strategy Capacity
$5000000.00
Lowest Capacity Asset
EMQQ VVJK2NHU61R9
Portfolio Turnover
0.31%
from AlgorithmImports import *
from datetime import datetime, timedelta


def get_third_friday(year, month):
    """
    Get the date of the third Friday of the given month and year.
    """
    # Start with the first day of the month
    first_day = datetime(year, month, 1)
    first_friday = first_day + timedelta(days=(4 - first_day.weekday() + 7) % 7)
    
    # Add two weeks to get to the third Friday
    third_friday = first_friday + timedelta(weeks=2)
    
    return third_friday.date()

def is_monthly_expiration_week(current_date):
    """
    Check if current date is in the monthly options expiration week.
    Returns:
        tuple: (bool, int, str) - (is_expiration_week, days_until_expiration, day_description)
              days_until_expiration will be:
              2 for Wednesday before expiration
              1 for Thursday before expiration
              0 for expiration Friday
              -1 if not in expiration week
              day_description will describe the current day (e.g., 'Wednesday before expiration').
    """

    third_friday = get_third_friday(current_date.year, current_date.month)
    
    # Find the Wednesday of expiration week
    expiration_wednesday = third_friday - timedelta(days=2)  # Wednesday
    expiration_thursday = third_friday - timedelta(days=1)    # Thursday
    
    # Calculate days until expiration
    days_until = (third_friday - current_date).days
    
    # Check if current date is within Wednesday to Friday of expiration week
    if expiration_wednesday <= current_date <= third_friday:
        if days_until == 2:
            return 2
        elif days_until == 1:
            return 1
    
    return False

# region imports
from AlgorithmImports import *
from date_utils import get_third_friday, is_monthly_expiration_week
# endregion

class MonthlyIVAlgorithm(QCAlgorithm):
    def Initialize(self):
        self.set_start_date(2024, 1, 1)
        self.set_end_date(2025, 1, 30)
        self.InitCash = 100000
        self.set_cash(self.InitCash) 

        self.MKT = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.spy = []

        self.low_risk_tickers = ["TLT", "LQD", "IEF", "BNDX", "EMB", "XLU", "GLD", "SLV", "UNG", "CPER", "DBA", "XLP", "NLR"]
        self.mid_risk_tickers = ["SPY", "ACWI", "IWM", "EFA", "EWG", "EWU", "EWJ", "IYR", "VNQI", "XLF", "XLB", "XLE", "IXJ"]
        self.high_risk_tickers = ["ARKK", "SMH", "XLK", "CQQQ", "EMQQ", "FXI", "EEM", "EWZ", "INDA", "EWT", "ILF", "EUFN", "XBI", "XRT", "XHB", "XOP", "XLY"]

        all_tickers = self.low_risk_tickers + self.mid_risk_tickers + self.high_risk_tickers

        self.symbols = [self.add_equity(ticker, Resolution.DAILY).Symbol for ticker in all_tickers]

        self.universe_settings.asynchronous = True

        for symbol in self.symbols:
            option = self.add_option(symbol, Resolution.DAILY)
            option.set_filter(self._contract_selector)
        
        self.Schedule.On(self.DateRules.EveryDay(), self.TimeRules.BeforeMarketClose('SPY', 0), 
            self.record_vars)


    def _contract_selector(self, option_filter_universe: OptionFilterUniverse) -> OptionFilterUniverse:
        return (
            option_filter_universe
            .include_weeklys()
            #.include_weeklys(False) # monthlies only
            .expiration(0, 45)
            .IV(0, 100)    # Filter contracts between 0% and 100%
        )
   
    
    def get_forward_implied_volatility(self, underlying_symbol):
        # Fetch the option chain for the given symbol
        option_chain = self.option_chain(underlying_symbol, flatten=True).data_frame
        self.Debug(f"Fetched option chain for {underlying_symbol} at {self.Time}")

        if option_chain is None or option_chain.empty:
            self.Debug(f"Option chain empty: {option_chain}")
            self.Log(f"Option chain empty: {option_chain}")
            return None

        # Calculate the next month
        next_month = self.Time.month % 12 + 1  # Get the next month
        next_year = self.Time.year if next_month > self.Time.month else self.Time.year + 1

        # Filter for options expiring in the next month (long-term options)
        next_month_options = option_chain[
            (option_chain['expiry'].dt.month == next_month) &
            (option_chain['expiry'].dt.year == next_year)
        ]

        if next_month_options.empty:
            self.error(f"No options available for the next month ({next_month})")
            self.Log(f"No options available for the next month ({next_month})")
            return None

        # Filter for options expiring this month (short-term options)
        current_month_options = option_chain[
            (option_chain['expiry'].dt.month == self.Time.month) &
            (option_chain['expiry'].dt.year == self.Time.year)
        ]

        if current_month_options.empty:
            self.error(f"No options available for the current month ({self.Time.month})")
            self.Log(f"No options available for the current month ({self.Time.month})")
            return None

        # Calculate moneyness for both sets of options
        underlying_price = self.Securities[underlying_symbol].Price
        option_chain['moneyness'] = option_chain['strike'] / underlying_price

        # Select ATM options based on moneyness
        atm_threshold = 0.05  # Only options within 5% of ATM are selected.
        short_term_atm_options = current_month_options[
            abs(option_chain['moneyness'] - 1) <= atm_threshold
        ]
        long_term_atm_options = next_month_options[
            abs(option_chain['moneyness'] - 1) <= atm_threshold
        ]

        if short_term_atm_options.empty:
            self.error(f"No ATM options available for the current month ({self.Time.month})")
            self.Log(f"No ATM options available for the current month ({self.Time.month})")
            return None

        if long_term_atm_options.empty:
            self.error(f"No ATM options available for the next month ({next_month})")
            self.Log(f"No ATM options available for the next month ({next_month})")
            return None

        # Extract the implied volatility for both sets of ATM options
        short_term_iv = short_term_atm_options['impliedvolatility'].mean()
        long_term_iv = long_term_atm_options['impliedvolatility'].mean()

        # Calculate the time to expiry between options (in years)
        T1 = 0
        T2 = (next_month_options['expiry'].iloc[0] - self.Time).days / 365.0

        if T2 == T1:
            self.Debug(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
            self.Log(f"Error: Time to expiry for both options is the same for {underlying_symbol}. Cannot calculate forward IV.")
            return None

        if (T2 * long_term_iv**2 - T1 * short_term_iv**2) < 0:
            self.Debug(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
            self.Log(f"Error: The calculated value for forward IV is negative for {underlying_symbol}, T1={T1} T2={T2}")
            return None

        # Calculate forward implied volatility using the formula
        try:
            forward_iv = math.sqrt((T2 * long_term_iv**2 - T1 * short_term_iv**2) / (T2 - T1))
            return forward_iv
        except ValueError as e:
            self.Debug(f"Math error while calculating forward IV for {underlying_symbol}: {e}")
            return None


  

    def OnData(self, data):
        days_until_exp = is_monthly_expiration_week(self.Time.date())

        if days_until_exp != 1:
            return

        # Calculate Forward IV for each symbol on thursday before expiration
        forward_iv_values = {}
        for symbol in self.symbols:
            forward_iv = self.get_forward_implied_volatility(symbol)
            if forward_iv is not None:
                forward_iv_values[symbol.Value] = forward_iv

        if forward_iv_values:
            # Portfolio sub-allocation strategy
            risk_allocations = {
                'low_risk': 0.4,    # 40% in low-risk assets
                'mid_risk': 0.3,    # 30% in mid-risk assets
                'high_risk': 0.3    # 30% in high-risk assets
            }

            # Sort tickers within each risk category by their forward IV
            def sort_by_iv(tickers):
                return sorted(
                    [t for t in tickers if t in forward_iv_values], 
                    key=lambda x: forward_iv_values[x]
                )

            low_risk_sorted = sort_by_iv(self.low_risk_tickers)
            mid_risk_sorted = sort_by_iv(self.mid_risk_tickers)
            high_risk_sorted = sort_by_iv(self.high_risk_tickers)

            # Rebalance with risk-stratified approach
            for risk_level, allocation in risk_allocations.items():
                risk_tickers = {
                    'low_risk': low_risk_sorted,
                    'mid_risk': mid_risk_sorted,
                    'high_risk': high_risk_sorted
                }[risk_level]

                sub_portfolio_value = self.Portfolio.TotalPortfolioValue * allocation
                per_asset_allocation = sub_portfolio_value / len(risk_tickers)

                for ticker in risk_tickers:
                    symbol = self.Securities[ticker].Symbol
                    target_qty = per_asset_allocation / self.Securities[symbol].Price
                    self.set_holdings(symbol, per_asset_allocation / self.Portfolio.TotalPortfolioValue)
                    self.Log(f"{ticker}: Risk={risk_level}, IV={forward_iv_values.get(ticker, 'N/A')}")
    
    def record_vars(self):             
        hist = self.History(self.MKT, 2, Resolution.Daily)['close'].unstack(level= 0).dropna() 
        self.spy.append(hist[self.MKT].iloc[-1])
        spy_perf = self.spy[-1] / self.spy[0] * self.InitCash
        self.Plot('Strategy Equity', 'SPY', spy_perf)