Overall Statistics
Total Orders
2535
Average Win
0.51%
Average Loss
-0.35%
Compounding Annual Return
31.647%
Drawdown
44.000%
Expectancy
0.403
Start Equity
10000
End Equity
52080.97
Net Profit
420.810%
Sharpe Ratio
0.829
Sortino Ratio
0.928
Probabilistic Sharpe Ratio
32.438%
Loss Rate
42%
Win Rate
58%
Profit-Loss Ratio
1.43
Alpha
0.075
Beta
1.13
Annual Standard Deviation
0.267
Annual Variance
0.071
Information Ratio
0.729
Tracking Error
0.126
Treynor Ratio
0.196
Total Fees
$2496.04
Estimated Strategy Capacity
$1400000000.00
Lowest Capacity Asset
AMD R735QTJ8XC9X
Portfolio Turnover
12.66%
Drawdown Recovery
770
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from datetime import datetime
import numpy as np
import pandas as pd

class IRPrecisionFalcon(QCAlgorithm):
    def Initialize(self):
        """Set up the backtest window, universe, indicators, models and schedule.

        Every attribute the other methods read from ``self`` is created here:
        the traded symbols, the benchmark symbol, the two benchmark SMAs, one
        classifier/scaler pair per regime and the per-regime probability
        thresholds.
        """
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 12, 30)
        self.SetCash(10000)

        # Traded universe, subscribed at daily resolution.
        self.tickers = ["AAPL", "AMZN", "GOOGL", "META", "MSFT", "NVDA", "TSLA", "AMD"]
        self.symbols = []
        for ticker in self.tickers:
            self.symbols.append(self.AddEquity(ticker, Resolution.Daily).Symbol)

        # QQQ is both the reported benchmark and the fallback holding used by OnData.
        self._bench = self.AddEquity("QQQ", Resolution.Daily).Symbol
        self.SetBenchmark(self._bench)

        # Macro gate inputs: 50- and 200-day simple moving averages of the benchmark.
        self.sma50 = self.SMA(self._bench, 50, Resolution.Daily)
        self.sma200 = self.SMA(self._bench, 200, Resolution.Daily)

        # Both regime models share one hyperparameter set; the fixed
        # random_state keeps repeated fits on the same data reproducible.
        forest_params = {
            "n_estimators": 200,
            "max_depth": 8,
            "min_samples_split": 20,
            "min_samples_leaf": 10,
            "max_features": 'sqrt',
            "random_state": 42,
            "n_jobs": -1,
            "class_weight": 'balanced',
        }
        self.model_trump = RandomForestClassifier(**forest_params)
        self.model_std = RandomForestClassifier(**forest_params)

        # One scaler per regime so each is fitted only on its own samples.
        self.scaler_trump = StandardScaler()
        self.scaler_std = StandardScaler()

        # Minimum class-1 probability required before a symbol becomes a pick.
        self.min_confidence_trump = 0.65
        self.min_confidence_std = 0.50

        # Cache placeholders; nothing in the visible code reads them yet.
        self.feature_cache = {}
        self.cache_timestamp = None

        self.SetWarmUp(252)
        # Retrain at the start of every week, at midnight.
        self.Train(self.DateRules.WeekStart(), self.TimeRules.At(0, 0), self.TrainModel)

    def IsTrumpRegime(self, time):
        """Return True when ``time`` falls inside one of the two regime windows.

        The windows are [2017-01-20, 2021-01-20) and [2025-01-20, onwards).
        ``time`` must be comparable with naive ``datetime`` objects.
        """
        first_start = datetime(2017, 1, 20)
        first_end = datetime(2021, 1, 20)
        second_start = datetime(2025, 1, 20)

        if time >= first_start and time < first_end:
            return True
        return time >= second_start

    def GetFeatures(self, history_df):
        """OPTIMIZED: Enhanced feature engineering with additional technical indicators"""
        closes = history_df['close'].unstack(level=0)
        highs = history_df['high'].unstack(level=0)
        lows = history_df['low'].unstack(level=0)
        volumes = history_df['volume'].unstack(level=0) if 'volume' in history_df.columns else None
        
        # Multiple timeframe momentum
        returns_5 = closes.pct_change(5)
        returns_10 = closes.pct_change(10)
        returns_20 = closes.pct_change(20)
        
        # MACD
        ema12 = closes.ewm(span=12, adjust=False).mean()
        ema26 = closes.ewm(span=26, adjust=False).mean()
        macd = ema12 - ema26
        macd_signal = macd.ewm(span=9, adjust=False).mean()
        macd_hist = macd - macd_signal
        
        # Bollinger Bands
        bb_sma = closes.rolling(window=20).mean()
        bb_std = closes.rolling(window=20).std()
        bb_percent_b = (closes - (bb_sma - 1.25 * bb_std)) / (2.5 * bb_std)
        bb_width = (2.5 * bb_std) / bb_sma  # Volatility measure
        
        # ATR
        tr = np.maximum(highs - lows, np.maximum((highs - closes.shift(1)).abs(), (lows - closes.shift(1)).abs()))
        atr = tr.rolling(window=14).mean()
        norm_atr = atr / closes
        
        # RSI
        delta = closes.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
        
        # Volume features (if available)
        if volumes is not None:
            vol_sma = volumes.rolling(window=20).mean()
            vol_ratio = volumes / vol_sma
        else:
            vol_ratio = pd.DataFrame(1, index=closes.index, columns=closes.columns)

        features_dict = {}
        for s in self.symbols:
            s_val = s.Value
            if s_val not in closes.columns: 
                continue
            
            s_closes, s_highs, s_lows = closes[s_val], highs[s_val], lows[s_val]
            
            # OPTIMIZED: Vectorized PSAR calculation
            psar, bull, af, ep = self._calculate_psar(s_closes, s_highs, s_lows)
            psar_signal = (s_closes > psar).astype(int)
            
            # Relative strength vs benchmark
            alpha_5 = returns_5[s_val] - returns_5[self._bench.Value] if self._bench.Value in returns_5.columns else 0
            alpha_10 = returns_10[s_val] - returns_10[self._bench.Value] if self._bench.Value in returns_10.columns else 0
            alpha_20 = returns_20[s_val] - returns_20[self._bench.Value] if self._bench.Value in returns_20.columns else 0
            
            # OPTIMIZED: More comprehensive feature set
            df = pd.DataFrame({
                'ret_5': returns_5[s_val],
                'ret_10': returns_10[s_val], 
                'ret_20': returns_20[s_val],
                'alpha_5': alpha_5,
                'alpha_10': alpha_10,
                'alpha_20': alpha_20,
                'macd': macd[s_val],
                'macd_hist': macd_hist[s_val],
                'bb_pct': bb_percent_b[s_val],
                'bb_width': bb_width[s_val],
                'atr': norm_atr[s_val],
                'rsi': rsi[s_val],
                'psar': psar_signal,
                'vol_ratio': vol_ratio[s_val]
            }).dropna()
            
            features_dict[s_val] = df
            
        return features_dict, closes

    def _calculate_psar(self, closes, highs, lows):
        """OPTIMIZED: Efficient PSAR calculation"""
        psar = closes.copy()
        if len(closes) < 2:
            return psar, True, 0.02, lows.iloc[0]
            
        bull, af, ep = True, 0.02, lows.iloc[0]
        psar.iloc[0] = highs.iloc[0]
        
        for i in range(1, len(closes)):
            psar.iloc[i] = psar.iloc[i-1] + af * (ep - psar.iloc[i-1])
            
            if bull:
                if lows.iloc[i] < psar.iloc[i]:
                    bull, psar.iloc[i], ep, af = False, ep, lows.iloc[i], 0.02
                elif highs.iloc[i] > ep:
                    ep, af = highs.iloc[i], min(af + 0.02, 0.2)
            else:
                if highs.iloc[i] > psar.iloc[i]:
                    bull, psar.iloc[i], ep, af = True, ep, highs.iloc[i], 0.02
                elif lows.iloc[i] < ep:
                    ep, af = lows.iloc[i], min(af + 0.02, 0.2)
        
        return psar, bull, af, ep

    def TrainModel(self):
        """Refit both regime classifiers on history from 2009 up to now.

        For every traded symbol, each feature row is labelled 1 when the
        symbol's return over the next 5 feature rows beats the benchmark's
        return over the same span by more than 0.1%, else 0. Samples are
        routed by ``IsTrumpRegime(date)``: regime samples dated before
        2018-01-01 are discarded, the rest go to the "trump" model, and all
        other samples go to the "std" model. A model (and its scaler) is refit
        only when it has at least 200 samples; otherwise its previous state is
        left untouched.

        Samples whose symbol or benchmark forward return is NaN are skipped
        rather than labelled, because a comparison against NaN is always False
        and would silently produce a 0 label.
        """
        history = self.History(self.symbols + [self._bench], datetime(2009, 1, 1), self.Time, Resolution.Daily)
        if history.empty:
            return

        feat_data, prices = self.GetFeatures(history)

        # Without benchmark prices no label can be computed, so nothing is trained.
        try:
            bench_prices = prices[self._bench.Value]
        except KeyError:
            return

        forward_days = 5             # label horizon, in feature rows
        outperform_margin = 0.001    # 0.1% excess return required for label 1
        regime_cutoff = datetime(2018, 1, 1)

        t_feat, t_lab, s_feat, s_lab = [], [], [], []

        for s in self.symbols:
            s_val = s.Value
            if s_val not in feat_data or feat_data[s_val].empty:
                continue

            df = feat_data[s_val]
            valid_dates = df.index.intersection(prices.index)
            if len(valid_dates) < forward_days + 1:
                continue

            try:
                sym_px = prices[s_val].loc[valid_dates]
            except KeyError:
                continue
            bench_px = bench_prices.loc[valid_dates]

            # Row i is paired with row i + forward_days; the last forward_days
            # rows have no future price and come out as NaN.
            fwd_sym = (sym_px.shift(-forward_days) / sym_px - 1).to_numpy()
            fwd_bench = (bench_px.shift(-forward_days) / bench_px - 1).to_numpy()
            feature_rows = df.loc[valid_dates].to_numpy().tolist()

            for date, f_s, f_b, feat_vector in zip(valid_dates, fwd_sym, fwd_bench, feature_rows):
                if np.isnan(f_s) or np.isnan(f_b):
                    continue

                label = 1 if f_s > f_b + outperform_margin else 0

                if self.IsTrumpRegime(date):
                    # Regime samples before the cutoff are dropped entirely.
                    if date >= regime_cutoff:
                        t_feat.append(feat_vector)
                        t_lab.append(label)
                else:
                    s_feat.append(feat_vector)
                    s_lab.append(label)

        min_samples = 200

        regime_sets = (
            ("Trump", self.model_trump, self.scaler_trump, t_feat, t_lab),
            ("Std", self.model_std, self.scaler_std, s_feat, s_lab),
        )
        for name, model, scaler, samples, labels in regime_sets:
            if len(samples) < min_samples:
                continue
            X = scaler.fit_transform(samples)
            model.fit(X, labels)
            # In-sample accuracy only; logged as a sanity check, not a validation score.
            train_score = model.score(X, labels)
            self.Log(f"{name} Model Trained: {len(samples)} samples, Train Acc: {train_score:.3f}")

    def OnData(self, data):
        """Rebalance on each data event after warm-up.

        Flow:
          1. If the benchmark's 50-day SMA is not above its 200-day SMA, move
             fully into the benchmark and stop.
          2. Pick the model, scaler and threshold for the current regime; if
             that model has not been fitted yet, hold the benchmark and stop.
          3. Score every traded symbol on its latest feature row and keep
             those whose class-1 probability exceeds the threshold.
          4. With candidates, split 40-80% of a 0.98 budget equally across the
             top three and put the remainder in the benchmark; without
             candidates, move fully into the benchmark.

        The benchmark is also held as the residual sleeve of an active
        allocation, so "benchmark is invested" does not mean the portfolio is
        already defensive. Every fallback therefore also fires when any traded
        symbol is still held, so stale picks are liquidated.

        Args:
            data: the data slice passed by the engine; not read here, features
                come from a fresh History request.
        """
        if self.IsWarmingUp:
            return

        holds_picks = any(self.Portfolio[s].Invested for s in self.symbols)
        needs_bench_rotation = holds_picks or not self.Portfolio[self._bench].Invested

        # Macro filter: only trade the models while SMA50 > SMA200 on the benchmark.
        is_bull_macro = self.sma50.Current.Value > self.sma200.Current.Value
        if not is_bull_macro:
            if needs_bench_rotation:
                self.Log("MACRO PROTECT: Death Cross detected. Full QQQ allocation.")
                self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)
            return

        # Select regime-appropriate model, scaler and threshold
        is_trump = self.IsTrumpRegime(self.Time)
        model = self.model_trump if is_trump else self.model_std
        scaler = self.scaler_trump if is_trump else self.scaler_std
        threshold = self.min_confidence_trump if is_trump else self.min_confidence_std

        # A fitted forest exposes `estimators_`; checking for it avoids running
        # a throwaway prediction and hard-coding the feature count.
        if not hasattr(model, "estimators_"):
            if needs_bench_rotation:
                self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)
            return

        # Get features from the most recent 60 daily bars
        hist = self.History(self.symbols + [self._bench], 60, Resolution.Daily)
        if hist.empty:
            return

        feat_data, _ = self.GetFeatures(hist)

        candidates = []
        all_scores = []

        for s in self.symbols:
            s_val = s.Value
            if s_val not in feat_data or feat_data[s_val].empty:
                continue

            try:
                feat_vector = scaler.transform([feat_data[s_val].iloc[-1].tolist()])
                prob = model.predict_proba(feat_vector)[0][1]
            except Exception as e:
                # Best effort: one symbol failing to score must not stop the
                # rebalance, but the failure should be visible in the log.
                self.Log(f"Scoring failed for {s_val}: {e}")
                continue

            all_scores.append(f"{s_val}:{prob:.2f}")
            if prob > threshold:
                candidates.append((s, prob))

        # Log predictions
        regime = "Trump" if is_trump else "Standard"
        self.Log(f"[{regime}] Scores: {', '.join(all_scores)} | Threshold: {threshold:.2f}")

        if candidates:
            # Top 3 picks by probability
            top_picks = sorted(candidates, key=lambda x: x[1], reverse=True)[:3]

            avg_prob = np.mean([p[1] for p in top_picks])
            max_prob = np.max([p[1] for p in top_picks])

            # Blend average and best confidence
            confidence_score = 0.6 * avg_prob + 0.4 * max_prob

            # Exposure rises linearly from 40% at the threshold to 80% at a
            # confidence of 1.0, clipped to that range.
            base_exposure = 0.40
            max_exposure = 0.80
            exposure_scale = base_exposure + (max_exposure - base_exposure) * (confidence_score - threshold) / (1.0 - threshold)
            exposure_scale = np.clip(exposure_scale, base_exposure, max_exposure)

            active_total_weight = 0.98 * exposure_scale
            bench_weight = 0.98 - active_total_weight

            # Equal weight among picks, remainder in the benchmark
            weight_per_stock = active_total_weight / len(top_picks)
            targets = [PortfolioTarget(s, weight_per_stock) for s, _ in top_picks]
            targets.append(PortfolioTarget(self._bench, bench_weight))

            picks_str = ", ".join([f"{s[0].Value}({s[1]:.2f})" for s in top_picks])
            self.Log(f"ALLOCATION: Active={exposure_scale:.1%} | QQQ={bench_weight/0.98:.1%} | Picks: {picks_str}")
            self.SetHoldings(targets, liquidateExistingHoldings=True)

        elif needs_bench_rotation:
            self.Log("No high-conviction signals. Defaulting to QQQ.")
            self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)