Overall Statistics
Total Orders
18
Average Win
0.12%
Average Loss
-0.09%
Compounding Annual Return
-26.453%
Drawdown
0.800%
Expectancy
-0.155
Start Equity
100000
End Equity
99576.5
Net Profit
-0.424%
Sharpe Ratio
-1.556
Sortino Ratio
0
Probabilistic Sharpe Ratio
46.770%
Loss Rate
62%
Win Rate
38%
Profit-Loss Ratio
1.25
Alpha
-0.188
Beta
0.188
Annual Standard Deviation
0.029
Annual Variance
0.001
Information Ratio
-11.509
Tracking Error
0.07
Treynor Ratio
-0.239
Total Fees
$0.00
Estimated Strategy Capacity
$8000.00
Lowest Capacity Asset
SPXW 32U0DFHEXD5RI|SPX 31
Portfolio Turnover
0.20%
Drawdown Recovery
0
from AlgorithmImports import *
from datetime import timedelta

class SPXPutSpreadAlphaModel(AlphaModel):
    """Alpha model that emits insights for one SPX 0DTE put spread per day.

    At the configured entry time the model scans the SPX option chains in the
    current slice, picks a short put whose delta is closest to
    ``target_delta`` and a long put roughly ``spread_width`` points below it,
    and emits one insight per leg. The short leg carries a negative weight and
    the long leg a positive weight so that downstream models can tell the two
    legs apart.
    """

    def __init__(self, target_delta=-0.30, spread_width=10, entry_hour=15, entry_minute=0,
                 width_tolerance=5):
        """Configure the spread selection.

        Args:
            target_delta: Desired delta of the short put (negative for puts).
            spread_width: Desired strike distance between the two legs.
            entry_hour: Hour (algorithm time) at which to look for a spread.
            entry_minute: Minute of ``entry_hour`` at which to look for a spread.
            width_tolerance: Maximum allowed deviation of the actual strike
                distance from ``spread_width``. The default of 5 reproduces
                the previous fixed 5..15 window for a 10-wide spread.
        """
        self.target_delta = target_delta
        self.spread_width = spread_width
        self.entry_hour = entry_hour
        self.entry_minute = entry_minute
        self.width_tolerance = width_tolerance
        self.trade_entered_today = False
        # Calendar date of the most recent Update call; drives the daily reset.
        self._last_update_date = None

    def Update(self, algorithm, data):
        """Generate insights for put spread opportunities.

        Args:
            algorithm: The running algorithm (provides time, securities, logging).
            data: The current data slice; its ``OptionChains`` are scanned.

        Returns:
            A list with two insights (short leg, then long leg) when a spread
            is selected, otherwise an empty list.
        """
        insights = []
        now = algorithm.Time

        # Reset the daily flag whenever the calendar date changes. Keying on
        # the date rather than on an exact 09:30 timestamp means the reset
        # cannot be missed when no slice happens to arrive in that minute.
        current_date = now.date()
        if current_date != self._last_update_date:
            self._last_update_date = current_date
            self.trade_entered_today = False

        # Only act once per day, and only in the configured entry minute.
        if (now.hour != self.entry_hour or
                now.minute != self.entry_minute or
                self.trade_entered_today):
            return insights

        # Check if market is open
        spx_symbol = Symbol.Create("SPX", SecurityType.Index, Market.USA)
        if not algorithm.IsMarketOpen(spx_symbol):
            return insights

        # Look for option chain data
        for symbol in data.OptionChains.Keys:
            if "SPX" not in str(symbol):
                continue

            chain = data.OptionChains[symbol]
            spread_opportunity = self._find_put_spread_opportunity(algorithm, chain)
            if not spread_opportunity:
                continue

            short_symbol, long_symbol = spread_opportunity
            self.trade_entered_today = True

            # One insight per leg; the sign of the weight marks short vs long.
            insights.append(Insight.Price(short_symbol, timedelta(hours=1),
                                          InsightDirection.Down, weight=-1.0))
            insights.append(Insight.Price(long_symbol, timedelta(hours=1),
                                          InsightDirection.Down, weight=1.0))

            algorithm.Log(f"Generated put spread insights: {short_symbol} / {long_symbol}")
            break  # at most one spread per day

        return insights

    def _find_put_spread_opportunity(self, algorithm, option_chain):
        """Find a suitable put spread in ``option_chain``.

        Args:
            algorithm: The running algorithm (provides SPX price, time, logging).
            option_chain: Iterable of option contracts to choose from.

        Returns:
            ``(short_symbol, long_symbol)`` when a spread whose width lies
            within ``spread_width ± width_tolerance`` is found, else ``None``.
            Any exception raised while searching is logged and results in
            ``None``.
        """
        try:
            # Get SPX price
            spx_symbol = Symbol.Create("SPX", SecurityType.Index, Market.USA)
            if spx_symbol not in algorithm.Securities:
                return None
            spx_price = algorithm.Securities[spx_symbol].Price

            if spx_price <= 0:
                return None

            # Keep only out-of-the-money puts that expire today.
            today_date = algorithm.Time.date()
            valid_puts = [
                contract for contract in option_chain
                if (contract.Right == OptionRight.Put and
                    contract.Expiry.date() == today_date and
                    contract.Strike > 0 and
                    contract.Strike < spx_price and  # OTM
                    contract.Greeks.Delta is not None)
            ]

            if len(valid_puts) < 2:
                return None

            # Short leg: delta closest to the target.
            short_leg = min(valid_puts,
                            key=lambda x: abs(x.Greeks.Delta - self.target_delta))

            # Long leg: strike closest to (short strike - spread_width),
            # restricted to strikes strictly below the short leg.
            target_long_strike = short_leg.Strike - self.spread_width
            long_leg = min((p for p in valid_puts if p.Strike < short_leg.Strike),
                           key=lambda x: abs(x.Strike - target_long_strike),
                           default=None)

            if long_leg is None:
                return None

            # Accept the spread only if its width is close to the requested one.
            actual_width = short_leg.Strike - long_leg.Strike
            min_width = self.spread_width - self.width_tolerance
            max_width = self.spread_width + self.width_tolerance
            if min_width <= actual_width <= max_width:
                return (short_leg.Symbol, long_leg.Symbol)

        except Exception as e:
            # Best-effort search: report the problem and skip today's signal.
            algorithm.Log(f"Error finding spread opportunity: {e}")

        return None
