Overall Statistics
Total Orders
4282
Average Win
0.24%
Average Loss
-0.14%
Compounding Annual Return
23.784%
Drawdown
21.000%
Expectancy
0.409
Start Equity
1000000
End Equity
10482789.03
Net Profit
948.279%
Sharpe Ratio
1.298
Sortino Ratio
1.418
Probabilistic Sharpe Ratio
94.695%
Loss Rate
48%
Win Rate
52%
Profit-Loss Ratio
1.69
Alpha
0.102
Beta
0.5
Annual Standard Deviation
0.108
Annual Variance
0.012
Information Ratio
0.579
Tracking Error
0.108
Treynor Ratio
0.281
Total Fees
$45631.76
Estimated Strategy Capacity
$9600000.00
Lowest Capacity Asset
BBV R735QTJ8XC9X
Portfolio Turnover
3.95%
Drawdown Recovery
550
from AlgorithmImports import *
import numpy as np
import pandas as pd
from datetime import timedelta, time


class LargeCapDriftDispersionAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2015, 1, 1)
        self.SetCash(1_000_000)

        self.SetTimeZone(TimeZones.Chicago)

        self.resolution = Resolution.Minute
        self.spy = self.AddEquity("SPY", self.resolution).Symbol
        self.SetBenchmark("SPY")

        self.SetBrokerageModel(
            BrokerageName.InteractiveBrokersBrokerage,
            AccountType.Margin
        )

        self.UniverseSettings.Resolution = self.resolution

        tickers = [
            "AAPL","MSFT","AMZN","NVDA","META","GOOGL","BRK.B","JPM","JNJ","V","PG","HD","UNH","MA",
            "XOM","CVX","MRK","KO","PEP","ABBV","AVGO","COST","WMT","MCD","CSCO","IBM","CAT","BA","DIS","GS",
            "NVDA","AMD","TSLA","META","GOOGL","GOOG","AMZN","AVGO","SMCI","PLTR","NFLX","CRM","ADBE","INTU",
            "NOW","ORCL","QCOM","TXN","MU","PANW", "ABT", "T", "TMUS", "GEV", "ANET", "ACN", "PGR", "CEG", "ASML",
            "SCHW", "UL", "COF", "TD", "PFE", "SYK", "BBVA", "BN", "ARM", "PLD", "MDT", "CME", "SBUX", "DASH"
        ]

        symbols = [Symbol.Create(t, SecurityType.Equity, Market.USA) for t in tickers]
        self.SetUniverseSelection(ManualUniverseSelectionModel(symbols))

        self.SetAlpha(TridentBetaNeutralAlpha(
            reference_ticker="SPY",
            lookback=240,
            sma_period=200,
            resolution=self.resolution,
            n_each_side=2,
            emit_after_open_min=245,
            min_beta_spread=0.85,
            min_name_changes=2,
            long_frac_bull=0.60,
            long_frac_bear=0.40,
            beta_short_cap=3.35,
            beta_long_floor=-999.0
        ))

        self.SetPortfolioConstruction(GrossCappedInsightWeightingPCM(max_gross=0.10))
        self.SetExecution(StaggeredLongShortExecutionModel(delay_minutes=45))


class StaggeredLongShortExecutionModel(ExecutionModel):

    def __init__(self, delay_minutes: int = 20):
        self.delay = timedelta(minutes=int(delay_minutes))
        self.pending = {}

    def Execute(self, algorithm: QCAlgorithm, targets):
        if targets is None:
            targets = []

        now = algorithm.Time

        due = [sym for sym, rec in self.pending.items() if rec["time"] <= now]
        for sym in due:
            rec = self.pending.pop(sym, None)
            if rec is None:
                continue

            pct = float(rec["percent"])

            if sym not in algorithm.Securities or not algorithm.Securities[sym].HasData:
                continue

            algorithm.SetHoldings(sym, pct)

        for t in targets:
            sym = t.Symbol

            if sym not in algorithm.Securities:
                continue
            sec = algorithm.Securities[sym]
            if not sec.IsTradable:
                continue

            price = float(sec.Price)
            if not np.isfinite(price) or price <= 0:
                continue

            total_value = float(algorithm.Portfolio.TotalPortfolioValue)
            if not np.isfinite(total_value) or total_value <= 0:
                continue

            qty = float(t.Quantity)
            target_percent = (qty * price) / total_value

            if abs(target_percent) < 1e-6:
                if sym in self.pending:
                    self.pending.pop(sym, None)
                algorithm.Liquidate(sym)
                continue

            if target_percent < 0:
                self.pending[sym] = {"time": now + self.delay, "percent": float(target_percent)}
            else:
                if sym in self.pending:
                    self.pending.pop(sym, None)
                algorithm.SetHoldings(sym, float(target_percent))


