Overall Statistics
Total Orders
30712
Average Win
0.17%
Average Loss
-0.20%
Compounding Annual Return
15.626%
Drawdown
22.200%
Expectancy
0.132
Start Equity
1000000
End Equity
53782447.89
Net Profit
5278.245%
Sharpe Ratio
0.769
Sortino Ratio
0.832
Probabilistic Sharpe Ratio
32.112%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
0.82
Alpha
0.065
Beta
0.495
Annual Standard Deviation
0.115
Annual Variance
0.013
Information Ratio
0.351
Tracking Error
0.116
Treynor Ratio
0.179
Total Fees
$5055112.34
Estimated Strategy Capacity
$46000000.00
Lowest Capacity Asset
SLB R735QTJ8XC9X
Portfolio Turnover
27.02%
Drawdown Recovery
788
from collections import deque
from typing import Deque, Dict, List, Optional, Set, Tuple

from AlgorithmImports import *


# ============================================================================
#  Per-symbol intraday state
# ============================================================================

class SymbolData:
    """
    Running intraday bar + completed daily OHLC history for one equity.
    Created only for top-50 momentum stocks (and kept while position is open).

    Two data sources feed this object:
      - update_minute()  : called from on_data every minute bar
      - on_daily_bar()   : called by a daily TradeBarConsolidator
    """

    def __init__(self, adx_period: int, hist_size: int) -> None:
        self._adx_period    = adx_period
        self.day_h          : Optional[float] = None
        self.day_l          : Optional[float] = None
        self.day_c          : Optional[float] = None
        self.day_date       = None
        self.yesterday_high : Optional[float] = None
        # Completed daily (H, L, C) tuples for ADX calculation
        self._daily         : Deque[Tuple[float, float, float]] = deque(maxlen=hist_size)
        # Momentum score from last monthly rebalance (used for tie-breaking)
        self.momentum_score : float = 0.0

    def update_minute(self, bar: TradeBar) -> None:
        """Maintain running intraday OHLC from 1-min bars."""
        d = bar.time.date()
        if d != self.day_date:
            self.day_h    = bar.high
            self.day_l    = bar.low
            self.day_c    = bar.close
            self.day_date = d
        else:
            self.day_h = max(self.day_h, bar.high)
            self.day_l = min(self.day_l, bar.low)
            self.day_c = bar.close

    def on_daily_bar(self, bar: TradeBar) -> None:
        """Called by daily consolidator when a completed day is emitted."""
        self.yesterday_high = bar.high
        self._daily.append((bar.high, bar.low, bar.close))

    @property
    def ready(self) -> bool:
        """True once the first minute bar of the current session has arrived."""
        return self.day_h is not None

    def ibs(self) -> float:
        """Internal Bar Strength on the running (unfinished) daily bar."""
        rng = self.day_h - self.day_l
        return 0.5 if rng == 0 else (self.day_c - self.day_l) / rng

    def adx_value(self) -> Optional[float]:
        """
        Wilder ADX on completed daily bars + today's running bar injected last.
        Returns None until enough history is available.
        """
        completed = list(self._daily)
        if len(completed) < self._adx_period * 2:
            return None
        all_bars = completed + [(self.day_h, self.day_l, self.day_c)]
        return Top50IBSMomentumAlgorithm._wilder_adx(all_bars, self._adx_period)


# ============================================================================
#  Main algorithm
# ============================================================================