from AlgorithmImports import *

class SPXPutSpreadExecutionModel(ExecutionModel):
    """Execution model that sends put spread targets to market two legs at a time."""

    def Execute(self, algorithm, targets):
        """Pair up the incoming targets and submit each pair as a spread.

        Targets are consumed positionally in consecutive pairs; a trailing
        unpaired target is ignored. Within a pair, the first target is treated
        as the short leg when its quantity is negative, otherwise the second
        target is.

        Args:
            algorithm: The running algorithm used for portfolio access and orders.
            targets: Portfolio targets to execute.
        """
        if not targets:
            return

        # Fresh fill-tracking record stored on the algorithm instance.
        algorithm._current_spread_tracking = {
            'fills': 0,
            'short_fill_price': 0,
            'long_fill_price': 0
        }

        pending = list(targets)

        def as_short_long(first, second):
            # Order the pair as (short, long) based on the first target's sign.
            return (first, second) if first.Quantity < 0 else (second, first)

        # Build every pair up front, then execute them in order.
        paired = [
            as_short_long(pending[idx - 1], pending[idx])
            for idx in range(1, len(pending), 2)
        ]

        for short_leg, long_leg in paired:
            self._execute_spread(algorithm, short_leg, long_leg)

    def _execute_spread(self, algorithm, short_target, long_target):
        """Submit market orders that move both legs to their target quantities.

        Args:
            algorithm: The running algorithm used for portfolio access and orders.
            short_target: Target for the short leg.
            long_target: Target for the long leg.

        Any exception raised while reading holdings or placing orders is
        reported through ``algorithm.Error``.
        """
        try:
            short_symbol = short_target.Symbol
            long_symbol = long_target.Symbol

            # Order size = target quantity minus what is already held.
            short_quantity = short_target.Quantity - algorithm.Portfolio[short_symbol].Quantity
            long_quantity = long_target.Quantity - algorithm.Portfolio[long_symbol].Quantity

            nothing_to_do = short_quantity == 0 and long_quantity == 0
            if nothing_to_do:
                return

            algorithm.Log("Executing put spread:")
            algorithm.Log(f"  Short leg: {short_symbol} qty: {short_quantity}")
            algorithm.Log(f"  Long leg: {long_symbol} qty: {long_quantity}")

            # Short leg first, then long leg; zero-size legs are skipped.
            for leg_symbol, leg_quantity in ((short_symbol, short_quantity),
                                             (long_symbol, long_quantity)):
                if leg_quantity != 0:
                    algorithm.MarketOrder(leg_symbol, leg_quantity)

        except Exception as e:
            algorithm.Error(f"Error executing spread: {e}")
