| Overall Statistics |
|
Total Orders 0 Average Win 0% Average Loss 0% Compounding Annual Return 0% Drawdown 0% Expectancy 0 Start Equity 100000 End Equity 100000 Net Profit 0% Sharpe Ratio 0 Sortino Ratio 0 Probabilistic Sharpe Ratio 0% Loss Rate 0% Win Rate 0% Profit-Loss Ratio 0 Alpha 0 Beta 0 Annual Standard Deviation 0 Annual Variance 0 Information Ratio -4.498 Tracking Error 0.089 Treynor Ratio 0 Total Fees $0.00 Estimated Strategy Capacity $0 Lowest Capacity Asset Portfolio Turnover 0% Drawdown Recovery 0 |
from AlgorithmImports import *
from collections import defaultdict
from datetime import timedelta
import numpy as np
class SkewCaptureStrategy(QCAlgorithm):
"""
Skew Capture Strategy: Captures rich implied volatility skew in equity options
by entering directional debit spreads when skew is historically elevated and
momentum aligns.
Strategy Logic:
- Monitors 6 tech stocks for IV skew opportunities
- Enters call spreads when bullish momentum + elevated call skew
- Enters put spreads when bearish momentum + elevated put skew
- Exits at 80% profit target, expiration, or time-based rules
NOTE:
Currently using the iv indicator, which uses end-of-day IV.
Known issue: the IV skew does not match the patterns seen on other
IV data providers (e.g. OQuants, Market Chameleon). We might need to
change the smoothing or the mirror contract. See this QC post, which may have some insights:
https://www.quantconnect.com/research/16977/greeks-and-iv-implementation/p1
NOTE:
Currently this code plots the IV skew, as the main priority at the moment is
making sure the IV skew shows the outlier anomalies / skew spikes that
tell us it's time to enter.
"""
def initialize(self) -> None:
    """Configure the backtest, strategy parameters, state containers and subscriptions.

    Sets the backtest window and starting cash, reads the tunable values
    through ``get_parameter``, creates the per-ticker history containers,
    subscribes each ticker's equity and option chain at hourly resolution,
    and requests a 140-calendar-day warm-up.

    Raises:
        ValueError: If the ``price_model_index`` parameter does not select
            one of the four supported option price models.
    """
    # ==================================================================
    # 1. BACKTEST CONFIGURATION
    # ==================================================================
    self.set_start_date(2025, 8, 1)
    self.set_end_date(2025, 9, 1)
    self.set_cash(100_000)

    # ==================================================================
    # 2. UNIVERSE DEFINITION
    # ==================================================================
    # Fixed watchlist (currently narrowed to one ticker while debugging skew).
    self.universe_tickers = ["AAPL"]
    # self.universe_tickers = ["GOOG", "AMC", "AMD", "META", "OPEN", "NFLX", "LLY"]

    # ==================================================================
    # 3. STRATEGY PARAMETERS (From MVP Spec)
    # ==================================================================
    # Absolute-delta ranges searched for each spread leg.
    self.long_delta_min = 0.45
    self.long_delta_max = 0.55
    self.short_delta_min = 0.10
    self.short_delta_max = 0.25

    # IV/RV ratio thresholds.
    self.long_ivrv_preferred = 1.2   # Preferred: fairly priced
    self.long_ivrv_max = 1.5         # Maximum tolerance
    self.short_ivrv_min = 1.3        # Minimum: overpriced
    self.short_ivrv_preferred = 1.6  # Preferred: very overpriced

    # Minimum (short-leg IV/RV - long-leg IV/RV).
    self.min_ivrv_differential = 0.3

    # Realized volatility lookback, in stored price observations.
    self.rv_lookback = 20

    # Skew Z-score parameters.
    self.skew_dte = self.get_parameter("skew_dte", 30)
    # Rolling window length for the Z-score (default 60 observations).
    self.skew_history_days = self.get_parameter("skew_z_lookback", 60)
    # Enter when Z <= -1.5. (An older comment claimed -2.0; the value in
    # effect is -1.5.)
    self.skew_zscore_threshold = -1.5
    self.skew_use_avg_atm_iv = 1 == self.get_parameter("skew_use_avg_atm_iv", 1)
    self.skew_normalize_on_atm = 1 == self.get_parameter("skew_normalize_on_atm", 1)

    # Momentum parameters.
    self.momentum_lookback = 60              # stored price observations
    self.bullish_momentum_threshold = 0.05   # +5% log return
    self.bearish_momentum_threshold = -0.05  # -5% log return

    # DTE window (calendar days) for the traded spread.
    self.min_dte = 7
    self.max_dte = 21

    # Entry filters.
    self.min_stock_price = 1.00       # minimum underlying price ($1.00)
    self.max_bid_ask_spread = 0.15    # max bid/ask spread as a fraction of mid
    self.min_reward_risk_ratio = 3.0  # minimum max-profit / max-loss
    self.pct_of_folio_size = 0.03     # 0.03 = 3% of portfolio value
    self.hour_of_entry = self.get_parameter("hour_of_entry", 15)

    # Exit parameters.
    self.profit_target_pct = 0.8      # exit at 80% of max profit
    self.days_before_expiry_exit = 2  # close if not OTM this many days before expiry

    # ==================================================================
    # 4. DATA STRUCTURES
    # ==================================================================
    # ticker -> Symbol of the equity / of the canonical option.
    self.equity_symbols = {}
    self.option_symbols = {}

    # ticker -> list of values, one entry per processed day.
    self.price_history = defaultdict(list)
    self.call_skew_history = defaultdict(list)
    self.put_skew_history = defaultdict(list)

    # ticker -> position info dict for open spreads.
    self.active_positions = {}

    # Date of the last daily processing pass (guards once-per-day execution).
    self.last_process_date = None

    # ==================================================================
    # 5. ADD SECURITIES
    # ==================================================================
    # Resolve the option price model once; it is the same for every ticker.
    price_models = [
        OptionPriceModels.black_scholes,
        OptionPriceModels.binomial_cox_ross_rubinstein,
        OptionPriceModels.bjerksund_stensland,
        OptionPriceModels.crank_nicolson_fd,
    ]
    price_model_index = self.get_parameter("price_model_index", 0)
    if not 0 <= price_model_index < len(price_models):
        raise ValueError(
            f"price_model_index must be in [0, {len(price_models) - 1}], "
            f"got {price_model_index!r}"
        )
    price_model_factory = price_models[price_model_index]

    for ticker in self.universe_tickers:
        equity = self.add_equity(ticker, Resolution.HOUR)
        self.equity_symbols[ticker] = equity.symbol

        option = self.add_option(ticker, Resolution.HOUR)
        # Each option subscription gets its own model instance.
        option.price_model = price_model_factory()
        # Coarse universe filter: weeklys included, +/-10 strikes, 5-40 days.
        # Finer DTE/delta filtering happens when the chain is processed.
        option.set_filter(lambda x: x.include_weeklys().strikes(-10, 10).expiration(5, 40))
        self.option_symbols[ticker] = option.symbol
        self.log(f"✓ Added {ticker} equity and options to universe")

    # ==================================================================
    # 6. WARM-UP PERIOD
    # ==================================================================
    # The momentum and skew Z-score windows each need 60+ daily
    # observations; 140 calendar days leaves headroom over that.
    self.set_warm_up(timedelta(days=140))

    self.log("=" * 60)
    self.log("Skew Capture Strategy Initialized")
    self.log(f"Universe: {', '.join(self.universe_tickers)}")
    self.log(f"Capital: ${self.portfolio.cash:,.2f}")
    self.log("=" * 60)
