Overall Statistics
Total Orders
7727
Average Win
0.90%
Average Loss
-0.46%
Compounding Annual Return
9.873%
Drawdown
32.800%
Expectancy
0.101
Start Equity
100000
End Equity
502124.80
Net Profit
402.125%
Sharpe Ratio
0.486
Sortino Ratio
0.598
Probabilistic Sharpe Ratio
2.200%
Loss Rate
63%
Win Rate
37%
Profit-Loss Ratio
1.97
Alpha
0.062
Beta
-0.066
Annual Standard Deviation
0.118
Annual Variance
0.014
Information Ratio
-0.04
Tracking Error
0.211
Treynor Ratio
-0.866
Total Fees
$57814.95
Estimated Strategy Capacity
$51000000.00
Lowest Capacity Asset
SPY R735QTJ8XC9X
Portfolio Turnover
311.76%
Drawdown Recovery
1541
# ──────────────────────────────────────────────────────────────────────
#  Beat the Market – Intraday Momentum Strategy for SPY
# ──────────────────────────────────────────────────────────────────────
#  Based on the paper by Carlo Zarattini & Mohamed Gabriel
#  (ConcretumGroup).  Converted from the original Polygon.io / pandas
#  vectorised back-test to QuantConnect Cloud LEAN.
#
#  Strategy overview
#  -----------------
#  1. Each minute, compute an intraday VWAP and track how far the
#     close has moved from the day's open (|close/open − 1|).
#  2. For every "minute of the day" slot, maintain a 14-day rolling
#     average of that move (sigma_open), lagged by one day.
#  3. Build dynamic upper / lower bands:
#        UB = max(open, prev_close_adj) × (1 + band_mult × sigma)
#        LB = min(open, prev_close_adj) × (1 − band_mult × sigma)
#  4. At every TRADE_FREQ-minute boundary, if close > UB AND
#     close > VWAP → go long; if close < LB AND close < VWAP →
#     go short; otherwise flatten.
#  5. Positions are sized via a daily vol-target model, capped at
#     MAX_LEVERAGE.  All positions are flattened EOD_BUFFER_MIN
#     minutes before the market close.
# ──────────────────────────────────────────────────────────────────────

from AlgorithmImports import *
from collections import deque, defaultdict
from datetime import timedelta
import numpy as np
import math


# ── Custom fee model matching the notebook ───────────────────────────
class CustomFeeModel(FeeModel):
    """Per-share commission: $0.0035 a share with a $0.35 floor per order."""

    def GetOrderFee(self, parameters):
        """Return the USD fee for the order carried by ``parameters``.

        The fee is 0.0035 times the order's absolute quantity, raised to
        0.35 whenever the per-share amount would fall below that floor.
        """
        share_count = abs(parameters.Order.AbsoluteQuantity)
        per_share_total = 0.0035 * share_count
        # Apply the per-order minimum.
        charged = per_share_total if per_share_total > 0.35 else 0.35
        return OrderFee(CashAmount(charged, "USD"))