from AlgorithmImports import *
from universe_selection import SPXOptionsUniverseSelectionModel
from alpha_model import SPXPutSpreadAlphaModel
from portfolio_construction import SPXPutSpreadPortfolioConstructionModel
from execution_model import SPXPutSpreadExecutionModel
from spread_tracker import SpreadTracker

class SPX0DTEPutSpreadFrameworkAlgorithm(QCAlgorithm):
    """
    Modular SPX 0DTE Put Spread Strategy using Algorithm Framework
    
    Strategy: Daily 0DTE put spread selling on SPX
    Entry: 3:00 PM ET (1 hour before expiration)
    Target: ~30 delta short put, $10 wide spread
    
    Framework Modules:
    - Universe Selection: SPX/SPXW options setup
    - Alpha Model: Signal generation for spread opportunities  
    - Portfolio Construction: Convert signals to position targets
    - Execution Model: Handle spread order execution
    - Risk Management: Null (minimal risk controls)
    """
    
    def Initialize(self):
        """Configure the backtest window, strategy parameters and framework models."""
        # === BACKTEST CONFIGURATION ===
        self.SetStartDate(2025, 8, 11)  # Monday  
        self.SetEndDate(2025, 8, 15)    # Friday - full week of 0DTE trading  
        self.SetCash(100000)
        self.SetTimeZone(TimeZones.NEW_YORK)
        
        # === STRATEGY PARAMETERS ===
        self.target_delta = -0.30        # Target delta for short put leg
        self.spread_width = 10           # Dollar width of put spread
        self.trade_entry_hour = 15       # 3:00 PM ET entry time
        self.trade_entry_minute = 0      # Entry at top of hour
        
        # === INITIALIZE TRACKING ===
        self.spread_tracker = SpreadTracker()
        
        # === ALGORITHM FRAMEWORK SETUP ===
        # Each module handles a specific aspect of the trading strategy
        
        # 1. Universe Selection: What assets to trade
        self.SetUniverseSelection(SPXOptionsUniverseSelectionModel())
        
        # 2. Alpha Model: When and what signals to generate
        self.SetAlpha(SPXPutSpreadAlphaModel(
            target_delta=self.target_delta,
            spread_width=self.spread_width, 
            entry_hour=self.trade_entry_hour,
            entry_minute=self.trade_entry_minute
        ))
        
        # 3. Portfolio Construction: How to size positions from signals
        self.SetPortfolioConstruction(SPXPutSpreadPortfolioConstructionModel())
        
        # 4. Execution: How to execute the trades
        self.SetExecution(SPXPutSpreadExecutionModel())
        
        # 5. Risk Management: How to manage downside (using null for now)
        self.SetRiskManagement(NullRiskManagementModel())
        
        # === FRAMEWORK SETTINGS ===
        # Control when portfolio rebalancing occurs
        self.Settings.RebalancePortfolioOnInsightChanges = False
        self.Settings.RebalancePortfolioOnSecurityChanges = False

    def OnOrderEvent(self, orderEvent):
        """
        Handle order fill events for spread tracking
        
        Every fill is logged. Only fills that come from regular entry orders
        are passed to the spread tracker: fills produced by option
        exercise/expiration orders are skipped so that the daily expiry of the
        0DTE legs is not counted as a second "spread" and does not distort the
        premium totals.
        """
        if orderEvent.Status != OrderStatus.Filled:
            return

        symbol = orderEvent.Symbol
        quantity = orderEvent.FillQuantity
        price = orderEvent.FillPrice
        direction = "SOLD" if quantity < 0 else "BOUGHT"

        # Log individual order fills
        self.Log(f"✅ {direction}: {symbol} Qty: {abs(quantity)} @ ${price:.2f}")

        # Expiry/exercise fills close a position; they are not spread entries.
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order is not None and order.Type == OrderType.OptionExercise:
            return

        # Track spread execution using our helper class
        if not self.spread_tracker.current_spread_tracking:
            self.spread_tracker.initialize_spread_tracking()

        self.spread_tracker.record_fill(quantity, price)

        # Check if spread is complete (both legs filled)
        if self.spread_tracker.is_spread_complete():
            self.spread_tracker.complete_spread(self)

    def OnEndOfAlgorithm(self):
        """
        Final performance summary when algorithm completes
        """
        self.Log("=" * 50)
        self.Log("MODULAR FRAMEWORK STRATEGY RESULTS")
        self.Log("=" * 50)
        
        self.Log(f"Total Spreads Executed: {self.spread_tracker.total_trades}")
        # NOTE(review): this class never updates winning_trades, so the value
        # (and the win rate below) reflects only what the tracker itself sets.
        self.Log(f"Winning Trades: {self.spread_tracker.winning_trades}")
        
        if self.spread_tracker.total_trades > 0:
            win_rate = self.spread_tracker.calculate_win_rate()
            avg_premium = self.spread_tracker.total_premium_collected / self.spread_tracker.total_trades
            
            self.Log(f"Win Rate: {win_rate:.1f}%")
            self.Log(f"Average Premium per Trade: ${avg_premium:.2f}")
            self.Log(f"Total Premium Collected: ${self.spread_tracker.total_premium_collected:.2f}")
        else:
            self.Log("No trades executed during backtest period")
            
        self.Log("=" * 50)
        self.Log("Framework modules used:")
        self.Log("- Universe Selection: SPXOptionsUniverseSelectionModel")
        self.Log("- Alpha Model: SPXPutSpreadAlphaModel") 
        self.Log("- Portfolio Construction: SPXPutSpreadPortfolioConstructionModel")
        self.Log("- Execution: SPXPutSpreadExecutionModel")
        self.Log("- Risk Management: NullRiskManagementModel")
        self.Log("=" * 50)


