Overall Statistics
Total Orders
79
Average Win
81.21%
Average Loss
-32.99%
Compounding Annual Return
138949.882%
Drawdown
92.700%
Expectancy
1.042
Start Equity
100000
End Equity
31401821.3
Net Profit
31301.821%
Sharpe Ratio
18019.593
Sortino Ratio
32049.778
Probabilistic Sharpe Ratio
99.212%
Loss Rate
41%
Win Rate
59%
Profit-Loss Ratio
2.46
Alpha
72159.464
Beta
1.391
Annual Standard Deviation
4.005
Annual Variance
16.036
Information Ratio
18050.102
Tracking Error
3.998
Treynor Ratio
51864.124
Total Fees
$120049.70
Estimated Strategy Capacity
$0
Lowest Capacity Asset
GLD YW2LXOJOBRZA|GLD T3SKPOF94JFP
Portfolio Turnover
11.97%
Drawdown Recovery
112
# region imports
from AlgorithmImports import *
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
# endregion


class MonthlyATMOptionsAlgorithm(QCAlgorithm):
    """
    Monthly ATM Options Strategy (SPY or SPX)
    
    Parameters:
      - start_year (int, default 2015)
      - end_year (int, default 2024)
      - notional_pct (float, default 1.0): fraction of cash to use per trade
      - option_type (str, default 'call'): 'call' or 'put'
      - index (bool, default False): if True, trade SPX index options; if False, trade SPY ETF options
      - weekly (bool, default False): if True, buy weekly options every Friday; if False, buy monthly
    
    Strategy:
      - Monthly mode: On the last trading day of each month, buy ATM options expiring within 7 days
      - Weekly mode: Every Friday, buy ATM options expiring the next Friday (7 days later)
      - For SPY options: sell 16 minutes before expiry with MOC order
      - For SPX options: hold until expiration (cash-settled)
      - Calculate and report monthly returns and Sharpe ratios
    """

    def Initialize(self) -> None:
        # --- Parameters ---
        self.start_year = int(self.GetParameter("start_year") or 2025)
        self.end_year = int(self.GetParameter("end_year") or 2025)
        self.notional_pct = float(self.GetParameter("notional_pct") or 0.45)
        self.option_type = (self.GetParameter("option_type") or "call").lower()
        self.index = (self.GetParameter("index") or "false").lower() in {"true", "1", "yes", "y"}
        self.weekly = (self.GetParameter("weekly") or "true").lower() in {"true", "1", "yes", "y"}
        
        # --- Backtest range ---
        self.SetStartDate(self.start_year, 1, 1)
        self.SetEndDate(self.end_year, 12, 31)
        self.SetCash(100000)
        self.sym = "GLD"
        
        # Add underlying and options
        if self.index:
            # SPX Index Options
            self.underlying = self.AddIndex("SPX", Resolution.Minute).Symbol
            option = self.AddIndexOption(self.underlying, Resolution.Minute)
            option.SetFilter(lambda universe: universe.IncludeWeeklys().Strikes(-5, 5).Expiration(0, 7))
            self.option_symbol = self.underlying  # For index options, use underlying symbol
        else:
            # SPY ETF Options
            self.underlying = self.AddEquity(self.sym, Resolution.Minute).Symbol
            option = self.AddOption(self.sym, Resolution.Minute)
            option.SetFilter(lambda universe: universe.IncludeWeeklys().Strikes(-5, 5).Expiration(0, 7))
            self.option_symbol = option.Symbol
        
        # --- State ---
        self._current_option_contract = None
        self._entry_price = 0.0
        self._entry_date = None
        self._entry_cash = 0.0
        self._entry_quantity = 0
        self._expiry_date = None
        self._did_month_entry = {}  # track (year, month) -> bool
        self._did_week_entry = {}  # track (year, week_number) -> bool
        self._scheduled_exit = {}  # track scheduled exits
        self._exit_tracked = False  # flag to track if we've recorded exit
        
        # For monthly statistics
        self._monthly_returns = {}  # (year, month) -> return
        self._daily_returns = pd.Series(dtype=float)
        
        # Schedule: check last trading day
        self.Schedule.On(
            self.DateRules.EveryDay(self.underlying),
            self.TimeRules.BeforeMarketClose(self.underlying, 30),
            self._on_daily_check
        )
        
        underlying_type = "SPX" if self.index else self.sym
        mode = "Weekly" if self.weekly else "Monthly"
        self.Debug(f"Initialized {mode} ATM Options Strategy - {underlying_type}")
        self.Debug(f"Parameters: weekly={self.weekly}, index={self.index}, option_type={self.option_type}, notional_pct={self.notional_pct}")
    
    def _is_last_trading_day_of_month(self) -> bool:
        """Check if today is the last trading day of the current month"""
        exchange = self.Securities[self.underlying].Exchange
        next_open = exchange.Hours.GetNextMarketOpen(self.Time, False)
        return next_open.month != self.Time.month
    
    def _is_friday(self) -> bool:
        """Check if today is Friday"""
        return self.Time.weekday() == 4
    
    def OnData(self, data: Slice) -> None:
        """Track position changes to capture exit value before it's cleared"""
        if self._current_option_contract is None or self._exit_tracked:
            return
        
        # Check if position was just closed
        if self._current_option_contract in self.Portfolio:
            holdings = self.Portfolio[self._current_option_contract]
            
            # If we had a position and now it's closed, record the exit
            if not holdings.Invested and self._entry_quantity > 0:
                self._record_exit()
    
    def _on_daily_check(self) -> None:
        """Daily check for entry/exit logic"""
        year, month = self.Time.year, self.Time.month
        
        if year < self.start_year or year > self.end_year:
            return
        
        # Check if we have an open position that expired or should be closed
        if self._current_option_contract is not None:
            if not self.Portfolio[self._current_option_contract].Invested:
                # Option expired or was closed
                self._record_exit()
        
        # Weekly mode: enter every Friday
        if self.weekly:
            if self._is_friday():
                week_number = self.Time.isocalendar()[1]  # ISO week number
                if not self._did_week_entry.get((year, week_number), False):
                    self._did_week_entry[(year, week_number)] = True
                    self._enter_position()
        # Monthly mode: enter on last trading day of month
        else:
            if self._is_last_trading_day_of_month():
                if not self._did_month_entry.get((year, month), False):
                    self._did_month_entry[(year, month)] = True
                    self._enter_position()
    
    def _enter_position(self) -> None:
        """Enter ATM option position"""
        year, month = self.Time.year, self.Time.month
        
        # Get current underlying price
        if not self.Securities[self.underlying].HasData:
            self.Debug(f"[{year}-{month:02d}] No underlying data available")
            return
        
        underlying_price = self.Securities[self.underlying].Price
        
        # Find ATM option expiring within 7 days
        # For index options, chain is keyed by the underlying symbol
        # For equity options, chain is keyed by the option symbol
        if self.index:
            option_chain = self.CurrentSlice.OptionChains.get(self.underlying)
        else:
            option_chain = self.CurrentSlice.OptionChains.get(self.option_symbol)
        
        if option_chain is None or len(option_chain) == 0:
            # Try alternative approach - iterate all chains
            if len(self.CurrentSlice.OptionChains) > 0:
                option_chain = list(self.CurrentSlice.OptionChains.values())[0]
                self.Debug(f"[{year}-{month:02d}] Using first available option chain")
            else:
                self.Debug(f"[{year}-{month:02d}] No option chain available")
                return
        
        # Filter for desired option type and expiration
        # Weekly mode: look for options expiring in 7 days (next Friday)
        # Monthly mode: look for options expiring within 7 days
        if self.weekly:
            # Target expiration: next Friday (7 days from now)
            target_expiry = self.Time + timedelta(days=7)
            contracts = [
                x for x in option_chain 
                if (x.Right == OptionRight.Call if self.option_type == 'call' else x.Right == OptionRight.Put)
                and abs((x.Expiry.date() - target_expiry.date()).days) <= 3  # Allow 3-day window
            ]
        else:
            # Monthly mode: options expiring within 7 days
            contracts = [
                x for x in option_chain 
                if (x.Right == OptionRight.Call if self.option_type == 'call' else x.Right == OptionRight.Put)
                and (x.Expiry - self.Time).days <= 7
                and (x.Expiry - self.Time).days >= 0
            ]
        
        if not contracts:
            self.Debug(f"[{year}-{month:02d}] No suitable contracts found")
            return
        
        # Find ATM contract (closest strike to current price)
        atm_contract = min(contracts, key=lambda x: abs(x.Strike - underlying_price))
        
        if atm_contract.BidPrice <= 0 or atm_contract.AskPrice <= 0:
            self.Debug(f"[{year}-{month:02d}] Invalid option prices")
            return
        
        # Calculate position size
        option_price = (atm_contract.BidPrice + atm_contract.AskPrice) / 2.0
        cash_to_use = self.Portfolio.Cash * self.notional_pct
        contracts_to_buy = int(cash_to_use / (option_price * 100))  # 100 shares per contract
        
        if contracts_to_buy <= 0:
            self.Debug(f"[{year}-{month:02d}] Insufficient cash for position")
            return
        
        # Place order
        self.MarketOrder(atm_contract.Symbol, contracts_to_buy)
        
        # Record entry
        self._current_option_contract = atm_contract.Symbol
        self._entry_price = option_price
        self._entry_date = self.Time
        self._entry_cash = contracts_to_buy * option_price * 100  # actual cash spent
        self._entry_quantity = contracts_to_buy
        self._expiry_date = atm_contract.Expiry
        self._exit_tracked = False
        
        # For SPY options, schedule exit 16 minutes before expiry
        if not self.index:
            exit_time = atm_contract.Expiry - timedelta(minutes=16)
            if exit_time > self.Time:
                self.Schedule.On(
                    self.DateRules.On(exit_time.year, exit_time.month, exit_time.day),
                    self.TimeRules.At(exit_time.hour, exit_time.minute),
                    lambda: self._exit_spy_position()
                )
                self._scheduled_exit[atm_contract.Symbol] = True
        
        underlying_type = "SPX" if self.index else "SPY"
        self.Debug(
            f"[{year}-{month:02d}] Entered: {contracts_to_buy} contracts of {underlying_type} "
            f"{atm_contract.Symbol.Value} @ ${option_price:.2f}, "
            f"Strike: ${atm_contract.Strike:.2f}, Underlying: ${underlying_price:.2f}, "
            f"Expiry: {atm_contract.Expiry.strftime('%Y-%m-%d %H:%M')}"
        )
    
    def _exit_spy_position(self) -> None:
        """Exit SPY option position 16 minutes before expiry with MOC order"""
        if self._current_option_contract is None:
            return
        
        if not self.Portfolio[self._current_option_contract].Invested:
            return
        
        # Place Market-On-Close order to exit
        quantity = self.Portfolio[self._current_option_contract].Quantity
        if quantity > 0:
            self.MarketOnCloseOrder(self._current_option_contract, -quantity)
            self.Debug(f"Scheduled SPY option exit (MOC): {self._current_option_contract.Value}, qty={quantity}")
    
    def _record_exit(self) -> None:
        """Record the exit of current position"""
        if self._current_option_contract is None or self._entry_date is None or self._exit_tracked:
            return
        
        # Mark as tracked to avoid double-counting
        self._exit_tracked = True
        
        # Get exit information
        exit_date = self.Time
        entry_year, entry_month = self._entry_date.year, self._entry_date.month
        
        # Calculate return based on portfolio statistics
        if self._current_option_contract in self.Portfolio:
            holdings = self.Portfolio[self._current_option_contract]
            
            # Calculate actual exit value
            # If still invested, use current holdings value
            # If closed, use the last known value from entry + profit
            if holdings.Invested:
                exit_value = holdings.HoldingsValue
            else:
                # Position closed - calculate from realized profit
                exit_value = self._entry_cash + holdings.LastTradeProfit
            
            position_return = (exit_value / self._entry_cash - 1.0) if self._entry_cash > 0 else -1.0
            
            self.Debug(
                f"DEBUG: Entry=${self._entry_cash:.2f}, Exit=${exit_value:.2f}, "
                f"LastTradeProfit=${holdings.LastTradeProfit:.2f}, Return={position_return*100:.2f}%"
            )
        else:
            position_return = -1.0
        
        # Store monthly return
        self._monthly_returns[(entry_year, entry_month)] = position_return
        
        # Create daily return series for this position
        if self._entry_date and exit_date > self._entry_date:
            days_held = (exit_date - self._entry_date).days
            if days_held > 0:
                # Approximate daily returns (assuming geometric)
                daily_ret = (1 + position_return) ** (1.0 / days_held) - 1.0
                dates = pd.date_range(self._entry_date, exit_date, freq='D')
                daily_series = pd.Series([daily_ret] * len(dates), index=dates)
                self._daily_returns = pd.concat([self._daily_returns, daily_series])
    
        underlying_type = "SPX" if self.index else self.sym
        self.Debug(
            f"[{entry_year}-{entry_month:02d}] Exited {underlying_type}: Return = {position_return*100:.2f}%, "
            f"Entry: {self._entry_date.strftime('%Y-%m-%d')}, "
            f"Exit: {exit_date.strftime('%Y-%m-%d')}"
        )
        
        # Reset state
        self._scheduled_exit.pop(self._current_option_contract, None)
        self._current_option_contract = None
        self._entry_price = 0.0
        self._entry_date = None
        self._entry_cash = 0.0
        self._entry_quantity = 0
        self._expiry_date = None
        self._exit_tracked = False
    
    def OnEndOfAlgorithm(self) -> None:
        """Calculate and display monthly statistics"""
        # Close any open position
        if self._current_option_contract is not None and self.Portfolio[self._current_option_contract].Invested:
            self._record_exit()
        
        self.Debug("\n" + "="*60)
        underlying_type = "SPX" if self.index else self.sym
        self.Debug(f"MONTHLY STATISTICS - {underlying_type} {self.option_type.upper()} OPTIONS")
        self.Debug("="*60)
        
        if not self._monthly_returns:
            self.Debug("No trades completed.")
            return
        return
        # Calculate statistics per calendar month (aggregating across years)
        monthly_stats = {}
        
        for (year, month), ret in self._monthly_returns.items():
            if month not in monthly_stats:
                monthly_stats[month] = []
            monthly_stats[month].append(ret)
        
        # Display results
        month_names = ["Jan", "Feb", "Mar", "Apr", "May", "Jun", 
                      "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
        
        for month in range(1, 13):
            month_name = month_names[month - 1]
            
            if month not in monthly_stats or len(monthly_stats[month]) == 0:
                self.Debug(f"{month_name}: No trades")
                continue
            
            returns = monthly_stats[month]
            
            # Average return (arithmetic mean)
            avg_return = np.mean(returns)
            
            # Annualized return: (1 + avg_return)^12 - 1
            # This preserves the sign of avg_return
            if avg_return >= 0:
                annualized_return = (1 + avg_return) ** 12 - 1
            else:
                annualized_return = -((1 - avg_return) ** 12 - 1)
            
            # Annualized Sharpe ratio
            if len(returns) > 1:
                std_return = np.std(returns, ddof=1)
                if std_return > 0:
                    sharpe = (avg_return / std_return) * np.sqrt(12)
                else:
                    sharpe = np.nan
            else:
                sharpe = np.nan
            
            sharpe_str = f"{sharpe:.4f}" if not np.isnan(sharpe) else "N/A"
            
            self.Debug(
                f"{month_name}: Annualized Return={annualized_return*100:+.2f}%, "
                f"Annualized Sharpe={sharpe_str}, Trades={len(returns)}"
            )
        
        self.Debug("="*60)
        
        # Overall statistics
        all_returns = list(self._monthly_returns.values())
        if len(all_returns) > 1:
            overall_avg = np.mean(all_returns)
            overall_std = np.std(all_returns, ddof=1)
            
            # Annualized return preserving sign
            if overall_avg >= 0:
                overall_annualized_return = (1 + overall_avg) ** 12 - 1
            else:
                overall_annualized_return = -((1 - overall_avg) ** 12 - 1)
            
            if overall_std > 0:
                overall_sharpe = (overall_avg / overall_std) * np.sqrt(12)
            else:
                overall_sharpe = np.nan
            
            self.Debug(f"\nOVERALL: Annualized Return={overall_annualized_return*100:+.2f}%, "
                      f"Annualized Sharpe={overall_sharpe:.4f if not np.isnan(overall_sharpe) else 'N/A'}, "
                      f"Total Trades={len(all_returns)}")
        
        self.Debug("="*60)