| Overall Statistics |
|
Total Orders 932 Average Win 0.72% Average Loss -0.59% Compounding Annual Return 10.245% Drawdown 22.800% Expectancy 0.083 Start Equity 1000000 End Equity 1206012.79 Net Profit 20.601% Sharpe Ratio 0.18 Sortino Ratio 0.227 Probabilistic Sharpe Ratio 18.099% Loss Rate 51% Win Rate 49% Profit-Loss Ratio 1.22 Alpha 0.052 Beta -0.182 Annual Standard Deviation 0.187 Annual Variance 0.035 Information Ratio -0.278 Tracking Error 0.247 Treynor Ratio -0.185 Total Fees $5783.08 Estimated Strategy Capacity $55000.00 Lowest Capacity Asset IPDN VEN1SVFIVSKL Portfolio Turnover 6.31% Drawdown Recovery 267 |
# from AlgorithmImports import *
# from sklearn.ensemble import RandomForestRegressor
# import numpy as np
# import pandas as pd
# class GKXMachineLearning(QCAlgorithm):
# """
# Simplified implementation of Gu, Kelly, Xiu (2020)
# Using Random Forest for return prediction
# """
# def Initialize(self):
# self.SetStartDate(2024, 1, 1)
# self.SetEndDate(2025, 12, 1)
# self.SetCash(1000000)
# # Universe: Liquid stocks
# self.AddUniverse(self.CoarseSelectionFunction)
# # Model settings
# self.lookback = 252 # 1 year of data
# self.retrain_frequency = 21 # Monthly
# self.days_since_training = 0
# # Track if model is trained
# self.model_trained = False
# # Initialize Random Forest model
# self.rf_model = RandomForestRegressor(
# n_estimators=100,
# max_depth=6,
# max_features=3, # Key hyperparameter from paper
# random_state=42
# )
# # Warm up period
# self.SetWarmUp(self.lookback)
# def CoarseSelectionFunction(self, coarse):
# """Select top 500 liquid stocks"""
# # Filter for stocks with fundamental data and price > $5
# filtered = [x for x in coarse
# if x.HasFundamentalData
# and x.Price > 5
# and x.DollarVolume > 1000000] # Min $1M daily volume
# # Sort by dollar volume and take top 500
# sorted_by_volume = sorted(filtered, key=lambda x: x.DollarVolume, reverse=True)
# return [x.Symbol for x in sorted_by_volume[:500]]
# def CalculateFeatures(self, symbol):
# """
# Calculate features for prediction (simplified version of GKX)
# """
# features = {}
# try:
# # Get price history
# history = self.History(symbol, self.lookback, Resolution.Daily)
# if history.empty or len(history) < self.lookback:
# return None
# # Handle multi-index DataFrame if necessary
# if isinstance(history.index, pd.MultiIndex):
# # If multi-index, get data for this specific symbol
# if symbol in history.index.get_level_values('symbol'):
# history = history.loc[symbol]
# else:
# return None
# # Access data - QuantConnect returns lowercase column names
# if 'close' in history.columns:
# prices = history['close'].values
# else:
# return None
# if 'volume' in history.columns:
# volumes = history['volume'].values
# else:
# return None
# if 'high' in history.columns:
# highs = history['high'].values
# else:
# return None
# if 'low' in history.columns:
# lows = history['low'].values
# else:
# return None
# # Ensure we have valid data
# if len(prices) < 21: # Need at least 21 days for momentum
# return None
# # 1. MOMENTUM FEATURES (most important in paper)
# features['momentum_1m'] = (prices[-1] / prices[-21] - 1) if len(prices) > 21 else 0
# features['momentum_3m'] = (prices[-1] / prices[-63] - 1) if len(prices) > 63 else 0
# features['momentum_6m'] = (prices[-1] / prices[-126] - 1) if len(prices) > 126 else 0
# features['momentum_12m'] = (prices[-1] / prices[-252] - 1) if len(prices) > 252 else 0
# # Skip month momentum (important feature from paper)
# if len(prices) > 252:
# features['momentum_12m_skip'] = (prices[-21] / prices[-252] - 1)
# else:
# features['momentum_12m_skip'] = 0
# # 2. VOLATILITY FEATURES
# if len(prices) > 1:
# returns = np.diff(prices) / prices[:-1]
# features['volatility_1m'] = np.std(returns[-21:]) if len(returns) >= 21 else 0
# features['volatility_3m'] = np.std(returns[-63:]) if len(returns) >= 63 else 0
# features['volatility_12m'] = np.std(returns) if len(returns) > 0 else 0
# else:
# features['volatility_1m'] = 0
# features['volatility_3m'] = 0
# features['volatility_12m'] = 0
# # 3. VOLUME FEATURES
# if np.mean(volumes[-21:]) > 0:
# features['volume'] = np.log(np.mean(volumes[-21:]))
# features['volume_trend'] = (np.mean(volumes[-5:]) / np.mean(volumes[-21:]) - 1)
# else:
# features['volume'] = 0
# features['volume_trend'] = 0
# # Dollar volume
# dollar_volumes = prices * volumes
# features['dollar_volume'] = np.log(np.mean(dollar_volumes[-21:]) + 1)
# # 4. FUNDAMENTAL FEATURES (if available)
# if self.Securities[symbol].Fundamentals:
# fundamentals = self.Securities[symbol].Fundamentals
# # Size
# if fundamentals.MarketCap and fundamentals.MarketCap > 0:
# features['size'] = np.log(fundamentals.MarketCap)
# else:
# features['size'] = 0
# # Value
# if fundamentals.ValuationRatios:
# # Book to Market
# if fundamentals.ValuationRatios.BookValuePerShare and prices[-1] > 0:
# features['book_to_market'] = fundamentals.ValuationRatios.BookValuePerShare / prices[-1]
# else:
# features['book_to_market'] = 0
# # P/E Ratio
# if fundamentals.ValuationRatios.PERatio:
# features['pe_ratio'] = fundamentals.ValuationRatios.PERatio
# else:
# features['pe_ratio'] = 0
# else:
# features['book_to_market'] = 0
# features['pe_ratio'] = 0
# # Quality
# if fundamentals.OperationRatios:
# # ROE
# if fundamentals.OperationRatios.ROE and fundamentals.OperationRatios.ROE.Value:
# features['profitability'] = fundamentals.OperationRatios.ROE.Value
# else:
# features['profitability'] = 0
# # Revenue Growth
# if fundamentals.OperationRatios.RevenueGrowth and fundamentals.OperationRatios.RevenueGrowth.Value:
# features['investment'] = fundamentals.OperationRatios.RevenueGrowth.Value
# else:
# features['investment'] = 0
# # ROA
# if fundamentals.OperationRatios.ROA and fundamentals.OperationRatios.ROA.Value:
# features['roa'] = fundamentals.OperationRatios.ROA.Value
# else:
# features['roa'] = 0
# else:
# features['profitability'] = 0
# features['investment'] = 0
# features['roa'] = 0
# else:
# # Set all fundamental features to 0 if not available
# features['size'] = 0
# features['book_to_market'] = 0
# features['pe_ratio'] = 0
# features['profitability'] = 0
# features['investment'] = 0
# features['roa'] = 0
# # 5. TECHNICAL FEATURES
# # RSI
# features['rsi'] = self.CalculateRSI(prices)
# # Price relative to moving averages
# if len(prices) >= 20:
# features['price_to_sma20'] = prices[-1] / np.mean(prices[-20:]) - 1
# else:
# features['price_to_sma20'] = 0
# if len(prices) >= 50:
# features['price_to_sma50'] = prices[-1] / np.mean(prices[-50:]) - 1
# else:
# features['price_to_sma50'] = 0
# # 6. MARKET MICROSTRUCTURE
# # Bid-ask spread proxy using high-low
# if len(prices) > 0:
# features['bid_ask_spread'] = np.mean((highs - lows) / prices)
# else:
# features['bid_ask_spread'] = 0
# # Amihud illiquidity
# if len(prices) > 1 and len(volumes) > 1:
# returns_abs = np.abs(np.diff(prices) / prices[:-1])
# volumes_adj = volumes[1:] + 1 # Add 1 to avoid division by zero
# features['illiquidity'] = np.mean(returns_abs / volumes_adj) * 1e6
# else:
# features['illiquidity'] = 0
# # Clean features - replace NaN and Inf with 0
# for key in features:
# if np.isnan(features[key]) or np.isinf(features[key]):
# features[key] = 0
# return features
# except Exception as e:
# self.Debug(f"Error calculating features for {symbol}: {str(e)}")
# return None
# def CalculateRSI(self, prices, period=14):
# """Calculate RSI indicator"""
# if len(prices) < period + 1:
# return 50 # Neutral RSI
# # Calculate price changes
# deltas = np.diff(prices[-period-1:])
# # Separate gains and losses
# gains = np.where(deltas > 0, deltas, 0)
# losses = np.where(deltas < 0, -deltas, 0)
# # Calculate average gain and loss
# avg_gain = np.mean(gains)
# avg_loss = np.mean(losses)
# # Calculate RSI
# if avg_loss == 0:
# return 100 # Maximum RSI
# rs = avg_gain / avg_loss
# rsi = 100 - (100 / (1 + rs))
# return rsi
# def TrainModel(self):
# """
# Train Random Forest model monthly
# Following GKX methodology
# """
# try:
# symbols = list(self.ActiveSecurities.Keys)
# # Limit to stocks only (exclude indices, etc.)
# symbols = [s for s in symbols if self.Securities[s].Type == SecurityType.Equity]
# X_train = []
# y_train = []
# valid_symbols = []
# # Collect training data (limit to 100 for speed)
# for symbol in symbols[:100]:
# features = self.CalculateFeatures(symbol)
# if features is not None:
# # Get next month's return as target
# future_history = self.History(symbol, 22, Resolution.Daily)
# if not future_history.empty and len(future_history) >= 2:
# # Handle multi-index if necessary
# if isinstance(future_history.index, pd.MultiIndex):
# if symbol in future_history.index.get_level_values('symbol'):
# future_history = future_history.loc[symbol]
# else:
# continue
# # Get future prices
# if 'close' in future_history.columns:
# future_prices = future_history['close'].values
# else:
# continue
# if len(future_prices) >= 2:
# # Calculate forward return (return over next month)
# future_return = (future_prices[-1] / future_prices[0] - 1)
# X_train.append(list(features.values()))
# y_train.append(future_return)
# valid_symbols.append(symbol)
# # Train model if we have enough data
# if len(X_train) >= 30:
# # Convert to numpy arrays
# X_train = np.array(X_train)
# y_train = np.array(y_train)
# # Train Random Forest
# self.rf_model.fit(X_train, y_train)
# self.model_trained = True
# self.Debug(f"Model trained with {len(X_train)} samples")
# # Feature importance
# feature_names = list(features.keys()) if features else []
# if feature_names and hasattr(self.rf_model, 'feature_importances_'):
# importances = self.rf_model.feature_importances_
# top_features_idx = np.argsort(importances)[-5:] # Top 5 features
# self.Debug("Top 5 important features:")
# for idx in top_features_idx:
# if idx < len(feature_names):
# self.Debug(f" {feature_names[idx]}: {importances[idx]:.4f}")
# return True
# else:
# self.Debug(f"Insufficient training data: {len(X_train)} samples")
# return False
# except Exception as e:
# self.Debug(f"Error in TrainModel: {str(e)}")
# return False
# def OnData(self, data):
# """
# Monthly rebalancing based on predicted returns
# """
# # Only rebalance if we're not warming up
# if self.IsWarmingUp:
# return
# self.days_since_training += 1
# # Retrain and rebalance monthly
# if self.days_since_training >= self.retrain_frequency:
# if self.TrainModel():
# self.days_since_training = 0
# # Make predictions for all stocks
# predictions = {}
# for symbol in self.ActiveSecurities.Keys:
# # Skip if not equity
# if self.Securities[symbol].Type != SecurityType.Equity:
# continue
# features = self.CalculateFeatures(symbol)
# if features is not None:
# X = [list(features.values())]
# try:
# predicted_return = self.rf_model.predict(X)[0]
# predictions[symbol] = predicted_return
# except Exception as e:
# self.Debug(f"Prediction error for {symbol}: {str(e)}")
# continue
# # Portfolio construction: Long top decile, Short bottom decile
# if len(predictions) >= 20:
# sorted_stocks = sorted(predictions.items(), key=lambda x: x[1])
# # Clear existing positions
# self.Liquidate()
# # Long positions (top 10%)
# n_long = max(len(sorted_stocks) // 10, 1)
# long_weight = 0.5 / n_long if n_long > 0 else 0
# for symbol, pred_return in sorted_stocks[-n_long:]:
# if pred_return > 0: # Only long if positive predicted return
# self.SetHoldings(symbol, long_weight)
# self.Debug(f"Long {symbol}: predicted return {pred_return:.4f}")
# # Short positions (bottom 10%)
# n_short = max(len(sorted_stocks) // 10, 1)
# short_weight = 0.5 / n_short if n_short > 0 else 0
# for symbol, pred_return in sorted_stocks[:n_short]:
# if pred_return < 0: # Only short if negative predicted return
# self.SetHoldings(symbol, -short_weight)
# self.Debug(f"Short {symbol}: predicted return {pred_return:.4f}")
# # Log portfolio summary
# self.Debug(f"Portfolio updated: {n_long} longs, {n_short} shorts from {len(predictions)} predictions")
############################################################################################################
from AlgorithmImports import *
from sklearn.ensemble import RandomForestRegressor
import numpy as np
import pandas as pd
class GKXMachineLearningOptimized(QCAlgorithm):
    """
    Optimized implementation of Gu, Kelly, Xiu (2020).

    Key optimizations over the naive version:
    1. Smaller universe (100 stocks vs 500)
    2. Batch history requests instead of per-stock calls
    3. Scheduled monthly rebalancing instead of OnData checks
    4. Reduced lookback period (126 days vs 252)
    5. Feature reuse between training and prediction

    NOTE(review): training uses the CURRENT 1-month momentum as the target
    while that same momentum is also a feature, so the model largely learns
    an identity mapping (acknowledged simplification; a proper implementation
    would lag the features relative to the target return).
    """
    def Initialize(self):
        """Set dates/cash, universe, model, and the monthly rebalance schedule."""
        self.SetStartDate(2024, 1, 1)
        self.SetEndDate(2025, 12, 1)
        self.SetCash(1000000)
        # Universe: reduced to 100 stocks for speed
        self.universe_size = 100
        self.AddUniverse(self.CoarseSelectionFunction)
        # Model settings - 6 months of daily bars instead of 1 year, for speed
        self.lookback = 126
        # Track if model has been fit at least once
        self.model_trained = False
        # Store current universe symbols (refreshed in CoarseSelectionFunction)
        self.universe_symbols = []
        # Initialize Random Forest model
        self.rf_model = RandomForestRegressor(
            n_estimators=50,   # Reduced from 100 for speed
            max_depth=5,       # Reduced from 6
            max_features=3,    # Key hyperparameter from the GKX paper
            random_state=42,
            n_jobs=-1          # Use all CPU cores
        )
        # Add SPY BEFORE referencing it in the schedule rules below — the
        # security must exist for symbol-based date/time rules to resolve.
        self.AddEquity("SPY", Resolution.Daily)
        # Schedule monthly rebalancing instead of checking every OnData call
        self.Schedule.On(
            self.DateRules.MonthStart("SPY"),
            self.TimeRules.AfterMarketOpen("SPY", 30),
            self.Rebalance
        )
        # Warm up so the first rebalance has a full lookback of history
        self.SetWarmUp(self.lookback)

    def CoarseSelectionFunction(self, coarse):
        """Select the top `universe_size` liquid stocks (price > $5, $5M+ daily volume)."""
        if self.IsWarmingUp:
            return []
        liquid = [x for x in coarse
                  if x.HasFundamentalData
                  and x.Price > 5
                  and x.DollarVolume > 5000000]  # Higher volume floor for better liquidity
        ranked = sorted(liquid, key=lambda x: x.DollarVolume, reverse=True)
        self.universe_symbols = [x.Symbol for x in ranked[:self.universe_size]]
        return self.universe_symbols

    def CalculateFeaturesForSymbol(self, symbol, history_df):
        """
        Calculate the feature vector for one symbol from a pre-fetched
        (possibly multi-indexed) history DataFrame.

        Returns a dict of feature name -> float, or None if the symbol has
        insufficient/missing data. NaN/Inf values are zeroed before return.
        """
        features = {}
        try:
            # Slice out this symbol's rows from the batch history
            if isinstance(history_df.index, pd.MultiIndex):
                if symbol not in history_df.index.get_level_values(0):
                    return None
                df = history_df.loc[symbol]
            else:
                df = history_df
            if len(df) < 21:
                return None
            # All four OHLCV-derived columns are required
            required_cols = ['close', 'volume', 'high', 'low']
            for col in required_cols:
                if col not in df.columns:
                    return None
            prices = df['close'].values
            volumes = df['volume'].values
            highs = df['high'].values
            lows = df['low'].values
            n = len(prices)
            # 1. MOMENTUM FEATURES
            # prices[-k] is valid whenever n >= k, so use >= (with the original
            # `n > 126` and lookback == 126, the 6-month features were always 0).
            features['momentum_1m'] = (prices[-1] / prices[-21] - 1) if n >= 21 else 0
            features['momentum_3m'] = (prices[-1] / prices[-63] - 1) if n >= 63 else 0
            features['momentum_6m'] = (prices[-1] / prices[-126] - 1) if n >= 126 else 0
            # Skip-month momentum (6m return excluding the most recent month)
            if n >= 126:
                features['momentum_6m_skip'] = (prices[-21] / prices[-126] - 1)
            else:
                features['momentum_6m_skip'] = 0
            # 2. VOLATILITY FEATURES (std of daily simple returns)
            returns = np.diff(prices) / prices[:-1]
            features['volatility_1m'] = np.std(returns[-21:]) if len(returns) >= 21 else 0
            features['volatility_3m'] = np.std(returns[-63:]) if len(returns) >= 63 else 0
            # 3. VOLUME FEATURES
            avg_vol = np.mean(volumes[-21:])
            if avg_vol > 0:
                features['volume'] = np.log(avg_vol)
                features['volume_trend'] = (np.mean(volumes[-5:]) / avg_vol - 1)
            else:
                features['volume'] = 0
                features['volume_trend'] = 0
            # Log average dollar volume (+1 guards log(0))
            features['dollar_volume'] = np.log(np.mean(prices[-21:] * volumes[-21:]) + 1)
            # 4. FUNDAMENTAL FEATURES (zero-filled when unavailable)
            if symbol in self.Securities and self.Securities[symbol].Fundamentals:
                fund = self.Securities[symbol].Fundamentals
                features['size'] = np.log(fund.MarketCap) if fund.MarketCap and fund.MarketCap > 0 else 0
                if fund.ValuationRatios:
                    features['book_to_market'] = (fund.ValuationRatios.BookValuePerShare / prices[-1]
                                                  if fund.ValuationRatios.BookValuePerShare and prices[-1] > 0 else 0)
                    features['pe_ratio'] = fund.ValuationRatios.PERatio if fund.ValuationRatios.PERatio else 0
                else:
                    features['book_to_market'] = 0
                    features['pe_ratio'] = 0
                if fund.OperationRatios:
                    features['profitability'] = (fund.OperationRatios.ROE.Value
                                                 if fund.OperationRatios.ROE and fund.OperationRatios.ROE.Value else 0)
                    features['roa'] = (fund.OperationRatios.ROA.Value
                                       if fund.OperationRatios.ROA and fund.OperationRatios.ROA.Value else 0)
                else:
                    features['profitability'] = 0
                    features['roa'] = 0
            else:
                features['size'] = 0
                features['book_to_market'] = 0
                features['pe_ratio'] = 0
                features['profitability'] = 0
                features['roa'] = 0
            # 5. TECHNICAL FEATURES
            features['rsi'] = self.CalculateRSI(prices)
            # Price relative to moving averages
            features['price_to_sma20'] = (prices[-1] / np.mean(prices[-20:]) - 1) if n >= 20 else 0
            features['price_to_sma50'] = (prices[-1] / np.mean(prices[-50:]) - 1) if n >= 50 else 0
            # 6. MARKET MICROSTRUCTURE
            # High-low range as a bid-ask spread proxy
            features['bid_ask_spread'] = np.mean((highs - lows) / prices)
            # Amihud illiquidity: |return| per unit volume (scaled, +1 avoids /0)
            returns_abs = np.abs(returns)
            volumes_adj = volumes[1:] + 1
            features['illiquidity'] = np.mean(returns_abs / volumes_adj) * 1e6
            # Clean features - replace NaN and Inf with 0 so the model never sees them
            for key in features:
                if np.isnan(features[key]) or np.isinf(features[key]):
                    features[key] = 0
            return features
        except Exception:
            # Best-effort: any per-symbol data problem just drops the symbol
            return None

    def CalculateRSI(self, prices, period=14):
        """Return the simple-average RSI over `period` bars (50 if too little data)."""
        if len(prices) < period + 1:
            return 50  # Neutral when history is insufficient
        deltas = np.diff(prices[-period-1:])
        gains = np.where(deltas > 0, deltas, 0)
        losses = np.where(deltas < 0, -deltas, 0)
        avg_gain = np.mean(gains)
        avg_loss = np.mean(losses)
        if avg_loss == 0:
            return 100  # No losses in window -> maximum RSI
        rs = avg_gain / avg_loss
        return 100 - (100 / (1 + rs))

    def Rebalance(self):
        """
        Monthly rebalance: one batch history call, feature computation,
        model (re)fit, prediction, then long top decile / short bottom decile
        at 50% gross exposure per side.
        """
        if self.IsWarmingUp:
            return
        # Current universe equities only; exclude the SPY scheduling anchor
        symbols = [s for s in self.ActiveSecurities.Keys
                   if self.Securities[s].Type == SecurityType.Equity
                   and s.Value != "SPY"]
        if len(symbols) < 20:
            self.Debug(f"Not enough symbols: {len(symbols)}")
            return
        # BATCH HISTORY REQUEST - key optimization: one call for all symbols
        try:
            history = self.History(symbols, self.lookback, Resolution.Daily)
        except Exception as e:
            self.Debug(f"History request failed: {str(e)}")
            return
        if history.empty:
            self.Debug("Empty history")
            return
        # Compute features once; reused for both training and prediction below
        features_dict = {}
        for symbol in symbols:
            features = self.CalculateFeaturesForSymbol(symbol, history)
            if features is not None:
                features_dict[symbol] = features
        if len(features_dict) < 30:
            self.Debug(f"Not enough valid features: {len(features_dict)}")
            return
        # Prepare training data.
        # Uses last month's return as the target proxy (in production you
        # would use properly lagged features against a forward return).
        X_train = []
        y_train = []
        valid_symbols = []
        for symbol, features in features_dict.items():
            target = features['momentum_1m']
            X_train.append(list(features.values()))
            y_train.append(target)
            valid_symbols.append(symbol)
        if len(X_train) < 30:
            return
        # Train model
        X_train = np.array(X_train)
        y_train = np.array(y_train)
        try:
            self.rf_model.fit(X_train, y_train)
            self.model_trained = True
        except Exception as e:
            self.Debug(f"Training failed: {str(e)}")
            return
        # Make predictions for every symbol that produced features
        predictions = {}
        for symbol in valid_symbols:
            features = features_dict[symbol]
            X = [list(features.values())]
            try:
                pred = self.rf_model.predict(X)[0]
                predictions[symbol] = pred
            except Exception:
                # Best-effort: skip symbols the model cannot score
                continue
        if len(predictions) < 20:
            return
        # Portfolio construction: rank by predicted return
        sorted_stocks = sorted(predictions.items(), key=lambda x: x[1])
        # Clear existing positions before re-entering
        self.Liquidate()
        # Long top 10% (only names with positive predicted return)
        n_long = max(len(sorted_stocks) // 10, 1)
        long_weight = 0.5 / n_long
        for symbol, pred_return in sorted_stocks[-n_long:]:
            if pred_return > 0:
                self.SetHoldings(symbol, long_weight)
        # Short bottom 10% (only names with negative predicted return)
        n_short = max(len(sorted_stocks) // 10, 1)
        short_weight = 0.5 / n_short
        for symbol, pred_return in sorted_stocks[:n_short]:
            if pred_return < 0:
                self.SetHoldings(symbol, -short_weight)
        self.Debug(f"Rebalanced: {n_long} longs, {n_short} shorts from {len(predictions)} stocks")

    def OnData(self, data):
        """No-op: all trading logic runs in the scheduled Rebalance."""
        pass