Overall Statistics
Total Orders
347
Average Win
4.28%
Average Loss
-2.45%
Compounding Annual Return
11.414%
Drawdown
32.600%
Expectancy
0.271
Start Equity
1000000
End Equity
2683193
Net Profit
168.319%
Sharpe Ratio
0.438
Sortino Ratio
0.384
Probabilistic Sharpe Ratio
6.193%
Loss Rate
54%
Win Rate
46%
Profit-Loss Ratio
1.75
Alpha
0.049
Beta
0.188
Annual Standard Deviation
0.15
Annual Variance
0.022
Information Ratio
-0.113
Tracking Error
0.19
Treynor Ratio
0.35
Total Fees
$0.00
Estimated Strategy Capacity
$3600000.00
Lowest Capacity Asset
SPX 32OCJW1R54RXQ|SPX 31
Portfolio Turnover
0.72%
# region imports
from AlgorithmImports import *

from itertools import groupby
from sklearn.cluster import KMeans
# endregion

class IVRankClustersAlgorithm(QCAlgorithm):
    """Rotate between short SPX puts and long SPX straddles based on IV Rank.

    Once a day the IV Rank indicator attached to the SPX index option is
    updated from the current option chain. While its cluster label is low or
    medium (0 or 1) the algorithm is short a put ("risk"); when the label is
    high (2) it liquidates and buys straddles ("safety").
    """

    def initialize(self):
        """Set the backtest window, cash, securities, indicator, and schedule."""
        self.set_start_date(2016, 1, 1)
        self.set_end_date(2025, 2, 14)
        self.set_cash(1_000_000)
        # Within this class GLD is only used by the scheduling rules below;
        # it is never traded.
        self._gld = self.add_equity('GLD')
        self._option = self.add_index_option('SPX')
        # Include weeklys, keep expiries 30-90 days out and strikes one step
        # either side of the money.
        self._option.set_filter(
            lambda universe: universe.include_weeklys().expiration(30, 90).strikes(-1, 1)
        )
        self._option.iv_rank = IVRank()
        # Name of the strategy currently held: None, 'risk', or 'safety'.
        self._strategy = None
        self.schedule.on(
            self.date_rules.every_day(self._gld.symbol),
            self.time_rules.after_market_open(self._gld.symbol, 1),
            self._rebalance,
        )
        self.set_warm_up(timedelta(365))  # Warm-up the IV Rank indicator.

    def _rebalance(self):
        """Update the IV Rank and switch strategy when its label calls for it."""
        chain = self.current_slice.option_chains.get(self._option.symbol)
        # The indicator must be updated even while warming up, so the update
        # call sits before the warm-up check in this short-circuit chain.
        if not chain or not self._option.iv_rank.update(chain) or self.is_warming_up:
            return
        iv_rank = self._option.iv_rank
        self.plot('IV Rank', 'Value', iv_rank.value)
        self.plot('IV Rank', 'Label', iv_rank.label)

        # Enter a strategy only when it is not already the active one, or when
        # the portfolio holds nothing.
        if iv_rank.label < 2 and (self._strategy != 'risk' or not self.portfolio.invested):
            self._enter_risk(chain)
        elif iv_rank.label == 2 and (self._strategy != 'safety' or not self.portfolio.invested):
            self._enter_safety(chain)

    def _enter_risk(self, chain):
        """Sell the lowest-strike put at the furthest expiry in the chain.

        Low/moderate IV Rank => low/moderate volatility is expected, so the
        put is sold to collect premium. It should expire OTM since SPX has
        upward drift. Nothing is traded (and existing holdings are left
        alone) when the chain has no usable put.
        """
        expiries = [c.expiry for c in chain if c.expiry]
        if not expiries:
            return
        furthest = max(expiries)
        puts = [c for c in chain if c.expiry == furthest and c.right == OptionRight.PUT]
        if not puts:
            return
        # Lower strike => more likely to expire OTM. min() keeps the first
        # contract on ties.
        contract = min(puts, key=lambda c: c.strike)
        # Rotate out of the safety strategy.
        self.liquidate()
        self._strategy = 'risk'
        # Sell the contract with a target weight of -50%.
        self.set_holdings(contract.symbol, -0.5)

    def _enter_safety(self, chain):
        """Liquidate and buy 10 straddles at the furthest expiry.

        High IV Rank => the future is expected to be volatile, so the
        short-put position is closed and a long straddle is bought instead.
        The liquidation happens even when no straddle can be built.
        """
        # Rotate out of the risk-on strategy.
        self.liquidate()

        def pair_key(c):
            return (c.expiry, c.strike)

        # Drop contracts that have no mirror: keep only (expiry, strike)
        # groups with exactly two contracts (presumably one call and one put).
        paired = []
        for _, group in groupby(sorted(chain, key=pair_key), key=pair_key):
            members = list(group)
            if len(members) == 2:
                paired.extend(members)

        expiries = [c.expiry for c in paired if c.expiry]
        if not expiries:
            return
        furthest = max(expiries)
        strike = min(c.strike for c in paired if c.expiry == furthest)

        self._strategy = 'safety'
        # Buy the straddle.
        self.buy(OptionStrategies.straddle(self._option.symbol, strike, furthest), 10)


