| Overall Statistics |
|
Total Orders 18 Average Win 0.12% Average Loss -0.09% Compounding Annual Return -26.453% Drawdown 0.800% Expectancy -0.155 Start Equity 100000 End Equity 99576.5 Net Profit -0.424% Sharpe Ratio -1.556 Sortino Ratio 0 Probabilistic Sharpe Ratio 46.770% Loss Rate 62% Win Rate 38% Profit-Loss Ratio 1.25 Alpha -0.188 Beta 0.188 Annual Standard Deviation 0.029 Annual Variance 0.001 Information Ratio -11.509 Tracking Error 0.07 Treynor Ratio -0.239 Total Fees $0.00 Estimated Strategy Capacity $8000.00 Lowest Capacity Asset SPXW 32U0DFHEXD5RI|SPX 31 Portfolio Turnover 0.20% Drawdown Recovery 0 |
from AlgorithmImports import *
from datetime import timedelta
class SPXPutSpreadAlphaModel(AlphaModel):
    """Alpha model that emits paired insights for an SPX 0DTE put spread.

    At the configured entry time, and at most once per calendar day, the
    model scans the option chains in the current slice whose symbol text
    contains "SPX", picks a short put whose delta is closest to
    ``target_delta`` and a long put roughly ``spread_width`` strike points
    below it, and emits one insight per leg.

    The legs are distinguished by insight weight: ``-1.0`` for the short leg
    and ``1.0`` for the long leg. The short-leg insight is always appended
    first.
    """

    # Largest accepted difference (in strike points) between the realised
    # spread width and the requested ``spread_width``. With the default
    # width of 10 this accepts widths from 5 to 15 inclusive.
    WIDTH_TOLERANCE = 5

    def __init__(self, target_delta=-0.30, spread_width=10, entry_hour=15, entry_minute=0):
        """Store the strategy parameters.

        Args:
            target_delta: Delta the short put should be closest to.
            spread_width: Desired strike distance between the two legs.
            entry_hour: Hour of ``algorithm.Time`` at which to look for a trade.
            entry_minute: Minute of ``algorithm.Time`` at which to look for a trade.
        """
        self.target_delta = target_delta
        self.spread_width = spread_width
        self.entry_hour = entry_hour
        self.entry_minute = entry_minute
        self.trade_entered_today = False
        # Calendar date of the most recent Update call; used to detect a new day.
        self._session_date = None
        # Built once instead of on every call.
        self._spx_symbol = Symbol.Create("SPX", SecurityType.Index, Market.USA)

    def Update(self, algorithm, data):
        """Generate insights for put spread opportunities.

        Args:
            algorithm: The running algorithm; its ``Time``, ``IsMarketOpen``,
                ``Securities`` and ``Log`` members are used.
            data: The current slice; its ``OptionChains`` are scanned.

        Returns:
            An empty list, or a two-element list ``[short_insight, long_insight]``.
        """
        insights = []

        # Clear the once-per-day latch on the first update of each calendar
        # day. Comparing dates does not depend on an update arriving at one
        # exact timestamp (e.g. 09:30), which may never happen.
        today = algorithm.Time.date()
        if today != self._session_date:
            self._session_date = today
            self.trade_entered_today = False

        # Only act at the configured entry minute, and only once per day.
        if (algorithm.Time.hour != self.entry_hour or
                algorithm.Time.minute != self.entry_minute or
                self.trade_entered_today):
            return insights

        if not algorithm.IsMarketOpen(self._spx_symbol):
            return insights

        for symbol in data.OptionChains.Keys:
            if "SPX" not in str(symbol):
                continue

            chain = data.OptionChains[symbol]
            spread_opportunity = self._find_put_spread_opportunity(algorithm, chain)
            if not spread_opportunity:
                continue

            short_symbol, long_symbol = spread_opportunity
            self.trade_entered_today = True

            # One insight per leg; the sign of the weight marks short vs long.
            insights.append(Insight.Price(short_symbol, timedelta(hours=1),
                                          InsightDirection.Down, weight=-1.0))
            insights.append(Insight.Price(long_symbol, timedelta(hours=1),
                                          InsightDirection.Down, weight=1.0))
            algorithm.Log(f"Generated put spread insights: {short_symbol} / {long_symbol}")
            break  # at most one spread per day

        return insights

    def _find_put_spread_opportunity(self, algorithm, option_chain):
        """Find a suitable put spread in ``option_chain``.

        Candidate contracts are puts expiring on the current algorithm date
        with a positive strike below the current SPX price and a non-None
        delta.

        Returns:
            ``(short_symbol, long_symbol)`` when a spread is found whose width
            is within ``WIDTH_TOLERANCE`` of ``spread_width``; otherwise
            ``None``. Any exception is logged and also yields ``None``.
        """
        try:
            if self._spx_symbol not in algorithm.Securities:
                return None

            spx_price = algorithm.Securities[self._spx_symbol].Price
            if spx_price <= 0:
                return None

            today_date = algorithm.Time.date()
            valid_puts = [
                contract for contract in option_chain
                if (contract.Right == OptionRight.Put and
                    contract.Expiry.date() == today_date and
                    contract.Strike > 0 and
                    contract.Strike < spx_price and  # OTM
                    contract.Greeks.Delta is not None)
            ]
            if len(valid_puts) < 2:
                return None

            # Short leg: delta closest to the target.
            short_leg = min(valid_puts,
                            key=lambda x: abs(x.Greeks.Delta - self.target_delta))

            # Long leg: the lower strike closest to (short strike - width).
            target_long_strike = short_leg.Strike - self.spread_width
            long_leg = min((p for p in valid_puts if p.Strike < short_leg.Strike),
                           key=lambda x: abs(x.Strike - target_long_strike),
                           default=None)
            if long_leg is None:
                return None

            # Accept only spreads whose width is close to the requested one.
            # Measured relative to spread_width so non-default widths work.
            actual_width = short_leg.Strike - long_leg.Strike
            if abs(actual_width - self.spread_width) <= self.WIDTH_TOLERANCE:
                return (short_leg.Symbol, long_leg.Symbol)
        except Exception as e:
            # Best-effort: a malformed chain must not stop the algorithm.
            algorithm.Log(f"Error finding spread opportunity: {e}")
        return None
