# Overall Statistics (backtest results)
#
# Total Orders:                  25
# Average Win:                   2.46%
# Average Loss:                  -2.55%
# Compounding Annual Return:     12.322%
# Drawdown:                      17.200%
# Expectancy:                    0.310
# Start Equity:                  100000
# End Equity:                    109147.86
# Net Profit:                    9.148%
# Sharpe Ratio:                  0.255
# Sortino Ratio:                 0.257
# Probabilistic Sharpe Ratio:    30.705%
# Loss Rate:                     33%
# Win Rate:                      67%
# Profit-Loss Ratio:             0.96
# Alpha:                         0.011
# Beta:                          0.28
# Annual Standard Deviation:     0.197
# Annual Variance:               0.039
# Information Ratio:             -0.435
# Tracking Error:                0.209
# Treynor Ratio:                 0.179
# Total Fees:                    $34.61
# Estimated Strategy Capacity:   $0
# Lowest Capacity Asset:         JNJ R735QTJ8XC9X
# Portfolio Turnover:            4.75%
from AlgorithmImports import *
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.neural_network import MLPRegressor
import pandas as pd
import numpy as np
from datetime import timedelta
class ExpectedOptionsReturnPredictabilityAlgorithm(QCAlgorithm):
    """Monthly long/short equity strategy driven by an ensemble of ML regressors.

    Once a month the algorithm requests daily history for every active
    security (except SPY), builds three per-symbol features (daily return,
    20-day rolling volatility, 20-day momentum), fits a random forest, a
    gradient-boosting model and an MLP to predict the next day's return,
    averages the three forecasts, and goes long the top decile and short the
    bottom decile of forecasts at equal weight.

    Despite the class name, the code in this class trades equities only.
    """

    # Per-symbol feature names, in the same order as the column groups that
    # ConstructFeatures concatenates (all returns, all volatilities, all momenta).
    FEATURE_NAMES = ["Return", "Volatility", "Momentum"]

    # Upper bound on the number of symbols the coarse filter may return.
    MAX_UNIVERSE_SIZE = 100

    def Initialize(self):
        """Configure the backtest window, cash, universe, schedule and state."""
        # Backtest window and starting capital.
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2024, 10, 1)
        self.SetCash(100000)

        # SPY is used as the reference security for scheduling; it is
        # excluded from the tradable symbol list in TrainAndRebalance.
        self.spy = self.AddEquity("SPY", Resolution.Daily).Symbol

        # Universe warm-up: subscribe to a fixed list up front so history can
        # be requested before the first coarse selection has produced anything.
        initial_universe = [
            "AAPL", "MSFT", "GOOG", "AMZN", "META", "TSLA", "BRK.B", "JPM", "JNJ", "V",
        ]
        self.symbols = [
            self.AddEquity(ticker, Resolution.Daily).Symbol
            for ticker in initial_universe
        ]

        # Dynamic universe on top of the fixed list above.
        self.AddUniverse(self.CoarseSelectionFunction)

        # Monthly training and rebalance. Passing the symbol to MonthStart
        # targets the first *tradable* day of the month, so months whose first
        # calendar day is a weekend or holiday are not at risk of being skipped.
        self.Schedule.On(
            self.DateRules.MonthStart(self.spy),
            self.TimeRules.AfterMarketOpen(self.spy, 30),
            self.TrainAndRebalance,
        )

        # State shared between the pipeline steps of TrainAndRebalance.
        self.prices = pd.DataFrame()
        self.features = pd.DataFrame()
        self.target = pd.Series(dtype=np.float64)
        self.latest_features = pd.DataFrame()
        self.symbol_list = []
        self.models = []
        self.ensemble_predictions = pd.Series(dtype=np.float64)
        self.long_symbols = []
        self.short_symbols = []

        # Number of daily bars requested for training (about 3 trading years).
        self.history_length = 252 * 3

        # Flags
        self.is_initialized = False

    def CoarseSelectionFunction(self, coarse):
        """Return up to MAX_UNIVERSE_SIZE liquid symbols priced above $5.

        Candidates must have fundamental data, a price above 5 and a volume
        above one million. The survivors are ranked by dollar volume so the
        size cap keeps the most liquid names instead of whichever ones happen
        to come first in the feed.
        """
        candidates = [
            c for c in coarse
            if c.HasFundamentalData
            and c.Price > 5
            and c.Volume > 1e6  # Liquidity filter
        ]

        # The logged count is taken before the size cap is applied.
        self.Log(f"CoarseSelectionFunction: Selected {len(candidates)} symbols.")

        candidates.sort(key=lambda c: c.DollarVolume, reverse=True)
        return [c.Symbol for c in candidates[:self.MAX_UNIVERSE_SIZE]]

    def TrainAndRebalance(self):
        """Run the monthly pipeline: data, features, training, forecast, trade.

        Each stage may abort the run (after logging why); in that case the
        portfolio is left untouched until the next scheduled run.
        """
        self.Log(f"TrainAndRebalance: Executing on {self.Time}")

        # Work on everything currently active except the scheduling symbol.
        self.symbols = [
            symbol for symbol in self.ActiveSecurities.Keys if symbol != self.spy
        ]
        self.Log(f"TrainAndRebalance: {len(self.symbols)} symbols selected for training and rebalance.")
        if not self.symbols:
            self.Log("TrainAndRebalance: No symbols available for training.")
            return

        # Step 1: data collection and preprocessing.
        history = self.History(self.symbols, self.history_length, Resolution.Daily)
        if history.empty:
            self.Log("TrainAndRebalance: No historical data retrieved.")
            return
        self.PreprocessData(history)

        # Keep only the symbols that survived preprocessing so later steps
        # can map price columns back to Symbol objects.
        preprocessed_symbols = self.prices.columns.tolist()
        self.symbols = [
            symbol for symbol in self.symbols if symbol.Value in preprocessed_symbols
        ]
        self.Log(f"TrainAndRebalance: {len(self.symbols)} symbols after preprocessing.")
        if not self.symbols:
            self.Log("TrainAndRebalance: No symbols available after preprocessing.")
            return

        # Steps 2-3: feature construction and target definition.
        self.ConstructFeatures()
        self.DefineTargetVariable()
        if self.features.empty or self.target.empty:
            self.Log("TrainAndRebalance: Features or target variable is empty after preprocessing.")
            return

        # Steps 4-6: training, ensemble forecast, portfolio construction.
        self.TrainModels()
        self.EnsemblePredict()
        self.RebalancePortfolio()

    def PreprocessData(self, history):
        """Build ``self.prices`` (one close-price column per symbol) from history.

        Args:
            history: History frame with a ``close`` column whose first index
                level identifies the symbol.
        """
        self.Log("PreprocessData: Preprocessing data...")

        # One column per symbol, one row per bar.
        closes = history['close'].unstack(level=0)

        # Forward-fill gaps, then back-fill leading gaps. The dedicated
        # methods replace fillna(method=...), which pandas has deprecated.
        closes = closes.ffill().bfill()

        # After filling, only columns that are entirely empty still hold NaN.
        self.prices = closes.dropna(axis=1, how='any')
        self.Log(f"PreprocessData: {self.prices.shape[1]} symbols after dropping symbols with missing data.")

    def ConstructFeatures(self):
        """Build the wide feature frame from ``self.prices``.

        The result has one row per bar and three column groups of equal
        width, in this order: daily returns, 20-day rolling standard deviation
        of returns, and 20-day price momentum. Missing values (warm-up rows)
        are replaced with 0.
        """
        self.Log("ConstructFeatures: Constructing features...")

        returns = self.prices.pct_change().fillna(0)
        volatility = returns.rolling(window=20).std().fillna(0)
        momentum = (self.prices / self.prices.shift(20) - 1).fillna(0)

        wide = pd.concat([returns, volatility, momentum], axis=1)
        # Name the columns after the group and the symbol they belong to.
        wide.columns = [
            f"{group}_{symbol}"
            for group in self.FEATURE_NAMES
            for symbol in self.prices.columns
        ]
        self.features = wide
        self.Log(f"ConstructFeatures: Features shape {self.features.shape}")

    def DefineTargetVariable(self):
        """Turn the wide feature frame into per-(day, symbol) training samples.

        Sets:
            self.features: (samples, 3) frame with columns FEATURE_NAMES.
            self.target: the matching next-day return of each sample.
            self.symbol_list: the price-column label of each sample.
            self.latest_features: the most recent day's features, one row per
                price column. That day has no known next-day return yet, so it
                is kept out of the training set and used for forecasting.

        On a layout mismatch all four outputs are left empty so the caller
        aborts instead of training on inconsistent data.
        """
        self.Log("DefineTargetVariable: Defining target variable...")

        wide = self.features.values
        labels = self.prices.columns.to_numpy(dtype=object)
        num_symbols = len(labels)
        num_features = len(self.FEATURE_NAMES)
        num_days = wide.shape[0]

        # Start from empty outputs so no value from a previous run survives.
        self.features = pd.DataFrame(columns=self.FEATURE_NAMES)
        self.target = pd.Series(dtype=np.float64)
        self.latest_features = pd.DataFrame(columns=self.FEATURE_NAMES)
        self.symbol_list = []

        layout_ok = (
            num_days > 0
            and num_symbols > 0
            and wide.ndim == 2
            and wide.shape[1] == num_symbols * num_features
            and self.prices.shape[0] == num_days
        )
        if not layout_ok:
            self.Log("DefineTargetVariable: Mismatch between features and future returns.")
            return

        # Wide columns are laid out group by group, so (day, group, symbol)
        # is the natural reshape; swapping the last two axes yields one
        # 3-feature vector per (day, symbol).
        per_symbol = (
            wide.astype(np.float64)
            .reshape(num_days, num_features, num_symbols)
            .transpose(0, 2, 1)
        )

        # The last day is what the models forecast from.
        self.latest_features = pd.DataFrame(
            per_symbol[-1], index=self.prices.columns, columns=self.FEATURE_NAMES
        )

        # Every earlier day is paired with the return realised on the next
        # day. Flattening is day-major / symbol-minor for X, y and the labels.
        next_day_returns = self.prices.pct_change().shift(-1).values[:-1]
        X = per_symbol[:-1].reshape(-1, num_features)
        y = next_day_returns.reshape(-1).astype(np.float64)
        sample_labels = np.tile(labels, num_days - 1)

        # Drop samples the regressors cannot digest (NaN or infinite values).
        valid = np.isfinite(y) & np.isfinite(X).all(axis=1)

        self.features = pd.DataFrame(X[valid], columns=self.FEATURE_NAMES)
        self.target = pd.Series(y[valid])
        self.symbol_list = sample_labels[valid].tolist()
        self.Log(f"DefineTargetVariable: Features shape {self.features.shape}, Target shape {self.target.shape}")

    def TrainModels(self):
        """Fit the three regressors on ``self.features`` / ``self.target``.

        ``self.models`` is cleared first, so if training is skipped the
        forecasting step sees no models rather than last month's.
        """
        self.Log("TrainModels: Training models...")
        self.models = []

        X_train = self.features.values
        y_train = self.target.values
        self.Log(f"TrainModels: X_train shape {X_train.shape}, y_train shape {y_train.shape}")
        if len(X_train) == 0 or len(y_train) == 0:
            self.Log("TrainModels: Insufficient data for training.")
            return

        # Fixed seeds keep repeated backtests reproducible.
        candidates = [
            RandomForestRegressor(n_estimators=100, random_state=42),
            GradientBoostingRegressor(n_estimators=100, random_state=42),
            MLPRegressor(hidden_layer_sizes=(50,), max_iter=500, random_state=42),
        ]
        for model in candidates:
            model.fit(X_train, y_train)

        self.models = candidates
        self.Log("TrainModels: Models trained successfully.")

    def EnsemblePredict(self):
        """Forecast each symbol's next-day return as the mean of all models.

        Forecasts are made from ``self.latest_features`` (the most recent
        day, which is not part of the training set) and stored in
        ``self.ensemble_predictions`` keyed by Symbol object.
        """
        self.Log("EnsemblePredict: Generating ensemble predictions...")

        # Clear first so an aborted run can never trade on stale forecasts.
        self.ensemble_predictions = pd.Series(dtype=np.float64)
        if not self.models:
            self.Log("EnsemblePredict: No trained models available for prediction.")
            return

        latest = self.latest_features
        # Rows are walked positionally; latest.index holds the price-column labels.
        for symbol, feature_row in zip(latest.index, latest.values):
            symbol_features = feature_row.reshape(1, -1)

            model_predictions = []
            for model in self.models:
                try:
                    model_predictions.append(model.predict(symbol_features)[0])
                except Exception as e:
                    # Best effort: a failing model contributes a neutral forecast.
                    self.Log(f"EnsemblePredict: Prediction error for {symbol} with model {model.__class__.__name__}: {e}")
                    model_predictions.append(0)

            ensemble_pred = np.mean(model_predictions)

            # Map the price-column label back to the Symbol object. The match
            # is equality-based on purpose, mirroring the alignment done in
            # TrainAndRebalance.
            qc_symbol = next((s for s in self.symbols if s.Value == symbol), None)
            if qc_symbol is not None:
                self.ensemble_predictions[qc_symbol] = ensemble_pred
            self.Log(f"EnsemblePredict: Symbol {symbol} ensemble prediction: {ensemble_pred}")

    def RebalancePortfolio(self):
        """Go long the top forecast decile and short the bottom one.

        Each side receives 50% of the portfolio, split equally among its
        symbols. Open positions outside both lists are liquidated first. If
        deciles cannot be formed the portfolio is left unchanged.
        """
        self.Log("RebalancePortfolio: Rebalancing portfolio...")
        if self.ensemble_predictions.empty:
            self.Log("RebalancePortfolio: No ensemble predictions available for rebalance.")
            return

        # Ranking with method='first' breaks ties, so the values being
        # binned are distinct.
        ranked = self.ensemble_predictions.rank(method='first')
        try:
            deciles = pd.qcut(ranked, 10, labels=False)
            self.Log(f"RebalancePortfolio: Deciles assigned successfully.")
        except ValueError as e:
            self.Log(f"RebalancePortfolio: Error in decile assignment: {e}")
            return

        self.long_symbols = deciles[deciles == 9].index.tolist()
        self.short_symbols = deciles[deciles == 0].index.tolist()
        self.Log(f"RebalancePortfolio: Long symbols ({len(self.long_symbols)}): {[s.Value for s in self.long_symbols]}")
        self.Log(f"RebalancePortfolio: Short symbols ({len(self.short_symbols)}): {[s.Value for s in self.short_symbols]}")

        if not self.long_symbols:
            self.Log("RebalancePortfolio: No symbols in the top decile for long positions.")
        if not self.short_symbols:
            self.Log("RebalancePortfolio: No symbols in the bottom decile for short positions.")

        # Equal weights within each side; an empty side gets weight 0.
        num_long = len(self.long_symbols)
        num_short = len(self.short_symbols)
        long_weight = 0.5 / num_long if num_long > 0 else 0
        short_weight = -0.5 / num_short if num_short > 0 else 0

        # Close positions that are no longer wanted. Only invested holdings
        # are touched, so the log lists real liquidations only.
        symbols_to_hold = set(self.long_symbols + self.short_symbols)
        for holding in list(self.Portfolio.Values):
            if holding.Invested and holding.Symbol not in symbols_to_hold:
                self.Liquidate(holding.Symbol)
                self.Log(f"RebalancePortfolio: Liquidated {holding.Symbol.Value}")

        for symbol in self.long_symbols:
            self.SetHoldings(symbol, long_weight)
            self.Log(f"RebalancePortfolio: Long {symbol.Value} at weight {long_weight}")

        for symbol in self.short_symbols:
            self.SetHoldings(symbol, short_weight)
            self.Log(f"RebalancePortfolio: Short {symbol.Value} at weight {short_weight}")

        self.Log("RebalancePortfolio: Portfolio rebalanced successfully.")

    def OnData(self, data):
        """Log the latest price of each tracked symbol and each open holding.

        No trading happens here; orders are placed only by the scheduled
        TrainAndRebalance run.
        """
        if not data:
            return

        # Latest prices for the symbols used by the last rebalance.
        for symbol in self.symbols:
            if not data.ContainsKey(symbol):
                self.Debug(f"OnData: {symbol.Value} not present in current data.")
                continue

            security_data = data[symbol]
            if security_data is not None and security_data.Price is not None:
                price = security_data.Price
                self.Debug(f"OnData: {symbol.Value} price: {price}")
            else:
                self.Debug(f"OnData: {symbol.Value} price data is not available.")

        # Current open positions.
        for holding in self.Portfolio.Values:
            if not holding.Invested:
                continue
            market_price = holding.Price if holding.Price is not None else "N/A"
            self.Debug(f"OnData: Holding {holding.Symbol.Value}, Quantity: {holding.Quantity}, "
                       f"AveragePrice: {holding.AveragePrice}, MarketPrice: {market_price}")