Overall Statistics
Total Orders
852
Average Win
0.69%
Average Loss
-0.31%
Compounding Annual Return
17.360%
Drawdown
11.000%
Expectancy
0.593
Start Equity
100000
End Equity
222639.00
Net Profit
122.639%
Sharpe Ratio
0.895
Sortino Ratio
1.069
Probabilistic Sharpe Ratio
71.202%
Loss Rate
50%
Win Rate
50%
Profit-Loss Ratio
2.20
Alpha
0.054
Beta
0.437
Annual Standard Deviation
0.095
Annual Variance
0.009
Information Ratio
0.129
Tracking Error
0.108
Treynor Ratio
0.195
Total Fees
$1281.65
Estimated Strategy Capacity
$0
Lowest Capacity Asset
DHI R735QTJ8XC9X
Portfolio Turnover
3.56%
Drawdown Recovery
223
# region imports
from AlgorithmImports import *
import numpy as np
import math
from datetime import datetime
# endregion


# ──────────────────────────────────────────────────────────────────────────────
# Strategy 2 Helper: Trend Predictor
# ──────────────────────────────────────────────────────────────────────────────

class TrendPredictor:
    """OLS log-price slope as a lightweight trend proxy."""

    def __init__(self, lookback: int = 21) -> None:
        self._lookback = max(2, int(lookback))

    @property
    def lookback(self) -> int:
        return self._lookback

    def predict(self, closes: list) -> tuple:
        n = len(closes)
        if n < 2:
            return 0.0, False

        log_prices = []
        for p in closes:
            if p is None:
                return 0.0, False
            price = float(p)
            if price <= 0:
                return 0.0, False
            log_prices.append(math.log(price))

        x_mean = (n - 1) / 2.0
        y_mean = sum(log_prices) / float(n)
        num = den = 0.0

        for i in range(n):
            dx = float(i) - x_mean
            dy = log_prices[i] - y_mean
            num += dx * dy
            den += dx * dx

        if den == 0.0:
            return 0.0, False

        slope       = num / den
        trend_score = slope * float(n)
        return float(trend_score), slope > 0.0


# ──────────────────────────────────────────────────────────────────────────────
# Combined Algorithm
# ──────────────────────────────────────────────────────────────────────────────

