| Overall Statistics | |
|---|---|
| Total Orders | 315 |
| Average Win | 0.04% |
| Average Loss | -0.03% |
| Compounding Annual Return | -1.971% |
| Drawdown | 2.500% |
| Expectancy | 0.036 |
| Start Equity | 2000000 |
| End Equity | 1960795.9 |
| Net Profit | -1.960% |
| Sharpe Ratio | -4.62 |
| Sortino Ratio | -5.465 |
| Probabilistic Sharpe Ratio | 1.013% |
| Loss Rate | 57% |
| Win Rate | 43% |
| Profit-Loss Ratio | 1.42 |
| Alpha | -0.055 |
| Beta | -0.082 |
| Annual Standard Deviation | 0.014 |
| Annual Variance | 0 |
| Information Ratio | -1.659 |
| Tracking Error | 0.118 |
| Treynor Ratio | 0.795 |
| Total Fees | $406.60 |
| Estimated Strategy Capacity | $0 |
| Lowest Capacity Asset | TSLA UNU3P8Y3WFAD |
| Portfolio Turnover | 0.12% |
| Drawdown Recovery | 154 |
from AlgorithmImports import *
import numpy as np
from datetime import timedelta, date
from collections import defaultdict
class FVAPosition:
    """Bookkeeping record for one open FVA trade: four option legs plus entry metadata."""

    def __init__(self, eq_sym, near_call, near_put, far_call, far_put,
                 near_expiry, n_contracts, spread, iv_1m, iv_fwd, sector,
                 entry_date):
        # Underlying the trade is keyed on.
        self.eq_sym = eq_sym
        # Near-dated straddle legs.
        self.near_call = near_call
        self.near_put = near_put
        # Far-dated straddle legs.
        self.far_call = far_call
        self.far_put = far_put
        # Near-leg expiry date and number of spreads held.
        self.near_expiry = near_expiry
        self.n_contracts = n_contracts
        # Signal values captured at entry.
        self.spread = spread
        self.iv_1m = iv_1m
        self.iv_fwd = iv_fwd
        # Sector bucket (feeds the per-sector cap) and entry date (feeds the stop grace period).
        self.sector = sector
        self.entry_date = entry_date
        # Set to True by the lifecycle check once an exit has been initiated.
        self.pending_close = False
class ForwardVolatilityAssetStrategy(QCAlgorithm):
    """Forward-volatility (FVA) straddle-calendar strategy on a liquid-equity universe.

    For each selected underlying the algorithm sells a near-dated ATM straddle and
    buys a far-dated ATM straddle at the same time (see _enter_fva). Candidates are
    ranked by the spread between near implied volatility and the implied forward
    volatility (see _compute_fva_spread).
    """
    # SIC codes that _universe_selector excludes from the candidate universe.
    BIOTECH_SIC = {2836, 2835, 2830, 2833, 2834}
    # NOTE(review): not referenced anywhere in the visible source — confirm whether
    # it is still needed.
    MIN_SPREAD_IMPROVEMENT = 0.05
