Overall Statistics
Total Orders
3800
Average Win
0.57%
Average Loss
-0.64%
Compounding Annual Return
8.672%
Drawdown
38.000%
Expectancy
0.148
Start Equity
100000
End Equity
824686.59
Net Profit
724.687%
Sharpe Ratio
0.356
Sortino Ratio
0.378
Probabilistic Sharpe Ratio
0.182%
Loss Rate
39%
Win Rate
61%
Profit-Loss Ratio
0.88
Alpha
0.02
Beta
0.594
Annual Standard Deviation
0.125
Annual Variance
0.016
Information Ratio
0.035
Tracking Error
0.105
Treynor Ratio
0.075
Total Fees
$20108.34
Estimated Strategy Capacity
$12000000.00
Lowest Capacity Asset
IEF SGNKIKYGE9NP
Portfolio Turnover
17.82%
#region imports
from AlgorithmImports import *
from abc import ABC, abstractmethod
#endregion

class Charts(ABC):
    def __init__(
        self, 
        algo: QCAlgorithm, 
        chart_name: str, 
        series_list: List[Dict]
    ) -> None:

        self._algo: QCAlgorithm = algo
        self._chart_name: str = chart_name
        self._series_list: List[Dict] = series_list

        self._add_chart()
        self._scheduler()

    def _add_chart(self) -> None:
        chart = Chart(self._chart_name)
        self._algo.add_chart(chart)

        for series in self._series_list:
            chart.AddSeries(Series(series['name'], series['type'], series['unit']))

    def _scheduler(self) -> None:
        algo: QCAlgorithm = self._algo
        algo.schedule.on(
            algo.date_rules.every_day('SPY'), 
            algo.time_rules.before_market_close('SPY', 0), 
            self._update_chart
        )

    @abstractmethod
    def _update_chart(self):
        pass

class Benchmark(Charts):
    def __init__(
        self, 
        algo: QCAlgorithm, 
        benchmark_symbol: Symbol,
        init_cash: int
    ) -> None:

        series_list: List[Dict] = [
            {
            'name': f'Benchmark {benchmark_symbol.value}', 
            'type': SeriesType.LINE, 
            'unit': '$'
            }
        ]

        self._benchmark_values: List[float] = []
        self._benchmark_symbol: Symbol = benchmark_symbol
        self._init_cash: int = init_cash

        super().__init__(algo = algo, chart_name = 'Strategy Equity', series_list = series_list)
    
    def _update_chart(self):
        # wait for available data
        if not self._algo.portfolio.invested:
            return

        # print benchmark in main equity plot
        mkt_price_df: DataFrame = self._algo.history(self._benchmark_symbol, 2, Resolution.DAILY)
        if not mkt_price_df.empty:
            benchmark_price: float = mkt_price_df['close'].unstack(level= 0).iloc[-1]
            if len(self._benchmark_values) == 2:
                self._benchmark_values[-1] = benchmark_price
                benchmark_perf: float = self._init_cash * (self._benchmark_values[-1] / self._benchmark_values[0])
                self._algo.plot(self._chart_name, self._series_list[0]['name'], benchmark_perf)
            else:
                self._benchmark_values.append(benchmark_price)

class VIXFilterValue(Charts):
    def __init__(
        self, 
        algo: QCAlgorithm, 
        vix_filter
    ) -> None:

        series_list: List[Dict] = [
            {
            'name': vix_filter.name, 
            'type': SeriesType.LINE, 
            'unit': ''
            }
        ]

        self._vix_filter: VIXFilter = vix_filter

        super().__init__(algo = algo, chart_name = 'VIX Filter', series_list = series_list)
    
    def _update_chart(self):
        if self._vix_filter.is_ready:
            _, filter_value = self._vix_filter.get_value()
            self._algo.plot(self._chart_name, self._series_list[0]['name'], filter_value)
# region imports
from AlgorithmImports import *
from enum import Enum
from abc import ABC, abstractmethod
from pandas.core.frame import DataFrame
# endregion

class FilterType(Enum):
    VVIX = 1
    INDEX_RANK = 2
    INDEX_RATIO = 3

class IndexFilter(ABC):
    def __init__(
        self,
        algo: QCAlgorithm, 
        filter_type: FilterType, 
        filter_value_threshold: float, 
        compare_fn: Callable
    ) -> None:

        self._algo: QCAlgorithm = algo
        self._filter_type: FilterType = filter_type
        self._filter_value_threshold: float = filter_value_threshold
        self._compare_fn: Callable = compare_fn
        
        self._signal_assets: List[Symbol] = self._subscribe_assets()

    @property
    def name(self) -> str:
        return self._filter_type.name

    @property
    def is_ready(self) -> bool:
        return all(
            self._algo.securities.contains_key(sa) and self._algo.securities[sa].get_last_data() and self._algo.securities[sa].price != 0 for sa in self._signal_assets
        )
    
    @abstractmethod
    def _subscribe_assets(self) -> List[Symbol]:
        ...
    
    @abstractmethod
    def trade_signal(self) -> bool:
        ...

    @abstractmethod
    def _get_rank(self, vix: Symbol, lookback: int = 150) -> float:
        ...
    
    @abstractmethod
    def get_value(self) -> Tuple[str, float]:
        ...