class IVRank:
    """Track an option chain's IV Rank and classify it as low, medium, or high.

    Each ``update`` aggregates the implied volatility (IV) of the
    at-the-money contracts at the nearest expiry that is at least
    ``min_expiry`` days away, feeds it to rolling minimum/maximum indicators,
    and clusters the trailing aggregated IVs into three groups with k-means.

    Attributes:
        is_ready: True once the rolling maximum reports it is ready.
        value: Latest IV Rank, ``(iv - min) / (max - min)``; None until ready.
        label: Cluster of the latest aggregated IV (0=Low, 1=Medium, 2=High);
            None until ready.
    """

    def __init__(self, lookback=252, min_expiry=30):
        """Create the indicator.

        Args:
            lookback: Number of updates kept by the rolling min/max and by
                the history that is clustered.
            min_expiry: Minimum number of days to expiry for a contract to be
                used in the aggregation.
        """
        self._min_iv = Minimum(lookback)
        self._max_iv = Maximum(lookback)
        self._min_expiry = timedelta(min_expiry)
        self._history = RollingWindow[float](lookback)
        # Defined up front so reading them before the first ready update
        # does not raise AttributeError.
        self.is_ready = False
        self.value = None
        self.label = None

    def update(self, chain):
        """Feed the current option chain into the indicator.

        Args:
            chain: Iterable of option contracts exposing ``end_time``.

        Returns:
            True when ``value`` and ``label`` were refreshed by this call;
            False when no contract is far enough from expiry or the rolling
            maximum is not ready yet.
        """
        # Select contracts to use in the aggregation.
        #  1) Contracts have the closest expiry after the minimum horizon.
        cutoff = chain.end_time + self._min_expiry
        expiries = [c.id.date for c in chain if c.id.date >= cutoff]
        if not expiries:
            return False
        nearest = min(expiries)
        contracts = [c for c in chain if c.id.date == nearest]
        #  2) ATM contracts: strike closest to the underlying price.
        distances = [abs(c.underlying_last_price - c.id.strike_price) for c in contracts]
        closest = min(distances)
        atm_ivs = [
            c.implied_volatility
            for c, distance in zip(contracts, distances)
            if distance == closest
        ]

        # Aggregate the IVs of the selected contracts.
        agg_iv = float(np.median(atm_ivs))
        self._history.add(agg_iv)

        # Calculate the IV Rank and determine if it's high, medium, or low.
        self._min_iv.update(chain.end_time, agg_iv)
        self.is_ready = bool(self._max_iv.update(chain.end_time, agg_iv))
        if self.is_ready:
            self.value = self._rank(
                agg_iv, self._min_iv.current.value, self._max_iv.current.value
            )
            # Reverse the window so the current value comes last (the original
            # code relied on the window iterating newest-first).
            self.label = self._classify(list(self._history)[::-1])
        return self.is_ready

    @staticmethod
    def _rank(current, low, high):
        """Return where ``current`` sits in ``[low, high]`` as a 0-1 fraction.

        A flat window (``high == low``) has no defined rank; 0.0 is returned
        instead of dividing by zero.
        """
        spread = high - low
        if spread == 0:
            return 0.0
        return float((current - low) / spread)

    @staticmethod
    def _classify(values):
        """Cluster ``values`` into three groups and label the last one.

        Args:
            values: Aggregated IVs ordered oldest to newest.

        Returns:
            0, 1, or 2 for the low, medium, or high cluster (ordered by
            cluster centre) that contains the last value.
        """
        samples = np.array(values).reshape(-1, 1)
        kmeans = KMeans(n_clusters=3, random_state=0).fit(samples)
        # k-means numbers its clusters arbitrarily; remap them by ascending
        # centre so that 0=Low, 1=Medium, 2=High.
        order = np.argsort(kmeans.cluster_centers_.ravel())
        label_map = {original: rank for rank, original in enumerate(order)}
        return label_map[kmeans.labels_[-1]]