def Initialize(self):
    """Configure the backtest window, account, universe, parameters and schedule.

    Statement order matters in two places: the universe settings are assigned
    before AddUniverse, and TOTAL_CAPITAL is assigned before SetCash so that
    account cash and the sizing base share one value.
    """
    # Alternative window kept for reference:
    # start, end = date(2021, 1, 1), date(2022, 12, 31)
    start, end = date(2023, 1, 1), date(2023, 12, 31)
    self.SetStartDate(start.year, start.month, start.day)
    self.SetEndDate(end.year, end.month, end.day)

    # Single source of truth for capital: the same figure funds the account
    # and drives slot/risk sizing, so the two cannot drift apart.
    self.TOTAL_CAPITAL = 2_000_000
    self.SetCash(self.TOTAL_CAPITAL)

    # Per-entry ENT/STOP/REM lines exhaust the QC log quota on long runs.
    # Keep them only for short (≤200d) windows; longer runs emit the monthly
    # MO aggregate only.
    self._verbose_entries = (end - start).days <= 200

    self.SetBrokerageModel(
        BrokerageName.InteractiveBrokersBrokerage,
        AccountType.Margin
    )
    spy = self.AddEquity("SPY", Resolution.Daily)
    self.SetBenchmark(spy.Symbol)

    # Raw normalization is required for options. Must be set BEFORE AddUniverse.
    self.universe_settings.resolution = Resolution.Daily
    self.universe_settings.data_normalization_mode = DataNormalizationMode.Raw
    self.AddUniverse(self._universe_selector)

    # ── Parameters (PROD values active; POC values noted for reference) ──
    # POC: MAX_POSITIONS=3, MIN_OPTION_VOLUME=0, MIN_IV_FWD=0.10,
    # POC: MAX_IV_FWD=5.00, MAX_BID_ASK_PCT=0.30, SPREAD_SD_FLOOR=0.0
    self.MAX_POSITIONS = 10  # $200k/slot
    # DTE windows tightened from (15,45)/(45,75) to (20,40)/(50,65) — shaves
    # edge expiries off the subscribed chain to reduce memory footprint.
    self.NEAR_DTE_MIN = 20
    self.NEAR_DTE_MAX = 40
    self.FAR_DTE_MIN = 50
    self.FAR_DTE_MAX = 65
    self.MIN_OPTION_VOLUME = 2_000  # daily option volume floor
    self.MIN_IV_FWD = 0.16  # paper's min meaningful fwd IV
    self.MAX_IV_FWD = 2.00  # data-quality cap
    self.MAX_BID_ASK_PCT = 0.10  # spread tightness
    self.EARNINGS_BUFFER = 30
    # Cross-sectional sanity floor (paper's "1.5SD above market average").
    # Primary selection is top-N by raw spread; this floor only rejects names
    # that don't stand out vs the day's universe. Looser than the paper's 1.5.
    self.SPREAD_SD_FLOOR = 0.5
    self.MAX_PER_SECTOR = 3
    # Equal-risk sizing: each slot budgets the same 1σ underlying move over
    # the holding period (≈2% portfolio 1σ/month if all 10 slots are full).
    self.TARGET_RISK_PCT = 0.02
    # Stop loss — catastrophic MTM brake only; positions are otherwise held
    # to near-leg expiry.
    self.STOP_LOSS_PCT = 0.50  # MTM < -PCT × slot_budget triggers exit
    self.STOP_GRACE_DAYS = 1  # suppress stop for first N days post-entry

    # ── Runtime state ──
    self._positions: dict = {}
    self._option_cache: dict = {}
    self._monthly_cache: dict = {}
    self._sector_cache: dict = {}
    self._active_equities: set = set()
    self._sector_counts: dict = defaultdict(int)

    # Run 15 minutes before the close so the latest option-chain bar is
    # already in CurrentSlice when the lifecycle check reads it.
    self.Schedule.On(
        self.DateRules.EveryDay("SPY"),
        self.TimeRules.BeforeMarketClose("SPY", 15),
        self._daily_lifecycle_check
    )

    self.DIAGNOSTIC_MODE = False  # full funnel report once per month
    self._last_diag_month = -1

    # Monthly log aggregation: closed trades fold into the month's bucket and
    # one MO line is emitted at each month rollover, keeping log volume low.
    self._monthly_stats = defaultdict(lambda: {
        'ent': 0, 'exp': 0, 'stop': 0, 'rem': 0, 'skip': 0,
        'win': 0, 'loss': 0, 'pnl': 0.0, 'slip': 0.0,
    })
    self._last_flush_month = None

    # Warmup lets the universe + option subscriptions settle before trading.
    # The cross-sectional signal carries no per-symbol history.
    self.SetWarmUp(70, Resolution.Daily)
# --- Universe selection ---
def _universe_selector(self, fundamentals):
    """Stage-1 funnel: return the 100 most-traded symbols that pass cheap filters.

    Filters, applied in order:
      - Price within [$20, $500].
      - DollarVolume of at least $50M.
      - Positive MarketCap and HasFundamentalData set.
      - SIC code not in BIOTECH_SIC. If the classification lookup raises,
        the name is kept.
    Survivors are ranked by DollarVolume (descending). Stage 2, the daily
    spread ranking, happens in _daily_lifecycle_check.
    """
    def is_candidate(fund):
        if not (20 <= fund.Price <= 500):
            return False
        if fund.DollarVolume < 50_000_000:
            return False
        if fund.MarketCap <= 0 or not fund.HasFundamentalData:
            return False
        try:
            excluded = fund.AssetClassification.SIC in self.BIOTECH_SIC
        except Exception:
            # Classification unavailable: do not reject on missing data.
            excluded = False
        return not excluded

    survivors = [fund for fund in fundamentals if is_candidate(fund)]
    survivors.sort(key=lambda fund: fund.DollarVolume, reverse=True)
    return [fund.Symbol for fund in survivors[:100]]
