Overall Statistics
Total Orders
34712
Average Win
0.17%
Average Loss
-0.19%
Compounding Annual Return
13.152%
Drawdown
17.700%
Expectancy
0.103
Start Equity
1000000
End Equity
29704391.28
Net Profit
2870.439%
Sharpe Ratio
0.655
Sortino Ratio
0.705
Probabilistic Sharpe Ratio
15.012%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
0.90
Alpha
0.05
Beta
0.449
Annual Standard Deviation
0.109
Annual Variance
0.012
Information Ratio
0.196
Tracking Error
0.12
Treynor Ratio
0.159
Total Fees
$4425279.69
Estimated Strategy Capacity
$13000000.00
Lowest Capacity Asset
SLB R735QTJ8XC9X
Portfolio Turnover
30.40%
Drawdown Recovery
738
from collections import deque
from typing import Deque, Dict, List, Optional, Set, Tuple

from AlgorithmImports import *


# ============================================================================
#  Per-symbol intraday state
# ============================================================================

class SymbolData:
    def __init__(self, adx_period: int, hist_size: int) -> None:
        self._adx_period    = adx_period
        self.day_h          : Optional[float] = None
        self.day_l          : Optional[float] = None
        self.day_c          : Optional[float] = None
        self.day_date       = None
        self.yesterday_high : Optional[float] = None
        self._daily         : Deque[Tuple[float, float, float]] = deque(maxlen=hist_size)
        self.momentum_score : float = 0.0

    def update_minute(self, bar: TradeBar) -> None:
        d = bar.time.date()
        if d != self.day_date:
            self.day_h    = bar.high
            self.day_l    = bar.low
            self.day_c    = bar.close
            self.day_date = d
        else:
            self.day_h = max(self.day_h, bar.high)
            self.day_l = min(self.day_l, bar.low)
            self.day_c = bar.close

    def on_daily_bar(self, bar: TradeBar) -> None:
        self.yesterday_high = bar.high
        self._daily.append((bar.high, bar.low, bar.close))

    @property
    def ready(self) -> bool:
        return self.day_h is not None

    def ibs(self) -> float:
        rng = self.day_h - self.day_l
        return 0.5 if rng == 0 else (self.day_c - self.day_l) / rng

    def adx_value(self) -> Optional[float]:
        completed = list(self._daily)
        if len(completed) < self._adx_period * 2:
            return None
        all_bars = completed + [(self.day_h, self.day_l, self.day_c)]
        return Top50IBSMomentumAlgorithm._wilder_adx(all_bars, self._adx_period)


# ============================================================================
#  Main algorithm
# ============================================================================