# Alternative simple implementation for comparison
class SPX0DTESimpleFramework(QCAlgorithm):
    """
    Comparison implementation of the same put spread idea.

    Keeps the framework plumbing in place (with null models) but selects the
    spread and places the orders directly from a scheduled callback.
    """

    def Initialize(self):
        """Set up the backtest, the SPX/SPXW subscriptions and the daily schedule."""
        # Backtest window and account
        self.SetStartDate(2025, 8, 11)
        self.SetEndDate(2025, 8, 15)
        self.SetCash(100000)
        self.SetTimeZone(TimeZones.NEW_YORK)

        # Underlying index subscription (added manually, no universe model)
        self.spx = self.AddIndex("SPX", Resolution.Minute)
        self.spx_symbol = self.spx.Symbol

        # SPXW option subscription with the chain filter defined below
        spxw_option = self.AddIndexOption("SPX", "SPXW", Resolution.Minute)
        spxw_option.SetFilter(self._option_filter)
        self.option_symbol = spxw_option.Symbol

        # Strategy parameters
        self.target_delta = -0.30
        self.spread_width = 10
        self.trade_entry_hour = 15
        self.position_size = 1

        # Null models: the framework pipeline is present but does nothing
        self.SetUniverseSelection(NullUniverseSelectionModel())
        self.SetAlpha(NullAlphaModel())
        self.SetPortfolioConstruction(NullPortfolioConstructionModel())
        self.SetExecution(NullExecutionModel())

        # Fill/performance tracker
        self.spread_tracker = SpreadTracker()

        # Daily entry attempt at the configured hour, minute 0
        entry_dates = self.DateRules.EveryDay(self.spx_symbol)
        entry_time = self.TimeRules.At(self.trade_entry_hour, 0)
        self.Schedule.On(entry_dates, entry_time, self._try_enter_spread)

    def _option_filter(self, universe):
        """Filter for 0DTE SPXW puts"""
        by_strike = universe.Strikes(-20, 0)
        by_expiry = by_strike.Expiration(0, 0)
        by_delta = by_expiry.Delta(-0.6, -0.1)
        with_weeklys = by_delta.IncludeWeeklys()
        return with_weeklys.OnlyApplyFilterAtMarketOpen()

    def _try_enter_spread(self):
        """Scheduled callback: pick a spread from the current chain and trade it."""
        current = self.CurrentSlice
        if self.option_symbol not in current.OptionChains:
            return

        legs = self._find_best_spread(current.OptionChains[self.option_symbol])
        if not legs:
            return

        short_leg_symbol, long_leg_symbol = legs

        # Sell the short put, then buy the long put
        self.MarketOrder(short_leg_symbol, -self.position_size)
        self.MarketOrder(long_leg_symbol, self.position_size)

        self.Log(f"✅ Direct execution: {short_leg_symbol} / {long_leg_symbol}")

    def _find_best_spread(self, chain):
        """Return ``(short_symbol, long_symbol)`` for today's spread, or ``None``.

        The short leg is the eligible put whose delta is closest to
        ``target_delta``; the long leg is the eligible put below it whose
        strike is closest to ``short strike - spread_width``.
        """
        underlying_price = self.Securities[self.spx_symbol].Price
        session_date = self.Time.date()

        # Eligible contracts: puts expiring today with strike below the index price
        candidates = []
        for contract in chain:
            if contract.Right != OptionRight.Put:
                continue
            if contract.Expiry.date() != session_date:
                continue
            if not contract.Strike < underlying_price:
                continue
            if contract.Greeks.Delta is None:
                continue
            candidates.append(contract)

        if len(candidates) < 2:
            return None

        def delta_distance(contract):
            return abs(contract.Greeks.Delta - self.target_delta)

        short_leg = min(candidates, key=delta_distance)

        desired_long_strike = short_leg.Strike - self.spread_width
        lower_strikes = [c for c in candidates if c.Strike < short_leg.Strike]
        if not lower_strikes:
            return None

        def strike_distance(contract):
            return abs(contract.Strike - desired_long_strike)

        long_leg = min(lower_strikes, key=strike_distance)

        return (short_leg.Symbol, long_leg.Symbol)