# --- Daily lifecycle ---
def _daily_lifecycle_check(self):
    """Scheduled daily pass: flush stats, close expired/stopped trades, fill open slots.

    Steps, in order:
      0. Skip entirely during warmup; emit the previous month's MO line on rollover.
      1. Natural expiry: once the near leg's expiry date has passed, record the
         close, sell the far legs, and flatten any stock left by assignment.
      1b. Stop-loss check on the remaining live positions.
      2. Return early when no slot is open.
      3. Score the universe (with a diagnostic report at most once per month).
      4. Enter the richest spreads that clear every filter until slots are filled.
    """
    today = self.Time.date()
    # Warmup only lets the universe + option subscriptions settle; the
    # cross-sectional signal needs no per-symbol history, so nothing to seed.
    if self.IsWarmingUp:
        return

    # Emit the prior month's aggregate line once the calendar month rolls over.
    mkey = (today.year, today.month)
    if self._last_flush_month is not None and mkey != self._last_flush_month:
        self._flush_monthly(self._last_flush_month)
    self._last_flush_month = mkey

    # 1. Natural expiry: near leg has expired, close the far leg.
    for eq_sym, pos in list(self._positions.items()):
        if today > pos.near_expiry and not pos.pending_close:
            pos.pending_close = True
            self._record_close(pos, 'exp')
            self._sell_far_leg(pos)
            # The short near straddle is held through expiry, so an ITM leg
            # is assigned into stock. The strategy never intends to hold the
            # underlying; flatten it here or it stays on the book as
            # untracked directional exposure.
            if self.Portfolio[eq_sym].Invested:
                self.Liquidate(eq_sym, tag=f"FVA assignment cleanup {eq_sym.Value}")
            self._close_position(pos)

    # 1b. Stop-loss check on remaining live positions.
    for pos in list(self._positions.values()):
        if pos.pending_close:
            continue
        reason = self._check_stop_loss(pos, today)
        if reason:
            self._stop_out(pos, reason)

    # 2. Early exit when the portfolio is full.
    open_slots = self.MAX_POSITIONS - len(self._positions)
    if open_slots <= 0:
        return

    # 3. Score the universe.
    run_diag = (self.DIAGNOSTIC_MODE and today.month != self._last_diag_month)
    if run_diag:
        self._last_diag_month = today.month
    all_scores = self._score_universe(today, diagnostic=run_diag)

    # Cross-sectional dispersion of today's spreads is the "market average"
    # reference. sd_floor is a loose sanity gate; primary selection is
    # top-N by raw spread.
    spreads = [sd['spread'] for sd in all_scores.values()]
    if len(spreads) >= 2:
        arr = np.array(spreads)
        sd_floor = arr.mean() + self.SPREAD_SD_FLOOR * arr.std()
    else:
        sd_floor = 0.0

    # 4. Fill open slots: richest spread first, must clear thesis + sanity floors.
    candidates = [(sym, sd) for sym, sd in all_scores.items()
                  if sym not in self._positions]
    ranked = sorted(candidates, key=lambda x: x[1]['spread'], reverse=True)
    filled = 0
    for eq_sym, score_data in ranked:
        if filled >= open_slots:
            break
        spread = score_data['spread']
        # Thesis floor: near IV must be richer than forward (spread > 0).
        # Sanity floor: spread must stand out vs today's cross-section.
        if spread <= 0 or spread < sd_floor:
            continue
        iv_fwd = score_data['iv_fwd']
        if not (self.MIN_IV_FWD <= iv_fwd <= self.MAX_IV_FWD):
            continue
        if self._has_upcoming_earnings(eq_sym, today):
            self._monthly_stats[mkey]['skip'] += 1
            continue
        sector = score_data.get('sector', 'Unknown')
        if self._sector_counts[sector] >= self.MAX_PER_SECTOR:
            self._monthly_stats[mkey]['skip'] += 1
            continue
        if self.Portfolio.MarginRemaining < score_data['estimated_cost']:
            self._monthly_stats[mkey]['skip'] += 1
            continue
        if self._enter_fva(eq_sym, score_data):
            filled += 1
# --- Entry / exit ---
def _close_position(self, pos):
    """Drop a position from the book and release its sector slot (never below zero)."""
    remaining = self._sector_counts[pos.sector] - 1
    self._sector_counts[pos.sector] = max(0, remaining)
    # pop with a default: tolerate a position that was already removed.
    self._positions.pop(pos.eq_sym, None)
def _record_close(self, pos, reason):
    """Fold a closing trade into the current month's stats bucket.

    Must be called before the legs are liquidated, because the P&L is read
    from mark-to-market (_mtm_pnl). `reason` is one of 'exp', 'stop', 'rem'.
    A P&L of exactly zero counts as a win.
    """
    trade_pnl = self._mtm_pnl(pos)
    bucket = self._monthly_stats[(self.Time.year, self.Time.month)]
    bucket[reason] += 1
    bucket['pnl'] += trade_pnl
    outcome = 'win' if trade_pnl >= 0 else 'loss'
    bucket[outcome] += 1
def _flush_monthly(self, key):
    """Log the one-line MO summary for month `key` = (year, month).

    Does nothing when no stats bucket exists for that month.
    """
    stats = self._monthly_stats.get(key)
    if not stats:
        return
    year, month = key
    equity = self.Portfolio.TotalPortfolioValue
    line = (f"MO {year}-{month:02d} | ent={stats['ent']} exp={stats['exp']} stop={stats['stop']} "
            f"rem={stats['rem']} skip={stats['skip']} w/l={stats['win']}/{stats['loss']} "
            f"pnl=${stats['pnl']:,.0f} slip=${stats['slip']:,.0f} "
            f"eq=${equity:,.0f}")
    self.Log(line)
def _halfspread_cost(self, bid, ask, n):
    """Dollar cost of crossing from mid to the touch on one leg of `n` contracts.

    Instrumentation only: summing these over all crossings gives the gap
    between mid-fill (gross) P&L and the observed net P&L, without touching
    the fill path. Returns 0.0 for an unusable quote (non-positive side or
    crossed market). Uses a contract multiplier of 100.
    """
    unusable_quote = bid <= 0 or ask <= 0 or ask < bid
    return 0.0 if unusable_quote else (ask - bid) / 2.0 * 100 * n