class Top50IBSMomentumAlgorithm(QCAlgorithm):
    """
    Monthly TOP-50 Momentum Universe + IBS/ADX Intraday Mean-Reversion
    ====================================================================

    Universe
    --------
    Coarse: top UNIVERSE_SIZE equities by dollar volume (minute resolution).
    Monthly rank: (12M + 6M) / 2 combined score, both filters positive.
    Active trading universe = top MOMENTUM_TAKE_N.

    Entry  (15:55 ET)
    -----------------
    IBS < IBS_ENTRY_THR  AND  ADX(ADX_PERIOD) > ADX_ENTRY_THR.
    Maximum MAX_POSITIONS concurrent open positions.
    When available slots < signals: highest momentum score wins.

    Exit   (15:55 ET)
    -----------------
    IBS > IBS_EXIT_THR  OR  Close > Yesterday's High.
    OR held >= MAX_HOLD_DAYS calendar days (time-stop; 0 = disabled).

    Benchmark: SPY.
    """

    # -- Universe / momentum --------------------------------------------------
    UNIVERSE_SIZE     : int   = 500
    MOMENTUM_TAKE_N   : int   = 200
    LOOKBACK_12M      : int   = 252
    LOOKBACK_6M       : int   = 126
    USE_12M_FILTER    : bool  = True
    USE_6M_FILTER     : bool  = True
    USE_SPY_FILTER    : bool  = False
    SPY_MOMENTUM_DAYS : int   = 200

    # -- IBS / ADX -----------------------------------------------------------
    IBS_ENTRY_THR : float = 0.10
    IBS_EXIT_THR  : float = 0.90
    ADX_PERIOD    : int   = 10
    ADX_ENTRY_THR : float = 20.0
    ADX_HIST_MULT : int   = 5

    # -- Position sizing ------------------------------------------------------
    MAX_POSITIONS : int   = 10
    POSITION_SIZE : float = 0.10

    # Safety time-stop: liquidate any position held longer than N calendar days.
    # Set to 0 to disable.
    MAX_HOLD_DAYS : int   = 5

    # =========================================================================

    def initialize(self) -> None:
        self.set_start_date(1999, 1, 1)
        self.set_cash(1_000_000)
        self.settings.seed_initial_prices = True
        self.settings.minimum_order_margin_portfolio_percentage = 0

        self.set_brokerage_model(
            BrokerageName.INTERACTIVE_BROKERS_BROKERAGE,
            AccountType.CASH,
        )
        self.set_risk_free_interest_rate_model(InterestRateProvider())

        self.universe_settings.resolution = Resolution.MINUTE
        self.add_universe(self._coarse_selection)

        self._spy = self.add_equity("SPY", Resolution.MINUTE).symbol
        self.set_benchmark("SPY")

        self._spy_closes: Deque[float] = deque(maxlen=self.SPY_MOMENTUM_DAYS + 5)
        spy_con = TradeBarConsolidator(timedelta(days=1))
        spy_con.data_consolidated += self._on_spy_daily
        self.subscription_manager.add_consolidator(self._spy, spy_con)

        self._selected_symbols : Set[Symbol]                = set()
        self._price_history    : Dict[Symbol, Deque[float]] = {}
        self._top50_symbols    : Set[Symbol]                = set()
        self._top50_sorted     : List[Tuple[Symbol, float]] = []
        self._symbol_data      : Dict[Symbol, SymbolData]   = {}
        self._entry_date       : Dict[Symbol, object]       = {}
        self._max_lookback     : int = self.LOOKBACK_12M + 2

        self.set_warm_up(self._max_lookback + 10, Resolution.DAILY)

        self.schedule.on(
            self.date_rules.month_start("SPY"),
            self.time_rules.after_market_open("SPY", 1),
            self._rebalance,
        )
        self.schedule.on(
            self.date_rules.every_day("SPY"),
            self.time_rules.before_market_close("SPY", 5),
            self._check_signals,
        )

    # =========================================================================
    #  Universe
    # =========================================================================

    def _coarse_selection(self, coarse) -> List:
        filtered = [
            x for x in coarse
            if x.has_fundamental_data
            and x.price > 5
            and x.dollar_volume > 1_000_000
        ]
        top = sorted(filtered, key=lambda x: x.dollar_volume, reverse=True)[:self.UNIVERSE_SIZE]
        self._selected_symbols = {x.symbol for x in top}
        return [x.symbol for x in top]

    def on_securities_changed(self, changes: SecurityChanges) -> None:
        for sec in changes.added_securities:
            sym = sec.symbol
            if sym == self._spy:
                continue
            if sym not in self._price_history:
                self._price_history[sym] = deque(maxlen=self._max_lookback)
                con = TradeBarConsolidator(timedelta(days=1))
                def _make_ph_handler(s: Symbol):
                    def _h(_, bar: TradeBar) -> None:
                        ph = self._price_history.get(s)
                        if ph is not None:
                            ph.append(bar.close)
                    return _h
                con.data_consolidated += _make_ph_handler(sym)
                self.subscription_manager.add_consolidator(sym, con)

        for sec in changes.removed_securities:
            self._selected_symbols.discard(sec.symbol)

    # =========================================================================
    #  SPY regime filter
    # =========================================================================

    def _on_spy_daily(self, sender, bar: TradeBar) -> None:
        self._spy_closes.append(bar.close)

    def _spy_allows_trading(self) -> bool:
        if not self.USE_SPY_FILTER:
            return True
        if len(self._spy_closes) < self.SPY_MOMENTUM_DAYS + 1:
            return True
        return self._spy_closes[-1] > self._spy_closes[-(self.SPY_MOMENTUM_DAYS + 1)]

    # =========================================================================
    #  on_data  (every minute)
    # =========================================================================

    def on_data(self, data: Slice) -> None:
        if self.is_warming_up:
            return
        for sym, sd in self._symbol_data.items():
            if sym in data.bars:
                sd.update_minute(data.bars[sym])

    # =========================================================================
    #  Monthly rebalance
    # =========================================================================

    def _rebalance(self) -> None:
        if self.is_warming_up:
            return

        candidates: List[Tuple[Symbol, float]] = []
        for sym in self._selected_symbols:
            hist = self._price_history.get(sym)
            if hist is None or len(hist) < self.LOOKBACK_12M + 1:
                continue
            closes = list(hist)
            m12 = closes[-1] / closes[-(self.LOOKBACK_12M + 1)] - 1.0
            if self.USE_12M_FILTER and m12 <= 0:
                continue
            m6 = None
            if len(closes) >= self.LOOKBACK_6M + 1:
                m6 = closes[-1] / closes[-(self.LOOKBACK_6M + 1)] - 1.0
            if self.USE_6M_FILTER and m6 is not None and m6 <= 0:
                continue
            score = (m12 + (m6 if m6 is not None else m12)) / 2.0
            candidates.append((sym, score))

        candidates.sort(key=lambda x: x[1], reverse=True)
        new_top50      = candidates[:self.MOMENTUM_TAKE_N]
        new_top50_syms = {sym for sym, _ in new_top50}

        newly_added = new_top50_syms - set(self._symbol_data.keys())
        if newly_added:
            hist_size = self.ADX_PERIOD * self.ADX_HIST_MULT + 10
            try:
                ohlc = self.history(list(newly_added), hist_size + 5, Resolution.DAILY)
            except Exception as e:
                self.debug(f"OHLC history fetch failed: {e}")
                ohlc = None

            for sym in newly_added:
                sd = SymbolData(self.ADX_PERIOD, hist_size + 20)
                if ohlc is not None and not ohlc.empty:
                    try:
                        rows = ohlc.loc[sym]
                        for _, row in rows.iterrows():
                            sd._daily.append((row["high"], row["low"], row["close"]))
                        if sd._daily:
                            sd.yesterday_high = sd._daily[-1][0]
                    except (KeyError, Exception):
                        pass

                con = TradeBarConsolidator(timedelta(days=1))
                def _make_sd_handler(s: Symbol):
                    def _h(_, bar: TradeBar) -> None:
                        if s in self._symbol_data:
                            self._symbol_data[s].on_daily_bar(bar)
                    return _h
                con.data_consolidated += _make_sd_handler(sym)
                self.subscription_manager.add_consolidator(sym, con)
                self._symbol_data[sym] = sd

        for sym, score in new_top50:
            if sym in self._symbol_data:
                self._symbol_data[sym].momentum_score = score

        self._top50_symbols = new_top50_syms
        self._top50_sorted  = new_top50

        self.debug(
            f"{self.time.date()} -- rebalanced | "
            f"candidates: {len(candidates)} | top50: {len(new_top50)} | "
            f"spy_filter: {'OK' if self._spy_allows_trading() else 'BLOCKED'}"
        )

    # =========================================================================
    #  Signal check  (15:55 ET)
    # =========================================================================

    def _check_signals(self) -> None:
        if self.is_warming_up:
            return

        # -- EXIT: evaluate all currently invested positions ------------------
        for sym, sd in list(self._symbol_data.items()):
            if not self.portfolio[sym].invested:
                continue
            if not self.securities[sym].is_tradable:
                self.liquidate(sym)
                self._entry_date.pop(sym, None)
                self.log(f"EXIT {self.time:%Y-%m-%d} {sym.value} | NonTradable | forced liquidation")
                continue
            if not sd.ready:
                continue

            ibs = sd.ibs()

            # MAX_HOLD_DAYS time-stop (calendar days; 0 = disabled)
            if self.MAX_HOLD_DAYS > 0:
                entry = self._entry_date.get(sym)
                if entry is not None:
                    held_days = (self.time.date() - entry).days
                    if held_days >= self.MAX_HOLD_DAYS:
                        if not self.is_market_open(sym):
                            continue
                        self.liquidate(sym)
                        self._entry_date.pop(sym, None)
                        self.log(
                            f"EXIT {self.time:%Y-%m-%d} {sym.value} | "
                            f"MaxHoldDays={self.MAX_HOLD_DAYS} | held={held_days}d | "
                            f"IBS={ibs:.3f}"
                        )
                        continue

            exit_ibs  = ibs > self.IBS_EXIT_THR
            exit_high = sd.yesterday_high is not None and sd.day_c > sd.yesterday_high
            if exit_ibs or exit_high:
                if not self.is_market_open(sym):
                    continue
                reason = "IBS>0.8" if exit_ibs else "Close>PrevHigh"
                self.liquidate(sym)
                self._entry_date.pop(sym, None)
                self.log(
                    f"EXIT {self.time:%Y-%m-%d} {sym.value} | {reason} | "
                    f"IBS={ibs:.3f}  close={sd.day_c:.2f}  prevH={sd.yesterday_high:.2f}"
                )

        # -- SPY regime gate --------------------------------------------------
        if not self._spy_allows_trading():
            return

        # -- Slot accounting --------------------------------------------------
        invested_count = sum(1 for s in self._symbol_data if self.portfolio[s].invested)
        available      = self.MAX_POSITIONS - invested_count
        if available <= 0:
            return

        # -- Collect entry signals --------------------------------------------
        entry_cands: List[Tuple[Symbol, float, float, float]] = []
        for sym in self._top50_symbols:
            if self.portfolio[sym].invested:
                continue
            if not self.securities[sym].has_data:
                continue
            sd = self._symbol_data.get(sym)
            if sd is None or not sd.ready:
                continue
            ibs = sd.ibs()
            adx = sd.adx_value()
            if adx is None:
                continue
            if ibs < self.IBS_ENTRY_THR and adx > self.ADX_ENTRY_THR:
                entry_cands.append((sym, sd.momentum_score, ibs, adx))

        # Rank by momentum score; fill slots (best momentum first)
        entry_cands.sort(key=lambda x: x[1], reverse=True)
        for sym, score, ibs, adx in entry_cands[:available]:
            target_value   = self.portfolio.total_portfolio_value * self.POSITION_SIZE
            available_cash = min(self.portfolio.cash, self.portfolio.margin_remaining) * 0.97
            max_value      = min(target_value, available_cash)
            price = self.securities[sym].price
            if price <= 0 or max_value < 1_000:
                continue
            shares = int(max_value / price)
            cost   = shares * price
            if shares <= 0 or cost > available_cash:
                continue
            if not self.is_market_open(sym):
                continue
            self.market_order(sym, shares)
            self._entry_date[sym] = self.time.date()
            self.log(
                f"ENTRY {self.time:%Y-%m-%d} {sym.value} | "
                f"IBS={ibs:.3f}  ADX={adx:.1f}  MomScore={score:.3f}  "
                f"Shares={shares}  Cost=${cost:,.0f}"
            )

    # =========================================================================
    #  Wilder ADX  (pure Python, static)
    # =========================================================================

    @staticmethod
    def _wilder_adx(
        bars: List[Tuple[float, float, float]], period: int
    ) -> Optional[float]:
        n = len(bars)
        if n < period + 1:
            return None

        tr_v, pdm_v, ndm_v = [], [], []
        for i in range(1, n):
            h, l, c    = bars[i]
            ph, pl, pc = bars[i - 1]
            tr  = max(h - l, abs(h - pc), abs(l - pc))
            up  = h - ph
            dn  = pl - l
            tr_v.append(tr)
            pdm_v.append(up if (up > dn and up > 0) else 0.0)
            ndm_v.append(dn if (dn > up and dn > 0) else 0.0)

        def _ws(vals: List[float], p: int) -> List[float]:
            if len(vals) < p:
                return []
            s = sum(vals[:p]) / p
            out = [s]
            for v in vals[p:]:
                s = s - s / p + v / p
                out.append(s)
            return out

        atr_s = _ws(tr_v, period)
        pdm_s = _ws(pdm_v, period)
        ndm_s = _ws(ndm_v, period)
        if not atr_s:
            return None

        dx_v: List[float] = []
        for a, p, nm in zip(atr_s, pdm_s, ndm_s):
            if a == 0:
                dx_v.append(0.0)
            else:
                pdi = 100.0 * p  / a
                ndi = 100.0 * nm / a
                den = pdi + ndi
                dx_v.append(100.0 * abs(pdi - ndi) / den if den else 0.0)

        if len(dx_v) < period:
            return None
        adx_v = _ws(dx_v, period)
        return adx_v[-1] if adx_v else None

    # =========================================================================
    #  End summary
    # =========================================================================

    def on_end_of_algorithm(self) -> None:
        open_positions = [s for s in self._symbol_data if self.portfolio[s].invested]
        self.log(
            f"\n{'='*60}\n"
            f"  Top-50 IBS Momentum -- Final Summary\n"
            f"{'='*60}\n"
            f"  Portfolio Value  : ${self.portfolio.total_portfolio_value:>14,.2f}\n"
            f"  Cash             : ${self.portfolio.cash:>14,.2f}\n"
            f"  Open Positions   : {len(open_positions)}\n"
            f"  Max Hold Days    : {self.MAX_HOLD_DAYS} ({'disabled' if self.MAX_HOLD_DAYS == 0 else 'calendar days'})\n"
            f"{'='*60}"
        )