class TridentBetaNeutralAlpha(AlphaModel):

    def __init__(self, reference_ticker: str, lookback: int, sma_period: int, resolution: Resolution,
                 n_each_side: int = 2, emit_after_open_min: int = 15,
                 min_beta_spread: float = 0.40, min_name_changes: int = 2,
                 long_frac_bull: float = 0.60, long_frac_bear: float = 0.40,
                 beta_short_cap: float = 1.60, beta_long_floor: float = -999.0):
        super().__init__()
        self.Name = "Trident_Beta_Asym_SMA_Capped_EqualWeight"

        self.reference_ticker = reference_ticker
        self.lookback = int(lookback)
        self.sma_period = int(sma_period)
        self.resolution = resolution

        self.n_each_side = int(n_each_side)
        self.emit_after_open_min = int(emit_after_open_min)

        self.min_beta_spread = float(min_beta_spread)
        self.min_name_changes = int(min_name_changes)

        self.long_frac_bull = float(long_frac_bull)
        self.long_frac_bear = float(long_frac_bear)

        self.beta_short_cap = float(beta_short_cap)
        self.beta_long_floor = float(beta_long_floor)

        self.cache = {}
        self.reference = None
        self.get_next_close = None

        self.last_emit_date = None
        self.last_longs = set()
        self.last_shorts = set()

        self.ref_sma200 = None

    def OnSecuritiesChanged(self, algorithm: QCAlgorithm, changes: SecurityChanges):
        if self.reference is None:
            ref_sec = algorithm.AddEquity(self.reference_ticker, self.resolution)
            self.reference = ref_sec.Symbol
            self.get_next_close = ref_sec.Exchange.Hours.GetNextMarketClose
            self.ref_sma200 = algorithm.SMA(self.reference, 200, Resolution.Daily)

        added = [x.Symbol for x in changes.AddedSecurities]
        if not added:
            return

        if self.reference not in added:
            added = added + [self.reference]

        self._init_symboldata_from_history(algorithm, added)

    def Update(self, algorithm: QCAlgorithm, data: Slice):
        insights = []

        if algorithm.IsWarmingUp:
            return insights

        if not self._after_open(algorithm):
            return insights

        today = algorithm.Time.date()
        if self.last_emit_date == today:
            return insights

        if self.reference is None or self.reference not in data.Bars:
            return insights

        actives = [kv.Key for kv in algorithm.ActiveSecurities if kv.Value.Invested or kv.Value.HasData]
        needed = [s for s in actives if s.SecurityType == SecurityType.Equity and s not in self.cache]
        if self.reference not in self.cache:
            needed.append(self.reference)
        if needed:
            self._init_symboldata_from_history(algorithm, needed)

        ref_sd = self.cache.get(self.reference, None)
        if ref_sd is None:
            return insights

        ref_series = ref_sd.GetSeries()
        if ref_series is None or len(ref_series) < 50:
            return insights

        betas = {}
        for sym, sd in self.cache.items():
            if sym == self.reference:
                continue
            beta = sd.compute_beta_vs(ref_series)
            if beta is not None and np.isfinite(beta):
                betas[sym] = beta

        if len(betas) < (2 * self.n_each_side):
            return insights

        long_candidates = [(s, b) for s, b in betas.items() if b >= self.beta_long_floor]
        short_candidates = [(s, b) for s, b in betas.items() if b <= self.beta_short_cap]

        if len(long_candidates) < self.n_each_side or len(short_candidates) < self.n_each_side:
            self.last_emit_date = today
            return insights

        long_candidates.sort(key=lambda kv: kv[1])
        short_candidates.sort(key=lambda kv: kv[1])

        longs = long_candidates[:self.n_each_side]
        shorts = short_candidates[-self.n_each_side:]

        beta_low = longs[0][1]
        beta_high = shorts[-1][1]
        beta_spread = beta_high - beta_low
        if beta_spread < self.min_beta_spread:
            self.last_emit_date = today
            return insights

        new_longs = set(sym for sym, _ in longs)
        new_shorts = set(sym for sym, _ in shorts)

        if self.last_longs or self.last_shorts:
            changes_count = len(new_longs.symmetric_difference(self.last_longs)) + len(new_shorts.symmetric_difference(self.last_shorts))
            if changes_count < self.min_name_changes:
                self.last_emit_date = today
                return insights

        period = timedelta(days=1)

        long_frac = 0.50
        if self.ref_sma200 is not None and self.ref_sma200.IsReady:
            ref_price = float(algorithm.Securities[self.reference].Price)
            ref_sma = float(self.ref_sma200.Current.Value)
            long_frac = self.long_frac_bull if ref_price >= ref_sma else self.long_frac_bear
        short_frac = 1.0 - long_frac

        w_long_each = long_frac / max(1, len(longs))
        w_short_each = short_frac / max(1, len(shorts))

        self.last_emit_date = today
        self.last_longs = new_longs
        self.last_shorts = new_shorts

        for sym, _ in longs:
            if w_long_each > 0:
                insights.append(Insight.Price(sym, period, InsightDirection.Up, None, None, self.Name, w_long_each))

        for sym, _ in shorts:
            if w_short_each > 0:
                insights.append(Insight.Price(sym, period, InsightDirection.Down, None, None, self.Name, w_short_each))

        return insights

    def _after_open(self, algorithm: QCAlgorithm) -> bool:
        if self.reference is None:
            return False
        sec = algorithm.Securities[self.reference]
        open_time = sec.Exchange.Hours.GetNextMarketOpen(algorithm.Time - timedelta(days=1), False)
        return algorithm.Time >= open_time + timedelta(minutes=self.emit_after_open_min)

    def _init_symboldata_from_history(self, algorithm: QCAlgorithm, symbols):
        symbols = list({s for s in symbols if s is not None})
        if not symbols:
            return

        bars_needed = self.lookback + self.sma_period
        hist = algorithm.History(symbols, bars_needed, self.resolution)
        if hist.empty:
            return

        try:
            close_df = hist["close"].unstack(level=0)
        except Exception:
            try:
                close_df = hist.close.unstack(level=0)
            except Exception:
                return

        for sym in symbols:
            if sym in self.cache:
                continue
            if sym not in close_df.columns:
                continue

            self.cache[sym] = SymbolData(
                algorithm=algorithm,
                symbol=sym,
                lookback=self.lookback,
                sma_period=self.sma_period,
                resolution=self.resolution,
                history_series=close_df[sym]
            )