def _sell_far_leg(self, pos):
    """Flatten whichever far-dated legs are still held, booking the crossing cost.

    For each invested far leg, the half-spread cost is added to the current
    month's 'slip' total before the offsetting market order is sent.
    """
    month_bucket = self._monthly_stats[(self.Time.year, self.Time.month)]
    for leg in (pos.far_call, pos.far_put):
        holding = self.Portfolio[leg]
        if not holding.Invested:
            continue
        held = holding.Quantity
        quote = self.Securities[leg]
        month_bucket['slip'] += self._halfspread_cost(quote.BidPrice, quote.AskPrice, abs(held))
        self.MarketOrder(leg, -held, tag=f"FVA exit far leg {pos.eq_sym.Value}")
def _margin_for_one_spread(self, nc, np_, fc, fp):
    """Estimate initial margin for one 4-leg spread (short near straddle, long far straddle).

    Primary path: sum the absolute initial-margin requirement reported by
    each leg's BuyingPowerModel at quantity ±1. If any part of that query
    raises, log it and fall back to a heuristic of 25% of the short strikes'
    notional plus the far legs' ask premium.
    """
    leg_plan = [(contract.Symbol, signed_qty)
                for contract, signed_qty in ((nc, -1), (np_, -1), (fc, +1), (fp, +1))]
    try:
        required = 0.0
        for leg_symbol, signed_qty in leg_plan:
            security = self.Securities[leg_symbol]
            params = InitialMarginParameters(security, signed_qty)
            requirement = security.BuyingPowerModel.GetInitialMarginRequirement(params)
            required += abs(requirement.Value)
        return required
    except Exception as e:
        self.Log(f" margin-API fallback ({e}); 25%-notional heuristic")
        short_notional = (nc.Strike + np_.Strike) * 100
        far_premium = (fc.AskPrice + fp.AskPrice) * 100
        return short_notional * 0.25 + far_premium
def _enter_fva(self, eq_sym, score_data):
    """Size and submit one FVA trade; return True only if it was opened and recorded.

    Sizing is the minimum of a risk-based count and a margin-based count,
    further capped by the account's remaining margin. Both straddles are
    submitted as combo orders. The position is recorded only when neither
    order comes back rejected. If the far (long) straddle is rejected after
    the near (short) straddle was accepted, the near legs are unwound so no
    naked short straddle is left untracked.

    Args:
        eq_sym: underlying equity Symbol.
        score_data: dict produced by _compute_fva_spread.

    Returns:
        bool: True if the position was opened and recorded, else False.
    """
    slot_budget = self.TOTAL_CAPITAL / self.MAX_POSITIONS
    estimated_cost = score_data['estimated_cost']
    if estimated_cost <= 0:
        return False
    sector = score_data.get('sector', 'Unknown')
    nc, np_, fc, fp = (score_data['near_call'], score_data['near_put'],
                       score_data['far_call'], score_data['far_put'])

    # Equal-risk sizing. Premium-based sizing (slot ÷ cost) explodes on cheap
    # names (tiny premium → huge contract counts → margin rejections).
    # Instead size so each position carries the same statistical risk:
    # risk/contract = straddle notional × iv_1m × √(dte/252).
    # NOTE(review): near_dte is in calendar days but is annualized with 252
    # trading days, which overstates risk/contract (conservative sizing).
    # Left as-is; confirm intent before switching to 365.
    near_dte = (score_data['near_expiry'] - self.Time.date()).days
    risk_per_contract = ((nc.Strike + np_.Strike) * 100
                         * score_data['iv_1m'] * (near_dte / 252) ** 0.5)
    if risk_per_contract <= 0:
        return False
    slot_risk = self.TOTAL_CAPITAL * self.TARGET_RISK_PCT / self.MAX_POSITIONS
    n_by_risk = max(1, int(slot_risk / risk_per_contract))

    # Margin-aware sizing via the brokerage buying-power model.
    margin_per_spread = self._margin_for_one_spread(nc, np_, fc, fp)
    if margin_per_spread <= 0:
        self.Log(f" SKIP {eq_sym.Value}: margin query returned 0")
        return False
    n_by_margin = int(slot_budget / margin_per_spread)
    if n_by_margin < 1:
        self.Log(f" SKIP margin-cap {eq_sym.Value}: 1-spread margin"
                 f"=${margin_per_spread:,.0f} > slot ${slot_budget:,.0f}")
        return False
    n_contracts = min(n_by_risk, n_by_margin)

    # Cap to the margin actually remaining in the account.
    margin_remaining = self.Portfolio.MarginRemaining
    if margin_per_spread * n_contracts > margin_remaining:
        n_contracts = int(margin_remaining / margin_per_spread)
    if n_contracts < 1:
        self.Log(f" SKIP margin-remaining {eq_sym.Value}: "
                 f"need ≥${margin_per_spread:,.0f}, have ${margin_remaining:,.0f}")
        return False

    # 4-leg execution via OptionStrategies combo orders.
    canonical = self._option_cache.get(eq_sym)
    if canonical is None:
        self.Log(f" SKIP {eq_sym.Value}: no option canonical")
        return False

    def rejected(tickets):
        # A combo that fails pre-trade checks comes back with Invalid tickets.
        return any(t.Status == OrderStatus.Invalid for t in tickets)

    try:
        short_near = OptionStrategies.short_straddle(canonical, nc.Strike, nc.Expiry)
        long_far = OptionStrategies.straddle(canonical, fc.Strike, fc.Expiry)

        # buy short_straddle = sell the near straddle
        if rejected(list(self.Buy(short_near, n_contracts))):
            self.Log(f" SKIP {eq_sym.Value}: near straddle order rejected")
            return False

        # buy straddle = buy the far straddle
        if rejected(list(self.Buy(long_far, n_contracts))):
            self.Log(f" SKIP {eq_sym.Value}: far straddle order rejected; unwinding near legs")
            # Never leave the short straddle on without its long hedge.
            for leg_symbol in (nc.Symbol, np_.Symbol):
                if self.Portfolio[leg_symbol].Invested:
                    self.Liquidate(leg_symbol, tag="FVA entry unwind")
            return False

        self._positions[eq_sym] = FVAPosition(
            eq_sym=eq_sym, near_call=nc.Symbol, near_put=np_.Symbol,
            far_call=fc.Symbol, far_put=fp.Symbol,
            near_expiry=score_data['near_expiry'], n_contracts=n_contracts,
            spread=score_data['spread'], iv_1m=score_data['iv_1m'],
            iv_fwd=score_data['iv_fwd'], sector=sector,
            entry_date=self.Time.date(),
        )
        self._sector_counts[sector] += 1
        mstat = self._monthly_stats[(self.Time.year, self.Time.month)]
        mstat['ent'] += 1
        mstat['slip'] += sum(self._halfspread_cost(l.BidPrice, l.AskPrice, n_contracts)
                             for l in (nc, np_, fc, fp))
        if self._verbose_entries:
            self.Log(f"ENT {eq_sym.Value} x{n_contracts} "
                     f"sp={score_data['spread']:.2f} "
                     f"iv={score_data['iv_1m']:.2f}/{score_data['iv_fwd']:.2f} "
                     f"K={nc.Strike:.0f} c=${estimated_cost * n_contracts:,.0f}")
        return True
    except Exception as e:
        self.Log(f" ERROR entering {eq_sym.Value}: {e}")
        return False