class VIXFilter(IndexFilter):
    def __init__(
        self,
        algo: QCAlgorithm, 
        filter_type: FilterType, 
        filter_value_threshold: float, 
        compare_fn: Callable
    ) -> None:

        super(VIXFilter, self).__init__(algo, filter_type, filter_value_threshold, compare_fn)

    def get_value(self) -> Tuple[str, float]:
        if self._filter_type == FilterType.VVIX:
            value: float = self._algo.securities[self._signal_assets[0]].price
        elif self._filter_type == FilterType.INDEX_RANK:
            value: float = self._get_rank(self._signal_assets[0])
        elif self._filter_type == FilterType.INDEX_RATIO:
            value: float = self._algo.securities[self._signal_assets[0]].price / self._algo.securities[self._signal_assets[1]].price
        
        return self._filter_type.name, value

    def _get_rank(self, index: Symbol, lookback: int = 150) -> float:
        history: DataFrame = self._algo.history(
            CBOE, index, lookback, Resolution.DAILY
        )
        
        rank: float = ((self._algo.securities[index].price - min(history["low"])) / (max(history["high"]) - min(history["low"])))
        return rank

    def _subscribe_assets(self) -> List[Symbol]:
        signal_assets: List[Symbol] = []

        if self._filter_type == FilterType.VVIX:
            iv: Symbol = self._algo.add_index("VVIX", Resolution.MINUTE).symbol
            signal_assets = [iv]
        elif self._filter_type == FilterType.INDEX_RANK:
            iv: Symbol = self._algo.add_index("VIX", Resolution.MINUTE).symbol
            signal_assets = [iv]
        elif self._filter_type == FilterType.INDEX_RATIO:
            iv: Symbol = self._algo.add_index("VIX", Resolution.MINUTE).symbol
            iv_3m: Symbol = self._algo.add_index("VIX3M", Resolution.MINUTE).symbol
            signal_assets = [iv, iv_3m]
        
        return signal_assets

    def trade_signal(self) -> bool:
        result: bool = False

        if self._filter_type == FilterType.VVIX:
            vvix: float = self._algo.securities[self._signal_assets[0]].price
            if self._compare_fn(vvix, self._filter_value_threshold):
                result = True
        elif self._filter_type == FilterType.INDEX_RANK:
            vix_rank: float = self._get_rank(self._signal_assets[0])
            if self._compare_fn(vix_rank, self._filter_value_threshold):
                result = True
        elif self._filter_type == FilterType.INDEX_RATIO:
            vix_ratio: float = self._algo.securities[self._signal_assets[0]].price / self._algo.securities[self._signal_assets[1]].price
            if self._compare_fn(vix_ratio, self._filter_value_threshold):
                result = True

        return result

class VSTOXXFilter(IndexFilter):
    def __init__(
        self,
        algo: QCAlgorithm, 
        filter_type: FilterType, 
        filter_value_threshold: float, 
        compare_fn: Callable
    ) -> None:

        super(VSTOXXFilter, self).__init__(algo, filter_type, filter_value_threshold, compare_fn)

        if self._filter_type == FilterType.VVIX:
            raise Exception(f'{self._filter_type.name} filter type not supported')

    def get_value(self) -> Tuple[str, float]:
        if self._filter_type == FilterType.VVIX:
            pass
        elif self._filter_type == FilterType.INDEX_RANK:
            value: float = self._get_rank(self._signal_assets[0])
        elif self._filter_type == FilterType.INDEX_RATIO:
            value: float = self._algo.securities[self._signal_assets[0]].price / self._algo.securities[self._signal_assets[1]].price
        
        return self._filter_type.name, value

    def _get_rank(self, index: Symbol, lookback: int = 150) -> float:
        history: DataFrame = self._algo.history(
            index, lookback, Resolution.DAILY
        )
        
        rank: float = ((self._algo.securities[index].price - min(history["close"])) / (max(history["close"]) - min(history["close"])))
        return rank

    def _subscribe_assets(self) -> List[Symbol]:
        signal_assets: List[Symbol] = []

        if self._filter_type == FilterType.VVIX:
            pass
        elif self._filter_type == FilterType.INDEX_RANK:
            index: Symbol = self._algo.add_data(VSTOXX, "VSTOXX", Resolution.DAILY).symbol
            signal_assets = [index]
        elif self._filter_type == FilterType.INDEX_RATIO:
            index: Symbol = self._algo.add_data(VSTOXX, "VSTOXX", Resolution.DAILY).symbol
            index_3m: Symbol = self._algo.add_data(VSTOXX, "VSTOXX3M", Resolution.DAILY).symbol
            signal_assets = [index, index_3m]
        
        return signal_assets

    def trade_signal(self) -> bool:
        result: bool = False

        if self._filter_type == FilterType.VVIX:
            pass
        elif self._filter_type == FilterType.INDEX_RANK:
            vix_rank: float = self._get_rank(self._signal_assets[0])
            if self._compare_fn(vix_rank, self._filter_value_threshold):
                result = True
        elif self._filter_type == FilterType.INDEX_RATIO:
            vix_ratio: float = self._algo.securities[self._signal_assets[0]].price / self._algo.securities[self._signal_assets[1]].price
            if self._compare_fn(vix_ratio, self._filter_value_threshold):
                result = True

        return result

