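# Overall Statistics (backtest results)
#
# | Metric                       | Value             |
# | ---------------------------- | ----------------- |
# | Total Orders                 | 1410              |
# | Average Win                  | 0.76%             |
# | Average Loss                 | -0.59%            |
# | Compounding Annual Return    | 30.945%           |
# | Drawdown                     | 30.200%           |
# | Expectancy                   | 0.417             |
# | Start Equity                 | 10000             |
# | End Equity                   | 50561.94          |
# | Net Profit                   | 405.619%          |
# | Sharpe Ratio                 | 1.063             |
# | Sortino Ratio                | 1.214             |
# | Probabilistic Sharpe Ratio   | 63.249%           |
# | Loss Rate                    | 38%               |
# | Win Rate                     | 62%               |
# | Profit-Loss Ratio            | 1.30              |
# | Alpha                        | 0.132             |
# | Beta                         | 0.717             |
# | Annual Standard Deviation    | 0.182             |
# | Annual Variance              | 0.033             |
# | Information Ratio            | 0.755             |
# | Tracking Error               | 0.142             |
# | Treynor Ratio                | 0.269             |
# | Total Fees                   | $1427.43          |
# | Estimated Strategy Capacity  | $0                |
# | Lowest Capacity Asset        | GOOG T1AZ164W5VTX |
# | Portfolio Turnover           | 7.84%             |
# | Drawdown Recovery            | 549               |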
from AlgorithmImports import *
from sklearn.linear_model import Ridge
import numpy as np
import pandas as pd
import json
from datetime import datetime
class RidgeInteractionAlgorithm(QCAlgorithm):
    """Weekly stock rotation driven by a Ridge regression on price features.

    Every Monday the algorithm scores each stock in ``self.tickers`` with a
    Ridge model, keeps at most four names whose predicted return exceeds a
    threshold, and weights them by inverse volatility. Total exposure is
    scaled by a VIX-based factor and by SPY's position relative to its
    200-day and 20-day simple moving averages. When VIX spikes or nothing
    qualifies, the portfolio rotates into ``self.treasury``.
    """

    def Initialize(self):
        """Set up cash, securities, indicators, the model, and schedules."""
        self.SetStartDate(2020, 1, 1)
        self.SetCash(10000)
        # Uncomment to discard a model persisted by an earlier run:
        # self.ObjectStore.Delete("quarterly_ridge_model")

        self.alpha_value = 5.0   # Ridge regularisation strength
        self.target_window = 63  # forward-return horizon, in daily bars

        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.tickers = [
            "AAPL", "TSLA", "NVDA", "AMD", "MSFT", "AMZN", "META", "GOOGL",
        ]
        self.symbols = [
            self.AddEquity(ticker, Resolution.Daily).Symbol
            for ticker in self.tickers
        ]
        self.treasury = self.AddEquity("SGOV", Resolution.Daily).Symbol
        self.vix = self.AddData(CBOE, "VIX").Symbol

        # Dual-regime filters on SPY.
        self.sma_long = self.SMA(self.spy, 200, Resolution.Daily)  # major trend
        self.sma_fast = self.SMA(self.spy, 20, Resolution.Daily)   # tactical exit
        self.vix_window = RollingWindow[float](252)
        self.SetWarmUp(252)

        # Restore a previously saved model so trading can begin before the
        # first scheduled training run.
        # NOTE(review): in a backtest, a model saved by an earlier run may
        # have been fitted on data later than this run's start date
        # (look-ahead). Delete the key first if that matters.
        self.model = None
        if self.ObjectStore.ContainsKey("quarterly_ridge_model"):
            try:
                data = json.loads(self.ObjectStore.Read("quarterly_ridge_model"))
                restored = Ridge(alpha=self.alpha_value)
                restored.coef_ = np.array(data['coef'])
                restored.intercept_ = float(data['intercept'])
                self.model = restored
            except Exception as err:
                # Best effort: a corrupt or incompatible payload just means
                # we wait for TrainModel to produce a fresh model.
                self.model = None
                self.Debug(f"Could not restore persisted Ridge model: {err}")

        self.Train(
            self.DateRules.MonthStart(),
            self.TimeRules.At(1, 0),
            self.TrainModel,
        )
        self.Schedule.On(
            self.DateRules.Every(DayOfWeek.Monday),
            self.TimeRules.At(10, 0),
            self.Rebalance,
        )

    def Rebalance(self):
        """Recompute target weights and submit them.

        Does nothing until a model exists, warm-up has finished, and the
        VIX window is full.
        """
        if self.model is None or self.IsWarmingUp or not self.vix_window.IsReady:
            return

        vix_current = self.Securities[self.vix].Price
        vix_90th = np.percentile(list(self.vix_window), 90)

        # 1. ABSOLUTE CRISIS EXIT
        # Liquidate everything else first; without that flag the existing
        # stock positions stay open and SGOV is bought on top of them.
        if vix_current > vix_90th * 1.1:
            self.SetHoldings(self.treasury, 0.95, True)
            return

        # 2. VOLATILITY SCALING (smooth drawdown control)
        # The "dimmer switch": exposure is 1.3 - VIX / (1.5 * p90), clamped
        # to [0.4, 1.0], so it falls linearly as VIX rises.
        vol_scaler = max(
            0.4, min(1.0, 1.0 - (vix_current / (vix_90th * 1.5) - 0.3))
        )

        # 3. MARKET REGIME FILTER
        spy_price = self.Securities[self.spy].Price
        is_bull_market = spy_price > self.sma_long.Current.Value
        is_tactical_safe = spy_price > self.sma_fast.Current.Value

        if is_bull_market and is_tactical_safe:
            max_leverage = 0.95 * vol_scaler
        elif is_bull_market:
            max_leverage = 0.60 * vol_scaler  # "caution" mode
        else:
            max_leverage = 0.30 * vol_scaler  # "survival" mode

        # 4. PREDICTIONS
        predictions = {}
        for symbol in self.symbols:
            hist = self.History(symbol, 40, Resolution.Daily)
            if hist.empty:
                continue
            bars = hist.loc[symbol]
            feats = self.get_features(bars)
            if feats.empty:
                # Too few bars for the 20-bar windows; nothing to predict on.
                continue
            pred = self.model.predict(feats.tail(1))[0]
            # Daily return volatility, used for inverse-vol weighting.
            vol = bars['close'].pct_change().std()
            if not np.isfinite(vol):
                continue
            predictions[symbol] = {'pred': pred, 'inv_vol': 1 / (vol + 1e-6)}

        # 5. SELECTION (threshold 0.004, top four by predicted return)
        picks = sorted(
            (item for item in predictions.items() if item[1]['pred'] > 0.004),
            key=lambda item: item[1]['pred'],
            reverse=True,
        )[:4]

        if picks:
            total_inv_vol = sum(info['inv_vol'] for _, info in picks)
            targets = [
                PortfolioTarget(
                    symbol, (info['inv_vol'] / total_inv_vol) * max_leverage
                )
                for symbol, info in picks
            ]
            self.SetHoldings(targets, True)
        else:
            # Nothing qualifies: move fully to the defensive asset.
            self.SetHoldings(self.treasury, 0.95, True)

    def get_features(self, df):
        """Build the eight-column feature frame for one symbol.

        Args:
            df: DataFrame with 'high', 'low' and 'close' columns, one row
                per bar in chronological order.

        Returns:
            DataFrame with columns 'f1'..'f8', restricted to rows where
            every feature is defined (the first 19 rows are always dropped
            because of the 20-bar rolling windows).
        """
        close, high, low = df['close'], df['high'], df['low']
        prev_close = close.shift(1)

        # f1: 14-bar average true range as a percentage of price.
        true_range = pd.concat(
            [high - low, (high - prev_close).abs(), (low - prev_close).abs()],
            axis=1,
        ).max(axis=1)
        atr = (true_range.rolling(14).mean() / close) * 100

        # f2: MACD histogram (12/26 EMA spread minus its 9-span EMA).
        ema12 = close.ewm(span=12).mean()
        ema26 = close.ewm(span=26).mean()
        macd_line = ema12 - ema26
        macd = macd_line - macd_line.ewm(span=9).mean()

        # f3: position of price inside 20-bar, 1.25-sigma bands (%B).
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        lower = sma20 - 1.25 * std20
        upper = sma20 + 1.25 * std20
        pctb = (close - lower) / (upper - lower + 1e-6)

        # f6: slope of a 10-bar least-squares line as a percentage of price.
        # raw=True passes ndarrays to the callback, avoiding a Series per
        # window without changing the result.
        slope = close.rolling(10).apply(
            lambda window: np.polyfit(np.arange(len(window)), window, 1)[0],
            raw=True,
        )
        lr_slope = (slope / close) * 100

        # f7: 1 when price is above the midpoint of the 14-bar high/low range.
        midpoint = (low.rolling(14).min() + high.rolling(14).max()) / 2
        psar = (close > midpoint).astype(int)

        return pd.DataFrame({
            'f1': atr,
            'f2': macd,
            'f3': pctb,
            'f4': atr * pctb,
            'f5': macd * pctb,
            'f6': lr_slope,
            'f7': psar,
            'f8': lr_slope * atr,
        }).dropna()

    def TrainModel(self):
        """Refit the Ridge model on pooled history and persist it.

        The target is each symbol's forward return over
        ``self.target_window`` bars. Any failure is reported through
        ``Debug`` and the previous model, if any, stays in use.
        """
        try:
            # Calendar days since 2009-01-01, passed as the bar count.
            lookback = (self.Time - datetime(2009, 1, 1)).days
            h = self.History(self.symbols, lookback, Resolution.Daily)
            if h.empty:
                self.Debug("TrainModel: no history returned; model unchanged")
                return

            X, y = [], []
            for s in self.symbols:
                if s not in h.index.levels[0]:
                    continue
                bars = h.loc[s]
                f = self.get_features(bars)
                ret = (
                    bars['close'].shift(-self.target_window) / bars['close'] - 1
                ).dropna()
                # Keep only rows that have both features and a realised
                # forward return.
                idx = f.index.intersection(ret.index)
                if idx.empty:
                    continue
                X.append(f.loc[idx])
                y.append(ret.loc[idx])

            if not X:
                self.Debug("TrainModel: no usable samples; model unchanged")
                return

            self.model = Ridge(alpha=self.alpha_value).fit(
                pd.concat(X), pd.concat(y)
            )
            self.ObjectStore.Save(
                "quarterly_ridge_model",
                json.dumps({
                    'coef': self.model.coef_.tolist(),
                    'intercept': float(self.model.intercept_),
                }),
            )
        except Exception as err:
            # Training is best effort; report instead of failing silently.
            self.Debug(f"TrainModel failed: {err}")

    def OnData(self, data):
        """Append each new VIX value to the rolling window."""
        if data.ContainsKey(self.vix):
            self.vix_window.Add(float(data[self.vix].Price))