# ======================================================================
# DATA PROCESSING & METRIC CALCULATION
# ======================================================================
def on_data(self, slice: Slice) -> None:
    """Run exit checks, then the once-per-day history update and entry scan.

    Exit checks run on every slice once warm-up is over. Everything else
    runs at most once per calendar day, on a slice whose hour equals
    ``hour_of_entry``: price and skew history are recorded (during warm-up
    too), and the entry scan runs only after warm-up.

    Args:
        slice: Data slice for the current time step.
    """
    if not self.is_warming_up:
        # Manage open positions before anything else.
        self._check_exits(slice)

    now = self.time
    if now.hour != self.hour_of_entry:
        return

    # Once-per-day guard; applies during warm-up as well.
    today = now.date()
    if today == self.last_process_date:
        return
    self.last_process_date = today

    # History is collected both during warm-up and in live operation.
    self._accumulate_price_history(slice)
    self._accumulate_skew_history(slice)

    if not self.is_warming_up:
        self._scan_for_entries(slice)
def _accumulate_price_history(self, slice: Slice) -> None:
    """Append the current bar close of every tracked equity to its price history.

    Each ticker's history is capped at
    ``max(momentum_lookback, rv_lookback) + 1`` entries, which is what the
    momentum and realized-volatility calculations read.

    Args:
        slice: Data slice whose ``bars`` supply the closes.
    """
    cap = max(self.momentum_lookback, self.rv_lookback) + 1
    bars = slice.bars
    for ticker, symbol in self.equity_symbols.items():
        if symbol not in bars:
            continue
        series = self.price_history[ticker]
        series.append(bars[symbol].close)
        if len(series) > cap:
            # Keep only the newest `cap` observations.
            self.price_history[ticker] = series[-cap:]
def _accumulate_skew_history(self, slice: Slice) -> None:
    """Compute and store today's call and put skew for every ticker with a chain.

    Runs during warm-up and regular operation. For each ticker the
    contracts of the expiry closest to ``skew_dte`` are used; each non-None
    skew is appended to its history, which is capped at
    ``skew_history_days + 10`` entries. Call skew is also plotted once
    warm-up is over.

    Args:
        slice: Data slice whose ``option_chains`` supply the contracts.
    """
    max_skew_history = self.skew_history_days + 10

    for ticker in self.universe_tickers:
        # Look the chain up by the canonical option Symbol stored at
        # subscription time: that is the documented key for option_chains,
        # whereas a bare ticker string relies on implicit string -> Symbol
        # resolution.
        option_symbol = self.option_symbols.get(ticker)
        if option_symbol is None:
            continue
        chain = slice.option_chains.get(option_symbol)
        if chain is None or len(chain) == 0:
            continue

        contracts = self._filter_contracts_by_dte(chain, self.skew_dte)
        if not contracts:
            continue

        # Only the call-side calculation logs its result (log_msg=True).
        call_metrics = self._calculate_skew(contracts, OptionRight.CALL, True)
        put_metrics = self._calculate_skew(contracts, OptionRight.PUT)

        for history, skew in (
            (self.call_skew_history, call_metrics['skew']),
            (self.put_skew_history, put_metrics['skew']),
        ):
            if skew is None:
                continue
            history[ticker].append(skew)
            if len(history[ticker]) > max_skew_history:
                history[ticker] = history[ticker][-max_skew_history:]

        call_skew = call_metrics['skew']
        if call_skew is not None and not self.is_warming_up:
            self.plot("call skew", ticker, call_skew)
# ======================================================================
# METRIC CALCULATIONS
# ======================================================================
def _calculate_realized_volatility(self, ticker: str) -> float:
"""
Calculate 20-day annualized realized volatility using log returns
RV = sqrt(252) * StDev(ln(P_t / P_{t-1})) for t=1...20
Returns:
Annualized volatility as decimal (e.g., 0.25 = 25% vol)
"""
prices = self.price_history[ticker]
if len(prices) < self.rv_lookback + 1:
return None
# Get last 21 prices (to calculate 20 returns)
recent_prices = prices[-(self.rv_lookback + 1):]
# Calculate log returns
log_returns = []
for i in range(1, len(recent_prices)):
log_ret = np.log(recent_prices[i] / recent_prices[i-1])
log_returns.append(log_ret)
# Annualized standard deviation
daily_std = np.std(log_returns, ddof=1)
annualized_vol = daily_std * np.sqrt(252)
return annualized_vol
def _calculate_momentum(self, ticker: str) -> float:
"""
Calculate 60-day momentum as log return
MOM = ln(P_current / P_60_days_ago)
Returns:
Log return as decimal (e.g., 0.05 = 5% momentum)
"""
prices = self.price_history[ticker]
if len(prices) < self.momentum_lookback + 1:
return None
current_price = prices[-1]
past_price = prices[-(self.momentum_lookback + 1)]
momentum = np.log(current_price / past_price)
return momentum
def _calculate_iv_rv_ratio(self, iv: float, rv: float) -> float:
"""
Calculate the IV/RV ratio.
"""
if rv is None or rv == 0:
return None
return iv / rv
def _get_iv_from_indicator(self, option_contract, underlying_symbol: Symbol) -> float:
    """Return the implied volatility of ``option_contract`` from an IV indicator.

    The indicator is built from the contract and its mirror (opposite right,
    same strike and expiry) and fed one daily bar of history for the
    underlying, the contract and the mirror.

    Args:
        option_contract: Contract whose IV is wanted.
        underlying_symbol: Symbol of the underlying equity.

    Returns:
        The IV as a float, or None when the underlying has no registered
        option symbol, the indicator is not ready, or any error occurs
        (errors are logged, not raised).
    """
    # NOTE(review): every call creates a fresh indicator through self.iv and
    # may add a mirror contract subscription; nothing here releases either.
    # Confirm whether they should be cached or deregistered.
    try:
        if option_contract.right == OptionRight.CALL:
            mirror_right = OptionRight.PUT
        else:
            mirror_right = OptionRight.CALL

        contract_symbol = option_contract.symbol

        # Only proceed for underlyings whose option chain was registered.
        if self.option_symbols.get(contract_symbol.underlying.value) is None:
            return None

        contract_id = contract_symbol.id
        mirror_symbol = Symbol.create_option(
            contract_symbol.underlying,
            contract_id.market,
            contract_id.option_style,
            mirror_right,
            contract_id.strike_price,
            contract_id.date
        )

        # The mirror must be subscribed before it can feed the indicator.
        if mirror_symbol not in self.securities:
            self.add_option_contract(mirror_symbol, Resolution.HOUR)

        iv_indicator = self.iv(contract_symbol, mirror_symbol)

        # Called for its effect on the indicator; the returned history is
        # not used. One daily bar gives an end-of-day style value.
        self.indicator_history(
            iv_indicator,
            [underlying_symbol, contract_symbol, mirror_symbol],
            1,
            Resolution.DAILY
        )

        if not iv_indicator.is_ready:
            return None
        return float(iv_indicator.current.value)
    except Exception as e:
        # Deliberate best-effort: a failed IV lookup must not stop the scan.
        self.log(f"Error getting IV from indicator: {e}")
        return None