class VSTOXX(PythonData):
    _last_update_date: Dict[str, datetime.date] = {}

    @staticmethod
    def get_last_update_date() -> Dict[str, datetime.date]:
       return VSTOXX._last_update_date

    def GetSource(self, config: SubscriptionDataConfig, date: datetime, is_live_mode: bool) -> SubscriptionDataSource:
        return SubscriptionDataSource("data.quantpedia.com/backtesting_data/index/{0}.csv".format(config.symbol.value), SubscriptionTransportMedium.REMOTE_FILE, FileFormat.CSV)

    def Reader(self, config: SubscriptionDataConfig, line: str, date: datetime, is_live_mode: bool) -> BaseData:
        data = VSTOXX()
        data.Symbol = config.Symbol
        
        if not line[0].isdigit(): return None
        split: List[str] = line.split(';')
        
        data.time = datetime.strptime(split[0], "%Y-%m-%d") + timedelta(days=1)
        data['close'] = float(split[1])
        data.value = float(split[1])

        # store last update date
        if config.symbol.value not in VSTOXX._last_update_date:
            VSTOXX._last_update_date[config.symbol.value] = datetime(1,1,1).date()

        if data.time.date() > VSTOXX._last_update_date[config.symbol.value]:
            VSTOXX._last_update_date[config.symbol.value] = data.Time.date()

        return data
# region imports
from AlgorithmImports import *
from index_filter import VIXFilter, FilterType, IndexFilter, VSTOXXFilter
from charts import Benchmark, VIXFilterValue
# endregion

class PortfolioWeighting(Enum):
    EW = 1
    INV_VOL = 2