from AlgorithmImports import *

class SPXPutSpreadPortfolioConstructionModel(PortfolioConstructionModel):
    """Turns pairs of put spread insights into one-lot portfolio targets."""

    def CreateTargets(self, algorithm, insights):
        """Build targets for each consecutive pair of insights.

        Insights are read positionally two at a time; a trailing unpaired
        insight is ignored. In each pair the first insight is taken as the
        short leg when its weight is negative, otherwise the second one is.

        Args:
            algorithm: The running algorithm (unused here).
            insights: Insights to convert.

        Returns:
            A list of targets: for every pair, -1 on the short leg followed
            by +1 on the long leg. Empty when there are no insights.
        """
        if not insights:
            return []

        # Resolve (short, long) ordering for every complete pair first.
        ordered_pairs = []
        for second_idx in range(1, len(insights), 2):
            first = insights[second_idx - 1]
            second = insights[second_idx]
            ordered_pairs.append((first, second) if first.Weight < 0 else (second, first))

        # One spread = sell 1 short put, buy 1 long put.
        targets = []
        for short_insight, long_insight in ordered_pairs:
            targets.append(PortfolioTarget(short_insight.Symbol, -1))
            targets.append(PortfolioTarget(long_insight.Symbol, 1))

        return targets
from AlgorithmImports import *

