| Overall Statistics |
|
Total Orders 7727 Average Win 0.90% Average Loss -0.46% Compounding Annual Return 9.873% Drawdown 32.800% Expectancy 0.101 Start Equity 100000 End Equity 502124.80 Net Profit 402.125% Sharpe Ratio 0.486 Sortino Ratio 0.598 Probabilistic Sharpe Ratio 2.200% Loss Rate 63% Win Rate 37% Profit-Loss Ratio 1.97 Alpha 0.062 Beta -0.066 Annual Standard Deviation 0.118 Annual Variance 0.014 Information Ratio -0.04 Tracking Error 0.211 Treynor Ratio -0.866 Total Fees $57814.95 Estimated Strategy Capacity $51000000.00 Lowest Capacity Asset SPY R735QTJ8XC9X Portfolio Turnover 311.76% Drawdown Recovery 1541 |
# ──────────────────────────────────────────────────────────────────────
# Beat the Market – Intraday Momentum Strategy for SPY
# ──────────────────────────────────────────────────────────────────────
# Based on the paper by Carlo Zarattini & Mohamed Gabriel
# (ConcretumGroup). Converted from the original Polygon.io / pandas
# vectorised back-test to QuantConnect Cloud LEAN.
#
# Strategy overview
# -----------------
# 1. Each minute, compute an intraday VWAP and track how far the
# close has moved from the day's open (|close/open − 1|).
# 2. For every "minute of the day" slot, maintain a 14-day rolling
# average of that move (sigma_open), lagged by one day.
# 3. Build dynamic upper / lower bands:
# UB = max(open, prev_close_adj) × (1 + band_mult × sigma)
# LB = min(open, prev_close_adj) × (1 − band_mult × sigma)
# 4. At every TRADE_FREQ-minute boundary, if close > UB AND
# close > VWAP → go long; if close < LB AND close < VWAP →
# go short; otherwise flatten.
# 5. Positions are sized via a daily vol-target model, capped at
# MAX_LEVERAGE. All positions are closed at the market close.
# ──────────────────────────────────────────────────────────────────────
from AlgorithmImports import *
from collections import deque, defaultdict
from datetime import timedelta
import numpy as np
import math
# ── Custom fee model matching the notebook ───────────────────────────
class CustomFeeModel(FeeModel):
"""$0.0035 per share, $0.35 minimum per order (IBKR-style)."""
def GetOrderFee(self, parameters):
qty = abs(parameters.Order.AbsoluteQuantity)
fee = max(0.35, 0.0035 * qty)
return OrderFee(CashAmount(fee, "USD"))
# ── Main algorithm ───────────────────────────────────────────────────
class BeatTheMarketMomentum(QCAlgorithm):
# ================================================================
# Initialisation
# ================================================================
def Initialize(self):
# Back-test window
self.SetStartDate(2007, 4, 1)
self.SetEndDate(2024, 5, 16)
self.SetCash(100_000)
# Equity subscription – raw prices so dividends are explicit
equity = self.AddEquity("SPY", Resolution.Minute)
equity.SetDataNormalizationMode(DataNormalizationMode.Raw)
equity.SetFeeModel(CustomFeeModel())
equity.SetFillModel(ImmediateFillModel())
equity.SetSlippageModel(NullSlippageModel())
equity.SetLeverage(5) # allow up to 4× + buffer
self.spy = equity.Symbol
self.SetBenchmark(self.spy)
# ── Strategy parameters (mirror the notebook) ────────────
self.BAND_MULT = 1
self.TRADE_FREQ = 30 # signal eval every N min
self.TARGET_VOL = 0.02 # daily vol target
self.MAX_LEVERAGE = 4
self.SIGMA_WINDOW = 14 # rolling window for sigma
self.SIGMA_MIN_PERIODS = 13 # min observations needed
# ── Daily state ──────────────────────────────────────────
self.current_date = None
self.today_open = None
self.prev_close_adj = None # previous close − dividend
self.daily_vol = float("nan")
self.shares = 0 # position size for the day
self.pending_dividend = 0.0
# ── Historical tracking ──────────────────────────────────
# End-of-day close prices (enough for 15-return vol window)
self.eod_closes = deque(maxlen=20)
# Per-minute-of-day history of |close / open − 1|
self.move_open_history = defaultdict(
lambda: deque(maxlen=self.SIGMA_WINDOW)
)
self.today_move_open = {} # minute → move_open today
# ── Intraday state ───────────────────────────────────────
self.cum_vol_hlc = 0.0 # VWAP numerator
self.cum_vol = 0.0 # VWAP denominator
self.current_exposure = 0 # −1, 0, or +1
self.last_close = None # updated every bar
self.EOD_BUFFER_MIN = 5 # flatten N min before close
# ── Diagnostic: track daily data for comparison ──────────
self.diag_day_start_aum = 0.0
self.diag_trades_today = 0
self.diag_log = [] # list of daily dicts
# NOTE: scheduled EOD event removed — it fires BEFORE bar data
# at the same time step, causing fills at stale (previous bar)
# prices. Liquidation is handled solely by the OnData eod_cutoff.
# ================================================================
# Per-bar processing
# ================================================================
def OnData(self, data: Slice):
# Always capture dividend events (may arrive before bar data)
if data.Dividends.ContainsKey(self.spy):
self.pending_dividend = float(
data.Dividends[self.spy].Distribution
)
if not data.Bars.ContainsKey(self.spy):
return
bar = data.Bars[self.spy]
trade_date = self.Time.date()
# ── New trading day ──────────────────────────────────────
if trade_date != self.current_date:
self._on_new_day(bar, trade_date)
# ── Minute-from-open (1-based, matching the notebook) ────
# QC minute bars are end-timestamped, so bar_start = Time − 1 min
bar_start = self.Time - timedelta(minutes=1)
mfo = (bar_start.hour * 60 + bar_start.minute) - 570 + 1 # 570 = 9h30
if mfo < 1 or mfo > 390:
return
# ── Update running VWAP ──────────────────────────────────
close_px = float(bar.Close)
hlc = (float(bar.High) + float(bar.Low) + close_px) / 3.0
vol = float(bar.Volume)
if vol > 0:
self.cum_vol_hlc += vol * hlc
self.cum_vol += vol
vwap = (self.cum_vol_hlc / self.cum_vol
if self.cum_vol > 0 else close_px)
# ── Track |close / open − 1| for sigma history ──────────
self.today_move_open[mfo] = abs(close_px / self.today_open - 1.0)
self.last_close = close_px
# ── Close out 5 min before market close ──────────────────
eod_cutoff = 390 - self.EOD_BUFFER_MIN + 1 # mfo = 386
if mfo >= eod_cutoff:
if self.Portfolio[self.spy].Quantity != 0:
self.Liquidate(self.spy, tag="EOD flatten")
self.current_exposure = 0
return
# ── Only evaluate signals at TRADE_FREQ boundaries ───────
if mfo % self.TRADE_FREQ != 0:
return
# ── Sigma for this specific minute-of-day slot ───────────
sigma = self._get_sigma(mfo)
if sigma is None:
return
# ── Dynamic bands ────────────────────────────────────────
upper_anchor = max(self.today_open, self.prev_close_adj)
lower_anchor = min(self.today_open, self.prev_close_adj)
ub = upper_anchor * (1.0 + self.BAND_MULT * sigma)
lb = lower_anchor * (1.0 - self.BAND_MULT * sigma)
# ── Signal ───────────────────────────────────────────────
if close_px > ub and close_px > vwap:
signal = 1
elif close_px < lb and close_px < vwap:
signal = -1
else:
signal = 0
# ── Per-bar debug on the first divergent day ─────────────
if str(trade_date) == "2026-01-26":
self.Log(f"SIG|mfo={mfo}|close={close_px:.4f}|vwap={vwap:.4f}"
f"|sigma={sigma:.8f}|ub={ub:.4f}|lb={lb:.4f}"
f"|signal={signal}|exp={self.current_exposure}")
# ── Execute ──────────────────────────────────────────────
direction = {1: "Long", -1: "Short", 0: "Flat"}[signal]
tag = f"{direction} | mfo={mfo} sigma={sigma:.5f}"
self._set_exposure(signal, tag=tag)
# ================================================================
# Day-level helpers
# ================================================================
def _on_new_day(self, first_bar: TradeBar, trade_date):
"""Flush yesterday's data and initialise the new day."""
# ── Diagnostic: log completed day ────────────────────────
if self.current_date is not None:
end_aum = float(self.Portfolio.TotalPortfolioValue)
day_pnl = end_aum - self.diag_day_start_aum
day_ret = day_pnl / self.diag_day_start_aum if self.diag_day_start_aum else 0
self.diag_log.append({
"date": str(self.current_date),
"open": self.today_open,
"prev_close": self.prev_close_adj,
"vol": self.daily_vol,
"shares": self.shares,
"trades": self.diag_trades_today,
"start_aum": round(self.diag_day_start_aum, 2),
"end_aum": round(end_aum, 2),
"pnl": round(day_pnl, 2),
"ret": round(day_ret, 6),
})
# 1. Push yesterday's per-minute move_open into history
if self.current_date is not None:
for minute, mo in self.today_move_open.items():
self.move_open_history[minute].append(mo)
# 2. Store yesterday's closing price
if self.last_close is not None:
self.eod_closes.append(self.last_close)
# 3. Rolling daily volatility (matches spy_ret.iloc[d-15:d-1])
self._compute_daily_vol()
# 4. Reference prices for today
self.current_date = trade_date
self.today_open = float(first_bar.Open)
prev_close = (
self.eod_closes[-1] if self.eod_closes else self.today_open
)
self.prev_close_adj = prev_close - self.pending_dividend
# 5. Position sizing – vol-target model
aum = float(self.Portfolio.TotalPortfolioValue)
if math.isnan(self.daily_vol) or self.daily_vol <= 0:
leverage = self.MAX_LEVERAGE
else:
leverage = min(self.TARGET_VOL / self.daily_vol,
self.MAX_LEVERAGE)
self.shares = round(aum / self.today_open * leverage)
# 6. Reset intraday accumulators
self.cum_vol_hlc = 0.0
self.cum_vol = 0.0
self.current_exposure = 0
self.today_move_open = {}
self.pending_dividend = 0.0
# 7. Diagnostic: record start-of-day state
self.diag_day_start_aum = float(self.Portfolio.TotalPortfolioValue)
self.diag_trades_today = 0
# ================================================================
# Indicator helpers
# ================================================================
def _compute_daily_vol(self):
"""Rolling std of 14 daily returns, excluding the most recent.
Matches the notebook's ``spy_ret.iloc[d-15:d-1].std(ddof=1)``.
Uses only fully-known past close prices so there is no
look-ahead.
"""
closes = list(self.eod_closes)
if len(closes) < 2:
self.daily_vol = float("nan")
return
rets = [closes[i] / closes[i - 1] - 1.0
for i in range(1, len(closes))]
if len(rets) >= 15:
self.daily_vol = float(np.std(rets[-15:-1], ddof=1))
else:
self.daily_vol = float("nan")
def _get_sigma(self, mfo: int):
"""Mean |close/open − 1| for this minute slot over the last
SIGMA_WINDOW days. Returns None when history is insufficient.
Mirrors the notebook's ``sigma_open`` which is the shift(1) of
a rolling(window=14, min_periods=13).mean() grouped by
minute_of_day. Because we only push yesterday's data into
``move_open_history`` at the start of today, querying the
deque now implicitly gives the lagged (shift-1) value.
"""
hist = self.move_open_history.get(mfo)
if hist is None or len(hist) < self.SIGMA_MIN_PERIODS:
return None
return float(np.mean(hist))
# ================================================================
# Execution helpers
# ================================================================
def _set_exposure(self, target: int, tag: str = ""):
"""Adjust the portfolio to the target exposure (−1, 0, +1).
In the notebook the exposure vector is shifted by one bar
before PnL. In QC's event-driven model a market order placed
on bar T fills at bar T's close, so PnL accrues from close[T]
onward – equivalent to the notebook's entry at close[T-1]
followed by PnL = close[T] − close[T-1].
"""
if target == self.current_exposure:
return
target_qty = target * self.shares
current_qty = int(self.Portfolio[self.spy].Quantity)
delta = target_qty - current_qty
if delta != 0:
self.MarketOrder(self.spy, delta, tag=tag)
self.diag_trades_today += 1
self.current_exposure = target
def _scheduled_eod(self):
"""Fallback liquidation via scheduled event before market close."""
if self.Portfolio[self.spy].Quantity != 0:
self.Liquidate(self.spy, tag="Scheduled EOD flatten")
self.diag_trades_today += 1
self.current_exposure = 0
def OnEndOfAlgorithm(self):
"""Log every day's diagnostic to the backtest log."""
if not self.diag_log:
return
header = "date|open|prev_close|vol|shares|trades|start_aum|end_aum|pnl|ret"
self.Log(f"DIAG|{header}")
for row in self.diag_log:
line = (f'{row["date"]}|{row["open"]:.2f}|{row["prev_close"]:.2f}|'
f'{row["vol"]:.6f}|{row["shares"]}|{row["trades"]}|'
f'{row["start_aum"]:.2f}|{row["end_aum"]:.2f}|'
f'{row["pnl"]:.2f}|{row["ret"]:.6f}')
self.Log(f"DIAG|{line}")
self.Log(f"DIAG|Total days: {len(self.diag_log)}")