class Top50IBSMomentumAlgorithm(QCAlgorithm):
    """
    Monthly TOP-50 Momentum Universe + IBS/ADX Intraday Mean-Reversion
    ====================================================================

    Universe
    --------
    Coarse: top UNIVERSE_SIZE equities by dollar volume (minute resolution).
    Monthly rank: (12M + 6M) / 2 combined score, both filters positive.
    Active trading universe = top MOMENTUM_TAKE_N.

    Entry  (15:55 ET)
    -----------------
    IBS < IBS_ENTRY_THR  AND  ADX(ADX_PERIOD) > ADX_ENTRY_THR.
    Maximum MAX_POSITIONS concurrent open positions.
    When available slots < signals on same day:
      -> highest momentum score wins (6M+12M score from last rebalance).

    Exit   (15:55 ET)
    -----------------
    IBS > IBS_EXIT_THR  OR  Close > Yesterday's High.
    Stocks that fall out of top-50 are NOT force-closed;
    they exit naturally via the IBS/high conditions above.

    Sizing
    ------
    POSITION_SIZE per trade (unleveraged equity shares, no CFD mechanics).

    Index
    -----
    'Traded Index' mirrors the live portfolio return (base = 100).
    Plotted daily alongside SPY rebased to 100.

    Performance note
    ----------------
    UNIVERSE_SIZE = 200 gives a reasonable balance of universe breadth vs
    backtest speed (minute data for ~200 stocks).  Increase to 300-500 for
    broader coverage; expect proportionally longer backtest runtimes.
    """

    # ── Universe / momentum ──────────────────────────────────────────────────
    UNIVERSE_SIZE      : int  = 500    # top N by dollar volume; all get minute data
    MOMENTUM_TAKE_N    : int  = 200
    LOOKBACK_12M       : int  = 252
    LOOKBACK_6M        : int  = 126
    USE_12M_FILTER     : bool = True
    USE_6M_FILTER      : bool = True
    USE_SPY_FILTER     : bool = False
    SPY_MOMENTUM_DAYS  : int  = 200    # SPY 12M return must be > 0 to allow entries

    # ── IBS / ADX ────────────────────────────────────────────────────────────
    IBS_ENTRY_THR  : float = 0.10
    IBS_EXIT_THR   : float = 0.80
    ADX_PERIOD     : int   = 10
    ADX_ENTRY_THR  : float = 20.0
    ADX_HIST_MULT  : int   = 5        # daily OHLC kept = ADX_PERIOD * ADX_HIST_MULT bars

    # ── Position sizing ──────────────────────────────────────────────────────
    MAX_POSITIONS  : int   = 10
    POSITION_SIZE  : float = 0.10     # X % per position, unleveraged equity shares

    # ============================================================================

    def initialize(self) -> None:
        self.set_start_date(1999, 1, 1)
        self.set_cash(1_000_000)
        self.settings.seed_initial_prices = True
        self.settings.minimum_order_margin_portfolio_percentage = 0

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
            AccountType.CASH,
        )
        self.set_risk_free_interest_rate_model(InterestRateProvider())

        # ── Universe: minute data for all selected stocks ────────────────────
        # on_data fires every minute; daily consolidators aggregate internally.
        self.universe_settings.resolution = Resolution.MINUTE
        self.add_universe(self._coarse_selection)

        # ── SPY: benchmark + 252-day regime filter ───────────────────────────
        self._spy = self.add_equity("SPY", Resolution.MINUTE).symbol
        self.set_benchmark("SPY")

        self._spy_closes: Deque[float] = deque(maxlen=self.SPY_MOMENTUM_DAYS + 5)
        spy_con = TradeBarConsolidator(timedelta(days=1))
        spy_con.data_consolidated += self._on_spy_daily
        self.subscription_manager.add_consolidator(self._spy, spy_con)

        # ── State ────────────────────────────────────────────────────────────
        self._selected_symbols : Set[Symbol]                = set()
        self._price_history    : Dict[Symbol, Deque[float]] = {}
        self._top50_symbols    : Set[Symbol]                = set()
        self._top50_sorted     : List[Tuple[Symbol, float]] = []
        self._symbol_data      : Dict[Symbol, SymbolData]   = {}
        self._max_lookback     : int = self.LOOKBACK_12M + 2

        # ── Warm-up: DAILY resolution replay for 12M momentum + ADX history ──
        # Consolidators fire during warm-up and populate _price_history /
        # _spy_closes before the first live rebalance.
        self.set_warm_up(self._max_lookback + 10, Resolution.DAILY)

        # ── Schedules ────────────────────────────────────────────────────────
        self.schedule.on(
            self.date_rules.month_start("SPY"),
            self.time_rules.after_market_open("SPY", 1),
            self._rebalance,
        )
        self.schedule.on(
            self.date_rules.every_day("SPY"),
            self.time_rules.before_market_close("SPY", 5),
            self._check_signals,
        )

        # ── Custom index ─────────────────────────────────────────────────────
        self._traded_index_value : float               = 100.0
        self._index_prev_prices  : Dict[Symbol, float] = {}
        self._index_today_prices : Dict[Symbol, float] = {}
        self._spy_index_value    : float               = 100.0
        self._spy_index_prev     : Optional[float]     = None
        self._spy_today_price    : Optional[float]     = None

        chart = Chart("Strategy Index vs SPY")
        chart.add_series(Series("Traded Index",  SeriesType.LINE, "$"))
        chart.add_series(Series("SPY (rebased)", SeriesType.LINE, "$"))
        self.add_chart(chart)

    # ============================================================================
    #  UNIVERSE
    # ============================================================================

    def _coarse_selection(self, coarse) -> List:
        filtered = [
            x for x in coarse
            if x.has_fundamental_data
            and x.price > 5
            and x.dollar_volume > 1_000_000
        ]
        top = sorted(filtered, key=lambda x: x.dollar_volume, reverse=True)[:self.UNIVERSE_SIZE]
        symbols = [x.symbol for x in top]
        self._selected_symbols = set(symbols)
        return symbols

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        """
        For each new universe security, register a daily consolidator that
        appends the closing price to _price_history (used for momentum scoring).
        One consolidator per symbol; duplicate-add guarded by the _price_history
        presence check.
        """
        for sec in changes.added_securities:
            sym = sec.symbol
            if sym == self._spy:
                continue
            if sym not in self._price_history:
                self._price_history[sym] = deque(maxlen=self._max_lookback)
                con = TradeBarConsolidator(timedelta(days=1))
                def _make_ph_handler(s: Symbol):
                    def _h(_, bar: TradeBar) -> None:
                        ph = self._price_history.get(s)
                        if ph is not None:
                            ph.append(bar.close)
                    return _h
                con.data_consolidated += _make_ph_handler(sym)
                self.subscription_manager.add_consolidator(sym, con)

        for sec in changes.removed_securities:
            self._selected_symbols.discard(sec.symbol)

    # ============================================================================
    #  SPY REGIME FILTER
    # ============================================================================

    def _on_spy_daily(self, sender, bar: TradeBar) -> None:
        self._spy_closes.append(bar.close)

    def _spy_allows_trading(self) -> bool:
        if not self.USE_SPY_FILTER:
            return True
        if len(self._spy_closes) < self.SPY_MOMENTUM_DAYS + 1:
            return True   # insufficient history -> allow (conservative fallback)
        return self._spy_closes[-1] > self._spy_closes[-(self.SPY_MOMENTUM_DAYS + 1)]

    # ============================================================================
    #  ON DATA  (called every minute)
    # ============================================================================

    def on_data(self, data: Slice) -> None:
        if self.is_warming_up:
            return

        # Update running intraday bars for tracked symbols (top-50 + open positions)
        for sym, sd in self._symbol_data.items():
            if sym in data.bars:
                sd.update_minute(data.bars[sym])

        # SPY price tracking (daily close, updated every minute)
        if self._spy in data.bars:
            self._spy_today_price = data.bars[self._spy].close

        # Track latest prices for currently held stocks (for daily index calc)
        for sym in self._symbol_data:
            if self.portfolio[sym].invested and sym in data.bars:
                self._index_today_prices[sym] = data.bars[sym].close

    # ============================================================================
    #  MONTHLY REBALANCE
    # ============================================================================

    def _rebalance(self) -> None:
        if self.is_warming_up:
            return

        # Score all universe symbols from _price_history (no API call needed)
        candidates: List[Tuple[Symbol, float]] = []
        for sym in self._selected_symbols:
            hist = self._price_history.get(sym)
            if hist is None or len(hist) < self.LOOKBACK_12M + 1:
                continue
            closes = list(hist)
            m12 = closes[-1] / closes[-(self.LOOKBACK_12M + 1)] - 1.0
            if self.USE_12M_FILTER and m12 <= 0:
                continue
            m6 = None
            if len(closes) >= self.LOOKBACK_6M + 1:
                m6 = closes[-1] / closes[-(self.LOOKBACK_6M + 1)] - 1.0
            if self.USE_6M_FILTER and m6 is not None and m6 <= 0:
                continue
            score = (m12 + (m6 if m6 is not None else m12)) / 2.0
            candidates.append((sym, score))

        candidates.sort(key=lambda x: x[1], reverse=True)
        new_top50      = candidates[:self.MOMENTUM_TAKE_N]
        new_top50_syms = {sym for sym, _ in new_top50}

        # Setup SymbolData for stocks entering the top-50 for the first time
        newly_added = new_top50_syms - set(self._symbol_data.keys())
        if newly_added:
            hist_size = self.ADX_PERIOD * self.ADX_HIST_MULT + 10
            try:
                # Bulk daily OHLC fetch to seed ADX history
                ohlc = self.history(list(newly_added), hist_size + 5, Resolution.DAILY)
            except Exception as e:
                self.debug(f"OHLC history fetch failed: {e}")
                ohlc = None

            for sym in newly_added:
                sd = SymbolData(self.ADX_PERIOD, hist_size + 20)

                # Backfill completed daily bars
                if ohlc is not None and not ohlc.empty:
                    try:
                        rows = ohlc.loc[sym]
                        for _, row in rows.iterrows():
                            sd._daily.append((row["high"], row["low"], row["close"]))
                        if sd._daily:
                            sd.yesterday_high = sd._daily[-1][0]
                    except (KeyError, Exception):
                        pass

                # Daily consolidator: keeps ADX history and yesterday_high alive
                con = TradeBarConsolidator(timedelta(days=1))
                def _make_sd_handler(s: Symbol):
                    def _h(_, bar: TradeBar) -> None:
                        if s in self._symbol_data:
                            self._symbol_data[s].on_daily_bar(bar)
                    return _h
                con.data_consolidated += _make_sd_handler(sym)
                self.subscription_manager.add_consolidator(sym, con)

                self._symbol_data[sym] = sd

        # Update momentum scores for all current top-50 entries
        for sym, score in new_top50:
            if sym in self._symbol_data:
                self._symbol_data[sym].momentum_score = score

        self._top50_symbols = new_top50_syms
        self._top50_sorted  = new_top50

        self.debug(
            f"{self.time.date()} -- rebalanced | "
            f"candidates: {len(candidates)} | top50: {len(new_top50)} | "
            f"spy_filter: {'OK' if self._spy_allows_trading() else 'BLOCKED'}"
        )

    # ============================================================================
    #  SIGNAL CHECK  (15:55 ET)
    # ============================================================================

    def _check_signals(self) -> None:
        if self.is_warming_up:
            return

        # ── EXIT: evaluate all currently invested positions ──────────────────
        for sym, sd in list(self._symbol_data.items()):
            if not self.portfolio[sym].invested:
                continue
            # If security is no longer tradable, force liquidation
            if not self.securities[sym].is_tradable:
                self.liquidate(sym)
                self.log(
                    f"EXIT  {self.time:%Y-%m-%d} {sym.value} | NonTradable | "
                    f"forced liquidation"
                )
                continue
            if not sd.ready:
                continue
            ibs       = sd.ibs()
            exit_ibs  = ibs > self.IBS_EXIT_THR
            exit_high = sd.yesterday_high is not None and sd.day_c > sd.yesterday_high
            if exit_ibs or exit_high:
                if not self.is_market_open(sym):
                    continue
                reason = "IBS>0.8" if exit_ibs else "Close>PrevHigh"
                self.liquidate(sym)
                self.log(
                    f"EXIT  {self.time:%Y-%m-%d} {sym.value} | {reason} | "
                    f"IBS={ibs:.3f}  close={sd.day_c:.2f}  "
                    f"prevH={sd.yesterday_high:.2f}"
                )

        # ── Daily index calculation (close-to-close, once per session) ───────
        # SPY rebased index
        if self._spy_index_prev is not None and self._spy_index_prev > 0 and self._spy_today_price is not None:
            self._spy_index_value *= self._spy_today_price / self._spy_index_prev
        if self._spy_today_price is not None:
            self._spy_index_prev = self._spy_today_price
        self._spy_today_price = None

        # Traded index: equal-weight daily return of held stocks
        held = [s for s in self._symbol_data if self.portfolio[s].invested]
        daily_rets = []
        for sym in held:
            if sym in self._index_prev_prices and sym in self._index_today_prices:
                prev = self._index_prev_prices[sym]
                curr = self._index_today_prices[sym]
                if prev > 0:
                    daily_rets.append(curr / prev - 1.0)
        if daily_rets:
            self._traded_index_value *= 1.0 + sum(daily_rets) / len(daily_rets)

        # Update prev prices for tomorrow (only for stocks still held)
        self._index_prev_prices = {sym: self._index_today_prices[sym]
                                   for sym in held
                                   if sym in self._index_today_prices}
        self._index_today_prices = {}

        # ── Plot daily index values once per session ─────────────────────────
        self.plot("Strategy Index vs SPY", "Traded Index",  self._traded_index_value)
        self.plot("Strategy Index vs SPY", "SPY (rebased)", self._spy_index_value)

        # ── SPY regime gate: block new entries when market is in downtrend ───
        if not self._spy_allows_trading():
            return

        # ── Count available position slots ───────────────────────────────────
        invested_count = sum(1 for s in self._symbol_data if self.portfolio[s].invested)
        available      = self.MAX_POSITIONS - invested_count
        if available <= 0:
            return

        # ── Collect all entry signals from top-50, not currently invested ────
        entry_cands: List[Tuple[Symbol, float, float, float]] = []
        for sym in self._top50_symbols:
            if self.portfolio[sym].invested:
                continue
            if not self.securities[sym].has_data:
                continue
            sd = self._symbol_data.get(sym)
            if sd is None or not sd.ready:
                continue
            ibs = sd.ibs()
            adx = sd.adx_value()
            if adx is None:
                continue
            if ibs < self.IBS_ENTRY_THR and adx > self.ADX_ENTRY_THR:
                entry_cands.append((sym, sd.momentum_score, ibs, adx))

        # ── Rank by momentum score; fill slots (best momentum first) ─────────
        # When available slots < signals, highest (6M+12M)/2 score wins.
        entry_cands.sort(key=lambda x: x[1], reverse=True)
        for sym, score, ibs, adx in entry_cands[:available]:
            # Explicit cash-capped sizing: target 20 % of TPV but never use margin.
            # We leave a 3 % cash buffer so broker margin overheads don't reject
            # the order (Initial Margin can be slightly above notional value).
            target_value   = self.portfolio.total_portfolio_value * self.POSITION_SIZE
            available_cash = min(self.portfolio.cash, self.portfolio.margin_remaining) * 0.97
            max_value      = min(target_value, available_cash)
            price = self.securities[sym].price
            if price <= 0 or max_value < 1000:   # skip micro-orders
                continue
            shares = int(max_value / price)      # floor = round down
            cost   = shares * price
            if shares <= 0 or cost > available_cash:
                continue
            # Guard: only trade if market is open (avoid extended-hour fills)
            if not self.is_market_open(sym):
                continue
            self.market_order(sym, shares)
            self.log(
                f"ENTRY {self.time:%Y-%m-%d} {sym.value} | "
                f"IBS={ibs:.3f}  ADX={adx:.1f}  MomScore={score:.3f}  "
                f"Shares={shares}  Cost=${cost:,.0f}"
            )
            # Seed prev price so the stock starts contributing to index from tomorrow
            sd = self._symbol_data.get(sym)
            if sd is not None and sd.day_c is not None:
                self._index_prev_prices[sym] = sd.day_c

    # ============================================================================
    #  WILDER ADX  (pure Python, static — same implementation as IBS/ADX QQQ)
    # ============================================================================

    @staticmethod
    def _wilder_adx(
        bars: List[Tuple[float, float, float]], period: int
    ) -> Optional[float]:
        """
        Wilder ADX from a list of (high, low, close) tuples, oldest first.
        Returns None if insufficient bars.
        """
        n = len(bars)
        if n < period + 1:
            return None

        tr_v, pdm_v, ndm_v = [], [], []
        for i in range(1, n):
            h, l, c    = bars[i]
            ph, pl, pc = bars[i - 1]
            tr  = max(h - l, abs(h - pc), abs(l - pc))
            up  = h - ph
            dn  = pl - l
            tr_v.append(tr)
            pdm_v.append(up if (up > dn and up > 0) else 0.0)
            ndm_v.append(dn if (dn > up and dn > 0) else 0.0)

        def _ws(vals: List[float], p: int) -> List[float]:
            if len(vals) < p:
                return []
            s = sum(vals[:p]) / p
            out = [s]
            for v in vals[p:]:
                s = s - s / p + v / p
                out.append(s)
            return out

        atr_s = _ws(tr_v, period)
        pdm_s = _ws(pdm_v, period)
        ndm_s = _ws(ndm_v, period)
        if not atr_s:
            return None

        dx_v: List[float] = []
        for a, p, nm in zip(atr_s, pdm_s, ndm_s):
            if a == 0:
                dx_v.append(0.0)
            else:
                pdi = 100.0 * p  / a
                ndi = 100.0 * nm / a
                den = pdi + ndi
                dx_v.append(100.0 * abs(pdi - ndi) / den if den else 0.0)

        if len(dx_v) < period:
            return None
        adx_v = _ws(dx_v, period)
        return adx_v[-1] if adx_v else None

    # ============================================================================
    #  END SUMMARY
    # ============================================================================

    def on_end_of_algorithm(self) -> None:
        open_positions = [s for s in self._symbol_data if self.portfolio[s].invested]
        self.log(
            f"\n{'='*60}\n"
            f"  Top-50 IBS Momentum -- Final Summary\n"
            f"{'='*60}\n"
            f"  Portfolio Value  : ${self.portfolio.total_portfolio_value:>14,.2f}\n"
            f"  Cash             : ${self.portfolio.cash:>14,.2f}\n"
            f"  Open Positions   : {len(open_positions)}\n"
            f"  Traded Index     : {self._traded_index_value:.4f}\n"
            f"  SPY Index        : {self._spy_index_value:.4f}\n"
            f"{'='*60}"
        )