# --- Stop loss ---
def _mtm_pnl(self, pos):
    """Return the summed UnrealizedProfit of the position's four option legs.

    NOTE(review): only unrealized P&L is read, so a leg that is no longer
    held presumably contributes nothing — confirm this is intended for
    closes recorded after the near legs have expired.
    """
    total = 0
    for leg in (pos.near_call, pos.near_put, pos.far_call, pos.far_put):
        total += self.Portfolio[leg].UnrealizedProfit
    return total
def _check_stop_loss(self, pos, today):
    """Return a reason string if the catastrophic MTM stop is hit, else None.

    This is the only stop: it fires when mark-to-market P&L falls below
    -STOP_LOSS_PCT × slot budget. It is suppressed for the first
    STOP_GRACE_DAYS days after entry. There is deliberately no directional
    move-stop, because exiting on underlying direction works against a
    volatility trade that is meant to be held to near-leg expiry.
    """
    days_held = (today - pos.entry_date).days
    if days_held < self.STOP_GRACE_DAYS:
        return None
    slot_budget = self.TOTAL_CAPITAL / self.MAX_POSITIONS
    mtm = self._mtm_pnl(pos)
    breached = mtm < -self.STOP_LOSS_PCT * slot_budget
    if not breached:
        return None
    return f"pnl=${mtm:,.0f} < -{self.STOP_LOSS_PCT:.0%}×${slot_budget:,.0f}"
def _stop_out(self, pos, reason):
    """Exit all four legs of a stopped position and remove it from the book.

    Order of operations:
      1. Record the close (reads MTM, so it must run before any leg is closed).
      2. Book the half-spread crossing cost of every held leg into the month's
         'slip' total, matching what _sell_far_leg does for far-leg exits.
      3. Try to close with two combo orders; on any failure, or when no option
         canonical is cached, liquidate the held legs individually.
      4. Drop the position from the book.

    Args:
        pos: the FVAPosition being stopped.
        reason: human-readable trigger text, logged in verbose mode.
    """
    self._record_close(pos, 'stop')
    if self._verbose_entries:
        self.Log(f"STOP {pos.eq_sym.Value} {reason}")

    legs = (pos.near_call, pos.near_put, pos.far_call, pos.far_put)

    # Slippage instrumentation: must run before the orders, while holdings
    # still show the quantities about to be crossed.
    bucket = self._monthly_stats[(self.Time.year, self.Time.month)]
    for leg in legs:
        holding = self.Portfolio[leg]
        if holding.Invested:
            quote = self.Securities[leg]
            bucket['slip'] += self._halfspread_cost(
                quote.BidPrice, quote.AskPrice, abs(holding.Quantity))

    canonical = self._option_cache.get(pos.eq_sym)
    closed_via_combo = False
    if canonical is not None:
        try:
            near_strike = pos.near_call.ID.StrikePrice
            far_strike = pos.far_call.ID.StrikePrice
            near_exp = pos.near_call.ID.Date
            far_exp = pos.far_call.ID.Date
            short_near = OptionStrategies.short_straddle(canonical, near_strike, near_exp)
            long_far = OptionStrategies.straddle(canonical, far_strike, far_exp)
            self.Sell(short_near, pos.n_contracts)  # close short straddle
            self.Sell(long_far, pos.n_contracts)  # close long straddle
            closed_via_combo = True
        except Exception as e:
            self.Log(f" combo close failed ({e}); per-leg fallback")

    if not closed_via_combo:
        for leg in legs:
            if self.Portfolio[leg].Invested:
                # Pass the tag by keyword: in current LEAN the second
                # positional parameter of Liquidate is `asynchronous`.
                self.Liquidate(leg, tag="STOP OUT fallback")
    self._close_position(pos)
