| Overall Statistics | Value |
| --- | --- |
| Total Orders | 79 |
| Average Win | 4.86% |
| Average Loss | -1.95% |
| Compounding Annual Return | 16.096% |
| Drawdown | 14.300% |
| Expectancy | 1.249 |
| Start Equity | 10000 |
| End Equity | 25203.25 |
| Net Profit | 152.032% |
| Sharpe Ratio | 0.749 |
| Sortino Ratio | 0.558 |
| Probabilistic Sharpe Ratio | 48.142% |
| Loss Rate | 36% |
| Win Rate | 64% |
| Profit-Loss Ratio | 2.50 |
| Alpha | 0.07 |
| Beta | 0.164 |
| Annual Standard Deviation | 0.111 |
| Annual Variance | 0.012 |
| Information Ratio | 0.013 |
| Tracking Error | 0.178 |
| Treynor Ratio | 0.508 |
| Total Fees | $92.45 |
| Estimated Strategy Capacity | $300000.00 |
| Lowest Capacity Asset | UPRO UDQRQQYTO12D |
| Portfolio Turnover | 2.48% |
| Drawdown Recovery | 366 |
from AlgorithmImports import *
import numpy as np
import pandas as pd
from hmmlearn.hmm import GaussianHMM
from arch import arch_model
from sklearn.ensemble import RandomForestRegressor
import warnings
warnings.filterwarnings("ignore")
class AlignedLeveragedReEntryAlphaEngine(QCAlgorithm):
    """Regime-switching rotation between leveraged S&P 500 ETFs and havens.

    A 3-state Gaussian HMM fitted daily on SPY log returns and the RSP/SPY
    breadth ratio labels the market BULL, SIDEWAYS or BEAR. A new label must
    persist for ``confirmation_days`` before it replaces the current regime.

    * BULL: at month start the best blended-momentum ETF of
      ``leveraged_bucket`` is bought at a GARCH-scaled weight; ``OnData``
      then manages an ATR stop, two take-profit tranches and re-entries.
    * SIDEWAYS / BEAR: everything is liquidated and ``OnData`` parks the
      portfolio in SHV, or an SHV/DBC split, depending on macro readings.
    """

    # SIDEWAYS-regime macro gates used by the defensive track in OnData.
    SIDEWAYS_CURVE_FLOOR = 0.355
    SIDEWAYS_VIX_CEILING = 40.815

    def Initialize(self):
        """Register securities, data feeds, state, charts and schedules."""
        self.starting_cash = 10000
        self.SetStartDate(2020, 1, 1)
        self.SetCash(self.starting_cash)

        # 1. Broad Market Core Aligned Leveraged Bucket
        self.leveraged_bucket = ["UPRO", "SSO"]
        self.symbols = [self.AddEquity(ticker, Resolution.Minute).Symbol for ticker in self.leveraged_bucket]
        # Raw mode delivers unadjusted prices.
        # NOTE(review): an earlier comment described this as "split-adjusted",
        # which Raw is not -- confirm that the daily-history momentum and ATR
        # calculations are meant to see split/dividend discontinuities.
        for symbol in self.symbols:
            self.Securities[symbol].SetDataNormalizationMode(DataNormalizationMode.Raw)

        # 2. Benchmark & Core Macro Tracking Elements
        self.spy = self.AddEquity("SPY", Resolution.Minute)
        self.spy.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.rsp = self.AddEquity("RSP", Resolution.Minute)
        self.rsp.SetDataNormalizationMode(DataNormalizationMode.Raw)
        self.SetBenchmark("SPY")

        # 3. Consolidated Safe-Haven Vault Anchor
        self.shv = self.AddEquity("SHV", Resolution.Minute).Symbol  # Clean Cash Proxy Anchor
        self.dbc = self.AddEquity("DBC", Resolution.Minute).Symbol  # Inflation Commodities Anchor

        # 4. Macro data feeds: volatility indices plus FRED custom data
        self.vix3m = self.AddIndex("VIX3M", Resolution.Daily).Symbol
        self.vix = self.AddIndex("VIX", Resolution.Daily).Symbol
        self.vvix = self.AddIndex("VVIX", Resolution.Daily).Symbol
        self.dgs1 = self.AddData(Fred, "DGS1", Resolution.Daily).Symbol
        self.credit_spread = self.AddData(Fred, "BAMLH0A0HYM2", Resolution.Daily).Symbol
        self.yield_curve = self.AddData(Fred, "T10Y2Y", Resolution.Daily).Symbol

        # --- REGIME SWITCHING TIMELINES ---
        self.current_regime = None
        self.current_regime_log = None  # latest unconfirmed HMM label
        self.pending_regime = None
        self.consecutive_days = 0
        self.confirmation_days = 5
        self.lookback_days = 1250

        # --- POSITION TRACERS ---
        # symbol -> True once the first take-profit tranche has been sold
        self.halved_positions = {}
        # Month-start blueprint that governs mid-month re-entries
        self.monthly_target_leaders = []
        self.monthly_leader_weights = {}

        # 5. Volatility Machine Learning Pricing Setup & GARCH Steering Variables
        self.ml_model = RandomForestRegressor(n_estimators=100, max_depth=7, random_state=42)
        self.is_model_trained = False
        self.current_predicted_iv = None
        self.current_rf_rate = 0.04
        # Window of the realised-vol feature; training and inference must agree.
        self.vol_window = 180
        # How far ahead (in rows) the training target looks.
        self.vol_horizon = 180
        self.garch_vol_forecast = 0.15
        self.long_term_vol_mean = 0.15
        self.garch_multiplier = 1.0

        # 6. Performance Graph Settings
        benchmark_chart = Chart("Strategy vs Buy & Hold")
        benchmark_chart.AddSeries(Series("Algorithm Equity", SeriesType.Line, 0))
        benchmark_chart.AddSeries(Series("SPY Buy & Hold", SeriesType.Line, 0))
        self.AddChart(benchmark_chart)
        self.initial_spy_price = None

        # 7. Warmup Data Pumps & Scheduled Tasks
        self.SetWarmup(self.lookback_days, Resolution.Daily)
        self.Train(self.TrainMLModel)
        self.Train(self.DateRules.MonthStart("SPY"), self.TimeRules.At(8, 0), self.TrainMLModel)
        self.Schedule.On(self.DateRules.EveryDay("SPY"), self.TimeRules.AfterMarketOpen("SPY", 5), self.UpdateDailyMetrics)
        self.Schedule.On(self.DateRules.MonthStart("SPY"), self.TimeRules.AfterMarketOpen("SPY", 30), self.RebalanceUniverseRotation)

    def GetCleanHistory(self, symbol, days):
        """Return a daily series for ``symbol`` on a de-duplicated date index.

        Uses the ``close`` column when present and ``value`` otherwise.
        Timestamps are made timezone-naive and normalised to midnight; on
        duplicate dates the last observation wins. Returns an empty float
        Series when no history is available or the request fails.
        """
        try:
            hist = self.History(symbol, days, Resolution.Daily)
            if hist.empty:
                return pd.Series(dtype=float)
            column = 'close' if 'close' in hist.columns else 'value'
            close_data = hist[column].unstack(level=0)[symbol]
            close_data.index = pd.to_datetime(close_data.index).tz_localize(None).normalize()
            return close_data[~close_data.index.duplicated(keep='last')]
        except Exception as err:
            # Best effort: callers substitute a constant series when empty.
            self.Debug(f"[{self.Time}] History unavailable for {symbol}: {err}")
            return pd.Series(dtype=float)

    def _latest_value(self, symbol, default, allow_negative=False):
        """Return the latest price of ``symbol`` or ``default`` if unusable.

        A price of exactly 0 is treated as "no data yet". Negative prices are
        accepted only when ``allow_negative`` is set (used for the yield
        curve spread).
        """
        if not self.Securities.ContainsKey(symbol):
            return default
        price = self.Securities[symbol].Price
        if allow_negative:
            return price if price != 0 else default
        return price if price > 0 else default

    def TrainMLModel(self):
        """Fit the random forest that maps today's vol/macro state to future vol.

        Features are the ``vol_window``-day annualised realised volatility of
        SPY plus VIX, VIX3M, their ratio, VVIX, the credit spread and the
        yield curve. The target is the same realised-vol series
        ``vol_horizon`` rows ahead. Missing macro series are replaced by
        constants.
        """
        spy_close = self.GetCleanHistory(self.spy.Symbol, self.lookback_days)
        if spy_close.empty:
            return

        vix3m_close = self.GetCleanHistory(self.vix3m, self.lookback_days)
        vix_close = self.GetCleanHistory(self.vix, self.lookback_days)
        vvix_close = self.GetCleanHistory(self.vvix, self.lookback_days)
        credit_spread = self.GetCleanHistory(self.credit_spread, self.lookback_days)
        yield_close = self.GetCleanHistory(self.yield_curve, self.lookback_days)

        if vix_close.empty:
            vix_close = pd.Series(20.0, index=spy_close.index)
        if vix3m_close.empty:
            vix3m_close = pd.Series(22.0, index=spy_close.index)
        if vvix_close.empty:
            vvix_close = pd.Series(90.0, index=spy_close.index)
        if credit_spread.empty:
            credit_spread = pd.Series(4.0, index=spy_close.index)
        if yield_close.empty:
            yield_close = pd.Series(1.0, index=spy_close.index)

        spy_returns = np.log(spy_close / spy_close.shift(1)).dropna()
        rolling_vol = spy_returns.rolling(window=self.vol_window).std() * np.sqrt(252)

        df = pd.concat([rolling_vol, vix_close, vix3m_close, vvix_close, credit_spread, yield_close], axis=1)
        df.columns = ['Feature_Vol', 'VIX', 'VIX3M', 'VVIX', 'Credit_Spread', 'Yield_Curve']
        df = df.ffill().dropna()
        if df.empty:
            return

        df['VIX_Ratio'] = df['VIX'] / df['VIX3M']
        df['Target_Vol'] = df['Feature_Vol'].shift(-self.vol_horizon)
        df = df.dropna()
        if len(df) < 190:
            return

        features = ['Feature_Vol', 'VIX', 'VIX3M', 'VIX_Ratio', 'VVIX', 'Credit_Spread', 'Yield_Curve']
        self.ml_model.fit(df[features].values, df['Target_Vol'].values)
        self.is_model_trained = True

    def UpdateDailyMetrics(self):
        """Daily refresh of the vol forecasts, the HMM regime and the chart."""
        if self.IsWarmingUp or not self.is_model_trained:
            return

        history = self.History(self.spy.Symbol, 500, Resolution.Daily)
        rsp_history = self.History(self.rsp.Symbol, 500, Resolution.Daily)
        if not history.empty and not rsp_history.empty and len(history) >= 252:
            spy_prices = history['close'].unstack(level=0)[self.spy.Symbol].dropna()
            rsp_prices = rsp_history['close'].unstack(level=0)[self.rsp.Symbol].dropna()
            # Keep only dates on which both ETFs have a close.
            common_idx = spy_prices.index.intersection(rsp_prices.index)
            spy_prices = spy_prices.loc[common_idx]
            rsp_prices = rsp_prices.loc[common_idx]
            spy_log_returns = np.log(spy_prices / spy_prices.shift(1)).dropna()

            self._update_volatility_estimates(spy_log_returns)
            self._update_regime(spy_log_returns, rsp_prices / spy_prices)

        self._plot_benchmark()

    def _update_volatility_estimates(self, spy_log_returns):
        """Refresh the ML vol prediction, the risk-free rate and the GARCH multiplier."""
        # Same window as the 'Feature_Vol' column the model was trained on.
        current_vol = spy_log_returns.iloc[-self.vol_window:].std() * np.sqrt(252)
        c_vix3m = self._latest_value(self.vix3m, 22.0)
        c_vix = self._latest_value(self.vix, 20.0)
        c_vvix = self._latest_value(self.vvix, 90.0)
        c_credit = self._latest_value(self.credit_spread, 4.0)
        c_yield = self._latest_value(self.yield_curve, 1.0, allow_negative=True)
        c_vix_ratio = c_vix / c_vix3m  # both are > 0 thanks to the defaults

        if np.isfinite(current_vol):
            current_features = [[current_vol, c_vix, c_vix3m, c_vix_ratio, c_vvix, c_credit, c_yield]]
            self.current_predicted_iv = self.ml_model.predict(current_features)[0]

        if self.Securities.ContainsKey(self.dgs1) and self.Securities[self.dgs1].Price > 0:
            self.current_rf_rate = self.Securities[self.dgs1].Price / 100.0

        try:
            # Returns are scaled to percent before fitting and unscaled after.
            garch_input = spy_log_returns.iloc[-252:].values * 100.0
            am = arch_model(garch_input, vol='Garch', p=1, q=1, dist='normal')
            res = am.fit(disp='off')
            self.garch_vol_forecast = (np.sqrt(res.forecast(horizon=1).variance.iloc[-1, 0]) / 100.0) * np.sqrt(252)
            if self.garch_vol_forecast > 0:
                # Size down when forecast vol exceeds the long-run mean, up when below.
                raw_multiplier = self.long_term_vol_mean / self.garch_vol_forecast
                self.garch_multiplier = max(min(raw_multiplier, 1.75), 0.55)
        except Exception as err:
            # Best effort: keep the previous multiplier when the fit fails.
            self.Debug(f"[{self.Time}] GARCH update skipped: {err}")

    def _update_regime(self, spy_log_returns, breadth_ratio):
        """Fit the 2-D HMM (returns, breadth) and feed today's label to the confirmer."""
        breadth_log_returns = np.log(breadth_ratio / breadth_ratio.shift(1)).dropna()
        hmm_dataframe = pd.concat([spy_log_returns * 100.0, breadth_log_returns * 100.0], axis=1).dropna()
        hmm_dataframe.columns = ['Log_Returns', 'Market_Breadth']

        mean_returns, std_returns = hmm_dataframe['Log_Returns'].mean(), hmm_dataframe['Log_Returns'].std()
        mean_breadth, std_breadth = hmm_dataframe['Market_Breadth'].mean(), hmm_dataframe['Market_Breadth'].std()
        if not (std_returns > 0 and std_breadth > 0):
            return  # cannot z-score a constant (or NaN) column

        hmm_dataframe['Scaled_Returns'] = (hmm_dataframe['Log_Returns'] - mean_returns) / std_returns
        hmm_dataframe['Scaled_Breadth'] = (hmm_dataframe['Market_Breadth'] - mean_breadth) / std_breadth
        hmm_matrix = hmm_dataframe[['Scaled_Returns', 'Scaled_Breadth']].values

        try:
            hmm = GaussianHMM(n_components=3, covariance_type="full", n_iter=100, random_state=42)
            hmm.fit(hmm_matrix)
            # Skip fits whose transition-matrix rows do not each sum to 1.
            if not np.allclose(hmm.transmat_.sum(axis=1), 1.0):
                return

            # Rank states by their un-scaled mean return: lowest is BEAR,
            # highest is BULL, the middle one is SIDEWAYS.
            state_returns = {i: hmm.means_[i][0] * std_returns + mean_returns for i in range(3)}
            bear_idx = min(state_returns, key=lambda k: state_returns[k])
            first, second = [i for i in range(3) if i != bear_idx]
            bull_idx = first if state_returns[first] > state_returns[second] else second

            predicted_state_idx = hmm.predict(hmm_matrix)[-1]
            if predicted_state_idx == bull_idx:
                raw_regime = "BULL"
            elif predicted_state_idx == bear_idx:
                raw_regime = "BEAR"
            else:
                raw_regime = "SIDEWAYS"

            self._register_raw_regime(raw_regime)
        except Exception as err:
            # Best effort: a failed fit or order leaves the regime state as is.
            self.Debug(f"[{self.Time}] HMM regime update skipped: {err}")

    def _register_raw_regime(self, raw_regime):
        """Apply the confirmation filter to today's raw HMM label.

        The first label is adopted immediately. Afterwards a differing label
        must repeat until ``consecutive_days`` reaches ``confirmation_days``;
        any day that agrees with the current regime resets the count.
        """
        self.current_regime_log = raw_regime

        if self.current_regime is None:
            self.current_regime = raw_regime
            self.ExecuteRegimeChange()
            return

        if raw_regime == self.current_regime:
            self.consecutive_days = 0
            self.pending_regime = None
            return

        if raw_regime != self.pending_regime:
            # A new candidate starts its own count at 1.
            self.pending_regime, self.consecutive_days = raw_regime, 1
            return

        self.consecutive_days += 1
        if self.consecutive_days >= self.confirmation_days:
            self.Debug(f"[{self.Time}] HMM TRANSITION LOCK: Moving state to {raw_regime}")
            self.current_regime = raw_regime
            self.ExecuteRegimeChange()
            self.consecutive_days, self.pending_regime = 0, None

    def _plot_benchmark(self):
        """Plot portfolio equity against a SPY buy-and-hold of the starting cash."""
        current_spy_price = self.Securities["SPY"].Price
        if current_spy_price <= 0:
            return
        if self.initial_spy_price is None:
            self.initial_spy_price = current_spy_price
        spy_shares_bought = self.starting_cash / self.initial_spy_price
        benchmark_equity = spy_shares_bought * current_spy_price
        self.Plot("Strategy vs Buy & Hold", "Algorithm Equity", self.Portfolio.TotalPortfolioValue)
        self.Plot("Strategy vs Buy & Hold", "SPY Buy & Hold", benchmark_equity)

    def ExecuteRegimeChange(self):
        """React to a confirmed regime: flatten on BEAR/SIDEWAYS, reset trackers.

        The trackers are cleared for every regime, so after a switch to BULL
        no leader is held or re-entered until ``RebalanceUniverseRotation``
        next runs.
        """
        if self.IsWarmingUp:
            return
        if self.current_regime in ["BEAR", "SIDEWAYS"]:
            self.Liquidate()
        self.halved_positions.clear()
        self.monthly_target_leaders.clear()
        self.monthly_leader_weights.clear()

    def RebalanceUniverseRotation(self):
        """Scans and anchors target distribution blueprints on Month Start.

        Only acts in the BULL regime: sells the haven assets, ranks the
        leveraged bucket by the average of its 63/126/189-bar returns, and
        moves the portfolio into the top-ranked ETF at a GARCH-scaled weight.
        """
        if self.IsWarmingUp or self.current_regime != "BULL":
            return

        for asset in [self.shv, self.dbc]:
            if self.Portfolio[asset].Invested:
                self.Liquidate(asset)

        momentum_scores = []
        for symbol in self.symbols:
            if self.CurrentSlice is None or not self.CurrentSlice.Bars.ContainsKey(symbol) or self.CurrentSlice[symbol] is None:
                continue
            history = self.History(symbol, 200, Resolution.Daily)
            if history.empty or len(history) < 189:
                continue
            prices = history['close'].unstack(level=0)[symbol]
            last = prices.iloc[-1]
            ret_3m = (last - prices.iloc[-63]) / prices.iloc[-63]
            ret_6m = (last - prices.iloc[-126]) / prices.iloc[-126]
            ret_9m = (last - prices.iloc[-189]) / prices.iloc[-189]
            momentum_scores.append((symbol, (ret_3m + ret_6m + ret_9m) / 3.0))

        if not momentum_scores:
            return

        # Save the intended top leader blueprint for governance; on a tie the
        # earlier bucket entry wins.
        top_leader = max(momentum_scores, key=lambda item: item[1])[0]
        self.monthly_target_leaders = [top_leader]
        # Drop weights that belonged to a previous month's blueprint.
        self.monthly_leader_weights.clear()

        for symbol in self.symbols:
            if symbol != top_leader and self.Portfolio[symbol].Invested:
                self.Liquidate(symbol, "Outdated Ranking Reset")
                self.halved_positions.pop(symbol, None)

        bars = self.CurrentSlice.Bars
        if bars.ContainsKey(top_leader) and bars[top_leader] is not None and bars[top_leader].Close > 0:
            # Position sizing: GARCH multiplier applied to 95%, capped at 95%.
            base_weight = min(0.95 * self.garch_multiplier, 0.95)
            self.monthly_leader_weights[top_leader] = base_weight
            self.SetHoldings(top_leader, base_weight)
            if top_leader not in self.halved_positions:
                self.halved_positions[top_leader] = False

    def OnData(self, slice):
        """Per-slice dispatcher: defensive track, re-entries, then risk management."""
        if self.IsWarmingUp:
            return

        # 1. THE DEFENSIVE TRACK: Execute Macro Haven Protection
        if self.current_regime in ("BEAR", "SIDEWAYS"):
            self._run_defensive_track()
            return
        if self.current_regime != "BULL":
            return

        # 2. THE OFFENSIVE TRACK: Mid-Month Re-Entry Trigger Logic
        self._attempt_reentries()

        # 3. TRADING RISK MITIGATION LOOP
        self._manage_open_positions(slice)

    def _run_defensive_track(self):
        """Hold SHV, or an SHV/DBC split, while the regime is not BULL.

        BEAR always targets 95% SHV. SIDEWAYS targets a 47.5%/47.5% DBC/SHV
        split when the yield curve is above ``SIDEWAYS_CURVE_FLOOR`` and VIX
        is at or below ``SIDEWAYS_VIX_CEILING``; otherwise 95% SHV. Orders
        are placed only when the gating asset is not yet held and there are
        no open orders.
        """
        use_split = False
        if self.current_regime == "SIDEWAYS":
            current_vix = self._latest_value(self.vix, 20.0)
            current_curve = self._latest_value(self.yield_curve, 0.50, allow_negative=True)
            use_split = current_curve > self.SIDEWAYS_CURVE_FLOOR and current_vix <= self.SIDEWAYS_VIX_CEILING

        if len(self.Transactions.GetOpenOrders()) != 0:
            return

        if use_split:
            if not self.Portfolio[self.dbc].Invested:
                # One batched call so an existing SHV position is reduced
                # before the DBC purchase needs the buying power.
                self.SetHoldings([
                    PortfolioTarget(self.dbc, 0.475),
                    PortfolioTarget(self.shv, 0.475),
                ])
        elif not self.Portfolio[self.shv].Invested:
            self.SetHoldings(self.shv, 0.95)

    def _attempt_reentries(self):
        """Re-buy a flat month-start leader once its 5-day EMA is above its 20-day EMA."""
        for leader in self.monthly_target_leaders:
            if self.Portfolio[leader].Invested or leader not in self.monthly_leader_weights:
                continue
            hist_data = self.History(leader, 30, Resolution.Daily)
            if hist_data.empty or len(hist_data) < 20:
                continue
            close_series = hist_data['close'].unstack(level=0)[leader]
            ema_fast = close_series.ewm(span=5, adjust=False).mean().iloc[-1]
            ema_slow = close_series.ewm(span=20, adjust=False).mean().iloc[-1]
            # If stopped out but price breaks back into short-term trend, execute re-entry trigger
            if ema_fast > ema_slow and len(self.Transactions.GetOpenOrders(leader)) == 0:
                re_entry_target_weight = self.monthly_leader_weights[leader]
                self.Debug(f"[{self.Time}] TRACER RE-ENTRY TRIGGERED: {leader.Value} fast trend restored. Deploying target weight: {re_entry_target_weight*100:.2f}%")
                self.SetHoldings(leader, re_entry_target_weight)
                self.halved_positions[leader] = False

    def _manage_open_positions(self, data):
        """Apply the ATR stop and the two take-profit tranches to tracked holdings.

        Using a 14-bar mean true range from 20 daily bars: the stop sits
        2.5 ATR below the average entry price (at the entry price once the
        position has been halved), half is sold at +3.5 ATR and the
        remainder at +7.5 ATR.
        """
        for symbol in list(self.halved_positions.keys()):
            if not self.Portfolio.ContainsKey(symbol):
                continue
            holding = self.Portfolio[symbol]
            if not holding.Invested:
                continue
            if not data.Bars.ContainsKey(symbol) or data[symbol] is None:
                continue

            current_price = data[symbol].Close
            avg_entry_price = holding.AveragePrice
            if avg_entry_price <= 0 or current_price <= 0:
                continue

            history = self.History(symbol, 20, Resolution.Daily)
            if history.empty or 'high' not in history.columns:
                continue
            highs = history['high'].unstack(level=0)[symbol]
            lows = history['low'].unstack(level=0)[symbol]
            prev_closes = history['close'].unstack(level=0)[symbol].shift(1)
            true_range = pd.concat(
                [highs - lows, (highs - prev_closes).abs(), (lows - prev_closes).abs()],
                axis=1,
            ).max(axis=1)
            atr = true_range.rolling(14).mean().iloc[-1]
            # Also rejects NaN, which occurs with fewer than 14 usable bars.
            if not atr > 0:
                continue

            is_halved = self.halved_positions[symbol]
            tranche_1_target = avg_entry_price + (3.5 * atr)
            tranche_2_target = avg_entry_price + (7.5 * atr)
            # Break-even stop once halved; otherwise -2.5 ATR to absorb intraday noise.
            stop_loss_floor = avg_entry_price if is_halved else avg_entry_price - (2.5 * atr)

            # --- HARD STOP SAFETY VALVE DISPATCH ---
            if current_price <= stop_loss_floor:
                self.Debug(f"[{self.Time}] TRAILING RISK STOP TRIGGERED: Liquidating {symbol.Value} at ${current_price:.2f}.")
                self.Liquidate(symbol, f"ALIGNED STOP TRIGGERED: Volatility floor breached at {current_price}. Exiting.")
                self.halved_positions.pop(symbol, None)
                continue

            # --- ASYMMETRICAL HARVESTING DISPATCH ---
            if current_price >= tranche_1_target and not is_halved:
                sell_quantity = int(holding.Quantity / 2)
                if sell_quantity > 0:
                    self.MarketOrder(symbol, -sell_quantity, False, f"TRANCHE 1 TAKE-PROFIT: Cleared 50% lines at {current_price}")
                    self.halved_positions[symbol] = True
                continue

            if current_price >= tranche_2_target and is_halved:
                self.Liquidate(symbol, f"TRANCHE 2 MAX RUNNER CLEAR: Max sweep target hit at {current_price}")
                self.halved_positions.pop(symbol, None)