Overall Statistics
Total Orders
403
Average Win
2.25%
Average Loss
-1.83%
Compounding Annual Return
26.142%
Drawdown
32.000%
Expectancy
0.413
Start Equity
10000
End Equity
40329.60
Net Profit
303.296%
Sharpe Ratio
0.903
Sortino Ratio
0.902
Probabilistic Sharpe Ratio
48.494%
Loss Rate
37%
Win Rate
63%
Profit-Loss Ratio
1.23
Alpha
0.095
Beta
0.518
Annual Standard Deviation
0.179
Annual Variance
0.032
Information Ratio
0.19
Tracking Error
0.174
Treynor Ratio
0.311
Total Fees
$401.04
Estimated Strategy Capacity
$0
Lowest Capacity Asset
NVDA RHM8UTD8DT2D
Portfolio Turnover
4.88%
Drawdown Recovery
554
from AlgorithmImports import *
import numpy as np
import pandas as pd
import os
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Dense, LSTM, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam

class Mag7LSTM_Institutional_Grade(QCAlgorithm):

    def Initialize(self):
        """Configure the backtest, subscribe to data and schedule the rebalance.

        Sets the backtest window and starting cash, registers QQQ as the
        benchmark and BIL as the cash-like fallback, subscribes to every
        equity in ``self.equities`` at daily resolution, and attaches a
        200-bar SMA and a 20-bar simple ATR to each one.  Finally it
        schedules ``TrainAndTrade`` for 30 minutes after the open on the
        first trading day of every month.
        """
        # 1. PORTFOLIO SETTINGS
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 12, 31)
        self.SetCash(10000)

        # --- Institutional Risk Parameters ---
        # NOTE(review): TargetPortfolioVol and RiskPerTrade are not read by
        # the other methods visible in this file section; only
        # MaxPositionSize is applied when sizing positions.
        self.TargetPortfolioVol = 0.12  # Target 12% Annual Volatility
        self.MaxPositionSize = 0.40      # Never put more than 40% in one stock
        self.RiskPerTrade = 0.02         # Risk 2% of equity per "unit" of volatility

        self.benchmark_ticker = self.AddEquity("QQQ", Resolution.Daily).Symbol
        self.SetBenchmark(self.benchmark_ticker)

        self.equities = ["AAPL", "MSFT", "GOOGL", "AMZN", "META", "TSLA", "NVDA", "AMD"]
        self.treasury = self.AddEquity("BIL", Resolution.Daily).Symbol

        self.symbols = []
        self.indicators = {}

        for t in self.equities:
            s = self.AddEquity(t, Resolution.Daily).Symbol
            self.symbols.append(s)

            sma = self.SMA(s, 200, Resolution.Daily)  # Institutional Regime Filter
            atr = self.ATR(s, 20, MovingAverageType.Simple, Resolution.Daily)

            # Seed both indicators from history.  Without this they report 0
            # until enough live bars arrive, so the first rebalances would
            # compare price against a zero SMA (filter always passes) and
            # size positions from a zero ATR.
            self.WarmUpIndicator(s, sma, Resolution.Daily)
            self.WarmUpIndicator(s, atr, Resolution.Daily)

            self.indicators[s] = {
                "SMA": sma,
                "ATR": atr,
                "TrailingStop": 0.0
            }

        # 2. MODEL CONFIG
        self.lookback_window = 60      # bars per input sequence
        self.prediction_window = 21    # bars in the forecast horizon
        self.feature_count = 4         # columns produced by PrepareFeatures
        # NOTE(review): model_key and temp_path are set here but not used by
        # the methods visible in this file section (the model is never saved).
        self.model_key = "mag7_institutional_v1"
        self.temp_path = os.path.join("/tmp", "model_v1.h5")
        self.model = None

        self.Schedule.On(self.DateRules.MonthStart(self.symbols[0]),
                         self.TimeRules.AfterMarketOpen(self.symbols[0], 30),
                         self.TrainAndTrade)

    def CalculatePSAR(self, df):
        """Return a Parabolic SAR value for every row of ``df``.

        Args:
            df: DataFrame with ``high``, ``low`` and ``close`` columns.

        Returns:
            A new float ``numpy`` array with one entry per row.  The first
            two entries are the corresponding close prices (the recursion
            starts at the third row); ``df`` itself is not modified.

        The acceleration factor starts at 0.02, grows by 0.02 each time a
        new extreme is made in the current trend, and is capped at 0.2.
        """
        high = df['high'].values
        low = df['low'].values

        # Always work on a *float* copy.  Copying an integer close column
        # verbatim would make every fractional SAR assignment below truncate.
        psar = df['close'].values.astype(float)
        if len(psar) == 0:
            return psar

        bull = True
        af, max_af = 0.02, 0.2
        af_val = af
        hp, lp = high[0], low[0]

        for i in range(2, len(df)):
            if bull:
                psar[i] = psar[i-1] + af_val * (hp - psar[i-1])
                bull = psar[i] < low[i]
                if not bull:
                    # Up-trend broken: SAR jumps to the prior high point and
                    # the down-trend starts tracking from this bar's low.
                    psar[i], lp, af_val = hp, low[i], af
                else:
                    if high[i] > hp:
                        hp = high[i]
                        af_val = min(af_val + af, max_af)
                    # SAR may never sit above the two previous lows.
                    if low[i-1] < psar[i]:
                        psar[i] = low[i-1]
                    if low[i-2] < psar[i]:
                        psar[i] = low[i-2]
            else:
                psar[i] = psar[i-1] + af_val * (lp - psar[i-1])
                bull = psar[i] < high[i]
                if bull:
                    # Down-trend broken: SAR jumps to the prior low point and
                    # the up-trend starts tracking from this bar's high.
                    psar[i], hp, af_val = lp, high[i], af
                else:
                    if low[i] < lp:
                        lp = low[i]
                        af_val = min(af_val + af, max_af)
                    # SAR may never sit below the two previous highs.
                    if high[i-1] > psar[i]:
                        psar[i] = high[i-1]
                    if high[i-2] > psar[i]:
                        psar[i] = high[i-2]
        return psar

    def PrepareFeatures(self, df):
        df = df.copy()
        df['Return'] = df['close'].pct_change()
        
        # MACD
        ema12 = df['close'].ewm(span=12, adjust=False).mean()
        ema26 = df['close'].ewm(span=26, adjust=False).mean()
        macd = ema12 - ema26
        df['MACD_Hist'] = macd - macd.ewm(span=9, adjust=False).mean()
        
        # Bollinger Width
        rm, rs = df['close'].rolling(20).mean(), df['close'].rolling(20).std()
        df['BB_Width'] = ((rm + 1.25*rs) - (rm - 1.25*rs)) / rm
        
        # PSAR
        df['PSAR_Dist'] = (df['close'] - self.CalculatePSAR(df)) / df['close']
        
        df.dropna(inplace=True)
        cols = ['Return', 'MACD_Hist', 'BB_Width', 'PSAR_Dist']
        for c in cols:
            df[c] = (df[c] - df[c].rolling(window=252).mean()) / df[c].rolling(window=252).std()
            
        df.dropna(inplace=True)
        return df[cols].values, df['close'].pct_change().shift(-21).dropna()

    def BuildModel(self):
        """Create and compile the LSTM regressor used by ``TrainAndTrade``.

        The network is: LSTM(128, returning sequences) -> Dropout(0.3) ->
        BatchNormalization -> LSTM(64) -> Dense(32, relu) -> Dense(1).
        It is compiled with Adam (learning rate 0.001) and mean-squared-error
        loss.

        The input shape is taken from ``self.lookback_window`` and
        ``self.feature_count`` (60 and 4 as set in ``Initialize``) instead of
        being repeated as literals, so the model cannot drift out of sync
        with the configured sequence length and feature width.

        Returns:
            The compiled, untrained Keras ``Sequential`` model.
        """
        input_shape = (self.lookback_window, self.feature_count)
        model = Sequential([
            LSTM(128, return_sequences=True, input_shape=input_shape),
            Dropout(0.3),
            BatchNormalization(),
            LSTM(64, return_sequences=False),
            Dense(32, activation='relu'),
            Dense(1)
        ])
        model.compile(optimizer=Adam(learning_rate=0.001), loss='mse')
        return model

    def TrainAndTrade(self):
        """Refit the model on recent history, then rebalance the portfolio.

        1. Requests 1260 daily bars for every equity and builds training
           samples: each input is ``lookback_window`` consecutive feature
           rows and each label is the sum of the one-row returns over the
           ``prediction_window`` rows that immediately follow that window.
        2. Fits the model for 8 epochs (building it on first use; later calls
           continue training the same model).
        3. Predicts from the latest window of every symbol and keeps those
           with a positive prediction whose price is above its 200-bar SMA.
           Symbols whose SMA/ATR are not ready, or whose price or ATR is not
           positive, are skipped because neither the filter nor the sizing
           is meaningful for them.
        4. With no candidates, liquidates everything and holds the treasury
           symbol at 100%.  Otherwise holds up to four top-ranked symbols,
           weighted inversely to their ATR-based annualised volatility and
           capped at ``MaxPositionSize``, and sets each trailing stop 2.5 ATR
           below the current price.  For a position that is already held the
           stop is only ever raised, never lowered.
        """
        lookback = self.lookback_window
        horizon = self.prediction_window
        stride = 5  # start a new training window every 5 rows

        history = self.History(self.symbols, 1260, Resolution.Daily)
        if history.empty: return

        X_train_all, y_train_all, current_sequences = [], [], {}
        available = history.index.levels[0]

        for symbol in self.symbols:
            if symbol not in available: continue
            feat, target = self.PrepareFeatures(history.loc[symbol])

            # PrepareFeatures hands back the one-row return series already
            # shifted forward: target[k] is the return of feature row
            # k + offset, and the series is shorter than feat by exactly that
            # offset.  The returns of rows i .. i+horizon-1 (the rows right
            # after the window feat[i-lookback:i]) therefore live at
            # target[i-offset : i-offset+horizon].  Indexing target[i:i+horizon]
            # instead would skip the first `offset` rows after the window and
            # produce truncated sums near the end of the series.
            offset = len(feat) - len(target)
            first = max(lookback, offset)
            for i in range(first, len(feat) - horizon + 1, stride):
                start = i - offset
                X_train_all.append(feat[i - lookback:i])
                y_train_all.append(np.sum(target.iloc[start:start + horizon]))

            if len(feat) >= lookback:
                current_sequences[symbol] = np.reshape(
                    feat[-lookback:], (1, lookback, self.feature_count))

        if not X_train_all: return

        if self.model is None: self.model = self.BuildModel()
        self.model.fit(np.array(X_train_all), np.array(y_train_all), epochs=8, verbose=0)

        # 3. VOLATILITY TARGETED ALLOCATION
        preds = []
        for s, seq in current_sequences.items():
            sma = self.indicators[s]["SMA"]
            atr = self.indicators[s]["ATR"]
            price = self.Securities[s].Price

            # An unready indicator reports 0: the SMA filter would always
            # pass and the ATR-based volatility would be a division by zero.
            if not (sma.IsReady and atr.IsReady): continue
            if price <= 0 or atr.Current.Value <= 0: continue

            p = self.model.predict(seq, verbose=0)[0][0]
            # Regime Filter: Only buy if above 200 SMA (Institutional Rule)
            if p > 0 and price > sma.Current.Value:
                preds.append((s, p))

        preds.sort(key=lambda x: x[1], reverse=True)
        top_picks = [x[0] for x in preds[:4]]

        if not top_picks:
            self.Liquidate()
            self.SetHoldings(self.treasury, 1.0)
            return

        self.Liquidate(self.treasury)
        for s in self.symbols:
            if s not in top_picks: self.Liquidate(s)

        # --- VOLATILITY TARGETING CALCULATION ---
        # Annualised volatility estimate per pick: (ATR / price) * sqrt(252).
        inverse_vols = {}
        for s in top_picks:
            daily_vol = self.indicators[s]["ATR"].Current.Value / self.Securities[s].Price
            inverse_vols[s] = 1.0 / (daily_vol * np.sqrt(252))
        inverse_total = sum(inverse_vols.values())

        for s in top_picks:
            # Weight is inversely proportional to volatility
            # (Lower Vol = Higher Weight), capped for safety.
            raw_weight = inverse_vols[s] / inverse_total
            final_weight = min(raw_weight, self.MaxPositionSize)

            new_stop = self.Securities[s].Price - (2.5 * self.indicators[s]["ATR"].Current.Value)
            # Read the holding state before SetHoldings can change it: a
            # position carried over from last month keeps its ratcheted stop
            # unless the fresh one is higher; a new position starts fresh.
            if self.Portfolio[s].Invested:
                new_stop = max(new_stop, self.indicators[s]["TrailingStop"])

            self.SetHoldings(s, final_weight)
            self.indicators[s]["TrailingStop"] = new_stop

    def OnData(self, data):
        for s in self.symbols:
            if self.Portfolio[s].Invested:
                price = self.Securities[s].Price
                stop = self.indicators[s]["TrailingStop"]
                
                # Dynamic Trailing Stop
                new_stop = price - (2.5 * self.indicators[s]["ATR"].Current.Value)
                if new_stop > stop: self.indicators[s]["TrailingStop"] = new_stop
                
                if price < stop:
                    self.Liquidate(s, "Volatility/Stop Exit")