Overall Statistics
Total Orders
210
Average Win
2.95%
Average Loss
-1.55%
Compounding Annual Return
18.180%
Drawdown
26.600%
Expectancy
0.677
Start Equity
10000
End Equity
28078.89
Net Profit
180.789%
Sharpe Ratio
0.676
Sortino Ratio
0.624
Probabilistic Sharpe Ratio
30.913%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
1.90
Alpha
0.092
Beta
0.139
Annual Standard Deviation
0.153
Annual Variance
0.023
Information Ratio
0.101
Tracking Error
0.21
Treynor Ratio
0.741
Total Fees
$250.21
Estimated Strategy Capacity
$5800000.00
Lowest Capacity Asset
SATS TYZ2C9FOCMED
Portfolio Turnover
2.23%
Drawdown Recovery
477
from AlgorithmImports import *
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from arch import arch_model
from sklearn.ensemble import RandomForestRegressor 
import warnings

warnings.filterwarnings("ignore")

class InstitutionalUniverseRotation(QCAlgorithm):

    def Initialize(self):
        """Configure the algorithm: universe, subscriptions, state, chart and schedules.

        Sets the start date and starting cash, registers the coarse/fine
        universe filters, subscribes to the benchmark pair, the defensive
        ETFs, the volatility indices and the FRED series, initialises the
        regime / profit-taking / volatility state read by the other methods,
        and registers model training, the daily metrics refresh and the
        monthly rebalance.
        """
        self.SetStartDate(2020, 1, 1)
        self.SetCash(10000)

        # 1. Universe selection: coarse filter first, then fine (fundamental) filter.
        #    The universe itself is subscribed at daily resolution.
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverseSelection(FineFundamentalUniverseSelectionModel(self.CoarseSelectionFunction, self.FineSelectionFunction))
        # Latest output of FineSelectionFunction; read by RebalanceUniverseRotation.
        self.active_universe_symbols = []

        # HARD GATES: Explicit Sector Blocklist to wipe out hyper-volatile crypto proxies
        # (matched against Symbol.Value in FineSelectionFunction).
        self.crypto_blocklist = {"MARA", "CLSK", "IREN", "CIFR", "COIN", "MSTR", "RIOT", "HUT"}

        # 2. Benchmark & Macro Core Elements
        #    self.spy / self.rsp keep the Security objects; other methods use `.Symbol`.
        #    Both are switched to raw (unadjusted) price normalisation.
        self.spy = self.AddEquity("SPY", Resolution.Minute)
        self.spy.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.rsp = self.AddEquity("RSP", Resolution.Minute) 
        self.rsp.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.SetBenchmark("SPY")

        # 3. Defensive Risk-Parity Vault Assets (only the Symbols are stored)
        self.shv = self.AddEquity("SHV", Resolution.Minute).Symbol   # High-Yield Cash proxy anchor
        self.pst = self.AddEquity("PST", Resolution.Minute).Symbol   # Inverse Bond rate hedge
        self.dbc = self.AddEquity("DBC", Resolution.Minute).Symbol   # Tangible Commodities structural hedge

        # 4. Macro data feeds: volatility indices are added with AddIndex,
        #    the three FRED series with AddData(Fred, ...). All are daily.
        self.vix3m = self.AddIndex("VIX3M", Resolution.Daily).Symbol
        self.vix = self.AddIndex("VIX", Resolution.Daily).Symbol
        self.vvix = self.AddIndex("VVIX", Resolution.Daily).Symbol
        
        # DGS1 feeds self.current_rf_rate (value / 100) in UpdateDailyMetrics.
        self.dgs1 = self.AddData(Fred, "DGS1", Resolution.Daily).Symbol 
        self.credit_spread = self.AddData(Fred, "BAMLH0A0HYM2", Resolution.Daily).Symbol 
        self.yield_curve = self.AddData(Fred, "T10Y2Y", Resolution.Daily).Symbol 

        # --- REGIME SWITCHING TIMELINES ---
        self.current_regime = None      # confirmed regime: "BULL", "BEAR", "SIDEWAYS" or None before the first reading
        self.pending_regime = None      # candidate regime awaiting confirmation
        self.consecutive_days = 0       # consecutive readings of the pending regime so far
        self.confirmation_days = 5      # readings required before the pending regime replaces the current one
        self.lookback_days = 1250       # daily bars requested for training; also the warm-up length

        # --- PROFIT-TAKING TRACERS ---
        # symbol -> bool: True once the first profit tranche has been taken (see OnData).
        self.halved_positions = {}

        # 5. Volatility Machine Learning Pricing Setup & GARCH Steering Variables
        self.ml_model = RandomForestRegressor(n_estimators=100, max_depth=7, random_state=42)
        self.is_model_trained = False       # set by TrainMLModel after a successful fit
        self.current_predicted_iv = None    # latest model prediction (UpdateDailyMetrics)
        self.current_rf_rate = 0.04         # default until a DGS1 value is available
        self.garch_vol_forecast = 0.15      # default until the first GARCH fit succeeds
        
        # Institutional Volatility Steering parameters initialized:
        # garch_multiplier = long_term_vol_mean / garch_vol_forecast, clamped in UpdateDailyMetrics.
        self.long_term_vol_mean = 0.15
        self.garch_multiplier = 1.0
        
        # 6. Performance Graph Settings
        #    NOTE(review): no Plot call for these series is visible in the reviewed code — confirm they are fed elsewhere.
        benchmark_chart = Chart("Strategy vs Buy & Hold")
        benchmark_chart.AddSeries(Series("Algorithm Equity", SeriesType.Line, 0))
        benchmark_chart.AddSeries(Series("SPY Buy & Hold", SeriesType.Line, 0))
        self.AddChart(benchmark_chart)
        
        # NOTE(review): neither attribute is read in the reviewed code; starting_cash repeats the SetCash literal above.
        self.initial_spy_price = None
        self.starting_cash = 10000 
        
        # 7. Warmup Data Pumps & Scheduled Tasks
        self.SetWarmup(self.lookback_days, Resolution.Daily)
        # TrainMLModel is registered twice: once with no schedule, once for every month start at 08:00.
        self.Train(self.TrainMLModel)
        self.Train(self.DateRules.MonthStart("SPY"), self.TimeRules.At(8, 0), self.TrainMLModel)
        # Daily metrics / regime refresh 5 minutes after the SPY open.
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 5), self.UpdateDailyMetrics)
        
        # Enforce Monthly Cadence Rebalancing Schedule (30 minutes after the open on the first trading day)
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), self.RebalanceUniverseRotation)

    def CoarseSelectionFunction(self, coarse):
        """Filters the entire US market down to the top 300 most liquid assets"""
        sorted_by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in sorted_by_dollar_volume[:300]]

    def FineSelectionFunction(self, fine):
        """Filters liquidity pool using fundamental constraints and removes blocked sectors"""
        nasdaq_share = [x for x in fine if x.CompanyReference.PrimaryExchangeID == "NAS" and x.SecurityReference.IsPrimaryShare]
        
        institutional_quality_pool = []
        for x in nasdaq_share:
            if x.FinancialStatements is None or x.FinancialStatements.IncomeStatement is None:
                continue
                
            # Drop the asset if it matches a blocked crypto ticker
            if x.Symbol.Value in self.crypto_blocklist:
                continue

            if hasattr(x, "AssetClassification") and x.AssetClassification is not None:
                if x.AssetClassification.MorningstarIndustryCode == 10303040: 
                    continue

            # Enforce baseline cap scale size criteria ($2B floor)
            if x.MarketCap >= 2000000000:
                institutional_quality_pool.append(x.Symbol)

        self.active_universe_symbols = institutional_quality_pool
        return self.active_universe_symbols

    def GetCleanHistory(self, symbol, days):
        try:
            hist = self.History(symbol, days, Resolution.Daily)
            if hist.empty: return pd.Series(dtype=float)
            close_data = hist['close'].unstack(level=0)[symbol] if 'close' in hist.columns else hist['value'].unstack(level=0)[symbol]
            close_data.index = pd.to_datetime(close_data.index).tz_localize(None).normalize()
            return close_data[~close_data.index.duplicated(keep='last')]
        except Exception: return pd.Series(dtype=float)

    def TrainMLModel(self):
        spy_close = self.GetCleanHistory(self.spy.Symbol, self.lookback_days)
        if spy_close.empty: return

        vix3m_close = self.GetCleanHistory(self.vix3m, self.lookback_days)
        vix_close = self.GetCleanHistory(self.vix, self.lookback_days)
        vvix_close = self.GetCleanHistory(self.vvix, self.lookback_days)
        credit_close = self.GetCleanHistory(self.credit_spread, self.lookback_days)
        yield_close = self.GetCleanHistory(self.yield_curve, self.lookback_days)
        
        if vix_close.empty: vix_close = pd.Series(20.0, index=spy_close.index)
        if vix3m_close.empty: vix3m_close = pd.Series(22.0, index=spy_close.index)
        if vvix_close.empty: vvix_close = pd.Series(90.0, index=spy_close.index)
        if credit_close.empty: credit_close = pd.Series(4.0, index=spy_close.index)
        if yield_close.empty: yield_close = pd.Series(1.0, index=spy_close.index)

        spy_returns = np.log(spy_close / spy_close.shift(1)).dropna()
        rolling_vol = spy_returns.rolling(window=180).std() * np.sqrt(252)

        df = pd.concat([rolling_vol, vix_close, vix3m_close, vvix_close, credit_close, yield_close], axis=1)
        df.columns = ['Feature_Vol', 'VIX', 'VIX3M', 'VVIX', 'Credit_Spread', 'Yield_Curve']
        df = df.ffill().dropna()
        if df.empty: return

        df['VIX_Ratio'] = df['VIX'] / df['VIX3M']
        df['Target_Vol'] = df['Feature_Vol'].shift(-180) 
        df = df.dropna()

        if len(df) < 190: return 
        features = ['Feature_Vol', 'VIX', 'VIX3M', 'VIX_Ratio', 'VVIX', 'Credit_Spread', 'Yield_Curve']
        self.ml_model.fit(df[features].values, df['Target_Vol'].values)
        self.is_model_trained = True

    def UpdateDailyMetrics(self):
        if self.IsWarmingUp or not self.is_model_trained: return
            
        history = self.History(self.spy.Symbol, 500, Resolution.Daily) 
        rsp_history = self.History(self.rsp.Symbol, 500, Resolution.Daily)

        if not history.empty and not rsp_history.empty and len(history) >= 252:
            spy_prices = history['close'].unstack(level=0)[self.spy.Symbol].dropna()
            rsp_prices = rsp_history['close'].unstack(level=0)[self.rsp.Symbol].dropna()
            
            common_idx = spy_prices.index.intersection(rsp_prices.index)
            spy_prices = spy_prices.loc[common_idx]
            rsp_prices = rsp_prices.loc[common_idx]

            spy_log_returns = np.log(spy_prices / spy_prices.shift(1)).dropna()
            
            current_vol = spy_log_returns[-252:].std() * np.sqrt(252)
            c_vix3m = self.Securities[self.vix3m].Price if self.Securities.ContainsKey(self.vix3m) and self.Securities[self.vix3m].Price > 0 else 22.0
            c_vix = self.Securities[self.vix].Price if self.Securities.ContainsKey(self.vix) and self.Securities[self.vix].Price > 0 else 20.0
            c_vvix = self.Securities[self.vvix].Price if self.Securities.ContainsKey(self.vvix) and self.Securities[self.vvix].Price > 0 else 90.0
            c_credit = self.Securities[self.credit_spread].Price if self.Securities.ContainsKey(self.credit_spread) and self.Securities[self.credit_spread].Price > 0 else 4.0
            c_yield = self.Securities[self.yield_curve].Price if self.Securities.ContainsKey(self.yield_curve) and self.Securities[self.yield_curve].Price != 0 else 1.0
            c_vix_ratio = c_vix / c_vix3m if c_vix3m > 0 else 1.0

            if c_vix3m > 0 and c_vix > 0:
                current_features = [[current_vol, c_vix, c_vix3m, c_vix_ratio, c_vvix, c_credit, c_yield]]
                self.current_predicted_iv = self.ml_model.predict(current_features)[0]
                
            if self.Securities.ContainsKey(self.dgs1) and self.Securities[self.dgs1].Price > 0:
                self.current_rf_rate = self.Securities[self.dgs1].Price / 100.0

            try:
                garch_input = spy_log_returns[-252:].values * 100.0
                am = arch_model(garch_input, vol='Garch', p=1, q=1, dist='normal')
                res = am.fit(disp='off')
                self.garch_vol_forecast = (np.sqrt(res.forecast(horizon=1).variance.iloc[-1, 0]) / 100.0) * np.sqrt(252)
                
                if self.garch_vol_forecast > 0:
                    self.garch_multiplier = self.long_term_vol_mean / self.garch_vol_forecast
                    self.garch_multiplier = max(min(self.garch_multiplier, 1.75), 0.55)
            except Exception: pass

            # --- 2D SPECIFIED BREADTH HMM MATRIX GENERATION ---
            breadth_ratio = rsp_prices / spy_prices
            breadth_log_returns = np.log(breadth_ratio / breadth_ratio.shift(1)).dropna()
            
            hmm_dataframe = pd.concat([spy_log_returns * 100.0, breadth_log_returns * 100.0], axis=1).dropna()
            hmm_dataframe.columns = ['Log_Returns', 'Market_Breadth']
            
            mean_returns, std_returns = hmm_dataframe['Log_Returns'].mean(), hmm_dataframe['Log_Returns'].std()
            mean_breadth, std_breadth = hmm_dataframe['Market_Breadth'].mean(), hmm_dataframe['Market_Breadth'].std()
            
            if std_returns > 0 and std_breadth > 0:
                hmm_dataframe['Scaled_Returns'] = (hmm_dataframe['Log_Returns'] - mean_returns) / std_returns
                hmm_dataframe['Scaled_Breadth'] = (hmm_dataframe['Market_Breadth'] - mean_breadth) / std_breadth
                hmm_matrix = hmm_dataframe[['Scaled_Returns', 'Scaled_Breadth']].values

                try:
                    hmm = GaussianHMM(n_components=3, covariance_type="full", n_iter=100, random_state=42)
                    hmm.fit(hmm_matrix)
                    
                    if np.allclose(hmm.transmat_.sum(axis=1), 1.0):
                        state_profiles = {}
                        for i in range(3):
                            unscaled_mean_return = hmm.means_[i][0] * std_returns + mean_returns
                            unscaled_mean_breadth = hmm.means_[i][1] * std_breadth + mean_breadth
                            state_profiles[i] = {'return': unscaled_mean_return, 'breadth': unscaled_mean_breadth}
                        
                        bear_idx = min(state_profiles, key=lambda k: state_profiles[k]['return'])
                        remaining_indices = [i for i in range(3) if i != bear_idx]
                        
                        if state_profiles[remaining_indices[0]]['return'] > state_profiles[remaining_indices[1]]['return']:
                            bull_idx, sideways_idx = remaining_indices[0], remaining_indices[1]
                        else:
                            bull_idx, sideways_idx = remaining_indices[1], remaining_indices[0]
                        
                        predicted_state_idx = hmm.predict(hmm_matrix)[-1]
                        
                        if predicted_state_idx == bull_idx: raw_regime = "BULL"
                        elif predicted_state_idx == bear_idx: raw_regime = "BEAR"
                        else: raw_regime = "SIDEWAYS"
                        
                        self.CurrentRegimeLog = raw_regime

                        if self.current_regime is None:
                            self.current_regime = raw_regime
                            self.ExecuteRegimeChange()
                        elif raw_regime == self.current_regime:
                            self.consecutive_days = 0 
                            self.pending_regime = None
                        else:
                            if raw_regime == self.pending_regime:
                                self.consecutive_days += 1
                                if self.consecutive_days >= self.confirmation_days:
                                    self.current_regime = raw_regime
                                    self.ExecuteRegimeChange()
                                    self.consecutive_days, self.pending_regime = 0, None
                            else:
                                self.pending_regime, self.consecutive_days = raw_regime, 1
                except Exception as e:
                    pass

    def ExecuteRegimeChange(self):
        if self.IsWarmingUp: return
        if self.current_regime in ["BEAR", "SIDEWAYS"]:
            self.Liquidate() 
            self.halved_positions.clear()

    def RebalanceUniverseRotation(self):
        """Scans, scores, and rebalances the Top 5 Nasdaq momentum leaders on Month Start"""
        if self.IsWarmingUp or self.current_regime != "BULL": return

        momentum_scores = []
        valid_histories = {}
        
        for symbol in self.active_universe_symbols:
            if not self.Securities.ContainsKey(symbol): continue
            if self.CurrentSlice is None or not self.CurrentSlice.Bars.ContainsKey(symbol) or self.CurrentSlice[symbol] is None: continue
            
            history = self.History(symbol, 200, Resolution.Daily)
            if history.empty or len(history) < 189: continue
            
            prices = history['close'].unstack(level=0)[symbol]
            valid_histories[symbol] = prices
            
            ret_3m = (prices.iloc[-1] - prices.iloc[-63]) / prices.iloc[-63]
            ret_6m = (prices.iloc[-1] - prices.iloc[-126]) / prices.iloc[-126]
            ret_9m = (prices.iloc[-1] - prices.iloc[-189]) / prices.iloc[-189]
            
            blended_score = (ret_3m + ret_6m + ret_9m) / 3.0
            momentum_scores.append((symbol, blended_score))

        if len(momentum_scores) == 0: return

        momentum_scores.sort(key=lambda x: x[1], reverse=True)
        top_5_leaders = [item[0] for item in momentum_scores[:5]]

        # Liquidate assets dropping out of the leadership tier completely
        for symbol, holding in self.Portfolio.items():
            if holding.Invested and symbol not in top_5_leaders and symbol.SecurityType == SecurityType.Equity:
                self.Liquidate(symbol, "Dropped From Top 5 Monthly Momentum Rankings")
                self.halved_positions.pop(symbol, None)

        # Hard Double Price Bar Security Filter Guard 
        valid_leaders = []
        for symbol in top_5_leaders:
            if self.CurrentSlice.Bars.ContainsKey(symbol):
                bar = self.CurrentSlice.Bars[symbol]
                if bar is not None and bar.Close > 0:
                    valid_leaders.append(symbol)

        # Hard Cap Inverse Volatility Risk-Parity Allocation Matrix
        if len(valid_leaders) > 0:
            inverse_vols = []
            for symbol in valid_leaders:
                prices = valid_histories[symbol]
                daily_returns = np.log(prices / prices.shift(1)).dropna()[-21:]
                vol = daily_returns.std()
                if vol <= 0: vol = 0.03 
                inverse_vols.append(1.0 / vol)
            
            total_inverse_vol = sum(inverse_vols)
            raw_weights = [(iv / total_inverse_vol) * 0.95 for iv in inverse_vols]
            
            # Enforce 25% single-stock ceiling to block concentration risk
            capped_weights = [min(w, 0.25) for w in raw_weights]
            leftover_cash = 0.95 - sum(capped_weights)
            
            # Redistribute residual footprint safely among below-cap leaders
            uncapped_count = sum(1 for w in capped_weights if w < 0.25)
            if uncapped_count > 0:
                redistribution = leftover_cash / float(uncapped_count)
                for i in range(len(capped_weights)):
                    if capped_weights[i] < 0.25:
                        capped_weights[i] = min(capped_weights[i] + redistribution, 0.25)

            for i, symbol in enumerate(valid_leaders):
                # Dynamically register asset at minute-resolution for real-time tracking
                self.AddEquity(symbol, Resolution.Minute)
                
                self.SetHoldings(symbol, capped_weights[i])
                if symbol not in self.halved_positions:
                    self.halved_positions[symbol] = False

    def OnData(self, slice):
        if self.IsWarmingUp: return

        # =====================================================================
        # 1. THE DEFENSIVE TRACK: Execute Symmetrical Risk-Parity Allocation
        # =====================================================================
        if self.current_regime == "BEAR":
            if not self.Portfolio[self.shv].Invested and not self.Portfolio[self.pst].Invested and len(self.Transactions.GetOpenOrders()) == 0:
                self.SetHoldings(self.shv, 0.57) 
                self.SetHoldings(self.pst, 0.38) 
            return

        elif self.current_regime == "SIDEWAYS":
            current_vix = self.Securities[self.vix].Price if self.Securities.ContainsKey(self.vix) else 20.0
            current_curve = self.Securities[self.yield_curve].Price if self.Securities.ContainsKey(self.yield_curve) else 0.50
            
            if current_curve <= 0.355:
                if current_curve <= -0.735:
                    if not self.Portfolio[self.dbc].Invested and len(self.Transactions.GetOpenOrders()) == 0:
                        self.SetHoldings(self.dbc, 0.475)
                        self.SetHoldings(self.shv, 0.475)
                else:
                    if not self.Portfolio[self.shv].Invested and len(self.Transactions.GetOpenOrders()) == 0:
                        self.SetHoldings(self.shv, 0.95)
            else:
                if current_vix <= 40.815:
                    if not self.Portfolio[self.dbc].Invested and len(self.Transactions.GetOpenOrders()) == 0:
                        self.SetHoldings(self.dbc, 0.475)
                        self.SetHoldings(self.shv, 0.475)
                else:
                    if not self.Portfolio[self.shv].Invested and len(self.Transactions.GetOpenOrders()) == 0:
                        self.SetHoldings(self.shv, 0.95)
            return

        elif self.current_regime != "BULL":
            return

        # =====================================================================
        # 2. THE OFFENSIVE TRACK: Symmetrical Volatility-Targeted Execution Layer
        # =====================================================================
        for symbol in list(self.halved_positions.keys()):
            if not self.Portfolio.ContainsKey(symbol): continue
            holding = self.Portfolio[symbol]
            if not holding.Invested: continue

            if not slice.Bars.ContainsKey(symbol) or slice[symbol] is None: continue
            current_price = slice[symbol].Close
            avg_entry_price = holding.AveragePrice

            if avg_entry_price <= 0 or current_price <= 0: continue

            # Fetch historical daily bars to extract clean statistical range vectors
            history = self.History(symbol, 20, Resolution.Daily)
            if history.empty or 'high' not in history.columns: continue
            
            highs = history['high'].unstack(level=0)[symbol]
            lows = history['low'].unstack(level=0)[symbol]
            closes = history['close'].unstack(level=0)[symbol]

            # Trailing 14-day statistical ATR boundary window
            tr1 = highs - lows
            tr2 = (highs - closes.shift(1)).abs()
            tr3 = (lows - closes.shift(1)).abs()
            true_range = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
            atr = true_range.rolling(14).mean().iloc[-1]
            
            if atr <= 0: continue

            # Symmetrical Volatility Target Array Configuration
            tranche_1_target = avg_entry_price + (2.5 * atr * self.garch_multiplier)
            tranche_2_target = avg_entry_price + (5.5 * atr * self.garch_multiplier)
            
            # UPGRADE IMPLEMENTED: Dynamic Volatility Stop-Loss anchor limits downside risk factor
            stop_loss_floor = avg_entry_price - (1.5 * atr)

            # --- HARD STOP RISK DISPATCH ENGINE ---
            if current_price <= stop_loss_floor:
                self.Liquidate(symbol, f"CRITICAL RISK ACTION: Volatility stop boundaries violated at {current_price}. Exiting position.")
                self.halved_positions.pop(symbol, None)
                continue

            # --- DYNAMIC HARVESTING ENGINE ---
            # TRANCHE 1 EXIT: Scale out 50% of position if price breaches the lower ATR boundary
            if current_price >= tranche_1_target and not self.halved_positions[symbol]:
                current_quantity = holding.Quantity
                sell_quantity = int(current_quantity / 2)
                
                if sell_quantity > 0:
                    self.MarketOrder(symbol, -sell_quantity, False, f"GARCH STEERED GATE 1 HIT: Volatility target hit at price {current_price}")
                    self.halved_positions[symbol] = True
                    continue

            # TRANCHE 2 EXIT: Liquidate remaining runner shares if price breaches upper ATR boundary
            if current_price >= tranche_2_target and self.halved_positions[symbol]:
                self.Liquidate(symbol, f"GARCH STEERED GATE 2 HIT: Max volatility target hit at price {current_price}")
                self.halved_positions.pop(symbol, None)