class GrossCappedInsightWeightingPCM(PortfolioConstructionModel):
    def __init__(self, max_gross: float = 0.30):
        super().__init__()
        self.max_gross = float(max_gross)

    def CreateTargets(self, algorithm: QCAlgorithm, insights):
        if not insights:
            return []

        desired = {}
        for ins in insights:
            if ins.Weight is None:
                continue
            w = float(ins.Weight)
            if w <= 0:
                continue
            sign = -1.0 if ins.Direction == InsightDirection.Down else 1.0
            desired[ins.Symbol] = sign * w

        desired = {s: w for s, w in desired.items() if abs(w) > 1e-6}
        if not desired:
            return []

        gross = sum(abs(w) for w in desired.values())
        if gross <= 1e-9:
            return []

        scale = min(1.0, self.max_gross / gross)
        return [PortfolioTarget.Percent(algorithm, sym, w * scale) for sym, w in desired.items()]


class SymbolData:
    def __init__(self, algorithm: QCAlgorithm, symbol: Symbol, lookback: int, sma_period: int,
                 resolution: Resolution, history_series: pd.Series):
        self.symbol = symbol
        self.lookback = int(lookback)

        self.window = RollingWindow[IndicatorDataPoint](self.lookback)
        self.sma = algorithm.SMA(symbol, int(sma_period), resolution)

        def on_sma_updated(sender, updated):
            if sender.IsReady:
                self.window.Add(updated)

        self.sma.Updated += on_sma_updated

        s = history_series.ffill().bfill()
        for t, price in s.items():
            self.sma.Update(t, float(price))

    def GetSeries(self):
        if not self.window.IsReady:
            return None
        series = pd.Series({x.EndTime: x.Value for x in self.window})
        return series.iloc[::-1]

    def compute_beta_vs(self, reference_series: pd.Series):
        sym_series = self.GetSeries()
        if sym_series is None or reference_series is None:
            return None

        df = pd.concat([sym_series.rename("sym"), reference_series.rename("ref")], axis=1).dropna()
        if len(df) < 50:
            return None

        sym = df["sym"].values
        ref = df["ref"].values

        sym_ret = np.diff(sym) / sym[:-1]
        ref_ret = np.diff(ref) / ref[:-1]

        n = min(len(sym_ret), len(ref_ret))
        if n < 50:
            return None

        x = sym_ret[:n]
        y = ref_ret[:n]

        A = np.vstack([x, np.ones(n)]).T
        beta, alpha = np.linalg.lstsq(A, y, rcond=None)[0]
        return float(beta)
OrderTypeKeys = [
    'Market', 'Limit', 'StopMarket', 'StopLimit', 'MarketOnOpen',
    'MarketOnClose', 'OptionExercise',
]

OrderTypeCodes = dict(zip(range(len(OrderTypeKeys)), OrderTypeKeys))