Overall Statistics
Total Orders
537
Average Win
0.04%
Average Loss
-0.04%
Compounding Annual Return
-0.285%
Drawdown
3.000%
Expectancy
-0.105
Start Equity
2000000
End Equity
1988649.28
Net Profit
-0.568%
Sharpe Ratio
-0.706
Sortino Ratio
-0.824
Probabilistic Sharpe Ratio
3.066%
Loss Rate
56%
Win Rate
44%
Profit-Loss Ratio
1.03
Alpha
-0.014
Beta
0.083
Annual Standard Deviation
0.018
Annual Variance
0
Information Ratio
-0.224
Tracking Error
0.149
Treynor Ratio
-0.15
Total Fees
$592.75
Estimated Strategy Capacity
$0
Lowest Capacity Asset
NB R735QTJ8XC9X
Portfolio Turnover
0.09%
Drawdown Recovery
229
from AlgorithmImports import *
import numpy as np
from datetime import timedelta, date
from collections import defaultdict


class FVAPosition:
    """Plain record describing one open FVA trade.

    Stores the underlying symbol, the four option-leg symbols (near call/put
    and far call/put), the near-leg expiry, the contract count, the signal
    values captured at entry (``spread``, ``iv_1m``, ``iv_fwd``), the sector
    label and the entry date. ``pending_close`` always starts out ``False``.
    """

    def __init__(self, eq_sym, near_call, near_put, far_call, far_put,
                 near_expiry, n_contracts, spread, iv_1m, iv_fwd, sector,
                 entry_date):
        # Underlying the trade is keyed by.
        self.eq_sym = eq_sym

        # Near-dated legs.
        self.near_call = near_call
        self.near_put = near_put

        # Far-dated legs.
        self.far_call = far_call
        self.far_put = far_put

        # Lifetime and size.
        self.near_expiry = near_expiry
        self.n_contracts = n_contracts

        # Signal snapshot taken at entry.
        self.spread = spread
        self.iv_1m = iv_1m
        self.iv_fwd = iv_fwd

        # Grouping / timing metadata.
        self.sector = sector
        self.entry_date = entry_date

        # Flipped to True by the lifecycle check once an exit has been started.
        self.pending_close = False