# --- Scoring engine ---
def _score_universe(self, today, diagnostic=False):
    """Score every active equity; return {symbol: score_data} for those that pass.

    A symbol is skipped when it has no active security or a non-positive
    price. Any exception raised while scoring a symbol is contained: it is
    recorded in diagnostic mode and silently dropped otherwise. With
    diagnostic=True a funnel report showing where candidates were rejected
    is logged after the pass.
    """
    list_buckets = (
        'exception', 'no_option_sub', 'no_chain', 'low_volume',
        'no_monthly_calls', 'no_near_calls', 'no_far_calls', 'bad_iv',
        'bad_tau', 'negative_fwd_var', 'no_near_put', 'no_far_put',
        'bid_ask_too_wide', 'passed',
    )
    dropped = None
    if diagnostic:
        dropped = {'no_price': 0}
        dropped.update((name, []) for name in list_buckets)

    scores = {}
    for sym in self._active_equities:
        sec = self.ActiveSecurities.get(sym)
        if sec is None or not sec.Price > 0:
            if diagnostic:
                dropped['no_price'] += 1
            continue
        try:
            result = self._compute_fva_spread(sym, today,
                                              dropped=dropped, diagnostic=diagnostic)
            if result is not None:
                scores[sym] = result
                if diagnostic:
                    dropped['passed'].append(sym.Value)
        except Exception as e:
            if diagnostic:
                dropped['exception'].append((sym.Value, str(e)))

    if not diagnostic:
        return scores

    # Funnel report: one line per rejection bucket with a short sample.
    report_rows = (
        ("No option subscription", "no_option_sub", 5),
        ("Chain absent/empty", "no_chain", 5),
        ("Low option volume", "low_volume", 5),
        ("No standard monthly", "no_monthly_calls", 5),
        ("No near calls (DTE)", "no_near_calls", 5),
        ("No far calls (DTE)", "no_far_calls", 5),
        ("Bad IV (<=0)", "bad_iv", 5),
        ("Bad tau (tau2<=tau1)", "bad_tau", 5),
        ("Negative fwd variance", "negative_fwd_var", 5),
        ("No ATM near put", "no_near_put", 5),
        ("No ATM far put", "no_far_put", 5),
        ("Bid-ask too wide", "bid_ask_too_wide", 5),
        ("PASSED all filters", "passed", 10),
    )
    rule = "=" * 60
    self.Log(rule)
    self.Log(f"DIAGNOSTIC SCORE REPORT — {today}")
    self.Log(f" Universe equities : {len(self._active_equities)}")
    self.Log(f" Skipped (no price) : {dropped['no_price']}")
    self.Log(f" Exceptions : {len(dropped['exception'])}")
    for t, e in dropped['exception'][:5]:
        self.Log(f" {t}: {e}")
    for label, key, show in report_rows:
        self.Log(f" {label:<24}: {len(dropped[key])} — {dropped[key][:show]}")
    self.Log(rule)
    return scores