def _calculate_skew(self, contracts: list, right: OptionRight, log_msg:bool = False) -> dict:
    """Calculate the IV skew of the 25-delta option relative to ATM.

    All IVs come from ``_get_iv_from_indicator`` (indicator with mirror
    option), not from the slice.

        skew = (ATM_IV - IV_25_delta) / ATM_IV   if skew_normalize_on_atm
        skew = (ATM_IV - IV_25_delta)            otherwise

    ATM_IV is the average of the ~50-delta call and ~50-delta put IVs when
    ``skew_use_avg_atm_iv`` is set, otherwise the ~50-delta IV of ``right``.
    Both ATM IVs must be available and positive either way.

    Args:
        contracts: Option contracts of a single expiration (calls and puts).
        right: OptionRight.CALL or OptionRight.PUT - the side whose
            25-delta contract is measured.
        log_msg: When True, log the computed skew and its DTE.

    Returns:
        On success a dict with 'skew', 'skew_dte', 'skew_expiry',
        'skew_right', 'atm_iv' and 'otm_iv'. On failure a dict with those
        same keys plus the legacy 'iv_short_delta', 'strike_long_delta' and
        'strike_short_delta' keys, all set to None, so callers can always
        read ``result['skew']``.
    """
    def unavailable() -> dict:
        # Fresh dict per call so callers can never share or mutate one.
        return {
            'skew': None, 'skew_dte': None, 'skew_expiry': None,
            'skew_right': None, 'atm_iv': None, 'otm_iv': None,
            'iv_short_delta': None, 'strike_long_delta': None,
            'strike_short_delta': None,
        }

    def nearest_delta(option_right, target_delta):
        # Contract of the given right whose delta is closest to the target;
        # ties resolve to the first contract in list order.
        return min(
            (c for c in contracts
             if c.right == option_right and c.greeks.delta is not None),
            key=lambda c: abs(c.greeks.delta - target_delta),
            default=None
        )

    if not contracts:
        return unavailable()

    ticker = contracts[0].symbol.underlying.value
    underlying_symbol = self.equity_symbols.get(ticker)
    if underlying_symbol is None:
        return unavailable()

    # Pick all three contracts before touching any indicator: selection is
    # cheap, while every IV lookup creates an indicator and requests history.
    call_50d = nearest_delta(OptionRight.CALL, 0.50)
    put_50d = nearest_delta(OptionRight.PUT, -0.50)
    # Call deltas are positive, put deltas negative.
    otm_delta = 0.25 if right == OptionRight.CALL else -0.25
    otm_contract = nearest_delta(right, otm_delta)
    if call_50d is None or put_50d is None or otm_contract is None:
        return unavailable()

    # --- ATM IV from the ~50-delta call and put ---
    call_50d_iv = self._get_iv_from_indicator(call_50d, underlying_symbol)
    if call_50d_iv is None or call_50d_iv <= 0:
        return unavailable()
    put_50d_iv = self._get_iv_from_indicator(put_50d, underlying_symbol)
    if put_50d_iv is None or put_50d_iv <= 0:
        return unavailable()

    if self.skew_use_avg_atm_iv:
        atm_iv = (call_50d_iv + put_50d_iv) / 2
    else:
        atm_iv = call_50d_iv if right == OptionRight.CALL else put_50d_iv

    # --- OTM (25-delta) IV ---
    otm_iv = self._get_iv_from_indicator(otm_contract, underlying_symbol)
    if otm_iv is None or otm_iv <= 0:
        return unavailable()

    skew = atm_iv - otm_iv
    if self.skew_normalize_on_atm:
        skew = skew / atm_iv

    expiry_date = otm_contract.expiry.date()
    dte = (expiry_date - self.time.date()).days
    # 'PUT ' is padded so both labels have the same width in the log.
    opt_right = 'CALL' if right == OptionRight.CALL else 'PUT '
    if log_msg:
        self.log(f"{opt_right} SKEW_CALC: \t skew:{skew:.6f} DTE:{dte} - ({expiry_date.strftime('%m-%d-%Y')}) " )

    return {
        'skew': skew,
        'skew_dte': dte,
        'skew_expiry': expiry_date,
        'skew_right': opt_right,
        'atm_iv': atm_iv,
        'otm_iv': otm_iv,
    }
