Overall Statistics
Total Orders
1131
Average Win
0.99%
Average Loss
-0.76%
Compounding Annual Return
33.733%
Drawdown
23.000%
Expectancy
0.426
Start Equity
10000
End Equity
57250.81
Net Profit
472.508%
Sharpe Ratio
1.228
Sortino Ratio
1.259
Probabilistic Sharpe Ratio
78.801%
Loss Rate
38%
Win Rate
62%
Profit-Loss Ratio
1.31
Alpha
0.173
Beta
0.41
Annual Standard Deviation
0.17
Annual Variance
0.029
Information Ratio
0.656
Tracking Error
0.185
Treynor Ratio
0.508
Total Fees
$1167.29
Estimated Strategy Capacity
$0
Lowest Capacity Asset
NVDA RHM8UTD8DT2D
Portfolio Turnover
9.58%
Drawdown Recovery
428
from AlgorithmImports import *
from sklearn.linear_model import Ridge
import numpy as np
import pandas as pd
import json
from datetime import datetime

class RidgeInteractionAlgorithm(QCAlgorithm):

    def Initialize(self):
        self.SetStartDate(2020, 1, 1) 
        self.SetCash(10000)
        
        # --- MODEL PERSISTENCE ---
        # Keep this commented to preserve your 76% PSR model
        # self.ObjectStore.Delete("quarterly_ridge_model")
        
        # 1. Settings & Risk Parameters (Aggressive for CAGR)
        self.Settings.FreePortfolioValuePercentage = 0.05 
        self.alpha_value = 5.0 
        
        # 2. Assets
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol
        self.tickers = ["AAPL", "TSLA", "NVDA", "AMD", "MSFT", "AMZN", "META", "GOOGL"]
        self.symbols = [self.AddEquity(ticker, Resolution.Daily).Symbol for ticker in self.tickers]
        self.treasury = self.AddEquity("SGOV", Resolution.Daily).Symbol 
        self.vix = self.AddData(CBOE, "VIX").Symbol
        
        # 3. Indicators for Regime Detection
        self.sma_turbo = self.SMA(self.spy, 20, Resolution.Daily)
        self.vix_window = RollingWindow[float](252)
        
        self.SetWarmUp(252)

        # 4. Model Loading
        self.model = None
        if self.ObjectStore.ContainsKey("quarterly_ridge_model"):
            try:
                data = json.loads(self.ObjectStore.Read("quarterly_ridge_model"))
                self.model = Ridge(alpha=self.alpha_value)
                self.model.coef_ = np.array(data['coef'])
                self.model.intercept_ = float(data['intercept'])
                self.Debug("Successfully loaded 76% PSR Model.")
            except:
                self.model = None

        # 5. Scheduling
        self.Train(self.DateRules.MonthStart(), self.TimeRules.At(1, 0), self.TrainModel)
        self.Schedule.On(self.DateRules.Every(DayOfWeek.Monday), self.TimeRules.At(10, 0), self.Rebalance)
        
        if self.model is None: self.TrainModel()

    def get_features(self, history):
        df = history.copy()
        tr = pd.concat([df['high']-df['low'], (df['high']-df['close'].shift(1)).abs(), (df['low']-df['close'].shift(1)).abs()], axis=1).max(axis=1)
        atr = (tr.rolling(14).mean() / df['close']) * 100
        ema12 = df['close'].ewm(span=12, adjust=False).mean()
        ema26 = df['close'].ewm(span=26, adjust=False).mean()
        macd = (ema12 - ema26) - (ema12 - ema26).ewm(span=9, adjust=False).mean()
        sma20 = df['close'].rolling(window=20).mean()
        std20 = df['close'].rolling(window=20).std()
        pctb = (df['close'] - (sma20 - 1.25 * std20)) / ((sma20 + 1.25 * std20) - (sma20 - 1.25 * std20) + 1e-6)

        def get_slope(series):
            y = series.values
            x = np.arange(len(y))
            return np.polyfit(x, y, 1)[0]
        
        lr_slope = df['close'].rolling(window=10).apply(get_slope)
        lr_feature = (lr_slope / df['close']) * 100
        low_min = df['low'].rolling(window=14).min()
        high_max = df['high'].rolling(window=14).max()
        psar_signal = (df['close'] > ((low_min + high_max) / 2)).astype(int)

        data = {
            'f1': atr, 'f2': macd, 'f3': pctb, 'f4': atr * pctb, 
            'f5': macd * pctb, 'f6': lr_feature, 'f7': psar_signal, 'f8': lr_feature * atr
        }
        return pd.DataFrame(data).dropna()

    def Rebalance(self):
        if self.model is None or self.IsWarmingUp or not self.vix_window.IsReady: return

        # A. Market Context
        vix_current = self.Securities[self.vix].Price
        vix_threshold = np.percentile(list(self.vix_window), 90)
        is_bullish = self.Securities[self.spy].Price > self.sma_turbo.Current.Value

        # B. DYNAMIC LEVERAGE (Drawdown Defense)
        # Scale exposure based on market fear to stay in the 20-25% DD range
        if vix_current > vix_threshold * 0.85:
            max_exposure = 0.60  # Tighten safety net
        elif vix_current > vix_threshold * 0.70:
            max_exposure = 0.80  # Moderate caution
        else:
            max_exposure = 0.95  # Full "CAGR Boost" mode

        # C. Crisis Switch
        if not is_bullish or vix_current > vix_threshold:
            self.SetHoldings(self.treasury, 0.95, liquidateExistingHoldings=True)
            return

        # D. Predictions & Volatility Weighting
        predictions = {}
        for symbol in self.symbols:
            hist = self.History(symbol, 40, Resolution.Daily)
            if hist.empty or symbol not in hist.index.levels[0]: continue
            feats = self.get_features(hist.loc[symbol])
            if feats.empty: continue
            
            pred = self.model.predict(feats.tail(1))[0]
            
            # Inverse Volatility Weighting (Boosts PSR)
            tr = (hist.loc[symbol]['high'] - hist.loc[symbol]['low']).tail(14).mean()
            vol_inv = 1 / (tr / hist.loc[symbol]['close'].iloc[-1] + 1e-6)
            predictions[symbol] = {'pred': pred, 'vol_inv': vol_inv}

        # E. Selection (0.002 Threshold for CAGR)
        valid_picks = [s for s in predictions.items() if s[1]['pred'] > 0.002]
        top_picks = sorted(valid_picks, key=lambda x: x[1]['pred'], reverse=True)[:4]
        
        if top_picks:
            total_vol_inv = sum([x[1]['vol_inv'] for x in top_picks])
            targets = []
            for symbol, data in top_picks:
                weight = (data['vol_inv'] / total_vol_inv) * max_exposure
                
                # F. INDIVIDUAL ASSET HARD STOP (Drawdown Defense)
                # Cut individual losers at 10% to prevent portfolio-wide 30% drawdowns
                if self.Portfolio[symbol].UnrealizedProfitPercent < -0.10:
                    targets.append(PortfolioTarget(symbol, 0))
                else:
                    # Profit Trim logic preserved for stability
                    if self.Portfolio[symbol].UnrealizedProfitPercent > 0.25:
                        targets.append(PortfolioTarget(symbol, weight * 0.5))
                    else:
                        targets.append(PortfolioTarget(symbol, weight))
            
            targets.append(PortfolioTarget(self.treasury, 0))
            self.SetHoldings(targets, liquidateExistingHoldings=True)
        else:
            self.SetHoldings(self.treasury, 0.95, liquidateExistingHoldings=True)

    def TrainModel(self):
        try:
            # Expanding Window Training (2009 to present)
            history = self.History(self.symbols, (self.Time - datetime(2009, 1, 1)).days, Resolution.Daily)
            X_list, y_list = [], []
            for symbol in self.symbols:
                if symbol not in history.index.levels[0]: continue
                symbol_df = history.loc[symbol].copy()
                feats = self.get_features(symbol_df)
                target = (symbol_df['close'].shift(-63) / symbol_df['close'] - 1).dropna()
                idx = feats.index.intersection(target.index)
                X_list.append(feats.loc[idx]); y_list.append(target.loc[idx])
            
            self.model = Ridge(alpha=self.alpha_value)
            self.model.fit(pd.concat(X_list), pd.concat(y_list))
            
            save_obj = {'coef': self.model.coef_.tolist(), 'intercept': float(self.model.intercept_)}
            self.ObjectStore.Save("quarterly_ridge_model", json.dumps(save_obj))
            self.Debug(f"Model trained/updated on {self.Time.strftime('%Y-%m-%d')}")
        except Exception as e: 
            self.Debug(f"Train error: {e}")

    def OnData(self, data):
        if data.ContainsKey(self.vix):
            self.vix_window.Add(float(data[self.vix].Price))