class ForwardVolatilityAssetStrategy(QCAlgorithm):

    BIOTECH_SIC = {2836, 2835, 2830, 2833, 2834}
    MIN_SPREAD_IMPROVEMENT = 0.05

    def Initialize(self):
        """Configure the backtest and create all strategy state.

        In order: sets the date window, starting cash and brokerage model;
        adds SPY as benchmark; registers the fundamental universe selector
        (daily, raw prices); defines the strategy parameters; creates the
        per-run caches and counters; schedules the daily lifecycle check; and
        requests a 70-day warm-up.
        """
        start, end = date(2021, 1, 1), date(2022, 12, 31)
        # Alternative one-year window, kept for quick switching:
        # start, end = date(2023, 1, 1), date(2023, 12, 31)

        self.SetStartDate(start.year, start.month, start.day)
        self.SetEndDate(end.year, end.month, end.day)   # 2-yr regime benchmark
        self.SetCash(2_000_000)

        # Per-entry ENT/STOP/REM lines blow QC's 10kb-per-BACKTEST log cap on
        # multi-year runs. Keep them only for short (≤200d) windows; long runs
        # emit the monthly MO aggregate only.
        # NOTE(review): this comment says the cap is per backtest, the
        # _monthly_stats comment below says per day — confirm which applies.
        self._verbose_entries = (end - start).days <= 200

        self.SetBrokerageModel(
            BrokerageName.InteractiveBrokersBrokerage,
            AccountType.Margin
        )

        # SPY serves as the benchmark and as the clock for the daily schedule below.
        spy = self.AddEquity("SPY", Resolution.Daily)
        self.SetBenchmark(spy.Symbol)

        # Raw normalization required for options. Must set BEFORE AddUniverse.
        self.universe_settings.resolution = Resolution.Daily
        self.universe_settings.data_normalization_mode = DataNormalizationMode.Raw
        self.AddUniverse(self._universe_selector)

        # ── Parameters (PROD values active; POC values commented for reference) ──
        # POC: MAX_POSITIONS=3, MIN_OPTION_VOLUME=0, MIN_IV_FWD=0.10,
        # POC: MAX_IV_FWD=5.00, MAX_BID_ASK_PCT=0.30, SPREAD_SD_FLOOR=0.0
        self.MAX_POSITIONS      = 10           # $200k/slot
        # Must be kept in sync with SetCash above; slot budgets are derived from it.
        self.TOTAL_CAPITAL      = 2_000_000
        # Tightened from (15,45)/(45,75) to (20,40)/(50,65) — shaves edge expiries
        # off the subscribed chain to reduce DataManager memory footprint.
        # All four DTE bounds are in calendar days.
        self.NEAR_DTE_MIN       = 20
        self.NEAR_DTE_MAX       = 40
        self.FAR_DTE_MIN        = 50
        self.FAR_DTE_MAX        = 65
        self.MIN_OPTION_VOLUME  = 2_000        # daily option volume floor
        self.MIN_IV_FWD         = 0.16         # paper's min meaningful fwd IV
        self.MAX_IV_FWD         = 2.00         # data-quality cap
        self.MAX_BID_ASK_PCT    = 0.10         # spread tightness
        # Upper bound (days) of the estimated-earnings window that blocks an entry.
        self.EARNINGS_BUFFER    = 30
        # Cross-sectional sanity floor (paper's "1.5SD above market average").
        # Primary selection is top-N by raw spread; this floor just rejects names
        # that don't stand out vs the day's universe. Looser than paper's 1.5.
        self.SPREAD_SD_FLOOR    = 0.5
        self.MAX_PER_SECTOR     = 3
        # Equal-risk sizing: each slot budgets the same 1σ underlying move over
        # the holding period. ≈2% portfolio 1σ/month if all 10 slots are full —
        # standard vol-targeting risk budget, not premium-derived.
        self.TARGET_RISK_PCT    = 0.02

        # Stop loss — catastrophic MTM brake only; positions otherwise held to
        # near-leg expiry (paper design). Directional move-stop removed.
        self.STOP_LOSS_PCT      = 0.50   # MTM < -PCT × slot_budget triggers exit
        self.STOP_GRACE_DAYS    = 1      # suppress stop for first N days post-entry

        # Run-time state.
        self._positions:        dict = {}   # equity Symbol -> FVAPosition
        self._option_cache:     dict = {}   # equity Symbol -> canonical option Symbol (or None)
        self._monthly_cache:    dict = {}   # date -> bool memo for _is_standard_monthly
        self._sector_cache:     dict = {}   # equity Symbol -> (date cached, sector code)
        self._active_equities:  set  = set()   # equities currently tracked for scoring
        self._sector_counts:    dict = defaultdict(int)   # sector code -> open positions

        # Run the daily check 15 minutes before SPY's close so the day's option
        # chain is already in CurrentSlice. (Chains themselves are subscribed at
        # Resolution.Hour in _subscribe_options.)
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.BeforeMarketClose("SPY", 15),
            self._daily_lifecycle_check
        )

        self.DIAGNOSTIC_MODE  = False    # full funnel report once per month
        self._last_diag_month = -1

        # Monthly log aggregation — QC free tier caps logs at 10kb/day, so we
        # never log per-event; instead each closed trade folds into the month's
        # bucket and one MO line is emitted at each month rollover.
        # Keys are (year, month) tuples.
        self._monthly_stats = defaultdict(lambda: {
            'ent': 0, 'exp': 0, 'stop': 0, 'rem': 0, 'skip': 0,
            'win': 0, 'loss': 0, 'pnl': 0.0, 'slip': 0.0,
        })
        self._last_flush_month = None

        # Warmup lets the universe + option subscriptions settle before live
        # trading. The cross-sectional signal carries no per-symbol history.
        self.SetWarmUp(70, Resolution.Daily)

    # --- Universe selection ---

    def _universe_selector(self, fundamentals):
        # Pre-filters tuned for option liquidity (not just market cap):
        #   - Price ∈ [$20, $500]: avoids penny stocks AND Berkshire-class names
        #     whose option strike grids are unusable.
        #   - DollarVolume ≥ $50M/day: implies a real option market exists.
        #   - Rank by DollarVolume (liquidity), not MarketCap. Most-traded names
        #     typically carry the tightest option spreads and deepest chains.
        filtered = []
        for f in fundamentals:
            if not (20 <= f.Price <= 500):
                continue
            if f.DollarVolume < 50_000_000:
                continue
            if f.MarketCap <= 0 or not f.HasFundamentalData:
                continue
            try:
                if f.AssetClassification.SIC in self.BIOTECH_SIC:
                    continue
            except Exception:
                pass
            filtered.append(f)
        sorted_f = sorted(filtered, key=lambda f: f.DollarVolume, reverse=True)
        # Two-stage funnel:
        #   Stage 1 (here): top 100 by DollarVolume — wider candidate pool using
        #     cheap fundamental filters. Bounded by DataManager memory (now safe
        #     thanks to RemoveSecurity cleanup in OnSecuritiesChanged).
        #   Stage 2 (daily): rank these 100 by raw spread, take MAX_POSITIONS that
        #     clear the cross-sectional floor. Selection by signal, not liquidity.
        return [f.Symbol for f in sorted_f[:100]]

    # --- Daily lifecycle ---

    def _daily_lifecycle_check(self):
        """Scheduled daily routine: roll logs, exit positions, then fill free slots.

        Steps, in order:
          * skip everything while warming up;
          * emit the previous month's aggregate log line on a month rollover;
          * close positions whose near leg has expired (sell far legs);
          * stop out positions breaching the MTM loss limit;
          * if slots are free, score the tracked universe and enter the
            highest-spread candidates that pass every gate.
        """
        today = self.Time.date()

        # Warmup only lets the universe + option subscriptions settle. The
        # cross-sectional signal needs no per-symbol history, so nothing to seed.
        if self.IsWarmingUp:
            return

        # Emit the prior month's aggregate line once the calendar month rolls over.
        mkey = (today.year, today.month)
        if self._last_flush_month is not None and mkey != self._last_flush_month:
            self._flush_monthly(self._last_flush_month)
        self._last_flush_month = mkey

        # 1. Natural expiry: near leg has expired, close far leg
        # Iterate over a snapshot because _close_position mutates _positions.
        for eq_sym, pos in list(self._positions.items()):
            if today > pos.near_expiry and not pos.pending_close:
                pos.pending_close = True
                # Record stats first: _record_close reads unrealized P&L,
                # which must be captured before the legs are sold.
                self._record_close(pos, 'exp')
                self._sell_far_leg(pos)
                self._close_position(pos)

        # 1b. Stop-loss check on remaining live positions
        for eq_sym, pos in list(self._positions.items()):
            if pos.pending_close:
                continue
            reason = self._check_stop_loss(pos, today)
            if reason:
                self._stop_out(pos, reason)

        # 2. Early exit when portfolio full
        open_slots = self.MAX_POSITIONS - len(self._positions)
        if open_slots <= 0:
            return

        # 3. Score universe
        # The diagnostic funnel report runs at most once per calendar month.
        run_diag = (self.DIAGNOSTIC_MODE and today.month != self._last_diag_month)
        if run_diag:
            self._last_diag_month = today.month
        all_scores = self._score_universe(today, diagnostic=run_diag)

        # Cross-sectional dispersion of today's spreads = the paper's "market
        # average" reference. sd_floor is a looser sanity gate; primary selection
        # is top-N by raw spread (the paper's Decile-10 logic).
        # The mean/std include names already held; with fewer than two scored
        # names no floor is applied.
        spreads = [sd['spread'] for sd in all_scores.values()]
        if len(spreads) >= 2:
            arr = np.array(spreads)
            sd_floor = arr.mean() + self.SPREAD_SD_FLOOR * arr.std()
        else:
            sd_floor = 0.0

        # 4. Fill open slots: richest spread first, must clear thesis + sanity floors.
        candidates = [(sym, sd) for sym, sd in all_scores.items()
                      if sym not in self._positions]
        ranked = sorted(candidates, key=lambda x: x[1]['spread'], reverse=True)

        filled = 0
        for eq_sym, score_data in ranked:
            if filled >= open_slots:
                break
            spread = score_data['spread']
            # Thesis floor: near IV must be richer than forward (spread > 0).
            # Sanity floor: spread must stand out vs today's cross-section.
            if spread <= 0 or spread < sd_floor:
                continue
            # Forward IV outside the configured band is rejected without being
            # counted as a skip.
            iv_fwd = score_data['iv_fwd']
            if not (self.MIN_IV_FWD <= iv_fwd <= self.MAX_IV_FWD):
                continue
            # The remaining gates each count one 'skip' in this month's stats.
            if self._has_upcoming_earnings(eq_sym, today):
                self._monthly_stats[mkey]['skip'] += 1
                continue
            sector = score_data.get('sector', 'Unknown')
            if self._sector_counts[sector] >= self.MAX_PER_SECTOR:
                self._monthly_stats[mkey]['skip'] += 1
                continue
            if self.Portfolio.MarginRemaining < score_data['estimated_cost']:
                self._monthly_stats[mkey]['skip'] += 1
                continue
            # Only a successful entry consumes a slot.
            if self._enter_fva(eq_sym, score_data):
                filled += 1

    # --- Entry / exit ---

    def _close_position(self, pos):
        self._sector_counts[pos.sector] = max(0, self._sector_counts[pos.sector] - 1)
        self._positions.pop(pos.eq_sym, None)

    def _record_close(self, pos, reason):
        # Capture MTM pnl before the legs liquidate, then fold this trade into the
        # current month's bucket. reason ∈ {exp, stop, rem}.
        pnl = self._mtm_pnl(pos)
        m = self._monthly_stats[(self.Time.year, self.Time.month)]
        m[reason] += 1
        m['pnl']  += pnl
        m['win' if pnl >= 0 else 'loss'] += 1

    def _flush_monthly(self, key):
        m = self._monthly_stats.get(key)
        if not m:
            return
        y, mo = key
        self.Log(f"MO {y}-{mo:02d} | ent={m['ent']} exp={m['exp']} stop={m['stop']} "
                 f"rem={m['rem']} skip={m['skip']} w/l={m['win']}/{m['loss']} "
                 f"pnl=${m['pnl']:,.0f} slip=${m['slip']:,.0f} "
                 f"eq=${self.Portfolio.TotalPortfolioValue:,.0f}")

    def _halfspread_cost(self, bid, ask, n):
        # Part A instrumentation: $ paid to cross from mid to the touch on one
        # leg. gross(mid-fill) P&L = observed net + Σ these. Lets us decide if a
        # term-structure edge exists BEFORE bid/ask execution destroys it (the
        # paper's named #1 profit killer) without touching the fill path.
        if bid <= 0 or ask <= 0 or ask < bid:
            return 0.0
        return (ask - bid) / 2.0 * 100 * n

    def _sell_far_leg(self, pos):
        m = self._monthly_stats[(self.Time.year, self.Time.month)]
        for sym in (pos.far_call, pos.far_put):
            if self.Portfolio[sym].Invested:
                qty = self.Portfolio[sym].Quantity
                sec = self.Securities[sym]
                m['slip'] += self._halfspread_cost(sec.BidPrice, sec.AskPrice, abs(qty))
                self.MarketOrder(sym, -qty, tag=f"FVA exit far leg {pos.eq_sym.Value}")

    def _margin_for_one_spread(self, nc, np_, fc, fp):
        # Sum initial margin across 4 legs at qty=1. Long legs = premium paid;
        # short legs = brokerage-model formula. Fallback if API errors.
        legs = [(nc.Symbol, -1), (np_.Symbol, -1), (fc.Symbol, +1), (fp.Symbol, +1)]
        try:
            total = 0.0
            for sym, qty in legs:
                sec = self.Securities[sym]
                req = sec.BuyingPowerModel.GetInitialMarginRequirement(
                    InitialMarginParameters(sec, qty))
                total += abs(req.Value)
            return total
        except Exception as e:
            self.Log(f"  margin-API fallback ({e}); 25%-notional heuristic")
            short_notional = (nc.Strike + np_.Strike) * 100
            far_premium    = (fc.AskPrice + fp.AskPrice) * 100
            return short_notional * 0.25 + far_premium

    def _enter_fva(self, eq_sym, score_data):
        """Size and open one FVA trade: short near straddle + long far straddle.

        Args:
            eq_sym: underlying equity Symbol.
            score_data: dict produced by ``_compute_fva_spread`` (contracts,
                IVs, spread, expiries, estimated cost, sector).

        Returns:
            True when the orders were submitted and the position was recorded;
            False when the trade was skipped (bad cost/risk numbers, margin
            limits, missing option subscription) or order submission raised.
        """
        slot_budget    = self.TOTAL_CAPITAL / self.MAX_POSITIONS
        estimated_cost = score_data['estimated_cost']
        if estimated_cost <= 0:
            return False

        sector       = score_data.get('sector', 'Unknown')
        nc, np_, fc, fp = (score_data['near_call'], score_data['near_put'],
                           score_data['far_call'], score_data['far_put'])

        # Equal-risk sizing. Premium-based sizing (slot ÷ cost) structurally
        # explodes on cheap names (tiny premium → 10k-contract orders → margin
        # rejections, Nov-2021 failure). Instead size so every position carries
        # the same statistical risk: risk/contract = straddle notional × iv_1m ×
        # √(dte/252) = 1σ $ move of the underlying notional over the hold.
        # NOTE(review): near_dte is a calendar-day count but is annualised with
        # 252 (trading days) — confirm whether 365 was intended.
        near_dte = (score_data['near_expiry'] - self.Time.date()).days
        risk_per_contract = ((nc.Strike + np_.Strike) * 100
                             * score_data['iv_1m'] * (near_dte / 252) ** 0.5)
        if risk_per_contract <= 0:
            return False
        slot_risk = self.TOTAL_CAPITAL * self.TARGET_RISK_PCT / self.MAX_POSITIONS
        # Always at least one contract, even if that exceeds the risk budget.
        n_by_risk = max(1, int(slot_risk / risk_per_contract))

        # Margin-aware sizing via QC BuyingPowerModel
        margin_per_spread = self._margin_for_one_spread(nc, np_, fc, fp)
        if margin_per_spread <= 0:
            self.Log(f"  SKIP {eq_sym.Value}: margin query returned 0")
            return False
        n_by_margin = int(slot_budget / margin_per_spread)
        if n_by_margin < 1:
            self.Log(f"  SKIP margin-cap {eq_sym.Value}: 1-spread margin"
                     f"=${margin_per_spread:,.0f} > slot ${slot_budget:,.0f}")
            return False
        # Final size is the tighter of the risk and margin limits.
        n_contracts = min(n_by_risk, n_by_margin)

        # Cap to actually-remaining account margin
        margin_remaining = self.Portfolio.MarginRemaining
        if margin_per_spread * n_contracts > margin_remaining:
            n_contracts = int(margin_remaining / margin_per_spread)
            if n_contracts < 1:
                self.Log(f"  SKIP margin-remaining {eq_sym.Value}: "
                         f"need ≥${margin_per_spread:,.0f}, have ${margin_remaining:,.0f}")
                return False

        # Execution uses OptionStrategies combo orders: one 2-leg combo for the
        # near straddle and one for the far straddle.
        canonical = self._option_cache.get(eq_sym)
        if canonical is None:
            self.Log(f"  SKIP {eq_sym.Value}: no option canonical")
            return False

        # NOTE(review): the two combos are submitted separately. If the second
        # call raises after the first succeeded, the except branch returns
        # False without recording or unwinding the first — confirm whether
        # that case needs explicit cleanup.
        try:
            short_near = OptionStrategies.short_straddle(canonical, nc.Strike, nc.Expiry)
            long_far   = OptionStrategies.straddle(canonical, fc.Strike, fc.Expiry)
            self.Buy(short_near, n_contracts)   # buy short_straddle = sell straddle
            self.Buy(long_far,   n_contracts)   # buy long_straddle  = buy straddle

            # Track the trade so the lifecycle check can manage its exit.
            self._positions[eq_sym] = FVAPosition(
                eq_sym=eq_sym, near_call=nc.Symbol, near_put=np_.Symbol,
                far_call=fc.Symbol, far_put=fp.Symbol,
                near_expiry=score_data['near_expiry'], n_contracts=n_contracts,
                spread=score_data['spread'], iv_1m=score_data['iv_1m'],
                iv_fwd=score_data['iv_fwd'], sector=sector,
                entry_date=self.Time.date(),
            )
            self._sector_counts[sector] += 1
            mstat = self._monthly_stats[(self.Time.year, self.Time.month)]
            mstat['ent'] += 1
            # Instrumentation only: half-spread cost of all four legs at entry.
            mstat['slip'] += sum(self._halfspread_cost(l.BidPrice, l.AskPrice, n_contracts)
                                 for l in (nc, np_, fc, fp))
            if self._verbose_entries:
                self.Log(f"ENT {eq_sym.Value} x{n_contracts} "
                         f"sp={score_data['spread']:.2f} "
                         f"iv={score_data['iv_1m']:.2f}/{score_data['iv_fwd']:.2f} "
                         f"K={nc.Strike:.0f} c=${estimated_cost * n_contracts:,.0f}")
            return True
        except Exception as e:
            self.Log(f"  ERROR entering {eq_sym.Value}: {e}")
            return False

    # --- Stop loss ---

    def _mtm_pnl(self, pos):
        return sum(self.Portfolio[s].UnrealizedProfit
                   for s in (pos.near_call, pos.near_put, pos.far_call, pos.far_put))

    def _check_stop_loss(self, pos, today):
        # Catastrophic-MTM brake only. The vol-scaled directional move-stop was
        # removed: it exits on underlying direction, which is anti-thesis for a
        # volatility trade and prevented the hold-to-near-expiry harvest the
        # strategy depends on (was 24/36 closes, only 2/42 reached expiry).
        if (today - pos.entry_date).days < self.STOP_GRACE_DAYS:
            return None
        slot_budget = self.TOTAL_CAPITAL / self.MAX_POSITIONS
        mtm = self._mtm_pnl(pos)
        if mtm < -self.STOP_LOSS_PCT * slot_budget:
            return f"pnl=${mtm:,.0f} < -{self.STOP_LOSS_PCT:.0%}×${slot_budget:,.0f}"
        return None

    def _stop_out(self, pos, reason):
        self._record_close(pos, 'stop')
        if self._verbose_entries:
            self.Log(f"STOP {pos.eq_sym.Value} {reason}")
        canonical = self._option_cache.get(pos.eq_sym)
        closed_via_combo = False
        if canonical is not None:
            try:
                near_strike = pos.near_call.ID.StrikePrice
                far_strike  = pos.far_call.ID.StrikePrice
                near_exp    = pos.near_call.ID.Date
                far_exp     = pos.far_call.ID.Date
                short_near  = OptionStrategies.short_straddle(canonical, near_strike, near_exp)
                long_far    = OptionStrategies.straddle(canonical, far_strike, far_exp)
                self.Sell(short_near, pos.n_contracts)   # close short straddle
                self.Sell(long_far,   pos.n_contracts)   # close long straddle
                closed_via_combo = True
            except Exception as e:
                self.Log(f"  combo close failed ({e}); per-leg fallback")
        if not closed_via_combo:
            for s in (pos.near_call, pos.near_put, pos.far_call, pos.far_put):
                if self.Portfolio[s].Invested:
                    self.Liquidate(s, "STOP OUT fallback")
        self._close_position(pos)

    # --- Scoring engine ---

    def _score_universe(self, today, diagnostic=False):
        scores  = {}
        dropped = {
            'no_price':          0,
            'exception':         [],
            'no_option_sub':     [],
            'no_chain':          [],
            'low_volume':        [],
            'no_monthly_calls':  [],
            'no_near_calls':     [],
            'no_far_calls':      [],
            'bad_iv':            [],
            'bad_tau':           [],
            'negative_fwd_var':  [],
            'no_near_put':       [],
            'no_far_put':        [],
            'bid_ask_too_wide':  [],
            'passed':            [],
        } if diagnostic else None

        for sym in self._active_equities:
            sec = self.ActiveSecurities.get(sym)
            if sec is None or not sec.Price > 0:
                if diagnostic:
                    dropped['no_price'] += 1
                continue
            try:
                result = self._compute_fva_spread(sym, today,
                                                  dropped=dropped, diagnostic=diagnostic)
                if result is not None:
                    scores[sym] = result
                    if diagnostic:
                        dropped['passed'].append(sym.Value)
            except Exception as e:
                if diagnostic:
                    dropped['exception'].append((sym.Value, str(e)))

        if diagnostic:
            self.Log("=" * 60)
            self.Log(f"DIAGNOSTIC SCORE REPORT — {today}")
            self.Log(f"  Universe equities       : {len(self._active_equities)}")
            self.Log(f"  Skipped (no price)      : {dropped['no_price']}")
            self.Log(f"  Exceptions              : {len(dropped['exception'])}")
            for t, e in dropped['exception'][:5]:
                self.Log(f"    {t}: {e}")
            for label, key, show in [
                ("No option subscription", "no_option_sub",    5),
                ("Chain absent/empty",     "no_chain",          5),
                ("Low option volume",      "low_volume",        5),
                ("No standard monthly",    "no_monthly_calls",  5),
                ("No near calls (DTE)",    "no_near_calls",     5),
                ("No far calls (DTE)",     "no_far_calls",      5),
                ("Bad IV (<=0)",           "bad_iv",            5),
                ("Bad tau (tau2<=tau1)",   "bad_tau",           5),
                ("Negative fwd variance",  "negative_fwd_var",  5),
                ("No ATM near put",        "no_near_put",       5),
                ("No ATM far put",         "no_far_put",        5),
                ("Bid-ask too wide",       "bid_ask_too_wide",  5),
                ("PASSED all filters",     "passed",           10),
            ]:
                self.Log(f"  {label:<24}: {len(dropped[key])} — {dropped[key][:show]}")
            self.Log("=" * 60)

        return scores

    def _compute_fva_spread(self, eq_sym, today, dropped=None, diagnostic=False):
        """Compute the near-vs-forward implied-volatility spread for one equity.

        Picks the at-the-money call on a standard-monthly expiry in the near
        and far DTE windows, derives the forward variance between the two
        expiries, and returns the signal plus the four straddle legs.

        Args:
            eq_sym: underlying equity Symbol.
            today: current date used for days-to-expiry.
            dropped: diagnostic bucket dict (only written when ``diagnostic``).
            diagnostic: when true, record the rejection reason in ``dropped``.

        Returns:
            A dict with ``spread``, the IVs, the four contracts, both expiries,
            ``estimated_cost``, ``underlying_price`` and ``sector``; or ``None``
            if any filter rejects the name.
        """
        # The ticker string is only needed for diagnostic bucket entries.
        ticker = eq_sym.Value if diagnostic else None

        def drop(bucket, detail=None):
            # Record the rejection (diagnostic runs only) and return None so
            # callers can write `return drop(...)`.
            if diagnostic and dropped is not None:
                entry = ticker if detail is None else f"{ticker}({detail})"
                dropped[bucket].append(entry)
            return None

        option_sym = self._get_option_subscription(eq_sym)
        if option_sym is None:
            return drop('no_option_sub')

        chain = self.CurrentSlice.OptionChains.get(option_sym)
        if chain is None or not chain.Contracts:
            return drop('no_chain')

        contracts        = list(chain.Contracts.Values)
        underlying_price = chain.Underlying.Price
        if underlying_price <= 0:
            return drop('no_chain', f'price={underlying_price}')

        # Liquidity gate on the summed volume of the contracts in the chain.
        if self.MIN_OPTION_VOLUME > 0:
            total_vol = sum(c.Volume for c in contracts)
            if total_vol < self.MIN_OPTION_VOLUME:
                return drop('low_volume', f'vol={total_vol}')

        # Single pass: bucket calls (by near/far DTE) + puts (by expiry)
        near_calls, far_calls = [], []
        monthly_call_expiries = set()
        puts_by_expiry = defaultdict(list)

        for c in contracts:
            expiry = c.Expiry.date()
            if c.Right == OptionRight.Call:
                # Calls must have a positive IV and bid and sit on a standard
                # monthly expiry to be considered at all.
                if c.ImpliedVolatility > 0 and c.BidPrice > 0 and self._is_standard_monthly(expiry):
                    monthly_call_expiries.add(expiry)
                    dte = (expiry - today).days
                    if self.NEAR_DTE_MIN <= dte <= self.NEAR_DTE_MAX:
                        near_calls.append(c)
                    elif self.FAR_DTE_MIN <= dte <= self.FAR_DTE_MAX:
                        far_calls.append(c)
            else:
                # Puts are kept for every expiry; they are matched to the
                # chosen calls by expiry and strike further down.
                if c.ImpliedVolatility > 0 and c.BidPrice > 0:
                    puts_by_expiry[expiry].append(c)

        if not monthly_call_expiries:
            return drop('no_monthly_calls', f'total_contracts={len(contracts)}')
        if not near_calls:
            return drop('no_near_calls', f'monthly_expiries={sorted(monthly_call_expiries)}')
        if not far_calls:
            return drop('no_far_calls', f'monthly_expiries={sorted(monthly_call_expiries)}')

        # ATM = strike closest to the underlying price.
        near_atm = min(near_calls, key=lambda c: abs(c.Strike - underlying_price))
        far_atm  = min(far_calls,  key=lambda c: abs(c.Strike - underlying_price))

        # The signal uses the call IVs only.
        iv_1m, iv_2m = near_atm.ImpliedVolatility, far_atm.ImpliedVolatility
        if iv_1m <= 0 or iv_2m <= 0:
            return drop('bad_iv', f'iv_1m={iv_1m:.4f} iv_2m={iv_2m:.4f}')

        # Times to expiry in calendar days; require the expiries to be at
        # least 5 days apart.
        tau1 = (near_atm.Expiry.date() - today).days
        tau2 = (far_atm.Expiry.date()  - today).days
        if tau2 <= tau1 or (tau2 - tau1) < 5:
            return drop('bad_tau', f'tau1={tau1} tau2={tau2}')

        # Forward variance between the two expiries:
        #   (iv_2m^2 * tau2 - iv_1m^2 * tau1) / (tau2 - tau1)
        fwd_var = (iv_2m**2 * tau2 - iv_1m**2 * tau1) / (tau2 - tau1)
        if fwd_var <= 0:
            return drop('negative_fwd_var',
                        f'fwd_var={fwd_var:.6f} iv1={iv_1m:.3f} iv2={iv_2m:.3f}')
        iv_fwd = np.sqrt(fwd_var)
        # Signal: how much richer near IV is than forward IV (positive = richer).
        spread = (iv_1m / iv_fwd) - 1

        # Put at SAME strike as call → true straddle (required by OptionStrategies API)
        near_put = next((p for p in puts_by_expiry.get(near_atm.Expiry.date(), [])
                         if p.Strike == near_atm.Strike), None)
        if near_put is None:
            return drop('no_near_put', f'strike={near_atm.Strike}')
        far_put = next((p for p in puts_by_expiry.get(far_atm.Expiry.date(), [])
                        if p.Strike == far_atm.Strike), None)
        if far_put is None:
            return drop('no_far_put', f'strike={far_atm.Strike}')

        # Every leg must have a positive mid and a relative bid/ask width
        # within MAX_BID_ASK_PCT.
        for leg in (near_atm, near_put, far_atm, far_put):
            mid = (leg.BidPrice + leg.AskPrice) / 2
            if mid <= 0:
                return drop('bid_ask_too_wide', f'{leg.Symbol.Value} mid=0')
            ba_pct = (leg.AskPrice - leg.BidPrice) / mid
            if ba_pct > self.MAX_BID_ASK_PCT:
                return drop('bid_ask_too_wide', f'{leg.Symbol.Value} ba%={ba_pct:.1%}')

        # Net debit for one unit: buy the far straddle at the ask, sell the
        # near straddle at the bid; floored at $50.
        far_cost       = (far_atm.AskPrice  + far_put.AskPrice)  * 100
        near_credit    = (near_atm.BidPrice + near_put.BidPrice) * 100
        estimated_cost = max(far_cost - near_credit, 50)

        return {
            'spread': spread, 'iv_1m': iv_1m, 'iv_fwd': iv_fwd, 'iv_2m': iv_2m,
            'near_call': near_atm, 'far_call': far_atm,
            'near_put':  near_put, 'far_put':  far_put,
            'near_expiry': near_atm.Expiry.date(), 'far_expiry': far_atm.Expiry.date(),
            'estimated_cost': estimated_cost, 'underlying_price': underlying_price,
            'sector': self._get_sector(eq_sym, today),
        }

    # --- Filter helpers ---

    def _has_upcoming_earnings(self, eq_sym, today):
        # Heuristic: last EarningReports.FileDate + 90d ≈ next quarterly earnings.
        # Imprecise (~±2 weeks) but catches NFLX-style traps inside near-leg DTE.
        try:
            sec  = self.ActiveSecurities.get(eq_sym)
            fund = getattr(sec, 'Fundamentals', None) if sec else None
            er   = getattr(fund, 'EarningReports', None) if fund else None
            fd   = getattr(er, 'FileDate', None) if er else None
            val  = getattr(fd, 'Value', None) if fd is not None else None
            if val is None or not hasattr(val, 'date'):
                return False
            days_to = ((val.date() + timedelta(days=90)) - today).days
            return -3 <= days_to <= self.EARNINGS_BUFFER
        except Exception:
            return False

    def _get_sector(self, eq_sym, today):
        cached = self._sector_cache.get(eq_sym)
        if cached and (today - cached[0]).days < 5:
            return cached[1]
        try:
            sec = self.ActiveSecurities.get(eq_sym)
            if sec and sec.Fundamentals:
                code = str(sec.Fundamentals.AssetClassification.MorningstarSectorCode)
                self._sector_cache[eq_sym] = (today, code)
                return code
        except Exception:
            pass
        self._sector_cache[eq_sym] = (today, 'Unknown')
        return 'Unknown'

    # --- Helpers ---

    def _subscribe_options(self, eq_sym):
        # Resolution.Hour: 8 bars/day (vs 390 for Minute) → ~50× less option data
        # to process. Schedule fires at 15:45; the 15:30 hour-bar chain is in
        # CurrentSlice by then. Daily would not deliver chains intraday at all.
        # Pass Symbol (not .Value) to avoid canonical key mismatch.
        if eq_sym in self._option_cache:
            return
        try:
            opt = self.AddOption(eq_sym, Resolution.Hour)
            opt.SetFilter(lambda u: u
                          .Strikes(-1, +1)         # ATM ± 1 strike — minimum for matched-strike straddle
                          .Expiration(self.NEAR_DTE_MIN, self.FAR_DTE_MAX))
            self._option_cache[eq_sym] = opt.Symbol
        except Exception:
            self._option_cache[eq_sym] = None

    def _get_option_subscription(self, eq_sym):
        return self._option_cache.get(eq_sym)

    def _is_standard_monthly(self, d):
        # 3rd Friday = weekday==4 (Fri) and day in [15,21]. Cached.
        if d in self._monthly_cache:
            return self._monthly_cache[d]
        result = d.weekday() == 4 and 15 <= d.day <= 21
        self._monthly_cache[d] = result
        return result

    # --- Housekeeping ---

    def OnEndOfAlgorithm(self):
        # No rollover fires for the final (partial) month — flush it explicitly.
        if self._last_flush_month is not None:
            self._flush_monthly(self._last_flush_month)

    def OnSecuritiesChanged(self, changes):
        """Keep tracked equities, option subscriptions and positions in sync with the universe.

        Added equities are tracked and get an option-chain subscription.
        Removed equities are untracked; any open position in them is closed,
        and their cached sector and option subscription are released.
        Non-equity securities are ignored in both directions.
        """
        for added in changes.AddedSecurities:
            # NOTE(review): every Equity-type addition is tracked, which
            # presumably includes the SPY benchmark added in Initialize —
            # confirm that scoring/trading SPY is intended.
            if added.Symbol.SecurityType == SecurityType.Equity:
                self._active_equities.add(added.Symbol)
                self._subscribe_options(added.Symbol)

        for removed in changes.RemovedSecurities:
            sym = removed.Symbol
            if sym.SecurityType != SecurityType.Equity:
                continue
            self._active_equities.discard(sym)

            # Close active position first (uses its leg subscriptions).
            if sym in self._positions:
                pos = self._positions[sym]
                # Stats are recorded before liquidation because the P&L is
                # read from the legs' unrealized profit.
                self._record_close(pos, reason='rem')
                if self._verbose_entries:
                    self.Log(f"REM {sym.Value}")
                # Far legs go through _sell_far_leg (tracks exit slippage);
                # near legs are liquidated directly.
                self._sell_far_leg(pos)
                for s in (pos.near_call, pos.near_put):
                    if self.Portfolio[s].Invested:
                        self.Liquidate(s, "Universe removal")
                self._close_position(pos)

            # Release chain subscription + per-symbol caches. Without this,
            # stale option chains accumulate in DataManager (130+ over 6mo
            # of universe churn) and trigger OutOfMemoryException.
            # Safe inside OnSecuritiesChanged (data-engine context); avoid
            # calling Remove/AddSecurity from scheduled events.
            self._sector_cache.pop(sym, None)
            canonical = self._option_cache.pop(sym, None)
            if canonical is not None:
                try:
                    self.RemoveSecurity(canonical)
                except Exception:
                    pass   # no-op if API rejects; no worse than prior leak