from AlgorithmImports import *
class SPXPutSpreadExecutionModel(ExecutionModel):
    """Execution model that submits put-spread legs as paired market orders."""

    def Execute(self, algorithm, targets):
        """Pair up consecutive targets and send each pair for execution.

        Targets are consumed two at a time in the order received; a trailing
        unpaired target is ignored. Within a pair, the first target is the
        short leg if its quantity is negative, otherwise the second one is.
        """
        if not targets:
            return

        # Fresh per-call tracking record stored on the algorithm instance.
        algorithm._current_spread_tracking = dict(
            fills=0,
            short_fill_price=0,
            long_fill_price=0,
        )

        pending = list(targets)
        paired = []
        # second_idx walks 1, 3, 5, ... so (second_idx - 1, second_idx) are
        # exactly the complete consecutive pairs.
        for second_idx in range(1, len(pending), 2):
            first = pending[second_idx - 1]
            second = pending[second_idx]
            if first.Quantity < 0:
                paired.append((first, second))
            else:
                paired.append((second, first))

        for short_leg, long_leg in paired:
            self._execute_spread(algorithm, short_leg, long_leg)

    def _execute_spread(self, algorithm, short_target, long_target):
        """Send market orders that move current holdings to the target quantities.

        Any exception raised while reading holdings or ordering is reported
        through ``algorithm.Error`` and not re-raised.
        """
        try:
            portfolio = algorithm.Portfolio
            short_delta = short_target.Quantity - portfolio[short_target.Symbol].Quantity
            long_delta = long_target.Quantity - portfolio[long_target.Symbol].Quantity

            # Nothing to do when both legs are already at target.
            if not (short_delta != 0 or long_delta != 0):
                return

            algorithm.Log("Executing put spread:")
            algorithm.Log(f" Short leg: {short_target.Symbol} qty: {short_delta}")
            algorithm.Log(f" Long leg: {long_target.Symbol} qty: {long_delta}")

            # Short leg is ordered first, then the long leg.
            for leg_symbol, leg_delta in ((short_target.Symbol, short_delta),
                                          (long_target.Symbol, long_delta)):
                if leg_delta != 0:
                    algorithm.MarketOrder(leg_symbol, leg_delta)
        except Exception as e:
            algorithm.Error(f"Error executing spread: {e}")
