Overall Statistics
Total Orders
9128
Average Win
0.61%
Average Loss
-0.32%
Compounding Annual Return
42.985%
Drawdown
25.200%
Expectancy
0.156
Start Equity
10000000
End Equity
85707877.8
Net Profit
757.079%
Sharpe Ratio
1.179
Sortino Ratio
1.837
Probabilistic Sharpe Ratio
67.463%
Loss Rate
61%
Win Rate
39%
Profit-Loss Ratio
1.93
Alpha
0.279
Beta
-0.064
Annual Standard Deviation
0.232
Annual Variance
0.054
Information Ratio
0.666
Tracking Error
0.296
Treynor Ratio
-4.292
Total Fees
$5197104.70
Estimated Strategy Capacity
$53000000.00
Lowest Capacity Asset
YM YTG30NVEFBQD
Portfolio Turnover
670.11%
# region imports
from AlgorithmImports import *
# endregion


class NoiseAreaBreakoutAlgorithm(QCAlgorithm):
    """Intraday noise-area breakout strategy on three index Futures.

    On every trading interval, each Future's price is compared with a "noise
    area" built around yesterday's close and today's open. A price above the
    area starts a long signal and a price below it starts a short signal. A
    long signal ends when the price drops below the larger of the upper bound
    and the VWAP (mirrored for shorts), and every signal is ended by the
    scheduled `_exit` call before the market closes. Each signal, including
    those generated during warm-up, feeds a `KellyCriterion` object whose
    weight is combined with a volatility ratio to size the live positions.
    """

    def initialize(self):
        """Set the backtest period and cash, add the Futures, and schedule the trading logic."""
        self.set_start_date(2019, 5, 1)
        self.set_end_date(2025, 5, 1)
        self.set_cash(10_000_000)
        # Set some parameters.
        self._trading_interval_length = timedelta(minutes=30)
        self._lookback = 14  # days
        self._max_exposure = 4  # 4 = 400%
        self._max_positions = 3
        # Add a universe of Futures contracts to trade.
        self._futures = []
        tickers = [
            Futures.Indices.SP_500_E_MINI,
            Futures.Indices.NASDAQ_100_E_MINI,
            Futures.Indices.DOW_30_E_MINI,
        ]
        for ticker in tickers:
            future = self.add_future(
                ticker,
                data_mapping_mode=DataMappingMode.LAST_TRADING_DAY,
                data_normalization_mode=DataNormalizationMode.BACKWARDS_PANAMA_CANAL,
                contract_depth_offset=0
            )
            future.set_filter(0, 180)
            # Attach the indicators and the per-day state directly to the Future object.
            future.vwap = self.vwap(future.symbol)
            # Standard deviation (over the lookback) of the 1-period daily rate of change.
            future.daily_volatility = IndicatorExtensions.of(StandardDeviation(self._lookback), self.roc(future.symbol, 1, Resolution.DAILY))
            # 252-period simple moving average of the daily volatility.
            future.mean_volatility = IndicatorExtensions.sma(future.daily_volatility, 252)
            # Maps (hour, minute) -> SMA of the absolute move from the open; filled lazily in _rebalance.
            future.avg_move_by_interval = {}
            future.yesterdays_close = None
            future.todays_open = None
            # Separate trade trackers for the long and the short side.
            future.long_kelly_criterion = KellyCriterion(1.25, 50)
            future.short_kelly_criterion = KellyCriterion(1.25, 50)
            self._futures.append(future)
            # Add Scheduled Events: record the close just after the market closes, record the
            # open just after it opens, and end any active signal just before it closes.
            # `future=future` binds the current loop value to each lambda.
            date_rule = self.date_rules.every_day(future.symbol)
            self.schedule.on(date_rule, self.time_rules.after_market_close(future.symbol, 1), lambda future=future: setattr(future, 'yesterdays_close', future.price))
            self.schedule.on(date_rule, self.time_rules.after_market_open(future.symbol, 1), lambda future=future: setattr(future, 'todays_open', future.open))
            self.schedule.on(date_rule, self.time_rules.before_market_close(future.symbol, 1), lambda future=future: self._exit(future))
        self.schedule.on(self.date_rules.every_day(), self.time_rules.every(self._trading_interval_length), self._rebalance)
        # Set a warm-up period to warm-up the indicators and Kelly Criterion.
        self.set_warm_up(timedelta(365*2))

    def _rebalance(self):
        """Update the breakout signals of every Future, then rebalance the portfolio.

        The signal update also runs during warm-up so the Kelly Criterion
        objects collect trade samples. Portfolio targets are only created and
        sent once warm-up is over.
        """
        t = self.time
        trading_interval = (t.hour, t.minute)
        for future in self._futures:
            # Wait until the market is open.
            if (not future.yesterdays_close or
                not future.todays_open or
                not future.exchange.hours.is_open(t, False) or
                not future.exchange.hours.is_open(t - self._trading_interval_length, False)):
                continue
            # Create an indicator for this time interval if it doesn't already exist.
            if trading_interval not in future.avg_move_by_interval:
                future.avg_move_by_interval[trading_interval] = SimpleMovingAverage(self._lookback)
            avg_move = future.avg_move_by_interval[trading_interval]
            # Update the average move indicator.
            move = abs(future.price / future.todays_open - 1)
            if not avg_move.update(t, move):
                continue
            # Wait until the volatility indicators are ready.
            if (not future.mean_volatility.is_ready or not future.daily_volatility.current.value):
                continue
            # Calculate the noise area.
            upper_bound = max(future.yesterdays_close, future.todays_open) * (1+avg_move.current.value)
            lower_bound = min(future.yesterdays_close, future.todays_open) * (1-avg_move.current.value)
            contract = self.securities[future.mapped]
            long_kc = future.long_kelly_criterion
            short_kc = future.short_kelly_criterion
            # Scan for a long entry.
            if not long_kc.signal and future.price > upper_bound:
                # Close the opposite signal first so both signals are never active at once.
                # This is keyed on the signal rather than on the holdings because a signal
                # can be active without a position (during warm-up, or when an entry was
                # skipped); leaving it active would flip the sign of the entry weight below.
                if short_kc.signal:
                    short_kc.update_signal(0, -contract.ask_price)
                long_kc.update_signal(1, contract.ask_price)
            # Scan for a short entry.
            elif not short_kc.signal and future.price < lower_bound:
                if long_kc.signal:
                    long_kc.update_signal(0, contract.bid_price)
                # Short trades are recorded with negated prices so that exit - entry is the profit.
                short_kc.update_signal(1, -contract.bid_price)
            # Scan for a long exit.
            elif long_kc.signal and future.price < max(upper_bound, future.vwap.current.value):
                long_kc.update_signal(0, contract.bid_price)
            # Scan for a short exit.
            elif short_kc.signal and future.price > min(lower_bound, future.vwap.current.value):
                short_kc.update_signal(0, -contract.ask_price)

        # Wait until the warm-up period is over.
        if self.is_warming_up:
            return
        # Create portfolio targets.
        entry_targets = []
        exit_targets = []
        holds = 0
        for future in self._futures:
            contract = self.securities[future.mapped]
            long_kc = future.long_kelly_criterion
            short_kc = future.short_kelly_criterion
            # Wait until the KC objects are ready.
            if not (long_kc.is_ready and short_kc.is_ready):
                continue
            # If the signals have ended, liquidate the position.
            if not long_kc.signal and not short_kc.signal and contract.invested:
                exit_targets.append(PortfolioTarget(contract.symbol, 0))
            # A signal is active but the position doesn't match its direction yet.
            elif (long_kc.signal and not contract.holdings.is_long) or (short_kc.signal and not contract.holdings.is_short):
                # Calculate the weight: the Kelly weight of the active side, negative for shorts.
                active_kc = long_kc if long_kc.signal else short_kc
                direction = -1 if short_kc.signal else 1
                kc = active_kc.weight() * direction
                # Scale by the mean/current volatility ratio (capped at the maximum exposure and
                # normalized by it), clip the Kelly weight to [-1, 1], and split the result
                # evenly across the maximum number of positions.
                weight = (
                    min(self._max_exposure, future.mean_volatility.current.value/future.daily_volatility.current.value)
                    / self._max_exposure
                    * max(-1, min(1, kc))
                    / self._max_positions
                )
                if not weight:
                    continue
                # Create the target.
                target = PortfolioTarget(contract.symbol, weight)
                if not contract.invested:
                    entry_targets.append(target)
                else:
                    # When flipping from long to short (or vice versa), insert at the beginning of the list to ensure the position changes.
                    entry_targets.insert(0, target)
            # A signal is active and the matching position is already open.
            elif (long_kc.signal or short_kc.signal) and contract.invested:
                holds += 1
        # Place trades to rebalance the portfolio without exceeding the maximum number of positions.
        # Clamp at 0 so a negative count can never turn into a slice from the end of the list.
        free_slots = max(0, self._max_positions - holds)
        self.set_holdings(entry_targets[:free_slots] + exit_targets)

    def _exit(self, future):
        """End any active signal of `future` and, outside warm-up, liquidate its mapped contract.

        Scheduled in `initialize` to run shortly before the market closes.
        """
        contract = self.securities[future.mapped]
        if future.long_kelly_criterion.signal:
            future.long_kelly_criterion.update_signal(0, contract.bid_price)
        if future.short_kelly_criterion.signal:
            future.short_kelly_criterion.update_signal(0, -contract.ask_price)
        if not self.is_warming_up:
            self.liquidate(future.mapped)