class RiskOnOff(QCAlgorithm):

    # choose which strategy to trade
    _trade_TOM: bool = True
    _trade_risk_on_off: bool = True

    _before_close_min_offset: int = 5
    
    # TOM params
    _before_tom_d_offset: int = 5
    _after_tom_sell_days: List[int] = [4,5,6,9]

    # vix filter params for risk on/off strategy
    _filter_type: FilterType = FilterType.INDEX_RATIO
    _filter_threshold: float = 1.
    _filter_condition = staticmethod(lambda x, y: x < y) # meaning _vix_filter_type < _vix_filter_threshold => trade risk on symbols/s

    # ticker and direction (+1 or -1)
    # NOTE do not put the same symbols in here twice or more
    _risk_off: Dict[str, int] = {
        'IEF': 1,
    }

    # ticker and direction (+1 or -1) 
    # NOTE do not put the same symbols in here twice or more
    _risk_on: Dict[str, int] = {
        'SPY': +1,
        'IWM': +1,
    }

    # only relevant if _trade_risk_on_off is set to True
    _portfolio_weighting: int = PortfolioWeighting.EW
    _vol_period: int = 21 # only relevant with PortfolioWeighting.INV_VOL used
    
    _benchmark_ticker: str = 'SPY'

    def initialize(self) -> None:
        self.set_start_date(2000, 1, 1)
        self._init_cash: int = 100_000
        self.set_cash(self._init_cash)

        self._tickers: List[str] = list(
            set(list(self._risk_off.keys()) + list(self._risk_on.keys()))
        )

        for ticker in self._tickers:
            self.add_equity(ticker, Resolution.MINUTE)

        # self._vf: IndexFilter = VIXFilter(
        #     self, 
        #     self._filter_type,
        #     self._filter_threshold,
        #     RiskOnOff._filter_condition
        # )
        
        self._vf: IndexFilter = VSTOXXFilter(
            self, 
            self._filter_type,
            self._filter_threshold,
            RiskOnOff._filter_condition
        )

        if self._portfolio_weighting == PortfolioWeighting.INV_VOL:
            self._price_data: Dict[str, RollingWindow] = {
                ticker : RollingWindow[float](self._vol_period) for ticker in self._tickers
            }

        self._benchmark_symbol: Symbol = self.add_equity(self._benchmark_ticker, Resolution.MINUTE).symbol
        self._market: Symbol = self.add_equity('SPY', Resolution.MINUTE).symbol

        self._rebalance_flag: bool = False
        self.schedule.on(
            self.date_rules.every_day(self._market),
            self.time_rules.before_market_close(self._benchmark_symbol, self._before_close_min_offset),
            self._rebalance
        )

        self._TOM_is_traded: bool = False
        assert max(self._after_tom_sell_days) <= 10, 'maximum day offset is 10'
        assert all(np.diff(self._after_tom_sell_days) > 0), 'exit day series (self._after_tom_sell_days) should be ascending and number should be repeated'

        self._sell_day_count: float = len(self._after_tom_sell_days)
        self._quantity_portion_to_sell: int = 0

        self.schedule.on(
            self.date_rules.month_end(self._market, self._before_tom_d_offset - 1),
            self.time_rules.before_market_close(self._market, self._before_close_min_offset),
            self._open_trade
        )

        for d_offset in self._after_tom_sell_days:
            self.schedule.on(
                self.date_rules.month_start(self._market, d_offset - 1),
                self.time_rules.before_market_close(self._market, self._before_close_min_offset),
                self._close_trade
            )

        # charting
        Benchmark(self, self._benchmark_symbol, self._init_cash)
        VIXFilterValue(self, self._vf)

    def _open_trade(self) -> None:
        if not self._trade_TOM: return

        if self._trade_risk_on_off:
            if not self._vf.is_ready: return

        # open new trade
        if self.portfolio.invested:
            self.liquidate(tag='liquidate for TOM trade')
        
        self._quantity_portion_to_sell = 0

        q_to_trade: int = self.calculate_order_quantity(self._market, 1.)
        adjusted_q: int = (q_to_trade // self._sell_day_count) * self._sell_day_count   # adjust to nearest lower divisable quantity
        self.market_order(self._market, adjusted_q, tag='TOM_entry')
        self._TOM_is_traded = True
    
    def _close_trade(self) -> None:
        # close trade partially
        if self.portfolio[self._market].invested:
            self.market_order(
                self._market, 
                -self._quantity_portion_to_sell, 
                tag=f'portional exit weight: {1. / self._sell_day_count} of Q ({self._sell_day_count * self._quantity_portion_to_sell})'
            )
        
        # TOM full liquidation has been done
        if not self.portfolio[self._market].invested:
            self._TOM_is_traded = False

    def on_order_event(self, order_event: OrderEvent) -> None:
        order = self.transactions.get_order_by_id(order_event.order_id)
        if order_event.status == OrderStatus.FILLED:
            if 'TOM_entry' in order.tag:
                self._quantity_portion_to_sell = order.quantity / self._sell_day_count

    def _rebalance(self) -> None:
        self._rebalance_flag = True
    
    def on_data(self, slice: Slice) -> None:
        # daily rebalance
        if not self._rebalance_flag:
            return
        self._rebalance_flag = False

        if self._portfolio_weighting == PortfolioWeighting.INV_VOL:
            # store daily prices
            for ticker, price_data in self._price_data.items():
                if ticker in slice and slice[ticker]:
                    price_data.add(slice[ticker].value)

        if not self._trade_risk_on_off: return
        if self._TOM_is_traded: return

        # wait for index filter
        if not self._vf.is_ready: return
        
        # wait for std indicators
        if self._portfolio_weighting == PortfolioWeighting.INV_VOL:
            if not all(price_data.is_ready for price_data in self._price_data.values()):
                return

        # choose traded symbol/s
        traded_symbols: Dict[str, int] = self._risk_on if self._vf.trade_signal() else self._risk_off

        # weighting
        targets: List[PortfolioTarget] = []
        if self._portfolio_weighting == PortfolioWeighting.EW:
            targets = [
                PortfolioTarget(
                    self.symbol(ticker), (1. / len(traded_symbols)) * direction
                ) for ticker, direction in traded_symbols.items()
            ]
        elif self._portfolio_weighting == PortfolioWeighting.INV_VOL:
            inv_vol_sum: float = sum([1. / self._volatility(list(self._price_data[ticker])) for ticker in traded_symbols.keys()])
            targets = [
                PortfolioTarget(
                    self.symbol(ticker), ( (1. / self._volatility(list(self._price_data[ticker]))) / inv_vol_sum ) * direction
                ) for ticker, direction in traded_symbols.items()
            ]
        
        # execution
        self.set_holdings(targets, liquidate_existing_holdings=True, tag='Risk On Trade' if self._vf.trade_signal() else 'Risk Off Trade')

    def _volatility(self, daily_prices: List[float]) -> float:
        prices: np.ndarray = np.array(prices)
        returns: np.ndarray = prices[:-1] / prices[1:] - 1
        return np.std(returns)