from AlgorithmImports import *
from universe_selection import SPXOptionsUniverseSelectionModel
from alpha_model import SPXPutSpreadAlphaModel
from portfolio_construction import SPXPutSpreadPortfolioConstructionModel
from execution_model import SPXPutSpreadExecutionModel
from spread_tracker import SpreadTracker
class SPX0DTEPutSpreadFrameworkAlgorithm(QCAlgorithm):
    """
    Modular SPX 0DTE Put Spread Strategy using Algorithm Framework

    Strategy: Daily 0DTE put spread selling on SPX
    Entry: 3:00 PM ET (1 hour before expiration)
    Target: ~30 delta short put, $10 wide spread

    Framework Modules:
    - Universe Selection: SPX/SPXW options setup
    - Alpha Model: Signal generation for spread opportunities
    - Portfolio Construction: Convert signals to position targets
    - Execution Model: Handle spread order execution
    - Risk Management: Null (minimal risk controls)
    """

    def Initialize(self):
        """Configure the backtest window, strategy parameters and framework models."""
        # === BACKTEST CONFIGURATION ===
        self.SetStartDate(2025, 8, 11)  # Monday
        self.SetEndDate(2025, 8, 15)    # Friday - full week of 0DTE trading
        self.SetCash(100000)
        self.SetTimeZone(TimeZones.NEW_YORK)

        # === STRATEGY PARAMETERS ===
        self.target_delta = -0.30       # Target delta for short put leg
        self.spread_width = 10          # Dollar width of put spread
        self.trade_entry_hour = 15      # 3:00 PM ET entry time
        self.trade_entry_minute = 0     # Entry at top of hour

        # === INITIALIZE TRACKING ===
        self.spread_tracker = SpreadTracker()

        # === ALGORITHM FRAMEWORK SETUP ===
        # Each module handles a specific aspect of the trading strategy

        # 1. Universe Selection: What assets to trade
        self.SetUniverseSelection(SPXOptionsUniverseSelectionModel())

        # 2. Alpha Model: When and what signals to generate
        self.SetAlpha(SPXPutSpreadAlphaModel(
            target_delta=self.target_delta,
            spread_width=self.spread_width,
            entry_hour=self.trade_entry_hour,
            entry_minute=self.trade_entry_minute
        ))

        # 3. Portfolio Construction: How to size positions from signals
        self.SetPortfolioConstruction(SPXPutSpreadPortfolioConstructionModel())

        # 4. Execution: How to execute the trades
        self.SetExecution(SPXPutSpreadExecutionModel())

        # 5. Risk Management: How to manage downside (using null for now)
        self.SetRiskManagement(NullRiskManagementModel())

        # === FRAMEWORK SETTINGS ===
        # Control when portfolio rebalancing occurs
        self.Settings.RebalancePortfolioOnInsightChanges = False
        self.Settings.RebalancePortfolioOnSecurityChanges = False

    def OnOrderEvent(self, orderEvent):
        """
        Handle order fill events for spread tracking

        Every filled event is logged. Only fills of orders that are not
        option-exercise orders are passed to the spread tracker, so that a
        tracked "spread" always consists of the two entry legs.
        """
        if orderEvent.Status != OrderStatus.Filled:
            return

        symbol = orderEvent.Symbol
        quantity = orderEvent.FillQuantity
        price = orderEvent.FillPrice
        direction = "SOLD" if quantity < 0 else "BOUGHT"

        # Log individual order fills
        self.Log(f"✅ {direction}: {symbol} Qty: {abs(quantity)} @ ${price:.2f}")

        # Expiring/exercised option positions are reported as filled
        # OptionExercise orders. They are not spread entries; counting them
        # would register a second, bogus spread for every real one.
        order = self.Transactions.GetOrderById(orderEvent.OrderId)
        if order is not None and order.Type == OrderType.OptionExercise:
            return

        # Track spread execution using our helper class
        if not self.spread_tracker.current_spread_tracking:
            self.spread_tracker.initialize_spread_tracking()
        self.spread_tracker.record_fill(quantity, price)

        # Check if spread is complete (both legs filled)
        if self.spread_tracker.is_spread_complete():
            self.spread_tracker.complete_spread(self)

    def OnEndOfAlgorithm(self):
        """
        Final performance summary when algorithm completes
        """
        tracker = self.spread_tracker
        separator = "=" * 50

        self.Log(separator)
        self.Log("MODULAR FRAMEWORK STRATEGY RESULTS")
        self.Log(separator)
        self.Log(f"Total Spreads Executed: {tracker.total_trades}")
        self.Log(f"Winning Trades: {tracker.winning_trades}")

        if tracker.total_trades > 0:
            win_rate = tracker.calculate_win_rate()
            avg_premium = tracker.total_premium_collected / tracker.total_trades
            self.Log(f"Win Rate: {win_rate:.1f}%")
            self.Log(f"Average Premium per Trade: ${avg_premium:.2f}")
            self.Log(f"Total Premium Collected: ${tracker.total_premium_collected:.2f}")
        else:
            self.Log("No trades executed during backtest period")

        self.Log(separator)
        self.Log("Framework modules used:")
        self.Log("- Universe Selection: SPXOptionsUniverseSelectionModel")
        self.Log("- Alpha Model: SPXPutSpreadAlphaModel")
        self.Log("- Portfolio Construction: SPXPutSpreadPortfolioConstructionModel")
        self.Log("- Execution: SPXPutSpreadExecutionModel")
        self.Log("- Risk Management: NullRiskManagementModel")
        self.Log(separator)
