| Overall Statistics | Value |
| --- | --- |
| Total Orders | 210 |
| Average Win | 2.95% |
| Average Loss | -1.55% |
| Compounding Annual Return | 18.180% |
| Drawdown | 26.600% |
| Expectancy | 0.677 |
| Start Equity | 10000 |
| End Equity | 28078.89 |
| Net Profit | 180.789% |
| Sharpe Ratio | 0.676 |
| Sortino Ratio | 0.624 |
| Probabilistic Sharpe Ratio | 30.913% |
| Loss Rate | 42% |
| Win Rate | 58% |
| Profit-Loss Ratio | 1.90 |
| Alpha | 0.092 |
| Beta | 0.139 |
| Annual Standard Deviation | 0.153 |
| Annual Variance | 0.023 |
| Information Ratio | 0.101 |
| Tracking Error | 0.21 |
| Treynor Ratio | 0.741 |
| Total Fees | $250.21 |
| Estimated Strategy Capacity | $5800000.00 |
| Lowest Capacity Asset | SATS TYZ2C9FOCMED |
| Portfolio Turnover | 2.23% |
| Drawdown Recovery | 477 |
from AlgorithmImports import *
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from arch import arch_model
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings("ignore")
class InstitutionalUniverseRotation(QCAlgorithm):
    """Regime-gated Nasdaq momentum rotation with a defensive ETF sleeve.

    Each trading day a 3-state Gaussian HMM is fitted on standardised SPY
    log returns and RSP/SPY breadth log returns; the most recent decoded
    state is labelled BULL, SIDEWAYS or BEAR by ranking the state mean
    returns. A regime switch is only accepted after ``confirmation_days``
    consecutive identical signals.

    * BULL: on month start the five highest blended 3/6/9-month momentum
      names from the fine universe are bought with capped inverse-volatility
      weights; ``OnData`` then manages an ATR stop and two ATR profit targets
      (the targets are scaled by a GARCH-derived multiplier).
    * BEAR / SIDEWAYS: everything is liquidated and ``OnData`` allocates to
      SHV, PST and/or DBC.
    """

    # Fraction of portfolio value deployed across the momentum leaders.
    GROSS_EXPOSURE = 0.95
    # Ceiling applied to any single leader's weight.
    MAX_POSITION_WEIGHT = 0.25
    # Daily volatility assumed when a leader's realised volatility is unusable.
    FALLBACK_DAILY_VOL = 0.03

    def Initialize(self):
        """Configure dates, cash, universe, data feeds, state and schedules."""
        self.starting_cash = 10000
        self.SetStartDate(2020, 1, 1)
        self.SetCash(self.starting_cash)

        # 1. Hybrid universe selection (coarse liquidity -> fine fundamentals).
        self.UniverseSettings.Resolution = Resolution.Daily
        self.AddUniverseSelection(
            FineFundamentalUniverseSelectionModel(
                self.CoarseSelectionFunction, self.FineSelectionFunction
            )
        )
        self.active_universe_symbols = []
        # Explicit ticker blocklist that removes crypto-proxy equities.
        self.crypto_blocklist = {"MARA", "CLSK", "IREN", "CIFR", "COIN", "MSTR", "RIOT", "HUT"}

        # 2. Benchmark and breadth inputs.
        self.spy = self.AddEquity("SPY", Resolution.Minute)
        self.spy.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.rsp = self.AddEquity("RSP", Resolution.Minute)
        self.rsp.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.SetBenchmark("SPY")

        # 3. Defensive sleeve assets used in BEAR / SIDEWAYS regimes.
        self.shv = self.AddEquity("SHV", Resolution.Minute).Symbol
        self.pst = self.AddEquity("PST", Resolution.Minute).Symbol
        self.dbc = self.AddEquity("DBC", Resolution.Minute).Symbol

        # 4. Macro feeds: volatility indices via AddIndex, rates and spreads
        #    via the Fred custom data type.
        self.vix3m = self.AddIndex("VIX3M", Resolution.Daily).Symbol
        self.vix = self.AddIndex("VIX", Resolution.Daily).Symbol
        self.vvix = self.AddIndex("VVIX", Resolution.Daily).Symbol
        self.dgs1 = self.AddData(Fred, "DGS1", Resolution.Daily).Symbol
        self.credit_spread = self.AddData(Fred, "BAMLH0A0HYM2", Resolution.Daily).Symbol
        self.yield_curve = self.AddData(Fred, "T10Y2Y", Resolution.Daily).Symbol

        # --- Regime switching state ---
        self.current_regime = None
        self.pending_regime = None
        self.consecutive_days = 0
        self.confirmation_days = 5
        self.lookback_days = 1250
        # Last raw (unconfirmed) HMM label; initialised so readers never hit
        # a missing attribute before the first successful fit.
        self.CurrentRegimeLog = None

        # --- Profit-taking state: symbol -> True once tranche 1 was sold ---
        self.halved_positions = {}

        # 5. Volatility model state.
        self.ml_model = RandomForestRegressor(n_estimators=100, max_depth=7, random_state=42)
        self.is_model_trained = False
        self.current_predicted_iv = None
        self.current_rf_rate = 0.04
        self.garch_vol_forecast = 0.15
        self.long_term_vol_mean = 0.15
        self.garch_multiplier = 1.0
        # Window (trading days) of the realised-vol feature. Shared by
        # training and inference so the model sees the same feature in both.
        self.vol_feature_window = 180
        # How far ahead (trading days) the training target is shifted.
        self.vol_target_horizon = 180

        # 6. Chart definition.
        benchmark_chart = Chart("Strategy vs Buy & Hold")
        benchmark_chart.AddSeries(Series("Algorithm Equity", SeriesType.Line, 0))
        benchmark_chart.AddSeries(Series("SPY Buy & Hold", SeriesType.Line, 0))
        self.AddChart(benchmark_chart)
        self.initial_spy_price = None

        # 7. Warm-up, model training and scheduled tasks.
        self.SetWarmup(self.lookback_days, Resolution.Daily)
        self.Train(self.TrainMLModel)
        self.Train(self.DateRules.MonthStart("SPY"), self.TimeRules.At(8, 0), self.TrainMLModel)
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 5),
            self.UpdateDailyMetrics,
        )
        # Monthly rebalance of the momentum leaders.
        self.Schedule.On(
            self.DateRules.MonthStart("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 30),
            self.RebalanceUniverseRotation,
        )

    def CoarseSelectionFunction(self, coarse):
        """Return the symbols of the 300 highest dollar-volume coarse entries."""
        sorted_by_dollar_volume = sorted(coarse, key=lambda x: x.DollarVolume, reverse=True)
        return [x.Symbol for x in sorted_by_dollar_volume[:300]]

    def FineSelectionFunction(self, fine):
        """Keep Nasdaq primary shares with statements, >= $2B cap, not blocked.

        The result is also stored in ``active_universe_symbols`` for the
        monthly rebalance.
        """
        institutional_quality_pool = []
        for x in fine:
            if x.CompanyReference.PrimaryExchangeID != "NAS" or not x.SecurityReference.IsPrimaryShare:
                continue
            if x.FinancialStatements is None or x.FinancialStatements.IncomeStatement is None:
                continue
            # Drop the asset if it matches a blocked crypto ticker.
            if x.Symbol.Value in self.crypto_blocklist:
                continue
            # Drop the one Morningstar industry code that is blocked outright.
            classification = getattr(x, "AssetClassification", None)
            if classification is not None and classification.MorningstarIndustryCode == 10303040:
                continue
            # Enforce the $2B market-cap floor.
            if x.MarketCap >= 2000000000:
                institutional_quality_pool.append(x.Symbol)
        self.active_universe_symbols = institutional_quality_pool
        return self.active_universe_symbols

    def GetCleanHistory(self, symbol, days):
        """Return a daily close/value Series for ``symbol``, de-duplicated by date.

        The index is converted to naive, normalised dates. Any failure
        (including missing data) yields an empty float Series; the failure is
        logged instead of being silently discarded.
        """
        try:
            hist = self.History(symbol, days, Resolution.Daily)
            if hist.empty:
                return pd.Series(dtype=float)
            # Equities/indices expose 'close'; otherwise fall back to 'value'.
            column = 'close' if 'close' in hist.columns else 'value'
            close_data = hist[column].unstack(level=0)[symbol]
            close_data.index = pd.to_datetime(close_data.index).tz_localize(None).normalize()
            return close_data[~close_data.index.duplicated(keep='last')]
        except Exception as exc:
            self.Debug(f"GetCleanHistory failed for {symbol}: {exc}")
            return pd.Series(dtype=float)

    def TrainMLModel(self):
        """Fit the random forest that maps current vol/macro features to future vol.

        The target is the realised-vol feature shifted ``vol_target_horizon``
        rows into the future. Missing macro feeds are replaced by constant
        placeholder series. Training is skipped when fewer than 190 usable
        rows remain.
        """
        spy_close = self.GetCleanHistory(self.spy.Symbol, self.lookback_days)
        if spy_close.empty:
            return

        # (column name, symbol, placeholder used when the feed has no history)
        macro_feeds = [
            ('VIX', self.vix, 20.0),
            ('VIX3M', self.vix3m, 22.0),
            ('VVIX', self.vvix, 90.0),
            ('Credit_Spread', self.credit_spread, 4.0),
            ('Yield_Curve', self.yield_curve, 1.0),
        ]
        spy_returns = np.log(spy_close / spy_close.shift(1)).dropna()
        rolling_vol = spy_returns.rolling(window=self.vol_feature_window).std() * np.sqrt(252)

        columns = [rolling_vol]
        names = ['Feature_Vol']
        for name, symbol, placeholder in macro_feeds:
            series = self.GetCleanHistory(symbol, self.lookback_days)
            if series.empty:
                series = pd.Series(placeholder, index=spy_close.index)
            columns.append(series)
            names.append(name)

        df = pd.concat(columns, axis=1)
        df.columns = names
        df = df.ffill().dropna()
        if df.empty:
            return
        df['VIX_Ratio'] = df['VIX'] / df['VIX3M']
        df['Target_Vol'] = df['Feature_Vol'].shift(-self.vol_target_horizon)
        df = df.dropna()
        if len(df) < 190:
            return
        features = ['Feature_Vol', 'VIX', 'VIX3M', 'VIX_Ratio', 'VVIX', 'Credit_Spread', 'Yield_Curve']
        self.ml_model.fit(df[features].values, df['Target_Vol'].values)
        self.is_model_trained = True

    def UpdateDailyMetrics(self):
        """Refresh the vol prediction, risk-free rate, GARCH multiplier and regime.

        Does nothing during warm-up, before the ML model has been trained, or
        when fewer than 252 SPY history rows are available.
        """
        if self.IsWarmingUp or not self.is_model_trained:
            return
        history = self.History(self.spy.Symbol, 500, Resolution.Daily)
        rsp_history = self.History(self.rsp.Symbol, 500, Resolution.Daily)
        if history.empty or rsp_history.empty or len(history) < 252:
            return

        spy_prices = history['close'].unstack(level=0)[self.spy.Symbol].dropna()
        rsp_prices = rsp_history['close'].unstack(level=0)[self.rsp.Symbol].dropna()
        common_idx = spy_prices.index.intersection(rsp_prices.index)
        spy_prices = spy_prices.loc[common_idx]
        rsp_prices = rsp_prices.loc[common_idx]
        spy_log_returns = np.log(spy_prices / spy_prices.shift(1)).dropna()

        self._refresh_iv_prediction(spy_log_returns)
        if self.Securities.ContainsKey(self.dgs1) and self.Securities[self.dgs1].Price > 0:
            # DGS1 is quoted in percent; store it as a fraction.
            self.current_rf_rate = self.Securities[self.dgs1].Price / 100.0
        self._refresh_garch_multiplier(spy_log_returns)

        hmm_inputs = self._build_hmm_inputs(spy_log_returns, spy_prices, rsp_prices)
        if hmm_inputs is None:
            return
        # Best-effort: a failed fit or order must not stop the algorithm,
        # but the failure is reported rather than swallowed.
        try:
            raw_regime = self._classify_regime(*hmm_inputs)
            if raw_regime is not None:
                self.CurrentRegimeLog = raw_regime
                self._confirm_regime(raw_regime)
        except Exception as exc:
            self.Debug(f"HMM regime update failed: {exc}")

    def _latest_price(self, symbol, default, allow_negative=False):
        """Return the security's current price, or ``default`` when unusable.

        A price is unusable when the security is not registered, or when it
        is zero (or, unless ``allow_negative`` is set, not strictly positive).
        """
        if not self.Securities.ContainsKey(symbol):
            return default
        price = self.Securities[symbol].Price
        usable = price != 0 if allow_negative else price > 0
        return price if usable else default

    def _refresh_iv_prediction(self, spy_log_returns):
        """Predict future volatility from the current feature vector."""
        # Same window as the 'Feature_Vol' training column, so the model is
        # queried with the feature it was fitted on.
        current_vol = spy_log_returns.iloc[-self.vol_feature_window:].std() * np.sqrt(252)
        c_vix3m = self._latest_price(self.vix3m, 22.0)
        c_vix = self._latest_price(self.vix, 20.0)
        c_vvix = self._latest_price(self.vvix, 90.0)
        c_credit = self._latest_price(self.credit_spread, 4.0)
        # The curve spread may legitimately be negative; only zero is treated
        # as "no data".
        c_yield = self._latest_price(self.yield_curve, 1.0, allow_negative=True)
        c_vix_ratio = c_vix / c_vix3m if c_vix3m > 0 else 1.0
        if c_vix3m > 0 and c_vix > 0:
            current_features = [[current_vol, c_vix, c_vix3m, c_vix_ratio, c_vvix, c_credit, c_yield]]
            self.current_predicted_iv = self.ml_model.predict(current_features)[0]

    def _refresh_garch_multiplier(self, spy_log_returns):
        """Fit GARCH(1,1) on the last 252 returns and update the target multiplier.

        The multiplier is ``long_term_vol_mean / forecast`` clamped to
        [0.55, 1.75]. On failure the previous values are kept.
        """
        try:
            # Returns are scaled to percent before fitting and scaled back after.
            garch_input = spy_log_returns.iloc[-252:].values * 100.0
            am = arch_model(garch_input, vol='Garch', p=1, q=1, dist='normal')
            res = am.fit(disp='off')
            one_step_variance = res.forecast(horizon=1).variance.iloc[-1, 0]
            self.garch_vol_forecast = (np.sqrt(one_step_variance) / 100.0) * np.sqrt(252)
            if self.garch_vol_forecast > 0:
                ratio = self.long_term_vol_mean / self.garch_vol_forecast
                self.garch_multiplier = max(min(ratio, 1.75), 0.55)
        except Exception as exc:
            self.Debug(f"GARCH update failed: {exc}")

    def _build_hmm_inputs(self, spy_log_returns, spy_prices, rsp_prices):
        """Build the standardised (returns, breadth) matrix for the HMM.

        Returns ``(matrix, mean_returns, std_returns)`` or ``None`` when
        either column has no dispersion.
        """
        breadth_ratio = rsp_prices / spy_prices
        breadth_log_returns = np.log(breadth_ratio / breadth_ratio.shift(1)).dropna()
        hmm_dataframe = pd.concat(
            [spy_log_returns * 100.0, breadth_log_returns * 100.0], axis=1
        ).dropna()
        hmm_dataframe.columns = ['Log_Returns', 'Market_Breadth']
        mean_returns = hmm_dataframe['Log_Returns'].mean()
        std_returns = hmm_dataframe['Log_Returns'].std()
        mean_breadth = hmm_dataframe['Market_Breadth'].mean()
        std_breadth = hmm_dataframe['Market_Breadth'].std()
        if not (std_returns > 0 and std_breadth > 0):
            return None
        hmm_dataframe['Scaled_Returns'] = (hmm_dataframe['Log_Returns'] - mean_returns) / std_returns
        hmm_dataframe['Scaled_Breadth'] = (hmm_dataframe['Market_Breadth'] - mean_breadth) / std_breadth
        hmm_matrix = hmm_dataframe[['Scaled_Returns', 'Scaled_Breadth']].values
        return hmm_matrix, mean_returns, std_returns

    def _classify_regime(self, hmm_matrix, mean_returns, std_returns):
        """Fit the 3-state HMM and label the latest decoded state.

        The state with the lowest mean return is BEAR; of the other two, the
        one with the strictly higher mean return is BULL and the remaining
        one SIDEWAYS. Returns ``None`` when the fitted transition matrix rows
        do not sum to one.
        """
        hmm = GaussianHMM(n_components=3, covariance_type="full", n_iter=100, random_state=42)
        hmm.fit(hmm_matrix)
        if not np.allclose(hmm.transmat_.sum(axis=1), 1.0):
            return None
        # Undo the standardisation so states are ranked on actual returns.
        state_returns = {i: hmm.means_[i][0] * std_returns + mean_returns for i in range(3)}
        bear_idx = min(state_returns, key=lambda k: state_returns[k])
        first, second = [i for i in range(3) if i != bear_idx]
        bull_idx = first if state_returns[first] > state_returns[second] else second

        predicted_state_idx = hmm.predict(hmm_matrix)[-1]
        if predicted_state_idx == bull_idx:
            return "BULL"
        if predicted_state_idx == bear_idx:
            return "BEAR"
        return "SIDEWAYS"

    def _confirm_regime(self, raw_regime):
        """Apply the consecutive-day confirmation filter to a raw signal."""
        if self.current_regime is None:
            # First signal ever: adopt it immediately.
            self.current_regime = raw_regime
            self.ExecuteRegimeChange()
            return
        if raw_regime == self.current_regime:
            self.consecutive_days = 0
            self.pending_regime = None
            return
        if raw_regime != self.pending_regime:
            # A new candidate restarts the count.
            self.pending_regime, self.consecutive_days = raw_regime, 1
            return
        self.consecutive_days += 1
        if self.consecutive_days >= self.confirmation_days:
            self.current_regime = raw_regime
            self.ExecuteRegimeChange()
            self.consecutive_days, self.pending_regime = 0, None

    def ExecuteRegimeChange(self):
        """Flatten the book when the confirmed regime is BEAR or SIDEWAYS."""
        if self.IsWarmingUp:
            return
        if self.current_regime in ["BEAR", "SIDEWAYS"]:
            self.Liquidate()
            self.halved_positions.clear()

    def RebalanceUniverseRotation(self):
        """Rank the universe by blended momentum and rebalance into the top 5.

        Runs only outside warm-up and in the BULL regime. Invested equities
        outside the top five are liquidated; leaders with a valid current bar
        receive capped inverse-volatility weights.
        """
        if self.IsWarmingUp or self.current_regime != "BULL":
            return

        momentum_scores = []
        valid_histories = {}
        for symbol in self.active_universe_symbols:
            if not self.Securities.ContainsKey(symbol):
                continue
            if (self.CurrentSlice is None
                    or not self.CurrentSlice.Bars.ContainsKey(symbol)
                    or self.CurrentSlice[symbol] is None):
                continue
            history = self.History(symbol, 200, Resolution.Daily)
            if history.empty or len(history) < 189:
                continue
            prices = history['close'].unstack(level=0)[symbol]
            last = prices.iloc[-1]
            # Simple returns over ~3, 6 and 9 months of trading days.
            lookback_returns = [
                (last - prices.iloc[-offset]) / prices.iloc[-offset]
                for offset in (63, 126, 189)
            ]
            blended_score = sum(lookback_returns) / 3.0
            # A NaN/inf score would make the ranking below meaningless.
            if not np.isfinite(blended_score):
                continue
            valid_histories[symbol] = prices
            momentum_scores.append((symbol, blended_score))

        if not momentum_scores:
            return
        momentum_scores.sort(key=lambda x: x[1], reverse=True)
        top_5_leaders = [item[0] for item in momentum_scores[:5]]

        # Liquidate equities that dropped out of the leadership tier.
        for symbol, holding in self.Portfolio.items():
            if holding.Invested and symbol not in top_5_leaders and symbol.SecurityType == SecurityType.Equity:
                self.Liquidate(symbol, "Dropped From Top 5 Monthly Momentum Rankings")
                self.halved_positions.pop(symbol, None)

        # Only trade leaders that have a positive-close bar in the current slice.
        valid_leaders = []
        for symbol in top_5_leaders:
            if not self.CurrentSlice.Bars.ContainsKey(symbol):
                continue
            bar = self.CurrentSlice.Bars[symbol]
            if bar is not None and bar.Close > 0:
                valid_leaders.append(symbol)
        if not valid_leaders:
            return

        weights = self._capped_inverse_vol_weights([valid_histories[s] for s in valid_leaders])
        for symbol, weight in zip(valid_leaders, weights):
            # Register the asset at minute resolution so OnData receives its bars.
            self.AddEquity(symbol, Resolution.Minute)
            self.SetHoldings(symbol, weight)
            # Keep an existing flag; new entries start un-halved.
            self.halved_positions.setdefault(symbol, False)

    def _capped_inverse_vol_weights(self, price_series):
        """Return inverse-volatility weights capped per position.

        Volatility is the standard deviation of the last 21 daily log
        returns. Weights sum to at most ``GROSS_EXPOSURE``; weight removed by
        the cap is shared equally (in one pass) among positions below the cap.
        """
        cap = self.MAX_POSITION_WEIGHT
        inverse_vols = []
        for prices in price_series:
            daily_returns = np.log(prices / prices.shift(1)).dropna().iloc[-21:]
            vol = daily_returns.std()
            # NaN compares False against 0, so it must be tested explicitly;
            # otherwise one bad series would turn every weight into NaN.
            if not np.isfinite(vol) or vol <= 0:
                vol = self.FALLBACK_DAILY_VOL
            inverse_vols.append(1.0 / vol)

        total_inverse_vol = sum(inverse_vols)
        raw_weights = [(iv / total_inverse_vol) * self.GROSS_EXPOSURE for iv in inverse_vols]
        capped_weights = [min(w, cap) for w in raw_weights]
        leftover = self.GROSS_EXPOSURE - sum(capped_weights)
        uncapped_count = sum(1 for w in capped_weights if w < cap)
        if uncapped_count > 0:
            redistribution = leftover / float(uncapped_count)
            capped_weights = [
                min(w + redistribution, cap) if w < cap else w
                for w in capped_weights
            ]
        return capped_weights

    def OnData(self, slice):
        """Maintain the defensive sleeve or manage exits on momentum positions."""
        if self.IsWarmingUp:
            return

        # =================================================================
        # 1. DEFENSIVE TRACK
        # =================================================================
        if self.current_regime == "BEAR":
            if (not self.Portfolio[self.shv].Invested
                    and not self.Portfolio[self.pst].Invested
                    and len(self.Transactions.GetOpenOrders()) == 0):
                self.SetHoldings(self.shv, 0.57)
                self.SetHoldings(self.pst, 0.38)
            return
        if self.current_regime == "SIDEWAYS":
            self._allocate_sideways()
            return
        if self.current_regime != "BULL":
            return

        # =================================================================
        # 2. OFFENSIVE TRACK: ATR stop and two ATR profit targets
        # =================================================================
        # NOTE(review): a daily History request is issued per tracked
        # position on every OnData call.
        for symbol in list(self.halved_positions.keys()):
            if not self.Portfolio.ContainsKey(symbol):
                continue
            holding = self.Portfolio[symbol]
            if not holding.Invested:
                continue
            if not slice.Bars.ContainsKey(symbol) or slice[symbol] is None:
                continue
            current_price = slice[symbol].Close
            avg_entry_price = holding.AveragePrice
            if avg_entry_price <= 0 or current_price <= 0:
                continue

            atr = self._compute_atr(symbol)
            # NaN (fewer than 14 bars) never satisfies a comparison, so it is
            # rejected explicitly together with non-positive values.
            if atr is None or not np.isfinite(atr) or atr <= 0:
                continue

            tranche_1_target = avg_entry_price + (2.5 * atr * self.garch_multiplier)
            tranche_2_target = avg_entry_price + (5.5 * atr * self.garch_multiplier)
            # The stop distance is not scaled by the GARCH multiplier.
            stop_loss_floor = avg_entry_price - (1.5 * atr)

            # --- Hard stop ---
            if current_price <= stop_loss_floor:
                self.Liquidate(symbol, f"CRITICAL RISK ACTION: Volatility stop boundaries violated at {current_price}. Exiting position.")
                self.halved_positions.pop(symbol, None)
                continue

            # --- Tranche 1: sell half once the first target is reached ---
            if current_price >= tranche_1_target and not self.halved_positions[symbol]:
                sell_quantity = int(holding.Quantity / 2)
                if sell_quantity > 0:
                    self.MarketOrder(symbol, -sell_quantity, False, f"GARCH STEERED GATE 1 HIT: Volatility target hit at price {current_price}")
                    self.halved_positions[symbol] = True
                continue

            # --- Tranche 2: exit the remainder at the second target ---
            if current_price >= tranche_2_target and self.halved_positions[symbol]:
                self.Liquidate(symbol, f"GARCH STEERED GATE 2 HIT: Max volatility target hit at price {current_price}")
                self.halved_positions.pop(symbol, None)

    def _allocate_sideways(self):
        """Enter the SIDEWAYS allocation: DBC+SHV split or SHV only.

        The split is chosen when the curve is at or below -0.735, or when the
        curve is above 0.355 and VIX is at or below 40.815.
        """
        current_vix = self.Securities[self.vix].Price if self.Securities.ContainsKey(self.vix) else 20.0
        current_curve = self.Securities[self.yield_curve].Price if self.Securities.ContainsKey(self.yield_curve) else 0.50
        if current_curve <= 0.355:
            use_commodity_split = current_curve <= -0.735
        else:
            use_commodity_split = current_vix <= 40.815

        if len(self.Transactions.GetOpenOrders()) != 0:
            return
        # NOTE(review): each branch only acts while its anchor asset is not
        # yet invested, so an allocation entered earlier in the regime is
        # kept even if the conditions later select the other one.
        if use_commodity_split:
            if not self.Portfolio[self.dbc].Invested:
                self.SetHoldings(self.dbc, 0.475)
                self.SetHoldings(self.shv, 0.475)
        elif not self.Portfolio[self.shv].Invested:
            self.SetHoldings(self.shv, 0.95)

    def _compute_atr(self, symbol):
        """Return the latest 14-bar mean true range from 20 daily bars.

        Returns ``None`` when no history with a 'high' column is available;
        the value is NaN when fewer than 14 bars exist.
        """
        history = self.History(symbol, 20, Resolution.Daily)
        if history.empty or 'high' not in history.columns:
            return None
        highs = history['high'].unstack(level=0)[symbol]
        lows = history['low'].unstack(level=0)[symbol]
        prev_closes = history['close'].unstack(level=0)[symbol].shift(1)
        true_range = pd.concat(
            [highs - lows, (highs - prev_closes).abs(), (lows - prev_closes).abs()],
            axis=1,
        ).max(axis=1)
        return true_range.rolling(14).mean().iloc[-1]