# ── Main algorithm ───────────────────────────────────────────────────
class BeatTheMarketMomentum(QCAlgorithm):

    # ================================================================
    #  Initialisation
    # ================================================================

    def Initialize(self):
        """Configure the backtest, the SPY subscription and all strategy state."""
        # Backtest window and starting capital
        self.SetStartDate(2007, 4, 1)
        self.SetEndDate(2024, 5, 16)
        self.SetCash(100_000)

        # SPY at minute resolution.  Prices are left raw (unadjusted) so that
        # dividends show up as explicit events instead of being baked in.
        spy_security = self.AddEquity("SPY", Resolution.Minute)
        self.spy = spy_security.Symbol
        spy_security.SetDataNormalizationMode(DataNormalizationMode.Raw)
        spy_security.SetLeverage(5)              # headroom above MAX_LEVERAGE
        spy_security.SetSlippageModel(NullSlippageModel())
        spy_security.SetFillModel(ImmediateFillModel())
        spy_security.SetFeeModel(CustomFeeModel())
        self.SetBenchmark(self.spy)

        # Strategy parameters (kept in line with the notebook)
        self.BAND_MULT = 1                       # band width in sigmas
        self.TRADE_FREQ = 30                     # minutes between signal checks
        self.TARGET_VOL = 0.02                   # daily volatility target
        self.MAX_LEVERAGE = 4                    # cap on the sizing leverage
        self.SIGMA_WINDOW = 14                   # days kept per minute slot
        self.SIGMA_MIN_PERIODS = 13              # observations needed for sigma
        self.EOD_BUFFER_MIN = 5                  # flatten this many min before close

        # State that is refreshed once per session
        self.current_date = self.today_open = self.prev_close_adj = None
        self.daily_vol = float("nan")
        self.shares = 0                          # position size for the day
        self.pending_dividend = 0.0              # subtracted from the prior close

        # Rolling history: end-of-day closes (room for the 15-return window)
        # and, per minute-of-day slot, past values of |close / open − 1|.
        self.eod_closes = deque(maxlen=20)
        self.move_open_history = defaultdict(
            lambda: deque(maxlen=self.SIGMA_WINDOW)
        )
        self.today_move_open = {}                # slot → today's move from open

        # Intraday accumulators
        self.cum_vol_hlc = self.cum_vol = 0.0    # VWAP numerator / denominator
        self.current_exposure = 0                # −1, 0 or +1
        self.last_close = None                   # close of the latest bar seen

        # Per-day diagnostics, dumped to the log when the algorithm ends
        self.diag_day_start_aum = 0.0
        self.diag_trades_today = 0
        self.diag_log = []                       # one dict per completed day

        # NOTE: no scheduled end-of-day event is registered.  Such an event
        # fires BEFORE the bar data of the same time step, which caused fills
        # at stale (previous-bar) prices, so liquidation is driven only by the
        # end-of-day cutoff inside OnData.

    # ================================================================
    #  Per-bar processing
    # ================================================================

    def OnData(self, data: Slice):
        # Always capture dividend events (may arrive before bar data)
        if data.Dividends.ContainsKey(self.spy):
            self.pending_dividend = float(
                data.Dividends[self.spy].Distribution
            )

        if not data.Bars.ContainsKey(self.spy):
            return

        bar = data.Bars[self.spy]
        trade_date = self.Time.date()

        # ── New trading day ──────────────────────────────────────
        if trade_date != self.current_date:
            self._on_new_day(bar, trade_date)

        # ── Minute-from-open (1-based, matching the notebook) ────
        #    QC minute bars are end-timestamped, so bar_start = Time − 1 min
        bar_start = self.Time - timedelta(minutes=1)
        mfo = (bar_start.hour * 60 + bar_start.minute) - 570 + 1   # 570 = 9h30
        if mfo < 1 or mfo > 390:
            return

        # ── Update running VWAP ──────────────────────────────────
        close_px = float(bar.Close)
        hlc = (float(bar.High) + float(bar.Low) + close_px) / 3.0
        vol = float(bar.Volume)
        if vol > 0:
            self.cum_vol_hlc += vol * hlc
            self.cum_vol     += vol
        vwap = (self.cum_vol_hlc / self.cum_vol
                if self.cum_vol > 0 else close_px)

        # ── Track |close / open − 1| for sigma history ──────────
        self.today_move_open[mfo] = abs(close_px / self.today_open - 1.0)
        self.last_close = close_px

        # ── Close out 5 min before market close ──────────────────
        eod_cutoff = 390 - self.EOD_BUFFER_MIN + 1          # mfo = 386
        if mfo >= eod_cutoff:
            if self.Portfolio[self.spy].Quantity != 0:
                self.Liquidate(self.spy, tag="EOD flatten")
            self.current_exposure = 0
            return

        # ── Only evaluate signals at TRADE_FREQ boundaries ───────
        if mfo % self.TRADE_FREQ != 0:
            return

        # ── Sigma for this specific minute-of-day slot ───────────
        sigma = self._get_sigma(mfo)
        if sigma is None:
            return

        # ── Dynamic bands ────────────────────────────────────────
        upper_anchor = max(self.today_open, self.prev_close_adj)
        lower_anchor = min(self.today_open, self.prev_close_adj)
        ub = upper_anchor * (1.0 + self.BAND_MULT * sigma)
        lb = lower_anchor * (1.0 - self.BAND_MULT * sigma)

        # ── Signal ───────────────────────────────────────────────
        if close_px > ub and close_px > vwap:
            signal = 1
        elif close_px < lb and close_px < vwap:
            signal = -1
        else:
            signal = 0

        # ── Per-bar debug on the first divergent day ─────────────
        if str(trade_date) == "2026-01-26":
            self.Log(f"SIG|mfo={mfo}|close={close_px:.4f}|vwap={vwap:.4f}"
                     f"|sigma={sigma:.8f}|ub={ub:.4f}|lb={lb:.4f}"
                     f"|signal={signal}|exp={self.current_exposure}")

        # ── Execute ──────────────────────────────────────────────
        direction = {1: "Long", -1: "Short", 0: "Flat"}[signal]
        tag = f"{direction} | mfo={mfo} sigma={sigma:.5f}"
        self._set_exposure(signal, tag=tag)

    # ================================================================
    #  Day-level helpers
    # ================================================================

    def _on_new_day(self, first_bar: TradeBar, trade_date):
        """Flush yesterday's data and initialise the new day."""

        # ── Diagnostic: log completed day ────────────────────────
        if self.current_date is not None:
            end_aum = float(self.Portfolio.TotalPortfolioValue)
            day_pnl = end_aum - self.diag_day_start_aum
            day_ret = day_pnl / self.diag_day_start_aum if self.diag_day_start_aum else 0
            self.diag_log.append({
                "date":       str(self.current_date),
                "open":       self.today_open,
                "prev_close": self.prev_close_adj,
                "vol":        self.daily_vol,
                "shares":     self.shares,
                "trades":     self.diag_trades_today,
                "start_aum":  round(self.diag_day_start_aum, 2),
                "end_aum":    round(end_aum, 2),
                "pnl":        round(day_pnl, 2),
                "ret":        round(day_ret, 6),
            })

        # 1.  Push yesterday's per-minute move_open into history
        if self.current_date is not None:
            for minute, mo in self.today_move_open.items():
                self.move_open_history[minute].append(mo)

        # 2.  Store yesterday's closing price
        if self.last_close is not None:
            self.eod_closes.append(self.last_close)

        # 3.  Rolling daily volatility  (matches spy_ret.iloc[d-15:d-1])
        self._compute_daily_vol()

        # 4.  Reference prices for today
        self.current_date = trade_date
        self.today_open   = float(first_bar.Open)
        prev_close = (
            self.eod_closes[-1] if self.eod_closes else self.today_open
        )
        self.prev_close_adj = prev_close - self.pending_dividend

        # 5.  Position sizing – vol-target model
        aum = float(self.Portfolio.TotalPortfolioValue)
        if math.isnan(self.daily_vol) or self.daily_vol <= 0:
            leverage = self.MAX_LEVERAGE
        else:
            leverage = min(self.TARGET_VOL / self.daily_vol,
                           self.MAX_LEVERAGE)
        self.shares = round(aum / self.today_open * leverage)

        # 6.  Reset intraday accumulators
        self.cum_vol_hlc     = 0.0
        self.cum_vol         = 0.0
        self.current_exposure = 0
        self.today_move_open  = {}
        self.pending_dividend = 0.0

        # 7.  Diagnostic: record start-of-day state
        self.diag_day_start_aum = float(self.Portfolio.TotalPortfolioValue)
        self.diag_trades_today  = 0

    # ================================================================
    #  Indicator helpers
    # ================================================================

    def _compute_daily_vol(self):
        """Rolling std of 14 daily returns, excluding the most recent.

        Matches the notebook's ``spy_ret.iloc[d-15:d-1].std(ddof=1)``.
        Uses only fully-known past close prices so there is no
        look-ahead.
        """
        closes = list(self.eod_closes)
        if len(closes) < 2:
            self.daily_vol = float("nan")
            return

        rets = [closes[i] / closes[i - 1] - 1.0
                for i in range(1, len(closes))]

        if len(rets) >= 15:
            self.daily_vol = float(np.std(rets[-15:-1], ddof=1))
        else:
            self.daily_vol = float("nan")

    def _get_sigma(self, mfo: int):
        """Mean |close/open − 1| for this minute slot over the last
        SIGMA_WINDOW days.  Returns None when history is insufficient.

        Mirrors the notebook's ``sigma_open`` which is the shift(1) of
        a rolling(window=14, min_periods=13).mean() grouped by
        minute_of_day.  Because we only push yesterday's data into
        ``move_open_history`` at the start of today, querying the
        deque now implicitly gives the lagged (shift-1) value.
        """
        hist = self.move_open_history.get(mfo)
        if hist is None or len(hist) < self.SIGMA_MIN_PERIODS:
            return None
        return float(np.mean(hist))

    # ================================================================
    #  Execution helpers
    # ================================================================

    def _set_exposure(self, target: int, tag: str = ""):
        """Adjust the portfolio to the target exposure (−1, 0, +1).

        In the notebook the exposure vector is shifted by one bar
        before PnL.  In QC's event-driven model a market order placed
        on bar T fills at bar T's close, so PnL accrues from close[T]
        onward – equivalent to the notebook's entry at close[T-1]
        followed by PnL = close[T] − close[T-1].
        """
        if target == self.current_exposure:
            return

        target_qty  = target * self.shares
        current_qty = int(self.Portfolio[self.spy].Quantity)
        delta       = target_qty - current_qty

        if delta != 0:
            self.MarketOrder(self.spy, delta, tag=tag)
            self.diag_trades_today += 1
        self.current_exposure = target

    def _scheduled_eod(self):
        """Fallback liquidation via scheduled event before market close."""
        if self.Portfolio[self.spy].Quantity != 0:
            self.Liquidate(self.spy, tag="Scheduled EOD flatten")
            self.diag_trades_today += 1
        self.current_exposure = 0

    def OnEndOfAlgorithm(self):
        """Log every day's diagnostic to the backtest log."""
        if not self.diag_log:
            return
        header = "date|open|prev_close|vol|shares|trades|start_aum|end_aum|pnl|ret"
        self.Log(f"DIAG|{header}")
        for row in self.diag_log:
            line = (f'{row["date"]}|{row["open"]:.2f}|{row["prev_close"]:.2f}|'
                    f'{row["vol"]:.6f}|{row["shares"]}|{row["trades"]}|'
                    f'{row["start_aum"]:.2f}|{row["end_aum"]:.2f}|'
                    f'{row["pnl"]:.2f}|{row["ret"]:.6f}')
            self.Log(f"DIAG|{line}")
        self.Log(f"DIAG|Total days: {len(self.diag_log)}")