# Alternative simple implementation for comparison
class SPX0DTESimpleFramework(QCAlgorithm):
    """
    Simplified variant kept for comparison with the modular algorithm.

    All framework slots are filled with null models; the spread is entered
    from a scheduled callback that places the two market orders itself.
    """

    def Initialize(self):
        """Set up dates, cash, the SPX/SPXW subscriptions and the daily schedule."""
        self.SetStartDate(2025, 8, 11)
        self.SetEndDate(2025, 8, 15)
        self.SetCash(100000)
        self.SetTimeZone(TimeZones.NEW_YORK)

        # Subscribe to the index and its SPXW options directly.
        self.spx = self.AddIndex("SPX", Resolution.Minute)
        self.spx_symbol = self.spx.Symbol

        spxw_option = self.AddIndexOption("SPX", "SPXW", Resolution.Minute)
        spxw_option.SetFilter(self._option_filter)
        self.option_symbol = spxw_option.Symbol

        # Strategy parameters
        self.target_delta = -0.30
        self.spread_width = 10
        self.trade_entry_hour = 15
        self.position_size = 1

        # Null models in every framework slot: the framework does nothing.
        self.SetUniverseSelection(NullUniverseSelectionModel())
        self.SetAlpha(NullAlphaModel())
        self.SetPortfolioConstruction(NullPortfolioConstructionModel())
        self.SetExecution(NullExecutionModel())

        self.spread_tracker = SpreadTracker()

        # Run the entry routine every day at the top of the entry hour.
        entry_dates = self.DateRules.EveryDay(self.spx_symbol)
        entry_time = self.TimeRules.At(self.trade_entry_hour, 0)
        self.Schedule.On(entry_dates, entry_time, self._try_enter_spread)

    def _option_filter(self, universe):
        """Apply the strike, expiration, delta and weekly filters to ``universe``."""
        narrowed = universe.Strikes(-20, 0)
        narrowed = narrowed.Expiration(0, 0)
        narrowed = narrowed.Delta(-0.6, -0.1)
        narrowed = narrowed.IncludeWeeklys()
        return narrowed.OnlyApplyFilterAtMarketOpen()

    def _try_enter_spread(self):
        """Look up the current option chain and, if a spread is found, order both legs."""
        current_slice = self.CurrentSlice
        if self.option_symbol not in current_slice.OptionChains:
            return

        legs = self._find_best_spread(current_slice.OptionChains[self.option_symbol])
        if not legs:
            return

        short_symbol, long_symbol = legs
        # Sell the short leg first, then buy the long leg.
        self.MarketOrder(short_symbol, -self.position_size)
        self.MarketOrder(long_symbol, self.position_size)
        self.Log(f"✅ Direct execution: {short_symbol} / {long_symbol}")

    def _find_best_spread(self, chain):
        """Return ``(short_symbol, long_symbol)`` for the chosen spread, or ``None``.

        Candidates are puts expiring on the current algorithm date, struck
        below the current SPX price, with a non-None delta.
        """
        underlying_price = self.Securities[self.spx_symbol].Price
        session_date = self.Time.date()

        candidates = []
        for contract in chain:
            if (contract.Right == OptionRight.Put
                    and contract.Expiry.date() == session_date
                    and contract.Strike < underlying_price
                    and contract.Greeks.Delta is not None):
                candidates.append(contract)

        if len(candidates) < 2:
            return None

        def delta_distance(contract):
            return abs(contract.Greeks.Delta - self.target_delta)

        # Short leg: delta nearest the target.
        short_leg = min(candidates, key=delta_distance)

        # Long leg: lower strike nearest to (short strike - spread width).
        desired_long_strike = short_leg.Strike - self.spread_width
        lower_strikes = [c for c in candidates if c.Strike < short_leg.Strike]
        if not lower_strikes:
            return None

        def strike_distance(contract):
            return abs(contract.Strike - desired_long_strike)

        long_leg = min(lower_strikes, key=strike_distance)
        return (short_leg.Symbol, long_leg.Symbol)
