# | Overall Statistics |
#
# Total Orders 2592 | Average Win 0.32% | Average Loss -0.24% | Compounding Annual Return 26.068% | Drawdown 16.000% | Expectancy 0.783 | Start Equity 100000 | End Equity 1021795.50 | Net Profit 921.796% | Sharpe Ratio 1.298 | Sortino Ratio 1.539 | Probabilistic Sharpe Ratio 94.190% | Loss Rate 24% | Win Rate 76% | Profit-Loss Ratio 1.35 | Alpha 0.093 | Beta 0.719 | Annual Standard Deviation 0.119 | Annual Variance 0.014 | Information Ratio 1.032 | Tracking Error 0.067 | Treynor Ratio 0.215 | Total Fees $2880.27 | Estimated Strategy Capacity $0 | Lowest Capacity Asset SPY R735QTJ8XC9X | Portfolio Turnover 4.83% | Drawdown Recovery 227 |
from AlgorithmImports import *
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
# ─────────────────────────────────────────────────────────────────────────────
# VolatilityHarvest — Production v1.0
#
# Strategy summary
# ────────────────
# A VIX-regime rules system enhanced by a Random Forest ML overlay.
# The rules classify market conditions into six regimes and allocate between
# SPY, GLD, and cash accordingly. The ML model — trained on 24 features
# spanning VIX structure, credit spreads, market breadth, yield curve slope,
# and multi-horizon momentum — provides a conviction boost (+10–15% SPY) when
# the probability of an above-median 21-day forward return exceeds 0.65.
#
# Universe
# ────────
# SPY — S&P 500 ETF (primary equity exposure)
# GLD — Gold ETF (defensive hedge)
# VIX — CBOE Volatility Index (regime anchor, custom data)
# VIX3M — CBOE 3-Month VIX (term structure signal)
# HYG — iShares High Yield (credit spread proxy)
# LQD — iShares Inv. Grade (credit spread proxy)
# RSP — Equal Weight S&P 500 (market breadth signal)
# IEF — iShares 7-10yr TSY (yield curve proxy)
# SHY — iShares 1-3yr TSY (yield curve proxy)
#
# Regime rules (SPY / GLD / cash)
# ────────────────────────────────
# VIX spike + deep dip → 85–100% / 0–15% / 0%
# VIX spike + shallow dip → 60–75% / 25–40% / 0%
# VIX very low + extended → 40% / 20% / 40%
# VIX elevated but falling → 70–85% / 10% / 5–20%
# VIX rising sharply → 30% / 20% / 50%
# Above 200MA (default) → 60–70% / 15% / 15–25%
# Below 200MA (default) → 30% / 20% / 50%
#
# Backtest results (2016–2026, out-of-sample stress test)
# ────────────────────────────────────────────────────────
# CAGR 21.5%
# Sharpe ratio 1.227
# Sortino ratio 1.370
# Max drawdown 14.0%
# Drawdown recovery 133 days
# PSR 94.1%
# Expectancy 0.832
# Win rate 83%
# Net profit 604.7% ($100k → $704k)
#
# Live deployment notes
# ─────────────────────
# 1. Paper trade for at least 30 days before committing capital. Verify
#    that VIX and VIX3M CBOE remote CSV feeds update correctly each day —
#    stale VIX data will misclassify regimes silently.
# 2. Check monthly TrainModel logs: val_acc, majority_baseline, and edge.
#    If edge turns persistently negative, disable ML (set self.trained=False)
#    and run rules-only until the next regime stabilises.
# 3. This strategy holds cash (unallocated weight). Ensure your brokerage
#    account earns interest on idle cash — at 50% cash in defensive regimes
#    that drag is meaningful in low-rate environments.
# 4. Estimated strategy capacity is low ($0 in QC estimate) — suitable for
#    personal/small fund sizing only, not institutional scale.
# ─────────────────────────────────────────────────────────────────────────────
# ── Constants ─────────────────────────────────────────────────────────────────
LABEL_HORIZON = 21          # forward return window for ML labels (trading days)
SAFETY_BUFFER = 10          # bars excluded at tail of history to prevent look-ahead
TRAIN_VAL_GAP = 126         # bars held out as validation set (~6 months)
MIN_TRAIN_ROWS = 100        # minimum training samples required to fit model
ML_THRESHOLD = 0.65         # minimum predicted probability to activate ML tilt
MIN_VIX_BARS = 50           # minimum VIX history for feature computation
MIN_SPY_BARS = 260          # minimum SPY history (252d momentum + margin)
MIN_AUX_BARS = 60           # minimum history for auxiliary series (60d features)
DIP_DEEP_THRESHOLD = -0.08  # 10d SPY return at or below this = deep dip (Rule 1)
DIP_SHALLOW_SPY_W = 0.60    # SPY weight for shallow dip entry
DIP_SHALLOW_SPY_W_ML = 0.75 # SPY weight for shallow dip + ML bullish
DIP_DEEP_SPY_W = 0.85       # SPY weight for deep dip entry
DIP_DEEP_SPY_W_ML = 1.00    # SPY weight for deep dip + ML bullish
# ── Algorithm ─────────────────────────────────────────────────────────────────
class VolatilityHarvest(QCAlgorithm):
    """VIX-regime rules system with a Random Forest conviction overlay.

    Six regime rules (see CheckSignal) allocate between SPY, GLD, and cash
    based on VIX level/trend and SPY moving averages. A monthly-retrained
    Random Forest — fed 24 features over VIX structure, credit spreads,
    breadth, yield-curve slope, and multi-horizon momentum — raises SPY
    exposure within a rule when P(above-median 21-day forward SPY return)
    exceeds ML_THRESHOLD.
    """

    def Initialize(self):
        """Register assets, custom VIX data, the RF model, and schedules."""
        self.SetStartDate(2016, 1, 1)
        self.SetCash(100000)
        # Trading assets
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol
        # VIX and term structure
        self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
        self.vix3m = self.AddData(CBOE, "VIX3M", Resolution.Daily).Symbol
        # Credit spread proxy
        self.hyg = self.AddEquity("HYG", Resolution.Daily).Symbol
        self.lqd = self.AddEquity("LQD", Resolution.Daily).Symbol
        # Market breadth
        self.rsp = self.AddEquity("RSP", Resolution.Daily).Symbol
        # Yield curve proxy
        self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol
        self.shy = self.AddEquity("SHY", Resolution.Daily).Symbol
        # ML model
        self.model = RandomForestClassifier(
            n_estimators=200,
            max_depth=6,
            min_samples_leaf=20,
            random_state=42,
        )
        self.scaler = StandardScaler()
        self.trained = False  # set True only after a successful monthly fit
        # Daily signal check — 30 min after open (avoids open auction noise)
        self.Schedule.On(
            self.DateRules.EveryDay("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 30),
            self.CheckSignal,
        )
        # Monthly model retraining — first trading day, 60 min after open
        self.Schedule.On(
            self.DateRules.MonthStart("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 60),
            self.TrainModel,
        )
        self.SetWarmUp(252)

    # ── Feature engineering ───────────────────────────────────────────────────
    def GetFeatures(self, vix_closes, spy_closes,
                    vix3m_closes=None,
                    hyg_closes=None, lqd_closes=None,
                    rsp_closes=None,
                    ief_closes=None, shy_closes=None):
        """
        Build a 24-element feature vector. Returns None if core series are
        too short. Auxiliary series default to 0.0 when unavailable.

        Element order must stay in sync with the `names` list used for
        feature-importance logging in TrainModel.
        """
        if len(vix_closes) < MIN_VIX_BARS or len(spy_closes) < MIN_SPY_BARS:
            return None
        # VIX features
        current_vix = vix_closes[-1]
        vix_sma20 = np.mean(vix_closes[-20:])
        vix_sma50 = np.mean(vix_closes[-50:])
        vix_std = np.std(vix_closes[-20:])
        vix_zscore = (current_vix - vix_sma20) / vix_std if vix_std > 0 else 0.0
        vix_pct_rank = np.sum(vix_closes < current_vix) / len(vix_closes)
        # SPY features
        spy_current = spy_closes[-1]
        spy_sma50 = np.mean(spy_closes[-50:])
        spy_sma200 = np.mean(spy_closes[-200:])
        spy_5d_ret = spy_closes[-1] / spy_closes[-5] - 1
        spy_10d_ret = spy_closes[-1] / spy_closes[-10] - 1
        spy_20d_ret = spy_closes[-1] / spy_closes[-20] - 1
        # std of the last 20 daily returns (annualized in the output vector)
        spy_vol = np.std(np.diff(spy_closes[-21:]) / spy_closes[-21:-1])
        # Medium-term momentum (G5)
        spy_60d_ret = spy_closes[-1] / spy_closes[-60] - 1
        spy_120d_ret = spy_closes[-1] / spy_closes[-120] - 1
        spy_252d_ret = spy_closes[-1] / spy_closes[-252] - 1
        # VIX term structure (G1)
        if vix3m_closes is not None and len(vix3m_closes) >= 5:
            vix_term_ratio = current_vix / vix3m_closes[-1] if vix3m_closes[-1] > 0 else 1.0
            vix_term_5d_chg = (current_vix / vix_closes[-5]) - (vix3m_closes[-1] / vix3m_closes[-5])
        else:
            vix_term_ratio = 0.0
            vix_term_5d_chg = 0.0
        # Credit spread (G2)
        if (hyg_closes is not None and lqd_closes is not None
                and len(hyg_closes) >= MIN_AUX_BARS and len(lqd_closes) >= MIN_AUX_BARS):
            credit_ratio = hyg_closes[-1] / lqd_closes[-1]
            credit_5d_chg = (hyg_closes[-1] / hyg_closes[-5]) - (lqd_closes[-1] / lqd_closes[-5])
            credit_20d_chg = (hyg_closes[-1] / hyg_closes[-20]) - (lqd_closes[-1] / lqd_closes[-20])
        else:
            credit_ratio = 0.0
            credit_5d_chg = 0.0
            credit_20d_chg = 0.0
        # Market breadth (G3)
        if rsp_closes is not None and len(rsp_closes) >= MIN_AUX_BARS:
            breadth_ratio = rsp_closes[-1] / spy_closes[-1]
            breadth_5d = (rsp_closes[-1] / rsp_closes[-5]) - (spy_closes[-1] / spy_closes[-5])
            breadth_20d = (rsp_closes[-1] / rsp_closes[-20]) - (spy_closes[-1] / spy_closes[-20])
        else:
            breadth_ratio = 0.0
            breadth_5d = 0.0
            breadth_20d = 0.0
        # Yield curve (G4)
        if (ief_closes is not None and shy_closes is not None
                and len(ief_closes) >= MIN_AUX_BARS and len(shy_closes) >= MIN_AUX_BARS):
            curve_slope = (ief_closes[-1] / ief_closes[-20]) - (shy_closes[-1] / shy_closes[-20])
            curve_slope_60d = (ief_closes[-1] / ief_closes[-60]) - (shy_closes[-1] / shy_closes[-60])
        else:
            curve_slope = 0.0
            curve_slope_60d = 0.0
        return [
            current_vix, vix_zscore, vix_pct_rank,
            current_vix / vix_sma20, current_vix / vix_sma50,
            spy_5d_ret, spy_10d_ret, spy_20d_ret,
            spy_current / spy_sma50, spy_current / spy_sma200,
            spy_vol * np.sqrt(252),
            spy_60d_ret, spy_120d_ret, spy_252d_ret,
            vix_term_ratio, vix_term_5d_chg,
            credit_ratio, credit_5d_chg, credit_20d_chg,
            breadth_ratio, breadth_5d, breadth_20d,
            curve_slope, curve_slope_60d,
        ]

    # ── History helpers ───────────────────────────────────────────────────────
    def _closes(self, symbol, start_dt, end_dt, min_bars=1):
        """Return equity closes over [start_dt, end_dt] as an array, or None
        when history is empty, errors, or has fewer than min_bars rows."""
        try:
            h = self.History(symbol, start_dt, end_dt, Resolution.Daily)
            if h.empty:
                return None
            c = h['close'].values
            return c if len(c) >= min_bars else None
        except Exception as e:
            self.Log(f"History error ({symbol}): {e}")
            return None

    def _cboe_closes(self, symbol, start_dt, end_dt, min_bars=1):
        """Same contract as _closes, but for CBOE custom-data symbols."""
        try:
            h = self.History(CBOE, symbol, start_dt, end_dt, Resolution.Daily)
            if h.empty:
                return None
            c = h['close'].values
            return c if len(c) >= min_bars else None
        except Exception as e:
            self.Log(f"CBOE history error ({symbol}): {e}")
            return None

    # ── Model training ────────────────────────────────────────────────────────
    def TrainModel(self):
        """Monthly walk-forward retrain of the Random Forest.

        Labels are a median split of 21-day forward SPY returns; the last
        LABEL_HORIZON + SAFETY_BUFFER bars contribute no labels so nothing
        peeks past 'today'. The final TRAIN_VAL_GAP rows are held out for
        validation logging only (never trained on).
        """
        if self.IsWarmingUp:
            return
        end_dt = self.Time
        start_dt = end_dt - timedelta(days=4000)
        # Core series — required
        vix_c = self._cboe_closes(self.vix, start_dt, end_dt, MIN_VIX_BARS)
        spy_c = self._closes(self.spy, start_dt, end_dt, MIN_SPY_BARS)
        if vix_c is None or spy_c is None:
            self.Log("TrainModel: missing core history, skipping.")
            return
        # Auxiliary series — optional (features degrade to 0.0 when absent)
        vix3m_c = self._cboe_closes(self.vix3m, start_dt, end_dt, 5)
        hyg_c = self._closes(self.hyg, start_dt, end_dt, MIN_AUX_BARS)
        lqd_c = self._closes(self.lqd, start_dt, end_dt, MIN_AUX_BARS)
        rsp_c = self._closes(self.rsp, start_dt, end_dt, MIN_AUX_BARS)
        ief_c = self._closes(self.ief, start_dt, end_dt, MIN_AUX_BARS)
        shy_c = self._closes(self.shy, start_dt, end_dt, MIN_AUX_BARS)
        self.Log(f"TrainModel | SPY {spy_c[0]:.2f}→{spy_c[-1]:.2f} "
                 f"bars={len(spy_c)} date={end_dt.date()}")
        # Look-ahead guard
        label_cutoff = len(spy_c) - LABEL_HORIZON - SAFETY_BUFFER
        if label_cutoff < MIN_SPY_BARS + MIN_TRAIN_ROWS:
            self.Log("TrainModel: insufficient data, skipping.")
            return
        # Walk-forward split
        train_end = label_cutoff - TRAIN_VAL_GAP
        if train_end - MIN_SPY_BARS < MIN_TRAIN_ROWS:
            self.Log("TrainModel: training window too small, skipping.")
            return
        # Percentile-based labels — median split guarantees ~50/50
        indices = list(range(MIN_SPY_BARS, label_cutoff))
        fwd_rets = [spy_c[i + LABEL_HORIZON] / spy_c[i] - 1 for i in indices]
        median_r = np.median(fwd_rets)
        X_all, y_all = [], []
        for idx, i in enumerate(indices):
            # Slice up to (excluding) bar i so each sample only sees data
            # available at that point in time.
            # NOTE(review): auxiliary series are sliced by the SPY index i,
            # which assumes their bars align 1:1 with SPY's — confirm, since
            # CBOE and equity calendars can differ.
            f = self.GetFeatures(
                vix_c[:i], spy_c[:i],
                vix3m_closes = vix3m_c[:i] if vix3m_c is not None else None,
                hyg_closes = hyg_c[:i] if hyg_c is not None else None,
                lqd_closes = lqd_c[:i] if lqd_c is not None else None,
                rsp_closes = rsp_c[:i] if rsp_c is not None else None,
                ief_closes = ief_c[:i] if ief_c is not None else None,
                shy_closes = shy_c[:i] if shy_c is not None else None,
            )
            if f is not None:
                X_all.append(f)
                y_all.append(1 if fwd_rets[idx] > median_r else 0)
        if len(X_all) < MIN_TRAIN_ROWS + 20:
            self.Log(f"TrainModel: too few samples ({len(X_all)}), skipping.")
            return
        X_all = np.array(X_all)
        y_all = np.array(y_all)
        # Degenerate label guard
        class_1_rate = float(np.mean(y_all))
        if class_1_rate > 0.95 or class_1_rate < 0.05:
            self.Log(f"TrainModel: degenerate labels ({class_1_rate:.3f}), "
                     f"disabling ML until next month.")
            self.trained = False
            return
        # Train / validation split (chronological)
        # NOTE(review): split assumes every index produced a feature row; if
        # GetFeatures returned None for some i, this boundary shifts slightly.
        split = train_end - MIN_SPY_BARS
        X_tr, y_tr = X_all[:split], y_all[:split]
        X_va, y_va = X_all[split:], y_all[split:]
        if len(X_tr) < MIN_TRAIN_ROWS:
            self.Log("TrainModel: not enough training rows, skipping.")
            return
        self.scaler.fit(X_tr)
        self.model.fit(self.scaler.transform(X_tr), y_tr)
        self.trained = True
        # Validation logging
        if len(X_va) > 0:
            val_acc = self.model.score(self.scaler.transform(X_va), y_va)
            baseline = float(np.mean(y_va))
            self.Log(f"TrainModel | train={len(X_tr)} val={len(X_va)} "
                     f"val_acc={val_acc:.3f} baseline={baseline:.3f} "
                     f"edge={val_acc - baseline:+.3f} "
                     f"label_rate={class_1_rate:.3f} "
                     f"median_fwd={median_r:.4f}")
        # Feature importance — top 7 (names must match GetFeatures order)
        names = [
            "vix_level","vix_zscore","vix_pct_rank","vix_vs_sma20","vix_vs_sma50",
            "spy_5d","spy_10d","spy_20d","spy_vs_sma50","spy_vs_sma200","spy_vol",
            "spy_60d","spy_120d","spy_252d",
            "vix_term_ratio","vix_term_5d_chg",
            "credit_ratio","credit_5d_chg","credit_20d_chg",
            "breadth_ratio","breadth_5d","breadth_20d",
            "curve_slope_20d","curve_slope_60d",
        ]
        top = sorted(zip(names, self.model.feature_importances_),
                     key=lambda x: -x[1])[:7]
        self.Log("Features: " + " | ".join(f"{n}={v:.3f}" for n, v in top))

    # ── Signal generation ─────────────────────────────────────────────────────
    def CheckSignal(self):
        """Daily regime classification and rebalance.

        The six rules are evaluated in priority order; the first match sets
        SPY/GLD holdings (unallocated weight stays in cash) and returns.
        The ML overlay only raises the SPY weight within a matched rule.
        """
        if self.IsWarmingUp:
            return
        # Fetch core history
        spy_hist = self.History([self.spy], 270, Resolution.Daily)
        vix_hist = self.History([self.vix], 100, Resolution.Daily)
        if spy_hist.empty or vix_hist.empty:
            return
        try:
            spy_c = spy_hist.loc[self.spy]['close'].values
            vix_c = vix_hist.loc[self.vix]['close'].values
        except Exception as e:
            self.Log(f"CheckSignal core history error: {e}")
            return
        if len(vix_c) < MIN_VIX_BARS or len(spy_c) < MIN_SPY_BARS:
            return
        # Fetch auxiliary series — best-effort; features degrade to 0.0
        def _get(sym, n):
            try:
                h = self.History([sym], n, Resolution.Daily)
                return None if h.empty else h.loc[sym]['close'].values
            except:  # NOTE(review): bare except — consider narrowing
                return None
        vix3m_c = _get(self.vix3m, 10)
        hyg_c = _get(self.hyg, MIN_AUX_BARS)
        lqd_c = _get(self.lqd, MIN_AUX_BARS)
        rsp_c = _get(self.rsp, MIN_AUX_BARS)
        ief_c = _get(self.ief, MIN_AUX_BARS)
        shy_c = _get(self.shy, MIN_AUX_BARS)
        # Regime indicators
        current_vix = vix_c[-1]
        vix_sma = np.mean(vix_c[-20:])
        vix_level_80 = np.percentile(vix_c, 80)  # 80th pct of ~100-bar window
        spy_current = spy_c[-1]
        spy_sma50 = np.mean(spy_c[-50:])
        spy_sma200 = np.mean(spy_c[-200:])
        spy_5d_ret = spy_c[-1] / spy_c[-5] - 1
        spy_10d_ret = spy_c[-1] / spy_c[-10] - 1
        # ML signal
        ml_bullish = False
        if self.trained:
            feats = self.GetFeatures(
                vix_c, spy_c,
                vix3m_closes=vix3m_c,
                hyg_closes=hyg_c, lqd_closes=lqd_c,
                rsp_closes=rsp_c,
                ief_closes=ief_c, shy_closes=shy_c,
            )
            if feats is not None:
                proba = self.model.predict_proba(
                    self.scaler.transform([feats]))[0]
                # Single-class fallback: map a hard predict to a coarse prob
                prob = proba[1] if len(proba) == 2 else (
                    0.7 if self.model.predict(
                        self.scaler.transform([feats]))[0] == 1 else 0.5)
                ml_bullish = prob > ML_THRESHOLD
        # ── Regime rules ──────────────────────────────────────────────────────
        # SPY and GLD always set explicitly. Unallocated weight = cash.
        # Rule 1 — VIX spike + oversold → staged dip entry
        if current_vix > vix_level_80 and spy_5d_ret < -0.03:
            if spy_10d_ret <= DIP_DEEP_THRESHOLD:
                spy_w = DIP_DEEP_SPY_W_ML if ml_bullish else DIP_DEEP_SPY_W
            else:
                spy_w = DIP_SHALLOW_SPY_W_ML if ml_bullish else DIP_SHALLOW_SPY_W
            self.SetHoldings(self.spy, spy_w)
            self.SetHoldings(self.gld, max(0.0, 1.0 - spy_w))
            self.Log(f"R1-dip | vix={current_vix:.1f} 5d={spy_5d_ret:.3f} "
                     f"10d={spy_10d_ret:.3f} spy={spy_w:.2f} ml={ml_bullish}")
            return
        # Rule 2 — VIX very low + SPY extended → reduce risk
        if current_vix < 13 and spy_current > spy_sma50 * 1.05:
            self.SetHoldings(self.spy, 0.40)
            self.SetHoldings(self.gld, 0.20)
            # 40% cash
            return
        # Rule 3 — VIX elevated but falling → recovery trade
        if 20 < current_vix < vix_sma:
            self.SetHoldings(self.spy, 0.85 if ml_bullish else 0.70)
            self.SetHoldings(self.gld, 0.10)
            # remainder cash
            return
        # Rule 4 — VIX rising sharply → defensive
        if current_vix > vix_sma * 1.2:
            self.SetHoldings(self.spy, 0.30)
            self.SetHoldings(self.gld, 0.20)
            # 50% cash
            return
        # Rule 5 — Default: SPY above 200MA → trend following
        if spy_current > spy_sma200:
            self.SetHoldings(self.spy, 0.70 if ml_bullish else 0.60)
            self.SetHoldings(self.gld, 0.15)
            # remainder cash
        else:
            # Rule 6 — Default: SPY below 200MA → defensive
            self.SetHoldings(self.spy, 0.30)
            self.SetHoldings(self.gld, 0.20)
            # 50% cash

    def OnData(self, data):
        # All trading happens in the scheduled CheckSignal; nothing per bar.
        pass
# ─────────────────────────────────────────────────────────────────────────────
# CBOE custom data reader
# Routes VIX and VIX3M to their respective CBOE history CSVs.
# ─────────────────────────────────────────────────────────────────────────────
class CBOE(PythonData):
    """Custom data reader for CBOE daily index history CSVs (VIX / VIX3M).

    CSV row layout: DATE,OPEN,HIGH,LOW,CLOSE with dates formatted m/d/Y.
    """

    def GetSource(self, config, date, isLive):
        """Return the remote CSV source for the requested symbol.

        Unknown symbols fall back to the VIX file rather than failing.
        """
        urls = {
            "VIX": "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv",
            "VIX3M": "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX3M_History.csv",
        }
        url = urls.get(config.Symbol.Value, urls["VIX"])
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLive):
        """Parse one CSV row into a CBOE point; None for headers/bad rows."""
        # Header and blank lines do not start with a digit — skip them.
        if not (line.strip() and line[0].isdigit()):
            return None
        cols = line.split(',')
        try:
            obj = CBOE()
            obj.Symbol = config.Symbol
            obj.Time = datetime.strptime(cols[0], "%m/%d/%Y")
            obj.Value = float(cols[4])   # close is the canonical value
            obj["close"] = float(cols[4])
            obj["open"] = float(cols[1])
            obj["high"] = float(cols[2])
            obj["low"] = float(cols[3])
            return obj
        except (ValueError, IndexError):
            # Malformed row (bad date/number or missing columns) — skip.
            # Was a bare `except:`, which also swallowed system exceptions.
            return None
from AlgorithmImports import *
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
# ─────────────────────────────────────────────────────────────────────────────
# HYBRID STRATEGY — Production v1.0
#
# Macro engine : VolatilityHarvest (SPY / GLD / VIX regime + RF ML overlay)
# Equity sleeve : Fundamental value + ROC63 momentum
# Design : Cash in VolatilityHarvest replaced by equity sleeve
#
# Sleeve is active in calm / recovery regimes (Rules 2, 3, 5).
# Sleeve is liquidated in stress regimes (Rules 1, 4, 6).
#
# Backtest results (2016–2026)
# ─────────────────────────────
# CAGR 27.5% | Sharpe 1.336 | Max DD 15.6% | PSR 95.3% | Expectancy 0.791
#
# Live hardening applied (v1.0)
# ──────────────────────────────
# - _get_closes() unified helper handles both multi-index (backtest) and
# flat (live) DataFrame formats — fixes the .loc[symbol] crash
# - CheckSignal and TrainModel wrapped in outer try/except
# - CheckSignal only updates weight state variables; all SetHoldings
# calls consolidated into RebalanceSleeve for a single atomic rebalance
# - RebalanceSleeve logs candidate count, tickers, and weights each month
# ─────────────────────────────────────────────────────────────────────────────
# ── VolatilityHarvest constants ───────────────────────────────────────────────
LABEL_HORIZON = 21          # forward return window for ML labels (trading days)
SAFETY_BUFFER = 10          # tail bars excluded from labels to prevent look-ahead
TRAIN_VAL_GAP = 126         # bars held out as validation set (~6 months)
MIN_TRAIN_ROWS = 100        # minimum training samples required to fit model
ML_THRESHOLD = 0.65         # minimum predicted probability to activate ML tilt
MIN_VIX_BARS = 50           # minimum VIX history for feature computation
MIN_SPY_BARS = 260          # minimum SPY history (252d momentum + margin)
MIN_AUX_BARS = 60           # minimum history for auxiliary series (60d features)
DIP_DEEP_THRESHOLD = -0.08  # 10d SPY return at or below this = deep dip (Rule 1)
DIP_SHALLOW_SPY_W = 0.60    # SPY weight for shallow dip entry
DIP_SHALLOW_SPY_W_ML = 0.75 # SPY weight for shallow dip + ML bullish
DIP_DEEP_SPY_W = 0.85       # SPY weight for deep dip entry
DIP_DEEP_SPY_W_ML = 1.00    # SPY weight for deep dip + ML bullish
# ── Algorithm ─────────────────────────────────────────────────────────────────
class HybridVolatilityHarvestFundamental(QCAlgorithm):
# ── Initialise ────────────────────────────────────────────────────────────
    def Initialize(self):
        """Set up brokerage, macro assets, the RF model, sleeve state,
        the fundamental universe, and daily/monthly schedules."""
        self.SetStartDate(2016, 1, 1)
        self.SetCash(100000)
        self.SetBrokerageModel(
            BrokerageName.InteractiveBrokersBrokerage,
            AccountType.Margin,
        )
        self.SetBenchmark("SPY")
        # Macro assets
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol
        self.vix = self.AddData(CBOE, "VIX", Resolution.Daily).Symbol
        self.vix3m = self.AddData(CBOE, "VIX3M", Resolution.Daily).Symbol
        self.hyg = self.AddEquity("HYG", Resolution.Daily).Symbol
        self.lqd = self.AddEquity("LQD", Resolution.Daily).Symbol
        self.rsp = self.AddEquity("RSP", Resolution.Daily).Symbol
        self.ief = self.AddEquity("IEF", Resolution.Daily).Symbol
        self.shy = self.AddEquity("SHY", Resolution.Daily).Symbol
        # ML model
        self.model = RandomForestClassifier(
            n_estimators=200,
            max_depth=6,
            min_samples_leaf=20,
            random_state=42,
        )
        self.scaler = StandardScaler()
        self.trained = False  # set True after a successful monthly fit
        # Macro weight state — written by CheckSignal, read by RebalanceSleeve
        self.spy_weight = 0.0
        self.gld_weight = 0.0
        self.cash_sleeve_weight = 0.0
        # Equity sleeve config
        self.MAX_POSITION_WEIGHT = 0.20
        self.MAX_POSITIONS = 10
        self.MIN_HISTORY_DAYS = 5
        self.MOMENTUM_LOOKBACK = 63   # ~one quarter of trading days
        self.MOMENTUM_MIN_RETURN = 0.0
        self._selected_symbols: List[Symbol] = []
        self._symbol_added_date: dict = {}
        self._momentum: dict = {}     # Symbol -> ROC indicator (see OnSecuritiesChanged)
        self.UniverseSettings.Resolution = Resolution.Daily
        self.UniverseSettings.DataNormalizationMode = DataNormalizationMode.Adjusted
        self.UniverseSettings.FillDataBeforeStart = True
        self.AddUniverse(self.FundamentalSelection)
        # Schedules
        # CheckSignal — every day, 30 min after open → updates weight state only
        # TrainModel — month start, 60 min after open → retrains RF
        # RebalanceSleeve — month start, 90 min after open → single atomic rebalance
        self.Schedule.On(
            self.DateRules.EveryDay(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 30),
            self.CheckSignal,
        )
        self.Schedule.On(
            self.DateRules.MonthStart(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 60),
            self.TrainModel,
        )
        self.Schedule.On(
            self.DateRules.MonthStart(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 90),
            self.RebalanceSleeve,
        )
        self.SetWarmUp(252)
# ── History helpers ───────────────────────────────────────────────────────
def _get_closes(self, symbol, n_bars, is_custom=False):
"""
Fetch close prices as a numpy array. Returns None on any failure.
Handles two DataFrame structures seen across QC environments:
Multi-index (symbol, time) → standard backtest format
Flat index with 'close' column → some live environments
is_custom=True fetches CBOE data via date-range History.
"""
try:
if is_custom:
end_dt = self.Time
start_dt = end_dt - timedelta(days=n_bars * 2)
df = self.History(CBOE, symbol, start_dt, end_dt, Resolution.Daily)
else:
df = self.History([symbol], n_bars, Resolution.Daily)
if df is None or df.empty:
return None
# Try multi-index access (standard)
if isinstance(df.index, pd.MultiIndex):
for key in (symbol, symbol.Value if hasattr(symbol, 'Value') else None):
if key is None:
continue
try:
closes = df.xs(key, level=0)['close'].values
if len(closes) > 0:
return closes
except Exception:
pass
# Flat DataFrame fallback
if 'close' in df.columns:
closes = df['close'].values
if len(closes) > 0:
return closes
self.Log(f"_get_closes: no closes extracted for {symbol}")
return None
except Exception as e:
self.Log(f"_get_closes error ({symbol}): {e}")
return None
def _get_cboe_closes(self, symbol, days=4000, min_bars=1):
"""Fetch CBOE custom data closes over a date range."""
try:
end_dt = self.Time
start_dt = end_dt - timedelta(days=days)
df = self.History(CBOE, symbol, start_dt, end_dt, Resolution.Daily)
if df is None or df.empty:
return None
closes = df['close'].values
return closes if len(closes) >= min_bars else None
except Exception as e:
self.Log(f"_get_cboe_closes error ({symbol}): {e}")
return None
# ── Fundamental helpers ───────────────────────────────────────────────────
def _get_float(self, f, paths: List[str]):
for p in paths:
try:
obj = f
for part in p.split('.'):
obj = getattr(obj, part)
if isinstance(obj, (float, int)) and np.isfinite(obj):
return float(obj)
if hasattr(obj, 'Value'):
val = obj.Value
if isinstance(val, (float, int)) and np.isfinite(val):
return float(val)
val = float(obj)
if np.isfinite(val):
return val
except:
continue
return float('nan')
def _is_finite(self, v) -> bool:
try:
return v is not None and np.isfinite(float(v))
except:
return False
# ── Fundamental universe ──────────────────────────────────────────────────
    def FundamentalSelection(self, fundamental: List[Fundamental]) -> List[Symbol]:
        """Fundamental value screen for the equity sleeve; returns up to 20 symbols.

        Pipeline:
          1. Liquidity gate: has fundamentals, price > $5, dollar volume > $10M.
          2. Keep the 1000 most liquid names by dollar volume.
          3. Value/quality gate (all four metrics must be finite):
             PE in [5, 18], debt/equity < 1.0, dividend yield > 1%, ROIC > 12%.
          4. Rank survivors by ROIC descending, keep the top 20.
        Stores the picks in self._selected_symbols as a side effect.
        """
        filtered = [
            f for f in fundamental
            if getattr(f, 'HasFundamentalData', False)
            and float(getattr(f, 'Price', 0)) > 5
            and getattr(f, 'DollarVolume', 0) > 10_000_000
        ]
        top1000 = sorted(filtered, key=lambda f: f.DollarVolume, reverse=True)[:1000]
        selected = []
        for f in top1000:
            # Multiple attribute paths per metric — field names differ across
            # QC/Morningstar versions; _get_float returns NaN when none resolve.
            pe = self._get_float(f, ["ValuationRatios.PERatio",
                                     "ValuationRatios.PriceEarningsRatio"])
            dte = self._get_float(f, ["OperationRatios.DebtToEquity",
                                      "OperationRatios.TotalDebtEquityRatio"])
            div_yield = self._get_float(f, ["ValuationRatios.TrailingDividendYield",
                                            "ValuationRatios.ForwardDividendYield"])
            roi = self._get_float(f, ["OperationRatios.ROIC",
                                      "ProfitabilityRatios.ROIC",
                                      "ProfitabilityRatios.ReturnOnInvestedCapital",
                                      "ProfitabilityRatios.ReturnOnInvestment"])
            if not all(self._is_finite(v) for v in [pe, dte, div_yield, roi]):
                continue
            if pe < 5 or pe > 18: continue
            if dte >= 1.0: continue
            if div_yield <= 0.01: continue
            if roi <= 0.12: continue
            selected.append((f.Symbol, float(roi)))
        symbols = [x[0] for x in sorted(selected, key=lambda x: x[1], reverse=True)[:20]]
        self._selected_symbols = symbols
        return symbols
# ── Securities changed ────────────────────────────────────────────────────
def OnSecuritiesChanged(self, changes: SecurityChanges):
macro_symbols = {self.spy, self.gld, self.hyg, self.lqd,
self.rsp, self.ief, self.shy}
for sec in changes.RemovedSecurities:
if sec.Symbol in macro_symbols:
continue
self._momentum.pop(sec.Symbol, None)
self._symbol_added_date.pop(sec.Symbol, None)
for sec in changes.AddedSecurities:
if sec.Symbol in macro_symbols:
continue
self._symbol_added_date[sec.Symbol] = self.Time
self._momentum[sec.Symbol] = self.ROC(
sec.Symbol, self.MOMENTUM_LOOKBACK, Resolution.Daily
)
# ── Feature engineering ───────────────────────────────────────────────────
    def GetFeatures(self, vix_c, spy_c,
                    vix3m_closes=None, hyg_closes=None, lqd_closes=None,
                    rsp_closes=None, ief_closes=None, shy_closes=None):
        """Build the 24-element feature vector for the RF model.

        Returns None when core series are too short or any computation
        raises. Auxiliary feature groups degrade to 0.0 when their series
        are missing or short. Element order must stay in sync with the
        `names` list in _TrainModelInner.
        """
        if len(vix_c) < MIN_VIX_BARS or len(spy_c) < MIN_SPY_BARS:
            return None
        try:
            # VIX level / trend
            cv = vix_c[-1]
            vix_sma20 = np.mean(vix_c[-20:])
            vix_sma50 = np.mean(vix_c[-50:])
            vix_std = np.std(vix_c[-20:])
            vix_zscore = (cv - vix_sma20) / vix_std if vix_std > 0 else 0.0
            vix_pct_rank = float(np.sum(vix_c < cv)) / len(vix_c)
            # SPY trend / momentum / realized vol
            sc = spy_c[-1]
            spy_sma50 = np.mean(spy_c[-50:])
            spy_sma200 = np.mean(spy_c[-200:])
            spy_5d = spy_c[-1] / spy_c[-5] - 1
            spy_10d = spy_c[-1] / spy_c[-10] - 1
            spy_20d = spy_c[-1] / spy_c[-20] - 1
            # std of the last 20 daily returns (annualized in the output)
            spy_vol = np.std(np.diff(spy_c[-21:]) / spy_c[-21:-1])
            spy_60d = spy_c[-1] / spy_c[-60] - 1
            spy_120d = spy_c[-1] / spy_c[-120] - 1
            spy_252d = spy_c[-1] / spy_c[-252] - 1
            # VIX term structure (spot vs 3-month)
            if vix3m_closes is not None and len(vix3m_closes) >= 5 and vix3m_closes[-1] > 0:
                vix_tr = cv / vix3m_closes[-1]
                vix_t5 = (cv / vix_c[-5]) - (vix3m_closes[-1] / vix3m_closes[-5])
            else:
                vix_tr = vix_t5 = 0.0
            # Credit spread proxy (HYG vs LQD)
            if (hyg_closes is not None and lqd_closes is not None
                    and len(hyg_closes) >= MIN_AUX_BARS and len(lqd_closes) >= MIN_AUX_BARS
                    and lqd_closes[-1] > 0):
                cr_r = hyg_closes[-1] / lqd_closes[-1]
                cr_5 = (hyg_closes[-1] / hyg_closes[-5]) - (lqd_closes[-1] / lqd_closes[-5])
                cr_20 = (hyg_closes[-1] / hyg_closes[-20]) - (lqd_closes[-1] / lqd_closes[-20])
            else:
                cr_r = cr_5 = cr_20 = 0.0
            # Breadth (equal-weight RSP vs cap-weight SPY)
            if rsp_closes is not None and len(rsp_closes) >= MIN_AUX_BARS and sc > 0:
                br_r = rsp_closes[-1] / sc
                br_5 = (rsp_closes[-1] / rsp_closes[-5]) - (spy_c[-1] / spy_c[-5])
                br_20 = (rsp_closes[-1] / rsp_closes[-20]) - (spy_c[-1] / spy_c[-20])
            else:
                br_r = br_5 = br_20 = 0.0
            # Yield-curve proxy (IEF vs SHY relative moves)
            if (ief_closes is not None and shy_closes is not None
                    and len(ief_closes) >= MIN_AUX_BARS and len(shy_closes) >= MIN_AUX_BARS):
                cu_20 = (ief_closes[-1] / ief_closes[-20]) - (shy_closes[-1] / shy_closes[-20])
                cu_60 = (ief_closes[-1] / ief_closes[-60]) - (shy_closes[-1] / shy_closes[-60])
            else:
                cu_20 = cu_60 = 0.0
            return [
                cv, vix_zscore, vix_pct_rank, cv / vix_sma20, cv / vix_sma50,
                spy_5d, spy_10d, spy_20d, sc / spy_sma50, sc / spy_sma200,
                spy_vol * np.sqrt(252),
                spy_60d, spy_120d, spy_252d,
                vix_tr, vix_t5,
                cr_r, cr_5, cr_20,
                br_r, br_5, br_20,
                cu_20, cu_60,
            ]
        except Exception as e:
            self.Log(f"GetFeatures error: {e}")
            return None
# ── Model training ────────────────────────────────────────────────────────
def TrainModel(self):
if self.IsWarmingUp:
return
try:
self._TrainModelInner()
except Exception as e:
self.Log(f"TrainModel unhandled error: {e}")
def _TrainModelInner(self):
end_dt = self.Time
start_dt = end_dt - timedelta(days=4000)
vix_c = self._get_cboe_closes(self.vix, 4000, MIN_VIX_BARS)
spy_c = self._get_closes(self.spy, 4000)
if vix_c is None or spy_c is None:
self.Log("TrainModel: missing core history, skipping.")
return
self.Log(f"TrainModel | SPY {spy_c[0]:.2f}→{spy_c[-1]:.2f} "
f"bars={len(spy_c)} date={end_dt.date()}")
vix3m_c = self._get_cboe_closes(self.vix3m, 4000, 5)
hyg_c = self._get_closes(self.hyg, 4000)
lqd_c = self._get_closes(self.lqd, 4000)
rsp_c = self._get_closes(self.rsp, 4000)
ief_c = self._get_closes(self.ief, 4000)
shy_c = self._get_closes(self.shy, 4000)
label_cutoff = len(spy_c) - LABEL_HORIZON - SAFETY_BUFFER
if label_cutoff < MIN_SPY_BARS + MIN_TRAIN_ROWS:
self.Log("TrainModel: insufficient data.")
return
train_end = label_cutoff - TRAIN_VAL_GAP
if train_end - MIN_SPY_BARS < MIN_TRAIN_ROWS:
self.Log("TrainModel: window too small.")
return
indices = list(range(MIN_SPY_BARS, label_cutoff))
fwd_rets = [spy_c[i + LABEL_HORIZON] / spy_c[i] - 1 for i in indices]
median_r = np.median(fwd_rets)
X_all, y_all = [], []
for idx, i in enumerate(indices):
f = self.GetFeatures(
vix_c[:i], spy_c[:i],
vix3m_closes = vix3m_c[:i] if vix3m_c is not None else None,
hyg_closes = hyg_c[:i] if hyg_c is not None else None,
lqd_closes = lqd_c[:i] if lqd_c is not None else None,
rsp_closes = rsp_c[:i] if rsp_c is not None else None,
ief_closes = ief_c[:i] if ief_c is not None else None,
shy_closes = shy_c[:i] if shy_c is not None else None,
)
if f is not None:
X_all.append(f)
y_all.append(1 if fwd_rets[idx] > median_r else 0)
if len(X_all) < MIN_TRAIN_ROWS + 20:
self.Log(f"TrainModel: too few samples ({len(X_all)}).")
return
X_all = np.array(X_all)
y_all = np.array(y_all)
class_1_rate = float(np.mean(y_all))
if class_1_rate > 0.95 or class_1_rate < 0.05:
self.Log(f"TrainModel: degenerate labels ({class_1_rate:.3f}), disabling ML.")
self.trained = False
return
split = train_end - MIN_SPY_BARS
X_tr, y_tr = X_all[:split], y_all[:split]
X_va, y_va = X_all[split:], y_all[split:]
if len(X_tr) < MIN_TRAIN_ROWS:
self.Log("TrainModel: not enough training rows.")
return
self.scaler.fit(X_tr)
self.model.fit(self.scaler.transform(X_tr), y_tr)
self.trained = True
if len(X_va) > 0:
val_acc = self.model.score(self.scaler.transform(X_va), y_va)
baseline = float(np.mean(y_va))
self.Log(f"TrainModel | train={len(X_tr)} val={len(X_va)} "
f"val_acc={val_acc:.3f} baseline={baseline:.3f} "
f"edge={val_acc - baseline:+.3f} "
f"label_rate={class_1_rate:.3f} median_fwd={median_r:.4f}")
names = [
"vix_level","vix_zscore","vix_pct_rank","vix_vs_sma20","vix_vs_sma50",
"spy_5d","spy_10d","spy_20d","spy_vs_sma50","spy_vs_sma200","spy_vol",
"spy_60d","spy_120d","spy_252d",
"vix_term_ratio","vix_term_5d_chg",
"credit_ratio","credit_5d_chg","credit_20d_chg",
"breadth_ratio","breadth_5d","breadth_20d",
"curve_slope_20d","curve_slope_60d",
]
top = sorted(zip(names, self.model.feature_importances_),
key=lambda x: -x[1])[:7]
self.Log("Features: " + " | ".join(f"{n}={v:.3f}" for n, v in top))
# ── Macro signal — updates weight state only, no SetHoldings ─────────────
def CheckSignal(self):
if self.IsWarmingUp:
return
try:
self._CheckSignalInner()
except Exception as e:
self.Log(f"CheckSignal unhandled error: {e}")
def _CheckSignalInner(self):
    """Compute the macro regime and apply SPY/GLD weights for today.

    Pipeline:
      1. Pull core (SPY, VIX) and auxiliary close histories.
      2. Derive VIX/SPY regime indicators.
      3. If the ML model is trained, ask it for a conviction boost.
      4. Pick one of six mutually exclusive regime rules (first match
         wins) and store spy/gld/sleeve weights on ``self``.
      5. Apply SPY/GLD immediately; the sleeve itself is only traded by
         ``RebalanceSleeve`` (month-start) or liquidated here when off.
    """
    # Core history via hardened helper
    spy_c = self._get_closes(self.spy, 270)
    vix_c = self._get_closes(self.vix, 100, is_custom=True)
    # Either series missing entirely → skip the whole signal update;
    # yesterday's stored weights remain in force.
    if spy_c is None or vix_c is None:
        self.Log("CheckSignal: missing core history, skipping.")
        return
    if len(vix_c) < MIN_VIX_BARS or len(spy_c) < MIN_SPY_BARS:
        self.Log(f"CheckSignal: insufficient bars (vix={len(vix_c)} spy={len(spy_c)}), skipping.")
        return
    # Auxiliary series — any of these may come back None; GetFeatures
    # is called with None for missing series during training too, so it
    # presumably tolerates them (TODO confirm against GetFeatures).
    vix3m_c = self._get_closes(self.vix3m, 10, is_custom=True)
    hyg_c = self._get_closes(self.hyg, MIN_AUX_BARS)
    lqd_c = self._get_closes(self.lqd, MIN_AUX_BARS)
    rsp_c = self._get_closes(self.rsp, MIN_AUX_BARS)
    ief_c = self._get_closes(self.ief, MIN_AUX_BARS)
    shy_c = self._get_closes(self.shy, MIN_AUX_BARS)
    # Regime indicators
    current_vix = vix_c[-1]
    vix_sma = np.mean(vix_c[-20:])          # 20-day VIX average (rising/falling test)
    vix_level_80 = np.percentile(vix_c, 80)  # "spike" threshold: top quintile of lookback
    spy_current = spy_c[-1]
    spy_sma50 = np.mean(spy_c[-50:])
    spy_sma200 = np.mean(spy_c[-200:])
    # NOTE(review): [-1]/[-5] spans 4 trading-day intervals; named "5d"
    # by convention here — confirm it matches GetFeatures' convention.
    spy_5d_ret = spy_c[-1] / spy_c[-5] - 1
    spy_10d_ret = spy_c[-1] / spy_c[-10] - 1
    # ML signal — ml_bullish stays False unless the model is trained,
    # features are available, AND P(class 1) clears ML_THRESHOLD.
    ml_bullish = False
    if self.trained:
        feats = self.GetFeatures(
            vix_c, spy_c,
            vix3m_closes=vix3m_c,
            hyg_closes=hyg_c, lqd_closes=lqd_c,
            rsp_closes=rsp_c, ief_closes=ief_c, shy_closes=shy_c,
        )
        if feats is not None:
            try:
                proba = self.model.predict_proba(
                    self.scaler.transform([feats]))[0]
                # Degenerate single-class model → neutral 0.5, never bullish.
                prob = proba[1] if len(proba) == 2 else 0.5
                ml_bullish = prob > ML_THRESHOLD
            except Exception as e:
                # Prediction failure degrades gracefully to rules-only.
                self.Log(f"ML predict error: {e}")
    # ── Determine regime weights ──────────────────────────────────────────
    # Exactly one rule fires (if/elif chain, first match wins).
    spy_w = gld_w = sleeve_w = 0.0
    # Rule 1 — VIX spike + oversold → full dip entry, no sleeve
    if current_vix > vix_level_80 and spy_5d_ret < -0.03:
        # Deep vs shallow dip decides size; ML adds a conviction boost.
        if spy_10d_ret <= DIP_DEEP_THRESHOLD:
            spy_w = DIP_DEEP_SPY_W_ML if ml_bullish else DIP_DEEP_SPY_W
        else:
            spy_w = DIP_SHALLOW_SPY_W_ML if ml_bullish else DIP_SHALLOW_SPY_W
        gld_w = max(0.0, 1.0 - spy_w)  # remainder hedged in gold, fully invested
        sleeve_w = 0.0
        self.Log(f"R1-dip | vix={current_vix:.1f} 5d={spy_5d_ret:.3f} "
                 f"10d={spy_10d_ret:.3f} spy={spy_w:.2f} ml={ml_bullish}")
    # Rule 2 — VIX very low + SPY extended → reduce risk, sleeve active
    elif current_vix < 13 and spy_current > spy_sma50 * 1.05:
        spy_w = 0.40
        gld_w = 0.20
        sleeve_w = 0.40
    # Rule 3 — VIX elevated but falling → recovery + sleeve
    elif 20 < current_vix < vix_sma:
        spy_w = 0.85 if ml_bullish else 0.70
        gld_w = 0.10
        sleeve_w = max(0.0, 1.0 - spy_w - gld_w)
    # Rule 4 — VIX rising sharply → defensive, no sleeve
    elif current_vix > vix_sma * 1.2:
        spy_w = 0.30
        gld_w = 0.20
        sleeve_w = 0.0
    # Rule 5 — Above 200MA → trend following + sleeve
    elif spy_current > spy_sma200:
        spy_w = 0.70 if ml_bullish else 0.60
        gld_w = 0.15
        sleeve_w = max(0.0, 1.0 - spy_w - gld_w)
    # Rule 6 — Below 200MA → defensive, no sleeve
    else:
        spy_w = 0.30
        gld_w = 0.20
        sleeve_w = 0.0
    # Store state for RebalanceSleeve (runs 60 min later on month-start)
    self.spy_weight = spy_w
    self.gld_weight = gld_w
    self.cash_sleeve_weight = max(0.0, min(1.0, sleeve_w))  # clamp to [0, 1]
    # ── Daily execution ───────────────────────────────────────────────────
    # On non-month-start days, RebalanceSleeve does not fire.
    # Apply macro weights directly and liquidate sleeve if it's off.
    # On month-start days, RebalanceSleeve fires 60 min later and handles
    # everything atomically — we still apply macro here immediately.
    self.SetHoldings(self.spy, self.spy_weight)
    self.SetHoldings(self.gld, self.gld_weight)
    # Exact float == 0.0 is safe: sleeve_w is either the literal 0.0 or
    # a max() that only reaches 0.0 exactly.
    if self.cash_sleeve_weight == 0.0:
        self._liquidate_sleeve()
# ── Sleeve helpers ────────────────────────────────────────────────────────
def _liquidate_sleeve(self):
"""Close all sleeve equity positions."""
for sym in list(self._selected_symbols):
if sym in self.Securities and self.Portfolio[sym].Invested:
self.Liquidate(sym)
def RebalanceSleeve(self):
    """Monthly atomic rebalance entry point.

    Scheduled at MonthStart + 90 min, after CheckSignal (+30 min) and
    TrainModel (+60 min) have both run. Delegates to
    ``_RebalanceSleeveInner``, which consolidates SPY, GLD, and sleeve
    weights into one PortfolioTarget list so they are set atomically.
    Exceptions are absorbed at this boundary and logged.
    """
    if not self.IsWarmingUp:
        try:
            self._RebalanceSleeveInner()
        except Exception as err:
            self.Log(f"RebalanceSleeve unhandled error: {err}")
def _RebalanceSleeveInner(self):
if self.cash_sleeve_weight <= 0 or not self._selected_symbols:
self._liquidate_sleeve()
# Ensure macro weights are still applied
self.SetHoldings(self.spy, self.spy_weight)
self.SetHoldings(self.gld, self.gld_weight)
self.Log(f"RebalanceSleeve | sleeve OFF | "
f"spy={self.spy_weight:.2f} gld={self.gld_weight:.2f}")
return
now = self.Time
candidates = []
for sym in self._selected_symbols:
if sym not in self.Securities:
continue
sec = self.Securities[sym]
if not sec.HasData or sec.Price <= 0 or not sec.IsTradable:
continue
added = self._symbol_added_date.get(sym)
if added is not None and (now - added).days < self.MIN_HISTORY_DAYS:
continue
roc = self._momentum.get(sym)
if roc is None or not roc.IsReady:
continue
if float(roc.Current.Value) < self.MOMENTUM_MIN_RETURN:
continue
candidates.append((sym, float(roc.Current.Value)))
# Sort by momentum descending, cap at MAX_POSITIONS
candidates = sorted(candidates, key=lambda x: -x[1])[:self.MAX_POSITIONS]
if not candidates:
self._liquidate_sleeve()
self.SetHoldings(self.spy, self.spy_weight)
self.SetHoldings(self.gld, self.gld_weight)
self.Log("RebalanceSleeve | no valid candidates, sleeve liquidated.")
return
n = len(candidates)
base_w = min(1.0 / n, self.MAX_POSITION_WEIGHT)
scaled_w = base_w * self.cash_sleeve_weight
# Log sleeve composition for monitoring
tickers = [sym.Value for sym, _ in candidates]
self.Log(f"RebalanceSleeve | n={n} sleeve_w={self.cash_sleeve_weight:.2f} "
f"per_stock={scaled_w:.3f} | {' '.join(tickers[:10])}")
# Build single atomic target list
targets = [PortfolioTarget(sym, scaled_w) for sym, _ in candidates]
targets.append(PortfolioTarget(self.spy, self.spy_weight))
targets.append(PortfolioTarget(self.gld, self.gld_weight))
self.SetHoldings(targets)
def OnData(self, data: Slice):
    """Intentionally empty: all trading is driven by scheduled events."""
# ─────────────────────────────────────────────────────────────────────────────
# CBOE custom data reader
# ─────────────────────────────────────────────────────────────────────────────
class CBOE(PythonData):
    """Custom PythonData reader for CBOE daily index history CSVs.

    Serves VIX and VIX3M. Each data row is DATE,OPEN,HIGH,LOW,CLOSE with
    a MM/DD/YYYY date; ``Value`` is set to the close.
    """

    # Ticker → history CSV endpoint. Built once at class level instead of
    # per GetSource call. Unknown tickers fall back to the VIX file.
    _URLS = {
        "VIX": "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv",
        "VIX3M": "https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX3M_History.csv",
    }

    def GetSource(self, config, date, isLive):
        """Return the remote-file subscription source for this symbol's CSV."""
        url = self._URLS.get(config.Symbol.Value, self._URLS["VIX"])
        return SubscriptionDataSource(url, SubscriptionTransportMedium.RemoteFile)

    def Reader(self, config, line, date, isLive):
        """Parse one CSV row into a CBOE point; return None for non-data rows.

        Header/blank lines are detected by the first character not being a
        digit (data rows start with the month). Malformed rows are dropped.
        """
        if not (line.strip() and line[0].isdigit()):
            return None
        cols = line.split(',')
        try:
            obj = CBOE()
            obj.Symbol = config.Symbol
            obj.Time = datetime.strptime(cols[0], "%m/%d/%Y")
            close = float(cols[4])  # parse once; reused for Value and "close"
            obj.Value = close
            obj["close"] = close
            obj["open"] = float(cols[1])
            obj["high"] = float(cols[2])
            obj["low"] = float(cols[3])
            return obj
        except (ValueError, IndexError):
            # Bad date/number or too few columns — skip just this row.
            # (Was a bare except, which also swallowed system exceptions.)
            return None