def _calculate_skew_zscore(self, ticker: str, current_skew: float,
skew_type: str) -> float:
"""
Calculate Z-score of current skew relative to 60-day history
Z = (Current - 60D_Mean) / 60D_StDev
Args:
ticker: Ticker symbol
current_skew: Current skew value
skew_type: 'call' or 'put'
Returns:
Z-score value or None if insufficient history
"""
# Select appropriate history
if skew_type == 'call':
history = self.call_skew_history[ticker]
else:
history = self.put_skew_history[ticker]
# Need at least n days of history
if len(history) < self.skew_history_days:
return None
# Get last n days; exclude most recent
recent_history = history[-self.skew_history_days-1:-1]
# Calculate mean and std dev
mean = np.mean(recent_history)
std = np.std(recent_history, ddof=1)
# Avoid division by zero
if std == 0:
return None
# Calculate Z-score
z_score = (current_skew - mean) / std
return z_score
# ======================================================================
# ENTRY SCANNING & SIGNAL GENERATION
# ======================================================================
def _scan_for_entries(self, slice: Slice) -> None:
    """Scan every ticker without an open position for a spread entry setup.

    Cheap filters run first (open position, bar and chain availability,
    minimum price, momentum history, momentum direction). Only tickers that
    pass them pay for the indicator-based skew calculation, after which the
    metrics are handed to ``_process_chain_for_entry``.

    Tickers whose momentum lies strictly between the bearish and bullish
    thresholds are skipped silently: neither spread can be entered for
    them, so the skew calculation would be wasted.

    Args:
        slice: Data slice for the current time step.
    """
    # TODO: reinstate the per-ticker "METRIC:" summary line (price, RV20,
    # MOM60, call/put skew and Z) if it is needed again; it was disabled
    # and its dead string formatting has been removed.
    for ticker in self.universe_tickers:
        # One position per ticker.
        if ticker in self.active_positions:
            continue

        symbol = self.equity_symbols[ticker]
        if symbol not in slice.bars:
            continue
        underlying_price = slice.bars[symbol].close

        # Look the chain up by the canonical option Symbol stored at
        # subscription time: that is the documented key for option_chains,
        # whereas a bare ticker string relies on implicit string -> Symbol
        # resolution.
        option_symbol = self.option_symbols.get(ticker)
        if option_symbol is None:
            continue
        chain = slice.option_chains.get(option_symbol)
        if chain is None:
            continue
        if len(chain) == 0:
            self.log(f"DBG: {ticker} skipped. No option chain data.")
            continue

        # Filter: minimum stock price.
        if underlying_price < self.min_stock_price:
            self.log(f"DBG: {ticker} skipped. Price ${underlying_price:.2f} < Min ${self.min_stock_price:.2f}")
            continue

        # Filter: enough history for momentum.
        momentum = self._calculate_momentum(ticker)
        if momentum is None:
            self.log(f"DBG: {ticker} skipped. No momentum data.")
            continue

        # Filter: momentum must point somewhere before we dig into skew.
        if self.bearish_momentum_threshold < momentum < self.bullish_momentum_threshold:
            continue

        rv = self._calculate_realized_volatility(ticker)

        # Skew metrics on the expiry closest to skew_dte. History storage
        # happens in _accumulate_skew_history, not here.
        call_metrics = None
        put_metrics = None
        call_zscore = None
        put_zscore = None
        contracts = self._filter_contracts_by_dte(chain, self.skew_dte)
        if contracts:
            call_metrics = self._calculate_skew(contracts, OptionRight.CALL)
            put_metrics = self._calculate_skew(contracts, OptionRight.PUT)
            if call_metrics['skew'] is not None:
                call_zscore = self._calculate_skew_zscore(ticker, call_metrics['skew'], 'call')
            if put_metrics['skew'] is not None:
                put_zscore = self._calculate_skew_zscore(ticker, put_metrics['skew'], 'put')

        self._process_chain_for_entry(ticker, chain, underlying_price, momentum, rv,
                                      call_metrics, put_metrics, call_zscore, put_zscore)