def _compute_fva_spread(self, eq_sym, today, dropped=None, diagnostic=False):
    """Compute the FVA signal and the four candidate legs for one underlying.

    Pipeline: look up the option chain → optional volume floor → bucket
    quoted contracts → pick the ATM call in each DTE window → derive the
    forward volatility → find same-strike puts → bid/ask tightness check.

    Args:
        eq_sym: underlying equity Symbol.
        today: current date, used for DTE arithmetic.
        dropped: diagnostic bucket dict; appended to only when diagnostic is True.
        diagnostic: when True, each rejection is recorded in `dropped`.

    Returns:
        A dict with the spread, IVs, the four contracts, expiries, estimated
        cost, underlying price and sector — or None if any filter rejects.
    """
    label = eq_sym.Value if diagnostic else None

    def drop(bucket, detail=None):
        # Record the rejection (diagnostic runs only); always yields None.
        if diagnostic and dropped is not None:
            dropped[bucket].append(label if detail is None else f"{label}({detail})")
        return None

    option_sym = self._get_option_subscription(eq_sym)
    if option_sym is None:
        return drop('no_option_sub')

    chain = self.CurrentSlice.OptionChains.get(option_sym)
    if chain is None or not chain.Contracts:
        return drop('no_chain')

    contracts = list(chain.Contracts.Values)
    spot = chain.Underlying.Price
    if spot <= 0:
        return drop('no_chain', f'price={spot}')

    if self.MIN_OPTION_VOLUME > 0:
        chain_volume = sum(c.Volume for c in contracts)
        if chain_volume < self.MIN_OPTION_VOLUME:
            return drop('low_volume', f'vol={chain_volume}')

    # Single pass: quoted puts are grouped by expiry; quoted standard-monthly
    # calls are split into the near and far DTE windows.
    near_calls, far_calls = [], []
    monthly_call_expiries = set()
    puts_by_expiry = defaultdict(list)
    for contract in contracts:
        expiry = contract.Expiry.date()
        is_call = contract.Right == OptionRight.Call
        if not (contract.ImpliedVolatility > 0 and contract.BidPrice > 0):
            continue
        if not is_call:
            puts_by_expiry[expiry].append(contract)
            continue
        if not self._is_standard_monthly(expiry):
            continue
        monthly_call_expiries.add(expiry)
        dte = (expiry - today).days
        if self.NEAR_DTE_MIN <= dte <= self.NEAR_DTE_MAX:
            near_calls.append(contract)
        elif self.FAR_DTE_MIN <= dte <= self.FAR_DTE_MAX:
            far_calls.append(contract)

    if not monthly_call_expiries:
        return drop('no_monthly_calls', f'total_contracts={len(contracts)}')
    if not near_calls:
        return drop('no_near_calls', f'monthly_expiries={sorted(monthly_call_expiries)}')
    if not far_calls:
        return drop('no_far_calls', f'monthly_expiries={sorted(monthly_call_expiries)}')

    def distance_to_spot(call):
        return abs(call.Strike - spot)

    near_atm = min(near_calls, key=distance_to_spot)
    far_atm = min(far_calls, key=distance_to_spot)
    iv_1m = near_atm.ImpliedVolatility
    iv_2m = far_atm.ImpliedVolatility
    if iv_1m <= 0 or iv_2m <= 0:
        return drop('bad_iv', f'iv_1m={iv_1m:.4f} iv_2m={iv_2m:.4f}')

    near_expiry = near_atm.Expiry.date()
    far_expiry = far_atm.Expiry.date()
    tau1 = (near_expiry - today).days
    tau2 = (far_expiry - today).days
    if tau2 <= tau1 or (tau2 - tau1) < 5:
        return drop('bad_tau', f'tau1={tau1} tau2={tau2}')

    # Forward variance between the two expiries from total-variance additivity.
    fwd_var = (iv_2m**2 * tau2 - iv_1m**2 * tau1) / (tau2 - tau1)
    if fwd_var <= 0:
        return drop('negative_fwd_var',
                    f'fwd_var={fwd_var:.6f} iv1={iv_1m:.3f} iv2={iv_2m:.3f}')
    iv_fwd = np.sqrt(fwd_var)
    spread = (iv_1m / iv_fwd) - 1

    def same_strike_put(call, expiry):
        # The put must share the call's strike to form a true straddle.
        for put in puts_by_expiry.get(expiry, []):
            if put.Strike == call.Strike:
                return put
        return None

    near_put = same_strike_put(near_atm, near_expiry)
    if near_put is None:
        return drop('no_near_put', f'strike={near_atm.Strike}')
    far_put = same_strike_put(far_atm, far_expiry)
    if far_put is None:
        return drop('no_far_put', f'strike={far_atm.Strike}')

    # Every leg must have a positive mid and a relative spread within the cap.
    for leg in (near_atm, near_put, far_atm, far_put):
        mid = (leg.BidPrice + leg.AskPrice) / 2
        if mid <= 0:
            return drop('bid_ask_too_wide', f'{leg.Symbol.Value} mid=0')
        ba_pct = (leg.AskPrice - leg.BidPrice) / mid
        if ba_pct > self.MAX_BID_ASK_PCT:
            return drop('bid_ask_too_wide', f'{leg.Symbol.Value} ba%={ba_pct:.1%}')

    # Net debit per spread: pay the far asks, collect the near bids (floored at $50).
    far_cost = (far_atm.AskPrice + far_put.AskPrice) * 100
    near_credit = (near_atm.BidPrice + near_put.BidPrice) * 100
    estimated_cost = max(far_cost - near_credit, 50)

    return {
        'spread': spread, 'iv_1m': iv_1m, 'iv_fwd': iv_fwd, 'iv_2m': iv_2m,
        'near_call': near_atm, 'far_call': far_atm,
        'near_put': near_put, 'far_put': far_put,
        'near_expiry': near_expiry, 'far_expiry': far_expiry,
        'estimated_cost': estimated_cost, 'underlying_price': spot,
        'sector': self._get_sector(eq_sym, today),
    }
# --- Filter helpers ---
def _has_upcoming_earnings(self, eq_sym, today):
    """Heuristic earnings filter: True if the next report is estimated to be near.

    The next report date is estimated as the last EarningReports.FileDate
    plus 90 days. Returns True when that estimate lies between 3 days ago
    and EARNINGS_BUFFER days ahead. Missing data or any lookup error yields
    False, so the filter never blocks on missing data.
    """
    try:
        node = self.ActiveSecurities.get(eq_sym)
        # Walk Security → Fundamentals → EarningReports → FileDate, bailing
        # out to None as soon as a link is missing or falsy.
        for attr in ('Fundamentals', 'EarningReports', 'FileDate'):
            node = getattr(node, attr, None) if node else None
        filed = getattr(node, 'Value', None) if node is not None else None
        if filed is None or not hasattr(filed, 'date'):
            return False
        estimated_next = filed.date() + timedelta(days=90)
        days_to = (estimated_next - today).days
        return -3 <= days_to <= self.EARNINGS_BUFFER
    except Exception:
        return False
