Overall Statistics
Total Orders
1410
Average Win
0.76%
Average Loss
-0.59%
Compounding Annual Return
30.978%
Drawdown
30.200%
Expectancy
0.417
Start Equity
10000
End Equity
50561.94
Net Profit
405.619%
Sharpe Ratio
1.064
Sortino Ratio
1.215
Probabilistic Sharpe Ratio
63.299%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
1.30
Alpha
0.132
Beta
0.717
Annual Standard Deviation
0.182
Annual Variance
0.033
Information Ratio
0.755
Tracking Error
0.142
Treynor Ratio
0.27
Total Fees
$1427.43
Estimated Strategy Capacity
$0
Lowest Capacity Asset
GOOG T1AZ164W5VTX
Portfolio Turnover
7.84%
Drawdown Recovery
549
from AlgorithmImports import *
from sklearn.linear_model import Ridge
import numpy as np
import pandas as pd
import json
from datetime import datetime

class RidgeInteractionAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 12, 31)
        self.SetCash(10000)
        
        # self.ObjectStore.Delete("quarterly_ridge_model")
        
        self.alpha_value = 5.0 
        self.target_window = 63 
        
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.tickers = ["AAPL", "TSLA", "NVDA", "AMD", "MSFT", "AMZN", "META", "GOOGL"]
        self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in self.tickers]
        self.treasury = self.AddEquity("SGOV", Resolution.Daily).Symbol 
        self.vix = self.AddData(CBOE, "VIX").Symbol
        
        # Dual-Regime Filters
        self.sma_long = self.SMA(self.spy, 200, Resolution.Daily) # Major Trend
        self.sma_fast = self.SMA(self.spy, 20, Resolution.Daily)  # Tactical Exit
        self.vix_window = RollingWindow[float](252)
        
        self.SetWarmUp(252)

        self.model = None
        if self.ObjectStore.ContainsKey("quarterly_ridge_model"):
            try:
                data = json.loads(self.ObjectStore.Read("quarterly_ridge_model"))
                self.model = Ridge(alpha=self.alpha_value)
                self.model.coef_ = np.array(data['coef'])
                self.model.intercept_ = float(data['intercept'])
            except: self.model = None

        self.Train(self.DateRules.MonthStart(), self.TimeRules.At(1, 0), self.TrainModel)
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(10, 0), self.Rebalance)

    def Rebalance(self):
        if self.model is None or self.IsWarmingUp or not self.vix_window.IsReady: return

        # 1. VOLATILITY SCALING (Smooth Drawdown Control)
        vix_current = self.Securities[self.vix].Price
        vix_90th = np.percentile(list(self.vix_window), 90)
        
        # The 'Dimmer Switch': Exposure scales down as VIX rises
        # 1.0 at low VIX, 0.4 at extreme fear
        vol_scaler = max(0.4, min(1.0, 1.0 - (vix_current / (vix_90th * 1.5) - 0.3)))

        # 2. THE MARKET REGIME FILTER
        spy_price = self.Securities[self.spy].Price
        is_bull_market = spy_price > self.sma_long.Current.Value
        is_tactical_safe = spy_price > self.sma_fast.Current.Value

        # Assign Max Leverage based on Regime
        if is_bull_market and is_tactical_safe:
            max_leverage = 0.95 * vol_scaler
        elif is_bull_market and not is_tactical_safe:
            max_leverage = 0.60 * vol_scaler # "Caution" mode
        else:
            max_leverage = 0.30 * vol_scaler # "Survival" mode

        # 3. ABSOLUTE CRISIS EXIT
        if vix_current > vix_90th * 1.1:
            self.SetHoldings(self.treasury, 0.95)
            return

        # 4. PREDICTIONS
        predictions = {}
        for symbol in self.symbols:
            hist = self.History(symbol, 40, Resolution.Daily)
            if hist.empty: continue
            feats = self.get_features(hist.loc[symbol])
            pred = self.model.predict(feats.tail(1))[0]
            
            # Risk-Adjusted Prediction (PSR Booster)
            vol = hist.loc[symbol]['close'].pct_change().std()
            predictions[symbol] = {'pred': pred, 'inv_vol': 1/(vol + 1e-6)}

        # 5. SELECTION (Threshold 0.004)
        picks = sorted([s for s in predictions.items() if s[1]['pred'] > 0.004], 
                       key=lambda x: x[1]['pred'], reverse=True)[:4]
        
        if picks:
            total_inv_vol = sum([x[1]['inv_vol'] for x in picks])
            targets = []
            for s, data in picks:
                weight = (data['inv_vol'] / total_inv_vol) * max_leverage
                targets.append(PortfolioTarget(s, weight))
            self.SetHoldings(targets, True)
        else:
            self.SetHoldings(self.treasury, 0.95)

    def get_features(self, df):
        # ... (Your standard feature engineering logic)
        tr = pd.concat([df['high']-df['low'], (df['high']-df['close'].shift(1)).abs(), (df['low']-df['close'].shift(1)).abs()], axis=1).max(axis=1)
        atr = (tr.rolling(14).mean() / df['close']) * 100
        ema12, ema26 = df['close'].ewm(span=12).mean(), df['close'].ewm(span=26).mean()
        macd = (ema12 - ema26) - (ema12 - ema26).ewm(span=9).mean()
        sma20, std20 = df['close'].rolling(20).mean(), df['close'].rolling(20).std()
        pctb = (df['close'] - (sma20 - 1.25*std20)) / ((sma20 + 1.25*std20) - (sma20 - 1.25*std20) + 1e-6)
        lr_slope = (df['close'].rolling(10).apply(lambda s: np.polyfit(np.arange(len(s)), s.values, 1)[0]) / df['close']) * 100
        psar = (df['close'] > ((df['low'].rolling(14).min() + df['high'].rolling(14).max()) / 2)).astype(int)
        return pd.DataFrame({'f1': atr, 'f2': macd, 'f3': pctb, 'f4': atr*pctb, 'f5': macd*pctb, 'f6': lr_slope, 'f7': psar, 'f8': lr_slope*atr}).dropna()

    def TrainModel(self):
        try:
            h = self.History(self.symbols, (self.Time - datetime(2009,1,1)).days, Resolution.Daily)
            X, y = [], []
            for s in self.symbols:
                if s not in h.index.levels[0]: continue
                f = self.get_features(h.loc[s])
                ret = (h.loc[s]['close'].shift(-self.target_window) / h.loc[s]['close'] - 1).dropna()
                idx = f.index.intersection(ret.index)
                X.append(f.loc[idx]); y.append(ret.loc[idx])
            self.model = Ridge(alpha=self.alpha_value).fit(pd.concat(X), pd.concat(y))
            self.ObjectStore.Save("quarterly_ridge_model", json.dumps({'coef': self.model.coef_.tolist(), 'intercept': float(self.model.intercept_)}))
        except: pass

    def OnData(self, data):
        if data.ContainsKey(self.vix): self.vix_window.Add(float(data[self.vix].Price))