class CombinedStrategy_V1_1(QCAlgorithm):
    """
    Combined Strategy V1.1 — Three Improvements Build

    Improvements over V1.0:
    ─────────────────────────────────────────────────────────────────────────
    [1] REGIME-AWARE ALLOCATION
        Dynamic S1/S2 split based on SPY vs 200-day SMA.
        Bull regime  (SPY > SMA +2%): S1=70%, S2=30% — momentum thrives
        Bear regime  (SPY < SMA -2%): S1=50%, S2=50% — value/quality holds
        Neutral:                       S1=60%, S2=40% — default

    [2] S2 LIQUIDITY FILTER
        Minimum $5M average daily dollar volume added to fine selection.
        Eliminates micro-caps that backtest optimistically but fill poorly live.
        Addresses the $0 capacity constraint from V1.0.

    [3] S2 INTRADAY STOP LOSS
        8% stop loss applied daily to all S2 positions.
        Prevents holding deteriorating positions until monthly rebalance.
        Stopped-out symbols are removed from s2_last_selection immediately.
    ─────────────────────────────────────────────────────────────────────────

    Capital Split (dynamic):
        Bull:    S1=70% / S2=30%
        Neutral: S1=60% / S2=40%
        Bear:    S1=50% / S2=50%
    """

    # ── Global Toggles ─────────────────────────────────────────────────────
    USE_CASH_PROXY = False   # True = SGOV | False = IBKR native interest

    # ── Default Allocations (overridden dynamically) ────────────────────────
    S1_ALLOCATION  = 0.60
    S2_ALLOCATION  = 0.40

   
    def Initialize(self):
        # ── 1. Backtest & Brokerage Settings ──────────────────────────────
        #self.SetStartDate(2021, 1, 1)
        #self.SetEndDate(2026, 1, 1)
        self.SetEndDate(2026, 1, 1)
        self.set_start_date(self.end_date - timedelta(5*365)) 
        self.SetCash(100_000)
        self.SetBrokerageModel(
            BrokerageName.InteractiveBrokersBrokerage,
            AccountType.Margin
        )
        self.SetBenchmark("SPY")
       
        # ── 2. Cash Proxy (shared across both strategies) ──────────────────
        if self.USE_CASH_PROXY:
            self.cash_proxy = self.AddEquity("SGOV", Resolution.Daily).Symbol
        else:
            self.cash_proxy = None
            self.Log("SGOV disabled — idle cash earning IBKR native interest")

        # ══════════════════════════════════════════════════════════════════
        # STRATEGY 1 SETUP
        # ══════════════════════════════════════════════════════════════════

        self.s1_max_positions  = 5
        self.s1_rsi_entry      = 41
        self.s1_max_trade_days = 50
        self.s1_sgov_tolerance = 0.02

        # S1 Hybrid Crypto
        self.ibit_launch = datetime(2024, 1, 11)
        self.gbtc = self.AddEquity("GBTC", Resolution.Daily).Symbol
        self.ibit = self.AddEquity("IBIT", Resolution.Daily).Symbol

        # S1 Universe
        self.s1_assets = {
            "QQQ"  : "Tech",
            "NVDA" : "Tech",
            "XLF"  : "Finance",
            "XLV"  : "Health",
            "GLD"  : "Gold",
            "TLT"  : "Treasury",
            "USO"  : "Commodity",
            "XLE"  : "Commodity",
            "XLP"  : "Equity",
        }

        self.s1_symbols = []
        self.s1_data    = {}

        for ticker, asset_type in self.s1_assets.items():
            symbol = self.AddEquity(ticker, Resolution.Daily).Symbol
            self.s1_symbols.append(symbol)
            self.s1_data[symbol] = self._MakeIndicatorBundle(symbol, asset_type)

        for symbol in [self.gbtc, self.ibit]:
            self.s1_data[symbol] = self._MakeIndicatorBundle(symbol, "Crypto")

        # S1 Market Regime Filter
        self.spy     = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.spy_std = self.STD(self.spy, 10, Resolution.Daily)

   
        # ══════════════════════════════════════════════════════════════════
        # STRATEGY 2 SETUP
        # ══════════════════════════════════════════════════════════════════

        self.universe_settings.resolution = Resolution.Daily

        self.s2_portfolio_size           = 15
        self.s2_trend_predictor          = TrendPredictor(lookback=21)
        self.s2_vol_lookback             = 21
        self.s2_max_drawdown_for_scaling = 0.10
        self.s2_coarse_count             = 1000

        # S2 State
        self.s2_fine_working_set    = []
        self.s2_pe_by_symbol        = {}
        self.s2_last_selection      = []
        self.s2_equity_peak         = self.Portfolio.TotalPortfolioValue
        self.s2_latest_coarse_count = 0
        self.s2_latest_fine_pass    = 0
        self.s2_latest_ml_pass      = 0

        # S2 Universe Selection
        self.AddUniverse(
            self._s2_coarse_selection,
            self._s2_fine_selection
        )

        # S2 Monthly Rebalance
        self.Schedule.On(
            self.DateRules.MonthStart(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 30),
            self._s2_rebalance
        )

        # ── Warmup ────────────────────────────────────────────────────────
        self.SetWarmUp(200)

    # ══════════════════════════════════════════════════════════════════════════
    # [IMPROVEMENT 1] REGIME-AWARE ALLOCATION
    # ══════════════════════════════════════════════════════════════════════════

       # ══════════════════════════════════════════════════════════════════════════
    # STRATEGY 1 HELPERS
    # ══════════════════════════════════════════════════════════════════════════

    def _MakeIndicatorBundle(self, symbol, asset_type):
        return {
            "rsi"       : self.RSI(symbol, 14, MovingAverageType.Wilders, Resolution.Daily),
            "sma_200"   : self.SMA(symbol, 200, Resolution.Daily),
            "atr"       : self.ATR(symbol, 14, MovingAverageType.Wilders, Resolution.Daily),
            "mom"       : self.ROC(symbol, 20, Resolution.Daily),
            "hwm"       : 0.0,
            "entry_date": None,
            "type"      : asset_type,
        }

    def _GetActiveCrypto(self):
        return self.ibit if self.Time >= self.ibit_launch else self.gbtc

    # ══════════════════════════════════════════════════════════════════════════
    # STRATEGY 2 — UNIVERSE SELECTION
    # ══════════════════════════════════════════════════════════════════════════

    def _s2_coarse_selection(self, coarse):
        filtered = [
            x for x in coarse
            if x.HasFundamentalData
            and x.Price is not None
            and x.Price > 5
            #and x.DollarVolume > 5_000_000   # Liquidity filter — reliable here
        ]
        filtered.sort(key=lambda c: c.DollarVolume, reverse=True)
        selected = filtered[:self.s2_coarse_count]
        self.s2_latest_coarse_count = len(selected)
        return [c.Symbol for c in selected]


    def _s2_fine_selection(self, fine):
        passed       = []
        pe_by_symbol = {}

        for f in fine:

            pe = self._s2_get_float(f, [
                "ValuationRatios.PERatio", "ValuationRatios.PE",
                "PERatio", "PE"
            ], float("nan"))

            dte = self._s2_get_float(f, [
                "OperationRatios.DebtEquityRatio",
                "OperationRatios.TotalDebtEquityRatio",
                "DebtEquityRatio",
            ], float("nan"))

            div = self._s2_get_float(f, [
                "ValuationRatios.ForwardDividendYield",
                "ValuationRatios.DividendYield",
                "DividendYield",
            ], float("nan"))

            roi = self._s2_get_float(f, [
                "OperationRatios.ROIC",
                "OperationRatios.ReturnOnInvestment",
                "ROIC", "ROI",
            ], float("nan"))

            if not all(self._s2_is_finite(v) for v in [pe, dte, div, roi]):
                continue
            if pe < 5.0 or pe > 15.0:  continue
            if dte >= 1.0:              continue
            if div <= 0.01:             continue
            if roi <= 0.10:             continue

            passed.append(f)
            pe_by_symbol[f.Symbol] = float(pe)

        passed.sort(
            key=lambda ff: pe_by_symbol.get(ff.Symbol, float("inf")),
            reverse=True
        )
        passed = passed[:self.s2_portfolio_size + 5]

        self.s2_fine_working_set = [ff.Symbol for ff in passed]
        self.s2_pe_by_symbol     = pe_by_symbol
        self.s2_latest_fine_pass = len(self.s2_fine_working_set)

        return self.s2_fine_working_set

    # ══════════════════════════════════════════════════════════════════════════
    # STRATEGY 2 — MONTHLY REBALANCE
    # ══════════════════════════════════════════════════════════════════════════

    def _s2_rebalance(self):
        if self.IsWarmingUp:
            return

        # Get dynamic allocations at rebalance time
        s1_alloc, s2_alloc = self.S1_ALLOCATION, self.S2_ALLOCATION

        self.s2_equity_peak = max(
            self.s2_equity_peak,
            self.Portfolio.TotalPortfolioValue
        )
        current_dd = self._s2_portfolio_drawdown()

        candidates = list(self.s2_fine_working_set)
        if not candidates:
            self.Debug("S2 Rebalance: no candidates, skipping.")
            return

        # Trend scoring
        scores  = {}
        vols    = {}
        ml_pass = []

        history = self.History(
            candidates,
            self.s2_trend_predictor.lookback,
            Resolution.Daily
        )
        if history.empty:
            self.Debug("S2 Rebalance: empty history, skipping.")
            return

        for symbol in candidates:
            try:
                if symbol not in history.index.get_level_values(0):
                    continue
                sym_hist = history.loc[symbol]
                if "close" not in sym_hist.columns:
                    continue
                closes = [
                    float(x)
                    for x in sym_hist["close"].dropna().values.tolist()
                ]
                if len(closes) < self.s2_trend_predictor.lookback:
                    continue

                score, up        = self.s2_trend_predictor.predict(closes)
                scores[symbol]   = float(score)
                vols[symbol]     = self._s2_calculate_volatility(
                    closes[-self.s2_vol_lookback:]
                )
                if up:
                    ml_pass.append(symbol)
            except Exception:
                continue

        self.s2_latest_ml_pass = len(ml_pass)

        # Select top 15 by trend score then sort by P/E
        selected = sorted(
            ml_pass,
            key=lambda s: scores.get(s, float("-inf")),
            reverse=True
        )[:self.s2_portfolio_size]

        selected.sort(
            key=lambda s: self.s2_pe_by_symbol.get(s, float("inf")),
            reverse=True
        )
        selected = selected[:self.s2_portfolio_size]

        self.Debug(
            f"S2 funnel: coarse={self.s2_latest_coarse_count}, "
            f"fine={self.s2_latest_fine_pass}, "
            f"ml={self.s2_latest_ml_pass}, "
            f"selected={len(selected)} | "
            f"regime alloc S1={s1_alloc:.0%} S2={s2_alloc:.0%}"
        )

        # Liquidate S2 positions no longer selected
        # Never liquidate S1 symbols or cash proxy
        s1_active    = self.s1_symbols + [self._GetActiveCrypto()]
        selected_set = set(selected)

        for symbol, holding in self.Portfolio.items():
            if (holding.Invested
                    and symbol not in selected_set
                    and symbol not in s1_active
                    and symbol != self.cash_proxy):
                self.Liquidate(symbol, "S2 removed from selection")

        if not selected:
            self.s2_last_selection = []
            return

        # Inverse-vol weighting scaled to dynamic S2 allocation
        inv_vols = {}
        for symbol in selected:
            vol = vols.get(symbol, float("inf"))
            if vol and vol > 0 and math.isfinite(vol):
                inv_vols[symbol] = 1.0 / vol

        if not inv_vols:
            base_weight = s2_alloc / float(len(selected))
            weights     = {s: base_weight for s in selected}
        else:
            total_inv_vol = sum(inv_vols.values())
            weights = {
                s: (inv_vols[s] / total_inv_vol) * s2_alloc
                for s in inv_vols
            }

        # Drawdown guard — scale by 0.5 if portfolio DD > 10%
        scale = 0.5 if current_dd > self.s2_max_drawdown_for_scaling else 1.0
        if scale < 1.0:
            self.Debug(f"S2 DD guard triggered: {current_dd:.2%}, scaling by 0.5")

        for symbol in selected:
            if symbol not in self.Securities:
                continue
            if not self.Securities[symbol].IsTradable:
                continue

            target = weights.get(symbol, 0.0) * scale
            if target <= 0:
                continue

            # Overlap handling: if S1 also holds this symbol, add weights
            # but cap combined at 40% to prevent over-concentration
            if symbol in s1_active:
                current_weight = (
                    self.Portfolio[symbol].HoldingsValue
                    / self.Portfolio.TotalPortfolioValue
                )
                target = min(current_weight + target, 0.40)

            self.SetHoldings(symbol, target)

        self.s2_last_selection = selected

    # ══════════════════════════════════════════════════════════════════════════
    # MAIN EVENT — STRATEGY 1 DAILY + S2 STOP LOSS
    # ══════════════════════════════════════════════════════════════════════════

    def OnData(self, data: Slice):
        if self.IsWarmingUp or not self.spy_std.IsReady:
            return

        s1_alloc, s2_alloc = self.S1_ALLOCATION, self.S2_ALLOCATION

        active_crypto   = self._GetActiveCrypto()
        inactive_crypto = self.ibit if active_crypto == self.gbtc else self.gbtc

        # Crypto switchover guard
        if self.Portfolio[inactive_crypto].Invested:
            self.Liquidate(inactive_crypto, "Crypto proxy switchover")
            self.s1_data[inactive_crypto]["hwm"]        = 0.0
            self.s1_data[inactive_crypto]["entry_date"] = None

        s1_active = self.s1_symbols + [active_crypto]

        # ── S1 EXIT LOGIC ──────────────────────────────────────────────────
        for s in [s for s in s1_active if self.Portfolio[s].Invested]:
            if not data.Bars.ContainsKey(s):
                continue

            d     = self.s1_data[s]
            price = self.Securities[s].Price
            d["hwm"] = max(d["hwm"], price)

            if d["type"] == "Crypto":
                exit_rsi, atr_mult = 85, 3.2
            elif d["type"] == "Commodity":
                exit_rsi, atr_mult = 80, 3.0
            else:
                exit_rsi, atr_mult = 75, 2.5

            days_held     = (self.Time - d["entry_date"]).days
            trailing_stop = d["hwm"] - (d["atr"].Current.Value * atr_mult)

            if (d["rsi"].Current.Value > exit_rsi
                    or price < trailing_stop
                    or days_held > self.s1_max_trade_days):

                self.Liquidate(s, "S1 Sniper Exit")
                d["hwm"]        = 0.0
                d["entry_date"] = None

        # ── S1 ENTRY LOGIC ─────────────────────────────────────────────────
        is_high_vol = self.spy_std.Current.Value > (
            self.Securities[self.spy].Price * 0.015
        )
        s1_invested = sum(1 for s in s1_active if self.Portfolio[s].Invested)

        if s1_invested < self.s1_max_positions:
            candidates = []
            for s in s1_active:
                if self.Portfolio[s].Invested or not data.Bars.ContainsKey(s):
                    continue
                d = self.s1_data[s]
                if not d["rsi"].IsReady or not d["sma_200"].IsReady:
                    continue
                if (self.Securities[s].Price > d["sma_200"].Current.Value
                        and d["rsi"].Current.Value < self.s1_rsi_entry):
                    candidates.append(s)

            candidates.sort(
                key=lambda x: self.s1_data[x]["mom"].Current.Value,
                reverse=True
            )

            for s in candidates:
                if sum(
                    1 for sym in s1_active
                    if self.Portfolio[sym].Invested
                ) >= self.s1_max_positions:
                    break

                d        = self.s1_data[s]
                risk_pct = 0.01 if is_high_vol else 0.03
                atr_val  = d["atr"].Current.Value
                if atr_val <= 0:
                    continue

                qty_factor = (
                    self.Portfolio.TotalPortfolioValue * risk_pct
                ) / (atr_val * 3)

                if d["type"] == "Crypto":      max_w = 0.30
                elif d["type"] == "Commodity":  max_w = 0.25
                else:                           max_w = 0.33

                # Scale S1 target by dynamic s1_alloc
                raw_weight = min(
                    max_w,
                    (qty_factor * self.Securities[s].Price)
                    / self.Portfolio.TotalPortfolioValue
                )
                target_weight = raw_weight * s1_alloc

                if target_weight > 0.02:
                    self.SetHoldings(s, target_weight)
                    d["hwm"]        = self.Securities[s].Price
                    d["entry_date"] = self.Time

        # ── SHARED SGOV CASH MANAGEMENT ────────────────────────────────────
        if not self.USE_CASH_PROXY:
            return
        if not data.Bars.ContainsKey(self.cash_proxy):
            return

        s2_symbols   = set(self.s2_last_selection)
        all_tactical = set(s1_active) | s2_symbols

        tactical_value = sum(
            self.Portfolio[s].HoldingsValue
            for s in all_tactical
            if s in self.Portfolio
        )
        tactical_weight     = tactical_value / self.Portfolio.TotalPortfolioValue
        sgov_target_weight  = max(0, 0.95 - tactical_weight)
        sgov_current_weight = (
            self.Portfolio[self.cash_proxy].HoldingsValue
            / self.Portfolio.TotalPortfolioValue
        )
        sgov_deviation = abs(sgov_current_weight - sgov_target_weight)

        if sgov_deviation > self.s1_sgov_tolerance:
            if sgov_target_weight > 0.05:
                self.SetHoldings(self.cash_proxy, sgov_target_weight)
            elif tactical_weight > 0.92:
                self.Liquidate(
                    self.cash_proxy,
                    "Clearing for tactical signal"
                )

    # ══════════════════════════════════════════════════════════════════════════
    # STRATEGY 2 UTILITY METHODS
    # ══════════════════════════════════════════════════════════════════════════

    def _s2_get_float(self, root, paths, default):
        for path in paths:
            value   = self._s2_try_attr(root, path)
            numeric = self._s2_coerce(value)
            if numeric is not None:
                return float(numeric)
        return float(default)

    def _s2_try_attr(self, root, path):
        current = root
        for part in path.split("."):
            if current is None or not hasattr(current, part):
                return None
            current = getattr(current, part)
        return current

    def _s2_coerce(self, value):
        if value is None:
            return None
        if hasattr(value, "Value"):
            try:
                return float(getattr(value, "Value"))
            except Exception:
                return None
        try:
            return float(value)
        except Exception:
            return None

    def _s2_is_finite(self, x):
        try:
            return x is not None and math.isfinite(float(x))
        except Exception:
            return False

    def _s2_calculate_volatility(self, closes):
        if len(closes) < 2:
            return float("inf")
        returns = []
        for i in range(1, len(closes)):
            if closes[i - 1] > 0 and closes[i] > 0:
                returns.append(math.log(closes[i] / closes[i - 1]))
        if len(returns) < 2:
            return float("inf")
        return float(np.std(returns))

    def _s2_portfolio_drawdown(self):
        if self.s2_equity_peak <= 0:
            return 0.0
        equity = self.Portfolio.TotalPortfolioValue
        return max(
            0.0,
            (self.s2_equity_peak - equity) / self.s2_equity_peak
        )