| Overall Statistics |
|
Total Orders 4739 Average Win 0.68% Average Loss -0.57% Compounding Annual Return 71.858% Drawdown 12.600% Expectancy 0.147 Start Equity 25000 End Equity 33072.86 Net Profit 32.291% Sharpe Ratio 1.887 Sortino Ratio 3.078 Probabilistic Sharpe Ratio 78.053% Loss Rate 47% Win Rate 53% Profit-Loss Ratio 1.18 Alpha 0.442 Beta -0.066 Annual Standard Deviation 0.232 Annual Variance 0.054 Information Ratio 1.192 Tracking Error 0.32 Treynor Ratio -6.604 Total Fees $0.00 Estimated Strategy Capacity $3000000.00 Lowest Capacity Asset QQQ RIWIV7K5Z9LX Portfolio Turnover 380.33% |
# region imports
from AlgorithmImports import *
# endregion
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
import xgboost as xgb
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.neural_network import MLPClassifier
class MultiModelQQQScalpingAlgorithm(QCAlgorithm):
def Initialize(self):
# Set start and end dates
self.SetStartDate(2025, 1, 1)
self.SetEndDate(2025, 7, 8)
self.SetCash(25000)
self.set_brokerage_model(BrokerageName.CHARLES_SCHWAB, AccountType.MARGIN)
# Add QQQ with minute resolution
self.symbol = self.AddEquity("QQQ", Resolution.Minute).Symbol
# Model parameters
self.lookback_periods = 20
self.prediction_horizon = 5
self.buy_threshold = 0.001
self.sell_threshold = -0.001
# Model selection - choose your preferred model
self.model_type = "random_forest" # Options: "xgboost", "random_forest", "gradient_boost", "logistic", "svm", "neural_net", "ensemble"
# Initialize model
self.model = self._initialize_model()
self.scaler = StandardScaler()
self.feature_columns = []
# Label mapping for XGBoost compatibility
self.label_mapping = {-1: 0, 0: 1, 1: 2} # sell->0, hold->1, buy->2
self.reverse_mapping = {0: -1, 1: 0, 2: 1} # reverse mapping for predictions
# For ensemble method
self.models = {}
if self.model_type == "ensemble":
self._initialize_ensemble()
# Data storage
self.price_data = []
self.is_model_trained = False
self.last_prediction = 0
self.model_accuracy = 0.0
# Performance tracking
self.model_performance = {}
# Schedule model retraining every day at market open
self.Schedule.On(
self.DateRules.EveryDay(self.symbol),
self.TimeRules.AfterMarketOpen(self.symbol, 30),
self.TrainModel
)
# Schedule for Jan–Mar
self.Schedule.On(
self.DateRules.EveryDay(self.symbol),
self.TimeRules.Every(TimeSpan.FromMinutes(5)),
self.RunJanToMarLogic
)
# Schedule for Apr–Jul
self.Schedule.On(
self.DateRules.EveryDay(self.symbol),
self.TimeRules.Every(TimeSpan.FromMinutes(5)),
self.RunAprToJulLogic
)
# Risk management
self.max_position_size = 0.95
self.transaction_cost = 0.001
# Install XGBoost if using that model
if self.model_type == "xgboost":
self.Debug("XGBoost model selected")
def _initialize_model(self):
"""Initialize the selected model with optimized parameters"""
if self.model_type == "xgboost":
return xgb.XGBClassifier(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8,
colsample_bytree=0.8,
random_state=42,
n_jobs=-1,
eval_metric='mlogloss'
)
elif self.model_type == "random_forest":
return RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42,
n_jobs=-1,
min_samples_split=5,
min_samples_leaf=2
)
elif self.model_type == "gradient_boost":
return GradientBoostingClassifier(
n_estimators=100,
learning_rate=0.1,
max_depth=6,
random_state=42,
subsample=0.8
)
elif self.model_type == "logistic":
return LogisticRegression(
random_state=42,
max_iter=1000,
C=1.0,
solver='liblinear'
)
elif self.model_type == "svm":
return SVC(
kernel='rbf',
C=1.0,
gamma='scale',
random_state=42,
probability=True
)
elif self.model_type == "neural_net":
return MLPClassifier(
hidden_layer_sizes=(100, 50),
activation='relu',
solver='adam',
alpha=0.01,
learning_rate='adaptive',
random_state=42,
max_iter=500
)
else:
# Default to Random Forest
return RandomForestClassifier(
n_estimators=100,
max_depth=10,
random_state=42,
n_jobs=-1
)
def _initialize_ensemble(self):
"""Initialize multiple models for ensemble approach"""
self.models = {
'xgboost': xgb.XGBClassifier(n_estimators=50, max_depth=4, random_state=42),
'random_forest': RandomForestClassifier(n_estimators=50, max_depth=8, random_state=42),
'gradient_boost': GradientBoostingClassifier(n_estimators=50, max_depth=4, random_state=42),
'logistic': LogisticRegression(random_state=42, max_iter=500)
}
self.model_weights = {k: 1.0 for k in self.models.keys()}
def RunJanToMarLogic(self):
if 1 <= self.Time.month <= 3:
# Call your Jan-Mar logic here
self.MakeTradeDecision()
def RunAprToJulLogic(self):
if 4 <= self.Time.month <= 8:
# Call your Apr-Jul logic here
self.MakeReversedTradeDecision()
def OnData(self, data):
"""Store incoming data for model training and prediction"""
if not data.Bars.ContainsKey(self.symbol):
return
bar = data.Bars[self.symbol]
# Store price and volume data
price_info = {
'timestamp': self.Time,
'open': float(bar.Open),
'high': float(bar.High),
'low': float(bar.Low),
'close': float(bar.Close),
'volume': float(bar.Volume)
}
self.price_data.append(price_info)
# Keep only last 1000 data points for efficiency
if len(self.price_data) > 1000:
self.price_data = self.price_data[-1000:]
def CreateFeatures(self, df):
"""Create technical indicators and features"""
df = df.copy()
# Basic price features
df['price_change'] = df['close'].pct_change()
df['price_change_abs'] = df['price_change'].abs()
df['hl_ratio'] = (df['high'] - df['low']) / df['close']
df['oc_ratio'] = (df['open'] - df['close']) / df['close']
# Volume features
df['volume_change'] = df['volume'].pct_change()
df['volume_price_trend'] = df['volume'] * df['price_change']
# Momentum indicators
for period in [3, 5, 10, 20]:
df[f'momentum_{period}'] = df['close'].pct_change(period)
df[f'volatility_{period}'] = df['close'].rolling(period).std()
df[f'volume_ma_{period}'] = df['volume'].rolling(period).mean()
df[f'volume_ratio_{period}'] = df['volume'] / df[f'volume_ma_{period}']
# RSI
delta = df['close'].diff()
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = -delta.where(delta < 0, 0).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# Moving averages
for ma in [5, 10, 20]:
df[f'ma_{ma}'] = df['close'].rolling(ma).mean()
df[f'ma_signal_{ma}'] = (df['close'] - df[f'ma_{ma}']) / df[f'ma_{ma}']
# Bollinger Bands
df['bb_upper'] = df['ma_20'] + df['close'].rolling(20).std() * 2
df['bb_lower'] = df['ma_20'] - df['close'].rolling(20).std() * 2
df['bb_position'] = (df['close'] - df['bb_lower']) / (df['bb_upper'] - df['bb_lower'])
# Lagged features
for lag in [1, 2, 3, 5]:
df[f'price_change_lag_{lag}'] = df['price_change'].shift(lag)
df[f'volume_change_lag_{lag}'] = df['volume_change'].shift(lag)
return df
def CreateLabels(self, df):
"""Create target labels for training"""
future_returns = df['close'].shift(-self.prediction_horizon) / df['close'] - 1
# Create labels: -1 (sell), 0 (hold), 1 (buy)
raw_labels = np.where(future_returns > self.buy_threshold, 1,
np.where(future_returns < self.sell_threshold, -1, 0))
# Map labels to be compatible with XGBoost (0, 1, 2)
mapped_labels = np.array([self.label_mapping[label] for label in raw_labels])
return mapped_labels
def PrepareFeatures(self, df):
"""Select and prepare features for model"""
feature_list = [
'price_change', 'price_change_abs', 'hl_ratio', 'oc_ratio',
'volume_change', 'volume_price_trend',
'momentum_3', 'momentum_5', 'momentum_10', 'momentum_20',
'volatility_3', 'volatility_5', 'volatility_10', 'volatility_20',
'volume_ratio_3', 'volume_ratio_5', 'volume_ratio_10', 'volume_ratio_20',
'rsi', 'ma_signal_5', 'ma_signal_10', 'ma_signal_20', 'bb_position',
'price_change_lag_1', 'price_change_lag_2', 'price_change_lag_3', 'price_change_lag_5',
'volume_change_lag_1', 'volume_change_lag_2', 'volume_change_lag_3', 'volume_change_lag_5'
]
self.feature_columns = [col for col in feature_list if col in df.columns]
return df[self.feature_columns]
def TrainModel(self):
"""Train the selected model or ensemble"""
if len(self.price_data) < 100:
return
try:
# Convert to DataFrame
df = pd.DataFrame(self.price_data)
# Create features
df = self.CreateFeatures(df)
# Create labels (now mapped to 0, 1, 2)
labels = self.CreateLabels(df)
# Prepare features
features = self.PrepareFeatures(df)
# Remove invalid data
valid_idx = ~(features.isna().any(axis=1) | np.isnan(labels))
features_clean = features[valid_idx]
labels_clean = labels[valid_idx]
if len(features_clean) < 50:
return
# Check if we have multiple classes
unique_labels = np.unique(labels_clean)
if len(unique_labels) < 2:
return
# Split data
X_train, X_test, y_train, y_test = train_test_split(
features_clean, labels_clean,
test_size=0.2,
stratify=labels_clean,
random_state=42
)
# Scale features
X_train_scaled = self.scaler.fit_transform(X_train)
X_test_scaled = self.scaler.transform(X_test)
if self.model_type == "ensemble":
self._train_ensemble(X_train_scaled, X_test_scaled, y_train, y_test)
else:
self._train_single_model(X_train_scaled, X_test_scaled, y_train, y_test)
self.is_model_trained = True
except Exception as e:
self.Debug(f"Model training failed: {str(e)}")
def _train_single_model(self, X_train, X_test, y_train, y_test):
"""Train a single model"""
# Train model
self.model.fit(X_train, y_train)
# Evaluate
y_pred = self.model.predict(X_test)
self.model_accuracy = accuracy_score(y_test, y_pred)
self.Debug(f"{self.model_type.upper()} Model trained - Accuracy: {self.model_accuracy:.4f}")
# Feature importance (if available)
if hasattr(self.model, 'feature_importances_'):
importances = pd.DataFrame({
'feature': self.feature_columns,
'importance': self.model.feature_importances_
}).sort_values('importance', ascending=False)
self.Debug(f"Top 5 features: {importances.head().to_dict()}")
def _train_ensemble(self, X_train, X_test, y_train, y_test):
"""Train ensemble of models"""
accuracies = {}
for name, model in self.models.items():
try:
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
accuracies[name] = accuracy
self.Debug(f"{name.upper()} accuracy: {accuracy:.4f}")
except Exception as e:
self.Debug(f"Failed to train {name}: {str(e)}")
accuracies[name] = 0.0
# Update model weights based on performance
total_accuracy = sum(accuracies.values())
if total_accuracy > 0:
self.model_weights = {k: v/total_accuracy for k, v in accuracies.items()}
self.model_accuracy = max(accuracies.values()) if accuracies else 0.0
self.Debug(f"Ensemble trained - Best accuracy: {self.model_accuracy:.4f}")
def PredictSignal(self):
"""Make prediction using trained model"""
if not self.is_model_trained or len(self.price_data) < self.lookback_periods:
return 0
try:
# Convert recent data to DataFrame
recent_data = pd.DataFrame(self.price_data[-100:])
# Create features
df_features = self.CreateFeatures(recent_data)
# Prepare features
features = self.PrepareFeatures(df_features)
# Get the last row
last_features = features.iloc[-1:].values
# Check for NaN values
if np.isnan(last_features).any():
return 0
# Scale features
scaled_features = self.scaler.transform(last_features)
if self.model_type == "ensemble":
prediction = self._predict_ensemble(scaled_features)
else:
# Get prediction (0, 1, or 2) and map back to (-1, 0, 1)
mapped_prediction = self.model.predict(scaled_features)[0]
prediction = self.reverse_mapping[mapped_prediction]
return prediction
except Exception as e:
self.Debug(f"Prediction failed: {str(e)}")
return 0
def _predict_ensemble(self, scaled_features):
"""Make ensemble prediction"""
predictions = []
weights = []
for name, model in self.models.items():
try:
# Get mapped prediction and convert back
mapped_pred = model.predict(scaled_features)[0]
pred = self.reverse_mapping[mapped_pred]
predictions.append(pred)
weights.append(self.model_weights.get(name, 0))
except:
continue
if not predictions:
return 0
# Weighted average prediction
weighted_pred = np.average(predictions, weights=weights)
# Convert to discrete signal
if weighted_pred > 0.5:
return 1
elif weighted_pred < -0.5:
return -1
else:
return 0
def MakeTradeDecision(self):
"""Make trading decisions based on model predictions"""
if not self.is_model_trained:
return
# Get current position
current_position = self.Portfolio[self.symbol].Quantity
current_price = self.Securities[self.symbol].Price
# Get prediction
prediction = self.PredictSignal()
self.last_prediction = prediction
# Calculate position size
portfolio_value = self.Portfolio.TotalPortfolioValue
max_position_value = portfolio_value * self.max_position_size
max_shares = int(max_position_value / current_price)
# Trading logic
if prediction == 1 and current_position <= 0: # Buy signal
if current_position < 0:
self.MarketOrder(self.symbol, -current_position)
shares_to_buy = max_shares
if shares_to_buy > 0:
self.MarketOrder(self.symbol, shares_to_buy)
self.Debug(f"BUY: {shares_to_buy} shares at ${current_price:.2f} | Model: {self.model_type}")
elif prediction == -1 and current_position >= 0: # Sell signal
if current_position > 0:
self.MarketOrder(self.symbol, -current_position)
shares_to_short = -max_shares
if shares_to_short < 0:
self.MarketOrder(self.symbol, shares_to_short)
self.Debug(f"SELL: {abs(shares_to_short)} shares at ${current_price:.2f} | Model: {self.model_type}")
# Log current status
self.Debug(f"Model: {self.model_type} | Prediction: {prediction} | Accuracy: {self.model_accuracy:.4f} | Position: {current_position}")
def MakeReversedTradeDecision(self):
"""Make reversed trading decisions based on model predictions"""
if not self.is_model_trained:
return
# Get current position
current_position = self.Portfolio[self.symbol].Quantity
current_price = self.Securities[self.symbol].Price
# Get prediction
prediction = self.PredictSignal()
self.last_prediction = prediction
# Calculate position size
portfolio_value = self.Portfolio.TotalPortfolioValue
max_position_value = portfolio_value * self.max_position_size
max_shares = int(max_position_value / current_price)
# Reversed Trading logic
if prediction == 1 and current_position >= 0:
if current_position > 0:
self.MarketOrder(self.symbol, -current_position)
shares_to_short = -max_shares
if shares_to_short < 0:
self.MarketOrder(self.symbol, shares_to_short)
self.Debug(f"REVERSED SELL: {abs(shares_to_short)} shares at ${current_price:.2f} | Model: {self.model_type}")
elif prediction == -1 and current_position <= 0:
if current_position < 0:
self.MarketOrder(self.symbol, -current_position)
shares_to_buy = max_shares
if shares_to_buy > 0:
self.MarketOrder(self.symbol, shares_to_buy)
self.Debug(f"REVERSED BUY: {shares_to_buy} shares at ${current_price:.2f} | Model: {self.model_type}")
# Log current status
self.Debug(f"Model: {self.model_type} | Reversed Prediction: {prediction} | Accuracy: {self.model_accuracy:.4f} | Position: {current_position}")
def OnOrderEvent(self, orderEvent):
"""Handle order events"""
if orderEvent.Status == OrderStatus.Filled:
self.Debug(f"Order filled: {orderEvent.Symbol} - {orderEvent.FillQuantity} shares at ${orderEvent.FillPrice:.2f}")
def OnEndOfDay(self):
"""Log daily performance"""
daily_return = self.Portfolio.TotalPortfolioValue / 100000 - 1
self.Debug(f"Daily Return: {daily_return:.4f} | Model: {self.model_type} | Accuracy: {self.model_accuracy:.4f}")
self.Debug(f"Holdings: {self.Portfolio[self.symbol].Quantity} shares")
self.Debug(f"Cash: ${self.Portfolio.Cash:.2f}")