def _process_chain_for_entry(self, ticker: str, chain,
underlying_price: float, momentum: float, rv: float,
call_metrics: dict, put_metrics: dict,
call_zscore: float, put_zscore: float) -> None:
"""
Process option chain to find valid entry setups
Args:
ticker: Ticker symbol
chain: Option chain data
underlying_price: Current underlying price
momentum: 60-day momentum value
rv: Realized volatility
call_metrics: Dict of call skew metrics for target expiry
put_metrics: Dict of put skew metrics for target expiry
call_zscore: Current call skew Z-score
put_zscore: Current put skew Z-score
"""
# Filter chain for valid expiration (7-21 DTE)
valid_expirations = self._filter_expirations(chain)
if not valid_expirations:
# Log the filter failure
self.log(f"DBG: {ticker} skipped. No valid expiry in [{self.min_dte}-{self.max_dte}] DTE.")
return
# Use nearest valid expiration
# TODO:
# we should actually scan all expirations when we searc
# for the best spread. i think we can do this inside
# the 'attempt' methods, where we do that searching
target_expiry = min(valid_expirations)
days_to_expiry = (target_expiry.date() - self.time.date()).days
# #TODO: Check for earnings between now and expiration
# Note: This is commented out in original, but if implemented, it's a filter
# if self._has_earnings_before_expiry(ticker, target_expiry):
# return
# NOTE: Skew calculation has been moved to _scan_for_entries and passed in.
# Check for bullish call spread entry
if momentum >= self.bullish_momentum_threshold and call_metrics is not None and call_metrics['skew'] is not None:
if call_zscore is not None:
# Log momentum and skew condition check
if call_zscore > self.skew_zscore_threshold:
self.log(f"DBG: {ticker} Call Spread Fail. Z-score {call_zscore:.2f} > Threshold {self.skew_zscore_threshold:.2f}. MOM: {momentum*100:.2f}%.")
if call_zscore <= self.skew_zscore_threshold:
self.log(f"DBG: {ticker} Call Spread Pass MOM/SKEW. MOM: {momentum*100:.2f}%, Z: {call_zscore:.2f}. DTE: {days_to_expiry}.")
self._attempt_call_spread_entry(ticker, chain, target_expiry, underlying_price,
momentum, call_zscore, rv)
else:
self.log(f"DBG: {ticker} Call Spread Fail. Insufficient skew history for Z-score.")
# Check for bearish put spread entry
if momentum <= self.bearish_momentum_threshold and put_metrics is not None and put_metrics['skew'] is not None:
if put_zscore is not None:
# Log momentum and skew condition check
if put_zscore > self.skew_zscore_threshold:
self.log(f"DBG: {ticker} Put Spread Fail. Z-score {put_zscore:.2f} > Threshold {self.skew_zscore_threshold:.2f}. MOM: {momentum*100:.2f}%.")
if put_zscore <= self.skew_zscore_threshold:
self.log(f"DBG: {ticker} Put Spread Pass MOM/SKEW. MOM: {momentum*100:.2f}%, Z: {put_zscore:.2f}. DTE: {days_to_expiry}.")
self._attempt_put_spread_entry(ticker, chain, target_expiry, underlying_price,
momentum, put_zscore, rv)
else:
self.log(f"DBG: {ticker} Put Spread Fail. Insufficient skew history for Z-score.")
def _filter_expirations(self, chain) -> list:
"""
Filter option chain for expirations within DTE range
------------------
TODO:
We seem to run this logic to filter expirations all
the time, but we can probbaly do it only once.
------------------
Returns:
List of valid expiration dates
"""
valid_expirations = []
for contract in chain:
dte = (contract.expiry.date() - self.time.date()).days
if self.min_dte <= dte <= self.max_dte:
if contract.expiry not in valid_expirations:
valid_expirations.append(contract.expiry)
return valid_expirations
def _filter_contracts_by_dte(self, chain, target_dte: int) -> list:
"""
Finds the single expiration date closest to the target_dte
and returns all contracts for that expiration.
Args:
chain: The option chain slice.
target_dte: The target days-to-expiration (e.g., 30).
Returns:
A list of OptionContract objects for the closest DTE,
or an empty list if the chain is empty.
"""
if not chain:
return []
# Find all unique expiration dates and calculate their DTE
exp_to_dte = {
c.expiry: (c.expiry.date() - self.time.date()).days
for c in chain
}
if not exp_to_dte:
return []
# Find the expiration date that is closest to the target_dte
closest_expiry = min(
exp_to_dte.keys(),
key=lambda expiry: abs(exp_to_dte[expiry] - target_dte)
)
# Return all contracts associated with that closest expiration date
return [c for c in chain if c.expiry == closest_expiry]
def _has_earnings_before_expiry(self, ticker: str, expiry: datetime) -> bool:
"""
Check if earnings are scheduled between now and expiration
Args:
ticker: Ticker symbol
expiry: Option expiration date
Returns:
True if earnings scheduled, False otherwise
"""
# Get the equity symbol
symbol = self.equity_symbols[ticker]
# Access fundamental data for earnings date
# Note: In QuantConnect, earnings dates may not always be available
# This is a basic implementation
if hasattr(self.securities[symbol], 'fundamentals') and self.securities[symbol].fundamentals is not None:
fundamentals = self.securities[symbol].fundamentals
if hasattr(fundamentals, 'earning_reports') and fundamentals.earning_reports is not None:
earning_reports = fundamentals.earning_reports
if hasattr(earning_reports, 'file_date') and earning_reports.file_date is not None:
next_earnings = earning_reports.file_date
# Check if earnings between now and expiry
if self.time.date() < next_earnings.date() <= expiry.date():
self.log(f"⊗ {ticker}: Earnings on {next_earnings.date()}, skipping")
return True
return False
def _attempt_call_spread_entry(self, ticker: str, chain, expiry: datetime,
                               underlying_price: float, momentum: float,
                               call_zscore: float, rv: float) -> None:
    """Try to open a bullish call debit spread on ``expiry``.

    Picks the leg pair via ``_find_optimal_spread``, then requires both
    legs to pass the liquidity check, a positive mid-price debit, and a
    reward/risk ratio of at least ``min_reward_risk_ratio`` before calling
    ``_enter_call_spread``. Every rejection is logged with its reason.

    Spread: long 45-55 delta call / short 10-25 delta call.

    Args:
        ticker: Ticker symbol.
        chain: Option chain data.
        expiry: Target expiration.
        underlying_price: Current underlying price (not used here).
        momentum: Momentum value, passed through to the entry.
        call_zscore: Call skew Z-score, passed through to the entry.
        rv: Realized volatility used for the IV/RV search.
    """
    call_contracts = [
        c for c in chain
        if c.expiry == expiry and c.right == OptionRight.CALL
    ]
    if not call_contracts:
        self.log(f"DBG: {ticker} Call Spread Fail. No call contracts for expiry {expiry.date()}.")
        return

    best_spread = self._find_optimal_spread(
        contracts=call_contracts,
        rv=rv,
        right=OptionRight.CALL
    )
    if best_spread is None:
        self.log(f"DBG: {ticker} Call Spread Fail. No valid strike combination found.")
        return

    long_contract = best_spread['long_contract']
    short_contract = best_spread['short_contract']
    self.log(f"DBG: {ticker} Call Strikes: Long {abs(long_contract.greeks.delta)*100:.0f}Δ @ {long_contract.strike:.2f} (IV: {long_contract.implied_volatility:.4f}, IV/RV: {best_spread['long_ivrv']:.2f}), Short {abs(short_contract.greeks.delta)*100:.0f}Δ @ {short_contract.strike:.2f} (IV: {short_contract.implied_volatility:.4f}, IV/RV: {best_spread['short_ivrv']:.2f}). Differential: {best_spread['differential']:.2f}")

    # Liquidity gate: both legs are always checked so the log shows each result.
    long_liq = self._check_liquidity(long_contract)
    short_liq = self._check_liquidity(short_contract)
    if not (long_liq and short_liq):
        self.log(f"DBG: {ticker} Call Spread Fail. Liquidity Check. Long OK: {long_liq}, Short OK: {short_liq}.")
        return

    # Price the spread at mid: pay the long leg, receive the short leg.
    long_mid = (long_contract.bid_price + long_contract.ask_price) / 2
    short_mid = (short_contract.bid_price + short_contract.ask_price) / 2
    debit = long_mid - short_mid
    if debit <= 0:
        self.log(f"DBG: {ticker} Call Spread Fail. Debit is ${debit:.4f}.")
        return

    # Call spread: the short strike sits above the long strike.
    max_loss = debit
    max_profit = (short_contract.strike - long_contract.strike) - debit

    if max_loss > 0:
        reward_risk = max_profit / max_loss
    else:
        reward_risk = 0
    if reward_risk < self.min_reward_risk_ratio:
        self.log(f"DBG: {ticker} Call Spread Fail. R/R {reward_risk:.2f}x < Min {self.min_reward_risk_ratio:.2f}x.")
        return

    # Summary of the chosen spread, handed to the entry routine.
    long_iv = long_contract.implied_volatility
    short_iv = short_contract.implied_volatility
    call_metrics = {
        'skew': (long_iv - short_iv) / long_iv,
        'iv_short_delta': short_iv,
        'reward_risk': reward_risk,
        'long_ivrv': best_spread['long_ivrv'],
        'short_ivrv': best_spread['short_ivrv'],
        'differential': best_spread['differential']
    }

    # All filters passed - enter the trade.
    self._enter_call_spread(ticker, long_contract, short_contract, debit,
                            max_profit, max_loss, expiry, momentum, call_zscore, call_metrics)