class SpreadTracker:
    """Helper class to track spread execution and performance.

    A spread is tracked as a small dict while its legs fill; once two fills
    have been recorded the spread is "complete" and can be folded into the
    running totals with ``complete_spread``.
    """

    def __init__(self):
        self.total_trades = 0
        # NOTE(review): no method of this class updates winning_trades or
        # total_pnl; they keep their initial value unless set from outside.
        self.winning_trades = 0
        self.total_premium_collected = 0
        self.total_pnl = 0
        # Dict describing the spread currently being filled, or None.
        self.current_spread_tracking = None

    def initialize_spread_tracking(self):
        """Initialize tracking for new spread"""
        self.current_spread_tracking = {
            'fills': 0,
            'short_fill_price': 0,
            'long_fill_price': 0,
            'entry_time': None
        }

    def record_fill(self, quantity, price):
        """Record an order fill for one leg of the current spread.

        Args:
            quantity: Signed fill quantity; negative is stored as the short
                leg, anything else as the long leg.
            price: Fill price of the leg.

        Does nothing when no spread is being tracked.
        """
        tracking = self.current_spread_tracking
        if not tracking:
            return

        tracking['fills'] += 1
        leg_key = 'short_fill_price' if quantity < 0 else 'long_fill_price'
        tracking[leg_key] = price

    def is_spread_complete(self):
        """Return True when two fills have been recorded for the current spread.

        Always returns a real ``bool`` (``False`` when nothing is tracked),
        rather than leaking ``None`` to callers.
        """
        tracking = self.current_spread_tracking
        return bool(tracking) and tracking['fills'] == 2

    def get_spread_credit(self):
        """Return short fill price minus long fill price, or 0 if nothing is tracked."""
        tracking = self.current_spread_tracking
        if not tracking:
            return 0

        return tracking['short_fill_price'] - tracking['long_fill_price']

    def complete_spread(self, algorithm):
        """Fold the completed spread into the totals, log it and reset tracking.

        Args:
            algorithm: Object exposing ``Log(message)`` used for reporting.

        Does nothing unless the current spread is complete.
        """
        if not self.is_spread_complete():
            return

        net_credit = self.get_spread_credit()
        # Per-point credit scaled by 100 to give the premium for one spread.
        premium_collected = net_credit * 100

        self.total_trades += 1
        self.total_premium_collected += premium_collected

        algorithm.Log(f"✅ PUT SPREAD EXECUTED:")
        algorithm.Log(f"  Net Credit: ${net_credit:.2f}")
        algorithm.Log(f"  Premium Collected: ${premium_collected:.2f}")
        algorithm.Log(f"  Total Trades: {self.total_trades}")

        # Reset tracking for next spread
        self.current_spread_tracking = None

    def calculate_win_rate(self):
        """Return winning trades as a percentage of total trades (0 when none)."""
        if self.total_trades == 0:
            return 0
        return (self.winning_trades / self.total_trades) * 100
from AlgorithmImports import *

class SPXOptionsUniverseSelectionModel(UniverseSelectionModel):
    """Registers the SPX index and its SPXW option chain on the algorithm."""

    def CreateUniverses(self, algorithm):
        """Subscribe to SPX and SPXW options directly on ``algorithm``.

        The subscriptions are added as a side effect; no universe objects are
        handed back to the framework.

        Returns:
            An empty list.
        """
        # Underlying index at minute resolution
        algorithm.AddIndex("SPX", Resolution.Minute)

        # SPXW options on SPX, narrowed by OptionFilter
        spxw_option = algorithm.AddIndexOption("SPX", "SPXW", Resolution.Minute)
        spxw_option.SetFilter(self.OptionFilter)

        return []

    def OptionFilter(self, universe):
        """Filter for 0DTE SPXW puts"""
        by_strike = universe.Strikes(-20, 0)
        by_expiry = by_strike.Expiration(0, 0)  # 0DTE only
        by_delta = by_expiry.Delta(-0.6, -0.1)
        with_weeklys = by_delta.IncludeWeeklys()
        return with_weeklys.OnlyApplyFilterAtMarketOpen()