| Overall Statistics |
|
Total Orders 5556 Average Win 0.38% Average Loss -0.40% Compounding Annual Return 61.711% Drawdown 16.700% Expectancy 0.496 Start Equity 100000 End Equity 20735608.86 Net Profit 20635.609% Sharpe Ratio 2.04 Sortino Ratio 2.478 Probabilistic Sharpe Ratio 99.985% Loss Rate 23% Win Rate 77% Profit-Loss Ratio 0.94 Alpha 0.343 Beta 0.658 Annual Standard Deviation 0.193 Annual Variance 0.037 Information Ratio 1.813 Tracking Error 0.174 Treynor Ratio 0.598 Total Fees $73109.22 Estimated Strategy Capacity $0 Lowest Capacity Asset AAPL R735QTJ8XC9X Portfolio Turnover 12.08% Drawdown Recovery 291 |
from AlgorithmImports import *
from datetime import datetime
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
class VolatilityHarvestML_LongShort(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2015, 1, 1)
self.SetCash(100000)
self.SetBrokerageModel(BrokerageName.InteractiveBrokersBrokerage, AccountType.Margin)
self.Settings.FreePortfolioValuePercentage = 0.025
self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
self.SetBenchmark(self.spy)
self.gld = self.AddEquity("GLD", Resolution.Daily).Symbol
self.vix = self.AddData(self.CBOE, "VIX", Resolution.Daily).Symbol
self.long_gross = float(self.GetParameter("long_gross") or 0.9)
self.short_gross = float(self.GetParameter("short_gross") or 0.6)
self.UniverseSettings.Resolution = Resolution.Daily
self._top_set = set()
self._last_top_month = -1
self.ml_tilt = float(self.GetParameter("ml_tilt") or 0.25)
self.top_weight_max = float(self.GetParameter("top_weight_max") or 0.35)
self.top_weight_min = float(self.GetParameter("top_weight_min") or 0.0)
self.coarse_count = int(self.GetParameter("coarse_count") or 2000)
self.max_universe = int(self.GetParameter("max_universe") or 150)
self.top_n = int(self.GetParameter("top_n") or 1)
self.min_ipo_days = int(self.GetParameter("min_ipo_days") or 365)
self.lookback_bars = int(self.GetParameter("lookback_bars") or 260)
self.n_list = [10, 10, 40, 60, 90, 100]
self.sma_len = int(self.GetParameter("sma_len") or 195)
self.ext_k = float(self.GetParameter("ext_k") or 2.0)
self.mom_k = float(self.GetParameter("mom_k") or 1.75)
self.score_threshold = float(self.GetParameter("score_threshold") or 0.85)
self.stop_atr = float(self.GetParameter("stop_atr") or 2.0)
self._active = []
self._entry = {}
self.long_trail_1 = float(self.GetParameter("long_trail_1") or 0.095)
self.long_trail_2 = float(self.GetParameter("long_trail_2") or 0.07)
self.long_trail_3 = float(self.GetParameter("long_trail_3") or 0.0485)
self._long_trail = {}
self.model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
self.scaler = StandardScaler()
self.trained = False
self.min_training = 504
self.AddUniverse(self.CoarseSelection, self.FineSelection)
self.Schedule.On(
self.DateRules.EveryDay("SPY"),
self.TimeRules.AfterMarketOpen("SPY", 30),
self.CheckSignal_Long
)
self.Schedule.On(
self.DateRules.MonthStart("SPY"),
self.TimeRules.AfterMarketOpen("SPY", 60),
self.TrainModel
)
self.Schedule.On(
self.DateRules.Every(DayOfWeek.Monday),
self.TimeRules.AfterMarketOpen(self.spy, 30),
self.Rebalance_Short
)
self.Schedule.On(
self.DateRules.EveryDay(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 160),
self.RiskCheck_Short
)
self.Schedule.On(
self.DateRules.EveryDay(self.spy),
self.TimeRules.AfterMarketOpen(self.spy, 90),
self.RiskCheck_Long
)
self.SetWarmUp(252)
def _safe_set_holdings(self, symbol, target_weight):
pv = float(self.Portfolio.TotalPortfolioValue)
if pv <= 0:
return
mr = float(self.Portfolio.MarginRemaining)
max_abs = max(0.0, mr / pv)
w = float(np.clip(float(target_weight), -max_abs, max_abs))
self.SetHoldings(symbol, w)
def CoarseSelection(self, coarse):
filtered = [
c for c in coarse
if c.HasFundamentalData
and c.Price is not None and c.Price > 5
and c.DollarVolume is not None and c.DollarVolume > 2e7
]
filtered.sort(key=lambda c: c.DollarVolume, reverse=True)
return [c.Symbol for c in filtered[:self.coarse_count]]
def FineSelection(self, fine):
today = self.Time.date()
if self.Time.month != self._last_top_month:
fine_mc = [f for f in fine if f.MarketCap and f.MarketCap > 0]
fine_mc.sort(key=lambda f: f.MarketCap, reverse=True)
self._top_set = set([f.Symbol for f in fine_mc[:4]])
self._last_top_month = self.Time.month
kept = []
for f in fine:
if not f.MarketCap or f.MarketCap < 1_000_000_000:
continue
sr = f.SecurityReference
if sr is None or sr.IPODate is None:
continue
days_since_ipo = (today - sr.IPODate.date()).days
if days_since_ipo < self.min_ipo_days:
continue
kept.append(f)
kept.sort(key=lambda f: f.DollarVolume, reverse=True)
short_symbols = [f.Symbol for f in kept[:self.max_universe]]
self._active = short_symbols
return list(set(short_symbols) | set(self._top_set))
def _cap_and_renormalize(self, weights, total, wmin, wmax):
w = np.array(weights, dtype=float)
n = len(w)
if n == 0:
return w
if wmin > 0:
w = np.maximum(w, wmin)
if wmax > 0:
w = np.minimum(w, wmax)
target = float(total)
for _ in range(10):
s = float(np.sum(w))
diff = target - s
if abs(diff) < 1e-8:
break
if diff > 0:
adjustable = [i for i in range(n) if (wmax <= 0 or w[i] < wmax - 1e-12)]
else:
adjustable = [i for i in range(n) if (wmin <= 0 or w[i] > wmin + 1e-12)]
if not adjustable:
break
incr = diff / float(len(adjustable))
for i in adjustable:
w[i] += incr
if wmin > 0:
w = np.maximum(w, wmin)
if wmax > 0:
w = np.minimum(w, wmax)
return w
def _pick_overweight_symbol(self, syms):
best = syms[0]
best_cap = -1.0
for sym in syms:
cap_val = -1.0
sec = self.Securities[sym] if self.Securities.ContainsKey(sym) else None
if sec is not None and sec.Fundamentals is not None:
cap = sec.Fundamentals.MarketCap
if cap and cap > 0:
cap_val = float(cap)
if cap_val > best_cap:
best_cap = cap_val
best = sym
return best
def _ensure_long_trail_state(self, sym, target_w):
if not self.Securities.ContainsKey(sym):
return
px = float(self.Securities[sym].Price)
if px <= 0:
return
st = self._long_trail.get(sym)
if st is None:
self._long_trail[sym] = {"high": px, "stage": 0, "target_w": float(target_w)}
else:
st["target_w"] = float(target_w)
def AllocateTop(self, total_weight, ml_bullish=False):
syms = list(self._top_set)
if not syms:
return
TW = float(total_weight)
if TW <= 0:
for sym in syms:
if self.Portfolio[sym].Invested and self.Portfolio[sym].Quantity > 0:
self.Liquidate(sym)
self._long_trail.pop(sym, None)
return
n = len(syms)
base = TW / float(n)
weights = np.array([base] * n, dtype=float)
if ml_bullish and self.ml_tilt > 0 and n >= 2:
ow = self._pick_overweight_symbol(syms)
i_ow = syms.index(ow)
extra = base * float(self.ml_tilt)
weights[i_ow] += extra
sub = extra / float(n - 1)
for i in range(n):
if i != i_ow:
weights[i] -= sub
weights = self._cap_and_renormalize(weights, TW, self.top_weight_min, self.top_weight_max)
for i, sym in enumerate(syms):
w = float(weights[i])
self._safe_set_holdings(sym, w)
self._ensure_long_trail_state(sym, w)
if self.Portfolio[self.spy].Invested:
self.Liquidate(self.spy)
def LiquidateNonTopLongsOnly(self):
for kvp in self.Portfolio:
sym = kvp.Key
if sym.SecurityType != SecurityType.Equity:
continue
if sym in (self.spy, self.gld):
continue
holding = kvp.Value
if holding.Invested and holding.Quantity > 0 and sym not in self._top_set:
self.Liquidate(sym)
self._long_trail.pop(sym, None)
def GetFeatures(self, vix_closes, spy_closes):
if len(vix_closes) < 50 or len(spy_closes) < 200:
return None
current_vix = vix_closes[-1]
vix_sma20 = np.mean(vix_closes[-20:])
vix_sma50 = np.mean(vix_closes[-50:])
vix_std = np.std(vix_closes[-20:])
vix_zscore = (current_vix - vix_sma20) / vix_std if vix_std > 0 else 0.0
vix_percentile = float(np.sum(vix_closes < current_vix)) / float(len(vix_closes))
spy_current = spy_closes[-1]
spy_sma50 = np.mean(spy_closes[-50:])
spy_sma200 = np.mean(spy_closes[-200:])
spy_5d_ret = spy_closes[-1] / spy_closes[-5] - 1
spy_10d_ret = spy_closes[-1] / spy_closes[-10] - 1
spy_20d_ret = spy_closes[-1] / spy_closes[-20] - 1
spy_vol = np.std(np.diff(spy_closes[-21:]) / spy_closes[-21:-1])
return [
float(current_vix),
float(vix_zscore),
float(vix_percentile),
float(current_vix / vix_sma20) if vix_sma20 != 0 else 1.0,
float(current_vix / vix_sma50) if vix_sma50 != 0 else 1.0,
float(spy_5d_ret),
float(spy_10d_ret),
float(spy_20d_ret),
float(spy_current / spy_sma50) if spy_sma50 != 0 else 1.0,
float(spy_current / spy_sma200) if spy_sma200 != 0 else 1.0,
float(spy_vol * np.sqrt(252)),
]
def TrainModel(self):
if self.IsWarmingUp:
return
vix_hist = self.History([self.vix], 800, Resolution.Daily)
spy_hist = self.History([self.spy], 800, Resolution.Daily)
if vix_hist.empty or spy_hist.empty:
return
try:
vix_closes = vix_hist.loc[self.vix]["close"].values
spy_closes = spy_hist.loc[self.spy]["close"].values
except:
return
if len(vix_closes) < self.min_training or len(spy_closes) < self.min_training:
return
X, y = [], []
for i in range(200, len(spy_closes) - 21):
feats = self.GetFeatures(vix_closes[:i], spy_closes[:i])
if feats is None:
continue
label = 1 if spy_closes[i + 21] / spy_closes[i] > 0.02 else 0
X.append(feats)
y.append(label)
if len(X) < 100:
return
X = np.array(X)
y = np.array(y)
self.scaler.fit(X)
Xs = self.scaler.transform(X)
self.model.fit(Xs, y)
self.trained = True
class CBOE(PythonData):
def GetSource(self, config, date, isLive):
return SubscriptionDataSource(
"https://cdn.cboe.com/api/global/us_indices/daily_prices/VIX_History.csv",
SubscriptionTransportMedium.RemoteFile
)
def Reader(self, config, line, date, isLive):
if not (line.strip() and line[0].isdigit()):
return None
data = line.split(',')
try:
bar = VolatilityHarvestML_LongShort.CBOE()
bar.Symbol = config.Symbol
bar.Time = datetime.strptime(data[0], "%m/%d/%Y")
bar.Value = float(data[4])
bar["close"] = float(data[4])
bar["open"] = float(data[1])
bar["high"] = float(data[2])
bar["low"] = float(data[3])
return bar
except:
return None
def CheckSignal_Long(self):
if self.IsWarmingUp:
return
self.LiquidateNonTopLongsOnly()
vix_hist = self.History([self.vix], 100, Resolution.Daily)
spy_hist = self.History([self.spy], 200, Resolution.Daily)
if vix_hist.empty or spy_hist.empty:
return
try:
vix_closes = vix_hist.loc[self.vix]["close"].values
spy_closes = spy_hist.loc[self.spy]["close"].values
except:
return
if len(vix_closes) < 50 or len(spy_closes) < 200:
return
current_vix = float(vix_closes[-1])
vix_sma = float(np.mean(vix_closes[-20:]))
vix_p80 = float(np.percentile(vix_closes, 80))
spy_current = float(spy_closes[-1])
spy_sma50 = float(np.mean(spy_closes[-50:]))
spy_sma200 = float(np.mean(spy_closes[-200:]))
spy_5d_ret = float(spy_closes[-1] / spy_closes[-5] - 1)
ml_bullish = False
if self.trained:
feats = self.GetFeatures(vix_closes, spy_closes)
if feats is not None:
X = self.scaler.transform([feats])
prob_array = self.model.predict_proba(X)[0]
if len(prob_array) == 2:
prob = float(prob_array[1])
else:
prob = 0.7 if self.model.predict(X)[0] == 1 else 0.5
ml_bullish = prob > 0.6
LG = float(self.long_gross)
if current_vix > vix_p80 and spy_5d_ret < -0.03:
weight = 1.0 if ml_bullish else 0.85
eq_w = LG * weight
self.AllocateTop(eq_w, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * (1.0 - weight))
return
if current_vix < 13 and spy_current > spy_sma50 * 1.05:
self.AllocateTop(LG * 0.40, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * 0.40)
return
if 20 < current_vix < vix_sma:
weight = 0.85 if ml_bullish else 0.70
eq_w = LG * weight
self.AllocateTop(eq_w, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * (1.0 - weight))
return
if current_vix > vix_sma * 1.2:
self.AllocateTop(0.0, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * 0.50)
return
if spy_current > spy_sma200:
base = 0.90 if ml_bullish else 0.70
self.AllocateTop(LG * base, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * (1.0 - base))
else:
self.AllocateTop(LG * 0.30, ml_bullish=ml_bullish)
self._safe_set_holdings(self.gld, LG * 0.50)
def RiskCheck_Long(self):
if self.IsWarmingUp:
return
for sym in list(self._long_trail.keys()):
if not self.Securities.ContainsKey(sym) or not self.Portfolio[sym].Invested or self.Portfolio[sym].Quantity <= 0:
self._long_trail.pop(sym, None)
continue
if sym not in self._top_set:
continue
px = float(self.Securities[sym].Price)
if px <= 0:
continue
st = self._long_trail.get(sym)
if st is None:
continue
if px > float(st["high"]):
st["high"] = px
high = float(st["high"])
if high <= 0:
continue
dd = (high - px) / high
stage = int(st["stage"])
full_target_w = float(st["target_w"])
if stage == 0 and dd >= self.long_trail_1:
new_w = full_target_w * (2.0 / 3.0)
self._safe_set_holdings(sym, new_w)
st["stage"] = 1
st["high"] = px
elif stage == 1 and dd >= self.long_trail_2:
new_w = full_target_w * (1.0 / 3.0)
self._safe_set_holdings(sym, new_w)
st["stage"] = 2
st["high"] = px
elif stage == 2 and dd >= self.long_trail_3:
self.Liquidate(sym)
self._long_trail.pop(sym, None)
def _atr(self, df, n):
w = df.shape[0]
if w < n + 1:
return None
s = 0.0
for i in range(1, n + 1):
hi = float(df["high"].iloc[w - i])
lo = float(df["low"].iloc[w - i])
cl = float(df["close"].iloc[w - i])
s += max(hi - lo, abs(hi - cl), abs(lo - cl))
return s / float(n)
def _hurst_like(self, df, n, bump):
atr = self._atr(df, n)
if atr is None or atr <= 0:
return None
high_max = float(df["high"].tail(n).max())
low_min = float(df["low"].tail(n).min())
span = high_max - low_min
if span <= 0:
return None
h = (np.log(span) - np.log(atr)) / np.log(float(n))
if h > 0.45:
h += bump
elif h < 0.45:
h -= bump
return float(h)
def _compute_score_and_filters(self, symbol):
df = self.History(symbol, self.lookback_bars, Resolution.Daily)
if df is None or df.empty:
return None
if isinstance(df.index, pd.MultiIndex):
df = df.xs(symbol)
if len(df) < max(self.n_list) + 6:
return None
hvals = []
for n in self.n_list:
hv = self._hurst_like(df, n, 0.01 + 0.0002 * n)
if hv is not None:
hvals.append(hv)
if len(hvals) < 4:
return None
havg = float(sum(hvals) / float(len(hvals)))
agree = int(sum(1 for x in hvals if x > 0.6))
close_now = float(df["close"].iloc[-1])
sma = float(df["close"].tail(self.sma_len).mean())
atr20 = self._atr(df, 20)
if atr20 is None or atr20 <= 0:
return None
close_5 = float(df["close"].iloc[-5])
ext_ok = (close_now - sma) > self.ext_k * atr20
mom_ok = (close_now - close_5) > self.mom_k * atr20
score = havg + 0.02 * max(0, agree - 3)
return float(score), bool(ext_ok), bool(mom_ok), close_now, float(atr20)
def Rebalance_Short(self):
if self.IsWarmingUp or not self._active:
return
scored = []
for sym in self._active:
if sym == self.spy:
continue
if not self.Securities.ContainsKey(sym) or not self.Securities[sym].HasData:
continue
out = self._compute_score_and_filters(sym)
if out is None:
continue
score, ext_ok, mom_ok, close_now, atr20 = out
if score >= self.score_threshold and ext_ok and mom_ok:
scored.append((score, sym, close_now, atr20))
scored.sort(reverse=True, key=lambda x: x[0])
picked = scored[:self.top_n]
selected = [sym for _, sym, _, _ in picked]
for kvp in self.Portfolio:
sym = kvp.Key
if sym in (self.spy, self.gld):
continue
holding = kvp.Value
if holding.Invested and holding.Quantity < 0 and sym not in selected:
self.Liquidate(sym)
self._entry.pop(sym, None)
if selected:
w = -abs(self.short_gross) / float(len(selected))
for _, sym, close_now, atr20 in picked:
self._safe_set_holdings(sym, w)
if sym not in self._entry:
self._entry[sym] = {"entry_price": close_now, "entry_atr": atr20}
def RiskCheck_Short(self):
if self.IsWarmingUp:
return
exits = []
for sym, info in list(self._entry.items()):
if not self.Securities.ContainsKey(sym) or not self.Portfolio[sym].Invested:
self._entry.pop(sym, None)
continue
if self.Portfolio[sym].Quantity >= 0:
self._entry.pop(sym, None)
continue
price = float(self.Securities[sym].Price)
if price <= 0:
continue
entry = float(info["entry_price"])
atr = float(info["entry_atr"])
if atr <= 0:
continue
if (price - entry) > self.stop_atr * atr:
exits.append(sym)
for sym in exits:
self.Liquidate(sym)
self._entry.pop(sym, None)