def _attempt_put_spread_entry(self, ticker: str, chain, expiry: datetime,
                              underlying_price: float, momentum: float,
                              put_zscore: float, rv: float) -> None:
    """Try to open a bearish put debit spread on ``expiry``.

    Picks the leg pair via ``_find_optimal_spread``, then requires both
    legs to pass the liquidity check, a positive mid-price debit, and a
    reward/risk ratio of at least ``min_reward_risk_ratio`` before calling
    ``_enter_put_spread``. Every rejection is logged with its reason.

    Spread: long 45-55 delta put / short 10-25 delta put.

    Args:
        ticker: Ticker symbol.
        chain: Option chain data.
        expiry: Target expiration.
        underlying_price: Current underlying price (not used here).
        momentum: Momentum value, passed through to the entry.
        put_zscore: Put skew Z-score, passed through to the entry.
        rv: Realized volatility used for the IV/RV search.
    """
    put_contracts = [
        c for c in chain
        if c.expiry == expiry and c.right == OptionRight.PUT
    ]
    if not put_contracts:
        self.log(f"DBG: {ticker} Put Spread Fail. No put contracts for expiry {expiry.date()}.")
        return

    best_spread = self._find_optimal_spread(
        contracts=put_contracts,
        rv=rv,
        right=OptionRight.PUT
    )
    if best_spread is None:
        self.log(f"DBG: {ticker} Put Spread Fail. No valid strike combination found.")
        return

    long_contract = best_spread['long_contract']
    short_contract = best_spread['short_contract']
    self.log(f"DBG: {ticker} Put Strikes: Long {abs(long_contract.greeks.delta)*100:.0f}Δ @ {long_contract.strike:.2f} (IV: {long_contract.implied_volatility:.4f}, IV/RV: {best_spread['long_ivrv']:.2f}), Short {abs(short_contract.greeks.delta)*100:.0f}Δ @ {short_contract.strike:.2f} (IV: {short_contract.implied_volatility:.4f}, IV/RV: {best_spread['short_ivrv']:.2f}). Differential: {best_spread['differential']:.2f}")

    # Liquidity gate: both legs are always checked so the log shows each result.
    long_liq = self._check_liquidity(long_contract)
    short_liq = self._check_liquidity(short_contract)
    if not (long_liq and short_liq):
        self.log(f"DBG: {ticker} Put Spread Fail. Liquidity Check. Long OK: {long_liq}, Short OK: {short_liq}.")
        return

    # Price the spread at mid: pay the long leg, receive the short leg.
    long_mid = (long_contract.bid_price + long_contract.ask_price) / 2
    short_mid = (short_contract.bid_price + short_contract.ask_price) / 2
    debit = long_mid - short_mid
    if debit <= 0:
        self.log(f"DBG: {ticker} Put Spread Fail. Debit is ${debit:.4f}.")
        return

    # Put spread: the short strike sits below the long strike.
    max_loss = debit
    max_profit = (long_contract.strike - short_contract.strike) - debit

    if max_loss > 0:
        reward_risk = max_profit / max_loss
    else:
        reward_risk = 0
    if reward_risk < self.min_reward_risk_ratio:
        self.log(f"DBG: {ticker} Put Spread Fail. R/R {reward_risk:.2f}x < Min {self.min_reward_risk_ratio:.2f}x.")
        return

    # Summary of the chosen spread, handed to the entry routine.
    long_iv = long_contract.implied_volatility
    short_iv = short_contract.implied_volatility
    put_metrics = {
        'skew': (long_iv - short_iv) / long_iv,
        'iv_short_delta': short_iv,
        'reward_risk': reward_risk,
        'long_ivrv': best_spread['long_ivrv'],
        'short_ivrv': best_spread['short_ivrv'],
        'differential': best_spread['differential']
    }

    # All filters passed - enter the trade.
    self._enter_put_spread(ticker, long_contract, short_contract, debit,
                           max_profit, max_loss, expiry, momentum, put_zscore, put_metrics)
