| Overall Statistics |
|
Total Orders 924 Average Win 2.39% Average Loss -1.92% Compounding Annual Return 30.950% Drawdown 55.600% Expectancy 0.211 Start Equity 10000 End Equity 50448.90 Net Profit 404.489% Sharpe Ratio 0.916 Sortino Ratio 1.147 Probabilistic Sharpe Ratio 44.209% Loss Rate 46% Win Rate 54% Profit-Loss Ratio 1.25 Alpha 0.145 Beta 0.663 Annual Standard Deviation 0.221 Annual Variance 0.049 Information Ratio 0.583 Tracking Error 0.198 Treynor Ratio 0.306 Total Fees $1166.51 Estimated Strategy Capacity $450000000.00 Lowest Capacity Asset AMD R735QTJ8XC9X Portfolio Turnover 25.14% Drawdown Recovery 951 |
#region imports
from AlgorithmImports import *
from xgboost import XGBClassifier
from sklearn.preprocessing import StandardScaler
import numpy as np
import pandas as pd
from datetime import datetime
#endregion
class EnhancedFeatureGoldenCross(QCAlgorithm):
    """XGBoost-driven momentum rotation over a basket of mega-cap tech equities.

    Each trading day the algorithm scores every symbol with a binary classifier
    trained to predict whether the 5-day forward return exceeds +0.5%, prefers
    candidates whose 50-day SMA is above their 200-day SMA ("golden cross"),
    and concentrates the portfolio in the single best candidate.  Risk control
    is a trailing stop plus a rotation into BIL (T-bill ETF) when the market
    regime is bearish and the 10y/2y Treasury yield curve is inverted.
    The model is retrained from scratch at every month end.
    """

    def Initialize(self):
        """Configure the backtest window, universe, indicators and ML pipeline."""
        self.SetStartDate(2020, 1, 1)
        self.SetEndDate(2025, 12, 30)
        self.SetCash(10000)
        # Assets: the tradeable momentum universe
        self.tickers = ["NVDA", "TSLA", "AMD", "MSFT", "AAPL", "GOOGL", "AMZN", "META"]
        self.symbols = [self.AddEquity(t, Resolution.Daily).Symbol for t in self.tickers]
        self._spy = self.AddEquity("SPY", Resolution.Daily).Symbol  # market-regime proxy
        self._gld = self.AddEquity("GLD", Resolution.Daily).Symbol
        self._bil = self.AddEquity("BIL", Resolution.Daily).Symbol  # cash-like defensive asset
        # Macro Data: 10y and 2y Treasury yields (FRED via Quandl) for the inversion signal
        self.yield_10y = self.AddData(QuandlCustomColumns, "FRED/DGS10", Resolution.Daily).Symbol
        self.yield_2y = self.AddData(QuandlCustomColumns, "FRED/DGS2", Resolution.Daily).Symbol
        # Moving Averages: per-symbol 50/200-day SMAs used for the golden-cross filter
        self.sma_50 = {}
        self.sma_200 = {}
        for symbol in self.symbols:
            self.sma_50[symbol] = self.SMA(symbol, 50, Resolution.Daily)
            self.sma_200[symbol] = self.SMA(symbol, 200, Resolution.Daily)
        self.spy_sma_50 = self.SMA(self._spy, 50, Resolution.Daily)
        self.spy_sma_200 = self.SMA(self._spy, 200, Resolution.Daily)
        # NOTE(review): gld_sma is created but never read anywhere in this class
        self.gld_sma = self.SMA(self._gld, 200, Resolution.Daily)
        # Model Infrastructure
        self.model = None                 # XGBClassifier, fitted lazily in InitialTrain
        self.scaler = StandardScaler()    # refit on every (re)train
        self.feature_names = None         # column order captured at first training
        self.training_data_years = 10     # history depth requested for training
        # SIMPLIFIED Risk Management - Back to basics with small tweaks
        # NOTE(review): unused — OnData hardcodes regime-specific thresholds (0.56/0.58/0.62)
        self.min_probability_threshold = 0.60
        self.lookback_period = 250        # bars of history pulled per symbol for daily scoring
        # MODERATE position sizing (compromise)
        self.base_position_size = 0.5
        self.max_position_size = 0.85
        # Simple risk controls
        self.trailing_stop_pct = 0.075    # 7.5% trailing stop from the post-entry high
        self.min_days_between_trades = 1
        # Position Management
        self.highest_price = 0            # running high since entry, drives the trailing stop
        self.invested_ticker = None       # Symbol currently held (None when flat / in BIL)
        self.last_trade_date = None
        self.initial_training_complete = False
        # Performance Tracking
        self.training_count = 0
        self.trades_count = 0
        self.SetWarmUp(200)               # enough daily bars to fill the 200-day SMAs
        self.Train(self.DateRules.MonthEnd(), self.TimeRules.At(0, 0), self.MonthlyRetrain)

    def GetFeatures(self, df):
        """
        ENHANCED FEATURE SET - 20+ features for richer model

        Builds a per-symbol feature frame from an OHLCV history DataFrame
        (expects 'close', 'high', 'low', 'volume' columns).  Early rows hold
        NaNs until the longest rolling window (200 bars) is filled; callers
        drop them via dropna().  The `1e-10` terms guard divisions by zero.
        """
        close = df['close']
        high = df['high']
        low = df['low']
        volume = df['volume']
        features = pd.DataFrame(index=df.index)
        # --- Momentum: simple returns over multiple horizons ---
        features['returns_1d'] = close.pct_change(1)
        features['returns_3d'] = close.pct_change(3)
        features['returns_5d'] = close.pct_change(5)
        features['returns_10d'] = close.pct_change(10)
        features['returns_20d'] = close.pct_change(20)
        features['returns_60d'] = close.pct_change(60)
        # --- Trend: price vs. 50/200-day SMAs and their relative spread ---
        sma50 = close.rolling(50).mean()
        sma200 = close.rolling(200).mean()
        features['price_to_sma50'] = (close / (sma50 + 1e-10)) - 1
        features['price_to_sma200'] = (close / (sma200 + 1e-10)) - 1
        features['golden_cross'] = (sma50 / (sma200 + 1e-10)) - 1
        features['golden_cross_momentum'] = features['golden_cross'].diff(5)
        # 0-3 count of bullish alignments (price>50SMA, price>200SMA, 50SMA>200SMA)
        features['ma_alignment'] = ((close > sma50).astype(int) +
                                    (close > sma200).astype(int) +
                                    (sma50 > sma200).astype(int))
        # --- MACD (12/26 EMA spread with a 9-period signal line) ---
        ema12 = close.ewm(span=12, adjust=False).mean()
        ema26 = close.ewm(span=26, adjust=False).mean()
        features['macd'] = ema12 - ema26
        features['macd_signal'] = features['macd'].ewm(span=9, adjust=False).mean()
        features['macd_diff'] = features['macd'] - features['macd_signal']
        # --- RSI over a 9-bar window (SMA variant, not Wilder smoothing) ---
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(9).mean()
        loss = -delta.where(delta < 0, 0).rolling(9).mean()
        rs = gain / (loss + 1e-10)
        features['rsi'] = 100 - (100 / (1 + rs))
        # --- Bollinger-style band position/width (1.25 / 2.5 std multipliers) ---
        sma20 = close.rolling(20).mean()
        std20 = close.rolling(20).std()
        features['bb_position'] = (close - sma20) / (1.25 * std20 + 1e-10)
        features['bb_width'] = (2.5 * std20) / (sma20 + 1e-10)
        # --- Volatility: normalized range and realized return stddevs ---
        # NOTE(review): this is a simplified range/close ratio, not true Wilder ATR
        features['atr'] = (high - low).rolling(14).mean() / close
        features['volatility_10d'] = close.pct_change().rolling(10).std()
        features['volatility_20d'] = close.pct_change().rolling(20).std()
        features['volatility_change'] = features['volatility_20d'].pct_change(10)
        # --- Volume: on-balance volume trend and relative volume ---
        obv = (np.sign(close.diff()) * volume).fillna(0).cumsum()
        features['obv_change'] = obv.pct_change(10)
        features['obv_trend'] = obv.rolling(20).mean().pct_change(10)
        features['volume_ratio'] = volume / (volume.rolling(20).mean() + 1e-10)
        features['volume_momentum'] = features['volume_ratio'].diff(5)
        # --- Intraday structure: daily range and close location within it ---
        features['high_low_ratio'] = (high - low) / close
        features['close_position'] = (close - low) / (high - low + 1e-10)
        return features

    def GetMarketRegime(self):
        """Classify the broad market as 'bull', 'bear' or 'neutral' from SPY SMAs."""
        if not self.spy_sma_50.IsReady or not self.spy_sma_200.IsReady:
            return 'neutral'
        spy_price = self.Securities[self._spy].Price
        sma50 = self.spy_sma_50.Current.Value
        sma200 = self.spy_sma_200.Current.Value
        if sma50 > sma200 and spy_price > sma50:
            return 'bull'
        elif sma50 < sma200:
            return 'bear'
        else:
            # golden cross intact but price below the 50-day SMA
            return 'neutral'

    def InitialTrain(self):
        """Initial training with enhanced features.

        Pulls deep daily history for all symbols, builds features and binary
        labels (5-day forward return > 0.5%), fits the scaler and XGBoost
        model, and flips initial_training_complete on success.  Returns True
        on success, False when insufficient data was available.
        """
        self.Debug(f"[INITIAL TRAINING] Starting")
        # NOTE(review): +3685 is an unexplained padding constant (~10 extra years of days)
        days_to_request = self.training_data_years * 365 + 3685
        history = self.History(self.symbols, days_to_request, Resolution.Daily)
        if history.empty:
            # fall back to a shorter request before giving up entirely
            history = self.History(self.symbols, 1000, Resolution.Daily)
            if history.empty:
                return False
        all_features, all_labels, all_weights = [], [], []
        for s in self.symbols:
            if s not in history.index:
                continue
            s_hist = history.loc[s]
            feats = self.GetFeatures(s_hist)
            # label = 1 when the 5-day forward return exceeds +0.5%
            future_returns = s_hist['close'].pct_change(5).shift(-5)
            target = (future_returns > 0.005).astype(int)
            combined = feats.join(target.rename('target')).dropna()
            if len(combined) < 200:
                continue
            # trim the tail whose labels would peek past available history
            valid_data = combined.iloc[:-5]
            # exponential recency weighting: oldest row weight e^-2, newest weight 1
            time_weights = np.exp(np.linspace(-2, 0, len(valid_data)))
            all_features.extend(valid_data.drop('target', axis=1).values.tolist())
            all_labels.extend(valid_data['target'].values.tolist())
            all_weights.extend(time_weights.tolist())
        if len(all_features) < 500:
            return False
        X = np.array(all_features)
        y = np.array(all_labels)
        weights = np.array(all_weights)
        # capture the canonical feature ordering for later importance reporting
        # NOTE(review): assumes symbols[0] is present in history — would raise otherwise
        sample_features = self.GetFeatures(history.loc[self.symbols[0]])
        self.feature_names = sample_features.columns.tolist()
        self.Debug(f"[TRAINING] Using {len(self.feature_names)} features")
        X_scaled = self.scaler.fit_transform(X)
        # Enhanced XGBoost model
        self.model = XGBClassifier(
            n_estimators=250,      # More trees for more features
            max_depth=6,           # Deeper for complex interactions
            learning_rate=0.03,
            subsample=0.8,
            colsample_bytree=0.8,
            min_child_weight=3,
            gamma=0.1,
            reg_alpha=0.1,
            reg_lambda=1.0,
            random_state=42,
            eval_metric='logloss'
        )
        self.model.fit(X_scaled, y, sample_weight=weights, verbose=False)
        # in-sample accuracy, logged for sanity-checking only
        accuracy = np.mean(self.model.predict(X_scaled) == y)
        self.Debug(f"[INITIAL TRAINING] SUCCESS: {len(y)} samples, Acc: {accuracy:.3f}")
        self.training_count += 1
        self.initial_training_complete = True
        return True

    def MonthlyRetrain(self):
        """Month-end scheduled retrain over all history since (start - 10y).

        Mirrors InitialTrain's feature/label pipeline; refits both the scaler
        and the existing model in place.  No-ops during warm-up and delegates
        to InitialTrain until the first successful training.
        """
        if self.IsWarmingUp:
            return
        if not self.initial_training_complete:
            self.InitialTrain()
            return
        self.Debug(f"[RETRAIN] {self.Time.strftime('%Y-%m-%d')}")
        days_since_start = (self.Time - self.StartDate).days + (self.training_data_years * 365)
        history = self.History(self.symbols, days_since_start, Resolution.Daily)
        if history.empty:
            return
        all_features, all_labels, all_weights = [], [], []
        for s in self.symbols:
            if s not in history.index:
                continue
            s_hist = history.loc[s]
            feats = self.GetFeatures(s_hist)
            # same labeling scheme as InitialTrain: 5-day forward return > +0.5%
            future_returns = s_hist['close'].pct_change(5).shift(-5)
            target = (future_returns > 0.005).astype(int)
            combined = feats.join(target.rename('target')).dropna()
            if len(combined) < 200:
                continue
            valid_data = combined.iloc[:-5]
            time_weights = np.exp(np.linspace(-2, 0, len(valid_data)))
            all_features.extend(valid_data.drop('target', axis=1).values.tolist())
            all_labels.extend(valid_data['target'].values.tolist())
            all_weights.extend(time_weights.tolist())
        if len(all_features) < 500:
            return
        X_scaled = self.scaler.fit_transform(np.array(all_features))
        self.model.fit(X_scaled, np.array(all_labels), sample_weight=np.array(all_weights), verbose=False)
        self.training_count += 1

    def OnData(self, data):
        """Daily decision loop: manage the stop, score the universe, rotate.

        Order of operations: (1) lazy initial training, (2) trailing-stop
        check on the open position, (3) trade-frequency gate, (4) model
        scoring of every symbol, (5) regime-dependent entry threshold and
        position sizing, (6) defensive BIL rotation in bear + inverted-curve
        conditions.  With no signal the current position is simply kept.
        """
        # lazily kick off the first training once warm-up has finished
        if not self.initial_training_complete and not self.IsWarmingUp:
            self.InitialTrain()
            return
        if self.IsWarmingUp or self.model is None:
            return
        # --- trailing-stop management on the currently held symbol ---
        if self.invested_ticker is not None and self.Portfolio[self.invested_ticker].Invested:
            curr_price = self.Securities[self.invested_ticker].Price
            if curr_price > self.highest_price:
                self.highest_price = curr_price
            stop_price = self.highest_price * (1 - self.trailing_stop_pct)
            if curr_price < stop_price:
                self.Debug(f"[STOP] {self.invested_ticker.Value}")
                self.Liquidate(self.invested_ticker)
                self.invested_ticker = None
                self.highest_price = 0
                return
        # --- minimum spacing between trades ---
        if self.last_trade_date is not None:
            if (self.Time - self.last_trade_date).days < self.min_days_between_trades:
                return
        # --- score every symbol with the classifier ---
        h_stocks = self.History(self.symbols, self.lookback_period, Resolution.Daily)
        if h_stocks.empty:
            return
        predictions = []
        for s in self.symbols:
            if s not in h_stocks.index or not data.ContainsKey(s):
                continue
            try:
                s_feats = self.GetFeatures(h_stocks.loc[s])
                if s_feats.empty or len(s_feats) < 200:
                    continue
                last_row = s_feats.iloc[-1].values.reshape(1, -1)
                if np.isnan(last_row).any():
                    continue
                scaled_feat = self.scaler.transform(last_row)
                # probability of the positive class (5d forward return > 0.5%)
                prob = self.model.predict_proba(scaled_feat)[0][1]
                sma50_val = self.sma_50[s].Current.Value
                sma200_val = self.sma_200[s].Current.Value
                is_golden_cross = sma50_val > sma200_val if (sma50_val > 0 and sma200_val > 0) else False
                predictions.append({
                    'symbol': s,
                    'probability': prob,
                    'price': self.Securities[s].Price,
                    'golden_cross': is_golden_cross
                })
            # NOTE(review): bare except silently drops a symbol on ANY error — consider
            # catching specific exceptions and logging
            except:
                continue
        if not predictions:
            return
        market_regime = self.GetMarketRegime()
        # golden-cross names get first pick; otherwise fall back to the full list
        golden_cross_stocks = [p for p in predictions if p['golden_cross']]
        if golden_cross_stocks and len(golden_cross_stocks) > 0:
            golden_cross_stocks.sort(key=lambda x: x['probability'], reverse=True)
            best_pred = golden_cross_stocks[0]
        else:
            predictions.sort(key=lambda x: x['probability'], reverse=True)
            best_pred = predictions[0]
        # yield-curve inversion check (10y below 2y), only valid when both quotes are live
        ten_yr = self.Securities[self.yield_10y].Price
        two_yr = self.Securities[self.yield_2y].Price
        yield_inverted = (ten_yr < two_yr) if (ten_yr > 0 and two_yr > 0) else False
        # regime-dependent entry threshold: looser in bulls, stricter in bears
        if market_regime == 'bull':
            threshold = 0.56
        elif market_regime == 'bear':
            threshold = 0.62
        else:
            threshold = 0.58
        if best_pred['probability'] > threshold:
            # only rotate when the best candidate differs from the current holding
            if self.invested_ticker != best_pred['symbol']:
                base_size = self.base_position_size
                # conviction maps prob 0.5->0 and 1.0->1, contributing up to +10% size
                conviction = min((best_pred['probability'] - 0.5) * 2, 1.0)
                conviction_boost = conviction * 0.10
                gc_bonus = 0.05 if best_pred['golden_cross'] else 0
                if market_regime == 'bull':
                    regime_mult = 1.1
                elif market_regime == 'bear':
                    regime_mult = 0.8
                else:
                    regime_mult = 1.0
                # final size clamped to [30%, max_position_size]
                final_size = (base_size + conviction_boost + gc_bonus) * regime_mult
                final_size = min(final_size, self.max_position_size)
                final_size = max(final_size, 0.30)
                self.Debug(f"[TRADE] {best_pred['symbol'].Value} prob:{best_pred['probability']:.3f} "
                           f"size:{final_size:.1%} {market_regime}")
                self.Liquidate()
                self.SetHoldings(best_pred['symbol'], final_size)
                self.invested_ticker = best_pred['symbol']
                self.highest_price = best_pred['price']
                self.last_trade_date = self.Time
                self.trades_count += 1
        elif yield_inverted and market_regime == 'bear':
            # defensive rotation: 60% into BIL when no stock clears the bar
            if not self.Portfolio[self._bil].Invested:
                self.Liquidate()
                self.SetHoldings(self._bil, 0.60)
                self.invested_ticker = None
        # Note: No else clause - if no signal, stay invested in current position

    def OnEndOfAlgorithm(self):
        """Log run totals and the model's top-10 feature importances."""
        self.Debug(f"[COMPLETE] Retraining: {self.training_count}, Trades: {self.trades_count}")
        if self.model and self.feature_names:
            importances = self.model.feature_importances_
            sorted_idx = np.argsort(importances)[::-1]
            self.Debug(f"[FEATURES] Total features: {len(self.feature_names)}")
            self.Debug("[FEATURES] Top 10 Important:")
            for i in sorted_idx[:10]:
                # NOTE(review): "{i+1}." prints the feature's column index, not its rank
                self.Debug(f"  {i+1}. {self.feature_names[i]}: {importances[i]:.4f}")
class QuandlCustomColumns(PythonQuandl):
    """Quandl custom-data shim used for the FRED yield series (DGS10/DGS2)."""

    def __init__(self):
        # Tell the PythonQuandl reader which CSV column carries the data point;
        # FRED series expose their reading under the "Value" column.
        self.ValueColumnName = "Value"