| Overall Statistics | |
| --- | --- |
| Total Orders | 2535 |
| Average Win | 0.51% |
| Average Loss | -0.35% |
| Compounding Annual Return | 31.647% |
| Drawdown | 44.000% |
| Expectancy | 0.403 |
| Start Equity | 10000 |
| End Equity | 52080.97 |
| Net Profit | 420.810% |
| Sharpe Ratio | 0.829 |
| Sortino Ratio | 0.928 |
| Probabilistic Sharpe Ratio | 32.438% |
| Loss Rate | 42% |
| Win Rate | 58% |
| Profit-Loss Ratio | 1.43 |
| Alpha | 0.075 |
| Beta | 1.13 |
| Annual Standard Deviation | 0.267 |
| Annual Variance | 0.071 |
| Information Ratio | 0.729 |
| Tracking Error | 0.126 |
| Treynor Ratio | 0.196 |
| Total Fees | $2496.04 |
| Estimated Strategy Capacity | $1400000000.00 |
| Lowest Capacity Asset | AMD R735QTJ8XC9X |
| Portfolio Turnover | 12.66% |
| Drawdown Recovery | 770 |
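As a quick sanity check on the table (up to display rounding): Expectancy ≈ Win Rate × Profit-Loss Ratio − Loss Rate = 0.58 × 1.43 − 0.42 ≈ 0.41, and End Equity / Start Equity = 52080.97 / 10000 ≈ 5.21, consistent with the 420.810% Net Profit.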
from AlgorithmImports import *
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from datetime import datetime
import numpy as np
import pandas as pd
class IRPrecisionFalcon(QCAlgorithm):
    def Initialize(self):
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 12, 30)
        self.SetCash(10000)
        self.tickers = ["AAPL", "AMZN", "GOOGL", "META", "MSFT", "NVDA", "TSLA", "AMD"]
        self.symbols = [self.AddEquity(t, Resolution.Daily).Symbol for t in self.tickers]
        self._bench = self.AddEquity("QQQ", Resolution.Daily).Symbol
        self.SetBenchmark(self._bench)
        # Macro Gatekeeper (Golden Cross)
        self.sma50 = self.SMA(self._bench, 50, Resolution.Daily)
        self.sma200 = self.SMA(self._bench, 200, Resolution.Daily)
        # OPTIMIZED: Better hyperparameters for financial data
        # - Increased trees for better stability
        # - Deeper trees to capture complex patterns
        # - min_samples_split prevents overfitting on noise
        # - min_samples_leaf ensures leaf nodes have enough samples
        # - max_features='sqrt' reduces correlation between trees
        self.model_trump = RandomForestClassifier(
            n_estimators=200,         # More trees = more stable predictions
            max_depth=8,              # Deep enough to capture complexity
            min_samples_split=20,     # Prevent splitting on small samples
            min_samples_leaf=10,      # Ensure leaves have enough data
            max_features='sqrt',      # Reduce tree correlation
            random_state=42,
            n_jobs=-1,                # Parallel training
            class_weight='balanced'   # Handle class imbalance
        )
        self.model_std = RandomForestClassifier(
            n_estimators=200,
            max_depth=8,
            min_samples_split=20,
            min_samples_leaf=10,
            max_features='sqrt',
            random_state=42,
            n_jobs=-1,
            class_weight='balanced'
        )
        # OPTIMIZED: Separate scalers for each regime so neither model sees
        # the other regime's feature distribution
        self.scaler_trump = StandardScaler()
        self.scaler_std = StandardScaler()
        # OPTIMIZED: Confidence thresholds tuned for balanced classes
        self.min_confidence_trump = 0.65  # More realistic for a balanced RF
        self.min_confidence_std = 0.50
        # Feature-calculation cache (declared here but not yet used below)
        self.feature_cache = {}
        self.cache_timestamp = None
        self.SetWarmUp(252)
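        # One trading year of warm-up so the 200-day QQQ SMA is fully
        # populated before the first live decision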
        # OPTIMIZED: Retrain weekly for faster adaptation
        self.Train(self.DateRules.WeekStart(), self.TimeRules.At(0, 0), self.TrainModel)
    def IsTrumpRegime(self, time):
        is_term1 = (time >= datetime(2017, 1, 20) and time < datetime(2021, 1, 20))
        is_term2 = (time >= datetime(2025, 1, 20))
        return is_term1 or is_term2
    def GetFeatures(self, history_df):
        """OPTIMIZED: Enhanced feature engineering with additional technical indicators."""
        closes = history_df['close'].unstack(level=0)
        highs = history_df['high'].unstack(level=0)
        lows = history_df['low'].unstack(level=0)
        volumes = history_df['volume'].unstack(level=0) if 'volume' in history_df.columns else None
        # Multiple timeframe momentum
        returns_5 = closes.pct_change(5)
        returns_10 = closes.pct_change(10)
        returns_20 = closes.pct_change(20)
        # MACD
        ema12 = closes.ewm(span=12, adjust=False).mean()
        ema26 = closes.ewm(span=26, adjust=False).mean()
        macd = ema12 - ema26
        macd_signal = macd.ewm(span=9, adjust=False).mean()
        macd_hist = macd - macd_signal
        # Bollinger Bands
        bb_sma = closes.rolling(window=20).mean()
        bb_std = closes.rolling(window=20).std()
        bb_percent_b = (closes - (bb_sma - 1.25 * bb_std)) / (2.5 * bb_std)
        bb_width = (2.5 * bb_std) / bb_sma  # Volatility measure
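        # Note: these are 1.25-sigma bands (total width 2.5 * std), tighter
        # than the conventional 2-sigma setup, so %B leaves [0, 1] more often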
        # ATR
        tr = np.maximum(highs - lows, np.maximum((highs - closes.shift(1)).abs(), (lows - closes.shift(1)).abs()))
        atr = tr.rolling(window=14).mean()
        norm_atr = atr / closes
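        # True range folds overnight gaps in via the previous close; dividing
        # ATR by price makes volatility comparable across symbols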
        # RSI
        delta = closes.diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        rsi = 100 - (100 / (1 + rs))
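        # Cutler's RSI variant: simple moving averages of gains/losses rather
        # than Wilder's exponential smoothing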
        # Volume features (if available)
        if volumes is not None:
            vol_sma = volumes.rolling(window=20).mean()
            vol_ratio = volumes / vol_sma
        else:
            vol_ratio = pd.DataFrame(1, index=closes.index, columns=closes.columns)
        features_dict = {}
        for s in self.symbols:
            s_val = s.Value
            if s_val not in closes.columns:
                continue
            s_closes, s_highs, s_lows = closes[s_val], highs[s_val], lows[s_val]
            # Iterative PSAR calculation (see _calculate_psar below)
            psar, bull, af, ep = self._calculate_psar(s_closes, s_highs, s_lows)
            psar_signal = (s_closes > psar).astype(int)
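            # 1 while the close sits above its SAR (uptrend), 0 otherwise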
            # Relative strength vs benchmark
            alpha_5 = returns_5[s_val] - returns_5[self._bench.Value] if self._bench.Value in returns_5.columns else 0
            alpha_10 = returns_10[s_val] - returns_10[self._bench.Value] if self._bench.Value in returns_10.columns else 0
            alpha_20 = returns_20[s_val] - returns_20[self._bench.Value] if self._bench.Value in returns_20.columns else 0
            # OPTIMIZED: 14-feature set per symbol (must match the
            # predict_proba probe in OnData)
            df = pd.DataFrame({
                'ret_5': returns_5[s_val],
                'ret_10': returns_10[s_val],
                'ret_20': returns_20[s_val],
                'alpha_5': alpha_5,
                'alpha_10': alpha_10,
                'alpha_20': alpha_20,
                'macd': macd[s_val],
                'macd_hist': macd_hist[s_val],
                'bb_pct': bb_percent_b[s_val],
                'bb_width': bb_width[s_val],
                'atr': norm_atr[s_val],
                'rsi': rsi[s_val],
                'psar': psar_signal,
                'vol_ratio': vol_ratio[s_val]
            }).dropna()
            features_dict[s_val] = df
        return features_dict, closes
    def _calculate_psar(self, closes, highs, lows):
        """Iterative Parabolic SAR (Wilder): AF starts at 0.02, steps by 0.02, caps at 0.2."""
        psar = closes.copy()
        if len(closes) < 2:
            return psar, True, 0.02, lows.iloc[0]
        bull, af, ep = True, 0.02, lows.iloc[0]
        psar.iloc[0] = highs.iloc[0]
        for i in range(1, len(closes)):
            psar.iloc[i] = psar.iloc[i-1] + af * (ep - psar.iloc[i-1])
            if bull:
                if lows.iloc[i] < psar.iloc[i]:
                    # Reversal: SAR jumps to the prior extreme, trend flips bearish
                    bull, psar.iloc[i], ep, af = False, ep, lows.iloc[i], 0.02
                elif highs.iloc[i] > ep:
                    ep, af = highs.iloc[i], min(af + 0.02, 0.2)
            else:
                if highs.iloc[i] > psar.iloc[i]:
                    # Reversal: SAR jumps to the prior extreme, trend flips bullish
                    bull, psar.iloc[i], ep, af = True, ep, highs.iloc[i], 0.02
                elif lows.iloc[i] < ep:
                    ep, af = lows.iloc[i], min(af + 0.02, 0.2)
        return psar, bull, af, ep
    def TrainModel(self):
        """OPTIMIZED: Improved training with better data handling and validation."""
        history = self.History(self.symbols + [self._bench], datetime(2009, 1, 1), self.Time, Resolution.Daily)
        if history.empty:
            return
        feat_data, prices = self.GetFeatures(history)
        t_feat, t_lab, s_feat, s_lab = [], [], [], []
        # OPTIMIZED: Forward-looking period for labels (5 days)
        forward_days = 5
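        # Each sample is labeled 1 if the stock beats QQQ by more than 0.1%
        # over the next 5 feature dates, 0 otherwise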
        for s in self.symbols:
            s_val = s.Value
            if s_val not in feat_data or feat_data[s_val].empty:
                continue
            df = feat_data[s_val]
            # OPTIMIZED: Keep only feature dates that also appear in the price index
            valid_dates = df.index.intersection(prices.index)
            if len(valid_dates) < forward_days + 1:
                continue
            for i in range(len(valid_dates) - forward_days):
                date = valid_dates[i]
                future_date = valid_dates[i + forward_days]
                # Feature vector for this date
                feat_vector = df.loc[date].tolist()
                # Forward returns for stock and benchmark
                try:
                    f_s = (prices[s_val].loc[future_date] / prices[s_val].loc[date]) - 1
                    f_b = (prices[self._bench.Value].loc[future_date] / prices[self._bench.Value].loc[date]) - 1
                except KeyError:
                    continue
                # OPTIMIZED: Label only meaningful outperformance
                label = 1 if f_s > f_b + 0.001 else 0  # 0.1% threshold for noise reduction
                # Separate by regime
                if self.IsTrumpRegime(date):
                    # Only use data from 2018 onward for better quality
                    if date >= datetime(2018, 1, 1):
                        t_feat.append(feat_vector)
                        t_lab.append(label)
                else:
                    s_feat.append(feat_vector)
                    s_lab.append(label)
        # OPTIMIZED: Train only with sufficient data, scaling within each regime
        min_samples = 200  # Increased minimum for better model quality
        if len(t_feat) >= min_samples:
            X_t = self.scaler_trump.fit_transform(t_feat)
            self.model_trump.fit(X_t, t_lab)
            train_score = self.model_trump.score(X_t, t_lab)
            self.Log(f"Trump Model Trained: {len(t_feat)} samples, Train Acc: {train_score:.3f}")
        if len(s_feat) >= min_samples:
            X_s = self.scaler_std.fit_transform(s_feat)
            self.model_std.fit(X_s, s_lab)
            train_score = self.model_std.score(X_s, s_lab)
            self.Log(f"Std Model Trained: {len(s_feat)} samples, Train Acc: {train_score:.3f}")
    def OnData(self, data):
        if self.IsWarmingUp:
            return
        # OPTIMIZED: Macro filter - no single-stock exposure under a death cross
        is_bull_macro = self.sma50.Current.Value > self.sma200.Current.Value
        if not is_bull_macro:
            if not self.Portfolio[self._bench].Invested:
                self.Log("MACRO PROTECT: Death Cross detected. Full QQQ allocation.")
                self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)
            return
        # Select regime-appropriate model, scaler, and threshold
        is_trump = self.IsTrumpRegime(self.Time)
        model = self.model_trump if is_trump else self.model_std
        scaler = self.scaler_trump if is_trump else self.scaler_std
        threshold = self.min_confidence_trump if is_trump else self.min_confidence_std
        # Fall back to the benchmark until the active model has been fitted
        try:
            model.predict_proba([[0] * 14])  # Probe with the 14-feature width
        except Exception:
            if not self.Portfolio[self._bench].Invested:
                self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)
            return
        # Get features
        hist = self.History(self.symbols + [self._bench], 60, Resolution.Daily)
        if hist.empty:
            return
        feat_data, _ = self.GetFeatures(hist)
        # OPTIMIZED: Score every candidate against the regime threshold
        candidates = []
        all_scores = []
        for s in self.symbols:
            s_val = s.Value
            if s_val not in feat_data or feat_data[s_val].empty:
                continue
            try:
                feat_vector = scaler.transform([feat_data[s_val].iloc[-1].tolist()])
                prob = model.predict_proba(feat_vector)[0][1]
                all_scores.append(f"{s_val}:{prob:.2f}")
                if prob > threshold:
                    candidates.append((s, prob))
            except Exception:
                continue
        # Log predictions
        regime = "Trump" if is_trump else "Standard"
        self.Log(f"[{regime}] Scores: {', '.join(all_scores)} | Threshold: {threshold:.2f}")
        # OPTIMIZED: Portfolio construction with risk management
        if candidates:
            # Take the top 3 picks for diversification (was 2)
            top_picks = sorted(candidates, key=lambda x: x[1], reverse=True)[:3]
            # OPTIMIZED: More conservative exposure scaling
            avg_prob = np.mean([p[1] for p in top_picks])
            max_prob = np.max([p[1] for p in top_picks])
            # Blend average and peak confidence
            confidence_score = 0.6 * avg_prob + 0.4 * max_prob
            # Scale exposure: 40% at the threshold, up to 80% at full confidence
            base_exposure = 0.40
            max_exposure = 0.80
            exposure_scale = base_exposure + (max_exposure - base_exposure) * (confidence_score - threshold) / (1.0 - threshold)
            exposure_scale = np.clip(exposure_scale, base_exposure, max_exposure)
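            # e.g. threshold 0.50 and confidence_score 0.75 give
            # 0.40 + 0.40 * (0.75 - 0.50) / 0.50 = 0.60 -> 60% active exposure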
            active_total_weight = 0.98 * exposure_scale
            bench_weight = 0.98 - active_total_weight
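            # Whatever is not allocated to the picks stays in QQQ, keeping
            # total market exposure pinned at ~98%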
            # Equal weight among picks
            targets = []
            weight_per_stock = active_total_weight / len(top_picks)
            for s, prob in top_picks:
                targets.append(PortfolioTarget(s, weight_per_stock))
            targets.append(PortfolioTarget(self._bench, bench_weight))
            picks_str = ", ".join([f"{sym.Value}({prob:.2f})" for sym, prob in top_picks])
            self.Log(f"ALLOCATION: Active={exposure_scale:.1%} | QQQ={bench_weight/0.98:.1%} | Picks: {picks_str}")
            self.SetHoldings(targets, liquidateExistingHoldings=True)
        elif not self.Portfolio[self._bench].Invested:
            self.Log("No high-conviction signals. Defaulting to QQQ.")
            self.SetHoldings(self._bench, 0.98, liquidateExistingHoldings=True)
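For anyone who wants to sanity-check the labeling rule outside of LEAN, here is a minimal standalone sketch on synthetic prices. The random series, dates, and drift/volatility numbers are illustrative assumptions; only forward_days and the 0.1% outperformance threshold mirror TrainModel above (which steps through feature dates rather than calling shift, so the windows match only approximately).

# Standalone sketch (not part of the algorithm): synthetic data, illustrative only
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
dates = pd.bdate_range("2022-01-03", periods=250)
stock = pd.Series(100 * np.exp(np.cumsum(rng.normal(0.0006, 0.02, 250))), index=dates)
bench = pd.Series(100 * np.exp(np.cumsum(rng.normal(0.0004, 0.01, 250))), index=dates)

forward_days = 5
f_s = stock.shift(-forward_days) / stock - 1   # 5-day forward stock return
f_b = bench.shift(-forward_days) / bench - 1   # 5-day forward benchmark return
labels = (f_s > f_b + 0.001).astype(int).iloc[:-forward_days]  # 0.1% noise threshold

print(labels.value_counts(normalize=True))  # rough class-balance check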