def _find_optimal_spread(self, contracts: list, rv: float, right: OptionRight) -> dict:
"""
Search across delta ranges to find the optimal spread that maximizes IV/RV differential
Args:
contracts: List of option contracts (calls or puts) for target expiry
rv: Realized volatility
right: OptionRight.CALL or OptionRight.PUT
Returns:
Dict with optimal spread details or None if no valid combination found
{
'long_contract': contract,
'short_contract': contract,
'long_ivrv': float,
'short_ivrv': float,
'differential': float,
'reward_risk': float
}
"""
# Filter contracts with valid delta and IV
valid_contracts = [
c for c in contracts
if c.greeks.delta is not None
and c.implied_volatility is not None
and c.implied_volatility > 0
]
if not valid_contracts:
return None
# RV validation
if rv is None or rv == 0:
return None
# Determine delta sign based on option type
delta_sign = 1 if right == OptionRight.CALL else -1
# Find long leg candidates (45-55Δ)
long_candidates = [
c for c in valid_contracts
if self.long_delta_min <= abs(c.greeks.delta) <= self.long_delta_max
]
# Find short leg candidates (10-25Δ)
short_candidates = [
c for c in valid_contracts
if self.short_delta_min <= abs(c.greeks.delta) <= self.short_delta_max
]
if not long_candidates or not short_candidates:
return None
# Search for best combination
best_spread = None
best_differential = -float('inf')
for long_contract in long_candidates:
# Calculate IV/RV for long leg
long_ivrv = long_contract.implied_volatility / rv
# Check long leg pricing thresholds
if long_ivrv > self.long_ivrv_max:
continue # Long leg too expensive
for short_contract in short_candidates:
# Validate strike ordering
if right == OptionRight.CALL:
if short_contract.strike <= long_contract.strike:
continue # Short strike must be higher for call spreads
else: # PUT
if short_contract.strike >= long_contract.strike:
continue # Short strike must be lower for put spreads
# Calculate IV/RV for short leg
short_ivrv = short_contract.implied_volatility / rv
# Check short leg pricing thresholds
if short_ivrv < self.short_ivrv_min:
continue # Short leg not expensive enough
# Calculate differential
differential = short_ivrv - long_ivrv
# Check differential requirement
if differential < self.min_ivrv_differential:
continue # Differential not large enough
# Validate relative IV: short leg IV must be > long leg IV
if short_contract.implied_volatility <= long_contract.implied_volatility:
continue
# Update best spread if this differential is better
if differential > best_differential:
best_differential = differential
best_spread = {
'long_contract': long_contract,
'short_contract': short_contract,
'long_ivrv': long_ivrv,
'short_ivrv': short_ivrv,
'differential': differential
}
return best_spread
def _check_liquidity(self, contract) -> bool:
"""
Check if option contract meets liquidity requirements
Bid-ask spread must be <= 15% of mid price
"""
if contract.bid_price <= 0 or contract.ask_price <= 0:
return False
mid = (contract.bid_price + contract.ask_price) / 2
spread = contract.ask_price - contract.bid_price
if mid == 0:
return False
spread_pct = spread / mid
return spread_pct <= self.max_bid_ask_spread
def _enter_call_spread(self, ticker: str, long_contract, short_contract,
debit: float, max_profit: float, max_loss: float,
expiry: datetime, momentum: float, call_zscore: float, call_metrics: dict) -> None:
"""
Execute entry of bullish call debit spread and record position
"""
# Calculate quantity based on capital allocation
# Allocate 5% of portfolio to each trade (conservative sizing)
allocation = self.portfolio.total_portfolio_value * self.pct_of_folio_size
max_quantity = int(allocation / (debit * 100)) # 100 shares per contract
if max_quantity < 1:
self.log(f"DBG: {ticker} Call Spread Fail. Max quantity {max_quantity} < 1. Debit: ${debit:.2f}, Allocation: ${allocation:.2f}.")
return
quantity = max_quantity
# Get additional metrics for tags (already calculated or easily accessible)
underlying_price = self.securities[self.equity_symbols[ticker]].price
rv = self._calculate_realized_volatility(ticker)
# Create concise order tag
tag = (f" [⬆ CALL] Px:{underlying_price:.2f} | RR:{call_metrics['reward_risk']:.2f} | "
f"RV:{rv:.3f} | MOM:{momentum:.3f} | "
f"CSk:{call_metrics['skew']:.4f} | Zs:{call_zscore:.2f} | "
f"IVRV_L:{call_metrics['long_ivrv']:.2f} | IVRV_S:{call_metrics['short_ivrv']:.2f} | Diff:{call_metrics['differential']:.2f} | "
f"MaxP:{max_profit * quantity * 100:.2f} | MaxL:{max_loss * quantity * 100:.2f} | "
f"LSpread:{(long_contract.ask_price - long_contract.bid_price):.2f} | "
f"SSpread:{(short_contract.ask_price - short_contract.bid_price):.2f}")
# Execute market orders
self.market_order(long_contract.symbol, quantity, tag=tag)
self.market_order(short_contract.symbol, -quantity, tag=tag)
# Record position
self.active_positions[ticker] = {
'type': 'call_spread',
'long_symbol': long_contract.symbol,
'short_symbol': short_contract.symbol,
'long_strike': long_contract.strike,
'short_strike': short_contract.strike,
'quantity': quantity,
'entry_date': self.time,
'expiry': expiry,
'entry_debit': debit,
'max_profit': max_profit,
'max_loss': max_loss,
'profit_target': max_profit * self.profit_target_pct,
'entry_momentum': momentum,
'entry_zscore': call_zscore
}
self.log("═" * 60)
self.log(f"⬆ ENTERED BULLISH CALL SPREAD - {ticker}")
self.log(f" Long: {quantity}x {long_contract.strike:.2f} Call @ {(long_contract.bid_price + long_contract.ask_price)/2:.2f}")
self.log(f" Short: {quantity}x {short_contract.strike:.2f} Call @ {(short_contract.bid_price + short_contract.ask_price)/2:.2f}")
self.log(f" Net Debit: ${debit * quantity * 100:,.2f}")
self.log(f" Max Profit: ${max_profit * quantity * 100:,.2f} | Max Loss: ${max_loss * quantity * 100:,.2f}")
self.log(f" R/R: {max_profit/max_loss:.2f}x | Expiry: {expiry.date()} ({(expiry.date() - self.time.date()).days} DTE)")
self.log(f" Momentum: {momentum*100:.2f}% | Call Skew Z: {call_zscore:.2f}")
self.log(f" IV/RV: Long {call_metrics['long_ivrv']:.2f}, Short {call_metrics['short_ivrv']:.2f}, Diff {call_metrics['differential']:.2f}")
self.log("═" * 60)
def _enter_put_spread(self, ticker: str, long_contract, short_contract,
debit: float, max_profit: float, max_loss: float,
expiry: datetime, momentum: float, put_zscore: float, put_metrics: dict) -> None:
"""
Execute entry of bearish put debit spread and record position
"""
# Calculate quantity
allocation = self.portfolio.total_portfolio_value * 0.05
max_quantity = int(allocation / (debit * 100))
if max_quantity < 1:
self.log(f"DBG: {ticker} Put Spread Fail. Max quantity {max_quantity} < 1. Debit: ${debit:.2f}, Allocation: ${allocation:.2f}.")
return
quantity = max_quantity
# Get additional metrics for tags (already calculated or easily accessible)
underlying_price = self.securities[self.equity_symbols[ticker]].price
rv = self._calculate_realized_volatility(ticker)
# Create concise order tag
tag = (f" [⬇ PUT] Px:{underlying_price:.2f} | RR:{put_metrics['reward_risk']:.2f} | "
f"RV:{rv:.3f} | MOM:{momentum:.3f} | "
f"PSk:{put_metrics['skew']:.4f} | Zs:{put_zscore:.2f} | "
f"IVRV_L:{put_metrics['long_ivrv']:.2f} | IVRV_S:{put_metrics['short_ivrv']:.2f} | Diff:{put_metrics['differential']:.2f} | "
f"MaxP:{max_profit * quantity * 100:.2f} | MaxL:{max_loss * quantity * 100:.2f} | "
f"LSpread:{(long_contract.ask_price - long_contract.bid_price):.2f} | "
f"SSpread:{(short_contract.ask_price - short_contract.bid_price):.2f}")
# Execute market orders
self.market_order(long_contract.symbol, quantity, tag=tag)
self.market_order(short_contract.symbol, -quantity, tag=tag)
# Record position
self.active_positions[ticker] = {
'type': 'put_spread',
'long_symbol': long_contract.symbol,
'short_symbol': short_contract.symbol,
'long_strike': long_contract.strike,
'short_strike': short_contract.strike,
'quantity': quantity,
'entry_date': self.time,
'expiry': expiry,
'entry_debit': debit,
'max_profit': max_profit,
'max_loss': max_loss,
'profit_target': max_profit * self.profit_target_pct,
'entry_momentum': momentum,
'entry_zscore': put_zscore
}
self.log("═" * 60)
self.log(f"⬇ ENTERED BEARISH PUT SPREAD - {ticker}")
self.log(f" Long: {quantity}x {long_contract.strike:.2f} Put @ {(long_contract.bid_price + long_contract.ask_price)/2:.2f}")
self.log(f" Short: {quantity}x {short_contract.strike:.2f} Put @ {(short_contract.bid_price + short_contract.ask_price)/2:.2f}")
self.log(f" Net Debit: ${debit * quantity * 100:,.2f}")
self.log(f" Max Profit: ${max_profit * quantity * 100:,.2f} | Max Loss: ${max_loss * quantity * 100:,.2f}")
self.log(f" R/R: {max_profit/max_loss:.2f}x | Expiry: {expiry.date()} ({(expiry.date() - self.time.date()).days} DTE)")
self.log(f" Momentum: {momentum*100:.2f}% | Put Skew Z: {put_zscore:.2f}")
self.log(f" IV/RV: Long {put_metrics['long_ivrv']:.2f}, Short {put_metrics['short_ivrv']:.2f}, Diff {put_metrics['differential']:.2f}")
self.log("═" * 60)
# ======================================================================
# EXIT LOGIC
# ======================================================================
def _check_exits(self, slice: Slice) -> None:
"""
Check all active positions for exit conditions:
1. Profit target (80% of max profit)
2. Time exit (2 days before expiry if not OTM)
3. Expiration (let expire if both legs OTM)
"""
tickers_to_close = []
for ticker, position in self.active_positions.items():
# Check if we have current option prices
long_symbol = position['long_symbol']
short_symbol = position['short_symbol']
# Get current holdings
if not self.portfolio[long_symbol].invested or not self.portfolio[short_symbol].invested:
# Position already closed or expired
self.log(f"DBG: {ticker} Position already liquidated, removing from active_positions.")
tickers_to_close.append(ticker)
continue
# Get current option contracts from securities
if long_symbol not in self.securities or short_symbol not in self.securities:
self.log(f"DBG: {ticker} Security objects missing, skipping exit check.")
continue
long_sec = self.securities[long_symbol]
short_sec = self.securities[short_symbol]
# Check expiration
days_to_expiry = (position['expiry'].date() - self.time.date()).days
# Exit Rule 1: Expiration Day - Let it expire if both legs OTM
if days_to_expiry <= 0:
is_otm = self._both_legs_otm(ticker, position, slice)
if is_otm:
self.log(f"○ {ticker}: Both legs OTM at expiration, letting expire")
tickers_to_close.append(ticker)
else:
self.log(f"DBG: {ticker} Closing at EXPIRATION. Not OTM.")
self._close_position(ticker, position, "EXPIRATION")
tickers_to_close.append(ticker)
continue
# Exit Rule 2: Time Exit (2 days before expiry if not OTM)
if days_to_expiry <= self.days_before_expiry_exit:
is_otm = self._both_legs_otm(ticker, position, slice)
if not is_otm:
self.log(f"DBG: {ticker} Closing at TIME_EXIT. DTE {days_to_expiry} <= {self.days_before_expiry_exit} AND not OTM.")
self._close_position(ticker, position, "TIME_EXIT")
tickers_to_close.append(ticker)
else:
self.log(f"DBG: {ticker} Skipping TIME_EXIT. DTE {days_to_expiry} <= {self.days_before_expiry_exit} BUT both legs OTM.")
continue
# Exit Rule 3: Profit Target (80% of max profit)
current_pnl = self._calculate_position_pnl(position, long_sec, short_sec)
if current_pnl is not None:
self.log(f"DBG: {ticker} Current PnL: ${current_pnl * position['quantity'] * 100:,.2f}. Target: ${position['profit_target'] * position['quantity'] * 100:,.2f}.")
if current_pnl * position['quantity'] * 100 >= position['profit_target'] * position['quantity'] * 100:
self.log(f"DBG: {ticker} Closing at PROFIT_TARGET.")
self._close_position(ticker, position, "PROFIT_TARGET", current_pnl)
tickers_to_close.append(ticker)
# Remove closed positions
for ticker in tickers_to_close:
if ticker in self.active_positions:
del self.active_positions[ticker]
def _both_legs_otm(self, ticker: str, position: dict, slice: Slice) -> bool:
"""
Check if both legs of the spread are out of the money
"""
symbol = self.equity_symbols[ticker]
if symbol not in slice.bars:
return False
underlying_price = slice.bars[symbol].close
if position['type'] == 'call_spread':
# For call spread: OTM if underlying < long strike
is_otm = underlying_price < position['long_strike']
self.log(f"DBG: {ticker} OTM Check (Call): Px {underlying_price:.2f} < Long Strike {position['long_strike']:.2f} -> {is_otm}")
return is_otm
else: # put_spread
# For put spread: OTM if underlying > long strike
is_otm = underlying_price > position['long_strike']
self.log(f"DBG: {ticker} OTM Check (Put): Px {underlying_price:.2f} > Long Strike {position['long_strike']:.2f} -> {is_otm}")
return is_otm
def _calculate_position_pnl(self, position: dict, long_sec, short_sec) -> float:
"""
Calculate current P&L of the spread (per share/contract basis)
"""
try:
# Get current mid prices
long_mid = (long_sec.bid_price + long_sec.ask_price) / 2
short_mid = (short_sec.bid_price + short_sec.ask_price) / 2
if long_mid <= 0 or short_mid <= 0:
return None
# Current spread value (per share)
current_spread_value = long_mid - short_mid
# P&L (per contract) = Current Value - Entry Value
entry_value = position['entry_debit']
pnl_per_contract = current_spread_value - entry_value
return pnl_per_contract
except:
return None
def _close_position(self, ticker: str, position: dict, reason: str,
pnl_per_contract: float = None) -> None:
"""
Close the spread position by liquidating both legs
"""
# Calculate final P&L if not provided
if pnl_per_contract is None:
long_sec = self.securities[position['long_symbol']]
short_sec = self.securities[position['short_symbol']]
pnl_per_contract = self._calculate_position_pnl(position, long_sec, short_sec)
total_pnl = pnl_per_contract * position['quantity'] * 100 if pnl_per_contract is not None else 0
entry_cost = position['entry_debit'] * position['quantity'] * 100
# Calculate Exit Metrics
underlying_price = self.securities[self.equity_symbols[ticker]].price
# --- MODIFIED: Use DTE instead of Days Held ---
days_to_expiry = (position['expiry'].date() - self.time.date()).days
# ---------------------------------------------
max_profit_total = position['max_profit'] * position['quantity'] * 100
max_loss_total = position['max_loss'] * position['quantity'] * 100
# P&L as % of Max Profit and Max Loss (capped at 100% for profit side)
pnl_pct_max_profit = (total_pnl / max_profit_total) if max_profit_total != 0 else 0
# For loss, we calculate total P&L relative to the max loss amount. A profit will be > 0.
pnl_pct_max_loss = (total_pnl / max_loss_total) if max_loss_total != 0 else 0
# Create concise exit order tag
tag = (f" ⨂ {reason} | Px:{underlying_price:.2f} | DTE:{days_to_expiry} | "
f"PnL:{total_pnl:+.2f} |"
f"PctOfMaxP:{pnl_pct_max_profit:+.2f} | PctOfMaxL:{pnl_pct_max_loss:+.2f}")
# Liquidate both legs
self.liquidate(position['long_symbol'], tag)
self.liquidate(position['short_symbol'], tag)
self.log("─" * 60)
self.log(f"✕ CLOSED POSITION - {ticker} ({reason})")
self.log(f" Type: {position['type'].upper()}")
self.log(f" Held: {(self.time - position['entry_date']).days} days | DTE at Exit: {days_to_expiry}")
self.log(f" P&L: ${total_pnl:,.2f} (MaxP: {pnl_pct_max_profit*100:+.1f}%, MaxL: {pnl_pct_max_loss*100:+.1f}%)" if pnl_per_contract is not None else " P&L: N/A")
self.log("─" * 60)