def _get_sector(self, eq_sym, today):
    """Return the Morningstar sector code for `eq_sym` as a string, or 'Unknown'.

    Results, including 'Unknown', are cached per symbol and reused for
    lookups made fewer than 5 days after the cached date.
    """
    hit = self._sector_cache.get(eq_sym)
    if hit and (today - hit[0]).days < 5:
        return hit[1]

    sector_code = 'Unknown'
    try:
        sec = self.ActiveSecurities.get(eq_sym)
        if sec and sec.Fundamentals:
            sector_code = str(sec.Fundamentals.AssetClassification.MorningstarSectorCode)
    except Exception:
        # Any lookup failure falls back to the 'Unknown' bucket.
        sector_code = 'Unknown'
    self._sector_cache[eq_sym] = (today, sector_code)
    return sector_code
# --- Helpers ---
def _subscribe_options(self, eq_sym):
    """Subscribe to `eq_sym`'s option chain once and cache its canonical symbol.

    Options are added at Hour resolution, which is far less data than Minute
    resolution while still delivering chains intraday for the pre-close
    schedule. The Symbol itself (not its ticker string) is passed to
    AddOption to avoid a canonical-key mismatch. The chain filter keeps
    ATM ± 1 strike (the minimum for a matched-strike straddle) and expiries
    from NEAR_DTE_MIN to FAR_DTE_MAX days out.

    A failed subscription is cached as None, so it is not retried until the
    cache entry is removed.
    """
    if eq_sym in self._option_cache:
        return
    canonical = None
    try:
        option = self.AddOption(eq_sym, Resolution.Hour)
        option.SetFilter(
            lambda u: u.Strikes(-1, +1).Expiration(self.NEAR_DTE_MIN, self.FAR_DTE_MAX))
        canonical = option.Symbol
    except Exception:
        canonical = None
    self._option_cache[eq_sym] = canonical
def _get_option_subscription(self, eq_sym):
    """Return the cached canonical option symbol for `eq_sym`, or None if absent."""
    cache = self._option_cache
    return cache.get(eq_sym)
def _is_standard_monthly(self, d):
    """Return True if date `d` is a third Friday (Friday with day-of-month 15–21).

    Results are memoized per date in _monthly_cache.
    NOTE(review): an expiry moved off Friday (e.g. for a market holiday)
    would not be recognized by this rule — confirm that is acceptable.
    """
    try:
        return self._monthly_cache[d]
    except KeyError:
        verdict = d.weekday() == 4 and 15 <= d.day <= 21
        self._monthly_cache[d] = verdict
        return verdict
# --- Housekeeping ---
def OnEndOfAlgorithm(self):
    """Flush the final month's stats line.

    The monthly MO line is normally emitted on month rollover, which never
    happens for the last (partial) month, so it is flushed explicitly here.
    """
    final_month = self._last_flush_month
    if final_month is None:
        return
    self._flush_monthly(final_month)
def OnSecuritiesChanged(self, changes):
    """Track universe churn: subscribe options for new equities, unwind removed ones.

    Added equities are registered and get an option-chain subscription.
    For each removed equity:
      1. Any live position is closed first, while its leg subscriptions still
         exist: record the close, sell the far legs, buy back the near legs
         (booking their crossing cost as slippage), then drop the position.
      2. Per-symbol caches are released and the canonical option
         subscription is removed, so stale chains do not accumulate as the
         universe turns over.

    Non-equity securities in either list are ignored.
    """
    for added in changes.AddedSecurities:
        if added.Symbol.SecurityType == SecurityType.Equity:
            self._active_equities.add(added.Symbol)
            self._subscribe_options(added.Symbol)

    for removed in changes.RemovedSecurities:
        sym = removed.Symbol
        if sym.SecurityType != SecurityType.Equity:
            continue
        self._active_equities.discard(sym)

        # Close the active position first (uses its leg subscriptions).
        pos = self._positions.get(sym)
        if pos is not None:
            self._record_close(pos, reason='rem')
            if self._verbose_entries:
                self.Log(f"REM {sym.Value}")
            self._sell_far_leg(pos)
            bucket = self._monthly_stats[(self.Time.year, self.Time.month)]
            for leg in (pos.near_call, pos.near_put):
                holding = self.Portfolio[leg]
                if holding.Invested:
                    # Buying back the near leg crosses the spread as well;
                    # count it so 'slip' covers every exit crossing.
                    quote = self.Securities[leg]
                    bucket['slip'] += self._halfspread_cost(
                        quote.BidPrice, quote.AskPrice, abs(holding.Quantity))
                    # Pass the tag by keyword: in current LEAN the second
                    # positional parameter of Liquidate is `asynchronous`.
                    self.Liquidate(leg, tag="Universe removal")
            self._close_position(pos)

        # Release the chain subscription and per-symbol caches. This is done
        # here rather than from a scheduled event, as in the original design.
        self._sector_cache.pop(sym, None)
        canonical = self._option_cache.pop(sym, None)
        if canonical is not None:
            try:
                self.RemoveSecurity(canonical)
            except Exception:
                # Deliberate best-effort: a rejected removal only leaves the
                # subscription in place.
                pass