class KellyCriterion:
    """Track the results of a signal's most recent trades and derive a Kelly weight.

    A trade is recorded as `exit price - entry price`, so a positive value is
    a win. Callers record short trades by negating both prices. Only the last
    `period` trades are kept.
    """

    def __init__(self, factor, period):
        """Create an empty tracker.

        Args:
            factor: Multiplier applied to the Kelly % returned by `weight`.
            period: Number of most recent trades used to calculate the weight.
        """
        self.signal = 0
        self._factor = factor
        self._period = period
        self._trades = np.array([])
        # Price of the most recent entry; None until the first entry is recorded.
        self._entry_price = None

    def update_signal(self, signal, price):
        """Set the signal and record the entry price or the closed trade.

        Args:
            signal: Truthy to enter a trade, falsy to exit it.
            price: Entry or exit price (negated by the caller for short trades).
        """
        if signal:  # Enter
            self._entry_price = price
        elif self._entry_price is not None:  # Exit
            # Keep only the `period` most recent trades.
            self._trades = np.append(self._trades, [price - self._entry_price])[-self._period:]
        # An exit without any previous entry has no result to record, so only the signal changes.
        self.signal = signal

    def weight(self):
        """Return `factor` times the Kelly %, or None until `period` trades are recorded.

        The Kelly % is `p - (1 - p) / R`, floored at 0, where `p` is the
        fraction of winning trades and `R` is the average win divided by the
        magnitude of the average loss.
        """
        # Wait until there are enough trade samples.
        if not self.is_ready:
            return None
        # Calculate the Kelly %.
        wins = self._trades[self._trades > 0]
        losses = self._trades[self._trades < 0]
        if not losses.size:
            return self._factor
        if not wins.size:
            return 0
        # The mean loss is negative; use its magnitude so the ratio is positive.
        # A negative ratio would add the loss term to the weight instead of subtracting it.
        win_loss_ratio = wins.mean() / abs(losses.mean())
        winning_probability = len(wins) / self._period
        return self._factor * max(0, winning_probability - (1 - winning_probability) / win_loss_ratio)

    @property
    def is_ready(self):
        """Whether `period` trades have been recorded."""
        return len(self._trades) == self._period