Overall Statistics
Total Orders
277
Average Win
2.31%
Average Loss
-3.07%
Compounding Annual Return
16.137%
Drawdown
24.300%
Expectancy
0.321
Start Equity
1000000
End Equity
3920025.5
Net Profit
292.003%
Sharpe Ratio
0.812
Sortino Ratio
0.51
Probabilistic Sharpe Ratio
45.866%
Loss Rate
25%
Win Rate
75%
Profit-Loss Ratio
0.75
Alpha
0.061
Beta
0.36
Annual Standard Deviation
0.113
Annual Variance
0.013
Information Ratio
0.035
Tracking Error
0.138
Treynor Ratio
0.256
Total Fees
$0.00
Estimated Strategy Capacity
$420000.00
Lowest Capacity Asset
SPX 32OCJW260YWDQ|SPX 31
Portfolio Turnover
0.15%
# region imports
from AlgorithmImports import *

from itertools import groupby
from sklearn.cluster import KMeans
# endregion


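# Design notes:
#  - Strike selection is based on the standard deviation of the index price: in _rebalance,
#    (std_factor * std) / 5 gives the number of strikes below the current price to sell
#    (presumably 5 is the SPX strike spacing -- confirm). The contract is looked up with
#    the Option chain provider.
#  - TODO: Optimize the lower strike in the algorithm variant that uses -60 to see how
#    overfit it is.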

class IVRankClustersAlgorithm(QCAlgorithm):
    """Sell out-of-the-money SPX puts while the IV Rank label is low/medium; go flat when it is high.

    Once a day, one minute after the open, the algorithm updates an `IVRank`
    indicator from the SPX Index Option chain. While the label is 0 (low) or
    1 (medium) it holds a short put position ('risk' regime); when the label
    is 2 (high) it liquidates everything ('safety' regime).
    """

    # Minimum number of calendar days until expiry for the put that is sold.
    _MIN_DAYS_TO_EXPIRY = 30
    # Divisor that converts a price distance into a number of strikes
    # (presumably the SPX strike spacing in index points -- confirm).
    _STRIKE_STEP = 5

    def initialize(self):
        """Set the backtest window, subscriptions, indicators, and the daily rebalance."""
        self.set_start_date(2016, 1, 1)
        self.set_end_date(2025, 2, 14)
        self.set_cash(1_000_000)
        # Seed every newly added security with its last known prices.
        self.set_security_initializer(BrokerageModelSecurityInitializer(self.brokerage_model, FuncSecuritySeeder(self.get_last_known_prices)))
        self._index = self.add_index('SPX')
        # 22-day standard deviation of the index price; scaled by `std_factor`
        # to decide how far below the current price the sold strike sits.
        self._index.std = self.std(self._index.symbol, 22, Resolution.DAILY)
        self._index.std_factor = 2
        self._option = self.add_index_option('SPX')
        self._option.set_filter(lambda universe: universe.include_weeklys().expiration(30, 90).strikes(-1, 1))
        self._option.iv_rank = IVRank()
        # Current regime: None (not yet set), 'risk' (short puts), or 'safety' (flat).
        self._strategy = None
        self.schedule.on(self.date_rules.every_day(self._index.symbol), self.time_rules.after_market_open(self._index.symbol, 1), self._rebalance)
        self.set_warm_up(timedelta(365))  # Warm-up the IV Rank indicator.

    def _rebalance(self):
        """Update the IV Rank indicator and rotate between the 'risk' and 'safety' regimes."""
        # Update the IV Rank indicator. The warm-up check comes last so the
        # indicator is still updated while the algorithm is warming up.
        chain = self.current_slice.option_chains.get(self._option.symbol)
        if not chain or not self._option.iv_rank.update(chain) or self.is_warming_up:
            return
        iv_rank = self._option.iv_rank
        self.plot('IV Rank', 'Value', iv_rank.value)
        self.plot('IV Rank', 'Label', iv_rank.label)
        # If IV Rank is low/moderate, we expect low/moderate volatility in the future.
        # Sell put contracts to collect premium. They should expire OTM since SPX has upward drift.
        if iv_rank.label < 2 and (self._strategy != 'risk' or not self.portfolio.invested):
            contract_symbol = self._select_put_contract()
            if contract_symbol is None:
                # Nothing tradable today; try again at the next rebalance.
                return
            self.add_option_contract(contract_symbol)
            # Rotate out of the safety strategy.
            self._strategy = 'risk'
            # Sell the contract.
            self.set_holdings(contract_symbol, -0.5)
        # If IV Rank is high, the future is expected to be volatile.
        elif iv_rank.label == 2 and (self._strategy != 'safety' or not self.portfolio.invested):
            # Rotate out of the risk-on strategy.
            self.liquidate()
            self._strategy = 'safety'

    def _select_put_contract(self):
        """Return the Symbol of the put to sell, or None when no contract qualifies.

        The contract is a put with the closest expiry that is at least
        `_MIN_DAYS_TO_EXPIRY` days away, located
        int(std_factor * STD(price, 22 days) / _STRIKE_STEP) strikes below the
        current index price (1 = the highest strike at or below the price).
        """
        chain = self.option_chain(self._index.symbol, flatten=True).data_frame
        if chain.empty:
            return None
        # Closest expiry that is at least the minimum number of days away.
        # NOTE: the previous test (`self.time - expiry <= 30 days`) was true for
        # every unexpired contract, so it always picked the nearest expiry.
        eligible_expiries = chain.expiry[chain.expiry >= self.time + timedelta(self._MIN_DAYS_TO_EXPIRY)]
        if eligible_expiries.empty:
            return None
        expiry = eligible_expiries.min()
        puts = chain[
            (chain.expiry == expiry) &
            (chain.right == OptionRight.PUT) &
            (chain.strike <= self._index.price)
        ].sort_values('strike')
        if puts.empty:
            return None
        # Number of strikes to step down from the index price. Clamp it to
        # [1, len(puts)]: an offset of 0 would index the *lowest* strike
        # (`index[-0]` is `index[0]`) and an offset beyond the available
        # strikes would raise IndexError.
        offset = int(self._index.std_factor * self._index.std.current.value / self._STRIKE_STEP)
        offset = min(max(offset, 1), len(puts))
        return puts.index[-offset]


class IVRank:
    """IV Rank of an Option chain plus a low/medium/high label for the current IV.

    Each call to `update` aggregates the implied volatility (IV) of the
    at-the-money contracts with the closest eligible expiry, then:
      - `value` is the position of that IV between the trailing minimum and
        maximum IV (0 = at the minimum, 1 = at the maximum).
      - `label` is the KMeans cluster (3 clusters over the trailing raw IV
        values) that the current IV falls into: 0=Low, 1=Medium, 2=High.
    """

    def __init__(self, lookback=252, min_expiry=30):
        """Create the indicator.

        Args:
            lookback: Number of samples in the trailing min/max and in the
                clustering window.
            min_expiry: Minimum number of days between the chain's end time
                and the expiry of the contracts used in the aggregation.
        """
        self._min_iv = Minimum(lookback)
        self._max_iv = Maximum(lookback)
        self._min_expiry = timedelta(min_expiry)
        self._history = RollingWindow[float](lookback)
        # Public state, defined up front so it can be read before the first
        # successful update without raising AttributeError.
        self.is_ready = False
        self.value = None
        self.label = None

    def update(self, chain):
        """Feed one Option chain snapshot into the indicator.

        Args:
            chain: Iterable of Option contracts that also exposes `end_time`.

        Returns:
            True when the indicator is ready (`value` and `label` were
            refreshed), False otherwise, including when the chain has no
            contract expiring at least `min_expiry` days out.
        """
        # Select contracts to use in the aggregation.
        #  1) Contracts have the closest expiry after the minimum expiry.
        cutoff = chain.end_time + self._min_expiry
        expiries = [c.id.date for c in chain if c.id.date >= cutoff]
        if not expiries:
            return False
        expiry = min(expiries)
        contracts = [c for c in chain if c.id.date == expiry]
        #  2) ATM contracts: the strike(s) closest to the underlying price.
        distance_by_symbol = {c.symbol: abs(c.underlying_last_price - c.id.strike_price) for c in contracts}
        min_distance = min(distance_by_symbol.values())
        contracts = [c for c in contracts if distance_by_symbol[c.symbol] == min_distance]

        # Aggregate the IVs of the selected contracts.
        agg_iv = float(np.median([c.implied_volatility for c in contracts]))
        self._history.add(agg_iv)

        # Calculate the IV Rank and determine if it's high, medium, or low.
        self._min_iv.update(chain.end_time, agg_iv)
        self.is_ready = self._max_iv.update(chain.end_time, agg_iv)
        if self.is_ready:
            low = self._min_iv.current.value
            iv_range = self._max_iv.current.value - low
            # A flat trailing window (max == min) has no defined rank; report
            # 0 instead of dividing by zero.
            self.value = float((agg_iv - low) / iv_range) if iv_range else 0.0
            # Cluster the trailing IV values into high, medium, and low groups.
            # The window is reversed so that the last sample is the current one.
            samples = np.array(list(self._history)[::-1]).reshape(-1, 1)
            kmeans = KMeans(n_clusters=3, random_state=0).fit(samples)
            # Map the arbitrary cluster ids to ranks by ascending cluster
            # center so that 0=Low, 1=Medium, 2=High.
            label_map = {original: sorted_ for sorted_, original in enumerate(np.argsort(kmeans.cluster_centers_.ravel()))}
            # Save the label of the current value.
            self.label = label_map[kmeans.labels_[-1]]  # 0=Low, 1=Medium, 2=High
        return self.is_ready