from AlgorithmImports import *
class SPXPutSpreadPortfolioConstructionModel(PortfolioConstructionModel):
    """Turns paired spread insights into fixed one-lot portfolio targets."""

    def CreateTargets(self, algorithm, insights):
        """Build ``-1`` / ``+1`` targets for each consecutive pair of insights.

        Insights are read two at a time in the order given; a trailing
        unpaired insight is ignored. Within a pair, the first insight is the
        short leg if its weight is negative, otherwise the second one is.

        Returns:
            A list with a short-leg target followed by a long-leg target for
            every pair, or an empty list when there are no insights.
        """
        targets = []
        if not insights:
            return targets

        # Odd indices 1, 3, 5, ... each close one complete pair.
        ordered_pairs = []
        for second_idx in range(1, len(insights), 2):
            first = insights[second_idx - 1]
            second = insights[second_idx]
            ordered_pairs.append((first, second) if first.Weight < 0 else (second, first))

        # One spread = sell 1 short put, buy 1 long put.
        for short_leg, long_leg in ordered_pairs:
            targets.append(PortfolioTarget(short_leg.Symbol, -1))
            targets.append(PortfolioTarget(long_leg.Symbol, 1))

        return targets
from AlgorithmImports import *
class SpreadTracker:
    """Helper class to track spread execution and performance.

    A spread is tracked through a small dict held in
    ``current_spread_tracking`` (``None`` while no spread is in progress).
    Two recorded fills make a spread complete; completing it adds its
    premium to the running totals and clears the dict.
    """

    # Factor applied to the per-unit net credit to obtain the premium.
    CONTRACT_MULTIPLIER = 100

    def __init__(self):
        self.total_trades = 0
        self.winning_trades = 0
        self.total_premium_collected = 0
        self.total_pnl = 0
        self.current_spread_tracking = None

    def initialize_spread_tracking(self):
        """Start tracking a new spread, discarding any spread in progress."""
        self.current_spread_tracking = {
            'fills': 0,
            'short_fill_price': 0,
            'long_fill_price': 0,
            'entry_time': None
        }

    def record_fill(self, quantity, price):
        """Record one leg fill; ignored when no spread is being tracked.

        Args:
            quantity: Signed fill quantity. Negative is treated as the short
                leg, anything else as the long leg.
            price: Fill price stored for that leg.
        """
        tracking = self.current_spread_tracking
        if tracking is None:
            return

        tracking['fills'] += 1
        leg_key = 'short_fill_price' if quantity < 0 else 'long_fill_price'
        tracking[leg_key] = price

    def is_spread_complete(self):
        """Return ``True`` when a spread is tracked and has exactly two fills.

        Always returns a real ``bool`` (never ``None``).
        """
        tracking = self.current_spread_tracking
        return tracking is not None and tracking['fills'] == 2

    def get_spread_credit(self):
        """Return short fill price minus long fill price, or 0 if nothing is tracked."""
        tracking = self.current_spread_tracking
        if tracking is None:
            return 0
        return tracking['short_fill_price'] - tracking['long_fill_price']

    def complete_spread(self, algorithm):
        """Fold the completed spread into the totals, log it and reset tracking.

        Does nothing unless ``is_spread_complete()`` is true.

        Args:
            algorithm: Object whose ``Log(message)`` method receives the summary.
        """
        if not self.is_spread_complete():
            return

        net_credit = self.get_spread_credit()
        premium_collected = net_credit * self.CONTRACT_MULTIPLIER

        self.total_trades += 1
        self.total_premium_collected += premium_collected

        algorithm.Log(f"✅ PUT SPREAD EXECUTED:")
        algorithm.Log(f" Net Credit: ${net_credit:.2f}")
        algorithm.Log(f" Premium Collected: ${premium_collected:.2f}")
        algorithm.Log(f" Total Trades: {self.total_trades}")

        # Reset tracking for next spread
        self.current_spread_tracking = None

    def calculate_win_rate(self):
        """Return winning trades as a percentage of total trades (0 if none)."""
        if self.total_trades == 0:
            return 0
        return (self.winning_trades / self.total_trades) * 100
from AlgorithmImports import *
class SPXOptionsUniverseSelectionModel(UniverseSelectionModel):
    """Universe selection model that subscribes to SPX and its SPXW options."""

    def CreateUniverses(self, algorithm):
        """Add the SPX index and SPXW index options to ``algorithm``.

        The subscriptions are made directly on the algorithm, so no universe
        objects are handed back.

        Returns:
            An empty list.
        """
        # Underlying index first, then the options on it.
        algorithm.AddIndex("SPX", Resolution.Minute)

        spxw_option = algorithm.AddIndexOption("SPX", "SPXW", Resolution.Minute)
        spxw_option.SetFilter(self.OptionFilter)

        return []

    def OptionFilter(self, universe):
        """Apply the strike, expiration, delta and weekly filters to ``universe``."""
        narrowed = universe.Strikes(-20, 0)
        narrowed = narrowed.Expiration(0, 0)  # 0DTE only
        narrowed = narrowed.Delta(-0.6, -0.1)
        narrowed = narrowed.IncludeWeeklys()
        return narrowed.OnlyApplyFilterAtMarketOpen()